■ はじめに
RichParallelSourceFunction について調べてみた
目次
【1】RichParallelSourceFunction
 1)使用上の注意
 2)サンプル
【1】RichParallelSourceFunction
1)使用上の注意
調べた後に気が付いたが、非推奨だった
Deprecated. This class is based on the SourceFunction API, which is due to be removed. Use the new Source API instead.
2)サンプル
import java.util.Calendar

import scala.util.Random

import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.scala._
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.time.Time

/** Assigns event timestamps from [[Output.timestamp]], tolerating up to
  * 5 seconds of out-of-order events before the watermark passes them.
  */
class DemoTimeAssigner extends BoundedOutOfOrdernessTimestampExtractor[Output](Time.seconds(5)) {
  override def extractTimestamp(output: Output): Long = output.timestamp
}

/** A record emitted by the demo source: an id, an event timestamp (epoch millis),
  * and a measured value.
  */
final case class Output(id: String, timestamp: Long, value: Double)

/** A parallel source that emits 10 random-walk readings per subtask every 2 seconds.
  *
  * NOTE(review): RichParallelSourceFunction (the SourceFunction API) is deprecated
  * in recent Flink versions; prefer the new unified Source API for production code.
  */
class DemoSourceFunction extends RichParallelSourceFunction[Output] {

  // Flag indicating whether the source is still running.
  // @volatile is required: cancel()/close() are called from a different thread
  // than run(), so the write must be visible to the emitting loop or the source
  // may never stop (per the SourceFunction cancellation contract).
  @volatile var isRunning: Boolean = true

  /** run() continuously emits readings through the SourceContext until canceled. */
  override def run(sourceContext: SourceContext[Output]): Unit = {
    val rand = new Random()
    // The subtask index keeps generated ids unique across parallel instances.
    val taskIdx = this.getRuntimeContext.getIndexOfThisSubtask
    var currentValue = (1 to 10).map { i =>
      ("x_" + (taskIdx * 10 + i), 65 + (rand.nextGaussian() * 20))
    }
    // emit data until being canceled
    while (isRunning) {
      // Random walk: perturb each value slightly on every tick.
      currentValue = currentValue.map(t =>
        (t._1, t._2 + (rand.nextGaussian() * 0.5))
      )
      val currentTime = Calendar.getInstance.getTimeInMillis
      currentValue.foreach(t => sourceContext.collect(Output(t._1, currentTime, t._2)))
      // wait for 2000 ms
      Thread.sleep(2000)
    }
  }

  /** Cancels this SourceFunction. */
  override def cancel(): Unit = {
    isRunning = false
  }

  /** Close this SourceFunction. */
  override def close(): Unit = {
    isRunning = false
  }
}

object Demo {

  /** Entry point: wires the demo source into an event-time pipeline,
    * converts each reading from Fahrenheit to Celsius, and prints it.
    */
  def main(args: Array[String]): Unit = {
    // set up the streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // checkpoint every 10 seconds
    env.getCheckpointConfig.setCheckpointInterval(10 * 1000)
    // use event time for the application
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // configure watermark interval (millis)
    env.getConfig.setAutoWatermarkInterval(1000L)

    // ingest the demo stream
    val demo = new DemoSourceFunction
    val demoData = env
      .addSource(demo)
      // assign timestamps and watermarks which are required for event time
      .assignTimestampsAndWatermarks(new DemoTimeAssigner)

    // Fahrenheit -> Celsius: (F - 32) * 5/9.
    val result = demoData
      .map(r => Output(r.id, r.timestamp, (r.value - 32) * (5.0 / 9.0)))

    result.print
    env.execute()
  }
}
関連記事
Apache Flink ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2023/03/01/235100
Apache Flink ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2023/05/29/000000
Apache Flink ~ 基本編 ~
https://dk521123.hatenablog.com/entry/2023/07/23/161621
Apache Flink ~ DataStream API ~
https://dk521123.hatenablog.com/entry/2023/10/03/212007
Apache Flink ~ Table API & SQL ~
https://dk521123.hatenablog.com/entry/2023/10/04/001517
LakeFormation ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2020/10/13/094041