【AWS】AWS Glue ~ Scalaでの実装 ~

■ はじめに

AWS Glue は、3年以上使っていて、その際は、PythonでGlue jobを記述していた。
今、行っている仕事で、Scalaを使ってPoC(Proof of Concept:概念実証)
をすることになったので、調べてみた。

別件:Glue4.0

ってゆーかーいつの間にGlue4.0がリリースされている。
詳細は、以下の関連記事を参照のこと。

https://dk521123.hatenablog.com/entry/2023/03/15/000000

目次

【1】Glueで使えるScalaバージョン
 1)補足:公式ドキュメントの記載
【2】Pythonとの比較
【3】公式ドキュメント
【4】Scala Jobの設定
 1)Scala class name
 2)Jar lib path
【5】テンプレート
【6】サンプル
 1)S3 to S3 with AWS API
 2)S3 to S3 with Spark API
【7】補足:Quick start
 【注意点】修正が必要な部分

【1】Glueで使えるScalaバージョン

* Sparkのバージョンに依存する
 => 最新でも、Scala 2.12/2.13
 => 現状、Scala 3.2.2まででているが、Scala 3.Xは対応していない
Glue version Spark version Scala version Related Link
Glue4.0 Apache Spark 3.3.0 Scala 2.12 https://spark.apache.org/docs/latest/
Glue3.0 Apache Spark 3.1.1 Scala 2.12 https://spark.apache.org/docs/3.1.1/

1)補足:公式ドキュメントの記載

* Sparkのバージョンは明記しているのだが、
 Scalaのバージョンの記載はあいまい。
 ただし、以下の記載から「Scala 2.12」そう

Glue4.0
https://docs.aws.amazon.com/ja_jp/glue/latest/dg/migrating-version-40.html

より抜粋
~~~~~~~~
AWS Glue 1.0 から AWS Glue 4.0 への移行

Scala も 2.11 から 2.12 に更新されており、<< ここに「2.12」
Scala 2.12 には Scala 2.11 との下位互換性がありません。
~~~~~~~~

Glue3.0
https://docs.aws.amazon.com/ja_jp/glue/latest/dg/migrating-version-30.html

より抜粋
~~~~~~~~
移行チェックリスト
ジョブが Scala 2.11 に依存していますか。

AWS Glue 3.0 は Scala 2.12 を使用するため、 << ここで明記... (ちゃんと書くとこ考えよ、、、)
ライブラリが Scala 2.11 に依存している場合は、
Scala 2.12 でライブラリを再構築する必要があります。
~~~~~~~~

【2】Pythonとの比較

https://www.casleyconsulting.co.jp/blog/engineer/6372/

より、多少Scalaの方が早いらしい。
 => Python(PySpark)は、所詮、Scalaコードのラッパーなので、
  そりゃーそうだなって感じ。

【3】公式ドキュメント

https://docs.aws.amazon.com/ja_jp/glue/latest/dg/aws-glue-programming-scala.html

1)主なAPI

AWS Glue Scala GlueContext API

* getSourceWithFormat, getSinkWithFormat などを使う

https://docs.aws.amazon.com/ja_jp/glue/latest/dg/glue-etl-scala-apis-glue-gluecontext.html
AWS Glue Scala DynamicFrame クラス

* applyMapping などを使う

https://docs.aws.amazon.com/ja_jp/glue/latest/dg/glue-etl-scala-apis-glue-dynamicframe-class.html

【4】Scala Jobの設定

* Python とは、Job設定が異なる点がある

1)Scala class name

* パッケージ名を含むクラス名を指定

// 以下の場合「Scala class name: com.aws.HelloWorld」

package com.aws

object GlueApp {
  // ...
}

2)Jar lib path

* Jar がある場合、そのJarのパスを指定
* e.g. s3://your-bucket/hello/scripts/utils.jar

【5】テンプレート

import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import scala.collection.JavaConverters._

object GlueApp {
  def main(sysArgs: Array[String]) {
    val spark: SparkContext = new SparkContext()
    val glueContext: GlueContext = new GlueContext(spark)
    val sparkSession: SparkSession = glueContext.getSparkSession

    // @params: [JOB_NAME]
    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)
    Job.init(args("JOB_NAME"), glueContext, args.asJava)

    // ★ここに自分の実装を書く★

    Job.commit()
  }
}

【6】サンプル

* Scala に慣れていない場合、GlueのWeb UIで自動生成してもいいかも

1)S3 to S3 with AWS API

import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import java.util.Calendar
import org.apache.spark.SparkContext
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Row
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.streaming.Trigger
import scala.collection.JavaConverters._
import sparkSession.implicits._

object streamJoiner {
  def main(sysArgs: Array[String]) {
    val spark: SparkContext = new SparkContext()
    val glueContext: GlueContext = new GlueContext(spark)
    val sparkSession: SparkSession = glueContext.getSparkSession

    // @params: [JOB_NAME]
    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)
    Job.init(args("JOB_NAME"), glueContext, args.asJava)

