【分散処理】PySpark ~ エスケープされた区切り文字が含んだデータを扱う ~

■ はじめに

https://dk521123.hatenablog.com/entry/2019/11/24/225534

で、PySpark で、CSVファイルを扱った。

ただ、以下のサイト「Spark 2.0 Scala - Read csv files with escaped delimiters」

https://stackoverrun.com/ja/q/10868798

のように、項目「City」に
エスケープ付きのカンマ区切りのデータ(e.g. Dublin\,Ireland)が
入ったデータを以下の【期待動作】のようにする方法を考える

【ファイル】

Joe Bloggs,Dublin\,Ireland
Joseph Smith,Salt Lake City\,

【期待動作】

  Name           |       City
-----------------|---------------
Joe Bloggs       | Dublin,Ireland
Joseph Smith     | Salt Lake City,

目次

【1】案1:別の文字列に置き換えてパースしてから元に戻す
 サンプル
 補足:textFile() について
【2】案2:一旦取り込んでから 不要な項目はdropする
 サンプル

【1】案1:別の文字列に置き換えてパースしてから元に戻す

以下の「参考になるサイト」のように、
一旦、「\,」を別の文字列に置き換えて
パースしてから、元に戻す

参考になるサイト
https://stackoverflow.com/questions/59831393/pyspark-escape-backslash-and-delimiter-when-reading-csv
https://stackoverflow.com/questions/53672800/spark-to-parse-backslash-escaped-comma-in-csv-files-that-are-not-enclosed-by-quo

サンプル

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_replace
from pyspark.sql.types import StringType, StructField, StructType

DELIMITER = ","
DELIMITER_WITH_ESCAPE = "\\" + DELIMITER
REPLACED_CHAR = "@@@DELIMITER@@@"

spark_context = SparkContext()
spark = SparkSession(spark_context)

# CSV をテキストとして読み込む
rdd = spark_context.textFile("./hello.csv")
# エスケープされた区切り文字「\,」を一旦別の文字に置き換える
rdd = rdd.map(
  lambda line: line.replace(
    DELIMITER_WITH_ESCAPE, REPLACED_CHAR).split(DELIMITER))

headers = rdd.first()
# ヘッダーを一旦消す
rdd = rdd.filter(lambda row: row != headers)

columns = [StructField(item, StringType(), True) \
  for item in headers]
schema = StructType(columns)
data_frame = spark.createDataFrame(rdd, schema)

# 全項目に対して、エスケープされた区切り文字「\,」を元に戻す
for field in schema:
  data_frame = data_frame.withColumn(
    field.name, regexp_replace(
      field.name, REPLACED_CHAR, DELIMITER_WITH_ESCAPE))
data_frame.show()

# CSVに囲み文字付き(quoteAll=True)で保存しておく
data_frame.write.mode('overwrite').csv(
  "./out",
  header=True,
  quoteAll=True)

入力ファイル

id,name,city,birth_date
1,Joe Bloggs,Dublin\,Ireland,2020-12-22
2,Joseph Smith,Salt Lake City\,,1982-09-11
3,Mike,hello\,world\,Mike!,1991-01-01

出力結果

+---+------------+-----------------+----------+
| id|        name|             city|birth_date|
+---+------------+-----------------+----------+
|  1|  Joe Bloggs|   Dublin,Ireland|2020-12-22|
|  2|Joseph Smith|  Salt Lake City,|1982-09-11|
|  3|        Mike|hello,world,Mike!|1991-01-01|
+---+------------+-----------------+----------+

補足:textFile() について

* ファイルの各行をそのまま取り込み、
 RDD(Resilient Distributed Dataset)として返却してくれる
 => RDD については、以下の関連記事を参照のこと

https://dk521123.hatenablog.com/entry/2021/04/04/111057
API仕様
https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.SparkContext.textFile.html
参考文献
https://dev.classmethod.jp/articles/pyspark-read-csv-recursive/

【2】案2:一旦取り込んでから 不要な項目はdropする

https://stackoverflow.com/questions/54900213/escape-comma-inside-a-csv-file-using-spark-shell

に別の実装があったので、PySparkで書き直してみる。
(これは、エスケープされていない)

【データ】

s.no,name,Country
101,xyz,India,IN
102,abc,UnitedStates,US

【サンプル】

df = spark.read
  .option("header", true)
  .schema(schema)
  .csv("data.csv")
  .withColumn("Country" ,
    concat ($"country", lit(", "), $"country1"))
  .drop("country1")

df.show(false)

欠点

ただし、この方法は、カンマの数が決まっていないとできない

サンプル

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, StructField, StructType
from pyspark.sql.functions import concat, col, lit

spark_context = SparkContext()
spark = SparkSession(spark_context)
header = ["s.no", "name", "Country", "Dummy"]

columns = [StructField(item, StringType(), True) for item in header]
schema = StructType(columns)
data_frame = spark.read \
  .option("header", True) \
  .schema(schema) \
  .csv("./info.csv") \
  .withColumn("Country",
    concat(col("Country"), lit(", "), col("Dummy"))) \
  .drop("Dummy")
data_frame.show()

入力ファイル

s.no,name,Country
101,xyz,India,IN
102,abc,UnitedStates,US

出力結果

+----+----+----------------+
|s.no|name|         Country|
+----+----+----------------+
| 101| xyz|       India, IN|
| 102| abc|UnitedStates, US|
+----+----+----------------+

関連記事

PySpark ~ 環境設定編 ~
https://dk521123.hatenablog.com/entry/2019/11/14/221126
PySpark ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/04/03/004254
PySpark ~ RDD / 基本編 ~
https://dk521123.hatenablog.com/entry/2021/04/04/111057
PySpark ~ CSV編 ~
https://dk521123.hatenablog.com/entry/2019/11/24/225534
PySpark ~ CSV / escape 編 ~
https://dk521123.hatenablog.com/entry/2020/11/23/224349
PySpark ~ CSV / あれこれ編 ~
https://dk521123.hatenablog.com/entry/2020/07/30/195226
PySpark ~ DataFrame / データ操作編 ~
https://dk521123.hatenablog.com/entry/2020/01/04/150942
PySpark ~ DataFrame / テーブル・項目操作編 ~
https://dk521123.hatenablog.com/entry/2020/05/18/154829
PySpark ~ データをクリーニングする ~
https://dk521123.hatenablog.com/entry/2020/07/08/162552
PySpark ~ Hive ~
https://dk521123.hatenablog.com/entry/2020/05/14/222415
PySpark ~ PySpark経由でDBに接続する ~
https://dk521123.hatenablog.com/entry/2020/07/02/000000
PySpark でエラー「Exception: It appears ...」が表示された
https://dk521123.hatenablog.com/entry/2020/07/10/192404
Python ~ 基本編 / 辞書 ~
https://dk521123.hatenablog.com/entry/2019/10/27/100014