大変おまたせしました。 数年前からずっと書く書くと詐欺してきた asycnio の記事です。
日本語のasyncio関連のブログエントリは大体読んだ気がします。(英語の記事も少し)
リファレンスが多いので、ページ末に参考URLをリンクしてあります。詳しくはそちらも参照ください。
- info
- asyncio はいろいろな概念が同居しており、学習コストが高めです。
- 「なんとなく」とは書きましたが、割と詳細に書いているので概要だけ知りたいという方は 「概要」セクションだけ読んでブラウザバックしたほうがいいかもしれません。タイトル詐欺ですね。
- 読み進める上でジェネレータの概念が必要ですので、わからないという方は先にこちらからどうぞ。 [Python] 部屋とYシャツとイテレータとジェネレータと私 (何だこのクソみたいなタイトル)
- なお、この記事では Python 3.7 を使います。 3.7以上でしか使えないメソッドもあるので、写経するときはバージョンには気をつけてください。
概要
asyncio は Python3.4 から導入された 非同期処理を扱うための標準ライブラリです。
勘違いしてはいけないのは あくまでシングルスレッド下で動作するもので、 マルチスレッドやマルチプロセスのような並行処理ではないということです。
GIL の制限により完全に並列できないとはいえ、マルチスレッドでは別スレッドに干渉することなく処理はほぼ同時に並行してつき進んでいきます。 コールバック関数やグローバル変数などを使って同期することはできますが、スレッド達はお互いの存在なんて知ったこっちゃありません。
これに対し asyncio は 基本的にシングルスレッドで動作し、試行した複数の処理は 順次に処理され、 ネットワーク等のIO待ち時間にぶつかると実行可能な別の処理を実行する、といった具合に無駄なIO待ち時間を有効活用すべく、別の処理を割り当てます。
gantt
title asyncioの割当イメージ(単位に特別な意味はない
dateFormat YYYY-MM-DD
axisFormat %d
section 処理1
CPU時間 :crit, a1, 2000-01-01, 1d
IO時間 :active, a2, after a1, 3d
CPU時間 :crit, a3, after c1, 2d
IO時間 :active, a4, after a3, 4d
CPU時間 :crit, a5, after a4, 1d
section 処理2
CPU時間 :crit, b1, 2000-01-02, 2d
IO時間 :active, b2, after b1, 2d
CPU時間 :crit, b3, after a3, 1d
IO時間 :active, b4, after b3, 4d
section 処理3
CPU時間 :crit, c1, 2000-01-04, 2d
IO時間 :active, c2, after c1, 3d
CPU時間 :crit, c3, after b3, 2d
IO時間 :active, c4, after c3, 3d
gantt
title この例では直列に処理すると倍くらいかかる
dateFormat YYYY-MM-DD
axisFormat %d
section 処理1
CPU時間 :crit, a1, 2000-01-01, 1d
IO時間 :active, a2, after a1, 3d
CPU時間 :crit, a3, after a2, 2d
IO時間 :active, a4, after a3, 4d
CPU時間 :crit, a5, after a4, 1d
section 処理2
CPU時間 :crit, b1, after a5, 2d
IO時間 :active, b2, after b1, 2d
CPU時間 :crit, b3, after b2, 1d
IO時間 :active, b4, after b3, 4d
section 処理3
CPU時間 :crit, c1, after b4, 2d
IO時間 :active, c2, after c1, 3d
CPU時間 :crit, c3, after c2, 2d
IO時間 :active, c4, after c3, 3d
どちらの図もCPU時間は重複しませんが、上の図はIO時間とCPU時間が重なりその分が短縮されるわけです。
このため、asyncio は (基本的に) ネットワーク通信等の IO時間が長くてブロックされるような処理では 効率化により 高速化が望めますが、 CPU時間が長い処理では実行完了を待ち続けることとなり、 効率化されないため 高速化は望めません。
主な登場人物
さて、ここで asyncio を使う上で重要な機能を見ていくことにしましょう。
Future
Futureは処理の結果を格納するためのオブジェクトです。
JavaScript を触ったことがある方なら Promise
といえばわかるかもしれませんね。
これは Future_パターン という並行処理におけるデザインパターンの asyncio 実装です。
結果オブジェクトだけを先に生成し、 処理が完了した段階で結果を格納することで処理が完了タイミングを気にせず実装ができます。
Future には 「結果」と「ステータス」という属性があり、コールバック関数などを使って明示的に格納してあげます。
ステータスは 「pending」「finished」「cancelled」の3種類があり、 初期状態は「pending」で、終了状態は「finished」か「cancelled」のいずれかです。
pending から終了状態にどのように遷移するか見てみましょう。
- finishedにする
- cancelledにする
- ステータスを finished にするには、 future.set_result というメソッドを使います。 このメソッドにより結果を格納されると自動的に futureオブジェクトのステータスは finished (終了) に移行します。 値を格納せずにステータスを finished に変えることはできないようです。
- ステータスを cancel に変更するには future.cancel というメソッドを使います。 これにより futureオブジェクトのステータスは cancelled (キャンセル) に移行します。 値を格納することはできないようです。
-
>>> future = loop.create_future() >>> future.set_result(100) >>> future <Future finished result=100> >>> future.done() True >>> future.cancel() # 一度終了したらキャンセルは出来ない False >>> future.cancelled() # キャンセルにならない False
-
>>> future = loop.create_future() >>> future.cancel() True >>> future <Future cancelled> >>> future.done() # キャンセルも一応終了と言えるので 真となる True >>> future.cancelled() # キャンセルになる True >>> future.set_result(100) # 一度キャンセルしたら結果を格納できない Traceback (most recent call last): File "<stdin>", line 1, in <module> asyncio.base_futures.InvalidStateError: invalid state
Future オブジェクト単体では何ができるというわけではありませんが、 後述する機能の中で利用されることになるので最初に説明しました。
- warning
- 空のFuture オブジェクトは
loop
オブジェクト(後述)のcreate_future
を使って生成することが推奨されています。 - Futureクラスを直に触るのはやめましょう。
- 空のFuture オブジェクトは
- 該当コード
Coroutine
- info
- ここで説明するコルーチンとはあくまで asyncio専用のコルーチンであり、 他言語のコルーチンの定義と必ずしも一致するものではありません。
コルーチンは asyncio で実行する処理のことです。
そしてコルーチンを宣言するための文が async
文です。
小難しい単語が出てきましたが、コルーチンの実態は概ねジェネレータです。
すこしだけ歴史をさかのぼってみます。
Python3.3 (PEP380) で yield from
文という構文が登場しました。
詳しくはサブジェネレータへの委譲と言うそうです。(iterableならなんでも受け付けるんですがそれは..)
「また新しいのが出てきた..」と不安に感じたかもしれませんが、そこまで身構える必要はありません。
ドキュメントにも書いてあるとおり、 yield from iterable
は
for item in iterable: yield item
とほぼ同じ意味です。
つまりこの文がある関数はジェネレータなのです。
とはいえ形だけを覚えても、おそらく yield from
文のありがたさを感じることはできないでしょう。
重要なのは 指定されたオブジェクトの要素をすべてイテレーションする ということです。 つまりここにジェネレータが指定された場合(以降サブジェネレータという)、 サブジェネレータのイテレーションがすべて済んでから次の処理に進むことになります。
この仕組みによってコルーチンは「待つ(await)」を実現しています。(ブロッキングについては後述)
コルーチンはその性質により、「イテレーション自体の返却値(yield
)」が使われないため、
仕組みとしてはジェネレータと同じでも目的は大きく異なると言えます。
(コルーチン自体の返却値(return
)は Future
の結果として重要なので混同しないように注意)
実際 Python3.4 までは yield from
文が使われていましたが Python3.5
で登場した await
文に取って代わりました。
- info
yield from
は for文で回しているのとほぼ同じ意味、と言いましたが それは内部的な話で実は若干の違いがあります。-
yield from
はコルーチンを受け取って処理できますが、 実はコルーチンはイテラブルではないので for文で回すことができません。- error
- TypeError: 'coroutine' object is not iterable
-
yield from
にサブジェネレータが指定された場合は、その返却値(return
)が左辺に格納されます。
-
await 文に指定できるオブジェクトを Awaitable といいます。 Awaitable なオブジェクトはデフォルトで Coroutine と Future、Task(後述)の3つです。
もう少し詳しい話をするとスペシャルメソッドの __await__
が定義されているものとなりますが、
自分でクラスを定義したりサードパーティライブラリを導入しなければ気にする必要はないです。
なお、未完了なFutureは loopのコールバックや別のコルーチンの中で(Futureが)操作され ステータスが終了することを想定しています。
await の挙動について
await文が未完了なFutureを受け取ると、そのコルーチンをブロックし、別の実行可能なコルーチンを実行します。 途中でFutureのステータスが完了に移行すると、ブロックが解除されコルーチンが再開します。
最初から完了したFutureをawait文に指定された場合は普通にスルーされるだけです。
冒頭で IO待ち時間にぶつかると実行可能な別の処理を割り当てる
といいましたが、 「IO待ち時間にぶつかるまで」というのが、
「await文が未完了なFutureに遭遇するまで」ということになります。
await文がコルーチンを受け取ると、 上記のようにIO待ち時間にぶつかるまでコルーチンを呼び出しながら処理を繋いでいきます。
flowchart TD
direction LR
subgraph coro1
a[処理1]-->b[result = await coro2];
j[return result];
end
subgraph coro2
c[await future<br />DONE]-->d[result = await coro3];
b-->c;
i[return result]-->j;
end
subgraph coro3
d-->e;
e[result = await future<br />PENDING]-->f[return result];
f-->i;
end
subgraph others
e-->g[ブロックしている間<br />他の実行可能なコルーチンに<br />処理が移る];
end
少しつながってきましたね。
- warning
- asyncio の関数ではコルーチンを引数に受け取るものがありますが、 その場合に指定するのは「コルーチンオブジェクト」であって、「コルーチン関数」ではないので注意してください。
- コルーチン関数を実行した返却値がコルーチンオブジェクトです。
async と await文を使って、1秒待った後に何らかの文字を出力するコルーチン関数は以下のように書けます。
async def sleep_and_print(txt):
await asyncio.sleep(1)
print(txt)
この状態ではまだコルーチンは実行できないのでイベントループの章まで読み進めてください。
- info
async
/await
文は Python3.5から新たに登場した予約語です。- Python3.4のときは次のようにしてジェネレータとしてうまく処理していたのです。
-
- async
@asyncio.coroutine
デコレータでジェネレータ関数を囲む。(Python3.10でなくなるので使わないこと)-
@asyncio.coroutine def hello_world(): print("Hello World!")
-
- await
yield from
文を使う。-
@asyncio.coroutine def sleep(): yield from asyncio.sleep(1)
- 後方互換性維持のため、まだ
yield from
を使うこともできますが、 コルーチンの本来の用途から考えれば、await
文を使うのが適切です。 - なお、コルーチンで
yield from
が使えるのはasyncio.coroutine
デコレータによって定義した場合のみで、 async 文を使って定義したコルーチンの中でyield from
を使うと SyntaxError になります。 SyntaxError: 'yield from' inside async function
- つまりこれもいずれは使えなくなるので新しいコードでは使わないことです。
- 該当コード
イベントループ
JavaScript では async
文を使って作成した非同期関数はそのまま実行することができましたが、
Pythonのコルーチンはそのまま関数として実行しても処理されません。
コルーチンオブジェクトが生成されて終わりです。
これはコルーチンの実態がジェネレータだからというのが理由です。 ジェネレータはイテレーションを進めたときに初めて 最初のyield文まで 実行されますよね。
お察しの通り、このコルーチンを実行してくれる(イテレーションを進めてくれる)のがイベントループということになります。 IOの待ち時間には他の処理を割り当てるなどの裏方もやってくれる偉い子です。
- info
- 裏技というほどでもありませんが、
coro.send(None)
とすることで、コルーチン単体でイテレーションを進めることができます。 - 決して推奨しているわけではないので覚える必要はありません。
デバッグ用なら
asyncio.run(coro)
するのがいいと思います。 - なお、
None
以外を send するとエラーになります。
- 裏技というほどでもありませんが、
- error
- TypeError: can't send non-None value to a just-started coroutine
イベントループオブジェクトの取得方法
loop の取得方法には 現状3つの方法があります
- get_event_loop
- get_event_loop で生成したイベントループは自動的には カレントイベントループ (以降カレントループという) としてマークされ、二回目以降は カレントループが取得されます。
-
>>> import asyncio >>> loop = asyncio.get_event_loop() >>> loop2 = asyncio.get_event_loop() >>> loop2 is loop
- この記事のサンプルコードでは loop オブジェクトの定義を省略している箇所がありますが、 上記の方法で作成しているとお考えください。
- とりあえずイベントループの作成はこれだけ知ってれば大丈夫です。 概要だけを知りたいという方は以下は読み飛ばしていいです。
- どうやら
asyncio.run
を実行してからget_event_loop
を呼ぶとランタイムエラーが発生することがあるようです。- error
- RuntimeError: There is no current event loop in thread 'MainThread'.
- これが起きたときの対処法としては 作成済みの イベントループをカレントループとして登録してあげればOKです。
- カレントループとして登録するには
asyncio.set_event_loop(loop)
を使います。
- get_running_loop
- Python3.7で追加されました。現在動いているイベントループがある場合はそれを返却し、
なければ
RuntimeError
が発生します。 - おそらくコルーチンの中で所属しているイベントループを取るためのものでしょう。 これによりバケツリレーで loop オブジェクトを引き継ぐ必要がなくなります。
-
>>> async def check_loop(): ... await asyncio.sleep(1) ... return loop is asyncio.get_running_loop() ... >>> loop.run_until_complete(check_loop()) True >>> asyncio.get_running_loop() Traceback (most recent call last): File "<stdin>", line 1, in <module> RuntimeError: no running event loop
- Python3.7で追加されました。現在動いているイベントループがある場合はそれを返却し、
なければ
- new_event_loop
- 既存のカレントループを無視して、新規のイベントループを作成します。
get_event_loop
で作ったイベントループとは違って自動的にカレントループとはなりません。-
>>> import asyncio >>> loop = asyncio.get_event_loop() # new_event_loop は必ず新規のイベントループを生成し、既存のイベントループとは重複しない >>> loop2 = asyncio.new_event_loop() >>> loop is loop2 False >>> loop3 = asyncio.new_event_loop() >>> loop3 is loop2 False >>> loop3 is loop False # get_event_loop で取得したイベントループは同じく get_event_loop で取得したイベントループとのみ一致する >>> loop4 = asyncio.get_event_loop() >>> loop4 is loop3 False >>> loop4 is loop2 False >>> loop4 is loop True # set_event_loop をつかうと 特定のイベントループをカレントループに設定できる >>> asyncio.set_event_loop(loop3) >>> loop5 = asyncio.get_event_loop() >>> loop5 is loop4 False >>> loop5 is loop3 True
set_event_loop
という関数を使うことで 別のイベントループをカレントループとしてマークできます。
イベントループの動かし方
イベントループに仕事をさせるための方法はいくつかあります。
- asyncio.run
- 引数に指定した コルーチンが完了するまで実行します。 Python3.7で追加されました。
- 内部的には新規の loop オブジェクトを作り
loop.run_until_complete
(後述)を呼び出すだけのショートカットのようなものです。 - 注意点は以下の2点です
-
新規のイベントループが生成される
-
引数にはコルーチンしか受けとらない
>>> async def coro(seconds, value): ... await asyncio.sleep(seconds) ... return value + 100 ... >>> asyncio.run(coro(1, 100)) # コルーチンの値がそのまま返却される 200 >>> future = asyncio.gather(coro(1, 1), coro(2, 2)) # gather でまとめてもコルーチンでなければだめ(gatherはFutureを返す) >>> asyncio.run(future) Traceback (most recent call last): File "<stdin>", line 1, in <module> File "/usr/local/Cellar/python/3.7.2_2/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/runners.py", line 37, in run raise ValueError("a coroutine was expected, got {!r}".format(main)) ValueError: a coroutine was expected, got <_GatheringFuture pending>
-
- loop.run_forever
- loop.run_foreverは永久に実行し続けます。実行する処理がない場合はずっとブロックします。サーバー用途かな。
- コルーチンでループをストップしない限り止まりません。
このメソッドは引数を受け取らないので実行対象の処理はコールバック関数を
loop に設定するか、
create_task
メソッド (Taskで後述)で 登録してあげる必要があります。 -
>>> async def coro(seconds, value): ... await asyncio.sleep(seconds) ... return value + 100 ... >>> loop = asyncio.get_event_loop() >>> loop.call_later(10, lambda l: l.stop(), loop) # 10秒経ったら loop をストップする <TimerHandle when=10.638105535 <lambda>(<_UnixSelecto...e debug=False>) at <stdin>:1> >>> task1 = loop.create_task(coro(1, 1)) >>> task2 = loop.create_task(coro(3, 3)) >>> loop.run_forever() # ここで10秒ブロックされる >>> task1 <Task finished coro=<coro() done, defined at <stdin>:1> result=101> >>> task2 <Task finished coro=<coro() done, defined at <stdin>:1> result=103>
- loop.run_until_complete
- loop.run_until_complete は指定した awaitable なオブジェクトが終了するまで実行します。
-
>>> async def coro(seconds, value): ... await asyncio.sleep(seconds) ... return value + 100 ... >>> loop.run_until_complete(coro(1, 10)) 110 >>> loop.run_until_complete(asyncio.gather(coro(1, 10), coro(2, 20), coro(3, 30))) [110, 120, 130] >>> future = loop.create_future() >>> loop.run_until_complete(future) # future は 初期状態がpending なのでここで止まるが本来は callback やコルーチンの中でステータスを変える
- 以下はそれぞれの終了条件と返却値です
-
- コルーチン
- 最後まで実行すると終了する。
- コルーチン関数の返却値がそのまま返却値となる
-
- Future
- ステータスが完了になると終了する(Taskも同じ)
- 結果(result)が返却値となる
call_*
メソッドのハンドリング
ここのセクションは若干内部的な話になるので、わからなければスルーでOKです。
loop は call_later
, call_at
, call_soon
と呼ばれるメソッドを持ち、
その名の通り、前2つは時間を条件としてコールバック関数を遅延呼び出しし、
call_soon は即座にコールバック関数を呼び出します。
- info
- call_at は実行予定時間を「Pythonインタプリタが起動してからの時間(秒)」で指定します。
- これは
time.monotonic()
を使って求められます。 - call_later は現在時間に待ち秒数を加えて call_at
を呼び出しているだけです。 ソースコードを確認しましたが
call_later
以外にcall_at
を呼び出している機能はなさそうです。 (起動時間はインターフェースとして使いづらいよね..)
与えられたコールバック関数は Handle
または TimerHandle
と呼ばれるラッパークラスで包まれ、条件を満たしたときに発火します。
- call_later, call_at は TimerHandle
- call_soon は Handle
TimerHandle は 実行予定時間を元に大小比較ができるようになっています。
heapq.heapfy
という関数を使って最小の TimerHandle
を「直近に実行予定のコールバック」として取得し実行しています。
コルーチン内で呼び出された await asyncio.sleep(seconds)
がどのような処理をたどるか見てみましょう。
- future を作る
- 所属するloop オブジェクトの
call_later
を呼び出すcall_later
が呼び出したcall_at
により TimerHandle オブジェクトが作られ、実行予定の TimerHandle として loopオブジェクトの内部に保存する (loop._scheduled
)- 実行予定の TimerHandle のうち直近(最小)のものを取得し、実行予定時刻と現在時刻の差分をタイムアウト秒数として
selectors.select の引数に指定して呼び出して処理をブロックする
- この時点で実行可能な TimerHandle がある場合は待たされずにそちらが実行される
- 実行予定の TimerHandle のうち、実行予定時刻が現在時刻よりも前のものを
実行可能な TimerHandle として loopオブジェクトの内部に保存する (
loop._ready
)- このとき 実行予定リスト (
loop._scheduled
) からは削除される
- このとき 実行予定リスト (
- 実行可能な TimerHandle を順に抽出し、紐付けられたコールバック関数を実行
- コールバック関数により future のステータスが終了となる
- future を await する
- 該当コード
IO多重化
ノンブロッキングモードのファイルディスクリプタ(以降FDという)を複数用意して監視することをIO多重化と言います。
Pythonの処理は基本的にブロッキングです。 ネットワークIOの時間を他の処理に割り当てたいと言っても、ブロッキングIO(ソケット)を用いている処理では通信時間は無駄に待たされてしまいます。
これに対し、ノンブロッキングIO というものを使うとソケットを始めとするFDの読み書きは その可否にかかわらずすぐに処理が次に移ります。
とはいえ、これではFDが使用可能かどうかわからないので監視が必要になります。
- info
- 監視という処理を愚直に考えるとwhile文でぐるぐる回してソケットの状態により分岐するような実装が思い浮かぶかもしれません。
- しかしこれではループが空回りすることでCPU使用率がかなり高くなってしまいます。
これは
ビジーループ
と呼ばれる手法で ノンブロッキングソケット - ソケットプログラミング HOWTO Python 3.7.3 ドキュメント では愚かな方法として紹介されています。
Python以外の言語であってもネットワークプログラミングをしたことがあれば FDの監視と聞けば、「あー、 selectでしょ」となるでしょう。概ね正解です。
Python3.4以降では、 selectors
と呼ばれるビルトインモジュールがあります。
これは select のラッパーのようなもので、OSから提供されている監視機能のうち最も効率的なものが自動的に選択されるといったすぐれものです。
イベントループ は、この selectors を用いて、FDの監視を行います。 監視自体はブロッキング処理なので、読み書き可能なFDがなければ処理はブロックされます。 ビジーループと違いCPUの消費も抑えられます。
ネットワーク通信を行う非同期処理では、いくつものソケットが selectors メソッドによって監視されます。
- info
- ネットワーク通信を行うときだけでなくイベントループ自身も一組のソケットを持っており、
ソケットのブロックにより
asyncio.sleep
機能を表現しています。 - イベントループはcall_laterで登録された実行予定なハンドルのうち実行予定時刻が最も古い(小さい)ものを取り出し
「現在時刻との差分」を「待ち時間」として、監視処理(
selectors.select
)のタイムアウトに指定するため、その時間分だけブロックされます。 これがasyncio.sleep
の正体です。 - ただし、すでに実行可能なハンドルがある場合は待ち時間を0とします。
- (最初は asyncio.sleep で作られた Future 1つにつき1つのソケットがあって selectors で監視してるのかと思ってましたが違いました..)
- ネットワーク通信を行うときだけでなくイベントループ自身も一組のソケットを持っており、
ソケットのブロックにより
- 該当コード
- base_events.BaseEventLoop
- proactor_events.BaseProactorEventLoop
- selector_events.BaseSelectorEventLoop
- unix_events._UnixSelectorEventLoop
selector_events.BaseSelectorEventLoop
を継承- unix_events.SelectorEventLoop と同じ
- windows_events._WindowsSelectorEventLoop
selector_events.BaseSelectorEventLoop
を継承- windows_events.SelectorEventLoop と同じ
- windows_events.ProactorEventLoop
proactor_events.BaseProactorEventLoop
を継承
Task
Taskは一つのコルーチンに紐づく結果オブジェクトです。 Futureを継承して定義されているので役割はかなり似ています。(当然 Awaitableです)
Future が 結果や終了ステータスを自分で格納する必要があったのに対し、 Task はコルーチンに紐付き、コルーチンの結果と終了ステータスが自動的に格納されます。 逆に手動で結果を入れることが出来ません。
これでいちいちコールバック関数を書く必要がなくなりますね。
Task は loop.create_task(コルーチン)
のように作ります。
このメソッドはTaskオブジェクトを返却するので、必要に応じて別のコルーチンに渡すなどします。
>>> async def coro(seconds, value):
... await asyncio.sleep(seconds)
... return value + 100
>>> task1 = loop.create_task(coro(1, 1))
>>> task2 = loop.create_task(coro(2, 100))
>>> task3 = loop.create_task(coro(3, 301))
>>> loop.run_until_complete(asyncio.gather(task1, task2)) # あえてtask3は待たない
[101, 200]
>>> task1
<Task finished coro=<coro() done, defined at <stdin>:1> result=101>
>>> task2
<Task finished coro=<coro() done, defined at <stdin>:1> result=200>
>>> task3
<Task pending coro=<coro() running at <stdin>:2> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x10eca9eb8>()]>>
次は適当な例外を起こしてみます。
>>> async def coro(seconds, value):
... await asyncio.sleep(seconds)
... if not value:
... raise ValueError(value)
... return value + 100
...
>>> task1 = loop.create_task(coro(1, 1))
>>> task2 = loop.create_task(coro(2, 0))
>>> loop.run_until_complete(asyncio.gather(task1, task2))
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 584, in run_until_complete
return future.result()
File "<stdin>", line 4, in coro
ValueError: 0
>>> task1
<Task finished coro=<coro() done, defined at <stdin>:1> result=101>
>>> task2 # 例外が発生した場合も status は一応終了となる
<Task finished coro=<coro() done, defined at <stdin>:1> exception=ValueError(0)>
終了ステータスが自動的に格納されるとは言いましたが、 ステータスをキャンセルにしたい場合は自分で格納する必要があります。
今回は一定秒数経過後にキャンセルするだけのコルーチン cancel を作ってみました。
>>> async def cancel(seconds, task=None):
... await asyncio.sleep(seconds)
... if task:
... task.cancel()
...
>>> task2 = loop.create_task(cancel(2)) # task2 は 2秒後に終わるようにする
>>> task1 = loop.create_task(cancel(1, task2)) # task1 は 1秒後に終わって task2をキャンセルする
>>> loop.run_until_complete(asyncio.gather(task1, task2))
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 584, in run_until_complete
return future.result()
concurrent.futures._base.CancelledError
>>> task1
<Task finished coro=<cancel() done, defined at <stdin>:1> result=None>
>>> task2
<Task cancelled coro=<cancel() done, defined at <stdin>:1>>
タスクでは cancel すると CancelledError
例外が送出されるようです。
- info
loop.create_task(coro)
とasyncio.ensure_future(coro_or_future, *, loop=None)
の違い- Python3.7 で
loop.create_task
が出現するまで、 タスクの生成にはasyncio.ensure_future
が使われていました。 ensure_future
なのに future じゃなくて taskなの?と思いますが、 実際に作られるのはタスクなんです。罠ですね。- Python3.7以降では
loop.create_task
を使うことが推奨されています。 - 実際に違いがあるとすると、
create_task
ではコルーチンしか受け取れないという点です。 - 当然
asyncio.gather
でコルーチンの結果をまとめた Future も受け取れません。 - タスクは一つのコルーチンに紐づくものなので、これは妥当なインターフェースだと個人的には思います。
- (3.7以降では)
ensure_future
は受け取った引数がコルーチンの場合は 内部的にcreate_task
に渡すだけです。 - create_task が呼ばれると
loop.call_soon
を使ってコルーチンを実行可能なスケジュールとしてループに登録します。 - なお、 コルーチン内の
await
文で別のコルーチンを呼び出す場合は、call_soon
を介さず直接呼び出されます。await
文がyield from
文だと思えば当然の挙動かも知れませんね。 - asyncio.create_task
- asyncio.ensure_future vs. BaseEventLoop.create_task vs. simple coroutine?
- Proposal: Rename ensure_future to create_task · Issue #477 · python/asyncio
最後に頭の中の呼び出しイメージ図的なもの(全部は無理なので重要っぽいところだけ)
graph TD
asyncio.sleep --> loop.call_later
asyncio.ensure_future --> |コルーチンなら| loop.create_task
subgraph Event loop
loop.create_task --> |coro.send| loop.call_soon
loop.call_later --> loop.call_at
loop.call_at --> |TimerHandle| loop._scheduled
loop.call_soon --> |Handle| loop._ready
loop._scheduled --> |TimerHandle.when <= 現在時間| loop._ready
end
loop._ready -->|Handle,TimerHandle| handle.run
- info
- 複数の Coroutine (Future) の終了を待つための関数として
asyncio.gather
とasyncio.wait
があります。 asyncio.gather
は awaitable なオブジェクトを可変長仮引数として複数指定すると、 それらをまとめ上げた Future オブジェクトを返却します。- 中のオブジェクトが完了すると Future は可変長仮引数に指定したのと同じ順番で Future オブジェクトに結果が格納されます。
-
>>> future = asyncio.gather(asyncio.sleep(3, 1), asyncio.sleep(2, 2), asyncio.sleep(1, 3)) >>> loop.run_until_complete(future) [1, 2, 3] >>> future <_GatheringFuture finished result=[1, 2, 3]>
- https://docs.python.org/ja/3/library/asyncio-task.html
asyncio.wait
も同様に awaitable なオブジェクトを複数受け取りますが、 可変長引数ではなく list や set などのシーケンスを指定します。- 返却値は
gather
とは違い Coroutine を返却します。 コルーチン自体の返却値は(done, pending)
のような set の tuple です。 - また、タイムアウト時間やコルーチンの終了条件は引数として指定できます。
-
>>> coro = asyncio.wait([asyncio.sleep(3, 1), asyncio.sleep(2, 2), asyncio.sleep(1, 3)], timeout=2.1) >>> done, pending = loop.run_until_complete(coro) >>> done {<Task finished coro=<sleep() done, defined at /usr/lib64/python3.7/asyncio/tasks.py:555> result=3>, <Task finished coro=<sleep() done, defined at /usr/lib64/python3.7/asyncio/tasks.py:555> result=2>} >>> pending {<Task pending coro=<sleep() running at /usr/lib64/python3.7/asyncio/tasks.py:568> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x10d23a198>()]>>}
- 今回の例では終了条件を指定していませんが、
return_when
引数には以下の文字列を指定することができます。 -
- FIRST_COMPLETED
- いずれかのフューチャが終了したかキャンセルされたときに返します。
-
- FIRST_EXCEPTION
- いずれかのフューチャが例外の送出で終了した場合に返します。例外を送出したフューチャがない場合は、ALL_COMPLETED と等価になります。
-
- ALL_COMPLETED
- すべてのフューチャが終了したかキャンセルされたときに返します。
- https://docs.python.org/ja/3/library/asyncio-task.html
- 複数の Coroutine (Future) の終了を待つための関数として
応用
ブロッキング関数を使いたい
ここまでの処理はすべてノンブロッキングな関数を使ってきましたが、 Pythonの関数は基本的にブロッキングですし、サードパーティライブラリでは asyncio に対応してないものも多いでしょう。
とはいえ、関数のコードを書き変えてコルーチンに変えるなどできるはずもありません。
そこで loop.run_in_executor
メソッドを使います。
これは multi threading / processing によって非同期実行を実現しています。
run_
から始まっているので loop.run_until_complete
などと混同してしまいそうですが、 これは Future を作るためのメソッドなので別物です。
以下の3例はいずれも 3秒後に同じ結果を出力します。
>>> import time
>>> import concurrent.futures
>>> def not_coro(seconds, value):
... time.sleep(seconds)
... return value + 100
...
# pool を指定しない
>>> loop.run_until_complete(asyncio.gather(
... loop.run_in_executor(None, not_coro, 1, 1),
... loop.run_in_executor(None, not_coro, 2, 20),
... loop.run_in_executor(None, not_coro, 3, 300),
... ))
[101, 120, 400]
# スレッドプール
>>> with concurrent.futures.ThreadPoolExecutor() as pool:
... loop.run_until_complete(asyncio.gather(
... loop.run_in_executor(pool, not_coro, 1, 1),
... loop.run_in_executor(pool, not_coro, 2, 20),
... loop.run_in_executor(pool, not_coro, 3, 300),
... ))
...
[101, 120, 400]
# プロセスプール
>>> with concurrent.futures.ProcessPoolExecutor() as pool:
... loop.run_until_complete(asyncio.gather(
... loop.run_in_executor(pool, not_coro, 1, 1),
... loop.run_in_executor(pool, not_coro, 2, 20),
... loop.run_in_executor(pool, not_coro, 3, 300),
... ))
...
[101, 120, 400]
返却されたオブジェクトは future オブジェクトなので他の ノンブロッキング関数と同じように await することができます。
第一引数には スレッドプールを指定しますが、 None
を指定することで省略できます。
省略した場合はそのたびに必要な数のスレッドが作られるようです。
最後の例のように、プロセスプールを指定することもできます。 これにより、CPUバウンドの処理を効率よく処理することができます。 (もちろんコア数が2つ以上の場合に限りますが)
イベントループのポリシー
イベントループにはポリシーというものが設定できます。
デフォルトでは以下のいずれかを使うかを選択できます。
- DefaultEventLoopPolicy
- WindowsProactorEventLoopPolicy
- 現状は Windows でしか使えない
- https://github.com/python/cpython/blob/3.7/Lib/asyncio/windows_events.py
詳しく調べてないけど IO多重化で監視するか IO完了ポートを監視するかってことのようですが、 現状後者は Windows でしか利用できません。
これらのクラスを継承してメソッドをオーバーライドして、 asyncio.set_event_loop_policy
関数でポリシー登録すれば loopオブジェクトの取得条件を変えたりできます。
class MyEventLoopPolicy(asyncio.DefaultEventLoopPolicy):
def get_event_loop(self):
"""Get the event loop.
This may be None or an instance of EventLoop.
"""
loop = super().get_event_loop()
# Do something with loop ...
return loop
asyncio.set_event_loop_policy(MyEventLoopPolicy())
https://docs.python.org/3/library/asyncio-policy.html
今更ですが、 この記事では DefaultEventLoopPolicy
を使う前提で書いてます。
デバッグモード
asyncio のコード内では 適切にログを吐いているので、 デバッグモードにしてロギング設定を調整してあげれば、コンソールにデバッグ情報が表示され、問題解決に役立ちます。
-
環境変数
PYTHONASYNCIODEBUG=1
を設定した上で Python インタプリタを起動(実行)する -
ログレベルを
DEBUG
に設定するlogging.basicConfig(level=logging.DEBUG)
-
asyncio.run
を使う場合debug
引数にTrue
を指定する>>> asyncio.run(coro_func(), debug=True) DEBUG:asyncio:Using selector: KqueueSelector INFO:asyncio:poll 999.905 ms took 1004.213 ms: timeout DEBUG:asyncio:Close <_UnixSelectorEventLoop running=False closed=False debug=True>
-
loop
オブジェクトを使う場合、loop.set_debug(True)
を実行する>>> loop = asyncio.get_event_loop() DEBUG:asyncio:Using selector: KqueueSelector >>> loop.set_debug(True) >>> loop.run_until_complete(coro_func()) INFO:asyncio:poll 999.749 ms took 1001.260 ms: timeout
ネットワーク通信
asyncio はネットワーク通信を扱うための機能もいくつかもちます。
あまり詳しくはやりませんが、少しだけサンプルを見ておきましょう。(もう気力がないので雑)
Transports and Protocols
loop.create_connection()
のような
イベントループの低レベルAPIから利用されるネットワーク通信を実装するためクラス郡です。
これらは ライブラリやフレームワークから呼ばれるようなものであり、 アプリケーションから呼ばれることは期待されていません。
ここではドキュメントに従って、 UDPの エコーサーバーのサンプルを試してみましょう。
- Server
- Client
-
>>> import asyncio >>> >>> >>> class EchoServerProtocol: ... def connection_made(self, transport): ... self.transport = transport ... ... def datagram_received(self, data, addr): ... message = data.decode() ... print('Received %r from %s' % (message, addr)) ... print('Send %r to %s' % (message, addr)) ... self.transport.sendto(data, addr) ... >>> >>> async def main(): ... print("Starting UDP server") ... ... # Get a reference to the event loop as we plan to use ... # low-level APIs. ... loop = asyncio.get_running_loop() ... ... # One protocol instance will be created to serve all ... # client requests. ... transport, protocol = await loop.create_datagram_endpoint( ... lambda: EchoServerProtocol(), ... local_addr=('127.0.0.1', 9999)) ... ... try: ... await asyncio.sleep(3600) # Serve for 1 hour. ... finally: ... transport.close() ...
-
>>> import asyncio >>> >>> >>> class EchoClientProtocol: ... def __init__(self, message, loop): ... self.message = message ... self.loop = loop ... self.transport = None ... self.on_con_lost = loop.create_future() ... ... def connection_made(self, transport): ... self.transport = transport ... print('Send:', self.message) ... self.transport.sendto(self.message.encode()) ... ... def datagram_received(self, data, addr): ... print("Received:", data.decode()) ... print("Close the socket") ... self.transport.close() ... ... def error_received(self, exc): ... print('Error received:', exc) ... ... def connection_lost(self, exc): ... print("Connection closed") ... self.on_con_lost.set_result(True) ... >>> >>> async def main(): ... # Get a reference to the event loop as we plan to use ... # low-level APIs. ... loop = asyncio.get_running_loop() ... ... message = "Hello World!" ... transport, protocol = await loop.create_datagram_endpoint( ... lambda: EchoClientProtocol(message, loop), ... remote_addr=('127.0.0.1', 9999)) ... ... try: ... await protocol.on_con_lost ... finally: ... transport.close() ...
-
>>> asyncio.run(main()) Starting UDP server Received 'Hello World!' from ('127.0.0.1', 58632) Send 'Hello World!' to ('127.0.0.1', 58632)
-
>>> asyncio.run(main()) Send: Hello World! Received: Hello World! Close the socket Connection closed
Stream
上述した Transport と Protocol を使った上位実装がこちらの Stream です。
ここでもドキュメントに従い 単純な TCPの EchoServer と EchoClient を作ってみることにしましょう。
(少しいじりました)
- Server
- Client
-
>>> import asyncio >>> import time >>> async def handle_echo(reader, writer): ... data = await reader.read(100) ... message = data.decode() ... addr = writer.get_extra_info('peername') ... await asyncio.sleep(5) # 5秒待つ ... writer.write(data) ... print(f"Sent {message!r} back to {addr!r}") ... await writer.drain() ... writer.close() ... >>> async def main(): ... server = await asyncio.start_server(handle_echo, '127.0.0.1', 7777) ... addr = server.sockets[0].getsockname() ... async with server: ... await server.serve_forever()
-
>>> import asyncio >>> async def send(message): ... reader, writer = await asyncio.open_connection('127.0.0.1', 7777) ... writer.write(message.encode()) ... print(f'Sending {message}') ... data = await reader.read(100) ... print(f'Received: {data.decode()!r}') ... writer.close() ... await writer.wait_closed() ...
-
>>> asyncio.run(main()) # . # . # . # . # . # . # . # . # 5秒後 Sent 'h' back to ('127.0.0.1', 58298) Sent 'e' back to ('127.0.0.1', 58299) Sent 'l' back to ('127.0.0.1', 58300) Sent 'l' back to ('127.0.0.1', 58301) Sent 'o' back to ('127.0.0.1', 58302)
-
>>> loop = asyncio.get_event_loop() >>> loop.run_until_complete(asyncio.gather( ... send('h'), send('e'), send('l'), send('l'), send('o'))) Sending h Sending e Sending l Sending l Sending o # 5秒後 Received: 'h' Received: 'e' Received: 'l' Received: 'l' Received: 'o'
重要なのは サーバー側の ハンドル用コルーチン関数では
reader
, writer
引数を受け取り、クライアント側ではコルーチン内で自分で生成します。
サーバ、クライアントはこれらオブジェクトを通じて、 お互いのタイミングでソケットの読み書きをしてあげます。
終わりに
バージョンを重ねるに連れ機能がだんだんと充実してきて、Pythonで非同期処理を書く土壌が整ってきましたね。
以前は Twisted などのサードパーティライブラリを用いるのが通例でしたが、 これからは asyncio をベースとしたライブラリがたくさん生まれてくるでしょう。
冒頭でも言いましたが、たくさんの記事にお世話になりました。 記事をレビューしてくれた匿名の方と @shimizukawa にも感謝。
ありがとうございました。
参考
- Pythonの非同期通信(asyncioモジュール)入門を書きました - ゆくゆくは有へと
- Pythonで同期関数と非同期関数を統一的に扱う | Hatch tech blog
- pythonのasyncioでrun_in_executor()を使ってもブロックしてるように見えて上手く処理を逃がせないと感じたとき。 - podhmo's diary
- 次世代標準非同期I/Oフレームワーク asyncio (Tulip) - methaneのブログ
- Pythonにおける非同期処理: asyncio逆引きリファレンス - Qiita
- Pythonの非同期I/O(asyncio)を試す
- Pythonで非同期処理(asyncio)- イベントオブジェクト | TomoSoft
- Pythonのasyncioで非同期にリクエストを飛ばす - sambaiz-net
- Goが他の多くの言語での非同期プログラミングよりも優れている理由
- Python asyncio: Future, Task and the Event Loop | Abu Ashraf Masnun
- Detailing Python's Coroutine out of History - Speaker Deck