0
点赞
收藏
分享

微信扫一扫

ActiveMQ

君心浅语 2022-04-18 阅读 145
java

1.消息中间件应用场景

1.1异步处理

       比如场景:用户注册,需要执行三个业务逻辑,分别为写入用户表,发注册邮件以及注册短信。

(1)串行方式

       将注册信息写入数据库成功后,发送注册邮件,在发送注册短信。以上三个任务全部完成后,返回给客户端。

 

(2)并行方式

       将注册信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个任务完成后,返回给客户端。与串行的差别是,并行的方式可以提高处理的时间。

 

(3)异步处理

       引入消息中间件,将部分的业务逻辑,进行异步处理,改造后的架构如下:

 

可以发现,系统的吞吐量明显提高。

1.2应用解耦

场景:用户下单后,订单系统需要通知库存系统。

传统做法是订单系统调用库存系统的接口。缺点:假设库存系统无法访问,则导致订单失败。

 

解决方案:引入消息队列

 

订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功。

库存系统:订阅下单的消息,采用拉/推的方式,获取下单信息,库存系统根据下单信息,进行库存操作。

两个系统之间互不影响。因为下单后,订单系统写入消息队列就不再关心其他的后续操作。实现了订单系统和库存系统的应用解耦。

1.3流量消峰

       流量消峰一般在秒杀或团抢活动中使用广泛。一般会因为流量暴增导致应用挂掉。解决这个问题,一般需要在应用前端加入消息队列。

       通过加入消息队列完成以下功能:

(1)可以控制活动人数

(2)可以缓解短时间内高流量压垮应用

 

用户的请求,服务器接收后,首先写入消息队列。假如消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面。秒杀业务根据消息队列中的请求信息,再做后续处理。

2.JMS消息模型

JMS(Java Messaging Service)是Java平台上有关面向消息中间件的技术规范,它便于消息系统中的Java应用程序进行消息交换,并且通过提供标准的产生、发送、接收消息的接口简化企业应用的开发。

       消息中间件一般有两种传递模式:点对点模式(P2P)和发布-订阅模式(Pub/Sub)

(1)P2P(Ponit to Point) 点对点模型Queue队列模型

(2)Publish/Subscribe 发布/订阅模型Topic主题模型

2.1点对点模型

点对点模型:即生产者和消费者之间的消息往来。

       每个消息都被发送到特定的消息队列,接受者从队列中获取消息。队列保留着消息,直到它们被消费或超时。

 

       特点:

  1. 每个消息只有一个消费者,一旦被消费,消息就不再在消息队列中
  2. 发送者和接受者之间在时间上没有依赖性,也就是当发送者发送了消息之后,不管接受者有没有在运行,它不会影响到消息被发送到队列。
  3. 接收者在成功接收消息之后必须要向对列应答成功

2.2发布/订阅模型

       包含三个角色:主题(Topic),发布者,订阅者。多个发布者将消息发送到topic,系统将这些消息投递到订阅此topic的订阅者。

 

       发送者发送到topic的消息,只有订阅了topic的订阅者才会收到消息。

特点:

  1. 每个消息可以有多个消费者;
  2. 发布者和订阅者之间有时间上的依赖性(必须先订阅主题,再发送消息);
  3. 订阅者必须保持运行的状态,才能接受发布者发布的消息。

3.JMS核心API

(1)ConnectionFactory

       创建Connection对象工厂,针对两种不同的JMS消息模型,分别有QueueConnectionFactoryTopicConnectionFactory两种。

(2)Destination

       Destination的意思是消息生产者的消息发送目标或者说消息消费者的消息来源。对于生产者来说,Destination是某个队列或主题;对于消费者来说,Destination也是某个队列或主题。因此,Destination实际上就是两种类型的对象QueueTopic

(3)Connection

       Connection是客户端和JMS系统之间建立的连接。Connection可以生产一个或多个Session。

(4)Session

       Session是对消息进行操作的接口,可以通过Sessionu创建生产者、消费者、消息。Session提供了事务的功能,如果需要使用Session发送/接收消息时,可以将这些发送/接收动作放到一个事务中。

(5)Producer

       Producer消息生产者,消息生产者由Session创建,并用于将消息发送到Destination。

消息生产者有两种类型:QueueSenderTopicPublisher。可以调用消息生产者的方法,send和publish发送消息。

(6)Consumer

Consumer消息消费者,消息消费者者由Session创建,并用于接收被发送到Destination的消息。消息消费者有两种类型:QueueRecieveTopicSubscriber。可分别通过session的createReceiver或createSubscriber来创建。

(7)MessageListener

       消息监听器。如果注册了消息监听器,一旦消息到达,将自动调用监听器的onMessage方法。

 

4.原生JMS API操作ActiveMQ

4.1 点对点模式

