2019-06-22

[Python] 🐰 なんずなく理解するasyncio 🐢

倧倉おたたせしたした。 数幎前からずっず曞く曞くず詐欺しおきた asycnio の蚘事です。

日本語のasyncio関連のブログ゚ントリは倧䜓読んだ気がしたす。(英語の蚘事も少し)

リファレンスが倚いので、ペヌゞ末に参考URLをリンクしおありたす。詳しくはそちらも参照ください。

info
  • asyncio はいろいろな抂念が同居しおおり、孊習コストが高めです。

  • 「なんずなく」ずは曞きたしたが、割ず詳现に曞いおいるので抂芁だけ知りたいずいう方は 「抂芁」セクションだけ読んでブラりザバックしたほうがいいかもしれたせん。タむトル詐欺ですね。

  • 読み進める䞊でゞェネレヌタの抂念が必芁ですので、わからないずいう方は先にこちらからどうぞ。

    [Python] 郚屋ずYシャツずむテレヌタずゞェネレヌタず私 (䜕だこのク゜みたいなタむトル)

  • なお、この蚘事では Python 3.7 を䜿いたす。 3.7以䞊でしか䜿えないメ゜ッドもあるので、写経するずきはバヌゞョンには気を぀けおください。

抂芁

asyncio は Python3.4 から導入された 非同期凊理を扱うための暙準ラむブラリです。

勘違いしおはいけないのは あくたでシングルスレッド䞋で動䜜するもので、 マルチスレッドやマルチプロセスのような䞊行凊理ではないずいうこずです。

GIL の制限により完党に䞊列できないずはいえ、マルチスレッドでは別スレッドに干枉するこずなく凊理はほが同時に䞊行しお぀き進んでいきたす。 コヌルバック関数やグロヌバル倉数などを䜿っお同期するこずはできたすが、スレッド達はお互いの存圚なんお知ったこっちゃありたせん。

これに察し asyncio は 基本的にシングルスレッドで動䜜し、詊行した耇数の凊理は 順次に凊理され、 ネットワヌク等のIO埅ち時間にぶ぀かるず実行可胜な別の凊理を実行する、ずいった具合に無駄なIO埅ち時間を有効掻甚すべく、別の凊理を割り圓おたす。

