turash/bugulma/backend/internal/service/graph_sync_service.go
Damir Mukimov 000eab4740
Major repository reorganization and missing backend endpoints implementation
Repository Structure:
- Move files from cluttered root directory into organized structure
- Create archive/ for archived data and scraper results
- Create bugulma/ for the complete application (frontend + backend)
- Create data/ for sample datasets and reference materials
- Create docs/ for comprehensive documentation structure
- Create scripts/ for utility scripts and API tools

Backend Implementation:
- Implement 3 missing backend endpoints identified in gap analysis:
  * GET /api/v1/organizations/{id}/matching/direct - Direct symbiosis matches
  * GET /api/v1/users/me/organizations - User organizations
  * POST /api/v1/proposals/{id}/status - Update proposal status
- Add complete proposal domain model, repository, and service layers
- Create database migration for proposals table
- Fix CLI server command registration issue

API Documentation:
- Add comprehensive proposals.md API documentation
- Update README.md with Users and Proposals API sections
- Document all request/response formats, error codes, and business rules

Code Quality:
- Follow existing Go backend architecture patterns
- Add proper error handling and validation
- Match frontend expected response schemas
- Maintain clean separation of concerns (handler -> service -> repository)
2025-11-25 06:01:16 +01:00

337 lines
10 KiB
Go

