2

我有一个由 3 个阶段组成的责任链。

阶段

我的阶段实现如下所示。

public class InitialStage implements RecordHandler {

    private RecordHandler next;

    @Override
    public void setNext(RecordHandler handler) {
        if (this.next == null) {
            this.next = handler;
        } else {
            this.next.setNext(handler);
        }
    }

    @Override
    public void handleRequest(String record) {
        System.out.println("Processing @ Initial Stage.");
        if (next != null) {
            next.handleRequest(record);
        }
    }
}

流程管理器

public class ProcessManager {
    private RecordProcessor processor = new RecordProcessor();

    public ProcessManager()
    {
        createPipeLine();
    }

    private void createPipeLine() {
        processor = new RecordProcessor();
        processor.addHandler(new InitialStage());
        processor.addHandler(new MiddleStage());
        processor.addHandler(new FinalStage());
    }

    public void processRecord(String record){
        processor.handleRequest(record);
    }
}

界面

public interface RecordHandler {
    public void setNext(RecordHandler handler);
    public void handleRequest(String record);
}

最后是 RecordProcessor,

public class RecordProcessor {

    private RecordHandler successor;
    private RecordHandler first;

    public RecordProcessor(){
    }

    public void addHandler(RecordHandler recordHandler){
        if(this.first == null){
            this.first = recordHandler;
        }
        else{
            this.successor.setNext(recordHandler);  
        }
        this.successor = recordHandler;
    }

    public void handleRequest(String record){
        first.handleRequest(record);
    }
}

现在我观察到我的中间阶段需要一些时间才能完成。所以现在我对使用线程池很感兴趣,所以记录的处理方式如下所示。

在这里,我假设了 3 个工作线程。

推荐链

问题

如何正确修改现有的 CoR 以满足新要求?

4

2 回答 2

1

为了社区的利益回答我自己的问题,因为我找到了一个可行的解决方案。

我为这个问题开发了稍微不同的解决方案。我没有使用并行管道,而是使用了多线程阶段。(在这种情况下是中间阶段)。换句话说,中间阶段不是传递单个记录,而是使用输入队列和输出队列。当输入队列被定义数量的记录填充时,线程会产生。

后续阶段可能会一直处于空闲状态,直到队列已满,但这种实现仍然是可以接受的。这可以在不需要排序时使用。如果您需要遵循序列,则需要保留序列 ID 并在输出队列中执行排序。

中期

public class MiddleStage implements RecordHandler {

private RecordHandler next;
private ExecutorService executor = Executors.newFixedThreadPool(5);
private Queue<String> inbound = new LinkedList<String>();
Collection<Callable<String>> tasks = new ArrayList<Callable<String>>();

@Override
public void setNext(RecordHandler handler) {
    if (this.next == null) {
        this.next = handler;
    } else {
        this.next.setNext(handler);
    }
}

@Override
public void handleRequest(String record) {
    System.out.println("Adding new record to Queue : " + record);
    inbound.add(record);
    System.out.println("Queue Size : " + inbound.size());

    if (inbound.size() >= 10)
    {
        System.out.println("Processing the batch.");

        for (int i = 0; i < 10; i++){
            tasks.add(new MiddleWorker(inbound.poll()));
        }

        System.out.println("Processing @ Middle Stage. " + record);
        List <Future<String>> results = null;
        try {
            results = executor.invokeAll(tasks, 60, TimeUnit.SECONDS);
            tasks.clear();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

        for (Future<String> f : results) {
              try {
                String answer = f.get();
                System.out.println("Outbound : " + answer);
                if (next != null) {
                    next.handleRequest(answer);
                }

            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (ExecutionException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            }
        results.clear();
    }
}
}

中间工人阶级

public class MiddleWorker implements Callable<String> {

private String output;

public MiddleWorker(String record)
{
    output = record + " : OK";
}

@Override
public String call() throws Exception {
    try
    {
        Thread.sleep(2 * 1000);
    }
    catch(final InterruptedException ex)
    {
        ex.printStackTrace();
    }

    return (output);
}
}

如果这个答案不清楚,请发表评论,我会修改答案。

于 2013-10-03T07:47:43.633 回答
0

您可以创建一个额外的“阶段”,负责将工作负载传递到线程池中。

public class PoolExecutingStage implements RecordHandler {

    private Executor threadPool;
    private RecordHandler next;

    public PoolExecutingStage(Executor tp) {
         this.threadPool = tp;      
    }

    @Override
    public void setNext(RecordHandler handler) {
           // similar to your example
    }

    @Override
    public void handleRequest(String record) {
        if (next != null) {
            threadPool.execute(new Runnable() {
                 public void run() {
                     next.handleRequest(record);
                 }
            });               
        }
    }
}

接下来,您需要通过processor.addHandler(new PoolExecutingStage(configuredThreadPool));在初始阶段和中间阶段之间添加调用来将这个新阶段包含在您的管道中。


还要注意处理在多个线程中运行时可能需要的任何线程同步

于 2013-07-29T16:56:13.940 回答