我正在尝试使用 Python 拉取 Pub/Sub 消息并将其加载到 BigQuery。我可以拉取到消息,但无法将其加载到 BigQuery。下面是我编写的代码示例。您知道如何使用 Python 将这些消息加载到 BigQuery 吗?
import json
import os
import time
from concurrent.futures import TimeoutError

import pandas as pd
from google.cloud import bigquery
from google.cloud import pubsub_v1
# Set GOOGLE_APPLICATION_CREDENTIALS to a local JSON key file before any
# client object is constructed below.
# NOTE(review): the path is hard-coded and machine-specific — consider
# supplying it from outside the script instead.
os.environ["GOOGLE_APPLICATION_CREDENTIALS"]="C:/Users/Endo/Desktop/pub-sub-test/eminent-parsec-317508-98e5b51ebde7.json"
# TODO(developer)
project_id = "eminent-parsec-317508"
subscription_id = "my-python-topic-sub"
# Number of seconds the subscriber should listen for messages
# NOTE(review): `timeout` is not referenced anywhere in the visible code —
# confirm it is used further down the file.
timeout = 5.0
subscriber = pubsub_v1.SubscriberClient()
# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = subscriber.subscription_path(project_id, subscription_id)
def callback(message):
    """Print a received Pub/Sub message and acknowledge it.

    Args:
        message: The received message object; it must provide an ``ack()``
            method and is rendered with its default string formatting.
    """
    print(f"Received {message}.")
    # Acknowledge only after the message has been reported.
    message.ack()
# Register `callback` for the subscription; the object returned by
# `subscribe()` is kept in `streaming_pull_future`.
# NOTE(review): nothing in the visible code waits on or cancels this future,
# and the imported `TimeoutError` and the `timeout` value are unused here —
# confirm that the blocking wait happens elsewhere in the file.
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")
print(subscription_path)
# BigQuery dataset and table identifiers; presumably the arguments intended
# for write_messages_to_bq(), which is not called in the visible code.
dataset_id="message"
table_id="pubsub_message"
def write_messages_to_bq(dataset_id, table_id, subscription_path, max_messages=100):
    """Pull pending Pub/Sub messages and stream them into a BigQuery table.

    Messages are pulled synchronously from ``subscription_path``, each payload
    is decoded as a UTF-8 JSON object, and the resulting rows are inserted
    with the BigQuery streaming API. Messages are acknowledged only after
    BigQuery accepts every row, so a failed insert leaves them available for
    redelivery.

    NOTE(review): assumes every payload is a JSON object whose keys match the
    destination table's column names — confirm against the publisher.
    NOTE(review): messages already acknowledged by another subscriber on the
    same subscription (for example a streaming-pull callback that calls
    ``ack()``) will not be delivered to this pull.

    Args:
        dataset_id: ID of the BigQuery dataset that contains the table.
        table_id: ID of the destination BigQuery table.
        subscription_path: Fully qualified subscription name, in the form
            ``projects/{project_id}/subscriptions/{subscription_id}``.
        max_messages: Upper bound on the number of messages pulled per call.

    Raises:
        ValueError: If a payload is not valid UTF-8 encoded JSON
            (``UnicodeDecodeError`` and ``json.JSONDecodeError`` are both
            subclasses). Nothing is acknowledged in that case.
    """
    # A dedicated client for the synchronous pull, closed when we are done.
    pull_client = pubsub_v1.SubscriberClient()
    try:
        response = pull_client.pull(
            subscription=subscription_path,
            max_messages=max_messages,
        )
        received = list(response.received_messages)
        if not received:
            print('No messages available on {}'.format(subscription_path))
            return

        # Decode before touching BigQuery so a malformed payload fails fast
        # and leaves every pulled message unacknowledged.
        rows = [json.loads(item.message.data.decode("utf-8")) for item in received]

        client = bigquery.Client()
        table_ref = bigquery.DatasetReference(client.project, dataset_id).table(table_id)
        table = client.get_table(table_ref)

        # insert_rows_json returns one error mapping per rejected row and an
        # empty list when every row was accepted.
        errors = client.insert_rows_json(table, rows)
        if errors:
            print('Errors:')
            for error in errors:
                print(error)
            return

        # Acknowledge only once the rows are safely in BigQuery.
        pull_client.acknowledge(
            subscription=subscription_path,
            ack_ids=[item.ack_id for item in received],
        )
        print('Loaded {} row(s) into {}:{}'.format(len(rows), dataset_id, table_id))
    finally:
        pull_client.close()