【PySpark】PySparkで入力ファイル名を取得するには

■ はじめに

https://dk521123.hatenablog.com/entry/2020/11/16/162114

の PySpark版。

 小ネタだが、ファイル名のみを取得する場合(以下の
「【2】入力ファイルのみを取得するには」の「方法1:UDFで行う」)、
前に扱った UDF (User Defined Function)の復習になったので、
非常によかった。

目次

【1】フルパスを取得するには
【2】入力ファイルのみを取得するには
 方法1:UDFで行う
  サンプル:udf関数から取り込む場合
  サンプル:デコレータを利用する場合
 方法2:Hiveで行う

【1】フルパスを取得するには

* pyspark.sql.functions.input_file_name() により取得可能

https://spark.apache.org/docs/latest/api/python//reference/api/pyspark.sql.functions.input_file_name.html

使用上の注意

input_file_name() って関数名だが、
フルパス(e.g. s3://xxx/xxxx/aaa.csv や file:///C:/xxxx/bbb.csv など)
で返ってくることに注意!

サンプル

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import input_file_name

def main():
  spark_context = SparkContext()
  spark = SparkSession(spark_context)

  data_frame = spark.read.csv('C:/hello/sample/', header=True) \
    .withColumn('file_name', input_file_name())
  data_frame.show()


if __name__ == '__main__':
  main()

入力データ:C:/hello/sample/hello_001.csv

id,name,city,birth_date
1,Joe Bloggs,Dublin,2020-12-22
2,Joseph Smith,Salt Lake City,1982-09-11
3,Mike,hello,1991-01-01

出力結果

+---+------------+--------------+----------+--------------------+
| id|        name|          city|birth_date|           file_name|
+---+------------+--------------+----------+--------------------+
|  1|  Joe Bloggs|        Dublin|2020-12-22|file:///C:/hello/s...|
|  2|Joseph Smith|Salt Lake City|1982-09-11|file:///C:/hello/s...|
|  3|        Mike|         hello|1991-01-01|file:///C:/hello/s...|
+---+------------+--------------+----------+--------------------+

【2】入力ファイルのみを取得するには

方法1:UDFで行う

https://www.javaer101.com/article/1052268.html

で行っているUDF (User Defined Function=ユーザー定義関数)を
使って、PySparkで実装する。
なお、UDFについては、以下の関連記事を参照のこと。

https://dk521123.hatenablog.com/entry/2020/05/20/195621
使用上の注意

 『デコレータを利用する場合』は、
使用する関数はUDF以外では使用しない方がいい。

 理由としては、返却される値が、
今回の場合、str(文字列)ではなく、Objectとして返ってくるため。
 => これで、バグってしまった。
 => 詳細は、以下の関連記事に記載したので、こちらを参照。

https://dk521123.hatenablog.com/entry/2020/05/20/195621

サンプル:udf関数から取り込む場合

from pyspark import SparkContext
from pyspark.sql import SparkSession

from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType
from pyspark.sql.functions import input_file_name
# ★注目★
from pyspark.sql.functions import udf

import os

# ファイル名のみ取得する関数
def to_file_name(full_path):
  return os.path.basename(full_path)

spark_context = SparkContext()
spark = SparkSession(spark_context)

df = spark.read.csv("test.csv", header=False)

schema = StructType([
  StructField('no', StringType(), True),
  StructField('name', StringType(), True),
  StructField('height', StringType(), True),
  StructField('birth_date', StringType(), True),
])
data_frame = spark.createDataFrame(df.rdd, schema)

# ★注目★
# 関数 to_file_name を定義。その名前を get_file_name と命名。
get_file_name = udf(to_file_name, StringType())
data_frame = data_frame \
  .withColumn('file_name', get_file_name(input_file_name()))

data_frame.show()

test.csv

100,Mike,179.6,2018-03-20 10:41:20
101,Sam,167.9,2018-03-03 11:32:34
102,Kevin,189.2,2018-01-28 20:20:11
103,Mike,179.6,2018-03-20 10:41:20
104,Sam,167.9,2018-03-03 11:32:34
105,Kevin,189.2,2018-01-28 20:20:11
106,Mike,179.6,2018-03-20 10:41:20
107,Sam,167.9,2018-03-03 11:32:34
108,Kevin,189.2,2018-01-28 20:20:11

出力結果

+---+-----+------+-------------------+---------+
| no| name|height|         birth_date|file_name|
+---+-----+------+-------------------+---------+
|100| Mike| 179.6|2018-03-20 10:41:20| test.csv|
|101|  Sam| 167.9|2018-03-03 11:32:34| test.csv|
|102|Kevin| 189.2|2018-01-28 20:20:11| test.csv|
|103| Mike| 179.6|2018-03-20 10:41:20| test.csv|
|104|  Sam| 167.9|2018-03-03 11:32:34| test.csv|
|105|Kevin| 189.2|2018-01-28 20:20:11| test.csv|
|106| Mike| 179.6|2018-03-20 10:41:20| test.csv|
|107|  Sam| 167.9|2018-03-03 11:32:34| test.csv|
|108|Kevin| 189.2|2018-01-28 20:20:11| test.csv|
+---+-----+------+-------------------+---------+

サンプル:デコレータを利用する場合

from pyspark import SparkContext
from pyspark.sql import SparkSession

from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType
from pyspark.sql.functions import input_file_name
from pyspark.sql.functions import udf

import os


# ファイル名のみ取得する関数
@udf(returnType=StringType())
def to_file_name(full_path):
  return os.path.basename(full_path)

spark_context = SparkContext()
spark = SparkSession(spark_context)

df = spark.read.csv("test.csv", header=False)

schema = StructType([
  StructField('no', StringType(), True),
  StructField('name', StringType(), True),
  StructField('height', StringType(), True),
  StructField('birth_date', StringType(), True),
])
data_frame = spark.createDataFrame(df.rdd, schema)

# ★注目★
data_frame = data_frame \
  .withColumn('file_name', to_file_name(input_file_name()))

# 出力結果は、上のサンプルと同じだから省略
data_frame.show()

方法2:Hiveで行う

サンプル

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import functions

def main():
  spark_context = SparkContext()
  spark = SparkSession(spark_context)

  data_frame = spark.read.csv('C:/work/hello/sample/', header=True) \
    .withColumn('file_name', functions.input_file_name())
  data_frame.createOrReplaceTempView("person")
  data_frame = spark.sql("""SELECT
    id AS id
    ,name AS name
    ,city AS city
    ,birth_date AS birth_date
    ,REVERSE(SPLIT(REVERSE(file_name), '/')[0]) AS file_name
    ,REVERSE(SUBSTRING(SPLIT(REVERSE(file_name), '/')[0], 5, 3)) AS file_no
  FROM person
  """)
  data_frame.show()


if __name__ == '__main__':
  main()

出力結果

+---+------------+--------------+----------+-------------+-------+
| id|        name|          city|birth_date|    file_name|file_no|
+---+------------+--------------+----------+-------------+-------+
|  1|  Joe Bloggs|        Dublin|2020-12-22|hello_001.csv|    001|
|  2|Joseph Smith|Salt Lake City|1982-09-11|hello_001.csv|    001|
|  3|        Mike|         hello|1991-01-01|hello_001.csv|    001|
+---+------------+--------------+----------+-------------+-------+

参考文献

https://dev.classmethod.jp/articles/spark-input-file/
https://qiita.com/gsy0911/items/a4cb8b2d54d6341558e0

関連記事

Hiveクエリで入力ファイル名を取得するには
https://dk521123.hatenablog.com/entry/2020/11/16/162114
PySpark ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2019/11/14/221126
PySpark ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/04/03/004254
PySpark ~ RDD / 基本編 ~
https://dk521123.hatenablog.com/entry/2021/04/04/111057
PySpark ~ RDD / あれこれ編 ~
https://dk521123.hatenablog.com/entry/2021/04/06/001709
PySpark ~ ユーザ定義関数 UDF 編 ~
https://dk521123.hatenablog.com/entry/2020/05/20/195621
PySpark ~ UDFの各定義方法でのサンプル ~
https://dk521123.hatenablog.com/entry/2021/05/27/100132
Python ~ 基本編 / 正規表現
https://dk521123.hatenablog.com/entry/2019/09/01/000000

【分散処理】PySpark ~ Parquet / 基本編 ~

■ はじめに

PySpark で、Parquet フォーマットで
保存する必要ができたので調べてみた

Parquet ファイルに関しては、以下の関連記事を参照のこと。

Parquet ファイル
https://dk521123.hatenablog.com/entry/2020/06/03/000000

目次

【1】サポートされている圧縮形式
 [補足] Pandas の場合
 [補足] AWS Glueの場合
【2】データ型
【3】Parquet 書き込み
 [補足] セーブモード (mode)
【4】Parquet 読み込み

【1】サポートされている圧縮形式

https://spark.apache.org/docs/2.4.3/sql-data-sources-parquet.html
spark.sql.parquet.compression.codec

* none
* uncompressed:非圧縮
* snappy(デフォルト)
* gzip
* lzo
* brotli
 => BrotliCodec がインストールされること
* lz4
* zstd
 => ZStandardCodec がインストールされること

[補足] Pandas の場合

以下の関連記事より抜粋
 ~~~~
compression=‘snappy’, ‘gzip’, ‘brotli’, None
 ~~~~

https://dk521123.hatenablog.com/entry/2021/04/10/192752

[補足] AWS Glueの場合

* 以下の公式サイトを参照。

https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-format.html

【2】データ型

http://mogile.web.fc2.com/spark/sql-ref-datatypes.html#supported-data-types

に記載されている。

* BigInt型は、Longで表現
 => 以下の関連記事のサンプルを参照のこと。

https://dk521123.hatenablog.com/entry/2021/11/13/095519

【3】Parquet 書き込み

* data_frame.write.parquet("【出力パス】")で行う

サンプル

from pyspark import SparkContext
from pyspark.sql import SparkSession

from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType

spark_context = SparkContext()
spark = SparkSession(spark_context)

rdd = spark_context.parallelize([
  (1, 'Mike', 45, 'Sales'),
  (2, 'Tom', 65, 'IT'),
  (3, 'Sam', 32, 'Sales'),
  (4, 'Kevin', 28, 'Human resources'),
  (5, 'Bob', 25, 'IT'),
  (6, 'Alice', 20, 'Banking'),
  (7, 'Carol', 30, 'IT'),
])
schema = StructType([
  StructField('id', IntegerType(), False),
  StructField('name', StringType(), False),
  StructField('age', IntegerType(), False),
  StructField('job', StringType(), False),
])
data_frame = spark.createDataFrame(rdd, schema)

# 書き込み
data_frame.write \
  .mode('overwrite') \
  .option("compression", "gzip") \
  .parquet("./out")

data_frame.show()

print("Done")

[補足] セーブモード (mode)

http://mogile.web.fc2.com/spark/spark220/sql-programming-guide.html#save-modes

1)error ... データが存在する場合、例外を発生させる
2)append ... データ/テーブル が存在する場合、既存データに追記
3)overwrite ... データ/テーブル が存在する場合、既存データで上書きする
4)ignore ...  データ が存在する場合、何もしない(ignore=無視)

