一休.com Developers Blog

一休のエンジニア、デザイナー、ディレクターが情報を発信していきます

社内マーケター向けの機械学習プラットフォームを作りました

はじめに

こんにちは。データサイエンス部の平田です。

一休でのデータ分析はJupyter NotebookやJupyter Labを用いてDWHにアクセスして行われることが多いですが、サービスそのものと分析環境が乖離していることにより、分析結果を継続的にサービスに取り込むのが難しい状況でした。 また、マーケティング部の方々がJupyterを使用して分析した結果に基づいて継続的に施策を行おうとしても、Airflowに組み込む際のエンジニアの負担はそこそこありますし、修正するたびに依頼をしなければならないなどコミュニケーションコストも発生します。 さらに、マーケティングに機械学習を取り入れたい場合でもairflow側で全部やってしまうと密結合になってしまいます。

そこで、Airflowから別の場所にあるJupyterを直接実行することによりエンジニアの負担は最小限にとどめ、自由に施策を打てるような仕組みとして機械学習プラットフォーム、通称ml-jupyterが生まれました。

f:id:ikyu_com:20190711124818p:plain
模式図

AirflowからJupyterをキックする

①まず、日次で実行しているAirflowからml-jupyter上のAPIをキックする関数を作ります。

キックするコードは以下のようになります。 大まかな流れとしては、カーネル起動→コードを取得→ウェブソケットで通信→カーネル終了となります。

Jupyter notebookでは一つのnoteごとにカーネルが割り当てられるため、まずその起動から始まります。

import json
import requests
import datetime
import uuid
from work.dags.base_taskset import BaseTaskset
from websocket import create_connection

class TriggerJupyterNotebook(BaseTaskset):

    @classmethod
    def execute(cls, **kwargs):
        ipynb_file = kwargs['ipynb_file']
        folder_name = kwargs['folder_name']

        # 施策ごとのノートブックファイル (Airflowから実行されるもの)
        notebook_path = f'/{folder_name}/{ipynb_file}.ipynb' 
        host = [host_name]
        base = f'http://{host}'

        # カーネルを起動
        url = base + '/api/kernels'
        response = requests.post(url)
        kernel = json.loads(response.text)
        print('kernel_id:', kernel['id'])

        # コードを取得
        url = base + '/api/contents' + notebook_path
        response = requests.get(url)
        file = json.loads(response.text)

        # セルのコードのみを抽出
        code = [c['source'] for c in file['content']['cells'] if len(c['source']) > 0 and c['cell_type'] == 'code']

        # WebSocketのオープン
        ws = create_connection(
            f'ws://{host}/api/kernels/' + kernel['id'] + '/channels'
        )

        # WebSocket上でメッセージを送る

        # カレントディレクトリを施策用のディレクトリに変更
        # (パッケージ等もカレントディレクトリを参照する)
        # 最後にカーネルの処理が完了したことを知るために、特定の文字列を出力する
        terminated_signal_str = uuid.uuid1().hex
        code = ['import os', f"os.chdir('/tf/{folder_name}')"] \
        + code + ["print('" + terminated_signal_str + "', end='')"]

        for c in code:
            msg_type = 'execute_request'

            content = {
                'code': c,
                'silent': False
            }

            hdr = {
                'msg_id': uuid.uuid1().hex,
                'username': 'airflow',
                'session': uuid.uuid1().hex,
                'data': datetime.datetime.now().isoformat(),
                'msg_type': msg_type,
                'version': '5.0'
            }

            ws.send(json.dumps({
                'header': hdr,
                'parent_header': hdr,
                'metadata': {},
                'content': content
            }))

        # WebSocketのレスポンスを取得
        error_flag = False
        while True:
            msg_type = ''
            while msg_type != "stream":
                rsp = json.loads(ws.recv())
                msg_type = rsp['msg_type']
                if rsp['content'].get('status') == 'error':
                    print('jupyter notebook error:', rsp['content']['evalue'])
                    error_flag = True
                    break

            # エラーを返却した場合、WebSocketをクローズして、処理を終了
            if error_flag:
                ws.close()
                break

            # 特定の文字列を含む場合、WebSocketをクローズして、処理を終了
            if terminated_signal_str == rsp['content']['text']:
                ws.close()
                break

        # カーネルを終了
        url = base + f"/api/kernels/{kernel['id']}"
        response = requests.delete(url)
        response.status_code  # 204ならOK

        return not error_flag

