tercul-backend/cmd/cli/commands/bleve_migrate.go
google-labs-jules[bot] 53aa4d0344
Security Hardening and GraphQL Caching (#69)
* feat: add security middleware, graphql apq, and improved linting

- Add RateLimit, RequestValidation, and CORS middleware.
- Configure middleware chain in API server.
- Implement Redis cache for GraphQL Automatic Persisted Queries.
- Add .golangci.yml and fix linting issues (shadowing, timeouts).

* feat: security, caching and linting config

- Fix .golangci.yml config for govet shadow check
- (Previous changes: Security middleware, GraphQL APQ, Linting fixes)

* fix: resolve remaining lint errors

- Fix unhandled errors in tests (errcheck)
- Define constants for repeated strings (goconst)
- Suppress high complexity warnings with nolint:gocyclo
- Fix integer overflow warnings (gosec)
- Add package comments
- Split long lines (lll)
- Rename Analyse -> Analyze (misspell)
- Fix naked returns and unused params

---------

Co-authored-by: google-labs-jules[bot] <161369871+google-labs-jules[bot]@users.noreply.github.com>
2025-12-01 00:14:22 +01:00


package commands

import (
	"context"
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
	"strconv"
	"time"

	"tercul/internal/data/sql"
	"tercul/internal/domain"
	"tercul/internal/platform/config"
	"tercul/internal/platform/db"
	"tercul/internal/platform/log"

	"github.com/blevesearch/bleve/v2"
	"github.com/spf13/cobra"
)

const (
	// defaultBatchSize is the default number of translations indexed per batch.
	defaultBatchSize = 50000
	// checkpointFile tracks migration progress so an interrupted run can resume.
	checkpointFile = ".bleve_migration_checkpoint"

	// AnalyzerKeyword indexes a field verbatim, suitable for exact-match filtering.
	AnalyzerKeyword = "keyword"
	// AnalyzerStandard tokenizes a field for full-text search.
	AnalyzerStandard = "standard"
)

// checkpoint records the migration progress persisted between runs.
type checkpoint struct {
	LastProcessedID uint      `json:"last_processed_id"`
	TotalProcessed  int       `json:"total_processed"`
	LastUpdated     time.Time `json:"last_updated"`
}
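
// For illustration, a checkpoint file written midway through a run might look
// like the following (the values here are hypothetical):
//
//	{"last_processed_id": 150000, "total_processed": 150000, "last_updated": "2025-11-30T23:14:22+01:00"}
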
// NewBleveMigrateCommand creates a new Cobra command for Bleve migration
func NewBleveMigrateCommand() *cobra.Command {
var (
indexPath string
batchSize int
resume bool
verify bool
)
cmd := &cobra.Command{
Use: "bleve-migrate",
Short: "Migrate translations from PostgreSQL to Bleve index",
Long: `Migrate all translations from PostgreSQL database to a Bleve search index.
This command:
- Fetches all translations from the database
- Indexes them in batches for efficient processing
- Supports resuming from checkpoints
- Provides progress tracking
- Can verify indexed data after migration
Example:
tercul bleve-migrate --index ./data/bleve_index --batch 50000 --verify`,
		RunE: func(_ *cobra.Command, _ []string) error {
if indexPath == "" {
return fmt.Errorf("index path is required (use --index flag)")
}
// Initialize logger
log.Init("bleve-migrate", "development")
logger := log.FromContext(context.Background())
// Load configuration
cfg, err := config.LoadConfig()
if err != nil {
return fmt.Errorf("failed to load config: %w", err)
}
// Initialize database
database, err := db.InitDB(cfg, nil)
if err != nil {
return fmt.Errorf("failed to initialize database: %w", err)
}
defer func() {
if err := db.Close(database); err != nil {
logger.Error(err, "Error closing database")
}
}()
// Create repositories
repos := sql.NewRepositories(database, cfg)
// Initialize or open Bleve index
logger.Info(fmt.Sprintf("Initializing Bleve index at %s", indexPath))
index, err := initBleveIndex(indexPath)
if err != nil {
return fmt.Errorf("failed to initialize Bleve index: %w", err)
}
defer func() {
if err := index.Close(); err != nil {
logger.Error(err, "Error closing Bleve index")
}
}()
// Load checkpoint if resuming
var cp *checkpoint
if resume {
cp = loadCheckpoint()
if cp != nil {
logger.Info(fmt.Sprintf("Resuming from checkpoint: last_id=%d, total_processed=%d", cp.LastProcessedID, cp.TotalProcessed))
}
}
// Run migration
ctx := context.Background()
stats, err := migrateTranslations(ctx, repos.Translation, index, batchSize, cp, logger)
if err != nil {
return fmt.Errorf("migration failed: %w", err)
}
logger.Info(fmt.Sprintf("Migration completed: indexed=%d, errors=%d, duration=%v", stats.TotalIndexed, stats.TotalErrors, stats.Duration))
// Verify if requested
if verify {
logger.Info("Verifying indexed translations")
				if err := verifyIndex(ctx, index, repos.Translation, logger); err != nil {
return fmt.Errorf("verification failed: %w", err)
}
logger.Info("Verification completed successfully")
}
// Clean up checkpoint file
if err := os.Remove(checkpointFile); err != nil && !os.IsNotExist(err) {
logger.Warn(fmt.Sprintf("Failed to remove checkpoint file: %v", err))
}
return nil
},
}
// Add flags
cmd.Flags().StringVarP(&indexPath, "index", "i", "", "Path to Bleve index directory (required)")
cmd.Flags().IntVarP(&batchSize, "batch", "b", defaultBatchSize, "Batch size for processing translations")
cmd.Flags().BoolVarP(&resume, "resume", "r", false, "Resume from last checkpoint")
cmd.Flags().BoolVarP(&verify, "verify", "v", false, "Verify indexed translations after migration")
// Mark index as required
_ = cmd.MarkFlagRequired("index")
return cmd
}
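
// The command is expected to be registered on the CLI's root command; assuming
// a rootCmd defined elsewhere (hypothetical name), that would look like:
//
//	rootCmd.AddCommand(NewBleveMigrateCommand())
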
// initBleveIndex creates or opens a Bleve index with the appropriate mapping for translations
func initBleveIndex(indexPath string) (bleve.Index, error) {
	// Reuse the index if one already exists at the given path.
	index, err := bleve.Open(indexPath)
	if err == nil {
		return index, nil
	}
	if err != bleve.ErrorIndexPathDoesNotExist {
		return nil, fmt.Errorf("failed to open existing Bleve index: %w", err)
	}

	// The index does not exist yet, so build a mapping and create it.
mapping := bleve.NewIndexMapping()
// Create document mapping for Translation
translationMapping := bleve.NewDocumentMapping()
// ID field (not analyzed, stored)
idMapping := bleve.NewTextFieldMapping()
idMapping.Store = true
idMapping.Index = true
idMapping.Analyzer = AnalyzerKeyword
translationMapping.AddFieldMappingsAt("id", idMapping)
// Title field (analyzed, stored)
titleMapping := bleve.NewTextFieldMapping()
titleMapping.Store = true
titleMapping.Index = true
titleMapping.Analyzer = AnalyzerStandard
translationMapping.AddFieldMappingsAt("title", titleMapping)
// Content field (analyzed, stored)
contentMapping := bleve.NewTextFieldMapping()
contentMapping.Store = true
contentMapping.Index = true
contentMapping.Analyzer = AnalyzerStandard
translationMapping.AddFieldMappingsAt("content", contentMapping)
// Description field (analyzed, stored)
descriptionMapping := bleve.NewTextFieldMapping()
descriptionMapping.Store = true
descriptionMapping.Index = true
descriptionMapping.Analyzer = AnalyzerStandard
translationMapping.AddFieldMappingsAt("description", descriptionMapping)
// Language field (not analyzed, stored, for filtering)
languageMapping := bleve.NewTextFieldMapping()
languageMapping.Store = true
languageMapping.Index = true
languageMapping.Analyzer = AnalyzerKeyword
translationMapping.AddFieldMappingsAt("language", languageMapping)
// Status field (not analyzed, stored, for filtering)
statusMapping := bleve.NewTextFieldMapping()
statusMapping.Store = true
statusMapping.Index = true
statusMapping.Analyzer = AnalyzerKeyword
translationMapping.AddFieldMappingsAt("status", statusMapping)
// TranslatableID field (not analyzed, stored)
translatableIDMapping := bleve.NewNumericFieldMapping()
translatableIDMapping.Store = true
translatableIDMapping.Index = true
translationMapping.AddFieldMappingsAt("translatable_id", translatableIDMapping)
// TranslatableType field (not analyzed, stored, for filtering)
translatableTypeMapping := bleve.NewTextFieldMapping()
translatableTypeMapping.Store = true
translatableTypeMapping.Index = true
translatableTypeMapping.Analyzer = AnalyzerKeyword
translationMapping.AddFieldMappingsAt("translatable_type", translatableTypeMapping)
// TranslatorID field (not analyzed, stored)
translatorIDMapping := bleve.NewNumericFieldMapping()
translatorIDMapping.Store = true
translatorIDMapping.Index = true
translationMapping.AddFieldMappingsAt("translator_id", translatorIDMapping)
	// Register the translation mapping; documents must carry a "_type" field of
	// "translation" for this mapping (and its analyzers) to be applied.
	mapping.AddDocumentMapping("translation", translationMapping)
// Create index directory if it doesn't exist
if err := os.MkdirAll(filepath.Dir(indexPath), 0755); err != nil {
return nil, fmt.Errorf("failed to create index directory: %w", err)
}
// Create the index
index, err = bleve.New(indexPath, mapping)
if err != nil {
return nil, fmt.Errorf("failed to create Bleve index: %w", err)
}
return index, nil
}
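
// As a sketch of how the resulting index could be queried (illustrative only,
// not used by the migration itself), a full-text search on content filtered to
// a single language might look like:
//
//	match := bleve.NewMatchQuery("folk tale")
//	match.SetField("content")
//	lang := bleve.NewTermQuery("en")
//	lang.SetField("language")
//	req := bleve.NewSearchRequest(bleve.NewConjunctionQuery(match, lang))
//	res, err := index.Search(req)
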
type migrationStats struct {
TotalIndexed int
TotalErrors int
Duration time.Duration
}

// migrateTranslations migrates all translations from PostgreSQL to the Bleve index.
func migrateTranslations(
ctx context.Context,
repo domain.TranslationRepository,
index bleve.Index,
batchSize int,
cp *checkpoint,
logger *log.Logger,
) (*migrationStats, error) {
startTime := time.Now()
stats := &migrationStats{}
// Fetch all translations
logger.Info("Fetching all translations from database")
translations, err := repo.ListAll(ctx)
if err != nil {
return nil, fmt.Errorf("failed to fetch translations: %w", err)
}
totalTranslations := len(translations)
logger.Info(fmt.Sprintf("Found %d translations", totalTranslations))
// Filter translations if resuming from checkpoint
if cp != nil && cp.LastProcessedID > 0 {
filtered := make([]domain.Translation, 0, len(translations))
for _, t := range translations {
if t.ID > cp.LastProcessedID {
filtered = append(filtered, t)
}
}
translations = filtered
stats.TotalIndexed = cp.TotalProcessed
logger.Info(fmt.Sprintf("Filtered translations: remaining=%d, already_processed=%d", len(translations), cp.TotalProcessed))
}
	// Process translations in batches. Checkpointing assumes ListAll returns rows
	// in ascending ID order, so LastProcessedID only ever moves forward.
	batch := make([]domain.Translation, 0, batchSize)
	lastProcessedID := uint(0)
for i, translation := range translations {
batch = append(batch, translation)
lastProcessedID = translation.ID
// Process batch when it reaches the batch size or at the end
if len(batch) >= batchSize || i == len(translations)-1 {
if err := indexBatch(index, batch); err != nil {
logger.Error(err, fmt.Sprintf("Failed to index batch of size %d", len(batch)))
stats.TotalErrors += len(batch)
// Continue with next batch instead of failing completely
} else {
stats.TotalIndexed += len(batch)
}
// Save checkpoint
cpData := checkpoint{
LastProcessedID: lastProcessedID,
TotalProcessed: stats.TotalIndexed,
LastUpdated: time.Now(),
}
if err := saveCheckpoint(&cpData); err != nil {
logger.Warn(fmt.Sprintf("Failed to save checkpoint: %v", err))
}
// Log progress
progress := float64(stats.TotalIndexed) / float64(totalTranslations) * 100
logger.Info(fmt.Sprintf("Migration progress: %d/%d (%.2f%%)", stats.TotalIndexed, totalTranslations, progress))
// Clear batch
batch = batch[:0]
}
}
stats.Duration = time.Since(startTime)
return stats, nil
}

// indexBatch indexes a batch of translations in a single Bleve batch operation.
func indexBatch(index bleve.Index, translations []domain.Translation) error {
batch := index.NewBatch()
for _, t := range translations {
		doc := map[string]interface{}{
			// The _type field routes the document to the "translation" mapping
			// registered in initBleveIndex; without it the default mapping is used.
			"_type":             "translation",
			"id":                strconv.FormatUint(uint64(t.ID), 10),
			"title":             t.Title,
			"content":           t.Content,
			"description":       t.Description,
			"language":          t.Language,
			"status":            string(t.Status),
			"translatable_id":   t.TranslatableID,
			"translatable_type": t.TranslatableType,
		}
if t.TranslatorID != nil {
doc["translator_id"] = *t.TranslatorID
}
docID := fmt.Sprintf("translation_%d", t.ID)
if err := batch.Index(docID, doc); err != nil {
return fmt.Errorf("failed to add document to batch: %w", err)
}
}
if err := index.Batch(batch); err != nil {
return fmt.Errorf("failed to index batch: %w", err)
}
return nil
}

// verifyIndex verifies that all translations in the database are indexed in Bleve.
func verifyIndex(ctx context.Context, index bleve.Index, repo domain.TranslationRepository, logger *log.Logger) error {
// Fetch all translations
translations, err := repo.ListAll(ctx)
if err != nil {
return fmt.Errorf("failed to fetch translations: %w", err)
}
logger.Info(fmt.Sprintf("Verifying %d indexed translations", len(translations)))
missing := 0
for _, t := range translations {
docID := fmt.Sprintf("translation_%d", t.ID)
doc, err := index.Document(docID)
		if err != nil {
			logger.Warn(fmt.Sprintf("Failed to look up translation %d in index: %v", t.ID, err))
			missing++
			continue
		}
		if doc == nil {
			logger.Warn(fmt.Sprintf("Translation %d not found in index (nil document)", t.ID))
			missing++
		}
}
if missing > 0 {
return fmt.Errorf("verification failed: %d translations missing from index", missing)
}
logger.Info("All translations verified in index")
return nil
}

// saveCheckpoint saves the migration checkpoint to a file.
func saveCheckpoint(cp *checkpoint) error {
data, err := json.Marshal(cp)
if err != nil {
return fmt.Errorf("failed to marshal checkpoint: %w", err)
}
if err := os.WriteFile(checkpointFile, data, 0644); err != nil {
return fmt.Errorf("failed to write checkpoint file: %w", err)
}
return nil
}

// loadCheckpoint loads the migration checkpoint from disk. A missing or
// unreadable checkpoint is treated as "no checkpoint" and returns nil.
func loadCheckpoint() *checkpoint {
data, err := os.ReadFile(checkpointFile)
if err != nil {
return nil
}
var cp checkpoint
if err := json.Unmarshal(data, &cp); err != nil {
return nil
}
return &cp
}