【AWS】Glue Job から パーティションを更新することを考える

■ はじめに

https://dk521123.hatenablog.com/entry/2021/05/14/095125

の続き。

Glue Job からパーティション更新を行うことを考える。

目次

■ Job からパーティション更新実装案

■ 案1)GlueContext クラスを駆使して実装する
 方法 1:write_dynamic_frame_from_catalogで enableUpdateCatalog と partitionKeys を渡す
 方法2:getSinkからsinkオブジェクトを取得しsetCatalogInfoで指定する

■ 案2)boto3 / Data Wrangler API を駆使して実装する
 1)パーティション関連の boto3 API
 2)パーティション関連の Data Wrangler API

■ Job からパーティション更新実装案

以下を考えてみた。

案1)GlueContext クラスを駆使して実装する
案2)boto3 / Data Wrangler API を駆使して実装する

■ 案1)GlueContext クラスを駆使して実装する

https://docs.aws.amazon.com/ja_jp/glue/latest/dg/update-from-job.html

に記載されている。

なお、以下のサイトも一読してみるといい。
例えば、Dynamic frameは、Append(追加)のみで、
Overwrite(上書き)は未サポートでその解決策を記載しているところとか。

https://yomon.hatenablog.com/entry/04/glueoverwrite

方法 1:write_dynamic_frame_from_catalogで enableUpdateCatalog と partitionKeys を渡す

以下の公式サイトにも、
数例に対して、write_dynamic_frame_from_catalogの呼び出し方が載っている。

https://aws.amazon.com/jp/premiumsupport/knowledge-center/glue-job-specific-s3-partition/
サンプル

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ['JOB_NAME'])
job_name = args['JOB_NAME']

print(f"Start {job_name}!!")

spark_context = SparkContext()
glue_context = GlueContext(spark_context)
spark = glue_context.spark_session

job = Job(glue_context)
job.init(job_name, args)

# create glue dynamicframe
# Ref to https://www.reddit.com/r/aws/comments/e20ggy/writing_from_dynamicframe_gluepyspark_to_s3_is/  
dynamic_frame= glue_context.create_dynamic_frame_from_options(
  connection_type="s3",
  connection_options={"paths": ["s3://your-bucket-name/xxx/your-folder"]},
  format="csv",
  format_options={
    "withHeader": True,
    "separator": ","
  }
)
# get row count
print("Count from S3: ", dynamic_frame.toDF().count())

# ★ここから★
additional_options = {"enableUpdateCatalog": True}
additional_options["partitionKeys"] = ["year", "month", "day"]

sink = glue_context.write_dynamic_frame_from_catalog(
  frame=dynamic_frame,
  database=<target_db_name>,
  table_name=<target_table_name>,
  transformation_ctx="write_sink",
  additional_options=additional_options)
# ★ここまで★

job.commit()

方法2:getSinkからsinkオブジェクトを取得しsetCatalogInfoで指定する

sink = glue_context.getSink(
  connection_type="s3",
  path="<S3_output_path>",
  enableUpdateCatalog=True,
  partitionKeys=["year", "month", "day"])

sink.setFormat("csv")

sink.setCatalogInfo(
  catalogDatabase=<target_db_name>,
  catalogTableName=<target_table_name>)

sink.writeFrame(dynamic_frame)

■ 案2)boto3 / Data Wrangler API を駆使して実装する

boto3 API もしくは、 Data Wrangler API に使って
パーティションを更新する。

なお、boto3 API および Data Wrangler API については、以下の関連記事を参照のこと。

AWS Glue ~ Boto3 / パーティション操作編 ~
https://dk521123.hatenablog.com/entry/2021/06/09/113458
AWS Data Wrangler ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/04/28/174316

1)パーティション関連の boto3 API

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html#client

より一部抜粋

* batch_create_partition
 (create_partition もあるが、こっちは1パーティションのみ)
* create_partition_index
* update_column_statistics_for_partition
* update_table

etc...

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html#Glue.Client.batch_create_partition
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html#Glue.Client.create_partition_index
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html#Glue.Client.update_column_statistics_for_partition

* 以下に一部を記載。

AWS Glue ~ Boto3 / パーティション操作編 ~
https://dk521123.hatenablog.com/entry/2021/06/09/113458

2)パーティション関連の Data Wrangler API

https://aws-data-wrangler.readthedocs.io/en/2.7.0/api.html#aws-glue-catalog

より一部抜粋

* add_csv_partitions
* upsert_table_parameters

etc...

https://aws-data-wrangler.readthedocs.io/en/2.7.0/stubs/awswrangler.catalog.add_csv_partitions.html#awswrangler.catalog.add_csv_partitions
https://aws-data-wrangler.readthedocs.io/en/2.7.0/stubs/awswrangler.catalog.upsert_table_parameters.html#awswrangler.catalog.upsert_table_parameters

参考文献

https://dev.classmethod.jp/articles/aws-glue-now-supports-the-ability-to-update-partitions-from-glue-spark-etl-jobs/
https://zenn.dev/nagomiso/articles/8033c0f8d01a54791498

関連記事

AWS Glue ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2019/10/01/221926
AWS Glue ~ 基本編 / クローラ ~
https://dk521123.hatenablog.com/entry/2019/12/01/003455
AWS Glue ~ Boto3 / クローラ編 ~
https://dk521123.hatenablog.com/entry/2021/04/16/135558
AWS Glue ~ Boto3 / パーティション操作編 ~
https://dk521123.hatenablog.com/entry/2021/06/09/113458
Glue から DataCatalogテーブル に対して Spark SQLを実行する
https://dk521123.hatenablog.com/entry/2021/05/11/220731
Hive / HiveQL ~ 基本編 / テーブル作成 ~
https://dk521123.hatenablog.com/entry/2020/11/03/000000
AWS Data Wrangler ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/04/28/174316