0

我尝试运行一个简单的 SCIO 代码。

case class Foo(first: String, second: String, third: String)

尝试Foo在 a中使用SCollection它会导致错误:

Cannot find an implicit Coder instance for type:

  >> (String, my.Foo)
...
<redacted prose>
...

错误信息中写了很多。我似乎无法理解,尽管所有这些文本如何为我应该围绕 Coders 做的事情提供任何信息来解决这个问题。

任何人都可以阐明如何解决这个问题。

不使用implicits的奖励积分。

4

1 回答 1

0

这是一个简约的例子

case class DemoEvents (reverse_geo_code_failed:String
                         ,eventType:String
                         ,city:String
                         ,area:String
                        )

class DoFnExample extends DoFn [String, DemoEvents]{
    // `@ProcessElement` is called once per element.
    @ProcessElement
    private[example] def processElement(c: DoFn[String, DemoEvents]#ProcessContext): Unit ={
      //val t = DemoEvents("BNG1111","pickup","ORR","1111")
      implicit val formats: DefaultFormats.type = DefaultFormats
      c.output(parse(c.element()).extract[DemoEvents])

    }
  }

val eventStream: SCollection[String] = sc.pubsubSubscription[String](args("pubsub-subscription-name"))
      .withFixedWindows(Duration.standardSeconds(120L))

val events: SCollection[DemoEvents] = eventStream.applyTransform(ParDo.of(new DoFnExample()))
于 2020-03-17T15:04:53.203 回答