问题标签 [spotify-scio]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
929 浏览

scala - 使用 Dataflow 将 PubSub 流写入 Cloud Storage 时出错

使用 SCIO fromspotify为 编写作业Dataflow,遵循 2 个示例e.g1e.g2PubSub将流写入到GCS,但以下代码出现以下错误

错误

代码

我可能将窗口概念与 Bounded PCollection 混合在一起,有没有办法实现这一点,或者我需要应用一些变换来实现这一点,任何人都可以在这方面提供帮助

0 投票
1 回答
3865 浏览

google-bigquery - Google Pub/Sub to Dataflow,避免与记录 ID 重复

我正在尝试构建一个流式数据流作业,它从 Pub/Sub 读取事件并将它们写入 BigQuery。

根据文档,如果使用记录 ID,Dataflow 可以检测重复的消息传递(请参阅:https ://cloud.google.com/dataflow/model/pubsub-io#using-record-ids )

但即使使用这个记录 ID,我仍然有一些重复(大约 0.0002%)。

我错过了什么 ?

编辑:

我使用Spotify Async PubSub Client发布带有以下 snipplet 的消息:

然后我使用Spotify scio从 pub/sub 读取消息并将其保存到 DataFlow:

窗口大小为 1 分钟。

在注入消息几秒钟后,我已经在 BigQuery 中有重复消息。

我使用此查询来计算重复项:

而这个来看看他们:

0 投票
1 回答
978 浏览

google-cloud-platform - 在 Google Cloud Dataflow 中按顺序读取文件

我正在使用Spotify Scio读取从 Stackdriver 导出到 Google Cloud Storage 的日志。它们是 JSON 文件,其中每一行都是一个条目。查看工作日志,文件似乎被分成块,然后以任何顺序读取。在这种情况下,我已经将我的工作限制为 1 名工人。有没有办法强制按顺序读取和处理这些块?

举个例子(textFile 基本上是一个 TextIO.Read):

将根据工作日志产生与此类似的输出:

我想知道的是是否有办法强制它按顺序读取第 1-12 行。我发现压缩文件并使用指定的 CompressionType 读取它是一种解决方法,但我想知道是否有任何方法可以做到这一点,而不涉及压缩或更改原始文件。

0 投票
1 回答
1711 浏览

google-cloud-dataflow - 数据流触发 AfterProcessingTime.pastFirstElementInPane() 如何工作?

在 Dataflow 流媒体世界中。

当我说:

是在一小时的固定窗口内,触发器在看到第一个元素后等待或批处理元素。

但是当我说:

它是从第一次看到第一个元素开始每次触发还是隐式批处理元素?因为在每个元素上触发都会使系统过载。

0 投票
2 回答
660 浏览

google-cloud-dataflow - 如何将 SCollection 元素保存到不同的 BigQuery 表?

我需要SCollection根据时间戳将元素保存到不同的每小时 BigQuery 表中。我尝试了以下方法 -

  1. 对元素进行分组(TableName, Iterable[TableRow]),然后使用实例将每个元素保存Iterable[TableRow]到各自的表中。BigQueryClient这不起作用,因为BigQueryClient不可序列化。

  2. 创建一个SCollection[TableName, PCollection[TableRow]],然后PCollection[TableRow]使用BigQueryIO.Write. 要创建PCollection[TableRow]我使用的对象.map(s => (s._1, sc.pipeline.apply(Create.of(s._2.toList.asJava))))scScioContext 的实例在哪里。这不起作用,因为ScioContext不可序列化。

有没有办法将插入元素流式传输到不同的 BigQuery 表中?

0 投票
2 回答
989 浏览

apache-beam - Scio:使用 Pub/Sub 作为集合源时 groupByKey 不起作用

我将WindowsWordCount 示例程序的源代码从文本文件更改为云 Pub/Sub,如下所示。我将莎士比亚文件的数据发布到 Pub/Sub,这些数据确实被正确获取,但之后的任何转换.groupByKey似乎都不起作用。

0 投票
1 回答
1484 浏览

scala - 数据流模板的功能不一致?

当我创建数据流模板时,运行时参数的特征不会保留在模板文件中。在运行时,如果我尝试为此参数传递一个值,我会收到 400 错误

我正在使用 Scio 0.3.2、scala 2.11.11 和 apache 梁(0.6)。

我的参数如下:

他们使用此代码注册

要创建模板,我使用以下参数调用 sbt:

如果我显式传递 --input,它会变成一个 StaticValue 而不是 RuntimeValue,而这一次,我可以在模板文件中看到它。

该模板是从一个谷歌函数中调用的,该函数观察一个桶存储(灵感来自https://shinesolutions.com/2017/03/23/triggering-dataflow-pipelines-with-cloud-functions/):

400 错误:

当我尝试这个时同样的错误:

=>

当前设置是:

环境

环境

0 投票
1 回答
429 浏览

scala - 将 HBase 扫描转换为 RowFilter

我正在使用来自 spotify 的scio来完成我的 Dataflow 作业。在上一个 scio 版本中,使用了新的 bigtable java api (com.google.bigtable.v2)

现在scio bigtable 入口点需要“RowFilter”来过滤而不是 Hbase“Scan”。有没有一种简单的方法可以将“Scan”转换为“RowFilter”?我在源代码中寻找适配器,但我不确定如何使用它。我没有找到可以轻松从 hbase api 迁移到“新”api 的文档。

我在需要转换的代码中使用的简单扫描:

0 投票
2 回答
164 浏览

scala - 尽管有阴影,但由多个 protobuf 2/3 依赖项引起的 SBT 验证错误

我正在努力使用 Scio/Bigtable/HBase 验证以下示例项目的错误。依赖树需要 protobuf 版本(2.5、2.6.1、3.0、3.1)并且似乎默认为 3.2。我使用了 的着色组件sbt-assembly,不确定我是否正确。

我的build.sbt

我的Main.scala

输出到:

我究竟做错了什么 ?

谢谢你的帮助

0 投票
0 回答
984 浏览

google-cloud-dataflow - Scio:如何使用 Apache Beam 合并从 cloud pub sub 发送的消息?

我目前正在使用 Apache Beam 的 scala 包装库 scio。您要做的是根据 ID 组合从 CloudPubSub 发送的不同类型的消息。

消息A每秒发送一次,消息B每三秒发送一次。当我收到消息 B 时,我想将收到的消息 A 中具有相同 ID 的消息合并。

消息示例)

当前代码

这两行都执行到 keyBy 行。但是,加入后打印不会打印任何内容。没有错误等...

麻烦。我在等一个答案...

(控制台日志)</p>

窗口处理似乎每 10 秒进行一次,但处理时间却分崩离析。此外,我发现如果我使用 DataflowRunner 而不是 DirectRunner 启动它,它会成功。