python多进程和多线程 python的多进程和多线程

您所在的位置:网站首页 邓文捷图片 python多进程和多线程 python的多进程和多线程

python多进程和多线程 python的多进程和多线程

2023-06-28 00:30| 来源: 网络整理| 查看: 265

python多线程和多进程进程和线程多线程线程的基本状态自定义线程执行类守护线程join()方法主线程等待子线程结束线程访问全局变量线程的安全问题同步互斥锁用锁解决买票的问题上锁过程定时调度线程操作队列生产者消费者进程方法说明Process创建的实例对象的常用属性:创建进程自定义进程处理类进程控制join()进程中断 terminate()fork创建子进程进程间不能共享全局变量线程和进程的异同功能区别优缺点进程池Pool进程队列进程间通信-Queue

进程和线程

进程:是正在运行的程序 是系统进行资源分配和调用的独立单位 每一个进程都有它自己的内存空间和系统资源

线程:是进程中的单个顺序控制流,是一条执行路径 线程也叫轻量级进程,是操作系统能够进行运算调度的最小单位 单线程:一个进程如果只有一条执行路径,则称为单线程程序 多线程:一个进程如果有多条执行路径,则称为多线程程序

Python 提供多线程编程的模块有以下几个: _thread threading Queue multiprocessing

多线程_thread 模块提供了低级别的基本功能来支持多线程功能,提供简单的锁来确保同步,推荐使用 threading 模块。threading 模块对 _thread 进行了封装,提供了更高级别,功能更强,更易于使用的线程管理的功能,对线程的支持更为完善,绝大多数情况下,只需要使用 threading 这个高级模块就够了线程的基本状态

python多进程和多线程 python的多进程和多线程_python

threading的方法

方法

说明

init(self,group=None,target=None,name=None,args=(),kwargs=None,*,daemon=None)

创建一个线程对象,参数作用见下面

start()

启动线程活动

run()

线程操作主体,若没有target处理函数,则执行此方法

join(timeout=None)

线程强制执行

setName()

设置线程名

getName()

返回线程名

isAlive()

判断线程是否存活

setDaemon(daemonic)

设置是否为守护线程

__init__参数作用如下

group:定义分组 target:线程处理对象 name:线程名称,若不设置,则自动分配一个名称 args:线程处理对象所需要执行参数 kwargs:调用字典对象 daemon:是否设置为后台线程(守护线程)

注意run()方法不能直接启动进程,进程的启动必须要使用start()而star()方法会自动调用run()方法

import threading def test(): for i in range(101): print(threading.current_thread().getName()+' : {}'.format(i)) t1=threading.Thread(target=test,name='高铁') t2=threading.Thread(target=test,name='飞机') t1.start() t2.start()

如果要传递带有参数的

import threading def test(n): for i in range(n): print(threading.current_thread().getName()+' : {}'.format(i)) #如果要传递带有参数数的,必须是元组 t1=threading.Thread(target=test,name='高铁',args=(100,)) t2=threading.Thread(target=test,name='飞机',args=(100,)) t1.start() t2.start()自定义线程执行类import threading class Mythread(threading.Thread): def __init__(self,name): super(Mythread, self).__init__() self.name=name def run(self): for i in range(101): print(threading.current_thread().getName() + ' : {}'.format(i)) t1=Mythread('高铁') t2=Mythread('飞机') t1.start() t2.start()守护线程

这里使用setDaemon(True)把所有的子线程都变成了主线程的守护线程,因此当主线程结束后,子线程也会随之结束,所以当主线程结束后,整个程序就退出了。

所谓’线程守护’,就是主线程不管该线程的执行情况,只要是其他子线程结束且主线程执行完毕,主线程都会关闭。也就是说:主线程不等待该守护线程的执行完再去关闭。

关于这个方法也举一个形象的例子,这里有三个线程,刘备(主线程),关羽,张飞,把关羽和张飞设置为守护线程,当刘备死了,也就是主进程结束了,那么关羽和张飞也会马上死亡,因为当年他们桃园三结义的时候,说了不求同年同月同日生,但求同年同月同日死

