■ はじめに
Spark の FutureAction について、業務上でてきたので、 調べてみた。あんまり情報がないので、随時更新していく、、、
目次
【1】FutureAction に関する記述 【2】サンプル
【1】FutureAction に関する記述
http://mogile.web.fc2.com/spark/rdd-programming-guide.html
より抜粋 ~~~~~~~~~~~~~~~~ Spark RDD API は foreachについてのforeachAsync のような、 いくつかのアクションの非同期バージョンを公開します。 これらはアクションの完了を遮ることなくただちにcallerにFutureAction を返します。 これはアクションの非同期実行を管理あるいは待つために使うことができます。 ~~~~~~~~~~~~~~~~ => え!?これだけ、、、
API仕様
https://spark.apache.org/docs/2.2.0/api/java/org/apache/spark/FutureAction.html
org.apache.spark Interface FutureAction<T> <= Interface でクラスじゃない All Superinterfaces: scala.concurrent.Awaitable<T>, scala.concurrent.Future<T> All Known Implementing Classes: ComplexFutureAction, SimpleFutureAction
Github
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/FutureAction.scala
import scala.concurrent._ # ここで「scala.concurrent.Future」してそ /** * A future for the result of an action to support cancellation. This is an extension of the * Scala Future interface to support cancellation. */ trait FutureAction[T] extends Future[T] {
【2】サンプル
動作確認する上での前提条件
* 以下の関連記事などで環境を整える
Spark/Scalaの開発環境構築 ~ Windows編 ~
https://dk521123.hatenablog.com/entry/2023/03/20/115450
例1:実験コード
import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession import org.apache.spark.sql.types._ import org.apache.spark.sql.Row import org.apache.spark.FutureAction object Main extends App { val spark = SparkSession.builder .master("local") .appName("Hello_World") .config("spark.some.config.option", "some-value") .getOrCreate() val sparkContext = spark.sparkContext val rdd = sparkContext.parallelize(Array( Row(1, "Mike", 45, "Sales"), Row(2, "Tom", 65, "IT"), Row(3, "Sam", 32, "Sales"), Row(4, "Kevin", 28, "Human resources"), Row(5, "Bob", 25, "IT"), Row(6, "Alice", 20, "Banking"), Row(7, "Carol", 30, "IT") )) val schema = StructType(Array( StructField("id", IntegerType, false), StructField("name", StringType, false), StructField("age", IntegerType, false), StructField("job", StringType, false), )) val dataFrame = spark.createDataFrame(rdd, schema) dataFrame.printSchema() dataFrame.show() // 同期処理 val longAcc1 = spark.sparkContext.longAccumulator("SumAccumulator1") dataFrame.foreach(row => { longAcc1.add(row.getInt(0)) }) println("*****************") println("Accumulator value:" + longAcc1.value) println("*****************") // 非同期処理 val longAcc2 = spark.sparkContext.longAccumulator("SumAccumulator2") val future = rdd.foreachAsync(row => { longAcc2.add(row.getInt(2)) }) // future.get // ここでコメント外せば待ち合わせになり、↓の合計値が算出される println("*****************") println("Accumulator value:" + longAcc2.value) println("*****************") sparkContext.stop() }
出力結果
結果1:コメントを付けた場合
***************** Accumulator value:28 ***************** ***************** Accumulator value:0 *****************
結果2:コメントを外した場合
***************** Accumulator value:28 ***************** ***************** Accumulator value:245 *****************
参考文献
https://sparkbyexamples.com/spark/spark-foreachpartition-vs-foreach-explained/
https://sparkbyexamples.com/spark/spark-foreach-usage-with-examples/
関連記事
Scala ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2023/03/10/193805
Scala ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2023/03/12/184331
Scala ~ 非同期 / Future ~
https://dk521123.hatenablog.com/entry/2023/04/30/000000
Spark/Scalaの開発環境構築 ~ Windows編 ~
https://dk521123.hatenablog.com/entry/2023/03/20/115450
PySpark ~ DataFrame / データ操作編 ~
https://dk521123.hatenablog.com/entry/2020/01/04/150942