■ はじめに
https://dk521123.hatenablog.com/entry/2020/11/16/162114
の PySpark版。 小ネタだが、ファイル名のみを取得する場合(以下の 「【2】入力ファイルのみを取得するには」の「方法1:UDFで行う」)、 前に扱った UDF (User Defined Function)の復習になったので、 非常によかった。
目次
【1】フルパスを取得するには 【2】入力ファイルのみを取得するには 方法1:UDFで行う サンプル:udf関数から取り込む場合 サンプル:デコレータを利用する場合 方法2:Hiveで行う
【1】フルパスを取得するには
* pyspark.sql.functions.input_file_name() により取得可能
使用上の注意
input_file_name() って関数名だが、 フルパス(e.g. s3://xxx/xxxx/aaa.csv や file:///C:/xxxx/bbb.csv など) で返ってくることに注意!
サンプル
from pyspark import SparkContext from pyspark.sql import SparkSession from pyspark.sql.functions import input_file_name def main(): spark_context = SparkContext() spark = SparkSession(spark_context) data_frame = spark.read.csv('C:/hello/sample/', header=True) \ .withColumn('file_name', input_file_name()) data_frame.show() if __name__ == '__main__': main()
入力データ:C:/hello/sample/hello_001.csv
id,name,city,birth_date 1,Joe Bloggs,Dublin,2020-12-22 2,Joseph Smith,Salt Lake City,1982-09-11 3,Mike,hello,1991-01-01
出力結果
+---+------------+--------------+----------+--------------------+ | id| name| city|birth_date| file_name| +---+------------+--------------+----------+--------------------+ | 1| Joe Bloggs| Dublin|2020-12-22|file:///C:/hello/s...| | 2|Joseph Smith|Salt Lake City|1982-09-11|file:///C:/hello/s...| | 3| Mike| hello|1991-01-01|file:///C:/hello/s...| +---+------------+--------------+----------+--------------------+
【2】入力ファイルのみを取得するには
方法1:UDFで行う
https://www.javaer101.com/article/1052268.html
で行っているUDF (User Defined Function=ユーザー定義関数)を 使って、PySparkで実装する。 なお、UDFについては、以下の関連記事を参照のこと。
https://dk521123.hatenablog.com/entry/2020/05/20/195621
使用上の注意
『デコレータを利用する場合』は、 使用する関数はUDF以外では使用しない方がいい。 理由としては、返却される値が、 今回の場合、str(文字列)ではなく、Objectとして返ってくるため。 => これで、バグってしまった。 => 詳細は、以下の関連記事に記載したので、こちらを参照。
https://dk521123.hatenablog.com/entry/2020/05/20/195621
サンプル:udf関数から取り込む場合
from pyspark import SparkContext from pyspark.sql import SparkSession from pyspark.sql.types import StructType from pyspark.sql.types import StructField from pyspark.sql.types import StringType from pyspark.sql.functions import input_file_name # ★注目★ from pyspark.sql.functions import udf import os # ファイル名のみ取得する関数 def to_file_name(full_path): return os.path.basename(full_path) spark_context = SparkContext() spark = SparkSession(spark_context) df = spark.read.csv("test.csv", header=False) schema = StructType([ StructField('no', StringType(), True), StructField('name', StringType(), True), StructField('height', StringType(), True), StructField('birth_date', StringType(), True), ]) data_frame = spark.createDataFrame(df.rdd, schema) # ★注目★ # 関数 to_file_name を定義。その名前を get_file_name と命名。 get_file_name = udf(to_file_name, StringType()) data_frame = data_frame \ .withColumn('file_name', get_file_name(input_file_name())) data_frame.show()
test.csv
100,Mike,179.6,2018-03-20 10:41:20 101,Sam,167.9,2018-03-03 11:32:34 102,Kevin,189.2,2018-01-28 20:20:11 103,Mike,179.6,2018-03-20 10:41:20 104,Sam,167.9,2018-03-03 11:32:34 105,Kevin,189.2,2018-01-28 20:20:11 106,Mike,179.6,2018-03-20 10:41:20 107,Sam,167.9,2018-03-03 11:32:34 108,Kevin,189.2,2018-01-28 20:20:11
出力結果
+---+-----+------+-------------------+---------+ | no| name|height| birth_date|file_name| +---+-----+------+-------------------+---------+ |100| Mike| 179.6|2018-03-20 10:41:20| test.csv| |101| Sam| 167.9|2018-03-03 11:32:34| test.csv| |102|Kevin| 189.2|2018-01-28 20:20:11| test.csv| |103| Mike| 179.6|2018-03-20 10:41:20| test.csv| |104| Sam| 167.9|2018-03-03 11:32:34| test.csv| |105|Kevin| 189.2|2018-01-28 20:20:11| test.csv| |106| Mike| 179.6|2018-03-20 10:41:20| test.csv| |107| Sam| 167.9|2018-03-03 11:32:34| test.csv| |108|Kevin| 189.2|2018-01-28 20:20:11| test.csv| +---+-----+------+-------------------+---------+
サンプル:デコレータを利用する場合
from pyspark import SparkContext from pyspark.sql import SparkSession from pyspark.sql.types import StructType from pyspark.sql.types import StructField from pyspark.sql.types import StringType from pyspark.sql.functions import input_file_name from pyspark.sql.functions import udf import os # ファイル名のみ取得する関数 @udf(returnType=StringType()) def to_file_name(full_path): return os.path.basename(full_path) spark_context = SparkContext() spark = SparkSession(spark_context) df = spark.read.csv("test.csv", header=False) schema = StructType([ StructField('no', StringType(), True), StructField('name', StringType(), True), StructField('height', StringType(), True), StructField('birth_date', StringType(), True), ]) data_frame = spark.createDataFrame(df.rdd, schema) # ★注目★ data_frame = data_frame \ .withColumn('file_name', to_file_name(input_file_name())) # 出力結果は、上のサンプルと同じだから省略 data_frame.show()
方法2:Hiveで行う
サンプル
from pyspark import SparkContext from pyspark.sql import SparkSession from pyspark.sql import functions def main(): spark_context = SparkContext() spark = SparkSession(spark_context) data_frame = spark.read.csv('C:/work/hello/sample/', header=True) \ .withColumn('file_name', functions.input_file_name()) data_frame.createOrReplaceTempView("person") data_frame = spark.sql("""SELECT id AS id ,name AS name ,city AS city ,birth_date AS birth_date ,REVERSE(SPLIT(REVERSE(file_name), '/')[0]) AS file_name ,REVERSE(SUBSTRING(SPLIT(REVERSE(file_name), '/')[0], 5, 3)) AS file_no FROM person """) data_frame.show() if __name__ == '__main__': main()
出力結果
+---+------------+--------------+----------+-------------+-------+ | id| name| city|birth_date| file_name|file_no| +---+------------+--------------+----------+-------------+-------+ | 1| Joe Bloggs| Dublin|2020-12-22|hello_001.csv| 001| | 2|Joseph Smith|Salt Lake City|1982-09-11|hello_001.csv| 001| | 3| Mike| hello|1991-01-01|hello_001.csv| 001| +---+------------+--------------+----------+-------------+-------+
参考文献
https://dev.classmethod.jp/articles/spark-input-file/
https://qiita.com/gsy0911/items/a4cb8b2d54d6341558e0
関連記事
Hiveクエリで入力ファイル名を取得するには
https://dk521123.hatenablog.com/entry/2020/11/16/162114
PySpark ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2019/11/14/221126
PySpark ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/04/03/004254
PySpark ~ RDD / 基本編 ~
https://dk521123.hatenablog.com/entry/2021/04/04/111057
PySpark ~ RDD / あれこれ編 ~
https://dk521123.hatenablog.com/entry/2021/04/06/001709
PySpark ~ ユーザ定義関数 UDF 編 ~
https://dk521123.hatenablog.com/entry/2020/05/20/195621
PySpark ~ UDFの各定義方法でのサンプル ~
https://dk521123.hatenablog.com/entry/2021/05/27/100132
Python ~ 基本編 / 正規表現 ~
https://dk521123.hatenablog.com/entry/2019/09/01/000000