append / overwrite との使い分け

結論からすると、仕様(フォルダ構成含む)に応じて使い分けていけば

overwriteの場合、特に気にせずに 冪等性(べきとうせい)が担保されるが
appendの場合、重複して登録されてしまう
 => 例えば、AWS Glue や Lambda などで、Write途中でエラーになり、
  再実行された場合、overwriteであれば問題ないが、
  appendの場合、二重にデータを保存されてしまう。

参考文献
https://yamap55.hatenablog.com/entry/2019/07/25/200000

【4】Parquet 読み込み

* data_frame = spark.read.parquet("【読み込みパス】")で行う

サンプル:【3】で書き込みしたファイルを読み込む

from pyspark import SparkContext
from pyspark.sql import SparkSession

spark_context = SparkContext()
spark = SparkSession(spark_context)

# 読み込み
data_frame = spark.read.parquet("./out")

data_frame.show()

print("Done")

出力結果

+---+-----+---+---------------+
| id| name|age|            job|
+---+-----+---+---------------+
|  4|Kevin| 28|Human resources|
|  6|Alice| 20|        Banking|
|  1| Mike| 45|          Sales|
|  3|  Sam| 32|          Sales|
|  7|Carol| 30|             IT|
|  2|  Tom| 65|             IT|
|  5|  Bob| 25|             IT|
+---+-----+---+---------------+