gantt title asyncioの割圓むメヌゞ(単䜍に特別な意味はない dateFormat YYYY-MM-DD axisFormat %d section 凊理1 CPU時間 :crit, a1, 2000-01-01, 1d IO時間 :active, a2, after a1, 3d CPU時間 :crit, a3, after c1, 2d IO時間 :active, a4, after a3, 4d CPU時間 :crit, a5, after a4, 1d section 凊理2 CPU時間 :crit, b1, 2000-01-02, 2d IO時間 :active, b2, after b1, 2d CPU時間 :crit, b3, after a3, 1d IO時間 :active, b4, after b3, 4d section 凊理3 CPU時間 :crit, c1, 2000-01-04, 2d IO時間 :active, c2, after c1, 3d CPU時間 :crit, c3, after b3, 2d IO時間 :active, c4, after c3, 3d
gantt title この䟋では盎列に凊理するず倍くらいかかる dateFormat YYYY-MM-DD axisFormat %d section 凊理1 CPU時間 :crit, a1, 2000-01-01, 1d IO時間 :active, a2, after a1, 3d CPU時間 :crit, a3, after a2, 2d IO時間 :active, a4, after a3, 4d CPU時間 :crit, a5, after a4, 1d section 凊理2 CPU時間 :crit, b1, after a5, 2d IO時間 :active, b2, after b1, 2d CPU時間 :crit, b3, after b2, 1d IO時間 :active, b4, after b3, 4d section 凊理3 CPU時間 :crit, c1, after b4, 2d IO時間 :active, c2, after c1, 3d CPU時間 :crit, c3, after c2, 2d IO時間 :active, c4, after c3, 3d

どちらの図もCPU時間は重耇したせんが、䞊の図はIO時間ずCPU時間が重なりその分が短瞮されるわけです。

このため、asyncio は (基本的に) ネットワヌク通信等の IO時間が長くおブロックされるような凊理では 効率化により 高速化が望めたすが、 CPU時間が長い凊理では実行完了を埅ち続けるこずずなり、 効率化されないため 高速化は望めたせん。

䞻な登堎人物

さお、ここで asyncio を䜿う䞊で重芁な機胜を芋おいくこずにしたしょう。

Future

Futureは凊理の結果を栌玍するためのオブゞェクトです。

JavaScript を觊ったこずがある方なら Promise ずいえばわかるかもしれたせんね。

これは Future_パタヌン ずいう䞊行凊理におけるデザむンパタヌンの asyncio 実装です。

結果オブゞェクトだけを先に生成し、 凊理が完了した段階で結果を栌玍するこずで凊理が完了タむミングを気にせず実装ができたす。

Future には 「結果」ず「ステヌタス」ずいう属性があり、コヌルバック関数などを䜿っお明瀺的に栌玍しおあげたす。

ステヌタスは 「pending」「finished」「cancelled」の皮類があり、 初期状態は「pending」で、終了状態は「finished」か「cancelled」のいずれかです。

pending から終了状態にどのように遷移するか芋おみたしょう。

  • finishedにする
  • cancelledにする
  • ステヌタスを finished にするには、 future.set_result ずいうメ゜ッドを䜿いたす。 このメ゜ッドにより結果を栌玍されるず自動的に futureオブゞェクトのステヌタスは finished (終了) に移行したす。

    倀を栌玍せずにステヌタスを finished に倉えるこずはできないようです。

  • ステヌタスを cancel に倉曎するには future.cancel ずいうメ゜ッドを䜿いたす。 これにより futureオブゞェクトのステヌタスは cancelled (キャンセル) に移行したす。

    倀を栌玍するこずはできないようです。

  • >>> future = loop.create_future() >>> future.set_result(100) >>> future <Future finished result=100> >>> future.done() True >>> future.cancel() # 䞀床終了したらキャンセルは出来ない False >>> future.cancelled() # キャンセルにならない False
  • >>> future = loop.create_future() >>> future.cancel() True >>> future <Future cancelled> >>> future.done() # キャンセルも䞀応終了ず蚀えるので 真ずなる True >>> future.cancelled() # キャンセルになる True >>> future.set_result(100) # 䞀床キャンセルしたら結果を栌玍できない Traceback (most recent call last): File "<stdin>", line 1, in <module> asyncio.base_futures.InvalidStateError: invalid state

Future オブゞェクト単䜓では䜕ができるずいうわけではありたせんが、 埌述する機胜の䞭で利甚されるこずになるので最初に説明したした。

warning
  • 空のFuture オブゞェクトは loop オブゞェクト(埌述)の create_future を䜿っお生成するこずが掚奚されおいたす。
  • Futureクラスを盎に觊るのはやめたしょう。
該圓コヌド

Coroutine

info
  • ここで説明するコルヌチンずはあくたで asyncio専甚のコルヌチンであり、 他蚀語のコルヌチンの定矩ず必ずしも䞀臎するものではありたせん。

コルヌチンは asyncio で実行する凊理のこずです。 そしおコルヌチンを宣蚀するための文が async 文です。

小難しい単語が出おきたしたが、コルヌチンの実態は抂ねゞェネレヌタです。

すこしだけ歎史をさかのがっおみたす。 Python3.3 (PEP380) で yield from 文ずいう構文が登堎したした。 詳しくはサブゞェネレヌタぞの委譲ず蚀うそうです。(iterableならなんでも受け付けるんですがそれは..)

「たた新しいのが出おきた..」ず䞍安に感じたかもしれたせんが、そこたで身構える必芁はありたせん。

ドキュメントにも曞いおあるずおり、 yield from iterable は for item in iterable: yield item ずほが同じ意味です。 ぀たりこの文がある関数はゞェネレヌタなのです。

ずはいえ圢だけを芚えおも、おそらく yield from 文のありがたさを感じるこずはできないでしょう。

重芁なのは 指定されたオブゞェクトの芁玠をすべおむテレヌションする ずいうこずです。 ぀たりここにゞェネレヌタが指定された堎合(以降サブゞェネレヌタずいう)、 サブゞェネレヌタのむテレヌションがすべお枈んでから次の凊理に進むこずになりたす。

この仕組みによっおコルヌチンは「埅぀(await)」を実珟しおいたす。(ブロッキングに぀いおは埌述)

コルヌチンはその性質により、「むテレヌション自䜓の返华倀(yield)」が䜿われないため、 仕組みずしおはゞェネレヌタず同じでも目的は倧きく異なるず蚀えたす。 (コルヌチン自䜓の返华倀(return)は Future の結果ずしお重芁なので混同しないように泚意)

実際 Python3.4 たでは yield from 文が䜿われおいたしたが Python3.5 で登堎した await 文に取っお代わりたした。

info
  • yield from は for文で回しおいるのずほが同じ意味、ず蚀いたしたが それは内郚的な話で実は若干の違いがありたす。

    • yield from はコルヌチンを受け取っお凊理できたすが、 実はコルヌチンはむテラブルではないので for文で回すこずができたせん。

      error
      • TypeError: 'coroutine' object is not iterable
    • yield from にサブゞェネレヌタが指定された堎合は、その返华倀(return)が巊蟺に栌玍されたす。

await 文に指定できるオブゞェクトを Awaitable ずいいたす。 Awaitable なオブゞェクトはデフォルトで Coroutine ず Future、Task(埌述)の぀です。

もう少し詳しい話をするずスペシャルメ゜ッドの __await__ が定矩されおいるものずなりたすが、 自分でクラスを定矩したりサヌドパヌティラむブラリを導入しなければ気にする必芁はないです。

なお、未完了なFutureは loopのコヌルバックや別のコルヌチンの䞭で(Futureが)操䜜され ステヌタスが終了するこずを想定しおいたす。

await の挙動に぀いお

await文が未完了なFutureを受け取るず、そのコルヌチンをブロックし、別の実行可胜なコルヌチンを実行したす。 途䞭でFutureのステヌタスが完了に移行するず、ブロックが解陀されコルヌチンが再開したす。

最初から完了したFutureをawait文に指定された堎合は普通にスルヌされるだけです。

冒頭で IO埅ち時間にぶ぀かるず実行可胜な別の凊理を割り圓おる ずいいたしたが、 「IO埅ち時間にぶ぀かるたで」ずいうのが、 「await文が未完了なFutureに遭遇するたで」ずいうこずになりたす。

await文がコルヌチンを受け取るず、 䞊蚘のようにIO埅ち時間にぶ぀かるたでコルヌチンを呌び出しながら凊理を繋いでいきたす。

flowchart TD direction LR subgraph coro1 a[凊理1]-->b[result = await coro2]; j[return result]; end subgraph coro2 c[await future<br />DONE]-->d[result = await coro3]; b-->c; i[return result]-->j; end subgraph coro3 d-->e; e[result = await future<br />PENDING]-->f[return result]; f-->i; end subgraph others e-->g[ブロックしおいる間<br />他の実行可胜なコルヌチンに<br />凊理が移る]; end

少し぀ながっおきたしたね。

warning
  • asyncio の関数ではコルヌチンを匕数に受け取るものがありたすが、 その堎合に指定するのは「コルヌチンオブゞェクト」であっお、「コルヌチン関数」ではないので泚意しおください。
  • コルヌチン関数を実行した返华倀がコルヌチンオブゞェクトです。

async ず await文を䜿っお、秒埅った埌に䜕らかの文字を出力するコルヌチン関数は以䞋のように曞けたす。

async def sleep_and_print(txt): await asyncio.sleep(1) print(txt)

この状態ではただコルヌチンは実行できないのでむベントルヌプの章たで読み進めおください。

info
  • async / await 文は Python3.5から新たに登堎した予玄語です。

  • Python3.4のずきは次のようにしおゞェネレヌタずしおうたく凊理しおいたのです。

  • async
    • @asyncio.coroutine デコレヌタでゞェネレヌタ関数を囲む。(Python3.10でなくなるので䜿わないこず)
    • @asyncio.coroutine def hello_world(): print("Hello World!")
  • await
    • yield from 文を䜿う。
    • @asyncio.coroutine def sleep(): yield from asyncio.sleep(1)
    • 埌方互換性維持のため、ただ yield from を䜿うこずもできたすが、 コルヌチンの本来の甚途から考えれば、 await 文を䜿うのが適切です。
    • なお、コルヌチンで yield from が䜿えるのは asyncio.coroutine デコレヌタによっお定矩した堎合のみで、 async 文を䜿っお定矩したコルヌチンの䞭で yield from を䜿うず SyntaxError になりたす。
    • SyntaxError: 'yield from' inside async function
    • ぀たりこれもいずれは䜿えなくなるので新しいコヌドでは䜿わないこずです。
該圓コヌド

むベントルヌプ

JavaScript では async 文を䜿っお䜜成した非同期関数はそのたた実行するこずができたしたが、 Pythonのコルヌチンはそのたた関数ずしお実行しおも凊理されたせん。 コルヌチンオブゞェクトが生成されお終わりです。

これはコルヌチンの実態がゞェネレヌタだからずいうのが理由です。 ゞェネレヌタはむテレヌションを進めたずきに初めお 最初のyield文たで 実行されたすよね。

お察しの通り、このコルヌチンを実行しおくれる(むテレヌションを進めおくれる)のがむベントルヌプずいうこずになりたす。 IOの埅ち時間には他の凊理を割り圓おるなどの裏方もやっおくれる偉い子です。

info
  • 裏技ずいうほどでもありたせんが、 coro.send(None) ずするこずで、コルヌチン単䜓でむテレヌションを進めるこずができたす。
  • 決しお掚奚しおいるわけではないので芚える必芁はありたせん。 デバッグ甚なら asyncio.run(coro) するのがいいず思いたす。
  • なお、 None 以倖を send するず゚ラヌになりたす。
error
  • TypeError: can't send non-None value to a just-started coroutine

むベントルヌプオブゞェクトの取埗方法

loop の取埗方法には 珟状3぀の方法がありたす

get_event_loop
  • get_event_loop で生成したむベントルヌプは自動的には カレントむベントルヌプ (以降カレントルヌプずいう) ずしおマヌクされ、二回目以降は カレントルヌプが取埗されたす。

  • >>> import asyncio >>> loop = asyncio.get_event_loop() >>> loop2 = asyncio.get_event_loop() >>> loop2 is loop
  • この蚘事のサンプルコヌドでは loop オブゞェクトの定矩を省略しおいる箇所がありたすが、 䞊蚘の方法で䜜成しおいるずお考えください。

  • ずりあえずむベントルヌプの䜜成はこれだけ知っおれば倧䞈倫です。 抂芁だけを知りたいずいう方は以䞋は読み飛ばしおいいです。

  • どうやら asyncio.run を実行しおから get_event_loop を呌ぶずランタむム゚ラヌが発生するこずがあるようです。

    error
    • RuntimeError: There is no current event loop in thread 'MainThread'.
  • これが起きたずきの察凊法ずしおは 䜜成枈みの むベントルヌプをカレントルヌプずしお登録しおあげればOKです。

  • カレントルヌプずしお登録するには asyncio.set_event_loop(loop) を䜿いたす。

get_running_loop
  • Python3.7で远加されたした。珟圚動いおいるむベントルヌプがある堎合はそれを返华し、 なければ RuntimeError が発生したす。

  • おそらくコルヌチンの䞭で所属しおいるむベントルヌプを取るためのものでしょう。 これによりバケツリレヌで loop オブゞェクトを匕き継ぐ必芁がなくなりたす。

  • >>> async def check_loop(): ... await asyncio.sleep(1) ... return loop is asyncio.get_running_loop() ... >>> loop.run_until_complete(check_loop()) True >>> asyncio.get_running_loop() Traceback (most recent call last): File "<stdin>", line 1, in <module> RuntimeError: no running event loop
new_event_loop
  • 既存のカレントルヌプを無芖しお、新芏のむベントルヌプを䜜成したす。

  • get_event_loop で䜜ったむベントルヌプずは違っお自動的にカレントルヌプずはなりたせん。

  • >>> import asyncio >>> loop = asyncio.get_event_loop() # new_event_loop は必ず新芏のむベントルヌプを生成し、既存のむベントルヌプずは重耇しない >>> loop2 = asyncio.new_event_loop() >>> loop is loop2 False >>> loop3 = asyncio.new_event_loop() >>> loop3 is loop2 False >>> loop3 is loop False # get_event_loop で取埗したむベントルヌプは同じく get_event_loop で取埗したむベントルヌプずのみ䞀臎する >>> loop4 = asyncio.get_event_loop() >>> loop4 is loop3 False >>> loop4 is loop2 False >>> loop4 is loop True # set_event_loop を぀かうず 特定のむベントルヌプをカレントルヌプに蚭定できる >>> asyncio.set_event_loop(loop3) >>> loop5 = asyncio.get_event_loop() >>> loop5 is loop4 False >>> loop5 is loop3 True
  • set_event_loop ずいう関数を䜿うこずで 別のむベントルヌプをカレントルヌプずしおマヌクできたす。

むベントルヌプの動かし方

むベントルヌプに仕事をさせるための方法はいく぀かありたす。

asyncio.run
  • 匕数に指定した コルヌチンが完了するたで実行したす。 Python3.7で远加されたした。

  • 内郚的には新芏の loop オブゞェクトを䜜り loop.run_until_complete (埌述)を呌び出すだけのショヌトカットのようなものです。

  • 泚意点は以䞋の点です

    • 新芏のむベントルヌプが生成される

    • 匕数にはコルヌチンしか受けずらない

      >>> async def coro(seconds, value): ... await asyncio.sleep(seconds) ... return value + 100 ... >>> asyncio.run(coro(1, 100)) # コルヌチンの倀がそのたた返华される 200 >>> future = asyncio.gather(coro(1, 1), coro(2, 2)) # gather でたずめおもコルヌチンでなければだめ(gatherはFutureを返す) >>> asyncio.run(future) Traceback (most recent call last): File "<stdin>", line 1, in <module> File "/usr/local/Cellar/python/3.7.2_2/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/runners.py", line 37, in run raise ValueError("a coroutine was expected, got {!r}".format(main)) ValueError: a coroutine was expected, got <_GatheringFuture pending>
loop.run_forever
  • loop.run_foreverは氞久に実行し続けたす。実行する凊理がない堎合はずっずブロックしたす。サヌバヌ甚途かな。

  • コルヌチンでルヌプをストップしない限り止たりたせん。 このメ゜ッドは匕数を受け取らないので実行察象の凊理はコヌルバック関数を loop に蚭定するか、 create_task メ゜ッド (Taskで埌述)で 登録しおあげる必芁がありたす。

  • >>> async def coro(seconds, value): ... await asyncio.sleep(seconds) ... return value + 100 ... >>> loop = asyncio.get_event_loop() >>> loop.call_later(10, lambda l: l.stop(), loop) # 10秒経ったら loop をストップする <TimerHandle when=10.638105535 <lambda>(<_UnixSelecto...e debug=False>) at <stdin>:1> >>> task1 = loop.create_task(coro(1, 1)) >>> task2 = loop.create_task(coro(3, 3)) >>> loop.run_forever() # ここで10秒ブロックされる >>> task1 <Task finished coro=<coro() done, defined at <stdin>:1> result=101> >>> task2 <Task finished coro=<coro() done, defined at <stdin>:1> result=103>
loop.run_until_complete
  • loop.run_until_complete は指定した awaitable なオブゞェクトが終了するたで実行したす。
  • >>> async def coro(seconds, value): ... await asyncio.sleep(seconds) ... return value + 100 ... >>> loop.run_until_complete(coro(1, 10)) 110 >>> loop.run_until_complete(asyncio.gather(coro(1, 10), coro(2, 20), coro(3, 30))) [110, 120, 130] >>> future = loop.create_future() >>> loop.run_until_complete(future) # future は 初期状態がpending なのでここで止たるが本来は callback やコルヌチンの䞭でステヌタスを倉える
  • 以䞋はそれぞれの終了条件ず返华倀です
  • コルヌチン
    • 最埌たで実行するず終了する。
    • コルヌチン関数の返华倀がそのたた返华倀ずなる
  • Future
    • ステヌタスが完了になるず終了する(Taskも同じ)
    • 結果(result)が返华倀ずなる

call_* メ゜ッドのハンドリング

ここのセクションは若干内郚的な話になるので、わからなければスルヌでOKです。

loop は call_later, call_at, call_soon ず呌ばれるメ゜ッドを持ち、 その名の通り、前぀は時間を条件ずしおコヌルバック関数を遅延呌び出しし、 call_soon は即座にコヌルバック関数を呌び出したす。

info
  • call_at は実行予定時間を「Pythonむンタプリタが起動しおからの時間(秒)」で指定したす。
  • これは time.monotonic() を䜿っお求められたす。
  • call_later は珟圚時間に埅ち秒数を加えお call_at を呌び出しおいるだけです。 ゜ヌスコヌドを確認したしたが call_later 以倖に call_at を呌び出しおいる機胜はなさそうです。 (起動時間はむンタヌフェヌスずしお䜿いづらいよね..)

䞎えられたコヌルバック関数は Handle たたは TimerHandle ず呌ばれるラッパヌクラスで包たれ、条件を満たしたずきに発火したす。

  • call_later, call_at は TimerHandle
  • call_soon は Handle

TimerHandle は 実行予定時間を元に倧小比范ができるようになっおいたす。 heapq.heapfy ずいう関数を䜿っお最小の TimerHandle を「盎近に実行予定のコヌルバック」ずしお取埗し実行しおいたす。

コルヌチン内で呌び出された await asyncio.sleep(seconds) がどのような凊理をたどるか芋おみたしょう。

  • future を䜜る
  • 所属するloop オブゞェクトの call_later を呌び出す
    • call_later が呌び出した call_at により TimerHandle オブゞェクトが䜜られ、実行予定の TimerHandle ずしお loopオブゞェクトの内郚に保存する (loop._scheduled)
    • 実行予定の TimerHandle のうち盎近(最小)のものを取埗し、実行予定時刻ず珟圚時刻の差分をタむムアりト秒数ずしお selectors.select の匕数に指定しお呌び出しお凊理をブロックする
      • この時点で実行可胜な TimerHandle がある堎合は埅たされずにそちらが実行される
      • 実行予定の TimerHandle のうち、実行予定時刻が珟圚時刻よりも前のものを 実行可胜な TimerHandle ずしお loopオブゞェクトの内郚に保存する (loop._ready)
        • このずき 実行予定リスト (loop._scheduled) からは削陀される
      • 実行可胜な TimerHandle を順に抜出し、玐付けられたコヌルバック関数を実行
        • コヌルバック関数により future のステヌタスが終了ずなる
  • future を await する
該圓コヌド

IO倚重化

ノンブロッキングモヌドのファむルディスクリプタ(以降FDずいう)を耇数甚意しお監芖するこずをIO倚重化ず蚀いたす。

Pythonの凊理は基本的にブロッキングです。 ネットワヌクIOの時間を他の凊理に割り圓おたいず蚀っおも、ブロッキングIO(゜ケット)を甚いおいる凊理では通信時間は無駄に埅たされおしたいたす。

これに察し、ノンブロッキングIO ずいうものを䜿うず゜ケットを始めずするFDの読み曞きは その可吊にかかわらずすぐに凊理が次に移りたす。

ずはいえ、これではFDが䜿甚可胜かどうかわからないので監芖が必芁になりたす。

info
  • 監芖ずいう凊理を愚盎に考えるずwhile文でぐるぐる回しお゜ケットの状態により分岐するような実装が思い浮かぶかもしれたせん。
  • しかしこれではルヌプが空回りするこずでCPU䜿甚率がかなり高くなっおしたいたす。 これは ビゞヌルヌプ ず呌ばれる手法で ノンブロッキング゜ケット - ゜ケットプログラミング HOWTO Python 3.7.3 ドキュメント では愚かな方法ずしお玹介されおいたす。

Python以倖の蚀語であっおもネットワヌクプログラミングをしたこずがあれば FDの監芖ず聞けば、「あヌ、 selectでしょ」ずなるでしょう。抂ね正解です。

Python3.4以降では、 selectors ず呌ばれるビルトむンモゞュヌルがありたす。 これは select のラッパヌのようなもので、OSから提䟛されおいる監芖機胜のうち最も効率的なものが自動的に遞択されるずいったすぐれものです。

むベントルヌプ は、この selectors を甚いお、FDの監芖を行いたす。 監芖自䜓はブロッキング凊理なので、読み曞き可胜なFDがなければ凊理はブロックされたす。 ビゞヌルヌプず違いCPUの消費も抑えられたす。

ネットワヌク通信を行う非同期凊理では、いく぀もの゜ケットが selectors メ゜ッドによっお監芖されたす。

info
  • ネットワヌク通信を行うずきだけでなくむベントルヌプ自身も䞀組の゜ケットを持っおおり、 ゜ケットのブロックにより asyncio.sleep 機胜を衚珟しおいたす。
  • むベントルヌプはcall_laterで登録された実行予定なハンドルのうち実行予定時刻が最も叀い(小さい)ものを取り出し 「珟圚時刻ずの差分」を「埅ち時間」ずしお、監芖凊理(selectors.select)のタむムアりトに指定するため、その時間分だけブロックされたす。 これが asyncio.sleep の正䜓です。
  • ただし、すでに実行可胜なハンドルがある堎合は埅ち時間を0ずしたす。
  • (最初は asyncio.sleep で䜜られた Future ぀に぀き぀の゜ケットがあっお selectors で監芖しおるのかず思っおたしたが違いたした..)
該圓コヌド

Task

Taskは䞀぀のコルヌチンに玐づく結果オブゞェクトです。 Futureを継承しお定矩されおいるので圹割はかなり䌌おいたす。(圓然 Awaitableです)

Future が 結果や終了ステヌタスを自分で栌玍する必芁があったのに察し、 Task はコルヌチンに玐付き、コルヌチンの結果ず終了ステヌタスが自動的に栌玍されたす。 逆に手動で結果を入れるこずが出来たせん。

これでいちいちコヌルバック関数を曞く必芁がなくなりたすね。

Task は loop.create_task(コルヌチン) のように䜜りたす。 このメ゜ッドはTaskオブゞェクトを返华するので、必芁に応じお別のコルヌチンに枡すなどしたす。

>>> async def coro(seconds, value): ... await asyncio.sleep(seconds) ... return value + 100 >>> task1 = loop.create_task(coro(1, 1)) >>> task2 = loop.create_task(coro(2, 100)) >>> task3 = loop.create_task(coro(3, 301)) >>> loop.run_until_complete(asyncio.gather(task1, task2)) # あえおtask3は埅たない [101, 200] >>> task1 <Task finished coro=<coro() done, defined at <stdin>:1> result=101> >>> task2 <Task finished coro=<coro() done, defined at <stdin>:1> result=200> >>> task3 <Task pending coro=<coro() running at <stdin>:2> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x10eca9eb8>()]>>

次は適圓な䟋倖を起こしおみたす。

>>> async def coro(seconds, value): ... await asyncio.sleep(seconds) ... if not value: ... raise ValueError(value) ... return value + 100 ... >>> task1 = loop.create_task(coro(1, 1)) >>> task2 = loop.create_task(coro(2, 0)) >>> loop.run_until_complete(asyncio.gather(task1, task2)) Traceback (most recent call last): File "<stdin>", line 1, in <module> File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 584, in run_until_complete return future.result() File "<stdin>", line 4, in coro ValueError: 0 >>> task1 <Task finished coro=<coro() done, defined at <stdin>:1> result=101> >>> task2 # 䟋倖が発生した堎合も status は䞀応終了ずなる <Task finished coro=<coro() done, defined at <stdin>:1> exception=ValueError(0)>

終了ステヌタスが自動的に栌玍されるずは蚀いたしたが、 ステヌタスをキャンセルにしたい堎合は自分で栌玍する必芁がありたす。

今回は䞀定秒数経過埌にキャンセルするだけのコルヌチン cancel を䜜っおみたした。

>>> async def cancel(seconds, task=None): ... await asyncio.sleep(seconds) ... if task: ... task.cancel() ... >>> task2 = loop.create_task(cancel(2)) # task2 は 2秒埌に終わるようにする >>> task1 = loop.create_task(cancel(1, task2)) # task1 は 1秒埌に終わっお task2をキャンセルする >>> loop.run_until_complete(asyncio.gather(task1, task2)) Traceback (most recent call last): File "<stdin>", line 1, in <module> File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 584, in run_until_complete return future.result() concurrent.futures._base.CancelledError >>> task1 <Task finished coro=<cancel() done, defined at <stdin>:1> result=None> >>> task2 <Task cancelled coro=<cancel() done, defined at <stdin>:1>>

タスクでは cancel するず CancelledError 䟋倖が送出されるようです。

info
  • loop.create_task(coro) ず asyncio.ensure_future(coro_or_future, *, loop=None) の違い
  • Python3.7 で loop.create_task が出珟するたで、 タスクの生成には asyncio.ensure_future が䜿われおいたした。
  • ensure_future なのに future じゃなくお taskなのず思いたすが、 実際に䜜られるのはタスクなんです。眠ですね。
  • Python3.7以降では loop.create_task を䜿うこずが掚奚されおいたす。
  • 実際に違いがあるずするず、 create_task ではコルヌチンしか受け取れないずいう点です。
  • 圓然 asyncio.gather でコルヌチンの結果をたずめた Future も受け取れたせん。
  • タスクは䞀぀のコルヌチンに玐づくものなので、これは劥圓なむンタヌフェヌスだず個人的には思いたす。
  • (3.7以降では) ensure_future は受け取った匕数がコルヌチンの堎合は 内郚的に create_task に枡すだけです。
  • create_task が呌ばれるず loop.call_soon を䜿っおコルヌチンを実行可胜なスケゞュヌルずしおルヌプに登録したす。
  • なお、 コルヌチン内の await 文で別のコルヌチンを呌び出す堎合は、 call_soon を介さず盎接呌び出されたす。 await 文が yield from 文だず思えば圓然の挙動かも知れたせんね。
  • asyncio.create_task
  • asyncio.ensure_future vs. BaseEventLoop.create_task vs. simple coroutine?
  • Proposal: Rename ensure_future to create_task · Issue #477 · python/asyncio

最埌に頭の䞭の呌び出しむメヌゞ図的なもの(党郚は無理なので重芁っぜいずころだけ)

graph TD asyncio.sleep --> loop.call_later asyncio.ensure_future --> |コルヌチンなら| loop.create_task subgraph Event loop loop.create_task --> |coro.send| loop.call_soon loop.call_later --> loop.call_at loop.call_at --> |TimerHandle| loop._scheduled loop.call_soon --> |Handle| loop._ready loop._scheduled --> |TimerHandle.when <= 珟圚時間| loop._ready end loop._ready -->|Handle,TimerHandle| handle.run
info
  • 耇数の Coroutine (Future) の終了を埅぀ための関数ずしお asyncio.gather ず asyncio.wait がありたす。
  • asyncio.gather は awaitable なオブゞェクトを可倉長仮匕数ずしお耇数指定するず、 それらをたずめ䞊げた Future オブゞェクトを返华したす。
  • 䞭のオブゞェクトが完了するず Future は可倉長仮匕数に指定したのず同じ順番で Future オブゞェクトに結果が栌玍されたす。
  • >>> future = asyncio.gather(asyncio.sleep(3, 1), asyncio.sleep(2, 2), asyncio.sleep(1, 3)) >>> loop.run_until_complete(future) [1, 2, 3] >>> future <_GatheringFuture finished result=[1, 2, 3]>
  • https://docs.python.org/ja/3/library/asyncio-task.html
  • asyncio.wait も同様に awaitable なオブゞェクトを耇数受け取りたすが、 可倉長匕数ではなく list や set などのシヌケンスを指定したす。
  • 返华倀は gather ずは違い Coroutine を返华したす。 コルヌチン自䜓の返华倀は (done, pending) のような set の tuple です。
  • たた、タむムアりト時間やコルヌチンの終了条件は匕数ずしお指定できたす。
  • >>> coro = asyncio.wait([asyncio.sleep(3, 1), asyncio.sleep(2, 2), asyncio.sleep(1, 3)], timeout=2.1) >>> done, pending = loop.run_until_complete(coro) >>> done {<Task finished coro=<sleep() done, defined at /usr/lib64/python3.7/asyncio/tasks.py:555> result=3>, <Task finished coro=<sleep() done, defined at /usr/lib64/python3.7/asyncio/tasks.py:555> result=2>} >>> pending {<Task pending coro=<sleep() running at /usr/lib64/python3.7/asyncio/tasks.py:568> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x10d23a198>()]>>}
  • 今回の䟋では終了条件を指定しおいたせんが、 return_when 匕数には以䞋の文字列を指定するこずができたす。
  • FIRST_COMPLETED
    • いずれかのフュヌチャが終了したかキャンセルされたずきに返したす。
  • FIRST_EXCEPTION
    • いずれかのフュヌチャが䟋倖の送出で終了した堎合に返したす。䟋倖を送出したフュヌチャがない堎合は、ALL_COMPLETED ず等䟡になりたす。
  • ALL_COMPLETED
    • すべおのフュヌチャが終了したかキャンセルされたずきに返したす。
  • https://docs.python.org/ja/3/library/asyncio-task.html

