我正在编写交易软件。我的系统生成需要处理的“顺序项目”(1,2,3...)(在我的应用程序中,每个项目都是一个执行订单,id 被称为内部订单 ID)。
我有很多线程可以生产项目(许多策略),但可以保证每个项目都只生产一次。
我只有一个处理器(订单执行者)应该
- 每当有新项目可用时提交执行(下订单)
- 在回调中接收结果并更新它“项目结果”(从证券交易所获得响应)
每个生产者都应该
- 提交要执行的“顺序包”项目(例如提交“1234,1235,1236,1237”)
- 阻止,直到所有项目的结果都可用。当所有项目的结果都可用时 - 处理
注意:
- 项目可以从不同的线程并行提交
- 我需要最小的延迟和最小的“锁定”
- 很高兴有一个易于移植到 C++ 的代码
- 在任何时候,我都有相当“有限”数量的“实时”ID。例如,我不能同时拥有 live id 的“1 和 10000”。因为每当创建新 id 时 - 它必须被处理和清理。所以我总是有一组彼此接近的id(例如~9900-10000)。所以使用循环数组来实现可能是有意义的
如果您可以提出建议-请提出建议。我在下面添加了我奇怪的实现,但没有必要阅读它。
这是我的问题实现:
private Dictionary<uint, AutoResetEvent> transactionsEvents = new Dictionary<uint, AutoResetEvent>();
private Dictionary<uint, TransactionResult> transactionsResults = new Dictionary<uint, TransactionResult> ();
public void IssueOrders(List<OrderAction> actions)
{
int count = actions.Count;
if (count == 0)
{
return;
}
uint finishUserId = (uint) apiTransactions.counter.Next(count);
uint startUserId = finishUserId + 1 - (uint) count;
AutoResetEvent[] events = new AutoResetEvent[count];
for (int i = 0; i < count; i++)
{
var action = actions[i];
uint userId = startUserId + (uint) i;
action.UserId = userId;
var e = new AutoResetEvent(false);
events[i] = e;
transactionsEvents[userId] = e;
}
for (int i = 0; i < count; i++)
{
var action = actions[i];
apiTransactions.ScheduleOrderAction(action);
}
WaitHandle.WaitAll(events);
// now all answers are available, need to apply information
foreach (var action in actions)
{
UpdateActionWithResult(action, transactionsResults[action.UserId]);
transactionsResults.Remove(action.UserId);
}
}
ScheduleOrderAction
将项目添加到 BlockingCollection。处理器执行 BlockingCollection 中的项目,将结果放入 transactionsResults 并引发相应的事件。
我的实现有很多问题:
- 我从不同的线程访问(和修改)字典。例如,当一个线程从
transactionResults
另一个线程(处理器)中删除项目时,可能会向其添加项目。 - 我不想切换到 ConcurentDictionaries,因为即使是 Dictionary 对我来说已经太贵了(就速度而言)