96

有没有办法从主题中删除所有数据或在每次运行之前删除主题?

我可以修改 KafkaConfig.scala 文件来更改logRetentionHours属性吗?有没有办法在消费者阅读后立即删除消息?

我正在使用生产者从某处获取数据并将数据发送到消费者消费的特定主题,我可以在每次运行时从该主题中删除所有数据吗?我每次在主题中只想要新数据。有没有办法以某种方式重新初始化主题?

4

14 回答 14

76

正如我在这里提到的Purge Kafka Queue

在 Kafka 0.8.2 中测试,快速入门示例: 首先,在 config 文件夹下的 server.properties 文件中添加一行:

delete.topic.enable=true

然后,您可以运行以下命令:

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
于 2015-06-14T20:06:04.703 回答
70

不要认为它还受支持。看看这个JIRA 问题“添加删除主题支持”。

手动删除:

  1. 关闭集群
  2. 清理 kafka 日志目录(由kafka配置log.dir文件中的属性指定)以及 zookeeper 数据
  3. 重启集群

对于任何给定的主题,您可以做的是

  1. 停止卡夫卡
  2. 清理特定于分区的 kafka 日志,kafka 以“logDir/topic-partition”格式存储其日志文件,因此对于名为“MyTopic”的主题,分区 id 0 的日志将存储在/tmp/kafka-logs/MyTopic-0属性指定/tmp/kafka-logs的位置log.dir
  3. 重启卡夫卡

这是NOT一个很好的推荐方法,但它应该有效。在 Kafka 代理配置文件中,该log.retention.hours.per.topic属性用于定义The number of hours to keep a log file before deleting it for some specific topic

此外,有没有办法在消费者阅读后立即删除消息?

卡夫卡文档

Kafka 集群会在可配置的时间段内保留所有已发布的消息(无论它们是否已被使用)。例如,如果将日志保留时间设置为两天,那么在消息发布后的两天内,它都可以使用,之后将被丢弃以释放空间。就数据大小而言,Kafka 的性能实际上是恒定的,因此保留大量数据不是问题。

事实上,每个消费者保留的唯一元数据是消费者在日志中的位置,称为“偏移量”。这个偏移量是由消费者控制的:通常消费者在读取消息时会线性增加偏移量,但实际上位置是由消费者控制的,它可以按照自己喜欢的任何顺序消费消息。例如,消费者可以重置为较旧的偏移量以重新处理。

为了找到要在 Kafka 0.8 Simple Consumer 示例中读取的起始偏移量,他们说

Kafka 包含两个常量来帮助,kafka.api.OffsetRequest.EarliestTime()在日志中找到数据的开头并从那里开始流式传输,kafka.api.OffsetRequest.LatestTime()只会流式传输新消息。

您还可以在此处找到用于管理消费者端偏移量的示例代码。

    public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
                                 long whichTime, String clientName) {
    TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
    Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
    requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
    kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(),clientName);
    OffsetResponse response = consumer.getOffsetsBefore(request);

    if (response.hasError()) {
        System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition) );
        return 0;
    }
    long[] offsets = response.offsets(topic, partition);
    return offsets[0];
}
于 2013-08-23T22:14:08.013 回答
15

用 kafka 0.10 测试

1. stop zookeeper & Kafka server,
2. then go to 'kafka-logs' folder , there you will see list of kafka topic folders, delete folder with topic name
3. go to 'zookeeper-data' folder , delete data inside that.
4. start zookeeper & kafka server again.

注意:如果您要删除 kafka-logs 中的主题文件夹,而不是从 zookeeper-data 文件夹中删除,那么您会看到主题仍然存在。

于 2016-09-15T11:59:01.310 回答
9

作为一个肮脏的解决方法,您可以调整每个主题的运行时保留设置,例如bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my_topic --config retention.bytes=1retention.bytes=0也可能有效)

片刻之后,kafka 应该释放空间。与重新创建主题相比,不确定这是否有任何影响。

附言。一旦 kafka 完成清洁,最好恢复保留设置。

您还可以retention.ms用来持久化历史数据

于 2014-08-06T17:10:13.280 回答
8

下面是用于清空和删除 Kafka 主题的脚本,假设 localhost 作为 zookeeper 服务器并且 Kafka_Home 设置为安装目录:

下面的脚本将通过将保留时间设置为 1 秒然后删除配置来清空主题:

#!/bin/bash
echo "Enter name of topic to empty:"
read topicName
/$Kafka_Home/bin/kafka-configs --zookeeper localhost:2181 --alter --entity-type topics --entity-name $topicName --add-config retention.ms=1000
sleep 5
/$Kafka_Home/bin/kafka-configs --zookeeper localhost:2181 --alter --entity-type topics --entity-name $topicName --delete-config retention.ms

完全删除主题,您必须停止任何适用的 kafka 代理并从 kafka 日志目录(默认值:/tmp/kafka-logs)中删除它的目录,然后运行此脚本以从 zookeeper 中删除主题。为了验证它是否已从 zookeeper 中删除, ls /brokers/topics 的输出不应再包含主题:

#!/bin/bash
echo "Enter name of topic to delete from zookeeper:"
read topicName
/$Kafka_Home/bin/zookeeper-shell localhost:2181 <<EOF
rmr /brokers/topics/$topicName
ls /brokers/topics
quit
EOF
于 2016-12-06T16:32:42.550 回答
7

我们几乎尝试了其他答案所描述的内容,并取得了中等程度的成功。真正对我们有用的是类命令(Apache Kafka 0.8.1)

sh kafka-run-class.sh kafka.admin.DeleteTopicCommand --topic yourtopic --zookeeper localhost:2181

于 2014-11-20T19:07:44.773 回答
5

