package commands

import (
	"context"
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
	"strconv"
	"time"

	"github.com/blevesearch/bleve/v2"
	"github.com/spf13/cobra"

	"tercul/internal/data/sql"
	"tercul/internal/domain"
	"tercul/internal/platform/config"
	"tercul/internal/platform/db"
	"tercul/internal/platform/log"
)

const (
	// defaultBatchSize is the number of translations indexed per Bleve batch
	// unless overridden with --batch.
	defaultBatchSize = 50000

	// checkpointFile stores migration progress in the working directory so an
	// interrupted run can be continued with --resume.
	checkpointFile = ".bleve_migration_checkpoint"
)

// checkpoint records how far a previous migration run progressed.
type checkpoint struct {
	LastProcessedID uint      `json:"last_processed_id"`
	TotalProcessed  int       `json:"total_processed"`
	LastUpdated     time.Time `json:"last_updated"`
}

// NewBleveMigrateCommand creates a new Cobra command for Bleve migration.
func NewBleveMigrateCommand() *cobra.Command {
	var (
		indexPath string
		batchSize int
		resume    bool
		verify    bool
	)

	cmd := &cobra.Command{
		Use:   "bleve-migrate",
		Short: "Migrate translations from PostgreSQL to Bleve index",
		Long: `Migrate all translations from PostgreSQL database to a Bleve search index.

This command:
- Fetches all translations from the database
- Indexes them in batches for efficient processing
- Supports resuming from checkpoints
- Provides progress tracking
- Can verify indexed data after migration

Example:
  tercul bleve-migrate --index ./data/bleve_index --batch 50000 --verify`,
		RunE: func(cmd *cobra.Command, args []string) error {
			if indexPath == "" {
				return fmt.Errorf("index path is required (use --index flag)")
			}

			// Initialize logger.
			// NOTE(review): environment is hard-coded to "development" — confirm
			// this is intentional for a one-off migration tool.
			log.Init("bleve-migrate", "development")
			logger := log.FromContext(context.Background())

			// Load configuration.
			cfg, err := config.LoadConfig()
			if err != nil {
				return fmt.Errorf("failed to load config: %w", err)
			}

			// Initialize database; closed via defer below.
			database, err := db.InitDB(cfg, nil)
			if err != nil {
				return fmt.Errorf("failed to initialize database: %w", err)
			}
			defer func() {
				if err := db.Close(database); err != nil {
					logger.Error(err, "Error closing database")
				}
			}()

			// Create repositories.
			repos := sql.NewRepositories(database, cfg)

			// Initialize or open the Bleve index; closed via defer below
			// (defers run LIFO, so the index closes before the database).
			logger.Info(fmt.Sprintf("Initializing Bleve index at %s", indexPath))
			index, err := initBleveIndex(indexPath)
			if err != nil {
				return fmt.Errorf("failed to initialize Bleve index: %w", err)
			}
			defer func() {
				if err := index.Close(); err != nil {
					logger.Error(err, "Error closing Bleve index")
				}
			}()

			// Load checkpoint if resuming. A missing/unreadable checkpoint
			// simply means "start from the beginning".
			var cp *checkpoint
			if resume {
				cp = loadCheckpoint()
				if cp != nil {
					logger.Info(fmt.Sprintf("Resuming from checkpoint: last_id=%d, total_processed=%d",
						cp.LastProcessedID, cp.TotalProcessed))
				}
			}

			// Run migration.
			ctx := context.Background()
			stats, err := migrateTranslations(ctx, repos.Translation, index, batchSize, cp, logger, ctx)
			if err != nil {
				return fmt.Errorf("migration failed: %w", err)
			}

			logger.Info(fmt.Sprintf("Migration completed: indexed=%d, errors=%d, duration=%v",
				stats.TotalIndexed, stats.TotalErrors, stats.Duration))

			// Verify if requested.
			if verify {
				logger.Info("Verifying indexed translations")
				if err := verifyIndex(index, repos.Translation, logger, ctx); err != nil {
					return fmt.Errorf("verification failed: %w", err)
				}
				logger.Info("Verification completed successfully")
			}

			// Clean up the checkpoint file; a missing file is not an error.
			if err := os.Remove(checkpointFile); err != nil && !os.IsNotExist(err) {
				logger.Warn(fmt.Sprintf("Failed to remove checkpoint file: %v", err))
			}

			return nil
		},
	}

	// Add flags.
	cmd.Flags().StringVarP(&indexPath, "index", "i", "", "Path to Bleve index directory (required)")
	cmd.Flags().IntVarP(&batchSize, "batch", "b", defaultBatchSize, "Batch size for processing translations")
	cmd.Flags().BoolVarP(&resume, "resume", "r", false, "Resume from last checkpoint")
	cmd.Flags().BoolVarP(&verify, "verify", "v", false, "Verify indexed translations after migration")

	// Mark index as required.
	_ = cmd.MarkFlagRequired("index")

	return cmd
}

