2

I am implementing a single Producer/Consumer Pattern using BlockingCollection.

When I press 'c' on the keyboard, I want to cancel the operation using a CancellationToken.

The strange thing is that if I press 'c' as quickly as I can after starting the program, the program responds to the key press.

If I press 'c' later, say at the 45,000th iteration, the program doesn't react.

I have a for loop that populates the queue through the producer:

        // Producer side: enqueue 50,000 numbered lines with the flush flag set,
        // then mark the collection as complete for adding.
        for (int n = 0; n != 50000; ++n)
        {
            string message = "Number with flush " + n;
            logger.AddToQueue(message, true);
        }
        logger.DataItems.CompleteAdding();

In the logger's constructor I call this method:

    // Handle of the background key-listener task started by KeyPress().
    private Task t;

    /// <summary>
    /// Starts a background task that watches the console keyboard and cancels
    /// <c>cts</c> when the user presses 'c'.
    /// </summary>
    /// <remarks>
    /// Keys are read in a loop. Reading only once would let the task finish on the
    /// first key that is not 'c', after which no later 'c' could cancel anything.
    /// NOTE(review): Console.ReadKey blocks, so any other code that reads console
    /// input at the same time would compete for the keystroke — confirm that
    /// nothing else in the program reads from the console while this is running.
    /// </remarks>
    public void KeyPress()
    {
        t = Task.Run(() =>
        {
            // Keep listening until cancellation has been requested, either by the
            // 'c' key below or by any other holder of cts.
            while (!cts.IsCancellationRequested)
            {
                // intercept: true keeps the pressed key from being echoed to the console.
                if (Console.ReadKey(true).KeyChar == 'c')
                {
                    cts.Cancel();
                }
            }
        });
    }

I don't know whether the error is related to the other methods, but I will post them just in case:

The AddToQueue method (producer):

    /// <summary>
    /// Producer entry point: makes sure the consumer task has been started, then
    /// tries to enqueue a new <c>LogLine</c> built from <paramref name="text"/>
    /// and the current local time.
    /// </summary>
    /// <param name="text">Text stored in the queued log line.</param>
    /// <param name="isFlushOn">Forwarded to <c>Consumer</c>; selects whether the
    /// consumer drains the remaining items when the operation is cancelled.</param>
    public void AddToQueue(String text, bool isFlushOn) {
        // Consumer only starts a task when none exists yet, so repeated calls are cheap.
        Consumer(isFlushOn);

        // Once cancellation has been requested, TryAdd throws on every call.
        // Checking the token first avoids throwing and catching an exception
        // for each remaining item the caller still tries to add.
        if (ct.IsCancellationRequested) {
            _dataItems.CompleteAdding();
            return;
        }

        try {
            // NOTE(review): the bool result of TryAdd is ignored; with a 0 ms timeout
            // a bounded collection that is full would drop the item silently —
            // confirm how _dataItems is constructed.
            _dataItems.TryAdd(new LogLine() { Text = text, Timestamp = DateTime.Now }, 0, ct);
        } catch (OperationCanceledException) {
            // Cancellation was requested between the check above and TryAdd.
            _dataItems.CompleteAdding();
        }
    }

And the consumer:

    // Consumer task; stays null until Consumer() is called for the first time.
    Task task = null;

    /// <summary>
    /// Starts the background consumer task on the first call; later calls return
    /// immediately because the task already exists.
    /// </summary>
    /// <param name="isStopWithFlushOn">
    /// When true, a cancellation makes the consumer drain and write the items
    /// still in the collection; when false, the consumer stops right away.
    /// </param>
    public void Consumer(bool isStopWithFlushOn)
    {
        // A consumer task has already been created: nothing to do.
        if (task != null)
        {
            return;
        }

        task = Task.Run(() =>
        {
            // Run until the collection is both completed for adding and empty.
            while (!_dataItems.IsCompleted) {
                try {
                    LogLine item;
                    // Wait up to 5 ms for an item; the token aborts the wait on cancel.
                    if (_dataItems.TryTake(out item, 5, ct)) {
                        string line = new StringBuilder()
                            .Append(item.Timestamp.ToString("yyyy-MM-dd HH:mm:ss:fff"))
                            .Append("\t")
                            .Append(item.LineText())
                            .Append("\t")
                            .ToString();
                        _writer.WriteLine(line);
                        Console.WriteLine(" Take:{0}", item.Text);
                    } else {
                        Console.WriteLine(" Take Blocked");
                    }
                } catch (OperationCanceledException) {
                    if (!isStopWithFlushOn) {
                        Console.WriteLine("Canceled without flush.");
                        break;
                    }

                    Console.WriteLine("Canceled with flush.");
                    // Drain what is left without the token; this enumeration ends
                    // once the collection is completed for adding and empty.
                    foreach (var pending in _dataItems.GetConsumingEnumerable()) {
                        Console.WriteLine("Canceled Take:{0}", pending.Text);
                        string line = new StringBuilder()
                            .Append(pending.Timestamp.ToString("yyyy-MM-dd HH:mm:ss:fff"))
                            .Append("\t")
                            .Append("Number with flush " + pending.LineText())
                            .Append("\t")
                            .ToString();
                        _writer.WriteLine(line);
                        Thread.SpinWait(500000);
                    }
                }
            }
            Console.WriteLine("\r\nNo more items to take.");
        });
    }
4

0 回答 0