[Python] 🐰 なんとなく理解するasyncio 🐢

2019-06-03

大変おまたせしました。 数年前からずっと書く書くと詐欺してきた asycnio の記事です。

日本語のasyncio関連のブログエントリは大体読んだ気がします。(英語の記事も少し)

リファレンスが多いので、 ページ末 に参考URLをリンクしてあります。詳しくはそちらも参照ください。

備考

asyncio はいろいろな概念が同居しており、学習コストが高めです。

最低限ジェネレータの概念は必要ですので、わからないという方は先にこちらからどうぞ。

[Python] 部屋とYシャツとイテレータとジェネレータと私 (何だこのクソみたいなタイトル)

なお、この記事では Python 3.7 を使います。 3.7以上でしか使えないメソッドもあるので、写経するときはバージョンには気をつけてください。

目次

概要 (Overview)

asyncio は Python3.4 から導入された 非同期処理を扱うための標準ライブラリです。

勘違いしてはいけないのは あくまでシングルスレッド下で動作するもので、 マルチスレッドやマルチプロセスのような並行処理ではないということです。

GIL の制限により完全に並列できないとはいえ、マルチスレッドでは別スレッドに干渉することなく処理はほぼ同時に並行してつき進んでいきます。 コールバック関数やグローバル変数などを使って同期することはできますが、スレッド達はお互いの存在なんて知ったこっちゃありません。

これに対し asyncio は 基本的にシングルスレッドで動作し、試行した複数の処理は 順次に処理され、 ネットワーク等のIO待ち時間にぶつかると実行可能な別の処理を実行する、といった具合に無駄なIO待ち時間を有効活用すべく、別の処理を割り当てます。

