【Python】Python で Parquet を扱う

■ はじめに

 Parquet (パーケット) 形式のファイルを取り込むことになって
そのためのテストデータ(ファイル)を作りたいので、
Python で Parquet を扱う方法をメモしておく。

Parquet については、以下の関連記事を参照のこと。

Parquet ファイル
https://dk521123.hatenablog.com/entry/2020/06/03/000000

目次

【0】Python で Parquet を扱う方法について
【1】Pandas を使用する
【2】PyArrow を使用する
【3】PySpark を使用する

【0】Python で Parquet を扱う方法について

* 方法は、(他にもあるかもしれないが)以下の通り。
~~~~~~~~~~~~~~
[1] Pandas を使用する
[2] PyArrow を使用する
[3] PySpark を使用する
~~~~~~~~~~~~~~

個人的な意見

最終的にどんなファイルを作りたいか、読み込ませたいのか
にもよって、使い分ければいいが、目安として
気軽にやりたいなら、「[1] Pandas を使用する」
ビッグデータを扱っているなら、「[3] PySpark を使用する」
っとなるのかなっと。

【1】Pandas を使用する

1)出力・書き出し

以前、以下の関連記事で扱った to_parquet() でいける。

https://dk521123.hatenablog.com/entry/2021/04/10/192752

API仕様:to_parquet
https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_parquet.html

2)入力・読み込み

read_parquet() を使う。

API仕様:read_parquet
https://pandas.pydata.org/docs/reference/api/pandas.read_parquet.html

3)サンプル

import pandas as pd


file_name = "customer.parq"
header_list = ['id', 'name', 'city', 'birth_day', 'created_at']
body_list = [
  ["1", "Mike", "Dublin", "2010-10-11", "2021-11-12 23:12:32"],
  ["2", "Sam", "Tokyo", "1988-01-21", "2021-11-12 23:12:32"],
  ["3", "Tom", "London", "1999-12-31", "2021-11-12 23:12:32"],
]

data_frame = pd.DataFrame(data=body_list, columns=header_list)
# 型変換
data_frame['id'] = data_frame['id'].astype('int64')
data_frame['name'] = data_frame['name'].astype('str')
data_frame['city'] = data_frame['city'].astype('str')
data_frame['birth_day'] = data_frame['birth_day'].astype('datetime64[ns]')
data_frame['created_at'] = data_frame['created_at'].astype('datetime64[ns]')

print(data_frame)
print(data_frame.info())

# 1)出力・書き出し - to_parquet()
data_frame.to_parquet(file_name, compression='GZIP')

# 2)入力・読み込み - read_parquet()
loaded_data_frame = pd.read_parquet(file_name)

print(loaded_data_frame)
print(loaded_data_frame.info())

出力結果

   id  name    city  birth_day          created_at
0   1  Mike  Dublin 2010-10-11 2021-11-12 23:12:32
1   2   Sam   Tokyo 1988-01-21 2021-11-12 23:12:32
2   3   Tom  London 1999-12-31 2021-11-12 23:12:32
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3 entries, 0 to 2
Data columns (total 5 columns):
 #   Column      Non-Null Count  Dtype
---  ------      --------------  -----
 0   id          3 non-null      int64
 1   name        3 non-null      object        
 2   city        3 non-null      object        
 3   birth_day   3 non-null      datetime64[ns]
 4   created_at  3 non-null      datetime64[ns]
dtypes: datetime64[ns](2), int64(1), object(2)
memory usage: 248.0+ bytes
None
   id  name    city  birth_day          created_at
0   1  Mike  Dublin 2010-10-11 2021-11-12 23:12:32
1   2   Sam   Tokyo 1988-01-21 2021-11-12 23:12:32
2   3   Tom  London 1999-12-31 2021-11-12 23:12:32
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3 entries, 0 to 2
Data columns (total 5 columns):
 #   Column      Non-Null Count  Dtype
---  ------      --------------  -----
 0   id          3 non-null      int64
 1   name        3 non-null      object
 2   city        3 non-null      object
 3   birth_day   3 non-null      datetime64[ns]
 4   created_at  3 non-null      datetime64[ns]
