package service

import (
	"context"
	"errors"
	"fmt"
	"log"
	"time"

	"bugulma/backend/internal/domain"
	"bugulma/backend/internal/matching"
	"bugulma/backend/internal/matching/engine"
)

// EventDrivenMatchingService provides real-time match updates via an
// event-driven architecture. It subscribes to resource flow, organization and
// match events on the event bus, creates / cancels / recalculates matches in
// response, and pushes notifications to WebSocket clients.
type EventDrivenMatchingService struct {
	eventBus         domain.EventBus               // source of domain events; may be nil (event processing is then skipped)
	matchingService  *matching.Service             // finds candidates, creates matches and updates match status
	resourceFlowRepo domain.ResourceFlowRepository // loads the resource flow referenced by an event
	matchRepo        domain.MatchRepository        // looks up matches by resource flow or organization
	websocketHub     *WebSocketHub                 // broadcast channel to clients; may be nil (notifications are then skipped)
	incrementalCalc  *IncrementalMatchCalculator   // recalculates individual matches after data changes
}

// NewEventDrivenMatchingService creates a new event-driven matching service.
// The economicService is only used to build the internal
// IncrementalMatchCalculator.
func NewEventDrivenMatchingService(
	eventBus domain.EventBus,
	matchingService *matching.Service,
	resourceFlowRepo domain.ResourceFlowRepository,
	matchRepo domain.MatchRepository,
	websocketHub *WebSocketHub,
	economicService *EconomicService,
) *EventDrivenMatchingService {
	return &EventDrivenMatchingService{
		eventBus:         eventBus,
		matchingService:  matchingService,
		resourceFlowRepo: resourceFlowRepo,
		matchRepo:        matchRepo,
		websocketHub:     websocketHub,
		incrementalCalc: NewIncrementalMatchCalculator(
			matchingService,
			matchRepo,
			economicService,
		),
	}
}

// StartEventProcessing begins listening for events that affect matches.
//
// It subscribes to the "resource_flow.*", "organization.*" and "match.*"
// topics. The supplied ctx is captured by the resource flow and organization
// handlers and used for every call they make, so it should live as long as
// event processing is expected to run.
//
// If no event bus is configured the call is a no-op and returns nil. A
// subscription failure is returned wrapped.
func (edms *EventDrivenMatchingService) StartEventProcessing(ctx context.Context) error {
	if edms.eventBus == nil {
		log.Println("Event bus not available, skipping event processing")
		return nil
	}

	// Subscribe to resource flow events
	if err := edms.eventBus.Subscribe("resource_flow.*", func(event domain.Event) error {
		return edms.handleResourceFlowEvent(ctx, event)
	}); err != nil {
		return fmt.Errorf("failed to subscribe to resource flow events: %w", err)
	}

	// Subscribe to organization events
	if err := edms.eventBus.Subscribe("organization.*", func(event domain.Event) error {
		return edms.handleOrganizationEvent(ctx, event)
	}); err != nil {
		return fmt.Errorf("failed to subscribe to organization events: %w", err)
	}

	// Subscribe to match events
	if err := edms.eventBus.Subscribe("match.*", edms.handleMatchEvent); err != nil {
		return fmt.Errorf("failed to subscribe to match events: %w", err)
	}

	log.Println("Event-driven matching service started")
	return nil
}

// handleResourceFlowEvent dispatches a resource flow event to the handler for
// its type (created / updated / deleted). Unknown types are logged and
// ignored.
func (edms *EventDrivenMatchingService) handleResourceFlowEvent(ctx context.Context, event domain.Event) error {
	log.Printf("Processing resource flow event: %s for entity %s", event.Type, event.EntityID)

	switch event.Type {
	case domain.EventTypeResourceFlowCreated:
		return edms.handleResourceFlowCreated(ctx, event)
	case domain.EventTypeResourceFlowUpdated:
		return edms.handleResourceFlowUpdated(ctx, event)
	case domain.EventTypeResourceFlowDeleted:
		return edms.handleResourceFlowDeleted(ctx, event)
	default:
		log.Printf("Unhandled resource flow event type: %s", event.Type)
		return nil
	}
}

// handleOrganizationEvent processes organization changes. Every match that
// belongs to the organization identified by event.EntityID is treated as
// affected: clients are notified and a recalculation is scheduled.
func (edms *EventDrivenMatchingService) handleOrganizationEvent(ctx context.Context, event domain.Event) error {
	log.Printf("Processing organization event: %s for entity %s", event.Type, event.EntityID)

	affectedMatches, err := edms.findMatchesByOrganization(ctx, event.EntityID)
	if err != nil {
		return fmt.Errorf("failed to find matches for organization: %w", err)
	}

	return edms.processAffectedMatches(affectedMatches, event)
}

