问题标签 [spotify-scio]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
google-bigquery - Google Pub/Sub to Dataflow,避免与记录 ID 重复
我正在尝试构建一个流式数据流作业,它从 Pub/Sub 读取事件并将它们写入 BigQuery。
根据文档,如果使用记录 ID,Dataflow 可以检测重复的消息传递(请参阅:https ://cloud.google.com/dataflow/model/pubsub-io#using-record-ids )
但即使使用这个记录 ID,我仍然有一些重复(大约 0.0002%)。
我错过了什么 ?
编辑:
我使用Spotify Async PubSub Client发布带有以下 snipplet 的消息:
然后我使用Spotify scio从 pub/sub 读取消息并将其保存到 DataFlow:
窗口大小为 1 分钟。
在注入消息几秒钟后,我已经在 BigQuery 中有重复消息。
我使用此查询来计算重复项:
而这个来看看他们:
google-cloud-platform - 在 Google Cloud Dataflow 中按顺序读取文件
我正在使用Spotify Scio读取从 Stackdriver 导出到 Google Cloud Storage 的日志。它们是 JSON 文件,其中每一行都是一个条目。查看工作日志,文件似乎被分成块,然后以任何顺序读取。在这种情况下,我已经将我的工作限制为 1 名工人。有没有办法强制按顺序读取和处理这些块?
举个例子(textFile 基本上是一个 TextIO.Read):
将根据工作日志产生与此类似的输出:
我想知道的是是否有办法强制它按顺序读取第 1-12 行。我发现压缩文件并使用指定的 CompressionType 读取它是一种解决方法,但我想知道是否有任何方法可以做到这一点,而不涉及压缩或更改原始文件。
google-cloud-dataflow - 数据流触发 AfterProcessingTime.pastFirstElementInPane() 如何工作?
在 Dataflow 流媒体世界中。
当我说:
是在一小时的固定窗口内,触发器在看到第一个元素后等待或批处理元素。
但是当我说:
它是从第一次看到第一个元素开始每次触发还是隐式批处理元素?因为在每个元素上触发都会使系统过载。
google-cloud-dataflow - 如何将 SCollection 元素保存到不同的 BigQuery 表?
我需要SCollection
根据时间戳将元素保存到不同的每小时 BigQuery 表中。我尝试了以下方法 -
对元素进行分组
(TableName, Iterable[TableRow])
,然后使用实例将每个元素保存Iterable[TableRow]
到各自的表中。BigQueryClient
这不起作用,因为BigQueryClient
不可序列化。创建一个
SCollection[TableName, PCollection[TableRow]]
,然后PCollection[TableRow]
使用BigQueryIO.Write
. 要创建PCollection[TableRow]
我使用的对象.map(s => (s._1, sc.pipeline.apply(Create.of(s._2.toList.asJava))))
,sc
ScioContext 的实例在哪里。这不起作用,因为ScioContext
不可序列化。
有没有办法将插入元素流式传输到不同的 BigQuery 表中?
apache-beam - Scio:使用 Pub/Sub 作为集合源时 groupByKey 不起作用
我将WindowsWordCount 示例程序的源代码从文本文件更改为云 Pub/Sub,如下所示。我将莎士比亚文件的数据发布到 Pub/Sub,这些数据确实被正确获取,但之后的任何转换.groupByKey
似乎都不起作用。
scala - 数据流模板的功能不一致?
当我创建数据流模板时,运行时参数的特征不会保留在模板文件中。在运行时,如果我尝试为此参数传递一个值,我会收到 400 错误
我正在使用 Scio 0.3.2、scala 2.11.11 和 apache 梁(0.6)。
我的参数如下:
他们使用此代码注册
要创建模板,我使用以下参数调用 sbt:
如果我显式传递 --input,它会变成一个 StaticValue 而不是 RuntimeValue,而这一次,我可以在模板文件中看到它。
该模板是从一个谷歌函数中调用的,该函数观察一个桶存储(灵感来自https://shinesolutions.com/2017/03/23/triggering-dataflow-pipelines-with-cloud-functions/):
400 错误:
当我尝试这个时同样的错误:
=>
当前设置是:
环境
scala - 将 HBase 扫描转换为 RowFilter
我正在使用来自 spotify 的scio来完成我的 Dataflow 作业。在上一个 scio 版本中,使用了新的 bigtable java api (com.google.bigtable.v2)
现在scio bigtable 入口点需要“RowFilter”来过滤而不是 Hbase“Scan”。有没有一种简单的方法可以将“Scan”转换为“RowFilter”?我在源代码中寻找适配器,但我不确定如何使用它。我没有找到可以轻松从 hbase api 迁移到“新”api 的文档。
我在需要转换的代码中使用的简单扫描:
scala - 尽管有阴影,但由多个 protobuf 2/3 依赖项引起的 SBT 验证错误
我正在努力使用 Scio/Bigtable/HBase 验证以下示例项目的错误。依赖树需要 protobuf 版本(2.5、2.6.1、3.0、3.1)并且似乎默认为 3.2。我使用了 的着色组件sbt-assembly
,不确定我是否正确。
我的build.sbt:
我的Main.scala:
输出到:
我究竟做错了什么 ?
谢谢你的帮助
google-cloud-dataflow - Scio:如何使用 Apache Beam 合并从 cloud pub sub 发送的消息?
我目前正在使用 Apache Beam 的 scala 包装库 scio。您要做的是根据 ID 组合从 CloudPubSub 发送的不同类型的消息。
消息A每秒发送一次,消息B每三秒发送一次。当我收到消息 B 时,我想将收到的消息 A 中具有相同 ID 的消息合并。
消息示例)
当前代码
这两行都执行到 keyBy 行。但是,加入后打印不会打印任何内容。没有错误等...
麻烦。我在等一个答案...
(控制台日志)</p>
窗口处理似乎每 10 秒进行一次,但处理时间却分崩离析。此外,我发现如果我使用 DataflowRunner 而不是 DirectRunner 启动它,它会成功。