■ はじめに
https://dk521123.hatenablog.com/entry/2020/08/26/193237
の続き。 AWS Glue の Spark Job で、Redshift にデータを追加することなどを考える また、前回も述べているが、 Redshiftは、PostgreSQLから派生したものなので、 PostgreSQLでも使える。
目次
【1】関連するAPI 1)DynamicFrameWriter.from_jdbc_conf 2)DynamicFrameReader.from_options 3)DynamicFrameReader.from_catalog 【2】サンプル 例1:Redshiftへデータを追加する 例2:PostgreSQL からデータを取得する
【1】関連するAPI
1)DynamicFrameWriter.from_jdbc_conf
# 戻り値は DynamicFrame クラス DynamicFrameWriter.from_jdbc_conf( # 書き込む DynamicFrame frame, # 接続するGlue connection catalog_connection, # 接続オプション (パスやDB, Tableなど) (オプション) connection_options={}, # 使用する Amazon Redshift の一時ディレクトリ (オプション) redshift_tmp_dir = "", # 使用する変換コンテキスト (オプション) transformation_ctx="" )
戻り値:DynamicFrame クラス
https://docs.aws.amazon.com/ja_jp/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-dynamic-frame.html
2)DynamicFrameReader.from_options
* 指定された接続と形式で DynamicFrame を読み込む
connection_type – 接続タイプ。 有効な値は、s3、mysql、postgresql、redshift、sqlserver、oracle、および dynamodb です。 => っとあるので、他のDBでも使用できそう!
3)DynamicFrameReader.from_catalog
* 指定されたカタログから DynamicFrame を読み取る
# フィルター機能 # 例1:product_category=Video パーティション内のデータのみを処理 datasource0 = glueContext.create_dynamic_frame.from_catalog( database = "testdata", table_name = "sampletable", transformation_ctx = "datasource0", push_down_predicate = "(product_category == 'Video')" ) # 例2:year=2019/month=08/day=02 パーティション内のデータのみを処理 datasource0 = glueContext.create_dynamic_frame.from_catalog( database = "testdata", table_name = "sampletable", transformation_ctx = "datasource0", push_down_predicate = "(year == '2019' and month == '08' and day == '02')" ) # 例3:xxx/2019/07/03 パーティション内のデータのみを処理 datasource0 = glueContext.create_dynamic_frame.from_catalog( database = "testdata", table_name = "sampletable", transformation_ctx = "datasource0", push_down_predicate ="(partition_0 == '2019' and partition_1 == '07' and partition_2 == '03')" )
【2】サンプル
例1:Redshiftへデータを追加する
* 以下の公式サイトのサンプルが参考になる
https://aws.amazon.com/jp/blogs/news/load-data-incrementally-and-optimized-parquet-writer-with-aws-glue/
https://aws.amazon.com/jp/premiumsupport/knowledge-center/sql-commands-redshift-glue-job/
glue_spark_job.py
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## Step1:初期化 args = getResolvedOptions(sys.argv, ['JOB_NAME']) spark_context = SparkContext() glue_context = GlueContext(spark_context) spark = glue_context.spark_session job = Job(glue_context) job.init(args['JOB_NAME'], args) ## Step2:データ読み込み(データフレーム化) # 今回は、クローラなどで外部参照できる形になっている場合 input_data_source = glue_context.create_dynamic_frame.from_catalog( database = "your_db", table_name = "your_raw_table", transformation_ctx = "input_data_source") # データフレーム と 挿入するtable の項目が一致していない場合、その変換処理 input_data_mapped_data_frame = ApplyMapping.apply( frame = input_data_source, mappings = [("col0", "string", "id", "string"), ("col1", "string", "name", "string")], transformation_ctx = "input_data_mapped_data_frame") ## Step3:データフレームをRedshiftに挿入 input_data_sink = glue_context.write_dynamic_frame.from_jdbc_conf( frame = input_data_mapped_data_frame, catalog_connection = "redshift", connection_options = {"database": "redshift_db", "dbtable": "redshift_output_table"}, redshift_tmp_dir= "s3://redshift_tmp_dir_path", transformation_ctx= "input_data_sink") ## Step4:後処理(コミット) job.commit()
例2:PostgreSQL からデータを取得する
glue_spark_job.py
import sys from pyspark.context import SparkContext from pyspark.sql.functions import col from awsglue.transforms import * from awsglue.utils import getResolvedOptions from awsglue.context import GlueContext from awsglue.job import Job ## Step1:初期化 args = getResolvedOptions(sys.argv, ['JOB_NAME']) spark_context = SparkContext() glue_context = GlueContext(spark_context) spark = glue_context.spark_session job = Job(glue_context) job.init(args['JOB_NAME'], args) ## Step2:データ読み込み(データフレーム化) connection_options = { "url": "jdbc:postgresql://localhost:5432/sample_db", "user": "postgres", "password": "password", "dbtable": "sample_table", } data_frame = glue_context.create_dynamic_frame.from_options( connection_type="postgresql", connection_options=connection_options ) # Step3:データ取得(id="x0001" のデータを dict 形式で取得) table_data = data_frame.toDF().filter(col('id') == "x0001").toPandas().to_dict(orient="records") print(table_data)
参考文献
https://dev.classmethod.jp/articles/aws-glue-using-re
dshift-etl/
https://qiita.com/oganyanATF/items/9d98662297da464c4715
https://qiita.com/pioho07/items/3a07cf6dccb8dfe046ff
関連記事
AWS Glue ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2019/10/01/221926
Glue から Redshift/PostgreSQL に接続する ~ Python shell編 ~
https://dk521123.hatenablog.com/entry/2020/08/26/193237
Amazon Redshift ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2020/02/22/002139
Amazon Redshift ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2020/04/07/124519
PySpark ~ PySpark経由でDBに接続する ~
https://dk521123.hatenablog.com/entry/2020/07/02/000000