234

有没有办法清除kafka中的主题?

我将一条太大的消息推送到本地机器上的 kafka 消息主题中,现在出现错误:

kafka.common.InvalidMessageSizeException: invalid message size

在这里增加fetch.size并不理想,因为我实际上不想接受那么大的消息。

4

24 回答 24

423

暂时将主题的保留时间更新为一秒:

kafka-topics.sh \
  --zookeeper <zkhost>:2181 \
  --alter \
  --topic <topic name> \
  --config retention.ms=1000

在较新的 Kafka 版本中,您还可以使用kafka-configs --entity-type topics

kafka-configs.sh \
  --zookeeper <zkhost>:2181 \
  --entity-type topics \
  --alter \
  --entity-name <topic name> \
  --add-config retention.ms=1000

然后等待清除生效(持续时间取决于主题的大小)。清除后,恢复之前的retention.ms值。

于 2015-04-16T10:43:07.797 回答
90

要清除队列,您可以删除主题:

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test

然后重新创建它:

bin/kafka-topics.sh --create --zookeeper localhost:2181 \
    --replication-factor 1 --partitions 1 --topic test
于 2015-03-24T12:54:57.450 回答
51

以下是我删除名为 的主题的步骤MyTopic

  1. 描述主题,不要使用代理 ID
  2. 为列出的每个代理 ID 停止 Apache Kafka 守护程序。
  3. 连接到每个代理,并删除主题数据文件夹,例如rm -rf /tmp/kafka-logs/MyTopic-0. 对其他分区和所有副本重复
  4. 删除主题元数据:zkCli.sh然后rmr /brokers/MyTopic
  5. 为每台停止的机器启动 Apache Kafka 守护进程

如果您错过了第 3 步,则 Apache Kafka 将继续报告主题为存在(例如,当您运行时kafka-list-topic.sh)。

使用 Apache Kafka 0.8.0 进行测试。

于 2014-02-19T13:32:42.010 回答
51

虽然接受的答案是正确的,但该方法已被弃用。主题配置现在应该通过kafka-configs.

kafka-configs --zookeeper localhost:2181 --entity-type topics --alter --add-config retention.ms=1000 --entity-name MyTopic

通过该方法设置的配置可以用命令显示

kafka-configs --zookeeper localhost:2181 --entity-type topics --describe --entity-name MyTopic
于 2016-04-21T17:56:09.867 回答
42

在 Kafka 0.8.2 中测试,快速入门示例: 首先,在 config 文件夹下的 server.properties 文件中添加一行:

delete.topic.enable=true

然后,您可以运行以下命令:

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test

然后重新创建它,让客户端继续对空主题进行操作

于 2015-06-14T20:02:28.613 回答
11

从卡夫卡 1.1

清除主题

bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name tp_binance_kline --add-config retention.ms=100

等待至少 1 分钟,以确保 kafka 清除主题删除配置,然后转到默认值

bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name tp_binance_kline --delete-config retention.ms
于 2018-10-09T11:13:34.777 回答
10

以下命令可用于删除 kafka 主题中的所有现有消息:

kafka-delete-records --bootstrap-server <kafka_server:port> --offset-json-file delete.json

delete.json 文件的结构应如下所示:

{“分区”:[{“主题”:“foo”,“分区”:1,“偏移”:-1}],“版本”:1}

