【微服务学习笔记(二)】Docker、RabbitMQ、SpringAMQP、Elasticseach
本篇内容为学习笔记,学习链接为SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,系统详解springcloud微服务技术栈课
课程资料链接可在视频下方找到,此处不粘贴,而以下的代码都是资料中有的,只不过做为记录单独粘贴,做为学习使用的参考步骤。
Docker
作用:解决开发部署时依赖、环境冲突的问题。
Docker如何解决依赖的兼容问题的?
- 将应用的Libs(函数库)、Deps(依赖)配置与应用一起打包。
 - 将每个应用放到一个隔离容器去运行,避免互相干扰。
 

Docker如何解决不同系统环境的问题?
- Docker将用户程序与所需要调用的系统(比如Ubuntu)函数库一起打包。
 - Docker运行到不同操作系统时,直接基于打包的库函数,借助于操作系统的Linux内核来运行。
 
Docker与虚拟机的比较
docker是一个系统进程,虚拟机是在操作系统中的操作系统。

| 特性 | Docker | 虚拟机 | 
|---|---|---|
| 性能 | 接近原生 | 性能较差 | 
| 硬盘占用 | 一般为MB | 一般为GB | 
| 启动 | 秒级 | 分钟级 | 
镜像和容器
镜像(Image):Docker将应用程序及其所需的依赖、函数库、环境、配置等文件打包在一起,称为镜像。
**容器(Container)😗*镜像中的应用程序运行后形成的进程就是容器,只是Docker会给容器做隔离,对外不可见。
Docker和DockerHub
- DockerHub:DockerHub是一个Docker镜像的托管平台。这样的平台称为Docker Registry。
 - 国内也有类似于DockerHub 的公开服务,比如网易云镜像服务、阿里云镜像库等。
 
架构:
Docker是一个CS架构的程序,由两部分组成:
- 服务端(server):Docker守护进程,负责处理Docker指令,管理镜像、容器等。
 - 客户端(client):通过命令或RestAPI向Docker服务端发送指令。可以在本地或远程向服务端发送指令。
 

安装
可以参考官方文档:
 Docker安装文档
还可以参考这篇博客:
 Docker 安装 (完整详细版)
基础命令
可以通过 docker --help命令查看帮助文档,学习使用。
 
 拉取命令步骤:
 1、进入docker官网搜索需要拉取的镜像
 2、按照官网搜索出来的指令放入控制台

 数据卷是一个虚拟目录,指向宿主机文件系统中的某个目录。
容器与数据耦合的问题:
- 不便于修改
当我们要修改Nginx的html内容时,需要进入容器内部修改,很不方便。 - 数据不可复用
在容器内的修改对外是不可见的。所有修改对新创建的容器是不可复用的。 - 升级维护困难
数据在容器内,如果要升级容器必然删除旧容器,所有数据都跟着删除了。

 
命令:
docker volume [选项]
 
选项:
- create 创建一个volume
 - inspect 显示一个或多个volume的信息
 - ls 列出所有的volume
 - prune 删除未使用的volume
 - rm 删除一个或多个指定的volume
 
在创建容器时,可以通过-v参数来挂载一个数据卷到某个容器目录:
 
Dockerfile自定义镜像

Dockerfile就是一个文本文件,其中包含一个个的指令(Instruction),用指令来说明要执行什么操作来构建镜像。每一个指令都会形成一层Layer。
 
搭建镜像仓库
Docker官方的Docker Registry是一个基础版本的Docker镜像仓库,具备仓库管理的完整功能,但是没有图形化界面。
搭建方式比较简单,命令如下:
docker run -d \
    --restart=always \
    --name registry        \
    -p 5000:5000 \
    -v registry-data:/var/lib/registry \
    registry
 
命令中挂载了一个数据卷registry-data到容器内的/var/lib/registry 目录,这是私有镜像库存放数据的目录。
访问http://YourIp:5000/v2/_catalog 可以查看当前私有镜像服务中包含的镜像
使用DockerCompose部署带有图像界面的DockerRegistry,命令如下:
version: '3.0'
services:
  registry:
    image: registry
    volumes:
      - ./registry-data:/var/lib/registry
  ui:
    image: joxit/docker-registry-ui:static
    ports:
      - 8080:80
    environment:
      - REGISTRY_TITLE=传智教育私有仓库
      - REGISTRY_URL=http://registry:5000
    depends_on:
      - registry
 