// initBleveIndex opens the Bleve index at indexPath, creating it (and its
// parent directory) with the translation document mapping if it does not
// already exist.
//
// Note: bleve.Open is tried first; any open error (including a corrupt
// existing index) falls through to creation, which will then fail if the path
// already exists.
func initBleveIndex(indexPath string) (bleve.Index, error) {
	// Fast path: the index already exists.
	index, err := bleve.Open(indexPath)
	if err == nil {
		return index, nil
	}

	// Index doesn't exist, create it with an explicit mapping for the
	// "translation" document type.
	indexMapping := bleve.NewIndexMapping()
	translationMapping := bleve.NewDocumentMapping()

	// Exact-match fields: stored, indexed, keyword-analyzed (not tokenized),
	// suitable for lookups and filtering.
	for _, field := range []string{"id", "language", "status", "translatable_type"} {
		fm := bleve.NewTextFieldMapping()
		fm.Store = true
		fm.Index = true
		fm.Analyzer = "keyword"
		translationMapping.AddFieldMappingsAt(field, fm)
	}

	// Full-text fields: stored, indexed, standard-analyzed for search.
	for _, field := range []string{"title", "content", "description"} {
		fm := bleve.NewTextFieldMapping()
		fm.Store = true
		fm.Index = true
		fm.Analyzer = "standard"
		translationMapping.AddFieldMappingsAt(field, fm)
	}

	// Numeric reference fields: stored and indexed.
	for _, field := range []string{"translatable_id", "translator_id"} {
		fm := bleve.NewNumericFieldMapping()
		fm.Store = true
		fm.Index = true
		translationMapping.AddFieldMappingsAt(field, fm)
	}

	// Register the mapping under the "translation" type. Documents must carry
	// a "_type" field (Bleve's default type field) with this value to be
	// classified under this mapping — indexBatch sets it.
	indexMapping.AddDocumentMapping("translation", translationMapping)

	// Create the index's parent directory if it doesn't exist.
	if err := os.MkdirAll(filepath.Dir(indexPath), 0755); err != nil {
		return nil, fmt.Errorf("failed to create index directory: %w", err)
	}

	// Create the index.
	index, err = bleve.New(indexPath, indexMapping)
	if err != nil {
		return nil, fmt.Errorf("failed to create Bleve index: %w", err)
	}

	return index, nil
}

// migrationStats summarizes the outcome of a migration run.
type migrationStats struct {
	TotalIndexed int
	TotalErrors  int
	Duration     time.Duration
}

// migrateTranslations migrates all translations from PostgreSQL to the Bleve
// index in batches of batchSize, saving a checkpoint after every batch so an
// interrupted run can resume.
//
// When cp is non-nil, translations with IDs at or below cp.LastProcessedID are
// skipped and cp.TotalProcessed is carried into the returned stats.
// A batch-level indexing failure is logged and counted in TotalErrors, and the
// migration continues with the next batch.
//
// ctxForLog is unused; it is retained only to keep the existing call signature
// stable (the caller currently passes ctx twice).
func migrateTranslations(
	ctx context.Context,
	repo domain.TranslationRepository,
	index bleve.Index,
	batchSize int,
	cp *checkpoint,
	logger *log.Logger,
	ctxForLog context.Context,
) (*migrationStats, error) {
	startTime := time.Now()
	stats := &migrationStats{}

	// Guard against a non-positive batch size (e.g. --batch 0), which would
	// otherwise degrade to one Bleve batch per document.
	if batchSize <= 0 {
		batchSize = defaultBatchSize
	}

	// Fetch all translations up front.
	logger.Info("Fetching all translations from database")
	translations, err := repo.ListAll(ctx)
	if err != nil {
		return nil, fmt.Errorf("failed to fetch translations: %w", err)
	}

	// totalTranslations is the full database count, captured before any
	// checkpoint filtering so resumed-run progress reporting stays consistent.
	totalTranslations := len(translations)
	logger.Info(fmt.Sprintf("Found %d translations", totalTranslations))

	// Skip translations already handled by a previous run.
	if cp != nil && cp.LastProcessedID > 0 {
		filtered := make([]domain.Translation, 0, len(translations))
		for _, t := range translations {
			if t.ID > cp.LastProcessedID {
				filtered = append(filtered, t)
			}
		}
		translations = filtered
		stats.TotalIndexed = cp.TotalProcessed
		logger.Info(fmt.Sprintf("Filtered translations: remaining=%d, already_processed=%d",
			len(translations), cp.TotalProcessed))
	}

	// Process translations in batches.
	batch := make([]domain.Translation, 0, batchSize)
	lastProcessedID := uint(0)

	for i, translation := range translations {
		batch = append(batch, translation)
		lastProcessedID = translation.ID

		// Flush when the batch is full or we are at the final translation.
		if len(batch) >= batchSize || i == len(translations)-1 {
			if err := indexBatch(index, batch, logger); err != nil {
				logger.Error(err, fmt.Sprintf("Failed to index batch of size %d", len(batch)))
				stats.TotalErrors += len(batch)
				// Continue with the next batch instead of failing completely.
			} else {
				stats.TotalIndexed += len(batch)
			}

			// Save checkpoint. NOTE(review): LastProcessedID advances even when
			// the batch failed, so a resume will not retry failed documents —
			// confirm this best-effort behavior is acceptable.
			cpData := checkpoint{
				LastProcessedID: lastProcessedID,
				TotalProcessed:  stats.TotalIndexed,
				LastUpdated:     time.Now(),
			}
			if err := saveCheckpoint(&cpData); err != nil {
				logger.Warn(fmt.Sprintf("Failed to save checkpoint: %v", err))
			}

			// Log progress against the full database count.
			progress := float64(stats.TotalIndexed) / float64(totalTranslations) * 100
			logger.Info(fmt.Sprintf("Migration progress: %d/%d (%.2f%%)",
				stats.TotalIndexed, totalTranslations, progress))

			// Clear the batch, keeping its backing array.
			batch = batch[:0]
		}
	}

	stats.Duration = time.Since(startTime)
	return stats, nil
}

