我得到了解决方案。ToAsyncEnumerable
一个对象具有返回的扩展方法IAsyncEnumerable<object>
。因此我们可以迭代它:
public async Task Process(object source)
{
using (var enumerator = source.ToAsyncEnumerable().GetEnumerator())
{
while (await enumerator.MoveNext())
{
var item = enumerator.Current;
}
}
}
可以创建一个接受IAsyncEnumerable<T>
和实现的包装器IAsyncEnumerable<object>
。Activator
在扩展方法中创建该包装器。这是实现:
public class AsyncEnumerable<T> : IAsyncEnumerable<object>
{
private IAsyncEnumerable<T> _source;
public AsyncEnumerable(IAsyncEnumerable<T> source)
{
_source = source;
}
public IAsyncEnumerator<object> GetEnumerator()
{
return new AsyncEnumerator<T>(_source.GetEnumerator());
}
}
public class AsyncEnumerator<T> : IAsyncEnumerator<object>
{
private IAsyncEnumerator<T> _source;
public AsyncEnumerator(IAsyncEnumerator<T> source)
{
_source = source;
}
public object Current => _source.Current;
public void Dispose()
{
_source.Dispose();
}
public async Task<bool> MoveNext(CancellationToken cancellationToken)
{
return await _source.MoveNext(cancellationToken);
}
}
public static class AsyncEnumerationExtensions
{
public static IAsyncEnumerable<object> ToAsyncEnumerable(this object source)
{
if (source == null)
{
throw new ArgumentNullException(nameof(source));
}
else if (!source.GetType().GetInterfaces().Any(i => i.GetGenericTypeDefinition() == typeof(IAsyncEnumerable<>)))
{
throw new ArgumentException("IAsyncEnumerable<> expected", nameof(source));
}
var dataType = source.GetType()
.GetInterfaces()
.First(i => i.GetGenericTypeDefinition() == typeof(IAsyncEnumerable<>))
.GetGenericArguments()[0];
var collectionType = typeof(AsyncEnumerable<>).MakeGenericType(dataType);
return (IAsyncEnumerable<object>)Activator.CreateInstance(collectionType, source);
}
}