0
点赞
收藏
分享

微信扫一扫

Flink_状态编程_2

MaxWen 2022-04-02 阅读 68

文章目录

1.算子状态概述

1.1 算子状态分类

  • 算子状态:
    列表状态, 联合列表状态, 广播状态
    ListState, UnionListState, BroadcastState

1.2 状态分析

  1. 列表状态: 与 Keyed State 中的 ListState 一样,将状态表示为一组数据的列表。与 Keyed State 中的列表状态的区别是:在算子状态的上下文中,不会按键(key)分别处理状态,所以每一个并行子任务上只会保留一个“列表”(list),也就是当前并行子任务上所有状态项的集合。列表中的状态项就是可以重新分配的最细粒度,彼此之间完全独立。

  2. 联合列表状态: 与 ListState 类似,联合列表状态也会将状态表示为一个列表。它与常规列表状态的区别在于,算子并行度进行缩放调整时对于状态的分配方式不同。UnionListState 的重点就在于“联合”(union)。在并行度调整时,常规列表状态是轮询分配状态项,而联合列表状态的算子则会直接广播状态的完整列表。这样,并行度缩放之后的并行子任务就获取到了联合后完整的“大列表”,可以自行选择要使用的状态项和要丢弃的状态项。这种分配也叫作“联合重组”(union redistribution)。如果列表中状态项数量太多,为资源和效率考虑一般不建议使用联合重组的方式。

  3. 广播状态: 状态就像被“广播”到所有分区一样,这种特殊的算子状态,就叫作广播状态(BroadcastState)。广播状态是以类似映射结构(map)的键值对(key-value)来保存的

1.3 CheckpointedFunction 接口

在 Flink 中,对状态进行持久化保存的快照机制叫作“检查点”(Checkpoint)。于是使用算子状态时,就需要对检查点的相关操作进行定义,实现一个 CheckpointedFunction 接口。

public interface CheckpointedFunction {
	// 保存状态快照到检查点时,调用这个方法
	void snapshotState(FunctionSnapshotContext context) throws Exception
	// 初始化状态时调用这个方法,也会在恢复状态时调用
	void initializeState(FunctionInitializationContext context) throws Exception;
}

每次应用保存检查点做快照时,都会调用.snapshotState()方法,将状态进行外部持久化。
而在算子任务进行初始化时,会调用. initializeState()方法。

2.算子状态 编程案例

2.1 列表状态案例

自定义的 SinkFunction 会在CheckpointedFunction 中进行数据缓存,然后统一发送到下游。这个例子演示了列表状态的平均分割重组(event-split redistribution)。

缓存机制

public class BufferingSinkExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);
        SingleOutputStreamOperator<Event> stream = environment.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event event, long l) {
                                return event.timestamp;
                            }
                        })
                );
        stream.addSink(new BufferingSink(10));
        stream.print("input");

        environment.execute();
    }
    public static class BufferingSink implements SinkFunction<Event>, CheckpointedFunction{
        private final int threshold;

        public BufferingSink(int threshold) {
            this.threshold = threshold;
            this.bufferedElement = new ArrayList<>();
        }

        private List<Event> bufferedElement;

        private ListState<Event> listState;

        @Override
        public void invoke(Event value, Context context) throws Exception {
            bufferedElement.add(value);
            if (bufferedElement.size() == threshold){
                for (Event event : bufferedElement) {
                    System.out.println(event);
                }
                bufferedElement.clear();
            }
        }

        @Override
        public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
            // 对状态进行持久化, 复制缓存的列表到 列表状态
            listState.clear();
            for (Event event : bufferedElement) {
                listState.add(event);
            }
        }

        @Override
        public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
            // 定义算子状态
            ListStateDescriptor<Event> listStateDescriptor = new ListStateDescriptor<>("buffered-elements", Event.class);

            listState = functionInitializationContext.getOperatorStateStore().getListState(listStateDescriptor);

            // 如果有故障, 需要将列表状态中元素 复制到 列表
            if (functionInitializationContext.isRestored()){
                for (Event event : listState.get()) {
                    bufferedElement.add(event);
                }
            }
        }
    }
}

在这里插入图片描述

2.2 广播机制 案例

考虑在电商应用中,往往需要判断用户先后发生的行为的“组合模式”,比如“登录-下单”或者“登录-支付”,检测出这些连续的行为进行统
计,就可以了解平台的运用状况以及用户的行为习惯。

