需求：
从登录日志中匹配恶意登录模式（如果同一用户在10秒内连续登录失败三次，则视为恶意登录），从而找出哪些用户名存在恶意登录。
package cep
import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import java.util
/**
 * One entry of the login log consumed by [[TestCEPByLogin]].
 *
 * That program reports a user name as a malicious login when the user fails to
 * log in three times in a row within 10 seconds.
 *
 * @Author yqq
 * @Date 2021/12/28 23:14
 * @Version 1.0
 * @param id        id of the login log entry
 * @param name      user name
 * @param eventType login outcome (success or failure; the sample data uses "success" / "fail")
 * @param eventTime login time, with one-second precision
 */
case class LoginEvent(
  id: Long,
  name: String,
  eventType: String,
  eventTime: Long
)
/**
 * Detects malicious logins with Flink CEP.
 *
 * A user name is reported when that user produces three strictly consecutive
 * "fail" login events whose span is at most 10 seconds of event time.
 */
object TestCEPByLogin {

  /**
   * Builds a small bounded login-event stream, applies the pattern per user
   * name, and prints one line per detected match.
   *
   * @param args unused
   */
  def main(args: Array[String]): Unit = {
    val streamEvn = StreamExecutionEnvironment.getExecutionEnvironment
    streamEvn.setParallelism(1)
    // Use event-time semantics.
    // NOTE(review): deprecated since Flink 1.12, where event time is already the default.
    streamEvn.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    import org.apache.flink.streaming.api.scala._

    // Pattern state names. They are shared by the pattern definition and the
    // select function so the lookups in `select` cannot drift from the pattern.
    val firstFail = "start"
    val secondFail = "fail2"
    val thirdFail = "fail3"

    // 1. Create the input event stream from a sample login log.
    val stream: DataStream[LoginEvent] = streamEvn.fromCollection(List(
      LoginEvent(1, "yqq", "fail", 1577080451), // eventTime is in seconds
      LoginEvent(2, "yqq", "fail", 1577080452),
      LoginEvent(3, "yqq", "fail", 1577080453),
      LoginEvent(4, "zifan", "fail", 1577080459),
      LoginEvent(5, "zifan", "success", 1577080460),
      LoginEvent(6, "yqq", "fail", 1577080463)
    )).assignAscendingTimestamps(_.eventTime * 1000) // event-time timestamps must be in milliseconds

    // 2. Define the pattern: three strictly contiguous (`next`) "fail" events,
    //    with the whole sequence spanning at most 10 seconds.
    //    `==` is used instead of `.equals` so a null eventType does not throw.
    val pattern: Pattern[LoginEvent, LoginEvent] = Pattern.begin[LoginEvent](firstFail).where(_.eventType == "fail")
      .next(secondFail).where(_.eventType == "fail")
      .next(thirdFail).where(_.eventType == "fail")
      .within(Time.seconds(10))

    // 3. Apply the pattern separately to each user name.
    val patternStream: PatternStream[LoginEvent] = CEP.pattern(stream.keyBy(_.name), pattern)

    // 4. Turn every match into an output line and print it.
    val result: DataStream[String] = patternStream.select(new PatternSelectFunction[LoginEvent, String] {
      override def select(map: util.Map[String, util.List[LoginEvent]]): String = {
        // Look each state up by its name instead of walking map.keySet(): the
        // java.util.Map contract promises no key order. None of the states has a
        // quantifier, so each list holds exactly one event.
        val e1: LoginEvent = map.get(firstFail).get(0)
        val e2: LoginEvent = map.get(secondFail).get(0)
        val e3: LoginEvent = map.get(thirdFail).get(0)
        s"用户名:${e1.name}登录时间:${e1.eventTime}:${e2.eventTime}:${e3.eventTime}"
      }
    })
    result.print()
    streamEvn.execute()
  }
}