[Python] multiprocessing 備忘録
2017-02-20

普段あまり使うことのない multiprocessing について勉強したので備忘録としてまとめておきます。

この記事ではPython3を使います。 なお、基本的に動作確認はLinux/Macでしておりその他OS(特にWindows)での動作確認はできてないのであしからず。

さて、Python(POSIX系)でプロセスを作り出す最も原始的な方法は os.fork() を使うことです。

os.fork は実行プロセスのクローンを作ります。

生成された側のプロセス(子プロセス)では os.fork() から 0 が返却されるため、 この値によって分岐することで子プロセスに任意の動作をさせることができるわけですね。

こんなC言語みたいなことやってると辛くなるので今回は multiprocessing という標準ライブラリを使ってプロセスを操作していきましょう。

Process

Processクラス を使うとプロセスが一つ作られます。

target に関数を与え、関数に与える引数を args, kwargs で指定すると、別プロセスでその関数が実行されます。

第一仮引数は group なんですが、これは threading モジュールとの互換性のために存在するだけなので、省略するか None を指定する必要があります。

console stdout
import time
from multiprocessing import Process

p1 = Process(target=print, args=(1, 2, 3), kwargs={'sep': '-'})
p1.start()  # 実行開始
p1.join()  # p.start()が終わるまで待つ
p1.start()  # 再実行不可

p2 = Process(target=lambda: time.sleep(10))
p2.start()
p2.terminate()

print(p1, p2)
# -
# -
# -
# -
1-2-3
# -
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/lib/python3.5/multiprocessing/process.py", line 99, in start
    assert self._popen is None, 'cannot start a process twice'
AssertionError: cannot start a process twice
# -
<Process(Process-1, stopped)> <Process(Process-2, stopped[SIGTERM])>

start() メソッドで処理が開始され terminate() で中断します。

ここではほとんど関係ありませんが、 join() メソッドを実行すると処理がブロックされます。

ブロックとは処理を待たされるという意味で、以降も出てくる表現です。

警告

  • 一度実行した Process インスタンスは再度実行できないことに注意してください。
    • 同じ処理をしたい場合はインスタンスを作りなおします。
  • また、今回は説明しませんでしたが daemon 属性が True のプロセスは親プロセスの実行と同時にお亡くなりになります。
    • これは start() 前に指定されている必要があります。
    • 通常は子プロセスの処理が全て終了しない限り親プロセスは死にません。

Pool

Poolクラス は予め作成したプロセスに処理を割り当てるための仕組みです。 プールされたプロセスを使い回しながら処理を進めていきます。

例えば以下のように使います。

from multiprocessing import Pool
import os
import time

start = time.time()

def f(x):
    time.sleep(1)
    value = x * x
    print('{}s passed...\t{}\t(pid:{})'.format(int(time.time() - start), value, os.getpid()))
    return value

with Pool(processes=3) as p:
    print(p.map(f, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]))


# コンテキストマネージャを使わずに以下のように書いても良い
# Pool(3).map(f, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
1s passed...    1       (pid:7484)
1s passed...    4       (pid:7486)
1s passed...    9       (pid:7485)
2s passed...    16      (pid:7486)
2s passed...    25      (pid:7484)
2s passed...    36      (pid:7485)
3s passed...    49      (pid:7486)
3s passed...    64      (pid:7484)
3s passed...    81      (pid:7485)
4s passed...    100     (pid:7486)
[1, 4, 9, 16, 25, 36, 49, 64, 81, 100]

上記では複数の値に対して同じ処理を行うためのメソッドとして map を使っています。使い方はビルトインの map と同じです。

map はすべてのシーケンスを処理した結果をリストで返却するため、終了するまでブロックされます。 シーケンスの要素にタプルやリストを指定するとそれを展開する starmap というメソッドもあります。

また、イテレータを返却する imap があります。 イテレーションした時に処理が終了している結果だけ取り出すことができます。終了していない分はブロックされます。

from multiprocessing import Pool
import time

def f(x):
    time.sleep(10)
    return x * x

p = Pool(processes=3)
it = p.imap(f, range(10))
next(it)
next(it)
next(it)

上記の例では10秒ごとに3個ずつ結果を取り出せます。

