The core runtime class of CEP is
 /flinkCEP-Patterns/src/main/java/org/apache/flink/cep/operator/CepOperator.java
 In this class, the processElement method branches on isProcessingTime:
 @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        if (isProcessingTime) {
            if (comparator == null) {
                // there can be no out of order elements in processing time
                NFAState nfaState = getNFAState();
                long timestamp = getProcessingTimeService().getCurrentProcessingTime();
                advanceTime(nfaState, timestamp);
                processEvent(nfaState, element.getValue(), timestamp);
                updateNFA(nfaState);
            } else {
                long currentTime = timerService.currentProcessingTime();
                bufferEvent(element.getValue(), currentTime);
                // register a timer for the next millisecond to sort and emit buffered data
                timerService.registerProcessingTimeTimer(VoidNamespace.INSTANCE, currentTime + 1);
            }
        } else {
            long timestamp = element.getTimestamp();
            IN value = element.getValue();
            // In event-time processing we assume correctness of the watermark.
            // Events with timestamp smaller than or equal with the last seen watermark are
            // considered late.
            // Late events are put in a dedicated side output, if the user has specified one.
            if (timestamp > timerService.currentWatermark()) {
                // we have an event with a valid timestamp, so
                // we buffer it until we receive the proper watermark.
                saveRegisterWatermarkTimer();
                bufferEvent(value, timestamp);
            } else if (lateDataOutputTag != null) {
                output.collect(lateDataOutputTag, element);
            } else {
                numLateRecordsDropped.inc();
            }
        }
    }
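
To make the branching above easier to follow, here is a minimal sketch that models only the decision logic, without any Flink dependency. The class TimeDispatchSketch, the Action enum, and the dispatch method are hypothetical names introduced for illustration; they are not part of Flink.

```java
// Flink-free model of the branching in processElement. Every name below is a
// hypothetical stand-in for illustration, not part of the Flink API.
class TimeDispatchSketch {

    enum Action {
        PROCESS_IMMEDIATELY,     // processing time, no comparator
        BUFFER_UNTIL_TIMER,      // processing time, comparator set
        BUFFER_UNTIL_WATERMARK,  // event time, timestamp ahead of the watermark
        SIDE_OUTPUT_LATE,        // event time, late element, side output configured
        DROP_LATE                // event time, late element, no side output
    }

    static Action dispatch(
            boolean isProcessingTime,
            boolean hasComparator,
            long elementTimestamp,
            long currentWatermark,
            boolean hasLateOutputTag) {
        if (isProcessingTime) {
            // Without a comparator there is nothing to sort, so the element is handled at once.
            return hasComparator ? Action.BUFFER_UNTIL_TIMER : Action.PROCESS_IMMEDIATELY;
        }
        // Event-time path: only timestamps strictly greater than the watermark are on time.
        if (elementTimestamp > currentWatermark) {
            return Action.BUFFER_UNTIL_WATERMARK;
        }
        return hasLateOutputTag ? Action.SIDE_OUTPUT_LATE : Action.DROP_LATE;
    }

    public static void main(String[] args) {
        System.out.println(dispatch(true, false, 0L, 0L, false));
        System.out.println(dispatch(false, false, 5L, 10L, false));
    }
}
```

The point of the model is that the whole processing-time half of the method is reachable only when isProcessingTime is true.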
 
isProcessingTime is passed in through the constructor:
public CepOperator(
            final TypeSerializer<IN> inputSerializer,
            final boolean isProcessingTime,
            final NFACompiler.NFAFactory<IN> nfaFactory,
            @Nullable final EventComparator<IN> comparator,
            @Nullable final AfterMatchSkipStrategy afterMatchSkipStrategy,
            final PatternProcessFunction<IN, OUT> function,
            @Nullable final OutputTag<IN> lateDataOutputTag) {
        super(function);
        this.inputSerializer = Preconditions.checkNotNull(inputSerializer);
        this.nfaFactory = Preconditions.checkNotNull(nfaFactory);
        this.isProcessingTime = isProcessingTime;
        this.comparator = comparator;
        this.lateDataOutputTag = lateDataOutputTag;
        if (afterMatchSkipStrategy == null) {
            this.afterMatchSkipStrategy = AfterMatchSkipStrategy.noSkip();
        } else {
            this.afterMatchSkipStrategy = afterMatchSkipStrategy;
        }
    }
 
Tracing where the operator is created leads to the build(TypeInformation, PatternProcessFunction<IN, OUT>) method of org.apache.flink.cep.PatternStreamBuilder. The builder itself is created by the forStreamAndPattern function:
    // ---------------------------------------- factory-like methods ---------------------------------------- //
    static <IN> PatternStreamBuilder<IN> forStreamAndPattern(
            final DataStream<IN> inputStream, final Pattern<IN, ?> pattern) {
        return new PatternStreamBuilder<>(
                inputStream, pattern, TimeBehaviour.EventTime, null, null);
    }
 
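As you can see, TimeBehaviour.EventTime is hard-coded here, regardless of which time mode the current environment uses. This is the root cause of processing time not being supported. (P.S.: ingestion time still works, because the if/else in processElement routes it through the event-time branch.) The build code below then shows that isProcessingTime is always false.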
<OUT, K> SingleOutputStreamOperator<OUT> build(final TypeInformation<OUT> outTypeInfo,
			final PatternProcessFunction<IN, OUT> processFunction) {
		checkNotNull(outTypeInfo);
		checkNotNull(processFunction);
		final TypeSerializer<IN> inputSerializer = inputStream.getType()
				.createSerializer(inputStream.getExecutionConfig());
		final boolean isProcessingTime = timeBehaviour == TimeBehaviour.ProcessingTime;
		final boolean timeoutHandling = processFunction instanceof TimedOutPartialMatchHandler;
		final NFACompiler.NFAFactory<IN> nfaFactory = NFACompiler.compileFactory(pattern, timeoutHandling);
		final CepOperator<IN, K, OUT> operator = new CepOperator<>(inputSerializer, isProcessingTime, nfaFactory,
				comparator, pattern.getAfterMatchSkipStrategy(), processFunction, lateDataOutputTag);
		final SingleOutputStreamOperator<OUT> patternStream;
		if (inputStream instanceof KeyedStream) {
			KeyedStream<IN, K> keyedStream = (KeyedStream<IN, K>) inputStream;
			patternStream = keyedStream.transform("CepOperator", outTypeInfo, operator);
		} else {
			KeySelector<IN, Byte> keySelector = new NullByteKeySelector<>();
			patternStream = inputStream.keyBy(keySelector).transform("GlobalCepOperator", outTypeInfo, operator)
					.forceNonParallel();
		}
		return patternStream;
	}
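
The chain from the factory method to the flag can be traced in a small stand-alone sketch. The two enums below are local stand-ins that only mimic the names of Flink's TimeCharacteristic and TimeBehaviour, and HardCodedTimeBehaviourSketch with its methods is a hypothetical illustration, not Flink code.

```java
// Stand-alone model of how the unpatched code derives isProcessingTime.
// The enums are local stand-ins that only mimic the Flink names.
class HardCodedTimeBehaviourSketch {

    enum TimeCharacteristic { ProcessingTime, IngestionTime, EventTime }

    enum TimeBehaviour { ProcessingTime, EventTime }

    // Models the unpatched forStreamAndPattern: the environment setting is ignored.
    static TimeBehaviour originalBehaviour(TimeCharacteristic environmentSetting) {
        return TimeBehaviour.EventTime;
    }

    // Models the flag computed in build().
    static boolean isProcessingTime(TimeBehaviour timeBehaviour) {
        return timeBehaviour == TimeBehaviour.ProcessingTime;
    }

    public static void main(String[] args) {
        for (TimeCharacteristic setting : TimeCharacteristic.values()) {
            boolean flag = isProcessingTime(originalBehaviour(setting));
            System.out.println(setting + " -> isProcessingTime=" + flag);
        }
    }
}
```

Whatever the environment is configured with, the flag in this model comes out false, which matches the behaviour described above.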
 
Once the cause is found, the fix is straightforward. To support processing time (which Flink does not really encourage), change forStreamAndPattern to:
    // Requires: import org.apache.flink.streaming.api.TimeCharacteristic;
    static <IN> PatternStreamBuilder<IN> forStreamAndPattern(
            final DataStream<IN> inputStream, final Pattern<IN, ?> pattern) {
        // Follow the time characteristic configured on the execution environment
        // instead of always hard-coding event time.
        final TimeCharacteristic characteristic =
                inputStream.getExecutionEnvironment().getStreamTimeCharacteristic();
        final TimeBehaviour timeBehaviour =
                characteristic == TimeCharacteristic.ProcessingTime
                        ? TimeBehaviour.ProcessingTime
                        : TimeBehaviour.EventTime;
        return new PatternStreamBuilder<>(inputStream, pattern, timeBehaviour, null, null);
    }
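
The selection rule used by this change can be checked in isolation. The sketch below is a hypothetical, Flink-free model: the enums are local stand-ins that only mimic the Flink names, and PatchedTimeBehaviourSketch and patchedBehaviour are names introduced for illustration.

```java
// Stand-alone model of the selection rule in the modified forStreamAndPattern.
// The enums are local stand-ins that only mimic the Flink names.
class PatchedTimeBehaviourSketch {

    enum TimeCharacteristic { ProcessingTime, IngestionTime, EventTime }

    enum TimeBehaviour { ProcessingTime, EventTime }

    // Only ProcessingTime switches the behaviour; IngestionTime and EventTime
    // both stay on the event-time path.
    static TimeBehaviour patchedBehaviour(TimeCharacteristic environmentSetting) {
        return environmentSetting == TimeCharacteristic.ProcessingTime
                ? TimeBehaviour.ProcessingTime
                : TimeBehaviour.EventTime;
    }

    public static void main(String[] args) {
        for (TimeCharacteristic setting : TimeCharacteristic.values()) {
            System.out.println(setting + " -> " + patchedBehaviour(setting));
        }
    }
}
```

Keeping IngestionTime on the event-time path preserves the behaviour that already worked before the change.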
 
With this change, the demo from the current official docs and CEP jobs on 1.12 that use processing time can both run.









