【分散処理】PySpark ~ Parquet / 基本編 ~

■ はじめに

PySpark で、Parquet フォーマットで
保存する必要ができたので調べてみた

Parquet ファイルに関しては、以下の関連記事を参照のこと。

Parquet ファイル
https://dk521123.hatenablog.com/entry/2020/06/03/000000

目次

【1】サポートされている圧縮形式
 [補足] Pandas の場合
 [補足] AWS Glueの場合
【2】データ型
【3】Parquet 書き込み
 [補足] セーブモード (mode)
【4】Parquet 読み込み

【1】サポートされている圧縮形式

https://spark.apache.org/docs/2.4.3/sql-data-sources-parquet.html
spark.sql.parquet.compression.codec

* none
* uncompressed:非圧縮
* snappy(デフォルト)
* gzip
* lzo
* brotli
 => BrotliCodec がインストールされること
* lz4
* zstd
 => ZStandardCodec がインストールされること

[補足] Pandas の場合

以下の関連記事より抜粋
 ~~~~
compression=‘snappy’, ‘gzip’, ‘brotli’, None
 ~~~~

https://dk521123.hatenablog.com/entry/2021/04/10/192752

[補足] AWS Glueの場合

* 以下の公式サイトを参照。

https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-format.html

【2】データ型

http://mogile.web.fc2.com/spark/sql-ref-datatypes.html#supported-data-types

に記載されている。

* BigInt型は、Longで表現
 => 以下の関連記事のサンプルを参照のこと。

https://dk521123.hatenablog.com/entry/2021/11/13/095519

【3】Parquet 書き込み

* data_frame.write.parquet("【出力パス】")で行う

サンプル

from pyspark import SparkContext
from pyspark.sql import SparkSession

from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType

spark_context = SparkContext()
spark = SparkSession(spark_context)

rdd = spark_context.parallelize([
  (1, 'Mike', 45, 'Sales'),
  (2, 'Tom', 65, 'IT'),
  (3, 'Sam', 32, 'Sales'),
  (4, 'Kevin', 28, 'Human resources'),
  (5, 'Bob', 25, 'IT'),
  (6, 'Alice', 20, 'Banking'),
  (7, 'Carol', 30, 'IT'),
])
schema = StructType([
  StructField('id', IntegerType(), False),
  StructField('name', StringType(), False),
  StructField('age', IntegerType(), False),
  StructField('job', StringType(), False),
])
data_frame = spark.createDataFrame(rdd, schema)

# 書き込み
data_frame.write \
  .mode('overwrite') \
  .option("compression", "gzip") \
  .parquet("./out")

data_frame.show()

print("Done")

[補足] セーブモード (mode)

http://mogile.web.fc2.com/spark/spark220/sql-programming-guide.html#save-modes

1)error ... データが存在する場合、例外を発生させる
2)append ... データ/テーブル が存在する場合、既存データに追記
3)overwrite ... データ/テーブル が存在する場合、既存データで上書きする
4)ignore ...  データ が存在する場合、何もしない(ignore=無視)

append / overwrite との使い分け

結論からすると、仕様(フォルダ構成含む)に応じて使い分けていけば

overwriteの場合、特に気にせずに 冪等性(べきとうせい)が担保されるが
appendの場合、重複して登録されてしまう
 => 例えば、AWS Glue や Lambda などで、Write途中でエラーになり、
  再実行された場合、overwriteであれば問題ないが、
  appendの場合、二重にデータを保存されてしまう。

参考文献
https://yamap55.hatenablog.com/entry/2019/07/25/200000

【4】Parquet 読み込み

* data_frame = spark.read.parquet("【読み込みパス】")で行う

サンプル:【3】で書き込みしたファイルを読み込む

from pyspark import SparkContext
from pyspark.sql import SparkSession

spark_context = SparkContext()
spark = SparkSession(spark_context)

# 読み込み
data_frame = spark.read.parquet("./out")

data_frame.show()

print("Done")

出力結果

+---+-----+---+---------------+
| id| name|age|            job|
+---+-----+---+---------------+
|  4|Kevin| 28|Human resources|
|  6|Alice| 20|        Banking|
|  1| Mike| 45|          Sales|
|  3|  Sam| 32|          Sales|
|  7|Carol| 30|             IT|
|  2|  Tom| 65|             IT|
|  5|  Bob| 25|             IT|
+---+-----+---+---------------+

参考文献

https://qiita.com/gsy0911/items/a4cb8b2d54d6341558e0

関連記事

Pandas ~ to_xxxx / 出力編 ~
https://dk521123.hatenablog.com/entry/2021/04/10/192752
PySpark ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2019/11/14/221126
PySpark ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/04/03/004254
PySpark ~ RDD / 基本編 ~
https://dk521123.hatenablog.com/entry/2021/04/04/111057
PySpark ~ RDD / あれこれ編 ~
https://dk521123.hatenablog.com/entry/2021/04/06/001709
PySpark ~ DataFrame / データ操作編 ~
https://dk521123.hatenablog.com/entry/2020/01/04/150942
PySpark ~ DataFrame / テーブル・項目操作編 ~
https://dk521123.hatenablog.com/entry/2020/05/18/154829
PySpark ~ CSV / 基本編 ~
https://dk521123.hatenablog.com/entry/2019/11/24/225534
PySpark ~ CSV / あれこれ編 ~
https://dk521123.hatenablog.com/entry/2020/07/30/195226
PySpark ~ CSV / escape 編 ~
https://dk521123.hatenablog.com/entry/2020/11/23/224349
PySpark ~ エスケープされた区切り文字が含んだデータを扱う ~
https://dk521123.hatenablog.com/entry/2020/07/09/000832
PySpark ~ Hive ~
https://dk521123.hatenablog.com/entry/2020/05/14/222415
PySpark ~ ユーザ定義関数 UDF 編 ~
https://dk521123.hatenablog.com/entry/2020/05/20/195621
PySparkで入力ファイル名を取得するには
https://dk521123.hatenablog.com/entry/2021/04/12/145133
PySpark ~ データをクリーニングする ~
https://dk521123.hatenablog.com/entry/2020/07/08/162552
PySpark ~ PySpark経由でDBに接続する ~
https://dk521123.hatenablog.com/entry/2020/07/02/000000
Glue から Redshift/PostgreSQL に接続する ~ Python shell編 ~
https://dk521123.hatenablog.com/entry/2020/08/26/193237
Glue から Redshift/PostgreSQL に接続する ~ PySpark編 ~
https://dk521123.hatenablog.com/entry/2020/09/23/111741
Python ~ Parquet ~
https://dk521123.hatenablog.com/entry/2021/11/13/095519