import threading def test(n): for i in range(n): print(threading.current_thread().getName()+' : {}'.format(i)) #如果要传递带有参数数的,必须是元组 t1=threading.Thread(target=test,name='关羽',args=(100,),daemon=True) t2=threading.Thread(target=test,name='张飞',args=(100,),daemon=True) #也可以调用setDaemon()设置守护线程 #把子线程设置为守护线程,必须在start()之前设置 threading.current_thread().setName('刘备') for i in range(11): print(threading.current_thread().getName() + ' : {}'.format(i)) t1.start() t2.start()

注意:设置守护线程一定要在start()前面

join()方法

线程强制执行 join方法可以用这个形象的例子来理解,例如我现在有三个线程,名字分别为康熙,八阿哥,和四阿哥,他们3争夺皇位,但是康熙是他们爸爸啊,所以,只有康熙驾崩了,八阿哥和四阿哥才能争夺皇位,所以就给康熙添加join方法,只有等康熙这个线程结束了,八阿哥和四阿哥这两个线程才会开始

import threading def test(n): for i in range(n): print(threading.current_thread().getName()+' : {}'.format(i)) #如果要传递带有参数数的,必须是元组 t1=threading.Thread(target=test,name='康熙',args=(100,)) t2=threading.Thread(target=test,name='四阿哥',args=(100,)) t3=threading.Thread(target=test,name='八阿哥',args=(100,)) t1.start() #必须线程启动后在调用join方法 t1.join() t2.start() t3.start()

注意一定要线程先启动了才能调用join()方法

主线程等待子线程结束

为了让守护线程执行结束之后,主线程再结束,我们可以使用join方法,让主线程等待子线程执行

import threading def test(n): for i in range(n): print(threading.current_thread().getName()+' : {}'.format(i)) #如果要传递带有参数数的,必须是元组 t1=threading.Thread(target=test,name='关羽',args=(100,),daemon=True) t2=threading.Thread(target=test,name='张飞',args=(100,),daemon=True) #也可以调用setDaemon()设置守护线程 threading.current_thread().setName('刘备') for i in range(11): print(threading.current_thread().getName() + ' : {}'.format(i)) t1.start() t1.join()#设置主线程等待子线程结束 t2.start()线程访问全局变量import threading num=0 def test(): global num num+=1 print(num) t1=threading.Thread(target=test) t2=threading.Thread(target=test) t1.start() t2.start() #1 #2

在一个进程内的所有线程共享全局变量,很方便在多个线程间共享数据。缺点就是,线程是对全局变量随意遂改可能造成多线程之间对全局变量的混乱(即线程非安全)

线程的安全问题

买票的例子

import threading,time ticket=100 def test(): global ticket while True: if ticket > 0: time.sleep(0.4) ticket -= 1 print('{}卖了一张票,还剩{}'.format(threading.current_thread().name, ticket)) else: print('{}票卖完了'.format(threading.current_thread().name)) break t1=threading.Thread(target=test,name='窗口一') t2=threading.Thread(target=test,name='窗口二') t1.start() t2.start()

卖票出现了问题 相同的票出现了多次 出现了负数的票

问题产生原因 线程执行的随机性导致的

同步

当多个线程几乎同时修改某一个共享数据的时候,需要进行同步控制。同步就是协同步调,按预定的先后次序进行运行。线程同步能够保证多个线程安全访问竞争资源,最简单的同步机制是引入互斥锁

互斥锁

互斥锁为资源引入一个状态:锁定/非锁定

某个线程要更改共享数据时,先将其锁定,此时资源的状态为“锁定”,其他线程不能更改;直到该线程释放资源,将资源的状态变成“非锁定”,其他的线程才能再次锁定该资源。互斥锁保证了每次只有一个线程进行写入操作,从而保证了多线程情况下数据的正确性。

threading模块中定义了Lock类,可以方便的处理锁定:

# 创建锁 mylock = threading.Lock() # 锁定 mylock .acquire() # 释放 mylock .release()

注意: 如果这个锁之前是没有上锁的,那么acquire不会堵塞 如果在调用acquire对这个锁上锁之前 它已经被 其他线程上了锁,那么此时acquire会堵塞,直到这个锁被解锁为止。

