■ Introduction
https://dk521123.hatenablog.com/entry/2019/11/14/221126
https://dk521123.hatenablog.com/entry/2019/11/24/225534
This post continues the two articles above and covers the basic data operations on a PySpark DataFrame.
Table of contents

【0】collect (loop over rows)
【1】filter (extract rows)
【2】groupby (grouping)
【3】distinct / dropDuplicates (remove duplicates)
【4】orderBy (sorting)
【5】sql (SQL statements)
【0】collect (loop over rows)
* Returns all records as a list of Row (pyspark.sql.types.Row) objects
Sample
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType

def to_data_frame(spark_context, row):
    # Convert a single Row back into a one-row DataFrame
    values = to_tuple(row)
    return spark_context.parallelize([values]).toDF()

def to_tuple(row):
    # Row -> tuple of its values
    return tuple(row.asDict().values())

spark_context = SparkContext()
spark = SparkSession(spark_context)

rdd = spark_context.parallelize([
    (1, 'Mike', 45, 'Sales'),
    (2, 'Tom', 65, 'IT'),
    (3, 'Sam', 32, 'Sales'),
    (4, 'Kevin', 28, 'Human resources'),
    (5, 'Bob', 25, 'IT'),
    (6, 'Alice', 20, 'Banking'),
    (7, 'Carol', 30, 'IT'),
])
schema = StructType([
    StructField('id', IntegerType(), False),
    StructField('name', StringType(), False),
    StructField('age', IntegerType(), False),
    StructField('job', StringType(), False),
])
data_frame = spark.createDataFrame(rdd, schema)

data_frame_even = spark.createDataFrame([], schema)
data_frame_odd = spark.createDataFrame([], schema)

# collect() returns a list of Row, so we can loop over it on the driver
for row in data_frame.collect():
    row_df = to_data_frame(spark_context, row)
    if int(row.id) % 2 == 0:
        data_frame_even = data_frame_even.union(row_df)
    else:
        data_frame_odd = data_frame_odd.union(row_df)

data_frame_even.show()
print("++++++++")
data_frame_odd.show()
Output
+---+-----+---+---------------+
| id| name|age|            job|
+---+-----+---+---------------+
|  2|  Tom| 65|             IT|
|  4|Kevin| 28|Human resources|
|  6|Alice| 20|        Banking|
+---+-----+---+---------------+

++++++++
+---+-----+---+-----+
| id| name|age|  job|
+---+-----+---+-----+
|  1| Mike| 45|Sales|
|  3|  Sam| 32|Sales|
|  5|  Bob| 25|   IT|
|  7|Carol| 30|   IT|
+---+-----+---+-----+
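Note that collect() pulls every row to the driver, and the loop above builds one single-row DataFrame per record, which gets slow for large data. If the goal is simply to split a DataFrame by a condition, a filter-based sketch like the following (assuming the same data_frame as above) avoids collect() entirely:

# A minimal alternative sketch, assuming the same data_frame as above:
# split by even/odd id with filter() instead of collect() + union()
data_frame_even = data_frame.filter(data_frame.id % 2 == 0)
data_frame_odd = data_frame.filter(data_frame.id % 2 != 0)
data_frame_even.show()
data_frame_odd.show()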
References
https://fisproject.jp/2018/04/methods-of-the-pyspark-sql-dataframe-class/
【1】filter (extract rows)
Sample
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType

spark_context = SparkContext()
spark = SparkSession(spark_context)

rdd = spark_context.parallelize([
    (1, 'Mike', 45, 'Sales'),
    (2, 'Tom', 65, 'IT'),
    (3, 'Sam', 32, 'Sales'),
    (4, 'Kevin', 28, 'Human resources'),
    (5, 'Bob', 25, 'IT'),
    (6, 'Alice', 20, 'Banking'),
    (7, 'Carol', 30, 'IT'),
])
schema = StructType([
    StructField('id', IntegerType(), False),
    StructField('name', StringType(), False),
    StructField('age', IntegerType(), False),
    StructField('job', StringType(), False),
])
data_frame = spark.createDataFrame(rdd, schema)

# filter - 1: combine Column conditions with & (AND)
data_frame.filter((data_frame.age >= 20) & (data_frame.age < 30)).show()
print("***********")

# filter - 2: SQL-style condition string
data_frame.filter("id in (1, 2)").show()
Output
+---+-----+---+---------------+
| id| name|age|            job|
+---+-----+---+---------------+
|  4|Kevin| 28|Human resources|
|  5|  Bob| 25|             IT|
|  6|Alice| 20|        Banking|
+---+-----+---+---------------+

***********
+---+----+---+-----+
| id|name|age|  job|
+---+----+---+-----+
|  1|Mike| 45|Sales|
|  2| Tom| 65|   IT|
+---+----+---+-----+
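As a side note, where() is an alias of filter(), and conditions can also be written with col() from pyspark.sql.functions. A small sketch against the same data_frame:

from pyspark.sql.functions import col

# where() is an alias of filter(); col() references a column by name
data_frame.where(col('job') == 'IT').show()
# Negation uses ~
data_frame.filter(~(col('job') == 'IT')).show()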
【2】groupby (grouping)
Sample
# ... (snip) ...

# groupby & count: groupby() alone returns GroupedData,
# so apply an aggregation such as count() before calling show()
df_job = data_frame.groupby('job').count()
df_job.show()
Output
+---------------+-----+
|            job|count|
+---------------+-----+
|             IT|    3|
|          Sales|    2|
|Human resources|    1|
|        Banking|    1|
+---------------+-----+
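For multiple aggregations per group, agg() can be combined with functions from pyspark.sql.functions. A sketch using the same data_frame:

from pyspark.sql import functions as F

# Count rows and average age per job in a single pass
data_frame.groupby('job').agg(
    F.count('*').alias('count'),
    F.avg('age').alias('avg_age'),
).show()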
【3】distinct / dropDuplicates (remove duplicates)
1) distinct
Sample
rdd = spark_context.parallelize([
    (1, 'Mike', 32, 'Sales'),
    (2, 'Tom', 20, 'IT'),
    (3, 'Sam', 32, 'Sales'),
    (4, 'Kevin', 30, 'Human resources'),
    (5, 'Bob', 30, 'IT'),
    (6, 'Alice', 20, 'Banking'),
    (7, 'Carol', 30, 'IT'),
])
# ... (snip) ...

# Remove duplicates: the distinct ages are 32, 20 and 30, so the count is 3
count_for_age = data_frame.select("age").distinct().count()
print(count_for_age)
Output
3
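The same count can be computed in one aggregation step, without materializing the distinct rows first, e.g. with countDistinct(). A sketch against the same data_frame:

from pyspark.sql import functions as F

# countDistinct() aggregates directly instead of distinct() + count()
data_frame.select(F.countDistinct('age').alias('age_count')).show()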
2) dropDuplicates
* Returns a new DataFrame (a separate instance) with the duplicate rows removed
* Target columns can also be specified
Sample
from pyspark import SparkContext
from pyspark.sql import SparkSession

spark_context = SparkContext()
spark = SparkSession(spark_context)

salary_data = [
    ("Mike", "Sales", 3000),
    ("Tom", "Sales", 4600),
    ("Robert", "Sales", 4100),
    ("Maria", "Finance", 3000),
    ("Mike", "Sales", 3000),
    ("Scott", "Finance", 3300),
    ("Ken", "Finance", 3900),
    ("Jeff", "Marketing", 3000),
    ("Kevin", "Marketing", 2000),
    ("Sam", "Sales", 4100),
]
columns = ["name", "department", "salary"]
data_frame = spark.createDataFrame(data=salary_data, schema=columns)
data_frame.printSchema()
data_frame.show(truncate=False)

# Distinct
distinct_df = data_frame.distinct()
print("Distinct count: " + str(distinct_df.count()))
distinct_df.show(truncate=False)

# Drop duplicates
drop_duplicate_df = data_frame.dropDuplicates()
print("Drop duplicates count: " + str(drop_duplicate_df.count()))
drop_duplicate_df.show(truncate=False)

# Drop duplicates on selected columns
drop_duplicate_df = data_frame.dropDuplicates(["department", "salary"])
print("Distinct count of department salary : " + str(drop_duplicate_df.count()))
drop_duplicate_df.show(truncate=False)
Output
root
 |-- name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- salary: long (nullable = true)

+------+----------+------+
|name  |department|salary|
+------+----------+------+
|Mike  |Sales     |3000  |
|Tom   |Sales     |4600  |
|Robert|Sales     |4100  |
|Maria |Finance   |3000  |
|Mike  |Sales     |3000  |
|Scott |Finance   |3300  |
|Ken   |Finance   |3900  |
|Jeff  |Marketing |3000  |
|Kevin |Marketing |2000  |
|Sam   |Sales     |4100  |
+------+----------+------+

Distinct count: 9
+------+----------+------+
|name  |department|salary|
+------+----------+------+
|Mike  |Sales     |3000  |
|Kevin |Marketing |2000  |
|Tom   |Sales     |4600  |
|Scott |Finance   |3300  |
|Ken   |Finance   |3900  |
|Robert|Sales     |4100  |
|Jeff  |Marketing |3000  |
|Sam   |Sales     |4100  |
|Maria |Finance   |3000  |
+------+----------+------+

Drop duplicates count: 9
+------+----------+------+
|name  |department|salary|
+------+----------+------+
|Mike  |Sales     |3000  |
|Kevin |Marketing |2000  |
|Tom   |Sales     |4600  |
|Scott |Finance   |3300  |
|Ken   |Finance   |3900  |
|Robert|Sales     |4100  |
|Jeff  |Marketing |3000  |
|Sam   |Sales     |4100  |
|Maria |Finance   |3000  |
+------+----------+------+

Distinct count of department salary : 8 <- Sam's row is gone
+------+----------+------+
|name  |department|salary|
+------+----------+------+
|Tom   |Sales     |4600  |
|Robert|Sales     |4100  |
|Ken   |Finance   |3900  |
|Maria |Finance   |3000  |
|Scott |Finance   |3300  |
|Kevin |Marketing |2000  |
|Mike  |Sales     |3000  |
|Jeff  |Marketing |3000  |
+------+----------+------+
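Note that dropDuplicates(["department", "salary"]) keeps an arbitrary row from each duplicate group (here Robert survived and Sam did not, but that is not guaranteed). If a specific row should survive, one common pattern is a window function with row_number(). A sketch, assuming the same data_frame, that keeps the alphabetically first name per (department, salary):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Keep exactly one row per (department, salary), choosing the smallest name
window = Window.partitionBy('department', 'salary').orderBy('name')
deduplicated_df = (data_frame
    .withColumn('row_number', F.row_number().over(window))
    .filter(F.col('row_number') == 1)
    .drop('row_number'))
deduplicated_df.show(truncate=False)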
【4】orderBy (sorting)
Sample
from pyspark.sql.functions import col

# Sort by age (ascending), then by id (descending)
data_frame.orderBy(col('age').asc(), col('id').desc()).show()
Output
+---+-----+---+---------------+
| id| name|age|            job|
+---+-----+---+---------------+
|  6|Alice| 20|        Banking|
|  5|  Bob| 25|             IT|
|  4|Kevin| 28|Human resources|
|  7|Carol| 30|             IT|
|  3|  Sam| 32|          Sales|
|  1| Mike| 45|          Sales|
|  2|  Tom| 65|             IT|
+---+-----+---+---------------+
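The same ordering can also be written with the ascending parameter, and sort() is an alias of orderBy(). A sketch against the same data_frame:

# Equivalent sketch: column names plus one ascending flag per column
data_frame.orderBy(['age', 'id'], ascending=[True, False]).show()
# sort() is an alias of orderBy()
data_frame.sort(data_frame.age.asc(), data_frame.id.desc()).show()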
【5】sql (SQL statements)
Sample
# ... (snip) ...

# Register the DataFrame as a temporary view first;
# for createOrReplaceTempView(), see the related articles below
data_frame.createOrReplaceTempView("demo_table")

data_frame_by_id = spark.sql('SELECT * FROM demo_table WHERE id = 3')
data_frame_by_id.show()
Output
+---+----+---+-----+
| id|name|age|  job|
+---+----+---+-----+
|  3| Sam| 32|Sales|
+---+----+---+-----+
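Any SQL that Spark supports can run against the temporary view, so the groupby example from 【2】 can also be written as plain SQL. A sketch, assuming the demo_table view registered above:

# Aggregation over the temporary view registered above
spark.sql("""
    SELECT job, COUNT(*) AS cnt
    FROM demo_table
    GROUP BY job
    ORDER BY cnt DESC
""").show()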
References
https://blog.amedama.jp/entry/2018/03/03/173257
https://fisproject.jp/2018/04/methods-of-the-pyspark-sql-dataframe-class/
https://kotaeta.com/62339762
https://qiita.com/wwacky/items/e687c0ef05ae7f1de980
Related articles
PySpark ~ Environment setup ~
https://dk521123.hatenablog.com/entry/2019/11/14/221126
PySpark ~ CSV / Basics ~
https://dk521123.hatenablog.com/entry/2019/11/24/225534
PySpark ~ DataFrame / Data aggregation ~
https://dk521123.hatenablog.com/entry/2021/05/25/111051
PySpark ~ DataFrame / Table and column operations ~
https://dk521123.hatenablog.com/entry/2020/05/18/154829
PySpark ~ Getting databases, tables, and columns ~
https://dk521123.hatenablog.com/entry/2021/05/24/144317
Spark ~ FutureAction ~
https://dk521123.hatenablog.com/entry/2023/04/18/234214