`
chenshangge
  • 浏览: 86383 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
文章分类
社区版块
存档分类

SpringMVC 异步(长轮询)实现消息定点推送

阅读更多
$(function () {
	
	getMsg();

	
});

function getMsg()
{
	$.ajax({
		url:"/polling/msg",
		type:"get",
		data:{},
		success:function(data)
		{
			if(data != null && data!="")
				alertShow(data.msg);
			
			getMsg();
		}
	});
} 



/**
 * 
 * @author {chensg}:2016年6月1日
 * example
 *
 */
@Controller
@RequestMapping("/polling/")
public class PollingController {
	
	@Autowired
	MessageContainer messageContainer;  //全局存放每一个user创建的DeferredResult实例,key:userId,value:DeferredResult
	@Autowired
	RabbitTemplate rabbitTemplate; 
	/**
	 * 长轮询
	 * @return
	 */
	@RequestMapping(value="msg", method=RequestMethod.GET)
	public @ResponseBody DeferredResult<UserMessage> getMessage() {
		final String userId = (UserDetails) SecurityContextHolder.getContext()
    .getAuthentication()
    .getPrincipal().getUsername();
		DeferredResult<UserMessage> result = new DeferredResult<UserMessage>(30000l,null);  //设置超时30s,超时返回null
	    final Map<String, DeferredResult> resultMap=messageContainer.getUserMessages();
	    resultMap.put(userId, result);  
	    result.onCompletion(new Runnable() 
	    {  
	    	@Override  
            public void run() {  
                resultMap.remove(userId);  
            }  
        });  
	    
	    return result;
	}
	
	/**
	 * test 新增需要推给某某用户的消息
	 * @return
	 */
	@RequestMapping(value="msg", method=RequestMethod.POST)
	public @ResponseBody RestResult addMessage(String msg,String userId) {
		
		UserMessage userMsg = new UserMessage();
		userMsg.setUserId(userId);
		userMsg.setMsg(msg);
		//系统或者其他用户需要推送的消息放入消息队列
		rabbitTemplate.convertAndSend("test.exchange", "test.binding", userMsg);
		
		return null;
	}
}



页面加载完成时,该用户请求/polling/msg控制器接口,接口里会创建一个DeferredResult实例,设置超时30S,超时返回null。DeferredResult<?> 允许应用程序从一个线程中返回,而何时返回则由线程决定。

消息实体类

public class UserMessage implements Serializable {
	/**
	 * 
	 */
	private static final long serialVersionUID = 1L; 
	
	private String userId;
	
	private String msg;

	public String getUserId() {
		return userId;
	}

	public void setUserId(String userId) {
		this.userId = userId;
	}

	public String getMsg() {
		return msg;
	}

	public void setMsg(String msg) {
		this.msg = msg;
	}

	
}



配置rabbitMQ

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xmlns:rabbit="http://www.springframework.org/schema/rabbit"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
		http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.5.xsd">

	
	<!-- 创建一个connectionFactory -->
	<rabbit:connection-factory id="rabbitConnectionFactory" 
		host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}"
		virtual-host="/" />
	
	<!-- 创建一个rabbitTemplate, 设置retryTemplate -->
	<rabbit:template id="rabbitTemplate" connection-factory="rabbitConnectionFactory"
		retry-template="retryTemplate" />
	
	<!-- 创建一个retryTemplate -->
	<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
		<property name="backOffPolicy">
		<bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
			<property name="initialInterval" value="500" />
			<property name="multiplier" value="10.0" />
			<property name="maxInterval" value="10000" />
		</bean>
	</property>
	</bean>
	
	<rabbit:admin connection-factory="rabbitConnectionFactory" />
	
	<!-- 创建一个用于消息推送的队列 -->
	<rabbit:queue id="testQueue" name="test.polling" />
	
	<rabbit:direct-exchange name="test.exchange">
		<rabbit:bindings>
			<rabbit:binding queue="test.polling" key="test.binding" />
		</rabbit:bindings>
	</rabbit:direct-exchange>
	
	<!-- 创建一个消息处理器 -->
	<bean id="servicePollingHandler" 
		class="com.xxx.controller.test.ServicePollingHandler" />

	<!-- 绑定监听器和队列 -->
	<rabbit:listener-container connection-factory="rabbitConnectionFactory">
		<rabbit:listener ref="servicePollingHandler"
			method="testPollingHandle"
			queues="testQueue" />
	</rabbit:listener-container>
	
</beans>



public class ServicePollingHandler {
	
	@Autowired
	MessageContainer messageContainer;
	
	public void testPollingHandle(UserMessage userMessage)
	{
		Map<String, DeferredResult> msgContainer = messageContainer.getUserMessages();
		DeferredResult<UserMessage> deferredResult = msgContainer.get(userMessage.getUserId());  

	    	    if (deferredResult!=null){  
	        deferredResult.setResult(userMessage);  //调用setResult(),线程返回信息。
	    }  
	}
}

 

@PropertySource(value="classpath:application.properties")
@ImportResource({"classpath:amqp.xml"})
public class RootConfig {

	@Bean
	public MessageContainer messageContainer() {
		return new MessageContainer();
	}
	
}




public class MessageContainer {

	private ConcurrentHashMap<String, DeferredResult> userMessages = new ConcurrentHashMap<String, DeferredResult>();	//线程安全

	public ConcurrentHashMap<String, DeferredResult> getUserMessages() {
		return userMessages;
	}
	
}



该例子的用途,当一个用户登录页面时,异步请求后台/polling/msg,后台创建一个线程,维持改长连接30s,当超时或者返回信息,页面则再次请求后台,维持一个30s的长连接(长轮询)。
系统或者其他用户调用/polling/msg  method:post,传入msg与userId,控制器把消息放入消息队列,消息队列把消息推送到ServicePollingHandler类testPollingHandle()方法,该方法根据userId获得该用户登陆之后的页面长轮询创建的deferredResult实例,调用setResult,页面接受到线程返回消息。

可以基于以上代码,实现web聊天

使用支付宝扫描上方二维码领取红包

 

  • 大小: 6.3 KB
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics