有没有可能在芭蕾舞演员中做到这一点
- 在芭蕾舞女演员中创建一个新的 kafka 主题
- 列出 Ballerina 中的可用主题
- 订阅芭蕾舞女演员中创建的主题
有没有可能在芭蕾舞演员中做到这一点
编辑:更新示例代码以符合最新的芭蕾舞演员版本(从 V1.2.0 向上)。
你可以
如果您使用 a 发送数据
Kafka producer
,它将向该特定主题发布数据,如果该主题不可用,它将创建该主题并发布。(要支持这一点,您必须auto.create.topics.enable=true
在代理属性中进行设置)。考虑您想
test
从生产者发布到主题。您可以创建一个名为的生产者端点,并使用函数kafka:Producer
将数据发送到特定主题。send()
kafka:Producer sampleProducer = new ({
bootstrapServers: "localhost:9092",
acks: "all",
valueSerializerType: kafka:SER_STRING
});
string topic = "test";
string msg = "Your Message";
sampleProducer->send(messageToPublish, topic);`
现在,如果托管在 的 Kafka 代理有一个名为
test
可用的主题localhost:9092
,它会将消息发布到该主题,或者如果该主题不存在,它将创建该主题。
您可以使用
subscribe()
的功能Kafka:Consumer
订阅主题。
listener kafka:Consumer sampleConsumer = new ({
bootstrapServers: "localhost:9090",
groupId: "test-consumers",
valueDeserializerType: kafka:DES_STRING
});
string topic = "test";
string[] topics = [topic];
sampleConsumer->subscribe(topics);
请注意,
subscribe()
需要string[]
作为输入参数,因此您应该将 a 传递string[]
给它。还有其他功能,例如
subscribeToPattern()
,subscribeWithPartitionRebalance()
也可用于为消费者订阅主题,您可以在API 文档中找到有关它们的更多信息。
但要列出可用主题,您需要从 Kafka 代理本身获取主题列表。但是您可以使用 ballerina 获取当前由特定消费者订阅的主题列表。
string[] subscribedTopics;
var result = sampleConsumer->getSubscription();
if (result is error) {
// Your logic for handling the error
} else {
subscribedTopics = result;
}
确保在此处处理错误,因为
getSubscription()
可以返回 astring[]
或 aerror
。芭蕾舞女演员型后卫可以为您解决问题。
您可以使用以下代码订阅主题:
import ballerina/log;
import wso2/kafka;
import ballerina/internal;
// Kafka consumer endpoint
endpoint kafka:SimpleConsumer consumer {
bootstrapServers: "localhost:9092, localhost:9093",
// Consumer group ID
groupId: "test-group",
// Listen from topic 'test'
topics: ["test"],
// Poll every 1 second
pollingInterval:1000
};
// Kafka service that listens from the topic 'product-price'
// 'inventoryControlService' subscribed to new product price updates from
// the product admin and updates the Database.
service<kafka:Consumer> kafkaService bind consumer {
// Triggered whenever a message added to the subscribed topic
onMessage(kafka:ConsumerAction consumerAction, kafka:ConsumerRecord[] records) {
// Dispatched set of Kafka records to service, We process each one by one.
foreach entry in records {
byte[] serializedMsg = entry.value;
// Convert the serialized message to string message
string msg = internal:byteArrayToString(serializedMsg, "UTF-8");
log:printInfo("New message received from the product admin");
// log the retrieved Kafka record
log:printInfo("Topic: " + entry.topic + "; Received Message: " + msg);
// Mock logic
// Update the database with the new price for the specified product
log:printInfo("Database updated with the new price of the product");
}
}
}
这个 Github repo可能对你很有用。它包含消费者和生产者的各种示例。
关于创建和列出主题的问题,如果您不需要从 Ballerina 执行这些操作,您可以从命令行执行:
bin/kafka-topics.sh --zookeeper localhost:2181 --topic test --from-beginning
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor <number_of_replicas> --partitions <number_of_partitions> --topic test