//1.创建连接工厂

// 2.创建链接

// 3.打开链接

// 4.创建session

// 5.创建/指定目标地址

// 6.创建消息生产者/消费者

// 7.创建/接收消息

// 8.发送消息

// 9.释放资源

1.创建空项目,然后基于maven创建一个普通java项目,导入依赖

2.创建消息生产者类,按照9个步骤走

package org.weiwei;



import org.apache.activemq.ActiveMQConnectionFactory;



import javax.jms.*;



/**

 * 消息生产者

 */

public class Produer {

    public static void main(String[] args) throws JMSException {

        //1.创建连接工厂

        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.0.109:61616/");//注意tcp端口是61616,http端口是8161

        // 2.创建链接

        Connection connection = factory.createConnection();

        // 3.打开链接

        connection.start();

        // 4.创建session

            /*

                参数1:是否开启事务操作

                参数2:消息确机制

             */

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // 5.创建目标地址

        Queue queue = session.createQueue("queue1");

        // 6.创建消息生产者

        MessageProducer producer = session.createProducer(queue);

        // 7.创建消息

        TextMessage testMessage = session.createTextMessage("test message");

        // 8.发送消息

        producer.send(testMessage);

        System.out.println("消息发送完成");

        // 9.释放资源

        session.close();

        connection.close();

    }

}

3.运行producer

 

可以看到有1条消息等待被消费

4.创建新模块,同样导入相同依赖,创建消费者类

package org.weiwei;



import org.apache.activemq.ActiveMQConnectionFactory;



import javax.jms.*;



/**

 * 点对点消费者方式(第一种方案)

 */

public class P2P_Consumer1 {

    public static void main(String[] args) throws JMSException {

        //1.创建连接工厂

        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.0.109:61616/");

        // 2.创建链接

        Connection connection = factory.createConnection();

        // 3.打开链接

        connection.start();

        // 4.创建session

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // 5.指定目标地址

        Queue queue = session.createQueue("queue1");

        // 6.创建消息消费者

        MessageConsumer consumer = session.createConsumer(queue);

        // 7.接收消息

        while (true){

            Message message = consumer.receive();

            //如果没有消息了,就结束

            if (message==null){

                break;

            }

            //如果还有消息,判断是什么类型的消息

            if (message instanceof TextMessage) {

                TextMessage textMessage = (TextMessage) message;

                System.out.println("接收的消息:"+textMessage.getText());

            }



        }

    }



}

输出:

 

 

5.点对点消息消费者(第二种方案:推荐)

package org.weiwei;



import org.apache.activemq.ActiveMQConnectionFactory;



import javax.jms.*;



/**

 * 点对点消费者方式2

 */

public class P2P_Consumer2 {

    public static void main(String[] args) throws JMSException {

        //1.创建连接工厂

        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.0.109:61616/");

        // 2.创建链接

        Connection connection = factory.createConnection();

        // 3.打开链接

        connection.start();

        // 4.创建session

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // 5.指定目标地址

        Queue queue = session.createQueue("queue1");

        // 6.创建消息消费者

        MessageConsumer consumer = session.createConsumer(queue);

        // 7.设置消息监听器,接收消息

        consumer.setMessageListener(new MessageListener() {

            //处理消息

            @Override

            public void onMessage(Message message) {

                if (message instanceof TextMessage) {

                    TextMessage textMessage = (TextMessage) message;

                    try {

                        System.out.println("接收的消息(2):" + textMessage.getText());

                    } catch (JMSException e) {

                        e.printStackTrace();

                    }

                }

            }

        });

        //注意:在监听器模式下,千万不要关闭连接,一旦关闭,消息无法接收



    }



}

4.2发布/订阅模式

(1)消息生产者变化不大,改变的地方为第五步

package org.weiwei;



import org.apache.activemq.ActiveMQConnectionFactory;



import javax.jms.*;



/**

 * 发布订阅模式---消息生产者

 */

public class PS_Produer {

    public static void main(String[] args) throws JMSException {

        //1.创建连接工厂

        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.0.109:61616/");//注意tcp端口是61616,http端口是8161

        // 2.创建链接

        Connection connection = factory.createConnection();

        // 3.打开链接

        connection.start();

        // 4.创建session

            /*

                参数1:是否开启事务操作

                参数2:消息确机制

             */

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // 5.创建目标地址

        Topic topic = session.createTopic("topic1");

        // 6.创建消息生产者

        MessageProducer producer = session.createProducer(topic);

        // 7.创建消息

        TextMessage testMessage = session.createTextMessage("test message-----topic");

        // 8.发送消息

        producer.send(testMessage);

        System.out.println("topic消息发送完成");

        // 9.释放资源

        session.close();

        connection.close();

    }

}

(2)消息消费者同理改变的地方为第五步

