18.5.3. タスクとコルーチン

Source code: Lib/asyncio/tasks.py

Source code: Lib/asyncio/coroutines.py

18.5.3.1. コルーチン

asyncio と一緒に使うコルーチンは async def 文を使って実装するか、もしくは ジェネレータ を使って実装します。async def 型のコルーチンはPython 3.5の時に追加されました。3.5より前のバージョンのPythonをサポートする必要がなければ async def タイプのコルーチンが推奨されています。

必須というわけではありませんが、ジェネレータベースのコルーチンは @asyncio.coroutine でデコレートすべきです。 @asyncio.coroutine でデコレートすることで async def 型のコルーチンと互換性を持たせることができ、ドキュメントとしても役に立ちます。ジェネレータベースのコルーチンは yield 構文を使う代わりに、 PEP 380 で導入された yield from 構文を使います。

単語 "コルーチン" は単語 "ジェネレーター" のように、(関連はしていますが) 異なる 2 つの概念で使用されます:

  • コルーチンを定義した関数 (async def を使用するか @asyncio.coroutine でデコレートされた関数定義)。 曖昧さを避ける際は コルーチン関数 と呼びます (iscoroutinefunction()True を返します)。
  • コルーチン関数の呼び出しによって取得されたオブジェクト。このオブジェクトは、いつかは完了する計算または I/O 操作 (通常はその組み合わせ) を表します。曖昧さの解消が必要な場合はこれを コルーチンオブジェクト (iscoroutine()True を返す) と呼びます。

コルーチンができること:

  • result = await future or result = yield from future -- suspends the coroutine until the future is done, then returns the future's result, or raises an exception, which will be propagated. (If the future is cancelled, it will raise a CancelledError exception.) Note that tasks are futures, and everything said about futures also applies to tasks.
  • result = await coroutine or result = yield from coroutine -- wait for another coroutine to produce a result (or raise an exception, which will be propagated). The coroutine expression must be a call to another coroutine.
  • return expression -- produce a result to the coroutine that is waiting for this one using await or yield from.
  • raise exception -- raise an exception in the coroutine that is waiting for this one using await or yield from.

Calling a coroutine does not start its code running -- the coroutine object returned by the call doesn't do anything until you schedule its execution. There are two basic ways to start it running: call await coroutine or yield from coroutine from another coroutine (assuming the other coroutine is already running!), or schedule its execution using the ensure_future() function or the AbstractEventLoop.create_task() method.

コルーチン (およびタスク) はイベントループが実行中の場合にのみ起動できます。

@asyncio.coroutine

Decorator to mark generator-based coroutines. This enables the generator use yield from to call async def coroutines, and also enables the generator to be called by async def coroutines, for instance using an await expression.

async def コルーチン自身をデコレートする必要はありません。

If the generator is not yielded from before it is destroyed, an error message is logged. See Detect coroutines never scheduled.

注釈

In this documentation, some methods are documented as coroutines, even if they are plain Python functions returning a Future. This is intentional to have a freedom of tweaking the implementation of these functions in the future. If such a function is needed to be used in a callback-style code, wrap its result with ensure_future().

18.5.3.1.1. 例: Hello World コルーチン

"Hello World" と表示するコルーチンの例:

import asyncio

async def hello_world():
    print("Hello World!")

loop = asyncio.get_event_loop()
# Blocking call which returns when the hello_world() coroutine is done
loop.run_until_complete(hello_world())
loop.close()

参考

The Hello World with call_soon() example uses the AbstractEventLoop.call_soon() method to schedule a callback.

18.5.3.1.2. 例: 現在の日時を表示するコルーチン

sleep() 関数を用いて現在の時刻を5秒間、毎秒表示するコルーチンの例:

import asyncio
import datetime

async def display_date(loop):
    end_time = loop.time() + 5.0
    while True:
        print(datetime.datetime.now())
        if (loop.time() + 1.0) >= end_time:
            break
        await asyncio.sleep(1)

