■ はじめに
PySpark に関して、 パーティション (Partition) 付きでファイル出力があったので そのことを含めて、パーティションにまつわるTipsをまとめておく cf. Partition = 仕切り壁、分割、分配
目次
【1】パーティションの基本操作 1)現在のパーティション数を取得する 2)パーティションを変更する 【2】その他のパーティションに関わる操作 1)パーティション付きでファイル出力する 2)ファイル分割して出力する 3)出力ファイルを1つにまとめたい 【3】出力ファイルを1つにまとめたい 【4】パーティション単位で上書きするには
【1】パーティションの基本操作
1)現在のパーティション数を取得する
partition_number = data_frame.rdd.getNumPartitions() print(f"partition_number = {partition_number}")
2)パーティションを変更する
data_frame.repartition(partition_number) print(f"partition_number = {partition_number}")
【2】その他のパーティションに関わる操作
1)パーティション付きでファイル出力する
解決方法
* data_frame.write.partitionBy(【パーティション】)で指定する
構文例
data_frame.write \ .mode('overwrite') \ .partitionBy('partition1', partition2', ...) \ .csv('./out/csv/')
サンプル
from pyspark import SparkContext from pyspark.sql import SparkSession from pyspark.sql.types import StructType from pyspark.sql.types import StructField from pyspark.sql.types import StringType spark_context = SparkContext() spark = SparkSession(spark_context) rdd = spark_context.parallelize([ ('x0001', 'Mike', 'Sales'), ('x0002', 'Tom', 'IT'), ('x0003', 'Sam', 'Sales'), ('x0004', 'Kevin', 'Human resources'), ('x0005', 'Bob', 'IT'), ('x0006', 'Alice', 'Banking'), ('x0007', 'Carol', 'IT'), ]) schema = StructType([ StructField('id', StringType(), False), StructField('name', StringType(), False), StructField('job', StringType(), False), ]) data_frame = spark.createDataFrame(rdd, schema) # ★ここに注目★ data_frame.write \ .mode('overwrite') \ .partitionBy('job') \ .csv('./out/csv/')
出力結果「./out/csv配下のファイル」
./out/csv + job=Banking + part-00000-xxxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxxx-c000.csv + job=Human%20resources <= 空白は「%20」で変換 + part-00000-yyyyyyyyy-yyyy-yyyy-yyyy-yyyyyyyyyyyyy-c000.csv + job=IT + part-00000-zzzzzzzzzz-zzzz-zzzz-zzzz-zzzzzzzzzzzzzzz-c000.csv + job=Sales + part-00000-ZZZZZZZZZZ-XXXX-YYYY-zzzz-zzzzzzzzzzzzzzz-c000.csv + ... (CRCファイルなど) => パーティションごとに分けられた => できれば、空白の入らないデータ項目で行った方がいいっぽい
参考文献
https://yamap55.hatenablog.com/entry/2019/07/25/200000
http://mogile.web.fc2.com/spark/spark220/sql-programming-guide.html
2)ファイル分割して出力する
https://dk521123.hatenablog.com/entry/2021/05/10/143328
https://dk521123.hatenablog.com/entry/2021/04/22/131849
のトラブルの解決案として、 ファイルを物理的に分割することを上げたが 実際にその具体的な方法を言及してなかったので 今回取り上げる。
解決方法
* data_frame.repartition(【分割数】)で指定する * repartition(【パーティション数】, 【項目名】) だと、 指定した項目名で指定したパーティション数に分割する
https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.repartition.html
構文例
data_frame.repartition(2) \ .write \ .mode('overwrite') \ .csv('./out/csv/', header=True)
サンプル
from pyspark import SparkContext from pyspark.sql import SparkSession spark_context = SparkContext() spark = SparkSession(spark_context) # まず読み込む data_frame = spark.read.csv("input.csv", header=True) # ★ここに注目★ 2ファイルに分割して出力する data_frame.repartition(2) \ .write \ .mode('overwrite') \ .csv('./out/csv/', header=True)
入力ファイル「input.csv」
id,name,job x0001,Mike,Sales x0002,Tom,IT x0003,Sam,Sales x0004,Kevin,Human resources x0005,Bob,IT x0006,Alice,Banking x0007,Carol,IT x0008,Tom,Banking x0009,Mike,IT x0010,Bob,Sales
出力結果「./out/csv配下のファイル」
./out/csv + part-00000-xxxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxxx-c000.csv + part-00000-yyyyyyyyy-yyyy-yyyy-yyyy-yyyyyyyyyyyyy-c000.csv + ... (CRCファイルなど) => ファイルが2つに分割されている => どちらのファイルにもCSVヘッダー付
参考文献
https://qiita.com/gsy0911/items/a4cb8b2d54d6341558e0
https://blog.imind.jp/entry/2019/09/21/160754
https://yohei-a.hatenablog.jp/entry/20181206/1544036150
【3】出力ファイルを1つにまとめたい
以下の関連記事を参照のこと
PySpark ~ 出力ファイル / 空ファイル対応, 1ファイルに纏める ~
https://dk521123.hatenablog.com/entry/2021/04/03/004254
【4】パーティション単位で上書きするには
以下の関連記事を参照のこと
PySpark ~ パーティション単位で上書きするには ~
https://dk521123.hatenablog.com/entry/2021/07/07/093147
関連記事
PySpark ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2019/11/14/221126
PySpark ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/04/03/004254
PySpark ~ CSV / 基本編 ~
https://dk521123.hatenablog.com/entry/2019/11/24/225534
PySpark ~ CSV / Read/Writeのプロパティ ~
https://dk521123.hatenablog.com/entry/2020/07/30/195226
PySpark ~ パーティション単位で上書きするには ~
https://dk521123.hatenablog.com/entry/2021/07/07/093147
Hive / HiveQL ~ パーティション / あれこれ編 ~
https://dk521123.hatenablog.com/entry/2020/09/18/113637
PySpark ~ 出力ファイル / 空ファイル対応, 1ファイルに纏める ~
https://dk521123.hatenablog.com/entry/2021/04/03/004254