0

我正在尝试解决这个我不完全理解为什么不起作用的问题。

我有2个主题。TOPIC_A我可以在上面发送我的消息并正确接收它们。收到消息后,我想将其发送到另一个主题,TOPIC_B. 到目前为止,我一直在本地测试这段代码,一切正常。但是自从我开始使用 azure.function 服务总线之后。代码开始表现得很有趣。这是我的代码:

import logging
import azure.functions as func
import json
import boto3
from azure.keyvault.secrets import SecretClient
from azure.identity import DefaultAzureCredential
from azure.servicebus import ServiceBusClient, ServiceBusMessage


def main(message: func.ServiceBusMessage):
    logging.info(message)
    print(message)
    #KeyVault Configuration
    KeyVault_Url = f'url'
    credential = DefaultAzureCredential()
    client_keyvault = SecretClient(vault_url=KeyVault_Url, credential=credential)
    
    # # Service Bus Connection string
    CONNECTION_STR = client_keyvault.get_secret("CONN").value
    # For receiving the feedback from campaigns
    TOPIC_NAME_A = "TOPICA" 
    SUBSCRIPTION_NAME = "XXX"

    # For sending feedback and results of sentiment analysis and language detection
    TOPIC_NAME_B = "TOPICB" 


    comprehend = boto3.client(service_name='comprehend', region_name='eu-west-1', aws_access_key_id=client_keyvault.get_secret("ID").value, aws_secret_access_key=client_keyvault.get_secret("SECRET").value)


    # This block will receiver the messages from the service bus listed above.
    # Please mind, once the message get received and printed (json format) that event will be destroyed from the portal service bus.

    servicebus_client = ServiceBusClient.from_connection_string(conn_str=CONNECTION_STR)
    with servicebus_client:
        receiver = servicebus_client.get_subscription_receiver(
            topic_name=TOPIC_NAME_A,
            subscription_name=SUBSCRIPTION_NAME
        )
        with receiver:
            received_msgs = receiver.receive_messages(max_message_count=10, max_wait_time=60)
            
            output_global = {}
            for msg in received_msgs:

                message1 = str(msg)
                res = json.loads(message1) 
                
                # extracting the text from the message from service bus
                text = res['Text']


                #passing the text to comprehend
                result_json= json.dumps(comprehend.detect_sentiment(Text=text, LanguageCode='en'), sort_keys=True, indent=4)
                result = json.loads(result_json) # converting json to python dictionary
                print(result)
                # logging.info("Result from comprehend" , result)
                #extracting the sentiment value 
                sentiment = result["Sentiment"]

                #extracting the sentiment score
                if sentiment == "POSITIVE":
                    value = round(result["SentimentScore"]["Positive"] * 100,2)

                elif sentiment == "NEGATIVE":
                    value = round(result["SentimentScore"]["Negative"] * 100,2)

                elif sentiment == "NEUTRAL":
                    value = round(result["SentimentScore"]["Neutral"] * 100,2)
                    
                elif sentiment == "MIXED":
                    value = round(result["SentimentScore"]["Mixed"] * 100,2)

                # To detect the language of the feedback, the text received from service bus is passed to the function below
                lang_result=json.dumps(comprehend.detect_dominant_language(Text = text), sort_keys=True, indent=4)

                #converting languages detection results into a dictionary
                lang_result_json=json.loads(lang_result)

                #Formatting the score from the results
                for line in lang_result_json["Languages"]:
                    line['Score'] = round(line['Score']* 100, 2)
                
                #storing the output of sentiment analysis, language detection and ids in a dictionary and converting it to JSON
                output = {
                    'XXX': res['XXX'],
                    'XXX Id': res['XXX'],
                    'XXX': res['XXX'],
                    'XXX': res['XXX'],
                    'XXX': res['XXX'],
                    'Sentiment': sentiment,
                    'Value': value,
                    'Languages': lang_result_json['Languages']
                }
                # logging.info("Message Body: " + output)
                output_json = json.dumps(output, ensure_ascii=False)
                
                #-------------------------------------------------------------------------------------------------------
                # Sending the processed output (output_json) in json format to another service bus

                def send_output(sender):
                    message2 = ServiceBusMessage(
                    output_json,
                    content_type="XXX", #setting the content type so that the service bus can route it.
                    ApplicationProperties={b'tenantcode':msg.ApplicationProperties[b'tenantcode']} #setting the tenant code
                    )
                    sender.send_messages(message2)
                
                
                servicebus_client = servicebus_client.from_connection_string(conn_str=CONNECTION_STR, logging_enable=True)
                
                with servicebus_client:
                    sender = servicebus_client.get_topic_sender(topic_name=TOPIC_NAME_B)
                    with sender:
                        send_output(sender)

这是我的 host.json

{
  "version": "2.0",
  "extensions": {
    "serviceBus": {
      "messageHandlerOptions": {
        "autoComplete": true
      }
    }
  },
  "logging": {
    "applicationInsights": {
      "samplingSettings": {
        "isEnabled": true,
        "excludedTypes": "Request"
      }
    }
  },
  "extensionBundle": {
    "id": "Microsoft.Azure.Functions.ExtensionBundle",
    "version": "[2.*, 3.0.0)"
  }
}

这是我的function.json

{
  "scriptFile": "outthinkServiceBus.py",
  "entryPoint": "main",
  "bindings": [
    {
      "name": "message",
      "type": "serviceBusTrigger",
      "direction": "in",
      "topicName": "XXX",
      "subscriptionName": "XXX",
      "connection": "XXX"
    }
  ]
}

在收到的内容中,我有一个 for 循环msg,我想在该循环上遍历主题内的所有消息,并将它们一一发送到主题 B。

一切正常,在 azure 函数的输出中,我可以看到这条消息

2021-10-15 15:23:45.124
Message receiver b'receiver-link-' state changed from <MessageReceiverState.Open: 3> to <MessageReceiverState.Closing: 4> on connection: b'SBReceiver-'
Information
2021-10-15 15:23:45.552
Shutting down connection b'SBReceiver-'.

所以处理一直到接收者,但函数发送者永远不会被触发。

如果我删除 for 循环,代码就可以正常工作。我能够看到发件人成功触发并完成。

任何帮助了解错误在哪里以及我做错了什么?

非常感谢您为我提供的任何帮助。如果您需要更多信息,请询问

更新:我试图在for循环外缩进 send_out 函数,在这种情况下,函数sent_out触发但天蓝色函数 Servicebus 失败,因为output_json超出范围。到目前为止,我唯一能弄清楚的是,由于某种原因,循环中定义的函数永远不会触发。

4

0 回答 0