// indexBatch indexes a batch of translations in a single Bleve batch
// operation. The logger parameter is currently unused but retained for
// signature stability.
func indexBatch(index bleve.Index, translations []domain.Translation, logger *log.Logger) error {
	batch := index.NewBatch()

	for _, t := range translations {
		doc := map[string]interface{}{
			// "_type" is Bleve's default type field. Without it, map documents
			// are classified under the default mapping and the "translation"
			// document mapping registered in initBleveIndex never applies.
			"_type":             "translation",
			"id":                strconv.FormatUint(uint64(t.ID), 10),
			"title":             t.Title,
			"content":           t.Content,
			"description":       t.Description,
			"language":          t.Language,
			"status":            string(t.Status),
			"translatable_id":   t.TranslatableID,
			"translatable_type": t.TranslatableType,
		}
		// TranslatorID is optional; only index it when present.
		if t.TranslatorID != nil {
			doc["translator_id"] = *t.TranslatorID
		}

		docID := fmt.Sprintf("translation_%d", t.ID)
		if err := batch.Index(docID, doc); err != nil {
			return fmt.Errorf("failed to add document to batch: %w", err)
		}
	}

	if err := index.Batch(batch); err != nil {
		return fmt.Errorf("failed to index batch: %w", err)
	}

	return nil
}

// verifyIndex verifies that every translation in the database has a
// corresponding document in the Bleve index, returning an error with the
// count of missing documents if any are absent.
func verifyIndex(index bleve.Index, repo domain.TranslationRepository, logger *log.Logger, ctx context.Context) error {
	// Fetch all translations.
	translations, err := repo.ListAll(ctx)
	if err != nil {
		return fmt.Errorf("failed to fetch translations: %w", err)
	}

	logger.Info(fmt.Sprintf("Verifying %d indexed translations", len(translations)))

	missing := 0
	for _, t := range translations {
		docID := fmt.Sprintf("translation_%d", t.ID)
		doc, err := index.Document(docID)
		if err != nil {
			logger.Warn(fmt.Sprintf("Translation %d not found in index: %v", t.ID, err))
			missing++
			continue
		}
		if doc == nil {
			logger.Warn(fmt.Sprintf("Translation %d not found in index (nil document)", t.ID))
			missing++
			continue
		}
	}

	if missing > 0 {
		return fmt.Errorf("verification failed: %d translations missing from index", missing)
	}

	logger.Info("All translations verified in index")
	return nil
}

// saveCheckpoint saves the migration checkpoint to checkpointFile as JSON.
func saveCheckpoint(cp *checkpoint) error {
	data, err := json.Marshal(cp)
	if err != nil {
		return fmt.Errorf("failed to marshal checkpoint: %w", err)
	}
	if err := os.WriteFile(checkpointFile, data, 0644); err != nil {
		return fmt.Errorf("failed to write checkpoint file: %w", err)
	}
	return nil
}

// loadCheckpoint loads the migration checkpoint from checkpointFile.
// A missing or unparsable file yields nil, which callers treat as
// "no checkpoint; start from the beginning".
func loadCheckpoint() *checkpoint {
	data, err := os.ReadFile(checkpointFile)
	if err != nil {
		return nil
	}
	var cp checkpoint
	if err := json.Unmarshal(data, &cp); err != nil {
		return nil
	}
	return &cp
}