gantt title asyncioの割当イメージ(単位に特別な意味はない dateFormat YYYY-MM-DD section 処理1 CPU時間 :crit, a1, 2000-01-01, 1d IO時間 :active, a2, after a1, 3d CPU時間 :crit, a3, after c1, 2d IO時間 :active, a4, after a3, 4d CPU時間 :crit, a5, after a4, 1d section 処理2 CPU時間 :crit, b1, 2000-01-02, 2d IO時間 :active, b2, after b1, 2d CPU時間 :crit, b3, after a3, 1d IO時間 :active, b4, after b3, 4d section 処理3 CPU時間 :crit, c1, 2000-01-04, 2d IO時間 :active, c2, after c1, 3d CPU時間 :crit, c3, after b3, 2d IO時間 :active, c4, after c3, 3d
gantt title 直列に処理すると.. dateFormat YYYY-MM-DD section 処理1 CPU時間 :crit, a1, 2000-01-01, 1d IO時間 :active, a2, after a1, 3d CPU時間 :crit, a3, after a2, 2d IO時間 :active, a4, after a3, 4d CPU時間 :crit, a5, after a4, 1d section 処理2 CPU時間 :crit, b1, after a5, 2d IO時間 :active, b2, after b1, 2d CPU時間 :crit, b3, after b2, 1d IO時間 :active, b4, after b3, 4d section 処理3 CPU時間 :crit, c1, after b4, 2d IO時間 :active, c2, after c1, 3d CPU時間 :crit, c3, after c2, 2d IO時間 :active, c4, after c3, 3d

このため、asyncio は (基本的に) ネットワーク通信等の IO時間が長くてブロックされるような処理では 効率化により 高速化が望めますが、 CPU時間が長い処理では 実行完了を待ち続けることとなり、 効率化されないため 高速化は望めません。

主な登場人物 (Main characters)

さて、ここで asyncio を使う上で重要な機能を見ていくことにしましょう。

Future

処理の結果を格納するためのオブジェクトです。

JavaScript を触ったことがある方なら Promise といえばわかるかもしれませんね。

これは Future_パターン という並行処理におけるデザインパターンの asyncio の実装です。

結果オブジェクトだけを先に生成し、 処理が完了した段階で結果を格納することで処理が完了タイミングを気にせず実装ができます。

Future には 「結果」と「ステータス」という属性があり、コールバック関数などを使って明示的に格納してあげます。

ステータスは 「pending」「finished」「cancelled」の3種類があり、 初期状態は「pending」で、終了状態は「finished」か「cancelled」のいずれかです。

警告

空のFuture オブジェクトは loop オブジェクトの create_future オブジェクトを使って生成することが推奨されています。 Futureクラスを直に触るのはやめましょう。

finishedにする cancelledにする

ステータスを finished にする場合は、 future.set_result というメソッドを使います。 このメソッドにより結果を格納されると自動的に futureオブジェクトのステータスは finished (終了) に移行します。

値を格納せずにステータスを finished に変えることはできないようです。

ステータスを cancel に変更する場合は future.cancel というメソッドを使います。 これにより futureオブジェクトのステータスは cancelled (キャンセル) に移行します。

値を格納することはできないようです。

>>> future = loop.create_future()
>>> future.set_result(100)

>>> future
<Future finished result=100>
>>> future.done()
True
>>> future.cancel()  # 一度終了したらキャンセルは出来ない
False
>>> future.cancelled()  # キャンセルにならない
False
>>> future = loop.create_future()
>>> future.cancel()
True
>>> future
<Future cancelled>
>>> future.done()  # キャンセルも一応終了と言えるので 真となる
True
>>> future.cancelled()  # キャンセルになる
True
>>> future.set_result(100)  # 一度キャンセルしたら結果を格納できない
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
asyncio.base_futures.InvalidStateError: invalid state
該当コード:

Coroutine

備考

ここで説明するコルーチンとはあくまで asyncio専用のコルーチンであり、 他言語のコルーチンの定義と必ずしも一致するものではありません。

コルーチンは asyncio で実行する処理のことです。 そして コルーチンを宣言するための文が async 文です。

小難しい単語が出てきましたが、コルーチンの実態は概ねジェネレータです。

Python3.3 (PEP380) で yield from 文という構文が登場しました。 詳しくはサブジェネレータへの委譲と言うそうです。(iterableならなんでも受け付けるんですがそれは..)

さて、「また新しいのが出てきた..」と思ったかもしれませんが、そこまで身構える必要はありません。

ドキュメントにも書いてあるとおり、 yield from iterable は for item in iterable: yield item とほぼ同じ意味です。

ここで重要なのは「指定されたオブジェクトの要素をすべてイテレーションする」というところです。 つまり ここにジェネレータが指定された場合(以降サブジェネレータという)、サブジェネレータのイテレーションがすべて済んでから 次の処理に進むことになります。

この仕組みによって コルーチンは「待つ(await)」を実現しています。(ブロッキングについては後述)

コルーチンはその性質により、「イテレーション自体の返却値(yield)」が使われないため、 仕組みとしてはジェネレータと同じでも目的は大きく異なると言えます。

(コルーチン自体の返却値(return)は Future の結果として重要なので混同しないように注意)

実際 Python3.4 までは yield from 文が使われていましたが Python3.5 で登場した await 文に取って代わりました。

警告

yield from は for文で回しているのとほぼ同じ意味、と言いましたが それは内部的な話で若干の違いがあります。

  • yield from はコルーチンを受け取って処理できますが、 実はコルーチンはイテラブルではないので for文で回すことができません。
    • TypeError: 'coroutine' object is not iterable
  • yield from にサブジェネレータが指定された場合は、その返却値(return)が左辺に格納されます。

await 文に指定できるオブジェクトを Awaitable といいます。 Awaitable なオブジェクトはデフォルトで Coroutine と Future、Task(後述)の3つです。

もう少し詳しい話をするとスペシャルメソッドの __await__ が定義されているものとなりますが、 自分でクラスを定義したりサードパーティライブラリを導入しなければ気にする必要はないです。

備考

(ひとまず Task は放っておいて) await の挙動について

await文が未完了なFutureを受け取ると、そのコルーチンをブロックし、別の実行可能なコルーチンを実行します。 途中でFutureのステータスが完了に移行すると、ブロックが解除され コルーチンが再開します。

最初から 完了したFutureをawait文に指定された場合は普通にスルーされるだけです。

冒頭で IO待ち時間にぶつかると実行可能な別の処理を割り当てる といいましたが、 「IO待ち時間にぶつかるまで」というのが、 「await文が 未完了なFutureに遭遇するまで」ということになります。

await文がコルーチンを受け取ると、 上記のようにIO待ち時間にぶつかるまでコルーチンを呼び出しながら処理を繋いでいきます。

blockdiag coro1coro2coro3others処理1result = await coro2return resultreturn resultawait future (DONE)result = await coro3result = await future (PENDING)ブロックしてる間他の実行可能なコルーチンに処理が移るreturn result

なお、未完了なFutureは loopのコールバックや別のコルーチンの中で(Futureが)操作され ステータスが終了することを想定しています。

少しつながってきましたね。

警告

asyncio の関数ではコルーチンを引数に受け取るものがありますが、 その場合に指定するのは「コルーチンオブジェクト」であって、「コルーチン関数」ではないので注意してください。

コルーチン関数を実行した返却値がコルーチンオブジェクトです。

async と await 文を使って、1秒待った後に何らかの文字を出力するコルーチン関数は以下のように書けます。

async def sleep_and_print(txt):
    await asyncio.sleep(1)
    print(txt)
Python3.4 以前

async / await 文は Python3.5から新たに登場した予約語です。

Python3.4のときは次のようにしてジェネレータとしてうまく処理していたのです。

async:

@asyncio.coroutine デコレータでジェネレータ関数を囲む。(Python3.10でなくなるので使わないこと)

@asyncio.coroutine
def hello_world():
    print("Hello World!")
await:

yield from 文を使う。

@asyncio.coroutine
def sleep():
    yield from asyncio.sleep(1)

後方互換性維持のため、まだ yield from を使うこともできますが、 コルーチンの本来の用途から考えれば、 await 文を使うのが適切です。

なお、コルーチンで yield from が使えるのは asyncio.coroutine デコレータによって定義した場合のみで、 async 文を使って定義したコルーチンの中で yield from を使うと SyntaxError になります。

SyntaxError: 'yield from' inside async function

つまりこれもいずれは使えなくなるので新しいコードでは使わないことです。

該当コード:

Event loop

JavaScript では async 文を使って作成した非同期関数はそのまま実行することができましたが、 Python のコルーチンはそのまま関数として実行しても処理されません。コルーチンオブジェクトが生成されて終わりです。

これはコルーチンの実態がジェネレータだからというのが理由です。 ジェネレータはイテレーションを進めたときに初めて 最初のyield文まで 実行されますよね。

お察しの通り、このコルーチンを実行してくれる(イテレーションを進めてくれる)のがイベントループということになります。 IOの待ち時間には他の処理を割り当てるなどの裏方もやってくれる偉い子です。

備考

裏技というほどでもありませんが、 coro.send(None) とすることで、コルーチン単体でイテレーションを進めることができます。

決して推奨しているわけではないので覚える必要はありません。 デバッグ用なら asyncio.run(coro) するのがいいと思います。

なお、 None 以外を send するとエラーになります。 TypeError: can't send non-None value to a just-started coroutine

イベントループオブジェクトの取得方法 (How to get an event loop object)

loop の取得方法には 現状3つの方法があります

asyncio.get_event_loop()

get_event_loop で生成したイベントループは自動的には カレントイベントループ (以降カレントループという) としてマークされ、二回目以降は カレントループが取得されます。

>>> import asyncio
>>> loop = asyncio.get_event_loop()
>>> loop2 = asyncio.get_event_loop()
>>> loop2 is loop

この記事のサンプルコードでは loop オブジェクトの定義を省略している箇所がありますが、 上記の方法で作成しているとお考えください。

とりあえずイベントループの作成はこれだけ知ってれば大丈夫です。

概要だけを知りたいという方は以下は読み飛ばしていいです。

警告

どうやら asyncio.run を実行してから get_event_loop を呼ぶと RuntimeError: There is no current event loop in thread 'MainThread'. が発生することがあるようです。

これが起きたときの対処法としては 作成済みの イベントループをカレントループとして登録してあげればOKです。

カレントループとして登録するには asyncio.set_event_loop(loop) を使います。

asyncio.get_running_loop()

Python3.7で追加されました。現在動いているイベントループがある場合はそれを返却し、 なければ RuntimeError が発生します。

おそらくコルーチンの中で所属しているイベントループを取るためのものでしょう。 これによりバケツリレーで loop オブジェクトを引き継ぐ必要がなくなります。

>>> async def check_loop():
...     await asyncio.sleep(1)
...     return loop is asyncio.get_running_loop()
...
>>> loop.run_until_complete(check_loop())
True

>>> asyncio.get_running_loop()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
RuntimeError: no running event loop
asyncio.new_event_loop()

既存のカレントループを無視して、新規のイベントループを作成します。

get_event_loop で作ったイベントループとは違って自動的にカレントループとはなりません。

>>> import asyncio
>>> loop = asyncio.get_event_loop()

# new_event_loop は必ず新規のイベントループを生成し、既存のイベントループとは重複しない
>>> loop2 = asyncio.new_event_loop()
>>> loop is loop2
False
>>> loop3 = asyncio.new_event_loop()
>>> loop3 is loop2
False
>>> loop3 is loop
False

# get_event_loop で取得したイベントループは同じく get_event_loop で取得したイベントループとのみ一致する
>>> loop4 = asyncio.get_event_loop()
>>> loop4 is loop3
False
>>> loop4 is loop2
False
>>> loop4 is loop
True

# set_event_loop をつかうと 特定のイベントループをカレントループに設定できる
>>> asyncio.set_event_loop(loop3)
>>> loop5 = asyncio.get_event_loop()
>>> loop5 is loop4
False
>>> loop5 is loop3
True

備考

set_event_loop という関数を使うことで 別のイベントループをカレントループとしてマークできます。

イベントループの動かし方 (How to make an event loop work)

イベントループに仕事をさせるための方法はいくつかあります。

asyncio.run

引数に指定した コルーチンが完了するまで実行します。 Python3.7で追加されました。

内部的には新規の loop オブジェクトを作り 後述する loop.run_until_complete を呼び出すだけのショートカットのようなものです。

注意点は以下の2点です

  • 新規のイベントループが生成される
  • 引数にはコルーチンしか受けとらない
>>> async def coro(seconds, value):
...     await asyncio.sleep(seconds)
...     return value + 100
...
>>> asyncio.run(coro(1, 100))  # コルーチンの値がそのまま返却される
200

>>> future = asyncio.gather(coro(1, 1), coro(2, 2))  # gather でまとめてもだめ
>>> asyncio.run(future)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/Cellar/python/3.7.2_2/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/runners.py", line 37, in run
    raise ValueError("a coroutine was expected, got {!r}".format(main))
ValueError: a coroutine was expected, got <_GatheringFuture pending>

備考

複数の Coroutine (Future) の終了を待つための関数として asyncio.gather と asyncio.wait があります。

asyncio.gather は awaitable なオブジェクトを可変長仮引数として複数指定すると、 それらをまとめ上げた Future オブジェクトを返却します。

中のオブジェクトが完了すると Future は可変長仮引数に指定したのと同じ順番で Future オブジェクトに結果が格納されます。

>>> future = asyncio.gather(asyncio.sleep(3, 1), asyncio.sleep(2, 2), asyncio.sleep(1, 3))
>>> loop.run_until_complete(future)
[1, 2, 3]
>>> future
<_GatheringFuture finished result=[1, 2, 3]>

Ref: https://docs.python.org/ja/3/library/asyncio-task.html#asyncio.gather

asyncio.wait も同様に awaitable なオブジェクトを複数受け取りますが、 可変長引数ではなく list や set などのシーケンスを指定します。

返却値は gather とは違い Coroutine を返却します。 コルーチン自体の返却値は (done, pending) のような set の tuple です。 また、タイムアウト時間やコルーチンの終了条件は引数として指定できます。

>>> coro = asyncio.wait([asyncio.sleep(3, 1), asyncio.sleep(2, 2), asyncio.sleep(1, 3)], timeout=2.1)
>>> done, pending = loop.run_until_complete(coro)
>>> done
{<Task finished coro=<sleep() done, defined at /usr/lib64/python3.7/asyncio/tasks.py:555> result=3>, <Task finished coro=<sleep() done, defined at /usr/lib64/python3.7/asyncio/tasks.py:555> result=2>}
>>> pending
{<Task pending coro=<sleep() running at /usr/lib64/python3.7/asyncio/tasks.py:568> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x10d23a198>()]>>}