dtypes: datetime64[ns](2), int64(1), object(2)
memory usage: 248.0+ bytes
None

【2】PyArrow を使用する

1)出力・書き出し

以前、以下の関連記事で扱った pyarrow.parquet.write_table() でいける。

https://dk521123.hatenablog.com/entry/2021/09/21/224959

API仕様:pyarrow.parquet.write_table
https://arrow.apache.org/docs/python/generated/pyarrow.parquet.write_table.html

2)入力・読み込み

こちらも、以前、以下の関連記事で扱った pyarrow.parquet.read_table() でいける。

https://dk521123.hatenablog.com/entry/2021/09/21/224959

API仕様:pyarrow.parquet.read_table
https://arrow.apache.org/docs/python/generated/pyarrow.parquet.read_table.html

3)サンプル

import pandas as pd
import pyarrow
import pyarrow.parquet

file_name = "customer2.parq"
header_list = ['id', 'name', 'city', 'birth_day', 'created_at']
body_list = [
  ["1", "Mike", "Dublin", "2010-10-11", "2021-11-12 23:12:32"],
  ["2", "Sam", "Tokyo", "1988-01-21", "2021-11-12 23:12:32"],
  ["3", "Tom", "London", "1999-12-31", "2021-11-12 23:12:32"],
]

data_frame = pd.DataFrame(data=body_list, columns=header_list)
# 型変換
data_frame['id'] = data_frame['id'].astype('int64')
data_frame['name'] = data_frame['name'].astype('str')
data_frame['city'] = data_frame['city'].astype('str')
data_frame['birth_day'] = data_frame['birth_day'].astype('datetime64[ns]')
data_frame['created_at'] = data_frame['created_at'].astype('datetime64[ns]')

# DataFrame -> Arrow Table
table = pyarrow.Table.from_pandas(data_frame)

# 1)出力・書き出し - to_parquet()
pyarrow.parquet.write_table(table, file_name)

# 2)入力・読み込み - read_parquet()
loaded_table = pyarrow.parquet.read_table(file_name)
print(loaded_table)

loaded_data_frame = loaded_table.to_pandas()
print(loaded_data_frame)
print(loaded_data_frame.info())

print("Done")

出力結果

pyarrow.Table
id: int64
name: string
city: string
birth_day: timestamp[us] 
created_at: timestamp[us]
   id  name    city  birth_day          created_at
0   1  Mike  Dublin 2010-10-11 2021-11-12 23:12:32
1   2   Sam   Tokyo 1988-01-21 2021-11-12 23:12:32
2   3   Tom  London 1999-12-31 2021-11-12 23:12:32
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3 entries, 0 to 2
Data columns (total 5 columns):
 #   Column      Non-Null Count  Dtype
---  ------      --------------  -----
 0   id          3 non-null      int64
 1   name        3 non-null      object
 2   city        3 non-null      object
 3   birth_day   3 non-null      datetime64[ns]
 4   created_at  3 non-null      datetime64[ns]
dtypes: datetime64[ns](2), int64(1), object(2)
memory usage: 248.0+ bytes
None
Done

【3】PySpark を使用する

使用上の注意

* Sparkが必要になるので、気軽にはできない

1)出力・書き出し

以前、以下の関連記事で扱った data_frame.write.parquet() でいける。

https://dk521123.hatenablog.com/entry/2021/04/11/101305

2)入力・読み込み

こちらも、以前、以下の関連記事で扱った spark.read.parquet() でいける。

https://dk521123.hatenablog.com/entry/2021/04/11/101305

3)サンプル

from datetime import datetime

from pyspark import SparkContext
from pyspark.sql import SparkSession

from pyspark.sql.types import StructType, TimestampType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
# ★BigIntは、Longで表現
from pyspark.sql.types import LongType
from pyspark.sql.types import TimestampType


spark_context = SparkContext()
spark = SparkSession(spark_context)

