0


我正在关注 PubSub lite 的以下文档,并尝试接收我使用以下代码发布的消息。回调函数中出现错误。
https://cloud.google.com/pubsub/lite/docs/subscribing
发布消息

#code for publishing message which runs fine
import json
from google.cloud.pubsublite.cloudpubsub import PublisherClient
from google.cloud.pubsublite.types import (
    CloudRegion,
    CloudZone,
    MessageMetadata,
    TopicPath,
)

# TODO(developer):
project_number = 97466088
cloud_region = "us-west1"
zone_id = "b"
topic_id = "mytestopic"
# num_messages = 100

location = CloudZone(CloudRegion(cloud_region), zone_id)
topic_path = TopicPath(project_number, location, topic_id)

# PublisherClient() must be used in a `with` block or have __enter__() called before use.
with PublisherClient() as publisher_client:
    f = open('fivekey_10000.json')
    data = json.load(f)
    #file = open("data_new_file.txt", "r")
    #file1 = file.read().split("\n")
    for message in data.items():
        data = message[0].encode("utf-8")
        ordering_key = message[1]
        # Messages of the same ordering key will always get published to the same partition.
        # When ordering_key is unset, messsages can get published ot different partitions if
        # more than one partition exists for the topic.
        api_future = publisher_client.publish(
            topic_path, data=data, ordering_key=ordering_key)
        # result() blocks. To resolve api futures asynchronously, use add_done_callback().
        message_id = api_future.result()
        message_metadata = MessageMetadata.decode(message_id)
        print(
            f"Published {data} to partition {message_metadata.partition.value} and offset {message_metadata.cursor.offset}."
        )

print(
    f"Published messages with ordering keys to {topic_path}."
)
4

1 回答 1

0

我尝试按照文档使用排序键将消息发布到主题。

要从订阅接收消息,您需要从 Lite 订阅请求消息。这可以通过 gcloud 命令行或任何客户端库来完成。请注意,Pub/Sub lite 主题和订阅是区域资源,即。他们需要在同一个区域和项目中。

接收消息时,回调函数将Pubsub Message其作为唯一参数,并分别确认显示消息数据和元数据的消息。

def callback(message: PubsubMessage):
    message_data = message.data.decode("utf-8")
    metadata = MessageMetadata.decode(message.message_id)
    print(
        f"Received {message_data} of ordering key {message.ordering_key} with id {metadata}."
    )
    message.ack()

如果您仍然遇到问题,请提供有关错误的更多信息以及回溯。

于 2022-02-15T08:53:10.710 回答