package sync import ( "context" "fmt" "gorm.io/gorm" "log" "strings" "tercul/internal/platform/config" "github.com/weaviate/weaviate-go-client/v5/weaviate" ) // BatchProcessor handles batch processing of entities for sync operations type BatchProcessor struct { db *gorm.DB defaultBatchSize int weaviateClient *weaviate.Client } // NewBatchProcessor creates a new BatchProcessor func NewBatchProcessor(db *gorm.DB, cfg *config.Config, weaviateClient *weaviate.Client) *BatchProcessor { batchSize := cfg.BatchSize if batchSize <= 0 { batchSize = DefaultBatchSize } return &BatchProcessor{ db: db, defaultBatchSize: batchSize, weaviateClient: weaviateClient, } } // ProcessEntitiesBatch processes a batch of entities from a given table func (bp *BatchProcessor) ProcessEntitiesBatch(ctx context.Context, className string, batchSize, offset int) error { log.Printf("Processing %s batch (offset %d, batch size %d)...", className, offset, batchSize) var results []map[string]interface{} tableName := toSnakeCase(className) if err := bp.db.Table(tableName).Limit(batchSize).Offset(offset).Find(&results).Error; err != nil { return fmt.Errorf("error fetching %s batch: %w", className, err) } if len(results) == 0 { log.Printf("No %s records found for batch (offset %d)", className, offset) return nil } return bp.CreateObjectsBatch(ctx, className, results) } // ProcessAllEntities processes all entities of a given type in batches func (bp *BatchProcessor) ProcessAllEntities(ctx context.Context, className string) error { log.Printf("Processing all %s entities...", className) var count int64 tableName := toSnakeCase(className) if err := bp.db.Table(tableName).Count(&count).Error; err != nil { return fmt.Errorf("error counting %s: %w", className, err) } if count == 0 { log.Printf("No %s entities found", className) return nil } for offset := 0; offset < int(count); offset += bp.defaultBatchSize { if err := bp.ProcessEntitiesBatch(ctx, className, bp.defaultBatchSize, offset); err != nil { log.Printf("Error processing %s batch (offset %d): %v", className, offset, err) // Continue with next batch instead of failing completely } } log.Printf("Completed processing all %s entities", className) return nil } // GetBatchSize returns the configured batch size func (bp *BatchProcessor) GetBatchSize() int { return bp.defaultBatchSize } // toSnakeCase converts a class name from CamelCase to snake_case func toSnakeCase(str string) string { // Handle special cases for known entity types specialCases := map[string]string{ "Work": "work", "Translation": "translation", "Author": "author", "Media": "media", "Category": "category", "TopicCluster": "topic_cluster", "LanguageAnalysis": "language_analysis", "WritingStyle": "writing_style", "LanguageEntity": "language_entity", "LinguisticLayer": "linguistic_layer", "EditorialWorkflow": "editorial_workflow", "WorkStats": "work_stats", "TranslationStats": "translation_stats", "MediaStats": "media_stats", "UserStats": "user_stats", "BookStats": "book_stats", "CollectionStats": "collection_stats", "TextMetadata": "text_metadata", "PoeticAnalysis": "poetic_analysis", "HybridEntityWork": "hybrid_entity_work", "CopyrightClaim": "copyright_claim", } if tableName, exists := specialCases[str]; exists { return tableName } // For other cases, convert CamelCase to snake_case var result string for i, r := range str { if i > 0 && 'A' <= r && r <= 'Z' { result += "_" } result += string(r) } return strings.ToLower(result) } // CreateObjectsBatch creates multiple objects in Weaviate using the existing client func (bp *BatchProcessor) CreateObjectsBatch(ctx context.Context, className string, objects []map[string]interface{}) error { var errors []error for _, record := range objects { objID := fmt.Sprintf("%v", record["id"]) if err := bp.createObject(ctx, className, objID, record); err != nil { log.Printf("Error syncing %s ID %s: %v", className, objID, err) errors = append(errors, err) } } if len(errors) > 0 { return fmt.Errorf("batch sync completed with %d errors", len(errors)) } return nil } // createObject creates a single object in Weaviate using the injected client. func (bp *BatchProcessor) createObject(ctx context.Context, className, objID string, properties map[string]interface{}) error { _, err := bp.weaviateClient.Data().Creator(). WithClassName(className). WithID(objID). WithProperties(properties). Do(ctx) if err != nil { return fmt.Errorf("failed to create %s object with ID %s: %w", className, objID, err) } log.Printf("Successfully synced %s ID %s", className, objID) return nil }