【分散処理】PySpark ~ RDD <=> DataFrame の相互変換 ~

■ はじめに

https://dk521123.hatenablog.com/entry/2021/04/06/001709

より分冊および追記。

RDD <=> DataFrame の相互変換について扱う。

目次

【1】RDD => DataFrame
 1)createDataFrame()
 2)spark.read.csv()
  補足:TSVなど区切り文字を変更して変更したい場合
 3)toDF()
 補足:例外「TypeError: Can not infer schema for type <class 'str'>」発生時
 
【2】DataFrame => RDD
おまけとして、、、
【3】DataFrame(PySpark) => DataFrame(Pandas) への変換

【1】RDD => DataFrame

* 色々あるが状況に応じて使い分ければいい。

1)createDataFrame()
2)spark.read.csv()
3)toDF()

1)createDataFrame()

https://dk521123.hatenablog.com/entry/2020/01/04/150942

# より抜粋

from pyspark import SparkContext
from pyspark.sql import SparkSession

from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType

spark_context = SparkContext()
spark = SparkSession(spark_context)

rdd = spark_context.parallelize([
  (1, 'Mike', 45, 'Sales'),
  (2, 'Tom', 65, 'IT'),
  (3, 'Sam', 32, 'Sales'),
  (4, 'Kevin', 28, 'Human resources'),
  (5, 'Bob', 25, 'IT'),
  (6, 'Alice', 20, 'Banking'),
  (7, 'Carol', 30, 'IT'),
])
schema = StructType([
  StructField('id', IntegerType(), False),
  StructField('name', StringType(), False),
  StructField('age', IntegerType(), False),
  StructField('job', StringType(), False),
])

# ★ここに注目★
data_frame = spark.createDataFrame(rdd, schema)

2)spark.read.csv()

# 主に textFile()で取得したRDDをDataFrameに変換する場合

# ヘッダーがなく、「|」区切りの場合
df = spark.read \
  .option("header", "False") \
  .format("csv") \
  .csv(rdd, sep="|")

補足:TSVなど区切り文字を変更して変更したい場合

* sep='【区切り文字】' でできる

サンプル

df = spark.read \
  .option("header", "False") \
  .format("csv") \
  .csv(rdd, sep='\t')

3)toDF()

from pyspark import SparkContext
from pyspark.sql import SparkSession

spark_context = SparkContext()
spark = SparkSession(spark_context)

rdd = spark_context.textFile("./test.csv")
df = rdd.toDF(['row'])
df.show()

補足:例外「TypeError: Can not infer schema for type 」発生時

https://dk521123.hatenablog.com/entry/2021/05/18/185420

の対応時に、以下「NG時:例外発生時のコード」を実行したら
例外「TypeError: Can not infer schema for type <class 'str'>」が発生した。

以下のサイトを参考に「OK時:修正版」で対応した。

https://stackoverflow.com/questions/32742004/create-spark-dataframe-can-not-infer-schema-for-type-type-float
NG時:例外発生時のコード

data_frame = spark.createDataFrame(rdd)

OK時:修正版

from pyspark.sql imort Row

data_frame = rdd.map(Row("val")).toDF()

【2】DataFrame => RDD

* data_frame.rdd でOK

サンプル
https://dk521123.hatenablog.com/entry/2021/04/06/001709

# 『[補足] 不正と有効な行を効率よく分ける場合』のコードより抜粋
bad_row_rdd = df.rdd.filter(lambda x: x['is_bad_row'])

【3】DataFrame(PySpark) => DataFrame(Pandas) への変換

* toPandas() で簡単に変更できる
* ただし、toPandas() を使用して、大きいファイルサイズのデータを扱う場合は
 エラー「Total size ... is bigger than spark.driver.maxResultSize」に
 なる可能性があるので、注意が必要。
 (詳細は、以下の関連記事を参照のこと)

https://dk521123.hatenablog.com/entry/2021/04/22/131849

サンプル

from pyspark import SparkContext
from pyspark.sql import SparkSession

from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import lit

spark_context = SparkContext()
spark = SparkSession(spark_context)

rdd = spark_context.parallelize([
  (1, 'Mike', 32, 'Sales'),
  (2, 'Tom', 20, 'IT'),
  (3, 'Sam', 32, 'Sales'),
  (4, 'Kevin', 30, 'Human resources'),
  (5, 'Bob', 30, 'IT'),
  (6, 'Alice', 20, 'Banking'),
  (7, 'Carol', 30, 'IT'),
])
# StructFiled([項目名], [データ型], [Nullを許容するか(True:許容する)])
schema = StructType([
  StructField('id', IntegerType(), False),
  StructField('name', StringType(), False),
  StructField('age', IntegerType(), False),
  StructField('job', StringType(), False),
])
data_frame = spark.createDataFrame(rdd, schema)

# ★注目★
results = data_frame.toPandas().to_dict(orient='records')

print(results)

関連記事

PySpark ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/04/03/004254
PySpark ~ RDD / 基本編 ~
https://dk521123.hatenablog.com/entry/2021/04/04/111057
PySpark ~ RDD / あれこれ編 ~
https://dk521123.hatenablog.com/entry/2021/04/06/001709
PySpark ~ DataFrame / データ操作編 ~
https://dk521123.hatenablog.com/entry/2020/01/04/150942
PySpark ~ DataFrame / テーブル・項目操作編 ~
https://dk521123.hatenablog.com/entry/2020/05/18/154829
PySpark ~ DataFrame / 項目数を取得するには ~
https://dk521123.hatenablog.com/entry/2020/08/28/183706
PySpark ~ DataFrame / show() 編 ~
https://dk521123.hatenablog.com/entry/2021/04/26/161342
PySpark ~ CSV / 基本編 ~
https://dk521123.hatenablog.com/entry/2019/11/24/225534
PySpark ~ CSV / Read/Writeのプロパティ ~
https://dk521123.hatenablog.com/entry/2020/07/30/195226
PySpark ~ CSV / White Spaceの扱い ~
https://dk521123.hatenablog.com/entry/2021/04/29/075903
PySpark ~ CSV / escape ~
https://dk521123.hatenablog.com/entry/2020/11/23/224349
PySpark ~ エスケープされた区切り文字が含んだデータを扱う ~
https://dk521123.hatenablog.com/entry/2020/07/09/000832
PySpark で 出力ファイル名を変更する
https://dk521123.hatenablog.com/entry/2021/05/12/003047
AWS Glue上で saveAsTextFile() を使ったら エラー「DirectOutputCommitter not found」が発生する
https://dk521123.hatenablog.com/entry/2021/05/18/185420
PySpark でエラー「Total size ... is bigger than spark.driver.maxResultSize」が発生する
https://dk521123.hatenablog.com/entry/2021/04/22/131849