imapmap_async はいずれも発行された時点でバックグラウンド処理を進めますが、 map_async はすべての処理が終わっていないと結果が得られないのに対し、 imap は終わっている分の結果だけイテレーションによってを取り出すことができます。

プールしたプロセスを一つずつ利用するメソッドに apply()apply_async() があります。

from multiprocessing import Pool
import time

def f(x):
    time.sleep(10)
    return x * x

p = Pool(processes=3)
result1 = p.apply(f, args=(2,))  # 10秒後に結果が返却される
result2 = p.apply_async(f, args=(3,))  # 結果はすぐに返却されるが、評価可能になるのは10秒後
print(result1, result2)
print(result2.get())  # 値を評価する。評価できるまでブロックされる

apply_asyncAsyncResult オブジェクトを返却します。

内部的には apply メソッドは apply_async メソッドを呼んだ結果を返しているだけです。

次はプロセス間通信についてです

プロセス間通信

プロセスというのは基本的にメモリ空間が共有されないので、プロセス間で何かやり取りしたくなったらプロセス間通信(IPC)を行う必要があります。

もし、 POSIXSystemV のプロセス間通信が行いたい場合は こちら を参照してください。

パイプ(PIPE)

Pipeクラス は2点間で情報をやり取りを行うための基本的な通信です。

Pipe(duplex=True)Connection を2つ返却します。

作成したパイプの片方を引数として渡し、 send(), recv() することでプロセス間の通信を実現します。

双方向のパイプを関数の引数に渡して利用します。以降の別の通信方法でも同様に引数に渡すことで共有されると覚えておきましょう。

console stdout
from multiprocessing import Process, Pipe


def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    parent_conn.send('a')
    parent_conn.send(['b', 'c'])

    child_conn.recv()
    child_conn.recv()

    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   # prints "[42, None, 'hello']"

    parent_conn.close()  # クローズする
    child_conn.close()  # クローズする
# -
# -
# -
# -
# -
# -
# -
# -
# -
# -
# -
# -
# -
# -
# -
# -
# -
# -
# -
# -
# -
'a'
['b', 'c']
[42, None, 'hello']

multiprocessing.Pipe によると

duplexがTrue(デフォルト)ならパイプは双方向性です。

duplexがFalseならパイプは一方向性で、conn1はメッセージの受信専用、conn2はメッセージの送信専用になります。

だそうです。

本来パイプというものは片方向の通信を実現するものなので、双方向のコネクションはおそらく内部的には2セットのパイプを持っているのでしょう。

send(オブジェクト)メソッドでパイプに蓄積されたデータをもう片方のパイプのrecv()メソッドによって取り出します。取り出し順はFIFOで、取り出すデータがない場合はブロックされます。

取り出すデータがあるかどうかは poll() メソッドで確認できます。

>>> p1, p2 = Pipe()
>>> p2.poll()
False
>>> p1.send(1)
>>> p2.poll()
True
>>> p2.recv()
1
>>> p2.poll()
False

ちなみに送信されるデータはPickle化されるので単純なスカラ値でなくとも送受信可能ですが、一時的な値や複雑な参照を持ったオブジェクトだとうまく受け渡せないことがあるかもしれないので注意しましょう。 以降のQueueでも同様です。

キュー(QUEUE)

Queueクラス はFIFO構造でデータをやり取りするためのオブジェクトです。Pipeによって実装されているようです。 (参考)

気になる方は multiprocessing/queue.py の実装を読んでみましょう。

Pipe は2つのオブジェクトを返却し片方を共有することで2点間の通信を実現しましたが、 Queue は1つのオブジェクトのみを返却します。

コネクションオブジェクトと同様に引数として渡すことで通信を実現します。 イメージ的には通信というより共有に近いと思います。

queue.Queue と使い方は同じそうです。

簡単に使い方を見てみましょう。ただ使い方だけ見たいだけなので同一プロセス間で試します。

基本は put() で積んだオブジェクトを get() で取り出すことを覚えてれば大丈夫です。

console stdout
from multiprocessing import Queue

q = Queue(3)  # キューの最大サイズを3に限定する。デフォルトは0(無限)
q.put(1, block=False)
q.put(2, block=False)
q.put(3, block=False)
q.put(4, block=False)
# -
# -
# -
# -
# -
q.get(block=False)
q.get(block=False)
q.get(block=False)
q.get(block=False)
# -
# -
# -
# -
# -
# -
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/lib/python3.5/multiprocessing/queues.py", line 83, in put
    raise Full
