2016-06-14 64 views
0

我无法为Atmosphere资源的每个用户会话建立一个Atmosphere Broadcaster。我可以从文档中收集的内容是如何构建“聊天”应用程序,以向每个用户广播相同的消息。每次会话的气氛资源

是否有可能让Atmosphere框架为每个用户会话建立一个通道,还是我必须自己做一些事情并使用内存映射来处理这些连接?

这是我想要的资源:

/websockets/notifications 

我想用户A和B连接到不同的浏览器这个渠道,然后有独立串流信息的能力。我应该能够使用他们的会话ID来让大家了解哪个人发送回复。

Atmosphere支持吗?

相关的pom.xml

<spring-boot-starter-web.version>1.3.3.RELEASE</spring-boot-starter-web.version> 
<atmosphere-runtime.version>2.4.4</atmosphere-runtime.version> 
<atmosphere-javascript.version>2.3.0</atmosphere-javascript.version> 

大气配置

package com.hello; 

import javax.servlet.ServletContext; 
import javax.servlet.ServletException; 
import javax.servlet.ServletRegistration; 

import org.atmosphere.cache.UUIDBroadcasterCache; 
import org.atmosphere.cpr.ApplicationConfig; 
import org.atmosphere.cpr.AtmosphereFramework; 
import org.atmosphere.cpr.AtmosphereServlet; 
import org.atmosphere.cpr.MetaBroadcaster; 
import org.springframework.boot.context.embedded.ServletContextInitializer; 
import org.springframework.context.annotation.Bean; 
import org.springframework.context.annotation.Configuration; 

@Configuration 
public class AtmosphereConfiguration implements ServletContextInitializer { 

    @Bean 
    public AtmosphereServlet atmosphereServlet() { 
     return new AtmosphereServlet(); 
    } 

    @Bean 
    public AtmosphereFramework atmosphereFramework() { 
     return atmosphereServlet().framework(); 
    } 

    @Bean 
    public MetaBroadcaster metaBroadcaster() { 
     AtmosphereFramework framework = atmosphereFramework(); 
     return framework.metaBroadcaster(); 
    } 

    @Override 
    public void onStartup(ServletContext servletContext) throws ServletException { 
     configureAthmosphere(atmosphereServlet(), servletContext); 
    } 

    private void configureAthmosphere(AtmosphereServlet servlet, ServletContext servletContext) { 
     ServletRegistration.Dynamic atmosphereServlet = servletContext.addServlet("atmosphereServlet", servlet); 
     atmosphereServlet.setInitParameter(ApplicationConfig.ANNOTATION_PACKAGE, "com.hello"); 
     atmosphereServlet.setInitParameter(ApplicationConfig.BROADCASTER_CACHE, UUIDBroadcasterCache.class.getName()); 
     atmosphereServlet.setInitParameter(ApplicationConfig.BROADCASTER_SHARABLE_THREAD_POOLS, "true"); 
     atmosphereServlet.setInitParameter(ApplicationConfig.BROADCASTER_MESSAGE_PROCESSING_THREADPOOL_MAXSIZE, "10"); 
     atmosphereServlet.setInitParameter(ApplicationConfig.BROADCASTER_ASYNC_WRITE_THREADPOOL_MAXSIZE, "10"); 
     servletContext.addListener(new org.atmosphere.cpr.SessionSupport()); 
     atmosphereServlet.addMapping("/websocket/*"); 
     atmosphereServlet.setLoadOnStartup(0); 
     atmosphereServlet.setAsyncSupported(true); 
    } 

} 

大气资源

package com.hello; 

import java.nio.charset.StandardCharsets; 

import org.atmosphere.config.service.Get; 
import org.atmosphere.config.service.Disconnect; 
import org.atmosphere.config.service.ManagedService; 
import org.atmosphere.config.service.Ready; 
import org.atmosphere.cpr.AtmosphereResource; 
import org.atmosphere.cpr.AtmosphereResourceEvent; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 

