【トラブル】PySpark でエラー「Total size ... is bigger than spark.driver.maxResultSize」が発生する

■ はじめに

大きいファイルサイズのデータを扱った際に
エラー「Total size ... is bigger than spark.driver.maxResultSize」
が発生したので、調べてみた。
 => 凄く勉強になった、、、

 なお、余談だが
大きいテキストファイルを扱った際の調査に、以下のコマンドを使った。

大きいファイルを扱う際のコマンド
https://dk521123.hatenablog.com/entry/2020/06/12/000000

目次

【1】エラー内容
【2】原因
【3】解決案
 1)コードのリファクタリング
  [解決案 1-a] collect() や toPandas() などを使わずに実装する
  [解決案 1-b] ファイルを分割する
 2)設定値「spark.driver.maxResultSize」を上げる
  変更する場合の注意点
  Glue 上で設定値「spark.driver.maxResultSize」を変更する場合
【4】補足:Spark の 設定値について
 1)spark.driver.maxResultSize
 2)spark.driver.memory
 3)spark.executor.memory

【1】エラー内容

ERROR TaskSetManager:
Total size of serialized results of 8 tasks (1077.1 MB)
 is bigger than spark.driver.maxResultSize (1024.0 MB)

【2】原因

https://docs.microsoft.com/ja-jp/azure/databricks/kb/jobs/job-fails-maxresultsize-exception#cause

より抜粋
~~~~~~~~~~~~~~~~~~~~~~~
原因
このエラーは、構成されたサイズ制限を超えたことが原因で発生します。
サイズ制限は、すべてのパーティションにわたる 
Spark アクションのシリアル化された結果の合計に適用されます。 
Spark アクションには collect()
ドライバーノードへの操作 toPandas() 、
ドライバーのローカルファイルシステムへの大きなファイルの保存
などの操作が含まれます。
~~~~~~~~~~~~~~~~~~~~~~~

https://aws.amazon.com/jp/blogs/news/optimize-memory-management-in-aws-glue/

より抜粋 (一部、機械翻訳っぽいところを改変)
~~~~~~~~~~~~~~
Spark クエリを最適化:
非効率的なクエリまたは変換は、Apache Spark ドライバーのメモリ使用率に
大きな影響を与える可能性があります。一般的な例は次のとおりです。

collect は、ワーカーから結果を収集してドライバーに返す Spark アクションです。
場合によっては、結果が非常に大きく、ドライバーを圧倒することがあります。
以下に示すように、Spark ドライバーの OOM 例外が頻繁に発生する可能性があるため、
収集を使用する際は注意することをお勧めします。

An error occurred while calling 
z:org.apache.spark.api.python.PythonRDD.collectAndServe.
Job aborted due to stage failure:
Total size of serialized results of tasks is bigger than spark.driver.maxResultSize

今回のケースでは...

* でかい容量のデータ(RDD)に対して、collect() を実行していたため

補足:toPandas() について

* toPandas() については、以下の関連記事で少し触れている。

https://dk521123.hatenablog.com/entry/2021/05/19/143043

【3】解決案

以下の2点。

1)コードのリファクタリング
2)設定値「spark.driver.maxResultSize」を上げる

今回のケースでは...

2)でも解決できなかったので、
1)を検討して、 collect() を使用しないで実装可能だったので対応して解決した

色々なサイトでは、2)の方で解決している場合が、
本来、原因となっている1)の方から検討してみた方がいいかと、、、
=> 実運用していて、コード修正をなるべくしたくないのであれば、
 2)を試してみるのもありだが、上述の通り、増やしても発生する可能性はある

1)コードのリファクタリング

[解決案 1-a] collect() や toPandas() などを使わずに実装する

* collect() や toPandas() などを使わずにやりたいことを実装できるかを検討する

https://docs.microsoft.com/ja-jp/azure/databricks/kb/jobs/job-fails-maxresultsize-exception#solution