参考文献

https://qiita.com/gsy0911/items/a4cb8b2d54d6341558e0

関連記事

Pandas ~ to_xxxx / 出力編 ~
https://dk521123.hatenablog.com/entry/2021/04/10/192752
PySpark ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2019/11/14/221126
PySpark ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/04/03/004254
PySpark ~ RDD / 基本編 ~
https://dk521123.hatenablog.com/entry/2021/04/04/111057
PySpark ~ RDD / あれこれ編 ~
https://dk521123.hatenablog.com/entry/2021/04/06/001709
PySpark ~ DataFrame / データ操作編 ~
https://dk521123.hatenablog.com/entry/2020/01/04/150942
PySpark ~ DataFrame / テーブル・項目操作編 ~
https://dk521123.hatenablog.com/entry/2020/05/18/154829
PySpark ~ CSV / 基本編 ~
https://dk521123.hatenablog.com/entry/2019/11/24/225534
PySpark ~ CSV / あれこれ編 ~
https://dk521123.hatenablog.com/entry/2020/07/30/195226
PySpark ~ CSV / escape 編 ~
https://dk521123.hatenablog.com/entry/2020/11/23/224349
PySpark ~ エスケープされた区切り文字が含んだデータを扱う ~
https://dk521123.hatenablog.com/entry/2020/07/09/000832
PySpark ~ Hive ~
https://dk521123.hatenablog.com/entry/2020/05/14/222415
PySpark ~ ユーザ定義関数 UDF 編 ~
https://dk521123.hatenablog.com/entry/2020/05/20/195621
PySparkで入力ファイル名を取得するには
https://dk521123.hatenablog.com/entry/2021/04/12/145133
PySpark ~ データをクリーニングする ~
https://dk521123.hatenablog.com/entry/2020/07/08/162552
PySpark ~ PySpark経由でDBに接続する ~
https://dk521123.hatenablog.com/entry/2020/07/02/000000
Glue から Redshift/PostgreSQL に接続する ~ Python shell編 ~
https://dk521123.hatenablog.com/entry/2020/08/26/193237
Glue から Redshift/PostgreSQL に接続する ~ PySpark編 ~
https://dk521123.hatenablog.com/entry/2020/09/23/111741
Python で Parquet を扱う
https://dk521123.hatenablog.com/entry/2021/11/13/095519

