Repository Structure:
- Move files from cluttered root directory into organized structure
- Create archive/ for archived data and scraper results
- Create bugulma/ for the complete application (frontend + backend)
- Create data/ for sample datasets and reference materials
- Create docs/ for comprehensive documentation structure
- Create scripts/ for utility scripts and API tools
Backend Implementation:
- Implement 3 missing backend endpoints identified in gap analysis:
* GET /api/v1/organizations/{id}/matching/direct - Direct symbiosis matches
* GET /api/v1/users/me/organizations - User organizations
* POST /api/v1/proposals/{id}/status - Update proposal status
- Add complete proposal domain model, repository, and service layers
- Create database migration for proposals table
- Fix CLI server command registration issue
API Documentation:
- Add comprehensive proposals.md API documentation
- Update README.md with Users and Proposals API sections
- Document all request/response formats, error codes, and business rules
Code Quality:
- Follow existing Go backend architecture patterns
- Add proper error handling and validation
- Match frontend expected response schemas
- Maintain clean separation of concerns (handler -> service -> repository)
Neo4j Graph Database Integration
This document describes the hybrid PostgreSQL + Neo4j architecture implementation based on the concept documentation.
Architecture Overview
The platform uses a hybrid database architecture:
PostgreSQL + PostGIS
- Purpose: Primary data store for structured data and geospatial queries
- Stores: Organizations, Sites, ResourceFlows, Matches, SharedAssets with full GORM models
- Strengths: ACID compliance, complex spatial queries, JSONB for flexible data
- Use Cases: Transactional operations, detailed entity storage, spatial filtering
Neo4j Graph Database
- Purpose: Graph traversal and relationship-based matching
- Stores: Same entities as nodes with relationships (OPERATES_AT, HOSTS, MATCHABLE_TO)
- Strengths: Multi-hop traversals, pattern matching, relationship queries
- Use Cases: Advanced matching algorithms, network analysis, symbiosis discovery
Synchronization Pattern
- Event-driven sync: PostgreSQL → Neo4j on create/update/delete
- Dual-write pattern: Services write to PostgreSQL (primary), then sync to Neo4j (secondary)
- Graceful degradation: System continues working if Neo4j is unavailable
Implementation Components
1. Database Configuration (pkg/database/neo4j.go)
Neo4j Driver Setup:
config := database.Neo4jConfig{
URI: "neo4j://localhost:7687",
Username: "neo4j",
Password: "password",
Database: "neo4j",
}
driver, err := database.NewNeo4jDriver(config)
Features:
- Connection pooling (max 50 connections)
- 30-second acquisition timeout
- 30-second transaction retry time
- Automatic connectivity verification
Schema Initialization:
err := database.InitializeSchema(ctx, driver, "neo4j")
Creates:
- Uniqueness constraints on id fields for all node types
- Indexes for performance:
- Organization: name, sector, subtype
- Site: location (lat/lng), site_type
- ResourceFlow: type, direction, type+direction composite
- Match: status, compatibility_score
- SharedAsset: type
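The constraint and index list above could be expressed as idempotent Cypher DDL along the following lines. This is a minimal sketch, not the actual body of InitializeSchema. Several details are assumptions:
- the statement names (organization_id_unique, ..._idx);
- modeling the Site location index as a composite latitude/longitude index;
- using the Neo4j 4.4+/5.x "FOR ... REQUIRE" constraint syntax.

```go
package main

import (
	"fmt"
	"strings"
)

// indexSpec describes one index; more than one property yields a composite index.
type indexSpec struct {
	Label      string
	Properties []string
}

// nodeLabels get a uniqueness constraint on their id property.
var nodeLabels = []string{"Organization", "Site", "ResourceFlow", "Match", "SharedAsset"}

// schemaIndexes mirrors the performance indexes listed above (naming is hypothetical).
var schemaIndexes = []indexSpec{
	{"Organization", []string{"name"}},
	{"Organization", []string{"sector"}},
	{"Organization", []string{"subtype"}},
	{"Site", []string{"latitude", "longitude"}},
	{"Site", []string{"site_type"}},
	{"ResourceFlow", []string{"type"}},
	{"ResourceFlow", []string{"direction"}},
	{"ResourceFlow", []string{"type", "direction"}},
	{"Match", []string{"status"}},
	{"Match", []string{"compatibility_score"}},
	{"SharedAsset", []string{"type"}},
}

// schemaStatements returns constraints first, then indexes, all using IF NOT EXISTS
// so they can be re-run safely at every startup.
func schemaStatements(labels []string, indexes []indexSpec) []string {
	stmts := make([]string, 0, len(labels)+len(indexes))
	for _, label := range labels {
		stmts = append(stmts, fmt.Sprintf(
			"CREATE CONSTRAINT %s_id_unique IF NOT EXISTS FOR (n:%s) REQUIRE n.id IS UNIQUE",
			strings.ToLower(label), label))
	}
	for _, idx := range indexes {
		name := strings.ToLower(idx.Label) + "_" + strings.Join(idx.Properties, "_") + "_idx"
		stmts = append(stmts, fmt.Sprintf(
			"CREATE INDEX %s IF NOT EXISTS FOR (n:%s) ON (n.%s)",
			name, idx.Label, strings.Join(idx.Properties, ", n.")))
	}
	return stmts
}

func main() {
	for _, s := range schemaStatements(nodeLabels, schemaIndexes) {
		fmt.Println(s)
	}
}
```

Generating the statements from a table keeps constraints and indexes in one place. Each string would be executed once against the configured database.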
2. Graph Repositories (internal/repository/graph_repository.go)
GraphOrganizationRepository
Sync Organization to Graph:
repo := repository.NewGraphOrganizationRepository(driver, "neo4j")
err := repo.SyncToGraph(ctx, organization)
Cypher Operation:
- MERGE (o:Organization {id: $id}): upsert the node
- Sets all properties, storing JSONB fields as JSON strings
- Stores timestamps in RFC3339 format
Properties Synced:
- Basic: id, name, sector, subtype, description, address
- Location: latitude, longitude
- Business: legal_form, industrial_sector, company_size, years_operation
- Products/Services: sells_products, offers_services (as JSON)
- Metadata: verified, created_at, updated_at
GraphSiteRepository
Sync Site to Graph:
repo := repository.NewGraphSiteRepository(driver, "neo4j")
err := repo.SyncToGraph(ctx, site)
Cypher Operation:
- MERGE (s:Site {id: $id}): upsert the site node
- MERGE (o)-[:OPERATES_AT]->(s): create the relationship to the owner organization
- The relationship is created conditionally, only if the owner organization exists
Relationships Created:
Organization -[:OPERATES_AT]-> Site
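One way to get the conditional relationship creation described above is to merge the site first and then MATCH its owner, as in this sketch. The query text, the owner_id and props parameter names, and the siteParams helper are assumptions, not the repository's actual code.

```go
package main

import "fmt"

// siteUpsertCypher always merges the site node. The MATCH on the owner yields
// zero rows when that organization does not exist, so the OPERATES_AT MERGE
// after it is skipped rather than MERGE-ing a placeholder Organization node.
const siteUpsertCypher = `
MERGE (s:Site {id: $id})
SET s += $props
WITH s
MATCH (o:Organization {id: $owner_id})
MERGE (o)-[:OPERATES_AT]->(s)`

// siteParams builds the parameter map expected by siteUpsertCypher.
func siteParams(id, ownerID string, props map[string]any) map[string]any {
	return map[string]any{"id": id, "owner_id": ownerID, "props": props}
}

func main() {
	params := siteParams("site-1", "org-1", map[string]any{"name": "North Yard", "site_type": "industrial"})
	fmt.Println(siteUpsertCypher)
	fmt.Println(params)
}
```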
GraphResourceFlowRepository
Sync Resource Flow to Graph:
repo := repository.NewGraphResourceFlowRepository(driver, "neo4j")
err := repo.SyncToGraph(ctx, resourceFlow)
Cypher Operation:
- MERGE (rf:ResourceFlow {id: $id}): upsert the resource flow node
- MERGE (s)-[:HOSTS]->(rf): create the relationship to the hosting site
- Unpacks the JSONB fields (Quality, Quantity, TimeProfile, EconomicData) into flat graph properties
Relationships Created:
Site -[:HOSTS]-> ResourceFlow
Properties Synced:
- Identification: id, business_id, site_id, direction, type
- Quality: temperature_celsius, pressure_bar, purity_pct, grade, hazardousness, physical_state
- Quantity: amount, unit, temporal_unit
- Temporal: supply_pattern
- Economic: cost_in, cost_out, transportation_cost
- Metadata: precision_level, source_type
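The JSONB unpacking could look roughly like this minimal sketch. It assumes hypothetical shapes for the Quality and Quantity columns (only a few fields shown) and a hypothetical flowProps helper. Optional values that are absent are left out of the property map instead of being written as 0. As a result, a newly created node simply has no such property (null in Cypher), so an unknown temperature stays distinguishable from 0 °C.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Quality is a hypothetical subset of the Quality JSONB column.
type Quality struct {
	TemperatureCelsius *float64 `json:"temperature_celsius,omitempty"`
	PressureBar        *float64 `json:"pressure_bar,omitempty"`
	PhysicalState      string   `json:"physical_state,omitempty"`
}

// Quantity is a hypothetical shape of the Quantity JSONB column.
type Quantity struct {
	Amount       float64 `json:"amount"`
	Unit         string  `json:"unit"`
	TemporalUnit string  `json:"temporal_unit"`
}

// flowProps decodes raw JSONB bytes and flattens them into graph properties,
// omitting optional quality values that are not present.
func flowProps(id, direction, flowType string, qualityJSON, quantityJSON []byte) (map[string]any, error) {
	var q Quality
	if err := json.Unmarshal(qualityJSON, &q); err != nil {
		return nil, fmt.Errorf("decode quality: %w", err)
	}
	var qty Quantity
	if err := json.Unmarshal(quantityJSON, &qty); err != nil {
		return nil, fmt.Errorf("decode quantity: %w", err)
	}
	props := map[string]any{
		"id":            id,
		"direction":     direction,
		"type":          flowType,
		"amount":        qty.Amount,
		"unit":          qty.Unit,
		"temporal_unit": qty.TemporalUnit,
	}
	if q.TemperatureCelsius != nil {
		props["temperature_celsius"] = *q.TemperatureCelsius
	}
	if q.PressureBar != nil {
		props["pressure_bar"] = *q.PressureBar
	}
	if q.PhysicalState != "" {
		props["physical_state"] = q.PhysicalState
	}
	return props, nil
}

func main() {
	props, err := flowProps("rf-1", "output", "heat",
		[]byte(`{"temperature_celsius": 85.5}`),
		[]byte(`{"amount": 120, "unit": "MWh", "temporal_unit": "month"}`))
	if err != nil {
		panic(err)
	}
	fmt.Println(props)
}
```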
Graph-Based Matching:
matches, err := repo.FindMatchesInGraph(ctx, flowID, maxDistanceKm)
Cypher Query (from concept documentation):
MATCH (sourceFlow:ResourceFlow {id: $flowID})<-[:HOSTS]-(sourceSite:Site),
(targetFlow:ResourceFlow)<-[:HOSTS]-(targetSite:Site)
WHERE sourceFlow.direction = 'output'
AND targetFlow.direction = 'input'
AND sourceFlow.type = targetFlow.type
AND abs(coalesce(sourceFlow.temperature_celsius, 0) - coalesce(targetFlow.temperature_celsius, 0)) <= 10
WITH sourceFlow, targetFlow, sourceSite, targetSite,
point.distance(
point({longitude: sourceSite.longitude, latitude: sourceSite.latitude}),
point({longitude: targetSite.longitude, latitude: targetSite.latitude})
) / 1000 AS distance_km
WHERE distance_km <= $maxDistance
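RETURN targetFlow.id AS target_flow_id, targetFlow.type AS target_flow_type,
targetFlow.temperature_celsius AS target_temperature_celsius,
targetFlow.amount AS target_amount, targetSite.name AS target_site_name, distance_km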
ORDER BY distance_km ASC
LIMIT 50
Matching Features:
- Direction filtering (output → input)
- Type matching (heat → heat, water → water, etc.)
- Temperature compatibility (ΔT ≤ 10°C)
- Geospatial distance calculation (Neo4j point.distance)
- Distance-based ranking
3. Graph Sync Service (internal/service/graph_sync_service.go)
Purpose: Orchestrates synchronization between PostgreSQL and Neo4j
Service Creation:
graphSyncService := service.NewGraphSyncService(
orgGraphRepo,
siteGraphRepo,
flowGraphRepo,
)
Single Entity Sync:
// Sync organization (check err after each call)
err := graphSyncService.SyncOrganization(ctx, organization)
// Sync site
err = graphSyncService.SyncSite(ctx, site)
// Sync resource flow
err = graphSyncService.SyncResourceFlow(ctx, resourceFlow)
Deletion from Graph:
err := graphSyncService.DeleteOrganization(ctx, orgID)
err = graphSyncService.DeleteSite(ctx, siteID)
err = graphSyncService.DeleteResourceFlow(ctx, flowID)
Bulk Synchronization:
// Bulk sync organizations
err := graphSyncService.BulkSyncOrganizations(ctx, organizations)
// Bulk sync sites
err = graphSyncService.BulkSyncSites(ctx, sites)
// Bulk sync resource flows
err = graphSyncService.BulkSyncResourceFlows(ctx, resourceFlows)
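Bulk sync is usually cheapest when entities are sent in batches through a single UNWIND query rather than one round trip each. The sketch below assumes that approach. The chunk helper, the bulkOrgCypher text and a batch size of 500 are illustrative assumptions, since the text above does not say how BulkSyncOrganizations is implemented. It uses generics, so it requires Go 1.18 or later.

```go
package main

import "fmt"

// bulkOrgCypher upserts one batch of organizations passed as the $rows list parameter.
const bulkOrgCypher = `
UNWIND $rows AS row
MERGE (o:Organization {id: row.id})
SET o += row`

// chunk splits items into consecutive batches of at most size elements.
func chunk[T any](items []T, size int) [][]T {
	if size <= 0 {
		panic("chunk size must be positive")
	}
	var batches [][]T
	for start := 0; start < len(items); start += size {
		end := start + size
		if end > len(items) {
			end = len(items)
		}
		batches = append(batches, items[start:end])
	}
	return batches
}

func main() {
	rows := make([]map[string]any, 1200)
	for i := range rows {
		rows[i] = map[string]any{"id": fmt.Sprintf("org-%d", i)}
	}
	// In a real service each batch would be one execution of bulkOrgCypher
	// with parameters {"rows": batch}; here we only print the batch sizes.
	for _, batch := range chunk(rows, 500) {
		fmt.Println(len(batch))
	}
}
```

Sync order still matters with batching. Organizations and sites should be loaded before resource flows so that the relationship MERGEs can find their endpoints.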
Graph Matching:
matches, err := graphSyncService.FindMatchesInGraph(ctx, flowID, 10.0)
Graceful Degradation:
- All sync methods check whether the corresponding graph repository is nil
- If Neo4j is disabled, sync operations are no-ops (they return nil)
- The application continues working with PostgreSQL only
4. Configuration (pkg/config/config.go)
Environment Variables:
# Neo4j Configuration
NEO4J_URI=neo4j://localhost:7687
NEO4J_USERNAME=neo4j
NEO4J_PASSWORD=your-password
NEO4J_DATABASE=neo4j
NEO4J_ENABLED=true # Set to false to disable graph sync
# PostgreSQL Configuration
POSTGRES_HOST=localhost
POSTGRES_PORT=5432
POSTGRES_USER=bugulma
POSTGRES_PASSWORD=bugulma
POSTGRES_DB=bugulma_city
POSTGRES_SSLMODE=disable
Config Loading:
cfg := config.Load()
// Neo4j is disabled by default
// Set NEO4J_ENABLED=true to enable
if cfg.Neo4jEnabled {
// Initialize Neo4j driver and sync
}
Graph Schema
Node Types
Organization:
(:Organization {
id: string,
name: string,
sector: string,
subtype: string,
latitude: float,
longitude: float,
industrial_sector: string,
company_size: int,
sells_products: json,
offers_services: json,
...
})
Site:
(:Site {
id: string,
name: string,
latitude: float,
longitude: float,
site_type: string,
floor_area_m2: float,
available_utilities: json,
...
})
ResourceFlow:
(:ResourceFlow {
id: string,
business_id: string,
site_id: string,
direction: string, // 'input' or 'output'
type: string, // 'heat', 'water', 'steam', etc.
temperature_celsius: float,
amount: float,
unit: string,
precision_level: string,
...
})
Relationships
Core Relationships (from concept):
(Organization)-[:OPERATES_AT]->(Site)
(Site)-[:HOSTS]->(ResourceFlow)
(ResourceFlow)-[:MATCHABLE_TO {efficiency, distance, savings}]->(ResourceFlow)
(Organization)-[:TRUSTS]->(Organization)
Implemented:
- ✅ Organization -[:OPERATES_AT]-> Site
- ✅ Site -[:HOSTS]-> ResourceFlow
- 🔄 ResourceFlow -[:MATCHABLE_TO]-> ResourceFlow (computed dynamically via queries)
- 🔄 Organization -[:TRUSTS]-> Organization (to be implemented)
Integration with Existing Services
Pattern for Service Integration
Example: OrganizationService with Graph Sync
type OrganizationService struct {
repo domain.OrganizationRepository
graphSyncService *GraphSyncService // Optional
}
func (s *OrganizationService) Create(req CreateOrganizationRequest) (*domain.Organization, error) {
// 1. Create in PostgreSQL (primary)
org := &domain.Organization{ /* ... */ }
if err := s.repo.Create(org); err != nil {
return nil, err
}
// 2. Sync to Neo4j (secondary, non-blocking)
if s.graphSyncService != nil {
go func() {
ctx := context.Background()
if err := s.graphSyncService.SyncOrganization(ctx, org); err != nil {
log.Printf("Failed to sync organization to graph: %v", err)
}
}()
}
return org, nil
}
func (s *OrganizationService) Delete(id string) error {
// 1. Delete from PostgreSQL (primary)
if err := s.repo.Delete(id); err != nil {
return err
}
// 2. Delete from Neo4j (secondary)
if s.graphSyncService != nil {
go func() {
ctx := context.Background()
if err := s.graphSyncService.DeleteOrganization(ctx, id); err != nil {
log.Printf("Failed to delete organization from graph: %v", err)
}
}()
}
return nil
}
Usage Examples
Setup Neo4j Integration
package main
import (
"context"
"log"
"bugulma/backend/pkg/database"
"bugulma/backend/pkg/config"
"bugulma/backend/internal/repository"
"bugulma/backend/internal/service"
)
func main() {
cfg := config.Load()
var graphSyncService *service.GraphSyncService
if cfg.Neo4jEnabled {
// Initialize Neo4j driver
neo4jConfig := database.Neo4jConfig{
URI: cfg.Neo4jURI,
Username: cfg.Neo4jUsername,
Password: cfg.Neo4jPassword,
Database: cfg.Neo4jDatabase,
}
driver, err := database.NewNeo4jDriver(neo4jConfig)
if err != nil {
log.Printf("Warning: Failed to connect to Neo4j: %v", err)
} else {
// Initialize schema
ctx := context.Background()
if err := database.InitializeSchema(ctx, driver, cfg.Neo4jDatabase); err != nil {
log.Printf("Warning: Failed to initialize Neo4j schema: %v", err)
}
// Create graph repositories
orgGraphRepo := repository.NewGraphOrganizationRepository(driver, cfg.Neo4jDatabase)
siteGraphRepo := repository.NewGraphSiteRepository(driver, cfg.Neo4jDatabase)
flowGraphRepo := repository.NewGraphResourceFlowRepository(driver, cfg.Neo4jDatabase)
// Create graph sync service
graphSyncService = service.NewGraphSyncService(
orgGraphRepo,
siteGraphRepo,
flowGraphRepo,
)
log.Println("Neo4j graph database integration enabled")
}
}
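// Pass graphSyncService to other services (it stays nil when Neo4j is disabled)
_ = graphSyncService // placeholder so this excerpt compiles before the wiring is added
// ...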
}
Bulk Sync Existing Data
func syncExistingData(
orgRepo domain.OrganizationRepository,
siteRepo domain.SiteRepository,
flowRepo domain.ResourceFlowRepository,
graphSyncService *service.GraphSyncService,
) error {
ctx := context.Background()
// Sync all organizations
orgs, err := orgRepo.GetAll()
if err != nil {
return err
}
if err := graphSyncService.BulkSyncOrganizations(ctx, orgs); err != nil {
return err
}
log.Printf("Synced %d organizations to graph", len(orgs))
// Sync all sites
sites, err := siteRepo.GetAll()
if err != nil {
return err
}
if err := graphSyncService.BulkSyncSites(ctx, sites); err != nil {
return err
}
log.Printf("Synced %d sites to graph", len(sites))
// Sync all resource flows
flows, err := flowRepo.GetAll()
if err != nil {
return err
}
if err := graphSyncService.BulkSyncResourceFlows(ctx, flows); err != nil {
return err
}
log.Printf("Synced %d resource flows to graph", len(flows))
return nil
}
Graph-Based Matching
// Use graph traversal for advanced matching
func findAdvancedMatches(
graphSyncService *service.GraphSyncService,
flowID string,
) {
ctx := context.Background()
// Find matches using Neo4j graph traversal
matches, err := graphSyncService.FindMatchesInGraph(ctx, flowID, 10.0)
if err != nil {
log.Printf("Graph matching failed, falling back to PostgreSQL: %v", err)
// Fallback to PostgreSQL-based matching
return
}
// Process graph matches
for _, match := range matches {
log.Printf("Match found: %s at %s (%.2f km away)",
match["target_flow_id"],
match["target_site_name"],
match["distance_km"],
)
}
}
Performance Considerations
Query Performance
PostgreSQL: Excellent for:
- Spatial filtering (PostGIS indexes)
- JSONB queries (GIN indexes)
- Transaction-heavy workloads
- Exact point lookups
Neo4j: Excellent for:
- Multi-hop graph traversals
- Relationship pattern matching
- Network analysis
- Discovering indirect connections
When to Use Each Database
Use PostgreSQL When:
- CRUD operations on single entities
- Spatial radius queries (GetWithinRadius)
- Transaction integrity is critical
- Querying by specific attributes
Use Neo4j When:
- Finding indirect symbiosis opportunities
- Multi-party matching (3+ organizations)
- Trust network analysis
- Supply chain relationship mapping
- Pattern-based discovery
Scaling Strategy
MVP (current):
- Single PostgreSQL instance
- Single Neo4j instance
- Async sync (fire-and-forget)
Scale Phase (1000+ businesses):
- PostgreSQL read replicas
- Neo4j clustering (Enterprise)
- Event-driven sync via NATS/Kafka
- Eventual consistency model
Future Enhancements
Phase 2: Advanced Graph Features
1. MATCHABLE_TO Relationship:
  - Pre-compute match relationships
  - Store efficiency scores as edge properties
  - Update on resource flow changes
2. Trust Network:
  - (Organization)-[:TRUSTS]->(Organization)
  - Trust score propagation
  - Transitive trust calculations
3. Multi-Party Matching:
  - 3+ party circular symbioses
  - Cypher queries for complex patterns
  - Optimization algorithms
4. Knowledge Graph Integration:
  - Semantic relationships
  - Taxonomy integration (NACE, EWC codes)
  - GraphRAG for AI-enhanced matching
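To illustrate the multi-party item: a 3-party circular symbiosis is a directed 3-cycle in a graph where an edge x -> y means an output of x can feed an input of y. This in-memory sketch assumes such an organization-level adjacency map. The map is hypothetical, since the current graph links ResourceFlows, not Organizations. The sketch reports each cycle once, starting from its lexicographically smallest member.

```go
package main

import (
	"fmt"
	"sort"
)

// findThreeCycles returns every directed cycle a -> b -> c -> a exactly once,
// rotated so that a is the smallest ID. edges must not contain duplicate targets.
func findThreeCycles(edges map[string][]string) [][3]string {
	has := func(from, to string) bool {
		for _, t := range edges[from] {
			if t == to {
				return true
			}
		}
		return false
	}
	var cycles [][3]string
	for a, outs := range edges {
		for _, b := range outs {
			if b <= a {
				continue // a must be the smallest member of the cycle
			}
			for _, c := range edges[b] {
				if c <= a || c == b {
					continue
				}
				if has(c, a) {
					cycles = append(cycles, [3]string{a, b, c})
				}
			}
		}
	}
	// Map iteration order is random, so sort for deterministic output.
	sort.Slice(cycles, func(i, j int) bool {
		for k := 0; k < 3; k++ {
			if cycles[i][k] != cycles[j][k] {
				return cycles[i][k] < cycles[j][k]
			}
		}
		return false
	})
	return cycles
}

func main() {
	edges := map[string][]string{
		"brewery":    {"greenhouse"}, // e.g. waste heat / CO2
		"greenhouse": {"fish_farm"},
		"fish_farm":  {"brewery"},
		"dairy":      {"brewery"},
	}
	fmt.Println(findThreeCycles(edges))
}
```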
Phase 3: Federation
Zone-Based Architecture (from concept):
- Regional Neo4j graphs (city zones, industrial parks)
- Global federation layer
- Cross-zone query protocol
- Data sovereignty compliance
Troubleshooting
Common Issues
Neo4j Connection Refused:
Error: failed to verify Neo4j connectivity: connection refused
Solution: Ensure Neo4j is running at the configured URI and that the credentials are correct.
Schema Creation Fails:
Error: failed to create constraint: syntax error
Solution: Verify that your Neo4j version supports the IF NOT EXISTS syntax (4.0+).
Sync Performance Degradation:
Solution: Enable async sync (goroutines), batch updates, or move sync onto an event queue.
Monitoring
Recommended Metrics:
- Neo4j connection pool usage
- Sync operation latency
- Failed sync count
- Graph query response time
- Node/relationship count
References
- Concept Documentation: concept/09_graph_database_design.md
- Dev Guide: dev_guides/02_neo4j_driver.md
- Example Queries: concept/23_example_query_in_cypher_neo4j.md
- Neo4j Go Driver: https://github.com/neo4j/neo4j-go-driver
- Neo4j Cypher Manual: https://neo4j.com/docs/cypher-manual/current/