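I am hoping someone can help me understand what I am doing wrong with this Azure Function.
The setup I have in mind is as follows.
I have two topics. TOPIC_A receives a message, the message is processed, and the result is sent to TOPIC_B, where I just want it to stay until I decide what to do with it.
Here is the actual Azure Function app code.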
import logging
import json
import boto3
from azure.servicebus import ServiceBusClient, ServiceBusMessage
from azure.keyvault.secrets import SecretClient
from azure.identity import DefaultAzureCredential
import azure.functions as func
def main(message: func.ServiceBusMessage):
    # Log the Service Bus Message as plaintext
    message_content_type = message.content_type
    message_body = message.get_body().decode("utf-8")
    logging.info("Python ServiceBus topic trigger processed message.")
    logging.info("Message Content Type: " + message_content_type)
    logging.info("Message Body: " + message_body)

    # KeyVault Configuration
    KeyVault_Url = f'keyvailt_url'
    Keyvault_Name = 'keyvalt_name'
    credential = DefaultAzureCredential()
    client_keyvault = SecretClient(vault_url=KeyVault_Url, credential=credential)

    # Service Bus Connection string
    CONNECTION_STR = client_keyvault.get_secret("Service-CONN").value
    # For receiving the feedback from campaigns
    TOPIC_NAME_A = "topicAname"
    SUBSCRIPTION_NAME = "subscriptionName"
    # For sending feedback and results of sentiment analysis and language detection
    TOPIC_NAME_B = "topicBname"

    comprehend = boto3.client(
        service_name='comprehend',
        region_name='eu-west-1',
        aws_access_key_id=client_keyvault.get_secret("ID").value,
        aws_secret_access_key=client_keyvault.get_secret("SECRET").value
    )

    servicebus_client = ServiceBusClient.from_connection_string(conn_str=CONNECTION_STR)
    with servicebus_client:
        receiver = servicebus_client.get_subscription_receiver(
            topic_name=TOPIC_NAME_A,
            subscription_name=SUBSCRIPTION_NAME
        )
        with receiver:
            received_msgs = receiver.receive_messages(max_message_count=10, max_wait_time=5)
            for msg in received_msgs:
                message = str(msg)
                res = json.loads(message)
                text = res['Text']
                result_json = json.dumps(comprehend.detect_sentiment(Text=text, LanguageCode='en'), sort_keys=True, indent=4)
                result = json.loads(result_json)  # converting json to python dictionary

                # extracting the sentiment value
                sentiment = result["Sentiment"]

                # extracting the sentiment score
                if sentiment == "POSITIVE":
                    value = round(result["SentimentScore"]["Positive"] * 100, 2)
                elif sentiment == "NEGATIVE":
                    value = round(result["SentimentScore"]["Negative"] * 100, 2)
                elif sentiment == "NEUTRAL":
                    value = round(result["SentimentScore"]["Neutral"] * 100, 2)
                elif sentiment == "MIXED":
                    value = round(result["SentimentScore"]["Mixed"] * 100, 2)

                lang_result = json.dumps(comprehend.detect_dominant_language(Text=text), sort_keys=True, indent=4)

                # converting languages detection results into a dictionary
                lang_result_json = json.loads(lang_result)

                # Formatting the score from the results
                for line in lang_result_json["Languages"]:
                    line['Score'] = round(line['Score'] * 100, 2)

                # storing the output of sentiment analysis, language detection and ids in a dictionary and converting it to JSON
                output = {
                    'ID': res['ID'],
                    'userId': res['userId'],
                    'defID': res['defID']
                }
                output_json = json.dumps(output, ensure_ascii=False)

                # -------------------------------------------------------------------------------------------------------
                # Sending the processed output (output_json) in json format to another service bus
                def send_output(sender):
                    message = ServiceBusMessage(
                        output_json,
                        content_type="Analysis",  # setting the content type so that the service bus can route it.
                        application_properties={b'CODE': msg.application_properties[b'CODE']}  # setting the tenant code
                    )
                    sender.send_messages(message)

                servicebus_client = ServiceBusClient.from_connection_string(conn_str=CONNECTION_STR, logging_enable=True)
                with servicebus_client:
                    sender = servicebus_client.get_topic_sender(topic_name=TOPIC_NAME_B)
                    with sender:
                        send_output(sender)
                receiver.complete_message(msg)
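For reference, the sentiment-score step in the code above can be isolated and run without any Azure or AWS dependency. This is only a minimal sketch: `dominant_sentiment_score` is a hypothetical helper name, and the sample dictionary is made-up data that assumes the same response shape the code above reads (`Sentiment` plus a `SentimentScore` dictionary keyed by `Positive`, `Negative`, `Neutral`, `Mixed`).

```python
def dominant_sentiment_score(result):
    """Return (label, percentage) for the dominant sentiment.

    Assumes `result` has the shape read by the function code above:
    an upper-case label under "Sentiment" and per-label scores under
    "SentimentScore" with capitalized keys.
    """
    label = result["Sentiment"]                # e.g. "POSITIVE"
    score_key = label.capitalize()             # e.g. "Positive"
    score = result["SentimentScore"][score_key]
    return label, round(score * 100, 2)


# Made-up sample data, for illustration only.
sample_result = {
    "Sentiment": "POSITIVE",
    "SentimentScore": {
        "Positive": 0.75,
        "Negative": 0.05,
        "Neutral": 0.15,
        "Mixed": 0.05,
    },
}

if __name__ == "__main__":
    print(dominant_sentiment_score(sample_result))
```

Keeping this step as a plain function makes it possible to test the processing logic separately from the Service Bus and Key Vault calls.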
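Before explaining the problem I have with the function, let me explain what happens when I run this code locally.
If I send a message to TOPIC_A and open that topic in the Azure portal, I can see the message under the RECEIVED tab. As soon as I run my Python code, the message is consumed from TOPIC_A and sent to TOPIC_B, where it stays under RECEIVED. This is exactly the output I expect.
Now I have moved the whole logic into an Azure Function. When I send a message to TOPIC_A, I immediately see it end up under DEADLETTERS, and my Azure Function reports this error.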
Result: Failure Exception: ImportError: cannot import name 'c_uamqp' from partially initialized module 'uamqp' (most likely due to a circular import) (/home/site/wwwroot/.python_packages/lib/site-packages/uamqp/__init__.py). Troubleshooting Guide: https://aka.ms/functions-modulenotfound Stack
And if I peek at the dead-lettered message (TOPIC_A), I see this:
deadletterreason "MaxDeliveryCountExceeded"
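As far as I understand, the message ends up in the dead-letter queue that quickly because, while my code is looking for the message under RECEIVED, delivery is attempted 10 times and then fails.
What I struggle to understand is why this only happens when I use the Azure Function.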
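To illustrate how I understand the `MaxDeliveryCountExceeded` reason, here is a minimal simulation in plain Python. It does not call Service Bus at all. It assumes a maximum delivery count of 10 (which I believe is the default for a subscription) and assumes every failed invocation counts as one delivery attempt; `simulate_deliveries` and `failing_handler` are hypothetical names used only for this sketch.

```python
MAX_DELIVERY_COUNT = 10  # assumption: the subscription's max delivery count


def simulate_deliveries(handler, max_delivery_count=MAX_DELIVERY_COUNT):
    """Deliver one message repeatedly until the handler succeeds or the limit is hit.

    Returns a tuple (outcome, delivery_count).
    """
    delivery_count = 0
    while delivery_count < max_delivery_count:
        delivery_count += 1
        try:
            handler()
        except Exception:
            # The invocation failed, so the message is made available again
            # and will be redelivered on the next loop iteration.
            continue
        return "completed", delivery_count
    return "dead-lettered: MaxDeliveryCountExceeded", delivery_count


def failing_handler():
    # Stand-in for a function that cannot even be loaded.
    raise ImportError("cannot import name 'c_uamqp'")


if __name__ == "__main__":
    print(simulate_deliveries(failing_handler))
    print(simulate_deliveries(lambda: None))
```

If this model is right, a function that fails on every invocation would exhaust all attempts almost immediately, which would match how quickly the message shows up under DEADLETTERS.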
If anyone can help me understand this problem, I would be very grateful.
Please let me know if you need more details.
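Update:
After several rounds of debugging, I can say with certainty that the problem is caused by uamqp,
but I don't understand why it does not work.
I run my pipeline on an Ubuntu agent.
My requirement.txt contains: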
azure-functions
azure-servicebus==7.0.0
boto3
azure-keyvault
azure-identity
uamqp
In the pipeline output I can see that this was installed:
uamqp-1.4.3
But I get the same error:
Result: Failure
Exception: ImportError: cannot import name 'c_uamqp' from partially initialized module 'uamqp' (most likely due to a circular import) (/home/site/wwwroot/.python_packages/lib/site-packages/uamqp/__init__.py). Troubleshooting Guide: https://aka.ms/functions-modulenotfound
Stack:
  File "/azure-functions-host/workers/python/3.9/LINUX/X64/azure_functions_worker/dispatcher.py", line 305, in _handle__function_load_request
    func = loader.load_function(
  File "/azure-functions-host/workers/python/3.9/LINUX/X64/azure_functions_worker/utils/wrappers.py", line 42, in call
    raise extend_exception_message(e, message)
  File "/azure-functions-host/workers/python/3.9/LINUX/X64/azure_functions_worker/utils/wrappers.py", line 40, in call
    return func(*args, **kwargs)
  File "/azure-functions-host/workers/python/3.9/LINUX/X64/azure_functions_worker/loader.py", line 83, in load_function
    mod = importlib.import_module(fullmodname)
  File "/usr/local/lib/python3.9/importlib/__init__.py", line 127, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
  File "<frozen importlib._bootstrap>", line 1030, in _gcd_import
  File "<frozen importlib._bootstrap>", line 1007, in _find_and_load
  File "<frozen importlib._bootstrap>", line 986, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 680, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 850, in exec_module
  File "<frozen importlib._bootstrap>", line 228, in _call_with_frames_removed
  File "/home/site/wwwroot/myfunction/testServiceBus.py", line 4, in <module>
    from azure.servicebus import ServiceBusClient, ServiceBusMessage
  File "/home/site/wwwroot/.python_packages/lib/site-packages/azure/servicebus/__init__.py", line 6, in <module>
    from uamqp import constants
  File "/home/site/wwwroot/.python_packages/lib/site-packages/uamqp/__init__.py", line 12, in <module>
    from uamqp import c_uamqp # pylint: disable=import-self
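One way to narrow this down is to compare the Python version and platform of the build agent with those of the function host, and to try the failing import in isolation on each. The following is a rough diagnostic sketch using only the standard library; `probe_import` and `environment_report` are hypothetical helper names, and the module names passed in are taken from the traceback above. It makes no claim about the actual cause.

```python
import importlib
import platform
import sys


def probe_import(module_name):
    """Try to import a module and return (succeeded, detail)."""
    try:
        importlib.import_module(module_name)
    except Exception as exc:  # report any failure raised during import
        return False, f"{type(exc).__name__}: {exc}"
    return True, "ok"


def environment_report(module_names):
    """Collect interpreter details plus an import result per module."""
    return {
        "python_version": platform.python_version(),
        "platform": platform.platform(),
        "executable": sys.executable,
        "imports": {name: probe_import(name) for name in module_names},
    }


if __name__ == "__main__":
    # Module names taken from the traceback above.
    report = environment_report(["uamqp", "uamqp.c_uamqp"])
    for key, value in report.items():
        print(key, "=", value)
```

Running the same script in the pipeline (where the packages are installed) and logging the same report from inside the function would show whether the two environments differ.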
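At first, my function ran from __init__.py, the default file created by Visual Studio Code, so I thought it might be a name conflict and renamed the file to testServiceBus.py.
But I still get the same error.
Here is my full configuration.
function.json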
{
  "scriptFile": "testServiceBus.py",
  "entryPoint": "main",
  "bindings": [
    {
      "name": "message",
      "type": "serviceBusTrigger",
      "direction": "in",
      "topicName": "XXX",
      "subscriptionName": "XXX",
      "connection": "XXX"
    }
  ]
}
host.json
{
  "version": "2.0",
  "extensions": {
    "serviceBus": {
      "messageHandlerOptions": {
        "autoComplete": false
      }
    }
  },
  "logging": {
    "applicationInsights": {
      "samplingSettings": {
        "isEnabled": true,
        "excludedTypes": "Request"
      }
    }
  },
  "extensionBundle": {
    "id": "Microsoft.Azure.Functions.ExtensionBundle",
    "version": "[2.*, 3.0.0)"
  }
}
Thank you very much for any help or hints. I am completely lost here and have run out of ideas to try.