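map 和 flat_map 的区别：map 对每个输入元素恰好输出一个结果；flat_map 会遍历函数返回的可迭代对象并把其中每个元素作为单独的记录输出。下面第一个例子中 flat_map 的函数返回的是字符串，字符串可迭代，因此被拆成了单个字符。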
[root@master pyflink]# cat flik_5.py
# -*- coding: utf-8 -*-
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import MapFunction, RuntimeContext, KeyedProcessFunction
from pyflink.datastream.state import MapStateDescriptor
from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer
from pyflink.common.typeinfo import Types, TypeInformation
from pyflink.datastream.connectors.elasticsearch import Elasticsearch7SinkBuilder, ElasticsearchEmitter, FlushBackoffType
from pyflink.datastream.connectors import DeliveryGuarantee
from pyflink.common.serialization import SimpleStringSchema
import json
import re
# Demonstrates flat_map: the value returned by the function is iterated and
# every element is emitted as a separate record. Here the lambda returns the
# first word of each line as a plain str; iterating a str yields its
# characters, so flat_map emits one record per character ("h", "e", ...).
def main():
    env = StreamExecutionEnvironment.get_execution_environment()
    data_stream = env.from_collection(collection=['hello apache flink', 'streaming compute'])
    # Returning a str (not a list) is deliberate: it shows that flat_map
    # flattens whatever iterable comes back, here into single characters.
    # Returning x.split(' ') instead would emit each whole word.
    data_stream = data_stream.flat_map(lambda x: x.split(' ')[0], output_type=Types.STRING())
    # Print each record to stdout, prefixed with the subtask index (e.g. "3> ").
    data_stream.print()
    # Submit and run the job under a name that describes this pipeline.
    env.execute('flat_map: split first word of each line into characters')


if __name__ == "__main__":
    main()
[root@master pyflink]# python flik_5.py
3> s
3> t
3> r
3> e
3> a
3> m
3> i
3> n
3> g
2> h
2> e
2> l
2> l
2> o
[root@master pyflink]# cat flik_5.py
# -*- coding: utf-8 -*-
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import MapFunction, RuntimeContext, KeyedProcessFunction
from pyflink.datastream.state import MapStateDescriptor
from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer
from pyflink.common.typeinfo import Types, TypeInformation
from pyflink.datastream.connectors.elasticsearch import Elasticsearch7SinkBuilder, ElasticsearchEmitter, FlushBackoffType
from pyflink.datastream.connectors import DeliveryGuarantee
from pyflink.common.serialization import SimpleStringSchema
import json
import re
# Demonstrates map: exactly one output record per input record. The lambda
# returns the first word of each line, so the two input lines produce the two
# records "hello" and "streaming".
def main():
    env = StreamExecutionEnvironment.get_execution_environment()
    data_stream = env.from_collection(collection=['hello apache flink', 'streaming compute'])
    # Unlike flat_map, map does not iterate the returned str; the whole word
    # is emitted as a single record.
    data_stream = data_stream.map(lambda x: x.split(' ')[0], output_type=Types.STRING())
    # Print each record to stdout, prefixed with the subtask index (e.g. "3> ").
    data_stream.print()
    # Submit and run the job under a name that describes this pipeline.
    env.execute('map: first word of each line')


if __name__ == "__main__":
    main()
[root@master pyflink]# python flik_5.py
3> hello
4> streaming