【分散処理】PySpark で 出力ファイル名を変更する

■ はじめに

PySpark で ファイルを出力した場合
出力先パスは指定できるが、ファイル名は勝手に決められる。
このファイル名を変更するやり方を調べてみた。

目次

【1】PySpark での リネーム方法
【2】サンプル
【3】補足1:拡張子 CRCファイル について
【4】補足2:AWS(S3)環境の場合

【1】PySpark での リネーム方法

http://hatunina.hatenablog.com/entry/2018/08/06/200915

をベースに実装してみた。
HadoopのFile Systemの機能を使って行うっぽい。

【2】サンプル

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType

spark_context = SparkContext()
spark = SparkSession(spark_context)

rdd = spark_context.parallelize([
  ('x0001', 'Mike', 'Sales'),
  ('x0002', 'Tom', 'IT'),
  ('x0003', 'Sam', 'Sales'),
  ('x0004', 'Kevin', 'Human resources'),
  ('x0005', 'Bob', 'IT'),
  ('x0006', 'Alice', 'Banking'),
  ('x0007', 'Carol', 'IT'),
])
schema = StructType([
  StructField('id', StringType(), False),
  StructField('name', StringType(), False),
  StructField('job', StringType(), False),
])

data_frame = spark.createDataFrame(rdd, schema)
data_frame.show()

# To avoid CRC files (★以下「【3】補足」参照★)
spark_context._jsc.hadoopConfiguration().set(
  "mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

data_frame.write \
  .mode('overwrite') \
  .csv('./out/temp/',
    sep=",",
    header=False)

# ここからリネーム処理
from py4j.java_gateway import java_import
java_import(spark._jvm, "org.apache.hadoop.fs.Path")

file_system = spark._jvm.org.apache.hadoop.fs.FileSystem.get(
  spark._jsc.hadoopConfiguration())
target_path = spark_context._jvm.Path('./out/temp/part*')
for index, file_status in enumerate(
  file_system.globStatus(target_path)):
  file_name = file_status.getPath().getName()
  print(f"file_name = {file_name}")

  # リネーム
  file_system.rename(
    spark_context._jvm.Path(
      './out/temp/' + file_name),
    spark_context._jvm.Path(
      './out/final/output{:08}.csv'.format(index)))
  # CRCファイルの削除 (★以下「【3】補足」参照★)
  file_system.delete(spark_context._jvm.Path(
    './out/final/.output{:08}.csv.crc'.format(index)), True)

# 元ファイルの削除
file_system.delete(spark_context._jvm.Path('./out/temp/'), True)

【3】補足1:拡張子 CRCファイル について

* HDFSでは、拡張子:CRCファイルが出力される
 => CRC : Cyclic Redundancy Check (巡回冗長検査)?
 => 上記のサンプルは、出力抑制 / 削除している

CRCファイル「SUCCESS」の出力抑制

CRCファイル「SUCCESS」の出力抑制 について
以下のコードで行えた。
~~~~~
spark_context._jsc.hadoopConfiguration().set(
  "mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
~~~~~

以下のサイトを参考に行った。感謝。

https://stackoverflow.com/questions/34382612/how-to-avoid-generating-crc-files-and-success-files-while-saving-a-dataframe
https://qiita.com/Yarimizu14/items/500891c7403e9e4d8dbd

【4】補足2:AWS(S3)環境の場合

 【2】を実装して思ったが、AWS の S3上にある場合、
新規Tempファルダにファイル出力して、
そのフォルダ配下のファイル「part*」を一覧で取得し、
指定のファルダ先にリネーム&移動するでもいいかなっと思った。
(サンプルもほど同じことやっているし)
 => 実際、boto3 API で実装してみたら、やりたいことができた。
 => boto3 API については、以下の関連記事を参照のこと。

Amazon S3Python boto3でS3を操作する ~
https://dk521123.hatenablog.com/entry/2019/10/21/230004

参考文献

http://hatunina.hatenablog.com/entry/2018/08/06/200915

関連記事

PySpark ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2019/11/14/221126
PySpark ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/04/03/004254
PySpark ~ CSV / 基本編 ~
https://dk521123.hatenablog.com/entry/2019/11/24/225534
PySpark ~ CSV / Read/Writeのプロパティ ~
https://dk521123.hatenablog.com/entry/2020/07/30/195226
PySpark ~ CSV / escape ~
https://dk521123.hatenablog.com/entry/2020/11/23/224349
PySpark ~ CSV / White Spaceの扱い ~
https://dk521123.hatenablog.com/entry/2021/04/29/075903
PySparkで入力ファイル名を取得するには
https://dk521123.hatenablog.com/entry/2021/04/12/145133
Amazon S3Python boto3でS3を操作する ~
https://dk521123.hatenablog.com/entry/2019/10/21/230004