■ はじめに
久しぶりのGlueネタ。
https://dk521123.hatenablog.com/entry/2022/12/03/000119
で、Glue のパフォーマンスチューニングに関して書いたが 更に、AWS Black Belt Online Seminarで学ぶべきことが多かったので 徐々にメモしておく
目次
【1】AWS Black Belt Online Seminar 1)動画 2)その際の資料 【2】チューニングする際の⾒るべきメトリクス 1)Spark UI(Spark History Server) 2)Executor/Driverログ 3)AWS Glue ジョブメトリクス 4)SparkのAPIで得られる統計情報 【3】チューニングの基本戦略 1)新しいバージョンを利⽤する 2)データのI/O負荷を⼩さくする 3)シャッフルを最⼩限にする 4)タスク単位の処理を⾼速化する 5)並列化する 【4】補足:「ファイルがsplittable」について
【1】AWS Black Belt Online Seminar
https://aws.amazon.com/jp/blogs/news/webinar-bb-glueetlperformancetuning-2021/
1)動画
猫でもわかる、AWS Glue ETLパフォーマンス・チューニング
https://youtu.be/k-qTKz0xG-0?t=158
https://youtu.be/0a1MA3rI3pY?t=190
2)その際の資料
https://d1.awsstatic.com/webinars/jp/pdf/services/202108_Blackbelt_glue_etl_performance1.pdf
https://d1.awsstatic.com/webinars/jp/pdf/services/202108_Blackbelt_glue_etl_performance2.pdf
【2】チューニングする際の⾒るべきメトリクス
* Black Beltだけでなく、以下も参考になるかも。
https://repost.aws/ja/knowledge-center/glue-job-running-long
1)Spark UI(Spark History Server)
* Sparkの処理の詳細を確認できる
Dockerで起動する例
git clone aws-samples/aws-glue-samples cd aws-glue-samples/utilities/Spark_UI/glue-1_0-2_0 docker build -t glue/sparkui:latest . # 「s3a://path_to_eventlog」は自分が使用しているS3バケット docker run -itd -e SPARK_HISTORY_OPTS="$SPARK_HISTORY_OPTS -Dspark.history.fs.logDirectory=s3a://path_to_eventlog -Dspark.hadoop.fs.s3a.access.key=AWS_ACCESS_KEY_ID -Dspark.hadoop.fs.s3a.secret.key=AWS_SECRET_ACCESS_KEY" -p 18080:18080 glue/sparkui:latest "/opt/spark/bin/spark-class org.apache.spark.deploy.history.HistoryServer” # => 起動したら、ブラウザで http://localhost:18080 にアクセス
2)Executor/Driverログ
* ExecutorとDriverのstdout/stderrログを確認できる
3)AWS Glue ジョブメトリクス
* 各ExecutorやDriverノードのCPU/メモリなどの リソース使⽤状況を確認できる
4)SparkのAPIで得られる統計情報
* データのサンプルや統計値を確認できる
【3】チューニングの基本戦略
1)新しいバージョンを利⽤する 2)データのI/O負荷を⼩さくする 3)シャッフルを最⼩限にする 4)タスク単位の処理を⾼速化する 5)並列化する
1)新しいバージョンを利⽤する
* 新しいJobを作る場合、特に理由がなければ最新バージョンを使う
2)データのI/O負荷を⼩さくする
https://youtu.be/0a1MA3rI3pY?t=353
* データI/O負荷を最⼩限にする方法は、以下の通り、
[1] 必要なデータだけ読む
* 読み取りデータ量を削減する機能として、以下がある a) Partition Filtering * filter句やWhere句で指定されたパーティション内のファイルのみを読み取る機能 * Text/CSV/JSON/ORC/Parquetで利用可能 b) Filter Pushdown * パーティション列に利用されていない列に対するfilter句やwhere句に ヒットするブロックのみを読み取る機能 * AWS GlueではParquetを利用した場合に自動的に適用される << ここでもParquetのメリットが
[2] 1taskで読むデータ量をコントロールする
* データの読み取り/書き込みのタスクは、基本的に1ファイルに紐づく * ファイルがsplittable(※補足参照)な場合は、1ファイルを複数タスクに分割できる * AWS Glueで扱うファイルサイズは128MB-512MBを推奨
[3] 適切な圧縮形式を選ぶ
* 分析⽤途に向いたデータフォーマットである『Apache Parquet』を使⽤するといい * 理由としては、不要なデータの読み⾶ばしやメタデータを利⽤した集計や SparkエンジンではApache Parquetを効率的に利⽤できる インテグレーションが⾏われているため * gzipで圧縮されたファイルは、分割できないため注意
3)シャッフルを最⼩限にする
https://youtu.be/0a1MA3rI3pY?t=1057
* cacheをうまく利⽤する * filter処理をなるべく前段で⾏う * データを⼩さく保つようにjoinの順番を⼯夫する * joinを使い分ける(Sort Merge Join/Broadcast Join/Shuffle Hash Join) * データの偏りへの対処 * self joinによるデータ加⼯の代わりにWindow処理を利⽤する
4)タスク単位の処理を⾼速化する
https://youtu.be/0a1MA3rI3pY?t=1532
Scalaを利⽤する
* DataFrameの殆どの処理は、PySparkで記述しても 内部的にはJavaに変換して、JVM上で動作するが、 以下についてはPythonを利⽤すると処理が遅くなる * Scalaを利⽤することで処理が⾼速化する + RDDで処理を記述している部分 => RDDで記述された処理はオプティマイザによる最適化が⾏われない + UDFを利⽤している部分 => IteratorごとにシリアライゼーションとPythonプロセスへのパイプが発⽣ => PythonプロセスのメモリはJVMの制御が⾏われない => もしPySparkを使う場合、PythonUDFよりPandasUDFを利⽤する(以下参照)
PythonUDFよりPandasUDFを利⽤する
a) Python UDF * シリアライズ/デシリアライズはPicklingで⾏われる * データはブロックごとにfetchされるが、UDF処理は⾏毎に実⾏される b) Pandas UDF << ★こっちがお勧め * シリアライズ/デシリアライズはApache Arrowで⾏われる * データのfetchもUDF処理も複数⾏まとめてに実⾏される
5)並列化する
https://youtu.be/0a1MA3rI3pY?t=1670
* データの偏り(Skewness)がないようにし、並列化する => 偏りがある場合、データ均一化を試みる
データ均一化の対応
[1] インプットファイル作成時にファイルサイズを均一化するようにする [2] Repatitionする [3] broadcast joinする [4] saltingする
【4】補足:「ファイルがsplittable」について
* splittable = 分裂可能な * 大きなunspilitableデータが1ファイルにあるとき、 以下のようなマイナス現象が発生して良いことがない + 1つのノード上でメモリにデータが乗り切らない + 分散処理されない
gzip | bzip2 | lzo | snappy | |
---|---|---|---|---|
Is splittable | No | No(※) | Yes, if indexed | No |
* bzip2, lzo, snappyで圧縮されたファイルは読み込み時に分割して処理することができる * gzip(※)で圧縮されたファイルは分割できない
※
* Parquetの場合はgzipでも分割できる
関連記事
AWS Glue ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2019/10/01/221926
AWS Glue ~ パフォーマンスチューニング ~
https://dk521123.hatenablog.com/entry/2022/12/03/000119