package commands

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"

	"tercul/cmd/cli/internal/bootstrap"
	"tercul/internal/jobs/sync"
	"tercul/internal/platform/config"
	"tercul/internal/platform/db"
	app_log "tercul/internal/platform/log"

	"github.com/hibiken/asynq"
	"github.com/spf13/cobra"
)

// NewWorkerCommand creates the "worker" Cobra command, which runs the
// background job worker until SIGINT or SIGTERM is received.
//
// The command loads configuration, initializes logging, the database, the
// Weaviate client and the Asynq client/server, registers the sync job
// handlers, and then processes jobs until it is signalled to stop. Any
// failure during setup or server start is returned from RunE so that deferred
// cleanup runs and the caller decides how to report it.
func NewWorkerCommand() *cobra.Command {
	return &cobra.Command{
		Use:   "worker",
		Short: "Start the Tercul background worker",
		Long: `Start the Tercul background worker to process async jobs including:
- Sync jobs (Weaviate indexing, etc.)
- Linguistic analysis jobs
- Trending calculation jobs`,
		RunE: func(cmd *cobra.Command, args []string) error {
			// Load configuration.
			cfg, err := config.LoadConfig()
			if err != nil {
				return fmt.Errorf("loading configuration: %w", err)
			}

			// Initialize logger.
			app_log.Init("tercul-worker", cfg.Environment)
			app_log.Info("Starting Tercul worker...")

			// Initialize database connection. No metrics are needed for the
			// worker, hence the nil second argument.
			database, err := db.InitDB(cfg, nil)
			if err != nil {
				return fmt.Errorf("initializing database: %w", err)
			}
			defer func() {
				if err := db.Close(database); err != nil {
					app_log.Error(err, "Error closing database")
				}
			}()

			// Initialize Weaviate client. The error is returned rather than
			// logged as fatal so the deferred database close above still runs.
			weaviateClient, err := bootstrap.NewWeaviateClient(cfg)
			if err != nil {
				return fmt.Errorf("creating weaviate client: %w", err)
			}

			// Initialize Asynq client and server against the same Redis.
			redisConnection := asynq.RedisClientOpt{Addr: cfg.RedisAddr}
			asynqClient := asynq.NewClient(redisConnection)
			defer func() {
				if err := asynqClient.Close(); err != nil {
					app_log.Error(err, "Error closing asynq client")
				}
			}()

			srv := asynq.NewServer(
				redisConnection,
				asynq.Config{
					Concurrency: 10, // Example concurrency
					Queues: map[string]int{
						"critical": 6,
						"default":  3,
						"low":      1,
					},
				},
			)

			// Create SyncJob with all dependencies.
			syncJob := sync.NewSyncJob(database, asynqClient, cfg, weaviateClient)

			// Create a new ServeMux for routing jobs and register all handlers.
			mux := asynq.NewServeMux()
			sync.RegisterQueueHandlers(mux, syncJob)
			// Placeholder for other job handlers that might be added in the future
			// linguistics.RegisterLinguisticHandlers(mux, linguisticJob)
			// trending.RegisterTrendingHandlers(mux, analyticsService)

			// Register for shutdown signals before starting the server so a
			// signal delivered right after startup is not missed.
			quit := make(chan os.Signal, 1)
			signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
			defer signal.Stop(quit)

			// Start is used instead of running Run in a goroutine: per the
			// asynq documentation Run blocks and performs its own signal
			// handling and shutdown, which would duplicate the handling below.
			// Start is non-blocking and reports startup errors synchronously.
			if err := srv.Start(mux); err != nil {
				return fmt.Errorf("starting asynq server: %w", err)
			}
			app_log.Info("Worker started successfully.")

			// Wait for interrupt signal to gracefully shut down the server.
			<-quit
			app_log.Info("Shutting down worker...")
			srv.Shutdown()
			app_log.Info("Worker shut down successfully.")
			return nil
		},
	}
}