[Python] multiprocessing 備忘録

プロセス間通信

プロセスというのは基本的にメモリ空間が共有されないので、プロセス間で何かやり取りしたくなったらプロセス間通信(IPC)を行う必要があります。
もし、POSIXやSystem Vのプロセス間通信が行いたい場合はこちらを参照してください。

multiprocessingのプロセス間で通信を行う方法には、大きく分けて「Pipe」「Queue」「共有オブジェクト」「ソケット」があります。

パイプ(PIPE)

Pipeは2点間で情報をやり取りを行うための基本的な通信です。
Pipe(duplex=True)がConnectionを2つ返却します。
作成したパイプの片方を引数として渡し、send(), recv()することでプロセス間の通信を実現します。

双方向のパイプを関数の引数に渡して利用します。以降の別の通信方法でも同様に引数に渡すことで共有されると覚えておきましょう。

console stdout
from multiprocessing import Process, Pipe
 
 
def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()
 
if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    parent_conn.send('a')
    parent_conn.send(['b', 'c'])
 
    child_conn.recv()
    child_conn.recv()
 
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   # prints "[42, None, 'hello']"
 
    parent_conn.close()  # クローズする
    child_conn.close()  # クローズする
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
'a'
['b', 'c']
[42, None, 'hello']

「duplexがTrue(デフォルト)ならパイプは双方向性です。duplexがFalseならパイプは一方向性で、conn1はメッセージの受信専用、conn2はメッセージの送信専用になります。」だそうです。
本来パイプというものは片方向の通信を実現するものなので、双方向のコネクションはおそらく内部的には2セットのパイプを持っているのでしょう。
send(オブジェクト)メソッドでパイプに蓄積されたデータをもう片方のパイプのrecv()メソッドによって取り出します。取り出し順はFIFOで、取り出すデータがない場合はブロックされます。

取り出すデータがあるかどうかはpoll()メソッドで確認できます。

>>> p1, p2 = Pipe()
>>> p2.poll()
False
>>> p1.send(1)
>>> p2.poll()
True
>>> p2.recv()
1
>>> p2.poll()
False

ちなみに送信されるデータはPickle化されるので単純なスカラ値でなくとも送受信可能ですが、一時的な値や複雑な参照を持ったオブジェクトだとうまく受け渡せないことがあるかもしれないので注意しましょう。
以降のQueueでも同様です。

キュー(QUEUE)

QueueはFIFO構造でデータをやり取りするためのオブジェクトです。Pipeによって実装されているようです。(参考)気になる方はmultiprocessing/queue.pyの実装を読んでみましょう。

Pipeは2つのオブジェクトを返却し片方を共有することで2点間の通信を実現しましたが、Queueは1つのオブジェクトのみを返却します。
コネクションオブジェクトと同様に引数として渡すことで通信を実現します。イメージ的には通信というより共有に近いと思います。

queue.Queueと使い方は同じそうです。
簡単に使い方を見てみましょう。ただ使い方だけ見たいだけなので同一プロセス間で試します。基本はput()で積んだオブジェクトをget()で取り出すことを覚えてれば大丈夫です。

console stdout
from multiprocessing import Queue
 
q = Queue(3)  # キューの最大サイズを3に限定する。デフォルトは0(無限)
q.put(1, block=False)
q.put(2, block=False)
q.put(3, block=False)
q.put(4, block=False)
#
#
#
#
#
q.get(block=False)
q.get(block=False)
q.get(block=False)
q.get(block=False)
#
#
#
#
#
#
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/lib/python3.5/multiprocessing/queues.py", line 83, in put
    raise Full
queue.Full
#
1
2
3
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/lib/python3.5/multiprocessing/queues.py", line 107, in get
    raise Empty
queue.Empty

キューの最大サイズに達した状態でput()する、キューが空の状態でget()するとブロックされます。
このブロッキングを制御する引数として、blockとtimeoutを持ちます。

  • block引数にFalseを指定した場合、ブロックされるような状態だと例外が発生します。
  • timeout引数に秒数を指定した場合、その秒数が経過してもブロックされるような状態だと例外が発生します。
  • getの場合は「Empty」例外、putの場合は「Full」例外が発生します。

セマフォ(SEMAPHORE)