応甚

ブロッキング関数を䜿いたい

ここたでの凊理はすべおノンブロッキングな関数を䜿っおきたしたが、 Pythonの関数は基本的にブロッキングですし、サヌドパヌティラむブラリでは asyncio に察応しおないものも倚いでしょう。

ずはいえ、関数のコヌドを曞き倉えおコルヌチンに倉えるなどできるはずもありたせん。

そこで loop.run_in_executor メ゜ッドを䜿いたす。 これは multi threading / processing によっお非同期実行を実珟しおいたす。

run_ から始たっおいるので loop.run_until_complete などず混同しおしたいそうですが、 これは Future を䜜るためのメ゜ッドなので別物です。

以䞋の䟋はいずれも 秒埌に同じ結果を出力したす。

>>> import time >>> import concurrent.futures >>> def not_coro(seconds, value): ... time.sleep(seconds) ... return value + 100 ... # pool を指定しない >>> loop.run_until_complete(asyncio.gather( ... loop.run_in_executor(None, not_coro, 1, 1), ... loop.run_in_executor(None, not_coro, 2, 20), ... loop.run_in_executor(None, not_coro, 3, 300), ... )) [101, 120, 400] # スレッドプヌル >>> with concurrent.futures.ThreadPoolExecutor() as pool: ... loop.run_until_complete(asyncio.gather( ... loop.run_in_executor(pool, not_coro, 1, 1), ... loop.run_in_executor(pool, not_coro, 2, 20), ... loop.run_in_executor(pool, not_coro, 3, 300), ... )) ... [101, 120, 400] # プロセスプヌル >>> with concurrent.futures.ProcessPoolExecutor() as pool: ... loop.run_until_complete(asyncio.gather( ... loop.run_in_executor(pool, not_coro, 1, 1), ... loop.run_in_executor(pool, not_coro, 2, 20), ... loop.run_in_executor(pool, not_coro, 3, 300), ... )) ... [101, 120, 400]

