普段あまり使うことのない multiprocessing
について勉強したので備忘録としてまとめておきます。
この記事ではPython3を使います。 なお、基本的に動作確認はLinux/Macでしておりその他OS(特にWindows)での動作確認はできてないのであしからず。
さて、Python(POSIX系)でプロセスを作り出す最も原始的な方法は os.fork()
を使うことです。
os.fork
は実行プロセスのクローンを作ります。
生成された側のプロセス(子プロセス)では os.fork()
から 0
が返却されるため、
この値によって分岐することで子プロセスに任意の動作をさせることができるわけですね。
こんなC言語みたいなことやってると辛くなるので今回は multiprocessing という標準ライブラリを使ってプロセスを操作していきましょう。
Process
Processクラス を使うとプロセスが一つ作られます。
target
に関数を与え、関数に与える引数を args
, kwargs
で指定すると、別プロセスでその関数が実行されます。
第一仮引数は group
なんですが、これは threading
モジュールとの互換性のために存在するだけなので、省略するか None
を指定する必要があります。
- console
- stdout
import time from multiprocessing import Process p1 = Process(target=print, args=(1, 2, 3), kwargs={'sep': '-'}) p1.start() # 実行開始 p1.join() # p.start()が終わるまで待つ p1.start() # 再実行不可 p2 = Process(target=lambda: time.sleep(10)) p2.start() p2.terminate() print(p1, p2)
# - # - # - # - 1-2-3 # - Traceback (most recent call last): File "<stdin>", line 1, in <module> File "/usr/lib/python3.5/multiprocessing/process.py", line 99, in start assert self._popen is None, 'cannot start a process twice' AssertionError: cannot start a process twice # - <Process(Process-1, stopped)> <Process(Process-2, stopped[SIGTERM])>
start()
メソッドで処理が開始され terminate()
で中断します。
ここではほとんど関係ありませんが、 join()
メソッドを実行すると処理がブロックされます。
ブロックとは処理を待たされるという意味で、以降も出てくる表現です。
- warning
- 一度実行した
Process
インスタンスは再度実行できないことに注意してください。- 同じ処理をしたい場合はインスタンスを作りなおします。
- また、今回は説明しませんでしたが
daemon
属性がTrue
のプロセスは親プロセスの実行と同時にお亡くなりになります。- これは
start()
前に指定されている必要があります。 - 通常は子プロセスの処理が全て終了しない限り親プロセスは死にません。
- これは
- 一度実行した
Pool
Poolクラス は予め作成したプロセスに処理を割り当てるための仕組みです。 プールされたプロセスを使い回しながら処理を進めていきます。
例えば以下のように使います。
from multiprocessing import Pool import os import time start = time.time() def f(x): time.sleep(1) value = x * x print('{}s passed...\t{}\t(pid:{})'.format(int(time.time() - start), value, os.getpid())) return value with Pool(processes=3) as p: print(p.map(f, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10])) # コンテキストマネージャを使わずに以下のように書いても良い # Pool(3).map(f, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
1s passed... 1 (pid:7484) 1s passed... 4 (pid:7486) 1s passed... 9 (pid:7485) 2s passed... 16 (pid:7486) 2s passed... 25 (pid:7484) 2s passed... 36 (pid:7485) 3s passed... 49 (pid:7486) 3s passed... 64 (pid:7484) 3s passed... 81 (pid:7485) 4s passed... 100 (pid:7486) [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
上記では複数の値に対して同じ処理を行うためのメソッドとして
map
を使っています。使い方はビルトインの map
と同じです。
map
はすべてのシーケンスを処理した結果をリストで返却するため、終了するまでブロックされます。
シーケンスの要素にタプルやリストを指定するとそれを展開する starmap
というメソッドもあります。
また、イテレータを返却する imap
があります。
イテレーションした時に処理が終了している結果だけ取り出すことができます。終了していない分はブロックされます。
from multiprocessing import Pool import time def f(x): time.sleep(10) return x * x p = Pool(processes=3) it = p.imap(f, range(10)) next(it) next(it) next(it)
上記の例では10秒ごとに3個ずつ結果を取り出せます。
imap
と map_async
はいずれも発行された時点でバックグラウンド処理を進めますが、
map_async
はすべての処理が終わっていないと結果が得られないのに対し、
imap
は終わっている分の結果だけイテレーションによってを取り出すことができます。
プールしたプロセスを一つずつ利用するメソッドに apply()
と apply_async()
があります。
from multiprocessing import Pool import time def f(x): time.sleep(10) return x * x p = Pool(processes=3) result1 = p.apply(f, args=(2,)) # 10秒後に結果が返却される result2 = p.apply_async(f, args=(3,)) # 結果はすぐに返却されるが、評価可能になるのは10秒後 print(result1, result2) print(result2.get()) # 値を評価する。評価できるまでブロックされる
apply_async
は
AsyncResult
オブジェクトを返却します。
内部的には apply
メソッドは apply_async
メソッドを呼んだ結果を返しているだけです。
次はプロセス間通信についてです
プロセス間通信
プロセスというのは基本的にメモリ空間が共有されないので、 プロセス間で何かやり取りしたくなったらプロセス間通信(IPC)を行う必要があります。
もし、 POSIX
や SystemV
のプロセス間通信が行いたい場合は
こちら を参照してください。
パイプ
Pipeクラス は2点間で情報をやり取りを行うための基本的な通信です。
Pipe(duplex=True)
が
Connection
を2つ返却します。
作成したパイプの片方を引数として渡し、 send()
, recv()
することでプロセス間の通信を実現します。
双方向のパイプを関数の引数に渡して利用します。以降の別の通信方法でも同様に引数に渡すことで共有されると覚えておきましょう。
- console
- stdout
from multiprocessing import Process, Pipe
def f(conn): conn.send([42, None, 'hello']) conn.close()
if name == 'main': parent_conn, child_conn = Pipe() parent_conn.send('a') parent_conn.send(['b', 'c'])
multiprocessing.Pipe によると
duplexがTrue(デフォルト)ならパイプは双方向性です。
duplexがFalseならパイプは一方向性で、conn1はメッセージの受信専用、conn2はメッセージの送信専用になります。
だそうです。
本来パイプというものは片方向の通信を実現するものなので、双方向のコネクションはおそらく内部的には2セットのパイプを持っているのでしょう。
send(オブジェクト)メソッドでパイプに蓄積されたデータをもう片方のパイプのrecv()メソッドによって取り出します。取り出し順はFIFOで、取り出すデータがない場合はブロックされます。
取り出すデータがあるかどうかは poll()
メソッドで確認できます。
>>> p1, p2 = Pipe() >>> p2.poll() False >>> p1.send(1) >>> p2.poll() True >>> p2.recv() 1 >>> p2.poll() False
ちなみに送信されるデータはPickle化されるので単純なスカラ値でなくとも送受信可能ですが、一時的な値や複雑な参照を持ったオブジェクトだとうまく受け渡せないことがあるかもしれないので注意しましょう。 以降のQueueでも同様です。
キュー
Queueクラス はFIFO構造でデータをやり取りするためのオブジェクトです。Pipeによって実装されているようです。 (参考)
気になる方は multiprocessing/queue.py
の実装を読んでみましょう。
Pipe
は2つのオブジェクトを返却し片方を共有することで2点間の通信を実現しましたが、
Queue
は1つのオブジェクトのみを返却します。
コネクションオブジェクトと同様に引数として渡すことで通信を実現します。 イメージ的には通信というより共有に近いと思います。
queue.Queue と使い方は同じそうです。
簡単に使い方を見てみましょう。ただ使い方だけ見たいだけなので同一プロセス間で試します。
基本は put()
で積んだオブジェクトを get()
で取り出すことを覚えてれば大丈夫です。
- console
- stdout
from multiprocessing import Queue q = Queue(3) # キューの最大サイズを3に限定する。デフォルトは0(無限) q.put(1, block=False) q.put(2, block=False) q.put(3, block=False) q.put(4, block=False) # - # - # - # - # - q.get(block=False) q.get(block=False) q.get(block=False) q.get(block=False)
# - # - # - # - # - # - Traceback (most recent call last): File "<stdin>", line 1, in <module> File "/usr/lib/python3.5/multiprocessing/queues.py", line 83, in put raise Full queue.Full # - 1 2 3 Traceback (most recent call last): File "<stdin>", line 1, in <module> File "/usr/lib/python3.5/multiprocessing/queues.py", line 107, in get raise Empty queue.Empty
キューの最大サイズに達した状態でput()する、キューが空の状態でget()するとブロックされます。
このブロッキングを制御する引数として、blockとtimeoutを持ちます。
- block引数にFalseを指定した場合、ブロックされるような状態だと例外が発生します。
- timeout引数に秒数を指定した場合、その秒数が経過してもブロックされるような状態だと例外が発生します。
- getの場合は「Empty」例外、putの場合は「Full」例外が発生します。
セマフォ
Semaphoreクラス は資源の上限を管理するための仕組みです。
通信というよりプロセスを超えて資源を制御する仕組みって感じですね。まぁプロセス間通信の一種なのでここで取り上げます。
たとえばQueueのような共有資源に制限をかける場合には
Semaphore
または BoundedSemaphore
を使うことで実現できます。 使えるメソッドは以下のような感じ。
- acquire()
- 資源を獲得します。
- 資源を獲得できるときはTrueを返却しますが、できないときはブロックされます。
block引数
がFalse
ならFalse
を返却します。
- 資源を獲得します。
- release()
- 資源を解放します。
BoundedSemphore
を使った場合、獲得した資源以上に解放しようとするとValueError
例外を発生させます。
- get_value()
- カウンタの値を確認できます。
コネクションオブジェクトやキューと同様に引数として渡すことでその値を共有できます。
内部的には カウンタを増減させているだけです。仕組みを見ておきましょう。
- console
- stdout
from multiprocessing import Semaphore s = Semaphore(3) s.get_value() s.acquire(block=False) s.acquire(block=False) s.acquire(block=False) s.acquire(block=False) s.get_value() s.release() s.release() s.release() s.release() s.get_value()
# - # - 3 True True True False 0 # - # - # - # - # - 4
- console
- stdout
from multiprocessing import BoundedSemaphore s = BoundedSemaphore(3) s.get_value() s.acquire(block=False) s.acquire(block=False) s.acquire(block=False) s.acquire(block=False) s.get_value() s.release() s.release() s.release() s.release() # - # - # - s.get_value()
# - # - 3 True True True False 0 # - # - # - # - Traceback (most recent call last): File "<stdin>", line 1, in <module> ValueError: semaphore or lock released too many times # - 3
ロック
Lockクラス
は最大資源数が 1
で固定された BoundedSemaphore
相当の動作をします。
>>> from multiprocessing import Lock >>> l = Lock() >>> l.acquire(block=False) True >>> l.acquire(block=False) False >>> l.release() >>> l.release() Traceback (most recent call last): File "", line 1, in ValueError: semaphore or lock released too many times
再帰ロック
RLockクラス は再帰ロックオブジェクトということでこれだけだとよくわからないんですが、操作元が同一プロセスか否かによって動作が異なるLockです。
Lockとは違いロックを獲得したプロセスからであれば何回であっても獲得を行えます。
そしてロックを獲得したプロセスからでなければ解放は行えませんし、 獲得した分だけ解放されなければ他のプロセスからロックを獲得できません。
- console
- stdout
from multiprocessing import Process, RLock l = RLock() l # - l.acquire(block=False) # - l # - p = Process(target=(lambda l: print(l.acquire(block=False))), args=(l,)) p.start() p.join() # - p = Process(target=(lambda l: print(l.release())), args=(l,)) p.start() p.join() # - # - # - # - # - # - # - # - l.acquire(block=False) # - l # - l.release() l # - p = Process(target=(lambda l: print(l.acquire(block=False))), args=(l,)) p.start() p.join() # - # - l.release() p = Process(target=(lambda l: print(l.acquire(block=False))), args=(l,)) p.start() p.join() # - l
# - # - <RLock(None, 0)> # - True # - <RLock(MainProcess, 1)> # - # - # - False # - # - # - Process Process-2: Traceback (most recent call last): File "/usr/lib/python3.5/multiprocessing/process.py", line 249, in _bootstrap self.run() File "/usr/lib/python3.5/multiprocessing/process.py", line 93, in run self._target(*self._args, **self._kwargs) File "<stdin>", line 1, in <lambda> AssertionError: attempt to release recursive lock not owned by thread # - True # - <RLock(MainProcess, 2)> # - # - <RLock(MainProcess, 1)> # - # - # - False # - # - # - # - # - True <RLock(SomeOtherProcess, nonzero)>
共有オブジェクト
共有メモリ空間にデータを置くことで通信を行います。共有メモリには
mmap
モジュールが使われているようです。
C
と互換性のあるデータ型しか保存できません。
興味のある方は sharedtypes モジュールを参照してください。
共有したい変数を生成し、パイプやキューと同じように引数として渡して使います。
- console
- stdout
import time from multiprocessing import Process, Value v = Value('i', 100) def f(v): print(v.value) time.sleep(5) print(v.value) p = Process(target=f, args=(v,)) p.start() time.sleep(1) v.value += 1
# - # - # - # - # - # - # - # - # - 100 # - # - 101 # 5秒後に別プロセスによって表示される
- console
- stdout
import time from multiprocessing import Process, Array a = Array('i', range(10)) def f(a): print(a[:]) time.sleep(5) print(a[:]) p = Process(target=f, args=(a,)) p.start() time.sleep(1) a[0] = 100
# - # - # - # - # - # - # - # - # - [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] # - # - [100, 1, 2, 3, 4, 5, 6, 7, 8, 9] # 5秒後に別プロセスによって表示される
また、共有オブジェクトはロックによってアクセスを制限できます。
- console
- stdout
import time from multiprocessing import Process, Array a = Array('i', range(10)) def f(a): a[0] = 100 a.acquire() p = Process(target=f, args=(a,)) p.start() a[:] time.sleep(1) a.release() time.sleep(1) a[:]
# - # - # - # - # - # - # - # - # - # - # - # - [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] # - # - # - [100, 1, 2, 3, 4, 5, 6, 7, 8, 9]
ここでは書き込みがロックされましたが、参照される場合もロックされます。
サーバープロセス
メモリ領域を共有するという意味で、 Managerクラス というものがあります。 ManagerはPythonプロセスなので柔軟なオブジェクトを表現できます。
親プロセスが終了するかガベージコレクションされると停止するそうです。
- console
- stdout
from multiprocessing import Process, Manager def f(d, l): d['c'] = 8 l.append(9) m = Manager() m.address d = m.dict({'a': 1, 'b': [2, 3, 4]}) l = m.list([5, 6, '7']) dict(d) list(l) p = Process(target=f, args=(d, l)) p.start() p.join() dict(d) list(l) m.shutdown() dict(d) # #
# - # - # - # - # - # - # - '/tmp/pymp-rv1zf_84/listener-fh5hnrf5' # - # - {'b': [2, 3, 4], 'a': 1} [5, 6, '7'] # - # - # - {'c': 8, 'b': [2, 3, 4], 'a': 1} [5, 6, '7', 9] # shutdownするとプロセスがクローズされるため参照できなくなる Traceback (most recent call last): File "<stdin>", line 1, in <module> File "<string>", line 2, in keys File "/usr/lib/python3.5/multiprocessing/managers.py", line 716, in _callmethod conn.send((self._id, methodname, args, kwds)) File "/usr/lib/python3.5/multiprocessing/connection.py", line 206, in send self._send_bytes(ForkingPickler.dumps(obj)) File "/usr/lib/python3.5/multiprocessing/connection.py", line 404, in _send_bytes self._send(header + buf) File "/usr/lib/python3.5/multiprocessing/connection.py", line 368, in _send n = write(self._handle, buf) BrokenPipeError: [Errno 32] Broken pipe
ただし、共有メモリと比べると少し遅いそうなのでうまく使いわけてください。 このへんはあんまり良く調べてない。
ソケット通信
上記で説明したように、プロセス間で通信をする場合は基本的にパイプやキューを使うわけですが、みんな大好きなソケット通信もできます。
Listenerクラス と Clientクラス を使います。
言わずもがなそれぞれサーバー・クライアントに該当します。
下記のコードはリファレンスのコードほぼそのままです。
- ターミナル1(サーバ)
- ターミナル2(クライアント)
from multiprocessing.connection import Listener from array import array address = ('localhost', 6000) # family is deduced to be 'AF_INET' with Listener(address, authkey=b'secret password') as listener: with listener.accept() as conn: print('connection accepted from', listener.last_accepted) conn.send([2.25, None, 'junk', float]) conn.send_bytes(b'hello') conn.send_bytes(array('i', [42, 1729]))
from multiprocessing.connection import Client from array import array address = ('localhost', 6000) with Client(address, authkey=b'secret password') as conn: print(conn.recv()) # => [2.25, None, 'junk', float] print(conn.recv_bytes()) # => 'hello' arr = array('i', [0, 0, 0, 0, 0]) print(conn.recv_bytes_into(arr)) # => 8 print(arr) # => array('i', [42, 1729, 0, 0, 0])
実行するとターミナル2にターミナル1から送った内容が表示されます。
[1, 'a'] b'hello' 8 array('i', [42, 1729, 0, 0, 0])
やってる事自体は単純ですが、一応処理の流れを解説してみます。
サーバーはlocalhostの6000番ポートで待つ。認証キーとして 'secret password' を使う
クライアントはサーバーが待つlocalhostの6000番ポートに接続する。認証キーとして 'secret password' を使う
- クライアントが認証に失敗すると
multiprocessing.context.AuthenticationError: digest received was wrong
となり、 サーバー側もクライアント側も両方中断される。 - ホストやポートが違うと
ConnectionRefusedError
となり単に接続できないだけ
- クライアントが認証に失敗すると
クライアントがサーバーに接続すると
accept()
によってコネクションオブジェクト(conn)が生成されるサーバーはコネクションを使ってクライアントにデータを送りつける
- サーバーは
recv*()
がないため処理を待ったりはしない - 本来サーバープロセスであればループさせることで引き続き受け付けるが、そんなものはないのでサーバー側はこれで終了
- サーバーは
クライアントはサーバーから受け取ったデータから期待するオブジェクトをおくられた分だけ取り出す
- recv
は受け取ったバッファをオブジェクトとして解析して表示する。
- ただし、複雑なクラスは別プロセスでは unpickle できない可能性があるので送受信するのは基本型に絞ったほうが良い。
- recv_bytes
はバッファをバイトとして解釈する。
send_bytes
で送ったバッファをrecv
とかで取り出したりすると_pickle.UnpicklingError: invalid load key, 'w'.
みたいなエラーが発生するので注意。
- recv_bytes_into
は読み込んだバッファをarr変数に格納する。
- バイト数がメソッドの返却値として返却される。
- タイミングによってはクライアント側が先に行われるかもしれないが、クライアントは
recv*()
によっていつまでのサーバーからのデータを待つのでどちらが先に実行されようと実行結果は同じ - 送られたデータはバッファ経由で読むので送られたタイミングと同時である必要はない
- recv
は受け取ったバッファをオブジェクトとして解析して表示する。
wait
wait関数 は多重化したコネクションオブジェクトが読み込み可能になるまでブロックしてくれる関数です。 そうです、selectと似てますね。
- console
- stdout
from multiprocessing import Process, Pipe from multiprocessing.connection import Listener, Client, wait # 受け取る関数 def receive(readers): while readers: print('receiving...') for r in wait(readers): try: print('received {}:'.format(r.fileno()), r.recv()) except EOFError: readers.remove(r) print('finished') # パイプを準備 (r1, w1), (r2, w2) = Pipe(), Pipe() listener, w3 = Listener(('localhost', 9685)), Client(('localhost', 9685)) r3 = listener.accept() print('r1: {}, r2: {}, r3: {}'.format(r1.fileno(), r2.fileno(), r3.fileno())) p = Process(target=receive, args=([r1, r2, r3],)) p.start() # 以下は個別に手入力 w1.send(100) # # w2.send(200) # # w3.send(300) # # w1.send(400) #
# - # - # - # - # - # - # - # - # - # - # - # - # - # - # - # - # - # - r1: 3, r2: 5, r3: 9 # - receiving... # - received 3: 100 receiving... # - received 5: 200 receiving... # - received 9: 300 receiving... # - received 3: 400 receiving...
上記はパイプとソケットを多重化して読み込み可能になったら(データが来たら)送られてきたデータを表示する単純な例です。
リファレンスの例では書き込み側のパイプを閉じると対応する読み込み側のパイプで検知するという旨のことが書いてありますが、
どうやら wait
を実行しているのと同じプロセスで実行する必要があるようです。
というわけで以下の例は別プロセスからパイプ経由で値を受け取った後にクローズするという例です。
import time from multiprocessing import Process, Pipe from multiprocessing.connection import wait from threading import Thread def foo(w): for i in range(5): w.send(i) if __name__ == '__main__': r, w = Pipe(duplex=True) readers = [r] Process(target=foo, args=(w,)).start() def closing(w): time.sleep(5) w.close() Thread(target=closing, args=(w,)).start() while readers: for r in wait(readers): try: print(r.recv()) except EOFError: readers.remove(r) print('finished')
「0」〜「4」が表示されたあと「finished」と表示されれば期待通りです。
wait
と close
を並列して表現できないのでスレッドを使ってしまいましたが、
同一プロセス内での close()
を wait
は検知できるということがわかればよいです。
開始方法
multiprocessingは3つのプロセス開始方法を持ちます。
set_start_method() か get_context() を使って以下の方法を指定します。
fork
使っているOSがPOSIX系であればデフォルトの開始方法はこれです。
はじめに書きましたが、forkはプロセスのクローンを作ります。
fork
によって作成された子プロセスは親プロセスのデータセグメントをすべて引き継ぐため、親プロセスが巨大なデータを持ってるとメモリを圧迫する可能性があります。
spawn
WindowsのPythonを使っている場合、デフォルトの開始方法はこれです。
子プロセスはプロセスオブジェクトの
run()
メソッドの実行に必要なリソースのみ継承します。
ということで、 Process
オブジェクトの実行に与えたものだけが子プロセスに渡されます。
インタラクティブシェルでこれをやろうとするとAttributeErrorになるのでなんらかのオブジェクトがPickle化、あるいは復号できなかったとかそういうことなんだと思います。 理由については深入りしてません。
速度はforkやforkserverに劣るとのこと。理由はよくわかりませんがコンテキストのPickle化とかいろいろするからかな。
詳しい人いたら教えてください。なんでもしま(ry
forkserver
forkserverはプロセスを作るためのプロセスを作り、それ経由でforkを行います。
spawnと同様に不要なリソースを継承しません。調べてないから憶測ですが最初のプロセスはspawnで作ってる? なのでインタラクティブシェルでやると概ね死にます。
ではメモリ使用量について調べてみましょう。
以下をprocess.pyとして保存します。ここでは set_start_method
を使います。
# coding: utf-8 import os import sys import time import multiprocessing as mp method = sys.argv[1] if __name__ == '__main__': l = list(range(1000000)) mp.set_start_method(method) # 20秒間スリープするだけのプロセス p = mp.Process(target=time.sleep, args=(20,)) p.start() print('{}: {} -> {}'.format(method, os.getpid(), p.pid), end=' child:') else: # 子プロセスの場合は名前空間を表示する print(__name__)
親子関係を表示して20秒間スリープするだけのプロセスを作るプログラムです。第一引数にプロセス開始方法を指定します。
サイズが100万のリストはただ持っているだけ使いません。生成方法ごとのメモリ使用量を比較するために持っています。
これらを同時に実行してそのメモリ使用量を比較してみましょう。
- ターミナル1
- ターミナル2
$ python3 process.py fork & # [1] 3968 fork: 3968 -> 3969 child: $ python3 process.py spawn & # [2] 3970 spawn: 3970 -> 3972 child:__mp_main__ $ python3 process.py forkserver & # [3] 3973 forkserver: 3973 -> 3976 child:__mp_main__ [1] Done python3 process.py fork [2]- Done python3 process.py spawn [3]+ Done python3 process.py forkserver
$ ps aux|grep python3 USER PID %CPU %MEM VSZ RSS TTY STAT START TIME COMMAND # この行は本来表示されない vagrant 3968 0.6 9.9 72584 49780 pts/1 S 12:32 0:00 python3 process.py fork vagrant 3969 0.0 9.6 72584 48216 pts/1 S 12:32 0:00 python3 process.py fork vagrant 3970 0.8 10.1 75548 50652 pts/1 S 12:32 0:00 python3 process.py spawn vagrant 3971 0.2 2.1 35028 10816 pts/1 S 12:32 0:00 /usr/bin/python3 -c from multiprocessing.semaphore_tracker import main;main(3) vagrant 3972 0.3 2.1 35152 10972 pts/1 S 12:32 0:00 /usr/bin/python3 -c from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=4, pipe_handle=6) --multiprocessing-fork vagrant 3973 7.0 10.8 91356 54024 pts/1 S 12:32 0:00 python3 process.py forkserver vagrant 3974 1.5 2.1 35024 10912 pts/1 S 12:32 0:00 /usr/bin/python3 -c from multiprocessing.semaphore_tracker import main;main(3) vagrant 3975 3.0 2.9 50896 14576 pts/1 S 12:32 0:00 /usr/bin/python3 -c from multiprocessing.forkserver import main; main(3, 5, ['__main__'], **{'sys_path': ['/home/vagrant', '/usr/lib/python35.zip', '/usr/lib/python3.5', '/usr/lib/python3.5/plat-x86_64-linux-gnu', '/usr/lib/python3.5/lib-dynload', '/usr/local/lib/python3.5/dist-packages', '/usr/lib/python3/dist-packages']}) vagrant 3976 0.0 2.2 51028 11460 pts/1 S 12:32 0:00 /usr/bin/python3 -c from multiprocessing.forkserver import main; main(3, 5, ['__main__'], **{'sys_path': ['/home/vagrant', '/usr/lib/python35.zip', '/usr/lib/python3.5', '/usr/lib/python3.5/plat-x86_64-linux-gnu', '/usr/lib/python3.5/lib-dynload', '/usr/local/lib/python3.5/dist-packages', '/usr/lib/python3/dist-packages']})
forkで生成された子プロセスはメモリを多く消費しているのがわかりますね。
forkはプロセスをコピーした直後からプログラムが再開されますが、spawnはプログラムを最初からやり直します。
つまりspawnで生成された子プロセスは親プロセスと同じコードが実行されうるということです。
子プロセスで実行されたくないコードは if __name__ == '__main__':
のようにして保護する必要があります。
このプログラムでは巨大なリストを生成している箇所がこれに該当します。
(ターミナルに出力されているように子プロセスの名前空間は __mp_main__
となるので __main__
と比較することで分岐できるというわけです)
ちなみに、さきほど説明した set_start_method()
は一度しか呼ぶことが許されていません(どうやら子プロセスでもこの情報は共有されているっぽい)。
そのためこの関数をコールする箇所は保護が必須です。
shimizukawa and tell_k に多謝
おまけ
CPUの数
マルチコアCPUの場合は下記のようにしてCPUの数を得られます。プロセスをプールする場合はこの数に合わせるのが効率的と言われています。
実際、デフォルトだとCPU数になります。
>>> import multiprocessing >>> multiprocessing.cpu_count() 4
ゾンビプロセス
Processによって開始されたプロセスはjoinしないとゾンビプロセスになります。
import time from multiprocessing import Process p = Process(target=time.sleep, args=(3, )) p.start() print(p.pid) time.sleep(20)
これを zombie.py
として実行すると..
vagrant 2307 0.7 0.4 32040 10144 pts/1 S+ 13:51 0:00 python3 zombie.py vagrant 2308 0.0 0.0 0 0 pts/1 Z+ 13:51 0:00 [python3] vagrant 2312 0.0 0.0 16572 1980 pts/0 S+ 13:52 0:00 grep python
プロセステーブルは親プロセスが子プロセスの終了ステータスを認識すれば更新されるので
p.is_alive()
などで状態を確認してもゾンビプロセスは解消できます。
OSによっては確認できなかったりします(Macとかね)。
メインプロセス終了時に掃除されるみたいです。
以上。はー、長かったー。
間違い等あったら教えてください。