【分散処理】PySpark ~ DataFrame / show() ~

■ はじめに

https://dk521123.hatenablog.com/entry/2020/01/04/150942
https://dk521123.hatenablog.com/entry/2020/05/18/154829
https://dk521123.hatenablog.com/entry/2020/07/02/000000

の続き。

調査する際に PySpark の DataFrame の show()などを使うのだが
その際に、結構、忘れてしまうので、ここでメモっておく。

目次

【0】API仕様
【1】項目を絞って表示させたい場合
【2】条件を絞って表示させたい場合
【3】値を省略させずに表示させたい場合
【4】表示行数を増やしたい場合
【5】ソート順に並べて表示する
【6】レコードを縦に表示したい場合
【7】RDDに対してデバッグログ表示したい場合

【0】API仕様

* 以下を参照

https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.show.html

DataFrame.show(n=20, truncate=True, vertical=False)

サンプル

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType

spark_context = SparkContext()
spark = SparkSession(spark_context)

rdd = spark_context.parallelize([
  ('x0001', 'Mike', 'Sales'),
  ('x0002', 'Tom', 'IT'),
  ('x0003', 'Sam', 'Sales'),
  ('x0004', 'Kevin', 'Human resources'),
  ('x0005', 'Bob', 'IT'),
  ('x0006', 'Alice', 'Banking'),
  ('x0007', 'Carol', 'IT'),
])
schema = StructType([
  StructField('id', StringType(), False),
  StructField('name', StringType(), False),
  StructField('job', StringType(), False),
])

data_frame = spark.createDataFrame(rdd, schema)

# 表示
data_frame.show(n=5, truncate=False, vertical=True)

出力結果

-RECORD 0---------------
 id   | x0001
 name | Mike
 job  | Sales
-RECORD 1---------------
 id   | x0002
 name | Tom
 job  | IT
-RECORD 2---------------
 id   | x0003
 name | Sam
 job  | Sales
-RECORD 3---------------
 id   | x0004
 name | Kevin
 job  | Human resources
-RECORD 4---------------
 id   | x0005
 name | Bob
 job  | IT
only showing top 5 rows

【1】項目を絞って表示させたい場合

* select() + show() を使う

サンプル

# 項目を絞って表示
data_frame.select("name").show()

# 複数で絞りたい場合
data_frame.select("id", "name").show()

【2】条件を絞って表示させたい場合

* filter() + show() を使う

サンプル

# WHERE的に条件で絞る
data_frame.filter(data_frame["name"] == "Mike").show()

【3】値を省略させずに表示させたい場合

* デフォルト表示の場合、日時や長い文字列を表示した場合、
 「...」になるので、その表示方法を記す

+--------------------+
|                 col|
+--------------------+
|2015-11-16 07:15:...|
+--------------------+

サンプル

# cf. truncate = 切り捨てる
data_frame.show(truncate=False)

【4】表示行数を増やしたい場合

サンプル

# デフォルト 20行 の表示を 30行 に増やしたい場合
data_frame.show(n=30)
# data_frame.show(30)

【5】ソート順に並べて表示する

* 必ずしもID順に表示される訳ではないので確認時に大変

解決案

* OrderBy を使う。(e.g. data_frame.orderBy(col('id').desc()).show())
 ⇒ サンプルなどは、以下の関連記事を参照のこと

https://dk521123.hatenablog.com/entry/2020/01/04/150942

参考文献
http://sinhrks.hatenablog.com/entry/2015/04/29/085353

【6】レコードを縦に表示したい場合

* おまけとして、verticalについて書いておく
 => vertical=True でレコードを縦に表示する

サンプル

data_frame.show(vertical=True)

【7】RDDに対してデバッグログ表示したい場合

おまけ的に。。。
以下に記載されているものを参考に。

https://stackoverflow.com/questions/25295277/view-rdd-contents-in-python-spark
サンプル

# n : 表示させたいインデックス(1とか2とか)
print(rdd.take(n))

# あくまで、障害時のデバッグログとして(通常のログには使わないこと!)
print(rdd.collect())

# collect() に関する弊害は、以下の関連記事を参照のこと

https://dk521123.hatenablog.com/entry/2021/04/22/131849

参考文献

https://dev.classmethod.jp/articles/prevent-long-values-from-being-omitted-in-the-show-method-of-spark-dataframe/

関連記事

PySpark ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2019/11/14/221126
PySpark ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/04/03/004254
PySpark ~ DataFrame / データ操作編 ~
https://dk521123.hatenablog.com/entry/2020/01/04/150942
PySpark ~ DataFrame / テーブル・項目操作編 ~
https://dk521123.hatenablog.com/entry/2020/05/18/154829
PySpark ~ DataFrame / 項目数を取得するには ~
https://dk521123.hatenablog.com/entry/2020/08/28/183706
PySpark ~ PySpark経由でDBに接続する ~
https://dk521123.hatenablog.com/entry/2020/07/02/000000
PySpark ~ CSV / Read/Writeのプロパティ ~
https://dk521123.hatenablog.com/entry/2020/07/30/195226
PySpark ~ CSV / escape 編 ~
https://dk521123.hatenablog.com/entry/2020/11/23/224349
PySpark ~ エスケープされた区切り文字が含んだデータを扱う ~
https://dk521123.hatenablog.com/entry/2020/07/09/000832
AWS Glue ~ DynamicFrame ~
https://dk521123.hatenablog.com/entry/2021/12/14/221043

