【分散処理】PySpark ~ CSV / White Spaceの扱い ~

■ はじめに

https://dk521123.hatenablog.com/entry/2019/11/24/225534
https://dk521123.hatenablog.com/entry/2020/07/09/000832
https://dk521123.hatenablog.com/entry/2020/11/23/224349
https://dk521123.hatenablog.com/entry/2020/07/30/195226

の続き。

 PySpark で、CSVファイルとして書き込む際に
White Space (ホワイトスペース; 半角SP、タブ etc) が
勝手に削除されてしまう現象に見舞われたので
White Spaceの扱いについて、調べてみた

目次

【1】ホワイトスペースに関するプロパティ
 1)書き込み - write
  a)ignoreLeadingWhiteSpace
  b)ignoreTrailingWhiteSpace
 2)読み込み - read
  a)ignoreLeadingWhiteSpace
  b)ignoreTrailingWhiteSpace

【2】サンプル
 例1:書き込み時の挙動
 例2:読み込み時の挙動

【1】ホワイトスペースに関するプロパティ

ホワイトスペースに関して read/write で、
以下の2つのプロパティが用意されている。
~~~~~~~~~~
a)ignoreLeadingWhiteSpace
b)ignoreTrailingWhiteSpace
~~~~~~~~~~

使用上の注意

[1] 「1)書き込み - write」と「2)読み込み - read」で
  デフォルト値が異なることに注意
[2] 個人的には、プロパティ名と挙動が分かりづらい
[3] 読み込み時は、囲み文字で囲まれている値の前後のホワイトスペースは
 True/Falseに関わらず維持される(トリムされずにそのまま)

1)書き込み - write

https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrameWriter.csv.html

a)ignoreLeadingWhiteSpace

* 前の White Space を書き込むかどうか(default : True)
 => True : 前の White Space を無視して、そのまま書き込む
 => False : 前の White Space をトリムする

API仕様より抜粋
ignoreLeadingWhiteSpacestr or bool, optional
a flag indicating whether or not leading whitespaces
 from values being written should be skipped.
If None is set, it uses the default value, true.

=> このプロパティ名だったら
 True にしたら、white spaceを無視して書き込みにいく
 って意味に見える
(実際には、FalseでWhite Spaceが書き込まれる)

b)ignoreTrailingWhiteSpace

* 後ろの White Space を書き込むかどうか(default : True)
 => True : 後ろの White Space を無視して、そのまま書き込む
 => False : 後ろの White Space をトリムする

ignoreTrailingWhiteSpacestr or bool, optional
a flag indicating whether or not trailing whitespaces
 from values being written should be skipped.
If None is set, it uses the default value, true.

2)読み込み - read

https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrameReader.csv.html

a)ignoreLeadingWhiteSpace

* 前の White Space を読み込むかどうか(default : False)
 => True : 前の White Space をトリムして読み込む(White Spaceを削除)
 => False : 前の White Space をトリムせずにそのまま読み込む

API仕様より抜粋
ignoreLeadingWhiteSpacestr or bool, optional
A flag indicating whether or not leading whitespaces
 from values being read should be skipped.
If None is set, it uses the default value, false.

b)ignoreTrailingWhiteSpace

* 後ろの White Space を読み込むかどうか(default : False)
 => True : 後ろの White Space をトリムして読み込む(White Spaceを削除)
 => False : 後ろの White Space をトリムせずにそのまま読み込む

API仕様より抜粋
ignoreTrailingWhiteSpacestr or bool, optional
A flag indicating whether or not trailing whitespaces
 from values being read should be skipped.
If None is set, it uses the default value, false.

【2】サンプル

例1:書き込み時の挙動
例2:読み込み時の挙動

例1:書き込み時の挙動

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType


spark_context = SparkContext()
spark = SparkSession(spark_context)

# 前後に半角SPを入れてある
rdd = spark_context.parallelize([
  (' x0001 ', '  Mike  ', '  Sales  '),
  (' x0002 ', '  Tom  ', '  IT  '),
  (' x0003 ', '  Sam  ', '  Sales  '),
  (' x0004 ', '  Kevin  ', '  Human resources  '),
  (' x0005 ', '  Bob  ', '  IT  '),
  (' x0006 ', '  Alice  ', '  Banking  '),
  (' x0007 ', '  Carol  ', '  IT  '),
])
schema = StructType([
  StructField('id', StringType(), False),
  StructField('name', StringType(), False),
  StructField('job', StringType(), False),
])

data_frame = spark.createDataFrame(rdd, schema)

# まず表示させてみる
data_frame.show()

# 実験1:Falseを指定 (White Spaceをそのまま出力)
data_frame.write \
  .mode('overwrite') \
  .csv('./out_false',
    sep=",",
    header=True,
    ignoreLeadingWhiteSpace=False,
    ignoreTrailingWhiteSpace=False)

# 実験2:Trueを指定 (White Spaceをトリムする)
data_frame.write \
  .mode('overwrite') \
  .csv('./out_true',
    sep=",",
    header=True,
    ignoreLeadingWhiteSpace=True,
    ignoreTrailingWhiteSpace=True)

