こちらは Gunosy Advent Calendar 2019、2日目の記事です。なお、昨日の記事はかとうさんのわかる Gunosy 2019でした。
はじめに
こんにちは、 Gunosy Tech Lab Data Reliability & MLOps Group の キヴィタスポ(人工知能) (@Civitaspo) / Twitter です。
昨年のAdvent Calendarでは公開したDigdag Pluginを淡々と紹介しました。 tech.gunosy.io 最近ではこれらのPluginを利用して90%ものコスト改善が行われるなど、社内で大活躍が見られています。 data.gunosy.io 皆さんも機会があれば是非利用してください!またフィードバックいただけるとありがたいです!
さて、この記事では昨年に引き続き新しく作ったDigdag Pluginを紹介します。 digdag-operator-pg_lockというPluginを1つ紹介するだけですが、Digdagを利用する上での様々な問題を解決する(はず)なので期待して読んでください! よろしくおねがいします。
もくじ
digdag-operator-pg_lockとは?
Digdag Workflow実行中にPostgreSQLを使ったLockを取得できるPluginです。
Digdag を利用していると_parallel: true
で並列実行したいけど特定のリソースにアクセスするときは並列数を絞りたいといったケースがよくあります。このPluginはそういった課題を解決するPluginです。
使い方
簡単に例を書いてから詳細な使い方を説明します。
_export: plugin: repositories: - https://jitpack.io dependencies: - pro.civitaspo:digdag-operator-pg_lock:0.0.3 +sample: _parallel: true +a: pg_lock>: hoge-lock _do: echo>: a +b: pg_lock>: hoge-lock _do: echo>: a
pg_lock>
というOperator名で利用することが出来ます。Lockを取得している間に実行したいTaskの設定を_do
以下に定義します。上記の例では +a
と +b
というTaskで hoge-lock
という名前でLockを取得しているため_parallel: true
を指定して並列実行していますが実際にはどちらか先にLockを取得した方の処理が完了するまでもう一方の処理は走ることはありません。
利用可能なOption
なんとなく使い方は把握できたと思うので利用できるオプションを詳細に説明していきます。
pg_lock>
- 取得したいLockの名前を定義します。
- 必須オプションです。
wait_timeout
- Lock取得待ちをタイムアウトさせる時間を定義します。
- Default値は
15m
expire_in
- 取得したLockが失効するまでの時間を定義します。
- 失効しても実行中のTaskはCanceledになったりはしません。
- Default値は
1h
limit
pg_lock>
で指定した名前でLockを取得できる最大数を定義します。pg_lock>
で指定した名前に対する設定のうち複数箇所で異なるlimit
を設定していた場合はConfigExeception
が起こります。- Default値は
1
namespace
pg_lock>
で指定した名前でLockを取得する名前空間を定義します。名前空間は以下の6つから選択します。namespace
で指定した名前空間の中でlimit
に指定された数だけpg_lock>
で指定した名前のLockを取得できます。- Default値は
site
unlock_finished_attempt_locks
- 既に終了したTaskがLockを取得し続けている場合にLockを解放するかどうかを定義します。
- Digdag起動時に指定するConfigの一つである
pg_lock.digdag.host
に指定されたDigdagのHostへAPIアクセスを行い、Lockを取得したAttemptが終了しているかどうかを確認しています。 - Default値は
true
_do
- Lock取得中に実行したいTaskを記述します。
- 必須オプションです。
なお、Digdag起動時に指定するConfigもあるのですが利用するにあたって迷うことはないと思うので今回は省略します。README を参照してください。
ユースケース: _parallel: true
で並列数制御したい
ここからは想定ユースケースを説明していきます。1つ目は並列数制御をしたいというユースケースです。例を見てみましょう。
+run: loop>: 100 _parallel: true _do: +echo: echo>: "Sleep ${i % 2} seconds" +sleep: sh>: "sleep ${i % 2}"
この例ではloop>
Operatorで_parallel: true
が指定されているので100並列で_do
以下のTaskが実行されます。これを pg_lock>
Operatorを使って_do
以下のTaskが10並列で実行されるように変更してみましょう。
+run: loop>: 100 _parallel: true _do: pg_lock>: hoge namespace: attempt limit: 10 _do: +echo: echo>: "Sleep ${i % 2} seconds" +sleep: sh>: "sleep ${i % 2}"
hoge
という名前で10個までLockを取得できる設定を入れました。これでLockを取得できるまで_do
以下のTaskは実行されなくなるため10並列で実行されるようになります。なお、namespace: attempt
を指定しているので別のattemptが同時に動いていても影響はありません。便利ですね!
ユースケース: 特定のリソースへのアクセス数を制限したい
2つ目は特定のリソースへのアクセス数を制限したいというユースケースです。例を見てみましょう。
+run: +http-access: http>: http://takusan-tatakenai-api.com/sugoku_omoi/ store_content: true +echo: echo>: "${http.last_content}"
この例ではhttp>
Operatorでhttp://takusan-tatakenai-api.com/sugoku_omoi/
にGETリクエストを投げてResponse Bodyを表示するWorkflowです。
例の仮定として、このAPIは以下のような性質を持ってるとします。
- このAPIは裏ですごく重い処理が走り、Responseが返ってくるまで1時間かかる
- 並列アクセスすると5XX系のResponseを返してしまう
- 様々なWorkflowから参照される重要なAPI
このAPIアクセスを安全に行えるように変更してみましょう。
+run: +http-access: pg_lock>: http://takusan-tatakenai-api.com/sugoku_omoi/ wait_timeout: 3h expires_in: 1h10m _do: http>: http://takusan-tatakenai-api.com/sugoku_omoi/ store_content: true +echo: echo>: "${http.last_content}"
http://takusan-tatakenai-api.com/sugoku_omoi/
という名前でLockを取得する設定を入れました。wait_timeout
, expires_in
の設定はテキトーですが仕様上Responseが返ってくるまで1時間かかるのでどちらも1時間以上に設定する必要があるでしょう。limit
はDefault値1
、namespace
はDefault値site
なのでhttp://takusan-tatakenai-api.com/sugoku_omoi/
という名前のLockはDigdag全体で1つしか存在できません。このAPIアクセスを必要とする全てのWorkflowにこのpg_lock>
Operatorの設定を入れることで必ずシリアルにAPIアクセスが行われることを保証できます。
また、複数のDigdagを運用していたとしても pg_lock>
Operatorを利用すれば同様に安全なAPIアクセスが実現できます。
+run: +http-access: pg_lock>: http://takusan-tatakenai-api.com/sugoku_omoi/ namespace: global unlock_finished_attempt_locks: false wait_timeout: 3h expires_in: 1h10m _do: http>: http://takusan-tatakenai-api.com/sugoku_omoi/ store_content: true +echo: echo>: "${http.last_content}"
上の例ではnamespace: global
という設定をしました。これはpg_lock>
OperatorがLockのために利用しているPostgreSQL Database内でhttp://takusan-tatakenai-api.com/sugoku_omoi/
という名前のLockは1つしか存在できないことを意味します。複数のDigdagを運用していたとしてもpg_lock>
Operator用のPostgreSQL Databaseを共有することで安全なAPIアクセスを実現することができます。ただし、注意していただきたいのは複数のDigdagでnamespace: global
な同一名称のLockを取得する場合unlock_finished_attempt_locks: false
を設定する必要があることです。このOptionは既に終了済みのAttemptが保有している同一設定のLockを失効させるかどうかを指定するOptionですが、複数のDigdagでLockを共有している場合Lockを所有しているAttemptがどのDigdagのAttemptなのか判別できないため意図せずLockを失効させてしまう可能性があるからです。複数のDigdagでPostgreSQL Databaseを共有する場合はunlock_finished_attempt_locks: false
を設定する必要があると覚えておいて下さい。
ユースケース: Workflowの重複実行を避けたい
3つ目の想定ユースケースはWorkflowの重複実行を避けたいというユースケースです。例を見てみましょう。
schedule: cron>: "*/15 * * * *" +run: sh>: | set -e aws s3 rm --recursive s3://my-bucket/path/to/${session_date_compact}/ cat <<EOS > /tmp/${session_date_compact}-${attempt_id}.yml; embulk run /tmp/${session_date_compact}-${attempt_id}.yml in: type: http url: https://sample.example.com/api/v1/report/ params: - {name: apiKey, value: ${secret:sample_example_com.api_key}} - {name: date, value: ${session_date_compact}} parser: type: none out: type: s3_parquet bucket: my-bucket path_prefix: path/to/${session_date_compact}/data-${attempt_id}- EOS
この例では15分おきにsh>
Operatorを使ったEmbulk*3のConfig生成と実行を行っています。
例の仮定として、このWorkflowの目的を書いておきます。
https://sample.example.com/api/v1/report/
から取得できるレポートをS3に保存したい。https://sample.example.com/api/v1/report/
は日別のレポートしか提供していない。- 当日分のレポートは徐々に更新されるので高頻度でレポートを取得し、S3上のデータを常に最新に保ちたい。
こういった要件だとしてこのWorkflowには問題があります。それは同じ${session_date_compact}
を持つAttemptが複数同時に走ってしまった場合、s3://my-bucket/path/to/${session_date_compact}/
へ格納されるデータが重複する可能性があることです。このデータがどう利用されるかによってはpg_lock>
Operatorを使わなくても解決する方法はあるのですが、今回はpg_lock>
Operatorを使ってこの問題を解決してみましょう。
schedule: cron>: "*/15 * * * *" +run: pg_lock>: ${session_date_compact} namespace: workflow _do: sh>: | set -e aws s3 rm --recursive s3://my-bucket/path/to/${session_date_compact}/ cat <<EOS > /tmp/${session_date_compact}-${attempt_id}.yml; embulk run /tmp/${session_date_compact}-${attempt_id}.yml in: type: http url: https://sample.example.com/api/v1/report/ params: - {name: apiKey, value: ${secret:sample_example_com.api_key}} - {name: date, value: ${session_date_compact}} parser: type: none out: type: s3_parquet bucket: my-bucket path_prefix: path/to/${session_date_compact}/data-${attempt_id}- EOS
namespace: workflow
で${session_date_compact}
という名前のLockを取得することにしました。${session_date_compact}
はもちろん実行時には変数評価されるので例えば20191202
のような値になります。この設定により特定の日付に対するWorkflowの実行は必ず並列で走らないことを保証できます。このように取得したいデータのライフサイクルとWorkflowを実行したいスケジュールが異なる場合にもpg_lock>
Operatorは役立ちます。便利ですね!
開発時の苦悩
ここからは内部実装に興味がある人向けに実装時の苦悩をまとめていきます。本当に頭を悩ませていて今も正しく課題を解決できたのか自信がありません。他の解決策があれば優しく教えてもらえると嬉しいです。*4
pg_advisory_lockを使った行レベル排他ロックの実現
実装中limit
機能を実現するために非常に頭を悩ませました。というのも、PostgreSQLは存在しない行への排他ロックを取得できないからです。*5 そもそもLockはpg_lock>
Operatorが作ったPostgreSQL Table*6の1 Recordとして実現しています。limit
機能は同一名前空間内で同一名称を持つRecordの総数がlimit
で定義された数を超えてはいけないという制約であり、総数確認を行ってRecordがInsert可能であることを確認してからInsertを行わなければならないため、存在しない行への排他ロックを取得できなければ制約を満たせませんでした。そのため pg_advisory_lock
を利用することにしました。
pg_advisory_lock
は以下に引用したとおりbigintな数値1つかintな数値2つに対して排他ロックを取得できます。*7
pg_advisory_lock
locks an application-defined resource, which can be identified either by a single 64-bit key value or two 32-bit key values (note that these two key spaces do not overlap). If another session already holds a lock on the same resource identifier, this function will wait until the resource becomes available. The lock is exclusive.
名前空間と名前のそれぞれに対してハッシュ関数を通してintな数値に変換しpg_advisory_lock
を使うことで、論理的に存在しない行に対する排他ロックを実現することができました。*8
ConnectionPoolしてるのにConnection溢れる問題への対応
いざ本番導入してみたら謎のConnection溢れに悩まされました。自分としてはConnection PoolingしているのでDigdagの数に比例してConnectionが貼られるような実装をしたつもりでした。しかしながら実際にはそうなっておらずずっとConnectionが増えていっていました。コードを読み解いていくと以下のIssueに書いたような状態となっていました。
Digdagには起動時にLoadされるStaticなPluginとdigファイルで定義されたPluginの情報からDynamicにLoadされるPluginと2種類存在しています。前者はClasspathにjarを入れた上で起動して上げる必要があるためJavaに詳しい人でないと利用するのが難しく、僕は後者の使い方ができる実装を選択していました。しかしながらDynamicにLoadされるPluginは10分でOperatorのCacheが切れてしまい、OperatorのObjectおよびそのDependenciesが破棄されてしまう仕様になっていました。またその破棄のタイミングではGuiceのLifeCycleも効かず*9、Connection Poolの終了処理ができていない状態になっていました。苦肉の策としてfinalize()
メソッド内でConnectionのCloseを行う実装を行いましたが、あるべき姿ではないのでIssueに書いたようにOperatorのCacheが切れるタイミングでHookできるようになればいいなと思っています。
AttemptがCancelされてもLockを失効させられない問題
これはIssueを見てもらったほうが早いと思うのでこちらを見て下さい。 github.com
Digdagではtry...catch...
に相当する処理は_error
で実現できるのですがtry...catch...finally...
といった処理を実現することはできません。pg_lock>
OperatorではAttemptがどういった理由で終了していてもLockが解放されるように実装したかったのですが現在のDigdagの仕様上実装不能なのでdigdag-clientを使ってAPI経由でAttemptの状態を確認するようにしました。
おわりに
この記事ではdigdag-operator-pg_lockを詳細に紹介しました。機能としては単機能ですが、この機能はこれまでDigdagが抱えていた多くの課題のうち割と大きめな課題を解決してくれると信じています。ぜひ皆さんもご利用になっていただけると嬉しいです。そしてフィードバックをいただけると嬉しいです。よろしくおねがいします。
*1:コードを読む限り、Digdagは1つのPostgreSQLをマルチテナントで使えるようにSiteIdという単位で管理されています。正確にはそのSiteId毎に別れた名前空間です。
*2:複数のWorkerがいるという意味ではなく、Digdag Serverが利用するPostgreSQL Databaseレベルで別れているDigdagが複数という意味
*4:特にPostgreSQLガチ勢の意見を聞いてみたい
*5:PostgreSQLにはLOCK関数がありますが存在しない行へのロック取得ではない https://www.postgresql.org/docs/11/sql-lock.html
*6:https://github.com/civitaspo/digdag-operator-pg_lock/blob/0.0.3/src/main/scala/pro/civitaspo/digdag/plugin/pg_lock/pg/migration/V0_0_1_1__CreateTable_digdag_pg_locks.scala#L16-L28
*7:https://www.postgresql.org/docs/11/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS
*8:https://github.com/civitaspo/digdag-operator-pg_lock/blob/0.0.3/src/main/scala/pro/civitaspo/digdag/plugin/pg_lock/lock/PgLockOperator.scala#L113-L141