一文吃透python多线程(全面总结)

您所在的位置:网站首页 python多线程机制 一文吃透python多线程(全面总结)

一文吃透python多线程(全面总结)

2023-12-16 08:56| 来源: 网络整理| 查看: 265

目录 1 创建线程1.1 函数创建1.2 类创建 2 线程守护2.1 deamon2.2 join 3 线程锁3.1 Lock3.2 死锁3.3 Rlock 4 线程通信4.1 Condition4.2 Semaphore4.3 Event4.4 Queue 5 线程池5.1 实例5.2 as_completed5.3 map5.4 wait

1 创建线程

在Python中创建线程主要依靠内置的threading模块。

threading.current_thread():获取到当前线程。

获取线程后可以得到两个比较重要的属性:name和ident,分别是线程名称和id。

创建线程可以使用两种方法:使用函数或类创建。

1.1 函数创建

使用函数创建线程时,使用threading.Thread()函数,把线程里要执行的函数传进去。

import os import time import threading def fun(n): start = time.time() my_thread_name = threading.current_thread().name # 获取当前线程名称 print('%s开始运行...' % my_thread_name) time.sleep(n) my_thread_id = threading.current_thread().ident # 获取当前线程id print('当前线程为:{},线程id为:{},所在进程为:{}'.format(my_thread_name, my_thread_id, os.getpid())) print('%s线程运行结束,耗时%ds...' % (my_thread_name, time.time() - start)) t1 = time.time() # 创建3个线程 for i in range(1, 4): t = threading.Thread(target=fun, name='线程%s' % i, args=(i,)) t.start() main_thread_name = threading.current_thread().name # 获取当前线程名称 main_thread_id = threading.current_thread().ident # 获取当前线程id print('主线程为:{},线程id为:{},所在进程为:{}'.format(main_thread_name, main_thread_id, os.getpid())) print("一共耗时%ds" % (time.time() - t1)) # 线程1开始运行... # 线程2开始运行... # 线程3开始运行... # 主线程为:MainThread,线程id为:8637730304,所在进程为:19493 # 一共耗时0s # 当前线程为:线程1,线程id为:13005955072,所在进程为:19493 # 线程1线程运行结束,耗时1s... # 当前线程为:线程2,线程id为:13022744576,所在进程为:19493 # 线程2线程运行结束,耗时2s... # 当前线程为:线程3,线程id为:13039534080,所在进程为:19493 # 线程3线程运行结束,耗时3s...

可以看到,我们开了三个子线程,分别执行1s,2s,3s,但是为什么我们一共耗时是0秒呢。因为创建子线程后,主线程的代码还在继续向后执行,可以看到当主线程结束后,子线程还在继续执行。后面将学习如何让主线程等待子线程。

1.2 类创建

使用类创建线程需要继承Thread,并实现run方法。

import os import time import threading class MyThread(threading.Thread): def __init__(self, n, name=None): super().__init__() self.name = name self.n = n def run(self): start = time.time() my_thread_name = threading.current_thread().name # 获取当前线程名称 print('%s开始运行...' % my_thread_name) time.sleep(self.n) my_thread_id = threading.current_thread().ident # 获取当前线程id print('当前线程为:{},线程id为:{},所在进程为:{}'.format(my_thread_name, my_thread_id, os.getpid())) print('%s线程运行结束,耗时%ds...' % (my_thread_name, time.time() - start)) t1 = time.time() # 创建3个线程 for i in range(1, 4): t = MyThread(name='线程%d' % i, n=i) t.start() main_thread_name = threading.current_thread().name # 获取当前线程名称 main_thread_id = threading.current_thread().ident # 获取当前线程id print('主线程为:{},线程id为:{},所在进程为:{}'.format(main_thread_name, main_thread_id, os.getpid())) print("一共耗时%ds" % (time.time() - t1)) # 线程1开始运行... # 线程2开始运行... # 线程3开始运行... # 主线程为:MainThread,线程id为:8678927872,所在进程为:20587 # 一共耗时0s # 当前线程为:线程1,线程id为:13089185792,所在进程为:20587 # 线程1线程运行结束,耗时1s... # 当前线程为:线程2,线程id为:13105975296,所在进程为:20587 # 线程2线程运行结束,耗时2s... # 当前线程为:线程3,线程id为:13122764800,所在进程为:20587 # 线程3线程运行结束,耗时3s... 2 线程守护 2.1 deamon

