
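I am hoping someone can help me understand what I am doing wrong with this Azure Function.

The structure I have in mind is as follows.

I have two topics. TOPIC_A receives a message, which is processed and then sent to TOPIC_B, where I just want to hold the message until I decide what to do with it.

This is the actual Azure Function app code.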

import logging
import json
import boto3
from azure.servicebus import ServiceBusClient, ServiceBusMessage
from azure.keyvault.secrets import SecretClient
from azure.identity import DefaultAzureCredential
import azure.functions as func

def main(message: func.ServiceBusMessage):
    # Log the Service Bus Message as plaintext
    message_content_type = message.content_type
    message_body = message.get_body().decode("utf-8")

    logging.info("Python ServiceBus topic trigger processed message.")
    logging.info("Message Content Type: " + message_content_type)
    logging.info("Message Body: " + message_body)

    #KeyVault Configuration
    KeyVault_Url = f'keyvailt_url'
    Keyvault_Name = 'keyvalt_name'
    credential = DefaultAzureCredential()
    client_keyvault = SecretClient(vault_url=KeyVault_Url, credential=credential)
    
    # Service Bus Connection string
    CONNECTION_STR = client_keyvault.get_secret("Service-CONN").value
    # For receiving the feedback from campaigns
    TOPIC_NAME_A = "topicAname" 
    SUBSCRIPTION_NAME = "subscriptionName"

    # For sending feedback and results of sentiment analysis and language detection
    TOPIC_NAME_B = "topicBname" 


    comprehend = boto3.client(service_name='comprehend', region_name='eu-west-1', aws_access_key_id=client_keyvault.get_secret("ID").value, aws_secret_access_key=client_keyvault.get_secret("SECRET").value)

    servicebus_client = ServiceBusClient.from_connection_string(conn_str=CONNECTION_STR)
    with servicebus_client:
        receiver = servicebus_client.get_subscription_receiver(
            topic_name=TOPIC_NAME_A,
            subscription_name=SUBSCRIPTION_NAME
        )
        with receiver:
            received_msgs = receiver.receive_messages(max_message_count=10, max_wait_time=5)
            

            for msg in received_msgs:

                message= str(msg)
                res = json.loads(message) 
                text = res['Text']
                result_json= json.dumps(comprehend.detect_sentiment(Text=text, LanguageCode='en'), sort_keys=True, indent=4)
                result = json.loads(result_json) # converting json to python dictionary

                #extracting the sentiment value 
                sentiment = result["Sentiment"]

                #extracting the sentiment score
                if sentiment == "POSITIVE":
                    value = round(result["SentimentScore"]["Positive"] * 100,2)

                elif sentiment == "NEGATIVE":
                    value = round(result["SentimentScore"]["Negative"] * 100,2)

                elif sentiment == "NEUTRAL":
                    value = round(result["SentimentScore"]["Neutral"] * 100,2)
                    
                elif sentiment == "MIXED":
                    value = round(result["SentimentScore"]["Mixed"] * 100,2)

                lang_result=json.dumps(comprehend.detect_dominant_language(Text = text), sort_keys=True, indent=4)

                #converting languages detection results into a dictionary
                lang_result_json=json.loads(lang_result)

                #Formatting the score from the results
                for line in lang_result_json["Languages"]:
                    line['Score'] = round(line['Score']* 100, 2)
                
                #storing the output of sentiment analysis, language detection and ids in a dictionary and converting it to JSON
                output={
                'ID':res['ID'],
                'userId':res['userId'],
                'defID':res['defID']
                }

                output_json = json.dumps(output, ensure_ascii=False)
                
                #-------------------------------------------------------------------------------------------------------
                # Sending the processed output (output_json) in json format to another service bus

                def send_output(sender):
                    message = ServiceBusMessage(
                    output_json,
                    content_type="Analysis", #setting the content type so that the service bus can route it.
                    application_properties={b'CODE':msg.application_properties[b'CODE']} #setting the tenant code
                    )
                    sender.send_messages(message)
                    

                servicebus_client = ServiceBusClient.from_connection_string(conn_str=CONNECTION_STR, logging_enable=True)

                with servicebus_client:
                    sender = servicebus_client.get_topic_sender(topic_name=TOPIC_NAME_B)
                    with sender:
                        send_output(sender)

                receiver.complete_message(msg)
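
The per-message logic inside the loop above does not depend on Service Bus itself, so it can be pulled into plain functions and exercised without Azure or AWS credentials. The following is a minimal sketch under that assumption. The helper names dominant_score and build_output are hypothetical and not part of the code above, and the sample dictionary only mimics the Sentiment and SentimentScore keys that the code above reads from the detect_sentiment result.

```python
import json


def dominant_score(result: dict) -> float:
    """Return the score of the detected sentiment as a percentage."""
    sentiment = result["Sentiment"]  # e.g. "POSITIVE"
    # The code above reads the keys "Positive", "Negative", "Neutral", "Mixed",
    # which are the capitalised forms of the sentiment labels.
    return round(result["SentimentScore"][sentiment.capitalize()] * 100, 2)


def build_output(body: str) -> str:
    """Parse the incoming JSON body and keep only the ID fields."""
    res = json.loads(body)
    output = {"ID": res["ID"], "userId": res["userId"], "defID": res["defID"]}
    return json.dumps(output, ensure_ascii=False)


if __name__ == "__main__":
    # Hypothetical sample data, only for illustration.
    sample_result = {
        "Sentiment": "POSITIVE",
        "SentimentScore": {"Positive": 0.75, "Negative": 0.05,
                           "Neutral": 0.15, "Mixed": 0.05},
    }
    sample_body = '{"ID": 1, "userId": "u1", "defID": "d1", "Text": "hello"}'
    print(dominant_score(sample_result))
    print(build_output(sample_body))
```

Keeping these steps separate from the receive and send calls makes it possible to check the transformation on its own before looking at the Service Bus side.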

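Before explaining the problem I have with the function, let me explain what happens when I run this code locally.

If I send a message to TOPIC_A and open that topic in the Azure portal, I can see the message in the RECEIVED tab. As soon as I run my Python code, the message is consumed from TOPIC_A and sent to TOPIC_B, where it stays under RECEIVED. This is exactly the output I expect.

Now I have moved the whole logic into an Azure Function. When I send a message to TOPIC_A, I immediately see the message end up in DEADLETTERS, and in my Azure Function I get this error: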

Result: Failure Exception: ImportError: cannot import name 'c_uamqp' from partially initialized module 'uamqp' (most likely due to a circular import) (/home/site/wwwroot/.python_packages/lib/site-packages/uamqp/__init__.py). Troubleshooting Guide: https://aka.ms/functions-modulenotfound Stack

And if I peek at the message in the dead-letter queue (TOPIC_A), I see this:

deadletterreason "MaxDeliveryCountExceeded"

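As far as I understand, the message ends up in the dead-letter queue very quickly: while my code is looking for the message under RECEIVED, delivery is attempted 10 times and then fails.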
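
The retry-then-dead-letter sequence described above can be modelled without Azure. This is a toy sketch, not the Service Bus implementation: it assumes a subscription with a maximum delivery count of 10 (the number of attempts mentioned above) and a handler that fails on every attempt. The names deliver_until_dead_lettered and failing_handler are hypothetical.

```python
from typing import Callable

MAX_DELIVERY_COUNT = 10  # assumption: the subscription's maximum delivery count


def deliver_until_dead_lettered(handler: Callable[[str], None], body: str,
                                max_delivery_count: int = MAX_DELIVERY_COUNT) -> dict:
    """Redeliver `body` until the handler succeeds or the count is exhausted."""
    delivery_count = 0
    while delivery_count < max_delivery_count:
        delivery_count += 1
        try:
            handler(body)
        except Exception:
            # A failed attempt leaves the message on the subscription,
            # so it is delivered again with a higher delivery count.
            continue
        return {"state": "completed", "delivery_count": delivery_count,
                "dead_letter_reason": None}
    return {"state": "dead-lettered", "delivery_count": delivery_count,
            "dead_letter_reason": "MaxDeliveryCountExceeded"}


def failing_handler(body: str) -> None:
    """Stands in for a function invocation that fails every time."""
    raise ImportError("cannot import name 'c_uamqp'")


if __name__ == "__main__":
    print(deliver_until_dead_lettered(failing_handler, '{"Text": "hello"}'))
```

In this model a handler that always raises exhausts all attempts almost instantly, which would match how quickly the message shows up under DEADLETTERS.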

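What I struggle to understand is why this happens only when I use the Azure Function.

If anyone can help me understand this issue, I would be very grateful.

Please let me know if you need more details.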

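UPDATE:

After several rounds of debugging, I can say for sure that the problem is caused by uamqp, but I do not understand why it does not work.

I am running my pipeline on an Ubuntu agent.

My requirements.txt contains: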

azure-functions
azure-servicebus==7.0.0
boto3
azure-keyvault
azure-identity
uamqp

In the pipeline output I can see that the following was installed:

uamqp-1.4.3

But I still get the same error:

Result: Failure
Exception: ImportError: cannot import name 'c_uamqp' from partially initialized module 'uamqp' (most likely due to a circular import) (/home/site/wwwroot/.python_packages/lib/site-packages/uamqp/__init__.py). Troubleshooting Guide: https://aka.ms/functions-modulenotfound
Stack:
  File "/azure-functions-host/workers/python/3.9/LINUX/X64/azure_functions_worker/dispatcher.py", line 305, in _handle__function_load_request
    func = loader.load_function(
  File "/azure-functions-host/workers/python/3.9/LINUX/X64/azure_functions_worker/utils/wrappers.py", line 42, in call
    raise extend_exception_message(e, message)
  File "/azure-functions-host/workers/python/3.9/LINUX/X64/azure_functions_worker/utils/wrappers.py", line 40, in call
    return func(*args, **kwargs)
  File "/azure-functions-host/workers/python/3.9/LINUX/X64/azure_functions_worker/loader.py", line 83, in load_function
    mod = importlib.import_module(fullmodname)
  File "/usr/local/lib/python3.9/importlib/__init__.py", line 127, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
  File "<frozen importlib._bootstrap>", line 1030, in _gcd_import
  File "<frozen importlib._bootstrap>", line 1007, in _find_and_load
  File "<frozen importlib._bootstrap>", line 986, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 680, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 850, in exec_module
  File "<frozen importlib._bootstrap>", line 228, in _call_with_frames_removed
  File "/home/site/wwwroot/myfunction/testServiceBus.py", line 4, in <module>
    from azure.servicebus import ServiceBusClient, ServiceBusMessage
  File "/home/site/wwwroot/.python_packages/lib/site-packages/azure/servicebus/__init__.py", line 6, in <module>
    from uamqp import constants
  File "/home/site/wwwroot/.python_packages/lib/site-packages/uamqp/__init__.py", line 12, in <module>
    from uamqp import c_uamqp # pylint: disable=import-self
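
Since the pipeline log reports uamqp-1.4.3 but the import still fails at runtime, one way to compare the two environments is to print the package versions that the running Python interpreter actually sees. This is a minimal sketch using only the standard library (importlib.metadata, available from Python 3.8); the helper name installed_version is hypothetical, and it reads package metadata only, so it does not trigger the failing import.

```python
from importlib import metadata
from typing import Optional


def installed_version(distribution: str) -> Optional[str]:
    """Return the installed version of a distribution, or None if it is missing."""
    try:
        return metadata.version(distribution)
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    # Distribution names taken from the requirements list above.
    for name in ("uamqp", "azure-servicebus", "azure-functions"):
        print(f"{name}: {installed_version(name)}")
```

Running the same snippet locally and in the deployed environment would show whether both resolve the same versions.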

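At first, my function ran from the default __init__.py file that Visual Studio Code creates, so I thought it might be a name conflict. I renamed the file to testServiceBus.py, but I still get the same error.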
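
To test the name-conflict idea more directly, it is possible to ask Python which file a module name resolves to without importing it. The sketch below uses importlib.util.find_spec from the standard library; the helper name module_origin is hypothetical, and it is only meant for top-level module names such as uamqp.

```python
import importlib.util
from typing import Optional


def module_origin(name: str) -> Optional[str]:
    """Return the file a top-level module would be loaded from, or None if not found."""
    spec = importlib.util.find_spec(name)
    return spec.origin if spec is not None else None


if __name__ == "__main__":
    # A path outside site-packages here would point to a local file
    # shadowing the installed package.
    print(module_origin("uamqp"))
```

If the printed path points into .python_packages/lib/site-packages, a local file shadowing the package is unlikely to be the cause.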

This is my full configuration.

function.json

{
  "scriptFile": "testServiceBus.py",
  "entryPoint": "main",
  "bindings": [
    {
      "name": "message",
      "type": "serviceBusTrigger",
      "direction": "in",
      "topicName": "XXX",
      "subscriptionName": "XXX",
      "connection": "XXX"
    }
  ]
}

host.json

{
  "version": "2.0",
  "extensions": {
    "serviceBus": {
      "messageHandlerOptions": {
        "autoComplete": false
      }
    }
  },
  "logging": {
    "applicationInsights": {
      "samplingSettings": {
        "isEnabled": true,
        "excludedTypes": "Request"
      }
    }
  },
  "extensionBundle": {
    "id": "Microsoft.Azure.Functions.ExtensionBundle",
    "version": "[2.*, 3.0.0)"
  }
}

Thank you very much for any help or hints. I am completely lost here and have run out of ideas to try.
