■ Introduction
https://dk521123.hatenablog.com/entry/2020/01/04/150942
This is a continuation of the article above. This time, we cover how to aggregate table data.
Table of Contents
【0】agg (aggregation)
【1】min/max (minimum/maximum)
【2】count
【3】countDistinct (distinct count)

Besides these, there are also sum (total), avg (average), and others.
【0】agg (aggregation)
* agg = aggregate
* Returns the result of aggregate functions such as min, max, and sum
API specification
https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.GroupedData.agg.html
Sample
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
from pyspark.sql import functions as F

spark_context = SparkContext()
spark = SparkSession(spark_context)

rdd = spark_context.parallelize([
  ('1999-02-01', 'YouTube', 1),
  ('1999-02-01', 'Google', 2),
  ('1999-02-02', 'YouTube', 2),
  ('1999-02-02', 'Google', 3),
  ('1999-02-03', 'YouTube', 4),
  ('1999-02-03', 'Google', 5),
  ('1999-02-04', 'YouTube', 12),
  ('1999-02-04', 'Google', 31),
])
schema = StructType([
  StructField('date', StringType(), False),
  StructField('access_site', StringType(), False),
  StructField('access_count', IntegerType(), False),
])
data_frame = spark.createDataFrame(rdd, schema)
# data_frame.show()

# ★Note this part★
data_frame.groupBy("access_site") \
  .agg(F.sum(data_frame.access_count)) \
  .show()

print("Done")
spark.stop()
Output
+-----------+-----------------+
|access_site|sum(access_count)|
+-----------+-----------------+
|    YouTube|               19|
|     Google|               41|
+-----------+-----------------+
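Supplementary note: agg() can also take several aggregate functions at once, including the sum / avg mentioned in the table of contents. Below is a minimal sketch assuming the same data_frame as the sample above; the alias names (min_count, etc.) are just for illustration.

from pyspark.sql import functions as F

# Multiple aggregations in a single agg() call
# (assumes data_frame from the sample above)
data_frame.groupBy("access_site") \
  .agg(
    F.min("access_count").alias("min_count"),
    F.max("access_count").alias("max_count"),
    F.avg("access_count").alias("avg_count"),
    F.sum("access_count").alias("sum_count"),
  ) \
  .show()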
【1】min/max (minimum/maximum)
Sample
from pyspark.sql import SparkSession
from pyspark.sql.functions import min, max

spark = SparkSession \
  .builder \
  .appName("Demo") \
  .getOrCreate()

df = spark.read.csv(
  "./hello_world.csv", header=True)

df_min = df.groupBy('job') \
  .agg(min('birth_date').alias('min_birth'))
df_min.show()

df_max = df.groupBy('job') \
  .agg(max('birth_date').alias('max_birth'))
df_max.show()
Input file: hello_world.csv
id,name,job,birth_date
x0001,Mike,Sales,1991-01-01
x0002,Tom,IT,1971-11-21
x0003,Sam,Sales,1971-08-15
x0004,Kevin,Human resources,1971-04-08
x0005,Bob,,2000-09-09
x0006,Alice,Banking,1981-06-29
x0007,Carol,,1956-10-12
x0008,Tom,Banking,1987-06-27
x0009,Mike,IT,1967-10-12
x0010,Bob,,1989-12-31
Output
+---------------+----------+
|            job| min_birth|
+---------------+----------+
|          Sales|1971-08-15|
|           null|1956-10-12|
|             IT|1967-10-12|
|Human resources|1971-04-08|
|        Banking|1981-06-29|
+---------------+----------+

+---------------+----------+
|            job| max_birth|
+---------------+----------+
|          Sales|1991-01-01|
|           null|2000-09-09|
|             IT|1971-11-21|
|Human resources|1971-04-08|
|        Banking|1987-06-27|
+---------------+----------+
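Note: the same result can also be obtained in a single pass by passing both min and max to one agg() call. A minimal sketch assuming the same df as above (the aliases are just for illustration):

from pyspark.sql.functions import min, max

# min and max in one groupBy/agg call
# (assumes df read from hello_world.csv as above)
df_min_max = df.groupBy('job') \
  .agg(min('birth_date').alias('min_birth'),
       max('birth_date').alias('max_birth'))
df_min_max.show()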
【2】count
Sample
from pyspark.sql import SparkSession

spark = SparkSession \
  .builder \
  .appName("Demo") \
  .getOrCreate()

df = spark.read.csv(
  "./hello_world.csv", header=True)

# Count
print(f"Row count = {df.count()}")

# Null count (filter + count)
df_null = df.filter(df["job"].isNull())
print(f"null count = {df_null.count()}")

# Count each job (groupby + count)
df_job = df.groupby('job') \
  .count() \
  .sort("count", ascending=False)
df_job.show()
Output
Row count = 10
null count = 3

+---------------+-----+
|            job|count|
+---------------+-----+
|           null|    3|
|          Sales|    2|
|             IT|    2|
|        Banking|    2|
|Human resources|    1|
+---------------+-----+
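Related note: inside agg(), F.count(column) counts only non-null values, whereas count('*') (like DataFrame.count()) counts all rows. A minimal sketch assuming the same df as above; the aliases are just for illustration:

from pyspark.sql import functions as F

# count('*') counts all rows, count('job') counts only non-null job values
# (assumes df read from hello_world.csv as above)
df.agg(
  F.count('*').alias('total_rows'),
  F.count('job').alias('non_null_jobs'),
).show()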
【3】countDistinct (distinct count)
* Because the SQL syntax SELECT COUNT(DISTINCT xxx) cannot be written directly against a DataFrame, a dedicated function is provided instead
API specification
https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.countDistinct.html
Sample
from pyspark.sql.functions import countDistinct

# df: the DataFrame read from hello_world.csv in the samples above

# Job count
df_job_count = df.select(countDistinct('job').alias('job_count'))
df_job_count.show()
Output
+---------+
|job_count|
+---------+
|        4|
+---------+
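Supplementary note: countDistinct also accepts multiple columns (counting distinct combinations), and pyspark.sql.functions additionally provides approx_count_distinct as an approximate, faster alternative for large data. A minimal sketch assuming the same df as above (the aliases are just for illustration):

from pyspark.sql.functions import countDistinct, approx_count_distinct

# Distinct (name, job) combinations
# (assumes df read from hello_world.csv as above)
df.select(countDistinct('name', 'job').alias('name_job_count')).show()

# Approximate distinct count of job (useful for large data)
df.select(approx_count_distinct('job').alias('approx_job_count')).show()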
Related articles
PySpark ~ Environment Setup ~
https://dk521123.hatenablog.com/entry/2019/11/14/221126
PySpark ~ Introduction ~
https://dk521123.hatenablog.com/entry/2021/04/03/004254
PySpark ~ DataFrame / Data Manipulation ~
https://dk521123.hatenablog.com/entry/2020/01/04/150942
PySpark ~ DataFrame / Table & Column Operations ~
https://dk521123.hatenablog.com/entry/2020/05/18/154829
PySpark ~ DataFrame / How to Get the Number of Columns ~
https://dk521123.hatenablog.com/entry/2020/08/28/183706
PySpark ~ DataFrame / show() ~
https://dk521123.hatenablog.com/entry/2021/04/26/161342
PySpark ~ Getting Databases, Tables, and Columns ~
https://dk521123.hatenablog.com/entry/2021/05/24/144317
Hive / HiveQL ~ Date and Time Operations ~
https://dk521123.hatenablog.com/entry/2021/02/11/233633