【分散処理】PySpark ~ パーティション単位で上書きするには ~

■ はじめに

https://dk521123.hatenablog.com/entry/2021/07/06/120134

で、パーティション付のテーブルにアクセスした際に
実ファイルがなくなっていた現象が発生していた。

この現象のそもそも原因が、ファイルを書きだすときの
モード mode = overwrite だったのでメモしておく。

目次

【1】書き込みモード overwrite 時の注意点
【2】パーティション単位で上書きするには
【3】spark.sql.sources.partitionOverwriteMode について

【1】書き込みモード overwrite 時の注意点

パーティション指定時に書き込みモード overwrite 時を指定した場合、
別パーティションの既存にある実ファイルがなくなる可能性がある

実例

 以下のコード例のように
パーティション指定(今回の場合「updated_date=20210706」)
+書き込みモード overwrite 時を指定した場合に
別パーティションの既存にある実ファイルがなくなっていた。
「updated_date=20210704」「updated_date=20210705」など
なくなっていた。

コード例

-- パーティション「updated_date=20210706」が指定されている場合
data_frame.write \
  .mode('overwrite') \
  .partitionBy('updated_date') \
  .csv('s3://your-bucket-name/xxx/sample_db/sample_table')

ファイル書き出し前のパス状況

s3://your-bucket-name/xxx/sample_db/sample_table
├── updated_date=20210704/
|       └── part-0001.csv
|       └── …
├── updated_date=20210705/
|       └── part-0001.csv
|       └── …
├── updated_date=…
 ... 

# 「updated_date=20210706」を追加したい

ファイル書き出し後のパス状況

s3://your-bucket-name/xxx/sample_db/sample_table
 ├── updated_date=20210706/
        └── part-0001.csv
        └── …

# 「updated_date=20210704」「updated_date=20210705」など
# なくなっていた。

実行環境

* AWS Glue v2.0

https://dk521123.hatenablog.com/entry/2020/08/19/130118

より抜粋

* Spark 2.4.3
* Python 3.7

【2】パーティション単位で上書きするには

 Sparkでパーティション単位で上書きする場合、
以下のプロパティ「spark.sql.sources.partitionOverwriteMode=dynamic」
を設定するといい。(実際にこれで解決した)

修正例

# ★ここを追加★
spark.conf \
 .set("spark.sql.sources.partitionOverwriteMode", "dynamic")

-- パーティション「updated_date=20210706」が指定されている場合
data_frame.write \
  .mode('overwrite') \
  .partitionBy('updated_date') \
  .csv('s3://your-bucket-name/xxx/sample_db/sample_table')

補足:Spark バージョン 2.4以上の場合

-- 試していないが、Spark バージョン 2.4以上だと
-- partitionOverwriteModeプロパティが追加されたので以下のようにできる
data_frame.write \
  .option("partitionOverwriteMode", "dynamic") \
  .mode('overwrite') \
  .partitionBy('updated_date') \
  .csv('s3://your-bucket-name/xxx/sample_db/sample_table')

【3】spark.sql.sources.partitionOverwriteMode について

* パーティションの上書きモードの指定。
=> 詳細は以下の通り。

https://jaceklaskowski.gitbooks.io/mastering-spark-sql/content/spark-sql-properties.html

# 設定値 説明
1 static デフォルト。 Sparkは、指定された全てのパーティション(e.g. PARTITION(a=1,b)) 上書き前に消す
2 dynamic Sparkは前のパーティションを消さずに指定されたパーティションのみ上書きする

参考文献

https://yamap55.hatenablog.com/entry/2019/07/25/200000
https://note.com/shplofem/n/nb1aa532d9ab0

関連記事

PySpark ~ CSV / 基本編 ~
https://dk521123.hatenablog.com/entry/2019/11/24/225534
PySpark ~ CSV / Read/Writeのプロパティ ~
https://dk521123.hatenablog.com/entry/2020/07/30/195226
PySpark ~ パーティション
https://dk521123.hatenablog.com/entry/2021/05/13/110811
Hive / HiveQL ~ パーティション / あれこれ編 ~
https://dk521123.hatenablog.com/entry/2020/09/18/113637
テーブルアクセス時に例外「Vertex failed, ... InvalidInputException」が発生する
https://dk521123.hatenablog.com/entry/2021/07/06/120134