今回の例では終了条件を指定していませんが、 return_when 引数には以下の文字列を指定することができます。

FIRST_COMPLETED:
 

いずれかのフューチャが終了したかキャンセルされたときに返します。

FIRST_EXCEPTION:
 

いずれかのフューチャが例外の送出で終了した場合に返します。例外を送出したフューチャがない場合は、ALL_COMPLETED と等価になります。

ALL_COMPLETED:

すべてのフューチャが終了したかキャンセルされたときに返します。

Ref: https://docs.python.org/ja/3/library/asyncio-task.html#asyncio.wait

loop.run_forever

永久に実行し続けます。実行する処理がない場合はずっとブロックします。サーバー用途かな。

コルーチンでループをストップしない限り止まりません。 このメソッドは引数を受け取らないので実行対象の処理はコールバック関数を loop に設定するか、 create_task メソッド (Taskで後述) で 登録してあげる必要があります。

>>> async def coro(seconds, value):
...     await asyncio.sleep(seconds)
...     return value + 100
...
>>> loop = asyncio.get_event_loop()
>>> loop.call_later(10, lambda l: l.stop(), loop)  # 10秒経ったら loop をストップする
<TimerHandle when=10.638105535 <lambda>(<_UnixSelecto...e debug=False>) at <stdin>:1>
>>> task1 = loop.create_task(coro(1, 1))
>>> task2 = loop.create_task(coro(3, 3))
>>> loop.run_forever()  # ここで10秒ブロックされる

