1

upd:让我稍后重新表述我的问题。有N个双数。有 N 个专用线程,每个线程都更新自己的双数(_cachedProduct在下面的示例中)。

不知何故,我需要sum这些数字,并且我需要IndexUpdated在更改任何双数字后尽快引发事件(如果可以在 10 µs 或更短的时间内引发此类事件,那就太好了)。

以下是我尝试执行此任务的方式

================================================

为了计算证券交易所指数,我创建了private double[] _cachedProduct;字段。这些字段由许多线程编写

    // called from another threads
    public override void InstrumentUpdated(Instrument instrument)
    {
        if (!_initialized)
        {
            if (!Initialize())
            {
                return;
            }
        }
        int instrumentId = instrument.Id;
        OrderBook ob = Program.market.OrderBook(instrument);
        if (ob.MedianOrAskOrBid == null)
        {
            _cachedProduct[instrumentId] = 0;
        }
        else
        {
            _cachedProduct[instrumentId] = ((double) ob.MedianOrAskOrBid)*_ammounts[instrumentId];
        }
    }

_ammounts是预初始化的数组,请忽略Initialize方法和变量 - 它们可以正常工作。

在循环中,我只是对所有 _cachedProduct 求和,当值发生变化时,我会通知其他人。

        Task.Factory.StartNew(() =>
                {
                    while(true)
                    {
                        if (_initialized)
                        {
                            break;
                        }
                    }
                    while (true)
                    {
                        CalculateAndNotify();
                        //Thread.Sleep(5);
                    }
                }
            , TaskCreationOptions.LongRunning);


    protected void CalculateAndNotify()
    {
        var oldValue = Value;
        Calculate();
        if (oldValue != Value)
        {
            NotifyIndexChanged();
        } 
    }

    protected override void Calculate()
    {
        double result = 0;
        for (int i = 0; i < _instrumentIds.Count(); i++)
        {
            int instrumentId = _instrumentIds[i];
            if (_cachedProduct[instrumentId] == 0)
            {
                Value = null;
                return;
            }
            result += _cachedProduct[instrumentId];;
        }
        Value = result;
    }

我必须用它Interlocked来更新我的双精度_cachedProduct 值,但现在请忽略这个事实,您还看到此代码的其他问题吗?

我是否应该Calculate在内部调用方法,while(true)以便我always使用一个核心而不会延迟。我的机器有 24 个内核,所以我认为这没问题。

但是,如果没有Thread.Sleep(5)(评论),我确实看到整个程序的速度明显放缓,我不明白为什么。程序在许多地方执行慢了几十倍。

问题是我是否完全不使用while(true)任何锁定的想法是可以的。或者我应该引入一些锁定方法,以便仅Calculate在其中一个更新时进行索引_cachedProduct

4

3 回答 3

1

我认为如果您不使用额外的线程和循环来计算总和,您可能会获得更好的性能和更清晰的代码。每次更改工具时,您都会计算差异并立即更新索引并执行通知

因此,如果一个线程为单个仪器调用 InstrumentUpdated;

  change = newvalue - currentvalue;
  // used interlocked here to change the index threadsafe
  StockExchangeSum = Interlocked.Add(ref StockExchangeSum,change);
  NotifyIndexChanged();
于 2012-07-09T10:06:28.297 回答
0

double[] 可以是更复杂的类型吗?WaitHandle.WaitAny 如何比较性能?

如下所示。

private Index[] indicies;

public class Index
{
    public WaitHandle Updated =
        new EventWaitHandle(false, EventResetMode.AutoReset);
    public double _value;
    public double Value
    {
        get {return _value;}
        set
        {
            if(_value != value)
            {
                _value = value;
                Updated.Set();
            }
        }
    }
}

TaskFactory.StartNew(() =>
{
    while(true)
    {
        WaitHandle.Any(indicies.Select(i => i.Updated));
        CalculateAndNotify();
    }
});
于 2012-07-09T11:56:52.057 回答
0

你需要考虑的几点

  • 您是否尝试过将计算块与其余代码隔离开来?我在您的计算函数中注意到了这一点:

    for (int i = 0; i < _instrumentIds.Count(); i++)

    _instrumentIds.Count() 调用整个集合的迭代,并且有可能在循环的每次行程中调用它。即您正在执行 _instrumentIds 的 N^2/2 次迭代

  • _instrumentIdsIEnumerable 在此计算操作期间是否被修改?如果是这样,您可能会遇到各种导致错误答案的竞争条件。

  • 包含CalculateAndNotify 的任务是调用一次还是调用多次(嵌套)?例如,CalculateAndNotify 中是否有一些操作可能导致它被递归触发?

    如果是这样,您可能会发现同时执行多个计算(使用多个线程直到池耗尽)。您能否在操作开始/结束时包含一些日志记录,并可能计算同时计算的数量来检查这一点?

    如果这是一个问题,您可以包含一些逻辑,从而使 CalculateAndNotify 操作排队,并且在前一个计算操作完成之前无法执行新的计算操作。

于 2012-07-09T13:17:19.070 回答