package com.flink.DataStream.env;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class flinkEnvDemo {
    public static void main(String[] args) throws Exception {
        
        Configuration configuration = new Configuration();
        
        configuration.set(RestOptions.BIND_PORT, "8082");
        
        StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment
                
                
                
                
                
                .createLocalEnvironmentWithWebUI(configuration);  
        
        
        streamExecutionEnvironment.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        
        DataStreamSource<String> dataStreamSource = streamExecutionEnvironment.socketTextStream("localhost", 8888);
        
        SingleOutputStreamOperator<Tuple2<String, Integer>> singleOutputStreamOperator = dataStreamSource.flatMap(
                        
                        
                        (String s, Collector<Tuple2<String, Integer>> collector) -> {
                            String[] splitResult = s.split(" ");
                            
                            for (String word : splitResult) {
                                Tuple2<String, Integer> wordsAndOne = Tuple2.of(word, 1);
                                
                                collector.collect(wordsAndOne);
                            }
                        })
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(
                        (Tuple2<String, Integer> value) -> {
                            return value.f0;
                        }
                ).sum(1);
        
        singleOutputStreamOperator.print();
        
        streamExecutionEnvironment.execute("Flink Environment Demo");
        
        
        
    }
}