问题标签 [spotify-scio]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
352 浏览

scala - Scala宏:分配给变量的字符串文字不匹配

我正在使用来自 Spotify 的 Scio 库的宏注释。我想定义一个String类型的变量并像这样注释:

这不会编译,但是,如果我String直接注释,它可以工作:

看代码,这个匹配到这里就完成了,本质上代码如下:

问题是为什么这与变量不匹配,但字符串匹配?如果有任何方法可以使第一个示例工作?

0 投票
1 回答
1610 浏览

scala - Scio / apache beam java.lang.IllegalArgumentException:无法序列化方法

我正在尝试使用数据流将一些数据从 pub sub 移动到云存储。我需要为 scio / beam 提供时间戳,以便它可以将数据分组到窗口中。

我有一个简单的案例类来模拟我的事件,它看起来像这样(删除了一些字段)

我的管道由此开始。pub sub 中的事件为 json 格式,我使用 json4s 反序列化:

在同一范围内,我定义了 Json4s 的隐式格式

我正在使用支持 jodatime 的 json4s.ext,请注意案例类中的日期时间是 joda 时间。这个扩展库似乎有一些问题,因为我得到以下异常:

我尝试了一项工作,我在其中创建了 timestampUtc 和 publishedUtc 字符串的类型,并像这样解析管道内的字符串:

但我得到一个类似的例外:

为什么会发生这种情况,我该如何解决?

谢谢

0 投票
0 回答
1145 浏览

scala - Dataflow / apache beam 在窗口中的字节数上触发窗口

我有一个简单的工作,将数据从 pub sub 移动到 gcs。发布子主题是一个共享主题,具有许多不同大小的不同消息类型

我希望结果相应地在 GCS 垂直分区中:

架构/版本/年/月/日/

在该父键下应该是当天的一组文件,并且文件应该是合理的大小,即 10-200 mb

我正在使用 scio,并且我能够进行 groupby 操作来创建 [String, Iterable[Event]] 的 P/SCollection,其中键基于上述分区方案。

我无法使用默认文本接收器,因为它们不支持垂直分区,它只能将整个 pcollection 写入一个位置。而是遵循以下答案中的建议:

如何在 Apache Beam 中写入多个文件?

使用 DoFn 使用 Cloud Dataflow 从 PubSub 写入 Google Cloud Storage

我创建了一个简单的函数,将我的组写入 gcs。

这可以工作并以我喜欢的方式写入文件,我使用以下带有固定窗口的触发规则:

基本上是在窗口结束时根据水印或收到 x 个元素时的复合触发。

问题在于源数据可能包含大小不一的消息。因此,如果我选择要触发的固定数量的元素,我将:

1)选择一个太大的数字,对于较大的事件组,它会炸毁工人上的 Java 堆 2)选择一个较小的数字,然后我最终会得到一些小文件用于安静的事件,我想在其中积累更多的事件在我的档案中。

我没有看到一个自定义触发器,我可以在其中传递一个 lambda,它输出每个元素的度量或类似的东西。有没有办法我可以实现自己的触发器来触发窗口中的字节数。

其他一些问题

我是否正确假设每个组中元素的迭代器在内存中而不是从存储中流式传输?如果不是,我可以以更节省内存的方式从迭代器流式传输到 gcs

对于我的 GCS 作家,我只是在地图或 ParDo 中进行操作。它没有实现文件输出接收器或看起来像 TextIo 的任何东西。这个简单的实现会不会有问题。在文档中它说,如果转换引发异常,它会被简单地重试(对于流媒体应用程序来说是不确定的)

0 投票
1 回答
330 浏览

xml - 使用 Spotify Scio(Apache Beam 的 Scala 包装器)处理 XML 文件

Apache beam java sdk 支持读取大型 xml 输入文件,带有 org.apache.beam.sdk.io.xml.XmlIO(我查看了 2.1.0 版本)

有谁知道 Scio 是否允许你做同样的事情或有一个例子?我有一组非常大的 xml 文件要处理。

0 投票
1 回答
108 浏览

scala - 如何在 GCP 中配置 Scio Scala API 的第三方库

有一个第三方 Scio 客户端库,它为 Cloud Dataflow 提供了一个 Scala API,以便访问 Cloud Bigtable。所以,在这个过程中,我无法在 GCP 中配置 Scala API。请帮忙。

链接:https ://cloud.google.com/bigtable/docs/third-party-clients

0 投票
0 回答
197 浏览

google-cloud-dataflow - 使用 Scio 和 BigQuery 时,简单的 Dataflow 作业卡住了

