mirror of
https://github.com/SamyRai/tercul-backend.git
synced 2025-12-27 04:01:34 +00:00
This commit addresses several high-priority tasks from the TASKS.md file, including: - **Fix Background Job Panic:** Replaced `log.Fatalf` with `log.Printf` in the `asynq` server to prevent crashes. - **Refactor API Server Setup:** Consolidated the GraphQL Playground and Prometheus metrics endpoints into the main API server. - **Implement `DeleteUser` Mutation:** Implemented the `DeleteUser` resolver. - **Implement `CreateContribution` Mutation:** Implemented the `CreateContribution` resolver and its required application service. Additionally, this commit includes a major refactoring of the configuration management system to fix a broken build. The global `config.Cfg` variable has been removed and replaced with a dependency injection approach, where the configuration object is passed to all components that require it. This change has been applied across the entire codebase, including the test suite, to ensure a stable and testable application.
158 lines
4.6 KiB
Go
158 lines
4.6 KiB
Go
package sync
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"gorm.io/gorm"
|
|
"log"
|
|
"strings"
|
|
"tercul/internal/platform/config"
|
|
"tercul/internal/platform/search"
|
|
)
|
|
|
|
// BatchProcessor handles batch processing of entities for sync operations
|
|
type BatchProcessor struct {
|
|
db *gorm.DB
|
|
defaultBatchSize int
|
|
}
|
|
|
|
// NewBatchProcessor creates a new BatchProcessor
|
|
func NewBatchProcessor(db *gorm.DB, cfg *config.Config) *BatchProcessor {
|
|
batchSize := cfg.BatchSize
|
|
if batchSize <= 0 {
|
|
batchSize = DefaultBatchSize
|
|
}
|
|
|
|
return &BatchProcessor{
|
|
db: db,
|
|
defaultBatchSize: batchSize,
|
|
}
|
|
}
|
|
|
|
// ProcessEntitiesBatch processes a batch of entities from a given table
|
|
func (bp *BatchProcessor) ProcessEntitiesBatch(ctx context.Context, className string, batchSize, offset int) error {
|
|
log.Printf("Processing %s batch (offset %d, batch size %d)...", className, offset, batchSize)
|
|
|
|
var results []map[string]interface{}
|
|
tableName := toSnakeCase(className)
|
|
|
|
if err := bp.db.Table(tableName).Limit(batchSize).Offset(offset).Find(&results).Error; err != nil {
|
|
return fmt.Errorf("error fetching %s batch: %w", className, err)
|
|
}
|
|
|
|
if len(results) == 0 {
|
|
log.Printf("No %s records found for batch (offset %d)", className, offset)
|
|
return nil
|
|
}
|
|
|
|
return bp.CreateObjectsBatch(ctx, className, results)
|
|
}
|
|
|
|
// ProcessAllEntities processes all entities of a given type in batches
|
|
func (bp *BatchProcessor) ProcessAllEntities(ctx context.Context, className string) error {
|
|
log.Printf("Processing all %s entities...", className)
|
|
|
|
var count int64
|
|
tableName := toSnakeCase(className)
|
|
if err := bp.db.Table(tableName).Count(&count).Error; err != nil {
|
|
return fmt.Errorf("error counting %s: %w", className, err)
|
|
}
|
|
|
|
if count == 0 {
|
|
log.Printf("No %s entities found", className)
|
|
return nil
|
|
}
|
|
|
|
for offset := 0; offset < int(count); offset += bp.defaultBatchSize {
|
|
if err := bp.ProcessEntitiesBatch(ctx, className, bp.defaultBatchSize, offset); err != nil {
|
|
log.Printf("Error processing %s batch (offset %d): %v", className, offset, err)
|
|
// Continue with next batch instead of failing completely
|
|
}
|
|
}
|
|
|
|
log.Printf("Completed processing all %s entities", className)
|
|
return nil
|
|
}
|
|
|
|
// GetBatchSize returns the configured batch size
|
|
func (bp *BatchProcessor) GetBatchSize() int {
|
|
return bp.defaultBatchSize
|
|
}
|
|
|
|
// toSnakeCase converts a class name from CamelCase to snake_case
|
|
func toSnakeCase(str string) string {
|
|
// Handle special cases for known entity types
|
|
specialCases := map[string]string{
|
|
"Work": "work",
|
|
"Translation": "translation",
|
|
"Author": "author",
|
|
"Media": "media",
|
|
"Category": "category",
|
|
"TopicCluster": "topic_cluster",
|
|
"LanguageAnalysis": "language_analysis",
|
|
"WritingStyle": "writing_style",
|
|
"LanguageEntity": "language_entity",
|
|
"LinguisticLayer": "linguistic_layer",
|
|
"EditorialWorkflow": "editorial_workflow",
|
|
"WorkStats": "work_stats",
|
|
"TranslationStats": "translation_stats",
|
|
"MediaStats": "media_stats",
|
|
"UserStats": "user_stats",
|
|
"BookStats": "book_stats",
|
|
"CollectionStats": "collection_stats",
|
|
"TextMetadata": "text_metadata",
|
|
"PoeticAnalysis": "poetic_analysis",
|
|
"HybridEntityWork": "hybrid_entity_work",
|
|
"CopyrightClaim": "copyright_claim",
|
|
}
|
|
|
|
if tableName, exists := specialCases[str]; exists {
|
|
return tableName
|
|
}
|
|
|
|
// For other cases, convert CamelCase to snake_case
|
|
var result string
|
|
for i, r := range str {
|
|
if i > 0 && 'A' <= r && r <= 'Z' {
|
|
result += "_"
|
|
}
|
|
result += string(r)
|
|
}
|
|
return strings.ToLower(result)
|
|
}
|
|
|
|
// CreateObjectsBatch creates multiple objects in Weaviate using the existing client
|
|
func (bp *BatchProcessor) CreateObjectsBatch(ctx context.Context, className string, objects []map[string]interface{}) error {
|
|
var errors []error
|
|
|
|
for _, record := range objects {
|
|
objID := fmt.Sprintf("%v", record["id"])
|
|
if err := bp.createObject(ctx, className, objID, record); err != nil {
|
|
log.Printf("Error syncing %s ID %s: %v", className, objID, err)
|
|
errors = append(errors, err)
|
|
}
|
|
}
|
|
|
|
if len(errors) > 0 {
|
|
return fmt.Errorf("batch sync completed with %d errors", len(errors))
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// createObject creates a single object in Weaviate using the existing client
|
|
func (bp *BatchProcessor) createObject(ctx context.Context, className, objID string, properties map[string]interface{}) error {
|
|
_, err := search.Client.Data().Creator().
|
|
WithClassName(className).
|
|
WithID(objID).
|
|
WithProperties(properties).
|
|
Do(ctx)
|
|
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create %s object with ID %s: %w", className, objID, err)
|
|
}
|
|
|
|
log.Printf("Successfully synced %s ID %s", className, objID)
|
|
return nil
|
|
}
|