Python多进程与多线程

您所在的位置:网站首页 python多线程编程包 Python多进程与多线程

Python多进程与多线程

#Python多进程与多线程| 来源: 网络整理| 查看: 265

在python中,multiprocessing模块提供了Process类,每个进程对象可以用一个Process类对象来代表。在python中进行多进程编程时,经常需要使用到Process类,这里对其进行简单说明。

1. Process类简单说明

1.1 Process类构造函数

Process类代码如下:

# # Type of default context -- underlying context can be set at most once # class Process(process.BaseProcess): _start_method = None @staticmethod def _Popen(process_obj): return _default_context.get_context().Process._Popen(process_obj) @staticmethod def _after_fork(): return _default_context.get_context().Process._after_fork()

由上述代码可知,Process类继承自process.BaseProcess,且自身没有构造方法。因此,在实例化Process类对象时调用的是process.BaseProcess类中的构造方法。进一步查看父类的构造方法,并对其中属性做出简单说明:

class BaseProcess(object): ''' Process objects represent activity that is run in a separate process The class is analogous to `threading.Thread` ''' def _Popen(self): raise NotImplementedError def __init__(self, group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None): assert group is None, 'group argument must be None for now' count = next(_process_counter) self._identity = _current_process._identity + (count,) self._config = _current_process._config.copy() self._parent_pid = os.getpid() self._parent_name = _current_process.name self._popen = None self._closed = False self._target = target self._args = tuple(args) self._kwargs = dict(kwargs) self._name = name or type(self).__name__ + '-' + \ ':'.join(str(i) for i in self._identity) if daemon is not None: self.daemon = daemon _dangling.add(self)

由上可知,在Process类对象初始化时,需要提供如下参数:

group: 分组。由构造函数中的assert语句可知,此参数应始终为None。因此,在初始化时,不要传入此参数;target: 调用对象。即子进程要执行的任务对象,在此可以传入要执行的函数名称,表示子进程要调用的函数对象;name:为子进程设置的进程名称;args:传递给target对象的位置参数。args是元组类型,要以元组方式传入;kwargs: 传递给tartget对象的关键字函数。kwargs为字典类型,要以字典方式传入。daemon:是否要将子进程设置为守护进程。

1.2 实例方法

start():启动子进程;run():子进程启动时要执行的方法。当子进程启动后,就会执行run方法。在run方法中会调用_target方法并执行。如果自定义多进程类时,一定要重写该方法。 def run(self): ''' Method to be run in sub-process; can be overridden in sub-class ''' if self._target: self._target(*self._args, **self._kwargs) terminate():强制终止子进程。此方法不会进行任何清理操作。如果子进程p继续创建了子进程p1,该子进程p1就成了僵尸进程。如果子进程p中维持了锁,那么该锁也不会被释放,进而形成死锁。 def terminate(self): ''' Terminate process; sends SIGTERM signal or uses TerminateProcess() ''' self._check_closed() self._popen.terminate()is_alive():查看子进程是否在运行。join([timeout]):进程同步,父进程等待子进程完成后再继续执行。父线程等待子进程运行结束 是指主线程处于等待状态,而子进程处于运行状态。timeout为超时时间,超过这个时间,父线程不再等待子线程,继续往下执行。

1.3 属性介绍:

daemon:子进程是否作为守护进程,默认值为False。当为True时,若父进程终止,子进程也随之终止;并且子进程不能创建自己的子进程;name:子进程名称;pid:子进程pid,唯一标识一个进程;exitcode:进程在运行时为None、如果为–N,表示被信号N结束;authkey:进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网络连接的底层进程间通信提供安全性,这类连接只有在具有相同的身份验证键时才能成功。2. 使用Process类进行多进程并发

2.1 直接实例化Process类对象

import os import random import time from multiprocessing import Process def multi_process_function(second: int): print(f'pid: {os.getpid()}, sleep time: {second} seconds') time.sleep(second) print(f'pid: {os.getpid()}, sleep finish') if __name__ == '__main__': process_set = [] for i in range(20): p = Process(target=multi_process_function, args=(random.randint(1, 20),)) process_set.append(p) p.start() for pro in process_set: pro.join()

上述代码中,为每个要执行的方法实例化了process对象,并通过start方法将子进程运行起来。在子进程启动之后,就会自动执行run方法,进而执行注册的方法multi_process_function。

在上述方法中,使用join方法实现父进程与子进程的同步等待。

2.2 继承process类,并重写run方法。

在前面我们提到,在python中,是使用process实例对象代表一个进程。在进程start()函数启动后,会自动调用process对象中的run方法。在process类中,run方法只做了一件事:调用注册到process对象中的target函数对象。所以,process实例对象中,run方法中定义的逻辑是真正在子进程中要执行逻辑。

