Gunosy Tech Blog

Gunosy Tech Blogは株式会社Gunosyのエンジニアが知見を共有する技術ブログです。

digdag の retry と for_each を使う時のハマりどころをなんとかして回避したお話

はじめに

DR & MLOps チームの菊地です。

弊チームはデータ処理のための Workflow Engine として digdag を採用していますが、この記事では digdag を利用する中で出会った、あるハマりどころとそのワークアラウンドを紹介したいと思います。

ハマりどころ詳細

ハマりどころは retry と for_each Operator を組み合わせて利用した場合に発生するもので、下記の issue で報告されています。

github.com

こちらは digdag の 0.9.32 以降のバージョンで、 retry 以下の Task に for_each が含まれる場合で、for_each 以下の Task でエラーが発生したとします。このとき、retry 後の for_each 以下の Task が多重に実行されてしまうというハマりどころです。

_retry:
  limit: 3

+run:
  for_each>:
    v: [0, 1, 2]
  _do:
    +echo:
      echo>: ${v}

    +failed:
      fail>: "error here!"

例えば、上記の dig ファイルであれば、 fail でエラーが発生してから、retry が発生します。その後、 for_each 以下の Task(echo や fail)が1回しか実行されないのが正しい状態です。しかしながら digdag 0.9.32 以降では retry 後に for_each 以下の Task(echo や fail)が複数回実行されてしまいます。

上記 dig の実行画面。最初にエラーが発生した後、retry が実行されますが、その後では echo と run が2回実行されています

もちろん 0.9.31 以前に digdag のバージョンを戻せば正しく動きますが、plugin を用いている場合では戻すことが難しいケースもあります。なぜなら、digdag が https で maven へ接続するようになったのは、バージョン 0.9.41 からだからです。maven は 2020年1月15日から https での接続を要求されるようになりました。そのため、バージョンを下げてしまうと、 plugin を取得できないエラーが発生する場合があります。

github.com

さらに、 この issue へ対応する Pull Request が バージョン 0.10 用の branch へ merge されているのもの、こちらはいつリリースされるかは執筆時点ではアナウンスされていません。

github.com

そのため弊チームでも、この issue のワークアラウンドを考える必要がありました。

ワークアラウンド

この問題は同一 attempt を構成する Workflow 内において、retry 以下のタスクに for_each が含まれている場合に発生します。これを回避するための方針として、retry を含む処理と for_each 含む処理を分割し、別々の Workflow にすることにしました。ここでは便宜上、 retry を含む Workflow を schedule-layer とし、 for_each を含む Workflow を core-layer とします。

次に schedule-layer から core-layer を別 attempt で動かすために、Require Operator を利用します。

docs.digdag.io

Require Operator の特徴として、参照先の Workflow を異なる attempt で実行することができ、参照先の attempt が終了するまで参照元が待ち続ける挙動をします。他の Workflow を参照するものにInclude Directive や Call Operator がありますが、参照元の attempt に含まれる形で動作するため、Require Operator を用いる必要があります。

ワークアラウンド詳細

この章では上記ワークアラウンドを実際に実装した順序で説明していきます。このワークアラウンドを実施することで回避したかったハマりどころは回避できました。しかしながら、実施する中で以下の2つの問題が発生しました。

  • retry_attempt_name に毎回ランダム文字列入れる必要がある
  • retry_attempt_name を事前に評価して静的な値にする必要がある この章ではこれらの問題を解消しつつ、どのようにハマりどころが回避されたのかコードを示しながら説明したいと思います。

Require Operator で for_each を含む Workflow を別 attempt で実行する

まず、schedule-layer で Require Operator を用いて、core-layer と依存関係を持たせてみました。

schedule-layer.dig

_retry:
  limit: 3
require>: core-layer

core-layer.dig

+run:
  for_each>:
    v: [0, 1, 2]
  _do:
    +echo:
      echo>: ${v}
  
    +failed:
      fail>: "error here!"

しかし、このコードでは core-layer 側で retry 後に 新しい attempt が動くことはなく、のぞむ挙動にはなりませんでした。そのため、Require Operator のコードを確認してみました。Require Operator は下記の箇所で attempt を取得しています。

https://github.com/treasure-data/digdag/blob/v0.9.41/digdag-core/src/main/java/io/digdag/core/agent/RequireOperatorFactory.java#L75-L81

この attempt を取得している startSession という関数を読み進めてみると、attempt を登録する箇所にたどり着きます。

