【Scala】Scala ~ AWS SDK / S3サンプル ~

■ はじめに

https://dk521123.hatenablog.com/entry/2023/03/24/211033

の続き。

今回は、Scalaで、AWS SDK for Java を使ったS3のサンプル集

目次

【0】公式のサンプル
【1】build.sbt
【2】S3をハンドリングする共通処理

【0】公式のサンプル

https://github.com/awsdocs/aws-doc-sdk-examples/blob/main/javav2/example_code/s3/src/main/java/com/example/s3/S3ObjectOperations.java

【1】build.sbt

// Scala compiler version used to build this project.
scalaVersion := "2.12.17"

// Project coordinates.
name := "hello-world"
organization := "ch.epfl.scala"
version := "1.0"

libraryDependencies ++= Seq(
  // NOTE(review): not referenced by the sample code in this article — confirm it is still needed.
  "org.scala-lang.modules" %% "scala-parser-combinators" % "2.1.1",
  // SnakeYAML: provides org.yaml.snakeyaml.Yaml / Constructor used to load YAML.
  "org.yaml" % "snakeyaml" % "1.33",
  // AWS SDK for Java v2, S3 module (software.amazon.awssdk.services.s3).
  "software.amazon.awssdk" % "s3" % "2.20.30",
)

【2】S3をハンドリングする共通処理

以下の関連記事のように、LocalStackを使って
ローカル上でAWS開発を行う場合にも対応できるよう、
合わせて実装する

https://dk521123.hatenablog.com/entry/2023/03/25/021432
https://dk521123.hatenablog.com/entry/2023/04/02/172851

import scala.util.control.Breaks._
import scala.collection.JavaConversions._
import scala.reflect._

import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.s3.S3Client
import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Iterable
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response
import software.amazon.awssdk.services.s3.model.S3Object
import software.amazon.awssdk.services.s3.model.GetObjectRequest
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest

import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider

import org.yaml.snakeyaml.Yaml
import org.yaml.snakeyaml.constructor.Constructor

import java.io.File
import java.io.ByteArrayInputStream
import java.net.URI
import java.nio.charset.StandardCharsets


/**
  * Helper around the AWS SDK for Java v2 `S3Client` for listing, reading and
  * deleting S3 objects.
  *
  * @param region AWS region the underlying client is configured with
  * @param isDev  when `true`, the client is pointed at `http://localhost:4566`
  *               (e.g. LocalStack) with dummy static credentials and
  *               path-style access; when `false`, a default client for
  *               `region` is built
  */
