0

我有这样的代码:

    public void IssueOrders(List<OrderAction> actions)
    {
        foreach (var action in actions)
        {
            if (action is AddOrder)
            {
                uint userId = apiTransactions.PlaceOrder((action as AddOrder).order);
                Console.WriteLine("order is placing userId = " + userId);
            }
            // TODO: implement other actions
        }
        // how to wait until OnApiTransactionsDataMessageReceived for all userId is received?

        // TODO: need to update actions with received data here
    }

    private void OnApiTransactionsDataMessageReceived(object sender, DataMessageReceivedEventArgs e)
    {
        var dataMsg = e.message;
        var userId = dataMsg.UserId;

apiTransactions.PlaceOrder是异步的,所以我会收到userId结果,但我会在回调 OnApiTransactionsDataMessageReceived 中接收数据。

例如,如果我下了 3 个订单,我将收到 3 个 userId,例如134。现在我需要等到收到所有这些 userId 的数据。

userId如果这很重要,它总是在增加。这几乎是整数序列,但由于并行执行,可能会省略一些数字。

升级版注意事项:

  • IssueOrders 可以从不同的线程并行执行
  • callack 可以在 PlaceOrder 返回之前被调用

UPD2

可能我需要重构下面的 PlaceOrder 代码,以便我可以保证userId在收到“回调”之前知道它:

    public uint PlaceOrder(Order order)
    {
        Publisher pub = GetPublisher();

        SchemeDesc schemeDesc = pub.Scheme;
        MessageDesc messageDesc = schemeDesc.Messages[0]; //AddMM
        FieldDesc fieldDesc = messageDesc.Fields[3];
        Message sendMessage = pub.NewMessage(MessageKeyType.KeyName, "FutAddOrder");

        DataMessage smsg = (DataMessage)sendMessage;
        uint userId = counter.Next();
        FillDataMessageWithPlaceOrder(smsg, order, userId);

        System.Console.WriteLine("posting message dump: {0}", sendMessage);
        pub.Post(sendMessage, PublishFlag.NeedReply);
        sendMessage.Dispose();

        return userId;
    }

所以我需要将 PlaceOrder 拆分为两种方法:userId CreateOrdervoid PostOrder. 这将保证当收到回调时我知道userId

4

3 回答 3

1

我会查看 Reactive Framework 中的ForkJoin方法。它将阻塞,直到多个异步调用完成。

编辑:似乎 ForkJoin() 只包含在 Rx 的实验版本中。这是基于 Merge() 对您想要的内容的讨论。

于 2012-09-05T08:32:37.850 回答
0

最愚蠢和最有效的方法之一是:

public void IssueOrders(List<OrderAction> actions)
{
    var userIds = new List<uint>();
    lock(theHashMap)
        theHashMap[userIds] = "blargh";

    foreach (var action in actions)
    {
        if (action is AddOrder)
        {
            lock(userIds)
            {
               uint userId = apiTransactions.PlaceOrder((action as AddOrder).order);
               Console.WriteLine("order is placing userId = " + userId);

               userIds.Add(userId);
            }
        }

        // TODO: implement other actions
    }

    // waiting:
    do
    {
       lock(userIds)
          if(userIds.Count == 0)
             break;

       Thread.Sleep(???); // adjust the time depending on how long you wait for a callback on average

    }while(true);

    lock(theHashMap)
        theHashMap.Remove(userIds);

    // now you have the guarantee that all were received
}

private Dictionary<List<uint>, string> theHashMap = new Dictionary<List<uint>,string>();

private void OnApiTransactionsDataMessageReceived(object sender, DataMessageReceivedEventArgs e)
{
    var dataMsg = e.message;
    var userId = dataMsg.UserId;

    // do some other things

    lock(theHashMap)
        foreach(var list in theHashMap.Keys)
           lock(list)
              if(list.Remove(userId))
                 break;
}

但是,这是一种相当粗暴的方法。除非你解释等待是什么意思,否则很难提出更多建议——正如乔恩在评论中所问的那样。例如,您可能想离开IssueOrders,在任何地方等待,并确保在所有人都到达时完成了一些额外的工作?或者,IssueOrders除非全部收到,否则您不能离开?ETC..

编辑:请注意,在 ADD 附近,锁必须PlaceOrder 之前,否则,当回调超快到达时,回调可能会在添加之前尝试删除 ID。另外,请注意这个实现非常幼稚:回调必须每次都搜索并锁定所有列表。加上一些额外的字典/地图/索引,它可能会被优化很多,但为了可读性我没有在这里做。

于 2012-09-05T08:40:56.530 回答
0

如果您能够更改 API,请考虑使用Task Parallel Library,您的代码将变得更加容易。

否则AutoResetEvent可能会帮助您:

    private Dictionary<int, AutoResetEvent> m_Events = new ...;

    public void IssueOrders(List<OrderAction> actions)
    {
        foreach (var action in actions)
        {
            if (action is AddOrder)
            {
                uint userId = apiTransactions.PlaceOrder((action as AddOrder).order);
                // Attention: Race condition if PlaceOrder finishes                                         
                // before the MRE is created and added to the dictionary!
                m_Events[userId] = new ManualResetEvent(false);
                Console.WriteLine("order is placing userId = " + userId);
            }
            // TODO: implement other actions
        }

        WaitHandle.WaitAll(m_Events.Values);

        // TODO: Dispose the created MREs
    }

    private void OnApiTransactionsDataMessageReceived(object sender, DataMessageReceivedEventArgs e)
    {
        var dataMsg = e.message;
        var userId = dataMsg.UserId;

        m_Events[userId].Set();
    }
于 2012-09-05T08:41:14.673 回答