■ はじめに
PySpark で、Parquet フォーマットで 保存する必要ができたので調べてみた Parquet ファイルに関しては、以下の関連記事を参照のこと。
Parquet ファイル
https://dk521123.hatenablog.com/entry/2020/06/03/000000
目次
【1】サポートされている圧縮形式 [補足] Pandas の場合 [補足] AWS Glueの場合 【2】データ型 【3】Parquet 書き込み [補足] セーブモード (mode) 【4】Parquet 読み込み
【1】サポートされている圧縮形式
https://spark.apache.org/docs/2.4.3/sql-data-sources-parquet.html
spark.sql.parquet.compression.codec
* none * uncompressed:非圧縮 * snappy(デフォルト) * gzip * lzo * brotli => BrotliCodec がインストールされること * lz4 * zstd => ZStandardCodec がインストールされること
[補足] Pandas の場合
以下の関連記事より抜粋 ~~~~ compression=‘snappy’, ‘gzip’, ‘brotli’, None ~~~~
https://dk521123.hatenablog.com/entry/2021/04/10/192752
[補足] AWS Glueの場合
* 以下の公式サイトを参照。
https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-format.html
【2】データ型
http://mogile.web.fc2.com/spark/sql-ref-datatypes.html#supported-data-types
に記載されている。 * BigInt型は、Longで表現 => 以下の関連記事のサンプルを参照のこと。
https://dk521123.hatenablog.com/entry/2021/11/13/095519
【3】Parquet 書き込み
* data_frame.write.parquet("【出力パス】")で行う
サンプル
from pyspark import SparkContext from pyspark.sql import SparkSession from pyspark.sql.types import StructType from pyspark.sql.types import StructField from pyspark.sql.types import StringType from pyspark.sql.types import IntegerType spark_context = SparkContext() spark = SparkSession(spark_context) rdd = spark_context.parallelize([ (1, 'Mike', 45, 'Sales'), (2, 'Tom', 65, 'IT'), (3, 'Sam', 32, 'Sales'), (4, 'Kevin', 28, 'Human resources'), (5, 'Bob', 25, 'IT'), (6, 'Alice', 20, 'Banking'), (7, 'Carol', 30, 'IT'), ]) schema = StructType([ StructField('id', IntegerType(), False), StructField('name', StringType(), False), StructField('age', IntegerType(), False), StructField('job', StringType(), False), ]) data_frame = spark.createDataFrame(rdd, schema) # 書き込み data_frame.write \ .mode('overwrite') \ .option("compression", "gzip") \ .parquet("./out") data_frame.show() print("Done")
[補足] セーブモード (mode)
http://mogile.web.fc2.com/spark/spark220/sql-programming-guide.html#save-modes
1)error ... データが存在する場合、例外を発生させる 2)append ... データ/テーブル が存在する場合、既存データに追記 3)overwrite ... データ/テーブル が存在する場合、既存データで上書きする 4)ignore ... データ が存在する場合、何もしない(ignore=無視)
append / overwrite との使い分け
結論からすると、仕様(フォルダ構成含む)に応じて使い分けていけば overwriteの場合、特に気にせずに 冪等性(べきとうせい)が担保されるが appendの場合、重複して登録されてしまう => 例えば、AWS Glue や Lambda などで、Write途中でエラーになり、 再実行された場合、overwriteであれば問題ないが、 appendの場合、二重にデータを保存されてしまう。
参考文献
https://yamap55.hatenablog.com/entry/2019/07/25/200000
【4】Parquet 読み込み
* data_frame = spark.read.parquet("【読み込みパス】")で行う
サンプル:【3】で書き込みしたファイルを読み込む
from pyspark import SparkContext from pyspark.sql import SparkSession spark_context = SparkContext() spark = SparkSession(spark_context) # 読み込み data_frame = spark.read.parquet("./out") data_frame.show() print("Done")
出力結果
+---+-----+---+---------------+ | id| name|age| job| +---+-----+---+---------------+ | 4|Kevin| 28|Human resources| | 6|Alice| 20| Banking| | 1| Mike| 45| Sales| | 3| Sam| 32| Sales| | 7|Carol| 30| IT| | 2| Tom| 65| IT| | 5| Bob| 25| IT| +---+-----+---+---------------+
参考文献
https://qiita.com/gsy0911/items/a4cb8b2d54d6341558e0
関連記事
Pandas ~ to_xxxx / 出力編 ~
https://dk521123.hatenablog.com/entry/2021/04/10/192752
PySpark ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2019/11/14/221126
PySpark ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/04/03/004254
PySpark ~ RDD / 基本編 ~
https://dk521123.hatenablog.com/entry/2021/04/04/111057
PySpark ~ RDD / あれこれ編 ~
https://dk521123.hatenablog.com/entry/2021/04/06/001709
PySpark ~ DataFrame / データ操作編 ~
https://dk521123.hatenablog.com/entry/2020/01/04/150942
PySpark ~ DataFrame / テーブル・項目操作編 ~
https://dk521123.hatenablog.com/entry/2020/05/18/154829
PySpark ~ CSV / 基本編 ~
https://dk521123.hatenablog.com/entry/2019/11/24/225534
PySpark ~ CSV / あれこれ編 ~
https://dk521123.hatenablog.com/entry/2020/07/30/195226
PySpark ~ CSV / escape 編 ~
https://dk521123.hatenablog.com/entry/2020/11/23/224349
PySpark ~ エスケープされた区切り文字が含んだデータを扱う ~
https://dk521123.hatenablog.com/entry/2020/07/09/000832
PySpark ~ Hive ~
https://dk521123.hatenablog.com/entry/2020/05/14/222415
PySpark ~ ユーザ定義関数 UDF 編 ~
https://dk521123.hatenablog.com/entry/2020/05/20/195621
PySparkで入力ファイル名を取得するには
https://dk521123.hatenablog.com/entry/2021/04/12/145133
PySpark ~ データをクリーニングする ~
https://dk521123.hatenablog.com/entry/2020/07/08/162552
PySpark ~ PySpark経由でDBに接続する ~
https://dk521123.hatenablog.com/entry/2020/07/02/000000
Glue から Redshift/PostgreSQL に接続する ~ Python shell編 ~
https://dk521123.hatenablog.com/entry/2020/08/26/193237
Glue から Redshift/PostgreSQL に接続する ~ PySpark編 ~
https://dk521123.hatenablog.com/entry/2020/09/23/111741
Python ~ Parquet ~
https://dk521123.hatenablog.com/entry/2021/11/13/095519