■ はじめに
https://dk521123.hatenablog.com/entry/2019/11/24/225534
で、PySpark で、CSVファイルを扱った。 ただ、以下のサイト「Spark 2.0 Scala - Read csv files with escaped delimiters」
https://stackoverrun.com/ja/q/10868798
のように、項目「City」に エスケープ付きのカンマ区切りのデータ(e.g. Dublin\,Ireland)が 入ったデータを以下の【期待動作】のようにする方法を考える
【ファイル】
Joe Bloggs,Dublin\,Ireland Joseph Smith,Salt Lake City\,
【期待動作】
Name | City -----------------|--------------- Joe Bloggs | Dublin,Ireland Joseph Smith | Salt Lake City,
目次
【1】案1:別の文字列に置き換えてパースしてから元に戻す サンプル 補足:textFile() について 【2】案2:一旦取り込んでから 不要な項目はdropする サンプル
【1】案1:別の文字列に置き換えてパースしてから元に戻す
以下の「参考になるサイト」のように、 一旦、「\,」を別の文字列に置き換えて パースしてから、元に戻す
参考になるサイト
https://stackoverflow.com/questions/59831393/pyspark-escape-backslash-and-delimiter-when-reading-csv
https://stackoverflow.com/questions/53672800/spark-to-parse-backslash-escaped-comma-in-csv-files-that-are-not-enclosed-by-quo
サンプル
from pyspark import SparkContext from pyspark.sql import SparkSession from pyspark.sql.functions import regexp_replace from pyspark.sql.types import StringType, StructField, StructType DELIMITER = "," DELIMITER_WITH_ESCAPE = "\\" + DELIMITER REPLACED_CHAR = "@@@DELIMITER@@@" spark_context = SparkContext() spark = SparkSession(spark_context) # CSV をテキストとして読み込む rdd = spark_context.textFile("./hello.csv") # エスケープされた区切り文字「\,」を一旦別の文字に置き換える rdd = rdd.map( lambda line: line.replace( DELIMITER_WITH_ESCAPE, REPLACED_CHAR).split(DELIMITER)) headers = rdd.first() # ヘッダーを一旦消す rdd = rdd.filter(lambda row: row != headers) columns = [StructField(item, StringType(), True) \ for item in headers] schema = StructType(columns) data_frame = spark.createDataFrame(rdd, schema) # 全項目に対して、エスケープされた区切り文字「\,」を元に戻す for field in schema: data_frame = data_frame.withColumn( field.name, regexp_replace( field.name, REPLACED_CHAR, DELIMITER_WITH_ESCAPE)) data_frame.show() # CSVに囲み文字付き(quoteAll=True)で保存しておく data_frame.write.mode('overwrite').csv( "./out", header=True, quoteAll=True)
入力ファイル
id,name,city,birth_date 1,Joe Bloggs,Dublin\,Ireland,2020-12-22 2,Joseph Smith,Salt Lake City\,,1982-09-11 3,Mike,hello\,world\,Mike!,1991-01-01
出力結果
+---+------------+-----------------+----------+ | id| name| city|birth_date| +---+------------+-----------------+----------+ | 1| Joe Bloggs| Dublin,Ireland|2020-12-22| | 2|Joseph Smith| Salt Lake City,|1982-09-11| | 3| Mike|hello,world,Mike!|1991-01-01| +---+------------+-----------------+----------+
補足:textFile() について
* ファイルの各行をそのまま取り込み、 RDD(Resilient Distributed Dataset)として返却してくれる => RDD については、以下の関連記事を参照のこと
https://dk521123.hatenablog.com/entry/2021/04/04/111057
API仕様
https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.SparkContext.textFile.html
参考文献
https://dev.classmethod.jp/articles/pyspark-read-csv-recursive/
【2】案2:一旦取り込んでから 不要な項目はdropする
https://stackoverflow.com/questions/54900213/escape-comma-inside-a-csv-file-using-spark-shell
に別の実装があったので、PySparkで書き直してみる。 (これは、エスケープされていない)
【データ】
s.no,name,Country 101,xyz,India,IN 102,abc,UnitedStates,US
【サンプル】
df = spark.read .option("header", true) .schema(schema) .csv("data.csv") .withColumn("Country" , concat ($"country", lit(", "), $"country1")) .drop("country1") df.show(false)
欠点
ただし、この方法は、カンマの数が決まっていないとできない
サンプル
from pyspark import SparkContext from pyspark.sql import SparkSession from pyspark.sql.types import StringType, StructField, StructType from pyspark.sql.functions import concat, col, lit spark_context = SparkContext() spark = SparkSession(spark_context) header = ["s.no", "name", "Country", "Dummy"] columns = [StructField(item, StringType(), True) for item in header] schema = StructType(columns) data_frame = spark.read \ .option("header", True) \ .schema(schema) \ .csv("./info.csv") \ .withColumn("Country", concat(col("Country"), lit(", "), col("Dummy"))) \ .drop("Dummy") data_frame.show()
入力ファイル
s.no,name,Country 101,xyz,India,IN 102,abc,UnitedStates,US
出力結果
+----+----+----------------+ |s.no|name| Country| +----+----+----------------+ | 101| xyz| India, IN| | 102| abc|UnitedStates, US| +----+----+----------------+
関連記事
PySpark ~ 環境設定編 ~
https://dk521123.hatenablog.com/entry/2019/11/14/221126
PySpark ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/04/03/004254
PySpark ~ RDD / 基本編 ~
https://dk521123.hatenablog.com/entry/2021/04/04/111057
PySpark ~ CSV編 ~
https://dk521123.hatenablog.com/entry/2019/11/24/225534
PySpark ~ CSV / escape 編 ~
https://dk521123.hatenablog.com/entry/2020/11/23/224349
PySpark ~ CSV / あれこれ編 ~
https://dk521123.hatenablog.com/entry/2020/07/30/195226
PySpark ~ DataFrame / データ操作編 ~
https://dk521123.hatenablog.com/entry/2020/01/04/150942
PySpark ~ DataFrame / テーブル・項目操作編 ~
https://dk521123.hatenablog.com/entry/2020/05/18/154829
PySpark ~ データをクリーニングする ~
https://dk521123.hatenablog.com/entry/2020/07/08/162552
PySpark ~ Hive ~
https://dk521123.hatenablog.com/entry/2020/05/14/222415
PySpark ~ PySpark経由でDBに接続する ~
https://dk521123.hatenablog.com/entry/2020/07/02/000000
PySpark でエラー「Exception: It appears ...」が表示された
https://dk521123.hatenablog.com/entry/2020/07/10/192404
Python ~ 基本編 / 辞書 ~
https://dk521123.hatenablog.com/entry/2019/10/27/100014