turash/bugulma/backend/GRAPH_DATABASE_INTEGRATION.md
Damir Mukimov 000eab4740
Major repository reorganization and missing backend endpoints implementation
2025-11-25 06:01:16 +01:00

# Neo4j Graph Database Integration
This document describes the hybrid PostgreSQL + Neo4j architecture implementation based on the concept documentation.
## Architecture Overview
The platform uses a **hybrid database architecture**:
### PostgreSQL + PostGIS
- **Purpose**: Primary data store for structured data and geospatial queries
- **Stores**: Organizations, Sites, ResourceFlows, Matches, SharedAssets with full GORM models
- **Strengths**: ACID compliance, complex spatial queries, JSONB for flexible data
- **Use Cases**: Transactional operations, detailed entity storage, spatial filtering
### Neo4j Graph Database
- **Purpose**: Graph traversal and relationship-based matching
- **Stores**: Same entities as nodes with relationships (OPERATES_AT, HOSTS, MATCHABLE_TO)
- **Strengths**: Multi-hop traversals, pattern matching, relationship queries
- **Use Cases**: Advanced matching algorithms, network analysis, symbiosis discovery
### Synchronization Pattern
- **Event-driven sync**: PostgreSQL → Neo4j on create/update/delete
- **Dual-write pattern**: Services write to PostgreSQL (primary), then sync to Neo4j (secondary)
- **Graceful degradation**: System continues working if Neo4j is unavailable
## Implementation Components
### 1. Database Configuration (`pkg/database/neo4j.go`)
**Neo4j Driver Setup**:
```go
config := database.Neo4jConfig{
	URI:      "neo4j://localhost:7687",
	Username: "neo4j",
	Password: "password",
	Database: "neo4j",
}
driver, err := database.NewNeo4jDriver(config)
```
**Features**:
- Connection pooling (max 50 connections)
- 30-second acquisition timeout
- 30-second transaction retry time
- Automatic connectivity verification
**Schema Initialization**:
```go
err := database.InitializeSchema(ctx, driver, "neo4j")
```
Creates:
- **Uniqueness constraints** on `id` fields for all node types
- **Indexes** for performance:
  - Organization: name, sector, subtype
  - Site: location (lat/lng), site_type
  - ResourceFlow: type, direction, type+direction composite
  - Match: status, compatibility_score
  - SharedAsset: type
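As an illustration of what the schema step might issue, the sketch below builds the Cypher statements for the constraints and indexes listed above. The constraint and index names, the `schemaStatements` helper, and the `FOR ... REQUIRE` constraint form (Neo4j 4.4 and later) are assumptions for this example; the actual `InitializeSchema` implementation may differ.

```go
package main

import (
	"fmt"
	"strings"
)

// indexSpec describes one index: a node label and the properties it covers.
type indexSpec struct {
	Label      string
	Properties []string
}

// nodeLabels lists the node types that get a uniqueness constraint on id.
var nodeLabels = []string{"Organization", "Site", "ResourceFlow", "Match", "SharedAsset"}

// indexSpecs mirrors the index list in this section. Mapping "location
// (lat/lng)" to a composite latitude+longitude index is an assumption.
var indexSpecs = []indexSpec{
	{Label: "Organization", Properties: []string{"name"}},
	{Label: "Organization", Properties: []string{"sector"}},
	{Label: "Organization", Properties: []string{"subtype"}},
	{Label: "Site", Properties: []string{"latitude", "longitude"}},
	{Label: "Site", Properties: []string{"site_type"}},
	{Label: "ResourceFlow", Properties: []string{"type"}},
	{Label: "ResourceFlow", Properties: []string{"direction"}},
	{Label: "ResourceFlow", Properties: []string{"type", "direction"}},
	{Label: "Match", Properties: []string{"status"}},
	{Label: "Match", Properties: []string{"compatibility_score"}},
	{Label: "SharedAsset", Properties: []string{"type"}},
}

// schemaStatements returns idempotent Cypher DDL: constraints first, then indexes.
func schemaStatements() []string {
	var stmts []string
	for _, label := range nodeLabels {
		name := strings.ToLower(label) + "_id_unique"
		stmts = append(stmts, fmt.Sprintf(
			"CREATE CONSTRAINT %s IF NOT EXISTS FOR (n:%s) REQUIRE n.id IS UNIQUE",
			name, label))
	}
	for _, spec := range indexSpecs {
		props := make([]string, len(spec.Properties))
		for i, p := range spec.Properties {
			props[i] = "n." + p
		}
		name := strings.ToLower(spec.Label) + "_" + strings.Join(spec.Properties, "_") + "_idx"
		stmts = append(stmts, fmt.Sprintf(
			"CREATE INDEX %s IF NOT EXISTS FOR (n:%s) ON (%s)",
			name, spec.Label, strings.Join(props, ", ")))
	}
	return stmts
}
```

Because every statement uses `IF NOT EXISTS`, running the list at each startup should be safe to repeat.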
### 2. Graph Repositories (`internal/repository/graph_repository.go`)
#### GraphOrganizationRepository
**Sync Organization to Graph**:
```go
repo := repository.NewGraphOrganizationRepository(driver, "neo4j")
err := repo.SyncToGraph(ctx, organization)
```
**Cypher Operation**:
- `MERGE (o:Organization {id: $id})` - Upsert node
- Sets all properties including JSONB as JSON strings
- Stores timestamps in RFC3339 format
**Properties Synced**:
- Basic: id, name, sector, subtype, description, address
- Location: latitude, longitude
- Business: legal_form, industrial_sector, company_size, years_operation
- Products/Services: sells_products, offers_services (as JSON)
- Metadata: verified, created_at, updated_at
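The property mapping can be pictured roughly as follows. This is a minimal sketch, not the repository's code: the trimmed `Organization` struct, the `[]string` shape of `SellsProducts`, and the `organizationParams` helper are assumptions made for illustration.

```go
package main

import (
	"encoding/json"
	"time"
)

// Organization is a trimmed, hypothetical stand-in for the GORM model.
type Organization struct {
	ID            string
	Name          string
	Sector        string
	SellsProducts []string // assumed shape; stored as JSONB in PostgreSQL
	Verified      bool
	CreatedAt     time.Time
	UpdatedAt     time.Time
}

// organizationParams flattens an organization into Cypher parameters.
// Structured fields become JSON strings and timestamps become RFC3339
// strings, since Neo4j properties cannot hold nested documents.
func organizationParams(org Organization) (map[string]any, error) {
	products, err := json.Marshal(org.SellsProducts)
	if err != nil {
		return nil, err
	}
	return map[string]any{
		"id":             org.ID,
		"name":           org.Name,
		"sector":         org.Sector,
		"sells_products": string(products),
		"verified":       org.Verified,
		"created_at":     org.CreatedAt.UTC().Format(time.RFC3339),
		"updated_at":     org.UpdatedAt.UTC().Format(time.RFC3339),
	}, nil
}
```

The resulting map could then be passed as the parameters of a `MERGE (o:Organization {id: $id})` statement.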
#### GraphSiteRepository
**Sync Site to Graph**:
```go
repo := repository.NewGraphSiteRepository(driver, "neo4j")
err := repo.SyncToGraph(ctx, site)
```
**Cypher Operation**:
- `MERGE (s:Site {id: $id})` - Upsert site node
- `MERGE (o)-[:OPERATES_AT]->(s)` - Create relationship to owner organization
- Conditional relationship creation (only if owner exists)
**Relationships Created**:
- `Organization -[:OPERATES_AT]-> Site`
#### GraphResourceFlowRepository
**Sync Resource Flow to Graph**:
```go
repo := repository.NewGraphResourceFlowRepository(driver, "neo4j")
err := repo.SyncToGraph(ctx, resourceFlow)
```
**Cypher Operation**:
- `MERGE (rf:ResourceFlow {id: $id})` - Upsert resource flow node
- `MERGE (s)-[:HOSTS]->(rf)` - Create relationship to site
- Unpacks JSONB fields (Quality, Quantity, TimeProfile, EconomicData) for graph properties
**Relationships Created**:
- `Site -[:HOSTS]-> ResourceFlow`
**Properties Synced**:
- Identification: id, business_id, site_id, direction, type
- Quality: temperature_celsius, pressure_bar, purity_pct, grade, hazardousness, physical_state
- Quantity: amount, unit, temporal_unit
- Temporal: supply_pattern
- Economic: cost_in, cost_out, transportation_cost
- Metadata: precision_level, source_type
**Graph-Based Matching**:
```go
matches, err := repo.FindMatchesInGraph(ctx, flowID, maxDistanceKm)
```
**Cypher Query** (from concept documentation):
```cypher
MATCH (sourceFlow:ResourceFlow {id: $flowID})<-[:HOSTS]-(sourceSite:Site),
      (targetFlow:ResourceFlow)<-[:HOSTS]-(targetSite:Site)
WHERE sourceFlow.direction = 'output'
  AND targetFlow.direction = 'input'
  AND sourceFlow.type = targetFlow.type
  AND abs(coalesce(sourceFlow.temperature_celsius, 0) - coalesce(targetFlow.temperature_celsius, 0)) <= 10
WITH sourceFlow, targetFlow, sourceSite, targetSite,
     point.distance(
       point({longitude: sourceSite.longitude, latitude: sourceSite.latitude}),
       point({longitude: targetSite.longitude, latitude: targetSite.latitude})
     ) / 1000 AS distance_km
WHERE distance_km <= $maxDistance
RETURN targetFlow.id, targetFlow.type, targetFlow.temperature_celsius,
       targetFlow.amount, targetSite.name, distance_km
ORDER BY distance_km ASC
LIMIT 50
```
**Matching Features**:
- Direction filtering (output → input)
- Type matching (heat → heat, water → water, etc.)
- Temperature compatibility (ΔT ≤ 10°C)
- Geospatial distance calculation (Neo4j point.distance)
- Distance-based ranking
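To make the matching rules concrete, here is a small in-memory Go sketch that applies the same filters as the Cypher query: direction, type, a 10°C temperature tolerance, a distance cutoff, ranking by distance, and a limit of 50. The `Flow` and `Candidate` types and the `findMatches` function are hypothetical, and the haversine formula on a spherical Earth is used as an approximation of Neo4j's `point.distance`.

```go
package main

import (
	"math"
	"sort"
)

// Flow is a hypothetical, flattened view of a resource flow plus the
// coordinates of the site that hosts it.
type Flow struct {
	ID          string
	Direction   string // "input" or "output"
	Type        string // "heat", "water", ...
	TempCelsius float64
	Lat, Lon    float64
}

// Candidate is one ranked match.
type Candidate struct {
	FlowID     string
	DistanceKm float64
}

const earthRadiusKm = 6371.0

// haversineKm returns the great-circle distance between two WGS-84 points.
func haversineKm(lat1, lon1, lat2, lon2 float64) float64 {
	toRad := func(deg float64) float64 { return deg * math.Pi / 180 }
	dLat := toRad(lat2 - lat1)
	dLon := toRad(lon2 - lon1)
	a := math.Sin(dLat/2)*math.Sin(dLat/2) +
		math.Cos(toRad(lat1))*math.Cos(toRad(lat2))*math.Sin(dLon/2)*math.Sin(dLon/2)
	return 2 * earthRadiusKm * math.Asin(math.Sqrt(a))
}

// findMatches returns input flows compatible with an output flow,
// nearest first, capped at 50 results.
func findMatches(source Flow, flows []Flow, maxDistanceKm float64) []Candidate {
	if source.Direction != "output" {
		return nil
	}
	var out []Candidate
	for _, f := range flows {
		if f.Direction != "input" || f.Type != source.Type {
			continue
		}
		if math.Abs(source.TempCelsius-f.TempCelsius) > 10 {
			continue
		}
		d := haversineKm(source.Lat, source.Lon, f.Lat, f.Lon)
		if d > maxDistanceKm {
			continue
		}
		out = append(out, Candidate{FlowID: f.ID, DistanceKm: d})
	}
	sort.Slice(out, func(i, j int) bool { return out[i].DistanceKm < out[j].DistanceKm })
	if len(out) > 50 {
		out = out[:50]
	}
	return out
}
```

A function along these lines could also serve as a reference implementation when comparing graph results against a PostgreSQL-based fallback.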
### 3. Graph Sync Service (`internal/service/graph_sync_service.go`)
**Purpose**: Orchestrates synchronization between PostgreSQL and Neo4j
**Service Creation**:
```go
graphSyncService := service.NewGraphSyncService(
	orgGraphRepo,
	siteGraphRepo,
	flowGraphRepo,
)
```
**Single Entity Sync**:
```go
// Each call returns an error; check it before making the next call.
// Sync organization
err := graphSyncService.SyncOrganization(ctx, organization)
// Sync site
err = graphSyncService.SyncSite(ctx, site)
// Sync resource flow
err = graphSyncService.SyncResourceFlow(ctx, resourceFlow)
```
**Deletion from Graph**:
```go
err := graphSyncService.DeleteOrganization(ctx, orgID)
err = graphSyncService.DeleteSite(ctx, siteID)
err = graphSyncService.DeleteResourceFlow(ctx, flowID)
```
**Bulk Synchronization**:
```go
// Bulk sync organizations
err := graphSyncService.BulkSyncOrganizations(ctx, organizations)
// Bulk sync sites
err = graphSyncService.BulkSyncSites(ctx, sites)
// Bulk sync resource flows
err = graphSyncService.BulkSyncResourceFlows(ctx, resourceFlows)
```
**Graph Matching**:
```go
matches, err := graphSyncService.FindMatchesInGraph(ctx, flowID, 10.0)
```
**Graceful Degradation**:
- All sync methods check if graph repo is nil
- If Neo4j is disabled, sync operations are no-ops (return nil)
- Application continues working with PostgreSQL only
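The nil-check behaviour described above might look like the following sketch. The `orgGraphRepo` interface, the trimmed `Organization` type, and the `recordingRepo` test double are assumptions for illustration; only the "nil repository means no-op" rule comes from this document.

```go
package main

import (
	"context"
	"errors"
)

// Organization is a trimmed, hypothetical stand-in for the domain model.
type Organization struct {
	ID string
}

// orgGraphRepo is the assumed subset of the graph repository used here.
type orgGraphRepo interface {
	SyncToGraph(ctx context.Context, org *Organization) error
}

// GraphSyncService holds optional graph repositories.
type GraphSyncService struct {
	orgRepo orgGraphRepo // nil when Neo4j is disabled
}

// SyncOrganization is a no-op when the service or its repository is nil,
// so callers do not need to know whether Neo4j is enabled.
func (s *GraphSyncService) SyncOrganization(ctx context.Context, org *Organization) error {
	if s == nil || s.orgRepo == nil {
		return nil
	}
	if org == nil {
		return errors.New("organization is nil")
	}
	return s.orgRepo.SyncToGraph(ctx, org)
}

// recordingRepo is a fake repository that records the IDs it receives.
type recordingRepo struct {
	synced []string
}

func (r *recordingRepo) SyncToGraph(_ context.Context, org *Organization) error {
	r.synced = append(r.synced, org.ID)
	return nil
}
```

One Go-specific caveat: a nil `*recordingRepo` stored in the interface field is not equal to `nil`, so when Neo4j is disabled the field should be left unset rather than assigned a typed nil pointer.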
### 4. Configuration (`pkg/config/config.go`)
**Environment Variables**:
```bash
# Neo4j Configuration
NEO4J_URI=neo4j://localhost:7687
NEO4J_USERNAME=neo4j
NEO4J_PASSWORD=your-password
NEO4J_DATABASE=neo4j
NEO4J_ENABLED=true # Set to false to disable graph sync
# PostgreSQL Configuration
POSTGRES_HOST=localhost
POSTGRES_PORT=5432
POSTGRES_USER=bugulma
POSTGRES_PASSWORD=bugulma
POSTGRES_DB=bugulma_city
POSTGRES_SSLMODE=disable
```
**Config Loading**:
```go
cfg := config.Load()
// Neo4j is disabled by default
// Set NEO4J_ENABLED=true to enable
if cfg.Neo4jEnabled {
	// Initialize Neo4j driver and sync
}
```
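For readers who want to see how the "disabled by default" rule could be implemented, the following sketch reads the Neo4j variables through an injectable lookup function. `Neo4jSettings`, `lookupFunc`, `envOr`, and `loadNeo4jSettings` are hypothetical names, and the fallback values are taken from the examples in this document rather than from `pkg/config/config.go`.

```go
package main

import "strconv"

// Neo4jSettings is a hypothetical subset of the application config.
type Neo4jSettings struct {
	URI      string
	Username string
	Password string
	Database string
	Enabled  bool
}

// lookupFunc has the same signature as os.LookupEnv, which lets tests
// supply values without touching the process environment.
type lookupFunc func(key string) (string, bool)

// envOr returns the variable's value, or fallback when unset or empty.
func envOr(lookup lookupFunc, key, fallback string) string {
	if v, ok := lookup(key); ok && v != "" {
		return v
	}
	return fallback
}

// loadNeo4jSettings reads the NEO4J_* variables. Anything that does not
// parse as a boolean leaves the integration disabled.
func loadNeo4jSettings(lookup lookupFunc) Neo4jSettings {
	enabled, err := strconv.ParseBool(envOr(lookup, "NEO4J_ENABLED", "false"))
	if err != nil {
		enabled = false
	}
	return Neo4jSettings{
		URI:      envOr(lookup, "NEO4J_URI", "neo4j://localhost:7687"),
		Username: envOr(lookup, "NEO4J_USERNAME", "neo4j"),
		Password: envOr(lookup, "NEO4J_PASSWORD", ""),
		Database: envOr(lookup, "NEO4J_DATABASE", "neo4j"),
		Enabled:  enabled,
	}
}
```

In production code the call would be `loadNeo4jSettings(os.LookupEnv)`.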
## Graph Schema
### Node Types
**Organization**:
```cypher
(:Organization {
  id: string,
  name: string,
  sector: string,
  subtype: string,
  latitude: float,
  longitude: float,
  industrial_sector: string,
  company_size: int,
  sells_products: json,
  offers_services: json,
  ...
})
```
**Site**:
```cypher
(:Site {
  id: string,
  name: string,
  latitude: float,
  longitude: float,
  site_type: string,
  floor_area_m2: float,
  available_utilities: json,
  ...
})
```
**ResourceFlow**:
```cypher
(:ResourceFlow {
  id: string,
  business_id: string,
  site_id: string,
  direction: string, // 'input' or 'output'
  type: string, // 'heat', 'water', 'steam', etc.
  temperature_celsius: float,
  amount: float,
  unit: string,
  precision_level: string,
  ...
})
```
### Relationships
**Core Relationships** (from concept):
```cypher
(Organization)-[:OPERATES_AT]->(Site)
(Site)-[:HOSTS]->(ResourceFlow)
(ResourceFlow)-[:MATCHABLE_TO {efficiency, distance, savings}]->(ResourceFlow)
(Organization)-[:TRUSTS]->(Organization)
```
**Implemented**:
- ✅ `Organization -[:OPERATES_AT]-> Site`
- ✅ `Site -[:HOSTS]-> ResourceFlow`
- 🔄 `ResourceFlow -[:MATCHABLE_TO]-> ResourceFlow` (computed dynamically via queries)
- 🔄 `Organization -[:TRUSTS]-> Organization` (to be implemented)
## Integration with Existing Services
### Pattern for Service Integration
**Example: OrganizationService with Graph Sync**
```go
type OrganizationService struct {
	repo             domain.OrganizationRepository
	graphSyncService *GraphSyncService // Optional
}

func (s *OrganizationService) Create(req CreateOrganizationRequest) (*domain.Organization, error) {
	// 1. Create in PostgreSQL (primary)
	org := &domain.Organization{ /* ... */ }
	if err := s.repo.Create(org); err != nil {
		return nil, err
	}
	// 2. Sync to Neo4j (secondary, non-blocking)
	if s.graphSyncService != nil {
		go func() {
			ctx := context.Background()
			if err := s.graphSyncService.SyncOrganization(ctx, org); err != nil {
				log.Printf("Failed to sync organization to graph: %v", err)
			}
		}()
	}
	return org, nil
}

func (s *OrganizationService) Delete(id string) error {
	// 1. Delete from PostgreSQL (primary)
	if err := s.repo.Delete(id); err != nil {
		return err
	}
	// 2. Delete from Neo4j (secondary)
	if s.graphSyncService != nil {
		go func() {
			ctx := context.Background()
			if err := s.graphSyncService.DeleteOrganization(ctx, id); err != nil {
				log.Printf("Failed to delete organization from graph: %v", err)
			}
		}()
	}
	return nil
}
```
## Usage Examples
### Setup Neo4j Integration
```go
package main

import (
	"context"
	"log"

	"bugulma/backend/internal/repository"
	"bugulma/backend/internal/service"
	"bugulma/backend/pkg/config"
	"bugulma/backend/pkg/database"
)

func main() {
	cfg := config.Load()
	var graphSyncService *service.GraphSyncService
	if cfg.Neo4jEnabled {
		// Initialize Neo4j driver
		neo4jConfig := database.Neo4jConfig{
			URI:      cfg.Neo4jURI,
			Username: cfg.Neo4jUsername,
			Password: cfg.Neo4jPassword,
			Database: cfg.Neo4jDatabase,
		}
		driver, err := database.NewNeo4jDriver(neo4jConfig)
		if err != nil {
			log.Printf("Warning: Failed to connect to Neo4j: %v", err)
		} else {
			// Initialize schema
			ctx := context.Background()
			if err := database.InitializeSchema(ctx, driver, cfg.Neo4jDatabase); err != nil {
				log.Printf("Warning: Failed to initialize Neo4j schema: %v", err)
			}
			// Create graph repositories
			orgGraphRepo := repository.NewGraphOrganizationRepository(driver, cfg.Neo4jDatabase)
			siteGraphRepo := repository.NewGraphSiteRepository(driver, cfg.Neo4jDatabase)
			flowGraphRepo := repository.NewGraphResourceFlowRepository(driver, cfg.Neo4jDatabase)
			// Create graph sync service
			graphSyncService = service.NewGraphSyncService(
				orgGraphRepo,
				siteGraphRepo,
				flowGraphRepo,
			)
			log.Println("Neo4j graph database integration enabled")
		}
	}
	// Pass graphSyncService to other services
	// ...
}
```
### Bulk Sync Existing Data
```go
func syncExistingData(
	orgRepo domain.OrganizationRepository,
	siteRepo domain.SiteRepository,
	flowRepo domain.ResourceFlowRepository,
	graphSyncService *service.GraphSyncService,
) error {
	ctx := context.Background()
	// Sync all organizations
	orgs, err := orgRepo.GetAll()
	if err != nil {
		return err
	}
	if err := graphSyncService.BulkSyncOrganizations(ctx, orgs); err != nil {
		return err
	}
	log.Printf("Synced %d organizations to graph", len(orgs))
	// Sync all sites
	sites, err := siteRepo.GetAll()
	if err != nil {
		return err
	}
	if err := graphSyncService.BulkSyncSites(ctx, sites); err != nil {
		return err
	}
	log.Printf("Synced %d sites to graph", len(sites))
	// Sync all resource flows
	flows, err := flowRepo.GetAll()
	if err != nil {
		return err
	}
	if err := graphSyncService.BulkSyncResourceFlows(ctx, flows); err != nil {
		return err
	}
	log.Printf("Synced %d resource flows to graph", len(flows))
	return nil
}
```
### Graph-Based Matching
```go
// Use graph traversal for advanced matching
func findAdvancedMatches(
	graphSyncService *service.GraphSyncService,
	flowID string,
) {
	ctx := context.Background()
	// Find matches using Neo4j graph traversal
	matches, err := graphSyncService.FindMatchesInGraph(ctx, flowID, 10.0)
	if err != nil {
		log.Printf("Graph matching failed, falling back to PostgreSQL: %v", err)
		// Fallback to PostgreSQL-based matching
		return
	}
	// Process graph matches
	for _, match := range matches {
		log.Printf("Match found: %s at %s (%.2f km away)",
			match["target_flow_id"],
			match["target_site_name"],
			match["distance_km"],
		)
	}
}
```
## Performance Considerations
### Query Performance
**PostgreSQL**: Excellent for:
- Spatial filtering (PostGIS indexes)
- JSONB queries (GIN indexes)
- Transaction-heavy workloads
- Exact point lookups
**Neo4j**: Excellent for:
- Multi-hop graph traversals
- Relationship pattern matching
- Network analysis
- Discovering indirect connections
### When to Use Each Database
**Use PostgreSQL When**:
- CRUD operations on single entities
- Spatial radius queries (GetWithinRadius)
- Transaction integrity is critical
- Querying by specific attributes
**Use Neo4j When**:
- Finding indirect symbiosis opportunities
- Multi-party matching (3+ organizations)
- Trust network analysis
- Supply chain relationship mapping
- Pattern-based discovery
### Scaling Strategy
**MVP** (current):
- Single PostgreSQL instance
- Single Neo4j instance
- Async sync (fire-and-forget)
**Scale Phase** (1000+ businesses):
- PostgreSQL read replicas
- Neo4j clustering (Enterprise)
- Event-driven sync via NATS/Kafka
- Eventual consistency model
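As a stepping stone between fire-and-forget goroutines and an external broker such as NATS or Kafka, sync work could pass through a bounded in-process queue. The sketch below is one possible shape under that assumption; `SyncEvent`, `SyncQueue`, and their methods are hypothetical and not part of the current codebase.

```go
package main

// SyncEvent names an entity that needs to be re-synced to the graph.
type SyncEvent struct {
	Kind string // e.g. "organization", "site", "resource_flow"
	ID   string
}

// SyncQueue processes events sequentially on a single worker goroutine.
type SyncQueue struct {
	events chan SyncEvent
	done   chan struct{}
}

// NewSyncQueue starts the worker. handle performs the sync; onError
// (optional) is called for events whose sync failed.
func NewSyncQueue(buffer int, handle func(SyncEvent) error, onError func(SyncEvent, error)) *SyncQueue {
	q := &SyncQueue{
		events: make(chan SyncEvent, buffer),
		done:   make(chan struct{}),
	}
	go func() {
		defer close(q.done)
		for ev := range q.events {
			if err := handle(ev); err != nil && onError != nil {
				onError(ev, err)
			}
		}
	}()
	return q
}

// Publish enqueues an event without blocking. It returns false when the
// buffer is full so the caller can log or count the dropped event.
func (q *SyncQueue) Publish(ev SyncEvent) bool {
	select {
	case q.events <- ev:
		return true
	default:
		return false
	}
}

// Close stops accepting events and waits for queued ones to be handled.
// Publish must not be called after Close.
func (q *SyncQueue) Close() {
	close(q.events)
	<-q.done
}
```

Compared with one goroutine per write, a bounded queue keeps the number of concurrent Neo4j sessions predictable and preserves event order; the trade-off is that events can be dropped when the buffer fills, which is where a durable broker becomes attractive.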
## Future Enhancements
### Phase 2: Advanced Graph Features
1. **MATCHABLE_TO Relationship**:
   - Pre-compute match relationships
   - Store efficiency scores as edge properties
   - Update on resource flow changes
2. **Trust Network**:
   - `(Organization)-[:TRUSTS]->(Organization)`
   - Trust score propagation
   - Transitive trust calculations
3. **Multi-Party Matching**:
   - 3+ party circular symbioses
   - Cypher queries for complex patterns
   - Optimization algorithms
4. **Knowledge Graph Integration**:
   - Semantic relationships
   - Taxonomy integration (NACE, EWC codes)
   - GraphRAG for AI-enhanced matching
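To illustrate what "3+ party circular symbioses" means in practice, the sketch below finds three-party cycles in a directed graph of pairwise matches, where an edge A→B means an output of A can feed an input of B. The `Edge` type and `findTriangles` function are hypothetical, and in the planned design this pattern would more likely be expressed as a Cypher query than computed in Go.

```go
package main

import "sort"

// Edge means "an output of From can feed an input of To".
type Edge struct {
	From, To string
}

// findTriangles returns every directed cycle a -> b -> c -> a exactly once,
// written starting from its lexicographically smallest member.
func findTriangles(edges []Edge) [][3]string {
	adj := map[string]map[string]bool{}
	for _, e := range edges {
		if e.From == e.To {
			continue // ignore self-loops
		}
		if adj[e.From] == nil {
			adj[e.From] = map[string]bool{}
		}
		adj[e.From][e.To] = true
	}
	var cycles [][3]string
	for a, outs := range adj {
		for b := range outs {
			for c := range adj[b] {
				// Requiring a to be the smallest ID removes rotations
				// of the same cycle and guarantees three distinct nodes.
				if a < b && a < c && adj[c][a] {
					cycles = append(cycles, [3]string{a, b, c})
				}
			}
		}
	}
	// Map iteration order is random, so sort for deterministic output.
	sort.Slice(cycles, func(i, j int) bool {
		for k := 0; k < 3; k++ {
			if cycles[i][k] != cycles[j][k] {
				return cycles[i][k] < cycles[j][k]
			}
		}
		return false
	})
	return cycles
}
```

Longer cycles need a general cycle-enumeration approach, which is one reason graph traversal is a natural fit for this feature.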
### Phase 3: Federation
**Zone-Based Architecture** (from concept):
- Regional Neo4j graphs (city zones, industrial parks)
- Global federation layer
- Cross-zone query protocol
- Data sovereignty compliance
## Troubleshooting
### Common Issues
**Neo4j Connection Refused**:
```
Error: failed to verify Neo4j connectivity: connection refused
```
**Solution**: Ensure Neo4j is running and reachable at the configured `NEO4J_URI`, then check the credentials.
**Schema Creation Fails**:
```
Error: failed to create constraint: syntax error
```
**Solution**: Verify Neo4j version supports `IF NOT EXISTS` syntax (4.0+)
**Sync Performance Degradation**:
**Solution**: Run syncs asynchronously (goroutines), batch updates instead of writing one entity at a time, and route sync work through an event queue.
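Batching can be as simple as splitting a slice into fixed-size groups and sending each group in a single round trip, for example as the list parameter of a Cypher `UNWIND $rows AS row MERGE ...` statement. The generic `chunk` helper below is a hypothetical sketch of the splitting step only.

```go
package main

// chunk splits items into consecutive batches of at most size elements.
// A non-positive size is treated as 1. The batches share the backing
// array of items, so callers should not modify items while using them.
func chunk[T any](items []T, size int) [][]T {
	if size <= 0 {
		size = 1
	}
	var batches [][]T
	for start := 0; start < len(items); start += size {
		end := start + size
		if end > len(items) {
			end = len(items)
		}
		batches = append(batches, items[start:end])
	}
	return batches
}
```

A suitable batch size depends on payload size and transaction memory limits, so it is best treated as a tunable setting rather than a constant.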
### Monitoring
**Recommended Metrics**:
- Neo4j connection pool usage
- Sync operation latency
- Failed sync count
- Graph query response time
- Node/relationship count
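Sync latency and failed sync count can be tracked with a few atomic counters before a full metrics stack is in place. The `SyncMetrics` type below is a hypothetical sketch using only the standard library (`sync/atomic` typed counters require Go 1.19 or later).

```go
package main

import (
	"sync/atomic"
	"time"
)

// SyncMetrics accumulates counters for graph sync operations.
// The zero value is ready to use and safe for concurrent callers.
type SyncMetrics struct {
	attempts   atomic.Int64
	failures   atomic.Int64
	totalNanos atomic.Int64
}

// Observe records one finished sync operation.
func (m *SyncMetrics) Observe(d time.Duration, failed bool) {
	m.attempts.Add(1)
	m.totalNanos.Add(int64(d))
	if failed {
		m.failures.Add(1)
	}
}

// Track runs op, records its duration and outcome, and returns its error.
func (m *SyncMetrics) Track(op func() error) error {
	start := time.Now()
	err := op()
	m.Observe(time.Since(start), err != nil)
	return err
}

// FailedCount returns the number of failed sync operations so far.
func (m *SyncMetrics) FailedCount() int64 {
	return m.failures.Load()
}

// AverageLatency returns the mean duration per operation, or 0 if none ran.
func (m *SyncMetrics) AverageLatency() time.Duration {
	n := m.attempts.Load()
	if n == 0 {
		return 0
	}
	return time.Duration(m.totalNanos.Load() / n)
}
```

A sync call could then be wrapped as `metrics.Track(func() error { return graphSyncService.SyncOrganization(ctx, org) })`, with the counters exported to whatever monitoring system is in use.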
## References
- **Concept Documentation**: `concept/09_graph_database_design.md`
- **Dev Guide**: `dev_guides/02_neo4j_driver.md`
- **Example Queries**: `concept/23_example_query_in_cypher_neo4j.md`
- **Neo4j Go Driver**: <https://github.com/neo4j/neo4j-go-driver>
- **Neo4j Cypher Manual**: <https://neo4j.com/docs/cypher-manual/current/>