■ はじめに
https://dk521123.hatenablog.com/entry/2021/04/29/075903
の続き。 今回は、Null および 空文字 について、扱う。 調べてみると、読み込み時において、 それ以外の異常値(e.g. non-number)についても 指定できるらしいので、それらについても、載せておく。
目次
【1】Null / 空文字 / 異常値 の扱い 1)書き込み時 a) nullValue (Optional) b) emptyValue (Optional) 2)読み込み時 a) nullValue (Optional) b) emptyValue (Optional) c) nanValue (Optional) d) positiveInf (Optional) e) negativeInf (Optional) 【2】補足1:NaN/正・負の無限大 【3】補足2:破損データ「columnNameOfCorruptRecord」 【4】使用上の注意 【5】サンプル 例1:書き込み時 例2:読み込み時
【1】Null / 空文字 / 異常値 の扱い
nullValue="指定値", emptyValue="指定値" などで、指定する。
1)書き込み時
https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrameWriter.csv.html
a) nullValue (Optional)
* Null文字の値を設定する。 * Noneの場合は、デフォルト値(空文字)を使う
b) emptyValue (Optional)
* 空文字の値を設定する。 * Noneの場合は、デフォルト値("")を使う
2)読み込み時
https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrameReader.csv.html
a) nullValue (Optional)
* Null文字の値を設定する。 * Noneの場合は、デフォルト値(空文字)を使う * v2.0.1以降、文字列型を含む全ての値をサポートしている
b) emptyValue (Optional)
* 空文字の値を設定する。 * Noneの場合は、デフォルト値(空文字)を使う
c) nanValue (Optional)
* 非数値(non-number)の値を設定する。 * Noneの場合は、デフォルト値(NaN)を使う
d) positiveInf (Optional)
* 正の無限大(positive infinity)の値を設定する。 * Noneの場合は、デフォルト値(Inf)を使う
e) negativeInf (Optional)
* 負の無限大(negative infinity)の値を設定する。 * Noneの場合は、デフォルト値(Inf)を使う
【2】補足1:NaN/正・負の無限大
http://mogile.web.fc2.com/spark/sql-ref-datatypes.html#floating-point-special-values
より抜粋 ~~~~~~~~~~~~~~~~~ ・Inf/+Inf/Infinity/+Infinity: 正の無限大 FloatType: Scala の Float.PositiveInfinity と等価。 DoubleType: Scala の Double.PositiveInfinity と等価。 ・-Inf/-Infinity: 負の無限大 FloatType: Scala の Float.NegativeInfinity と等価。 DoubleType: Scala の Double.NegativeInfinity と等価。 ・NaN: 数値ではない FloatType: Scala の Float.NaN と等価。 DoubleType: Scala の Double.NaN と等価。 ~~~~~~~~~~~~~~~~~
例
SELECT double('infinity') AS col; +--------+ | col| +--------+ |Infinity| +--------+ SELECT float('-inf') AS col; +---------+ | col| +---------+ |-Infinity| +---------+ SELECT float('NaN') AS col; +---+ |col| +---+ |NaN| +---+ SELECT double('infinity') * 0 AS col; +---+ |col| +---+ |NaN| +---+ SELECT double('-infinity') * (-1234567) AS col; +--------+ | col| +--------+ |Infinity| +--------+ SELECT double('infinity') < double('NaN') AS col; +----+ | col| +----+ |true| +----+ SELECT double('NaN') = double('NaN') AS col; +----+ | col| +----+ |true| +----+ SELECT double('inf') = double('infinity') AS col; +----+ | col| +----+ |true| +----+ CREATE TABLE test (c1 int, c2 double); INSERT INTO test VALUES (1, double('infinity')); INSERT INTO test VALUES (2, double('infinity')); INSERT INTO test VALUES (3, double('inf')); INSERT INTO test VALUES (4, double('-inf')); INSERT INTO test VALUES (5, double('NaN')); INSERT INTO test VALUES (6, double('NaN')); INSERT INTO test VALUES (7, double('-infinity')); SELECT COUNT(*), c2 FROM test GROUP BY c2; +---------+---------+ | count(1)| c2| +---------+---------+ | 2| NaN| | 2|-Infinity| | 3| Infinity| +---------+---------+
【3】補足2:破損データ「columnNameOfCorruptRecord」
* 以下の関連記事を参照のこと。
PySpark ~ _corrupt_record ~
https://dk521123.hatenablog.com/entry/2022/02/14/153845
【4】使用上の注意
環境によって、使えないプロパティもある模様。
https://docs.aws.amazon.com/ja_jp/emr/latest/ReleaseGuide/emr-spark-s3select.html
より抜粋 ~~~~~~ nanValue、positiveInf、negativeInf などの Spark CSV と JSON のオプションや 破損した記録に関連する オプション (failfast および dropmalformed モードなど) は サポートされません。 ~~~~~~
【5】サンプル
Hiveのファイルに合わせてみた。
https://wyukawa.hatenablog.com/entry/20110713/1310550302
例1:書き込み時
from pyspark import SparkContext from pyspark.sql import SparkSession from pyspark.sql.types import StructType from pyspark.sql.types import StructField from pyspark.sql.types import StringType from pyspark.sql.types import IntegerType from pyspark.sql.types import FloatType spark_context = SparkContext() spark = SparkSession(spark_context) rdd = spark_context.parallelize([ (1, 'Mike', 32, 172.2, 'Sales'), (None, 'Tom', 20, 167.8, 'IT'), (3, None, 32, 201.1, 'Sales'), (4, 'Kevin', None, 157.5, 'Human resources'), (5, 'Bob', 30, float('nan'), None), (6, '', 20, float('+Infinity'), 'Banking'), (7, 'Carol', 30, float('-Infinity'), ''), ]) schema = StructType([ StructField('id', IntegerType(), True), StructField('name', StringType(), True), StructField('age', IntegerType(), True), StructField('height', FloatType(), True), StructField('job', StringType(), True), ]) data_frame = spark.createDataFrame(rdd, schema) data_frame.show() data_frame.repartition(1).write \ .mode('overwrite') \ .csv('./out', sep=",", header=True, nullValue=r"\N", emptyValue=r"\N") print("Done") spark.stop()
出力結果
+----+-----+----+---------+---------------+ | id| name| age| height| job| +----+-----+----+---------+---------------+ | 1| Mike| 32| 172.2| Sales| |null| Tom| 20| 167.8| IT| | 3| null| 32| 201.1| Sales| | 4|Kevin|null| 157.5|Human resources| | 5| Bob| 30| NaN| null| | 6| | 20| Infinity| Banking| | 7|Carol| 30|-Infinity| | +----+-----+----+---------+---------------+
出力結果(ファイル)
id,name,age,height,job 1,Mike,32,172.2,Sales \N,Tom,20,167.8,IT 3,\N,32,201.1,Sales 4,Kevin,\N,157.5,Human resources 5,Bob,30,NaN,\N 6,\N,20,Infinity,Banking 7,Carol,30,-Infinity,\N
例2:読み込み時
from pyspark import SparkContext from pyspark.sql import SparkSession spark_context = SparkContext() spark = SparkSession(spark_context) data_frame = spark.read.csv( "test.csv", sep=",", header=True, nullValue=r"\N", emptyValue=r"\N") data_frame.show() print("Done") spark.stop()
入力ファイル「test.csv」
id,name,age,job 1,Mike,32,Sales \N,Tom,20,IT 3,\N,32,Sales 4,Kevin,\N,Human resources 5,Bob,44,\N 6,\N,33,Banking 7,Carol,\N,\N
出力結果
+----+-----+----+---------------+ | id| name| age| job| +----+-----+----+---------------+ | 1| Mike| 32| Sales| |null| Tom| 20| IT| | 3| null| 32| Sales| | 4|Kevin|null|Human resources| | 5| Bob| 44| null| | 6| null| 33| Banking| | 7|Carol|null| null| +----+-----+----+---------------+
関連記事
PySpark ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2019/11/14/221126
PySpark ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/04/03/004254
PySpark ~ CSV / 基本編 ~
https://dk521123.hatenablog.com/entry/2019/11/24/225534
PySpark ~ CSV / Read/Writeのプロパティ ~
https://dk521123.hatenablog.com/entry/2020/07/30/195226
PySpark ~ CSV / White Spaceの扱い ~
https://dk521123.hatenablog.com/entry/2021/04/29/075903
PySpark ~ _corrupt_record ~
https://dk521123.hatenablog.com/entry/2022/02/14/153845
PySpark ~ Hive ~
https://dk521123.hatenablog.com/entry/2020/05/14/222415