1

很抱歉提出抽象问题,但我正在寻找一些关于应用程序类型的示例/建议/文章,这些应用程序在循环中执行一些等效操作,并且循环的每次迭代都应在特定时间段(例如 10 秒)内公开其结果.

我的应用程序在外部 WCF 服务和本地数据库之间同步数据。在每次迭代中,应用程序都会检索向 WCF 服务传递请求的数据更改并将更改放入数据库,反之亦然。此应用程序最严格的要求之一是迭代应该每十秒触发一次。

所以这里出现了问题。我如何保证迭代不会超过 10 秒?

我猜这种类型的应用程序称为实时应用程序(以实时操作系统的方式)。

我们使用的 DAL 组件随机作用于连接超时行为。因此数据库操作可能需要超过 10 秒的时间。

这是一次迭代的估计代码:

        Stopwatch s1 = new Stopwatch();
        s1.Start();
        Parallel.ForEach(Global.config.databases, new ParallelOptions { MaxDegreeOfParallelism = -1 }, (l) =>            
        {
            Console.WriteLine("Started for {0}", l.key.name);                
            DB db = new DB(l.connectionString);

            DateTime lastIterationTS = GetPreviousIterationTS(l.id);

            ExternalService serv = new ExternalService(l.id);
            List<ChangedData> ChangedDataDb = db.GetChangedData(DateTime.Now.AddSeconds((lastIterationTS == DateTime.MinValue) ? -300 : -1 * (DateTime.Now - lastIterationTS).Seconds));

            List<Data> ChangedDataService = serv.GetModifiedData();                

                    Action syncDBChanges = new Action(() =>
                        {
                            // Изменения в БД                                   
                            foreach (ChangedData d in ChangedDataDb)
                            {
                                try
                                {
                                    // ...
                                    // analyzing & syncing
                                }
                                catch (Exception e)
                                {
                                    logger.InfoEx("Exception_SyncDatabase", e.ToString());
                                }
                            }
                        }
                    );

                    Action syncService = new Action(() =>
                    {                            
                        foreach (Data d in ChangedDataService)
                        {
                            try
                            {
                                // ...
                                // analyzing & syncing
                            }
                            catch (Exception e)
                            {
                                logger.InfoEx("Exception_SyncService", e.ToString());
                            }
                        }
                    });

                    List<WaitHandle> handles = new List<WaitHandle>();
                    IAsyncResult ar1 = syncDBChanges.BeginInvoke(syncDBChanges.EndInvoke, null);
                    IAsyncResult ar2 = syncService.BeginInvoke(syncService.EndInvoke, null);

                    handles.Add(ar1.AsyncWaitHandle);
                    handles.Add(ar2.AsyncWaitHandle);

                    WaitHandle.WaitAll(handles.ToArray(), (int)((Global.config.syncModifiedInterval - 1) * 1000));
                    SetCurrentIterationTS(l.id);
                }
                catch (Exception e)
                {
                    Console.WriteLine(e.Message);
                    logger.InfoEx("Exception_Iteration", e.ToString());
                    continue;
                }
            }
            logger.InfoEx("end_Iteration", IterationContextParams);
        }
        );
        s1.Stop();
        Console.WriteLine("Main iteration done for {0}...", s1.Elapsed);        
4

5 回答 5

2

你可以考虑几个选项...

  1. 如果超过 10 秒则终止迭代,并希望下一次迭代能够完成进程。这种方法的问题是很可能没有一个迭代将完成,因此同步过程将永远不会发生。我会推荐以下选项...

  2. 如果迭代时间超过 10 秒,请等待它完成并跳过下一次迭代。这样,您可以确保该过程至少完成一次。以下是一个简化的代码示例,供参考...

    class Updater
    {
        Timer timer = new Timer();
        public object StateLock = new object();
        public string State;
    
        public Updater()
        {
            timer.Elapsed += timer_Elapsed;
            timer.Interval = 10000;
            timer.AutoReset = true;
            timer.Start();
        }
    
        void timer_Elapsed(object sender, ElapsedEventArgs e)
        {
            if (State != "Running")
            {
                Process();
            }
        }
    
        private void Process()
        {
            try
            {
                lock (StateLock)
                {
                    State = "Running";
                }
    
                // Process
    
                lock (StateLock)
                {
                    State = "";
                }
            }
            catch
            {
                throw;
            }
        }
    }
    

...

class Program
{
    static void Main(string[] args)
    {
        Updater updater = new Updater();
        Console.ReadLine();
    }
}
于 2013-09-19T18:21:09.970 回答
1

Quartz.net是一个出色的 .NET 平台调度程序,我认为它可以满足您的需求。

  • 如果你想杀死一个工作,你可以实现IInterruptableJob。您应该能够在 Interupt 方法中添加一些清理代码来处理任何数据库连接。
  • 如果您想完成一项工作,但只有在最后一项完成后才开始另一项工作(我认为这是更好的选择),您可以实现IStatefulJob接口
于 2013-09-22T22:14:04.243 回答
0

也许试试这个。请确保不要在 DoWork() 方法中创建和使用任何新线程。

class DatabaseUpdater
{
    private readonly Timer _timer;
    private List<Thread> _threads;
    private readonly List<DatabaseConfig> _dbConfigs;

    public DatabaseUpdater(int seconds, List<DatabaseConfig> dbConfigs)
    {
        _timer = new Timer(seconds * 1000);
        _timer.Elapsed += TimerElapsed;
        _dbConfigs = dbConfigs;
    }

