这是我们在一些基于 Java 的堆栈中使用的实践,它具有许多特权,可以将 Apache Kafka 用作中间数据管道,将 logstash 用作数据摄取管道。首先,您需要在 pom.xml 文件中删除 Spring Boot 应用程序中日志的默认提供程序,它们是 Logback 并且可能是 Log-classic,然后您需要添加 log4j2 作为新的日志提供程序并添加 Kafka appender。添加依赖项后,您需要 xml 配置文件,您可以在其中添加 Kafka appender 配置。默认情况下,您需要在项目的资源路径中找到您的配置文件,并将其命名为“log4j2.xml”。
您可以找到许多其他 Log4j2 附加程序,例如 Cassandra 或故障转移附加程序,并将它们添加到配置文件中的 Kafka 附加程序旁边。您可以在下面找到一个适用且正确的示例。
<!--excluding logback -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>logback-classic</artifactId>
</exclusion>
</exclusions>
</dependency>
<!--adding log4j2 and kafka appender-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-log4j-appender</artifactId>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
卡夫卡追加器配置
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="info" name="kafka-appender" packages="Citydi.ElasticDemo">
<Appenders>
<Kafka name="kafkaLogAppender" topic="Second-Topic">
<JSONLayout />
<Property name="bootstrap.servers">localhost:9092</Property>
<MarkerFilter marker="Recorder" onMatch="DENY" onMismatch="ACCEPT"/>
</Kafka>
<Console name="stdout" target="SYSTEM_OUT">
<PatternLayout pattern="%d{HH:mm:ss.SSS} stdout %highlight{%-5p} [%-7t] %F:%L - %m%n"/>
<MarkerFilter marker="Recorder" onMatch="DENY" onMismatch="ACCEPT"/>
</Console>
</Appenders>
<Loggers>
<Root level="INFO">
<AppenderRef ref="kafkaLogAppender"/>
<AppenderRef ref="stdout"/>
</Root>
<Logger name="org.apache.kafka" level="warn" />
</Loggers>
</Configuration>
激活 Zookeeper 代理
./zookeeper-server-start.sh ../config/zookeeper.properties
激活 Kafka 代理
./kafka-server-start.sh ../config/server.properties
创建主题
./kafka-topics.sh --create --topic test-topic -zookeeper localhost:2181 --replication-factor 1 --partitions 4
已创建主题的活跃消费者
./kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic
然后为创建的主题添加日志附加器以使用日志(这个由您决定),然后创建一个 Logstash 管道,如下面的配置,将您的日志摄取到您想要的 elastic 索引中。
input {
kafka{
group_id => "35834"
topics => ["yourtopicname"]
bootstrap_servers => "localhost:9092"
codec => json
}
}
filter {
}
output {
file {
path => "C:\somedirectory"
}
elasticsearch {
hosts => ["localhost:9200"]
document_type => "_doc"
index => "yourindexname"
}
stdout { codec => rubydebug
}
}