【Batch】バッチで簡易テストデータを作ることを考える

■ はじめに

バッチでテストデータ(今回は、CSVファイル)を作ることを考える

はじめは、コピーして保存し実行すれば、
簡単なテストデータができればいいなーっと思ったが
ちょっとだけ凝ったもの
例えば、ランダムで文字列データ生成や指定サイズでデータを作る
などは、プログラム組んだ方が早いなーっと思った、、、

ただ、意外と学ぶものがあってよかった。

目次

【1】Tips
 1)ゼロ埋めパディング
 2)yyyy/mm/dd hh:mn:ssで取得する
 3)乱数を取得する
 4)「setlocal EnableDelayedExpansion」
【2】サンプル
 例1:指定行数までデータを生成する

【1】Tips

1)ゼロ埋めパディング

サンプル

@echo off

REM 今回は、5桁(00010)でデータを作る
set number=10

REM [1] "0" x 4(5-1(数字分)) と対象数字を結合させる(000010)
set fomated_number=0000%number%

REM [2] 5桁を超える部分は切り落とす(00010)
set id=%fomated_number:~-5%

echo %id%
pause

参考文献
https://bbh.bz/2019/09/12/windows-bat-format-0padding/

2)yyyy/mm/dd hh:mn:ssで取得する

@echo off
REM 日付を取得する
set today=%date%
REM 時間を取得する
set now=%time: =0%
echo %now%

REM 日付を年、月、日に分解する
set year=%today:~-10,4%
set month=%today:~-5,2%
set day=%today:~-2,2%

REM 時間を時、分、秒に分解する
set hour=%now:~0,2%
set minute=%now:~3,2%
set second=%now:~6,2%
REM yyyy/mm/dd hh:mm:dd
set current_datetime=%year%/%month%/%day% %hour%:%minute%:%second%

echo %current_datetime%
pause

参考文献
https://www.atmarkit.co.jp/ait/articles/0405/01/news002.html
https://hiroto1979.hatenablog.jp/entry/2017/10/14/003418

3)乱数を取得する

@echo off

REM 0~32767までの数値がランダムで取得
echo %random%
echo %random%
echo %random%
echo %random%

pause

参考文献
https://jj-blues.com/cms/wantto-randomnumber/

4)「setlocal EnableDelayedExpansion」

* for文の中で値を変更したい場合、
 for文全体を「setlocal EnableDelayedExpansion」と
 「endlocal」で挟む必要がある
* for文内で使用する変数は「%」ではなく「!」で囲む
* 詳細は、以下の関連記事の「【7】遅延環境変数」を参照のこと

https://dk521123.hatenablog.com/entry/2010/11/23/165433

【2】サンプル

例1:指定行数までデータを生成する

@echo off

REM 変数を設定
set OUTPUT_FILENAME=output.csv
set FILE_HEADER=id,name,update time,remarks
set MAX_LINE=10

REM 日付を取得する
set today=%date%
REM 時間を取得する
set now=%time: =0%

REM 日付を年、月、日に分解する
set year=%today:~-10,4%
set month=%today:~-5,2%
set day=%today:~-2,2%

REM 時間を時、分、秒に分解する
set hour=%now:~0,2%
set minute=%now:~3,2%
set second=%now:~6,2%
REM yyyy/mm/dd hh:mm:dd
set current_datetime=%year%/%month%/%day% %hour%:%minute%:%second%

REM 実行中に値を変化させたい場合
setlocal EnableDelayedExpansion

REM ****************************
REM * Main
REM ****************************
(
  REM 1行目にヘッダー
  echo %FILE_HEADER%

  REM Body部をループで作成する
  for /L %%i in (1, 1, %MAX_LINE%) do (
    REM ゼロ埋めしたIDを生成
    set number=%%i
    set fomated_number=0000!number!
    set id=!fomated_number:~-5!

    REM Body部
    echo !id!,Mike,!current_datetime!,xxxx-!random!-xxxx
  )

) > %OUTPUT_FILENAME%
REM ****************************

endlocal

出力結果

id,name,update time,remarks
00001,Mike,2021/04/24 18:10:41,xxxx-2036-xxxx
00002,Mike,2021/04/24 18:10:41,xxxx-24001-xxxx
00003,Mike,2021/04/24 18:10:41,xxxx-21998-xxxx
00004,Mike,2021/04/24 18:10:41,xxxx-9947-xxxx
00005,Mike,2021/04/24 18:10:41,xxxx-21798-xxxx
00006,Mike,2021/04/24 18:10:41,xxxx-16062-xxxx
00007,Mike,2021/04/24 18:10:41,xxxx-23932-xxxx
00008,Mike,2021/04/24 18:10:41,xxxx-998-xxxx
00009,Mike,2021/04/24 18:10:41,xxxx-2343-xxxx
00010,Mike,2021/04/24 18:10:41,xxxx-20117-xxxx

