1

上下文:我有一个应用程序,允许用户处理当天收到的所有邮寄付款。有时,一个信封可能包含同一个帐户的多张支票(想想两个室友各自支付他们的一部分水电费)。

限制:以 10 个批次处理所有付款,但每批次的帐户 ID 必须是唯一的。

非常简化的支付类:

public class Payment
{
    public int AccountId { get; set; }
    // ... other properties not important
}

今天通过邮件收到的假设付款。请注意,最后两个 AccountId 值是可接受的重复项:

List<Payment> payments = new List<Payment>()
{
    new Payment() {AccountId = 1 },
    new Payment() {AccountId = 2 },
    new Payment() {AccountId = 3 },
    new Payment() {AccountId = 4 },
    new Payment() {AccountId = 5 },
    new Payment() {AccountId = 1 }, // Duplicate Account
    new Payment() {AccountId = 2 }  // Duplicate Account

    // likely hundreds more unique accounts, possibly even some more duplicates...
};

我正在使用 MoreLinq 尝试为每批选择不同的帐户,但下面的代码显然不起作用。我觉得我已经很接近了,但一直找不到可行的解决方案。同样,目标是将所有付款分成 N 个批次,而不复制该批次中的 AccountId。重复的 AccountId 必须分布在其他批次中,以便在尝试更新客户的余额时不会导致竞争条件。

为清楚起见,编辑了代码注释。

int batchSize = 10;
var paymentTasks = new List<Task>(batchSize);

// This linq expression is the heart of my question: How to divide the payments
// into batches while ensuring uniqueness of a particular key(s). This expression
// is close, but the DistinctBy() is obviously excluding the duplicates that
// I just intend to be distinct for that Batch(batchSize).
foreach (IEnumerable<Payment> batchOfPayments in payments.DistinctBy(a => a.AccountId).Batch(batchSize))
{
    // The rest of this method is for context only

    paymentTasks.Clear();

    foreach (Payment payment in batchOfPayments)
    {
        // Async method implementation not important
        Task paymentTask = ProcessPaymentAsync(payment);
        paymentTasks.Add(paymentTask);
    }

    // Await all the tasks in this batch to complete before starting the next batch
    await Task.WhenAll(paymentTasks);
}

感谢您抽出宝贵时间查看我的问题。

4

3 回答 3

2

如果我完全理解这个问题,那么有很多方法可以做到这一点,最好的解决方案将取决于您的实际需求。

假设是:

  1. 您所描述的是一种内存方法
  2. 它不需要打数据库
  3. 它不需要是生产者消费者。

然后可以使用非常简单(但有效)的批处理和队列模式,并且分配最少。

给定

public class Payment
{
   public int AccountId { get; set; }
   public Payment(int accountId) => AccountId = accountId;
}

public static IEnumerable<Payment[]> GetBatches(IEnumerable<Payment> source, int count)
{
   var hashset = new HashSet<int>(count);
   var batch = new List<Payment>(count);
   var leftOvers = new Queue<Payment>();

   while (true)
   {
      foreach (var item in source)
      {
         // check if batched
         if (hashset.Add(item.AccountId))
            batch.Add(item); // add to batch
         else
            leftOvers.Enqueue(item); // add to left overs

         // if we are at the batch size start a loop
         while (batch.Count == count)
         {
            yield return batch.ToArray(); // return the batch

            batch.Clear();
            hashset.Clear();

            // check the left overs
            while (leftOvers.Any() && batch.Count != count)
               if (hashset.Add(leftOvers.Peek().AccountId)) // check the batch
                  batch.Add(leftOvers.Dequeue());
               else break; // we still have a duplicate bail
         }
      }

      if(batch.Any()) yield return batch.ToArray();

      if (!leftOvers.Any()) break;

      source = leftOvers.ToList(); // allocation  :(
      hashset.Clear();
      batch.Clear();
      leftOvers.Clear();
   }
}

注意:这是相当节省资源的,尽管在处理纯剩菜时它可能确实有额外的不必要的小分配,我相信这可以被删除,尽管我将把它留给你。您还可以通过使用渠道来增加许多效率,很容易将其转变为消费者

测试

var list = new List<Payment>() {new(1), new(2), new(3), new(4), new(4), new(5), new(6), new(4), new(4), new(6), new(4)};

var batches = GetBatches(list, 3);

foreach (var batch in batches)
   Console.WriteLine(string.Join(", ",batch.Select(x => x.AccountId)));

输出

1, 2, 3
4, 5, 6
4, 6
4
4
4

完整的演示在这里玩

于 2021-01-08T00:19:44.207 回答
1

请注意,这假设这不是一个真实世界的示例,因为它绝对不能防止接收端出现重复!这只是一种分发前端加载的批次而不重复的方法,同时只循环一次集合。

它依赖于按需创建存储桶并在整个存储桶列表以及当前目标存储桶本身中跟踪位置。此外,它使用自定义的 EqualityComparer 来启用 O(1) 在每个单独的存储桶中查找付款。

