0

我正在尝试反序列化值以恢复 Kafka 消息值。但我找不到合适的反序列化器。我收到以下值:

AAAAAAKWBQACDkdMIE5hbWUgc3QuIEhlcm9pdiBQcmFjaRpiLjI4LCBhcHAuMTUwDktoYXJraXYES0gKODU0MzQSMDk4NDMyMzIzAK61xYSfXgIKMTE6MzICDFRSMTIzMghLeWl2

但是当我试图通过以下方式获取反序列化消息的字符串表示时:

        var bytes = Convert.FromBase64String("AAAAAAKWBQACDkdMIE5hbWUgc3QuIEhlcm9pdiBQcmFjaRpiLjI4LCBhcHAuMTUwDktoYXJraXYES0gKODU0MzQSMDk4NDMyMzIzAK61xYSfXgIKMTE6MzICDFRSMTIzMghLeWl2");
        var jsonBack = Encoding.UTF8.GetString(bytes);

而且它只能部分反序列化它。带有额外的符号。我找不到任何关于它如何序列化的文档,也看不到它的结构。有没有人在使用 AWS Lambda 和 .Net 的 MSK 触发器时看到同样的情况

下面我给出一个来自 aws 文档的消息示例。但是没有关于价值保留的消息(消息的键和值以及形式是什么)

MSK 事件负载

4

1 回答 1

2

我创建了一个用于此的类。当我开始使用这个时,我遇到了类似的问题,因为 AWS 只给出了节点 JS 的例子。所以从那里我创建了下面的类,它工作得很好。试试看。使用此类作为接收到您的 lambda 函数的有效负载类型。因此,您的处理程序签名如下所示。请注意,您需要对下面“KafkaMessage”类中的“body”属性进行 base64decode。

public async Task FunctionHandler(MSKEvent evnt, ILambdaContext context)
{
//do some magic.
}

/// <summary>
/// Represents an MSK event. MSK event is the event when lambda is triggered via a kafka topic.
/// </summary>
public class MSKEvent
{
    /// <summary>
    /// The source of the event.
    /// </summary>
    public string EventSource { get; set; }

    /// <summary>
    /// The AWS arn of the event source.
    /// </summary>
    public string EventSourceArn { get; set; }

    /// <summary>
    /// The collection of records.
    /// </summary>
    public Dictionary<string, IEnumerable<KafkaMessage>> Records { get; set; }
}

/// <summary>
/// Represents a kafka message.
/// </summary>
public class KafkaMessage
{
    /// <summary>
    /// The kafka topic name this message belongs to.
    /// </summary>
    public string Topic { get; set; }

    /// <summary>
    /// The kafka partition this message belongs to.
    /// </summary>
    public int Partition { get; set; }

    /// <summary>
    /// The offset of this message.
    /// </summary>
    public int Offset { get; set; }

    /// <summary>
    /// The created timestamp in unix ms of this message.
    /// </summary>
    public long Timestamp { get; set; }

    /// <summary>
    /// The timestamp type.
    /// </summary>
    public string TimestampType { get; set; }

    /// <summary>
    /// Base64 encoded message body. Base64 decode this field to obtain the actual message.
    /// </summary>
    [JsonProperty("value")]
    public string Body { get; set; }
}
于 2021-06-25T11:30:32.793 回答