turash/bugulma/backend/internal/service/geographical_data_migration_service.go

package service

import (
	"context"
	"database/sql"
	"encoding/json"
	"fmt"
	"os"

	"bugulma/backend/internal/domain"
	_ "github.com/mattn/go-sqlite3" // SQLite driver
	"gorm.io/gorm"
)

// GeographicalDataMigrationService handles migration of geographical data from external sources to PostgreSQL
type GeographicalDataMigrationService struct {
	db             *gorm.DB
	geoFeatureRepo domain.GeographicalFeatureRepository
	siteRepo       domain.SiteRepository
	sqliteDB       *sql.DB
}

// MigrationProgress tracks the progress of a migration operation
type MigrationProgress struct {
	TotalRecords     int      `json:"total_records"`
	ProcessedRecords int      `json:"processed_records"`
	Successful       int      `json:"successful"`
	Failed           int      `json:"failed"`
	ProgressPercent  float64  `json:"progress_percent"`
	CurrentOperation string   `json:"current_operation"`
	ErrorMessages    []string `json:"error_messages,omitempty"`
}
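
// reportProgressSketch is an illustrative helper, not part of the original service:
// it sketches how a caller might surface the JSON shape implied by the struct tags
// above (e.g. for an API response or a log line). Printing to stdout is an
// assumption for the example; real callers would likely stream this elsewhere.
func reportProgressSketch(p *MigrationProgress) {
	payload, err := json.MarshalIndent(p, "", "  ")
	if err != nil {
		fmt.Println("failed to marshal migration progress:", err)
		return
	}
	fmt.Println(string(payload))
}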

// NewGeographicalDataMigrationService creates a new migration service
func NewGeographicalDataMigrationService(
	db *gorm.DB,
	geoFeatureRepo domain.GeographicalFeatureRepository,
	siteRepo domain.SiteRepository,
	sqliteDBPath string,
) (*GeographicalDataMigrationService, error) {
	// Check if SQLite file exists
	if _, err := os.Stat(sqliteDBPath); os.IsNotExist(err) {
		return nil, fmt.Errorf("SQLite database file does not exist: %s", sqliteDBPath)
	}

	// Open SQLite database
	sqliteDB, err := sql.Open("sqlite3", sqliteDBPath)
	if err != nil {
		return nil, fmt.Errorf("failed to open SQLite database: %w", err)
	}

	return &GeographicalDataMigrationService{
		db:             db,
		geoFeatureRepo: geoFeatureRepo,
		siteRepo:       siteRepo,
		sqliteDB:       sqliteDB,
	}, nil
}
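
// newMigrationServiceSketch is an illustrative sketch, not part of the original
// service: it shows how a caller might wire the constructor and make sure the
// SQLite handle is released. The function name and the "./data/osm_features.db"
// path are assumptions for the example only.
func newMigrationServiceSketch(
	db *gorm.DB,
	geoFeatureRepo domain.GeographicalFeatureRepository,
	siteRepo domain.SiteRepository,
) error {
	svc, err := NewGeographicalDataMigrationService(db, geoFeatureRepo, siteRepo, "./data/osm_features.db")
	if err != nil {
		return fmt.Errorf("create migration service: %w", err)
	}
	// Close the underlying SQLite connection once the caller is done with it.
	defer svc.Close()

	// ... call the Migrate* methods here ...
	return nil
}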

// Close closes the SQLite database connection
func (s *GeographicalDataMigrationService) Close() error {
	if s.sqliteDB != nil {
		return s.sqliteDB.Close()
	}
	return nil
}

// MigrateBuildingPolygons upgrades existing sites with polygon geometries from OSM building data
func (s *GeographicalDataMigrationService) MigrateBuildingPolygons(ctx context.Context) (*MigrationProgress, error) {
	progress := &MigrationProgress{
		CurrentOperation: "Migrating Building Polygons",
		ErrorMessages:    []string{},
	}

	// Query OSM buildings from SQLite
	rows, err := s.sqliteDB.Query(`
		SELECT id, geometry, properties, osm_type, osm_id
		FROM osm_features
		WHERE feature_type = 'building'
	`)
	if err != nil {
		return nil, fmt.Errorf("failed to query buildings: %w", err)
	}
	defer rows.Close()

	var buildings []struct {
		ID         string
		Geometry   string
		Properties string
		OSMType    string
		OSMID      string
	}
	for rows.Next() {
		var b struct {
			ID         string
			Geometry   string
			Properties string
			OSMType    string
			OSMID      string
		}
		if err := rows.Scan(&b.ID, &b.Geometry, &b.Properties, &b.OSMType, &b.OSMID); err != nil {
			progress.ErrorMessages = append(progress.ErrorMessages, fmt.Sprintf("Failed to scan building row: %v", err))
			continue
		}
		buildings = append(buildings, b)
	}
	// Surface any error that terminated the iteration instead of silently dropping it.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("failed to iterate buildings: %w", err)
	}

	progress.TotalRecords = len(buildings)
	if progress.TotalRecords == 0 {
		progress.ProgressPercent = 100.0
	}

	// Process each building
	for i, building := range buildings {
		progress.ProcessedRecords = i + 1
		progress.ProgressPercent = float64(i+1) / float64(len(buildings)) * 100

		// Try to match with existing site by ID or create new geographical feature
		if err := s.processBuildingGeometry(ctx, building); err != nil {
			progress.Failed++
			progress.ErrorMessages = append(progress.ErrorMessages, fmt.Sprintf("Building %s: %v", building.ID, err))
		} else {
			progress.Successful++
		}
	}

	return progress, nil
}

// processBuildingGeometry processes a single building geometry
func (s *GeographicalDataMigrationService) processBuildingGeometry(ctx context.Context, building struct {
	ID         string
	Geometry   string
	Properties string
	OSMType    string
	OSMID      string
}) error {
	// First, try to find if this building corresponds to an existing site.
	// Sites might have IDs that match OSM building IDs.
	existingSite, err := s.siteRepo.GetByID(ctx, building.ID)
	if err == nil && existingSite != nil {
		// Update the site with polygon geometry
		return s.updateSiteWithPolygon(ctx, existingSite.ID, building.Geometry, building.Properties)
	}

	// If no matching site, create a geographical feature with its geometry in one query
	featureID := fmt.Sprintf("building_%s", building.ID)
	name := s.extractNameFromProperties(building.Properties)
	properties := s.parseProperties(building.Properties)

	query := `
		INSERT INTO geographical_features (
			id, name, feature_type, osm_type, osm_id, properties, source, quality_score, geometry
		) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ST_GeomFromGeoJSON(?))
		ON CONFLICT (id) DO UPDATE SET
			name = EXCLUDED.name,
			osm_type = EXCLUDED.osm_type,
			osm_id = EXCLUDED.osm_id,
			properties = EXCLUDED.properties,
			source = EXCLUDED.source,
			quality_score = EXCLUDED.quality_score,
			geometry = EXCLUDED.geometry,
			updated_at = NOW()
	`

	result := s.db.WithContext(ctx).Exec(query,
		featureID,
		name,
		string(domain.GeographicalFeatureTypeLandUse),
		building.OSMType,
		building.OSMID,
		properties,
		"osm_buildings",
		0.9,
		building.Geometry,
	)
	if result.Error != nil {
		return fmt.Errorf("failed to insert building feature: %w", result.Error)
	}

	return nil
}
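
// sampleBuildingGeoJSON is an illustrative example, not taken from the database:
// it shows the kind of GeoJSON polygon string that ST_GeomFromGeoJSON expects in
// the geometry values migrated above. Coordinates are in lon/lat order (WGS 84)
// and the values here are rough placeholders near Bugulma, not real OSM data.
const sampleBuildingGeoJSON = `{
	"type": "Polygon",
	"coordinates": [[
		[52.789, 54.536],
		[52.790, 54.536],
		[52.790, 54.537],
		[52.789, 54.537],
		[52.789, 54.536]
	]]
}`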

// updateSiteWithPolygon updates an existing site with polygon geometry
func (s *GeographicalDataMigrationService) updateSiteWithPolygon(ctx context.Context, siteID, geometry, properties string) error {
	// Add footprint_geometry column to sites if it doesn't exist
	if err := s.ensureFootprintGeometryColumn(); err != nil {
		return fmt.Errorf("failed to ensure footprint column: %w", err)
	}

	// Update the site with polygon geometry
	query := `
		UPDATE sites
		SET footprint_geometry = ST_GeomFromGeoJSON(?),
		    updated_at = NOW()
		WHERE id = ?
	`
	result := s.db.WithContext(ctx).Exec(query, geometry, siteID)
	if result.Error != nil {
		return fmt.Errorf("failed to update site geometry: %w", result.Error)
	}
	if result.RowsAffected == 0 {
		return fmt.Errorf("site %s not found", siteID)
	}

	return nil
}

// ensureFootprintGeometryColumn ensures the footprint_geometry column exists
func (s *GeographicalDataMigrationService) ensureFootprintGeometryColumn() error {
	// Check if column exists
	var exists bool
	query := `
		SELECT EXISTS(
			SELECT 1 FROM information_schema.columns
			WHERE table_name = 'sites' AND column_name = 'footprint_geometry'
		)
	`
	if err := s.db.Raw(query).Scan(&exists).Error; err != nil {
		return err
	}

	if !exists {
		// Add the column
		addColumnQuery := `
			ALTER TABLE sites ADD COLUMN footprint_geometry GEOMETRY(POLYGON, 4326)
		`
		if err := s.db.Exec(addColumnQuery).Error; err != nil {
			return fmt.Errorf("failed to add footprint_geometry column: %w", err)
		}

		// Add index
		indexQuery := `
			CREATE INDEX IF NOT EXISTS idx_sites_footprint ON sites USING GIST (footprint_geometry)
		`
		if err := s.db.Exec(indexQuery).Error; err != nil {
			return fmt.Errorf("failed to create footprint index: %w", err)
		}
	}

	return nil
}

// MigrateRoadNetwork imports road network data as geographical features
func (s *GeographicalDataMigrationService) MigrateRoadNetwork(ctx context.Context) (*MigrationProgress, error) {
	progress := &MigrationProgress{
		CurrentOperation: "Migrating Road Network",
		ErrorMessages:    []string{},
	}

	// Query road features from SQLite
	rows, err := s.sqliteDB.Query(`
		SELECT id, geometry, properties, osm_type, osm_id
		FROM osm_features
		WHERE feature_type = 'road'
	`)
	if err != nil {
		return nil, fmt.Errorf("failed to query roads: %w", err)
	}
	defer rows.Close()

	var roads []struct {
		ID         string
		Geometry   string
		Properties string
		OSMType    string
		OSMID      string
	}
	for rows.Next() {
		var r struct {
			ID         string
			Geometry   string
			Properties string
			OSMType    string
			OSMID      string
		}
		if err := rows.Scan(&r.ID, &r.Geometry, &r.Properties, &r.OSMType, &r.OSMID); err != nil {
			progress.ErrorMessages = append(progress.ErrorMessages, fmt.Sprintf("Failed to scan road row: %v", err))
			continue
		}
		roads = append(roads, r)
	}
	// Surface any error that terminated the iteration instead of silently dropping it.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("failed to iterate roads: %w", err)
	}

	progress.TotalRecords = len(roads)

	// Process roads in batches
	batchSize := 100
	for i := 0; i < len(roads); i += batchSize {
		end := i + batchSize
		if end > len(roads) {
			end = len(roads)
		}

		batch := roads[i:end]
		if err := s.processRoadBatch(ctx, batch, progress); err != nil {
			return progress, err
		}
	}

	return progress, nil
}

// processRoadBatch processes a batch of road features
func (s *GeographicalDataMigrationService) processRoadBatch(ctx context.Context, roads []struct {
	ID         string
	Geometry   string
	Properties string
	OSMType    string
	OSMID      string
}, progress *MigrationProgress) error {
	// Use raw SQL for bulk insert with geometries
	tx := s.db.WithContext(ctx).Begin()
	if tx.Error != nil {
		return fmt.Errorf("failed to begin transaction: %w", tx.Error)
	}
	defer func() {
		if r := recover(); r != nil {
			tx.Rollback()
		}
	}()

	query := `
		INSERT INTO geographical_features (
			id, name, feature_type, osm_type, osm_id, properties, source, quality_score, geometry
		) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ST_GeomFromGeoJSON(?))
		ON CONFLICT (id) DO UPDATE SET
			name = EXCLUDED.name,
			osm_type = EXCLUDED.osm_type,
			osm_id = EXCLUDED.osm_id,
			properties = EXCLUDED.properties,
			source = EXCLUDED.source,
			quality_score = EXCLUDED.quality_score,
			geometry = EXCLUDED.geometry,
			updated_at = NOW()
	`

	for _, road := range roads {
		featureID := fmt.Sprintf("road_%s", road.ID)
		name := s.extractNameFromProperties(road.Properties)
		properties := s.parseProperties(road.Properties)

		result := tx.Exec(query,
			featureID,
			name,
			string(domain.GeographicalFeatureTypeRoad),
			road.OSMType,
			road.OSMID,
			properties,
			"osm_roads",
			0.8,
			road.Geometry,
		)
		if result.Error != nil {
			tx.Rollback()
			return fmt.Errorf("failed to insert road %s: %w", road.ID, result.Error)
		}
	}

	if err := tx.Commit().Error; err != nil {
		return fmt.Errorf("failed to commit transaction: %w", err)
	}

	progress.ProcessedRecords += len(roads)
	progress.Successful += len(roads)
	progress.ProgressPercent = float64(progress.ProcessedRecords) / float64(progress.TotalRecords) * 100

	return nil
}

// MigrateGreenSpaces imports green space polygons
func (s *GeographicalDataMigrationService) MigrateGreenSpaces(ctx context.Context) (*MigrationProgress, error) {
	progress := &MigrationProgress{
		CurrentOperation: "Migrating Green Spaces",
		ErrorMessages:    []string{},
	}

	// Query green spaces from SQLite
	rows, err := s.sqliteDB.Query(`
		SELECT id, geometry, properties, osm_type, osm_id
		FROM osm_features
		WHERE feature_type = 'green_space'
	`)
	if err != nil {
		return nil, fmt.Errorf("failed to query green spaces: %w", err)
	}
	defer rows.Close()

	var greenSpaces []struct {
		ID         string
		Geometry   string
		Properties string
		OSMType    string
		OSMID      string
	}
	for rows.Next() {
		var gs struct {
			ID         string
			Geometry   string
			Properties string
			OSMType    string
			OSMID      string
		}
		if err := rows.Scan(&gs.ID, &gs.Geometry, &gs.Properties, &gs.OSMType, &gs.OSMID); err != nil {
			progress.ErrorMessages = append(progress.ErrorMessages, fmt.Sprintf("Failed to scan green space row: %v", err))
			continue
		}
		greenSpaces = append(greenSpaces, gs)
	}
	// Surface any error that terminated the iteration instead of silently dropping it.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("failed to iterate green spaces: %w", err)
	}

	progress.TotalRecords = len(greenSpaces)

	// Process green spaces with individual raw SQL upserts so that a single bad
	// geometry is recorded as a failure without aborting the remaining records.
	// (A shared transaction that is rolled back on the first error cannot keep
	// accepting statements, so per-record execution matches the continue-on-error
	// bookkeeping below.)
	query := `
		INSERT INTO geographical_features (
			id, name, feature_type, osm_type, osm_id, properties, source, quality_score, geometry
		) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ST_GeomFromGeoJSON(?))
		ON CONFLICT (id) DO UPDATE SET
			name = EXCLUDED.name,
			osm_type = EXCLUDED.osm_type,
			osm_id = EXCLUDED.osm_id,
			properties = EXCLUDED.properties,
			source = EXCLUDED.source,
			quality_score = EXCLUDED.quality_score,
			geometry = EXCLUDED.geometry,
			updated_at = NOW()
	`

	for i, greenSpace := range greenSpaces {
		progress.ProcessedRecords = i + 1
		progress.ProgressPercent = float64(i+1) / float64(len(greenSpaces)) * 100

		featureID := fmt.Sprintf("greenspace_%s", greenSpace.ID)
		name := s.extractNameFromProperties(greenSpace.Properties)
		properties := s.parseProperties(greenSpace.Properties)

		result := s.db.WithContext(ctx).Exec(query,
			featureID,
			name,
			string(domain.GeographicalFeatureTypeGreenSpace),
			greenSpace.OSMType,
			greenSpace.OSMID,
			properties,
			"osm_green_spaces",
			0.9,
			greenSpace.Geometry,
		)
		if result.Error != nil {
			progress.Failed++
			progress.ErrorMessages = append(progress.ErrorMessages, fmt.Sprintf("Green space %s: %v", greenSpace.ID, result.Error))
			continue
		}
		progress.Successful++
	}

	return progress, nil
}
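
// runAllMigrationsSketch is an illustrative sketch, not part of the original
// service: it shows one way to run the three migrations above back to back,
// reporting each step's progress and continuing past a failed step. Logging via
// fmt is an assumption; a real caller would likely use its own logger.
func runAllMigrationsSketch(ctx context.Context, svc *GeographicalDataMigrationService) {
	steps := []func(context.Context) (*MigrationProgress, error){
		svc.MigrateBuildingPolygons,
		svc.MigrateRoadNetwork,
		svc.MigrateGreenSpaces,
	}
	for _, step := range steps {
		progress, err := step(ctx)
		if err != nil {
			fmt.Println("migration step failed:", err)
			continue
		}
		fmt.Printf("%s: %d/%d succeeded, %d failed\n",
			progress.CurrentOperation, progress.Successful, progress.TotalRecords, progress.Failed)
	}
}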

// Helper methods

// insertGeometryForFeature inserts geometry for a geographical feature via raw SQL
func (s *GeographicalDataMigrationService) insertGeometryForFeature(ctx context.Context, featureID, geoJSON string) error {
	query := `
		UPDATE geographical_features
		SET geometry = ST_GeomFromGeoJSON(?)
		WHERE id = ?
	`
	result := s.db.WithContext(ctx).Exec(query, geoJSON, featureID)
	if result.Error != nil {
		return result.Error
	}
	return nil
}

// extractNameFromProperties extracts name from OSM properties JSON
func (s *GeographicalDataMigrationService) extractNameFromProperties(properties string) string {
	if properties == "" {
		return ""
	}

	var props map[string]interface{}
	if err := json.Unmarshal([]byte(properties), &props); err != nil {
		return ""
	}

	if name, ok := props["name"].(string); ok {
		return name
	}
	return ""
}

// parseProperties validates OSM properties JSON and returns it as a raw JSON byte
// slice, falling back to an empty object when the input is missing or invalid
func (s *GeographicalDataMigrationService) parseProperties(properties string) []byte {
	if properties == "" {
		return []byte("{}")
	}

	// Validate JSON
	var props interface{}
	if err := json.Unmarshal([]byte(properties), &props); err != nil {
		return []byte("{}")
	}

	return []byte(properties)
}

// GetMigrationStatistics returns comprehensive statistics about migrated geographical data
func (s *GeographicalDataMigrationService) GetMigrationStatistics(ctx context.Context) (map[string]interface{}, error) {
	stats := make(map[string]interface{})

	// Road network statistics
	roadStats, err := s.geoFeatureRepo.GetRoadNetworkStatistics(ctx)
	if err == nil {
		stats["road_network"] = roadStats
	}

	// Green space statistics
	greenSpaceArea, err := s.geoFeatureRepo.GetTotalArea(ctx, domain.GeographicalFeatureTypeGreenSpace, -90, -180, 90, 180)
	if err == nil {
		stats["green_space_total_area_km2"] = greenSpaceArea / 1000000 // convert m2 to km2
	}

	// Site geometry statistics
	var siteStats struct {
		SitesWithPolygons int64
		TotalSites        int64
	}
	s.db.Raw("SELECT COUNT(*) as total_sites FROM sites").Scan(&siteStats.TotalSites)
	s.db.Raw("SELECT COUNT(*) as sites_with_polygons FROM sites WHERE footprint_geometry IS NOT NULL").Scan(&siteStats.SitesWithPolygons)

	// Guard against division by zero when the sites table is empty.
	polygonCoverage := 0.0
	if siteStats.TotalSites > 0 {
		polygonCoverage = float64(siteStats.SitesWithPolygons) / float64(siteStats.TotalSites) * 100
	}
	stats["sites"] = map[string]interface{}{
		"total_sites":              siteStats.TotalSites,
		"sites_with_polygons":      siteStats.SitesWithPolygons,
		"polygon_coverage_percent": polygonCoverage,
	}

	return stats, nil
}
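
// printStatisticsSketch is an illustrative sketch, not part of the original
// service: it renders the statistics map as indented JSON for a quick
// post-migration report. Printing to stdout is an assumption for the example;
// a real caller might return the map from an HTTP handler instead.
func printStatisticsSketch(ctx context.Context, svc *GeographicalDataMigrationService) error {
	stats, err := svc.GetMigrationStatistics(ctx)
	if err != nil {
		return fmt.Errorf("collect migration statistics: %w", err)
	}

	payload, err := json.MarshalIndent(stats, "", "  ")
	if err != nil {
		return err
	}
	fmt.Println(string(payload))
	return nil
}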