Sending Spark Logs to Kafka with Log4j



Contents

  • Custom KafkaAppender
  • Modifying the log4j.properties configuration
  • Adding parameters to the launch command
  • Verifying the sent data in Kafka after startup
  • Time zone issue: fixed with a custom JSONLayout
  • Custom JSONLayout.java
  • One-step integration
  • Possible exceptions
  • ClassNotFoundException: xxx.KafkaLog4jAppender
  • Unexpected problem occured during version sanity check Reported exception: java.lang.NullPointerException


Custom KafkaAppender

Note: if you want to use the official appender, you can depend on it directly; choose the version matching the Kafka version you are running:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-log4j-appender</artifactId>
    <version>2.4.1</version>
</dependency>

The dependencies to configure for the custom appender are:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.4.1</version>
    <scope>compile</scope>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.28</version>
    <scope>compile</scope>
</dependency>

The content of the custom KafkaLog4jAppender.java is as follows.

It also implements package-name filtering, so only selected loggers are forwarded to Kafka.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.helpers.LogLog;
import org.apache.log4j.spi.LoggingEvent;

import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class KafkaLog4jAppender extends AppenderSkeleton {
    /** Include rules: exact logger names */
    private Set<String> includeSet = new HashSet<>();

    /** Include rules: package prefixes (configured as entries ending in ".*") */
    private Set<String> includeMatchSet = new HashSet<>();

    /** Exclude rules: exact logger names */
    private Set<String> excludeSet = new HashSet<>();

    /** Exclude rules: package prefixes (configured as entries ending in ".*") */
    private Set<String> excludeMatchSet = new HashSet<>();
    private String brokerList;
    private String topic;
    private String compressionType;
    private String securityProtocol;
    private String sslTruststoreLocation;
    private String sslTruststorePassword;
    private String sslKeystoreType;
    private String sslKeystoreLocation;
    private String sslKeystorePassword;
    private String saslKerberosServiceName;
    private String saslMechanism;
    private String clientJaasConfPath;
    private String clientJaasConf;
    private String kerb5ConfPath;
    private Integer maxBlockMs;
    private int retries = 2147483647;
    private int requiredNumAcks = 1;
    private int deliveryTimeoutMs = 120000;
    private boolean ignoreExceptions = true;
    private boolean syncSend;
    private Producer<byte[], byte[]> producer;
    private String includes;
    private String excludes;

    public String getIncludes() {
        return includes;
    }

    public void setIncludes(String includes) {
        this.includes = includes;
    }

    public String getExcludes() {
        return excludes;
    }

    public void setExcludes(String excludes) {
        this.excludes = excludes;
    }

    public KafkaLog4jAppender() {}

    public Producer<byte[], byte[]> getProducer() {
        return this.producer;
    }

    public String getBrokerList() {
        return this.brokerList;
    }

    public void setBrokerList(String brokerList) {
        this.brokerList = brokerList;
    }

    public int getRequiredNumAcks() {
        return this.requiredNumAcks;
    }

    public void setRequiredNumAcks(int requiredNumAcks) {
        this.requiredNumAcks = requiredNumAcks;
    }

    public int getRetries() {
        return this.retries;
    }

    public void setRetries(int retries) {
        this.retries = retries;
    }

    public int getDeliveryTimeoutMs() {
        return this.deliveryTimeoutMs;
    }

    public void setDeliveryTimeoutMs(int deliveryTimeoutMs) {
        this.deliveryTimeoutMs = deliveryTimeoutMs;
    }

    public String getCompressionType() {
        return this.compressionType;
    }

    public void setCompressionType(String compressionType) {
        this.compressionType = compressionType;
    }

    public String getTopic() {
        return this.topic;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }

    public boolean getIgnoreExceptions() {
        return this.ignoreExceptions;
    }

    public void setIgnoreExceptions(boolean ignoreExceptions) {
        this.ignoreExceptions = ignoreExceptions;
    }

    public boolean getSyncSend() {
        return this.syncSend;
    }

    public void setSyncSend(boolean syncSend) {
        this.syncSend = syncSend;
    }

    public String getSslTruststorePassword() {
        return this.sslTruststorePassword;
    }

    public String getSslTruststoreLocation() {
        return this.sslTruststoreLocation;
    }

    public String getSecurityProtocol() {
        return this.securityProtocol;
    }

    public void setSecurityProtocol(String securityProtocol) {
        this.securityProtocol = securityProtocol;
    }

    public void setSslTruststoreLocation(String sslTruststoreLocation) {
        this.sslTruststoreLocation = sslTruststoreLocation;
    }

    public void setSslTruststorePassword(String sslTruststorePassword) {
        this.sslTruststorePassword = sslTruststorePassword;
    }

    public void setSslKeystorePassword(String sslKeystorePassword) {
        this.sslKeystorePassword = sslKeystorePassword;
    }

    public void setSslKeystoreType(String sslKeystoreType) {
        this.sslKeystoreType = sslKeystoreType;
    }

    public void setSslKeystoreLocation(String sslKeystoreLocation) {
        this.sslKeystoreLocation = sslKeystoreLocation;
    }

    public void setSaslKerberosServiceName(String saslKerberosServiceName) {
        this.saslKerberosServiceName = saslKerberosServiceName;
    }

    public void setClientJaasConfPath(String clientJaasConfPath) {
        this.clientJaasConfPath = clientJaasConfPath;
    }

    public void setKerb5ConfPath(String kerb5ConfPath) {
        this.kerb5ConfPath = kerb5ConfPath;
    }

    public String getSslKeystoreLocation() {
        return this.sslKeystoreLocation;
    }

    public String getSslKeystoreType() {
        return this.sslKeystoreType;
    }

    public String getSslKeystorePassword() {
        return this.sslKeystorePassword;
    }

    public String getSaslKerberosServiceName() {
        return this.saslKerberosServiceName;
    }

    public String getClientJaasConfPath() {
        return this.clientJaasConfPath;
    }

    public void setSaslMechanism(String saslMechanism) {
        this.saslMechanism = saslMechanism;
    }

    public String getSaslMechanism() {
        return this.saslMechanism;
    }

    public void setClientJaasConf(String clientJaasConf) {
        this.clientJaasConf = clientJaasConf;
    }

    public String getClientJaasConf() {
        return this.clientJaasConf;
    }

    public String getKerb5ConfPath() {
        return this.kerb5ConfPath;
    }

    public int getMaxBlockMs() {
        return this.maxBlockMs;
    }

    public void setMaxBlockMs(int maxBlockMs) {
        this.maxBlockMs = maxBlockMs;
    }

    @Override
    public void activateOptions() {
        // Load the include/exclude filter rules
        setFilterRules(includes, includeMatchSet, includeSet);
        setFilterRules(excludes, excludeMatchSet, excludeSet);

        Properties props = new Properties();
        if (this.brokerList != null) {
            props.put("bootstrap.servers", this.brokerList);
        }

        if (props.isEmpty()) {
            throw new ConfigException("The bootstrap servers property should be specified");
        } else if (this.topic == null) {
            throw new ConfigException("Topic must be specified by the Kafka log4j appender");
        } else {
            if (this.compressionType != null) {
                props.put("compression.type", this.compressionType);
            }

            props.put("acks", Integer.toString(this.requiredNumAcks));
            props.put("retries", this.retries);
            props.put("delivery.timeout.ms", this.deliveryTimeoutMs);
            if (this.securityProtocol != null) {
                props.put("security.protocol", this.securityProtocol);
            }

            if (this.securityProtocol != null
                    && this.securityProtocol.contains("SSL")
                    && this.sslTruststoreLocation != null
                    && this.sslTruststorePassword != null) {
                props.put("ssl.truststore.location", this.sslTruststoreLocation);
                props.put("ssl.truststore.password", this.sslTruststorePassword);
                if (this.sslKeystoreType != null
                        && this.sslKeystoreLocation != null
                        && this.sslKeystorePassword != null) {
                    props.put("ssl.keystore.type", this.sslKeystoreType);
                    props.put("ssl.keystore.location", this.sslKeystoreLocation);
                    props.put("ssl.keystore.password", this.sslKeystorePassword);
                }
            }

            if (this.securityProtocol != null
                    && this.securityProtocol.contains("SASL")
                    && this.saslKerberosServiceName != null
                    && this.clientJaasConfPath != null) {
                props.put("sasl.kerberos.service.name", this.saslKerberosServiceName);
                System.setProperty("java.security.auth.login.config", this.clientJaasConfPath);
            }

            if (this.kerb5ConfPath != null) {
                System.setProperty("java.security.krb5.conf", this.kerb5ConfPath);
            }

            if (this.saslMechanism != null) {
                props.put("sasl.mechanism", this.saslMechanism);
            }

            if (this.clientJaasConf != null) {
                props.put("sasl.jaas.config", this.clientJaasConf);
            }

            if (this.maxBlockMs != null) {
                props.put("max.block.ms", this.maxBlockMs);
            }

            props.put("key.serializer", ByteArraySerializer.class.getName());
            props.put("value.serializer", ByteArraySerializer.class.getName());
            System.out.println("Properties:" + props);
            this.producer = this.getKafkaProducer(props);
            LogLog.debug("Kafka producer connected to " + this.brokerList);
            LogLog.debug("Logging for topic: " + this.topic);
        }
    }

    /**
     * Parse a comma-separated rule string into exact-match and prefix-match sets.
     *
     * @param rules comma-separated logger names; entries ending in ".*" are prefix rules
     * @param matchSet receives the prefix rules (with the trailing ".*" stripped)
     * @param set receives the exact-name rules
     * @author Jast
     */
    private void setFilterRules(
            String rules, Set<String> matchSet, Set<String> set) {
        if (rules != null) {
            for (String rule : rules.split(",")) {
                if (rule.length() > 0) {
                    if (rule.endsWith(".*")) {
                        matchSet.add(rule.replace(".*", ""));
                    } else {
                        set.add(rule);
                    }
                }
            }
        }
    }

    protected Producer<byte[], byte[]> getKafkaProducer(Properties props) {
        return new KafkaProducer<>(props);
    }

    @Override
    protected void append(LoggingEvent event) {

        if (filterPackageName(event)) {
            return;
        }

        String message = this.subAppend(event);

        LogLog.debug("[" + new Date(event.getTimeStamp()) + "]" + message);
        Future<RecordMetadata> response =
                this.producer.send(
                        new ProducerRecord<>(this.topic, message.getBytes(StandardCharsets.UTF_8)));
        if (this.syncSend) {
            try {
                response.get();
            } catch (ExecutionException | InterruptedException var5) {
                if (!this.ignoreExceptions) {
                    throw new RuntimeException(var5);
                }

                LogLog.debug("Exception while getting response", var5);
            }
        }
    }

    private String subAppend(LoggingEvent event) {
        return this.layout == null ? event.getRenderedMessage() : this.layout.format(event);
    }

    @Override
    public void close() {
        if (!this.closed) {
            this.closed = true;
            this.producer.close();
        }
    }

    @Override
    public boolean requiresLayout() {
        return true;
    }

    /**
     * Filter by logger/package name. If this returns true, the event is NOT sent to Kafka.
     *
     * @param event the logging event to test
     * @return true if the event should be dropped
     * @author Jast
     */
    private boolean filterPackageName(LoggingEvent event) {
        boolean flag = true;
        if (includeSet.size() == 0
                && includeMatchSet.size() == 0
                && excludeSet.size() == 0
                && excludeMatchSet.size() == 0) {
            return false;
        }
        if (includeSet.size() == 0 && includeMatchSet.size() == 0) {
            flag = false;
        }
        // Name of the logger (usually the fully qualified class) that produced the event
        String loggerName = event.getLoggerName();
        for (String include : includeSet) {
            if (loggerName.equals(include)) {
                flag = false;
            }
        }
        for (String include : includeMatchSet) {
            if (loggerName.startsWith(include)) {
                flag = false;
            }
        }
        for (String exclude : excludeMatchSet) {
            if (loggerName.startsWith(exclude)) {
                flag = true;
            }
        }
        for (String exclude : excludeSet) {
            if (loggerName.equals(exclude)) {
                flag = true;
            }
        }

        return flag;
    }
}
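
The includes and excludes properties drive the filter above: entries ending in ".*" match by package prefix, while anything else must equal the logger name exactly. A minimal sketch of how they could be set in log4j.properties, assuming the appender is registered under the name kafka as in the next section (the package names are placeholders):

log4j.appender.kafka.includes=com.example.*,net.jast.xxx.SomeClass
log4j.appender.kafka.excludes=com.example.noisy.*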

Modifying the log4j.properties configuration

Edit Spark's own configuration file, located at /opt/spark-client/conf.

The directory may differ between clusters.

# Set everything to be logged to the console
log4j.rootCategory=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %l %p %c{1}: %m%n

# Settings to quiet third party logs that are too verbose
log4j.logger.org.eclipse.jetty=WARN
log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO

# Everything from here down is newly added
# log4j.logger.com routes INFO logs from every class whose package starts with "com" to Kafka; multiple loggers can be configured. The package segment must be complete: "com" works, "co" does not.
log4j.logger.com=info,kafka
log4j.logger.net.jast.xxx=info,kafka
log4j.logger.org.apache.kafka.clients.Metadata=ERROR
# Use the custom KafkaAppender
log4j.appender.kafka=com.avris.KafkaLog4jAppender
# The official KafkaAppender (alternative)
#log4j.appender.kafka=org.apache.kafka.log4jappender.KafkaLog4jAppender
# Target Kafka topic
log4j.appender.kafka.topic=test
# Kafka BrokerList
log4j.appender.kafka.brokerList=172.16.24.194:9092,172.16.24.195:9092,172.16.24.196:9092
log4j.appender.kafka.compressionType=none
log4j.appender.kafka.Threshold=INFO
log4j.appender.kafka.requiredNumAcks=-1
log4j.appender.kafka.syncSend=false
log4j.appender.kafka.maxBlockMs=5000
log4j.appender.kafka.layout=net.logstash.log4j.JSONEventLayoutV1
log4j.appender.kafka.layout.UserFields=app_name:xxx

Adding parameters to the launch command

--conf "spark.driver.extraClassPath=slf4j-api-1.8.0-beta2.jar:slf4j-log4j12-1.8.0-beta2.jar:kafka-appender-1.0.0.jar" \
--conf "spark.executor.extraClassPath=slf4j-api-1.8.0-beta2.jar:slf4j-log4j12-1.8.0-beta2.jar:kafka-appender-1.0.0.jar" \
--jars $(echo $JAR_PATH/lib/*.jar | tr ' ' ',') \

Notes:

  • kafka-appender-1.0.0.jar is the jar built from the custom KafkaLog4jAppender class above
  • slf4j-api-1.8.0-beta2.jar and slf4j-log4j12-1.8.0-beta2.jar are pinned to work around a NullPointerException caused by a logging version mismatch, described in the exceptions section below
  • these two slf4j jars must also be placed in the lib directory and submitted via --jars
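
Putting it together, a full launch command might look like the following sketch (the master, deploy mode, main class, and application jar are placeholders for your own project):

spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --class com.example.YourMainClass \
  --conf "spark.driver.extraClassPath=slf4j-api-1.8.0-beta2.jar:slf4j-log4j12-1.8.0-beta2.jar:kafka-appender-1.0.0.jar" \
  --conf "spark.executor.extraClassPath=slf4j-api-1.8.0-beta2.jar:slf4j-log4j12-1.8.0-beta2.jar:kafka-appender-1.0.0.jar" \
  --jars $(echo $JAR_PATH/lib/*.jar | tr ' ' ',') \
  $JAR_PATH/your-application.jar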

Verifying the sent data in Kafka after startup
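
For example, with the console consumer shipped with Kafka (broker list and topic taken from the configuration above):

kafka-console-consumer.sh --bootstrap-server 172.16.24.194:9092 --topic test --from-beginning

Each log event arrives as one JSON record: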

{"source_host":"bigdata-24-194","method":"println","level":"INFO","message":"2023-03-02 14:38:35 - 本轮任务计算完成,休眠 10000","mdc":{},"app_name":"xxx","@timestamp":"2023-03-02T06:38:35.858Z","file":"Print.scala","line_number":"24","thread_name":"main","@version":1,"logger_name":"com.util.Print","class":"com.avris.util.Print$"}

There is a problem here: the @timestamp produced by net.logstash.log4j.JSONEventLayoutV1 is in the wrong time zone. We need to fix the time zone, so below we implement a custom Layout.

Time zone issue: fixed with a custom JSONLayout

A JSON layout is fairly expensive; with high log volume, consider not using one and instead assembling the fields you care about yourself before logging, as sketched below.
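
A rough sketch of that alternative (class name, fields, and values are illustrative): keep a cheap PatternLayout and build the message string yourself.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ManualJsonLog {
    private static final Logger LOG = LoggerFactory.getLogger(ManualJsonLog.class);

    public static void main(String[] args) {
        // Assemble only the fields you need; this avoids the per-event
        // field extraction and JSON object construction a full layout performs
        String msg = String.format(
                "{\"app_name\":\"xxx\",\"event\":\"%s\",\"count\":%d}", "batch_done", 42);
        LOG.info(msg);
    }
}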

Custom JSONLayout.java

Create a JSONLayout.java class in the same project as the custom KafkaAppender, with the following content:

import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.time.FastDateFormat;
import org.apache.log4j.Layout;
import org.apache.log4j.helpers.LogLog;
import org.apache.log4j.spi.LocationInfo;
import org.apache.log4j.spi.LoggingEvent;
import org.apache.log4j.spi.ThrowableInformation;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
import java.util.TimeZone;

public class JSONLayout extends Layout {

    private boolean locationInfo = false;
    private String customUserFields;

    private boolean ignoreThrowable = false;

    private boolean activeIgnoreThrowable = ignoreThrowable;
    private String hostname = InetAddress.getLocalHost().getHostName();
    private String threadName;
    private long timestamp;
    private String ndc;
    private Map mdc;
    private LocationInfo info;
    private HashMap<String, Object> exceptionInformation;
    private static Integer version = 1;

    private JSONObject logstashEvent;

    public static final TimeZone GMT_8 = TimeZone.getTimeZone("GMT+8");
    public static final FastDateFormat ISO_DATETIME_TIME_ZONE_FORMAT_WITH_MILLIS =
            FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss", GMT_8);
    public static final String ADDITIONAL_DATA_PROPERTY = "com.avris.JSONLayout.UserFields";

    public static String dateFormat(long timestamp) {
        return ISO_DATETIME_TIME_ZONE_FORMAT_WITH_MILLIS.format(timestamp);
    }

    /**
     * For backwards compatibility, the default is to generate location information in the log
     * messages.
     */
    public JSONLayout() throws UnknownHostException {
        this(true);
    }

    /**
     * Creates a layout that optionally inserts location information into log messages.
     *
     * @param locationInfo whether or not to include location information in the log messages.
     */
    public JSONLayout(boolean locationInfo) throws UnknownHostException {
        this.locationInfo = locationInfo;
    }

    @Override
    public String format(LoggingEvent loggingEvent) {
        threadName = loggingEvent.getThreadName();
        timestamp = loggingEvent.getTimeStamp();
        exceptionInformation = new HashMap<String, Object>();
        mdc = loggingEvent.getProperties();
        ndc = loggingEvent.getNDC();

        logstashEvent = new JSONObject();
        String whoami = this.getClass().getSimpleName();

        /**
         * All v1 of the event format requires is "@timestamp" and "@version" Every other field is
         * arbitrary
         */
        logstashEvent.put("@version", version);
        logstashEvent.put("@timestamp", dateFormat(timestamp));

        /** Extract and add fields from log4j config, if defined */
        if (getUserFields() != null) {
            String userFlds = getUserFields();
            LogLog.debug("[" + whoami + "] Got user data from log4j property: " + userFlds);
            addUserFields(userFlds);
        }

        /**
         * Extract fields from system properties, if defined Note that CLI props will override
         * conflicts with log4j config
         */
        if (System.getProperty(ADDITIONAL_DATA_PROPERTY) != null) {
            if (getUserFields() != null) {
                LogLog.warn(
                        "["
                                + whoami
                                + "] Loading UserFields from command-line. This will override any UserFields set in the log4j configuration file");
            }
            String userFieldsProperty = System.getProperty(ADDITIONAL_DATA_PROPERTY);
            LogLog.debug(
                    "[" + whoami + "] Got user data from system property: " + userFieldsProperty);
            addUserFields(userFieldsProperty);
        }

        /** Now we start injecting our own stuff. */
        logstashEvent.put("source_host", hostname);
        logstashEvent.put("message", loggingEvent.getRenderedMessage());

        if (loggingEvent.getThrowableInformation() != null) {
            final ThrowableInformation throwableInformation =
                    loggingEvent.getThrowableInformation();
            if (throwableInformation.getThrowable().getClass().getCanonicalName() != null) {
                exceptionInformation.put(
                        "exception_class",
                        throwableInformation.getThrowable().getClass().getCanonicalName());
            }
            if (throwableInformation.getThrowable().getMessage() != null) {
                exceptionInformation.put(
                        "exception_message", throwableInformation.getThrowable().getMessage());
            }
            if (throwableInformation.getThrowableStrRep() != null) {
                String stackTrace =
                        StringUtils.join(throwableInformation.getThrowableStrRep(), "\n");
                exceptionInformation.put("stacktrace", stackTrace);
            }
            addEventData("exception", exceptionInformation);
        }

        if (locationInfo) {
            info = loggingEvent.getLocationInformation();
            addEventData("file", info.getFileName());
            addEventData("line_number", info.getLineNumber());
            addEventData("class", info.getClassName());
            addEventData("method", info.getMethodName());
        }

        addEventData("logger_name", loggingEvent.getLoggerName());
        addEventData("mdc", mdc);
        addEventData("ndc", ndc);
        addEventData("level", loggingEvent.getLevel().toString());
        addEventData("thread_name", threadName);

        return logstashEvent.toString() + "\n";
    }

    @Override
    public boolean ignoresThrowable() {
        return ignoreThrowable;
    }

    /**
     * Query whether log messages include location information.
     *
     * @return true if location information is included in log messages, false otherwise.
     */
    public boolean getLocationInfo() {
        return locationInfo;
    }

    /**
     * Set whether log messages should include location information.
     *
     * @param locationInfo true if location information should be included, false otherwise.
     */
    public void setLocationInfo(boolean locationInfo) {
        this.locationInfo = locationInfo;
    }

    public String getUserFields() {
        return customUserFields;
    }

    public void setUserFields(String userFields) {
        this.customUserFields = userFields;
    }

    @Override
    public void activateOptions() {
        activeIgnoreThrowable = ignoreThrowable;
    }

    private void addUserFields(String data) {
        if (null != data) {
            String[] pairs = data.split(",");
            for (String pair : pairs) {
                String[] userField = pair.split(":", 2);
                // Skip malformed entries that have no ":" separator
                if (userField.length == 2 && userField[0] != null) {
                    String key = userField[0];
                    String val = userField[1];
                    addEventData(key, val);
                }
            }
        }
    }

    private void addEventData(String keyname, Object keyval) {
        if (null != keyval) {
            logstashEvent.put(keyname, keyval);
        }
    }
}
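
If your servers are not in UTC+8, adjust the GMT_8 constant accordingly, e.g. TimeZone.getTimeZone("UTC") or your own zone, and extend the pattern to "yyyy-MM-dd HH:mm:ss.SSS" if you want the milliseconds the constant's name promises.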

Related dependencies:

<dependency>
    <groupId>commons-lang</groupId>
    <artifactId>commons-lang</artifactId>
    <version>2.6</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>2.0.21</version>
    <scope>provided</scope>
</dependency>

Package it, upload it to the server, and prepare to run.

Add kafka-appender-1.0.0.jar and its dependencies to the launch command:

--conf "spark.driver.extraClassPath=slf4j-api-1.8.0-beta2.jar:slf4j-log4j12-1.8.0-beta2.jar:kafka-appender-1.0.0.jar:lib/fastjson-2.0.7.jar:lib/fastjson2-2.0.7.jar:lib/fastjson2-extension-2.0.7.jar" \
--conf "spark.executor.extraClassPath=slf4j-api-1.8.0-beta2.jar:slf4j-log4j12-1.8.0-beta2.jar:kafka-appender-1.0.0.jar:lib/fastjson-2.0.7.jar:lib/fastjson2-2.0.7.jar:lib/fastjson2-extension-2.0.7.jar" \

After starting, check the data again: the @timestamp field is now correct.

{"source_host":"bigdata-24-194","method":"println","level":"INFO","message":"2023-03-02 16:41:24 - 本轮自定义任务计算完成","mdc":{},"app_name":"xxx","@timestamp":"2023-03-02 16:41:24","file":"Print.scala","line_number":"24","thread_name":"main","@version":1,"logger_name":"com.avris.util.Print","class":"com.avris.util.Print$"}

One-step integration

Before using this section, read through the steps explained above; otherwise parts of it may not make sense.

To make this easy to apply, I packaged the classes and published them to the central Maven repository, so they can be pulled in and used directly. The steps:

  1. Add the Maven dependency

<dependency>
    <groupId>com.gitee.jastee</groupId>
    <artifactId>kafka-log4j-appender</artifactId>
    <version>1.0.5</version>
</dependency>

  2. Use a logger in your code to print logs (a minimal sketch follows this list)
  3. Modify Spark's log4j.properties configuration file
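
A minimal sketch for step 2, assuming a class in a package that the logger configuration below covers (class name and message are placeholders):

package com.example;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Job {
    // The logger name is the fully qualified class name, so the
    // package-based routing in log4j.properties applies to it
    private static final Logger LOG = LoggerFactory.getLogger(Job.class);

    public void run() {
        LOG.info("This round of computation finished");
    }
}

The adjusted log4j.properties for step 3: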

# Set everything to be logged to the console
log4j.rootCategory=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %l %p %c{1}: %m%n

# Settings to quiet third party logs that are too verbose
log4j.logger.org.eclipse.jetty=WARN
log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO

# Everything from here down is newly added
# log4j.logger.com routes INFO logs from every class whose package starts with "com" to Kafka; multiple loggers can be configured. The package segment must be complete: "com" works, "co" does not.
log4j.logger.com=info,kafka
log4j.logger.net.jast.xxx=info,kafka
log4j.logger.org.apache.kafka.clients.Metadata=ERROR
# Use the custom KafkaAppender
log4j.appender.kafka=com.gitee.jastee.kafka.appender.KafkaLog4jAppender
# The official KafkaAppender (alternative)
#log4j.appender.kafka=org.apache.kafka.log4jappender.KafkaLog4jAppender
# Target Kafka topic
log4j.appender.kafka.topic=test
# Kafka BrokerList
log4j.appender.kafka.brokerList=172.16.24.194:9092,172.16.24.195:9092,172.16.24.196:9092
log4j.appender.kafka.compressionType=none
log4j.appender.kafka.Threshold=INFO
log4j.appender.kafka.requiredNumAcks=-1
log4j.appender.kafka.syncSend=false
log4j.appender.kafka.maxBlockMs=5000
log4j.appender.kafka.layout=com.gitee.jastee.kafka.appender.JSONLayout
log4j.appender.kafka.layout.UserFields=app_name:xxx

  4. Add to the launch command

Too many jars to list with --conf? You can merge the three jars into one and point --conf at that single jar: unpack each with jar xf, then repackage with jar -cvfM. Because different environments may need different slf4j versions, I did not merge them here; merge as your own project requires.
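
A rough sketch of the merge (the combined jar name is arbitrary; run it in the directory containing the three jars):

mkdir merged && cd merged
jar xf ../slf4j-api-1.8.0-beta2.jar
jar xf ../slf4j-log4j12-1.8.0-beta2.jar
jar xf ../kafka-log4j-appender-1.0.5.jar
# -M omits a manifest; check META-INF for conflicting service files
jar -cvfM ../combined-appender.jar .
cd ..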

--conf "spark.driver.extraClassPath=slf4j-api-1.8.0-beta2.jar:slf4j-log4j12-1.8.0-beta2.jar:kafka-log4j-appender-1.0.5.jar" \
--conf "spark.executor.extraClassPath=slf4j-api-1.8.0-beta2.jar:slf4j-log4j12-1.8.0-beta2.jar:kafka-log4j-appender-1.0.5.jar" \
--jars $(echo $JAR_PATH/lib/*.jar | tr ' ' ',') \

  5. Start the program

Possible exceptions

ClassNotFoundException: xxx.KafkaLog4jAppender

The program reports an exception at startup: the jar was submitted with the launch command, yet the class cannot be found.

log4j:ERROR Could not instantiate class [com.xxx.KafkaLog4jAppender].
java.lang.ClassNotFoundException: com.xxx.KafkaLog4jAppender

Cause:

At the very start of Spark's launch, the jars passed via --jars have not been loaded yet, so log4j cannot find the appender class. Submit our custom kafka-appender-1.0.0.jar (the jar containing the KafkaLog4jAppender class) through spark.executor.extraClassPath and spark.driver.extraClassPath instead; if you use the official appender, submit the official jar the same way.

Solution:

Add to the launch script:

--conf "spark.driver.extraClassPath=kafka-appender-1.0.0.jar" \
--conf "spark.executor.extraClassPath=kafka-appender-1.0.0.jar" \

Unexpected problem occured during version sanity check Reported exception: java.lang.NullPointerException

Unexpected problem occured during version sanity check
Reported exception:
java.lang.NullPointerException
	at org.slf4j.LoggerFactory.versionSanityCheck(LoggerFactory.java:272)
	at org.slf4j.LoggerFactory.performInitialization(LoggerFactory.java:126)
	at org.slf4j.LoggerFactory.getILoggerFactory(LoggerFactory.java:417)
	at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:362)
	at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:388)
	at org.apache.kafka.clients.CommonClientConfigs.<clinit>(CommonClientConfigs.java:32)
	at org.apache.kafka.clients.producer.ProducerConfig.<clinit>(ProducerConfig.java:341)
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:326)
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
	at com.KafkaLog4jAppender.getKafkaProducer(KafkaLog4jAppender.java:285)
	at com.KafkaLog4jAppender.activateOptions(KafkaLog4jAppender.java:278)
	at org.apache.log4j.config.PropertySetter.activate(PropertySetter.java:307)
	at org.apache.log4j.config.PropertySetter.setProperties(PropertySetter.java:172)
	at org.apache.log4j.config.PropertySetter.setProperties(PropertySetter.java:104)
	at org.apache.log4j.PropertyConfigurator.parseAppender(PropertyConfigurator.java:842)
	at org.apache.log4j.PropertyConfigurator.parseCategory(PropertyConfigurator.java:768)
	at org.apache.log4j.PropertyConfigurator.parseCatsAndRenderers(PropertyConfigurator.java:672)
	at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:516)
	at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:580)
	at org.apache.log4j.helpers.OptionConverter.selectAndConfigure(OptionConverter.java:526)
	at org.apache.log4j.LogManager.<clinit>(LogManager.java:127)
	at org.slf4j.impl.Log4jLoggerFactory.<init>(Log4jLoggerFactory.java:66)
	at org.slf4j.impl.StaticLoggerBinder.<init>(StaticLoggerBinder.java:72)
	at org.slf4j.impl.StaticLoggerBinder.<clinit>(StaticLoggerBinder.java:45)
	at org.apache.spark.internal.Logging$.org$apache$spark$internal$Logging$$isLog4j12(Logging.scala:222)
	at org.apache.spark.internal.Logging.initializeLogging(Logging.scala:127)
	at org.apache.spark.internal.Logging.initializeLogIfNecessary(Logging.scala:111)
	at org.apache.spark.internal.Logging.initializeLogIfNecessary$(Logging.scala:105)
	at org.apache.spark.deploy.SparkSubmit.initializeLogIfNecessary(SparkSubmit.scala:75)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:83)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1043)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1052)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Exception in thread "main" java.lang.ExceptionInInitializerError
	at org.apache.kafka.clients.producer.ProducerConfig.<clinit>(ProducerConfig.java:341)
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:326)
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
	at com.avris.KafkaLog4jAppender.getKafkaProducer(KafkaLog4jAppender.java:285)
	at com.avris.KafkaLog4jAppender.activateOptions(KafkaLog4jAppender.java:278)
	at org.apache.log4j.config.PropertySetter.activate(PropertySetter.java:307)
	at org.apache.log4j.config.PropertySetter.setProperties(PropertySetter.java:172)
	at org.apache.log4j.config.PropertySetter.setProperties(PropertySetter.java:104)
	at org.apache.log4j.PropertyConfigurator.parseAppender(PropertyConfigurator.java:842)
	at org.apache.log4j.PropertyConfigurator.parseCategory(PropertyConfigurator.java:768)
	at org.apache.log4j.PropertyConfigurator.parseCatsAndRenderers(PropertyConfigurator.java:672)
	at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:516)
	at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:580)
	at org.apache.log4j.helpers.OptionConverter.selectAndConfigure(OptionConverter.java:526)
	at org.apache.log4j.LogManager.<clinit>(LogManager.java:127)
	at org.slf4j.impl.Log4jLoggerFactory.<init>(Log4jLoggerFactory.java:66)
	at org.slf4j.impl.StaticLoggerBinder.<init>(StaticLoggerBinder.java:72)
	at org.slf4j.impl.StaticLoggerBinder.<clinit>(StaticLoggerBinder.java:45)
	at org.apache.spark.internal.Logging$.org$apache$spark$internal$Logging$$isLog4j12(Logging.scala:222)
	at org.apache.spark.internal.Logging.initializeLogging(Logging.scala:127)
	at org.apache.spark.internal.Logging.initializeLogIfNecessary(Logging.scala:111)
	at org.apache.spark.internal.Logging.initializeLogIfNecessary$(Logging.scala:105)
	at org.apache.spark.deploy.SparkSubmit.initializeLogIfNecessary(SparkSubmit.scala:75)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:83)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1043)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1052)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.NullPointerException
	at org.slf4j.LoggerFactory.getILoggerFactory(LoggerFactory.java:423)
	at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:362)
	at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:388)
	at org.apache.kafka.clients.CommonClientConfigs.<clinit>(CommonClientConfigs.java:32)
	... 27 more

Cause:

A logging library version mismatch; the version in use was slf4j-log4j12-1.7.30.jar.

Solution:

Use slf4j version 1.8.0-beta2:

<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.8.0-beta2</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.8.0-beta2</version>
</dependency>

Submit them via the spark.driver.extraClassPath and spark.executor.extraClassPath parameters:

--conf "spark.driver.extraClassPath=slf4j-api-1.8.0-beta2.jar:slf4j-log4j12-1.8.0-beta2.jar:kafka-appender-1.0.0.jar" \
--conf "spark.executor.extraClassPath=slf4j-api-1.8.0-beta2.jar:slf4j-log4j12-1.8.0-beta2.jar:kafka-appender-1.0.0.jar" \
