翼度科技»论坛 编程开发 mysql 查看内容

如何快速在表级别做同构或者异构数据库之间的数据迁移/备份

10

主题

10

帖子

30

积分

新手上路

Rank: 1

积分
30
 
与库级别的备份还原这一重量级操作套路不同,单个表或者多个表的备份还原,亦或是跨实例或者异构数据库之间的数据同步,作为DBA日常处理的最多的问题之一,如何简洁,快速,高效地实现快速,是一个值得思考的问题?
 
同一个实例上表级别备份还原

对于源表和目标表,如果在同一个实例上,仅从SQL的层面就可以实现表级别的备份还原:

  • MySQL中 :create target_table like source_table; insert into target_table select * from source_table;
  • SQLServer中:select * into target_table from source_table;
  • PostgreSQL中:create table target as table source_table;
正常情况下,MySQL和SQLServer都可以实现跨库操作,也就是源表和目标表可以不在同一个数据库中,PostgreSQL中无法直接实现跨库操作。
 
非一个实例上表级别备份还原

对于源表和目标表,如果不在同一个实例,以上方法均无法完成单表/多表的备份,可采用的方法:

  • 1,基于表级别的逻辑备份和还原,MySQL中的mysqldump,SQLServer中的bcp导入导出,postgresql中的pgdump等
  • 2,MySQL可以直接备份表对应的文件到目标实例,前提是开启了innodb_file_per_table
  • 3,自定义代码实现,类似于ETL的功能
 MySQL中表文件级别备份还原

在MySQL表级别迁移/备份实现:
  1. 源表在DB01,目标表在DB02
  2. USE db02;
  3. CREATE TABLE my_table
  4. (
  5.     c1 BIGINT AUTO_INCREMENT PRIMARY key,
  6.     c2 VARCHAR(100),
  7.     c3 VARCHAR(100),
  8.     c4 VARCHAR(100),
  9.     c5 DATETIME(6)
  10. );
  11. -- 丢弃目标表的idb数据文件
  12. USE db02
  13. alter table my_table discard tablespace;   
  14. -- 锁定源表防止被修改
  15. USE db01;
  16. flush table my_table for EXPORT;
  17. -- 然后拷贝源表src的cfg文件和ibd文件到目标表dst,命令如下:
  18. cp ./db01/my_table.ibd ./db02/
  19. -- 对原表执行解锁操作
  20. USE db01;
  21. unlock TABLES;
  22. -- 修改目标表的文件属主
  23. chown mysql.mysql my_table.ibd
  24. -- 对目标表执行导入
  25. USE db02;
  26. alter table my_table import TABLESPACE;
  27. USE db02;
  28. SELECT * FROM my_table LIMIT 100;
复制代码
自定义代码实现表级别同构/异构数据库之间同步

