#!/usr/bin/env python3
"""
Data Migration Preparation Script for Tercul
This script prepares data from the current SQLite database for migration
to the Go/GORM schema. It handles ID conversion, timestamp parsing,
content extraction, and relationship mapping.
"""
import sqlite3
import json
import re
import uuid
from datetime import datetime
from typing import Dict, List, Any, Optional, Tuple
from dataclasses import dataclass
from pathlib import Path

import yaml
from bs4 import BeautifulSoup


@dataclass
class MigrationMapping:
    """Mapping information for migration"""
    old_id: str
    new_id: int
    table_name: str


class DataMigrationPreparer:
    def __init__(self, source_db_path: str, output_dir: str = "migration_data"):
        self.source_db_path = source_db_path
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(exist_ok=True)
        self.conn = sqlite3.connect(source_db_path)
        self.conn.row_factory = sqlite3.Row

        # ID mapping storage
        self.id_mappings: Dict[str, Dict[str, int]] = {}

        # Initialize next IDs
        self.next_ids: Dict[str, int] = {
            'countries': 1,
            'authors': 1,
            'works': 1,
            'translations': 1,
            'books': 1,
            'users': 1,
            'tags': 1,
            'categories': 1
        }

    def __del__(self):
        if hasattr(self, 'conn'):
            self.conn.close()

    def get_next_id(self, table_name: str) -> int:
        """Get the next available ID for a table"""
        if table_name not in self.next_ids:
            self.next_ids[table_name] = 1
        current_id = self.next_ids[table_name]
        self.next_ids[table_name] += 1
        return current_id

    def parse_timestamp(self, timestamp_str: str) -> Optional[str]:
        """Parse timestamp string to ISO format"""
        if not timestamp_str:
            return None

        # Common timestamp formats
        formats = [
            '%Y-%m-%d %H:%M:%S',
            '%Y-%m-%d %H:%M:%S.%f',
            '%Y-%m-%d',
            '%Y-%m-%dT%H:%M:%S',
            '%Y-%m-%dT%H:%M:%S.%fZ'
        ]
        for fmt in formats:
            try:
                dt = datetime.strptime(timestamp_str, fmt)
                return dt.isoformat()
            except ValueError:
                continue
        return None
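
    # A quick illustration of the formats above:
    #   parse_timestamp("2023-05-01 12:30:00") -> "2023-05-01T12:30:00"
    #   parse_timestamp("2023-05-01")          -> "2023-05-01T00:00:00"
    #   parse_timestamp("01/05/2023")          -> None (unrecognized format)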

    def extract_yaml_content(self, content: str) -> Dict[str, Any]:
        """Extract content from Ruby object format"""
        if not content or not isinstance(content, str):
            return {}

        # Check if it's Ruby object format
        if '--- !ruby/hash:ActiveSupport::HashWithIndifferentAccess' in content:
            try:
                # Method 1: Try to extract text field directly using regex
                text_match = re.search(r'text:\s*"([^"]*(?:\\"[^"]*)*)"', content, re.DOTALL)
                if text_match:
                    text_content = text_match.group(1)
                    # Unescape the text content
                    text_content = text_content.replace('\\n', '\n').replace('\\"', '"').replace('\\\\', '\\')
                    return {'text': text_content}

                # Method 2: Try to parse as YAML after cleaning.
                # First, unescape the content.
                unescaped = content.replace('\\n', '\n').replace('\\"', '"').replace('\\\\', '\\')

                # Extract the YAML part
                yaml_start = unescaped.find('---')
                if yaml_start != -1:
                    yaml_content = unescaped[yaml_start:]
                    # Clean up the Ruby-specific parts
                    yaml_content = yaml_content.replace('!ruby/hash:ActiveSupport::HashWithIndifferentAccess', '')
                    # Parse YAML
                    data = yaml.safe_load(yaml_content)
                    return data or {}
            except Exception:
                # Method 3: Fallback - try to extract any quoted text content
                try:
                    text_match = re.search(r'text:\s*"([^"]*(?:\\"[^"]*)*)"', content, re.DOTALL)
                    if text_match:
                        text_content = text_match.group(1)
                        # Basic unescaping
                        text_content = text_content.replace('\\n', '\n').replace('\\"', '"')
                        return {'text': text_content}
                except Exception as e2:
                    print(f"Error extracting text manually: {e2}")
            # Ruby format detected but nothing could be extracted; do not fall
            # through and return the raw serialized blob as text
            return {}

        # If not Ruby format, return as plain text
        return {'text': content}
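
    # For reference, the serialized values this targets look roughly like the
    # following (shape inferred from the regexes above, not from a real dump):
    #
    #   --- !ruby/hash:ActiveSupport::HashWithIndifferentAccess
    #   text: "<p>First line\nSecond line</p>"
    #
    # which extract_yaml_content() would reduce to
    # {'text': '<p>First line\nSecond line</p>'}.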

    def html_to_markdown(self, content: str) -> str:
        """Convert HTML content to Markdown format"""
        if not content or not isinstance(content, str):
            return ""
        try:
            soup = BeautifulSoup(content, 'html.parser')

            # Convert common HTML elements to Markdown. Inline elements are
            # handled first so that block-level get_text() calls below keep
            # their markers instead of flattening them away.

            # Bold
            for b in soup.find_all(['b', 'strong']):
                b.replace_with(f"**{b.get_text()}**")

            # Italic
            for tag in soup.find_all(['i', 'em']):
                tag.replace_with(f"*{tag.get_text()}*")

            # Links
            for a in soup.find_all('a'):
                href = a.get('href', '')
                text = a.get_text()
                a.replace_with(f"[{text}]({href})")

            # Lists
            for ul in soup.find_all('ul'):
                items = []
                for li in ul.find_all('li'):
                    items.append(f"- {li.get_text()}")
                ul.replace_with(f"\n\n{chr(10).join(items)}\n\n")
            for ol in soup.find_all('ol'):
                items = []
                for i, li in enumerate(ol.find_all('li'), 1):
                    items.append(f"{i}. {li.get_text()}")
                ol.replace_with(f"\n\n{chr(10).join(items)}\n\n")

            # Blockquotes
            for blockquote in soup.find_all('blockquote'):
                lines = blockquote.get_text().split('\n')
                quoted_lines = [f"> {line}" for line in lines if line.strip()]
                blockquote.replace_with(f"\n\n{chr(10).join(quoted_lines)}\n\n")

            # Headers
            for i in range(1, 7):
                for h in soup.find_all(f'h{i}'):
                    h.replace_with(f"\n\n{'#' * i} {h.get_text()}\n\n")

            # Paragraphs
            for p in soup.find_all('p'):
                p.replace_with(f"\n\n{p.get_text()}\n\n")

            # Get the final text and clean up excessive whitespace
            result = soup.get_text()
            result = re.sub(r'\n\s*\n\s*\n', '\n\n', result)
            result = result.strip()
            return result
        except Exception as e:
            print(f"Error converting HTML to Markdown: {e}")
            return content
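
    # A rough example of the conversion (illustrative only):
    #   html_to_markdown('<h2>Title</h2><p>Some <b>bold</b> text</p>')
    #   -> '## Title\n\nSome **bold** text'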

    def migrate_countries(self) -> List[Dict[str, Any]]:
        """Migrate countries data"""
        print("Migrating countries...")
        cursor = self.conn.cursor()
        cursor.execute("""
            SELECT c.id, c.created_at, c.updated_at,
                   ct.name, ct.language_code
            FROM countries c
            LEFT JOIN country_translations ct ON c.id = ct.country_id
            ORDER BY c.id
        """)

        countries = {}
        for row in cursor.fetchall():
            country_id = row['id']
            if country_id not in countries:
                new_id = self.get_next_id('countries')
                self.id_mappings.setdefault('countries', {})[country_id] = new_id
                countries[country_id] = {
                    'id': new_id,
                    'name': '',
                    'code': '',  # Will need to be generated or mapped
                    'phone_code': '',
                    'currency': '',
                    'continent': '',
                    'language': 'multi',
                    'created_at': self.parse_timestamp(row['created_at']),
                    'updated_at': self.parse_timestamp(row['updated_at'])
                }

            # Add translation data
            if row['name'] and row['language_code']:
                if row['language_code'] == 'en':
                    countries[country_id]['name'] = row['name']
                    # Generate country code from name
                    countries[country_id]['code'] = row['name'][:2].upper()
        return list(countries.values())
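
    # Caveat: the generated code is just the first two letters of the English
    # name, not a real ISO 3166-1 alpha-2 code (e.g. "Switzerland" would
    # become "SW" rather than "CH"); a proper lookup table is needed before
    # relying on it.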

    def migrate_authors(self) -> List[Dict[str, Any]]:
        """Migrate authors data"""
        print("Migrating authors...")
        cursor = self.conn.cursor()
        cursor.execute("""
            SELECT a.id, a.date_of_birth, a.date_of_death, a.created_at, a.updated_at,
                   a.is_top, a.date_of_death_precision, a.date_of_birth_precision,
                   a.is_draft, a.custom_date_of_birth, a.custom_date_of_death, a.slug,
                   at.first_name, at.last_name, at.full_name, at.place_of_birth,
                   at.place_of_death, at.pen_names, at.language_code,
                   atr.body as biography
            FROM authors a
            LEFT JOIN author_translations at ON a.id = at.author_id
            LEFT JOIN action_text_rich_texts atr ON at.id = atr.record_id AND atr.name = 'biography'
            ORDER BY a.id
        """)

        authors = {}
        for row in cursor.fetchall():
            author_id = row['id']
            if author_id not in authors:
                new_id = self.get_next_id('authors')
                self.id_mappings.setdefault('authors', {})[author_id] = new_id
                authors[author_id] = {
                    'id': new_id,
                    'name': '',
                    'biography': '',
                    'birth_date': None,
                    'death_date': None,
                    'language': 'multi',
                    'created_at': self.parse_timestamp(row['created_at']),
                    'updated_at': self.parse_timestamp(row['updated_at'])
                }

            # Add translation data
            if row['language_code'] == 'en':
                if row['full_name']:
                    authors[author_id]['name'] = row['full_name']
                elif row['first_name'] and row['last_name']:
                    authors[author_id]['name'] = f"{row['first_name']} {row['last_name']}"

            # Parse dates
            if row['date_of_birth']:
                authors[author_id]['birth_date'] = self.parse_timestamp(row['date_of_birth'])
            if row['date_of_death']:
                authors[author_id]['death_date'] = self.parse_timestamp(row['date_of_death'])

            # Add biography
            if row['biography']:
                content = self.extract_yaml_content(row['biography'])
                if 'text' in content:
                    authors[author_id]['biography'] = self.html_to_markdown(content['text'])
        return list(authors.values())

    def migrate_works(self) -> List[Dict[str, Any]]:
        """Migrate works data"""
        print("Migrating works...")
        cursor = self.conn.cursor()
        cursor.execute("""
            SELECT w.id, w.author_id, w.date_created, w.age_restrictions,
                   w.literature_type, w.created_at, w.updated_at, w.is_top,
                   w.date_created_precision, w.is_draft, w.genres, w.slug
            FROM works w
            ORDER BY w.id
        """)

        works = []
        for row in cursor.fetchall():
            new_id = self.get_next_id('works')
            self.id_mappings.setdefault('works', {})[row['id']] = new_id

            # Map author_id
            author_id = None
            if row['author_id'] and row['author_id'] in self.id_mappings.get('authors', {}):
                author_id = self.id_mappings['authors'][row['author_id']]

            work = {
                'id': new_id,
                'title': '',  # Will be filled from translations
                'content': '',  # Will be filled from translations
                'description': '',
                'author_id': author_id,
                'published_at': self.parse_timestamp(row['date_created']),
                'status': 'draft' if row['is_draft'] else 'published',
                'language': 'multi',
                'created_at': self.parse_timestamp(row['created_at']),
                'updated_at': self.parse_timestamp(row['updated_at'])
            }
            works.append(work)
        return works

    def migrate_translations(self) -> List[Dict[str, Any]]:
        """Migrate all translations data (polymorphic approach)"""
        print("Migrating translations...")
        translations = []

        # Migrate work translations
        cursor = self.conn.cursor()
        cursor.execute("""
            SELECT wt.id, wt.work_id, wt.title, wt.audio_url, wt.translator,
                   wt.date_translated, wt.is_original_language, wt.created_at,
                   wt.updated_at, wt.language_code, wt.body
            FROM work_translations wt
            ORDER BY wt.work_id, wt.language_code
        """)
        for row in cursor.fetchall():
            new_id = self.get_next_id('translations')
            self.id_mappings.setdefault('translations', {})[row['id']] = new_id

            # Map work_id
            translatable_id = None
            if row['work_id'] and row['work_id'] in self.id_mappings.get('works', {}):
                translatable_id = self.id_mappings['works'][row['work_id']]

            # Extract content
            content = ""
            if row['body']:
                content_data = self.extract_yaml_content(row['body'])
                if 'text' in content_data:
                    content = self.html_to_markdown(content_data['text'])

            translation = {
                'id': new_id,
                'title': row['title'] or '',
                'content': content,
                'description': '',
                'language': row['language_code'] or 'en',
                'status': 'published' if row['is_original_language'] else 'draft',
                'published_at': self.parse_timestamp(row['date_translated']),
                'translatable_id': translatable_id,
                'translatable_type': 'Work',
                'translator_id': None,  # Will need user mapping
                'is_original_language': bool(row['is_original_language']),
                'audio_url': row['audio_url'] or '',
                'date_translated': self.parse_timestamp(row['date_translated']),
                'created_at': self.parse_timestamp(row['created_at']),
                'updated_at': self.parse_timestamp(row['updated_at'])
            }
            translations.append(translation)

        # Migrate author translations
        cursor.execute("""
            SELECT at.id, at.author_id, at.first_name, at.last_name, at.full_name,
                   at.place_of_birth, at.place_of_death, at.pen_names, at.language_code,
                   at.created_at, at.updated_at,
                   atr.body as biography
            FROM author_translations at
            LEFT JOIN action_text_rich_texts atr ON at.id = atr.record_id AND atr.name = 'biography'
            ORDER BY at.author_id, at.language_code
        """)
        for row in cursor.fetchall():
            new_id = self.get_next_id('translations')
            self.id_mappings.setdefault('translations', {})[row['id']] = new_id

            # Map author_id
            translatable_id = None
            if row['author_id'] and row['author_id'] in self.id_mappings.get('authors', {}):
                translatable_id = self.id_mappings['authors'][row['author_id']]

            # Extract biography
            content = ""
            if row['biography']:
                content_data = self.extract_yaml_content(row['biography'])
                if 'text' in content_data:
                    content = self.html_to_markdown(content_data['text'])

            # Create title from name components
            title = ""
            if row['full_name']:
                title = row['full_name']
            elif row['first_name'] and row['last_name']:
                title = f"{row['first_name']} {row['last_name']}"

            translation = {
                'id': new_id,
                'title': title,
                'content': content,
                'description': '',
                'language': row['language_code'] or 'en',
                'status': 'published',
                'published_at': None,
                'translatable_id': translatable_id,
                'translatable_type': 'Author',
                'translator_id': None,
                'is_original_language': False,
                'audio_url': '',
                'date_translated': None,
                'created_at': self.parse_timestamp(row['created_at']),
                'updated_at': self.parse_timestamp(row['updated_at'])
            }
            translations.append(translation)

        # Migrate country translations
        cursor.execute("""
            SELECT ct.id, ct.country_id, ct.name, ct.language_code,
                   ct.created_at, ct.updated_at
            FROM country_translations ct
            ORDER BY ct.country_id, ct.language_code
        """)
        for row in cursor.fetchall():
            new_id = self.get_next_id('translations')
            self.id_mappings.setdefault('translations', {})[row['id']] = new_id

            # Map country_id
            translatable_id = None
            if row['country_id'] and row['country_id'] in self.id_mappings.get('countries', {}):
                translatable_id = self.id_mappings['countries'][row['country_id']]

            translation = {
                'id': new_id,
                'title': row['name'] or '',
                'content': '',
                'description': '',
                'language': row['language_code'] or 'en',
                'status': 'published',
                'published_at': None,
                'translatable_id': translatable_id,
                'translatable_type': 'Country',
                'translator_id': None,
                'is_original_language': False,
                'audio_url': '',
                'date_translated': None,
                'created_at': self.parse_timestamp(row['created_at']),
                'updated_at': self.parse_timestamp(row['updated_at'])
            }
            translations.append(translation)
        return translations
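
    # Caveat: work, author, and country translations share the single
    # 'translations' bucket in id_mappings, so old IDs from the three source
    # tables can collide and overwrite one another there. The generated rows
    # themselves are unaffected (new IDs stay sequential); only the saved
    # mapping loses per-table provenance.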

    def migrate_books(self) -> List[Dict[str, Any]]:
        """Migrate books data"""
        print("Migrating books...")
        cursor = self.conn.cursor()
        cursor.execute("""
            SELECT b.id, b.created_at, b.updated_at, b.genre, b.types,
                   b.is_draft, b.genres,
                   bt.title, bt.annotation, bt.introduction, bt.isbn, bt.publisher,
                   bt.language_code
            FROM books b
            LEFT JOIN book_translations bt ON b.id = bt.book_id
            ORDER BY b.id
        """)

        books = {}
        for row in cursor.fetchall():
            book_id = row['id']
            if book_id not in books:
                new_id = self.get_next_id('books')
                self.id_mappings.setdefault('books', {})[book_id] = new_id
                books[book_id] = {
                    'id': new_id,
                    'title': '',
                    'description': '',
                    'isbn': '',
                    'published_at': None,
                    'language': 'multi',
                    'created_at': self.parse_timestamp(row['created_at']),
                    'updated_at': self.parse_timestamp(row['updated_at'])
                }

            # Add translation data
            if row['language_code'] == 'en':
                if row['title']:
                    books[book_id]['title'] = row['title']
                if row['annotation']:
                    books[book_id]['description'] = row['annotation']
                elif row['introduction']:
                    books[book_id]['description'] = row['introduction']
                if row['isbn']:
                    books[book_id]['isbn'] = row['isbn']
        return list(books.values())

    def migrate_users(self) -> List[Dict[str, Any]]:
        """Migrate users data"""
        print("Migrating users...")
        cursor = self.conn.cursor()
        cursor.execute("SELECT * FROM users")

        users = []
        for row in cursor.fetchall():
            new_id = self.get_next_id('users')
            self.id_mappings.setdefault('users', {})[row['id']] = new_id
            user = {
                'id': new_id,
                'username': row['name'] or f"user_{new_id}",
                'email': row['email'] or f"user_{new_id}@example.com",
                'password': row['password'] or '',  # Will need to be rehashed
                'first_name': '',
                'last_name': '',
                'display_name': row['name'] or '',
                'bio': '',
                'avatar_url': '',
                'role': 'reader',
                'verified': False,
                'active': True,
                'created_at': self.parse_timestamp(row['created_at']),
                'updated_at': self.parse_timestamp(row['updated_at'])
            }
            users.append(user)
        return users
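
    # Note: the password values copied above are whatever the source column
    # holds (in a Rails app, typically a framework-specific hash), so the Go
    # backend cannot verify them as-is; plan for a rehash or a password reset
    # flow.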

    def create_junction_tables(self) -> Dict[str, List[Dict[str, Any]]]:
        """Create junction table data"""
        print("Creating junction tables...")
        junction_data = {
            'book_works': [],
            'author_countries': []
        }

        # Book-Works relationships
        cursor = self.conn.cursor()
        cursor.execute("SELECT book_id, work_id, `order` FROM book_works")
        for row in cursor.fetchall():
            book_id = self.id_mappings.get('books', {}).get(row['book_id'])
            work_id = self.id_mappings.get('works', {}).get(row['work_id'])
            if book_id and work_id:
                junction_data['book_works'].append({
                    'book_id': book_id,
                    'work_id': work_id,
                    'order': int(row['order']) if row['order'] else 0
                })

        # Author-Countries relationships
        cursor.execute("SELECT author_id, country_id FROM author_countries")
        for row in cursor.fetchall():
            author_id = self.id_mappings.get('authors', {}).get(row['author_id'])
            country_id = self.id_mappings.get('countries', {}).get(row['country_id'])
            if author_id and country_id:
                junction_data['author_countries'].append({
                    'author_id': author_id,
                    'country_id': country_id
                })
        return junction_data

    def create_sqlite_database(self):
        """Create SQLite database with Go/GORM schema and import data"""
        print("Creating SQLite database...")

        # Migrate core entities
        countries = self.migrate_countries()
        authors = self.migrate_authors()
        works = self.migrate_works()
        translations = self.migrate_translations()
        books = self.migrate_books()
        users = self.migrate_users()

        # Create junction tables
        junction_data = self.create_junction_tables()

        # Create new SQLite database
        db_path = self.output_dir / "tercul_go.db"
        new_conn = sqlite3.connect(db_path)
        new_conn.row_factory = sqlite3.Row

        # Create tables with proper schema
        self.create_tables(new_conn)

        # Import data
        self.import_data(new_conn, countries, authors, works, translations, books, users, junction_data)
        new_conn.close()
        print(f"SQLite database created: {db_path}")

        return {
            'countries': countries,
            'authors': authors,
            'works': works,
            'translations': translations,
            'books': books,
            'users': users,
            'junction_tables': junction_data,
            'id_mappings': self.id_mappings,
            'metadata': {
                'total_records': len(countries) + len(authors) + len(works) + len(translations) + len(books) + len(users),
                'migrated_at': datetime.now().isoformat(),
                'source_database': self.source_db_path,
                'target_database': str(db_path)
            }
        }
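
    # Note: the migrate_* calls above must run in that order: countries and
    # authors have to populate id_mappings before works and translations
    # resolve their foreign keys through it.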

    def create_tables(self, conn):
        """Create tables with Go/GORM compatible schema"""
        cursor = conn.cursor()

        # Create countries table
        cursor.execute("""
            CREATE TABLE countries (
                id INTEGER PRIMARY KEY,
                name TEXT NOT NULL,
                code TEXT,
                phone_code TEXT,
                currency TEXT,
                continent TEXT,
                language TEXT DEFAULT 'multi',
                created_at TEXT,
                updated_at TEXT
            )
        """)

        # Create authors table (biography column is required by import_data)
        cursor.execute("""
            CREATE TABLE authors (
                id INTEGER PRIMARY KEY,
                name TEXT NOT NULL,
                biography TEXT,
                status TEXT DEFAULT 'active',
                birth_date TEXT,
                death_date TEXT,
                language TEXT DEFAULT 'multi',
                created_at TEXT,
                updated_at TEXT
            )
        """)

        # Create works table (content and author_id columns are required by
        # import_data and by the idx_works_author index below)
        cursor.execute("""
            CREATE TABLE works (
                id INTEGER PRIMARY KEY,
                title TEXT NOT NULL,
                content TEXT,
                description TEXT,
                author_id INTEGER,
                type TEXT DEFAULT 'other',
                status TEXT DEFAULT 'draft',
                published_at TEXT,
                language TEXT DEFAULT 'multi',
                created_at TEXT,
                updated_at TEXT,
                FOREIGN KEY (author_id) REFERENCES authors (id)
            )
        """)

        # Create translations table (polymorphic)
        cursor.execute("""
            CREATE TABLE translations (
                id INTEGER PRIMARY KEY,
                title TEXT NOT NULL,
                content TEXT,
                description TEXT,
                language TEXT NOT NULL,
                status TEXT DEFAULT 'draft',
                published_at TEXT,
                translatable_id INTEGER NOT NULL,
                translatable_type TEXT NOT NULL,
                translator_id INTEGER,
                is_original_language BOOLEAN DEFAULT FALSE,
                audio_url TEXT,
                date_translated TEXT,
                created_at TEXT,
                updated_at TEXT,
                FOREIGN KEY (translator_id) REFERENCES users (id)
            )
        """)

        # Create books table
        cursor.execute("""
            CREATE TABLE books (
                id INTEGER PRIMARY KEY,
                title TEXT NOT NULL,
                description TEXT,
                isbn TEXT,
                format TEXT DEFAULT 'paperback',
                status TEXT DEFAULT 'draft',
                published_at TEXT,
                language TEXT DEFAULT 'multi',
                created_at TEXT,
                updated_at TEXT
            )
        """)

        # Create users table
        cursor.execute("""
            CREATE TABLE users (
                id INTEGER PRIMARY KEY,
                username TEXT NOT NULL UNIQUE,
                email TEXT NOT NULL UNIQUE,
                password TEXT NOT NULL,
                first_name TEXT,
                last_name TEXT,
                display_name TEXT,
                bio TEXT,
                avatar_url TEXT,
                role TEXT DEFAULT 'reader',
                verified BOOLEAN DEFAULT FALSE,
                active BOOLEAN DEFAULT TRUE,
                created_at TEXT,
                updated_at TEXT
            )
        """)

        # Create junction tables
        cursor.execute("""
            CREATE TABLE book_works (
                id INTEGER PRIMARY KEY,
                book_id INTEGER NOT NULL,
                work_id INTEGER NOT NULL,
                "order" INTEGER DEFAULT 0,
                created_at TEXT,
                updated_at TEXT,
                FOREIGN KEY (book_id) REFERENCES books (id),
                FOREIGN KEY (work_id) REFERENCES works (id)
            )
        """)
        cursor.execute("""
            CREATE TABLE author_countries (
                id INTEGER PRIMARY KEY,
                author_id INTEGER NOT NULL,
                country_id INTEGER NOT NULL,
                created_at TEXT,
                updated_at TEXT,
                FOREIGN KEY (author_id) REFERENCES authors (id),
                FOREIGN KEY (country_id) REFERENCES countries (id)
            )
        """)

        # Create indexes
        cursor.execute("CREATE INDEX idx_translations_translatable ON translations (translatable_id, translatable_type)")
        cursor.execute("CREATE INDEX idx_translations_language ON translations (language)")
        cursor.execute("CREATE INDEX idx_works_author ON works (author_id)")
        cursor.execute("CREATE INDEX idx_book_works_book ON book_works (book_id)")
        cursor.execute("CREATE INDEX idx_book_works_work ON book_works (work_id)")
        cursor.execute("CREATE INDEX idx_author_countries_author ON author_countries (author_id)")
        cursor.execute("CREATE INDEX idx_author_countries_country ON author_countries (country_id)")

        conn.commit()
        print("Tables created successfully")

    def import_data(self, conn, countries, authors, works, translations, books, users, junction_data):
        """Import data into the new database"""
        cursor = conn.cursor()

        # Import countries
        print("Importing countries...")
        for country in countries:
            cursor.execute("""
                INSERT INTO countries (id, name, code, phone_code, currency, continent, language, created_at, updated_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                country['id'], country['name'], country['code'], country['phone_code'],
                country['currency'], country['continent'], country['language'],
                country['created_at'], country['updated_at']
            ))

        # Import authors
        print("Importing authors...")
        for author in authors:
            cursor.execute("""
                INSERT INTO authors (id, name, biography, birth_date, death_date, language, created_at, updated_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                author['id'], author['name'], author['biography'], author['birth_date'],
                author['death_date'], author['language'], author['created_at'], author['updated_at']
            ))

        # Import works
        print("Importing works...")
        for work in works:
            cursor.execute("""
                INSERT INTO works (id, title, content, description, author_id, published_at, status, language, created_at, updated_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                work['id'], work['title'], work['content'], work['description'],
                work['author_id'], work['published_at'], work['status'], work['language'],
                work['created_at'], work['updated_at']
            ))

        # Import translations
        print("Importing translations...")
        for translation in translations:
            cursor.execute("""
                INSERT INTO translations (id, title, content, description, language, status, published_at,
                                          translatable_id, translatable_type, translator_id, is_original_language,
                                          audio_url, date_translated, created_at, updated_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                translation['id'], translation['title'], translation['content'], translation['description'],
                translation['language'], translation['status'], translation['published_at'],
                translation['translatable_id'], translation['translatable_type'], translation['translator_id'],
                translation['is_original_language'], translation['audio_url'], translation['date_translated'],
                translation['created_at'], translation['updated_at']
            ))

        # Import books
        print("Importing books...")
        for book in books:
            cursor.execute("""
                INSERT INTO books (id, title, description, isbn, published_at, language, created_at, updated_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                book['id'], book['title'], book['description'], book['isbn'],
                book['published_at'], book['language'], book['created_at'], book['updated_at']
            ))

        # Import users
        print("Importing users...")
        for user in users:
            cursor.execute("""
                INSERT INTO users (id, username, email, password, first_name, last_name, display_name, bio, avatar_url, role, verified, active, created_at, updated_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                user['id'], user['username'], user['email'], user['password'],
                user['first_name'], user['last_name'], user['display_name'], user['bio'],
                user['avatar_url'], user['role'], user['verified'], user['active'],
                user['created_at'], user['updated_at']
            ))

        # Import junction tables
        print("Importing junction tables...")
        for book_work in junction_data['book_works']:
            cursor.execute("""
                INSERT INTO book_works (book_id, work_id, "order")
                VALUES (?, ?, ?)
            """, (book_work['book_id'], book_work['work_id'], book_work['order']))
        for author_country in junction_data['author_countries']:
            cursor.execute("""
                INSERT INTO author_countries (author_id, country_id)
                VALUES (?, ?)
            """, (author_country['author_id'], author_country['country_id']))

        conn.commit()
        print("Data import completed")

    def save_migration_data(self):
        """Save all migration data to JSON files"""
        print("Saving migration data...")

        # Migrate core entities
        countries = self.migrate_countries()
        authors = self.migrate_authors()
        works = self.migrate_works()
        translations = self.migrate_translations()
        books = self.migrate_books()
        users = self.migrate_users()

        # Create junction tables
        junction_data = self.create_junction_tables()

        # Save data to files
        migration_data = {
            'countries': countries,
            'authors': authors,
            'works': works,
            'translations': translations,
            'books': books,
            'users': users,
            'junction_tables': junction_data,
            'id_mappings': self.id_mappings,
            'metadata': {
                'total_records': len(countries) + len(authors) + len(works) + len(translations) + len(books) + len(users),
                'migrated_at': datetime.now().isoformat(),
                'source_database': self.source_db_path
            }
        }

        # Save main migration data
        with open(self.output_dir / 'migration_data.json', 'w', encoding='utf-8') as f:
            json.dump(migration_data, f, indent=2, ensure_ascii=False)

        # Save ID mappings separately for reference
        with open(self.output_dir / 'id_mappings.json', 'w', encoding='utf-8') as f:
            json.dump(self.id_mappings, f, indent=2, ensure_ascii=False)

        # Save individual entity files
        for entity_name, data in [
            ('countries', countries),
            ('authors', authors),
            ('works', works),
            ('translations', translations),
            ('books', books),
            ('users', users)
        ]:
            with open(self.output_dir / f'{entity_name}.json', 'w', encoding='utf-8') as f:
                json.dump(data, f, indent=2, ensure_ascii=False)

        print(f"Migration data saved to: {self.output_dir}")
        return migration_data

    def generate_migration_summary(self) -> Dict[str, Any]:
        """Generate a summary of the migration"""
        summary = {
            'source_database': self.source_db_path,
            'migrated_at': datetime.now().isoformat(),
            'record_counts': {
                'countries': len(self.id_mappings.get('countries', {})),
                'authors': len(self.id_mappings.get('authors', {})),
                'works': len(self.id_mappings.get('works', {})),
                'translations': len(self.id_mappings.get('translations', {})),
                'books': len(self.id_mappings.get('books', {})),
                'users': len(self.id_mappings.get('users', {}))
            },
            'total_records': sum(len(mapping) for mapping in self.id_mappings.values()),
            'next_ids': self.next_ids
        }
        with open(self.output_dir / 'migration_summary.json', 'w', encoding='utf-8') as f:
            json.dump(summary, f, indent=2, ensure_ascii=False)
        return summary


def main():
    """Main function to run the data migration preparation"""
    source_db = "tercul_data.db"
    if not Path(source_db).exists():
        print(f"Source database not found: {source_db}")
        return

    print("Starting data migration and database creation...")
    preparer = DataMigrationPreparer(source_db)

    # Create SQLite database with migrated data
    migration_data = preparer.create_sqlite_database()

    # Generate summary
    summary = preparer.generate_migration_summary()

    print("\nMigration and Database Creation Summary:")
    print("=" * 50)
    for entity, count in summary['record_counts'].items():
        print(f"{entity:15}: {count:6,} records")
    print(f"{'Total':15}: {summary['total_records']:6,} records")
    print(f"\nSQLite database created: {preparer.output_dir}/tercul_go.db")
    print(f"Migration data saved to: {preparer.output_dir}")


if __name__ == "__main__":
    main()