Gunosy Tech Blog

Gunosy Tech Blogは株式会社Gunosyのエンジニアが知見を共有する技術ブログです。

digdag-operator-pg_lockのご紹介

こちらは Gunosy Advent Calendar 2019、2日目の記事です。なお、昨日の記事はかとうさんのわかる Gunosy 2019でした。

qiita.com

はじめに

こんにちは、 Gunosy Tech Lab Data Reliability & MLOps Group の キヴィタスポ(人工知能) (@Civitaspo) / Twitter です。

昨年のAdvent Calendarでは公開したDigdag Pluginを淡々と紹介しました。 tech.gunosy.io 最近ではこれらのPluginを利用して90%ものコスト改善が行われるなど、社内で大活躍が見られています。 data.gunosy.io 皆さんも機会があれば是非利用してください!またフィードバックいただけるとありがたいです!

さて、この記事では昨年に引き続き新しく作ったDigdag Pluginを紹介します。 digdag-operator-pg_lockというPluginを1つ紹介するだけですが、Digdagを利用する上での様々な問題を解決する(はず)なので期待して読んでください! よろしくおねがいします。

もくじ

digdag-operator-pg_lockとは?

github.com

Digdag Workflow実行中にPostgreSQLを使ったLockを取得できるPluginです。

Digdag を利用していると_parallel: trueで並列実行したいけど特定のリソースにアクセスするときは並列数を絞りたいといったケースがよくあります。このPluginはそういった課題を解決するPluginです。

使い方

簡単に例を書いてから詳細な使い方を説明します。

_export:
  plugin:
    repositories:
      - https://jitpack.io
    dependencies:
      - pro.civitaspo:digdag-operator-pg_lock:0.0.3

+sample:
  _parallel: true
  +a:
    pg_lock>: hoge-lock
    _do:
      echo>: a
  +b:
    pg_lock>: hoge-lock
    _do:
      echo>: a

pg_lock>というOperator名で利用することが出来ます。Lockを取得している間に実行したいTaskの設定を_do以下に定義します。上記の例では +a+b というTaskで hoge-lock という名前でLockを取得しているため_parallel: trueを指定して並列実行していますが実際にはどちらか先にLockを取得した方の処理が完了するまでもう一方の処理は走ることはありません。

利用可能なOption

なんとなく使い方は把握できたと思うので利用できるオプションを詳細に説明していきます。

  • pg_lock>
    • 取得したいLockの名前を定義します。
    • 必須オプションです。
  • wait_timeout
    • Lock取得待ちをタイムアウトさせる時間を定義します。
    • Default値は 15m
  • expire_in
    • 取得したLockが失効するまでの時間を定義します。
    • 失効しても実行中のTaskはCanceledになったりはしません。
    • Default値は 1h
  • limit
    • pg_lock>で指定した名前でLockを取得できる最大数を定義します。
    • pg_lock>で指定した名前に対する設定のうち複数箇所で異なる limit を設定していた場合はConfigExeceptionが起こります。
    • Default値は 1
  • namespace
    • pg_lock>で指定した名前でLockを取得する名前空間を定義します。名前空間は以下の6つから選択します。
      • site: Digdag全体を対象とした名前空間です。*1
      • project: Project単位で独立した名前空間です。
      • workflow: Workflow単位で独立した名前空間です。
      • session: Session単位で独立した名前空間です。
      • attempt: Attempt単位で独立した名前空間です。
      • global: Globalな名前空間です。後述しますが、複数のDigdag*2を運用している場合に役立ちます。
    • namespace で指定した名前空間の中でlimitに指定された数だけpg_lock>で指定した名前のLockを取得できます。
    • Default値は site
  • unlock_finished_attempt_locks
    • 既に終了したTaskがLockを取得し続けている場合にLockを解放するかどうかを定義します。
    • Digdag起動時に指定するConfigの一つであるpg_lock.digdag.hostに指定されたDigdagのHostへAPIアクセスを行い、Lockを取得したAttemptが終了しているかどうかを確認しています。
    • Default値は true
  • _do
    • Lock取得中に実行したいTaskを記述します。
    • 必須オプションです。