鉴于对于表的同步,并不限于同构类型的数据库,或者是简单的表到表的备份,可以是异构数据库,或者是一个联合查询结果导出/备份到其他数据库,所以自定义代码的方式可以最大限度地适应各种情况。
代码实现大概思路就是,通过配置一个源(实际上是一个查询语句source_select_sql),再配置一个目标表(实际上是一个insert语句),定义好源的数据库类型(MySQL/PostgreSQL/SQLServer),定义好目标表的数据库类型(MySQL/PostgreSQL/SQLServer),以及相关的批处理参数,通过批量从源中读取数据(表或者查询),然后批量写入目标表,即可快速高地实现表在同构/异构级别的数据同步。
以下实现了MySQL/PostgreSQL/SQLServer之间,任意两个数据库之间表级别的并发数据备份/迁移,以下demo,忽略网络传输的情况下,以MySQL单实例本从A库到B库为例,每秒可导入5000行左右。

 具体代码实现
  1. import base64
  2. import datetime
  3. import uuid
  4. import threading
  5. import multiprocessing
  6. from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
  7. from multiprocessing import Process
  8. from multiprocessing import Pool
  9. import psycopg2
  10. import pymssql
  11. import pymysql
  12. from enum import Enum
  13. import logging
  14. logger = logging.getLogger('data-sync-log')
  15. logger.setLevel(level=logging.INFO)
  16. formatter = logging.Formatter('%(asctime)s -%(name)s -%(levelname)s -%(message)s')
  17. # handler = logging.FileHandler("data-sync-log-{0}.txt".format(time.strftime("%Y-%m-%d-%H-%M-%S")))
  18. # handler.setFormatter(formatter)
  19. # logger.addHandler(handler)
  20. console = logging.StreamHandler()  
  21. console.setLevel(logging.INFO)
  22. logger.addHandler(console)
  23. class DataBaseType(Enum):
  24.     MSSQL = 1
  25.     POSTGRESQL = 2
  26.     MYSQL = 3
  27. def get_postgre_conn(conn):
  28.     # conn = psycopg2.connect(**conn, as_dict=True)
  29.     try:
  30.         conn = psycopg2.connect(**conn)
  31.         return conn
  32.     except Exception:
  33.         raise
  34. def get_mssql_conn(conn):
  35.     # conn = pymssql.connect(**conn, as_dict=True)
  36.     try:
  37.         conn = pymssql.connect(**conn)
  38.         return conn
  39.     except Exception:
  40.         raise
  41. def get_mysql_conn(conn):
  42.     # conn = pymssql.connect(**conn, as_dict=True)
  43.     try:
  44.         conn = pymysql.connect(**conn)
  45.         return conn
  46.     except Exception:
  47.         raise
  48. def get_db_conn(db_type, conn):
  49.     if db_type.name == 'MSSQL':
  50.         return get_mssql_conn(conn)
  51.     elif db_type.name == 'POSTGRESQL':
  52.         return get_postgre_conn(conn)
  53.     elif db_type.name == 'MYSQL':
  54.         return get_mysql_conn(conn)
  55.     return None
  56. def get_data_from_mssql(source_server, source_server_type, source_select_sql, last_sync_timestamp,read_page_size=50000):
  57.     source_conn = get_db_conn(source_server_type, source_server)
  58.     # for mssql the execute function will do nothing
  59.     # fetchmany will only return page size result to client for pymssql and pymysql
  60.     with source_conn.cursor() as source_cursor:
  61.         source_cursor.execute(source_select_sql.format(last_sync_timestamp))
  62.         while True:
  63.             list_result = source_cursor.fetchmany(read_page_size)
  64.             if list_result:
  65.                 yield list_result
  66.                 list_result.clear()
  67.             else:
  68.                 break
  69. def get_data_from_mysql(source_server, source_server_type, source_select_sql, last_sync_timestamp, read_page_size=50000):
  70.     source_conn = get_db_conn(source_server_type, source_server)
  71.     v_last_sync_timestamp = last_sync_timestamp;
  72.     list_result = []
  73.     #for pymysql, the execute will fetch all the result from server to client one time, so paging is necessary
  74.     with source_conn.cursor() as source_cursor:
  75.         current_start_no = 1
  76.         while True:
  77.             source_cursor.execute(source_select_sql.format((current_start_no-1) * read_page_size,read_page_size))
  78.             #print(source_select_sql.format((current_start_no-1) * read_page_size,read_page_size))
  79.             result = source_cursor.fetchmany(read_page_size)
  80.             if len(result)>0:
  81.                 current_start_no = current_start_no + 1
  82.                 for row in result:
  83.                     list_result.append(row)
  84.                     if len(list_result) == read_page_size:
  85.                         yield list_result
  86.                         list_result.clear()
  87.                 if len(list_result) > 0:
  88.                     yield list_result
  89.             else:
  90.                 break
  91. def get_data_from_postgresql(source_server, source_server_type, source_select_sql, last_sync_timestamp, read_batch_size=50000):
  92.     source_conn = get_db_conn(source_server_type, source_server)
  93.     # target_conn.set_session(autocommit=True)
  94.     # psycopg2.extras.register_uuid()
  95.     '''
  96.     page_index = 0
  97.     para_col = '(' + ''.join('%s,' for x in range(col_size))
  98.     para_col = para_col[:-1] + ')'
  99.     '''
  100.     v_last_execute_timestamp = last_sync_timestamp
  101.     v_current_timestamp = datetime.datetime.utcnow()
  102.     list_result = []
  103.     with source_conn.cursor(name="client_cursor") as source_cursor:
  104.         # for postgresql type data source
  105.         # create client cursor for postgresql database,
  106.         # fetchmany will return all the select result from server side in postgresql, so use client cursor
  107.         source_cursor.execute(source_select_sql.format(v_last_execute_timestamp))
  108.         for row in source_cursor:
  109.             list_result.append(row)
  110.             if len(list_result) == read_batch_size:
  111.                 yield list_result
  112.                 list_result.clear()
  113.         if len(list_result) > 0:
  114.             yield list_result
  115. def get_data_from_datasource(source_server, source_server_type, source_select_sql, last_sync_timestamp, read_batch_size):
  116.     if source_server_type == DataBaseType.MYSQL:
  117.         return get_data_from_mysql(source_server, source_server_type, source_select_sql, last_sync_timestamp,
  118.                                    read_batch_size)
  119.     if source_server_type == DataBaseType.POSTGRESQL:
  120.         return get_data_from_postgresql(source_server, source_server_type, source_select_sql, last_sync_timestamp,
  121.                                         read_batch_size)
  122.     if source_server_type == DataBaseType.MSSQL:
  123.         return get_data_from_mssql(source_server, source_server_type, source_select_sql, last_sync_timestamp,
  124.                                    read_batch_size)
  125. def execute_batch_insert2postgresql(target_server_type, target_server, target_insert_statement, target_table_name,  col_size, list_datasource, insert_batch_size):
  126.     batch_size_index = 0
  127.     para_col = '(' + ''.join('%s,' for x in range(col_size))
  128.     para_col = para_col[:-1] + ')'
  129.     conn = get_db_conn(target_server_type, target_server)
  130.     conn.set_session(autocommit=True)
  131.     psycopg2.extras.register_uuid()
  132.     with conn.cursor() as cursor:
  133.         while batch_size_index * insert_batch_size < len(list_datasource):
  134.             try:
  135.                 star_pos = batch_size_index * insert_batch_size
  136.                 end_pos = (batch_size_index + 1) * insert_batch_size
  137.                 args_str = ','.join( cursor.mogrify(para_col, x).decode('utf-8') for x in tuple(list_datasource[star_pos: end_pos]))
  138.                 cursor.execute(target_insert_statement.format(args_str))
  139.                 logger.info(
  140.                     str(datetime.datetime.now()) + ' {0} --->{1} rows batch size finish'.format(target_table_name,
  141.                                                                                                 len(list_datasource[
  142.                                                                                                     star_pos:end_pos])))
  143.                 batch_size_index = batch_size_index + 1
  144.                 conn.commit()
  145.             except Exception as err:
  146.                 raise err
  147.     conn.close()
  148. def execute_batch_insert2mysql(target_server_type, target_server, target_insert_statement, target_table_name, col_size,list_datasource, batch_size):
  149.     batch_size_index = 0
  150.     para_col = '(' + ''.join('%s,' for x in range(col_size))
  151.     para_col = para_col[:-1] + ')'
  152.     conn = get_db_conn(target_server_type, target_server)
  153.     with conn.cursor() as cursor:
  154.         while batch_size_index * batch_size < len(list_datasource):
  155.             try:
  156.                 cursor.executemany(target_insert_statement, list_datasource[batch_size_index * batch_size: (batch_size_index + 1) * batch_size])
  157.                 logger.info(str(datetime.datetime.now()) + ' {0} --->{1} rows batch size finish'.format(target_table_name,len(list_datasource[batch_size_index * batch_size:(batch_size_index + 1) * batch_size])))
  158.                 batch_size_index = batch_size_index + 1
  159.                 conn.commit()
  160.             except Exception as err:
  161.                 raise err
  162.     conn.close()
  163. def execute_batch_insert2mssql(target_server_type, target_server, target_insert_statement, target_table_name, col_size, list_datasource, batch_size):
  164.     batch_size_index = 0
  165.     conn = get_db_conn(target_server_type, target_server)
  166.     with conn.cursor() as cursor:
  167.         while batch_size_index * batch_size < len(list_datasource):
  168.             try:
  169.                 conn.bulk_copy(table_name = target_insert_statement, elements=list_datasource[batch_size_index * batch_size: (batch_size_index + 1) * batch_size],column_ids=[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21])
  170.                 logger.info(str(datetime.datetime.now()) + ' {0} --->{1} rows batch size finish'.format(target_table_name,len(list_datasource[batch_size_index * batch_size:(batch_size_index + 1) * batch_size])))
  171.                 batch_size_index = batch_size_index + 1
  172.                 conn.commit()
  173.             except Exception as err:
  174.                 raise err
  175.     conn.close()
  176. def execute_batch_insert_to_target(target_server, target_server_type, target_insert_statement, target_table_name, col_size, list_data_source, insert_batch_size):
  177.     if target_server_type == DataBaseType.MYSQL:
  178.         execute_batch_insert2mysql(target_server_type, target_server, target_insert_statement, target_table_name,
  179.                                    col_size, list_data_source, insert_batch_size)
  180.     elif target_server_type == DataBaseType.POSTGRESQL:
  181.         execute_batch_insert2postgresql(target_server_type, target_server, target_insert_statement, target_table_name,
  182.                                         col_size, list_data_source, insert_batch_size)
  183.     elif target_server_type == DataBaseType.MSSQL:
  184.         execute_batch_insert2mssql(target_server_type, target_server, target_insert_statement, target_table_name,
  185.                                    col_size, list_data_source, insert_batch_size)
  186. def sync_data_from_source2target(source_server, source_server_type, source_select_sql,
  187.                                  target_server, target_server_type, target_table_name, col_size,
  188.                                  target_insert_statement,
  189.                                  sync_type='FULL', insert_batch_size=5000, read_batch_size=50000):
  190.     # target_conn = get_db_conn(target_server_type, target_server)
  191.     # target_conn.set_session(autocommit=True)
  192.     # psycopg2.extras.register_uuid()
  193.     '''
  194.     page_index = 0
  195.     para_col = '(' + ''.join('%s,' for x in range(col_size))
  196.     para_col = para_col[:-1] + ')'
  197.     '''
  198.     v_last_execute_timestamp = datetime.datetime.min;
  199.     v_current_timestamp = datetime.datetime.utcnow()
  200.     if sync_type == 'Delta':
  201.         last_sync_info = get_last_sync_position(target_server, target_server_type, target_table_name)
  202.         log_id = last_sync_info[0]
  203.         v_last_execute_timestamp = last_sync_info[2]
  204.     for list_data_source in get_data_from_datasource(source_server, source_server_type, source_select_sql,  v_last_execute_timestamp, read_batch_size):
  205.         execute_batch_insert_to_target(target_server, target_server_type, target_insert_statement, target_table_name, col_size, list_data_source, insert_batch_size)
  206.     if sync_type == 'Delta':
  207.         update_last_sync_position(target_server, log_id, target_server_type, v_current_timestamp)
  208. def get_last_sync_position(conn, db_type, table_name):
  209.     conn = get_db_conn(db_type, conn)
  210.     with conn.cursor() as cursor:
  211.         cursor.execute('''select id,position_type,position_value from data_sync_position_log where table_name = %s ''',
  212.                        [table_name])
  213.         result = cursor.fetchone()
  214.     conn.close()
  215.     return result
  216. def update_last_sync_position(conn, id, db_type, position_value):
  217.     conn = get_db_conn(db_type, conn)
  218.     last_sync_timestamp = None
  219.     with conn.cursor() as cursor:
  220.         # print('update data_sync_position_log set position_value = %s, last_update =  now()  where id = %s ',[position_value,id])
  221.         cursor.execute('update data_sync_position_log set position_value = %s, last_update =  now()  where id = %s ',
  222.                        [position_value, id])
  223.     conn.commit()
  224.     conn.close()
  225.     return last_sync_timestamp
  226. def sync_data_from_source2target_worker(para_dict):
  227.     '''
  228.     process current
  229.     :param para_dict:
  230.     :return:
  231.     '''
  232.     list_future = []
  233.     with ProcessPoolExecutor(max_workers=10) as executor:
  234.         try:
  235.             # 通过submit函数提交执行的函数到进程池中,submit函数立即返回,不阻塞
  236.             for key, val in para_dict.items():
  237.                 future = executor.submit(sync_data_from_source2target,
  238.                                          val['source_server'],
  239.                                          val['source_server_type'],
  240.                                          val['source_select_sql'],
  241.                                          val['target_server'],
  242.                                          val['target_server_type'],
  243.                                          val['target_table_name'],
  244.                                          val['col_size'],
  245.                                          val['target_insert_statement'],
  246.                                          val['sync_type'],
  247.                                          val['insert_batch_size'],
  248.                                          val['read_page_size'])
  249.                 list_future.append(future)
  250.         except Exception as ex:
  251.             raise Exception('startup process exception: ' + str(ex))
  252.         try:
  253.             # 遍历list_future的result方法,获取进程的执行结果或者异常信息
  254.             for future in list_future:
  255.                 future.result()
  256.         except Exception as ex:
  257.             raise Exception('process execute exception: ' + str(ex))
  258. <br><br>def mysql_2_mysql():
  259.     para_dict = {}
  260.     sync_type = 'Full'  # Full or Delta
  261.     insert_batch_size = 5000
  262.     read_page_size = 50000
  263.     # mysql to mssql case
  264.     source_conn = {'host': '***.***.***.***', 'database': 'XXX', 'port': ****, 'user': '****',  'password': '******'}
  265.     target_conn = {'host': '***.***.***.***', 'database': 'XXX', 'port': ****, 'user': '****',  'password': '******'}
  266.     para_dict['test_mytable'] = {'source_server': source_conn,
  267.                                  'source_server_type': DataBaseType.MYSQL,
  268.                                  'source_select_sql': ''' SELECT c1,c2,c3,c4,c5 FROM my_table limit {0},{1}; ''',
  269.                                  'target_table_name': 'my_table',
  270.                                  'target_server': target_conn,
  271.                                  'target_server_type': DataBaseType.MYSQL,
  272.                                  #这个参数原本是想自动生成insert 语句的,直接写一个也不麻烦,这个参数废弃
  273.                                  'col_size': 7,
  274.                                  'target_insert_statement': '''INSERT INTO my_table (c1,c2,c3,c4,c5) VALUES (%s,%s,%s,%s,%s);''',
  275.                                  'insert_batch_size': insert_batch_size,
  276.                                  'read_page_size': read_page_size,
  277.                                  #原本一直想做Delta,也就是差异数据同步的,但是某些情况下无法实现,比如原表上发生的update或者delete操作,无法精确地找到差异的数据,这个参数废弃
  278.                                  'sync_type': 'FULL'}
  279.     sync_data_from_source2target_worker(para_dict)
  280. if __name__ == "__main__":
  281.     mysql_2_mysql()
复制代码
 
总结

1,基于表级别的逻辑备份和还原,传统情况下,MySQL中的mysqldump,SQLServer中的bcp导入导出,postgresql中的pgdump等
这种方式比较通用,适应于每个数据库,导出与导入效率不错,但是缺点也很明显,无法实现异构数据操作,对于复杂情况也无法实现,比如比备份或者导出一个联合查询的结果到另一种数据库中
2,MySQL中开启了innodb_file_per_table的情况下,可以直接通过flush table for EXPORT的方式备份表对应的文件到目标实例,然后通过alter table my_table import TABLESPACE;这种方式仅适应于MySQL,优点是简单快捷效率高。但是PostgreSQL和SQLServer中没有类似选项或者做法。
3,自定义代码实现,类似于ETL
这种方式最为灵活,可以跨实例,跨表结构(源表可以是一个结果集,目标是一个表),或者异构数据库之间的表级别备份/迁移,配置简单明了,如果有多个迁移对象的情况下,通过多进程/线程并发备份(迁移),灵活性和效率都有保证。缺点是需要自行做代码实现。
 

来源:https://www.cnblogs.com/wy123/p/18267491
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x

举报 回复 使用道具