0

我正在尝试使用 python 将发布/订阅消息加载到 BigQuery。我可以将消息加载到 BigQuery。但是,数据带有不正确的架构。这是我编写的代码示例及其在 BigQuery 中的输出和屏幕截图。你知道如何解决这个问题吗?

from concurrent.futures import TimeoutError
from google.cloud import pubsub_v1
import os
import time
import json
from google.cloud import bigquery
os.environ["GOOGLE_APPLICATION_CREDENTIALS"]="C:/Users/Endo/Desktop/pub-sub-test/eminent-parsec-317508-98e5b51ebde7.json"

# TODO(developer)
project_id = "eminent-parsec-317508"
subscription_id = "my-python-topic-sub"
# Number of seconds the subscriber should listen for messages
timeout = 5.0

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message):
    message_data = json.loads(message.data)
    print(f"Received a JSON-encoded message:\n{message_data}")
    message_data
    t_message_data = tuple(message_data.items())
    print(t_message_data)
    message.ack()

    bigquery_client = bigquery.Client()
    dataset_ref = bigquery_client.dataset('message')
    table_ref = dataset_ref.table('pubsub_message')
    table = bigquery_client.get_table(table_ref)
    errors = bigquery_client.insert_rows(table, t_message_data)


streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")
print(subscription_path)

在此处输入图像描述 在此处输入图像描述

4

0 回答 0