3

我正在寻找一些将处理事件的并发代码。此处理可能需要很长时间。

当该事件正在处理时,它应该记录传入事件,然后在它可以再次自由运行时处理最后传入的事件。(可以丢弃其他事件)。这有点像 FILO 队列,但我只需要在队列中存储一个元素。

理想情况下,我想将我的新 Executor 插入到我的事件处理架构中,如下所示。

public class AsyncNode<I, O> extends AbstractNode<I, O>  {
    private static final Logger log = LoggerFactory.getLogger(AsyncNode.class);
    private Executor executor;

    public AsyncNode(EventHandler<I, O> handler, Executor executor) {
        super(handler);
        this.executor = executor;
    }

    @Override
    public void emit(O output) {
        if (output != null) {
            for (EventListener<O> node : children) {
                node.handle(output);
            }
        }
    }

    @Override
    public void handle(final I input) {

        executor.execute(new Runnable() {

            @Override
            public void run() {
                try{
                emit(handler.process(input));
                }catch (Exception e){
                    log.error("Exception occured whilst processing input." ,e);
                    throw e;
                }

            }
        });

    }

}

4

5 回答 5

3

我也不会。我会对您要处理的事件有一个 AtomicReference 并添加一个任务以破坏性方式处理它。

final AtomicReference<Event> eventRef =

public void processEvent(Event event) {
   eventRef.set(event);
   executor.submit(new Runnable() {
       public vodi run() {
           Event e = eventRef.getAndSet(null);
           if (e == null) return;
           // process event
       }
   }
}

这只会在执行器空闲时处理下一个事件,而无需自定义执行器或队列(可用于其他事情)

这也可以扩展到具有键控事件,即您想要处理键的最后一个事件。

于 2012-07-03T07:40:43.443 回答
3

我认为关键是您需要将“丢弃政策”应用于您的Executor. 如果您只想处理最新的任务,那么您需要队列大小为 1 和丢弃最旧的“丢弃策略”。这是执行此操作的示例

Executor latestTaskExecutor = new ThreadPoolExecutor(1, 1, // Single threaded 
        30L, TimeUnit.SECONDS, // Keep alive, not really important here
        new ArrayBlockingQueue<>(1), // Single element queue
        new ThreadPoolExecutor.DiscardOldestPolicy()); // When new work is submitted discard oldest

然后当你的任务进来时,只需将它们提交给这个执行器,如果已经有一个排队的作业,它将被新的替换

latestTaskExecutor.execute(() -> doUpdate()));

这是一个显示此工作的示例应用程序

import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class LatestUpdate {

    private static final Executor latestTaskExecutor = new ThreadPoolExecutor(1, 1, // Single threaded
            30L, TimeUnit.SECONDS, // Keep alive, not really important here
            new ArrayBlockingQueue<>(1), // Single element queue
            new ThreadPoolExecutor.DiscardOldestPolicy()); // When new work is submitted discard oldest

    private static final AtomicInteger counter = new AtomicInteger(0);
    private static final Random random = new Random(); 

    public static void main(String[] args) {
        LatestUpdate latestUpdate = new LatestUpdate();
        latestUpdate.run();
    }

    private void doUpdate(int number) {
        System.out.println("Latest number updated is: " + number);
        try { // Wait a random amount of time up to 5 seconds. Processing the update takes time...
            Thread.sleep(random.nextInt(5000));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private void run() {
        // Updates a counter every second and schedules an update event
        Thread counterUpdater = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    Thread.sleep(1000L); // Wait one second
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                counter.incrementAndGet();
                // Schedule this update will replace any existing update waiting 
                latestTaskExecutor.execute(() -> doUpdate(counter.get()));
                System.out.println("New number is: " + counter.get());
            }
        });
        counterUpdater.start(); // Run the thread
    }
}

这也涵盖了 GUI 的情况,一旦更新停止到达,您希望 GUI 最终与收到的最后一个事件保持一致。

于 2018-12-11T15:00:56.187 回答
0
public class LatestTaskExecutor implements Executor {
    private final AtomicReference<Runnable> lastTask =new AtomicReference<>();
    private final Executor executor;

