Google Cloud Dataflow FAQ

Google Cloud Dataflow 全般

Q. Cloud Dataflow とは何?

A. Google Cloud Platform (GCP) のサービスのひとつ。入力データを加工し、出力することに特化したもの。いわゆる ETL。

Q. Dataflow を使うと何ができる?

A. ファイルを読んで、加工して、ファイルに出力する。 ファイルを読んで、加工して、DB に出力する。 DB を読んで、加工して、DB に出力する。 Pub/Sub メッセージを読んで、加工して、Datastore に出力する。 などなど。

Q. データ加工なら、普通にバッチ作ればいいんじゃない? Dataflow の場合、何がうれしいの?

分散処理が簡単にできるので、数千万や数億レコードあるようなビッグデータに向いている。 また、リアルタイム処理ができるのもメリット。

逆に言うと、リアルタイム性が不要で、数千行・数万行の小規模データなら、 ファイルをオープンして、1行ずつ処理するプログラムを自分で書けばよい。 結構学習コストが高いので、使う必要がないなら無理して使わない方がよい。

Q. 管理画面操作だけで、ジョブ処理の定義が行える?

基本的にはできない。Java や Python でプログラムをごりごり書く必要がある。

なぜ「基本的には」かと言うと、Google 提供の Dataflow テンプレートというものがあり、 これを使えば画面上操作または gcloud コマンド一発で ETL 処理が行える。 ただし Google 提供テンプレートは基本的な処理のみであるため、 ほとんどのケースではプログラムを書く必要が出てくるのではと思う次第。

Google 提供テンプレートはこちら

Q. どんなふうにごりごり書くの?

こんな感じ。 GCS にあるテキスト内のワードをカウントし、結果を GCS に出力する