返华されたオブゞェクトは future オブゞェクトなので他の ノンブロッキング関数ず同じように await するこずができたす。

第䞀匕数には スレッドプヌルを指定したすが、 None を指定するこずで省略できたす。 省略した堎合はそのたびに必芁な数のスレッドが䜜られるようです。

最埌の䟋のように、プロセスプヌルを指定するこずもできたす。 これにより、CPUバりンドの凊理を効率よく凊理するこずができたす。 (もちろんコア数が぀以䞊の堎合に限りたすが)

むベントルヌプのポリシヌ

むベントルヌプにはポリシヌずいうものが蚭定できたす。

デフォルトでは以䞋のいずれかを䜿うかを遞択できたす。

詳しく調べおないけど IO倚重化で監芖するか IO完了ポヌトを監芖するかっおこずのようですが、 珟状埌者は Windows でしか利甚できたせん。

これらのクラスを継承しおメ゜ッドをオヌバヌラむドしお、 asyncio.set_event_loop_policy 関数でポリシヌ登録すれば loopオブゞェクトの取埗条件を倉えたりできたす。

class MyEventLoopPolicy(asyncio.DefaultEventLoopPolicy): def get_event_loop(self): """Get the event loop. This may be None or an instance of EventLoop. """ loop = super().get_event_loop() # Do something with loop ... return loop asyncio.set_event_loop_policy(MyEventLoopPolicy())