public async Task Implementation(List<Payment> payments)
{
    int batchSize = 10;
    var paymentTasks = new List<Task>();
    // use a custom equality comparer to allow for O(1) lookups
    // of payments based on AccountId
    var buckets = new List<HashSet<Payment>>();

    var incompletedBucketPosition = 0;
    var currentBucketPosition = 0;

    foreach (var payment in payments)
    {
        var isAdded = false;
        while (!isAdded)
        {
            if (currentBucketPosition >= buckets.Count)
            {
                // if we have run out of buckets, we need to
                // create a new one.
                // You could also create buckets ahead of time but
                // without knowing how many duplicates to expect,
                // this could be more efficient.
                buckets.Add(new HashSet<Payment>(batchSize));
            }
            
            var currentBucket = buckets[currentBucketPosition];

            if (currentBucket.Count >= batchSize)
            {   
                // our batch is complete. Advance to the next incomplete bucket
                // and reset our currentBucketPosition to the incomplete position.
                currentBucketPosition = ++incompletedBucketPosition;
            }
            else
            {
                if (currentBucket.Contains(payment, new PaymentComparer()))
                {
                    // we can't use this batch since our new payment would be a dupe
                    // advance to the next bucket position.
                    currentBucketPosition++;
                }
                else
                {
                    currentBucket.Add(payment);
                    isAdded = true;
                    // since we've successfully added an element,
                    // reset the currentBucketPosition to the last 
                    // incomplete bucket.
                    currentBucketPosition = incompletedBucketPosition;
                }
            }
        }
    }
    
    paymentTasks.AddRange(buckets.Select(b => ProcessPaymentAsync(b)));
    await Task.WhenAll(paymentTasks);
}

public async Task ProcessPaymentAsync(HashSet<Payment> paymentBatch)
{
    Console.WriteLine(string.Join(',', paymentBatch.Select(b => b.AccountId)));
    await Task.FromResult(0);
}

public class Payment
{
    public int AccountId { get; set; }
}

public class PaymentComparer : IEqualityComparer<Payment>
{
    public bool Equals(Payment x, Payment y)
    {
        return x?.AccountId == y?.AccountId;
    }

    public int GetHashCode(Payment obj)
    {
        return obj?.AccountId.GetHashCode() ?? 0;
    }
}

您可以使用以下方法对其进行测试:

void Main()
{
    List<Payment> payments = new List<Payment>()
    {
        new Payment() {AccountId = 1 },
        new Payment() {AccountId = 2 },
        new Payment() {AccountId = 3 },
        new Payment() {AccountId = 4 },
        new Payment() {AccountId = 5 },
        new Payment() {AccountId = 1 }, // Duplicate Account
        new Payment() {AccountId = 2 },  // Duplicate Account
        new Payment() {AccountId = 6 },
        new Payment() {AccountId = 7 },
        new Payment() {AccountId = 8 },
        new Payment() {AccountId = 9 },
        new Payment() {AccountId = 10 },
        new Payment() {AccountId = 6 },
        new Payment() {AccountId = 11 },
        new Payment() {AccountId = 12 },
        new Payment() {AccountId = 13 },
        new Payment() {AccountId = 14 },
        new Payment() {AccountId = 15 },
        new Payment() {AccountId = 15 },
        new Payment() {AccountId = 16 },
        new Payment() {AccountId = 17 },
        new Payment() {AccountId = 18 },
        new Payment() {AccountId = 6 },
        // likely hundreds more unique accounts, possibly even some more duplicates...
    };
    
    Implementation(payments).GetAwaiter().GetResult();  
}

输出:

1,2,3,4,5,6,7,8,9,10
1,2,6,11,12,13,14,15,16,17
15,18,6
于 2021-01-08T00:20:42.477 回答
0

我只是在这里发布我的@TheGeneral 答案版本。编写原始解决方案的所有功劳都归功于他。

我的版本略有不同,因为我已将其完全通用,以便在 OP 的直接用例之外使用,见下文:

public static IEnumerable<IEnumerable<TSource>> BatchDuplicate<TSource, TKey>(
    this IEnumerable<TSource> source, 
    Func<TSource, TKey> keySelector,
    int count
)
{
var hashset = new HashSet<TKey>(count);
var batch = new List<TSource>(count);
var leftOvers = new Queue<TSource>();

while (true)
{
    foreach (var item in source)
    {
        // check if batched
        if (hashset.Add(keySelector(item)))
        {
            batch.Add(item); // add to batch
        }
        else
        {
            leftOvers.Enqueue(item); // add to left overs
        }

        // if we are at the batch size start a loop
        while (batch.Count == count)
        {
            yield return batch.ToArray(); // return the batch

            batch.Clear();
            hashset.Clear();

            // check the left overs
            while (leftOvers.Any() && batch.Count != count)
            if (hashset.Add(keySelector(leftOvers.Peek())))
            {
                batch.Add(leftOvers.Dequeue());
            } // check the batch
            else break; // we still have a duplicate bail
        }
    }

    if(batch.Any()) yield return batch.ToArray();

    if (!leftOvers.Any()) break;

    source = leftOvers.ToList(); // allocation  :(
    hashset.Clear();
    batch.Clear();
    leftOvers.Clear();
}

用法:

class MyModel
{
    public string MyKey { get; set; }
}

// // IEnumerable<MyModel>
someEnumerable.BatchDuplicate(item => item.MyKey, 10);

这允许该方法与任何模型一起使用,只要您为模型的键提供选择器

于 2021-12-03T18:14:05.060 回答