package org.weiwei;



import org.apache.activemq.ActiveMQConnectionFactory;



import javax.jms.*;



/**

 * 发布订阅模式消费者方式--监听器模式

 */

public class PS_Consumer2 {

    public static void main(String[] args) throws JMSException {

        //1.创建连接工厂

        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.0.109:61616/");

        // 2.创建链接

        Connection connection = factory.createConnection();

        // 3.打开链接

        connection.start();

        // 4.创建session

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // 5.指定目标地址

        Topic topic = session.createTopic("topic1");

        // 6.创建消息消费者

        MessageConsumer consumer = session.createConsumer(topic);

        // 7.设置消息监听器,接收消息

        consumer.setMessageListener(new MessageListener() {

            //处理消息

            @Override

            public void onMessage(Message message) {

                if (message instanceof TextMessage) {

                    TextMessage textMessage = (TextMessage) message;

                    try {

                        System.out.println("接收的消息(2)--topic:" + textMessage.getText());

                    } catch (JMSException e) {

                        e.printStackTrace();

                    }

                }

            }

        });

        //注意:在监听器模式下,千万不要关闭连接,一旦关闭,消息无法接收



    }



}

(3)运行:注意【先运行消费者,再运行生产者------先订阅再消费】

输出:

topic消息发送完成

接收的消息(2)--topic:test message-----topic

5. SpringBoot 整合 ActiveMQ

5.1点对点模式

       详情见:

消息中间件解决方案JMS(1)_Java-请多指教的博客-CSDN博客

5.2发布订阅模式

创建一个springboot项目,勾选web和activemq

(1)配置文件

application.properties

(2)启动类开启注解@EnableJms//启动消息队列

(3)配置类

package com.weiwei.activemq_ps.config;



import org.apache.activemq.ActiveMQConnectionFactory;

import org.apache.activemq.command.ActiveMQTopic;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.jms.core.JmsMessagingTemplate;



import javax.jms.ConnectionFactory;

import javax.jms.Topic;



@Configuration

public class activemqConfig {

    @Value("${spring.activemq.broker-url}")

    private String brokerUrl;

    @Value("${spring.activemq.user}")

    private String username;

    @Value("${spring.activemq.password}")

    private String password;

    @Value("${spring.activemq.topic-name}")

    private String topicName;



    //创建链接工厂

    @Bean

    public ConnectionFactory connectionFactory(){

        return new ActiveMQConnectionFactory(username,password,brokerUrl);

    }

    //jms的消息模板

    @Bean

    public JmsMessagingTemplate jmsMessagingTemplate(){

        return new JmsMessagingTemplate(connectionFactory());

    }

    //发布订阅模式消息对象

    @Bean

    public Topic topic(){

        return new ActiveMQTopic(topicName);

    }

}

(4)消息生产者

package com.weiwei.activemq_ps.controller;



import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.jms.core.JmsMessagingTemplate;

import org.springframework.web.bind.annotation.RequestMapping;

import org.springframework.web.bind.annotation.RestController;



import javax.jms.Topic;



@RestController

public class PSProducer {

    @Autowired

    private JmsMessagingTemplate jmsMessagingTemplate;

    @Autowired

    private Topic topic;

    //消息发送方法

    @RequestMapping("/sendtopic.do")

    public String sendQueue(String username){

        /*

            参数1:主题名称

            参数2:消息内容

         */

        jmsMessagingTemplate.convertAndSend(this.topic,username);

        return "send-success-topic";

    }

}

(5)消息消费者

package com.weiwei.activemq_ps.controller;



import org.springframework.jms.annotation.JmsListener;

import org.springframework.stereotype.Component;



import javax.jms.Message;

import javax.jms.TextMessage;



/**

 * 发布订阅模式的消息消费者

 */

@Component //放入IOC容器

public class PSConsumer1 {



        @JmsListener(destination = "${spring.activemq.topic-name}")

        public void readActiveMessage(Message message){

            if (message instanceof TextMessage){

                TextMessage textMessage=(TextMessage)message;

                try {

                    System.out.println("接收topic消息:"+textMessage.getText());

                }catch (Exception e){

                    e.printStackTrace();

                }

            }

        }





}

运行测试:

localhost:8080/sendtopic.do?username=wuwei

 

6.ActiveMQ消息组成与高级特性

6.1JMS消息组成详解

       整个JMS消息协议组成结构:

      

结构

描述

JMS Provider

消息中间件/消息服务器

JMS Producer

消息生产者

JMS Consumer

消息消费者

JMS Message

消息(重要)

JMS Message消息由三部分组成:

(1)消息头

(2)消息体

(3)消息属性

1.消息头

       JMS消息头定义了若干字段,用于客户端与JMS提供者之间识别和发送消息。

 

举报

相关推荐

0 条评论