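I am trying to solve a problem and do not fully understand why my code is not working.
I have 2 topics. On TOPIC_A I can send my messages and receive them correctly. Once a message is received, I want to send it on to another topic, TOPIC_B. Until now I had been testing this code locally and everything worked fine. But since I started running it in an Azure Function with a Service Bus trigger, the code has started behaving strangely. Here is my code: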
import logging
import azure.functions as func
import json
import boto3
from azure.keyvault.secrets import SecretClient
from azure.identity import DefaultAzureCredential
from azure.servicebus import ServiceBusClient, ServiceBusMessage


def main(message: func.ServiceBusMessage):
    logging.info(message)
    print(message)

    # Key Vault configuration
    KeyVault_Url = f'url'
    credential = DefaultAzureCredential()
    client_keyvault = SecretClient(vault_url=KeyVault_Url, credential=credential)

    # Service Bus connection string
    CONNECTION_STR = client_keyvault.get_secret("CONN").value

    # For receiving the feedback from campaigns
    TOPIC_NAME_A = "TOPICA"
    SUBSCRIPTION_NAME = "XXX"

    # For sending feedback and results of sentiment analysis and language detection
    TOPIC_NAME_B = "TOPICB"

    comprehend = boto3.client(service_name='comprehend', region_name='eu-west-1', aws_access_key_id=client_keyvault.get_secret("ID").value, aws_secret_access_key=client_keyvault.get_secret("SECRET").value)

    # This block receives the messages from the Service Bus topic listed above.
    # Please note: once a message has been received and printed (JSON format), that event is removed from the Service Bus in the portal.
    servicebus_client = ServiceBusClient.from_connection_string(conn_str=CONNECTION_STR)
    with servicebus_client:
        receiver = servicebus_client.get_subscription_receiver(
            topic_name=TOPIC_NAME_A,
            subscription_name=SUBSCRIPTION_NAME
        )
        with receiver:
            received_msgs = receiver.receive_messages(max_message_count=10, max_wait_time=60)
            output_global = {}
            for msg in received_msgs:
                message1 = str(msg)
                res = json.loads(message1)
                # Extract the text from the Service Bus message
                text = res['Text']
                # Pass the text to Comprehend
                result_json = json.dumps(comprehend.detect_sentiment(Text=text, LanguageCode='en'), sort_keys=True, indent=4)
                result = json.loads(result_json)  # convert JSON to a Python dictionary
                print(result)
                # logging.info("Result from comprehend" , result)
                # Extract the sentiment value
                sentiment = result["Sentiment"]
                # Extract the sentiment score
                if sentiment == "POSITIVE":
                    value = round(result["SentimentScore"]["Positive"] * 100, 2)
                elif sentiment == "NEGATIVE":
                    value = round(result["SentimentScore"]["Negative"] * 100, 2)
                elif sentiment == "NEUTRAL":
                    value = round(result["SentimentScore"]["Neutral"] * 100, 2)
                elif sentiment == "MIXED":
                    value = round(result["SentimentScore"]["Mixed"] * 100, 2)
                # To detect the language of the feedback, the text received from Service Bus is passed to the call below
                lang_result = json.dumps(comprehend.detect_dominant_language(Text=text), sort_keys=True, indent=4)
                # Convert the language detection results into a dictionary
                lang_result_json = json.loads(lang_result)
                # Format the score from the results
                for line in lang_result_json["Languages"]:
                    line['Score'] = round(line['Score'] * 100, 2)
                # Store the output of sentiment analysis, language detection and ids in a dictionary and convert it to JSON
                output = {
                    'XXX': res['XXX'],
                    'XXX Id': res['XXX'],
                    'XXX': res['XXX'],
                    'XXX': res['XXX'],
                    'XXX': res['XXX'],
                    'Sentiment': sentiment,
                    'Value': value,
                    'Languages': lang_result_json['Languages']
                }
                # logging.info("Message Body: " + output)
                output_json = json.dumps(output, ensure_ascii=False)

                #-------------------------------------------------------------------------------------------------------
                # Send the processed output (output_json) in JSON format to another Service Bus topic
                def send_output(sender):
                    message2 = ServiceBusMessage(
                        output_json,
                        content_type="XXX",  # set the content type so that the Service Bus can route it
                        ApplicationProperties={b'tenantcode': msg.ApplicationProperties[b'tenantcode']}  # set the tenant code
                    )
                    sender.send_messages(message2)

                servicebus_client = servicebus_client.from_connection_string(conn_str=CONNECTION_STR, logging_enable=True)
                with servicebus_client:
                    sender = servicebus_client.get_topic_sender(topic_name=TOPIC_NAME_B)
                    with sender:
                        send_output(sender)
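To check the scoring logic separately from Service Bus, the sentiment branch above can be pulled into a small pure function. This is only a minimal sketch, not the code running in my function: extract_sentiment_value and the sample response are hypothetical, and it assumes the detect_sentiment response carries the Sentiment and SentimentScore keys that the code above already reads.

```python
def extract_sentiment_value(result: dict) -> tuple:
    """Return (sentiment, score as a percentage rounded to 2 decimals)."""
    sentiment = result["Sentiment"]
    # "POSITIVE" -> "Positive", matching the SentimentScore key names used above.
    score_key = sentiment.capitalize()
    value = round(result["SentimentScore"][score_key] * 100, 2)
    return sentiment, value


# Hypothetical response shaped like the dictionary the code above reads.
sample_result = {
    "Sentiment": "NEGATIVE",
    "SentimentScore": {
        "Positive": 0.01,
        "Negative": 0.9512,
        "Neutral": 0.03,
        "Mixed": 0.0088,
    },
}

sentiment, value = extract_sentiment_value(sample_result)
print(sentiment, value)
```

Running this locally with a few hand-written responses at least lets me rule out the scoring code as the source of the problem.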
This is my host.json:
{
  "version": "2.0",
  "extensions": {
    "serviceBus": {
      "messageHandlerOptions": {
        "autoComplete": true
      }
    }
  },
  "logging": {
    "applicationInsights": {
      "samplingSettings": {
        "isEnabled": true,
        "excludedTypes": "Request"
      }
    }
  },
  "extensionBundle": {
    "id": "Microsoft.Azure.Functions.ExtensionBundle",
    "version": "[2.*, 3.0.0)"
  }
}
This is my function.json:
{
  "scriptFile": "outthinkServiceBus.py",
  "entryPoint": "main",
  "bindings": [
    {
      "name": "message",
      "type": "serviceBusTrigger",
      "direction": "in",
      "topicName": "XXX",
      "subscriptionName": "XXX",
      "connection": "XXX"
    }
  ]
}
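Inside the receiver block I have a for loop over msg, which I want to use to iterate over all the messages in the topic and send them one by one to topic B.
Everything runs without errors, and in the Azure Function output I can see this message: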
2021-10-15 15:23:45.124
Message receiver b'receiver-link-' state changed from <MessageReceiverState.Open: 3> to <MessageReceiverState.Closing: 4> on connection: b'SBReceiver-'
Information
2021-10-15 15:23:45.552
Shutting down connection b'SBReceiver-'.
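So processing gets as far as the receiver, but the sender function is never triggered.
If I remove the for loop, the code works fine: I can see the sender being triggered and completing successfully.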
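One possibility I have not ruled out: the Service Bus trigger has already delivered the message to main (and, with autoComplete, completed it), so by the time my own receiver calls receive_messages the subscription is empty and the loop body never runs. The sketch below only illustrates that control flow with stand-ins. FakeReceiver, FakeSender and process are hypothetical and are not part of the Azure SDK.

```python
class FakeReceiver:
    """Hypothetical stand-in for a subscription receiver."""

    def __init__(self, pending):
        self._pending = list(pending)

    def receive_messages(self, max_message_count=10):
        # Hand out up to max_message_count pending messages, then nothing.
        batch = self._pending[:max_message_count]
        self._pending = self._pending[max_message_count:]
        return batch


class FakeSender:
    """Hypothetical stand-in that records what would go to TOPIC_B."""

    def __init__(self):
        self.sent = []

    def send_messages(self, message):
        self.sent.append(message)


def process(receiver, sender):
    for msg in receiver.receive_messages(max_message_count=10):
        # Everything in here, including any nested function definition,
        # runs once per received message and not at all for an empty batch.
        sender.send_messages(msg.upper())
    return len(sender.sent)


# Subscription still holds messages: the sender is used.
sent_with_messages = process(FakeReceiver(["a", "b"]), FakeSender())
# Subscription already drained (assumed here to be the effect of the trigger):
# nothing is sent.
sent_when_empty = process(FakeReceiver([]), FakeSender())
print(sent_with_messages, sent_when_empty)
```

If that assumption holds, it would match the log above: the receiver opens, gets nothing, and closes without the sender ever being created.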
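Can anyone help me understand where the error is and what I am doing wrong?
Thank you very much for any help you can give me. If you need more information, please ask.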
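UPDATE: I tried moving the send_output function outside the for loop. In that case send_output is triggered, but the Azure Function Service Bus run fails because output_json is out of scope. So far the only thing I have been able to work out is that, for some reason, the function defined inside the loop is never triggered.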
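The output_json failure would be consistent with the loop body never running. Here is a minimal sketch, using hypothetical names (run, and a send_output that takes no arguments) and plain lists instead of Service Bus, of how a helper moved outside the loop still depends on a variable that is only assigned inside it:

```python
def run(received_msgs):
    def send_output():
        # output_json is looked up in run()'s scope at call time.
        return output_json

    for msg in received_msgs:
        # Assigned only if the loop body runs at least once.
        output_json = '{"Text": "%s"}' % msg

    return send_output()


# With one message the variable exists and the helper works.
ok = run(["hello"])

try:
    run([])
    failed = False
except NameError:
    # The loop body never ran, so output_json was never assigned.
    failed = True

print(ok, failed)
```

So a helper defined outside the loop can be called, but it still fails whenever the list of received messages is empty.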