package linguistics

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"time"

	"github.com/google/uuid"
	"github.com/hibiken/asynq"
	"gorm.io/gorm"

	"tercul/internal/domain"
)

const (
	// TaskLinguisticAnalysis is the asynq task type name used for the
	// linguistic analysis of a single work.
	TaskLinguisticAnalysis = "analysis:linguistic"
)

// LinguisticSyncJob manages the linguistic analysis sync process.
type LinguisticSyncJob struct {
	DB       *gorm.DB      // used to list work IDs and to look up existing analyses
	Analyzer Analyzer      // performs the analysis of a single work
	Client   *asynq.Client // used to enqueue analysis tasks
}

// NewLinguisticSyncJob creates a new LinguisticSyncJob from its dependencies.
func NewLinguisticSyncJob(db *gorm.DB, analyzer Analyzer, client *asynq.Client) *LinguisticSyncJob {
	return &LinguisticSyncJob{
		DB:       db,
		Analyzer: analyzer,
		Client:   client,
	}
}

// AnalysisPayload contains the data carried by a linguistic analysis task.
type AnalysisPayload struct {
	WorkID uuid.UUID `json:"work_id"`
}

// EnqueueAnalysisForWork enqueues a linguistic analysis task for a specific
// work. The task is scheduled to be processed five seconds after enqueueing.
//
// It returns an error if the payload cannot be marshaled or the task cannot
// be enqueued; the underlying error is wrapped and remains reachable through
// errors.Is / errors.As.
func EnqueueAnalysisForWork(client *asynq.Client, workID uuid.UUID) error {
	data, err := json.Marshal(AnalysisPayload{WorkID: workID})
	if err != nil {
		return fmt.Errorf("marshaling analysis payload: %w", err)
	}

	task := asynq.NewTask(TaskLinguisticAnalysis, data)
	if _, err := client.Enqueue(task, asynq.ProcessIn(5*time.Second)); err != nil {
		return fmt.Errorf("enqueueing analysis task: %w", err)
	}

	// uuid.UUID is a byte array: %d would print sixteen decimal numbers, so
	// format it with %s to get the canonical textual form.
	log.Printf("Enqueued linguistic analysis task for work ID %s", workID)
	return nil
}

// EnqueueAnalysisForAllWorks enqueues linguistic analysis tasks for all works.
//
// Enqueueing is best-effort: a failure for one work is logged and does not
// stop the remaining works from being enqueued. An error is returned only
// when the list of work IDs cannot be fetched.
func (j *LinguisticSyncJob) EnqueueAnalysisForAllWorks() error {
	log.Println("Enqueueing linguistic analysis jobs for all works...")

	var workIDs []uuid.UUID
	if err := j.DB.Model(&domain.Work{}).Pluck("id", &workIDs).Error; err != nil {
		return fmt.Errorf("error fetching work IDs: %w", err)
	}

	failed := 0
	for _, workID := range workIDs {
		// Success is already logged by EnqueueAnalysisForWork itself, so only
		// failures are reported here.
		if err := EnqueueAnalysisForWork(j.Client, workID); err != nil {
			log.Printf("Error enqueueing linguistic analysis for work ID %s: %v", workID, err)
			failed++
		}
	}

	if failed > 0 {
		// Do not claim success when some works could not be enqueued.
		log.Printf("Linguistic analysis enqueue finished: %d of %d works failed to enqueue.", failed, len(workIDs))
		return nil
	}

	log.Println("Linguistic analysis jobs enqueued successfully.")
	return nil
}
task func (j *LinguisticSyncJob) HandleLinguisticAnalysis(ctx context.Context, t *asynq.Task) error { var payload AnalysisPayload if err := json.Unmarshal(t.Payload(), &payload); err != nil { return fmt.Errorf("failed to unmarshal linguistic analysis payload: %v", err) } log.Printf("Processing linguistic analysis for work ID %d", payload.WorkID) // Check if analysis already exists var count int64 if err := j.DB.Model(&domain.LanguageAnalysis{}).Where("work_id = ?", payload.WorkID).Count(&count).Error; err != nil { return fmt.Errorf("error checking existing analysis: %w", err) } // Skip if analysis already exists if count > 0 { log.Printf("Linguistic analysis already exists for work ID %d, skipping", payload.WorkID) return nil } // Perform the analysis if err := j.Analyzer.AnalyzeWork(ctx, payload.WorkID); err != nil { return fmt.Errorf("error analyzing work ID %d: %w", payload.WorkID, err) } log.Printf("Completed linguistic analysis for work ID %d", payload.WorkID) return nil } // RegisterLinguisticHandlers registers the linguistic analysis task handlers func RegisterLinguisticHandlers(mux *asynq.ServeMux, job *LinguisticSyncJob) { mux.HandleFunc(TaskLinguisticAnalysis, job.HandleLinguisticAnalysis) }