1

我正在使用durable function singleton patternon consumption plan。我只想为队列消息中的相同 ID 执行该函数的一个实例。

我正在local环境中测试我的代码。尽管函数成功完成,但 DurableOrchestrationClient.GetStatusAsync() 返回具有相同实例 ID 的实例仍在执行。

使用Storage Explorer,我发现我的函数行Table>DurableFunctionHubInstances RuntimeStatus也被标记为Completed

为什么DurableOrchestrationClient.GetStatusAsync()报告存在具有相同 Instance Id 的实例?我怎样才能解决这个问题?

[FunctionName("OF_QueueStart")]
public static async void QueueStart(
[QueueTrigger("queue-name", Connection = "connection")]MyObject req,
[OrchestrationClient]DurableOrchestrationClient starter,
ILogger log)
{
    log.LogInformation("some logging here");

    string instanceId = req.MyTenant + "-" + req.MyId;

    log.LogInformation($"Checking if instance with instance ID {instanceId} already exists.");

    var existingInstance = await starter.GetStatusAsync(instanceId);
    if (existingInstance == null)
    {
        log.LogInformation($"Instance with instance ID {instanceId} does not exist.");
        await starter.StartNewAsync("OF", instanceId, req);
        log.LogInformation($"Started orchestration with ID = {instanceId}.");
    }
    else
    {
        log.LogInformation($"OF instance with Instance ID {instanceId} already exists.");
    }
}

[FunctionName("OF")]
public static async void RunOrchestrator(
[OrchestrationTrigger] DurableOrchestrationContext context, ILogger log)
{
    log.LogInformation($"Executing orchestration with instance id {context.InstanceId}");

    var input = context.GetInput<MyObject>();

    var myObject2= await context.CallActivityAsync<MyObject2>("OF_F1", input.MyTenant);
    var myObject3 = await context.CallActivityAsync<MyObject3>("OF_F2", input.MyId);

    SqlConnection conn = new SqlConnection(GetConnectionString(tenantInfo.DbConnection));

    /*Here will call OF_F3 and pass conn as a parameter. Currently this line is not written*/
    conn.Close();
}
4

2 回答 2

1

似乎existingInstance充满了DurableFunctionHubInstances表中具有相同的行Partition Key。我们还需要检查它RuntimeStatus

    var existingInstance = await starter.GetStatusAsync(instanceId);
    if (existingInstance == null || existingInstance.RuntimeStatus == OrchestrationRuntimeStatus.Completed)
    {
        log.LogInformation($"Instance with instance ID {instanceId} does not exist.");
        await starter.StartNewAsync("OF", instanceId, req);
        log.LogInformation($"Started orchestration with ID = {instanceId}.");
    }
于 2019-02-11T10:11:53.697 回答
0

Python 代码:在使用新实例 ID 执行之前检查状态

async def main(event: func.EventGridEvent, SingletonOrchestration: str):
    client = df.DurableOrchestrationClient(SingletonOrchestration)
    instance_id = client_name + engagement_name + trigger_pattern
    logging.info(f"Instance id '{instance_id}'")
    logging.info(input_instanc_dict)
    flag_set = False
    loop_count = 0
    while flag_set == False: 
        existing_instance = await **client.get_status(instance_id,False,False,False)**
        logging.info(f"Orchestration with ID '{instance_id}' status ")
        logging.info(existing_instance.__dict__)
         if existing_instance._runtime_status == "Completed" or existing_instance._runtime_status == None or  existing_instance._runtime_status == "Failed" or  existing_instance._runtime_status == "Terminated" :
            instance_id = **await client.start_new("GenericEventGridOrchestrator", instance_id, input_instanc_dict)**
            logging.info(f"Started orchestration with ID = '{instance_id}'.")
            flag_set = True
于 2020-09-15T07:41:26.277 回答