和文件操作一样,Lock也可以使用with语句快速的实现打开和关闭操作

用锁解决买票的问题import threading,time ticket=100 my_lock = threading.Lock() def test(): global ticket while True: my_lock.acquire() if ticket > 0: time.sleep(0.5) ticket -= 1 my_lock.release() print('{}卖了一张票,还剩{}'.format(threading.current_thread().name, ticket)) else: print('{}票卖完了'.format(threading.current_thread().name)) my_lock.release() break t1=threading.Thread(target=test,name='窗口一') t2=threading.Thread(target=test,name='窗口二') t1.start() t2.start()上锁过程当一个线程调用锁的acquire()方法获得锁时,锁就进入“locked”状态。 每次只有一个线程可以获得锁。如果此时另一个线程试图获得这个锁,该线程就会变 为“blocked”状态,称为“阻塞”,直到拥有锁的线程调用锁的release()方法释放锁之 后,锁进入“unlocked”状态。 线程调度程序从处于同步阻塞状态的线程中选择一个来获得锁,并使得该线程进入运 行(running)状态

锁的好处:

确保了某段关键代码只能由一个线程从头到尾完整地执行 锁的坏处:

阻止了多线程并发执行,包含锁的某段代码实际上只能以单线程模式执行,效率就大大地下降了。 由于可以存在多个锁,不同的线程持有不同的锁,并试图获取对方持有的锁时,可能会造成死锁

定时调度

定时调度是指可以根据设定的时间安排自动执行程序的任务,python提供了sched模块来实现定时调度,sched模块采用进程模式实现调度的处理

通过调用scheduler.enter(delay,priority,func,args)函数 delay 调度任务启动的延时,如果设置为0,表示立即启动 priority 多个调度任务的执行优先级,优先级越高越有可能被执行 func 设置任务调度处理函数 args 调度处理函数的参数(必须为可迭代对象)

import sched,threading,time def event_handel(schedule): print('时间为:{}'.format(time.time())) schedule.enter(delay=1,priority=0,action=event_handel,argument=(schedule,)) schedule=sched.scheduler() schedule.enter(delay=1,priority=0,action=event_handel,argument=(schedule,)) schedule.run()

注意每当使用emter()方法设置调度任务时,该任务会执行一次,所以需要在调度处理函数中反复使用enter()方法才可以实现任务的操作

线程操作队列

数据缓冲区的实现可以依靠queue模块实现,此模块提供了3种线程同步队列的定义 queue.Queue :先进先出(FIFO)同步队列 queue.LifoQueue :后进先出同步队列 queue.PriorityQueue:优先队列

Queue是一个先进先出(First In First Out)的队列,主进程中创建一个Queue对象,并作为参数传入子进程,两者之间通过put( )放入数据,通过get( )取出数据,执行了get( )函数之后队列中的数据会被同时删除,可以使用multiprocessing模块的Queue实现多进程之间的数据传递

方法名

说明

init(self,maxsize=0)

实例化队列并设置最大保存长度

put(item,block=True,timeout=None)

向队列保存数据

get(block=True,timeout=None)

从队列中获取数据

qsize()

返回队列大小

empty()

判断队列是否为空,队列为空返回True,否则返回false

full()

判断队列是否满,队列为空返回false,否则返回True

join()

强制等待队列为空后再执行后续操作

生产者消费者import threading,time from queue import Queue def producer(queue): for i in range(100): print('{}生产了第{}瓶牛奶'.format(threading.current_thread().getName(),i)) queue.put(i) time.sleep(0.5) return def consumer(queue): for i in range(100): val=queue.get() print('{}消费了第{}瓶牛奶'.format(threading.current_thread().getName(),i)) time.sleep(0.3) if not val: return if __name__ == '__main__': queue=Queue() t1=threading.Thread(target=consumer,name='顾客一',args=(queue,)) t2 = threading.Thread(target=consumer, name='顾客二', args=(queue,)) t3 = threading.Thread(target=producer, name='超市一', args=(queue,)) t4 = threading.Thread(target=producer, name='超市二', args=(queue,)) t1.start() t2.start() t3.start() t4.start()进程

