Python 中最常用的 5 种线程锁你会用吗?

您所在的位置:网站首页 360程序锁为什么会失效 Python 中最常用的 5 种线程锁你会用吗?

Python 中最常用的 5 种线程锁你会用吗?

2024-07-10 16:43| 来源: 网络整理| 查看: 265

前言

本文将继续围绕 threading 模块讲解,基本上是纯理论偏多。

对于日常开发者来讲很少会使用到本文的内容,但是对框架作者等是必备知识,同时也是高频的面试常见问题。

线程安全

线程安全是多线程或多进程编程中的一个概念,在拥有共享数据的多条线程并行执行的程序中,线程安全的代码会通过同步机制保证各个线程都可以正常且正确的执行,不会出现数据污染等意外情况。

线程安全的问题最主要还是由线程切换导致的,比如一个房间(进程)中有10颗糖(资源),除此之外还有3个小人(1个主线程、2个子线程),当小人A吃了3颗糖后被系统强制进行休息时他认为还剩下7颗糖,而当小人B工作后又吃掉了3颗糖,那么当小人A重新上岗时会认为糖还剩下7颗,但是实际上只有4颗了。

上述例子中线程A和线程B的数据不同步,这就是线程安全问题,它可能导致非常严重的意外情况发生,我们按下面这个示例来进行说明。

下面有一个数值num初始值为0,我们开启2条线程:

线程1对num进行一千万次+1的操作线程2对num进行一千万次-1的操作

结果可能会令人咋舌,num最后并不是我们所想象的结果0:

import threading num = 0 def add(): global num for i in range(10_000_000): num += 1 def sub(): global num for i in range(10_000_000): num -= 1 if __name__ == "__main__": subThread01 = threading.Thread(target=add) subThread02 = threading.Thread(target=sub) subThread01.start() subThread02.start() subThread01.join() subThread02.join() print("num result : %s" % num) # 结果三次采集 # num result : 669214 # num result : -1849179 # num result : -525674

上面这就是一个非常好的案例,想要解决这个问题就必须通过锁来保障线程切换的时机。

需要我们值得留意的是,在Python基本数据类型中list、tuple、dict本身就是属于线程安全的,所以如果有多个线程对这3种容器做操作时,我们不必考虑线程安全问题。

锁的作用

锁是Python提供给我们能够自行操控线程切换的一种手段,使用锁可以让线程的切换变的有序。

一旦线程的切换变的有序后,各个线程之间对数据的访问、修改就变的可控,所以若要保证线程安全,就必须使用锁。

threading模块中提供了5种最常见的锁,下面是按照功能进行划分:

同步锁:lock(一次只能放行一个)递归锁:rlock(一次只能放行一个)条件锁:condition(一次可以放行任意个)事件锁:event(一次全部放行)信号量锁:semaphore(一次可以放行特定个) 1、Lock() 同步锁 基本介绍

Lock锁的称呼有很多,如:

同步锁互斥锁

它们是什么意思呢?如下所示:

互斥指的是某一资源同一时刻仅能有一个访问者对其进行访问,具有唯一性和排他性,但是互斥无法限制访问者对资源的访问顺序,即访问是无序的同步是指在互斥的基础上(大多数情况),通过其他机制实现访问者对资源的有序访问同步其实已经实现了互斥,是互斥的一种更为复杂的实现,因为它在互斥的基础上实现了有序访问的特点

下面是threading模块与同步锁提供的相关方法:

方法描述threading.Lock()返回一个同步锁对象lockObject.acquire(blocking=True, timeout=1)上锁,当一个线程在执行被上锁代码块时,将不允许切换到其他线程运行,默认锁失效时间为1秒lockObject.release()解锁,当一个线程在执行未被上锁代码块时,将允许系统根据策略自行切换到其他线程中运行lockObject.locaked()判断该锁对象是否处于上锁状态,返回一个布尔值 使用方式

同步锁一次只能放行一个线程,一个被加锁的线程在运行时不会将执行权交出去,只有当该线程被解锁时才会将执行权通过系统调度交由其他线程。

如下所示,使用同步锁解决最上面的问题:

import threading num = 0 def add(): lock.acquire() global num for i in range(10_000_000): num += 1 lock.release() def sub(): lock.acquire() global num for i in range(10_000_000): num -= 1 lock.release() if __name__ == "__main__": lock = threading.Lock() subThread01 = threading.Thread(target=add) subThread02 = threading.Thread(target=sub) subThread01.start() subThread02.start() subThread01.join() subThread02.join() print("num result : %s" % num) # 结果三次采集 # num result : 0 # num result : 0 # num result : 0

这样这个代码就完全变成了串行的状态,对于这种计算密集型I/O业务来说,还不如直接使用串行化单线程执行来得快,所以这个例子仅作为一个示例,不能概述锁真正的用途。

死锁现象

对于同步锁来说,一次acquire()必须对应一次release(),不能出现连续重复使用多次acquire()后再重复使用多次release()的操作,这样会引起死锁造成程序的阻塞,完全不动了,如下所示:

