2

我是条老狗,想学新把戏。我非常熟悉一种名为 PowerBuilder 的语言,在这种语言中,当您想要异步执行操作时,您会在新线程中生成一个对象。我将重申:整个对象在单独的线程中实例化,并具有不同的执行上下文。该对象上的所有方法都在该单独线程的上下文中执行。

现在,我正在尝试使用 C# 实现一些异步执行,而 .NET 中的线程模型对我来说完全不同。看起来我正在一个线程中实例化对象,但我可以指定(在逐个调用的基础上)某些方法在不同的线程中执行。

差异似乎很微妙,但这让我感到沮丧。我的老派想法是:“我有一个叫鲍勃的帮手。鲍勃出去做事。” 如果我理解正确的话,新派的想法是“我鲍勃。如果需要,我有时可以一边揉肚子,一边拍头

我的实际编码问题:我正在编写一个接口引擎,它通过 TCP 接受消息,将它们解析为可用数据,然后将该数据放入数据库中。“解析”一条消息大约需要一秒钟。根据解析的数据,数据库操作可能需要不到一秒或十秒。(所有时间都是为了澄清问题而编造的。)

我的老派想法告诉我,我的数据库类应该存在于一个单独的线程中,并且具有类似ConcurrentQueue. 它只会在该队列上旋转,处理其中可能存在的任何内容。另一方面,解析器需要将消息推送到该队列中。这些消息将是(代表?)诸如“根据中的数据创建订单this object”或“根据中的数据更新订单this object”之类的内容。可能值得注意的是,我实际上想以严格的单线程 FIFO 顺序处理“队列”中的“消息”。

基本上,我的数据库连接不能总是跟上我的解析器。我需要一种方法来确保我的解析器在我的数据库进程试图赶上时不会变慢。建议?

-- 编辑:用代码!每个人和一切都在告诉我要使用BlockingCollection. 所以这里是对最终目标和代码的简要说明:

这将是一项 Windows 服务。启动时,它将产生多个“环境”,每个“环境”包含一个“dbworker”和一个“接口”。“接口”将有一个“解析器”和一个“监听器”。

class cEnvironment {
    private cDBWorker MyDatabase;
    private cInterface MyInterface;

    public void OnStart () {
        MyDatabase = new cDBWorker ();
        MyInterface = new cInterface ();

        MyInterface.OrderReceived += this.InterfaceOrderReceivedEventHandler;

        MyDatabase.OnStart ();
        MyInterface.OnStart ();
    }

    public void OnStop () {
        MyInterface.OnStop ();
        MyDatabase.OnStop ();

        MyInterface.OrderReceived -= this.InterfaceOrderReceivedEventHandler;
    }

    void InterfaceOrderReceivedEventHandler (object sender, OrderReceivedEventArgs e) {
        MyDatabase.OrderQueue.Add (e.Order);
    }
}

class cDBWorker {
    public BlockingCollection<cOrder> OrderQueue = new BlockingCollection<cOrder> ();
    private Task ProcessingTask;

    public void OnStart () {
        ProcessingTask = Task.Factory.StartNew (() => Process (), TaskCreationOptions.LongRunning);
    }

    public void OnStop () {
        OrderQueue.CompleteAdding ();
        ProcessingTask.Wait ();
    }

    public void Process () {
        foreach (cOrder Order in OrderQueue.GetConsumingEnumerable ()) {
            switch (Order.OrderType) {
                case 1:
                    SuperFastMethod (Order);
                    break;

                case 2:
                    ReallySlowMethod (Order);
                    break;
            }
        }
    }

    public void SuperFastMethod (cOrder Order) {
    }

    public void ReallySlowMethod (cOrder Order) {
    }
}

class cInterface {
    protected cListener MyListener;
    protected cParser MyParser;

    public void OnStart () {
        MyListener = new cListener ();
        MyParser = new cParser ();

        MyListener.DataReceived += this.ListenerDataReceivedHandler;
        MyListener.OnStart ();
    }

    public void OnStop () {
        MyListener.OnStop ();
        MyListener.DataReceived -= this.ListenerDataReceivedHandler;
    }

    public event OrderReceivedEventHandler OrderReceived;

    protected virtual void OnOrderReceived (OrderReceivedEventArgs e) {
        if (OrderReceived != null)
            OrderReceived (this, e);
    }

    void ListenerDataReceivedHandler (object sender, DataReceivedEventArgs e) {
        foreach (string Message in MyParser.GetMessages (e.RawData)) {
            OnOrderReceived (new OrderReceivedEventArgs (MyParser.ParseMessage (Message)));
        }
    }

它编译。(运送它!)但这是否意味着我做对了?

4

1 回答 1

3

BlockingCollection使得将这种东西放在一起非常容易:

// the queue
private BlockingCollection<Message> MessagesQueue = new BlockingCollection<Message>();


// the consumer
private MessageParser()
{
    foreach (var msg in MessagesQueue.GetConsumingEnumerable())
    {
        var parsedMessage = ParseMessage(msg);
        // do something with the parsed message
    }
}

// In your main program
// start the consumer
var consumer = Task.Factory.StartNew(() => MessageParser(),
    TaskCreationOptions.LongRunning);

// the main loop
while (messageAvailable)
{
    var msg = GetMessageFromTcp();
    // add it to the queue
    MessagesQueue.Add(msg);
}

// done receiving messages
// tell the consumer that no more messages will be added
MessagesQueue.CompleteAdding();

// wait for consumer to finish
consumer.Wait();

消费者在队列上进行非忙碌等待,因此当没有可用资源时它不会占用 CPU 资源。

于 2013-11-07T23:26:08.630 回答