【トラブル】PySpark でエラー「Total size ... is bigger than spark.driver.maxResultSize」が発生する

■ はじめに

大きいファイルサイズのデータを扱った際に
エラー「Total size ... is bigger than spark.driver.maxResultSize」
が発生したので、調べてみた。
 => 凄く勉強になった、、、

 なお、余談だが
大きいテキストファイルを扱った際の調査に、以下のコマンドを使った。

大きいファイルを扱う際のコマンド
https://dk521123.hatenablog.com/entry/2020/06/12/000000

目次

【1】エラー内容
【2】原因
【3】解決案
 1)コードのリファクタリング
  [解決案 1-a] collect() や toPandas() などを使わずに実装する
  [解決案 1-b] ファイルを分割する
 2)設定値「spark.driver.maxResultSize」を上げる
  変更する場合の注意点
  Glue 上で設定値「spark.driver.maxResultSize」を変更する場合
【4】補足:Spark の 設定値について
 1)spark.driver.maxResultSize
 2)spark.driver.memory
 3)spark.executor.memory

【1】エラー内容

ERROR TaskSetManager:
Total size of serialized results of 8 tasks (1077.1 MB)
 is bigger than spark.driver.maxResultSize (1024.0 MB)

【2】原因

https://docs.microsoft.com/ja-jp/azure/databricks/kb/jobs/job-fails-maxresultsize-exception#cause

より抜粋
~~~~~~~~~~~~~~~~~~~~~~~
原因
このエラーは、構成されたサイズ制限を超えたことが原因で発生します。
サイズ制限は、すべてのパーティションにわたる 
Spark アクションのシリアル化された結果の合計に適用されます。 
Spark アクションには collect()
ドライバーノードへの操作 toPandas() 、
ドライバーのローカルファイルシステムへの大きなファイルの保存
などの操作が含まれます。
~~~~~~~~~~~~~~~~~~~~~~~

https://aws.amazon.com/jp/blogs/news/optimize-memory-management-in-aws-glue/

より抜粋 (一部、機械翻訳っぽいところを改変)
~~~~~~~~~~~~~~
Spark クエリを最適化:
非効率的なクエリまたは変換は、Apache Spark ドライバーのメモリ使用率に
大きな影響を与える可能性があります。一般的な例は次のとおりです。

collect は、ワーカーから結果を収集してドライバーに返す Spark アクションです。
場合によっては、結果が非常に大きく、ドライバーを圧倒することがあります。
以下に示すように、Spark ドライバーの OOM 例外が頻繁に発生する可能性があるため、
収集を使用する際は注意することをお勧めします。

An error occurred while calling 
z:org.apache.spark.api.python.PythonRDD.collectAndServe.
Job aborted due to stage failure:
Total size of serialized results of tasks is bigger than spark.driver.maxResultSize

今回のケースでは...

* でかい容量のデータ(RDD)に対して、collect() を実行していたため

補足:toPandas() について

* toPandas() については、以下の関連記事で少し触れている。

https://dk521123.hatenablog.com/entry/2021/05/19/143043

【3】解決案

以下の2点。

1)コードのリファクタリング
2)設定値「spark.driver.maxResultSize」を上げる

今回のケースでは...

2)でも解決できなかったので、
1)を検討して、 collect() を使用しないで実装可能だったので対応して解決した

色々なサイトでは、2)の方で解決している場合が、
本来、原因となっている1)の方から検討してみた方がいいかと、、、
=> 実運用していて、コード修正をなるべくしたくないのであれば、
 2)を試してみるのもありだが、上述の通り、増やしても発生する可能性はある

1)コードのリファクタリング

[解決案 1-a] collect() や toPandas() などを使わずに実装する

* collect() や toPandas() などを使わずにやりたいことを実装できるかを検討する

https://docs.microsoft.com/ja-jp/azure/databricks/kb/jobs/job-fails-maxresultsize-exception#solution

より重要な部分を一部抜粋
~~~~~~~~~~~~~~~~~~~~~~~
場合によっては、ドライバーノードで大量のデータが収集されないように、
コードをリファクタリングする必要があります。
コードを変更して、ドライバーノードが限られた量のデータを収集するようにしたり、
ドライバーインスタンスのメモリサイズを増やしたりすることができます。

たとえば、矢印を有効にしてを呼び出す toPandas か、ファイルを書き込んでから、
大量のデータをドライバーに返すのではなく、これらのファイルを読み取ることができます。
~~~~~~~~~~~~~~~~~~~~~~~

[解決案 1-b] ファイルを分割する

上記が難しい場合は、読み込むサイズを物理的に分割してから
読み込むように実装する

以下のサイトの「大きいファイル使用時にでるエラーについて」にも
記載されているので、こちらも参考にされるといいかも。
(小さく過ぎた際のトラブルも載っているので一読してみると参考になるかも)

