【AWS】Glue から Redshift/PostgreSQL に接続する ~ PySpark編 ~

■ はじめに

https://dk521123.hatenablog.com/entry/2020/08/26/193237

の続き。

AWS Glue の Spark Job で、Redshift にデータを追加することなどを考える

 また、前回も述べているが、
Redshiftは、PostgreSQLから派生したものなので、
PostgreSQLでも使える。

目次

【1】関連するAPI
 1)DynamicFrameWriter.from_jdbc_conf
 2)DynamicFrameReader.from_options
 3)DynamicFrameReader.from_catalog
【2】サンプル
 例1:Redshiftへデータを追加する
 例2:PostgreSQL からデータを取得する

【1】関連するAPI

1)DynamicFrameWriter.from_jdbc_conf

https://docs.aws.amazon.com/ja_jp/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-dynamic-frame-writer.html#aws-glue-api-crawler-pyspark-extensions-dynamic-frame-writer-from_jdbc_conf

# 戻り値は DynamicFrame クラス
DynamicFrameWriter.from_jdbc_conf(
  # 書き込む DynamicFrame
  frame,
  # 接続するGlue connection
  catalog_connection,
  # 接続オプション (パスやDB, Tableなど) (オプション)
  connection_options={},
  # 使用する Amazon Redshift の一時ディレクトリ (オプション)
  redshift_tmp_dir = "",
  # 使用する変換コンテキスト (オプション)
  transformation_ctx=""
)

戻り値:DynamicFrame クラス
https://docs.aws.amazon.com/ja_jp/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-dynamic-frame.html

2)DynamicFrameReader.from_options

* 指定された接続と形式で DynamicFrame を読み込む

https://docs.aws.amazon.com/ja_jp/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-dynamic-frame-reader.html#aws-glue-api-crawler-pyspark-extensions-dynamic-frame-reader-from_options

connection_type – 接続タイプ。
有効な値は、s3、mysql、postgresql、redshift、sqlserver、oracle、および dynamodb です。
 => っとあるので、他のDBでも使用できそう!

3)DynamicFrameReader.from_catalog

* 指定されたカタログから DynamicFrame を読み取る

https://docs.aws.amazon.com/ja_jp/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-dynamic-frame-reader.html#aws-glue-api-crawler-pyspark-extensions-dynamic-frame-reader-from_catalog
push_down_predicate

# フィルター機能

# 例1:product_category=Video パーティション内のデータのみを処理
datasource0 = glueContext.create_dynamic_frame.from_catalog(
  database = "testdata",
  table_name = "sampletable",
  transformation_ctx = "datasource0",
  push_down_predicate = "(product_category == 'Video')"
)

# 例2:year=2019/month=08/day=02 パーティション内のデータのみを処理
datasource0 = glueContext.create_dynamic_frame.from_catalog(
  database = "testdata",
  table_name = "sampletable",
  transformation_ctx = "datasource0",
  push_down_predicate = "(year == '2019' and month == '08' and day == '02')"
)

# 例3:xxx/2019/07/03 パーティション内のデータのみを処理
datasource0 = glueContext.create_dynamic_frame.from_catalog(
  database = "testdata",
  table_name = "sampletable",
  transformation_ctx = "datasource0",
  push_down_predicate ="(partition_0 == '2019' and partition_1 == '07' and partition_2 == '03')"
)

【2】サンプル

例1:Redshiftへデータを追加する

* 以下の公式サイトのサンプルが参考になる

https://aws.amazon.com/jp/blogs/news/load-data-incrementally-and-optimized-parquet-writer-with-aws-glue/
https://aws.amazon.com/jp/premiumsupport/knowledge-center/sql-commands-redshift-glue-job/
glue_spark_job.py

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## Step1:初期化
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
spark_context = SparkContext()
glue_context = GlueContext(spark_context)
spark = glue_context.spark_session

job = Job(glue_context)
job.init(args['JOB_NAME'], args)

## Step2:データ読み込み(データフレーム化)
# 今回は、クローラなどで外部参照できる形になっている場合
input_data_source = glue_context.create_dynamic_frame.from_catalog(
  database = "your_db",
  table_name = "your_raw_table",
  transformation_ctx = "input_data_source")

# データフレーム と 挿入するtable の項目が一致していない場合、その変換処理
input_data_mapped_data_frame = ApplyMapping.apply(
  frame = input_data_source,
  mappings = [("col0", "string", "id", "string"), ("col1", "string", "name", "string")],
  transformation_ctx = "input_data_mapped_data_frame")

## Step3:データフレームをRedshiftに挿入
input_data_sink = glue_context.write_dynamic_frame.from_jdbc_conf(
  frame = input_data_mapped_data_frame,
  catalog_connection = "redshift",
  connection_options = {"database": "redshift_db", "dbtable": "redshift_output_table"},
  redshift_tmp_dir= "s3://redshift_tmp_dir_path",
  transformation_ctx= "input_data_sink")

## Step4:後処理(コミット)
job.commit()

例2:PostgreSQL からデータを取得する

glue_spark_job.py

import sys

from pyspark.context import SparkContext
from pyspark.sql.functions import col

from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job

## Step1:初期化
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
spark_context = SparkContext()
glue_context = GlueContext(spark_context)
spark = glue_context.spark_session

job = Job(glue_context)
job.init(args['JOB_NAME'], args)

## Step2:データ読み込み(データフレーム化)
connection_options = {
  "url": "jdbc:postgresql://localhost:5432/sample_db",
  "user": "postgres",
  "password": "password",
  "dbtable": "sample_table",
}

data_frame = glue_context.create_dynamic_frame.from_options(
  connection_type="postgresql",
  connection_options=connection_options
)

# Step3:データ取得(id="x0001" のデータを dict 形式で取得)
table_data = data_frame.toDF().filter(col('id') == "x0001").toPandas().to_dict(orient="records")
print(table_data)

参考文献

https://dev.classmethod.jp/articles/aws-glue-using-re dshift-etl/
https://qiita.com/oganyanATF/items/9d98662297da464c4715
https://qiita.com/pioho07/items/3a07cf6dccb8dfe046ff

関連記事

AWS Glue ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2019/10/01/221926
Glue から Redshift/PostgreSQL に接続する ~ Python shell編 ~
https://dk521123.hatenablog.com/entry/2020/08/26/193237
Amazon Redshift ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2020/02/22/002139
Amazon Redshift ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2020/04/07/124519
PySpark ~ PySpark経由でDBに接続する ~
https://dk521123.hatenablog.com/entry/2020/07/02/000000