【分散処理】PySpark ~ 出力ファイル / 空ファイル対応, 1ファイルに纏める ~

■ はじめに

 PySpark でファイルを出力した際に、
サイズが0Byteの空ファイルが出力されたので
対応について調べてみた。
ついでに、ファイルを1つにまとめることも載せておく。

目次

【1】対応方法

【2】出力ファイルが空ファイルになる
 1)出力される原因
 2)サンプル

【3】出力ファイルを1ファイルに纏める
 1)サンプル

【1】対応方法

以下のサイトが、凄く参考になった。勉強になった。感謝。
(ゼロファイルが出力しないって、プロパティはないんだな、、、)

https://qiita.com/kwbt/items/73fdcccfde319644f447

結論からいう、パーティション数を調整すれば可能。
パーティション(Partition)については、以下の関連記事を参照のこと。

PySpark ~ パーティション
https://dk521123.hatenablog.com/entry/2021/05/13/110811
Hive / HiveQL ~ パーティション / 基本編 ~
https://dk521123.hatenablog.com/entry/2020/07/16/224332
Hive / HiveQL ~ パーティション / あれこれ編 ~
https://dk521123.hatenablog.com/entry/2020/09/18/113637

【2】出力ファイルが空ファイルになる

1)出力される原因

https://qiita.com/kwbt/items/73fdcccfde319644f447

より抜粋
~~~~~~~~~~~~~~
出力するデータが無いパーティションは、
出力をしないのではなく、
レコードの無い空ファイルを出力します。
~~~~~~~~~~~~~~

2)サンプル

以下のサイトを参考にフルコードで作成してみた。

https://qiita.com/kwbt/items/73fdcccfde319644f447

from pyspark import SparkContext
from pyspark.sql import SparkSession

THRESHOLD = 5

spark_context = SparkContext()
spark = SparkSession(spark_context)

data_frame = spark.read.csv(
  "./input_files/", header=True)
data_frame.show()

count = data_frame.count()
print(f"Data count = {count}")
if count == 0:
  print("No need to output.")
else:
  # パーティション数調整判定処理
  partition_number = data_frame.rdd.getNumPartitions()
  print(f"Original partition number = {partition_number}")
  if count <= THRESHOLD:
    # 一つにパーティションをまとめる
    partition_number = 1
  elif (data_frame.rdd.getNumPartitions() * THRESHOLD) > count:
    # データ件数がパーティション数の5倍より少ない場合、パーティション数を調整
    partition_number = int(count / THRESHOLD)

  if partition_number != data_frame.rdd.getNumPartitions():
    # パーティション数を補正する
    print(f"To modify partition number to {partition_number}")
    data_frame = data_frame.repartition(partition_number)

  data_frame.write.mode('overwrite').csv(
    "./output_files",
    header=False,
    quoteAll=True)

【3】出力ファイルを1ファイルに纏める

【2】のような計算をするのでなく、あまり大きいデータ数ではない場合
シンプルに1ファイルにまとめることをやってみる

1)サンプル

from pyspark import SparkContext
from pyspark.sql import SparkSession

# 1つのファイルに纏めるデータ数の閾値
THRESHOLD_DATA_NUMBER_FOR_ONE_PARTITION = 10

spark_context = SparkContext()
spark = SparkSession(spark_context)

data_frame = spark.read.csv(
  "./input_files/", header=True)
data_frame.show()

count = data_frame.count()
print(f"Data count = {count}")
if count == 0:
  print("No need to output.")
else:
  # 一定数以下だったらパーティション数1で対応
  if count <= THRESHOLD_DATA_NUMBER_FOR_ONE_PARTITION:
    # パーティション数を補正する
    print(f"To modify partition number to 1")
    data_frame = data_frame.repartition(1)

  data_frame.write.mode('overwrite').csv(
    "./output_files",
    header=False,
    quoteAll=True)

参考文献

https://qiita.com/kwbt/items/73fdcccfde319644f447

関連記事

PySpark ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2019/11/14/221126
PySpark ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/04/03/004254
PySpark ~ CSV / 基本編 ~
https://dk521123.hatenablog.com/entry/2019/11/24/225534
PySpark ~ CSV / Read/Writeのプロパティ ~
https://dk521123.hatenablog.com/entry/2020/07/30/195226
PySpark ~ パーティション
https://dk521123.hatenablog.com/entry/2021/05/13/110811
Hive / HiveQL ~ パーティション / 基本編 ~
https://dk521123.hatenablog.com/entry/2020/07/16/224332
Hive / HiveQL ~ パーティション / あれこれ編 ~
https://dk521123.hatenablog.com/entry/2020/09/18/113637