我正在关注 PubSub lite 的以下文档,并尝试接收我使用以下代码发布的消息。回调函数中出现错误。
https://cloud.google.com/pubsub/lite/docs/subscribing
发布消息
#code for publishing message which runs fine
import json
from google.cloud.pubsublite.cloudpubsub import PublisherClient
from google.cloud.pubsublite.types import (
CloudRegion,
CloudZone,
MessageMetadata,
TopicPath,
)
# TODO(developer):
project_number = 97466088
cloud_region = "us-west1"
zone_id = "b"
topic_id = "mytestopic"
# num_messages = 100
location = CloudZone(CloudRegion(cloud_region), zone_id)
topic_path = TopicPath(project_number, location, topic_id)
# PublisherClient() must be used in a `with` block or have __enter__() called before use.
with PublisherClient() as publisher_client:
f = open('fivekey_10000.json')
data = json.load(f)
#file = open("data_new_file.txt", "r")
#file1 = file.read().split("\n")
for message in data.items():
data = message[0].encode("utf-8")
ordering_key = message[1]
# Messages of the same ordering key will always get published to the same partition.
# When ordering_key is unset, messsages can get published ot different partitions if
# more than one partition exists for the topic.
api_future = publisher_client.publish(
topic_path, data=data, ordering_key=ordering_key)
# result() blocks. To resolve api futures asynchronously, use add_done_callback().
message_id = api_future.result()
message_metadata = MessageMetadata.decode(message_id)
print(
f"Published {data} to partition {message_metadata.partition.value} and offset {message_metadata.cursor.offset}."
)
print(
f"Published messages with ordering keys to {topic_path}."
)