【Spark】Spark ~ FutureAction ~

■ はじめに

Spark の FutureAction について、業務上でてきたので、
調べてみた。あんまり情報がないので、随時更新していく、、、

目次

【1】FutureAction に関する記述
【2】サンプル

【1】FutureAction に関する記述

http://mogile.web.fc2.com/spark/rdd-programming-guide.html

より抜粋
~~~~~~~~~~~~~~~~
Spark RDD API は foreachについてのforeachAsync のような、
いくつかのアクションの非同期バージョンを公開します。
これらはアクションの完了を遮ることなくただちにcallerにFutureAction を返します。
これはアクションの非同期実行を管理あるいは待つために使うことができます。
~~~~~~~~~~~~~~~~
 => え!?これだけ、、、

API仕様
https://spark.apache.org/docs/2.2.0/api/java/org/apache/spark/FutureAction.html

org.apache.spark
Interface FutureAction<T> <= Interface でクラスじゃない

All Superinterfaces:
scala.concurrent.Awaitable<T>, scala.concurrent.Future<T>

All Known Implementing Classes:
ComplexFutureAction, SimpleFutureAction

Github
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/FutureAction.scala

import scala.concurrent._ # ここで「scala.concurrent.Future」してそ

/**
 * A future for the result of an action to support cancellation. This is an extension of the
 * Scala Future interface to support cancellation.
 */
trait FutureAction[T] extends Future[T] {

【2】サンプル

動作確認する上での前提条件

* 以下の関連記事などで環境を整える

Spark/Scalaの開発環境構築 ~ Windows編 ~
https://dk521123.hatenablog.com/entry/2023/03/20/115450

例1:実験コード

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.FutureAction

object Main extends App {
  val spark = SparkSession.builder
    .master("local")
    .appName("Hello_World")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
  val sparkContext = spark.sparkContext

  val rdd = sparkContext.parallelize(Array(
    Row(1, "Mike", 45, "Sales"),
    Row(2, "Tom", 65, "IT"),
    Row(3, "Sam", 32, "Sales"),
    Row(4, "Kevin", 28, "Human resources"),
    Row(5, "Bob", 25, "IT"),
    Row(6, "Alice", 20, "Banking"),
    Row(7, "Carol", 30, "IT")
  ))
  val schema = StructType(Array(
    StructField("id", IntegerType, false),
    StructField("name", StringType, false),
    StructField("age", IntegerType, false),
    StructField("job", StringType, false),
  ))
  val dataFrame = spark.createDataFrame(rdd, schema)
  dataFrame.printSchema()
  dataFrame.show()

  // 同期処理
  val longAcc1 = spark.sparkContext.longAccumulator("SumAccumulator1")
  dataFrame.foreach(row => {
    longAcc1.add(row.getInt(0))
  })
  println("*****************")
  println("Accumulator value:" + longAcc1.value)
  println("*****************")

  // 非同期処理
  val longAcc2 = spark.sparkContext.longAccumulator("SumAccumulator2")
  val future = rdd.foreachAsync(row => {
    longAcc2.add(row.getInt(2))
  })
  // future.get // ここでコメント外せば待ち合わせになり、↓の合計値が算出される
  println("*****************")
  println("Accumulator value:" + longAcc2.value)
  println("*****************")

  sparkContext.stop()
}

出力結果

結果1:コメントを付けた場合

*****************
Accumulator value:28
*****************

*****************
Accumulator value:0
*****************

結果2:コメントを外した場合

*****************
Accumulator value:28
*****************

*****************
Accumulator value:245
*****************

参考文献

https://sparkbyexamples.com/spark/spark-foreachpartition-vs-foreach-explained/
https://sparkbyexamples.com/spark/spark-foreach-usage-with-examples/

関連記事

Scala ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2023/03/10/193805
Scala ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2023/03/12/184331
Scala ~ 非同期 / Future ~
https://dk521123.hatenablog.com/entry/2023/04/30/000000
Spark/Scalaの開発環境構築 ~ Windows編 ~
https://dk521123.hatenablog.com/entry/2023/03/20/115450
PySpark ~ DataFrame / データ操作編 ~
https://dk521123.hatenablog.com/entry/2020/01/04/150942