final case class S3Utils(
  region: Region = Region.US_WEST_2,
  isDev: Boolean = false
) {
  // Explicit `.asScala` decorators instead of the deprecated implicit JavaConversions.
  import scala.collection.JavaConverters._

  /** S3 client shared by every operation of this instance. */
  val s3Client: S3Client = createS3Client(region, isDev)

  /**
    * Builds the S3 client.
    *
    * @param region AWS region for the client
    * @param isDev  whether to target the local endpoint instead of real AWS
    * @return a ready-to-use `S3Client`
    */
  private def createS3Client(
    region: Region = Region.US_WEST_2,
    isDev: Boolean = false
  ): S3Client = {
    if (isDev) {
      val accessKey = "dummy"
      val secretAccessKey = "dummy"
      val endpoint = "http://localhost:4566"
      val credentials = AwsBasicCredentials.create(accessKey, secretAccessKey)

      S3Client.builder()
        .region(region)
        .credentialsProvider(StaticCredentialsProvider.create(credentials))
        .endpointOverride(new URI(endpoint))
        .forcePathStyle(true)
        .build()
    } else {
      S3Client.builder().region(region).build()
    }
  }

  /**
    * Lists every object under a key prefix.
    *
    * Uses the SDK paginator, which transparently follows continuation tokens,
    * so results spanning several pages are all returned.
    *
    * @param bucketName S3 bucket name
    * @param s3Key      key prefix to list (e.g. "sample/")
    * @return one [[S3FileObject]] per listed object, in listing order
    */
  def getFileList(bucketName: String, s3Key: String): Seq[S3FileObject] = {
    // See https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/s3/model/ListObjectsV2Request.html
    val listRequest = ListObjectsV2Request.builder()
      .bucket(bucketName)
      .prefix(s3Key)
      .build()

    // See https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html
    // See https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/s3/model/S3Object.html
    s3Client.listObjectsV2Paginator(listRequest)
      .contents()
      .iterator()
      .asScala
      .map(content => S3FileObject(bucketName, content.key, content.size))
      .toVector
  }

  /**
    * Downloads a YAML file from S3 and loads it into an instance of `T`.
    *
    * @param bucketName S3 bucket name
    * @param s3Key      key of the YAML object
    * @tparam T bean class the YAML document is mapped to
    * @return the loaded instance
    */
  def readYaml[T: ClassTag](bucketName: String, s3Key: String): T = {
    val fileContentAsBytes = getS3FileBytes(bucketName, s3Key)
    readYaml[T](fileContentAsBytes)
  }

  /**
    * Loads YAML content into an instance of `T`.
    *
    * @param inputPath raw YAML content as bytes (despite the name, not a path)
    * @tparam T bean class the YAML document is mapped to
    * @return the loaded instance
    */
  def readYaml[T: ClassTag](inputPath: Array[Byte]): T = {
    val reader = new ByteArrayInputStream(inputPath)
    try {
      // Hand the runtime class straight to SnakeYAML rather than parsing its
      // name out of Class.toString ("class Foo" / "interface Foo").
      val inputYaml = new Yaml(new Constructor(classTag[T].runtimeClass))
      inputYaml.load[T](reader)
    } finally {
      reader.close()
    }
  }

  // https://docs.aws.amazon.com/ja_jp/AmazonS3/latest/userguide/example_s3_GetObject_section.html
  /**
    * Downloads an S3 object fully into memory.
    *
    * @param bucketName S3 bucket name
    * @param s3Key      key of the object
    * @return the object's content as a byte array
    */
  def getS3FileBytes(bucketName: String, s3Key: String): Array[Byte] = {
    val objectRequest = GetObjectRequest
      .builder()
      .key(s3Key)
      .bucket(bucketName)
      .build()
    s3Client.getObjectAsBytes(objectRequest).asByteArray()
  }

  /**
    * Downloads an S3 object and decodes it as text.
    *
    * @param bucketName S3 bucket name
    * @param s3Key      key of the object
    * @param charset    charset used to decode the bytes (UTF-8 by default)
    * @return the object's content as a string
    */
  def getS3FileContent(
    bucketName: String,
    s3Key: String,
    charset: java.nio.charset.Charset = StandardCharsets.UTF_8
  ): String = {
    // Fully-qualified Charset: only StandardCharsets is imported by this file.
    // See https://www.baeldung.com/scala/convert-byte-array-to-string
    new String(getS3FileBytes(bucketName, s3Key), charset)
  }

  /**
    * To delete a s3 object.
    *
    * @param bucketName s3 bucket name (e.g. 'local-demo-bucket')
    * @param s3KeyForFilePath s3 bucket file path (e.g. "sample/hi.txt")
    */
  def deleteObject(bucketName: String, s3KeyForFilePath: String): Unit = {
    val deleteObjectRequest = DeleteObjectRequest.builder()
      .bucket(bucketName)
      .key(s3KeyForFilePath)
      .build()
    s3Client.deleteObject(deleteObjectRequest)
  }

  /**
    * To delete s3 objects.
    *
    * Lists every object under the prefix and deletes them one by one.
    *
    * @param bucketName s3 bucket name (e.g. 'local-demo-bucket')
    * @param s3KeyForPath s3 bucket path (e.g. "sample/")
    */
  def deleteObjects(bucketName: String, s3KeyForPath: String): Unit = {
    getFileList(bucketName, s3KeyForPath).foreach { fileObject =>
      deleteObject(fileObject.bucketName, fileObject.s3Key)
    }
  }
}

