luceos
I wrote two Python scripts to merge the data.
Known issue: there are still problems with notifications and with replies to users. I'll paste the code below in the hope that it helps others.
Code 1 aligns the schema of the two databases:
from sqlalchemy import create_engine, MetaData, text
from sqlalchemy.schema import CreateColumn

# Database connection configuration
db1_url = "mysql+pymysql://root:root@localhost:3306/your_database1"
db2_url = "mysql+pymysql://root:root@localhost:3306/your_database2"
engine1 = create_engine(db1_url)
engine2 = create_engine(db2_url)

# Create MetaData objects
metadata1 = MetaData()
metadata2 = MetaData()

# Reflect the structure of database 1 and database 2
metadata1.reflect(bind=engine1)
metadata2.reflect(bind=engine2)

# Disable foreign key checks and create missing tables
# (engine.begin() so the statements are committed under SQLAlchemy 2.x)
with engine1.begin() as conn:
    conn.execute(text("SET FOREIGN_KEY_CHECKS = 0"))
    for table_name, table2 in metadata2.tables.items():
        if table_name not in metadata1.tables:
            try:
                table2.create(bind=conn)
                print(f"Table {table_name} has been created in database 1.")
            except Exception as e:
                print(f"Error occurred while creating table {table_name}: {e}")
    conn.execute(text("SET FOREIGN_KEY_CHECKS = 1"))

# Add missing columns to the existing tables
with engine1.begin() as conn:
    for table_name, table2 in metadata2.tables.items():
        if table_name in metadata1.tables:
            table1 = metadata1.tables[table_name]
            existing_columns = {col.name for col in table1.columns}
            for column in table2.columns:
                if column.name not in existing_columns:
                    # Generate the SQL definition for the column
                    col_definition = str(CreateColumn(column).compile(dialect=engine1.dialect))
                    alter_sql = f"ALTER TABLE {table_name} ADD COLUMN {col_definition}"
                    try:
                        conn.execute(text(alter_sql))
                        print(f"Column {column.name} has been added to table {table_name} in database 1.")
                    except Exception as e:
                        print(f"Error occurred while adding column {column.name} to table {table_name}: {e}")
Code 2 merges the data:
import re
import logging
from collections import defaultdict
from sqlalchemy import create_engine, text, MetaData, Table, inspect, and_, select
from sqlalchemy.exc import IntegrityError
from tqdm import tqdm
# Configure logging at the WARNING level
logging.basicConfig(level=logging.WARNING, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Database connections
db1_url = "mysql+pymysql://root:root@localhost:3306/your_database1"
db2_url = "mysql+pymysql://root:root@localhost:3306/your_database2"
engine1 = create_engine(db1_url)
engine2 = create_engine(db2_url)
# Order of tables to process
order = [
    'users', 'groups', 'tags', 'discussions', 'posts', 'migrations',
    'group_permission', 'banned_ips', 'api_keys', 'access_tokens',
    'group_user', 'tag_user', 'discussion_tag', 'discussion_user', 'post_user',
    'pages', 'custom_emojis', 'drafts', 'post_edit_histories', 'flags',
    'fof_linguist_strings', 'fof_mason_fields', 'fof_mason_answers',
    'fof_terms_policies', 'fof_terms_policy_user', 'therealsujitk_gifs',
    'neonchat_chats', 'neonchat_messages', 'achievements', 'achievement_user',
    'badges', 'badge_user', 'post_likes', 'post_reactions', 'user_followers',
    'ignored_user', 'email_tokens', 'password_tokens', 'registration_tokens',
    'notifications', 'kilowhat_audit_log', 'user_money_history', 'settings',
    'html_headers', 'webhooks',
    'badge_category', 'clarkwinkelmann_group_list', 'conversation_user',
    'conversations', 'criteria', 'criterion_user', 'discussion_sticky_tag',
    'discussion_views', 'fof_mason_discussion_answer', 'fof_merge_discussions_redirections',
    'links', 'login_providers', 'lotteries', 'lottery_options', 'lottery_participants',
    'messages', 'money_rewards', 'money_to_all', 'my_emoji', 'neonchat_chat_user',
    'poll_options', 'poll_votes', 'polls', 'post_anonymous_reactions',
    'post_edit_histories_archive', 'post_mentions_group', 'post_mentions_post',
    'post_mentions_tag', 'post_mentions_user', 'post_spams', 'post_votes',
    'rank_users', 'ranks', 'reactions', 'username_requests', 'warnings',
]
def get_primary_key_column(inspector, table_name):
    """Get the primary key column name of a table"""
    pk_columns = inspector.get_pk_constraint(table_name)['constrained_columns']
    if pk_columns:
        return pk_columns[0]  # Assume a single-column primary key
    return None


def get_table_columns(inspector, table_name):
    """Get all column names of a table"""
    return [col['name'] for col in inspector.get_columns(table_name)]


def get_unique_constraints(connection, table_name):
    """Get all unique constraints and their columns"""
    query = text("""
        SELECT tc.CONSTRAINT_NAME, kcu.COLUMN_NAME
        FROM information_schema.TABLE_CONSTRAINTS tc
        JOIN information_schema.KEY_COLUMN_USAGE kcu
            ON tc.CONSTRAINT_NAME = kcu.CONSTRAINT_NAME
            AND tc.TABLE_SCHEMA = kcu.TABLE_SCHEMA
            AND tc.TABLE_NAME = kcu.TABLE_NAME
        WHERE tc.TABLE_SCHEMA = DATABASE()
            AND tc.TABLE_NAME = :table_name
            AND tc.CONSTRAINT_TYPE IN ('UNIQUE', 'PRIMARY KEY')
        ORDER BY tc.CONSTRAINT_NAME, kcu.ORDINAL_POSITION
    """)
    result = connection.execute(query, {"table_name": table_name})
    constraints = defaultdict(list)
    for row in result:
        constraints[row[0]].append(row[1])
    return constraints


def get_foreign_keys(inspector, table_name):
    """Get all foreign keys of a table"""
    return inspector.get_foreign_keys(table_name)
def parse_constraint_name(error_message):
    """Parse the constraint name from a MySQL error message"""
    # Common MySQL unique-constraint error formats
    patterns = [
        r"Duplicate entry ['\"](.+?)['\"] for key ['\"](.+?)['\"]",  # Standard format
        r"for key ['\"](.+?)['\"]",  # Simplified format
    ]
    for pattern in patterns:
        match = re.search(pattern, str(error_message))
        if match:
            # The first pattern captures the constraint name in group 2,
            # the others in group 1
            constraint_name = match.group(2) if pattern == patterns[0] else match.group(1)
            # If the name is prefixed with the table name, keep only the
            # constraint name part
            if '.' in constraint_name:
                constraint_name = constraint_name.split('.')[-1]
            return constraint_name
    # If no constraint name was found, log the error and return None
    logger.warning(f"Unable to parse constraint name from error message: {error_message}")
    return None


def extract_values_from_error(error_message):
    """Extract the duplicate value from an error message"""
    match = re.search(r"Duplicate entry ['\"](.+?)['\"] for key", str(error_message))
    if match:
        return match.group(1)
    return None
def find_existing_record(connection, table_name, constraints, constraint_name, row_data, error_message=None):
    """Find an existing record in the target database based on unique constraints"""
    table = Table(table_name, MetaData(), autoload_with=engine1)
    # Extract the duplicate value from the error message, if available
    duplicate_value = None
    if error_message:
        duplicate_value = extract_values_from_error(error_message)
    # First, try to find the relevant columns directly via the constraint name
    columns = constraints.get(constraint_name, [])
    # If no columns were found, look for keys that contain the constraint name
    if not columns:
        for const_name, const_columns in constraints.items():
            if constraint_name and constraint_name in const_name:
                columns = const_columns
                break
    # If columns are still unknown, search by the duplicate value from the error message
    if not columns and duplicate_value:
        # Try every column of the table
        all_columns = get_table_columns(inspect(engine1), table_name)
        for column_name in all_columns:
            try:
                stmt = select(table).where(table.c[column_name] == duplicate_value)
                record = connection.execute(stmt).fetchone()
                if record:
                    return record
            except Exception:
                continue
    # Otherwise, build conditions from the constraint columns
    if columns:
        conditions = []
        for column in columns:
            if column in row_data:
                conditions.append(table.c[column] == row_data[column])
        if conditions:
            # Query for an existing record
            stmt = select(table).where(and_(*conditions))
            record = connection.execute(stmt).fetchone()
            if record:
                return record
    # Finally, fall back to well-known unique fields
    common_unique_fields = {
        'users': ['username', 'email'],
        'groups': ['name_singular', 'name_plural'],
        'tags': ['name', 'slug'],
        'discussions': ['slug', 'title'],
    }
    if table_name in common_unique_fields:
        for field in common_unique_fields[table_name]:
            if field in row_data and row_data[field]:
                try:
                    stmt = select(table).where(table.c[field] == row_data[field])
                    record = connection.execute(stmt).fetchone()
                    if record:
                        return record
                except Exception:
                    continue
    logger.warning(f"Could not find existing record matching constraint {constraint_name}")
    return None
def process_foreign_key(connection, table_name, foreign_key, row_data, mappings):
    """Remap the values of a single foreign key using the ID mappings"""
    local_cols = foreign_key['constrained_columns']
    ref_cols = foreign_key['referred_columns']
    ref_table = foreign_key['referred_table']
    # Skip if there is no mapping data for the referenced table
    if ref_table not in mappings or not mappings[ref_table]:
        return row_data
    # Process each foreign key column
    for local_col, ref_col in zip(local_cols, ref_cols):
        if local_col in row_data and row_data[local_col] is not None:
            # Get the old foreign key value
            old_fk_value = row_data[local_col]
            # Replace it if a new ID mapping exists
            if old_fk_value in mappings[ref_table]:
                row_data[local_col] = mappings[ref_table][old_fk_value]
    return row_data
def handle_post_discussion_dependency(connection, table_name, row_data, mappings):
    """Special handling of the circular foreign key dependency between discussions and posts"""
    if table_name == 'discussions':
        # Don't remap the posts-related foreign keys of discussions here;
        # they are fixed up later by update_discussion_posts_fk()
        return row_data
    if table_name == 'posts':
        # Remap the discussions foreign key in posts
        foreign_keys = get_foreign_keys(inspect(engine2), table_name)
        for fk in foreign_keys:
            if fk['referred_table'] == 'discussions':
                for local_col, ref_col in zip(fk['constrained_columns'], fk['referred_columns']):
                    if local_col in row_data and row_data[local_col] is not None:
                        old_discussion_id = row_data[local_col]
                        if old_discussion_id in mappings['discussions']:
                            row_data[local_col] = mappings['discussions'][old_discussion_id]
    return row_data
def update_discussion_posts_fk(connection, mappings):
    """Update the posts-related foreign keys on discussions once posts are migrated"""
    post_related_cols = ['first_post_id', 'last_post_id']
    for col in post_related_cols:
        # Create a temporary mapping table
        connection.execute(text("""
            CREATE TEMPORARY TABLE temp_post_mapping (
                old_id INT PRIMARY KEY,
                new_id INT
            )
        """))
        # Insert the mapping data (parameterized rather than interpolated)
        for old_id, new_id in mappings['posts'].items():
            connection.execute(
                text("INSERT INTO temp_post_mapping (old_id, new_id) VALUES (:old_id, :new_id)"),
                {"old_id": old_id, "new_id": new_id},
            )
        # Create another temporary table to hold the records to update
        connection.execute(text("""
            CREATE TEMPORARY TABLE temp_discussions_to_update (
                discussion_id INT PRIMARY KEY,
                old_post_id INT,
                new_post_id INT
            )
        """))
        # Find all discussions that need updates
        connection.execute(text(f"""
            INSERT INTO temp_discussions_to_update (discussion_id, old_post_id, new_post_id)
            SELECT d.id, d.{col}, m.new_id
            FROM discussions d
            JOIN temp_post_mapping m ON d.{col} = m.old_id
        """))
        # Perform the update using the temporary table
        connection.execute(text(f"""
            UPDATE discussions d
            JOIN temp_discussions_to_update t ON d.id = t.discussion_id
            SET d.{col} = t.new_post_id
        """))
        # Drop the temporary tables before the next iteration
        connection.execute(text("DROP TEMPORARY TABLE IF EXISTS temp_discussions_to_update"))
        connection.execute(text("DROP TEMPORARY TABLE IF EXISTS temp_post_mapping"))
# main() below also relies on three helpers that were elided from the original
# post: get_primary_key_columns, is_autoincrement_column, and
# find_existing_by_unique_columns.
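# Minimal sketches of those helpers follow so the script runs end to end.
# The exact lookup logic here is a best guess, not the original author's code.
def get_primary_key_columns(inspector, table_name):
    """Get all primary key column names of a table (handles composite keys)."""
    return inspector.get_pk_constraint(table_name)['constrained_columns']


def is_autoincrement_column(inspector, table_name, column_name):
    """Check whether a column is auto-incrementing."""
    for col in inspector.get_columns(table_name):
        if col['name'] == column_name:
            return bool(col.get('autoincrement'))
    return False


def find_existing_by_unique_columns(connection, table_name, row_data):
    """Look up an existing target row via any single-column unique constraint."""
    constraints = get_unique_constraints(connection, table_name)
    table = Table(table_name, MetaData(), autoload_with=engine1)
    for const_name, columns in constraints.items():
        # Skip the primary key itself and composite constraints
        if const_name == 'PRIMARY' or len(columns) != 1:
            continue
        col = columns[0]
        if col in row_data and row_data[col] is not None:
            stmt = select(table).where(table.c[col] == row_data[col])
            record = connection.execute(stmt).fetchone()
            if record:
                return record
    return None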
def main():
    """Main function to migrate the data"""
    # Initialize the ID mapping tables (old id -> new id, per table)
    mappings = {table: {} for table in order}
    # Get inspector objects for both databases
    inspector1 = inspect(engine1)
    inspector2 = inspect(engine2)
    # Begin the migration
    with engine1.begin() as connection:
        # Disable foreign key checks
        connection.execute(text("SET FOREIGN_KEY_CHECKS=0"))
        try:
            # Process each table in order
            for table_name in tqdm(order, desc="Migrating tables"):
                pk_column = get_primary_key_column(inspector2, table_name)
                if not pk_column:
                    logger.warning(f"Table {table_name} does not have a primary key, skipping")
                    continue
                unique_constraints = get_unique_constraints(connection, table_name)
                # Read the source data
                source_table = Table(table_name, MetaData(), autoload_with=engine2)
                with engine2.connect() as source_conn:
                    source_data = list(source_conn.execute(select(source_table)))
                # Reflect the target table once per table rather than once per row
                target_table = Table(table_name, MetaData(), autoload_with=engine1)
                # Get foreign key information
                foreign_keys = get_foreign_keys(inspector2, table_name)
                # Identify which columns are foreign keys
                fk_columns = set()
                for fk in foreign_keys:
                    fk_columns.update(fk['constrained_columns'])
                pk_columns = get_primary_key_columns(inspector2, table_name)
                # Process each record
                for row in tqdm(source_data, desc=f"Migrating {table_name}", leave=False):
                    row_data = dict(row._mapping)
                    old_id = row_data[pk_column]
                    # Remap foreign key values
                    for fk in foreign_keys:
                        # The posts FK on discussions is fixed up later
                        if table_name == 'discussions' and fk['referred_table'] == 'posts':
                            continue
                        row_data = process_foreign_key(connection, table_name, fk, row_data, mappings)
                    # Handle the special dependency between discussions and posts
                    row_data = handle_post_discussion_dependency(connection, table_name, row_data, mappings)
                    # Decide whether to retain the original primary key value
                    retain_pk = (
                        not is_autoincrement_column(inspector2, table_name, pk_column) or
                        table_name in ['email_tokens', 'password_tokens', 'registration_tokens']
                    )
                    # Prepare the insert data (composite primary keys are always retained)
                    if retain_pk or len(pk_columns) > 1:
                        insert_data = row_data.copy()
                    else:
                        insert_data = {col: val for col, val in row_data.items() if col != pk_columns[0]}
                    # First check whether the record already exists
                    existing_record = find_existing_by_unique_columns(connection, table_name, row_data)
                    if existing_record:
                        mappings[table_name][old_id] = existing_record._mapping[pk_column]
                        continue  # Skip the insert
                    try:
                        result = connection.execute(target_table.insert().values(**insert_data))
                        if pk_column in fk_columns:
                            new_id = insert_data[pk_column]  # The original primary key value was kept
                        else:
                            new_id = result.inserted_primary_key[0]
                        if table_name == "group_permission":
                            # group_permission is keyed by (group_id, permission)
                            composite_key = f"{row_data.get('group_id')}:{row_data.get('permission')}"
                            mappings[table_name][old_id] = composite_key
                        else:
                            mappings[table_name][old_id] = new_id
                    except IntegrityError as e:
                        error_msg = str(e)
                        constraint_name = parse_constraint_name(error_msg)
                        # Try to find the existing record with the same unique value
                        existing_record = None
                        if constraint_name:
                            existing_record = find_existing_record(connection, table_name, unique_constraints, constraint_name, row_data, error_msg)
                        if not existing_record:
                            existing_record = find_existing_by_unique_columns(connection, table_name, row_data)
                        if existing_record:
                            mappings[table_name][old_id] = existing_record._mapping[pk_column]
                        else:
                            # Last resort: match the duplicate value against every column
                            duplicate_value = extract_values_from_error(error_msg)
                            if duplicate_value:
                                all_columns = get_table_columns(inspector1, table_name)
                                table = Table(table_name, MetaData(), autoload_with=engine1)
                                for col in all_columns:
                                    try:
                                        stmt = select(table).where(table.c[col] == duplicate_value)
                                        record = connection.execute(stmt).fetchone()
                                        if record:
                                            mappings[table_name][old_id] = record._mapping[pk_column]
                                            break
                                    except Exception:
                                        pass
                            if old_id not in mappings[table_name]:
                                logger.warning(f"Could not find existing record matching {table_name}.{old_id}")
            # Fix up first_post_id / last_post_id on discussions
            update_discussion_posts_fk(connection, mappings)
        finally:
            connection.execute(text("SET FOREIGN_KEY_CHECKS=1"))
    print("Database migration complete")


if __name__ == "__main__":
    main()
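Run code 1 first so that database 1 contains every table and column from database 2, then run code 2 to copy the rows across. Code 2 keeps an old-ID-to-new-ID mapping per table, so foreign keys still point at the right rows after the merge, and duplicate rows in the target are mapped instead of re-inserted.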