tercul-backend/internal/jobs/sync/batch_processor.go
google-labs-jules[bot] fa90dd79da feat: Complete large-scale refactor and prepare for production
This commit marks the completion of a major refactoring effort to stabilize the codebase, improve its structure, and prepare it for production.

The key changes include:

- **Domain Layer Consolidation:** The `Work` entity and its related types, along with all other domain entities and repository interfaces, have been consolidated into the main `internal/domain` package. This eliminates import cycles and provides a single, coherent source of truth for the domain model.

- **Data Access Layer Refactoring:** The repository implementations in `internal/data/sql` have been updated to align with the new domain layer. The `BaseRepositoryImpl` has been corrected to use pointer receivers, and all concrete repositories now correctly embed it, ensuring consistent and correct behavior.

- **Application Layer Stabilization:** All application services in `internal/app` have been updated to use the new domain types and repository interfaces. Dependency injection has been corrected throughout the application, ensuring that all services are initialized with the correct dependencies.

- **GraphQL Adapter Fixes:** The GraphQL resolver implementation in `internal/adapters/graphql` has been updated to correctly handle the new domain types and service methods. The auto-generated GraphQL code has been regenerated to ensure it is in sync with the schema and runtime.

- **Test Suite Overhaul:** All test suites have been fixed to correctly implement their respective interfaces and use the updated domain model. Mock repositories and test suites have been corrected to properly embed the `testify` base types, resolving numerous build and linter errors.

- **Dependency Management:** The Go modules have been tidied, and the module cache has been cleaned to ensure a consistent and correct dependency graph.

- **Code Quality and Verification:** The entire codebase now passes all builds, tests, and linter checks, ensuring a high level of quality and stability.

This comprehensive effort has resulted in a more robust, maintainable, and production-ready application.
2025-10-07 11:09:37 +00:00

161 lines
4.8 KiB
Go

