package com.steven.flinkdemo.kafka;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import java.util.Properties;
public class KafkaConsumerApp {
    public static void main(String[] args) throws Exception{
        try {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers","dbos-bigdata-test006:9092,dbos-bigdata-test007:9092,dbos-bigdata-test008:9092");
            properties.setProperty("group.id","flink");
            DataStream<String> stream = env.addSource(new FlinkKafkaConsumer011<String>("test",new SimpleStringSchema(),properties));
            stream.map(new MapFunction<String, String>() {
                private static final long serialVersionUID = -6867736771747690202L;
                @Override
                public String map(String value) throws Exception {
                    return "flink:" + value;
                }
            }).print();
            env.execute("consumer");
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}
package com.steven.flinkdemo.kafka;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import java.io.Serializable;
import java.util.Properties;
public class KafkaProdcerApp {
    public static void main(String[] args) throws Exception{
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties props = new Properties();
        props.setProperty("bootstrap.servers","dbos-bigdata-test006:9092,dbos-bigdata-test007:9092,dbos-bigdata-test008:9092");
        DataStream<String> stream = env.addSource(new SimpleStringGenerator());
        stream.addSink(new FlinkKafkaProducer010<String>("test",new SimpleStringSchema(),props));
        env.execute();
    }
}
class SimpleStringGenerator implements SourceFunction<String>, Serializable{
    private static final long  serialVersionUID = 1L;
    private volatile boolean isRunning = true;
    @Override
    public void run(SourceFunction.SourceContext<String> ctx) throws Exception{
        while (isRunning){
            String str = RandomStringUtils.randomAlphanumeric(5);
            ctx.collect(str);
            Thread.sleep(1000);
        }
    }
    @Override
    public void cancel(){
        isRunning = false;
    }
}