6

我想处理客户端请求流。每个请求都有其特殊类型。首先我需要为该类型初始化一些数据,然后我可以开始处理请求。当客户端类型第一次来的时候,我只是初始化了相应的数据。在此之后,使用该数据处理该类型的所有以下请求。

我需要以线程安全的方式执行此操作。

这是我写的代码。它是线程安全的吗?

public class Test {

    private static Map<Integer, Object> clientTypesInitiated = new ConcurrentHashMap<Integer, Object>();

    /* to process client request we need to 
    create corresponding client type data.
    on the first signal we create that data, 
    on the second - we process the request*/

    void onClientRequestReceived(int clientTypeIndex) {
        if (clientTypesInitiated.put(clientTypeIndex, "") == null) {
            //new client type index arrived, this type was never processed
            //process data for that client type and put it into the map of types
            Object clientTypeData = createClientTypeData(clientTypeIndex);
            clientTypesInitiated.put(clientTypeIndex, clientTypeData);
        } else {
            //already existing index - we already have results and we can use them
            processClientUsingClientTypeData(clientTypesInitiated.get(clientTypeIndex));
        }
    }

    Object createClientTypeData(int clientIndex) {return new Object();}

    void processClientUsingClientTypeData(Object clientTypeData) {}
}

一方面,ConcurrentHashMap 不能为同一个 A 生成两次 map.put(A,B) == null。另一方面,赋值和比较操作不是线程安全的。

那么这段代码可以吗?如果没有,我该如何解决?

更新:我接受了 Martin Serrano 的回答,因为他的代码是线程安全的,并且不容易出现双重初始化问题。但我想指出,我的版本没有发现任何问题,作为答案发布在下面,而且我的版本不需要同步。

4

8 回答 8

3

不,我不认为它仍然是线程安全的。

您需要将 put 操作包装在同步块中。

根据ConcurrentHashMap 的 javadoc

检索操作(包括 get)一般不会阻塞,因此可能与更新操作(包括 put 和 remove)重叠。

于 2012-09-18T19:17:15.273 回答
2

这段代码不是线程安全的,因为

//already existing index - we already have results and we can use them
processClientUsingClientTypeData(clientTypesInitiated.get(clientTypeIndex));

有机会获得您在看跌支票中临时插入的“”值。

因此,可以使此代码成为线程安全的:

public class Test {

    private static Map<Integer, Object> clientTypesInitiated = new ConcurrentHashMap<Integer, Object>();

    /* to process client request we need to 
       create corresponding client type data.
       on the first signal we create that data, 
       on the second - we process the request*/

void onClientRequestReceived(int clientTypeIndex) {
    Object clientTypeData = clientTypesInitiated.get(clientTypeIndex);
    if (clientTypeData == null) {
        synchronized (clientTypesInitiated) {
          clientTypeData = clientTypesInitiated.get(clientTypeIndex);
          if (clientTypeData == null) {
              //new client type index arrived, this type was never processed
              //process data for that client type and put it into the map of types
              clientTypeData = createClientTypeData(clientTypeIndex);
              clientTypesInitiated.put(clientTypeIndex, clientTypeData);
          }
        }
    }
    processClientUsingClientTypeData(clientTypeData);
}

Object createClientTypeData(int clientIndex) {return new Object();}

void processClientUsingClientTypeData(Object clientTypeData) {}

}

于 2012-09-18T19:30:17.830 回答
2

你应该在这里使用 putIfAbsent,这个操作的语义类似于 CAS,它肯定是原子的。并且由于它是原子的 - 那么您对内部分配和比较没有问题。

当前代码不是线程安全的。

于 2012-09-18T19:20:40.513 回答
1

ConcurrentHashMap 被创建为不使用同步块,虽然使用它作为同步块的锁是可以的,但效率不高。如果一定要使用ConcurrentHashMap,正确的用法如下;

private static Map<Integer, Object> clientTypesInitiated = new ConcurrentHashMap<Integer, Object>();

