mirror of
https://github.com/SamyRai/turash.git
synced 2025-12-26 23:01:33 +00:00
Repository Structure:
- Move files from cluttered root directory into organized structure
- Create archive/ for archived data and scraper results
- Create bugulma/ for the complete application (frontend + backend)
- Create data/ for sample datasets and reference materials
- Create docs/ for comprehensive documentation structure
- Create scripts/ for utility scripts and API tools
Backend Implementation:
- Implement 3 missing backend endpoints identified in gap analysis:
* GET /api/v1/organizations/{id}/matching/direct - Direct symbiosis matches
* GET /api/v1/users/me/organizations - User organizations
* POST /api/v1/proposals/{id}/status - Update proposal status
- Add complete proposal domain model, repository, and service layers
- Create database migration for proposals table
- Fix CLI server command registration issue
API Documentation:
- Add comprehensive proposals.md API documentation
- Update README.md with Users and Proposals API sections
- Document all request/response formats, error codes, and business rules
Code Quality:
- Follow existing Go backend architecture patterns
- Add proper error handling and validation
- Match frontend expected response schemas
- Maintain clean separation of concerns (handler -> service -> repository)
155 lines
4.0 KiB
Go
155 lines
4.0 KiB
Go
package service
|
|
|
|
import (
|
|
"bugulma/backend/internal/domain"
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"log"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/go-redis/redis/v8"
|
|
)
|
|
|
|
// RedisEventBus implements domain.EventBus using Redis Streams
|
|
type RedisEventBus struct {
|
|
client *redis.Client
|
|
handlers map[string]func(domain.Event) error
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
}
|
|
|
|
// NewRedisEventBus creates a new Redis-based event bus
|
|
func NewRedisEventBus(redisURL string) (*RedisEventBus, error) {
|
|
opt, err := redis.ParseURL(redisURL)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to parse Redis URL: %w", err)
|
|
}
|
|
|
|
client := redis.NewClient(opt)
|
|
|
|
// Test connection
|
|
ctx := context.Background()
|
|
if err := client.Ping(ctx).Err(); err != nil {
|
|
return nil, fmt.Errorf("failed to connect to Redis: %w", err)
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
return &RedisEventBus{
|
|
client: client,
|
|
handlers: make(map[string]func(domain.Event) error),
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
}, nil
|
|
}
|
|
|
|
// Publish sends an event to the event bus
|
|
func (eb *RedisEventBus) Publish(event domain.Event) error {
|
|
eventData, err := json.Marshal(event)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to marshal event: %w", err)
|
|
}
|
|
|
|
streamKey := fmt.Sprintf("events:%s", strings.ReplaceAll(string(event.Type), ".", ":"))
|
|
|
|
args := &redis.XAddArgs{
|
|
Stream: streamKey,
|
|
ID: "*", // Let Redis generate ID
|
|
Values: map[string]interface{}{
|
|
"event": string(eventData),
|
|
},
|
|
}
|
|
|
|
if err := eb.client.XAdd(eb.ctx, args).Err(); err != nil {
|
|
return fmt.Errorf("failed to publish event to Redis: %w", err)
|
|
}
|
|
|
|
log.Printf("Published event: %s for entity %s", event.Type, event.EntityID)
|
|
return nil
|
|
}
|
|
|
|
// Subscribe registers an event handler for events matching the pattern
|
|
func (eb *RedisEventBus) Subscribe(pattern string, handler func(domain.Event) error) error {
|
|
eb.handlers[pattern] = handler
|
|
|
|
// Convert pattern to Redis stream key pattern
|
|
streamPattern := fmt.Sprintf("events:%s", strings.ReplaceAll(pattern, ".", ":"))
|
|
|
|
// Start consumer goroutine
|
|
go eb.consumeStream(streamPattern, handler)
|
|
|
|
return nil
|
|
}
|
|
|
|
// consumeStream continuously consumes events from a Redis stream
|
|
func (eb *RedisEventBus) consumeStream(streamKey string, handler func(domain.Event) error) {
|
|
consumerGroup := "matching-service"
|
|
consumerName := fmt.Sprintf("consumer-%d", time.Now().Unix())
|
|
|
|
// Create consumer group if it doesn't exist
|
|
eb.client.XGroupCreate(eb.ctx, streamKey, consumerGroup, "0").Err()
|
|
// Ignore error if group already exists
|
|
|
|
for {
|
|
select {
|
|
case <-eb.ctx.Done():
|
|
return
|
|
default:
|
|
// Read from stream
|
|
streams, err := eb.client.XReadGroup(eb.ctx, &redis.XReadGroupArgs{
|
|
Group: consumerGroup,
|
|
Consumer: consumerName,
|
|
Streams: []string{streamKey, ">"},
|
|
Count: 10,
|
|
Block: time.Second * 5,
|
|
}).Result()
|
|
|
|
if err != nil && err != redis.Nil {
|
|
log.Printf("Error reading from stream %s: %v", streamKey, err)
|
|
time.Sleep(time.Second)
|
|
continue
|
|
}
|
|
|
|
// Process messages
|
|
for _, stream := range streams {
|
|
for _, message := range stream.Messages {
|
|
if err := eb.processMessage(stream.Stream, message, handler); err != nil {
|
|
log.Printf("Error processing message: %v", err)
|
|
}
|
|
|
|
// Acknowledge message
|
|
eb.client.XAck(eb.ctx, stream.Stream, consumerGroup, message.ID)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// processMessage unmarshals and handles an event message
|
|
func (eb *RedisEventBus) processMessage(streamKey string, message redis.XMessage, handler func(domain.Event) error) error {
|
|
eventData, ok := message.Values["event"].(string)
|
|
if !ok {
|
|
return fmt.Errorf("invalid event data in message")
|
|
}
|
|
|
|
var event domain.Event
|
|
if err := json.Unmarshal([]byte(eventData), &event); err != nil {
|
|
return fmt.Errorf("failed to unmarshal event: %w", err)
|
|
}
|
|
|
|
// Call the handler
|
|
if err := handler(event); err != nil {
|
|
return fmt.Errorf("event handler failed: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Close shuts down the event bus
|
|
func (eb *RedisEventBus) Close() error {
|
|
eb.cancel()
|
|
return eb.client.Close()
|
|
}
|