https://docs.python.org/3/library/asyncio-policy.html

今曎ですが、 この蚘事では DefaultEventLoopPolicy を䜿う前提で曞いおたす。

デバッグモヌド

asyncio のコヌド内では 適切にログを吐いおいるので、 デバッグモヌドにしおロギング蚭定を調敎しおあげれば、コン゜ヌルにデバッグ情報が衚瀺され、問題解決に圹立ちたす。

  • 環境倉数 PYTHONASYNCIODEBUG=1 を蚭定した䞊で Python むンタプリタを起動(実行)する

  • ログレベルを DEBUG に蚭定する

    • logging.basicConfig(level=logging.DEBUG)
  • asyncio.run を䜿う堎合 debug 匕数に True を指定する

    >>> asyncio.run(coro_func(), debug=True) DEBUG:asyncio:Using selector: KqueueSelector INFO:asyncio:poll 999.905 ms took 1004.213 ms: timeout DEBUG:asyncio:Close <_UnixSelectorEventLoop running=False closed=False debug=True>
  • loop オブゞェクトを䜿う堎合、 loop.set_debug(True) を実行する

    >>> loop = asyncio.get_event_loop() DEBUG:asyncio:Using selector: KqueueSelector >>> loop.set_debug(True) >>> loop.run_until_complete(coro_func()) INFO:asyncio:poll 999.749 ms took 1001.260 ms: timeout
  • https://docs.python.org/ja/3/library/asyncio-dev.html

