一休.com Developers Blog

一休のエンジニア、デザイナー、ディレクターが情報を発信していきます

Cloud WorkflowsとCloud Tasksを使って日次のバッチ処理を作る

宿泊プロダクト開発部の田中(id:kentana20)です。

このエントリーは一休.com Advent Calendar 2024の19日目の記事です。

今回は一休.com宿泊のとあるプロジェクトで必要になった 「ホテル・旅館の商品データを日次で更新する」 という処理を

  • Cloud Scheduler
  • Cloud Workflows
  • Cloud Tasks

とWeb APIで構築、運用している事例をご紹介します。

宿泊システムのバッチ処理について(背景・課題)

一休.com 宿泊には、業務に必要なデータ作成や更新を行うバッチ処理が多く存在します。たとえば

  • 投稿されたクチコミ評点を集計してホテル、旅館のスコアを更新する
  • 前月分までの宿泊予約データをもとにユーザーにポイントを付与する

などです。 これらのバッチ処理は宿泊システムの中でも古い部類に入る技術スタック(ASP.NET(C#/VB))で作られており

  • スピーディに開発できない
  • バッチ処理の開発に慣れているメンバーが限られている

といった課題がありました。

新たに必要になったバッチ処理をどうやって作るか

今年の春頃に実施したプロジェクトで「ホテル・旅館の売れ筋商品(プラン)を日次で洗替する」という処理を新たに作る必要が出てきました。

ざっくりとした要件は以下のような内容です。

  • 一休.comに掲載している一部のホテル・旅館を処理対象とする
  • 処理対象のホテル・旅館に対して、直近XX日間の予約を集計して売れ筋商品(プラン)を抽出する
  • 対象の売れ筋商品(プラン)に対してフラグを立てる
  • 処理対象のホテル・旅館は増えたり、減ったりする
  • 売れ筋商品の洗替は日次で行う

前述の背景・課題があったため「新しい開発基盤を作ってバッチ処理をスピーディに開発できるようにする」ことを考えてCTOに壁打ちをしたところ「新しい開発基盤を作る前に、そもそもこれはバッチで作るのがベストなのか?」というフィードバックをもらいました。具体的には

  • 一休.com宿泊では、歴史的経緯*1から、オンライン処理できないものをほとんどバッチで作っている
  • 現在では、そもそもバッチでまとめて処理せずに、非同期化・分散処理をする選択肢もある
  • バッチで作るのが本当にベストなのか、ほかの選択肢も含めて検討したほうがよい

といった内容でした。このフィードバック内容を踏まえて

  1. (もともとの案)新たにバッチ開発の基盤を作る
  2. マネージドなクラウドサービスを組み合わせて作る

を検討し

  • 今回実施したい作業はシンプルな処理の組み合わせで実現可能であること
  • 並列、分散処理を考えやすい要件であること(ホテル・旅館単位で処理しても問題ない)

といった理由から、最終的に2を選択しました。

Cloud Workflows + Cloud Tasks を使ったバッチ処理

クラウドサービスについて、一休では、AWSとGoogle Cloudを併用しています。 新しく作るサービスではGoogle Cloudを使うケースが増えている一方で、一休.com 宿泊ではまだ事例が少なかったこともあり、今回はGoogle Cloudを使うことにしました。

処理フロー

  • Cloud Scheduler
  • Cloud Workflows
  • Cloud Tasks

の3サービスと、シンプルなWeb APIを組み合わせた設計にしており、以下のような流れで動いています。

処理フロー

Cloud Workflows

cloud.google.com

Cloud Workflowsは、マネージドなジョブオーケストレーションサービスです。ワークフローに定義された処理順(ステップ)に従って

  • Google Cloudのサービスを実行する
  • 任意のHTTPエンドポイントにリクエストする

などを実行することができます。 公式ドキュメントにも日次のバッチジョブの例が載っており、バッチ処理がユースケースの1つであることがわかります。

ワークフローで実行したい内容(ステップ)をYAML形式で記述します。 以下は、今回作ったワークフローのイメージです。

main:
  steps:
    - init:
        assign:
          - queueName: "cloud-tasks-queue-name"
    - getTargetHotels:
        call: http.get
        args:
          url: "https://api.example.com/hotels"
          auth:
            type: OIDC
          query:
            target: true
        result: hotelData
    - createCloudTasks:
        palallel:
          for:
            in: ${hotelData.body.hotels}
            value: hotel
            steps:
              - createTask:
                  call: googleapis.cloudtasks.v2.projects.locations.queues.tasks.create
                  args:
                    parent: "projects/${sys.get_env('GOOGLE_CLOUD_PROJECT_ID')}/locations/${sys.get_env('LOCATION')}/queues/${queueName}"
                    body:
                      task:
                        httpRequest:
                          httpMethod: "PUT"
                          url: "https://api.example.com/hotels/${hotel.id}/popular"
                          headers:
                           Content-Type: "application/json"
                          oidcToken:
                            serviceAccountEmail: ${"application@" + projectId + ".iam.gserviceaccount.com"}

Workflowsから外部APIを呼び出す

getTargetHotels のステップで、Web APIへリクエストして対象のホテル・旅館を取得しています。 auth でOIDCを指定していますが、これによってWorkflowsからのAPIリクエストにAuthorizationヘッダを付与することができます。

ワークフローからの認証済みリクエスト  |  Workflows  |  Google Cloud

呼び出されるAPIで、このヘッダを使ってIDTokenを検証することで、Workflowsからのリクエストであることを保証しています。*2

以下は、IDTokenの検証をするミドルウェアのサンプル実装(Go)です。

import (
    "fmt"
    "net/http"
    "strings"

    "google.golang.org/api/idtoken"
)

// IDトークンを検証するミドルウェア
func AuthMiddleware(next http.Handler) http.Handler {
    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        // Authorization ヘッダからBearerトークンを取得
        authHeader := r.Header.Get("Authorization")
        if authHeader == "" {
            http.Error(w, "Authorization header is required", http.StatusUnauthorized)
            return
        }

        token := strings.TrimPrefix(authHeader, "Bearer ")

        // IDトークンの検証
        _, err := idtoken.Validate(r.Context(), token, "")
        if err != nil {
            // トークンの検証に失敗した場合はエラーを返す
            http.Error(w, "Invalid ID Token", http.StatusUnauthorized)
            return
        }

        // トークンが有効であれば、次のハンドラーを呼び出す
        next.ServeHTTP(w, r)
    })
}

