0

以下是我的输出消息格式:

message EditorialTextAdEnforcementData {
  int32 customerId = 1;
  int32 source = 2;
  DecisionDetails decisionDetails = 3;
  int32 flagsEnforceOption = 4;
  int32 categoryEnforceOption = 5;
  int32 applyBypass = 6;
  map<int32, string> categories = 7;
  bcl.DateTime rowDateSource = 8;
  int32 accountId = 9;
  int64 adId = 10;
  int64 orderId = 11;
  int32 adType = 12;
  int32 campaignType = 13;
  bool hasImage = 14;
  bool isNewAdType = 15;
}

在准备输出数据集时,我使用以下方法来包含一个虚拟地图

 val output =...
        .withColumn(UC.Categories, map(lit("1"), lit("test"))).as[EditorialTextAdEnforcementData]
 output.show()

在这一步之前它工作正常。但是,当我打电话时:

output.map(_.toByteString).show()

我收到以下错误:

Exception in thread "main" scala.MatchError: MapType(IntegerType,StringType,false) (of class org.apache.spark.sql.types.MapType)
    at org.apache.spark.sql.catalyst.expressions.objects.MapObjects.doGenCode(objects.scala:836)
    at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:108)
    at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:105)
    at scala.Option.getOrElse(Option.scala:121)

如果我删除类别字段,它似乎工作正常。请让我知道如何将 MapType 字段编写为 protobuf 消息

4

1 回答 1

0

这是在https://github.com/scalapb/sparksql-scalapb/issues/79下跟踪的未解决问题)。

作为一种解决方法,我们可以使用 map 作为消息内的封装结构字段,如下所示:https ://developers.google.com/protocol-buffers/docs/proto3#backwards_compatibility 。如下:

message Categories {
  int32 key = 1;
  string value = 2;
}

message EditorialTextAdEnforcementData {
  int32 customerId = 1;
  int32 source = 2;
  repeated DecisionDetails decisionDetails = 3;
  int32 flagsEnforceOption = 4;
  int32 categoryEnforceOption = 5;
  int32 applyBypass = 6;
  repeated Categories categories = 7;
  bcl.DateTime rowDateSource = 8;
  int32 accountId = 9;
  int64 adId = 10;
  int64 orderId = 11;
  int32 adType = 12;
  int32 campaignType = 13;
  bool hasImage = 14;
  bool isNewAdType = 15;
}
于 2020-07-17T02:02:53.853 回答