/**
  * Describes one object stored in S3.
  *
  * @param bucketName name of the bucket holding the object
  * @param s3Key      key of the object inside the bucket
  * @param fileSize   object size as reported by S3
  */
case class S3FileObject(bucketName: String, s3Key: String, fileSize: Long) {

  /** Returns the object's location in `s3://bucket/key` form. */
  def getFullPath: String = "s3://" + bucketName + "/" + s3Key
}

ProcessInfo.scala (YAML用クラス)

import scala.beans.BeanProperty

import org.yaml.snakeyaml.Yaml
import org.yaml.snakeyaml.constructor.Constructor

/**
  * Mutable bean that a YAML document is loaded into.
  *
  * Every field is a `@BeanProperty var`, so Java-style getters and setters
  * are generated for it.
  */
class ProcessInfo {
  @BeanProperty
  var processName: String = ""

  @BeanProperty
  var schedule: String = ""

  @BeanProperty
  var emailListToNotify: java.util.ArrayList[String] =
    new java.util.ArrayList[String]()

  @BeanProperty
  var subProcesses: java.util.HashMap[String, java.util.HashMap[String, Any]] =
    new java.util.HashMap[String, java.util.HashMap[String, Any]]()

  /** Renders all four fields on a single line. */
  override def toString: String =
    s"processName : ${processName} , schedule: ${schedule}, " +
      s"emailListToNotify : ${emailListToNotify} , subProcesses : ${subProcesses} "

  /** Returns a new list containing the keys of `subProcesses`. */
  def getSubProcessNames: java.util.ArrayList[String] = {
    val names = subProcesses.keySet()
    new java.util.ArrayList[String](names)
  }
}

呼び出し側

import scala.collection.JavaConversions._
import scala.reflect._

import java.net.URI

import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.s3.S3Client
import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Iterable
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider

/**
  * Demo entry point: lists the objects under a prefix, loads a YAML file from
  * S3 into a `ProcessInfo`, prints both, then deletes the sample objects.
  *
  * The client is created with `isDev = true`, i.e. against the local endpoint
  * configured in `S3Utils`.
  */
object Main extends App {
  // Explicit `.asScala` decorators instead of the deprecated implicit JavaConversions.
  import scala.collection.JavaConverters._

  val region = Region.US_WEST_1
  val bucketName = "local-demo-bucket"
  val s3Key = "sample/"

  val s3 = S3Utils(region, isDev = true)
  val fileObjects = s3.getFileList(bucketName, s3Key)

  for (fileObject <- fileObjects) {
    println(s"bucketName = ${fileObject.bucketName}")
    println(s"s3Key = ${fileObject.s3Key}")
    println(s"fileSize = ${fileObject.fileSize}")
    println(s"Full Path = ${fileObject.getFullPath}")
  }

  val processInfo = s3.readYaml[ProcessInfo](bucketName, s"${s3Key}demo.yaml")
  println(s"processName = ${processInfo.processName}")
  println(s"schedule = ${processInfo.schedule}")
  for (email <- processInfo.emailListToNotify.asScala) {
    println(s"email = ${email}")
  }
  // Each element is a (name, settings) pair.
  for (subProcess <- processInfo.subProcesses.asScala) {
    println(s"subProcess = ${subProcess}")
  }
  for (subProcessName <- processInfo.getSubProcessNames.asScala) {
    println(s"subProcessName = ${subProcessName}")
  }
  println(s"toString = ${processInfo.toString}")

  // Delete: first a single object, then everything under the prefix.
  s3.deleteObject(bucketName, "sample/hi.txt")
  s3.deleteObjects(bucketName, "sample/")
}

demo.yaml

processName: processA
schedule: 10:00-12:00
emailListToNotify:
  - sample@gmail.com
  - sample@yahoo.com
