我有以下几个扩展方法,希望能够向它们传入一个状态对象(state):
// Overload 2
/// <summary>
/// Convenience overload: runs a segmented query without request options or an
/// operation context by forwarding to the overload that accepts both.
/// </summary>
/// <param name="tbl">Table to query.</param>
/// <param name="query">Query to execute.</param>
/// <param name="continuationToken">Continuation token forwarded unchanged.</param>
/// <param name="token">Cancellation token forwarded unchanged.</param>
/// <returns>The task produced by the forwarded call.</returns>
public static Task<TableQuerySegment<T>> ExecuteQuerySegmentedAsync<T> (this CloudTable tbl, TableQuery<T> query, TableContinuationToken continuationToken, CancellationToken token) where T : ITableEntity, new()
{
    // Both optional arguments are deliberately left unset.
    TableRequestOptions noOptions = null;
    OperationContext noContext = null;
    return tbl.ExecuteQuerySegmentedAsync<T>(query, continuationToken, noOptions, noContext, token);
}
// Overload 5
/// <summary>
/// Wraps the Begin/End segmented-query pair of <c>CloudTable</c> in a task and
/// ties the pending request to a cancellation token.
/// </summary>
/// <param name="tbl">Table to query; also passed to the Begin call as its async state.</param>
/// <param name="query">Query to execute.</param>
/// <param name="continuationToken">Continuation token passed to the Begin call.</param>
/// <param name="opt">Request options; when this and <paramref name="ctx"/> are both null the shorter Begin overload is used.</param>
/// <param name="ctx">Operation context; see <paramref name="opt"/>.</param>
/// <param name="token">When cancelled, <c>Cancel</c> is invoked on the pending async result.</param>
/// <returns>A task yielding the query segment returned by the End call.</returns>
public static Task<TableQuerySegment<TElement>> ExecuteQuerySegmentedAsync<TElement>(this CloudTable tbl, TableQuery<TElement> query, TableContinuationToken continuationToken, TableRequestOptions opt, OperationContext ctx, CancellationToken token) where TElement : ITableEntity, new()
{
    bool useShortOverload = opt == null && ctx == null;

    // The table itself is the async state so the completion callback can read it back.
    ICancellableAsyncResult pending = useShortOverload
        ? tbl.BeginExecuteQuerySegmented<TElement>(query, continuationToken, null, tbl)
        : tbl.BeginExecuteQuerySegmented<TElement>(query, continuationToken, opt, ctx, null, tbl);

    CancellationTokenRegistration registration = token.Register(pending.Cancel);

    return Task.Factory.FromAsync(pending, completed =>
    {
        var owningTable = completed.AsyncState as CloudTable;
        // The request is over, so the cancellation hook is no longer needed.
        registration.Dispose();
        return owningTable.EndExecuteQuerySegmented<TElement>(pending);
    });
}
// Overload 3
/// <summary>
/// Convenience overload: runs a segmented query with an entity resolver but
/// without request options or an operation context, by forwarding to the
/// overload that accepts both.
/// </summary>
/// <param name="tbl">Table to query.</param>
/// <param name="query">Query to execute.</param>
/// <param name="resolver">Resolver forwarded unchanged.</param>
/// <param name="continuationToken">Continuation token forwarded unchanged.</param>
/// <param name="token">Cancellation token forwarded unchanged.</param>
/// <returns>The task produced by the forwarded call.</returns>
public static Task<TableQuerySegment<R>> ExecuteQuerySegmentedAsync<T, R>(this CloudTable tbl, TableQuery<T> query, EntityResolver<R> resolver, TableContinuationToken continuationToken, CancellationToken token) where T : ITableEntity, new()
{
    // Both optional arguments are deliberately left unset.
    TableRequestOptions noOptions = null;
    OperationContext noContext = null;
    return tbl.ExecuteQuerySegmentedAsync<T, R>(query, resolver, continuationToken, noOptions, noContext, token);
}
// Overload 6
/// <summary>
/// Wraps the resolver-based Begin/End segmented-query pair of <c>CloudTable</c>
/// in a task and ties the pending request to a cancellation token.
/// </summary>
/// <param name="tbl">Table to query.</param>
/// <param name="query">Query to execute.</param>
/// <param name="resolver">Resolver passed to the Begin call.</param>
/// <param name="continuationToken">Continuation token passed to the Begin call.</param>
/// <param name="opt">Request options; when this and <paramref name="ctx"/> are both null the shorter Begin overload is used.</param>
/// <param name="ctx">Operation context; see <paramref name="opt"/>.</param>
/// <param name="token">When cancelled, <c>Cancel</c> is invoked on the pending async result.</param>
/// <returns>A task yielding the query segment returned by the End call.</returns>
public static Task<TableQuerySegment<R>> ExecuteQuerySegmentedAsync<TElement, R>(this CloudTable tbl, TableQuery<TElement> query, EntityResolver<R> resolver, TableContinuationToken continuationToken, TableRequestOptions opt, OperationContext ctx, CancellationToken token) where TElement : ITableEntity, new()
{
    ICancellableAsyncResult result = null;
    if (opt == null && ctx == null)
    {
        result = tbl.BeginExecuteQuerySegmented<TElement, R>(query, resolver, continuationToken, null, null);
    }
    else
    {
        result = tbl.BeginExecuteQuerySegmented<TElement, R>(query, resolver, continuationToken, opt, ctx, null, null);
    }

    var cancellationRegistration = token.Register(result.Cancel);
    return Task.Factory.FromAsync(result, iAsyncResult =>
    {
        // The Begin call above is given a null state object, so the table cannot be
        // recovered from iAsyncResult.AsyncState (that produced a null reference).
        // Use the captured table instead.
        cancellationRegistration.Dispose(); // todo: handle cleanup of this (deregistration)
        return tbl.EndExecuteQuerySegmented<R>(result);
    });
}
这样我就可以像下面这样调用代码,并从返回的任务中取回状态:
// prepare the query
// Starts the segmented query; the call returns a task that is stored for later use.
trustsInBatchesOf100 = tableSymmetricKeys.ExecuteQuerySegmentedAsync(query, token, opt, ctx, cancelToken);
// my state
// Reads the state object attached to the returned task.
// NOTE(review): the extension methods build the task with
// Task.Factory.FromAsync(IAsyncResult, Func<...>), which takes no state argument,
// so this is presumably null until the extensions accept and attach a state object -- confirm.
object mystate = trustsInBatchesOf100.AsyncState;
我一直在尝试弄清楚应该如何修改这些扩展方法,使其能够接受一个 stateObject 参数。困难在于:我不知道该选用 TaskFactory.FromAsync 的哪个重载,以及如何在扩展方法中正确地实现它。
问题
如何修改上述扩展方法以正确接受状态参数?
更多源代码
private CloudTableClient tableClient;
// Storage account used to create table clients.
private CloudStorageAccount account;
// Request options passed along with segmented queries.
private TableRequestOptions opt;
// Lookups that have been started and not yet processed. Starts as an empty list
// (not null) because it is appended to and counted without any null check.
List<Task<TableResult>> AllRunningTasks = new List<Task<TableResult>>();
/// <summary>
/// Pages through the "SymmetricKeys5" table and, for every entity in each
/// segment, starts a lookup via <c>QueryPartnerForData</c>. The started lookups
/// are appended to <c>AllRunningTasks</c>; this method does not wait for them.
/// </summary>
/// <param name="thingToSearchFor">Value passed to each per-entity lookup.</param>
/// <param name="trustStartingPoint">Not used by the current implementation.</param>
/// <param name="depth">Not used by the current implementation.</param>
/// <param name="query">Query executed against the table, one segment at a time.</param>
/// <param name="ctx">Operation context passed to each segmented query.</param>
/// <param name="cancelToken">Cancellation token passed to the queries and lookups.</param>
/// <returns>A task that completes once every segment has been read and its lookups started.</returns>
public async Task GetTrustsAndValues(string thingToSearchFor,string trustStartingPoint,
int depth,
TableQuery query,
OperationContext ctx, CancellationToken cancelToken)
{
    TrustState asyncState = new TrustState() { ThingToSearchFor = thingToSearchFor, TimeStarted = DateTime.UtcNow };
    var ret = new TrustTree<string>(thingToSearchFor, "start", 10);
    CloudTableClient client = account.CreateCloudTableClient();
    CloudTable tableSymmetricKeys = client.GetTableReference("SymmetricKeys5");

    // The list may not have been created yet; appending to null would throw.
    if (AllRunningTasks == null)
    {
        AllRunningTasks = new List<Task<TableResult>>();
    }

    // A null continuation token requests the first segment; after each segment the
    // token is replaced by the one the service returned, and null means "no more data".
    TableContinuationToken token = null;
    do
    {
        // prepare the query
        Task<TableQuerySegment<DynamicTableEntity>> trustsInBatchesOf100 = tableSymmetricKeys.ExecuteQuerySegmentedAsync(query, token, opt, ctx, cancelToken);
        object mystate = trustsInBatchesOf100.AsyncState;

        // Await instead of blocking with Wait()/Result inside an async method.
        TableQuerySegment<DynamicTableEntity> segment = await trustsInBatchesOf100;

        // Materialize exactly once: the query is deferred, so enumerating it twice
        // (Count() and then ToList()) would start every lookup twice.
        List<Task<TableResult>> runningTasks =
            (from domainData in segment select QueryPartnerForData(ref ret, domainData, thingToSearchFor, client, cancelToken)).ToList();

        // Save the count of lookups started for this segment
        asyncState.RunningDirectQueries = runningTasks.Count;

        // Add running tasks to the full compilation of running tasks
        AllRunningTasks.AddRange(runningTasks);

        // Prepare for next iteration or quit
        token = segment.ContinuationToken;
        // todo: persist token token.WriteXml()
    }
    while (token != null);
}
/// <summary>
/// Starts an asynchronous retrieve against the table named after the entity's
/// row key with an "_FW" suffix.
/// </summary>
/// <param name="tree">Not used by the current implementation.</param>
/// <param name="domainData">Entity whose row key selects both the table and the partition key of the lookup.</param>
/// <param name="thingToSearchFor">Row key of the entity to retrieve.</param>
/// <param name="client">Client used to obtain the table reference.</param>
/// <param name="cancelToken">Cancellation token passed to the retrieve operation.</param>
/// <returns>The task returned by the table's <c>ExecuteAsync</c> call.</returns>
private static Task<TableResult> QueryPartnerForData(ref TrustTree<string> tree, DynamicTableEntity domainData, string thingToSearchFor, CloudTableClient client, CancellationToken cancelToken)
{
    // NOTE(review): Azure table names are documented as alphanumeric only; the
    // underscore in the "_FW" suffix may be rejected by the service -- confirm.
    string partnerTableName = String.Format("{0}_FW", domainData.RowKey);
    CloudTable partnerTable = client.GetTableReference(partnerTableName);

    // Partition key = the domain's row key, row key = the value being searched for.
    TableOperation lookup = TableOperation.Retrieve(domainData.RowKey, thingToSearchFor);

    return partnerTable.ExecuteAsync(lookup, cancelToken);
}
/// <summary>
/// Drains <c>AllRunningTasks</c>: repeatedly waits for whichever lookup finishes
/// first, removes it from the list and awaits its result.
/// </summary>
/// <remarks>
/// NOTE(review): this is <c>async void</c>, so callers cannot await it and an
/// exception from a failed lookup cannot be observed by them. The signature is
/// kept as-is here; consider returning <c>Task</c> once all callers are known.
/// </remarks>
public async void SaveCompletedQueriesTo()
{
    // The list field can be null when nothing has been queued yet; treat that as empty.
    while (AllRunningTasks != null && AllRunningTasks.Count > 0)
    {
        // Identify the first task that completes.
        Task<TableResult> firstFinishedTask = await Task.WhenAny(AllRunningTasks);
        // Remove the selected task from the list so that it is not
        // processed more than once.
        AllRunningTasks.Remove(firstFinishedTask);
        // Await the completed task (rethrows if the lookup failed or was cancelled).
        var taskOfTableResult = await firstFinishedTask;
        //todo: asyncState: need to know what the
        // - target to save to is.
    }
    // TODO: if all tasks have completed then update
}