Gunosy Tech Blog

Gunosy Tech Blogは株式会社Gunosyのエンジニアが知見を共有する技術ブログです。

tokio ベースの Rust 向け fluentd クライアントを作りました

この記事は Gunosy Advent Calendar 2023 の6日目の記事です。昨日の記事は hyamamoto さんの稼働中データ基盤を安全に dbt 移行する仕組みでした。

こんにちは。プロダクト開発部 Ads チームの johnmanjiro です。普段は広告配信の API や管理画面を作っています。

先日、趣味で Rust 向けの fluentd クライアントを作ったのでご紹介します*1

Rust での fluentd クライアントの状況と作成の経緯

本題に入る前に、Rust における fluentd クライアントの状況を簡単にご紹介します。

2023年12月現在、Rust には公式による fluentd クライアントは存在しません*2。 また、Crates.iofluent と検索すると多くのクレートがヒットしますが、そのほとんどが Fluent という別プロジェクトのものであり、fluentd とは関係がありません。

fluentdfluent log で検索することでいくつかの fluentd クライアントを見つけることができますが、いずれも下記のような懸念があります。

  • 多言語に存在する公式の fluentd クライアントと使用感が異なる
  • 非同期処理に対応していない

このような状況のため、非同期処理に対応していて公式の fluentd クライアントと似た感覚で使えるものが欲しいと思い、非同期処理で広く使われている tokio をベースとした fluentd クライアントを作成することにしました。

そうして作成したのが tokio-fluent です。

github.com

使い方

初期化

tokio-fluent では現在 TCP と Unix Domain Socket をサポートしており、それぞれ下記のように使用することができます。

// TCP
let client = Client::new_tcp(
    "127.0.0.1:24224".parse().unwrap(),
    &Config {..Default::default()}
)
.await
.unwrap();

// Unix Domain Socket
let client_unix = Client::new_unix(
    "/path/to/fluentd.sock",
    &Config {..Default::default()}
)
.await
.unwrap();

公式と違い TCP と Unix Domain Socket で関数が異なるのは、Config で利用する値を共通で利用するものに限定するためです。

たとえば Go 向けの fluent-logger-golang では Config に FluentNetwork というフィールドがあり、設定値によって TCP と Unix Domain Socket を切り替えています。 しかし、Config で設定すると TCP と Unix Domain Socket で利用する設定値が混在し複雑さが増すため*3、採用していません。

ログの送信

ログを送信する際は、クライアントの send メソッドを利用します。 第一引数がタグ、第二引数がログの内容を表すレコードです。

ここで利用している MapValuetokio_fluent::record モジュールに定義されているものです。

use std::collections::HashMap;

use tokio_fluent::record::{Map, Value};

let mut map = Map::new();
map.insert("age".to_string(), 22.into());
map.insert(
    "scores".to_string(),
    vec![80, 90]
        .into_iter()
        .map(|e| e.into())
        .collect::<Vec<_>>()
        .into(),
);
client.send("fluent.test", map).unwrap();

上記の例では新しく Map を定義していますが、record_map! マクロを利用することでより簡単に記述することもできます。

use std::collections::HashMap;

use tokio_fluent::record_map;

let map_from_macro = record_map!(
    "age".to_string() => 22.into(),
    "scores".to_string() => [80, 90].into_iter().map(|e| e.into()).collect::<Vec<_>>().into(),
);
client.send("fluent.test", map_from_macro).unwrap();

Map への変換が必要にはなりますが、送信用のメソッドにタグとレコードを渡すことでログを送信するというのは公式の fluentd クライアントと同じです。

サポートしているデータ型

tokio-fluent でサポートしているデータ型は下記の通りです。Value という enum で定義されています*4

  • Bool
  • Int
  • UInt
  • Float
  • Str
  • Object
  • Array

これらのデータ型は独自に定義されたものですが、それぞれが From トレイトを実装しているため、into() による標準のデータ型からの変換が可能です。

構成

ここからは、tokio-fluent がどのように動作しているかをご紹介します。

大きく分けて二つの構成要素があります。

  • Client
    • ユーザーが利用するクライアント
    • Worker にレコードを送信する
  • Worker
    • クライアントの初期化時に生成される
    • レコードを受け取り、forward プロトコルに従い fluentd サーバーに送信する

レコードの流れ

また、fluentd との通信のために forward プロトコルを実装しています。

Client

Client は先ほどから登場している、ユーザーが利用するクライアントです。 send メソッドを利用することで Worker に対してレコードを送信します。

実際に fluentd にログを送信する処理は Worker が行うため、ユーザーは fluentd サーバーへのログの送信を待つ必要がありません。

Worker

Worker は Client の初期化時に tokio::spawn によって生成され、無限ループで Client からレコードを受け取り fluentd サーバーに送信します。 Client が破棄されると同時に Worker も破棄されます。

forward プロトコル

fluentd にはいくつかのプラグインが存在しますが、特に fluentd サーバーに直接レコードを送信する場合には forward プラグインが使われます。

forward プラグインでは forward プロトコルを利用しており、このプロトコルを実装することで、自作のクライアントからも fluentd サーバーにログを送信することができます。

tokio-fluent は forward プロトコルの中でも Message Modes に対応しています。

Message Modes はレコードを一つずつ送信するモードであり、下記のようなデータを MessagePack でシリアライズしたものを送信します。

名前 説明
tag string レコードのタグ
time integer レコードのタイムスタンプ(UNIX時間もしくはEventTime*5
record hash レコードの内容
option hash オプション

たとえば、使い方の例で出てきたレコードは下記のようになります。

[
  "fluent.test",
  1701412937,
  {"age": 22, "scores": [80,90]},
  {"chunk": "OTViZmY1NmItMjAzYi00N2I3LWJlNTctMTE4ODU1M2M0ODYyCg=="}
]

上から順に tag, time, record, option に対応しています。

ここで option に対して chunk を指定していますが、これは128ビットのユニークな ID を base64 でエンコードしたものであり、 fluentd サーバーが正しくレコードを受け取ることができたかを判定するために必要になります。

forward プロトコルでは option に chunk を指定すると、fluentd サーバーから ack を受け取ることができます*6。この値は chunk で指定したものと同一であるため、レコードが正しく受け取られたかを判定することができます。

{"ack": "OTViZmY1NmItMjAzYi00N2I3LWJlNTctMTE4ODU1M2M0ODYyCg=="}

構成のまとめ

上記をまとめると、tokio-fluent での処理の流れは下記のようになります。

  1. Client の send メソッドが呼び出される
  2. Client が受け取ったデータを Message Modes に対応する形式にして Worker へ送信する
  3. Worker で MessagePack にシリアライズし、fluentd サーバーへ送信する
  4. レスポンスに問題がないか確認し、問題があればリトライする

終わりに

この記事では、tokio ベースの Rust 向け fluentd クライアントである tokio-fluent についてご紹介しました。

自分でクライアントを実装してみて、プロトコルの仕様など普段 fluentd を使っているだけではなかなか意識しないところを知ることができ、とても勉強になりました。 機能的にはまだまだ不足している部分もありますが、今後も改善していきたいと思います。

明日は石川さんの「オンライン実験を速く試すための基盤構築」です。お楽しみに!

*1:趣味なので社内導入などはされていません

*2:https://github.com/fluent

*3:たとえば TCP でしか利用せず Unix Domain Socketでは無視されるフィールドなどが生まれます

*4:https://docs.rs/tokio-fluent/latest/tokio_fluent/record/enum.Value.html

*5:EventTime に関しては割愛します。詳しくは wiki をご確認ください。https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1#eventtime-ext-format

*6:https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1#response