queue.Full
# -
1
2
3
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/lib/python3.5/multiprocessing/queues.py", line 107, in get
    raise Empty
queue.Empty

キューの最大サイズに達した状態でput()する、キューが空の状態でget()するとブロックされます。

このブロッキングを制御する引数として、blockとtimeoutを持ちます。

  • block引数にFalseを指定した場合、ブロックされるような状態だと例外が発生します。
  • timeout引数に秒数を指定した場合、その秒数が経過してもブロックされるような状態だと例外が発生します。
  • getの場合は「Empty」例外、putの場合は「Full」例外が発生します。

セマフォ(SEMAPHORE)

Semaphoreクラス は資源の上限を管理するための仕組みです。

通信というよりプロセスを超えて資源を制御する仕組みって感じですね。まぁプロセス間通信の一種なのでここで取り上げます。

たとえばQueueのような共有資源に制限をかける場合には Semaphore または BoundedSemaphore を使うことで実現できます。 使えるメソッドは以下のような感じ。

acquire()
  • 資源を獲得します。
  • 資源を獲得できるときはTrueを返却しますが、できないときはブロックされます。
  • block引数False なら False を返却します。
release()
  • 資源を解放します。
  • BoundedSemphore を使った場合、獲得した資源以上に解放しようとすると ValueError 例外を発生させます。
get_value()
  • カウンタの値を確認できます。

コネクションオブジェクトやキューと同様に引数として渡すことでその値を共有できます。

内部的には カウンタを増減させているだけです。仕組みを見ておきましょう。

console stdout
from multiprocessing import Semaphore
s = Semaphore(3)
s.get_value()
s.acquire(block=False)
s.acquire(block=False)
s.acquire(block=False)
s.acquire(block=False)
s.get_value()

s.release()
s.release()
s.release()
s.release()
s.get_value()
# -
# -
3
True
True
True
False
0
# -
# -
# -
# -
# -
4
console stdout
from multiprocessing import BoundedSemaphore
s = BoundedSemaphore(3)
s.get_value()
s.acquire(block=False)
s.acquire(block=False)
s.acquire(block=False)
s.acquire(block=False)
s.get_value()

s.release()
s.release()
s.release()
s.release()
# -
# -
# -
s.get_value()
# -
# -
3
True
True
True
False
0
# -
# -
# -
# -
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
ValueError: semaphore or lock released too many times
# -
3

ロック(LOCK)

Lockクラス は最大資源数が 1 で固定された BoundedSemaphore 相当の動作をします。

>>> from multiprocessing import Lock
>>> l = Lock()
>>> l.acquire(block=False)
True
>>> l.acquire(block=False)
False
>>> l.release()
>>> l.release()
Traceback (most recent call last):
  File "", line 1, in
ValueError: semaphore or lock released too many times

再帰ロック(RLOCK)

RLockクラス は再帰ロックオブジェクトということでこれだけだとよくわからないんですが、操作元が同一プロセスか否かによって動作が異なるLockです。

Lockとは違いロックを獲得したプロセスからであれば何回であっても獲得を行えます。

そしてロックを獲得したプロセスからでなければ解放は行えませんし、 獲得した分だけ解放されなければ他のプロセスからロックを獲得できません。

console stdout
from multiprocessing import Process, RLock
l = RLock()
l
# -
l.acquire(block=False)
# -
l
# -
p = Process(target=(lambda l: print(l.acquire(block=False))), args=(l,))
p.start()
p.join()
# -
p = Process(target=(lambda l: print(l.release())), args=(l,))
p.start()
p.join()
# -
# -
# -
# -
# -
# -
# -
# -
l.acquire(block=False)
# -
l
# -
l.release()
l
# -
p = Process(target=(lambda l: print(l.acquire(block=False))), args=(l,))
p.start()
p.join()
# -
# -
l.release()
p = Process(target=(lambda l: print(l.acquire(block=False))), args=(l,))
p.start()
p.join()
# -
l
# -
# -
<RLock(None, 0)>
# -
True
# -
<RLock(MainProcess, 1)>
# -
# -
# -
False
# -
# -
# -
Process Process-2:
Traceback (most recent call last):
  File "/usr/lib/python3.5/multiprocessing/process.py", line 249, in _bootstrap
    self.run()
  File "/usr/lib/python3.5/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "<stdin>", line 1, in <lambda>