Semaphoreは資源の上限を管理するための仕組みです。通信というよりプロセスを超えて資源を制御する仕組みって感じですね。まぁプロセス間通信の一種なのでここで取り上げます。
たとえばQueueのような共有資源に制限をかける場合にはSemaphoreまたはBoundedSemaphoreを使うことで実現できます。

  • acquire()で資源を獲得します。資源を獲得できるときはTrueを返却しますが、できないときはブロックされます。block引数がFalseならFalseを返却します。
  • release()で資源を解放します。BoundedSemphoreを使った場合、獲得した資源以上に解放しようとするとValueError例外を発生させます。
  • 内部的にはカウンタを増減させているだけです。get_value()でカウンタの値を確認できます。

コネクションオブジェクトやキューと同様に引数として渡すことでその値を共有できます。というだけの話なので仕組みだけ見ておきましょう。

console stdout
from multiprocessing import Semaphore
s = Semaphore(3)
s.get_value()
s.acquire(block=False)
s.acquire(block=False)
s.acquire(block=False)
s.acquire(block=False)
s.get_value()
 
s.release()
s.release()
s.release()
s.release()
s.get_value()
#
#
3
True
True
True
False
0
#
#
#
#
#
4
console stdout
from multiprocessing import BoundedSemaphore
s = BoundedSemaphore(3)
s.get_value()
s.acquire(block=False)
s.acquire(block=False)
s.acquire(block=False)
s.acquire(block=False)
s.get_value()
 
s.release()
s.release()
s.release()
s.release()
#
#
#
s.get_value()
#
#
3
True
True
True
False
0
#
#
#
#
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
ValueError: semaphore or lock released too many times
#
3

ロック(LOCK)

Lockは最大資源数が1で固定されたBoundedSemaphore相当の動作をします。

>>> from multiprocessing import Lock
>>> l = Lock()
>>> l.acquire(block=False)
True
>>> l.acquire(block=False)
False
>>> l.release()
>>> l.release()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
ValueError: semaphore or lock released too many times

再帰ロック(RLOCK)

RLockは再帰ロックオブジェクトということでこれだけだとよくわからないんですが、操作元が同一プロセスか否かによって動作が異なるLockです。
Lockとは違いロックを獲得したプロセスからであれば何回であっても獲得を行えます。
そしてロックを獲得したプロセスからでなければ解放は行えませんし、獲得した分だけ解放されなければ他のプロセスからロックを獲得できません。

console stdout
from multiprocessing import Process, RLock
l = RLock()
l
#
l.acquire(block=False)
#
l
#
p = Process(target=(lambda l: print(l.acquire(block=False))), args=(l,))
p.start()
p.join()
#
p = Process(target=(lambda l: print(l.release())), args=(l,))
p.start()
p.join()
#
#
#
#
#
#
#
#
l.acquire(block=False)
#
l
#
l.release()
l
#
p = Process(target=(lambda l: print(l.acquire(block=False))), args=(l,))
p.start()
p.join()
#
#
l.release()
p = Process(target=(lambda l: print(l.acquire(block=False))), args=(l,))
p.start()
p.join()
#
l
#
#
<RLock(None, 0)>
#
True
#
<RLock(MainProcess, 1)>
#
#
#
False
#
#
#
Process Process-2:
Traceback (most recent call last):
  File "/usr/lib/python3.5/multiprocessing/process.py", line 249, in _bootstrap
    self.run()
  File "/usr/lib/python3.5/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "<stdin>", line 1, in <lambda>
AssertionError: attempt to release recursive lock not owned by thread
#
True
#
<RLock(MainProcess, 2)>
#
#
<RLock(MainProcess, 1)>
#
#
#
False
#
#
#
#
#
True
<RLock(SomeOtherProcess, nonzero)>

共有オブジェクト

共有メモリ空間にデータを置くことで通信を行います。共有メモリにはmmapモジュールが使われているようです。
Cと互換性のあるデータ型しか保存できません。興味のある方はsharedtypesモジュールを参照してください。

共有したい変数を生成し、パイプやキューと同じように引数として渡して使います。

console stdout
import time
from multiprocessing import Process, Value
v = Value('i', 100)
 
def f(v):
    print(v.value)
    time.sleep(5)
    print(v.value)
 
