【分散処理】PySpark ~ _corrupt_record ~

■ はじめに

https://dk521123.hatenablog.com/entry/2021/06/01/142457

の続き。

 PySpark で
 「columnNameOfCorruptRecord」とか「_corrupt_record」など
でてきたので、調べてみた。

目次

【1】Corrupt Record
 1)何ができる?
 2)使用用途
【2】使用方法
 1)手順
【3】サンプル
 例1:CSVファイル
【4】使用上の注意
 1)使用できるSparkバージョン
 2)使用可能なファイル種類およびモード
 3)cache() する必要がある
 4)データ型(特に数字)の指定に注意

【1】Corrupt Record

cf. corrupt = 堕落した、邪悪な、〔政治的に〕腐敗した
 =>(ここでの意味は)〔データが〕破損した

* 以下のサンプルにあるように想定したスキーマした定義と
 異なるものを見つけた場合に、
 その行の「_corrupt_record」に破損したデータを追加してくれる
 => 言葉よりも、後述の「サンプル」をみてもらった方が理解は早いと思う

1)何ができる?

* 破損したデータレコードについて、検知できる

2)使用用途

* データ検証および破損データの排除

【2】使用方法

https://medium.com/@sasidharan-r/how-to-handle-corrupt-or-bad-record-in-apache-spark-custom-logic-pyspark-aws-430ddec9bb41

が詳しい。

1)手順

[1] スキーマを定義し、その際「_corrupt_record」を追加

  schema = StructType([
    StructField("employee_id", LongType(), True),
    StructField("employee_name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("country", StringType(), True),
    StructField("_corrupt_record", StringType(), True) # ★注目★
  ])

[2] columnNameOfCorruptRecord=_corrupt_recordを指定する

  options = {
    "header": True,
    # [1] で指定したSchemaを設定
    "schema": schema,
    # ★注目
    "columnNameOfCorruptRecord": "_corrupt_record"
  }

  data_frame = spark.read.format("csv").load("employee.csv", **options)

【3】サンプル

例1:CSVファイル

from itertools import count
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.utils import AnalysisException

from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import LongType
from pyspark.sql.types import IntegerType
from pyspark.sql.types import StringType

from pyspark.sql.functions import col


spark_context = SparkContext()
spark = SparkSession(spark_context)

try:
  schema = StructType([
    StructField("employee_id", LongType(), True),
    StructField("employee_name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("country", StringType(), True),
    StructField("_corrupt_record", StringType(), True)
  ])

  options = {
    "header": True,
    "schema": schema,
    "columnNameOfCorruptRecord": "_corrupt_record"
  }

  print("Start!")

  data_frame = spark.read.format("csv").load("employee.csv", **options)
  data_frame.show(truncate=False)

  print("For _corrupt_record")

  # !! 重要 !! cache() を使う
  #  (詳細は後述「3)cache() する必要がある」を参照)
  data_frame.cache()

  data_frame.select("_corrupt_record").show()
  # 全部省略せずに出力したい場合は、truncate=Falseを指定(cf. truncate = 切り捨てる)
  # https://dk521123.hatenablog.com/entry/2021/04/26/161342  より
  # data_frame.select("_corrupt_record").show(n=100, truncate=False, vertical=True)

  count = data_frame.filter(data_frame._corrupt_record.isNotNull()).count()
  print(f"Count = {count}")

  after_cleaning_df = data_frame.where(col("_corrupt_record").isNull()).drop("_corrupt_record")
  print("After cleaning")
  after_cleaning_df.show(truncate=False)

  print("Done...")
except AnalysisException as ex:
  print("AnalysisException = " + str(ex))
  raise ex
except Exception as ex:
  print("Exception = " + str(ex))
  raise ex

CSVデータ - employee.csv(「10004」がカンマ一個多い)

employee_id,employee_name,age,country
10000,Mike,23,USA
10001,Tom,33,UK
10002,Lee,43,HongKong
10003,Ken,53,Australia
10004,Alen,63,India,Mumbai
10005,Sam,63,China
10006,Smith,63,Japan

出力結果

Start!
+-----------+-------------+---+---------+--------------------------+
|employee_id|employee_name|age|country  |_corrupt_record           |
+-----------+-------------+---+---------+--------------------------+
|10000      |Mike         |23 |USA      |null                      |
|10001      |Tom          |33 |UK       |null                      |
|10002      |Lee          |43 |HongKong |null                      |
|10003      |Ken          |53 |Australia|null                      |
|10004      |Alen         |63 |India    |10004,Alen,63,India,Mumbai|
|10005      |Sam          |63 |China    |null                      |
|10006      |Smith        |63 |Japan    |null                      |
+-----------+-------------+---+---------+--------------------------+

For _corrupt_record
+--------------------+
|     _corrupt_record|
+--------------------+
|                null|
|                null|
|                null|
|                null|
|10004,Alen,63,Ind...|
|                null|
|                null|
+--------------------+

Count = 1
After cleaning
+-----------+-------------+---+---------+
|employee_id|employee_name|age|country  |
+-----------+-------------+---+---------+
|10000      |Mike         |23 |USA      |
|10001      |Tom          |33 |UK       |
|10002      |Lee          |43 |HongKong |
|10003      |Ken          |53 |Australia|
|10005      |Sam          |63 |China    |
|10006      |Smith        |63 |Japan    |
+-----------+-------------+---+---------+

Done...

【4】使用上の注意

http://mogile.web.fc2.com/spark/spark240/sql-migration-guide-upgrade.html
https://spark.apache.org/docs/3.0.0-preview/api/java/org/apache/spark/sql/streaming/DataStreamReader.html

に記載されている事項とハマった事項をまとめておく。

1)使用できるSparkバージョン

* Spark 2.3以降である必要ある
 => 以下を参照

http://mogile.web.fc2.com/spark/spark240/sql-migration-guide-upgrade.html

2)使用可能なファイル種類およびモード