関連記事

バッチ ~入門編 ~
https://dk521123.hatenablog.com/entry/2010/07/06/223905
バッチ ~ 基本編 / 変数の扱い ~
https://dk521123.hatenablog.com/entry/2010/11/23/165433
バッチ ~ 基本編 / 制御文 ~
https://dk521123.hatenablog.com/entry/2015/08/26/221935
バッチ ~ 基本編 / コピー&削除 ~
https://dk521123.hatenablog.com/entry/2010/11/22/230218
バッチ ~ 基本編 / ファイル・ディレクトリの扱い ~
https://dk521123.hatenablog.com/entry/2010/11/23/164811
バッチ ~ 基本編 / リダイレクション ~
https://dk521123.hatenablog.com/entry/2015/03/16/232627
バッチ ~ タイマー起動 ~
https://dk521123.hatenablog.com/entry/2010/11/24/172455

【AWS】AWS Glue ~ CloudWatch Metrics ~

■ はじめに

AWS Glue の CloudWatch Metrics(メトリクス) について、
徐々にメモする。

目次

【1】公式ドキュメント
【2】関連用語
 1)ステージ(Stages)
 2)タスク(Tasks)
 3)ドライバ(Driver)
 4)エグゼキュタ(Executors)
【3】メトリクスの構成
 1)ETL Data Movement
 2)Data shuffle across executors
 3)Memory profile: Driver and Executors
 4)CPU Load: Driver and Executors
 5)Job Execution: Active Executors, Completed Stages & Maximum Needed Executors

【1】公式ドキュメント

要求の厳しいステージとストラグラータスクのデバッグ
https://docs.aws.amazon.com/ja_jp/glue/latest/dg/monitor-profile-debug-straggler.html
DPU の容量計画のモニタリング
https://docs.aws.amazon.com/ja_jp/glue/latest/dg/monitor-debug-capacity.html

【2】関連用語

https://qiita.com/uryyyyyyy/items/ba2dceb709f8701715f7
https://qiita.com/yoshii0110/items/5b2263a0cbd2afe0ec20

を一読しておくといいかも。

1)ステージ(Stages)

* Sparkは 全体の処理を ステージ と呼ばれる単位で分割して扱う

2)タスク(Tasks)

* ステージを更に分割した処理単位(分散処理における最小処理単位)
* Task数 = Partition数 * Stage数

3)ドライバ(Driver)

* 処理全体を統括しているプロセス

4)エグゼキュタ(Executors)

* 分散処理(Task)を実際に実行するプロセス

【3】メトリクスの構成

以下で構成されている。

1)ETL Data Movement
2)Data shuffle across executors
3)Memory profile: Driver and Executors
4)CPU Load: Driver and Executors
5)Job Execution: Active Executors, Completed Stages & Maximum Needed Executors

1)ETL Data Movement

* ETL データ移動
 => 実行しているETLの
  データの読み込み(Read)と書き込み(Writtern)の推移をグラフ表示

2)Data shuffle across executors

* エグゼキュタ間のデータシャッフル

3)Memory profile: Driver and Executors

* メモリプロファイル:ドライバとエグゼキュタ

グラフの読み取りについて

* メモリ使用率が全体的に高い・低すぎる場合
 => NumberOfWorkers / Worker typeの変更を検討する

https://docs.aws.amazon.com/ja_jp/glue/latest/dg/aws-glue-api-jobs-job.html

NumberOfWorkers 数値 (整数)。–
ジョブの実行時に割り当てられた、定義済みの workerType ワーカー数。

WorkerType – UTF-8 文字列 (有効な値: Standard="" | G.1X="" | G.2X="")。

4)CPU Load: Driver and Executors

* CPUロード:ドライバとエグゼキュタ

グラフの読み取りについて

* CPU使用率が全体的に高い・低すぎる場合
 => 「3)Memory profile: Driver and Executors」と同様に
  NumberOfWorkers / Worker typeの変更を検討する

5)Job Execution: Active Executors, Completed Stages & Maximum Needed Executors

* ジョブ実行:アクティブなエクゼキュータ、完了したステージ、必要なエクゼキュータの最大数

グラフの読み取りについて

* Max Allocated Executors を超えている場合
 => DPU数を増やせば、処理速度の改善が期待できる

* Max Allocated Executors より著しく少ない場合
 => DPU数を減らせば、コスト削減できる可能性がある

参考文献

https://buildersbox.corp-sansan.com/entry/2021/02/04/110000
https://dev.classmethod.jp/articles/20180717-aws-glue-support-etl-job-metrics/
https://qiita.com/pioho07/items/4b97b61f2ec098afb695

関連記事

AWS Glue ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2019/10/01/221926