需要定期执行一些操作。主要要求是
1)不开始下一个更新周期,而前一个没有完成
2)如果上一次迭代中获得的数据仍然有效,则不开始更新,即自上次刷新以来的时间小于 TTL 值
3)有单独的(比如说 >10)进行此类更新所需的线程。
SO上有很多类似的问题,所以我在@Jim这里找到了PeriodicTaskFactory的这个实现。
它按预期工作,但是当同时产生多个这样的工厂时,我开始在刷新过程中遇到一些开销,这会使整个过程变形(取消了一些即将发生的迭代)。这是代码:
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace CA_TasksTests
{
class Program
{
// Result
public class Result
{
public string Message { get; set; }
public Result(int iter, string info)
{
Message = "#" + iter + info;
}
public override string ToString()
{
return Message;
}
}
// Operation parameters
public class Operation
{
public string OperationName { get; set; }
public TimeSpan TTL { get { return TimeSpan.FromMilliseconds(Interval); } }
public DateTime LastUpdate { get; set; }
public Operation(int id)
{
OperationName = "Operation" + ((id < 10) ? "0" : "") + id;
}
}
public static int Interval = 2000;
public static int Duration = 10000;
public static int OperationsCount = 10;
static void Main()
{
// Creating 10 test operations
var operations = Enumerable.Range(1, OperationsCount).Select(i => new Operation(i)).ToList();
// Executing them
var r = ExecuteActions(operations).OrderBy(i => i.Message).ToList();
Console.WriteLine("Results (expected=" + (Duration/Interval*OperationsCount) + ") : " + r.Count);
Console.ReadLine();
}
// Operation execution
public static Result ExecuteOperation(int iter, Operation operation)
{
// Assiging last update timestamp
operation.LastUpdate = DateTime.Now;
var t = Task.Factory.StartNew(() =>
{
// Some operation
Thread.Sleep(1000);
return new Result(iter, operation.OperationName);
});
var r = t.Result;
return r;
}
public static List<Result> ExecuteActions(List<Operation> operations)
{
var list = new List<Result>();
var tasks = new ConcurrentBag<Task>();
foreach (var currentOperation in operations)
{
var iter = 0;
var locker = new object();
Operation operation = currentOperation;
var perdiodicTask = PeriodicTaskFactory.Start(() =>
{
// (*) Looking if we need updates semantically -
// through comparing time since last refresh with operation TTL
Console.WriteLine(DateTime.Now + " : " + (DateTime.Now - operation.LastUpdate) + " ?> " + operation.TTL);
// Looking if we need updates logically -
// if previous operation is still running
if (!Monitor.TryEnter(locker))
{
Console.WriteLine(">>>" + DateTime.Now + " Cancelled");
return;
}
try
{
// Semantic update
if (DateTime.Now - operation.LastUpdate > operation.TTL)
{
iter++;
Console.WriteLine(DateTime.Now + " Refresh #" + iter + " " + operation.OperationName);
list.Add(ExecuteOperation(iter, operation));
}
}
finally
{
Monitor.Exit(locker);
}
}, intervalInMilliseconds: (int)operation.TTL.TotalMilliseconds, duration: Duration /*maxIterations:2*/);
var end = perdiodicTask.ContinueWith(_ =>
{
_.Dispose();
Console.WriteLine(">>>" + DateTime.Now + " " + operation.OperationName + " finished");
});
tasks.Add(end);
}
Task.WaitAll(tasks.ToArray());
return list;
}
}
/// <summary>
/// Factory class to create a periodic Task to simulate a <see cref="System.Threading.Timer"/> using <see cref="Task">Tasks.</see>
/// </summary>
public static class PeriodicTaskFactory
{
/// <summary>
/// Starts the periodic task.
/// </summary>
/// <param name="action">The action.</param>
/// <param name="intervalInMilliseconds">The interval in milliseconds.</param>
/// <param name="delayInMilliseconds">The delay in milliseconds, i.e. how long it waits to kick off the timer.</param>
/// <param name="duration">The duration.
/// <example>If the duration is set to 10 seconds, the maximum time this task is allowed to run is 10 seconds.</example></param>
/// <param name="maxIterations">The max iterations.</param>
/// <param name="synchronous">if set to <c>true</c> executes each period in a blocking fashion and each periodic execution of the task
/// is included in the total duration of the Task.</param>
/// <param name="cancelToken">The cancel token.</param>
/// <param name="periodicTaskCreationOptions"><see cref="TaskCreationOptions"/> used to create the task for executing the <see cref="Action"/>.</param>
/// <returns>A <see cref="Task"/></returns>
/// <remarks>
/// Exceptions that occur in the <paramref name="action"/> need to be handled in the action itself. These exceptions will not be
/// bubbled up to the periodic task.
/// </remarks>
public static Task Start(Action action,
int intervalInMilliseconds = Timeout.Infinite,
int delayInMilliseconds = 0,
int duration = Timeout.Infinite,
int maxIterations = -1,
bool synchronous = false,
CancellationToken cancelToken = new CancellationToken(),
TaskCreationOptions periodicTaskCreationOptions = TaskCreationOptions.None)
{
//Console.WriteLine(DateTime.Now + " PeriodicTaskFactory.Start");
Stopwatch stopWatch = new Stopwatch();
Action wrapperAction = () =>
{
CheckIfCancelled(cancelToken);
action();
};
Action mainAction = () =>
{
MainPeriodicTaskAction(intervalInMilliseconds, delayInMilliseconds, duration, maxIterations, cancelToken, stopWatch, synchronous, wrapperAction, periodicTaskCreationOptions);
};
return Task.Factory.StartNew(mainAction, cancelToken, TaskCreationOptions.LongRunning, TaskScheduler.Current);
}
/// <summary>
/// Mains the periodic task action.
/// </summary>
/// <param name="intervalInMilliseconds">The interval in milliseconds.</param>
/// <param name="delayInMilliseconds">The delay in milliseconds.</param>
/// <param name="duration">The duration.</param>
/// <param name="maxIterations">The max iterations.</param>
/// <param name="cancelToken">The cancel token.</param>
/// <param name="stopWatch">The stop watch.</param>
/// <param name="synchronous">if set to <c>true</c> executes each period in a blocking fashion and each periodic execution of the task
/// is included in the total duration of the Task.</param>
/// <param name="wrapperAction">The wrapper action.</param>
/// <param name="periodicTaskCreationOptions"><see cref="TaskCreationOptions"/> used to create a sub task for executing the <see cref="Action"/>.</param>
private static void MainPeriodicTaskAction(int intervalInMilliseconds,
int delayInMilliseconds,
int duration,
int maxIterations,
CancellationToken cancelToken,
Stopwatch stopWatch,
bool synchronous,
Action wrapperAction,
TaskCreationOptions periodicTaskCreationOptions)
{
var iters = duration / intervalInMilliseconds;
if (iters > 0)
{
maxIterations = iters;
}
TaskCreationOptions subTaskCreationOptions = TaskCreationOptions.AttachedToParent | periodicTaskCreationOptions;
CheckIfCancelled(cancelToken);
if (delayInMilliseconds > 0)
{
Thread.Sleep(delayInMilliseconds);
}
if (maxIterations == 0) { return; }
int iteration = 0;
////////////////////////////////////////////////////////////////////////////
// using a ManualResetEventSlim as it is more efficient in small intervals.
// In the case where longer intervals are used, it will automatically use
// a standard WaitHandle....
// see http://msdn.microsoft.com/en-us/library/vstudio/5hbefs30(v=vs.100).aspx
using (ManualResetEventSlim periodResetEvent = new ManualResetEventSlim(false))
{
////////////////////////////////////////////////////////////
// Main periodic logic. Basically loop through this block
// executing the action
while (true)
{
CheckIfCancelled(cancelToken);
Task subTask = Task.Factory.StartNew(wrapperAction, cancelToken, subTaskCreationOptions, TaskScheduler.Current);
if (synchronous)
{
stopWatch.Start();
try
{
subTask.Wait(cancelToken);
}
catch { /* do not let an errant subtask to kill the periodic task...*/ }
stopWatch.Stop();
}
// use the same Timeout setting as the System.Threading.Timer, infinite timeout will execute only one iteration.
if (intervalInMilliseconds == Timeout.Infinite) { break; }
iteration++;
if (maxIterations > 0 && iteration >= maxIterations) { break; }
try
{
stopWatch.Start();
periodResetEvent.Wait(intervalInMilliseconds, cancelToken);
stopWatch.Stop();
}
finally
{
periodResetEvent.Reset();
}
CheckIfCancelled(cancelToken);
if (duration > 0 && stopWatch.ElapsedMilliseconds >= duration) { break; }
}
}
}
/// <summary>
/// Checks if cancelled.
/// </summary>
/// <param name="cancelToken">The cancel token.</param>
private static void CheckIfCancelled(CancellationToken cancellationToken)
{
if (cancellationToken == null)
throw new ArgumentNullException("cancellationToken");
cancellationToken.ThrowIfCancellationRequested();
}
}
}
TTL 比较检查 (*) 的输出显示:
9/23/2013 2:19:17 PM : 00:00:01.9910000 >? 00:00:02
9/23/2013 2:19:17 PM : 00:00:01.9910000 >? 00:00:02
9/23/2013 2:19:17 PM : 00:00:01.9910000 >? 00:00:02
9/23/2013 2:19:17 PM : 00:00:01.9910000 >? 00:00:02
9/23/2013 2:19:17 PM : 00:00:01.0020000 >? 00:00:02
9/23/2013 2:19:17 PM : 00:00:00.9940000 >? 00:00:02
因此,由于这种开销,我取消了很少的更新。什么可能导致这种情况以及如何解决?我的第一个猜测是线程切换费用,这导致在比较中设置一些 Epsilon 并接受它。感谢帮助。