@ManagedService(path = NotificationAtmosphereResource.PATH) 
public class NotificationAtmosphereResource { 

    public static final String PATH = "/websocket/notifications"; 

    private Logger logger = LoggerFactory.getLogger(NotificationAtmosphereResource.class); 

    @Get   
    public void init(AtmosphereResource resource){ 
     resource.getResponse().setCharacterEncoding(StandardCharsets.UTF_8.name()); 
    } 

    @Ready 
    public void onReady(final AtmosphereResource resource) { 
     logger.info("Connected {}", resource.uuid()); 
    } 

    @Disconnect 
    public void onDisconnect(AtmosphereResourceEvent event) { 
     logger.info("Client {} disconnected [{}]", event.getResource().uuid(), 
       (event.isCancelled() ? "cancelled" : "closed")); 
    } 

} 

我从中发出的服务消息

package com.hello; 

import org.atmosphere.cpr.MetaBroadcaster; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 
import org.springframework.beans.factory.annotation.Autowired; 
import org.springframework.stereotype.Service; 

@Service 
public class NotificationEmitterBean implements NotificationEmitter { 

    private Logger logger = LoggerFactory.getLogger(NotificationEmitterBean.class); 

    @Autowired 
    private MetaBroadcaster metaBroadcaster; 

    @Autowired 
    private NotificationService notificationService; 

    @Autowired 
    private JsonMapper jsonMapper; 

    @Override 
    public void emitNotification(String sessionId, String msg) { 

    // This will broadcast to all users on /websocket/notifications 
    // How can I use sessionId to broadcast to the respective browser? 
    metaBroadcaster.broadcastTo(NotificationAtmosphereResource.PATH, 
        jsonMapper.toJson(msg));   
     } 

    } 

} 

回答

0

我能够完成这项工作的唯一方法是创建我自己的基于会话的广播器。我使用由Jeanfrancois Arcand编写的ExcludeSessionBroadcaster作为我的基准。

IncludeSessionBroadcaster.java

package com.hello; 

import java.util.HashSet; 
import java.util.Set; 
import java.util.concurrent.Future; 

import org.atmosphere.cpr.AtmosphereConfig; 
import org.atmosphere.cpr.AtmosphereResource; 
import org.atmosphere.cpr.Broadcaster; 
import org.atmosphere.cpr.BroadcasterFuture; 
import org.atmosphere.cpr.DefaultBroadcaster; 
import org.atmosphere.cpr.Deliver; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 

/** 
* An implementation of {@link DefaultBroadcaster} that include one or more {@link AtmosphereResource} 
* 
* Based on ExcludeSessionBroadcaster written by Jeanfrancois Arcand 
* 
* @author Steven Zgaljic 
*/ 
public class IncludeSessionBroadcaster extends DefaultBroadcaster { 

    private static final Logger logger = LoggerFactory.getLogger(IncludeSessionBroadcaster.class); 

    public IncludeSessionBroadcaster(){} 

    public Broadcaster initialize(String id, AtmosphereConfig config) { 
     return super.initialize(id, config); 
    } 

    /** 
    * the AtmosphereResource r will be include for this broadcast 
    * 
    * @param msg 
    * @param r 
    * @return 
    */ 
    @Override 
    public Future<Object> broadcast(Object msg, AtmosphereResource r) { 

     if (destroyed.get()) { 
      throw new IllegalStateException("This Broadcaster has been destroyed and cannot be used"); 
     } 

     Set<AtmosphereResource> sub = new HashSet<AtmosphereResource>(); 
     sub.removeAll(resources); 
     sub.add(r); 
     start(); 
     Object newMsg = filter(msg); 
     if (newMsg == null) { 
      return null; 
     } 

     BroadcasterFuture<Object> f = new BroadcasterFuture<Object>(newMsg, sub.size()); 
     dispatchMessages(new Deliver(newMsg, sub, f, msg)); 
     return f; 
    } 


