【分散処理】PySpark ~ ユーザ定義関数 UDF 編 ~

■ はじめに

PySpark の UDF (User Defined Function) っての学ぶ。

目次

【1】UDF とは
【2】UDF定義方法
 1)udf関数から取り込む
 2)デコレータを利用する方法
 3)spark.udf.register() で登録する
【3】使用上の注意
【4】サンプル

なお、以下の関連記事でもUDFを実際に使用している。

PySparkで入力ファイル名を取得するには
https://dk521123.hatenablog.com/entry/2021/04/12/145133

【1】UDF とは

* UDF = User Defined Function(ユーザー定義関数)
* ユーザが定義した関数を使って Spark クラスタで分散処理をするための機能

【2】UDF定義方法

1)udf関数から取り込む
2)デコレータを利用する方法
3)spark.udf.register() で登録する

⇒ 実際のコードは、以下の関連記事を参照のこと

PySpark ~ UDFの各定義方法でのサンプル ~
https://dk521123.hatenablog.com/entry/2021/05/27/100132

⇒ デコレータの詳細は、以下の関連記事を参照のこと。

Python ~ 基本編 / デコレータ @xxxx ~
https://dk521123.hatenablog.com/entry/2020/05/19/000000

【3】使用上の注意

* 以下の関連記事を参照のこと。

https://dk521123.hatenablog.com/entry/2021/05/20/095706

【4】サンプル

* 下記以外のサンプルは、以下の関連記事を参照のこと。

PySpark ~ UDFの各定義方法でのサンプル ~
https://dk521123.hatenablog.com/entry/2021/05/27/100132

例:UDFを使って、Data frameに項目を追加する

from pyspark import SparkContext
from pyspark.sql import SparkSession

from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
from pyspark.sql.types import DoubleType
from pyspark.sql.types import BooleanType
from pyspark.sql.functions import udf
from pyspark.sql.functions import col

spark_context = SparkContext()
spark = SparkSession(spark_context)

# UDF - lambda を使用
is_adult_udf = udf(
  lambda age: True if age >= 20 else False, BooleanType())
to_usd_udf = udf(
  lambda salary: salary * 0.0092, DoubleType())

rdd = spark_context.parallelize([
  (1, 'Mike', 18, 320000, 'Sales'),
  (2, 'Tom', 19, 200000, 'IT'),
  (3, 'Sam', 23, 320000, 'Sales'),
  (4, 'Kevin', 32, 300000, 'Human resources'),
  (5, 'Bob', 45, 460000, 'IT'),
  (6, 'Alice', 20, 230000, 'Banking'),
  (7, 'Carol', 54, 500000, 'IT'),
])
schema = StructType([
  StructField('id', IntegerType(), False),
  StructField('name', StringType(), False),
  StructField('age', IntegerType(), False),
  StructField('salary', IntegerType(), False),
  StructField('job', StringType(), False),
])
data_frame = spark.createDataFrame(rdd, schema)

# 項目追加
data_frame = data_frame \
  .withColumn('is_adult', is_adult_udf(col("age"))) \
  .withColumn('salary_usd', to_usd_udf(data_frame.salary))

data_frame.show()

出力結果

+---+-----+---+------+---------------+--------+----------+
| id| name|age|salary|            job|is_adult|salary_usd|
+---+-----+---+------+---------------+--------+----------+
|  1| Mike| 18|320000|          Sales|   false|    2944.0|
|  2|  Tom| 19|200000|             IT|   false|    1840.0|
|  3|  Sam| 23|320000|          Sales|    true|    2944.0|
|  4|Kevin| 32|300000|Human resources|    true|    2760.0|
|  5|  Bob| 45|460000|             IT|    true|    4232.0|
|  6|Alice| 20|230000|        Banking|    true|    2116.0|
|  7|Carol| 54|500000|             IT|    true|    4600.0|
+---+-----+---+------+---------------+--------+----------+

参考文献

https://blog.amedama.jp/entry/2018/01/31/210755
https://qiita.com/takugenn/items/eb725f6bfa0bc38b5d79

関連記事

PySpark ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2019/11/14/221126
PySpark ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/04/03/004254
PySpark ~ UDFの各定義方法でのサンプル ~
https://dk521123.hatenablog.com/entry/2021/05/27/100132
PySpark ~ UDF の使用上の注意 ~
https://dk521123.hatenablog.com/entry/2021/05/20/095706
PySpark ~ RDD / 基本編 ~
https://dk521123.hatenablog.com/entry/2021/04/04/111057
PySpark ~ RDD / あれこれ編 ~
https://dk521123.hatenablog.com/entry/2021/04/06/001709
PySpark ~ DataFrame / データ操作編 ~
https://dk521123.hatenablog.com/entry/2020/01/04/150942
PySpark ~ DataFrame / テーブル・項目操作編 ~
https://dk521123.hatenablog.com/entry/2020/05/18/154829
PySpark ~ CSV / 基本編 ~
https://dk521123.hatenablog.com/entry/2019/11/24/225534
PySpark ~ CSV / あれこれ編 ~
https://dk521123.hatenablog.com/entry/2020/07/30/195226
PySpark ~ CSV / escape 編 ~
https://dk521123.hatenablog.com/entry/2020/11/23/224349
PySpark ~ エスケープされた区切り文字が含んだデータを扱う ~
https://dk521123.hatenablog.com/entry/2020/07/09/000832
PySpark ~ Hive ~
https://dk521123.hatenablog.com/entry/2020/05/14/222415
PySpark ~ データをクリーニングする ~
https://dk521123.hatenablog.com/entry/2020/07/08/162552
PySparkで入力ファイル名を取得するには
https://dk521123.hatenablog.com/entry/2021/04/12/145133
PySpark ~ PySpark経由でDBに接続する ~
https://dk521123.hatenablog.com/entry/2020/07/02/000000
Glue から Redshift/PostgreSQL に接続する ~ Python shell編 ~
https://dk521123.hatenablog.com/entry/2020/08/26/193237
Glue から Redshift/PostgreSQL に接続する ~ PySpark編 ~
https://dk521123.hatenablog.com/entry/2020/09/23/111741
Python ~ 基本編 / デコレータ @xxxx ~
https://dk521123.hatenablog.com/entry/2020/05/19/000000