public class BehaviorPatternDetectExample {
    // 用户行为模式
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);
        // 1.行为的数据流
        DataStream<Action> actionStream = environment.fromElements(
                new Action("Alice", "login"),
                new Action("Alice", "pay"),
                new Action("Bob", "login"),
                new Action("Bob", "order")
        );
        DataStream<Pattern> patternStream = environment.fromElements(
                new Pattern("login", "pay"),
                new Pattern("login", "order")
        );
        // 定义广播状态描述器
        MapStateDescriptor<Void, Pattern> stateDescriptor = new MapStateDescriptor<>("pattern", Types.VOID, Types.POJO(Pattern.class));
        BroadcastStream<Pattern> broadcast = patternStream.broadcast(stateDescriptor);

        // 连接两条流 处理
        actionStream.keyBy(data -> data.userId)
                .connect(broadcast)
                .process(new PatternDetector())
                .print();

        environment.execute();

        // 2.行为的模式流, 广播流

    }
    private static class Action{
        public String userId;
        public String action;

        public Action() {
        }

        public Action(String userId, String action) {
            this.userId = userId;
            this.action = action;
        }

        @Override
        public String toString() {
            return "Action{" +
                    "userId='" + userId + '\'' +
                    ", action='" + action + '\'' +
                    '}';
        }
    }
    public static class Pattern{
        public String active1;
        public String active2;

        public Pattern() {
        }

        public Pattern(String active1, String active2) {
            this.active1 = active1;
            this.active2 = active2;
        }

        @Override
        public String toString() {
            return "Pattern{" +
                    "active1='" + active1 + '\'' +
                    ", active2='" + active2 + '\'' +
                    '}';
        }
    }
    public static class PatternDetector extends KeyedBroadcastProcessFunction<String, Action, Pattern, Tuple2<String, Pattern>>{
        // 定义一个KeyedState, 保存上一次用户的行为
        ValueState<String> preActionState;

        @Override
        public void open(Configuration parameters) throws Exception {
            preActionState = getRuntimeContext().getState(new ValueStateDescriptor<String>("last-action", String.class));
        }


        @Override
        public void processElement(Action action, ReadOnlyContext readOnlyContext, Collector<Tuple2<String, Pattern>> collector) throws Exception {
            // 判断规则
            // 从广播状态拿取
            ReadOnlyBroadcastState<Void, Pattern> patternState = readOnlyContext.getBroadcastState(new MapStateDescriptor<>("pattern", Types.VOID, Types.POJO(Pattern.class)));
            // 规则
            Pattern pattern = patternState.get(null);
            // 上一次行为
            String preAction = preActionState.value();
            // 判断是否匹配
            if (pattern != null && preAction != null){
                if (pattern.active1.equals(preAction) && pattern.active2.equals(action.action)){
                    collector.collect(new Tuple2<>(readOnlyContext.getCurrentKey(), pattern));
                }
            }
            preActionState.update(action.action);


        }

        @Override
        public void processBroadcastElement(Pattern pattern, Context context, Collector<Tuple2<String, Pattern>> collector) throws Exception {
            BroadcastState<Void, Pattern> patternState = context.getBroadcastState(new MapStateDescriptor<>("pattern", Types.VOID, Types.POJO(Pattern.class)));
            patternState.put(null, pattern);

        }
    }
}

在这里插入图片描述

3.状态持久化和状态后端

在 Flink 的状态管理机制中,很重要的一个功能就是对状态进行持久化(persistence)保存,这样就可以在发生故障后进行重启恢复。Flink 对状态进行持久化的方式,就是将当前所有分布式状态进行“快照”保存,写入一个“检查点”(checkpoint)或者保存点(savepoint)
保存到外部存储系统中。具体的存储介质,一般是分布式文件系统(distributed file system)。

3.1 检查点(Checkpoint)

有状态流应用中的检查点(checkpoint),其实就是所有任务的状态在某个时间点的一个快照(一份拷贝)。简单来讲,就是一次“存盘”,让我们之前处理数据的进度不要丢掉。在一个流应用程序运行时,Flink 会定期保存检查点,在检查点中会记录每个算子的 id 和状态;如果发生故障,Flink 就会用最近一次成功保存的检查点来恢复应用的状态,重新启动处理流程,就如同“读档”一样。

默认情况下,检查点是被禁用的.

 environment.enableCheckpointing(10000L);

3.2 状态后端 state Backends

检查点的保存离不开 JobManager 和 TaskManager,以及外部存储系统的协调。在应用进行检查点保存时,首先会由 JobManager 向所有 TaskManager 发出触发检查点的命令;TaskManger 收到之后,将当前任务的所有状态进行快照保存,持久化到远程的存储介质中;
完成之后向 JobManager 返回确认信息。这个过程是分布式的,当 JobManger 收到所有TaskManager 的返回信息后,就会确认当前检查点成功保存

在这里插入图片描述
上图为检查点的保存的图

Flink 中,状态的存储、访问以及维护,都是由一个可插拔的组件决定的,这个组件就叫作状态后端(state backend)。状态后端主要负责两件事:一是本地的状态管理,二是将检查点(checkpoint)写入远程的持久化存储。

哈希表状态后端(HashMapStateBackend)
把状态存放在内存里

environment.setStateBackend(new HashMapStateBackend());

内嵌 RocksDB 状态后端(EmbeddedRocksDBStateBackend)
RocksDB 是一种内嵌的 key-value 存储介质,可以把数据持久化到本地硬盘。

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
            <version>1.13.0</version>
        </dependency>
environment.setStateBackend(new EmbeddedRocksDBStateBackend());

如何选择正确的状态后端

HashMap 和 RocksDB 两种状态后端最大的区别,就在于本地状态存放在哪里:前者是内存,后者是 RocksDB。在实际应用中,选择那种状态后端,主要是需要根据业务需求在处理性能和应用的扩展性上做一个选择。

HashMapStateBackend 是内存计算,读写速度非常快;但是,状态的大小会受到集群可用内存的限制,如果应用的状态随着时间不停地增长,就会耗尽内存资源。

而 RocksDB 是硬盘存储,所以可以根据可用的磁盘空间进行扩展,而且是唯一支持增量检查点的状态后端,所以它非常适合于超级海量状态的存储。不过由于每个状态的读写都需要做序列化/反序列化,而且可能需要直接从磁盘读取数据,这就会导致性能的降低,平均读写
性能要比 HashMapStateBackend 慢一个数量级。

4.状态编程总结

有状态的流处理是 Flink 的本质,所以状态可以说是 Flink 中最为重要的概念。之前聚合算子、窗口算子中已经提到了状态的概念。

状态编程主要是介绍了各种状态以及其案例编程, 以及状态后端和 checkpoint。

检查点是一个非常重要的概念,是 Flink 容错机制的核心

举报

相关推荐

0 条评论