Gunosy Tech Blog

Gunosy Tech Blogは株式会社Gunosyのエンジニアが知見を共有する技術ブログです。

大規模データ基盤における冪等性を確保した dbt のオーケストレーション

はじめに

DRE&MLOps チームの hyamamoto です。

この記事は Gunosy Advent Calendar 2023 の 24 日目の記事です。 23 日目の記事は Liang さんのCircleCI + Android UI Test スクリーンショットの確認仕組みでした。

さて、本記事では大規模データ基盤における冪等性を確保した dbt のオーケストレーションについて紹介します。 今回 DRE&MLOps ではデータ基盤における変換処理を dbt に移行するプロジェクトを進めてきました。

tech.gunosy.io

上記の記事では dbt のクエリをデグレなく移行するための仕組みについて内容を深掘って紹介しました。 今回の記事では、弊社で生じた移行時のもう一つの課題である dbt 実行のオーケストレーションについて紹介します。

データ基盤におけるバッチのオーケストレーション

まず、弊社のデータ基盤の既存の仕組みについて簡単に紹介します。 より詳細な内容は先程上げた記事の移行前のデータ変換処理の仕組みにゆずりつつ、ここでは弊社のデータ基盤におけるバッチのオーケストレーション方法にフォーカスしてお伝えします。

弊社ではバッチのオーケストレーションを Digdag を使っていっています。 この Digdag サーバーは EKS 上に構築されており、一部の処理については k8s Job として Digdag から呼び出されています。

データ変換については基本的に 1 テーブル を 1 ワークフローの粒度で行っており、かなりこまごまと存在しています。 このような粒度で管理することは、ワークフローの実行スケジュールを柔軟に設定できること、エラーハンドリングを簡便に保つことや backfill を容易にすることを目的としています。

移行時の課題

以上の背景を踏まえて dbt に移行する際に生じた課題は以下の通りです。

  • Digdag workflow から dbt を実行する仕組みを用意する必要がある
    • 粒度の細かい実行のために、dbt に適切なパラメーターを渡す必要がある
  • backfill を実行できるように、実行時の冪等性を確保する必要がある

これらの問題を解決するために Helm を用いて k8s Job として dbt を実行する仕組みを用意しました。

Helm を活用した dbt 実行基盤の構築

それでは、Helm を用いて k8s Job として dbt を実行する仕組みについて詳しく紹介していきます。

Helm の採用理由

まず、なぜ Helm を採用したのかについて説明します。 Helm は k8s 上でアプリケーションをデプロイするためのツールです。 go template に基づき、柔軟に k8s のリソースを生成することができます。

この柔軟性が移行時の課題を解決するために非常に有効であると考えました。 というのも、様々な dbt CLI のパラメータを Digdag から外挿する必要があったためです。

Helm による dbt を実行する template の記述

上記の課題を解決するために、dbt を実行する k8s Job の container 部分は以下のように記述しました。 例からも分かるように --selects パラメータや --vars パラメータを外挿できる形になっています。 また、--selects パラメータを渡すときに入力の配列をループして展開できていることは、Helm の強みだと感じています。

# ...
containers:
  - name: dbt-job
    image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}"
    command:
      - "sh"
    args:
      - "-c"
      - |
        dbt build \
        {{- with .Values.dbtRunParameters.selects }}
            {{- range . }}
            {{- printf "--select %s \\" . | nindent 16 }}
            {{- end }}
        {{- end }}
        --vars {{ .Values.dbtRunParameters.vars | toJson | quote }}
    volumeMounts:
      - mountPath: /usr/app/dbt/.dbt
        name: profiles-volume
# ...

さらに dbt の profiles についても ConfigMap を用いることで柔軟性と管理のしやすさを両立させました。

apiVersion: v1
kind: ConfigMap
metadata:
  name: { { include "dbt-job.fullname" . } }
  labels: { { - include "dbt-job.labels" . | nindent 4 } }
