我参考了 https://docs.microsoft.com/en-us/aspnet/core/fundamentals/host/hosted-services?view=aspnetcore-2.1&tabs=visual-studio 来创建后台服务和任务处理队列。
我有一个 ImportService.cs 类,它从 HTTP 请求中接收 CSV 文件;我想把这个文件加入队列,由队列处理该 CSV 文件并将结果写入数据库。下面是持有 IBackgroundTaskQueue 实例的服务类:
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Globalization;
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using CsvHelper;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;
namespace Services.Services
{
    /// <summary>
    /// Import service: uploads a file received over HTTP to Azure Blob Storage, queues a
    /// background work item, and records the upload through the upload-data repository.
    /// </summary>
    public class ImportService : BaseService, IImportService
    {
        private readonly IUploadDataRepository _uploadDataRepository;
        private readonly ConfigurationSettings _configurationSettings;

        /// <summary>Queue that receives the background work items created by this service.</summary>
        public IBackgroundTaskQueue Queue { get; }

        private const string AZURE_BLOB_CONTAINER = "blobcontainer";

        // Timestamp layout used inside blob names. It is formatted with the invariant culture so
        // the name never contains '/', ':' or spaces (the default DateTime.ToString() output is
        // culture-dependent and typically does), and it includes milliseconds so two uploads by
        // the same user within one second do not map to the same blob name.
        private const string BlobTimestampFormat = "yyyyMMdd'T'HHmmssfff'Z'";

        /// <summary>
        /// Creates the service, resolving the upload-data repository and the configuration
        /// settings from <paramref name="services"/>.
        /// </summary>
        /// <param name="services">Service provider; also passed to the base class.</param>
        /// <param name="queue">Background task queue exposed through <see cref="Queue"/>.</param>
        public ImportService(IServiceProvider services, IBackgroundTaskQueue queue) : base(services)
        {
            _uploadDataRepository = services.GetUploadDataRepository();
            _configurationSettings = services.GetService<ConfigurationSettings>();
            Queue = queue;
        }

        /// <summary>
        /// Uploads <paramref name="file"/> to the blob container, queues a background work item
        /// and stores the upload record.
        /// </summary>
        /// <param name="file">The uploaded file taken from the HTTP request.</param>
        /// <param name="userId">Id of the uploading user; used as the blob name prefix.</param>
        /// <param name="type">Not used by the code visible in this method.</param>
        /// <returns>A task that completes once the blob is uploaded and the record is stored.</returns>
        public async Task UploadToBlobStorage(IFormFile file, int userId, Type type)
        {
            // GetFileFormat / GetTemporaryPath are not declared in this class; presumably they
            // are inherited from BaseService - TODO confirm.
            var fileFormat = GetFileFormat(file);
            var tempFilePath = await GetTemporaryPath(file);

            // Blob name: "<userId>-<UTC timestamp>.<format>", built culture-invariantly.
            var timestamp = DateTime.UtcNow.ToString(BlobTimestampFormat, CultureInfo.InvariantCulture);
            var fileName = userId.ToString(CultureInfo.InvariantCulture) + "-" + timestamp + "." + fileFormat;

            string storageConnectionString = _configurationSettings.ConnectionStrings.BlobStorageConnection;
            CloudStorageAccount account = CloudStorageAccount.Parse(storageConnectionString);
            var blobClient = account.CreateCloudBlobClient();

            // Make sure container is there
            var blobContainer = blobClient.GetContainerReference(AZURE_BLOB_CONTAINER);
            await blobContainer.CreateIfNotExistsAsync();

            // set the permission to blob type
            // NOTE(review): this requests public (anonymous) access at blob level for the
            // container that holds the uploaded files - confirm this is intended.
            await blobContainer.SetPermissionsAsync(new BlobContainerPermissions { PublicAccess = BlobContainerPublicAccessType.Blob });

            CloudBlockBlob blockBlob = blobContainer.GetBlockBlobReference(fileName);
            using (var fileStream = File.OpenRead(tempFilePath))
            {
                await blockBlob.UploadFromStreamAsync(fileStream);
            }

            // ADD FILE TO QUEUE AND PROCESS IT
            // The delegate below is the code that runs in the background, so the file
            // processing belongs inside it. It currently only writes a marker line.
            // NOTE(review): when real processing is added, capture plain data (for example
            // fileName) rather than request-scoped services, which may already be disposed by
            // the time the work item runs.
            Queue.QueueBackgroundWorkItem(token =>
            {
                Console.WriteLine("ITEM QUEUED PROCESS IT??");
                return Task.CompletedTask;
            });

            // NOTE(review): uploadData is not declared anywhere in the visible code; it has to
            // be created before this call for the method to compile.
            await _uploadDataRepository.Add(uploadData);
        }
    }
}
下面是我根据 Microsoft 示例创建的类:
using System;
using System.Threading;
using System.Threading.Tasks;
namespace Services.Services.Contracts {
    /// <summary>
    /// Contract for a queue of background work items. Each work item is a delegate that
    /// takes a <see cref="CancellationToken"/> and returns a <see cref="Task"/>.
    /// </summary>
    public interface IBackgroundTaskQueue {
        /// <summary>Hands a work item to the queue.</summary>
        /// <param name="workItem">Delegate to queue.</param>
        void QueueBackgroundWorkItem(Func<CancellationToken, Task> workItem);