なお、Digdag起動時に指定するConfigもあるのですが利用するにあたって迷うことはないと思うので今回は省略します。README を参照してください。

ユースケース: _parallel: trueで並列数制御したい

ここからは想定ユースケースを説明していきます。1つ目は並列数制御をしたいというユースケースです。例を見てみましょう。

+run:
  loop>: 100
  _parallel: true
  _do:
    +echo:
      echo>: "Sleep ${i % 2} seconds"
    +sleep:
      sh>: "sleep ${i % 2}"

この例ではloop> Operatorで_parallel: trueが指定されているので100並列で_do以下のTaskが実行されます。これを pg_lock> Operatorを使って_do以下のTaskが10並列で実行されるように変更してみましょう。

+run:
  loop>: 100
  _parallel: true
  _do:
    pg_lock>: hoge
    namespace: attempt
    limit: 10
    _do:
      +echo:
        echo>: "Sleep ${i % 2} seconds"
      +sleep:
        sh>: "sleep ${i % 2}"

hogeという名前で10個までLockを取得できる設定を入れました。これでLockを取得できるまで_do以下のTaskは実行されなくなるため10並列で実行されるようになります。なお、namespace: attemptを指定しているので別のattemptが同時に動いていても影響はありません。便利ですね!

ユースケース: 特定のリソースへのアクセス数を制限したい

2つ目は特定のリソースへのアクセス数を制限したいというユースケースです。例を見てみましょう。

+run:
  +http-access:
    http>: http://takusan-tatakenai-api.com/sugoku_omoi/
    store_content: true
  +echo:
    echo>: "${http.last_content}"

この例ではhttp> Operatorでhttp://takusan-tatakenai-api.com/sugoku_omoi/にGETリクエストを投げてResponse Bodyを表示するWorkflowです。

例の仮定として、このAPIは以下のような性質を持ってるとします。

  • このAPIは裏ですごく重い処理が走り、Responseが返ってくるまで1時間かかる
  • 並列アクセスすると5XX系のResponseを返してしまう
  • 様々なWorkflowから参照される重要なAPI

このAPIアクセスを安全に行えるように変更してみましょう。

+run:
  +http-access:
    pg_lock>: http://takusan-tatakenai-api.com/sugoku_omoi/
    wait_timeout: 3h
    expires_in: 1h10m
    _do:
      http>: http://takusan-tatakenai-api.com/sugoku_omoi/
      store_content: true
  +echo:
    echo>: "${http.last_content}"

http://takusan-tatakenai-api.com/sugoku_omoi/という名前でLockを取得する設定を入れました。wait_timeout, expires_inの設定はテキトーですが仕様上Responseが返ってくるまで1時間かかるのでどちらも1時間以上に設定する必要があるでしょう。limitはDefault値1namespace はDefault値siteなのでhttp://takusan-tatakenai-api.com/sugoku_omoi/という名前のLockはDigdag全体で1つしか存在できません。このAPIアクセスを必要とする全てのWorkflowにこのpg_lock> Operatorの設定を入れることで必ずシリアルにAPIアクセスが行われることを保証できます。

また、複数のDigdagを運用していたとしても pg_lock> Operatorを利用すれば同様に安全なAPIアクセスが実現できます。

+run:
  +http-access:
    pg_lock>: http://takusan-tatakenai-api.com/sugoku_omoi/
    namespace: global
    unlock_finished_attempt_locks: false
    wait_timeout: 3h
    expires_in: 1h10m
    _do:
      http>: http://takusan-tatakenai-api.com/sugoku_omoi/
      store_content: true
  +echo:
    echo>: "${http.last_content}"

上の例ではnamespace: globalという設定をしました。これはpg_lock> OperatorがLockのために利用しているPostgreSQL Database内でhttp://takusan-tatakenai-api.com/sugoku_omoi/という名前のLockは1つしか存在できないことを意味します。複数のDigdagを運用していたとしてもpg_lock> Operator用のPostgreSQL Databaseを共有することで安全なAPIアクセスを実現することができます。ただし、注意していただきたいのは複数のDigdagでnamespace: globalな同一名称のLockを取得する場合unlock_finished_attempt_locks: falseを設定する必要があることです。このOptionは既に終了済みのAttemptが保有している同一設定のLockを失効させるかどうかを指定するOptionですが、複数のDigdagでLockを共有している場合Lockを所有しているAttemptがどのDigdagのAttemptなのか判別できないため意図せずLockを失効させてしまう可能性があるからです。複数のDigdagでPostgreSQL Databaseを共有する場合はunlock_finished_attempt_locks: falseを設定する必要があると覚えておいて下さい。