より重要な部分を一部抜粋
~~~~~~~~~~~~~~~~~~~~~~~
場合によっては、ドライバーノードで大量のデータが収集されないように、
コードをリファクタリングする必要があります。
コードを変更して、ドライバーノードが限られた量のデータを収集するようにしたり、
ドライバーインスタンスのメモリサイズを増やしたりすることができます。

たとえば、矢印を有効にしてを呼び出す toPandas か、ファイルを書き込んでから、
大量のデータをドライバーに返すのではなく、これらのファイルを読み取ることができます。
~~~~~~~~~~~~~~~~~~~~~~~

[解決案 1-b] ファイルを分割する

上記が難しい場合は、読み込むサイズを物理的に分割してから
読み込むように実装する

以下のサイトの「大きいファイル使用時にでるエラーについて」にも
記載されているので、こちらも参考にされるといいかも。
(小さく過ぎた際のトラブルも載っているので一読してみると参考になるかも)

https://blog.engineer.adways.net/entry/2018/10/05/150000

なお、ファイルを物理的に分割する方法については、
以下の関連記事の「【2】ファイル分割して出力する」を参照のこと

PySpark ~ パーティション / あれこれ編 ~
https://dk521123.hatenablog.com/entry/2021/05/13/110811

2)設定値「spark.driver.maxResultSize」を上げる

https://docs.microsoft.com/ja-jp/azure/databricks/kb/jobs/job-fails-maxresultsize-exception#solution

より重要な部分を一部抜粋
~~~~~~~~~~~~~~~~~~~~~~~
spark.driver.maxResultSize <X>g
クラスター Spark 構成の例外メッセージで報告された値よりも大きい値に設定できます。

上限を設定した場合、ドライバーでメモリ不足エラーが発生することがあります
 (spark.driver.memory JVM のオブジェクトのメモリオーバーヘッドによって異なります)。
メモリ不足エラーを防ぐために、適切な制限を設定します。
~~~~~~~~~~~~~~~~~~~~~~~

変更する場合の注意点

* spark.driver.memoryの変更も検討する
 => spark.driver.memoryを変更した場合、芋づる式にspark.executor.memoryの変更も。
 => 理由は、以下の「【4】補足:Spark の 設定値について」の説明を参照。

Glue 上で設定値「spark.driver.maxResultSize」を変更する場合
https://github.com/awslabs/athena-glue-service-logs/issues/14

より抜粋
~~~~~~~~~~~
Adding two spark configs is done like this:

Key: --conf
Value: spark.driver.maxResultSize=2g --conf spark.driver.memory=8g
~~~~~~~~~~~

【4】補足:Spark の 設定値について

* 各パラメータの説明を見る前に、
 以下のサイトの図をみておくと理解しやすいかも、、、

https://techblog.gmo-ap.jp/2018/09/06/spark%E9%81%93%E5%A0%B4%E3%83%A1%E3%83%A2%E3%83%AA%E3%81%A8cpu%E6%95%B0%E3%81%AE%E8%A8%AD%E5%AE%9A%E3%82%92%E6%9C%80%E9%81%A9%E5%8C%96%E3%81%99%E3%82%8B/
1)spark.driver.maxResultSize
http://mogile.web.fc2.com/spark/configuration.html

より抜粋
~~~~~~~~~~~~~~~~~~~~
各Sparkアクション(例えば、collect)のための
全てのパーティションの直列化された結果の総サイズのバイト数の制限。
少なくとも1Mでなければなりません。
0は無制限です。
総サイズがこの制限を越えるとジョブは中止されるでしょう。
制限を高くするとドライバでout-of-memoryエラーを起こすかもしれません
(spark.driver.memory およびJVMでのオブジェクトのオーバーヘッドによります)。
適切な制限を設定することで、out-of-memoryエラーからドライバを守ることができます。
~~~~~~~~~~~~~~~~~~~~
 => 今回のケースは、上記にある
 「総サイズがこの制限を越えるとジョブは中止されるでしょう。」が発生したことになる。

