Gunosy Tech Blog

Gunosy Tech Blogは株式会社Gunosyのエンジニアが知見を共有する技術ブログです。

Digdag の Plugin をたくさん作ったので紹介するよ

こちらは Gunosy Advent Calendar 2018、7日目の記事です。なお、昨日の記事は @yutanim さんの RxSwiftにおける孫からの祖父母孝行 でした。

qiita.com

はじめに

こんにちは、広告技術部の キヴィタスポ(読み方統一運動中) (@Civitaspo) | Twitter です。

Gunosy に入社してから早いもので1年が経ちました。昨年の Gunosy Advent Calendar では僕は読む専門だったのですが、『Gunosyのパーソナライズを支える技術 -ワークフロー編-』を読んで非常に感銘を受けたのを覚えています。

tech.gunosy.io

ここではそのとき感銘を受けた言葉を紹介しておきます。

ワークフローは、いわばシステム上における兵站といってもいいでしょう。「戦争のプロは兵站を語り、戦争の素人は戦略を語る」という名言もあるくらいです。

僕はこの言葉に強いバイブスを感じ、ただひたすらに自分の兵站を dig ってきました。

さて、この記事はそんなワークフローを管理してくれる Digdag に関する記事です。Digdag は Plugin 機構を持っており JVM 言語で Plugin を実装すれば機能拡張をすることができます。 しかしながら Digdag の Plugin 数は同じ Treasure Data が開発している Fluentd や Embulk に比べるとあまり多くありません*1。せっかく Plugin 機構があるのに Python などでゴリゴリ処理を書いてノウハウが世の中に溜まらない状況になっているのは勿体無いなと思い*2 Digdag Plugin をたくさん作って導入してみました。この記事では作った Plugin 達を淡々と紹介していこうと思います。

では始めていきましょう。

もくじ

広告技術部の Digdag 事情

淡々と紹介していきますと言いましたが、作った Plugin が使われている部分を把握するために僕たちの Digdag を取り巻く環境を簡単にお話しておこうと思います。

f:id:civitaspo:20181206135427p:plain

広告技術部では主に以下のような種類の workflow が動いています。

  • 広告配信最適化のため CTR/CVR を推定する機械学習 workflow
    • Spark on EMR で機械学習を行い、生成した model を s3 へ upload する
  • デモグラ情報を推定する機械学習 workflow
    • GPU や数百 GB のメモリを必要とする特殊なワークロードのため AWS Batch を利用している
  • 機械学習で用いる中間集計を生成する workflow
    • Athena か Spark on EMR で集計している
  • その他、広告配信に必要な細かな集計を行う workflow
    • Golang と Python で実装され、 embulk でデータ転送したりしている
    • ECS Task として実行している

要件がそれぞれ大きく異なるため様々な AWS Service を利用しているのが分かると思います。 この中で AWS Batch 以外の部分に関しては全て Plugin で動かしています。需要があれば AWS Batch もいずれ Plugin 化するかもしれません。

Digdag Plugin 達の紹介

それでは Plugin の紹介を始めていきます。

digdag-operator-emr_fleet

github.com

EMR を Instance Fleets を使って構築できる Plugin です。公式にも emr> operator*3 がありますが、digdag-operator-emr_fleet とは以下のような違いがあります。

  • Instance Fleets*4 を使って EMR 構築を行うことが出来る
  • AWS の認証で access_key_id/secret_access_key 以外の方法が使える
  • Instance Fleets を使用した EMR 構築において AWS がサポートしている全ての Option を設定することが出来る
  • Step 機能*5はサポートしていない

弊社ではバッチ処理で必要になったタイミングで EMR を構築し、処理終了後に破棄するため Instance Fleets という機能を使って EMR を利用しています。これは雑に言えば安く安全に EMR を構築できる機能で、Spot 価格の高騰や特定のインスタンスタイプを確保しにくい状況であっても、指定した複数インスタンスタイプの候補から構築可能なインスタンスタイプを使って必要容量を満たした EMR Cluster を構築してくれます。

