0

有没有可能在芭蕾舞演员中做到这一点

  1. 在芭蕾舞女演员中创建一个新的 kafka 主题
  2. 列出 Ballerina 中的可用主题
  3. 订阅芭蕾舞女演员中创建的主题
4

2 回答 2

0

编辑:更新示例代码以符合最新的芭蕾舞演员版本(从 V1.2.0 向上)。

你可以

  1. 创建一个新主题

如果您使用 a 发送数据Kafka producer,它将向该特定主题发布数据,如果该主题不可用,它将创建该主题并发布。(要支持这一点,您必须auto.create.topics.enable=true在代理属性中进行设置)。

考虑您想test从生产者发布到主题。您可以创建一个名为的生产者端点,并使用函数kafka:Producer将数据发送到特定主题。send()

kafka:Producer sampleProducer = new ({
  bootstrapServers: "localhost:9092",
  acks: "all",
  valueSerializerType: kafka:SER_STRING
});

string topic = "test";
string msg = "Your Message";
sampleProducer->send(messageToPublish, topic);`

现在,如果托管在 的 Kafka 代理有一个名为test可用的主题localhost:9092,它会将消息发布到该主题,或者如果该主题不存在,它将创建该主题。

  1. 订阅新主题

您可以使用subscribe()的功能Kafka:Consumer订阅主题。

listener kafka:Consumer sampleConsumer = new ({
  bootstrapServers: "localhost:9090",
  groupId: "test-consumers",
  valueDeserializerType: kafka:DES_STRING
});

string topic = "test";
string[] topics = [topic];
sampleConsumer->subscribe(topics);

请注意,subscribe()需要string[]作为输入参数,因此您应该将 a 传递string[]给它。

还有其他功能,例如subscribeToPattern()subscribeWithPartitionRebalance()也可用于为消费者订阅主题,您可以在API 文档中找到有关它们的更多信息。

但要列出可用主题,您需要从 Kafka 代理本身获取主题列表。但是您可以使用 ballerina 获取当前由特定消费者订阅的主题列表。

string[] subscribedTopics;
var result = sampleConsumer->getSubscription();
if (result is error) {
  // Your logic for handling the error
} else {
    subscribedTopics = result;
}

确保在此处处理错误,因为getSubscription()可以返回 astring[]或 a error。芭蕾舞女演员型后卫可以为您解决问题。

于 2018-10-16T05:18:01.980 回答
0

您可以使用以下代码订阅主题:

import ballerina/log;
import wso2/kafka;
import ballerina/internal;

// Kafka consumer endpoint
endpoint kafka:SimpleConsumer consumer {
    bootstrapServers: "localhost:9092, localhost:9093",
    // Consumer group ID
    groupId: "test-group",
    // Listen from topic 'test'
    topics: ["test"],
    // Poll every 1 second
    pollingInterval:1000
};

// Kafka service that listens from the topic 'product-price'
// 'inventoryControlService' subscribed to new product price updates from
// the product admin and updates the Database.
service<kafka:Consumer> kafkaService bind consumer {
    // Triggered whenever a message added to the subscribed topic
    onMessage(kafka:ConsumerAction consumerAction, kafka:ConsumerRecord[] records) {
        // Dispatched set of Kafka records to service, We process each one by one.
        foreach entry in records {
            byte[] serializedMsg = entry.value;
            // Convert the serialized message to string message
            string msg = internal:byteArrayToString(serializedMsg, "UTF-8");
            log:printInfo("New message received from the product admin");
            // log the retrieved Kafka record
            log:printInfo("Topic: " + entry.topic + "; Received Message: " + msg);
            // Mock logic
            // Update the database with the new price for the specified product
            log:printInfo("Database updated with the new price of the product");
        }
    }
}

这个 Github repo可能对你很有用。它包含消费者和生产者的各种示例。

关于创建和列出主题的问题,如果您不需要从 Ballerina 执行这些操作,您可以从命令行执行:

bin/kafka-topics.sh --zookeeper localhost:2181 --topic test --from-beginning
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor <number_of_replicas> --partitions <number_of_partitions> --topic test
于 2018-10-08T11:23:23.303 回答