>>> task1
<Task finished coro=<coro() done, defined at <stdin>:1> result=101>
>>> task2
<Task finished coro=<coro() done, defined at <stdin>:1> result=103>
loop.run_until_complete

指定した awaitable なオブジェクトが終了するまで実行します。

>>> async def coro(seconds, value):
...     await asyncio.sleep(seconds)
...     return value + 100
...
>>> loop.run_until_complete(coro(1, 10))
110
>>> loop.run_until_complete(asyncio.gather(coro(1, 10), coro(2, 20), coro(3, 30)))
[110, 120, 130]
>>> future = loop.create_future()
>>> loop.run_until_complete(future)
# future は 初期状態がpending なのでここで止まるが本来は callback やコルーチンの中でステータスを変える

以下はそれぞれの終了条件と返却値です

コルーチン:

最後まで実行すると終了する。

コルーチン関数の返却値がそのまま返却値となる

Future:

ステータスが完了になると終了する(Taskも同じ)

結果(result)が返却値となる

Handle and call_* methods

ここのセクションはわからなければスルーでOKです。

loop は call_later, call_at, call_soon と呼ばれるメソッドを持ち、 その名の通り、前2つは時間を条件としてコールバック関数を遅延呼び出しし、 call_soon は即座にコールバック関数を呼び出します。

