■ はじめに
PySpark で ファイルを出力した場合 出力先パスは指定できるが、ファイル名は勝手に決められる。 このファイル名を変更するやり方を調べてみた。
目次
【1】PySpark での リネーム方法 【2】サンプル 【3】補足1:拡張子 CRCファイル について 【4】補足2:AWS(S3)環境の場合
【1】PySpark での リネーム方法
http://hatunina.hatenablog.com/entry/2018/08/06/200915
をベースに実装してみた。 HadoopのFile Systemの機能を使って行うっぽい。
【2】サンプル
from pyspark import SparkContext from pyspark.sql import SparkSession from pyspark.sql.types import StructType from pyspark.sql.types import StructField from pyspark.sql.types import StringType spark_context = SparkContext() spark = SparkSession(spark_context) rdd = spark_context.parallelize([ ('x0001', 'Mike', 'Sales'), ('x0002', 'Tom', 'IT'), ('x0003', 'Sam', 'Sales'), ('x0004', 'Kevin', 'Human resources'), ('x0005', 'Bob', 'IT'), ('x0006', 'Alice', 'Banking'), ('x0007', 'Carol', 'IT'), ]) schema = StructType([ StructField('id', StringType(), False), StructField('name', StringType(), False), StructField('job', StringType(), False), ]) data_frame = spark.createDataFrame(rdd, schema) data_frame.show() # To avoid CRC files (★以下「【3】補足」参照★) spark_context._jsc.hadoopConfiguration().set( "mapreduce.fileoutputcommitter.marksuccessfuljobs", "false") data_frame.write \ .mode('overwrite') \ .csv('./out/temp/', sep=",", header=False) # ここからリネーム処理 from py4j.java_gateway import java_import java_import(spark._jvm, "org.apache.hadoop.fs.Path") file_system = spark._jvm.org.apache.hadoop.fs.FileSystem.get( spark._jsc.hadoopConfiguration()) target_path = spark_context._jvm.Path('./out/temp/part*') for index, file_status in enumerate( file_system.globStatus(target_path)): file_name = file_status.getPath().getName() print(f"file_name = {file_name}") # リネーム file_system.rename( spark_context._jvm.Path( './out/temp/' + file_name), spark_context._jvm.Path( './out/final/output{:08}.csv'.format(index))) # CRCファイルの削除 (★以下「【3】補足」参照★) file_system.delete(spark_context._jvm.Path( './out/final/.output{:08}.csv.crc'.format(index)), True) # 元ファイルの削除 file_system.delete(spark_context._jvm.Path('./out/temp/'), True)
【3】補足1:拡張子 CRCファイル について
* HDFSでは、拡張子:CRCファイルが出力される => CRC : Cyclic Redundancy Check (巡回冗長検査)? => 上記のサンプルは、出力抑制 / 削除している
CRCファイル「SUCCESS」の出力抑制
CRCファイル「SUCCESS」の出力抑制 について 以下のコードで行えた。 ~~~~~ spark_context._jsc.hadoopConfiguration().set( "mapreduce.fileoutputcommitter.marksuccessfuljobs", "false") ~~~~~ 以下のサイトを参考に行った。感謝。
https://stackoverflow.com/questions/34382612/how-to-avoid-generating-crc-files-and-success-files-while-saving-a-dataframe
https://qiita.com/Yarimizu14/items/500891c7403e9e4d8dbd
【4】補足2:AWS(S3)環境の場合
【2】を実装して思ったが、AWS の S3上にある場合、 新規Tempファルダにファイル出力して、 そのフォルダ配下のファイル「part*」を一覧で取得し、 指定のファルダ先にリネーム&移動するでもいいかなっと思った。 (サンプルもほど同じことやっているし) => 実際、boto3 API で実装してみたら、やりたいことができた。 => boto3 API については、以下の関連記事を参照のこと。
Amazon S3 ~ Python boto3でS3を操作する ~
https://dk521123.hatenablog.com/entry/2019/10/21/230004
参考文献
http://hatunina.hatenablog.com/entry/2018/08/06/200915
関連記事
PySpark ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2019/11/14/221126
PySpark ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/04/03/004254
PySpark ~ CSV / 基本編 ~
https://dk521123.hatenablog.com/entry/2019/11/24/225534
PySpark ~ CSV / Read/Writeのプロパティ ~
https://dk521123.hatenablog.com/entry/2020/07/30/195226
PySpark ~ CSV / escape ~
https://dk521123.hatenablog.com/entry/2020/11/23/224349
PySpark ~ CSV / White Spaceの扱い ~
https://dk521123.hatenablog.com/entry/2021/04/29/075903
PySparkで入力ファイル名を取得するには
https://dk521123.hatenablog.com/entry/2021/04/12/145133
Amazon S3 ~ Python boto3でS3を操作する ~
https://dk521123.hatenablog.com/entry/2019/10/21/230004