0
点赞
收藏
分享

微信扫一扫

基于 Servlet API 并部署到 Servlet 容器(五)

基于 Servlet API 并部署到 Servlet 容器(五)_应用程序

2.9. 测试

要测试使用的代码,您可以使用模拟Web服务器,例如OkHttp MockWebServer。查看示例 查看 Spring Framework 测试套件中的WebClientIntegrationTests或 OkHttp 存储库中的静态服务器示例。​​WebClient​

3. HTTP接口客户端

Spring 框架允许您将 HTTP 服务定义为带有 HTTP 的 Java 接口 交换方法。然后,您可以生成实现此接口的代理,并且 执行交换。这有助于简化 HTTP 远程访问并提供额外的 灵活选择 API 样式,例如同步或反应式。

有关详细信息,请参阅REST 终结点。

4. 网络套接字

与 Servlet 堆栈中相同

参考文档的这一部分涵盖了对反应式堆栈 WebSocket 的支持 消息。

= WebSocket 简介

WebSocket协议RFC 6455提供了一个标准化的 在客户端和服务器之间建立全双工双向通信通道的方法 通过单个 TCP 连接。它是与HTTP不同的TCP协议,但旨在 通过 HTTP 工作,使用端口 80 和 443,并允许重用现有防火墙规则。

WebSocket 交互以使用 HTTPheader 的 HTTP 请求开始 以升级,或者在这种情况下,切换到 WebSocket 协议。以下示例 显示了这样的交互:​​Upgrade​

GET /spring-websocket-portfolio/portfolio HTTP/1.1
Host: localhost:8080
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: Uc9l9TMkWGbHFD2qnFHltg==
Sec-WebSocket-Protocol: v10.stomp, v11.stomp
Sec-WebSocket-Version: 13
Origin: http://localhost:8080

标题。​​Upgrade​

使用连接。​​Upgrade​

支持 WebSocket 的服务器返回输出,而不是通常的 200 状态代码 类似于以下内容:

HTTP/1.1 101 Switching Protocols 
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: 1qVdfYHU9hPOl4JYYNXF623Gzn0=
Sec-WebSocket-Protocol: v10.stomp

协议切换

握手成功后,HTTP 升级请求的 TCP 套接字将保留 为客户端和服务器打开以继续发送和接收消息。

有关 WebSocket 工作原理的完整介绍超出了本文档的范围。 请参阅 RFC 6455、HTML5 的 WebSocket 章节,或许多介绍中的任何一个和 网络上的教程。

请注意,如果 WebSocket 服务器在 Web 服务器(例如 nginx)后面运行,则 可能需要将其配置为将 WebSocket 升级请求传递到 WebSocket 服务器。同样,如果应用程序在云环境中运行,请检查 与 WebSocket 支持相关的云提供商的说明。

== HTTP 与 WebSocket

即使 WebSocket 被设计为与 HTTP 兼容并且以 HTTP 请求开始, 重要的是要了解这两种协议导致非常不同的 体系结构和应用程序编程模型。

在 HTTP 和 REST 中,应用程序被建模为多个 URL。要与应用程序交互, 客户端访问这些 URL,请求-响应样式。服务器将请求路由到 基于 HTTP URL、方法和标头的适当处理程序。

相比之下,在 WebSocket 中,初始连接通常只有一个 URL。 随后,所有应用程序消息都在同一 TCP 连接上流动。这指向 一种完全不同的异步、事件驱动的消息传递体系结构。

WebSocket也是一种低级传输协议,与HTTP不同,它没有规定 消息内容的任何语义。这意味着无法路由或处理 消息,除非客户端和服务器在消息语义上达成一致。

WebSocket 客户端和服务器可以协商使用更高级别的消息传递协议 (例如,STOMP),通过 HTTP 握手请求上的标头。 如果没有这一点,他们需要提出自己的公约。​​Sec-WebSocket-Protocol​

== 何时使用 WebSockets

WebSockets可以使网页具有动态性和交互性。但是,在许多情况下, Ajax 和 HTTP 流或长轮询的组合可以提供简单和 有效的解决方案。

例如,新闻、邮件和社交源需要动态更新,但可能是 完全可以每隔几分钟这样做一次。协作、游戏和金融应用, 另一方面,需要更接近实时。

延迟本身并不是决定性因素。如果消息量相对较低(例如, 监控网络故障)HTTP 流或轮询可以提供有效的解决方案。 低延迟、高频和高容量的组合使最佳 使用 WebSocket 的案例。

另请记住,在互联网上,您无法控制的限制性代理 可能会排除 WebSocket 交互,因为它们未配置为传递标头,或者因为它们关闭了显示为空闲的长期连接。这 意味着将WebSocket用于防火墙内的内部应用程序是一个更多的 比面向公众的应用程序更直接的决定。​​Upgrade​