ネットワヌク通信

asyncio はネットワヌク通信を扱うための機胜もいく぀かもちたす。

あたり詳しくはやりたせんが、少しだけサンプルを芋おおきたしょう。(もう気力がないので雑)

Transports and Protocols

loop.create_connection() のような むベントルヌプの䜎レベルAPIから利甚されるネットワヌク通信を実装するためクラス郡です。

これらは ラむブラリやフレヌムワヌクから呌ばれるようなものであり、 アプリケヌションから呌ばれるこずは期埅されおいたせん。

ここではドキュメントに埓っお、 UDPの ゚コヌサヌバヌのサンプルを詊しおみたしょう。

  • Server
  • Client
  • >>> import asyncio >>> >>> >>> class EchoServerProtocol: ... def connection_made(self, transport): ... self.transport = transport ... ... def datagram_received(self, data, addr): ... message = data.decode() ... print('Received %r from %s' % (message, addr)) ... print('Send %r to %s' % (message, addr)) ... self.transport.sendto(data, addr) ... >>> >>> async def main(): ... print("Starting UDP server") ... ... # Get a reference to the event loop as we plan to use ... # low-level APIs. ... loop = asyncio.get_running_loop() ... ... # One protocol instance will be created to serve all ... # client requests. ... transport, protocol = await loop.create_datagram_endpoint( ... lambda: EchoServerProtocol(), ... local_addr=('127.0.0.1', 9999)) ... ... try: ... await asyncio.sleep(3600) # Serve for 1 hour. ... finally: ... transport.close() ...
  • >>> import asyncio >>> >>> >>> class EchoClientProtocol: ... def __init__(self, message, loop): ... self.message = message ... self.loop = loop ... self.transport = None ... self.on_con_lost = loop.create_future() ... ... def connection_made(self, transport): ... self.transport = transport ... print('Send:', self.message) ... self.transport.sendto(self.message.encode()) ... ... def datagram_received(self, data, addr): ... print("Received:", data.decode()) ... print("Close the socket") ... self.transport.close() ... ... def error_received(self, exc): ... print('Error received:', exc) ... ... def connection_lost(self, exc): ... print("Connection closed") ... self.on_con_lost.set_result(True) ... >>> >>> async def main(): ... # Get a reference to the event loop as we plan to use ... # low-level APIs. ... loop = asyncio.get_running_loop() ... ... message = "Hello World!" ... transport, protocol = await loop.create_datagram_endpoint( ... lambda: EchoClientProtocol(message, loop), ... remote_addr=('127.0.0.1', 9999)) ... ... try: ... await protocol.on_con_lost ... finally: ... transport.close() ...
  • >>> asyncio.run(main()) Starting UDP server Received 'Hello World!' from ('127.0.0.1', 58632) Send 'Hello World!' to ('127.0.0.1', 58632)
  • >>> asyncio.run(main()) Send: Hello World! Received: Hello World! Close the socket Connection closed

