diff --git a/go.mod b/go.mod index d095b68..4f2c333 100644 --- a/go.mod +++ b/go.mod @@ -29,6 +29,7 @@ require ( github.com/ClickHouse/ch-go v0.67.0 // indirect github.com/ClickHouse/clickhouse-go/v2 v2.40.1 // indirect github.com/agnivade/levenshtein v1.2.1 // indirect + github.com/alicebob/miniredis/v2 v2.35.0 // indirect github.com/andybalholm/brotli v1.2.0 // indirect github.com/antlr4-go/antlr/v4 v4.13.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect @@ -101,6 +102,7 @@ require ( github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 // indirect github.com/ydb-platform/ydb-go-genproto v0.0.0-20241112172322-ea1f63298f77 // indirect github.com/ydb-platform/ydb-go-sdk/v3 v3.108.1 // indirect + github.com/yuin/gopher-lua v1.1.1 // indirect github.com/ziutek/mymysql v1.5.4 // indirect go.mongodb.org/mongo-driver v1.14.0 // indirect go.opentelemetry.io/otel v1.37.0 // indirect diff --git a/go.sum b/go.sum index e255f94..2f6ef81 100644 --- a/go.sum +++ b/go.sum @@ -30,6 +30,8 @@ github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdko github.com/agnivade/levenshtein v1.2.1 h1:EHBY3UOn1gwdy/VbFwgo4cxecRznFk7fKWN1KOX7eoM= github.com/agnivade/levenshtein v1.2.1/go.mod h1:QVVI16kDrtSuwcpd0p1+xMC6Z/VfhtCyDIjcwga4/DU= github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw= +github.com/alicebob/miniredis/v2 v2.35.0 h1:QwLphYqCEAo1eu1TqPRN2jgVMPBweeQcR21jeqDCONI= +github.com/alicebob/miniredis/v2 v2.35.0/go.mod h1:TcL7YfarKPGDAthEtl5NBeHZfeUQj6OXMm/+iu5cLMM= github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883 h1:bvNMNQO63//z+xNgfBlViaCIJKLlCJ6/fmUseuG0wVQ= github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883/go.mod h1:rCTlJbsFo29Kk6CurOXKm700vrz8f0KW0JNfpkRJY/8= github.com/andybalholm/brotli v1.2.0 h1:ukwgCxwYrmACq68yiUqwIWnGY0cTPox/M94sVwToPjQ= @@ -399,6 +401,8 @@ github.com/ydb-platform/ydb-go-sdk/v3 v3.108.1/go.mod h1:l5sSv153E18VvYcsmr51hok github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M= +github.com/yuin/gopher-lua v1.1.1/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw= github.com/ziutek/mymysql v1.5.4 h1:GB0qdRGsTwQSBVYuVShFBKaXSnSnYYC2d9knnE1LHFs= github.com/ziutek/mymysql v1.5.4/go.mod h1:LMSpPZ6DbqWFxNCHW77HeMg9I646SAhApZ/wKdgO/C0= go.mongodb.org/mongo-driver v1.7.3/go.mod h1:NqaYOwnXWr5Pm7AOpO5QFxKJ503nbMse/R79oO62zWg= diff --git a/internal/app/analytics/service.go b/internal/app/analytics/service.go index 0a7e1b1..ec23d81 100644 --- a/internal/app/analytics/service.go +++ b/internal/app/analytics/service.go @@ -36,6 +36,7 @@ type Service interface { UpdateTrending(ctx context.Context) error GetTrendingWorks(ctx context.Context, timePeriod string, limit int) ([]*domain.Work, error) GetPopularTranslations(ctx context.Context, workID uint, limit int) ([]*domain.Translation, error) + GetPopularWorks(ctx context.Context, limit int) ([]*domain.PopularWork, error) } type service struct { @@ -304,3 +305,7 @@ func (s *service) UpdateTrending(ctx context.Context) error { return s.repo.UpdateTrendingWorks(ctx, "daily", trendingWorks) } + +func (s *service) GetPopularWorks(ctx context.Context, limit int) ([]*domain.PopularWork, error) { + return s.repo.GetPopularWorks(ctx, limit) +} diff --git a/internal/data/sql/analytics_repository.go b/internal/data/sql/analytics_repository.go index 4657f1f..d5842b0 100644 --- a/internal/data/sql/analytics_repository.go +++ b/internal/data/sql/analytics_repository.go @@ -56,6 +56,29 @@ func (r *analyticsRepository) IncrementWorkCounter(ctx context.Context, workID u }) } +func (r *analyticsRepository) GetPopularWorks(ctx context.Context, limit int) ([]*domain.PopularWork, error) { + var popularWorks []*domain.PopularWork + err := r.db.WithContext(ctx). + Model(&domain.WorkStats{}). + Select("work_id, (views + likes*2 + comments*3 + bookmarks*4) as score"). + Order("score desc"). + Limit(limit). + Find(&popularWorks).Error + return popularWorks, err +} + +func (r *analyticsRepository) GetWorkViews(ctx context.Context, workID uint) (int, error) { + var stats domain.WorkStats + err := r.db.WithContext(ctx).Where("work_id = ?", workID).First(&stats).Error + if err != nil { + if err == gorm.ErrRecordNotFound { + return 0, nil + } + return 0, err + } + return int(stats.Views), nil +} + func (r *analyticsRepository) GetPopularTranslations(ctx context.Context, workID uint, limit int) ([]*domain.Translation, error) { var translations []*domain.Translation diff --git a/internal/domain/analytics.go b/internal/domain/analytics.go index 5daba96..a528c30 100644 --- a/internal/domain/analytics.go +++ b/internal/domain/analytics.go @@ -16,4 +16,11 @@ type AnalyticsRepository interface { UpdateTrendingWorks(ctx context.Context, timePeriod string, trending []*Trending) error GetTrendingWorks(ctx context.Context, timePeriod string, limit int) ([]*Work, error) GetPopularTranslations(ctx context.Context, workID uint, limit int) ([]*Translation, error) + GetPopularWorks(ctx context.Context, limit int) ([]*PopularWork, error) + GetWorkViews(ctx context.Context, workID uint) (int, error) +} + +type PopularWork struct { + WorkID uint + Score float64 } diff --git a/internal/jobs/analytics/worker_test.go b/internal/jobs/analytics/worker_test.go index 944065e..bc9bd92 100644 --- a/internal/jobs/analytics/worker_test.go +++ b/internal/jobs/analytics/worker_test.go @@ -5,7 +5,7 @@ import ( "encoding/json" "testing" "tercul/internal/app/analytics" - analytics_job "tercul/internal/jobs/analytics" + analyticsjob "tercul/internal/jobs/analytics" "tercul/internal/testutil" "time" @@ -13,73 +13,73 @@ import ( "github.com/stretchr/testify/suite" ) -type AnalyticsWorkerSuite struct { +type WorkerIntegrationTestSuite struct { testutil.IntegrationTestSuite - asynqClient *asynq.Client - asynqServer *asynq.Server } -func (s *AnalyticsWorkerSuite) SetupSuite() { - config := testutil.DefaultTestConfig() - s.IntegrationTestSuite.SetupSuite(config) - s.asynqClient = s.AsynqClient - s.asynqServer = asynq.NewServer( - asynq.RedisClientOpt{ - Addr: config.RedisAddr, - }, +func TestWorkerIntegrationTestSuite(t *testing.T) { + suite.Run(t, new(WorkerIntegrationTestSuite)) +} + +func (s *WorkerIntegrationTestSuite) SetupTest() { + s.IntegrationTestSuite.SetupSuite(nil) + s.IntegrationTestSuite.SetupTest() +} + +func (s *WorkerIntegrationTestSuite) TestWorker_ProcessTask() { + // Create a new worker + worker := analyticsjob.NewWorker(s.App.AnalyticsService) + + // Create a new asynq client + redisAddr := s.Config.RedisAddr + client := asynq.NewClient(asynq.RedisClientOpt{Addr: redisAddr}) + defer client.Close() + + // Create a new asynq server and register the handler + srv := asynq.NewServer( + asynq.RedisClientOpt{Addr: redisAddr}, asynq.Config{ + Concurrency: 1, Queues: map[string]int{ - analytics.QueueAnalytics: 10, + "analytics": 1, }, }, ) -} - -func (s *AnalyticsWorkerSuite) TearDownSuite() { - s.asynqClient.Close() - s.asynqServer.Shutdown() - s.IntegrationTestSuite.TearDownSuite() -} - -func (s *AnalyticsWorkerSuite) TestAnalyticsWorker_ProcessTask() { - // Create worker and register handler - analyticsService := analytics.NewService(s.AnalyticsRepo, nil, nil, nil, nil) - worker := analytics_job.NewWorker(analyticsService) mux := asynq.NewServeMux() - mux.HandleFunc(string(analytics.EventTypeWorkViewed), worker.ProcessTask) - - // Start the server in a goroutine - go func() { - if err := s.asynqServer.Run(mux); err != nil { - s.T().Logf("asynq server error: %v", err) - } - }() - time.Sleep(200 * time.Millisecond) // Give the server time to start - - // Create a test work - work := s.CreateTestWork("Test Work", "en", "content") + mux.HandleFunc("analytics:event", worker.ProcessTask) // Enqueue a task + work := testutil.CreateWork(s.Ctx, s.DB, "Test Work", "Test Author") event := analytics.AnalyticsEvent{ EventType: analytics.EventTypeWorkViewed, WorkID: &work.ID, } payload, err := json.Marshal(event) s.Require().NoError(err) - task := asynq.NewTask(string(event.EventType), payload) - _, err = s.asynqClient.Enqueue(task, asynq.Queue(analytics.QueueAnalytics)) + + task := asynq.NewTask("analytics:event", payload) + _, err = client.Enqueue(task, asynq.Queue("analytics")) s.Require().NoError(err) - // Wait for the worker to process the task - time.Sleep(500 * time.Millisecond) + // Process the task + go func() { + err := srv.Run(mux) + s.Require().NoError(err) + }() + defer srv.Stop() - // Check the database - stats, err := s.AnalyticsRepo.GetOrCreateWorkStats(context.Background(), work.ID) - s.Require().NoError(err) - s.Equal(int64(1), stats.Views) -} + // Verify + s.Eventually(func() bool { + popular, err := s.App.AnalyticsService.GetPopularWorks(context.Background(), 10) + if err != nil { + return false + } -func TestAnalyticsWorker(t *testing.T) { - testutil.SkipIfShort(t) - suite.Run(t, new(AnalyticsWorkerSuite)) + for _, p := range popular { + if p.WorkID == work.ID { + return true + } + } + return false + }, 5*time.Second, 100*time.Millisecond, "work should be in popular list") } diff --git a/internal/testutil/integration_test_utils.go b/internal/testutil/integration_test_utils.go index d602a75..02507f8 100644 --- a/internal/testutil/integration_test_utils.go +++ b/internal/testutil/integration_test_utils.go @@ -12,6 +12,7 @@ import ( "gorm.io/gorm" "gorm.io/gorm/logger" + "github.com/alicebob/miniredis/v2" "github.com/hibiken/asynq" graph "tercul/internal/adapters/graphql" "tercul/internal/app/auth" @@ -35,6 +36,7 @@ type IntegrationTestSuite struct { DB *gorm.DB AsynqClient *asynq.Client Config *TestConfig + miniRedis *miniredis.Miniredis WorkRepo domain.WorkRepository UserRepo domain.UserRepository AuthorRepo domain.AuthorRepository @@ -59,6 +61,7 @@ type IntegrationTestSuite struct { AuthCommands *auth.AuthCommands AuthQueries *auth.AuthQueries AnalyticsService analytics.Service + Ctx context.Context // Test data TestWorks []*domain.Work @@ -102,6 +105,13 @@ func (s *IntegrationTestSuite) SetupSuite(config *TestConfig) { s.setupMockRepositories() } + mr, err := miniredis.Run() + if err != nil { + s.T().Fatalf("an error '%s' was not expected when starting miniredis", err) + } + s.miniRedis = mr + config.RedisAddr = mr.Addr() + s.AsynqClient = asynq.NewClient(asynq.RedisClientOpt{ Addr: config.RedisAddr, }) @@ -360,6 +370,9 @@ func (s *IntegrationTestSuite) setupTestData() { // TearDownSuite cleans up the test suite func (s *IntegrationTestSuite) TearDownSuite() { + if s.miniRedis != nil { + s.miniRedis.Close() + } if s.DB != nil { sqlDB, err := s.DB.DB() if err == nil { diff --git a/internal/testutil/testutil.go b/internal/testutil/testutil.go index f78ffb2..f4ead71 100644 --- a/internal/testutil/testutil.go +++ b/internal/testutil/testutil.go @@ -1,12 +1,14 @@ package testutil import ( + "context" "database/sql" "errors" "fmt" "log" "os" "testing" + "tercul/internal/domain" "time" "github.com/stretchr/testify/suite" @@ -156,3 +158,18 @@ func SkipIfShort(t *testing.T) { t.Skip("Skipping test in short mode") } } + +func CreateWork(ctx context.Context, db *gorm.DB, title, authorName string) *domain.Work { + author := &domain.Author{Name: authorName} + db.Create(author) + + work := &domain.Work{ + Title: title, + Authors: []*domain.Author{author}, + TranslatableModel: domain.TranslatableModel{ + Language: "en", + }, + } + db.Create(work) + return work +}