mirror of
https://github.com/SamyRai/turash.git
synced 2025-12-26 23:01:33 +00:00
Repository Structure:
- Move files from cluttered root directory into organized structure
- Create archive/ for archived data and scraper results
- Create bugulma/ for the complete application (frontend + backend)
- Create data/ for sample datasets and reference materials
- Create docs/ for comprehensive documentation structure
- Create scripts/ for utility scripts and API tools
Backend Implementation:
- Implement 3 missing backend endpoints identified in gap analysis:
* GET /api/v1/organizations/{id}/matching/direct - Direct symbiosis matches
* GET /api/v1/users/me/organizations - User organizations
* POST /api/v1/proposals/{id}/status - Update proposal status
- Add complete proposal domain model, repository, and service layers
- Create database migration for proposals table
- Fix CLI server command registration issue
API Documentation:
- Add comprehensive proposals.md API documentation
- Update README.md with Users and Proposals API sections
- Document all request/response formats, error codes, and business rules
Code Quality:
- Follow existing Go backend architecture patterns
- Add proper error handling and validation
- Match frontend expected response schemas
- Maintain clean separation of concerns (handler -> service -> repository)
309 lines
10 KiB
Go
309 lines
10 KiB
Go
package service
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log"
|
|
"time"
|
|
|
|
"bugulma/backend/internal/domain"
|
|
"bugulma/backend/internal/matching"
|
|
"bugulma/backend/internal/matching/engine"
|
|
)
|
|
|
|
// EventDrivenMatchingService provides real-time match updates via event-driven architecture
|
|
type EventDrivenMatchingService struct {
|
|
eventBus domain.EventBus
|
|
matchingService *matching.Service
|
|
resourceFlowRepo domain.ResourceFlowRepository
|
|
matchRepo domain.MatchRepository
|
|
websocketHub *WebSocketHub
|
|
incrementalCalc *IncrementalMatchCalculator
|
|
}
|
|
|
|
// NewEventDrivenMatchingService creates a new event-driven matching service
|
|
func NewEventDrivenMatchingService(
|
|
eventBus domain.EventBus,
|
|
matchingService *matching.Service,
|
|
resourceFlowRepo domain.ResourceFlowRepository,
|
|
matchRepo domain.MatchRepository,
|
|
websocketHub *WebSocketHub,
|
|
) *EventDrivenMatchingService {
|
|
|
|
return &EventDrivenMatchingService{
|
|
eventBus: eventBus,
|
|
matchingService: matchingService,
|
|
resourceFlowRepo: resourceFlowRepo,
|
|
matchRepo: matchRepo,
|
|
websocketHub: websocketHub,
|
|
incrementalCalc: &IncrementalMatchCalculator{
|
|
matchingService: matchingService,
|
|
matchRepo: matchRepo,
|
|
},
|
|
}
|
|
}
|
|
|
|
// StartEventProcessing begins listening for events that affect matches
|
|
func (edms *EventDrivenMatchingService) StartEventProcessing(ctx context.Context) error {
|
|
if edms.eventBus == nil {
|
|
log.Println("Event bus not available, skipping event processing")
|
|
return nil
|
|
}
|
|
|
|
// Subscribe to resource flow events
|
|
if err := edms.eventBus.Subscribe("resource_flow.*", func(event domain.Event) error {
|
|
return edms.handleResourceFlowEvent(ctx, event)
|
|
}); err != nil {
|
|
return fmt.Errorf("failed to subscribe to resource flow events: %w", err)
|
|
}
|
|
|
|
// Subscribe to organization events
|
|
if err := edms.eventBus.Subscribe("organization.*", func(event domain.Event) error {
|
|
return edms.handleOrganizationEvent(ctx, event)
|
|
}); err != nil {
|
|
return fmt.Errorf("failed to subscribe to organization events: %w", err)
|
|
}
|
|
|
|
// Subscribe to match events
|
|
if err := edms.eventBus.Subscribe("match.*", edms.handleMatchEvent); err != nil {
|
|
return fmt.Errorf("failed to subscribe to match events: %w", err)
|
|
}
|
|
|
|
log.Println("Event-driven matching service started")
|
|
return nil
|
|
}
|
|
|
|
// handleResourceFlowEvent processes resource flow changes
|
|
func (edms *EventDrivenMatchingService) handleResourceFlowEvent(ctx context.Context, event domain.Event) error {
|
|
log.Printf("Processing resource flow event: %s for entity %s", event.Type, event.EntityID)
|
|
|
|
switch event.Type {
|
|
case domain.EventTypeResourceFlowCreated:
|
|
return edms.handleResourceFlowCreated(ctx, event)
|
|
case domain.EventTypeResourceFlowUpdated:
|
|
return edms.handleResourceFlowUpdated(ctx, event)
|
|
case domain.EventTypeResourceFlowDeleted:
|
|
return edms.handleResourceFlowDeleted(ctx, event)
|
|
default:
|
|
log.Printf("Unhandled resource flow event type: %s", event.Type)
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// handleOrganizationEvent processes organization changes
|
|
func (edms *EventDrivenMatchingService) handleOrganizationEvent(ctx context.Context, event domain.Event) error {
|
|
log.Printf("Processing organization event: %s for entity %s", event.Type, event.EntityID)
|
|
|
|
// Organization changes can affect matches for that organization
|
|
// Invalidate cache and trigger recalculation
|
|
affectedMatches, err := edms.findMatchesByOrganization(ctx, event.EntityID)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to find matches for organization: %w", err)
|
|
}
|
|
|
|
return edms.processAffectedMatches(affectedMatches, event)
|
|
}
|
|
|
|
// handleMatchEvent processes match-related events
|
|
func (edms *EventDrivenMatchingService) handleMatchEvent(event domain.Event) error {
|
|
log.Printf("Processing match event: %s for entity %s", event.Type, event.EntityID)
|
|
|
|
// Notify WebSocket clients about match changes
|
|
edms.notifyMatchUpdate(event)
|
|
|
|
return nil
|
|
}
|
|
|
|
// handleResourceFlowCreated processes new resource flow creation
|
|
func (edms *EventDrivenMatchingService) handleResourceFlowCreated(ctx context.Context, event domain.Event) error {
|
|
// Find potential matches for the new resource flow
|
|
flow, err := edms.resourceFlowRepo.GetByID(ctx, event.EntityID)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get resource flow: %w", err)
|
|
}
|
|
|
|
// Run matching algorithm for this flow
|
|
candidates, err := edms.findPotentialMatchesForFlow(ctx, flow)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to find matches for new flow: %w", err)
|
|
}
|
|
|
|
// Create matches for high-confidence candidates
|
|
for _, candidate := range candidates {
|
|
if candidate.OverallScore > 0.7 { // High confidence threshold
|
|
// Convert to engine.Candidate
|
|
engineCandidate := &engine.Candidate{
|
|
SourceFlow: candidate.SourceFlow,
|
|
TargetFlow: candidate.TargetFlow,
|
|
DistanceKm: candidate.DistanceKm,
|
|
CompatibilityScore: candidate.CompatibilityScore,
|
|
EconomicScore: candidate.EconomicScore,
|
|
TemporalScore: candidate.TemporalScore,
|
|
QualityScore: candidate.QualityScore,
|
|
OverallScore: candidate.OverallScore,
|
|
Priority: candidate.Priority,
|
|
RiskLevel: candidate.RiskLevel,
|
|
}
|
|
_, err := edms.matchingService.CreateMatch(context.Background(), engineCandidate, "system")
|
|
if err != nil {
|
|
log.Printf("Failed to create match for candidate: %v", err)
|
|
continue
|
|
}
|
|
|
|
// Notify via WebSocket
|
|
edms.notifyNewMatch(candidate, event.OrgID)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// handleResourceFlowUpdated processes resource flow updates
|
|
func (edms *EventDrivenMatchingService) handleResourceFlowUpdated(ctx context.Context, event domain.Event) error {
|
|
// Find and invalidate affected matches
|
|
affectedMatches, err := edms.findMatchesByResourceFlow(ctx, event.EntityID)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to find affected matches: %w", err)
|
|
}
|
|
|
|
return edms.processAffectedMatches(affectedMatches, event)
|
|
}
|
|
|
|
// handleResourceFlowDeleted processes resource flow deletion
|
|
func (edms *EventDrivenMatchingService) handleResourceFlowDeleted(ctx context.Context, event domain.Event) error {
|
|
// Find matches that depend on this resource flow and mark them as affected
|
|
affectedMatches, err := edms.findMatchesByResourceFlow(ctx, event.EntityID)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to find affected matches: %w", err)
|
|
}
|
|
|
|
// Cancel matches that depend on deleted resource flows
|
|
for _, matchID := range affectedMatches {
|
|
err := edms.matchingService.UpdateMatchStatus(
|
|
context.Background(),
|
|
matchID,
|
|
domain.MatchStatusCancelled,
|
|
"system",
|
|
"Resource flow was deleted",
|
|
)
|
|
if err != nil {
|
|
log.Printf("Failed to cancel match %s: %v", matchID, err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// processAffectedMatches handles matches affected by data changes
|
|
func (edms *EventDrivenMatchingService) processAffectedMatches(matchIDs []string, event domain.Event) error {
|
|
for _, matchID := range matchIDs {
|
|
// Notify clients about the change
|
|
edms.notifyMatchUpdate(event)
|
|
|
|
// Optionally trigger incremental recalculation
|
|
// This could be expensive, so we might want to batch or delay
|
|
go func(id string) {
|
|
if err := edms.incrementalCalc.RecalculateMatch(context.Background(), id); err != nil {
|
|
log.Printf("Failed to recalculate match %s: %v", id, err)
|
|
}
|
|
}(matchID)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// findPotentialMatchesForFlow finds potential matches for a resource flow
|
|
func (edms *EventDrivenMatchingService) findPotentialMatchesForFlow(ctx context.Context, flow *domain.ResourceFlow) ([]*engine.Candidate, error) {
|
|
// Use the existing matching criteria logic
|
|
criteria := matching.Criteria{
|
|
ResourceType: flow.Type,
|
|
MaxDistanceKm: 50.0, // Default distance
|
|
MinCompatibility: 0.3,
|
|
MinEconomicScore: 0.2,
|
|
MinTemporalScore: 0.4,
|
|
MinQualityScore: 0.4,
|
|
}
|
|
|
|
return edms.matchingService.FindMatches(ctx, criteria)
|
|
}
|
|
|
|
// findMatchesByResourceFlow finds matches that involve a specific resource flow
|
|
func (edms *EventDrivenMatchingService) findMatchesByResourceFlow(ctx context.Context, flowID string) ([]string, error) {
|
|
matches, err := edms.matchRepo.GetByResourceID(ctx, flowID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
matchIDs := make([]string, len(matches))
|
|
for i, match := range matches {
|
|
matchIDs[i] = match.ID
|
|
}
|
|
|
|
return matchIDs, nil
|
|
}
|
|
|
|
// findMatchesByOrganization finds matches for a specific organization
|
|
func (edms *EventDrivenMatchingService) findMatchesByOrganization(ctx context.Context, orgID string) ([]string, error) {
|
|
matches, err := edms.matchRepo.GetByOrganizationID(ctx, orgID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
matchIDs := make([]string, len(matches))
|
|
for i, match := range matches {
|
|
matchIDs[i] = match.ID
|
|
}
|
|
|
|
return matchIDs, nil
|
|
}
|
|
|
|
// notifyMatchUpdate sends WebSocket notification about match changes
|
|
func (edms *EventDrivenMatchingService) notifyMatchUpdate(event domain.Event) {
|
|
if edms.websocketHub != nil {
|
|
message := domain.WebSocketMessage{
|
|
Type: "match_updated",
|
|
Payload: map[string]interface{}{
|
|
"match_id": event.EntityID,
|
|
"event": event.Type,
|
|
"timestamp": event.Timestamp,
|
|
},
|
|
Timestamp: time.Now(),
|
|
}
|
|
|
|
edms.websocketHub.BroadcastToOrganization(event.OrgID, message)
|
|
}
|
|
}
|
|
|
|
// notifyNewMatch sends WebSocket notification about new matches
|
|
func (edms *EventDrivenMatchingService) notifyNewMatch(candidate *engine.Candidate, orgID string) {
|
|
if edms.websocketHub != nil {
|
|
message := domain.WebSocketMessage{
|
|
Type: "new_match",
|
|
Payload: map[string]interface{}{
|
|
"compatibility_score": candidate.CompatibilityScore,
|
|
"economic_score": candidate.EconomicScore,
|
|
"overall_score": candidate.OverallScore,
|
|
"distance_km": candidate.DistanceKm,
|
|
},
|
|
Timestamp: time.Now(),
|
|
}
|
|
|
|
edms.websocketHub.BroadcastToOrganization(orgID, message)
|
|
}
|
|
}
|
|
|
|
// PublishEvent publishes an event to the event bus
|
|
func (edms *EventDrivenMatchingService) PublishEvent(eventType domain.EventType, entityID, orgID, userID string, payload map[string]interface{}) error {
|
|
event := domain.Event{
|
|
ID: fmt.Sprintf("%s-%d", entityID, time.Now().UnixNano()),
|
|
Type: eventType,
|
|
EntityID: entityID,
|
|
OrgID: orgID,
|
|
UserID: userID,
|
|
Timestamp: time.Now(),
|
|
Payload: payload,
|
|
}
|
|
|
|
return edms.eventBus.Publish(event)
|
|
}
|