1

我已启用将日志导出到 pub 子主题。我正在使用数据流来处理这些日志并将相关列存储在 BigQuery 中。有人可以帮助将 pubsub 消息有效负载转换为LogEntry对象。我尝试了以下代码:

@ProcessElement
public void processElement(ProcessContext c) throws Exception {
    PubsubMessage pubsubMessage = c.element();

    ObjectMapper mapper = new ObjectMapper();

    byte[] payload = pubsubMessage.getPayload();
    String s = new String(payload, "UTF8");
    LogEntry logEntry = mapper.readValue(s, LogEntry.class);
}

但我收到以下错误:

com.fasterxml.jackson.databind.JsonMappingException: Can not find a (Map) Key deserializer for type [simple type, class com.google.protobuf.Descriptors$FieldDescriptor]

编辑:我尝试了以下代码:

try {
        ByteArrayInputStream stream = new ByteArrayInputStream(Base64.decodeBase64(pubsubMessage.getPayload()));
        LogEntry logEntry = LogEntry.parseDelimitedFrom(stream);
        System.out.println("Log Entry = " + logEntry);
    } catch (InvalidProtocolBufferException e) {
        e.printStackTrace();
    }

但我现在收到以下错误:

com.google.protobuf.InvalidProtocolBufferException: Protocol message end-group tag did not match expected tag

4

1 回答 1

0

JSON 格式解析器应该能够做到这一点。Java 不是我的强项,但我认为您正在寻找类似的东西:

@ProcessElement
public void processElement(ProcessContext c) throws Exception {
  LogEntry.Builder entryBuilder = LogEntry.newBuilder();
  JsonFormat.Parser.usingTypeRegistry(
      JsonFormat.TypeRegistry.newBuilder()
          .add(LogEntry.getDescriptor())
          .build())
      .ignoringUnknownFields()
      .merge(c.element(), entryBuilder);
  LogEntry entry = entryBuilder.build();
  ...
}

您可能无需注册类型就可以逃脱。我认为在 C++ 中,原型类型被链接到一个全局注册表中。

您将需要“ignoringUnknownFields”,以防服务添加新字段并导出它们并且您尚未更新原始描述符的副本。导出的 JSON 中的任何“@type”字段也会导致问题。

您可能需要对有效负载进行特殊处理(即从 JSON 中剥离 if 然后单独解析它)。如果是 JSON,我希望解析器尝试填充不存在的子消息。如果它是 proto ......如果您也注册Any类型,它实际上可能会起作用。

于 2017-12-01T15:11:36.813 回答