loop = asyncio.get_event_loop()
# Blocking call which returns when the display_date() coroutine is done
loop.run_until_complete(display_date(loop))
loop.close()

参考

The display the current date with call_later() example uses a callback with the AbstractEventLoop.call_later() method.

18.5.3.1.3. 例: コルーチンのチェーン

コルーチンをチェーンする例です:

import asyncio

async def compute(x, y):
    print("Compute %s + %s ..." % (x, y))
    await asyncio.sleep(1.0)
    return x + y

async def print_sum(x, y):
    result = await compute(x, y)
    print("%s + %s = %s" % (x, y, result))

loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()

compute()print_sum() にチェーンされます: print_sum() コルーチンは compute() が完了するまで待ちます。

この例のシーケンス図です:

../_images/tulip_coro.png

The "Task" is created by the AbstractEventLoop.run_until_complete() method when it gets a coroutine object instead of a task.

The diagram shows the control flow, it does not describe exactly how things work internally. For example, the sleep coroutine creates an internal future which uses AbstractEventLoop.call_later() to wake up the task in 1 second.

18.5.3.2. InvalidStateError

exception asyncio.InvalidStateError

操作はこの状態では許可されません。

18.5.3.3. TimeoutError

exception asyncio.TimeoutError

操作は与えられた期限を超えました。

注釈

この例外は組み込みの TimeoutError 例外とは異なります!

18.5.3.4. フューチャー

class asyncio.Future(*, loop=None)

このクラスは concurrent.futures.Futureほぼ 互換です。

相違点:

このクラスは スレッド安全ではありません

cancel()

フューチャとスケジュールされたコールバックをキャンセルします。

フューチャがすでに終了しているかキャンセルされていた場合 False を返し、そうでない場合フューチャの状態をキャンセルに変更し、コールバックをスケジュールし、True を返します。

cancelled()

フューチャがキャンセルされていた場合 True を返します。

done()

Return True if the future is done.

終了とは、結果が返された、例外が送出された、あるいはフューチャがキャンセルされたことを意味します。

result()

このフューチャが表す結果を返します。

フューチャがキャンセルされていた場合 CancelledError が送出されます。フューチャの結果がまだ利用できない場合 InvalidStateError が送出されます。フューチャが終了し例外の集合を持っていた場合その例外が送出されます。

exception()

このフューチャで設定された例外を返します。

例外 (例外が設定されていない場合は None) はフューチャが終了した場合のみ返されます。フューチャがキャンセルされていた場合 CancelledError が送出されます。フューチャがまだ終了していない場合 InvalidStateError が送出されます。

add_done_callback(fn)

フューチャが終了したときに実行するコールバックを追加します。

The callback is called with a single argument - the future object. If the future is already done when this is called, the callback is scheduled with call_soon().

引数をコールバックに渡すには functools.partial を使います。例えば fut.add_done_callback(functools.partial(print, "Future:", flush=True))print("Future:", fut, flush=True) を呼びます。

remove_done_callback(fn)

"終了時に呼び出す" リストからコールバックのすべてのインスタンスを除去します。

除去されたコールバック数を返します。

set_result(result)

フューチャの終了をマークしその結果を設定します。

このメソッドが呼ばれたときにフューチャがすでに終了している場合、InvalidStateError を送出します。

set_exception(exception)

フューチャの終了をマークし例外を設定します。

このメソッドが呼ばれたときにフューチャがすでに終了している場合、InvalidStateError を送出します。

18.5.3.4.1. 例: run_until_complete() を使ったフューチャ

Futureコルーチン関数 を組み合わせた例:

import asyncio

async def slow_operation(future):
    await asyncio.sleep(1)
    future.set_result('Future is done!')

loop = asyncio.get_event_loop()
future = asyncio.Future()
asyncio.ensure_future(slow_operation(future))
loop.run_until_complete(future)
print(future.result())
loop.close()

The coroutine function is responsible for the computation (which takes 1 second) and it stores the result into the future. The run_until_complete() method waits for the completion of the future.

注釈

The run_until_complete() method uses internally the add_done_callback() method to be notified when the future is done.

