Python:多进程同步共享全局变量(锁,计数器,原子布尔)

您所在的位置:网站首页 python多进程并发 Python:多进程同步共享全局变量(锁,计数器,原子布尔)

Python:多进程同步共享全局变量(锁,计数器,原子布尔)

2023-08-18 06:59| 来源: 网络整理| 查看: 265

摘要:Python,多进程

多进程变量同步的场景和方法

场景:在使用Python多进程并行时需要在进程间共享变量,这些共享的变量可以更好地控制和把握任务执行的情况,比如查看任务进度,提前停止任务等 方法:在多线程中变量共享在主线程中定义变量,在每个子线程中使用global关键字拿到变量,再配合threading.RLock()在对变量操作时拿到和释放锁(acquire和release)即可,但是在多进程中,变量是放在不同子进程的数据区中,每个进程都是独立的地址空间,所以用一般的方法是不能共享变量的,multiprocessing模块提供了Array,Manager,Value类来定义共享变量,能够实现进程间共享数字,字符串,列表,字典,实例对象的变量共享

共享整数变量

对于整数的多进程共享是常用的场景,比如使用多进程并行任务,需要记录执行日志记录任务进度,实例代码如下

import multiprocessing from multiprocessing import Pool, Lock, Value from utils.logger_utils import logging_ LOGGER = logging_("predict_main", os.path.join(ROOT_PATH, "./logs/details.log")) lock = Lock() Counter = Value('i', 0) ENT_LIST = list(set([line.strip().replace("(", "(").replace(")", ")") for line in open(os.path.join(BASIC_PATH, "data/ent_name_predict.txt"), "r", encoding="utf8").readlines()])) TOTAL = len(ENT_LIST) def get_one_res(data): global TOTAL, lock, Counter res = {} try: ent_name = data res = get_feature(ent_name, PREDICT_DATE) res["updatedate"] = PREDICT_DATE res["uid"] = get_md5(formatted_ent(ent_name)) except Exception as e: LOGGER.error(data + ":错误:" + e.args[0]) finally: with lock: Counter.value += 1 LOGGER.info("执行完成:(%d / %d) 进程号: %d --------------- %s", Counter.value, TOTAL, os.getpid(), data) return res if __name__ == '__main__': pool = Pool(int(get_string("process_num"))) res = pool.map(get_one_res, ENT_LIST) LOGGER.info("全部执行完成,关闭进程池") pool.close() pool.join()

运行查看执行日志

2021-11-18 15:19:15 [predict_main] INFO [42] 执行完成:(1 / 1400) 进程号: 15 --------------- 深圳顺亚投资有限公司 2021-11-18 15:19:16 [predict_main] INFO [42] 执行完成:(2 / 1400) 进程号: 25 --------------- 芜湖新扬投资合伙企业(有限合伙) 2021-11-18 15:19:18 [predict_main] INFO [42] 执行完成:(3 / 1400) 进程号: 24 --------------- 保定隆瑞房地产开发有限公司 2021-11-18 15:19:19 [predict_main] INFO [42] 执行完成:(4 / 1400) 进程号: 11 --------------- 云南俊发凯丰房地产开发有限公司

在全局定义锁和计数器,Value('i', 0)代表定义的共享变量是int类型初始值是0,如果要定义double变量则使用Value('d', 0),相当于java里面的原子变量,在执行函数中调用with上下文在实行完任务后调用Counter.value += 1实现计数+1,最后在进程池中调用执行方法,每个并行的任务在执行完毕会调用锁进行计数器+1,同一时刻只有一个子进程拿到锁实现进程同步,如果不采用锁的方式,在日志中计数器会乱序,但是最终总的值相等

共享布尔变量

这种情况在全局中记录一个布尔变量,每次执行任务前拿到变量判断是否与预期一致,如果执行报错修改变量状态,多用于子进程中任务报错提前结束全部任务全部退出,代码如下