備考

call_at は 実行予定時間を「Pythonインタプリタが起動してからの時間(秒)」で指定します。 これは time.monotonic() を使って求められます。

call_later は現在時間に待ち秒数を加えて call_at を呼び出しているだけです。 ソースコードを確認しましたが call_later 以外に call_at を呼び出している機能はなさそうです。 (起動時間はインターフェースとして使いづらいよね..)

与えられたコールバック関数は Handle または TimerHandle と呼ばれるラッパークラスで包まれ、条件を満たしたときに発火します。

  • call_later, call_at は TimerHandle
  • call_soon は Handle

TimerHandle は 実行予定時間を元に大小比較ができるようになっています。 heapq.heapfy という関数を使って最小の TimerHandle を「直近に実行予定のコールバック」として取得し実行しています。

コルーチン内で呼び出された await asyncio.sleep(seconds) がどのような処理をたどるか見てみましょう。

  • future を作る
  • 所属するloop オブジェクトの call_later を呼び出す
    • call_later が呼び出した call_at により TimerHandle オブジェクトが作られ、実行予定の TimerHandle として loopオブジェクトの内部に保存する (loop._scheduled)
    • 実行予定の TimerHandle のうち直近(最小)のものを取得し、実行予定時刻と現在時刻の差分をタイムアウト秒数として selectors.select の引数に指定して呼び出して処理をブロックする
      • この時点で実行可能な TimerHandle がある場合は待たされずにそちらが実行される
    • 実行予定の TimerHandle のうち、実行予定時刻が現在時刻よりも前のものを 実行可能な TimerHandle として loopオブジェクトの内部に保存する (loop._ready)
      • このとき 実行予定リスト (loop._scheduled) からは削除される
    • 実行可能な TimerHandle を順に抽出し、紐付けられたコールバック関数を実行
      • コールバック関数により future のステータスが終了となる
  • future を await する
該当コード:

IO multiplexing

ノンブロッキングモードのファイルディスクリプタ(以降FDという)を複数用意して監視することをIO多重化と言います。

Pythonの処理は基本的にブロッキングです。 ネットワークIOの時間を他の処理に割り当てたいと言っても、ブロッキングIO(ソケット)を用いている処理では通信時間は無駄に待たされてしまいます。

これに対し、ノンブロッキングIO というものを使うとソケットを始めとするFDの読み書きは その可否にかかわらずすぐに処理が次に移ります。

とはいえ、これではFDが使用可能かどうかわからないので監視が必要になります。

備考

監視という処理を愚直に考えると while文でぐるぐる回してソケットの状態により分岐するような実装が思い浮かぶかもしれません。

しかしこれではループが空回りすることでCPU使用率がかなり高くなってしまいます。 これは ビジーループ と呼ばれる手法で ノンブロッキングソケット - ソケットプログラミング HOWTO Python 3.7.3 ドキュメント では愚かな方法として紹介されています。

Python以外の言語であってもネットワークプログラミングをしたことがあれば FDの監視と聞けば、「あー、 selectでしょ」となるでしょう。概ね正解です。

Python3.4以降では、 selectors と呼ばれる ビルトインモジュールがあります。 これは select のラッパーのようなもので、OSから提供されている監視機能のうち最も効率的なものが自動的に選択されるといったすぐれものです。

イベントループ は、この selectors を用いて、FDの監視を行います。 監視自体はブロッキング処理なので、読み書き可能なFDがなければ処理はブロックされます。 ビジーループと違いCPUの消費も抑えられます。

ネットワーク通信を行う非同期処理では、いくつものソケットが selectors メソッドによって監視されます。

備考

ネットワーク通信を行うときだけでなく イベントループ自身も一組のソケットを持っており、 ソケットのブロックにより asyncio.sleep 機能を表現しています。

イベントループはcall_laterで登録された実行予定なハンドルのうち実行予定時刻が最も古い(小さい)ものを取り出し 「現在時刻との差分」を「待ち時間」として、監視処理(selectors.select)のタイムアウトに指定するため、その時間分だけブロックされます。 これが asyncio.sleep の正体です。

ただし、すでに実行可能なハンドルがある場合は待ち時間を0とします。

(最初は asyncio.sleep で作られた Future 1つにつき1つのソケットがあって selectors で監視してるのかと思ってましたが違いました..)

該当コード:

Task

Taskは一つのコルーチンに紐づく結果オブジェクトです。 Future を継承して定義されているので役割はかなり似ています。(当然 Awaitable です)

Future が 結果や終了ステータスを自分で格納する必要があったのに対し、 Task は コルーチンに紐付き、コルーチンの結果と終了ステータスが自動的に格納されます。 逆に手動で結果を入れることが出来ません。