18.5.3.4.2. 例: run_forever() を使ったフューチャ

上の例を Future.add_done_callback() メソッド使って制御フローを明示して書くこともできます:

import asyncio

async def slow_operation(future):
    await asyncio.sleep(1)
    future.set_result('Future is done!')

def got_result(future):
    print(future.result())
    loop.stop()

loop = asyncio.get_event_loop()
future = asyncio.Future()
asyncio.ensure_future(slow_operation(future))
future.add_done_callback(got_result)
try:
    loop.run_forever()
finally:
    loop.close()

この例では slow_operation()got_result() にリンクするために future を用いています。slow_operation() が終了したとき got_result() が結果と供に呼ばれます。

18.5.3.5. タスク

class asyncio.Task(coro, *, loop=None)

コルーチン の実行をスケジュールします: それをフューチャ内にラップします。タスクは Future のサブクラスです。

A task is responsible for executing a coroutine object in an event loop. If the wrapped coroutine yields from a future, the task suspends the execution of the wrapped coroutine and waits for the completion of the future. When the future is done, the execution of the wrapped coroutine restarts with the result or the exception of the future.

イベントループは協調スケジューリングを使用します: 1 つのイベントループは同時に 1 つのタスクのみ実行します。その他のタスクは、他のイベントループが異なるメソッドで実行されている場合に並列で実行されるかもしれません。タスクがフューチャの計算を待っている間、イベントループは新しいタスクを実行します。

タスクのキャンセルはフューチャのキャンセルとは異なります。cancel() はラップされたコルーチンに CancelledError を送出します。ラップされたコルーチンが CancelledError 例外を補足しなかった、あるいは CancelledError 例外を送出しなかった場合、cancelled() は常に True を返します。

未完のタスクが破棄された場合、それのラップされた コルーチン は完了しません。これはおそらくバグであり警告がログに記録されます: 未完のタスクの破棄 を参照してください。

Don't directly create Task instances: use the ensure_future() function or the AbstractEventLoop.create_task() method.

このクラスは スレッド安全ではありません

classmethod all_tasks(loop=None)

イベントループ loop のすべてのタスクの集合を返します。

デフォルトでは現在のイベントループの全タスクが返されます。

classmethod current_task(loop=None)

イベントループ内で現在実行中のタスクまたは None を返します。

デフォルトでは現在のイベントループの現在のタスクが返されます。

Task のコンテキスト内から呼び出されたのではない場合 None が返されます。

cancel()

このタスクのキャンセルを自身で要求します。

これは、イベントループを通して次のサイクルにおいてラップされたコルーチンに投入される CancelledError を準備します。コルーチンにはその後 try/except/finally を使用してクリーンアップするか要求を拒否する機会が与えられます。

Future.cancel() と異なり、これはタスクのキャンセルを保証しません: 例外が補足されそれが処理されることで、タスクのキャンセル処理が遅延したりキャンセル処理が完了しない場合があります。また、タスクは戻り値を返すか異なる例外を送出する場合もあります。

このメソッドが呼び出された直後は cancelled()True を返しません (タスクがすでにキャンセル済みの場合は除く)。ラップされたコルーチンが CancelledError で中止されたとき、タスクは (cancel() が呼ばれなかった場合でも) キャンセル済みとマークされます。

get_stack(*, limit=None)

このタスクのコルーチンのスタックフレームのリストを返します。

If the coroutine is not done, this returns the stack where it is suspended. If the coroutine has completed successfully or was cancelled, this returns an empty list. If the coroutine was terminated by an exception, this returns the list of traceback frames.

フレームは常に古いものから新しい物へ並んでいます。

任意の引数 limit には返すフレームの最大数を指定します; デフォルトでは有効なすべてのフレームが返されます。これは返される値がスタックかトレースバックかによって意味が変わります: スタックでは最新のフレームから返されますが、トレースバックでは最古のものから返されます。 (これは traceback モジュールの振る舞いと一致します。)