【Python】 Pandas ~ to_xxxx / 出力編 ~

■ はじめに

https://dk521123.hatenablog.com/entry/2019/10/22/014957
https://dk521123.hatenablog.com/entry/2020/10/14/000000
https://dk521123.hatenablog.com/entry/2021/04/07/105858

の続き。

to_dict / to_json などを使用したのだが、
他にも色々あるので調べてみた。

以下に載せた形式以外にもあるみたいだけど、
載せきれないので、気になったものだけあげておく。

目次

【1】出力ファイル
 1)to_csv
 2)to_excel
 3)to_parquet
 4)to_pickle
 5)to_latex
 6)to_feather
 7)to_hdf
 8)to_stata
 9)to_html
【2】その他
 1)to_dict
 2)to_json
 3)to_numpy
 4)to_sql
 5)to_gbq

【1】出力ファイル

1)to_csv

https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_csv.html

* CSVファイル出力
* 以下の関連記事を参照のこと

Pandas ~ 基本編 / CSV編 ~
https://dk521123.hatenablog.com/entry/2020/11/17/000000

2)to_excel

https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_excel.html

* Excelファイル出力
* 以下の関連記事を参照のこと

Pandas ~ 基本編 / Excel編 ~
https://dk521123.hatenablog.com/entry/2020/11/18/000000
Pandas ~ 基本編 / Excel => CSVに変換 ~
https://dk521123.hatenablog.com/entry/2021/01/25/000000

3)to_parquet

https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_parquet.html

* Parquet形式での出力
 => Parquetファイルの詳細は、以下の関連記事を参照のこと)

https://dk521123.hatenablog.com/entry/2020/06/03/000000
圧縮形式 / compression

compression=‘snappy’, ‘gzip’, ‘brotli’, None
 => default は、‘snappy’
 => None は、非圧縮

サンプル

* 以下の関連記事を参照のこと。

https://dk521123.hatenablog.com/entry/2021/11/13/095519

4)to_pickle

https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_pickle.html

* pickleファイルで出力

5)to_latex

https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_latex.html

* LaTexファイルで出力

参考文献
https://www.haya-programming.com/entry/2018/05/31/020009

6)to_feather

https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_feather.html

* バイナリFeatherファイル(列志向)で出力

7)to_hdf

https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_hdf.html

* HDF5ファイル
(Hierarchical Data Format=階層的データ形式。バージョン5)で出力

8)to_stata

https://pandas.pydata.org/pandas-docs/version/1.2.0/reference/api/pandas.DataFrame.to_stata.html

* Stata dtaファイル(***.dta)で出力

9)to_html

https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_html.html

* HTMLファイルで出力

出力例

<table border="1" class="dataframe">
  <thead>
    <tr style="text-align: right;">
      <th></th>
      <th>item1</th>
      <th>item2</th>
      <th>item3</th>
    </tr>
  </thead>
  <tbody>
    <tr>
      <th>0</th>
      <td>1</td>
      <td>2</td>
      <td>3</td>
    </tr>
    <tr>
      <th>1</th>
      <td>4</td>
      <td>5</td>
      <td>6</td>
    </tr>
    <tr>
      <th>2</th>
      <td>7</td>
      <td>8</td>
      <td>9</td>
    </tr>
  </tbody>
