Integrating the Flowable Process Engine with Spring Boot

雅典娜的棒槌 2023-06-03 Views: 84

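Difference between map and flat_map:

map emits exactly one record per input record: whatever the function returns. flat_map iterates over the function's return value and emits each element as a separate record. The two runs below apply the same lambda, x.split(' ')[0], first with flat_map and then with map.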

[root@master pyflink]# cat flik_5.py 
# -*- coding: utf-8 -*-
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.common.typeinfo import Types

env = StreamExecutionEnvironment.get_execution_environment()

data_stream = env.from_collection(collection=['hello apache flink', 'streaming compute'])
# flat_map iterates over the returned value. Here that value is a str (the first
# word of the line), so every character is emitted as its own record.
data_stream = data_stream.flat_map(lambda x: x.split(' ')[0], output_type=Types.STRING())

# Print to the console
data_stream.print()

# Run the job
env.execute('flat_map: first word of each line')
[root@master pyflink]# python flik_5.py 
3> s
3> t
3> r
3> e
3> a
3> m
3> i
3> n
3> g
2> h
2> e
2> l
2> l
2> o
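
The single characters above come from flat_map iterating over the string that the lambda returns. If the goal is one record per word, a minimal sketch is to return the whole list from split instead of its first element. The names split_words and run_word_split are hypothetical helpers introduced here for illustration, and the PyFlink imports are placed inside the function only so that split_words can be tried on a machine without PyFlink:

```python
def split_words(line):
    """Return every word of the line; flat_map emits each list element."""
    return line.split(' ')


def run_word_split():
    # Imported lazily so split_words can be tried without PyFlink installed.
    from pyflink.common.typeinfo import Types
    from pyflink.datastream import StreamExecutionEnvironment

    env = StreamExecutionEnvironment.get_execution_environment()
    data_stream = env.from_collection(
        collection=['hello apache flink', 'streaming compute'])
    # The returned list is iterated, so each word becomes one record.
    data_stream = data_stream.flat_map(split_words, output_type=Types.STRING())
    data_stream.print()
    env.execute('flat_map: one record per word')
```

Calling run_word_split() in the same environment as the scripts here should print one line per word, each with a subtask prefix that depends on the parallelism of the job.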

[root@master pyflink]# cat flik_5.py 
# -*- coding: utf-8 -*-
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.common.typeinfo import Types

env = StreamExecutionEnvironment.get_execution_environment()

data_stream = env.from_collection(collection=['hello apache flink', 'streaming compute'])
# map emits the returned value as a single record: the first word of each line
data_stream = data_stream.map(lambda x: x.split(' ')[0], output_type=Types.STRING())

# Print to the console
data_stream.print()

# Run the job
env.execute('map: first word of each line')
[root@master pyflink]# python flik_5.py 
3> hello
4> streaming
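
The two results can be reproduced without a Flink cluster. The following is a plain-Python model of the semantics only, not the PyFlink implementation; model_map, model_flat_map, and first_word are hypothetical names introduced for this sketch:

```python
def model_map(func, records):
    """One output per input: the return value is emitted as-is."""
    return [func(record) for record in records]


def model_flat_map(func, records):
    """Zero or more outputs per input: the return value is iterated."""
    out = []
    for record in records:
        out.extend(func(record))  # iterating a str yields its characters
    return out


def first_word(line):
    return line.split(' ')[0]


lines = ['hello apache flink', 'streaming compute']

print(model_map(first_word, lines))
# ['hello', 'streaming']
print(model_flat_map(first_word, lines))
# ['h', 'e', 'l', 'l', 'o', 's', 't', 'r', 'e', 'a', 'm', 'i', 'n', 'g']
```

The model keeps input order, whereas the Flink runs above interleave output from parallel subtasks, so only the set of emitted records is comparable.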
