【Distributed Processing】PySpark ~ DataFrame / Data Operations ~

■ Introduction

https://dk521123.hatenablog.com/entry/2019/11/14/221126
https://dk521123.hatenablog.com/entry/2019/11/24/225534

This entry is a continuation of the entries above.
Here, we learn the basic data operations on a PySpark DataFrame.

Contents

【0】collect (looping over rows)
【1】filter (filtering rows)
【2】groupby (grouping)
【3】distinct / dropDuplicates (removing duplicates)
【4】orderBy (sorting)
【5】sql (running SQL statements)

【0】collect (looping over rows)

* Returns all records as a list of Row (pyspark.sql.types.Row) objects
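
A Row supports attribute access, key access, and conversion to a dict. As a minimal sketch (assuming the data_frame built in the sample below):

for row in data_frame.collect():
  print(row.id)          # attribute access
  print(row['name'])     # key access
  print(row.asDict())    # convert the Row to a plain dict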

Sample

from pyspark import SparkContext
from pyspark.sql import SparkSession

from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType

def to_data_frame(spark_context, row):
  values = to_tuple(row)
  return spark_context.parallelize([values]).toDF()

def to_tuple(row):
  return tuple(row.asDict().values())

spark_context = SparkContext()
spark = SparkSession(spark_context)

rdd = spark_context.parallelize([
  (1, 'Mike', 45, 'Sales'),
  (2, 'Tom', 65, 'IT'),
  (3, 'Sam', 32, 'Sales'),
  (4, 'Kevin', 28, 'Human resources'),
  (5, 'Bob', 25, 'IT'),
  (6, 'Alice', 20, 'Banking'),
  (7, 'Carol', 30, 'IT'),
])
schema = StructType([
  StructField('id', IntegerType(), False),
  StructField('name', StringType(), False),
  StructField('age', IntegerType(), False),
  StructField('job', StringType(), False),
])
data_frame = spark.createDataFrame(rdd, schema)

data_frame_even = spark.createDataFrame([], schema)
data_frame_odd = spark.createDataFrame([], schema)
for row in data_frame.collect():
  row_df = to_data_frame(spark_context, row)
  if int(row.id) % 2 == 0:
    data_frame_even = data_frame_even.union(row_df)
  else:
    data_frame_odd = data_frame_odd.union(row_df)

data_frame_even.show()
print("++++++++")
data_frame_odd.show()

Output

+---+-----+---+---------------+
| id| name|age|            job|
+---+-----+---+---------------+
|  2|  Tom| 65|             IT|
|  4|Kevin| 28|Human resources|
|  6|Alice| 20|        Banking|
+---+-----+---+---------------+

++++++++
+---+-----+---+-----+
| id| name|age|  job|
+---+-----+---+-----+
|  1| Mike| 45|Sales|
|  3|  Sam| 32|Sales|
|  5|  Bob| 25|   IT|
|  7|Carol| 30|   IT|
+---+-----+---+-----+
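
Note that collect() brings every row back to the driver, so this loop-and-union style does not scale to large data. The same even/odd split can usually be done with filter alone; a minimal sketch using the same data_frame:

data_frame_even = data_frame.filter(data_frame.id % 2 == 0)
data_frame_odd = data_frame.filter(data_frame.id % 2 != 0)
data_frame_even.show()
data_frame_odd.show()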

References
https://fisproject.jp/2018/04/methods-of-the-pyspark-sql-dataframe-class/

【1】filter (filtering rows)

Sample

from pyspark import SparkContext
from pyspark.sql import SparkSession

from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType

spark_context = SparkContext()
spark = SparkSession(spark_context)

rdd = spark_context.parallelize([
  (1, 'Mike', 45, 'Sales'),
  (2, 'Tom', 65, 'IT'),
  (3, 'Sam', 32, 'Sales'),
  (4, 'Kevin', 28, 'Human resources'),
  (5, 'Bob', 25, 'IT'),
  (6, 'Alice', 20, 'Banking'),
  (7, 'Carol', 30, 'IT'),
])
schema = StructType([
  StructField('id', IntegerType(), False),
  StructField('name', StringType(), False),
  StructField('age', IntegerType(), False),
  StructField('job', StringType(), False),
])
data_frame = spark.createDataFrame(rdd, schema)

# filter - 1: condition built from Column expressions
data_frame.filter((data_frame.age >= 20) & (data_frame.age < 30)).show()

print("***********")

# filter - 2: condition given as a SQL string
data_frame.filter("id in (1, 2)").show()

Output

+---+-----+---+---------------+
| id| name|age|            job|
+---+-----+---+---------------+
|  4|Kevin| 28|Human resources|
|  5|  Bob| 25|             IT|
|  6|Alice| 20|        Banking|
+---+-----+---+---------------+

+---+----+---+-----+
| id|name|age|  job|
+---+----+---+-----+
|  1|Mike| 45|Sales|
|  2| Tom| 65|   IT|
+---+----+---+-----+
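
Besides the two styles above, filter conditions are often built with col(), and where() is simply an alias of filter(). A minimal sketch of a few common variants, using the same data_frame:

from pyspark.sql.functions import col

# where() is an alias of filter()
data_frame.where(col('job') == 'IT').show()

# Column helpers: isin / between / like
data_frame.filter(col('job').isin('IT', 'Sales')).show()
data_frame.filter(col('age').between(20, 29)).show()
data_frame.filter(col('name').like('M%')).show()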

【2】groupby (grouping)

Sample

# ... (snip: same DataFrame setup as above) ...

# groupby & count
# Note: groupby() alone returns a GroupedData object, which has no show();
# an aggregation such as count() is needed to get a DataFrame back
df_job = data_frame.groupby('job').count()
df_job.show()

Output

+---------------+-----+
|            job|count|
+---------------+-----+
|             IT|    3|
|          Sales|    2|
|Human resources|    1|
|        Banking|    1|
+---------------+-----+
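
To get more than one aggregate per group, combine groupby with agg(). A minimal sketch using the same data_frame:

from pyspark.sql import functions as F

# Count and average / maximum age per job
data_frame.groupby('job').agg(
  F.count('*').alias('count'),
  F.avg('age').alias('avg_age'),
  F.max('age').alias('max_age'),
).show()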

【3】distinct / dropDuplicates (removing duplicates)

1)distinct

Sample

rdd = spark_context.parallelize([
  (1, 'Mike', 32, 'Sales'),
  (2, 'Tom', 20, 'IT'),
  (3, 'Sam', 32, 'Sales'),
  (4, 'Kevin', 30, 'Human resources'),
  (5, 'Bob', 30, 'IT'),
  (6, 'Alice', 20, 'Banking'),
  (7, 'Carol', 30, 'IT'),
])

# ... (snip: same DataFrame setup as above) ...

# Remove duplicates (count the distinct age values)
count_for_age = data_frame.select("age").distinct().count()
print(count_for_age)

Output

3

2)dropDuplicates

* Returns a new DataFrame (a separate instance) with duplicate rows removed
* A subset of columns can also be specified

Sample

from pyspark import SparkContext
from pyspark.sql import SparkSession

spark_context = SparkContext()
spark = SparkSession(spark_context)

salary_data = [
  ("Mike", "Sales", 3000), \
  ("Tom", "Sales", 4600), \
  ("Robert", "Sales", 4100), \
  ("Maria", "Finance", 3000), \
  ("Mike", "Sales", 3000), \
  ("Scott", "Finance", 3300), \
  ("Ken", "Finance", 3900), \
  ("Jeff", "Marketing", 3000), \
  ("Kevin", "Marketing", 2000), \
  ("Sam", "Sales", 4100) \
]

columns= ["name", "department", "salary"]

data_frame = spark.createDataFrame(
  data=salary_data,
  schema=columns)
data_frame.printSchema()
data_frame.show(truncate=False)

# Distinct
distinct_df = data_frame.distinct()
print("Distinct count: " + str(distinct_df.count()))
distinct_df.show(truncate=False)

# Drop duplicates
drop_duplicate_df = data_frame.dropDuplicates()
print("Drop duplicates count: " + str(drop_duplicate_df.count()))
drop_duplicate_df.show(truncate=False)

# Drop duplicates on selected columns
drop_duplicate_df = data_frame.dropDuplicates(["department", "salary"])
print("Distinct count of department salary : " + str(drop_duplicate_df.count()))
drop_duplicate_df.show(truncate=False)

Output

root
 |-- name: string (nullable = true)      
 |-- department: string (nullable = true)
 |-- salary: long (nullable = true)      

+------+----------+------+
|name  |department|salary|
+------+----------+------+
|Mike  |Sales     |3000  |
|Tom   |Sales     |4600  |
|Robert|Sales     |4100  |
|Maria |Finance   |3000  |
|Mike  |Sales     |3000  |
|Scott |Finance   |3300  |
|Ken   |Finance   |3900  |
|Jeff  |Marketing |3000  |
|Kevin |Marketing |2000  |
|Sam   |Sales     |4100  |
+------+----------+------+

Distinct count: 9
+------+----------+------+
|name  |department|salary|
+------+----------+------+
|Mike  |Sales     |3000  |
|Kevin |Marketing |2000  |
|Tom   |Sales     |4600  |
|Scott |Finance   |3300  |
|Ken   |Finance   |3900  |
|Robert|Sales     |4100  |
|Jeff  |Marketing |3000  |
|Sam   |Sales     |4100  |
|Maria |Finance   |3000  |
+------+----------+------+

Drop duplicates count: 9
+------+----------+------+
|name  |department|salary|
+------+----------+------+
|Mike  |Sales     |3000  |
|Kevin |Marketing |2000  |
|Tom   |Sales     |4600  |
|Scott |Finance   |3300  |
|Ken   |Finance   |3900  |
|Robert|Sales     |4100  |
|Jeff  |Marketing |3000  |
|Sam   |Sales     |4100  |
|Maria |Finance   |3000  |
+------+----------+------+

Distinct count of department salary : 8 <- Sam's row is gone (his Sales / 4100 duplicates Robert's row)
+------+----------+------+
|name  |department|salary|
+------+----------+------+
|Tom   |Sales     |4600  |
|Robert|Sales     |4100  |
|Ken   |Finance   |3900  |
|Maria |Finance   |3000  |
|Scott |Finance   |3300  |
|Kevin |Marketing |2000  |
|Mike  |Sales     |3000  |
|Jeff  |Marketing |3000  |
+------+----------+------+
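
dropDuplicates with a column subset keeps an arbitrary row from each duplicate group. When it matters which row survives (e.g. keep the highest-paid person per department), a common pattern is row_number() over a Window; a minimal sketch of that pattern (the window / column names here are just for illustration):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Keep only the highest-salary row per department (ties broken by name)
window = Window.partitionBy('department').orderBy(F.desc('salary'), 'name')
deduplicated_df = (
  data_frame
  .withColumn('row_number', F.row_number().over(window))
  .filter(F.col('row_number') == 1)
  .drop('row_number')
)
deduplicated_df.show(truncate=False)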

【4】orderBy (sorting)

Sample

from pyspark.sql.functions import col

# Sort by age (ascending), then by id (descending)
data_frame.orderBy(col('age').asc(), col('id').desc()).show()

Output

+---+-----+---+---------------+
| id| name|age|            job|
+---+-----+---+---------------+
|  6|Alice| 20|        Banking|
|  5|  Bob| 25|             IT|
|  4|Kevin| 28|Human resources|
|  7|Carol| 30|             IT|
|  3|  Sam| 32|          Sales|
|  1| Mike| 45|          Sales|
|  2|  Tom| 65|             IT|
+---+-----+---+---------------+
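
orderBy also accepts plain column names together with an ascending list, and the direction helpers have NULL-aware variants such as asc_nulls_last(). A minimal sketch using the same data_frame (the sample columns are declared non-nullable, so the NULL handling only matters for nullable data):

from pyspark.sql.functions import col

# Same ordering as above, written with column names and an ascending list
data_frame.orderBy(['age', 'id'], ascending=[True, False]).show()

# Put NULLs last when sorting a nullable column
data_frame.orderBy(col('age').asc_nulls_last()).show()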

【5】sql (running SQL statements)

Sample

# ... (snip: same DataFrame setup as above) ...

# For createOrReplaceTempView(), see the related articles at the end of this entry
data_frame.createOrReplaceTempView("demo_table")
data_frame_by_id = spark.sql('SELECT * FROM demo_table WHERE id=3')
data_frame_by_id.show()

Output

+---+----+---+-----+
| id|name|age|  job|
+---+----+---+-----+
|  3| Sam| 32|Sales|
+---+----+---+-----+
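
Once the temp view is registered, any SQL that Spark supports can be run against it, including joins and aggregates. A minimal sketch of an aggregate query over the same demo_table:

# Aggregate over the registered temp view
spark.sql('''
  SELECT job, COUNT(*) AS job_count, AVG(age) AS avg_age
  FROM demo_table
  GROUP BY job
  ORDER BY job_count DESC
''').show()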

References

https://blog.amedama.jp/entry/2018/03/03/173257
https://fisproject.jp/2018/04/methods-of-the-pyspark-sql-dataframe-class/
https://kotaeta.com/62339762
https://qiita.com/wwacky/items/e687c0ef05ae7f1de980

Related articles

PySpark ~ Environment Setup ~
https://dk521123.hatenablog.com/entry/2019/11/14/221126
PySpark ~ CSV / Basics ~
https://dk521123.hatenablog.com/entry/2019/11/24/225534
PySpark ~ DataFrame / Data Aggregation ~
https://dk521123.hatenablog.com/entry/2021/05/25/111051
PySpark ~ DataFrame / Table & Column Operations ~
https://dk521123.hatenablog.com/entry/2020/05/18/154829
PySpark ~ Retrieving Databases / Tables / Columns ~
https://dk521123.hatenablog.com/entry/2021/05/24/144317
Spark ~ FutureAction ~
https://dk521123.hatenablog.com/entry/2023/04/18/234214