4.1. 网络套接字接口

与 Servlet 堆栈中相同

Spring 框架提供了一个 WebSocket API,您可以使用它来编写客户端和 处理 WebSocket 消息的服务器端应用程序。

4.1.1. 服务器

与 Servlet 堆栈中相同

若要创建 WebSocket 服务器,可以先创建一个。 以下示例演示如何执行此操作:​​WebSocketHandler​

public class MyWebSocketHandler implements WebSocketHandler {

@Override
public Mono<Void> handle(WebSocketSession session) {
// ...
}
}

然后,您可以将其映射到 URL:

@Configuration
class WebConfig {

@Bean
public HandlerMapping handlerMapping() {
Map<String, WebSocketHandler> map = new HashMap<>();
map.put("/path", new MyWebSocketHandler());
int order = -1; // before annotated controllers

return new SimpleUrlHandlerMapping(map, order);
}
}

如果使用WebFlux 配置,则没有任何内容 进一步执行,或者如果不使用 WebFlux 配置,则需要声明如下所示的 aaS:​​WebSocketHandlerAdapter​

@Configuration
class WebConfig {

// ...

@Bean
public WebSocketHandlerAdapter handlerAdapter() {
return new WebSocketHandlerAdapter();
}
}

4.1.2. ​​WebSocketHandler​

方法的获取和返回指示会话的应用程序处理何时完成。会话已处理 通过两个流,一个用于入站消息,一个用于出站消息。下表 描述处理流的两种方法:​​handle​​​​WebSocketHandler​​​​WebSocketSession​​​​Mono<Void>​

​WebSocketSession​​方法

描述

​Flux<WebSocketMessage> receive()​

提供对入站消息流的访问,并在连接关闭时完成。

​Mono<Void> send(Publisher<WebSocketMessage>)​

获取传出消息的源,写入消息,并返回 在源完成且写入完成时完成。​​Mono<Void>​

必须将入站和出站流组合成一个统一的流,并且 返回 a,反映该流的完成。取决于应用 要求,统一流在以下情况下完成:​​WebSocketHandler​​​​Mono<Void>​

  • 入站或出站消息流完成。
  • 入站流完成(即连接关闭),而出站流无限。
  • 在选定的点上,通过的方法。closeWebSocketSession

当入站和出站消息流组合在一起时,无需 检查连接是否打开,因为反应流表示结束活动。 入站流接收完成或错误信号,出站流接收 收到取消信号。

处理程序的最基本实现是处理入站流的实现。这 以下示例显示了这样的实现:

class ExampleHandler implements WebSocketHandler {

@Override
public Mono<Void> handle(WebSocketSession session) {
return session.receive()
.doOnNext(message -> {
// ...
})
.concatMap(message -> {
// ...
})
.then();
}
}

访问入站消息流。

对每条消息执行一些操作。

执行使用消息内容的嵌套异步操作。

当接收完成时,返回 a。​​Mono<Void>​

以下实现组合了入站和出站流:

class ExampleHandler implements WebSocketHandler {

@Override
public Mono<Void> handle(WebSocketSession session) {

Flux<WebSocketMessage> output = session.receive()
.doOnNext(message -> {
// ...
})
.concatMap(message -> {
// ...
})
.map(value -> session.textMessage("Echo " + value));

return session.send(output);
}
}

处理入站消息流。

创建出站消息,生成组合流。

在我们继续接收时未完成的退货。​​Mono<Void>​

入站和出站流可以是独立的,并且仅在完成时才加入, 如以下示例所示:

class ExampleHandler implements WebSocketHandler {

@Override
public Mono<Void> handle(WebSocketSession session) {

Mono<Void> input = session.receive()
.doOnNext(message -> {
// ...
})
.concatMap(message -> {
// ...
})
.then();

Flux<String> source = ... ;
Mono<Void> output = session.send(source.map(session::textMessage));

return Mono.zip(input, output).then();
}
}

处理入站消息流。

发送传出消息。

加入流并在任一流结束时返回完成。​​Mono<Void>​

4.1.3. ​​DataBuffer​

​DataBuffer​​是 WebFlux 中字节缓冲区的表示形式。弹簧核心部分 该参考在数据缓冲区和编解码器部分中提供了更多信息。要理解的关键点是,在某些 像Netty这样的服务器,字节缓冲区是池化和引用计数的,必须释放 消耗时以避免内存泄漏。

在 Netty 上运行时,应用程序必须使用如果 希望保留输入数据缓冲区以确保它们不会被释放,并且 随后使用缓冲区时。​​DataBufferUtils.retain(dataBuffer)​​​​DataBufferUtils.release(dataBuffer)​

4.1.4. 握手

与 Servlet 堆栈中相同

