0
点赞
收藏
分享

微信扫一扫

想让系统更具有弹性?了解背压机制和响应式流的秘密!

分析传统开发模式和响应式编程实现方法之间的差别引出了数据流的概念

1 引言

从“流”的概念出发,并引入响应式流程规范,从而分析响应式编程中所包含的各个核心组件。

2 流的概念

由生产者生产,并由一或多个消费者消费的元素序列。

这种生产者/消费者模型也称发布/订阅模型。

3 流的处理模型

拉模式

即消费者主动从生产者拉取元素

推模式

在这种模式下,生产者将元素推送给消费者

想让系统更具有弹性?了解背压机制和响应式流的秘密!_响应式

4 流量控制

4.1 v(生产者生产数据) < v(消费者消费数据)

消费者消费数据没有任何压力,也就不需要进行流量的控制。

4.2 v(生产者生产数据) > v(消费者消费数据)

消费者可能因为无法处理过多的数据而发生崩溃。

想让系统更具有弹性?了解背压机制和响应式流的秘密!_数据_02

5 队列选型

无界队列

想让系统更具有弹性?了解背压机制和响应式流的秘密!_数据_03

有界丢弃队列

想让系统更具有弹性?了解背压机制和响应式流的秘密!_流量控制_04

有界丢弃队列考虑了资源的限制,适用于允许丢消息的业务场景。但消息重要性很高的场景显然不可能采取这种队列。

有界阻塞队列

想让系统更具有弹性?了解背压机制和响应式流的秘密!_数据_05

6 背压机制

纯“推”模式下的数据流量会有很多不可控制的因素,需要在“推”模式和“拉”模式之间考虑一定的平衡性,从而优雅地实现流量控制。

下游能够向上游反馈流量请求的机制。

如果消费者消费数据的速度赶不上生产者生产数据的速度,它就会持续消耗系统的资源。使得消费者可以根据自身当前的处理能力通知生产者来调整生产数据的速度,这种机制就是背压。

7 响应式流规范

针对流量控制的解决方案以及背压机制都包含在响应式流规范中,其中包含了响应式编程的各个核心组件。

8 响应式流的核心接口

8.1 Publisher<T>

public interface Publisher<T> {
 
     /**
      * 请求 Publisher 开始流式传输数据。
 这是一个“工厂方法”,可以多次调用,每次开始一个新的 Subscription.
 每个 Subscription 只能为一个 Subscriber.
 A Subscriber 只能订阅一次 Publisher单个 .
 如果 拒绝 Publisher 订阅尝试或失败,它将通过 发出 Subscriber.onError(Throwable)错误信号。
 参数:
 sSubscriber– 将消耗来自此的信号Publisher
      * Request {@link Publisher} to start streaming data.
      * <p>
      * This is a "factory method" and can be called multiple times, each time starting a new {@link Subscription}.
      * <p>
      * Each {@link Subscription} will work for only a single {@link Subscriber}.
      * <p>
      * A {@link Subscriber} should only subscribe once to a single {@link Publisher}.
      * <p>
      * If the {@link Publisher} rejects the subscription attempt or otherwise fails it will
      * signal the error via {@link Subscriber#onError(Throwable)}.
      *
      * @param s the {@link Subscriber} that will consume signals from this {@link Publisher}
      */
     public void subscribe(Subscriber<? super T> s);
 }

A Publisher 是潜在无限数量的序列元素的提供者,根据从其 Subscriber(s)收到的需求发布它们。 A Publisher 可以在 Subscriber不同时间点动态地提供多个订阅 subscribe(Subscriber) 的服务。 <T> – 发出信号的元素类型

8.2 Subscriber<T>

public interface Subscriber<T> {
 
     /**
      * Invoked after calling {@link Publisher#subscribe(Subscriber)}.
      * <p>
      * No data will start flowing until {@link Subscription#request(long)} is invoked.
      * <p>
      * It is the responsibility of this {@link Subscriber} instance to call {@link Subscription#request(long)} whenever more data is wanted.
      * <p>
      * The {@link Publisher} will send notifications only in response to {@link Subscription#request(long)}.
      * 
      * @param s the {@link Subscription} that allows requesting data via {@link Subscription#request(long)}
      */
     public void onSubscribe(Subscription s);
 
     /**
      * Data notification sent by the {@link Publisher} in response to requests to {@link Subscription#request(long)}.
      * 
      * @param t the element signaled
      */
     public void onNext(T t);
 
     /**
      * Failed terminal state.
      * <p>
      * No further events will be sent even if {@link Subscription#request(long)} is invoked again.
      *
      * @param t the throwable signaled
      */
     public void onError(Throwable t);
 
     /**
      * Successful terminal state.
      * <p>
      * No further events will be sent even if {@link Subscription#request(long)} is invoked again.
      */
     public void onComplete();
 }

8.3 Subscription

Subscription 对象是确保生产者和消费者针对数据处理速度达成一种动态平衡的基础,也是流量控制中实现背压机制的关键。

public interface Subscription {
 
     /**
      * No events will be sent by a {@link Publisher} until demand is signaled via this method.
      * <p>
      *  It can be called however often and whenever needed—but if the outstanding cumulative demand ever becomes Long.MAX_VALUE or more,
      *  it may be treated by the {@link Publisher} as "effectively unbounded".
      * <p>
      * Whatever has been requested can be sent by the {@link Publisher} so only signal demand for what can be safely handled.
      * <p>
      * A {@link Publisher} can send less than is requested if the stream ends but
      * then must emit either {@link Subscriber#onError(Throwable)} or {@link Subscriber#onComplete()}.
      * 
      * @param n the strictly positive number of elements to requests to the upstream {@link Publisher}
      */
     public void request(long n);
 
     /**
      * Request the {@link Publisher} to stop sending data and clean up resources.
      * <p>
      * Data may still be sent to meet previously signalled demand after calling cancel.
      */
     public void cancel();
 }

想让系统更具有弹性?了解背压机制和响应式流的秘密!_响应式_06

响应式流规范非常灵活,还可提供独立的“推”模型和“拉”模型。

响应式流是一种规范,而该规范的核心价值,就在于为业界提供了一种非阻塞式背压的异步流处理标准。

业界主流响应式开发库包括:

  • RxJava
  • Akka
  • Vert.X
  • Project Reactor

总结

本文分析了数据流的概念的分类以及“推”流模式下的流量控制问题,从而引出了响应式系统中的背压机制。

响应式流规范是对响应式编程思想精髓的呈现 对于开发人员而言,理解这一规范有助于更好的掌握开发库的使用方法和基本原理。

FAQ

简要描述响应式流规范中数据的生产者和消费者之间的交互关系。

响应式流规范中,数据的生产者和消费者之间的交互关系是基于观察者模式实现的。生产者通过创建一个可观察的数据流并向消费者提供订阅方法,消费者可以通过订阅这个数据流来获取数据。一旦生产者有新的数据产生,它会将数据发送给所有已订阅该数据流的消费者。消费者可以通过取消订阅方法来停止接收数据。这种交互关系使得生产者和消费者之间解耦,同时也允许消费者按需获取数据,从而实现了高效的异步编程。

举报

相关推荐

0 条评论