turash/bugulma/backend/internal/service/graph_sync_service.go

433 lines
13 KiB
Go

package service
import (
"bugulma/backend/internal/domain"
"bugulma/backend/internal/repository"
"context"
"fmt"
"log"
)
// GraphSyncService mirrors domain entities into the graph database
// (described in this file as synchronization between PostgreSQL and Neo4j).
//
// Each field is the graph repository for one entity kind. The methods on
// this type treat a nil repository as "graph sync disabled" for that kind
// and turn the corresponding operation into a no-op.
type GraphSyncService struct {
	// orgGraphRepo handles organization nodes.
	orgGraphRepo *repository.GraphOrganizationRepository
	// siteGraphRepo handles site nodes.
	siteGraphRepo *repository.GraphSiteRepository
	// addressGraphRepo handles address nodes and their relationships.
	addressGraphRepo *repository.GraphAddressRepository
	// flowGraphRepo handles resource flow nodes and graph-based matching.
	flowGraphRepo *repository.GraphResourceFlowRepository
	// matchGraphRepo handles match nodes.
	matchGraphRepo *repository.GraphMatchRepository
	// sharedAssetGraphRepo handles shared asset nodes.
	sharedAssetGraphRepo *repository.GraphSharedAssetRepository
	// productGraphRepo handles product nodes.
	productGraphRepo *repository.GraphProductRepository
	// serviceGraphRepo handles service nodes.
	serviceGraphRepo *repository.GraphServiceRepository
}
// NewGraphSyncService builds a GraphSyncService from the given graph
// repositories.
//
// The repositories are stored as passed, without validation. Any of them may
// be nil, in which case the service methods that rely on that repository
// behave as if graph sync were disabled for that entity kind.
func NewGraphSyncService(
	orgGraphRepo *repository.GraphOrganizationRepository,
	siteGraphRepo *repository.GraphSiteRepository,
	addressGraphRepo *repository.GraphAddressRepository,
	flowGraphRepo *repository.GraphResourceFlowRepository,
	matchGraphRepo *repository.GraphMatchRepository,
	sharedAssetGraphRepo *repository.GraphSharedAssetRepository,
	productGraphRepo *repository.GraphProductRepository,
	serviceGraphRepo *repository.GraphServiceRepository,
) *GraphSyncService {
	svc := new(GraphSyncService)

	svc.orgGraphRepo = orgGraphRepo
	svc.siteGraphRepo = siteGraphRepo
	svc.addressGraphRepo = addressGraphRepo
	svc.flowGraphRepo = flowGraphRepo
	svc.matchGraphRepo = matchGraphRepo
	svc.sharedAssetGraphRepo = sharedAssetGraphRepo
	svc.productGraphRepo = productGraphRepo
	svc.serviceGraphRepo = serviceGraphRepo

	return svc
}
// SyncOrganization writes org to the graph database through the
// organization graph repository.
//
// It returns nil without doing anything when the repository is not
// configured (graph sync disabled). A repository failure is returned wrapped
// with context; on success the organization ID is logged.
func (s *GraphSyncService) SyncOrganization(ctx context.Context, org *domain.Organization) error {
	repo := s.orgGraphRepo
	if repo == nil {
		// Graph sync disabled: treat as a successful no-op.
		return nil
	}

	err := repo.SyncToGraph(ctx, org)
	if err != nil {
		return fmt.Errorf("failed to sync organization to graph: %w", err)
	}

	log.Printf("Synced organization %s to graph database", org.ID)
	return nil
}
// DeleteOrganization removes the organization identified by id from the
// graph database.
//
// It is a no-op returning nil when the organization graph repository is not
// configured. A repository failure is returned wrapped with context; on
// success the deleted ID is logged.
func (s *GraphSyncService) DeleteOrganization(ctx context.Context, id string) error {
	if repo := s.orgGraphRepo; repo != nil {
		if err := repo.DeleteFromGraph(ctx, id); err != nil {
			return fmt.Errorf("failed to delete organization from graph: %w", err)
		}
		log.Printf("Deleted organization %s from graph database", id)
	}
	// Reached both after a successful delete and when graph sync is disabled.
	return nil
}
// SyncSite writes site to the graph database through the site graph
// repository.
//
// It returns nil immediately when the repository is not configured (graph
// sync disabled). A repository failure is returned wrapped with context; on
// success the site ID is logged.
func (s *GraphSyncService) SyncSite(ctx context.Context, site *domain.Site) error {
	if s.siteGraphRepo == nil {
		return nil // nothing to do: graph sync disabled
	}

	err := s.siteGraphRepo.SyncToGraph(ctx, site)
	if err == nil {
		log.Printf("Synced site %s to graph database", site.ID)
		return nil
	}

	return fmt.Errorf("failed to sync site to graph: %w", err)
}
// DeleteSite removes the site identified by id from the graph database.
//
// It is a no-op returning nil when the site graph repository is not
// configured. A repository failure is returned wrapped with context; on
// success the deleted ID is logged.
func (s *GraphSyncService) DeleteSite(ctx context.Context, id string) error {
	repo := s.siteGraphRepo
	if repo == nil {
		// Graph sync disabled.
		return nil
	}

	err := repo.DeleteFromGraph(ctx, id)
	if err != nil {
		return fmt.Errorf("failed to delete site from graph: %w", err)
	}

	log.Printf("Deleted site %s from graph database", id)
	return nil
}
// SyncResourceFlow writes flow to the graph database through the resource
// flow graph repository.
//
// It returns nil without doing anything when the repository is not
// configured (graph sync disabled). A repository failure is returned wrapped
// with context; on success the flow ID is logged.
func (s *GraphSyncService) SyncResourceFlow(ctx context.Context, flow *domain.ResourceFlow) error {
	if repo := s.flowGraphRepo; repo != nil {
		if err := repo.SyncToGraph(ctx, flow); err != nil {
			return fmt.Errorf("failed to sync resource flow to graph: %w", err)
		}
		log.Printf("Synced resource flow %s to graph database", flow.ID)
	}
	// Reached both after a successful sync and when graph sync is disabled.
	return nil
}
// DeleteResourceFlow removes the resource flow identified by id from the
// graph database.
//
// It is a no-op returning nil when the resource flow graph repository is not
// configured. A repository failure is returned wrapped with context; on
// success the deleted ID is logged.
func (s *GraphSyncService) DeleteResourceFlow(ctx context.Context, id string) error {
	if s.flowGraphRepo == nil {
		return nil // nothing to do: graph sync disabled
	}

	err := s.flowGraphRepo.DeleteFromGraph(ctx, id)
	if err == nil {
		log.Printf("Deleted resource flow %s from graph database", id)
		return nil
	}

	return fmt.Errorf("failed to delete resource flow from graph: %w", err)
}
// FindMatchesInGraph asks the resource flow graph repository for potential
// matches for the flow identified by flowID.
//
// flowID and maxDistanceKm are forwarded unchanged to the repository; how the
// distance limit is applied is decided there. Unlike the sync and delete
// methods, a missing repository is reported as an error rather than treated
// as a no-op. Repository failures are returned wrapped with context. On
// success the number of matches found is logged and the repository's result
// is returned as-is.
func (s *GraphSyncService) FindMatchesInGraph(ctx context.Context, flowID string, maxDistanceKm float64) ([]map[string]interface{}, error) {
	repo := s.flowGraphRepo
	if repo == nil {
		return nil, fmt.Errorf("graph matching not available")
	}

	found, err := repo.FindMatchesInGraph(ctx, flowID, maxDistanceKm)
	if err != nil {
		return nil, fmt.Errorf("failed to find matches in graph: %w", err)
	}

	log.Printf("Found %d potential matches in graph for flow %s", len(found), flowID)
	return found, nil
}
// BulkSyncOrganizations syncs each organization in orgs to the graph
// database on a best-effort basis by calling SyncOrganization per element.
//
// A failure on one organization is logged as a warning and does not stop the
// remaining organizations from being processed. Nil entries are skipped with
// a warning rather than dereferenced. The final log line reports how many
// organizations were actually synced out of the number supplied, so partial
// failures are visible in the summary.
//
// The method always returns nil. When the organization graph repository is
// not configured (graph sync disabled) it returns immediately without
// logging.
func (s *GraphSyncService) BulkSyncOrganizations(ctx context.Context, orgs []*domain.Organization) error {
	if s.orgGraphRepo == nil {
		return nil // Graph sync disabled
	}

	synced := 0
	for _, org := range orgs {
		if org == nil {
			// A nil entry has no ID to report and cannot be synced.
			log.Print("Warning: Skipping nil organization in bulk sync")
			continue
		}
		if err := s.SyncOrganization(ctx, org); err != nil {
			log.Printf("Warning: Failed to sync organization %s: %v", org.ID, err)
			continue // Continue with other organizations
		}
		synced++
	}

	log.Printf("Bulk synced %d of %d organizations to graph", synced, len(orgs))
	return nil
}
// BulkSyncSites syncs each site in sites to the graph database on a
// best-effort basis by calling SyncSite per element.
//
// A failure on one site is logged as a warning and does not stop the
// remaining sites from being processed. Nil entries are skipped with a
// warning rather than dereferenced. The final log line reports how many sites
// were actually synced out of the number supplied, so partial failures are
// visible in the summary.
//
// The method always returns nil. When the site graph repository is not
// configured (graph sync disabled) it returns immediately without logging.
func (s *GraphSyncService) BulkSyncSites(ctx context.Context, sites []*domain.Site) error {
	if s.siteGraphRepo == nil {
		return nil // Graph sync disabled
	}

	synced := 0
	for _, site := range sites {
		if site == nil {
			// A nil entry has no ID to report and cannot be synced.
			log.Print("Warning: Skipping nil site in bulk sync")
			continue
		}
		if err := s.SyncSite(ctx, site); err != nil {
			log.Printf("Warning: Failed to sync site %s: %v", site.ID, err)
			continue // Continue with other sites
		}
		synced++
	}

	log.Printf("Bulk synced %d of %d sites to graph", synced, len(sites))
	return nil
}
// BulkSyncResourceFlows syncs each resource flow in flows to the graph
// database on a best-effort basis by calling SyncResourceFlow per element.
//
// A failure on one flow is logged as a warning and does not stop the
// remaining flows from being processed. Nil entries are skipped with a
// warning rather than dereferenced. The final log line reports how many flows
// were actually synced out of the number supplied, so partial failures are
// visible in the summary.
//
// The method always returns nil. When the resource flow graph repository is
// not configured (graph sync disabled) it returns immediately without
// logging.
func (s *GraphSyncService) BulkSyncResourceFlows(ctx context.Context, flows []*domain.ResourceFlow) error {
	if s.flowGraphRepo == nil {
		return nil // Graph sync disabled
	}

	synced := 0
	for _, flow := range flows {
		if flow == nil {
			// A nil entry has no ID to report and cannot be synced.
			log.Print("Warning: Skipping nil resource flow in bulk sync")
			continue
		}
		if err := s.SyncResourceFlow(ctx, flow); err != nil {
			log.Printf("Warning: Failed to sync resource flow %s: %v", flow.ID, err)
			continue // Continue with other flows
		}
		synced++
	}

	log.Printf("Bulk synced %d of %d resource flows to graph", synced, len(flows))
	return nil
}
// SyncAddress writes address to the graph database through the address
// graph repository.
//
// It returns nil without doing anything when the repository is not
// configured (graph sync disabled). A repository failure is returned wrapped
// with context; on success the address ID is logged.
func (s *GraphSyncService) SyncAddress(ctx context.Context, address *domain.Address) error {
	repo := s.addressGraphRepo
	if repo == nil {
		// Graph sync disabled: treat as a successful no-op.
		return nil
	}

	err := repo.SyncToGraph(ctx, address)
	if err != nil {
		return fmt.Errorf("failed to sync address to graph: %w", err)
	}

	log.Printf("Synced address %s to graph database", address.ID)
	return nil
}
// DeleteAddress removes the address identified by id from the graph
// database.
//
// It is a no-op returning nil when the address graph repository is not
// configured. A repository failure is returned wrapped with context; on
// success the deleted ID is logged.
func (s *GraphSyncService) DeleteAddress(ctx context.Context, id string) error {
	if repo := s.addressGraphRepo; repo != nil {
		if err := repo.DeleteFromGraph(ctx, id); err != nil {
			return fmt.Errorf("failed to delete address from graph: %w", err)
		}
		log.Printf("Deleted address %s from graph database", id)
	}
	// Reached both after a successful delete and when graph sync is disabled.
	return nil
}
// BulkSyncAddresses syncs each address in addresses to the graph database
// together with its organization and site relationships, on a best-effort
// basis.
//
// For every address the IDs of its Organizations and Sites are collected and
// passed to the repository's SyncWithRelationships in a single call. These
// associations are read as-is from the address, so they are expected to have
// been loaded by the caller. A failure on one address is logged as a warning
// and does not stop the remaining addresses from being processed. Nil entries
// are skipped with a warning rather than dereferenced. The final log line
// reports how many addresses were actually synced out of the number supplied.
//
// The method always returns nil. When the address graph repository is not
// configured (graph sync disabled) it returns immediately without logging.
func (s *GraphSyncService) BulkSyncAddresses(ctx context.Context, addresses []*domain.Address) error {
	if s.addressGraphRepo == nil {
		return nil // Graph sync disabled
	}

	synced := 0
	for _, address := range addresses {
		if address == nil {
			// A nil entry has no associations or ID and cannot be synced.
			log.Print("Warning: Skipping nil address in bulk sync")
			continue
		}

		// Extract organization IDs from preloaded associations. The slice is
		// deliberately left nil when there are none, so the repository sees
		// the same argument it always has.
		var orgIDs []string
		for _, org := range address.Organizations {
			orgIDs = append(orgIDs, org.ID)
		}

		// Extract site IDs from preloaded associations (nil when empty).
		var siteIDs []string
		for _, site := range address.Sites {
			siteIDs = append(siteIDs, site.ID)
		}

		// Sync the address and its relationships in one repository call.
		if err := s.addressGraphRepo.SyncWithRelationships(ctx, address, orgIDs, siteIDs); err != nil {
			log.Printf("Warning: Failed to sync address %s: %v", address.ID, err)
			continue // Continue with other addresses
		}
		synced++
	}

	log.Printf("Bulk synced %d of %d addresses to graph", synced, len(addresses))
	return nil
}
// SyncMatch writes match to the graph database through the match graph
// repository.
//
// It returns nil immediately when the repository is not configured (graph
// sync disabled). A repository failure is returned wrapped with context; on
// success the match ID is logged.
func (s *GraphSyncService) SyncMatch(ctx context.Context, match *domain.Match) error {
	if s.matchGraphRepo == nil {
		return nil // nothing to do: graph sync disabled
	}

	err := s.matchGraphRepo.SyncToGraph(ctx, match)
	if err == nil {
		log.Printf("Synced match %s to graph database", match.ID)
		return nil
	}

	return fmt.Errorf("failed to sync match to graph: %w", err)
}
// DeleteMatch removes the match identified by id from the graph database.
//
// It is a no-op returning nil when the match graph repository is not
// configured. A repository failure is returned wrapped with context; on
// success the deleted ID is logged.
func (s *GraphSyncService) DeleteMatch(ctx context.Context, id string) error {
	repo := s.matchGraphRepo
	if repo == nil {
		// Graph sync disabled.
		return nil
	}

	err := repo.DeleteFromGraph(ctx, id)
	if err != nil {
		return fmt.Errorf("failed to delete match from graph: %w", err)
	}

	log.Printf("Deleted match %s from graph database", id)
	return nil
}
// BulkSyncMatches syncs each match in matches to the graph database on a
// best-effort basis by calling SyncMatch per element.
//
// A failure on one match is logged as a warning and does not stop the
// remaining matches from being processed. Nil entries are skipped with a
// warning rather than dereferenced. The final log line reports how many
// matches were actually synced out of the number supplied, so partial
// failures are visible in the summary.
//
// The method always returns nil. When the match graph repository is not
// configured (graph sync disabled) it returns immediately without logging.
func (s *GraphSyncService) BulkSyncMatches(ctx context.Context, matches []*domain.Match) error {
	if s.matchGraphRepo == nil {
		return nil // Graph sync disabled
	}

	synced := 0
	for _, match := range matches {
		if match == nil {
			// A nil entry has no ID to report and cannot be synced.
			log.Print("Warning: Skipping nil match in bulk sync")
			continue
		}
		if err := s.SyncMatch(ctx, match); err != nil {
			log.Printf("Warning: Failed to sync match %s: %v", match.ID, err)
			continue // Continue with other matches
		}
		synced++
	}

	log.Printf("Bulk synced %d of %d matches to graph", synced, len(matches))
	return nil
}
// SyncSharedAsset writes asset to the graph database through the shared
// asset graph repository.
//
// It returns nil without doing anything when the repository is not
// configured (graph sync disabled). A repository failure is returned wrapped
// with context; on success the asset ID is logged.
func (s *GraphSyncService) SyncSharedAsset(ctx context.Context, asset *domain.SharedAsset) error {
	if repo := s.sharedAssetGraphRepo; repo != nil {
		if err := repo.SyncToGraph(ctx, asset); err != nil {
			return fmt.Errorf("failed to sync shared asset to graph: %w", err)
		}
		log.Printf("Synced shared asset %s to graph database", asset.ID)
	}
	// Reached both after a successful sync and when graph sync is disabled.
	return nil
}
// DeleteSharedAsset removes the shared asset identified by id from the graph
// database.
//
// It is a no-op returning nil when the shared asset graph repository is not
// configured. A repository failure is returned wrapped with context; on
// success the deleted ID is logged.
func (s *GraphSyncService) DeleteSharedAsset(ctx context.Context, id string) error {
	if s.sharedAssetGraphRepo == nil {
		return nil // nothing to do: graph sync disabled
	}

	err := s.sharedAssetGraphRepo.DeleteFromGraph(ctx, id)
	if err == nil {
		log.Printf("Deleted shared asset %s from graph database", id)
		return nil
	}

	return fmt.Errorf("failed to delete shared asset from graph: %w", err)
}
// BulkSyncSharedAssets syncs each shared asset in assets to the graph
// database on a best-effort basis by calling SyncSharedAsset per element.
//
// A failure on one asset is logged as a warning and does not stop the
// remaining assets from being processed. Nil entries are skipped with a
// warning rather than dereferenced. The final log line reports how many
// assets were actually synced out of the number supplied, so partial failures
// are visible in the summary.
//
// The method always returns nil. When the shared asset graph repository is
// not configured (graph sync disabled) it returns immediately without
// logging.
func (s *GraphSyncService) BulkSyncSharedAssets(ctx context.Context, assets []*domain.SharedAsset) error {
	if s.sharedAssetGraphRepo == nil {
		return nil // Graph sync disabled
	}

	synced := 0
	for _, asset := range assets {
		if asset == nil {
			// A nil entry has no ID to report and cannot be synced.
			log.Print("Warning: Skipping nil shared asset in bulk sync")
			continue
		}
		if err := s.SyncSharedAsset(ctx, asset); err != nil {
			log.Printf("Warning: Failed to sync shared asset %s: %v", asset.ID, err)
			continue // Continue with other assets
		}
		synced++
	}

	log.Printf("Bulk synced %d of %d shared assets to graph", synced, len(assets))
	return nil
}
// SyncProduct writes product to the graph database through the product
// graph repository.
//
// It returns nil without doing anything when the repository is not
// configured (graph sync disabled). A repository failure is returned wrapped
// with context; on success the product ID is logged.
func (s *GraphSyncService) SyncProduct(ctx context.Context, product *domain.Product) error {
	repo := s.productGraphRepo
	if repo == nil {
		// Graph sync disabled: treat as a successful no-op.
		return nil
	}

	err := repo.SyncToGraph(ctx, product)
	if err != nil {
		return fmt.Errorf("failed to sync product to graph: %w", err)
	}

	log.Printf("Synced product %s to graph database", product.ID)
	return nil
}
// DeleteProduct removes the product identified by id from the graph
// database.
//
// It is a no-op returning nil when the product graph repository is not
// configured. A repository failure is returned wrapped with context; on
// success the deleted ID is logged.
func (s *GraphSyncService) DeleteProduct(ctx context.Context, id string) error {
	if repo := s.productGraphRepo; repo != nil {
		if err := repo.DeleteFromGraph(ctx, id); err != nil {
			return fmt.Errorf("failed to delete product from graph: %w", err)
		}
		log.Printf("Deleted product %s from graph database", id)
	}
	// Reached both after a successful delete and when graph sync is disabled.
	return nil
}
// SyncService writes service to the graph database through the service
// graph repository.
//
// It returns nil immediately when the repository is not configured (graph
// sync disabled). A repository failure is returned wrapped with context; on
// success the service ID is logged.
func (s *GraphSyncService) SyncService(ctx context.Context, service *domain.Service) error {
	if s.serviceGraphRepo == nil {
		return nil // nothing to do: graph sync disabled
	}

	err := s.serviceGraphRepo.SyncToGraph(ctx, service)
	if err == nil {
		log.Printf("Synced service %s to graph database", service.ID)
		return nil
	}

	return fmt.Errorf("failed to sync service to graph: %w", err)
}
// DeleteService removes the service identified by id from the graph
// database.
//
// It is a no-op returning nil when the service graph repository is not
// configured. A repository failure is returned wrapped with context; on
// success the deleted ID is logged.
func (s *GraphSyncService) DeleteService(ctx context.Context, id string) error {
	repo := s.serviceGraphRepo
	if repo == nil {
		// Graph sync disabled.
		return nil
	}

	err := repo.DeleteFromGraph(ctx, id)
	if err != nil {
		return fmt.Errorf("failed to delete service from graph: %w", err)
	}

	log.Printf("Deleted service %s from graph database", id)
	return nil
}
// BulkSyncProducts syncs each product in products to the graph database on a
// best-effort basis by calling SyncProduct per element.
//
// A failure on one product is logged as a warning and does not stop the
// remaining products from being processed. Nil entries are skipped with a
// warning rather than dereferenced. The final log line reports how many
// products were actually synced out of the number supplied, so partial
// failures are visible in the summary.
//
// The method always returns nil. When the product graph repository is not
// configured (graph sync disabled) it returns immediately without logging.
func (s *GraphSyncService) BulkSyncProducts(ctx context.Context, products []*domain.Product) error {
	if s.productGraphRepo == nil {
		return nil // Graph sync disabled
	}

	synced := 0
	for _, product := range products {
		if product == nil {
			// A nil entry has no ID to report and cannot be synced.
			log.Print("Warning: Skipping nil product in bulk sync")
			continue
		}
		if err := s.SyncProduct(ctx, product); err != nil {
			log.Printf("Warning: Failed to sync product %s: %v", product.ID, err)
			continue // Continue with other products
		}
		synced++
	}

	log.Printf("Bulk synced %d of %d products to graph", synced, len(products))
	return nil
}
// BulkSyncServices syncs each service in services to the graph database on a
// best-effort basis by calling SyncService per element.
//
// A failure on one service is logged as a warning and does not stop the
// remaining services from being processed. Nil entries are skipped with a
// warning rather than dereferenced. The final log line reports how many
// services were actually synced out of the number supplied, so partial
// failures are visible in the summary.
//
// The method always returns nil. When the service graph repository is not
// configured (graph sync disabled) it returns immediately without logging.
func (s *GraphSyncService) BulkSyncServices(ctx context.Context, services []*domain.Service) error {
	if s.serviceGraphRepo == nil {
		return nil // Graph sync disabled
	}

	synced := 0
	for _, svc := range services {
		if svc == nil {
			// A nil entry has no ID to report and cannot be synced.
			log.Print("Warning: Skipping nil service in bulk sync")
			continue
		}
		if err := s.SyncService(ctx, svc); err != nil {
			log.Printf("Warning: Failed to sync service %s: %v", svc.ID, err)
			continue // Continue with other services
		}
		synced++
	}

	log.Printf("Bulk synced %d of %d services to graph", synced, len(services))
	return nil
}