【分散処理】PySpark ~ パーティション ~

■ はじめに

 PySpark に関して、
パーティション (Partition) 付きでファイル出力があったので
そのことを含めて、パーティションにまつわるTipsをまとめておく

cf. Partition = 仕切り壁、分割、分配

目次

【1】パーティションの基本操作
 1)現在のパーティション数を取得する
 2)パーティションを変更する

【2】その他のパーティションに関わる操作
 1)パーティション付きでファイル出力する
 2)ファイル分割して出力する
 3)出力ファイルを1つにまとめたい

【3】出力ファイルを1つにまとめたい
【4】パーティション単位で上書きするには

【1】パーティションの基本操作

1)現在のパーティション数を取得する

partition_number = data_frame.rdd.getNumPartitions()
print(f"partition_number = {partition_number}")

2)パーティションを変更する

data_frame.repartition(partition_number)
print(f"partition_number = {partition_number}")

【2】その他のパーティションに関わる操作

1)パーティション付きでファイル出力する

解決方法

* data_frame.write.partitionBy(【パーティション】)で指定する

構文例

data_frame.write \
  .mode('overwrite') \
  .partitionBy('partition1', partition2', ...) \
  .csv('./out/csv/')

サンプル

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType

spark_context = SparkContext()
spark = SparkSession(spark_context)

rdd = spark_context.parallelize([
  ('x0001', 'Mike', 'Sales'),
  ('x0002', 'Tom', 'IT'),
  ('x0003', 'Sam', 'Sales'),
  ('x0004', 'Kevin', 'Human resources'),
  ('x0005', 'Bob', 'IT'),
  ('x0006', 'Alice', 'Banking'),
  ('x0007', 'Carol', 'IT'),
])
schema = StructType([
  StructField('id', StringType(), False),
  StructField('name', StringType(), False),
  StructField('job', StringType(), False),
])

data_frame = spark.createDataFrame(rdd, schema)

# ★ここに注目★
data_frame.write \
  .mode('overwrite') \
  .partitionBy('job') \
  .csv('./out/csv/')

出力結果「./out/csv配下のファイル」

./out/csv
 + job=Banking
   + part-00000-xxxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxxx-c000.csv
 + job=Human%20resources <= 空白は「%20」で変換
   + part-00000-yyyyyyyyy-yyyy-yyyy-yyyy-yyyyyyyyyyyyy-c000.csv
 + job=IT
   + part-00000-zzzzzzzzzz-zzzz-zzzz-zzzz-zzzzzzzzzzzzzzz-c000.csv
 + job=Sales
   + part-00000-ZZZZZZZZZZ-XXXX-YYYY-zzzz-zzzzzzzzzzzzzzz-c000.csv
 + ... (CRCファイルなど)

 => パーティションごとに分けられた
 => できれば、空白の入らないデータ項目で行った方がいいっぽい

参考文献
https://yamap55.hatenablog.com/entry/2019/07/25/200000
http://mogile.web.fc2.com/spark/spark220/sql-programming-guide.html

2)ファイル分割して出力する

https://dk521123.hatenablog.com/entry/2021/05/10/143328
https://dk521123.hatenablog.com/entry/2021/04/22/131849

のトラブルの解決案として、
ファイルを物理的に分割することを上げたが
実際にその具体的な方法を言及してなかったので
今回取り上げる。

解決方法

* data_frame.repartition(【分割数】)で指定する
* repartition(【パーティション数】, 【項目名】) だと、
 指定した項目名で指定したパーティション数に分割する

https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.repartition.html
構文例

data_frame.repartition(2) \
  .write \
  .mode('overwrite') \
  .csv('./out/csv/', header=True)

サンプル

from pyspark import SparkContext
from pyspark.sql import SparkSession

spark_context = SparkContext()
spark = SparkSession(spark_context)

# まず読み込む
data_frame = spark.read.csv("input.csv", header=True)

# ★ここに注目★ 2ファイルに分割して出力する
data_frame.repartition(2) \
  .write \
  .mode('overwrite') \
  .csv('./out/csv/', header=True)

入力ファイル「input.csv

id,name,job
x0001,Mike,Sales
x0002,Tom,IT
x0003,Sam,Sales
x0004,Kevin,Human resources
x0005,Bob,IT
x0006,Alice,Banking
x0007,Carol,IT
x0008,Tom,Banking
x0009,Mike,IT
x0010,Bob,Sales

出力結果「./out/csv配下のファイル」

./out/csv
 + part-00000-xxxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxxx-c000.csv
 + part-00000-yyyyyyyyy-yyyy-yyyy-yyyy-yyyyyyyyyyyyy-c000.csv
 + ... (CRCファイルなど)

 => ファイルが2つに分割されている
 => どちらのファイルにもCSVヘッダー付

参考文献

https://qiita.com/gsy0911/items/a4cb8b2d54d6341558e0
https://blog.imind.jp/entry/2019/09/21/160754
https://yohei-a.hatenablog.jp/entry/20181206/1544036150

【3】出力ファイルを1つにまとめたい

以下の関連記事を参照のこと

PySpark ~ 出力ファイル / 空ファイル対応, 1ファイルに纏める ~
https://dk521123.hatenablog.com/entry/2021/04/03/004254

【4】パーティション単位で上書きするには

以下の関連記事を参照のこと

PySpark ~ パーティション単位で上書きするには ~
https://dk521123.hatenablog.com/entry/2021/07/07/093147

関連記事

PySpark ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2019/11/14/221126
PySpark ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/04/03/004254
PySpark ~ CSV / 基本編 ~
https://dk521123.hatenablog.com/entry/2019/11/24/225534
PySpark ~ CSV / Read/Writeのプロパティ ~
https://dk521123.hatenablog.com/entry/2020/07/30/195226
PySpark ~ パーティション単位で上書きするには ~
https://dk521123.hatenablog.com/entry/2021/07/07/093147
Hive / HiveQL ~ パーティション / あれこれ編 ~
https://dk521123.hatenablog.com/entry/2020/09/18/113637
PySpark ~ 出力ファイル / 空ファイル対応, 1ファイルに纏める ~
https://dk521123.hatenablog.com/entry/2021/04/03/004254