Gunosy Tech Blog

Gunosy Tech Blogは株式会社Gunosyのエンジニアが知見を共有する技術ブログです。

稼働中データ基盤を安全に dbt 移行する仕組み

はじめに

DRE&MLOps チームの hyamamoto です。

この記事は Gunosy Advent Calendar 2023 の 5 日目の記事です。 4 日目の記事は m-hamashita さんの企業テックブログのレビューを GitHub 上でおこなっている話でした。

DRE&MLOps ではデータ基盤における変換処理を dbt に移行するプロジェクトを進めてきました。 移行作業に際して重要な項目として、以下の点が挙げられると思います。

  • 既存のデータ変換クエリを正確に dbt に移行できるか
  • 属人化せずにスムーズに移行作業を進められるか

この記事では、移行前の弊社におけるデータ変換処理の仕組みを紹介し、その上でどのようなアプローチによって dbt 移行作業を進めたかについて紹介します。 また、dbt については以前 dbt の仕組みに関する紹介記事を書いたのでよかったら参照してもらえると嬉しいです。

data.gunosy.io

移行前のデータ変換処理の仕組み

弊社のデータ基盤 Baikal の説明は以前この記事で紹介させていただきました。

tech.gunosy.io

上記の内容から dbt 移行に際して関係する主要な内容としては以下の内容が挙げられます。

まとめると、移行前の状態でもいわゆる ELT アーキテクチャを採用しており、その Transform 部分の実行を Digdag のプラグインを用いて Athena API を叩いて行なっているという状態です。 よって、dbt 移行の目的は以下の 2 点です。

  • Digdag workflow で記述していた SQL を dbt に移行する
    • Digdag workflow では workflow の変数を SQL に埋め込んでいた
  • Digdag workflow から dbt を呼び出し、オーケストレーションする

本記事では前者の Digdag workflow で記述していた SQL を dbt に移行する際の工夫について紹介します。 なお、ここで言及する SQL の移行とは、Digdag 特有の変数埋め込みの記法を dbt 向けに書き換えることや、モデル間の依存関係を refsource によって表現することを指します。

移行時の課題

SQL の移行に際してもっとも注意しなければならない点は、移行前後でデグレが発生してしまい変換ロジックが変わってしまうことです。 一方で、移行後のクエリをコンパイルして Redash などで実行し、既存テーブルとの結果を逐一比較することは非常に手間がかかり、属人化したり移行作業のスピードを落としたりする原因になります。

よって、この問題を解決するために CLI ツールの作成と GitHub Actions を用いた自動テストの仕組みを導入しました。

移行時デグレの検知の仕組み

CLI の整備

まず、dbt を CI で実行するにあたっての課題として、実行対象のモデルを指定する必要があるという点が挙げられます。 dbt の build 機能はデフォルトでは全てのモデルを実行するため、すでにデータ基盤自体は成熟していて大量にモデルを抱えたプロジェクトでは、CI の実行時間増加による開発者体験の悪化やコスト増加に繋がります。 そこで PR に含まれる変更を検知しその変更内容から dbt 実行時の引数に渡すパラメーターを生成する CLI ツールを作成しました。 実行内容の方向性は Terraform の plan と apply のようなものを想定しています。

変更内容の検知

変更内容の検知には dbt が出力する manifest.json を用いて行います。 GitHub Actions の全体像は後ほど示しますが、事前に S3 に manifest.json をアップロードしておき、その差分を検知することで変更内容を検知しています。 manifest.json のパースに関しては、以下のライブラリを用いています。

github.com

検知された差分の結果と Glue の API と比較を行うことで、dbt 上の差分が最終的に Glue 上にどのような影響を与えるかを確認しています。 さらに、削除したモデルのテーブル削除にもこの機能をベースに対応しています*1

パラメーターの生成

この CLI は最終的に json 形式で差分を出力します。 出力された json は後ほど dbt の CLI に渡しやすいように整形しており、jq を用いることで効率的に利用できるようにしています。 例えば、dbt build コマンドの --select option に渡すためのパラメーターをこの json に保持させています。

GitHub Actions の整備

それでは次に整備した GitHub Actions の内容について説明していきます。 大きく以下の 3 つのジョブを用意しています。

  • Continuous Integration (CI)
    • plan-catalogs: CLI による差分検知を実行するジョブ
    • audit-catalogs: stg 環境上*2*3にあるデータを用いて、PR 前後のモデルを比較するジョブ
  • Continuous Delivery (CD)
    • apply-catalogs: plan-catalogs の結果を受けて変更を反映するジョブ