        /// <summary>Asynchronously obtains a work item from the queue.</summary>
        /// <param name="cancellationToken">Token passed to the dequeue operation.</param>
        /// <returns>A task whose result is the dequeued work item delegate.</returns>
        Task<Func<CancellationToken, Task>> DequeueAsync(CancellationToken cancellationToken);
    }
}
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
namespace Services.Services {
    /// <summary>
    /// Queued Hosted Service class: a background service that repeatedly takes work items
    /// from an <c>IBackgroundTaskQueue</c> and awaits them one at a time.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the class is abstract and its constructor is protected, so it cannot be
    /// instantiated directly; a concrete subclass is needed - confirm this is intended.
    /// </remarks>
    public abstract class QueuedHostedService : BackgroundService {
        private readonly ILogger _logger;

        private IBackgroundTaskQueue TaskQueue {
            get;
        }

        /// <summary>Creates the service.</summary>
        /// <param name="taskQueue">Queue the work items are read from.</param>
        /// <param name="loggerFactory">Factory used to create this service's logger.</param>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="taskQueue"/> or <paramref name="loggerFactory"/> is null.
        /// </exception>
        protected QueuedHostedService(IBackgroundTaskQueue taskQueue, ILoggerFactory loggerFactory) {
            if (loggerFactory == null) {
                throw new ArgumentNullException(nameof(loggerFactory));
            }
            // Fail at construction time rather than with a NullReferenceException inside the
            // processing loop.
            TaskQueue = taskQueue ?? throw new ArgumentNullException(nameof(taskQueue));
            _logger = loggerFactory.CreateLogger<QueuedHostedService>();
            Console.WriteLine("QueuedHostedService initialized");
        }

        /// <summary>
        /// Processing loop: dequeues work items and awaits each one until
        /// <paramref name="cancellationToken"/> is cancelled.
        /// </summary>
        /// <param name="cancellationToken">
        /// Token that ends the loop; it is also passed to the queue and to every work item.
        /// </param>
        /// <returns>A task that completes when the loop has ended.</returns>
        protected override async Task ExecuteAsync(CancellationToken cancellationToken) {
            _logger.LogInformation("Queued Hosted Service is starting.");
            while (!cancellationToken.IsCancellationRequested) {
                Func<CancellationToken, Task> workItem;
                try {
                    workItem = await TaskQueue.DequeueAsync(cancellationToken);
                } catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) {
                    // Cancellation while waiting for the next item: end the loop normally
                    // instead of letting the exception escape ExecuteAsync.
                    break;
                }
                if (workItem == null) {
                    // Nothing to invoke; report it explicitly rather than through a
                    // NullReferenceException logged as a work-item failure.
                    _logger.LogWarning("Dequeued a null work item; skipping.");
                    continue;
                }
                try {
                    await workItem(cancellationToken);
                } catch (Exception ex) {
                    // A failing work item is logged and the loop moves on to the next one.
                    _logger.LogError(ex, "Error occurred executing {WorkItem}.", nameof(workItem));
                }
            }
            _logger.LogInformation("Queued Hosted Service is stopping.");
        }

        // NOTE(review): not called anywhere in this class; the work that actually runs is the
        // delegate taken from the queue in ExecuteAsync.
        private void DoWork(object state) {
            Console.WriteLine("PROCCESS FILEE???");
        }
    }
}
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
namespace Services.Services {
    /// <summary>
    /// In-memory <c>IBackgroundTaskQueue</c> built on a <see cref="ConcurrentQueue{T}"/> for the
    /// work items and a <see cref="SemaphoreSlim"/> that is released once per queued item.
    /// </summary>
    public class BackgroundTaskQueue : IBackgroundTaskQueue {
        // Both fields are assigned exactly once, so they are readonly to rule out accidental
        // replacement of the queue or the semaphore.
        private readonly ConcurrentQueue<Func<CancellationToken, Task>> _workItems =
            new ConcurrentQueue<Func<CancellationToken, Task>>();
        private readonly SemaphoreSlim _signal = new SemaphoreSlim(0);

        /// <summary>Adds a work item to the queue and releases the semaphore once.</summary>
        /// <param name="workItem">Delegate to queue.</param>
        /// <exception cref="ArgumentNullException"><paramref name="workItem"/> is null.</exception>
        public void QueueBackgroundWorkItem(Func<CancellationToken, Task> workItem) {
            if (workItem == null) {
                throw new ArgumentNullException(nameof(workItem));
            }
            // Enqueue before releasing, so the item is already in the queue when a waiter in
            // DequeueAsync is let through.
            _workItems.Enqueue(workItem);
            _signal.Release();
        }

        /// <summary>
        /// Waits on the semaphore, then takes one work item from the queue.
        /// </summary>
        /// <param name="cancellationToken">Token passed to the semaphore wait.</param>
        /// <returns>
        /// A task whose result is the dequeued work item, or null if the queue turned out to
        /// be empty (the result of <c>TryDequeue</c> is not checked).
        /// </returns>
        public async Task<Func<CancellationToken, Task>> DequeueAsync(CancellationToken cancellationToken) {
            await _signal.WaitAsync(cancellationToken);
            _workItems.TryDequeue(out var workItem);
            return workItem;
        }
    }
}
我的问题是:该文件应该在哪里处理?在 ImportService 中,还是在 QueuedHostedService 中?如果是在 QueuedHostedService 中,我应该如何传递并访问该文件?最佳做法是什么?我想在 QueuedHostedService 中创建一个处理该文件的 DoWork() 函数,但我不确定该如何实现。