44

在使用 Azure 表存储时,我遇到了巨大的性能瓶颈。我的愿望是将表用作一种缓存,因此一个漫长的过程可能会导致数百到数千行数据。然后可以通过分区键和行键快速查询数据。

查询工作得非常快(仅使用分区和行键时非常快,有点慢,但在搜索特定匹配的属性时仍然可以接受)。

但是,插入和删除行都非常缓慢。

澄清

我想澄清一下,即使插入一批 100 个项目也需要几秒钟。这不仅仅是数千行总吞吐量的问题。当我只插入 100 时,它会影响我。

这是我对表进行批量插入的代码示例:

static async Task BatchInsert( CloudTable table, List<ITableEntity> entities )
    {
        int rowOffset = 0;

        while ( rowOffset < entities.Count )
        {
            Stopwatch sw = Stopwatch.StartNew();

            var batch = new TableBatchOperation();

            // next batch
            var rows = entities.Skip( rowOffset ).Take( 100 ).ToList();

            foreach ( var row in rows )
                batch.Insert( row );

            // submit
            await table.ExecuteBatchAsync( batch );

            rowOffset += rows.Count;

            Trace.TraceInformation( "Elapsed time to batch insert " + rows.Count + " rows: " + sw.Elapsed.ToString( "g" ) );
        }
    }

我正在使用批处理操作,这是调试输出的一个示例:

Microsoft.WindowsAzure.Storage Information: 3 : b08a07da-fceb-4bec-af34-3beaa340239b: Starting asynchronous request to http://127.0.0.1:10002/devstoreaccount1.
Microsoft.WindowsAzure.Storage Verbose: 4 : b08a07da-fceb-4bec-af34-3beaa340239b: StringToSign = POST..multipart/mixed; boundary=batch_6d86d34c-5e0e-4c0c-8135-f9788ae41748.Tue, 30 Jul 2013 18:48:38 GMT./devstoreaccount1/devstoreaccount1/$batch.
Microsoft.WindowsAzure.Storage Information: 3 : b08a07da-fceb-4bec-af34-3beaa340239b: Preparing to write request data.
Microsoft.WindowsAzure.Storage Information: 3 : b08a07da-fceb-4bec-af34-3beaa340239b: Writing request data.
Microsoft.WindowsAzure.Storage Information: 3 : b08a07da-fceb-4bec-af34-3beaa340239b: Waiting for response.
Microsoft.WindowsAzure.Storage Information: 3 : b08a07da-fceb-4bec-af34-3beaa340239b: Response received. Status code = 202, Request ID = , Content-MD5 = , ETag = .
Microsoft.WindowsAzure.Storage Information: 3 : b08a07da-fceb-4bec-af34-3beaa340239b: Response headers were processed successfully, proceeding with the rest of the operation.
Microsoft.WindowsAzure.Storage Information: 3 : b08a07da-fceb-4bec-af34-3beaa340239b: Processing response body.
Microsoft.WindowsAzure.Storage Information: 3 : b08a07da-fceb-4bec-af34-3beaa340239b: Operation completed successfully.
iisexpress.exe Information: 0 : Elapsed time to batch insert 100 rows: 0:00:00.9351871

如您所见,此示例插入 100 行需要将近 1 秒的时间。在我的开发机器(3.4 Ghz 四核)上,平均时间似乎约为 0.8 秒。

这似乎很荒谬。

以下是批量删除操作的示例:

Microsoft.WindowsAzure.Storage Information: 3 : 4c271cb5-7463-44b1-b2e5-848b8fb10a93: Starting asynchronous request to http://127.0.0.1:10002/devstoreaccount1.
Microsoft.WindowsAzure.Storage Verbose: 4 : 4c271cb5-7463-44b1-b2e5-848b8fb10a93: StringToSign = POST..multipart/mixed; boundary=batch_7e3d229f-f8ac-4aa0-8ce9-ed00cb0ba321.Tue, 30 Jul 2013 18:47:41 GMT./devstoreaccount1/devstoreaccount1/$batch.
Microsoft.WindowsAzure.Storage Information: 3 : 4c271cb5-7463-44b1-b2e5-848b8fb10a93: Preparing to write request data.
Microsoft.WindowsAzure.Storage Information: 3 : 4c271cb5-7463-44b1-b2e5-848b8fb10a93: Writing request data.
Microsoft.WindowsAzure.Storage Information: 3 : 4c271cb5-7463-44b1-b2e5-848b8fb10a93: Waiting for response.
Microsoft.WindowsAzure.Storage Information: 3 : 4c271cb5-7463-44b1-b2e5-848b8fb10a93: Response received. Status code = 202, Request ID = , Content-MD5 = , ETag = .
Microsoft.WindowsAzure.Storage Information: 3 : 4c271cb5-7463-44b1-b2e5-848b8fb10a93: Response headers were processed successfully, proceeding with the rest of the operation.
Microsoft.WindowsAzure.Storage Information: 3 : 4c271cb5-7463-44b1-b2e5-848b8fb10a93: Processing response body.
Microsoft.WindowsAzure.Storage Information: 3 : 4c271cb5-7463-44b1-b2e5-848b8fb10a93: Operation completed successfully.
iisexpress.exe Information: 0 : Elapsed time to batch delete 100 rows: 0:00:00.6524402