EMR は構築時に大量の Option が必要で、かつ、古いバージョンのサポートや 通常の EMR 構築と Instance Fleets を使った EMR 構築を全て同じ API で行っているため、本当は必須だけど API 的には Optional になっていると言った罠がたくさんあります。この digdag-operator-emr_fleet では Instance Fleets を使った 5 系バージョンの EMR 構築にフォーカスし、必要な Option だけを定義できるようにしています。

また、後に紹介する AWS 系の Plugin も同じですが、起動時の config で許可設定が行われている場合に限り access_key_id/secret_access_key 以外の方法で AWS の認証を行うことができます。弊社ではセキュリティの観点から基本的に IAM ユーザーを発行しません。代わりに Instance Profile、ECS であれば Task Role を使って認証を行います。そのため、Digdag が持つ AWS 系の Plugin は利用できませんでした。*6 僕が作った AWS 系の Plugin では Instance Profile を含む Digdag server が建っているインスタンス内のリソースを使った認証を起動時の config で許可設定が行われている場合に限り、行うことが出来るようにしています。

Step 機能をサポートしていないのはこのあと書く digdag-operator-livy を使っているためです。*7

digdag-operator-livy

github.com

Apache Livy*8を操作する Plugin です。Apache Livy は簡単に言うと spark-submit を Rest API で Submit することが出来るミドルウェアです。JupyterHub や Zeppelin の裏側で使われるような Interactive な機能もありますが digdag-oprator-livy は Batch 文脈での利用を想定しています。

弊社で Apache Livy を使っている理由は以下の AWS の記事を見てもらうのが早そうです。

aws.amazon.com

EMR の Step 機能では Serial にしか処理を行うことができません。そのため処理によっては EMR の全てのリソースを使い切れない時間ができてしまいます。弊社ではそういったリソースの無駄遣いを避けるため Apache Livy を用いて Spark Job をまとめて同じクラスタで並列実行しています。

digdag-operator-athena

github.com

AWS Athena に Query を投げたり、 CTAS*9 を使って中間集計を作ったり出来る Plugin です。AWS Athena は CTAS がサポートされるまで CSV でしか S3 上にデータを置くことができませんでしたが、CTAS のサポートにより parquet でデータを置くことが出来るようになったため Spark や Hive といった EMR を必要とするミドルウェアや AWS Glue を使わずとも気軽に中間集計を行うことが出来るようになりました。

この Plugin のいいところは CTAS を使う場合でも SELECT 文を書くだけですむところです。普段あまり書かない CREATE TABLE 文を書く必要はなく、それに含まれる AWS Athena 独自の仕様などは Option としてサポートするようにしています。

加えて、table_mode 及び save_mode という2つの Option の設定によって、例えば「テーブルは必要なくてデータを作りたいだけなんだ」といったケースもデータだけを冪等に生成することができます。

digdag-operator-ecs_task

github.com

ECS Task を実行する Plugin です。この Plugin は ECS Task Definition の登録、及び ECS Task の実行を行うことができます。

この Plugin を作る以前の Digdag 運用では Digdag server 内で py> operator や embulk> operator などの Scripting Operator*10 が実行されていました。しかしながら、これらの Scripting Operator は Digdag server の内で実行されるため Digdag が利用するリソースの分析を難しくしていました。また、Scripting Operator によって実行される Task は Task 毎にワークロードが大きく異なるため、 Digdag server に割り当てるリソースを過剰に設定せざるを得ない状況でした。権限に関しても全ての Task が実行可能な権限を割り当てる必要があり管理しにくいといった課題もありました。