package sync
import (
"context"
"fmt"
"gorm.io/gorm"
"log"
"strings"
"tercul/internal/platform/config"
"github.com/weaviate/weaviate-go-client/v5/weaviate"
)
// BatchProcessor reads rows from SQL tables in pages and mirrors each row into
// Weaviate as an object of the corresponding class.
type BatchProcessor struct {
	// db is the GORM handle used to count and page through source tables.
	db *gorm.DB
	// defaultBatchSize is the page size used by ProcessAllEntities and
	// reported by GetBatchSize.
	defaultBatchSize int
	// weaviateClient is the client through which objects are created.
	weaviateClient *weaviate.Client
}
// NewBatchProcessor creates a BatchProcessor that reads rows through db and
// writes objects through weaviateClient.
//
// The page size is taken from cfg.BatchSize. When cfg is nil, or the configured
// value is not positive, the package-level DefaultBatchSize is used instead.
// Tolerating a nil cfg avoids a nil-pointer panic at construction time.
func NewBatchProcessor(db *gorm.DB, cfg *config.Config, weaviateClient *weaviate.Client) *BatchProcessor {
	var batchSize int
	if cfg != nil {
		batchSize = cfg.BatchSize
	}
	if batchSize <= 0 {
		batchSize = DefaultBatchSize
	}
	return &BatchProcessor{
		db:               db,
		defaultBatchSize: batchSize,
		weaviateClient:   weaviateClient,
	}
}
// ProcessEntitiesBatch loads one page of rows from the table backing className
// and pushes them to Weaviate via CreateObjectsBatch.
//
// The table name is derived from className with toSnakeCase. Rows are ordered
// by their "id" column so that successive (offset, batchSize) pages are stable.
// Without an ORDER BY, SQL gives no ordering guarantee across separate
// LIMIT/OFFSET queries, so pages could overlap or skip rows. The "id" column is
// the same one CreateObjectsBatch uses as the object ID.
//
// A non-positive batchSize falls back to the processor's configured batch size.
// The query is bound to ctx so cancellation aborts it.
//
// It returns nil when the page is empty, a wrapped error if the query fails,
// and otherwise whatever CreateObjectsBatch returns.
func (bp *BatchProcessor) ProcessEntitiesBatch(ctx context.Context, className string, batchSize, offset int) error {
	if batchSize <= 0 {
		batchSize = bp.defaultBatchSize
	}
	log.Printf("Processing %s batch (offset %d, batch size %d)...", className, offset, batchSize)

	var results []map[string]interface{}
	tableName := toSnakeCase(className)
	err := bp.db.WithContext(ctx).
		Table(tableName).
		Order("id").
		Limit(batchSize).
		Offset(offset).
		Find(&results).Error
	if err != nil {
		return fmt.Errorf("error fetching %s batch: %w", className, err)
	}
	if len(results) == 0 {
		log.Printf("No %s records found for batch (offset %d)", className, offset)
		return nil
	}
	return bp.CreateObjectsBatch(ctx, className, results)
}
// ProcessAllEntities walks every row of the table backing className in pages
// of the processor's batch size, syncing each page with ProcessEntitiesBatch.
//
// Processing is best-effort: a failed page is logged and counted, and the walk
// continues with the next page. Failed pages do not produce a returned error.
//
// It returns an error only in these cases:
//   - the batch size is not positive, which would otherwise loop forever;
//   - the row count cannot be read;
//   - ctx is cancelled, in which case the remaining pages are skipped.
func (bp *BatchProcessor) ProcessAllEntities(ctx context.Context, className string) error {
	log.Printf("Processing all %s entities...", className)

	batchSize := bp.defaultBatchSize
	if batchSize <= 0 {
		// A zero-value BatchProcessor would never advance the offset below.
		return fmt.Errorf("invalid batch size %d for %s", batchSize, className)
	}

	var count int64
	tableName := toSnakeCase(className)
	if err := bp.db.WithContext(ctx).Table(tableName).Count(&count).Error; err != nil {
		return fmt.Errorf("error counting %s: %w", className, err)
	}
	if count == 0 {
		log.Printf("No %s entities found", className)
		return nil
	}

	failed := 0
	for offset := int64(0); offset < count; offset += int64(batchSize) {
		// Stop promptly on cancellation instead of attempting (and failing)
		// every remaining batch.
		if err := ctx.Err(); err != nil {
			return fmt.Errorf("processing %s entities interrupted at offset %d: %w", className, offset, err)
		}
		if err := bp.ProcessEntitiesBatch(ctx, className, batchSize, int(offset)); err != nil {
			failed++
			log.Printf("Error processing %s batch (offset %d): %v", className, offset, err)
			// Continue with next batch instead of failing completely
		}
	}

	if failed > 0 {
		log.Printf("Completed processing all %s entities with %d failed batch(es)", className, failed)
		return nil
	}
	log.Printf("Completed processing all %s entities", className)
	return nil
}
// GetBatchSize reports the page size this processor uses when walking a table
// in ProcessAllEntities.
func (bp *BatchProcessor) GetBatchSize() int {
	size := bp.defaultBatchSize
	return size
}
// snakeCaseSpecialCases maps entity class names to explicit table names.
//
// It is built once at package initialisation rather than on every
// toSnakeCase call. Every current entry agrees with the generic conversion in
// toSnakeCase. The table exists so that a class whose table name deviates from
// that rule can be overridden in one place.
var snakeCaseSpecialCases = map[string]string{
	"Work":              "work",
	"Translation":       "translation",
	"Author":            "author",
	"Media":             "media",
	"Category":          "category",
	"TopicCluster":      "topic_cluster",
	"LanguageAnalysis":  "language_analysis",
	"WritingStyle":      "writing_style",
	"LanguageEntity":    "language_entity",
	"LinguisticLayer":   "linguistic_layer",
	"EditorialWorkflow": "editorial_workflow",
	"WorkStats":         "work_stats",
	"TranslationStats":  "translation_stats",
	"MediaStats":        "media_stats",
	"UserStats":         "user_stats",
	"BookStats":         "book_stats",
	"CollectionStats":   "collection_stats",
	"TextMetadata":      "text_metadata",
	"PoeticAnalysis":    "poetic_analysis",
	"HybridEntityWork":  "hybrid_entity_work",
	"CopyrightClaim":    "copyright_claim",
}