持续超过 0.5 秒。

我也将此部署到 Azure(小型实例),并记录了 20 分钟插入 28000 行的时间。

我目前正在使用 2.1 RC 版本的存储客户端库:MSDN 博客

我一定做错了什么。有什么想法吗?

更新

我已经尝试了并行性与整体速度提高的净效果(以及 8 个最大化逻辑处理器),但在我的开发机器上每秒仍然只有 150 行插入。

我可以说没有比这更好的了,当部署到 Azure(小实例)时可能更糟。

我增加了线程池,并按照这个建议增加了我的 WebRole 的最大 HTTP 连接数。

我仍然觉得我错过了一些基本的东西,将我的插入/删除限制在 150 ROPS。

更新 2

在分析了部署到 Azure 的小实例中的一些诊断日志(使用 2.1 RC 存储客户端中内置的新日志)之后,我有了更多信息。

批量插入的第一个存储客户端日志在635109046781264034滴答声中:

caf06fca-1857-4875-9923-98979d850df3: Starting synchronous request to https://?.table.core.windows.net/.; TraceSource 'Microsoft.WindowsAzure.Storage' event

635109046810104314然后差不多 3 秒后,我在滴答声中看到了这个日志:

caf06fca-1857-4875-9923-98979d850df3: Preparing to write request data.; TraceSource 'Microsoft.WindowsAzure.Storage' event

然后还有几条日志,它们总共占用 0.15 秒,最后以这个在结束635109046811645418插入的滴答声结束:

caf06fca-1857-4875-9923-98979d850df3: Operation completed successfully.; TraceSource 'Microsoft.WindowsAzure.Storage' event

我不确定该怎么做,但在我检查的批量插入日志中它是非常一致的。

更新 3

这是用于并行批量插入的代码。在此代码中,仅用于测试,我确保将每批 100 个插入到唯一的分区中。

static async Task BatchInsert( CloudTable table, List<ITableEntity> entities )
    {
        int rowOffset = 0;

        var tasks = new List<Task>();

        while ( rowOffset < entities.Count )
        {
            // next batch
            var rows = entities.Skip( rowOffset ).Take( 100 ).ToList();

            rowOffset += rows.Count;

            string partition = "$" + rowOffset.ToString();

            var task = Task.Factory.StartNew( () =>
                {
                    Stopwatch sw = Stopwatch.StartNew();

                    var batch = new TableBatchOperation();

                    foreach ( var row in rows )
                    {
                        row.PartitionKey = row.PartitionKey + partition;
                        batch.InsertOrReplace( row );
                    }

                    // submit
                    table.ExecuteBatch( batch );

                    Trace.TraceInformation( "Elapsed time to batch insert " + rows.Count + " rows: " + sw.Elapsed.ToString( "F2" ) );
                } );

            tasks.Add( task );
        }

        await Task.WhenAll( tasks );
    }

如上所述,这确实有助于提高插入数千行的总时间,但每批 100 行仍然需要几秒钟。

更新 4

所以我创建了一个全新的 Azure 云服务项目,使用 VS2012.2,将 Web 角色作为单页模板(其中包含 TODO 示例的新模板)。

这是开箱即用的,没有新的 NuGet 包或任何东西。它默认使用 Storage 客户端库 v2,以及 EDM 和关联库 v5.2。

我只是将 HomeController 代码修改为以下(使用一些随机数据来模拟我想要存储在真实应用程序中的列):