进程:一个程序运行起来后,代码+用到的资源 称之为进程,它是操作系统分配资源的基本单元。

不仅可以通过线程完成多任务,进程也是可以的。

进程的状态 工作中,任务数往往大于cpu的核数,即一定有一些任务正在执行,而另外一些任务在等待cpu进行执行,因此导致了有了不同的状态。

就绪态:运行的条件都已经满足,正在等在cpu执行。 执行态:cpu正在执行其功能。 等待态:等待某些条件满足,例如一个程序sleep了,此时就处于等待态。

方法说明

方法名

说明

Process( target [, name [, args [, kwargs]]])

创建一个进程,参数如下

start()

启动子进程实例(创建子进程)

is_alive()

判断进程子进程是否还在活着

join([timeout])

是否等待子进程执行结束,或等待多少秒

terminate()

不管任务是否完成,立即终止子进程

Process创建的实例对象的常用属性:

name:当前进程的别名,默认为Process-N,N为从1开始递增的整数 pid:当前进程的pid(进程号)

创建进程

multiprocessing模块就是跨平台版本的多进程模块,提供了一个Process类来代表一个进程对象,这个对象可以理解为是一个独立的进程,可以执行另外的事情

from multiprocessing import Process import time def test(): while True: print('dyk666') time.sleep(1) if __name__ == '__main__': p = Process(target=test) p.start() while True: print('666') time.sleep(2)

创建子进程时,只需要传入一个执行函数和函数的参数,创建一个Process实例,用start()方法启动

自定义进程处理类from multiprocessing import Process import time class My_process(Process): def __init__(self): super(My_process, self).__init__() def run(self): while True: print('dyk666') time.sleep(1) if __name__ == '__main__': p = My_process() p.start() while True: print('666') time.sleep(2)进程控制join()

所有进程对象通过方法启动之后都将进入到进程等待队列,如果此时某个进程需要优先执行,则可以通过join()方法控制

from multiprocessing import Process import multiprocessing import time def test(): while True: time.sleep(5) print(multiprocessing.current_process().name+': dyk666') if __name__ == '__main__': p = Process(target=test) p.start() p.join() while True: print(multiprocessing.current_process().name+': 666') time.sleep(1) #Process-1: dyk666 #Process-1: dyk666 ... ... .进程中断 terminate()from multiprocessing import Process import multiprocessing import time def test(): while True: time.sleep(5) print(multiprocessing.current_process().name+': dyk666') if __name__ == '__main__': p = Process(target=test) p.start() if p.is_alive(): print('进程被中断') p.terminate() # 进程被中断fork创建子进程

multiprocessing提供的是一个跨平台的多进程解决方案,而linux里面提供了一个fork函数,利用此函数 可以创建子进程,fork()函数的本质就是克隆父线程,这样就会实现两个进程异步执行

python通过os.fork()函数实现fork系统函数的调用,该函数有三种返回结果, 返回值小于0 子进程创建失败 返回值等于0 在子进程中 返回值大于0 在父进程中,返回的是子进程的pid

import multiprocessing,os def child(): print('[子进程]:父进程id: {} ,子进程id{}'.format((os.getgid(),os.getppid()))) if __name__ == '__main__': new=os.fork() print('[fork]新的子进程id:{}'.format(new)) if new==0: child() elif new>0: print('父进程在执行') else: print('子进程创建失败')

注意windos里面不支持fork()函数,然后就是fork()一定不要放在循环里面创建子进程

进程间不能共享全局变量from multiprocessing import Process import time num=0 def test(): global num num+=1 print(num) if __name__ == '__main__': p1 = Process(target=test) p2 = Process(target=test) p1.start() p2.start() # 1 # 1线程和进程的异同功能

进程,能够完成多任务,比如 在一台电脑上能够同时运行多个QQ。 线程,能够完成多任务,比如 一个QQ中的多个聊天窗口。

区别

