turash/bugulma/backend/internal/service/event_driven_matching_service.go

311 lines
10 KiB
Go

package service
import (
"context"
"fmt"
"log"
"time"
"bugulma/backend/internal/domain"
"bugulma/backend/internal/matching"
"bugulma/backend/internal/matching/engine"
)
// EventDrivenMatchingService keeps matches up to date in response to domain
// events: it subscribes to resource flow, organization and match events on the
// event bus, creates/cancels/recalculates matches accordingly, and pushes
// notifications to WebSocket clients.
type EventDrivenMatchingService struct {
	// eventBus is the source of incoming events and the sink for PublishEvent.
	// StartEventProcessing treats a nil bus as "event processing disabled".
	eventBus domain.EventBus

	// matchingService finds candidates, creates matches and updates match status.
	matchingService *matching.Service
	// resourceFlowRepo loads the resource flow referenced by a "created" event.
	resourceFlowRepo domain.ResourceFlowRepository
	// matchRepo looks up existing matches by resource flow or organization.
	matchRepo domain.MatchRepository

	// websocketHub broadcasts notifications per organization; the notify
	// helpers skip broadcasting when it is nil.
	websocketHub *WebSocketHub
	// incrementalCalc recalculates individual matches affected by a change.
	incrementalCalc *IncrementalMatchCalculator
}
// NewEventDrivenMatchingService wires up an EventDrivenMatchingService.
//
// The supplied collaborators are stored as-is (no nil checks are performed
// here). economicService is not kept on the service itself; it is only handed
// to the IncrementalMatchCalculator that the constructor builds from
// matchingService, matchRepo and economicService.
func NewEventDrivenMatchingService(
	eventBus domain.EventBus,
	matchingService *matching.Service,
	resourceFlowRepo domain.ResourceFlowRepository,
	matchRepo domain.MatchRepository,
	websocketHub *WebSocketHub,
	economicService *EconomicService,
) *EventDrivenMatchingService {
	calculator := NewIncrementalMatchCalculator(matchingService, matchRepo, economicService)

	svc := new(EventDrivenMatchingService)
	svc.eventBus = eventBus
	svc.matchingService = matchingService
	svc.resourceFlowRepo = resourceFlowRepo
	svc.matchRepo = matchRepo
	svc.websocketHub = websocketHub
	svc.incrementalCalc = calculator
	return svc
}
// StartEventProcessing registers this service's handlers on the event bus.
//
// Three topic patterns are subscribed, in order: "resource_flow.*",
// "organization.*" and "match.*". The resource flow and organization handlers
// capture ctx and pass it on to every event they process. If no event bus is
// configured the call logs a message and returns nil without subscribing.
// The first failing subscription aborts the call and is returned wrapped;
// subscriptions made before the failure are left in place.
func (edms *EventDrivenMatchingService) StartEventProcessing(ctx context.Context) error {
	bus := edms.eventBus
	if bus == nil {
		log.Println("Event bus not available, skipping event processing")
		return nil
	}

	// Adapt the ctx-aware handlers to the single-argument callback shape
	// the bus expects.
	onResourceFlow := func(evt domain.Event) error {
		return edms.handleResourceFlowEvent(ctx, evt)
	}
	onOrganization := func(evt domain.Event) error {
		return edms.handleOrganizationEvent(ctx, evt)
	}

	var subErr error

	subErr = bus.Subscribe("resource_flow.*", onResourceFlow)
	if subErr != nil {
		return fmt.Errorf("failed to subscribe to resource flow events: %w", subErr)
	}

	subErr = bus.Subscribe("organization.*", onOrganization)
	if subErr != nil {
		return fmt.Errorf("failed to subscribe to organization events: %w", subErr)
	}

	subErr = bus.Subscribe("match.*", edms.handleMatchEvent)
	if subErr != nil {
		return fmt.Errorf("failed to subscribe to match events: %w", subErr)
	}

	log.Println("Event-driven matching service started")
	return nil
}
// handleResourceFlowEvent routes a resource flow event to the handler for its
// type (created, updated or deleted) and returns that handler's result.
// Event types without a dedicated handler are logged and ignored (nil error).
func (edms *EventDrivenMatchingService) handleResourceFlowEvent(ctx context.Context, event domain.Event) error {
	log.Printf("Processing resource flow event: %s for entity %s", event.Type, event.EntityID)

	// Pick the handler first, then dispatch once.
	var dispatch func(context.Context, domain.Event) error
	switch event.Type {
	case domain.EventTypeResourceFlowCreated:
		dispatch = edms.handleResourceFlowCreated
	case domain.EventTypeResourceFlowUpdated:
		dispatch = edms.handleResourceFlowUpdated
	case domain.EventTypeResourceFlowDeleted:
		dispatch = edms.handleResourceFlowDeleted
	}

	if dispatch == nil {
		log.Printf("Unhandled resource flow event type: %s", event.Type)
		return nil
	}
	return dispatch(ctx, event)
}
// handleOrganizationEvent reacts to any organization event by looking up the
// matches that belong to the organization identified by event.EntityID and
// handing them to processAffectedMatches (client notification plus
// asynchronous recalculation). A lookup failure is returned wrapped.
func (edms *EventDrivenMatchingService) handleOrganizationEvent(ctx context.Context, event domain.Event) error {
	log.Printf("Processing organization event: %s for entity %s", event.Type, event.EntityID)

	// A change to an organization may alter the scoring of every match it
	// participates in, so all of them are treated as affected.
	orgMatchIDs, lookupErr := edms.findMatchesByOrganization(ctx, event.EntityID)
	if lookupErr != nil {
		return fmt.Errorf("failed to find matches for organization: %w", lookupErr)
	}

	return edms.processAffectedMatches(orgMatchIDs, event)
}
// handleMatchEvent is the bus callback for "match.*" events. It logs the event
// and forwards it to WebSocket clients via notifyMatchUpdate. It always
// returns nil.
func (edms *EventDrivenMatchingService) handleMatchEvent(event domain.Event) error {
	log.Printf("Processing match event: %s for entity %s", event.Type, event.EntityID)

	// For match events the entity ID is the match itself, so the event can be
	// relayed to clients unchanged.
	edms.notifyMatchUpdate(event)

	return nil
}
// handleResourceFlowCreated reacts to a newly created resource flow.
//
// It loads the flow identified by event.EntityID, asks the matching service
// for candidates (see findPotentialMatchesForFlow), and creates a match on
// behalf of the "system" actor for every candidate whose overall score is
// strictly above the high-confidence threshold. Each successfully created
// match is announced to the event's organization over WebSocket.
//
// Returns an error when the flow cannot be loaded (including a nil flow with
// no error) or when candidate search fails. Failures to create an individual
// match are logged and skipped so one bad candidate does not block the rest.
func (edms *EventDrivenMatchingService) handleResourceFlowCreated(ctx context.Context, event domain.Event) error {
	// Candidates must score strictly above this to be turned into matches
	// automatically.
	const highConfidenceThreshold = 0.7

	flow, err := edms.resourceFlowRepo.GetByID(ctx, event.EntityID)
	if err != nil {
		return fmt.Errorf("failed to get resource flow: %w", err)
	}
	if flow == nil {
		// Guard against repositories that signal "not found" with (nil, nil);
		// the candidate search below dereferences the flow.
		return fmt.Errorf("failed to get resource flow: flow %s not found", event.EntityID)
	}

	candidates, err := edms.findPotentialMatchesForFlow(ctx, flow)
	if err != nil {
		return fmt.Errorf("failed to find matches for new flow: %w", err)
	}

	for _, candidate := range candidates {
		if candidate == nil || candidate.OverallScore <= highConfidenceThreshold {
			continue
		}

		// Hand the matching service its own copy of the scored fields so the
		// candidate used for the notification below is not shared with it.
		engineCandidate := &engine.Candidate{
			SourceFlow:         candidate.SourceFlow,
			TargetFlow:         candidate.TargetFlow,
			DistanceKm:         candidate.DistanceKm,
			CompatibilityScore: candidate.CompatibilityScore,
			EconomicScore:      candidate.EconomicScore,
			TemporalScore:      candidate.TemporalScore,
			QualityScore:       candidate.QualityScore,
			OverallScore:       candidate.OverallScore,
			Priority:           candidate.Priority,
			RiskLevel:          candidate.RiskLevel,
		}

		// Propagate the handler's context (rather than a fresh background
		// context) so cancellation, deadlines and request-scoped values reach
		// the matching service.
		if _, err := edms.matchingService.CreateMatch(ctx, engineCandidate, "system"); err != nil {
			log.Printf("Failed to create match for candidate: %v", err)
			continue
		}

		edms.notifyNewMatch(candidate, event.OrgID)
	}

	return nil
}
// handleResourceFlowUpdated reacts to a modified resource flow: every match
// that involves the flow identified by event.EntityID is passed to
// processAffectedMatches for client notification and asynchronous
// recalculation. A lookup failure is returned wrapped.
func (edms *EventDrivenMatchingService) handleResourceFlowUpdated(ctx context.Context, event domain.Event) error {
	flowMatchIDs, lookupErr := edms.findMatchesByResourceFlow(ctx, event.EntityID)
	if lookupErr != nil {
		return fmt.Errorf("failed to find affected matches: %w", lookupErr)
	}

	return edms.processAffectedMatches(flowMatchIDs, event)
}
// handleResourceFlowDeleted reacts to the removal of a resource flow by
// cancelling every match that involves it.
//
// Matches are looked up by event.EntityID and each one is moved to
// domain.MatchStatusCancelled on behalf of the "system" actor with the reason
// "Resource flow was deleted". Cancellation is best-effort: a failure for one
// match is logged and the remaining matches are still processed, and the
// function returns nil in that case. Only a failed lookup is returned as an
// error.
func (edms *EventDrivenMatchingService) handleResourceFlowDeleted(ctx context.Context, event domain.Event) error {
	affectedMatches, err := edms.findMatchesByResourceFlow(ctx, event.EntityID)
	if err != nil {
		return fmt.Errorf("failed to find affected matches: %w", err)
	}

	for _, matchID := range affectedMatches {
		// Use the handler's context (not a fresh background context) so the
		// status update shares cancellation and deadlines with the lookup
		// above.
		err := edms.matchingService.UpdateMatchStatus(
			ctx,
			matchID,
			domain.MatchStatusCancelled,
			"system",
			"Resource flow was deleted",
		)
		if err != nil {
			log.Printf("Failed to cancel match %s: %v", matchID, err)
		}
	}

	return nil
}
// processAffectedMatches notifies clients about, and schedules recalculation
// of, every match in matchIDs that was affected by event.
//
// For each match a copy of the triggering event is broadcast with EntityID
// replaced by the match ID, so the notification's "match_id" identifies the
// match rather than the resource flow or organization that triggered the
// change. Recalculation then runs in its own goroutine with a background
// context; its failures are only logged. The function itself always returns
// nil and does not wait for the recalculations to finish.
func (edms *EventDrivenMatchingService) processAffectedMatches(matchIDs []string, event domain.Event) error {
	for _, matchID := range matchIDs {
		// Re-target the event at this specific match before notifying;
		// forwarding the original event would send the same message once per
		// match, each carrying the triggering entity's ID as "match_id".
		matchEvent := event
		matchEvent.EntityID = matchID
		edms.notifyMatchUpdate(matchEvent)

		// Recalculation may be expensive, so it is done off the event
		// handler's goroutine. NOTE(review): goroutines are unbounded and not
		// awaited; consider batching or a worker limit if bursts are large.
		go func(id string) {
			if err := edms.incrementalCalc.RecalculateMatch(context.Background(), id); err != nil {
				log.Printf("Failed to recalculate match %s: %v", id, err)
			}
		}(matchID)
	}

	return nil
}
// findPotentialMatchesForFlow asks the matching service for candidates using
// the flow's resource type together with a fixed set of default thresholds
// (maximum distance and minimum compatibility, economic, temporal and quality
// scores). The result of FindMatches is returned unchanged.
func (edms *EventDrivenMatchingService) findPotentialMatchesForFlow(ctx context.Context, flow *domain.ResourceFlow) ([]*engine.Candidate, error) {
	// Default search thresholds applied to every event-triggered search.
	const (
		defaultMaxDistanceKm    = 50.0
		defaultMinCompatibility = 0.3
		defaultMinEconomicScore = 0.2
		defaultMinTemporalScore = 0.4
		defaultMinQualityScore  = 0.4
	)

	var criteria matching.Criteria
	criteria.ResourceType = flow.Type
	criteria.MaxDistanceKm = defaultMaxDistanceKm
	criteria.MinCompatibility = defaultMinCompatibility
	criteria.MinEconomicScore = defaultMinEconomicScore
	criteria.MinTemporalScore = defaultMinTemporalScore
	criteria.MinQualityScore = defaultMinQualityScore

	return edms.matchingService.FindMatches(ctx, criteria)
}
// findMatchesByResourceFlow returns the IDs of all matches the repository
// reports for the given resource flow ID, in repository order. A repository
// error is returned as-is with a nil slice; no matches yields an empty,
// non-nil slice.
func (edms *EventDrivenMatchingService) findMatchesByResourceFlow(ctx context.Context, flowID string) ([]string, error) {
	found, err := edms.matchRepo.GetByResourceID(ctx, flowID)
	if err != nil {
		return nil, err
	}

	ids := make([]string, 0, len(found))
	for idx := range found {
		ids = append(ids, found[idx].ID)
	}
	return ids, nil
}
// findMatchesByOrganization returns the IDs of all matches the repository
// reports for the given organization ID, in repository order. A repository
// error is returned as-is with a nil slice; no matches yields an empty,
// non-nil slice.
func (edms *EventDrivenMatchingService) findMatchesByOrganization(ctx context.Context, orgID string) ([]string, error) {
	found, err := edms.matchRepo.GetByOrganizationID(ctx, orgID)
	if err != nil {
		return nil, err
	}

	ids := make([]string, 0, len(found))
	for idx := range found {
		ids = append(ids, found[idx].ID)
	}
	return ids, nil
}
// notifyMatchUpdate broadcasts a "match_updated" WebSocket message to the
// organization named in event.OrgID. The payload carries the event's entity ID
// (as "match_id"), its type and its original timestamp; the message itself is
// stamped with the current time. Does nothing when no WebSocket hub is
// configured.
func (edms *EventDrivenMatchingService) notifyMatchUpdate(event domain.Event) {
	hub := edms.websocketHub
	if hub == nil {
		return
	}

	details := map[string]interface{}{
		"match_id":  event.EntityID,
		"event":     event.Type,
		"timestamp": event.Timestamp,
	}

	hub.BroadcastToOrganization(event.OrgID, domain.WebSocketMessage{
		Type:      "match_updated",
		Payload:   details,
		Timestamp: time.Now(),
	})
}
// notifyNewMatch broadcasts a "new_match" WebSocket message to the given
// organization. The payload exposes the candidate's compatibility, economic
// and overall scores plus its distance in kilometres; the message is stamped
// with the current time. Does nothing when no WebSocket hub is configured.
func (edms *EventDrivenMatchingService) notifyNewMatch(candidate *engine.Candidate, orgID string) {
	hub := edms.websocketHub
	if hub == nil {
		return
	}

	scores := map[string]interface{}{
		"compatibility_score": candidate.CompatibilityScore,
		"economic_score":      candidate.EconomicScore,
		"overall_score":       candidate.OverallScore,
		"distance_km":         candidate.DistanceKm,
	}

	hub.BroadcastToOrganization(orgID, domain.WebSocketMessage{
		Type:      "new_match",
		Payload:   scores,
		Timestamp: time.Now(),
	})
}
// PublishEvent builds a domain.Event from the given fields and publishes it on
// the event bus.
//
// The event ID has the form "<entityID>-<unix nanoseconds>", and the same
// instant is used for both the ID and the event's Timestamp. payload is
// attached to the event as-is (it is not copied).
//
// Returns an error if no event bus is configured (the service may be running
// without one; see StartEventProcessing) or if the bus rejects the event.
func (edms *EventDrivenMatchingService) PublishEvent(eventType domain.EventType, entityID, orgID, userID string, payload map[string]interface{}) error {
	if edms.eventBus == nil {
		// Fail with an error instead of a nil-pointer panic when the service
		// was constructed without an event bus.
		return fmt.Errorf("cannot publish %v event for entity %s: event bus not available", eventType, entityID)
	}

	// Capture the time once so the ID suffix and Timestamp agree.
	now := time.Now()

	event := domain.Event{
		ID:        fmt.Sprintf("%s-%d", entityID, now.UnixNano()),
		Type:      eventType,
		EntityID:  entityID,
		OrgID:     orgID,
		UserID:    userID,
		Timestamp: now,
		Payload:   payload,
	}

	return edms.eventBus.Publish(event)
}