Why Flink CEP 1.12 does not support processing time: an analysis

汤姆torn 2022-01-21

The core class that runs CEP is
/flinkCEP-Patterns/src/main/java/org/apache/flink/cep/operator/CepOperator.java
Its processElement method branches on isProcessingTime:

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        if (isProcessingTime) {
            if (comparator == null) {
                // there can be no out of order elements in processing time
                NFAState nfaState = getNFAState();
                long timestamp = getProcessingTimeService().getCurrentProcessingTime();
                advanceTime(nfaState, timestamp);
                processEvent(nfaState, element.getValue(), timestamp);
                updateNFA(nfaState);
            } else {
                long currentTime = timerService.currentProcessingTime();
                bufferEvent(element.getValue(), currentTime);

                // register a timer for the next millisecond to sort and emit buffered data
                timerService.registerProcessingTimeTimer(VoidNamespace.INSTANCE, currentTime + 1);
            }

        } else {

            long timestamp = element.getTimestamp();
            IN value = element.getValue();

            // In event-time processing we assume correctness of the watermark.
            // Events with timestamp smaller than or equal with the last seen watermark are
            // considered late.
            // Late events are put in a dedicated side output, if the user has specified one.

            if (timestamp > timerService.currentWatermark()) {

                // we have an event with a valid timestamp, so
                // we buffer it until we receive the proper watermark.

                saveRegisterWatermarkTimer();

                bufferEvent(value, timestamp);

            } else if (lateDataOutputTag != null) {
                output.collect(lateDataOutputTag, element);
            } else {
                numLateRecordsDropped.inc();
            }
        }
    }
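
To make the branching easier to follow, here is a minimal standalone sketch of the same decision tree. It is not Flink code: the class name, the Route labels and the route method are made up for illustration. It also assumes that a record without an assigned timestamp reports Long.MIN_VALUE and that the watermark starts at Long.MIN_VALUE, which is worth verifying against your Flink version.

```java
/** Simplified stand-in for the branching in CepOperator.processElement; not Flink code. */
class ProcessElementRoutingSketch {

    /** Hypothetical labels for the outcomes of the if/else chain. */
    enum Route { PROCESS_IMMEDIATELY, BUFFER_FOR_SORTING, BUFFER_UNTIL_WATERMARK, LATE }

    /**
     * Mirrors the structure of processElement.
     * hasComparator stands for "comparator != null" in the original code.
     */
    static Route route(
            boolean isProcessingTime,
            boolean hasComparator,
            long elementTimestamp,
            long currentWatermark) {
        if (isProcessingTime) {
            // Without a comparator the event goes straight to the NFA;
            // with one it is buffered and sorted by a timer one millisecond later.
            return hasComparator ? Route.BUFFER_FOR_SORTING : Route.PROCESS_IMMEDIATELY;
        }
        // Event-time path: only elements strictly ahead of the watermark are buffered.
        // Everything else is late (sent to the side output or counted as dropped).
        return elementTimestamp > currentWatermark ? Route.BUFFER_UNTIL_WATERMARK : Route.LATE;
    }

    public static void main(String[] args) {
        // Assumption: no timestamps were assigned and no watermark has arrived yet.
        long noTimestamp = Long.MIN_VALUE;
        long initialWatermark = Long.MIN_VALUE;

        // A processing-time job that ends up on the event-time path is treated as late.
        System.out.println(route(false, false, noTimestamp, initialWatermark)); // LATE
        // The same element on the processing-time path is evaluated right away.
        System.out.println(route(true, false, noTimestamp, initialWatermark)); // PROCESS_IMMEDIATELY
    }
}
```

Under those assumptions, a job that never assigns timestamps but is routed through the event-time branch would see every element classified as late, which would explain why no matches are produced.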

isProcessingTime is passed in through the constructor:

public CepOperator(
            final TypeSerializer<IN> inputSerializer,
            final boolean isProcessingTime,
            final NFACompiler.NFAFactory<IN> nfaFactory,
            @Nullable final EventComparator<IN> comparator,
            @Nullable final AfterMatchSkipStrategy afterMatchSkipStrategy,
            final PatternProcessFunction<IN, OUT> function,
            @Nullable final OutputTag<IN> lateDataOutputTag) {
        super(function);

        this.inputSerializer = Preconditions.checkNotNull(inputSerializer);
        this.nfaFactory = Preconditions.checkNotNull(nfaFactory);

        this.isProcessingTime = isProcessingTime;
        this.comparator = comparator;
        this.lateDataOutputTag = lateDataOutputTag;

        if (afterMatchSkipStrategy == null) {
            this.afterMatchSkipStrategy = AfterMatchSkipStrategy.noSkip();
        } else {
            this.afterMatchSkipStrategy = afterMatchSkipStrategy;
        }
    }

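Tracing where the operator is created leads to org.apache.flink.cep.PatternStreamBuilder.build(TypeInformation, PatternProcessFunction<IN, OUT>), and the builder itself comes from the forStreamAndPattern factory method of the same class: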

// ---------------------------------------- factory-like methods
    // ---------------------------------------- //

    static <IN> PatternStreamBuilder<IN> forStreamAndPattern(
            final DataStream<IN> inputStream, final Pattern<IN, ?> pattern) {
        return new PatternStreamBuilder<>(
                inputStream, pattern, TimeBehaviour.EventTime, null, null);
    }

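As you can see, TimeBehaviour.EventTime is hard-coded here, regardless of which time characteristic the environment is configured with. This is the root cause of the missing processing-time support. (PS: ingestion time still works, because processElement is a plain if/else and ingestion time falls into the event-time branch.) The build method below then shows that isProcessingTime is always false.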

<OUT, K> SingleOutputStreamOperator<OUT> build(final TypeInformation<OUT> outTypeInfo,
			final PatternProcessFunction<IN, OUT> processFunction) {

		checkNotNull(outTypeInfo);
		checkNotNull(processFunction);

		final TypeSerializer<IN> inputSerializer = inputStream.getType()
				.createSerializer(inputStream.getExecutionConfig());
		final boolean isProcessingTime = timeBehaviour == TimeBehaviour.ProcessingTime;

		final boolean timeoutHandling = processFunction instanceof TimedOutPartialMatchHandler;
		final NFACompiler.NFAFactory<IN> nfaFactory = NFACompiler.compileFactory(pattern, timeoutHandling);

		final CepOperator<IN, K, OUT> operator = new CepOperator<>(inputSerializer, isProcessingTime, nfaFactory,
				comparator, pattern.getAfterMatchSkipStrategy(), processFunction, lateDataOutputTag);

		final SingleOutputStreamOperator<OUT> patternStream;
		if (inputStream instanceof KeyedStream) {
			KeyedStream<IN, K> keyedStream = (KeyedStream<IN, K>) inputStream;

			patternStream = keyedStream.transform("CepOperator", outTypeInfo, operator);
		} else {
			KeySelector<IN, Byte> keySelector = new NullByteKeySelector<>();

			patternStream = inputStream.keyBy(keySelector).transform("GlobalCepOperator", outTypeInfo, operator)
					.forceNonParallel();
		}

		return patternStream;
	}
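
The effect of the hard-coded value can be reproduced in isolation. The sketch below is a simplified stand-in for the builder, not Flink code: HardCodedBuilderSketch and its two enums are hypothetical and only mirror the names used above.

```java
/** Simplified stand-in for PatternStreamBuilder; not Flink code. */
class HardCodedBuilderSketch {

    /** Mirrors the environment-level setting (illustrative copy, not the Flink enum). */
    enum TimeCharacteristic { ProcessingTime, IngestionTime, EventTime }

    /** Mirrors the builder-level setting (illustrative copy, not the Flink enum). */
    enum TimeBehaviour { ProcessingTime, EventTime }

    private final TimeBehaviour timeBehaviour;

    private HardCodedBuilderSketch(TimeBehaviour timeBehaviour) {
        this.timeBehaviour = timeBehaviour;
    }

    /** Like the original factory: the environment setting is never consulted. */
    static HardCodedBuilderSketch forStreamAndPattern(TimeCharacteristic environmentSetting) {
        return new HardCodedBuilderSketch(TimeBehaviour.EventTime);
    }

    /** Same comparison as in build(). */
    boolean isProcessingTime() {
        return timeBehaviour == TimeBehaviour.ProcessingTime;
    }

    public static void main(String[] args) {
        // Prints isProcessingTime=false for all three settings.
        for (TimeCharacteristic setting : TimeCharacteristic.values()) {
            boolean flag = forStreamAndPattern(setting).isProcessingTime();
            System.out.println(setting + " -> isProcessingTime=" + flag);
        }
    }
}
```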

With the cause identified, the fix is straightforward. To support processing time (which Flink does not really encourage), change forStreamAndPattern to:

// requires: import org.apache.flink.streaming.api.TimeCharacteristic;
static <IN> PatternStreamBuilder<IN> forStreamAndPattern(
        final DataStream<IN> inputStream, final Pattern<IN, ?> pattern) {
    // Follow the time characteristic configured on the execution environment
    // instead of hard-coding TimeBehaviour.EventTime.
    final TimeCharacteristic characteristic =
            inputStream.getExecutionEnvironment().getStreamTimeCharacteristic();
    final TimeBehaviour timeBehaviour =
            characteristic == TimeCharacteristic.ProcessingTime
                    ? TimeBehaviour.ProcessingTime
                    : TimeBehaviour.EventTime;
    return new PatternStreamBuilder<>(inputStream, pattern, timeBehaviour, null, null);
}

With this change, both the demo from the official documentation and 1.12 CEP jobs that use processing time run as expected.
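
The selection rule behind the patch can also be checked on its own. This is only a sketch with hypothetical names (PatchedFactorySketch, select) and local copies of the two enums. It shows that only ProcessingTime switches the behaviour, while IngestionTime keeps using the event-time path as before.

```java
/** Standalone model of the patched selection rule; not Flink code. */
class PatchedFactorySketch {

    /** Illustrative copy of the environment-level setting. */
    enum TimeCharacteristic { ProcessingTime, IngestionTime, EventTime }

    /** Illustrative copy of the builder-level setting. */
    enum TimeBehaviour { ProcessingTime, EventTime }

    /** Maps the environment setting to the behaviour handed to the builder. */
    static TimeBehaviour select(TimeCharacteristic characteristic) {
        return characteristic == TimeCharacteristic.ProcessingTime
                ? TimeBehaviour.ProcessingTime
                : TimeBehaviour.EventTime;
    }

    public static void main(String[] args) {
        for (TimeCharacteristic setting : TimeCharacteristic.values()) {
            System.out.println(setting + " -> " + select(setting));
        }
    }
}
```

Before patching the source, it may be worth checking whether your Flink version already exposes PatternStream.inProcessingTime(). As far as I can tell 1.12 does, but treat that as an assumption to verify against your own source tree. If it is there, calling it on the result of CEP.pattern(...) should give the same behaviour without modifying Flink.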