Thread类有一个名为deamon的属性,标志该线程是否为守护线程,默认值为False。

当deamon值为True,即设为守护线程后,只要主线程结束了,无论子线程代码是否结束,都得跟着结束。

修改deamon的值必须在线程start()方法调用之前,否则会报错。

上面的例子可以看到,主线程结束后,子线程还在运行,直到子线程结束,下面我们使用守护线程:

import os import time import threading def fun(n): start = time.time() my_thread_name = threading.current_thread().name # 获取当前线程名称 print('%s开始运行...' % my_thread_name) time.sleep(n) my_thread_id = threading.current_thread().ident # 获取当前线程id print('当前线程为:{},线程id为:{},所在进程为:{}'.format(my_thread_name, my_thread_id, os.getpid())) print('%s线程运行结束,耗时%ds...' % (my_thread_name, time.time() - start)) t1 = time.time() for i in range(1, 4): t = threading.Thread(target=fun, name='线程%s' % i, args=(i,)) t.daemon = True t.start() main_thread_name = threading.current_thread().name # 获取当前线程名称 main_thread_id = threading.current_thread().ident # 获取当前线程id # 等待1秒,让线程1结束 time.sleep(1) print('主线程为:{},线程id为:{},所在进程为:{}'.format(main_thread_name, main_thread_id, os.getpid())) print("一共耗时%ds" % (time.time() - t1)) # 线程1开始运行... # 线程2开始运行... # 线程3开始运行... # 当前线程为:线程1,线程id为:13072445440,所在进程为:21178 # 线程1线程运行结束,耗时1s... # 主线程为:MainThread,线程id为:8670285312,所在进程为:21178 # 一共耗时1s

可以看到,让主线程阻塞1s,只有线程1运行结束了,线程2和线程3还没运行结束就随着主线程结束而结束了。

2.2 join

设置子线程join后,主线程会阻塞等待子进程完成,再执行join后面的代码

import os import time import threading def fun(n): start = time.time() my_thread_name = threading.current_thread().name # 获取当前线程名称 print('%s开始运行...' % my_thread_name) time.sleep(n) my_thread_id = threading.current_thread().ident # 获取当前线程id print('当前线程为:{},线程id为:{},所在进程为:{}'.format(my_thread_name, my_thread_id, os.getpid())) print('%s线程运行结束,耗时%ds...' % (my_thread_name, time.time() - start)) t1 = time.time() t_list = [] for i in range(1, 4): t = threading.Thread(target=fun, name='线程%s' % i, args=(i,)) t.daemon = True t.start() t_list.append(t) for t in t_list: t.join() main_thread_name = threading.current_thread().name # 获取当前线程名称 main_thread_id = threading.current_thread().ident # 获取当前线程id time.sleep(1) print('主线程为:{},线程id为:{},所在进程为:{}'.format(main_thread_name, main_thread_id, os.getpid())) print("一共耗时%ds" % (time.time() - t1)) # 线程1开始运行... # 线程2开始运行... # 线程3开始运行... # 当前线程为:线程1,线程id为:12953964544,所在进程为:23305 # 线程1线程运行结束,耗时1s... # 当前线程为:线程2,线程id为:12970754048,所在进程为:23305 # 线程2线程运行结束,耗时2s... # 当前线程为:线程3,线程id为:12987543552,所在进程为:23305 # 线程3线程运行结束,耗时3s... # 主线程为:MainThread,线程id为:8610835968,所在进程为:23305 # 一共耗时4s

注意join要在start之后,且不要和start在一个循环里,因为主线程遇到join就会阻塞,这样第一个循环创建线程1并start和join后,主线程开始阻塞,等线程1执行完毕才会下一个循环,才开始创建线程2,这样达不到并发的效果了。

从上面的结果可以看出,设置子线程join后,主线程就会等待子线程运行完毕,才开始执行join后面的代码,这样一共耗时4s。

3 线程锁

多线程一个很大的问题是数据不安全,因为线程之间的数据是共享的。

以银行转账为例,一个银行的总额是大家一同影响的,创建多个线程(表示多个人进行取钱)共同使用一个变量,如下:

