turash/bugulma/backend/cmd/cli/cmd/sync.go

441 lines
12 KiB
Go

package cmd
import (
	"context"
	"fmt"
	"log"
	"sort"
	"time"

	"bugulma/backend/cmd/cli/internal"
	"bugulma/backend/internal/domain"
	"bugulma/backend/internal/repository"
	"bugulma/backend/internal/service"

	"github.com/neo4j/neo4j-go-driver/v5/neo4j"
	"github.com/spf13/cobra"
	"gorm.io/gorm"
)
// Flag values for the sync subcommands. They are bound to command-line flags
// in init and read by the run functions.
var (
	// syncDryRun and syncClearFirst back the --dry-run and --clear-first
	// flags of "sync graph".
	syncDryRun, syncClearFirst bool

	// syncSQLitePath and syncGeographicalType back the --sqlite-path and
	// --type flags of "sync geographical".
	syncSQLitePath, syncGeographicalType string
)
// syncCmd is the parent "sync" command. It has no run function of its own;
// the graph and geographical subcommands are attached to it in init.
var syncCmd = &cobra.Command{
	Use:   "sync",
	Short: "Data synchronization operations",
	Long: "Synchronize data between different data stores.\n" +
		"Supports syncing PostgreSQL data to Neo4j graph database and geographical data migration.",
}
// syncGraphCmd is the "sync graph" subcommand; it is executed by runSyncGraph.
var syncGraphCmd = &cobra.Command{
	Use:   "graph",
	Short: "Sync PostgreSQL to Neo4j",
	Long: "Synchronize data from PostgreSQL to Neo4j graph database.\n" +
		"This command fetches organizations, sites, and addresses from PostgreSQL\n" +
		"and creates corresponding nodes and relationships in Neo4j.",
	RunE: runSyncGraph,
}
// syncGeographicalCmd is the "sync geographical" subcommand; it is executed
// by runSyncGeographical.
var syncGeographicalCmd = &cobra.Command{
	Use:   "geographical",
	Short: "Import geographical data from SQLite",
	Long: "Import geographical data from SQLite database to PostgreSQL.\n" +
		"Supports importing building polygons, road networks, and green spaces from OSM data.",
	RunE: runSyncGeographical,
}
// init binds the command-line flags of the graph and geographical
// subcommands to their package-level variables and attaches both
// subcommands to syncCmd.
func init() {
	graphFlags := syncGraphCmd.Flags()
	graphFlags.BoolVar(&syncDryRun, "dry-run", false, "Show what would be synced without actually syncing")
	graphFlags.BoolVar(&syncClearFirst, "clear-first", false, "Clear existing Neo4j data before syncing")

	geoFlags := syncGeographicalCmd.Flags()
	// NOTE(review): the default below is an absolute path inside one
	// developer's home directory, so on any other machine --sqlite-path has
	// to be passed explicitly. Consider a relative or configurable default.
	geoFlags.StringVar(&syncSQLitePath, "sqlite-path", "/Users/damirmukimov/projects/city_resource_graph/data/bugulma_city_data.db", "Path to SQLite database file")
	geoFlags.StringVar(&syncGeographicalType, "type", "all", "Type of geographical data to import (buildings, roads, green_spaces, all)")

	syncCmd.AddCommand(syncGraphCmd, syncGeographicalCmd)
}
// runSyncGraph implements "sync graph": it copies organizations, sites, and
// addresses from PostgreSQL into Neo4j and then prints node and relationship
// counts read back from Neo4j.
//
// With --dry-run it only reports how many rows would be synced (see
// runDryRun). With --clear-first every node in the Neo4j database is deleted
// before syncing.
//
// Errors returned by the bulk sync calls are logged as warnings and do not
// abort the command. Configuration, connection, fetch, clear, and
// verification failures are returned to the caller.
func runSyncGraph(cmd *cobra.Command, args []string) error {
	cfg, err := getConfig()
	if err != nil {
		return fmt.Errorf("failed to load config: %w", err)
	}
	if !cfg.Neo4jEnabled {
		return fmt.Errorf("Neo4j is not enabled in configuration")
	}
	if isVerbose() {
		log.Println("Starting PostgreSQL to Neo4j sync...")
	}

	ctx := context.Background()

	// Connect to PostgreSQL
	db, err := internal.ConnectPostgres(cfg)
	if err != nil {
		return fmt.Errorf("failed to connect to PostgreSQL: %w", err)
	}
	if isVerbose() {
		log.Println("✓ Connected to PostgreSQL")
	}

	// Connect to Neo4j
	driver, err := internal.ConnectNeo4j(cfg)
	if err != nil {
		return fmt.Errorf("failed to connect to Neo4j: %w", err)
	}
	if driver == nil {
		return fmt.Errorf("Neo4j connection failed")
	}
	defer driver.Close(ctx)
	if isVerbose() {
		log.Println("✓ Connected to Neo4j")
	}

	// Initialize repositories
	orgRepo := repository.NewOrganizationRepository(db)
	siteRepo := repository.NewSiteRepository(db)
	addressRepo := repository.NewAddressRepository(db)
	orgGraphRepo := repository.NewGraphOrganizationRepository(driver, cfg.Neo4jDatabase)
	siteGraphRepo := repository.NewGraphSiteRepository(driver, cfg.Neo4jDatabase)
	addressGraphRepo := repository.NewGraphAddressRepository(driver, cfg.Neo4jDatabase)
	flowGraphRepo := repository.NewGraphResourceFlowRepository(driver, cfg.Neo4jDatabase)
	matchGraphRepo := repository.NewGraphMatchRepository(driver, cfg.Neo4jDatabase)
	sharedAssetGraphRepo := repository.NewGraphSharedAssetRepository(driver, cfg.Neo4jDatabase)
	productGraphRepo := repository.NewGraphProductRepository(driver, cfg.Neo4jDatabase)
	serviceGraphRepo := repository.NewGraphServiceRepository(driver, cfg.Neo4jDatabase)

	// Initialize sync service
	syncService := service.NewGraphSyncService(
		orgGraphRepo,
		siteGraphRepo,
		addressGraphRepo,
		flowGraphRepo,
		matchGraphRepo,
		sharedAssetGraphRepo,
		productGraphRepo,
		serviceGraphRepo,
	)

	if syncDryRun {
		return runDryRun(db, orgRepo, siteRepo, addressRepo)
	}

	// Clear existing Neo4j data if requested
	if syncClearFirst {
		if isVerbose() {
			log.Println("Clearing existing Neo4j data...")
		}
		clearSession := driver.NewSession(ctx, neo4j.SessionConfig{DatabaseName: cfg.Neo4jDatabase})
		clearResult, clearErr := clearSession.Run(ctx, "MATCH (n) DETACH DELETE n", nil)
		if clearErr == nil {
			// Drain the result so that an error raised while the delete
			// executes is reported here rather than dropped.
			_, clearErr = clearResult.Consume(ctx)
		}
		if closeErr := clearSession.Close(ctx); clearErr == nil {
			clearErr = closeErr
		}
		if clearErr != nil {
			return fmt.Errorf("failed to clear Neo4j: %w", clearErr)
		}
		if isVerbose() {
			log.Println("✓ Cleared Neo4j database")
		}
	}

	// Fetch all organizations from PostgreSQL
	if isVerbose() {
		log.Println("Fetching organizations from PostgreSQL...")
	}
	orgs, err := orgRepo.GetAll(ctx)
	if err != nil {
		return fmt.Errorf("failed to fetch organizations: %w", err)
	}
	if !isQuiet() {
		log.Printf("✓ Fetched %d organizations", len(orgs))
	}

	// Fetch all sites from PostgreSQL
	if isVerbose() {
		log.Println("Fetching sites from PostgreSQL...")
	}
	sites, err := siteRepo.GetAll(ctx)
	if err != nil {
		return fmt.Errorf("failed to fetch sites: %w", err)
	}
	if !isQuiet() {
		log.Printf("✓ Fetched %d sites", len(sites))
	}

	// Fetch all addresses from PostgreSQL
	if isVerbose() {
		log.Println("Fetching addresses from PostgreSQL...")
	}
	addresses, err := addressRepo.GetAll(ctx)
	if err != nil {
		return fmt.Errorf("failed to fetch addresses: %w", err)
	}
	if !isQuiet() {
		log.Printf("✓ Fetched %d addresses", len(addresses))
	}

	// syncStep runs one bulk sync, timing it and logging the outcome. A
	// failure is only logged: the remaining entity types are still synced.
	syncStep := func(label string, count int, run func() error) {
		if isVerbose() {
			log.Printf("Syncing %s to Neo4j...", label)
		}
		started := time.Now()
		if err := run(); err != nil {
			log.Printf("Warning: Bulk sync had issues: %v", err)
		}
		if !isQuiet() {
			log.Printf("✓ Synced %d %s in %v", count, label, time.Since(started))
		}
	}
	syncStep("organizations", len(orgs), func() error {
		return syncService.BulkSyncOrganizations(ctx, orgs)
	})
	syncStep("sites", len(sites), func() error {
		return syncService.BulkSyncSites(ctx, sites)
	})
	syncStep("addresses", len(addresses), func() error {
		return syncService.BulkSyncAddresses(ctx, addresses)
	})

	// Verify sync
	if isVerbose() {
		log.Println("Verifying sync...")
	}
	session := driver.NewSession(ctx, neo4j.SessionConfig{DatabaseName: cfg.Neo4jDatabase})
	defer session.Close(ctx)
	// OPTIONAL MATCH keeps the single result row alive when a label or
	// relationship type has no instances. With a plain MATCH, an empty match
	// yields zero rows, the grouped count() that follows yields zero rows as
	// well, and the whole summary would be lost.
	result, err := session.Run(ctx, `
		OPTIONAL MATCH (o:Organization) WITH count(o) AS orgCount
		OPTIONAL MATCH (s:Site) WITH orgCount, count(s) AS siteCount
		OPTIONAL MATCH (a:Address) WITH orgCount, siteCount, count(a) AS addressCount
		OPTIONAL MATCH ()-[r:OPERATES_AT]->() WITH orgCount, siteCount, addressCount, count(r) AS relCount
		RETURN orgCount, siteCount, addressCount, relCount
	`, nil)
	if err != nil {
		return fmt.Errorf("failed to verify: %w", err)
	}
	if result.Next(ctx) {
		record := result.Record()
		orgCount, _ := record.Get("orgCount")
		siteCount, _ := record.Get("siteCount")
		addressCount, _ := record.Get("addressCount")
		relCount, _ := record.Get("relCount")
		if !isQuiet() {
			fmt.Println("\n=== Sync Complete ===")
			fmt.Printf("Organizations: %v\n", orgCount)
			fmt.Printf("Sites: %v\n", siteCount)
			fmt.Printf("Addresses: %v\n", addressCount)
			fmt.Printf("OPERATES_AT relationships: %v\n", relCount)
		}
	} else if err := result.Err(); err != nil {
		// Next returning false can mean "no rows" or "stream failed".
		return fmt.Errorf("failed to verify: %w", err)
	}

	if !isQuiet() {
		log.Println("✓ Sync completed successfully!")
	}
	return nil
}
// runDryRun prints how many organizations, sites, and addresses the given
// repositories return, without writing anything. The db argument is accepted
// but not used. It returns the first fetch error, wrapped; counts fetched
// before the failure have already been printed.
func runDryRun(db *gorm.DB, orgRepo domain.OrganizationRepository, siteRepo domain.SiteRepository, addressRepo domain.AddressRepository) error {
	ctx := context.Background()
	fmt.Println("=== Dry Run: What would be synced ===")

	// Each entry counts one entity type; entries run in the order listed.
	counters := []struct {
		title string
		noun  string
		count func() (int, error)
	}{
		{"Organizations", "organizations", func() (int, error) {
			rows, err := orgRepo.GetAll(ctx)
			return len(rows), err
		}},
		{"Sites", "sites", func() (int, error) {
			rows, err := siteRepo.GetAll(ctx)
			return len(rows), err
		}},
		{"Addresses", "addresses", func() (int, error) {
			rows, err := addressRepo.GetAll(ctx)
			return len(rows), err
		}},
	}
	for _, c := range counters {
		total, err := c.count()
		if err != nil {
			return fmt.Errorf("failed to fetch %s: %w", c.noun, err)
		}
		fmt.Printf("%s: %d\n", c.title, total)
	}

	fmt.Println("\nRun without --dry-run to perform the actual sync.")
	return nil
}
// runSyncGeographical implements "sync geographical": it runs the building,
// road, and/or green-space migrations selected by --type against the SQLite
// file given by --sqlite-path, printing a progress summary after each one and
// overall statistics at the end.
//
// An unknown --type is rejected before any connection is opened. A failing
// migration aborts the command; a failure to compute the final statistics is
// only logged, since the migrations themselves have already completed.
func runSyncGeographical(cmd *cobra.Command, args []string) error {
	// Validate --type first so a typo fails fast instead of after the
	// database connections have been set up.
	var migrateBuildings, migrateRoads, migrateGreenSpaces bool
	switch syncGeographicalType {
	case "all":
		migrateBuildings, migrateRoads, migrateGreenSpaces = true, true, true
	case "buildings":
		migrateBuildings = true
	case "roads":
		migrateRoads = true
	case "green_spaces":
		migrateGreenSpaces = true
	default:
		return fmt.Errorf("unknown geographical type: %s. Use: buildings, roads, green_spaces, or all", syncGeographicalType)
	}

	cfg, err := getConfig()
	if err != nil {
		return fmt.Errorf("failed to load config: %w", err)
	}
	if isVerbose() {
		log.Println("Starting geographical data migration...")
	}

	// Connect to PostgreSQL
	db, err := internal.ConnectPostgres(cfg)
	if err != nil {
		return fmt.Errorf("failed to connect to PostgreSQL: %w", err)
	}
	if isVerbose() {
		log.Println("✓ Connected to PostgreSQL")
	}

	// Initialize repositories
	geoFeatureRepo := repository.NewGeographicalFeatureRepository(db)
	siteRepo := repository.NewSiteRepository(db)

	// Initialize geographical migration service
	migrationService, err := service.NewGeographicalDataMigrationService(
		db,
		geoFeatureRepo,
		siteRepo,
		syncSQLitePath,
	)
	if err != nil {
		return fmt.Errorf("failed to create migration service: %w", err)
	}
	defer migrationService.Close()
	if isVerbose() {
		log.Printf("✓ Created migration service with SQLite path: %s", syncSQLitePath)
	}

	ctx := context.Background()

	if migrateBuildings {
		if isVerbose() {
			log.Println("Migrating building polygons...")
		}
		progress, err := migrationService.MigrateBuildingPolygons(ctx)
		if err != nil {
			return fmt.Errorf("building migration failed: %w", err)
		}
		printMigrationProgress("Buildings", progress)
	}

	if migrateRoads {
		if isVerbose() {
			log.Println("Migrating road network...")
		}
		progress, err := migrationService.MigrateRoadNetwork(ctx)
		if err != nil {
			return fmt.Errorf("road migration failed: %w", err)
		}
		printMigrationProgress("Roads", progress)
	}

	if migrateGreenSpaces {
		if isVerbose() {
			log.Println("Migrating green spaces...")
		}
		progress, err := migrationService.MigrateGreenSpaces(ctx)
		if err != nil {
			return fmt.Errorf("green space migration failed: %w", err)
		}
		printMigrationProgress("Green Spaces", progress)
	}

	// Print final statistics
	if isVerbose() {
		log.Println("Generating migration statistics...")
	}
	stats, err := migrationService.GetMigrationStatistics(ctx)
	if err != nil {
		// Statistics are informational only; report the problem but do not
		// fail a migration that has already finished.
		log.Printf("Warning: failed to generate migration statistics: %v", err)
	} else {
		printMigrationStatistics(stats)
	}

	if !isQuiet() {
		log.Println("✓ Geographical data migration completed successfully!")
	}
	return nil
}
// printMigrationProgress writes a summary of one migration run to stdout:
// the record counters, the completion percentage, and at most the first five
// error messages followed by a count of the ones left out. It prints nothing
// in quiet mode.
func printMigrationProgress(operation string, progress *service.MigrationProgress) {
	if isQuiet() {
		return
	}

	fmt.Printf("\n=== %s Migration Progress ===\n", operation)
	fmt.Printf("Total Records: %d\n", progress.TotalRecords)
	fmt.Printf("Processed: %d\n", progress.ProcessedRecords)
	fmt.Printf("Successful: %d\n", progress.Successful)
	fmt.Printf("Failed: %d\n", progress.Failed)
	fmt.Printf("Progress: %.1f%%\n", progress.ProgressPercent)

	messages := progress.ErrorMessages
	if len(messages) == 0 {
		return
	}

	// Cap the listing so a badly failing run does not flood the terminal.
	const maxShown = 5
	fmt.Printf("Errors: %d\n", len(messages))
	for idx := 0; idx < len(messages) && idx < maxShown; idx++ {
		fmt.Printf(" - %s\n", messages[idx])
	}
	if hidden := len(messages) - maxShown; hidden > 0 {
		fmt.Printf("... and %d more errors\n", hidden)
	}
}
// printMigrationStatistics writes the migration statistics map to stdout. It
// prints nothing in quiet mode.
//
// Recognized entries:
//   - "sites": a nested map with "total_sites" (int64),
//     "sites_with_polygons" (int64), and "polygon_coverage_percent" (float64)
//   - "green_space_total_area_m2": float64
//   - "roads": a nested map, printed key by key in sorted key order
//
// An entry that is missing or whose value has a different dynamic type is
// skipped without an error.
func printMigrationStatistics(stats map[string]interface{}) {
	if isQuiet() {
		return
	}
	fmt.Println("\n=== Migration Statistics ===")

	if sitesStats, ok := stats["sites"].(map[string]interface{}); ok {
		fmt.Println("\nSites:")
		if total, ok := sitesStats["total_sites"].(int64); ok {
			fmt.Printf(" Total: %d\n", total)
		}
		if withPolygons, ok := sitesStats["sites_with_polygons"].(int64); ok {
			fmt.Printf(" With Polygons: %d\n", withPolygons)
		}
		if coverage, ok := sitesStats["polygon_coverage_percent"].(float64); ok {
			fmt.Printf(" Polygon Coverage: %.1f%%\n", coverage)
		}
	}

	if greenSpaceArea, ok := stats["green_space_total_area_m2"].(float64); ok {
		fmt.Printf("\nGreen Space Total Area: %.0f m²\n", greenSpaceArea)
	}

	if roadStats, ok := stats["roads"].(map[string]interface{}); ok {
		fmt.Println("\nRoad Network:")
		// Map iteration order is random in Go; sort the keys so repeated runs
		// print the road statistics in the same order.
		keys := make([]string, 0, len(roadStats))
		for key := range roadStats {
			keys = append(keys, key)
		}
		sort.Strings(keys)
		for _, key := range keys {
			fmt.Printf(" %s: %v\n", key, roadStats[key])
		}
	}
}