AssertionError: attempt to release recursive lock not owned by thread
# -
True
# -
<RLock(MainProcess, 2)>
# -
# -
<RLock(MainProcess, 1)>
# -
# -
# -
False
# -
# -
# -
# -
# -
True
<RLock(SomeOtherProcess, nonzero)>

共有オブジェクト

共有メモリ空間にデータを置くことで通信を行います。共有メモリには mmap モジュールが使われているようです。

C と互換性のあるデータ型しか保存できません。

興味のある方は sharedtypes モジュールを参照してください。

共有したい変数を生成し、パイプやキューと同じように引数として渡して使います。

console stdout
import time
from multiprocessing import Process, Value
v = Value('i', 100)

def f(v):
    print(v.value)
    time.sleep(5)
    print(v.value)

p = Process(target=f, args=(v,))
p.start()
time.sleep(1)
v.value += 1
# -
# -
# -
# -
# -
# -
# -
# -
# -
# -
100
# -
# -
101  # 5秒後に別プロセスによって表示される
console stdout
import time
from multiprocessing import Process, Array
a = Array('i', range(10))

def f(a):
    print(a[:])
    time.sleep(5)
    print(a[:])

p = Process(target=f, args=(a,))
p.start()
time.sleep(1)
a[0] = 100
# -
# -
# -
# -
# -
# -
# -
# -
# -
# -
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
# -
# -
[100, 1, 2, 3, 4, 5, 6, 7, 8, 9]  # 5秒後に別プロセスによって表示される

また、共有オブジェクトはロックによってアクセスを制限できます。

console stdout
import time
from multiprocessing import Process, Array
a = Array('i', range(10))

def f(a):
    a[0] = 100


a.acquire()

p = Process(target=f, args=(a,))
p.start()

a[:]
time.sleep(1)
a.release()
time.sleep(1)
a[:]
# -
# -
# -
# -
# -
# -
# -
# -
# -
# -
# -
# -
# -
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
# -
# -
# -
[100, 1, 2, 3, 4, 5, 6, 7, 8, 9]

ここでは書き込みがロックされましたが、参照される場合もロックされます。

サーバープロセス

メモリ領域を共有するという意味で、 Managerクラス というものがあります。 ManagerはPythonプロセスなので柔軟なオブジェクトを表現できます。

親プロセスが終了するかガベージコレクションされると停止するそうです。

console stdout
from multiprocessing import Process, Manager

def f(d, l):
    d['c'] = 8
    l.append(9)

m = Manager()
m.address
d = m.dict({'a': 1, 'b': [2, 3, 4]})
l = m.list([5, 6, '7'])
dict(d)
list(l)
p = Process(target=f, args=(d, l))
p.start()
p.join()
dict(d)
list(l)
m.shutdown()
dict(d)
# -
# -
# -
# -
# -
# -
# -
'/tmp/pymp-rv1zf_84/listener-fh5hnrf5'
# -
# -
{'b': [2, 3, 4], 'a': 1}
[5, 6, '7']
# -
# -
# -
{'c': 8, 'b': [2, 3, 4], 'a': 1}
[5, 6, '7', 9]
# shutdownするとプロセスがクローズされるため参照できなくなる
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<string>", line 2, in keys
  File "/usr/lib/python3.5/multiprocessing/managers.py", line 716, in _callmethod
    conn.send((self._id, methodname, args, kwds))
  File "/usr/lib/python3.5/multiprocessing/connection.py", line 206, in send
    self._send_bytes(ForkingPickler.dumps(obj))
  File "/usr/lib/python3.5/multiprocessing/connection.py", line 404, in _send_bytes
    self._send(header + buf)
  File "/usr/lib/python3.5/multiprocessing/connection.py", line 368, in _send
    n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe

ただし、共有メモリと比べると少し遅いそうなのでうまく使いわけてください。 このへんはあんまり良く調べてない。

ソケット通信

上記で説明したように、プロセス間で通信をする場合は基本的にパイプやキューを使うわけですが、みんな大好きなソケット通信もできます。

