Table of Contents
- Custom KafkaAppender
- Modify the log4j.properties configuration
- Add parameters to the launch command
- Checking the data sent to Kafka after startup
- Time zone issue: solving it with a custom JSONLayout
- Custom JSONLayout.java
- One-step usage
- Possible exceptions
- ClassNotFoundException: xxx.KafkaLog4jAppender
- Unexpected problem occured during version sanity check Reported exception: java.lang.NullPointerException
- References
Custom KafkaAppender
Note: if you want the appender that ships with Kafka, you can reference it directly; its version should match the Kafka version you are running:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-log4j-appender</artifactId>
<version>2.4.1</version>
</dependency>
The dependencies to configure are:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.1</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.28</version>
<scope>compile</scope>
</dependency>
Custom KafkaLog4jAppender.java
The content is as follows; on top of the official appender's behavior, it implements package-name filtering:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.helpers.LogLog;
import org.apache.log4j.spi.LoggingEvent;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
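/**
 * A log4j 1.x appender that ships formatted log events to Kafka. It follows the
 * official kafka-log4j-appender, extended here with include/exclude filtering
 * on logger (package/class) names via the "includes"/"excludes" properties.
 */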
public class KafkaLog4jAppender extends AppenderSkeleton {
/** Include rules: exact logger names and name prefixes */
private Set<String> includeSet = new HashSet<>();
private Set<String> includeMatchSet = new HashSet<>();
/** Exclude rules: exact logger names and name prefixes */
private Set<String> excludeSet = new HashSet<>();
private Set<String> excludeMatchSet = new HashSet<>();
private String brokerList;
private String topic;
private String compressionType;
private String securityProtocol;
private String sslTruststoreLocation;
private String sslTruststorePassword;
private String sslKeystoreType;
private String sslKeystoreLocation;
private String sslKeystorePassword;
private String saslKerberosServiceName;
private String saslMechanism;
private String clientJaasConfPath;
private String clientJaasConf;
private String kerb5ConfPath;
private Integer maxBlockMs;
private int retries = Integer.MAX_VALUE;
private int requiredNumAcks = 1;
private int deliveryTimeoutMs = 120000;
private boolean ignoreExceptions = true;
private boolean syncSend;
private Producer<byte[], byte[]> producer;
private String includes;
private String excludes;
public String getIncludes() {
return includes;
}
public void setIncludes(String includes) {
this.includes = includes;
}
public String getExcludes() {
return excludes;
}
public void setExcludes(String excludes) {
this.excludes = excludes;
}
public KafkaLog4jAppender() {}
public Producer<byte[], byte[]> getProducer() {
return this.producer;
}
public String getBrokerList() {
return this.brokerList;
}
public void setBrokerList(String brokerList) {
this.brokerList = brokerList;
}
public int getRequiredNumAcks() {
return this.requiredNumAcks;
}
public void setRequiredNumAcks(int requiredNumAcks) {
this.requiredNumAcks = requiredNumAcks;
}
public int getRetries() {
return this.retries;
}
public void setRetries(int retries) {
this.retries = retries;
}
public int getDeliveryTimeoutMs() {
return this.deliveryTimeoutMs;
}
public void setDeliveryTimeoutMs(int deliveryTimeoutMs) {
this.deliveryTimeoutMs = deliveryTimeoutMs;
}
public String getCompressionType() {
return this.compressionType;
}
public void setCompressionType(String compressionType) {
this.compressionType = compressionType;
}
public String getTopic() {
return this.topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public boolean getIgnoreExceptions() {
return this.ignoreExceptions;
}
public void setIgnoreExceptions(boolean ignoreExceptions) {
this.ignoreExceptions = ignoreExceptions;
}
public boolean getSyncSend() {
return this.syncSend;
}
public void setSyncSend(boolean syncSend) {
this.syncSend = syncSend;
}
public String getSslTruststorePassword() {
return this.sslTruststorePassword;
}
public String getSslTruststoreLocation() {
return this.sslTruststoreLocation;
}
public String getSecurityProtocol() {
return this.securityProtocol;
}
public void setSecurityProtocol(String securityProtocol) {
this.securityProtocol = securityProtocol;
}
public void setSslTruststoreLocation(String sslTruststoreLocation) {
this.sslTruststoreLocation = sslTruststoreLocation;
}
public void setSslTruststorePassword(String sslTruststorePassword) {
this.sslTruststorePassword = sslTruststorePassword;
}
public void setSslKeystorePassword(String sslKeystorePassword) {
this.sslKeystorePassword = sslKeystorePassword;
}
public void setSslKeystoreType(String sslKeystoreType) {
this.sslKeystoreType = sslKeystoreType;
}
public void setSslKeystoreLocation(String sslKeystoreLocation) {
this.sslKeystoreLocation = sslKeystoreLocation;
}
public void setSaslKerberosServiceName(String saslKerberosServiceName) {
this.saslKerberosServiceName = saslKerberosServiceName;
}
public void setClientJaasConfPath(String clientJaasConfPath) {
this.clientJaasConfPath = clientJaasConfPath;
}
public void setKerb5ConfPath(String kerb5ConfPath) {
this.kerb5ConfPath = kerb5ConfPath;
}
public String getSslKeystoreLocation() {
return this.sslKeystoreLocation;
}
public String getSslKeystoreType() {
return this.sslKeystoreType;
}
public String getSslKeystorePassword() {
return this.sslKeystorePassword;
}
public String getSaslKerberosServiceName() {
return this.saslKerberosServiceName;
}
public String getClientJaasConfPath() {
return this.clientJaasConfPath;
}
public void setSaslMechanism(String saslMechanism) {
this.saslMechanism = saslMechanism;
}
public String getSaslMechanism() {
return this.saslMechanism;
}
public void setClientJaasConf(String clientJaasConf) {
this.clientJaasConf = clientJaasConf;
}
public String getClientJaasConf() {
return this.clientJaasConf;
}
public String getKerb5ConfPath() {
return this.kerb5ConfPath;
}
public Integer getMaxBlockMs() {
    // Boxed return: the field is null when maxBlockMs is not configured, so a
    // primitive int return type here could throw a NullPointerException.
    return this.maxBlockMs;
}
public void setMaxBlockMs(int maxBlockMs) {
this.maxBlockMs = maxBlockMs;
}
@Override
public void activateOptions() {
// Load the include/exclude filter rules
setFilterRules(includes, includeMatchSet, includeSet);
setFilterRules(excludes, excludeMatchSet, excludeSet);
Properties props = new Properties();
if (this.brokerList != null) {
props.put("bootstrap.servers", this.brokerList);
}
if (props.isEmpty()) {
throw new ConfigException("The bootstrap servers property should be specified");
} else if (this.topic == null) {
throw new ConfigException("Topic must be specified by the Kafka log4j appender");
} else {
if (this.compressionType != null) {
props.put("compression.type", this.compressionType);
}
props.put("acks", Integer.toString(this.requiredNumAcks));
props.put("retries", this.retries);
props.put("delivery.timeout.ms", this.deliveryTimeoutMs);
if (this.securityProtocol != null) {
props.put("security.protocol", this.securityProtocol);
}
if (this.securityProtocol != null
&& this.securityProtocol.contains("SSL")
&& this.sslTruststoreLocation != null
&& this.sslTruststorePassword != null) {
props.put("ssl.truststore.location", this.sslTruststoreLocation);
props.put("ssl.truststore.password", this.sslTruststorePassword);
if (this.sslKeystoreType != null
&& this.sslKeystoreLocation != null
&& this.sslKeystorePassword != null) {
props.put("ssl.keystore.type", this.sslKeystoreType);
props.put("ssl.keystore.location", this.sslKeystoreLocation);
props.put("ssl.keystore.password", this.sslKeystorePassword);
}
}
if (this.securityProtocol != null
&& this.securityProtocol.contains("SASL")
&& this.saslKerberosServiceName != null
&& this.clientJaasConfPath != null) {
props.put("sasl.kerberos.service.name", this.saslKerberosServiceName);
System.setProperty("java.security.auth.login.config", this.clientJaasConfPath);
}
if (this.kerb5ConfPath != null) {
System.setProperty("java.security.krb5.conf", this.kerb5ConfPath);
}
if (this.saslMechanism != null) {
props.put("sasl.mechanism", this.saslMechanism);
}
if (this.clientJaasConf != null) {
props.put("sasl.jaas.config", this.clientJaasConf);
}
if (this.maxBlockMs != null) {
props.put("max.block.ms", this.maxBlockMs);
}
props.put("key.serializer", ByteArraySerializer.class.getName());
props.put("value.serializer", ByteArraySerializer.class.getName());
System.out.println("Properties:" + props);
this.producer = this.getKafkaProducer(props);
LogLog.debug("Kafka producer connected to " + this.brokerList);
LogLog.debug("Logging for topic: " + this.topic);
}
}
/**
 * Parse a comma-separated rule string into a prefix-match set and an exact-match set.
 * Rules ending in ".*" become prefix matches (with the ".*" suffix stripped);
 * all other rules must match the full logger name.
 *
 * @param rules    comma-separated rule string, e.g. "com.foo.*,com.bar.Baz"
 * @param matchSet receives the prefix-match rules
 * @param exactSet receives the exact-match rules
 * @author Jast
 */
private void setFilterRules(String rules, Set<String> matchSet, Set<String> exactSet) {
    if (rules != null) {
        for (String rule : rules.split(",")) {
            if (rule.length() > 0) {
                if (rule.endsWith(".*")) {
                    matchSet.add(rule.replace(".*", ""));
                } else {
                    exactSet.add(rule);
                }
            }
        }
    }
}
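// Example (hypothetical rules): includes = "com.avris.*,net.jast.Print"
//   -> includeMatchSet = {"com.avris"}       (prefix match against the logger name)
//   -> includeSet      = {"net.jast.Print"}  (exact match against the logger name)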
protected Producer<byte[], byte[]> getKafkaProducer(Properties props) {
    return new KafkaProducer<>(props);
}
@Override
protected void append(LoggingEvent event) {
if (filterPackageName(event)) {
return;
}
String message = this.subAppend(event);
LogLog.debug("[" + new Date(event.getTimeStamp()) + "]" + message);
Future<RecordMetadata> response =
        this.producer.send(
                new ProducerRecord<>(this.topic, message.getBytes(StandardCharsets.UTF_8)));
if (this.syncSend) {
try {
response.get();
} catch (ExecutionException | InterruptedException var5) {
if (!this.ignoreExceptions) {
throw new RuntimeException(var5);
}
LogLog.debug("Exception while getting response", var5);
}
}
}
private String subAppend(LoggingEvent event) {
return this.layout == null ? event.getRenderedMessage() : this.layout.format(event);
}
@Override
public void close() {
if (!this.closed) {
this.closed = true;
this.producer.close();
}
}
@Override
public boolean requiresLayout() {
return true;
}
/**
 * Filter by logger (package/class) name.
 *
 * @param event the logging event to test
 * @return true if the event should NOT be sent to Kafka
 * @author Jast
 */
private boolean filterPackageName(LoggingEvent event) {
boolean flag = true;
// No rules configured: send everything.
if (includeSet.isEmpty()
        && includeMatchSet.isEmpty()
        && excludeSet.isEmpty()
        && excludeMatchSet.isEmpty()) {
    return false;
}
// Only exclude rules configured: send by default unless excluded below.
if (includeSet.isEmpty() && includeMatchSet.isEmpty()) {
    flag = false;
}
// Name of the logger that produced the event
String loggerName = event.getLoggerName();
for (String include : includeSet) {
if (loggerName.equals(include)) {
flag = false;
}
}
for (String include : includeMatchSet) {
if (loggerName.startsWith(include)) {
flag = false;
}
}
for (String exclude : excludeMatchSet) {
if (loggerName.startsWith(exclude)) {
flag = true;
}
}
for (String exclude : excludeSet) {
if (loggerName.equals(exclude)) {
flag = true;
}
}
return flag;
}
}
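Before wiring the appender into Spark, you can smoke-test it locally. This is a minimal sketch, assuming a reachable Kafka broker and that the class sits in the same package as KafkaLog4jAppender; the broker address, topic, and logger names are placeholders:
import org.apache.log4j.Logger;

public class AppenderSmokeTest {
    public static void main(String[] args) {
        KafkaLog4jAppender appender = new KafkaLog4jAppender();
        appender.setBrokerList("localhost:9092"); // placeholder broker
        appender.setTopic("test");                // placeholder topic
        appender.setIncludes("com.avris.*");      // only forward com.avris loggers
        appender.activateOptions();               // parses rules, builds the producer
        Logger.getRootLogger().addAppender(appender);

        Logger.getLogger("com.avris.Demo").info("forwarded to Kafka");
        Logger.getLogger("org.other.Demo").info("filtered out by the include rule");
        appender.close();
    }
}
With no layout configured, subAppend() falls back to the rendered message, so the first line should arrive in the topic as plain text while the second is dropped.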
Modify the log4j.properties configuration
Edit Spark's own configuration file, located at /opt/spark-client/conf.
The directory may differ on other clusters.
# Set everything to be logged to the console
log4j.rootCategory=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %l %p %c{1}: %m%n
# Settings to quiet third party logs that are too verbose
log4j.logger.org.eclipse.jetty=WARN
log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
# Everything from here down is newly added
# "log4j.logger.com" means INFO logs from every class whose package starts with "com" are sent to Kafka.
# Multiple such entries can be configured. The prefix must be a complete package segment: "com", not "co".
log4j.logger.com=info,kafka
log4j.logger.net.jast.xxx=info,kafka
log4j.logger.org.apache.kafka.clients.Metadata=ERROR
# Use the custom KafkaAppender
log4j.appender.kafka=com.avris.KafkaLog4jAppender
# The KafkaAppender that ships with Kafka
#log4j.appender.kafka=org.apache.kafka.log4jappender.KafkaLog4jAppender
# Target topic
log4j.appender.kafka.topic=test
# Kafka BrokerList
log4j.appender.kafka.brokerList=172.16.24.194:9092,172.16.24.195:9092,172.16.24.196:9092
log4j.appender.kafka.compressionType=none
log4j.appender.kafka.Threshold=INFO
log4j.appender.kafka.requiredNumAcks=-1
log4j.appender.kafka.syncSend=false
log4j.appender.kafka.maxBlockMs=5000
log4j.appender.kafka.layout=net.logstash.log4j.JSONEventLayoutV1
log4j.appender.kafka.layout.UserFields=app_name:xxx
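The include/exclude filtering implemented in the appender is configured here as well. A hedged sketch (the property names follow the setIncludes/setExcludes setters above; the package names are placeholders):
# Only forward loggers under com.avris (prefix rule) plus one exact logger,
# and drop a noisy sub-package even though it matches the include rule.
#log4j.appender.kafka.includes=com.avris.*,net.jast.Print
#log4j.appender.kafka.excludes=com.avris.noisy.*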
Add parameters to the launch command
--conf "spark.driver.extraClassPath=slf4j-api-1.8.0-beta2.jar:slf4j-log4j12-1.8.0-beta2.jar:kafka-appender-1.0.0.jar" \
--conf "spark.executor.extraClassPath=slf4j-api-1.8.0-beta2.jar:slf4j-log4j12-1.8.0-beta2.jar:kafka-appender-1.0.0.jar" \
--jars $(echo $JAR_PATH/lib/*.jar | tr ' ' ',') \
Notes:
- kafka-appender-1.0.0.jar is the jar built from the custom KafkaLog4jAppender class above.
- The slf4j-api-1.8.0-beta2.jar and slf4j-log4j12-1.8.0-beta2.jar versions resolve a NullPointerException caused by conflicting logging versions, described in the exceptions section below.
- slf4j-api-1.8.0-beta2.jar and slf4j-log4j12-1.8.0-beta2.jar must also be placed in the lib directory and submitted via --jars.
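For orientation, a hedged sketch of a full spark-submit command around these flags (the master, deploy mode, main class, and application jar are placeholders):
spark-submit \
--master yarn \
--deploy-mode cluster \
--class com.avris.Main \
--conf "spark.driver.extraClassPath=slf4j-api-1.8.0-beta2.jar:slf4j-log4j12-1.8.0-beta2.jar:kafka-appender-1.0.0.jar" \
--conf "spark.executor.extraClassPath=slf4j-api-1.8.0-beta2.jar:slf4j-log4j12-1.8.0-beta2.jar:kafka-appender-1.0.0.jar" \
--jars $(echo $JAR_PATH/lib/*.jar | tr ' ' ',') \
$JAR_PATH/app.jar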
Checking the data sent to Kafka after startup
{"source_host":"bigdata-24-194","method":"println","level":"INFO","message":"2023-03-02 14:38:35 - 本轮任务计算完成,休眠 10000","mdc":{},"app_name":"xxx","@timestamp":"2023-03-02T06:38:35.858Z","file":"Print.scala","line_number":"24","thread_name":"main","@version":1,"logger_name":"com.util.Print","class":"com.avris.util.Print$"}
One problem here: net.logstash.log4j.JSONEventLayoutV1 renders @timestamp in the wrong time zone. We need to correct the time zone, so below we implement a custom Layout.
Time zone issue: solving it with a custom JSONLayout
A JSON layout carries a noticeable performance cost; with large log volumes it is best avoided. Record the relevant fields yourself and log them as plain text instead.
Custom JSONLayout.java
In the same project as the custom KafkaAppender, create a JSONLayout.java class with the following content:
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.time.FastDateFormat;
import org.apache.log4j.Layout;
import org.apache.log4j.helpers.LogLog;
import org.apache.log4j.spi.LocationInfo;
import org.apache.log4j.spi.LoggingEvent;
import org.apache.log4j.spi.ThrowableInformation;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
import java.util.TimeZone;
public class JSONLayout extends Layout {
private boolean locationInfo = false;
private String customUserFields;
private boolean ignoreThrowable = false;
private boolean activeIgnoreThrowable = ignoreThrowable;
private String hostname = InetAddress.getLocalHost().getHostName();
private String threadName;
private long timestamp;
private String ndc;
private Map mdc;
private LocationInfo info;
private HashMap<String, Object> exceptionInformation;
private static Integer version = 1;
private JSONObject logstashEvent;
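// Fix the time zone at GMT+8 so "@timestamp" is rendered in China Standard Time
// rather than the UTC timestamps produced by JSONEventLayoutV1.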
public static final TimeZone GMT_8 = TimeZone.getTimeZone("GMT+8");
public static final FastDateFormat DATETIME_FORMAT_GMT_8 =
        FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss", GMT_8);
public static final String ADDITIONAL_DATA_PROPERTY = "com.avris.JSONLayout.UserFields";
public static String dateFormat(long timestamp) {
    return DATETIME_FORMAT_GMT_8.format(timestamp);
}
/**
* For backwards compatibility, the default is to generate location information in the log
* messages.
*/
public JSONLayout() throws UnknownHostException {
this(true);
}
/**
* Creates a layout that optionally inserts location information into log messages.
*
* @param locationInfo whether or not to include location information in the log messages.
*/
public JSONLayout(boolean locationInfo) throws UnknownHostException {
this.locationInfo = locationInfo;
}
@Override
public String format(LoggingEvent loggingEvent) {
threadName = loggingEvent.getThreadName();
timestamp = loggingEvent.getTimeStamp();
exceptionInformation = new HashMap<String, Object>();
mdc = loggingEvent.getProperties();
ndc = loggingEvent.getNDC();
logstashEvent = new JSONObject();
String whoami = this.getClass().getSimpleName();
/**
* All v1 of the event format requires is "@timestamp" and "@version" Every other field is
* arbitrary
*/
logstashEvent.put("@version", version);
logstashEvent.put("@timestamp", dateFormat(timestamp));
/** Extract and add fields from log4j config, if defined */
if (getUserFields() != null) {
String userFlds = getUserFields();
LogLog.debug("[" + whoami + "] Got user data from log4j property: " + userFlds);
addUserFields(userFlds);
}
/**
* Extract fields from system properties, if defined Note that CLI props will override
* conflicts with log4j config
*/
if (System.getProperty(ADDITIONAL_DATA_PROPERTY) != null) {
if (getUserFields() != null) {
LogLog.warn(
"["
+ whoami
+ "] Loading UserFields from command-line. This will override any UserFields set in the log4j configuration file");
}
String userFieldsProperty = System.getProperty(ADDITIONAL_DATA_PROPERTY);
LogLog.debug(
"[" + whoami + "] Got user data from system property: " + userFieldsProperty);
addUserFields(userFieldsProperty);
}
/** Now we start injecting our own stuff. */
logstashEvent.put("source_host", hostname);
logstashEvent.put("message", loggingEvent.getRenderedMessage());
if (loggingEvent.getThrowableInformation() != null) {
final ThrowableInformation throwableInformation =
loggingEvent.getThrowableInformation();
if (throwableInformation.getThrowable().getClass().getCanonicalName() != null) {
exceptionInformation.put(
"exception_class",
throwableInformation.getThrowable().getClass().getCanonicalName());
}
if (throwableInformation.getThrowable().getMessage() != null) {
exceptionInformation.put(
"exception_message", throwableInformation.getThrowable().getMessage());
}
if (throwableInformation.getThrowableStrRep() != null) {
String stackTrace =
StringUtils.join(throwableInformation.getThrowableStrRep(), "\n");
exceptionInformation.put("stacktrace", stackTrace);
}
addEventData("exception", exceptionInformation);
}
if (locationInfo) {
info = loggingEvent.getLocationInformation();
addEventData("file", info.getFileName());
addEventData("line_number", info.getLineNumber());
addEventData("class", info.getClassName());
addEventData("method", info.getMethodName());
}
addEventData("logger_name", loggingEvent.getLoggerName());
addEventData("mdc", mdc);
addEventData("ndc", ndc);
addEventData("level", loggingEvent.getLevel().toString());
addEventData("thread_name", threadName);
return logstashEvent.toString() + "\n";
}
@Override
public boolean ignoresThrowable() {
return ignoreThrowable;
}
/**
* Query whether log messages include location information.
*
* @return true if location information is included in log messages, false otherwise.
*/
public boolean getLocationInfo() {
return locationInfo;
}
/**
* Set whether log messages should include location information.
*
* @param locationInfo true if location information should be included, false otherwise.
*/
public void setLocationInfo(boolean locationInfo) {
this.locationInfo = locationInfo;
}
public String getUserFields() {
return customUserFields;
}
public void setUserFields(String userFields) {
this.customUserFields = userFields;
}
@Override
public void activateOptions() {
activeIgnoreThrowable = ignoreThrowable;
}
private void addUserFields(String data) {
    if (null != data) {
        String[] pairs = data.split(",");
        for (String pair : pairs) {
            String[] userField = pair.split(":", 2);
            // Guard against entries without a ":" separator, which would
            // otherwise throw ArrayIndexOutOfBoundsException.
            if (userField.length == 2 && userField[0] != null) {
                String key = userField[0];
                String val = userField[1];
                addEventData(key, val);
            }
        }
    }
}
}
private void addEventData(String keyname, Object keyval) {
if (null != keyval) {
logstashEvent.put(keyname, keyval);
}
}
}
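To eyeball the JSON this layout produces without a Kafka round trip, here is a minimal local sketch; the logger name, message, and user fields are placeholders, and it relies on the public LoggingEvent constructor from log4j 1.x:
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.spi.LoggingEvent;

public class JSONLayoutDemo {
    public static void main(String[] args) throws Exception {
        JSONLayout layout = new JSONLayout(true);
        layout.setUserFields("app_name:demo");
        Logger logger = Logger.getLogger("com.avris.util.Print");
        // Build a synthetic INFO event and print the JSON the layout produces.
        LoggingEvent event =
                new LoggingEvent(Logger.class.getName(), logger, Level.INFO, "hello kafka", null);
        System.out.print(layout.format(event));
    }
}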
Related dependencies:
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<version>2.6</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>2.0.21</version>
<scope>provided</scope>
</dependency>
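To build the jar, something like the following is typical (a hedged sketch; the jar name kafka-appender-1.0.0.jar follows this article's examples, and fastjson/commons-lang stay out of the jar because they are provided-scope):
mvn clean package
# the artifact lands in target/, e.g. target/kafka-appender-1.0.0.jar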
Package the project, upload it to the server, and prepare to run it.
In the launch command, add kafka-appender-1.0.0.jar and the related dependencies:
--conf "spark.driver.extraClassPath=slf4j-api-1.8.0-beta2.jar:slf4j-log4j12-1.8.0-beta2.jar:kafka-appender-1.0.0.jar:lib/fastjson-2.0.7.jar:lib/fastjson2-2.0.7.jar:lib/fastjson2-extension-2.0.7.jar" \
--conf "spark.executor.extraClassPath=slf4j-api-1.8.0-beta2.jar:slf4j-log4j12-1.8.0-beta2.jar:kafka-appender-1.0.0.jar:lib/fastjson-2.0.7.jar:lib/fastjson2-2.0.7.jar:lib/fastjson2-extension-2.0.7.jar" \
After startup, check the data again: the @timestamp field is now correct.
{"source_host":"bigdata-24-194","method":"println","level":"INFO","message":"2023-03-02 16:41:24 - 本轮自定义任务计算完成","mdc":{},"app_name":"xxx","@timestamp":"2023-03-02 16:41:24","file":"Print.scala","line_number":"24","thread_name":"main","@version":1,"logger_name":"com.avris.util.Print","class":"com.avris.util.Print$"}
One-step usage
Before reading this section, go through the steps explained above, or parts of it may be unclear.
To make this easier to apply, I packaged these classes and published them to the Maven Central repository, so they can be referenced and used directly. The steps are:
- Add the Maven dependency:
<dependency>
<groupId>com.gitee.jastee</groupId>
<artifactId>kafka-log4j-appender</artifactId>
<version>1.0.5</version>
</dependency>
- Use a Log to print logs in your code
- Modify the Spark configuration file log4j.properties:
# Set everything to be logged to the console
log4j.rootCategory=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %l %p %c{1}: %m%n
# Settings to quiet third party logs that are too verbose
log4j.logger.org.eclipse.jetty=WARN
log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
# Everything from here down is newly added
# "log4j.logger.com" means INFO logs from every class whose package starts with "com" are sent to Kafka.
# Multiple such entries can be configured. The prefix must be a complete package segment: "com", not "co".
log4j.logger.com=info,kafka
log4j.logger.net.jast.xxx=info,kafka
log4j.logger.org.apache.kafka.clients.Metadata=ERROR
# Use the custom KafkaAppender
log4j.appender.kafka=com.gitee.jastee.kafka.appender.KafkaLog4jAppender
# The KafkaAppender that ships with Kafka
#log4j.appender.kafka=org.apache.kafka.log4jappender.KafkaLog4jAppender
# Target topic
log4j.appender.kafka.topic=test
# Kafka BrokerList
log4j.appender.kafka.brokerList=172.16.24.194:9092,172.16.24.195:9092,172.16.24.196:9092
log4j.appender.kafka.compressionType=none
log4j.appender.kafka.Threshold=INFO
log4j.appender.kafka.requiredNumAcks=-1
log4j.appender.kafka.syncSend=false
log4j.appender.kafka.maxBlockMs=5000
log4j.appender.kafka.layout=com.gitee.jastee.kafka.appender.JSONLayout
log4j.appender.kafka.layout.UserFields=app_name:xxx
- Add the flags to the launch command (if you would rather ship a single jar, see the merge sketch after this list)
Too many jars to list in --conf? You can merge the three jars into one and reference just that one: unpack them with jar xf, then repackage them into a single jar with jar -cvfM. Because different environments may need different slf4j versions, they are not merged here; merge them as your own project requires.
--conf "spark.driver.extraClassPath=slf4j-api-1.8.0-beta2.jar:slf4j-log4j12-1.8.0-beta2.jar:kafka-log4j-appender-1.0.5.jar" \
--conf "spark.executor.extraClassPath=slf4j-api-1.8.0-beta2.jar:slf4j-log4j12-1.8.0-beta2.jar:kafka-log4j-appender-1.0.5.jar" \
--jars $(echo $JAR_PATH/lib/*.jar | tr ' ' ',') \
- Start the application
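A hedged sketch of the jar-merge approach mentioned above (the file names follow this article's examples; the merged jar name is a placeholder):
mkdir merged && cd merged
# Unpack all three jars into one directory (later entries overwrite earlier ones).
jar xf ../slf4j-api-1.8.0-beta2.jar
jar xf ../slf4j-log4j12-1.8.0-beta2.jar
jar xf ../kafka-log4j-appender-1.0.5.jar
# Repackage everything as a single jar without a generated manifest (-M).
jar -cvfM ../kafka-appender-all.jar .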
Possible exceptions
ClassNotFoundException: xxx.KafkaLog4jAppender
The application fails at startup, reporting that the class cannot be found even though the jar was submitted:
log4j:ERROR Could not instantiate class [com.xxx.KafkaLog4jAppender].
java.lang.ClassNotFoundException: com.xxx.KafkaLog4jAppender
Cause:
At the very start of Spark's launch, the jars passed via --jars have not yet been loaded. Submit our custom kafka-appender-1.0.0.jar (the jar containing KafkaLog4jAppender) through spark.executor.extraClassPath and spark.driver.extraClassPath instead; if you use the official appender, submit the official jar the same way.
Solution:
Add to the launch script:
--conf "spark.driver.extraClassPath=kafka-appender-1.0.0.jar" \
--conf "spark.executor.extraClassPath=kafka-appender-1.0.0.jar" \
Unexpected problem occured during version sanity check Reported exception: java.lang.NullPointerException
Unexpected problem occured during version sanity check
Reported exception:
java.lang.NullPointerException
at org.slf4j.LoggerFactory.versionSanityCheck(LoggerFactory.java:272)
at org.slf4j.LoggerFactory.performInitialization(LoggerFactory.java:126)
at org.slf4j.LoggerFactory.getILoggerFactory(LoggerFactory.java:417)
at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:362)
at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:388)
at org.apache.kafka.clients.CommonClientConfigs.<clinit>(CommonClientConfigs.java:32)
at org.apache.kafka.clients.producer.ProducerConfig.<clinit>(ProducerConfig.java:341)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:326)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
at com.KafkaLog4jAppender.getKafkaProducer(KafkaLog4jAppender.java:285)
at com.KafkaLog4jAppender.activateOptions(KafkaLog4jAppender.java:278)
at org.apache.log4j.config.PropertySetter.activate(PropertySetter.java:307)
at org.apache.log4j.config.PropertySetter.setProperties(PropertySetter.java:172)
at org.apache.log4j.config.PropertySetter.setProperties(PropertySetter.java:104)
at org.apache.log4j.PropertyConfigurator.parseAppender(PropertyConfigurator.java:842)
at org.apache.log4j.PropertyConfigurator.parseCategory(PropertyConfigurator.java:768)
at org.apache.log4j.PropertyConfigurator.parseCatsAndRenderers(PropertyConfigurator.java:672)
at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:516)
at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:580)
at org.apache.log4j.helpers.OptionConverter.selectAndConfigure(OptionConverter.java:526)
at org.apache.log4j.LogManager.<clinit>(LogManager.java:127)
at org.slf4j.impl.Log4jLoggerFactory.<init>(Log4jLoggerFactory.java:66)
at org.slf4j.impl.StaticLoggerBinder.<init>(StaticLoggerBinder.java:72)
at org.slf4j.impl.StaticLoggerBinder.<clinit>(StaticLoggerBinder.java:45)
at org.apache.spark.internal.Logging$.org$apache$spark$internal$Logging$$isLog4j12(Logging.scala:222)
at org.apache.spark.internal.Logging.initializeLogging(Logging.scala:127)
at org.apache.spark.internal.Logging.initializeLogIfNecessary(Logging.scala:111)
at org.apache.spark.internal.Logging.initializeLogIfNecessary$(Logging.scala:105)
at org.apache.spark.deploy.SparkSubmit.initializeLogIfNecessary(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:83)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1043)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1052)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Exception in thread "main" java.lang.ExceptionInInitializerError
at org.apache.kafka.clients.producer.ProducerConfig.<clinit>(ProducerConfig.java:341)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:326)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
at com.avris.KafkaLog4jAppender.getKafkaProducer(KafkaLog4jAppender.java:285)
at com.avris.KafkaLog4jAppender.activateOptions(KafkaLog4jAppender.java:278)
at org.apache.log4j.config.PropertySetter.activate(PropertySetter.java:307)
at org.apache.log4j.config.PropertySetter.setProperties(PropertySetter.java:172)
at org.apache.log4j.config.PropertySetter.setProperties(PropertySetter.java:104)
at org.apache.log4j.PropertyConfigurator.parseAppender(PropertyConfigurator.java:842)
at org.apache.log4j.PropertyConfigurator.parseCategory(PropertyConfigurator.java:768)
at org.apache.log4j.PropertyConfigurator.parseCatsAndRenderers(PropertyConfigurator.java:672)
at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:516)
at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:580)
at org.apache.log4j.helpers.OptionConverter.selectAndConfigure(OptionConverter.java:526)
at org.apache.log4j.LogManager.<clinit>(LogManager.java:127)
at org.slf4j.impl.Log4jLoggerFactory.<init>(Log4jLoggerFactory.java:66)
at org.slf4j.impl.StaticLoggerBinder.<init>(StaticLoggerBinder.java:72)
at org.slf4j.impl.StaticLoggerBinder.<clinit>(StaticLoggerBinder.java:45)
at org.apache.spark.internal.Logging$.org$apache$spark$internal$Logging$$isLog4j12(Logging.scala:222)
at org.apache.spark.internal.Logging.initializeLogging(Logging.scala:127)
at org.apache.spark.internal.Logging.initializeLogIfNecessary(Logging.scala:111)
at org.apache.spark.internal.Logging.initializeLogIfNecessary$(Logging.scala:105)
at org.apache.spark.deploy.SparkSubmit.initializeLogIfNecessary(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:83)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1043)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1052)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.NullPointerException
at org.slf4j.LoggerFactory.getILoggerFactory(LoggerFactory.java:423)
at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:362)
at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:388)
at org.apache.kafka.clients.CommonClientConfigs.<clinit>(CommonClientConfigs.java:32)
... 27 more
Cause:
A logging-library version problem; the version currently in use is slf4j-log4j12-1.7.30.jar.
Solution:
Use slf4j version 1.8.0-beta2:
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.8.0-beta2</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.8.0-beta2</version>
</dependency>
Submit the jars via the spark.driver.extraClassPath and spark.executor.extraClassPath parameters:
--conf "spark.driver.extraClassPath=slf4j-api-1.8.0-beta2.jar:slf4j-log4j12-1.8.0-beta2.jar:kafka-appender-1.0.0.jar" \
--conf "spark.executor.extraClassPath=slf4j-api-1.8.0-beta2.jar:slf4j-log4j12-1.8.0-beta2.jar:kafka-appender-1.0.0.jar" \