</table>

【2】その他

1)to_dict

https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_dict.html

* 辞書に変換

圧縮形式 / orient

* orient = ‘dict’, ‘list’, ‘series’, ‘split’, ‘records’, ‘index’
 => とりあえず、「orient=‘records’」を押さえておけば大丈夫そう

サンプル

import pandas as pd

list = [
  {'item1': 1, 'item2': 2, 'item3': 3},
  {'item1': 4, 'item2': 5, 'item3': 6},
  {'item1': 7, 'item2': 8, 'item3': 9}
]
df = pd.DataFrame(list)

print('*********')
# {'item1': {0: 1, 1: 4, 2: 7}, 'item2': {0: 2, 1: 5, 2: 8}, 'item3': {0: 3, 1: 6, 2: 9}}
print(df.to_dict(orient='dict'))

print('*********')
# {'item1': [1, 4, 7], 'item2': [2, 5, 8], 'item3': [3, 6, 9]}
print(df.to_dict(orient='list'))

print('*********')
# {'item1': 0    1
# 1    4
# 2    7
# Name: item1, dtype: int64, 'item2': 0    2
# 1    5
# 2    8
# Name: item2, dtype: int64, 'item3': 0    3
# 1    6
# 2    9
# Name: item3, dtype: int64}
print(df.to_dict(orient='series'))

print('*********')
# {'index': [0, 1, 2], 'columns': ['item1', 'item2', 'item3'], 'data': [[1, 2, 3], [4, 5, 6], [7, 8, 9]]}
print(df.to_dict(orient='split'))

print('*********')
# [{'item1': 1, 'item2': 2, 'item3': 3}, {'item1': 4, 'item2': 5, 'item3': 6}, {'item1': 7, 'item2': 8, 'item3': 9}]
print(df.to_dict(orient='records'))

print('*********')
# {0: {'item1': 1, 'item2': 2, 'item3': 3}, 1: {'item1': 4, 'item2': 5, 'item3': 6}, 2: {'item1': 7, 'item2': 8, 'item3': 9}}
print(df.to_dict(orient='index'))

2)to_json

https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_json.html

* JSON形式に変換
 => サンプルなどは、以下の関連記事を参照のこと。

Pandas ~ 基本編 / JSON編 ~
https://dk521123.hatenablog.com/entry/2022/02/16/000000

使用上の注意

df.to_json の戻り値は、文字列なので、
for などでループさせるには
以下のようにする必要がある

json_data = json.loads(json_str)

3)to_numpy

https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_numpy.html

* Numpy 配列に変換

参考文献
https://note.nkmk.me/python-pandas-numpy-conversion/

4)to_sql

https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_sql.html

* DBへの書き込み

参考文献
https://www.haya-programming.com/entry/2019/05/03/043334

5)to_gbq

https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_gbq.html

* BigQueryへの書き込み

参考文献
https://qiita.com/i_am_miko/items/68cb516ad2be61d59554

参考文献

https://blog.imind.jp/entry/2019/04/12/224942
https://blog.amedama.jp/entry/2018/07/11/081050

関連記事

Pandas ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2019/10/22/014957
Pandas ~ 基本編 ~
https://dk521123.hatenablog.com/entry/2020/10/14/000000
Pandas ~ 基本編 / CSV編 ~
https://dk521123.hatenablog.com/entry/2020/11/17/000000
Pandas ~ 基本編 / JSON編 ~
https://dk521123.hatenablog.com/entry/2022/02/16/000000
Pandas ~ 基本編 / Excel編 ~
https://dk521123.hatenablog.com/entry/2020/11/18/000000
Pandas ~ 基本編 / Excel => CSVに変換 ~
https://dk521123.hatenablog.com/entry/2021/01/25/000000
Pandas ~ データ集計編 ~
https://dk521123.hatenablog.com/entry/2021/04/07/105858
Pandas ~ 基本編 / データのクレンジング ~
https://dk521123.hatenablog.com/entry/2020/04/06/235555
Pandas の環境設定でのトラブル
https://dk521123.hatenablog.com/entry/2021/03/19/000000
NumPy ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2018/03/28/224532
Parquet ファイル
https://dk521123.hatenablog.com/entry/2020/06/03/000000
Python で Parquet を扱う
https://dk521123.hatenablog.com/entry/2021/11/13/095519