ユースケース: Workflowの重複実行を避けたい

3つ目の想定ユースケースはWorkflowの重複実行を避けたいというユースケースです。例を見てみましょう。

schedule:
  cron>: "*/15 * * * *"

+run:
  sh>: |
    set -e
    aws s3 rm --recursive s3://my-bucket/path/to/${session_date_compact}/
    cat <<EOS > /tmp/${session_date_compact}-${attempt_id}.yml; embulk run /tmp/${session_date_compact}-${attempt_id}.yml
    in:
      type: http
      url: https://sample.example.com/api/v1/report/
      params:
        - {name: apiKey, value: ${secret:sample_example_com.api_key}}
        - {name: date, value: ${session_date_compact}}
      parser:
        type: none
    out:
      type: s3_parquet
      bucket: my-bucket
      path_prefix: path/to/${session_date_compact}/data-${attempt_id}-
    EOS

この例では15分おきにsh> Operatorを使ったEmbulk*3のConfig生成と実行を行っています。

例の仮定として、このWorkflowの目的を書いておきます。

  • https://sample.example.com/api/v1/report/から取得できるレポートをS3に保存したい。
  • https://sample.example.com/api/v1/report/は日別のレポートしか提供していない。
  • 当日分のレポートは徐々に更新されるので高頻度でレポートを取得し、S3上のデータを常に最新に保ちたい。

こういった要件だとしてこのWorkflowには問題があります。それは同じ${session_date_compact}を持つAttemptが複数同時に走ってしまった場合、s3://my-bucket/path/to/${session_date_compact}/へ格納されるデータが重複する可能性があることです。このデータがどう利用されるかによってはpg_lock> Operatorを使わなくても解決する方法はあるのですが、今回はpg_lock> Operatorを使ってこの問題を解決してみましょう。

schedule:
  cron>: "*/15 * * * *"

+run:
  pg_lock>: ${session_date_compact}
  namespace: workflow
  _do:
    sh>: |
      set -e
      aws s3 rm --recursive s3://my-bucket/path/to/${session_date_compact}/
      cat <<EOS > /tmp/${session_date_compact}-${attempt_id}.yml; embulk run /tmp/${session_date_compact}-${attempt_id}.yml
      in:
        type: http
        url: https://sample.example.com/api/v1/report/
        params:
          - {name: apiKey, value: ${secret:sample_example_com.api_key}}
          - {name: date, value: ${session_date_compact}}
        parser:
          type: none
      out:
        type: s3_parquet
        bucket: my-bucket
        path_prefix: path/to/${session_date_compact}/data-${attempt_id}-
      EOS

namespace: workflow${session_date_compact}という名前のLockを取得することにしました。${session_date_compact}はもちろん実行時には変数評価されるので例えば20191202のような値になります。この設定により特定の日付に対するWorkflowの実行は必ず並列で走らないことを保証できます。このように取得したいデータのライフサイクルとWorkflowを実行したいスケジュールが異なる場合にもpg_lock> Operatorは役立ちます。便利ですね!

開発時の苦悩

ここからは内部実装に興味がある人向けに実装時の苦悩をまとめていきます。本当に頭を悩ませていて今も正しく課題を解決できたのか自信がありません。他の解決策があれば優しく教えてもらえると嬉しいです。*4

pg_advisory_lockを使った行レベル排他ロックの実現

実装中limit機能を実現するために非常に頭を悩ませました。というのも、PostgreSQLは存在しない行への排他ロックを取得できないからです。*5 そもそもLockはpg_lock> Operatorが作ったPostgreSQL Table*6の1 Recordとして実現しています。limit機能は同一名前空間内で同一名称を持つRecordの総数がlimitで定義された数を超えてはいけないという制約であり、総数確認を行ってRecordがInsert可能であることを確認してからInsertを行わなければならないため、存在しない行への排他ロックを取得できなければ制約を満たせませんでした。そのため pg_advisory_lockを利用することにしました。

