Damir Mukimov 000eab4740
2025-11-25 06:01:16 +01:00


Neo4j Graph Database Integration

This document describes how the hybrid PostgreSQL + Neo4j architecture from the concept documentation is implemented.

Architecture Overview

The platform uses a hybrid database architecture:

PostgreSQL + PostGIS

  • Purpose: Primary data store for structured data and geospatial queries
  • Stores: Organizations, Sites, ResourceFlows, Matches, SharedAssets with full GORM models
  • Strengths: ACID compliance, complex spatial queries, JSONB for flexible data
  • Use Cases: Transactional operations, detailed entity storage, spatial filtering

Neo4j Graph Database

  • Purpose: Graph traversal and relationship-based matching
  • Stores: Same entities as nodes with relationships (OPERATES_AT, HOSTS, MATCHABLE_TO)
  • Strengths: Multi-hop traversals, pattern matching, relationship queries
  • Use Cases: Advanced matching algorithms, network analysis, symbiosis discovery

Synchronization Pattern

  • Event-driven sync: PostgreSQL → Neo4j on create/update/delete
  • Dual-write pattern: Services write to PostgreSQL (primary), then sync to Neo4j (secondary)
  • Graceful degradation: System continues working if Neo4j is unavailable

Implementation Components

1. Database Configuration (pkg/database/neo4j.go)

Neo4j Driver Setup:

config := database.Neo4jConfig{
    URI:      "neo4j://localhost:7687",
    Username: "neo4j",
    Password: "password",
    Database: "neo4j",
}

driver, err := database.NewNeo4jDriver(config)

Features:

  • Connection pooling (max 50 connections)
  • 30-second acquisition timeout
  • 30-second transaction retry time
  • Automatic connectivity verification

Schema Initialization:

err := database.InitializeSchema(ctx, driver, "neo4j")

Creates:

  • Uniqueness constraints on id fields for all node types
  • Indexes for performance:
    • Organization: name, sector, subtype
    • Site: location (lat/lng), site_type
    • ResourceFlow: type, direction, type+direction composite
    • Match: status, compatibility_score
    • SharedAsset: type
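Schema initialization amounts to a list of idempotent DDL statements. This sketch generates them from a table of labels and indexed properties. It assumes Neo4j 5 syntax (`CREATE CONSTRAINT ... REQUIRE ... IS UNIQUE`, `CREATE INDEX ... IF NOT EXISTS`), invented statement names, and a composite index for the Site latitude/longitude pair; the real `InitializeSchema` may differ on all three.

```go
package main

import (
	"fmt"
	"strings"
)

// indexSpec lists, per label, the property sets to index.
// A set with more than one property becomes a composite index.
type indexSpec struct {
	Label   string
	Indexes [][]string
}

// schema mirrors the documented indexes; the lat/lng pairing is an assumption.
var schema = []indexSpec{
	{"Organization", [][]string{{"name"}, {"sector"}, {"subtype"}}},
	{"Site", [][]string{{"latitude", "longitude"}, {"site_type"}}},
	{"ResourceFlow", [][]string{{"type"}, {"direction"}, {"type", "direction"}}},
	{"Match", [][]string{{"status"}, {"compatibility_score"}}},
	{"SharedAsset", [][]string{{"type"}}},
}

// schemaStatements emits one uniqueness constraint on id per label, followed
// by that label's indexes. IF NOT EXISTS makes reruns on startup safe.
func schemaStatements(specs []indexSpec) []string {
	var stmts []string
	for _, s := range specs {
		lower := strings.ToLower(s.Label)
		stmts = append(stmts, fmt.Sprintf(
			"CREATE CONSTRAINT %s_id_unique IF NOT EXISTS FOR (n:%s) REQUIRE n.id IS UNIQUE",
			lower, s.Label))
		for _, props := range s.Indexes {
			refs := make([]string, len(props))
			for i, p := range props {
				refs[i] = "n." + p
			}
			stmts = append(stmts, fmt.Sprintf(
				"CREATE INDEX %s_%s IF NOT EXISTS FOR (n:%s) ON (%s)",
				lower, strings.Join(props, "_"), s.Label, strings.Join(refs, ", ")))
		}
	}
	return stmts
}

func main() {
	for _, stmt := range schemaStatements(schema) {
		fmt.Println(stmt)
	}
}
```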

2. Graph Repositories (internal/repository/graph_repository.go)

GraphOrganizationRepository

Sync Organization to Graph:

repo := repository.NewGraphOrganizationRepository(driver, "neo4j")
err := repo.SyncToGraph(ctx, organization)

Cypher Operation:

  • MERGE (o:Organization {id: $id}) - Upsert node
  • Sets all properties, serializing JSONB fields as JSON strings
  • Stores timestamps in RFC3339 format

Properties Synced:

  • Basic: id, name, sector, subtype, description, address
  • Location: latitude, longitude
  • Business: legal_form, industrial_sector, company_size, years_operation
  • Products/Services: sells_products, offers_services (as JSON)
  • Metadata: verified, created_at, updated_at

GraphSiteRepository

Sync Site to Graph:

repo := repository.NewGraphSiteRepository(driver, "neo4j")
err := repo.SyncToGraph(ctx, site)

Cypher Operation:

  • MERGE (s:Site {id: $id}) - Upsert site node
  • MERGE (o)-[:OPERATES_AT]->(s) - Create relationship to owner organization
  • Conditional relationship creation (only if owner exists)

Relationships Created:

  • Organization -[:OPERATES_AT]-> Site
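Two properties of this sync are worth testing: MERGE makes it idempotent, and the OPERATES_AT edge only appears once the owner node exists, which is why organizations should be synced before their sites. In Cypher, one way to get the conditional behavior is `MERGE (s:Site {id: $id}) SET s += $props WITH s MATCH (o:Organization {id: $owner_id}) MERGE (o)-[:OPERATES_AT]->(s)`: if the MATCH finds no owner, the site is still written but no edge is created. The in-memory model below uses hypothetical types to show both behaviors; it is not the repository code.

```go
package main

import "fmt"

// edge identifies a directed relationship between two node ids.
type edge struct{ from, rel, to string }

// graph is a toy stand-in for Neo4j: node ids per label plus an edge set.
type graph struct {
	nodes map[string]map[string]bool // label -> id -> present
	edges map[edge]bool
}

func newGraph() *graph {
	return &graph{nodes: map[string]map[string]bool{}, edges: map[edge]bool{}}
}

// mergeNode upserts a node; repeated calls are no-ops, like MERGE.
func (g *graph) mergeNode(label, id string) {
	if g.nodes[label] == nil {
		g.nodes[label] = map[string]bool{}
	}
	g.nodes[label][id] = true
}

// syncSite upserts the site, then links its owner only if the owner exists.
func (g *graph) syncSite(siteID, ownerID string) {
	g.mergeNode("Site", siteID)
	if g.nodes["Organization"][ownerID] {
		g.edges[edge{ownerID, "OPERATES_AT", siteID}] = true
	}
}

func main() {
	g := newGraph()
	g.syncSite("site-1", "org-1") // owner not synced yet: no edge
	fmt.Println(len(g.edges))     // 0
	g.mergeNode("Organization", "org-1")
	g.syncSite("site-1", "org-1") // owner present: edge created
	g.syncSite("site-1", "org-1") // rerun: still a single edge
	fmt.Println(len(g.edges))     // 1
}
```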

GraphResourceFlowRepository

Sync Resource Flow to Graph:

repo := repository.NewGraphResourceFlowRepository(driver, "neo4j")
err := repo.SyncToGraph(ctx, resourceFlow)

Cypher Operation:

  • MERGE (rf:ResourceFlow {id: $id}) - Upsert resource flow node
  • MERGE (s)-[:HOSTS]->(rf) - Create relationship to site
  • Unpacks JSONB fields (Quality, Quantity, TimeProfile, EconomicData) into flat node properties

Relationships Created:

  • Site -[:HOSTS]-> ResourceFlow

Properties Synced:

  • Identification: id, business_id, site_id, direction, type
  • Quality: temperature_celsius, pressure_bar, purity_pct, grade, hazardousness, physical_state
  • Quantity: amount, unit, temporal_unit
  • Temporal: supply_pattern
  • Economic: cost_in, cost_out, transportation_cost
  • Metadata: precision_level, source_type
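Unlike organization JSONB, resource flow JSONB is unpacked so that matching queries can filter on individual values such as `temperature_celsius`. A sketch of that flattening, assuming hypothetical `Quality` and `Quantity` structs in which optional measurements are pointers. Unset values are left out of the map, so Cypher reads the missing property as null rather than 0:

```go
package main

import "fmt"

// Quality and Quantity are hypothetical shapes of the JSONB columns.
type Quality struct {
	TemperatureCelsius *float64 `json:"temperature_celsius,omitempty"`
	PressureBar        *float64 `json:"pressure_bar,omitempty"`
	PhysicalState      string   `json:"physical_state,omitempty"`
}

type Quantity struct {
	Amount       float64 `json:"amount"`
	Unit         string  `json:"unit"`
	TemporalUnit string  `json:"temporal_unit"`
}

// flowProps flattens nested JSONB structs into top-level node properties,
// omitting unset optional values so they read as null in Cypher.
func flowProps(id, direction, flowType string, q Quality, qty Quantity) map[string]any {
	props := map[string]any{
		"id":            id,
		"direction":     direction,
		"type":          flowType,
		"amount":        qty.Amount,
		"unit":          qty.Unit,
		"temporal_unit": qty.TemporalUnit,
	}
	if q.TemperatureCelsius != nil {
		props["temperature_celsius"] = *q.TemperatureCelsius
	}
	if q.PressureBar != nil {
		props["pressure_bar"] = *q.PressureBar
	}
	if q.PhysicalState != "" {
		props["physical_state"] = q.PhysicalState
	}
	return props
}

func main() {
	temp := 85.0
	p := flowProps("rf-1", "output", "heat",
		Quality{TemperatureCelsius: &temp, PhysicalState: "liquid"},
		Quantity{Amount: 120, Unit: "kWh", TemporalUnit: "hour"})
	_, hasPressure := p["pressure_bar"]
	fmt.Println(p["temperature_celsius"], hasPressure) // 85 false
}
```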

Graph-Based Matching:

matches, err := repo.FindMatchesInGraph(ctx, flowID, maxDistanceKm)

Cypher Query (from concept documentation):

MATCH (sourceFlow:ResourceFlow {id: $flowID})<-[:HOSTS]-(sourceSite:Site),
      (targetFlow:ResourceFlow)<-[:HOSTS]-(targetSite:Site)
WHERE sourceFlow.direction = 'output'
  AND targetFlow.direction = 'input'
  AND sourceFlow.type = targetFlow.type
  AND abs(coalesce(sourceFlow.temperature_celsius, 0) - coalesce(targetFlow.temperature_celsius, 0)) <= 10
WITH sourceFlow, targetFlow, sourceSite, targetSite,
     point.distance(
         point({longitude: sourceSite.longitude, latitude: sourceSite.latitude}),
         point({longitude: targetSite.longitude, latitude: targetSite.latitude})
     ) / 1000 AS distance_km
WHERE distance_km <= $maxDistance
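RETURN targetFlow.id AS target_flow_id, targetFlow.type AS target_flow_type,
       targetFlow.temperature_celsius AS target_temperature_celsius,
       targetFlow.amount AS target_amount, targetSite.name AS target_site_name, distance_km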
ORDER BY distance_km ASC
LIMIT 50

Matching Features:

  • Direction filtering (output → input)
  • Type matching (heat → heat, water → water, etc.)
  • Temperature compatibility (ΔT ≤ 10°C)
  • Geospatial distance calculation (Neo4j point.distance)
  • Distance-based ranking
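For unit tests, or as a PostgreSQL-side fallback, the same rules can be expressed in plain Go. This sketch mirrors the Cypher query: output to input, same type, temperature difference within 10 °C with a missing temperature treated as 0 (as `coalesce` does), a distance cutoff, nearest first, at most 50 results. It uses the haversine formula with a 6371 km mean Earth radius, so distances may differ slightly from Neo4j's `point.distance`. The `Flow` and `Match` types are hypothetical.

```go
package main

import (
	"fmt"
	"math"
	"sort"
)

// Flow is a hypothetical flattened view of a ResourceFlow and its hosting Site.
type Flow struct {
	ID, Type, Direction string
	TempC               *float64 // nil when the flow has no temperature
	Lat, Lng            float64  // coordinates of the hosting site
}

type Match struct {
	TargetID   string
	DistanceKm float64
}

// haversineKm returns the great-circle distance between two WGS-84 points.
func haversineKm(lat1, lng1, lat2, lng2 float64) float64 {
	const earthRadiusKm = 6371.0
	toRad := func(d float64) float64 { return d * math.Pi / 180 }
	dLat, dLng := toRad(lat2-lat1), toRad(lng2-lng1)
	a := math.Sin(dLat/2)*math.Sin(dLat/2) +
		math.Cos(toRad(lat1))*math.Cos(toRad(lat2))*math.Sin(dLng/2)*math.Sin(dLng/2)
	return 2 * earthRadiusKm * math.Asin(math.Sqrt(a))
}

// tempOrZero mirrors coalesce(x.temperature_celsius, 0).
func tempOrZero(t *float64) float64 {
	if t == nil {
		return 0
	}
	return *t
}

// findMatches applies the same filters and ordering as the Cypher query.
func findMatches(source Flow, candidates []Flow, maxDistanceKm float64) []Match {
	var out []Match
	if source.Direction != "output" {
		return out
	}
	for _, c := range candidates {
		if c.Direction != "input" || c.Type != source.Type {
			continue
		}
		if math.Abs(tempOrZero(source.TempC)-tempOrZero(c.TempC)) > 10 {
			continue
		}
		d := haversineKm(source.Lat, source.Lng, c.Lat, c.Lng)
		if d <= maxDistanceKm {
			out = append(out, Match{c.ID, d})
		}
	}
	sort.Slice(out, func(i, j int) bool { return out[i].DistanceKm < out[j].DistanceKm })
	if len(out) > 50 {
		out = out[:50]
	}
	return out
}

func main() {
	t80, t75, t40 := 80.0, 75.0, 40.0
	src := Flow{ID: "src", Type: "heat", Direction: "output", TempC: &t80, Lat: 54.5, Lng: 52.8}
	cands := []Flow{
		{ID: "near", Type: "heat", Direction: "input", TempC: &t75, Lat: 54.55, Lng: 52.8}, // ~5.6 km
		{ID: "far", Type: "heat", Direction: "input", TempC: &t75, Lat: 54.6, Lng: 52.8},   // ~11.1 km
		{ID: "cold", Type: "heat", Direction: "input", TempC: &t40, Lat: 54.5, Lng: 52.8},  // ΔT too large
	}
	for _, m := range findMatches(src, cands, 10) {
		fmt.Printf("%s %.1f km\n", m.TargetID, m.DistanceKm)
	}
}
```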

3. Graph Sync Service (internal/service/graph_sync_service.go)

Purpose: Orchestrates synchronization between PostgreSQL and Neo4j

Service Creation:

graphSyncService := service.NewGraphSyncService(
    orgGraphRepo,
    siteGraphRepo,
    flowGraphRepo,
)

Single Entity Sync:

// Sync organization
err := graphSyncService.SyncOrganization(ctx, organization)

// Sync site
err := graphSyncService.SyncSite(ctx, site)

// Sync resource flow
err := graphSyncService.SyncResourceFlow(ctx, resourceFlow)

Deletion from Graph:

err := graphSyncService.DeleteOrganization(ctx, orgID)
err := graphSyncService.DeleteSite(ctx, siteID)
err := graphSyncService.DeleteResourceFlow(ctx, flowID)

Bulk Synchronization:

// Bulk sync organizations
err := graphSyncService.BulkSyncOrganizations(ctx, organizations)

// Bulk sync sites
err := graphSyncService.BulkSyncSites(ctx, sites)

// Bulk sync resource flows
err := graphSyncService.BulkSyncResourceFlows(ctx, resourceFlows)
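Bulk sync is commonly implemented by sending rows in batches through a single `UNWIND` statement instead of one round trip per entity. The sketch below assumes that approach; the actual `BulkSync*` methods may loop per entity. The batch size of 500, the query text and the `sendBatch` callback are hypothetical.

```go
package main

import "fmt"

// bulkUpsertOrganizations is an assumed UNWIND form of the per-entity MERGE.
const bulkUpsertOrganizations = `UNWIND $rows AS row
MERGE (o:Organization {id: row.id})
SET o += row.props`

// chunk splits items into consecutive batches of at most size elements.
func chunk[T any](items []T, size int) [][]T {
	if size <= 0 {
		return [][]T{items}
	}
	var batches [][]T
	for size < len(items) {
		batches = append(batches, items[:size:size]) // cap limits appends to this batch
		items = items[size:]
	}
	if len(items) > 0 {
		batches = append(batches, items)
	}
	return batches
}

// bulkSync sends each batch as one query and returns on the first failure.
// Because MERGE is idempotent, rerunning the whole sync afterwards is safe.
func bulkSync(rows []map[string]any, size int, sendBatch func(query string, rows []map[string]any) error) error {
	for i, batch := range chunk(rows, size) {
		if err := sendBatch(bulkUpsertOrganizations, batch); err != nil {
			return fmt.Errorf("batch %d: %w", i, err)
		}
	}
	return nil
}

func main() {
	rows := make([]map[string]any, 1200)
	for i := range rows {
		rows[i] = map[string]any{"id": fmt.Sprintf("org-%d", i), "props": map[string]any{}}
	}
	err := bulkSync(rows, 500, func(_ string, batch []map[string]any) error {
		fmt.Println("sending", len(batch), "rows")
		return nil
	})
	fmt.Println("err:", err)
}
```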

Graph Matching:

matches, err := graphSyncService.FindMatchesInGraph(ctx, flowID, 10.0)

Graceful Degradation:

  • All sync methods check if graph repo is nil
  • If Neo4j is disabled, sync operations are no-ops (return nil)
  • Application continues working with PostgreSQL only

4. Configuration (pkg/config/config.go)

Environment Variables:

# Neo4j Configuration
NEO4J_URI=neo4j://localhost:7687
NEO4J_USERNAME=neo4j
NEO4J_PASSWORD=your-password
NEO4J_DATABASE=neo4j
NEO4J_ENABLED=true  # Set to false to disable graph sync

# PostgreSQL Configuration
POSTGRES_HOST=localhost
POSTGRES_PORT=5432
POSTGRES_USER=bugulma
POSTGRES_PASSWORD=bugulma
POSTGRES_DB=bugulma_city
POSTGRES_SSLMODE=disable

Config Loading:

cfg := config.Load()

// Neo4j is disabled by default
// Set NEO4J_ENABLED=true to enable
if cfg.Neo4jEnabled {
    // Initialize Neo4j driver and sync
}

Graph Schema

Node Types

Organization:

(:Organization {
  id: string,
  name: string,
  sector: string,
  subtype: string,
  latitude: float,
  longitude: float,
  industrial_sector: string,
  company_size: int,
  sells_products: json,
  offers_services: json,
  ...
})

Site:

(:Site {
  id: string,
  name: string,
  latitude: float,
  longitude: float,
  site_type: string,
  floor_area_m2: float,
  available_utilities: json,
  ...
})

ResourceFlow:

(:ResourceFlow {
  id: string,
  business_id: string,
  site_id: string,
  direction: string,  // 'input' or 'output'
  type: string,       // 'heat', 'water', 'steam', etc.
  temperature_celsius: float,
  amount: float,
  unit: string,
  precision_level: string,
  ...
})

Relationships

Core Relationships (from concept):

(Organization)-[:OPERATES_AT]->(Site)
(Site)-[:HOSTS]->(ResourceFlow)
(ResourceFlow)-[:MATCHABLE_TO {efficiency, distance, savings}]->(ResourceFlow)
(Organization)-[:TRUSTS]->(Organization)

Implementation status:

  • ✅ Organization -[:OPERATES_AT]-> Site
  • ✅ Site -[:HOSTS]-> ResourceFlow
  • 🔄 ResourceFlow -[:MATCHABLE_TO]-> ResourceFlow (not stored; computed dynamically at query time)
  • 🔄 Organization -[:TRUSTS]-> Organization (not yet implemented)

Integration with Existing Services

Pattern for Service Integration

Example: OrganizationService with Graph Sync

type OrganizationService struct {
    repo             domain.OrganizationRepository
    graphSyncService *GraphSyncService  // Optional
}

func (s *OrganizationService) Create(req CreateOrganizationRequest) (*domain.Organization, error) {
    // 1. Create in PostgreSQL (primary)
    org := &domain.Organization{ /* ... */ }
    if err := s.repo.Create(org); err != nil {
        return nil, err
    }

    // 2. Sync to Neo4j (secondary, non-blocking)
    if s.graphSyncService != nil {
        go func() {
            ctx := context.Background()
            if err := s.graphSyncService.SyncOrganization(ctx, org); err != nil {
                log.Printf("Failed to sync organization to graph: %v", err)
            }
        }()
    }

    return org, nil
}

func (s *OrganizationService) Delete(id string) error {
    // 1. Delete from PostgreSQL (primary)
    if err := s.repo.Delete(id); err != nil {
        return err
    }

    // 2. Delete from Neo4j (secondary)
    if s.graphSyncService != nil {
        go func() {
            ctx := context.Background()
            if err := s.graphSyncService.DeleteOrganization(ctx, id); err != nil {
                log.Printf("Failed to delete organization from graph: %v", err)
            }
        }()
    }

    return nil
}
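The fire-and-forget version has two gaps: background syncs are lost if the process exits mid-flight, and a hung Neo4j call keeps its goroutine alive indefinitely because `context.Background()` never expires. A variant, sketched here with hypothetical `primaryStore` and `graphSyncer` interfaces and an illustrative 10-second timeout, tracks the goroutines with a `sync.WaitGroup` so shutdown (or a test) can wait for them, bounds each sync with a deadline, and skips the graph write entirely when the PostgreSQL write fails.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"log"
	"sync"
	"time"
)

// primaryStore and graphSyncer are hypothetical narrow interfaces.
type primaryStore interface{ Create(id string) error }
type graphSyncer interface {
	SyncOrganization(ctx context.Context, id string) error
}

type OrganizationService struct {
	repo  primaryStore
	graph graphSyncer // nil when Neo4j is disabled
	wg    sync.WaitGroup
}

// Create writes to PostgreSQL first; only on success does it start an
// asynchronous, time-bounded graph sync whose failure is logged, not returned.
func (s *OrganizationService) Create(id string) error {
	if err := s.repo.Create(id); err != nil {
		return err
	}
	if s.graph != nil {
		s.wg.Add(1)
		go func() {
			defer s.wg.Done()
			ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
			defer cancel()
			if err := s.graph.SyncOrganization(ctx, id); err != nil {
				log.Printf("graph sync failed for %s: %v", id, err)
			}
		}()
	}
	return nil
}

// Wait blocks until all in-flight graph syncs have finished.
func (s *OrganizationService) Wait() { s.wg.Wait() }

// Fakes for demonstration.
type fakeStore struct{ fail bool }

func (f fakeStore) Create(string) error {
	if f.fail {
		return errors.New("postgres unavailable")
	}
	return nil
}

type countingGraph struct {
	mu    sync.Mutex
	count int
}

func (g *countingGraph) SyncOrganization(context.Context, string) error {
	g.mu.Lock()
	defer g.mu.Unlock()
	g.count++
	return nil
}

func main() {
	graph := &countingGraph{}
	ok := &OrganizationService{repo: fakeStore{}, graph: graph}
	failing := &OrganizationService{repo: fakeStore{fail: true}, graph: graph}
	fmt.Println(ok.Create("org-1"), failing.Create("org-2"))
	ok.Wait()
	failing.Wait()
	fmt.Println("graph syncs:", graph.count) // 1
}
```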

Usage Examples

Setup Neo4j Integration

package main

import (
    "context"
    "log"

    "bugulma/backend/internal/repository"
    "bugulma/backend/internal/service"
    "bugulma/backend/pkg/config"
    "bugulma/backend/pkg/database"
)

func main() {
    cfg := config.Load()

    var graphSyncService *service.GraphSyncService

    if cfg.Neo4jEnabled {
        // Initialize Neo4j driver
        neo4jConfig := database.Neo4jConfig{
            URI:      cfg.Neo4jURI,
            Username: cfg.Neo4jUsername,
            Password: cfg.Neo4jPassword,
            Database: cfg.Neo4jDatabase,
        }

        driver, err := database.NewNeo4jDriver(neo4jConfig)
        if err != nil {
            log.Printf("Warning: Failed to connect to Neo4j: %v", err)
        } else {
            // Initialize schema
            ctx := context.Background()
            if err := database.InitializeSchema(ctx, driver, cfg.Neo4jDatabase); err != nil {
                log.Printf("Warning: Failed to initialize Neo4j schema: %v", err)
            }

            // Create graph repositories
            orgGraphRepo := repository.NewGraphOrganizationRepository(driver, cfg.Neo4jDatabase)
            siteGraphRepo := repository.NewGraphSiteRepository(driver, cfg.Neo4jDatabase)
            flowGraphRepo := repository.NewGraphResourceFlowRepository(driver, cfg.Neo4jDatabase)

            // Create graph sync service
            graphSyncService = service.NewGraphSyncService(
                orgGraphRepo,
                siteGraphRepo,
                flowGraphRepo,
            )

            log.Println("Neo4j graph database integration enabled")
        }
    }

    // Pass graphSyncService to other services
    // ...
}

Bulk Sync Existing Data

func syncExistingData(
    orgRepo domain.OrganizationRepository,
    siteRepo domain.SiteRepository,
    flowRepo domain.ResourceFlowRepository,
    graphSyncService *service.GraphSyncService,
) error {
    ctx := context.Background()

    // Sync organizations first: site sync only creates OPERATES_AT edges for owners already in the graph
    orgs, err := orgRepo.GetAll()
    if err != nil {
        return err
    }
    if err := graphSyncService.BulkSyncOrganizations(ctx, orgs); err != nil {
        return err
    }
    log.Printf("Synced %d organizations to graph", len(orgs))

    // Sync all sites
    sites, err := siteRepo.GetAll()
    if err != nil {
        return err
    }
    if err := graphSyncService.BulkSyncSites(ctx, sites); err != nil {
        return err
    }
    log.Printf("Synced %d sites to graph", len(sites))

    // Sync all resource flows
    flows, err := flowRepo.GetAll()
    if err != nil {
        return err
    }
    if err := graphSyncService.BulkSyncResourceFlows(ctx, flows); err != nil {
        return err
    }
    log.Printf("Synced %d resource flows to graph", len(flows))

    return nil
}

Graph-Based Matching

// Use graph traversal for advanced matching
func findAdvancedMatches(
    graphSyncService *service.GraphSyncService,
    flowID string,
) {
    ctx := context.Background()

    // Find matches using Neo4j graph traversal
    matches, err := graphSyncService.FindMatchesInGraph(ctx, flowID, 10.0)
    if err != nil {
        log.Printf("Graph matching failed, falling back to PostgreSQL: %v", err)
        // Fallback to PostgreSQL-based matching
        return
    }

    // Process graph matches
    for _, match := range matches {
        log.Printf("Match found: %s at %s (%.2f km away)",
            match["target_flow_id"],
            match["target_site_name"],
            match["distance_km"],
        )
    }
}

Performance Considerations

Query Performance

PostgreSQL: Excellent for:

  • Spatial filtering (PostGIS indexes)
  • JSONB queries (GIN indexes)
  • Transaction-heavy workloads
  • Exact point lookups

Neo4j: Excellent for:

  • Multi-hop graph traversals
  • Relationship pattern matching
  • Network analysis
  • Discovering indirect connections

When to Use Each Database

Use PostgreSQL When:

  • CRUD operations on single entities
  • Spatial radius queries (GetWithinRadius)
  • Transaction integrity is critical
  • Querying by specific attributes

Use Neo4j When:

  • Finding indirect symbiosis opportunities
  • Multi-party matching (3+ organizations)
  • Trust network analysis
  • Supply chain relationship mapping
  • Pattern-based discovery

Scaling Strategy

MVP (current):

  • Single PostgreSQL instance
  • Single Neo4j instance
  • Async sync (fire-and-forget)

Scale Phase (1000+ businesses):

  • PostgreSQL read replicas
  • Neo4j clustering (Enterprise)
  • Event-driven sync via NATS/Kafka
  • Eventual consistency model
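With a message broker, sync events can arrive late, twice, or out of order, so graph writes need to be idempotent and must reject stale updates. A common technique is to carry a monotonically increasing version (a counter or an `updated_at` value from PostgreSQL) on each event and apply it only if it is newer than what the graph already holds; deletes are kept as tombstones so a late update cannot resurrect a deleted node. The in-memory sketch below stands in for the Neo4j side, with a buffered channel playing the queue; the `syncEvent` shape is hypothetical. In Cypher, the same guard would be a `WHERE` on the stored version before the `SET`.

```go
package main

import "fmt"

// syncEvent is a hypothetical change event published after a PostgreSQL commit.
type syncEvent struct {
	EntityID string
	Version  int64 // increases on every PostgreSQL update
	Deleted  bool
	Name     string
}

type nodeState struct {
	version int64
	deleted bool // tombstone: blocks older updates from recreating the node
	name    string
}

// graphProjection applies events idempotently and ignores stale ones.
type graphProjection struct{ nodes map[string]nodeState }

// apply reports whether the event changed the projection.
func (g *graphProjection) apply(e syncEvent) bool {
	if cur, ok := g.nodes[e.EntityID]; ok && cur.version >= e.Version {
		return false // duplicate or out-of-order delivery
	}
	g.nodes[e.EntityID] = nodeState{version: e.Version, deleted: e.Deleted, name: e.Name}
	return true
}

func main() {
	g := &graphProjection{nodes: map[string]nodeState{}}
	events := make(chan syncEvent, 4)
	// Version 3 (delete) arrives before version 2 (rename), plus a duplicate.
	events <- syncEvent{EntityID: "org-1", Version: 1, Name: "Old name"}
	events <- syncEvent{EntityID: "org-1", Version: 3, Deleted: true}
	events <- syncEvent{EntityID: "org-1", Version: 2, Name: "New name"}
	events <- syncEvent{EntityID: "org-1", Version: 3, Deleted: true}
	close(events)
	for e := range events {
		fmt.Println(e.Version, g.apply(e))
	}
	fmt.Println("deleted:", g.nodes["org-1"].deleted) // deleted: true
}
```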

Future Enhancements

Phase 2: Advanced Graph Features

  1. MATCHABLE_TO Relationship:

    • Pre-compute match relationships
    • Store efficiency scores as edge properties
    • Update on resource flow changes
  2. Trust Network:

    • (Organization)-[:TRUSTS]->(Organization)
    • Trust score propagation
    • Transitive trust calculations
  3. Multi-Party Matching:

    • 3+ party circular symbioses
    • Cypher queries for complex patterns
    • Optimization algorithms
  4. Knowledge Graph Integration:

    • Semantic relationships
    • Taxonomy integration (NACE, EWC codes)
    • GraphRAG for AI-enhanced matching
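A three-party circular symbiosis is a directed cycle A → B → C → A in an organization-level "supplies" graph. Before committing to a Cypher pattern, the idea can be prototyped in Go. The sketch below assumes a hypothetical adjacency map in which an edge means "A has an output that B can use" (with no duplicate entries per list) and reports each directed 3-cycle once.

```go
package main

import (
	"fmt"
	"sort"
)

// threeCycles returns every directed cycle a→b→c→a, each reported once
// with its lexicographically smallest member first.
func threeCycles(supplies map[string][]string) [][3]string {
	has := func(from, to string) bool {
		for _, t := range supplies[from] {
			if t == to {
				return true
			}
		}
		return false
	}
	var cycles [][3]string
	for a, bs := range supplies {
		for _, b := range bs {
			for _, c := range supplies[b] {
				// Requiring a < b and a < c keeps only one rotation of each cycle;
				// b != c rules out two-party loops.
				if a < b && a < c && b != c && has(c, a) {
					cycles = append(cycles, [3]string{a, b, c})
				}
			}
		}
	}
	// Map iteration order is random, so sort for stable output.
	sort.Slice(cycles, func(i, j int) bool {
		x, y := cycles[i], cycles[j]
		if x[0] != y[0] {
			return x[0] < y[0]
		}
		if x[1] != y[1] {
			return x[1] < y[1]
		}
		return x[2] < y[2]
	})
	return cycles
}

func main() {
	// brewery → farm → biogas → brewery is a closed loop; dairy only supplies.
	supplies := map[string][]string{
		"brewery": {"farm"},
		"farm":    {"biogas"},
		"biogas":  {"brewery"},
		"dairy":   {"farm"},
	}
	fmt.Println(threeCycles(supplies)) // [[biogas brewery farm]]
}
```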

Phase 3: Federation

Zone-Based Architecture (from concept):

  • Regional Neo4j graphs (city zones, industrial parks)
  • Global federation layer
  • Cross-zone query protocol
  • Data sovereignty compliance

Troubleshooting

Common Issues

Neo4j Connection Refused:

Error: failed to verify Neo4j connectivity: connection refused

Solution: Ensure Neo4j is running on specified URI, check credentials

Schema Creation Fails:

Error: failed to create constraint: syntax error

Solution: Verify Neo4j version supports IF NOT EXISTS syntax (4.0+)

Sync Performance Degradation:

Solution: Keep sync asynchronous (goroutines), batch updates, or route sync through an event queue

Monitoring

Recommended Metrics:

  • Neo4j connection pool usage
  • Sync operation latency
  • Failed sync count
  • Graph query response time
  • Node/relationship count

References