【分散処理】PySpark ~ DataFrame / テーブル・項目操作編 ~

■ はじめに

https://dk521123.hatenablog.com/entry/2020/01/04/150942

の続き。
今回は、PySpark でのデータフレーム(DataFrame)の
データ項目を操作する方法を纏める。

目次

【1】データ項目名
 1)データ項目の定義
 2)データ項目名の変更
 3)データ項目の追加
 4)データ項目の削除
 Z)サンプル
 [補足] StructTypeから項目名を取得する
【2】 一時ビュー / テーブル作成
 1)registerTempTable
 2)createTempView / createOrReplaceTempView
 3)createGlobalTempView / createOrReplaceGlobalTempView
 Z)サンプル

【1】データ項目名

1)データ項目の定義

StructType/StructField/[データ型]Type(e.g. StringType)で定義する

以下のサンプル以外には、以下の関連記事でも行っている

https://dk521123.hatenablog.com/entry/2019/11/24/225534

2)データ項目名の変更

data_frame.withColumnRenamed('変更前', '変更後')

3)データ項目の追加

data_frame.withColumn('項目名', '項目値')

使用上の注意:文字列以外の項目値の設定

lit()を使う。
 => lit() ... リテラル値(literal=直定数)を返す

https://x1.inkenkun.com/archives/5308

# 数字などの定数を入れる場合は、lit() を使う
data_frame.withColumn('項目名', lit(数字))

# Noneの場合は、 以下の通り(データ型 e.g. (StringType())
data_frame.withColumn('項目名', lit(None).cast([データ型]))

補足

以下の関連記事でも使用している

https://dk521123.hatenablog.com/entry/2020/07/09/000832

4)データ項目の削除

data_frame.drop('削除項目名')

Z)サンプル

from pyspark import SparkContext
from pyspark.sql import SparkSession

from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import lit

spark_context = SparkContext()
spark = SparkSession(spark_context)

rdd = spark_context.parallelize([
  (1, 'Mike', 32, 'Sales'),
  (2, 'Tom', 20, 'IT'),
  (3, 'Sam', 32, 'Sales'),
  (4, 'Kevin', 30, 'Human resources'),
  (5, 'Bob', 30, 'IT'),
  (6, 'Alice', 20, 'Banking'),
  (7, 'Carol', 30, 'IT'),
])
# StructFiled([項目名], [データ型], [Nullを許容するか(True:許容する)])
schema = StructType([
  StructField('id', IntegerType(), False),
  StructField('name', StringType(), False),
  StructField('age', IntegerType(), False),
  StructField('job', StringType(), False),
])
data_frame = spark.createDataFrame(rdd, schema)

data_frame = data_frame \
  .withColumnRenamed('id', 'person_id') \
  .withColumnRenamed('name', 'person_name') \
  .drop('age') \
  .drop('job') \
  .withColumn('dummy_number', lit(10)) \
  .withColumn('remarks', lit(None).cast(StringType()))
data_frame.show()

出力結果

+---------+-----------+------------+-------+
|person_id|person_name|dummy_number|remarks|
+---------+-----------+------------+-------+
|        1|       Mike|          10|   null|
|        2|        Tom|          10|   null|
|        3|        Sam|          10|   null|
|        4|      Kevin|          10|   null|
|        5|        Bob|          10|   null|
|        6|      Alice|          10|   null|
|        7|      Carol|          10|   null|
+---------+-----------+------------+-------+

[補足] StructTypeから項目名を取得する

StructType(以下の「schema 」)から項目名を取得する必要があったのでメモ。

サンプル

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, StructField, StructType

spark_context = SparkContext()
spark = SparkSession(spark_context)
header = ["s.no", "name", "Country", "Dummy"]

columns = [StructField(item, StringType(), True) for item in header]
schema = StructType(columns)

# ★ここ★
for field in schema:
  print(field.name) # "s.no", "name", "Country", "Dummy"が表示

【1】 一時ビュー / テーブル作成

1)registerTempTable (From ver1.3, ver2.0 : 非推奨)

指定した名前で一時テーブルとして登録する

使用上の注意:Ver2.0系では非推奨
https://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.registerTempTable

より抜粋
~~~~
Deprecated in 2.0, use createOrReplaceTempView instead.
# 2.0 では非推奨。createOrReplaceTempView() を変わりに使ってください。
~~~~

2)createTempView / createOrReplaceTempView (From ver2.0)

ローカル(Lifetime : SparkSession) 一時ビュー作成
(createTempView の場合、同名のビューがあれば例外「TempTableAlreadyExistsException」発生)

https://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.createTempView
https://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.createOrReplaceTempView

削除するには、 dropTempView()

http://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=registerjava#pyspark.sql.Catalog.dropTempView

3)createGlobalTempView / createOrReplaceGlobalTempView (From ver2.1 / 2.2)

グローバル(Lifetime : Spark application) 一時ビュー作成
(createGlobalTempView の場合、同名のビューがあれば例外「TempTableAlreadyExistsException」発生)

https://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.createGlobalTempView
https://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.createOrReplaceGlobalTempView

削除するには、 dropGlobalTempView()

http://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=registerjava#pyspark.sql.Catalog.dropGlobalTempView

Z)サンプル

# 略

# 作成
data_frame.createOrReplaceTempView("demo_table")
data_frame_by_id = spark.sql('SELECT * FROM demo_table WHERE id=3')
data_frame_by_id.show()

# 削除
spark.catalog.dropTempView("demo_table")

出力結果

+---+----+---+-----+
| id|name|age|  job|
+---+----+---+-----+
|  3| Sam| 32|Sales|
+---+----+---+-----+

参考文献

http://data-analysis-stats.jp/2019/04/06/pyspark%E3%81%A7dataframe%E3%81%AB%E5%88%97%E3%82%92%E8%BF%BD%E5%8A%A0%E3%81%99%E3%82%8B%E6%96%B9%E6%B3%95/
https://note.nkmk.me/python-pandas-drop/

関連記事

PySpark ~ 環境設定編 ~
https://dk521123.hatenablog.com/entry/2019/11/14/221126
PySpark ~ CSV編 ~
https://dk521123.hatenablog.com/entry/2019/11/24/225534
PySpark ~ DataFrame / データ操作編 ~
https://dk521123.hatenablog.com/entry/2020/01/04/150942
PySpark ~ DataFrame / データ集計編 ~
https://dk521123.hatenablog.com/entry/2021/05/25/111051
PySpark ~ DB・テーブル・項目取得編 ~
https://dk521123.hatenablog.com/entry/2021/05/24/144317
PySpark ~ Hive ~
https://dk521123.hatenablog.com/entry/2020/05/14/222415
PySpark ~ エスケープされた区切り文字が含んだデータを扱う ~
https://dk521123.hatenablog.com/entry/2020/07/09/000832
PySpark ~ ユーザ定義関数 UDF 編 ~
https://dk521123.hatenablog.com/entry/2020/05/20/195621
Glue から DataCatalogテーブル に対して Spark SQLを実行する
https://dk521123.hatenablog.com/entry/2021/05/11/220731