はじめに
こんにちは。DR & MLOps グループの阿部です。Embulkの達人 @civitaspo 師匠の下、Embulk プラグイン作りに入門しました。 今回作ったのは embulk-filter-unnest といって、JSONとして表現されるArrayをフラット化(展開)するものです。 例えば以下のようデータがあるとします。 Column B は JSON なカラムです。
Column | Column B -----------+------------------------- hoge1 | ["aaa1", "bbb1", "ccc1"] hoge2 | ["aaa2", "bbb2", "ccc2"] hoge3 | ["aaa3", "bbb3", "ccc3"]
これを本プラグインを使うと以下のようにできます。
Column A | Column B ---------+--------- hoge1 | aaa1 hoge1 | bbb1 hoge1 | ccc1 hoge2 | aaa2 hoge2 | bbb2 hoge2 | ccc2 hoge3 | aaa3 hoge3 | bbb3 hoge3 | ccc3
本記事ではプラグインを作った背景と作り方、利用方法などを紹介したいと思います。
背景
社内で開発されているAPIの一つに、以下の例のように複数のユーザーIDを受け付けてそれぞれのユーザーIDのユーザー属性を返却するものがあります。
Request:
https://hoge-api/users?ids=100,101,102
Response:
[ {"user_id": 100, "attribute": "hoge"}, {"user_id": 101, "attribute": "fuga"}, {"user_id": 102, "attribute": "piyo"}, ]
ちなみに、複数のユーザーIDを受け付けるのはAPIへのリクエスト回数を減らすことでスループットを向上させるためです。 このAPIを利用して、以下のようなワークフローを作ることになりました。
- 対象ユーザーID抽出&グループ化
- APIリクエスト
- レスポンスをunnest
- 結果をファイルとして保存
データ基盤では Athena + Digdag + Embulk が使われています。これらを使ってワークフローを構築します。 ワークフローは大まかに、このように分けました。 DigdagのAthenaオペレータを利用して対象ユーザーの抽出とグループ化し、S3へ保存 Embulkを使ってユーザー属性を紐付け、S3へ保存
まず、1番目の部分はこんな感じで作りました。
athena.ctas>: | with grouped_users as ( select (row_number() over () - 1) / 100 as group_id, user_id from users where ... -- 絞り込み ) select array_join(array_agg(user_id), ',') as user_ids from grouped_users group by group_id database: workspace location: s3://bucket/destination/ format: json compression: GZIP table_mode: data_only save_mode: overwrite
このクエリでは対象ユーザーIDを抽出して100個ずつグループ化しています。 結果は s3://bucket/destination/ に保存されます。
次に2番目の部分です。 Embulkのconfigの中身は、先程作ったID一覧をS3から読み出し、ruby_procプラグインを使用して行ごとにAPIコールし、Rubyコードで unnest をしてから保存しています。 実際にはシェルオペレータを用いて Embulkのconfigファイルを生成し、実行します。
in: type: s3 bucket: bucket path_prefix: destination/ decoders: - type: commons-compress format: gz parser: charset: UTF-8 type: json columns: - {name: user_ids, type: json} filters: - type: ruby_proc requires: - uri - net/http - json pages: - proc: | ->(records) do records.flat_map do |r| uri = URI('hoge-api/users') http = Net::HTTP.new(uri.host, uri.port) http.use_ssl = true uri.query = URI.encode_www_form({ ids: r['user_ids'], }) req = Net::HTTP::Get.new(uri) res = JSON.parse(http.request(req).body) res = res.map { |row| {'user_ids': row.to_json } # ここが unnest の部分 } end end - type: rename columns: user_ids: response_json out: type: s3_parquet bucket: ... path_prefix: ...
ご覧の通り、Embulkの中のRubyコードが若干複雑です。もう少し簡潔に書きたいですね (procを複数にしてもう少し効率化できそう)。 中でも unnest をする部分は他にも応用できそうと考えて、プラグイン化することにしました。
使ってみた
arrayの部分をフラット化するプラグインを作りました。
json_column_name というパラメータで対象のカラム名を指定します。
filters: - type: unnest json_column_name: user_ids
これを使うと、先程のEmbulkの設定ファイルは以下のようになります。
in: type: s3 bucket: bucket path_prefix: destination/ decoders: - type: commons-compress format: gz parser: charset: UTF-8 type: json columns: - {name: user_ids, type: json} filters: - type: ruby_proc requires: - uri - net/http - json pages: - proc: | ->(records) do records.flat_map do |r| uri = URI('https://hoge-api/users') http = Net::HTTP.new(uri.host, uri.port) http.use_ssl = true uri.query = URI.encode_www_form({ ids: r['user_ids'], }) req = Net::HTTP::Get.new(uri) res = {'user_ids': JSON.parse(http.request(req).body)} end end - type: unnest json_column_name: user_ids - type: rename columns: user_ids: response_json
おわかりいただけただろうか?
res = res.map { |row| {'user_ids': row.to_json } # ここが unnest の部分 }
これが
res = {'user_ids': JSON.parse(http.request(req).body)}
こうなりました! そして、ruby_procのあとに unnest filter を追加しています。
- type: unnest json_column_name: user_ids
コード量が若干シンプルになりましたね。(あんまりなってない?)
作り方
Embulkのプラグインは以下の手順で作れます。 ここではOSSプロダクトを作るにあたって、やっておいた方が良さそうなこと (ライセンスとかCHANGELOGとか) も一緒に記載しておきました。
- embulk new コマンドでひな形を作成
- embulk-gradle-plugins を使うようにする
- ライセンスを確認しておく
- CIの設定 (今回は GitHub Actions を利用)
- 実装する
- テストを書く
- READMEを書く
- CHANGELOGを作成
- リリース
- RubyGemsに登録
- https://plugins.embulk.org/ に登録してもらう
以下、詳細を書いていきます。
embulk new コマンドでひな形を作成
embulk にはプラグインのひな型を作るコマンドがあり、それを使います。 今回は filter タイプなので、
$ embulk new java-filter unnest
としました。コマンドの詳細は以下のページにあります。
https://www.embulk.org/docs/customization.html
embulk-gradle-plugins を使うようにする
2020年5月時点では embulk-gradle-plugins の利用が推奨されているようです。 そのためこのページを参考にしつつ、READMEにしたがってbuild.gradleファイルの書き換え等を行いました。
https://qiita.com/hiroysato/items/a2fe0862c141fa66338b
https://github.com/embulk/gradle-embulk-plugins
ライセンスを確認しておく
embulk new コマンドを利用するとLICENSE.txtが作成されます。デフォルトではMITライセンスになっているようです。
CIの設定 (今回は GitHub Actions を利用)
GitHub にプッシュしたときにテストを走らせるのと、コードスタイルのチェックが走るように設定しました。
https://github.com/y-abe/embulk-filter-unnest/blob/master/.github/workflows/test.yml
実装する
自動生成された UnnestFilterPlugin.java というファイルを編集します。
まずは、必要な設定項目 (今回でいうとjson_column_name) を追加します。
次にtransactionの中を編集していきます。 ここではスキーマの設定や設定項目のバリデーションをします。 こうしておくとConfigにエラーがあった場合、データソースにアクセスするまえに処理が失敗してくれます。
https://github.com/y-abe/embulk-filter-unnest/pull/8#discussion_r429050879
ここで出力されるスキーマを定義しています。今回は入力スキーマと出力スキーマは変わらないので、こうしておきます。
そしてopenの中を編集します。FilteredPageOutput は PageOutput インターフェースを実装したクラスで、別のファイル (FilteredPageOutput.java) に切り出しています。 addの中で処理を書いていきますが、Embulk には ColumnVisitor というインターフェースが用意されており、これを ColumnVisitorImpl として実装するようにしました。こちらは ColumnVisitorImpl.java というファイルにしています。
unnestしたいカラム (targetColumn) とそれ以外のカラムとで分けて処理されるようにしており、FilteredPageOutputのaddの中では
- targetではないカラムを予め追加しておく (39行目)
- targetカラムを unnest しつつ追加 (44行目)
- その内容をレコードとして追加 (45行目) という流れです。
テストを書く
embulk java-filter plugin のテストの書き方 - Qiita qiita.com
この記事を参考にしつつ書いていきました。
今回書いたテストケースは以下のものです。
- Config の読み込みが正常にできるか
- スキーマが意図通りか
- inputに対してoutputが意図通りか
READMEを書く
雛形に沿って書いていきます。
リリース
build.gradle のここのバージョンを設定してGitHubでリリースを出します。
RubyGemsに登録
gradlew gemPush コマンドで登録できます。
https://plugins.embulk.org/ に登録してもらう
こうするとプラグインを検索してもらいやすくなるでしょう。
ここにプルリクを出します。(まだやってないけど)
まとめ
embulk-filter-unnestを使うことにより少しコード量を減らすことができました。 しかしながら、コードの大部分を占めるHTTPリクエストする部分はまだ複雑です。この部分もゆくゆくは embulk-filter-http としてプラグインを書こうかなと思っています。ゆくゆくはね。unnestプラグインほど手軽にはいかなさそうで、覚悟が必要です。。