【分散処理】PySpark ~ CSV / Null・空文字・異常値の扱い ~

■ はじめに

https://dk521123.hatenablog.com/entry/2021/04/29/075903

の続き。

今回は、Null および 空文字 について、扱う。

 調べてみると、読み込み時において、
それ以外の異常値(e.g. non-number)についても
指定できるらしいので、それらについても、載せておく。

目次

【1】Null / 空文字 / 異常値 の扱い
 1)書き込み時
  a) nullValue (Optional)
  b) emptyValue (Optional)
 2)読み込み時
  a) nullValue (Optional)
  b) emptyValue (Optional)
  c) nanValue (Optional)
  d) positiveInf (Optional)  
  e) negativeInf (Optional)
【2】補足1:NaN/正・負の無限大
【3】補足2:破損データ「columnNameOfCorruptRecord」
【4】使用上の注意
【5】サンプル
 例1:書き込み時
 例2:読み込み時

【1】Null / 空文字 / 異常値 の扱い

nullValue="指定値",  emptyValue="指定値"
などで、指定する。

1)書き込み時

https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrameWriter.csv.html
a) nullValue (Optional)

* Null文字の値を設定する。
* Noneの場合は、デフォルト値(空文字)を使う

b) emptyValue (Optional)

* 空文字の値を設定する。
* Noneの場合は、デフォルト値("")を使う

2)読み込み時

https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrameReader.csv.html

a) nullValue (Optional)

* Null文字の値を設定する。
* Noneの場合は、デフォルト値(空文字)を使う
* v2.0.1以降、文字列型を含む全ての値をサポートしている

b) emptyValue (Optional)

* 空文字の値を設定する。
* Noneの場合は、デフォルト値(空文字)を使う

c) nanValue (Optional)

* 非数値(non-number)の値を設定する。
* Noneの場合は、デフォルト値(NaN)を使う

d) positiveInf (Optional)

* 正の無限大(positive infinity)の値を設定する。
* Noneの場合は、デフォルト値(Inf)を使う

e) negativeInf (Optional)

* 負の無限大(negative infinity)の値を設定する。
* Noneの場合は、デフォルト値(Inf)を使う

【2】補足1:NaN/正・負の無限大

http://mogile.web.fc2.com/spark/sql-ref-datatypes.html#floating-point-special-values

より抜粋
~~~~~~~~~~~~~~~~~
・Inf/+Inf/Infinity/+Infinity: 正の無限大
 FloatType: Scala の Float.PositiveInfinity と等価。
 DoubleType: Scala の Double.PositiveInfinity と等価。
・-Inf/-Infinity: 負の無限大
 FloatType: Scala の Float.NegativeInfinity と等価。
 DoubleType: Scala の Double.NegativeInfinity と等価。
・NaN: 数値ではない
 FloatType: Scala の Float.NaN と等価。
 DoubleType: Scala の Double.NaN と等価。
~~~~~~~~~~~~~~~~~

SELECT double('infinity') AS col;
+--------+
|     col|
+--------+
|Infinity|
+--------+

SELECT float('-inf') AS col;
+---------+
|      col|
+---------+
|-Infinity|
+---------+

SELECT float('NaN') AS col;
+---+
|col|
+---+
|NaN|
+---+

SELECT double('infinity') * 0 AS col;
+---+
|col|
+---+
|NaN|
+---+

SELECT double('-infinity') * (-1234567) AS col;
+--------+
|     col|
+--------+
|Infinity|
+--------+

SELECT double('infinity') < double('NaN') AS col;
+----+
| col|
+----+
|true|
+----+

SELECT double('NaN') = double('NaN') AS col;
+----+
| col|
+----+
|true|
+----+

SELECT double('inf') = double('infinity') AS col;
+----+
| col|
+----+
|true|
+----+

CREATE TABLE test (c1 int, c2 double);
INSERT INTO test VALUES (1, double('infinity'));
INSERT INTO test VALUES (2, double('infinity'));
INSERT INTO test VALUES (3, double('inf'));
INSERT INTO test VALUES (4, double('-inf'));
INSERT INTO test VALUES (5, double('NaN'));
INSERT INTO test VALUES (6, double('NaN'));
INSERT INTO test VALUES (7, double('-infinity'));
SELECT COUNT(*), c2 FROM test GROUP BY c2;
+---------+---------+
| count(1)|       c2|
+---------+---------+
|        2|      NaN|
|        2|-Infinity|
|        3| Infinity|
+---------+---------+

