I am implementing a single producer/consumer pattern using BlockingCollection.
When I press 'c' on the keyboard, I want to cancel the operation using a CancellationToken.
The strange thing is that if I press 'c' as fast as I can right after starting the program, the program responds to the key press.
If I press 'c' later, say around the 45000th iteration, the program doesn't react.
I have a for loop that feeds the producer:
// Enqueue a fixed number of numbered messages (flush-on-cancel enabled),
// then tell the collection that no further items will be added.
const int messageCount = 50000;
for (int n = 0; n < messageCount; n++)
{
    logger.AddToQueue("Number with flush " + n, true);
}

logger.DataItems.CompleteAdding();
In the constructor of the logger I call this method:
/// <summary>Background task started by <see cref="KeyPress"/> that watches the keyboard.</summary>
private Task t;

/// <summary>
/// Starts a background task that reads key presses (without echoing them) and
/// cancels <c>cts</c> as soon as the character 'c' is typed.
/// </summary>
/// <remarks>
/// The task keeps reading keys until cancellation has been requested. Reading
/// only a single key would stop listening after the first key that is not 'c',
/// so a 'c' pressed later would never trigger the cancellation.
/// </remarks>
public void KeyPress()
{
    t = Task.Run(() =>
    {
        // Loop ends once Cancel() has been called (here or elsewhere);
        // any key other than 'c' is ignored and we wait for the next one.
        while (!cts.IsCancellationRequested)
        {
            if (Console.ReadKey(true).KeyChar == 'c')
            {
                cts.Cancel();
            }
        }
    });
}
I don't know whether the error is related to the other methods, but I will post them just in case.
The AddToQueue method (producer):
/// <summary>
/// Producer entry point: makes sure the consumer is running, then tries to
/// enqueue a new log line stamped with the current local time.
/// </summary>
/// <param name="text">Text of the log line to enqueue.</param>
/// <param name="isFlushOn">Forwarded to <c>Consumer</c> as its flush-on-cancel flag.</param>
/// <remarks>
/// The add is attempted with a zero timeout and its result is not inspected.
/// If the token <c>ct</c> has been cancelled, the collection is marked as
/// complete for adding instead of propagating the exception.
/// </remarks>
public void AddToQueue(String text, bool isFlushOn)
{
    Consumer(isFlushOn);

    try
    {
        var entry = new LogLine() { Text = text, Timestamp = DateTime.Now };
        _dataItems.TryAdd(entry, 0, ct);
    }
    catch (OperationCanceledException)
    {
        // Cancellation requested: stop accepting items so the consumer can finish.
        _dataItems.CompleteAdding();
    }
}
And the consumer:
/// <summary>The consumer task; stays null until the first <see cref="Consumer"/> call starts it.</summary>
Task task = null;

/// <summary>
/// Starts the background consumer task if it is not running yet. The task
/// takes log lines from <c>_dataItems</c>, writes them to <c>_writer</c> and
/// echoes them to the console until the collection is completed.
/// </summary>
/// <param name="isStopWithFlushOn">
/// When true, a cancellation of <c>ct</c> makes the consumer drain and write
/// every remaining item before it ends; when false the consumer stops at once.
/// Only the value passed on the call that actually starts the task is used.
/// </param>
public void Consumer(bool isStopWithFlushOn)
{
    // Already started: later calls (one per AddToQueue) are no-ops.
    if (task != null)
    {
        return;
    }

    task = Task.Run(() =>
    {
        while (!_dataItems.IsCompleted)
        {
            try
            {
                LogLine data = null;
                // Wait up to 5 ms for an item; throws once ct is cancelled.
                if (!_dataItems.TryTake(out data, 5, ct))
                {
                    Console.WriteLine(" Take Blocked");
                    continue;
                }

                WriteFormattedLine(data);
                Console.WriteLine(" Take:{0}", data.Text);
            }
            catch (OperationCanceledException)
            {
                if (!isStopWithFlushOn)
                {
                    Console.WriteLine("Canceled without flush.");
                    break;
                }

                Console.WriteLine("Canceled with flush.");
                // Drain without the (already cancelled) token. The enumeration
                // ends only after adding is completed and the collection is
                // empty, so the outer loop condition then terminates the task.
                foreach (var dataItem in _dataItems.GetConsumingEnumerable())
                {
                    Console.WriteLine("Canceled Take:{0}", dataItem.Text);
                    WriteFormattedLine(dataItem);
                    // NOTE(review): busy-wait between flushed items; presumably
                    // simulates slow output - confirm it is still wanted.
                    Thread.SpinWait(500000);
                }
            }
        }

        Console.WriteLine("\r\nNo more items to take.");
    });
}

/// <summary>
/// Writes one log line to <c>_writer</c> as: timestamp, tab, line text, tab.
/// Shared by the normal and the flush-after-cancel paths so both produce the
/// same output format.
/// </summary>
/// <param name="line">The log line to write.</param>
private void WriteFormattedLine(LogLine line)
{
    StringBuilder stringBuilder = new StringBuilder();
    stringBuilder.Append(line.Timestamp.ToString("yyyy-MM-dd HH:mm:ss:fff"));
    stringBuilder.Append("\t");
    stringBuilder.Append(line.LineText());
    stringBuilder.Append("\t");
    _writer.WriteLine(stringBuilder.ToString());
}