public ActionResult Index( string returnUrl )
    {
        ViewBag.ReturnUrl = returnUrl;

        Task.Factory.StartNew( () =>
            {
                TableTest();
            } );

        return View();
    }

    static Random random = new Random();
    static double RandomDouble( double maxValue )
    {
        // the Random class is not thread safe!
        lock ( random ) return random.NextDouble() * maxValue;
    }

    void TableTest()
    {
        // Retrieve storage account from connection-string
        CloudStorageAccount storageAccount = CloudStorageAccount.Parse(
            CloudConfigurationManager.GetSetting( "CloudStorageConnectionString" ) );

        // create the table client
        CloudTableClient tableClient = storageAccount.CreateCloudTableClient();

        // retrieve the table
        CloudTable table = tableClient.GetTableReference( "test" );

        // create it if it doesn't already exist
        if ( table.CreateIfNotExists() )
        {
            // the container is new and was just created
            Trace.TraceInformation( "Created table named " + "test" );
        }


        Stopwatch sw = Stopwatch.StartNew();

        // create a bunch of objects
        int count = 28000;
        List<DynamicTableEntity> entities = new List<DynamicTableEntity>( count );

        for ( int i = 0; i < count; i++ )
        {
            var row = new DynamicTableEntity()
            {
                PartitionKey = "filename.txt",
                RowKey = string.Format( "$item{0:D10}", i ),
            };

            row.Properties.Add( "Name", EntityProperty.GeneratePropertyForString( i.ToString() ) );
            row.Properties.Add( "Data", EntityProperty.GeneratePropertyForString( string.Format( "data{0}", i ) ) );
            row.Properties.Add( "Value1", EntityProperty.GeneratePropertyForDouble( RandomDouble( 10000 ) ) );
            row.Properties.Add( "Value2", EntityProperty.GeneratePropertyForDouble( RandomDouble( 10000 ) ) );
            row.Properties.Add( "Value3", EntityProperty.GeneratePropertyForDouble( RandomDouble( 1000 ) ) );
            row.Properties.Add( "Value4", EntityProperty.GeneratePropertyForDouble( RandomDouble( 90 ) ) );
            row.Properties.Add( "Value5", EntityProperty.GeneratePropertyForDouble( RandomDouble( 180 ) ) );
            row.Properties.Add( "Value6", EntityProperty.GeneratePropertyForDouble( RandomDouble( 1000 ) ) );

            entities.Add( row );
        }

        Trace.TraceInformation( "Elapsed time to create record rows: " + sw.Elapsed.ToString() );

        sw = Stopwatch.StartNew();

        Trace.TraceInformation( "Inserting rows" );

        // batch our inserts (100 max)
        BatchInsert( table, entities ).Wait();

        Trace.TraceInformation( "Successfully inserted " + entities.Count + " rows into table " + table.Name );
        Trace.TraceInformation( "Elapsed time: " + sw.Elapsed.ToString() );

        Trace.TraceInformation( "Done" );
    }


            static async Task BatchInsert( CloudTable table, List<DynamicTableEntity> entities )
    {
        int rowOffset = 0;

        var tasks = new List<Task>();

        while ( rowOffset < entities.Count )
        {
            // next batch
            var rows = entities.Skip( rowOffset ).Take( 100 ).ToList();

            rowOffset += rows.Count;

            string partition = "$" + rowOffset.ToString();

            var task = Task.Factory.StartNew( () =>
            {
                var batch = new TableBatchOperation();

                foreach ( var row in rows )
                {
                    row.PartitionKey = row.PartitionKey + partition;
                    batch.InsertOrReplace( row );
                }

                // submit
                table.ExecuteBatch( batch );

                Trace.TraceInformation( "Inserted batch for partition " + partition );
            } );

            tasks.Add( task );
        }

        await Task.WhenAll( tasks );
    }

这是我得到的输出:

iisexpress.exe Information: 0 : Elapsed time to create record rows: 00:00:00.0719448
iisexpress.exe Information: 0 : Inserting rows
iisexpress.exe Information: 0 : Inserted batch for partition $100
...
iisexpress.exe Information: 0 : Successfully inserted 28000 rows into table test
iisexpress.exe Information: 0 : Elapsed time: 00:01:07.1398928

这比我的其他应用程序快一点,超过 460 ROPS。这仍然是不可接受的。在这个测试中,我的 CPU(8 个逻辑处理器)几乎用尽了,磁盘访问几乎是空闲的。