【3】補足2:破損データ「columnNameOfCorruptRecord」

* 以下の関連記事を参照のこと。

PySpark ~ _corrupt_record ~
https://dk521123.hatenablog.com/entry/2022/02/14/153845

【4】使用上の注意

環境によって、使えないプロパティもある模様。

https://docs.aws.amazon.com/ja_jp/emr/latest/ReleaseGuide/emr-spark-s3select.html

より抜粋
~~~~~~
nanValue、positiveInf、negativeInf などの
Spark CSV と JSON のオプションや
破損した記録に関連する
オプション (failfast および dropmalformed モードなど) は
サポートされません。
~~~~~~

【5】サンプル

Hiveのファイルに合わせてみた。

https://wyukawa.hatenablog.com/entry/20110713/1310550302

例1:書き込み時

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
from pyspark.sql.types import FloatType

spark_context = SparkContext()
spark = SparkSession(spark_context)

rdd = spark_context.parallelize([
  (1, 'Mike', 32, 172.2, 'Sales'),
  (None, 'Tom', 20, 167.8, 'IT'),
  (3, None, 32, 201.1, 'Sales'),
  (4, 'Kevin', None, 157.5, 'Human resources'),
  (5, 'Bob', 30, float('nan'), None),
  (6, '', 20, float('+Infinity'), 'Banking'),
  (7, 'Carol', 30, float('-Infinity'), ''),
])

schema = StructType([
  StructField('id', IntegerType(), True),
  StructField('name', StringType(), True),
  StructField('age', IntegerType(), True),
  StructField('height', FloatType(), True),
  StructField('job', StringType(), True),
])
data_frame = spark.createDataFrame(rdd, schema)
data_frame.show()

data_frame.repartition(1).write \
  .mode('overwrite') \
  .csv('./out',
    sep=",",
    header=True,
    nullValue=r"\N",
    emptyValue=r"\N")

print("Done")
spark.stop()

出力結果

+----+-----+----+---------+---------------+
|  id| name| age|   height|            job|
+----+-----+----+---------+---------------+
|   1| Mike|  32|    172.2|          Sales|
|null|  Tom|  20|    167.8|             IT|
|   3| null|  32|    201.1|          Sales|
|   4|Kevin|null|    157.5|Human resources|
|   5|  Bob|  30|      NaN|           null|
|   6|     |  20| Infinity|        Banking|
|   7|Carol|  30|-Infinity|               |
+----+-----+----+---------+---------------+

出力結果(ファイル)

id,name,age,height,job
1,Mike,32,172.2,Sales
\N,Tom,20,167.8,IT
3,\N,32,201.1,Sales
4,Kevin,\N,157.5,Human resources
5,Bob,30,NaN,\N
6,\N,20,Infinity,Banking
7,Carol,30,-Infinity,\N

例2:読み込み時

from pyspark import SparkContext
from pyspark.sql import SparkSession

spark_context = SparkContext()
spark = SparkSession(spark_context)

data_frame = spark.read.csv(
  "test.csv",
  sep=",",
  header=True,
  nullValue=r"\N",
  emptyValue=r"\N")
data_frame.show()

print("Done")
spark.stop()

入力ファイル「test.csv

id,name,age,job
1,Mike,32,Sales
\N,Tom,20,IT
3,\N,32,Sales
4,Kevin,\N,Human resources
5,Bob,44,\N
6,\N,33,Banking
7,Carol,\N,\N

出力結果

+----+-----+----+---------------+
|  id| name| age|            job|
+----+-----+----+---------------+
|   1| Mike|  32|          Sales|
|null|  Tom|  20|             IT|
|   3| null|  32|          Sales|
|   4|Kevin|null|Human resources|
|   5|  Bob|  44|           null|
|   6| null|  33|        Banking|
|   7|Carol|null|           null|
+----+-----+----+---------------+

関連記事

PySpark ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2019/11/14/221126
PySpark ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/04/03/004254
PySpark ~ CSV / 基本編 ~
https://dk521123.hatenablog.com/entry/2019/11/24/225534
PySpark ~ CSV / Read/Writeのプロパティ ~
https://dk521123.hatenablog.com/entry/2020/07/30/195226
PySpark ~ CSV / White Spaceの扱い ~
https://dk521123.hatenablog.com/entry/2021/04/29/075903
PySpark ~ _corrupt_record ~
https://dk521123.hatenablog.com/entry/2022/02/14/153845
PySpark ~ Hive ~
https://dk521123.hatenablog.com/entry/2020/05/14/222415