package repository import ( "bugulma/backend/internal/domain" "context" "fmt" "time" "github.com/neo4j/neo4j-go-driver/v5/neo4j" ) // GraphAddressRepository manages Address nodes in Neo4j type GraphAddressRepository struct { driver neo4j.DriverWithContext database string } // NewGraphAddressRepository creates a new graph address repository func NewGraphAddressRepository(driver neo4j.DriverWithContext, dbName string) *GraphAddressRepository { return &GraphAddressRepository{ driver: driver, database: dbName, } } // SyncToGraph syncs an address to the graph database func (r *GraphAddressRepository) SyncToGraph(ctx context.Context, address *domain.Address) error { session := r.driver.NewSession(ctx, neo4j.SessionConfig{ AccessMode: neo4j.AccessModeWrite, DatabaseName: r.database, }) defer session.Close(ctx) // Use ExecuteWrite for automatic retry logic _, err := session.ExecuteWrite(ctx, func(tx neo4j.ManagedTransaction) (interface{}, error) { cypher := ` MERGE (a:Address {id: $id}) SET a.street = $street, a.city = $city, a.region = $region, a.postal_code = $postal_code, a.country = $country, a.formatted_ru = $formatted_ru, a.formatted_en = $formatted_en, a.formatted_tt = $formatted_tt, a.latitude = $latitude, a.longitude = $longitude, a.address_type = $address_type, a.verified = $verified, a.source = $source, a.created_at = $created_at, a.updated_at = $updated_at RETURN a.id ` params := map[string]interface{}{ "id": address.ID, "street": address.Street, "city": address.City, "region": address.Region, "postal_code": address.PostalCode, "country": address.Country, "formatted_ru": address.FormattedRu, "formatted_en": address.FormattedEn, "formatted_tt": address.FormattedTt, "latitude": address.Latitude, "longitude": address.Longitude, "address_type": string(address.AddressType), "verified": address.Verified, "source": address.Source, "created_at": address.CreatedAt.Format(time.RFC3339), "updated_at": address.UpdatedAt.Format(time.RFC3339), } result, err := tx.Run(ctx, cypher, params) if err != nil { return nil, fmt.Errorf("failed to execute address merge: %w", err) } if err := result.Err(); err != nil { return nil, fmt.Errorf("failed to consume address result: %w", err) } return nil, nil }) return err } // SyncWithRelationships syncs an address and creates LOCATED_AT relationships // This is the recommended method for bulk sync as it handles both node and relationships atomically func (r *GraphAddressRepository) SyncWithRelationships(ctx context.Context, address *domain.Address, orgIDs []string, siteIDs []string) error { session := r.driver.NewSession(ctx, neo4j.SessionConfig{ AccessMode: neo4j.AccessModeWrite, DatabaseName: r.database, }) defer session.Close(ctx) // Use ExecuteWrite with a single transaction for atomicity _, err := session.ExecuteWrite(ctx, func(tx neo4j.ManagedTransaction) (interface{}, error) { // First, create/update the Address node addressCypher := ` MERGE (a:Address {id: $id}) SET a.street = $street, a.city = $city, a.region = $region, a.postal_code = $postal_code, a.country = $country, a.formatted_ru = $formatted_ru, a.formatted_en = $formatted_en, a.formatted_tt = $formatted_tt, a.latitude = $latitude, a.longitude = $longitude, a.address_type = $address_type, a.verified = $verified, a.source = $source, a.created_at = $created_at, a.updated_at = $updated_at RETURN a.id ` params := map[string]interface{}{ "id": address.ID, "street": address.Street, "city": address.City, "region": address.Region, "postal_code": address.PostalCode, "country": address.Country, "formatted_ru": address.FormattedRu, "formatted_en": address.FormattedEn, "formatted_tt": address.FormattedTt, "latitude": address.Latitude, "longitude": address.Longitude, "address_type": string(address.AddressType), "verified": address.Verified, "source": address.Source, "created_at": address.CreatedAt.Format(time.RFC3339), "updated_at": address.UpdatedAt.Format(time.RFC3339), } result, err := tx.Run(ctx, addressCypher, params) if err != nil { return nil, fmt.Errorf("failed to execute address merge: %w", err) } if err := result.Err(); err != nil { return nil, fmt.Errorf("failed to consume address result: %w", err) } // Create Organization -> LOCATED_AT -> Address relationships using UNWIND for batch efficiency if len(orgIDs) > 0 { orgCypher := ` MATCH (a:Address {id: $address_id}) UNWIND $org_ids AS org_id MATCH (o:Organization {id: org_id}) MERGE (o)-[:LOCATED_AT]->(a) ` orgResult, err := tx.Run(ctx, orgCypher, map[string]interface{}{ "address_id": address.ID, "org_ids": orgIDs, }) if err != nil { return nil, fmt.Errorf("failed to create organization relationships: %w", err) } if err := orgResult.Err(); err != nil { return nil, fmt.Errorf("failed to consume organization relationship result: %w", err) } } // Create Site -> LOCATED_AT -> Address relationships using UNWIND for batch efficiency if len(siteIDs) > 0 { siteCypher := ` MATCH (a:Address {id: $address_id}) UNWIND $site_ids AS site_id MATCH (s:Site {id: site_id}) MERGE (s)-[:LOCATED_AT]->(a) ` siteResult, err := tx.Run(ctx, siteCypher, map[string]interface{}{ "address_id": address.ID, "site_ids": siteIDs, }) if err != nil { return nil, fmt.Errorf("failed to create site relationships: %w", err) } if err := siteResult.Err(); err != nil { return nil, fmt.Errorf("failed to consume site relationship result: %w", err) } } return nil, nil }) return err } // DeleteFromGraph deletes an address from the graph database func (r *GraphAddressRepository) DeleteFromGraph(ctx context.Context, id string) error { session := r.driver.NewSession(ctx, neo4j.SessionConfig{ AccessMode: neo4j.AccessModeWrite, DatabaseName: r.database, }) defer session.Close(ctx) _, err := session.ExecuteWrite(ctx, func(tx neo4j.ManagedTransaction) (interface{}, error) { cypher := ` MATCH (a:Address {id: $id}) DETACH DELETE a ` result, err := tx.Run(ctx, cypher, map[string]interface{}{"id": id}) if err != nil { return nil, fmt.Errorf("failed to delete address: %w", err) } if err := result.Err(); err != nil { return nil, fmt.Errorf("failed to consume delete result: %w", err) } return nil, nil }) return err } // GetByID retrieves an address from the graph func (r *GraphAddressRepository) GetByID(ctx context.Context, id string) (*domain.Address, error) { session := r.driver.NewSession(ctx, neo4j.SessionConfig{ AccessMode: neo4j.AccessModeRead, DatabaseName: r.database, }) defer session.Close(ctx) result, err := session.ExecuteRead(ctx, func(tx neo4j.ManagedTransaction) (interface{}, error) { cypher := ` MATCH (a:Address {id: $id}) RETURN a ` result, err := tx.Run(ctx, cypher, map[string]interface{}{"id": id}) if err != nil { return nil, err } if result.Next(ctx) { record := result.Record() node, ok := record.Get("a") if !ok { return nil, fmt.Errorf("address node not found in result") } return r.nodeToAddress(node.(neo4j.Node)) } return nil, fmt.Errorf("address not found: %s", id) }) if err != nil { return nil, err } return result.(*domain.Address), nil } // GetNearby finds addresses within a radius using spatial point.distance func (r *GraphAddressRepository) GetNearby(ctx context.Context, lat, lng, radiusKm float64, limit int) ([]*domain.Address, error) { session := r.driver.NewSession(ctx, neo4j.SessionConfig{ AccessMode: neo4j.AccessModeRead, DatabaseName: r.database, }) defer session.Close(ctx) result, err := session.ExecuteRead(ctx, func(tx neo4j.ManagedTransaction) (interface{}, error) { cypher := ` MATCH (a:Address) WHERE a.latitude IS NOT NULL AND a.longitude IS NOT NULL WITH a, point.distance( point({latitude: $lat, longitude: $lng}), point({latitude: a.latitude, longitude: a.longitude}) ) / 1000.0 AS distance_km WHERE distance_km <= $radius_km RETURN a, distance_km ORDER BY distance_km LIMIT $limit ` params := map[string]interface{}{ "lat": lat, "lng": lng, "radius_km": radiusKm, "limit": limit, } result, err := tx.Run(ctx, cypher, params) if err != nil { return nil, err } var addresses []*domain.Address for result.Next(ctx) { record := result.Record() node, ok := record.Get("a") if !ok { continue } address, err := r.nodeToAddress(node.(neo4j.Node)) if err != nil { return nil, err } addresses = append(addresses, address) } if err := result.Err(); err != nil { return nil, err } return addresses, nil }) if err != nil { return nil, err } return result.([]*domain.Address), nil } // nodeToAddress converts a Neo4j node to an Address domain object func (r *GraphAddressRepository) nodeToAddress(node neo4j.Node) (*domain.Address, error) { props := node.Props address := &domain.Address{ ID: props["id"].(string), Street: getStringProp(props, "street"), City: getStringProp(props, "city"), Region: getStringProp(props, "region"), PostalCode: getStringProp(props, "postal_code"), Country: getStringProp(props, "country"), FormattedRu: getStringProp(props, "formatted_ru"), FormattedEn: getStringProp(props, "formatted_en"), FormattedTt: getStringProp(props, "formatted_tt"), Source: getStringProp(props, "source"), Verified: getBoolProp(props, "verified"), } // Parse address type enum if typeStr, ok := props["address_type"].(string); ok { address.AddressType = domain.AddressType(typeStr) } // Parse numeric fields if val, ok := props["latitude"].(float64); ok { address.Latitude = val } if val, ok := props["longitude"].(float64); ok { address.Longitude = val } // Parse timestamps if createdAt, ok := props["created_at"].(string); ok { if t, err := time.Parse(time.RFC3339, createdAt); err == nil { address.CreatedAt = t } } if updatedAt, ok := props["updated_at"].(string); ok { if t, err := time.Parse(time.RFC3339, updatedAt); err == nil { address.UpdatedAt = t } } return address, nil }