■ はじめに
https://dk521123.hatenablog.com/entry/2020/01/04/150942
の続き。 今回は、PySpark でのデータフレーム(DataFrame)の データ項目を操作する方法を纏める。
目次
【1】データ項目名 1)データ項目の定義 2)データ項目名の変更 3)データ項目の追加 4)データ項目の削除 Z)サンプル [補足] StructTypeから項目名を取得する 【2】 一時ビュー / テーブル作成 1)registerTempTable 2)createTempView / createOrReplaceTempView 3)createGlobalTempView / createOrReplaceGlobalTempView Z)サンプル
【1】データ項目名
1)データ項目の定義
StructType/StructField/[データ型]Type(e.g. StringType)で定義する 以下のサンプル以外には、以下の関連記事でも行っている
https://dk521123.hatenablog.com/entry/2019/11/24/225534
2)データ項目名の変更
data_frame.withColumnRenamed('変更前', '変更後')
3)データ項目の追加
data_frame.withColumn('項目名', '項目値')
使用上の注意:文字列以外の項目値の設定
lit()を使う。 => lit() ... リテラル値(literal=直定数)を返す
https://x1.inkenkun.com/archives/5308
# 数字などの定数を入れる場合は、lit() を使う data_frame.withColumn('項目名', lit(数字)) # Noneの場合は、 以下の通り(データ型 e.g. (StringType()) data_frame.withColumn('項目名', lit(None).cast([データ型]))
補足
以下の関連記事でも使用している
https://dk521123.hatenablog.com/entry/2020/07/09/000832
4)データ項目の削除
data_frame.drop('削除項目名')
Z)サンプル
from pyspark import SparkContext from pyspark.sql import SparkSession from pyspark.sql.types import StructType from pyspark.sql.types import StructField from pyspark.sql.types import StringType from pyspark.sql.types import IntegerType from pyspark.sql.functions import lit spark_context = SparkContext() spark = SparkSession(spark_context) rdd = spark_context.parallelize([ (1, 'Mike', 32, 'Sales'), (2, 'Tom', 20, 'IT'), (3, 'Sam', 32, 'Sales'), (4, 'Kevin', 30, 'Human resources'), (5, 'Bob', 30, 'IT'), (6, 'Alice', 20, 'Banking'), (7, 'Carol', 30, 'IT'), ]) # StructFiled([項目名], [データ型], [Nullを許容するか(True:許容する)]) schema = StructType([ StructField('id', IntegerType(), False), StructField('name', StringType(), False), StructField('age', IntegerType(), False), StructField('job', StringType(), False), ]) data_frame = spark.createDataFrame(rdd, schema) data_frame = data_frame \ .withColumnRenamed('id', 'person_id') \ .withColumnRenamed('name', 'person_name') \ .drop('age') \ .drop('job') \ .withColumn('dummy_number', lit(10)) \ .withColumn('remarks', lit(None).cast(StringType())) data_frame.show()
出力結果
+---------+-----------+------------+-------+ |person_id|person_name|dummy_number|remarks| +---------+-----------+------------+-------+ | 1| Mike| 10| null| | 2| Tom| 10| null| | 3| Sam| 10| null| | 4| Kevin| 10| null| | 5| Bob| 10| null| | 6| Alice| 10| null| | 7| Carol| 10| null| +---------+-----------+------------+-------+
[補足] StructTypeから項目名を取得する
StructType(以下の「schema 」)から項目名を取得する必要があったのでメモ。
サンプル
from pyspark import SparkContext from pyspark.sql import SparkSession from pyspark.sql.types import StringType, StructField, StructType spark_context = SparkContext() spark = SparkSession(spark_context) header = ["s.no", "name", "Country", "Dummy"] columns = [StructField(item, StringType(), True) for item in header] schema = StructType(columns) # ★ここ★ for field in schema: print(field.name) # "s.no", "name", "Country", "Dummy"が表示
【1】 一時ビュー / テーブル作成
1)registerTempTable (From ver1.3, ver2.0 : 非推奨)
指定した名前で一時テーブルとして登録する
使用上の注意:Ver2.0系では非推奨
https://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.registerTempTable
より抜粋 ~~~~ Deprecated in 2.0, use createOrReplaceTempView instead. # 2.0 では非推奨。createOrReplaceTempView() を変わりに使ってください。 ~~~~
2)createTempView / createOrReplaceTempView (From ver2.0)
ローカル(Lifetime : SparkSession) 一時ビュー作成 (createTempView の場合、同名のビューがあれば例外「TempTableAlreadyExistsException」発生)
https://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.createTempView
https://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.createOrReplaceTempView
削除するには、 dropTempView()
3)createGlobalTempView / createOrReplaceGlobalTempView (From ver2.1 / 2.2)
グローバル(Lifetime : Spark application) 一時ビュー作成 (createGlobalTempView の場合、同名のビューがあれば例外「TempTableAlreadyExistsException」発生)
https://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.createGlobalTempView
https://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.createOrReplaceGlobalTempView
削除するには、 dropGlobalTempView()
Z)サンプル
# 略 # 作成 data_frame.createOrReplaceTempView("demo_table") data_frame_by_id = spark.sql('SELECT * FROM demo_table WHERE id=3') data_frame_by_id.show() # 削除 spark.catalog.dropTempView("demo_table")
出力結果
+---+----+---+-----+ | id|name|age| job| +---+----+---+-----+ | 3| Sam| 32|Sales| +---+----+---+-----+
参考文献
http://data-analysis-stats.jp/2019/04/06/pyspark%E3%81%A7dataframe%E3%81%AB%E5%88%97%E3%82%92%E8%BF%BD%E5%8A%A0%E3%81%99%E3%82%8B%E6%96%B9%E6%B3%95/
https://note.nkmk.me/python-pandas-drop/
関連記事
PySpark ~ 環境設定編 ~
https://dk521123.hatenablog.com/entry/2019/11/14/221126
PySpark ~ CSV編 ~
https://dk521123.hatenablog.com/entry/2019/11/24/225534
PySpark ~ DataFrame / データ操作編 ~
https://dk521123.hatenablog.com/entry/2020/01/04/150942
PySpark ~ DataFrame / データ集計編 ~
https://dk521123.hatenablog.com/entry/2021/05/25/111051
PySpark ~ DB・テーブル・項目取得編 ~
https://dk521123.hatenablog.com/entry/2021/05/24/144317
PySpark ~ Hive ~
https://dk521123.hatenablog.com/entry/2020/05/14/222415
PySpark ~ エスケープされた区切り文字が含んだデータを扱う ~
https://dk521123.hatenablog.com/entry/2020/07/09/000832
PySpark ~ ユーザ定義関数 UDF 編 ~
https://dk521123.hatenablog.com/entry/2020/05/20/195621
Glue から DataCatalogテーブル に対して Spark SQLを実行する
https://dk521123.hatenablog.com/entry/2021/05/11/220731