​WebSocketHandlerAdapter​​委托给 a。默认情况下,这是一个实例 of,对 WebSocket 请求执行基本检查,并且 然后用于正在使用的服务器。目前,有内置的 支持 Reactor Netty、Tomcat、Jetty 和 Undertow。​​WebSocketService​​​​HandshakeWebSocketService​​​​RequestUpgradeStrategy​

​HandshakeWebSocketService​​公开允许的属性 设置 ato 从中提取属性并插入它们 进入的属性。​​sessionAttributePredicate​​​​Predicate<String>​​​​WebSession​​​​WebSocketSession​

4.1.5. 服务器配置

与 Servlet 堆栈中相同

对于每个服务器公开特定于 底层 WebSocket 服务器引擎。使用 WebFlux Java 配置时,您可以自定义 如WebFlux 配置的相应部分所示的属性,或者如果 不使用 WebFlux 配置,请使用以下内容:​​RequestUpgradeStrategy​

@Configuration
class WebConfig {

@Bean
public WebSocketHandlerAdapter handlerAdapter() {
return new WebSocketHandlerAdapter(webSocketService());
}

@Bean
public WebSocketService webSocketService() {
TomcatRequestUpgradeStrategy strategy = new TomcatRequestUpgradeStrategy();
strategy.setMaxSessionIdleTimeout(0L);
return new HandshakeWebSocketService(strategy);
}
}

检查服务器的升级策略以查看可用的选项。现在 只有Tomcat和Jetty暴露了这样的选择。

4.1.6. CORS

与 Servlet 堆栈中相同

配置 CORS 和限制对 WebSocket 终结点的访问的最简单方法是 让您的实现并返回允许的源、标头和其他详细信息。如果你做不到 那,你也可以在 按 URL 模式指定 CORS 设置。如果两者都指定,则使用 on 方法组合它们。​​WebSocketHandler​​​​CorsConfigurationSource​​​​CorsConfiguration​​​​corsConfigurations​​​​SimpleUrlHandler​​​​combine​​​​CorsConfiguration​

4.1.7. 客户端

Spring WebFlux 提供了抽象和实现 Reactor Netty、Tomcat、Jetty、Undertow 和 Standard Java(即 JSR-356)。​​WebSocketClient​

要启动 WebSocket 会话,您可以创建客户端的实例并使用其方法:​​execute​

WebSocketClient client = new ReactorNettyWebSocketClient();

URI url = new URI("ws://localhost:8080/path");
client.execute(url, session ->
session.receive()
.doOnNext(System.out::println)
.then());

某些客户端(如 Jetty)需要停止和启动 然后才能使用它们。所有客户端都有与配置相关的构造函数选项 的基础 WebSocket 客户端。​​Lifecycle​

5. 测试

在春季 MVC 中相同

该模块提供了 、 和 的模拟实现。 参见Spring Web Reactivefor a 模拟对象的讨论。​​spring-test​​​​ServerHttpRequest​​​​ServerHttpResponse​​​​ServerWebExchange​

WebTestClient基于这些模拟请求和 响应对象,用于为在没有 HTTP 的情况下测试 WebFlux 应用程序提供支持 服务器。您也可以使用端到端集成测试。​​WebTestClient​

6. RSocket

本节介绍 Spring Framework 对 RSocket 协议的支持。

6.1. 概述

RSocket 是一种通过 TCP 进行多路复用、双工通信的应用协议, WebSocket 和其他字节流传输,使用以下交互之一 模型:

  • ​Request-Response​​— 发送一条消息并接收一条回馈。
  • ​Request-Stream​​— 发送一条消息并接收返回的消息流。
  • ​Channel​​— 双向发送消息流。
  • ​Fire-and-Forget​​— 发送单向消息。

一旦建立初始连接,“客户端”与“服务器”的区别就会丢失,因为 双方变得对称,每一方都可以发起上述交互之一。 这就是为什么在协议中称参与方为“请求者”和“响应者” 而上述交互称为“请求流”或简称为“请求”。

这些是 RSocket 协议的主要功能和优点:

  • 跨网络边界的反应式流语义 — 用于流请求,例如背压信号 在请求者和响应者之间移动,允许请求者在 源,从而减少对网络层拥塞控制的依赖,以及需求 用于在网络级别或任何级别进行缓冲。Request-StreamChannel
  • 请求限制 — 此功能在帧之后命名为“租赁” 可以从每一端发送,以限制另一端允许的请求总数 在给定的时间内。租约定期续订。LEASE
  • 会话恢复 — 这是针对连接丢失而设计的,需要一些状态 待维护。状态管理对应用程序是透明的,并且运行良好 结合背压,可以在可能的情况下停止生产者并减少 所需的状态量。
  • 大型消息的分段和重新组合。
  • 保持活动状态(心跳)。

