Gunosy Tech Blog

Gunosy Tech Blogは株式会社Gunosyのエンジニアが知見を共有する技術ブログです。

更新できるデータレイクを作る 〜Apache Hudiを用いたユーザデータ基盤の刷新〜

広告技術部のUT@mocyutoです。 こちらの記事はGunosy Advent Calendar 2021の4日目の記事です。

昨日は内田さんの その設定、pyproject.tomlに全部書けます - Gunosyデータ分析ブログ でした

今回はApache Hudiを用いたユーザデータ基盤の刷新を紹介します。

背景

Gunosyの広告システムではユーザに対して最適な広告を届けるために、接触済みのユーザに対して何度も同じ広告を出さないようにする仕組みを提供しています。 例えば、すでにある広告Aをクリックしたユーザには広告Aは再度配信しないのような設定です。

仕組み

この仕組みを実現するためには以下のようなアーキテクチャになっていました。

  • 各ユーザの最終アクションの日付を全てRedisに保持
  • ユーザから広告のリクエストごとにそのユーザがすでにアクション済みかRedisを確認
  • OKであればその広告を出す

課題

上記の仕組みでは以下のような複数の問題を抱えていました。

  • 全てのユーザのアクションをRedisに保持することが必要
  • 複数アクション保持が必要
    • 例えば、インストール、クリックなど
    • 対象アクションを増やす = Redisの容量が指数的に増える
  • Redisアクセスが多く、APIへの負荷が高い
    • 大量の広告候補に対してそれぞれ該当のアクションをしているかを取得する
    • 広告候補N個、対象アクションM個に対して Redisから取得する値はN*M個*1
    • アクション数=負荷なのでスケールコスト大

という課題をはらんでおり、気軽に新規アクションを追加できない状態でした。

対応策

データの持ち方を変える

Redisに全てのユーザアクションデータをいれることでスケールしなくなっているので、Redisにはユーザが該当のアクション(例えば広告を1ヶ月以内にクリック)をしたかどうかのみをいれます。

Redisの値設計

ただこれを実現するには前段のバッチ処理などでユーザがアクションをしたかどうかを判定する必要があり、 何日以内、何ヶ月以内という判定を実施するには、各ユーザの最終アクセス日時をどこかで保持する必要があります。 大量のログから毎回「何日以内にアクションしたユーザ」をAthenaなどで抽出しているとクラウド破産してしまうからです。

ただ単純に「ログデータからユーザでgroup byした最終アクション履歴を定期バッチで作成する」では、非効率な処理でもあります。 たとえば毎時AthenaのCTASで抽出している場合、毎時ごとにユーザのアクション履歴のファイルが作成されます。 ユーザは複数回、同一アクションしていると考えられるので各ファイルでユーザの重複が大量に発生しえます。

今回そんな課題を解決してくれたのがUberが開発していた Apache Hudi です。 Apache HudiはS3などデータレイクのファイルを更新、削除ができるデータフォーマットなのです。

似たようなものではApache Sparkを作っているDatabricksが開発しているDelta LakeやNetflixが開発していたApache Icebergがあります。*2

今回Apache Hudiを利用した理由としては 、Glue Data Catalogへのsyncを一緒にやってくれるというのと、AWS Athenaの読み込みの公式サポートが発表されたからでした。

Athena を使用した Apache Hudi データセットのクエリ - Amazon Athena

Apache Hudiとは

Apache HudiとはUberがOSSとして開発したデータレイクをDBのように扱えるデータフォーマットです。 Hoodie Keyと呼ばれるPrimaryKey(レコードキー + パーティションキー)をキーとしてupdateやdeleteを実行することができます。

データフォーマットといっていますが、実際のファイルはparquetファイルとメタデータファイルで構成されています。 Hudiはバッチ処理によらずApache Flinkによるストリーム処理での書き込みにも耐えうる設計となっています。