// handleMatchEvent processes match-related events by forwarding them to
// WebSocket clients. It always returns nil.
func (edms *EventDrivenMatchingService) handleMatchEvent(event domain.Event) error {
	log.Printf("Processing match event: %s for entity %s", event.Type, event.EntityID)

	// Notify WebSocket clients about match changes
	edms.notifyMatchUpdate(event)

	return nil
}

// handleResourceFlowCreated processes new resource flow creation.
//
// It loads the flow identified by event.EntityID, runs the matching algorithm
// for it, and creates a match (as user "system") for every candidate whose
// overall score exceeds the high-confidence threshold. Each successfully
// created match is announced to the event's organization over WebSocket.
//
// Failing to load the flow or to find candidates aborts with a wrapped error.
// Failing to create an individual match is logged and does not stop the
// remaining candidates from being processed.
func (edms *EventDrivenMatchingService) handleResourceFlowCreated(ctx context.Context, event domain.Event) error {
	// Candidates must score strictly above this to be turned into matches
	// automatically.
	const highConfidenceThreshold = 0.7

	flow, err := edms.resourceFlowRepo.GetByID(ctx, event.EntityID)
	if err != nil {
		return fmt.Errorf("failed to get resource flow: %w", err)
	}
	if flow == nil {
		// Guard against a repository that reports "not found" as (nil, nil);
		// the matching criteria below dereference the flow.
		return fmt.Errorf("failed to get resource flow: %q not found", event.EntityID)
	}

	// Run matching algorithm for this flow
	candidates, err := edms.findPotentialMatchesForFlow(ctx, flow)
	if err != nil {
		return fmt.Errorf("failed to find matches for new flow: %w", err)
	}

	for _, candidate := range candidates {
		// Written as a negated ">" so that a NaN score is skipped as well.
		if candidate == nil || !(candidate.OverallScore > highConfidenceThreshold) {
			continue
		}

		// Only the fields listed here are carried over into the candidate that
		// is persisted.
		engineCandidate := &engine.Candidate{
			SourceFlow:         candidate.SourceFlow,
			TargetFlow:         candidate.TargetFlow,
			DistanceKm:         candidate.DistanceKm,
			CompatibilityScore: candidate.CompatibilityScore,
			EconomicScore:      candidate.EconomicScore,
			TemporalScore:      candidate.TemporalScore,
			QualityScore:       candidate.QualityScore,
			OverallScore:       candidate.OverallScore,
			Priority:           candidate.Priority,
			RiskLevel:          candidate.RiskLevel,
		}

		// Use the handler's context so cancellation and deadlines propagate.
		if _, err := edms.matchingService.CreateMatch(ctx, engineCandidate, "system"); err != nil {
			log.Printf("Failed to create match for candidate: %v", err)
			continue
		}

		// Notify via WebSocket
		edms.notifyNewMatch(candidate, event.OrgID)
	}

	return nil
}

// handleResourceFlowUpdated processes resource flow updates: every match that
// involves the updated flow is reported to clients and scheduled for
// recalculation.
func (edms *EventDrivenMatchingService) handleResourceFlowUpdated(ctx context.Context, event domain.Event) error {
	affectedMatches, err := edms.findMatchesByResourceFlow(ctx, event.EntityID)
	if err != nil {
		return fmt.Errorf("failed to find affected matches: %w", err)
	}

	return edms.processAffectedMatches(affectedMatches, event)
}

// handleResourceFlowDeleted processes resource flow deletion by cancelling
// every match that involves the deleted flow.
//
// Cancellation is best-effort: a failure for one match is logged and the
// remaining matches are still processed. Only a failure to look up the
// affected matches is returned.
func (edms *EventDrivenMatchingService) handleResourceFlowDeleted(ctx context.Context, event domain.Event) error {
	affectedMatches, err := edms.findMatchesByResourceFlow(ctx, event.EntityID)
	if err != nil {
		return fmt.Errorf("failed to find affected matches: %w", err)
	}

	for _, matchID := range affectedMatches {
		// Use the handler's context so cancellation and deadlines propagate.
		err := edms.matchingService.UpdateMatchStatus(
			ctx,
			matchID,
			domain.MatchStatusCancelled,
			"system",
			"Resource flow was deleted",
		)
		if err != nil {
			log.Printf("Failed to cancel match %s: %v", matchID, err)
		}
	}

	return nil
}