    /** 
    * the AtmosphereResources subset will be include for this broadcast 
    * 
    * @param msg 
    * @param subset 
    * @return 
    */ 
    @Override 
    public Future<Object> broadcast(Object msg, Set<AtmosphereResource> subset) { 

     if (destroyed.get()) { 
      return futureDone(msg); 
     } 

     subset.retainAll(resources); 
     start(); 
     Object newMsg = filter(msg); 
     if (newMsg == null) { 
      return futureDone(msg); 
     } 

     BroadcasterFuture<Object> f = new BroadcasterFuture<Object>(newMsg, subset.size()); 
     dispatchMessages(new Deliver(newMsg, subset, f, msg)); 
     return f; 
    } 

    /** 
    * session will be include for this broadcast 
    * 
    * @param msg 
    * @param s 
    * @return 
    */ 
    public Future<Object> broadcast(Object msg, String sessionId) { 

     if (destroyed.get()) { 
      return futureDone(msg); 
     } 

     Set<AtmosphereResource> subset = new HashSet<AtmosphereResource>(); 

     for (AtmosphereResource r : resources) { 
      if (!r.getAtmosphereResourceEvent().isCancelled() && 
        sessionId.equals(r.getRequest().getSession().getId())) { 
       subset.add(r); 
       break; 
      } 
     } 

     start(); 
     Object newMsg = filter(msg); 
     if (newMsg == null) { 
      return futureDone(msg); 
     } 

     BroadcasterFuture<Object> f = new BroadcasterFuture<Object>(newMsg, subset.size()); 
     dispatchMessages(new Deliver(newMsg, subset, f, msg)); 
     return f; 
    } 
} 

然后我分配这个广播到大气资源。

NotificationAtmosphereResource.java

package com.hello; 

import java.nio.charset.StandardCharsets; 

import org.atmosphere.config.service.Get; 
import org.atmosphere.config.service.Disconnect; 
import org.atmosphere.config.service.ManagedService; 
import org.atmosphere.config.service.Ready; 
import org.atmosphere.cpr.AtmosphereResource; 
import org.atmosphere.cpr.AtmosphereResourceEvent; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 

@ManagedService(path = NotificationAtmosphereResource.PATH, 
    broadcaster=IncludeSessionBroadcaster.class) 
public class NotificationAtmosphereResource { 

    public static final String PATH = "/websocket/notifications"; 

    private Logger logger = LoggerFactory.getLogger(NotificationAtmosphereResource.class); 

    @Get   
    public void init(AtmosphereResource resource){ 
     resource.getResponse().setCharacterEncoding(StandardCharsets.UTF_8.name()); 
    } 

    @Ready 
    public void onReady(final AtmosphereResource resource) { 
     logger.info("Connected {}", resource.uuid()); 
    } 

    @Disconnect 
    public void onDisconnect(AtmosphereResourceEvent event) { 
     logger.info("Client {} disconnected [{}]", event.getResource().uuid(), 
       (event.isCancelled() ? "cancelled" : "closed")); 
    } 

} 

然后,我可以将消息发送到唯一我想要的浏览器(阶段ID)。

NotificationEmitterBean.java

package com.hello; 

import org.atmosphere.cpr.MetaBroadcaster; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 
import org.springframework.beans.factory.annotation.Autowired; 
import org.springframework.stereotype.Service; 

@Service 
public class NotificationEmitterBean implements NotificationEmitter { 

    private Logger logger = LoggerFactory.getLogger(NotificationEmitterBean.class); 

    @Autowired 
    private BroadcasterFactory factory; 

    @Override 
    public void emitNotification(String sessionId, String msg) { 

      ((IncludeSessionBroadcaster)factory.lookup(NotificationAtmosphereResource.PATH)).broadcast(msg, sessionId);  
     } 

    } 

}