subProcesses:
  sub-processA1:
    inputPath: s3://your-bucket/sub-processA1/input/
    outputPath: s3://your-bucket/sub-processA1/output/
  sub-processA2:
    inputPath: s3://your-bucket/sub-processA2/input/
    outputPath: s3://your-bucket/sub-processA2/output/
  sub-processA3:
    inputPath: s3://your-bucket/sub-processA3/input/
    outputPath: s3://your-bucket/sub-processA3/output/

関連記事

Scala ~ AWS SDK ~
https://dk521123.hatenablog.com/entry/2023/03/24/211033
Scala ~ AWS SDK / Secrets Managerサンプル ~
https://dk521123.hatenablog.com/entry/2023/04/03/012600
Scala ~ AWS SDK / SESサンプル ~
https://dk521123.hatenablog.com/entry/2023/04/16/003338
Scala ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2023/03/10/193805
Scala ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2023/03/12/184331
Scala ~ 基本編 / 繰り返し ~
https://dk521123.hatenablog.com/entry/2023/01/24/000000
Scala ~ 基本編 / Option型 ~
https://dk521123.hatenablog.com/entry/2023/03/09/000000
Scala ~ 基本編 / メソッド ~
https://dk521123.hatenablog.com/entry/2023/03/03/000000
Scala ~ 基本編 / クラス ~
https://dk521123.hatenablog.com/entry/2023/03/14/000857
Scala ~ 基本編 / コレクション ~
https://dk521123.hatenablog.com/entry/2023/03/13/000345
Scala ~ 基本編 / 日付・日時 ~
https://dk521123.hatenablog.com/entry/2023/03/08/000000
Scala ~ 基本編 / 正規表現 ~
https://dk521123.hatenablog.com/entry/2023/03/18/034704
Scala ~ 基本編 / ジェネリック ~
https://dk521123.hatenablog.com/entry/2023/03/21/003817
Scala ~ ファイル名・パスの扱い ~
https://dk521123.hatenablog.com/entry/2023/03/11/000000
Scala ~ ファイルハンドリング ~
https://dk521123.hatenablog.com/entry/2023/01/03/000000
Scala ~ YAML ~
https://dk521123.hatenablog.com/entry/2023/03/16/012034
JavaでEmail ~ JavaMail / Text ~
https://dk521123.hatenablog.com/entry/2016/07/16/222422
JavaでEmail ~ JavaMail / 添付ファイル ~
https://dk521123.hatenablog.com/entry/2016/07/17/023459
JavaでEmail ~ SMTP認証 ~
https://dk521123.hatenablog.com/entry/2016/11/07/215251
JavaでEmail ~ SMTP認証 / DIGEST-MD5 ~
https://dk521123.hatenablog.com/entry/2016/12/07/222229
JavaでEmail ~ JavaMail / TLS ~
https://dk521123.hatenablog.com/entry/2017/05/03/163219
JavaでEmail ~ JavaMail / Return-Path・Errors-To ~
https://dk521123.hatenablog.com/entry/2017/05/07/000344
Amazon SES ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2017/04/28/234103
Amazon S3 ~ Boto3編 ~
https://dk521123.hatenablog.com/entry/2019/10/21/230004
Amazon S3 ~ Boto3でファイル存在チェック ~
https://dk521123.hatenablog.com/entry/2022/02/26/182526
AWS Glue ~ Scalaでの実装 ~
https://dk521123.hatenablog.com/entry/2023/03/17/000000
Docker compose ~ LocalStack/Glue4.0 ~
https://dk521123.hatenablog.com/entry/2023/03/25/021432
AWS Glue ~ ローカル環境を作成する / Glue v3.0版 ~
https://dk521123.hatenablog.com/entry/2022/01/31/165650
LocalStack ~ ローカルで疑似AWSを作成する ~
https://dk521123.hatenablog.com/entry/2019/12/14/010524
LocalStack ~ ローカルで疑似Lambda/S3/DynamoDBを作成する ~
https://dk521123.hatenablog.com/entry/2019/12/16/231149