pg_advisory_lockは以下に引用したとおりbigintな数値1つかintな数値2つに対して排他ロックを取得できます。*7

pg_advisory_lock locks an application-defined resource, which can be identified either by a single 64-bit key value or two 32-bit key values (note that these two key spaces do not overlap). If another session already holds a lock on the same resource identifier, this function will wait until the resource becomes available. The lock is exclusive.

名前空間と名前のそれぞれに対してハッシュ関数を通してintな数値に変換しpg_advisory_lockを使うことで、論理的に存在しない行に対する排他ロックを実現することができました。*8

ConnectionPoolしてるのにConnection溢れる問題への対応

いざ本番導入してみたら謎のConnection溢れに悩まされました。自分としてはConnection PoolingしているのでDigdagの数に比例してConnectionが貼られるような実装をしたつもりでした。しかしながら実際にはそうなっておらずずっとConnectionが増えていっていました。コードを読み解いていくと以下のIssueに書いたような状態となっていました。

github.com

Digdagには起動時にLoadされるStaticなPluginとdigファイルで定義されたPluginの情報からDynamicにLoadされるPluginと2種類存在しています。前者はClasspathにjarを入れた上で起動して上げる必要があるためJavaに詳しい人でないと利用するのが難しく、僕は後者の使い方ができる実装を選択していました。しかしながらDynamicにLoadされるPluginは10分でOperatorのCacheが切れてしまい、OperatorのObjectおよびそのDependenciesが破棄されてしまう仕様になっていました。またその破棄のタイミングではGuiceのLifeCycleも効かず*9、Connection Poolの終了処理ができていない状態になっていました。苦肉の策としてfinalize()メソッド内でConnectionのCloseを行う実装を行いましたが、あるべき姿ではないのでIssueに書いたようにOperatorのCacheが切れるタイミングでHookできるようになればいいなと思っています。

AttemptがCancelされてもLockを失効させられない問題

これはIssueを見てもらったほうが早いと思うのでこちらを見て下さい。 github.com

Digdagではtry...catch...に相当する処理は_errorで実現できるのですがtry...catch...finally...といった処理を実現することはできません。pg_lock> OperatorではAttemptがどういった理由で終了していてもLockが解放されるように実装したかったのですが現在のDigdagの仕様上実装不能なのでdigdag-clientを使ってAPI経由でAttemptの状態を確認するようにしました。

おわりに

この記事ではdigdag-operator-pg_lockを詳細に紹介しました。機能としては単機能ですが、この機能はこれまでDigdagが抱えていた多くの課題のうち割と大きめな課題を解決してくれると信じています。ぜひ皆さんもご利用になっていただけると嬉しいです。そしてフィードバックをいただけると嬉しいです。よろしくおねがいします。

*1:コードを読む限り、Digdagは1つのPostgreSQLをマルチテナントで使えるようにSiteIdという単位で管理されています。正確にはそのSiteId毎に別れた名前空間です。

*2:複数のWorkerがいるという意味ではなく、Digdag Serverが利用するPostgreSQL Databaseレベルで別れているDigdagが複数という意味

*3:https://www.embulk.org/

*4:特にPostgreSQLガチ勢の意見を聞いてみたい

*5:PostgreSQLにはLOCK関数がありますが存在しない行へのロック取得ではない https://www.postgresql.org/docs/11/sql-lock.html

*6:https://github.com/civitaspo/digdag-operator-pg_lock/blob/0.0.3/src/main/scala/pro/civitaspo/digdag/plugin/pg_lock/pg/migration/V0_0_1_1__CreateTable_digdag_pg_locks.scala#L16-L28

*7:https://www.postgresql.org/docs/11/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS

*8:https://github.com/civitaspo/digdag-operator-pg_lock/blob/0.0.3/src/main/scala/pro/civitaspo/digdag/plugin/pg_lock/lock/PgLockOperator.scala#L113-L141

*9:https://github.com/embulk/guice-bootstrap