RSocket 有多种语言的实现。Java 库建立在Project Reactor, 和用于运输的反应堆网。这意味着 应用程序中来自反应式流发布服务器的信号透明地传播 通过 RSocket 跨网络。

6.1.1. 协议

RSocket 的好处之一是它在网络上具有明确定义的行为和 易于阅读的规范以及一些协议扩展。因此它是 阅读规范是个好主意,独立于语言实现和更高级别 框架 API。本节提供简洁的概述以建立一些上下文。

连接

最初,客户端通过一些低级流传输连接到服务器,例如 作为TCP或WebSocket,并将帧发送到服务器以设置参数 连接。​​SETUP​

服务器可能会拒绝帧,但通常在发送帧后(对于客户端) 并且收到(对于服务器),双方都可以开始发出请求,除非指示使用租赁语义来限制请求的数量,在这种情况下 双方必须等待来自另一端的 Aframe 才能允许提出请求。​​SETUP​​​​SETUP​​​​LEASE​

提出请求

建立连接后,双方都可以通过其中之一发起请求 框架,,,或。每个 这些帧将一条消息从请求者传送到响应方。​​REQUEST_RESPONSE​​​​REQUEST_STREAM​​​​REQUEST_CHANNEL​​​​REQUEST_FNF​

然后,响应方可以返回带有响应消息的帧,在这种情况下 的请求者也可以发送具有更多请求的帧 消息。​​PAYLOAD​​​​REQUEST_CHANNEL​​​​PAYLOAD​

当请求涉及消息流(如 and)时, 响应方必须尊重来自请求方的需求信号。需求表示为 消息数。初始需求在帧中指定。随后的需求是信号Viaframes。​​Request-Stream​​​​Channel​​​​REQUEST_STREAM​​​​REQUEST_CHANNEL​​​​REQUEST_N​

每一方还可以通过帧发送元数据通知,这些通知不会 与任何单个请求有关,而是与整个连接有关。​​METADATA_PUSH​

消息格式

RSocket 消息包含数据和元数据。元数据可用于发送路由、 安全令牌等数据和元数据的格式可以不同。每个 MIME 类型 在帧中声明,并应用于给定连接上的所有请求。​​SETUP​

虽然所有消息都可以有元数据,但通常元数据(如路由)是按请求的 因此仅包含在请求的第一条消息中,即与其中一个帧一起,,,或。​​REQUEST_RESPONSE​​​​REQUEST_STREAM​​​​REQUEST_CHANNEL​​​​REQUEST_FNF​

协议扩展定义了在应用程序中使用的常见元数据格式:

  • 复合元数据 -- 多个, 独立格式化的元数据条目。
  • 路由 — 请求的路由。

6.1.2. Java实现

RSocket 的Java 实现建立在Project Reactor 之上。TCP 和 WebSocket 的传输是 建立在反应堆内蒂之上。作为反应式流 库,反应器简化了实现协议的工作。对于应用程序,它是 自然适合使用,带有声明式运算符和透明背面 压力支持。​​Flux​​​​Mono​

RSocket Java 中的 API 有意是最小和基本的。它专注于协议 功能并将应用程序编程模型(例如 RPC 代码生成与其他)保留为 更高层次,独立关注。

主合约io.rsocket.RSocket对四种请求交互类型进行建模,并表示对 单个消息、消息流和实际 以字节缓冲区的形式访问数据和元数据的消息。使用合约 对称。对于请求,应用程序被给予一个执行 请求。为了响应,应用程序实现处理请求。​​Mono​​​​Flux​​​​io.rsocket.Payload​​​​RSocket​​​​RSocket​​​​RSocket​

这并不是一个彻底的介绍。在大多数情况下,弹簧应用 不必直接使用其 API。但是,观察或实验可能很重要 与RSocket独立于Spring。RSocket Java 存储库包含许多示例应用程序,这些应用程序 演示其 API 和协议功能。

6.1.3. 弹簧支撑

该模块包含以下内容:​​spring-messaging​

  • RSocketRequester— 流畅的API,通过数据和元数据编码/解码发出请求。io.rsocket.RSocket
  • 带注释的响应程序 — 带注释的处理程序方法 响应。@MessageMapping

模块包含沙实现,如杰克逊 CBOR/JSON 和 RSocket 应用程序可能需要的 Protobuf。它还包含可以插入以实现高效路由匹配的。​​spring-web​​​​Encoder​​​​Decoder​​​​PathPatternParser​

Spring Boot 2.2 支持通过 TCP 或 WebSocket 建立 RSocket 服务器,包括 在 WebFlux 服务器中通过 WebSocket 公开 RSocket 的选项。还有客户端 支持和自动配置阿南德。 有关更多详细信息,请参阅 Spring Boot 参考中的RSocket 部分。​​RSocketRequester.Builder​​​​RSocketStrategies​