# 実験3:指定しない (White Spaceをトリムする)
data_frame.write \
  .mode('overwrite') \
  .csv('./out_none',
    sep=",",
    header=True)

出力結果

+-------+---------+-------------------+
|     id|     name|                job|
+-------+---------+-------------------+
| x0001 |   Mike  |            Sales  |
| x0002 |    Tom  |               IT  |
| x0003 |    Sam  |            Sales  |
| x0004 |  Kevin  |  Human resources  |
| x0005 |    Bob  |               IT  |
| x0006 |  Alice  |          Banking  |
| x0007 |  Carol  |               IT  |
+-------+---------+-------------------+

実験1:出力結果ファイル(Falseの場合:前後がトリムされていない)

id,name,job
 x0001 ,  Mike  ,  Sales  

実験2:出力結果ファイル(Trueの場合:前後がトリムされている)

id,name,job
x0001,Mike,Sales

実験3:出力結果ファイル(指定しない場合:前後がトリムされている)

id,name,job
x0001,Mike,Sales

例2:読み込み時の挙動

from pyspark import SparkContext
from pyspark.sql import SparkSession

spark_context = SparkContext()
spark = SparkSession(spark_context)

print("実験1:Falseを指定")
data_frame = spark.read.csv(
  "input.csv",
  header=True,
  ignoreLeadingWhiteSpace=False,
  ignoreTrailingWhiteSpace=False)
data_frame.show(truncate=False, vertical=True)

print("実験2:Trueを指定")
data_frame = spark.read.csv(
  "input.csv",
  header=True,
  ignoreLeadingWhiteSpace=True,
  ignoreTrailingWhiteSpace=True)
data_frame.show(truncate=False, vertical=True)

print("実験3:指定しない")
data_frame = spark.read.csv(
  "input.csv",
  header=True)
data_frame.show(truncate=False, vertical=True)

入力ファイル「input.csv

    id,name,job
x0001 ,  Mike  ,  Sales  
  x0002   ,   Tom  ,   IT  
   x0003 ,  Sam  , Sales  
" x0004 ","    Kevin    ","   Human resources  "
"x0005",  "Mike"  ,  "IT"  
  "   x0006   "  ,  "  Alice   "," Banking    "
"   x0007" ,"     Carol    "," IT  "    

出力結果

実験1:Falseを指定
-RECORD 0----------------------
     id | x0001
 name   |   Mike
 job    |   Sales
-RECORD 1----------------------
     id |   x0002
 name   |    Tom
 job    |    IT
-RECORD 2----------------------
     id |    x0003
 name   |   Sam
 job    |  Sales
-RECORD 3----------------------
     id |  x0004
 name   |     Kevin
 job    |    Human resources
-RECORD 4----------------------
     id | x0005
 name   |   "Mike"
 job    |   "IT"
-RECORD 5----------------------
     id |   "   x0006   "
 name   |   "  Alice   "
 job    |  Banking
-RECORD 6----------------------
     id |    x0007
 name   |      Carol
 job    |  IT

実験2:Trueを指定
-RECORD 0--------------------
 id   | x0001
 name | Mike
 job  | Sales
-RECORD 1--------------------
 id   | x0002
 name | Tom
 job  | IT
-RECORD 2--------------------
 id   | x0003
 name | Sam
 job  | Sales
-RECORD 3--------------------
 id   |  x0004
 name |     Kevin
 job  |    Human resources
-RECORD 4--------------------
 id   | x0005
 name | Mike
 job  | IT
-RECORD 5--------------------
 id   |    x0006
 name |   Alice
 job  |  Banking
-RECORD 6--------------------
 id   |    x0007
 name |      Carol
 job  |  IT

実験3:指定しない
-RECORD 0----------------------
     id | x0001
 name   |   Mike
 job    |   Sales
-RECORD 1----------------------
     id |   x0002
 name   |    Tom
 job    |    IT
-RECORD 2----------------------
     id |    x0003
 name   |   Sam
 job    |  Sales
-RECORD 3----------------------
     id |  x0004
 name   |     Kevin
 job    |    Human resources
-RECORD 4----------------------
     id | x0005
 name   |   "Mike"
 job    |   "IT"
-RECORD 5----------------------
     id |   "   x0006   "
 name   |   "  Alice   "
 job    |  Banking
-RECORD 6----------------------
     id |    x0007
 name   |      Carol
 job    |  IT

関連記事

PySpark ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2019/11/14/221126
PySpark ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/04/03/004254
PySpark ~ CSV / 基本編 ~
https://dk521123.hatenablog.com/entry/2019/11/24/225534
PySpark ~ CSV / Read/Writeのプロパティ ~
https://dk521123.hatenablog.com/entry/2020/07/30/195226
PySpark ~ CSV / escape ~
https://dk521123.hatenablog.com/entry/2020/11/23/224349
PySpark で 出力ファイル名を変更する
https://dk521123.hatenablog.com/entry/2021/05/12/003047
PySpark ~ CSV / Null・空文字・異常値の扱い ~
https://dk521123.hatenablog.com/entry/2021/06/01/142457