はじめに
DRE&MLOps チームの hyamamoto です。
この記事は Gunosy Advent Calendar 2023 の 5 日目の記事です。 4 日目の記事は m-hamashita さんの企業テックブログのレビューを GitHub 上でおこなっている話でした。
DRE&MLOps ではデータ基盤における変換処理を dbt に移行するプロジェクトを進めてきました。 移行作業に際して重要な項目として、以下の点が挙げられると思います。
- 既存のデータ変換クエリを正確に dbt に移行できるか
- 属人化せずにスムーズに移行作業を進められるか
この記事では、移行前の弊社におけるデータ変換処理の仕組みを紹介し、その上でどのようなアプローチによって dbt 移行作業を進めたかについて紹介します。 また、dbt については以前 dbt の仕組みに関する紹介記事を書いたのでよかったら参照してもらえると嬉しいです。
移行前のデータ変換処理の仕組み
弊社のデータ基盤 Baikal の説明は以前この記事で紹介させていただきました。
上記の内容から dbt 移行に際して関係する主要な内容としては以下の内容が挙げられます。
- 各種バッチを Digdag によって管理している
- digdag-operator-athena plugin を用いて Athena API を叩いてデータ変換を行なっている
まとめると、移行前の状態でもいわゆる ELT アーキテクチャを採用しており、その Transform 部分の実行を Digdag のプラグインを用いて Athena API を叩いて行なっているという状態です。 よって、dbt 移行の目的は以下の 2 点です。
- Digdag workflow で記述していた SQL を dbt に移行する
- Digdag workflow では workflow の変数を SQL に埋め込んでいた
- Digdag workflow から dbt を呼び出し、オーケストレーションする
本記事では前者の Digdag workflow で記述していた SQL を dbt に移行する際の工夫について紹介します。
なお、ここで言及する SQL の移行とは、Digdag 特有の変数埋め込みの記法を dbt 向けに書き換えることや、モデル間の依存関係を ref
や source
によって表現することを指します。
移行時の課題
SQL の移行に際してもっとも注意しなければならない点は、移行前後でデグレが発生してしまい変換ロジックが変わってしまうことです。 一方で、移行後のクエリをコンパイルして Redash などで実行し、既存テーブルとの結果を逐一比較することは非常に手間がかかり、属人化したり移行作業のスピードを落としたりする原因になります。
よって、この問題を解決するために CLI ツールの作成と GitHub Actions を用いた自動テストの仕組みを導入しました。
移行時デグレの検知の仕組み
CLI の整備
まず、dbt を CI で実行するにあたっての課題として、実行対象のモデルを指定する必要があるという点が挙げられます。 dbt の build 機能はデフォルトでは全てのモデルを実行するため、すでにデータ基盤自体は成熟していて大量にモデルを抱えたプロジェクトでは、CI の実行時間増加による開発者体験の悪化やコスト増加に繋がります。 そこで PR に含まれる変更を検知しその変更内容から dbt 実行時の引数に渡すパラメーターを生成する CLI ツールを作成しました。 実行内容の方向性は Terraform の plan と apply のようなものを想定しています。
変更内容の検知
変更内容の検知には dbt が出力する manifest.json
を用いて行います。
GitHub Actions の全体像は後ほど示しますが、事前に S3 に manifest.json
をアップロードしておき、その差分を検知することで変更内容を検知しています。
manifest.json
のパースに関しては、以下のライブラリを用いています。
検知された差分の結果と Glue の API と比較を行うことで、dbt 上の差分が最終的に Glue 上にどのような影響を与えるかを確認しています。 さらに、削除したモデルのテーブル削除にもこの機能をベースに対応しています*1。
パラメーターの生成
この CLI は最終的に json 形式で差分を出力します。
出力された json は後ほど dbt の CLI に渡しやすいように整形しており、jq
を用いることで効率的に利用できるようにしています。
例えば、dbt build
コマンドの --select
option に渡すためのパラメーターをこの json に保持させています。
GitHub Actions の整備
それでは次に整備した GitHub Actions の内容について説明していきます。 大きく以下の 3 つのジョブを用意しています。
- Continuous Integration (CI)
- Continuous Delivery (CD)
- apply-catalogs: plan-catalogs の結果を受けて変更を反映するジョブ
なお、以下において今回作成した CLI を Baikal dbt Tools と呼称します。
また、dbt artifacts とは manifest.json
を含む dbt の target フォルダ配下のデータを指します。
Continuous Integration (CI)
plan-catalogs
plan-catalogs は行った変更がどのような影響を及ぼすかを確認するためのジョブです。 また、その変更内容の差分の結果を json 化して S3 に保存します。
このジョブは以下のような流れで実行されます。
sequenceDiagram participant ga as GitHub Actions participant dbt as dbt participant tools as Baikal dbt Tools participant s3 as S3 ga->>s3: dbt artifacts を取得 s3-->> ga: dbt artifacts を返す ga->>dbt: dbt compile の実行 dbt-->> ga: dbt artifacts を返す ga->>tools: S3 の artifacts と手元の artifacts を比較 tools-->> ga: 差分を plan.json として出力 ga->>ga: PR に変更差分をコメント ga->>s3: plan.json を S3 に保存 s3-->> ga: plan.json を保存したことを返す
なお、対象とする S3 のバケットは PR のマージ先の環境を選択しています。 例えば、feature ブランチの PR の場合は dev 環境のバケットを参照し dbt artifacts を取得します。 PR のコメント結果の例としては下記のようなものです。
audit-catalogs
audit-catalogs は feature branch に対して実行されるジョブで、stg 環境を用いて実際のデータを使ったテストを行います。
このジョブは以下のような流れで実行されます。
sequenceDiagram participant ga as GitHub Actions participant dbt as dbt participant tools as Baikal dbt Tools participant s3 as S3 ga->>s3: dbt artifacts を取得 s3-->> ga: dbt artifacts を返す ga->>dbt: dbt compile の実行 dbt-->> ga: dbt artifacts を返す ga->>tools: S3 の artifacts と手元の artifacts を比較 tools-->> ga: 差分を plan.json として出力 note over ga: 以下の内容はすべて plan.json の差分によって決定される ga->>dbt: dbt build の実行<br/>(build には test も含まれる) dbt-->>ga: dbt build の結果を返す ga->>dbt: stg のテーブルと build の結果を比較 dbt-->>ga: 比較結果を返す ga->>ga: PR に比較結果をコメント ga->>dbt: build の結果から profile を出力 dbt-->>ga: profile を返す ga->>ga: PR に profile の結果をコメント
audit-catalogs は実質 plan-catalogs と後続の apply-catalogs の両方を実行しています。 これにより、実際のデータを使った動作確認や既存モデルとの比較、モデルの profile 情報の出力が可能になっています。 なお、差分検知や profile 情報の出力に関する詳しい内容については、audit で実行されていることの詳細 に後述します
Continuous Delivery (CD)
apply-catalogs
apply-catalogs では plan-catalogs で作成された plan.json を用いて、dbt コマンドを実行します。
このジョブは以下のような流れで実行されます。
sequenceDiagram participant ga as GitHub Actions participant dbt as dbt participant tools as Baikal dbt Tools participant s3 as S3 ga->>s3: plan.json を取得 s3-->> ga: plan.json を返す note over ga: 以下の内容はすべて plan.json の差分によって決定される ga->>tools: 削除されたモデル情報から Glue のテーブルを削除 tools-->> ga: 削除したテーブルの情報を返す ga->>dbt: *View の変更に対してのみ* dbt build の実行 dbt-->> ga: dbt build の結果を返す ga->>s3: S3 に artifacts を保存 s3-->> ga: 保存したことを返す
このジョブによって以下のことが達成されます。
- 削除検知によるテーブルの削除
- View の変更を適用*4
- S3 上の artifacts を更新
audit-catalogs で実行されていることの詳細仕様
ここでは、audit-catalogs で実行されていることの詳細について説明します。
audit-catalogs で実行されている処理
audit-catalogs では大きく以下の 3 つの内容を実行しています。
dbt build
の実行dbt-audit-helper
の実行dbt-profiler
の実行
dbt build
の実行
dbt build
は dbt compile
と dbt test
を実行するコマンドです。
これにより、実際のデータを用いて CI のタイミングでモデルのテストを実行することが可能になります。
dbt-audit-helper の実行
dbt-audit-helper は 2 つのテーブルを比較し、その差分を出力するツールです。
このツールを実際の stg 環境のテーブルと dbt_sandbox
以下に作成された CI 向けの一時テーブルを比較することで、テーブルの差分を検知しています。
移行時にはこの結果を PR のコメントとして出力することで、作業者やレビューワーが変更によるデグレが発生していないかを確認することが可能になります。
⚠️ dbt-audit-helper は SQL を用いてレコードの差分を完全一致で比較を行います。
よって、浮動小数点誤差が起因で数値の差分が検知されることがあります。
dbt-profiler の実行
dbt-profiler は dbt の profile を出力するツールです。 markdown のテーブル形式で各モデルのカラムの統計量を出力することが可能です。 この結果を PR のコメントや markdown のファイルとして書き出すことで、そのモデルの概要から掴むことが可能になります。 一方でカラムの多いモデルの場合は、統計量の出力が多くなってしまい情報としてノイズになっているのが現状の課題でもあります。
テスト向けの一時テーブルを作成するための機構
上記の実行内容を踏まえた上で重要な点として、audit-catalogs では実際のテーブルを上書きするのではなく、テスト向けの一時テーブルを作成する必要があるという点が挙げられます。
というのも、単純に dbt build
を実行した場合、実際のテーブルが更新されてしまい、意図せず共有環境を壊してしまったり、既存テーブルとの比較を行うことができなくなってしまったりするからです。
この問題を解決するために、以下の 2 つのテクニックを採用しています。
schema の差し替え
dbt には generate_schema_name
というマクロが存在し、これに基づいて schema を差し替えることができます。
audit-catalogs
を実行時には stg_ci
という target
を指定しています。
そこで、下記のように target
に応じて schema を差し替えることで、出力先の schema を変更しています (dbt の schema は Glue の database に対応します)。
{% macro generate_schema_name(custom_schema_name, node) -%} {%- set default_schema = target.schema -%} {%- if target.name == "stg_ci" -%} dbt_sandbox {%- elif custom_schema_name is none -%} {{ default_schema }} {%- else -%} {{ custom_schema_name | trim }} {%- endif -%} {%- endmacro %}
これにより、audit-catlogs
で実行される dbt build
の結果は dbt_sandbox
DB に主力され、実際のテーブルに対しての変更は行われません。
defer
機能の活用
schema の差し替えによって生じる問題として、対象のモデルが参照しているテーブルの schema も変更されてしまうということです。
例えば、下記のように bar
モデルが foo
モデルに依存していた場合、schema の差し替えによって foo
モデルの参照先も dbt_sandbox
に変更されてしまいます。
SELECT * FROM {{ ref('foo') }} -- dbt compile 後の想定結果 -- SELECT * FROM dbt_sandbox.foo
しかしながら、元々存在していた foo
モデルは dbt_sandbox
に存在しないため、bar
モデルのビルドに失敗してしまいます。
この問題を解決するために、defer
機能を活用しています。
詳細はリンク先に譲りますが、defer
機能を用い、stg
環境の artifacts を渡すことで、bar
モデルのビルド時に依存先の foo
モデルについては stg
環境に存在する実際のテーブルを参照するようになります。
SELECT * FROM {{ ref('foo') }} -- `defer` を利用した際の dbt compile 後の想定結果 -- SELECT * FROM stg_db.foo
まとめ
本記事では、dbt 移行に際してどのような工夫を行なったかについて紹介しました。 すでにデータ基盤を作成していて、dbt に移行するのが難しいという方はぜひ本記事を参考にしてみてください。 結局のところ CLI の作成と CI/CD の整備を頑張るという方向性にはなってしまいますが、個人が手作業で行うよりも効率的に移行作業を進めることができると思います。 また、dbt の artifacts フォルダの中には dbt を実行する上で必要な情報が全て含まれているため、このフォルダを用いて様々なことができるということも覚えておいてもらえると嬉しいです。
明日は johnmanjiro さんの「tokio ベースの Rust 向け fluentd クライアントを作った話」です。 お楽しみに!