package main import ( "log" "os" "os/signal" "syscall" dbsql "tercul/internal/data/sql" "tercul/internal/jobs/linguistics" "tercul/internal/jobs/sync" "tercul/internal/platform/cache" "tercul/internal/platform/config" "tercul/internal/platform/db" app_log "tercul/internal/platform/log" "github.com/hibiken/asynq" "github.com/weaviate/weaviate-go-client/v5/weaviate" ) func main() { // Load configuration from environment variables cfg, err := config.LoadConfig() if err != nil { log.Fatalf("cannot load config: %v", err) } // Initialize logger app_log.Init("tercul-worker", cfg.Environment) app_log.Info("Starting Tercul worker...") // Initialize database connection database, err := db.InitDB(cfg, nil) // No metrics needed for the worker if err != nil { app_log.Fatal(err, "Failed to initialize database") } defer func() { if err := db.Close(database); err != nil { app_log.Error(err, "Error closing database") } }() // Initialize Weaviate client weaviateCfg := weaviate.Config{ Host: cfg.WeaviateHost, Scheme: cfg.WeaviateScheme, } weaviateClient, err := weaviate.NewClient(weaviateCfg) if err != nil { app_log.Fatal(err, "Failed to create weaviate client") } // Initialize Asynq client and server redisConnection := asynq.RedisClientOpt{Addr: cfg.RedisAddr} asynqClient := asynq.NewClient(redisConnection) defer func() { if err := asynqClient.Close(); err != nil { app_log.Error(err, "Error closing asynq client") } }() srv := asynq.NewServer( redisConnection, asynq.Config{ Concurrency: 10, // Example concurrency Queues: map[string]int{ "critical": 6, "default": 3, "low": 1, }, }, ) // Create SyncJob with all dependencies syncJob := sync.NewSyncJob(database, asynqClient, cfg, weaviateClient) repos := dbsql.NewRepositories(database, cfg) redisCache, cacheErr := cache.NewDefaultRedisCache(cfg) if cacheErr != nil { app_log.Warn("Redis cache initialization failed for linguistics: " + cacheErr.Error()) } sentimentProvider, spErr := linguistics.NewGoVADERSentimentProvider() if spErr != nil { app_log.Fatal(spErr, "Failed to create sentiment provider") } workAnalyticsDeps := linguistics.WorkAnalyticsDeps{ StatsRepo: repos.Analytics, LikeCounter: repos.Like, CommentCounter: repos.Comment, BookmarkCounter: repos.Bookmark, TranslationCount: repos.Translation, TranslationList: repos.Translation, } linguisticsFactory := linguistics.NewLinguisticsFactory( cfg, database, redisCache, 2, true, sentimentProvider, workAnalyticsDeps, ) linguisticJob := linguistics.NewLinguisticSyncJob(database, linguisticsFactory.GetAnalyzer(), asynqClient) // Create a new ServeMux for routing jobs mux := asynq.NewServeMux() // Register all job handlers sync.RegisterQueueHandlers(mux, syncJob) linguistics.RegisterLinguisticHandlers(mux, linguisticJob) // trending.RegisterTrendingHandlers(mux, analyticsService) // Start the server in a goroutine go func() { if err := srv.Run(mux); err != nil { app_log.Fatal(err, "Could not run asynq server") } }() app_log.Info("Worker started successfully.") // Wait for interrupt signal to gracefully shutdown the server quit := make(chan os.Signal, 1) signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) <-quit app_log.Info("Shutting down worker...") srv.Shutdown() app_log.Info("Worker shut down successfully.") }