# Neo4j Graph Database Integration

This document describes the implementation of the hybrid PostgreSQL + Neo4j architecture defined in the concept documentation.

## Architecture Overview

The platform uses a **hybrid database architecture**:

### PostgreSQL + PostGIS

- **Purpose**: Primary data store for structured data and geospatial queries
- **Stores**: Organizations, Sites, ResourceFlows, Matches, and SharedAssets, with full GORM models
- **Strengths**: ACID compliance, complex spatial queries, JSONB for flexible data
- **Use Cases**: Transactional operations, detailed entity storage, spatial filtering

### Neo4j Graph Database

- **Purpose**: Graph traversal and relationship-based matching
- **Stores**: The same entities as nodes, connected by relationships (OPERATES_AT, HOSTS; MATCHABLE_TO is currently computed at query time rather than stored)
- **Strengths**: Multi-hop traversals, pattern matching, relationship queries
- **Use Cases**: Advanced matching algorithms, network analysis, symbiosis discovery

### Synchronization Pattern

- **Event-driven sync**: PostgreSQL → Neo4j on create/update/delete
- **Dual-write pattern**: Services write to PostgreSQL (primary) first, then sync to Neo4j (secondary)
- **Graceful degradation**: The system keeps working on PostgreSQL alone if Neo4j is unavailable
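
The synchronization pattern above can be sketched as a small event consumer. This is a minimal illustration under stated assumptions, not the project's code: `ChangeEvent`, `GraphSink`, `runSync`, and `memorySink` are hypothetical names, and an in-memory map stands in for Neo4j. A nil sink models Neo4j being disabled, so events are drained without blocking the primary write path, and secondary failures are counted rather than returned to the caller.

```go
package main

import (
	"errors"
	"fmt"
)

// Op is the kind of committed PostgreSQL change to mirror into Neo4j (hypothetical).
type Op int

const (
	OpUpsert Op = iota
	OpDelete
)

// ChangeEvent describes one change that has already been committed to PostgreSQL.
type ChangeEvent struct {
	Op     Op
	Entity string // e.g. "Organization", "Site", "ResourceFlow"
	ID     string
}

// GraphSink is a hypothetical abstraction over the Neo4j side of the sync.
type GraphSink interface {
	Upsert(entity, id string) error
	Delete(entity, id string) error
}

// runSync drains events until the channel is closed and returns how many
// events failed to apply. A nil sink means Neo4j is disabled: events are
// consumed and dropped, so producers never block on the secondary store.
func runSync(events <-chan ChangeEvent, sink GraphSink) int {
	failed := 0
	for ev := range events {
		if sink == nil {
			continue
		}
		var err error
		switch ev.Op {
		case OpUpsert:
			err = sink.Upsert(ev.Entity, ev.ID)
		case OpDelete:
			err = sink.Delete(ev.Entity, ev.ID)
		}
		if err != nil {
			failed++ // secondary failure: record it, never propagate it to the caller
		}
	}
	return failed
}

// memorySink is an in-memory stand-in for Neo4j, used only for illustration.
type memorySink struct{ nodes map[string]bool }

func (m *memorySink) Upsert(entity, id string) error {
	m.nodes[entity+":"+id] = true
	return nil
}

func (m *memorySink) Delete(entity, id string) error {
	key := entity + ":" + id
	if !m.nodes[key] {
		return errors.New("node not found: " + key)
	}
	delete(m.nodes, key)
	return nil
}

func main() {
	sink := &memorySink{nodes: map[string]bool{}}
	events := make(chan ChangeEvent, 4)
	events <- ChangeEvent{OpUpsert, "Organization", "org-1"}
	events <- ChangeEvent{OpUpsert, "Site", "site-1"}
	events <- ChangeEvent{OpDelete, "Site", "site-1"}
	events <- ChangeEvent{OpDelete, "Site", "site-2"} // never synced, so this fails
	close(events)

	failed := runSync(events, sink)
	fmt.Println(len(sink.nodes), failed) // 1 1
}
```

In a real deployment the channel would be fed only after the PostgreSQL transaction commits, and failed events would be logged or retried; the Scale Phase's NATS/Kafka sync would replace the in-process channel with a durable queue.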

## Implementation Components

### 1. Database Configuration (`pkg/database/neo4j.go`)

**Neo4j Driver Setup**:

```go
config := database.Neo4jConfig{
	URI:      "neo4j://localhost:7687",
	Username: "neo4j",
	Password: "password",
	Database: "neo4j",
}

driver, err := database.NewNeo4jDriver(config)
```

**Features**:
- Connection pooling (max 50 connections)
- 30-second connection acquisition timeout
- 30-second transaction retry time
- Automatic connectivity verification

**Schema Initialization**:

```go
err := database.InitializeSchema(ctx, driver, "neo4j")
```

Creates:
- **Uniqueness constraints** on the `id` field of every node type
- **Indexes** for performance:
  - Organization: name, sector, subtype
  - Site: location (lat/lng), site_type
  - ResourceFlow: type, direction, type+direction composite
  - Match: status, compatibility_score
  - SharedAsset: type

### 2. Graph Repositories (`internal/repository/graph_repository.go`)

#### GraphOrganizationRepository

**Sync Organization to Graph**:

```go
repo := repository.NewGraphOrganizationRepository(driver, "neo4j")
err := repo.SyncToGraph(ctx, organization)
```

**Cypher Operation**:
- `MERGE (o:Organization {id: $id})` - Upsert node
- Sets all properties, storing JSONB fields as JSON strings
- Stores timestamps in RFC3339 format

**Properties Synced**:
- Basic: id, name, sector, subtype, description, address
- Location: latitude, longitude
- Business: legal_form, industrial_sector, company_size, years_operation
- Products/Services: sells_products, offers_services (as JSON)
- Metadata: verified, created_at, updated_at

#### GraphSiteRepository

**Sync Site to Graph**:

```go
repo := repository.NewGraphSiteRepository(driver, "neo4j")
err := repo.SyncToGraph(ctx, site)
```

**Cypher Operation**:
- `MERGE (s:Site {id: $id})` - Upsert site node
- `MERGE (o)-[:OPERATES_AT]->(s)` - Create relationship to owner organization
- The relationship is created only if the owner organization node already exists

**Relationships Created**:
- `Organization -[:OPERATES_AT]-> Site`

#### GraphResourceFlowRepository

**Sync Resource Flow to Graph**:

```go
repo := repository.NewGraphResourceFlowRepository(driver, "neo4j")
err := repo.SyncToGraph(ctx, resourceFlow)
```

**Cypher Operation**:
- `MERGE (rf:ResourceFlow {id: $id})` - Upsert resource flow node
- `MERGE (s)-[:HOSTS]->(rf)` - Create relationship to site
- Unpacks the JSONB fields (Quality, Quantity, TimeProfile, EconomicData) into flat graph properties

**Relationships Created**:
- `Site -[:HOSTS]-> ResourceFlow`

**Properties Synced**:
- Identification: id, business_id, site_id, direction, type
- Quality: temperature_celsius, pressure_bar, purity_pct, grade, hazardousness, physical_state
- Quantity: amount, unit, temporal_unit
- Temporal: supply_pattern
- Economic: cost_in, cost_out, transportation_cost
- Metadata: precision_level, source_type

**Graph-Based Matching**:

```go
matches, err := repo.FindMatchesInGraph(ctx, flowID, maxDistanceKm)
```

**Cypher Query** (from concept documentation; the `AS` aliases match the keys read in the usage example further below):

```cypher
MATCH (sourceFlow:ResourceFlow {id: $flowID})<-[:HOSTS]-(sourceSite:Site),
      (targetFlow:ResourceFlow)<-[:HOSTS]-(targetSite:Site)
WHERE sourceFlow.direction = 'output'
  AND targetFlow.direction = 'input'
  AND sourceFlow.type = targetFlow.type
  AND abs(coalesce(sourceFlow.temperature_celsius, 0) -
          coalesce(targetFlow.temperature_celsius, 0)) <= 10
WITH sourceFlow, targetFlow, sourceSite, targetSite,
     point.distance(
       point({longitude: sourceSite.longitude, latitude: sourceSite.latitude}),
       point({longitude: targetSite.longitude, latitude: targetSite.latitude})
     ) / 1000 AS distance_km
WHERE distance_km <= $maxDistance
RETURN targetFlow.id AS target_flow_id,
       targetFlow.type,
       targetFlow.temperature_celsius,
       targetFlow.amount,
       targetSite.name AS target_site_name,
       distance_km
ORDER BY distance_km ASC
LIMIT 50
```

**Matching Features**:
- Direction filtering (output → input)
- Type matching (heat → heat, water → water, etc.)
- Temperature compatibility (ΔT ≤ 10°C)
- Geospatial distance calculation (Neo4j `point.distance`)
- Distance-based ranking

### 3. Graph Sync Service (`internal/service/graph_sync_service.go`)

**Purpose**: Orchestrates synchronization between PostgreSQL and Neo4j

**Service Creation**:

```go
graphSyncService := service.NewGraphSyncService(
	orgGraphRepo,
	siteGraphRepo,
	flowGraphRepo,
)
```

**Single Entity Sync**:

```go
// Sync organization
err := graphSyncService.SyncOrganization(ctx, organization)

// Sync site
err = graphSyncService.SyncSite(ctx, site)

// Sync resource flow
err = graphSyncService.SyncResourceFlow(ctx, resourceFlow)
```

**Deletion from Graph**:

```go
err := graphSyncService.DeleteOrganization(ctx, orgID)
err = graphSyncService.DeleteSite(ctx, siteID)
err = graphSyncService.DeleteResourceFlow(ctx, flowID)
```

**Bulk Synchronization**:

```go
// Bulk sync organizations
err := graphSyncService.BulkSyncOrganizations(ctx, organizations)

// Bulk sync sites
err = graphSyncService.BulkSyncSites(ctx, sites)

// Bulk sync resource flows
err = graphSyncService.BulkSyncResourceFlows(ctx, resourceFlows)
```

**Graph Matching**:

```go
matches, err := graphSyncService.FindMatchesInGraph(ctx, flowID, 10.0)
```

**Graceful Degradation**:
- All sync methods check whether the corresponding graph repository is nil
- If Neo4j is disabled, sync operations are no-ops (they return nil)
- The application continues working with PostgreSQL only

### 4. Configuration (`pkg/config/config.go`)

**Environment Variables**:

```bash
# Neo4j Configuration
NEO4J_URI=neo4j://localhost:7687
NEO4J_USERNAME=neo4j
NEO4J_PASSWORD=your-password
NEO4J_DATABASE=neo4j
NEO4J_ENABLED=true  # Set to false to disable graph sync

# PostgreSQL Configuration
POSTGRES_HOST=localhost
POSTGRES_PORT=5432
POSTGRES_USER=bugulma
POSTGRES_PASSWORD=bugulma
POSTGRES_DB=bugulma_city
POSTGRES_SSLMODE=disable
```

**Config Loading**:

```go
cfg := config.Load()

// Neo4j is disabled by default
// Set NEO4J_ENABLED=true to enable
if cfg.Neo4jEnabled {
	// Initialize Neo4j driver and sync
}
```

## Graph Schema

### Node Types

**Organization**:

```cypher
(:Organization {
  id: string,
  name: string,
  sector: string,
  subtype: string,
  latitude: float,
  longitude: float,
  industrial_sector: string,
  company_size: int,
  sells_products: json,
  offers_services: json,
  ...
})
```

**Site**:

```cypher
(:Site {
  id: string,
  name: string,
  latitude: float,
  longitude: float,
  site_type: string,
  floor_area_m2: float,
  available_utilities: json,
  ...
})
```

**ResourceFlow**:

```cypher
(:ResourceFlow {
  id: string,
  business_id: string,
  site_id: string,
  direction: string,  // 'input' or 'output'
  type: string,       // 'heat', 'water', 'steam', etc.
  temperature_celsius: float,
  amount: float,
  unit: string,
  precision_level: string,
  ...
})
```
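
Properties typed `json` in the node schemas above are stored as strings (Neo4j has no JSON or map property type), so code that reads nodes back must decode them. A hedged sketch, assuming the node's properties are available as a `map[string]any`, using a hypothetical `decodeJSONProp` helper; the property values in `main` are illustrative.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// decodeJSONProp decodes a JSON-encoded string property into out.
// A missing or null property leaves out untouched; a non-string value is an error.
func decodeJSONProp(props map[string]any, key string, out any) error {
	raw, ok := props[key]
	if !ok || raw == nil {
		return nil
	}
	s, ok := raw.(string)
	if !ok {
		return fmt.Errorf("property %q: expected JSON string, got %T", key, raw)
	}
	if err := json.Unmarshal([]byte(s), out); err != nil {
		return fmt.Errorf("property %q: %w", key, err)
	}
	return nil
}

func main() {
	// Example property map for an (:Organization) node; values are illustrative.
	props := map[string]any{
		"id":             "org-1",
		"company_size":   int64(120),
		"sells_products": `["steel","slag"]`,
	}

	var products []string
	if err := decodeJSONProp(props, "sells_products", &products); err != nil {
		panic(err)
	}
	var services []string // offers_services is absent, so this stays empty
	if err := decodeJSONProp(props, "offers_services", &services); err != nil {
		panic(err)
	}
	fmt.Println(products, len(services)) // [steel slag] 0
}
```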

### Relationships

**Core Relationships** (from concept):

```cypher
(Organization)-[:OPERATES_AT]->(Site)
(Site)-[:HOSTS]->(ResourceFlow)
(ResourceFlow)-[:MATCHABLE_TO {efficiency, distance, savings}]->(ResourceFlow)
(Organization)-[:TRUSTS]->(Organization)
```

**Implemented**:
- ✅ `Organization -[:OPERATES_AT]-> Site`
- ✅ `Site -[:HOSTS]-> ResourceFlow`
- 🔄 `ResourceFlow -[:MATCHABLE_TO]-> ResourceFlow` (computed dynamically via queries)
- 🔄 `Organization -[:TRUSTS]-> Organization` (to be implemented)

## Integration with Existing Services

### Pattern for Service Integration

**Example: OrganizationService with Graph Sync**

```go
type OrganizationService struct {
	repo             domain.OrganizationRepository
	graphSyncService *GraphSyncService // Optional: nil when Neo4j is disabled
}

func (s *OrganizationService) Create(req CreateOrganizationRequest) (*domain.Organization, error) {
	// 1. Create in PostgreSQL (primary)
	org := &domain.Organization{ /* ... */ }
	if err := s.repo.Create(org); err != nil {
		return nil, err
	}

	// 2. Sync to Neo4j (secondary, non-blocking)
	if s.graphSyncService != nil {
		go func() {
			ctx := context.Background()
			if err := s.graphSyncService.SyncOrganization(ctx, org); err != nil {
				log.Printf("Failed to sync organization to graph: %v", err)
			}
		}()
	}

	return org, nil
}

func (s *OrganizationService) Delete(id string) error {
	// 1. Delete from PostgreSQL (primary)
	if err := s.repo.Delete(id); err != nil {
		return err
	}

	// 2. Delete from Neo4j (secondary, non-blocking)
	if s.graphSyncService != nil {
		go func() {
			ctx := context.Background()
			if err := s.graphSyncService.DeleteOrganization(ctx, id); err != nil {
				log.Printf("Failed to delete organization from graph: %v", err)
			}
		}()
	}

	return nil
}
```

## Usage Examples

### Setup Neo4j Integration

```go
package main

import (
	"context"
	"log"

	"bugulma/backend/internal/repository"
	"bugulma/backend/internal/service"
	"bugulma/backend/pkg/config"
	"bugulma/backend/pkg/database"
)

func main() {
	cfg := config.Load()

	var graphSyncService *service.GraphSyncService

	if cfg.Neo4jEnabled {
		// Initialize Neo4j driver
		neo4jConfig := database.Neo4jConfig{
			URI:      cfg.Neo4jURI,
			Username: cfg.Neo4jUsername,
			Password: cfg.Neo4jPassword,
			Database: cfg.Neo4jDatabase,
		}

		driver, err := database.NewNeo4jDriver(neo4jConfig)
		if err != nil {
			// Degrade gracefully: continue with PostgreSQL only
			log.Printf("Warning: Failed to connect to Neo4j: %v", err)
		} else {
			// Initialize schema (a failure is logged, not fatal)
			ctx := context.Background()
			if err := database.InitializeSchema(ctx, driver, cfg.Neo4jDatabase); err != nil {
				log.Printf("Warning: Failed to initialize Neo4j schema: %v", err)
			}

			// Create graph repositories
			orgGraphRepo := repository.NewGraphOrganizationRepository(driver, cfg.Neo4jDatabase)
			siteGraphRepo := repository.NewGraphSiteRepository(driver, cfg.Neo4jDatabase)
			flowGraphRepo := repository.NewGraphResourceFlowRepository(driver, cfg.Neo4jDatabase)

			// Create graph sync service
			graphSyncService = service.NewGraphSyncService(
				orgGraphRepo,
				siteGraphRepo,
				flowGraphRepo,
			)

			log.Println("Neo4j graph database integration enabled")
		}
	}

	// Pass graphSyncService (nil if Neo4j is disabled or unreachable) to other services.
	// The blank assignment keeps this excerpt compiling until the variable is used.
	_ = graphSyncService
	// ...
}
```

### Bulk Sync Existing Data

```go
func syncExistingData(
	orgRepo domain.OrganizationRepository,
	siteRepo domain.SiteRepository,
	flowRepo domain.ResourceFlowRepository,
	graphSyncService *service.GraphSyncService,
) error {
	ctx := context.Background()

	// Sync all organizations first
	orgs, err := orgRepo.GetAll()
	if err != nil {
		return err
	}
	if err := graphSyncService.BulkSyncOrganizations(ctx, orgs); err != nil {
		return err
	}
	log.Printf("Synced %d organizations to graph", len(orgs))

	// Sync all sites (after organizations, so OPERATES_AT can find the owner nodes)
	sites, err := siteRepo.GetAll()
	if err != nil {
		return err
	}
	if err := graphSyncService.BulkSyncSites(ctx, sites); err != nil {
		return err
	}
	log.Printf("Synced %d sites to graph", len(sites))

	// Sync all resource flows (after sites, so HOSTS can attach to existing site nodes)
	flows, err := flowRepo.GetAll()
	if err != nil {
		return err
	}
	if err := graphSyncService.BulkSyncResourceFlows(ctx, flows); err != nil {
		return err
	}
	log.Printf("Synced %d resource flows to graph", len(flows))

	return nil
}
```

### Graph-Based Matching

```go
// Use graph traversal for advanced matching
func findAdvancedMatches(
	graphSyncService *service.GraphSyncService,
	flowID string,
) {
	ctx := context.Background()

	// Find matches within 10 km using Neo4j graph traversal
	matches, err := graphSyncService.FindMatchesInGraph(ctx, flowID, 10.0)
	if err != nil {
		log.Printf("Graph matching failed, falling back to PostgreSQL: %v", err)
		// Fallback to PostgreSQL-based matching
		return
	}

	// Process graph matches
	for _, match := range matches {
		log.Printf("Match found: %s at %s (%.2f km away)",
			match["target_flow_id"],
			match["target_site_name"],
			match["distance_km"],
		)
	}
}
```

## Performance Considerations

### Query Performance

**PostgreSQL** excels at:
- Spatial filtering (PostGIS indexes)
- JSONB queries (GIN indexes)
- Transaction-heavy workloads
- Exact point lookups

**Neo4j** excels at:
- Multi-hop graph traversals
- Relationship pattern matching
- Network analysis
- Discovering indirect connections
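
The bulk-sync example loads every row with `GetAll()` and hands each full slice to a single call. For larger datasets, one common alternative (and one way to apply the "batch updates" advice under Troubleshooting) is to sync in fixed-size batches. A generic sketch for Go 1.18+; `chunk` and the batch size are hypothetical, not part of the project.

```go
package main

import "fmt"

// chunk splits items into consecutive batches of at most size elements.
// Batches share the input's backing array; no elements are copied.
func chunk[T any](items []T, size int) [][]T {
	if size <= 0 {
		panic("chunk: size must be positive")
	}
	var batches [][]T
	for start := 0; start < len(items); start += size {
		end := start + size
		if end > len(items) {
			end = len(items)
		}
		// The full slice expression caps capacity so appending to one batch
		// cannot overwrite the first elements of the next.
		batches = append(batches, items[start:end:end])
	}
	return batches
}

func main() {
	ids := []string{"org-1", "org-2", "org-3", "org-4", "org-5"}
	for i, batch := range chunk(ids, 2) {
		// In the bulk-sync example this would be one BulkSyncOrganizations call per batch.
		fmt.Println(i, batch)
	}
	// 0 [org-1 org-2]
	// 1 [org-3 org-4]
	// 2 [org-5]
}
```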

### When to Use Each Database

**Use PostgreSQL for**:
- CRUD operations on single entities
- Spatial radius queries (GetWithinRadius)
- Operations where transaction integrity is critical
- Queries by specific attributes

**Use Neo4j for**:
- Finding indirect symbiosis opportunities
- Multi-party matching (3+ organizations)
- Trust network analysis
- Supply chain relationship mapping
- Pattern-based discovery

### Scaling Strategy

**MVP** (current):
- Single PostgreSQL instance
- Single Neo4j instance
- Async sync (fire-and-forget)

**Scale Phase** (1000+ businesses):
- PostgreSQL read replicas
- Neo4j clustering (Enterprise Edition)
- Event-driven sync via NATS/Kafka
- Eventual consistency model

## Future Enhancements

### Phase 2: Advanced Graph Features

1. **MATCHABLE_TO Relationship**:
   - Pre-compute match relationships
   - Store efficiency scores as edge properties
   - Update on resource flow changes

2. **Trust Network**:
   - `(Organization)-[:TRUSTS]->(Organization)`
   - Trust score propagation
   - Transitive trust calculations

3. **Multi-Party Matching**:
   - Circular symbioses among 3+ parties
   - Cypher queries for complex patterns
   - Optimization algorithms

4. **Knowledge Graph Integration**:
   - Semantic relationships
   - Taxonomy integration (NACE, EWC codes)
   - GraphRAG for AI-enhanced matching

### Phase 3: Federation

**Zone-Based Architecture** (from concept):
- Regional Neo4j graphs (city zones, industrial parks)
- Global federation layer
- Cross-zone query protocol
- Data sovereignty compliance

## Troubleshooting

### Common Issues

**Neo4j Connection Refused**:

```
Error: failed to verify Neo4j connectivity: connection refused
```

**Solution**: Ensure Neo4j is running at the configured URI, and check the credentials.

**Schema Creation Fails**:

```
Error: failed to create constraint: syntax error
```

**Solution**: Verify that your Neo4j version supports the `IF NOT EXISTS` syntax (4.0+).

**Sync Performance Degradation**:

**Solution**: Enable async sync (goroutines), batch updates, and use an event queue.

### Monitoring

**Recommended Metrics**:
- Neo4j connection pool usage
- Sync operation latency
- Failed sync count
- Graph query response time
- Node/relationship count

## References

- **Concept Documentation**: `concept/09_graph_database_design.md`
- **Dev Guide**: `dev_guides/02_neo4j_driver.md`
- **Example Queries**: `concept/23_example_query_in_cypher_neo4j.md`
- **Neo4j Go Driver**:
- **Neo4j Cypher Manual**: