我正在尝试使用 Python 将 Pub/Sub 消息加载到 BigQuery。消息可以成功写入 BigQuery,但是写入的数据与表的架构(schema)不一致。下面是我编写的示例代码,以及它在 BigQuery 中的输出和屏幕截图。你知道如何解决这个问题吗?
from concurrent.futures import TimeoutError
from google.cloud import pubsub_v1
import os
import time
import json
from google.cloud import bigquery
# Path to the service-account key file, exported through the
# GOOGLE_APPLICATION_CREDENTIALS environment variable before any client
# is constructed below.
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = (
    "C:/Users/Endo/Desktop/pub-sub-test/eminent-parsec-317508-98e5b51ebde7.json"
)

# TODO(developer): replace with your own project and subscription IDs.
project_id = "eminent-parsec-317508"
subscription_id = "my-python-topic-sub"

# Number of seconds the subscriber should listen for messages.
timeout = 5.0

subscriber = pubsub_v1.SubscriberClient()
# Subscription resource path built from the two IDs above.
subscription_path = subscriber.subscription_path(
    project_id,
    subscription_id,
)
def callback(message):
    """Decode one Pub/Sub message and stream it into BigQuery as one row.

    The payload is parsed as JSON and inserted into the
    ``message.pubsub_message`` table. The message is acknowledged only
    after BigQuery accepts the row; if BigQuery reports row errors the
    message is nacked so Pub/Sub can redeliver it.

    Args:
        message: The received Pub/Sub message. ``message.data`` must hold a
            JSON object whose keys are the column names of the target table.
    """
    message_data = json.loads(message.data)
    print(f"Received a JSON-encoded message:\n{message_data}")

    bigquery_client = bigquery.Client()
    dataset_ref = bigquery_client.dataset('message')
    table_ref = dataset_ref.table('pubsub_message')
    table = bigquery_client.get_table(table_ref)

    # insert_rows() treats every element of its second argument as ONE row.
    # Passing tuple(message_data.items()) therefore wrote one row per
    # (key, value) pair, with the key landing in the first column and the
    # value in the second. Wrapping the dict in a list sends a single row
    # whose values are matched to the table columns by name.
    errors = bigquery_client.insert_rows(table, [message_data])

    if errors:
        # Do not ack: acknowledging a message whose insert failed would
        # drop it for good.
        print(f"Failed to insert row into BigQuery: {errors}")
        message.nack()
        return

    # Ack only once the row is safely in BigQuery.
    message.ack()
# Subscribe to the subscription; every received message is handed to
# callback(). The returned future represents the running streaming pull.
# NOTE(review): nothing visible here waits on this future or uses `timeout`;
# presumably a `streaming_pull_future.result(timeout=timeout)` call follows
# later in the file. Confirm.
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")
# Print the subscription path once more on its own line.
print(subscription_path)