Spring Security 5.2 提供 RSocket 支持。

Spring Integration 5.2 提供了入站和出站网关来与 RSocket 交互 客户端和服务器。有关更多详细信息,请参阅 Spring 集成参考手册。

Spring Cloud Gateway 支持 RSocket 连接。

6.2. RSocket请求者

​RSocketRequester​​提供流畅的 API 来执行 RSocket 请求,接受和 返回数据和元数据的对象,而不是低级数据缓冲区。它可以使用 对称地,从客户端发出请求,从服务器发出请求。

6.2.1. 客户端请求者

获取 anon 客户端是连接到一个服务器,其中涉及 发送具有连接设置的 RSocketframe。提供 帮助准备包含连接的构建器 帧的设置。​​RSocketRequester​​​​SETUP​​​​RSocketRequester​​​​io.rsocket.core.RSocketConnector​​​​SETUP​

这是使用默认设置进行连接的最基本方法:

RSocketRequester requester = RSocketRequester.builder().tcp("localhost", 7000);

URI url = URI.create("https://example.org:8080/rsocket");
RSocketRequester requester = RSocketRequester.builder().webSocket(url);

以上不会立即连接。发出请求时,共享连接是 透明地建立和使用。

连接设置

​RSocketRequester.Builder​​提供以下内容以自定义初始帧:​​SETUP​

  • ​dataMimeType(MimeType)​​— 设置连接上数据的 MIME 类型。
  • ​metadataMimeType(MimeType)​​— 设置连接上元数据的 MIME 类型。
  • ​setupData(Object)​​— 要包含在中的数据。SETUP
  • ​setupRoute(String, Object…)​​— 要包含在元数据中的路由。SETUP
  • ​setupMetadata(Object, MimeType)​​— 要包含在中的其他元数据。SETUP

对于数据,默认 mime 类型派生自第一个配置的类型。为 元数据,默认 MIME 类型是复合元数据,允许多个 每个请求的元数据值和 MIME 类型对。通常两者都不需要更改。​​Decoder​

帧中的数据和元数据是可选的。在服务器端,@ConnectMapping方法可用于处理 连接和框架的内容。元数据可用于连接 级别安全性。​​SETUP​​​​SETUP​

策略

​RSocketRequester.Builder​​接受以配置请求者。 您需要使用它来提供编码器和解码器,用于数据的(反)序列化和 元数据值。默认情况下,仅注册来自 for、、和的基本编解码器。添加提供对更多内容的访问 可以按如下方式注册:​​RSocketStrategies​​​​spring-core​​​​String​​​​byte[]​​​​ByteBuffer​​​​spring-web​

RSocketStrategies strategies = RSocketStrategies.builder()
.encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
.decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
.build();

RSocketRequester requester = RSocketRequester.builder()
.rsocketStrategies(strategies)
.tcp("localhost", 7000);

​RSocketStrategies​​专为重复使用而设计。在某些情况下,例如客户端和服务器 相同的应用程序,最好在 Spring 配置中声明它。

客户端响应者

​RSocketRequester.Builder​​可用于配置响应程序对来自 服务器。

您可以使用带注释的处理程序进行基于相同的客户端响应 在服务器上使用但以编程方式注册的基础结构,如下所示:

RSocketStrategies strategies = RSocketStrategies.builder()
.routeMatcher(new PathPatternRouteMatcher())
.build();

SocketAcceptor responder =
RSocketMessageHandler.responder(strategies, new ClientHandler());

RSocketRequester requester = RSocketRequester.builder()
.rsocketConnector(connector -> connector.acceptor(responder))
.tcp("localhost", 7000);

使用,ifis存在,高效 路由匹配。​​PathPatternRouteMatcher​​​​spring-web​

从类 withand/or 方法创建响应程序。​​@MessageMapping​​​​@ConnectMapping​

注册响应程序。

请注意,以上只是为客户端编程注册设计的快捷方式 反应。对于替代场景,其中客户端响应者处于 Spring 配置中, 您仍然可以选择春豆,然后按如下方式应用:​​RSocketMessageHandler​

ApplicationContext context = ... ;
RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class);

RSocketRequester requester = RSocketRequester.builder()
.rsocketConnector(connector -> connector.acceptor(handler.responder()))
.tcp("localhost", 7000);

对于上述内容,您可能还需要使用 切换到不同的策略来检测客户端响应者,例如基于自定义 注释,例如默认值。这 在客户端和服务器或同一中的多个客户端的情况下是必需的 应用。​​setHandlerPredicate​​​​RSocketMessageHandler​​​​@RSocketClientResponder​​​​@Controller​

另请参阅带注释的响应程序,了解有关编程模型的更多信息。

高深

​RSocketRequesterBuilder​​提供回调以公开底层证券,以便为保持连接提供进一步的配置选项 间隔、会话恢复、拦截器等。您可以配置选项 在该级别如下:​​io.rsocket.core.RSocketConnector​

