【AWS】AWS Glue ~ DynamicFrame ~

■ はじめに

Glue の DynamicFrame で引数の設定をミスっていて
ハマりにハマったので、メモしておく。

目次

【0】関連するAPI
 1)DynamicFrameReader.from_options
【1】サンプル
 例1:CSVからParquetへ変換
【2】DynamicFrame あれこれ
 1)s3 の指定パスから再帰的にファイルを取得するには
 2)データ数の確認
 3)データの表示

【0】関連するAPI

1)DynamicFrameReader.from_options

https://docs.aws.amazon.com/ja_jp/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-dynamic-frame-reader.html#aws-glue-api-crawler-pyspark-extensions-dynamic-frame-reader-from_options
https://docs.aws.amazon.com/ja_jp/glue/latest/dg/aws-glue-programming-etl-format.html

【1】サンプル

例1:CSVからParquetへ変換

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job


# ジョブ初期化
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
spark_context = SparkContext()
glue_context = GlueContext(spark_context)
spark = glue_context.spark_session
job = Job(glue_context)
job.init(args['JOB_NAME'], args)

# from_options関数で、「s3://your-s3-bucket-name/inputs/」配下の
# 全ファイルをCSVファイル(区切り文字「|」)として読み込む
# See https://docs.aws.amazon.com/ja_jp/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-dynamic-frame-reader.html#aws-glue-api-crawler-pyspark-extensions-dynamic-frame-reader-from_options  
dynamic_frame = glue_context.create_dynamic_frame.from_options(
  connection_type="s3",
  # See https://docs.aws.amazon.com/ja_jp/glue/latest/dg/aws-glue-programming-etl-connect.html
  connection_options={
    "paths": ["s3://your-s3-bucket-name/inputs/"],
    "recurse": True
  },
  format="csv",
  # See https://docs.aws.amazon.com/ja_jp/glue/latest/dg/aws-glue-programming-etl-format.html
  format_options={
    "separator": "|",
    "withHeader": True,
    "quoteChar": '"',
    "multiLine": True
  }
)

if dynamic_frame.count() == 0:
  print("No data...")
else:
  # データの情報を表示
  print('Count: {0}'.format(dynamic_frame.count()))
  dynamic_frame.printSchema()

  # データの中身を表示
  dynamic_frame.show(dynamic_frame.count())

  # Apache Spark DataFrame に変換
  data_frame = dynamic_frame.toDF()
  # Parquet形式でファイル出力(書き込み処理)
  data_frame.write \
    .mode("overwrite") \
    .format("parquet") \
    .option("compression", "snappy") \
    .save("s3://your-s3-bucket-name/outputs/")

# ジョブコミット
job.commit()

【2】DynamicFrame あれこれ

1)s3 の指定パスから再帰的にファイルを取得するには

* この指定を間違ったので、えらいハマった。。。

https://docs.aws.amazon.com/ja_jp/glue/latest/dg/aws-glue-programming-etl-connect.html

"connectionType": "s3" より抜粋
~~~~~~~~
"recurse": (オプション) true に設定した場合は、
指定したパスの下にあるすべてのサブディレクトリ内の
ファイルを再帰的に読み取ります。
~~~~~~~~

cf. recurse(リカース) = 再帰, 再帰的に処理する

2)データ数の確認

https://docs.aws.amazon.com/ja_jp/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-dynamic-frame.html#aws-glue-api-crawler-pyspark-extensions-dynamic-frame-count

* dynamic_frame.count() を使う

3)データの表示

* dynamic_frame.show() を使う
 => toDF()して、PySparkのDataFrameに変換すれば、
  DataFrame.show(n=20, truncate=True, vertical=False) のように指定できる
 => PySparkのDataFrameの詳細は、以下の関連記事を参照のこと。

https://dk521123.hatenablog.com/entry/2021/04/26/161342

参考文献

https://dev.classmethod.jp/articles/convert-tsv-to-parquet-with-glue-spark-job/

関連記事

AWS Glue ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2019/10/01/221926
AWS Glue ~ 基本編 / ジョブ ~
https://dk521123.hatenablog.com/entry/2019/11/17/231505
PySpark ~ DataFrame / show() ~
https://dk521123.hatenablog.com/entry/2021/04/26/161342
Python で Parquet を扱う
https://dk521123.hatenablog.com/entry/2021/11/13/095519