mirror of
https://github.com/SamyRai/turash.git
synced 2025-12-26 23:01:33 +00:00
# Neo4j Graph Database Integration

This document describes the implementation of the hybrid PostgreSQL + Neo4j architecture laid out in the concept documentation.

## Architecture Overview

The platform uses a **hybrid database architecture**:

### PostgreSQL + PostGIS

- **Purpose**: Primary data store for structured data and geospatial queries
- **Stores**: Organizations, Sites, ResourceFlows, Matches, SharedAssets with full GORM models
- **Strengths**: ACID compliance, complex spatial queries, JSONB for flexible data
- **Use Cases**: Transactional operations, detailed entity storage, spatial filtering

### Neo4j Graph Database

- **Purpose**: Graph traversal and relationship-based matching
- **Stores**: The same entities as nodes, connected by relationships (OPERATES_AT, HOSTS, MATCHABLE_TO)
- **Strengths**: Multi-hop traversals, pattern matching, relationship queries
- **Use Cases**: Advanced matching algorithms, network analysis, symbiosis discovery

### Synchronization Pattern

- **Write-triggered sync**: PostgreSQL → Neo4j on every create/update/delete (in-process for the MVP; a message broker is planned for the scale phase)
- **Dual-write pattern**: Services write to PostgreSQL (primary) first, then sync to Neo4j (secondary)
- **Graceful degradation**: The system keeps working if Neo4j is unavailable

## Implementation Components


### 1. Database Configuration (`pkg/database/neo4j.go`)

**Neo4j Driver Setup**:

```go
config := database.Neo4jConfig{
	URI:      "neo4j://localhost:7687",
	Username: "neo4j",
	Password: "password",
	Database: "neo4j",
}

driver, err := database.NewNeo4jDriver(config)
```

**Features**:

- Connection pooling (max 50 connections)
- 30-second acquisition timeout
- 30-second transaction retry time
- Automatic connectivity verification

**Schema Initialization**:

```go
err := database.InitializeSchema(ctx, driver, "neo4j")
```

Creates:

- **Uniqueness constraints** on `id` fields for all node types
- **Indexes** for performance:
  - Organization: name, sector, subtype
  - Site: location (lat/lng), site_type
  - ResourceFlow: type, direction, type+direction composite
  - Match: status, compatibility_score
  - SharedAsset: type

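
To make the schema list concrete, here is a hedged sketch of the kind of DDL a function like `InitializeSchema` might issue. It assumes Neo4j 5 syntax (`FOR ... REQUIRE` for constraints, `FOR ... ON` for indexes, both with `IF NOT EXISTS`); the `schemaStatements` helper, the `<label>_<props>` naming scheme and the subset of indexes shown are illustrative, not taken from `pkg/database/neo4j.go`.

```go
package main

import (
	"fmt"
	"strings"
)

// indexSpec describes one index: a label and one or more properties.
// A spec with several properties becomes a composite index.
type indexSpec struct {
	Label string
	Props []string
}

// schemaStatements builds Neo4j 5-style DDL: one uniqueness constraint on `id`
// per label, then one index per spec. IF NOT EXISTS makes every statement safe
// to re-run on each application start.
func schemaStatements(labels []string, indexes []indexSpec) []string {
	var stmts []string
	for _, l := range labels {
		stmts = append(stmts, fmt.Sprintf(
			"CREATE CONSTRAINT %s_id_unique IF NOT EXISTS FOR (n:%s) REQUIRE n.id IS UNIQUE",
			strings.ToLower(l), l))
	}
	for _, ix := range indexes {
		props := make([]string, len(ix.Props))
		for i, p := range ix.Props {
			props[i] = "n." + p
		}
		stmts = append(stmts, fmt.Sprintf(
			"CREATE INDEX %s_%s IF NOT EXISTS FOR (n:%s) ON (%s)",
			strings.ToLower(ix.Label), strings.Join(ix.Props, "_"), ix.Label,
			strings.Join(props, ", ")))
	}
	return stmts
}

func main() {
	labels := []string{"Organization", "Site", "ResourceFlow", "Match", "SharedAsset"}
	indexes := []indexSpec{
		{"Organization", []string{"name"}},
		{"Organization", []string{"sector"}},
		{"Site", []string{"site_type"}},
		{"ResourceFlow", []string{"type", "direction"}}, // composite index
		{"Match", []string{"status"}},
	}
	for _, s := range schemaStatements(labels, indexes) {
		fmt.Println(s)
	}
}
```

Each statement would typically run in its own auto-commit transaction, because Neo4j does not allow schema changes and data writes in the same transaction.
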

### 2. Graph Repositories (`internal/repository/graph_repository.go`)

#### GraphOrganizationRepository

**Sync Organization to Graph**:

```go
repo := repository.NewGraphOrganizationRepository(driver, "neo4j")
err := repo.SyncToGraph(ctx, organization)
```

**Cypher Operation**:

- `MERGE (o:Organization {id: $id})` - Upsert node
- Sets all properties, serializing JSONB fields as JSON strings
- Stores timestamps in RFC3339 format

**Properties Synced**:

- Basic: id, name, sector, subtype, description, address
- Location: latitude, longitude
- Business: legal_form, industrial_sector, company_size, years_operation
- Products/Services: sells_products, offers_services (as JSON)
- Metadata: verified, created_at, updated_at


#### GraphSiteRepository

**Sync Site to Graph**:

```go
repo := repository.NewGraphSiteRepository(driver, "neo4j")
err := repo.SyncToGraph(ctx, site)
```

**Cypher Operation**:

- `MERGE (s:Site {id: $id})` - Upsert site node
- `MERGE (o)-[:OPERATES_AT]->(s)` - Create relationship to owner organization
- Conditional relationship creation (only if owner exists)

**Relationships Created**:

- `Organization -[:OPERATES_AT]-> Site`

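
One way to express "only if the owner exists" in a single statement is to upsert the site unconditionally and then `MATCH` the owner: when no such organization exists, the `MATCH` yields zero rows and the relationship `MERGE` is skipped, while the site node written earlier in the statement is kept. A hedged sketch; the query constant, the `$owner_id` parameter and the `siteSyncParams` helper are hypothetical.

```go
package main

import "fmt"

// upsertSiteWithOwner upserts the Site node, then links it to its owner.
// If no Organization with $owner_id exists, MATCH produces no rows, so the
// OPERATES_AT MERGE does not run; the Site from the first MERGE is kept.
const upsertSiteWithOwner = `
MERGE (s:Site {id: $id})
SET s += $props
WITH s
MATCH (o:Organization {id: $owner_id})
MERGE (o)-[:OPERATES_AT]->(s)`

// siteSyncParams assembles parameters for upsertSiteWithOwner
// (hypothetical helper; property names are illustrative).
func siteSyncParams(id, ownerID, name string, lat, lng float64) map[string]any {
	return map[string]any{
		"id":       id,
		"owner_id": ownerID,
		"props": map[string]any{
			"name":      name,
			"latitude":  lat,
			"longitude": lng,
		},
	}
}

func main() {
	fmt.Println(upsertSiteWithOwner)
	fmt.Println(siteSyncParams("site-1", "org-1", "Main plant", 54.54, 52.80))
}
```

Because the relationship is silently skipped when the owner is missing, organizations should be synced before their sites; re-syncing a site after its owner arrives creates the link.
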

#### GraphResourceFlowRepository

**Sync Resource Flow to Graph**:

```go
repo := repository.NewGraphResourceFlowRepository(driver, "neo4j")
err := repo.SyncToGraph(ctx, resourceFlow)
```

**Cypher Operation**:

- `MERGE (rf:ResourceFlow {id: $id})` - Upsert resource flow node
- `MERGE (s)-[:HOSTS]->(rf)` - Create relationship to site
- Unpacks JSONB fields (Quality, Quantity, TimeProfile, EconomicData) into individual graph properties

**Relationships Created**:

- `Site -[:HOSTS]-> ResourceFlow`

**Properties Synced**:

- Identification: id, business_id, site_id, direction, type
- Quality: temperature_celsius, pressure_bar, purity_pct, grade, hazardousness, physical_state
- Quantity: amount, unit, temporal_unit
- Temporal: supply_pattern
- Economic: cost_in, cost_out, transportation_cost
- Metadata: precision_level, source_type

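
Unlike the organization's JSON-string fields, resource flow payloads are unpacked so that queries can filter on properties such as `temperature_celsius` directly. A hedged sketch of that unpacking for part of Quality and Quantity: the struct shapes and JSON keys are assumptions (keys chosen to match the property names above), and absent values are omitted rather than written as null, since setting a Neo4j property to null removes it.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Quality and Quantity mirror a subset of the JSONB columns (assumed shape).
// Pointers distinguish "absent" from a genuine zero value.
type Quality struct {
	TemperatureCelsius *float64 `json:"temperature_celsius"`
	PressureBar        *float64 `json:"pressure_bar"`
	PhysicalState      *string  `json:"physical_state"`
}

type Quantity struct {
	Amount       *float64 `json:"amount"`
	Unit         *string  `json:"unit"`
	TemporalUnit *string  `json:"temporal_unit"`
}

// flattenFlow decodes the raw JSONB payloads and copies every present value
// into one flat property map suitable for `SET rf += $props`.
func flattenFlow(qualityJSON, quantityJSON []byte) (map[string]any, error) {
	var q Quality
	var qty Quantity
	if err := json.Unmarshal(qualityJSON, &q); err != nil {
		return nil, fmt.Errorf("decode quality: %w", err)
	}
	if err := json.Unmarshal(quantityJSON, &qty); err != nil {
		return nil, fmt.Errorf("decode quantity: %w", err)
	}

	props := map[string]any{}
	putFloat := func(key string, v *float64) {
		if v != nil {
			props[key] = *v
		}
	}
	putString := func(key string, v *string) {
		if v != nil {
			props[key] = *v
		}
	}
	putFloat("temperature_celsius", q.TemperatureCelsius)
	putFloat("pressure_bar", q.PressureBar)
	putString("physical_state", q.PhysicalState)
	putFloat("amount", qty.Amount)
	putString("unit", qty.Unit)
	putString("temporal_unit", qty.TemporalUnit)
	return props, nil
}

func main() {
	props, err := flattenFlow(
		[]byte(`{"temperature_celsius": 85.5, "physical_state": "liquid"}`),
		[]byte(`{"amount": 120, "unit": "MWh", "temporal_unit": "year"}`),
	)
	if err != nil {
		panic(err)
	}
	fmt.Println(props)
}
```
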

**Graph-Based Matching**:

```go
matches, err := repo.FindMatchesInGraph(ctx, flowID, maxDistanceKm)
```

**Cypher Query** (from concept documentation):

```cypher
MATCH (sourceFlow:ResourceFlow {id: $flowID})<-[:HOSTS]-(sourceSite:Site),
      (targetFlow:ResourceFlow)<-[:HOSTS]-(targetSite:Site)
WHERE sourceFlow.direction = 'output'
  AND targetFlow.direction = 'input'
  AND sourceFlow.type = targetFlow.type
  AND abs(coalesce(sourceFlow.temperature_celsius, 0) - coalesce(targetFlow.temperature_celsius, 0)) <= 10
WITH sourceFlow, targetFlow, sourceSite, targetSite,
     point.distance(
       point({longitude: sourceSite.longitude, latitude: sourceSite.latitude}),
       point({longitude: targetSite.longitude, latitude: targetSite.latitude})
     ) / 1000 AS distance_km
WHERE distance_km <= $maxDistance
RETURN targetFlow.id, targetFlow.type, targetFlow.temperature_celsius,
       targetFlow.amount, targetSite.name, distance_km
ORDER BY distance_km ASC
LIMIT 50
```

**Matching Features**:

- Direction filtering (output → input)
- Type matching (heat → heat, water → water, etc.)
- Temperature compatibility (ΔT ≤ 10°C; a missing temperature is treated as 0°C by `coalesce`)
- Geospatial distance calculation (Neo4j `point.distance`)
- Distance-based ranking (nearest first, at most 50 results)


### 3. Graph Sync Service (`internal/service/graph_sync_service.go`)

**Purpose**: Orchestrates synchronization between PostgreSQL and Neo4j

**Service Creation**:

```go
graphSyncService := service.NewGraphSyncService(
	orgGraphRepo,
	siteGraphRepo,
	flowGraphRepo,
)
```

**Single Entity Sync**:

```go
// Sync organization
err := graphSyncService.SyncOrganization(ctx, organization)

// Sync site
err = graphSyncService.SyncSite(ctx, site)

// Sync resource flow
err = graphSyncService.SyncResourceFlow(ctx, resourceFlow)
```

**Deletion from Graph**:

```go
err := graphSyncService.DeleteOrganization(ctx, orgID)
err = graphSyncService.DeleteSite(ctx, siteID)
err = graphSyncService.DeleteResourceFlow(ctx, flowID)
```

**Bulk Synchronization**:

```go
// Bulk sync organizations
err := graphSyncService.BulkSyncOrganizations(ctx, organizations)

// Bulk sync sites
err = graphSyncService.BulkSyncSites(ctx, sites)

// Bulk sync resource flows
err = graphSyncService.BulkSyncResourceFlows(ctx, resourceFlows)
```

**Graph Matching**:

```go
// Find matches within 10 km (maxDistanceKm) of the flow's site
matches, err := graphSyncService.FindMatchesInGraph(ctx, flowID, 10.0)
```

**Graceful Degradation**:

- All sync methods check whether the corresponding graph repository is nil
- If Neo4j is disabled, sync operations are no-ops (they return nil)
- The application continues working with PostgreSQL only


### 4. Configuration (`pkg/config/config.go`)

**Environment Variables**:

```bash
# Neo4j Configuration
NEO4J_URI=neo4j://localhost:7687
NEO4J_USERNAME=neo4j
NEO4J_PASSWORD=your-password
NEO4J_DATABASE=neo4j
NEO4J_ENABLED=true # Set to false to disable graph sync

# PostgreSQL Configuration
POSTGRES_HOST=localhost
POSTGRES_PORT=5432
POSTGRES_USER=bugulma
POSTGRES_PASSWORD=bugulma
POSTGRES_DB=bugulma_city
POSTGRES_SSLMODE=disable
```

**Config Loading**:

```go
cfg := config.Load()

// Neo4j is disabled by default.
// Set NEO4J_ENABLED=true to enable it.
if cfg.Neo4jEnabled {
	// Initialize Neo4j driver and sync
}
```


## Graph Schema

### Node Types

**Organization**:

```cypher
(:Organization {
  id: string,
  name: string,
  sector: string,
  subtype: string,
  latitude: float,
  longitude: float,
  industrial_sector: string,
  company_size: int,
  sells_products: json,
  offers_services: json,
  ...
})
```

**Site**:

```cypher
(:Site {
  id: string,
  name: string,
  latitude: float,
  longitude: float,
  site_type: string,
  floor_area_m2: float,
  available_utilities: json,
  ...
})
```

**ResourceFlow**:

```cypher
(:ResourceFlow {
  id: string,
  business_id: string,
  site_id: string,
  direction: string,  // 'input' or 'output'
  type: string,       // 'heat', 'water', 'steam', etc.
  temperature_celsius: float,
  amount: float,
  unit: string,
  precision_level: string,
  ...
})
```

### Relationships

**Core Relationships** (from concept):

```cypher
(Organization)-[:OPERATES_AT]->(Site)
(Site)-[:HOSTS]->(ResourceFlow)
(ResourceFlow)-[:MATCHABLE_TO {efficiency, distance, savings}]->(ResourceFlow)
(Organization)-[:TRUSTS]->(Organization)
```

**Implemented**:

- ✅ `Organization -[:OPERATES_AT]-> Site`
- ✅ `Site -[:HOSTS]-> ResourceFlow`
- 🔄 `ResourceFlow -[:MATCHABLE_TO]-> ResourceFlow` (computed dynamically via queries)
- 🔄 `Organization -[:TRUSTS]-> Organization` (to be implemented)


## Integration with Existing Services

### Pattern for Service Integration

**Example: OrganizationService with Graph Sync**

```go
type OrganizationService struct {
	repo             domain.OrganizationRepository
	graphSyncService *GraphSyncService // Optional
}

func (s *OrganizationService) Create(req CreateOrganizationRequest) (*domain.Organization, error) {
	// 1. Create in PostgreSQL (primary)
	org := &domain.Organization{ /* ... */ }
	if err := s.repo.Create(org); err != nil {
		return nil, err
	}

	// 2. Sync to Neo4j (secondary, non-blocking)
	if s.graphSyncService != nil {
		go func() {
			ctx := context.Background()
			if err := s.graphSyncService.SyncOrganization(ctx, org); err != nil {
				log.Printf("Failed to sync organization to graph: %v", err)
			}
		}()
	}

	return org, nil
}

func (s *OrganizationService) Delete(id string) error {
	// 1. Delete from PostgreSQL (primary)
	if err := s.repo.Delete(id); err != nil {
		return err
	}

	// 2. Delete from Neo4j (secondary, non-blocking)
	if s.graphSyncService != nil {
		go func() {
			ctx := context.Background()
			if err := s.graphSyncService.DeleteOrganization(ctx, id); err != nil {
				log.Printf("Failed to delete organization from graph: %v", err)
			}
		}()
	}

	return nil
}
```


## Usage Examples

### Setup Neo4j Integration

```go
package main

import (
	"context"
	"log"

	"bugulma/backend/internal/repository"
	"bugulma/backend/internal/service"
	"bugulma/backend/pkg/config"
	"bugulma/backend/pkg/database"
)

func main() {
	cfg := config.Load()

	var graphSyncService *service.GraphSyncService

	if cfg.Neo4jEnabled {
		// Initialize Neo4j driver
		neo4jConfig := database.Neo4jConfig{
			URI:      cfg.Neo4jURI,
			Username: cfg.Neo4jUsername,
			Password: cfg.Neo4jPassword,
			Database: cfg.Neo4jDatabase,
		}

		driver, err := database.NewNeo4jDriver(neo4jConfig)
		if err != nil {
			log.Printf("Warning: Failed to connect to Neo4j: %v", err)
		} else {
			// Initialize schema
			ctx := context.Background()
			if err := database.InitializeSchema(ctx, driver, cfg.Neo4jDatabase); err != nil {
				log.Printf("Warning: Failed to initialize Neo4j schema: %v", err)
			}

			// Create graph repositories
			orgGraphRepo := repository.NewGraphOrganizationRepository(driver, cfg.Neo4jDatabase)
			siteGraphRepo := repository.NewGraphSiteRepository(driver, cfg.Neo4jDatabase)
			flowGraphRepo := repository.NewGraphResourceFlowRepository(driver, cfg.Neo4jDatabase)

			// Create graph sync service
			graphSyncService = service.NewGraphSyncService(
				orgGraphRepo,
				siteGraphRepo,
				flowGraphRepo,
			)

			log.Println("Neo4j graph database integration enabled")
		}
	}

	// Pass graphSyncService to other services
	// ...
}
```


### Bulk Sync Existing Data

```go
func syncExistingData(
	orgRepo domain.OrganizationRepository,
	siteRepo domain.SiteRepository,
	flowRepo domain.ResourceFlowRepository,
	graphSyncService *service.GraphSyncService,
) error {
	ctx := context.Background()

	// Sync all organizations
	orgs, err := orgRepo.GetAll()
	if err != nil {
		return err
	}
	if err := graphSyncService.BulkSyncOrganizations(ctx, orgs); err != nil {
		return err
	}
	log.Printf("Synced %d organizations to graph", len(orgs))

	// Sync all sites
	sites, err := siteRepo.GetAll()
	if err != nil {
		return err
	}
	if err := graphSyncService.BulkSyncSites(ctx, sites); err != nil {
		return err
	}
	log.Printf("Synced %d sites to graph", len(sites))

	// Sync all resource flows
	flows, err := flowRepo.GetAll()
	if err != nil {
		return err
	}
	if err := graphSyncService.BulkSyncResourceFlows(ctx, flows); err != nil {
		return err
	}
	log.Printf("Synced %d resource flows to graph", len(flows))

	return nil
}
```

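
The internals of the `BulkSync*` methods are not shown here. If they issue one query per entity, large imports become bound by round trips. A common Neo4j pattern is to send rows in batches and expand them server-side with `UNWIND`. A hedged sketch: the generic `chunk` helper, the batch size and the query text are illustrative, not the actual bulk implementation.

```go
package main

import "fmt"

// bulkUpsertOrganizations expands a list parameter into one MERGE per row
// inside a single statement (illustrative query text).
const bulkUpsertOrganizations = `
UNWIND $rows AS row
MERGE (o:Organization {id: row.id})
SET o += row.props`

// chunk splits items into consecutive batches of at most size elements.
// The batches share the underlying array with items.
func chunk[T any](items []T, size int) [][]T {
	if size <= 0 {
		panic("chunk: size must be positive")
	}
	var batches [][]T
	for start := 0; start < len(items); start += size {
		end := start + size
		if end > len(items) {
			end = len(items)
		}
		batches = append(batches, items[start:end])
	}
	return batches
}

func main() {
	rows := []map[string]any{
		{"id": "org-1", "props": map[string]any{"name": "A"}},
		{"id": "org-2", "props": map[string]any{"name": "B"}},
		{"id": "org-3", "props": map[string]any{"name": "C"}},
	}
	fmt.Println(bulkUpsertOrganizations)
	for i, batch := range chunk(rows, 2) {
		// Each batch would be sent as one query with params {"rows": batch}.
		fmt.Printf("batch %d: %d rows\n", i, len(batch))
	}
}
```

Whatever the batching, keep the order used above (organizations, then sites, then resource flows): the `OPERATES_AT` relationship is only created when the owning organization node already exists.
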

### Graph-Based Matching

```go
// Use graph traversal for advanced matching
func findAdvancedMatches(
	graphSyncService *service.GraphSyncService,
	flowID string,
) {
	ctx := context.Background()

	// Find matches using Neo4j graph traversal
	matches, err := graphSyncService.FindMatchesInGraph(ctx, flowID, 10.0)
	if err != nil {
		log.Printf("Graph matching failed, falling back to PostgreSQL: %v", err)
		// Fallback to PostgreSQL-based matching
		return
	}

	// Process graph matches
	for _, match := range matches {
		log.Printf("Match found: %s at %s (%.2f km away)",
			match["target_flow_id"],
			match["target_site_name"],
			match["distance_km"],
		)
	}
}
```


## Performance Considerations

### Query Performance

**PostgreSQL**: Excellent for:

- Spatial filtering (PostGIS indexes)
- JSONB queries (GIN indexes)
- Transaction-heavy workloads
- Exact point lookups

**Neo4j**: Excellent for:

- Multi-hop graph traversals
- Relationship pattern matching
- Network analysis
- Discovering indirect connections

### When to Use Each Database

**Use PostgreSQL When**:

- CRUD operations on single entities
- Spatial radius queries (GetWithinRadius)
- Transaction integrity is critical
- Querying by specific attributes

**Use Neo4j When**:

- Finding indirect symbiosis opportunities
- Multi-party matching (3+ organizations)
- Trust network analysis
- Supply chain relationship mapping
- Pattern-based discovery

### Scaling Strategy

**MVP** (current):

- Single PostgreSQL instance
- Single Neo4j instance
- Async sync (fire-and-forget)

**Scale Phase** (1000+ businesses):

- PostgreSQL read replicas
- Neo4j clustering (Enterprise)
- Event-driven sync via NATS/Kafka
- Eventual consistency model

## Future Enhancements

### Phase 2: Advanced Graph Features

1. **MATCHABLE_TO Relationship**:
   - Pre-compute match relationships
   - Store efficiency scores as edge properties
   - Update on resource flow changes

2. **Trust Network**:
   - `(Organization)-[:TRUSTS]->(Organization)`
   - Trust score propagation
   - Transitive trust calculations

3. **Multi-Party Matching**:
   - 3+ party circular symbioses
   - Cypher queries for complex patterns
   - Optimization algorithms

4. **Knowledge Graph Integration**:
   - Semantic relationships
   - Taxonomy integration (NACE, EWC codes)
   - GraphRAG for AI-enhanced matching

### Phase 3: Federation

**Zone-Based Architecture** (from concept):

- Regional Neo4j graphs (city zones, industrial parks)
- Global federation layer
- Cross-zone query protocol
- Data sovereignty compliance


## Troubleshooting

### Common Issues

**Neo4j Connection Refused**:

```
Error: failed to verify Neo4j connectivity: connection refused
```

**Solution**: Ensure Neo4j is running and reachable at the configured `NEO4J_URI`, and check the credentials.

**Schema Creation Fails**:

```
Error: failed to create constraint: syntax error
```

**Solution**: Verify that your Neo4j version supports the `IF NOT EXISTS` syntax (4.0+).

**Sync Performance Degradation**:

**Solution**: Keep sync asynchronous (goroutines), batch updates, and route sync operations through an event queue.


### Monitoring

**Recommended Metrics**:

- Neo4j connection pool usage
- Sync operation latency
- Failed sync count
- Graph query response time
- Node/relationship count


## References

- **Concept Documentation**: `concept/09_graph_database_design.md`
- **Dev Guide**: `dev_guides/02_neo4j_driver.md`
- **Example Queries**: `concept/23_example_query_in_cypher_neo4j.md`
- **Neo4j Go Driver**: <https://github.com/neo4j/neo4j-go-driver>
- **Neo4j Cypher Manual**: <https://neo4j.com/docs/cypher-manual/current/>