我在 python3.6 中使用 Google Pub/Sub 客户端 v2.2.0 作为订阅者。
我希望我的应用程序在确认它已经收到的所有消息后正常关闭。
来自 Google 指南的订阅者的示例代码,稍作更改将显示我的问题:
from concurrent.futures import TimeoutError
from google.cloud import pubsub_v1
from time import sleep
# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# Number of seconds the subscriber should listen for messages
# timeout = 5.0
subscriber = pubsub_v1.SubscriberClient()
# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = subscriber.subscription_path(project_id, subscription_id)
def callback(message):
print(f"Received {message}.")
sleep(30)
message.ack()
print("Acked")
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")
# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
sleep(10)
streaming_pull_future.cancel()
streaming_pull_future.result()
来自https://cloud.google.com/pubsub/docs/pull
我希望此代码停止提取消息并完成正在运行的消息然后退出。
实际上,此代码停止提取消息并完成执行正在运行的消息,但它不确认消息。.ack() 发生但服务器没有收到 ack,所以接下来运行相同的消息再次返回。
1、为什么服务端收不到ack?
2. 如何优雅的关闭订阅者?
3. .cancel() 的预期行为是什么?