接着我之前的问题《C# Confluent.Kafka SetValueDeserializer object deserialization》,我尝试创建自定义反序列化器来反序列化 protobuf 消息,但仍然收到以下错误:

System.InvalidOperationException: 'Type is not expected, and no contract can be inferred: Ileco.Chimp.Proto.FinalValue'


return Serializer.Deserialize<T>(stream);


    /// <summary>
    /// Console worker that consumes <c>FinalValue</c> messages from a Kafka topic
    /// and prints each received value.
    /// </summary>
    class Worker
    {
        /// <summary>
        /// Connects to the given broker(s) over SASL/SSL, subscribes to <paramref name="topic"/>
        /// and prints every consumed message until Ctrl+C is pressed.
        /// </summary>
        /// <param name="brokerList">Bootstrap server list passed to the consumer configuration.</param>
        /// <param name="connStr">Connection string, used as the SASL password.</param>
        /// <param name="consumergroup">Consumer group id.</param>
        /// <param name="topic">Topic to subscribe to.</param>
        /// <param name="cacertlocation">Path of the CA certificate file used for SSL.</param>
        public static void Consumer(string brokerList, string connStr, string consumergroup, string topic, string cacertlocation)
        {
            var config = new ConsumerConfig
            {
                BootstrapServers = brokerList,
                SecurityProtocol = SecurityProtocol.SaslSsl,
                SocketTimeoutMs = 60000,                //this corresponds to the Consumer config `request.timeout.ms`
                SessionTimeoutMs = 30000,
                SaslMechanism = SaslMechanism.Plain,
                SaslUsername = "$ConnectionString",
                SaslPassword = connStr,
                SslCaLocation = cacertlocation,
                GroupId = consumergroup,
                AutoOffsetReset = AutoOffsetReset.Earliest,
                BrokerVersionFallback = "1.0.0",        //Event Hubs for Kafka Ecosystems supports Kafka v1.0+, a fallback to an older API will fail
                //Debug = "security,broker,protocol"    //Uncomment for librdkafka debugging information
            };

            using (var consumer = new ConsumerBuilder<string, FinalValue>(config)
                .SetValueDeserializer(new MyCustomDeserializer<FinalValue>())
                .SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
                .Build())
            {
                CancellationTokenSource cts = new CancellationTokenSource();
                // Turn Ctrl+C into a cooperative cancellation instead of killing the process.
                Console.CancelKeyPress += (_, e) => { e.Cancel = true; cts.Cancel(); };

                // Without a subscription Consume() has nothing to read from.
                consumer.Subscribe(topic);

                Console.WriteLine("Consuming messages from topic: " + topic + ", broker(s): " + brokerList);

                while (true)
                {
                    try
                    {
                        var msg = consumer.Consume(cts.Token);
                        Console.WriteLine($"Received: '{msg.Message.Value}'");

                        //var bytes = Encoding.ASCII.GetBytes(msg.Message.Value);
                        //var fv = FromByteArray<ProtobufMsg>(bytes);

                        //var proto = ProtoDeserialize<ProtobufMsg>(bytes);
                    }
                    catch (ConsumeException e)
                    {
                        Console.WriteLine($"Consume error: {e.Error.Reason}");
                    }
                    catch (OperationCanceledException)
                    {
                        // Ctrl+C was pressed: leave the loop instead of reporting the
                        // cancellation as an error on every iteration.
                        break;
                    }
                    catch (Exception e)
                    {
                        Console.WriteLine($"Error: {e.Message}");
                    }
                }

                // Leave the consumer group cleanly before the consumer is disposed.
                consumer.Close();
            }
        }
    }


    /// <summary>
    /// Kafka value deserializer that decodes the message payload with
    /// protobuf-net's <c>Serializer.Deserialize</c>.
    /// </summary>
    /// <remarks>
    /// <typeparamref name="T"/> must be a type protobuf-net has a contract for.
    /// Passing a type it cannot infer a contract for (for example a class generated by
    /// Google's <c>protoc</c>) fails with
    /// <c>InvalidOperationException: Type is not expected, and no contract can be inferred</c>.
    /// </remarks>
    public class MyCustomDeserializer<T> : IDeserializer<T>
    {
        /// <summary>Deserializes one Kafka message value.</summary>
        /// <param name="data">Raw payload bytes.</param>
        /// <param name="isNull">True when the message value is null.</param>
        /// <param name="context">Serialization context supplied by the client (unused).</param>
        /// <returns>The decoded value, or <c>default(T)</c> when <paramref name="isNull"/> is true.</returns>
        public T Deserialize(ReadOnlySpan<byte> data, bool isNull, Confluent.Kafka.SerializationContext context)
        {
            // A null payload carries no protobuf data; do not feed it to the serializer.
            if (isNull)
            {
                return default(T);
            }

            // The stream-based Deserialize overload needs a Stream, so the span is copied once.
            using (var stream = new MemoryStream(data.ToArray()))
            {
                return Serializer.Deserialize<T>(stream);
            }
        }
    }


syntax = "proto3";

package ileco.chimp.proto;

import "google/protobuf/timestamp.proto";
import "google/protobuf/wrappers.proto";

option java_package = "ileco.chimp.proto";
option java_outer_classname = "FinalValueProtos";

// Message carried on the Kafka topic. `timestamp` and `value` use the
// well-known types imported from timestamp.proto and wrappers.proto.
message FinalValue {
  google.protobuf.Timestamp timestamp = 1;
  uint32 inputId = 2;
  google.protobuf.DoubleValue value = 3;
  uint32 sourceId = 4;
  string inputGuid = 5;
}


1 回答 1


正如我昨天提到的,您似乎使用了 Google 的 .proto 处理工具(protoc),但使用的库却是 protobuf-net。如果您想使用 protobuf-net,有与 protobuf-net 库兼容的类似命令行/IDE/构建等工具;或者也可以临时使用 https://protogen.marcgravell.com/ (这样无需安装任何东西)。另一种做法是:继续使用 Google 的架构工具,但同时改用 Google 的库。总之:两者必须匹配。

这里唯一的小问题是 protobuf-net 目前没有对 DoubleValue 的显式内置支持;供参考,可以简单地把它看作:

namespace Google.Protobuf.WellKnownTypes
{
    /// <summary>
    /// Minimal protobuf-net stand-in for the <c>google.protobuf.DoubleValue</c>
    /// wrapper message, which protobuf-net does not provide built in.
    /// </summary>
    // The contract attributes are required: without them protobuf-net cannot infer a
    // contract for this type either. They are fully qualified so no extra using is needed.
    [ProtoBuf.ProtoContract]
    public sealed class DoubleValue
    {
        /// <summary>The wrapped double; field number 1, as in wrappers.proto.</summary>
        [ProtoBuf.ProtoMember(1)]
        public double Value { get; set; }
    }
}

我或许应该花些时间,把 wrappers.proto 中的所有类型都拿过来,并允许它们以 double?、float?、long? 等形式使用——但这需要一个额外的标记,因为 Nullable<T> 已经被处理,只是含义不同(即 .proto 术语中的 optional)。

于 2021-09-08T09:50:59.653 回答