ListenerクラスClientクラス を使います。

言わずもがなそれぞれサーバー・クライアントに該当します。

下記のコードはリファレンスのコードほぼそのままです。

ターミナル1(サーバ) ターミナル2(クライアント)
from multiprocessing.connection import Listener
from array import array

address = ('localhost', 6000)     # family is deduced to be 'AF_INET'

with Listener(address, authkey=b'secret password') as listener:
    with listener.accept() as conn:
        print('connection accepted from', listener.last_accepted)
        conn.send([2.25, None, 'junk', float])
        conn.send_bytes(b'hello')
        conn.send_bytes(array('i', [42, 1729]))
from multiprocessing.connection import Client
from array import array

address = ('localhost', 6000)

with Client(address, authkey=b'secret password') as conn:
    print(conn.recv())                  # => [2.25, None, 'junk', float]
    print(conn.recv_bytes())            # => 'hello'
    arr = array('i', [0, 0, 0, 0, 0])
    print(conn.recv_bytes_into(arr))    # => 8
    print(arr)                          # => array('i', [42, 1729, 0, 0, 0])

実行するとターミナル2にターミナル1から送った内容が表示されます。

[1, 'a']
b'hello'
8
array('i', [42, 1729, 0, 0, 0])

やってる事自体は単純ですが、一応処理の流れを解説してみます。

  1. サーバーはlocalhostの6000番ポートで待つ。認証キーとして'secret password'を使う
  2. クライアントはサーバーが待つlocalhostの6000番ポートに接続する。認証キーとして'secret password'を使う
    • クライアントが認証に失敗すると multiprocessing.context.AuthenticationError: digest received was wrong となり、 サーバー側もクライアント側も両方中断される。
    • ホストやポートが違うと ConnectionRefusedError となり単に接続できないだけ
  3. クライアントがサーバーに接続すると accept() によってコネクションオブジェクト(conn)が生成される
  4. サーバーはコネクションを使ってクライアントにデータを送りつける
    • サーバーは recv*() がないため処理を待ったりはしない
    • 本来サーバープロセスであればループさせることで引き続き受け付けるが、そんなものはないのでサーバー側はこれで終了
  5. クライアントはサーバーから受け取ったデータから期待するオブジェクトをおくられた分だけ取り出す
    • recv は受け取ったバッファをオブジェクトとして解析して表示する。
      • ただし、複雑なクラスは別プロセスでは unpickle できない可能性があるので送受信するのは基本型に絞ったほうが良い。
    • recv_bytes はバッファをバイトとして解釈する。
      • send_bytes で送ったバッファを recv とかで取り出したりすると _pickle.UnpicklingError: invalid load key, 'w'. みたいなエラーが発生するので注意。
    • recv_bytes_into は読み込んだバッファをarr変数に格納する。
      • バイト数がメソッドの返却値として返却される。
    • タイミングによってはクライアント側が先に行われるかもしれないが、クライアントは recv*() によっていつまでのサーバーからのデータを待つのでどちらが先に実行されようと実行結果は同じ
    • 送られたデータはバッファ経由で読むので送られたタイミングと同時である必要はない

wait

wait関数 は多重化したコネクションオブジェクトが読み込み可能になるまでブロックしてくれる関数です。 そうです、selectと似てますね。

console stdout
from multiprocessing import Process, Pipe
from multiprocessing.connection import Listener, Client, wait


def receive(readers):
    while readers:
        print('receiving...')
        for r in wait(readers):
            try:
                print('received {}:'.format(r.fileno()), r.recv())
            except EOFError:
                readers.remove(r)
    print('finished')


(r1, w1), (r2, w2) = Pipe(), Pipe()
listener, w3 = Listener(('localhost', 9685)), Client(('localhost', 9685))
r3 = listener.accept()
print('r1: {}, r2: {}, r3: {}'.format(r1.fileno(), r2.fileno(), r3.fileno()))
p = Process(target=receive, args=([r1, r2, r3],))
p.start()
# 以下は個別に手入力
w1.send(100)


w2.send(200)


w3.send(300)


w1.send(400)
# -
# -
# -
# -
# -
# -
# -
# -
# -
# -
# -
# -
# -
# -
# -
# -
# -
# -
r1: 3, r2: 5, r3: 9
# -
receiving...
# -
received 3: 100
receiving...
# -
received 5: 200
receiving...
# -
received 9: 300
receiving...
# -
received 3: 400
receiving...

