【Distributed Processing】PySpark ~ CSV / escape ~

■ Introduction

https://dk521123.hatenablog.com/entry/2019/11/24/225534
https://dk521123.hatenablog.com/entry/2020/07/30/195226
https://dk521123.hatenablog.com/entry/2020/07/09/000832

This entry continues on from the articles above.

A CSV file that Pandas parsed correctly came out garbled
when I tried to display it with PySpark.
Some digging suggested the escape setting was involved, so here are my notes.

Contents

【1】This case
【2】Solution
【3】[Extra] Writing the file out
【4】[Extra] Reading the file with textFile()

【1】This case

* In the input file 「hello_world.csv」 below ...

【id=3】

 A) With PySpark (see "1) PySpark (NG case)"),
 the 「\"」 in 「"J\"」 is treated as an escaped quote, so the field
 does not close where it should and the value cannot be read correctly
 B) With Pandas (see "2) Pandas (OK case)"), no problem

【id=4】

 A) With PySpark (see "1) PySpark (NG case)"),
 the comma in 「"""Dublin,Ireland"」 is treated as a delimiter,
 so the columns shift out of place
 B) With Pandas (see "2) Pandas (OK case)"), no problem

Input file: hello_world.csv

id,last_name,first_name,city,birth_date,year,month,day
"1","Bloggs","Joe","Dublin","2020-12-22","2020","11","11"
"2","Joseph","Smith","Salt Lake City","1982-09-11","2020","11","11"
"3","J\","Mike","Tokyo","1991-01-01","2020","11","11"
"4","Kennedy","John","""Dublin,Ireland","1991-01-01","2020","11","11"
"5","Twain","Kevin","Osaka","1991-01-01","2020","11","11"

1) PySpark (NG case)

from pyspark import SparkContext
from pyspark.sql import SparkSession

def main():
  spark_context = SparkContext()
  spark = SparkSession(spark_context)

  data_frame = spark.read.csv(
    "hello_world.csv", header=True)
  data_frame.show(truncate=False)

if __name__ == '__main__':
  main()

Output

+----------+----------+----------+--------------+----------+----------+-----+----+
|id        |last_name |first_name|city          |birth_date|year      |month|day |
+----------+----------+----------+--------------+----------+----------+-----+----+
|1         |Bloggs    |Joe       |Dublin        |2020-12-22|2020      |11   |11  |
|2         |Joseph    |Smith     |Salt Lake City|1982-09-11|2020      |11   |11  |
|3         |"J","Mike"|Tokyo     |1991-01-01    |2020      |11        |11   |null|<< ★ parsed incorrectly ★
|4         |Kennedy   |John      |"""Dublin     |Ireland"  |1991-01-01|2020 |11  |<< ★ parsed incorrectly ★
|5         |Twain     |Kevin     |Osaka         |null      |null      |null |null|
|1991-01-01|2020      |11        |11            |null      |null      |null |null|
+----------+----------+----------+--------------+----------+----------+-----+----+

2) Pandas (OK case)

import pandas as pd

pd.set_option('display.max_rows', None)
pd.set_option('display.max_columns', None)

data_frame = pd.read_csv('hello_world.csv', encoding='UTF-8')
print(data_frame)

# Extra: what happens if we write the file back out
data_frame.to_csv(
  "out_pd.csv", encoding='UTF8', header=True, index=False, sep=",")

Output

   id last_name first_name             city  birth_date  year  month  day
0   1    Bloggs        Joe           Dublin  2020-12-22  2020     11   11
1   2    Joseph      Smith   Salt Lake City  1982-09-11  2020     11   11
2   3        J\       Mike            Tokyo  1991-01-01  2020     11   11
3   4   Kennedy       John  "Dublin,Ireland  1991-01-01  2020     11   11
4   5     Twain      Kevin            Osaka  1991-01-01  2020     11   11
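
(Reference) Pandas gets this right because pandas.read_csv defaults to
RFC 4180-style quoting: escapechar=None (backslash is not special) and
doublequote=True (「""」 inside a quoted field means a literal 「"」).
A minimal sketch that just spells out those defaults explicitly
(same behavior as the code above):

import pandas as pd

# Spelling out the defaults that make this file parse:
# escapechar=None  -> backslash is NOT special
# doublequote=True -> "" inside a quoted field is a literal "
data_frame = pd.read_csv(
  'hello_world.csv', encoding='UTF-8',
  escapechar=None, doublequote=True)
print(data_frame)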

【2】Solution

* Explicitly specify escape in spark.read.csv()
 => If omitted or escape=None, 「\」 acts as the escape character.
 => In this case, explicitly specifying 「escape="\""」 fixes the parse.

https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrameWriter.csv.html

Sample: PySpark (OK case)

from pyspark import SparkContext
from pyspark.sql import SparkSession

def main():
  spark_context = SparkContext()
  spark = SparkSession(spark_context)

  data_frame = spark.read.csv(
    # Added 「, escape="\""」
    "hello_world.csv", header=True, escape="\"")
  data_frame.show(truncate=False)

  # Extra: what happens if we write the file back out
  data_frame.write.mode('overwrite').csv(
    "./out",
    header=True,
    quoteAll=True)

if __name__ == '__main__':
  main()

Output

+---+---------+----------+---------------+----------+----+-----+---+
|id |last_name|first_name|city           |birth_date|year|month|day|
+---+---------+----------+---------------+----------+----+-----+---+
|1  |Bloggs   |Joe       |Dublin         |2020-12-22|2020|11   |11 |
|2  |Joseph   |Smith     |Salt Lake City |1982-09-11|2020|11   |11 |
|3  |J\       |Mike      |Tokyo          |1991-01-01|2020|11   |11 |
|4  |Kennedy  |John      |"Dublin,Ireland|1991-01-01|2020|11   |11 |
|5  |Twain    |Kevin     |Osaka          |1991-01-01|2020|11   |11 |
+---+---------+----------+---------------+----------+----+-----+---+

Extra: file output (★ see 「【3】[Extra] Writing the file out」 for details)

"id","last_name","first_name","city","birth_date","year","month","day"
"1","Bloggs","Joe","Dublin","2020-12-22","2020","11","11"
"2","Joseph","Smith","Salt Lake City","1982-09-11","2020","11","11"
"3","J\\","Mike","Tokyo","1991-01-01","2020","11","11"
"4","Kennedy","John","\"Dublin,Ireland","1991-01-01","2020","11","11"
"5","Twain","Kevin","Osaka","1991-01-01","2020","11","11"

【3】[Extra] Writing the file out

As 「J\\」 in 「Extra: file output」 above shows, when a value is written
quoted, it is escaped with the default escape character 「\」.

Findings

 As the experiment code below shows,
 escaping is determined not by whether 「quoteAll=True」 is specified,
 but by whether the value ends up quoted
 => With 「quoteAll=True」, every value is quoted, so every value is escaped
 => Even without it, any value that is written quoted is escaped

Why are quoted values escaped?

This comes from escapeQuotes (default: True), introduced in
https://dk521123.hatenablog.com/entry/2020/07/30/195226
~~~~~~~~~~~~~
escapeQuotes
 ... whether to escape a value when quote characters are involved (default: True)
~~~~~~~~~~~~~
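
To see the flag at work, you can switch it off on write and diff the
results. A minimal sketch (the output path is arbitrary, and data_frame is
assumed to be read as in the sample above; note that with escapeQuotes=False
the quote characters inside values are written as-is, so the file may no
longer round-trip):

# escapeQuotes=False: do not escape a value just because
# it contains the quote character
data_frame.write.mode('overwrite').csv(
  "./out_escape_quotes_false",
  header=True,
  escapeQuotes=False)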

Experiment code

hello_world.csv (last_name / first_name not quoted)

id,last_name,first_name,city,birth_date,year,month,day
"1",Bloggs,Joe,"Dublin","2020-12-22","2020","11","11"
"2",Joseph,Smith,"Salt Lake City","1982-09-11","2020","11","11"
"3",J\,Mike,"Tokyo","1991-01-01","2020","11","11"
"4",Kennedy,John,"Dublin,Ireland","1991-01-01","2020","11","11"
"5",Twain,Kevin,"Osaka","1991-01-01","2020","11","11"
"6",A-///\\\///\\\,Smith,"Osaka, \Japan","1991-01-01","2020","11","11"

demo.py

from pyspark import SparkContext
from pyspark.sql import SparkSession

spark_context = SparkContext()
spark = SparkSession(spark_context)

data_frame = spark.read.csv(
  "hello_world.csv", header=True, escape="\"")
data_frame.show(truncate=False)

# Without 「quoteAll=True」
data_frame.write.mode('overwrite').csv(
  "./out",
  header=True)
# data_frame.write.mode('overwrite').csv(
#   "./out",
#   header=True,
#   quoteAll=True)

Output

+---+--------------+----------+--------------+----------+----+-----+---+
|id |last_name     |first_name|city          |birth_date|year|month|day|
+---+--------------+----------+--------------+----------+----+-----+---+
|1  |Bloggs        |Joe       |Dublin        |2020-12-22|2020|11   |11 |
|2  |Joseph        |Smith     |Salt Lake City|1982-09-11|2020|11   |11 |
|3  |J\            |Mike      |Tokyo         |1991-01-01|2020|11   |11 |
|4  |Kennedy       |John      |Dublin,Ireland|1991-01-01|2020|11   |11 |
|5  |Twain         |Kevin     |Osaka         |1991-01-01|2020|11   |11 |
|6  |A-///\\\///\\\|Smith     |Osaka, \Japan |1991-01-01|2020|11   |11 |
+---+--------------+----------+--------------+----------+----+-----+---+

File output (note 「Osaka, \Japan」: it gets escaped because the value is written quoted)

id,last_name,first_name,city,birth_date,year,month,day
1,Bloggs,Joe,Dublin,2020-12-22,2020,11,11
2,Joseph,Smith,Salt Lake City,1982-09-11,2020,11,11
3,J\,Mike,Tokyo,1991-01-01,2020,11,11
4,Kennedy,John,"Dublin,Ireland",1991-01-01,2020,11,11
5,Twain,Kevin,Osaka,1991-01-01,2020,11,11
6,A-///\\\///\\\,Smith,"Osaka, \\Japan",1991-01-01,2020,11,11

[For comparison] File output with 「quoteAll=True」

"id","last_name","first_name","city","birth_date","year","month","day"
"1","Bloggs","Joe","Dublin","2020-12-22","2020","11","11"
"2","Joseph","Smith","Salt Lake City","1982-09-11","2020","11","11"
"3","J\\","Mike","Tokyo","1991-01-01","2020","11","11"
"4","Kennedy","John","Dublin,Ireland","1991-01-01","2020","11","11"
"5","Twain","Kevin","Osaka","1991-01-01","2020","11","11"
"6","A-///\\\\\\///\\\\\\","Smith","Osaka, \\Japan","1991-01-01","2020","11","11"

【4】[Extra] Reading the file with textFile()

https://dk521123.hatenablog.com/entry/2021/04/06/001709

Use the second approach (spark.read.csv()) described under
「1) RDD => DataFrame」 in the article above.

Sample

from pyspark import SparkContext
from pyspark.sql import SparkSession

spark_context = SparkContext()
spark = SparkSession(spark_context)

rdd = spark_context.textFile("hello_world.csv")

# Clean / validate the data here (omitted)

# spark.read.csv() also accepts an RDD of strings
data_frame = spark.read.csv(
  rdd, sep=",", escape="\"", header=True)

data_frame.show(truncate=False)

# Extra: what happens if we write the file back out
data_frame.write.mode('overwrite').csv(
  "./out",
  header=True,
  quoteAll=True)

Related articles

PySpark ~ Setup + Hello World / Windows ~
https://dk521123.hatenablog.com/entry/2019/11/14/221126
PySpark ~ CSV / Basics ~
https://dk521123.hatenablog.com/entry/2019/11/24/225534
PySpark ~ CSV / Read & Write properties ~
https://dk521123.hatenablog.com/entry/2020/07/30/195226
PySpark ~ CSV / Handling white space ~
https://dk521123.hatenablog.com/entry/2021/04/29/075903
PySpark ~ Handling data containing escaped delimiters ~
https://dk521123.hatenablog.com/entry/2020/07/09/000832
PySpark ~ RDD / Miscellaneous ~
https://dk521123.hatenablog.com/entry/2021/04/06/001709
PySpark ~ Miscellaneous ~
https://dk521123.hatenablog.com/entry/2020/08/28/183706
Pandas ~ Introduction ~
https://dk521123.hatenablog.com/entry/2019/10/22/014957