12

我有一个计算图(带有软值),用于缓存昂贵计算的结果。

现在我有一种情况,我知道可能会在接下来的几秒钟内查找特定的键。该密钥的计算成本也比大多数密钥高。

我想在最低优先级线程中提前计算该值,以便当最终请求该值时,它已经被缓存,从而提高了响应时间。

这样做的好方法是什么:

  1. 我可以控制执行计算的线程(特别是它的优先级)。
  2. 避免了重复工作,即计算只进行一次。如果计算任务已经在运行,则调用线程等待该任务而不是再次计算值(FutureTask实现这一点。使用 Guava 的计算映射,如果你只调用get,但如果你将它与调用混合,则不是这样put。)
  3. “预先计算值”方法是异步的和幂等的。如果计算已经在进行中,它应该立即返回,而无需等待该计算完成。
  4. 避免优先级倒置,例如,如果一个高优先级线程请求该值,而一个中等优先级线程正在做一些不相关的事情,但计算任务在一个低优先级线程上排队,那么高优先级线程不能被饿死。也许这可以通过临时提高计算线程的优先级和/或在调用线程上运行计算来实现。

这如何在所有涉及的线程之间进行协调?


附加信息
我的应用程序中的计算是图像过滤操作,这意味着它们都受 CPU 限制。这些操作包括仿射变换(范围从 50µs 到 1ms)和卷积(最长 10ms)。当然,不同线程优先级的有效性取决于操作系统抢占较大任务的能力。

4

4 回答 4

8

您可以通过将 Future 与 ComputedMap 一起使用来安排后台计算的“一次”执行。Future 代表计算值的任务。Future 由 ComputedMap 创建,同时传递给ExecutorService进行后台执行。可以使用您自己的ThreadFactory实现来配置执行器,该实现创建低优先级线程,例如

class LowPriorityThreadFactory implements ThreadFactory
{
   public Thread newThread(Runnable r) {
     Tread t = new Thread(r);
     t.setPriority(MIN_PRIORITY);
     return t;
   }
}

当需要该值时,您的高优先级线程然后从映射中获取未来,并调用 get() 方法来检索结果,并在必要时等待计算结果。为了避免优先级倒置,您向任务添加了一些额外的代码:

class HandlePriorityInversionTask extends FutureTask<ResultType>
{
   Integer priority;  // non null if set
   Integer originalPriority;
   Thread thread;
   public ResultType get() {
      if (!isDone()) 
         setPriority(Thread.currentThread().getPriority());
      return super.get();
   }
   public void run() {
      synchronized (this) {
         thread = Thread.currentThread();
         originalPriority = thread.getPriority();
         if (priority!=null) setPriority(priority);
      } 
      super.run();
   }
   protected synchronized void done() {
         if (originalPriority!=null) setPriority(originalPriority);
         thread = null;
   }

   void synchronized setPriority(int priority) {
       this.priority = Integer.valueOf(priority);
       if (thread!=null)
          thread.setPriority(priority);
   }
}

get()如果任务尚未完成,这将负责将任务的优先级提高到线程调用的优先级,并在任务正常或其他方式完成时将优先级返回到原始优先级。(为了简短起见,代码不会检查优先级是否确实更高,但这很容易添加。)

当高优先级任务调用 get() 时,future 可能还没有开始执行。您可能想通过为执行程序服务使用的线程数设置较大的上限来避免这种情况,但这可能不是一个好主意,因为每个线程都可能以高优先级运行,消耗尽可能多的 cpu操作系统将其关闭。池的大小可能与硬件线程的数量相同,例如将池大小设置为Runtime.availableProcessors(). 如果任务还没有开始执行,而不是等待执行者安排它(这是一种优先级反转的形式,因为您的高优先级线程正在等待低优先级线程完成),那么您可以选择取消它当前执行程序并在仅运行高优先级线程的执行程序上重新提交。

于 2010-07-13T18:00:24.050 回答
2

协调这种情况的一种常见方法是拥有一个其值为 FutureTask 对象的映射。因此,以我从我的 Web 服务器上编写的一些代码为例,其基本思想是,对于给定的参数,我们查看是否已经存在 FutureTask(意味着已经安排了使用该参数的计算),并且如果是这样,我们等待它。在此示例中,我们以其他方式安排查找,但如果需要,可以通过单独的调用在其他地方完成:

  private final ConcurrentMap<WordLookupJob, Future<CharSequence>> cache = ...

  private Future<CharSequence> getOrScheduleLookup(final WordLookupJob word) {
    Future<CharSequence> f = cache.get(word);
    if (f == null) {
      Callable<CharSequence> ex = new Callable<CharSequence>() {
        public CharSequence call() throws Exception {
          return doCalculation(word);
        }
      };
      Future<CharSequence> ft = executor.submit(ex);
      f = cache.putIfAbsent(word, ft);
      if (f != null) {
        // somebody slipped in with the same word -- cancel the
        // lookup we've just started and return the previous one
        ft.cancel(true);
      } else {
        f = ft;
      }
    }
    return f;
  }

在线程优先级方面:我想知道这是否会达到你认为的效果?我不太明白您关于将查找的优先级提高到等待线程之上的观点:如果线程正在等待,那么它正在等待,无论其他线程的相对优先级如何......(你可能想看看一些我写的关于线程优先级线程调度的文章,但长话短说,我不确定改变优先级是否一定会为你带来你所期望的。)

于 2010-07-08T15:53:43.320 回答
2

我怀疑您通过关注线程优先级而走错了路。通常,由于 I/O(内存不足数据)与 CPU 限制(逻辑计算)相比,缓存保存的数据计算成本很高。如果您预取以猜测用户未来的操作,例如查看未读电子邮件,那么它向我表明您的工作可能是 I/O 绑定的。这意味着只要不发生线程饥饿(调度程序不允许),使用线程优先级玩游戏不会提供太多性能提升。

如果成本是 I/O 调用,则后台线程被阻塞,等待数据到达并且处理该数据应该相当便宜(例如反序列化)。由于线程优先级的更改不会提供太多的加速,因此在后台线程池上异步执行工作就足够了。如果缓存未命中惩罚太高,那么使用多层缓存往往有助于进一步减少用户感知延迟。

于 2010-07-08T19:46:58.927 回答
1

作为线程优先级的替代方案,只有在没有高优先级任务正在进行时,您才可以执行低优先级任务。这是一个简单的方法:

AtomicInteger highPriorityCount = new AtomicInteger();

void highPriorityTask() {
  highPriorityCount.incrementAndGet();
  try {
    highPriorityImpl();
  } finally {
    highPriorityCount.decrementAndGet();  
  }
}

void lowPriorityTask() {
  if (highPriorityCount.get() == 0) {
    lowPriorityImpl();
  }
}

在您的用例中,两个 Impl() 方法都将在计算图上调用 get(),在同一线程中调用 highPriorityImpl(),在不同线程中调用 lowPriorityImpl()。

您可以编写一个更复杂的版本,将低优先级任务推迟到高优先级任务完成,并限制并发低优先级任务的数量。

于 2010-07-14T11:50:35.393 回答