multiprocessing
多处理 - 基于进程的“线程化”接口
2.6版本中的新功能。
1.介绍
multiprocessing
是一个使用类似于threading
模块的API支持产卵过程的软件包。该multiprocessing
包提供本地和远程并发性,通过使用子进程而不是线程有效地侧移全局解释器锁。由于这个原因,该multiprocessing
模块允许程序员充分利用给定机器上的多个处理器。它可以在Unix和Windows上运行。
multiprocessing
模块还引入了threading
模块中没有模拟量的API 。这方面的一个主要例子是Pool
提供了一种方便的手段,即跨越多个输入值并行执行一个函数,在输入数据之间分配输入数据(数据并行性)。以下示例演示了在模块中定义这些函数的通用做法,以便子进程可以成功导入该模块。这个数据并行性的基本例子Pool
,
from multiprocessing import Pool
def f(x):
return x*x
if __name__ == '__main__':
p = Pool(5)
print(p.map(f, [1, 2, 3]))
将打印到标准输出
[1, 4, 9]
1.1. Process类别
在multiprocessing
,进程是通过创建一个Process
对象,然后调用它的start()
方法产生的。Process
遵循API的threading.Thread
。一个多进程程序的简单例子是
from multiprocessing import Process
def f(name):
print 'hello', name
if __name__ == '__main__':
p = Process(target=f, args=('bob',))
p.start()
p.join()
为了显示所涉及的各个进程ID,下面是一个扩展示例:
from multiprocessing import Process
import os
def info(title):
print title
print 'module name:', __name__
if hasattr(os, 'getppid'): # only available on Unix
print 'parent process:', os.getppid()
print 'process id:', os.getpid()
def f(name):
info('function f')
print 'hello', name
if __name__ == '__main__':
info('main line')
p = Process(target=f, args=('bob',))
p.start()
p.join()
有关解释为什么(在Windows上)if __name__ == '__main__'
零件是必需的,请参阅编程准则。
1.2. 在进程之间交换对象
multiprocessing
支持两种进程之间的通信通道:
Queues
这个Queue
类是一个近似的克隆Queue.Queue
。例如:
from multiprocessing import Process, Queue
def f(q):
q.put([42, None, 'hello'])
if __name__ == '__main__':
q = Queue()
p = Process(target=f, args=(q,))
p.start()
print q.get() # prints "[42, None, 'hello']"
p.join()
队列是线程和进程安全的。
Pipes
该Pipe()
函数返回一对由管道连接的连接对象,默认情况下为双工(双向)。例如:
from multiprocessing import Process, Pipe
def f(conn):
conn.send([42, None, 'hello'])
conn.close()
if __name__ == '__main__':
parent_conn, child_conn = Pipe()
p = Process(target=f, args=(child_conn,))
p.start()
print parent_conn.recv() # prints "[42, None, 'hello']"
p.join()
通过返回的两个连接对象Pipe()
表示管道的两端。每个连接对象都有send()
和recv()
方法(等等)。请注意,如果两个进程(或线程)试图同时读取或写入管道的同一
端,则管道中的数据可能会损坏。当然,不会同时使用管道的不同端点进行腐蚀的风险。
1.3. 进程之间的同步
multiprocessing
包含来自所有同步基元的等价物threading
。例如,可以使用锁来确保一次只有一个进程打印到标准输出:
from multiprocessing import Process, Lock
def f(l, i):
l.acquire()
print 'hello world', i
l.release()
if __name__ == '__main__':
lock = Lock()
for num in range(10):
Process(target=f, args=(lock, num)).start()
如果不使用来自不同流程的锁定输出,则容易混淆。
1.4. 在进程之间共享状态
如上所述,在进行并发编程时,通常最好尽可能避免使用共享状态。使用多个进程时尤其如此。
但是,如果您确实需要使用某些共享数据,则multiprocessing
可以提供一些方法。
共享内存
数据可以使用Value
或存储在共享内存映射中Array
。例如,下面的代码
from multiprocessing import Process, Value, Array
def f(n, a):
n.value = 3.1415927
for i in range(len(a)):
a[i] = -a[i]
if __name__ == '__main__':
num = Value('d', 0.0)
arr = Array('i', range(10))
p = Process(target=f, args=(num, arr))
p.start()
p.join()
print num.value
print arr[:]
将打印
3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
在'd'
与'i'
创建时使用的参数num
和arr
被使用的那种的TypeCodes array
模块:'d'
表示一个双精度浮点数和'i'
指示符号整数。这些共享对象将是进程和线程安全的。
为了更灵活地使用共享内存,可以使用multiprocessing.sharedctypes
支持创建从共享内存分配的任意ctypes对象的模块。
服务器进程
通过Manager()
控制服务器进程返回的管理器对象,该进程包含Python对象并允许其他进程使用代理来操作它们。
通过返回的Manager()
将支持类型list
,dict
,Namespace
,Lock
,RLock
,Semaphore
,BoundedSemaphore
,Condition
,Event
,Queue
,Value
和Array
。例如,
from multiprocessing import Process, Manager
def f(d, l):
d[1] = '1'
d['2'] = 2
d[0.25] = None
l.reverse()
if __name__ == '__main__':
manager = Manager()
d = manager.dict()
l = manager.list(range(10))
p = Process(target=f, args=(d, l))
p.start()
p.join()
print d
print l
将打印
{0.25: None, 1: '1', '2': 2}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
服务器进程管理器比使用共享内存对象更灵活,因为它们可以支持任意对象类型。此外,单个管理员可以通过网络在不同计算机上的进程共享。但是,它们比使用共享内存要慢。
1.5. 使用一群工人
本Pool
类代表工作进程池。它具有允许以几种不同方式将任务卸载到工作进程的方法。
例如:
from multiprocessing import Pool, TimeoutError
import time
import os
def f(x):
return x*x
if __name__ == '__main__':
pool = Pool(processes=4) # start 4 worker processes
# print "[0, 1, 4,..., 81]"
print pool.map(f, range(10))
# print same numbers in arbitrary order
for i in pool.imap_unordered(f, range(10)):
print i
# evaluate "f(20)" asynchronously
res = pool.apply_async(f, (20,)) # runs in *only* one process
print res.get(timeout=1) # prints "400"
# evaluate "os.getpid()" asynchronously
res = pool.apply_async(os.getpid, ()) # runs in *only* one process
print res.get(timeout=1) # prints the PID of that process
# launching multiple evaluations asynchronously *may* use more processes
multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
print [res.get(timeout=1) for res in multiple_results]
# make a single worker sleep for 10 secs
res = pool.apply_async(time.sleep, (10,))
try:
print res.get(timeout=1)
except TimeoutError:
print "We lacked patience and got a multiprocessing.TimeoutError"
请注意,池的方法只能由创建它的进程使用。
注意
这个包中的功能要求__main__
模块可以由孩子导入。编程指南中介绍了这一点,但值得在此指出。这意味着一些示例(如Pool
示例)在交互式解释器中不起作用。例如:
>>> from multiprocessing import Pool
>>> p = Pool(5)
>>> def f(x):
... return x*x
...
>>> p.map(f, [1,2,3])
Process PoolWorker-1:
Process PoolWorker-2:
Process PoolWorker-3:
Traceback (most recent call last):
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'
(如果你尝试这样做,它实际上会输出三个以半随机方式交错的完整回溯,然后你可能不得不停止主处理。)
2.参考
multiprocessing
软件包主要复制threading
模块的API 。
2.1. Process和例外
class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={})
过程对象表示在单独的过程中运行的活动。本Process
类具有的所有方法等同threading.Thread
。
应始终使用关键字参数调用构造函数。团队
应该永远是None
; 它仅仅是为了兼容而存在threading.Thread
。target
是要由run()
方法调用的可调用对象。它默认为None
,意味着什么都不叫。名称
是进程名称
。缺省情况下,一个唯一的名称
由'Process-N1:N2:...:Nk'形式构成,其中N1,N2,...,Nk是一个整数序列,其长度由进程的生成
决定。args
是目标
调用的参数元组。kwargs
是目标
调用的关键字参数字典。默认情况下,没有参数传递给目标
.
如果一个子类重写构造函数,它必须确保它在进行任何其他操作之前调用基类构造函数(Process.__init__()
)。
run()
表示进程活动的方法。
您可以在子类中重写此方法。标准run()
方法调用传递给对象构造函数的可调用对象作为目标参数,如果有的话,分别从args
和kwargs
参数中获取顺序和关键字参数。
start()
开始流程的活动。
每个过程对象最多只能调用一次。它安排run()
在独立的进程中调用对象的方法。
join([timeout])
阻塞调用线程,直到调用该join()
方法的进程终止或发生可选超时。
如果超时
则None
没有超时
。
一个过程可以连接多次。
进程无法自行加入,因为这会导致死锁。尝试在启动之前加入进程是错误的。
name
该进程的名称。
该名称是仅用于识别目的的字符串。它没有语义。多个进程可以被赋予相同的名称。初始名称由构造函数设置。
is_alive()
返回过程是否存在。
粗略地说,从该start()
方法返回的那一刻起,一个进程对象处于活动状态,直到子进程终止。
daemon
进程的守护进程标志,一个布尔值。这必须在start()
调用之前设置。
初始值是从创建过程继承的。
当进程退出时,它将尝试终止其所有守护进程的子进程。
请注意,守护进程不允许创建子进程。否则,守护进程会在其父进程退出时终止其子进程。另外,这些不是
Unix守护进程或服务,它们是正常的进程,如果非守护进程退出,它们将被终止(而不是
加入)。
除了threading.Thread
API之外,Process
对象还支持以下属性和方法:
pid
返回进程ID。在这个过程产生之前,这将是None
。
exitcode
孩子的退出代码。这将是None
如果该过程还没有结束。负值-N
表示孩子被信号N
终止。
authkey
进程的身份验证密钥(一个字节字符串)。
当multiprocessing
初始化主进程正在使用分配一个随机串os.urandom()
。
当Process
创建一个对象时,它将继承其父进程的身份验证密钥,但可以通过设置authkey
为另一个字节字符串来更改它。
请参阅认证密钥。
terminate()
终止该过程。在Unix上,这是使用SIGTERM
信号完成的; 在Windows TerminateProcess()
上使用。请注意,退出处理程序和最后的子句等将不会执行。
请注意,流程的后代流程不会
终止 - 它们将简单地变成孤儿。
警告
如果在关联进程正在使用管道或队列时使用此方法,那么管道或队列可能会损坏并可能会被其他进程无法使用。同样,如果进程获得了锁或信号量等,那么终止它可能会导致其他进程死锁。
需要注意的是start()
,join()
,is_alive()
,terminate()
和exitcode
方法只能由创建进程对象的过程调用。
一些方法的示例用法Process
:
>>> import multiprocessing, time, signal
>>> p = multiprocessing.Process(target=time.sleep, args=(1000,))
>>> print p, p.is_alive()
<Process(Process-1, initial)> False
>>> p.start()
>>> print p, p.is_alive()
<Process(Process-1, started)> True
>>> p.terminate()
>>> time.sleep(0.1)
>>> print p, p.is_alive()
<Process(Process-1, stopped[SIGTERM])> False
>>> p.exitcode == -signal.SIGTERM
True
exception multiprocessing.BufferTooShort
Connection.recv_bytes_into()
当提供的缓冲区对象太小而不能读取消息时引发异常。
如果e
是的一个实例BufferTooShort
,然后e.args[0]
会给出消息作为字节串。
2.2. 管道和队列
当使用多个进程时,通常使用消息传递进行进程之间的通信,并避免使用任何同步原语(如锁)。
对于传递消息,可以使用Pipe()
(用于两个进程之间的连接)或队列(允许多个生产者和消费者)。
的Queue
,multiprocessing.queues.SimpleQueue
和JoinableQueue
类型是仿照多生产,多消费FIFO队列Queue.Queue
的标准库类。他们的区别在于Queue
缺乏task_done()
和join()
引入的Python 2.5的方法Queue.Queue
类。
如果使用,JoinableQueue
则必须
调用JoinableQueue.task_done()
从队列中移除的每个任务,否则用于计算未完成任务数的信号量最终可能会溢出,引发异常。
请注意,您也可以使用管理员对象创建共享队列 - 请参阅经理。
注意
multiprocessing
使用常规Queue.Empty
和Queue.Full
例外来表示超时。它们在multiprocessing
命名空间中不可用,因此您需要从中导入Queue
。
注意
当一个对象被放在一个队列中时,该对象被腌制,后面的线程将清理后的数据刷新到底层管道。这有一些令人惊讶的后果,但不应该导致任何实际困难 - 如果他们真的打扰你,那么你可以使用由经理创建的队列。
- 在一个空队列上放置一个对象之后,在队列的
empty()
方法返回之前可能会有一个无限小的延迟,False
并且get_nowait()
可以在不提升的情况下返回Queue.Empty
。
2. 如果多个进程正在排列对象,则可能会在另一端无序地接收对象。然而,由相同过程入队的对象将始终按预期顺序相对于彼此。
警告
如果一个进程正在使用打死Process.terminate()
或os.kill()
当它试图使用Queue
,则在队列中的数据可能会损坏。这可能会导致任何其他进程在尝试稍后使用队列时发生异常。
警告
如上所述,如果子进程已将项目放入队列(并且未使用JoinableQueue.cancel_join_thread
),则该进程将不会终止,直到所有缓冲项目已被刷新到管道。
这意味着如果你尝试加入这个过程,你可能会遇到死锁,除非你确定放入队列的所有物品都已被消耗掉。同样,如果子进程是非守护进程,那么父进程在尝试加入所有非守护进程子进程时可能会在退出时挂起。
请注意,使用管理器创建的队列不存在此问题。请参阅编程准则。
有关使用队列进行进程间通信的示例,请参阅示例。
multiprocessing.Pipe([duplex])
返回一对(conn1, conn2)
的Connection
代表的配管的端部的对象。
如果双工
是True
(默认),那么管道是双向的。如果双工
是False
管道,那么管道是单向的:conn1
只能用于接收消息,conn2
并且只能用于发送消息。
class multiprocessing.Queue([maxsize])
返回使用管道和几个锁/信号量实现的进程共享队列。当一个进程首先将一个项目放入队列中时,启动一个进给线程,该进程线程将对象从缓冲区传送到管道中。
标准库模块的常见Queue.Empty
和Queue.Full
例外情况Queue
引发超时。
Queue
实现Queue.Queue
除了task_done()
和之外的所有方法join()
。
qsize()
返回队列的近似大小。由于多线程/多处理语义,这个数字是不可靠的。
请注意,NotImplementedError
在Mac OS X等Unix平台上可能会出现这种情况,sem_getvalue()
但未实现。
empty()
如果队列为空则返回True
,否则返回False
。由于多线程/多处理语义,这是不可靠的。
full()
如果队列已满则返回True
,否则返回False
。由于多线程/多处理语义,这是不可靠的。
put(obj[, block[, timeout]])
将obj放入队列中。如果可选参数块
是True
(默认)并且超时
是None
(默认),则在需要时禁止,直到有空闲插槽可用。如果超时
是一个正数,它会阻止至多超时
秒数,并Queue.Full
在该时间内没有空闲插槽时引发异常。否则(块
是False
),如果空闲插槽立即可用,则在队列中放置一个项目,否则引发Queue.Full
异常(在这种情况下超时
被忽略)。
put_nowait(obj)
相当于put(obj, False)
。
get([block[, timeout]])
从队列中移除并返回一个项目。如果可选的参数块
为True
默认值,并且超时
为None
默认值,则在必要时阻塞,直到项目可用。如果超时时间
为正数,则最多会阻止超时
秒数,Queue.Empty
如果在该时间内没有可用项目,则会引发异常。否则(块
是False
),如果一个项立即可用,则返回一个项目,否则引发Queue.Empty
异常(在这种情况下超时
被忽略)。
get_nowait()
相当于get(False)
。
Queue
有一些其他方法未找到Queue.Queue
。大多数代码通常不需要这些方法:
close()
指示当前进程不会有更多数据放入此队列中。一旦将所有缓冲数据刷新到管道,后台线程将退出。这在队列被垃圾收集时自动调用。
join_thread()
加入后台线程。这只能在close()
被调用后才能使用。它会阻塞,直到后台线程退出,确保缓冲区中的所有数据已被刷新到管道。
默认情况下,如果进程不是队列的创建者,那么在退出时它将尝试加入队列的后台线程。该过程可以调用cancel_join_thread()
做出join_thread()
什么也不做。
cancel_join_thread()
防止join_thread()
阻塞。特别是,这可以防止后台线程在进程退出时自动加入 - 请参阅join_thread()
。
这个方法的名字可能更好allow_exit_without_flush()
。这很可能会导致入队数据丢失,并且几乎可以肯定不需要使用它。如果您需要当前进程立即退出而不等待将排入队列的数据刷新到底层管道,并且您不关心丢失的数据,那才真正在那里。
注意
该类的功能需要在主机操作系统上运行共享信号量。没有一个,这个类中的功能将被禁用,并尝试实例化一个Queue
将导致一个ImportError
。有关其他信息,请参阅问题3770。对于下面列出的任何专用队列类型也是如此。
class multiprocessing.queues.SimpleQueue
这是一个简化的Queue
类型,非常接近锁定Pipe
。
empty()
如果队列为空则返回True
,否则返回False
。
get()
从队列中移除并返回一个项目。
put(item)
将项目
放入队列中。
class multiprocessing.JoinableQueue([maxsize])
JoinableQueue
,一个Queue
子类,是另外有一个队列task_done()
和join()
方法。
task_done()
表明以前排队的任务已完成。由队列消费者线程使用。对于每个get()
用于获取任务的对象,后续调用会task_done()
告知队列该任务的处理已完成。
如果join()
当前处于阻塞状态,则在所有项目都处理完毕后(即task_done()
接收到已put()
进入队列的每个项目的呼叫),它将恢复。
提出了一个ValueError
好象叫更多的时间比中放入队列中的项目。
join()
阻塞,直到队列中的所有项目都被获取并处理。
每当将项目添加到队列中时,未完成任务的数量就会增加。无论何时消费者线程调用task_done()
以指示该项目已被检索并且所有工作都已完成,计数就会减少。当未完成任务的计数降至零时,join()
取消阻止。
2.3. 杂
multiprocessing.active_children()
返回当前进程的所有活着的孩子的列表。
调用它会产生“加入”已经完成的任何过程的副作用。
multiprocessing.cpu_count()
返回系统中的CPU数量。可能会提高NotImplementedError
。
multiprocessing.current_process()
返回Process
当前进程对应的对象。
一种类似物threading.current_thread()
。
multiprocessing.freeze_support()
添加对何时使用的程序multiprocessing
已被冻结以生成Windows可执行文件的支持。(已经用py2exe
,PyInstaller
和cx_Freeze
进行过测试。)
需要if __name__ == '__main__'
在主模块的行后面直接调用该函数。例如:
from multiprocessing import Process, freeze_support
def f():
print 'hello world!'
if __name__ == '__main__':
freeze_support()
Process(target=f).start()
如果freeze_support()
行被忽略,则尝试运行冻结的可执行文件将会引发RuntimeError
。
freeze_support()
在Windows以外的任何操作系统上调用时,调用都不起作用。另外,如果模块正在通过Windows上的Python解释器正常运行(该程序尚未被冻结),则freeze_support()
不起作用。
multiprocessing.set_executable()
设置启动子进程时使用的Python解释器的路径。(默认sys.executable
使用)。嵌入者可能需要做一些类似的事情
set_executable(os.path.join(sys.exec_prefix, 'pythonw.exe'))
然后才能创建子进程。(仅限Windows)
注意
multiprocessing
不包含的类似物threading.active_count()
,threading.enumerate()
,threading.settrace()
,threading.setprofile()
,threading.Timer
,或threading.local
。
2.4. 连接对象
连接对象允许发送和接收可选对象或字符串。他们可以被认为是面向消息的连接套接字。
连接对象通常使用Pipe()
- 另请参阅监听器和客户端。
class multiprocessing.Connectionsend(obj)
将对象发送到应该使用的连接的另一端recv()
。
该对象必须是可挑选的。非常大的泡菜(大约32 MB +,但取决于操作系统)可能会引发ValueError
异常。
recv()
返回使用连接的另一端发送的对象send()
。阻止,直到有东西可以接收。EOFError
如果没有什么可以接收而另一端关闭,就会引发。
fileno()
返回连接使用的文件描述符或句柄。
close()
关闭连接。
当连接被垃圾收集时这被自动调用。
poll([timeout])
返回是否有可供读取的数据。
如果没有指定超时
,它将立即返回。如果超时
是一个数字,那么这指定了要阻止的最大时间(以秒为单位)。如果超时
,None
则使用无限超时
。
send_bytes(buffer[, offset[, size]])
将支持缓冲区接口的对象的字节数据作为完整消息发送。
如果给出偏移量
,则数据从缓冲区中的
该位置读取。如果给出大小
,那么将从缓冲区读取那么多字节。非常大的缓冲区(大约32 MB +,但取决于操作系统)可能会引发ValueError
异常
recv_bytes([maxlength])
将连接另一端发送的字节数据的完整消息作为字符串返回。阻止,直到有东西可以接收。提出EOFError
,如果没有什么留下来接收和另一端已经关闭。
如果最大长度
被指定并且所述消息是长于最大长度
然后IOError
升至并连接将不再是可读的。
recv_bytes_into(buffer[, offset])
将从连接另一端发送的完整字节数据消息读入缓冲区
,并返回消息中的字节数。阻止,直到有东西可以接收。EOFError
如果没有什么可以接收而另一端关闭,就会引发。
缓冲区
必须是满足可写缓冲区
接口的对象。如果给出偏移量
,那么消息将从该位置写入缓冲区
。偏移量
必须是小于缓冲区
长度的非负整数(以字节为单位)。
如果缓冲区太短,则会BufferTooShort
引发异常,并且完整的消息可用,e.args[0]
因为e
异常实例在哪里。
例如:
>>> from multiprocessing import Pipe
>>> a, b = Pipe()
>>> a.send([1, 'hello', None])
>>> b.recv()
[1, 'hello', None]
>>> b.send_bytes('thank you')
>>> a.recv_bytes()
'thank you'
>>> import array
>>> arr1 = array.array('i', range(5))
>>> arr2 = array.array('i', [0] * 10)
>>> a.send_bytes(arr1)
>>> count = b.recv_bytes_into(arr2)
>>> assert count == len(arr1) * arr1.itemsize
>>> arr2
array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])
警告
Connection.recv()
方法会自动取消接收的数据,这可能会带来安全风险,除非您可以信任发送该消息的进程。
因此,除非连接对象是使用Pipe()
您制作的,否则只应在执行某种验证之后使用recv()
和send()
方法。请参阅认证密钥。
警告
如果某个进程在尝试读取或写入管道时被终止,那么管道中的数据可能会被破坏,因为可能无法确定消息边界位于何处。
2.5. 同步原语
通常,同步原语在多进程程序中并不像在多线程程序中那样必要。请参阅threading
模块的文档。
请注意,也可以使用管理器对象创建同步基元 - 请参阅管理器。
class multiprocessing.BoundedSemaphore([value])
一个有界的信号量对象:一个紧密的类比threading.BoundedSemaphore
。
与其近似模拟存在单一差异:其acquire
方法的第一个参数是名为block的,
并且它支持可选的第二个参数超时
,与之一致Lock.acquire()
。
注意
在Mac OS X上,这是无法区分的,Semaphore
因为sem_getvalue()
没有在该平台上实现。
class multiprocessing.Condition([lock])
一个条件变量:一个克隆threading.Condition
。
如果指定了锁
,那么它应该是一个Lock
或一个RLock
对象multiprocessing
。
class multiprocessing.Event
克隆的threading.Event
。这个方法在退出时返回内部信号量的状态,所以它会一直返回True
,除非超时并且操作超时。
在版本2.7中更改:以前,该方法始终返回None
。
class multiprocessing.Lock
非递归锁定对象:一个非常类似的threading.Lock
。一旦进程或线程获得锁定,随后尝试从任何进程或线程获取它将被阻塞,直到它被释放; 任何进程或线程都可以释放它。threading.Lock
适用于线程的概念和行为在multiprocessing.Lock
适用于进程或线程时在此处被复制,除非另有说明。
请注意,Lock
它实际上是一个工厂函数,它返回multiprocessing.synchronize.Lock
使用默认上下文初始化的实例。
Lock
支持上下文管理器协议,因此可以在with
语句中使用。
acquire(block=True, timeout=None)
获取锁定,阻止或不阻止。
将block
参数设置为True
(默认值)时,方法调用将被阻塞,直到锁处于解锁状态,然后将其设置为锁定并返回True
。请注意,这第一个参数的名称不同于threading.Lock.acquire()
。
将块
参数设置为False
,方法调用不会阻止。如果锁当前处于锁定状态,则返回False
;否则将锁设置为锁定状态并返回True
。
当用正值,浮点值超时
调用时
,只要无法获取锁,就会阻塞超时
指定的秒数。具有负值的调用超时
值相当于超时
值为零。超时
值为None
(缺省值)的调用将超时时间
设置为无限。该超时
参数有,如果没有实际意义块
参数被设置为False
,并因此忽略。返回True
如果锁已经被收购或False
如果超时
周期已经过去。请注意超时
这种方法的模拟中不存在论据threading.Lock.acquire()
。
release()
释放一个锁。这可以从任何进程或线程调用,不仅是最初获取锁的进程或线程。
除了在解锁的锁上调用threading.Lock.release()
时,行为与其中的行为相同ValueError
。
class multiprocessing.RLock
递归锁定对象:一个紧密的模拟对象threading.RLock
。递归锁必须由获取它的进程或线程释放。一旦进程或线程获得了递归锁定,相同的进程或线程可能会再次获取而不会阻塞; 该进程或线程必须每次释放一次它被获取。
请注意,RLock
它实际上是一个工厂函数,它返回multiprocessing.synchronize.RLock
使用默认上下文初始化的实例。
RLock
支持上下文管理器协议,因此可以在with
语句中使用。
acquire(block=True, timeout=None)
获取锁定,阻止或不阻止。
在block
参数设置为的情况下调用时True
,阻塞直到锁处于解锁状态(不属于任何进程或线程),除非该锁已被当前进程或线程所拥有。当前进程或线程接着获取锁的所有权(如果它还没有所有权),并且锁内的递归级别增加1,导致返回值为True
。请注意,与实现相比,第一个参数的行为存在一些差异threading.RLock.acquire()
,从参数本身的名称开始。
当调用block
参数设置为时False
,不要阻塞。如果锁已被另一个进程或线程获取(并因此被拥有),则当前进程或线程不会获得所有权,并且锁内的递归级别不会更改,导致返回值为False
。如果锁处于未锁定状态,当前进程或线程将获得所有权,并递增递归级别,从而返回值为True
。
timeout
参数的使用和行为与in中的相同Lock.acquire()
。请注意,此方法的模拟中不存在超时
参数threading.RLock.acquire()
。
release()
释放一个锁,递减递归级别。如果在递减之后递归级别为零,则将锁重置为解锁(不属于任何进程或线程),并且如果有任何其他进程或线程被阻塞,等待锁解锁,则准许其中一个进程继续。如果在递减之后递归级别仍然不为零,则锁保持锁定并由调用进程或线程拥有。
只有在调用进程或线程拥有锁定时才调用此方法。一个AssertionError
如果该方法是通过一个过程调用或线程以外的雇主或升高如果锁处于解锁(无主)的状态。请注意,在这种情况下引发的异常类型与实现的行为不同threading.RLock.release()
。
class multiprocessing.Semaphore([value])
信号量对象:一个近似的类比threading.Semaphore
。
与其近似模拟存在单一差异:其acquire
方法的第一个参数是名为block的,
并且它支持可选的第二个参数超时
,与之一致Lock.acquire()
。
注意
所述acquire()
的方法BoundedSemaphore
,Lock
,RLock
和Semaphore
具有未由当量支撑的超时
参数threading
。签名的acquire(block=True, timeout=None)
关键字参数是可以接受的。如果block
是True
和timeout
不是,None
那么它指定一个以秒为单位的超时
。如果块
是False
那么超时
将被忽略。
在Mac OS X上,sem_timedwait
不受支持,因此acquire()
使用超时调用将使用休眠循环模拟该函数的行为。
注意
如果所产生的SIGINT信号Ctrl-C
,而主线程是由呼叫阻塞到达BoundedSemaphore.acquire()
,Lock.acquire()
,RLock.acquire()
,Semaphore.acquire()
,Condition.acquire()
或Condition.wait()
则该呼叫将被立即中断和KeyboardInterrupt
将提高。
这与threading
在等效阻塞调用进行时忽略SIGINT 的行为不同。
注意
某些此软件包的功能需要在主机操作系统上运行共享信号量。没有一个,multiprocessing.synchronize
模块将被禁用,并尝试导入它将导致一个ImportError
。有关其他信息,请参阅问题3770。
2.6. 共享ctypes对象
可以使用可以由子进程继承的共享内存创建共享对象。
multiprocessing.Value(typecode_or_type, *args[, lock])
返回ctypes
从共享内存分配的对象。默认情况下,返回值实际上是对象的同步包装器。
typecode_or_type
确定返回对象的类型:它是一个ctypes类型或array
模块使用的类型的一个字符类型。*参数
传递给类型的构造函数。
如果锁
是True
(默认值),那么一个新的递归锁
对象被创建同步访问值。如果锁
是一个Lock
或一个RLock
对象,那么将用于同步对该值的访问。如果是锁
,False
那么访问返回的对象不会被锁
自动保护,因此它不一定是“过程安全的”。
操作就像+=
它涉及的读取和写入不是原子。因此,例如,如果你想以原子方式递增共享值,那么仅仅是做不到
counter.value += 1
假设关联的锁是递归的(它是默认的),你可以改为这样做
with counter.get_lock():
counter.value += 1
请注意,锁定
是仅有关键字的参数。
multiprocessing.Array(typecode_or_type, size_or_initializer, *, lock=True)
返回从共享内存分配的ctypes数组。默认情况下,返回值实际上是数组的同步包装器。
typecode_or_type
确定返回数组元素的类型:它是模块使用的ctypes类型或单字符typecode array
。如果size_or_initializer
是一个整数,那么它决定了数组的长度,并且该数组最初将归零。否则,size_or_initializer
是用于初始化数组的序列,其长度决定数组的长度。
如果锁
是True
(默认值),那么一个新的锁
定对象被创建同步访问值。如果锁
是一个Lock
或一个RLock
对象,那么将用于同步对该值的访问。如果是锁
,False
那么访问返回的对象不会被锁
自动保护,因此它不一定是“过程安全的”。
请注意,锁定
是仅有关键字的参数。
请注意,一个ctypes.c_char
具有值
和原始
属性的数组允许用它来存储和检索字符串。
2.6.1. multiprocessing.sharedctypes模块
multiprocessing.sharedctypes
模块提供了ctypes
从共享内存中分配可由子进程继承的对象的功能。
注意
虽然可以将指针存储在共享内存中,但请记住这将指向特定进程的地址空间中的某个位置。但是,指针在第二个进程的上下文中很可能无效,并且试图从第二个进程取消引用该指针可能会导致崩溃。
multiprocessing.sharedctypes.RawArray(typecode_or_type, size_or_initializer)
返回从共享内存分配的ctypes数组。
typecode_or_type
确定返回数组元素的类型:它是模块使用的ctypes类型或单字符typecode array
。如果size_or_initializer
是一个整数,那么它确定数组的长度,并且该数组将被初始化为零。否则,size_or_initializer
是一个用于初始化数组的序列,其长度决定数组的长度。
请注意,设置和获取元素可能是非原子的 - 请使用它Array()
来确保使用锁自动同步访问。
multiprocessing.sharedctypes.RawValue(typecode_or_type, *args)
返回从共享内存分配的ctypes对象。
typecode_or_type
确定返回对象的类型:它是一个ctypes类型或array
模块使用的类型的一个字符类型。*参数
传递给类型的构造函数。
请注意,设置和获取该值可能是非原子的 - 使用它Value()
来确保访问使用锁自动同步。
请注意,ctypes.c_char
has value
和raw
属性允许用它来存储和检索字符串 - 请参阅文档ctypes
。
multiprocessing.sharedctypes.Array(typecode_or_type, size_or_initializer, *args[, lock])
相同RawArray()
,除了取决于的值锁
的过程安全的同步封装器可以返回来代替原始ctypes的阵列。
如果锁
是True
(默认值),那么一个新的锁
定对象被创建同步访问值。如果锁
是一个Lock
或一个RLock
对象,那么将用于同步对该值的访问。如果是锁
,False
那么访问返回的对象不会被锁
自动保护,因此它不一定是“过程安全的”。
请注意,锁定
是仅有关键字的参数。
multiprocessing.sharedctypes.Value(typecode_or_type, *args[, lock])
相同RawValue()
,除了取决于的值锁
的过程安全的同步封装器可以返回来代替原始ctypes的对象。
如果锁
是True
(默认值),那么一个新的锁
定对象被创建同步访问值。如果锁
是一个Lock
或一个RLock
对象,那么将用于同步对该值的访问。如果是锁
,False
那么访问返回的对象不会被锁
自动保护,因此它不一定是“过程安全的”。
请注意,锁定
是仅有关键字的参数。
multiprocessing.sharedctypes.copy(obj)
返回从ctypes对象obj
的副本共享内存中分配的ctypes对象。
multiprocessing.sharedctypes.synchronized(obj[, lock])
为使用锁
来同步访问的ctypes对象返回一个流程安全的包装器对象。如果锁
是None
(缺省值),然后一个multiprocessing.RLock
对象被自动创建。
一个同步包装除了包装它的对象之外还有两个方法:get_obj()
返回包装对象并get_lock()
返回用于同步的锁对象。
请注意,通过包装器访问ctypes对象可能比访问原始ctypes对象要慢很多。
下表比较了用于从共享内存创建共享ctypes对象的语法与正常ctypes语法。(在表中MyStruct
是。的一些子类ctypes.Structure
。)
ctypes的 | 使用类型共享类型 | 使用typecode共享类型 |
---|---|---|
c_double(2.4) | RawValue(c_double,2.4) | RawValue('d',2.4) |
MyStruct(4,6) | RawValue(MyStruct,4,6) | |
(c_short * 7)() | RawArray(c_short,7) | RawArray('h',7) |
(c_int * 3)(9,2,8) | RawArray(c_int,(9,2,8)) | RawArray('i',(9,2,8)) |
下面是一个例子,其中一些ctypes对象被一个子进程修改:
from multiprocessing import Process, Lock
from multiprocessing.sharedctypes import Value, Array
from ctypes import Structure, c_double
class Point(Structure):
_fields_ = [('x', c_double), ('y', c_double)]
def modify(n, x, s, A):
n.value **= 2
x.value **= 2
s.value = s.value.upper()
for a in A:
a.x **= 2
a.y **= 2
if __name__ == '__main__':
lock = Lock()
n = Value('i', 7)
x = Value(c_double, 1.0/3.0, lock=False)
s = Array('c', 'hello world', lock=lock)
A = Array(Point, [(1.875,-6.25), (-5.75,2.0), (2.375,9.5)], lock=lock)
p = Process(target=modify, args=(n, x, s, A))
p.start()
p.join()
print n.value
print x.value
print s.value
print [(a.x, a.y) for a in A]
打印的结果是
49
0.1111111111111111
HELLO WORLD
[(3.515625, 39.0625), (33.0625, 4.0), (5.640625, 90.25)]
2.7. 经理
管理人员提供了一种方法来创建可以在不同进程之间共享的数据。管理员对象控制管理共享对象
的服务器进程。其他进程可以通过使用代理来访问共享对象
。
multiprocessing.Manager()
返回SyncManager
可用于在进程之间共享对象的已启动对象。返回的管理器对象对应于衍生的子进程,并具有创建共享对象并返回相应代理的方法。
当垃圾收集或其父进程退出时,Manager进程将立即关闭。管理员类在multiprocessing.managers
模块中定义:
class multiprocessing.managers.BaseManager([address[, authkey]])
创建一个BaseManager对象。
一旦创建,应该调用start()
或get_server().serve_forever()
确保管理器对象引用已启动的管理器进程。
地址
是管理进程侦听新连接的地址
。如果地址
是None
那么任意一个被选择。
authkey
是将用于检查到服务器进程的传入连接的有效性的身份验证密钥。如果AUTHKEY
是None
那么current_process().authkey
。否则使用authkey
,它必须是一个字符串。
start([initializer[, initargs]])
启动一个子流程来启动管理器。如果初始化
程序不是,None
那么子进程将initializer(*initargs)
在启动时调用。
get_server()
返回一个Server
代表管理器控制下的实际服务器的对象。Server
对象支持serve_forever()
方法:
>>> from multiprocessing.managers import BaseManager
>>> manager = BaseManager(address=('', 50000), authkey='abc')
>>> server = manager.get_server()
>>> server.serve_forever()
Server
另外还有一个address
属性。
connect()
将本地管理器对象连接到远程管理器进程:
>>> from multiprocessing.managers import BaseManager
>>> m = BaseManager(address=('127.0.0.1', 5000), authkey='abc')
>>> m.connect()
shutdown()
停止经理使用的过程。这仅start()
在用于启动服务器进程时才可用。
这可以被多次调用。
register(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]])
可用于注册类型或可与经理类一起调用的类方法。
typeid
是一个“类型标识符”,用于标识特定类型的共享对象。这必须是一个字符串。
callable
是可调用的,用于为此类型标识创建对象。如果将使用from_address()
classmethod 创建管理器实例,或者如果create_method
参数是False
可以保留为None
。
proxytype
是其中的一个子类,BaseProxy
用于使用此typeid
为共享对象创建代理。如果None
那么代理类是自动创建的。
exposed
用于指定一个方法名称序列,该类型的代理应该被允许使用访问BaseProxy._callmethod()
。(如果暴露
在None
随后proxytype._exposed_
被用于代替如果它存在)。在其中没有指定暴露
列表中的情况下,共享对象的所有“公共方法”将是可访问的。(这里的“公共方法”是指任何具有__call__()
方法且名称不以其开头的属性'_'
。)
method_to_typeid
是一个映射,用于指定那些应该返回代理的公开方法的返回类型。它将方法名称映射到typeid字符串。(如果method_to_typeid
是None
然后proxytype._method_to_typeid_
,如果它存在来代替。)如果一个方法的名字不在此映射的关键或如果映射是None
则该方法返回该对象将在值被复制。
create_method
确定是否应该使用名称typeid
创建一个方法,该方法可用于通知服务器进程创建新的共享对象并为其返回代理。默认情况下是True
。
BaseManager
实例也有一个只读属性:
address
经理使用的地址。
class multiprocessing.managers.SyncManager
其中的一个子类BaseManager
可用于进程的同步。这种类型的对象被返回multiprocessing.Manager()
。
它还支持创建共享列表和字典。
BoundedSemaphore([value])
创建一个共享threading.BoundedSemaphore
对象并为其返回一个代理。
Condition([lock])
创建一个共享threading.Condition
对象并为其返回一个代理。
如果提供了锁
,那么它应该是一个threading.Lock
或一个threading.RLock
对象的代理。
Event()
创建一个共享threading.Event
对象并为其返回一个代理。
Lock()
创建一个共享threading.Lock
对象并为其返回一个代理。
Namespace()
创建一个共享Namespace
对象并为其返回一个代理。
Queue([maxsize])
创建一个共享Queue.Queue
对象并为其返回一个代理。
RLock()
创建一个共享threading.RLock
对象并为其返回一个代理。
Semaphore([value])
创建一个共享threading.Semaphore
对象并为其返回一个代理。
Array(typecode, sequence)
创建一个数组并为其返回一个代理。
Value(typecode, value)
创建一个具有可写value
属性的对象并返回它的代理。
dict()dict(mapping)dict(sequence)
创建一个共享dict
对象并为其返回一个代理。
list()list(sequence)
创建一个共享list
对象并为其返回一个代理。
注意
字典和列表代理中的可变值或项目的修改不会通过管理器传播,因为代理无法知道其值或项目何时被修改。要修改这样的项目,您可以将修改的对象重新分配给容器代理:
# create a list proxy and append a mutable object (a dictionary)
lproxy = manager.list()
lproxy.append{})
# now mutate the dictionary
d = lproxy[0]
d['a'] = 1
d['b'] = 2
# at this point, the changes to d are not yet synced, but by
# reassigning the dictionary, the proxy is notified of the change
lproxy[0] = d
class multiprocessing.managers.Namespace
可以注册的类型SyncManager
。
名称空间对象没有公共方法,但具有可写的属性。它的表示显示了它的属性值。
但是,当为名称空间对象使用代理时,以下列属性开头的属性'_'
将成为代理的属性,而不是所指对象的属性:
>>> manager = multiprocessing.Manager()
>>> Global = manager.Namespace()
>>> Global.x = 10
>>> Global.y = 'hello'
>>> Global._z = 12.3 # this is an attribute of the proxy
>>> print Global
Namespace(x=10, y='hello')
2.7.1. 定制经理
要创建自己的经理,需要创建一个子类BaseManager
并使用register()
该类方法向经理类注册新类型或可调用对象。例如:
from multiprocessing.managers import BaseManager
class MathsClass(object):
def add(self, x, y):
return x + y
def mul(self, x, y):
return x * y
class MyManager(BaseManager):
pass
MyManager.register('Maths', MathsClass)
if __name__ == '__main__':
manager = MyManager()
manager.start()
maths = manager.Maths()
print maths.add(4, 3) # prints 7
print maths.mul(7, 8) # prints 56
2.7.2. 使用远程管理器
可以在一台机器上运行管理服务器,并让客户机从其他机器上使用它(假设所涉及的防火墙允许)。
运行以下命令将为远程客户端可以访问的单个共享队列创建一个服务器:
>>> from multiprocessing.managers import BaseManager
>>> import Queue
>>> queue = Queue.Queue()
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue', callable=lambda:queue)
>>> m = QueueManager(address=('', 50000), authkey='abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()
一个客户端可以如下访问服务器:
>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey='abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.put('hello')
另一个客户端也可以使用:
>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey='abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.get()
'hello'
本地进程也可以访问该队列,使用客户端上面的代码远程访问它:
>>> from multiprocessing import Process, Queue
>>> from multiprocessing.managers import BaseManager
>>> class Worker(Process):
... def __init__(self, q):
... self.q = q
... super(Worker, self).__init__()
... def run(self):
... self.q.put('local hello')
...
>>> queue = Queue()
>>> w = Worker(queue)
>>> w.start()
>>> class QueueManager(BaseManager): pass
...
>>> QueueManager.register('get_queue', callable=lambda: queue)
>>> m = QueueManager(address=('', 50000), authkey='abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()
2.8. 代理对象
代理是一个对象
,它指的
是一个共享对象
,它可能存在于不同的进程中。共享对象
被认为是代理的对象
。多个代理对象
可能具有相同的对象
。
代理对象具有调用其指示对象的相应方法的方法(尽管代理中并不需要指示对象的每种方法)。代理通常可以以与其所指对象相同的方式使用:
>>> from multiprocessing import Manager
>>> manager = Manager()
>>> l = manager.list([i*i for i in range(10)])
>>> print l
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> print repr(l)
<ListProxy object, typeid 'list' at 0x...>
>>> l[4]
16
>>> l[2:5]
[4, 9, 16]
注意,应用于str()
代理将返回指示对象的表示,而应用repr()
将返回代理的表示。
代理对象的一个重要特性是它们是可选择的,因此它们可以在进程之间传递。但是,请注意,如果将代理发送给相应的经理进程,则取消其代理将自己生成指示对象。这意味着,例如,一个共享对象可以包含第二个:
>>> a = manager.list()
>>> b = manager.list()
>>> a.append(b) # referent of a now contains referent of b
>>> print a, b
[[]] []
>>> b.append('hello')
>>> print a, b
[['hello']] ['hello']
注意
代理类型multiprocessing
无法支持按值进行比较。所以,例如,我们有:
>>> manager.list([1,2,3]) == [1,2,3]
False
在进行比较时,应该只使用指示对象的副本。
class multiprocessing.managers.BaseProxy
代理对象是的子类的实例BaseProxy
。
_callmethod(methodname[, args[, kwds]])
调用并返回代理所指对象方法的结果。
如果proxy
是代理人的对象是obj
表达式
proxy._callmethod(methodname, args, kwds)
将评估表达
getattr(obj, methodname)(*args, **kwds)
在经理的过程中。
返回的值将是调用结果的副本或新共享对象的代理 - 请参阅method_to_typeid
参数的文档BaseManager.register()
。
如果通话引发异常,则通过重新提出_callmethod()
。如果经理进程中出现其他异常,则将其转换为RemoteError
异常并由其提出_callmethod()
。
特别要注意的是,如果methodname
尚未公开,
将会引发异常。
使用的一个例子_callmethod()
:
>>> l = manager.list(range(10))
>>> l._callmethod('__len__')
10
>>> l._callmethod('__getslice__', (2, 7)) # equiv to `l[2:7]`
[2, 3, 4, 5, 6]
>>> l._callmethod('__getitem__', (20,)) # equiv to `l[20]`
Traceback (most recent call last):
...
IndexError: list index out of range
_getvalue()
返回指称对象的副本。
如果指示物是不可抽出的,那么这将引发异常。
__repr__()
返回代理对象的表示。
__str__()
返回指示对象的表示。
2.8.1. 清理
代理对象使用weakref回调函数,以便当它被垃圾回收后,它将从拥有其指示对象的管理器中注销自身。
当不再有任何代理引用共享对象时,会从管理进程中删除共享对象。
2.9. 进程池
可以创建一个过程池,执行与Pool
课程一起提交给它的任务。
class multiprocessing.Pool([processes[, initializer[, initargs[, maxtasksperchild]]]])
一个进程池对象,用于控制可提交作业的工作进程池。它支持超时和回调的异步结果,并具有并行映射实现。
进程
是要使用的工作进程
的数量。如果过程
是None
然后通过返回的数字cpu_count()
被使用。如果初始化
程序不是,None
则每个工作进程
initializer(*initargs)
在启动时都会调用。
请注意,池对象的方法只应由创建池的进程调用。
版本2.7中的新增功能:maxtasksperchild
是工作进程在退出并被新工作进程替换之前可以完成的任务数量,以便释放未使用的资源。默认的maxtasksperchild
是None
,这意味着工作进程的生存时间与池一样长。
注意
一般情况下,工作进程在Pool
池的工作队列的整个持续时间内都处于活动状态。在其他系统(如Apache,mod_wsgi等)中发现的释放工作人员资源的频繁模式是允许池中的工作人员在退出,清理和产生新的进程之前完成一定量的工作取代旧的。该maxtasksperchild
参数来Pool
自曝这种能力给最终用户。
apply(func[, args[, kwds]])
apply()
内置功能的等价物。它阻塞直到结果准备就绪,所以apply_async()
更适合并行执行工作。此外,func
只能在游泳池的其中一名工作人员身上
执行。
apply_async(func[, args[, kwds[, callback]]])
apply()
返回结果对象的方法的变体。
如果指定了回调
,那么它应该是一个可接受的参数。当结果变为就绪时,将对其应用回调
(除非该呼叫失败)。回调
应该立即完成,否则处理结果的线程将被阻塞。
map(func, iterable[, chunksize])
map()
内置函数的并行等价物(尽管它只支持一个可迭代的
参数)。它阻塞,直到结果准备就绪。
这种方法将迭代器切成许多块,并将它作为单独的任务提交给进程池。这些块的(近似)大小可以通过将chunksize
设置为正整数来指定。
map_async(func, iterable[, chunksize[, callback]])
map()
返回结果对象的方法的变体。
如果指定了回调
,那么它应该是一个可接受的参数。当结果变为就绪时,将对其应用回调
(除非该呼叫失败)。回调
应该立即完成,否则处理结果的线程将被阻塞。
imap(func, iterable[, chunksize])
相当于itertools.imap()
。
所述CHUNKSIZE
参数是与由所使用的一个map()
方法。对于使用了一个较大的值很长iterables CHUNKSIZE
可以使作业完成的太多
比使用的默认值加快1
。
此外,如果CHUNKSIZE
是1
则next()
通过返回的迭代器的方法imap()
方法有一个可选的超时
参数:next(timeout)
将提高multiprocessing.TimeoutError
如果结果不能内退回超时
秒。
imap_unordered(func, iterable[, chunksize])
同样imap()
,除了从返回的迭代结果的排序应该考虑随心所欲。(只有当只有一个工作进程时,才能保证“正确”的顺序。)
close()
防止将更多任务提交到池中。所有任务完成后,工作进程将退出。
terminate()
立即停止工作进程而不完成杰出的工作。当池对象被垃圾收集时terminate()
会立即调用。
join()
等待工作进程退出。必须调用close()
或terminate()
在使用之前join()
。
class multiprocessing.pool.AsyncResult
由Pool.apply_async()
和返回的结果的类Pool.map_async()
。
get([timeout])
到达时返回结果。如果超时
不是None
,并且结果未在超时
秒内到达,multiprocessing.TimeoutError
则会引发。如果远程调用引发异常,那么该异常将被重新调整get()
。
wait([timeout])
等到结果可用或超时
秒数通过。
ready()
返回通话是否完成。
successful()
返回是否完成呼叫而不引发异常。AssertionError
如果结果尚未准备好,将会升高。
以下示例演示了如何使用池:
from multiprocessing import Pool
import time
def f(x):
return x*x
if __name__ == '__main__':
pool = Pool(processes=4) # start 4 worker processes
result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously in a single process
print result.get(timeout=1) # prints "100" unless your computer is *very* slow
print pool.map(f, range(10)) # prints "[0, 1, 4,..., 81]"
it = pool.imap(f, range(10))
print it.next() # prints "0"
print it.next() # prints "1"
print it.next(timeout=1) # prints "4" unless your computer is *very* slow
result = pool.apply_async(time.sleep, (10,))
print result.get(timeout=1) # raises multiprocessing.TimeoutError
2.10. 听众和客户
通常在进程之间传递消息是使用队列或使用Connection
返回的对象完成的Pipe()
。
但是,multiprocessing.connection
模块允许一些额外的灵活性。它基本上为处理套接字或Windows命名管道提供了高层面的面向消息的API,并且还支持使用该模块的摘要式身份验证
hmac
。
multiprocessing.connection.deliver_challenge(connection, authkey)
将随机生成的消息发送到连接的另一端并等待回复。
如果回复与使用authkey
作为密钥的消息摘要匹配,则欢迎消息将发送到连接的另一端。否则AuthenticationError
会提出。
multiprocessing.connection.answer_challenge(connection, authkey)
接收消息,使用authkey
作为密钥来计算
消息的摘要,然后将摘要发回。
如果没有收到欢迎消息,则会AuthenticationError
提出。
multiprocessing.connection.Client(address[, family[, authenticate[, authkey]]])
尝试建立到正在使用地址
地址
的侦听器的连接,返回一个Connection
。
连接的类型由族
参数决定,但通常可以省略,因为它通常可以从地址
格式中推断出来。(请参阅地址
格式)
如果身份验证
是True
或authkey
是字符串,则使用摘要式身份验证
。用于身份验证
的关键将是要么AUTHKEY
或者current_process().authkey)
如果AUTHKEY
是None
。如果认证失败,则AuthenticationError
引发。请参阅认证密钥。
class multiprocessing.connection.Listener([address[, family[, backlog[, authenticate[, authkey]]]]])
绑定套接字或Windows命名管道的包装器,它正在侦听连接。
地址
是监听器对象的绑定套接字或命名管道使用的地址
。
注意
如果使用地址'0.0.0.0',则该地址在Windows上不会是可连接的终点。如果您需要可连接的端点,则应使用“127.0.0.1”。
family
是要使用的套接字(或命名管道)的类型。这可以是一个字符串'AF_INET'
(对于TCP套接字),'AF_UNIX'
(对于Unix域套接字)或'AF_PIPE'
(对于Windows命名管道)。其中只有第一个保证可用。如果家庭
是None
从地址
格式推断出家庭
的话。如果地址
也是None
默认选择。这个默认值是被认为是最快可用的家族。请参阅地址
格式。请注意,如果系列
是'AF_UNIX'
和地址
,None
那么将使用创建的私有临时目录创建套接字tempfile.mkstemp()
。
如果侦听器对象使用套接字,则在绑定后,backlog
(默认为1)将传递给listen()
套接字的方法。
如果认证
是True
(False
默认)或authkey
不是,None
那么使用摘要认证
。
如果authkey
是一个字符串,那么它将被用作认证密钥; 否则它一定是None
。
如果AUTHKEY
是None
和身份验证
是True
然后current_process().authkey
被用作认证密钥。如果AUTHKEY
是None
和身份验证
的False
则没有进行身份认证。如果认证失败,则AuthenticationError
引发。请参阅认证密钥。
accept()
接受监听器对象的绑定套接字或命名管道上的连接并返回一个Connection
对象。如果尝试进行身份验证并失败,则会AuthenticationError
引发此问题。
close()
关闭侦听器对象的绑定套接字或命名管道。当收听器被垃圾收集时,这被自动调用。但明确地称它是明智的。
侦听器对象具有以下只读属性:
address
Listener对象正在使用的地址。
last_accepted
上次接受连接的地址。如果这不可用,那就是了None
。
该模块定义了两个例外:
exception multiprocessing.connection.AuthenticationError
出现身份验证错误时引发异常。
例子
以下服务器代码创建一个'secret password'
用作身份验证密钥的侦听器。然后等待连接并将一些数据发送到客户端:
from multiprocessing.connection import Listener
from array import array
address = ('localhost', 6000) # family is deduced to be 'AF_INET'
listener = Listener(address, authkey='secret password')
conn = listener.accept()
print 'connection accepted from', listener.last_accepted
conn.send([2.25, None, 'junk', float])
conn.send_bytes('hello')
conn.send_bytes(array('i', [42, 1729]))
conn.close()
listener.close()
以下代码连接到服务器并从服务器接收一些数据:
from multiprocessing.connection import Client
from array import array
address = ('localhost', 6000)
conn = Client(address, authkey='secret password')
print conn.recv() # => [2.25, None, 'junk', float]
print conn.recv_bytes() # => 'hello'
arr = array('i', [0, 0, 0, 0, 0])
print conn.recv_bytes_into(arr) # => 8
print arr # => array('i', [42, 1729, 0, 0, 0])
conn.close()
2.10.1. 地址格式
- 一个
'AF_INET'
地址是以下形式的元组(hostname, port)
,其中主机名
是一个字符串,端口
是一个整数。
- 一个
'AF_UNIX'
地址是代表文件系统中的文件名的字符串。
An 'AF_PIPE' address is a string of the form
r'\\.\pipe\PipeName'
。要用于Client()
连接到名为ServerName
的远程计算机上的命名管道,应该使用表单的地址。r'\\ServerName\pipe\PipeName'
请注意,任何以两个反斜杠开头的字符串默认为'AF_PIPE'
地址而不是'AF_UNIX'
地址。
2.11. 验证密钥
当使用时Connection.recv
,收到的数据会自动取消。不幸的是,从不可信来源取出数据是一种安全风险。因此Listener
,请Client()
使用该hmac
模块提供摘要式身份验证。
认证密钥是一个可以被认为是密码的字符串:一旦建立连接,两端都会要求证明对方知道认证密钥。(证明两端都使用相同的密钥并没有
涉及发送通过连接键。)
如果请求身份验证但未指定身份验证密钥,current_process().authkey
则使用返回值(请参阅Process
)。该值将由Process
当前进程创建的任何对象自动继承。这意味着(默认情况下)多进程程序的所有进程将共享一个身份验证密钥,可以在设置它们之间的连接时使用该身份验证密钥。
合适的认证密钥也可以通过使用生成os.urandom()
。
2.12. 记录
有些支持日志记录功能。但请注意,logging
程序包不使用进程共享锁,因此可能(取决于处理程序类型)来自不同进程的消息混淆。
multiprocessing.get_logger()
返回使用的记录器multiprocessing
。如有必要,将创建一个新的。
首次创建时,记录器具有级别logging.NOTSET
并且没有默认处理程序。发送到此记录器的消息不会默认传播到根记录器。
请注意,在Windows上,子进程只会继承父进程记录器的级别 - 记录器的其他任何自定义都不会被继承。
multiprocessing.log_to_stderr()
该函数执行一个调用,get_logger()
但除了返回由get_logger创建的记录器之外,它还添加了一个处理程序,该处理程序将输出发送到sys.stderr
使用格式'[%(levelname)s/%(processName)s] %(message)s'
。
以下是打开日志记录的示例会话:
>>> import multiprocessing, logging
>>> logger = multiprocessing.log_to_stderr()
>>> logger.setLevel(logging.INFO)
>>> logger.warning('doomed')
[WARNING/MainProcess] doomed
>>> m = multiprocessing.Manager()
[INFO/SyncManager-...] child process calling self.run()
[INFO/SyncManager-...] created temp directory /.../pymp-...
[INFO/SyncManager-...] manager serving at '/.../listener-...'
>>> del m
[INFO/MainProcess] sending shutdown message to manager
[INFO/SyncManager-...] manager exiting with exitcode 0
除了具有这两个日志记录功能之外,多处理还会公开两个附加日志记录级别属性。这些是SUBWARNING
和SUBDEBUG
。下表说明了这些论文在正常层次结构中的位置。
水平 | 数字值 |
---|---|
SUBWARNING | 25 |
SUBDEBUG | 5 |
有关日志级别的完整表格,请参阅logging
模块。
这些额外的日志级别主要用于多处理模块中的某些调试消息。以下是与上面相同的示例,但已SUBDEBUG
启用:
>>> import multiprocessing, logging
>>> logger = multiprocessing.log_to_stderr()
>>> logger.setLevel(multiprocessing.SUBDEBUG)
>>> logger.warning('doomed')
[WARNING/MainProcess] doomed
>>> m = multiprocessing.Manager()
[INFO/SyncManager-...] child process calling self.run()
[INFO/SyncManager-...] created temp directory /.../pymp-...
[INFO/SyncManager-...] manager serving at '/.../pymp-djGBXN/listener-...'
>>> del m
[SUBDEBUG/MainProcess] finalizer calling ...
[INFO/MainProcess] sending shutdown message to manager
[DEBUG/SyncManager-...] manager received shutdown message
[SUBDEBUG/SyncManager-...] calling <Finalize object, callback=unlink, ...
[SUBDEBUG/SyncManager-...] finalizer calling <built-in function unlink> ...
[SUBDEBUG/SyncManager-...] calling <Finalize object, dead>
[SUBDEBUG/SyncManager-...] finalizer calling <function rmtree at 0x5aa730> ...
[INFO/SyncManager-...] manager exiting with exitcode 0
2.13. multiprocessing.dummy模块
multiprocessing.dummy
复制API的multiprocessing
但不过是threading
模块的包装。
3.编程指南
使用时应遵守一定的准则和习惯用语multiprocessing
。
3.1. 所有平台
避免共享状态
尽可能避免在进程间转移大量数据。
最好坚持使用队列或管道进行进程之间的通信,而不是使用threading
模块中较低级别的同步原语。
Picklability
确保代理方法的参数是可挑选的。
代理线程安全
除非用锁保护,否则不要使用来自多个线程的代理对象。
(使用相同
代理的不同进程永远不会有问题。)
加入僵尸进程
在Unix上,当进程结束但尚未加入时,它变成僵尸。不应该有很多,因为每当新进程开始(或被active_children()
调用)时,所有已完成的尚未加入的进程都将被加入。也调用完成的进程Process.is_alive
将加入该进程。即使如此,明确加入您开始的所有流程也可能是一种很好的做法。
继承不如pickle/unpickle
在Windows上,许多类型multiprocessing
需要可选择,以便子进程可以使用它们。但是,通常应避免使用管道或队列将共享对象发送到其他进程。相反,您应该安排程序,以便需要访问在别处创建的共享资源的进程可以从祖先进程继承。
避免终止进程
使用Process.terminate
方法停止进程可能会导致进程当前正在使用的任何共享资源(例如锁,信号量,管道和队列)被破坏或不可用于其他进程。
因此,最好只考虑Process.terminate
在不使用任何共享资源的进程上使用。
加入使用队列的进程
请记住,已将项目放入队列的进程将在终止之前等待,直到所有缓冲项目都由“馈线”线程馈送到基础管道。(子进程可以调用cancel_join_thread()
队列的方法来避免这种行为。)
这意味着无论什么时候使用队列,都需要确保放入队列中的所有项目在加入之前最终都会被删除。否则,您无法确定将项目放入队列的进程将终止。还要记住,非守护进程会自动加入。
一个会导致死锁的例子如下:
from multiprocessing import Process, Queue
def f(q):
q.put('X' * 1000000)
if __name__ == '__main__':
queue = Queue()
p = Process(target=f, args=(queue,))
p.start()
p.join() # this deadlocks
obj = queue.get()
这里的一个解决方法是交换最后两行(或者直接删除p.join()
行)。
明确地将资源传递给子进程
在Unix上,子进程可以使用全局资源在父进程中创建的共享资源。但是,最好将该对象作为参数传递给子进程的构造函数。
除了使代码(可能)与Windows兼容外,这也确保了只要子进程仍然存在,对象将不会被垃圾收集到父进程中。如果在父进程中垃圾收集对象时某些资源被释放,这可能很重要。
举例来说
from multiprocessing import Process, Lock
def f():
... do something using "lock" ...
if __name__ == '__main__':
lock = Lock()
for i in range(10):
Process(target=f).start()
应改写为
from multiprocessing import Process, Lock
def f(l):
... do something using "l" ...
if __name__ == '__main__':
lock = Lock()
for i in range(10):
Process(target=f, args=(lock,)).start()
谨防用sys.stdin
“像对象的文件”
multiprocessing
原本被无条件地称为:
os.close(sys.stdin.fileno())
在multiprocessing.Process._bootstrap()
方法中 - 这导致了流程中的问题。这已被更改为:
sys.stdin.close()
sys.stdin = open(os.devnull)
它解决了进程之间相互冲突的基本问题,导致了错误的文件描述符错误,但却给应用程序带来了潜在的危险,这些应用程序将替换sys.stdin()
为带有输出缓冲的“类文件对象”。这种危险是,如果多个进程调用close()
这个类文件对象,可能会导致相同的数据被多次刷新到对象,从而导致损坏。
如果您编写一个类似文件的对象并实现自己的缓存,那么无论何时添加到缓存中,都可以通过存储pid来使其更安全,并在pid更改时放弃缓存。例如:
@property
def cache(self):
pid = os.getpid()
if pid != self._pid:
self._pid = pid
self._cache = []
return self._cache
有关更多信息,请参阅问题5155,问题5313和问题5331
3.2. 视窗
由于Windows缺少os.fork()
它有一些额外的限制:
更多的可拾取性
确保所有参数Process.__init__()
都是可选择的。这尤其意味着绑定或未绑定的方法不能直接用作target
Windows上的参数 - 只需定义一个函数并使用它即可。
此外,如果您继承了子类,Process
那么确保在Process.start
调用方法时可以选择实例。
全局变量
请记住,如果在子进程中运行的代码尝试访问全局变量,则它所看到的值(如果有)可能与Process.start
被调用时父进程中的值不同。
但是,仅仅是模块级别常量的全局变量不会导致问题。
主模块安全导入
确保主模块可以通过新的Python解释器安全地导入,而不会引起意外的副作用(例如开始一个新过程)。
例如,在Windows下运行以下模块将会失败,并显示RuntimeError
:
from multiprocessing import Process
def foo():
print 'hello'
p = Process(target=foo)
p.start()
相反,应该使用if __name__ == '__main__':
以下方式来保护程序的“切入点” :
from multiprocessing import Process, freeze_support
def foo():
print 'hello'
if __name__ == '__main__':
freeze_support()
p = Process(target=foo)
p.start()
(freeze_support()
如果程序正常运行而不是冻结,可以省略该行。)
这允许新产生的Python解释器安全地导入模块,然后运行模块的foo()
功能。
如果在主模块中创建池或管理器,则会应用类似的限制。
4.例子
演示如何创建和使用定制的管理人员和代理人:
#
# This module shows how to use arbitrary callables with a subclass of
# `BaseManager`.
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#
from multiprocessing import freeze_support
from multiprocessing.managers import BaseManager, BaseProxy
import operator
##
class Foo(object):
def f(self):
print 'you called Foo.f()'
def g(self):
print 'you called Foo.g()'
def _h(self):
print 'you called Foo._h()'
# A simple generator function
def baz():
for i in xrange(10):
yield i*i
# Proxy type for generator objects
class GeneratorProxy(BaseProxy):
_exposed_ = ('next', '__next__')
def __iter__(self):
return self
def next(self):
return self._callmethod('next')
def __next__(self):
return self._callmethod('__next__')
# Function to return the operator module
def get_operator_module():
return operator
##
class MyManager(BaseManager):
pass
# register the Foo class; make `f()` and `g()` accessible via proxy
MyManager.register('Foo1', Foo)
# register the Foo class; make `g()` and `_h()` accessible via proxy
MyManager.register('Foo2', Foo, exposed=('g', '_h'))
# register the generator function baz; use `GeneratorProxy` to make proxies
MyManager.register('baz', baz, proxytype=GeneratorProxy)
# register get_operator_module( make public functions accessible via proxy
MyManager.register('operator', get_operator_module)
##
def test():
manager = MyManager()
manager.start()
print '-' * 20
f1 = manager.Foo1()
f1.f()
f1.g()
assert not hasattr(f1, '_h')
assert sorted(f1._exposed_) == sorted(['f', 'g'])
print '-' * 20
f2 = manager.Foo2()
f2.g()
f2._h()
assert not hasattr(f2, 'f')
assert sorted(f2._exposed_) == sorted(['g', '_h'])
print '-' * 20
it = manager.baz()
for i in it:
print '<%d>' % i,
print
print '-' * 20
op = manager.operator()
print 'op.add(23, 45) =', op.add(23, 45)
print 'op.pow(2, 94) =', op.pow(2, 94)
print 'op.getslice(range(10), 2, 6) =', op.getslice(range(10), 2, 6)
print 'op.repeat(range(5), 3) =', op.repeat(range(5), 3)
print 'op._exposed_ =', op._exposed_
##
if __name__ == '__main__':
freeze_support()
test()
使用Pool
:
#
# A test of `multiprocessing.Pool` class
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#
import multiprocessing
import time
import random
import sys
#
# Functions used by test code
#
def calculate(func, args):
result = func(*args)
return '%s says that %s%s = %s' % (
multiprocessing.current_process().name,
func.__name__, args, result
)
def calculatestar(args):
return calculate(*args)
def mul(a, b):
time.sleep(0.5*random.random())
return a * b
def plus(a, b):
time.sleep(0.5*random.random())
return a + b
def f(x):
return 1.0 / (x-5.0)
def pow3(x):
return x**3
def noop(x):
pass
#
# Test code
#
def test():
print 'cpu_count() = %d\n' % multiprocessing.cpu_count()
#
# Create pool
#
PROCESSES = 4
print 'Creating pool with %d processes\n' % PROCESSES
pool = multiprocessing.Pool(PROCESSES)
print 'pool = %s' % pool
print
#
# Tests
#
TASKS = [(mul, (i, 7)) for i in range(10)] + \
[(plus, (i, 8)) for i in range(10)]
results = [pool.apply_async(calculate, t) for t in TASKS]
imap_it = pool.imap(calculatestar, TASKS)
imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)
print 'Ordered results using pool.apply_async():'
for r in results:
print '\t', r.get()
print
print 'Ordered results using pool.imap():'
for x in imap_it:
print '\t', x
print
print 'Unordered results using pool.imap_unordered():'
for x in imap_unordered_it:
print '\t', x
print
print 'Ordered results using pool.map() --- will block till complete:'
for x in pool.map(calculatestar, TASKS):
print '\t', x
print
#
# Simple benchmarks
#
N = 100000
print 'def pow3(x): return x**3'
t = time.time()
A = map(pow3, xrange(N))
print '\tmap(pow3, xrange(%d)):\n\t\t%s seconds' % \
(N, time.time() - t)
t = time.time()
B = pool.map(pow3, xrange(N))
print '\tpool.map(pow3, xrange(%d)):\n\t\t%s seconds' % \
(N, time.time() - t)
t = time.time()
C = list(pool.imap(pow3, xrange(N), chunksize=N//8))
print '\tlist(pool.imap(pow3, xrange(%d), chunksize=%d)):\n\t\t%s' \
' seconds' % (N, N//8, time.time() - t)
assert A == B == C, (len(A), len(B), len(C))
print
L = [None] * 1000000
print 'def noop(x): pass'
print 'L = [None] * 1000000'
t = time.time()
A = map(noop, L)
print '\tmap(noop, L):\n\t\t%s seconds' % \
(time.time() - t)
t = time.time()
B = pool.map(noop, L)
print '\tpool.map(noop, L):\n\t\t%s seconds' % \
(time.time() - t)
t = time.time()
C = list(pool.imap(noop, L, chunksize=len(L)//8))
print '\tlist(pool.imap(noop, L, chunksize=%d)):\n\t\t%s seconds' % \
(len(L)//8, time.time() - t)
assert A == B == C, (len(A), len(B), len(C))
print
del A, B, C, L
#
# Test error handling
#
print 'Testing error handling:'
try:
print pool.apply(f, (5,))
except ZeroDivisionError:
print '\tGot ZeroDivisionError as expected from pool.apply()'
else:
raise AssertionError('expected ZeroDivisionError')
try:
print pool.map(f, range(10))
except ZeroDivisionError:
print '\tGot ZeroDivisionError as expected from pool.map()'
else:
raise AssertionError('expected ZeroDivisionError')
try:
print list(pool.imap(f, range(10)))
except ZeroDivisionError:
print '\tGot ZeroDivisionError as expected from list(pool.imap())'
else:
raise AssertionError('expected ZeroDivisionError')
it = pool.imap(f, range(10))
for i in range(10):
try:
x = it.next()
except ZeroDivisionError:
if i == 5:
pass
except StopIteration:
break
else:
if i == 5:
raise AssertionError('expected ZeroDivisionError')
assert i == 9
print '\tGot ZeroDivisionError as expected from IMapIterator.next()'
print
#
# Testing timeouts
#
print 'Testing ApplyResult.get() with timeout:',
res = pool.apply_async(calculate, TASKS[0])
while 1:
sys.stdout.flush()
try:
sys.stdout.write('\n\t%s' % res.get(0.02))
break
except multiprocessing.TimeoutError:
sys.stdout.write('.')
print
print
print 'Testing IMapIterator.next() with timeout:',
it = pool.imap(calculatestar, TASKS)
while 1:
sys.stdout.flush()
try:
sys.stdout.write('\n\t%s' % it.next(0.02))
except StopIteration:
break
except multiprocessing.TimeoutError:
sys.stdout.write('.')
print
print
#
# Testing callback
#
print 'Testing callback:'
A = []
B = [56, 0, 1, 8, 27, 64, 125, 216, 343, 512, 729]
r = pool.apply_async(mul, (7, 8), callback=A.append)
r.wait()
r = pool.map_async(pow3, range(10), callback=A.extend)
r.wait()
if A == B:
print '\tcallbacks succeeded\n'
else:
print '\t*** callbacks failed\n\t\t%s != %s\n' % (A, B)
#
# Check there are no outstanding tasks
#
assert not pool._cache, 'cache = %r' % pool._cache
#
# Check close() methods
#
print 'Testing close():'
for worker in pool._pool:
assert worker.is_alive()
result = pool.apply_async(time.sleep, [0.5])
pool.close()
pool.join()
assert result.get() is None
for worker in pool._pool:
assert not worker.is_alive()
print '\tclose() succeeded\n'
#
# Check terminate() method
#
print 'Testing terminate():'
pool = multiprocessing.Pool(2)
DELTA = 0.1
ignore = pool.apply(pow3, [2])
results = [pool.apply_async(time.sleep, [DELTA]) for i in range(100)]
pool.terminate()
pool.join()
for worker in pool._pool:
assert not worker.is_alive()
print '\tterminate() succeeded\n'
#
# Check garbage collection
#
print 'Testing garbage collection:'
pool = multiprocessing.Pool(2)
DELTA = 0.1
processes = pool._pool
ignore = pool.apply(pow3, [2])
results = [pool.apply_async(time.sleep, [DELTA]) for i in range(100)]
results = pool = None
time.sleep(DELTA * 2)
for worker in processes:
assert not worker.is_alive()
print '\tgarbage collection succeeded\n'
if __name__ == '__main__':
multiprocessing.freeze_support()
assert len(sys.argv) in (1, 2)
if len(sys.argv) == 1 or sys.argv[1] == 'processes':
print ' Using processes '.center(79, '-')
elif sys.argv[1] == 'threads':
print ' Using threads '.center(79, '-')
import multiprocessing.dummy as multiprocessing
else:
print 'Usage:\n\t%s [processes | threads]' % sys.argv[0]
raise SystemExit(2)
test()
同步类型,如锁,条件和队列:
#
# A test file for the `multiprocessing` package
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#
import time, sys, random
from Queue import Empty
import multiprocessing # may get overwritten
#### TEST_VALUE
def value_func(running, mutex):
random.seed()
time.sleep(random.random()*4)
mutex.acquire()
print '\n\t\t\t' + str(multiprocessing.current_process()) + ' has finished'
running.value -= 1
mutex.release()
def test_value():
TASKS = 10
running = multiprocessing.Value('i', TASKS)
mutex = multiprocessing.Lock()
for i in range(TASKS):
p = multiprocessing.Process(target=value_func, args=(running, mutex))
p.start()
while running.value > 0:
time.sleep(0.08)
mutex.acquire()
print running.value,
sys.stdout.flush()
mutex.release()
print
print 'No more running processes'
#### TEST_QUEUE
def queue_func(queue):
for i in range(30):
time.sleep(0.5 * random.random())
queue.put(i*i)
queue.put('STOP')
def test_queue():
q = multiprocessing.Queue()
p = multiprocessing.Process(target=queue_func, args=(q,))
p.start()
o = None
while o != 'STOP':
try:
o = q.get(timeout=0.3)
print o,
sys.stdout.flush()
except Empty:
print 'TIMEOUT'
print
#### TEST_CONDITION
def condition_func(cond):
cond.acquire()
print '\t' + str(cond)
time.sleep(2)
print '\tchild is notifying'
print '\t' + str(cond)
cond.notify()
cond.release()
def test_condition():
cond = multiprocessing.Condition()
p = multiprocessing.Process(target=condition_func, args=(cond,))
print cond
cond.acquire()
print cond
cond.acquire()
print cond
p.start()
print 'main is waiting'
cond.wait()
print 'main has woken up'
print cond
cond.release()
print cond
cond.release()
p.join()
print cond
#### TEST_SEMAPHORE
def semaphore_func(sema, mutex, running):
sema.acquire()
mutex.acquire()
running.value += 1
print running.value, 'tasks are running'
mutex.release()
random.seed()
time.sleep(random.random()*2)
mutex.acquire()
running.value -= 1
print '%s has finished' % multiprocessing.current_process()
mutex.release()
sema.release()
def test_semaphore():
sema = multiprocessing.Semaphore(3)
mutex = multiprocessing.RLock()
running = multiprocessing.Value('i', 0)
processes = [
multiprocessing.Process(target=semaphore_func,
args=(sema, mutex, running))
for i in range(10)
]
for p in processes:
p.start()
for p in processes:
p.join()
#### TEST_JOIN_TIMEOUT
def join_timeout_func():
print '\tchild sleeping'
time.sleep(5.5)
print '\n\tchild terminating'
def test_join_timeout():
p = multiprocessing.Process(target=join_timeout_func)
p.start()
print 'waiting for process to finish'
while 1:
p.join(timeout=1)
if not p.is_alive():
break
print '.',
sys.stdout.flush()
#### TEST_EVENT
def event_func(event):
print '\t%r is waiting' % multiprocessing.current_process()
event.wait()
print '\t%r has woken up' % multiprocessing.current_process()
def test_event():
event = multiprocessing.Event()
processes = [multiprocessing.Process(target=event_func, args=(event,))
for i in range(5)]
for p in processes:
p.start()
print 'main is sleeping'
time.sleep(2)
print 'main is setting event'
event.set()
for p in processes:
p.join()
#### TEST_SHAREDVALUES
def sharedvalues_func(values, arrays, shared_values, shared_arrays):
for i in range(len(values)):
v = values[i][1]
sv = shared_values[i].value
assert v == sv
for i in range(len(values)):
a = arrays[i][1]
sa = list(shared_arrays[i][:])
assert a == sa
print 'Tests passed'
def test_sharedvalues():
values = [
('i', 10),
('h', -2),
('d', 1.25)
]
arrays = [
('i', range(100)),
('d', [0.25 * i for i in range(100)]),
('H', range(1000))
]
shared_values = [multiprocessing.Value(id, v) for id, v in values]
shared_arrays = [multiprocessing.Array(id, a) for id, a in arrays]
p = multiprocessing.Process(
target=sharedvalues_func,
args=(values, arrays, shared_values, shared_arrays)
)
p.start()
p.join()
assert p.exitcode == 0
####
def test(namespace=multiprocessing):
global multiprocessing
multiprocessing = namespace
for func in [ test_value, test_queue, test_condition,
test_semaphore, test_join_timeout, test_event,
test_sharedvalues ]:
print '\n\t######## %s\n' % func.__name__
func()
ignore = multiprocessing.active_children() # cleanup any old processes
if hasattr(multiprocessing, '_debug_info'):
info = multiprocessing._debug_info()
if info:
print info
raise ValueError('there should be no positive refcounts left')
if __name__ == '__main__':
multiprocessing.freeze_support()
assert len(sys.argv) in (1, 2)
if len(sys.argv) == 1 or sys.argv[1] == 'processes':
print ' Using processes '.center(79, '-')
namespace = multiprocessing
elif sys.argv[1] == 'manager':
print ' Using processes and a manager '.center(79, '-')
namespace = multiprocessing.Manager()
namespace.Process = multiprocessing.Process
namespace.current_process = multiprocessing.current_process
namespace.active_children = multiprocessing.active_children
elif sys.argv[1] == 'threads':
print ' Using threads '.center(79, '-')
import multiprocessing.dummy as namespace
else:
print 'Usage:\n\t%s [processes | manager | threads]' % sys.argv[0]
raise SystemExit(2)
test(namespace)
显示如何使用队列将任务提供给工作进程集合并收集结果的示例:
#
# Simple example which uses a pool of workers to carry out some tasks.
#
# Notice that the results will probably not come out of the output
# queue in the same in the same order as the corresponding tasks were
# put on the input queue. If it is important to get the results back
# in the original order then consider using `Pool.map()` or
# `Pool.imap()` (which will save on the amount of code needed anyway).
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#
import time
import random
from multiprocessing import Process, Queue, current_process, freeze_support
#
# Function run by worker processes
#
def worker(input, output):
for func, args in iter(input.get, 'STOP'):
result = calculate(func, args)
output.put(result)
#
# Function used to calculate result
#
def calculate(func, args):
result = func(*args)
return '%s says that %s%s = %s' % \
(current_process().name, func.__name__, args, result)
#
# Functions referenced by tasks
#
def mul(a, b):
time.sleep(0.5*random.random())
return a * b
def plus(a, b):
time.sleep(0.5*random.random())
return a + b
#
#
#
def test():
NUMBER_OF_PROCESSES = 4
TASKS1 = [(mul, (i, 7)) for i in range(20)]
TASKS2 = [(plus, (i, 8)) for i in range(10)]
# Create queues
task_queue = Queue()
done_queue = Queue()
# Submit tasks
for task in TASKS1:
task_queue.put(task)
# Start worker processes
for i in range(NUMBER_OF_PROCESSES):
Process(target=worker, args=(task_queue, done_queue)).start()
# Get and print results
print 'Unordered results:'
for i in range(len(TASKS1)):
print '\t', done_queue.get()
# Add more tasks using `put()`
for task in TASKS2:
task_queue.put(task)
# Get and print some more results
for i in range(len(TASKS2)):
print '\t', done_queue.get()
# Tell child processes to stop
for i in range(NUMBER_OF_PROCESSES):
task_queue.put('STOP')
if __name__ == '__main__':
freeze_support()
test()
一个工作进程池如何SimpleHTTPServer.HttpServer
在共享一个监听套接字时分别运行一个实例。
#
# Example where a pool of http servers share a single listening socket
#
# On Windows this module depends on the ability to pickle a socket
# object so that the worker processes can inherit a copy of the server
# object. (We import `multiprocessing.reduction` to enable this pickling.)
#
# Not sure if we should synchronize access to `socket.accept()` method by
# using a process-shared lock -- does not seem to be necessary.
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#
import os
import sys
from multiprocessing import Process, current_process, freeze_support
from BaseHTTPServer import HTTPServer
from SimpleHTTPServer import SimpleHTTPRequestHandler
if sys.platform == 'win32':
import multiprocessing.reduction # make sockets pickable/inheritable
def note(format, *args):
sys.stderr.write('[%s]\t%s\n' % (current_process().name, format%args))
class RequestHandler(SimpleHTTPRequestHandler):
# we override log_message() to show which process is handling the request
def log_message(self, format, *args):
note(format, *args)
def serve_forever(server):
note('starting server')
try:
server.serve_forever()
except KeyboardInterrupt:
pass
def runpool(address, number_of_processes):
# create a single server object -- children will each inherit a copy
server = HTTPServer(address, RequestHandler)
# create child processes to act as workers
for i in range(number_of_processes-1):
Process(target=serve_forever, args=(server,)).start()
# main process also acts as a worker
serve_forever(server)
def test():
DIR = os.path.join(os.path.dirname(__file__), '..')
ADDRESS = ('localhost', 8000)
NUMBER_OF_PROCESSES = 4
print 'Serving at http://%s:%d using %d worker processes' % \
(ADDRESS[0], ADDRESS[1], NUMBER_OF_PROCESSES)
print 'To exit press Ctrl-' + ['C', 'Break'][sys.platform=='win32']
os.chdir(DIR)
runpool(ADDRESS, NUMBER_OF_PROCESSES)
if __name__ == '__main__':
freeze_support()
test()
一些简单的基准测试比较multiprocessing
有threading
:
#
# Simple benchmarks for the multiprocessing package
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#
import time, sys, multiprocessing, threading, Queue, gc
if sys.platform == 'win32':
_timer = time.clock
else:
_timer = time.time
delta = 1
#### TEST_QUEUESPEED
def queuespeed_func(q, c, iterations):
a = '0' * 256
c.acquire()
c.notify()
c.release()
for i in xrange(iterations):
q.put(a)
q.put('STOP')
def test_queuespeed(Process, q, c):
elapsed = 0
iterations = 1
while elapsed < delta:
iterations *= 2
p = Process(target=queuespeed_func, args=(q, c, iterations))
c.acquire()
p.start()
c.wait()
c.release()
result = None
t = _timer()
while result != 'STOP':
result = q.get()
elapsed = _timer() - t
p.join()
print iterations, 'objects passed through the queue in', elapsed, 'seconds'
print 'average number/sec:', iterations/elapsed
#### TEST_PIPESPEED
def pipe_func(c, cond, iterations):
a = '0' * 256
cond.acquire()
cond.notify()
cond.release()
for i in xrange(iterations):
c.send(a)
c.send('STOP')
def test_pipespeed():
c, d = multiprocessing.Pipe()
cond = multiprocessing.Condition()
elapsed = 0
iterations = 1
while elapsed < delta:
iterations *= 2
p = multiprocessing.Process(target=pipe_func,
args=(d, cond, iterations))
cond.acquire()
p.start()
cond.wait()
cond.release()
result = None
t = _timer()
while result != 'STOP':
result = c.recv()
elapsed = _timer() - t
p.join()
print iterations, 'objects passed through connection in',elapsed,'seconds'
print 'average number/sec:', iterations/elapsed
#### TEST_SEQSPEED
def test_seqspeed(seq):
elapsed = 0
iterations = 1
while elapsed < delta:
iterations *= 2
t = _timer()
for i in xrange(iterations):
a = seq[5]
elapsed = _timer()-t
print iterations, 'iterations in', elapsed, 'seconds'
print 'average number/sec:', iterations/elapsed
#### TEST_LOCK
def test_lockspeed(l):
elapsed = 0
iterations = 1
while elapsed < delta:
iterations *= 2
t = _timer()
for i in xrange(iterations):
l.acquire()
l.release()
elapsed = _timer()-t
print iterations, 'iterations in', elapsed, 'seconds'
print 'average number/sec:', iterations/elapsed
#### TEST_CONDITION
def conditionspeed_func(c, N):
c.acquire()
c.notify()
for i in xrange(N):
c.wait()
c.notify()
c.release()
def test_conditionspeed(Process, c):
elapsed = 0
iterations = 1
while elapsed < delta:
iterations *= 2
c.acquire()
p = Process(target=conditionspeed_func, args=(c, iterations))
p.start()
c.wait()
t = _timer()
for i in xrange(iterations):
c.notify()
c.wait()
elapsed = _timer()-t
c.release()
p.join()
print iterations * 2, 'waits in', elapsed, 'seconds'
print 'average number/sec:', iterations * 2 / elapsed
####
def test():
manager = multiprocessing.Manager()
gc.disable()
print '\n\t######## testing Queue.Queue\n'
test_queuespeed(threading.Thread, Queue.Queue(),
threading.Condition())
print '\n\t######## testing multiprocessing.Queue\n'
test_queuespeed(multiprocessing.Process, multiprocessing.Queue(),
multiprocessing.Condition())
print '\n\t######## testing Queue managed by server process\n'
test_queuespeed(multiprocessing.Process, manager.Queue(),
manager.Condition())
print '\n\t######## testing multiprocessing.Pipe\n'
test_pipespeed()
print
print '\n\t######## testing list\n'
test_seqspeed(range(10))
print '\n\t######## testing list managed by server process\n'
test_seqspeed(manager.list(range(10)))
print '\n\t######## testing Array("i", ..., lock=False)\n'
test_seqspeed(multiprocessing.Array('i', range(10), lock=False))
print '\n\t######## testing Array("i", ..., lock=True)\n'
test_seqspeed(multiprocessing.Array('i', range(10), lock=True))
print
print '\n\t######## testing threading.Lock\n'
test_lockspeed(threading.Lock())
print '\n\t######## testing threading.RLock\n'
test_lockspeed(threading.RLock())
print '\n\t######## testing multiprocessing.Lock\n'
test_lockspeed(multiprocessing.Lock())
print '\n\t######## testing multiprocessing.RLock\n'
test_lockspeed(multiprocessing.RLock())
print '\n\t######## testing lock managed by server process\n'
test_lockspeed(manager.Lock())
print '\n\t######## testing rlock managed by server process\n'
test_lockspeed(manager.RLock())
print
print '\n\t######## testing threading.Condition\n'
test_conditionspeed(threading.Thread, threading.Condition())
print '\n\t######## testing multiprocessing.Condition\n'
test_conditionspeed(multiprocessing.Process, multiprocessing.Condition())
print '\n\t######## testing condition managed by a server process\n'
test_conditionspeed(multiprocessing.Process, manager.Condition())
gc.enable()
if __name__ == '__main__':
multiprocessing.freeze_support()
test()