p = Process(target=f, args=(v,))
p.start()
time.sleep(1)
v.value += 1
#
#
#
#
#
#
#
#
#
#
100
#
#
101  # 5秒後に別プロセスによって表示される
console stdout
import time
from multiprocessing import Process, Array
a = Array('i', range(10))
 
def f(a):
    print(a[:])
    time.sleep(5)
    print(a[:])
 
p = Process(target=f, args=(a,))
p.start()
time.sleep(1)
a[0] = 100
#
#
#
#
#
#
#
#
#
#
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
#
#
[100, 1, 2, 3, 4, 5, 6, 7, 8, 9]  # 5秒後に別プロセスによって表示される

また、共有オブジェクトはロックによってアクセスを制限できます。

console stdout
 
import time
from multiprocessing import Process, Array
a = Array('i', range(10))
 
def f(a):
    a[0] = 100
 
 
a.acquire()
 
p = Process(target=f, args=(a,))
p.start()
 
a[:]
time.sleep(1)
a.release()
time.sleep(1)
a[:]
#
#
#
#
#
#
#
#
#
#
#
#
#
#
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
#
#
#
[100, 1, 2, 3, 4, 5, 6, 7, 8, 9]

ここでは書き込みがロックされましたが、参照される場合もロックされます。

サーバープロセス

メモリ領域を共有するという意味で、Managerというものがあります。ManagerはPythonプロセスなので柔軟なオブジェクトを表現できます。
親プロセスが終了するかガベージコレクションされると停止するそうです。

console stdout
from multiprocessing import Process, Manager
 
def f(d, l):
    d['c'] = 8
    l.append(9)
 
m = Manager()
m.address
d = m.dict({'a': 1, 'b': [2, 3, 4]})
l = m.list([5, 6, '7'])
dict(d)
list(l)
p = Process(target=f, args=(d, l))
p.start()
p.join()
dict(d)
list(l)
m.shutdown()
dict(d)
#
#
#
#
#
#
#
'/tmp/pymp-rv1zf_84/listener-fh5hnrf5'
#
#
{'b': [2, 3, 4], 'a': 1}
[5, 6, '7']
#
#
#
{'c': 8, 'b': [2, 3, 4], 'a': 1}
[5, 6, '7', 9]
# shutdownするとプロセスがクローズされるため参照できなくなる
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<string>", line 2, in keys
  File "/usr/lib/python3.5/multiprocessing/managers.py", line 716, in _callmethod
    conn.send((self._id, methodname, args, kwds))
  File "/usr/lib/python3.5/multiprocessing/connection.py", line 206, in send
    self._send_bytes(ForkingPickler.dumps(obj))
  File "/usr/lib/python3.5/multiprocessing/connection.py", line 404, in _send_bytes
    self._send(header + buf)
  File "/usr/lib/python3.5/multiprocessing/connection.py", line 368, in _send
    n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe

ただし、共有メモリと比べると少し遅いそうなのでうまく使いわけてください。このへんはあんまり良く調べてない。

ソケット通信

上記で説明したように、プロセス間で通信をする場合は基本的にパイプやキューを使うわけですが、みんな大好きなソケット通信もできます。
ListenerClientを使います。言わずもがなそれぞれサーバー・クライアントに該当します。

下記のコードはリファレンスのコードほぼそのままです。

ターミナル1(サーバー) ターミナル2(クライアント)
<pre>
from multiprocessing.connection import Listener
from array import array
 
address = ('localhost', 6000)     # family is deduced to be 'AF_INET'
 
with Listener(address, authkey=b'secret password') as listener:
    with listener.accept() as conn:
        print('connection accepted from', listener.last_accepted)
        conn.send([2.25, None, 'junk', float])
        conn.send_bytes(b'hello')
        conn.send_bytes(array('i', [42, 1729]))
from multiprocessing.connection import Client
from array import array
 
address = ('localhost', 6000)
 
with Client(address, authkey=b'secret password') as conn:
    print(conn.recv())                  # => [2.25, None, 'junk', float]
    print(conn.recv_bytes())            # => 'hello'
    arr = array('i', [0, 0, 0, 0, 0])
    print(conn.recv_bytes_into(arr))    # => 8
    print(arr)                          # => array('i', [42, 1729, 0, 0, 0])

実行するとターミナル2にターミナル1から送った内容が表示されます。

[1, 'a']
b'hello'
8
array('i', [42, 1729, 0, 0, 0])