一个程序至少有一个进程,一个进程至少有一个线程. 线程的划分尺度小于进程(资源比进程少),使得多线程程序的并发性高。 进程在执行过程中拥有独立的内存单元,而多个线程共享内存,从而极大地提高了程序的运行效率 线线程不能够独立执行,必须依存在进程中 可以将进程理解为工厂中的一条流水线,而其中的线程就是这个流水线上的工人

优缺点

线程和进程在使用上各有优缺点:线程执行开销小,但不利于资源的管理和保护;而进程正相反

进程池Pool

方法名

说明

apply(self,func,args=(),kwds={})

采用阻塞模式创建进程并接收返回结果

apply_async(func[,args[,kwds[,callback]]])

采用非阻塞模式创建进程,并且接收工作函数返回结果

apply_async(self,func,args=(),kwds={})

采用非阻塞模式进行数据处理

map_async(self,func,iterable)

采用非阻塞模式进行数据处理

close()

关闭进程池,不在接收新进程

terminate()

中断进程

join()

进程强制执行

如果要启动大量的子进程,可以用进程池的方式批量创建子进程

import time from multiprocessing import * def task(n): print('{}----->start'.format(n)) time.sleep(1) print('{}------>end'.format(n)) if __name__ == '__main__': p = Pool(8) # 创建进程池,并指定线程池的个数,默认是CPU的核数 for i in range(1, 11): p.apply(task, args=(i,)) # 同步执行任务,一个一个的执行任务,没有并发效果 #p.apply_async(task, args=(i,)) # 异步执行任务,可以达到并发效果 p.close() p.join()

同步进程池获取任务的执行结果:

def task(n): print('{}----->start'.format(n)) time.sleep(1) print('{}------>end'.format(n)) return n ** 2 if __name__ == '__main__': p = Pool(4) for i in range(1, 11): res = p.apply_async(task, args=(i,)) # res 是任务的执行结果 print(res.get()) # 直接获取结果的弊端是,多任务又变成同步的了 p.close() # p.join() 不需要再join了,因为 res.get()本身就是一个阻塞方法

异步获取线程的执行结果:

import time from multiprocessing.pool import Pool def task(n): print('{}----->start'.format(n)) time.sleep(1) print('{}------>end'.format(n)) return n ** 2 if __name__ == '__main__': p = Pool(4) res_list = [] for i in range(1, 11): res = p.apply_async(task, args=(i,)) res_list.append(res) # 使用列表来保存进程执行结果 for re in res_list: print(re.get()) p.close()进程队列

multiprocessing.Queue是多进程编程中提供的进程队列,该队列采用FIFO的形式实现不同进程间的通信,这样可以保证多个数据按顺序的发送与接收

方法名

说明

init(self,maxsize=0,*,ctx)

实例化队列并设置最大保存长度

put(item,block=True,timeout=None)

向队列保存数据,block为队列满时阻塞设置,timeout为阻塞超时时间设置

get(block=True,timeout=None)

从队列中获取数据

qsize()

返回队列大小

empty()

判断队列是否为空,队列为空返回True,否则返回false

full()

判断队列是否满,队列为空返回false,否则返回True

join()

强制等待队列为空后再执行后续操作

如果block使用默认值,且没有设置timeout(单位秒),消息列队如果为空,此时程序将被阻塞(停在读取状态),直到从消息列队读到消息为止,如果设置了timeout,则会等待timeout秒,若还没读取到任何消息,则抛出"Queue.Empty"异常;

2)如果block值为False,消息列队如果为空,则会立刻抛出"Queue.Empty"异常;

Queue.get_nowait():相当Queue.get(False); Queue.put(item,[block[, timeout]]):将item消息写入队列,block默认值为True; 1)如果block使用默认值,且没有设置timeout(单位秒),消息列队如果已经没有空间可写入,此时程序将被阻塞(停在写入状态),直到从消息列队腾出空间为止,如果设置了timeout,则会等待timeout秒,若还没空间,则抛出"Queue.Full"异常;

2)如果block值为False,消息列队如果没有空间可写入,则会立刻抛出"Queue.Full"异常;

Queue.put_nowait(item):相当Queue.put(item, False);

进程间通信-Queue



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3