■ はじめに
https://dk521123.hatenablog.com/entry/2021/06/01/142457
の続き。 PySpark で 「columnNameOfCorruptRecord」とか「_corrupt_record」など でてきたので、調べてみた。
目次
【1】Corrupt Record 1)何ができる? 2)使用用途 【2】使用方法 1)手順 【3】サンプル 例1:CSVファイル 【4】使用上の注意 1)使用できるSparkバージョン 2)使用可能なファイル種類およびモード 3)cache() する必要がある 4)データ型(特に数字)の指定に注意
【1】Corrupt Record
cf. corrupt = 堕落した、邪悪な、〔政治的に〕腐敗した =>(ここでの意味は)〔データが〕破損した * 以下のサンプルにあるように想定したスキーマした定義と 異なるものを見つけた場合に、 その行の「_corrupt_record」に破損したデータを追加してくれる => 言葉よりも、後述の「サンプル」をみてもらった方が理解は早いと思う
1)何ができる?
* 破損したデータレコードについて、検知できる
2)使用用途
* データ検証および破損データの排除
【2】使用方法
が詳しい。
1)手順
[1] スキーマを定義し、その際「_corrupt_record」を追加
schema = StructType([ StructField("employee_id", LongType(), True), StructField("employee_name", StringType(), True), StructField("age", IntegerType(), True), StructField("country", StringType(), True), StructField("_corrupt_record", StringType(), True) # ★注目★ ])
[2] columnNameOfCorruptRecord=_corrupt_recordを指定する
options = { "header": True, # [1] で指定したSchemaを設定 "schema": schema, # ★注目 "columnNameOfCorruptRecord": "_corrupt_record" } data_frame = spark.read.format("csv").load("employee.csv", **options)
【3】サンプル
例1:CSVファイル
from itertools import count from pyspark import SparkContext from pyspark.sql import SparkSession from pyspark.sql.utils import AnalysisException from pyspark.sql.types import StructType from pyspark.sql.types import StructField from pyspark.sql.types import LongType from pyspark.sql.types import IntegerType from pyspark.sql.types import StringType from pyspark.sql.functions import col spark_context = SparkContext() spark = SparkSession(spark_context) try: schema = StructType([ StructField("employee_id", LongType(), True), StructField("employee_name", StringType(), True), StructField("age", IntegerType(), True), StructField("country", StringType(), True), StructField("_corrupt_record", StringType(), True) ]) options = { "header": True, "schema": schema, "columnNameOfCorruptRecord": "_corrupt_record" } print("Start!") data_frame = spark.read.format("csv").load("employee.csv", **options) data_frame.show(truncate=False) print("For _corrupt_record") # !! 重要 !! cache() を使う # (詳細は後述「3)cache() する必要がある」を参照) data_frame.cache() data_frame.select("_corrupt_record").show() # 全部省略せずに出力したい場合は、truncate=Falseを指定(cf. truncate = 切り捨てる) # https://dk521123.hatenablog.com/entry/2021/04/26/161342 より # data_frame.select("_corrupt_record").show(n=100, truncate=False, vertical=True) count = data_frame.filter(data_frame._corrupt_record.isNotNull()).count() print(f"Count = {count}") after_cleaning_df = data_frame.where(col("_corrupt_record").isNull()).drop("_corrupt_record") print("After cleaning") after_cleaning_df.show(truncate=False) print("Done...") except AnalysisException as ex: print("AnalysisException = " + str(ex)) raise ex except Exception as ex: print("Exception = " + str(ex)) raise ex
CSVデータ - employee.csv(「10004」がカンマ一個多い)
employee_id,employee_name,age,country 10000,Mike,23,USA 10001,Tom,33,UK 10002,Lee,43,HongKong 10003,Ken,53,Australia 10004,Alen,63,India,Mumbai 10005,Sam,63,China 10006,Smith,63,Japan
出力結果
Start! +-----------+-------------+---+---------+--------------------------+ |employee_id|employee_name|age|country |_corrupt_record | +-----------+-------------+---+---------+--------------------------+ |10000 |Mike |23 |USA |null | |10001 |Tom |33 |UK |null | |10002 |Lee |43 |HongKong |null | |10003 |Ken |53 |Australia|null | |10004 |Alen |63 |India |10004,Alen,63,India,Mumbai| |10005 |Sam |63 |China |null | |10006 |Smith |63 |Japan |null | +-----------+-------------+---+---------+--------------------------+ For _corrupt_record +--------------------+ | _corrupt_record| +--------------------+ | null| | null| | null| | null| |10004,Alen,63,Ind...| | null| | null| +--------------------+ Count = 1 After cleaning +-----------+-------------+---+---------+ |employee_id|employee_name|age|country | +-----------+-------------+---+---------+ |10000 |Mike |23 |USA | |10001 |Tom |33 |UK | |10002 |Lee |43 |HongKong | |10003 |Ken |53 |Australia| |10005 |Sam |63 |China | |10006 |Smith |63 |Japan | +-----------+-------------+---+---------+ Done...
【4】使用上の注意
http://mogile.web.fc2.com/spark/spark240/sql-migration-guide-upgrade.html
https://spark.apache.org/docs/3.0.0-preview/api/java/org/apache/spark/sql/streaming/DataStreamReader.html
に記載されている事項とハマった事項をまとめておく。
1)使用できるSparkバージョン
* Spark 2.3以降である必要ある => 以下を参照
http://mogile.web.fc2.com/spark/spark240/sql-migration-guide-upgrade.html
2)使用可能なファイル種類およびモード
* 使用可能なファイル種類:CSV/JSON * 使用可能なモード:mode = "PERMISSIVE" で使用可能 => デフォルトが「PERMISSIVE」なんであまり意識しないかもしれないが
3)cache() する必要がある
http://mogile.web.fc2.com/spark/spark240/sql-migration-guide-upgrade.html
パースされた結果をキャッシュあるいは保存する必要がある
サンプルより抜粋
# !! 重要 !! cache() を使う
data_frame.cache()
4)データ型(特に数字)の指定に注意
* 指定したデータ型も動作に影響して、 特に、数字(整数or実数)で動きが変わってしまうので注意 => この調査で、半日つぶした、、、 => 整数なら「LongType」or「IntegerType」。実数なら「FloatType」を明示的に指定 (どっちも来る可能性があるなら「FloatType」にしていれば、受け入れは可能)
実験
* 上記のサンプルで使用したデータ「employee.csv」の一部を 以下のように変更して実行してみる
【修正後】(数字「63」⇒「63.0」に変更)
10006,Smith,63.0,Japan
【実行結果(一部抜粋)】
For _corrupt_record +--------------------+ | _corrupt_record| +--------------------+ | null| | null| | null| | null| |10004,Alen,63,Ind...| | null| |10006,Smith,63.0,...| +--------------------+ Count = 2 After cleaning +-----------+-------------+---+---------+ |employee_id|employee_name|age|country | +-----------+-------------+---+---------+ |10000 |Mike |23 |USA | |10001 |Tom |33 |UK | |10002 |Lee |43 |HongKong | |10003 |Ken |53 |Australia| |10005 |Sam |63 |China | +-----------+-------------+---+---------+ Done...
関連記事
PySpark ~ 環境設定 ~
https://dk521123.hatenablog.com/entry/2019/11/14/221126
PySpark ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/04/03/004254
PySpark ~ CSV / 基本編 ~
https://dk521123.hatenablog.com/entry/2019/11/24/225534
PySpark ~ DataFrame / show() ~
https://dk521123.hatenablog.com/entry/2021/04/26/161342
PySpark ~ CSV / White Spaceの扱い ~
https://dk521123.hatenablog.com/entry/2021/04/29/075903
PySpark ~ CSV / Null・空文字・異常値の扱い ~
https://dk521123.hatenablog.com/entry/2021/06/01/142457
PySpark ~ CSV / escape ~
https://dk521123.hatenablog.com/entry/2020/11/23/224349
PySpark ~ CSV / Read/Writeのプロパティ ~
https://dk521123.hatenablog.com/entry/2020/07/30/195226
PySpark ~ パーティション単位で上書きするには ~
https://dk521123.hatenablog.com/entry/2021/07/07/093147
PySpark ~ 出力ファイル / 空ファイル対応, 1ファイルに纏める ~
https://dk521123.hatenablog.com/entry/2021/04/03/004254
PySpark ~ エスケープされた区切り文字が含んだデータを扱う ~
https://dk521123.hatenablog.com/entry/2020/07/09/000832
PySpark でエラー「Exception: It appears ...」が表示された
https://dk521123.hatenablog.com/entry/2020/07/10/192404