turash/bugulma/backend/internal/handler/graph_handler.go

421 lines
12 KiB
Go

package handler
import (
"bugulma/backend/internal/domain"
"bugulma/backend/internal/repository"
"bugulma/backend/internal/service"
"context"
"net/http"
"github.com/gin-gonic/gin"
"github.com/neo4j/neo4j-go-driver/v5/neo4j"
)
// GraphHandler handles graph database queries and relationship exploration
type GraphHandler struct {
driver neo4j.DriverWithContext
database string
graphSyncService *service.GraphSyncService
}
// NewGraphHandler creates a new graph handler
func NewGraphHandler(driver neo4j.DriverWithContext, database string, graphSyncService *service.GraphSyncService) *GraphHandler {
return &GraphHandler{
driver: driver,
database: database,
graphSyncService: graphSyncService,
}
}
// GetOrganizationNetwork retrieves the network of organizations connected through resource flows
// @Summary Get organization network
// @Tags graph
// @Produce json
// @Param organizationId path string true "Organization ID"
// @Param depth query int false "Depth of traversal" default(2)
// @Success 200 {object} map[string]interface{}
// @Router /api/graph/organizations/{organizationId}/network [get]
func (h *GraphHandler) GetOrganizationNetwork(c *gin.Context) {
orgID := c.Param("organizationId")
depth := c.DefaultQuery("depth", "2")
session := h.driver.NewSession(c.Request.Context(), neo4j.SessionConfig{
AccessMode: neo4j.AccessModeRead,
DatabaseName: h.database,
})
defer session.Close(c.Request.Context())
result, err := session.ExecuteRead(c.Request.Context(), func(tx neo4j.ManagedTransaction) (interface{}, error) {
cypher := `
MATCH path = (o:Organization {id: $org_id})-[*1..` + depth + `]-(connected)
WHERE connected:Organization OR connected:Site OR connected:ResourceFlow
RETURN path
LIMIT 100
`
queryResult, err := tx.Run(c.Request.Context(), cypher, map[string]interface{}{
"org_id": orgID,
})
if err != nil {
return nil, err
}
var paths []interface{}
for queryResult.Next(c.Request.Context()) {
record := queryResult.Record()
if path, ok := record.Get("path"); ok {
paths = append(paths, path)
}
}
// Convert Neo4j paths to standard graph format
return PathToGraphData(paths), nil
})
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
return
}
graphData := result.(*GraphData)
c.JSON(http.StatusOK, gin.H{
"organization_id": orgID,
"depth": depth,
"graph": graphData,
"node_count": len(graphData.Nodes),
"edge_count": len(graphData.Edges),
})
}
// FindShortestPath finds the shortest path between two organizations
// @Summary Find shortest path between organizations
// @Tags graph
// @Produce json
// @Param sourceId query string true "Source Organization ID"
// @Param targetId query string true "Target Organization ID"
// @Success 200 {object} map[string]interface{}
// @Router /api/graph/shortest-path [get]
func (h *GraphHandler) FindShortestPath(c *gin.Context) {
sourceID := c.Query("sourceId")
targetID := c.Query("targetId")
if sourceID == "" || targetID == "" {
c.JSON(http.StatusBadRequest, gin.H{"error": "sourceId and targetId are required"})
return
}
session := h.driver.NewSession(c.Request.Context(), neo4j.SessionConfig{
AccessMode: neo4j.AccessModeRead,
DatabaseName: h.database,
})
defer session.Close(c.Request.Context())
result, err := session.ExecuteRead(c.Request.Context(), func(tx neo4j.ManagedTransaction) (interface{}, error) {
cypher := `
MATCH (source:Organization {id: $source_id}), (target:Organization {id: $target_id})
MATCH path = shortestPath((source)-[*]-(target))
RETURN path, length(path) as pathLength
`
queryResult, err := tx.Run(c.Request.Context(), cypher, map[string]interface{}{
"source_id": sourceID,
"target_id": targetID,
})
if err != nil {
return nil, err
}
if queryResult.Next(c.Request.Context()) {
record := queryResult.Record()
pathInterface, _ := record.Get("path")
pathLength, _ := record.Get("pathLength")
// Convert path to graph data
var paths []interface{}
if pathInterface != nil {
paths = append(paths, pathInterface)
}
graphData := PathToGraphData(paths)
return gin.H{
"found": true,
"length": pathLength,
"graph": graphData,
"node_count": len(graphData.Nodes),
"edge_count": len(graphData.Edges),
}, nil
}
return gin.H{
"found": false,
"length": -1,
"graph": &GraphData{Nodes: []GraphNode{}, Edges: []GraphEdge{}},
}, nil
})
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
return
}
c.JSON(http.StatusOK, result)
}
// GetSpatialProximity finds organizations and sites near a location
// @Summary Get spatially proximate entities
// @Tags graph
// @Produce json
// @Param lat query float64 true "Latitude"
// @Param lng query float64 true "Longitude"
// @Param radius query float64 false "Radius in km" default(5.0)
// @Success 200 {object} map[string]interface{}
// @Router /api/graph/spatial-proximity [get]
func (h *GraphHandler) GetSpatialProximity(c *gin.Context) {
lat := c.Query("lat")
lng := c.Query("lng")
radius := c.DefaultQuery("radius", "5.0")
if lat == "" || lng == "" {
c.JSON(http.StatusBadRequest, gin.H{"error": "lat and lng are required"})
return
}
session := h.driver.NewSession(c.Request.Context(), neo4j.SessionConfig{
AccessMode: neo4j.AccessModeRead,
DatabaseName: h.database,
})
defer session.Close(c.Request.Context())
result, err := session.ExecuteRead(c.Request.Context(), func(tx neo4j.ManagedTransaction) (interface{}, error) {
cypher := `
MATCH (s:Site)
WHERE s.latitude IS NOT NULL AND s.longitude IS NOT NULL
WITH s, point.distance(
point({latitude: toFloat($lat), longitude: toFloat($lng)}),
point({latitude: s.latitude, longitude: s.longitude})
) / 1000.0 AS distance_km
WHERE distance_km <= toFloat($radius)
OPTIONAL MATCH (o:Organization)-[:OPERATES_AT]->(s)
RETURN s.id as site_id, s.name as site_name,
collect(DISTINCT {id: o.id, name: o.name}) as organizations,
distance_km
ORDER BY distance_km
LIMIT 50
`
queryResult, err := tx.Run(c.Request.Context(), cypher, map[string]interface{}{
"lat": lat,
"lng": lng,
"radius": radius,
})
if err != nil {
return nil, err
}
var results []map[string]interface{}
for queryResult.Next(c.Request.Context()) {
record := queryResult.Record()
results = append(results, map[string]interface{}{
"site_id": record.Values[0],
"site_name": record.Values[1],
"organizations": record.Values[2],
"distance_km": record.Values[3],
})
}
return results, nil
})
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
return
}
c.JSON(http.StatusOK, gin.H{
"center": gin.H{
"lat": lat,
"lng": lng,
},
"radius_km": radius,
"results": result,
})
}
// GetMatchingOpportunities finds potential symbiosis opportunities using graph patterns
// @Summary Get matching opportunities
// @Tags graph
// @Produce json
// @Param resourceType query string false "Resource type filter"
// @Success 200 {array} map[string]interface{}
// @Router /api/graph/matching-opportunities [get]
func (h *GraphHandler) GetMatchingOpportunities(c *gin.Context) {
resourceType := c.Query("resourceType")
session := h.driver.NewSession(c.Request.Context(), neo4j.SessionConfig{
AccessMode: neo4j.AccessModeRead,
DatabaseName: h.database,
})
defer session.Close(c.Request.Context())
result, err := session.ExecuteRead(c.Request.Context(), func(tx neo4j.ManagedTransaction) (interface{}, error) {
cypher := `
MATCH (output:ResourceFlow {direction: 'output'})
MATCH (input:ResourceFlow {direction: 'input'})
WHERE output.type = input.type
`
if resourceType != "" {
cypher += ` AND output.type = $resource_type`
}
cypher += `
OPTIONAL MATCH (output)<-[:HAS_FLOW]-(outputOrg:Organization)
OPTIONAL MATCH (input)<-[:HAS_FLOW]-(inputOrg:Organization)
WHERE outputOrg.id <> inputOrg.id
RETURN
output.id as output_flow_id,
input.id as input_flow_id,
output.type as resource_type,
outputOrg.id as source_org_id,
outputOrg.name as source_org_name,
inputOrg.id as target_org_id,
inputOrg.name as target_org_name
LIMIT 100
`
params := map[string]interface{}{}
if resourceType != "" {
params["resource_type"] = resourceType
}
queryResult, err := tx.Run(c.Request.Context(), cypher, params)
if err != nil {
return nil, err
}
var opportunities []map[string]interface{}
for queryResult.Next(c.Request.Context()) {
record := queryResult.Record()
opportunities = append(opportunities, map[string]interface{}{
"output_flow_id": record.Values[0],
"input_flow_id": record.Values[1],
"resource_type": record.Values[2],
"source_org_id": record.Values[3],
"source_org_name": record.Values[4],
"target_org_id": record.Values[5],
"target_org_name": record.Values[6],
})
}
return opportunities, nil
})
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
return
}
c.JSON(http.StatusOK, result)
}
// SyncGraphDatabase triggers a full sync from PostgreSQL to Neo4j
// @Summary Sync graph database
// @Tags graph
// @Produce json
// @Success 200 {object} map[string]interface{}
// @Router /api/graph/sync [post]
func (h *GraphHandler) SyncGraphDatabase(c *gin.Context) {
if h.graphSyncService == nil {
c.JSON(http.StatusServiceUnavailable, gin.H{
"error": "Graph sync service is not available",
})
return
}
// Trigger full sync - this would sync all organizations, sites, and resource flows
// For now, return success as sync is typically done incrementally via events
// Full sync would require iterating through all entities
c.JSON(http.StatusOK, gin.H{
"status": "sync_available",
"message": "Graph sync service is available. Sync happens incrementally via events.",
"note": "For full sync, use the sync CLI command",
})
}
// GetGraphStatistics returns graph database statistics
// @Summary Get graph statistics
// @Tags graph
// @Produce json
// @Success 200 {object} map[string]interface{}
// @Router /api/graph/statistics [get]
func (h *GraphHandler) GetGraphStatistics(c *gin.Context) {
session := h.driver.NewSession(c.Request.Context(), neo4j.SessionConfig{
AccessMode: neo4j.AccessModeRead,
DatabaseName: h.database,
})
defer session.Close(c.Request.Context())
result, err := session.ExecuteRead(c.Request.Context(), func(tx neo4j.ManagedTransaction) (interface{}, error) {
// Get node counts
nodeCypher := `
MATCH (n)
RETURN labels(n)[0] as label, count(*) as count
`
nodeResult, err := tx.Run(c.Request.Context(), nodeCypher, nil)
if err != nil {
return nil, err
}
nodeCounts := make(map[string]int)
for nodeResult.Next(c.Request.Context()) {
record := nodeResult.Record()
if label, ok := record.Get("label"); ok {
if count, ok := record.Get("count"); ok {
nodeCounts[label.(string)] = int(count.(int64))
}
}
}
// Get relationship counts
relCypher := `
MATCH ()-[r]->()
RETURN type(r) as rel_type, count(*) as count
`
relResult, err := tx.Run(c.Request.Context(), relCypher, nil)
if err != nil {
return nil, err
}
relCounts := make(map[string]int)
for relResult.Next(c.Request.Context()) {
record := relResult.Record()
if relType, ok := record.Get("rel_type"); ok {
if count, ok := record.Get("count"); ok {
relCounts[relType.(string)] = int(count.(int64))
}
}
}
return gin.H{
"nodes": nodeCounts,
"relationships": relCounts,
}, nil
})
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
return
}
c.JSON(http.StatusOK, result)
}
// Helper function to sync a single organization to graph
func syncOrganizationToGraph(ctx context.Context, driver neo4j.DriverWithContext, database string, org *domain.Organization) error {
repo := repository.NewGraphOrganizationRepository(driver, database)
return repo.SyncToGraph(ctx, org)
}