RSocketRequester requester = RSocketRequester.builder()
.rsocketConnector(connector -> {
// ...
})
.tcp("localhost", 7000);

6.2.2. 服务器请求程序

从服务器向连接的客户端发出请求是获取 来自服务器的已连接客户端的请求程序。

在注释响应器中,方法支持参数。使用它来访问连接的请求者。保持 请注意,方法本质上是框架的处理程序,其中 必须在请求开始之前进行处理。因此,一开始的请求必须是 与处理分离。例如:​​@ConnectMapping​​​​@MessageMapping​​​​RSocketRequester​​​​@ConnectMapping​​​​SETUP​

@ConnectMapping
Mono<Void> handle(RSocketRequester requester) {
requester.route("status").data("5")
.retrieveFlux(StatusReport.class)
.subscribe(bar -> {
// ...
});
return ...
}

异步启动请求,独立于处理。

执行处理和返回完成。​​Mono<Void>​

6.2.3. 请求

拥有客户端或服务器请求者后,可以按如下方式发出请求:

ViewBox viewBox = ... ;

Flux<AirportLocation> locations = requester.route("locate.radars.within")
.data(viewBox)
.retrieveFlux(AirportLocation.class);

指定要包含在请求消息元数据中的路由。

为请求消息提供数据。

声明预期的响应。

交互类型由输入和 输出。上面的例子是因为发送了一个值和一个流 的值被接收。在大多数情况下,您不需要考虑这一点,只要 输入和输出的选择与 RSocket 交互类型以及输入和 响应方预期的输出。无效组合的唯一示例是多对一。​​Request-Stream​

该方法还接受任何反应式流,包括和,以及注册的任何其他价值生产者。对于多值例如产生 相同类型的值,请考虑使用重载方法之一以避免出现 对每个元素进行类型检查和查找:​​data(Object)​​​​Publisher​​​​Flux​​​​Mono​​​​ReactiveAdapterRegistry​​​​Publisher​​​​Flux​​​​data​​​​Encoder​

data(Object producer, Class<?> elementClass);
data(Object producer, ParameterizedTypeReference<?> elementTypeRef);

该步骤是可选的。对于不发送数据的请求,请跳过它:​​data(Object)​

Mono<AirportLocation> location = requester.route("find.radar.EWR"))
.retrieveMono(AirportLocation.class);

如果使用复合元数据(默认),并且如果 值由已注册支持。例如:​​Encoder​

String securityToken = ... ;
ViewBox viewBox = ... ;
MimeType mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0");

Flux<AirportLocation> locations = requester.route("locate.radars.within")
.metadata(securityToken, mimeType)
.data(viewBox)
.retrieveFlux(AirportLocation.class);

忘记返回的方法。请注意,仅指示消息已成功发送,而不是已处理。​​Fire-and-Forget​​​​send()​​​​Mono<Void>​​​​Mono​

使用返回值替换方法。​​Metadata-Push​​​​sendMetadata()​​​​Mono<Void>​

6.3. 带注释的响应者

RSocket 响应器可以实现为方法.方法处理单个请求,而方法处理 连接级事件(设置和元数据推送)。支持带注释的响应程序 对称地,用于从服务器端响应和从客户端响应。​​@MessageMapping​​​​@ConnectMapping​​​​@MessageMapping​​​​@ConnectMapping​

6.3.1. 服务器响应程序

要在服务器端使用带注释的响应程序,请添加到您的 Spring 用于检测 Bean 与方法的配置:​​RSocketMessageHandler​​​​@Controller​​​​@MessageMapping​​​​@ConnectMapping​

@Configuration
static class ServerConfig {

@Bean
public RSocketMessageHandler rsocketMessageHandler() {
RSocketMessageHandler handler = new RSocketMessageHandler();
handler.routeMatcher(new PathPatternRouteMatcher());
return handler;
}
}

然后通过 Java RSocket API 启动 RSocket 服务器,并插入响应程序,如下所示:​​RSocketMessageHandler​

ApplicationContext context = ... ;
RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class);

CloseableChannel server =
RSocketServer.create(handler.responder())
.bind(TcpServerTransport.create("localhost", 7000))
.block();

​RSocketMessageHandler​​默认情况下支持复合元数据和路由元数据。如果需要切换到 不同的 MIME 类型或注册其他元数据 MIME 类型。

您需要设置元数据和数据所需的实例 要支持的格式。您可能需要该模块来实现编解码器。​​Encoder​​​​Decoder​​​​spring-web​

