【分散処理】PySpark ~ CSV / escape 編 ~

■ はじめに

https://dk521123.hatenablog.com/entry/2019/11/24/225534
https://dk521123.hatenablog.com/entry/2020/07/30/195226
https://dk521123.hatenablog.com/entry/2020/07/09/000832

の続き。

PandasでCSVを正しくパースされていたファイルが
PySparkで表示しようとしたらデータが崩れてしまったので
調べてみたら、escapeが関連してそうなのでメモ。

■ 今回の例

* 以下 入力ファイル において、、、

【id=3】

 A) PySpark (「1)PySpark の場合(NGケース)」)では
 「"J\"」でエスケープしてしまい、うまくデータが取れない
 B) Pandas (2)Pandas の場合(OKケース))では、問題なし

【id=4】

 A) PySpark (「1)PySpark の場合(NGケース)」)では
 「""Dublin,Ireland"」でカンマを区切り文字として認識してしまい、
 データがぐれてしまう
 B) Pandas (2)Pandas の場合(OKケース))では、問題なし

入力ファイル:hello_world.csv

id,last_name,first_name,city,birth_date,year,month,day
"1","Bloggs","Joe","Dublin","2020-12-22","2020","11","11"
"2","Joseph","Smith","Salt Lake City","1982-09-11","2020","11","11"
"3","J\","Mike","Tokyo","1991-01-01","2020","11","11"
"4","Kennedy","John","""Dublin,Ireland","1991-01-01","2020","11","11"
"5","Twain","Kevin","Osaka","1991-01-01","2020","11","11"

1)PySpark の場合(NGケース)

from pyspark import SparkContext
from pyspark.sql import SparkSession

def main():
  spark_context = SparkContext()
  spark = SparkSession(spark_context)

  data_frame = spark.read.csv(
    "hello_world.csv", header=True)
  data_frame.show(truncate=False)

if __name__ == '__main__':
  main()

出力結果

+----------+----------+----------+--------------+----------+----------+-----+----+
|id        |last_name |first_name|city          |birth_date|year      |month|day |
+----------+----------+----------+--------------+----------+----------+-----+----+
|1         |Bloggs    |Joe       |Dublin        |2020-12-22|2020      |11   |11  |
|2         |Joseph    |Smith     |Salt Lake City|1982-09-11|2020      |11   |11  |
|3         |"J","Mike"|Tokyo     |1991-01-01    |2020      |11        |11   |null|<< ★うまくとれてない★
|4         |Kennedy   |John      |"""Dublin     |Ireland"  |1991-01-01|2020 |11  |<< ★うまくとれてない★
|5         |Twain     |Kevin     |Osaka         |null      |null      |null |null|
|1991-01-01|2020      |11        |11            |null      |null      |null |null|
+----------+----------+----------+--------------+----------+----------+-----+----+

2)Pandas の場合(OKケース)

import pandas as pd

def main():
  pd.set_option('display.max_rows', None)
  pd.set_option('display.max_columns', None)

  data_frame = pd.read_csv('hello_world.csv', encoding='UTF-8')
  print(data_frame)

if __name__ == '__main__':
    main()

出力結果

   id last_name first_name             city  birth_date  year  month  day
0   1    Bloggs        Joe           Dublin  2020-12-22  2020     11   11
1   2    Joseph      Smith   Salt Lake City  1982-09-11  2020     11   11
2   3        J\       Mike            Tokyo  1991-01-01  2020     11   11
3   4   Kennedy       John  "Dublin,Ireland  1991-01-01  2020     11   11
4   5     Twain      Kevin            Osaka  1991-01-01  2020     11   11

■ 今回の解決案

* spark.read.csv() の escape を明示的に指定する
 => 指定しない or escape=None の場合、
  「\」なのでエスケープされてしまうので
  今回の場合、「escape="\""」を指定した

https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=read%20csv

サンプル:PySpark (OKケース)

from pyspark import SparkContext
from pyspark.sql import SparkSession

def main():
  spark_context = SparkContext()
  spark = SparkSession(spark_context)

  data_frame = spark.read.csv(
    # 「, escape="\""」を追加
    "hello_world.csv", header=True, escape="\"")
  data_frame.show(truncate=False)

if __name__ == '__main__':
  main()

出力結果