rdd = spark_context.parallelize([
  (1, 'Mike', 45, 'Sales', datetime.strptime('2000-01-01 12:11:10.0000', "%Y-%m-%d %H:%M:%S.%f")),
  (2, 'Tom', 65, 'IT', datetime.strptime('2001-01-02 12:11:10.000', "%Y-%m-%d %H:%M:%S.%f")),
  (3, 'Sam', 32, 'Sales', datetime.strptime('2002-01-03 12:11:10.000', "%Y-%m-%d %H:%M:%S.%f")),
  (4, 'Kevin', 28, 'Human resources', datetime.strptime('2003-01-04 12:11:10.000', "%Y-%m-%d %H:%M:%S.%f")),
  (5, 'Bob', 25, 'IT', datetime.strptime('2004-01-05 12:11:10.000', "%Y-%m-%d %H:%M:%S.%f")),
  (6, 'Alice', 20, 'Banking', datetime.strptime('2005-01-06 12:11:10.000', "%Y-%m-%d %H:%M:%S.%f")),
  (7, 'Carol', 30, 'IT', datetime.strptime('2006-01-07 12:11:10.000', "%Y-%m-%d %H:%M:%S.%f")),
])
# StructField(【項目名】, 【データ型】, isNullbale)
schema = StructType([
  StructField('id', LongType(), False),
  StructField('name', StringType(), False),
  StructField('age', IntegerType(), False),
  StructField('job', StringType(), True),
  StructField('birth_date', TimestampType(), True),
])
data_frame = spark.createDataFrame(rdd, schema)

# 一つにパーティションをまとめる
# https://dk521123.hatenablog.com/entry/2021/05/21/110922
data_frame = data_frame.repartition(1)

# 1)書き込み
data_frame.write \
  .mode('overwrite') \
  .option("compression", "gzip") \
  .parquet("./out")

data_frame.show()

# 2)読み込み
data_frame = spark.read.parquet("./out")
data_frame.show()

print("Done")

出力結果

+---+-----+---+---------------+-------------------+
| id| name|age|            job|         birth_date|
+---+-----+---+---------------+-------------------+
|  1| Mike| 45|          Sales|2000-01-01 12:11:10|
|  2|  Tom| 65|             IT|2001-01-02 12:11:10|
|  3|  Sam| 32|          Sales|2002-01-03 12:11:10|
|  4|Kevin| 28|Human resources|2003-01-04 12:11:10|
|  5|  Bob| 25|             IT|2004-01-05 12:11:10|
|  6|Alice| 20|        Banking|2005-01-06 12:11:10|
|  7|Carol| 30|             IT|2006-01-07 12:11:10|
+---+-----+---+---------------+-------------------+

+---+-----+---+---------------+-------------------+
| id| name|age|            job|         birth_date|
+---+-----+---+---------------+-------------------+
|  1| Mike| 45|          Sales|2000-01-01 12:11:10|
|  2|  Tom| 65|             IT|2001-01-02 12:11:10|
|  3|  Sam| 32|          Sales|2002-01-03 12:11:10|
|  4|Kevin| 28|Human resources|2003-01-04 12:11:10|
|  5|  Bob| 25|             IT|2004-01-05 12:11:10|
|  6|Alice| 20|        Banking|2005-01-06 12:11:10|
|  7|Carol| 30|             IT|2006-01-07 12:11:10|
+---+-----+---+---------------+-------------------+

Done

参考文献

https://ohke.hateblo.jp/entry/2020/08/15/150000

関連記事

Pandas ~ to_xxxx / 出力編 ~
https://dk521123.hatenablog.com/entry/2021/04/10/192752
Pandas ~ 基本編 / CSV編 ~
https://dk521123.hatenablog.com/entry/2020/11/17/000000
Apache Arrow / PyArrow ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/09/21/224959
PySpark ~ Parquet / 基本編 ~
https://dk521123.hatenablog.com/entry/2021/04/11/101305
AWS Glue ~ DynamicFrame ~
https://dk521123.hatenablog.com/entry/2021/12/14/221043
Parquet ファイル
https://dk521123.hatenablog.com/entry/2020/06/03/000000