Hudiには2つのテーブル設計パターンがあり、読み込みパフォーマンスに適したCopy On Writeモードと書き込み性能に適した Merge On Readモードがあります。

  • Copy On Write: parquetファイルのみを出力とするシンプルなモードで書き込み時に毎回parquetファイルを作り直す
  • Merge On Read: 列フォーマットのparquetファイルと、updateされた行フォーマットのavroファイルで格納されます。コンパクションが別途実行され 前述の parquetファイルとavroファイルはマージされ新しいparquetファイルとして作成されます。

Hudiが吐くMergeOnRead時のファイルはこんな感じ

Hudiの仕組みを理解するためにまず時系列の主要アクションを紹介します。

  • commits: データをテーブル(ファイル)に書き込む操作
  • delta_commit: 差分ファイルのみ書き込む操作。Merge On Readモードのときのみ。
  • cleans: 古いバージョンの使わなくなったファイルを削除する操作
  • compaction: バックグラウンドで実行される行フォーマットの差分ファイルを列フォーマットファイルにマージする操作

以下図を見てもらうとupsert (update or insert) が5分おきに実行された場合、その度にcommitが走ります。(Merge On Readの場合は delta_commit)

Hudiの仕組み

図は Overview | Apache Hudi から引用しています。

commitが実行された際にcommit回数に応じてcompactionやcleansがバックグラウンドで実行されます。 compactionやcleansの頻度や実行トリガーは設定で変更することができます。 compactionやcleansが実行される理由としては、読み込みの速度をスローダウンさせないためです。

Hudiのデータレイクにデータをためていくとどんどんファイル数が増えていきますが、差分ファイルを全部読んでいては読み込みがどんどん遅くなるためcompactionを実行します。 また、ディレクトリ内に大量のファイルがあるとファイルリスト取得にも時間がかかってしまうので、cleansによる削除も実行しています。

Hudi側が自動でcompactionやcleansをやってくれるので特に意識しなくてよく運用がとても楽です。

構成

Glue + PySpark

Hudiを使ったシステムはすでに本番稼働しているので、構成を紹介します。

Apache HudiはSparkかFlinkから書き込みをサポートしています。(Athenaでの書き込みも早く来てほしいです!) ですので、マネージドSparkであるGlue Jobを利用してApache Hudiを扱います。 構成としてはPySparkをjobの環境としてセットアップ、SparkSQLでjobを記述しdigdagでワークフロー管理を行っています。

構成図

私はScalaが好きですが、SparkSQLを使う場合は特に言語による差がほぼないのと、AWS Glue Studio上では直接Pythonのコードを編集できるのでPySparkを利用しました。

AWS Glueの設定で --enable-glue-datacatalog *3というオプションを設定すると、SparkSQLから直接Glue Catalogのテーブルを読み込めるのでAthenaのようにSQLを書くだけでjob実行が可能です。 SparkSQLでは通常のSQLだけでなくWith句も使うことができるので、AthenaでSQLを書いていた人であれば理解も容易なので、Dataframeを使って記述するよりおすすめです。

以下、SQLを用いてHudiデータを出力するPythonコードです。

import sys

from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.conf import SparkConf
from awsglue.context import GlueContext
from awsglue.job import Job

// GlueのStartJob時に渡す引数群
args = getResolvedOptions(
    sys.argv,
    [
        "JOB_NAME",
        "bucket_name",
        "s3_path",
        "database_name",
        "table_name",
        "target_time",  # ex. 2021091811
        "env",
    ],
)

# Hudi用のSerializer
conf = SparkConf().set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
glueContext = GlueContext(SparkContext(conf=conf))
spark = glueContext.spark_session

job = Job(glueContext)
job.init(args["JOB_NAME"], args)

bucketName = args["bucket_name"]
path = args["s3_path"]
databaseName = args["database_name"]
tableName = args["table_name"]
basePath = f"s3://{bucketName}/{path}"
targetTime = args["target_time"]
env = args["env"]

