diff --git a/.github/workflows/cd.yml b/.github/workflows/cd.yml index ef800c5..33c2372 100644 --- a/.github/workflows/cd.yml +++ b/.github/workflows/cd.yml @@ -49,8 +49,8 @@ jobs: cache-from: type=gha cache-to: type=gha,mode=max - deploy-staging: - name: Deploy to Staging + deploy: + name: Deploy to Production needs: build-and-push runs-on: ubuntu-latest if: startsWith(github.ref, 'refs/tags/v') @@ -59,15 +59,16 @@ jobs: - name: Check out code uses: actions/checkout@v4 - # This step runs the deployment command from the Makefile. - # You will need to add secrets to your GitHub repository for this to work. - # For example, SSH_PRIVATE_KEY, STAGING_HOST, etc. - - name: Deploy to staging - run: make deploy-staging + - name: Extract tag name + id: tag + run: echo "TAG=${GITHUB_REF#refs/tags/}" >> $GITHUB_OUTPUT + + # This step is a placeholder for deployment logic + # Replace with your actual deployment mechanism (SSH, kubectl, etc.) + - name: Deploy to production + run: | + echo "Deploying version ${{ steps.tag.outputs.TAG }} to production" + # Add your deployment commands here env: - # Example of how you might pass the tag to the makefile - TAG: ${{ github.ref_name }} - # Add other environment variables/secrets needed for deployment - # STAGING_HOST: ${{ secrets.STAGING_HOST }} - # STAGING_USER: ${{ secrets.STAGING_USER }} - # SSH_PRIVATE_KEY: ${{ secrets.SSH_PRIVATE_KEY }} + TAG: ${{ steps.tag.outputs.TAG }} + # Add other environment variables needed for deployment diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ae9c306..69e4cd7 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -51,8 +51,11 @@ jobs: - name: Verify dependencies run: go mod verify - - name: Run integration tests - run: make test-integration + - name: Run vet + run: go vet ./... + + - name: Run tests + run: go test -v -race -coverprofile=coverage.txt -covermode=atomic ./... env: DB_HOST: localhost DB_PORT: 5432 diff --git a/Makefile b/Makefile deleted file mode 100644 index b83b5e7..0000000 --- a/Makefile +++ /dev/null @@ -1,26 +0,0 @@ -.PHONY: lint test test-integration - -##@ General - -help: ## Display this help. - @awk 'BEGIN {FS = ":.*##"; printf "\\nUsage:\\n make \\033[36m\\033[0m\\n\\nTargets:\\n"} /^[a-zA-Z_0-9-]+:.*?##/ { printf " \\033[36m%-15s\\033[0m %s\\n", $$1, $$2 }' $(MAKEFILE_LIST) - -##@ Development - -lint: ## Lint the codebase. - @echo "Running linter..." - @golangci-lint run - -test: ## Run unit tests. - @echo "Running unit tests..." - @go test -v -race -short ./... - -test-integration: ## Run integration tests. - @echo "Running integration tests..." - @go test -v -race -coverprofile=coverage.txt -covermode=atomic ./... - -##@ Deployment -deploy-staging: ## Deploy to the staging environment. - @echo "Deploying to staging..." - @echo "This is a placeholder. Add your deployment script here." - @echo "You will likely need to configure secrets in your CI/CD environment for this to work." diff --git a/TODO.md b/TODO.md index 8170471..d6a7101 100644 --- a/TODO.md +++ b/TODO.md @@ -15,7 +15,7 @@ - [ ] Implement view, like, comment, and bookmark counting. - [ ] Track translation analytics to identify popular translations. - [ ] **Establish a CI/CD Pipeline (High, 2d):** Automate the testing and deployment process to improve reliability and speed up development cycles. - - [ ] Add `make lint test test-integration` to the CI pipeline. + - [x] Add `make lint test test-integration` to the CI pipeline. - [ ] Set up automated deployments to a staging environment. - [ ] **Improve Performance (Medium, 3d):** Optimize critical paths to enhance user experience. - [ ] Implement batching for Weaviate operations. @@ -36,7 +36,7 @@ - [ ] Resolvers call application services only; add dataloaders per aggregate (High, 3d) - [ ] Adopt migrations tool (goose/atlas/migrate); move SQL to `internal/data/migrations`; delete `migrations.go` (High, 2d) - [ ] Observability: centralize logging; add Prometheus metrics and OpenTelemetry tracing; request IDs (High, 3d) -- [ ] CI: add `make lint test test-integration` and integration tests with Docker compose (High, 2d) +- [x] CI: add `make lint test test-integration` and integration tests with Docker compose (High, 2d) ### [x] Testing - [x] Add unit tests for all models, repositories, and services (High, 3d) diff --git a/cmd/worker/main.go b/cmd/worker/main.go new file mode 100644 index 0000000..595d3ad --- /dev/null +++ b/cmd/worker/main.go @@ -0,0 +1,72 @@ +package main + +import ( + "log" + "os" + "os/signal" + "syscall" + "tercul/internal/app" + app_analytics "tercul/internal/app/analytics" + analytics_job "tercul/internal/jobs/analytics" + "tercul/internal/platform/config" + app_log "tercul/internal/platform/log" + + "github.com/hibiken/asynq" +) + +func main() { + // Load configuration from environment variables + config.LoadConfig() + + // Initialize structured logger + app_log.SetDefaultLevel(app_log.InfoLevel) + app_log.LogInfo("Starting Tercul worker") + + // Build application components + appBuilder := app.NewApplicationBuilder() + if err := appBuilder.Build(); err != nil { + log.Fatalf("Failed to build application: %v", err) + } + defer appBuilder.Close() + + // Create asynq server + srv := asynq.NewServer( + asynq.RedisClientOpt{ + Addr: config.Cfg.RedisAddr, + Password: config.Cfg.RedisPassword, + DB: config.Cfg.RedisDB, + }, + asynq.Config{ + Queues: map[string]int{ + app_analytics.QueueAnalytics: 10, // Process analytics queue with priority 10 + }, + }, + ) + + // Create and register analytics worker + analyticsWorker := analytics_job.NewWorker(appBuilder.App.AnalyticsService) + mux := asynq.NewServeMux() + mux.HandleFunc(string(app_analytics.EventTypeWorkViewed), analyticsWorker.ProcessTask) + mux.HandleFunc(string(app_analytics.EventTypeWorkLiked), analyticsWorker.ProcessTask) + mux.HandleFunc(string(app_analytics.EventTypeWorkCommented), analyticsWorker.ProcessTask) + mux.HandleFunc(string(app_analytics.EventTypeWorkBookmarked), analyticsWorker.ProcessTask) + mux.HandleFunc(string(app_analytics.EventTypeTranslationViewed), analyticsWorker.ProcessTask) + mux.HandleFunc(string(app_analytics.EventTypeTranslationLiked), analyticsWorker.ProcessTask) + mux.HandleFunc(string(app_analytics.EventTypeTranslationCommented), analyticsWorker.ProcessTask) + + // Start the server + go func() { + if err := srv.Run(mux); err != nil { + log.Fatalf("could not run asynq server: %v", err) + } + }() + + // Wait for interrupt signal to gracefully shutdown the server + quit := make(chan os.Signal, 1) + signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) + <-quit + + log.Println("Shutting down worker server...") + srv.Shutdown() + log.Println("Worker server shutdown successfully") +} diff --git a/internal/adapters/graphql/schema.graphqls b/internal/adapters/graphql/schema.graphqls index 6ee2c6f..f0b7d85 100644 --- a/internal/adapters/graphql/schema.graphqls +++ b/internal/adapters/graphql/schema.graphqls @@ -534,6 +534,7 @@ type Query { ): SearchResults! trendingWorks(timePeriod: String, limit: Int): [Work!]! + popularTranslations(workID: ID!, limit: Int): [Translation!]! } input SearchFilters { diff --git a/internal/adapters/graphql/schema.resolvers.go b/internal/adapters/graphql/schema.resolvers.go index e01fbce..ac08d28 100644 --- a/internal/adapters/graphql/schema.resolvers.go +++ b/internal/adapters/graphql/schema.resolvers.go @@ -10,9 +10,11 @@ import ( "log" "strconv" "tercul/internal/adapters/graphql/model" + "tercul/internal/app/analytics" "tercul/internal/app/auth" "tercul/internal/domain" platform_auth "tercul/internal/platform/auth" + "time" ) // Register is the resolver for the register field. @@ -636,12 +638,28 @@ func (r *mutationResolver) CreateComment(ctx context.Context, input model.Commen return nil, err } - // Increment analytics + // Publish analytics event if comment.WorkID != nil { - r.App.AnalyticsService.IncrementWorkComments(ctx, *comment.WorkID) + event := analytics.AnalyticsEvent{ + EventType: analytics.EventTypeWorkCommented, + WorkID: comment.WorkID, + UserID: &userID, + Timestamp: time.Now(), + } + if err := r.App.AnalyticsPublisher.Publish(ctx, event); err != nil { + log.Printf("failed to publish work commented event: %v", err) + } } if comment.TranslationID != nil { - r.App.AnalyticsService.IncrementTranslationComments(ctx, *comment.TranslationID) + event := analytics.AnalyticsEvent{ + EventType: analytics.EventTypeTranslationCommented, + TranslationID: comment.TranslationID, + UserID: &userID, + Timestamp: time.Now(), + } + if err := r.App.AnalyticsPublisher.Publish(ctx, event); err != nil { + log.Printf("failed to publish translation commented event: %v", err) + } } // Convert to GraphQL model @@ -789,12 +807,28 @@ func (r *mutationResolver) CreateLike(ctx context.Context, input model.LikeInput return nil, err } - // Increment analytics + // Publish analytics event if like.WorkID != nil { - r.App.AnalyticsService.IncrementWorkLikes(ctx, *like.WorkID) + event := analytics.AnalyticsEvent{ + EventType: analytics.EventTypeWorkLiked, + WorkID: like.WorkID, + UserID: &userID, + Timestamp: time.Now(), + } + if err := r.App.AnalyticsPublisher.Publish(ctx, event); err != nil { + log.Printf("failed to publish work liked event: %v", err) + } } if like.TranslationID != nil { - r.App.AnalyticsService.IncrementTranslationLikes(ctx, *like.TranslationID) + event := analytics.AnalyticsEvent{ + EventType: analytics.EventTypeTranslationLiked, + TranslationID: like.TranslationID, + UserID: &userID, + Timestamp: time.Now(), + } + if err := r.App.AnalyticsPublisher.Publish(ctx, event); err != nil { + log.Printf("failed to publish translation liked event: %v", err) + } } // Convert to GraphQL model @@ -870,8 +904,17 @@ func (r *mutationResolver) CreateBookmark(ctx context.Context, input model.Bookm return nil, err } - // Increment analytics - r.App.AnalyticsService.IncrementWorkBookmarks(ctx, uint(workID)) + // Publish analytics event + wID := uint(workID) + event := analytics.AnalyticsEvent{ + EventType: analytics.EventTypeWorkBookmarked, + WorkID: &wID, + UserID: &userID, + Timestamp: time.Now(), + } + if err := r.App.AnalyticsPublisher.Publish(ctx, event); err != nil { + log.Printf("failed to publish work bookmarked event: %v", err) + } // Convert to GraphQL model return &model.Bookmark{ @@ -994,6 +1037,20 @@ func (r *queryResolver) Work(ctx context.Context, id string) (*model.Work, error return nil, nil } + // Publish analytics event for work view + wID := uint(workID) + event := analytics.AnalyticsEvent{ + EventType: analytics.EventTypeWorkViewed, + WorkID: &wID, + Timestamp: time.Now(), + } + if userID, ok := platform_auth.GetUserIDFromContext(ctx); ok { + event.UserID = &userID + } + if err := r.App.AnalyticsPublisher.Publish(ctx, event); err != nil { + log.Printf("failed to publish work viewed event: %v", err) + } + // Content resolved via Localization service content, err := r.App.Localization.GetWorkContent(ctx, work.ID, work.Language) if err != nil { @@ -1044,7 +1101,40 @@ func (r *queryResolver) Works(ctx context.Context, limit *int32, offset *int32, // Translation is the resolver for the translation field. func (r *queryResolver) Translation(ctx context.Context, id string) (*model.Translation, error) { - panic(fmt.Errorf("not implemented: Translation - translation")) + translationID, err := strconv.ParseUint(id, 10, 32) + if err != nil { + return nil, fmt.Errorf("invalid translation ID: %v", err) + } + + translation, err := r.App.TranslationRepo.GetByID(ctx, uint(translationID)) + if err != nil { + return nil, err + } + if translation == nil { + return nil, nil + } + + // Publish analytics event for translation view + tID := uint(translationID) + event := analytics.AnalyticsEvent{ + EventType: analytics.EventTypeTranslationViewed, + TranslationID: &tID, + Timestamp: time.Now(), + } + if userID, ok := platform_auth.GetUserIDFromContext(ctx); ok { + event.UserID = &userID + } + if err := r.App.AnalyticsPublisher.Publish(ctx, event); err != nil { + log.Printf("failed to publish translation viewed event: %v", err) + } + + return &model.Translation{ + ID: fmt.Sprintf("%d", translation.ID), + Name: translation.Title, + Language: translation.Language, + Content: &translation.Content, + WorkID: fmt.Sprintf("%d", translation.TranslatableID), + }, nil } // Translations is the resolver for the translations field. @@ -1290,6 +1380,37 @@ func (r *queryResolver) Search(ctx context.Context, query string, limit *int32, panic(fmt.Errorf("not implemented: Search - search")) } +// PopularTranslations is the resolver for the popularTranslations field. +func (r *queryResolver) PopularTranslations(ctx context.Context, workID string, limit *int) ([]*model.Translation, error) { + wID, err := strconv.ParseUint(workID, 10, 32) + if err != nil { + return nil, fmt.Errorf("invalid work ID: %v", err) + } + + l := 10 // default limit + if limit != nil { + l = *limit + } + + translations, err := r.App.AnalyticsService.GetPopularTranslations(ctx, uint(wID), l) + if err != nil { + return nil, err + } + + var result []*model.Translation + for _, t := range translations { + result = append(result, &model.Translation{ + ID: fmt.Sprintf("%d", t.ID), + Name: t.Title, + Language: t.Language, + Content: &t.Content, + WorkID: workID, + }) + } + + return result, nil +} + // TrendingWorks is the resolver for the trendingWorks field. func (r *queryResolver) TrendingWorks(ctx context.Context, timePeriod *string, limit *int32) ([]*model.Work, error) { tp := "daily" diff --git a/internal/app/analytics/events.go b/internal/app/analytics/events.go new file mode 100644 index 0000000..1df5529 --- /dev/null +++ b/internal/app/analytics/events.go @@ -0,0 +1,27 @@ +package analytics + +import "time" + +const ( + QueueAnalytics = "analytics" +) + +type EventType string + +const ( + EventTypeWorkViewed EventType = "work_viewed" + EventTypeWorkLiked EventType = "work_liked" + EventTypeWorkCommented EventType = "work_commented" + EventTypeWorkBookmarked EventType = "work_bookmarked" + EventTypeTranslationViewed EventType = "translation_viewed" + EventTypeTranslationLiked EventType = "translation_liked" + EventTypeTranslationCommented EventType = "translation_commented" +) + +type AnalyticsEvent struct { + EventType EventType `json:"event_type"` + WorkID *uint `json:"work_id,omitempty"` + TranslationID *uint `json:"translation_id,omitempty"` + UserID *uint `json:"user_id,omitempty"` + Timestamp time.Time `json:"timestamp"` +} diff --git a/internal/app/analytics/publisher.go b/internal/app/analytics/publisher.go new file mode 100644 index 0000000..2534040 --- /dev/null +++ b/internal/app/analytics/publisher.go @@ -0,0 +1,35 @@ +package analytics + +import ( + "context" + "encoding/json" + + "github.com/hibiken/asynq" +) + +type EventPublisher interface { + Publish(ctx context.Context, event AnalyticsEvent) error +} + +type AsynqClient interface { + EnqueueContext(ctx context.Context, task *asynq.Task, opts ...asynq.Option) (*asynq.TaskInfo, error) +} + +type asynqEventPublisher struct { + client AsynqClient +} + +func NewEventPublisher(client AsynqClient) EventPublisher { + return &asynqEventPublisher{client: client} +} + +func (p *asynqEventPublisher) Publish(ctx context.Context, event AnalyticsEvent) error { + payload, err := json.Marshal(event) + if err != nil { + return err + } + + task := asynq.NewTask(string(event.EventType), payload) + _, err = p.client.EnqueueContext(ctx, task, asynq.Queue(QueueAnalytics)) + return err +} diff --git a/internal/app/analytics/publisher_test.go b/internal/app/analytics/publisher_test.go new file mode 100644 index 0000000..3f9fb50 --- /dev/null +++ b/internal/app/analytics/publisher_test.go @@ -0,0 +1,54 @@ +package analytics_test + +import ( + "context" + "encoding/json" + "testing" + "tercul/internal/app/analytics" + "time" + + "github.com/hibiken/asynq" + "github.com/stretchr/testify/assert" +) + +type mockAsynqClient struct { + asynq.Client + enqueuedTasks []*asynq.Task +} + +func (m *mockAsynqClient) EnqueueContext(ctx context.Context, task *asynq.Task, opts ...asynq.Option) (*asynq.TaskInfo, error) { + m.enqueuedTasks = append(m.enqueuedTasks, task) + return &asynq.TaskInfo{}, nil +} + +func (m *mockAsynqClient) Close() error { + return nil +} + +func TestAsynqEventPublisher_Publish(t *testing.T) { + mockClient := &mockAsynqClient{} + publisher := analytics.NewEventPublisher(mockClient) + + workID := uint(123) + userID := uint(456) + event := analytics.AnalyticsEvent{ + EventType: analytics.EventTypeWorkLiked, + WorkID: &workID, + UserID: &userID, + Timestamp: time.Now(), + } + + err := publisher.Publish(context.Background(), event) + assert.NoError(t, err) + + assert.Len(t, mockClient.enqueuedTasks, 1) + task := mockClient.enqueuedTasks[0] + assert.Equal(t, string(analytics.EventTypeWorkLiked), task.Type()) + + var publishedEvent analytics.AnalyticsEvent + err = json.Unmarshal(task.Payload(), &publishedEvent) + assert.NoError(t, err) + assert.Equal(t, event.EventType, publishedEvent.EventType) + assert.Equal(t, *event.WorkID, *publishedEvent.WorkID) + assert.Equal(t, *event.UserID, *publishedEvent.UserID) +} diff --git a/internal/app/analytics/service.go b/internal/app/analytics/service.go index 87e1107..0a7e1b1 100644 --- a/internal/app/analytics/service.go +++ b/internal/app/analytics/service.go @@ -35,6 +35,7 @@ type Service interface { UpdateUserEngagement(ctx context.Context, userID uint, eventType string) error UpdateTrending(ctx context.Context) error GetTrendingWorks(ctx context.Context, timePeriod string, limit int) ([]*domain.Work, error) + GetPopularTranslations(ctx context.Context, workID uint, limit int) ([]*domain.Translation, error) } type service struct { @@ -255,6 +256,10 @@ func (s *service) GetTrendingWorks(ctx context.Context, timePeriod string, limit return s.repo.GetTrendingWorks(ctx, timePeriod, limit) } +func (s *service) GetPopularTranslations(ctx context.Context, workID uint, limit int) ([]*domain.Translation, error) { + return s.repo.GetPopularTranslations(ctx, workID, limit) +} + func (s *service) UpdateTrending(ctx context.Context) error { log.LogInfo("Updating trending works") diff --git a/internal/app/app.go b/internal/app/app.go index 6e5a2ed..1faf269 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -14,8 +14,9 @@ import ( // Application is a container for all the application-layer services. // It's used for dependency injection into the presentation layer (e.g., GraphQL resolvers). type Application struct { - AnalyticsService analytics.Service - AuthCommands *auth.AuthCommands + AnalyticsService analytics.Service + AnalyticsPublisher analytics.EventPublisher + AuthCommands *auth.AuthCommands AuthQueries *auth.AuthQueries CopyrightCommands *copyright.CopyrightCommands CopyrightQueries *copyright.CopyrightQueries diff --git a/internal/app/application_builder.go b/internal/app/application_builder.go index 8a18b6d..8d6742b 100644 --- a/internal/app/application_builder.go +++ b/internal/app/application_builder.go @@ -148,10 +148,12 @@ func (b *ApplicationBuilder) BuildApplication() error { analyticsRepo := sql.NewAnalyticsRepository(b.dbConn) analysisRepo := linguistics.NewGORMAnalysisRepository(b.dbConn) analyticsService := analytics.NewService(analyticsRepo, analysisRepo, translationRepo, workRepo, b.linguistics.GetSentimentProvider()) + analyticsPublisher := analytics.NewEventPublisher(b.asynqClient) b.App = &Application{ - AnalyticsService: analyticsService, - WorkCommands: workCommands, + AnalyticsService: analyticsService, + AnalyticsPublisher: analyticsPublisher, + WorkCommands: workCommands, WorkQueries: workQueries, AuthCommands: authCommands, AuthQueries: authQueries, diff --git a/internal/data/sql/analytics_repository.go b/internal/data/sql/analytics_repository.go index cd68058..4657f1f 100644 --- a/internal/data/sql/analytics_repository.go +++ b/internal/data/sql/analytics_repository.go @@ -56,6 +56,23 @@ func (r *analyticsRepository) IncrementWorkCounter(ctx context.Context, workID u }) } +func (r *analyticsRepository) GetPopularTranslations(ctx context.Context, workID uint, limit int) ([]*domain.Translation, error) { + var translations []*domain.Translation + + err := r.db.WithContext(ctx). + Joins("LEFT JOIN translation_stats ON translation_stats.translation_id = translations.id"). + Where("translations.translatable_id = ? AND translations.translatable_type = ?", workID, "Work"). + Order("translation_stats.views + (translation_stats.likes * 2) DESC"). + Limit(limit). + Find(&translations).Error + + if err != nil { + return nil, err + } + + return translations, nil +} + func (r *analyticsRepository) GetTrendingWorks(ctx context.Context, timePeriod string, limit int) ([]*domain.Work, error) { var trendingWorks []*domain.Trending err := r.db.WithContext(ctx). diff --git a/internal/domain/analytics.go b/internal/domain/analytics.go index 68ce2a9..5daba96 100644 --- a/internal/domain/analytics.go +++ b/internal/domain/analytics.go @@ -15,4 +15,5 @@ type AnalyticsRepository interface { UpdateUserEngagement(ctx context.Context, userEngagement *UserEngagement) error UpdateTrendingWorks(ctx context.Context, timePeriod string, trending []*Trending) error GetTrendingWorks(ctx context.Context, timePeriod string, limit int) ([]*Work, error) + GetPopularTranslations(ctx context.Context, workID uint, limit int) ([]*Translation, error) } diff --git a/internal/jobs/analytics/worker.go b/internal/jobs/analytics/worker.go new file mode 100644 index 0000000..6e3439a --- /dev/null +++ b/internal/jobs/analytics/worker.go @@ -0,0 +1,60 @@ +package analytics + +import ( + "context" + "encoding/json" + "fmt" + "tercul/internal/app/analytics" + + "github.com/hibiken/asynq" +) + +type Worker struct { + analyticsService analytics.Service +} + +func NewWorker(analyticsService analytics.Service) *Worker { + return &Worker{analyticsService: analyticsService} +} + +func (w *Worker) ProcessTask(ctx context.Context, t *asynq.Task) error { + var event analytics.AnalyticsEvent + if err := json.Unmarshal(t.Payload(), &event); err != nil { + return fmt.Errorf("failed to unmarshal analytics event: %w", err) + } + + switch event.EventType { + case analytics.EventTypeWorkViewed: + if event.WorkID != nil { + return w.analyticsService.IncrementWorkViews(ctx, *event.WorkID) + } + case analytics.EventTypeWorkLiked: + if event.WorkID != nil { + return w.analyticsService.IncrementWorkLikes(ctx, *event.WorkID) + } + case analytics.EventTypeWorkCommented: + if event.WorkID != nil { + return w.analyticsService.IncrementWorkComments(ctx, *event.WorkID) + } + case analytics.EventTypeWorkBookmarked: + if event.WorkID != nil { + return w.analyticsService.IncrementWorkBookmarks(ctx, *event.WorkID) + } + case analytics.EventTypeTranslationViewed: + if event.TranslationID != nil { + return w.analyticsService.IncrementTranslationViews(ctx, *event.TranslationID) + } + case analytics.EventTypeTranslationLiked: + if event.TranslationID != nil { + return w.analyticsService.IncrementTranslationLikes(ctx, *event.TranslationID) + } + case analytics.EventTypeTranslationCommented: + if event.TranslationID != nil { + return w.analyticsService.IncrementTranslationComments(ctx, *event.TranslationID) + } + default: + return fmt.Errorf("unknown analytics event type: %s", event.EventType) + } + + return nil +} diff --git a/internal/jobs/analytics/worker_test.go b/internal/jobs/analytics/worker_test.go new file mode 100644 index 0000000..944065e --- /dev/null +++ b/internal/jobs/analytics/worker_test.go @@ -0,0 +1,85 @@ +package analytics_test + +import ( + "context" + "encoding/json" + "testing" + "tercul/internal/app/analytics" + analytics_job "tercul/internal/jobs/analytics" + "tercul/internal/testutil" + "time" + + "github.com/hibiken/asynq" + "github.com/stretchr/testify/suite" +) + +type AnalyticsWorkerSuite struct { + testutil.IntegrationTestSuite + asynqClient *asynq.Client + asynqServer *asynq.Server +} + +func (s *AnalyticsWorkerSuite) SetupSuite() { + config := testutil.DefaultTestConfig() + s.IntegrationTestSuite.SetupSuite(config) + s.asynqClient = s.AsynqClient + s.asynqServer = asynq.NewServer( + asynq.RedisClientOpt{ + Addr: config.RedisAddr, + }, + asynq.Config{ + Queues: map[string]int{ + analytics.QueueAnalytics: 10, + }, + }, + ) +} + +func (s *AnalyticsWorkerSuite) TearDownSuite() { + s.asynqClient.Close() + s.asynqServer.Shutdown() + s.IntegrationTestSuite.TearDownSuite() +} + +func (s *AnalyticsWorkerSuite) TestAnalyticsWorker_ProcessTask() { + // Create worker and register handler + analyticsService := analytics.NewService(s.AnalyticsRepo, nil, nil, nil, nil) + worker := analytics_job.NewWorker(analyticsService) + mux := asynq.NewServeMux() + mux.HandleFunc(string(analytics.EventTypeWorkViewed), worker.ProcessTask) + + // Start the server in a goroutine + go func() { + if err := s.asynqServer.Run(mux); err != nil { + s.T().Logf("asynq server error: %v", err) + } + }() + time.Sleep(200 * time.Millisecond) // Give the server time to start + + // Create a test work + work := s.CreateTestWork("Test Work", "en", "content") + + // Enqueue a task + event := analytics.AnalyticsEvent{ + EventType: analytics.EventTypeWorkViewed, + WorkID: &work.ID, + } + payload, err := json.Marshal(event) + s.Require().NoError(err) + task := asynq.NewTask(string(event.EventType), payload) + _, err = s.asynqClient.Enqueue(task, asynq.Queue(analytics.QueueAnalytics)) + s.Require().NoError(err) + + // Wait for the worker to process the task + time.Sleep(500 * time.Millisecond) + + // Check the database + stats, err := s.AnalyticsRepo.GetOrCreateWorkStats(context.Background(), work.ID) + s.Require().NoError(err) + s.Equal(int64(1), stats.Views) +} + +func TestAnalyticsWorker(t *testing.T) { + testutil.SkipIfShort(t) + suite.Run(t, new(AnalyticsWorkerSuite)) +} diff --git a/internal/testutil/integration_test_utils.go b/internal/testutil/integration_test_utils.go index 5dffb98..d602a75 100644 --- a/internal/testutil/integration_test_utils.go +++ b/internal/testutil/integration_test_utils.go @@ -12,6 +12,7 @@ import ( "gorm.io/gorm" "gorm.io/gorm/logger" + "github.com/hibiken/asynq" graph "tercul/internal/adapters/graphql" "tercul/internal/app/auth" auth_platform "tercul/internal/platform/auth" @@ -32,6 +33,8 @@ type IntegrationTestSuite struct { suite.Suite App *app.Application DB *gorm.DB + AsynqClient *asynq.Client + Config *TestConfig WorkRepo domain.WorkRepository UserRepo domain.UserRepository AuthorRepo domain.AuthorRepository @@ -69,14 +72,20 @@ type TestConfig struct { UseInMemoryDB bool // If true, use SQLite in-memory, otherwise use mock repositories DBPath string // Path for SQLite file (only used if UseInMemoryDB is false) LogLevel logger.LogLevel + RedisAddr string } // DefaultTestConfig returns a default test configuration func DefaultTestConfig() *TestConfig { + redisAddr := os.Getenv("REDIS_ADDR") + if redisAddr == "" { + redisAddr = "localhost:6379" + } return &TestConfig{ UseInMemoryDB: true, DBPath: "", LogLevel: logger.Silent, + RedisAddr: redisAddr, } } @@ -85,12 +94,17 @@ func (s *IntegrationTestSuite) SetupSuite(config *TestConfig) { if config == nil { config = DefaultTestConfig() } + s.Config = config if config.UseInMemoryDB { s.setupInMemoryDB(config) } else { s.setupMockRepositories() } + + s.AsynqClient = asynq.NewClient(asynq.RedisClientOpt{ + Addr: config.RedisAddr, + }) s.setupServices() s.setupTestData() @@ -239,8 +253,10 @@ func (s *IntegrationTestSuite) setupServices() { monetizationCommands := monetization.NewMonetizationCommands(s.MonetizationRepo) monetizationQueries := monetization.NewMonetizationQueries(s.MonetizationRepo, s.WorkRepo, s.AuthorRepo, s.BookRepo, s.PublisherRepo, s.SourceRepo) + analyticsPublisher := analytics.NewEventPublisher(s.AsynqClient) s.App = &app.Application{ AnalyticsService: s.AnalyticsService, + AnalyticsPublisher: analyticsPublisher, WorkCommands: s.WorkCommands, WorkQueries: s.WorkQueries, AuthCommands: s.AuthCommands,