
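I am trying to pull Pub/Sub messages with Python and load them into BigQuery. I can pull the messages, but I cannot load them into BigQuery. Here is the code I wrote. Does anyone know how to load these messages into BigQuery using Python?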

from concurrent.futures import TimeoutError
from google.cloud import pubsub_v1
import os
import time
import json
import pandas as pd
os.environ["GOOGLE_APPLICATION_CREDENTIALS"]="C:/Users/Endo/Desktop/pub-sub-test/eminent-parsec-317508-98e5b51ebde7.json"

# TODO(developer)
project_id = "eminent-parsec-317508"
subscription_id = "my-python-topic-sub"
# Number of seconds the subscriber should listen for messages
timeout = 5.0

subscriber = pubsub_v1.SubscriberClient()
# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message):
    print(f"Received {message}.")
    message.ack()

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")
print(subscription_path)

dataset_id="message"
table_id="pubsub_message"

def write_messages_to_bq(dataset_id, table_id, subscription_path):
    client = bigquery.Client()
    dataset_ref = client.dataset(dataset_id)
    table_ref = dataset_ref.table(table_id)
    table = client.get_table(table_ref)

    errors = client.insert_rows(table, subscription_path)
    if not errors:
        print('Loaded {} row(s) into {}:{}'.format(len(subscription_path), dataset_id, table_id))
    else:
        print('Errors:')
        for error in errors:
            print(error)

1 Answer


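Your code cannot work. You cannot hand a Pub/Sub subscription path to the BigQuery API and expect it to load data: `insert_rows` expects the rows themselves, not the name of a subscription. You need to approach this differently.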
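To make the mismatch concrete, here is a minimal sketch of turning one message payload into the kind of value `insert_rows` accepts (a list of row dicts). It assumes the payload is a UTF-8 encoded JSON object whose keys match the table's column names; `payload_to_rows` and the sample values are hypothetical names used only for illustration.

```python
import json

def payload_to_rows(data: bytes) -> list:
    """Decode one Pub/Sub payload into a list holding a single row dict."""
    row = json.loads(data.decode("utf-8"))
    if not isinstance(row, dict):
        # A row must map column names to values
        raise ValueError("expected a JSON object, got " + type(row).__name__)
    return [row]

# A subscription path is only a string that names the subscription; it holds no data.
subscription_path = "projects/my-project/subscriptions/my-sub"  # hypothetical

# What the insert call needs instead is the decoded content of each message.
rows = payload_to_rows(b'{"id": 1, "text": "hello"}')  # hypothetical payload
print(rows)
```

If your messages are not JSON, the decoding step has to change accordingly; the point is only that rows are built from `message.data`, not from the subscription.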

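You need to write the messages to BigQuery one at a time (since you are using the streaming API, that is not a problem). To do that, write each message to BigQuery inside your callback, that is, the function that handles a message when it arrives, and acknowledge the message only if the write succeeded.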
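The acknowledge-only-on-success rule described above can be sketched in isolation, without any Google Cloud dependency. In this sketch `make_callback`, `insert_row`, and `FakeMessage` are hypothetical names: `insert_row` stands in for whatever function writes to BigQuery and is assumed to return a list of errors (empty on success), and `FakeMessage` mimics only the `data`, `ack()`, and `nack()` members of a real Pub/Sub message.

```python
def make_callback(insert_row):
    """Build a Pub/Sub style callback around a row-writing function."""
    def callback(message):
        try:
            errors = insert_row(message.data)
        except Exception as exc:
            # Treat an exception the same way as a reported insert error
            errors = [str(exc)]
        if not errors:
            message.ack()   # the row is stored, so the message can be dropped
        else:
            message.nack()  # ask Pub/Sub to deliver the message again
    return callback

class FakeMessage:
    """Stand-in for a received message, for local experiments only."""
    def __init__(self, data):
        self.data = data
        self.state = "pending"
    def ack(self):
        self.state = "acked"
    def nack(self):
        self.state = "nacked"

ok_message = FakeMessage(b'{"id": 1}')
make_callback(lambda data: [])(ok_message)           # simulated successful write

bad_message = FakeMessage(b'{"id": 2}')
make_callback(lambda data: ["boom"])(bad_message)    # simulated failed write

print(ok_message.state, bad_message.state)
```

Passing the writer in as a parameter keeps the ack logic testable without a real subscription or table.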

Here is your code refactored (untested, just to show the changes to make):

from concurrent.futures import TimeoutError
from google.cloud import bigquery, pubsub_v1
import json
import os

os.environ["GOOGLE_APPLICATION_CREDENTIALS"]="C:/Users/Endo/Desktop/pub-sub-test/eminent-parsec-317508-98e5b51ebde7.json"

# TODO(developer)
project_id = "eminent-parsec-317508"
subscription_id = "my-python-topic-sub"
# Number of seconds the subscriber should listen for messages
timeout = 5.0

dataset_id = "message"
table_id = "pubsub_message"

# Create the BigQuery client and fetch the table once, before any message arrives
client = bigquery.Client()
table = client.get_table(f"{project_id}.{dataset_id}.{table_id}")

# The callback must be defined before it is passed to subscribe()
def callback(message):
    print(f"Received {message}.")
    # Assumption: the payload is a JSON object whose keys match the table columns
    row = json.loads(message.data.decode("utf-8"))
    errors = client.insert_rows(table, [row])
    if not errors:
        print('Loaded 1 row into {}:{}'.format(dataset_id, table_id))
        # Acknowledge only after the row has been written
        message.ack()
    else:
        print('Errors:')
        for error in errors:
            print(error)
        # Not written, so ask Pub/Sub to redeliver the message
        message.nack()

subscriber = pubsub_v1.SubscriberClient()
# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = subscriber.subscription_path(project_id, subscription_id)

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")

# Block until the timeout so the callback has a chance to run
with subscriber:
    try:
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        streaming_pull_future.cancel()  # stop pulling
        streaming_pull_future.result()  # wait for the shutdown to finish

Answered 2021-08-02T13:08:34.797