from multiprocessing import Pool, Lock, Manager from ctypes import c_bool import os lock = Lock() ERROR = Manager().Value(c_bool, False) def run(fn): global tests_count, lock, ERROR if not ERROR.value: try: print('执行任务. PID: %d ' % (os.getpid())) 1 / 0 except Exception as e: with lock: ERROR.value = True else: print("子进程报错,任务结束") if __name__ == "__main__": pool = Pool(10) # 80个任务,会运行run()80次,每次传入xrange数组一个元素 pool.map(run, list(range(80))) pool.close() pool.join()

查看执行输出

执行任务. PID: 27374 子进程报错,任务结束 子进程报错,任务结束 子进程报错,任务结束 ... Process finished with exit code 0

初始化一个共享变量为布尔类型为False,每个进程在执行前先拿到共享变量判断是否为False,是则执行任务,否则直接跳过执行。初始化布尔变量使用Manager类实例化后调用Value方法,c_bool是Ctypes下的数据类型,相关类型如下

另一种是在主进程中判断共享变量,调用map_async使得主进程不被子进程阻塞,主进程判断全局变量如果不符合预期直接退出,调用terminate终止线程池

from multiprocessing import Pool, Lock, Manager, Value from ctypes import c_bool import os import time lock = Lock() ERROR = Manager().Value(c_bool, False) COUNTER = Value('i', 0) def run(fn): global tests_count, lock, ERROR try: time.sleep(2) 1 / 0 except: with lock: ERROR.value = True finally: with lock: COUNTER.value += 1 print('执行任务(%d / %d). PID: %d ' % (COUNTER.value, 80, os.getpid())) if __name__ == "__main__": pool = Pool(10) pool.map_async(run, list(range(80))) pool.close() print("主进程判断...") while COUNTER.value != len(list(range(80))): time.sleep(1) if ERROR.value: print("子进程报错,主进程提前退出") pool.terminate() break pool.join()

输出如下,每隔1秒中检查全局变量ERROR,如果变为True主进程终止进程池

主进程判断... 执行任务(1 / 80). PID: 4168 执行任务(2 / 80). PID: 4169 执行任务(3 / 80). PID: 4177 执行任务(4 / 80). PID: 4171 执行任务(5 / 80). PID: 4173 执行任务(6 / 80). PID: 4182 执行任务(7 / 80). PID: 4183 执行任务(8 / 80). PID: 4174 执行任务(9 / 80). PID: 4179 执行任务(10 / 80). PID: 4181 子进程报错,主进程提前退出 Process finished with exit code 0

一个实用的例子是多进程找一个列表中符合要求第一个值,如果找到则退出多进程

from multiprocessing import Pool, Lock, Manager, Value from ctypes import c_bool import os import time lock = Lock() FOUND = Manager().Value(c_bool, False) COUNTER = Value('i', 0) def run(fn): global tests_count, lock, ERROR try: time.sleep(2) res = fn + 1 if res == 10: print("结果是:{}".format(fn)) with lock: FOUND.value = True return fn except Exception as e: print(e) finally: with lock: COUNTER.value += 1 print('执行任务(%d / %d). PID: %d ' % (COUNTER.value, 80, os.getpid())) if __name__ == "__main__": t1 = time.time() pool = Pool(10) pool.map_async(run, list(range(80))) pool.close() print("主进程判断...") while COUNTER.value != len(list(range(80))): time.sleep(1) if FOUND.value: print("已找到结果") pool.terminate() break pool.join() t2 = time.time() print(t2 - t1) 共享字典和数组变量

使用Manager近创建,Manager().dict(),Manager().list(),测试代码如下

from multiprocessing.pool import Pool from multiprocessing import Manager, Lock import time import datetime LOCK = Lock() DICT = Manager().dict() LIST = Manager().list() def job(ent): with LOCK: if len(LIST) < 5: time.sleep(1) LIST.append(ent) else: if len(LIST) and ent


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3