data:
  profiles.yml: |
    {{- toYaml .Values.dbt.profiles | nindent 4 }}

Helm による k8s Job の実行

次に Helm による k8s Job の実行方法です。 Helm を利用されている方ならイメージが湧きやすいかもしれませんが、基本的に Helm は永続的なアプリケーションをデプロイするためのツールです。 そのため、Helm を用いてリリースされたものは Release という形で管理されます。

一方で今回の要件は永続的なアプリケーションではなく、一度実行したら終了するような k8s Job であり、そのためには Release として管理されると不都合が生じます。

この問題を解決するための helm apply を実行するのではなく、Helm によってレンダリングされた k8s のリソースを kubectl apply することで解決しました。

以下は Digdag 内で記述している dbt の実行部分を簡便化したものです。 一部 Digdag で利用できる JavaScript ベースの変数テンプレートを利用しています。

mkdir -p tmpdir/
# rendering values.yaml
cat << EOF > tmpdir/values.yaml
dbt:
  profiles:
    project_name:
      target: ${env_short}
      config:
        debug: ${typeof(dbt_debug) == 'undefined' ? false:dbt_debug}
        log_format: ${typeof(dbt_log_format) == 'undefined' ? 'json':dbt_log_format}
  dbtRunParameters:
    selects: ${selects}
    vars: ${vars}
EOF

# login to helm registry
aws ecr get-login-password --region ap-northeast-1 | helm registry login --username AWS --password-stdin ${ecr_registry}

# rendering downloaded manifests
helm pull --destination tmpdir/ --version X.Y.Z --untar --untardir . oci://${ecr_registry}/dbt-job
helm template tmpdir/dbt-job -f tmpdir/values.yaml > tmpdir/manifests.yaml

# run kubernetes job
kubectl apply -f tmpdir/manifests.yaml

このようなスクリプトを Digdag 内から呼び出すことで、Helm Release ではなく単なる k8s Job として dbt を実行することができます。 さらに、values.yaml をワークフロー内で生成することで、Helm の柔軟性を活かして実行時に必要なパラメータ外装することも可能になります。

実行時の冪等性を確保する dbt の工夫

最後に、実行時の冪等性を確保するための dbt の工夫について紹介します。 弊社ではログなどの大規模なデータの変換には incremental model を活用しています。 incremental model の特徴として、時間に応じて部分的なデータを更新するというものがあります。

dbt のよくあるチュートリアルでは incremental model に対して current_time を基準にしてデータを取得しているものが多く見られます。 しかしながら、current_time に依存してしまうと冪等な結果を得ることができず、エラー発生時などに過去のデータを取り直すということが難しくなってしまいます。

そこで、Digdag の cron が発火した論理的な時間である session_time を dbt に外挿できるようにする仕組みを設けることで、冪等性を確保しました。 例えば、2023-12-01 00:00:00 に cron が発火した workflow の session_time は、実行された時間に依らず(例えば、後ほど workflow のリトライをしても変わらず) 2023-12-01 00:00:00 になります。

session_time の外挿としては --vars パラメータを利用しており、次のようなマクロを用意しました。

{%- macro digdag_session_time_jst() -%}
    at_timezone(from_iso8601_timestamp('{{ var("digdag_session_time") }}'), 'Asia/Tokyo')
{%- endmacro -%}

このマクロに基づいてデータ変換対象の時間軸を絞り込むことで、Digdag のワークフローとして冪等なデータ変換を実現することができました。

さいごに

以上のように、Helm を用いて k8s Job として dbt を実行する仕組みを用意しました。 今回は弊社の元々の技術スタックが Digdag であったため、Digdag の機能にフォーカスした面も多かったですが、Helm による k8s Job の実行は汎用的に利用できると考えています。 また、dbt の実行に対する冪等性の確保についても、他のワークフロー管理ツールでも応用できると考えています。 今回ご紹介した内容が皆様のデータ基盤の運用において参考になれば幸いです。

最終日は koid さんの記事です! お楽しみに!