1

我是 kafka 的新手,我想尝试创建主题并从我的 .net 应用程序向 kafka 发送消息。我正在使用 kafka.net dll 并使用以下代码成功创建主题:

        Uri uri = new Uri("http://localhost:9092");

        string topic = "testkafka";

        string payload = "test msg";

        var sendMsg = new Thread(() =>

        {

            KafkaNet.Protocol.Message msg = new KafkaNet.Protocol.Message(payload);

            var options = new KafkaOptions(uri);

            var router = new KafkaNet.BrokerRouter(options);

            var client = new Producer(router);

            client.SendMessageAsync(topic, new List<KafkaNet.Protocol.Message> { msg }).Wait();

        });

        sendMsg.Start();

但我看不到任何消息:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testkafka --from-beginning

谁能帮我举个例子?谢谢。

4

1 回答 1

1

对于这两种操作,您都可以使用cofluent-kafka-dotnet客户端。


为了以编程方式创建主题

static async Task CreateTopicAsync(string bootstrapServers, string topicName) {

    using (var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = bootstrapServers }).Build()) {
        try {
            await adminClient.CreateTopicsAsync(new TopicSpecification[] { 
                    new TopicSpecification { Name = 'myTopicName', ReplicationFactor = 1, NumPartitions = 1 } });
            } 
            catch (CreateTopicsException e) {
                Console.WriteLine($"An error occured creating topic {e.Results[0].Topic}: {e.Results[0].Error.Reason}");
            }
        }
    }

为了产生消息:

using System;
using System.Threading.Tasks;
using Confluent.Kafka;

class Program
{
    public static async Task Main(string[] args)
    {
        var config = new ProducerConfig { BootstrapServers = "localhost:9092" };

        // If serializers are not specified, default serializers from
        // `Confluent.Kafka.Serializers` will be automatically used where
        // available. Note: by default strings are encoded as UTF8.
        using (var p = new ProducerBuilder<Null, string>(config).Build())
        {
            try
            {
                var dr = await p.ProduceAsync("myTopicName", new Message<Null, string> { Value="test" });
                Console.WriteLine($"Delivered '{dr.Value}' to '{dr.TopicPartitionOffset}'");
            }
            catch (ProduceException<Null, string> e)
            {
                Console.WriteLine($"Delivery failed: {e.Error.Reason}");
            }
        }
    }
}
于 2020-05-18T08:30:41.987 回答