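For example, I have the following runnable Java code.
It involves one producer and several consumers. The consumers run time-consuming jobs, and they run in parallel.
I would like to know whether this use case is a good fit for rx-java, and how to rewrite it with rx-java.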
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class DemoInJava {
    public static void main(String[] args) {
        final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        final AtomicBoolean done = new AtomicBoolean(false);

        // Producer: refills the queue one page at a time whenever it runs empty.
        Thread producer = new Thread(() -> {
            int offset = 0;
            int limit = 10;
            while (true) {
                if (queue.isEmpty()) {
                    if (offset < 100) { // there are 100 records in the db
                        fetchDataFromDb(offset, limit).forEach(queue::add);
                        offset = offset + limit;
                    } else {
                        done.set(true);
                        break; // no more data
                    }
                } else {
                    try {
                        Thread.sleep(100L); // I don't like the idea of sleep, but it seems to be the simplest way.
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt(); // keep the interrupt flag and stop
                        return;
                    }
                }
            }
        });

        // Consumers: five threads that poll the queue until the producer reports it is done.
        List<Thread> consumers = IntStream.range(0, 5).mapToObj(c -> new Thread(() -> {
            while (true) {
                Integer i = queue.poll();
                if (i != null) {
                    longRunJob(i);
                } else {
                    if (done.get()) {
                        break;
                    } else {
                        try {
                            Thread.sleep(100L); // I don't like the idea of sleep, but it seems to be the simplest way.
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt(); // keep the interrupt flag and stop
                            return;
                        }
                    }
                }
            }
        })).collect(Collectors.toList());

        producer.start();
        consumers.forEach(Thread::start);
    }

    // Stands in for a paged database query.
    private static List<Integer> fetchDataFromDb(int offset, int limit) {
        return IntStream.range(offset, offset + limit).boxed().collect(Collectors.toList());
    }

    // Stands in for the time-consuming job.
    private static void longRunJob(Integer i) {
        System.out.println(Thread.currentThread().getName() + " long run job of " + i);
    }
}
The output is:
....
Thread-1 long run job of 7
Thread-1 long run job of 8
Thread-1 long run job of 9
Thread-4 long run job of 10
Thread-4 long run job of 16
Thread-10 long run job of 14
Thread-5 long run job of 15
Thread-8 long run job of 13
Thread-7 long run job of 12
Thread-9 long run job of 11
Thread-10 long run job of 19
Thread-4 long run job of 18
Thread-3 long run job of 17
....
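
Regarding the sleep calls mentioned in the code comments: independent of rx-java, the polling can probably be avoided with the blocking methods of `BlockingQueue`. The following is only a minimal sketch under some assumptions, not the rx-java rewrite asked about. The class name `DemoWithoutSleep`, the method `runPipeline`, and the sentinel `POISON` are hypothetical names introduced for illustration, and it assumes no real record ever equals `Integer.MIN_VALUE`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class DemoWithoutSleep {
    // Hypothetical sentinel ("poison pill"); assumes real records never equal this value.
    private static final int POISON = Integer.MIN_VALUE;

    // Runs one producer and consumerCount consumers; returns the processed items, sorted.
    static List<Integer> runPipeline(int consumerCount) {
        // Bounded queue: put() blocks while it is full, take() blocks while it is empty.
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);
        List<Integer> processed = Collections.synchronizedList(new ArrayList<>());

        Thread producer = new Thread(() -> {
            try {
                for (int offset = 0; offset < 100; offset += 10) { // 100 records, as in the question
                    for (Integer record : fetchDataFromDb(offset, 10)) {
                        queue.put(record);
                    }
                }
                for (int i = 0; i < consumerCount; i++) {
                    queue.put(POISON); // one stop signal per consumer
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        List<Thread> consumers = new ArrayList<>();
        for (int c = 0; c < consumerCount; c++) {
            consumers.add(new Thread(() -> {
                try {
                    while (true) {
                        int item = queue.take(); // blocks instead of sleeping
                        if (item == POISON) {
                            break;
                        }
                        longRunJob(item);
                        processed.add(item);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }));
        }

        producer.start();
        consumers.forEach(Thread::start);
        try {
            producer.join();
            for (Thread consumer : consumers) {
                consumer.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        List<Integer> sorted = new ArrayList<>(processed);
        Collections.sort(sorted);
        return sorted;
    }

    // Same stand-in for a paged database query as in the question.
    private static List<Integer> fetchDataFromDb(int offset, int limit) {
        return IntStream.range(offset, offset + limit).boxed().collect(Collectors.toList());
    }

    // Same stand-in for the time-consuming job as in the question.
    private static void longRunJob(Integer i) {
        System.out.println(Thread.currentThread().getName() + " long run job of " + i);
    }

    public static void main(String[] args) {
        System.out.println("processed " + runPipeline(5).size() + " items");
    }
}
```

The bounded queue also gives simple back-pressure: the producer waits in `put()` until a consumer frees a slot, so at most ten records are buffered at a time. Whether that matches what rx-java would do for this use case is a separate question.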