②PythonOperatorを作り、この関数を実行します。

PythonOperator(
    task_id='[task_id]',
    provide_context=True,
    python_callable=TriggerJupyterNotebook.execute,
    dag=subdag,
    op_kwargs={'folder_name': '[folder_name]', 'ipynb_file': '[file_name]'}
)

③新たな施策をAirflow上で組み込むときは、エンジニア側は「Jupyter上の〇〇ファイルを実行してほしい」 「〇〇が終わった後や〇〇テーブルが更新された後に実行してほしい」などの要望を聞き、上のPythonOperatorをコピペして少し変え、適切な順番で実行されるように実装するだけになります。

ただ、Airflowから無事キックできるようにするにはいくつか決めなければならないことがあります。

ライブラリ競合問題

施策ごとにライブラリのバージョンが異なる可能性がある場合、それぞれその施策独自のライブラリを見に行く必要があります。 Jupyter上で、

!pip install [hogehoge] -t .

とすることで、カレントディレクトリにライブラリがインストールされるようになります。

施策ごとにフォルダを切り、上記を実行することでそれぞれのフォルダ内で独立してライブラリをインストールできます。 Airflowからキックするコードにchdirコマンドを追加しています。(第1項)

code = ['import os', f"os.chdir('/tf/{folder_name}')"] \
+ code + ["print('" + terminated_signal_str + "', end='')"]

さらに、pandas, boto3など共通としてよさそうなライブラリは別途インストールしています。

終了検知問題

Jupyterでの実行終了を検知するのは結構難しい問題です。若干泥臭くなりますが、Airflowでランダムに生成した特定文字列をJupyter側で最後に出力させることで終了とみなすようにしました。もっといい方法があれば教えてほしいです。(第3項)

code = ['import os', f"os.chdir('/tf/{folder_name}')"] \
+ code + ["print('" + terminated_signal_str + "', end='')"]

また、Jupyter自体がエラーで終了することもあり得るのでフォローしています。

ml-jupyter上からできること

サービスAPIとマーケターをつなぐ

基本的に一休のCRM施策はDWHのデータを元に、SQLで対象者データやコンテンツ情報を抽出し、Ikyu Marketing Cloudという社内ツールから各種チャネルに配信しています。

ただし、DWHのデータは基幹DBの当日2:00までのデータであり、リアルタイムに基幹DBから情報を取得・追加・更新することができません。 上記のDB分離問題からマーケティング側の課題として、特定のユーザーに対して何らかのアクション(クーポンの付与や会員ランクの変更)をフレキシブルに行えないという問題がありました。

そこで、ml-jupyter上から基幹側の各種APIを呼べるようにライブラリを整備することで、各Notebookのインスタンス上でpythonを用いてAPIをコールするなど自由自在にクーポン付与等の施策を行えるようになりました。

機械学習のアプローチを用いたデータ生成

今まではMarketing Cloud上で配信する対象者等のデータ抽出については、SQLを用いて複雑なクエリやストアドプロシージャを組んで行っていましたが、SQL自体の言語仕様が貧弱なところもあり、機械学習のアプローチを用いてデータの作成をするのが困難でした。

ml-jupyterではJupyter Notebook上で、pandasscikit-learn等の強力なpythonライブラリを用いた機械学習のアプローチで各種データの生成ができるようになり、より一歩進んだCRM施策が行えるようになりました。

宿泊のリマーケティングクーポンの施策は機械学習のアプローチにてデータが作られ、Marketing Cloud上で日々配信が行われており、現状かなりの数値のリフトが見られています。

細かい話

マーケティングの方々はおそらくバージョン管理に興味ありません。(偏見)

そこで、jupyter notebookをcronでこっそり自動pushしています。