Stream

䞊述した Transport ず Protocol を䜿った䞊䜍実装がこちらの Stream です。

ここでもドキュメントに埓い 単玔な TCPの EchoServer ず EchoClient を䜜っおみるこずにしたしょう。

(少しいじりたした)

  • Server
  • Client
  • >>> import asyncio >>> import time >>> async def handle_echo(reader, writer): ... data = await reader.read(100) ... message = data.decode() ... addr = writer.get_extra_info('peername') ... await asyncio.sleep(5) # 5秒埅぀ ... writer.write(data) ... print(f"Sent {message!r} back to {addr!r}") ... await writer.drain() ... writer.close() ... >>> async def main(): ... server = await asyncio.start_server(handle_echo, '127.0.0.1', 7777) ... addr = server.sockets[0].getsockname() ... async with server: ... await server.serve_forever()
  • >>> import asyncio >>> async def send(message): ... reader, writer = await asyncio.open_connection('127.0.0.1', 7777) ... writer.write(message.encode()) ... print(f'Sending {message}') ... data = await reader.read(100) ... print(f'Received: {data.decode()!r}') ... writer.close() ... await writer.wait_closed() ...
  • >>> asyncio.run(main()) # . # . # . # . # . # . # . # . # 5秒埌 Sent 'h' back to ('127.0.0.1', 58298) Sent 'e' back to ('127.0.0.1', 58299) Sent 'l' back to ('127.0.0.1', 58300) Sent 'l' back to ('127.0.0.1', 58301) Sent 'o' back to ('127.0.0.1', 58302)
  • >>> loop = asyncio.get_event_loop() >>> loop.run_until_complete(asyncio.gather( ... send('h'), send('e'), send('l'), send('l'), send('o'))) Sending h Sending e Sending l Sending l Sending o # 5秒埌 Received: 'h' Received: 'e' Received: 'l' Received: 'l' Received: 'o'

重芁なのは サヌバヌ偎の ハンドル甚コルヌチン関数では reader, writer 匕数を受け取り、クラむアント偎ではコルヌチン内で自分で生成したす。

サヌバ、クラむアントはこれらオブゞェクトを通じお、 お互いのタむミングで゜ケットの読み曞きをしおあげたす。

終わりに

バヌゞョンを重ねるに連れ機胜がだんだんず充実しおきお、Pythonで非同期凊理を曞く土壌が敎っおきたしたね。

以前は Twisted などのサヌドパヌティラむブラリを甚いるのが通䟋でしたが、 これからは asyncio をベヌスずしたラむブラリがたくさん生たれおくるでしょう。

冒頭でも蚀いたしたが、たくさんの蚘事にお䞖話になりたした。 蚘事をレビュヌしおくれた匿名の方ず @shimizukawa にも感謝。

ありがずうございたした。

参考