0
点赞
收藏
分享

微信扫一扫

#yyds干货盘点# ssm整合rabbitmq消息队列的简单使用案例

ssm整合rabbitmq消息队列的简单使用案例

1.在pom.xml添加rabbitmq依赖:

<!--rabbitmq依赖 -->  
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.3.5.RELEASE</version>
</dependency>

2.编写rabbitmq的配置文件:rabbitMQ.xml:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.2.xsd">
<!--配置connection-factory,指定连接rabbit server参数 -->
<rabbit:connection-factory id="connectionFactory"
username="guest" password="guest" host="127.0.0.1" port="5672"
/>


<!--通过指定下面的admin信息,当前producer中的exchange和queue会在rabbitmq服务器上自动生成 -->
<rabbit:admin id="connectAdmin" connection-factory="connectionFactory" />

<!--定义queue -->
<rabbit:queue name="queueTest" durable="true" auto-delete="false"
exclusive="false" declared-by="connectAdmin" />

<!-- 定义direct exchange,绑定queueTest -->
<rabbit:direct-exchange name="exchangeTest"
durable="true" auto-delete="false" declared-by="connectAdmin">
<rabbit:bindings>
<rabbit:binding queue="queueTest" key="queueTestKey"></rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>

<!--定义rabbit template用于数据的接收和发送 -->
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
exchange="exchangeTest" />

<!-- 消息接收者 -->
<bean id="messageReceiver" class="com.comit.appointment.modules.test.MessageConsumer"></bean>

<!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象 -->
<rabbit:listener-container
connection-factory="connectionFactory">
<rabbit:listener queues="queueTest" ref="messageReceiver" />
</rabbit:listener-container>

<!--定义queue -->
<rabbit:queue name="queueChris" durable="true"
auto-delete="false" exclusive="false" declared-by="connectAdmin" />

<!-- 定义direct exchange,绑定queueTest -->
<rabbit:direct-exchange name="exchangeTest"
durable="true" auto-delete="false" declared-by="connectAdmin">
<rabbit:bindings>
<rabbit:binding queue="queueChris" key="queueTestChris"></rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>

<!-- 消息接收者 -->
<bean id="receiverChris" class="com.comit.appointment.modules.test.ChrisConsumer"></bean>

<!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象 -->
<rabbit:listener-container
connection-factory="connectionFactory">
<rabbit:listener queues="queueChris" ref="receiverChris" />
</rabbit:listener-container>

<!-- 分隔线 -->
<!--配置connection-factory,指定连接rabbit server参数 -->
<rabbit:connection-factory id="connectionFactory2"
username="guest" password="guest" host="127.0.0.1" port="5672" />

<!--通过指定下面的admin信息,当前producer中的exchange和queue会在rabbitmq服务器上自动生成 -->
<rabbit:admin id="connectAdmin2" connection-factory="connectionFactory2" />

<!--定义queue -->
<rabbit:queue name="queueShijj" durable="true"
auto-delete="false" exclusive="false" declared-by="connectAdmin2" />

<!-- 定义direct exchange,绑定queueTest -->
<rabbit:topic-exchange name="exchangeTest2"
durable="true" auto-delete="false" declared-by="connectAdmin2">
<rabbit:bindings>
<rabbit:binding queue="queueShijj" pattern="shijj.#"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>

<!--定义rabbit template用于数据的接收和发送 -->
<rabbit:template id="amqpTemplate2" connection-factory="connectionFactory2"
exchange="exchangeTest2" />

<!-- 消息接收者 -->
<bean id="recieverShijj" class="com.comit.appointment.modules.test.ShijjConsumer"></bean>

<!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象 -->
<rabbit:listener-container
connection-factory="connectionFactory2">
<rabbit:listener queues="queueShijj" ref="recieverShijj" />
</rabbit:listener-container>
</beans>


3.在ssm项目的上下文配置文件spring-content.xml中引入rabbitmq的配置文件

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context" xmlns:jdbc="http://www.springframework.org/schema/jdbc"
xmlns:jee="http://www.springframework.org/schema/jee" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:util="http://www.springframework.org/schema/util" xmlns:task="http://www.springframework.org/schema/task" xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.1.xsd
http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc-4.1.xsd
http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-4.1.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.1.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.1.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-4.1.xsd"
default-lazy-init="true">


<!-- rabbitmq -->
<import resource="classpath*:rabbitMQ.xml" />

<context:component-scan base-package="com.comit.appointment">
<context:exclude-filter type="annotation" expression="org.springframework.stereotype.Controller"/>
</context:component-scan>


</beans>

4.编写测试代码:

1.消息生产者:

package com.comit.appointment.modules.test.cs;

import java.io.IOException;

import javax.annotation.Resource;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.stereotype.Service;

import com.comit.appointment.modules.sys.entity.Users;

@Service("MessageProducer2")
public class MessageProducer2 {

@Resource(name="amqpTemplate")
private AmqpTemplate amqpTemplate;

public void sendMessage(Object message)throws IOException{
amqpTemplate.convertAndSend("queueTestKey",message);
}



}

注:queueTestKey 是在rabbitMQ.xml绑定到队列的,根据这个key来发送消息到相应的消息队列中。

<rabbit:binding queue="queueTest" key="queueTestKey"></rabbit:binding>

2.消息消费者:获取消息并把消息插入日志记录表中

package com.comit.appointment.modules.test;


import java.io.UnsupportedEncodingException;
import java.text.ParseException;
import java.text.SimpleDateFormat;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.beans.factory.annotation.Autowired;

import com.comit.appointment.modules.sys.dao.LogsMapper;
import com.comit.appointment.modules.sys.entity.Logs;


public class MessageConsumer implements MessageListener {
private Logger logger = LoggerFactory.getLogger(MessageConsumer.class);

@Autowired
private LogsMapper logsMapper;

@Override
public void onMessage(Message message) {
try {
String messages = new String(message.getBody(), "UTF-8");
String[] str=messages.split(",");
Logs log=new Logs();
log.setNAME(str[0]);
log.setTIME(str[1]);
log.setACTION(str[2]);
System.out.println("log-->"+log.toString());
logsMapper.insertSelective(log);
} catch (UnsupportedEncodingException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
logger.info("consumer receive message------->:{}", message);
}

}

在登录时发送已登录的消息:

package com.comit.appointment.modules.sys.controller.impl;

import java.io.IOException;
import java.util.Date;

import javax.servlet.http.HttpSession;

import org.apache.shiro.session.Session;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;

import com.comit.appointment.modules.sys.controller.IBaseController;
import com.comit.appointment.modules.sys.entity.Users;
import com.comit.appointment.modules.sys.service.IOperaterService;
import com.comit.appointment.modules.sys.service.IUsersService;
import com.comit.appointment.modules.test.cs.MessageProducer2;

@Controller
@RequestMapping("/base")
public class BaseController implements IBaseController {

@Autowired
private IUsersService usersService;

@Qualifier(value="MessageProducer2")
@Autowired
private MessageProducer2 messageProducer;

@RequestMapping("/")
public String login() {
return "modules/sys/login";
}

@Override
@RequestMapping("/login")
public String login(HttpSession session,Users user) {
Users user1=usersService.findUser(user);
if(user1!=null) {
try {
messageProducer.sendMessage(user1.getACCOUNT()+","+new Date()+",登录了");
} catch (IOException e) {
e.printStackTrace();
}
session.setAttribute("user", user1);
return "modules/sys/index";
}
return "modules/sys/login";
}



}
举报

相关推荐

0 条评论