import random import time import threading money = 1000 def fun(n): global money my_thread_name = threading.current_thread().name # 获取当前线程名称 print('%s开始取钱...' % my_thread_name) num = money # 开始查询银行有多少钱 num -= n # 取出钱 time.sleep(random.random()) money = num # 计算取出后银行还要多少钱 print('%s取了%d,还剩%d' % (my_thread_name, n, money)) t_list = [] for i in range(1, 4): t = threading.Thread(target=fun, name='线程%s' % i, args=(100,)) t.daemon = True t.start() t_list.append(t) for t in t_list: t.join() print("全部取钱结束,银行还剩%d" % money) # 线程1开始取钱... # 线程2开始取钱... # 线程3开始取钱... # 线程3取了100,还剩900 # 线程2取了100,还剩900 # 线程1取了100,还剩900 # 全部取钱结束,银行还剩900

可以看到,是那个人同时取钱时,读取的银行都是1000块,但是三个人取完后还是900,这就是因为数据共享的原因。

3.1 Lock

线程同步能够保证多个线程安全访问竞争资源,最简单的同步机制是引入互斥锁。互斥锁为资源设置一个状态:锁定和非锁定。某个线程要更改共享数据时,先将其锁定,此时资源的状态为“锁定”,其他线程不能更改;直到该线程释放资源,将资源的状态变成“非锁定”,其他的线程才能再次锁定该资源。互斥锁保证了每次只有一个线程进行写入操作,从而保证了多线程情况下数据的正确性。

import random import time import threading money = 1000 def fun(n, lock): lock.acquire() global money my_thread_name = threading.current_thread().name # 获取当前线程名称 print('%s开始取钱...' % my_thread_name) num = money # 开始查询银行有多少钱 num -= n # 取出钱 time.sleep(random.random()) money = num # 计算取出后银行还要多少钱 print('%s取了%d,还剩%d' % (my_thread_name, n, money)) lock.release() t_list = [] lock = threading.Lock() for i in range(1, 4): t = threading.Thread(target=fun, name='线程%s' % i, args=(100, lock)) t.daemon = True t.start() t_list.append(t) for t in t_list: t.join() print("全部取钱结束,银行还剩%d" % money) # 线程1开始取钱... # 线程1取了100,还剩900 # 线程2开始取钱... # 线程2取了100,还剩800 # 线程3开始取钱... # 线程3取了100,还剩700 # 全部取钱结束,银行还剩700

使用了锁之后,代码运行速度明显降低,这是因为线程由原来的并发执行变成了串行,不过数据安全性得到保证。

还可以使用with lock这种上下文格式,自动管理上锁和释放锁。

import random import time import threading money = 1000 def fun(n, lock): with lock: global money my_thread_name = threading.current_thread().name # 获取当前线程名称 print('%s开始取钱...' % my_thread_name) num = money # 开始查询银行有多少钱 num -= n # 取出钱 time.sleep(random.random()) money = num # 计算取出后银行还要多少钱 print('%s取了%d,还剩%d' % (my_thread_name, n, money)) t_list = [] lock = threading.Lock() for i in range(1, 4): t = threading.Thread(target=fun, name='线程%s' % i, args=(100, lock)) t.daemon = True t.start() t_list.append(t) for t in t_list: t.join() print("全部取钱结束,银行还剩%d" % money) # 线程1开始取钱... # 线程1取了100,还剩900 # 线程2开始取钱... # 线程2取了100,还剩800 # 线程3开始取钱... # 线程3取了100,还剩700 # 全部取钱结束,银行还剩700 3.2 死锁

用Lock的时候必须注意是否会陷入死锁,所谓死锁是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程。

以科学家吃面为例,爱因斯坦和记录夫人一起吃面,需要叉子和面条同时拥有才能吃面,但是各只有一个人可以使用,只有一个人同时获得面条和叉子才可以吃面。

