0

我有 10 名工人名单的代码和以下两种方法:

public void demoDeques() {
        int maxSizeOfJobDeque = 3;
        Producer producer = new ProducerImpl( maxSizeOfJobDeque );

        Logger.debug( "WorkFlowEngineImpl : " +
                "Creating Workers and adding them to allocator" );
        List<Worker> workerList = buildWorkerList( producer );
        Logger.debug( "WorkFlowEngineImpl : " +
                "Assigning some jobs to the workers. " +
                    "The workers have not been started yet");

        for ( int i=1; i<4; i++ ) {
            producer.assign( new JobImpl( "job " + i, i ) );
            try {
                Thread.sleep( 4000 );
            } catch( InterruptedException e ) {
                e.printStackTrace();
            }
        }

        Logger.debug( "WorkFlowEngineImpl : " + "Starting the workers" );
        startWorkersAndWait5Seconds( workerList );
        Logger.debug( "WorkFlowEngineImpl : " +
                "Assigning some more jobs to the " +
                    "started workers" );

        for ( int i=4; i<7; i++ ) {
            producer.assign( new JobImpl( "Job " + i, i ) );
            try {
                Thread.sleep( 4000 );
            } catch( InterruptedException e ) {
                e.printStackTrace();
            }
        }

        Logger.debug( "WorkFlowEngineImpl : " + "Assigning More Jobs" );
        for ( int i=7; i<11; i++ ) {
            producer.assign( new JobImpl( "job" + i, i ) );
            try {
                Thread.sleep( 4000 );
            } catch( InterruptedException e ) {
                e.printStackTrace();
            }
        }
    }

生产商:

public synchronized void assign( Job job ) {
    Set<Worker> workerSet = jobMap.keySet();
    LinkedBlockingDeque<Job> jobQueue;
    StringBuffer sb;

    for ( Worker worker : workerSet ) {
        jobQueue = jobMap.get( worker );

        sb = new StringBuffer();
        sb.append( "Assigning job " );
        sb.append( job.getJobNumber() );
        sb.append( " to " );
        sb.append( worker );
        sb.append( "'s jobs Deque" );

        Logger.debug( "Producer : " + sb.toString() );

        if ( ! jobQueue.offerFirst( job ) ) { 
            jobQueue.pollLast();
            jobQueue.offerFirst( job );
        }
    }
}

我正在尝试更改这两种方法,以便分配器有一个包含 100 个工作的列表,然后以这样的方式进行分配,即十个工人中的每一个在时间上最多分配三个工作,直到达到 100 个,即工人一个需要工作 1,2,3 工人 2 需要 4,5,6,所以当工人 10 到达时,它会返回给工人 1 分配三个工作,直到达到第 100 个工作,然后它会停止并警告所有工作都已分配。请帮助我被卡住了....

4

2 回答 2

2

尽管您的问题提到了具体的执行顺序,但您的评论似乎表明这不是真正的要求。我建议您使用ExecutorsJava 1.5 中提供的线程池,而不是实现自己的线程池。例如:

ExecutorService threadPool =
    Executors.newFixedThreadPool(NUMBER_THREADS_TO_RUN_JOBS);
// your producers then just have to submit jobs to the pool
for ( int i=1; i<4; i++ ) {
    threadPool.submit(new JobImpl("Job " + i, i));
}

我无法从您的示例中确切看到每个“作业”需要进行哪些处理,但是否JobImpl需要实现Runnable上述代码才能正常工作。如果您需要从您的工作中返回一些值,那么您可以更改JobImpl为 isCallable并使用方法Future返回的submit(Callable)方法来获取call().

于 2012-04-24T13:54:55.893 回答
1

我不同意你的工作分配方法,我认为这就是你的代码挂起的原因。

一般来说,生产者应该创建工作并将其分配给队列。担心分配工作不是制片人的责任。当工人可用时,他们将从队列中拉出作业。他们是拉 1 个工作还是 3 个工作只是工作线程拉动时的逻辑问题。

您可能希望为队列创建一个单例类。它应该处理接收和请求工作。

以这种方式接近它可以让您将工人与生产者分离。您可以根据工作量/瓶颈的要求添加更多生产者或工人。它还使您无法生成某种信号机制,其中工人向生产者指示他们已准备好进行更多工作。最后,这使您不必担心只有 1 个作业可用的各种竞争条件,或者作业编号与您在 demoDequeue 中的魔术数字顺序不符。

HTH,对不起,这不是一个直接的答案,但应该让你走上更好的解决问题的道路。

于 2012-04-24T13:52:25.083 回答