Gunosy Tech Blog

Gunosy Tech Blogは株式会社Gunosyのエンジニアが知見を共有する技術ブログです。

embulk-filter-unnest を作った

はじめに

こんにちは。DR & MLOps グループの阿部です。Embulkの達人 @civitaspo 師匠の下、Embulk プラグイン作りに入門しました。 今回作ったのは embulk-filter-unnest といって、JSONとして表現されるArrayをフラット化(展開)するものです。 例えば以下のようデータがあるとします。 Column B は JSON なカラムです。

Column     | Column B
-----------+-------------------------
hoge1      | ["aaa1", "bbb1", "ccc1"]
hoge2      | ["aaa2", "bbb2", "ccc2"]
hoge3      | ["aaa3", "bbb3", "ccc3"]

これを本プラグインを使うと以下のようにできます。

Column A | Column B
---------+---------
hoge1    | aaa1
hoge1    | bbb1
hoge1    | ccc1
hoge2    | aaa2
hoge2    | bbb2
hoge2    | ccc2
hoge3    | aaa3
hoge3    | bbb3
hoge3    | ccc3

本記事ではプラグインを作った背景と作り方、利用方法などを紹介したいと思います。

背景

社内で開発されているAPIの一つに、以下の例のように複数のユーザーIDを受け付けてそれぞれのユーザーIDのユーザー属性を返却するものがあります。

Request:

https://hoge-api/users?ids=100,101,102

Response:

[
  {"user_id": 100, "attribute": "hoge"},
  {"user_id": 101, "attribute": "fuga"},
  {"user_id": 102, "attribute": "piyo"},
]

ちなみに、複数のユーザーIDを受け付けるのはAPIへのリクエスト回数を減らすことでスループットを向上させるためです。 このAPIを利用して、以下のようなワークフローを作ることになりました。

  1. 対象ユーザーID抽出&グループ化
  2. APIリクエスト
  3. レスポンスをunnest
  4. 結果をファイルとして保存

データ基盤では Athena + Digdag + Embulk が使われています。これらを使ってワークフローを構築します。 ワークフローは大まかに、このように分けました。 DigdagのAthenaオペレータを利用して対象ユーザーの抽出とグループ化し、S3へ保存 Embulkを使ってユーザー属性を紐付け、S3へ保存

まず、1番目の部分はこんな感じで作りました。

athena.ctas>: |
  with grouped_users as (
    select
      (row_number() over () - 1) / 100  as group_id,
      user_id
    from
      users
    where ...  -- 絞り込み
  )
  select
    array_join(array_agg(user_id), ',') as user_ids
  from
    grouped_users
  group by
    group_id
database: workspace
location: s3://bucket/destination/
format: json
compression: GZIP
table_mode: data_only
save_mode: overwrite

このクエリでは対象ユーザーIDを抽出して100個ずつグループ化しています。 結果は s3://bucket/destination/ に保存されます。

次に2番目の部分です。 Embulkのconfigの中身は、先程作ったID一覧をS3から読み出し、ruby_procプラグインを使用して行ごとにAPIコールし、Rubyコードで unnest をしてから保存しています。 実際にはシェルオペレータを用いて Embulkのconfigファイルを生成し、実行します。

in:
  type: s3
  bucket: bucket
  path_prefix: destination/
  decoders:
    - type: commons-compress
      format: gz
  parser:
    charset: UTF-8
    type: json
    columns:
    - {name: user_ids, type: json}
filters:
  - type: ruby_proc
    requires:
      - uri
      - net/http
      - json
    pages:
      - proc: |
          ->(records) do
            records.flat_map do |r|
              uri = URI('hoge-api/users')
              http = Net::HTTP.new(uri.host, uri.port)
              http.use_ssl = true
              uri.query = URI.encode_www_form({
                ids: r['user_ids'],
              })
              req = Net::HTTP::Get.new(uri)
              res = JSON.parse(http.request(req).body)
              res = res.map { |row|
                  {'user_ids': row.to_json }  # ここが unnest の部分
              }
            end
          end
  - type: rename
    columns:
      user_ids: response_json
out:
  type: s3_parquet
  bucket: ...
  path_prefix: ...

ご覧の通り、Embulkの中のRubyコードが若干複雑です。もう少し簡潔に書きたいですね (procを複数にしてもう少し効率化できそう)。 中でも unnest をする部分は他にも応用できそうと考えて、プラグイン化することにしました。

使ってみた

arrayの部分をフラット化するプラグインを作りました。

github.com

json_column_name というパラメータで対象のカラム名を指定します。

filters:
  - type: unnest
    json_column_name: user_ids

これを使うと、先程のEmbulkの設定ファイルは以下のようになります。

in:
  type: s3
  bucket: bucket
  path_prefix: destination/
  decoders:
    - type: commons-compress
      format: gz
  parser:
    charset: UTF-8
    type: json
    columns:
    - {name: user_ids, type: json}
filters:
  - type: ruby_proc
    requires:
      - uri
      - net/http
      - json
    pages:
      - proc: |
          ->(records) do
            records.flat_map do |r|
              uri = URI('https://hoge-api/users')
              http = Net::HTTP.new(uri.host, uri.port)
              http.use_ssl = true
              uri.query = URI.encode_www_form({
                ids: r['user_ids'],
              })
              req = Net::HTTP::Get.new(uri)
              res = {'user_ids': JSON.parse(http.request(req).body)}
            end
          end
  - type: unnest
    json_column_name: user_ids
  - type: rename
    columns:
      user_ids: response_json

