mirror of
https://github.com/SamyRai/tercul-backend.git
synced 2025-12-27 05:11:34 +00:00
3.3 KiB
3.3 KiB
Sync Job Package
This package handles data synchronization between the database and Weaviate vector database using background job processing.
Architecture Overview
The sync job package has been refactored to eliminate code duplication and improve maintainability by following the Single Responsibility Principle and DRY principles.
Key Components
1. Types (types.go)
- Centralized type definitions and constants
- Task payload structures
- Default configuration values
2. Batch Processor (batch_processor.go)
- Handles batch processing of entities for sync operations
- Uses the existing global Weaviate client (
weaviate.Client) - Provides consistent error handling and logging
- Supports configurable batch sizes
3. Task Handlers (task_handlers.go)
- Generic payload unmarshaling using Go generics
- Simplified handler functions with reduced duplication
- Consistent error handling patterns
4. Queue Management (queue.go)
- Generic task enqueueing function
- Consistent delay configuration
- Centralized logging
5. Entity Sync (entities_sync.go)
- Simplified entity synchronization using the batch processor
- Removed duplicate Weaviate client creation logic
- Cleaner separation of concerns
6. Edge Sync (edges_sync.go)
- Refactored to use the batch processor pattern
- Consistent with entity sync approach
- Better error handling
Refactoring Improvements
Before Refactoring
- Duplicate Weaviate client creation in multiple files
- Hardcoded batch sizes scattered throughout the code
- Repeated error handling patterns in each sync function
- Manual JSON unmarshaling in each task handler
- Duplicate task enqueueing logic with similar patterns
After Refactoring
- Single Weaviate client provided via dependency injection
- Centralized batch processing with configurable sizes
- Generic payload handling using Go generics
- Consistent error handling across all sync operations
- DRY task enqueueing with reusable functions
Usage
Creating a Sync Job
syncJob := syncjob.NewSyncJob(db, asynqClient)
Running Full Sync
err := syncJob.RunFullSync(ctx)
Enqueueing Individual Tasks
// Enqueue entity sync
err := syncjob.EnqueueEntitySync(asynqClient, "Work")
// Enqueue edge sync
err := syncjob.EnqueueEdgeSync(asynqClient, 100, 0)
// Enqueue full sync
err := syncjob.EnqueueFullSync(asynqClient)
Registering Handlers
syncjob.RegisterQueueHandlers(server, syncJob)
Configuration
Batch sizes and delays are configurable through:
- Environment variables (via
config.Cfg.BatchSize) - Default constants in
types.go - Individual task delays for different operation types
Dependencies
- Database: Uses GORM for database operations
- Weaviate: Uses the
WeaviateWrapperinterface, which is provided via dependency injection. - Background Jobs: Uses Asynq for task queue management
- Configuration: Uses the application's config package
Error Handling
The refactored code provides:
- Graceful degradation: Continues processing other batches if one fails
- Detailed logging: Comprehensive error messages with context
- Batch-level error aggregation: Reports total errors per batch
- Consistent error propagation: Standardized error handling patterns