これでいちいちコールバック関数を書く必要がなくなりますね。

Task は loop.create_task(コルーチン) のように作ります。 このメソッドは Task オブジェクトを返却するので、必要に応じて別のコルーチンに渡すなどします。

>>> async def coro(seconds, value):
...     await asyncio.sleep(seconds)
...     return value + 100

>>> task1 = loop.create_task(coro(1, 1))
>>> task2 = loop.create_task(coro(2, 100))
>>> task3 = loop.create_task(coro(3, 301))
>>> loop.run_until_complete(asyncio.gather(task1, task2))  # あえてtask3は待たない
[101, 200]
>>> task1
<Task finished coro=<coro() done, defined at <stdin>:1> result=101>
>>> task2
<Task finished coro=<coro() done, defined at <stdin>:1> result=200>
>>> task3
<Task pending coro=<coro() running at <stdin>:2> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x10eca9eb8>()]>>

次は適当な例外を起こしてみます。

>>> async def coro(seconds, value):
...     await asyncio.sleep(seconds)
...     if not value:
...         raise ValueError(value)
...     return value + 100
...
>>> task1 = loop.create_task(coro(1, 1))
>>> task2 = loop.create_task(coro(2, 0))

>>> loop.run_until_complete(asyncio.gather(task1, task2))
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 584, in run_until_complete
    return future.result()
  File "<stdin>", line 4, in coro
ValueError: 0
>>> task1
<Task finished coro=<coro() done, defined at <stdin>:1> result=101>
>>> task2  # 例外が発生した場合も status は一応終了となる
<Task finished coro=<coro() done, defined at <stdin>:1> exception=ValueError(0)>

終了ステータスが自動的に格納されるとは言いましたが、 ステータスをキャンセルにしたい場合は自分で格納する必要があります。

今回は一定秒数経過後にキャンセルするだけのコルーチン cancel を作ってみました。

>>> async def cancel(seconds, task=None):
...     await asyncio.sleep(seconds)
...     if task:
...         task.cancel()
...
>>> task2 = loop.create_task(cancel(2))  # task2 は 2秒後に終わるようにする
>>> task1 = loop.create_task(cancel(1, task2))  # task1 は 1秒後に終わって task2をキャンセルする
>>> loop.run_until_complete(asyncio.gather(task1, task2))

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 584, in run_until_complete
    return future.result()
concurrent.futures._base.CancelledError

>>> task1
<Task finished coro=<cancel() done, defined at <stdin>:1> result=None>
>>> task2
<Task cancelled coro=<cancel() done, defined at <stdin>:1>>

タスクでは cancel すると CancelledError 例外が送出されるようです。

備考

loop.create_task(coro) と asyncio.ensure_future(coro_or_future, *, loop=None) の違い

Python3.7 で loop.create_task が出現するまで、 タスクの生成には asyncio.ensure_future が使われていました。

ensure_future なのに future じゃなくて taskなの?と思いますが、 実際に作られるのはタスクなんです。罠ですね。

Python3.7以降では loop.create_task を使うことが推奨されています。

実際に違いがあるとすると、 create_task では コルーチンしか受け取れないという点です。 当然 asyncio.gather でコルーチンの結果をまとめた Future も受け取れません。

タスクは一つのコルーチンに紐づくものなので、これは妥当なインターフェースだと個人的には思います。

(3.7以降では) ensure_future は受け取った引数がコルーチンの場合は 内部的に create_task に渡すだけです。

create_task が呼ばれると loop.call_soon を使って コルーチンを実行可能なスケジュールとしてループに登録します。

なお、 コルーチン内の await 文で別のコルーチンを呼び出す場合は、 call_soon を介さず直接呼び出されます。 await 文が yield from 文だと思えば当然の挙動かも知れませんね。

最後に頭の中の呼び出しイメージ図的なもの(全部は無理なので重要っぽいところだけ)

graph TD asyncio.sleep --> loop.call_later asyncio.ensure_future --> |コルーチンなら| loop.create_task subgraph Event loop loop.create_task --> |coro.send| loop.call_soon loop.call_later --> loop.call_at loop.call_at --> |TimerHandle| loop._scheduled loop.call_soon --> |Handle| loop._ready loop._scheduled --> |TimerHandle.when <= 現在時間| loop._ready end loop._ready -->|Handle,TimerHandle| handle.run
該当コード:

応用

ブロッキング関数を使いたい (using non-blocking functions)

ここまでの処理はすべてノンブロッキングな関数を使ってきましたが、 Pythonの関数は基本的にブロッキングですし、サードパーティライブラリでは asyncio に対応してないものも多いでしょう。

とはいえ、関数のコードを書き変えてコルーチンに変えるなどできるはずもありません。

そこで loop.run_in_executor メソッドを使います。 これは multi threading / processing によって非同期実行を実現しています。

