■ はじめに
AWS Glue は、3年以上使っていて、その際は、PythonでGlue jobを記述していた。 今、行っている仕事で、Scalaを使ってPoC(Proof of Concept:概念実証) をすることになったので、調べてみた。
別件:Glue4.0
ってゆーかーいつの間にGlue4.0がリリースされている。 詳細は、以下の関連記事を参照のこと。
https://dk521123.hatenablog.com/entry/2023/03/15/000000
目次
【1】Glueで使えるScalaバージョン 1)補足:公式ドキュメントの記載 【2】Pythonとの比較 【3】公式ドキュメント 【4】Scala Jobの設定 1)Scala class name 2)Jar lib path 【5】テンプレート 【6】サンプル 1)S3 to S3 with AWS API 2)S3 to S3 with Spark API 【7】補足:Quick start 【注意点】修正が必要な部分
【1】Glueで使えるScalaバージョン
* Sparkのバージョンに依存する => 最新でも、Scala 2.12/2.13 => 現状、Scala 3.2.2まででているが、Scala 3.Xは対応していない
Glue version | Spark version | Scala version | Related Link |
---|---|---|---|
Glue4.0 | Apache Spark 3.3.0 | Scala 2.12 | https://spark.apache.org/docs/latest/ |
Glue3.0 | Apache Spark 3.1.1 | Scala 2.12 | https://spark.apache.org/docs/3.1.1/ |
1)補足:公式ドキュメントの記載
* Sparkのバージョンは明記しているのだが、 Scalaのバージョンの記載はあいまい。 ただし、以下の記載から「Scala 2.12」そう
Glue4.0
https://docs.aws.amazon.com/ja_jp/glue/latest/dg/migrating-version-40.html
より抜粋 ~~~~~~~~ AWS Glue 1.0 から AWS Glue 4.0 への移行 Scala も 2.11 から 2.12 に更新されており、<< ここに「2.12」 Scala 2.12 には Scala 2.11 との下位互換性がありません。 ~~~~~~~~
Glue3.0
https://docs.aws.amazon.com/ja_jp/glue/latest/dg/migrating-version-30.html
より抜粋 ~~~~~~~~ 移行チェックリスト ジョブが Scala 2.11 に依存していますか。 AWS Glue 3.0 は Scala 2.12 を使用するため、 << ここで明記... (ちゃんと書くとこ考えよ、、、) ライブラリが Scala 2.11 に依存している場合は、 Scala 2.12 でライブラリを再構築する必要があります。 ~~~~~~~~
【2】Pythonとの比較
https://www.casleyconsulting.co.jp/blog/engineer/6372/
より、多少Scalaの方が早いらしい。 => Python(PySpark)は、所詮、Scalaコードのラッパーなので、 そりゃーそうだなって感じ。
【3】公式ドキュメント
https://docs.aws.amazon.com/ja_jp/glue/latest/dg/aws-glue-programming-scala.html
1)主なAPI
AWS Glue Scala GlueContext API
* getSourceWithFormat, getSinkWithFormat などを使う
https://docs.aws.amazon.com/ja_jp/glue/latest/dg/glue-etl-scala-apis-glue-gluecontext.html
AWS Glue Scala DynamicFrame クラス
* applyMapping などを使う
https://docs.aws.amazon.com/ja_jp/glue/latest/dg/glue-etl-scala-apis-glue-dynamicframe-class.html
【4】Scala Jobの設定
* Python とは、Job設定が異なる点がある
1)Scala class name
* パッケージ名を含むクラス名を指定
例
// 以下の場合「Scala class name: com.aws.HelloWorld」 package com.aws object GlueApp { // ... }
2)Jar lib path
* Jar がある場合、そのJarのパスを指定 * e.g. s3://your-bucket/hello/scripts/utils.jar
【5】テンプレート
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import org.apache.spark.SparkContext import org.apache.spark.sql.SparkSession import scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]) { val spark: SparkContext = new SparkContext() val glueContext: GlueContext = new GlueContext(spark) val sparkSession: SparkSession = glueContext.getSparkSession // @params: [JOB_NAME] val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) // ★ここに自分の実装を書く★ Job.commit() } }
【6】サンプル
* Scala に慣れていない場合、GlueのWeb UIで自動生成してもいいかも
1)S3 to S3 with AWS API
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import java.util.Calendar import org.apache.spark.SparkContext import org.apache.spark.sql.Dataset import org.apache.spark.sql.Row import org.apache.spark.sql.SaveMode import org.apache.spark.sql.SparkSession import org.apache.spark.sql.functions.from_json import org.apache.spark.sql.streaming.Trigger import scala.collection.JavaConverters._ import sparkSession.implicits._ object streamJoiner { def main(sysArgs: Array[String]) { val spark: SparkContext = new SparkContext() val glueContext: GlueContext = new GlueContext(spark) val sparkSession: SparkSession = glueContext.getSparkSession // @params: [JOB_NAME] val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) // Read val inputDynamicFrame = glueContext.getSourceWithFormat( connectionType="s3", options =JsonOptions(s"""{"paths": [ "s3://your-s3-bucket/nycflights.csv"]}"""), transformationContext = "inputDatasource", format = "csv", formatOptions=JsonOptions(s"""{"withHeader":"true","separator": ","}""") ).getDynamicFrame() val inputMapping = inputDynamicFrame.applyMapping( mappings : Seq( ("id", "string", "name", "string"), ("name", "string", "name", "string"), ("age", "int", "age", "int") ), caseSensitive=true, transformationContex="inputDatasource" ) // Write val outputDataFrame = glueContext.getSinkWithFormat( connectionType="s3", options = JsonOptions("""{"path": "s3://my-bucket/tmp", "partitionKeys": []}"""), transformationContext = "outputDataFrame", format = "parquet" ).writeDynamicFrame(inputMapping) Job.commit() } }
2)S3 to S3 with Spark API
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.log.GlueLogger import com.amazonaws.services.glue.util.Job import java.util.Calendar import org.apache.spark.SparkContext import org.apache.spark.sql.Dataset import org.apache.spark.sql.Row import org.apache.spark.sql.SaveMode import org.apache.spark.sql.SparkSession import org.apache.spark.sql.functions.lit import org.apache.spark.sql.DataFrame import scala.collection.JavaConverters._ object streamJoiner { def main(sysArgs: Array[String]) { val logger = new GlueLogger val spark: SparkContext = new SparkContext() val glueContext: GlueContext = new GlueContext(spark) val sparkSession: SparkSession = glueContext.getSparkSession // @params: [JOB_NAME] val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) // Read val inputDataFrame = spark.read .option("header", "true").csv("s3://your-bucket/xxxx/input/") if (inputDataFrame.count == 0) { logger.warn("Zero file...") return } logger.info(s"Count = ${inputDataFrame.count}") // Add column 'insert_date' val today = "%tY/%<tm/%<td" format new Date inputDataFrame.withColumn("insert_date", lit(today)) // Write inputDataFrame.write.mode(SaveMode.Overwrite) .format("parquet").partitionBy("insert_date").option("partitionOverwriteMode", "dynamic") .save("s3://your-bucket/xxxx/output/") Job.commit() } }
【7】補足:Quick start
* 以下からCloneするといい感じだが、一部バグあり。
https://github.com/jhole89/aws-glue-sbt-quickstart
【注意点】修正が必要な部分
* 2か所、修正が必要。
// protected def run(myArg: String): Unit protected def run(myArgs: Map[String, String]): Unit
// override def run(myArg: String): Unit = { override def run(myArgs: Map[String, String]): Unit = {
オプション
* 以下も直していいかも
https://github.com/jhole89/aws-glue-sbt-quickstart/blob/master/build.sbt#L39
"com.amazonaws" % "AWSGlueETL" % "4.0.0" % Provided, // Glueバージョン「4.0.0」に変更
https://github.com/jhole89/aws-glue-sbt-quickstart/blob/master/build.sbt#L4
scalaVersion in ThisBuild := "2.12.17" // Glueバージョンに合うScalaバージョンに変更
参考文献
https://dev.classmethod.jp/articles/aws-glue-using-scala/
https://sparkbyexamples.com/spark/spark-dataframe-withcolumn/
■ 関連記事
AWS Glue ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2019/10/01/221926
AWS Glue ~ 基本編 / ジョブ ~
https://dk521123.hatenablog.com/entry/2019/11/17/231505
AWS Glue ~ Glue Version 2.0 ~
https://dk521123.hatenablog.com/entry/2020/08/19/130118
AWS Glue ~ Glue Version 3.0 ~
https://dk521123.hatenablog.com/entry/2021/09/18/232556
AWS Glue ~ Glue Version 4.0 ~
https://dk521123.hatenablog.com/entry/2023/03/15/000000
AWS Glue ~ ジョブパラメータ ~
https://dk521123.hatenablog.com/entry/2022/09/28/105558
Hive / HiveQL ~ HiveQL関数 / NULL関連編 ~
https://dk521123.hatenablog.com/entry/2021/06/22/213241
Hive / HiveQL ~ データをクリーニングする ~
https://dk521123.hatenablog.com/entry/2020/07/06/232350
SparkからSnowflakeへの接続について考える
https://dk521123.hatenablog.com/entry/2023/03/19/013833
Spark/Scalaの開発環境構築 ~ Windows編 ~
https://dk521123.hatenablog.com/entry/2023/03/20/115450
Docker compose ~ LocalStack/Glue4.0 ~
https://dk521123.hatenablog.com/entry/2023/03/25/021432
Terraform ~ AWS Glue ~
https://dk521123.hatenablog.com/entry/2023/04/08/220411