Flink in Practice: E-commerce User Behavior Analysis

洒在心头的阳光 · 2022-04-13 · tag: flink

Related post:

Flink电商项目第一天-电商用户行为分析及完整图步骤解析-热门商品统计TopN的实现

1. E-commerce User Behavior Analysis


E-commerce behavior analysis falls into three main categories:

  • Statistical analysis
    • clicks, page views
    • hot items, recently hot items, hot items by category, traffic statistics
  • Preference analysis
    • favorites, likes, ratings, tagging
    • user profiles, recommendation lists
  • Risk control
    • order placement, payment, login
    • fake-order monitoring, order-timeout monitoring, malicious-login monitoring (many failed logins within a short time)

1.1 Project Module Design

Classified by traffic versus business, e-commerce analysis splits into two broad groups; broken down further by the type of statistic computed, the modules are as follows (figures omitted).

1.2 Data Sources

The data comes in two structures, UserBehavior and ApacheLogEvent; their fields are listed with the POJO definitions below (figures omitted).

2. Project Modules

The project implements the analyses covered in the subsections below (figure omitted).

The processing pattern is the same throughout: first key the stream by some id and define a window, then incrementally aggregate one field (specifying the output format), and finally re-key by window and accumulate the data that belongs to the same window.

2.1 Real-Time Hot Item Statistics

  • Requirements
    • compute the hottest items over the last hour, refreshed every five minutes
    • popularity is measured by page views ("pv")
  • Approach
    1. filter the user-behavior stream down to page-view ("pv") events
    2. build a sliding window (size 1 hour, slide 5 minutes) and count the visits of every item
    3. per sliding window, output the 5 most-visited items

Step 2 works roughly as follows: first partition the stream by item id, then slice each partition into sliding time windows (figures omitted).

Window intervals are left-closed and right-open, and with sliding windows one record is assigned to several overlapping windows.

For example, with a 60 s window and a 30 s slide, an event at t = 95 s belongs to both [60 s, 120 s) and [90 s, 150 s).
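A tiny standalone sketch of that assignment rule (illustrative only; it mirrors how sliding-window starts are computed, using the 60 s size / 30 s slide test settings from the code further below):

// Print every sliding window [start, start + size) that contains a given timestamp
public class SlidingWindowsSketch {
    public static void main(String[] args) {
        long size = 60_000L;   // window size: 60 s, in ms
        long slide = 30_000L;  // slide: 30 s, in ms
        long ts = 95_000L;     // event timestamp: t = 95 s
        long lastStart = ts - (ts % slide); // start of the latest window containing ts
        for (long start = lastStart; start > ts - size; start -= slide) {
            // left-closed, right-open: start <= ts < start + size
            System.out.println("[" + start + ", " + (start + size) + ")");
        }
    }
}

Running it prints [90000, 150000) and [60000, 120000), matching the example above.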

Each window is then aggregated incrementally.

aggregate() takes two arguments: the first defines the aggregation rule inside the window, the second defines the shape of the output record.

Window aggregate function

Window aggregation strategy: add one for every record that arrives.

This is implemented against the AggregateFunction interface, which requires four methods: createAccumulator, add, merge, and getResult.

interface AggregateFunction<IN, ACC, OUT>
/**
* IN : input type
* ACC: accumulator type
* OUT: output type
*/

Window output function

Define the output structure: ItemViewCount(itemId, windowEnd, count)

itemId: item id

windowEnd: window end timestamp

count: view count

interface WindowFunction<IN,OUT,KEY,W extends Window>

/**
     *  WindowFunction<IN,OUT,KEY,W extends Window>
     *      IN : input type — the aggregate function's output
     *      OUT: the type to emit
     *      KEY: the grouping key (a Tuple here); itemId, since windows are keyed by itemId
     *      W  : the window type; w.getEnd() returns the window end time
     */

Sketching the flow: first group by item id and aggregate per window. Then process the results so that data of the same window ends up together, and output the top 5. This step needs an intermediate buffer that holds a window's records until the window is complete — which is exactly what keyed state provides (figures omitted).

Final sort and output — KeyedProcessFunction

  • the low-level API for stateful keyed streams
  • a KeyedProcessFunction processes each keyed sub-stream independently
  • keying by windowEnd guarantees that each sub-stream only carries data of one time window
  • read the buffered records from ListState, sort them, and emit the result

A ProcessFunction defines the processing logic of a KeyedStream.

After partitioning, each keyed stream has its own lifecycle:

  • open: initialization; the place to obtain state handles
  • processElement: invoked for every element in the stream
  • onTimer: the callback that runs when a registered Timer fires
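A minimal lifecycle skeleton (illustrative only — LifecycleSketch and its types are made up here; the project's real TopNHotItems follows below):

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Buffers the Long values seen for each key and emits a count once the
// watermark passes an element's timestamp (the real code uses windowEnd + 1).
public class LifecycleSketch extends KeyedProcessFunction<Long, Long, String> {
    private ListState<Long> buffer;

    @Override
    public void open(Configuration parameters) throws Exception {
        // open(): runs once per parallel instance; obtain state handles here
        buffer = getRuntimeContext().getListState(
                new ListStateDescriptor<>("buffer", Long.class));
    }

    @Override
    public void processElement(Long value, Context ctx, Collector<String> out) throws Exception {
        // processElement(): called for every record of the current key
        buffer.add(value);
        // Event-time timers fire once the watermark passes their timestamp
        // (assumes timestamps were assigned upstream, so ctx.timestamp() is non-null)
        ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 1);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // onTimer(): callback for a previously registered timer
        long n = 0;
        for (Long ignored : buffer.get()) {
            n++;
        }
        out.collect("key " + ctx.getCurrentKey() + ": " + n + " buffered elements");
        buffer.clear();
    }
}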


Create the POJOs

Each needs getters/setters, no-arg and all-args constructors, and toString (a minimal hand-written sketch follows the field lists).

  • ItemViewCount

    private Long itemId;
    private Long windowEnd;
    private Long count;
    
  • UserBehavior

    private Long userId;
    private Long itemId;
    private Integer categoryId;
    private String behavior;
    private Long timestamp;
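For reference, a minimal hand-written ItemViewCount (the project generates these with IDE shortcuts; UserBehavior follows the same pattern):

// Flink POJO rules: public class, public no-arg constructor, getters/setters
public class ItemViewCount {
    private Long itemId;
    private Long windowEnd;
    private Long count;

    public ItemViewCount() {}

    public ItemViewCount(Long itemId, Long windowEnd, Long count) {
        this.itemId = itemId;
        this.windowEnd = windowEnd;
        this.count = count;
    }

    public Long getItemId() { return itemId; }
    public void setItemId(Long itemId) { this.itemId = itemId; }
    public Long getWindowEnd() { return windowEnd; }
    public void setWindowEnd(Long windowEnd) { this.windowEnd = windowEnd; }
    public Long getCount() { return count; }
    public void setCount(Long count) { this.count = count; }

    @Override
    public String toString() {
        return "ItemViewCount{itemId=" + itemId + ", windowEnd=" + windowEnd + ", count=" + count + "}";
    }
}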
    

Code

For testing, the job below uses a 60-second window with a 30-second slide (instead of the 1 hour / 5 minutes from the requirement).

import beans.ItemViewCount;
import beans.UserBehavior;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Comparator;

/**
 * @author Kewei
 * @Date 2022/3/5 15:10
 */

public class HotItems {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        DataStreamSource<String> inputStream = env.readTextFile("D:\\IdeaProjects\\UserBehaviorAnalysis\\HotItemsAnalysis\\src\\main\\resources\\UserBehavior.csv");

        // Map each line to a POJO and assign event-time timestamps
        SingleOutputStreamOperator<UserBehavior> dataStream = inputStream.map(line -> {
            String[] filed = line.split(",");
            return new UserBehavior(new Long(filed[0]), new Long(filed[1]), new Integer(filed[2]), filed[3], new Long(filed[4]));
        }).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserBehavior>() {
            @Override
            public long extractAscendingTimestamp(UserBehavior userBehavior) {
                return userBehavior.getTimestamp() * 1000;
            }
        });

        // Keep only "pv" events, key by itemId, open sliding windows, aggregate incrementally, and emit ItemViewCount records
        SingleOutputStreamOperator<ItemViewCount> windowAggStream = dataStream
                .filter(data -> "pv".equals(data.getBehavior()))
                .keyBy("itemId")
                .timeWindow(Time.seconds(60), Time.seconds(30))
                .aggregate(new CountAgg(), new WindowResultFunction());

        // Re-key by window end so each window's counts land together, then emit on a timer
        SingleOutputStreamOperator<String> resultStream = windowAggStream
                .keyBy("windowEnd")
                .process(new TopNHotItems(5));

        resultStream.print();

        env.execute("hot items");
    }

    // Aggregation logic for one item's records

    /**
     * AggregateFunction<IN, ACC, OUT>
     *     IN : input type
     *     ACC: accumulator type
     *     OUT: final output type
     */
    private static class CountAgg implements AggregateFunction<UserBehavior, Long, Long> {
        @Override
        public Long createAccumulator() {
            return 0L;
        }

        @Override
        public Long add(UserBehavior userBehavior, Long aLong) {
            return aLong + 1;
        }

        @Override
        public Long getResult(Long aLong) {
            return aLong;
        }

        @Override
        public Long merge(Long aLong, Long acc1) {
            return aLong + acc1;
        }
    }

    // Shape the window output

    /**
     *  WindowFunction<IN,OUT,KEY,W extends Window>
     *      IN : input type — the aggregate function's output
     *      OUT: the type to emit
     *      KEY: the grouping key (a Tuple here); itemId, since windows were keyed by itemId
     *      W  : the window type; w.getEnd() returns the window end time
     */
    private static class WindowResultFunction implements WindowFunction<Long, ItemViewCount, Tuple, TimeWindow> {
        @Override
        public void apply(Tuple tuple, TimeWindow timeWindow, Iterable<Long> iterable, Collector<ItemViewCount> out) throws Exception {
            Long itemId = tuple.getField(0);
            Long end = timeWindow.getEnd();
            Long next = iterable.iterator().next();
            out.collect(new ItemViewCount(itemId, end, next));
        }
    }

    /**
     * KeyedProcessFunction<KEY, IN, OUT>
     *     KEY: type of the grouping key
     *     IN : input type
     *     OUT: output type
     */
    private static class TopNHotItems extends KeyedProcessFunction<Tuple, ItemViewCount, String> {
        private Integer topSize;
        public TopNHotItems(Integer topSize) {
            this.topSize = topSize;
        }

        ListState<ItemViewCount> itemViewCountListState;

        @Override
        public void open(Configuration parameters) throws Exception {
            itemViewCountListState = getRuntimeContext().getListState(new ListStateDescriptor<ItemViewCount>("Item-view-count-list",ItemViewCount.class));
        }

        @Override
        public void processElement(ItemViewCount count, KeyedProcessFunction<Tuple, ItemViewCount, String>.Context ctx, Collector<String> out) throws Exception {
            itemViewCountListState.add(count);
            // Register a timer at windowEnd + 1 ms: all counts of one window share the same windowEnd, so once the watermark passes it, that window's data is complete
            ctx.timerService().registerEventTimeTimer(count.getWindowEnd() + 1);
        }

        // Timer callback
        @Override
        public void onTimer(long timestamp, KeyedProcessFunction<Tuple, ItemViewCount, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
            // Copy the ListState into an ArrayList
            ArrayList<ItemViewCount> itemViewCounts = Lists.newArrayList(itemViewCountListState.get().iterator());

            // Sort by count, descending
            itemViewCounts.sort(new Comparator<ItemViewCount>() {
                @Override
                public int compare(ItemViewCount o1, ItemViewCount o2) {
                    return o2.getCount().intValue() - o1.getCount().intValue();
                }
            });

            // Format the result
            StringBuffer stringBuffer = new StringBuffer();
            stringBuffer.append("===========\n");
            stringBuffer.append("window end: ").append(new Timestamp(timestamp - 1)).append("\n");

            for (int i = 0; i < Math.min(topSize, itemViewCounts.size()); i++) {
                ItemViewCount itemViewCount = itemViewCounts.get(i);
                stringBuffer.append("NO ").append(i+1).append(":")
                        .append(" 商品id = ").append(itemViewCount.getItemId())
                        .append(" 热门度 = ").append(itemViewCount.getCount())
                        .append("\n");
            }
            stringBuffer.append("============\n\n");

            // Throttle output for readability
            Thread.sleep(1000L);

            // Emit the formatted block
            out.collect(stringBuffer.toString());
        }
    }
}

Output:

===========
window end: 2017-11-26 09:00:30.0
NO 1: item id = 2455388 views = 2
NO 2: item id = 1715 views = 1
NO 3: item id = 2244074 views = 1
NO 4: item id = 3076029 views = 1
NO 5: item id = 176722 views = 1
============

...

2.2 Real-Time Traffic Statistics — Hot Pages

  • Requirements
    • from the web server's logs, compute the hot pages in real time
    • count the visits over the last minute and output the 5 most-visited URLs, refreshing every five seconds
  • Approach
    1. parse the time field of the Apache access log into a timestamp and use it as event time
    2. keep only GET requests for pages, filtering out requests for static resources
    3. key by url, build a sliding window (size 1 minute, slide 5 seconds), aggregate incrementally, and emit records in a fixed format
    4. finally, re-key by window end time, combine each window's data, and format the output

Create the POJOs

  • ApacheLogEvent

    private String ip;
    private String userId;
    private Long timestamp;
    private String method;
    private String url;
    
  • PageViewCount

    private String url;
    private Long windowEnd;
    private Long count;
    

Code

import bean.ApacheLogEvent;
import bean.PageViewCount;
import org.apache.commons.compress.utils.Lists;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.regex.Pattern;

/**
 * @author Kewei
 * @Date 2022/3/10 15:57
 */

public class HotPages {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        DataStreamSource<String> inputStream = env.readTextFile("D:\\IdeaProjects\\UserBehaviorAnalysis\\HotPages\\apache.log");

        // Map each log line to a POJO and assign event time plus a watermark
        SingleOutputStreamOperator<ApacheLogEvent> dataStream = inputStream.map(line -> {
                    String[] fields = line.split(" ");
                    SimpleDateFormat simpleDateFormat = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss");
                    long time = simpleDateFormat.parse(fields[3]).getTime();
                    return new ApacheLogEvent(fields[0], fields[1], time, fields[5], fields[6]);
                })
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<ApacheLogEvent>(Time.seconds(1)) {
                    @Override
                    public long extractTimestamp(ApacheLogEvent element) {
                        return element.getTimestamp();
                    }
                });

        /**
         * Filter down to GET requests for pages (dropping static-resource requests),
         * then key by url and open a sliding window (size 1 minute, slide 5 seconds),
         * finally aggregate incrementally and emit records in the PageViewCount format
         */
        SingleOutputStreamOperator<PageViewCount> resStream = dataStream
                .filter(data -> "GET".equals(data.getMethod()))
                .filter(data -> {
                    String regex = "^((?!\\.(css|js|png|ico)$).)*$";
                    return Pattern.matches(regex, data.getUrl());
                })
                .keyBy(ApacheLogEvent::getUrl)
                .timeWindow(Time.minutes(1), Time.seconds(5))
                .aggregate(new PageCountAgg(), new PageView());

        /**
         * Re-key by window end time, then format each window's grouped results
         */
        SingleOutputStreamOperator<String> resultStream = resStream
                .keyBy(PageViewCount::getWindowEnd)
                .process(new MyProcessFunc());

        resultStream.print();

        env.execute();

    }

    public static class PageCountAgg implements AggregateFunction<ApacheLogEvent, Long, Long>{

        @Override
        public Long createAccumulator() {
            return 0L;
        }

        @Override
        public Long add(ApacheLogEvent apacheLogEvent, Long aLong) {
            return aLong+1;
        }

        @Override
        public Long getResult(Long aLong) {
            return aLong;
        }

        @Override
        public Long merge(Long aLong, Long acc1) {
            return aLong+acc1;
        }
    }

    public static class PageView implements WindowFunction<Long, PageViewCount,String, TimeWindow>{
        @Override
        public void apply(String s, TimeWindow window, Iterable<Long> input, Collector<PageViewCount> out) throws Exception {
            out.collect(new PageViewCount(s, window.getEnd(), input.iterator().next()));
        }
    }

    public static class MyProcessFunc extends KeyedProcessFunction<Long,PageViewCount,String>{
        private ListState<PageViewCount> list;

        @Override
        public void onTimer(long timestamp, KeyedProcessFunction<Long, PageViewCount, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
            ArrayList<PageViewCount> pageViewCountArrayList = Lists.newArrayList(list.get().iterator());

            pageViewCountArrayList.sort((p1,p2) -> p2.getCount().intValue() - p1.getCount().intValue());

            StringBuffer result = new StringBuffer();
            result.append("====================\n");
            result.append("窗口结束时间").append(new Timestamp(timestamp - 1)).append("\n");

            for (int i = 0; i < Math.min(5, pageViewCountArrayList.size()); i++) {
                PageViewCount pageView = pageViewCountArrayList.get(i);
                result.append(pageView.getUrl()).append("  ").append(pageView.getCount()).append("\n");
            }
            result.append("===============\n\n\n");

            Thread.sleep(1000);

            out.collect(result.toString());
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            list = getRuntimeContext().getListState(new ListStateDescriptor<PageViewCount>("PageViewCount",PageViewCount.class));
        }

        @Override
        public void processElement(PageViewCount value, KeyedProcessFunction<Long, PageViewCount, String>.Context ctx, Collector<String> out) throws Exception {
            list.add(value);

            ctx.timerService().registerEventTimeTimer(value.getWindowEnd()+1);
        }
    }
}

Out-of-order output

I don't quite understand this part yet; revisit later.

2.3 Real-Time Traffic Statistics — PV and UV

  • Requirements
    • compute PV (page views) and UV (unique visitors) in real time from the tracking logs
    • count views per hour (PV) and deduplicate users (UV)
  • Approach
    • for PV, filtering the events and summing over a tumbling time window is enough
    • for UV, a Set can do the deduplication
    • for very large data volumes, a Bloom filter can do the deduplication instead (a sketch follows the UV code below)

PV code

import beans.UserBehavior;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * @author Kewei
 * @Date 2022/3/10 17:15
 */

public class PageView {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        DataStreamSource<String> inputSream = env.readTextFile("D:\\IdeaProjects\\UserBehaviorAnalysis\\HotItemsAnalysis\\src\\main\\resources\\UserBehavior.csv");

        SingleOutputStreamOperator<UserBehavior> dataStream = inputSream.map(line -> {
                    String[] field = line.split(",");
                    return new UserBehavior(new Long(field[0]), new Long(field[1]), new Integer(field[2]), field[3], new Long(field[4]));
                })
                .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserBehavior>() {
                    @Override
                    public long extractAscendingTimestamp(UserBehavior element) {
                        return element.getTimestamp() * 1000L;
                    }
                });

        SingleOutputStreamOperator<Tuple2<String, Long>> result = dataStream
                .filter(data -> "pv".equals(data.getBehavior()))
                .map(new MapFunction<UserBehavior, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(UserBehavior userBehavior) throws Exception {
                        return new Tuple2<>("pv", 1L);
                    }
                })
                .keyBy(data -> data.f0)
                .timeWindow(Time.hours(1))
                .sum(1);

        result.print();

        env.execute();

    }
}

Output:

(pv,41890)
(pv,48022)
(pv,47298)
(pv,44499)
(pv,48649)
(pv,50838)
(pv,52296)
(pv,52552)
(pv,48292)
(pv,13)

Notice how few records the final window holds. More importantly, every record shares the single key "pv", so the whole stream funnels through one parallel task — data skew.

Below, a random key is used to spread the records across subtasks, followed by a second per-window aggregation.

import beans.PageViewCount;
import beans.UserBehavior;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.Random;

/**
 * @author Kewei
 * @Date 2022/3/10 17:15
 */

public class PageView2 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        DataStreamSource<String> inputSream = env.readTextFile("D:\\IdeaProjects\\UserBehaviorAnalysis\\HotItemsAnalysis\\src\\main\\resources\\UserBehavior.csv");

        SingleOutputStreamOperator<UserBehavior> dataStream = inputSream.map(line -> {
                    String[] field = line.split(",");
                    return new UserBehavior(new Long(field[0]), new Long(field[1]), new Integer(field[2]), field[3], new Long(field[4]));
                })
                .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserBehavior>() {
                    @Override
                    public long extractAscendingTimestamp(UserBehavior element) {
                        return element.getTimestamp() * 1000L;
                    }
                });

        SingleOutputStreamOperator<Tuple2<String, Long>> result = dataStream
                .filter(data -> "pv".equals(data.getBehavior()))
                .map(new MapFunction<UserBehavior, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(UserBehavior userBehavior) throws Exception {
                        return new Tuple2<>("pv", 1L);
                    }
                })
                .keyBy(data -> data.f0)
                .timeWindow(Time.hours(1))
                .sum(1);
        
        // Use a random key to spread records across subtasks, countering data skew
        SingleOutputStreamOperator<PageViewCount> resStream = dataStream
                .filter(data -> "pv".equals(data.getBehavior()))
                .map(new MapFunction<UserBehavior, Tuple2<Integer, Long>>() {
                    @Override
                    public Tuple2<Integer, Long> map(UserBehavior userBehavior) throws Exception {
                        Random random = new Random();
                        return new Tuple2<>(random.nextInt(10), 1L);
                    }
                })
                .keyBy(data -> data.f0)
                .timeWindow(Time.hours(1))
                .aggregate(new MyAgg(), new WindowFunc());

        SingleOutputStreamOperator<Tuple2<String, Long>> result2 = resStream
                .keyBy(PageViewCount::getWindowEnd)
                .process(new MyProcessFunc());

        result2.print();

//        result.print();

        env.execute();

    }

    public static class MyAgg implements AggregateFunction<Tuple2<Integer,Long>,Long,Long>{
        @Override
        public Long createAccumulator() {
            return 0L;
        }

        @Override
        public Long add(Tuple2<Integer, Long> integerLongTuple2, Long aLong) {
            return aLong+integerLongTuple2.f1;
        }

        @Override
        public Long getResult(Long aLong) {
            return aLong;
        }

        @Override
        public Long merge(Long aLong, Long acc1) {
            return acc1+aLong;
        }
    }

    public static class WindowFunc implements WindowFunction<Long, PageViewCount,Integer, TimeWindow>{
        @Override
        public void apply(Integer integer, TimeWindow window, Iterable<Long> input, Collector<PageViewCount> out) throws Exception {
            out.collect(new PageViewCount(integer.toString(), window.getEnd(), input.iterator().next()));
        }
    }

    public static class MyProcessFunc extends KeyedProcessFunction<Long,PageViewCount,Tuple2<String,Long>>{
        private ListState<PageViewCount> pageViewCountListState;

        @Override
        public void onTimer(long timestamp, KeyedProcessFunction<Long, PageViewCount, Tuple2<String, Long>>.OnTimerContext ctx, Collector<Tuple2<String, Long>> out) throws Exception {
            ArrayList<PageViewCount> pageViewCounts = Lists.newArrayList(pageViewCountListState.get().iterator());

            Long sum = 0L;
            for (PageViewCount pageViewCount : pageViewCounts) {
                sum += pageViewCount.getCount();
            }

            out.collect(new Tuple2<>("pv",sum));
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            pageViewCountListState = getRuntimeContext().getListState(new ListStateDescriptor<PageViewCount>("PageViewCount",PageViewCount.class));
        }

        @Override
        public void processElement(PageViewCount value, KeyedProcessFunction<Long, PageViewCount, Tuple2<String, Long>>.Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
            pageViewCountListState.add(value);

            ctx.timerService().registerEventTimeTimer(value.getWindowEnd()+1);
        }
    }
}

Output

(pv,41890)
(pv,48022)
(pv,47298)
(pv,44499)
(pv,48649)
(pv,50838)
(pv,52296)
(pv,52552)
(pv,48292)
(pv,13)

UV code

import beans.PageViewCount;
import beans.UserBehavior;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.HashSet;

/**
 * @author Kewei
 * @Date 2022/3/10 20:39
 */

public class UserView {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        DataStreamSource<String> inputStream = env.readTextFile("D:\\IdeaProjects\\UserBehaviorAnalysis\\HotItemsAnalysis\\src\\main\\resources\\UserBehavior.csv");

        SingleOutputStreamOperator<UserBehavior> dataStream = inputStream.map(line -> {
                    String[] fields = line.split(",");
                    return new UserBehavior(new Long(fields[0]), new Long(fields[1]), new Integer(fields[2]), fields[3], new Long(fields[4]));
                })
                .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserBehavior>() {
                    @Override
                    public long extractAscendingTimestamp(UserBehavior element) {
                        return element.getTimestamp() * 1000L;
                    }
                });

        SingleOutputStreamOperator<PageViewCount> result = dataStream
                .filter(data -> "pv".equals(data.getBehavior()))
                .timeWindowAll(Time.hours(1))
                .apply(new UvCountResult());

        result.print();

        env.execute();

    }

    // A custom all-window function
    public static class UvCountResult implements AllWindowFunction<UserBehavior, PageViewCount, TimeWindow>{
        @Override
        public void apply(TimeWindow window, Iterable<UserBehavior> values, Collector<PageViewCount> out) throws Exception {
            // A HashSet stores every userId in the window; duplicates are dropped automatically
            HashSet<Long> uidSet = new HashSet<>();
            for (UserBehavior value : values) {
                uidSet.add(value.getUserId());
            }
            out.collect(new PageViewCount("uv",window.getEnd(),(long) uidSet.size()));
        }
    }
}

Using a Bloom filter? This part was unclear to me.
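The core idea: replace the per-window HashSet with a fixed-size probabilistic structure. A minimal sketch of just the deduplication step, assuming Guava is on the classpath (the count becomes approximate, since a false positive skips a genuinely new user):

import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;

// Approximate distinct-count of userIds with a Bloom filter
public class BloomUvSketch {
    public static void main(String[] args) {
        BloomFilter<Long> seen = BloomFilter.create(
                Funnels.longFunnel(), 1_000_000, 0.01); // expected insertions, false-positive rate
        long uv = 0;
        long[] userIds = {1L, 2L, 2L, 3L, 1L};
        for (long id : userIds) {
            if (!seen.mightContain(id)) { // definitely not seen before
                seen.put(id);
                uv++;
            }
        }
        System.out.println("approx UV = " + uv); // prints 3
    }
}

In a Flink job this logic would typically sit in a process function with a trigger that fires per element (often keeping the filter bits in Redis instead of the JVM heap), so the window never has to buffer every userId; the sketch above only shows the dedup core.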

2.4 Marketing Analysis — App Promotion Statistics

  • Requirements
    • compute app-promotion metrics from the tracking logs
    • break the numbers down by promotion channel
  • Approach
    • filter the user behaviors in the log and count per channel
    • a process function could emit fully customized output; the code below uses an incremental aggregate plus a window function instead

Create the POJOs

  • MarketingUserBehavior

    private Long userId;
    private String behavior;
    private String channel;
    private Long timestamp;
    
  • ChannelPromotionCount

    private String channel;
    private String behavior;
    private String windowEnd;
    private Long count;
    

Code

import bean.ChannelPromotionCount;
import bean.MarketingUserBehavior;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.Arrays;
import java.util.List;
import java.util.Random;

/**
 * @author Kewei
 * @Date 2022/3/11 9:12
 */

public class AppMarketingByChannel2 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        // Use a custom source
        SingleOutputStreamOperator<MarketingUserBehavior> dataStream = env.addSource(new MySource())
                .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MarketingUserBehavior>() {
                    @Override
                    public long extractAscendingTimestamp(MarketingUserBehavior element) {
                        return element.getTimestamp();
                    }
                });

        // Key by (channel, behavior), then window-aggregate
        SingleOutputStreamOperator<ChannelPromotionCount> result = dataStream
                .filter(data -> !"UNINSTALL".equals(data.getBehavior()))
                .keyBy(new KeySelector<MarketingUserBehavior, Tuple2<String, String>>() {
                    @Override
                    public Tuple2<String, String> getKey(MarketingUserBehavior value) throws Exception {
                        return new Tuple2<>(value.getChannel(), value.getBehavior());
                    }
                })
                .timeWindow(Time.seconds(30), Time.seconds(10))
                .aggregate(new MyAggFun(), new WindowFun());

        result.print();

        env.execute();
    }

    // The AggregateFunction is applied per key automatically; no key handling is needed here
    public static class MyAggFun implements AggregateFunction<MarketingUserBehavior,Long,Long>{
        @Override
        public Long createAccumulator() {
            return 0L;
        }

        @Override
        public Long add(MarketingUserBehavior marketingUserBehavior, Long aLong) {
            return aLong+1;
        }

        @Override
        public Long getResult(Long aLong) {
            return aLong;
        }

        @Override
        public Long merge(Long aLong, Long acc1) {
            return acc1+aLong;
        }
    }

    public static class WindowFun implements WindowFunction<Long,ChannelPromotionCount,Tuple2<String,String>,TimeWindow>{
        @Override
        public void apply(Tuple2<String, String> tuple2, TimeWindow window, Iterable<Long> input, Collector<ChannelPromotionCount> out) throws Exception {
            out.collect(new ChannelPromotionCount(tuple2.f0,tuple2.f1,String.valueOf(window.getEnd()),input.iterator().next()));
        }
    }

    // Custom source emitting random MarketingUserBehavior events
    public static class MySource implements SourceFunction<MarketingUserBehavior>{
        Boolean running = true;

        // Candidate behaviors and channels
        List<String> behaviorList = Arrays.asList("CLICK", "DOWNLOAD", "INSTALL", "UNINSTALL");
        List<String> channelList = Arrays.asList("app store", "wechat", "weibo");

        Random  random= new Random();

        @Override
        public void run(SourceContext<MarketingUserBehavior> ctx) throws Exception {
            while (running){
                // Randomize every field
                long id = random.nextLong();
                String behavior = behaviorList.get(random.nextInt(behaviorList.size()));
                String channel = channelList.get(random.nextInt(channelList.size()));
                long timestamp = System.currentTimeMillis();

                // Emit the record
                ctx.collect(new MarketingUserBehavior(id,behavior,channel,timestamp));

                Thread.sleep(100L);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}

Code — totals across all channels

import bean.ChannelPromotionCount;
import bean.MarketingUserBehavior;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.Arrays;
import java.util.List;
import java.util.Random;

/**
 * @author Kewei
 * @Date 2022/3/11 9:12
 */

public class AppMarketing {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        // Use a custom source
        SingleOutputStreamOperator<MarketingUserBehavior> dataStream = env.addSource(new MySource())
                .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MarketingUserBehavior>() {
                    @Override
                    public long extractAscendingTimestamp(MarketingUserBehavior element) {
                        return element.getTimestamp();
                    }
                });

        // Same as before, except every record now maps to the same constant key
        SingleOutputStreamOperator<ChannelPromotionCount> result = dataStream
                .filter(data -> !"UNINSTALL".equals(data.getBehavior()))
                .keyBy(new KeySelector<MarketingUserBehavior, Tuple2<String, String>>() {
                    @Override
                    public Tuple2<String, String> getKey(MarketingUserBehavior value) throws Exception {
                        return new Tuple2<>("total", "total");
                    }
                })
                .timeWindow(Time.seconds(30), Time.seconds(10))
                .aggregate(new MyAggFun(), new WindowFun());

        result.print();

        env.execute();
    }

    // The AggregateFunction is applied per key automatically; no key handling is needed here
    public static class MyAggFun implements AggregateFunction<MarketingUserBehavior,Long,Long>{
        @Override
        public Long createAccumulator() {
            return 0L;
        }

        @Override
        public Long add(MarketingUserBehavior marketingUserBehavior, Long aLong) {
            return aLong+1;
        }

        @Override
        public Long getResult(Long aLong) {
            return aLong;
        }

        @Override
        public Long merge(Long aLong, Long acc1) {
            return acc1+aLong;
        }
    }

    public static class WindowFun implements WindowFunction<Long,ChannelPromotionCount,Tuple2<String,String>,TimeWindow>{
        @Override
        public void apply(Tuple2<String, String> tuple2, TimeWindow window, Iterable<Long> input, Collector<ChannelPromotionCount> out) throws Exception {
            out.collect(new ChannelPromotionCount(tuple2.f0,tuple2.f1,String.valueOf(window.getEnd()),input.iterator().next()));
        }
    }

    // Custom source implementing SourceFunction
    public static class MySource implements SourceFunction<MarketingUserBehavior>{
        Boolean running = true;

        // Candidate behaviors and channels
        List<String> behaviorList = Arrays.asList("CLICK", "DOWNLOAD", "INSTALL", "UNINSTALL");
        List<String> channelList = Arrays.asList("app store", "wechat", "weibo");

        Random  random= new Random();

        @Override
        public void run(SourceContext<MarketingUserBehavior> ctx) throws Exception {
            while (running){
                // Randomize every field
                long id = random.nextLong();
                String behavior = behaviorList.get(random.nextInt(behaviorList.size()));
                String channel = channelList.get(random.nextInt(channelList.size()));
                long timestamp = System.currentTimeMillis();

                // Emit the record
                ctx.collect(new MarketingUserBehavior(id,behavior,channel,timestamp));

                Thread.sleep(100L);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}

2.5 Marketing Analysis — Page Ad Statistics

  • Requirements
    • from the tracking logs, count page-ad clicks per hour, refreshed every 5 seconds and broken down by province
    • filter out click-fraud-style rapid clicking and put offending users on a blacklist
  • Approach
    • key by province and count over a sliding window (size 1 hour, slide 5 seconds)
    • use a process function for blacklist filtering: track each user's clicks per ad and, once the limit is exceeded, emit the user's details to a blacklist side output

Create the POJOs

  • AdClickEvent

    private Long userId;
    private Long adId;
    private String province;
    private String city;
    private Long timestamp;
    
  • BlackListUserWarning

    private Long userId;
    private Long adId;
    private String warningMsg;
    

Ad click counting — code

import bean.AdClickEvent;
import bean.AdCountViewByProvince;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

/**
 * @author Kewei
 * @Date 2022/3/11 10:21
 */

public class AdStatisticsByProvince {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        DataStreamSource<String> inputStream = env.readTextFile("D:\\IdeaProjects\\UserBehaviorAnalysis\\AdClickEvent\\AdClickLog.csv");

        SingleOutputStreamOperator<AdClickEvent> dataStream = inputStream.map(line -> {
                    String[] field = line.split(",");
                    return new AdClickEvent(new Long(field[0]), new Long(field[1]), field[2], field[3], new Long(field[4]));
                })
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<AdClickEvent>(Time.milliseconds(200)) {
                    @Override
                    public long extractTimestamp(AdClickEvent element) {
                        return element.getTimestamp() * 1000L;
                    }
                });

        // Count ad clicks per province over a sliding window (shorter window/slide than the stated requirement, presumably for testing)
        SingleOutputStreamOperator<AdCountViewByProvince> result = dataStream
                .keyBy(AdClickEvent::getProvince)
                .timeWindow(Time.minutes(10), Time.minutes(2))
                .aggregate(new MyAgg(), new WindowFun());

        result.print();

        env.execute();

    }

    public static class MyAgg implements AggregateFunction<AdClickEvent,Long,Long>{
        @Override
        public Long createAccumulator() {
            return 0L;
        }

        @Override
        public Long add(AdClickEvent adClickEvent, Long aLong) {
            return aLong+1;
        }

        @Override
        public Long getResult(Long aLong) {
            return aLong;
        }

        @Override
        public Long merge(Long aLong, Long acc1) {
            return acc1+aLong;
        }
    }

    public static class WindowFun implements WindowFunction<Long, AdCountViewByProvince,String, TimeWindow>{
        @Override
        public void apply(String s, TimeWindow window, Iterable<Long> input, Collector<AdCountViewByProvince> out) throws Exception {
            out.collect(new AdCountViewByProvince(s,String.valueOf(window.getEnd()), input.iterator().next()));
        }
    }
}

Blacklist filtering of abnormal click behavior

import bean.AdClickEvent;
import bean.AdCountViewByProvince;
import bean.BlackListUserWarning;
import org.apache.commons.lang3.time.DateUtils;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.util.Date;

/**
 * @author Kewei
 * @Date 2022/3/11 10:21
 */

public class AdStatisticsByProvinceAndWar {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        DataStreamSource<String> inputStream = env.readTextFile("D:\\IdeaProjects\\UserBehaviorAnalysis\\AdClickEvent\\AdClickLog.csv");

        SingleOutputStreamOperator<AdClickEvent> dataStream = inputStream.map(line -> {
                    String[] field = line.split(",");
                    return new AdClickEvent(new Long(field[0]), new Long(field[1]), field[2], field[3], new Long(field[4]));
                })
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<AdClickEvent>(Time.milliseconds(200)) {
                    @Override
                    public long extractTimestamp(AdClickEvent element) {
                        return element.getTimestamp() * 1000L;
                    }
                });

        // Filter out any user's clicks on an ad beyond the first 100, flagging the user via side output
        SingleOutputStreamOperator<AdClickEvent> filterStream = dataStream
                .keyBy(new KeySelector<AdClickEvent, Tuple2<Long, Long>>() {
                    @Override
                    public Tuple2<Long, Long> getKey(AdClickEvent value) throws Exception {
                        return new Tuple2<>(value.getUserId(), value.getAdId());
                    }
                })
                .process(new MyProcess(100));

        SingleOutputStreamOperator<AdCountViewByProvince> result = filterStream
                .keyBy(AdClickEvent::getProvince)
                .timeWindow(Time.hours(1), Time.minutes(5))
                .aggregate(new MyAgg(), new WindowFun());

        result.print();
        // Print the blacklist side output; the OutputTag id must match the one used inside the process function, otherwise nothing comes out
        filterStream.getSideOutput(new OutputTag<BlackListUserWarning>("warning"){})
                        .print("blacklist-user");

        env.execute();

    }

    public static class MyProcess extends KeyedProcessFunction<Tuple2<Long,Long>,AdClickEvent,AdClickEvent>{
        // Upper bound on clicks per (user, ad) pair
        private Integer countUpperBound;

        public MyProcess(Integer countUpperBound){
            this.countUpperBound = countUpperBound;
        }

        // Click-count state
        ValueState<Long> countState;

        // Whether a blacklist warning has already been sent for this key
        ValueState<Boolean> isSentState;

        @Override
        public void onTimer(long timestamp, KeyedProcessFunction<Tuple2<Long, Long>, AdClickEvent, AdClickEvent>.OnTimerContext ctx, Collector<AdClickEvent> out) throws Exception {
            // When the timer fires (a day later), reset the state
            countState.clear();
            isSentState.clear();
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            // Initialize the state handles
            countState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("count",Long.class));
            isSentState = getRuntimeContext().getState(new ValueStateDescriptor<Boolean>("isWri",Boolean.class));
        }

        @Override
        public void processElement(AdClickEvent value, KeyedProcessFunction<Tuple2<Long, Long>, AdClickEvent, AdClickEvent>.Context ctx, Collector<AdClickEvent> out) throws Exception {
            // Read the current state
            Long curCount = countState.value();
            Boolean isSent = isSentState.value();

            // Default null state to initial values
            if (null == curCount){
                curCount = 0L;
            }

            if (null == isSent){
                isSent = false;
            }

            // On the first click, register a timer one day later to clear the state (note it mixes a processing-time timestamp with an event-time timer)
            if (curCount == 0){
                long ts = ctx.timerService().currentProcessingTime();
                long fixedtime = DateUtils.addDays(new Date(ts), 1).getTime();
                ctx.timerService().registerEventTimeTimer(fixedtime);
            }

            // Has the click count reached the bound?
            if (curCount >= countUpperBound){

                // Not blacklisted yet: mark it and send a warning to the side output via ctx.output
                if (!isSent){
                    isSentState.update(true);
                    ctx.output(new OutputTag<BlackListUserWarning>("warning"){},new BlackListUserWarning(value.getUserId(), value.getAdId(), "click over " + countUpperBound + "times."));
                }
                // Swallow the event; nothing is forwarded downstream
                return;
            }

            // Below the bound: bump the count and forward the event unchanged
            countState.update(curCount+1);
            out.collect(value);
        }
    }

    public static class MyAgg implements AggregateFunction<AdClickEvent,Long,Long>{
        @Override
        public Long createAccumulator() {
            return 0L;
        }

        @Override
        public Long add(AdClickEvent adClickEvent, Long aLong) {
            return aLong+1;
        }

        @Override
        public Long getResult(Long aLong) {
            return aLong;
        }

        @Override
        public Long merge(Long aLong, Long acc1) {
            return acc1+aLong;
        }
    }

    public static class WindowFun implements WindowFunction<Long, AdCountViewByProvince,String, TimeWindow>{
        @Override
        public void apply(String s, TimeWindow window, Iterable<Long> input, Collector<AdCountViewByProvince> out) throws Exception {
            out.collect(new AdCountViewByProvince(s,String.valueOf(window.getEnd()), input.iterator().next()));
        }
    }

}

2.6 Malicious Login Monitoring

  • Requirements
    • frequent login failures within a short time suggest a malicious attack
    • two consecutive login failures by the same user (possibly from different IPs) within two seconds should raise an alert
  • Approach
    • store login-failure events in ListState and register a timer to fire 2 seconds later; when it fires, check how many failures the list holds
    • for more precise detection, the CEP library can pattern-match on the event stream (see the CEP code at the end of this section)

Create the POJOs

  • LoginEvent

    private Long userId;
    private String ip;
    private String loginState;
    private Long timestamp;
    
  • LoginFailWarning

    private Long userId;
    private Long firstFailTime;
    private Long lastFailTime;
    private String warningMsg;
    

Code

import bean.LoginEvent;
import bean.LoginFailWarning;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.util.ArrayList;

/**
 * @author Kewei
 * @Date 2022/3/11 11:15
 */

public class LoginFail2 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        DataStreamSource<String> inputStream = env.readTextFile("D:\\IdeaProjects\\UserBehaviorAnalysis\\LoginFailAnalysis\\LoginLog.csv");

        SingleOutputStreamOperator<LoginEvent> dataStream = inputStream.map(line -> {
                    String[] field = line.split(",");
                    return new LoginEvent(new Long(field[0]), field[1], field[2], new Long(field[3]));
                })
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<LoginEvent>(Time.seconds(2)) {
                    @Override
                    public long extractTimestamp(LoginEvent element) {
                        return element.getTimestamp() * 1000L;
                    }
                });

        // Key by userId and decide whether a login warning is needed
        SingleOutputStreamOperator<LoginFailWarning> result = dataStream
                .keyBy(LoginEvent::getUserId)
                .process(new MyLogin());

        result.print();

        env.execute();
    }

    public static class MyLogin extends KeyedProcessFunction<Long,LoginEvent,LoginFailWarning>{
        // List state holding the login failures seen within the 2-second span
        ListState<LoginEvent> failList;

        // Value state holding the registered timer's timestamp;
        // it is needed when the timer has to be deleted later
        ValueState<Long> timeState;

        @Override
        public void onTimer(long timestamp, KeyedProcessFunction<Long, LoginEvent, LoginFailWarning>.OnTimerContext ctx, Collector<LoginFailWarning> out) throws Exception {
            // Timer fired: no successful login within 2 s, so check whether there were at least 2 failures
            ArrayList<LoginEvent> loginFails = Lists.newArrayList(failList.get().iterator());
            int failtimes = loginFails.size();

            if (failtimes >= 2){
                // At least 2 failures within the window: emit a warning
                out.collect(new LoginFailWarning(ctx.getCurrentKey()
                        ,loginFails.get(0).getTimestamp()
                        ,loginFails.get(failtimes-1).getTimestamp()
                        ,"fail times"+failtimes));
            }

            failList.clear();
            timeState.clear();
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            failList = getRuntimeContext().getListState(new ListStateDescriptor<LoginEvent>("fail list",LoginEvent.class));
            timeState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("time",Long.class));
        }

        @Override
        public void processElement(LoginEvent value, KeyedProcessFunction<Long, LoginEvent, LoginFailWarning>.Context ctx, Collector<LoginFailWarning> out) throws Exception {
            // On a failed login, append the event to failList
            if ("fail".equals(value.getLoginState())){
                failList.add(value);
                if (null == timeState.value()){
                    // First failure: register a timer 2 s later and remember its timestamp
                    long ts = value.getTimestamp() * 1000 + 2000L;
                    timeState.update(ts);
                    ctx.timerService().registerEventTimeTimer(ts);
                }
            }else {
                // On a successful login, delete the pending timer if one was registered,
                // then clear all state
                if (timeState.value() !=null){
                    ctx.timerService().deleteEventTimeTimer(timeState.value());
                }
                timeState.clear();
                failList.clear();
            }
        }
    }

    // My own attempt: when several state values have to be kept, a list state is another option
    public static class LoginFailProcess extends KeyedProcessFunction<Long,LoginEvent,LoginEvent>{

        ValueState<Long> curCount;
        ValueState<Boolean> isFail;
        ValueState<Long> failTime;

        @Override
        public void onTimer(long timestamp, KeyedProcessFunction<Long, LoginEvent, LoginEvent>.OnTimerContext ctx, Collector<LoginEvent> out) throws Exception {
            Long failCount = curCount.value();
            ctx.output(new OutputTag<LoginFailWarning>("log"){},new LoginFailWarning(ctx.getCurrentKey(),"login failures: "+failCount));
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            failTime = getRuntimeContext().getState(new ValueStateDescriptor<Long>("fail time",Long.class));
            isFail = getRuntimeContext().getState(new ValueStateDescriptor<Boolean>("is fail",Boolean.class));
            curCount = getRuntimeContext().getState(new ValueStateDescriptor<Long>("count",Long.class));
        }

        @Override
        public void processElement(LoginEvent value, KeyedProcessFunction<Long, LoginEvent, LoginEvent>.Context ctx, Collector<LoginEvent> out) throws Exception {
            Long cuCnt = curCount.value();
            Boolean isfail = isFail.value();

            if (null == cuCnt){
                cuCnt = 0L;
            }

            if (null == isfail){
                isfail = false;
            }

            if ("fail".equals(value.getLoginState())){
                curCount.update(cuCnt+1);

                if (!isfail){
                    isFail.update(true);
                    long ts = value.getTimestamp() * 1000 + 2000L;
                    failTime.update(ts);
                    ctx.timerService().registerEventTimeTimer(ts);
                }
                return;
            } else if ("success".equals(value.getLoginState())){
                if (isfail){
                    Long failtime = failTime.value();
                    ctx.timerService().deleteEventTimeTimer(failtime);
                    curCount.clear();
                    isFail.clear();
                    failTime.clear();
                }
            }
            out.collect(value);
        }
    }

}

Given the requirement (2 failures within 2 seconds), the flow can be simplified by dropping the timer and letting each incoming event drive the check:

// A custom KeyedProcessFunction
  public static class LoginFailDetectWarning extends KeyedProcessFunction<Long, LoginEvent, LoginFailWarning> {
    // Maximum number of consecutive login failures
    private Integer maxFailTimes;

    public LoginFailDetectWarning(Integer maxFailTimes) {
      this.maxFailTimes = maxFailTimes;
    }

    // State: all login-failure events within the last 2 seconds
    ListState<LoginEvent> loginFailEventListState;

    @Override
    public void open(Configuration parameters) throws Exception {
      loginFailEventListState = getRuntimeContext().getListState(new ListStateDescriptor<LoginEvent>("login-fail-list", LoginEvent.class));
    }

    // Incoming events themselves trigger the alert check; no timer is registered
    @Override
    public void processElement(LoginEvent value, Context ctx, Collector<LoginFailWarning> out) throws Exception {
      // Check the login state of the current event
      if( "fail".equals(value.getLoginState()) ){
        // 1. On failure, look up whether an earlier failure is already stored
        Iterator<LoginEvent> iterator = loginFailEventListState.get().iterator();
        if( iterator.hasNext() ){
          // 1.1 A previous failure exists: check whether this one is within 2 seconds of it
          // Fetch the stored failure event
          LoginEvent firstFailEvent = iterator.next();
          if( value.getTimestamp() - firstFailEvent.getTimestamp() <= 2 ){
            // 1.1.1 Within 2 seconds: emit a warning
            out.collect( new LoginFailWarning(value.getUserId(), firstFailEvent.getTimestamp(), value.getTimestamp(), "login fail 2 times in 2s") );
          }

          // Warned or not, this event has been handled: replace the stored failure with the current one
          loginFailEventListState.clear();
          loginFailEventListState.add(value);
        } else {
          // 1.2 No previous failure: store the current event
          loginFailEventListState.add(value);
        }
      } else {
        // 2. On success, clear the state
        loginFailEventListState.clear();
      }
    }
  }

CEP code

The same detection, done with CEP.

Add the CEP dependency:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-cep_${scala.binary.version}</artifactId>
  <version>${flink.version}</version>
</dependency>
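
The ${scala.binary.version} and ${flink.version} placeholders resolve from Maven properties. A hedged example, assuming a Flink 1.10.x / Scala 2.12 build (adjust to whatever versions the project actually uses):

<properties>
  <flink.version>1.10.1</flink.version>
  <scala.binary.version>2.12</scala.binary.version>
</properties>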

Code

import bean.LoginEvent;
import bean.LoginFailWarning;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.util.List;
import java.util.Map;

/**
 * @author Kewei
 * @Date 2022/3/11 13:28
 */

public class LoginFailWithCep {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        DataStreamSource<String> inputStream = env.readTextFile("D:\\IdeaProjects\\UserBehaviorAnalysis\\LoginFailAnalysis\\LoginLog.csv");

        SingleOutputStreamOperator<LoginEvent> dataStream = inputStream.map(line -> {
                    String[] fields = line.split(",");
                    return new LoginEvent(new Long(fields[0]), fields[1], fields[2], new Long(fields[3]));
                })
                .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<LoginEvent>() {
                    @Override
                    public long extractAscendingTimestamp(LoginEvent element) {
                        return element.getTimestamp() * 1000L;
                    }
                });

        // 1. Define a pattern that matches two consecutive login failures within 2 seconds
        Pattern<LoginEvent, LoginEvent> loginFailPattern = Pattern.<LoginEvent>begin("firstFail").where(new SimpleCondition<LoginEvent>() {
                    @Override
                    public boolean filter(LoginEvent loginEvent) throws Exception {
                        return "fail".equals(loginEvent.getLoginState());
                    }
                })
                .next("secondFail").where(new SimpleCondition<LoginEvent>() {
                    @Override
                    public boolean filter(LoginEvent loginEvent) throws Exception {
                        return "fail".equals(loginEvent.getLoginState());
                    }
                })
                .within(Time.seconds(2));

        // 2. Apply the pattern to the keyed stream to get a PatternStream
        PatternStream<LoginEvent> patternStream = CEP.pattern(dataStream.keyBy(LoginEvent::getUserId), loginFailPattern);

        // 3. Select the matched complex events and transform them into warning records
        SingleOutputStreamOperator<LoginFailWarning> result = patternStream.select(new LoginFailMatch());

        result.print();

        env.execute();

    }

    // Implement the PatternSelectFunction interface to build the warning output
    public static class LoginFailMatch implements PatternSelectFunction<LoginEvent, LoginFailWarning> {
        @Override
        public LoginFailWarning select(Map<String, List<LoginEvent>> map) throws Exception {
            LoginEvent firstFail = map.get("firstFail").iterator().next();
            LoginEvent secondFail = map.get("secondFail").get(0);
            return new LoginFailWarning(firstFail.getUserId(),firstFail.getTimestamp(), secondFail.getTimestamp(), "login fail 2 times");
        }
    }
}

This version can only report the first and second failure events as a pair (and next enforces strict contiguity, so the two failures must be adjacent events for the same user). Below, the code is reworked with a CEP looping pattern so that a whole run of failures can be matched.

CEP looping pattern

import bean.LoginEvent;
import bean.LoginFailWarning;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.util.List;
import java.util.Map;

/**
 * @author Kewei
 * @Date 2022/3/11 13:28
 */

public class LoginFailWithCep2 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        DataStreamSource<String> inputStream = env.readTextFile("D:\\IdeaProjects\\UserBehaviorAnalysis\\LoginFailAnalysis\\LoginLog.csv");

        SingleOutputStreamOperator<LoginEvent> dataStream = inputStream.map(line -> {
                    String[] fields = line.split(",");
                    return new LoginEvent(new Long(fields[0]), fields[1], fields[2], new Long(fields[3]));
                })
                .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<LoginEvent>() {
                    @Override
                    public long extractAscendingTimestamp(LoginEvent element) {
                        return element.getTimestamp() * 1000L;
                    }
                });

        // Define a pattern: three consecutive login failures within 5 seconds
        Pattern<LoginEvent, LoginEvent> loginFailPattern = Pattern.<LoginEvent>begin("failEvents").where(new SimpleCondition<LoginEvent>() {
                    @Override
                    public boolean filter(LoginEvent loginEvent) throws Exception {
                        return "fail".equals(loginEvent.getLoginState());
                    }
                })
                .times(3).consecutive()
                .within(Time.seconds(5));

        PatternStream<LoginEvent> patternStream = CEP.pattern(dataStream.keyBy(LoginEvent::getUserId), loginFailPattern);

        SingleOutputStreamOperator<LoginFailWarning> result = patternStream.select(new LoginFailMatch());

        result.print();

        env.execute();

    }

    public static class LoginFailMatch implements PatternSelectFunction<LoginEvent, LoginFailWarning> {
        @Override
        public LoginFailWarning select(Map<String, List<LoginEvent>> map) throws Exception {
            // With a looping pattern, all matched events are collected under the one pattern name
            List<LoginEvent> failEvents = map.get("failEvents");
            LoginEvent firstFail = failEvents.get(0);
            LoginEvent lastFail = failEvents.get(failEvents.size() - 1);
            return new LoginFailWarning(firstFail.getUserId(), firstFail.getTimestamp(), lastFail.getTimestamp(), "login fail " + failEvents.size() + " times");
        }
    }
}
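
For reference, times also accepts a range. A hedged sketch (not part of the original code) of a pattern that would match any run of 2 to 5 consecutive failures within 5 seconds; the LoginFailMatch function above already reads the size of map.get("failEvents"), so it would work unchanged:

Pattern<LoginEvent, LoginEvent> rangePattern = Pattern.<LoginEvent>begin("failEvents")
        .where(new SimpleCondition<LoginEvent>() {
            @Override
            public boolean filter(LoginEvent loginEvent) throws Exception {
                return "fail".equals(loginEvent.getLoginState());
            }
        })
        .times(2, 5).consecutive()   // between 2 and 5 consecutive failures
        .within(Time.seconds(5));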

2.7 Real-Time Order Payment Monitoring

  • Basic requirements
    • After a user places an order, an order-expiration event should be scheduled, both to nudge the user toward paying and to reduce system risk
    • If an order is still unpaid 15 minutes after creation, output a monitoring alert
  • Solution approach
    • Use the CEP library to pattern-match on the event stream, with a time constraint on the match
    • Alternatively, use stateful programming and implement the logic with a process function

Create POJOs (a fully written-out example follows the field lists)

  • OrderEvent

    private Long orderId;
    private String eventType;
    private String txId;
    private Long timestamp;
    
  • OrderResult

    private Long orderId;
    private String resultState;
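
For reference, a minimal sketch of one of these POJOs written out in full: OrderEvent, with the no-arg/all-args constructors, getters/setters, and toString assumed throughout the project (the other POJOs follow the same shape):

public class OrderEvent {
    private Long orderId;
    private String eventType;   // "create" or "pay"
    private String txId;        // payment transaction id; empty for create events
    private Long timestamp;     // event time in seconds

    public OrderEvent() {
    }

    public OrderEvent(Long orderId, String eventType, String txId, Long timestamp) {
        this.orderId = orderId;
        this.eventType = eventType;
        this.txId = txId;
        this.timestamp = timestamp;
    }

    public Long getOrderId() { return orderId; }
    public void setOrderId(Long orderId) { this.orderId = orderId; }
    public String getEventType() { return eventType; }
    public void setEventType(String eventType) { this.eventType = eventType; }
    public String getTxId() { return txId; }
    public void setTxId(String txId) { this.txId = txId; }
    public Long getTimestamp() { return timestamp; }
    public void setTimestamp(Long timestamp) { this.timestamp = timestamp; }

    @Override
    public String toString() {
        return "OrderEvent{orderId=" + orderId + ", eventType='" + eventType
                + "', txId='" + txId + "', timestamp=" + timestamp + "}";
    }
}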
    

CEP implementation

import bean.OrderEvent;
import bean.OrderResult;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.PatternTimeoutFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

import java.util.List;
import java.util.Map;

/**
 * @author Kewei
 * @Date 2022/3/11 15:18
 */

public class OrderPayTimeout {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        DataStreamSource<String> inputStream = env.readTextFile("D:\\IdeaProjects\\UserBehaviorAnalysis\\OrderPayAnalysis\\OrderLog.csv");

        SingleOutputStreamOperator<OrderEvent> dataStream = inputStream.map(line -> {
                    String[] field = line.split(",");
                    return new OrderEvent(new Long(field[0]), field[1], field[2], new Long(field[3]));
                })
                .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<OrderEvent>() {
                    @Override
                    public long extractAscendingTimestamp(OrderEvent element) {
                        return element.getTimestamp() * 1000L;
                    }
                });

        // Create the match rule: a pattern with a time constraint
        Pattern<OrderEvent, OrderEvent> orderPayPattern = Pattern.<OrderEvent>begin("create")
                .where(new SimpleCondition<OrderEvent>() {
                    @Override
                    public boolean filter(OrderEvent orderEvent) throws Exception {
                        return "create".equals(orderEvent.getEventType());
                    }
                })
                .followedBy("pay").where(new SimpleCondition<OrderEvent>() {
                    @Override
                    public boolean filter(OrderEvent orderEvent) throws Exception {
                        return "pay".equals(orderEvent.getEventType());
                    }
                })
                .within(Time.minutes(5));   // the requirement says 15 minutes; a shorter window appears to be used here for testing

        // Define a side output for timeout events
        OutputTag<OrderResult> outputTag = new OutputTag<OrderResult>("order-timeout"){};

        // Apply the pattern to the input stream to get a PatternStream
        PatternStream<OrderEvent> patternStream = CEP.pattern(dataStream.keyBy(OrderEvent::getOrderId), orderPayPattern);

        // Call select to extract and handle both matched and timed-out complex events
        /**
         * This select overload takes three parameters:
         * 1. the side-output tag that receives timed-out partial matches
         * 2. the function that handles timed-out partial matches
         * 3. the function that handles normally matched events
         */
        SingleOutputStreamOperator<OrderResult> result = patternStream.select(outputTag, new OrderTimeoutSelect(), new OrderPaySelect());

        result.print();
        result.getSideOutput(outputTag).print("timeout");

        env.execute();

    }

    // Custom handler for timed-out partial matches
    public static class OrderTimeoutSelect implements PatternTimeoutFunction<OrderEvent,OrderResult>{
        @Override
        public OrderResult timeout(Map<String, List<OrderEvent>> map, long timeoutTimestamp) throws Exception {
            Long timeoutOrderId = map.get("create").iterator().next().getOrderId();
            return new OrderResult(timeoutOrderId, "timeout " + timeoutTimestamp);
        }
    }

    // Custom handler for normally matched events
    public static class OrderPaySelect implements PatternSelectFunction<OrderEvent,OrderResult>{
        @Override
        public OrderResult select(Map<String, List<OrderEvent>> map) throws Exception {
            return new OrderResult(map.get("pay").iterator().next().getOrderId(), "paid");
        }
    }
}

ProcessFunction implementation

CEP is more concise, but ProcessFunction offers finer-grained control over the details.

CEP remains the better fit for scenarios where the events have complex relationships to each other.

ProcessFunction handles each event independently, correlating events through state, and is therefore more flexible.

Code

import bean.OrderEvent;
import bean.OrderResult;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

/**
 * @author Kewei
 * @Date 2022/3/11 15:18
 */

public class OrderPayTimeoutWithProcess {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        DataStreamSource<String> inputStream = env.readTextFile("D:\\IdeaProjects\\UserBehaviorAnalysis\\OrderPayAnalysis\\OrderLog.csv");

        SingleOutputStreamOperator<OrderEvent> dataStream = inputStream.map(line -> {
                    String[] field = line.split(",");
                    return new OrderEvent(new Long(field[0]), field[1], field[2], new Long(field[3]));
                })
                .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<OrderEvent>() {
                    @Override
                    public long extractAscendingTimestamp(OrderEvent element) {
                        return element.getTimestamp() * 1000L;
                    }
                });

        // Apply the custom process function: the main stream carries normally matched orders, the side output carries timeout alerts
        SingleOutputStreamOperator<OrderResult> result = dataStream
                .keyBy(OrderEvent::getOrderId)
                .process(new OrderTimeoutProcess());

        result.print("pay normally");

        // Create the side-output tag; it must be an anonymous subclass (the trailing {}) so the generic type information survives erasure
        result.getSideOutput(new OutputTag<OrderResult>("timeout"){}).print("timeout");

        env.execute();

    }

    public static class OrderTimeoutProcess extends KeyedProcessFunction<Long, OrderEvent, OrderResult> {
        // State: whether the create / pay event for this order has already arrived
        ValueState<Boolean> isCreateState;
        ValueState<Boolean> isPayState;

        // State: the timestamp of the registered timer
        ValueState<Long> timerTsState;

        @Override
        public void onTimer(long timestamp, KeyedProcessFunction<Long, OrderEvent, OrderResult>.OnTimerContext ctx, Collector<OrderResult> out) throws Exception {
            // If the timer fires, one of the two events never arrived
            if (isPayState.value()){
                // pay arrived, so create is missing
                ctx.output(new OutputTag<OrderResult>("timeout"){}, new OrderResult(ctx.getCurrentKey(), "paid but no create log found"));
            }else {
                // create arrived, so pay is missing
                ctx.output(new OutputTag<OrderResult>("timeout"){}, new OrderResult(ctx.getCurrentKey(), "timeout"));
            }

            isPayState.clear();
            isCreateState.clear();
            timerTsState.clear();
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            // The third argument supplies a default value; without it the state defaults to null
            isCreateState = getRuntimeContext().getState(new ValueStateDescriptor<Boolean>("is create", Boolean.class, false));
            isPayState = getRuntimeContext().getState(new ValueStateDescriptor<Boolean>("is pay", Boolean.class, false));
            timerTsState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("timer-ts", Long.class));
        }

        @Override
        public void processElement(OrderEvent value, KeyedProcessFunction<Long, OrderEvent, OrderResult>.Context ctx, Collector<OrderResult> out) throws Exception {
            // Read the three state values
            Boolean isCreate = isCreateState.value();
            Boolean isPay = isPayState.value();
            Long timerTs = timerTsState.value();

            String eventType = value.getEventType();

            // Is the current event a create?
            if ("create".equals(eventType)){
                // If pay has already arrived (out-of-order data), output the match, clear state, delete the timer
                if (isPay){
                    out.collect(new OrderResult(value.getOrderId(), "paid successfully"));

                    isCreateState.clear();
                    isPayState.clear();
                    timerTsState.clear();
                    ctx.timerService().deleteEventTimeTimer(timerTs);
                }else {
                    // create without pay: register a timer 15 minutes later and update state
                    long ts = (value.getTimestamp() + 15 * 60) * 1000L;
                    ctx.timerService().registerEventTimeTimer(ts);
                    timerTsState.update(ts);
                    isCreateState.update(true);
                }
            }else if ("pay".equals(eventType)){
                if (isCreate){
                    // pay after create: if it is within the deadline output normally,
                    // otherwise send it to the side output
                    if (value.getTimestamp() * 1000L < timerTs){
                        out.collect(new OrderResult(value.getOrderId(), "paid successfully"));
                    }else {
                        ctx.output(new OutputTag<OrderResult>("timeout"){}, new OrderResult(value.getOrderId(), "paid but already timed out"));
                    }
                    // Finally clear the state and delete the timer
                    isCreateState.clear();
                    isPayState.clear();
                    timerTsState.clear();
                    ctx.timerService().deleteEventTimeTimer(timerTs);
                }else {
                    // pay without create: out-of-order data, register a timer at the pay
                    // timestamp to wait (until the watermark passes it) for the create event
                    ctx.timerService().registerEventTimeTimer(value.getTimestamp() * 1000L);
                    timerTsState.update(value.getTimestamp() * 1000L);
                    isPayState.update(true);
                }
            }
        }
    }

}

2.8 Real-Time Order Payment Reconciliation

  • Basic requirements
    • After a user places an order and pays, the receipt (funds-arrival) record should be looked up for real-time reconciliation
    • If a payment or a receipt has no counterpart, output an alert
  • Solution approach
    • Read order payment events and receipt events from two separate streams and process them together
    • Merge the two streams with connect and do the matching in a CoProcessFunction

Create POJO

  • ReceiptEvent

    private String txId;
    private String payChannel;
    private Long timestamp;
    

Code

import bean.OrderEvent;
import bean.ReceiptEvent;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

/**
 * @author Kewei
 * @Date 2022/3/11 16:22
 */

public class TxPayMatch {

    private final static OutputTag<OrderEvent> unmatchedPays = new OutputTag<OrderEvent>("unmatched-pays") {};
    private final static OutputTag<ReceiptEvent> unmatchedReceipts  = new OutputTag<ReceiptEvent>("unmatched-receipts") {};

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        DataStreamSource<String> inputStream1 = env.readTextFile("D:\\IdeaProjects\\UserBehaviorAnalysis\\OrderPayAnalysis\\OrderLog.csv");
        SingleOutputStreamOperator<OrderEvent> orderStream = inputStream1.map(line -> {
                    String[] fields = line.split(",");
                    return new OrderEvent(new Long(fields[0]), fields[1], fields[2], new Long(fields[3]));
                })
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<OrderEvent>(Time.milliseconds(200)) {
                    @Override
                    public long extractTimestamp(OrderEvent element) {
                        return element.getTimestamp() * 1000L;
                    }
                });

        DataStreamSource<String> inputStream2 = env.readTextFile("D:\\IdeaProjects\\UserBehaviorAnalysis\\OrderPayAnalysis\\ReceiptLog.csv");
        SingleOutputStreamOperator<ReceiptEvent> receiptStream = inputStream2.map(line -> {
                    String[] fields = line.split(",");
                    return new ReceiptEvent(fields[0], fields[1], new Long(fields[2]));
                })
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<ReceiptEvent>(Time.milliseconds(200)) {
                    @Override
                    public long extractTimestamp(ReceiptEvent element) {
                        return element.getTimestamp() * 1000L;
                    }
                });

        // Filter out order events with an empty transaction id (create events carry no txId), then connect the two keyed streams
        SingleOutputStreamOperator<Tuple2<OrderEvent, ReceiptEvent>> result = orderStream.filter(data -> !"".equals(data.getTxId())).keyBy(OrderEvent::getTxId)
                .connect(receiptStream.keyBy(ReceiptEvent::getTxId))
                .process(new TxPayMatchDetect());

        result.print("normal");
        result.getSideOutput(unmatchedPays).print("unmatchedPays");
        result.getSideOutput(unmatchedReceipts).print("unmatchedReceipts");

        env.execute();
    }

    // Extend CoProcessFunction to match events from the two connected streams
    public static class TxPayMatchDetect extends CoProcessFunction<OrderEvent,ReceiptEvent, Tuple2<OrderEvent,ReceiptEvent>>{
        // Two states holding the as-yet-unmatched event from each stream
        ValueState<OrderEvent> payState;
        ValueState<ReceiptEvent> receiptState;

        @Override
        public void onTimer(long timestamp, CoProcessFunction<OrderEvent, ReceiptEvent, Tuple2<OrderEvent, ReceiptEvent>>.OnTimerContext ctx, Collector<Tuple2<OrderEvent, ReceiptEvent>> out) throws Exception {
            // Check whether each state still holds a value, not whether the state object itself is null.
            // When the timer fires, either one side never arrived (no match), or both arrived and the
            // pair was already output and the states cleared. Whichever state is still non-null is the
            // side whose counterpart is missing.
            if (payState.value() != null){
                ctx.output(unmatchedPays, payState.value());
            }

            if (receiptState.value() != null){
                ctx.output(unmatchedReceipts, receiptState.value());
            }

            payState.clear();
            receiptState.clear();
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            payState = getRuntimeContext().getState(new ValueStateDescriptor<OrderEvent>("pay state",OrderEvent.class));
            receiptState = getRuntimeContext().getState(new ValueStateDescriptor<ReceiptEvent>("receipt state",ReceiptEvent.class));
        }

        @Override
        public void processElement1(OrderEvent value, CoProcessFunction<OrderEvent, ReceiptEvent, Tuple2<OrderEvent, ReceiptEvent>>.Context ctx, Collector<Tuple2<OrderEvent, ReceiptEvent>> out) throws Exception {
            // A payment event arrived: check whether the matching receipt is already here
            ReceiptEvent receiptEvent = receiptState.value();
            if (null != receiptEvent){
                // The receipt has already arrived: output the matched pair and clear the state
                out.collect(new Tuple2<>(value, receiptEvent));

                payState.clear();
                receiptState.clear();
            }else {
                // The receipt has not arrived yet: register a timer (wait 5 seconds) and update the state
                ctx.timerService().registerEventTimeTimer((value.getTimestamp() + 5) * 1000L);
                payState.update(value);
            }
        }

        @Override
        public void processElement2(ReceiptEvent value, CoProcessFunction<OrderEvent, ReceiptEvent, Tuple2<OrderEvent, ReceiptEvent>>.Context ctx, Collector<Tuple2<OrderEvent, ReceiptEvent>> out) throws Exception {
            // A receipt event arrived: check whether the matching payment is already here
            OrderEvent orderEvent = payState.value();

            if (null != orderEvent){
                // The payment has already arrived: output the matched pair and clear the state
                out.collect(new Tuple2<>(orderEvent, value));

                payState.clear();
                receiptState.clear();
            }else {
                // The payment has not arrived yet: register a timer (wait 3 seconds) and update the state
                ctx.timerService().registerEventTimeTimer((value.getTimestamp() + 3) * 1000L);
                receiptState.update(value);
            }
        }
    }
}

Implementation with an interval join

The drawback of this approach is that it only yields successfully matched pairs; unmatched payments and receipts cannot be recovered (see the coGroup sketch after the code below for one workaround).

Code

import bean.OrderEvent;
import bean.ReceiptEvent;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * @author Kewei
 * @Date 2022/3/11 16:22
 */

public class TxPayMatchByJoin {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        DataStreamSource<String> inputStream1 = env.readTextFile("D:\\IdeaProjects\\UserBehaviorAnalysis\\OrderPayAnalysis\\OrderLog.csv");
        SingleOutputStreamOperator<OrderEvent> orderStream = inputStream1.map(line -> {
                    String[] fields = line.split(",");
                    return new OrderEvent(new Long(fields[0]), fields[1], fields[2], new Long(fields[3]));
                })
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<OrderEvent>(Time.milliseconds(200)) {
                    @Override
                    public long extractTimestamp(OrderEvent element) {
                        return element.getTimestamp() * 1000L;
                    }
                });

        DataStreamSource<String> inputStream2 = env.readTextFile("D:\\IdeaProjects\\UserBehaviorAnalysis\\OrderPayAnalysis\\ReceiptLog.csv");
        SingleOutputStreamOperator<ReceiptEvent> receiptStream = inputStream2.map(line -> {
                    String[] fields = line.split(",");
                    return new ReceiptEvent(fields[0], fields[1], new Long(fields[2]));
                })
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<ReceiptEvent>(Time.milliseconds(200)) {
                    @Override
                    public long extractTimestamp(ReceiptEvent element) {
                        return element.getTimestamp() * 1000L;
                    }
                });

        // Interval-join the two streams to get the matched pairs
        SingleOutputStreamOperator<Tuple2<OrderEvent, ReceiptEvent>> result = orderStream.filter(data -> !"".equals(data.getTxId()))
                .keyBy(OrderEvent::getTxId)
                .intervalJoin(receiptStream.keyBy(ReceiptEvent::getTxId))
                .between(Time.seconds(-3), Time.seconds(5))   // match receipts whose timestamp lies in [pay - 3s, pay + 5s]
                .process(new TxPayMatchDetectByJoin());

        result.print();

        env.execute();
    }

    // Custom ProcessJoinFunction: emit each joined pair
    public static class TxPayMatchDetectByJoin extends ProcessJoinFunction<OrderEvent,ReceiptEvent,Tuple2<OrderEvent,ReceiptEvent>>{
        @Override
        public void processElement(OrderEvent left, ReceiptEvent right, ProcessJoinFunction<OrderEvent, ReceiptEvent, Tuple2<OrderEvent, ReceiptEvent>>.Context ctx, Collector<Tuple2<OrderEvent, ReceiptEvent>> out) throws Exception {
            out.collect(new Tuple2<>(left,right));
        }
    }

}
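
As noted above, the interval join only emits matched pairs. A hedged workaround sketch (not part of the original project) is a windowed coGroup: its CoGroupFunction also receives one-sided groups and can therefore flag unmatched records, with the caveat that a pay and its receipt near a window boundary may land in different windows. It assumes extra imports (java.util.Iterator, org.apache.flink.api.common.functions.CoGroupFunction, org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows), and the 10-second window size is illustrative:

orderStream.filter(data -> !"".equals(data.getTxId()))
        .coGroup(receiptStream)
        .where(OrderEvent::getTxId)
        .equalTo(ReceiptEvent::getTxId)
        .window(TumblingEventTimeWindows.of(Time.seconds(10)))
        .apply(new CoGroupFunction<OrderEvent, ReceiptEvent, String>() {
            @Override
            public void coGroup(Iterable<OrderEvent> pays, Iterable<ReceiptEvent> receipts, Collector<String> out) {
                Iterator<OrderEvent> payIt = pays.iterator();
                Iterator<ReceiptEvent> receiptIt = receipts.iterator();
                if (payIt.hasNext() && receiptIt.hasNext()) {
                    out.collect("matched: " + payIt.next().getTxId());
                } else if (payIt.hasNext()) {
                    out.collect("unmatched pay: " + payIt.next().getTxId());
                } else if (receiptIt.hasNext()) {
                    out.collect("unmatched receipt: " + receiptIt.next().getTxId());
                }
            }
        })
        .print("coGroup");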