我在使用协议缓冲区时正在处理的用例之一是反序列化我在消费者端收到的协议缓冲区 Kafka 消息(使用 sarama 库和 Go)。
我目前的做法是我定义了示例 pixel.proto 文件,如下所示。
syntax = "proto3";
package saramaprotobuf;
message Pixel {
// Session identifier stuff
string session_id = 2;
}
我正在通过 sarama.Producer 发送消息(通过编组)接收它 sarama.Consumer (通过引用已编译的 pixel.proto.pb 来解组消息)。代码如下。
import (
"github.com/Shopify/sarama"
"github.com/golang/protobuf/proto"
"log"
"os"
"os/signal"
"protobuftest/example"
"syscall"
"time"
)
func main() {
topic := "test_topic"
brokerList := []string{"localhost:9092"}
producer, err := newSyncProducer(brokerList)
if err != nil {
log.Fatalln("Failed to start Sarama producer:", err)
}
go func() {
ticker := time.NewTicker(time.Second)
for {
select {
case t := <-ticker.C:
elliot := &example.Pixel{
SessionId: t.String(),
}
pixelToSend := elliot
pixelToSendBytes, err := proto.Marshal(pixelToSend)
if err != nil {
log.Fatalln("Failed to marshal example:", err)
}
msg := &sarama.ProducerMessage{
Topic: topic,
Value: sarama.ByteEncoder(pixelToSendBytes),
}
producer.SendMessage(msg)
log.Printf("Pixel sent: %s", pixelToSend)
}
}
}()
signals := make(chan os.Signal, 1)
signal.Notify(signals, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)
partitionConsumer, err := newPartitionConsumer(brokerList, topic)
if err != nil {
log.Fatalln("Failed to create Sarama partition consumer:", err)
}
log.Println("Waiting for messages...")
for {
select {
case msg := <-partitionConsumer.Messages():
receivedPixel := &example.Pixel{}
err := proto.Unmarshal(msg.Value, receivedPixel)
if err != nil {
log.Fatalln("Failed to unmarshal example:", err)
}
log.Printf("Pixel received: %s", receivedPixel)
case <-signals:
log.Print("Received termination signal. Exiting.")
return
}
}
}
func newSyncProducer(brokerList []string) (sarama.SyncProducer, error) {
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Retry.Max = 5
config.Producer.Return.Successes = true
// TODO configure producer
producer, err := sarama.NewSyncProducer(brokerList, config)
if err != nil {
return nil, err
}
return producer, nil
}
func newPartitionConsumer(brokerList []string, topic string) (sarama.PartitionConsumer, error) {
conf := sarama.NewConfig()
// TODO configure consumer
consumer, err := sarama.NewConsumer(brokerList, conf)
if err != nil {
return nil, err
}
partitionConsumer, err := consumer.ConsumePartition(topic, 0, sarama.OffsetOldest)
if err != nil {
return nil, err
}
return partitionConsumer, err
}
如您所见,在代码中,我已导入 .proto 文件并在主函数中引用它,以便发送和接收消息。这里的问题是,解决方案不是通用的。我会在消费者端收到不同的 .proto 类型的消息。
我怎样才能使它通用?我知道作为 protobuf 的一部分,有一种叫做自描述消息(动态消息)的东西。我提到了这个链接https://developers.google.com/protocol-buffers/docs/techniques?csw=1#self-description。但它没有任何解释如何将其嵌入为 pixel.proto 的一部分(我使用过的示例),以便在消费者端我直接将其反序列化为所需的类型。