上記はパイプとソケットを多重化して読み込み可能になったら(データが来たら)送られてきたデータを表示する単純な例です。

リファレンスの例では書き込み側のパイプを閉じると対応する読み込み側のパイプで検知するという旨のことが書いてありますが、 どうやら wait を実行しているのと同じプロセスで実行する必要があるようです。

というわけで以下の例は別プロセスからパイプ経由で値を受け取った後にクローズするという例です。

import time
from multiprocessing import Process, Pipe
from multiprocessing.connection import wait
from threading import Thread

def foo(w):
    for i in range(5):
        w.send(i)

if __name__ == '__main__':
    r, w = Pipe(duplex=True)
    readers = [r]
    Process(target=foo, args=(w,)).start()
    def closing(w):
        time.sleep(5)
        w.close()
    Thread(target=closing, args=(w,)).start()

    while readers:
        for r in wait(readers):
            try:
                print(r.recv())
            except EOFError:
                readers.remove(r)
    print('finished')

「0」〜「4」が表示されたあと「finished」と表示されれば期待通りです。

waitclose を並列して表現できないのでスレッドを使ってしまいましたが、 同一プロセス内での close()wait は検知できるということがわかればよいです。

開始方法

multiprocessingは3つのプロセス開始方法を持ちます。

set_start_method()get_context() を使って以下の方法を指定します。

fork

使っているOSがPOSIX系であればデフォルトの開始方法はこれです。

はじめに書きましたが、forkはプロセスのクローンを作ります。

fork によって作成された子プロセスは親プロセスのデータセグメントをすべて引き継ぐため、親プロセスが巨大なデータを持ってるとメモリを圧迫する可能性があります。

spawn

WindowsのPythonを使っている場合、デフォルトの開始方法はこれです。

子プロセスはプロセスオブジェクトの run() メソッドの実行に必要なリソースのみ継承します。

ということで、 Process オブジェクトの実行に与えたものだけが子プロセスに渡されます。

インタラクティブシェルでこれをやろうとするとAttributeErrorになるのでなんらかのオブジェクトがPickle化、あるいは復号できなかったとかそういうことなんだと思います。 理由については深入りしてません。

速度はforkやforkserverに劣るとのこと。理由はよくわかりませんがコンテキストのPickle化とかいろいろするからかな。

詳しい人いたら教えてください。なんでもしま(ry

forkserver

forkserverはプロセスを作るためのプロセスを作り、それ経由でforkを行います。

spawnと同様に不要なリソースを継承しません。調べてないから憶測ですが最初のプロセスはspawnで作ってる? なのでインタラクティブシェルでやると概ね死にます。

ではメモリ使用量について調べてみましょう。

以下をprocess.pyとして保存します。ここでは set_start_method を使います。

# coding: utf-8
import os
import sys
import time
import multiprocessing as mp

method = sys.argv[1]

if __name__ == '__main__':
    l = list(range(1000000))
    mp.set_start_method(method)
    # 20秒間スリープするだけのプロセス
    p = mp.Process(target=time.sleep, args=(20,))
    p.start()
    print('{}: {} -> {}'.format(method, os.getpid(), p.pid), end=' child:')
else:
    # 子プロセスの場合は名前空間を表示する
    print(__name__)

親子関係を表示して20秒間スリープするだけのプロセスを作るプログラムです。第一引数にプロセス開始方法を指定します。

サイズが100万のリストはただ持っているだけ使いません。生成方法ごとのメモリ使用量を比較するために持っています。

これらを同時に実行してそのメモリ使用量を比較してみましょう。

ターミナル1 ターミナル2
$ python3 process.py fork &       # [1] 3968
fork: 3968 -> 3969 child:

$ python3 process.py spawn &      # [2] 3970
spawn: 3970 -> 3972 child:__mp_main__

$ python3 process.py forkserver & # [3] 3973
forkserver: 3973 -> 3976 child:__mp_main__

[1]   Done                    python3 process.py fork
[2]-  Done                    python3 process.py spawn
[3]+  Done                    python3 process.py forkserver
$ ps aux|grep python3

USER       PID %CPU %MEM    VSZ   RSS TTY      STAT START   TIME COMMAND # この行は本来表示されない
vagrant   3968  0.6  9.9  72584 49780 pts/1    S    12:32   0:00 python3 process.py fork
vagrant   3969  0.0  9.6  72584 48216 pts/1    S    12:32   0:00 python3 process.py fork

vagrant   3970  0.8 10.1  75548 50652 pts/1    S    12:32   0:00 python3 process.py spawn
vagrant   3971  0.2  2.1  35028 10816 pts/1    S    12:32   0:00 /usr/bin/python3 -c from multiprocessing.semaphore_tracker import main;main(3)
vagrant   3972  0.3  2.1  35152 10972 pts/1    S    12:32   0:00 /usr/bin/python3 -c from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=4, pipe_handle=6) --multiprocessing-fork

