0

我正在尝试按照Marton的这篇文章创建谷歌云工作流程

我正在尝试使用云工作流执行一个大型查询过程,并且在执行该过程时,我需要获取表的最后更新时间戳,因为我只想合并目标表中稍后出现的那些记录。我只需要增量记录。

下面是我的云工作流代码。

main:
  steps:
    - getList:
        call: BQ_Query
        args:
          query: >
              SELECT 
              TIMESTAMP_MILLIS(creation_time) AS creation_time,
              FROM `project.dataset.__TABLES__` where table_id='table_name'
        result: items
    - data :
        call: BQ_Results_LoopItems
        args:
          items: ${items.rows}
        result: res

BQ_Results_LoopItems:
  params: [items]
  steps:
    - getupdatetime:
        call: BQ_Task
        args:
            table_id: ${items[0].f[0].v}
        result: result
   

BQ_Task:
  params: [table_id]
  steps:
    - delete:
        call: BQ_Query
        args:
         query: >       
              DECLARE last_update_date TIMESTAMP DEFAULT NULL;
              DECLARE source_dataset_name STRING DEFAULT NULL;
              DECLARE source_table_name STRING DEFAULT NULL;
              DECLARE destination_dataset_name STRING DEFAULT NULL;
              DECLARE destination_table_name STRING DEFAULT NULL;
              DECLARE recordstamp_column_name STRING DEFAULT NULL;
              DECLARE flag_column_name STRING DEFAULT NULL;
              DECLARE is_deleted_column_name STRING DEFAULT NULL;
              DECLARE hash_id_column_name STRING DEFAULT NULL;
              DECLARE pk_colmun_name_list ARRAY<STRING> DEFAULT NULL;
              CALL `project.dataset.procedure_name`("$table_id"", 'dataset', 'table','dataset', 'table', 'column1', 'column2', 'column3', 'column4', ['column5','column6','column7'])
        result: queryResult
    - documentFound:
        return: ${queryResult}



BQ_Query:
    params: [query]
    steps:
      - runBQquery: 
          call: googleapis.bigquery.v2.jobs.query
          args:
              projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
              body:
                  useLegacySql: false
                  query: ${query}
          result: queryResult
      - documentFound:
          return: ${queryResult}

我这样做是为了获取表的最后更新时间戳。我不确定这是否正确,但我收到如下错误

HTTP server responded with error code 400
in step "runBQquery", routine "BQ_Query", line: 55
in step "delete", routine "BQ_Task", line: 31
in step "getupdatetime", routine "BQ_Results_LoopItems", line: 21
in step "data", routine "main", line: 12
{
  "body": {
    "error": {
      "code": 400,
      "errors": [
        {
          "domain": "global",
          "location": "q",
          "locationType": "parameter",
          "message": "Syntax error: Unclosed string literal at [1:600]",
          "reason": "invalidQuery"
        }
      ],
      "message": "Syntax error: Unclosed string literal at [1:600]",
      "status": "INVALID_ARGUMENT"
    }
  },
  "code": 400,
  "headers": {
    "Alt-Svc": "h3=\":443\"; ma=2592000,h3-29=\":443\"; ma=2592000,h3-T051=\":443\"; ma=2592000,h3-Q050=\":443\"; ma=2592000,h3-Q046=\":443\"; ma=2592000,h3-Q043=\":443\"; ma=2592000,quic=\":443\"; ma=2592000; v=\"46,43\"",
    "Cache-Control": "private",
    "Content-Length": "371",
    "Content-Type": "application/json; charset=UTF-8",
    "Date": "Fri, 09 Jul 2021 17:25:46 GMT",
    "Server": "ESF",
    "Vary": "Origin",
    "X-Content-Type-Options": "nosniff",
    "X-Frame-Options": "SAMEORIGIN",
    "X-Xss-Protection": "0"
  },
  "message": "HTTP server responded with error code 400",
  "tags": [
    "HttpError"
  ]
}

我无法解决此错误。为上次更新时间戳调用变量时语法有什么问题吗?请忽略命名约定和变量名。我只是想先把它做好

4

0 回答 0