public class MinimalWordCount {
  public static void main(String[] args) {
    DataflowPipelineOptions options = PipelineOptionsFactory.create()
      .as(DataflowPipelineOptions.class);
    options.setRunner(BlockingDataflowPipelineRunner.class);
    options.setProject("SET_YOUR_PROJECT_ID_HERE");
    options.setStagingLocation("gs://SET_YOUR_BUCKET_NAME_HERE/AND_STAGING_DIRECTORY");
    Pipeline p = Pipeline.create(options);
    p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/*"))
      .apply(ParDo.named("ExtractWords").of(new DoFn() {
                        @Override
                        public void processElement(ProcessContext c) {
                          for (String word : c.element().split("[^a-zA-Z']+")) {
                            if (!word.isEmpty()) {
                              c.output(word);
                            }
                          }
                        }
                      }))
      .apply(Count.perElement())
      .apply("FormatResults", MapElements.via(new SimpleFunction, String>() {
                        @Override
                        public String apply(KV input) {
                          return input.getKey() + ": " + input.getValue();
                        }
                      }))
      .apply(TextIO.Write.to("gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX"));
    p.run();
  }
}

Q. Java と Python どちらがおすすめ?

2018/8 現在、Java がおすすめ。 ずっと Python ではバッチしか対応していなかった (ストリーミングができなかった) が、 2018/7 にてストリーミングに対応した。 しかしながら、依然として Python ではできないことがあるようで (例: DynamicDestinations)、 フル機能を使えるという意味では Java の方がよいと考える。

ただ、Java だとジェネリクスを書くのがとても面倒で、当ページ管理人の個人的な意見としては Python を使いたい。

Q. 本当に Java と Python しか使えないの?

正式対応しているのは Java と Python のみ。

しかしながら Kotlin を使っている人がちらほらといる模様。 Kotlin は JVM で動作し、Java よりも簡潔に書けることを標榜した言語。 また、Spotify が提供している Scio という Scala 用ライブラリもある。

Q. データ入力元・出力先の種類は?

GCS・BigQuery・Pub/Sub・Datastore など。ただしカスタムソース・カスタムシンクというもので定義できるので、 AWS の S3 など、標準で用意されていないものでも対応可能 (ある程度ソースを書く必要はある)。

Apache Beam 2.x で S3 標準対応とどこかに書いてあったかもしれない。

Q. バッチとかストリーミングって何?

バッチは、一回限りの実行のこと。1時間に1回とか、1日に1回などの定時実行もバッチ。

ストリーミングは、動かしっぱなしにすることで、ほぼリアルタイムにデータ処理を行える。

Dataflow はジョブを実行する際、「バッチ」「ストリーミング」のいずれかを指定する必要がある。

Q. Apache Beam との関係性は?

いまだによくわかっていないのだが、 最初にできたのが Dataflow で、SDK を公開していた(?)。 Apache Beam は、そのあとに始まったプロジェクトではないかと思われる。 2016年、Google は Apache Foundation に SDK コードを寄贈。 Google の Dataflow SDK は 2.5.0 で終了。それ以降は Apache Beam SDK を使うことになっている。

Q. ネット上にあるサンプルコードが動かないんだけど

オープンソース系にはよくあることだが、Dataflow の場合、 SDK 1.x 系 → SDK 2.x 系 (≒ Apache Beam に移行) のタイミングでインタフェースが変わったため、 特にひどい状況。

例えば Dartaflow SDK 1.x 系は下記のように書いていたものが、

p.apply(TextIO.Read.from("gs://foo/bar.txt"));

SDK 2.x 系は下記のように変わった。

p.apply(TextIO.read().from("gs://foo/bar.txt"));

ネット上の情報で、2016年に書かれたコードはほぼ動かないと思ってよい。 2017年は微妙。 2018年ならだいたい動く。

さらにひどいことに、Apache Beam SDK (最新の 2.5.0) の Javadoc 記載のコードも動かないものがあったり、 Dataflow ドキュメントの日本語ページの翻訳が古くサンプルコードがエラーになったりする (英語版では動いたりする)。

とはいえ、SDK 1.x と 2.x で概念や考え方が変わったわけではなく、メソッド名などのインタフェースが微妙に変わったり、機能追加がなされたり、というのが主な変更点であるので、概念・考え方を調べるために Dataflow のドキュメントを読むのはまったく問題ない。

Q. オートスケールする?

オートスケールさせることもできるし、させないこともできる。 インスタンスタイプ (n1-standard-4 など)・スタート時のインスタンス数・ インスタンス数上限を規定する。

Q. ジョブはどこで実行される?

自動的に GCE インスタンスが作成され、その中でジョブが実行される。 インスタンス数 3 と指定すれば、GCE が 3 インスタンス起動される。 GCE 管理画面でインスタンスが起動していることが確認できるし、 ssh でログインもできる (ログインできても何かができるわけではない)。

Q. パイプラインを作成してジョブを実行するコードも GCE 上で実行される?

違う。わかりにくいが、 Dataflow ジョブを実行したら、その後は Dataflow サービスの枠内として GCE で自動的に処理が行われる。 Dataflow ジョブを作成するための、上記に提示した MinimalWordCount のようなコードは、 別のどこかで動かす必要がある。 それは CloudShell でもよいし、どこかの GCE でもいいし、GKE でも、GAE でも、Functions でも、どこでもよい。 あなたが好きに選べばよい。

Q. Dataflow Template って何

上記の「Dataflow ジョブを作成するための、上記に提示した MinimalWordCount のようなコード」を、 テンプレート生成時に終わらせてしまい、生成されたパイプライン情報を「テンプレート」として GCS に配置しておく。

そうすることで、Dataflow ジョブを実行する際、 「Dataflow ジョブを作成するための、上記に提示した MinimalWordCount のようなコード」を実行する必要がないため、 ジョブを生成する時には Java 環境が不要になる。 こうすることで、gcloud dataflow jobs run コマンドのみで、Dataflow ジョブを実行することができる。

そのため、Cloud Functions、Composer 等、GCP コンソール、gcloud コマンドなど、Java 環境がない環境からも簡単に実行が可能になる。

Q. 料金は?

Dataflow ジョブの実行中に確保した CPU・メモリ・ディスクの量に、 それぞれの CPU 単価・メモリ単価・ディスク単価をかけた金額が請求される。 これらは「確保したリソース×実行時間」で決まるので、実際の使用量で決まるものではない。 実体としては GCE が起動されるが GCE としては請求されない。通常の GCE と比べるとおおむね 1.5倍程度になる模様。 この 0.5倍分が Dataflow の付加価値と言えるのだろう。

Q. Dataflow は、ワークフロー管理ができる?

JP1・Airflow・Digdag などのジョブワークフロー管理のようなことができるかという質問であれば、 できない。Dataflow 管理画面で、一見ワークフロー管理っぽい画面があるが、 あれは処理の流れの進捗状況を表示しているだけ。そもそも、Dataflow は、 「この入力を、こう加工して、ここに出力」を定義するものであるが、 「これが終わったらこれを実行」という『逐次処理』はできない。

Q. 「逐次処理はできない」の意味がわからない。

Dataflow で作成するパイプラインとは、『一つの SQL』のようなものだと思ってほしい。 SQL では、FROM でデータ入力元を指定でき、WHERE でデータを絞り込むことができ、 SELECT でカラムを選択でき、GROUP BY で集約ができ、JOIN で複数の入力を扱える。

しかしながら、

INSERT INTO output1 SELECT * FROM input1;
INSERT INTO output2 SELECT * FROM input2;
のように、データの流れがつながっていない、1つめの SQL が終わったら 2つめの SQL を実行する、 という逐次処理は、「1つの SQL」では実現できない。なので、Dataflow のパイプラインでも実現できない。

Q. 逐次処理ができないということは、つまり何ができない?

例えば下記ができない。

  • BigQuery にデータを格納後、後続バッチを起動するために Pub/Sub メッセージを送信する。
  • BigQuery にテーブルA を格納後、テーブルA を SELECT して加工し、テーブルBに格納する。

GCE インスタンスについて

Q. GCE インスタンスサイズは指定できる?

パイプラインオプションに --workerMachineType=n1-standard-2 などと指定する。 インスタナンスタイプは固定で、データ処理量に応じてインスタンスタイプが勝手に変わったりはしない。

Q. カスタムマシンタイプは使用できる?

できる。vCPU が 2、メモリが 4096 MB であれば、下記のようにする。

--workerMachineType=custom-2-4096

拡張メモリを使いたい場合は、下記のように末尾に "-ext" をつける。

--workerMachineType=custom-2-16384-ext

下記のような GCE のカスタムマシンタイプ一般の制限があるので注意。詳細は https://cloud.google.com/compute/docs/instances/creating-instance-with-custom-machine-type を参照。

  • vCPU は 1個または偶数個 (3 とか 5 は不可)
  • メモリは vCPU 1 個あたり 0.9 GB 以上、最大 6.5 GB まで (6.5 GB 超は拡張メモリになる)
  • インスタンスの合計メモリは、256 MB の倍数にする必要があります。

Q. プリエンプティブインスタンスは使える?

安い代わりに最大 24時間で、いつ終了になるかわからないというプリエンティブインスタンスは、 Dataflow からは残念ながら使えない。

Q. GCE インスタンスで SSD は使える?

使える。デフォルトでは HDD だが、下記のパイプラインオプションで SSD を使うことができる。 オプションで下記を指定する。

--workerDiskType="compute.googleapis.com/projects//zones//diskTypes/pd-ssd"

Q. ローカル SSD は使える?

NVMe で爆速なローカル SSD は、残念ながら使えない (と思われる)。

Q. インスタンス数はどうやって指定する?

開始時のインスタンス数 (numWorkers) と、インスタンス数上限 (maxNumWorkers) を指定できる。 インスタンス数下限は指定できない。

  • 例. 開始時のインスタンス数 1、インスタンス数上限を 5 とした場合:
    • 開始時は 1、その後は 1〜5 のいずれになるかは Dataflow サービス側で判断。
  • 例. 開始時のインスタンス数 5、インスタンス数上限を 5 とした場合:
    • 開始時は 5、その後は 1〜5 のいずれになるかは Dataflow サービス側で判断。 ジョブ開始後
  • 例. 開始時のインスタンス数 5、インスタンス数上限を 10 とした場合:
    • 開始時は 5、その後は 1〜10 のいずれになるかは Dataflow サービス側で判断。

ジョブ管理

Q. ジョブをキャンセルできる?

できる。管理画面上か、gcloud コマンドで gcloud dataflow jobs cancel [ジョブID] で可能。 いずれにしても、GCE を終了するのに 5分くらいかかるので、待つのがめんどくさい。

Q. ジョブ再実行は?

投入した Dataflow ジョブを、再度実行することはできない。同じジョブを作るしかない。 同じパラメータで新規ジョブを作成して実行、くらいはできてもいいような気がするが、 2018/08 現在、そのような機能はない。

Q. タイムアウト設定は?

Dataflow ジョブをN時間で終了させるという簡単なオプションはないと思う。 代替策は下記のように一定時間待って、その後ジョブを殺す。

pipeline.run().waitUntilFinish(new Duration(1000*60*5)); // 単位はミリ秒。5分待つ

あるいは Dataflow ジョブを実行後、定期的にポーリングして、 ステータスが実行中かつN時間を超えたらジョブを殺す、といったツールを作る。

Q. Dataflow ジョブ完了後、PubSub などで通知を受けることは可能?

Dataflow 単体では簡単にはできない (できてほしいのだが)。

案1. デプロイプロセス側で

pipeline.run().waitUntilFinish();
で終了を待って、その後 Pub/Sub メッセージを投げる方法がある。

案2. あるいは定期的にポーリングして、ジョブ終了を検知するプログラムを作る。

案3. パイプラインの最後で TextIO でストレージに出力し、「Cloud Pub/Sub Notifications for Cloud Storage」を使って Pub/Sub メッセージを送信する。

Q. リトライは?

ひとつの処理でエラーが発生した場合、自動的にリトライを 4回行ってくれる。 ただし、リトライ時にウェイトを入れてくれたりはしないので、 ごくごく短時間の外部サービス障害ならば問題ないが、 長時間落ちていた場合などはジョブがエラー終了となる。

ちなみに、当ページ管理人の経験としては、定時バッチで S3 からオブジェクトを取得する際、 1回だけエラーになるのは非常によくあるが、4回すべて失敗したことは一度もなかった。

Q. 全く関連のないジョブを複数個実行することはできる?

できる。パイプラインのビルド処理において、パイプラインを 2つ作ればよい。 ただし、ビルド処理は Dataflow サービスの枠外であるため、例えば下記コードで p.run() した直後にマシンが落ちてしまったら p2 は実行されないことに注意。

Pipeline p = Pipeline.create(options);
p.apply(...);
p.run();

Pipeline p2 = Pipeline.create(options);
p2.apply(...);
p2.run();

データ処理全般

Q. データのスキップはできる?

できる。下記のように、特定条件の場合 c.output を呼ばなければよい。

Integer num = c.element();
if ( num > 10000 ){
  // 異常値なのでエラーログ出力等
} else {
  c.output(num);
}

filter を使ってもできると思う (未検証)。

Q. SQL での group by のような処理ははできる?

GroupByKey を使う。 キーを指定すると、同じキーを持つレコードを集約できる。 集約したあとに何を行うかはあなた次第 (MAX 値を取るのか、 件数をカウントするのか、タイムスタンプが最も早いものなどの条件で1つを選ぶのか)。

なお、全データが揃わないと GroupByKey が開始できないため、分散処理のうまみが減ってしまう。 また、データを貯め込む必要があるためデータ量に応じたばディスク容量が必要となる (ディスクを使うので、SSD を使うと高速になる)。

Q. SQL での join のような結合処理ははできる?

2つやり方がある。 1つは、CoGroupBy を使う方法。 もう 1つは副入力 (Side Input) を使う方法。

Q. 副入力って何?

典型的には、大量のデータと、少量のマスタがあって、 それを結合したい場合などに少量のマスタを副入力として扱う。 例えば

ID,タイムスタンプ,店舗ID,売上金額

という大量売上データがあるとして、 さらに BigQuery などに 店舗ID,店舗名 という店舗マスタがあり、 最終的に

ID,タイムスタンプ,店舗ID,店舗名,売上金額

というふうに「店舗名」カラムを追加して出力したい場合、 店舗マスタを副入力として扱うとよい。

参考: Cloud Dataflowで複数リソースを読み込む方法

Q. 複数の出力を行うことはできる?

PTranslate は、入力の PCollection を変換し、出力 PCollection を生成するが、 複数の出力を生成することができる。 例えば売上データを読み込んで、

  • 出力1: 商品ごとの売上合計
  • 出力2: 店舗ごとの売上合計
  • 出力3: セール品の売上合計
  • 出力4: フォーマットエラー情報

というふうに複数を分けることができる。 そのあとは、出力1 はファイルに、出力2・3 は BigQuery のテーブルに、 出力4 はファイルとして GCS に、というふうに分岐できる。

Q. SQL での DISTINCT のような重複除去はできる?

RemoveDuplicates を使う。

Q. SQL で書きたいんだけれど

試したことはないが書けるはず。Apache Beam 2.2.0 (2017-12-02 リリース) より Beam SQL DSL が実装された。 BeamSqlExample.java

ただし Dataflow SDK 2.2.0 のリリースノートにおいては https://cloud.google.com/dataflow/release-notes/release-notes-java-2?hl=en にて

Known issue: SQL support is not included in this release because it is experimental and not tested on Cloud Dataflow. Using SQL is not recommended.

との記載があり、Dataflow としてはサポート対象外。

テキストファイル関連

Q. 複数のファイルを入力とすることはできる?

できる。*.csv などと記述することができる

p.apply(TextIO.read().from("gs://foo/bar/*.csv"));
さらに下記のような複数のストレージなど、ワイルドカードでは表現できない場合でも、 PCollection を flatten することで、1つの入力にまとめることができる。 参考: https://stackoverflow.com/questions/44407323/textio-read-multiple-files-from-gcs-using-pattern
  • gs://foo1/bar/*.csv
  • gs://foo2/bar/*.csv

Q. テキストファイルの先頭ヘッダを除外することはできる?

TextIO ならできない。 各インスタンスでデータを分散処理するため、「1行目」という判定ができないため。 下記のように特定文字列なら処理しない、という書き方になる。

String line = c.element();
if ( line.startWith("Id,Date,Action,...") ){
  return;
}
.... 何かしらの処理

FileIO ならできるが、Dataflow から使えるのか? (ローカルのファイルを読み書きするということなので)

Q. エラー時の情報として、入力データのファイル名や行番号を知りたい

TextIO ならできない。 FileIO ならできるが、Dataflow から使えるのか? (ローカルのファイルを読み書きするということなので)

Q. テキストファイルをソートしてから出力することはできる?

できない。ソートは分散処理に不向き。 どうしてもというなら https://stackoverflow.com/questions/44291407/dataflow-write-to-file-in-order-of-pcollection の回答にあるようカスタムシンクを作ることでできるかもしれないが、大規模データを出力するのは困難ではと思う。

BigQuery 関連

Q. BigQuery のテーブルを入力とすることはできる?

できる。

PCollection weatherData = pipeline.apply(
     BigQueryIO.readTableRows().from("clouddataflow-readonly:samples.weather_stations"));

Q. BigQuery クエリ結果を入力とすることはできる?

できる。StandardSQL も、LegacySQL いずれも可能。 下記のように usingStandardSql() をつければ StandardSQL。 つけなければ LegacySQL。

p.apply(
    BigQueryIO.readTableRows
      .fromQuery("SELECT year, mean_temp FROM `samples.weather_stations`")
      .usingStandardSql();

Q. BigQuery のテーブルを出力とすることはできる?

できる。

Q. BigQuery のテーブルに merge 文を使って重複除去しつつ出力することはできる?

できないと思われる。

Q. BigQuery のテーブルを新規作成することも、既存テーブルへの追記もできる?

いずれも可能。新規作成時はテーブルスキーマを指定しないといけない。

Q. すでに存在する BigQuery のテーブルを丸ごと置き換えることはできる?

できる。下記のように WriteDisposition の WRITE_TRUNCATE を指定すればよい。

   .apply(BigQueryIO.writeTableRows()
          .to(TableDestination("mydataset:mytable", "description"))
          .withSchema(schema)
          .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
          .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));

Q. データ内容に応じて複数のテーブルに振り分けることはできる?

できる。下記参照。

クォータ・リミット・制限

どんなクォータ・リミット・制限がある?

引っかかりやすいものは下記。Dataflow の制限と、起動する GCE の制限両方を確認すること。

  • Dataflow が使用する GCE 1000インスタンス
  • Dataflow ジョブ同時実行数 25
  • GCE IPアドレス 50個
  • GCE SSD 合計サイズ 2TB
詳細はこちら https://cloud.google.com/dataflow/quotas

いずれも制限緩和申請が可能。

プロファイリング

Dataflow のボトルネックを解析したい。プロファイリングできる?

できる。 あらかじめ GCS にバケット my-dataflow-profile を作成しておき、 パイプラインオプションに --saveProfilesToGcs=gs://my-dataflow-profile/bar などと指定する。 パイプライン実行後、GCS のファイルを取得し、pprof でプロファイリング結果を見ることができる。

下記は Dataflow とは関係はないが、pprof の Web UI の紹介記事。めちゃくちゃ便利とのことだが、 これは確かにすごく非常に便利。

Go言語のプロファイリングツール、pprofのWeb UIがめちゃくちゃ便利なので紹介する

その他 (未整理)

Dataflow shuffle サービスって何

一般用語としての MapReduce において、Map と Reduce の間に位置するのが shuffle。 Dataflow で言うと、GroupByKey がまさに shuffle。 集約処理が必要なので、分散処理には向かない部分。 よって、GCE 内ではなく GCP のリソースで請け負ってあげましょうというのが Dataflow shuffle サービス。

ご意見・ご指摘は Twitter: @68user までお願いします。