文章目录
1.CEP概述
复杂事件处理(Complex Event Processing)
总结起来,复杂事件处理(CEP)的流程可以分成三个步骤:
- 定义一个匹配规则
- 将匹配规则应用到事件流上,检测满足规则的复杂事件
- 对检测到的复杂事件进行处理,得到结果进行输出
输入的事件流在一个模式规则中输出得到的数据流
CEP 是针对流处理而言的,分析的是低延迟、频繁产生的事件流。
目的在于在无界流中检测出特定的数据组合,让我们有机会掌握数据中重要的高阶特征。
2.模式
CEP 的第一步所定义的匹配规则, 称为模式。
模式定义的内容:
- 每个简单事件的特征
- 简单事件之间的组合关系
3.CEP 的应用场景
CEP 主要用于实时流数据的分析处理。Cep可以帮助在复杂的事件中得到有意义的事件流。从而做出预警
这在企业项目的风控管理、用户画像和运维监控中,都有非常重要的应用。
- 风险控制
- 用户画像
- 运维监控
CEP 的应用场景非常丰富。很多大数据框架,如 Spark、Samza、Beam 等都提供了不同的CEP 解决方案,但没有专门的库(library)。而 Flink 提供了专门的 CEP 库用于复杂事件处理,可以说是目前 CEP 的最佳解决方案。
4.快速上手代码
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
定义的一个POJO
public class LoginEvent {
public String userId;
public String ipAddress;
public String eventType;
public Long timestamp;
public LoginEvent() {
}
public LoginEvent(String userId, String ipAddress, String eventType, Long timestamp) {
this.userId = userId;
this.ipAddress = ipAddress;
this.eventType = eventType;
this.timestamp = timestamp;
}
@Override
public String toString() {
return "LoginEvent{" +
"userId='" + userId + '\'' +
", ipAddress='" + ipAddress + '\'' +
", eventType='" + eventType + '\'' +
", timestamp=" + timestamp +
'}';
}
}
需求:
检测用户行为,如果连续三次登录失败,就输出报警信息。
public class LoginFailDetectExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setParallelism(1);
KeyedStream<LoginEvent, String> stream = environment
.fromElements(
new LoginEvent("user_1", "192.168.0.1", "fail", 2000L),
new LoginEvent("user_1", "192.168.0.2", "fail", 3000L),
new LoginEvent("user_2", "192.168.1.29", "fail", 4000L),
new LoginEvent("user_1", "171.56.23.10", "fail", 5000L),
new LoginEvent("user_2", "192.168.1.29", "fail", 7000L),
new LoginEvent("user_2", "192.168.1.29", "fail", 8000L),
new LoginEvent("user_2", "192.168.1.29", "success", 6000L)
)
.assignTimestampsAndWatermarks(
WatermarkStrategy.<LoginEvent>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner(
new SerializableTimestampAssigner<LoginEvent>() {
@Override
public long extractTimestamp(LoginEvent loginEvent, long l) {
return loginEvent.timestamp;
}
}
)
)
.keyBy(r -> r.userId);
// 定义模式
Pattern<LoginEvent, LoginEvent> pattern = Pattern.<LoginEvent>begin("one")
.where(new SimpleCondition<LoginEvent>() {
@Override
public boolean filter(LoginEvent loginEvent) throws Exception {
return loginEvent.eventType.equals("fail");
}
})
.next("two")
.where(new SimpleCondition<LoginEvent>() {
@Override
public boolean filter(LoginEvent loginEvent) throws Exception {
return loginEvent.eventType.equals("fail");
}
})
.next("three")
.where(new SimpleCondition<LoginEvent>() {
@Override
public boolean filter(LoginEvent loginEvent) throws Exception {
return loginEvent.eventType.equals("fail");
}
});
// 模式应用在数据流上, 检查复杂事件
PatternStream<LoginEvent> patternStream = CEP.pattern(stream, pattern);
// 将匹配到的复杂事件选择出来,然后包装成字符串报警信息输出
patternStream
.select(new PatternSelectFunction<LoginEvent, String>() {
@Override
public String select(Map<String, List<LoginEvent>> map) throws Exception {
LoginEvent event1 = map.get("one").get(0);
LoginEvent event2 = map.get("two").get(0);
LoginEvent event3 = map.get("three").get(0);
return event1.userId + "连续三次登录失败!登录时间:" + event1.timestamp + "," + event2.timestamp + "," + event3.timestamp;
}
})
.print("warning");
environment.execute();
}
}