【Flink】Apache Flink ~ RichParallelSourceFunction ~

■ はじめに

RichParallelSourceFunction について調べてみた

目次

【1】RichParallelSourceFunction
 1)使用上の注意
 2)サンプル

【1】RichParallelSourceFunction

1)使用上の注意

調べた後に気が付いたが、非推奨だった

https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/source/RichParallelSourceFunction.html

Deprecated. 
This class is based on the SourceFunction API, which is due to be removed.
Use the new Source API instead.

2)サンプル

import java.util.Calendar

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext

import scala.util.Random
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.scala._
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

/**
 * Assigns event-time timestamps to [[Output]] records, allowing events to
 * arrive up to 5 seconds out of order before the watermark passes them.
 */
class DemoTimeAssigner
  extends BoundedOutOfOrdernessTimestampExtractor[Output](Time.seconds(5)) {

  /** The event time of a record is carried in its `timestamp` field (epoch millis). */
  override def extractTimestamp(output: Output): Long = {
    output.timestamp
  }
}

/**
 * A single demo record.
 *
 * @param id        identifier of the emitting (simulated) sensor
 * @param timestamp event time in epoch milliseconds
 * @param value     the measured reading
 */
case class Output(
  id: String,
  timestamp: Long,
  value: Double
)

/**
 * A parallel demo source that emits random, slowly drifting sensor-like
 * readings every 2 seconds until the job is cancelled.
 *
 * NOTE(review): RichParallelSourceFunction is deprecated in recent Flink
 * releases in favor of the new Source API, as noted in the article above.
 */
class DemoSourceFunction extends RichParallelSourceFunction[Output] {

  // Flag indicating whether the source is still running.
  // @volatile is required: Flink invokes cancel() from a different thread
  // than run(), so without it the emit loop may never observe the write
  // and the task would not stop.
  @volatile var isRunning: Boolean = true

  /** Continuously emits Output records through the given SourceContext. */
  override def run(sourceContext: SourceContext[Output]): Unit = {
    // (fixed: the original had a stray `1` literal right after the opening
    // brace, a discarded expression left over from a typo)
    val rand = new Random()
    val taskIdx = this.getRuntimeContext.getIndexOfThisSubtask

    // One (id, value) pair per simulated sensor; ids are made unique across
    // parallel subtasks by offsetting with the subtask index.
    var currentValue = (1 to 10).map {
      i => ("x_" + (taskIdx * 10 + i), 65 + (rand.nextGaussian() * 20))
    }

    // Emit data until being canceled.
    while (isRunning) {
      // Random-walk each value slightly on every iteration.
      currentValue = currentValue.map(t => (t._1, t._2 + (rand.nextGaussian() * 0.5)) )
      val currentTime = Calendar.getInstance.getTimeInMillis
      currentValue.foreach(t => sourceContext.collect(Output(t._1, currentTime, t._2)))

      // Wait for 2000 ms before the next batch of readings.
      Thread.sleep(2000)
    }
  }

  /** Cancels this SourceFunction: breaks the emit loop in run(). */
  override def cancel(): Unit = {
    isRunning = false
  }

  /** Also stops the emit loop when the function is closed. */
  override def close(): Unit = {
    isRunning = false
  }
}

/** Entry point: wires the demo source into a small event-time pipeline. */
object Demo {
  def main(args: Array[String]): Unit = {
    // Set up the streaming execution environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Checkpoint every 10 seconds.
    env.getCheckpointConfig.setCheckpointInterval(10 * 1000)

    // Use event time for the application and emit watermarks once a second.
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.getConfig.setAutoWatermarkInterval(1000L)

    // Ingest the randomly generated readings and attach the timestamps and
    // watermarks required for event-time processing.
    val demoData = env
      .addSource(new DemoSourceFunction)
      .assignTimestampsAndWatermarks(new DemoTimeAssigner)

    // Convert each reading from Fahrenheit to Celsius and print it.
    val result = demoData.map { reading =>
      Output(reading.id, reading.timestamp, (reading.value - 32) * (5.0 / 9.0))
    }
    result.print

    env.execute()
  }
}

関連記事

Apache Flink ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2023/03/01/235100
Apache Flink ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2023/05/29/000000
Apache Flink ~ 基本編 ~
https://dk521123.hatenablog.com/entry/2023/07/23/161621
Apache Flink ~ DataStream API
https://dk521123.hatenablog.com/entry/2023/10/03/212007
Apache Flink ~ Table API & SQL
https://dk521123.hatenablog.com/entry/2023/10/04/001517
LakeFormation ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2020/10/13/094041