/**
- 使用CEP,编程简单,但不够灵活和及时
- within(Time.minutes(1)):基于事件时间(EventTime),整个模式必须在首个匹配事件起 1 分钟内完成匹配(是匹配的时间约束,并非滚动窗口);水位线允许 10 秒的乱序延迟
- 先出现 price<1元,紧接着(next,严格连续)再出现 price>=500元 的匹配事件
*/
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternTimeoutFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;
import java.time.Duration;
import java.util.List;
import java.util.Map;
/**
 * Flink CEP demo job.
 *
 * <p>Reads text lines from a socket, turns each line into an {@code Account}, keys the stream
 * by account id and searches every key for a two-step pattern: an event whose price is below
 * 1.0, directly followed (via {@code next}) by an event whose price is at least 500.0, with the
 * whole sequence bounded by {@code within(Time.minutes(1))}.
 *
 * <p>Input lines are split on {@code ","}: field 0 is passed to the {@code Account} constructor
 * as-is, field 1 is parsed as a {@code long} and also used as the event timestamp, field 2 is
 * parsed as a {@code double}.
 */
public class FraudDetectionByCEP {

    /**
     * Builds and runs the pipeline.
     *
     * @param args command-line arguments (not read)
     * @throws Exception whatever {@code env.execute()} propagates
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> lines = env.socketTextStream("doit01", 8888);
        env.enableCheckpointing(50000);
        // NOTE(review): this setter is reported as deprecated in newer Flink releases —
        // confirm against the Flink version this project builds with before touching it.
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // The event timestamp is the second comma-separated field of the raw line.
        SerializableTimestampAssigner<String> eventTimeOfLine =
                (line, recordTimestamp) -> Long.parseLong(line.split(",")[1]);
        WatermarkStrategy<String> watermarks = WatermarkStrategy
                .<String>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                .withTimestampAssigner(eventTimeOfLine);

        MapFunction<String, Account> toAccount = FraudDetectionByCEP::parseAccount;
        KeyedStream<Account, String> accountsById = lines
                .assignTimestampsAndWatermarks(watermarks)
                .map(toAccount)
                .keyBy(Account::getId);

        // The side output receives whatever MyTimeOutFunction returns; the tag id and the
        // print prefix are kept exactly as they were.
        OutputTag<Account> timedOutTag = new OutputTag<Account>("lateTag") {};
        SingleOutputStreamOperator<Account> matches = CEP
                .pattern(accountsById, lowThenHighPricePattern())
                .inEventTime()
                .select(timedOutTag, new MyTimeOutFunction(), new MyPatternSelectFunction());

        DataStream<Account> timedOut = matches.getSideOutput(timedOutTag);
        timedOut.print("迟到");
        accountsById.print("主流");

        env.execute();
    }

    /** Converts one {@code id,timestamp,price} line into an {@code Account}. */
    private static Account parseAccount(String line) {
        String[] parts = line.split(",");
        long createTime = Long.parseLong(parts[1]);
        double price = Double.parseDouble(parts[2]);
        return new Account(parts[0], createTime, price);
    }

    /**
     * Creates the pattern "start" (price &lt; 1.0) then "second" (price &gt;= 500.0),
     * limited with {@code within(Time.minutes(1))}.
     */
    private static Pattern<Account, Account> lowThenHighPricePattern() {
        Pattern<Account, Account> lowPrice =
                Pattern.<Account>begin("start").where(new IterativeCondition<Account>() {
                    @Override
                    public boolean filter(Account event, Context<Account> ctx) throws Exception {
                        return event.getPrice() < 1.0;
                    }
                });

        Pattern<Account, Account> thenHighPrice =
                lowPrice.next("second").where(new IterativeCondition<Account>() {
                    @Override
                    public boolean filter(Account event, Context<Account> ctx) throws Exception {
                        return event.getPrice() >= 500.0;
                    }
                });

        return thenHighPrice.within(Time.minutes(1));
    }
}
/**
 * Timeout callback used with the CEP pattern: given a partial match, it hands back the first
 * event stored under the pattern name {@code "start"}.
 */
class MyTimeOutFunction implements PatternTimeoutFunction<Account, Account> {

    /**
     * @param partialMatch events collected so far, keyed by pattern name
     * @param timeoutTimestamp timestamp supplied by the caller; not used here
     * @return the first event listed under {@code "start"}
     * @throws Exception declared by the interface; nothing is thrown explicitly here
     */
    @Override
    public Account timeout(Map<String, List<Account>> partialMatch, long timeoutTimestamp) throws Exception {
        List<Account> startEvents = partialMatch.get("start");
        // Debug printing of the timed-out event is deliberately left switched off.
        return startEvents.iterator().next();
    }
}
/**
 * Match callback used with the CEP pattern: reports the first event stored under the pattern
 * name {@code "second"} on standard output and returns it.
 */
class MyPatternSelectFunction implements PatternSelectFunction<Account, Account> {

    /**
     * @param match events of a complete match, keyed by pattern name
     * @return the first event listed under {@code "second"}
     * @throws Exception declared by the interface; nothing is thrown explicitly here
     */
    @Override
    public Account select(Map<String, List<Account>> match) throws Exception {
        List<Account> secondStage = match.get("second");
        Account flagged = secondStage.iterator().next();
        String report = "危险交易是:" + flagged;
        System.out.println(report);
        return flagged;
    }
}
- 1、输入数据:
主流:4> Account(id=1, createTime=1650168000000, price=0.01)
主流:1> Account(id=4, createTime=1650168000003, price=0.01)
主流:3> Account(id=3, createTime=1650168000002, price=0.01)
主流:2> Account(id=2, createTime=1650168000001, price=0.01)
主流:5> Account(id=5, createTime=1650168000004, price=0.01)
主流:4> Account(id=1, createTime=1650168030000, price=1000.0)
主流:3> Account(id=3, createTime=1650168030001, price=100.0)
主流:3> Account(id=3, createTime=1650168040000, price=600.0)
输入此条数据后,触发:
主流:2> Account(id=2, createTime=1650168120000, price=1000.0)
- 2、输出结果是:
危险交易是:Account(id=1, createTime=1650168030000, price=1000.0)
迟到:1> Account(id=4, createTime=1650168000003, price=0.01)
迟到:2> Account(id=2, createTime=1650168000001, price=0.01)
迟到:5> Account(id=5, createTime=1650168000004, price=0.01)
*
- 3、总结
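- id=2,4,5 在1min内没有 next 要匹配的数据,所以被判定为超时的部分匹配,通过侧输出流输出(即结果中标记为"迟到"的数据)
- id=3 的 0.01 之后紧跟的是 100.0,不满足 next(严格连续)的条件,部分匹配被直接丢弃,因此既不报警也不会超时输出
- 当水位线(最大事件时间减去 10 秒乱序延迟)越过首个事件时间 + 1 分钟后,触发超时输出
- id=1 是危险数据(0.01 之后紧接着出现 1000.0)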