其中 offset :-1 将删除所有记录(此命令已用 kafka 2.0.1 测试过

于 2020-07-25T11:09:38.230 回答
7

kafka 没有用于清除/清理主题(队列)的直接方法,但可以通过删除该主题并重新创建它来做到这一点。

首先确保 sever.properties 文件有,如果没有添加delete.topic.enable=true

然后,删除主题 bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic myTopic

然后再次创建它。

bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic myTopic --partitions 10 --replication-factor 2
于 2017-10-09T10:55:18.887 回答
7

在@steven appleyard 回答之后,我在 Kafka 2.2.0 上执行了以下命令,它们为我工作。

bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name <topic-name> --describe

bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name <topic-name> --alter --add-config retention.ms=1000

bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name <topic-name> --alter --delete-config retention.ms
于 2019-08-08T07:33:50.840 回答
5

更新:这个答案与 Kafka 0.6 有关。对于 Kafka 0.8 及更高版本,请参阅@Patrick 的回答。

是的,停止kafka并手动删除相应子目录中的所有文件(在kafka数据目录中很容易找到它)。kafka 重启后,主题将为空。

于 2013-05-01T06:56:55.710 回答
5

这里有很多很棒的答案,但在其中,我没有找到关于 docker 的答案。我花了一些时间弄清楚在这种情况下使用代理容器是错误的(显然!!!)

## this is wrong!
docker exec broker1 kafka-topics --zookeeper localhost:2181 --alter --topic mytopic --config retention.ms=1000
Exception in thread "main" kafka.zookeeper.ZooKeeperClientTimeoutException: Timed out waiting for connection while in state: CONNECTING
        at kafka.zookeeper.ZooKeeperClient.$anonfun$waitUntilConnected$3(ZooKeeperClient.scala:258)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:253)
        at kafka.zookeeper.ZooKeeperClient.waitUntilConnected(ZooKeeperClient.scala:254)
        at kafka.zookeeper.ZooKeeperClient.<init>(ZooKeeperClient.scala:112)
        at kafka.zk.KafkaZkClient$.apply(KafkaZkClient.scala:1826)
        at kafka.admin.TopicCommand$ZookeeperTopicService$.apply(TopicCommand.scala:280)
        at kafka.admin.TopicCommand$.main(TopicCommand.scala:53)
        at kafka.admin.TopicCommand.main(TopicCommand.scala)

我应该使用zookeeper:2181而不是--zookeeper localhost:2181按照我的撰写文件

## this might be an option, but as per comment below not all zookeeper images can have this script included
docker exec zookeper1 kafka-topics --zookeeper localhost:2181 --alter --topic mytopic --config retention.ms=1000

正确的命令是

docker exec broker1 kafka-configs --zookeeper zookeeper:2181 --alter --entity-type topics --entity-name dev_gdn_urls --add-config retention.ms=12800000

希望它会节省某人的时间。

另外,请注意消息不会立即删除,并且会在日志段关闭时发生。

于 2020-01-08T06:43:04.777 回答
5

