[Spark Application] Creating a Singleton Kafka Connection


一、Create a singleton Spark Streaming Kafka connection class: KafkaGetStreamUtil.java

import enn.bean.SparkProperties;
import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Purpose: a singleton holder for the Kafka stream, so the connection is not recreated on every call.
 * User: yangjf
 * Date: 2016/7/16 10:20
 */
public class KafkaGetStreamUtil {

    private static Map<String, Object> kafkaStreamMap = new HashMap<String, Object>();

    public static synchronized Map<String, Object> getKafkaStreamMap() {
        if (kafkaStreamMap.isEmpty()) {
            // SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("ConsumerKafkaRealData");
            SparkConf conf = new SparkConf().setAppName("ConsumerKafkaRealData").setMaster("yarn-cluster");
            JavaStreamingContext jssc = new JavaStreamingContext(conf,
                    Durations.milliseconds(SparkProperties.SPARK_DSTREAM_MILLISECONDS));

            // Kafka direct-stream parameters
            Set<String> topics = new HashSet<String>();
            topics.add(SparkProperties.KAFKA_TOPIC);
            Map<String, String> kafkaParams = new HashMap<String, String>();
            kafkaParams.put(SparkProperties.KAFKA_METADATA_BROKER_LIST_TXT, SparkProperties.KAFKA_BROKER_URL);
            kafkaParams.put(SparkProperties.KAFKA_SERIALIZER_CLASS, SparkProperties.KAFKA_SERIALIZER_STRINGENCODER);
            kafkaParams.put("group.id", "e_lvbin");
            // kafkaParams.put("auto.offset.reset", "smallest"); /** ensures previously missed data can be read */
            JavaPairInputDStream<String, String> directKafkaStream =
                    KafkaUtils.createDirectStream(jssc, String.class, String.class,
                            StringDecoder.class, StringDecoder.class, kafkaParams, topics);

            // Cache the stream and the streaming context for reuse
            kafkaStreamMap.put("directKafkaStream", directKafkaStream);
            kafkaStreamMap.put("jssc", jssc);

            return kafkaStreamMap;
        } else {
            return kafkaStreamMap;
        }
    }
}
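Because getKafkaStreamMap() is synchronized and only populates the static map on the first call, every caller receives the same JavaStreamingContext and direct stream. A minimal sketch of that guarantee (the SingletonCheck class is illustrative, not part of the original code):

import java.util.Map;

public class SingletonCheck {
    public static void main(String[] args) {
        // Both calls hit the same static map; the streaming context and the
        // direct Kafka stream are created only on the first call.
        Map<String, Object> first = KafkaGetStreamUtil.getKafkaStreamMap();
        Map<String, Object> second = KafkaGetStreamUtil.getKafkaStreamMap();
        System.out.println(first == second); // prints true: same instance
    }
}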


Contents of SparkProperties.java:

import java.io.Serializable;

/**
 * Purpose: configuration for the development environment
 * User: yangjf
 * Date: 2016/7/11 9:43
 */
public class SparkProperties implements Serializable {
    /** Spark settings */
    public static long SPARK_DSTREAM_MILLISECONDS = 200; // batch interval in milliseconds

    /** Kafka settings */
    public static String KAFKA_TOPIC = "20160710aa"; // topic name, 10.37.149.180
    public static String KAFKA_BROKER_URL = "elcndc2bdkf01t:9092"; // Kafka address, may be a list
    // public static String KAFKA_BROKER_URL = "10.37.149.180:9092"; // Kafka address, may be a list
    public static String KAFKA_METADATA_BROKER_LIST_TXT = "metadata.broker.list";
    public static String KAFKA_SERIALIZER_CLASS = "serializer.class";
    public static String KAFKA_SERIALIZER_STRINGENCODER = "kafka.serializer.StringEncoder";

    /** ZooKeeper settings, used for local testing */
    public static String ZOOKEEPER_URL = "node1:2180,node2:2180,node3:2180";
    public static String HADOOP_NAMENODE = "hdfs://node1:9000/";

    /** Redis settings */
    public static String REDIS_HOST = "elcndc2bdrd01t"; // IP of the Redis master: 10.37.149.184
    public static Integer REDIS_PORT = 6379;
    public static Integer REDIS_TIMEOUT = 30000;
}

二、Use the singleton class to create the Kafka connection; the core code is as follows

// Fetch the shared stream and streaming context from the singleton holder
JavaPairInputDStream<String, String> directKafkaStream =
        (JavaPairInputDStream<String, String>) KafkaGetStreamUtil.getKafkaStreamMap().get("directKafkaStream");
JavaStreamingContext jssc = (JavaStreamingContext) KafkaGetStreamUtil.getKafkaStreamMap().get("jssc");

// Parse each Kafka message value (tuple2._2) into a JSON object (net.sf.json)
JavaDStream<JSONObject> lines = directKafkaStream.map(new Function<Tuple2<String, String>, JSONObject>() {
    @Override
    public JSONObject call(Tuple2<String, String> tuple2) {
        return JSONObject.fromObject(tuple2._2);
    }
});
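Note that the snippet above only defines the DStream transformation; nothing runs until the streaming context is started. A minimal completion sketch, assuming print() as a placeholder output action (any output operation would do):

// An output operation is required before starting the context;
// print() is used here only as a placeholder action.
lines.print();

// Start the computation and block until it is stopped or fails.
jssc.start();
jssc.awaitTermination();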
