Apache Beamが多言語・多バックエンド処理系を実現する仕組み

Apache Beam Portable Framework概要図

ストリーム処理とバッチ処理を統合して扱えるプログラミングモデル(あるいはデータ処理のフロントエンド)である Apache Beam が、特にGoogle Cloud DataflowやApache Flinkからの利用を背景にシェアを伸ばしています。

Apache Beamの特色として、複数のプログラミング言語のSDKを持つこと・複数のバックエンド処理系(Flinkなどを指す)を持つことが挙げられますが、これがどう実現されているのかをまとめます。

目次

前提知識: Beam入門

Exampleコードからざっくり理解

Exampleを見る前に、Beamのプログラミングをするのはどういうことかをざっくりと説明する。

Beamのプログラミング体験

Beamでプログラミングをするということは、「BeamのSDKを介して下図のようなパイプラインを構成」すること。

https://blog.gopheracademy.com/advent-2018/apache-beam/ より引用

パイプラインの重要な構成要素は以下:

  • PCollection: レコードが入るテーブルやキュー。データの型は単なる String であったり、リレーショナルであったり、ArrayやObjectをネストすることもできる。
  • Transform: 基本的に、PCollection から PCollection への変換関数。InputをPCollectionに変換するものを特別に Read Transform , PCollectionをOutputへ変換するものを Write Transform と呼ぶ。
  • Input: 任意のデータ。Read Transform が頑張ってBeam実行系が扱えるPCollectionに変換する。バッチ処理用に始めから終わりが定義されているデータでも良いし、ストリーム処理用に(概念上)無限に流れるデータでも良い。
  • Output: 任意のデータ。これもbounded dataでもunbounded dataでも良い。

Beamのコードを見てみる

JavaのMinimalWordCount exampleを見る。

コメントを短縮するとこんな感じ。
PCollection 出力PCollection = 入力PCollection.apply(Transform) なメソッドチェーンが続く。

MinimalWordCount 抜粋
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
Pipeline p = Pipeline.create(options);  // 空のパイプラインを作成

p.apply( // パイプラインにRead Transformを追加する
// テキストファイルから読み取る Read Transform
TextIO.read().from("gs://apache-beam-samples/shakespeare/kinglear.txt")
)
// この時点で PCollection が出来上がっている。
// PCollectionのレコード型は String で、テキストファイル1行ごとにレコードになっている。

// レコードを、空白文字で更に区切る。
// ただ split するだけだと List<List<String>> みたいになってしまうので FlatMap する。
// FlatMapElements は一つの Transform。
.apply(
FlatMapElements.into(TypeDescriptors.strings())
.via((String line) -> Arrays.asList(line.split("[^\\p{L}]+"))))

// Filter の Transformで空文字列を排除
.apply(Filter.by((String word) -> !word.isEmpty()))

// Countはレコード列に関する集計をしてくれる便利 Transform。
// ここではレコード全体(すなわち単語)ごとに一意なものを取り、それらの個数を数えて key-value に変換。
.apply(Count.perElement())
// ここで出来上がった PCollection のレコード型は KV<String, Long>

// KVだったレコードをhuman readableなStringに変換
.apply(
MapElements.into(TypeDescriptors.strings())
.via(
(KV<String, Long> wordCount) ->
wordCount.getKey() + ": " + wordCount.getValue()))

// Write Transform を適用し、 wordcounts という名前のファイルにStringなレコードを書いていく
.apply(TextIO.write().to("wordcounts"));

// ここまでで出来上がったパイプライン p を実行し、結果を待つ。
p.run().waitUntilFinish();

Beamにおけるパイプライン実行

パイプラインは、SDKを介してRunnerに渡される。パイプラインはRunnerから更にEngineに渡されてEngineが実行するのが基本である。

例えば、ストリーム処理系としてFlinkを利用し、コードはJavaで書く場合は下図のような構成になる。典型的にはRunnerはリモートサーバーで、Engineは別のリモートサーバーで稼働することになる。

SDK, Runner, Engine

主にlocal環境での動作確認やテスト用に、Direct Runnerというのも用意されている。Engineの機能も果たし、パイプライン実行までしてくれるもの。

Direct Runner

Beamのプログラミングモデルをちゃんと理解

Beam Programming Guide を読むのが入門として一番良い。が、中々骨があるドキュメントなので、ここで「躓きやすい知識」について記載しておく。
本記事の趣旨とはずれるので、次のセクションまで読み飛ばしても良い。

  • Schema

    • 6.1. What is a schema? に書かれているようなデータ型は、スキーマにできる(明示的にしなければスキーマにはならない)。
    • スキーマはPCollectionに紐付けられる。入力PCollectionがスキーマと紐付いている場合、PTransformとして schema transform (またの名を relational transform) が使える。
    • Schema transform の出力はスキーマである(関係代数が閉包であるのと同様)ので、schema transformをつなげている部分パイプラインにおいては、スキーマ定義は最低1つで済む。
  • Row

    • スキーマが付与されたPCollectionの1レコードのインスタンス。すなわち、あらゆるPTransformの中でrowを入力として扱えるわけではない点に注意。
  • Coder

    • Runnerを流れるバイト列と、SDKと合わせて使うユーザー定義型(クライアントサイド)の変換を受け持つ。
    • リッチなものだと、read transform が特定のフォーマットのデータ(例: JSON)をマッピングさせるのにCoder定義したり。
    • もうちょっと些末なものだと、パイプラインの途中の PTransform が文字列データを入力とする時、パイプラインを流れるデータ列をUTF-8と仮定して StringUtf8Coder を使ったり。

前提知識: Beamでは複数種類のバックエンドが使える

