【Distributed Processing】PySpark ~ DataFrame / Data Aggregation ~

■ Introduction

https://dk521123.hatenablog.com/entry/2020/01/04/150942

This article is a continuation of the one above.

This time, the topic is aggregating table data.

Contents

【0】agg (Aggregation)
【1】min / max (Minimum / Maximum)
【2】count
【3】countDistinct (Distinct Count)

There are other aggregate functions as well, such as sum (total) and avg (average); a combined example is shown at the end of 【0】.

【0】agg (Aggregation)

* agg is short for "aggregate"
* Returns the result of aggregate functions such as min, max, and sum

API reference
https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.GroupedData.agg.html

Sample

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
from pyspark.sql import functions as F

spark_context = SparkContext()
spark = SparkSession(spark_context)

rdd = spark_context.parallelize([
  ('1999-02-01', 'YouTube', 1),
  ('1999-02-01', 'Google', 2),
  ('1999-02-02', 'YouTube', 2),
  ('1999-02-02', 'Google', 3),
  ('1999-02-03', 'YouTube', 4),
  ('1999-02-03', 'Google', 5),
  ('1999-02-04', 'YouTube', 12),
  ('1999-02-04', 'Google', 31),
])

schema = StructType([
  StructField('date', StringType(), False),
  StructField('access_site', StringType(), False),
  StructField('access_count', IntegerType(), False),
])
data_frame = spark.createDataFrame(rdd, schema)
# data_frame.show()

# ★ Key point: group by access_site, then aggregate with sum ★
data_frame.groupBy("access_site") \
  .agg(F.sum(data_frame.access_count)) \
  .show()

print("Done")
spark.stop()

Output

+-----------+-----------------+
|access_site|sum(access_count)|
+-----------+-----------------+
|    YouTube|               19|
|     Google|               41|
+-----------+-----------------+
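
As noted in the contents above, agg() is not limited to sum; it accepts several aggregate functions at once. A minimal sketch, assuming the data_frame and the functions import (F) from the sample above:

# Multiple aggregations in a single agg() call
data_frame.groupBy("access_site") \
  .agg(
    F.sum("access_count").alias("total"),
    F.avg("access_count").alias("average"),
    F.min("access_count").alias("min_count"),
    F.max("access_count").alias("max_count")) \
  .show()

This produces one row per access_site containing the total, average, minimum, and maximum access_count.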

【1】min / max (Minimum / Maximum)

Sample

from pyspark.sql import SparkSession
from pyspark.sql.functions import min, max

spark = SparkSession \
  .builder \
  .appName("Demo") \
  .getOrCreate()

# Read the CSV with a header row (all columns are read as strings)
df = spark.read.csv(
  "./hello_world.csv", header=True)

# Earliest birth_date per job
# (string comparison works here because the dates are in yyyy-MM-dd format)
df_min = df.groupBy('job') \
  .agg(min('birth_date').alias('min_birth'))
df_min.show()

# Latest birth_date per job
df_max = df.groupBy('job') \
  .agg(max('birth_date').alias('max_birth'))
df_max.show()

Input file: hello_world.csv

id,name,job,birth_date
x0001,Mike,Sales,1991-01-01
x0002,Tom,IT,1971-11-21
x0003,Sam,Sales,1971-08-15
x0004,Kevin,Human resources,1971-04-08
x0005,Bob,,2000-09-09
x0006,Alice,Banking,1981-06-29
x0007,Carol,,1956-10-12
x0008,Tom,Banking,1987-06-27
x0009,Mike,IT,1967-10-12
x0010,Bob,,1989-12-31

Output

+---------------+----------+
|            job| min_birth|
+---------------+----------+
|          Sales|1971-08-15|
|           null|1956-10-12|
|             IT|1967-10-12|
|Human resources|1971-04-08|
|        Banking|1981-06-29|
+---------------+----------+

+---------------+----------+
|            job| max_birth|
+---------------+----------+
|          Sales|1991-01-01|
|           null|2000-09-09|
|             IT|1971-11-21|
|Human resources|1971-04-08|
|        Banking|1987-06-27|
+---------------+----------+
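
If only the overall minimum and maximum are needed (without grouping), the same functions can be used directly in select(). A minimal sketch, continuing from the sample above:

# Overall min/max across the whole DataFrame
df.select(
  min('birth_date').alias('min_birth'),
  max('birth_date').alias('max_birth')) \
  .show()

This returns a single row containing the earliest and latest birth_date in the file.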

【2】count

Sample

from pyspark.sql import SparkSession

spark = SparkSession \
  .builder \
  .appName("Demo") \
  .getOrCreate()

df = spark.read.csv(
  "./hello_world.csv", header=True)

# Count
print(f"Row count = {df.count()}")

# Null count (filter + count)
df_null = df.filter(df["job"].isNull())
print(f"null count = {df_null.count()}")

# Count rows per job (groupBy + count)
df_job = df.groupBy('job') \
  .count() \
  .sort("count", ascending=False)
df_job.show()

Output

Row count = 10
null count = 3
+---------------+-----+
|            job|count|
+---------------+-----+
|           null|    3|
|          Sales|    2|
|             IT|    2|
|        Banking|    2|
|Human resources|    1|
+---------------+-----+
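
As a variation on the null check above, the null count for every column can be obtained in one pass. A minimal sketch, assuming the df from the sample above; it relies on the fact that count() ignores null values:

from pyspark.sql import functions as F

# Null count per column in a single pass
df.select([
  F.count(F.when(F.col(column).isNull(), column)).alias(column)
  for column in df.columns]) \
  .show()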

【3】countDistinct (Distinct Count)

* The SELECT COUNT(DISTINCT xxx) syntax cannot be used directly
 with the DataFrame API, so a dedicated function is provided instead

API reference
https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.countDistinct.html

Sample (continuing with the df from 【2】)

from pyspark.sql.functions import countDistinct

# Job count
df_job_count = df.select(countDistinct('job').alias('job_count'))
df_job_count.show()

Output

+---------+
|job_count|
+---------+
|        4|
+---------+
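
For comparison, COUNT(DISTINCT ...) written as SQL also works if the DataFrame is first registered as a temporary view. A minimal sketch, assuming the df and spark from 【2】; the view name demo_table is just for illustration:

# SQL equivalent via a temporary view
df.createOrReplaceTempView("demo_table")
spark.sql(
  "SELECT COUNT(DISTINCT job) AS job_count FROM demo_table").show()

Like countDistinct, COUNT(DISTINCT job) ignores null values, so this should return the same result as the sample above.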


Related articles

PySpark ~ Environment Setup ~
https://dk521123.hatenablog.com/entry/2019/11/14/221126
PySpark ~ Introduction ~
https://dk521123.hatenablog.com/entry/2021/04/03/004254
PySpark ~ DataFrame / Data Operations ~
https://dk521123.hatenablog.com/entry/2020/01/04/150942
PySpark ~ DataFrame / Table & Column Operations ~
https://dk521123.hatenablog.com/entry/2020/05/18/154829
PySpark ~ DataFrame / Getting the Number of Columns ~
https://dk521123.hatenablog.com/entry/2020/08/28/183706
PySpark ~ DataFrame / show() ~
https://dk521123.hatenablog.com/entry/2021/04/26/161342
PySpark ~ Getting Databases, Tables, and Columns ~
https://dk521123.hatenablog.com/entry/2021/05/24/144317
Hive / HiveQL ~ Date and Time Operations ~
https://dk521123.hatenablog.com/entry/2021/02/11/233633