パパエンジニアのアウトプット帳

30歳に突入した1児のパパエンジニアのブログ

embulk-output-bigqueryでパーティションテーブルを扱うときのアレコレ

お仕事でEmbulkを使ってBigQueryにデータインポートやることがあって、その時にパーティションテーブルに対してアレコレやるときに色々検証したのでメモ。
Embulkはv0.9.23でembulk-output-bigqueryはv0.6.4です。

github.com

パーティションの利用方法

embulk-output-bigqueryでパーティションテーブルを扱う方法は下記の2つです。

table名でパーティションデコレータ指定

table: table_name$20200827 のようにパーティションデコレータを利用すると、指定パーティションに書き込めます。
この書き方をすると後述するmodeオプションでreplaceなどを利用した時に指定パーティションだけ上書きできます。


デコレータの書き方も直接20200837のように書くこともできますし、ここに書いてあるようにTime#strftime formatが許されるのでtable_name$%Y%m%d と書くこともできます。
さらに、embulkはliquid templateを利用できるので関数を利用して動的に指定することも可能です。

{% assign target_date = env.TARGET_DATE | default: '2020-08-20' %}

table:table_name${{ target_date | replace: "-", "" }}

time_partitioning.field

もう1つはtime_partitioning.fieldを指定する方法で、DATEかTIMESTAMP型のカラムを指定するとカラムベースのパーティションテーブルになります。
カラムベースでパーティションを分けるので、embulkで一括処理するときにパーティションを気にせずinsertすることができます。
なので、この場合は基本的にもう1つのパーティションデコレータ指定をする必要はありません。


しかし、この場合は後述するmodeでreplaceなどを選んでも対象のパーティションは不明なので、全てのデータがreplaceされてしまいます。
データ重複を避けたいけど、一度に全データを対象に読み込まない場合はこの方法の場合は使えないので注意です。

mode

embulk-output-bigqueryでは下記の5つの書き込みモードをサポートしています。

  • append
    • 一時テーブルにinsertした後、対象テーブルに追記します
  • append_direct
    • 一時テーブルを利用せず、対象テーブルに追記します
    • トランザクションをサポートしないので、途中で失敗しても成功した分のデータはinsertされたままです
  • replace
    • 一時テーブルにinsertした後、対象テーブルに上書きします
  • replace_backup
    • 基本的にはreplaceと同じですが、対象テーブルを上書きする前にバックアップテーブルにデータを退避します
  • delete_in_advance
    • 対象のテーブルやパーティションのデータを削除してからinsertします
    • 一時テーブルは利用しません

BigQueryにはプライマリーキーのような概念はないようなので、同じデータのinputに対して何度も実行した時に重複データができないようにするには、replace,replace_backup,delete_in_advanceの3つを使うしかないと言う事になります。

パーティションテーブル利用時に冪等性を担保するためには

time_partitioning.field を利用している場合は先にも述べたようにreplaceなどは全データが対象になるので、対象のパーティションのみを入れ替えるようなことができません。(毎日全データを対象にしてデータを入れ替えるような運用の場合は大丈夫ですが)


一応、time_partitioning.fieldパーティションデコレータを組み合わせることもできますが、inputに日を跨ぐようなデータが混ざる場合は下記のようなエラーになってしまいます。

org.embulk.exec.PartialExecutionException: org.jruby.exceptions.RaiseException: (Error) failed during waiting a Load job, get_job(test-embulk-287601, embulk_load_job_44809189-0ec6-4cac-9cbd-58a70da7bd93), errors:[{:reason=>"invalid", :message=>"Some rows belong to different partitions rather than destination partition 20200820"}]

なので、異なる日付データが含まれないことが保証されるのであればカラムベースでreplaceやdelete_in_advanceを使って重複データが含まれないようにすることも可能そうです。(試した感じでは一応期待した動きにはなっている)

out:
  ・・・
  table: access_log${{ target_date | replace: "-", "" }}
  time_partitioning:
    type: DAY
    field: date
  mode: replace

※他に調べていたところembulk-filter-rowを使うと行の除外もできるので、これを利用して対象の日付の行以外を除外すれば問題なさそう

カラム変更

schema_update_optionsを指定するとカラム追加に対応できます。
ただ、一時テーブルを作るappend,replace,replace_backupでは元とカラムが異なってしまいコピーできないので利用できません。(なので重複データの考慮もするのであればdelete_in_advance一択となります)
また、time-partitioningの項に書いてあるとおり追加のみで、カラム変更や削除はできません。

Use Tables: patch API to update the schema of the partitioned table, embulk-output-bigquery itself does not support it, though. Note that only adding a new column, and relaxing non-necessary columns to be NULLABLE are supported now. Deleting columns, and renaming columns are not supported.

schema_update_options のALLOW_FIELD_ADDITIONなどについては公式ドキュメント(--schema_update_option)を参照
cloud.google.com

サンドボックス

これはembulk-output-bigqueryと言うよりはBigQueryのサンドボックスモードの場合の注意点です。
GCPはクレジットカードを登録しなくても無料枠の範囲内で利用できるので気軽に試せるのですが、BigQueryはその場合はサンドボックスモードとなり制限があります。

cloud.google.com

古いデータなどで試している場合にパーティションに利用しているカラムが60日より前の日付だとinsertに成功してもデータが0件になります。
appendやreplaceなど一時テーブルに一度インサート→コピーするようなmodeの場合はinとoutの件数が違ってembulk実行時にエラーになります。
delete_in_advanceのように直接デーブルにinsertするような場合はエラーにもならず成功するので、サンドボックスで試している場合はデータの鮮度?に気をつけましょう。

参考

初めてembulk-output-bigqueryを利用するので冪等性はどうやったらできるかな?と調べていて下記の記事を見つけたので、自分でも色々検証してみました。 qiita.com

これ以外だとmercariさんのような感じで差分更新を実現しないといけないので、中々大変そうです。
engineering.mercari.com