turash/bugulma/backend/internal/repository/graph_address_repository.go
Damir Mukimov 000eab4740
Major repository reorganization and missing backend endpoints implementation
Repository Structure:
- Move files from cluttered root directory into organized structure
- Create archive/ for archived data and scraper results
- Create bugulma/ for the complete application (frontend + backend)
- Create data/ for sample datasets and reference materials
- Create docs/ for comprehensive documentation structure
- Create scripts/ for utility scripts and API tools

Backend Implementation:
- Implement 3 missing backend endpoints identified in gap analysis:
  * GET /api/v1/organizations/{id}/matching/direct - Direct symbiosis matches
  * GET /api/v1/users/me/organizations - User organizations
  * POST /api/v1/proposals/{id}/status - Update proposal status
- Add complete proposal domain model, repository, and service layers
- Create database migration for proposals table
- Fix CLI server command registration issue

API Documentation:
- Add comprehensive proposals.md API documentation
- Update README.md with Users and Proposals API sections
- Document all request/response formats, error codes, and business rules

Code Quality:
- Follow existing Go backend architecture patterns
- Add proper error handling and validation
- Match frontend expected response schemas
- Maintain clean separation of concerns (handler -> service -> repository)
2025-11-25 06:01:16 +01:00

377 lines
10 KiB
Go