Beam Capability Matrix に列挙されているものはBeamに認知されていて公式にRunnerが用意されている処理系。

「あなたが構成したパイプラインは DirectRunner でテストし、Google Cloud DataflowでもFlinkでも(他のでも)動かせますよ」という世界観。

豆知識だが、最も注力されているRunnerはGoogle Cloud Dataflow。次いでOSSのFlinkと言った構造。Googlerの投資をOSS陣営のFlinkが追いかけるといった様相が2016年のGoogleのブログから読み取れるが、そのパワーバランスは2022年になっても継続している模様。

前提知識: Beamプログラムは多言語で記述できる

JavaのMinimalWordCount exampleに触れたが、Java以外にもPython, Go, TypeScript用のSDKが用意されており、これらの言語でBeamパイプラインを構成することができる。

ちなみにRust SDKもGooglerから要望が高いという話がBeam Summit 2022の講演であったので、いつかサポートされるかも。

多言語・他バックエンド対応の課題

言語をまたいだUDF定義・実行が厳しい

JavaのMinimalWordCount exampleに出てきたTransformの中には、UDF (User-Defined Function) が含まれていた。

MinimalWordCount 抜粋
1
2
3
4
5
6
7
8
9
.apply(
FlatMapElements.into(TypeDescriptors.strings())
.via(
// このラムダ式とか
(String line) -> Arrays.asList(line.split("[^\\p{L}]+"))))

.apply(Filter.by(
// これとか
(String word) -> !word.isEmpty()))

Javaで定義されたラムダ式は、Java実装のRunnerなら実行できる(ただしSDKからRunnerにこのラムダ式を送信するための術は必要)。

しかし他の言語(e.g. JVM言語でない Go, Python, TypeScript)で実装されたRunnerでJavaのラムダ式を実行するのは困難である。

したがって、naiveに考えるとSDKを提供する言語の数だけRunner実装を作る必要がある。

EngineごとにRunnerを作る必要がある?

Runnerの役割は、各Engineに対してBeamのパイプラインをsubmitし、結果を受けることである。

各Engineのデータモデルに合わせてBeamパイプラインを変換しなければならないというのが基本路線であり、naiveに考えるとEngineの数だけRunnerを作る必要がある。

Runner実装の数 == SDK言語の数 x Engine数?

実際のところ、途中まではそのような方針だった。Flink RunnerもDataflow Runnerも、JavaにもGoにもPythonにも実装されていたりする。 (2022/08現在)

これでは言語追加もEngine追加も全くやりたくないですね…

https://docs.google.com/presentation/d/1Yg8Xm4fb-oRjiLQjwLt5153hpwwTLclZrVOKP2hQifo/edit#slide=id.p より引用

Apache Beamが多言語・多バックエンド処理系を実現する仕組み

上述の Runner実装の数 == SDK言語の数 x Engine数 問題を解決するため、2019年頃からBeamでは “Portability Framework” の導入が進められている。

Portability Frameworkは現在も開発途上であり、細かい方針転換もあったりするようなので、このドキュメントを記載する上で参照したドキュメントをまず列挙する (いずれも 2022/08/31 時点参照)。

[理想像] Portable Frameworkの仕組み

ドキュメントなどから読み解ける理想像を記載。なお、まだ全然固まっていないところもあり一部筆者の推測も含む。

Portable Runnerにより、例えば「JVMで動作するFlinkやDataflowなどのEngineを使いつつPythonで定義したパイプラインを実行」することが可能になる。

Portable Runner

まず、UDFが登場しないパイプラインについて図解する。

UDFが登場しないパイプラインのPortable Runnerでの実行

これが実現できれば、Portable Runnerの実装言語は何でも良くなり、かつRunnerはPortable Runnerが1つあれば事足りるようになる。


しかし、Runner APIだけではだけではUDF実行ができない。
ProtocolBeffer(やgRPC)では「クライアント側で任意の言語で定義された関数を、別言語で実装されたサーバー側で実行する」という芸当はできないからだ。

UDFが登場するパイプラインでは下図のようになる。

UDFが登場するパイプラインのPortable Runnerでの実行

新たに SDK Harness というのが登場している。この実体はDockerコンテナであり、本例では「PythonのUDFが実行できるようにPython処理系 (とBeamランタイム) が入ったDockerコンテナ」である。

Portable Runnerは、自分でもEngineでも実行できない多言語のUDF実行をSDK Harnessに委託する形である。

[2022/08] Portable Framework の現状

Portable Runner を各言語に定義する動きが見受けられる。

※なぜGoに Portable Runner がないのかは未調査

SDKからRunnerへのパイプライン受け渡し部分にはRunner APIは使われていない。

現状のPortable Runnerでの実行

また、SDK Harnessの実装はGoogle Cloud Dataflow用のものだけ進んでいるように見えて、実質SDKとRunnerの言語は合わせる必要がある状況。

author Sho Nakatani a.k.a. laysakura

東京大学大学院 情報理工学系研究科 電子情報学専攻 修士課程で並列分散処理・ストリーム処理・データベースを研究。
2014年4月に株式会社ディー・エヌ・エーにエンジニアスペシャリストとして入社し、ソーシャルゲームのサーバサイド共通基盤の開発に従事。
2016年8月より、オンライン証券会社株式会社FOLIOに入社。バックエンドシステム開発・プロジェクトマネージメント・Engineering Managementに従事。
2020年3月よりIdein株式会社所属。デバイス・高性能計算関連の研究開発。
2021年9月よりトヨタ自動車株式会社所属。自動車データの収集〜分析基盤の研究開発に従事。
その他個人事業主として、RDBMS開発やIntel SGXを利用するためのライブラリ開発などの活動。