0

我有一个带有一组 PTransforms 的管道,我的方法变得很长。

我想将我的 DoFns 和我的复合变换写在一个单独的包中,然后在我的 main 方法中使用它们。使用 python 非常简单,我如何使用 Scio 来实现呢?我没有看到任何这样做的例子。:(

     withFixedWindows(
        FIXED_WINDOW_DURATION,
        options = WindowOptions(
          trigger = groupedWithinTrigger,
          timestampCombiner = TimestampCombiner.END_OF_WINDOW,
          accumulationMode = AccumulationMode.ACCUMULATING_FIRED_PANES,
          allowedLateness = Duration.ZERO
        )
      )
      .sumByKey
      // How to write this in an another file and use it here?
      .transform("Format Output") {
        _
          .withWindow[IntervalWindow]
          .withTimestamp
      }
4

3 回答 3

1

我认为解决此问题的一种方法可能是在另一个包中定义一个对象,然后在该对象中创建一个方法,该方法具有转换所需的逻辑。例如:

def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)

    val defaulTopic = "tweets"
    val input = args.getOrElse("inputTopic", defaulTopic)
    val output = args("outputTopic")

    val inputStream: SCollection[Tweet] = sc.withName("read from pub sub").pubsubTopic(input)
      .withName("map to tweet class").map(x => {parse(x).extract[Tweet]})

    inputStream
      .flatMap(sentiment.predict) // object sentiment with method predict

  }
object sentiment  {

  def predict(tweet: Tweet): Option[List[TweetSentiment]] = {
    val data = tweet.text
    val emptyCase = Some("")
    Some(data) match {
      case `emptyCase` => None
      case Some(v) => Some(entitySentimentFile(data)) // I used another method, //not defined
    }

  }

另请参阅此链接以获取Scio 示例中给出的示例

于 2020-03-08T07:46:14.727 回答
1

您可以使用map函数来映射您的元素示例

您可以传递来自另一个类示例的方法引用,而不是传递 lambda.map(MyClass.MyFunction)

于 2019-08-26T21:18:22.277 回答
1

如果我正确理解您的问题,您希望将您的map, groupBy, ...转换捆绑在一个单独的包中,并在您的主管道中使用它们。

一种方法是使用applyTransform,但你最终会使用 PTransforms,它不是 scala 友好的。

您可以简单地编写一个接收 SCollection 并返回转换后的函数,例如:

def myTransform(input: SCollection[InputType]): Scollection[OutputType] = ???

但是如果你打算编写自己的 Source/Sink,请查看ScioIO 类

于 2019-11-21T19:40:55.117 回答