run_ から始まっているので loop.run_until_complete などと混同してしまいそうですが、 これは Future を作るためのメソッドなので別物です。

以下の3例はいずれも 3秒後に同じ結果を出力します。

>>> import time
>>> import concurrent.futures
>>> def not_coro(seconds, value):
...     time.sleep(seconds)
...     return value + 100
...

# pool を指定しない
>>> loop.run_until_complete(asyncio.gather(
...   loop.run_in_executor(None, not_coro, 1, 1),
...   loop.run_in_executor(None, not_coro, 2, 20),
...   loop.run_in_executor(None, not_coro, 3, 300),
... ))
[101, 120, 400]

# スレッドプール
>>> with concurrent.futures.ThreadPoolExecutor() as pool:
...   loop.run_until_complete(asyncio.gather(
...     loop.run_in_executor(pool, not_coro, 1, 1),
...     loop.run_in_executor(pool, not_coro, 2, 20),
...     loop.run_in_executor(pool, not_coro, 3, 300),
...   ))
...
[101, 120, 400]

# プロセスプール
>>> with concurrent.futures.ProcessPoolExecutor() as pool:
...   loop.run_until_complete(asyncio.gather(
...     loop.run_in_executor(pool, not_coro, 1, 1),
...     loop.run_in_executor(pool, not_coro, 2, 20),
...     loop.run_in_executor(pool, not_coro, 3, 300),
...   ))
...
[101, 120, 400]

返却されたオブジェクトは future オブジェクトなので他の ノンブロッキング関数と同じように await することができます。

第一引数には スレッドプールを指定しますが、 None を指定することで省略できます。 省略した場合はそのたびに必要な数のスレッドが作られるようです。

最後の例のように、プロセスプールを指定することもできます。 これにより、CPUバウンドの処理を効率よく処理することができます。 (もちろんコア数が2つ以上の場合に限りますが)

Event loop policy

イベントループにはポリシーというものが設定できます。

デフォルトでは以下のいずれかを使うかを選択できます。

詳しく調べてないけど IO多重化で監視するか IO完了ポートを監視するかってことのようですが、 現状後者は Windows でしか利用できません。

これらのクラスを継承してメソッドをオーバーライドして、asyncio.set_event_loop_policy 関数でポリシー登録すれば loopオブジェクトの取得条件を変えたりできます。

class MyEventLoopPolicy(asyncio.DefaultEventLoopPolicy):
    def get_event_loop(self):
        """Get the event loop.

        This may be None or an instance of EventLoop.
        """
        loop = super().get_event_loop()
        # Do something with loop ...
        return loop

asyncio.set_event_loop_policy(MyEventLoopPolicy())

