翼度科技»论坛 编程开发 python 查看内容

python multiprocessing库使用记录

7

主题

7

帖子

21

积分

新手上路

Rank: 1

积分
21
python multiprocessing库使用记录

需求是想并行调用形式化分析工具proverif,同时发起对多个query的分析(378个)。实验室有40核心80线程的服务器(双cpu,至强gold 5218R*2)。
观察到单个命令在分析时内存占用不大,且只使用单核心执行,因此考虑同时调用多个命令同时执行分析,加快结果输出。
最底层的逻辑是调用多个命令行语句,和在命令行直接执行proverif语句类似。在python中也就是使用 os.system()函数实现命令调用。然而由于存在如下问题,需要考虑使用多进程multiprocessing库。

  • 如果使用多线程threading库,由于GIL的存在,是否会因为一个进程未执行结束而无法发起新的进程?
  • query数量很大的原因来自于多场景分析,同时对于同一场景下的query也希望可以并行推进,同时分析。
  • query数量大+场景多,得到很多结果,每条分析语句都有各自不同的位置,需要生成大量的命令。
  • 每条query执行完成后会给出分析结果。虽然分析结果会以html文件的形式输出到指定结果文件夹,但是不能对分析结果做统一的分析,仍旧需要逐个阅读。希望能在输出后即时统计,原有输出不变,还能给出分析结果表。
  • 尽管proverif在分析上速度已经很好了,但是仍然有62条query在30000秒(8.3h)后未给出结果。希望能够统计每一条query的运行时间并记录,并能够提供当前仍在执行的query数量。

    • 进一步的,设置最高分析时长上限(如48h),若超出上限则终止分析。
    • 对于一些可达性查询(reachability,实现方法是:在实体执行最后,在公开信道上发送执行完成标记,检查攻击者是否检验实体代码是否正确,以及攻击者是否能够阻止合法实体正常执行程序(如何做?)),会出现构建攻击路径很慢的情况。但是实际上已经给出了goal reachable的结果。对于这种其实无需浪费更多时间,可以把reachability的query添加 set reconstructTrace = false .以提前结束。
    • 对于数量监控,需要多进程读写共享变量;对于运行时间记录,需要多进程读写同一个文件。

mutliprocessing库使用

主要使用multiprocessing.Pool()来创建进程池,当前python进程会创建新的python进程用于执行函数。(win下是子进程,linux下是fork)
由于存在操作系统上的差异,请使用if __name__ == '__main__':来编写主函数,否则可能出现问题。主函数内容如下。
  1. query_num = multiprocessing.Value('i', 0)
  2. def long_time_task(c, ):
  3.     start = time.time()
  4.     os.system(c)
  5.     end = time.time()
  6.     # task_name=...
  7.     with query_num.get_lock():
  8.         query_num.value -= 1
  9.         print('Task %s runs %0.2f seconds. ' % (task_name, (end - start)) + str(query_num.value) + ' left.')
  10.     return 'Task %s runs %0.2f seconds.' % (task_name, (end - start))
  11. def call_back(s):
  12.     with open('/home/dell/proverif/DDS/time.txt', "a+") as file:
  13.         file.writelines(s + '\n')
  14.         
  15. if __name__ == '__main__':
  16.     query_list = extract(path_query, 'query', '.')
  17.     query_file_path_list = query_file(query_list)
  18.     whole_cS = compromise_Scenarios(path_compromise, path_process_whole, work_path)
  19.     MAC_cS = compromise_Scenarios(path_compromise, path_process_MAC, work_path)
  20.     cmd = []
  21.     cmd += (pv_cmd(query_file_path_list, whole_cS, path_result))
  22.     cmd += (pv_cmd(query_file_path_list, MAC_cS, path_result))
  23.     p = Pool(len(cmd))
  24.     query_num.value = len(cmd)
  25.     # for i in cmd:
  26.     #     p.apply_async(long_time_task, args=(i,), callback=call_back)
  27.     results = [p.apply_async(long_time_task, (i,), callback=call_back) for i in cmd]
  28.     print('Waiting for all subprocesses done...')
  29.     output = [result.get(timeout=24*60*60) for result in results]
  30.     # p.close()
  31.     # p.join()
  32.     print('All subprocesses done.')
  33.    
