#!/usr/bin/env python3
"""
Data Analysis Script for Tercul Database Migration

This script analyzes the current SQLite database to understand data quality,
relationships, and prepare for migration to the Go/GORM schema.
"""

import sqlite3
import json
import re
from datetime import datetime
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from pathlib import Path


def _quote_identifier(name: str) -> str:
    """Return *name* as a double-quoted SQLite identifier.

    Embedded double quotes are doubled, so names containing spaces, quotes or
    reserved words can be interpolated into SQL text safely.
    """
    return '"' + str(name).replace('"', '""') + '"'


@dataclass
class TableStats:
    """Statistics for a database table"""
    # Table name.
    name: str
    # Number of rows in the table.
    record_count: int
    # A few example rows, one dict per row.
    sample_records: List[Dict[str, Any]]
    # Column descriptions (see DataAnalyzer.get_table_info for the dict keys).
    column_info: List[Dict[str, Any]]
    # Foreign key descriptions (see DataAnalyzer.get_foreign_keys for the keys).
    foreign_keys: List[Dict[str, Any]]
    # Human-readable descriptions of detected problems.
    data_quality_issues: List[str]


class DataAnalyzer:
    """Inspect a SQLite database and report schema facts and data-quality issues.

    The connection is opened in ``__init__``. Call ``close()`` (or use the
    instance as a context manager) to release it deterministically; ``__del__``
    remains as a fallback only.
    """

    # Canonical 8-4-4-4-12 hexadecimal UUID; used with ``fullmatch`` so that a
    # trailing newline is not accepted.
    _UUID_PATTERN = re.compile(
        r'[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}',
        re.IGNORECASE,
    )

    # ``strptime`` formats a stored string timestamp may legitimately use.
    _TIMESTAMP_FORMATS = (
        '%Y-%m-%d %H:%M:%S',
        '%Y-%m-%d %H:%M:%S.%f',
        '%Y-%m-%d',
        '%Y-%m-%dT%H:%M:%S',
        '%Y-%m-%dT%H:%M:%S.%fZ',
        # ISO-8601 variants that would otherwise be reported as invalid.
        '%Y-%m-%dT%H:%M:%S.%f',
        '%Y-%m-%dT%H:%M:%SZ',
        '%Y-%m-%dT%H:%M:%S%z',
        '%Y-%m-%dT%H:%M:%S.%f%z',
        '%Y-%m-%d %H:%M:%S%z',
        '%Y-%m-%d %H:%M:%S.%f%z',
    )

    def __init__(self, db_path: str):
        """Open a connection to the SQLite database at *db_path*."""
        self.db_path = db_path
        self.conn = sqlite3.connect(db_path)
        # Rows expose ``keys()`` and access by column name.
        self.conn.row_factory = sqlite3.Row
        self.stats: Dict[str, TableStats] = {}

    def close(self) -> None:
        """Close the database connection; safe to call more than once."""
        conn = getattr(self, 'conn', None)
        if conn is not None:
            conn.close()

    def __enter__(self) -> "DataAnalyzer":
        """Return the analyzer itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Close the connection when the ``with`` block ends."""
        self.close()

    def __del__(self):
        # Fallback cleanup only. Errors raised while the object is being
        # finalized cannot be handled by anyone, so they are suppressed.
        try:
            self.close()
        except sqlite3.Error:
            pass

    def get_table_names(self) -> List[str]:
        """Get all table names from the database"""
        cursor = self.conn.cursor()
        cursor.execute("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name")
        return [row[0] for row in cursor.fetchall()]

    def get_table_info(self, table_name: str) -> List[Dict[str, Any]]:
        """Get column information for a table.

        Returns one dict per column with the keys ``name``, ``type``,
        ``not_null``, ``default_value`` and ``primary_key``.
        """
        cursor = self.conn.cursor()
        cursor.execute(f"PRAGMA table_info({_quote_identifier(table_name)})")
        return [
            {
                'name': row[1],
                'type': row[2],
                'not_null': bool(row[3]),
                'default_value': row[4],
                'primary_key': bool(row[5]),
            }
            for row in cursor.fetchall()
        ]

    def get_foreign_keys(self, table_name: str) -> List[Dict[str, Any]]:
        """Get foreign key information for a table.

        Returns one dict per foreign-key column as reported by
        ``PRAGMA foreign_key_list``. ``to`` is ``None`` when the constraint
        does not name the referenced column.
        """
        cursor = self.conn.cursor()
        cursor.execute(f"PRAGMA foreign_key_list({_quote_identifier(table_name)})")
        return [
            {
                'id': row[0],
                'seq': row[1],
                'table': row[2],
                'from': row[3],
                'to': row[4],
                'on_update': row[5],
                'on_delete': row[6],
                'match': row[7],
            }
            for row in cursor.fetchall()
        ]

    def get_record_count(self, table_name: str) -> int:
        """Get the number of records in a table"""
        cursor = self.conn.cursor()
        cursor.execute(f"SELECT COUNT(*) FROM {_quote_identifier(table_name)}")
        return cursor.fetchone()[0]

    @staticmethod
    def _to_jsonable(value: Any) -> Any:
        """Convert a column value into something ``json.dumps`` accepts."""
        if isinstance(value, datetime):
            return value.isoformat()
        if isinstance(value, (bytes, bytearray, memoryview)):
            # BLOB columns come back as bytes, which json cannot encode.
            return bytes(value).hex()
        return value

    def get_sample_records(self, table_name: str, limit: int = 3) -> List[Dict[str, Any]]:
        """Get sample records from a table.

        Returns up to *limit* rows as dicts keyed by column name. ``datetime``
        values are rendered with ``isoformat()`` and BLOB values as hex strings
        so the result is JSON-serializable.
        """
        cursor = self.conn.cursor()
        cursor.execute(
            f"SELECT * FROM {_quote_identifier(table_name)} LIMIT ?",
            (limit,),
        )
        return [
            {key: self._to_jsonable(row[key]) for key in row.keys()}
            for row in cursor.fetchall()
        ]

    def analyze_uuid_format(self, table_name: str, id_column: str = 'id') -> List[str]:
        """Analyze UUID format in ID columns.

        Checks up to 100 non-NULL values of *id_column* and returns at most 10
        messages for values that are not canonical UUID strings.
        """
        table = _quote_identifier(table_name)
        # The column is alias-qualified so that a missing column raises instead
        # of being read by SQLite as a double-quoted string literal.
        column = _quote_identifier(id_column)
        cursor = self.conn.cursor()
        cursor.execute(
            f"SELECT t.{column} FROM {table} AS t WHERE t.{column} IS NOT NULL LIMIT 100"
        )
        issues = []
        for row in cursor.fetchall():
            uuid_value = row[0]
            if uuid_value and not self._UUID_PATTERN.fullmatch(str(uuid_value)):
                issues.append(f"Invalid UUID format: {uuid_value}")
        return issues[:10]  # Limit to first 10 issues

    @staticmethod
    def _looks_like_timestamp_column(name: str, declared_type: Optional[str]) -> bool:
        """Guess from its name or declared type whether a column holds timestamps."""
        lowered = name.lower()
        if 'time' in lowered or 'date' in lowered:
            return True
        # Match "at" only as a suffix (created_at, updatedAt). A plain substring
        # test would also select columns such as "status" or "category".
        if lowered == 'at' or lowered.endswith('_at') or name.endswith('At'):
            return True
        declared = (declared_type or '').upper()
        return 'DATE' in declared or 'TIME' in declared

    @classmethod
    def _parses_as_timestamp(cls, text: str) -> bool:
        """Return True if *text* matches one of the accepted timestamp formats."""
        for fmt in cls._TIMESTAMP_FORMATS:
            try:
                datetime.strptime(text, fmt)
            except ValueError:
                continue
            return True
        return False

    def analyze_timestamp_format(self, table_name: str) -> List[str]:
        """Analyze timestamp format in datetime columns.

        For every column that looks like a timestamp column, checks up to 50
        non-NULL values and reports string values that match none of the
        accepted formats. Non-string values are not checked. Returns at most
        10 messages.
        """
        table = _quote_identifier(table_name)
        cursor = self.conn.cursor()
        cursor.execute(f"PRAGMA table_info({table})")
        timestamp_columns = [
            row[1]
            for row in cursor.fetchall()
            if self._looks_like_timestamp_column(row[1], row[2])
        ]

        issues = []
        for column in timestamp_columns:
            quoted = _quote_identifier(column)
            cursor.execute(
                f"SELECT t.{quoted} FROM {table} AS t WHERE t.{quoted} IS NOT NULL LIMIT 50"
            )
            for row in cursor.fetchall():
                timestamp_value = row[0]
                if not timestamp_value or not isinstance(timestamp_value, str):
                    continue
                if not self._parses_as_timestamp(timestamp_value):
                    issues.append(f"Invalid timestamp format in {column}: {timestamp_value}")
        return issues[:10]  # Limit to first 10 issues

    def _primary_key_columns(self, table_name: str) -> List[str]:
        """Return the primary-key column names of *table_name* in key order."""
        cursor = self.conn.cursor()
        cursor.execute(f"PRAGMA table_info({_quote_identifier(table_name)})")
        # The pk field is the 1-based position within the primary key, 0 if none.
        keyed = sorted((row[5], row[1]) for row in cursor.fetchall() if row[5])
        return [name for _, name in keyed]

    def analyze_relationships(self, table_name: str) -> List[str]:
        """Analyze relationship integrity.

        For each foreign-key column of *table_name*, counts rows whose non-NULL
        value has no matching row in the referenced table. Foreign keys that
        cannot be checked (unresolvable referenced column, missing table or
        column) are reported as issues instead of raising.
        """
        child = _quote_identifier(table_name)
        cursor = self.conn.cursor()
        cursor.execute(f"PRAGMA foreign_key_list({child})")
        foreign_keys = cursor.fetchall()

        issues = []
        for fk in foreign_keys:
            seq, to_table, from_column, to_column = fk[1], fk[2], fk[3], fk[4]

            if to_column is None:
                # "REFERENCES parent" with no column list: the target is the
                # parent's primary key, matched by position within the key.
                parent_key = self._primary_key_columns(to_table)
                if seq >= len(parent_key):
                    issues.append(
                        f"Cannot resolve referenced column for {from_column} -> {to_table}"
                    )
                    continue
                to_column = parent_key[seq]

            parent = _quote_identifier(to_table)
            src = _quote_identifier(from_column)
            dst = _quote_identifier(to_column)
            try:
                # Check for orphaned references
                cursor.execute(f"""
                    SELECT COUNT(*)
                    FROM {child} AS t1
                    LEFT JOIN {parent} AS t2 ON t1.{src} = t2.{dst}
                    WHERE t1.{src} IS NOT NULL AND t2.{dst} IS NULL
                """)
                orphaned_count = cursor.fetchone()[0]
            except sqlite3.OperationalError as exc:
                # A foreign key pointing at a missing table or column is itself
                # a finding; report it and keep analyzing the remaining keys.
                issues.append(f"Cannot check references to {to_table}.{to_column}: {exc}")
                continue

            if orphaned_count > 0:
                issues.append(
                    f"Found {orphaned_count} orphaned references to {to_table}.{to_column}"
                )
        return issues

    def analyze_content_format(self, table_name: str, content_columns: List[str]) -> List[str]:
        """Analyze content format in text columns"""
        # NOTE(review): this method runs past the end of the reviewed region,
        # so its visible statements are carried over unchanged and it has not
        # been reviewed as a whole.
        cursor = self.conn.cursor()
        issues = []
        for column in content_columns:
            cursor.execute(f"SELECT {column} FROM {table_name} WHERE {column} IS NOT NULL LIMIT 20")
            for row in cursor.fetchall():
                content = row[0]
                if content and isinstance(content, str):
                    # Check for YAML/Ruby format
                    if '--- !ruby/hash:ActiveSupport::HashWithIndifferentAccess' in content:
                        issues.append(f"YAML/Ruby format detected in {column}")
                    # Check for HTML content
                    # NOTE(review): the HTML check that follows is cut off (and
                    # appears corrupted) in the reviewed text - restore it from
                    # the upstream file.
' in content or '