なお、以下において今回作成した CLI を Baikal dbt Tools と呼称します。 また、dbt artifacts とは manifest.json を含む dbt の target フォルダ配下のデータを指します。

Continuous Integration (CI)

plan-catalogs

plan-catalogs は行った変更がどのような影響を及ぼすかを確認するためのジョブです。 また、その変更内容の差分の結果を json 化して S3 に保存します。

このジョブは以下のような流れで実行されます。

sequenceDiagram
    participant ga as GitHub Actions
    participant dbt as dbt
    participant tools as Baikal dbt Tools
    participant s3 as S3

    ga->>s3: dbt artifacts を取得
    s3-->> ga: dbt artifacts を返す
    ga->>dbt: dbt compile の実行
    dbt-->> ga: dbt artifacts を返す
    ga->>tools: S3 の artifacts と手元の artifacts を比較
    tools-->> ga: 差分を plan.json として出力
    ga->>ga: PR に変更差分をコメント
    ga->>s3: plan.json を S3 に保存
    s3-->> ga: plan.json を保存したことを返す

なお、対象とする S3 のバケットは PR のマージ先の環境を選択しています。 例えば、feature ブランチの PR の場合は dev 環境のバケットを参照し dbt artifacts を取得します。 PR のコメント結果の例としては下記のようなものです。

plan の PR コメント

audit-catalogs

audit-catalogs は feature branch に対して実行されるジョブで、stg 環境を用いて実際のデータを使ったテストを行います。

このジョブは以下のような流れで実行されます。

sequenceDiagram
    participant ga as GitHub Actions
    participant dbt as dbt
    participant tools as Baikal dbt Tools
    participant s3 as S3

    ga->>s3: dbt artifacts を取得
    s3-->> ga: dbt artifacts を返す
    ga->>dbt: dbt compile の実行
    dbt-->> ga: dbt artifacts を返す
    ga->>tools: S3 の artifacts と手元の artifacts を比較
    tools-->> ga: 差分を plan.json として出力

    note over ga: 以下の内容はすべて plan.json の差分によって決定される
    ga->>dbt: dbt build の実行<br/>(build には test も含まれる)
    dbt-->>ga: dbt build の結果を返す
    ga->>dbt: stg のテーブルと build の結果を比較
    dbt-->>ga: 比較結果を返す
    ga->>ga: PR に比較結果をコメント
    ga->>dbt: build の結果から profile を出力
    dbt-->>ga: profile を返す
    ga->>ga: PR に profile の結果をコメント

audit-catalogs は実質 plan-catalogs と後続の apply-catalogs の両方を実行しています。 これにより、実際のデータを使った動作確認や既存モデルとの比較、モデルの profile 情報の出力が可能になっています。 なお、差分検知や profile 情報の出力に関する詳しい内容については、audit で実行されていることの詳細 に後述します

Continuous Delivery (CD)

apply-catalogs

apply-catalogs では plan-catalogs で作成された plan.json を用いて、dbt コマンドを実行します。

このジョブは以下のような流れで実行されます。

sequenceDiagram
    participant ga as GitHub Actions
    participant dbt as dbt
    participant tools as Baikal dbt Tools
    participant s3 as S3

    ga->>s3: plan.json を取得
    s3-->> ga: plan.json を返す
    note over ga: 以下の内容はすべて plan.json の差分によって決定される
    ga->>tools: 削除されたモデル情報から Glue のテーブルを削除
    tools-->> ga: 削除したテーブルの情報を返す
    ga->>dbt: *View の変更に対してのみ* dbt build の実行
    dbt-->> ga: dbt build の結果を返す
    ga->>s3: S3 に artifacts を保存
    s3-->> ga: 保存したことを返す

このジョブによって以下のことが達成されます。

  • 削除検知によるテーブルの削除
  • View の変更を適用*4
  • S3 上の artifacts を更新

apply の実行結果の PR コメント

audit-catalogs で実行されていることの詳細仕様

ここでは、audit-catalogs で実行されていることの詳細について説明します。

audit-catalogs で実行されている処理

audit-catalogs では大きく以下の 3 つの内容を実行しています。

  • dbt build の実行
  • dbt-audit-helper の実行
  • dbt-profiler の実行

dbt build の実行

dbt builddbt compiledbt test を実行するコマンドです。 これにより、実際のデータを用いて CI のタイミングでモデルのテストを実行することが可能になります。

dbt-audit-helper の実行

