  • MySQL中 :create target_table like source_table; insert into target_table select * from source_table;
  • SQLServer中:select * into target_table from source_table;
  • PostgreSQL中:create table target as table source_table;


  • 1,基于表级别的逻辑备份和还原,MySQL中的mysqldump,SQLServer中的bcp导入导出,postgresql中的pgdump等
  • 2,MySQL可以直接备份表对应的文件到目标实例,前提是开启了innodb_file_per_table
  • 3,自定义代码实现,类似于ETL的功能

  1. 源表在DB01,目标表在DB02
  2. USE db02;
  3. CREATE TABLE my_table
  4. (
  6.     c2 VARCHAR(100),
  7.     c3 VARCHAR(100),
  8.     c4 VARCHAR(100),
  9.     c5 DATETIME(6)
  10. );
  11. -- 丢弃目标表的idb数据文件
  12. USE db02
  13. alter table my_table discard tablespace;   
  14. -- 锁定源表防止被修改
  15. USE db01;
  16. flush table my_table for EXPORT;
  17. -- 然后拷贝源表src的cfg文件和ibd文件到目标表dst,命令如下:
  18. cp ./db01/my_table.ibd ./db02/
  19. -- 对原表执行解锁操作
  20. USE db01;
  21. unlock TABLES;
  22. -- 修改目标表的文件属主
  23. chown mysql.mysql my_table.ibd
  24. -- 对目标表执行导入
  25. USE db02;
  26. alter table my_table import TABLESPACE;
  27. USE db02;
  28. SELECT * FROM my_table LIMIT 100;


  1. import base64
  2. import datetime
  3. import uuid
  4. import threading
  5. import multiprocessing
  6. from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
  7. from multiprocessing import Process
  8. from multiprocessing import Pool
  9. import psycopg2
  10. import pymssql
  11. import pymysql
  12. from enum import Enum
  13. import logging
  14. logger = logging.getLogger('data-sync-log')
  15. logger.setLevel(level=logging.INFO)
  16. formatter = logging.Formatter('%(asctime)s -%(name)s -%(levelname)s -%(message)s')
  17. # handler = logging.FileHandler("data-sync-log-{0}.txt".format(time.strftime("%Y-%m-%d-%H-%M-%S")))
  18. # handler.setFormatter(formatter)
  19. # logger.addHandler(handler)
  20. console = logging.StreamHandler()  
  21. console.setLevel(logging.INFO)
  22. logger.addHandler(console)
  23. class DataBaseType(Enum):
  24.     MSSQL = 1
  25.     POSTGRESQL = 2
  26.     MYSQL = 3
  27. def get_postgre_conn(conn):
  28.     # conn = psycopg2.connect(**conn, as_dict=True)
  29.     try:
  30.         conn = psycopg2.connect(**conn)
  31.         return conn
  32.     except Exception:
  33.         raise
  34. def get_mssql_conn(conn):
  35.     # conn = pymssql.connect(**conn, as_dict=True)
  36.     try:
  37.         conn = pymssql.connect(**conn)
  38.         return conn
  39.     except Exception:
  40.         raise
  41. def get_mysql_conn(conn):
  42.     # conn = pymssql.connect(**conn, as_dict=True)
  43.     try:
  44.         conn = pymysql.connect(**conn)
  45.         return conn
  46.     except Exception:
  47.         raise
  48. def get_db_conn(db_type, conn):
  49.     if db_type.name == 'MSSQL':
  50.         return get_mssql_conn(conn)
  51.     elif db_type.name == 'POSTGRESQL':
  52.         return get_postgre_conn(conn)
  53.     elif db_type.name == 'MYSQL':
  54.         return get_mysql_conn(conn)
  55.     return None
  56. def get_data_from_mssql(source_server, source_server_type, source_select_sql, last_sync_timestamp,read_page_size=50000):
  57.     source_conn = get_db_conn(source_server_type, source_server)
  58.     # for mssql the execute function will do nothing
  59.     # fetchmany will only return page size result to client for pymssql and pymysql
  60.     with source_conn.cursor() as source_cursor:
  61.         source_cursor.execute(source_select_sql.format(last_sync_timestamp))
  62.         while True:
  63.             list_result = source_cursor.fetchmany(read_page_size)
  64.             if list_result:
  65.                 yield list_result
  66.                 list_result.clear()
  67.             else:
  68.                 break
  69. def get_data_from_mysql(source_server, source_server_type, source_select_sql, last_sync_timestamp, read_page_size=50000):
  70.     source_conn = get_db_conn(source_server_type, source_server)
  71.     v_last_sync_timestamp = last_sync_timestamp;
  72.     list_result = []
  73.     #for pymysql, the execute will fetch all the result from server to client one time, so paging is necessary
  74.     with source_conn.cursor() as source_cursor:
  75.         current_start_no = 1
  76.         while True:
  77.             source_cursor.execute(source_select_sql.format((current_start_no-1) * read_page_size,read_page_size))
  78.             #print(source_select_sql.format((current_start_no-1) * read_page_size,read_page_size))
  79.             result = source_cursor.fetchmany(read_page_size)
  80.             if len(result)>0:
  81.                 current_start_no = current_start_no + 1
  82.                 for row in result:
  83.                     list_result.append(row)
  84.                     if len(list_result) == read_page_size:
  85.                         yield list_result
  86.                         list_result.clear()
  87.                 if len(list_result) > 0:
  88.                     yield list_result
  89.             else:
  90.                 break
  91. def get_data_from_postgresql(source_server, source_server_type, source_select_sql, last_sync_timestamp, read_batch_size=50000):
  92.     source_conn = get_db_conn(source_server_type, source_server)
  93.     # target_conn.set_session(autocommit=True)
  94.     # psycopg2.extras.register_uuid()
  95.     '''
  96.     page_index = 0
  97.     para_col = '(' + ''.join('%s,' for x in range(col_size))
  98.     para_col = para_col[:-1] + ')'
  99.     '''
  100.     v_last_execute_timestamp = last_sync_timestamp
  101.     v_current_timestamp = datetime.datetime.utcnow()
  102.     list_result = []
  103.     with source_conn.cursor(name="client_cursor") as source_cursor:
  104.         # for postgresql type data source
  105.         # create client cursor for postgresql database,
  106.         # fetchmany will return all the select result from server side in postgresql, so use client cursor
  107.         source_cursor.execute(source_select_sql.format(v_last_execute_timestamp))
  108.         for row in source_cursor:
  109.             list_result.append(row)
  110.             if len(list_result) == read_batch_size:
  111.                 yield list_result
  112.                 list_result.clear()
  113.         if len(list_result) > 0:
  114.             yield list_result
  115. def get_data_from_datasource(source_server, source_server_type, source_select_sql, last_sync_timestamp, read_batch_size):
  116.     if source_server_type == DataBaseType.MYSQL:
  117.         return get_data_from_mysql(source_server, source_server_type, source_select_sql, last_sync_timestamp,
  118.                                    read_batch_size)
  119.     if source_server_type == DataBaseType.POSTGRESQL:
  120.         return get_data_from_postgresql(source_server, source_server_type, source_select_sql, last_sync_timestamp,
  121.                                         read_batch_size)
  122.     if source_server_type == DataBaseType.MSSQL:
  123.         return get_data_from_mssql(source_server, source_server_type, source_select_sql, last_sync_timestamp,
  124.                                    read_batch_size)
  125. def execute_batch_insert2postgresql(target_server_type, target_server, target_insert_statement, target_table_name,  col_size, list_datasource, insert_batch_size):
  126.     batch_size_index = 0
  127.     para_col = '(' + ''.join('%s,' for x in range(col_size))
  128.     para_col = para_col[:-1] + ')'
  129.     conn = get_db_conn(target_server_type, target_server)
  130.     conn.set_session(autocommit=True)
  131.     psycopg2.extras.register_uuid()
  132.     with conn.cursor() as cursor:
  133.         while batch_size_index * insert_batch_size < len(list_datasource):
  134.             try:
  135.                 star_pos = batch_size_index * insert_batch_size
  136.                 end_pos = (batch_size_index + 1) * insert_batch_size
  137.                 args_str = ','.join( cursor.mogrify(para_col, x).decode('utf-8') for x in tuple(list_datasource[star_pos: end_pos]))
  138.                 cursor.execute(target_insert_statement.format(args_str))
  139.                 logger.info(
  140.                     str(datetime.datetime.now()) + ' {0} --->{1} rows batch size finish'.format(target_table_name,
  141.                                                                                                 len(list_datasource[
  142.                                                                                                     star_pos:end_pos])))
  143.                 batch_size_index = batch_size_index + 1
  144.                 conn.commit()
  145.             except Exception as err:
  146.                 raise err
  147.     conn.close()
  148. def execute_batch_insert2mysql(target_server_type, target_server, target_insert_statement, target_table_name, col_size,list_datasource, batch_size):
  149.     batch_size_index = 0
  150.     para_col = '(' + ''.join('%s,' for x in range(col_size))
  151.     para_col = para_col[:-1] + ')'
  152.     conn = get_db_conn(target_server_type, target_server)
  153.     with conn.cursor() as cursor:
  154.         while batch_size_index * batch_size < len(list_datasource):
  155.             try:
  156.                 cursor.executemany(target_insert_statement, list_datasource[batch_size_index * batch_size: (batch_size_index + 1) * batch_size])
  157.                 logger.info(str(datetime.datetime.now()) + ' {0} --->{1} rows batch size finish'.format(target_table_name,len(list_datasource[batch_size_index * batch_size:(batch_size_index + 1) * batch_size])))
  158.                 batch_size_index = batch_size_index + 1
  159.                 conn.commit()
  160.             except Exception as err:
  161.                 raise err
  162.     conn.close()
  163. def execute_batch_insert2mssql(target_server_type, target_server, target_insert_statement, target_table_name, col_size, list_datasource, batch_size):
  164.     batch_size_index = 0
  165.     conn = get_db_conn(target_server_type, target_server)
  166.     with conn.cursor() as cursor:
  167.         while batch_size_index * batch_size < len(list_datasource):
  168.             try:
  169.                 conn.bulk_copy(table_name = target_insert_statement, elements=list_datasource[batch_size_index * batch_size: (batch_size_index + 1) * batch_size],column_ids=[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21])
  170.                 logger.info(str(datetime.datetime.now()) + ' {0} --->{1} rows batch size finish'.format(target_table_name,len(list_datasource[batch_size_index * batch_size:(batch_size_index + 1) * batch_size])))
  171.                 batch_size_index = batch_size_index + 1
  172.                 conn.commit()
  173.             except Exception as err:
  174.                 raise err
  175.     conn.close()
  176. def execute_batch_insert_to_target(target_server, target_server_type, target_insert_statement, target_table_name, col_size, list_data_source, insert_batch_size):
  177.     if target_server_type == DataBaseType.MYSQL:
  178.         execute_batch_insert2mysql(target_server_type, target_server, target_insert_statement, target_table_name,
  179.                                    col_size, list_data_source, insert_batch_size)
  180.     elif target_server_type == DataBaseType.POSTGRESQL:
  181.         execute_batch_insert2postgresql(target_server_type, target_server, target_insert_statement, target_table_name,
  182.                                         col_size, list_data_source, insert_batch_size)
  183.     elif target_server_type == DataBaseType.MSSQL:
  184.         execute_batch_insert2mssql(target_server_type, target_server, target_insert_statement, target_table_name,
  185.                                    col_size, list_data_source, insert_batch_size)
  186. def sync_data_from_source2target(source_server, source_server_type, source_select_sql,
  187.                                  target_server, target_server_type, target_table_name, col_size,
  188.                                  target_insert_statement,
  189.                                  sync_type='FULL', insert_batch_size=5000, read_batch_size=50000):
  190.     # target_conn = get_db_conn(target_server_type, target_server)
  191.     # target_conn.set_session(autocommit=True)
  192.     # psycopg2.extras.register_uuid()
  193.     '''
  194.     page_index = 0
  195.     para_col = '(' + ''.join('%s,' for x in range(col_size))
  196.     para_col = para_col[:-1] + ')'
  197.     '''
  198.     v_last_execute_timestamp = datetime.datetime.min;
  199.     v_current_timestamp = datetime.datetime.utcnow()
  200.     if sync_type == 'Delta':
  201.         last_sync_info = get_last_sync_position(target_server, target_server_type, target_table_name)
  202.         log_id = last_sync_info[0]
  203.         v_last_execute_timestamp = last_sync_info[2]
  204.     for list_data_source in get_data_from_datasource(source_server, source_server_type, source_select_sql,  v_last_execute_timestamp, read_batch_size):
  205.         execute_batch_insert_to_target(target_server, target_server_type, target_insert_statement, target_table_name, col_size, list_data_source, insert_batch_size)
  206.     if sync_type == 'Delta':
  207.         update_last_sync_position(target_server, log_id, target_server_type, v_current_timestamp)
  208. def get_last_sync_position(conn, db_type, table_name):
  209.     conn = get_db_conn(db_type, conn)
  210.     with conn.cursor() as cursor:
  211.         cursor.execute('''select id,position_type,position_value from data_sync_position_log where table_name = %s ''',
  212.                        [table_name])
  213.         result = cursor.fetchone()
  214.     conn.close()
  215.     return result
  216. def update_last_sync_position(conn, id, db_type, position_value):
  217.     conn = get_db_conn(db_type, conn)
  218.     last_sync_timestamp = None
  219.     with conn.cursor() as cursor:
  220.         # print('update data_sync_position_log set position_value = %s, last_update =  now()  where id = %s ',[position_value,id])
  221.         cursor.execute('update data_sync_position_log set position_value = %s, last_update =  now()  where id = %s ',
  222.                        [position_value, id])
  223.     conn.commit()
  224.     conn.close()
  225.     return last_sync_timestamp
  226. def sync_data_from_source2target_worker(para_dict):
  227.     '''
  228.     process current
  229.     :param para_dict:
  230.     :return:
  231.     '''
  232.     list_future = []
  233.     with ProcessPoolExecutor(max_workers=10) as executor:
  234.         try:
  235.             # 通过submit函数提交执行的函数到进程池中,submit函数立即返回,不阻塞
  236.             for key, val in para_dict.items():
  237.                 future = executor.submit(sync_data_from_source2target,
  238.                                          val['source_server'],
  239.                                          val['source_server_type'],
  240.                                          val['source_select_sql'],
  241.                                          val['target_server'],
  242.                                          val['target_server_type'],
  243.                                          val['target_table_name'],
  244.                                          val['col_size'],
  245.                                          val['target_insert_statement'],
  246.                                          val['sync_type'],
  247.                                          val['insert_batch_size'],
  248.                                          val['read_page_size'])
  249.                 list_future.append(future)
  250.         except Exception as ex:
  251.             raise Exception('startup process exception: ' + str(ex))
  252.         try:
  253.             # 遍历list_future的result方法,获取进程的执行结果或者异常信息
  254.             for future in list_future:
  255.                 future.result()
  256.         except Exception as ex:
  257.             raise Exception('process execute exception: ' + str(ex))
  258. <br><br>def mysql_2_mysql():
  259.     para_dict = {}
  260.     sync_type = 'Full'  # Full or Delta
  261.     insert_batch_size = 5000
  262.     read_page_size = 50000
  263.     # mysql to mssql case
  264.     source_conn = {'host': '***.***.***.***', 'database': 'XXX', 'port': ****, 'user': '****',  'password': '******'}
  265.     target_conn = {'host': '***.***.***.***', 'database': 'XXX', 'port': ****, 'user': '****',  'password': '******'}
  266.     para_dict['test_mytable'] = {'source_server': source_conn,
  267.                                  'source_server_type': DataBaseType.MYSQL,
  268.                                  'source_select_sql': ''' SELECT c1,c2,c3,c4,c5 FROM my_table limit {0},{1}; ''',
  269.                                  'target_table_name': 'my_table',
  270.                                  'target_server': target_conn,
  271.                                  'target_server_type': DataBaseType.MYSQL,
  272.                                  #这个参数原本是想自动生成insert 语句的,直接写一个也不麻烦,这个参数废弃
  273.                                  'col_size': 7,
  274.                                  'target_insert_statement': '''INSERT INTO my_table (c1,c2,c3,c4,c5) VALUES (%s,%s,%s,%s,%s);''',
  275.                                  'insert_batch_size': insert_batch_size,
  276.                                  'read_page_size': read_page_size,
  277.                                  #原本一直想做Delta,也就是差异数据同步的,但是某些情况下无法实现,比如原表上发生的update或者delete操作,无法精确地找到差异的数据,这个参数废弃
  278.                                  'sync_type': 'FULL'}
  279.     sync_data_from_source2target_worker(para_dict)
  280. if __name__ == "__main__":
  281.     mysql_2_mysql()

2,MySQL中开启了innodb_file_per_table的情况下,可以直接通过flush table for EXPORT的方式备份表对应的文件到目标实例,然后通过alter table my_table import TABLESPACE;这种方式仅适应于MySQL,优点是简单快捷效率高。但是PostgreSQL和SQLServer中没有类似选项或者做法。

