I wrote a Kafka producer with a custom partitioner in Eclipse on a Windows machine. My Kafka cluster runs on EC2 Linux. I can run the Kafka producer code from Eclipse, but I do not see the topic on the EC2 box.

Producer code:
package com.panda.kafka.training;
import java.util.*;   
import kafka.javaapi.producer.Producer; 
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class PandaKafkaProducer {
    public static void main(String[] args) {
        long events = Long.parseLong(args[0]); 
        Random rnd = new Random(); 
        Properties props = new Properties();
        props.put("metadata.broker.list", "ec2-xx-yy-zzz-212.compute-1.amazonaws.com:9092"); 
        //props.put("producer.type", "sync"); 
        props.put("serializer.class", "kafka.serializer.StringEncoder"); 
        props.put("partitioner.class", "com.panda.kafka.training.PandaKafkaPartitioner");
        props.put("request.required.acks", "1"); 
        props.put("producer.type","async");

        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);
        for (long nEvents = 0; nEvents < events; nEvents++) {
            System.out.println("creating event " + nEvents);
            long runtime = new Date().getTime();
            String ip = "192.168.2." + rnd.nextInt(255);
            String msg = runtime + ",www.vulab.com," + ip;
            KeyedMessage<String, String> data = new KeyedMessage<String, String>("vulab123", ip, msg);
            producer.send(data);
        }
        producer.close(); 
    }

}


Server Properties file:
# The port the socket server listens on
port=9092

# Hostname the broker will bind to. If not set, the server will bind to all interfaces
host.name=localhost

# Hostname the broker will advertise to producers and consumers. If not set, it uses the
# value for "host.name" if configured.  Otherwise, it will use the value returned from
# java.net.InetAddress.getCanonicalHostName().
advertised.host.name=ec2-xx-yy-zzz-212.compute-1.amazonaws.com

# The port to publish to ZooKeeper for clients to use. If this is not set,
# it will publish the same port that the broker binds to.
advertised.port=0000


Producer properties file:
# list of brokers used for bootstrapping knowledge about the rest of the cluster
# format: host1:port1,host2:port2 ...
metadata.broker.list=52.2.202.212:0000

# name of the partitioner class for partitioning events; default partition spreads data randomly
#partitioner.class=

# specifies whether the messages are sent asynchronously (async) or synchronously (sync)
producer.type=sync

Output from Producer in ec2:
[ec2-user@ip-xxxx bin]$ sh kafka-console-producer.sh --broker-list xxxx:yyy.zzzz:9092 --topic vulab123
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

Any detailed explanation would be a great help.

1 Answer

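Please find the detailed answer below. I have a Windows machine with Eclipse installed, and I created a Maven project for Kafka. I wanted to send some messages from Eclipse on Windows to a Kafka cluster on EC2. I have a single machine running everything for Kafka.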

Note on the Java producer code: you must make sure the full DNS name is specified here.

props.put("metadata.broker.list", "ec2-52-xx-yyy-216.compute-1.amazonaws.com:9092"); 

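props.put("advertised.host.name", "ec2-52-xx-yyy-216.compute-1.amazonaws.com:9092"); // probably unnecessary here: advertised.host.name is a broker setting in server.properties (a hostname without a port), not a producer property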
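Since the producer only works when `metadata.broker.list` holds usable `host:port` pairs, a quick sanity check on that string can catch mistakes before the producer is started. The following is a minimal sketch using only the JDK; the class and method names (`BrokerList`, `invalidEntries`) are hypothetical and are not part of Kafka. It only checks the format (a non-empty host and a port between 1 and 65535), not whether the broker is actually reachable.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Kafka: checks the "host1:port1,host2:port2" format.
final class BrokerList {

    /** Returns the entries that are not of the form host:port with a port in 1..65535. */
    static List<String> invalidEntries(String brokerList) {
        List<String> invalid = new ArrayList<>();
        for (String entry : brokerList.split(",")) {
            String e = entry.trim();
            int colon = e.lastIndexOf(':');
            boolean ok = colon > 0; // there must be a non-empty host before the colon
            if (ok) {
                try {
                    int port = Integer.parseInt(e.substring(colon + 1));
                    ok = port >= 1 && port <= 65535;
                } catch (NumberFormatException ex) {
                    ok = false; // missing or non-numeric port
                }
            }
            if (!ok) {
                invalid.add(e);
            }
        }
        return invalid;
    }

    public static void main(String[] args) {
        // Placeholder host names taken from the post; replace with real values.
        System.out.println(invalidEntries("ec2-52-xx-yyy-216.compute-1.amazonaws.com:9092"));
        System.out.println(invalidEntries("52.2.202.212:0000"));
    }
}
```

With this check, a value such as `52.2.202.212:0000` is reported as invalid because the port parses to 0, which no client can connect to.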

Step 1:

I started a new EC2 Linux machine and installed Kafka.
wget http://mirror.sdunix.com/apache/kafka/0.8.1.1/kafka_2.9.2-0.8.1.1.tgz

tar -xzf kafka_2.9.2-0.8.1.1.tgz

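Step 2:

  1. Allow SSH and all traffic in the inbound rules of the EC2 security group.

  2. Add the EC2 public IP to the hosts file on the Windows machine where Eclipse is installed (the Windows equivalent of /etc/hosts, normally C:\Windows\System32\drivers\etc\hosts).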
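To confirm that the security group and hosts entry from this step actually work, it can help to test plain TCP connectivity from the Windows machine before running the producer. Below is a minimal sketch using only the JDK. The class name `BrokerReachability` is hypothetical, the host name is the redacted placeholder from this answer, and ports 2181 (ZooKeeper) and 9092 (Kafka) are the ones used in this setup.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: checks whether a TCP connection to host:port can be opened.
final class BrokerReachability {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Covers unresolved host names, refused connections, and timeouts.
            return false;
        }
    }

    public static void main(String[] args) {
        // Placeholder from the answer; pass the real public DNS name as the first argument.
        String host = args.length > 0 ? args[0] : "ec2-52-xx-yyy-216.compute-1.amazonaws.com";
        for (int port : new int[] {2181, 9092}) {
            System.out.println(host + ":" + port + " reachable: " + canConnect(host, port, 3000));
        }
    }
}
```

A `false` result for 9092 points to the security group, the hosts entry, or the broker's bind address rather than to the producer code. A `true` result only shows that the port is open, not that the broker advertises the right host name.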

Step 3:

I modified the Kafka server.properties file and put in the exact DNS name.
# The port the socket server listens on
port=9092

# Hostname the broker will bind to. If not set, the server will bind to all interfaces
host.name=ec2-52-xx-yyy-216.compute-1.amazonaws.com

# Hostname the broker will advertise to producers and consumers. If not set, it uses the
# value for "host.name" if configured.  Otherwise, it will use the value returned from
# java.net.InetAddress.getCanonicalHostName().
advertised.host.name=ec2-52-xx-yyy-216.compute-1.amazonaws.com
zookeeper.connect=ec2-52-xx-yyy-216.compute-1.amazonaws.com:2181
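The reason these settings matter is that clients connect to whatever host and port the broker advertises, not necessarily to the address in `metadata.broker.list`. The fallback order described in the comments of server.properties can be sketched as below. This is an illustration of those comments only, not Kafka's own code: the class `AdvertisedEndpoint` is hypothetical, and `canonicalHostName` is passed in as a stand-in for the value of `java.net.InetAddress.getCanonicalHostName()` on the broker.

```java
import java.util.Properties;

// Hypothetical illustration of the fallback rules described in server.properties.
final class AdvertisedEndpoint {

    /** Returns the host:port that clients would be told to connect to. */
    static String resolve(Properties broker, String canonicalHostName) {
        // advertised.host.name wins, then host.name, then the canonical host name.
        String host = broker.getProperty("advertised.host.name");
        if (host == null || host.isEmpty()) {
            host = broker.getProperty("host.name");
        }
        if (host == null || host.isEmpty()) {
            host = canonicalHostName;
        }
        // advertised.port wins, otherwise the port the broker binds to.
        String port = broker.getProperty("advertised.port");
        if (port == null || port.isEmpty()) {
            port = broker.getProperty("port");
        }
        if (port == null || port.isEmpty()) {
            throw new IllegalArgumentException("neither advertised.port nor port is set");
        }
        return host + ":" + port;
    }

    public static void main(String[] args) {
        Properties broker = new Properties();
        broker.setProperty("port", "9092");
        broker.setProperty("host.name", "localhost");
        // Only reachable from the broker machine itself.
        System.out.println(resolve(broker, "ip-10-0-0-1")); // localhost:9092

        broker.setProperty("advertised.host.name", "ec2-52-xx-yyy-216.compute-1.amazonaws.com");
        System.out.println(resolve(broker, "ip-10-0-0-1"));
    }
}
```

Under these rules, the configuration in the question would advertise port `0000`. If that value is literal rather than redacted, remote clients would be pointed at a port they cannot connect to, even though bootstrapping against port 9092 works.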

Step 4:

Commands executed:

    1. sh zookeeper-server-start.sh /home/ec2-user/kafka/kafka/config/zookeeper.properties

    2. sh kafka-server-start.sh /home/ec2-user/kafka/kafka/config/server.properties

    3. sh kafka-topics.sh --create --zookeeper ec2-52-xx-yyy-216.compute-1.amazonaws.com:2181 --replication-factor 1 --partitions 1 --topic spanda20

    4. sh kafka-console-producer.sh --broker-list ec2-52-xx-yyy-216.compute-1.amazonaws.com:9092 --topic spanda20 --sync

    5. sh kafka-console-consumer.sh --zookeeper ec2-52-xx-yyy-216.compute-1.amazonaws.com:2181 --topic spanda20 --from-beginning

I could see all the messages in the consumer.

Answered 2015-07-20T17:56:05.163