こんにちは。宿泊開発チームの菊地です!
このエントリは 一休.com Advent Calendar 2023 12日目の記事です。昨日は id:rotom によるSlack Enterprise Grid における情報バリアの設計でした。その他の素敵なエントリも以下のリンクからご覧ください。
私はEmbulkを使って、各プロダクトの請求データを集約する機能を担当しました。今回は、Embulkの紹介とふりかえりをしていきたいと思います!
背景
一休では、これまでプロダクト毎に請求書発行機能が実装されていました。私のチームでは、2023年10月に施行されたインボイス制度*1の対応として、全プロダクトの請求書を適格請求書形式に改修することになりました。
今後法律が改正されることも考慮して、既存の実装を個別に修正するのではなく、請求書マイクロサービスに一元化して各プロダクトから利用するという方針を立てました。
課題
一休ではプロダクト毎に個別のDBを持っています。プロダクトによって採用しているDBMSも様々です。全プロダクトの請求書を発行するためには、個別に管理されているデータを統合する必要がありました。また社内向けに、複数DBをまたいだ情報を書き出したCSVの発行が求められていました。
これらの要件を満たすため、複数のデータソースからデータを集約して別のデータソースに出力する手法を検討しました。
解決策
この課題を解決するために、Embulkを使って各プロダクトのDBからデータを集約することにしました。
Embulkとは?
Embulk is an Open-source Pluggable Bulk Data Loader to/from varieties of storages, file formats, databases, cloud services, and else.
Embulkはあるデータソースからデータを吸い出し、別のデータソースへ転送するためのETLツールです。また、Pluggable
とあるように、Embulk本体は基本的な処理順序(inputプラグインを実行し、filterプラグインを実行し、outputプラグインを実行する)のみを制御しており、利用者は個々のユースケースに合わせたプラグイン*2の組み合わせで処理を実現します。
簡単に動かしてみたい方は、embulkのコマンドでquick startが提供されていますので、試してみてください*3。
embulk example {dir}
今回の課題に対してEmbulkがマッチした理由
Embulkでは、プラグインを組み合わせることで複数データソースをまたいだ操作が簡単に記述できます。今回の要件では、次の2つの操作ができることが非常に強力でした。
union: 複数のデータソースを連結する
unionプラグインを使うことで、複数のDBからのデータ取得処理を書くことができます。また、一休ではプロダクト毎にPostgresやSQL Serverなどの異なるDBMSを使っているため、適切なinputプラグインが異なります。union プラグインはソースとなる input もまたプラガブルになっており、任意の input プラグインを組み合わせられる自由度の高さも非常にありがたかったです。
config.ymlの記述例
in: type: union union: - name: product_hoge in: type: sqlserver url: product_hoge_jdbc_url user: product_hoge_db_user password: product_hoge_db_pwd query: | SELECT hoge_id AS common_id, amount, tax_fee FROM product_hoge_table filters: - type: column add_columns: - { name: product_code, type: string, default: "hoge" } - name: product_fuga in: type: postgresql host: product_fuga_db_host port: product_fuga_db_port user: product_fuga_db_user password: product_fuga_db_pwd database: product_fuga_db_name query: | SELECT fuga_id AS common_id, charge AS amount, tax_fee FROM product_fuga_table filters: - type: column add_columns: - { name: product_code, type: string, default: "fuga" } out: type: postgresql host: common_db_host user: common_db_user port: common_db_port password: common_db_pwd database: common_db_name table: common_table mode: merge merge_rule: [ "product_code = S.product_code", "id = S.common_id", "amount = S.amount", "tax_fee = S.tax_fee" ]
lookup: 複数のデータソースを結合する
csv_lookupプラグインを使うことで、DBから取得した情報に対し、CSV のデータを SQL の left join のような形で結合できます。このプラグインでデータベースと CSV を結合した帳票を得ることができました。処理自体も非常に軽量で、例えば、6,000件のDBレコードに対し18,000行のCSVをlookupしたCSVを発行するジョブは平均5分18秒で実行できました*4。
config.ymlの記述例
exec: min_output_tasks: 1 in: type: sqlserver url: hoge_db_jdbc_url user: hoge_db_user password: hoge_db_pwd query: | SELECT hoge_key, hoge_col_1, hoge_col_2 FROM hoge_table filters: - type: csv_lookup mapping_from: - hoge_key mapping_to: - fuga_key new_columns: - { name: fuga_col_1, type: string } - { name: fuga_col_2, type: string } path_of_lookup_file: "ref/fuga.csv" out: type: file path_prefix: ./out file_ext: csv formatter: type: csv header_line: true charset: UTF-8
ふりかえり
Embulkを導入し、予定通りにインボイス対応を完了することができました!実際に使ってみて得た知見をまとめます。
とくに良かったこと
config.ymlの取り回しのよさが開発スピードをあげてくれた
Embulkでデータ移送のジョブを6個、CSV発行のジョブを12個担当しましたが、慣れてからは1日1ジョブのペースで開発を進めることができました。Embulkはconfig.ymlにテンプレートにしたがってSQLやプラグインの実行を記述していくだけで、非常に取り回しがよかったのが開発速度を後押ししてくれました。
config.yml.liquidのサポート
Embulkではconfig.ymlへの変数埋め込みのために、Liquidテンプレートをサポートしています*5。たとえばunionプラグインを使ったconfig.ymlの記述例では、DB接続文字列を指定しています。
type: union union: - name: product_hoge in: type: sqlserver url: product_hoge_jdbc_url user: product_hoge_db_user password: product_hoge_db_pwd query: | SELECT col_1, col_2, col_3 FROM product_hoge_table
しかし、実際にはDB接続文字列はリポジトリ管理すべき情報ではありませんし、構築環境ごとに専用DBに接続したいものです。そのため、Liquidテンプレートを使い、環境変数から以下のように接続文字列を読み込む実装にしました。
type: union union: - name: product_hoge in: type: sqlserver url: {{ env.HOGE_JDBC_URL }} user: {{ env.HOGE_DB_USER }} password: {{ env.HOGE_DB_PWD }} query: | SELECT col_1, col_2, col_3 FROM product_hoge_table
注意したほうがいいこと
任意のクエリでlookupしたいときは、CSVを一度経由する必要がある
先ほど、複数のデータソースを結合したCSVの生成に csv_lookupプラグインを紹介しました。プラグイン一覧から、lookup 先にDB テーブルを直接参照できる{db}_lookup
プラグインが提供されていることにお気づきの方もいるでしょう。
これらの{db}_lookup
プラグインは設定ファイルで指定したテーブルとカラムから自動で lookup する仕組みになっていて、任意のクエリ(SELECT 文)は指定できません。*6。そのため、内部表をサブクエリにしたいケースではこのプラグインが利用できないことに注意が必要です。今回は回避策として、内部表の部分をローカルにCSV出力するEmbulkジョブを実行し、出力したCSVに対してcsv_lookupプラグインでlookupすることにしました。
GCSへのCSVアップロードプラグインにはstorage.objects.listが必要
社内向けにCSVを公開するため、Google Cloud Storageへ保存したいという要件がありました。当初はgcsプラグインを利用して、Embulk内部でGCSへのアップロードまで実行しようと考えていましたが、実装してみると次のエラーが発生してしまいました。
Caused by: java.lang.RuntimeException: org.embulk.config.ConfigException: org.embulk.util.retryhelper.RetryGiveupException: com.google.cloud.storage.StorageException: {my_service_account}@{my_project}.iam.gserviceaccount.com does not have storage.objects.list access to the Google Cloud Storage bucket. Permission 'storage.objects.list' denied on resource (or it may not exist).
ライブラリの内部実装を確認したところ、バケットの存在確認のためにObject Listを取得するようになっていたためでした*7。今回は、なるべく最小のPermissionsに絞ったサービスアカウントを利用したかったため、ローカルに出力したCSVをgsutilでアップロードするスクリプトを組みました。
まとめ
ここまで読んでいただきありがとうございました!使ってみて、EmbulkはPluginが豊富でとても強力なツールであることがわかりました!ETLや複数データソースをまたいだCSV生成を行う際には導入を検討してはいかがでしょうか。
小ネタ:Embulkのメンテナンス体制が新しくなったとのこと!(2023年3月)
EmbulkはFluentdの開発者である古橋氏によって2015年に公開されました*8。その後、氏が創設者であるTreasure Data社によって運用や設計の改善が行われてきましたが、2023年3月からは、社に限定せず広くコアチームを結成し設計検討を行っていく方針が発表されました*9。その経緯については、Treasure Data社のTech Talk2022の発表資料にてより詳しくまとめられています。
イベント資料|TreasureData Tech Talk 2022 - TECH PLAY[テックプレイ]
業務としてのOSS開発のアンビバレンスなど、かなり実情に即した部分まで言及されており示唆に富んだ発表資料でした。私はOSS開発の経験はありませんが、事業会社でエンジニアリングを行ううえでビジネス優先度は常に考慮すべき観点ですので、非常に考えさせられました。
あらためて、OSSメンテナの皆様、いつもありがとうございます!
さいごに
一休では、ともに試行錯誤しながらよいサービスを作ってくれる仲間を募集しています!
カジュアル面談も実施していますので、ぜひお気軽にご連絡ください
*2: List of Embulk Plugins by Category
*3: EmbulkでMySQLに大量データを投入してみる - その1 #MySQL - Qiita
*4:このケースはSQL自体がやや重かったり、後述の理由からEmbulkを内部で多重実行していますので本来はもっと早いと思います
*6: GitHub - InfoObjects/embulk-filter-mssql_lookup
*7: embulk-output-gcs/src/main/java/org/embulk/output/gcs/GcsAuthentication.java at 427f9fdc677885a7467606393f6a343ceda2c4c9 · embulk/embulk-output-gcs · GitHub]