// toSnakeCase converts a class name from CamelCase to snake_case.
//
// Names listed in snakeCaseSpecialCases are returned verbatim from that table.
// For any other name, an underscore is inserted before each ASCII uppercase
// letter that is not the first rune, and the result is then lower-cased.
// Consecutive capitals are therefore split individually, e.g.
// "HTTPServer" -> "h_t_t_p_server". An empty string maps to "".
func toSnakeCase(str string) string {
	if tableName, ok := snakeCaseSpecialCases[str]; ok {
		return tableName
	}

	// strings.Builder avoids the quadratic cost of repeated string +=.
	var b strings.Builder
	b.Grow(len(str) + len(str)/2)
	for i, r := range str {
		if i > 0 && 'A' <= r && r <= 'Z' {
			b.WriteByte('_')
		}
		b.WriteRune(r)
	}
	return strings.ToLower(b.String())
}
// CreateObjectsBatch writes each record in objects to Weaviate as an object of
// class className. The record's "id" value, formatted with %v, is used as the
// object ID, and the whole record is used as the object's properties.
//
// Despite the name, objects are created one at a time via createObject rather
// than through a Weaviate batch endpoint.
//
// A failure on one record does not stop the others. Every failure is logged,
// and if any occurred the returned error reports how many and wraps the first
// one, so callers can still inspect the cause with errors.Is / errors.As.
//
// Records whose "id" is missing or nil are counted as failures instead of being
// sent with the placeholder ID "<nil>".
//
// If ctx is cancelled, the remaining records are skipped and the context error
// is returned (wrapped).
func (bp *BatchProcessor) CreateObjectsBatch(ctx context.Context, className string, objects []map[string]interface{}) error {
	var (
		failed   int
		firstErr error
	)
	recordFailure := func(err error) {
		if firstErr == nil {
			firstErr = err
		}
		failed++
	}

	for _, record := range objects {
		if err := ctx.Err(); err != nil {
			return fmt.Errorf("batch sync of %s cancelled after %d errors: %w", className, failed, err)
		}

		rawID, ok := record["id"]
		if !ok || rawID == nil {
			err := fmt.Errorf("%s record has no id", className)
			log.Printf("Error syncing %s record: %v", className, err)
			recordFailure(err)
			continue
		}

		objID := fmt.Sprintf("%v", rawID)
		if err := bp.createObject(ctx, className, objID, record); err != nil {
			log.Printf("Error syncing %s ID %s: %v", className, objID, err)
			recordFailure(err)
		}
	}

	if failed > 0 {
		return fmt.Errorf("batch sync completed with %d errors (first: %w)", failed, firstErr)
	}
	return nil
}
// createObject sends a single create request to Weaviate for an object of class
// className with ID objID and the given properties. Success is logged.
// Failures are returned wrapped with the class name and ID.
//
// NOTE(review): this uses the Creator (create) call. Whether re-syncing an ID
// that already exists fails or overwrites is not visible here — confirm, since
// a repeated sync run would hit this for every object.
//
// NOTE(review): properties is the raw DB row, including its "id" column.
// Confirm that the target class accepts a property with that name.
func (bp *BatchProcessor) createObject(ctx context.Context, className, objID string, properties map[string]interface{}) error {
	creator := bp.weaviateClient.Data().Creator().
		WithClassName(className).
		WithID(objID).
		WithProperties(properties)

	if _, err := creator.Do(ctx); err != nil {
		return fmt.Errorf("failed to create %s object with ID %s: %w", className, objID, err)
	}

	log.Printf("Successfully synced %s ID %s", className, objID)
	return nil
}