0

我正在开发一个 Web 应用程序,我将在其中上传一个日志文件,该文件将根据记录器级别(信息、错误、警告等)被读取和分类。我需要使用 java 高级别的客户端 api 将这些日志索引到 elasticsearch。

目前我正在为类名创建一个索引(日志将包含类名)并将这些类日志存储在该特定索引中。如果日志文件包含来自 100 个不同类的日志,我觉得这种方法在某些情况下不会很好,我将为它创建 100 个索引并存储这些日志。

有什么有效的方法可以将日志索引到elasticsearch?在我的情况下如何确定索引?

示例日志:

2021 年 7 月 2 日|10:03:10.040|INFO|[main]|org.apache.catalina.startup.VersionLoggerListener.log|服务器建成时间:2021 年 6 月 11 日 13:32:01 UTC

4

1 回答 1

1

这是我们在一些基于 Java 的堆栈中使用的实践,它具有许多特权,可以将 Apache Kafka 用作中间数据管道,将 logstash 用作数据摄取管道。首先,您需要在 pom.xml 文件中删除 Spring Boot 应用程序中日志的默认提供程序,它们是 Logback 并且可能是 Log-classic,然后您需要添加 log4j2 作为新的日志提供程序并添加 Kafka appender。添加依赖项后,您需要 xml 配置文件,您可以在其中添加 Kafka appender 配置。默认情况下,您需要在项目的资源路径中找到您的配置文件,并将其命名为“log4j2.xml”。

您可以找到许多其他 Log4j2 附加程序,例如 Cassandra 或故障转移附加程序,并将它们添加到配置文件中的 Kafka 附加程序旁边。您可以在下面找到一个适用且正确的示例。

<!--excluding logback -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-logging</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>logback-classic</artifactId>
                </exclusion>
            </exclusions>
        </dependency>


<!--adding log4j2 and kafka appender-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-log4j2</artifactId>
        </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-log4j-appender</artifactId>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

卡夫卡追加器配置

<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="info" name="kafka-appender" packages="Citydi.ElasticDemo">

    <Appenders>

        <Kafka name="kafkaLogAppender" topic="Second-Topic">
            <JSONLayout />
            <Property name="bootstrap.servers">localhost:9092</Property>
            <MarkerFilter marker="Recorder" onMatch="DENY" onMismatch="ACCEPT"/>
        </Kafka>

        <Console name="stdout" target="SYSTEM_OUT">
            <PatternLayout pattern="%d{HH:mm:ss.SSS} stdout %highlight{%-5p} [%-7t] %F:%L - %m%n"/>
            <MarkerFilter marker="Recorder" onMatch="DENY" onMismatch="ACCEPT"/>
        </Console>

    </Appenders>
    <Loggers>
        <Root level="INFO">
            <AppenderRef ref="kafkaLogAppender"/>
            <AppenderRef ref="stdout"/>
        </Root>
        <Logger name="org.apache.kafka" level="warn" />
    </Loggers>

</Configuration>

激活 Zookeeper 代理

./zookeeper-server-start.sh ../config/zookeeper.properties

激活 Kafka 代理

./kafka-server-start.sh ../config/server.properties

创建主题

./kafka-topics.sh --create --topic test-topic -zookeeper localhost:2181 --replication-factor 1 --partitions 4

已创建主题的活跃消费者

./kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic

然后为创建的主题添加日志附加器以使用日志(这个由您决定),然后创建一个 Logstash 管道,如下面的配置,将您的日志摄取到您想要的 elastic 索引中。

input {
    kafka{
        group_id => "35834"
        topics => ["yourtopicname"]
        bootstrap_servers => "localhost:9092"
        codec => json
    }
}

filter {

}

output {
    file {
        path => "C:\somedirectory"
    }
    elasticsearch {
        hosts => ["localhost:9200"]
        document_type => "_doc"
        index => "yourindexname"
    }
    stdout { codec => rubydebug
    }
}

于 2021-08-04T13:26:18.720 回答