import time from threading import Thread, Lock def eat_noodle1(name, noodle_lock, fork_lock): noodle_lock.acquire() print(f"{name} get noodle") time.sleep(1) fork_lock.acquire() print(f"{name} get fork") print(f"{name} start eat noodle") fork_lock.release() print(f"{name} put down noodle") noodle_lock.release() print(f"{name} put down noodle") def eat_noodle2(name, noodle_lock, fork_lock): fork_lock.acquire() print(f"{name} get fork") time.sleep(1) noodle_lock.acquire() print(f"{name} get noodle") print(f"{name} start eat noodle") noodle_lock.release() print(f"{name} put down noodle") fork_lock.release() print(f"{name} put down noodle") t_list = [] name_list = ["Einstein", "Curie"] noodle_lock = Lock() fork_lock = Lock() Einstein = Thread(target=eat_noodle1, name="Einstein", args=("Einstein", noodle_lock, fork_lock)) t_list.append(Einstein) Curie = Thread(target=eat_noodle2, name="Curie", args=("Curie", noodle_lock, fork_lock)) t_list.append(Curie) for t in t_list: t.start() # Einstein get noodle # Curie get fork

运行程序则程序死锁,因为Einstein拿到面条锁住,等待叉子,Curie拿到叉子锁住,等待面条,两个都在等对方释放锁,但是都在阻塞,造成死锁。

还有一种情况,在同一线程里,多次取获得锁,第一次获取锁后,还未释放,再次获得锁

import time from threading import Thread, Lock def eat_noodle1(name, lock): lock.acquire() print(f"{name} get noodle") lock.acquire() print(f"{name} get fork") print(f"{name} start eat noodle") lock.release() print(f"{name} put down noodle") lock.release() print(f"{name} put down noodle") t_list = [] name_list = ["Einstein", "Curie"] lock = Lock() for name in name_list: Einstein = Thread(target=eat_noodle1, name=name, args=(name, lock)) t_list.append(Einstein) for t in t_list: t.start() # Einstein get noodle

为了解决Lock死锁的情况,就有了递归锁:RLock。

3.3 Rlock

所谓的递归锁也被称为“锁中锁”,指一个线程可以多次申请同一把锁,但是不会造成死锁,这就可以用来解决上面的死锁问题。

import time from threading import Thread, RLock def eat_noodle1(name, lock): lock.acquire() print(f"{name} get noodle") lock.acquire() print(f"{name} get fork") print(f"{name} start eat noodle") lock.release() print(f"{name} put down noodle") lock.release() print(f"{name} put down noodle") t_list = [] name_list = ["Einstein", "Curie"] lock = RLock() for name in name_list: Einstein = Thread(target=eat_noodle1, name=name, args=(name, lock)) t_list.append(Einstein) for t in t_list: t.start() # Einstein get noodle # Einstein get fork # Einstein start eat noodle # Einstein put down noodle # Einstein put down noodle # Curie get noodle # Curie get fork # Curie start eat noodle # Curie put down noodle # Curie put down noodle

RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次acquire。直到一个线程所有的acquire都被release,其他的线程才能获得资源。

4 线程通信 4.1 Condition

Condition可以认为是一把比Lock和RLOK更加高级的锁,其在内部维护一个琐对象(默认是RLock),可以在创建Condigtion对象的时候把琐对象作为参数传入。Condition也提供了acquire, release方法,其含义与琐的acquire, release方法一致,其实它只是简单的调用内部琐对象的对应的方法而已。Condition内部常用方法如下:

acquire(): 上线程锁release(): 释放锁wait(timeout): 线程挂起,直到收到一个notify通知或者超时(可选的,浮点数,单位是秒s)才会被唤醒继续运行。wait()必须在已获得Lock前提下才能调用,否则会触发RuntimeErrornotify(n=1): 通知其他线程,那些挂起的线程接到这个通知之后会开始运行,默认是通知一个正等待该condition的线程,最多则唤醒n个等待的线程。notify()必须在已获得Lock前提下才能调用,否则会触发RuntimeError。notify()不会主动释放LocknotifyAll(): 如果wait状态线程比较多,notifyAll的作用就是通知所有线程

使用condition实现一个生产者消费者模式

import threading import time # 生产者 def produce(con): # 锁定线程 global num con.acquire() print("工厂开始生产……") while True: num += 1 print("已生产商品数量:{}".format(num)) time.sleep(1) if num >= 5: print("商品数量达到5件,仓库饱满,停止生产……") con.notify() # 唤醒消费者 con.wait() # 生产者自身陷入沉睡 # 释放锁 con.release() # 消费者 def consumer(con): con.acquire() global num print("消费者开始消费……") while True: num -= 1 print("剩余商品数量:{}".format(num)) time.sleep(2) if num


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3