■ はじめに
PySpark の UDF (User Defined Function) っての学ぶ。
目次
【1】UDF とは 【2】UDF定義方法 1)udf関数から取り込む 2)デコレータを利用する方法 3)spark.udf.register() で登録する 【3】使用上の注意 【4】サンプル なお、以下の関連記事でもUDFを実際に使用している。
PySparkで入力ファイル名を取得するには
https://dk521123.hatenablog.com/entry/2021/04/12/145133
【1】UDF とは
* UDF = User Defined Function(ユーザー定義関数) * ユーザが定義した関数を使って Spark クラスタで分散処理をするための機能
【2】UDF定義方法
1)udf関数から取り込む 2)デコレータを利用する方法 3)spark.udf.register() で登録する ⇒ 実際のコードは、以下の関連記事を参照のこと
PySpark ~ UDFの各定義方法でのサンプル ~
https://dk521123.hatenablog.com/entry/2021/05/27/100132
⇒ デコレータの詳細は、以下の関連記事を参照のこと。
Python ~ 基本編 / デコレータ @xxxx ~
https://dk521123.hatenablog.com/entry/2020/05/19/000000
【3】使用上の注意
* 以下の関連記事を参照のこと。
https://dk521123.hatenablog.com/entry/2021/05/20/095706
【4】サンプル
* 下記以外のサンプルは、以下の関連記事を参照のこと。
PySpark ~ UDFの各定義方法でのサンプル ~
https://dk521123.hatenablog.com/entry/2021/05/27/100132
例:UDFを使って、Data frameに項目を追加する
from pyspark import SparkContext from pyspark.sql import SparkSession from pyspark.sql.types import StructType from pyspark.sql.types import StructField from pyspark.sql.types import StringType from pyspark.sql.types import IntegerType from pyspark.sql.types import DoubleType from pyspark.sql.types import BooleanType from pyspark.sql.functions import udf from pyspark.sql.functions import col spark_context = SparkContext() spark = SparkSession(spark_context) # UDF - lambda を使用 is_adult_udf = udf( lambda age: True if age >= 20 else False, BooleanType()) to_usd_udf = udf( lambda salary: salary * 0.0092, DoubleType()) rdd = spark_context.parallelize([ (1, 'Mike', 18, 320000, 'Sales'), (2, 'Tom', 19, 200000, 'IT'), (3, 'Sam', 23, 320000, 'Sales'), (4, 'Kevin', 32, 300000, 'Human resources'), (5, 'Bob', 45, 460000, 'IT'), (6, 'Alice', 20, 230000, 'Banking'), (7, 'Carol', 54, 500000, 'IT'), ]) schema = StructType([ StructField('id', IntegerType(), False), StructField('name', StringType(), False), StructField('age', IntegerType(), False), StructField('salary', IntegerType(), False), StructField('job', StringType(), False), ]) data_frame = spark.createDataFrame(rdd, schema) # 項目追加 data_frame = data_frame \ .withColumn('is_adult', is_adult_udf(col("age"))) \ .withColumn('salary_usd', to_usd_udf(data_frame.salary)) data_frame.show()
出力結果
+---+-----+---+------+---------------+--------+----------+ | id| name|age|salary| job|is_adult|salary_usd| +---+-----+---+------+---------------+--------+----------+ | 1| Mike| 18|320000| Sales| false| 2944.0| | 2| Tom| 19|200000| IT| false| 1840.0| | 3| Sam| 23|320000| Sales| true| 2944.0| | 4|Kevin| 32|300000|Human resources| true| 2760.0| | 5| Bob| 45|460000| IT| true| 4232.0| | 6|Alice| 20|230000| Banking| true| 2116.0| | 7|Carol| 54|500000| IT| true| 4600.0| +---+-----+---+------+---------------+--------+----------+
参考文献
https://blog.amedama.jp/entry/2018/01/31/210755
https://qiita.com/takugenn/items/eb725f6bfa0bc38b5d79
関連記事
PySpark ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2019/11/14/221126
PySpark ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/04/03/004254
PySpark ~ UDFの各定義方法でのサンプル ~
https://dk521123.hatenablog.com/entry/2021/05/27/100132
PySpark ~ UDF の使用上の注意 ~
https://dk521123.hatenablog.com/entry/2021/05/20/095706
PySpark ~ RDD / 基本編 ~
https://dk521123.hatenablog.com/entry/2021/04/04/111057
PySpark ~ RDD / あれこれ編 ~
https://dk521123.hatenablog.com/entry/2021/04/06/001709
PySpark ~ DataFrame / データ操作編 ~
https://dk521123.hatenablog.com/entry/2020/01/04/150942
PySpark ~ DataFrame / テーブル・項目操作編 ~
https://dk521123.hatenablog.com/entry/2020/05/18/154829
PySpark ~ CSV / 基本編 ~
https://dk521123.hatenablog.com/entry/2019/11/24/225534
PySpark ~ CSV / あれこれ編 ~
https://dk521123.hatenablog.com/entry/2020/07/30/195226
PySpark ~ CSV / escape 編 ~
https://dk521123.hatenablog.com/entry/2020/11/23/224349
PySpark ~ エスケープされた区切り文字が含んだデータを扱う ~
https://dk521123.hatenablog.com/entry/2020/07/09/000832
PySpark ~ Hive ~
https://dk521123.hatenablog.com/entry/2020/05/14/222415
PySpark ~ データをクリーニングする ~
https://dk521123.hatenablog.com/entry/2020/07/08/162552
PySparkで入力ファイル名を取得するには
https://dk521123.hatenablog.com/entry/2021/04/12/145133
PySpark ~ PySpark経由でDBに接続する ~
https://dk521123.hatenablog.com/entry/2020/07/02/000000
Glue から Redshift/PostgreSQL に接続する ~ Python shell編 ~
https://dk521123.hatenablog.com/entry/2020/08/26/193237
Glue から Redshift/PostgreSQL に接続する ~ PySpark編 ~
https://dk521123.hatenablog.com/entry/2020/09/23/111741
Python ~ 基本編 / デコレータ @xxxx ~
https://dk521123.hatenablog.com/entry/2020/05/19/000000