    public LatestTaskExecutor(Executor executor) {
        super();
        this.executor = executor;
    }

    @Override
    public void execute(Runnable command) {
        lastTask.set(command);
        executor.execute(new Runnable() {
            @Override
            public void run() {
                Runnable task=lastTask.getAndSet(null);
                if(task!=null){
                    task.run();
                }
            }
        });

    }
}

@RunWith( MockitoJUnitRunner.class )
public class LatestTaskExecutorTest {

    @Mock private Executor executor;
    private LatestTaskExecutor latestExecutor;
    @Before
    public void setup(){
        latestExecutor=new LatestTaskExecutor(executor);
    }
    @Test
    public void testRunSingleTask() {
        Runnable run=mock(Runnable.class);
        latestExecutor.execute(run);
        ArgumentCaptor<Runnable> captor=ArgumentCaptor.forClass(Runnable.class);
        verify(executor).execute(captor.capture());
        captor.getValue().run();
        verify(run).run();
    }

    @Test
    public void discardsIntermediateUpdates(){
        Runnable run=mock(Runnable.class);
        Runnable run2=mock(Runnable.class);
        latestExecutor.execute(run);
        latestExecutor.execute(run2);
        ArgumentCaptor<Runnable> captor=ArgumentCaptor.forClass(Runnable.class);
        verify(executor,times(2)).execute(captor.capture());
        for (Runnable runnable:captor.getAllValues()){
            runnable.run();
        }
        verify(run2).run();
        verifyNoMoreInteractions(run);
    }
}
于 2012-07-06T18:54:18.790 回答
0

这个答案是来自 DD 的一个修改版本,它最大限度地减少提交多余任务。

原子引用用于跟踪最新事件。自定义任务被提交到队列以潜在地处理事件,只有读取最新事件的任务才会真正继续执行并在清除对 null 的原子引用之前执行有用的工作。当其他任务有机会运行并发现没有可处理的事件时,它们只是什么都不做,然后默默地消失。通过跟踪队列中可用任务的数量来避免提交多余的任务。如果队列中至少有一个待处理的任务,我们可以避免提交该任务,因为该事件将在已经排队的任务出列时处理。

import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

public class EventExecutorService implements Executor {

    private final Executor executor;
    // the field which keeps track of the latest available event to process
    private final AtomicReference<Runnable> latestEventReference = new AtomicReference<>();
    private final AtomicInteger activeTaskCount = new AtomicInteger(0);

    public EventExecutorService(final Executor executor) {
        this.executor = executor;
    }

    @Override
    public void execute(final Runnable eventTask) {
        // update the latest event
        latestEventReference.set(eventTask);
        // read count _after_ updating event
        final int activeTasks = activeTaskCount.get();

        if (activeTasks == 0) {
            // there is definitely no other task to process this event, create a new task
            final Runnable customTask = new Runnable() {
                @Override
                public void run() {
                    // decrement the count for available tasks _before_ reading event
                    activeTaskCount.decrementAndGet();
                    // find the latest available event to process
                    final Runnable currentTask = latestEventReference.getAndSet(null);
                    if (currentTask != null) {
                        // if such an event exists, process it
                        currentTask.run();
                    } else {
                        // somebody stole away the latest event. Do nothing.
                    }
                }
            };
            // increment tasks count _before_ submitting task
            activeTaskCount.incrementAndGet();
            // submit the new task to the queue for processing
            executor.execute(customTask);
        }
    }
}
于 2012-07-07T03:58:04.963 回答
0

虽然我喜欢 James Mudd 的解决方案,但它仍然会在前一个任务运行时将第二个任务排入队列,这可能是不可取的。如果您想在先前未完成的情况下始终忽略/丢弃到达的任务,您可以像这样制作一些包装器:

public class DiscardingSubmitter {
private final ExecutorService es = Executors.newSingleThreadExecutor();
private Future<?> future = CompletableFuture.completedFuture(null); //to avoid null check

public void submit(Runnable r){
    if (future.isDone()) {
        future = es.submit(r);
    }else {
        //Task skipped, log if you want
    }
}

}

于 2021-09-07T05:32:23.773 回答