vagrant   3973  7.0 10.8  91356 54024 pts/1    S    12:32   0:00 python3 process.py forkserver
vagrant   3974  1.5  2.1  35024 10912 pts/1    S    12:32   0:00 /usr/bin/python3 -c from multiprocessing.semaphore_tracker import main;main(3)
vagrant   3975  3.0  2.9  50896 14576 pts/1    S    12:32   0:00 /usr/bin/python3 -c from multiprocessing.forkserver import main; main(3, 5, ['__main__'], **{'sys_path': ['/home/vagrant', '/usr/lib/python35.zip', '/usr/lib/python3.5', '/usr/lib/python3.5/plat-x86_64-linux-gnu', '/usr/lib/python3.5/lib-dynload', '/usr/local/lib/python3.5/dist-packages', '/usr/lib/python3/dist-packages']})
vagrant   3976  0.0  2.2  51028 11460 pts/1    S    12:32   0:00 /usr/bin/python3 -c from multiprocessing.forkserver import main; main(3, 5, ['__main__'], **{'sys_path': ['/home/vagrant', '/usr/lib/python35.zip', '/usr/lib/python3.5', '/usr/lib/python3.5/plat-x86_64-linux-gnu', '/usr/lib/python3.5/lib-dynload', '/usr/local/lib/python3.5/dist-packages', '/usr/lib/python3/dist-packages']})

forkで生成された子プロセスはメモリを多く消費しているのがわかりますね。

forkはプロセスをコピーした直後からプログラムが再開されますが、spawnはプログラムを最初からやり直します。

つまりspawnで生成された子プロセスは親プロセスと同じコードが実行されうるということです。

子プロセスで実行されたくないコードは if __name__ == '__main__': のようにして保護する必要があります。

このプログラムでは巨大なリストを生成している箇所がこれに該当します。 (ターミナルに出力されているように子プロセスの名前空間は __mp_main__ となるので __main__ と比較することで分岐できるというわけです)

ちなみに、さきほど説明した set_start_method() は一度しか呼ぶことが許されていません(どうやら子プロセスでもこの情報は共有されているっぽい)。

そのためこの関数をコールする箇所は保護が必須です。

shimizukawa and tell_k に多謝

おまけ

CPUの数

マルチコアCPUの場合は下記のようにしてCPUの数を得られます。プロセスをプールする場合はこの数に合わせるのが効率的と言われています。

実際、デフォルトだとCPU数になります。

>>> import multiprocessing
>>> multiprocessing.cpu_count()
4

ゾンビプロセス

Processによって開始されたプロセスはjoinしないとゾンビプロセスになります。

import time
from multiprocessing import Process

p = Process(target=time.sleep, args=(3, ))
p.start()
print(p.pid)
time.sleep(20)

これを zombie.py として実行すると..

vagrant   2307  0.7  0.4  32040 10144 pts/1    S+   13:51   0:00 python3 zombie.py
vagrant   2308  0.0  0.0      0     0 pts/1    Z+   13:51   0:00 [python3]
vagrant   2312  0.0  0.0  16572  1980 pts/0    S+   13:52   0:00 grep python

プロセステーブルは親プロセスが子プロセスの終了ステータスを認識すれば更新されるので p.is_alive() などで状態を確認してもゾンビプロセスは解消できます。

OSによっては確認できなかったりします(Macとかね)。

メインプロセス終了時に掃除されるみたいです。

以上。はー、長かったー。

間違い等あったら教えてください。