+---+---------+----------+---------------+----------+----+-----+---+
|id |last_name|first_name|city           |birth_date|year|month|day|
+---+---------+----------+---------------+----------+----+-----+---+
|1  |Bloggs   |Joe       |Dublin         |2020-12-22|2020|11   |11 |
|2  |Joseph   |Smith     |Salt Lake City |1982-09-11|2020|11   |11 |
|3  |J\       |Mike      |Tokyo          |1991-01-01|2020|11   |11 |
|4  |Kennedy  |John      |"Dublin,Ireland|1991-01-01|2020|11   |11 |
|5  |Twain    |Kevin     |Osaka          |1991-01-01|2020|11   |11 |
+---+---------+----------+---------------+----------+----+-----+---+

関連記事

PySpark ~ 環境設定 + Hello World / Windows編 ~
https://dk521123.hatenablog.com/entry/2019/11/14/221126
PySpark ~ CSV / 基本編 ~
https://dk521123.hatenablog.com/entry/2019/11/24/225534
PySpark ~ CSV / あれこれ編 ~
https://dk521123.hatenablog.com/entry/2020/07/30/195226
PySpark ~ エスケープされた区切り文字が含んだデータを扱う ~
https://dk521123.hatenablog.com/entry/2020/07/09/000832
PySpark ~ あれこれ編 ~
https://dk521123.hatenablog.com/entry/2020/08/28/183706
Pandas ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2019/10/22/014957

【トラブル】【Hive】Hive に関するトラブルシューティング

■ はじめに

https://dk521123.hatenablog.com/entry/2020/05/28/175428

の続き。

Hiveで発生したトラブルをまとめる。

目次

【1】エラー「Cannot inspect org.apache.hadoop.io.IntWritable」が発生する
【2】エラー「java.lang.NoClassDefFoundError: scala/collection/Iterable」が発生する
【3】エラー「FAILED: Execution Error, return code 1」が発生する
【4】エラー「ConsistencyException」が表示

【1】エラー「Cannot inspect org.apache.hadoop.io.IntWritable」が発生する

Parquet ファイル から外部参照テーブルを作成したが、
テーブルを表示する際に以下のエラーが表示されて、データが表示できない

エラー内容

[Amazon][HiveJDBCDriver](500312) Error in fetching data rows:
*org.apache.hive.service.cli.HiveSQLException:java.io.IOException:
org.apache.hadoop.hive.ql.metadata.HiveException:java.lang.UnsupportedOperationException:
Cannot inspect org.apache.hadoop.io.IntWritable:25:24;

原因

Parquet ファイル で定義したデータ型(int型)と
外部参照テーブル で定義したデータ型(string型)で不一致を起こしていた。

補足:調査方法

Parquetファイルをstring型で作り直し、
外部参照テーブル で定義したデータ型(string型)で一致させ
SELECTしてデータを表示させる。
 ⇒ 今回の場合、データが不正な形で格納されていた

解決案

* データが正しい形になるようにParquetファイルを作成しなおし、
 データ型も外部テーブルと一致させるように修正

【2】エラー「java.lang.NoClassDefFoundError: scala/collection/Iterable」が発生する

ある環境で動いていたHivQLを実行したら、以下の「エラー内容」が発生した

エラー内容

その1

Exception in thread "main" java.lang.NoClassDefFoundError: scala/collection/Iterable
 ・・・
Caused by: java.lang.ClassNotFoundException: scala.collection.Iterable
 ・・・

その2:別エラー

[HiveJDBCDriver](500051) ERROR processing query/statement.
Error Code: 1, SQL state: org.apache.hive.service.cli.HiveSQLException:
Error while processing statement:
FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.tex.TezTask
   at org....
. Query: SQL COUNT(*) FROM ... 

対応案

エンジンを変更した。

今回の場合、以下のようにした。
(各自の状況で、別のエンジンの値に変更する必要がある)

【修正前】

SET hive.execution.engine=spark;

# エラー内容・その2の場合は、未指定。

【修正後】

# MapReduce エンジンを利用する
SET hive.execution.engine=mr;

hive.execution.engine について

以下の関連記事を参照のこと。

https://dk521123.hatenablog.com/entry/2020/03/04/225943

【3】エラー「FAILED: Execution Error, return code 1」が発生する

https://dk521123.hatenablog.com/entry/2020/07/16/224332

で扱った
MSCK REPAIR TABLE table_name;
を実行した際に、以下の「エラー内容」が表示された

エラー内容

FAILED: Execution Error, return code 1
 from org.apache.hadoop.hive.ql.exec.DDLTask
/mnt/var/.../xxx.hql maximum number of executions has been reached.
run query fail.
command exiting with ret '1'

解決案

# 今回、たまたまだったかもしれないが、
* 不要なファイル「xxxxx_$folder$」を削除する

