我有一个天蓝色函数 ServiceBus,我想在其上自定义其属性。
我有 2 个主题,我发送关于主题 1 的消息(我在其上进行一些处理)并将其输出发送到第二个主题。由于此消息来自不同资源的事实,我希望应用程序属性能够明确此消息的来源。
所以我的代码如下:
def main(message: func.ServiceBusMessage):
logging.info(message)
print(message)
message_content_type = message.content_type
message_body = message.get_body().decode("utf-8")
.....
.....
message1 = str(message_body)
def send_output(sender):
message_out = ServiceBusMessage(
output_json,
content_type="ModuleCommentAnalyzed", #setting the content type so that the service bus can route it.
application_properties={b'source':message.application_properties[b'source']} #setting the tenant code
)
sender.send_messages(message_out)
但是当我发送消息时,该函数失败并抛出以下错误。
Result: Failure Exception: AttributeError: 'ServiceBusMessage' object has no attribute 'application_properties' Stack: File "/azure-functions-host/workers/python/3.9/LINUX/X64/azure_functions_worker/dispatcher.py", line 402, in _handle__invocation_request call_result = await self._loop.run_in_executor( File "/usr/local/lib/python3.9/concurrent/futures/thread.py", line 52, in run result = self.fn(*self.args, **self.kwargs) File "/azure-functions-host/workers/python/3.9/LINUX/X64/azure_functions_worker/dispatcher.py", line 606, in _run_sync_func return ExtensionManager.get_sync_invocation_wrapper(context, File "/azure-functions-host/workers/python/3.9/LINUX/X64/azure_functions_worker/extension.py", line 215, in _raw_invocation_wrapper result = function(**args) File "/home/site/wwwroot/function/testServiceBus.py", line 118, in main send_output(sender) File "/home/site/wwwroot/function/testServiceBus.py", line 108, in send_output application_properties={b'source':message.application_properties[b'source']} #setting the tenant code
我试图message
用变量替换,message1
但在这种情况下我得到一个错误str does not contain an attribute...
任何帮助将不胜感激,以了解我做错了什么并更好地了解它是如何工作的。
非常感谢您,如果您需要更多信息,请告诉我
更新。
根据 Microsoft 文档,从servicebus 5.x
它们替换 user_properties with
applicationProperties` 我确实尝试了它们,但没有任何效果。
这是为了让OP有点清楚。
这是我更新的代码
import logging
import json
import azure.functions as func
from azure.servicebus import ServiceBusClient, ServiceBusMessage
def main(message: func.ServiceBusMessage):
# Log the Service Bus Message as plaintext
message_content_type = message.content_type
message_body = message.get_body().decode("utf-8")
result = json.dumps({
'message_id': message.message_id,
'body': message.get_body().decode('utf-8'),
'content_type': message.content_type,
'user_properties': message.user_properties,
'metadata' : message.metadata
})
logging.info(result)
# logging.info("Python ServiceBus topic trigger processed message.")
# logging.info("Message Content Type: " + message_content_type)
# logging.info("Message Body: " + message_body)
CONN_STR = "XXX"
topic_b = "topic_b"
servicebus_client = ServiceBusClient.from_connection_string(conn_str=CONN_STR)
def send_output(sender):
message_out = ServiceBusMessage(
result,
content_type="application/json", #setting the content type so that the service bus can route it.
user_properties={b'tenant': result.user_properties[b'MessageId']} #setting the tenant code
)
sender.send_messages(message_out)
servicebus_client_out = servicebus_client.from_connection_string(conn_str=CONN_STR, logging_enable=True)
with servicebus_client:
sender = servicebus_client_out.get_topic_sender(topic_name=topic_b)
with sender:
send_output(sender)
出于测试目的,我试图在 with 上添加一个自定义属性以返回,message_id
因为它是传入消息的一部分。但我收到此错误
Result: Failure Exception: AttributeError: 'str' object has no attribute 'user_properties' Stack
所以我尝试了sender
消息是
Result: Failure Exception: AttributeError: 'ServiceBus' object has no attribute 'user_properties' Stack
这是我在 Microsoft 文档中的任何地方都找不到的东西。
因此,为了确保一切正常,我尝试使用以下代码
application_properties={'tenant': 'DEMO'}
这工作得很好。
如果有人对此问题有任何提示,我将不胜感激