我们的私服采用的是http协议,默认不被Docker信任,所以需要做一个配置:
# 打开要修改的文件
vi /etc/docker/daemon.json
# 添加内容:
"insecure-registries":["http://192.168.150.101:8080"]
# 重加载
systemctl daemon-reload
# 重启docker
systemctl restart docker
 
MQ(服务异步通讯)
MQ(MessageQueue),消息队列,事件驱动架构中的Broker。
同步调用:
 调用方需要等待执行方的调用结果。(就像打电话一样,需要实时响应)
优点:时效性高
缺点:
- 耦合度高
 - 性能和吞吐能力下降
 - 有额外的资源消耗
 - 有级联失败问题
 
异步调用:
 调用方无需等待执行方的执行结果 (就像发微信,不需要马上回复)。
 常见实现为事件驱动模式:
 
 优点:
- 耦合度低
 - 吞吐量提升
 - 故障隔离
 - 流量削峰
异步通信的缺点: - 依赖于Broker的可靠性、安全性、吞吐能力
 - 架构复杂了,业务没有明显的流程线,不好追踪管理
 

RabbitMQ
安装
1、单机部署
 在Centos7虚拟机中使用Docker来安装。
下载镜像:
docker pull rabbitmq:3-management
 
执行下面的命令来运行MQ容器:
docker run \
 -e RABBITMQ_DEFAULT_USER=itcast \
 -e RABBITMQ_DEFAULT_PASS=123321 \
 --name mq \
 --hostname mq1 \
 -p 15672:15672 \
 -p 5672:5672 \
 -d \
 rabbitmq:3-management
 
2、集群部署
 在RabbitMQ的官方文档中,讲述了两种集群的配置方式:
- 普通模式:普通模式集群不进行数据同步,每个MQ都有自己的队列、数据信息(其它元数据信息如交换机等会同步)。例如我们有2个MQ:mq1,和mq2,如果你的消息在mq1,而你连接到了mq2,那么mq2会去mq1拉取消息,然后返回给你。如果mq1宕机,消息就会丢失。
 - 镜像模式:与普通模式不同,队列会在各个mq的镜像节点之间同步,因此你连接到任何一个镜像节点,均可获取到消息。而且如果一个节点宕机,并不会导致数据丢失。不过,这种方式增加了数据同步的带宽消耗。
 
先来看普通模式集群。
首先,我们需要让3台MQ互相知道对方的存在。
 分别在3台机器中,设置 /etc/hosts文件,添加如下内容:
192.168.150.101 mq1
192.168.150.102 mq2
192.168.150.103 mq3
 
并在每台机器上测试,是否可以ping通对方。
使用
开启后在浏览器中输入虚拟机的地址,以此进入RabbitMQ页面。
RabbitMQ页面介绍:

 
 
 
消息模型

官方的HelloWorld是基于最基础的消息队列模型来实现的,只包括三个角色
- publisher:消息发布者,将消息发送到队列queue
 - queue:消息队列,负责接受并缓存消息
 - consumer:订阅队列,处理队列中的消息
 
简单队列模型
public class PublisherTest {
    @Test
    public void testSendMessage() throws IOException, TimeoutException {
        // 1.建立连接
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
        factory.setHost("192.168.150.101");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("itcast");
        factory.setPassword("123321");
        // 1.2.建立连接
        Connection connection = factory.newConnection();
        // 2.创建通道Channel
        Channel channel = connection.createChannel();
        // 3.创建队列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);
        // 4.发送消息
        String message = "hello, rabbitmq!";
        channel.basicPublish("", queueName, null, message.getBytes());
        System.out.println("发送消息成功:【" + message + "】");
        // 5.关闭通道和连接
        channel.close();
        connection.close();
    }
}
public class ConsumerTest {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.建立连接
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
        factory.setHost("192.168.150.101");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("itcast");
        factory.setPassword("123321");
        // 1.2.建立连接
        Connection connection = factory.newConnection();
        // 2.创建通道Channel
        Channel channel = connection.createChannel();
        // 3.创建队列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);
        // 4.订阅消息
        channel.basicConsume(queueName, true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 5.处理消息
                String message = new String(body);
                System.out.println("接收到消息:【" + message + "】");
            }
        });
        System.out.println("等待接收消息。。。。");
    }
}
 
