【PySpark】PySparkで入力ファイル名を取得するには

■ はじめに

https://dk521123.hatenablog.com/entry/2020/11/16/162114

の PySpark版。

 小ネタだが、ファイル名のみを取得する場合(以下の
「【2】入力ファイルのみを取得するには」の「方法1:UDFで行う」)、
前に扱った UDF (User Defined Function)の復習になったので、
非常によかった。

目次

【1】フルパスを取得するには
【2】入力ファイルのみを取得するには
 方法1:UDFで行う
  サンプル:udf関数から取り込む場合
  サンプル:デコレータを利用する場合
 方法2:Hiveで行う

【1】フルパスを取得するには

* pyspark.sql.functions.input_file_name() により取得可能

https://spark.apache.org/docs/latest/api/python//reference/api/pyspark.sql.functions.input_file_name.html

使用上の注意

input_file_name() って関数名だが、
フルパス(e.g. s3://xxx/xxxx/aaa.csv や file:///C:/xxxx/bbb.csv など)
で返ってくることに注意!

サンプル

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import input_file_name

def main():
  spark_context = SparkContext()
  spark = SparkSession(spark_context)

  data_frame = spark.read.csv('C:/hello/sample/', header=True) \
    .withColumn('file_name', input_file_name())
  data_frame.show()


if __name__ == '__main__':
  main()

入力データ:C:/hello/sample/hello_001.csv

id,name,city,birth_date
1,Joe Bloggs,Dublin,2020-12-22
2,Joseph Smith,Salt Lake City,1982-09-11
3,Mike,hello,1991-01-01

出力結果

+---+------------+--------------+----------+--------------------+
| id|        name|          city|birth_date|           file_name|
+---+------------+--------------+----------+--------------------+
|  1|  Joe Bloggs|        Dublin|2020-12-22|file:///C:/hello/s...|
|  2|Joseph Smith|Salt Lake City|1982-09-11|file:///C:/hello/s...|
|  3|        Mike|         hello|1991-01-01|file:///C:/hello/s...|
+---+------------+--------------+----------+--------------------+

【2】入力ファイルのみを取得するには

方法1:UDFで行う

https://www.javaer101.com/article/1052268.html

で行っているUDF (User Defined Function=ユーザー定義関数)を
使って、PySparkで実装する。
なお、UDFについては、以下の関連記事を参照のこと。

https://dk521123.hatenablog.com/entry/2020/05/20/195621
使用上の注意

 『デコレータを利用する場合』は、
使用する関数はUDF以外では使用しない方がいい。

 理由としては、返却される値が、
今回の場合、str(文字列)ではなく、Objectとして返ってくるため。
 => これで、バグってしまった。
 => 詳細は、以下の関連記事に記載したので、こちらを参照。

https://dk521123.hatenablog.com/entry/2020/05/20/195621

サンプル:udf関数から取り込む場合

from pyspark import SparkContext
from pyspark.sql import SparkSession

from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType
from pyspark.sql.functions import input_file_name
# ★注目★
from pyspark.sql.functions import udf

import os

# ファイル名のみ取得する関数
def to_file_name(full_path):
  return os.path.basename(full_path)

spark_context = SparkContext()
spark = SparkSession(spark_context)

df = spark.read.csv("test.csv", header=False)

schema = StructType([
  StructField('no', StringType(), True),
  StructField('name', StringType(), True),
  StructField('height', StringType(), True),
  StructField('birth_date', StringType(), True),
])
data_frame = spark.createDataFrame(df.rdd, schema)

# ★注目★
# 関数 to_file_name を定義。その名前を get_file_name と命名。
get_file_name = udf(to_file_name, StringType())
data_frame = data_frame \
  .withColumn('file_name', get_file_name(input_file_name()))

data_frame.show()

test.csv

100,Mike,179.6,2018-03-20 10:41:20
101,Sam,167.9,2018-03-03 11:32:34
102,Kevin,189.2,2018-01-28 20:20:11
103,Mike,179.6,2018-03-20 10:41:20
104,Sam,167.9,2018-03-03 11:32:34
105,Kevin,189.2,2018-01-28 20:20:11
106,Mike,179.6,2018-03-20 10:41:20
107,Sam,167.9,2018-03-03 11:32:34
108,Kevin,189.2,2018-01-28 20:20:11

出力結果

+---+-----+------+-------------------+---------+
| no| name|height|         birth_date|file_name|
+---+-----+------+-------------------+---------+
|100| Mike| 179.6|2018-03-20 10:41:20| test.csv|
|101|  Sam| 167.9|2018-03-03 11:32:34| test.csv|
|102|Kevin| 189.2|2018-01-28 20:20:11| test.csv|
|103| Mike| 179.6|2018-03-20 10:41:20| test.csv|
|104|  Sam| 167.9|2018-03-03 11:32:34| test.csv|
|105|Kevin| 189.2|2018-01-28 20:20:11| test.csv|
|106| Mike| 179.6|2018-03-20 10:41:20| test.csv|
|107|  Sam| 167.9|2018-03-03 11:32:34| test.csv|
|108|Kevin| 189.2|2018-01-28 20:20:11| test.csv|
+---+-----+------+-------------------+---------+

サンプル:デコレータを利用する場合

from pyspark import SparkContext
from pyspark.sql import SparkSession

from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType
from pyspark.sql.functions import input_file_name
from pyspark.sql.functions import udf

import os


# ファイル名のみ取得する関数
@udf(returnType=StringType())
def to_file_name(full_path):
  return os.path.basename(full_path)

spark_context = SparkContext()
spark = SparkSession(spark_context)

df = spark.read.csv("test.csv", header=False)

schema = StructType([
  StructField('no', StringType(), True),
  StructField('name', StringType(), True),
  StructField('height', StringType(), True),
  StructField('birth_date', StringType(), True),
])
data_frame = spark.createDataFrame(df.rdd, schema)

# ★注目★
data_frame = data_frame \
  .withColumn('file_name', to_file_name(input_file_name()))

# 出力結果は、上のサンプルと同じだから省略
data_frame.show()

方法2:Hiveで行う

サンプル

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import functions

def main():
  spark_context = SparkContext()
  spark = SparkSession(spark_context)

  data_frame = spark.read.csv('C:/work/hello/sample/', header=True) \
    .withColumn('file_name', functions.input_file_name())
  data_frame.createOrReplaceTempView("person")
  data_frame = spark.sql("""SELECT
    id AS id
    ,name AS name
    ,city AS city
    ,birth_date AS birth_date
    ,REVERSE(SPLIT(REVERSE(file_name), '/')[0]) AS file_name
    ,REVERSE(SUBSTRING(SPLIT(REVERSE(file_name), '/')[0], 5, 3)) AS file_no
  FROM person
  """)
  data_frame.show()


if __name__ == '__main__':
  main()

出力結果

+---+------------+--------------+----------+-------------+-------+
| id|        name|          city|birth_date|    file_name|file_no|
+---+------------+--------------+----------+-------------+-------+
|  1|  Joe Bloggs|        Dublin|2020-12-22|hello_001.csv|    001|
|  2|Joseph Smith|Salt Lake City|1982-09-11|hello_001.csv|    001|
|  3|        Mike|         hello|1991-01-01|hello_001.csv|    001|
+---+------------+--------------+----------+-------------+-------+

参考文献

https://dev.classmethod.jp/articles/spark-input-file/
https://qiita.com/gsy0911/items/a4cb8b2d54d6341558e0

関連記事

Hiveクエリで入力ファイル名を取得するには
https://dk521123.hatenablog.com/entry/2020/11/16/162114
PySpark ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2019/11/14/221126
PySpark ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/04/03/004254
PySpark ~ RDD / 基本編 ~
https://dk521123.hatenablog.com/entry/2021/04/04/111057
PySpark ~ RDD / あれこれ編 ~
https://dk521123.hatenablog.com/entry/2021/04/06/001709
PySpark ~ ユーザ定義関数 UDF 編 ~
https://dk521123.hatenablog.com/entry/2020/05/20/195621
PySpark ~ UDFの各定義方法でのサンプル ~
https://dk521123.hatenablog.com/entry/2021/05/27/100132
Python ~ 基本編 / 正規表現
https://dk521123.hatenablog.com/entry/2019/09/01/000000