いかんともしがたい理由により、サスペンドされているコルーチンの場合スタックフレームが 1 個だけ返されます。

print_stack(*, limit=None, file=None)

このタスクのコルーチンのスタックあるいはトレースバックを出力します。

この出力は get_stack() によって回収されたフレームで、traceback モジュールのそれと同じです。引数 limit は get_stack() に渡されます。引数 file は出力を書き込む I/O ストリームです; デフォルトでは sys.stderr になります。

18.5.3.5.1. 例: タスクの並列実行

3 個のタスク (A, B, C) を並列に実行する例です:

import asyncio

async def factorial(name, number):
    f = 1
    for i in range(2, number+1):
        print("Task %s: Compute factorial(%s)..." % (name, i))
        await asyncio.sleep(1)
        f *= i
    print("Task %s: factorial(%s) = %s" % (name, number, f))

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(
    factorial("A", 2),
    factorial("B", 3),
    factorial("C", 4),
))
loop.close()

出力:

Task A: Compute factorial(2)...
Task B: Compute factorial(2)...
Task C: Compute factorial(2)...
Task A: factorial(2) = 2
Task B: Compute factorial(3)...
Task C: Compute factorial(3)...
Task B: factorial(3) = 6
Task C: Compute factorial(4)...
Task C: factorial(4) = 24

タスクは作成されたときに実行を自動的にスケジュールされます。イベントループはすべてのタスクが終了したときに停止します。

18.5.3.6. タスク関数

注釈

In the functions below, the optional loop argument allows explicitly setting the event loop object used by the underlying task or coroutine. If it's not provided, the default event loop is used.

asyncio.as_completed(fs, *, loop=None, timeout=None)

その値のイテレーターか、待機中のときは Future インスタンスを返します。

全フューチャが終了する前にタイムアウトが発生した場合 asyncio.TimeoutError を送出します。

以下はプログラム例です:

for f in as_completed(fs):
    result = yield from f  # The 'yield from' may raise
    # Use result

注釈

フューチャ f は fs のメンバーである必要はありません。

asyncio.ensure_future(coro_or_future, *, loop=None)

コルーチンオブジェクト の実行をスケジュールします: このときフューチャにラップします。Task オブジェクトを返します。

引数が Future の場合、それが直接返されます。

バージョン 3.4.4 で追加.

バージョン 3.5.1 で変更: The function accepts any awaitable object.

参考

The AbstractEventLoop.create_task() method.

asyncio.async(coro_or_future, *, loop=None)

ensure_future() への非推奨なエイリアスです。

バージョン 3.4.4 で非推奨.

asyncio.wrap_future(future, *, loop=None)

Wrap a concurrent.futures.Future object in a Future object.

asyncio.gather(*coros_or_futures, loop=None, return_exceptions=False)

与えられたコルーチンオブジェクトあるいはフューチャからの結果を一つにまとめたフューチャを返します。

All futures must share the same event loop. If all the tasks are done successfully, the returned future's result is the list of results (in the order of the original sequence, not necessarily the order of results arrival). If return_exceptions is true, exceptions in the tasks are treated the same as successful results, and gathered in the result list; otherwise, the first raised exception will be immediately propagated to the returned future.

キャンセル: 外側のフューチャがキャンセルされた場合、すべての (まだ完了していない) 子プロセスもキャンセルされます。いずれかの子プロセスがキャンセルされた場合、これは CancelledError を送出するように扱います -- この場合外側のフューチャはキャンセル されません。 (This is to prevent the cancellation of one child to cause other children to be cancelled.)

asyncio.iscoroutine(obj)

Return True if obj is a coroutine object, which may be based on a generator or an async def coroutine.

asyncio.iscoroutinefunction(func)

Return True if func is determined to be a coroutine function, which may be a decorated generator function or an async def function.

asyncio.run_coroutine_threadsafe(coro, loop)

Submit a coroutine object to a given event loop.

Return a concurrent.futures.Future to access the result.

This function is meant to be called from a different thread than the one where the event loop is running. Usage:

