0

假设我有一种方法可以高速处理实时事件消息。

对于每个调用(消息通过),我有多个想要跟踪的状态,并且我在下一次调用该方法时执行的处理类型取决于当前状态。

因为它的速率很高并且可能需要一些时间来处理并且在单个线程上,所以上一个调用可能不会在下一个调用之前完成。

如果我对每个方法调用使用异步多线程实现(例如线程池),那么可以同时执行多个调用,并且每个调用都会评估为相同的状态并且会发生相同类型的处理,这不是我想要的。我想确保如果变量的状态在其中一个线程调用中发生更改,那么其他线程将知道该状态。

我的问题是这种情况下最好的类型实现是什么(使用原子整数?同步?)对于我想确保它异步处理速率和每次调用的处理,但同时想要确保“同时”对线程的多个调用是状态感知的。顺序并不是那么重要。

IE:

state = false;//current state

a thread                     b thread (and vice versa if thread b or thread a "saw" it first)
------------------------------
|                            |
|                            |
sees false                  sees false (should "see" true)
changes to true             changes to true (should not change to true)
|                            |


void processMessage(String message) {
    Runnable runner = new Runnable() {
       void run() {
        if(track.in_state == true) {
          if(track.state == 1) {
             track.in_state = false;
             //do something here
          }
          else if(track.state == 2) {
             track.in_state = false;
             //do something here
          }
        }
      }
    }
    poolA.executor(runner);
    //what happens here is that multiple threads are executed with same processing here
}

void processADifferentMessage(String message) {//a different but also dependent on the state tracker object
    Runnable runner = new Runnable() {
       void run() {
        if(track.in_state == false) {
             //do something here
        }
       }
    };
    //I also want to make sure that its state aware here as well in this thread pool
    poolB.executor(runner);
}

感谢您的任何回复。

4

2 回答 2

0

您提出的当前问题可能可以使用 AtomicInteger 和 AtomicBoolean 来解决。

但是我想您需要某种类型的异步模型,您需要根据某些状态来处理/处理某些消息,并且它们可能会根据某些状态同时执行。对于这些类型的场景,锁定/同步比使用原子版本更好,因为您可能需要使用等待/通知/等待/信号,具体取决于您无法使用 atomicInteger 和 AtomicBoolean 执行的某些状态。你可能有更进一步的要求。

于 2013-07-06T18:05:46.353 回答
0

您可以使用 anAtomicBoolean和 an AtomicInteger,使用它们的compareAndSet运算符。

AtomicBoolean atomicBoolean;
AtomicInteger atomicInteger;
void processMessage(String message) {
    Runnable runner = new ... {
        boolean success = false;
        boolean boolState;
        int intState;
        while(!success) {
            boolState = atomicBoolean.get();
            success = atomicBoolean.compareAndSet(boolState, !boolState);
        }
        success = false
        while(!success) {
            intState = atomicInteger.get();
            success = atomicInteger.compareAndSet(intState, (intState + 1) % maxIntState);
        }
        if(boolState) {
          if(intState == 1) {
             //do something here
          }
          else if(intState == 2) {
             //do something here
          }
        }
    }
    poolA.executor(runner);
}

while循环读取 and 的状态并将AtomicBoolean它们AtomicInteger更新为新状态 - 我假设您AtomicBoolean每次在 true 和 false 之间翻转状态,并且您将 the 初始化AtomicInteger为 0,然后将其递增直到maxIntState达到点你将它重置为 0(例如,如果maxIntState是 4,那么AtomicInteger它将从 0 -> 1 -> 2 -> 3 -> 0)。您在这里使用while循环,以防另一个线程在您读取状态的时间和您尝试更新状态的时间之间更改了状态(例如,您可能读取intState1,但随后另一个线程更新intState为 2 在您可以之前更新它,然后你再试一次intState2)

于 2013-07-06T17:57:11.313 回答