Monitoring Risky Accounts with Flink CEP

云卷云舒xj 2022-04-17
flink

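/**
 * Using CEP keeps the code simple, but it is not very flexible, and results are not emitted promptly.
 * within(Time.minutes(1)) on event time: a match must complete within 1 minute of its first event
 * (a per-match time limit, not a tumbling window); the watermark allows 10 seconds of out-of-order data.
 * Pattern: an event with price < 1 yuan immediately followed by an event with price >= 500 yuan.
 */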
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternTimeoutFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;
import java.time.Duration;
import java.util.List;
import java.util.Map;

public class FraudDetectionByCEP {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Each socket line has the form: id,createTime(epoch millis),price
        DataStreamSource<String> data = env.socketTextStream("doit01", 8888);
        env.enableCheckpointing(50000);
        // Deprecated since Flink 1.12, where event time is already the default; only needed on older versions.
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Event-time timestamps come from the second field; watermarks tolerate 10 s of out-of-order data.
        KeyedStream<Account, String> keyedStream = data
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                                .withTimestampAssigner(new SerializableTimestampAssigner<String>() {
                                    @Override
                                    public long extractTimestamp(String element, long recordTimestamp) {
                                        return Long.parseLong(element.split(",")[1]);
                                    }
                                }))
                .map(new MapFunction<String, Account>() {
                    @Override
                    public Account map(String value) throws Exception {
                        String[] fields = value.split(",");
                        return new Account(fields[0], Long.parseLong(fields[1]), Double.parseDouble(fields[2]));
                    }
                })
                // The pattern is matched independently for each account id.
                .keyBy(Account::getId);

        // "start": an event with price < 1.
        // next("second"): strict contiguity, so the very next event of the same account must have price >= 500.
        // within(1 min): the whole match must complete within 1 minute of event time after "start".
        Pattern<Account, Account> pattern = Pattern.<Account>begin("start").where(new IterativeCondition<Account>() {
            @Override
            public boolean filter(Account account, Context<Account> context) throws Exception {
                return account.getPrice() < 1.0;
            }
        }).next("second").where(new IterativeCondition<Account>() {
            @Override
            public boolean filter(Account account, Context<Account> context) throws Exception {
                return account.getPrice() >= 500.0;
            }
        }).within(Time.minutes(1));

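        // Side output for partial matches that did not complete within 1 minute (timeouts, not late events).
        OutputTag<Account> timeoutTag = new OutputTag<Account>("timeoutTag") {};

        // Complete matches go to the main output via MyPatternSelectFunction;
        // timed-out partial matches go to timeoutTag via MyTimeOutFunction.
        SingleOutputStreamOperator<Account> selectStream = CEP.pattern(keyedStream, pattern)
                .inEventTime()
                .select(timeoutTag, new MyTimeOutFunction(), new MyPatternSelectFunction());

        DataStream<Account> timeoutStream = selectStream.getSideOutput(timeoutTag);

        timeoutStream.print("迟到"); // label means "late"; these are the "start" events of timed-out matches
        keyedStream.print("主流");   // "main stream": every parsed input record
        env.execute();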
    }
}
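// Called for each partial match that times out; emits the "start" event of that match.
class MyTimeOutFunction implements PatternTimeoutFunction<Account, Account> {
    @Override
    public Account timeout(Map<String, List<Account>> map, long timeoutTimestamp) throws Exception {
        return map.get("start").get(0);
    }
}

// Called for each complete match; reports the "second" (price >= 500) event as the risky transaction.
class MyPatternSelectFunction implements PatternSelectFunction<Account, Account> {
    @Override
    public Account select(Map<String, List<Account>> map) throws Exception {
        Account account = map.get("second").get(0);
        System.out.println("危险交易是:" + account); // "dangerous transaction:"
        return account;
    }
}

// Account is not shown in the original post. This is an assumed plain POJO equivalent to a
// Lombok @Data class; its toString() yields the Account(id=..., createTime=..., price=...) log lines.
class Account {
    private String id;
    private long createTime;
    private double price;

    public Account() {}

    public Account(String id, long createTime, double price) {
        this.id = id;
        this.createTime = createTime;
        this.price = price;
    }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public long getCreateTime() { return createTime; }
    public void setCreateTime(long createTime) { this.createTime = createTime; }
    public double getPrice() { return price; }
    public void setPrice(double price) { this.price = price; }

    @Override
    public String toString() {
        return "Account(id=" + id + ", createTime=" + createTime + ", price=" + price + ")";
    }
}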
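To make the matching semantics concrete without a cluster, here is a plain-Java model of what the job computes (no Flink dependency; Java 16+ for the record type). Assumptions: per key, a price < 1 start event must be immediately followed by a price >= 500 event within 60 s of event time, and buffered events are only evaluated once a bounded-out-of-orderness watermark (max timestamp minus 10 s minus 1 ms) passes them. It is a simplified sketch, not Flink's NFA; the names CepSimulation, onEvent, emitWatermark and runArticleInput are hypothetical, and the input is fed in timestamp order because the real socket arrival order is not shown.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of the CEP job above; not Flink's actual NFA implementation.
public class CepSimulation {
    record Account(String id, long createTime, double price) {}

    static final long WITHIN_MS = 60_000L;           // within(Time.minutes(1))
    static final long OUT_OF_ORDERNESS_MS = 10_000L; // forBoundedOutOfOrderness(Duration.ofSeconds(10))

    final List<Account> matches = new ArrayList<>();  // "second" events of complete matches
    final List<Account> timeouts = new ArrayList<>(); // "start" events of timed-out partial matches

    // Under strict contiguity (next) each key has at most one open partial match.
    private final Map<String, Account> pendingStart = new HashMap<>();
    private final List<Account> buffer = new ArrayList<>();
    private long maxTimestamp = Long.MIN_VALUE;

    // Buffers an event; it is not evaluated until the watermark passes its timestamp.
    void onEvent(Account a) {
        buffer.add(a);
        maxTimestamp = Math.max(maxTimestamp, a.createTime());
    }

    // Emits a watermark and evaluates all buffered events at or below it, in timestamp order.
    void emitWatermark() {
        long watermark = maxTimestamp - OUT_OF_ORDERNESS_MS - 1;
        List<Account> ready = new ArrayList<>();
        for (Account a : buffer) {
            if (a.createTime() <= watermark) ready.add(a);
        }
        buffer.removeIf(a -> a.createTime() <= watermark);
        ready.sort(Comparator.comparingLong(Account::createTime));
        for (Account a : ready) {
            advanceTime(a.createTime()); // expire old partial matches before processing the event
            process(a);
        }
        advanceTime(watermark);
    }

    private void process(Account a) {
        Account start = pendingStart.remove(a.id());
        if (start != null && a.price() >= 500.0) {
            matches.add(a); // start followed directly by second: risky transaction
            return;
        }
        // Either there was no open match, or this event broke strict contiguity and the open
        // match is dropped silently (not reported as a timeout). The event may start a new match.
        if (a.price() < 1.0) pendingStart.put(a.id(), a);
    }

    // Moves every open partial match that is at least WITHIN_MS old into the timeout list.
    private void advanceTime(long time) {
        pendingStart.values().removeIf(start -> {
            boolean expired = time - start.createTime() >= WITHIN_MS;
            if (expired) timeouts.add(start);
            return expired;
        });
    }

    // Feeds the article's input records, emitting a watermark after each one
    // (a stand-in for Flink's periodic watermark emission).
    static CepSimulation runArticleInput() {
        CepSimulation sim = new CepSimulation();
        Account[] input = {
            new Account("1", 1650168000000L, 0.01), new Account("2", 1650168000001L, 0.01),
            new Account("3", 1650168000002L, 0.01), new Account("4", 1650168000003L, 0.01),
            new Account("5", 1650168000004L, 0.01), new Account("1", 1650168030000L, 1000.0),
            new Account("3", 1650168030001L, 100.0), new Account("3", 1650168040000L, 600.0),
            new Account("2", 1650168120000L, 1000.0),
        };
        for (Account a : input) {
            sim.onEvent(a);
            sim.emitWatermark();
        }
        return sim;
    }

    public static void main(String[] args) {
        CepSimulation sim = runArticleInput();
        System.out.println("matches:  " + sim.matches);
        System.out.println("timeouts: " + sim.timeouts);
    }
}
```

Running it reports id=1's 1000.0 record as the only match and the start events of ids 2, 4 and 5 as timeouts, which agrees with the job's printed output; id=3's open match is dropped by its 100.0 record rather than timing out.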

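  • 1. Input data (each socket line is id,createTime,price; the records below are echoed by keyedStream.print with the label 主流, "main stream"):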

主流:4> Account(id=1, createTime=1650168000000, price=0.01)
主流:1> Account(id=4, createTime=1650168000003, price=0.01)
主流:3> Account(id=3, createTime=1650168000002, price=0.01)
主流:2> Account(id=2, createTime=1650168000001, price=0.01)
主流:5> Account(id=5, createTime=1650168000004, price=0.01)
主流:4> Account(id=1, createTime=1650168030000, price=1000.0)
主流:3> Account(id=3, createTime=1650168030001, price=100.0)
主流:3> Account(id=3, createTime=1650168040000, price=600.0)

Entering this record triggers the output:
主流:2> Account(id=2, createTime=1650168120000, price=1000.0)

  • 2. Output (危险交易是 = "dangerous transaction"; 迟到 = "late", the label of the timeout side output):
危险交易是:Account(id=1, createTime=1650168030000, price=1000.0)
迟到:1> Account(id=4, createTime=1650168000003, price=0.01)
迟到:2> Account(id=2, createTime=1650168000001, price=0.01)
迟到:5> Account(id=5, createTime=1650168000004, price=0.01)

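  • 3. Summary
  • id=2, 4 and 5 have no price >= 500 event immediately after their price < 1 event within 1 minute, so their partial matches time out and the start events go to the side output printed as 迟到 ("late"). They are timed-out matches, not late data in the watermark sense.
  • The output fires only after event time, as tracked by the watermark, moves more than 1 minute past the start events; here that happens when the record at 1650168120000 arrives.
  • id=1 is the risky account: 0.01 followed directly by 1000.0, 30 seconds later.
  • id=3 produces neither a match nor a timeout: next() requires strict contiguity, so the 100.0 event between 0.01 and 600.0 discards the partial match.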
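The trigger timing can be checked by hand. A minimal sketch, assuming Flink's bounded-out-of-orderness generator emits "max timestamp - 10 000 ms - 1 ms" and that CEP prunes a partial match once the watermark is at least 60 000 ms past its start event; WatermarkArithmetic, watermark and timedOut are illustrative names, not Flink APIs.

```java
// Hand-checkable timing for the run above (plain Java, no Flink dependency).
public class WatermarkArithmetic {
    static final long OUT_OF_ORDERNESS_MS = 10_000L; // forBoundedOutOfOrderness(Duration.ofSeconds(10))
    static final long WITHIN_MS = 60_000L;           // within(Time.minutes(1))

    // Watermark after seeing maxTimestamp: max timestamp - out-of-orderness bound - 1 ms.
    static long watermark(long maxTimestamp) {
        return maxTimestamp - OUT_OF_ORDERNESS_MS - 1;
    }

    // Assumed pruning rule: a partial match started at startTs expires once
    // the watermark is at least WITHIN_MS past it.
    static boolean timedOut(long startTs, long watermark) {
        return watermark - startTs >= WITHIN_MS;
    }

    public static void main(String[] args) {
        long before = watermark(1650168040000L); // after the last id=3 record
        long after = watermark(1650168120000L);  // after the id=2, price=1000.0 record
        System.out.println(before); // 1650168029999
        System.out.println(after);  // 1650168109999
        // id=1's second event (1650168030000) is still above the earlier watermark,
        // so CEP has not evaluated it yet.
        System.out.println(1650168030000L <= before); // false
        // id=2's start (1650168000001) is more than 1 minute behind the later watermark.
        System.out.println(timedOut(1650168000001L, after)); // true
    }
}
```

This also explains why the id=1 match is printed together with the timeouts rather than as soon as 1000.0 arrives: even after the id=3 record at 1650168040000, the watermark is 1650168029999, one millisecond short of the match's second event.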