5

动机

我需要更多的眼睛来确认我能够同时调用这个方法XMPPConnection.sendPacket(Packet)。对于我当前的代码,我正在以串行方式调用 Callables 列表(最多 3 个)。每个 Callable 在一个 XMPPConnection 上发送/接收 XMPP 数据包。我计划通过分离多个线程来并行化这些 Callables 并且每个 Callable 将在共享 XMPPConnection 上调用 sendPacket 而无需同步。

XMPP连接

class XMPPConnection
{
    private boolean connected = false;

    public boolean isConnected() 
    {
        return connected;
    }

    PacketWriter packetWriter;

    public void sendPacket( Packet packet ) 
    {
        if (!isConnected())
            throw new IllegalStateException("Not connected to server.");

        if (packet == null) 
            throw new NullPointerException("Packet is null.");

        packetWriter.sendPacket(packet);
    }
}

封包写入器

class PacketWriter
{
    public void sendPacket(Packet packet) 
    {
        if (!done) {
            // Invoke interceptors for the new packet 
            // that is about to be sent. Interceptors
            // may modify the content of the packet.
            processInterceptors(packet);

            try {
                queue.put(packet);
            }
            catch (InterruptedException ie) {
                ie.printStackTrace();
                return;
            }
            synchronized (queue) {
                queue.notifyAll();
            }

            // Process packet writer listeners. Note that we're 
            // using the sending thread so it's expected that 
            // listeners are fast.
            processListeners(packet);
    }

    protected PacketWriter( XMPPConnection connection ) 
    {
        this.queue = new ArrayBlockingQueue<Packet>(500, true);
        this.connection = connection;
        init();
    }
}

我的结论

由于 PacketWriter 使用的是 BlockingQueue,因此我打算从多个线程调用 sendPacket 没有问题。我对么 ?

4

3 回答 3

2

是的,您可以毫无问题地从不同的线程发送数据包。

Smack 阻塞队列是因为你不能让不同的线程同时写入输出流。Smack 负责通过以每个数据包的粒度写入输出流来同步输出流。

Smack 实现的模式只是一个典型的生产者/消费者并发模式。您可能有多个生产者(您的线程)和只有一个消费者(Smack 的 PacketWriter 在它自己的线程中运行)。

问候。

于 2011-05-25T21:51:38.233 回答
0

如果可以限制为 Java 5+,则可以考虑使用 BlockingQueue。

来自 Java API 文档,对使用 ArrayBlockingQueue 进行了微小的更改:

class Producer implements Runnable {
   private final BlockingQueue queue;
   Producer(BlockingQueue q) { queue = q; }
   public void run() {
     try {
       while(true) { queue.put(produce()); }
     } catch (InterruptedException ex) { ... handle ...}
   }
   Object produce() { ... }
 }

 class Consumer implements Runnable {
   private final BlockingQueue queue;
   Consumer(BlockingQueue q) { queue = q; }
   public void run() {
     try {
       while(true) { consume(queue.take()); }
     } catch (InterruptedException ex) { ... handle ...}
   }
   void consume(Object x) { ... }
 }

 class Setup {
   void main() {
     BlockingQueue q = new ArrayBlockingQueue();
     Producer p = new Producer(q);
     Consumer c1 = new Consumer(q);
     Consumer c2 = new Consumer(q);
     new Thread(p).start();
     new Thread(c1).start();
     new Thread(c2).start();
   }
 }

对于您的使用,您将让真正的发送者(实际连接的持有者)成为消费者,而数据包准备者/发送者是生产者。

一个有趣的附加想法是,您可以使用 PriorityBlockingQueue 来允许闪存覆盖在任何其他等待数据包之前发送的 XMPP 数据包。

此外,格伦在设计上的观点也很好。您可能想看看 Smack API ( http://www.igniterealtime.org/projects/smack/ ) 而不是自己创建。

于 2009-12-15T16:08:04.830 回答
0

你在这里没有提供足够的信息。

我们不知道以下是如何实现的:

  • 进程拦截器
  • 进程监听器

谁读/写“完成”变量?如果一个线程将其设置为 true,那么所有其他线程将静默失败。

乍一看,这看起来不是线程安全的,但是无法从您发布的内容中确定。

其他问题:

  • 为什么 PacketWriter 是 XMPPConnection 的类成员,它只用于一种方法?
  • 为什么 PacketWriter 有一个 XMPPConnection 成员 var 而没有使用它?
于 2009-09-22T17:00:49.190 回答