import threading num = 0 def add(): lock.acquire() # 上锁 lock.acquire() # 死锁 # 不执行 global num for i in range(10_000_000): num += 1 lock.release() lock.release() def sub(): lock.acquire() # 上锁 lock.acquire() # 死锁 # 不执行 global num for i in range(10_000_000): num -= 1 lock.release() lock.release() if __name__ == "__main__": lock = threading.Lock() subThread01 = threading.Thread(target=add) subThread02 = threading.Thread(target=sub) subThread01.start() subThread02.start() subThread01.join() subThread02.join() print("num result : %s" % num) with语句

由于threading.Lock()对象中实现了__enter__()与__exit__()方法,故我们可以使用with语句进行上下文管理形式的加锁解锁操作:

import threading num = 0 def add(): with lock: # 自动加锁 global num for i in range(10_000_000): num += 1 # 自动解锁 def sub(): with lock: # 自动加锁 global num for i in range(10_000_000): num -= 1 # 自动解锁 if __name__ == "__main__": lock = threading.Lock() subThread01 = threading.Thread(target=add) subThread02 = threading.Thread(target=sub) subThread01.start() subThread02.start() subThread01.join() subThread02.join() print("num result : %s" % num) # 结果三次采集 # num result : 0 # num result : 0 # num result : 0 2、RLock() 递归锁 基本介绍

递归锁是同步锁的一个升级版本,在同步锁的基础上可以做到连续重复使用多次acquire()后再重复使用多次release()的操作,但是一定要注意加锁次数和解锁次数必须一致,否则也将引发死锁现象。

下面是threading模块与递归锁提供的相关方法:

方法描述threading.RLock()返回一个递归锁对象lockObject.acquire(blocking=True, timeout=1)上锁,当一个线程在执行被上锁代码块时,将不允许切换到其他线程运行,默认锁失效时间为1秒lockObject.release()解锁,当一个线程在执行未被上锁代码块时,将允许系统根据策略自行切换到其他线程中运行lockObject.locaked()判断该锁对象是否处于上锁状态,返回一个布尔值 使用方式

以下是递归锁的简单使用,下面这段操作如果使用同步锁则会发生死锁现象,但是递归锁不会:

import threading num = 0 def add(): lock.acquire() lock.acquire() global num for i in range(10_000_000): num += 1 lock.release() lock.release() def sub(): lock.acquire() lock.acquire() global num for i in range(10_000_000): num -= 1 lock.release() lock.release() if __name__ == "__main__": lock = threading.RLock() subThread01 = threading.Thread(target=add) subThread02 = threading.Thread(target=sub) subThread01.start() subThread02.start() subThread01.join() subThread02.join() print("num result : %s" % num) # 结果三次采集 # num result : 0 # num result : 0 # num result : 0 with语句

由于threading.RLock()对象中实现了__enter__()与__exit__()方法,故我们可以使用with语句进行上下文管理形式的加锁解锁操作:

import threading num = 0 def add(): with lock: # 自动加锁 global num for i in range(10_000_000): num += 1 # 自动解锁 def sub(): with lock: # 自动加锁 global num for i in range(10_000_000): num -= 1 # 自动解锁 if __name__ == "__main__": lock = threading.RLock() subThread01 = threading.Thread(target=add) subThread02 = threading.Thread(target=sub) subThread01.start() subThread02.start() subThread01.join() subThread02.join() print("num result : %s" % num) # 结果三次采集 # num result : 0 # num result : 0 # num result : 0 3、Condition() 条件锁 基本介绍

条件锁是在递归锁的基础上增加了能够暂停线程运行的功能。并且我们可以使用wait()与notify()来控制线程执行的个数。

注意:条件锁可以自由设定一次放行几个线程。

下面是threading模块与条件锁提供的相关方法:

方法描述threading.Condition()返回一个条件锁对象lockObject.acquire(blocking=True, timeout=1)上锁,当一个线程在执行被上锁代码块时,将不允许切换到其他线程运行,默认锁失效时间为1秒lockObject.release()解锁,当一个线程在执行未被上锁代码块时,将允许系统根据策略自行切换到其他线程中运行lockObject.wait(timeout=None)将当前线程设置为“等待”状态,只有该线程接到“通知”或者超时时间到期之后才会继续运行,在“等待”状态下的线程将允许系统根据策略自行切换到其他线程中运行lockObject.wait_for(predicate, timeout=None)将当前线程设置为“等待”状态,只有该线程的predicate返回一个True或者超时时间到期之后才会继续运行,在“等待”状态下的线程将允许系统根据策略自行切换到其他线程中运行。注意:predicate参数应当传入一个可调用对象,且返回结果为bool类型lockObject.notify(n=1)通知一个当前状态为“等待”的线程继续运行,也可以通过参数n通知多个lockObject.notify_all()通知所有当前状态为“等待”的线程继续运行 使用方式

下面这个案例会启动10个子线程,并且会立即将10个子线程设置为等待状态。

然后我们可以发送一个或者多个通知,来恢复被等待的子线程继续运行:

import threading currentRunThreadNumber = 0 maxSubThreadNumber = 10 def task(): global currentRunThreadNumber thName = threading.currentThread().name condLock.acquire() # 上锁 print("start and wait run thread : %s" % thName) condLock.wait() # 暂停线程运行、等待唤醒 currentRunThreadNumber += 1 print("carry on run thread : %s" % thName) condLock.release() # 解锁 if __name__ == "__main__": condLock = threading.Condition() for i in range(maxSubThreadNumber): subThreadIns = threading.Thread(target=task) subThreadIns.start() while currentRunThreadNumber


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3