4

我想将元组对象存储在并发的 java 集合中,然后有一个高效的阻塞查询方法,它返回与模式匹配的第一个元素。如果没有这样的元素可用,它将阻塞,直到出现这样的元素。

例如,如果我有一堂课:

public class Pair {
  public final String first;
  public final String Second;
  public Pair( String first, String second ) {
    this.first = first;
    this.second = second;
  }
}

还有一个像这样的集合:

public class FunkyCollection {
  public void add( Pair p ) { /* ... */ }
  public Pair get( Pair p ) { /* ... */ }
}

我想像这样查询它:

myFunkyCollection.get( new Pair( null, "foo" ) );

它返回字段等于“foo”或块的第一个可用对,second直到添加这样的元素。另一个查询示例:

myFunkyCollection.get( new Pair( null, null ) );

应该返回第一个可用的对,无论其值如何。

解决方案是否已经存在?如果不是这种情况,您建议如何实施该get( Pair p )方法?

澄清:该方法get( Pair p)还必须删除元素。名字的选择不是很聪明。一个更好的名字将是take( ... )

4

4 回答 4

3

这是一些源代码。它与 cb160 所说的基本相同,但拥有源代码可能有助于解决您可能仍有的任何问题。特别是 FunkyCollection 上的方法必须同步。

正如meriton 指出的那样,每次添加新对象时,get 方法都会对每个阻塞的get 执行一次O(n) 扫描。它还执行 O(n) 操作来删除对象。这可以通过使用类似于链表的数据结构来改进,您可以在其中将迭代器保留到最后检查的项目。我没有提供这个优化的源代码,但是如果你需要额外的性能,它应该不会太难实现。

import java.util.*;

public class BlockingQueries
{
    public class Pair
    {
        public final String first;
        public final String second;
        public Pair(String first, String second)
        {
            this.first = first;
            this.second = second;
        }
    }

    public class FunkyCollection
    {
        final ArrayList<Pair> pairs = new ArrayList<Pair>();

        public synchronized void add( Pair p )
        {
            pairs.add(p);
            notifyAll();
        }

        public synchronized Pair get( Pair p ) throws InterruptedException
        {
            while (true)
            {
                for (Iterator<Pair> i = pairs.iterator(); i.hasNext(); )
                {
                    Pair pair = i.next();
                    boolean firstOk = p.first == null || p.first.equals(pair.first);
                    boolean secondOk = p.second == null || p.second.equals(pair.second);
                    if (firstOk && secondOk)
                    {
                        i.remove();
                        return pair;                
                    }
                }
                wait();
            }
        }   
    }

    class Producer implements Runnable
    {
        private FunkyCollection funkyCollection;

        public Producer(FunkyCollection funkyCollection)
        {
            this.funkyCollection = funkyCollection;
        }

        public void run()
        {
            try
            {
                for (int i = 0; i < 10; ++i)
                {
                    System.out.println("Adding item " + i);
                    funkyCollection.add(new Pair("foo" + i, "bar" + i));
                    Thread.sleep(1000);
                }
            }
            catch (InterruptedException e)
            {
                Thread.currentThread().interrupt();
            }
        }
    }

    public void go() throws InterruptedException
    {
        FunkyCollection funkyCollection = new FunkyCollection();
        new Thread(new Producer(funkyCollection)).start();
        System.out.println("Fetching bar5.");
        funkyCollection.get(new Pair(null, "bar5"));
        System.out.println("Fetching foo2.");
        funkyCollection.get(new Pair("foo2", null));
        System.out.println("Fetching foo8, bar8");
        funkyCollection.get(new Pair("foo8", "bar8"));
        System.out.println("Finished.");
    }

    public static void main(String[] args) throws InterruptedException
    {
        new BlockingQueries().go();
    }
}

输出:

Fetching bar5.
Adding item 0
Adding item 1
Adding item 2
Adding item 3
Adding item 4
Adding item 5
Fetching foo2.
Fetching foo8, bar8
Adding item 6
Adding item 7
Adding item 8
Finished.
Adding item 9

请注意,我将所有内容都放在一个源文件中以使其更易于运行。

于 2010-01-09T23:22:57.037 回答
3

我知道没有现有的容器会提供这种行为。您面临的一个问题是没有现有条目与查询匹配的情况。在这种情况下,您将不得不等待新条目到达,并且这些新条目应该到达序列的尾部。鉴于您正在阻止,您不想检查最新添加之前的所有条目,因为您已经检查了它们并确定它们不匹配。因此,您需要一些方法来记录您当前的位置,并能够在新条目到达时从那里向前搜索。

这种等待是一个工作Condition。正如cb160的答案中所建议的那样,您应该在集合中分配一个Condition实例,并通过Condition#await(). 您还应该向您的get()方法公开一个伴随重载以允许定时等待:

public Pair get(Pair p) throws InterruptedException;
public Pair get(Pair p, long time, TimeUnit unit) throws InterruptedException;

每次调用 时add(),调用 onCondition#signalAll()以解除阻塞等待未满足get()查询的线程,允许它们扫描最近添加的内容。

您还没有提到如何或是否曾经从该容器中删除项目。如果容器只增长,这将简化线程扫描其内容的方式,而无需担心来自其他线程改变容器的争用。每个线程都可以放心地开始查询,以确定可检查的最小条目数。但是,如果您允许移除项目,则需要面对更多挑战。

于 2010-01-09T23:37:34.600 回答
2

在您的 FunkyCollection add 方法中,您可以在每次添加元素时在集合本身上调用 notifyAll。

在 get 方法中,如果底层容器(任何合适的容器都可以)不包含您需要的值,请等待 FunkyCollection。当通知等待时,检查底层容器是否包含您需要的结果。如果是,则返回该值,否则,再次等待。

于 2010-01-09T23:06:04.040 回答
1

看来您正在寻找元组空间的实现。关于它们的Wikipedia 文章列出了一些 Java 实现,也许您可​​以使用其中之一。如果做不到这一点,您可能会找到一个可以模仿的开源实现,或相关的研究论文。

于 2010-01-10T00:17:55.893 回答