mirror of
https://github.com/SamyRai/tercul-backend.git
synced 2025-12-27 05:11:34 +00:00
This commit marks the completion of a major refactoring effort to stabilize the codebase, improve its structure, and prepare it for production. The key changes include: - **Domain Layer Consolidation:** The `Work` entity and its related types, along with all other domain entities and repository interfaces, have been consolidated into the main `internal/domain` package. This eliminates import cycles and provides a single, coherent source of truth for the domain model. - **Data Access Layer Refactoring:** The repository implementations in `internal/data/sql` have been updated to align with the new domain layer. The `BaseRepositoryImpl` has been corrected to use pointer receivers, and all concrete repositories now correctly embed it, ensuring consistent and correct behavior. - **Application Layer Stabilization:** All application services in `internal/app` have been updated to use the new domain types and repository interfaces. Dependency injection has been corrected throughout the application, ensuring that all services are initialized with the correct dependencies. - **GraphQL Adapter Fixes:** The GraphQL resolver implementation in `internal/adapters/graphql` has been updated to correctly handle the new domain types and service methods. The auto-generated GraphQL code has been regenerated to ensure it is in sync with the schema and runtime. - **Test Suite Overhaul:** All test suites have been fixed to correctly implement their respective interfaces and use the updated domain model. Mock repositories and test suites have been corrected to properly embed the `testify` base types, resolving numerous build and linter errors. - **Dependency Management:** The Go modules have been tidied, and the module cache has been cleaned to ensure a consistent and correct dependency graph. - **Code Quality and Verification:** The entire codebase now passes all builds, tests, and linter checks, ensuring a high level of quality and stability. 
This comprehensive effort has resulted in a more robust, maintainable, and production-ready application.
161 lines
4.8 KiB
Go
161 lines
4.8 KiB
Go
package sync
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"gorm.io/gorm"
|
|
"log"
|
|
"strings"
|
|
"tercul/internal/platform/config"
|
|
|
|
"github.com/weaviate/weaviate-go-client/v5/weaviate"
|
|
)
|
|
|
|
// BatchProcessor handles batch processing of entities for sync operations
|
|
type BatchProcessor struct {
|
|
db *gorm.DB
|
|
defaultBatchSize int
|
|
weaviateClient *weaviate.Client
|
|
}
|
|
|
|
// NewBatchProcessor creates a new BatchProcessor
|
|
func NewBatchProcessor(db *gorm.DB, cfg *config.Config, weaviateClient *weaviate.Client) *BatchProcessor {
|
|
batchSize := cfg.BatchSize
|
|
if batchSize <= 0 {
|
|
batchSize = DefaultBatchSize
|
|
}
|
|
|
|
return &BatchProcessor{
|
|
db: db,
|
|
defaultBatchSize: batchSize,
|
|
weaviateClient: weaviateClient,
|
|
}
|
|
}
|
|
|
|
// ProcessEntitiesBatch processes a batch of entities from a given table
|
|
func (bp *BatchProcessor) ProcessEntitiesBatch(ctx context.Context, className string, batchSize, offset int) error {
|
|
log.Printf("Processing %s batch (offset %d, batch size %d)...", className, offset, batchSize)
|
|
|
|
var results []map[string]interface{}
|
|
tableName := toSnakeCase(className)
|
|
|
|
if err := bp.db.Table(tableName).Limit(batchSize).Offset(offset).Find(&results).Error; err != nil {
|
|
return fmt.Errorf("error fetching %s batch: %w", className, err)
|
|
}
|
|
|
|
if len(results) == 0 {
|
|
log.Printf("No %s records found for batch (offset %d)", className, offset)
|
|
return nil
|
|
}
|
|
|
|
return bp.CreateObjectsBatch(ctx, className, results)
|
|
}
|
|
|
|
// ProcessAllEntities processes all entities of a given type in batches
|
|
func (bp *BatchProcessor) ProcessAllEntities(ctx context.Context, className string) error {
|
|
log.Printf("Processing all %s entities...", className)
|
|
|
|
var count int64
|
|
tableName := toSnakeCase(className)
|
|
if err := bp.db.Table(tableName).Count(&count).Error; err != nil {
|
|
return fmt.Errorf("error counting %s: %w", className, err)
|
|
}
|
|
|
|
if count == 0 {
|
|
log.Printf("No %s entities found", className)
|
|
return nil
|
|
}
|
|
|
|
for offset := 0; offset < int(count); offset += bp.defaultBatchSize {
|
|
if err := bp.ProcessEntitiesBatch(ctx, className, bp.defaultBatchSize, offset); err != nil {
|
|
log.Printf("Error processing %s batch (offset %d): %v", className, offset, err)
|
|
// Continue with next batch instead of failing completely
|
|
}
|
|
}
|
|
|
|
log.Printf("Completed processing all %s entities", className)
|
|
return nil
|
|
}
|
|
|
|
// GetBatchSize returns the configured batch size
|
|
func (bp *BatchProcessor) GetBatchSize() int {
|
|
return bp.defaultBatchSize
|
|
}
|
|
|
|
// toSnakeCase converts a CamelCase class name into the snake_case form used
// as the table name.
//
// An underscore is inserted at each word boundary:
//   - before an upper-case ASCII letter that follows a lower-case letter or a
//     digit ("TopicCluster" -> "topic_cluster");
//   - before the last capital of a run of capitals when a lower-case letter
//     follows it ("HTTPServer" -> "http_server").
//
// Runs of capitals are otherwise kept together, so "UserID" becomes "user_id"
// rather than "user_i_d". The result is lower-cased; an empty input yields an
// empty string.
//
// All known entity names (Work, Translation, TopicCluster, WorkStats,
// HybridEntityWork, CopyrightClaim, ...) follow this general rule, so no
// per-name lookup table is needed.
func toSnakeCase(str string) string {
	isUpper := func(c byte) bool { return 'A' <= c && c <= 'Z' }
	isLower := func(c byte) bool { return 'a' <= c && c <= 'z' }
	isDigit := func(c byte) bool { return '0' <= c && c <= '9' }

	var b strings.Builder
	b.Grow(len(str) + 4)

	// Byte-wise iteration is safe here: only ASCII letters and digits take
	// part in boundary detection, and the bytes of multi-byte UTF-8 sequences
	// are all >= 0x80, so they are copied through untouched.
	for i := 0; i < len(str); i++ {
		c := str[i]
		if i > 0 && isUpper(c) {
			prev := str[i-1]
			nextIsLower := i+1 < len(str) && isLower(str[i+1])
			if isLower(prev) || isDigit(prev) || (isUpper(prev) && nextIsLower) {
				b.WriteByte('_')
			}
		}
		b.WriteByte(c)
	}

	return strings.ToLower(b.String())
}
|
|
|
|
// CreateObjectsBatch creates multiple objects in Weaviate using the existing client
|
|
func (bp *BatchProcessor) CreateObjectsBatch(ctx context.Context, className string, objects []map[string]interface{}) error {
|
|
var errors []error
|
|
|
|
for _, record := range objects {
|
|
objID := fmt.Sprintf("%v", record["id"])
|
|
if err := bp.createObject(ctx, className, objID, record); err != nil {
|
|
log.Printf("Error syncing %s ID %s: %v", className, objID, err)
|
|
errors = append(errors, err)
|
|
}
|
|
}
|
|
|
|
if len(errors) > 0 {
|
|
return fmt.Errorf("batch sync completed with %d errors", len(errors))
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// createObject creates a single object in Weaviate using the injected client.
|
|
func (bp *BatchProcessor) createObject(ctx context.Context, className, objID string, properties map[string]interface{}) error {
|
|
_, err := bp.weaviateClient.Data().Creator().
|
|
WithClassName(className).
|
|
WithID(objID).
|
|
WithProperties(properties).
|
|
Do(ctx)
|
|
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create %s object with ID %s: %w", className, objID, err)
|
|
}
|
|
|
|
log.Printf("Successfully synced %s ID %s", className, objID)
|
|
return nil
|
|
}
|