dbt-audit-helper は 2 つのテーブルを比較し、その差分を出力するツールです。 このツールを実際の stg 環境のテーブルと dbt_sandbox 以下に作成された CI 向けの一時テーブルを比較することで、テーブルの差分を検知しています。 移行時にはこの結果を PR のコメントとして出力することで、作業者やレビューワーが変更によるデグレが発生していないかを確認することが可能になります。

⚠️ dbt-audit-helper は SQL を用いてレコードの差分を完全一致で比較を行います。

よって、浮動小数点誤差が起因で数値の差分が検知されることがあります。

dbt-audit-helper の実行結果の PR コメント

dbt-profiler の実行

dbt-profiler は dbt の profile を出力するツールです。 markdown のテーブル形式で各モデルのカラムの統計量を出力することが可能です。 この結果を PR のコメントや markdown のファイルとして書き出すことで、そのモデルの概要から掴むことが可能になります。 一方でカラムの多いモデルの場合は、統計量の出力が多くなってしまい情報としてノイズになっているのが現状の課題でもあります。

dbt-profiler の実行結果のコメント

テスト向けの一時テーブルを作成するための機構

上記の実行内容を踏まえた上で重要な点として、audit-catalogs では実際のテーブルを上書きするのではなく、テスト向けの一時テーブルを作成する必要があるという点が挙げられます。 というのも、単純に dbt build を実行した場合、実際のテーブルが更新されてしまい、意図せず共有環境を壊してしまったり、既存テーブルとの比較を行うことができなくなってしまったりするからです。

この問題を解決するために、以下の 2 つのテクニックを採用しています。

schema の差し替え

dbt には generate_schema_name というマクロが存在し、これに基づいて schema を差し替えることができます。 audit-catalogs を実行時には stg_ci という target を指定しています。

そこで、下記のように target に応じて schema を差し替えることで、出力先の schema を変更しています (dbt の schema は Glue の database に対応します)。

{% macro generate_schema_name(custom_schema_name, node) -%}
    {%- set default_schema = target.schema -%}
    {%- if target.name == "stg_ci" -%}

        dbt_sandbox

    {%- elif custom_schema_name is none -%}

        {{ default_schema }}

    {%- else -%}

        {{ custom_schema_name | trim }}

    {%- endif -%}

{%- endmacro %}

これにより、audit-catlogs で実行される dbt build の結果は dbt_sandbox DB に主力され、実際のテーブルに対しての変更は行われません。

defer 機能の活用

schema の差し替えによって生じる問題として、対象のモデルが参照しているテーブルの schema も変更されてしまうということです。 例えば、下記のように bar モデルが foo モデルに依存していた場合、schema の差し替えによって foo モデルの参照先も dbt_sandbox に変更されてしまいます。

SELECT * FROM {{ ref('foo') }}
-- dbt compile 後の想定結果
-- SELECT * FROM dbt_sandbox.foo

しかしながら、元々存在していた foo モデルは dbt_sandbox に存在しないため、bar モデルのビルドに失敗してしまいます。

この問題を解決するために、defer 機能を活用しています。 詳細はリンク先に譲りますが、defer 機能を用い、stg 環境の artifacts を渡すことで、bar モデルのビルド時に依存先の foo モデルについては stg 環境に存在する実際のテーブルを参照するようになります。

SELECT * FROM {{ ref('foo') }}
-- `defer` を利用した際の dbt compile 後の想定結果
-- SELECT * FROM stg_db.foo

まとめ

本記事では、dbt 移行に際してどのような工夫を行なったかについて紹介しました。 すでにデータ基盤を作成していて、dbt に移行するのが難しいという方はぜひ本記事を参考にしてみてください。 結局のところ CLI の作成と CI/CD の整備を頑張るという方向性にはなってしまいますが、個人が手作業で行うよりも効率的に移行作業を進めることができると思います。 また、dbt の artifacts フォルダの中には dbt を実行する上で必要な情報が全て含まれているため、このフォルダを用いて様々なことができるということも覚えておいてもらえると嬉しいです。

明日は johnmanjiro さんの「tokio ベースの Rust 向け fluentd クライアントを作った話」です。 お楽しみに!

*1:テーブル削除の機能は dbt 自体はサポートしていません

*2:弊社の stg 環境では直近の本番同等相当のデータを用意しています

*3:弊社では直近の本番同等相当のデータのある stg 環境を用意しているものの、本番環境だけの場合でも今回の方法で同様の仕組みを導入することは可能です

*4:table や incremental モデルの変更は、別途 dbt をオーケストレーションしている Digdag 側で反映される