■ はじめに
https://dk521123.hatenablog.com/entry/2021/05/14/095125
の続き。 Glue Job からパーティション更新を行うことを考える。
目次
■ Job からパーティション更新実装案 ■ 案1)GlueContext クラスを駆使して実装する 方法 1:write_dynamic_frame_from_catalogで enableUpdateCatalog と partitionKeys を渡す 方法2:getSinkからsinkオブジェクトを取得しsetCatalogInfoで指定する ■ 案2)boto3 / Data Wrangler API を駆使して実装する 1)パーティション関連の boto3 API 2)パーティション関連の Data Wrangler API
■ Job からパーティション更新実装案
以下を考えてみた。 案1)GlueContext クラスを駆使して実装する 案2)boto3 / Data Wrangler API を駆使して実装する
■ 案1)GlueContext クラスを駆使して実装する
https://docs.aws.amazon.com/ja_jp/glue/latest/dg/update-from-job.html
に記載されている。 なお、以下のサイトも一読してみるといい。 例えば、Dynamic frameは、Append(追加)のみで、 Overwrite(上書き)は未サポートでその解決策を記載しているところとか。
https://yomon.hatenablog.com/entry/04/glueoverwrite
方法 1:write_dynamic_frame_from_catalogで enableUpdateCatalog と partitionKeys を渡す
以下の公式サイトにも、 数例に対して、write_dynamic_frame_from_catalogの呼び出し方が載っている。
https://aws.amazon.com/jp/premiumsupport/knowledge-center/glue-job-specific-s3-partition/
サンプル
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job args = getResolvedOptions(sys.argv, ['JOB_NAME']) job_name = args['JOB_NAME'] print(f"Start {job_name}!!") spark_context = SparkContext() glue_context = GlueContext(spark_context) spark = glue_context.spark_session job = Job(glue_context) job.init(job_name, args) # create glue dynamicframe # Ref to https://www.reddit.com/r/aws/comments/e20ggy/writing_from_dynamicframe_gluepyspark_to_s3_is/ dynamic_frame= glue_context.create_dynamic_frame_from_options( connection_type="s3", connection_options={"paths": ["s3://your-bucket-name/xxx/your-folder"]}, format="csv", format_options={ "withHeader": True, "separator": "," } ) # get row count print("Count from S3: ", dynamic_frame.toDF().count()) # ★ここから★ additional_options = {"enableUpdateCatalog": True} additional_options["partitionKeys"] = ["year", "month", "day"] sink = glue_context.write_dynamic_frame_from_catalog( frame=dynamic_frame, database=<target_db_name>, table_name=<target_table_name>, transformation_ctx="write_sink", additional_options=additional_options) # ★ここまで★ job.commit()
方法2:getSinkからsinkオブジェクトを取得しsetCatalogInfoで指定する
sink = glue_context.getSink( connection_type="s3", path="<S3_output_path>", enableUpdateCatalog=True, partitionKeys=["year", "month", "day"]) sink.setFormat("csv") sink.setCatalogInfo( catalogDatabase=<target_db_name>, catalogTableName=<target_table_name>) sink.writeFrame(dynamic_frame)
■ 案2)boto3 / Data Wrangler API を駆使して実装する
boto3 API もしくは、 Data Wrangler API に使って パーティションを更新する。 なお、boto3 API および Data Wrangler API については、以下の関連記事を参照のこと。
AWS Glue ~ Boto3 / パーティション操作編 ~
https://dk521123.hatenablog.com/entry/2021/06/09/113458
AWS Data Wrangler ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/04/28/174316
1)パーティション関連の boto3 API
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html#client
より一部抜粋 * batch_create_partition (create_partition もあるが、こっちは1パーティションのみ) * create_partition_index * update_column_statistics_for_partition * update_table etc...
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html#Glue.Client.batch_create_partition
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html#Glue.Client.create_partition_index
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html#Glue.Client.update_column_statistics_for_partition
* 以下に一部を記載。
AWS Glue ~ Boto3 / パーティション操作編 ~
https://dk521123.hatenablog.com/entry/2021/06/09/113458
2)パーティション関連の Data Wrangler API
https://aws-data-wrangler.readthedocs.io/en/2.7.0/api.html#aws-glue-catalog
より一部抜粋 * add_csv_partitions * upsert_table_parameters etc...
https://aws-data-wrangler.readthedocs.io/en/2.7.0/stubs/awswrangler.catalog.add_csv_partitions.html#awswrangler.catalog.add_csv_partitions
https://aws-data-wrangler.readthedocs.io/en/2.7.0/stubs/awswrangler.catalog.upsert_table_parameters.html#awswrangler.catalog.upsert_table_parameters
参考文献
https://dev.classmethod.jp/articles/aws-glue-now-supports-the-ability-to-update-partitions-from-glue-spark-etl-jobs/
https://zenn.dev/nagomiso/articles/8033c0f8d01a54791498
関連記事
AWS Glue ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2019/10/01/221926
AWS Glue ~ 基本編 / クローラ ~
https://dk521123.hatenablog.com/entry/2019/12/01/003455
AWS Glue ~ Boto3 / クローラ編 ~
https://dk521123.hatenablog.com/entry/2021/04/16/135558
AWS Glue ~ Boto3 / パーティション操作編 ~
https://dk521123.hatenablog.com/entry/2021/06/09/113458
Glue から DataCatalogテーブル に対して Spark SQLを実行する
https://dk521123.hatenablog.com/entry/2021/05/11/220731
Hive / HiveQL ~ 基本編 / テーブル作成 ~
https://dk521123.hatenablog.com/entry/2020/11/03/000000
AWS Data Wrangler ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/04/28/174316