package service

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"log"
	"strings"
	"time"

	"bugulma/backend/internal/domain"

	"github.com/go-redis/redis/v8"
)

// RedisEventBus implements domain.EventBus using Redis Streams.
//
// Every event type is published to its own stream, keyed "events:<type>"
// with dots in the type replaced by colons. Each subscription runs one
// consumer goroutine that lives until Close is called.
type RedisEventBus struct {
	client   *redis.Client
	handlers map[string]func(domain.Event) error
	// ctx is cancelled by Close and is what stops the consumer goroutines.
	ctx    context.Context
	cancel context.CancelFunc
}

// NewRedisEventBus creates a new Redis-based event bus.
//
// redisURL is parsed with redis.ParseURL and the connection is verified with
// a PING before the bus is returned. An error is returned when the URL cannot
// be parsed or the server cannot be reached.
func NewRedisEventBus(redisURL string) (*RedisEventBus, error) {
	opt, err := redis.ParseURL(redisURL)
	if err != nil {
		return nil, fmt.Errorf("failed to parse Redis URL: %w", err)
	}

	client := redis.NewClient(opt)

	// Test connection.
	if err := client.Ping(context.Background()).Err(); err != nil {
		// Release the client's resources; the caller never receives the
		// client, so nobody else could close it. The Close error is
		// irrelevant next to the connection failure being reported.
		_ = client.Close()
		return nil, fmt.Errorf("failed to connect to Redis: %w", err)
	}

	ctx, cancel := context.WithCancel(context.Background())

	return &RedisEventBus{
		client:   client,
		handlers: make(map[string]func(domain.Event) error),
		ctx:      ctx,
		cancel:   cancel,
	}, nil
}

// Publish sends an event to the event bus.
//
// The event is JSON-encoded and appended, under the "event" field, to the
// stream derived from event.Type. An error is returned when encoding or the
// XADD call fails.
func (eb *RedisEventBus) Publish(event domain.Event) error {
	eventData, err := json.Marshal(event)
	if err != nil {
		return fmt.Errorf("failed to marshal event: %w", err)
	}

	streamKey := fmt.Sprintf("events:%s", strings.ReplaceAll(string(event.Type), ".", ":"))

	args := &redis.XAddArgs{
		Stream: streamKey,
		ID:     "*", // Let Redis generate the ID.
		Values: map[string]interface{}{
			"event": string(eventData),
		},
	}

	if err := eb.client.XAdd(eb.ctx, args).Err(); err != nil {
		return fmt.Errorf("failed to publish event to Redis: %w", err)
	}

	log.Printf("Published event: %s for entity %s", event.Type, event.EntityID)
	return nil
}

// Subscribe registers an event handler for events matching the pattern.
//
// The pattern is turned into a single stream key by replacing dots with
// colons; this code performs no wildcard expansion, so the pattern must name
// one concrete event type. A consumer goroutine is started for the stream and
// keeps running until Close is called.
//
// Subscribe writes to the handler map without synchronisation, so it must not
// be called concurrently with itself. It returns an error when handler is nil.
func (eb *RedisEventBus) Subscribe(pattern string, handler func(domain.Event) error) error {
	if handler == nil {
		// A nil handler would otherwise panic inside the consumer goroutine
		// on the first delivered message.
		return fmt.Errorf("event handler for pattern %q is nil", pattern)
	}

	eb.handlers[pattern] = handler

	// Convert pattern to the Redis stream key.
	streamKey := fmt.Sprintf("events:%s", strings.ReplaceAll(pattern, ".", ":"))

	// Start consumer goroutine.
	go eb.consumeStream(streamKey, handler)

	return nil
}

// ensureConsumerGroup creates the consumer group on streamKey, creating the
// stream itself when it does not exist yet. An already existing group is not
// treated as an error.
func (eb *RedisEventBus) ensureConsumerGroup(streamKey, group string) error {
	err := eb.client.XGroupCreateMkStream(eb.ctx, streamKey, group, "0").Err()
	if err != nil && !strings.HasPrefix(err.Error(), "BUSYGROUP") {
		return err
	}
	return nil
}

// consumeStream continuously consumes events from a Redis stream and passes
// them to handler until the bus context is cancelled.
//
// Messages are acknowledged even when the handler fails, so a failed event is
// logged but not redelivered.
func (eb *RedisEventBus) consumeStream(streamKey string, handler func(domain.Event) error) {
	const consumerGroup = "matching-service"
	// Nanosecond resolution keeps consumers that start within the same
	// second from sharing a name.
	consumerName := fmt.Sprintf("consumer-%d", time.Now().UnixNano())

	if err := eb.ensureConsumerGroup(streamKey, consumerGroup); err != nil {
		log.Printf("Error creating consumer group %s on stream %s: %v", consumerGroup, streamKey, err)
	}

	for eb.ctx.Err() == nil {
		streams, err := eb.client.XReadGroup(eb.ctx, &redis.XReadGroupArgs{
			Group:    consumerGroup,
			Consumer: consumerName,
			Streams:  []string{streamKey, ">"},
			Count:    10,
			Block:    time.Second * 5,
		}).Result()
		// redis.Nil only means the blocking read timed out without messages.
		if err != nil && !errors.Is(err, redis.Nil) {
			if eb.ctx.Err() != nil {
				// Shutting down: the error is a consequence of Close.
				return
			}
			log.Printf("Error reading from stream %s: %v", streamKey, err)

			// The group can be missing if its creation failed earlier or the
			// stream was deleted; try to restore it before the next read.
			if strings.HasPrefix(err.Error(), "NOGROUP") {
				if gerr := eb.ensureConsumerGroup(streamKey, consumerGroup); gerr != nil {
					log.Printf("Error creating consumer group %s on stream %s: %v", consumerGroup, streamKey, gerr)
				}
			}

			// Back off before retrying, but wake up immediately on Close.
			select {
			case <-eb.ctx.Done():
				return
			case <-time.After(time.Second):
			}
			continue
		}

		// Process messages.
		for _, stream := range streams {
			for _, message := range stream.Messages {
				if err := eb.processMessage(stream.Stream, message, handler); err != nil {
					log.Printf("Error processing message: %v", err)
				}

				// Acknowledge message.
				if err := eb.client.XAck(eb.ctx, stream.Stream, consumerGroup, message.ID).Err(); err != nil {
					log.Printf("Error acknowledging message %s on stream %s: %v", message.ID, stream.Stream, err)
				}
			}
		}
	}
}

// processMessage unmarshals and handles an event message.
//
// It expects the message to carry the JSON-encoded event as a string in the
// "event" field. An error is returned when that field is missing or not a
// string, when decoding fails, or when the handler returns an error.
// streamKey is currently unused.
func (eb *RedisEventBus) processMessage(streamKey string, message redis.XMessage, handler func(domain.Event) error) error {
	eventData, ok := message.Values["event"].(string)
	if !ok {
		return fmt.Errorf("invalid event data in message")
	}

	var event domain.Event
	if err := json.Unmarshal([]byte(eventData), &event); err != nil {
		return fmt.Errorf("failed to unmarshal event: %w", err)
	}

	// Call the handler.
	if err := handler(event); err != nil {
		return fmt.Errorf("event handler failed: %w", err)
	}

	return nil
}

// Close shuts down the event bus.
//
// It cancels the bus context, which signals every consumer goroutine to stop,
// and then closes the Redis client. It does not wait for the goroutines to
// finish.
func (eb *RedisEventBus) Close() error {
	eb.cancel()
	return eb.client.Close()
}