(4ea13f859044f090):工作流程失败。原因:(4ea13f859044f04d):数据流似乎卡住了。请通过http://stackoverflow.com/questions/tagged/google-cloud-dataflow与 Dataflow 团队联系。

我不知道为什么我的工作失败了,你的日志里有什么东西吗?

该作业对 BQ 表执行简单的 BQ 查询,然后尝试使用 Scio 4.4 将输出写入另一个 BQ 表

0 投票
1 回答
715 浏览

google-cloud-dataflow - 如何在 apache 梁/数据流中对重叠的滑动窗口进行重复数据删除

我有以下要求:

  • 从发布子主题中读取事件
  • 采取持续时间 30 分钟和周期 1 分钟的窗口
  • 在该窗口中,如果给定 id 的 3 个事件都匹配某个谓词,那么我需要在不同的 pub 子主题中引发事件
  • 该事件应在第三个事件进入分组 ID 时立即引发,因为这是用于检测欺诈行为。在一个窗格中,有许多 id 有 3 个与我的谓词匹配的事件,因此我可能需要在每个窗格中发出多个事件

我能够编写一个使用 PCollection 的函数,它会根据我的业务逻辑进行必要的分组、逻辑和过滤并发出事件。

问题:

  1. 由于重叠的滑动窗口,输出 PCollection 包含重复项。我知道这是滑动窗口的预期行为,但我怎样才能避免这种情况,同时保持在同一个数据流管道中。我意识到我可以在外部系统中进行重复数据删除,但这只会增加我的系统的复杂性。
  2. 我还需要编写某种触发器,每次在窗口中达到我的条件时都会触发
  3. 数据流是否适合这种类型的实时检测场景

非常感谢

0 投票
1 回答
2064 浏览

google-cloud-dataflow - 如何使用 scio 在谷歌数据流作业中设置标签?

我想为谷歌数据流作业设置标签以用于成本分配。这是一个工作 Java 代码的示例:

setLabels:方法文档链接

有人可以帮忙看看 scio / scala 的例子吗?我检查了几个 scio + google 数据流示例,但找不到任何有用的东西。

其他选项是在 mvn / gradle 命令中提供标签参数,如下所示:

我不确定传递标签={a:b} 是否是正确的语法。

有什么帮助吗?谢谢

0 投票
1 回答
1091 浏览

google-cloud-dataflow - 数据流作业因“处理暂停”而停止

通过使用会话窗口运行具有相当高级组的流式数据流管道,我在运行几个小时后遇到了问题。这项工作在工人中扩大规模,但后来开始使用以下内容获取日志负载

记录此代码的转换就在“group by”块之后,并对scala.concurrent.{Await/Promise}外部服务执行异步 HTTP 调用(使用 )。

任何想法为什么会发生这种情况?与异步、扩展或按策略分组有关?

  • 职位编号:2018-01-29_03_13_40-12789475517328084866
  • SDK:适用于 Java 2.2.0 的 Apache Beam SDK
  • Scio 版本:0.4.7
0 投票
1 回答
387 浏览

scala - 在 Scala 中建立与 Google Cloud Bigtable 的单例连接,类似于 Cassandra

我正在尝试使用谷歌云服务实现一个实时推荐系统。我已经使用 Kafka、Apache Storm 和 Cassandra 构建了引擎,但我想使用 Cloud Pub/Sub、Cloud Dataflow 和 Cloud Bigtable 在 Scala 中创建相同的引擎。

到目前为止,在 Cassandra 中,由于我在 Apache Storm bolt 操作期间多次读写,我已经实现了以下连接器MyDatabase.scala,它启动与数据库的单例连接,并在 bolt 内使用此连接来读取和更新用户使用来自 Kafka spout 的流数据的表。我为 Cassandra 使用了 Phantom Scala API 驱动程序。

MyDatabase.scala

用户.scala

数据流管道将如下所示:

  1. 从 Pub/Sub 提取流数据。
  2. 在单个 parDo 中实现逻辑,我们将使用从 Pub/Sub 提取的数据生成的一些新值更新 Bigtable 中的多个表。

当您使用 Phantom DSL 时,创建与 Cassandra 的连接非常简单。我的问题是,是否有任何等价的库,例如 Google Cloud Bigtable 的 Phantom,或者使用 Google Cloud API 和 Scio 实现此功能的正确方法是什么(因为我将使用 Scala 实现 Dataflow 管道)。似乎我找不到任何相关示例来建立与 Bigtable 的连接并在 Scala 的 Dataflow 管道中使用此连接。

谢谢