2)spark.driver.memory
http://mogile.web.fc2.com/spark/configuration.html

より抜粋
~~~~~~~~~~~~~~~~~~~~
SparkContextが初期化される時に使用されるメモリ量。
サイズ単位のサフィックスを持つJVMメモリ文字列と
同じフォーマット ("k", "m", "g" あるいは "t") (例えば 512m, 2g)。
~~~~~~~~~~~~~~~~~~~~

https://aws.amazon.com/jp/blogs/news/best-practices-for-successfully-managing-memory-for-apache-spark-applications-on-amazon-emr/

より抜粋
~~~~~~~~~~~~~~~~~~~~
これは、spark.executors.memory と等しくなるように設定することをお勧めします。

spark.driver.memory = spark.executors.memory
~~~~~~~~~~~~~~~~~~~~

3)spark.executor.memory
http://mogile.web.fc2.com/spark/configuration.html

executorプロセスごとに使用するメモリ量。
サイズ単位のサフィックスを持つJVMメモリ文字列と
同じフォーマット ("k", "m", "g" あるいは "t") (例えば 512m, 2g)。

参考文献

https://qiita.com/Ruo_Ando/items/b23b5be870cace38771f

関連記事

PySpark でエラー「Exception: It appears ...」が表示された
https://dk521123.hatenablog.com/entry/2020/07/10/192404
PySpark ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2019/11/14/221126
PySpark ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/04/03/004254
PySpark ~ CSV / escape 編 ~
https://dk521123.hatenablog.com/entry/2020/11/23/224349
PySpark ~ パーティション / あれこれ編 ~
https://dk521123.hatenablog.com/entry/2021/05/13/110811
PySpark ~ RDD <=> DataFrame の相互変換 ~
https://dk521123.hatenablog.com/entry/2021/05/19/143043

【AWS】AWS Glue ~ Boto3 / 基本編 ~

■ はじめに

https://dk521123.hatenablog.com/entry/2019/10/14/000000

の続き。長くなったので、分冊。

 今回は、boto3 API を使って、
PythonでGlueのコンポーネント(Workflow/Job/Trigger ※)を
デプロイする簡単なサンプルの実装および
環境周りなどの注意事項をまとめておく。

※ Crawler (クローラ)については、以下の関連記事を参照のこと

AWS Glue ~ Boto3 / クローラ編 ~
https://dk521123.hatenablog.com/entry/2021/04/16/135558

目次

【1】使用上の注意
 1)IAM権限について
 2)Workflow の WorkflowGraph 作成について
 3)「Spark SQL 用に データカタログ を有効」にする際の注意点
 4)その他 / トラブル

【2】サンプル
 例1:ジョブ作成 (create_job)
 例2:トリガー作成 (create_trigger) with YAML
 例3:ワークフロー存在チェック (get_workflow) with 例外処理

【1】使用上の注意

1)IAM権限について

以下のIAMロールが必要になる

* iam:PassRole
* AWSGlueServiceRole
* S3系の権限
etc

2)Workflow の WorkflowGraph 作成について

* Workflow の WorkflowGraph は、CreateTrigger() のタイミングでのみ作成される。
  => UpdateTrigger()では作成されない
  => トリガーを再作成する必要がある
  => DeleteTrigger() を実行
    -> GetTrigger で EntityNotFoundException になるまで待つ 
         (Response["Trigger"]["State"] で"DELETING"になっている)
    -> CreateTrigger() を再実行

WorkflowGraph
https://docs.aws.amazon.com/ja_jp/glue/latest/dg/aws-glue-api-workflow.html#aws-glue-api-workflow-WorkflowGraph
CreateTrigger / UpdateTrigger
https://docs.aws.amazon.com/glue/latest/webapi/API_CreateTrigger.html
https://docs.aws.amazon.com/glue/latest/webapi/API_UpdateTrigger.html
DeleteTrigger / GetTrigger
https://docs.aws.amazon.com/glue/latest/webapi/API_DeleteTrigger.html
https://docs.aws.amazon.com/glue/latest/webapi/API_GetTrigger.html

