
I have a product for defining and configuring business workflows. Part of this product is a form builder that lets users set up different forms.

All of this form data is stored in MongoDB with the following structure:

- form_schemas

{
  "_id" : "",
  "name" : "",
  "account_id" : "",
  "fields" : [
    {
      "label" : "Enter Email",
      "name" : "email",
      "type" : "text",
      "required" : "true",
      "hidden" : "false",
      "additional_config" : { }
    },
    {
      "label" : "Select DOB",
      "name" : "dob",
      "type" : "date",
      "required" : "true",
      "hidden" : "false",
      "additional_config" : { }
    }
    ...
  ]
}

- form_datas

{
  "workflow_template_id" : "",
  "account_id" : ""
  "data" : {
    "email" : "xyx@gmail.com",
    "dob" : "2001-04-05"
  },
  "created_at" : "",
  "updated_at" : ""

}


As seen above, the forms can belong to many different businesses. I am now looking at a data pipeline to transport this data to Google BigQuery at periodic intervals for analysis.

On the BigQuery side, I maintain a separate table for each workflow.

I have a working solution built entirely on Google Cloud Functions. A Cloud Scheduler job runs at periodic intervals and invokes the different Cloud Functions. At a high level, each Cloud Function does the following (a simplified sketch follows the list):

  • Iterate over each schema
  • Read the data from MongoDB for that schema since the last run (as a cursor)
  • For each row of data, run the custom transformation logic (this includes transforming various nested data types like grids, lookups, etc.)
  • Stream each row of transformed data directly to Google Cloud Storage as NDJSON
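
The sketch, with placeholder names throughout (I am assuming form_datas.workflow_template_id links back to the schema _id, and blob.open() needs a reasonably recent google-cloud-storage client):

import json
from pymongo import MongoClient
from google.cloud import storage

# Placeholder connection details; the real values come from the function's config.
mongo = MongoClient("mongodb://...")
db = mongo["forms"]
bucket = storage.Client().bucket("export-bucket")

def transform_row(schema, doc):
    # Custom transformation logic (flattening grids, resolving lookups, etc.).
    ...

def export_schema(schema, last_run_ts):
    # Read only the documents changed since the last run, as a cursor.
    cursor = db["form_datas"].find({
        "workflow_template_id": schema["_id"],
        "updated_at": {"$gt": last_run_ts},
    })
    blob = bucket.blob("exports/{}.ndjson".format(schema["_id"]))
    # Stream the transformed rows to GCS as newline-delimited JSON.
    with blob.open("w") as out:
        for doc in cursor:
            out.write(json.dumps(transform_row(schema, doc)) + "\n")

def main(request):
    last_run_ts = load_last_run_timestamp()  # hypothetical checkpoint helper
    for schema in db["form_schemas"].find({}):
        export_schema(schema, last_run_ts)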

The above solution gives me:

  • Complete control over the transformation
  • Simple deployment

However, since it all runs on Cloud Functions, I am bound by the 9-minute limit per invocation. This forces a lot of pagination logic, especially when there is a need to regenerate the complete dataset from the beginning of time.

While the above solution works fine for now, I have been looking at other serverless options such as Google Dataflow. Since I am just getting started with Dataflow/Apache Beam, I was wondering:

If I were to write a pipeline in Beam, should I go with the same approach of

  1. Extract (row by row) -> Transform -> Load (GCS) -> Load (BQ)

or

  1. Extract (entire data as JSON) -> Load to GCS -> Transform (Beam) -> Load to GCS -> Load to BQ

Let me know if there is a better option for processing the entire dataset.
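
For reference, here is a rough sketch of what the first approach might look like with the Beam Python SDK (not validated; the connection string, table name, and transform_row are placeholders):

import json
import apache_beam as beam
from apache_beam.io.mongodbio import ReadFromMongoDB
from apache_beam.options.pipeline_options import PipelineOptions

def transform_row(doc):
    # Same custom logic as the Cloud Function version (grids, lookups, etc.).
    return {"email": doc["data"].get("email"), "dob": doc["data"].get("dob")}

def run():
    options = PipelineOptions()  # --runner=DataflowRunner etc. on the command line
    with beam.Pipeline(options=options) as p:
        rows = (
            p
            | "ReadMongo" >> ReadFromMongoDB(
                uri="mongodb://...",
                db="forms",
                coll="form_datas",
                filter={"workflow_template_id": "<schema-id>"})
            | "Transform" >> beam.Map(transform_row))

        # Keep a raw NDJSON copy on GCS so the data can be replayed later.
        (rows
         | "ToJson" >> beam.Map(json.dumps)
         | "WriteGCS" >> beam.io.WriteToText(
             "gs://export-bucket/exports/form", file_name_suffix=".ndjson"))

        # Load the same rows into the per-workflow BigQuery table.
        (rows
         | "WriteBQ" >> beam.io.WriteToBigQuery(
             "my-project:forms.workflow_table",
             write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
             create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER))

if __name__ == "__main__":
    run()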


1 Answer


Typically, a process like this writes the raw data to GCS and then transforms it into BigQuery. That way, when you find flaws in the transformation (which is inevitable) and when requirements change (also inevitable), you can replay the data with the new code.
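
For the GCS-to-BigQuery step, a plain load job is usually enough, and re-running it over the archived files is how you replay after fixing the transform. A minimal sketch with the BigQuery Python client (bucket, dataset, and table names are made up):

from google.cloud import bigquery

client = bigquery.Client()
job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,  # rebuild the table
    autodetect=True,  # or pass an explicit schema
)
load_job = client.load_table_from_uri(
    "gs://export-bucket/exports/form-*.ndjson",   # made-up path
    "my-project.forms.workflow_table",            # made-up table
    job_config=job_config,
)
load_job.result()  # wait for the load to finish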

Ideally, the steps before the transformation are automated by a change data capture (CDC) tool. There are many CDC tools, but Debezium is taking over because it is reliable and free. There is a Debezium connector that gets data from MongoDB, as well as examples of how to get Debezium CDC into BigQuery.
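
If you go the CDC route, registering the MongoDB connector is just a POST to the Kafka Connect REST API. An illustrative sketch (Debezium 1.x property names; the host, replica set, and collection names are made up and depend on your setup):

import requests

connector = {
    "name": "forms-mongodb-connector",
    "config": {
        "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
        "mongodb.hosts": "rs0/mongo:27017",   # replica set / host:port
        "mongodb.name": "forms",              # logical name, prefixes the topics
        "collection.include.list": "forms.form_datas",
        "tasks.max": "1",
    },
}
requests.post("http://connect:8083/connectors", json=connector).raise_for_status()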

If you are going to write the code that puts the data into GCS, I suggest considering Apache Parquet rather than NDJSON as the format. Performance and cost will be better, and I find formats with data types easier to work with.
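
If you end up writing this in Beam anyway, switching formats is mostly a matter of swapping the sink. A sketch using Beam's WriteToParquet (the pyarrow schema is guessed from the two fields in your question):

import pyarrow as pa
import apache_beam as beam
from apache_beam.io.parquetio import WriteToParquet

# pyarrow schema guessed from the question; dob could be pa.date32() if parsed upstream.
schema = pa.schema([("email", pa.string()), ("dob", pa.string())])

with beam.Pipeline() as p:
    (p
     | "Rows" >> beam.Create([{"email": "xyx@gmail.com", "dob": "2001-04-05"}])
     | "WriteParquet" >> WriteToParquet(
         file_path_prefix="gs://export-bucket/exports/form",  # made-up path
         schema=schema,
         file_name_suffix=".parquet"))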

answered 2020-11-30T20:23:13.420