hudiOptions = {
    "hoodie.table.name": tableName,
    # 書き込みオプション
    "hoodie.datasource.write.storage.type": "MERGE_ON_READ", 
    "hoodie.compact.inline": "true",  # 「true」にすると、INSERT/UPDATE/DELETE後に自動でコンパクションが実行される
    "hoodie.compact.inline.max.delta.commits": 10,
    "hoodie.datasource.write.recordkey.field": "user_id",  # レコードキーのカラム名
    "hoodie.datasource.write.table.name": tableName, 
    "hoodie.datasource.write.operation": "upsert",  # 書き込み操作種別
    "hoodie.datasource.write.precombine.field": "last_date",  # レコードの重複制御用カラム名(同じ値のレコードが存在する場合、この値が大きい方が使用される)
    "hoodie.upsert.shuffle.parallelism": 2,  # upsert時の並列数
    # Glue Catalogへの書き込み設定
    "hoodie.datasource.hive_sync.enable": "true",  # GlueDataCatalogとの連携を有効にする
    "hoodie.datasource.hive_sync.database": databaseName,  # GlueCatalogでのデータベース名
    "hoodie.datasource.hive_sync.table": tableName,  # GlueCatalogでのテーブル名
    "hoodie.datasource.hive_sync.use_jdbc": "false", 
}

# db.action_logsはGlue DataCatalogで管理されているテーブル
df = spark.sql(
    f"""
SELECT
  user_id
  , date_format(MAX(to_timestamp(dt, 'yMMddHH') + INTERVAL '9' HOUR), 'y-MM-dd') AS last_date
FROM
  db.action_logs
WHERE
  f"to_timestamp(dt, 'yMMddHH') = to_timestamp('{targetTime}', 'yMMddHH')"
GROUP BY
  1
"""
)

df.write.format("hudi").options(**hudiOptions).mode("append").save(basePath)

job.commit()

Hudiのオプションはこちらを参考にさせてもらいました

dev.classmethod.jp

複数パーティションを指定する場合は、以下オプションを設定します

"hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.ComplexKeyGenerator", # パーティションが複数の場合のクラス
"hoodie.datasource.write.partitionpath.field": "fieldA,fieldB",  # パーティション対象のカラム名
"hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor", # パーティションが複数の場合のクラス
"hoodie.datasource.hive_sync.partition_fields": "fieldA,fieldB",  # パーティションのフィールド名

他にもHudiのConfigリストは大量にあるので、利用する場合は眺めるとよいでしょう。

hudi.apache.org

上記Glue JobでHudiフォーマットのユーザの最終アクションが抽出できます。

Athenaによる抽出

Apache Hudiによるユーザアクション情報を作成すれば、あとはAthenaから対象ユーザを抽出してRedisへ投入するのみです。

弊社はDigdag から GoのバイナリでRedisにデータをいれるパターンをよく利用しているので、Athenaの抽出結果をGoで読み込みRedisに投入しています。 この際のAthenaクエリですが、すでにparquetであるのでスキャン量も抑えられ、最終アクセス月などでパーティションを切っておけば、対象期間の枝刈りもできます。

Redis自体のコストに比べるとAthenaのスキャンコストは大幅に安く抑えられ、かつRedisの値の持ち方もシンプルになり、APIの負荷も抑えることができます。

移行し終えて

移行した結果、Redisのサイズは3分の1になり、GlueJob自体もRedisと比較して安く抑えられているので、大幅なコスト削減に繋がりました。 また、そもそもの課題であったスケールできないという問題も解決しました またApache Hudi自体も自動でcompactionなどを実行してくれるので、複雑の運用スクリプトを作ることなく移行できたのも良かった点です。

またHudiの次のバージョンでは Kafkaの対応やdbtへの対応なども入り、開発が活発です。

更新できるデータフォーマットはApache IcebergやLakeFormationなどAWSが本格的に対応を始めており、これから一気に裾野が広がると思うので、検討してみてはいかがでしょうか?

*1:MGETで取っているのでRedisコマンドの発行回数ではなく取得するvalueの個数

*2:このブログを書いている最中にre:InventにてAthenaによるApache Icebergの書き込みサポートとLakeFormationによるACID Transactionサポートが発表されました。

*3:弊社はLakeFormationを利用しており、LakeFormation経由でアクセスする場合、Athenaと違ってLakeFormationの権限とは別に読み込むテーブルのS3への読み込み権限を付与してあげる必要がありました。(EMRでも同じ問題が。。)