The core runtime class of CEP is
/flinkCEP-Patterns/src/main/java/org/apache/flink/cep/operator/CepOperator.java
In its processElement method there is a branch that depends on isProcessingTime:
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
    if (isProcessingTime) {
        if (comparator == null) {
            // there can be no out of order elements in processing time
            NFAState nfaState = getNFAState();
            long timestamp = getProcessingTimeService().getCurrentProcessingTime();
            advanceTime(nfaState, timestamp);
            processEvent(nfaState, element.getValue(), timestamp);
            updateNFA(nfaState);
        } else {
            long currentTime = timerService.currentProcessingTime();
            bufferEvent(element.getValue(), currentTime);
            // register a timer for the next millisecond to sort and emit buffered data
            timerService.registerProcessingTimeTimer(VoidNamespace.INSTANCE, currentTime + 1);
        }
    } else {
        long timestamp = element.getTimestamp();
        IN value = element.getValue();
        // In event-time processing we assume correctness of the watermark.
        // Events with timestamp smaller than or equal with the last seen watermark are
        // considered late.
        // Late events are put in a dedicated side output, if the user has specified one.
        if (timestamp > timerService.currentWatermark()) {
            // we have an event with a valid timestamp, so
            // we buffer it until we receive the proper watermark.
            saveRegisterWatermarkTimer();
            bufferEvent(value, timestamp);
        } else if (lateDataOutputTag != null) {
            output.collect(lateDataOutputTag, element);
        } else {
            numLateRecordsDropped.inc();
        }
    }
}
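To make the branching easier to follow, here is a minimal sketch that mirrors only the decision structure of processElement in plain Java. It does not use any Flink classes: ProcessElementRouting, Route and route(...) are hypothetical names made up for illustration, and the boolean parameters stand in for the comparator == null and lateDataOutputTag != null checks.

```java
/** Hypothetical stand-in that mirrors only the branching of CepOperator.processElement. */
final class ProcessElementRouting {

    /** The five outcomes an element can have in the method shown above. */
    enum Route {
        PROCESS_IMMEDIATELY,    // processing time, no comparator
        BUFFER_AND_SORT,        // processing time, comparator set: buffer, sort on next-ms timer
        BUFFER_UNTIL_WATERMARK, // event time, timestamp is ahead of the watermark
        LATE_SIDE_OUTPUT,       // event time, late, side output configured
        DROPPED_AS_LATE         // event time, late, no side output
    }

    static Route route(
            boolean isProcessingTime,
            boolean hasComparator,
            long elementTimestamp,
            long currentWatermark,
            boolean hasLateOutputTag) {
        if (isProcessingTime) {
            // Timestamps and watermarks are ignored entirely on this path.
            return hasComparator ? Route.BUFFER_AND_SORT : Route.PROCESS_IMMEDIATELY;
        }
        if (elementTimestamp > currentWatermark) {
            return Route.BUFFER_UNTIL_WATERMARK;
        }
        return hasLateOutputTag ? Route.LATE_SIDE_OUTPUT : Route.DROPPED_AS_LATE;
    }

    public static void main(String[] args) {
        System.out.println(route(true, false, 0L, 0L, false));
        System.out.println(route(false, false, 20L, 10L, false));
        System.out.println(route(false, false, 10L, 10L, false));
    }
}
```

The point of the sketch is that with isProcessingTime == false an element is only ever buffered, sent to the late side output, or dropped; the immediate-processing path is unreachable.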
isProcessingTime is passed in through the constructor:
public CepOperator(
        final TypeSerializer<IN> inputSerializer,
        final boolean isProcessingTime,
        final NFACompiler.NFAFactory<IN> nfaFactory,
        @Nullable final EventComparator<IN> comparator,
        @Nullable final AfterMatchSkipStrategy afterMatchSkipStrategy,
        final PatternProcessFunction<IN, OUT> function,
        @Nullable final OutputTag<IN> lateDataOutputTag) {
    super(function);
    this.inputSerializer = Preconditions.checkNotNull(inputSerializer);
    this.nfaFactory = Preconditions.checkNotNull(nfaFactory);
    this.isProcessingTime = isProcessingTime;
    this.comparator = comparator;
    this.lateDataOutputTag = lateDataOutputTag;
    if (afterMatchSkipStrategy == null) {
        this.afterMatchSkipStrategy = AfterMatchSkipStrategy.noSkip();
    } else {
        this.afterMatchSkipStrategy = afterMatchSkipStrategy;
    }
}
Tracing where the operator is created leads to org.apache.flink.cep.PatternStreamBuilder.build(TypeInformation, PatternProcessFunction<IN, OUT>); the builder itself comes from the forStreamAndPattern function of the same class:
// ---------------------------------------- factory-like methods ---------------------------------------- //
static <IN> PatternStreamBuilder<IN> forStreamAndPattern(
        final DataStream<IN> inputStream, final Pattern<IN, ?> pattern) {
    return new PatternStreamBuilder<>(
            inputStream, pattern, TimeBehaviour.EventTime, null, null);
}
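As you can see, TimeBehaviour.EventTime is hard-coded here, regardless of which time characteristic the current environment uses. This is the root cause of processing time not being supported. (P.S. ingestion time still works, because processElement is a plain if/else and ingestion time falls into the event-time branch.) The code below then shows why isProcessingTime is always false: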
<OUT, K> SingleOutputStreamOperator<OUT> build(
        final TypeInformation<OUT> outTypeInfo,
        final PatternProcessFunction<IN, OUT> processFunction) {
    checkNotNull(outTypeInfo);
    checkNotNull(processFunction);
    final TypeSerializer<IN> inputSerializer =
            inputStream.getType().createSerializer(inputStream.getExecutionConfig());
    final boolean isProcessingTime = timeBehaviour == TimeBehaviour.ProcessingTime;
    final boolean timeoutHandling = processFunction instanceof TimedOutPartialMatchHandler;
    final NFACompiler.NFAFactory<IN> nfaFactory =
            NFACompiler.compileFactory(pattern, timeoutHandling);
    final CepOperator<IN, K, OUT> operator =
            new CepOperator<>(
                    inputSerializer,
                    isProcessingTime,
                    nfaFactory,
                    comparator,
                    pattern.getAfterMatchSkipStrategy(),
                    processFunction,
                    lateDataOutputTag);
    final SingleOutputStreamOperator<OUT> patternStream;
    if (inputStream instanceof KeyedStream) {
        KeyedStream<IN, K> keyedStream = (KeyedStream<IN, K>) inputStream;
        patternStream = keyedStream.transform("CepOperator", outTypeInfo, operator);
    } else {
        KeySelector<IN, Byte> keySelector = new NullByteKeySelector<>();
        patternStream =
                inputStream
                        .keyBy(keySelector)
                        .transform("GlobalCepOperator", outTypeInfo, operator)
                        .forceNonParallel();
    }
    return patternStream;
}
Once the cause is known, the fix is straightforward. To support processing time (which Flink does not really encourage), forStreamAndPattern can be changed to:
// Requires: import org.apache.flink.streaming.api.TimeCharacteristic;
static <IN> PatternStreamBuilder<IN> forStreamAndPattern(
        final DataStream<IN> inputStream, final Pattern<IN, ?> pattern) {
    // Read the time characteristic configured on the execution environment
    // instead of hard-coding event time.
    final TimeCharacteristic timeCharacteristic =
            inputStream.getExecutionEnvironment().getStreamTimeCharacteristic();
    // Only ProcessingTime switches the behaviour; IngestionTime and EventTime
    // keep the original event-time path.
    final TimeBehaviour timeBehaviour =
            timeCharacteristic == TimeCharacteristic.ProcessingTime
                    ? TimeBehaviour.ProcessingTime
                    : TimeBehaviour.EventTime;
    return new PatternStreamBuilder<>(inputStream, pattern, timeBehaviour, null, null);
}
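The effect of the change on the isProcessingTime flag can be checked without a Flink cluster. The following is a small sketch under stated assumptions: the two enums are local stand-ins that only copy the constant names of Flink's TimeCharacteristic and PatternStreamBuilder.TimeBehaviour, and TimeBehaviourSelection, original(...) and patched(...) are hypothetical names for the factory before and after the modification.

```java
/** Hypothetical model of how the factory choice feeds the isProcessingTime flag. */
final class TimeBehaviourSelection {

    /** Local stand-in mirroring the constant names of Flink's TimeCharacteristic. */
    enum TimeCharacteristic { ProcessingTime, IngestionTime, EventTime }

    /** Local stand-in mirroring PatternStreamBuilder.TimeBehaviour. */
    enum TimeBehaviour { ProcessingTime, EventTime }

    /** Original factory: the environment setting is ignored. */
    static TimeBehaviour original(TimeCharacteristic env) {
        return TimeBehaviour.EventTime;
    }

    /** Modified factory: processing time is honoured, everything else stays event time. */
    static TimeBehaviour patched(TimeCharacteristic env) {
        return env == TimeCharacteristic.ProcessingTime
                ? TimeBehaviour.ProcessingTime
                : TimeBehaviour.EventTime;
    }

    /** Same derivation as in build(...). */
    static boolean isProcessingTime(TimeBehaviour behaviour) {
        return behaviour == TimeBehaviour.ProcessingTime;
    }

    public static void main(String[] args) {
        for (TimeCharacteristic env : TimeCharacteristic.values()) {
            System.out.println(
                    env
                            + ": original=" + isProcessingTime(original(env))
                            + ", patched=" + isProcessingTime(patched(env)));
        }
    }
}
```

In this model the original factory yields false for every environment setting, while the modified one yields true only for ProcessingTime, which is the behaviour the change above is meant to produce.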
With this change, both the demo from the official documentation and CEP jobs on 1.12 that use processing time run.