別解
https://stackoverflow.com/questions/23333075/hive-execution-error-return-code-1-from-org-apache-hadoop-hive-ql-exec-ddltask

# 試してないけど、、、
~~~~
set hive.msck.path.validation=ignore;
MSCK REPAIR TABLE table_name;
~~~~

* hive.msck.path.validation の詳細は、以下の関連記事を参照のこと。

https://dk521123.hatenablog.com/entry/2020/07/16/224332

【4】エラー「ConsistencyException」が表示

 単純なSELECT文を実行した際に、
エラー「ConsistencyException」が表示されてしまった

エラー内容

エラー内容1

[Code: 500312, SQL State: HY000] [Amazon][HiveJDBCDriver] (500312)
Error in fetching data rows:
*org.apache.hive.service.cli.HiveSQLException; java.io.IOException:
com.amazon.ws.emr.hadoop.fs.consistency.exception.ConsistencyException:
eTag in metadata for File '/<s3 bucket>/xxxx/xxxx' does not match eTag from s3!:xx:xx;

エラー内容2

[Code: 500312, SQL State: HY000] [Amazon][HiveJDBCDriver] (500312)
Error in fetching data rows:
*org.apache.hive.service.cli.HiveSQLException; java.io.IOException:
com.amazon.ws.emr.hadoop.fs.consistency.exception.ConsistencyException:
2 items inconsistent (no s3 object for associated metadata item).
First object: /<s3 bucket>/xxxx/xxxx;

対応案

emrfs syncコマンドを実行してみる
~~~~
emrfs sync s3://<対象S3バケット>/<対象パス>
~~~~

emrfs syncコマンドについては、以下の関連記事を参照のこと。

https://dk521123.hatenablog.com/entry/2020/11/13/145545
補足:emrfs syncコマンドでダメな場合

以下のサイトだとemrfs syncコマンドだけではだめで、
メタデータを作り直して対応できたらしい。

1) emrfs delete-metadata
2) emrfs create-metadata
3) emrfs sync

http://ritchiekotzen.hatenablog.com/entry/2015/07/30/EMR_%E3%81%A7S3%E3%81%AE%E3%80%8C%E6%95%B4%E5%90%88%E6%80%A7%E3%81%AE%E3%81%82%E3%82%8B%E3%83%93%E3%83%A5%E3%83%BC%E3%80%8D%E3%81%AE%E3%82%A8%E3%83%A9%E3%83%BC

関連記事

Amazon EMR ~ 基本編 ~
https://dk521123.hatenablog.com/entry/2020/05/27/175610
Amazon EMR に関するトラブルシューティング
https://dk521123.hatenablog.com/entry/2020/08/05/144724
Hive / Partition に関するトラブルシューティング
https://dk521123.hatenablog.com/entry/2020/05/28/175428

【Hive】Hiveクエリで入力ファイル名を取得するには

■ はじめに

小ネタ。

今回は、Hiveクエリで入力ファイル名を取得する方法をメモする。

なお、PySparkでは、以下の関連記事を参照のこと。

https://dk521123.hatenablog.com/entry/2020/11/15/000000

■ Hiveクエリで入力ファイル名を取得するには

* INPUT__FILE__NAME から取得できる
 => 区切り文字「_」が2つづつ。
 => フルパスで取得。

■ サンプル

SELECT
 INPUT__FILE__NAME AS file_name
FROM
 sample_table;

出力結果

file_name
-----------------------------------------------------
s3://your-s3-bucket/xxx/xxx/hello_world_01.csv
s3://your-s3-bucket/xxx/xxx/hello_world_01.csv
s3://your-s3-bucket/xxx/xxx/hello_world_01.csv
s3://your-s3-bucket/xxx/xxx/hello_world_02.csv
...

参考文献

https://www.it-swarm-ja.tech/ja/hadoop/hive%E3%82%AF%E3%82%A8%E3%83%AA%E5%86%85%E3%81%AE%E5%88%97%E3%81%A8%E3%81%97%E3%81%A6%E5%85%A5%E5%8A%9B%E3%83%95%E3%82%A1%E3%82%A4%E3%83%AB%E5%90%8D%E3%82%92%E5%8F%96%E5%BE%97%E3%81%99%E3%82%8B%E6%96%B9%E6%B3%95/1073402434/

関連記事

Hive / HiveQL ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2019/11/25/235219
Hive / HiveQL ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2020/02/25/231235
Hive / HiveQL ~ 基本編 ~
https://dk521123.hatenablog.com/entry/2020/06/02/183823
PySparkで入力ファイル名を取得するには
https://dk521123.hatenablog.com/entry/2020/11/15/000000