* 使用可能なファイル種類:CSV/JSON
* 使用可能なモード:mode = "PERMISSIVE" で使用可能
 => デフォルトが「PERMISSIVE」なんであまり意識しないかもしれないが

3)cache() する必要がある

http://mogile.web.fc2.com/spark/spark240/sql-migration-guide-upgrade.html

パースされた結果をキャッシュあるいは保存する必要がある

サンプルより抜粋

  # !! 重要 !! cache() を使う
  data_frame.cache()

4)データ型(特に数字)の指定に注意

* 指定したデータ型も動作に影響して、
 特に、数字(整数or実数)で動きが変わってしまうので注意
 => この調査で、半日つぶした、、、
 => 整数なら「LongType」or「IntegerType」。実数なら「FloatType」を明示的に指定
  (どっちも来る可能性があるなら「FloatType」にしていれば、受け入れは可能)

実験

* 上記のサンプルで使用したデータ「employee.csv」の一部を
 以下のように変更して実行してみる

【修正後】(数字「63」⇒「63.0」に変更)

10006,Smith,63.0,Japan

【実行結果(一部抜粋)】

For _corrupt_record
+--------------------+
|     _corrupt_record|
+--------------------+
|                null|
|                null|
|                null|
|                null|
|10004,Alen,63,Ind...|
|                null|
|10006,Smith,63.0,...|
+--------------------+

Count = 2
After cleaning
+-----------+-------------+---+---------+
|employee_id|employee_name|age|country  |
+-----------+-------------+---+---------+
|10000      |Mike         |23 |USA      |
|10001      |Tom          |33 |UK       |
|10002      |Lee          |43 |HongKong |
|10003      |Ken          |53 |Australia|
|10005      |Sam          |63 |China    |
+-----------+-------------+---+---------+

Done...

関連記事

PySpark ~ 環境設定 ~
https://dk521123.hatenablog.com/entry/2019/11/14/221126
PySpark ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/04/03/004254
PySpark ~ CSV / 基本編 ~
https://dk521123.hatenablog.com/entry/2019/11/24/225534
PySpark ~ DataFrame / show() ~
https://dk521123.hatenablog.com/entry/2021/04/26/161342
PySpark ~ CSV / White Spaceの扱い ~
https://dk521123.hatenablog.com/entry/2021/04/29/075903
PySpark ~ CSV / Null・空文字・異常値の扱い ~
https://dk521123.hatenablog.com/entry/2021/06/01/142457
PySpark ~ CSV / escape ~
https://dk521123.hatenablog.com/entry/2020/11/23/224349
PySpark ~ CSV / Read/Writeのプロパティ ~
https://dk521123.hatenablog.com/entry/2020/07/30/195226
PySpark ~ パーティション単位で上書きするには ~
https://dk521123.hatenablog.com/entry/2021/07/07/093147
PySpark ~ 出力ファイル / 空ファイル対応, 1ファイルに纏める ~
https://dk521123.hatenablog.com/entry/2021/04/03/004254
PySpark ~ エスケープされた区切り文字が含んだデータを扱う ~
https://dk521123.hatenablog.com/entry/2020/07/09/000832
PySpark でエラー「Exception: It appears ...」が表示された
https://dk521123.hatenablog.com/entry/2020/07/10/192404