|
背景
现有一个百万行数据的csv格式文件,需要在两分钟之内存入数据库。
方案
方案一:多线程+协程+异步MySql方案二:多线程+MySql批量插入
代码
- 1,先通过pandas读取所有csv数据存入列表。 2,设置N个线程,将一百万数据均分为N份,以start,end传递给线程以切片的方法读取区间数据(建议为16个线程) 3,方案二 线程内以 executemany 方法批量插入所有数据。 4,方案一 线程内使用异步事件循环遍历所有数据异步插入。 5,方案一纯属没事找事型。
复制代码 方案二
- import threading
- import pandas as pd
- import asyncio
- import time
- import aiomysql
- import pymysql
- data=[]
- error_data=[]
- def run(start,end):
- global data
- global error_data
- print("start"+threading.current_thread().name)
- print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
- mysdb = getDb("*", *, "*", "*", "*")
- cursor = mysdb.cursor()
- sql = """insert into *_*_* values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"""
- cursor.executemany(sql,data[start:end])
- mysdb.commit()
- mysdb.close()
- print("end" + threading.current_thread().name)
- print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
- def csv_file_read_use_pd(csvFile):
- csv_result = pd.read_csv(csvFile,encoding="utf-16",sep='\t')
- csv_result = csv_result.fillna(value="None")
- result = csv_result.values.tolist()
- return result
- class MyDataBase:
- def __init__(self,host=None,port=None,username=None,password=None,database=None):
- self.db = pymysql.connect(host=host,port=port,user=username,password=password,database=database)
- def close(self):
- self.db.close()
- def getDb(host,port,username,password,database):
- MyDb = MyDataBase(host, port, username, password,database)
- return MyDb.db
- def main(csvFile):
- global data #获取全局对象 csv全量数据
- #读取所有的数据 将所有数据均分成 thread_lens 份 分发给 thread_lens 个线程去执行
- thread_lens=20
- csv_result=csv_file_read_use_pd(csvFile)
- day = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
- for item in csv_result:
- item.insert(0,day)
- data=csv_result
- thread_exe_count_list=[] #线程需要执行的区间
- csv_lens=len(csv_result)
- avg = csv_lens // thread_lens
- remainder=csv_lens % thread_lens
- # 0,27517 27517,55,034
- nowIndex=0
- for i in range(thread_lens):
- temp=[nowIndex,nowIndex+avg]
- nowIndex=nowIndex+avg
- thread_exe_count_list.append(temp)
- thread_exe_count_list[-1:][0][1]+=remainder #余数分给最后一个线程
- # print(thread_exe_count_list)
- #th(thread_exe_count_list[0][0],thread_exe_count_list[0][1])
- for i in range(thread_lens):
- sub_thread = threading.Thread(target=run,args=(thread_exe_count_list[i][0],thread_exe_count_list[i][1],))
- sub_thread.start()
- sub_thread.join()
- time.sleep(3)
- if __name__=="__main__":
- #csv_file_read_use_pd("分公司箱型箱量.csv")
- main("分公司箱型箱量.csv")
复制代码 方案一
- import threading
- import pandas as pd
- import asyncio
- import time
- import aiomysql
- data=[]
- error_data=[]
- async def async_basic(loop,start,end):
- global data
- global error_data
- print("start"+threading.current_thread().name)
- print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
- conn = await aiomysql.connect(
- host="*",
- port=*,
- user="*",
- password="*",
- db="*",
- loop=loop
- )
- day = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
- sql = """insert into **** values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"""
- async with conn.cursor() as cursor:
- for item in data[start:end]:
- params=[day]
- params.extend(item)
- try:
- x=await cursor.execute(sql,params)
- if x==0:
- error_data.append(item)
- print(threading.current_thread().name+" result "+str(x))
- except Exception as e:
- print(e)
- error_data.append(item)
- time.sleep(10)
- pass
- await conn.close()
- #await conn.commit()
- #关闭连接池
- # pool.close()
- # await pool.wait_closed()
- print("end" + threading.current_thread().name)
- print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
- def csv_file_read_use_pd(csvFile):
- csv_result = pd.read_csv(csvFile,encoding="utf-16",sep='\t')
- csv_result = csv_result.fillna(value="None")
- result = csv_result.values.tolist()
- return result
- def th(start,end):
- loop = asyncio.new_event_loop()
- loop.run_until_complete(async_basic(loop,start,end))
- def main(csvFile):
- global data #获取全局对象 csv全量数据
- #读取所有的数据 将所有数据均分成 thread_lens 份 分发给 thread_lens 个线程去执行
- thread_lens=20
- csv_result=csv_file_read_use_pd(csvFile)
- data=csv_result
- thread_exe_count_list=[] #线程需要执行的区间
- csv_lens=len(csv_result)
- avg = csv_lens // thread_lens
- remainder=csv_lens % thread_lens
- # 0,27517 27517,55,034
- nowIndex=0
- for i in range(thread_lens):
- temp=[nowIndex,nowIndex+avg]
- nowIndex=nowIndex+avg
- thread_exe_count_list.append(temp)
- thread_exe_count_list[-1:][0][1]+=remainder #余数分给最后一个线程
- print(thread_exe_count_list)
- #th(thread_exe_count_list[0][0],thread_exe_count_list[0][1])
- for i in range(thread_lens):
- sub_thread = threading.Thread(target=th,args=(thread_exe_count_list[i][0],thread_exe_count_list[i][1],))
- sub_thread.start()
- time.sleep(3)
- if __name__=="__main__":
- #csv_file_read_use_pd("分公司箱型箱量.csv")
- main("分公司箱型箱量.csv")
复制代码 总结
到此这篇关于Python大批量写入数据的文章就介绍到这了,更多相关Python大批量写入数据内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
来源:https://www.jb51.net/python/292125st5.htm
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作! |
|