3)「Spark SQL 用に データカタログ を有効」にする際の注意点

「--enable-glue-datacatalog: ""」を追加する

※ YAMLやJSONなどで指定した場合、「""」も必要
 (これで、若干はまった)

https://docs.aws.amazon.com/ja_jp/glue/latest/dg/aws-glue-programming-etl-glue-data-catalog-hive.html
https://qiita.com/pioho07/items/34716b2b63761100096a

4)その他 / トラブル

詳細について、以下の関連記事を参照のこと。
 => 書き溜めていたら、めちゃくちゃ多くなってしまった。。。

boto3 AWS Glue API のトラブル ~ trigger全般 編 ~
https://dk521123.hatenablog.com/entry/2020/10/23/110821
boto3 AWS Glue API のトラブル ~ scheduled trigger編 ~
https://dk521123.hatenablog.com/entry/2020/01/16/205331
boto3 AWS Glue API のトラブル ~ job/crawler編 ~
https://dk521123.hatenablog.com/entry/2020/02/05/223307

【2】サンプル

例1:ジョブ作成 (create_job)

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html#Glue.Client.create_job

import boto3

glue = boto3.client('glue')

# ジョブ作成
job = glue.create_job(
  Name='sample-job',
  Role='Glue_IAMRole',
  Command={
      'Name': 'glueetl',
       # ★S3上にあるPythonソースを指定★
      'ScriptLocation': 's3://xxxx/yyyy/etl_sample.py'
  }
)

例2:トリガー作成 (create_trigger) with YAML

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html#Glue.Client.create_trigger

import boto3
import yaml

s3_client = boto3.client('s3')
glue_client = boto3.client('glue')

response = s3_client.get_object(
  Bucket='bucket-name',
  # Key="xxx/yyyy/zzzzz/trigger_on_demand.yaml")
  Key="xxx/yyyy/zzzzz/trigger_scheduled.yaml")
config = yaml.safe_load(response["Body"])

response_of_create_trigger = glue_client.create_trigger(**config)
print(response_of_create_trigger)

trigger_on_demand.yaml

Name: 'sample-trigger'
WorkflowName: 'sample-workflow'
Type: 'ON_DEMAND'
Actions:
   - JobName: 'sample-job'
     Arguments:
         --key1: 'value1'
Description: 'This is a sample trigger'

trigger_scheduled.yaml

Name: 'sample-trigger'
WorkflowName: 'sample-workflow'
Type: 'SCHEDULED'
# 指定の仕方は、以下のURLを参照。
# https://docs.aws.amazon.com/ja_jp/glue/latest/dg/monitor-data-warehouse-schedule.html
# UTC時間で指定 https://www.jisakeisan.com/timezone/utc/
Schedule: 'cron(30 12 31 1 ? 2020)'
# 「StartOnCreation: True」にしないと動作しない
# 詳細は以下の関連記事の「【2】create_trigger で SCHEDULED...」を参照
# https://dk521123.hatenablog.com/entry/2020/01/16/205331
StartOnCreation: True
Actions:
   - JobName: 'sample-job'
     Arguments:
         --key1: 'value1'
Description: 'This is a sample trigger'

例3:ワークフロー存在チェック (get_workflow) with 例外処理

import boto3
try:
    from botocore.exceptions import BotoCoreError, ClientError
except ImportError:
    pass

def main():
    glue_client = boto3.client('glue')

    has_workflow = False
    workflow_name = 'sample-workflow'
    try:
        response = glue_client.get_workflow(
            Name=workflow_name,
            IncludeGraph=True)
        print(response)
        has_workflow = True
    except(BotoCoreError, ClientError) as ex:
        if ex.response['Error']['Code'] == 'EntityNotFoundException':
            has_workflow = False
        else:
            raise ex

    print('Is there the workflow {}? => {}'.format(
        workflow_name, has_workflow))

