2

我正在尝试从 BlockingCollection 支持的方法返回 IEnumerable。代码模式是:

public IEnumerable<T> Execute() {   
    var results = new BlockingCollection<T>(10);  
    _ExecuteLoad(results);   
    return results.GetConsumingEnumerable(); 
}

private void _ExecuteLoad<T>(BlockingCollection<T> results) {
    var loadTask = Task.Factory.StartNew(() =>
    { 
        //some async code that adds items to results
        results.CompleteAdding();
    });
}

public void Consumer() {
    var count = Execute().Count();
}

问题是从 Execute() 返回的可枚举总是空的。我见过的所有示例都在任务中迭代 BlockingCollection。在这种情况下,这似乎行不通。

有谁知道我哪里出错了?


为了让事情更清楚一点,我粘贴了我正在执行以填充集合的代码。也许这里有什么导致问题的原因?

Task.Factory.StartNew(() =>
{
    var continuationRowKey = "";
    var continuationParitionKey = "";
    var action = HttpMethod.Get;
    var queryUri = _GetTableQueryUri(tableServiceUri, tableName, query, continuationParitionKey, continuationRowKey, timeout);
    while (true)
    {
        using (var request = GetRequest(queryUri, null, action.Method, azureAccountName, azureAccountKey))
        {
            request.Method = action;
            request.RequestUri = queryUri;

            using (var client = new HttpClient())
            {
                var sendTask = client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead);
                using (var response = sendTask.Result)
                {
                    continuationParitionKey = // stuff from headers
                    continuationRowKey = // stuff from headers

                    var streamTask = response.Content.ReadAsStreamAsync();
                    using (var stream = streamTask.Result)
                    {
                        using (var reader = XmlReader.Create(stream))
                        {
                            while (reader.Read())
                            {
                                if (reader.NodeType == XmlNodeType.Element && reader.Name == "entry" && reader.NamespaceURI == "http://www.w3.org/2005/Atom")
                                {
                                    results.Add(XNode.ReadFrom(reader) as XElement);
                                }
                            }
                            reader.Close();
                        }
                    }
                }
            }

            if (continuationParitionKey == null && continuationRowKey == null)
                break;

            queryUri = _GetTableQueryUri(tableServiceUri, tableName, query, continuationParitionKey, continuationRowKey, timeout);
        }
    }
    results.CompleteAdding();
});
4

1 回答 1

3

results.CompleteAdding()将项目添加到集合后,您需要调用。

如果不这样做,枚举将永远不会结束,Count()也永远不会返回。

除此之外,您发布的代码是正确的。

于 2012-02-22T05:03:10.817 回答