我有一个场景,我必须为用户处理文件夹中的 CSV 文件,并在处理后将它们存储到数据库中。我们每个用户有 5 种类型的提要。任何用户都可以在该文件夹中发送任何提要,随时进行处理,需要遵循以下规则:
- 不能同时处理同一客户的同一类型的提要,这意味着必须始终阻止同时处理时间。
- 不允许跨“x”个以上的客户端进行并发处理
- 不允许同一客户端同时处理超过“y”个文件
实现这一目标的好方法是什么?
我有一个场景,我必须为用户处理文件夹中的 CSV 文件,并在处理后将它们存储到数据库中。我们每个用户有 5 种类型的提要。任何用户都可以在该文件夹中发送任何提要,随时进行处理,需要遵循以下规则:
实现这一目标的好方法是什么?
第一个限制可以用AtomicBoolean的映射来实现。这可能不需要是 ConcurrentHashMap,因为您不会在初始化后更改地图的键。完成后不要忘记将提要的值重置为 false。
checkAndProcessFeed(Feed feed, Map<String, AtomicBoolean> map) {
while(!map.get(feed.type).compareAndSet(false, true)) // assuming the AtomicBooleans were initialized to false
Thread.sleep(500);
}
process(feed); // at this point map.get(feed.type).get() == true
map.get(feed.type).set(false); // reset the AtomicBoolean to false
}
其他两个限制可以用AtomicInteger来实现,用于维护客户端和每个客户端文件的计数;处理完成时递减,并通过比较和设置递增以启动新的客户端/文件。
final int maxClient = 5;
AtomicInteger clientCount = new AtomicInteger(0);
ConcurrentLinkedQueue<Client> queue = new ConcurrentLinkedQueue<>(); // hold waiting clients
while(true) {
int temp = clientCount.intValue();
if(!queue.isEmpty() && clientCount.compareAndSet(temp, temp + 1) { // thread-safe increment
process(clients.poll()); // don't forget to decrement the clientCount when the processing finishes
} else Thread.sleep(500);
}