44

我正在尝试在 Kafka 中加载一个简单的文本文件而不是标准输入。下载 Kafka 后,我执行了以下步骤:

启动动物园管理员:

bin/zookeeper-server-start.sh config/zookeeper.properties

启动服务器

bin/kafka-server-start.sh config/server.properties

创建了一个名为“test”的主题:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

运行制片人:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 
Test1
Test2

消费者聆听:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
Test1
Test2

我想将一个数据文件甚至一个简单的文本文件传递给消费者可以直接看到的生产者,而不是标准输入。任何帮助将不胜感激。谢谢!

4

4 回答 4

95

您可以通过管道输入:

kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic
--new-producer < my_file.txt

在这里找到。

从 0.9.0 开始:

kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic < my_file.txt
于 2015-10-22T09:53:33.543 回答
12
$ kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic < my_file.txt

在 Kafka-0.9.0 为我工作

于 2016-03-04T09:42:57.533 回答
7

这里有一些更通用的方法,但对于简单的文件可能有点过分

尾巴

tail -n0 -F my_file.txt | kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic

解释

  1. tail随着文件的增长或日志不断添加到文件中,从文件末尾读取
  2. -n0表示输出最后 0 行,因此只选择新行
  3. -F按名称而不是描述符跟随文件,因此即使旋转它也可以工作

系统日志

options {                                                                                                                             
    flush_lines (0);                                                                                                                
    time_reopen (10);                                                                                                               
    log_fifo_size (1000);                                                                                                          
    long_hostnames (off);                                                                                                           
    use_dns (no);                                                                                                                   
    use_fqdn (no);                                                                                                                  
    create_dirs (no);                                                                                                               
    keep_hostname (no);                                                                                                             
};

source s_file {
    file("path to my-file.txt" flags(no-parse));
}


destination loghost {
    tcp("*.*.*.*" port(5140));
} 

消费

nc -k -l 5140 | kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic

解释(来自man nc

-k' Forces nc to stay listening for another connection after its current connection is completed. It is an error to use this option without the -l option.

-l' Used to specify that nc should listen for an incoming connection rather than initiate a connection to a remote host. It is an error to use this option in conjunction with the -p, -s, or -z options. Additionally, any timeouts specified with the -w option are ignored.

参考

系统日志

于 2017-01-14T01:29:01.823 回答
1
echo "Hello" | kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic
于 2016-10-28T18:37:27.743 回答