package service
import (
"bugulma/backend/internal/domain"
"bugulma/backend/internal/repository"
"context"
"fmt"
"log"
)
// GraphSyncService handles synchronization between PostgreSQL and Neo4j.
//
// Each field is the graph repository for one entity type. The service methods
// treat a nil repository as "graph sync disabled" for that entity type and
// return early instead of failing.
type GraphSyncService struct {
	// orgGraphRepo syncs and deletes organizations in the graph.
	orgGraphRepo *repository.GraphOrganizationRepository
	// siteGraphRepo syncs and deletes sites in the graph.
	siteGraphRepo *repository.GraphSiteRepository
	// addressGraphRepo syncs and deletes addresses (and their relationships) in the graph.
	addressGraphRepo *repository.GraphAddressRepository
	// flowGraphRepo syncs and deletes resource flows and runs graph-based match lookups.
	flowGraphRepo *repository.GraphResourceFlowRepository
	// matchGraphRepo syncs and deletes matches in the graph.
	matchGraphRepo *repository.GraphMatchRepository
	// sharedAssetGraphRepo syncs and deletes shared assets in the graph.
	sharedAssetGraphRepo *repository.GraphSharedAssetRepository
}
// NewGraphSyncService builds a GraphSyncService from the given graph
// repositories. The repositories are stored as-is; any of them may be nil, in
// which case the service methods for that entity type return early.
func NewGraphSyncService(
	orgGraphRepo *repository.GraphOrganizationRepository,
	siteGraphRepo *repository.GraphSiteRepository,
	addressGraphRepo *repository.GraphAddressRepository,
	flowGraphRepo *repository.GraphResourceFlowRepository,
	matchGraphRepo *repository.GraphMatchRepository,
	sharedAssetGraphRepo *repository.GraphSharedAssetRepository,
) *GraphSyncService {
	svc := new(GraphSyncService)

	svc.orgGraphRepo = orgGraphRepo
	svc.siteGraphRepo = siteGraphRepo
	svc.addressGraphRepo = addressGraphRepo
	svc.flowGraphRepo = flowGraphRepo
	svc.matchGraphRepo = matchGraphRepo
	svc.sharedAssetGraphRepo = sharedAssetGraphRepo

	return svc
}
// SyncOrganization syncs an organization to the graph database.
//
// It returns nil without doing anything when no organization graph repository
// is configured (graph sync disabled). A nil org is rejected with an error
// rather than dereferenced. Repository failures are wrapped with %w so the
// underlying cause remains inspectable by callers.
func (s *GraphSyncService) SyncOrganization(ctx context.Context, org *domain.Organization) error {
	if s.orgGraphRepo == nil {
		return nil // Graph sync disabled
	}
	// Guard before use: the success log below reads org.ID and would panic on nil.
	if org == nil {
		return fmt.Errorf("failed to sync organization to graph: organization is nil")
	}
	if err := s.orgGraphRepo.SyncToGraph(ctx, org); err != nil {
		return fmt.Errorf("failed to sync organization to graph: %w", err)
	}
	log.Printf("Synced organization %s to graph database", org.ID)
	return nil
}
// DeleteOrganization removes the organization with the given id from the graph
// database. It returns nil without doing anything when no organization graph
// repository is configured; repository errors are returned wrapped.
func (s *GraphSyncService) DeleteOrganization(ctx context.Context, id string) error {
	repo := s.orgGraphRepo
	if repo == nil {
		// Graph sync disabled
		return nil
	}

	err := repo.DeleteFromGraph(ctx, id)
	if err != nil {
		return fmt.Errorf("failed to delete organization from graph: %w", err)
	}

	log.Printf("Deleted organization %s from graph database", id)
	return nil
}
// SyncSite syncs a site to the graph database.
//
// It returns nil without doing anything when no site graph repository is
// configured (graph sync disabled). A nil site is rejected with an error
// rather than dereferenced. Repository failures are wrapped with %w.
func (s *GraphSyncService) SyncSite(ctx context.Context, site *domain.Site) error {
	if s.siteGraphRepo == nil {
		return nil // Graph sync disabled
	}
	// Guard before use: the success log below reads site.ID and would panic on nil.
	if site == nil {
		return fmt.Errorf("failed to sync site to graph: site is nil")
	}
	if err := s.siteGraphRepo.SyncToGraph(ctx, site); err != nil {
		return fmt.Errorf("failed to sync site to graph: %w", err)
	}
	log.Printf("Synced site %s to graph database", site.ID)
	return nil
}
// DeleteSite removes the site with the given id from the graph database.
// When no site graph repository is configured the call does nothing and
// returns nil; repository errors are returned wrapped.
func (s *GraphSyncService) DeleteSite(ctx context.Context, id string) error {
	// A nil repository means graph sync is disabled, so the body is skipped.
	if repo := s.siteGraphRepo; repo != nil {
		if delErr := repo.DeleteFromGraph(ctx, id); delErr != nil {
			return fmt.Errorf("failed to delete site from graph: %w", delErr)
		}
		log.Printf("Deleted site %s from graph database", id)
	}
	return nil
}
// SyncResourceFlow syncs a resource flow to the graph database.
//
// It returns nil without doing anything when no resource flow graph repository
// is configured (graph sync disabled). A nil flow is rejected with an error
// rather than dereferenced. Repository failures are wrapped with %w.
func (s *GraphSyncService) SyncResourceFlow(ctx context.Context, flow *domain.ResourceFlow) error {
	if s.flowGraphRepo == nil {
		return nil // Graph sync disabled
	}
	// Guard before use: the success log below reads flow.ID and would panic on nil.
	if flow == nil {
		return fmt.Errorf("failed to sync resource flow to graph: resource flow is nil")
	}
	if err := s.flowGraphRepo.SyncToGraph(ctx, flow); err != nil {
		return fmt.Errorf("failed to sync resource flow to graph: %w", err)
	}
	log.Printf("Synced resource flow %s to graph database", flow.ID)
	return nil
}
// DeleteResourceFlow removes the resource flow with the given id from the
// graph database. It returns nil without doing anything when no resource flow
// graph repository is configured; repository errors are returned wrapped.
func (s *GraphSyncService) DeleteResourceFlow(ctx context.Context, id string) error {
	flowRepo := s.flowGraphRepo
	if flowRepo == nil {
		// Graph sync disabled
		return nil
	}

	deleteErr := flowRepo.DeleteFromGraph(ctx, id)
	if deleteErr != nil {
		return fmt.Errorf("failed to delete resource flow from graph: %w", deleteErr)
	}

	log.Printf("Deleted resource flow %s from graph database", id)
	return nil
}
// FindMatchesInGraph looks up potential matches for the resource flow flowID
// by delegating to the resource flow graph repository, passing maxDistanceKm
// through unchanged.
//
// Unlike the sync and delete methods, a missing repository is an error here:
// the method returns "graph matching not available". Repository errors are
// returned wrapped, and the number of matches found is logged on success.
func (s *GraphSyncService) FindMatchesInGraph(ctx context.Context, flowID string, maxDistanceKm float64) ([]map[string]interface{}, error) {
	repo := s.flowGraphRepo
	if repo == nil {
		return nil, fmt.Errorf("graph matching not available")
	}

	found, err := repo.FindMatchesInGraph(ctx, flowID, maxDistanceKm)
	if err != nil {
		return nil, fmt.Errorf("failed to find matches in graph: %w", err)
	}

	log.Printf("Found %d potential matches in graph for flow %s", len(found), flowID)
	return found, nil
}
// BulkSyncOrganizations syncs each organization in orgs to the graph database
// on a best-effort basis.
//
// A failure for one organization is logged as a warning and does not stop the
// remaining ones; the method always returns nil. Nil entries are skipped and
// counted as failures. The final log line reports how many entries synced
// and how many failed, rather than the input length.
func (s *GraphSyncService) BulkSyncOrganizations(ctx context.Context, orgs []*domain.Organization) error {
	if s.orgGraphRepo == nil {
		return nil // Graph sync disabled
	}
	failed := 0
	for i, org := range orgs {
		// A nil entry cannot be synced and would panic on org.ID below.
		if org == nil {
			log.Printf("Warning: Skipping nil organization at index %d", i)
			failed++
			continue
		}
		if err := s.SyncOrganization(ctx, org); err != nil {
			log.Printf("Warning: Failed to sync organization %s: %v", org.ID, err)
			failed++
			// Continue with other organizations
		}
	}
	log.Printf("Bulk synced %d of %d organizations to graph (%d failed)", len(orgs)-failed, len(orgs), failed)
	return nil
}
// BulkSyncSites syncs each site in sites to the graph database on a
// best-effort basis.
//
// A failure for one site is logged as a warning and does not stop the
// remaining ones; the method always returns nil. Nil entries are skipped and
// counted as failures. The final log line reports how many entries synced
// and how many failed, rather than the input length.
func (s *GraphSyncService) BulkSyncSites(ctx context.Context, sites []*domain.Site) error {
	if s.siteGraphRepo == nil {
		return nil // Graph sync disabled
	}
	failed := 0
	for i, site := range sites {
		// A nil entry cannot be synced and would panic on site.ID below.
		if site == nil {
			log.Printf("Warning: Skipping nil site at index %d", i)
			failed++
			continue
		}
		if err := s.SyncSite(ctx, site); err != nil {
			log.Printf("Warning: Failed to sync site %s: %v", site.ID, err)
			failed++
			// Continue with other sites
		}
	}
	log.Printf("Bulk synced %d of %d sites to graph (%d failed)", len(sites)-failed, len(sites), failed)
	return nil
}
// BulkSyncResourceFlows syncs each resource flow in flows to the graph
// database on a best-effort basis.
//
// A failure for one flow is logged as a warning and does not stop the
// remaining ones; the method always returns nil. Nil entries are skipped and
// counted as failures. The final log line reports how many entries synced
// and how many failed, rather than the input length.
func (s *GraphSyncService) BulkSyncResourceFlows(ctx context.Context, flows []*domain.ResourceFlow) error {
	if s.flowGraphRepo == nil {
		return nil // Graph sync disabled
	}
	failed := 0
	for i, flow := range flows {
		// A nil entry cannot be synced and would panic on flow.ID below.
		if flow == nil {
			log.Printf("Warning: Skipping nil resource flow at index %d", i)
			failed++
			continue
		}
		if err := s.SyncResourceFlow(ctx, flow); err != nil {
			log.Printf("Warning: Failed to sync resource flow %s: %v", flow.ID, err)
			failed++
			// Continue with other flows
		}
	}
	log.Printf("Bulk synced %d of %d resource flows to graph (%d failed)", len(flows)-failed, len(flows), failed)
	return nil
}
// SyncAddress syncs an address to the graph database.
//
// It returns nil without doing anything when no address graph repository is
// configured (graph sync disabled). A nil address is rejected with an error
// rather than dereferenced. Repository failures are wrapped with %w.
func (s *GraphSyncService) SyncAddress(ctx context.Context, address *domain.Address) error {
	if s.addressGraphRepo == nil {
		return nil // Graph sync disabled
	}
	// Guard before use: the success log below reads address.ID and would panic on nil.
	if address == nil {
		return fmt.Errorf("failed to sync address to graph: address is nil")
	}
	if err := s.addressGraphRepo.SyncToGraph(ctx, address); err != nil {
		return fmt.Errorf("failed to sync address to graph: %w", err)
	}
	log.Printf("Synced address %s to graph database", address.ID)
	return nil
}
// DeleteAddress removes the address with the given id from the graph database.
// When no address graph repository is configured the call does nothing and
// returns nil; repository errors are returned wrapped.
func (s *GraphSyncService) DeleteAddress(ctx context.Context, id string) error {
	// A nil repository means graph sync is disabled, so the body is skipped.
	if repo := s.addressGraphRepo; repo != nil {
		if delErr := repo.DeleteFromGraph(ctx, id); delErr != nil {
			return fmt.Errorf("failed to delete address from graph: %w", delErr)
		}
		log.Printf("Deleted address %s from graph database", id)
	}
	return nil
}
// BulkSyncAddresses syncs each address in addresses to the graph database
// together with its organization and site relationships, on a best-effort
// basis.
//
// For every address the IDs of address.Organizations and address.Sites are
// collected and handed to the repository's SyncWithRelationships in a single
// call. NOTE(review): this presumably requires the caller to have preloaded
// those associations; an address with empty associations is synced with nil
// ID slices.
//
// A failure for one address is logged as a warning and does not stop the
// remaining ones; the method always returns nil. Nil entries are skipped and
// counted as failures. The final log line reports how many entries synced
// and how many failed, rather than the input length.
func (s *GraphSyncService) BulkSyncAddresses(ctx context.Context, addresses []*domain.Address) error {
	if s.addressGraphRepo == nil {
		return nil // Graph sync disabled
	}
	failed := 0
	for i, address := range addresses {
		// A nil entry would panic on address.Organizations below.
		if address == nil {
			log.Printf("Warning: Skipping nil address at index %d", i)
			failed++
			continue
		}
		// Extract organization IDs from the address's associations.
		var orgIDs []string
		for _, org := range address.Organizations {
			orgIDs = append(orgIDs, org.ID)
		}
		// Extract site IDs from the address's associations.
		var siteIDs []string
		for _, site := range address.Sites {
			siteIDs = append(siteIDs, site.ID)
		}
		// Sync the address and its relationships in one repository call.
		if err := s.addressGraphRepo.SyncWithRelationships(ctx, address, orgIDs, siteIDs); err != nil {
			log.Printf("Warning: Failed to sync address %s: %v", address.ID, err)
			failed++
			// Continue with other addresses
		}
	}
	log.Printf("Bulk synced %d of %d addresses to graph (%d failed)", len(addresses)-failed, len(addresses), failed)
	return nil
}
// SyncMatch syncs a match to the graph database.
//
// It returns nil without doing anything when no match graph repository is
// configured (graph sync disabled). A nil match is rejected with an error
// rather than dereferenced. Repository failures are wrapped with %w.
func (s *GraphSyncService) SyncMatch(ctx context.Context, match *domain.Match) error {
	if s.matchGraphRepo == nil {
		return nil // Graph sync disabled
	}
	// Guard before use: the success log below reads match.ID and would panic on nil.
	if match == nil {
		return fmt.Errorf("failed to sync match to graph: match is nil")
	}
	if err := s.matchGraphRepo.SyncToGraph(ctx, match); err != nil {
		return fmt.Errorf("failed to sync match to graph: %w", err)
	}
	log.Printf("Synced match %s to graph database", match.ID)
	return nil
}
// DeleteMatch removes the match with the given id from the graph database.
// It returns nil without doing anything when no match graph repository is
// configured; repository errors are returned wrapped.
func (s *GraphSyncService) DeleteMatch(ctx context.Context, id string) error {
	matchRepo := s.matchGraphRepo
	if matchRepo == nil {
		// Graph sync disabled
		return nil
	}

	deleteErr := matchRepo.DeleteFromGraph(ctx, id)
	if deleteErr != nil {
		return fmt.Errorf("failed to delete match from graph: %w", deleteErr)
	}

	log.Printf("Deleted match %s from graph database", id)
	return nil
}
// BulkSyncMatches syncs each match in matches to the graph database on a
// best-effort basis.
//
// A failure for one match is logged as a warning and does not stop the
// remaining ones; the method always returns nil. Nil entries are skipped and
// counted as failures. The final log line reports how many entries synced
// and how many failed, rather than the input length.
func (s *GraphSyncService) BulkSyncMatches(ctx context.Context, matches []*domain.Match) error {
	if s.matchGraphRepo == nil {
		return nil // Graph sync disabled
	}
	failed := 0
	for i, match := range matches {
		// A nil entry cannot be synced and would panic on match.ID below.
		if match == nil {
			log.Printf("Warning: Skipping nil match at index %d", i)
			failed++
			continue
		}
		if err := s.SyncMatch(ctx, match); err != nil {
			log.Printf("Warning: Failed to sync match %s: %v", match.ID, err)
			failed++
			// Continue with other matches
		}
	}
	log.Printf("Bulk synced %d of %d matches to graph (%d failed)", len(matches)-failed, len(matches), failed)
	return nil
}
// SyncSharedAsset syncs a shared asset to the graph database.
//
// It returns nil without doing anything when no shared asset graph repository
// is configured (graph sync disabled). A nil asset is rejected with an error
// rather than dereferenced. Repository failures are wrapped with %w.
func (s *GraphSyncService) SyncSharedAsset(ctx context.Context, asset *domain.SharedAsset) error {
	if s.sharedAssetGraphRepo == nil {
		return nil // Graph sync disabled
	}
	// Guard before use: the success log below reads asset.ID and would panic on nil.
	if asset == nil {
		return fmt.Errorf("failed to sync shared asset to graph: shared asset is nil")
	}
	if err := s.sharedAssetGraphRepo.SyncToGraph(ctx, asset); err != nil {
		return fmt.Errorf("failed to sync shared asset to graph: %w", err)
	}
	log.Printf("Synced shared asset %s to graph database", asset.ID)
	return nil
}
// DeleteSharedAsset removes the shared asset with the given id from the graph
// database. When no shared asset graph repository is configured the call does
// nothing and returns nil; repository errors are returned wrapped.
func (s *GraphSyncService) DeleteSharedAsset(ctx context.Context, id string) error {
	// A nil repository means graph sync is disabled, so the body is skipped.
	if repo := s.sharedAssetGraphRepo; repo != nil {
		if delErr := repo.DeleteFromGraph(ctx, id); delErr != nil {
			return fmt.Errorf("failed to delete shared asset from graph: %w", delErr)
		}
		log.Printf("Deleted shared asset %s from graph database", id)
	}
	return nil
}
// BulkSyncSharedAssets syncs each shared asset in assets to the graph database
// on a best-effort basis.
//
// A failure for one asset is logged as a warning and does not stop the
// remaining ones; the method always returns nil. Nil entries are skipped and
// counted as failures. The final log line reports how many entries synced
// and how many failed, rather than the input length.
func (s *GraphSyncService) BulkSyncSharedAssets(ctx context.Context, assets []*domain.SharedAsset) error {
	if s.sharedAssetGraphRepo == nil {
		return nil // Graph sync disabled
	}
	failed := 0
	for i, asset := range assets {
		// A nil entry cannot be synced and would panic on asset.ID below.
		if asset == nil {
			log.Printf("Warning: Skipping nil shared asset at index %d", i)
			failed++
			continue
		}
		if err := s.SyncSharedAsset(ctx, asset); err != nil {
			log.Printf("Warning: Failed to sync shared asset %s: %v", asset.ID, err)
			failed++
			// Continue with other assets
		}
	}
	log.Printf("Bulk synced %d of %d shared assets to graph (%d failed)", len(assets)-failed, len(assets), failed)
	return nil
}