有时,如果你有一个饱和的集群(太多的分区,或者使用加密的主题数据,或者使用 SSL,或者控制器在一个坏节点上,或者连接不稳定,那么清除所述主题需要很长时间.

我遵循这些步骤,特别是如果您使用的是 Avro。

1:使用kafka工具运行:

kafka-configs.sh --alter --entity-type topics --zookeeper zookeeper01.kafka.com --add-config retention.ms=1 --entity-name <topic-name>

2:运行:

kafka-console-consumer --consumer-property security.protocol=SSL --consumer-property ssl.truststore.location=/etc/schema-registry/secrets/trust.jks --consumer-property ssl.truststore.password=password --consumer-property ssl.keystore.location=/etc/schema-registry/secrets/identity.jks --consumer-property ssl.keystore.password=password --consumer-property ssl.key.password=password --bootstrap-server broker01.kafka.com:9092 --topic <topic-name> --new-consumer --from-beginning

3:将主题保留设置回原来的设置,一旦主题为空。

kafka-configs.sh --alter --entity-type topics --zookeeper zookeeper01.kafka.com --add-config retention.ms=604800000 --entity-name <topic-name>

希望这对某人有所帮助,因为它不容易做广告。

于 2018-02-15T15:30:18.493 回答
4

最简单的方法是将各个日志文件的日期设置为早于保留期。然后经纪人应该在几秒钟内为您清理并删除它们。这提供了几个优点:

  1. 无需关闭代理,这是一个运行时操作。
  2. 避免无效偏移异常的可能性(更多内容见下文)。

根据我使用 Kafka 0.7.x 的经验,删除日志文件并重新启动代理可能会导致某些消费者出现无效的偏移异常。之所以会发生这种情况,是因为代理会在零处重新启动偏移量(在没有任何现有日志文件的情况下),并且之前从主题消费的消费者将重新连接以请求特定的 [曾经有效的] 偏移量。如果此偏移量恰好超出新主题日志的范围,则不会造成任何伤害,并且消费者会在开始或结束时恢复。但是,如果偏移量落在新主题日志的范围内,代理会尝试获取消息集但失败,因为偏移量与实际消息不一致。

这可以通过在 zookeeper 中为该主题清除消费者偏移量来缓解。但是,如果您不需要原始主题并且只想删除现有内容,那么只需“触摸”一些主题日志就比停止代理、删除主题日志和清除某些 Zookeeper 节点更容易、更可靠.

于 2014-06-06T20:09:22.723 回答
4

Thomas 的建议很棒,但不幸zkCli的是,旧版本的 Zookeeper(例如 3.3.6)似乎不支持rmr. 例如,将现代 Zookeeper中的命令行实现与3.3 版本进行比较。

如果您遇到旧版本的 Zookeeper,一个解决方案是使用客户端库,例如用于 Python的zc.zk。对于不熟悉 Python 的人,您需要使用pipeasy_install安装它。然后启动一个 Python shell ( python),你可以这样做:

import zc.zk
zk = zc.zk.ZooKeeper('localhost:2181')
zk.delete_recursive('brokers/MyTopic') 

甚至

zk.delete_recursive('brokers')

如果您想从 Kafka 中删除所有主题。

于 2014-06-03T15:49:51.480 回答
4

除了更新retention.ms 和retention.bytes,我注意到主题清理策略应该是“delete”(默认),如果“compact”,它将保留更长时间的消息,即,如果它是“compact”,你有还要指定delete.retention.ms

./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name test-topic-3-100 --entity-type topics Configs for topics:test-topic-3-100 are retention.ms=1000,delete.retention.ms=10000,cleanup.policy=delete,retention.bytes=1

还必须监视最早/最新的偏移量应该相同以确认这是否成功发生,也可以检查 du -h /tmp/kafka-logs/test-topic-3-100-*

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -1 | awk -F ":" '{sum += $3} END {print sum}' 26599762

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -2 | awk -F ":" '{sum += $3} END {print sum}' 26599762

另一个问题是,您必须先获取当前配置,以便您记得在删除成功后恢复: ./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name test-topic-3-100 --entity-type topics

于 2016-06-14T00:02:30.683 回答
2

使用您的应用程序组清理来自特定主题的所有消息(GroupName 应该与应用程序 kafka 组名称相同)。

./kafka-path/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topicName --from-beginning --group application-group

于 2015-03-25T17:37:09.220 回答
2

另一种相当手动的清除主题的方法是:

在经纪人:

  1. 停止卡夫卡经纪人
    sudo service kafka stop
  2. 删除所有分区日志文件(应该在所有代理上完成)
    sudo rm -R /kafka-storage/kafka-logs/<some_topic_name>-*

在动物园管理员中:

  1. 运行zookeeper命令行界面
    sudo /usr/lib/zookeeper/bin/zkCli.sh
  2. 使用 zkCli 删除主题元数据
    rmr /brokers/topic/<some_topic_name>

再次在经纪人中:

  1. 重启代理服务
    sudo service kafka start
于 2018-10-02T15:18:22.007 回答
2
./kafka-topics.sh --describe --zookeeper zkHost:2181 --topic myTopic

这应该给retention.ms配置。然后您可以使用上面的 alter 命令更改为 1 秒(稍后恢复为默认值)。

Topic:myTopic   PartitionCount:6        ReplicationFactor:1     Configs:retention.ms=86400000
于 2018-11-18T06:55:25.940 回答
2

从 Java 中,使用 newAdminZkClient而不是 deprecated AdminUtils

  public void reset() {
    try (KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181", false, 200_000,
        5000, 10, Time.SYSTEM, "metricGroup", "metricType")) {

      for (Map.Entry<String, List<PartitionInfo>> entry : listTopics().entrySet()) {
        deleteTopic(entry.getKey(), zkClient);
      }
    }
  }

  private void deleteTopic(String topic, KafkaZkClient zkClient) {

    // skip Kafka internal topic
    if (topic.startsWith("__")) {
      return;
    }

    System.out.println("Resetting Topic: " + topic);
    AdminZkClient adminZkClient = new AdminZkClient(zkClient);
    adminZkClient.deleteTopic(topic);

    // deletions are not instantaneous
    boolean success = false;
    int maxMs = 5_000;
    while (maxMs > 0 && !success) {
      try {
        maxMs -= 100;
        adminZkClient.createTopic(topic, 1, 1, new Properties(), null);
        success = true;
      } catch (TopicExistsException ignored) {
      }
    }

    if (!success) {
      Assert.fail("failed to create " + topic);
    }
  }

  private Map<String, List<PartitionInfo>> listTopics() {
    Properties props = new Properties();
    props.put("bootstrap.servers", kafkaContainer.getBootstrapServers());
    props.put("group.id", "test-container-consumer-group");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    Map<String, List<PartitionInfo>> topics = consumer.listTopics();
    consumer.close();

    return topics;
  }