この Plugin ではそういった課題を解決するため Digdag の持つ Scripting Operator とほぼ同じ使用感で ECS Task が実行できるように実装しました。*11*12 以下のようにあらかじめ ECS Task に必要な設定を定義してあげることで、利用者は Scripting Operator と同じ感覚で使用することができます。注意点として利用する image は awscli が入っていることを前提としているのでその点のみご留意ください。

# ECS Task の設定を予め export しておく
_export:
  ecs_task:
    auth_method: instance
    tmp_storage:
      type: s3
      uri: s3://path/to/workspace/
    family_prefix: hello-
    cluster: sample_cluster
    network_mode: host
    memory: 1 GB
    task_role_arn: arn:aws:iam::999999999999:role/ecs-tasks.your-role

# Digdag の Task を記述するときは以下のようになる。
+step1:
  ecs_task.py>: path.to.sample_package.sample_class.sample_method
  image: civitaspo/python-awscli:latest

この Plugin を導入できたことで Digdag server 内で動く Scripting Operator を完全撤廃することができ*13 リソース分析や権限分割が捗るようになりました。

なお、Digdag 上で Scripting Operator が実行されると ECS Task が立ち上がるため ECS Cluster 内に大量の ECS Task が実行されることになります。これらを安定稼働させるためには ECS Cluster の設計が重要になってきます。この点に関しては弊社 VPoT が blog 書きたいみたいなので期待して待っていてください。

digdag-operator-param

github.com

最後に毛色の異なる Plugin を紹介しておきます。この Plugin はシンプルに param の store や reset が出来るようになる Plugin です。僕たちの Digdag 運用では Python などで workflow を記述することはせず、dig ファイルに記述するようにしています。そのため、変数の定義は _export で行わざるを得ない状況でした。しかしながら _export は定義された Task とその子タスクにしか伝播させることができないので、稀に workflow の記述に困ってしまうことがありました。この Plugin は Python API や Ruby API がサポートしている param の store と Operator を実装しないと使えない param の reset を行えるようにし、dig による workflow 記述の幅を広げることができました。*14

また、一部の変数が展開されないバグ*15 へ対応する param_eval> operator も同梱しています。

おわりに

この記事では作った Digdag Plugin を淡々と紹介しました。今回は紹介できませんでしたが Digdag 運用プラクティス的なものも溜まってきているので今後発信していければいいなと思っています。乞うご期待。

兵站

今日はついに dig ってきた兵站を披露する日ですね。大乱闘の中でも己の兵站を発揮し、ボコボコにスマッシュしてあげましょう。*16

補足

*1:ワークフローエンジンDigdagのまとめ @hiroysato san の crawling をくぐり抜けた Plugin はないはず…!

*2:他にもいろいろ理由はあったんですが割愛

*3:https://docs.digdag.io/operators/emr.html

*4:New – Amazon EMR Instance Fleets

*5:Submit Work to a Cluster

*6:おそらく Treasure Data では Digdag をサービスとして提供しているためサーバー内のいかなるリソースにもアクセスさせない思想なのだろうと推測しています

*7:実装したくないわけではないので p-r お待ちしています。

*8:Apache Livy

*9:Creating a Table from Query Results (CTAS)

*10:https://docs.digdag.io/operators/scripting.html

*11:Digdag には Scripting Operator の実行基盤を別のものに切り替える CommandExecutor という interface があるのですが、現在これは拡張できない仕様になっている(digdag#901)ため全て自前で実装することになりましたorz

*12:Task を docker で動かすことも検討しましたが ECS 上の Digdag container から別の docker container を立ち上げるのは非常にトリッキーな手法を使えば可能である一方、運用上のタブーを大量に犯す必要があったため断念しましたorz

*13:Digdag server 起動時の設定などで Scripting Operator を利用不可にする方法とかないかな…

*14:今は Param Operators があるので store に関してはコレを使うでも良かったかもしれない。

*15: digdag#862 enhancement label が付けられているのでバグではないのかもしれない。

*16:本日 12/7 は大乱闘スマッシュブラザーズ SPECIALの発売日です。