0

我正在尝试为一项耗时的任务实现异步返回。所以任务数据被插入到一个Q中,并立即返回。一堆 20 个线程从该数据队列中获取任务。但是线程中的数据变空了,我想知道为什么。

代码段(仅代码的相关部分)

定义

private static LinkedList<Object[]> cacheQ = new LinkedList<Object[]>();
private static ArrayList<Thread> cacheThreads = null;

一次性初始化

if (cacheThreads == null) {
        cacheThreads = new ArrayList<Thread>();
        for (i = 0; i < 20; i++) {
            CacheThread cacheThread = readWriteOps.new CacheThread();
            Thread thread = new Thread(cacheThread);
            cacheThreads.add(i, thread);
            thread.start();
        }
        System.out.println("CAche Threads Started!");
    }

在开头的 Q 中添加新任务 ( addFirst)

public void arrayizeToCache(String key, CacheData value) {
    synchronized (cacheThreads) {
        Object[] tuple = new Object[SIZE] ;
        tuple[KEY] = key ;
        tuple[VALUE] = value ;
----NOT NULL----log.debug("CacheThread arrayizeToCache k"+key+"v"+value) ;
        cacheQ.addFirst((Object[])tuple);
        cacheThreads.notify();
    }
}

线程的实际工作

public class CacheThread implements Runnable {
    @Override
    public void run() {
        System.out.println("CachedThread running!");
        CacheData cacheStore = null;
        while (true) {
            try {
                String key;
                synchronized (cacheThreads) {
                    if (cacheQ.isEmpty()) {
                        log.debug("Cache Q waiting");
                        cacheThreads.wait();
                    }
                    if (cacheQ.isEmpty()) {
                        log.error("Cache List empty nothing to cache");
                        continue;
                    }
                    Object[] cacheData = (Object[]) cacheQ.removeLast();
                    key = (String) cacheData[KEY] ;
                    cacheStore = (CacheData) cacheData[VALUE];
----- HERE -----
//More code, but irrelevant for this question. 
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // log.debug(((Runnable) this).toString() +
            // "Server : flush SUCCESS");
        }
    }
}

-----HERE-----我得到一个空值cacheStore.getValue()。我在这里想念什么。一些基本的东西。

我为 Q 使用了 LinkedList,所以在结构上它应该可以工作。链表包含我需要处理的数据。它包含一个对象数组,因为我有超过 1 个数据元素。

编辑 :

private static final int KEY = 0 ;
private static final int VALUE = 1 ;
private static final int SIZE = 2 ;

再次编辑:这就是我所说的arrayize

ReadWriteOperations.getInstance(this).arrayizeToCache(key,
            new CacheData(subscription, SuperCache.NEVER_EXPIRE));

我的 CacheData 对象

public class CacheData {
private Object value ;
private Integer expiry = 0 ;
public CacheData(Object newValue) {
    value = newValue ;
}
public CacheData (Object newValue, Integer newExpiry) {
// THIS LINE WAS MISSING        value = newValue ;
    expiry = newExpiry ;
}
public Object getValue() {
    return value ;
}
public Integer getExpiry () {
    return expiry ;
}
}

答案:我是在初始化值。有点假设这将是一件更复杂的事情。这里是凌晨 2 点 :) 感谢@Gray。

4

1 回答 1

1

你的代码对我来说看起来不错。我很想知道:

  • cacheStore 的值在哪里更新?
  • 线程启动后是否更新?这可能是问题所在。
  • 如果是这样,则需要在synchronized (cacheThreads) {块内对其进行更新,以使更改对正在运行的线程可见。

以下是有关代码的一些附加注释:

  • 你正在分叉你自己的线程,然后有一个synchronized LinkedList工作。我会鼓励你研究一下两者都适用的ExecutorService模式。绝对推荐用于大多数线程任务。

  • 如果您不移动到 a ExecutorService,则应该将cacheQ队列切换为 a BlockingQueue,它会为您处理synchronized, notify()/wait()等。

于 2013-07-22T20:22:41.843 回答