0
点赞
收藏
分享

微信扫一扫

Flink_cep_1

alonwang 2022-04-07 阅读 54

文章目录

1.CEP概述

复杂事件处理(Complex Event Processing)

总结起来,复杂事件处理(CEP)的流程可以分成三个步骤:

  • 定义一个匹配规则
  • 将匹配规则应用到事件流上,检测满足规则的复杂事件
  • 对检测到的复杂事件进行处理,得到结果进行输出

在这里插入图片描述
输入的事件流在一个模式规则中输出得到的数据流

CEP 是针对流处理而言的,分析的是低延迟、频繁产生的事件流。

目的在于在无界流中检测出特定的数据组合,让我们有机会掌握数据中重要的高阶特征。

2.模式

CEP 的第一步所定义的匹配规则, 称为模式。

模式定义的内容:

  1. 每个简单事件的特征
  2. 简单事件之间的组合关系

3.CEP 的应用场景

CEP 主要用于实时流数据的分析处理。Cep可以帮助在复杂的事件中得到有意义的事件流。从而做出预警

这在企业项目的风控管理、用户画像和运维监控中,都有非常重要的应用。

  • 风险控制
  • 用户画像
  • 运维监控

CEP 的应用场景非常丰富。很多大数据框架,如 Spark、Samza、Beam 等都提供了不同的CEP 解决方案,但没有专门的库(library)。而 Flink 提供了专门的 CEP 库用于复杂事件处理,可以说是目前 CEP 的最佳解决方案。

4.快速上手代码

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

定义的一个POJO

public class LoginEvent {
    public String userId;
    public String ipAddress;
    public String eventType;
    public Long timestamp;

    public LoginEvent() {
    }

    public LoginEvent(String userId, String ipAddress, String eventType, Long timestamp) {
        this.userId = userId;
        this.ipAddress = ipAddress;
        this.eventType = eventType;
        this.timestamp = timestamp;
    }

    @Override
    public String toString() {
        return "LoginEvent{" +
                "userId='" + userId + '\'' +
                ", ipAddress='" + ipAddress + '\'' +
                ", eventType='" + eventType + '\'' +
                ", timestamp=" + timestamp +
                '}';
    }
}

需求:

检测用户行为,如果连续三次登录失败,就输出报警信息。

public class LoginFailDetectExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);

        KeyedStream<LoginEvent, String> stream = environment
                .fromElements(
                        new LoginEvent("user_1", "192.168.0.1", "fail", 2000L),
                        new LoginEvent("user_1", "192.168.0.2", "fail", 3000L),
                        new LoginEvent("user_2", "192.168.1.29", "fail", 4000L),
                        new LoginEvent("user_1", "171.56.23.10", "fail", 5000L),
                        new LoginEvent("user_2", "192.168.1.29", "fail", 7000L),
                        new LoginEvent("user_2", "192.168.1.29", "fail", 8000L),
                        new LoginEvent("user_2", "192.168.1.29", "success", 6000L)
                )
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<LoginEvent>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                .withTimestampAssigner(
                                        new SerializableTimestampAssigner<LoginEvent>() {
                                            @Override
                                            public long extractTimestamp(LoginEvent loginEvent, long l) {
                                                return loginEvent.timestamp;
                                            }
                                        }
                                )
                )
                .keyBy(r -> r.userId);


        // 定义模式
        Pattern<LoginEvent, LoginEvent> pattern = Pattern.<LoginEvent>begin("one")
                .where(new SimpleCondition<LoginEvent>() {
                    @Override
                    public boolean filter(LoginEvent loginEvent) throws Exception {
                        return loginEvent.eventType.equals("fail");
                    }
                })
                .next("two")
                .where(new SimpleCondition<LoginEvent>() {
                    @Override
                    public boolean filter(LoginEvent loginEvent) throws Exception {
                        return loginEvent.eventType.equals("fail");
                    }
                })
                .next("three")
                .where(new SimpleCondition<LoginEvent>() {
                    @Override
                    public boolean filter(LoginEvent loginEvent) throws Exception {
                        return loginEvent.eventType.equals("fail");
                    }
                });
        // 模式应用在数据流上, 检查复杂事件
        PatternStream<LoginEvent> patternStream = CEP.pattern(stream, pattern);
        // 将匹配到的复杂事件选择出来,然后包装成字符串报警信息输出
        patternStream
                .select(new PatternSelectFunction<LoginEvent, String>() {
                    @Override
                    public String select(Map<String, List<LoginEvent>> map) throws Exception {
                        LoginEvent event1 = map.get("one").get(0);
                        LoginEvent event2 = map.get("two").get(0);
                        LoginEvent event3 = map.get("three").get(0);
                        return event1.userId + "连续三次登录失败!登录时间:" + event1.timestamp + "," + event2.timestamp + "," + event3.timestamp;
                    }
                })
                .print("warning");

        environment.execute();
    }
}

在这里插入图片描述

举报

相关推荐

0 条评论