if __name__ == '__main__':
    main()

関連記事

AWS Glue ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2019/10/01/221926
AWS Glue ~ Boto3 / 入門編 ~
https://dk521123.hatenablog.com/entry/2019/10/14/000000
AWS Glue ~ Boto3 / クローラ編 ~
https://dk521123.hatenablog.com/entry/2021/04/16/135558
AWS Glue ~ Boto3 / セキュリティ設定編 ~
https://dk521123.hatenablog.com/entry/2020/04/08/171208
AWS Glue ~ Boto3 / Glue connection編 ~
https://dk521123.hatenablog.com/entry/2020/01/29/224525
AWS Glue ~ Boto3 / パーティション操作編 ~
https://dk521123.hatenablog.com/entry/2021/06/09/113458
AWS Glue ~ Boto3 / DB・テーブル操作編 ~
https://dk521123.hatenablog.com/entry/2021/06/11/164015
boto3 AWS Glue API のトラブル ~ trigger全般 編 ~
https://dk521123.hatenablog.com/entry/2020/10/23/110821
boto3 AWS Glue API のトラブル ~ scheduled trigger編 ~
https://dk521123.hatenablog.com/entry/2020/01/16/205331
boto3 AWS Glue API のトラブル ~ job/crawler編 ~
https://dk521123.hatenablog.com/entry/2020/02/05/223307
AWS Glue のトラブル ~ job編 - [1] ~
https://dk521123.hatenablog.com/entry/2019/10/25/232155
AWS Glue のトラブル ~ job編 - [2] ~
https://dk521123.hatenablog.com/entry/2020/10/12/152659
AWS Glue のトラブル ~ job編 - [3] ~
https://dk521123.hatenablog.com/entry/2021/02/16/145848
AWS Glue のトラブル ~ crawler編 ~
https://dk521123.hatenablog.com/entry/2020/05/07/144132

【AWS】AWS Glue ~ Boto3 / クローラ編 ~

■ はじめに

https://dk521123.hatenablog.com/entry/2019/10/14/000000
https://dk521123.hatenablog.com/entry/2021/04/17/001930

の続き。

boto3 を使ったクローラ作成時に結構エラーが出たので、
備忘録的にメモしておく。

目次

【1】各設定値について
 1)Targets
 2)Configuration
 3)SchemaChangePolicy
 4)DatabaseName
 5)TablePrefix
【2】サンプル
 例1:クローラ作成 / S3パスへのクローリング
 例2:クローラ作成 / テーブルへのクローリング

【1】各設定値について

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html#Glue.Client.create_crawler

での各設定値の簡単なメモ。

1)Targets

https://docs.aws.amazon.com/ja_jp/AWSCloudFormation/latest/UserGuide/aws-properties-glue-crawler-targets.html
https://docs.aws.amazon.com/ja_jp/glue/latest/webapi/API_CrawlerTargets.html

クローラ対象の指定。

* CatalogTargets
 => データカタログを対象とする

* DynamoDBTargets
 => DynamoDBを対象とする

* JdbcTargets
 => JDBC(DB/RDS)を対象とする

* S3Targets
 => データカタログを対象とする

2)Configuration

https://docs.aws.amazon.com/ja_jp/glue/latest/dg/crawler-configuration.html

3)SchemaChangePolicy

テーブル定義に変更した場合、どう振る舞うかを設定する

https://docs.aws.amazon.com/glue/latest/webapi/API_SchemaChangePolicy.html
https://docs.aws.amazon.com/ja_jp/glue/latest/dg/crawler-configuration.html
DeleteBehavior (削除された場合の振る舞い)

* LOG
 => 変更を無視(削除しない)し、ログに書き込む

* DELETE_FROM_DATABASE
 => Data Catalog からテーブルとパーティションを削除

* DEPRECATE_IN_DATABASE (デフォルト)
 => Data Catalog​ でテーブルを廃止としてマークを付ける

