翼度科技»论坛 编程开发 python 查看内容

Python大批量写入数据(百万级别)的方法

7

主题

7

帖子

21

积分

新手上路

Rank: 1

积分
21
背景

现有一个百万行数据的csv格式文件,需要在两分钟之内存入数据库。

方案

方案一:多线程+协程+异步MySql方案二:多线程+MySql批量插入

代码
  1.     1,先通过pandas读取所有csv数据存入列表。    2,设置N个线程,将一百万数据均分为N份,以start,end传递给线程以切片的方法读取区间数据(建议为16个线程)    3,方案二 线程内以  executemany 方法批量插入所有数据。    4,方案一 线程内使用异步事件循环遍历所有数据异步插入。     5,方案一纯属没事找事型。
复制代码
方案二
  1. import threading

  2. import pandas as pd
  3. import asyncio
  4. import time

  5. import aiomysql
  6. import pymysql

  7. data=[]
  8. error_data=[]

  9. def run(start,end):
  10.     global data
  11.     global error_data
  12.     print("start"+threading.current_thread().name)
  13.     print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
  14.     mysdb = getDb("*", *, "*", "*", "*")
  15.     cursor = mysdb.cursor()
  16.     sql = """insert into *_*_* values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"""
  17.     cursor.executemany(sql,data[start:end])
  18.     mysdb.commit()
  19.     mysdb.close()
  20.     print("end" + threading.current_thread().name)
  21.     print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))

  22. def csv_file_read_use_pd(csvFile):
  23.     csv_result = pd.read_csv(csvFile,encoding="utf-16",sep='\t')
  24.     csv_result = csv_result.fillna(value="None")
  25.     result = csv_result.values.tolist()
  26.     return result

  27. class MyDataBase:
  28.     def __init__(self,host=None,port=None,username=None,password=None,database=None):
  29.         self.db = pymysql.connect(host=host,port=port,user=username,password=password,database=database)
  30.     def close(self):
  31.         self.db.close()

  32. def getDb(host,port,username,password,database):
  33.     MyDb = MyDataBase(host, port, username, password,database)
  34.     return MyDb.db

  35. def main(csvFile):
  36.     global data  #获取全局对象  csv全量数据
  37.     #读取所有的数据   将所有数据均分成   thread_lens   份 分发给  thread_lens  个线程去执行
  38.     thread_lens=20
  39.     csv_result=csv_file_read_use_pd(csvFile)
  40.     day = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
  41.     for item in csv_result:
  42.         item.insert(0,day)

  43.     data=csv_result
  44.     thread_exe_count_list=[]   #线程需要执行的区间
  45.     csv_lens=len(csv_result)
  46.     avg = csv_lens // thread_lens
  47.     remainder=csv_lens % thread_lens
  48.     # 0,27517  27517,55,034
  49.     nowIndex=0
  50.     for i in range(thread_lens):
  51.         temp=[nowIndex,nowIndex+avg]
  52.         nowIndex=nowIndex+avg
  53.         thread_exe_count_list.append(temp)
  54.     thread_exe_count_list[-1:][0][1]+=remainder  #余数分给最后一个线程
  55.     # print(thread_exe_count_list)

  56.     #th(thread_exe_count_list[0][0],thread_exe_count_list[0][1])

  57.     for i in range(thread_lens):
  58.         sub_thread = threading.Thread(target=run,args=(thread_exe_count_list[i][0],thread_exe_count_list[i][1],))
  59.         sub_thread.start()
  60.         sub_thread.join()
  61.         time.sleep(3)

  62. if __name__=="__main__":
  63.     #csv_file_read_use_pd("分公司箱型箱量.csv")
  64.     main("分公司箱型箱量.csv")
复制代码
方案一
  1. import threading

  2. import pandas as pd
  3. import asyncio
  4. import time

  5. import aiomysql

  6. data=[]
  7. error_data=[]

  8. async def async_basic(loop,start,end):
  9.     global data
  10.     global error_data
  11.     print("start"+threading.current_thread().name)
  12.     print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
  13.     conn = await aiomysql.connect(
  14.         host="*",
  15.         port=*,
  16.         user="*",
  17.         password="*",
  18.         db="*",
  19.         loop=loop
  20.     )
  21.     day = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
  22.     sql = """insert into **** values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"""
  23.     async with conn.cursor() as cursor:
  24.         for item in data[start:end]:
  25.             params=[day]
  26.             params.extend(item)
  27.             try:
  28.                 x=await cursor.execute(sql,params)
  29.                 if x==0:
  30.                     error_data.append(item)
  31.                 print(threading.current_thread().name+"   result "+str(x))
  32.             except Exception as e:
  33.                 print(e)
  34.                 error_data.append(item)
  35.                 time.sleep(10)
  36.                 pass
  37.     await conn.close()
  38.     #await conn.commit()
  39.     #关闭连接池
  40.     # pool.close()
  41.     # await pool.wait_closed()
  42.     print("end" + threading.current_thread().name)
  43.     print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))

  44. def csv_file_read_use_pd(csvFile):
  45.     csv_result = pd.read_csv(csvFile,encoding="utf-16",sep='\t')
  46.     csv_result = csv_result.fillna(value="None")
  47.     result = csv_result.values.tolist()
  48.     return result

  49. def th(start,end):
  50.     loop = asyncio.new_event_loop()
  51.     loop.run_until_complete(async_basic(loop,start,end))


  52. def main(csvFile):
  53.     global data  #获取全局对象  csv全量数据
  54.     #读取所有的数据   将所有数据均分成   thread_lens   份 分发给  thread_lens  个线程去执行
  55.     thread_lens=20
  56.     csv_result=csv_file_read_use_pd(csvFile)
  57.     data=csv_result
  58.     thread_exe_count_list=[]   #线程需要执行的区间
  59.     csv_lens=len(csv_result)
  60.     avg = csv_lens // thread_lens
  61.     remainder=csv_lens % thread_lens
  62.     # 0,27517  27517,55,034
  63.     nowIndex=0
  64.     for i in range(thread_lens):
  65.         temp=[nowIndex,nowIndex+avg]
  66.         nowIndex=nowIndex+avg
  67.         thread_exe_count_list.append(temp)
  68.     thread_exe_count_list[-1:][0][1]+=remainder  #余数分给最后一个线程
  69.     print(thread_exe_count_list)

  70.     #th(thread_exe_count_list[0][0],thread_exe_count_list[0][1])

  71.     for i in range(thread_lens):
  72.         sub_thread = threading.Thread(target=th,args=(thread_exe_count_list[i][0],thread_exe_count_list[i][1],))
  73.         sub_thread.start()
  74.         time.sleep(3)

  75. if __name__=="__main__":
  76.     #csv_file_read_use_pd("分公司箱型箱量.csv")
  77.     main("分公司箱型箱量.csv")
复制代码
总结

到此这篇关于Python大批量写入数据的文章就介绍到这了,更多相关Python大批量写入数据内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

来源:https://www.jb51.net/python/292125st5.htm
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作!

举报 回复 使用道具