一休.com Developers Blog

一休のエンジニア、デザイナー、ディレクターが情報を発信していきます

請求書発行のためにEmbulkを使って爆速でデータを集約した話

こんにちは。宿泊開発チームの菊地です!

このエントリは 一休.com Advent Calendar 2023 12日目の記事です。昨日は id:rotom によるSlack Enterprise Grid における情報バリアの設計でした。その他の素敵なエントリも以下のリンクからご覧ください。

qiita.com

私はEmbulkを使って、各プロダクトの請求データを集約する機能を担当しました。今回は、Embulkの紹介とふりかえりをしていきたいと思います!

背景

一休では、これまでプロダクト毎に請求書発行機能が実装されていました。私のチームでは、2023年10月に施行されたインボイス制度*1の対応として、全プロダクトの請求書を適格請求書形式に改修することになりました。

今後法律が改正されることも考慮して、既存の実装を個別に修正するのではなく、請求書マイクロサービスに一元化して各プロダクトから利用するという方針を立てました。

課題

一休ではプロダクト毎に個別のDBを持っています。プロダクトによって採用しているDBMSも様々です。全プロダクトの請求書を発行するためには、個別に管理されているデータを統合する必要がありました。また社内向けに、複数DBをまたいだ情報を書き出したCSVの発行が求められていました。

これらの要件を満たすため、複数のデータソースからデータを集約して別のデータソースに出力する手法を検討しました。

解決策

この課題を解決するために、Embulkを使って各プロダクトのDBからデータを集約することにしました。

Embulkとは?

www.embulk.org

Embulk is an Open-source Pluggable Bulk Data Loader to/from varieties of storages, file formats, databases, cloud services, and else.

Embulkはあるデータソースからデータを吸い出し、別のデータソースへ転送するためのETLツールです。また、Pluggableとあるように、Embulk本体は基本的な処理順序(inputプラグインを実行し、filterプラグインを実行し、outputプラグインを実行する)のみを制御しており、利用者は個々のユースケースに合わせたプラグイン*2の組み合わせで処理を実現します。

簡単に動かしてみたい方は、embulkのコマンドでquick startが提供されていますので、試してみてください*3

embulk example {dir}

今回の課題に対してEmbulkがマッチした理由

Embulkでは、プラグインを組み合わせることで複数データソースをまたいだ操作が簡単に記述できます。今回の要件では、次の2つの操作ができることが非常に強力でした。

union: 複数のデータソースを連結する

unionプラグインを使うことで、複数のDBからのデータ取得処理を書くことができます。また、一休ではプロダクト毎にPostgresやSQL Serverなどの異なるDBMSを使っているため、適切なinputプラグインが異なります。union プラグインはソースとなる input もまたプラガブルになっており、任意の input プラグインを組み合わせられる自由度の高さも非常にありがたかったです。

config.ymlの記述例

in:
  type: union
  union:
    - name: product_hoge
      in:
        type: sqlserver
        url: product_hoge_jdbc_url
        user: product_hoge_db_user
        password: product_hoge_db_pwd
        query: |
          SELECT
            hoge_id AS common_id,
            amount,
            tax_fee
          FROM
            product_hoge_table
      filters:
        - type: column
          add_columns:
          - { name: product_code, type: string, default: "hoge" }

    - name: product_fuga
      in:
        type: postgresql
        host: product_fuga_db_host
        port: product_fuga_db_port
        user: product_fuga_db_user
        password: product_fuga_db_pwd
        database: product_fuga_db_name
        query: |
          SELECT
            fuga_id AS common_id,
            charge AS amount,
            tax_fee
          FROM
            product_fuga_table
      filters:
        - type: column
          add_columns:
          - { name: product_code, type: string, default: "fuga" }

out:
  type: postgresql
  host: common_db_host
  user: common_db_user
  port: common_db_port
  password: common_db_pwd
  database: common_db_name

  table: common_table
  mode: merge
  merge_rule: [
    "product_code = S.product_code",
    "id = S.common_id",
    "amount = S.amount",
    "tax_fee = S.tax_fee"
  ]

lookup: 複数のデータソースを結合する

csv_lookupプラグインを使うことで、DBから取得した情報に対し、CSV のデータを SQL の left join のような形で結合できます。このプラグインでデータベースと CSV を結合した帳票を得ることができました。処理自体も非常に軽量で、例えば、6,000件のDBレコードに対し18,000行のCSVをlookupしたCSVを発行するジョブは平均5分18秒で実行できました*4

config.ymlの記述例

exec:
  min_output_tasks: 1

in:
  type: sqlserver
  url: hoge_db_jdbc_url
  user: hoge_db_user
  password: hoge_db_pwd
  query: |
    SELECT
      hoge_key,
      hoge_col_1,
      hoge_col_2
    FROM
      hoge_table

filters:
  - type: csv_lookup
    mapping_from:
      - hoge_key
    mapping_to:
      - fuga_key
    new_columns:
      - { name: fuga_col_1, type: string }
      - { name: fuga_col_2, type: string }
    path_of_lookup_file: "ref/fuga.csv"