APIのレスポンスをもとにCloud Tasksにエンキューする

createCloudTask のステップで、Web APIで取得した hotelData.body.hotels に含まれるホテル・旅館ごとにCloud Tasksにエンキューしています。前述したように実行順序を考慮する必要がないため、parallel を使って並列処理しています。

また、 oidcToken を指定することで、Cloud TasksがAPIリクエストを送る際にOIDCトークンを付与することができます。これによってWorkflowsからのAPIリクエストと同様に、API側でIDTokenを検証することができます。

Cloud Tasks

cloud.google.com

Cloud Tasksについては、昨年のAdvent CalendarでCTO室の徳武が詳細に解説していますので、ぜひご覧ください。

zenn.dev zenn.dev

Web API

Cloud Workflows/Cloud Tasksが呼ぶWeb APIは、以下の2つを用意しました。

  1. 処理対象のホテル・旅館を取得するAPI(GET)
  2. 指定されたホテル・旅館IDをもとに売れ筋商品を更新するAPI(PUT)

どちらのAPIも、特定のユースケースに合わせたAPIという形ではなく、単一のリソースを取得/更新するというシンプルな仕様にして再利用可能な設計にしています。

この設計にしたことによって、リリース後に「ホテル・旅館が管理システムから任意の操作をした際に、売れ筋商品を更新したい」というユースケースが出てきたときも、2のAPIを使って対応することができました。

リリース後の運用

このWorkflowsを使ったバッチ処理をリリースした後に、安定運用のためにいくつか変更したポイントがあるのでご紹介します。

Cloud Tasksのキュー設定の調整

Cloud Tasksの設定が適切ではなく、Web APIへの秒間リクエスト数が多すぎてレスポンスが遅くなるという事象があったため

  • 最大ディスパッチ数
  • 最大同時ディスパッチ数

などを調整しました。

キュー設定変更のPull Request

異常終了時の検知を強化

異常があった場合に、受動的に気付けるように

  • Workflowsのエラー処理を調整する
  • エラーログ(Cloud Logging)をSlackに通知する

といった対応をしました。

まとめ

Cloud Workflows + Cloud TasksとWeb APIを組み合わせたバッチ処理を実装した事例をご紹介しました。 個人的な所感としては、以下のようなメリットを感じています。

  • Cloud Workflowsはある程度複雑な処理も定義できるため、バッチ処理で必要な手続きをアプリケーション内部に書かずにシンプルなWeb APIとの組み合わせでバッチ処理を作れる
  • データの更新処理は特に、処理単位を小さくする & Cloud Tasksなどのキュー処理を使うと並列実行やエラー時のリトライをマネージドにできるので、運用が楽になる
    • 実際に、キュー設定の調整をする前は初回エラー → キューのリトライによって成功する、といったケースがあり、運用上問題になることはなかったです

また、今回は採用しませんでしたが、一休社内ではCloud Run Jobsを使ったバッチ処理の基盤も整ってきており、冒頭にご紹介した課題に対して複数の解決方法ができつつあるので、既存のレガシーなバッチ処理も少しずつ刷新していきたいと考えています。

おわりに

一休では、事業の成果をともに目指せる仲間を募集しています。

www.ikyu.co.jp

まずはカジュアル面談からお気軽にご応募ください!

hrmos.co

明日は @yamazakik の「一休バーチャル背景を作ったはなし」です。お楽しみに!

*1:非同期ジョブキューの仕組みがない時代に作られたバッチが多く残っています

*2:実際には、この保証だけでなくほかの方法も含めて安全に運用できるように設計しています