有没有办法从主题中删除所有数据或在每次运行之前删除主题?
我可以修改 KafkaConfig.scala 文件来更改logRetentionHours
属性吗?有没有办法在消费者阅读后立即删除消息?
我正在使用生产者从某处获取数据并将数据发送到消费者消费的特定主题,我可以在每次运行时从该主题中删除所有数据吗?我每次在主题中只想要新数据。有没有办法以某种方式重新初始化主题?
有没有办法从主题中删除所有数据或在每次运行之前删除主题?
我可以修改 KafkaConfig.scala 文件来更改logRetentionHours
属性吗?有没有办法在消费者阅读后立即删除消息?
我正在使用生产者从某处获取数据并将数据发送到消费者消费的特定主题,我可以在每次运行时从该主题中删除所有数据吗?我每次在主题中只想要新数据。有没有办法以某种方式重新初始化主题?
正如我在这里提到的Purge Kafka Queue:
在 Kafka 0.8.2 中测试,快速入门示例: 首先,在 config 文件夹下的 server.properties 文件中添加一行:
delete.topic.enable=true
然后,您可以运行以下命令:
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
不要认为它还受支持。看看这个JIRA 问题“添加删除主题支持”。
手动删除:
log.dir
文件中的属性指定)以及 zookeeper 数据对于任何给定的主题,您可以做的是
/tmp/kafka-logs/MyTopic-0
属性指定/tmp/kafka-logs
的位置log.dir
这是NOT
一个很好的推荐方法,但它应该有效。在 Kafka 代理配置文件中,该log.retention.hours.per.topic
属性用于定义The number of hours to keep a log file before deleting it for some specific topic
此外,有没有办法在消费者阅读后立即删除消息?
从卡夫卡文档:
Kafka 集群会在可配置的时间段内保留所有已发布的消息(无论它们是否已被使用)。例如,如果将日志保留时间设置为两天,那么在消息发布后的两天内,它都可以使用,之后将被丢弃以释放空间。就数据大小而言,Kafka 的性能实际上是恒定的,因此保留大量数据不是问题。
事实上,每个消费者保留的唯一元数据是消费者在日志中的位置,称为“偏移量”。这个偏移量是由消费者控制的:通常消费者在读取消息时会线性增加偏移量,但实际上位置是由消费者控制的,它可以按照自己喜欢的任何顺序消费消息。例如,消费者可以重置为较旧的偏移量以重新处理。
为了找到要在 Kafka 0.8 Simple Consumer 示例中读取的起始偏移量,他们说
Kafka 包含两个常量来帮助,
kafka.api.OffsetRequest.EarliestTime()
在日志中找到数据的开头并从那里开始流式传输,kafka.api.OffsetRequest.LatestTime()
只会流式传输新消息。
您还可以在此处找到用于管理消费者端偏移量的示例代码。
public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
long whichTime, String clientName) {
TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(),clientName);
OffsetResponse response = consumer.getOffsetsBefore(request);
if (response.hasError()) {
System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition) );
return 0;
}
long[] offsets = response.offsets(topic, partition);
return offsets[0];
}
用 kafka 0.10 测试
1. stop zookeeper & Kafka server,
2. then go to 'kafka-logs' folder , there you will see list of kafka topic folders, delete folder with topic name
3. go to 'zookeeper-data' folder , delete data inside that.
4. start zookeeper & kafka server again.
注意:如果您要删除 kafka-logs 中的主题文件夹,而不是从 zookeeper-data 文件夹中删除,那么您会看到主题仍然存在。
作为一个肮脏的解决方法,您可以调整每个主题的运行时保留设置,例如bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my_topic --config retention.bytes=1
(retention.bytes=0也可能有效)
片刻之后,kafka 应该释放空间。与重新创建主题相比,不确定这是否有任何影响。
附言。一旦 kafka 完成清洁,最好恢复保留设置。
您还可以retention.ms
用来持久化历史数据
下面是用于清空和删除 Kafka 主题的脚本,假设 localhost 作为 zookeeper 服务器并且 Kafka_Home 设置为安装目录:
下面的脚本将通过将保留时间设置为 1 秒然后删除配置来清空主题:
#!/bin/bash
echo "Enter name of topic to empty:"
read topicName
/$Kafka_Home/bin/kafka-configs --zookeeper localhost:2181 --alter --entity-type topics --entity-name $topicName --add-config retention.ms=1000
sleep 5
/$Kafka_Home/bin/kafka-configs --zookeeper localhost:2181 --alter --entity-type topics --entity-name $topicName --delete-config retention.ms
要完全删除主题,您必须停止任何适用的 kafka 代理并从 kafka 日志目录(默认值:/tmp/kafka-logs)中删除它的目录,然后运行此脚本以从 zookeeper 中删除主题。为了验证它是否已从 zookeeper 中删除, ls /brokers/topics 的输出不应再包含主题:
#!/bin/bash
echo "Enter name of topic to delete from zookeeper:"
read topicName
/$Kafka_Home/bin/zookeeper-shell localhost:2181 <<EOF
rmr /brokers/topics/$topicName
ls /brokers/topics
quit
EOF
我们几乎尝试了其他答案所描述的内容,并取得了中等程度的成功。真正对我们有用的是类命令(Apache Kafka 0.8.1)
sh kafka-run-class.sh kafka.admin.DeleteTopicCommand --topic yourtopic --zookeeper localhost:2181
如果您brew
像我一样使用并浪费大量时间搜索臭名昭著的kafka-logs
文件夹,请不要再害怕。(请让我知道这是否适用于您以及 Homebrew、Kafka 等的多个不同版本 :))
您可能会在以下位置找到它:
/usr/local/var/lib/kafka-logs
(这对您通过 brew 安装的每个应用程序都有帮助)
1)brew services list
kafka 启动 matbhz /Users/matbhz/Library/LaunchAgents/homebrew.mxcl.kafka.plist
2)打开并阅读plist
您在上面找到的内容
3)找到定义server.properties
位置的行打开它,在我的例子中:
/usr/local/etc/kafka/server.properties
4)寻找log.dirs
线:
log.dirs=/usr/local/var/lib/kafka-logs
5) 转到该位置并删除您想要的主题的日志
6)重启卡夫卡brew services restart kafka
有关主题及其分区的所有数据都存储在tmp/kafka-logs/
. 此外,它们以一种格式存储topic-partionNumber
,所以如果你想删除一个主题newTopic
,你可以:
rm -rf /tmp/kafka-logs/newTopic-*
从 kafka 2.3.0 版本开始,还有一种软删除 Kafka 的替代方法(不推荐使用旧方法)。
将retention.ms 更新为1 秒(1000 毫秒),然后在一分钟后再次将其设置为默认设置,即7 天(168 小时,604,800,000 毫秒)
软删除 :-(rentention.ms=1000)(使用kafka-configs.sh)
bin/kafka-configs.sh --zookeeper 192.168.1.10:2181 --alter --entity-name kafka_topic3p3r --entity-type topics --add-config retention.ms=1000
Completed Updating config for entity: topic 'kafka_topic3p3r'.
设置为默认值:- 7 天(168 小时,retention.ms= 604800000)
bin/kafka-configs.sh --zookeeper 192.168.1.10:2181 --alter --entity-name kafka_topic3p3r --entity-type topics --add-config retention.ms=604800000
log.retention.hours
和添加log.retention.ms=1000
。它只会在 Kafka Topic 上保留一秒钟的记录。 log.retention.hours
为您想要的数字。在我的集成测试运行后,我使用下面的实用程序进行清理。
它使用最新的AdminZkClient
api。旧的 api 已被弃用。
import javax.inject.Inject
import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.common.utils.Time
class ZookeeperUtils @Inject() (config: AppConfig) {
val testTopic = "users_1"
val zkHost = config.KafkaConfig.zkHost
val sessionTimeoutMs = 10 * 1000
val connectionTimeoutMs = 60 * 1000
val isSecure = false
val maxInFlightRequests = 10
val time: Time = Time.SYSTEM
def cleanupTopic(config: AppConfig) = {
val zkClient = KafkaZkClient.apply(zkHost, isSecure, sessionTimeoutMs, connectionTimeoutMs, maxInFlightRequests, time)
val zkUtils = new AdminZkClient(zkClient)
val pp = new Properties()
pp.setProperty("delete.retention.ms", "10")
pp.setProperty("file.delete.delay.ms", "1000")
zkUtils.changeTopicConfig(testTopic , pp)
// zkUtils.deleteTopic(testTopic)
println("Waiting for topic to be purged. Then reset to retain records for the run")
Thread.sleep(60000L)
val resetProps = new Properties()
resetProps.setProperty("delete.retention.ms", "3000000")
resetProps.setProperty("file.delete.delay.ms", "4000000")
zkUtils.changeTopicConfig(testTopic , resetProps)
}
}
有一个选项删除主题。但是,它标志着删除的主题。Zookeeper 稍后会删除该主题。由于这可能会非常长,我更喜欢retention.ms 方法
在从 kafka 集群中手动删除主题时,您可以查看https://github.com/darrenfu/bigdata/issues/6
在大多数解决方案中遗漏的重要步骤是删除/config/topics/<topic_name>
ZK 中的主题。
我使用这个脚本:
#!/bin/bash
topics=`kafka-topics --list --zookeeper zookeeper:2181`
for t in $topics; do
for p in retention.ms retention.bytes segment.ms segment.bytes; do
kafka-topics --zookeeper zookeeper:2181 --alter --topic $t --config ${p}=100
done
done
sleep 60
for t in $topics; do
for p in retention.ms retention.bytes segment.ms segment.bytes; do
kafka-topics --zookeeper zookeeper:2181 --alter --topic $t --delete-config ${p}
done
done
清理主题数据有两种解决方案
将 zookeeper dataDir 路径“dataDir=/dataPath”更改为其他值,删除 kafka logs 文件夹并重启 zookeeper 和 kafka 服务器
从 zookeeper 服务器运行 zkCleanup.sh