■ はじめに
PySpark でファイルを出力した際に、 サイズが0Byteの空ファイルが出力されたので 対応について調べてみた。 ついでに、ファイルを1つにまとめることも載せておく。
目次
【1】対応方法 【2】出力ファイルが空ファイルになる 1)出力される原因 2)サンプル 【3】出力ファイルを1ファイルに纏める 1)サンプル
【1】対応方法
以下のサイトが、凄く参考になった。勉強になった。感謝。 (ゼロファイルが出力しないって、プロパティはないんだな、、、)
https://qiita.com/kwbt/items/73fdcccfde319644f447
結論からいう、パーティション数を調整すれば可能。 パーティション(Partition)については、以下の関連記事を参照のこと。
PySpark ~ パーティション ~
https://dk521123.hatenablog.com/entry/2021/05/13/110811
Hive / HiveQL ~ パーティション / 基本編 ~
https://dk521123.hatenablog.com/entry/2020/07/16/224332
Hive / HiveQL ~ パーティション / あれこれ編 ~
https://dk521123.hatenablog.com/entry/2020/09/18/113637
【2】出力ファイルが空ファイルになる
1)出力される原因
https://qiita.com/kwbt/items/73fdcccfde319644f447
より抜粋 ~~~~~~~~~~~~~~ 出力するデータが無いパーティションは、 出力をしないのではなく、 レコードの無い空ファイルを出力します。 ~~~~~~~~~~~~~~
2)サンプル
以下のサイトを参考にフルコードで作成してみた。
https://qiita.com/kwbt/items/73fdcccfde319644f447
from pyspark import SparkContext from pyspark.sql import SparkSession THRESHOLD = 5 spark_context = SparkContext() spark = SparkSession(spark_context) data_frame = spark.read.csv( "./input_files/", header=True) data_frame.show() count = data_frame.count() print(f"Data count = {count}") if count == 0: print("No need to output.") else: # パーティション数調整判定処理 partition_number = data_frame.rdd.getNumPartitions() print(f"Original partition number = {partition_number}") if count <= THRESHOLD: # 一つにパーティションをまとめる partition_number = 1 elif (data_frame.rdd.getNumPartitions() * THRESHOLD) > count: # データ件数がパーティション数の5倍より少ない場合、パーティション数を調整 partition_number = int(count / THRESHOLD) if partition_number != data_frame.rdd.getNumPartitions(): # パーティション数を補正する print(f"To modify partition number to {partition_number}") data_frame = data_frame.repartition(partition_number) data_frame.write.mode('overwrite').csv( "./output_files", header=False, quoteAll=True)
【3】出力ファイルを1ファイルに纏める
【2】のような計算をするのでなく、あまり大きいデータ数ではない場合 シンプルに1ファイルにまとめることをやってみる
1)サンプル
from pyspark import SparkContext from pyspark.sql import SparkSession # 1つのファイルに纏めるデータ数の閾値 THRESHOLD_DATA_NUMBER_FOR_ONE_PARTITION = 10 spark_context = SparkContext() spark = SparkSession(spark_context) data_frame = spark.read.csv( "./input_files/", header=True) data_frame.show() count = data_frame.count() print(f"Data count = {count}") if count == 0: print("No need to output.") else: # 一定数以下だったらパーティション数1で対応 if count <= THRESHOLD_DATA_NUMBER_FOR_ONE_PARTITION: # パーティション数を補正する print(f"To modify partition number to 1") data_frame = data_frame.repartition(1) data_frame.write.mode('overwrite').csv( "./output_files", header=False, quoteAll=True)
参考文献
https://qiita.com/kwbt/items/73fdcccfde319644f447
関連記事
PySpark ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2019/11/14/221126
PySpark ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/04/03/004254
PySpark ~ CSV / 基本編 ~
https://dk521123.hatenablog.com/entry/2019/11/24/225534
PySpark ~ CSV / Read/Writeのプロパティ ~
https://dk521123.hatenablog.com/entry/2020/07/30/195226
PySpark ~ パーティション ~
https://dk521123.hatenablog.com/entry/2021/05/13/110811
Hive / HiveQL ~ パーティション / 基本編 ~
https://dk521123.hatenablog.com/entry/2020/07/16/224332
Hive / HiveQL ~ パーティション / あれこれ編 ~
https://dk521123.hatenablog.com/entry/2020/09/18/113637