注意:我真的试图简化这段代码
我有多个 Runnable 执行的多个进程(不同类型)。
我试图用图表来简化这种情况。
我有一个RunnableProducer
生产它的时间,生产的是转移到一个RunnableWorker
执行一些操作ProcessorDown
(蓝色箭头)执行一个过程,并将其分发给它的接收者(同类型的类)。如果RunnableWorker
被标记(code
非空),它必须执行一种特殊类型的过程Processor
并将其返回给传输它的“父级” RunnableWorker
。也就是说,您的接收器收集了许多执行另一个附加ProcessorUp
(绿色箭头)注意绿色箭头的数量。初始RunnableWorker
传输(在同一类的中介的帮助下)所有数据到RunnableConsumer
不混合它们的情况下,谁将执行另一个任务(对于这个问题,print
)。
RunnableProducer
应该只在最终RunnableConsumer
可以接收/收集所有产生的东西(由 转移RunnableWorker's
)时产生。RunnableProducer
可以独立关闭。但是,RunnableConsumer
必须在RunnableProducer
生产时运行,直到他消耗掉所有东西(及其变体)。
注意:您可以复制、粘贴、编译和运行
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class TestCollectorRunnable1 {
public static void main(String... args) {
ExecutorService executorService = Executors.newCachedThreadPool();
Counter counter = new Counter();
LifeCycle rcLifeCycle = new LifeCycle();
LifeCycle rpLifeCycle = new LifeCycle();
RunnableConsumer rc = new RunnableConsumer("rc", rcLifeCycle, rpLifeCycle, executorService, counter);
RunnableProducer rp = new RunnableProducer("rp", rpLifeCycle, rcLifeCycle, executorService, counter);
RunnableWorker rw0 = new RunnableWorker("rw0", executorService, counter, null, null, rp.getOutBlockingQueue(), rc.getInBlockingQueue());
RunnableWorker rw11 = new RunnableWorker("rw11", executorService, counter, null, rw0);
RunnableWorker rw12 = new RunnableWorker("rw12", executorService, counter, null, rw0);
rw0.addBlockingQueue(rw11.getInputBlockingQueue());
rw0.addBlockingQueue(rw12.getInputBlockingQueue());
RunnableWorker rw211 = new RunnableWorker("rw211", executorService, counter, 1, rw11);
RunnableWorker rw212 = new RunnableWorker("rw212", executorService, counter, 2, rw11);
RunnableWorker rw213 = new RunnableWorker("rw213", executorService, counter, 3, rw11);
rw11.addBlockingQueue(rw211.getInputBlockingQueue());
rw11.addBlockingQueue(rw212.getInputBlockingQueue());
rw11.addBlockingQueue(rw213.getInputBlockingQueue());
RunnableWorker rw221 = new RunnableWorker("rw221", executorService, counter, 4, rw12);
RunnableWorker rw222 = new RunnableWorker("rw222", executorService, counter, 5, rw12);
rw12.addBlockingQueue(rw221.getInputBlockingQueue());
rw12.addBlockingQueue(rw222.getInputBlockingQueue());
//Simulate Turn off
new Timer().schedule(new TimerTask() {
@Override
public void run() {
rp.stop();
}
}, ThreadLocalRandom.current().nextLong(100L, 1000L));
}
public static String getRandomString(int size) {
StringBuilder sb = new StringBuilder();
for (int i = 0; i < size; i++) {
char c = (char) (new Random().nextInt(25) + 'a');
sb.append(c);
}
return sb.toString();
}
public static class RunnableProducer implements Runnable {
private final String name;
private final LifeCycle ownLifeCycle;
private final LifeCycle outLifeCycle;
private final ExecutorService executorService;
private final Counter counter;
private final int bufferSize;
private final BlockingQueue<ChunkDTO> outBlockingQueue;
private volatile boolean isRunning = false;
public RunnableProducer(String name, LifeCycle ownLifeCycle, LifeCycle outLifeCycle, ExecutorService executorService, Counter counter) {
this(name, ownLifeCycle, outLifeCycle, executorService, counter, new SynchronousQueue/*LinkedBlockingQueue*/<>());
}
public RunnableProducer(String name, LifeCycle ownLifeCycle, LifeCycle outLifeCycle, ExecutorService executorService, Counter counter, BlockingQueue<ChunkDTO> outBlockingQueue) {
this.name = name;
this.ownLifeCycle = ownLifeCycle;
this.outLifeCycle = outLifeCycle;
this.executorService = executorService;
this.counter = counter;
this.bufferSize = 8;
this.outBlockingQueue = outBlockingQueue;
this.ownLifeCycle.setCreated(true);
this.executorService.execute(this);
}
@Override
public void run() {
long quantity = 0;
isRunning = true;
//Blocking Wait (not very elegant)
/*
block until the consumer can consume without losing what is produced and processed
*/
while (!outLifeCycle.isRunning()) {
try {
Thread.sleep(10);
} catch (Exception e) {
}
}
while (/*isRunning*/quantity < 5) {
ownLifeCycle.setRunning(true);
try {
byte[] outBytesSamples = getRandomString(bufferSize).getBytes();
ChunkDTO chunkDTO = new ChunkDTO(outBytesSamples, quantity, null);
outBlockingQueue.put(chunkDTO);
System.out.println(name + ".Produced " + new String(outBytesSamples) + "\t index:" + quantity);
int timeSleeping = ThreadLocalRandom.current().nextInt(10, 100);
Thread.sleep(timeSleeping);
} catch (Exception e) {
}
quantity++;
counter.setValue(quantity);
}
System.out.println(name + "\tSent:" + quantity);
}
public BlockingQueue<ChunkDTO> getOutBlockingQueue() {
return outBlockingQueue;
}
public void stop() {
isRunning = false;
}
}
public static class RunnableConsumer implements Runnable {
private final String name;
private final LifeCycle ownLifeCycle;
private final LifeCycle outLifeCycle;
private final ExecutorService executorService;
private final Counter counter;
private final BlockingQueue<ChunkDTO> inBlockingQueue;
public RunnableConsumer(String name, LifeCycle ownLifeCycle, LifeCycle outLifeCycle, ExecutorService executorService, Counter counter) {
this(name, ownLifeCycle, outLifeCycle, executorService, counter, new SynchronousQueue/*LinkedBlockingQueue*/<>());
}
public RunnableConsumer(String name, LifeCycle ownLifeCycle, LifeCycle outLifeCycle, ExecutorService executorService, Counter counter, BlockingQueue<ChunkDTO> inBlockingQueue) {
this.name = name;
this.ownLifeCycle = ownLifeCycle;
this.outLifeCycle = outLifeCycle;
this.executorService = executorService;
this.counter = counter;
this.inBlockingQueue = inBlockingQueue;
this.ownLifeCycle.setCreated(true);
this.executorService.execute(this);
}
@Override
public void run() {
if (inBlockingQueue != null) {
try {
int quantity = 0;
while (!outLifeCycle.isCreated() || outLifeCycle.isRunning()/*haya recolectado lo que tiene que recolectar*/) {
ownLifeCycle.setRunning(true);
ChunkDTO chunkDTO = inBlockingQueue.take();
System.out.println(name + ".Collected " + new String(chunkDTO.getChunk()) + "\t index:" + chunkDTO.getIndex() + "\t pitch:" + chunkDTO.getPitch());
quantity++;
}
System.out.println(name + "\tReceived:" + quantity);
} catch (InterruptedException e) {
}
}
}
public BlockingQueue<ChunkDTO> getInBlockingQueue() {
return inBlockingQueue;
}
}
public static class RunnableWorker {
private final ExecutorService executorService;
private final RunnableWorker parent;
private final BlockingQueue<ChunkDTO> inputBlockingQueue;
private final BlockingQueue<ChunkDTO> outputBlockingQueue;
private final List<BlockingQueue<ChunkDTO>> downList;
private final List<BlockingQueue<ChunkDTO>> upList;
private final Set<Integer> codes;
public RunnableWorker(String name, ExecutorService executorService, Counter counter, Integer code, RunnableWorker parent, BlockingQueue<ChunkDTO> inputBlockingQueue, BlockingQueue<ChunkDTO> outputBlockingQueue) {
this.executorService = executorService;
this.parent = parent;
this.inputBlockingQueue = inputBlockingQueue;
this.outputBlockingQueue = outputBlockingQueue;
this.downList = new ArrayList<>();
this.upList = new ArrayList<>(Arrays.asList(new SynchronousQueue/*LinkedBlockingQueue*/<>()));
this.codes = new HashSet<>();
//RUNNABLE DISTRIBUTOR
this.executorService.execute(() -> {
if (inputBlockingQueue != null) {
try {
while (true) {
ChunkDTO chunkDTO = inputBlockingQueue.take();
/*
if (codes.size() > 0) {
System.out.println(name + " codes.length:" + codes.size());
}
if (parent == null) {
System.out.println(name + ".Worked " + new String(chunkDTO.getChunk()) + "\tindex:" + chunkDTO.getIndex());
}
// */
if (code == null) {
new ProcessorDown(executorService, chunkDTO, downList);
} else {
ChunkDTO returned = new ChunkDTO(chunkDTO.getChunk(), chunkDTO.getIndex(), code);
System.out.println("\t\t" + name + ".Returned " + returned.toString());
if (parent != null) {
new Processor(executorService, returned, parent.getUpList());
parent.addCodeSon(code);
}
}
}
} catch (Exception e) {
}
}
});
//RUNNABLE COLLECTOR
if (code == null) {
this.executorService.execute(() -> {
int quantity = 0;
while (quantity == 0) {
BlockingQueue<ChunkDTO> outBlockingQueue = upList.get(0);
if (outBlockingQueue != null) {
try {
while (quantity == 0 || (quantity > 0 && quantity < codes.size() * (counter.getValue()))) {
ChunkDTO chunkDTO = outBlockingQueue.take();
/*
System.out.println("\t" + name + ".quantity: " + quantity + ", codes.size():" + codes.size() + ", counter.getValue():" + counter.getValue() + ", total:" + (codes.size() * counter.getValue())
+ "\r\t\tcchunk:" + chunkDTO
+ "\r\t\tcodes:" + codes.stream().map(i -> i.toString()).collect(Collectors.joining(",")));
// */
if (chunkDTO != null) {
if (parent == null) {
outputBlockingQueue.put(chunkDTO);
System.out.println("\t\t" + name + ".Collected " + chunkDTO.toString());
} else {
new ProcessorUp(executorService, chunkDTO, parent.getUpList());
}
quantity++;
}
}
/*
if (quantity != 0) {
String codesString = codes.stream().map(i -> i.toString()).collect(Collectors.joining(","));
System.out.println("\t" + name + "\tWorked:" + quantity + ", \tcodes:" + codesString);
}
// */
} catch (InterruptedException e) {
}
}
}
});
}
}
public void addCodeSon(Integer code) {
if (parent != null) {
parent.addCodeSon(code);
}
codes.add(code);
}
public RunnableWorker(String name, ExecutorService executorService, Counter counter, Integer code, RunnableWorker parent) {
this(name, executorService, counter, code, parent, new SynchronousQueue/*LinkedBlockingQueue*/<>(), new SynchronousQueue/*LinkedBlockingQueue*/<>());
}
public BlockingQueue<ChunkDTO> getInputBlockingQueue() {
return inputBlockingQueue;
}
public void addBlockingQueue(BlockingQueue<ChunkDTO> blockingQueue) {
downList.add(blockingQueue);
}
public void delBlockingQueue(BlockingQueue<ChunkDTO> blockingQueue) {
downList.remove(blockingQueue);
}
public List<BlockingQueue<ChunkDTO>> getUpList() {
return upList;
}
}
public static class Processor implements Runnable {
private final ExecutorService executorService;
private final List<BlockingQueue<ChunkDTO>> listOutput;
private final ChunkDTO inChunkDTO;
public Processor(ExecutorService executorService, ChunkDTO inChunkDTO, List<BlockingQueue<ChunkDTO>> listOutput) {
this.executorService = executorService;
this.listOutput = listOutput;
this.inChunkDTO = inChunkDTO;
this.executorService.execute(this);
}
@Override
public void run() {
if (inChunkDTO != null) {
try {
byte[] outBytes = internalProcessing(inChunkDTO.getChunk());
ChunkDTO outChunkDTO = new ChunkDTO(outBytes, inChunkDTO.getIndex(), inChunkDTO.getPitch());
if (listOutput != null) {
listOutput.forEach(output -> {
try {
output.put(outChunkDTO);
} catch (Exception e) {
}
});
}
} catch (Exception e) {
}
}
}
}
public static class ProcessorDown extends Processor {
public ProcessorDown(ExecutorService executorService, ChunkDTO inChunkDTO, List<BlockingQueue<ChunkDTO>> listOutput) {
super(executorService, inChunkDTO, listOutput);
}
}
public static class ProcessorUp extends Processor {
public ProcessorUp(ExecutorService executorService, ChunkDTO inChunkDTO, List<BlockingQueue<ChunkDTO>> listOutput) {
super(executorService, inChunkDTO, listOutput);
}
}
private static byte[] internalProcessing(byte[] in) {
byte[] out = in;
try {
Thread.sleep(10);
} catch (InterruptedException e) {
}
return out;
}
public static class ChunkDTO {
private final byte[] chunk;
private final long index;
private final Integer pitch;
public ChunkDTO(byte[] chunk, long index, Integer pitch) {
this.chunk = chunk;
this.index = index;
this.pitch = pitch;
}
public byte[] getChunk() {
return chunk;
}
public long getIndex() {
return index;
}
public Integer getPitch() {
return pitch;
}
@Override
public String toString() {
return "ChunkDTO{" + "chunk=" + new String(chunk) + ", index=" + index + ", pitch=" + pitch + '}';
}
}
public static class Counter {
private final ReadWriteLock rwLock;
private Long value;
public Counter() {
this.rwLock = new ReentrantReadWriteLock();
this.value = 0L;
}
public Long getValue() {
Lock readLock = rwLock.readLock();
readLock.lock();
try {
return value;
} finally {
readLock.unlock();
}
}
public void setValue(Long value) {
Lock writeLock = rwLock.writeLock();
writeLock.lock();
try {
this.value = value;
} finally {
writeLock.unlock();
}
}
}
public static class LifeCycle {
private final ReadWriteLock rwLock;
private boolean created;
private boolean running;
private boolean finished;
public LifeCycle() {
this.rwLock = new ReentrantReadWriteLock();
}
public boolean isCreated() {
Lock readLock = rwLock.readLock();
readLock.lock();
try {
return created;
} finally {
readLock.unlock();
}
}
public void setCreated(boolean created) {
Lock writeLock = rwLock.writeLock();
writeLock.lock();
try {
this.created = created;
} finally {
writeLock.unlock();
}
}
public boolean isRunning() {
Lock readLock = rwLock.readLock();
readLock.lock();
try {
return running;
} finally {
readLock.unlock();
}
}
public void setRunning(boolean running) {
Lock writeLock = rwLock.writeLock();
writeLock.lock();
try {
this.running = running;
} finally {
writeLock.unlock();
}
}
public boolean isFinished() {
Lock readLock = rwLock.readLock();
readLock.lock();
try {
return finished;
} finally {
readLock.unlock();
}
}
public void setFinished(boolean finished) {
Lock writeLock = rwLock.writeLock();
writeLock.lock();
try {
this.finished = finished;
} finally {
writeLock.unlock();
}
}
}
}
该类ChunkDTO
包含数据、索引(位置)和代码(以方便其分类RunnableConsumer
)。
类Counter
,以便控制RunnableConsumer/RunnableWorker
应该期望的内容。
如果RunnableProducer
产生 7,并且有 5 个代码,则RunnableConsumer
最终应该收集 35。rw11应该收集,而rw12 应该收集。RunnableWorker
3*7=21
RunnableWorker
2*7=14
创建该类LifeCycle
是为了控制 LifeCycle Producer 和 Consumer,我仍然没有RunnableWorker
.
有RunnableWorker
两个Runnable,以处理转移他们的孩子(//RUNNABLE DISTRIBUTOR
)和父母(//RUNNABLE COLLECTOR
)。
输出
rp.Produced tpwqomrt index:0
rw0.Worked tpwqomrt index:0
rw221.Returned ChunkDTO{chunk=tpwqomrt, index=0, pitch=4}
rw222.Returned ChunkDTO{chunk=tpwqomrt, index=0, pitch=5}
rw212.Returned ChunkDTO{chunk=tpwqomrt, index=0, pitch=2}
rw213.Returned ChunkDTO{chunk=tpwqomrt, index=0, pitch=3}
rw211.Returned ChunkDTO{chunk=tpwqomrt, index=0, pitch=1}
rp.Produced xwnlpkju index:1
rw0 codes.length:5
rw0.Worked xwnlpkju index:1
rw11 codes.length:3
rw12 codes.length:2
rw0.Collected ChunkDTO{chunk=tpwqomrt, index=0, pitch=2}
rc.Collected tpwqomrt index:0 pitch:2
rc.Collected tpwqomrt index:0 pitch:4
rw0.Collected ChunkDTO{chunk=tpwqomrt, index=0, pitch=4}
rw0.Collected ChunkDTO{chunk=tpwqomrt, index=0, pitch=1}
rc.Collected tpwqomrt index:0 pitch:1
rc.Collected tpwqomrt index:0 pitch:3
rw0.Collected ChunkDTO{chunk=tpwqomrt, index=0, pitch=3}
rw0.Collected ChunkDTO{chunk=tpwqomrt, index=0, pitch=5}
rc.Collected tpwqomrt index:0 pitch:5
rw212.Returned ChunkDTO{chunk=xwnlpkju, index=1, pitch=2}
rw221.Returned ChunkDTO{chunk=xwnlpkju, index=1, pitch=4}
rw222.Returned ChunkDTO{chunk=xwnlpkju, index=1, pitch=5}
rw213.Returned ChunkDTO{chunk=xwnlpkju, index=1, pitch=3}
rw211.Returned ChunkDTO{chunk=xwnlpkju, index=1, pitch=1}
rp.Produced xmdfcmmo index:2
rw0 codes.length:5
rw0.Worked xmdfcmmo index:2
rw12 codes.length:2
rw11 codes.length:3
rw221.Returned ChunkDTO{chunk=xmdfcmmo, index=2, pitch=4}
rw212.Returned ChunkDTO{chunk=xmdfcmmo, index=2, pitch=2}
rw222.Returned ChunkDTO{chunk=xmdfcmmo, index=2, pitch=5}
rw213.Returned ChunkDTO{chunk=xmdfcmmo, index=2, pitch=3}
rw211.Returned ChunkDTO{chunk=xmdfcmmo, index=2, pitch=1}
rw0 codes.length:5
rp.Produced syqpyxuk index:3
rw0.Worked syqpyxuk index:3
rw11 codes.length:3
rw0 codes.length:5
rw0.Worked linlkasp index:4
rp.Produced linlkasp index:4
rw12 codes.length:2
rw211.Returned ChunkDTO{chunk=syqpyxuk, index=3, pitch=1}
rw213.Returned ChunkDTO{chunk=syqpyxuk, index=3, pitch=3}
rw212.Returned ChunkDTO{chunk=syqpyxuk, index=3, pitch=2}
rw11 codes.length:3
rw222.Returned ChunkDTO{chunk=syqpyxuk, index=3, pitch=5}
rw221.Returned ChunkDTO{chunk=syqpyxuk, index=3, pitch=4}
rw12 codes.length:2
rw211.Returned ChunkDTO{chunk=linlkasp, index=4, pitch=1}
rw213.Returned ChunkDTO{chunk=linlkasp, index=4, pitch=3}
rw222.Returned ChunkDTO{chunk=linlkasp, index=4, pitch=5}
rw212.Returned ChunkDTO{chunk=linlkasp, index=4, pitch=2}
rw221.Returned ChunkDTO{chunk=linlkasp, index=4, pitch=4}
rp Sent:5
如您所见,生产者发送了 5 个块,但我损失了大部分,我需要收到 25 个(对于本示例)。消费者只收集了少量物品。我的逻辑有什么问题?
我不知道创建两个 runnable 是否是 RunnableWorker 类的好解决方案。有没有更好的实现方式?
我认识到我有一种可怕的方法来阻止生产者等待消费者。您推荐什么解决方案?