やってる事自体は単純ですが、処理の流れを解説すると以下のようになります

  1. サーバーはlocalhostの6000番ポートで待つ。認証キーとして’secret password’を使う
  2. クライアントはサーバーが待つlocalhostの6000番ポートに接続する。認証キーとして’secret password’を使う
    • クライアントが認証に失敗すると「multiprocessing.context.AuthenticationError: digest received was wrong」となり、サーバー側もクライアント側も両方中断される。
    • ホストやポートが違うと「ConnectionRefusedError」となり単に接続できないだけ
  3. クライアントがサーバーに接続するとaccept()によってコネクションオブジェクト(conn)が生成される
  4. サーバーはコネクションを使ってクライアントにデータを送りつける
    • サーバーはrecv*()がないため処理を待ったりはしない
    • 本来サーバープロセスであればループさせることで引き続き受け付けるが、そんなものはないのでサーバー側はこれで終了
  5. クライアントはサーバーから受け取ったデータから期待するオブジェクトをおくられた分だけ取り出す
    • recvは受け取ったバッファをオブジェクトとして解析して表示する。ただし、複雑なクラスは別プロセスではunpickleできない可能性があるので送受信するのは基本型に絞ったほうが良い。
    • recv_bytesはバッファをバイトとして解釈する。send_bytesで送ったバッファをrecvとかで取り出したりすると「_pickle.UnpicklingError: invalid load key, ‘w’.」みたいなエラーが発生するので注意。
    • recv_bytes_intoは読み込んだバッファをarr変数に格納する。バイト数がメソッドの返却値として返却される。
    • タイミングによってはクライアント側が先に行われるかもしれないが、クライアントはrecv*()によっていつまでのサーバーからのデータを待つのでどちらが先に実行されようと実行結果は同じ
    • 送られたデータはバッファ経由で読むので送られたタイミングと同時である必要はない

wait

waitは多重化したコネクションオブジェクトが読み込み可能になるまでブロックしてくれる関数です。そうです、selectと似てますね。

console stdout
from multiprocessing import Process, Pipe
from multiprocessing.connection import Listener, Client, wait
 
 
def receive(readers):
    while readers:
        print('receiving...')
        for r in wait(readers):
            try:
                print('received {}:'.format(r.fileno()), r.recv())
            except EOFError:
                readers.remove(r)
    print('finished')
 
 
(r1, w1), (r2, w2) = Pipe(), Pipe()
listener, w3 = Listener(('localhost', 9685)), Client(('localhost', 9685))
r3 = listener.accept()
print('r1: {}, r2: {}, r3: {}'.format(r1.fileno(), r2.fileno(), r3.fileno()))
p = Process(target=receive, args=([r1, r2, r3],))
p.start()
# 以下は個別に手入力
w1.send(100)
 
 
w2.send(200)
 
 
w3.send(300)
 
 
w1.send(400)
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
r1: 3, r2: 5, r3: 9
#
receiving...
#
received 3: 100
receiving...
#
received 5: 200
receiving...
#
received 9: 300
receiving...
#
received 3: 400
receiving...

上記はパイプとソケットを多重化して読み込み可能になったら(データが来たら)送られてきたデータを表示する単純な例です。

リファレンスの例では書き込み側のパイプを閉じると対応する読み込み側のパイプで検知するという旨のことが書いてありますが、どうやらwaitを実行しているのと同じプロセスで実行する必要があるようです。
というわけで以下の例は別プロセスからパイプ経由で値を受け取った後にクローズするという例です。

import time
from multiprocessing import Process, Pipe
from multiprocessing.connection import wait
from threading import Thread
 
def foo(w):
    for i in range(5):
        w.send(i)
 
if __name__ == '__main__':
    r, w = Pipe(duplex=True)
    readers = [r]
    Process(target=foo, args=(w,)).start()
    def closing(w):
        time.sleep(5)
        w.close()
    Thread(target=closing, args=(w,)).start()
 
    while readers:
        for r in wait(readers):
            try:
                print(r.recv())
            except EOFError:
                readers.remove(r)
    print('finished')

「0」〜「4」が表示されたあと「finished」と表示されれば期待通りです。
waitとcloseを並列して表現できないのでスレッドを使ってしまいましたが、同一プロセス内でのclose()をwaitは検知できるということがわかればよいです。

1 2 3