package repository
import (
"bugulma/backend/internal/domain"
"context"
"fmt"
"time"
"github.com/neo4j/neo4j-go-driver/v5/neo4j"
)
// GraphAddressRepository manages Address nodes in Neo4j
type GraphAddressRepository struct {
driver neo4j.DriverWithContext
database string
}
// NewGraphAddressRepository creates a new graph address repository
func NewGraphAddressRepository(driver neo4j.DriverWithContext, dbName string) *GraphAddressRepository {
return &GraphAddressRepository{
driver: driver,
database: dbName,
}
}
// SyncToGraph syncs an address to the graph database
func (r *GraphAddressRepository) SyncToGraph(ctx context.Context, address *domain.Address) error {
session := r.driver.NewSession(ctx, neo4j.SessionConfig{
AccessMode: neo4j.AccessModeWrite,
DatabaseName: r.database,
})
defer session.Close(ctx)
// Use ExecuteWrite for automatic retry logic
_, err := session.ExecuteWrite(ctx, func(tx neo4j.ManagedTransaction) (interface{}, error) {
cypher := `
MERGE (a:Address {id: $id})
SET a.street = $street,
a.city = $city,
a.region = $region,
a.postal_code = $postal_code,
a.country = $country,
a.formatted_ru = $formatted_ru,
a.formatted_en = $formatted_en,
a.formatted_tt = $formatted_tt,
a.latitude = $latitude,
a.longitude = $longitude,
a.address_type = $address_type,
a.verified = $verified,
a.source = $source,
a.created_at = $created_at,
a.updated_at = $updated_at
RETURN a.id
`
params := map[string]interface{}{
"id": address.ID,
"street": address.Street,
"city": address.City,
"region": address.Region,
"postal_code": address.PostalCode,
"country": address.Country,
"formatted_ru": address.FormattedRu,
"formatted_en": address.FormattedEn,
"formatted_tt": address.FormattedTt,
"latitude": address.Latitude,
"longitude": address.Longitude,
"address_type": string(address.AddressType),
"verified": address.Verified,
"source": address.Source,
"created_at": address.CreatedAt.Format(time.RFC3339),
"updated_at": address.UpdatedAt.Format(time.RFC3339),
}
result, err := tx.Run(ctx, cypher, params)
if err != nil {
return nil, fmt.Errorf("failed to execute address merge: %w", err)
}
if err := result.Err(); err != nil {
return nil, fmt.Errorf("failed to consume address result: %w", err)
}
return nil, nil
})
return err
}
// SyncWithRelationships syncs an address and creates LOCATED_AT relationships
// This is the recommended method for bulk sync as it handles both node and relationships atomically
func (r *GraphAddressRepository) SyncWithRelationships(ctx context.Context, address *domain.Address, orgIDs []string, siteIDs []string) error {
session := r.driver.NewSession(ctx, neo4j.SessionConfig{
AccessMode: neo4j.AccessModeWrite,
DatabaseName: r.database,
})
defer session.Close(ctx)
// Use ExecuteWrite with a single transaction for atomicity
_, err := session.ExecuteWrite(ctx, func(tx neo4j.ManagedTransaction) (interface{}, error) {
// First, create/update the Address node
addressCypher := `
MERGE (a:Address {id: $id})
SET a.street = $street,
a.city = $city,
a.region = $region,
a.postal_code = $postal_code,
a.country = $country,
a.formatted_ru = $formatted_ru,
a.formatted_en = $formatted_en,
a.formatted_tt = $formatted_tt,
a.latitude = $latitude,
a.longitude = $longitude,
a.address_type = $address_type,
a.verified = $verified,
a.source = $source,
a.created_at = $created_at,
a.updated_at = $updated_at
RETURN a.id
`
params := map[string]interface{}{
"id": address.ID,
"street": address.Street,
"city": address.City,
"region": address.Region,
"postal_code": address.PostalCode,
"country": address.Country,
"formatted_ru": address.FormattedRu,
"formatted_en": address.FormattedEn,
"formatted_tt": address.FormattedTt,
"latitude": address.Latitude,
"longitude": address.Longitude,
"address_type": string(address.AddressType),
"verified": address.Verified,
"source": address.Source,
"created_at": address.CreatedAt.Format(time.RFC3339),
"updated_at": address.UpdatedAt.Format(time.RFC3339),
}
result, err := tx.Run(ctx, addressCypher, params)
if err != nil {
return nil, fmt.Errorf("failed to execute address merge: %w", err)
}
if err := result.Err(); err != nil {
return nil, fmt.Errorf("failed to consume address result: %w", err)
}
// Create Organization -> LOCATED_AT -> Address relationships using UNWIND for batch efficiency
if len(orgIDs) > 0 {
orgCypher := `
MATCH (a:Address {id: $address_id})
UNWIND $org_ids AS org_id
MATCH (o:Organization {id: org_id})
MERGE (o)-[:LOCATED_AT]->(a)
`
orgResult, err := tx.Run(ctx, orgCypher, map[string]interface{}{
"address_id": address.ID,
"org_ids": orgIDs,
})
if err != nil {
return nil, fmt.Errorf("failed to create organization relationships: %w", err)
}
if err := orgResult.Err(); err != nil {
return nil, fmt.Errorf("failed to consume organization relationship result: %w", err)
}
}
// Create Site -> LOCATED_AT -> Address relationships using UNWIND for batch efficiency
if len(siteIDs) > 0 {
siteCypher := `
MATCH (a:Address {id: $address_id})
UNWIND $site_ids AS site_id
MATCH (s:Site {id: site_id})
MERGE (s)-[:LOCATED_AT]->(a)
`
siteResult, err := tx.Run(ctx, siteCypher, map[string]interface{}{
"address_id": address.ID,
"site_ids": siteIDs,
})
if err != nil {
return nil, fmt.Errorf("failed to create site relationships: %w", err)
}
if err := siteResult.Err(); err != nil {
return nil, fmt.Errorf("failed to consume site relationship result: %w", err)
}
}
return nil, nil
})
return err
}
// DeleteFromGraph deletes an address from the graph database
func (r *GraphAddressRepository) DeleteFromGraph(ctx context.Context, id string) error {
session := r.driver.NewSession(ctx, neo4j.SessionConfig{
AccessMode: neo4j.AccessModeWrite,
DatabaseName: r.database,
})
defer session.Close(ctx)
_, err := session.ExecuteWrite(ctx, func(tx neo4j.ManagedTransaction) (interface{}, error) {
cypher := `
MATCH (a:Address {id: $id})
DETACH DELETE a
`
result, err := tx.Run(ctx, cypher, map[string]interface{}{"id": id})
if err != nil {
return nil, fmt.Errorf("failed to delete address: %w", err)
}
if err := result.Err(); err != nil {
return nil, fmt.Errorf("failed to consume delete result: %w", err)
}
return nil, nil
})
return err
}
// GetByID retrieves an address from the graph
func (r *GraphAddressRepository) GetByID(ctx context.Context, id string) (*domain.Address, error) {
session := r.driver.NewSession(ctx, neo4j.SessionConfig{
AccessMode: neo4j.AccessModeRead,
DatabaseName: r.database,
})
defer session.Close(ctx)
result, err := session.ExecuteRead(ctx, func(tx neo4j.ManagedTransaction) (interface{}, error) {
cypher := `
MATCH (a:Address {id: $id})
RETURN a
`
result, err := tx.Run(ctx, cypher, map[string]interface{}{"id": id})
if err != nil {
return nil, err
}
if result.Next(ctx) {
record := result.Record()
node, ok := record.Get("a")
if !ok {
return nil, fmt.Errorf("address node not found in result")
}
return r.nodeToAddress(node.(neo4j.Node))
}
return nil, fmt.Errorf("address not found: %s", id)
})
if err != nil {
return nil, err
}
return result.(*domain.Address), nil
}
// GetNearby finds addresses within a radius using spatial point.distance
func (r *GraphAddressRepository) GetNearby(ctx context.Context, lat, lng, radiusKm float64, limit int) ([]*domain.Address, error) {
session := r.driver.NewSession(ctx, neo4j.SessionConfig{
AccessMode: neo4j.AccessModeRead,
DatabaseName: r.database,
})
defer session.Close(ctx)
result, err := session.ExecuteRead(ctx, func(tx neo4j.ManagedTransaction) (interface{}, error) {
cypher := `
MATCH (a:Address)
WHERE a.latitude IS NOT NULL AND a.longitude IS NOT NULL
WITH a, point.distance(
point({latitude: $lat, longitude: $lng}),
point({latitude: a.latitude, longitude: a.longitude})
) / 1000.0 AS distance_km
WHERE distance_km <= $radius_km
RETURN a, distance_km
ORDER BY distance_km
LIMIT $limit
`
params := map[string]interface{}{
"lat": lat,
"lng": lng,
"radius_km": radiusKm,
"limit": limit,
}
result, err := tx.Run(ctx, cypher, params)
if err != nil {
return nil, err
}
var addresses []*domain.Address
for result.Next(ctx) {
record := result.Record()
node, ok := record.Get("a")
if !ok {
continue
}
address, err := r.nodeToAddress(node.(neo4j.Node))
if err != nil {
return nil, err
}
addresses = append(addresses, address)
}
if err := result.Err(); err != nil {
return nil, err
}
return addresses, nil
})
if err != nil {
return nil, err
}
return result.([]*domain.Address), nil
}
// nodeToAddress converts a Neo4j node to an Address domain object
func (r *GraphAddressRepository) nodeToAddress(node neo4j.Node) (*domain.Address, error) {
props := node.Props
address := &domain.Address{
ID: props["id"].(string),
Street: getStringProp(props, "street"),
City: getStringProp(props, "city"),
Region: getStringProp(props, "region"),
PostalCode: getStringProp(props, "postal_code"),
Country: getStringProp(props, "country"),
FormattedRu: getStringProp(props, "formatted_ru"),
FormattedEn: getStringProp(props, "formatted_en"),
FormattedTt: getStringProp(props, "formatted_tt"),
Source: getStringProp(props, "source"),
Verified: getBoolProp(props, "verified"),
}
// Parse address type enum
if typeStr, ok := props["address_type"].(string); ok {
address.AddressType = domain.AddressType(typeStr)
}
// Parse numeric fields
if val, ok := props["latitude"].(float64); ok {
address.Latitude = val
}
if val, ok := props["longitude"].(float64); ok {
address.Longitude = val
}
// Parse timestamps
if createdAt, ok := props["created_at"].(string); ok {
if t, err := time.Parse(time.RFC3339, createdAt); err == nil {
address.CreatedAt = t
}
}
if updatedAt, ok := props["updated_at"].(string); ok {
if t, err := time.Parse(time.RFC3339, updatedAt); err == nil {
address.UpdatedAt = t
}
}
return address, nil
}