4

我正在使用 Atmosphere 运行时 0.6 快照。Tomcat 7 正确记录我正在使用 Http11 Nio 连接器,并且没有警告将使用 BlockingIO。

我正在尝试将消息发送到三种渠道。

  1. Global Broadcaster - 向所有暂停的资源广播。(全部)
  2. 广播到特定资源(例如,合作伙伴)
  3. 广播到当前资源(自身)

当登录动作发生时,为了实现这种广播,我必须在会话中存储什么?

我的代码的一些细节如下:

  1. 我的处理程序实现 AtmosphereHandler
  2. 在构造函数中,我将 globalBroadcaster 实例化如下:

    globalBroadcaster = new DefaultBroadcaster();

  3. 登录时,

    resource.getAtmosphereConfig().getServletContext().setAttribute(name, selfBroadcaster); 其中 name 是请求参数中的用户名,selfBroadcaster 是 DefaultBroadcaster 的新实例。

  4. 这是 sendMessageToPartner 的代码,

private synchronized void sendMessageToPartner(Broadcaster selfBroadcaster, AtmosphereResource<HttpServletRequest, HttpServletResponse> resource,String name, String message) {
// this gives the partner's name
String partner= (String) resource.getAtmosphereConfig().getServletContext().getAttribute(name + PARTNER_NAME_TOKEN);
// get partner's broadcaster
Broadcaster outsiderBroadcaster = (Broadcaster) resource
.getAtmosphereConfig().getServletContext()
.getAttribute(partner);
if (outsiderBroadcaster == null) {
sendMessage(selfBroadcaster, "Invalid user " + partner);
return;
}
// broadcast to partner
outsiderBroadcaster.broadcast(" **" + message);

我希望我已经提供了所有必需的信息。如果需要,我可以提供更多信息。

问题是,全局消息被发送。当向合作伙伴发送消息时,有时它会被阻止,客户端根本不会收到消息。这在 3-4 条消息之后始终如一地发生。

有一些线程问题吗?我究竟做错了什么?

我希望有人能帮我解决这个问题。

4

1 回答 1

6

好的,我想出了如何使用 Atmosphere 运行时来实现这一点。首先,我升级到 0.7 SNAPSHOT,但我认为相同的逻辑也适用于 0.6。

因此,要为单个用户创建广播器:

在 GET 请求中,

  // Use one Broadcaster per AtmosphereResource             
try {       
atmoResource.setBroadcaster(BroadcasterFactory.getDefault().get());     

} catch (Throwable t) {
                throw new IOException(t);
            }

            // Create a Broadcaster based on this session id.
            selfBroadcaster = atmoResource.getBroadcaster();
            // add to the selfBroadcaster
            selfBroadcaster.addAtmosphereResource(atmoResource);

            atmoResource.suspend();   

调用登录操作时,

//Get this broadcaster from session and add it to BroadcasterFactory.

Broadcaster selfBroadcaster = (Broadcaster) session.getAttribute(sessionId);

BroadcasterFactory.getDefault().add(selfBroadcaster, name);

Now the global broadcaster. The logic here is, you create a broadcaster from the first resource and then add each resource as they log in.

Broadcaster globalBroadcaster;

globalBroadcaster = BroadcasterFactory.getDefault().lookup(DefaultBroadcaster.class, GLOBAL_TOKEN, false);
                if (globalBroadcaster == null) {
                  globalBroadcaster = selfBroadcaster;

                    } else {
                        BroadcasterFactory.getDefault().remove(
                                globalBroadcaster, GLOBAL_TOKEN);
                        AtmosphereResource r = (AtmosphereResource) session
                                .getAttribute("atmoResource");
                        globalBroadcaster.addAtmosphereResource(r);

                    }
                    BroadcasterFactory.getDefault().add(globalBroadcaster,
                            GLOBAL_TOKEN);

最后,您可以向单个连接或全局广播到所有连接,如下所示:

// Single Connection/Session
Broadcaster singleBroadcaster= BroadcasterFactory.getDefault().lookup(
                            DefaultBroadcaster.class, name);
singleBroadcaster.broadcast("Only for you");

// Global 
Broadcaster globalBroadcaster = BroadcasterFactory.getDefault().lookup(DefaultBroadcaster.class,GLOBAL_TOKEN, false);
globalBroadcaster.broadcast("Global message to all");

要向合作伙伴发送消息,只需查找合作伙伴的广播公司,然后对单个连接执行与上述相同的操作。

希望这可以帮助那些试图实现同样目标的人。可能有更好的方法来做到这一点。我想我将不得不使用这种方法,直到有人提出更好的解决方案。

于 2010-12-30T12:10:26.870 回答