0

我正在使用 RavenDB 来保存数千个文档。数据来自每日 xml 提要,我将通过运行 C# 控制台应用程序对其进行更新。下面是处理提要以使数据库与任何更改保持同步的代码。我对此有很多问题,所以我想知道我是否选择了错误的策略。以下是一些需要注意的重要事项。

  1. 新项目可能已添加到提要中,现有项目可能已更改,因此每次运行时,我都想添加或更新文档,具体取决于它是否是新的。
  2. xml 提要不包含对我的 RavenDB ID 的任何引用,仅包含每个项目的内部密钥。因此,在检索要更新的现有文档时,我只能通过检查文档上的“SourceID”属性来做到这一点。
  3. 我使用“take”一次只能处理 500 个文档,部分原因是我的数据库限制为 1000 个文档,部分原因是没有 Take() 我似乎只能检索 128 个文档。
  4. 就目前而言,此代码因“在一个会话中不能进行超过 30 次更新”错误而失败,我认为是因为每次我尝试从 dbItems 检索现有记录时,它实际上都会再次访问数据库。
  5. 我可以通过在项目上调用 ToList() 来解决上面 (4) 的问题,但是如果我这样做,则在我调用 session.SaveChanges() 时现有项目不会得到更新(我想象这就像一个断开连接的记录集) .

谁能给我一些指示?

        public void ProcessFeed(string rawXml)
        {
            XDocument doc = XDocument.Parse(rawXml);
            var items = ExtractItemsFromFeed(doc).OrderBy(x => x.SourceId).Take(500);
            using (IDocumentSession session = _store.OpenSession())
            {
                var dbItems = session.Query<AccItem>().OrderBy(x => x.SourceId).Take(500);
                foreach (var item in items)
                {
                    var existingRecord = dbItems.SingleOrDefault(x => x.SourceId == item.SourceId);
                    if (existingRecord == null)
                    {
                        session.Store(item);
                        _logger.Info("Saved new item {0}.", item.ShortName);
                    }
                    else
                    {
                        // update just one field for now
                        existingRecord.Village = item.Village;
                        _logger.Info("Updated item {0}.", item.ShortName);
                    }
                }
                session.SaveChanges();
            }            
        }
4

1 回答 1

0

下面是我最终得到的代码。我认为原始版本的最初问题只是我试图对每个项目使用相同的会话,打破了 30 个限制。

在 TekPub 截屏视频中,屏幕上的一些代码提示我通过将整个过程批处理成 15 个组(以允许一次读取和一次写入,因此每批总共 30 个请求)来解决此问题。这很慢,但并不像我预期的那样慢。我预计一次可能有 10,000 条记录,所以我会一直等到它完成。

public void ProcessFeed(string rawXml)
{
    XDocument doc = XDocument.Parse(rawXml);
    var items = ExtractItemsFromFeed(doc).OrderBy(x => x.SourceId);
    int numberOfItems = items.Count;
    int batchSize = 15;
    int numberOfBatchesRequired = numberOfItems / batchSize;
    int numberOfBatchesProcessed = 0;
    int numberOfItemsInLastBatch = numberOfItems - (numberOfBatchesRequired * batchSize); 
    for (var i = 0;i <= numberOfBatchesRequired;i++)
    {
        using (IDocumentSession session = _store.OpenSession())
        {
            var numberOfItemsProcessedSoFar = numberOfBatchesProcessed * batchSize;
            var numberOfItemsRemaining = numberOfItems - numberOfItemsProcessedSoFar;
            int itemsToTake = 15;
            if (numberOfItemsRemaining > 0 && numberOfItemsRemaining < 15)
            itemsToTake = numberOfItemsRemaining;
            foreach (var item in items.Skip(numberOfItemsProcessedSoFar).Take(itemsToTake))
            {
            var existingRecords = session.Query<AccItem>().Where(x => x.SourceId == item.SourceId).ToList();
            if (!existingRecords.Any())
            {
                session.Store(item);
                _logger.Info("Saved new item {0}.", item.ShortName);
            }
            else
            {
                if (existingRecords.Count() > 1)
                _logger.Warn("There's more than one item in the database with the sourceid {0}", item.SourceId);
                existingRecords.First().Village = item.Village;
                _logger.Info("Updated item {0}.", item.ShortName);
            }
            session.SaveChanges();
            }
        }            
        numberOfBatchesProcessed++;
    }
}
于 2012-06-28T09:31:40.777 回答