■ はじめに
Glue の DynamicFrame で引数の設定をミスっていて ハマりにハマったので、メモしておく。
目次
【0】関連するAPI 1)DynamicFrameReader.from_options 【1】サンプル 例1:CSVからParquetへ変換 【2】DynamicFrame あれこれ 1)s3 の指定パスから再帰的にファイルを取得するには 2)データ数の確認 3)データの表示
【0】関連するAPI
1)DynamicFrameReader.from_options
https://docs.aws.amazon.com/ja_jp/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-dynamic-frame-reader.html#aws-glue-api-crawler-pyspark-extensions-dynamic-frame-reader-from_options
https://docs.aws.amazon.com/ja_jp/glue/latest/dg/aws-glue-programming-etl-format.html
【1】サンプル
例1:CSVからParquetへ変換
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job # ジョブ初期化 args = getResolvedOptions(sys.argv, ['JOB_NAME']) spark_context = SparkContext() glue_context = GlueContext(spark_context) spark = glue_context.spark_session job = Job(glue_context) job.init(args['JOB_NAME'], args) # from_options関数で、「s3://your-s3-bucket-name/inputs/」配下の # 全ファイルをCSVファイル(区切り文字「|」)として読み込む # See https://docs.aws.amazon.com/ja_jp/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-dynamic-frame-reader.html#aws-glue-api-crawler-pyspark-extensions-dynamic-frame-reader-from_options dynamic_frame = glue_context.create_dynamic_frame.from_options( connection_type="s3", # See https://docs.aws.amazon.com/ja_jp/glue/latest/dg/aws-glue-programming-etl-connect.html connection_options={ "paths": ["s3://your-s3-bucket-name/inputs/"], "recurse": True }, format="csv", # See https://docs.aws.amazon.com/ja_jp/glue/latest/dg/aws-glue-programming-etl-format.html format_options={ "separator": "|", "withHeader": True, "quoteChar": '"', "multiLine": True } ) if dynamic_frame.count() == 0: print("No data...") else: # データの情報を表示 print('Count: {0}'.format(dynamic_frame.count())) dynamic_frame.printSchema() # データの中身を表示 dynamic_frame.show(dynamic_frame.count()) # Apache Spark DataFrame に変換 data_frame = dynamic_frame.toDF() # Parquet形式でファイル出力(書き込み処理) data_frame.write \ .mode("overwrite") \ .format("parquet") \ .option("compression", "snappy") \ .save("s3://your-s3-bucket-name/outputs/") # ジョブコミット job.commit()
【2】DynamicFrame あれこれ
1)s3 の指定パスから再帰的にファイルを取得するには
* この指定を間違ったので、えらいハマった。。。
https://docs.aws.amazon.com/ja_jp/glue/latest/dg/aws-glue-programming-etl-connect.html
"connectionType": "s3" より抜粋 ~~~~~~~~ "recurse": (オプション) true に設定した場合は、 指定したパスの下にあるすべてのサブディレクトリ内の ファイルを再帰的に読み取ります。 ~~~~~~~~ cf. recurse(リカース) = 再帰, 再帰的に処理する
2)データ数の確認
* dynamic_frame.count() を使う
3)データの表示
* dynamic_frame.show() を使う => toDF()して、PySparkのDataFrameに変換すれば、 DataFrame.show(n=20, truncate=True, vertical=False) のように指定できる => PySparkのDataFrameの詳細は、以下の関連記事を参照のこと。
https://dk521123.hatenablog.com/entry/2021/04/26/161342
参考文献
https://dev.classmethod.jp/articles/convert-tsv-to-parquet-with-glue-spark-job/
関連記事
AWS Glue ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2019/10/01/221926
AWS Glue ~ 基本編 / ジョブ ~
https://dk521123.hatenablog.com/entry/2019/11/17/231505
PySpark ~ DataFrame / show() ~
https://dk521123.hatenablog.com/entry/2021/04/26/161342
Python で Parquet を扱う
https://dk521123.hatenablog.com/entry/2021/11/13/095519