我不知道出了什么问题。

更新 5

一轮又一轮的摆弄和调整已经产生了一些改进,但我无法比 500-700(ish)ROPS 执行批量 InsertOrReplace 操作(批量 100 个)快得多。

此测试是在 Azure 云中使用一个(或两个)小实例完成的。根据下面的评论,我接受了本地测试充其量是缓慢的事实。

这里有几个例子。每个示例都是它自己的 PartitionKey:

Successfully inserted 904 rows into table org1; TraceSource 'w3wp.exe' event
Elapsed time: 00:00:01.3401031; TraceSource 'w3wp.exe' event

Successfully inserted 4130 rows into table org1; TraceSource 'w3wp.exe' event
Elapsed time: 00:00:07.3522871; TraceSource 'w3wp.exe' event

Successfully inserted 28020 rows into table org1; TraceSource 'w3wp.exe' event
Elapsed time: 00:00:51.9319217; TraceSource 'w3wp.exe' event

也许是我的 MSDN Azure 帐户有一些性能上限?我不知道。

在这一点上,我想我已经完成了。也许它的速度足以满足我的目的,或者我会走一条不同的路。

结论

下面的所有答案都很好!

对于我的具体问题,我已经能够在小型 Azure 实例上看到高达 2k ROPS 的速度,通常在 1k 左右。由于我需要降低成本(从而降低实例大小),这定义了我将能够使用表的用途。

感谢大家的帮助。

4

4 回答 4

14

基本概念 - 使用并行来加速。

第 1 步 - 给你的线程池足够的线程来解决这个问题 - ThreadPool.SetMinThreads(1024, 256);

第 2 步 - 使用分区。我使用 guid 作为 Id,我使用最后一个 to 字符分成 256 个唯一的分区(实际上我将它们分组为 N 个子集,在我的情况下为 48 个分区)

第 3 步 - 使用任务插入,我对表引用使用对象池

public List<T> InsertOrUpdate(List<T> items)
        {
            var subLists = SplitIntoPartitionedSublists(items);

            var tasks = new List<Task>();

            foreach (var subList in subLists)
            {
                List<T> list = subList;
                var task = Task.Factory.StartNew(() =>
                    {
                        var batchOp = new TableBatchOperation();
                        var tableRef = GetTableRef();

                        foreach (var item in list)
                        {
                            batchOp.Add(TableOperation.InsertOrReplace(item));
                        }

                        tableRef.ExecuteBatch(batchOp);
                        ReleaseTableRef(tableRef);
                    });
                tasks.Add(task);
            }

            Task.WaitAll(tasks.ToArray());

            return items;
        }