默认情况下用于匹配路由通过。 我们建议插入 fromfor 高效的路由匹配。RSocket 路由可以是分层的,但不是 URL 路径。 默认情况下,两个路由匹配器都配置为使用 “.” 作为分隔符,并且没有 URL 解码与 HTTP URL 一样。​​SimpleRouteMatcher​​​​AntPathMatcher​​​​PathPatternRouteMatcher​​​​spring-web​

​RSocketMessageHandler​​可以通过以下方式配置这可能很有用,如果 您需要在同一进程中在客户端和服务器之间共享配置:​​RSocketStrategies​

@Configuration
static class ServerConfig {

@Bean
public RSocketMessageHandler rsocketMessageHandler() {
RSocketMessageHandler handler = new RSocketMessageHandler();
handler.setRSocketStrategies(rsocketStrategies());
return handler;
}

@Bean
public RSocketStrategies rsocketStrategies() {
return RSocketStrategies.builder()
.encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
.decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
.routeMatcher(new PathPatternRouteMatcher())
.build();
}
}

6.3.2. 客户端响应者

客户端上的带注释的响应程序需要在 中配置。有关详细信息,请参阅客户端响应程序。​​RSocketRequester.Builder​

6.3.3. @MessageMapping

服务器或客户端响应程序配置就绪后,可以按如下方式使用方法:​​@MessageMapping​

@Controller
public class RadarsController {

@MessageMapping("locate.radars.within")
public Flux<AirportLocation> radars(MapRequest request) {
// ...
}
}

上述方法响应具有 路线“定位.雷达.内”。它支持灵活的方法签名,可选择 使用以下方法参数:​​@MessageMapping​

方法参数

描述

​@Payload​

请求的有效负载。这可以是异步类型(如)的具体值。​​Mono​​​​Flux​

注意:批注的使用是可选的。不是简单类型的方法参数 并且不是任何其他受支持的参数,被假定为预期的有效负载。

​RSocketRequester​

向远程端发出请求的请求者。

​@DestinationVariable​

根据映射模式中的变量从路由中提取的值,例如​​@MessageMapping("find.radar.{id}")​

​@Header​

注册用于提取的元数据值,如元数据提取器中所述。

​@Headers Map<String, Object>​

注册用于提取的所有元数据值,如元数据提取器中所述。

返回值应为一个或多个要序列化为响应的对象 负载。这可以是异步类型,例如 or、具体值或 或无值异步类型,例如。​​Mono​​​​Flux​​​​void​​​​Mono<Void>​

方法支持的 RSocket 交互类型由 输入(即参数)和输出的基数,其中 基数意味着以下内容:​​@MessageMapping​​​​@Payload​

基数

描述

1

显式值或单值异步类型,例如。​​Mono<T>​

多值异步类型,例如。​​Flux<T>​

0

对于输入,这意味着该方法没有参数。​​@Payload​

对于输出,这是无值异步类型,例如。​​void​​​​Mono<Void>​

下表显示了所有输入和输出基数组合以及相应的 交互类型:

输入基数

输出基数

交互类型

0, 1

0

即发即弃,请求-响应

0, 1

1

请求-响应

0, 1

请求流

0, 1, 许多

请求通道

6.3.4. @ConnectMapping

​@ConnectMapping​​在 RSocket 连接开始时处理帧,以及 通过帧的任何后续元数据推送通知,i.e.in。​​SETUP​​​​METADATA_PUSH​​​​metadataPush(Payload)​​​​io.rsocket.RSocket​

​@ConnectMapping​​方法支持与 @MessageMapping相同的参数,但基于元数据和来自 AndFrame 的数据。可以有一个模式来缩小处理范围 元数据中具有路由的特定连接,或者未声明任何模式 然后所有连接都匹配。​​SETUP​​​​METADATA_PUSH​​​​@ConnectMapping​

​@ConnectMapping​​方法不能返回数据,必须声明为返回值。如果处理返回新的错误 连接,则连接被拒绝。处理不得耽搁 请求用于连接。有关详细信息,请参阅服务器请求程序。​​void​​​​Mono<Void>​​​​RSocketRequester​

6.4. 元数据提取器

响应程序必须解释元数据。复合元数据允许独立 格式化的元数据值(例如,用于路由、安全性、跟踪),每个值都有自己的 MIME 类型。应用程序需要一种方法来配置要支持的元数据 MIME 类型,以及一种方法 以访问提取的值。

​MetadataExtractor​​是一个接受序列化元数据并返回解码的协定 然后可以像标头一样按名称访问的名称-值对,例如 viain 注释处理程序方法。​​@Header​

​DefaultMetadataExtractor​​可以给出实例来解码元数据。出 它内置了对“消息/x.rsocket.routing.v0”的框,它解码到并保存在“路由”键下。对于您需要提供的任何其他 MIME 类型 a并注册 MIME 类型,如下所示:​​Decoder​​​​String​​​​Decoder​

