turash/bugulma/backend/internal/service/redis_event_bus.go
Damir Mukimov 000eab4740
Major repository reorganization and missing backend endpoints implementation
Repository Structure:
- Move files from cluttered root directory into organized structure
- Create archive/ for archived data and scraper results
- Create bugulma/ for the complete application (frontend + backend)
- Create data/ for sample datasets and reference materials
- Create docs/ for comprehensive documentation structure
- Create scripts/ for utility scripts and API tools

Backend Implementation:
- Implement 3 missing backend endpoints identified in gap analysis:
  * GET /api/v1/organizations/{id}/matching/direct - Direct symbiosis matches
  * GET /api/v1/users/me/organizations - User organizations
  * POST /api/v1/proposals/{id}/status - Update proposal status
- Add complete proposal domain model, repository, and service layers
- Create database migration for proposals table
- Fix CLI server command registration issue

API Documentation:
- Add comprehensive proposals.md API documentation
- Update README.md with Users and Proposals API sections
- Document all request/response formats, error codes, and business rules

Code Quality:
- Follow existing Go backend architecture patterns
- Add proper error handling and validation
- Match frontend expected response schemas
- Maintain clean separation of concerns (handler -> service -> repository)
2025-11-25 06:01:16 +01:00

155 lines
4.0 KiB
Go

package service
import (
"bugulma/backend/internal/domain"
"context"
"encoding/json"
"fmt"
"log"
"strings"
"time"
"github.com/go-redis/redis/v8"
)
// RedisEventBus implements domain.EventBus using Redis Streams
type RedisEventBus struct {
client *redis.Client
handlers map[string]func(domain.Event) error
ctx context.Context
cancel context.CancelFunc
}
// NewRedisEventBus creates a new Redis-based event bus
func NewRedisEventBus(redisURL string) (*RedisEventBus, error) {
opt, err := redis.ParseURL(redisURL)
if err != nil {
return nil, fmt.Errorf("failed to parse Redis URL: %w", err)
}
client := redis.NewClient(opt)
// Test connection
ctx := context.Background()
if err := client.Ping(ctx).Err(); err != nil {
return nil, fmt.Errorf("failed to connect to Redis: %w", err)
}
ctx, cancel := context.WithCancel(context.Background())
return &RedisEventBus{
client: client,
handlers: make(map[string]func(domain.Event) error),
ctx: ctx,
cancel: cancel,
}, nil
}
// Publish sends an event to the event bus
func (eb *RedisEventBus) Publish(event domain.Event) error {
eventData, err := json.Marshal(event)
if err != nil {
return fmt.Errorf("failed to marshal event: %w", err)
}
streamKey := fmt.Sprintf("events:%s", strings.ReplaceAll(string(event.Type), ".", ":"))
args := &redis.XAddArgs{
Stream: streamKey,
ID: "*", // Let Redis generate ID
Values: map[string]interface{}{
"event": string(eventData),
},
}
if err := eb.client.XAdd(eb.ctx, args).Err(); err != nil {
return fmt.Errorf("failed to publish event to Redis: %w", err)
}
log.Printf("Published event: %s for entity %s", event.Type, event.EntityID)
return nil
}
// Subscribe registers an event handler for events matching the pattern
func (eb *RedisEventBus) Subscribe(pattern string, handler func(domain.Event) error) error {
eb.handlers[pattern] = handler
// Convert pattern to Redis stream key pattern
streamPattern := fmt.Sprintf("events:%s", strings.ReplaceAll(pattern, ".", ":"))
// Start consumer goroutine
go eb.consumeStream(streamPattern, handler)
return nil
}
// consumeStream continuously consumes events from a Redis stream
func (eb *RedisEventBus) consumeStream(streamKey string, handler func(domain.Event) error) {
consumerGroup := "matching-service"
consumerName := fmt.Sprintf("consumer-%d", time.Now().Unix())
// Create consumer group if it doesn't exist
eb.client.XGroupCreate(eb.ctx, streamKey, consumerGroup, "0").Err()
// Ignore error if group already exists
for {
select {
case <-eb.ctx.Done():
return
default:
// Read from stream
streams, err := eb.client.XReadGroup(eb.ctx, &redis.XReadGroupArgs{
Group: consumerGroup,
Consumer: consumerName,
Streams: []string{streamKey, ">"},
Count: 10,
Block: time.Second * 5,
}).Result()
if err != nil && err != redis.Nil {
log.Printf("Error reading from stream %s: %v", streamKey, err)
time.Sleep(time.Second)
continue
}
// Process messages
for _, stream := range streams {
for _, message := range stream.Messages {
if err := eb.processMessage(stream.Stream, message, handler); err != nil {
log.Printf("Error processing message: %v", err)
}
// Acknowledge message
eb.client.XAck(eb.ctx, stream.Stream, consumerGroup, message.ID)
}
}
}
}
}
// processMessage unmarshals and handles an event message
func (eb *RedisEventBus) processMessage(streamKey string, message redis.XMessage, handler func(domain.Event) error) error {
eventData, ok := message.Values["event"].(string)
if !ok {
return fmt.Errorf("invalid event data in message")
}
var event domain.Event
if err := json.Unmarshal([]byte(eventData), &event); err != nil {
return fmt.Errorf("failed to unmarshal event: %w", err)
}
// Call the handler
if err := handler(event); err != nil {
return fmt.Errorf("event handler failed: %w", err)
}
return nil
}
// Close shuts down the event bus
func (eb *RedisEventBus) Close() error {
eb.cancel()
return eb.client.Close()
}