private IEnumerable<List<T>> SplitIntoPartitionedSublists(IEnumerable<T> items)
        {
            var itemsByPartion = new Dictionary<string, List<T>>();

            //split items into partitions
            foreach (var item in items)
            {
                var partition = GetPartition(item);
                if (itemsByPartion.ContainsKey(partition) == false)
                {
                    itemsByPartion[partition] = new List<T>();
                }
                item.PartitionKey = partition;
                item.ETag = "*";
                itemsByPartion[partition].Add(item);
            }

            //split into subsets
            var subLists = new List<List<T>>();
            foreach (var partition in itemsByPartion.Keys)
            {
                var partitionItems = itemsByPartion[partition];
                for (int i = 0; i < partitionItems.Count; i += MaxBatch)
                {
                    subLists.Add(partitionItems.Skip(i).Take(MaxBatch).ToList());
                }
            }

            return subLists;
        }

        private void BuildPartitionIndentifiers(int partitonCount)
        {
            var chars = new char[] { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f' }.ToList();
            var keys = new List<string>();

            for (int i = 0; i < chars.Count; i++)
            {
                var keyA = chars[i];
                for (int j = 0; j < chars.Count; j++)
                {
                    var keyB = chars[j];
                    keys.Add(string.Concat(keyA, keyB));
                }
            }


            var keySetMaxSize = Math.Max(1, (int)Math.Floor((double)keys.Count / ((double)partitonCount)));
            var keySets = new List<List<string>>();

            if (partitonCount > keys.Count)
            {
                partitonCount = keys.Count;
            }

            //Build the key sets
            var index = 0;
            while (index < keys.Count)
            {
                var keysSet = keys.Skip(index).Take(keySetMaxSize).ToList();
                keySets.Add(keysSet);
                index += keySetMaxSize;
            }

            //build the lookups and datatable for each key set
            _partitions = new List<string>();
            for (int i = 0; i < keySets.Count; i++)
            {
                var partitionName = String.Concat("subSet_", i);
                foreach (var key in keySets[i])
                {
                    _partitionByKey[key] = partitionName;
                }
                _partitions.Add(partitionName);
            }

        }

        private string GetPartition(T item)
        {
            var partKey = item.Id.ToString().Substring(34,2);
            return _partitionByKey[partKey];
        }

        private string GetPartition(Guid id)
        {
            var partKey = id.ToString().Substring(34, 2);
            return _partitionByKey[partKey];
        }

        private CloudTable GetTableRef()
        {
            CloudTable tableRef = null;
            //try to pop a table ref out of the stack
            var foundTableRefInStack = _tableRefs.TryPop(out tableRef);
            if (foundTableRefInStack == false)
            {
                //no table ref available must create a new one
                var client = _account.CreateCloudTableClient();
                client.RetryPolicy = new ExponentialRetry(TimeSpan.FromSeconds(1), 4);
                tableRef = client.GetTableReference(_sTableName);
            }

            //ensure table is created
            if (_bTableCreated != true)
            {
                tableRef.CreateIfNotExists();
                _bTableCreated = true;
            }

            return tableRef;
        }

结果 - 19-22kops 存储帐户最大值

如果您对完整来源感兴趣,请联系我

需要呻吟吗?使用多个存储帐户!

这是几个月的反复试验,测试,我的头撞在桌子上的结果。我真的希望它有所帮助。

于 2013-07-31T01:37:04.880 回答
12

好的,第三个答案是魅力吗?

http://blogs.msdn.com/b/windowsazurestorage/archive/2010/11/06/how-to-get-most-out-of-windows-azure-tables.aspx

一些东西 - 存储模拟器 - 来自一个认真研究过它的朋友。

“一切都是针对单个数据库中的单个表(更多分区不会影响任何事情)。每个表插入操作至少是 3 个 sql 操作。每个批次都在一个事务中。根据事务隔离级别,这些批次将受到限制并行执行的能力。

由于 sql server 行为,串行批处理应该比单个插入更快。(单个插入本质上是每次刷新到磁盘的小事务,而真正的事务作为一个组刷新到磁盘)。”

使用多个分区的 IE 不会影响模拟器的性能,而它会影响真正的天蓝色存储。

还启用日志记录并稍微检查您的日志 - c:\users\username\appdata\local\developmentstorage

批量大小 100 似乎提供了最好的实际性能,关闭 naggle,关闭期望 100,加强连接限制。

还要确保您没有意外插入重复项,这将导致错误并减慢一切。

并针对真实存储进行测试。那里有一个相当不错的库可以为您处理大部分内容 - http://www.nuget.org/packages/WindowsAzure.StorageExtensions/,只需确保您实际上在添加时调用 ToList ,例如它不会真的执行直到枚举。该库还使用动态表实体,因此序列化的性能很小,但它确实允许您使用没有 TableEntity 东西的纯 POCO 对象。

~ JT

于 2013-08-06T17:58:59.743 回答
7

在经历了许多痛苦、实验之后,终于能够在 Azure 表存储中获得单表分区的最佳吞吐量(每秒 2,000+ 批写入操作)和存储帐户更好的吞吐量(每秒 3,500+ 批写入操作)。我尝试了所有不同的方法,但以编程方式设置 .net 连接限制(我尝试了配置示例,但对我不起作用)解决了问题(基于Microsoft 提供的白皮书),如下所示:

ServicePoint tableServicePoint = ServicePointManager
    .FindServicePoint(_StorageAccount.TableEndpoint);

//This is a notorious issue that has affected many developers. By default, the value 
//for the number of .NET HTTP connections is 2.
//This implies that only 2 concurrent connections can be maintained. This manifests itself
//as "underlying connection was closed..." when the number of concurrent requests is
//greater than 2.

tableServicePoint.ConnectionLimit = 1000;

每个存储帐户获得 20K+ 批量写入操作的其他任何人,请分享您的经验。

于 2013-10-18T11:56:26.607 回答
5

为了更有趣,这里有一个新的答案——独立的独立测试,它为生产中的写入性能提供了一些惊人的数字,并且在避免 IO 阻塞和连接管理方面做得更好。我很想看看这对你有什么作用,因为我们的写入速度很可笑(> 7kps)。

网络配置

 <system.net>
    <connectionManagement>
      <add address="*" maxconnection="48"/>
    </connectionManagement>
  </system.net>

对于测试,我使用基于卷的参数,所以像 25000 个项目,24 个分区,100 的批大小似乎总是最好的,参考计数为 20。这是使用 TPL 数据流(http://www.nuget.org /packages/Microsoft.Tpl.Dataflow/ ) 用于 BufflerBlock,它提供了一个很好的等待线程安全表引用拉取。

public class DyanmicBulkInsertTestPooledRefsAndAsynch : WebTest, IDynamicWebTest
{
    private int _itemCount;
    private int _partitionCount;
    private int _batchSize;
    private List<TestTableEntity> _items;
    private GuidIdPartitionSplitter<TestTableEntity> _partitionSplitter;
    private string _tableName;
    private CloudStorageAccount _account;
    private CloudTableClient _tableClient;
    private Dictionary<string, List<TestTableEntity>> _itemsByParition;
    private int _maxRefCount;
    private BufferBlock<CloudTable> _tableRefs;


    public DyanmicBulkInsertTestPooledRefsAndAsynch()
    {
        Properties = new List<ItemProp>();    
        Properties.Add(new ItemProp("ItemCount", typeof(int)));
        Properties.Add(new ItemProp("PartitionCount", typeof(int)));
        Properties.Add(new ItemProp("BatchSize", typeof(int)));
        Properties.Add(new ItemProp("MaxRefs", typeof(int)));


    }

    public List<ItemProp> Properties { get; set; }

    public void SetProps(Dictionary<string, object> propValuesByPropName)
    {
        _itemCount = (int)propValuesByPropName["ItemCount"];
        _partitionCount = (int)propValuesByPropName["PartitionCount"];
        _batchSize = (int)propValuesByPropName["BatchSize"];
        _maxRefCount = (int)propValuesByPropName["MaxRefs"];
    }

    protected override void SetupTest()
    {
        base.SetupTest();

        ThreadPool.SetMinThreads(1024, 256);
        ServicePointManager.DefaultConnectionLimit = 256;
        ServicePointManager.UseNagleAlgorithm = false;
        ServicePointManager.Expect100Continue = false;


        _account = CloudStorageAccount.Parse(CloudConfigurationManager.GetSetting("DataConnectionString"));
        _tableClient = _account.CreateCloudTableClient();
        _tableName = "testtable" + new Random().Next(100000);

        //create the refs
        _tableRefs = new BufferBlock<CloudTable>();
        for (int i = 0; i < _maxRefCount; i++)
        {
            _tableRefs.Post(_tableClient.GetTableReference(_tableName));
        }

        var tableRefTask = GetTableRef();
        tableRefTask.Wait();
        var tableRef = tableRefTask.Result;

        tableRef.CreateIfNotExists();
        ReleaseRef(tableRef);

        _items = TestUtils.GenerateTableItems(_itemCount);
        _partitionSplitter = new GuidIdPartitionSplitter<TestTableEntity>();
        _partitionSplitter.BuildPartitions(_partitionCount);

        _items.ForEach(o =>
            {
                o.ETag = "*";
                o.Timestamp = DateTime.Now;
                o.PartitionKey = _partitionSplitter.GetPartition(o);
            });

        _itemsByParition = _partitionSplitter.SplitIntoPartitionedSublists(_items);
    }

    private async Task<CloudTable> GetTableRef()
    {
        return await _tableRefs.ReceiveAsync();            
    }

    private void ReleaseRef(CloudTable tableRef)
    {
        _tableRefs.Post(tableRef);
    }

    protected override void ExecuteTest()
    {
        Task.WaitAll(_itemsByParition.Keys.Select(parition => Task.Factory.StartNew(() => InsertParitionItems(_itemsByParition[parition]))).ToArray());
    }

    private void InsertParitionItems(List<TestTableEntity> items)
    {

        var tasks = new List<Task>();

        for (int i = 0; i < items.Count; i += _batchSize)
        {
            int i1 = i;

            var task = Task.Factory.StartNew(async () =>
            {
                var batchItems = items.Skip(i1).Take(_batchSize).ToList();

                if (batchItems.Select(o => o.PartitionKey).Distinct().Count() > 1)
                {
                    throw new Exception("Multiple partitions batch");
                }

                var batchOp = new TableBatchOperation();
                batchItems.ForEach(batchOp.InsertOrReplace);   

                var tableRef = GetTableRef.Result();
                tableRef.ExecuteBatch(batchOp);
                ReleaseRef(tableRef);
            });

            tasks.Add(task);

        }

        Task.WaitAll(tasks.ToArray());


    }

    protected override void CleanupTest()
    {
        var tableRefTask = GetTableRef();
        tableRefTask.Wait();
        var tableRef = tableRefTask.Result;
        tableRef.DeleteIfExists();
        ReleaseRef(tableRef);
    }

我们目前正在开发一个可以处理多个存储帐户的版本,以期获得一些疯狂的速度。此外,我们在 8 核虚拟机上为大型数据集运行这些,但使用新的非阻塞 IO,它应该在有限的 vm 上运行良好。祝你好运!

 public class SimpleGuidIdPartitionSplitter<T> where T : IUniqueId
{
    private ConcurrentDictionary<string, string> _partitionByKey = new ConcurrentDictionary<string, string>();
    private List<string> _partitions;
    private bool _bPartitionsBuilt;

    public SimpleGuidIdPartitionSplitter()
    {

    }

    public void BuildPartitions(int iPartCount)
    {
        BuildPartitionIndentifiers(iPartCount);
    }

    public string GetPartition(T item)
    {
        if (_bPartitionsBuilt == false)
        {
            throw new Exception("Partitions Not Built");
        }

        var partKey = item.Id.ToString().Substring(34, 2);
        return _partitionByKey[partKey];
    }

    public string GetPartition(Guid id)
    {
        if (_bPartitionsBuilt == false)
        {
            throw new Exception("Partitions Not Built");
        }

        var partKey = id.ToString().Substring(34, 2);
        return _partitionByKey[partKey];
    }

    #region Helpers
    private void BuildPartitionIndentifiers(int partitonCount)
    {
        var chars = new char[] { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f' }.ToList();
        var keys = new List<string>();

        for (int i = 0; i < chars.Count; i++)
        {
            var keyA = chars[i];
            for (int j = 0; j < chars.Count; j++)
            {
                var keyB = chars[j];
                keys.Add(string.Concat(keyA, keyB));
            }
        }


        var keySetMaxSize = Math.Max(1, (int)Math.Floor((double)keys.Count / ((double)partitonCount)));
        var keySets = new List<List<string>>();

        if (partitonCount > keys.Count)
        {
            partitonCount = keys.Count;
        }

        //Build the key sets
        var index = 0;
        while (index < keys.Count)
        {
            var keysSet = keys.Skip(index).Take(keySetMaxSize).ToList();
            keySets.Add(keysSet);
            index += keySetMaxSize;
        }

        //build the lookups and datatable for each key set
        _partitions = new List<string>();
        for (int i = 0; i < keySets.Count; i++)
        {
            var partitionName = String.Concat("subSet_", i);
            foreach (var key in keySets[i])
            {
                _partitionByKey[key] = partitionName;
            }
            _partitions.Add(partitionName);
        }

        _bPartitionsBuilt = true;
    }
    #endregion
}



internal static List<TestTableEntity> GenerateTableItems(int count)
        {
            var items = new List<TestTableEntity>();
            var random = new Random();

            for (int i = 0; i < count; i++)
            {
                var itemId = Guid.NewGuid();

                items.Add(new TestTableEntity()
                {
                    Id = itemId,
                    TestGuid = Guid.NewGuid(),
                    RowKey = itemId.ToString(),
                    TestBool = true,
                    TestDateTime = DateTime.Now,
                    TestDouble = random.Next() * 1000000,
                    TestInt = random.Next(10000),
                    TestString = Guid.NewGuid().ToString(),
                });
            }

            var dupRowKeys = items.GroupBy(o => o.RowKey).Where(o => o.Count() > 1).Select(o => o.Key).ToList();
            if (dupRowKeys.Count > 0)
            {
                throw  new Exception("Dupicate Row Keys");
            }

            return items;
        }

还有一件事-您的时间安排以及框架受到的影响指向此http://blogs.msdn.com/b/windowsazurestorage/archive/2013/08/08/net-clients-encountering-port-exhaustion-after-安装-kb2750149-或-kb2805227.aspx

于 2013-08-08T19:12:56.307 回答