void onClientRequestReceived(int clientTypeIndex) {
    Object clientTypeData = clientTypesInitiated.get(clientTypeIndex);
    if (clientTypeData == null) {
        Object newClientTypeData = createClientTypeData(clientTypeIndex);
        clientTypeData = clientTypesInitiated.putIfAbsent(clientTypeIndex, newClientTypeData);
        // if clientTypeData is null, then put successful, otherwise, use the old one
        if (clientTypeData == null) {
            clientTypeData = newClientTypeData;
        }
    }
    processClientUsingClientTypeData(clientTypeData);
}

老实说,上面的代码可能会创建clientTypeData多次(因为竞态条件),尽管它没有使用synchronized块。因此,您应该衡量创建clientTypeData它的成本,如果它太昂贵以至于它掩盖了不使用synchronized块,那么您甚至不需要使用ConcurrentHashMap. 使用正常HashMapsynchronized阻塞。

于 2012-10-29T10:00:03.740 回答
0

我认为代码根本无法按预期工作。看:

void onClientRequestReceived(int clientTypeIndex) {
     // the line below ensures the existing data is lost even if was not null
    if (clientTypesInitiated.put(clientTypeIndex, "") == null) {
        Object clientTypeData = createClientTypeData(clientTypeIndex);
        clientTypesInitiated.put(clientTypeIndex, clientTypeData);
    } else {
        processClientUsingClientTypeData(clientTypesInitiated.get(clientTypeIndex));
    }
}

我猜你宁愿想要一个 get 方法调用。

我建议这样一种方法:

void onClientRequestReceived(int clientTypeIndex) {
    Object clientTypeData;
    synchronized (clientTypesInitiated) {
        if ((clientTypeData = clientTypesInitiated.get(clientTypeIndex)) == null) {
            clientTypeData = createClientTypeData(clientTypeIndex);
            clientTypesInitiated.put(clientTypeIndex, clientTypeData);
        } 
    }
    processClientUsingClientTypeData(clientTypeData);
}
于 2012-09-18T19:29:46.220 回答
0

当前代码不是线程安全的,因为在 if 和 next put 之间可能发生上下文切换。您需要将其设置clientTypesInitiated为 ConcurrentHashMap 类型,而不是接口 Map。然后使用putIfAbsent方法

于 2012-09-18T19:29:24.960 回答
0

putIfAbsent 效果很好,但有时需要多次初始化。带有双重检查锁定的同步块可能可以解决这个问题。

但这是基于 AtomicIntegerArrays 的另一种选择:

public class Test {

    private static Map<Integer, Object> clientTypesInitiated = new ConcurrentHashMap<Integer, Object>();

    private static final int CLIENT_TYPES_NUMBER = 10;
    private static AtomicIntegerArray clientTypesInitStarted = new AtomicIntegerArray(CLIENT_TYPES_NUMBER);
    private static AtomicIntegerArray clientTypesInitFinished = new AtomicIntegerArray(CLIENT_TYPES_NUMBER);

    void onClientRequestReceived(int clientTypeIndex) {

        int isInitStarted = clientTypesInitStarted.getAndIncrement(clientTypeIndex);
        int isInitFinished = clientTypesInitFinished.get(clientTypeIndex);

        if (isInitStarted == 0) {
            //new client type index arrived, this type was never processed
            //process data for that client type and put it into the map of types
            Object clientTypeData = createClientTypeData(clientTypeIndex);
            clientTypesInitiated.put(clientTypeIndex, clientTypeData);
            clientTypesInitFinished.getAndIncrement(clientTypeIndex);
        } else if (isInitFinished > 0) {
            //already existing index - we already have results and we can use them
            processClientUsingClientTypeData(clientTypesInitiated.get(clientTypeIndex));
        }
    }

    Object createClientTypeData(int clientIndex) {
        return new Object();
    }

    void processClientUsingClientTypeData(Object clientTypeData) {
    }
}

有什么意见吗?

于 2012-09-18T20:43:02.673 回答
0

不,不是,如果您需要整个方法是线程安全的,请将其声明为同步,或者使用同步块来放置您的关键代码。

于 2012-09-18T19:20:35.817 回答