4

我正在尝试构建一个流式数据流作业,它从 Pub/Sub 读取事件并将它们写入 BigQuery。

根据文档,如果使用记录 ID,Dataflow 可以检测重复的消息传递(请参阅:https ://cloud.google.com/dataflow/model/pubsub-io#using-record-ids )

但即使使用这个记录 ID,我仍然有一些重复(大约 0.0002%)。

我错过了什么 ?

编辑:

我使用Spotify Async PubSub Client发布带有以下 snipplet 的消息:

Message
      .builder()
      .data(new String(Base64.encodeBase64(json.getBytes())))
      .attributes("myid", id, "mytimestamp", timestamp.toString)
      .build()

然后我使用Spotify scio从 pub/sub 读取消息并将其保存到 DataFlow:

val input = sc.withName("ReadFromSubscription")
              .pubsubSubscription(subscriptionName, "myid", "mytimestamp")
input
    .withName("FixedWindow")
    .withFixedWindows(windowSize)  // apply windowing logic
    .toWindowed  // convert to WindowedSCollection
    //
    .withName("ParseJson")
    .map { wv =>
      wv.copy(value = TableRow(
        "message_id" -> (Json.parse(wv.value) \ "id").as[String],
        "message" -> wv.value)
      )
    }
    //
    .toSCollection  // convert back to normal SCollection
    //
    .withName("SaveToBigQuery")
    .saveAsBigQuery(bigQueryTable(opts), BQ_SCHEMA, WriteDisposition.WRITE_APPEND)

窗口大小为 1 分钟。

在注入消息几秒钟后,我已经在 BigQuery 中有重复消息。

我使用此查询来计算重复项:

SELECT 
   COUNT(message_id) AS TOTAL, 
   COUNT(DISTINCT message_id) AS DISTINCT_TOTAL 
FROM my_dataset.my_table

//returning 273666  273564

而这个来看看他们:

SELECT *
FROM my_dataset.my_table
WHERE message_id IN (
  SELECT message_id
  FROM my_dataset.my_table
  GROUP BY message_id
  HAVING COUNT(*) > 1
) ORDER BY message_id

//returning for instance:
row|id                                    | processed_at           | processed_at_epoch    
1   00166a5c-9143-3b9e-92c6-aab52601b0be    2017-02-02 14:06:50 UTC 1486044410367   { ...json1... }  
2   00166a5c-9143-3b9e-92c6-aab52601b0be    2017-02-02 14:06:50 UTC 1486044410368   { ...json1... }  
3   00354cc4-4794-3878-8762-f8784187c843    2017-02-02 13:59:33 UTC 1486043973907   { ...json2... }  
4   00354cc4-4794-3878-8762-f8784187c843    2017-02-02 13:59:33 UTC 1486043973741   { ...json2... } 
5   0047284e-0e89-3d57-b04d-ebe4c673cc1a    2017-02-02 14:09:10 UTC 1486044550489   { ...json3... } 
6   0047284e-0e89-3d57-b04d-ebe4c673cc1a    2017-02-02 14:08:52 UTC 1486044532680   { ...json3... }
4

1 回答 1

2

BigQuery 文档指出,可能会出现重复到达的罕见情况:

  1. “BigQuery 会记住此 ID 至少一分钟”——如果 Dataflow 在重试插入之前花费超过一分钟 BigQuery 可能允许重复输入。您可以查看管道中的日志以确定是否是这种情况.
  2. “在谷歌数据中心意外失去连接的罕见情况下,自动重复数据删除可能是不可能的。”

您可能想尝试手动删除重复项的说明。这也将允许您查看insertID与每一行一起使用的 ,以确定问题是在 Dataflow 端(insertID为同一记录生成不同的 s)还是在 BigQuery 端(未能根据它们的 对行进行重复数据删除insertID)。

于 2017-02-03T21:22:18.863 回答