那么,就可以自定义类,使其继承自process类,然后重写process类的run方法,在run方法中定义想要在子进程中执行的逻辑 的方式来实现多进程。

如下所示:

import os import random import time from multiprocessing import Process class MyProcess(Process): def __init__(self, name, second): super(MyProcess, self).__init__() self.name = name self.second = second def run(self): print(f'程序开始,进程pid:{os.getpid()}') time.sleep(self.second) print(f'程序结束, 进程pid:{os.getpid()}') if __name__ == '__main__': process_set = [] for i in range(20): p = MyProcess(f'进程{i}', random.randint(1, 20)) process_set.append(p) p.start() for pro in process_set: pro.join()

在上述示例中,自定义了类MyProcess,并重写run方法。在实例化MyProcess类之后,直接调用start方法就在子进程中执行了自定义的逻辑。

在上述例子中,继承了Process类之后,在调用Process类的初始化方法时,是将所有的初始化参数都保持为了默认值,并且在run方法中直接自定义了在新进程中要执行的逻辑。我们当然可以这么实现,但是这并不规范和标准,因为这样的作法使得我们重写的类与某种特定的逻辑绑定在一起了,该类实例对象只能实现这种特定的逻辑。标准重写run方法如下所使:

""" You may override this method in a subclass. The standard run() method invokes the callable object passed to the object's constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively. """2. 进程池Pool

2.1 Pool介绍

在第一节中介绍Process类时,是手动进行子进程的创建。此方法只适用于需要手动创建的进程数量较少且执行目标不用控制的情景。当需要执行的目标很多,或者子进程数量很多时,就需要使用到进程池管理进程。在Python中,是通过Pool类来代表进程池的。

Pool类作为进程池,可以提供指定数量的进程供用户调用。当有新的请求提交到Pool中,如果进程池还没满,那么就会创建一个新的进程来执行该请求;如果进程池中的进程数量已经达到进程池上限,那么该请求就会等待,直到池中有进程结束,然后重用进程池中已经结束的进程(这里不是另外新起进程,而是重新使用已经执行结束的进程执行该请求)。

Pool构造函数:

def __init__( self, processes: int | None = ..., initializer: Callable[..., object] | None = ..., initargs: Iterable[Any] = ..., maxtasksperchild: int | None = ..., context: Any | None = ..., ) -> None: ...processes:进程池中需要创建的进程数量。如果该值省略,就保持默认值,即os.cpu_count()返回的数量;initializer:每个工作进程启动时要执行的可调用对象,默认为None。如果initializer是None,那么每一个工作进程在开始的时候会调用initializer(*initargs);initargs:是要传给initializer的参数组;maxtasksperchild:工作进程退出之前可以完成的任务数,完成后用一个新的工作进程来替代原进程,来让闲置的资源被释放。maxtasksperchild默认是None,意味着只要Pool存在工作进程就会一直存活;context: 用在制定工作进程启动时的上下文,一般使用Pool() 或者一个context对象的Pool()方法来创建一个池,两种方法都适当的设置了context。

Pool实例方法

apply(func[, args[, kwargs]]):在进程池的一个工作进程中执行func(args,*kwargs),然后返回结果。此方法是阻塞的,即待当前进程执行完毕后,才会执行下一个进程。这样,进程之间还是串行的,达不到多线程的效果。 def apply(self, func: Callable[..., _T], args: Iterable[Any] = ..., kwds: Mapping[str, Any] = ...) -> _T: ...apply_async(func[, arg[, kwds={}[, callback=None]]]):在进程池的一个工作进程中执行func(args,*kwargs),并返回执行结果,此执行结果是AsyncResult类实例对象。callback可调用对象,接收输入参数。当func运行结束后,会将运行结果传递给callback进行回调。callback禁止执行任何阻塞操作,否则将接收其他异步操作中的结果。apply_async是非阻塞,不用等待当前进程执行完毕,随时根据系统调度来进行进程切换。此方法真正起到多进程作用。 def apply_async( self, func: Callable[..., _T], args: Iterable[Any] = ..., kwds: Mapping[str, Any] = ..., callback: Callable[[_T], object] | None = ..., error_callback: Callable[[BaseException], object] | None = ..., ) -> AsyncResult[_T]: ...map(func, iterable[, chunksize=None]):Pool类中的map方法,与apply函数用法行为基本一致,它会使进程阻塞直到返回结果。 def map(self, func: Callable[[_S], _T], iterable: Iterable[_S], chunksize: int | None = ...) -> list[_T]: ...map_async(func, iterable[, chunksize=None]):map_async与map的关系同apply与apply_async。 def map_async( self, func: Callable[[_S], _T], iterable: Iterable[_S], chunksize: int | None = ..., callback: Callable[[_T], object] | None = ..., error_callback: Callable[[BaseException], object] | None = ..., ) -> MapResult[_T]: ...close():关闭进程池。join():等待所有工作进程退出。此方法只能在close()或teminate()之后调用,让其不再接受新的Process。terminate():结束工作进程,不再处理未处理的任务。