out:
  type: file
  path_prefix: ./out
  file_ext: csv
  formatter:
    type: csv
    header_line: true
    charset: UTF-8

ふりかえり

Embulkを導入し、予定通りにインボイス対応を完了することができました!実際に使ってみて得た知見をまとめます。

とくに良かったこと

config.ymlの取り回しのよさが開発スピードをあげてくれた

Embulkでデータ移送のジョブを6個、CSV発行のジョブを12個担当しましたが、慣れてからは1日1ジョブのペースで開発を進めることができました。Embulkはconfig.ymlにテンプレートにしたがってSQLやプラグインの実行を記述していくだけで、非常に取り回しがよかったのが開発速度を後押ししてくれました。

config.yml.liquidのサポート

Embulkではconfig.ymlへの変数埋め込みのために、Liquidテンプレートをサポートしています*5。たとえばunionプラグインを使ったconfig.ymlの記述例では、DB接続文字列を指定しています。

  type: union
  union:
    - name: product_hoge
      in:
        type: sqlserver
        url: product_hoge_jdbc_url
        user: product_hoge_db_user
        password: product_hoge_db_pwd
        query: |
          SELECT
            col_1,
            col_2,
            col_3
          FROM
            product_hoge_table

しかし、実際にはDB接続文字列はリポジトリ管理すべき情報ではありませんし、構築環境ごとに専用DBに接続したいものです。そのため、Liquidテンプレートを使い、環境変数から以下のように接続文字列を読み込む実装にしました。

  type: union
  union:
    - name: product_hoge
      in:
        type: sqlserver
        url: {{ env.HOGE_JDBC_URL }}
        user: {{ env.HOGE_DB_USER }}
        password: {{ env.HOGE_DB_PWD }}
        query: |
          SELECT
            col_1,
            col_2,
            col_3
          FROM
            product_hoge_table

注意したほうがいいこと

任意のクエリでlookupしたいときは、CSVを一度経由する必要がある

先ほど、複数のデータソースを結合したCSVの生成に csv_lookupプラグインを紹介しました。プラグイン一覧から、lookup 先にDB テーブルを直接参照できる{db}_lookupプラグインが提供されていることにお気づきの方もいるでしょう。

これらの{db}_lookupプラグインは設定ファイルで指定したテーブルとカラムから自動で lookup する仕組みになっていて、任意のクエリ(SELECT 文)は指定できません。*6。そのため、内部表をサブクエリにしたいケースではこのプラグインが利用できないことに注意が必要です。今回は回避策として、内部表の部分をローカルにCSV出力するEmbulkジョブを実行し、出力したCSVに対してcsv_lookupプラグインでlookupすることにしました。

GCSへのCSVアップロードプラグインにはstorage.objects.listが必要

社内向けにCSVを公開するため、Google Cloud Storageへ保存したいという要件がありました。当初はgcsプラグインを利用して、Embulk内部でGCSへのアップロードまで実行しようと考えていましたが、実装してみると次のエラーが発生してしまいました。

Caused by: java.lang.RuntimeException: org.embulk.config.ConfigException: org.embulk.util.retryhelper.RetryGiveupException: com.google.cloud.storage.StorageException: {my_service_account}@{my_project}.iam.gserviceaccount.com does not have storage.objects.list access to the Google Cloud Storage bucket. Permission 'storage.objects.list' denied on resource (or it may not exist).

ライブラリの内部実装を確認したところ、バケットの存在確認のためにObject Listを取得するようになっていたためでした*7。今回は、なるべく最小のPermissionsに絞ったサービスアカウントを利用したかったため、ローカルに出力したCSVをgsutilでアップロードするスクリプトを組みました。

まとめ

ここまで読んでいただきありがとうございました!使ってみて、EmbulkはPluginが豊富でとても強力なツールであることがわかりました!ETLや複数データソースをまたいだCSV生成を行う際には導入を検討してはいかがでしょうか。

小ネタ:Embulkのメンテナンス体制が新しくなったとのこと!(2023年3月)

EmbulkはFluentdの開発者である古橋氏によって2015年に公開されました*8。その後、氏が創設者であるTreasure Data社によって運用や設計の改善が行われてきましたが、2023年3月からは、社に限定せず広くコアチームを結成し設計検討を行っていく方針が発表されました*9。その経緯については、Treasure Data社のTech Talk2022の発表資料にてより詳しくまとめられています。
イベント資料|TreasureData Tech Talk 2022 - TECH PLAY[テックプレイ]

業務としてのOSS開発のアンビバレンスなど、かなり実情に即した部分まで言及されており示唆に富んだ発表資料でした。私はOSS開発の経験はありませんが、事業会社でエンジニアリングを行ううえでビジネス優先度は常に考慮すべき観点ですので、非常に考えさせられました。

あらためて、OSSメンテナの皆様、いつもありがとうございます!

さいごに

一休では、ともに試行錯誤しながらよいサービスを作ってくれる仲間を募集しています!

www.ikyu.co.jp

カジュアル面談も実施していますので、ぜひお気軽にご連絡ください

hrmos.co