1

我在我的网络项目中使用 log4j2。我试图通过扩展 abstractAppender 将日志直接放入 kafka。根据文档,我的理解是我可以为自定义附加程序指定模式布局,并且设置好之后,我的记录器将使用格式化字符串将日志事件发送到 kafka,但这并没有发生。log4j2.xml 看起来像

<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="info" packages="com.abc.webservice.log.appender">

    <Appenders>
        <Console name="console" target="SYSTEM_OUT">
            <PatternLayout>
                <pattern>%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L- %X{sessionId}--%X{guid}- %m #]%n</pattern>
            </PatternLayout>
        </Console>
        <Kafka name="kafka" topic="test">
            <PatternLayout>
                <pattern>%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L- %X{sessionId}--%X{guid}- %m #]%n</pattern>
            </PatternLayout>
            <Property name="metadata.broker.list">127.0.0.1:9092</Property>
            <Property name="serializer.class">kafka.serializer.StringEncoder</Property>
        </Kafka>
    </Appenders>

    <Loggers>
        <AsyncLogger name="async">
            <AppenderRef ref="kafka" />
            <AppenderRef ref="console" />
        </AsyncLogger>
        <Root level="info">
            <AppenderRef ref="console" />
            <AppenderRef ref="kafka" />
        </Root>
        <Logger name="com.abc" level="debug">
<!--            <appender-ref ref="console" level="debug"/>-->
            <!--<appender-ref ref="kafka" level="debug"/>-->
            <!--<appender-ref ref="console" level="error"/>-->
            <appender-ref ref="kafka" level="error"/>
        </Logger>

        <Logger name="org.hibernate.SQL" >     
            <appender-ref ref="kafka" level="info" />
            <appender-ref ref="console" level="info"/>
        </Logger>

        <Logger name="org.hibernate.type">
            <appender-ref ref="console" level="info"/>
            <appender-ref ref="kafka" level="info"/>
        </Logger>

        <Root level="info">
            <AppenderRef ref="kafka"/>
            <AppenderRef ref="console"/>
        </Root>

    </Loggers>
</Configuration>

如果我使用控制台附加程序,则日志以正确的格式出现,但是当我使用自定义附加程序时,收到的日志没有格式。如何使用指定的模式布局将日志发送到 kafka。

请找到我的附加程序实现

import java.io.Serializable;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.logging.log4j.core.Filter;
import org.apache.logging.log4j.core.Layout;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.appender.AbstractAppender;
import org.apache.logging.log4j.core.appender.AppenderLoggingException;
import org.apache.logging.log4j.core.config.Property;
import org.apache.logging.log4j.core.config.plugins.Plugin;
import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
import org.apache.logging.log4j.core.config.plugins.PluginElement;
import org.apache.logging.log4j.core.config.plugins.PluginFactory;
import org.apache.logging.log4j.core.layout.PatternLayout;
import org.apache.logging.log4j.core.util.Booleans;
import org.apache.logging.log4j.message.Message;

@Plugin(name = "Kafka", category = "Core", elementType = "appender", printObject = true)
public final class KafkaAppender extends AbstractAppender {

    private final Lock lock = new ReentrantLock();

    private KafkaManager manager;

    protected KafkaAppender(String name, Filter filter, Layout layout, boolean ignoreExceptions, KafkaManager manager) {
        super(name, filter, layout, ignoreExceptions);
                System.err.println("hello world hello");
        this.manager = manager;
    }

    @PluginFactory
    public static KafkaAppender createAppender(@PluginAttribute("name") final String name, @PluginElement("Filter") final Filter filter,
            @PluginAttribute("ignoreExceptions") final String ignore, @PluginAttribute("topic") final String topic,
            @PluginElement("Properties") final Property[] properties, @PluginElement("layout") final Layout layout) {
        boolean ignoreExceptions = Booleans.parseBoolean(ignore, true);
        KafkaManager kafkaManager = KafkaManager.getKafkaManager(name, topic, properties);
        if (kafkaManager == null) {
            return null;
        }

//                Layout patternLayout = PatternLayout.createLayout("%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L- %X{sessionId}--%X{guid}- %m #]%n",
//                        null, null, null, true, false, null, null);  
//                System.err.println(patternLayout.toString());
        return new KafkaAppender(name, filter, layout, ignoreExceptions, kafkaManager);
    }

    @Override
    public final void start() {
        if (this.getManager() == null) {
            LOGGER.error("No KafkaManager set for the appender named [{}].", this.getName());
        }
        super.start();
        if (this.getManager() != null) {
            this.getManager().startup();
        }
    }

    @Override
    public final void stop() {
        super.stop();
        if (this.getManager() != null) {
            this.getManager().release();
        }
    }

    public final KafkaManager getManager() {
        return this.manager;
    }

    public void append(LogEvent event) {
        this.lock.lock();
        try {
            String s = event.getMessage().getFormattedMessage();
                        Message logEvent1 = event.getMessage();
                        String sp = logEvent1.getFormattedMessage();
                        this.getManager().send(event.getMessage().getFormattedMessage());
        } catch (final Exception e) {
            LOGGER.error("Unable to write to kafka [{}] for appender [{}].", this.getManager().getName(), this.getName(), e);
            throw new AppenderLoggingException("Unable to write to kafka in appender: " + e.getMessage(), e);
        } finally {
            this.lock.unlock();
        }
    }

    @Override
    public Layout<? extends Serializable> getLayout() {
            Layout patternLayout = PatternLayout.createLayout("%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L- %X{sessionId}--%X{guid}- %m #]%n",
                        null, null, null, true, false, null, null);      
            return patternLayout;
    }

}
4

1 回答 1

0

在 KafkaAppender 类中,应调用 append 方法getLayout().toByteArray(event)来格式化事件。

我注意到示例代码覆盖了getLayout. 我不会推荐这个。getLayout 的 AbstractAppender 实现返回配置的布局,它允许您在配置中控制布局而无需更改代码。

@Override
public void append(LogEvent event) {
    this.lock.lock();
    try {
        // let the Layout format the data in the LogEvent object
        final byte[] bytes = getLayout().toByteArray(event);

        // then pass the byte[] array with the formatted event to the manager
        // (I assume that your manager provides this method)
        manager.write(bytes, 0, bytes.length);

    } catch (Exception e) {
        LOGGER.error("Unable to write to kafka [{}] for appender [{}].",
               this.getManager().getName(), this.getName(), e);
        if (!ignoreExceptions()) {
            throw new AppenderLoggingException(
                "Unable to write to kafka in appender: " + e.getMessage(), e);
        }
    } finally {
        this.lock.unlock();
    }
}

// I would recommend not to override getLayout.
// The AbstractAppender implementation of getLayout returns the configured
// layout, which allows you to control the layout in configuration
// without code changes.
// @Override
// public Layout<? extends Serializable> getLayout() {...
于 2015-08-03T05:58:08.680 回答