2

我使用 memcached 1.4.7 版和 spymemcached 2.8.4 作为客户端来设置和获取它的键值。在多线程和高负载环境中使用时,spymemcached 客户端无法设置缓存本身的值。

我正在使用 40M 长密钥运行负载测试程序,这些密钥平均分为 20 个工作线程。每个工作线程尝试在缓存中设置 1M 键。因此有 40 个工作线程正在运行。

在我的 DefaultCache.java 文件中,我创建了一个包含 20 个 spymemcached 客户端的连接池。每次工作线程尝试将密钥设置为缓存 DefaultCache.java 时都会返回一个随机客户端,如 getCache() 方法所示。

当我的程序退出时,它会打印

加载的键总数 = 40000000

但是,当我转到 memcached telnet 控制台时,它总是会丢失数千条记录。我还通过随机获取几个输出 null 的键来验证它。没有驱逐且 cmd_set、curr_items、total_items 分别等于 39.5M

缓存中这些缺少键的原因可能是什么。

这是用于参考目的的代码。

public class TestCacheLoader { 
public static final Long TOTAL_RECORDS = 40000000L;
public static final Long LIMIT = 1000000L;

public static void main(String[] args) {
    long keyCount = loadKeyCacheData();
    System.out.println("Total no of keys loaded  = " + keyCount);
}

public static long loadKeyCacheData() {
    DefaultCache cache = new DefaultCache();
    List<Future<Long>> futureList = new ArrayList<Future<Long>>();
    ExecutorService executorThread = Executors.newFixedThreadPool(40);
    long offset = 0;
    long keyCount = 0;
    long workerCount = 0;
    try {
        do {
            List<Long> keyList = new ArrayList<Long>(LIMIT.intValue());
            for (long counter = offset; counter < (offset + LIMIT) && counter < TOTAL_RECORDS; counter++) {
                keyList.add(counter);
            }
            if (keyList.size() != 0) {
                System.out.println("Initiating a new worker thread " + workerCount++);
                KeyCacheThread keyCacheThread = new KeyCacheThread(keyList, cache);
                futureList.add(executorThread.submit(keyCacheThread));
            }
            offset += LIMIT;
        } while (offset < TOTAL_RECORDS);
        for (Future<Long> future : futureList) {
            keyCount += (Long) future.get();
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        cache.shutdown();
    }
    return keyCount;
}

}

class KeyCacheThread implements Callable<Long> {
private List<Long> keyList;
private DefaultCache cache;

public KeyCacheThread(List<Long> keyList, DefaultCache cache) {
    this.keyList = keyList;
    this.cache = cache;
}

public Long call() {
    return createKeyCache();
}

public Long createKeyCache() {
    String compoundKey = "";
    long keyCounter = 0;
    System.out.println(Thread.currentThread() + " started to process " + keyList.size() + " keys");
    for (Long key : keyList) {
        keyCounter++;
        compoundKey = key.toString();
        cache.set(compoundKey, 0, key);
    }
    System.out.println(Thread.currentThread() + " processed = " + keyCounter + " keys");
    return keyCounter;
}

}

public class DefaultCache {
private static final Logger LOGGER = Logger.getLogger(DefaultCache.class);

private MemcachedClient[] clients;

public DefaultCache() {
    this.cacheNamespace = "";
    this.cacheName = "keyCache";
    this.addresses = "127.0.0.1:11211";
    this.cacheLookupTimeout = 3000;
    this.numberOfClients = 20;

    try {
        LOGGER.debug("Cache initialization started for the cache : " + cacheName);
        ConnectionFactory connectionFactory = new DefaultConnectionFactory(DefaultConnectionFactory.DEFAULT_OP_QUEUE_LEN,
                DefaultConnectionFactory.DEFAULT_READ_BUFFER_SIZE, DefaultHashAlgorithm.KETAMA_HASH) {
            public NodeLocator createLocator(List<MemcachedNode> list) {
                KetamaNodeLocator locator = new KetamaNodeLocator(list, DefaultHashAlgorithm.KETAMA_HASH);
                return locator;
            }
        };

        clients = new MemcachedClient[numberOfClients];

        for (int i = 0; i < numberOfClients; i++) {
            MemcachedClient client = new MemcachedClient(connectionFactory, AddrUtil.getAddresses(getServerAddresses(addresses)));
            clients[i] = client;
        }
        LOGGER.debug("Cache initialization ended for the cache : " + cacheName);
    } catch (IOException e) {
        LOGGER.error("Exception occured while initializing cache : " + cacheName, e);
        throw new CacheException("Exception occured while initializing cache : " + cacheName, e);
    }
}

public Object get(String key) {
    try {
        return getCache().get(cacheNamespace + key);
    } catch (Exception e) {
        return null;
    }
}

public void set(String key, Integer expiryTime, final Object value) {
    getCache().set(cacheNamespace + key, expiryTime, value);
}

public Object delete(String key) {
    return getCache().delete(cacheNamespace + key);
}

public void shutdown() {
    for (MemcachedClient client : clients) {
        client.shutdown();
    }
}

public void flush() {
    for (MemcachedClient client : clients) {
        client.flush();
    }
}

private MemcachedClient getCache() {
    MemcachedClient client = null;
    int i = (int) (Math.random() * numberOfClients);
    client = clients[i];
    return client;
}

private String getServerAddresses(List<Address> addresses) {
    StringBuilder addressStr = new StringBuilder();
    for (Address address : addresses) {
        addressStr.append(address.getHost()).append(":").append(address.getPort()).append(" ");
    }
    return addressStr.toString().trim();
}

}

4

2 回答 2

1

我也看到了。原因是他们用于异步操作的反应器模式。这意味着每 1 个连接有 1 个工作线程。这 1 个线程是高负载和多线程机器下的引导瓶颈。1 个线程只能加载 1 个 CPU,而其余 23 个线程将处于空闲状态。

我们提出了连接池,增加了工作线程并允许利用更多的硬件能力。在github 上查看项目3levelmemcache

于 2014-08-18T06:36:48.013 回答
0

我不确定,但 spymemcached 库本身似乎存在问题。我将 DefaultCache.java 文件的实现更改为使用 xmemcached,一切都开始正常工作。现在我没有丢失任何记录。telnet stats 显示匹配数量的 set 命令。

不过感谢您的耐心。

于 2012-09-27T14:56:36.740 回答