1

我有一个天蓝色函数 ServiceBus,我想在其上自定义其属性。

我有 2 个主题,我发送关于主题 1 的消息(我在其上进行一些处理)并将其输出发送到第二个主题。由于此消息来自不同资源的事实,我希望应用程序属性能够明确此消息的来源。

所以我的代码如下:

def main(message: func.ServiceBusMessage):
    logging.info(message)
    print(message)
    message_content_type = message.content_type
    message_body = message.get_body().decode("utf-8")
    .....
    
    .....
    message1 = str(message_body)
    def send_output(sender):
        message_out = ServiceBusMessage(
        output_json,
        content_type="ModuleCommentAnalyzed", #setting the content type so that the service bus can route it.
        application_properties={b'source':message.application_properties[b'source']} #setting the tenant code
        )
        sender.send_messages(message_out)

但是当我发送消息时,该函数失败并抛出以下错误。

Result: Failure Exception: AttributeError: 'ServiceBusMessage' object has no attribute 'application_properties' Stack: File "/azure-functions-host/workers/python/3.9/LINUX/X64/azure_functions_worker/dispatcher.py", line 402, in _handle__invocation_request call_result = await self._loop.run_in_executor( File "/usr/local/lib/python3.9/concurrent/futures/thread.py", line 52, in run result = self.fn(*self.args, **self.kwargs) File "/azure-functions-host/workers/python/3.9/LINUX/X64/azure_functions_worker/dispatcher.py", line 606, in _run_sync_func return ExtensionManager.get_sync_invocation_wrapper(context, File "/azure-functions-host/workers/python/3.9/LINUX/X64/azure_functions_worker/extension.py", line 215, in _raw_invocation_wrapper result = function(**args) File "/home/site/wwwroot/function/testServiceBus.py", line 118, in main send_output(sender) File "/home/site/wwwroot/function/testServiceBus.py", line 108, in send_output application_properties={b'source':message.application_properties[b'source']} #setting the tenant code

我试图message用变量替换,message1但在这种情况下我得到一个错误str does not contain an attribute...

任何帮助将不胜感激,以了解我做错了什么并更好地了解它是如何工作的。

非常感谢您,如果您需要更多信息,请告诉我

更新。

根据 Microsoft 文档,从servicebus 5.x它们替换 user_properties withapplicationProperties` 我确实尝试了它们,但没有任何效果。

这是为了让OP有点清楚。

这是我更新的代码

import logging
import json

import azure.functions as func
from azure.servicebus import ServiceBusClient, ServiceBusMessage


def main(message: func.ServiceBusMessage):
    # Log the Service Bus Message as plaintext

    message_content_type = message.content_type
    message_body = message.get_body().decode("utf-8")
    result = json.dumps({
        'message_id': message.message_id,
        'body': message.get_body().decode('utf-8'),
        'content_type': message.content_type,
        'user_properties': message.user_properties,
        'metadata' : message.metadata
    })
    logging.info(result)

    # logging.info("Python ServiceBus topic trigger processed message.")
    # logging.info("Message Content Type: " + message_content_type)
    # logging.info("Message Body: " + message_body)

    CONN_STR = "XXX"

    topic_b = "topic_b"
    servicebus_client = ServiceBusClient.from_connection_string(conn_str=CONN_STR)
    def send_output(sender):
        message_out = ServiceBusMessage(
        result,
        content_type="application/json", #setting the content type so that the service bus can route it.
        user_properties={b'tenant': result.user_properties[b'MessageId']} #setting the tenant code
        )
        sender.send_messages(message_out)

    servicebus_client_out = servicebus_client.from_connection_string(conn_str=CONN_STR, logging_enable=True)
    
    with servicebus_client:
        sender = servicebus_client_out.get_topic_sender(topic_name=topic_b)
        with sender:
            send_output(sender)

出于测试目的,我试图在 with 上添加一个自定义属性以返回,message_id因为它是传入消息的一部分。但我收到此错误

Result: Failure Exception: AttributeError: 'str' object has no attribute 'user_properties' Stack

所以我尝试了sender

消息是

Result: Failure Exception: AttributeError: 'ServiceBus' object has no attribute 'user_properties' Stack

这是我在 Microsoft 文档中的任何地方都找不到的东西。

因此,为了确保一切正常,我尝试使用以下代码

application_properties={'tenant': 'DEMO'}

这工作得很好。

如果有人对此问题有任何提示,我将不胜感激

4

2 回答 2

1

我不得不同意文件矛盾的。

因此,让我们忽略文档并直接访问源代码:https ://github.com/Azure/azure-sdk-for-python/blob/main/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message .py#L63

这告诉我们正确的关键字参数是application_properties.

提醒:这个答案是指出分支中的当前(在撰写本文时)代码。main这似乎适用于azure-servicebuspackage的 7+ 版本。

于 2021-10-25T09:49:54.050 回答
1

@gvee 是的,文档完全矛盾,图书馆也不完全清楚。我设法解决了我的问题,我希望这对将来的任何人都有帮助。

图书馆服务总线 7+ 已解决此问题

在我的具体情况下,我需要函数 2 执行 2 个重要步骤。它们如下

  • 从主题 A.. 的传入消息中获取所有自定义属性并将特定属性保存在变量中。
  • 在将消息从主题 A 发送到主题 B 时。将保存的属性附加到消息before以将其发送出去。

正如我的 OP 中提到的,现在对我来说主要问题是库无法识别我声明的属性。这主要是由于库有 2 个属性,user_properties ANDapplication_properties。

所以我必须在我的代码中做如下。

在服务总线上,message我必须使用它user_properties来检索特定属性。

def main(message: func.ServiceBusMessage):
    logging.info(message)    
    test_user = message.user_properties['property']

这样做我能够提取我正在寻找的字符串。到目前为止,一切都很好。

然后我想将其test_user作为属性附加到消息中,同时将其发送到主题 B.. 所以我尝试再次使用,user_properties但由于无法识别该属性而失败。

这里有奇怪的想法。我用application_properties它工作,如下

    def send_output(sender):
        message_out = ServiceBusMessage(
        output_json,
        content_type="ModuleCommentAnalyzed", #setting the content type so that the service bus can route it.
        application_properties={'property': test_user}
        )
        sender.send_messages(message_out)

所以我的结论是,要检索属性,我应该使用user_properties并设置我必须使用的自定义属性application_properties

我无法详细解释为什么它会以这种方式工作,因为文档根本不清楚(至少对我而言)并且源代码与库功能没有正确对齐。

但我希望这将有助于未来的人。

于 2021-10-25T10:14:05.887 回答