DefaultMetadataExtractor extractor = new DefaultMetadataExtractor(metadataDecoders);
extractor.metadataToExtract(fooMimeType, Foo.class, "foo");

复合元数据非常适合组合独立的元数据值。然而, 请求者可能不支持复合元数据,或者可能选择不使用它。为此,可能需要自定义逻辑将解码值映射到输出 地图。下面是将 JSON 用于元数据的示例:​​DefaultMetadataExtractor​

DefaultMetadataExtractor extractor = new DefaultMetadataExtractor(metadataDecoders);
extractor.metadataToExtract(
MimeType.valueOf("application/vnd.myapp.metadata+json"),
new ParameterizedTypeReference<Map<String,String>>() {},
(jsonMap, outputMap) -> {
outputMap.putAll(jsonMap);
});

配置时,您可以使用配置的解码器创建提取器,并且 只需使用回调来自定义注册,如下所示:​​MetadataExtractor​​​​RSocketStrategies​​​​RSocketStrategies.Builder​

RSocketStrategies strategies = RSocketStrategies.builder()
.metadataExtractorRegistry(registry -> {
registry.metadataToExtract(fooMimeType, Foo.class, "foo");
// ...
})
.build();

6.5. RSocket 接口

Spring 框架允许您将 RSocket 服务定义为带有注释的 Java 接口 RSocket 交换的方法。然后,您可以生成实现此接口的代理 并执行交换。这有助于通过包装 RSocket 远程访问来简化 使用底层RSocketRequester。

一,声明一个接口:​​@RSocketExchange​

interface RadarService {

@RSocketExchange("radars")
Flux<AirportLocation> getRadars(@Payload MapRequest request);

// more RSocket exchange methods...

}

第二,创建一个代理来执行声明的 RSocket 交换:

RSocketRequester requester = ... ;
RSocketServiceProxyFactory factory = RSocketServiceProxyFactory.builder(requester).build();

RepositoryService service = factory.createClient(RadarService.class);

6.5.1. 方法参数

带注释的 RSocket 交换方法支持灵活的方法签名,具有以下功能 方法参数:

方法参数

描述

​@DestinationVariable​

添加一个路由变量以与注释中的路由一起传递,以便展开路由中的模板占位符。 此变量可以是字符串或任何对象,然后通过以下方式进行格式化。​​RSocketRequester​​​​@RSocketExchange​​​​toString()​

​@Payload​

设置请求的输入有效负载。这可以是具体值,也可以是任何生产者 可以适应反应式流的值​​Publisher​​​​ReactiveAdapterRegistry​

​Object​​​,如果后跟​​MimeType​

输入有效负载中元数据条目的值。这可以是任意长度 因为下一个参数是元数据条目。价值可以是具体的 值或任何可以适应反应式流的单个值的生产者。​​Object​​​​MimeType​​​​Publisher​​​​ReactiveAdapterRegistry​

​MimeType​

用于元数据条目。前面的方法参数应为 元数据值。​​MimeType​

6.5.2. 返回值

带注释的 RSocket 交换方法支持作为具体值的返回值,或者 任何可以适应反应式流的价值生产者。​​Publisher​​​​ReactiveAdapterRegistry​

7. 反应式库

​spring-webflux​​依赖并在内部使用它来编写异步 逻辑并提供反应流支持。通常,WebFlux API 返回器(因为它们在内部使用)并宽容地接受任何响应式流实现作为输入。使用对立很重要,因为 它有助于表达基数 - 例如,是单个还是多个异步 值是预期的,这对于做出决策至关重要(例如,当 编码或解码 HTTP 消息)。​​reactor-core​​​​Flux​​​​Mono​​​​Publisher​​​​Flux​​​​Mono​

对于带注释的控制器,WebFlux 透明地适应 应用程序。这是在ReactiveAdapterRegistry的帮助下完成的,它 为反应式库和其他异步类型提供可插拔支持。注册表 内置了对 RxJava 3、Kotlin 协程和 SmallRye Mutiny 的支持,但你可以 也注册其他人。

对于函数式 API(例如功能端点等),一般规则 对于 WebFlux API,应用 — andas 返回值和 Reactive Streamsas 输入。当一个,无论是自定义的还是来自另一个反应式库, 提供,它只能被视为具有未知语义 (0..N) 的流。但是,如果 语义是已知的,你可以用它包装它 传递原始。​​WebClient​​​​Flux​​​​Mono​​​​Publisher​​​​Publisher​​​​Flux​​​​Mono.from(Publisher)​​​​Publisher​

例如,给定athat不是a,杰克逊JSON消息编写器 需要多个值。如果媒体类型暗示无限流(例如,),则会单独写入和刷新值。否则 值被缓冲到列表中并呈现为 JSON 数组。​​Publisher​​​​Mono​​​​application/json+stream​

举报

相关推荐

0 条评论