SpringAMQP
AMQP:Advanced Message Queuing Protocol,是用于在应用程序或之间传递业务消息的开放标准。该协议与语言和平台无关,更符合微服务中独立性的要求。
SpringAMQP:Spring AMQP是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。包含两部分,其中spring-amqp是基础抽象spring-rabbit是底层的默认实现。
SpringAMQP特征:
- 侦听器容器,用于异步处理入站消息
 - 用于发送和接收消息的RabbitTemplate
 - RabbitAdmin用于自动声明队列,交换和绑定
 
消息发送
1、引入依赖
 <!--AMQP依赖,包含RabbitMQ-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
 
2、在publisher服务中配置
spring:
  rabbitmq:
    host: 192.168.150.101 # rabbitMQ的ip地址
    port: 5672 # 端口
    username: itcast
    password: 123321
    virtual-host: /
 
3、测试方法:
@RunWith(SpringRunner.class)
@SpringBootTest
//单元测试使用该两个注解
public class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void testSendMessage2SimpleQueue() {
        String queueName = "simple.queue";
        String message = "hello, spring amqp!";
        rabbitTemplate.convertAndSend(queueName, message);
    }
}
 
消息接收
1、在consumer中
spring:
  rabbitmq:
    host: 192.168.150.101 # rabbitMQ的ip地址
    port: 5672 # 端口
    username: itcast
    password: 123321
    virtual-host: /
    listener:
      simple:
        prefetch: 1
 
2、监听方法
@Component
public class SpringRabbitListener {
     @RabbitListener(queues = "simple.queue")
     public void listenSimpleQueue(String msg) {
         System.out.println("消费者接收到simple.queue的消息:【" + msg + "】");
     }
}
 
Work Queue 工作队列
作用:提高消息处理的效率,避免消息堆积。
消息预取:预先取出队列中的消息。
 
 消息预取限制:通过prefetch控制预期消息的上限,使得处理更快的消费者能取得更多消息,慢的消费者取得少的消息。
spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 #每次只能获取一条消息,处理完成才能获取下一个消息
 
发布订阅
以上两个模型都针对一个消息发布一个消费者,哪怕是队列模型,也是对消息进行分配,不可能一个消息获得多次。
 加入exchange(交换机),发布者不需要直接面对队列,交换机只负责发送,不负责存储。
 常见exchange类型包括:
- Fanout:广播
 - Direct:路由
 - Topic:话题
 
Fanout Exchange
和以上类型不同的点在于,会将接收到的消息路由到每一个跟其绑定的queue。

1、声明队列、交换机,并将其绑定
@Configuration
public class FanoutConfig {
    // itcast.fanout
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("itcast.fanout");
    }
    // fanout.queue1
    @Bean
    public Queue fanoutQueue1(){
        return new Queue("fanout.queue1");
    }
    // 绑定队列1到交换机
    @Bean
    public Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
        return BindingBuilder
                .bind(fanoutQueue1)
                .to(fanoutExchange);
    }
    // fanout.queue2
    @Bean
    public Queue fanoutQueue2(){
        return new Queue("fanout.queue2");
    }
    // 绑定队列2到交换机
    @Bean
    public Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
        return BindingBuilder
                .bind(fanoutQueue2)
                .to(fanoutExchange);
    }
    @Bean
    public Queue objectQueue(){
        return new Queue("object.queue");
    }
}
 
2、查看

 3、发送测试SpringAmqpTest中
 @Test
    public void testSendFanoutExchange() {
        // 交换机名称
        String exchangeName = "itcast.fanout";
        // 消息
        String message = "hello, every one!";
        // 发送消息
        rabbitTemplate.convertAndSend(exchangeName, "", message);
    }
 