https://blog.engineer.adways.net/entry/2018/10/05/150000

なお、ファイルを物理的に分割する方法については、
以下の関連記事の「【2】ファイル分割して出力する」を参照のこと

PySpark ~ パーティション / あれこれ編 ~
https://dk521123.hatenablog.com/entry/2021/05/13/110811

2)設定値「spark.driver.maxResultSize」を上げる

https://docs.microsoft.com/ja-jp/azure/databricks/kb/jobs/job-fails-maxresultsize-exception#solution

より重要な部分を一部抜粋
~~~~~~~~~~~~~~~~~~~~~~~
spark.driver.maxResultSize <X>g
クラスター Spark 構成の例外メッセージで報告された値よりも大きい値に設定できます。

上限を設定した場合、ドライバーでメモリ不足エラーが発生することがあります
 (spark.driver.memory JVM のオブジェクトのメモリオーバーヘッドによって異なります)。
メモリ不足エラーを防ぐために、適切な制限を設定します。
~~~~~~~~~~~~~~~~~~~~~~~

変更する場合の注意点

* spark.driver.memoryの変更も検討する
 => spark.driver.memoryを変更した場合、芋づる式にspark.executor.memoryの変更も。
 => 理由は、以下の「【4】補足:Spark の 設定値について」の説明を参照。

Glue 上で設定値「spark.driver.maxResultSize」を変更する場合
https://github.com/awslabs/athena-glue-service-logs/issues/14

より抜粋
~~~~~~~~~~~
Adding two spark configs is done like this:

Key: --conf
Value: spark.driver.maxResultSize=2g --conf spark.driver.memory=8g
~~~~~~~~~~~

【4】補足:Spark の 設定値について

* 各パラメータの説明を見る前に、
 以下のサイトの図をみておくと理解しやすいかも、、、

https://techblog.gmo-ap.jp/2018/09/06/spark%E9%81%93%E5%A0%B4%E3%83%A1%E3%83%A2%E3%83%AA%E3%81%A8cpu%E6%95%B0%E3%81%AE%E8%A8%AD%E5%AE%9A%E3%82%92%E6%9C%80%E9%81%A9%E5%8C%96%E3%81%99%E3%82%8B/
1)spark.driver.maxResultSize
http://mogile.web.fc2.com/spark/configuration.html

より抜粋
~~~~~~~~~~~~~~~~~~~~
各Sparkアクション(例えば、collect)のための
全てのパーティションの直列化された結果の総サイズのバイト数の制限。
少なくとも1Mでなければなりません。
0は無制限です。
総サイズがこの制限を越えるとジョブは中止されるでしょう。
制限を高くするとドライバでout-of-memoryエラーを起こすかもしれません
(spark.driver.memory およびJVMでのオブジェクトのオーバーヘッドによります)。
適切な制限を設定することで、out-of-memoryエラーからドライバを守ることができます。
~~~~~~~~~~~~~~~~~~~~
 => 今回のケースは、上記にある
 「総サイズがこの制限を越えるとジョブは中止されるでしょう。」が発生したことになる。

2)spark.driver.memory
http://mogile.web.fc2.com/spark/configuration.html

より抜粋
~~~~~~~~~~~~~~~~~~~~
SparkContextが初期化される時に使用されるメモリ量。
サイズ単位のサフィックスを持つJVMメモリ文字列と
同じフォーマット ("k", "m", "g" あるいは "t") (例えば 512m, 2g)。
~~~~~~~~~~~~~~~~~~~~

https://aws.amazon.com/jp/blogs/news/best-practices-for-successfully-managing-memory-for-apache-spark-applications-on-amazon-emr/

より抜粋
~~~~~~~~~~~~~~~~~~~~
これは、spark.executors.memory と等しくなるように設定することをお勧めします。

spark.driver.memory = spark.executors.memory
~~~~~~~~~~~~~~~~~~~~

3)spark.executor.memory
http://mogile.web.fc2.com/spark/configuration.html

executorプロセスごとに使用するメモリ量。
サイズ単位のサフィックスを持つJVMメモリ文字列と
同じフォーマット ("k", "m", "g" あるいは "t") (例えば 512m, 2g)。

参考文献

https://qiita.com/Ruo_Ando/items/b23b5be870cace38771f

関連記事

PySpark でエラー「Exception: It appears ...」が表示された
https://dk521123.hatenablog.com/entry/2020/07/10/192404
PySpark ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2019/11/14/221126
PySpark ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/04/03/004254
PySpark ~ CSV / escape 編 ~
https://dk521123.hatenablog.com/entry/2020/11/23/224349
PySpark ~ パーティション / あれこれ編 ~
https://dk521123.hatenablog.com/entry/2021/05/13/110811
PySpark ~ RDD <=> DataFrame の相互変換 ~
https://dk521123.hatenablog.com/entry/2021/05/19/143043