# Sync Job Package

This package synchronizes data between the database and the Weaviate vector database using background job processing.

## Architecture Overview

The sync job package has been refactored to eliminate code duplication and improve maintainability, following the Single Responsibility Principle and the DRY principle.

### Key Components

#### 1. **Types** (`types.go`)

- Centralized type definitions and constants
- Task payload structures
- Default configuration values

#### 2. **Batch Processor** (`batch_processor.go`)

- Handles batch processing of entities for sync operations
- Uses the existing global Weaviate client (`weaviate.Client`)
- Provides consistent error handling and logging
- Supports configurable batch sizes

#### 3. **Task Handlers** (`task_handlers.go`)

- Generic payload unmarshaling using Go generics
- Simplified handler functions with reduced duplication
- Consistent error handling patterns

#### 4. **Queue Management** (`queue.go`)

- Generic task enqueueing function
- Consistent delay configuration
- Centralized logging

#### 5. **Entity Sync** (`entities_sync.go`)

- Simplified entity synchronization using the batch processor
- Removed duplicate Weaviate client creation logic
- Cleaner separation of concerns

#### 6. **Edge Sync** (`edges_sync.go`)

- Refactored to use the batch processor pattern
- Consistent with the entity sync approach
- Better error handling

## Refactoring Improvements

### Before Refactoring

- **Duplicate Weaviate client creation** in multiple files
- **Hardcoded batch sizes** scattered throughout the code
- **Repeated error handling patterns** in each sync function
- **Manual JSON unmarshaling** in each task handler
- **Duplicate task enqueueing logic** with similar patterns

### After Refactoring

- **Single Weaviate client** provided via dependency injection
- **Centralized batch processing** with configurable sizes
- **Generic payload handling** using Go generics
- **Consistent error handling** across all sync operations
- **DRY task enqueueing** with reusable functions

## Usage

### Creating a Sync Job

```go
syncJob := syncjob.NewSyncJob(db, asynqClient)
```

### Running Full Sync

```go
err := syncJob.RunFullSync(ctx)
```

### Enqueueing Individual Tasks

```go
// Enqueue entity sync
err := syncjob.EnqueueEntitySync(asynqClient, "Work")

// Enqueue edge sync (err is reused so the snippet compiles in a single scope)
err = syncjob.EnqueueEdgeSync(asynqClient, 100, 0)

// Enqueue full sync
err = syncjob.EnqueueFullSync(asynqClient)
```

### Registering Handlers

```go
syncjob.RegisterQueueHandlers(server, syncJob)
```

## Configuration

Batch sizes and delays are configurable through:

- Environment variables (via `config.Cfg.BatchSize`)
- Default constants in `types.go`
- Individual task delays for different operation types

## Dependencies

- **Database**: Uses GORM for database operations
- **Weaviate**: Uses the `WeaviateWrapper` interface, which is provided via dependency injection
- **Background Jobs**: Uses Asynq for task queue management
- **Configuration**: Uses the application's config package

## Error Handling

The refactored code provides:

- **Graceful degradation**: Continues processing other batches if one fails
- **Detailed logging**: Comprehensive error messages with context
- **Batch-level error aggregation**: Reports total errors per batch
- **Consistent error propagation**: Standardized error handling patterns