0
点赞
收藏
分享

微信扫一扫

Flink之State状态编程

yundejia 2022-02-12 阅读 68

文章目录

1.State分类

在这里插入图片描述
State[ValueState、ReadOnlyBroadcastState、MapState、AppendingState]

AppendingState[FoldingState、MergingState]

MergingState[ListState、AggregatingState、ReducingState]

在flink中,状态始终与特定算子相关联,像reduce、sum等算子都是默认带状态的,而map、flatmap本身时不带状态的,如果需要用到状态,可以自定义

为了使运行的flink了解算子的状态,算子需要预先注册其状态

  • 算子状态(Operator State): 算子状态的作用范围限定为算子任务
  • 键控状态(keyed State):生产中应用案例较多, 根据输入数据流中定义的key来维护和访问

不管是哪种类型的State,都有2种不同的状态,raw(原生状态)、managed(Flink设定好的状态)

  • managed状态由Flink-runtime控制,类似于RocksDB、HashTable、Fs;类似于ValueState、ListState,Flink-runtime能将状态进行特定的编码,然后写入到检查点,所有的算子都能使用managed-state
  • raw状态而是将state维护在自己的数据结构,当checkpoint的时候,只会将state以序列化的形式写进checkpoint,flink只能看到原生的字节,而对state的数据结构一无所知

2.算子状态(Operator State)

在这里插入图片描述

2.1 算子状态的数据结构(non-keyed state)

列表状态(list state)
联合列表状态(union list state)
广播状态(broadast)

3.键控状态(keyed State)

在这里插入图片描述

3.1 键控状态数据结构

值状态(value state)
列表状态 list state
映射状态 map state
聚合状态(reducing state & aggregating state)

4.状态后端(state backends)

MemoryStateBackend(一般用于测试环境)

FsStateBackend(将checkpoint存在文件系统中,本地状态还是存在taskmanager本地内存中,不适合超大状态的存储)

RocksDBStateBackend(将所有状态序列化后,存入本地RocksDB(kv存储介质)中存储)

5.状态编程

package com.shufang.flink.state

import com.shufang.flink.bean.SensorReading
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

object StateDemo {

  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.getConfig.setAutoWatermarkInterval(300)
    //    env.setStateBackend(new MemoryStateBackend())

    val sensorStream: DataStream[SensorReading] = env.socketTextStream("localhost", 9999)
      .map(a => {
        val strings: Array[String] = a.split(",")
        SensorReading(strings(0), strings(1).trim.toLong, strings(2).trim.toDouble)
      }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(2)) {
      override def extractTimestamp(element: SensorReading): Long = {
        element.timeStamp
      }
    })

    val processedStream: DataStream[(String, Double, Double)] = sensorStream.keyBy(_.id)
      .process(new MyProcessFunction01)

    val processedStream02: DataStream[(String, Double, Double)] = sensorStream.keyBy(_.id)
      .flatMap(new MyFlatMapFunction)

    /**
     * 1.通过flatMapWithstate的方式来保存状态 Seq与List都是TraversableOnce类型
     * 这里面的状态其实就是通过Option()来维护的,
     * 如果之前没有状态保存,那么option就是None,
     * 吐过之前有保存过状态,那么Option就是Some,可以通过getOrElse(0)获取状态
     * -----------------------------------------------------------------------
     * def flatMapWithState[R: TypeInformation, S: TypeInformation](
     * fun: (T, Option[S]) => (TraversableOnce[R], Option[S])): DataStream[R]
     * 源码范型解析:
     * >>>>>>>> DataStream[R] -> R就是返回值中的类型
     * >>>>>>>> Oprion[S] -> 那么S肯定就是状态的类型
     * >>>>>>>> 返回值、状态类型都有了,那么T肯定就是输入数据的类型
     */
    val processStream03: DataStream[(String, Double, Double)] = sensorStream.keyBy(_.id)
      .flatMapWithState[(String, Double, Double), Double] {
        //如果状态为空,那么只更新state为当前temp
        case (sensor: SensorReading, None) => (List.empty, Some(sensor.temperture))
        //实际上,这里使用Option来维持状态的,没有状态保存而获取的话,就相当于getorelse(0)

        case (sensor: SensorReading, pretemp: Some[Double]) =>
          val lastTemp: Double = pretemp.get
          val diff: Double = (sensor.temperture - lastTemp).abs
          if (diff > 10) {
            (Seq((sensor.id, lastTemp, sensor.temperture)), Some(sensor.temperture))
          } else {
            (List(), Some(sensor.temperture))
          }
      }
    processStream03.print("flatMapWith-result")
//    processedStream02.print("报警信息-温度波动过大")
    sensorStream.print("输入数据")

    env.execute("state")
  }
}


/**
 * 2.flatMapFunction
 */
class MyFlatMapFunction extends RichFlatMapFunction[SensorReading, (String, Double, Double)] {

  private var preTemp: ValueState[Double] = _
  //利用open函数的特性,在初始化的时候就执行
  override def open(parameters: Configuration): Unit = {
    preTemp = getRuntimeContext.getState(new ValueStateDescriptor[Double]("lastTemp", classOf[Double]))
  }

  override def flatMap(value: SensorReading,
                       out: Collector[(String, Double, Double)]): Unit = {
    val lastTemp: Double = preTemp.value()

    if ((value.temperture - lastTemp).abs > 10) {
      out.collect((value.id, lastTemp, value.temperture))
    }
    preTemp.update(value.temperture)
  }
}


/**
 * 3.processFunction
 * processFunction可以处理所有Api能处理的事情
 * 主要方法processElement(ctx,value,out)、onTime(ctx,value,out)回调函数
 */
class MyProcessFunction01 extends KeyedProcessFunction[String, SensorReading, (String, Double, Double)] {
  //声明State
  lazy val pretemp: ValueState[Double] = getRuntimeContext.getState(new ValueStateDescriptor[Double]("pretemp", classOf[Double]))

  override def processElement(
                               value: SensorReading,
                               ctx: KeyedProcessFunction[String, SensorReading, (String, Double, Double)]#Context,
                               out: Collector[(String, Double, Double)]): Unit = {
    //调用state
    val lastTemp: Double = pretemp.value()
    val currentTemp: Double = value.temperture

    if ((currentTemp - lastTemp).abs > 10) {
      out.collect((value.id, lastTemp, currentTemp))
    }

    //更新state
    pretemp.update(currentTemp)

  }
}

state设置ttl

// 设置ttl的配置
val ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build
 //声明状态描述器   
val stateDescriptor = new ValueStateDescriptor[String]("text state", classOf[String])
//state.enableTimeToLive(ttl配置)
  stateDescriptor.enableTimeToLive(ttlConfig)
举报

相关推荐

0 条评论