于 2019-06-03T14:57:34.947 回答
2

如果您想在 Java 应用程序中以编程方式执行此操作,您可以使用 AdminClient 的 API deleteRecords。使用 AdminClient 允许您删除分区和偏移级别的记录。

根据JavaDocs,0.11.0.0 或更高版本的代理支持此操作。

这是一个简单的例子:

String brokers = "localhost:9092";
String topicName = "test";
TopicPartition topicPartition = new TopicPartition(topicName, 0);
RecordsToDelete recordsToDelete = RecordsToDelete.beforeOffset(5L);

Map<TopicPartition, RecordsToDelete> topicPartitionRecordToDelete = new HashMap<>();
topicPartitionRecordToDelete.put(topicPartition, recordsToDelete);

// Create AdminClient
final Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
AdminClient adminClient = AdminClient.create(properties);

try {
  adminClient.deleteRecords(topicPartitionRecordToDelete).all().get();
} catch (InterruptedException e) {
  e.printStackTrace();
} catch (ExecutionException e) {
  e.printStackTrace();
} finally {
  adminClient.close();
}
于 2021-04-28T06:53:58.223 回答
1

user644265在此答案中建议的暂时减少主题保留时间的解决方法仍然有效,但最新版本kafka-configs会警告该--zookeeper选项已被弃用:

警告:--zookeeper 已弃用,将在未来版本的 Kafka 中删除

改为使用--bootstrap-server;例如

kafka-configs --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name my_topic --add-config retention.ms=100

kafka-configs --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name my_topic --delete-config retention.ms
于 2021-11-16T06:45:42.423 回答
0
# you have to enable this on config
sudo echo "delete.topic.enable=true" >> /opt/kafka/config/server.properties 
sudo systemctl stop kafka 
sudo systemctl start kafka 
# purge the topic
/opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic flows

# create the topic
# /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:2181 --replication-factor 1 --partitions 1 --topic Test
# list the topic
# /opt/kafka/bin/kafka-console-consumer.sh  localhost:9092 --topic flows --from-beginning
于 2021-10-11T17:16:44.490 回答
-1

这是删除主题的命令,如果您正在使用confluentinc/cp-kafka容器。

docker exec -it <kafka-container-id> kafka-topics --zookeeper zookeeper:2181 --delete --topic <topic-name>

成功响应:

Topic <topic-name> is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
于 2021-09-08T19:35:09.187 回答
-2

您是否考虑过让您的应用程序只使用一个新的重命名主题?(即与原始主题类似但在末尾附加“1”的主题)。

这也将为您的应用程序提供一个全新的主题。

于 2021-02-03T17:56:45.417 回答