https://github.com/treasure-data/digdag/blob/v0.9.41/digdag-core/src/main/java/io/digdag/core/agent/InProcessTaskCallbackApi.java#L176-L193

このコードは新規 attempt を登録するコードですが、既に登録済みである場合は SessionAttemptConflictException が発生し、登録済みの attempt が返されます。この登録済みを判定する条件は「実行しようとしている session に紐づく attempts に同一の attempt_name を含む attempt が存在しているか否か」です。今回の場合は attempt_name = retry_attempt_name なので、同一の retry_attempt_name のまま Require Operator を実行しても既に実行済みの attempt を取得してしまいます。 つまり、 毎回 Require Operator の引数である retry_attempt_name に異なる文字列を渡すことができれば、retry 後に異なる attempt が生成されて、core-layer を retry 後に異なる attempt で動かすことができそうです。

Require Operator の retry_attempt_name にランダム文字列を渡す

上記のことを踏まえて、Require Operator の retry_attempt_name にランダム文字列を渡して retry が正しく動くかを確認しました。

_retry: 
  limit: 3
require>: core-layer
retry_attempt_name: ${Math.random().toString(36).slice(-8)}

しかしながら、これでも schedule-layer 側で適切に retry を制御できる状態にはなりませんでした。 なんと core-layer が無限に実行される状態になってしまったのです。原因となっている箇所は下記の箇所です。

https://github.com/treasure-data/digdag/blob/v0.9.41/digdag-core/src/main/java/io/digdag/core/agent/RequireOperatorFactory.java#L95

digdag は Operator 内部で TaskExecutionException が発生した際に retry_interval で設定された時間 sleep してから、Operator が再実行される仕組みになっています。

https://github.com/treasure-data/digdag/blob/v0.9.41/digdag-core/src/main/java/io/digdag/core/agent/OperatorManager.java#L139-L159

Require Operator もその仕組みを用いて対象 attempt が終了したかどうかを polling をしています。そのため、Require Operator が polling している時には内部的に該当 Task の retry が発生します。digdag は変数を遅延評価するので、この retry 時に retry_attempt_name を毎回評価してしまいます。つまり retry のたびに ${Math.random().toString(36).slice(-8)} が評価されてしまっていたのです。この結果 attempt が無限に実行されてしまう挙動をしてしまいます。

retry_attempt_name をretry 後毎回評価し、静的な値として用いる

上記のことから retry_attempt_name を Require Operator に渡す前に、毎回ランダム文字列を評価し静的な値にする必要がありました。そこで用いたのは、 digdag-plugin-param です。

github.com

この plugin は実は civitaspo さんが以前紹介している digdag の plugin の一つです。

tech.gunosy.io

これを用いて、ランダム文字列を事前に毎回評価し静的にした上で retry_attempt_name に格納します。Require Operator 内部では、polling をする際に発生してしまう retry 時においても、retry_attempt_name が事前に静的になっているために、新しい attempt を生成することがありません。

完成したコード

これらの問題を回避したコードがこちらです。

schedule-layer.dig

_export:
  plugin:
    repositories:
      - https://jitpack.io
    dependencies:
      - pro.civitaspo:digdag-operator-param:0.0.2

_retry:
  limit: 3

+run:
  +eval:
    param_eval>: retry_attempt_name
    _export:
      retry_attempt_name: ${Math.random().toString(36).slice(-8)}
  +req:
    require>: core-layer

core-layer.dig

+run:
  for_each>:
    v: [0, 1, 2]
  _do:
    +echo:
      echo>: ${v}
  
    +failed:
      fail>: "error here!"

まとめ

digdag の 0.9.32 以降のバージョンでは、retry と for_each を用いる際にハマりどころがありました。そのワークアラウンドとしてそのハマりどころから逃れる方法としてWorkflow を二つに分割して、別 attempt で実行する方法を示しました。ただし、お察しの通りこの方法は分割したWorkflow同士の関係が見えづらいなど運用上の課題があるので根治対応が早くなされることを願っています。

DR&MLOps チームはデータの信頼性を担保するという重要な仕事をしています。こういう部署があるので、分析者や機械学習エンジニアが輝けるものと僕は信じています。このような仕事に興味のある方、もしくは分析や機械学習をやりたい方は是非とも来てください。

https://hrmos.co/pages/gunosy/jobs/0000204hrmos.co

https://hrmos.co/pages/gunosy/jobs/0000004hrmos.co