UpdateBehavior (更新された場合の振る舞い)

* LOG
 => 変更を無視(更新しない)し、ログに書き込む

* UPDATE_IN_DATABASE
 => AWS Glue データカタログ のテーブルを更新

4)DatabaseName

* 出力先のデータベース名

5)TablePrefix

* 出力するテーブル名に対してプレフィックスを付加
* e.g. TablePrefix: 'ex_' => ex_person, ex_employee

【2】サンプル

例1:クローラ作成 / S3パスへのクローリング

import boto3
import yaml

s3_client = boto3.client('s3')
glue_client = boto3.client('glue')

# S3上にあるYAMLファイルを取り込む
response = s3_client.get_object(
  Bucket='bucket-name',
  Key="xxx/yyyy/zzzzz/crawler.yaml")
config = yaml.safe_load(response["Body"])

# ★ここでクローラ作成を実行★
response_of_create_crawler = \
  glue_client.create_crawler(**config)
print(response_of_create_crawler)

crawler.yaml

Name: 'sample-crawler'
Description: 'This is a sample crawler'
Role: 'Glue_IAMRole'
Database: 'sample_db'
TablePrefix: 'demo_glue_'
# https://docs.aws.amazon.com/glue/latest/dg/crawler-configuration.html
# Configuration: "{\"Version\": 1.0,\"CrawlerOutput\": {\"Partitions\":{\"AddOrUpdateBehavior"\: \"InheritFromTable\"}}}"
# って書き方もできる
Configuration: |
  {
     "Version": 1.0,
     "CrawlerOutput": {
       "Partitions": {
         "AddOrUpdateBehavior": "InheritFromTable"
       }
     }
  }
SchemaChangePolicy:
  UpdateBehavior: 'LOG'
Targets:
  S3Targets:
    - Path: s3://your-bucket/aaaa/bbbb
    - Path: s3://your-bucket/cccc/dddd
Tags:
  Name: 'sample-crawler'

例2:クローラ作成 / テーブルへのクローリング

* Pythonコード自体は、そのままなので省略

注意点

* テーブルへのクローリングする際には、
 事前にクロールするテーブルがないとエラーが発生する

crawler.yaml

Name: 'sample2-crawler'
Description: 'This is a sample crawler'
Role: 'Glue_IAMRole'
# https://docs.aws.amazon.com/ja_jp/glue/latest/dg/crawler-configuration.html
Configuration: |
  {
     "Version": 1.0,
     "CrawlerOutput": {
       "Partitions": {
         "AddOrUpdateBehavior": "InheritFromTable"
       },
       "Tables": {
         "AddOrUpdateBehavior": "MergeNewColumns"
       }
     }
  }
# https://docs.aws.amazon.com/glue/latest/webapi/API_SchemaChangePolicy.html
SchemaChangePolicy:
  UpdateBehavior: 'UPDATE_IN_DATABASE'
  DeleteBehavior: 'DEPRECATE_IN_DATABASE'
Targets:
  CatalogTargets:
    - DatabaseName: 'sample_db1'
      Tables:
        - 'sample_table1'
    - DatabaseName: 'sample_db2'
      Tables:
        - 'sample_table2'
Tags:
  Name: 'sample2-crawler'

関連記事

AWS Glue ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2019/10/01/221926
AWS Glue ~ Boto3 / 入門編 ~
https://dk521123.hatenablog.com/entry/2019/10/14/000000
AWS Glue ~ Boto3 / 基本編 ~
https://dk521123.hatenablog.com/entry/2021/04/17/001930
AWS Glue ~ Boto3 / セキュリティ設定編 ~
https://dk521123.hatenablog.com/entry/2020/04/08/171208
AWS Glue ~ Boto3 / Glue connection編 ~
https://dk521123.hatenablog.com/entry/2020/01/29/224525
Pulumi ~ AWS Glue のデプロイ ~
https://dk521123.hatenablog.com/entry/2022/03/02/122037