对于 brew 用户

如果您brew像我一样使用并浪费大量时间搜索臭名昭著的kafka-logs文件夹,请不要再害怕。(请让我知道这是否适用于您以及 Homebrew、Kafka 等的多个不同版本 :))

您可能会在以下位置找到它:

地点:

/usr/local/var/lib/kafka-logs


如何实际找到该路径

(这对您通过 brew 安装的每个应用程序都有帮助)

1)brew services list

kafka 启动 matbhz /Users/matbhz/Library/LaunchAgents/homebrew.mxcl.kafka.plist

2)打开并阅读plist您在上面找到的内容

3)找到定义server.properties位置的行打开它,在我的例子中:

  • /usr/local/etc/kafka/server.properties

4)寻找log.dirs线:

log.dirs=/usr/local/var/lib/kafka-logs

5) 转到该位置并删除您想要的主题的日志

6)重启卡夫卡brew services restart kafka

于 2017-09-27T00:39:05.450 回答
2

有关主题及其分区的所有数据都存储在tmp/kafka-logs/. 此外,它们以一种格式存储topic-partionNumber,所以如果你想删除一个主题newTopic,你可以:

  • 停止卡夫卡
  • 删除文件rm -rf /tmp/kafka-logs/newTopic-*
于 2015-01-26T06:27:29.340 回答
2

从 kafka 2.3.0 版本开始,还有一种软删除 Kafka 的替代方法(不推荐使用旧方法)。

将retention.ms 更新为1 秒(1000 毫秒),然后在一分钟后再次将其设置为默认设置,即7 天(168 小时,604,800,000 毫秒)

软删除 :-(rentention.ms=1000)(使用kafka-configs.sh)

bin/kafka-configs.sh --zookeeper 192.168.1.10:2181 --alter --entity-name kafka_topic3p3r --entity-type topics  --add-config retention.ms=1000
Completed Updating config for entity: topic 'kafka_topic3p3r'.

设置为默认值:- 7 天(168 小时,retention.ms= 604800000)

bin/kafka-configs.sh --zookeeper 192.168.1.10:2181 --alter --entity-name kafka_topic3p3r --entity-type topics  --add-config retention.ms=604800000
于 2019-09-08T17:22:29.680 回答
1
  1. 停止 ZooKeeper 和 Kafka
  2. 在 server.properties 中,更改 log.retention.hours 值。您可以评论log.retention.hours和添加log.retention.ms=1000。它只会在 Kafka Topic 上保留一秒钟的记录。
  3. 启动 zookeeper 和 kafka。
  4. 检查消费者控制台。当我第一次打开控制台时,记录就在那里。但是当我再次打开控制台时,记录被删除了。
  5. 稍后,您可以将 的值设置log.retention.hours为您想要的数字。
于 2017-04-20T10:57:49.680 回答
1

在我的集成测试运行后,我使用下面的实用程序进行清理。

它使用最新的AdminZkClientapi。旧的 api 已被弃用。

import javax.inject.Inject
import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.common.utils.Time

class ZookeeperUtils @Inject() (config: AppConfig) {

  val testTopic = "users_1"

  val zkHost = config.KafkaConfig.zkHost
  val sessionTimeoutMs = 10 * 1000
  val connectionTimeoutMs = 60 * 1000
  val isSecure = false
  val maxInFlightRequests = 10
  val time: Time = Time.SYSTEM

  def cleanupTopic(config: AppConfig) = {

    val zkClient = KafkaZkClient.apply(zkHost, isSecure, sessionTimeoutMs, connectionTimeoutMs, maxInFlightRequests, time)
    val zkUtils = new AdminZkClient(zkClient)

    val pp = new Properties()
    pp.setProperty("delete.retention.ms", "10")
    pp.setProperty("file.delete.delay.ms", "1000")
    zkUtils.changeTopicConfig(testTopic , pp)
    //    zkUtils.deleteTopic(testTopic)

    println("Waiting for topic to be purged. Then reset to retain records for the run")
    Thread.sleep(60000L)

    val resetProps = new Properties()
    resetProps.setProperty("delete.retention.ms", "3000000")
    resetProps.setProperty("file.delete.delay.ms", "4000000")
    zkUtils.changeTopicConfig(testTopic , resetProps)

  }


}

有一个选项删除主题。但是,它标志着删除的主题。Zookeeper 稍后会删除该主题。由于这可能会非常长,我更喜欢retention.ms 方法

于 2019-10-01T10:36:01.650 回答
0

在从 kafka 集群中手动删除主题时,您可以查看https://github.com/darrenfu/bigdata/issues/6 在大多数解决方案中遗漏的重要步骤是删除/config/topics/<topic_name>ZK 中的主题。

于 2017-06-16T10:50:28.057 回答
0

我使用这个脚本:

#!/bin/bash
topics=`kafka-topics --list --zookeeper zookeeper:2181`
for t in $topics; do 
    for p in retention.ms retention.bytes segment.ms segment.bytes; do
        kafka-topics --zookeeper zookeeper:2181 --alter --topic $t --config ${p}=100
    done
done
sleep 60
for t in $topics; do 
    for p in retention.ms retention.bytes segment.ms segment.bytes; do
        kafka-topics --zookeeper zookeeper:2181 --alter --topic $t --delete-config ${p}
    done
done
于 2019-06-07T13:06:26.653 回答
0

清理主题数据有两种解决方案

  1. 将 zookeeper dataDir 路径“dataDir=/dataPath”更改为其他值,删除 kafka logs 文件夹并重启 zookeeper 和 kafka 服务器

  2. 从 zookeeper 服务器运行 zkCleanup.sh

于 2021-10-04T05:49:40.153 回答