Flink Application: E-commerce User Behavior Analysis
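Related posts:
Flink e-commerce project, day 1: e-commerce user behavior analysis with a full step-by-step diagram walkthrough, and an implementation of hot-item Top-N statistics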
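1. E-commerce User Behavior Analysis
E-commerce business analysis falls into three main categories:
- Statistical analysis
  - Clicks, page views
  - Hot items, recently hot items, hot items by category, traffic statistics
- Preference statistics
  - Favorites, likes, ratings, tagging
  - User profiles, recommendation lists
- Risk control
  - Placing orders, payment, login
  - Order-brushing monitoring, order expiry monitoring, malicious login monitoring (frequent failed logins within a short time)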
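1.1 Project Module Design
Split by traffic versus business, e-commerce analysis falls into two broad categories:
Split by the type of statistic, it is classified as follows:
1.2 Data Sources
Data structures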
UserBehavior
ApacheLogEvent
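2. Project Modules
This project performs five analyses:
When processing the data, first key the stream by some id and define a window, then incrementally aggregate on a field (and specify the output format), and finally key by window and accumulate the data that falls into the same window.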
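2.1 Real-time Hot Item Statistics
- Basic requirements
  - Report the hot items of the last hour, updated every five minutes
  - Popularity is measured by the number of views ("pv")
- Approach
  - From all user behavior data, keep only the view ("pv") events for counting
  - Build a sliding window with a length of 1 hour and a slide of 5 minutes, and count the views of each item
  - Then, for each sliding window, pick the 5 items with the most views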
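The second step works roughly as follows:
First, partition the stream by item id.
Next, assign the data to sliding time windows.
Window intervals are left-closed and right-open, and the same record is assigned to several windows.
For example: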
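To make the assignment concrete, here is a small plain-Java sketch that lists every sliding window containing a given timestamp. It does not use Flink: the class and method names are made up for illustration, and the arithmetic assumes windows aligned to the epoch with no offset.

```java
import java.util.ArrayList;
import java.util.List;

class SlidingWindowAssignment {

    /**
     * Returns the [start, end) bounds of every sliding window that contains
     * the given timestamp. All values are in milliseconds.
     */
    static List<long[]> assignWindows(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        // Start of the latest window that still contains the timestamp
        long lastStart = timestamp - Math.floorMod(timestamp, slide);
        // Walk backwards one slide at a time while the window still covers the timestamp
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        // An event at minute 12, with a 1-hour window sliding every 5 minutes
        for (long[] w : assignWindows(12 * minute, 60 * minute, 5 * minute)) {
            System.out.println("[" + w[0] / minute + " min, " + w[1] / minute + " min)");
        }
    }
}
```

With a 1-hour window and a 5-minute slide, each record lands in 60 / 5 = 12 windows, which is why the same record contributes to several results.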
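Then perform the window aggregation.
The first argument of aggregate defines the window aggregation rule; the second defines the structure of the output data.
Window aggregate function
Window aggregation strategy: add one for every record that arrives.
This requires implementing the AggregateFunction interface and its four methods: createAccumulator, add, merge, and getResult.
interface AggregateFunction<IN, ACC, OUT>
/**
* IN : input type
* ACC : accumulator type
* OUT : output type
*/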
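The life cycle of the four methods can be sketched without Flink. The interface below, SimpleAggregate, is a hypothetical stand-in that only mirrors the method names listed above; it is not the Flink interface itself. The helper accumulate is also made up, and simply feeds records one at a time as a window would.

```java
import java.util.Arrays;
import java.util.List;

class AggregateLifecycleSketch {

    /** Hypothetical stand-in that mirrors the four methods named above. */
    interface SimpleAggregate<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
        ACC merge(ACC a, ACC b);
    }

    /** Counts elements: every record adds one to the accumulator. */
    static class CountPerRecord implements SimpleAggregate<String, Long, Long> {
        public Long createAccumulator() { return 0L; }
        public Long add(String value, Long acc) { return acc + 1; }
        public Long getResult(Long acc) { return acc; }
        public Long merge(Long a, Long b) { return a + b; }
    }

    /** Feeds records through the aggregate one at a time and returns the accumulator. */
    static <IN, ACC, OUT> ACC accumulate(SimpleAggregate<IN, ACC, OUT> agg, List<IN> records) {
        ACC acc = agg.createAccumulator();
        for (IN r : records) {
            acc = agg.add(r, acc);
        }
        return acc;
    }

    public static void main(String[] args) {
        CountPerRecord agg = new CountPerRecord();
        Long left = accumulate(agg, Arrays.asList("pv", "pv", "pv"));
        Long right = accumulate(agg, Arrays.asList("pv", "pv"));
        // merge combines two partial accumulators; getResult turns ACC into OUT
        System.out.println(agg.getResult(agg.merge(left, right))); // prints 5
    }
}
```

Because only the accumulator is kept, the window never has to hold the individual records, which is the point of incremental aggregation.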
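Window output function
Define the output structure: ItemViewCount(itemId, windowEnd, count)
itemId: item id
windowEnd: window end time
count: the count
interface WindowFunction<IN,OUT,KEY,W extends Window>
/**
* WindowFunction<IN,OUT,KEY,W extends Window>
* IN : input type, i.e. the result type produced by the aggregate function
* OUT: the desired final output type
* KEY: generic Tuple, the grouping key; here it is itemId, and windows are aggregated per itemId
* W : the window being aggregated; w.getEnd() returns the window end time
*/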
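Illustrated example:
First key by id, then aggregate per window.
Then process the results: put the data of the same window together and output the top 5.
This needs an intermediate value that holds all the data of one window, which is where state comes in.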
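Final sorted output: KeyedProcessFunction
- A low-level API for stateful streams
- KeyedProcessFunction processes each sub-stream produced by partitioning
- Using windowEnd as the key guarantees that, after partitioning, all data in a sub-stream belongs to one time window
- The data is stored in ListState; the state of the current stream is read from it, then sorted and output
Use a ProcessFunction to define the processing logic of a KeyedStream.
After partitioning, each KeyedStream has its own life cycle:
- open: initialization; the state of the current stream can be obtained here
- processElement: called for every element in the stream
- onTimer: timer callback, invoked after a registered Timer fires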
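The selection step that runs when the timer fires can also be written without sorting the whole list. The sketch below is a swapped-in alternative, not the approach used in this post's code: it keeps a min-heap bounded to N entries. ItemCount is a hypothetical stand-in for ItemViewCount, and no Flink state is involved.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

class TopNSketch {

    /** Hypothetical stand-in for ItemViewCount, reduced to two fields. */
    static final class ItemCount {
        final long itemId;
        final long count;
        ItemCount(long itemId, long count) {
            this.itemId = itemId;
            this.count = count;
        }
    }

    /** Returns the n entries with the largest counts, in descending order. */
    static List<ItemCount> topN(Iterable<ItemCount> counts, int n) {
        // Min-heap on count: the smallest of the current candidates sits on top
        PriorityQueue<ItemCount> heap =
                new PriorityQueue<>(Comparator.comparingLong((ItemCount c) -> c.count));
        for (ItemCount c : counts) {
            heap.offer(c);
            if (heap.size() > n) {
                heap.poll(); // drop the current smallest candidate
            }
        }
        List<ItemCount> result = new ArrayList<>(heap);
        result.sort(Comparator.comparingLong((ItemCount c) -> c.count).reversed());
        return result;
    }

    public static void main(String[] args) {
        List<ItemCount> window = Arrays.asList(
                new ItemCount(1L, 3L), new ItemCount(2L, 9L), new ItemCount(3L, 1L),
                new ItemCount(4L, 7L), new ItemCount(5L, 5L), new ItemCount(6L, 8L));
        for (ItemCount c : topN(window, 3)) {
            System.out.println("item " + c.itemId + " count " + c.count);
        }
    }
}
```

For small windows a full sort is just as good; the bounded heap only pays off when a window holds many more items than N.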
Creating the POJOs
Each class needs getters/setters, a no-argument and an all-argument constructor, and toString.
- ItemViewCount
  private Long itemId; private Long windowEnd; private Long count;
- UserBehavior
  private Long userId; private Long itemId; private Integer categoryId; private String behavior; private Long timestamp;
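As a sketch of what such a class looks like in full, here is ItemViewCount written out with the fields listed above. The outer class PojoSketch exists only to keep the example self-contained; in the project the class would presumably live in its own file in the beans package. Flink treats a public class with a public no-argument constructor and getters/setters for its fields as a POJO.

```java
class PojoSketch {

    /** Mirrors the ItemViewCount fields listed above. */
    public static class ItemViewCount {
        private Long itemId;
        private Long windowEnd;
        private Long count;

        // No-argument constructor, so that the object can be created reflectively
        public ItemViewCount() {
        }

        public ItemViewCount(Long itemId, Long windowEnd, Long count) {
            this.itemId = itemId;
            this.windowEnd = windowEnd;
            this.count = count;
        }

        public Long getItemId() { return itemId; }
        public void setItemId(Long itemId) { this.itemId = itemId; }
        public Long getWindowEnd() { return windowEnd; }
        public void setWindowEnd(Long windowEnd) { this.windowEnd = windowEnd; }
        public Long getCount() { return count; }
        public void setCount(Long count) { this.count = count; }

        @Override
        public String toString() {
            return "ItemViewCount{itemId=" + itemId
                    + ", windowEnd=" + windowEnd
                    + ", count=" + count + "}";
        }
    }

    public static void main(String[] args) {
        System.out.println(new ItemViewCount(1715L, 1511658030000L, 1L));
    }
}
```

The getters matter beyond style: the code below keys streams by field name ("itemId", "windowEnd"), which relies on the type being recognized as a POJO.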
Code
The test run uses a 60-second window with a 30-second slide.
import beans.ItemViewCount;
import beans.UserBehavior;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Comparator;
/**
* @author Kewei
* @Date 2022/3/5 15:10
*/
public class HotItems {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
DataStreamSource<String> inputStream = env.readTextFile("D:\\IdeaProjects\\UserBehaviorAnalysis\\HotItemsAnalysis\\src\\main\\resources\\UserBehavior.csv");
// Convert each line into a POJO and assign event-time timestamps
SingleOutputStreamOperator<UserBehavior> dataStream = inputStream.map(line -> {
String[] fields = line.split(",");
return new UserBehavior(Long.valueOf(fields[0]), Long.valueOf(fields[1]), Integer.valueOf(fields[2]), fields[3], Long.valueOf(fields[4]));
}).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserBehavior>() {
@Override
public long extractAscendingTimestamp(UserBehavior userBehavior) {
return userBehavior.getTimestamp() * 1000;
}
});
// Keep only pv records, key by item id, apply sliding time windows, aggregate each window incrementally, and emit the result as ItemViewCount
SingleOutputStreamOperator<ItemViewCount> windowAggStream = dataStream
.filter(data -> "pv".equals(data.getBehavior()))
.keyBy("itemId")
.timeWindow(Time.seconds(60), Time.seconds(30))
.aggregate(new CountAgg(), new WindowResultFunction());
// Key the per-item results by window end, then emit the ranking when the timer fires
SingleOutputStreamOperator<String> resultStream = windowAggStream
.keyBy("windowEnd")
.process(new TopNHotItems(5));
resultStream.print();
env.execute("hot items");
}
// Aggregation logic for the records of one item
/**
* AggregateFunction<IN, ACC, OUT>
* IN : input type
* ACC: accumulator type
* OUT: final output type
*/
private static class CountAgg implements AggregateFunction<UserBehavior, Long, Long> {
@Override
public Long createAccumulator() {
return 0L;
}
@Override
public Long add(UserBehavior userBehavior, Long aLong) {
return aLong + 1;
}
@Override
public Long getResult(Long aLong) {
return aLong;
}
@Override
public Long merge(Long aLong, Long acc1) {
return aLong + acc1;
}
}
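// Define the output format
/**
* WindowFunction<IN,OUT,KEY,W extends Window>
* IN : input type, i.e. the result type produced by the aggregate function
* OUT: the desired final output type
* KEY: generic Tuple, the grouping key; here it is itemId, and windows are aggregated per itemId
* W : the window being aggregated; w.getEnd() returns the window end time
*/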
private static class WindowResultFunction implements WindowFunction<Long, ItemViewCount, Tuple, TimeWindow> {
@Override
public void apply(Tuple tuple, TimeWindow timeWindow, Iterable<Long> iterable, Collector<ItemViewCount> out) throws Exception {
Long itemId = tuple.getField(0);
Long end = timeWindow.getEnd();
Long next = iterable.iterator().next();
out.collect(new ItemViewCount(itemId, end, next));
}
}
/**
* KeyedProcessFunction<KEY,IN,OUT>
* KEY: type of the grouping key
* IN : input type
* OUT: output type
*/
private static class TopNHotItems extends KeyedProcessFunction<Tuple, ItemViewCount, String> {
private Integer topSize;
public TopNHotItems(Integer topSize) {
this.topSize = topSize;
}
ListState<ItemViewCount> itemViewCountListState;
@Override
public void open(Configuration parameters) throws Exception {
itemViewCountListState = getRuntimeContext().getListState(new ListStateDescriptor<ItemViewCount>("Item-view-count-list",ItemViewCount.class));
}
@Override
public void processElement(ItemViewCount count, KeyedProcessFunction<Tuple, ItemViewCount, String>.Context ctx, Collector<String> out) throws Exception {
itemViewCountListState.add(count);
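// Register a timer 1 ms after the window end. All results of one window share the same end time, so once event time passes it, every result of that window has been added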
ctx.timerService().registerEventTimeTimer(count.getWindowEnd() + 1);
}
// Timer callback
@Override
public void onTimer(long timestamp, KeyedProcessFunction<Tuple, ItemViewCount, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
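// Copy the ListState contents into an ArrayList
ArrayList<ItemViewCount> itemViewCounts = Lists.newArrayList(itemViewCountListState.get().iterator());
// The window is complete, so its state can be released
itemViewCountListState.clear();
// Sort by count in descending order; Long.compare avoids the overflow risk of subtracting int values
itemViewCounts.sort(new Comparator<ItemViewCount>() {
@Override
public int compare(ItemViewCount o1, ItemViewCount o2) {
return Long.compare(o2.getCount(), o1.getCount());
}
});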
// Format the output
StringBuffer stringBuffer = new StringBuffer();
stringBuffer.append("===========\n");
stringBuffer.append("Window end time: ").append(new Timestamp(timestamp - 1)).append("\n");
for (int i = 0; i < Math.min(topSize, itemViewCounts.size()); i++) {
ItemViewCount itemViewCount = itemViewCounts.get(i);
stringBuffer.append("NO ").append(i+1).append(":")
.append(" item id = ").append(itemViewCount.getItemId())
.append(" popularity = ").append(itemViewCount.getCount())
.append("\n");
}
stringBuffer.append("============\n\n");
// Throttle the output rate (for demonstration only)
Thread.sleep(1000L);
// Emit the result
out.collect(stringBuffer.toString());
}
}
}
Output:
===========
Window end time: 2017-11-26 09:00:30.0
NO 1: item id = 2455388 popularity = 2
NO 2: item id = 1715 popularity = 1
NO 3: item id = 2244074 popularity = 1
NO 4: item id = 3076029 popularity = 1
NO 5: item id = 176722 popularity = 1
============
...
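2.2 Real-time Traffic Statistics: Hot Pages
- Basic requirements
  - From the web server logs, compute the currently hot pages in real time
  - Count the visits per minute and take the 5 most visited addresses, updated every five seconds (the code below groups by url)
- Approach
  - Convert the time in the Apache server log into a timestamp and use it as the event time
  - Keep only GET requests for pages and filter out requests for static resources
  - Key by url and build a sliding window with a length of 1 minute and a slide of 5 seconds, then aggregate incrementally and emit in the specified format
  - Finally, key by window time, gather the data of the same window, and format the output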
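The static-resource filter in the second step can be tried in isolation. The sketch below uses the same idea as the job's code, a negative lookahead on the file extension; the class name, method name, and sample URLs are made up for illustration.

```java
import java.util.regex.Pattern;

class StaticResourceFilterSketch {

    // Matches only strings that do NOT end in .css, .js, .png or .ico.
    // The regex \. (a literal dot) is written "\\." in Java source.
    static final Pattern NOT_STATIC_RESOURCE =
            Pattern.compile("^((?!\\.(css|js|png|ico)$).)*$");

    /** Returns true if the url should be counted as a page request. */
    static boolean isPageRequest(String url) {
        return NOT_STATIC_RESOURCE.matcher(url).matches();
    }

    public static void main(String[] args) {
        String[] urls = {"/blog/tags/puppet", "/style2.css", "/images/logo.png", "/index.html"};
        for (String url : urls) {
            System.out.println(url + " -> " + isPageRequest(url));
        }
    }
}
```

Only the extension at the very end of the url is checked, so a url such as /app.js?v=1 would still pass this filter.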
Creating the POJOs
- ApacheLogEvent
  private String ip; private String userId; private Long timestamp; private String method; private String url;
- PageViewCount
  private String url; private Long windowEnd; private Long count;
Code
import bean.ApacheLogEvent;
import bean.PageViewCount;
import org.apache.commons.compress.utils.Lists;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.regex.Pattern;
/**
* @author Kewei
* @Date 2022/3/10 15:57
*/
public class HotPages {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
DataStreamSource<String> inputStream = env.readTextFile("D:\\IdeaProjects\\UserBehaviorAnalysis\\HotPages\\apache.log");
// Convert the String DataStream into POJOs and assign event time and watermarks
SingleOutputStreamOperator<ApacheLogEvent> dataStream = inputStream.map(line -> {
String[] fields = line.split(" ");
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss");
long time = simpleDateFormat.parse(fields[3]).getTime();
return new ApacheLogEvent(fields[0], fields[1], time, fields[5], fields[6]);
})
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<ApacheLogEvent>(Time.seconds(1)) {
@Override
public long extractTimestamp(ApacheLogEvent element) {
return element.getTimestamp();
}
});
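/**
* Filter the data, keeping only GET requests for pages
* Then key by url and apply a sliding window (window size 1 minute, slide 5 seconds)
* Finally aggregate incrementally and emit in the specified format
*/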
SingleOutputStreamOperator<PageViewCount> resStream = dataStream
.filter(data -> "GET".equals(data.getMethod()))
.filter(data -> {
// Exclude urls ending in .css, .js, .png or .ico; a literal dot is written \\. in Java source
String regex = "^((?!\\.(css|js|png|ico)$).)*$";
return Pattern.matches(regex, data.getUrl());
})
.keyBy(ApacheLogEvent::getUrl)
.timeWindow(Time.minutes(1), Time.seconds(5))
.aggregate(new PageCountAgg(), new PageView());
/**
* Key by window end time, then format and emit the data of each group
*/
SingleOutputStreamOperator<String> resultStream = resStream
.keyBy(PageViewCount::getWindowEnd)
.process(new MyProcessFunc());
resultStream.print();
env.execute();
}
public static class PageCountAgg implements AggregateFunction<ApacheLogEvent, Long, Long>{
@Override
public Long createAccumulator() {
return 0L;
}
@Override
public Long add(ApacheLogEvent apacheLogEvent, Long aLong) {
return aLong+1;
}
@Override
public Long getResult(Long aLong) {
return aLong;
}
@Override
public Long merge(Long aLong, Long acc1) {
return aLong+acc1;
}
}
public static class PageView implements WindowFunction<Long, PageViewCount,String, TimeWindow>{
@Override
public void apply(String s, TimeWindow window, Iterable<Long> input, Collector<PageViewCount> out) throws Exception {
out.collect(new PageViewCount(s, window.getEnd(), input.iterator().next()));
}
}
public static class MyProcessFunc extends KeyedProcessFunction<Long,PageViewCount,String>{
private ListState<PageViewCount> list;
@Override
public void onTimer(long timestamp, KeyedProcessFunction<Long, PageViewCount, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
ArrayList<PageViewCount> pageViewCountArrayList = Lists.newArrayList(list.get().iterator());
// Sort by count in descending order; Long.compare avoids the overflow risk of subtracting int values
pageViewCountArrayList.sort((p1,p2) -> Long.compare(p2.getCount(), p1.getCount()));
StringBuffer result = new StringBuffer();
result.append("====================\n");
result.append("Window end time: ").append(new Timestamp(timestamp - 1)).append("\n");
for (int i = 0; i < Math.min(5, pageViewCountArrayList.size()); i++) {
PageViewCount pageView = pageViewCountArrayList.get(i);
result.append(pageView.getUrl()).append(" ").append(pageView.getCount()).append("\n");
}
result.append("===============\n\n\n");
Thread.sleep(1000);
out.collect(result.toString());
}
@Override
public void open(Configuration parameters) throws Exception {
list = getRuntimeContext().getListState(new ListStateDescriptor<PageViewCount>("PageViewCount",PageViewCount.class));
}
@Override
public void processElement(PageViewCount value, KeyedProcessFunction<Long, PageViewCount, String>.Context ctx, Collector<String> out) throws Exception {
list.add(value);
ctx.timerService().registerEventTimeTimer(value.getWindowEnd()+1);
}
}
}
Out-of-order output
I do not fully understand this part yet; I will come back to it later.
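2.3 Real-time Traffic Statistics: PV and UV
- Basic requirements
  - From the tracking logs, compute PV and UV in real time
  - Count the page views per hour (PV), and count users with duplicates removed (UV)
- Approach
  - For PV, filter the data, apply a tumbling time window, and sum the counts.
  - For UV, use a Set to remove duplicates
  - For very large data volumes, consider de-duplicating with a Bloom filter
PV counting code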
import beans.UserBehavior;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
/**
* @author Kewei
* @Date 2022/3/10 17:15
*/
public class PageView {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
DataStreamSource<String> inputSream = env.readTextFile("D:\\IdeaProjects\\UserBehaviorAnalysis\\HotItemsAnalysis\\src\\main\\resources\\UserBehavior.csv");
SingleOutputStreamOperator<UserBehavior> dataStream = inputSream.map(line -> {
String[] field = line.split(",");
return new UserBehavior(Long.valueOf(field[0]), Long.valueOf(field[1]), Integer.valueOf(field[2]), field[3], Long.valueOf(field[4]));
})
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserBehavior>() {
@Override
public long extractAscendingTimestamp(UserBehavior element) {
return element.getTimestamp() * 1000L;
}
});
SingleOutputStreamOperator<Tuple2<String, Long>> result = dataStream
.filter(data -> "pv".equals(data.getBehavior()))
.map(new MapFunction<UserBehavior, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> map(UserBehavior userBehavior) throws Exception {
return new Tuple2<>("pv", 1L);
}
})
.keyBy(data -> data.f0)
.timeWindow(Time.hours(1))
.sum(1);
result.print();
env.execute();
}
}
Output:
(pv,41890)
(pv,48022)
(pv,47298)
(pv,44499)
(pv,48649)
(pv,50838)
(pv,52296)
(pv,52552)
(pv,48292)
(pv,13)
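Note that the last window holds very few records (13), presumably because the input ends shortly after that window begins. A separate problem with this code is data skew: every record carries the same key "pv", so once the parallelism is greater than 1, all records are still routed to a single subtask.
The version below uses random keys to spread the counting.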
import beans.PageViewCount;
import beans.UserBehavior;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.ArrayList;
import java.util.Random;
/**
* @author Kewei
* @Date 2022/3/10 17:15
*/
public class PageView2 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
DataStreamSource<String> inputSream = env.readTextFile("D:\\IdeaProjects\\UserBehaviorAnalysis\\HotItemsAnalysis\\src\\main\\resources\\UserBehavior.csv");
SingleOutputStreamOperator<UserBehavior> dataStream = inputSream.map(line -> {
String[] field = line.split(",");
return new UserBehavior(Long.valueOf(field[0]), Long.valueOf(field[1]), Integer.valueOf(field[2]), field[3], Long.valueOf(field[4]));
})
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserBehavior>() {
@Override
public long extractAscendingTimestamp(UserBehavior element) {
return element.getTimestamp() * 1000L;
}
});
SingleOutputStreamOperator<Tuple2<String, Long>> result = dataStream
.filter(data -> "pv".equals(data.getBehavior()))
.map(new MapFunction<UserBehavior, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> map(UserBehavior userBehavior) throws Exception {
return new Tuple2<>("pv", 1L);
}
})
.keyBy(data -> data.f0)
.timeWindow(Time.hours(1))
.sum(1);
// Use random keys to address the data skew
SingleOutputStreamOperator<PageViewCount> resStream = dataStream
.filter(data -> "pv".equals(data.getBehavior()))
.map(new MapFunction<UserBehavior, Tuple2<Integer, Long>>() {
@Override
public Tuple2<Integer, Long> map(UserBehavior userBehavior) throws Exception {
Random random = new Random();
return new Tuple2<>(random.nextInt(10), 1L);
}
})
.keyBy(data -> data.f0)
.timeWindow(Time.hours(1))
.aggregate(new MyAgg(), new WindowFunc());
SingleOutputStreamOperator<Tuple2<String, Long>> result2 = resStream
.keyBy(PageViewCount::getWindowEnd)
.process(new MyProcessFunc());
result2.print();
// result.print();
env.execute();
}
public static class MyAgg implements AggregateFunction<Tuple2<Integer,Long>,Long,Long>{
@Override
public Long createAccumulator() {
return 0L;
}
@Override
public Long add(Tuple2<Integer, Long> integerLongTuple2, Long aLong) {
return aLong+integerLongTuple2.f1;
}
@Override
public Long getResult(Long aLong) {
return aLong;
}
@Override
public Long merge(Long aLong, Long acc1) {
return acc1+aLong;
}
}
public static class WindowFunc implements WindowFunction<Long, PageViewCount,Integer, TimeWindow>{
@Override
public void apply(Integer integer, TimeWindow window, Iterable<Long> input, Collector<PageViewCount> out) throws Exception {
out.collect(new PageViewCount(integer.toString(), window.getEnd(), input.iterator().next()));
}
}
public static class MyProcessFunc extends KeyedProcessFunction<Long,PageViewCount,Tuple2<String,Long>>{
private ListState<PageViewCount> pageViewCountListState;
@Override
public void onTimer(long timestamp, KeyedProcessFunction<Long, PageViewCount, Tuple2<String, Long>>.OnTimerContext ctx, Collector<Tuple2<String, Long>> out) throws Exception {
ArrayList<PageViewCount> pageViewCounts = Lists.newArrayList(pageViewCountListState.get().iterator());
Long sum = 0L;
for (PageViewCount pageViewCount : pageViewCounts) {
sum += pageViewCount.getCount();
}
out.collect(new Tuple2<>("pv",sum));
}
@Override
public void open(Configuration parameters) throws Exception {
pageViewCountListState = getRuntimeContext().getListState(new ListStateDescriptor<PageViewCount>("PageViewCount",PageViewCount.class));
}
@Override
public void processElement(PageViewCount value, KeyedProcessFunction<Long, PageViewCount, Tuple2<String, Long>>.Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
pageViewCountListState.add(value);
ctx.timerService().registerEventTimeTimer(value.getWindowEnd()+1);
}
}
}
Output:
(pv,41890)
(pv,48022)
(pv,47298)
(pv,44499)
(pv,48649)
(pv,50838)
(pv,52296)
(pv,52552)
(pv,48292)
(pv,13)
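The totals match the single-key version because the random key only changes where each record is counted, not how many records are counted. The plain-Java sketch below illustrates this two-stage idea outside Flink. The class and method names are invented, and the record count 41,890 is simply borrowed from the first output line above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

class TwoStageCountSketch {

    /** Stage 1: spread the records over random keys and count per key. */
    static Map<Integer, Long> partialCounts(int records, int buckets, Random random) {
        Map<Integer, Long> counts = new HashMap<>();
        for (int i = 0; i < records; i++) {
            counts.merge(random.nextInt(buckets), 1L, Long::sum);
        }
        return counts;
    }

    /** Stage 2: add up the partial counts that belong to the same window. */
    static long total(Map<Integer, Long> partial) {
        long sum = 0L;
        for (long c : partial.values()) {
            sum += c;
        }
        return sum;
    }

    public static void main(String[] args) {
        Map<Integer, Long> partial = partialCounts(41_890, 10, new Random());
        System.out.println("partial counts per random key: " + partial);
        System.out.println("total: " + total(partial)); // always 41890
    }
}
```

In the Flink job the first stage corresponds to the window aggregation keyed by the random number, and the second stage to the KeyedProcessFunction keyed by windowEnd.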
UV counting code
import beans.PageViewCount;
import beans.UserBehavior;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.HashSet;
/**
* @author Kewei
* @Date 2022/3/10 20:39
*/
public class UserView {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
DataStreamSource<String> inputStream = env.readTextFile("D:\\IdeaProjects\\UserBehaviorAnalysis\\HotItemsAnalysis\\src\\main\\resources\\UserBehavior.csv");
SingleOutputStreamOperator<UserBehavior> dataStream = inputStream.map(line -> {
String[] fields = line.split(",");
return new UserBehavior(Long.valueOf(fields[0]), Long.valueOf(fields[1]), Integer.valueOf(fields[2]), fields[3], Long.valueOf(fields[4]));
})
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserBehavior>() {
@Override
public long extractAscendingTimestamp(UserBehavior element) {
return element.getTimestamp() * 1000L;
}
});
SingleOutputStreamOperator<PageViewCount> result = dataStream
.filter(data -> "pv".equals(data.getBehavior()))
.timeWindowAll(Time.hours(1))
.apply(new UvCountResult());
result.print();
env.execute();
}
// A custom all-window function
public static class UvCountResult implements AllWindowFunction<UserBehavior, PageViewCount, TimeWindow>{
@Override
public void apply(TimeWindow window, Iterable<UserBehavior> values, Collector<PageViewCount> out) throws Exception {
// Use a Set to hold every userId in the window; duplicates are removed automatically
HashSet<Long> uidSet = new HashSet<>();
for (UserBehavior value : values) {
uidSet.add(value.getUserId());
}
out.collect(new PageViewCount("uv",window.getEnd(),(long) uidSet.size()));
}
}
}
Using a Bloom filter? I do not understand this part yet.
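As a starting point, here is a minimal plain-Java sketch of the idea, independent of Flink. A Bloom filter stores a few bits per value instead of the value itself, so it uses far less memory than a HashSet, at the price of occasional false positives that make the UV count slightly too low. Everything here (class names, the bit array size, the number of hash functions) is an illustrative assumption, not code from this project.

```java
import java.util.BitSet;

class BloomFilterUvSketch {

    /** A minimal Bloom filter over long values; size and hash count are illustrative. */
    static class SimpleBloomFilter {
        private final BitSet bits;
        private final int size;
        private final int hashCount;

        SimpleBloomFilter(int size, int hashCount) {
            this.bits = new BitSet(size);
            this.size = size;
            this.hashCount = hashCount;
        }

        /** Bit mixer (the 64-bit finalizer used by MurmurHash3). */
        private static long mix(long x) {
            x ^= x >>> 33;
            x *= 0xff51afd7ed558ccdL;
            x ^= x >>> 33;
            x *= 0xc4ceb9fe1a85ec53L;
            x ^= x >>> 33;
            return x;
        }

        /** Derives the i-th bit position from two base hashes (double hashing). */
        private int position(long value, int i) {
            long h1 = mix(value);
            long h2 = mix(value ^ 0x9E3779B97F4A7C15L);
            return (int) Math.floorMod(h1 + i * h2, (long) size);
        }

        /** Sets the bits for value and returns true if all of them were already set. */
        boolean addAndCheckSeen(long value) {
            boolean seen = true;
            for (int i = 0; i < hashCount; i++) {
                int pos = position(value, i);
                if (!bits.get(pos)) {
                    seen = false;
                    bits.set(pos);
                }
            }
            return seen;
        }
    }

    /** Counts the values that the filter has apparently not seen before. */
    static long approximateDistinct(long[] userIds, SimpleBloomFilter filter) {
        long uv = 0;
        for (long id : userIds) {
            if (!filter.addAndCheckSeen(id)) {
                uv++;
            }
        }
        return uv;
    }

    public static void main(String[] args) {
        long[] userIds = {1L, 2L, 3L, 2L, 1L};
        SimpleBloomFilter filter = new SimpleBloomFilter(1 << 20, 3);
        // Expected to print 3 unless a false positive occurs, which is very unlikely at this size
        System.out.println(approximateDistinct(userIds, filter));
    }
}
```

A repeated user id is always recognized, so the count can never be too high; it can only be too low when a new id happens to hit bits that are already set.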
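2.4 Marketing Analysis: App Market Promotion Statistics
- Basic requirements
  - From the tracking logs, compute the metrics for app market promotion
  - Compute the figures separately for each promotion channel
- Approach
  - Filter the user behaviors in the logs and count per channel
  - A process function can be used to produce custom output records
Creating the POJOs
- MarketingUserBehavior
  private Long userId; private String behavior; private String channel; private Long timestamp;
- ChannelPromotionCount
  private String channel; private String behavior; private String windowEnd; private Long count;
Code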
import bean.ChannelPromotionCount;
import bean.MarketingUserBehavior;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
/**
* @author Kewei
* @Date 2022/3/11 9:12
*/
public class AppMarketingByChannel2 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
// Use a custom data source
SingleOutputStreamOperator<MarketingUserBehavior> dataStream = env.addSource(new MySource())
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MarketingUserBehavior>() {
@Override
public long extractAscendingTimestamp(MarketingUserBehavior element) {
return element.getTimestamp();
}
});
// Key by channel and behavior, then aggregate
SingleOutputStreamOperator<ChannelPromotionCount> result = dataStream
.filter(data -> !"UNINSTALL".equals(data.getBehavior()))
.keyBy(new KeySelector<MarketingUserBehavior, Tuple2<String, String>>() {
@Override
public Tuple2<String, String> getKey(MarketingUserBehavior value) throws Exception {
return new Tuple2<>(value.getChannel(), value.getBehavior());
}
})
.timeWindow(Time.seconds(30), Time.seconds(10))
.aggregate(new MyAggFun(), new WindowFun());
result.print();
env.execute();
}
// The AggregateFunction is applied per key automatically, so the key needs no further handling here
public static class MyAggFun implements AggregateFunction<MarketingUserBehavior,Long,Long>{
@Override
public Long createAccumulator() {
return 0L;
}
@Override
public Long add(MarketingUserBehavior marketingUserBehavior, Long aLong) {
return aLong+1;
}
@Override
public Long getResult(Long aLong) {
return aLong;
}
@Override
public Long merge(Long aLong, Long acc1) {
return acc1+aLong;
}
}
public static class WindowFun implements WindowFunction<Long,ChannelPromotionCount,Tuple2<String,String>,TimeWindow>{
@Override
public void apply(Tuple2<String, String> tuple2, TimeWindow window, Iterable<Long> input, Collector<ChannelPromotionCount> out) throws Exception {
out.collect(new ChannelPromotionCount(tuple2.f0,tuple2.f1,String.valueOf(window.getEnd()),input.iterator().next()));
}
}
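// Custom data source
public static class MySource implements SourceFunction<MarketingUserBehavior>{
// volatile so that a cancel() call from another thread is visible to run()
private volatile boolean running = true;
// Possible values for user behavior and channel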
List<String> behaviorList = Arrays.asList("CLICK", "DOWNLOAD", "INSTALL", "UNINSTALL");
List<String> channelList = Arrays.asList("app store", "wechat", "weibo");
Random random= new Random();
@Override
public void run(SourceContext<MarketingUserBehavior> ctx) throws Exception {
while (running){
// Generate all fields randomly
long id = random.nextLong();
String behavior = behaviorList.get(random.nextInt(behaviorList.size()));
String channel = channelList.get(random.nextInt(channelList.size()));
long timestamp = System.currentTimeMillis();
// Emit the record
ctx.collect(new MarketingUserBehavior(id,behavior,channel,timestamp));
Thread.sleep(100L);
}
}
@Override
public void cancel() {
running = false;
}
}
}
Code: all channels combined
import bean.ChannelPromotionCount;
import bean.MarketingUserBehavior;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
/**
* @author Kewei
* @Date 2022/3/11 9:12
*/
public class AppMarketing {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
// Use a custom data source
SingleOutputStreamOperator<MarketingUserBehavior> dataStream = env.addSource(new MySource())
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MarketingUserBehavior>() {
@Override
public long extractAscendingTimestamp(MarketingUserBehavior element) {
return element.getTimestamp();
}
});
// Same as the previous job, except that every record is mapped to the same key
SingleOutputStreamOperator<ChannelPromotionCount> result = dataStream
.filter(data -> !"UNINSTALL".equals(data.getBehavior()))
.keyBy(new KeySelector<MarketingUserBehavior, Tuple2<String, String>>() {
@Override
public Tuple2<String, String> getKey(MarketingUserBehavior value) throws Exception {
return new Tuple2<>("total", "total");
}
})
.timeWindow(Time.seconds(30), Time.seconds(10))
.aggregate(new MyAggFun(), new WindowFun());
result.print();
env.execute();
}
// The AggregateFunction is applied per key automatically, so the key needs no further handling here
public static class MyAggFun implements AggregateFunction<MarketingUserBehavior,Long,Long>{
@Override
public Long createAccumulator() {
return 0L;
}
@Override
public Long add(MarketingUserBehavior marketingUserBehavior, Long aLong) {
return aLong+1;
}
@Override
public Long getResult(Long aLong) {
return aLong;
}
@Override
public Long merge(Long aLong, Long acc1) {
return acc1+aLong;
}
}
public static class WindowFun implements WindowFunction<Long,ChannelPromotionCount,Tuple2<String,String>,TimeWindow>{
@Override
public void apply(Tuple2<String, String> tuple2, TimeWindow window, Iterable<Long> input, Collector<ChannelPromotionCount> out) throws Exception {
out.collect(new ChannelPromotionCount(tuple2.f0,tuple2.f1,String.valueOf(window.getEnd()),input.iterator().next()));
}
}
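// Custom data source, implementing SourceFunction
public static class MySource implements SourceFunction<MarketingUserBehavior>{
// volatile so that a cancel() call from another thread is visible to run()
private volatile boolean running = true;
// Possible values for user behavior and channel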
List<String> behaviorList = Arrays.asList("CLICK", "DOWNLOAD", "INSTALL", "UNINSTALL");
List<String> channelList = Arrays.asList("app store", "wechat", "weibo");
Random random= new Random();
@Override
public void run(SourceContext<MarketingUserBehavior> ctx) throws Exception {
while (running){
// Generate all fields randomly
long id = random.nextLong();
String behavior = behaviorList.get(random.nextInt(behaviorList.size()));
String channel = channelList.get(random.nextInt(channelList.size()));
long timestamp = System.currentTimeMillis();
// Emit the record
ctx.collect(new MarketingUserBehavior(id,behavior,channel,timestamp));
Thread.sleep(100L);
}
}
@Override
public void cancel() {
running = false;
}
}
}
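2.5 Marketing Analysis: Page Ad Statistics
- Basic requirements
  - From the tracking logs, count the page ad clicks per hour, refreshed every 5 seconds, broken down by province.
  - Filter out click-farming style frequent clicks and add the offending user to a blacklist
- Approach
  - Key by province and count within a sliding time window with a length of 1 hour and a slide of 5 seconds
  - A process function can be used for the blacklist filter: track how often a user clicks the same ad, and if the count exceeds the limit, emit the user's information to the blacklist through a side output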
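The blacklist rule itself is simple enough to sketch in plain Java before wiring it into a process function. In the sketch below a HashMap stands in for keyed state and a List stands in for the side output. The class name, the limit value, and the warning text are assumptions made for illustration, and the counters are never reset, since this section does not say when they should be.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class ClickBlacklistSketch {

    private final int limit; // hypothetical click limit per (user, ad) pair
    private final Map<String, Integer> clicks = new HashMap<>(); // stands in for keyed state
    private final Set<String> warned = new HashSet<>();
    final List<String> warnings = new ArrayList<>(); // stands in for the side output

    ClickBlacklistSketch(int limit) {
        this.limit = limit;
    }

    /** Returns true if the click should be forwarded to the normal statistics. */
    boolean accept(long userId, long adId) {
        String key = userId + "_" + adId;
        int count = clicks.getOrDefault(key, 0);
        if (count >= limit) {
            // Over the limit: drop the click and warn once per (user, ad) pair
            if (warned.add(key)) {
                warnings.add("user " + userId + " clicked ad " + adId
                        + " more than " + limit + " times");
            }
            return false;
        }
        clicks.put(key, count + 1);
        return true;
    }

    public static void main(String[] args) {
        ClickBlacklistSketch filter = new ClickBlacklistSketch(3);
        for (int i = 0; i < 5; i++) {
            System.out.println("click " + (i + 1) + " accepted: " + filter.accept(1L, 10L));
        }
        System.out.println(filter.warnings);
    }
}
```

Emitting the warning only once per pair keeps the blacklist output from being flooded by the very clicks it is meant to suppress.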
Creating the POJOs
- AdClickEvent
  private Long userId; private Long adId; private String province; private String city; private Long timestamp;
- BlackListUserWarning
  private Long userId; private Long adId; private String warningMsg;
Ad click count code
import bean.AdClickEvent;
import bean.AdCountViewByProvince;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
/**
* @author Kewei
* @Date 2022/3/11 10:21
*/
public class AdStatisticsByProvince {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
DataStreamSource<String> inputStream = env.readTextFile("D:\\IdeaProjects\\UserBehaviorAnalysis\\AdClickEvent\\AdClickLog.csv");
SingleOutputStreamOperator<AdClickEvent> dataStream = inputStream.map(line -> {
String[] field = line.split(",");
return new AdClickEvent(new Long(field[0]), new Long(field[1]), field[2], field[3], new Long(field[4]));
})
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<AdClickEvent>(Time.milliseconds(200)) {
@Override
public long extractTimestamp(AdClickEvent element) {
return element.getTimestamp() * 1000L;
}
});
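// Count ad clicks per province with a sliding window
// (this test run uses a 10-minute window sliding every 2 minutes, not the 1 hour / 5 seconds from the requirement)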
SingleOutputStreamOperator<AdCountViewByProvince> result = dataStream
.keyBy(AdClickEvent::getProvince)
.timeWindow(Time.minutes(10), Time.minutes(2))
.aggregate(new MyAgg(), new WindowFun());
result.print();
env.execute();
}
public static class MyAgg implements AggregateFunction<AdClickEvent,Long,Long>{
@Override
public Long createAccumulator() {
return 0L;
}
@Override
public Long add(AdClickEvent adClickEvent, Long aLong) {
return aLong+1;
}
@Override
public Long getResult(Long aLong) {
return aLong;
}
@Override
public Long merge(Long aLong, Long acc1) {
return acc1+aLong;
}
}
public static class WindowFun implements WindowFunction<Long, AdCountViewByProvince,String, TimeWindow>{
@Override
public void apply(String s, TimeWindow window, Iterable<Long> input, Collector<AdCountViewByProvince> out) throws Exception {
out.collect(new AdCountViewByProvince(s,String.valueOf(window.getEnd()), input.iterator().next()));
}
}
}
Blacklist filtering of abnormal click behavior
import bean.AdClickEvent;
import bean.AdCountViewByProvince;
import bean.BlackListUserWarning;
import org.apache.commons.lang3.time.DateUtils;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.util.Date;
/**
* @author Kewei
* @Date 2022/3/11 10:21
*/
public class AdStatisticsByProvinceAndWar {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
DataStreamSource<String> inputStream = env.readTextFile("D:\\IdeaProjects\\UserBehaviorAnalysis\\AdClickEvent\\AdClickLog.csv");
SingleOutputStreamOperator<AdClickEvent> dataStream = inputStream.map(line -> {
String[] field = line.split(",");
return new AdClickEvent(new Long(field[0]), new Long(field[1]), field[2], field[3], new Long(field[4]));
})
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<AdClickEvent>(Time.milliseconds(200)) {
@Override
public long extractTimestamp(AdClickEvent element) {
return element.getTimestamp() * 1000L;
}
});
// Drop further clicks once a user has clicked the same ad 100 times
SingleOutputStreamOperator<AdClickEvent> filterStream = dataStream
.keyBy(new KeySelector<AdClickEvent, Tuple2<Long, Long>>() {
@Override
public Tuple2<Long, Long> getKey(AdClickEvent value) throws Exception {
return new Tuple2<>(value.getUserId(), value.getAdId());
}
})
.process(new MyProcess(100));
SingleOutputStreamOperator<AdCountViewByProvince> result = filterStream
.keyBy(AdClickEvent::getProvince)
.timeWindow(Time.hours(1), Time.minutes(5))
.aggregate(new MyAgg(), new WindowFun());
result.print();
// Print the blacklist records. The side-output tag id must match the one used in the filter function, otherwise nothing is emitted
filterStream.getSideOutput(new OutputTag<BlackListUserWarning>("warning"){})
.print("blacklist-user");
env.execute();
}
public static class MyProcess extends KeyedProcessFunction<Tuple2<Long,Long>,AdClickEvent,AdClickEvent>{
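// Upper bound on the click count
private Integer countUpperBound;
public MyProcess(Integer countUpperBound){
this.countUpperBound = countUpperBound;
}
// State: click count for the current (userId, adId) key
ValueState<Long> countState;
// State: whether this key has already been reported to the blacklist
ValueState<Boolean> isSentState;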
@Override
public void onTimer(long timestamp, KeyedProcessFunction<Tuple2<Long, Long>, AdClickEvent, AdClickEvent>.OnTimerContext ctx, Collector<AdClickEvent> out) throws Exception {
// When the timer fires, clear the state
countState.clear();
isSentState.clear();
}
@Override
public void open(Configuration parameters) throws Exception {
// Initialize the state handles
countState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("count",Long.class));
isSentState = getRuntimeContext().getState(new ValueStateDescriptor<Boolean>("isWri",Boolean.class));
}
@Override
public void processElement(AdClickEvent value, KeyedProcessFunction<Tuple2<Long, Long>, AdClickEvent, AdClickEvent>.Context ctx, Collector<AdClickEvent> out) throws Exception {
// Read the current state values
Long curCount = countState.value();
Boolean isSent = isSentState.value();
// Fall back to defaults while the state is still empty
if (null == curCount){
curCount = 0L;
}
if (null == isSent){
isSent = false;
}
// On the first click for this key, register a timer one day ahead to clear the state.
// The timestamp is derived from processing time, so a processing-time timer is registered.
if (curCount == 0){
long ts = ctx.timerService().currentProcessingTime();
long fixedtime = DateUtils.addDays(new Date(ts), 1).getTime();
ctx.timerService().registerProcessingTimeTimer(fixedtime);
}
// Check whether the count has reached the configured upper bound
if (curCount >= countUpperBound){
// If the user is not blacklisted yet, set the flag and emit a warning to the side output via ctx.output
if (!isSent){
isSentState.update(true);
ctx.output(new OutputTag<BlackListUserWarning>("warning"){},new BlackListUserWarning(value.getUserId(), value.getAdId(), "click over " + countUpperBound + " times."));
}
// Return without forwarding the event
return;
}
// Below the upper bound: update the count state and forward the original event
countState.update(curCount+1);
out.collect(value);
}
}
public static class MyAgg implements AggregateFunction<AdClickEvent,Long,Long>{
@Override
public Long createAccumulator() {
return 0L;
}
@Override
public Long add(AdClickEvent adClickEvent, Long aLong) {
return aLong+1;
}
@Override
public Long getResult(Long aLong) {
return aLong;
}
@Override
public Long merge(Long aLong, Long acc1) {
return acc1+aLong;
}
}
public static class WindowFun implements WindowFunction<Long, AdCountViewByProvince,String, TimeWindow>{
@Override
public void apply(String s, TimeWindow window, Iterable<Long> input, Collector<AdCountViewByProvince> out) throws Exception {
out.collect(new AdCountViewByProvince(s,String.valueOf(window.getEnd()), input.iterator().next()));
}
}
}
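The timer in MyProcess is computed with `DateUtils.addDays(..., 1)`, that is, 24 hours after the first click seen for a key, so each key resets on its own schedule. If the intent is instead to reset all blacklist state at the start of the next calendar day, the target timestamp could be computed along the lines of the sketch below. `NextMidnightSketch` and `nextMidnightMillis` are hypothetical names, and which time zone to use is an assumption that depends on the deployment.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;

final class NextMidnightSketch {
    /** Epoch milliseconds of the next 00:00 in the given zone, strictly after nowMillis. */
    static long nextMidnightMillis(long nowMillis, ZoneId zone) {
        return Instant.ofEpochMilli(nowMillis)
                .atZone(zone)          // wall-clock view of "now" in that zone
                .toLocalDate()         // drop the time of day
                .plusDays(1)           // move to the following calendar day
                .atStartOfDay(zone)    // 00:00 of that day in the same zone
                .toInstant()
                .toEpochMilli();
    }

    public static void main(String[] args) {
        System.out.println(nextMidnightMillis(0L, ZoneOffset.UTC));        // prints 86400000
        System.out.println(nextMidnightMillis(0L, ZoneOffset.ofHours(8))); // prints 57600000
    }
}
```

The result would then be passed to the timer service in place of `fixedtime`.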
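2.6 Malicious Login Monitoring
- Basic requirements
- A user who repeatedly fails to log in within a short time may be attempting an attack
- Raise an alert when the same user (possibly from different IPs) fails to log in consecutively within 2 seconds
- Approach
- Store the user's failed login events in ListState and register a timer that fires 2 seconds later, then check how many failed logins ListState holds
- For more precise detection, the CEP library can be used to pattern-match the event stream
Create the POJOs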
-
LoginEvent
private Long userId; private String ip; private String loginState; private Long timestamp;
-
LoginFailWarning
private Long userId; private Long firstFailTime; private Long lastFailTime; private String warningMsg;
Implementation
import bean.LoginEvent;
import bean.LoginFailWarning;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.util.ArrayList;
/**
* @author Kewei
* @Date 2022/3/11 11:15
*/
public class LoginFail2 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
DataStreamSource<String> inputStream = env.readTextFile("D:\\IdeaProjects\\UserBehaviorAnalysis\\LoginFailAnalysis\\LoginLog.csv");
SingleOutputStreamOperator<LoginEvent> dataStream = inputStream.map(line -> {
String[] field = line.split(",");
return new LoginEvent(new Long(field[0]), field[1], field[2], new Long(field[3]));
})
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<LoginEvent>(Time.seconds(2)) {
@Override
public long extractTimestamp(LoginEvent element) {
return element.getTimestamp() * 1000L;
}
});
// Key by user id and decide whether a login warning is needed for that user
SingleOutputStreamOperator<LoginFailWarning> result = dataStream
.keyBy(LoginEvent::getUserId)
.process(new MyLogin());
result.print();
env.execute();
}
public static class MyLogin extends KeyedProcessFunction<Long,LoginEvent,LoginFailWarning>{
// List state: login failure events seen within 2 s
ListState<LoginEvent> failList;
// Value state: timestamp of the registered timer.
// Deleting a timer later requires the exact timestamp it was registered with.
ValueState<Long> timeState;
@Override
public void onTimer(long timestamp, KeyedProcessFunction<Long, LoginEvent, LoginFailWarning>.OnTimerContext ctx, Collector<LoginFailWarning> out) throws Exception {
// The timer fired, so no successful login arrived within 2 s; check whether failList holds at least 2 events
ArrayList<LoginEvent> loginFails = Lists.newArrayList(failList.get().iterator());
int failtimes = loginFails.size();
if (failtimes >= 2){
// At least 2 failures: emit a warning
out.collect(new LoginFailWarning(ctx.getCurrentKey()
,loginFails.get(0).getTimestamp()
,loginFails.get(failtimes-1).getTimestamp()
,"fail times " + failtimes));
}
failList.clear();
timeState.clear();
}
@Override
public void open(Configuration parameters) throws Exception {
failList = getRuntimeContext().getListState(new ListStateDescriptor<LoginEvent>("fail list",LoginEvent.class));
timeState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("time",Long.class));
}
@Override
public void processElement(LoginEvent value, KeyedProcessFunction<Long, LoginEvent, LoginFailWarning>.Context ctx, Collector<LoginFailWarning> out) throws Exception {
// If the login failed, add the event to failList
if ("fail".equals(value.getLoginState())){
failList.add(value);
if (null == timeState.value()){
// No timer registered yet: register one 2 s ahead and remember its timestamp in state
long ts = value.getTimestamp() * 1000 + 2000L;
timeState.update(ts);
ctx.timerService().registerEventTimeTimer(ts);
}
}else {
// Login succeeded: if a timer was registered, delete it,
// then clear the state
if (timeState.value() !=null){
ctx.timerService().deleteEventTimeTimer(timeState.value());
}
timeState.clear();
failList.clear();
}
}
}
// My own attempt. When several values need to be kept in state, a list state is worth trying
public static class LoginFailProcess extends KeyedProcessFunction<Long,LoginEvent,LoginEvent>{
ValueState<Long> curCount;
ValueState<Boolean> isFail;
ValueState<Long> failTime;
@Override
public void onTimer(long timestamp, KeyedProcessFunction<Long, LoginEvent, LoginEvent>.OnTimerContext ctx, Collector<LoginEvent> out) throws Exception {
Long failCount = curCount.value();
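// Note: this call assumes a two-argument LoginFailWarning constructor, which the POJO listed above does not define
ctx.output(new OutputTag<LoginFailWarning>("log"){},new LoginFailWarning(ctx.getCurrentKey(),"login fail count: "+failCount));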
}
@Override
public void open(Configuration parameters) throws Exception {
failTime = getRuntimeContext().getState(new ValueStateDescriptor<Long>("fail time",Long.class));
isFail = getRuntimeContext().getState(new ValueStateDescriptor<Boolean>("is fail",Boolean.class));
curCount = getRuntimeContext().getState(new ValueStateDescriptor<Long>("count",Long.class));
}
@Override
public void processElement(LoginEvent value, KeyedProcessFunction<Long, LoginEvent, LoginEvent>.Context ctx, Collector<LoginEvent> out) throws Exception {
Long cuCnt = curCount.value();
Boolean isfail = isFail.value();
if (null == cuCnt){
cuCnt = 0L;
}
if (null == isfail){
isfail = false;
}
if ("fail".equals(value.getLoginState())){
curCount.update(cuCnt+1);
if (!isfail){
isFail.update(true);
long ts = value.getTimestamp() * 1000 + 2000L;
failTime.update(ts);
ctx.timerService().registerEventTimeTimer(ts);
}
return;
} else if ("success".equals(value.getLoginState())){
if (isfail){
Long failtime = failTime.value();
ctx.timerService().deleteEventTimeTimer(failtime);
curCount.clear();
isFail.clear();
failTime.clear();
}
}
out.collect(value);
}
}
}
Given the requirement, the flow can be simplified by defining the process function as follows:
// Custom KeyedProcessFunction implementation
public static class LoginFailDetectWarning extends KeyedProcessFunction<Long, LoginEvent, LoginFailWarning> {
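// Maximum number of consecutive login failures (the logic below only ever compares two failures and does not read this field)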
private Integer maxFailTimes;
public LoginFailDetectWarning(Integer maxFailTimes) {
this.maxFailTimes = maxFailTimes;
}
// State: login failure events within 2 seconds
ListState<LoginEvent> loginFailEventListState;
@Override
public void open(Configuration parameters) throws Exception {
loginFailEventListState = getRuntimeContext().getListState(new ListStateDescriptor<LoginEvent>("login-fail-list", LoginEvent.class));
}
// Each login event itself triggers the alert check; no timer is registered
@Override
public void processElement(LoginEvent value, Context ctx, Collector<LoginFailWarning> out) throws Exception {
// Check the login state of the current event
if( "fail".equals(value.getLoginState()) ){
// 1. Login failed: fetch earlier failure events from state and check whether one exists
Iterator<LoginEvent> iterator = loginFailEventListState.get().iterator();
if( iterator.hasNext() ){
// 1.1 A failure event already exists: check whether the two timestamps are within 2 seconds
// Fetch the stored failure event
LoginEvent firstFailEvent = iterator.next();
if( value.getTimestamp() - firstFailEvent.getTimestamp() <= 2 ){
// 1.1.1 Within 2 seconds: emit an alert
out.collect( new LoginFailWarning(value.getUserId(), firstFailEvent.getTimestamp(), value.getTimestamp(), "login fail 2 times in 2s") );
}
// Whether or not an alert was emitted, this event is done: replace the state with it
loginFailEventListState.clear();
loginFailEventListState.add(value);
} else {
// 1.2 No earlier failure: store the current event in ListState
loginFailEventListState.add(value);
}
} else {
// 2. Login succeeded: clear the state
loginFailEventListState.clear();
}
}
}
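The decision rule of this simplified function can be tried out without a Flink cluster. The sketch below replays one user's events in order and keeps only the most recent failure, which plays the role of the single event held in the ListState. `ConsecutiveFailSketch`, `Login` and `detect` are hypothetical names, timestamps are in seconds as in the log file, and the events are assumed to arrive in timestamp order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ConsecutiveFailSketch {
    /** Hypothetical stand-in for LoginEvent: state is "fail" or "success", ts is in seconds. */
    static final class Login {
        final String state;
        final long ts;

        Login(String state, long ts) {
            this.state = state;
            this.ts = ts;
        }
    }

    /** Returns one "first-last" entry for every pair of consecutive failures at most 2 s apart. */
    static List<String> detect(List<Login> events) {
        List<String> warnings = new ArrayList<>();
        Login lastFail = null; // stands in for the one event kept in state
        for (Login e : events) {
            if ("fail".equals(e.state)) {
                if (lastFail != null && e.ts - lastFail.ts <= 2) {
                    warnings.add(lastFail.ts + "-" + e.ts);
                }
                lastFail = e;    // the current failure replaces the stored one
            } else {
                lastFail = null; // a successful login resets the chain
            }
        }
        return warnings;
    }

    public static void main(String[] args) {
        List<Login> events = Arrays.asList(
                new Login("fail", 1), new Login("fail", 2), new Login("fail", 3),
                new Login("success", 4), new Login("fail", 5), new Login("fail", 9));
        System.out.println(detect(events)); // prints [1-2, 2-3]
    }
}
```

Because only pairs are compared, three failures in a row produce two warnings rather than one.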
CEP code
Use CEP to do the filtering
Add the CEP dependency
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
Code
import bean.LoginEvent;
import bean.LoginFailWarning;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternFlatSelectFunction;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.util.List;
import java.util.Map;
/**
* @author Kewei
* @Date 2022/3/11 13:28
*/
public class LoginFailWithCep {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
DataStreamSource<String> inputStream = env.readTextFile("D:\\IdeaProjects\\UserBehaviorAnalysis\\LoginFailAnalysis\\LoginLog.csv");
SingleOutputStreamOperator<LoginEvent> dataStream = inputStream.map(line -> {
String[] fields = line.split(",");
return new LoginEvent(new Long(fields[0]), fields[1], fields[2], new Long(fields[3]));
})
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<LoginEvent>() {
@Override
public long extractAscendingTimestamp(LoginEvent element) {
return element.getTimestamp() * 1000L;
}
});
// 1. Define a pattern that matches two consecutive login failures within 2 seconds
Pattern<LoginEvent, LoginEvent> loginFailPattern = Pattern.<LoginEvent>begin("firstFail").where(new SimpleCondition<LoginEvent>() {
@Override
public boolean filter(LoginEvent loginEvent) throws Exception {
return "fail".equals(loginEvent.getLoginState());
}
})
.next("secondFail").where(new SimpleCondition<LoginEvent>() {
@Override
public boolean filter(LoginEvent loginEvent) throws Exception {
return "fail".equals(loginEvent.getLoginState());
}
})
.within(Time.seconds(2));
// 2. Apply the pattern to the data stream to obtain a pattern stream
PatternStream<LoginEvent> patternStream = CEP.pattern(dataStream.keyBy(LoginEvent::getUserId), loginFailPattern);
// 3. Extract the complex events that match the pattern and convert them into warnings
SingleOutputStreamOperator<LoginFailWarning> result = patternStream.select(new LoginFailMatch());
result.print();
env.execute();
}
// Implement the PatternSelectFunction interface to emit the warning
public static class LoginFailMatch implements PatternSelectFunction<LoginEvent, LoginFailWarning> {
@Override
public LoginFailWarning select(Map<String, List<LoginEvent>> map) throws Exception {
LoginEvent firstFail = map.get("firstFail").iterator().next();
LoginEvent secondFail = map.get("secondFail").get(0);
return new LoginFailWarning(firstFail.getUserId(),firstFail.getTimestamp(), secondFail.getTimestamp(), "login fail 2 times");
}
}
}
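This only reports the first and second failure events. The version below improves on it with a CEP looping pattern (here, three consecutive failures within 5 seconds).
CEP looping pattern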
import bean.LoginEvent;
import bean.LoginFailWarning;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.util.List;
import java.util.Map;
/**
* @author Kewei
* @Date 2022/3/11 13:28
*/
public class LoginFailWithCep2 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
DataStreamSource<String> inputStream = env.readTextFile("D:\\IdeaProjects\\UserBehaviorAnalysis\\LoginFailAnalysis\\LoginLog.csv");
SingleOutputStreamOperator<LoginEvent> dataStream = inputStream.map(line -> {
String[] fields = line.split(",");
return new LoginEvent(new Long(fields[0]), fields[1], fields[2], new Long(fields[3]));
})
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<LoginEvent>() {
@Override
public long extractAscendingTimestamp(LoginEvent element) {
return element.getTimestamp() * 1000L;
}
});
// Define the pattern: three consecutive failures within 5 seconds
Pattern<LoginEvent, LoginEvent> loginFailPattern = Pattern.<LoginEvent>begin("failEvents").where(new SimpleCondition<LoginEvent>() {
@Override
public boolean filter(LoginEvent loginEvent) throws Exception {
return "fail".equals(loginEvent.getLoginState());
}
})
.times(3).consecutive()
.within(Time.seconds(5));
PatternStream<LoginEvent> patternStream = CEP.pattern(dataStream.keyBy(LoginEvent::getUserId), loginFailPattern);
SingleOutputStreamOperator<LoginFailWarning> result = patternStream.select(new LoginFailMatch());
result.print();
env.execute();
}
public static class LoginFailMatch implements PatternSelectFunction<LoginEvent, LoginFailWarning> {
@Override
public LoginFailWarning select(Map<String, List<LoginEvent>> map) throws Exception {
LoginEvent firstFail = map.get("failEvents").get(0);
LoginEvent lastFail = map.get("failEvents").get(map.get("failEvents").size()-1);
return new LoginFailWarning(firstFail.getUserId(),firstFail.getTimestamp(), lastFail.getTimestamp(), "login fail " + map.get("failEvents").size() + " times");
}
}
}
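2.7 Real-Time Order Payment Monitoring
- Basic requirements
- After a user places an order, the order should be given an expiry time, both to encourage the user to pay and to reduce system risk
- If the order has not been paid 15 minutes after it was placed, emit a monitoring message
- Approach
- Use the CEP library to pattern-match the event stream, with a time limit on the match
- Alternatively, use stateful programming and implement the logic in a process function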
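Whichever approach is used, the decision for a single order comes down to comparing the create and pay timestamps against the 15-minute limit. The sketch below states that rule in plain Java. `OrderTimeoutSketch` and `classify` are hypothetical names, timestamps are in seconds, the result strings are borrowed from the ProcessFunction version later in this section, and a `null` timestamp stands for an event that never arrived.

```java
final class OrderTimeoutSketch {
    static final long TIMEOUT_SECONDS = 15 * 60;

    /** Classifies one order from its create and pay timestamps (seconds); null means the event is missing. */
    static String classify(Long createTs, Long payTs) {
        if (createTs == null && payTs == null) {
            throw new IllegalArgumentException("no event for this order");
        }
        if (createTs == null) {
            return "payed but not found created log"; // pay seen, create never arrived
        }
        if (payTs == null) {
            return "timeout";                         // create seen, pay never arrived
        }
        // Both events present: the payment must come strictly before the deadline
        return payTs < createTs + TIMEOUT_SECONDS ? "payed successfully" : "payed but already timeout";
    }

    public static void main(String[] args) {
        System.out.println(classify(100L, 200L));  // prints payed successfully
        System.out.println(classify(100L, 1000L)); // prints payed but already timeout
        System.out.println(classify(100L, null));  // prints timeout
    }
}
```

In a streaming job the "never arrived" cases cannot be observed directly, which is why both implementations rely on a time limit (a pattern timeout or a timer) to decide when to stop waiting.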
Create the POJOs
-
OrderEvent
private Long orderId; private String eventType; private String txId; private Long timestamp;
-
OrderResult
private Long orderId; private String resultState;
-
ReceiptEvent
private String txId; private String payChannel; private Long timestamp;
CEP implementation
import bean.OrderEvent;
import bean.OrderResult;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.PatternTimeoutFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;
import java.util.List;
import java.util.Map;
/**
* @author Kewei
* @Date 2022/3/11 15:18
*/
public class OrderPayTimeout {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
DataStreamSource<String> inputStream = env.readTextFile("D:\\IdeaProjects\\UserBehaviorAnalysis\\OrderPayAnalysis\\OrderLog.csv");
SingleOutputStreamOperator<OrderEvent> dataStream = inputStream.map(line -> {
String[] field = line.split(",");
return new OrderEvent(new Long(field[0]), field[1], field[2], new Long(field[3]));
})
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<OrderEvent>() {
@Override
public long extractAscendingTimestamp(OrderEvent element) {
return element.getTimestamp() * 1000L;
}
});
// Define a pattern with a time limit
Pattern<OrderEvent, OrderEvent> orderPayPattern = Pattern.<OrderEvent>begin("create")
.where(new SimpleCondition<OrderEvent>() {
@Override
public boolean filter(OrderEvent orderEvent) throws Exception {
return "create".equals(orderEvent.getEventType());
}
})
.followedBy("pay").where(new SimpleCondition<OrderEvent>() {
@Override
public boolean filter(OrderEvent orderEvent) throws Exception {
return "pay".equals(orderEvent.getEventType());
}
})
.within(Time.minutes(15));
// Define a side-output tag for timed-out orders
OutputTag<OrderResult> outputTag = new OutputTag<OrderResult>("order-timeout"){};
// Apply the pattern to the input stream to obtain a pattern stream
PatternStream<OrderEvent> patternStream = CEP.pattern(dataStream.keyBy(OrderEvent::getOrderId), orderPayPattern);
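// Call select to extract and handle both matched and timed-out complex events
/**
* select takes three arguments:
* 1. the side-output tag that receives the timed-out results
* 2. the function that handles timed-out partial matches
* 3. the function that handles normal matches
*/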
SingleOutputStreamOperator<OrderResult> result = patternStream.select(outputTag, new OrderTimeoutSelect(), new OrderPaySelect());
result.print();
result.getSideOutput(outputTag).print("timeout");
env.execute();
}
// Custom handler for timed-out events
public static class OrderTimeoutSelect implements PatternTimeoutFunction<OrderEvent,OrderResult>{
@Override
public OrderResult timeout(Map<String, List<OrderEvent>> map, long l) throws Exception {
Long timeoutOrderId = map.get("create").iterator().next().getOrderId();
return new OrderResult(timeoutOrderId,"timeout " + l);
}
}
// Custom handler for normally matched events
public static class OrderPaySelect implements PatternSelectFunction<OrderEvent,OrderResult>{
@Override
public OrderResult select(Map<String, List<OrderEvent>> map) throws Exception {
return new OrderResult(map.get("pay").iterator().next().getOrderId(),"payed");
}
}
}
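ProcessFunction implementation
CEP is more concise, but ProcessFunction gives control over more of the details.
CEP is the better fit when events are related in complex ways.
ProcessFunction handles each event on its own and links events through state, which makes it more flexible.
Code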
import bean.OrderEvent;
import bean.OrderResult;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.PatternTimeoutFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.util.List;
import java.util.Map;
/**
* @author Kewei
* @Date 2022/3/11 15:18
*/
public class OrderPayTimeoutWithProcess {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
DataStreamSource<String> inputStream = env.readTextFile("D:\\IdeaProjects\\UserBehaviorAnalysis\\OrderPayAnalysis\\OrderLog.csv");
SingleOutputStreamOperator<OrderEvent> dataStream = inputStream.map(line -> {
String[] field = line.split(",");
return new OrderEvent(new Long(field[0]), field[1], field[2], new Long(field[3]));
})
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<OrderEvent>() {
@Override
public long extractAscendingTimestamp(OrderEvent element) {
return element.getTimestamp() * 1000L;
}
});
// Custom process function: the main output carries normally matched orders, the side output carries timeout warnings
SingleOutputStreamOperator<OrderResult> result = dataStream
.keyBy(OrderEvent::getOrderId)
.process(new OrderTimeoutProcess());
result.print("pay normally");
// The side-output tag must be created as an anonymous inner class (note the trailing {})
result.getSideOutput(new OutputTag<OrderResult>("timeout"){}).print("timeout");
env.execute();
}
public static class OrderTimeoutProcess extends KeyedProcessFunction<Long,OrderEvent,OrderResult>{
// State: whether a create / pay event has already arrived for this order
ValueState<Boolean> isCreateState;
ValueState<Boolean> isPayState;
// State: timestamp of the registered timer
ValueState<Long> timestemp;
@Override
public void onTimer(long timestamp, KeyedProcessFunction<Long, OrderEvent, OrderResult>.OnTimerContext ctx, Collector<OrderResult> out) throws Exception {
// When the timer fires, one of the two events must be missing
if (isPayState.value()){
// pay arrived, so create is missing
ctx.output(new OutputTag<OrderResult>("timeout"){},new OrderResult(ctx.getCurrentKey(), "payed but not found created log"));
}else {
// create arrived, so pay is missing
ctx.output(new OutputTag<OrderResult>("timeout"){},new OrderResult(ctx.getCurrentKey(),"timeout"));
}
isPayState.clear();
isCreateState.clear();
timestemp.clear();
}
@Override
public void open(Configuration parameters) throws Exception {
// The third argument is the default value; if it is omitted, the state reads as null until it is first updated
isCreateState = getRuntimeContext().getState(new ValueStateDescriptor<Boolean>("is create",Boolean.class,false));
isPayState = getRuntimeContext().getState(new ValueStateDescriptor<Boolean>("is pay",Boolean.class,false));
timestemp = getRuntimeContext().getState(new ValueStateDescriptor<Long>("times temp",Long.class));
}
@Override
public void processElement(OrderEvent value, KeyedProcessFunction<Long, OrderEvent, OrderResult>.Context ctx, Collector<OrderResult> out) throws Exception {
// Read the three state values
Boolean isCreate = isCreateState.value();
Boolean isPay = isPayState.value();
Long timesTemp = timestemp.value();
String eventType = value.getEventType();
// Is this a create event?
if ("create".equals(eventType)){
// If so and pay has already arrived, emit a normal result, clear the state and delete the timer
if (isPay){
out.collect(new OrderResult(value.getOrderId(),"payed successfully"));
isCreateState.clear();
isPayState.clear();
timestemp.clear();
ctx.timerService().deleteEventTimeTimer(timesTemp);
}else {
// create without pay: register a timer 15 minutes ahead and update the related state
long ts = (value.getTimestamp() + 15 * 60) * 1000;
ctx.timerService().registerEventTimeTimer(ts);
timestemp.update(ts);
isCreateState.update(true);
}
}else if ("pay".equals(eventType)){
if (isCreate){
// pay arrived and create was seen earlier
// Check for a timeout first: emit a normal result if in time, otherwise send the record to the side output
if (value.getTimestamp()*1000L < timesTemp){
out.collect(new OrderResult(value.getOrderId(),"payed successfully"));
}else {
ctx.output(new OutputTag<OrderResult>("timeout"){},new OrderResult(value.getOrderId(),"payed but already timeout"));
}
// Finally, clear the state and delete the timer
isCreateState.clear();
isPayState.clear();
timestemp.clear();
ctx.timerService().deleteEventTimeTimer(timesTemp);
}else {
// pay without create, caused by out-of-order data: register a timer and wait for create
ctx.timerService().registerEventTimeTimer(value.getTimestamp()*1000L);
timestemp.update(value.getTimestamp()*1000L);
isPayState.update(true);
}
}
}
}
}
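2.8 Real-Time Order Payment Reconciliation
- Basic requirements
- After a user places and pays for an order, the receipt record should be looked up and reconciled in real time
- If a payment record or a receipt record has no match, emit a notice
- Approach
- Read the order payment events and the receipt events from two separate streams and process them together
- Use connect to join the two streams and a CoProcessFunction to do the matching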
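The matching idea can be sketched in plain Java before looking at the Flink version: whichever side of a transaction arrives first is parked, and the arrival of its counterpart completes the match. `TxMatchSketch` and `onEvent` are hypothetical names. The sketch assumes each txId occurs at most once per stream and ignores time entirely, so anything still parked at the end counts as unmatched.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

final class TxMatchSketch {
    final List<String> matched = new ArrayList<>();
    final Set<String> pendingPays = new LinkedHashSet<>();     // pays still waiting for a receipt
    final Set<String> pendingReceipts = new LinkedHashSet<>(); // receipts still waiting for a pay

    /** Handles one event; kind is "pay" or "receipt". */
    void onEvent(String kind, String txId) {
        boolean isPay = "pay".equals(kind);
        Set<String> own = isPay ? pendingPays : pendingReceipts;
        Set<String> other = isPay ? pendingReceipts : pendingPays;
        if (other.remove(txId)) {
            matched.add(txId); // the counterpart was already waiting
        } else {
            own.add(txId);     // park this event until the counterpart shows up
        }
    }

    public static void main(String[] args) {
        TxMatchSketch s = new TxMatchSketch();
        s.onEvent("pay", "a");
        s.onEvent("receipt", "b");
        s.onEvent("pay", "b");
        s.onEvent("receipt", "d");
        s.onEvent("pay", "c");
        s.onEvent("receipt", "c");
        // prints [b, c] [a] [d]
        System.out.println(s.matched + " " + s.pendingPays + " " + s.pendingReceipts);
    }
}
```

In the Flink job the two sets correspond to keyed state per txId, and a timer replaces "the end of the input" as the moment when a parked event is declared unmatched.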
Create the POJOs
-
ReceiptEvent
private String txId; private String payChannel; private Long timestamp;
Code
import bean.OrderEvent;
import bean.ReceiptEvent;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
/**
* @author Kewei
* @Date 2022/3/11 16:22
*/
public class TxPayMatch {
private final static OutputTag<OrderEvent> unmatchedPays = new OutputTag<OrderEvent>("unmatched-pays") {};
private final static OutputTag<ReceiptEvent> unmatchedReceipts = new OutputTag<ReceiptEvent>("unmatched-receipts") {};
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
DataStreamSource<String> inputStream1 = env.readTextFile("D:\\IdeaProjects\\UserBehaviorAnalysis\\OrderPayAnalysis\\OrderLog.csv");
SingleOutputStreamOperator<OrderEvent> orderStream = inputStream1.map(line -> {
String[] fields = line.split(",");
return new OrderEvent(new Long(fields[0]), fields[1], fields[2], new Long(fields[3]));
})
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<OrderEvent>(Time.milliseconds(200)) {
@Override
public long extractTimestamp(OrderEvent element) {
return element.getTimestamp() * 1000L;
}
});
DataStreamSource<String> inputStream2 = env.readTextFile("D:\\IdeaProjects\\UserBehaviorAnalysis\\OrderPayAnalysis\\ReceiptLog.csv");
SingleOutputStreamOperator<ReceiptEvent> receiptStream = inputStream2.map(line -> {
String[] fields = line.split(",");
return new ReceiptEvent(fields[0], fields[1], new Long(fields[2]));
})
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<ReceiptEvent>(Time.milliseconds(200)) {
@Override
public long extractTimestamp(ReceiptEvent element) {
return element.getTimestamp() * 1000L;
}
});
// Drop events whose transaction id (txId) is empty, then use connect to join the two streams
SingleOutputStreamOperator<Tuple2<OrderEvent, ReceiptEvent>> result = orderStream.filter(data -> !"".equals(data.getTxId())).keyBy(OrderEvent::getTxId)
.connect(receiptStream.keyBy(ReceiptEvent::getTxId))
.process(new TxPayMatchDetect());
result.print("normal");
result.getSideOutput(unmatchedPays).print("unmatchedPays");
result.getSideOutput(unmatchedReceipts).print("unmatchedReceipts");
env.execute();
}
// Extend CoProcessFunction to process the two connected streams
public static class TxPayMatchDetect extends CoProcessFunction<OrderEvent,ReceiptEvent, Tuple2<OrderEvent,ReceiptEvent>>{
// Two pieces of keyed state, one per stream, holding the event that is still waiting for its counterpart
ValueState<OrderEvent> payState;
ValueState<ReceiptEvent> receiptState;
@Override
public void onTimer(long timestamp, CoProcessFunction<OrderEvent, ReceiptEvent, Tuple2<OrderEvent, ReceiptEvent>>.OnTimerContext ctx, Collector<Tuple2<OrderEvent, ReceiptEvent>> out) throws Exception {
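// Check whether the state holds a value, not whether the state handle itself is null.
// When the timer fires, either one of the two events never arrived (no match),
// or both arrived and the match was already emitted and the state cleared.
// Whichever state is non-empty, the other event is the one that never arrived.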
if (payState.value() != null){
ctx.output(unmatchedPays,payState.value());
}
if (receiptState.value() != null){
ctx.output(unmatchedReceipts,receiptState.value());
}
payState.clear();
receiptState.clear();
}
@Override
public void open(Configuration parameters) throws Exception {
payState = getRuntimeContext().getState(new ValueStateDescriptor<OrderEvent>("pay state",OrderEvent.class));
receiptState = getRuntimeContext().getState(new ValueStateDescriptor<ReceiptEvent>("receipt state",ReceiptEvent.class));
}
@Override
public void processElement1(OrderEvent value, CoProcessFunction<OrderEvent, ReceiptEvent, Tuple2<OrderEvent, ReceiptEvent>>.Context ctx, Collector<Tuple2<OrderEvent, ReceiptEvent>> out) throws Exception {
// A pay event arrived: check whether the matching receipt event is already here
ReceiptEvent receiptEvent = receiptState.value();
if (null != receiptEvent){
// The receipt is present, so it arrived first: emit the matched pair and clear the state
out.collect(new Tuple2<>(value,receiptEvent));
payState.clear();
receiptState.clear();
}else {
// The receipt has not arrived yet: register a timer 5 s after the pay event, store the pay event and wait
ctx.timerService().registerEventTimeTimer((value.getTimestamp() + 5)*1000L);
payState.update(value);
}
}
@Override
public void processElement2(ReceiptEvent value, CoProcessFunction<OrderEvent, ReceiptEvent, Tuple2<OrderEvent, ReceiptEvent>>.Context ctx, Collector<Tuple2<OrderEvent, ReceiptEvent>> out) throws Exception {
// A receipt event arrived: check whether the matching pay event is already here
OrderEvent orderEvent = payState.value();
if (null != orderEvent){
// The pay event is present, so it arrived first: emit the matched pair and clear the state
out.collect(new Tuple2<>(orderEvent,value));
payState.clear();
receiptState.clear();
}else {
// The pay event has not arrived yet: register a timer 3 s after the receipt event, store the receipt and wait
ctx.timerService().registerEventTimeTimer((value.getTimestamp() + 3) * 1000L);
receiptState.update(value);
}
}
}
}
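The matching rule in TxPayMatchDetect can be traced without a Flink job. Below is a minimal plain-Java sketch of that rule, not the author's code: the class TxMatchSketch, its Event type and the output labels are hypothetical names made up for illustration. It assumes that events arrive in timestamp order, that timestamps are in seconds, and that the watermark equals the timestamp of the latest event, which is a simplification of the 200 ms bounded out-of-orderness used above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Flink-free model of the pay/receipt matching rule.
class TxMatchSketch {
    static final class Event {
        final boolean isPay;   // true = pay event, false = receipt event
        final String txId;
        final long ts;         // event time in seconds
        Event(boolean isPay, String txId, long ts) {
            this.isPay = isPay;
            this.txId = txId;
            this.ts = ts;
        }
    }

    static final long PAY_WAIT = 5;      // a pay event waits 5 s for its receipt
    static final long RECEIPT_WAIT = 3;  // a receipt waits 3 s for its pay event

    static List<String> run(List<Event> eventsInTimeOrder) {
        // txId -> timestamp of the event still waiting (plays the role of ValueState)
        Map<String, Long> pendingPays = new LinkedHashMap<>();
        Map<String, Long> pendingReceipts = new LinkedHashMap<>();
        List<String> out = new ArrayList<>();
        for (Event e : eventsInTimeOrder) {
            // Assumption: the new event advances the "watermark" to e.ts,
            // so every deadline strictly before e.ts fires first.
            expire(pendingPays, PAY_WAIT, e.ts, "unmatched-pay ", out);
            expire(pendingReceipts, RECEIPT_WAIT, e.ts, "unmatched-receipt ", out);
            Map<String, Long> other = e.isPay ? pendingReceipts : pendingPays;
            Map<String, Long> own = e.isPay ? pendingPays : pendingReceipts;
            if (other.remove(e.txId) != null) {
                out.add("matched " + e.txId);   // counterpart was waiting
            } else {
                own.put(e.txId, e.ts);          // start waiting
            }
        }
        // End of input: everything still waiting is unmatched.
        expire(pendingPays, PAY_WAIT, Long.MAX_VALUE, "unmatched-pay ", out);
        expire(pendingReceipts, RECEIPT_WAIT, Long.MAX_VALUE, "unmatched-receipt ", out);
        return out;
    }

    // Removes every waiting event whose deadline (ts + waitSec) lies before 'now'.
    static void expire(Map<String, Long> pending, long waitSec, long now,
                       String label, List<String> out) {
        pending.entrySet().removeIf(en -> {
            if (en.getValue() + waitSec < now) {
                out.add(label + en.getKey());
                return true;
            }
            return false;
        });
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event(true, "tx1", 10),
                new Event(false, "tx1", 12),
                new Event(true, "tx2", 13),
                new Event(false, "tx3", 14),
                new Event(false, "tx2", 30));
        System.out.println(run(events));
    }
}
```

With the sample input in main, tx1 is matched, the tx2 pay event times out before its receipt shows up at second 30, and the tx3 receipt never sees a pay event, so the sketch prints [matched tx1, unmatched-pay tx2, unmatched-receipt tx3, unmatched-receipt tx2]. A real Flink job may behave differently for borderline cases, because watermarks are emitted periodically rather than after every event.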
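Implementation using Join
The drawback of this approach is that it only yields the successfully matched pairs; records that fail to match cannot be obtained.
Code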
import bean.OrderEvent;
import bean.ReceiptEvent;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
/**
* @author Kewei
* @Date 2022/3/11 16:22
*/
public class TxPayMatchByJoin {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
DataStreamSource<String> inputStream1 = env.readTextFile("D:\\IdeaProjects\\UserBehaviorAnalysis\\OrderPayAnalysis\\OrderLog.csv");
SingleOutputStreamOperator<OrderEvent> orderStream = inputStream1.map(line -> {
String[] fields = line.split(",");
return new OrderEvent(Long.valueOf(fields[0]), fields[1], fields[2], Long.valueOf(fields[3]));
})
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<OrderEvent>(Time.milliseconds(200)) {
@Override
public long extractTimestamp(OrderEvent element) {
return element.getTimestamp() * 1000L;
}
});
DataStreamSource<String> inputStream2 = env.readTextFile("D:\\IdeaProjects\\UserBehaviorAnalysis\\OrderPayAnalysis\\ReceiptLog.csv");
SingleOutputStreamOperator<ReceiptEvent> receiptStream = inputStream2.map(line -> {
String[] fields = line.split(",");
return new ReceiptEvent(fields[0], fields[1], Long.valueOf(fields[2]));
})
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<ReceiptEvent>(Time.milliseconds(200)) {
@Override
public long extractTimestamp(ReceiptEvent element) {
return element.getTimestamp() * 1000L;
}
});
// Interval-join the two keyed streams to obtain the matched pairs
SingleOutputStreamOperator<Tuple2<OrderEvent, ReceiptEvent>> result = orderStream.filter(data -> !"".equals(data.getTxId()))
.keyBy(OrderEvent::getTxId)
.intervalJoin(receiptStream.keyBy(ReceiptEvent::getTxId))
.between(Time.seconds(-3), Time.seconds(5)) // the receipt timestamp must fall within [order - 3 s, order + 5 s]
.process(new TxPayMatchDetectByJoin());
result.print();
env.execute();
}
// Custom ProcessJoinFunction, called once for every joined pair
public static class TxPayMatchDetectByJoin extends ProcessJoinFunction<OrderEvent,ReceiptEvent,Tuple2<OrderEvent,ReceiptEvent>>{
@Override
public void processElement(OrderEvent left, ReceiptEvent right, ProcessJoinFunction<OrderEvent, ReceiptEvent, Tuple2<OrderEvent, ReceiptEvent>>.Context ctx, Collector<Tuple2<OrderEvent, ReceiptEvent>> out) throws Exception {
out.collect(new Tuple2<>(left,right));
}
}
}
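To make the bounds in between(Time.seconds(-3), Time.seconds(5)) concrete, here is a small plain-Java sketch of the interval condition. It is only an illustration under stated assumptions: the class IntervalJoinSketch and the method joins are hypothetical names, timestamps are plain seconds, and both bounds are treated as inclusive, which is the default for Flink's interval join.

```java
// Hypothetical helper that mirrors the interval-join condition for one pair of events.
class IntervalJoinSketch {
    // A receipt joins an order when
    // orderTs + lower <= receiptTs <= orderTs + upper (both ends inclusive).
    static boolean joins(long orderTs, long receiptTs, long lower, long upper) {
        return receiptTs >= orderTs + lower && receiptTs <= orderTs + upper;
    }

    public static void main(String[] args) {
        long lower = -3;  // receipt may be up to 3 s earlier than the order
        long upper = 5;   // receipt may be up to 5 s later than the order
        System.out.println(joins(100, 97, lower, upper));   // true
        System.out.println(joins(100, 105, lower, upper));  // true
        System.out.println(joins(100, 106, lower, upper));  // false
    }
}
```

The third pair falls outside the interval, so processElement is never called for it and neither event appears in the output. This is why the join version cannot report unmatched records, while the CoProcessFunction version can route them to side outputs.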