package syncjob

import (
	"context"
	"fmt"
	"log"

	"tercul/internal/models"
)

// SyncAllEdges schedules a sync of every edge by enqueueing one batch job per
// fixed-size window of rows.
//
// It counts the rows of models.Edge and then calls EnqueueEdgeSync once per
// window of 100 rows. Enqueueing is best-effort: a failure for one window is
// logged and the remaining windows are still attempted, and the function
// returns nil in that case. A non-nil error is returned only when the row count
// cannot be obtained or when ctx is done before all windows were attempted.
func (s *SyncJob) SyncAllEdges(ctx context.Context) error {
	log.Println("Enqueueing edge sync jobs...")

	var count int64
	if err := s.DB.Model(&models.Edge{}).Count(&count).Error; err != nil {
		return fmt.Errorf("error counting edges: %w", err)
	}

	// Number of edges handled by a single enqueued job.
	const batchSize = 100

	var attempted, failed int
	for offset := 0; offset < int(count); offset += batchSize {
		// Stop promptly once the caller has cancelled or the deadline passed,
		// instead of enqueueing work nobody is waiting for.
		if err := ctx.Err(); err != nil {
			return fmt.Errorf("edge sync enqueueing interrupted at offset %d: %w", offset, err)
		}

		attempted++
		if err := EnqueueEdgeSync(s.AsynqClient, batchSize, offset); err != nil {
			failed++
			log.Printf("Error enqueueing edge sync job (offset %d): %v", offset, err)
			continue
		}
		log.Printf("Enqueued edge sync job (offset %d, batch size %d)", offset, batchSize)
	}

	// Only claim success when every window was actually enqueued.
	if failed > 0 {
		log.Printf("Edge sync enqueueing finished: %d of %d batch jobs could not be enqueued.", failed, attempted)
		return nil
	}

	log.Println("Edge sync jobs enqueued successfully.")
	return nil
}

// SyncEdgesBatch syncs one window of edges.
//
// It loads up to batchSize edges starting at offset, converts each one to a
// generic map, and hands the maps to a BatchProcessor under the "Edge" class
// name. It returns an error if the rows cannot be fetched, otherwise whatever
// CreateObjectsBatch returns.
func (s *SyncJob) SyncEdgesBatch(ctx context.Context, batchSize, offset int) error {
	log.Printf("Syncing edges batch (offset %d, batch size %d)...", offset, batchSize)

	var edges []models.Edge
	// LIMIT/OFFSET without ORDER BY has no guaranteed row order, so windows
	// fetched by separate jobs could overlap or miss rows. A stable ordering
	// makes the windows disjoint and exhaustive.
	// NOTE(review): assumes the primary-key column is named "id" (GORM's
	// default mapping for an ID field) - confirm against models.Edge.
	if err := s.DB.Order("id").Limit(batchSize).Offset(offset).Find(&edges).Error; err != nil {
		return fmt.Errorf("error fetching edges batch: %w", err)
	}

	// Convert edges to map format for batch processing.
	edgeMaps := make([]map[string]interface{}, 0, len(edges))
	for _, edge := range edges {
		edgeMaps = append(edgeMaps, map[string]interface{}{
			"id":          edge.ID,
			"sourceTable": edge.SourceTable,
			"sourceID":    edge.SourceID,
			"targetTable": edge.TargetTable,
			"targetID":    edge.TargetID,
			"relation":    edge.Relation,
			"language":    edge.Language,
			"extra":       edge.Extra,
		})
	}

	batchProcessor := NewBatchProcessor(s.DB)
	return batchProcessor.CreateObjectsBatch(ctx, "Edge", edgeMaps)
}