mirror of
https://github.com/SamyRai/tercul-backend.git
synced 2025-12-27 00:31:35 +00:00
* docs: Update TASKS.md and PRODUCTION-TASKS.md to reflect current codebase state (December 2024 audit) * refactor: Unify all commands into a single Cobra CLI - Refactor cmd/api/main.go into 'tercul serve' command - Refactor cmd/worker/main.go into 'tercul worker' command - Refactor cmd/tools/enrich/main.go into 'tercul enrich' command - Add 'tercul bleve-migrate' command for Bleve index migration - Extract common initialization logic into cmd/cli/internal/bootstrap - Update Dockerfile to build unified CLI - Update README with new CLI usage This consolidates all entry points into a single, maintainable CLI structure. * fix: Fix CodeQL workflow and add comprehensive test coverage - Fix Go version mismatch by setting up Go before CodeQL init - Add Go version verification step - Improve error handling for code scanning upload - Add comprehensive test suite for CLI commands: - Bleve migration tests with in-memory indexes - Edge case tests (empty data, large batches, errors) - Command-level integration tests - Bootstrap initialization tests - Optimize tests to use in-memory Bleve indexes for speed - Add test tags for skipping slow tests in short mode - Update workflow documentation Test coverage: 18.1% with 806 lines of test code All tests passing in short mode * fix: Fix test workflow and Bleve test double-close panic - Add POSTGRES_USER to PostgreSQL service configuration in test workflow - Fix TestInitBleveIndex double-close panic by removing defer before explicit close - Test now passes successfully Fixes failing Unit Tests workflow in PR #64
416 lines
12 KiB
Go
416 lines
12 KiB
Go
package commands
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"os"
|
|
"path/filepath"
|
|
"strconv"
|
|
"time"
|
|
|
|
"tercul/internal/data/sql"
|
|
"tercul/internal/domain"
|
|
"tercul/internal/platform/config"
|
|
"tercul/internal/platform/db"
|
|
"tercul/internal/platform/log"
|
|
|
|
"github.com/blevesearch/bleve/v2"
|
|
"github.com/spf13/cobra"
|
|
)
|
|
|
|
const (
	// defaultBatchSize is the default number of translations indexed per
	// Bleve batch; overridable at runtime via the --batch flag.
	defaultBatchSize = 50000
	// checkpointFile is where migration progress is persisted so an
	// interrupted run can be continued with --resume.
	checkpointFile = ".bleve_migration_checkpoint"
)
|
|
|
|
// checkpoint records migration progress so an interrupted run can resume
// without re-processing translations that were already handled.
type checkpoint struct {
	// LastProcessedID is the highest translation ID handled so far.
	LastProcessedID uint `json:"last_processed_id"`
	// TotalProcessed is the running count of translations indexed so far.
	TotalProcessed int `json:"total_processed"`
	// LastUpdated is when this checkpoint was last written.
	LastUpdated time.Time `json:"last_updated"`
}
|
|
|
|
// NewBleveMigrateCommand creates a new Cobra command for Bleve migration
|
|
func NewBleveMigrateCommand() *cobra.Command {
|
|
var (
|
|
indexPath string
|
|
batchSize int
|
|
resume bool
|
|
verify bool
|
|
)
|
|
|
|
cmd := &cobra.Command{
|
|
Use: "bleve-migrate",
|
|
Short: "Migrate translations from PostgreSQL to Bleve index",
|
|
Long: `Migrate all translations from PostgreSQL database to a Bleve search index.
|
|
|
|
This command:
|
|
- Fetches all translations from the database
|
|
- Indexes them in batches for efficient processing
|
|
- Supports resuming from checkpoints
|
|
- Provides progress tracking
|
|
- Can verify indexed data after migration
|
|
|
|
Example:
|
|
tercul bleve-migrate --index ./data/bleve_index --batch 50000 --verify`,
|
|
RunE: func(cmd *cobra.Command, args []string) error {
|
|
if indexPath == "" {
|
|
return fmt.Errorf("index path is required (use --index flag)")
|
|
}
|
|
|
|
// Initialize logger
|
|
log.Init("bleve-migrate", "development")
|
|
logger := log.FromContext(context.Background())
|
|
|
|
// Load configuration
|
|
cfg, err := config.LoadConfig()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to load config: %w", err)
|
|
}
|
|
|
|
// Initialize database
|
|
database, err := db.InitDB(cfg, nil)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to initialize database: %w", err)
|
|
}
|
|
defer func() {
|
|
if err := db.Close(database); err != nil {
|
|
logger.Error(err, "Error closing database")
|
|
}
|
|
}()
|
|
|
|
// Create repositories
|
|
repos := sql.NewRepositories(database, cfg)
|
|
|
|
// Initialize or open Bleve index
|
|
logger.Info(fmt.Sprintf("Initializing Bleve index at %s", indexPath))
|
|
index, err := initBleveIndex(indexPath)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to initialize Bleve index: %w", err)
|
|
}
|
|
defer func() {
|
|
if err := index.Close(); err != nil {
|
|
logger.Error(err, "Error closing Bleve index")
|
|
}
|
|
}()
|
|
|
|
// Load checkpoint if resuming
|
|
var cp *checkpoint
|
|
if resume {
|
|
cp = loadCheckpoint()
|
|
if cp != nil {
|
|
logger.Info(fmt.Sprintf("Resuming from checkpoint: last_id=%d, total_processed=%d", cp.LastProcessedID, cp.TotalProcessed))
|
|
}
|
|
}
|
|
|
|
// Run migration
|
|
ctx := context.Background()
|
|
stats, err := migrateTranslations(ctx, repos.Translation, index, batchSize, cp, logger, ctx)
|
|
if err != nil {
|
|
return fmt.Errorf("migration failed: %w", err)
|
|
}
|
|
|
|
logger.Info(fmt.Sprintf("Migration completed: indexed=%d, errors=%d, duration=%v", stats.TotalIndexed, stats.TotalErrors, stats.Duration))
|
|
|
|
// Verify if requested
|
|
if verify {
|
|
logger.Info("Verifying indexed translations")
|
|
if err := verifyIndex(index, repos.Translation, logger, ctx); err != nil {
|
|
return fmt.Errorf("verification failed: %w", err)
|
|
}
|
|
logger.Info("Verification completed successfully")
|
|
}
|
|
|
|
// Clean up checkpoint file
|
|
if err := os.Remove(checkpointFile); err != nil && !os.IsNotExist(err) {
|
|
logger.Warn(fmt.Sprintf("Failed to remove checkpoint file: %v", err))
|
|
}
|
|
|
|
return nil
|
|
},
|
|
}
|
|
|
|
// Add flags
|
|
cmd.Flags().StringVarP(&indexPath, "index", "i", "", "Path to Bleve index directory (required)")
|
|
cmd.Flags().IntVarP(&batchSize, "batch", "b", defaultBatchSize, "Batch size for processing translations")
|
|
cmd.Flags().BoolVarP(&resume, "resume", "r", false, "Resume from last checkpoint")
|
|
cmd.Flags().BoolVarP(&verify, "verify", "v", false, "Verify indexed translations after migration")
|
|
|
|
// Mark index as required
|
|
_ = cmd.MarkFlagRequired("index")
|
|
|
|
return cmd
|
|
}
|
|
|
|
// initBleveIndex creates or opens a Bleve index with the appropriate mapping for translations
|
|
func initBleveIndex(indexPath string) (bleve.Index, error) {
|
|
// Check if index already exists
|
|
index, err := bleve.Open(indexPath)
|
|
if err == nil {
|
|
return index, nil
|
|
}
|
|
|
|
// Index doesn't exist, create it
|
|
mapping := bleve.NewIndexMapping()
|
|
|
|
// Create document mapping for Translation
|
|
translationMapping := bleve.NewDocumentMapping()
|
|
|
|
// ID field (not analyzed, stored)
|
|
idMapping := bleve.NewTextFieldMapping()
|
|
idMapping.Store = true
|
|
idMapping.Index = true
|
|
idMapping.Analyzer = "keyword"
|
|
translationMapping.AddFieldMappingsAt("id", idMapping)
|
|
|
|
// Title field (analyzed, stored)
|
|
titleMapping := bleve.NewTextFieldMapping()
|
|
titleMapping.Store = true
|
|
titleMapping.Index = true
|
|
titleMapping.Analyzer = "standard"
|
|
translationMapping.AddFieldMappingsAt("title", titleMapping)
|
|
|
|
// Content field (analyzed, stored)
|
|
contentMapping := bleve.NewTextFieldMapping()
|
|
contentMapping.Store = true
|
|
contentMapping.Index = true
|
|
contentMapping.Analyzer = "standard"
|
|
translationMapping.AddFieldMappingsAt("content", contentMapping)
|
|
|
|
// Description field (analyzed, stored)
|
|
descriptionMapping := bleve.NewTextFieldMapping()
|
|
descriptionMapping.Store = true
|
|
descriptionMapping.Index = true
|
|
descriptionMapping.Analyzer = "standard"
|
|
translationMapping.AddFieldMappingsAt("description", descriptionMapping)
|
|
|
|
// Language field (not analyzed, stored, for filtering)
|
|
languageMapping := bleve.NewTextFieldMapping()
|
|
languageMapping.Store = true
|
|
languageMapping.Index = true
|
|
languageMapping.Analyzer = "keyword"
|
|
translationMapping.AddFieldMappingsAt("language", languageMapping)
|
|
|
|
// Status field (not analyzed, stored, for filtering)
|
|
statusMapping := bleve.NewTextFieldMapping()
|
|
statusMapping.Store = true
|
|
statusMapping.Index = true
|
|
statusMapping.Analyzer = "keyword"
|
|
translationMapping.AddFieldMappingsAt("status", statusMapping)
|
|
|
|
// TranslatableID field (not analyzed, stored)
|
|
translatableIDMapping := bleve.NewNumericFieldMapping()
|
|
translatableIDMapping.Store = true
|
|
translatableIDMapping.Index = true
|
|
translationMapping.AddFieldMappingsAt("translatable_id", translatableIDMapping)
|
|
|
|
// TranslatableType field (not analyzed, stored, for filtering)
|
|
translatableTypeMapping := bleve.NewTextFieldMapping()
|
|
translatableTypeMapping.Store = true
|
|
translatableTypeMapping.Index = true
|
|
translatableTypeMapping.Analyzer = "keyword"
|
|
translationMapping.AddFieldMappingsAt("translatable_type", translatableTypeMapping)
|
|
|
|
// TranslatorID field (not analyzed, stored)
|
|
translatorIDMapping := bleve.NewNumericFieldMapping()
|
|
translatorIDMapping.Store = true
|
|
translatorIDMapping.Index = true
|
|
translationMapping.AddFieldMappingsAt("translator_id", translatorIDMapping)
|
|
|
|
// Add translation mapping to index
|
|
mapping.AddDocumentMapping("translation", translationMapping)
|
|
|
|
// Create index directory if it doesn't exist
|
|
if err := os.MkdirAll(filepath.Dir(indexPath), 0755); err != nil {
|
|
return nil, fmt.Errorf("failed to create index directory: %w", err)
|
|
}
|
|
|
|
// Create the index
|
|
index, err = bleve.New(indexPath, mapping)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create Bleve index: %w", err)
|
|
}
|
|
|
|
return index, nil
|
|
}
|
|
|
|
// migrationStats aggregates the outcome of a migration run.
type migrationStats struct {
	// TotalIndexed is the number of translations successfully indexed.
	TotalIndexed int
	// TotalErrors is the number of translations in batches that failed to index.
	TotalErrors int
	// Duration is the total wall-clock time of the migration.
	Duration time.Duration
}
|
|
|
|
// migrateTranslations migrates all translations from PostgreSQL to Bleve index
|
|
func migrateTranslations(
|
|
ctx context.Context,
|
|
repo domain.TranslationRepository,
|
|
index bleve.Index,
|
|
batchSize int,
|
|
cp *checkpoint,
|
|
logger *log.Logger,
|
|
ctxForLog context.Context,
|
|
) (*migrationStats, error) {
|
|
startTime := time.Now()
|
|
stats := &migrationStats{}
|
|
|
|
// Fetch all translations
|
|
logger.Info("Fetching all translations from database")
|
|
translations, err := repo.ListAll(ctx)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to fetch translations: %w", err)
|
|
}
|
|
|
|
totalTranslations := len(translations)
|
|
logger.Info(fmt.Sprintf("Found %d translations", totalTranslations))
|
|
|
|
// Filter translations if resuming from checkpoint
|
|
if cp != nil && cp.LastProcessedID > 0 {
|
|
filtered := make([]domain.Translation, 0, len(translations))
|
|
for _, t := range translations {
|
|
if t.ID > cp.LastProcessedID {
|
|
filtered = append(filtered, t)
|
|
}
|
|
}
|
|
translations = filtered
|
|
stats.TotalIndexed = cp.TotalProcessed
|
|
logger.Info(fmt.Sprintf("Filtered translations: remaining=%d, already_processed=%d", len(translations), cp.TotalProcessed))
|
|
}
|
|
|
|
// Process translations in batches
|
|
batch := make([]domain.Translation, 0, batchSize)
|
|
lastProcessedID := uint(0)
|
|
|
|
for i, translation := range translations {
|
|
batch = append(batch, translation)
|
|
lastProcessedID = translation.ID
|
|
|
|
// Process batch when it reaches the batch size or at the end
|
|
if len(batch) >= batchSize || i == len(translations)-1 {
|
|
if err := indexBatch(index, batch, logger); err != nil {
|
|
logger.Error(err, fmt.Sprintf("Failed to index batch of size %d", len(batch)))
|
|
stats.TotalErrors += len(batch)
|
|
// Continue with next batch instead of failing completely
|
|
} else {
|
|
stats.TotalIndexed += len(batch)
|
|
}
|
|
|
|
// Save checkpoint
|
|
cpData := checkpoint{
|
|
LastProcessedID: lastProcessedID,
|
|
TotalProcessed: stats.TotalIndexed,
|
|
LastUpdated: time.Now(),
|
|
}
|
|
if err := saveCheckpoint(&cpData); err != nil {
|
|
logger.Warn(fmt.Sprintf("Failed to save checkpoint: %v", err))
|
|
}
|
|
|
|
// Log progress
|
|
progress := float64(stats.TotalIndexed) / float64(totalTranslations) * 100
|
|
logger.Info(fmt.Sprintf("Migration progress: %d/%d (%.2f%%)", stats.TotalIndexed, totalTranslations, progress))
|
|
|
|
// Clear batch
|
|
batch = batch[:0]
|
|
}
|
|
}
|
|
|
|
stats.Duration = time.Since(startTime)
|
|
return stats, nil
|
|
}
|
|
|
|
// indexBatch indexes a batch of translations
|
|
func indexBatch(index bleve.Index, translations []domain.Translation, logger *log.Logger) error {
|
|
batch := index.NewBatch()
|
|
for _, t := range translations {
|
|
doc := map[string]interface{}{
|
|
"id": strconv.FormatUint(uint64(t.ID), 10),
|
|
"title": t.Title,
|
|
"content": t.Content,
|
|
"description": t.Description,
|
|
"language": t.Language,
|
|
"status": string(t.Status),
|
|
"translatable_id": t.TranslatableID,
|
|
"translatable_type": t.TranslatableType,
|
|
}
|
|
|
|
if t.TranslatorID != nil {
|
|
doc["translator_id"] = *t.TranslatorID
|
|
}
|
|
|
|
docID := fmt.Sprintf("translation_%d", t.ID)
|
|
if err := batch.Index(docID, doc); err != nil {
|
|
return fmt.Errorf("failed to add document to batch: %w", err)
|
|
}
|
|
}
|
|
|
|
if err := index.Batch(batch); err != nil {
|
|
return fmt.Errorf("failed to index batch: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// verifyIndex verifies that all translations in the database are indexed in Bleve
|
|
func verifyIndex(index bleve.Index, repo domain.TranslationRepository, logger *log.Logger, ctx context.Context) error {
|
|
// Fetch all translations
|
|
translations, err := repo.ListAll(ctx)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to fetch translations: %w", err)
|
|
}
|
|
|
|
logger.Info(fmt.Sprintf("Verifying %d indexed translations", len(translations)))
|
|
|
|
missing := 0
|
|
for _, t := range translations {
|
|
docID := fmt.Sprintf("translation_%d", t.ID)
|
|
doc, err := index.Document(docID)
|
|
if err != nil {
|
|
logger.Warn(fmt.Sprintf("Translation %d not found in index: %v", t.ID, err))
|
|
missing++
|
|
continue
|
|
}
|
|
if doc == nil {
|
|
logger.Warn(fmt.Sprintf("Translation %d not found in index (nil document)", t.ID))
|
|
missing++
|
|
continue
|
|
}
|
|
}
|
|
|
|
if missing > 0 {
|
|
return fmt.Errorf("verification failed: %d translations missing from index", missing)
|
|
}
|
|
|
|
logger.Info("All translations verified in index")
|
|
return nil
|
|
}
|
|
|
|
// saveCheckpoint saves the migration checkpoint to a file
|
|
func saveCheckpoint(cp *checkpoint) error {
|
|
data, err := json.Marshal(cp)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to marshal checkpoint: %w", err)
|
|
}
|
|
|
|
if err := os.WriteFile(checkpointFile, data, 0644); err != nil {
|
|
return fmt.Errorf("failed to write checkpoint file: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// loadCheckpoint loads the migration checkpoint from a file
|
|
func loadCheckpoint() *checkpoint {
|
|
data, err := os.ReadFile(checkpointFile)
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
|
|
var cp checkpoint
|
|
if err := json.Unmarshal(data, &cp); err != nil {
|
|
return nil
|
|
}
|
|
|
|
return &cp
|
|
}
|