( https://docs.python.org/3/library/asyncio-policy.html#custom-policies )

今更ですが、 この記事では DefaultEventLoopPolicy を使う前提で書いてます。

Debug mode

asyncio のコード内では 適切にログを吐いているので、 デバッグモードにしてロギング設定を調整してあげれば、コンソールにデバッグ情報が表示され、問題解決に役立ちます。

  • 環境変数 PYTHONASYNCIODEBUG=1 を設定した上で Python インタプリタを起動(実行)する

  • ログレベルを DEBUG に設定する

    • logging.basicConfig(level=logging.DEBUG)
  • asyncio.run を使う場合 debug 引数に True を指定する

    >>> asyncio.run(coro_func(), debug=True)
    DEBUG:asyncio:Using selector: KqueueSelector
    INFO:asyncio:poll 999.905 ms took 1004.213 ms: timeout
    DEBUG:asyncio:Close <_UnixSelectorEventLoop running=False closed=False debug=True>
    
  • loop オブジェクトを使う場合、 loop.set_debug(True) を実行する

    >>> loop = asyncio.get_event_loop()
    DEBUG:asyncio:Using selector: KqueueSelector
    >>> loop.set_debug(True)
    >>> loop.run_until_complete(coro_func())
    INFO:asyncio:poll 999.749 ms took 1001.260 ms: timeout
    
  • https://docs.python.org/ja/3/library/asyncio-dev.html#asyncio-debug-mode

Network communication

asyncio はネットワーク通信を扱うための機能もいくつかもちます。

あまり詳しくはやりませんが、少しだけサンプルを見ておきましょう。(もう気力がないので雑)

Transports and Protocols

loop.create_connection() のような イベントループの低レベルAPIから利用されるネットワーク通信を実装するためクラス郡です。

これらは ライブラリやフレームワークから呼ばれるようなものであり、 アプリケーションから呼ばれることは期待されていません。

ここではドキュメントに従って、 UDPの エコーサーバーのサンプルを試してみましょう。

Server Client
>>> import asyncio
>>>
>>>
>>> class EchoServerProtocol:
...     def connection_made(self, transport):
...         self.transport = transport
...
...     def datagram_received(self, data, addr):
...         message = data.decode()
...         print('Received %r from %s' % (message, addr))
...         print('Send %r to %s' % (message, addr))
...         self.transport.sendto(data, addr)
...
>>>
>>> async def main():
...     print("Starting UDP server")
...
...     # Get a reference to the event loop as we plan to use
...     # low-level APIs.
...     loop = asyncio.get_running_loop()
...
...     # One protocol instance will be created to serve all
...     # client requests.
...     transport, protocol = await loop.create_datagram_endpoint(
...         lambda: EchoServerProtocol(),
...         local_addr=('127.0.0.1', 9999))
...
...     try:
...         await asyncio.sleep(3600)  # Serve for 1 hour.
...     finally:
...         transport.close()
...
>>> import asyncio
>>>
>>>
>>> class EchoClientProtocol:
...     def __init__(self, message, loop):
...         self.message = message
...         self.loop = loop
...         self.transport = None
...         self.on_con_lost = loop.create_future()
...
...     def connection_made(self, transport):
...         self.transport = transport
...         print('Send:', self.message)
...         self.transport.sendto(self.message.encode())
...
...     def datagram_received(self, data, addr):
...         print("Received:", data.decode())
...         print("Close the socket")
...         self.transport.close()
...
...     def error_received(self, exc):
...         print('Error received:', exc)
...
...     def connection_lost(self, exc):
...         print("Connection closed")
...         self.on_con_lost.set_result(True)
...
>>>
>>> async def main():
...     # Get a reference to the event loop as we plan to use
...     # low-level APIs.
...     loop = asyncio.get_running_loop()
...
...     message = "Hello World!"
...     transport, protocol = await loop.create_datagram_endpoint(
...         lambda: EchoClientProtocol(message, loop),
...         remote_addr=('127.0.0.1', 9999))
...
...     try:
...         await protocol.on_con_lost
...     finally:
...         transport.close()
...
>>> asyncio.run(main())
Starting UDP server
Received 'Hello World!' from ('127.0.0.1', 58632)
Send 'Hello World!' to ('127.0.0.1', 58632)
>>> asyncio.run(main())
Send: Hello World!
Received: Hello World!
Close the socket
Connection closed

Stream

上述した Transport と Protocol を使った上位実装がこちらの Stream です。

ここでもドキュメントに従い 単純な TCPの EchoServer と EchoClient を作ってみることにしましょう。

(少しいじりました)

Server Client
>>> import asyncio
>>> import time

>>> async def handle_echo(reader, writer):
...     data = await reader.read(100)
...     message = data.decode()
...     addr = writer.get_extra_info('peername')
...     await asyncio.sleep(5)  # 5秒待つ
...     writer.write(data)
...     print(f"Sent {message!r} back to {addr!r}")
...     await writer.drain()
...     writer.close()
...

>>> async def main():
...     server = await asyncio.start_server(handle_echo, '127.0.0.1', 7777)
...     addr = server.sockets[0].getsockname()
...     async with server:
...         await server.serve_forever()
>>> import asyncio

>>> async def send(message):
...     reader, writer = await asyncio.open_connection('127.0.0.1', 7777)
...     writer.write(message.encode())
...     print(f'Sending {message}')
...     data = await reader.read(100)
...     print(f'Received: {data.decode()!r}')
...     writer.close()
...     await writer.wait_closed()
...
>>> asyncio.run(main())
# .
# .
# .
# .
# .
# .
# .
# .
# 5秒後
Sent 'h' back to ('127.0.0.1', 58298)
Sent 'e' back to ('127.0.0.1', 58299)
Sent 'l' back to ('127.0.0.1', 58300)
Sent 'l' back to ('127.0.0.1', 58301)
Sent 'o' back to ('127.0.0.1', 58302)
>>> loop = asyncio.get_event_loop()
>>> loop.run_until_complete(asyncio.gather(
...     send('h'), send('e'), send('l'), send('l'), send('o')))
Sending h
Sending e
Sending l
Sending l
Sending o

# 5秒後
Received: 'h'
Received: 'e'
Received: 'l'
Received: 'l'
Received: 'o'

重要なのは サーバー側の ハンドル用コルーチン関数では reader, writer 引数を受け取り、クライアント側ではコルーチン内で自分で生成します。

サーバ、クライアントはこれらオブジェクトを通じて、 お互いのタイミングでソケットの読み書きをしてあげます。

Conclusion (終わりに)

バージョンを重ねるに連れ機能がだんだんと充実してきて、Pythonで非同期処理を書く土壌が整ってきましたね。

以前は Twisted などのサードパーティライブラリを用いるのが通例でしたが、 これからは asyncio をベースとしたライブラリがたくさん生まれてくるでしょう。

冒頭でも言いましたが、たくさんの記事にお世話になりました。 記事をレビューしてくれた匿名の方にも感謝。

ありがとうございました。

参考