方法apply_async()和map_async()的返回值是AsyncResul的实例对象。AsyncResul实例对象具有以下方法:

# # Class whose instances are returned by `Pool.apply_async()` # class ApplyResult(object): def __init__(self, pool, callback, error_callback): self._pool = pool self._event = threading.Event() self._job = next(job_counter) self._cache = pool._cache self._callback = callback self._error_callback = error_callback self._cache[self._job] = self def ready(self): return self._event.is_set() def successful(self): if not self.ready(): raise ValueError("{0!r} not ready".format(self)) return self._success def wait(self, timeout=None): self._event.wait(timeout) def get(self, timeout=None): self.wait(timeout) if not self.ready(): raise TimeoutError if self._success: return self._value else: raise self._value def _set(self, i, obj): self._success, self._value = obj if self._callback and self._success: self._callback(self._value) if self._error_callback and not self._success: self._error_callback(self._value) self._event.set() del self._cache[self._job] self._pool = None __class_getitem__ = classmethod(types.GenericAlias) get():返回结果,如果有必要则等待结果到达。timeout是可选的。如果在指定时间内还没有到达,将引发异常。如果远程操作中引发了异常,它将在调用此方法时再次被引发;ready():如果调用完成,返回True;successful():如果调用完成且没有引发异常,返回True,如果在结果就绪之前调用此方法,引发异常;wait([timeout]):等待结果变为可用;terminate():立即终止所有工作进程,同时不执行任何清理或结束任何挂起工作。如果p被垃圾回收,将自动调用此函数;

2.2 Pool使用

import os import random import time from multiprocessing import Pool def multi_process_function(second: int): print(f'pid: {os.getpid()}, sleep time: {second} seconds') time.sleep(second) print(f'pid: {os.getpid()}, sleep finish') if __name__ == '__main__': pool = Pool(processes=10) for i in range(100): pool.apply_async(multi_process_function, (random.randint(1, 30),)) pool.close() pool.join()

执行结果如下

pid: 7392, sleep time: 9 seconds pid: 2532, sleep time: 9 seconds pid: 15460, sleep time: 3 seconds pid: 14640, sleep time: 2 seconds pid: 5296, sleep time: 30 seconds pid: 11048, sleep time: 7 seconds pid: 16476, sleep time: 14 seconds pid: 8408, sleep time: 28 seconds pid: 8712, sleep time: 27 seconds pid: 10472, sleep time: 30 seconds pid: 14640, sleep finish pid: 14640, sleep time: 17 seconds pid: 15460, sleep finish pid: 15460, sleep time: 9 seconds pid: 11048, sleep finish pid: 11048, sleep time: 16 seconds pid: 7392, sleep finish pid: 7392, sleep time: 24 seconds pid: 2532, sleep finish pid: 2532, sleep time: 2 seconds

首先,会同时启动10个进程。随后,当进程池中的工作进程的任务执行结束后,会继续在原有工作进程上执行新的请求。

3. 多线程

在python中,使用threading中的Thread类实例对象代表一个线程。其构造函数如下

class Thread: """A class that represents a thread of control. This class can be safely subclassed in a limited fashion. There are two ways to specify the activity: by passing a callable object to the constructor, or by overriding the run() method in a subclass. """ _initialized = False def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None): """This constructor should always be called with keyword arguments. Arguments are: *group* should be None; reserved for future extension when a ThreadGroup class is implemented. *target* is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called. *name* is the thread name. By default, a unique name is constructed of the form "Thread-N" where N is a small decimal number. *args* is the argument tuple for the target invocation. Defaults to (). *kwargs* is a dictionary of keyword arguments for the target invocation. Defaults to {}. If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread. """

由代码可知,Thread类的初始化参数和Process类对象的参数一致。除此之外,Process类和Thread类的方法和属性也基本相同;由Thread类实现多线程的思路和方法与由Process类实现多进程的思路和方法也基本一致,这里不再展开说明。Thread类有自身的一些关于Daemon、name的操作,可参考源码知悉,这里不再展开。

值得注意的是,在Process类中,进程的唯一标识是pid,可以由os.getpid()方法获取;而线程的唯一表示则由ident表示,获取方法可参考下面代码

@property def ident(self): """Thread identifier of this thread or None if it has not been started. This is a nonzero integer. See the get_ident() function. Thread identifiers may be recycled when a thread exits and another thread is created. The identifier is available even after the thread has exited. """ assert self._initialized, "Thread.__init__() not called" return self._ident



【本文地址】


今日新闻


推荐新闻


    CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3