Unlike database-level backup and restore, which is a heavyweight operation, backing up and restoring one or a few tables, or synchronizing data across instances or between heterogeneous databases, is one of the problems a DBA handles most often. How to do it simply, quickly and efficiently is worth some thought.
Table-level backup and restore within one instance
If the source table and the target table are in the same instance, table-level backup and restore can be done purely in SQL:
- MySQL: create table target_table like source_table; insert into target_table select * from source_table;
- SQL Server: select * into target_table from source_table;
- PostgreSQL: create table target_table as table source_table;
Normally, both MySQL and SQL Server can do this across databases, i.e. the source and target tables do not have to be in the same database; PostgreSQL cannot query across databases directly, so this does not work there. A minimal cross-database example for MySQL is sketched below.
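As an illustration, a minimal sketch of the cross-database case on a single MySQL instance, assuming two hypothetical schemas db1 and db2 on the same server:

-- copy the structure, then the data, between two databases on one instance
CREATE TABLE db2.target_table LIKE db1.source_table;
INSERT INTO db2.target_table SELECT * FROM db1.source_table;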
Table-level backup and restore across instances
If the source table and the target table are not in the same instance, none of the above works for single-table or multi-table backup. The available options are:
- 1. Table-level logical backup and restore: mysqldump in MySQL, bcp export/import in SQL Server, pg_dump in PostgreSQL, and so on.
- 2. In MySQL, copy the table's data files directly to the target instance, provided innodb_file_per_table is enabled.
- 3. Custom code, similar to an ETL job.
File-level table backup and restore in MySQL
Table-level migration/backup in MySQL via the table files: the source table is in db01, the target table is in db02.
-- Create the target table in db02 with the same structure as the source
USE db02;
CREATE TABLE my_table
(
    c1 BIGINT AUTO_INCREMENT PRIMARY KEY,
    c2 VARCHAR(100),
    c3 VARCHAR(100),
    c4 VARCHAR(100),
    c5 DATETIME(6)
);

-- Discard the target table's .ibd data file
USE db02;
ALTER TABLE my_table DISCARD TABLESPACE;

-- Lock the source table so it cannot be modified
USE db01;
FLUSH TABLES my_table FOR EXPORT;

-- Copy the source table's .cfg and .ibd files to the target database directory
-- (run on the OS shell, from the MySQL data directory)
cp ./db01/my_table.cfg ./db02/
cp ./db01/my_table.ibd ./db02/

-- Unlock the source table
USE db01;
UNLOCK TABLES;

-- Fix the ownership of the copied files
chown mysql:mysql ./db02/my_table.cfg ./db02/my_table.ibd

-- Import the tablespace on the target table
USE db02;
ALTER TABLE my_table IMPORT TABLESPACE;

-- Verify
USE db02;
SELECT * FROM my_table LIMIT 100;
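A hedged sanity check, reusing the table and schema names from the example above: confirm the prerequisite before starting and compare row counts after the import.

-- innodb_file_per_table must be ON on both instances for the .ibd copy to work
SHOW VARIABLES LIKE 'innodb_file_per_table';
-- compare source and target after the import
SELECT COUNT(*) FROM db01.my_table;
SELECT COUNT(*) FROM db02.my_table;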
Custom code: table-level sync between homogeneous/heterogeneous databases
Table sync is not limited to databases of the same type, nor to simple table-to-table copies: the source can be a different kind of database, or the result of a join query to be exported/backed up into another database. A custom-code approach therefore adapts to the widest range of situations.
The rough idea is this: configure a source (in practice a query, source_select_sql) and a target table (in practice an insert statement), declare the database type of the source and of the target (MySQL/PostgreSQL/SQLServer) together with the batching parameters, then read from the source in batches (a table or a query) and write to the target in batches. That is enough for a fast table-level sync between homogeneous or heterogeneous databases.
The code below implements concurrent, table-level data backup/migration between any two of MySQL/PostgreSQL/SQLServer. In this demo, ignoring network transfer and copying from database A to database B on a single MySQL instance, it imports roughly 5,000 rows per second.
Full code implementation:

import datetime
import logging
from concurrent.futures import ProcessPoolExecutor
from enum import Enum

import psycopg2
import psycopg2.extras
import pymssql
import pymysql

logger = logging.getLogger('data-sync-log')
logger.setLevel(level=logging.INFO)
formatter = logging.Formatter('%(asctime)s -%(name)s -%(levelname)s -%(message)s')
# handler = logging.FileHandler("data-sync-log-{0}.txt".format(time.strftime("%Y-%m-%d-%H-%M-%S")))
# handler.setFormatter(formatter)
# logger.addHandler(handler)
console = logging.StreamHandler()
console.setLevel(logging.INFO)
logger.addHandler(console)


class DataBaseType(Enum):
    MSSQL = 1
    POSTGRESQL = 2
    MYSQL = 3


def get_postgre_conn(conn):
    try:
        conn = psycopg2.connect(**conn)
        return conn
    except Exception:
        raise


def get_mssql_conn(conn):
    try:
        conn = pymssql.connect(**conn)
        return conn
    except Exception:
        raise


def get_mysql_conn(conn):
    try:
        conn = pymysql.connect(**conn)
        return conn
    except Exception:
        raise


def get_db_conn(db_type, conn):
    if db_type.name == 'MSSQL':
        return get_mssql_conn(conn)
    elif db_type.name == 'POSTGRESQL':
        return get_postgre_conn(conn)
    elif db_type.name == 'MYSQL':
        return get_mysql_conn(conn)
    return None


def get_data_from_mssql(source_server, source_server_type, source_select_sql, last_sync_timestamp, read_page_size=50000):
    source_conn = get_db_conn(source_server_type, source_server)
    # for pymssql, execute() does not pull the whole result set;
    # fetchmany() returns only page-size batches to the client
    with source_conn.cursor() as source_cursor:
        source_cursor.execute(source_select_sql.format(last_sync_timestamp))
        while True:
            list_result = source_cursor.fetchmany(read_page_size)
            if list_result:
                yield list_result
            else:
                break


def get_data_from_mysql(source_server, source_server_type, source_select_sql, last_sync_timestamp, read_page_size=50000):
    source_conn = get_db_conn(source_server_type, source_server)
    list_result = []
    # pymysql's execute() fetches the whole result set to the client in one go,
    # so the source query itself must be paged (LIMIT offset, size)
    with source_conn.cursor() as source_cursor:
        current_start_no = 1
        while True:
            source_cursor.execute(source_select_sql.format((current_start_no - 1) * read_page_size, read_page_size))
            result = source_cursor.fetchmany(read_page_size)
            if len(result) > 0:
                current_start_no = current_start_no + 1
                for row in result:
                    list_result.append(row)
                    if len(list_result) == read_page_size:
                        yield list_result
                        list_result.clear()
                if len(list_result) > 0:
                    yield list_result
                    list_result.clear()
            else:
                break


def get_data_from_postgresql(source_server, source_server_type, source_select_sql, last_sync_timestamp, read_batch_size=50000):
    source_conn = get_db_conn(source_server_type, source_server)
    v_last_execute_timestamp = last_sync_timestamp
    list_result = []
    # a default psycopg2 cursor pulls the full result set to the client,
    # so a named (server-side) cursor is used to stream rows in batches
    with source_conn.cursor(name="server_side_cursor") as source_cursor:
        source_cursor.execute(source_select_sql.format(v_last_execute_timestamp))
        for row in source_cursor:
            list_result.append(row)
            if len(list_result) == read_batch_size:
                yield list_result
                list_result.clear()
        if len(list_result) > 0:
            yield list_result


def get_data_from_datasource(source_server, source_server_type, source_select_sql, last_sync_timestamp, read_batch_size):
    if source_server_type == DataBaseType.MYSQL:
        return get_data_from_mysql(source_server, source_server_type, source_select_sql, last_sync_timestamp, read_batch_size)
    if source_server_type == DataBaseType.POSTGRESQL:
        return get_data_from_postgresql(source_server, source_server_type, source_select_sql, last_sync_timestamp, read_batch_size)
    if source_server_type == DataBaseType.MSSQL:
        return get_data_from_mssql(source_server, source_server_type, source_select_sql, last_sync_timestamp, read_batch_size)


def execute_batch_insert2postgresql(target_server_type, target_server, target_insert_statement, target_table_name, col_size, list_datasource, insert_batch_size):
    batch_size_index = 0
    # build a "(%s,%s,...,%s)" placeholder template with col_size parameters
    para_col = '(' + ''.join('%s,' for x in range(col_size))
    para_col = para_col[:-1] + ')'
    conn = get_db_conn(target_server_type, target_server)
    conn.set_session(autocommit=True)
    psycopg2.extras.register_uuid()
    with conn.cursor() as cursor:
        while batch_size_index * insert_batch_size < len(list_datasource):
            try:
                star_pos = batch_size_index * insert_batch_size
                end_pos = (batch_size_index + 1) * insert_batch_size
                # expand the batch into a single multi-row VALUES list
                args_str = ','.join(cursor.mogrify(para_col, x).decode('utf-8') for x in tuple(list_datasource[star_pos:end_pos]))
                cursor.execute(target_insert_statement.format(args_str))
                logger.info(str(datetime.datetime.now()) + ' {0} --->{1} rows batch size finish'.format(
                    target_table_name, len(list_datasource[star_pos:end_pos])))
                batch_size_index = batch_size_index + 1
                conn.commit()
            except Exception as err:
                raise err
    conn.close()


def execute_batch_insert2mysql(target_server_type, target_server, target_insert_statement, target_table_name, col_size, list_datasource, batch_size):
    batch_size_index = 0
    conn = get_db_conn(target_server_type, target_server)
    with conn.cursor() as cursor:
        while batch_size_index * batch_size < len(list_datasource):
            try:
                cursor.executemany(target_insert_statement, list_datasource[batch_size_index * batch_size:(batch_size_index + 1) * batch_size])
                logger.info(str(datetime.datetime.now()) + ' {0} --->{1} rows batch size finish'.format(
                    target_table_name, len(list_datasource[batch_size_index * batch_size:(batch_size_index + 1) * batch_size])))
                batch_size_index = batch_size_index + 1
                conn.commit()
            except Exception as err:
                raise err
    conn.close()


def execute_batch_insert2mssql(target_server_type, target_server, target_insert_statement, target_table_name, col_size, list_datasource, batch_size):
    batch_size_index = 0
    conn = get_db_conn(target_server_type, target_server)
    with conn.cursor() as cursor:
        while batch_size_index * batch_size < len(list_datasource):
            try:
                # pymssql bulk_copy: for MSSQL targets target_insert_statement is expected to be just the
                # target table name, and column_ids lists the ordinal positions of the target columns
                conn.bulk_copy(table_name=target_insert_statement,
                               elements=list_datasource[batch_size_index * batch_size:(batch_size_index + 1) * batch_size],
                               column_ids=list(range(1, col_size + 1)))
                logger.info(str(datetime.datetime.now()) + ' {0} --->{1} rows batch size finish'.format(
                    target_table_name, len(list_datasource[batch_size_index * batch_size:(batch_size_index + 1) * batch_size])))
                batch_size_index = batch_size_index + 1
                conn.commit()
            except Exception as err:
                raise err
    conn.close()


def execute_batch_insert_to_target(target_server, target_server_type, target_insert_statement, target_table_name, col_size, list_data_source, insert_batch_size):
    if target_server_type == DataBaseType.MYSQL:
        execute_batch_insert2mysql(target_server_type, target_server, target_insert_statement, target_table_name, col_size, list_data_source, insert_batch_size)
    elif target_server_type == DataBaseType.POSTGRESQL:
        execute_batch_insert2postgresql(target_server_type, target_server, target_insert_statement, target_table_name, col_size, list_data_source, insert_batch_size)
    elif target_server_type == DataBaseType.MSSQL:
        execute_batch_insert2mssql(target_server_type, target_server, target_insert_statement, target_table_name, col_size, list_data_source, insert_batch_size)


def sync_data_from_source2target(source_server, source_server_type, source_select_sql,
                                 target_server, target_server_type, target_table_name, col_size,
                                 target_insert_statement,
                                 sync_type='FULL', insert_batch_size=5000, read_batch_size=50000):
    v_last_execute_timestamp = datetime.datetime.min
    v_current_timestamp = datetime.datetime.utcnow()
    log_id = None
    if sync_type == 'Delta':
        last_sync_info = get_last_sync_position(target_server, target_server_type, target_table_name)
        log_id = last_sync_info[0]
        v_last_execute_timestamp = last_sync_info[2]
    for list_data_source in get_data_from_datasource(source_server, source_server_type, source_select_sql, v_last_execute_timestamp, read_batch_size):
        execute_batch_insert_to_target(target_server, target_server_type, target_insert_statement, target_table_name, col_size, list_data_source, insert_batch_size)
    if sync_type == 'Delta':
        update_last_sync_position(target_server, log_id, target_server_type, v_current_timestamp)


def get_last_sync_position(conn, db_type, table_name):
    conn = get_db_conn(db_type, conn)
    with conn.cursor() as cursor:
        cursor.execute('''select id,position_type,position_value from data_sync_position_log where table_name = %s ''', [table_name])
        result = cursor.fetchone()
    conn.close()
    return result


def update_last_sync_position(conn, id, db_type, position_value):
    conn = get_db_conn(db_type, conn)
    last_sync_timestamp = None
    with conn.cursor() as cursor:
        cursor.execute('update data_sync_position_log set position_value = %s, last_update = now() where id = %s ', [position_value, id])
        conn.commit()
    conn.close()
    return last_sync_timestamp


def sync_data_from_source2target_worker(para_dict):
    '''
    run one sync task per process for every entry in para_dict
    :param para_dict:
    :return:
    '''
    list_future = []
    with ProcessPoolExecutor(max_workers=10) as executor:
        try:
            # submit() hands each sync task to the process pool and returns immediately without blocking
            for key, val in para_dict.items():
                future = executor.submit(sync_data_from_source2target,
                                         val['source_server'],
                                         val['source_server_type'],
                                         val['source_select_sql'],
                                         val['target_server'],
                                         val['target_server_type'],
                                         val['target_table_name'],
                                         val['col_size'],
                                         val['target_insert_statement'],
                                         val['sync_type'],
                                         val['insert_batch_size'],
                                         val['read_page_size'])
                list_future.append(future)
        except Exception as ex:
            raise Exception('startup process exception: ' + str(ex))
        try:
            # iterate over result() to surface each process's outcome or exception
            for future in list_future:
                future.result()
        except Exception as ex:
            raise Exception('process execute exception: ' + str(ex))


def mysql_2_mysql():
    para_dict = {}
    insert_batch_size = 5000
    read_page_size = 50000
    # mysql to mysql case
    source_conn = {'host': '***.***.***.***', 'database': 'XXX', 'port': ****, 'user': '****', 'password': '******'}
    target_conn = {'host': '***.***.***.***', 'database': 'XXX', 'port': ****, 'user': '****', 'password': '******'}
    para_dict['test_mytable'] = {'source_server': source_conn,
                                 'source_server_type': DataBaseType.MYSQL,
                                 'source_select_sql': ''' SELECT c1,c2,c3,c4,c5 FROM my_table limit {0},{1}; ''',
                                 'target_table_name': 'my_table',
                                 'target_server': target_conn,
                                 'target_server_type': DataBaseType.MYSQL,
                                 # originally meant for auto-generating the insert statement (writing one by
                                 # hand is simple enough); it is still used to build the placeholder list for
                                 # PostgreSQL targets, so it should match the number of columns
                                 'col_size': 5,
                                 'target_insert_statement': '''INSERT INTO my_table (c1,c2,c3,c4,c5) VALUES (%s,%s,%s,%s,%s);''',
                                 'insert_batch_size': insert_batch_size,
                                 'read_page_size': read_page_size,
                                 # 'Delta' (incremental) sync was planned, but updates/deletes on the source
                                 # cannot be located precisely, so in practice only 'FULL' is used
                                 'sync_type': 'FULL'}
    sync_data_from_source2target_worker(para_dict)


if __name__ == "__main__":
    mysql_2_mysql()
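For the (currently unused) Delta mode, get_last_sync_position and update_last_sync_position expect a bookkeeping table named data_sync_position_log on the target server. The script does not include its definition; a minimal sketch in MySQL syntax, with assumed column types, might look like this:

-- hypothetical DDL for the sync-position table referenced by the script (types are assumptions)
CREATE TABLE data_sync_position_log
(
    id             BIGINT AUTO_INCREMENT PRIMARY KEY,
    table_name     VARCHAR(200),
    position_type  VARCHAR(50),
    position_value DATETIME(6),
    last_update    DATETIME(6)
);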
Summary
1. Table-level logical backup and restore, the traditional route: mysqldump in MySQL, bcp export/import in SQL Server, pg_dump in PostgreSQL, and so on.
This approach is generic, works with every database, and export/import performance is decent. The drawbacks are equally clear: it cannot move data between heterogeneous databases and cannot handle more complex cases, such as backing up or exporting the result of a join query into a different kind of database.
2. In MySQL, with innodb_file_per_table enabled, the table's files can be exported with FLUSH TABLES ... FOR EXPORT, copied to the target instance, and attached there with ALTER TABLE my_table IMPORT TABLESPACE;. This only applies to MySQL; the advantage is that it is simple, quick and efficient. PostgreSQL and SQL Server offer no comparable option.
3. Custom code, similar to ETL.
This is the most flexible approach: it works across instances and across table structures (the source can be a result set while the target is a table), as well as between heterogeneous databases, and the configuration is straightforward. With several objects to migrate, the backups/migrations can run concurrently in multiple processes/threads, so both flexibility and throughput are assured. The drawback is that the code has to be written and maintained yourself.
Source: https://www.cnblogs.com/wy123/p/18267491