4

我找不到任何使用 Python 在线订阅和使用来自 Azure 事件中心的消息的文档。我知道这在 C、C# 和 Java 中是可能的。我只需要知道是否可以使用 Python。

Azure python SDK 目前似乎只支持发送消息,但不打开异步连接以不断接收来自事件中心的消息。 http://azure-sdk-for-python.readthedocs.org/en/latest/servicebus.html#event-hub

4

3 回答 3

3

我发现从 python 连接到 EventHubs 的唯一方法是使用 python-qpid-proton library/pypi 模块。

这是因为 eventthubs 使用 amqp 1.0 + TLS,所以您会发现的大多数其他库都不起作用(它们实现 <= amqp 0.9)。

我仍然希望找到一个更容易在 Windows 上与 python 一起使用的解决方案,但这应该可以在 OS X 和 Linux 机器上正常工作。

于 2015-08-22T03:28:13.727 回答
2

在不了解您的仪表板的详细要求的情况下,我们只能给您一些高水平的建议。简而言之,您可以将仪表板视为一个消费者组(如果有多个分区,则可以将其视为多个消费者组,因为每个组只能连接到一个分区)。您只需要使用 AMQP 连接到事件中心。在 Python 中,您可以使用 AMPQ 库,例如https://pypi.python.org/pypi/amqp。不用担心,在您的仪表板中读取事件后,这些事件不会被删除,因此它们将继续提供给其他消费者组。AMPQ 是一个标准。因此,您只需向库提供事件中心的地址、身份验证信息,然后您就可以连接到事件中心。

于 2015-06-11T14:21:49.110 回答
2

对于您的项目来说可能为时已晚,但 azure-eventhub Python SDK 现在可用,它提供了向/从事件中心服务发送/接收事件的功能。

我将在此处发布信息,以帮助用户稍后查找 SDK。

azure-eventhub v5 在 pypi 上可用:https ://pypi.org/project/azure-eventhub 。

还有一个从 v1 到 v5 的迁移指南,供正在使用 v1 sdk 的人将程序顺利迁移到 v5。

要使用来自事件中心的消息,请遵循示例代码

#!/usr/bin/env python

# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

"""
An example to show receiving events from an Event Hub.
"""
import os
from azure.eventhub import EventHubConsumerClient

CONNECTION_STR = os.environ["EVENT_HUB_CONN_STR"]
EVENTHUB_NAME = os.environ['EVENT_HUB_NAME']


def on_event(partition_context, event):
    # Put your code here.
    # If the operation is i/o intensive, multi-thread will have better performance.
    print("Received event from partition: {}.".format(partition_context.partition_id))


def on_partition_initialize(partition_context):
    # Put your code here.
    print("Partition: {} has been initialized.".format(partition_context.partition_id))


def on_partition_close(partition_context, reason):
    # Put your code here.
    print("Partition: {} has been closed, reason for closing: {}.".format(
        partition_context.partition_id,
        reason
    ))


def on_error(partition_context, error):
    # Put your code here. partition_context can be None in the on_error callback.
    if partition_context:
        print("An exception: {} occurred during receiving from Partition: {}.".format(
            partition_context.partition_id,
            error
        ))
    else:
        print("An exception: {} occurred during the load balance process.".format(error))


if __name__ == '__main__':
    consumer_client = EventHubConsumerClient.from_connection_string(
        conn_str=CONNECTION_STR,
        consumer_group='$Default',
        eventhub_name=EVENTHUB_NAME,
    )

    try:
        with consumer_client:
            consumer_client.receive(
                on_event=on_event,
                on_partition_initialize=on_partition_initialize,
                on_partition_close=on_partition_close,
                on_error=on_error,
                starting_position="-1",  # "-1" is from the beginning of the partition.
            )
    except KeyboardInterrupt:
        print('Stopped receiving.')
于 2020-12-10T00:43:44.390 回答