    // Read
    val inputDynamicFrame = glueContext.getSourceWithFormat(
        connectionType="s3",
        options =JsonOptions(s"""{"paths": [ "s3://your-s3-bucket/nycflights.csv"]}"""),
        transformationContext = "inputDatasource", 
        format = "csv",
        formatOptions=JsonOptions(s"""{"withHeader":"true","separator": ","}""")
    ).getDynamicFrame()

    val inputMapping = inputDynamicFrame.applyMapping(
        mappings : Seq(
            ("id", "string", "name", "string"),
            ("name", "string", "name", "string"),
            ("age", "int", "age", "int")
        ),
        caseSensitive=true,
        transformationContex="inputDatasource"
    )

    // Write
    val outputDataFrame = glueContext.getSinkWithFormat(
        connectionType="s3",
        options = JsonOptions("""{"path": "s3://my-bucket/tmp", "partitionKeys": []}"""),
        transformationContext = "outputDataFrame",
        format = "parquet"
    ).writeDynamicFrame(inputMapping)

    Job.commit()
  }
}

2)S3 to S3 with Spark API

import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.log.GlueLogger
import com.amazonaws.services.glue.util.Job
import java.util.Calendar
import org.apache.spark.SparkContext
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Row
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.DataFrame
import scala.collection.JavaConverters._

object streamJoiner {
  def main(sysArgs: Array[String]) {
    val logger = new GlueLogger
    val spark: SparkContext = new SparkContext()
    val glueContext: GlueContext = new GlueContext(spark)
    val sparkSession: SparkSession = glueContext.getSparkSession

    // @params: [JOB_NAME]
    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)
    Job.init(args("JOB_NAME"), glueContext, args.asJava)

    // Read
    val inputDataFrame = spark.read
      .option("header", "true").csv("s3://your-bucket/xxxx/input/")
    if (inputDataFrame.count == 0) {
      logger.warn("Zero file...")
      return
    }
    logger.info(s"Count = ${inputDataFrame.count}")

    // Add column 'insert_date'
    val today = "%tY/%<tm/%<td" format new Date
    inputDataFrame.withColumn("insert_date", lit(today))

    // Write
    inputDataFrame.write.mode(SaveMode.Overwrite)
      .format("parquet").partitionBy("insert_date").option("partitionOverwriteMode", "dynamic")
      .save("s3://your-bucket/xxxx/output/")

    Job.commit()
  }
}

【7】補足:Quick start

* 以下からCloneするといい感じだが、一部バグあり。

https://github.com/jhole89/aws-glue-sbt-quickstart

【注意点】修正が必要な部分

* 2か所、修正が必要。

https://github.com/jhole89/aws-glue-sbt-quickstart/blob/master/shared/src/main/scala/shared/BaseGlueScript.scala#L21

// protected def run(myArg: String): Unit
protected def run(myArgs: Map[String, String]): Unit

https://github.com/jhole89/aws-glue-sbt-quickstart/blob/master/scripts/src/main/scala/scripts/Script.scala#L9

// override def run(myArg: String): Unit = {
override def run(myArgs: Map[String, String]): Unit = {

オプション

* 以下も直していいかも

https://github.com/jhole89/aws-glue-sbt-quickstart/blob/master/build.sbt#L39

  "com.amazonaws" % "AWSGlueETL" % "4.0.0" % Provided, // Glueバージョン「4.0.0」に変更

https://github.com/jhole89/aws-glue-sbt-quickstart/blob/master/build.sbt#L4

scalaVersion in ThisBuild := "2.12.17" // Glueバージョンに合うScalaバージョンに変更

参考文献

https://dev.classmethod.jp/articles/aws-glue-using-scala/
https://sparkbyexamples.com/spark/spark-dataframe-withcolumn/

■ 関連記事

AWS Glue ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2019/10/01/221926
AWS Glue ~ 基本編 / ジョブ ~
https://dk521123.hatenablog.com/entry/2019/11/17/231505
AWS Glue ~ Glue Version 2.0 ~
https://dk521123.hatenablog.com/entry/2020/08/19/130118
AWS Glue ~ Glue Version 3.0 ~
https://dk521123.hatenablog.com/entry/2021/09/18/232556
AWS Glue ~ Glue Version 4.0 ~
https://dk521123.hatenablog.com/entry/2023/03/15/000000
AWS Glue ~ ジョブパラメータ ~
https://dk521123.hatenablog.com/entry/2022/09/28/105558
Hive / HiveQL ~ HiveQL関数 / NULL関連編 ~
https://dk521123.hatenablog.com/entry/2021/06/22/213241
Hive / HiveQL ~ データをクリーニングする ~
https://dk521123.hatenablog.com/entry/2020/07/06/232350
SparkからSnowflakeへの接続について考える
https://dk521123.hatenablog.com/entry/2023/03/19/013833
Spark/Scalaの開発環境構築 ~ Windows編 ~
https://dk521123.hatenablog.com/entry/2023/03/20/115450
Docker compose ~ LocalStack/Glue4.0 ~
https://dk521123.hatenablog.com/entry/2023/03/25/021432
Terraform ~ AWS Glue ~
https://dk521123.hatenablog.com/entry/2023/04/08/220411