このページでは、タスクを作成して push キューに配置する方法を説明します。タスクを処理する場合は、新しいタスク オブジェクトを作成してキューに配置する必要があります。タスクを処理するサービスとハンドラを明示的に指定し、必要に応じてタスク固有のデータをハンドラに渡すこともできます。また、タスクを実行するタイミングをスケジューリングしたり、失敗した場合にタスクを再試行する回数を制限したりするなど、タスクの構成を微調整することもできます。
新しいタスクを作成する
タスクを作成し、キューに入れるには、taskqueue.add() 関数を呼び出します。次のコードでは worker という名前のサービスを対象とするタスクを作成し、URL /update-counter を設定してハンドラを呼び出します。
または、Task オブジェクトを作成して、その add() メソッドを呼び出すことができます。
ワーカー サービスを指定する
タスクがキューから取り出される(ポップ)と、タスクはタスクキュー サービスによってワーカー サービスに送信されます。どのタスクにもターゲットと url があり、どのサービスとハンドラがそのタスクを実行するかがこれらによって決まります。
target-
ターゲットは、どのサービスがタスク実行の HTTP リクエストを受け取るかを指定します。これは文字列であり、サービス / バージョン / インスタンスを正規形式のいずれかで指定します。よく使用されるものは次のとおりです。
service version.service instance.version.serviceターゲット文字列の前には、アプリのドメイン名が付加されます。タスクのターゲットを設定するには、次の 3 つの方法があります。
-
タスクを作成するときに、ターゲットを明示的に宣言します。
-
queue.yamlでキューを定義するときにtargetディレクティブを指定します(queue-blueの定義をご覧ください)。targetが指定されているキューに追加されるタスクはすべて、そのターゲットを使用します。別のターゲットがタスク作成時に割り当てられていても無視されます。 -
前述の 2 つのどちらの方法でもターゲットが指定されていない場合は、タスクのターゲットはそのタスクをキューに追加したサービスのバージョンとなります。この方法でタスクをデフォルトのサービスとバージョンからキューに追加した場合に、タスク実行の前にデフォルトのバージョンが変更されたときは、新しいデフォルトのバージョンでタスクが実行されます。
-
url-
urlに基づいて、ターゲット サービスのハンドラの 1 つが選択され、そのハンドラがタスクを実行します。urlは、ターゲット サービス内のハンドラ URL パターンの 1 つに一致する必要があります。タスクに指定されたメソッドがGETまたはPULLである場合は、urlにクエリ パラメータを含めることができます。urlが指定されていない場合は、デフォルトの URL/_ah/queue/[QUEUE_NAME]が使用されます。ここで[QUEUE_NAME]はタスクのキューの名前です。
データをハンドラに渡す
データをハンドラに渡すには、タスクの URL 内のクエリ パラメータとして渡すという方法がありますが、これを使用できるのはタスクで指定されているメソッドが GET または PULL である場合のみです。
payload や params フィールドを使ってタスクにデータを追加することもできます。
次の 3 つの呼び出しは同等です。
taskqueue.add(method=GET, url='/update-counter?key=blue', target='worker')
taskqueue.add(url='/update-counter', params={'key': 'blue'}, target='worker')
taskqueue.add(url='/update-counter', payload="{'key': 'blue'}", target='worker')
注:
- ペイロードは HTTP リクエストの本文で配信されます。
- HTTP
POSTメソッドをペイロードとともに使用する場合や、HTTPGETメソッドを使用するときに URL とクエリ パラメータを付加する場合は、params を指定しないでください。
タスクに名前を付ける
新しいタスクを作成すると、デフォルトでは App Engine によってタスクに一意の名前が割り当てられます。ただし、name パラメータを使用すると、タスクに独自の名前を割り当てることができます。独自のタスク名を割り当てることの利点は、名前付きのタスクでは重複が除外されることです。つまり、タスク名を使用すると、タスクが 1 回のみ追加されることを保証できます。重複の除外は、タスクが完了するか、削除されてから 9 日間続きます。
重複の除外ロジックではパフォーマンスのオーバーヘッドが大幅に増加するため、レイテンシが増加し、場合によっては名前付きタスクに伴うエラー率が高まるので注意してください。このようなコストは、タイムスタンプのように連続したタスク名が付けられている場合、大幅に増大する可能性があります。そのため、独自の名前を割り当てる場合は、コンテンツのハッシュなどの適度に分散された接頭辞をタスク名に使用することをおすすめします。
独自の名前をタスクに割り当てる場合、名前の最大長が 500 文字で、名前には英字の大文字と小文字、数字、アンダースコア、ハイフンを使用できることに注意してください。
taskqueue.add(url='/url/path', name='first-try')
タスクを非同期で追加する
デフォルトでは、Task Queue API の呼び出しは同期的です。ほとんどのシナリオでは、同期呼び出しで問題ありません。たとえば、タスク 1 個の追加は、通常は短時間で終わるオペレーションです。タスク 1 個の追加に要する時間の中央値は 5 ミリ秒で、タスク 1,000 個のうち 1 個の割合で最大 300 ミリ秒かかるものがあります。周期的な出来事、たとえばバックエンドのアップグレードが原因で、タスク 1,000 個につき 1 個の割合でスパイクが生じることがあり、時間は最大 1 秒かかります。
開発するアプリケーションについて、レイテンシの低さが求められている場合は、Task Queue API の非同期呼び出しを利用するとレイテンシを最小化できます。
たとえば、タスクを 10 個追加する必要があり、追加先は 10 個の異なるキューであるとします(したがってバッチ処理はできません)。最悪のケースでは、1 つのループの中で queue.add() を 10 回呼び出すと最大 10 秒間のブロックが起きる可能性があります(実際に起きることはほとんどありませんが)。非同期インターフェースを使用してタスクをそれぞれのキューに追加する処理を並行して行うと、最悪のケースのレイテンシーを 1 秒まで短縮できます。
タスクキューに対して非同期呼び出しを行う場合は、Queue クラスと RPC オブジェクトの非同期メソッドを使用します。返された RPC オブジェクトに対して get_result() を呼び出すと、リクエストを強制的に完了させることができます。トランザクションの中で非同期でタスクを追加するときは、リクエストを確実に完了させるために、get_result() を RPC に対して呼び出してからトランザクションをコミットしてください。
Cloud Datastore トランザクションの中でタスクをキューに追加する
タスクをキューに追加する処理を、Cloud Datastore トランザクションの一部として行うことができます。このようにすると、タスクがキューに追加される(かつ、キューへの追加が保証される)のは、トランザクションが正常に commit された場合のみとなります。トランザクションの中で追加されたタスクは、そのトランザクションの一部と見なされ、同じレベルの分離と整合性を持つことになります。
1 つのトランザクションの中でタスクキューに挿入できるトランザクション タスクは 5 個までです。トランザクション タスクの名前をユーザーが指定することはできません。
次のコードサンプルでは、Cloud Datastore トランザクションの一部としてトランザクション タスクを push キューに追加する方法を示します。
from google.appengine.api import taskqueue
from google.appengine.ext import ndb
@ndb.transactional
def do_something_in_transaction():
taskqueue.add(url='/path/to/my/worker', transactional=True)
#...
do_something_in_transaction()
ワーカー サービスの代わりに遅延タスク ライブラリを使用する
上記のセクションで説明したように、実行するタスクごとに別個にハンドラを設定すると、タスクの複雑な引数のシリアル化 / シリアル化解除と同様、煩雑になる場合があります。特に、キューで実行するタスクが小さく、多種多様で数も多い場合には、厄介です。Python SDK に含まれるライブラリ(google.appengine.ext.deferred)を使用すると、単純な関数をエクスポーズするだけで、専用のタスク ハンドラの設定やパラメータのシリアル化/シリアル化解除の作業をすべて回避できます。
このライブラリを使用するには、deferred ビルトインを app.yaml に追加する必要があります。詳しくは、app.yaml リファレンスのビルトイン ハンドラのセクションを参照してください。
deferred ライブラリを使用するには、関数とその引数を deferred.defer() に渡すだけです。
import logging
from google.appengine.ext import deferred
def do_something_expensive(a, b, c=None):
logging.info("Doing something expensive!")
# Do your work here
# Somewhere else
deferred.defer(do_something_expensive, "Hello, world!", 42, True)
deferred ライブラリは、関数の呼び出しとその引数をパッケージ化して、タスク キューに追加します。タスクの実行時に、deferred ライブラリでは do_something_expensive("Hello, world!", 42, True) を実行します。
Python で deferred ライブラリを使用する方法について詳しくは、遅延ライブラリを使用したバックグラウンド作業をご覧ください。
マルチテナント アプリケーションでタスクを扱う
デフォルトでは、push キューについては、タスクの作成時に名前空間マネージャで設定されていた現在の名前空間が使用されます。アプリケーションでマルチテナントを使用する場合は、名前空間 Python API をご覧ください。
次のステップ
- タスクハンドラを作成する方法を学習する