// processAffectedMatches handles matches affected by data changes.
//
// For each match ID it broadcasts a "match_updated" notification that carries
// that match's ID together with the triggering event's type, organization and
// timestamp, and then starts an asynchronous recalculation of the match.
// It always returns nil; recalculation failures are only logged.
//
// NOTE(review): one goroutine is started per match and runs on
// context.Background() because this method receives no context; the
// goroutines are neither bounded nor awaited. Consider batching or a worker
// pool if events can touch many matches.
func (edms *EventDrivenMatchingService) processAffectedMatches(matchIDs []string, event domain.Event) error {
	for _, matchID := range matchIDs {
		// The triggering event's EntityID is the changed resource flow or
		// organization, not a match. Re-target a copy of the event so the
		// notification's "match_id" identifies the match that was affected.
		affected := event
		affected.EntityID = matchID
		edms.notifyMatchUpdate(affected)

		go func(id string) {
			if err := edms.incrementalCalc.RecalculateMatch(context.Background(), id); err != nil {
				log.Printf("Failed to recalculate match %s: %v", id, err)
			}
		}(matchID)
	}

	return nil
}

// findPotentialMatchesForFlow finds potential matches for a resource flow by
// running the matching service with the flow's resource type and a fixed set
// of default thresholds.
func (edms *EventDrivenMatchingService) findPotentialMatchesForFlow(ctx context.Context, flow *domain.ResourceFlow) ([]*engine.Candidate, error) {
	criteria := matching.Criteria{
		ResourceType:     flow.Type,
		MaxDistanceKm:    50.0, // Default distance
		MinCompatibility: 0.3,
		MinEconomicScore: 0.2,
		MinTemporalScore: 0.4,
		MinQualityScore:  0.4,
	}

	return edms.matchingService.FindMatches(ctx, criteria)
}

// findMatchesByResourceFlow returns the IDs of all matches that involve the
// given resource flow. A repository error is returned unchanged.
func (edms *EventDrivenMatchingService) findMatchesByResourceFlow(ctx context.Context, flowID string) ([]string, error) {
	matches, err := edms.matchRepo.GetByResourceID(ctx, flowID)
	if err != nil {
		return nil, err
	}

	matchIDs := make([]string, len(matches))
	for i, match := range matches {
		matchIDs[i] = match.ID
	}

	return matchIDs, nil
}

// findMatchesByOrganization returns the IDs of all matches that belong to the
// given organization. A repository error is returned unchanged.
func (edms *EventDrivenMatchingService) findMatchesByOrganization(ctx context.Context, orgID string) ([]string, error) {
	matches, err := edms.matchRepo.GetByOrganizationID(ctx, orgID)
	if err != nil {
		return nil, err
	}

	matchIDs := make([]string, len(matches))
	for i, match := range matches {
		matchIDs[i] = match.ID
	}

	return matchIDs, nil
}

// notifyMatchUpdate broadcasts a "match_updated" WebSocket message to the
// event's organization. The payload reports event.EntityID as "match_id", so
// callers must pass an event whose EntityID is a match ID. It does nothing
// when no WebSocket hub is configured.
func (edms *EventDrivenMatchingService) notifyMatchUpdate(event domain.Event) {
	if edms.websocketHub == nil {
		return
	}

	message := domain.WebSocketMessage{
		Type: "match_updated",
		Payload: map[string]interface{}{
			"match_id":  event.EntityID,
			"event":     event.Type,
			"timestamp": event.Timestamp,
		},
		Timestamp: time.Now(),
	}

	edms.websocketHub.BroadcastToOrganization(event.OrgID, message)
}

// notifyNewMatch broadcasts a "new_match" WebSocket message carrying the
// candidate's scores and distance to the given organization. It does nothing
// when no WebSocket hub is configured.
func (edms *EventDrivenMatchingService) notifyNewMatch(candidate *engine.Candidate, orgID string) {
	if edms.websocketHub == nil {
		return
	}

	message := domain.WebSocketMessage{
		Type: "new_match",
		Payload: map[string]interface{}{
			"compatibility_score": candidate.CompatibilityScore,
			"economic_score":      candidate.EconomicScore,
			"overall_score":       candidate.OverallScore,
			"distance_km":         candidate.DistanceKm,
		},
		Timestamp: time.Now(),
	}

	edms.websocketHub.BroadcastToOrganization(orgID, message)
}

// PublishEvent publishes an event to the event bus.
//
// The event ID is built from entityID and the current time in nanoseconds,
// and the same instant is used as the event's Timestamp. An error is returned
// when no event bus is configured; otherwise the bus's Publish result is
// returned.
func (edms *EventDrivenMatchingService) PublishEvent(eventType domain.EventType, entityID, orgID, userID string, payload map[string]interface{}) error {
	// A nil bus is a supported configuration (see StartEventProcessing), so
	// report it instead of panicking on the method call below.
	if edms.eventBus == nil {
		return errors.New("event bus not available")
	}

	// Take the time once so the ID suffix and Timestamp agree.
	now := time.Now()

	event := domain.Event{
		ID:        fmt.Sprintf("%s-%d", entityID, now.UnixNano()),
		Type:      eventType,
		EntityID:  entityID,
		OrgID:     orgID,
		UserID:    userID,
		Timestamp: now,
		Payload:   payload,
	}

	return edms.eventBus.Publish(event)
}