我找不到任何使用 Python 在线订阅和使用来自 Azure 事件中心的消息的文档。我知道这在 C、C# 和 Java 中是可能的。我只需要知道是否可以使用 Python。
Azure python SDK 目前似乎只支持发送消息,但不打开异步连接以不断接收来自事件中心的消息。 http://azure-sdk-for-python.readthedocs.org/en/latest/servicebus.html#event-hub
我找不到任何使用 Python 在线订阅和使用来自 Azure 事件中心的消息的文档。我知道这在 C、C# 和 Java 中是可能的。我只需要知道是否可以使用 Python。
Azure python SDK 目前似乎只支持发送消息,但不打开异步连接以不断接收来自事件中心的消息。 http://azure-sdk-for-python.readthedocs.org/en/latest/servicebus.html#event-hub
我发现从 python 连接到 EventHubs 的唯一方法是使用 python-qpid-proton library/pypi 模块。
这是因为 eventthubs 使用 amqp 1.0 + TLS,所以您会发现的大多数其他库都不起作用(它们实现 <= amqp 0.9)。
我仍然希望找到一个更容易在 Windows 上与 python 一起使用的解决方案,但这应该可以在 OS X 和 Linux 机器上正常工作。
在不了解您的仪表板的详细要求的情况下,我们只能给您一些高水平的建议。简而言之,您可以将仪表板视为一个消费者组(如果有多个分区,则可以将其视为多个消费者组,因为每个组只能连接到一个分区)。您只需要使用 AMQP 连接到事件中心。在 Python 中,您可以使用 AMPQ 库,例如https://pypi.python.org/pypi/amqp。不用担心,在您的仪表板中读取事件后,这些事件不会被删除,因此它们将继续提供给其他消费者组。AMPQ 是一个标准。因此,您只需向库提供事件中心的地址、身份验证信息,然后您就可以连接到事件中心。
对于您的项目来说可能为时已晚,但 azure-eventhub Python SDK 现在可用,它提供了向/从事件中心服务发送/接收事件的功能。
我将在此处发布信息,以帮助用户稍后查找 SDK。
azure-eventhub v5 在 pypi 上可用:https ://pypi.org/project/azure-eventhub 。
还有一个从 v1 到 v5 的迁移指南,供正在使用 v1 sdk 的人将程序顺利迁移到 v5。
要使用来自事件中心的消息,请遵循示例代码:
#!/usr/bin/env python
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
"""
An example to show receiving events from an Event Hub.
"""
import os
from azure.eventhub import EventHubConsumerClient
CONNECTION_STR = os.environ["EVENT_HUB_CONN_STR"]
EVENTHUB_NAME = os.environ['EVENT_HUB_NAME']
def on_event(partition_context, event):
# Put your code here.
# If the operation is i/o intensive, multi-thread will have better performance.
print("Received event from partition: {}.".format(partition_context.partition_id))
def on_partition_initialize(partition_context):
# Put your code here.
print("Partition: {} has been initialized.".format(partition_context.partition_id))
def on_partition_close(partition_context, reason):
# Put your code here.
print("Partition: {} has been closed, reason for closing: {}.".format(
partition_context.partition_id,
reason
))
def on_error(partition_context, error):
# Put your code here. partition_context can be None in the on_error callback.
if partition_context:
print("An exception: {} occurred during receiving from Partition: {}.".format(
partition_context.partition_id,
error
))
else:
print("An exception: {} occurred during the load balance process.".format(error))
if __name__ == '__main__':
consumer_client = EventHubConsumerClient.from_connection_string(
conn_str=CONNECTION_STR,
consumer_group='$Default',
eventhub_name=EVENTHUB_NAME,
)
try:
with consumer_client:
consumer_client.receive(
on_event=on_event,
on_partition_initialize=on_partition_initialize,
on_partition_close=on_partition_close,
on_error=on_error,
starting_position="-1", # "-1" is from the beginning of the partition.
)
except KeyboardInterrupt:
print('Stopped receiving.')