# Create a coroutine
coro = asyncio.sleep(1, result=3)
# Submit the coroutine to a given loop
future = asyncio.run_coroutine_threadsafe(coro, loop)
# Wait for the result with an optional timeout argument
assert future.result(timeout) == 3

If an exception is raised in the coroutine, the returned future will be notified. It can also be used to cancel the task in the event loop:

try:
    result = future.result(timeout)
except asyncio.TimeoutError:
    print('The coroutine took too long, cancelling the task...')
    future.cancel()
except Exception as exc:
    print('The coroutine raised an exception: {!r}'.format(exc))
else:
    print('The coroutine returned: {!r}'.format(result))

このドキュメントの asyncio-multithreading 節を参照してください。

注釈

Unlike other functions from the module, run_coroutine_threadsafe() requires the loop argument to be passed explicitly.

バージョン 3.5.1 で追加.

coroutine asyncio.sleep(delay, result=None, *, loop=None)

与えられた時間 (秒) 後に完了する コルーチン を作成します。result が与えられた場合、コルーチン完了時にそれが呼び出し元に返されます。

スリープの分解能は イベントループの粒度 に依存します。

この関数は コルーチン です。

coroutine asyncio.shield(arg, *, loop=None)

フューチャを待機しキャンセル処理から保護します。

命令文:

res = yield from shield(something())

上の文は以下と完全に等価です:

res = yield from something()

それを含むコルーチンがキャンセルされた場合を 除きsomething() 内で動作するタスクはキャンセルされません。something() 側から見るとキャンセル処理は発生しません。ただし、呼び出し元がキャンセルされた場合は、yield-from 表現は CancelledError を送出します。注意: something() が他の理由でキャンセルされた場合は shield() でも保護できません。

完全にキャンセル処理を無視させたい場合 (推奨はしません) は、以下のように shield() と try/except 節の組み合わせで行うことができます:

try:
    res = yield from shield(something())
except CancelledError:
    res = None
coroutine asyncio.wait(futures, *, loop=None, timeout=None, return_when=ALL_COMPLETED)

シーケンス futures で与えられたフューチャおよびコルーチンオブジェクトが完了するまで待機します。コルーチンはタスクでラップされます。戻り値は (完了した Future, 未完の Future) の 2 個の集合になります。

シーケンス futures は空であってはなりません。

timeout で結果を返すまで待機する最大秒数を指定できます。timeout は整数か浮動小数点数をとります。timeout が指定されないか None の場合、無期限に待機します。

return_when でこの関数がいつ結果を返すか指定します。指定できる値は以下の concurrent.futures モジュール定数のどれか一つです:

定数 説明
FIRST_COMPLETED いずれかのフューチャが終了したかキャンセルされたときに返します。
FIRST_EXCEPTION いずれかのフューチャが例外の送出で終了した場合に返します。例外を送出したフューチャがない場合は、ALL_COMPLETED と等価になります。
ALL_COMPLETED すべてのフューチャが終了したかキャンセルされたときに返します。

この関数は コルーチン です。

使い方:

done, pending = yield from asyncio.wait(fs)

注釈

これは asyncio.TimeoutError を送出しません。タイムアウトが発生して完了しなかったフューチャは戻り値の後者の集合に含まれます。

coroutine asyncio.wait_for(fut, timeout, *, loop=None)

単一の Future または コルーチンオブジェクト を期限付きで待機します。timeoutNone の場合、フューチャが完了するまでブロックします。

コルーチンは Task でラップされます。

フューチャあるいはコルーチンの結果を返します。タイムアウトが発生した場合、タスクをキャンセルし asyncio.TimeoutError を送出します。タスクのキャンセルを抑止したい場合は shield() でラップしてください。

待機が中止された場合 fut も中止されます。

この関数は コルーチン です。使用法:

result = yield from asyncio.wait_for(fut, 60.0)

バージョン 3.4.3 で変更: 待機が中止された場合 fut も中止されます。

関連キーワード:  コルーチン, asyncio, タスク, フューチャ, coroutine, result, キャンセル, None, from, done