おわかりいただけただろうか?

res = res.map { |row|
  {'user_ids': row.to_json }  # ここが unnest の部分
}

これが

res = {'user_ids': JSON.parse(http.request(req).body)}

こうなりました! そして、ruby_procのあとに unnest filter を追加しています。

- type: unnest
    json_column_name: user_ids

コード量が若干シンプルになりましたね。(あんまりなってない?)

作り方

Embulkのプラグインは以下の手順で作れます。 ここではOSSプロダクトを作るにあたって、やっておいた方が良さそうなこと (ライセンスとかCHANGELOGとか) も一緒に記載しておきました。

  1. embulk new コマンドでひな形を作成
  2. embulk-gradle-plugins を使うようにする
  3. ライセンスを確認しておく
  4. CIの設定 (今回は GitHub Actions を利用)
  5. 実装する
  6. テストを書く
  7. READMEを書く
  8. CHANGELOGを作成
  9. リリース
  10. RubyGemsに登録
  11. https://plugins.embulk.org/ に登録してもらう

以下、詳細を書いていきます。

embulk new コマンドでひな形を作成

embulk にはプラグインのひな型を作るコマンドがあり、それを使います。 今回は filter タイプなので、

$ embulk new java-filter unnest

としました。コマンドの詳細は以下のページにあります。

https://www.embulk.org/docs/customization.html

embulk-gradle-plugins を使うようにする

2020年5月時点では embulk-gradle-plugins の利用が推奨されているようです。 そのためこのページを参考にしつつ、READMEにしたがってbuild.gradleファイルの書き換え等を行いました。

https://qiita.com/hiroysato/items/a2fe0862c141fa66338b

https://github.com/embulk/gradle-embulk-plugins

ライセンスを確認しておく

embulk new コマンドを利用するとLICENSE.txtが作成されます。デフォルトではMITライセンスになっているようです。

CIの設定 (今回は GitHub Actions を利用)

GitHub にプッシュしたときにテストを走らせるのと、コードスタイルのチェックが走るように設定しました。

https://github.com/y-abe/embulk-filter-unnest/blob/master/.github/workflows/test.yml

実装する

自動生成された UnnestFilterPlugin.java というファイルを編集します。

まずは、必要な設定項目 (今回でいうとjson_column_name) を追加します。

https://github.com/y-abe/embulk-filter-unnest/blob/82a5a8147dc88f2e392c180a9f3b0999bbafe2c1/src/main/java/org/embulk/filter/unnest/UnnestFilterPlugin.java#L15-L16

次にtransactionの中を編集していきます。 ここではスキーマの設定や設定項目のバリデーションをします。 こうしておくとConfigにエラーがあった場合、データソースにアクセスするまえに処理が失敗してくれます。

https://github.com/y-abe/embulk-filter-unnest/pull/8#discussion_r429050879

ここで出力されるスキーマを定義しています。今回は入力スキーマと出力スキーマは変わらないので、こうしておきます。

https://github.com/y-abe/embulk-filter-unnest/blob/82a5a8147dc88f2e392c180a9f3b0999bbafe2c1/src/main/java/org/embulk/filter/unnest/UnnestFilterPlugin.java#L27

そしてopenの中を編集します。FilteredPageOutput は PageOutput インターフェースを実装したクラスで、別のファイル (FilteredPageOutput.java) に切り出しています。 addの中で処理を書いていきますが、Embulk には ColumnVisitor というインターフェースが用意されており、これを ColumnVisitorImpl として実装するようにしました。こちらは ColumnVisitorImpl.java というファイルにしています。

https://github.com/y-abe/embulk-filter-unnest/blob/82a5a8147dc88f2e392c180a9f3b0999bbafe2c1/src/main/java/org/embulk/filter/unnest/ColumnVisitorImpl.java

unnestしたいカラム (targetColumn) とそれ以外のカラムとで分けて処理されるようにしており、FilteredPageOutputのaddの中では

https://github.com/y-abe/embulk-filter-unnest/blob/82a5a8147dc88f2e392c180a9f3b0999bbafe2c1/src/main/java/org/embulk/filter/unnest/FilteredPageOutput.java

  1. targetではないカラムを予め追加しておく (39行目)
  2. targetカラムを unnest しつつ追加 (44行目)
  3. その内容をレコードとして追加 (45行目) という流れです。
テストを書く

embulk java-filter plugin のテストの書き方 - Qiita qiita.com

この記事を参考にしつつ書いていきました。

今回書いたテストケースは以下のものです。

  • Config の読み込みが正常にできるか
  • スキーマが意図通りか
  • inputに対してoutputが意図通りか
READMEを書く

雛形に沿って書いていきます。

リリース

https://github.com/y-abe/embulk-filter-unnest/blob/82a5a8147dc88f2e392c180a9f3b0999bbafe2c1/build.gradle#L19

build.gradle のここのバージョンを設定してGitHubでリリースを出します。

RubyGemsに登録

gradlew gemPush コマンドで登録できます。

https://plugins.embulk.org/ に登録してもらう

こうするとプラグインを検索してもらいやすくなるでしょう。

github.com

ここにプルリクを出します。(まだやってないけど)

まとめ

embulk-filter-unnestを使うことにより少しコード量を減らすことができました。 しかしながら、コードの大部分を占めるHTTPリクエストする部分はまだ複雑です。この部分もゆくゆくは embulk-filter-http としてプラグインを書こうかなと思っています。ゆくゆくはね。unnestプラグインほど手軽にはいかなさそうで、覚悟が必要です。。