复制代码
主函数前7行为文本处理,其内容不细表。
第8行p = Pool(len(cmd))创建了进程池,其长度为cmd的个数,也就是我们要同时发起这么多个进程。接下来注释掉的循环是常规的多进程发起办法,即使用apply_async函数执行我们要的函数。args是long_time_task的参数,由于需要为Iterable且只有一个参数,因此以元组形式传入。
call_back参数为回调函数,这里很像go语言下的defer,会在函数执行后再执行。回调函数接受long_time_task的返回值作为参数,我们使用这个机制实现多进程写文件。long_time_task在返回后会受到进程池p的调度,依次执行写文件操作,因此避免了同时写引起冲突。
对于剩余的query数量,使用全局变量query_num = multiprocessing.Value('i', 0),这样的变量具有锁,可以供多进程读写。每个query在完成后会将数量减一,输出时间和剩余数量。使用with query_num.get_lock():获得锁,避免读写冲突,并在使用完成后自动释放。
这已经满足了基本需求。还有一个定时终止的功能有待实现。接下来再介绍我不断修改的思路。
多进程定时终止

单进程定时终止
  1. process = multiprocessing.Process(target=long_time_task)
  2. # 启动进程
  3. process.start()
  4. # 设置运行时长上限(48小时)
  5. timeout = 48 * 60 * 60  # 以秒为单位
  6. # 创建定时器,在指定时间后终止进程
  7. timer = multiprocessing.Timer(timeout, process.terminate)
  8. timer.start()
  9. # 等待进程结束
  10. process.join()
复制代码
使用定时器的办法,在一定时间后调用我们创建进程的process.terminate()方法结束进程。但我们需要多进程并行。
多进程定时终止
  1. pool = multiprocessing.Pool()
  2. # 准备要执行函数的参数列表
  3. inputs = [1, 2, 3, 4, 5]
  4. # 执行函数,并设置最大运行时长为30秒
  5. result = pool.map_async(long_time_task, inputs)
  6. # 获取结果,最多等待30秒
  7. output = result.get(timeout=48 * 60 * 60)
复制代码
map_async方法可以将函数应用于可迭代的参数列表,并返回一个AsyncResult对象,可以使用该对象的get方法获取结果。map_async方法将任务提交给进程池后会立即返回,并不会等待所有任务执行完成。如果在get方法获取结果时,其中某些任务仍在执行,将会等待直到超时。get方法拥有timeout参数,超时后会raise TimeoutError,报错终止python程序的运行。因此如果想输出已完成的结果,有两个思路:

  • try-except捕获TimeoutError,并针对处理。
  • 对每个结果都使用get方法并设置超时时间。
列表推导式
  1. results = [p.apply_async(long_time_task, (i,), callback=call_back) for i in cmd]
  2. print('Waiting for all subprocesses done...')
  3. output = [result.get(timeout=24*60*60) for result in results]
  4. # p.close()
  5. # p.join()
  6. print('All subprocesses done.')
复制代码
使用apply_async方法来执行函数,该方法会也会返回一个AsyncResult对象。我们将这些对象放入results数组,接着使用数组中每个元素的结果组成output数组并定义超时时间。这样就可以执行call_back函数了。output内容其实不是很重要,主要是为了使用AsyncResult对象的get方法来设置定时器。
不过这样还是需要try-except捕获TimeoutError,以处理超时未完成的query。这样做比map_async好在哪里?我在使用的时候map_async似乎不能成功调用回调函数,还有待试验。此外,该方法并不能在设定时间时准时停下,例如我设置时间5s,则会在约12秒时才停止。
还有一个问题是,在pycharm里运行脚本时,会有部分进程无法结束。暂不清楚其原因,也不确定命令行下执行脚本是否存在同样的问题。
与Go相比


显著的感觉到python在处理多进程、多线程、并发等问题上有一定的弱点。虽然能够通过一系列操作实现,但是做起来比较吃力,也不算太优雅。现在的脚本已经可以并行分析了。然而在任务管理器中,除了看到了378个proverif进程,还看到了378个sh和378个python
来源:https://www.cnblogs.com/biing/p/17535482.html
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作!

举报 回复 使用道具