    public void Start()
    {
        StartThreads();
        _timer.Start();
    }

    public void Stop()
    {
        _timer.Stop();
        StopThreads();
    }

    void TimerElapsed(object sender, ElapsedEventArgs e)
    {
        StopThreads();
        StartThreads();
    }

    private void StartThreads()
    {
        var newThreads = new List<Thread>();
        foreach (var config in _dbConfigs)
        {
            var thread = new Thread(DoWork);
            thread.Start(config);
            newThreads.Add(thread);
        }

        _threads = newThreads;
    }

    private void StopThreads()
    {
        if (_threads == null) return;

        var oldThreads = _threads;
        foreach (var thread in oldThreads)
        {
            thread.Abort();
        }
    }

    static void DoWork(object objConfig)
    {
        var dbConfig = objConfig as DatabaseConfig;
        if (null == dbConfig) return;

        var n = GetRandomNumber();

        try
        {
            ConsoleWriteLine("Sync started for : {0} - {1} sec work.", dbConfig.Id, n);

            // update/sync db
            Thread.Sleep(1000 * n);

            ConsoleWriteLine("Sync finished for : {0} - {1} sec work.", dbConfig.Id, n);
        }
        catch (Exception ex)
        {
            // cancel/rollback db transaction
            ConsoleWriteLine("Sync cancelled for : {0} - {1} sec work.",
                dbConfig.Id, n);
        }
    }

    static readonly Random Random = new Random();

    [MethodImpl(MethodImplOptions.Synchronized)]
    static int GetRandomNumber()
    {
        return Random.Next(5, 20);
    }

    [MethodImpl(MethodImplOptions.Synchronized)]
    static void ConsoleWriteLine(string format, params object[] arg)
    {
        Console.WriteLine(format, arg);
    }
}

static void Main(string[] args)
{
    var configs = new List<DatabaseConfig>();
    for (var i = 1; i <= 3; i++)
    {
        configs.Add(new DatabaseConfig() { Id = i });
    }

    var databaseUpdater = new DatabaseUpdater(10, configs);
    databaseUpdater.Start();

    Console.ReadKey();

    databaseUpdater.Stop();
}
于 2013-09-23T01:59:26.950 回答
0

您可能想了解 .Net 中可用的各种 Timer 对象:http: //msdn.microsoft.com/en-us/magazine/cc164015.aspx

我个人喜欢System.Threading.Timer,因为您可以轻松使用 lambda,并且如果您创建单独的回调,它允许传递状态对象。

我还建议使用该System.Threading.Tasks库,因为它允许您在工作完成之前计时器到期的情况下优雅地处理取消。Msdn 示例:http: //msdn.microsoft.com/en-us/library/dd537607.aspx

以下是在 10 分钟计时器中一起使用这些组件的示例: 注意:要使用您的 sql 数据库执行此操作,您需要设置Asynchronous Processing=true;MultipleActiveResultSets=True;

CancellationTokenSource cancelSource = new CancellationTokenSource();
System.Threading.Timer timer = new System.Threading.Timer(callback =>
{
    //start sync
    Task syncTask = Task.Factory.StartNew(syncAction =>
        {
            using (SqlConnection conn = 
                new SqlConnection(
                   ConfigurationManager.ConnectionStrings["db"].ConnectionString))
            {
                conn.Open();
                using (SqlCommand syncCommand = new SqlCommand
                {
                    CommandText = "SELECT getdate() \n WAITFOR DELAY '00:11'; ",
                    CommandTimeout = 600,
                    Transaction = conn.BeginTransaction(),
                    Connection = conn
                })
                {
                    try
                    {
                        IAsyncResult t = syncCommand.BeginExecuteNonQuery();
                        SpinWait.SpinUntil(() => 
                            (t.IsCompleted || cancelSource.Token.IsCancellationRequested));
                        if (cancelSource.Token.IsCancellationRequested && !t.IsCompleted)
                            syncCommand.Transaction.Rollback();

                    }
                    catch (TimeoutException timeoutException)
                    {
                        syncCommand.Transaction.Rollback();
                        //log a failed sync attepmt here
                        Console.WriteLine(timeoutException.ToString());
                    }
                    finally
                    {
                        syncCommand.Connection.Close();
                    }
                }
            }
        }, null, cancelSource.Token);
    //set up a timer for processing in the interim, save some time for rollback
    System.Threading.Timer spinTimer = new System.Threading.Timer(c => {
        cancelSource.Cancel();
    }, null, TimeSpan.FromMinutes(9), TimeSpan.FromSeconds(5)); 

    //spin here until the spintimer elapses;
    //this is optional, but would be useful for debugging.
    SpinWait.SpinUntil(()=>(syncTask.IsCompleted || cancelSource.Token.IsCancellationRequested));
    if (syncTask.IsCompleted || cancelSource.Token.IsCancellationRequested)
        spinTimer.Dispose();

}, null, TimeSpan.FromMinutes(1), TimeSpan.FromMinutes(10));
于 2013-09-22T22:23:01.207 回答
0

我通常将更新周期与实际计时器分开

计时器做了两件事:

1) 如果更新未运行,则启动它。

2)如果服务已经在运行,设置一个标志让它继续运行。

更新周期:

1)设置运行标志

2)做更新

3) 将运行标志设置为 false

4) 如果设置了继续运行,则转到 1)。

于 2013-09-18T11:35:28.170 回答