tercul-backend/internal/jobs/sync
2025-09-02 15:02:04 +00:00
Files: .keep, batch_processor.go, edges_sync.go, entities_sync.go, queue.go, README.md, syncjob.go, task_handlers.go, types.go

Last commit (all files, 2025-09-02 15:02:04 +00:00): I have refactored the background jobs by moving all related logic from the syncjob/, linguistics/, and internal/enrich directories into the new internal/jobs/sync and internal/jobs/linguistics packages. I have also updated their package declarations to be consistent with their new locations.

Sync Job Package

This package synchronizes data between the application database and the Weaviate vector database using background job processing.

Architecture Overview

The sync job package has been refactored to eliminate code duplication and improve maintainability, following the Single Responsibility and DRY (Don't Repeat Yourself) principles.

Key Components

1. Types (types.go)

  • Centralized type definitions and constants
  • Task payload structures
  • Default configuration values

2. Batch Processor (batch_processor.go)

  • Handles batch processing of entities for sync operations
  • Uses the existing global Weaviate client (weaviate.Client)
  • Provides consistent error handling and logging
  • Supports configurable batch sizes
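As a rough illustration of the batching idea described above, the sketch below splits a slice into fixed-size batches and hands each batch to a callback. The names processInBatches and defaultBatchSize are hypothetical and are not taken from batch_processor.go; the real processor writes to Weaviate, which is replaced here by a plain function.

```go
package main

import "fmt"

// defaultBatchSize is a hypothetical fallback; the real default lives in types.go.
const defaultBatchSize = 100

// processInBatches splits items into batches of at most batchSize elements
// and calls handle once per batch. It returns the number of batches that failed.
func processInBatches[T any](items []T, batchSize int, handle func(batch []T) error) int {
	if batchSize <= 0 {
		batchSize = defaultBatchSize
	}
	failed := 0
	for start := 0; start < len(items); start += batchSize {
		end := start + batchSize
		if end > len(items) {
			end = len(items)
		}
		if err := handle(items[start:end]); err != nil {
			// Log and keep going so one bad batch does not stop the sync.
			fmt.Printf("batch %d-%d failed: %v\n", start, end, err)
			failed++
		}
	}
	return failed
}

func main() {
	ids := []int{1, 2, 3, 4, 5, 6, 7}
	failed := processInBatches(ids, 3, func(batch []int) error {
		fmt.Println("syncing batch:", batch)
		return nil
	})
	fmt.Println("failed batches:", failed)
}
```

Passing the batch size as a parameter keeps the loop independent of where the value comes from (configuration or a default constant).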

3. Task Handlers (task_handlers.go)

  • Generic payload unmarshaling using Go generics
  • Simplified handler functions with reduced duplication
  • Consistent error handling patterns
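Generic payload unmarshaling might look roughly like the following. This is a minimal sketch under stated assumptions: the helper name unmarshalPayload, the struct entitySyncPayload, and its class_name field are all hypothetical, and the actual payload types are defined in types.go.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// entitySyncPayload is a hypothetical payload shape; the real structs are in types.go.
type entitySyncPayload struct {
	ClassName string `json:"class_name"`
}

// unmarshalPayload decodes raw task bytes into any payload type T,
// so each handler does not need its own json.Unmarshal boilerplate.
func unmarshalPayload[T any](data []byte) (T, error) {
	var payload T
	if err := json.Unmarshal(data, &payload); err != nil {
		return payload, fmt.Errorf("unmarshal task payload: %w", err)
	}
	return payload, nil
}

func main() {
	p, err := unmarshalPayload[entitySyncPayload]([]byte(`{"class_name":"Work"}`))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("class:", p.ClassName)
}
```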

4. Queue Management (queue.go)

  • Generic task enqueueing function
  • Consistent delay configuration
  • Centralized logging
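The idea of a single generic enqueue function can be sketched as follows. To keep the example runnable without a broker, the queue client is modeled by a small local interface, taskEnqueuer, which is a stand-in and not Asynq's API. The function name enqueueTask and the task type string "sync:entity" are likewise assumptions.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// taskEnqueuer is a stand-in for the queue client. It is NOT Asynq's API;
// it only models "submit a named task with a payload after a delay".
type taskEnqueuer interface {
	Enqueue(taskType string, payload []byte, delay time.Duration) error
}

// enqueueTask marshals any payload and submits it with a uniform delay and log line.
func enqueueTask[T any](q taskEnqueuer, taskType string, payload T, delay time.Duration) error {
	data, err := json.Marshal(payload)
	if err != nil {
		return fmt.Errorf("marshal %s payload: %w", taskType, err)
	}
	if err := q.Enqueue(taskType, data, delay); err != nil {
		return fmt.Errorf("enqueue %s: %w", taskType, err)
	}
	fmt.Printf("enqueued %s (delay %s)\n", taskType, delay)
	return nil
}

// memoryQueue records tasks in memory so the sketch runs without a broker.
type memoryQueue struct{ tasks []string }

func (m *memoryQueue) Enqueue(taskType string, payload []byte, delay time.Duration) error {
	m.tasks = append(m.tasks, taskType+" "+string(payload))
	return nil
}

func main() {
	q := &memoryQueue{}
	payload := map[string]string{"class_name": "Work"}
	if err := enqueueTask(q, "sync:entity", payload, 2*time.Second); err != nil {
		fmt.Println("error:", err)
	}
	fmt.Println(q.tasks)
}
```

Routing every task through one function gives a single place for marshaling, delay handling, and logging, which is the duplication the refactoring set out to remove.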

5. Entity Sync (entities_sync.go)

  • Simplified entity synchronization using the batch processor
  • Removed duplicate Weaviate client creation logic
  • Cleaner separation of concerns

6. Edge Sync (edges_sync.go)

  • Refactored to use the batch processor pattern
  • Consistent with entity sync approach
  • Better error handling

Refactoring Improvements

Before Refactoring

  • Duplicate Weaviate client creation in multiple files
  • Hardcoded batch sizes scattered throughout the code
  • Repeated error handling patterns in each sync function
  • Manual JSON unmarshaling in each task handler
  • Duplicate task enqueueing logic with similar patterns

After Refactoring

  • Single Weaviate client using the existing global weaviate.Client
  • Centralized batch processing with configurable sizes
  • Generic payload handling using Go generics
  • Consistent error handling across all sync operations
  • DRY task enqueueing with reusable functions

Usage

Creating a Sync Job

syncJob := syncjob.NewSyncJob(db, asynqClient)

Running Full Sync

err := syncJob.RunFullSync(ctx)

Enqueueing Individual Tasks

// Enqueue entity sync
err := syncjob.EnqueueEntitySync(asynqClient, "Work")

// Enqueue edge sync (err is reused; a second := in the same scope would not compile)
err = syncjob.EnqueueEdgeSync(asynqClient, 100, 0)

// Enqueue full sync
err = syncjob.EnqueueFullSync(asynqClient)

Registering Handlers

syncjob.RegisterQueueHandlers(server, syncJob)

Configuration

Batch sizes and delays are configurable through:

  • Environment variables (via config.Cfg.BatchSize)
  • Default constants in types.go
  • Individual task delays for different operation types
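One way the fallback from an environment setting to a default constant could work is sketched below. The variable name BATCH_SIZE, the value 100, and the helper resolveBatchSize are assumptions made for illustration; the package itself reads the setting through config.Cfg.BatchSize.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// defaultBatchSize is a hypothetical value standing in for the constant in types.go.
const defaultBatchSize = 100

// resolveBatchSize parses a raw setting and falls back to the default
// when the value is empty, not a number, or not positive.
func resolveBatchSize(raw string) int {
	n, err := strconv.Atoi(raw)
	if err != nil || n <= 0 {
		return defaultBatchSize
	}
	return n
}

func main() {
	// BATCH_SIZE is an assumed variable name, not one confirmed by the package.
	size := resolveBatchSize(os.Getenv("BATCH_SIZE"))
	fmt.Println("batch size:", size)
}
```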

Dependencies

  • Database: Uses GORM for database operations
  • Weaviate: Uses the global weaviate.Client singleton
  • Background Jobs: Uses Asynq for task queue management
  • Configuration: Uses the application's config package

Error Handling

The refactored code provides:

  • Graceful degradation: Continues processing other batches if one fails
  • Detailed logging: Comprehensive error messages with context
  • Batch-level error aggregation: Reports total errors per batch
  • Consistent error propagation: Standardized error handling patterns
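The graceful-degradation and aggregation behavior listed above can be approximated with the standard library's errors.Join (Go 1.20 or later). This is a sketch, not the package's code: syncBatches and errUnavailable are hypothetical names, and the per-batch sync is a plain callback rather than a Weaviate call.

```go
package main

import (
	"errors"
	"fmt"
)

// errUnavailable is a hypothetical failure used only for the demonstration.
var errUnavailable = errors.New("vector store unavailable")

// syncBatches runs every batch, keeps going after failures, and returns
// one combined error (nil when all batches succeed).
func syncBatches(batches [][]string, syncOne func(batch []string) error) error {
	var errs []error
	for i, batch := range batches {
		if err := syncOne(batch); err != nil {
			// Wrap with batch context so the aggregated message stays traceable.
			errs = append(errs, fmt.Errorf("batch %d (%d items): %w", i, len(batch), err))
		}
	}
	if len(errs) > 0 {
		fmt.Printf("%d of %d batches failed\n", len(errs), len(batches))
	}
	return errors.Join(errs...)
}

func main() {
	batches := [][]string{{"a", "b"}, {"c"}, {"d", "e"}}
	err := syncBatches(batches, func(batch []string) error {
		if len(batch) == 1 {
			return errUnavailable
		}
		return nil
	})
	fmt.Println(err)
	fmt.Println(errors.Is(err, errUnavailable))
}
```

Because each failure is wrapped with %w before joining, callers can still test the combined error with errors.Is.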