Direct Exchange
会将接收到的消息根据规则路由到指定的Queue,因此称为路由模式(routes)。
- 每一个Queue都与Exchange设置一个BindingKey
 - 发布者发送消息时,指定消息的RoutingKey
 - Exchange将消息路由到BindingKey与消息RoutingKey一致的队列

 
1、SpringRabbitListener中声明监听方法,分别监听两个队列
@RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue1"),
            exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
            key = {"red", "blue"}
    ))
    public void listenDirectQueue1(String msg){
        System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】");
    }
 @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue2"),
            exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
            key = {"red", "yellow"}
    ))
    public void listenDirectQueue2(String msg){
        System.out.println("消费者接收到direct.queue2的消息:【" + msg + "】");
    }
 
2、SpringAmqpTest类发送测试
 @Test
    public void testSendDirectExchange() {
        // 交换机名称
        String exchangeName = "itcast.direct";
        // 消息
        String message = "hello, red!";
        // 发送消息
        rabbitTemplate.convertAndSend(exchangeName, "red", message);
    }
 
TopicExchange
与DirectExchange类似,区别在于routingKey必须是多个单词的列表,并且以.分割。
1、SpringRabbitListener监听
@RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue1"),
            exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
            key = "china.#"
    ))
    public void listenTopicQueue1(String msg){
        System.out.println("消费者接收到topic.queue1的消息:【" + msg + "】");
    }
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue2"),
            exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
            key = "#.news"
    ))
    public void listenTopicQueue2(String msg){
        System.out.println("消费者接收到topic.queue2的消息:【" + msg + "】");
    }
 
2、SpringAmqpTest中测试
 @Test
    public void testSendTopicExchange() {
        // 交换机名称
        String exchangeName = "itcast.topic";
        // 消息
        String message = "今天天气不错,我的心情好极了!";
        // 发送消息
        rabbitTemplate.convertAndSend(exchangeName, "china.weather", message);
    }
 
消息转换器
RabbitMQ只支持字节注入,而SpringAMQP允许发对象,采用Java中jdk的序列化,将其转化。
Spring的对消息对象的处理是由org.springframework,amqp.suppor.converter.Messageconverter来处理的。默认实现是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化。
如果要修改只需要定义一个MessageConverter 类型的Bean即可。
 推荐用JSON方式序列化,步骤如下:
1、引入依赖
 <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>
 
2、PublisherApplication中:
 @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
 
3、发送转换为JSON完成,接下来是接收:
SpringRabbitListener:
 @RabbitListener(queues = "object.queue")
    public void listenObjectQueue(Map<String,Object> msg){
        System.out.println("接收到object.queue的消息:" + msg);
    }
 
ConsumerApplication:
 @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
 
elasticsearch(分布式搜索)
elasticsearch是强大的开源搜索引擎,广泛应用于日志数据分析、实时监控等领域。

 Lucene是一个]ava语言的搜索引擎类库,是Apache公司的顶级项目。
Lucene的优势:
- 易扩展
 - 高性能(基于倒排索引)
 
Lucene的缺点:
- 只限于Java语言开发
 - 学习曲线陡峭不支持水平扩展
 
倒排索引

与MySQL对比:
- Mysql:擅长事务类型操作,可以确保数据的安全和一致性
 - Elasticsearch:擅长海量数据的搜索、分析、计算


 
索引库语法
查看索引库语法
GET /索引库名
 
示例:
GET /heima
 
删除索引库的语法
DELETE /索引库名
 
示例:
DELETE /heima
 
索引库和mapping一旦创建无法修改,但是可以添加新的字段,语法如下:
 ![[图片]](https://file.cfanz.cn/uploads/png/2024/03/13/9/76c7TSX1eD.png)
操作文档
新增文档
 ![[图片]](https://file.cfanz.cn/uploads/png/2024/03/13/9/L0B22E0OD0.png)
查看文档语法
GET /索引库名/-doc/文档id
 
示例:
GET /heima/_doc/1
 
删除文档的语法:
DELETE /索引库名/-doc/文档id
 
示例:
DELETE /heima/ doc/1
 
修改文档
 ![[图片]](https://file.cfanz.cn/uploads/png/2024/03/13/9/e4a2PCERZG.png)










