2.9. 测试
要测试使用的代码,您可以使用模拟Web服务器,例如OkHttp MockWebServer。查看示例 查看 Spring Framework 测试套件中的WebClientIntegrationTests或 OkHttp 存储库中的静态服务器示例。WebClient
3. HTTP接口客户端
Spring 框架允许您将 HTTP 服务定义为带有 HTTP 的 Java 接口 交换方法。然后,您可以生成实现此接口的代理,并且 执行交换。这有助于简化 HTTP 远程访问并提供额外的 灵活选择 API 样式,例如同步或反应式。
有关详细信息,请参阅REST 终结点。
4. 网络套接字
与 Servlet 堆栈中相同
参考文档的这一部分涵盖了对反应式堆栈 WebSocket 的支持 消息。
= WebSocket 简介
WebSocket协议RFC 6455提供了一个标准化的 在客户端和服务器之间建立全双工双向通信通道的方法 通过单个 TCP 连接。它是与HTTP不同的TCP协议,但旨在 通过 HTTP 工作,使用端口 80 和 443,并允许重用现有防火墙规则。
WebSocket 交互以使用 HTTPheader 的 HTTP 请求开始 以升级,或者在这种情况下,切换到 WebSocket 协议。以下示例 显示了这样的交互:Upgrade
GET /spring-websocket-portfolio/portfolio HTTP/1.1
Host: localhost:8080
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: Uc9l9TMkWGbHFD2qnFHltg==
Sec-WebSocket-Protocol: v10.stomp, v11.stomp
Sec-WebSocket-Version: 13
Origin: http://localhost:8080
标题。 |
使用连接。 |
支持 WebSocket 的服务器返回输出,而不是通常的 200 状态代码 类似于以下内容:
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: 1qVdfYHU9hPOl4JYYNXF623Gzn0=
Sec-WebSocket-Protocol: v10.stomp
协议切换 |
握手成功后,HTTP 升级请求的 TCP 套接字将保留 为客户端和服务器打开以继续发送和接收消息。
有关 WebSocket 工作原理的完整介绍超出了本文档的范围。 请参阅 RFC 6455、HTML5 的 WebSocket 章节,或许多介绍中的任何一个和 网络上的教程。
请注意,如果 WebSocket 服务器在 Web 服务器(例如 nginx)后面运行,则 可能需要将其配置为将 WebSocket 升级请求传递到 WebSocket 服务器。同样,如果应用程序在云环境中运行,请检查 与 WebSocket 支持相关的云提供商的说明。
== HTTP 与 WebSocket
即使 WebSocket 被设计为与 HTTP 兼容并且以 HTTP 请求开始, 重要的是要了解这两种协议导致非常不同的 体系结构和应用程序编程模型。
在 HTTP 和 REST 中,应用程序被建模为多个 URL。要与应用程序交互, 客户端访问这些 URL,请求-响应样式。服务器将请求路由到 基于 HTTP URL、方法和标头的适当处理程序。
相比之下,在 WebSocket 中,初始连接通常只有一个 URL。 随后,所有应用程序消息都在同一 TCP 连接上流动。这指向 一种完全不同的异步、事件驱动的消息传递体系结构。
WebSocket也是一种低级传输协议,与HTTP不同,它没有规定 消息内容的任何语义。这意味着无法路由或处理 消息,除非客户端和服务器在消息语义上达成一致。
WebSocket 客户端和服务器可以协商使用更高级别的消息传递协议 (例如,STOMP),通过 HTTP 握手请求上的标头。 如果没有这一点,他们需要提出自己的公约。Sec-WebSocket-Protocol
== 何时使用 WebSockets
WebSockets可以使网页具有动态性和交互性。但是,在许多情况下, Ajax 和 HTTP 流或长轮询的组合可以提供简单和 有效的解决方案。
例如,新闻、邮件和社交源需要动态更新,但可能是 完全可以每隔几分钟这样做一次。协作、游戏和金融应用, 另一方面,需要更接近实时。
延迟本身并不是决定性因素。如果消息量相对较低(例如, 监控网络故障)HTTP 流或轮询可以提供有效的解决方案。 低延迟、高频和高容量的组合使最佳 使用 WebSocket 的案例。
另请记住,在互联网上,您无法控制的限制性代理 可能会排除 WebSocket 交互,因为它们未配置为传递标头,或者因为它们关闭了显示为空闲的长期连接。这 意味着将WebSocket用于防火墙内的内部应用程序是一个更多的 比面向公众的应用程序更直接的决定。Upgrade
4.1. 网络套接字接口
与 Servlet 堆栈中相同
Spring 框架提供了一个 WebSocket API,您可以使用它来编写客户端和 处理 WebSocket 消息的服务器端应用程序。
4.1.1. 服务器
与 Servlet 堆栈中相同
若要创建 WebSocket 服务器,可以先创建一个。 以下示例演示如何执行此操作:WebSocketHandler
public class MyWebSocketHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
// ...
}
}
然后,您可以将其映射到 URL:
@Configuration
class WebConfig {
@Bean
public HandlerMapping handlerMapping() {
Map<String, WebSocketHandler> map = new HashMap<>();
map.put("/path", new MyWebSocketHandler());
int order = -1; // before annotated controllers
return new SimpleUrlHandlerMapping(map, order);
}
}
如果使用WebFlux 配置,则没有任何内容 进一步执行,或者如果不使用 WebFlux 配置,则需要声明如下所示的 aaS:WebSocketHandlerAdapter
@Configuration
class WebConfig {
// ...
@Bean
public WebSocketHandlerAdapter handlerAdapter() {
return new WebSocketHandlerAdapter();
}
}
4.1.2. WebSocketHandler
方法的获取和返回指示会话的应用程序处理何时完成。会话已处理 通过两个流,一个用于入站消息,一个用于出站消息。下表 描述处理流的两种方法:handle
WebSocketHandler
WebSocketSession
Mono<Void>
| 描述 |
| 提供对入站消息流的访问,并在连接关闭时完成。 |
| 获取传出消息的源,写入消息,并返回 在源完成且写入完成时完成。 |
必须将入站和出站流组合成一个统一的流,并且 返回 a,反映该流的完成。取决于应用 要求,统一流在以下情况下完成:WebSocketHandler
Mono<Void>
- 入站或出站消息流完成。
- 入站流完成(即连接关闭),而出站流无限。
- 在选定的点上,通过的方法。
close
WebSocketSession
当入站和出站消息流组合在一起时,无需 检查连接是否打开,因为反应流表示结束活动。 入站流接收完成或错误信号,出站流接收 收到取消信号。
处理程序的最基本实现是处理入站流的实现。这 以下示例显示了这样的实现:
class ExampleHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
return session.receive()
.doOnNext(message -> {
// ...
})
.concatMap(message -> {
// ...
})
.then();
}
}
访问入站消息流。 |
对每条消息执行一些操作。 |
执行使用消息内容的嵌套异步操作。 |
当接收完成时,返回 a。 |
以下实现组合了入站和出站流:
class ExampleHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
Flux<WebSocketMessage> output = session.receive()
.doOnNext(message -> {
// ...
})
.concatMap(message -> {
// ...
})
.map(value -> session.textMessage("Echo " + value));
return session.send(output);
}
}
处理入站消息流。 |
创建出站消息,生成组合流。 |
在我们继续接收时未完成的退货。 |
入站和出站流可以是独立的,并且仅在完成时才加入, 如以下示例所示:
class ExampleHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
Mono<Void> input = session.receive()
.doOnNext(message -> {
// ...
})
.concatMap(message -> {
// ...
})
.then();
Flux<String> source = ... ;
Mono<Void> output = session.send(source.map(session::textMessage));
return Mono.zip(input, output).then();
}
}
处理入站消息流。 |
发送传出消息。 |
加入流并在任一流结束时返回完成。 |
4.1.3. DataBuffer
DataBuffer
是 WebFlux 中字节缓冲区的表示形式。弹簧核心部分 该参考在数据缓冲区和编解码器部分中提供了更多信息。要理解的关键点是,在某些 像Netty这样的服务器,字节缓冲区是池化和引用计数的,必须释放 消耗时以避免内存泄漏。
在 Netty 上运行时,应用程序必须使用如果 希望保留输入数据缓冲区以确保它们不会被释放,并且 随后使用缓冲区时。DataBufferUtils.retain(dataBuffer)
DataBufferUtils.release(dataBuffer)
4.1.4. 握手
与 Servlet 堆栈中相同
WebSocketHandlerAdapter
委托给 a。默认情况下,这是一个实例 of,对 WebSocket 请求执行基本检查,并且 然后用于正在使用的服务器。目前,有内置的 支持 Reactor Netty、Tomcat、Jetty 和 Undertow。WebSocketService
HandshakeWebSocketService
RequestUpgradeStrategy
HandshakeWebSocketService
公开允许的属性 设置 ato 从中提取属性并插入它们 进入的属性。sessionAttributePredicate
Predicate<String>
WebSession
WebSocketSession
4.1.5. 服务器配置
与 Servlet 堆栈中相同
对于每个服务器公开特定于 底层 WebSocket 服务器引擎。使用 WebFlux Java 配置时,您可以自定义 如WebFlux 配置的相应部分所示的属性,或者如果 不使用 WebFlux 配置,请使用以下内容:RequestUpgradeStrategy
@Configuration
class WebConfig {
@Bean
public WebSocketHandlerAdapter handlerAdapter() {
return new WebSocketHandlerAdapter(webSocketService());
}
@Bean
public WebSocketService webSocketService() {
TomcatRequestUpgradeStrategy strategy = new TomcatRequestUpgradeStrategy();
strategy.setMaxSessionIdleTimeout(0L);
return new HandshakeWebSocketService(strategy);
}
}
检查服务器的升级策略以查看可用的选项。现在 只有Tomcat和Jetty暴露了这样的选择。
4.1.6. CORS
与 Servlet 堆栈中相同
配置 CORS 和限制对 WebSocket 终结点的访问的最简单方法是 让您的实现并返回允许的源、标头和其他详细信息。如果你做不到 那,你也可以在 按 URL 模式指定 CORS 设置。如果两者都指定,则使用 on 方法组合它们。WebSocketHandler
CorsConfigurationSource
CorsConfiguration
corsConfigurations
SimpleUrlHandler
combine
CorsConfiguration
4.1.7. 客户端
Spring WebFlux 提供了抽象和实现 Reactor Netty、Tomcat、Jetty、Undertow 和 Standard Java(即 JSR-356)。WebSocketClient
要启动 WebSocket 会话,您可以创建客户端的实例并使用其方法:execute
WebSocketClient client = new ReactorNettyWebSocketClient();
URI url = new URI("ws://localhost:8080/path");
client.execute(url, session ->
session.receive()
.doOnNext(System.out::println)
.then());
某些客户端(如 Jetty)需要停止和启动 然后才能使用它们。所有客户端都有与配置相关的构造函数选项 的基础 WebSocket 客户端。Lifecycle
5. 测试
在春季 MVC 中相同
该模块提供了 、 和 的模拟实现。 参见Spring Web Reactivefor a 模拟对象的讨论。spring-test
ServerHttpRequest
ServerHttpResponse
ServerWebExchange
WebTestClient基于这些模拟请求和 响应对象,用于为在没有 HTTP 的情况下测试 WebFlux 应用程序提供支持 服务器。您也可以使用端到端集成测试。WebTestClient
6. RSocket
本节介绍 Spring Framework 对 RSocket 协议的支持。
6.1. 概述
RSocket 是一种通过 TCP 进行多路复用、双工通信的应用协议, WebSocket 和其他字节流传输,使用以下交互之一 模型:
-
Request-Response
— 发送一条消息并接收一条回馈。 -
Request-Stream
— 发送一条消息并接收返回的消息流。 -
Channel
— 双向发送消息流。 -
Fire-and-Forget
— 发送单向消息。
一旦建立初始连接,“客户端”与“服务器”的区别就会丢失,因为 双方变得对称,每一方都可以发起上述交互之一。 这就是为什么在协议中称参与方为“请求者”和“响应者” 而上述交互称为“请求流”或简称为“请求”。
这些是 RSocket 协议的主要功能和优点:
- 跨网络边界的反应式流语义 — 用于流请求,例如背压信号 在请求者和响应者之间移动,允许请求者在 源,从而减少对网络层拥塞控制的依赖,以及需求 用于在网络级别或任何级别进行缓冲。
Request-Stream
Channel
- 请求限制 — 此功能在帧之后命名为“租赁” 可以从每一端发送,以限制另一端允许的请求总数 在给定的时间内。租约定期续订。
LEASE
- 会话恢复 — 这是针对连接丢失而设计的,需要一些状态 待维护。状态管理对应用程序是透明的,并且运行良好 结合背压,可以在可能的情况下停止生产者并减少 所需的状态量。
- 大型消息的分段和重新组合。
- 保持活动状态(心跳)。
RSocket 有多种语言的实现。Java 库建立在Project Reactor, 和用于运输的反应堆网。这意味着 应用程序中来自反应式流发布服务器的信号透明地传播 通过 RSocket 跨网络。
6.1.1. 协议
RSocket 的好处之一是它在网络上具有明确定义的行为和 易于阅读的规范以及一些协议扩展。因此它是 阅读规范是个好主意,独立于语言实现和更高级别 框架 API。本节提供简洁的概述以建立一些上下文。
连接
最初,客户端通过一些低级流传输连接到服务器,例如 作为TCP或WebSocket,并将帧发送到服务器以设置参数 连接。SETUP
服务器可能会拒绝帧,但通常在发送帧后(对于客户端) 并且收到(对于服务器),双方都可以开始发出请求,除非指示使用租赁语义来限制请求的数量,在这种情况下 双方必须等待来自另一端的 Aframe 才能允许提出请求。SETUP
SETUP
LEASE
提出请求
建立连接后,双方都可以通过其中之一发起请求 框架,,,或。每个 这些帧将一条消息从请求者传送到响应方。REQUEST_RESPONSE
REQUEST_STREAM
REQUEST_CHANNEL
REQUEST_FNF
然后,响应方可以返回带有响应消息的帧,在这种情况下 的请求者也可以发送具有更多请求的帧 消息。PAYLOAD
REQUEST_CHANNEL
PAYLOAD
当请求涉及消息流(如 and)时, 响应方必须尊重来自请求方的需求信号。需求表示为 消息数。初始需求在帧中指定。随后的需求是信号Viaframes。Request-Stream
Channel
REQUEST_STREAM
REQUEST_CHANNEL
REQUEST_N
每一方还可以通过帧发送元数据通知,这些通知不会 与任何单个请求有关,而是与整个连接有关。METADATA_PUSH
消息格式
RSocket 消息包含数据和元数据。元数据可用于发送路由、 安全令牌等数据和元数据的格式可以不同。每个 MIME 类型 在帧中声明,并应用于给定连接上的所有请求。SETUP
虽然所有消息都可以有元数据,但通常元数据(如路由)是按请求的 因此仅包含在请求的第一条消息中,即与其中一个帧一起,,,或。REQUEST_RESPONSE
REQUEST_STREAM
REQUEST_CHANNEL
REQUEST_FNF
协议扩展定义了在应用程序中使用的常见元数据格式:
- 复合元数据 -- 多个, 独立格式化的元数据条目。
- 路由 — 请求的路由。
6.1.2. Java实现
RSocket 的Java 实现建立在Project Reactor 之上。TCP 和 WebSocket 的传输是 建立在反应堆内蒂之上。作为反应式流 库,反应器简化了实现协议的工作。对于应用程序,它是 自然适合使用,带有声明式运算符和透明背面 压力支持。Flux
Mono
RSocket Java 中的 API 有意是最小和基本的。它专注于协议 功能并将应用程序编程模型(例如 RPC 代码生成与其他)保留为 更高层次,独立关注。
主合约io.rsocket.RSocket对四种请求交互类型进行建模,并表示对 单个消息、消息流和实际 以字节缓冲区的形式访问数据和元数据的消息。使用合约 对称。对于请求,应用程序被给予一个执行 请求。为了响应,应用程序实现处理请求。Mono
Flux
io.rsocket.Payload
RSocket
RSocket
RSocket
这并不是一个彻底的介绍。在大多数情况下,弹簧应用 不必直接使用其 API。但是,观察或实验可能很重要 与RSocket独立于Spring。RSocket Java 存储库包含许多示例应用程序,这些应用程序 演示其 API 和协议功能。
6.1.3. 弹簧支撑
该模块包含以下内容:spring-messaging
- RSocketRequester— 流畅的API,通过数据和元数据编码/解码发出请求。io.rsocket.RSocket
- 带注释的响应程序 — 带注释的处理程序方法 响应。
@MessageMapping
模块包含沙实现,如杰克逊 CBOR/JSON 和 RSocket 应用程序可能需要的 Protobuf。它还包含可以插入以实现高效路由匹配的。spring-web
Encoder
Decoder
PathPatternParser
Spring Boot 2.2 支持通过 TCP 或 WebSocket 建立 RSocket 服务器,包括 在 WebFlux 服务器中通过 WebSocket 公开 RSocket 的选项。还有客户端 支持和自动配置阿南德。 有关更多详细信息,请参阅 Spring Boot 参考中的RSocket 部分。RSocketRequester.Builder
RSocketStrategies
Spring Security 5.2 提供 RSocket 支持。
Spring Integration 5.2 提供了入站和出站网关来与 RSocket 交互 客户端和服务器。有关更多详细信息,请参阅 Spring 集成参考手册。
Spring Cloud Gateway 支持 RSocket 连接。
6.2. RSocket请求者
RSocketRequester
提供流畅的 API 来执行 RSocket 请求,接受和 返回数据和元数据的对象,而不是低级数据缓冲区。它可以使用 对称地,从客户端发出请求,从服务器发出请求。
6.2.1. 客户端请求者
获取 anon 客户端是连接到一个服务器,其中涉及 发送具有连接设置的 RSocketframe。提供 帮助准备包含连接的构建器 帧的设置。RSocketRequester
SETUP
RSocketRequester
io.rsocket.core.RSocketConnector
SETUP
这是使用默认设置进行连接的最基本方法:
RSocketRequester requester = RSocketRequester.builder().tcp("localhost", 7000);
URI url = URI.create("https://example.org:8080/rsocket");
RSocketRequester requester = RSocketRequester.builder().webSocket(url);
以上不会立即连接。发出请求时,共享连接是 透明地建立和使用。
连接设置
RSocketRequester.Builder
提供以下内容以自定义初始帧:SETUP
-
dataMimeType(MimeType)
— 设置连接上数据的 MIME 类型。 -
metadataMimeType(MimeType)
— 设置连接上元数据的 MIME 类型。 -
setupData(Object)
— 要包含在中的数据。SETUP
-
setupRoute(String, Object…)
— 要包含在元数据中的路由。SETUP
-
setupMetadata(Object, MimeType)
— 要包含在中的其他元数据。SETUP
对于数据,默认 mime 类型派生自第一个配置的类型。为 元数据,默认 MIME 类型是复合元数据,允许多个 每个请求的元数据值和 MIME 类型对。通常两者都不需要更改。Decoder
帧中的数据和元数据是可选的。在服务器端,@ConnectMapping方法可用于处理 连接和框架的内容。元数据可用于连接 级别安全性。SETUP
SETUP
策略
RSocketRequester.Builder
接受以配置请求者。 您需要使用它来提供编码器和解码器,用于数据的(反)序列化和 元数据值。默认情况下,仅注册来自 for、、和的基本编解码器。添加提供对更多内容的访问 可以按如下方式注册:RSocketStrategies
spring-core
String
byte[]
ByteBuffer
spring-web
RSocketStrategies strategies = RSocketStrategies.builder()
.encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
.decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
.build();
RSocketRequester requester = RSocketRequester.builder()
.rsocketStrategies(strategies)
.tcp("localhost", 7000);
RSocketStrategies
专为重复使用而设计。在某些情况下,例如客户端和服务器 相同的应用程序,最好在 Spring 配置中声明它。
客户端响应者
RSocketRequester.Builder
可用于配置响应程序对来自 服务器。
您可以使用带注释的处理程序进行基于相同的客户端响应 在服务器上使用但以编程方式注册的基础结构,如下所示:
RSocketStrategies strategies = RSocketStrategies.builder()
.routeMatcher(new PathPatternRouteMatcher())
.build();
SocketAcceptor responder =
RSocketMessageHandler.responder(strategies, new ClientHandler());
RSocketRequester requester = RSocketRequester.builder()
.rsocketConnector(connector -> connector.acceptor(responder))
.tcp("localhost", 7000);
使用,ifis存在,高效 路由匹配。 |
从类 withand/or 方法创建响应程序。 |
注册响应程序。 |
请注意,以上只是为客户端编程注册设计的快捷方式 反应。对于替代场景,其中客户端响应者处于 Spring 配置中, 您仍然可以选择春豆,然后按如下方式应用:RSocketMessageHandler
ApplicationContext context = ... ;
RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class);
RSocketRequester requester = RSocketRequester.builder()
.rsocketConnector(connector -> connector.acceptor(handler.responder()))
.tcp("localhost", 7000);
对于上述内容,您可能还需要使用 切换到不同的策略来检测客户端响应者,例如基于自定义 注释,例如默认值。这 在客户端和服务器或同一中的多个客户端的情况下是必需的 应用。setHandlerPredicate
RSocketMessageHandler
@RSocketClientResponder
@Controller
另请参阅带注释的响应程序,了解有关编程模型的更多信息。
高深
RSocketRequesterBuilder
提供回调以公开底层证券,以便为保持连接提供进一步的配置选项 间隔、会话恢复、拦截器等。您可以配置选项 在该级别如下:io.rsocket.core.RSocketConnector
RSocketRequester requester = RSocketRequester.builder()
.rsocketConnector(connector -> {
// ...
})
.tcp("localhost", 7000);
6.2.2. 服务器请求程序
从服务器向连接的客户端发出请求是获取 来自服务器的已连接客户端的请求程序。
在注释响应器中,方法支持参数。使用它来访问连接的请求者。保持 请注意,方法本质上是框架的处理程序,其中 必须在请求开始之前进行处理。因此,一开始的请求必须是 与处理分离。例如:@ConnectMapping
@MessageMapping
RSocketRequester
@ConnectMapping
SETUP
@ConnectMapping
Mono<Void> handle(RSocketRequester requester) {
requester.route("status").data("5")
.retrieveFlux(StatusReport.class)
.subscribe(bar -> {
// ...
});
return ...
}
异步启动请求,独立于处理。 |
执行处理和返回完成。 |
6.2.3. 请求
拥有客户端或服务器请求者后,可以按如下方式发出请求:
ViewBox viewBox = ... ;
Flux<AirportLocation> locations = requester.route("locate.radars.within")
.data(viewBox)
.retrieveFlux(AirportLocation.class);
指定要包含在请求消息元数据中的路由。 |
为请求消息提供数据。 |
声明预期的响应。 |
交互类型由输入和 输出。上面的例子是因为发送了一个值和一个流 的值被接收。在大多数情况下,您不需要考虑这一点,只要 输入和输出的选择与 RSocket 交互类型以及输入和 响应方预期的输出。无效组合的唯一示例是多对一。Request-Stream
该方法还接受任何反应式流,包括和,以及注册的任何其他价值生产者。对于多值例如产生 相同类型的值,请考虑使用重载方法之一以避免出现 对每个元素进行类型检查和查找:data(Object)
Publisher
Flux
Mono
ReactiveAdapterRegistry
Publisher
Flux
data
Encoder
data(Object producer, Class<?> elementClass);
data(Object producer, ParameterizedTypeReference<?> elementTypeRef);
该步骤是可选的。对于不发送数据的请求,请跳过它:data(Object)
Mono<AirportLocation> location = requester.route("find.radar.EWR"))
.retrieveMono(AirportLocation.class);
如果使用复合元数据(默认),并且如果 值由已注册支持。例如:Encoder
String securityToken = ... ;
ViewBox viewBox = ... ;
MimeType mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0");
Flux<AirportLocation> locations = requester.route("locate.radars.within")
.metadata(securityToken, mimeType)
.data(viewBox)
.retrieveFlux(AirportLocation.class);
忘记返回的方法。请注意,仅指示消息已成功发送,而不是已处理。Fire-and-Forget
send()
Mono<Void>
Mono
使用返回值替换方法。Metadata-Push
sendMetadata()
Mono<Void>
6.3. 带注释的响应者
RSocket 响应器可以实现为方法.方法处理单个请求,而方法处理 连接级事件(设置和元数据推送)。支持带注释的响应程序 对称地,用于从服务器端响应和从客户端响应。@MessageMapping
@ConnectMapping
@MessageMapping
@ConnectMapping
6.3.1. 服务器响应程序
要在服务器端使用带注释的响应程序,请添加到您的 Spring 用于检测 Bean 与方法的配置:RSocketMessageHandler
@Controller
@MessageMapping
@ConnectMapping
@Configuration
static class ServerConfig {
@Bean
public RSocketMessageHandler rsocketMessageHandler() {
RSocketMessageHandler handler = new RSocketMessageHandler();
handler.routeMatcher(new PathPatternRouteMatcher());
return handler;
}
}
然后通过 Java RSocket API 启动 RSocket 服务器,并插入响应程序,如下所示:RSocketMessageHandler
ApplicationContext context = ... ;
RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class);
CloseableChannel server =
RSocketServer.create(handler.responder())
.bind(TcpServerTransport.create("localhost", 7000))
.block();
RSocketMessageHandler
默认情况下支持复合元数据和路由元数据。如果需要切换到 不同的 MIME 类型或注册其他元数据 MIME 类型。
您需要设置元数据和数据所需的实例 要支持的格式。您可能需要该模块来实现编解码器。Encoder
Decoder
spring-web
默认情况下用于匹配路由通过。 我们建议插入 fromfor 高效的路由匹配。RSocket 路由可以是分层的,但不是 URL 路径。 默认情况下,两个路由匹配器都配置为使用 “.” 作为分隔符,并且没有 URL 解码与 HTTP URL 一样。SimpleRouteMatcher
AntPathMatcher
PathPatternRouteMatcher
spring-web
RSocketMessageHandler
可以通过以下方式配置这可能很有用,如果 您需要在同一进程中在客户端和服务器之间共享配置:RSocketStrategies
@Configuration
static class ServerConfig {
@Bean
public RSocketMessageHandler rsocketMessageHandler() {
RSocketMessageHandler handler = new RSocketMessageHandler();
handler.setRSocketStrategies(rsocketStrategies());
return handler;
}
@Bean
public RSocketStrategies rsocketStrategies() {
return RSocketStrategies.builder()
.encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
.decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
.routeMatcher(new PathPatternRouteMatcher())
.build();
}
}
6.3.2. 客户端响应者
客户端上的带注释的响应程序需要在 中配置。有关详细信息,请参阅客户端响应程序。RSocketRequester.Builder
6.3.3. @MessageMapping
服务器或客户端响应程序配置就绪后,可以按如下方式使用方法:@MessageMapping
@Controller
public class RadarsController {
@MessageMapping("locate.radars.within")
public Flux<AirportLocation> radars(MapRequest request) {
// ...
}
}
上述方法响应具有 路线“定位.雷达.内”。它支持灵活的方法签名,可选择 使用以下方法参数:@MessageMapping
方法参数 | 描述 |
| 请求的有效负载。这可以是异步类型(如)的具体值。 注意:批注的使用是可选的。不是简单类型的方法参数 并且不是任何其他受支持的参数,被假定为预期的有效负载。 |
| 向远程端发出请求的请求者。 |
| 根据映射模式中的变量从路由中提取的值,例如 |
| 注册用于提取的元数据值,如元数据提取器中所述。 |
| 注册用于提取的所有元数据值,如元数据提取器中所述。 |
返回值应为一个或多个要序列化为响应的对象 负载。这可以是异步类型,例如 or、具体值或 或无值异步类型,例如。Mono
Flux
void
Mono<Void>
方法支持的 RSocket 交互类型由 输入(即参数)和输出的基数,其中 基数意味着以下内容:@MessageMapping
@Payload
基数 | 描述 |
1 | 显式值或单值异步类型,例如。 |
多 | 多值异步类型,例如。 |
0 | 对于输入,这意味着该方法没有参数。 对于输出,这是无值异步类型,例如。 |
下表显示了所有输入和输出基数组合以及相应的 交互类型:
输入基数 | 输出基数 | 交互类型 |
0, 1 | 0 | 即发即弃,请求-响应 |
0, 1 | 1 | 请求-响应 |
0, 1 | 多 | 请求流 |
多 | 0, 1, 许多 | 请求通道 |
6.3.4. @ConnectMapping
@ConnectMapping
在 RSocket 连接开始时处理帧,以及 通过帧的任何后续元数据推送通知,i.e.in。SETUP
METADATA_PUSH
metadataPush(Payload)
io.rsocket.RSocket
@ConnectMapping
方法支持与 @MessageMapping相同的参数,但基于元数据和来自 AndFrame 的数据。可以有一个模式来缩小处理范围 元数据中具有路由的特定连接,或者未声明任何模式 然后所有连接都匹配。SETUP
METADATA_PUSH
@ConnectMapping
@ConnectMapping
方法不能返回数据,必须声明为返回值。如果处理返回新的错误 连接,则连接被拒绝。处理不得耽搁 请求用于连接。有关详细信息,请参阅服务器请求程序。void
Mono<Void>
RSocketRequester
6.4. 元数据提取器
响应程序必须解释元数据。复合元数据允许独立 格式化的元数据值(例如,用于路由、安全性、跟踪),每个值都有自己的 MIME 类型。应用程序需要一种方法来配置要支持的元数据 MIME 类型,以及一种方法 以访问提取的值。
MetadataExtractor
是一个接受序列化元数据并返回解码的协定 然后可以像标头一样按名称访问的名称-值对,例如 viain 注释处理程序方法。@Header
DefaultMetadataExtractor
可以给出实例来解码元数据。出 它内置了对“消息/x.rsocket.routing.v0”的框,它解码到并保存在“路由”键下。对于您需要提供的任何其他 MIME 类型 a并注册 MIME 类型,如下所示:Decoder
String
Decoder
DefaultMetadataExtractor extractor = new DefaultMetadataExtractor(metadataDecoders);
extractor.metadataToExtract(fooMimeType, Foo.class, "foo");
复合元数据非常适合组合独立的元数据值。然而, 请求者可能不支持复合元数据,或者可能选择不使用它。为此,可能需要自定义逻辑将解码值映射到输出 地图。下面是将 JSON 用于元数据的示例:DefaultMetadataExtractor
DefaultMetadataExtractor extractor = new DefaultMetadataExtractor(metadataDecoders);
extractor.metadataToExtract(
MimeType.valueOf("application/vnd.myapp.metadata+json"),
new ParameterizedTypeReference<Map<String,String>>() {},
(jsonMap, outputMap) -> {
outputMap.putAll(jsonMap);
});
配置时,您可以使用配置的解码器创建提取器,并且 只需使用回调来自定义注册,如下所示:MetadataExtractor
RSocketStrategies
RSocketStrategies.Builder
RSocketStrategies strategies = RSocketStrategies.builder()
.metadataExtractorRegistry(registry -> {
registry.metadataToExtract(fooMimeType, Foo.class, "foo");
// ...
})
.build();
6.5. RSocket 接口
Spring 框架允许您将 RSocket 服务定义为带有注释的 Java 接口 RSocket 交换的方法。然后,您可以生成实现此接口的代理 并执行交换。这有助于通过包装 RSocket 远程访问来简化 使用底层RSocketRequester。
一,声明一个接口:@RSocketExchange
interface RadarService {
@RSocketExchange("radars")
Flux<AirportLocation> getRadars(@Payload MapRequest request);
// more RSocket exchange methods...
}
第二,创建一个代理来执行声明的 RSocket 交换:
RSocketRequester requester = ... ;
RSocketServiceProxyFactory factory = RSocketServiceProxyFactory.builder(requester).build();
RepositoryService service = factory.createClient(RadarService.class);
6.5.1. 方法参数
带注释的 RSocket 交换方法支持灵活的方法签名,具有以下功能 方法参数:
方法参数 | 描述 |
| 添加一个路由变量以与注释中的路由一起传递,以便展开路由中的模板占位符。 此变量可以是字符串或任何对象,然后通过以下方式进行格式化。 |
| 设置请求的输入有效负载。这可以是具体值,也可以是任何生产者 可以适应反应式流的值 |
| 输入有效负载中元数据条目的值。这可以是任意长度 因为下一个参数是元数据条目。价值可以是具体的 值或任何可以适应反应式流的单个值的生产者。 |
| 用于元数据条目。前面的方法参数应为 元数据值。 |
6.5.2. 返回值
带注释的 RSocket 交换方法支持作为具体值的返回值,或者 任何可以适应反应式流的价值生产者。Publisher
ReactiveAdapterRegistry
7. 反应式库
spring-webflux
依赖并在内部使用它来编写异步 逻辑并提供反应流支持。通常,WebFlux API 返回器(因为它们在内部使用)并宽容地接受任何响应式流实现作为输入。使用对立很重要,因为 它有助于表达基数 - 例如,是单个还是多个异步 值是预期的,这对于做出决策至关重要(例如,当 编码或解码 HTTP 消息)。reactor-core
Flux
Mono
Publisher
Flux
Mono
对于带注释的控制器,WebFlux 透明地适应 应用程序。这是在ReactiveAdapterRegistry的帮助下完成的,它 为反应式库和其他异步类型提供可插拔支持。注册表 内置了对 RxJava 3、Kotlin 协程和 SmallRye Mutiny 的支持,但你可以 也注册其他人。
对于函数式 API(例如功能端点等),一般规则 对于 WebFlux API,应用 — andas 返回值和 Reactive Streamsas 输入。当一个,无论是自定义的还是来自另一个反应式库, 提供,它只能被视为具有未知语义 (0..N) 的流。但是,如果 语义是已知的,你可以用它包装它 传递原始。WebClient
Flux
Mono
Publisher
Publisher
Flux
Mono.from(Publisher)
Publisher
例如,给定athat不是a,杰克逊JSON消息编写器 需要多个值。如果媒体类型暗示无限流(例如,),则会单独写入和刷新值。否则 值被缓冲到列表中并呈现为 JSON 数组。Publisher
Mono
application/json+stream