【分散処理】PySpark ~ CSV / MultiLine対応 ~

■ はじめに

https://dk521123.hatenablog.com/entry/2019/11/24/225534
https://dk521123.hatenablog.com/entry/2020/07/30/195226

の続き。

 今回は、PySparkにおいて、
CSVなどで改行が入った時の複数行(MultiLine)の対応について
触れたので、メモしておく

目次

【1】対応方法
【2】サンプル
 例1:spark.read.csv()
 例2:spark.read.format("csv").load()
【3】API仕様

【1】対応方法

* 「multiLine=True」を指定する

【2】サンプル

例1:spark.read.csv()

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.utils import AnalysisException


spark_context = SparkContext()
spark = SparkSession(spark_context)

try:
  print("multiLine=True")
  data_frame = spark.read.csv("demo_multi.csv", header=True, multiLine=True)
  data_frame.show()

  print("*********")
  print("multiLine指定なし")
  data_frame = spark.read.csv("demo_multi.csv", header=True)
  data_frame.show()
except AnalysisException as ex:
  print("AnalysisException = " + str(ex))
  raise ex
except Exception as ex:
  print("Exception = " + str(ex))
  raise ex

テストデータ「demo_multi.csv

id,name,remarks,date
1,Mike,"Multi
Line1",2021-02-04
2,Tom,"Multi
Line2",2021-02-04
3,Smith,NonMulti,2021-02-04

出力結果

multiLine=True
+---+-----+-----------+----------+
| id| name|    remarks|      date|
+---+-----+-----------+----------+
|  1| Mike|Multi
Line1|2021-02-04|
|  2|  Tom|Multi
Line2|2021-02-04|
|  3|Smith|   NonMulti|2021-02-04|
+---+-----+-----------+----------+

*********
multiLine指定なし
+------+----------+--------+----------+
|    id|      name| remarks|      date|
+------+----------+--------+----------+
|     1|      Mike|   Multi|      null|
|Line1"|2021-02-04|    null|      null|
|     2|       Tom|   Multi|      null|
|Line2"|2021-02-04|    null|      null|
|     3|     Smith|NonMulti|2021-02-04|
+------+----------+--------+----------+

例2:spark.read.format("csv").load()

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.utils import AnalysisException


spark_context = SparkContext()
spark = SparkSession(spark_context)

try:
  options = {"header": True, "multiLine": True}

  print("multiLine=False")
  data_frame = spark.read.format("csv").load("demo_multi.csv", **options)
  data_frame.show()
except AnalysisException as ex:
  print("AnalysisException = " + str(ex))
  raise ex
except Exception as ex:
  print("Exception = " + str(ex))
  raise ex

【3】API仕様

https://spark.apache.org/docs/latest/sql-data-sources-csv.html

Property Name Default Meaning Scope
multiLine false Parse one record, which may span multiple lines, per file. read

参考文献

https://qiita.com/kazz_ogawa/items/92e21a825e9ca2413f99
https://sparkbyexamples.com/spark/spark-read-multiline-multiple-line-csv-file/

関連記事

PySpark ~ 環境設定 ~
https://dk521123.hatenablog.com/entry/2019/11/14/221126
PySpark ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/04/03/004254
PySpark ~ CSV / 基本編 ~
https://dk521123.hatenablog.com/entry/2019/11/24/225534
PySpark ~ CSV / White Spaceの扱い ~
https://dk521123.hatenablog.com/entry/2021/04/29/075903
PySpark ~ CSV / Null・空文字・異常値の扱い ~
https://dk521123.hatenablog.com/entry/2021/06/01/142457
PySpark ~ CSV / escape ~
https://dk521123.hatenablog.com/entry/2020/11/23/224349
PySpark ~ CSV / Read/Writeのプロパティ ~
https://dk521123.hatenablog.com/entry/2020/07/30/195226
PySpark ~ パーティション単位で上書きするには ~
https://dk521123.hatenablog.com/entry/2021/07/07/093147
PySpark ~ 出力ファイル / 空ファイル対応, 1ファイルに纏める ~
https://dk521123.hatenablog.com/entry/2021/04/03/004254
PySpark ~ エスケープされた区切り文字が含んだデータを扱う ~
https://dk521123.hatenablog.com/entry/2020/07/09/000832
PySpark でエラー「Exception: It appears ...」が表示された
https://dk521123.hatenablog.com/entry/2020/07/10/192404