0
点赞
收藏
分享

微信扫一扫

Spring Integration (集成)路由消息

Spring Integration (集成)路由消息_Group

概述

路由器是许多消息传递架构中的关键元素。 它们使用来自消息通道的消息,并根据一组条件将每个使用的消息转发到一个或多个不同的消息通道。

Spring 集成提供以下路由器:

  • 有效负载类型路由器
  • 标头值路由器
  • 收件人列表路由器
  • XPath 路由器(XML 模块的一部分)
  • 错误消息异常类型路由器
  • (通用)路由器

路由器实现共享许多配置参数。 但是,路由器之间存在某些差异。 此外,配置参数的可用性取决于路由器是在链内还是在链外使用。 为了提供快速概述,以下两个表中列出了所有可用属性。

下表显示了可用于链外路由器的配置参数:

表 1.链外路由器

Spring Integration (集成)路由消息_Java_02

下表显示了链内路由器可用的配置参数:

表 2.链内的路由器

Spring Integration (集成)路由消息_XML_03

从 Spring Integration 2.1 开始,路由器参数在所有路由器实现中都更加标准化。 因此,一些小的更改可能会破坏旧的基于Spring Integration的应用程序。

从 Spring Integration 2.1 开始,属性被删除,以支持将其行为与属性合并。 此外,该属性现在默认为。​​ignore-channel-name-resolution-failures​​​​resolution-required​​​​resolution-required​​​​true​

在这些更改之前,属性默认为 ,导致在未解析通道和 nowas 设置时以静默方式丢弃消息。 新行为至少需要一个已解析的通道,默认情况下,如果未确定通道(或尝试发送不成功),则会引发 a)。​​resolution-required​​​​false​​​​default-output-channel​​​​MessageDeliveryException​

如果您确实希望以静默方式丢弃消息,则可以设置。​​default-output-channel="nullChannel"​

常用路由器参数

本节介绍所有路由器参数通用的参数(在本章前面的两个表中勾选了所有框的参数)。

链条内外

以下参数对链内外的所有路由器都有效。

​apply-sequence​

此属性指定是否应向每封邮件添加序列号和大小标头。 此可选属性默认为 。​​false​

​default-output-channel​

如果设置,此属性提供对通道解析无法返回任何通道时应向其发送消息的通道的引用。 如果未提供默认输出通道,路由器将引发异常。 如果要改为静默删除这些消息,请将默认输出通道属性值设置为 。​​nullChannel​

从版本 6.0 开始,设置默认输出通道也会重置选项。 因此,不会尝试从其名称解析通道,而是回退到此默认输出通道 - 类似于 Java语句。 Ifis 设置为显式设置,进一步的逻辑取决于选项:从密钥到未解析通道的消息只能到达 ifis。 因此,其中提供的配置和两者都设置为被初始化阶段拒绝。​​channelKeyFallback​​​​false​​​​switch​​​​channelKeyFallback​​​​true​​​​resolutionRequired​​​​defaultOutputChannel​​​​resolutionRequired​​​​false​​​​defaultOutputChannel​​​​channelKeyFallback​​​​resolutionRequired​​​​true​​​​AbstractMappingMessageRouter​

​resolution-required​

此属性指定是否必须始终将通道名称成功解析为存在的通道实例。 如果设置为 ,则在无法解析通道时引发 ais。 设置此属性会导致忽略任何无法解析的通道。 此可选属性默认为 。​​true​​​​MessagingException​​​​false​​​​true​

消息仅发送到未解析通道时(如果指定)。​​default-output-channel​​​​resolution-required​​​​false​

​ignore-send-failures​

如果设置为 ,则忽略发送到消息通道的失败。 如果设置为 ,则抛出 ais ,如果路由器解析多个通道,则任何后续通道都不会收到消息。​​true​​​​false​​​​MessageDeliveryException​

此属性的确切行为取决于消息发送到的类型。 例如,使用直接通道(单线程)时,发送失败可能是由下游更远的组件引发的异常引起的。 但是,当将消息发送到简单的队列通道(异步)时,引发异常的可能性相当小。​​Channel​

虽然大多数路由器路由到单个通道,但它们可以返回多个通道名称。 例如,这正是这样做的。 如果在仅路由到单个通道的路由器上设置此属性,则会吞下任何引起的异常,这通常没有意义。 在这种情况下,最好在流入口点捕获错误流中的异常。 因此,当路由器实现返回多个通道名称时,将属性设置为通常更有意义,因为失败的通道后面的其他通道仍将接收消息。​​recipient-list-router​​​​true​​​​ignore-send-failures​​​​true​

此属性默认为。​​false​

​timeout​

该属性指定将消息发送到目标消息通道时等待的最长时间(以毫秒为单位)。 默认情况下,发送操作无限期地阻止。​​timeout​

顶级(链外)

以下参数仅在链外的所有顶级路由器上有效。

​id​

标识底层 Spring Bean 定义,对于路由器,该定义是一个实例 ofor,具体取决于路由器的 ais aor a 分别取决于。 这是一个可选属性。​​EventDrivenConsumer​​​​PollingConsumer​​​​input-channel​​​​SubscribableChannel​​​​PollableChannel​

​auto-startup​

此“生命周期”属性指示是否应在应用程序上下文启动期间启动此组件。 此可选属性默认为 。​​true​

​input-channel​

此终结点的接收消息通道。

​order​

此属性定义当此终结点作为订阅者连接到通道时的调用顺序。 当该通道使用故障转移调度策略时,这一点尤其重要。 当此终结点本身是具有队列的通道的轮询使用者时,它不起作用。

路由器实现

由于基于内容的路由通常需要一些特定于域的逻辑,因此大多数用例都需要 Spring 集成的选项,以便使用 XML 命名空间支持或注释来委派给 POJO。 后面将讨论这两个问题。 但是,我们首先介绍几个满足常见要求的实现。

​PayloadTypeRouter​

将消息发送到由有效负载类型映射定义的通道,如以下示例所示:​​PayloadTypeRouter​

<bean id="payloadTypeRouter"
class="org.springframework.integration.router.PayloadTypeRouter">
<property name="channelMapping">
<map>
<entry key="java.lang.String" value-ref="stringChannel"/>
<entry key="java.lang.Integer" value-ref="integerChannel"/>
</map>
</property>
</bean>

Spring Integration (see) 提供的命名空间也支持配置,它通过将配置及其相应的实现(通过使用 aelement 定义)组合到一个更简洁的配置元素中来简化配置。 以下示例显示了与上述配置等效但使用命名空间支持的配置:​​PayloadTypeRouter​​​​Namespace Support​​​​<router/>​​​​<bean/>​​​​PayloadTypeRouter​

<int:payload-type-router input-channel="routingChannel">
<int:mapping type="java.lang.String" channel="stringChannel" />
<int:mapping type="java.lang.Integer" channel="integerChannel" />
</int:payload-type-router>

以下示例显示了在 Java 中配置的等效路由器:

@ServiceActivator(inputChannel = "routingChannel")
@Bean
public PayloadTypeRouter router() {
PayloadTypeRouter router = new PayloadTypeRouter();
router.setChannelMapping(String.class.getName(), "stringChannel");
router.setChannelMapping(Integer.class.getName(), "integerChannel");
return router;
}

使用 Java DSL 时,有两个选项。

首先,您可以定义路由器对象,如前面的示例所示:

@Bean
public IntegrationFlow routerFlow1() {
return IntegrationFlow.from("routingChannel")
.route(router())
.get();
}

public PayloadTypeRouter router() {
PayloadTypeRouter router = new PayloadTypeRouter();
router.setChannelMapping(String.class.getName(), "stringChannel");
router.setChannelMapping(Integer.class.getName(), "integerChannel");
return router;
}

请注意,路由器可以是,但不必是 a。 如果不是 a,则流会注册它。​​@Bean​​​​@Bean​

其次,可以在 DSL 流本身中定义路由函数,如以下示例所示:

@Bean
public IntegrationFlow routerFlow2() {
return IntegrationFlow.from("routingChannel")
.<Object, Class<?>>route(Object::getClass, m -> m
.channelMapping(String.class, "stringChannel")
.channelMapping(Integer.class, "integerChannel"))
.get();
}

​HeaderValueRouter​

根据单个标头值映射将消息发送到通道。 创建 ais 时,将使用要评估的标头的名称对其进行初始化。 标头的值可以是以下两种情况之一:​​HeaderValueRouter​​​​HeaderValueRouter​

  • 任意值
  • 频道名称

如果是任意值,则需要将这些标头值与其他映射到通道名称。 否则,不需要其他配置。

Spring 集成提供了一个简单的基于命名空间的 XML 配置来配置。 以下示例演示了何时需要将标头值映射到通道的配置:​​HeaderValueRouter​​​​HeaderValueRouter​

<int:header-value-router input-channel="routingChannel" header-name="testHeader">
<int:mapping value="someHeaderValue" channel="channelA" />
<int:mapping value="someOtherHeaderValue" channel="channelB" />
</int:header-value-router>

在解析过程中,上述示例中定义的路由器可能会遇到通道解析失败,从而导致异常。 如果要禁止此类异常并将未解析的消息发送到默认输出通道(用属性标识)setto。​​default-output-channel​​​​resolution-required​​​​false​

通常,标头值未显式映射到通道的消息将发送到。 但是,当标头值映射到通道名称但无法解析通道时,将属性设置为 to 会导致将此类消息路由到。​​default-output-channel​​​​resolution-required​​​​false​​​​default-output-channel​

从 Spring Integration 2.1 开始,该属性已从 更改为。 属性默认值为。​​ignore-channel-name-resolution-failures​​​​resolution-required​​​​resolution-required​​​​true​

以下示例显示了在 Java 中配置的等效路由器:

@ServiceActivator(inputChannel = "routingChannel")
@Bean
public HeaderValueRouter router() {
HeaderValueRouter router = new HeaderValueRouter("testHeader");
router.setChannelMapping("someHeaderValue", "channelA");
router.setChannelMapping("someOtherHeaderValue", "channelB");
return router;
}

使用 Java DSL 时,有两个选项。 首先,您可以定义路由器对象,如前面的示例所示:

@Bean
public IntegrationFlow routerFlow1() {
return IntegrationFlow.from("routingChannel")
.route(router())
.get();
}

public HeaderValueRouter router() {
HeaderValueRouter router = new HeaderValueRouter("testHeader");
router.setChannelMapping("someHeaderValue", "channelA");
router.setChannelMapping("someOtherHeaderValue", "channelB");
return router;
}

请注意,路由器可以是,但不必是 a。 如果不是 a,则流会注册它。​​@Bean​​​​@Bean​

其次,可以在 DSL 流本身中定义路由函数,如以下示例所示:

@Bean
public IntegrationFlow routerFlow2() {
return IntegrationFlow.from("routingChannel")
.route(Message.class, m -> m.getHeaders().get("testHeader", String.class),
m -> m
.channelMapping("someHeaderValue", "channelA")
.channelMapping("someOtherHeaderValue", "channelB"),
e -> e.id("headerValueRouter"))
.get();
}

不需要将标头值映射到通道名称的配置,因为标头值本身表示通道名称。 以下示例显示了不需要将标头值映射到通道名称的路由器:

<int:header-value-router input-channel="routingChannel" header-name="testHeader"/>


从 Spring Integration 2.1 开始,解析通道的行为更加明确。 例如,如果省略属性,则路由器无法解析至少一个有效通道,并且任何通道名称解析失败都将通过 setto 忽略,然后抛出 ais 。​​default-output-channel​​​​resolution-required​​​​false​​​​MessageDeliveryException​



基本上,默认情况下,路由器必须能够成功地将消息路由到至少一个通道。 如果您确实要丢弃消息,还必须设置。​​default-output-channel​​​​nullChannel​


​RecipientListRouter​

将收到的每条消息发送到静态定义的消息通道列表。 以下示例创建一个:​​RecipientListRouter​​​​RecipientListRouter​

<bean id="recipientListRouter"
class="org.springframework.integration.router.RecipientListRouter">
<property name="channels">
<list>
<ref bean="channel1"/>
<ref bean="channel2"/>
<ref bean="channel3"/>
</list>
</property>
</bean>

Spring 集成还为配置提供了命名空间支持(请参阅命名空间支持),如以下示例所示:​​RecipientListRouter​

<int:recipient-list-router id="customRouter" input-channel="routingChannel"
timeout="1234"
ignore-send-failures="true"
apply-sequence="true">
<int:recipient channel="channel1"/>
<int:recipient channel="channel2"/>
</int:recipient-list-router>

以下示例显示了在 Java 中配置的等效路由器:

@ServiceActivator(inputChannel = "routingChannel")
@Bean
public RecipientListRouter router() {
RecipientListRouter router = new RecipientListRouter();
router.setSendTimeout(1_234L);
router.setIgnoreSendFailures(true);
router.setApplySequence(true);
router.addRecipient("channel1");
router.addRecipient("channel2");
router.addRecipient("channel3");
return router;
}

以下示例显示了使用 Java DSL 配置的等效路由器:

@Bean
public IntegrationFlow routerFlow() {
return IntegrationFlow.from("routingChannel")
.routeToRecipients(r -> r
.applySequence(true)
.ignoreSendFailures(true)
.recipient("channel1")
.recipient("channel2")
.recipient("channel3")
.sendTimeout(1_234L))
.get();
}

此处的“apply-sequence”标志与发布-订阅-通道具有相同的效果,并且与发布-订阅通道一样,默认情况下禁用它。 有关详细信息,请参阅发布订阅频道配置​。​​recipient-list-router​

将 ais 配置为使用 Spring 表达式语言 (SpEL) 支持作为单个接收者通道的选择器时的另一个方便选项。 这样做类似于在“链”的开头使用过滤器来充当“选择性消费者”。 但是,在这种情况下,它们都相当简洁地组合到路由器的配置中,如以下示例所示:​​RecipientListRouter​

<int:recipient-list-router id="customRouter" input-channel="routingChannel">
<int:recipient channel="channel1" selector-expression="payload.equals('foo')"/>
<int:recipient channel="channel2" selector-expression="headers.containsKey('bar')"/>
</int:recipient-list-router>

在上述配置中,将计算由属性标识的 SpEL 表达式,以确定是否应将此收件人包含在给定输入消息的收件人列表中。 表达式的求值结果必须为 a。 如果未定义此属性,则通道始终位于收件人列表中。​​selector-expression​​​​boolean​

​RecipientListRouterManagement​

从版本 4.1 开始,提供了几个在运行时动态操作收件人的操作。 这些管理操作通过注释呈现。 它们可以通过使用控制总线以及 JMX 来使用,如以下示例所示:​​RecipientListRouter​​​​RecipientListRouterManagement​​​​@ManagedResource​

<control-bus input-channel="controlBus"/>

<recipient-list-router id="simpleRouter" input-channel="routingChannelA">
<recipient channel="channel1"/>
</recipient-list-router>

<channel id="channel2"/>

messagingTemplate.convertAndSend(controlBus, "@'simpleRouter.handler'.addRecipient('channel2')");

从应用程序启动开始,只有一个收件人。 但是在命令之后,添加了收件人。 这是一个“注册对消息一部分的东西感兴趣”用例,当我们可能在某个时间段对来自路由器的消息感兴趣时,因此我们正在订阅并在某个时候决定取消订阅。​​simpleRouter​​​​channel1​​​​addRecipient​​​​channel2​​​​recipient-list-router​

由于运行时管理操作,它可以从一开始就进行配置,而无需任何操作。 在这种情况下,当邮件没有一个匹配的收件人时,的行为是相同的。 配置了 ifis,消息将发送到那里。 否则扔掉。​​<recipient-list-router>​​​​<recipient>​​​​RecipientListRouter​​​​defaultOutputChannel​​​​MessageDeliveryException​

XPath 路由器

XPath 路由器是 XML 模块的一部分。 请参见使用 XPath 路由 XML 消息。

路由和错误处理

Spring Integration还提供了一个名为用于路由错误消息(定义为消息whoseis ainstance)的特殊类型路由器。 事实上,它们几乎是相同的。 唯一的区别是,在导航有效负载实例的实例层次结构(例如)以查找最具体的类型和通道映射时,然后浏览“异常原因”的层次结构(例如),以查找最具体的类型或通道映射,并用于匹配类或任何超类。​​ErrorMessageExceptionTypeRouter​​​​payload​​​​Throwable​​​​ErrorMessageExceptionTypeRouter​​​​PayloadTypeRouter​​​​PayloadTypeRouter​​​​payload.getClass().getSuperclass()​​​​ErrorMessageExceptionTypeRouter​​​​payload.getCause()​​​​Throwable​​​​mappingClass.isInstance(cause)​​​​cause​

在这种情况下,通道映射顺序很重要。 因此,如果需要获取 a 而不是 a 的映射,则必须首先在路由器上配置最后一个映射。​​IllegalArgumentException​​​​RuntimeException​

从版本 4.3 开始,在初始化阶段加载所有映射类以快速失败 a。​​ErrorMessageExceptionTypeRouter​​​​ClassNotFoundException​

以下示例显示了以下各项的示例配置:​​ErrorMessageExceptionTypeRouter​

<int:exception-type-router input-channel="inputChannel"
default-output-channel="defaultChannel">
<int:mapping exception-type="java.lang.IllegalArgumentException"
channel="illegalChannel"/>
<int:mapping exception-type="java.lang.NullPointerException"
channel="npeChannel"/>
</int:exception-type-router>

<int:channel id="illegalChannel" />
<int:channel id="npeChannel" />

配置通用路由器

Spring 集成提供了一个通用路由器。 您可以将其用于通用路由(与Spring Integration提供的其他路由器相反,每个路由器都有某种形式的专用化)。

使用 XML 配置基于内容的路由器

元素提供了一种将路由器连接到输入通道的方法,并且还接受可选属性。 该属性引用自定义路由器实现的 Bean 名称(必须扩展)。 以下示例显示了三个通用路由器:​​router​​​​default-output-channel​​​​ref​​​​AbstractMessageRouter​

<int:router ref="payloadTypeRouter" input-channel="input1"
default-output-channel="defaultOutput1"/>

<int:router ref="recipientListRouter" input-channel="input2"
default-output-channel="defaultOutput2"/>

<int:router ref="customRouter" input-channel="input3"
default-output-channel="defaultOutput3"/>

<beans:bean id="customRouterBean" class="org.foo.MyCustomRouter"/>

或者,可以指向包含注释的 POJO(稍后显示),或者您可以将 POJO 与显式方法名称结合使用。 指定方法将应用本文档后面的注释部分中描述的相同行为。 以下示例定义了一个路由器,该路由器指向其属性中的 POJO:​​ref​​​​@Router​​​​ref​​​​@Router​​​​ref​

<int:router input-channel="input" ref="somePojo" method="someMethod"/>

如果自定义路由器实现在其他定义中引用,我们通常建议使用 aattribute。 但是,如果自定义路由器实现的范围应限定为 的单个定义,则可以提供内部 Bean 定义,如以下示例所示:​​ref​​​​<router>​​​​<router>​

<int:router method="someMethod" input-channel="input3"
default-output-channel="defaultOutput3">
<beans:bean class="org.foo.MyCustomRouter"/>
</int:router>

不允许在同一配置中同时使用属性和内部处理程序定义。 这样做会产生不明确的条件并引发异常。​​ref​​​​<router>​

如果属性引用扩展的 Bean(例如框架本身提供的路由器),则配置将优化为直接引用路由器。 在这种情况下,每个属性必须引用单独的 Bean 实例(或 a-scoped bean)或使用内部配置类型。 但是,仅当您未在路由器 XML 定义中提供任何特定于路由器的属性时,此优化才适用。 如果无意中从多个 Bean 引用了相同的消息处理程序,则会出现配置异常。​​ref​​​​AbstractMessageProducingHandler​​​​ref​​​​prototype​​​​<bean/>​

以下示例显示了在 Java 中配置的等效路由器:

@Bean
@Router(inputChannel = "routingChannel")
public AbstractMessageRouter myCustomRouter() {
return new AbstractMessageRouter() {

@Override
protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
return // determine channel(s) for message
}

};
}

以下示例显示了使用 Java DSL 配置的等效路由器:

@Bean
public IntegrationFlow routerFlow() {
return IntegrationFlow.from("routingChannel")
.route(myCustomRouter())
.get();
}

public AbstractMessageRouter myCustomRouter() {
return new AbstractMessageRouter() {

@Override
protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
return // determine channel(s) for message
}

};
}

或者,您可以路由消息有效负载中的数据,如以下示例所示:

@Bean
public IntegrationFlow routerFlow() {
return IntegrationFlow.from("routingChannel")
.route(String.class, p -> p.contains("foo") ? "fooChannel" : "barChannel")
.get();
}

路由器和 Spring 表达式语言 (SpEL)

有时,路由逻辑可能很简单,为它编写一个单独的类并将其配置为 Bean 似乎有点矫枉过正。 从Spring Integration 2.0开始,我们提供了一种替代方案,允许您使用SpEL来实现以前需要自定义POJO路由器的简单计算。

有关 Spring 表达式语言的更多信息,请参阅Spring 框架参考指南中的相关章节。

通常,计算 SpEL 表达式并将其结果映射到通道,如以下示例所示:

<int:router input-channel="inChannel" expression="payload.paymentType">
<int:mapping value="CASH" channel="cashPaymentChannel"/>
<int:mapping value="CREDIT" channel="authorizePaymentChannel"/>
<int:mapping value="DEBIT" channel="authorizePaymentChannel"/>
</int:router>

以下示例显示了在 Java 中配置的等效路由器:

@Router(inputChannel = "routingChannel")
@Bean
public ExpressionEvaluatingRouter router() {
ExpressionEvaluatingRouter router = new ExpressionEvaluatingRouter("payload.paymentType");
router.setChannelMapping("CASH", "cashPaymentChannel");
router.setChannelMapping("CREDIT", "authorizePaymentChannel");
router.setChannelMapping("DEBIT", "authorizePaymentChannel");
return router;
}

以下示例显示了在 Java DSL 中配置的等效路由器:

@Bean
public IntegrationFlow routerFlow() {
return IntegrationFlow.from("routingChannel")
.route("payload.paymentType", r -> r
.channelMapping("CASH", "cashPaymentChannel")
.channelMapping("CREDIT", "authorizePaymentChannel")
.channelMapping("DEBIT", "authorizePaymentChannel"))
.get();
}

为了进一步简化,SpEL 表达式的计算结果可能为通道名称,如以下表达式所示:

<int:router input-channel="inChannel" expression="payload + 'Channel'"/>

在前面的配置中,结果通道由 SpEL 表达式计算,该表达式将 的值与文本“通道”连接起来。​​payload​​​​String​

SpEL 配置路由器的另一个优点是表达式可以返回一个,有效地使每个收件人列表成为路由器。 每当表达式返回多个通道值时,消息都会转发到每个通道。 下面的示例演示了这样的表达式:​​Collection​​​​<router>​

<int:router input-channel="inChannel" expression="headers.channels"/>

在上述配置中,如果消息包含名称为“channels”的标头,并且该标头的值为 aof 通道名称,则消息将发送到列表中的每个通道。 当您需要选择多个通道时,您可能还会发现集合投影和集合选择表达式很有用。 有关详细信息,请参阅:​​List​

  • 馆藏投影
  • 馆藏选择
使用注释配置路由器

当用于注释方法时,该方法可能会返回 aor atype。 在后一种情况下,终结点会解析通道名称,就像解析默认输出通道一样。 此外,该方法可以返回单个值或集合。 如果返回集合,则回复消息将发送到多个通道。 总而言之,以下方法签名都是有效的:​​@Router​​​​MessageChannel​​​​String​

@Router
public MessageChannel route(Message message) {...}

@Router
public List<MessageChannel> route(Message message) {...}

@Router
public String route(Foo payload) {...}

@Router
public List<String> route(Foo payload) {...}

除了基于有效负载的路由之外,还可以根据消息标头中作为属性或属性提供的元数据来路由消息。 在这种情况下,批注方法可能包括带批注的参数,该参数映射到标头值,如以下示例所示,并记录在注释支持中:​​@Router​​​​@Header​

@Router
public List<String> route(@Header("orderStatus") OrderStatus status)

有关基于 XML 的消息的路由(包括 XPath 支持),请参阅XML 支持 - 处理 XML 有效负载。

有关路由器配置的更多信息,另请参阅 Java DSL 一章中的消息路由器。

动态路由器

Spring 集成为常见的基于内容的路由用例提供了相当多的不同路由器配置,以及将自定义路由器实现为 POJO 的选项。 例如,提供了一种简单的方法来配置基于传入消息的有效负载类型计算通道的路由器,同时通过评估特定消息标头的值来配置计算通道的路由器,从而提供相同的便利。 还有基于表达式的 (SpEL) 路由器,其中通道是根据对表达式的评估确定的。 所有这些类型的路由器都表现出一些动态特性。​​PayloadTypeRouter​​​​HeaderValueRouter​

但是,这些路由器都需要静态配置。 即使在基于表达式的路由器中,表达式本身也被定义为路由器配置的一部分,这意味着对相同值运行的相同表达式始终会导致计算相同的通道。 这在大多数情况下是可以接受的,因为这种路线是明确定义的,因此是可预测的。 但有时我们需要动态更改路由器配置,以便消息流可以路由到不同的通道。

例如,您可能希望关闭系统的某些部分进行维护,并暂时将消息重新路由到其他消息流。 作为另一个示例,您可能希望通过添加另一个路由来处理更具体的类型(在 的情况下)来为消息流引入更精细的粒度。​​java.lang.Number​​​​PayloadTypeRouter​

不幸的是,使用静态路由器配置来实现这些目标中的任何一个,您必须关闭整个应用程序,更改路由器的配置(更改路由),然后恢复应用程序。 这显然不是任何人想要的解决方案。

动态路由器模式描述了在不关闭系统或单个路由器的情况下动态更改或配置路由器的机制。

在我们进入Spring Integration如何支持动态路由的细节之前,我们需要考虑路由器的典型流程:

  1. 计算通道标识符,这是路由器在收到消息后计算的值。 通常,它是一个字符串或实际的实例。MessageChannel
  2. 将通道标识符解析为通道名称。 我们将在本节后面介绍此过程的细节。
  3. 将频道名称解析为实际MessageChannel

如果步骤1导致实际实例,则在动态路由方面无能为力,因为这是任何路由器作业的最终产品。 但是,如果第一步产生的通道标识符不是实例,则有很多可能的方法可以影响派生过程。 请考虑以下有效负载类型路由器的示例:​​MessageChannel​​​​MessageChannel​​​​MessageChannel​​​​MessageChannel​

<int:payload-type-router input-channel="routingChannel">
<int:mapping type="java.lang.String" channel="channel1" />
<int:mapping type="java.lang.Integer" channel="channel2" />
</int:payload-type-router>

在有效负载类型路由器的上下文中,前面提到的三个步骤将实现如下:

  1. 计算通道标识符,该标识符是有效负载类型的完全限定名称(例如,)。java.lang.String
  2. 将通道标识符解析为通道名称,其中上一步的结果用于从元素中定义的有效负载类型映射中选择适当的值。mapping
  3. 将通道名称解析为应用程序上下文中对 Bean 的引用的实际实例(希望是 a),由上一步的结果标识。MessageChannelMessageChannel

换句话说,每个步骤都会馈送下一步,直到该过程完成。

现在考虑一个标头值路由器的示例:

<int:header-value-router input-channel="inputChannel" header-name="testHeader">
<int:mapping value="foo" channel="fooChannel" />
<int:mapping value="bar" channel="barChannel" />
</int:header-value-router>

现在我们可以考虑这三个步骤如何适用于标头值路由器:

  1. 计算通道标识符,该标识符是由属性标识的标头的值。header-name
  2. 将通道标识符解析为通道名称,其中上一步的结果用于从元素中定义的常规映射中选择适当的值。mapping
  3. 将通道名称解析为应用程序上下文中对 Bean 的引用的实际实例(希望是 a),由上一步的结果标识。MessageChannelMessageChannel

两种不同路由器类型的前两种配置看起来几乎相同。 但是,如果您查看我们的替代配置,我们清楚地看到没有子元素,如以下列表所示:​​HeaderValueRouter​​​​mapping​

<int:header-value-router input-channel="inputChannel" header-name="testHeader">

但是,该配置仍然完全有效。 所以自然的问题是第二步的映射呢?

第二步现在是可选的。 如果未定义 ifis,则在第一步中计算的通道标识符值将自动被视为,现在解析为实际值,就像在第三步中一样。 这也意味着第二步是向路由器提供动态特征的关键步骤之一,因为它引入了一个过程,允许您更改通道标识符解析为通道名称的方式,从而影响确定最终实例的过程从初始通道标识符。​​mapping​​​​channel name​​​​MessageChannel​​​​MessageChannel​

例如,在前面的配置中,假设值为“kermit”,现在是通道标识符(第一步)。 由于此路由器中没有映射,因此无法将此通道标识符解析为通道名称(第二步),此通道标识符现在被视为通道名称。 但是,如果存在映射但值不同,该怎么办? 最终结果仍然相同,因为如果无法通过将通道标识符解析为通道名称的过程来确定新值,则通道标识符将成为通道名称。​​testHeader​

剩下的就是第三步将通道名称(“kermit”)解析为由该名称标识的实际实例。 这基本上涉及对提供的名称进行 bean 查找。 现在,所有包含标头值对的消息都将路由到其 Bean 名称 (its) 为 'kermit' 的 a。​​MessageChannel​​​​testHeader=kermit​​​​MessageChannel​​​​id​

但是,如果您想将这些消息路由到“辛普森”频道怎么办?显然,更改静态配置是有效的,但这样做也需要关闭系统。 但是,如果您有权访问通道标识符映射,则可以在标头值对现在所在的位置引入新的映射,从而使第二步将“kermit”视为通道标识符,同时将其解析为“simpson”作为通道名称。​​kermit=simpson​

这显然同样适用于您现在可以重新映射或删除特定有效负载类型映射的地方。 事实上,它适用于所有其他路由器,包括基于表达式的路由器,因为它们的计算值现在有机会通过第二步来解决实际问题。​​PayloadTypeRouter​​​​channel name​

任何作为子类的路由器(包括大多数框架定义的路由器)都是动态路由器,因为这是在级别定义的。 该映射的 setter 方法与“setChannelMapping”和“removeChannelMapping”方法一起作为公共方法公开。 这些允许您在运行时更改、添加和删除路由器映射,只要您引用路由器本身即可。 这也意味着您可以通过 JMX(请参阅JMX 支持)或 Spring 集成控制总线(参见控制总线)功能公开这些相同的配置选项。​​AbstractMappingMessageRouter​​​​channelMapping​​​​AbstractMappingMessageRouter​

回退到通道键作为通道名称灵活方便。 但是,如果您不信任消息创建者,则恶意参与者(了解系统)可能会创建路由到意外通道的消息。 例如,如果密钥设置为路由器输入通道的通道名称,则此类消息将路由回路由器,最终导致堆栈溢出错误。 因此,您可能希望禁用此功能(将属性设置为),并根据需要更改映射。​​channelKeyFallback​​​​false​

使用控制总线管理路由器映射

管理路由器映射的一种方法是通过控制总线模式,该模式公开了一个控制通道,您可以将控制消息发送到该通道以管理和监视 Spring 集成组件,包括路由器。

有关控制总线的详细信息,请参阅控制总线。

通常,您会发送一条控制消息,要求调用特定受管组件(如路由器)上的特定操作。 以下托管操作(方法)特定于更改路由器解析过程:

  • ​public void setChannelMapping(String key, String channelName)​​:允许您在 和 之间添加新映射或修改现有映射channel identifierchannel name
  • ​public void removeChannelMapping(String key)​​:允许您删除特定的通道映射,从而断开channel identifierchannel name

请注意,这些方法可用于简单的更改(例如更新单个路由或添加或删除路由)。 但是,如果要删除一个路由并添加另一个路由,则更新不是原子的。 这意味着路由表在更新之间可能处于不确定状态。 从版本 4.0 开始,您现在可以使用控制总线以原子方式更新整个路由表。 以下方法允许您执行此操作:

  • ​public Map<String, String>getChannelMappings()​​:返回当前映射。
  • ​public void replaceChannelMappings(Properties channelMappings)​​:更新映射。 请注意,参数是对象。 这种安排允许控制总线命令使用内置命令,如以下示例所示:channelMappingsPropertiesStringToPropertiesConverter

"@'router.handler'.replaceChannelMappings('foo=qux \n baz=bar')"

请注意,每个映射都由换行符 () 分隔。 对于映射的编程更改,出于类型安全问题,我们建议您使用该方法。忽略非对象的键或值。​​\n​​​​setChannelMappings​​​​replaceChannelMappings​​​​String​

使用 JMX 管理路由器映射

您还可以使用 Spring 的 JMX 支持来公开路由器实例,然后使用您喜欢的 JMX 客户端(例如 JConsole)来管理用于更改路由器配置的操作(方法)。

有关 Spring Integration 的 JMX 支持的更多信息,请参阅JMX 支持。

传送名单

从版本 4.1 开始,Spring 集成提供了路由名单企业集成模式的实现。 它作为 amessage 标头实现,用于在未为终结点指定 anis 时确定实例中的下一个通道。 此模式在复杂的动态情况下非常有用,因为配置多个路由器以确定消息流变得困难。 当消息到达没有的端点时,将咨询该消息以确定消息发送到的下一个通道。 当传送名单用尽时,将恢复正常处理。​​routingSlip​​​​AbstractMessageProducingHandler​​​​outputChannel​​​​output-channel​​​​routingSlip​​​​replyChannel​

传送名单的配置显示为选项,即包含哨兵的分号分隔传送名单,如以下示例所示:​​HeaderEnricher​​​​path​

<util:properties id="properties">
<beans:prop key="myRoutePath1">channel1</beans:prop>
<beans:prop key="myRoutePath2">request.headers[myRoutingSlipChannel]</beans:prop>
</util:properties>

<context:property-placeholder properties-ref="properties"/>

<header-enricher input-channel="input" output-channel="process">
<routing-slip
value="${myRoutePath1}; @routingSlipRoutingPojo.get(request, reply);
routingSlipRoutingStrategy; ${myRoutePath2}; finishChannel"/>
</header-enricher>

前面的示例具有:

  • 用于演示传送名单中的条目可以指定为可解析密钥的配置。<context:property-placeholder>path
  • 子元素用于填充处理程序。<header-enricher><routing-slip>RoutingSlipHeaderValueMessageProcessorHeaderEnricher
  • 接受一系列已解析的路由滑入条目,并返回(来自)带有 theasandas 首字母。RoutingSlipHeaderValueMessageProcessorStringpathprocessMessage()singletonMappathkey0routingSlipIndex

路由滑梯条目可以包含 bean 名称、bean 名称和 Spring 表达式 (SpEL)。 根据第一次调用时检查每个路由单条目。 它将条目(在应用程序上下文中不是 Bean 名称)转换为实例。条目被多次调用,直到它们返回 null 或空。​​path​​​​MessageChannel​​​​RoutingSlipRouteStrategy​​​​RoutingSlipHeaderValueMessageProcessor​​​​path​​​​BeanFactory​​​​processMessage​​​​ExpressionEvaluatingRoutingSlipRouteStrategy​​​​RoutingSlipRouteStrategy​​​​String​

由于流程中涉及路由名单,因此我们有一个请求-回复上下文。 这已被引入以确定下一个使用和对象。 此策略的实现应在应用程序上下文中注册为 Bean,并在路由名单中使用其 Bean 名称。 提供了实现。 它接受 SpEL 表达式,并将内部对象用作评估上下文的根对象。 这是为了避免每次调用的创建开销。 它是一个简单的Java Bean,具有两个属性:and。 通过此表达式实现,我们可以使用 SpEL(例如,and)指定路由滑入,并避免为 the 定义 bean。​​getOutputChannel​​​​RoutingSlipRouteStrategy​​​​outputChannel​​​​requestMessage​​​​reply​​​​path​​​​ExpressionEvaluatingRoutingSlipRouteStrategy​​​​ExpressionEvaluatingRoutingSlipRouteStrategy.RequestAndReply​​​​EvaluationContext​​​​ExpressionEvaluatingRoutingSlipRouteStrategy.getNextPath()​​​​Message<?> request​​​​Object reply​​​​path​​​​@routingSlipRoutingPojo.get(request, reply)​​​​request.headers[myRoutingSlipChannel]​​​​RoutingSlipRouteStrategy​

论证总是一个。 根据上下文,回复对象可以是、一个或任意应用程序域对象(例如,当它由服务激活器调用的 POJO 方法返回时)。 在前两种情况下,通常的属性(和)在使用 SpEL(或 Java 实现)时可用。 对于任意域对象,这些属性不可用。 因此,如果结果用于确定下一个路径,则在将路由名单与 POJO 方法结合使用时要小心。​​requestMessage​​​​Message<?>​​​​Message<?>​​​​AbstractIntegrationMessageBuilder​​​​Message​​​​payload​​​​headers​

如果分布式环境中涉及传送名单,我们建议不要对传送名单使用内联表达式。 此建议适用于分布式环境,例如跨 JVM 应用程序、通过消息代理(例如AMQP​支持或JMS 支持​)或在集成流中使用持久(消息存储​)。 框架用于将它们转换为对象,并在消息标头中使用它们。 由于这个类不是(它不可能,因为它依赖于),整个变得不可序列化,并且在任何分布式操作中,我们最终都会得到一个。 要克服此限制,请将 anbean 注册到所需的 SpEL,并在路由滑梯配置中使用其 Bean 名称。​​path​​​​request-reply​​​​MessageStore​​​​RoutingSlipHeaderValueMessageProcessor​​​​ExpressionEvaluatingRoutingSlipRouteStrategy​​​​routingSlip​​​​Serializable​​​​BeanFactory​​​​Message​​​​NotSerializableException​​​​ExpressionEvaluatingRoutingSlipRouteStrategy​​​​path​

对于 Java 配置,您可以将实例添加到 bean 定义中,如以下示例所示:​​RoutingSlipHeaderValueMessageProcessor​​​​HeaderEnricher​

@Bean
@Transformer(inputChannel = "routingSlipHeaderChannel")
public HeaderEnricher headerEnricher() {
return new HeaderEnricher(Collections.singletonMap(IntegrationMessageHeaderAccessor.ROUTING_SLIP,
new RoutingSlipHeaderValueMessageProcessor("myRoutePath1",
"@routingSlipRoutingPojo.get(request, reply)",
"routingSlipRoutingStrategy",
"request.headers[myRoutingSlipChannel]",
"finishChannel")));
}

当终结点生成回复且已定义 no,路由名单算法的工作方式如下:​​outputChannel​

  • 用于从路由滑单中获取值。routingSlipIndexpath
  • 如果值 fromis,则用于从中获取 bean。routingSlipIndexStringBeanFactory
  • 如果返回的 bean 是 的实例,则将其用作 next,并在回复消息标头中递增(路由单条目保持不变)。MessageChanneloutputChannelroutingSlipIndexpath
  • 如果返回的 Bean 是 的实例,并且它不返回空,则该结果将用作下一个 Bean 的名称。 保持不变。RoutingSlipRouteStrategygetNextPathStringoutputChannelroutingSlipIndex
  • 如果返回一个空器,则递增,并且递归地调用下一个传送滑单项。RoutingSlipRouteStrategy.getNextPathStringnullroutingSlipIndexgetOutputChannelFromRoutingSlippath
  • 如果下一个传送单条目不是 a,则它必须是 的实例。pathStringRoutingSlipRouteStrategy
  • 当超过路由滑单的大小时,算法将移动到标准标头的默认行为。routingSlipIndexpathreplyChannel

进程管理器企业集成模式

企业集成模式包括进程管理器模式。 现在,您可以使用封装在传送名单中的自定义进程管理器逻辑轻松实现此模式。 除了 Bean 名称之外,thecan 还可以返回 anyobject,并且不要求此实例在应用程序上下文中是 Bean。 这样,当无法预测应该使用哪个通道时,我们可以提供强大的动态路由逻辑。 A可以在 中创建并返回。 对于这种情况,Awith 关联实现是一个很好的组合。 例如,您可以路由到反应式流,如以下示例所示:​​RoutingSlipRouteStrategy​​​​RoutingSlipRouteStrategy​​​​MessageChannel​​​​MessageChannel​​​​MessageChannel​​​​RoutingSlipRouteStrategy​​​​FixedSubscriberChannel​​​​MessageHandler​

@Bean
public PollableChannel resultsChannel() {
return new QueueChannel();
}
@Bean
public RoutingSlipRouteStrategy routeStrategy() {
return (requestMessage, reply) -> requestMessage.getPayload() instanceof String
? new FixedSubscriberChannel(m ->
Mono.just((String) m.getPayload())
.map(String::toUpperCase)
.subscribe(v -> messagingTemplate().convertAndSend(resultsChannel(), v)))
: new FixedSubscriberChannel(m ->
Mono.just((Integer) m.getPayload())
.map(v -> v * 2)
.subscribe(v -> messagingTemplate().convertAndSend(resultsChannel(), v)));
}

滤波器

邮件过滤器用于根据某些条件(如邮件头值或邮件内容本身)决定是否应传递或删除 a。 因此,消息过滤器类似于路由器,不同之处在于,对于从过滤器的输入通道接收的每条消息,同一消息可能会也可能不会发送到过滤器的输出通道。 与路由器不同,它不决定将消息发送到哪个消息通道,而只决定是否发送消息。​​Message​

如本节后面所述,筛选器还支持丢弃通道。 在某些情况下,它可以根据布尔条件扮演非常简单的路由器(或“交换机”)的角色。

在 Spring 集成中,您可以将消息过滤器配置为委托给接口实现的消息端点。 该界面本身非常简单,如以下列表所示:​​MessageSelector​

public interface MessageSelector {

boolean accept(Message<?> message);

}

构造函数接受选择器实例,如以下示例所示:​​MessageFilter​

MessageFilter filter = new MessageFilter(someSelector);

结合命名空间和 SpEL,您可以使用很少的 Java 代码配置强大的过滤器。

使用 XML 配置筛选器

您可以使用 element 用于创建消息选择端点。 除了 andattributes 之外,它还需要属性。 可以指向实现,如以下示例所示:​​<filter>​​​​input-channel​​​​output-channel​​​​ref​​​​ref​​​​MessageSelector​

<int:filter input-channel="input" ref="selector" output-channel="output"/>

<bean id="selector" class="example.MessageSelectorImpl"/>

或者,您可以添加属性。 在这种情况下,该属性可以引用任何对象。 引用的方法可能需要入站消息的类型或有效负载类型。 该方法必须返回布尔值。 如果该方法返回“true”,则消息将发送到输出通道。 以下示例演示如何配置使用属性的筛选器:​​method​​​​ref​​​​Message​​​​method​

<int:filter input-channel="input" output-channel="output"
ref="exampleObject" method="someBooleanReturningMethod"/>

<bean id="exampleObject" class="example.SomeObject"/>

如果选择器或适应的 POJO 方法返回,则一些设置控制拒绝消息的处理。 默认情况下,(如果按照前面的示例进行配置)拒绝的邮件将以静默方式丢弃。 如果拒绝应改为导致错误条件,请将属性设置为 ,如以下示例所示:​​false​​​​throw-exception-on-rejection​​​​true​

<int:filter input-channel="input" ref="selector"
output-channel="output" throw-exception-on-rejection="true"/>

如果要将被拒绝的邮件路由到特定通道,请提供该引用,如以下示例所示:​​discard-channel​

<int:filter input-channel="input" ref="selector"
output-channel="output" discard-channel="rejectedMessages"/>

另请参阅建议过滤器。

消息筛选器通常与发布-订阅通道结合使用。 许多筛选器终结点可能订阅到同一通道,它们决定是否将消息传递到下一个终结点,该终结点可以是任何受支持的类型(例如服务激活器)。 这为使用具有单个点对点输入通道和多个输出通道的消息路由器的更主动的方法提供了一种被动的替代方法。

如果自定义筛选器实现在其他定义中引用,我们建议使用 aattribute。 但是,如果自定义筛选器实现的作用域为单个元素,则应提供内部 Bean 定义,如以下示例所示:​​ref​​​​<filter>​​​​<filter>​

<int:filter method="someMethod" input-channel="inChannel" output-channel="outChannel">
<beans:bean class="org.foo.MyCustomFilter"/>
</filter>

不允许在同一配置中同时使用 theattribute 和内部处理程序定义,因为它会产生不明确的条件并引发异常。​​ref​​​​<filter>​

如果属性引用扩展的 bean(例如框架本身提供的过滤器),则通过将输出通道直接注入过滤器 bean 来优化配置。 在这种情况下,每个都必须是单独的 Bean 实例(或 a-scope Bean)或使用内部配置类型。 但是,仅当未在筛选器 XML 定义中提供任何特定于筛选器的属性时,此优化才适用。 如果无意中从多个 Bean 引用了相同的消息处理程序,则会出现配置异常。​​ref​​​​MessageFilter​​​​ref​​​​prototype​​​​<bean/>​

随着SpEL支持的引入,Spring Integration将属性添加到过滤器元件中。 对于简单的过滤器,它可以用来完全避免Java,如以下示例所示:​​expression​

<int:filter input-channel="input" expression="payload.equals('nonsense')"/>

作为表达式属性值传递的字符串将作为 SpEL 表达式进行评估,其中包含评估上下文中可用的消息。 如果必须在应用程序上下文范围内包含表达式的结果,则可以使用SpEL 参考文档中定义的表示法,如以下示例所示:​​#{}​

<int:filter input-channel="input"
expression="payload.matches(#{filterPatterns.nonsensePattern})"/>

如果表达式本身需要动态,则可以使用“表达式”子元素。 这为通过表达式的键从 an 解析表达式提供了间接级别。 这是一个可以直接实现的策略接口,或者您可以依赖 Spring 集成中可用的版本,该版本从“资源包”加载表达式,并可以在给定秒数后检查修改。 以下配置示例中演示了所有这些内容,如果修改了基础文件,则可以在一分钟内重新加载表达式:​​ExpressionSource​

<int:filter input-channel="input" output-channel="output">
<int:expression key="filterPatterns.example" source="myExpressions"/>
</int:filter>

<beans:bean id="myExpressions"
class="o.s.i.expression.ReloadableResourceBundleExpressionSource">
<beans:property name="basename" value="config/integration/expressions"/>
<beans:property name="cacheSeconds" value="60"/>
</beans:bean>

如果 bean 被命名,则无需在元素上提供“源”属性。 但是,在前面的示例中,我们展示了它的完整性。​​ExpressionSource​​​​expressionSource​​​​<expression>​

“config/integration/expressions.properties”文件(或任何具有语言环境扩展名的更具体的版本,以加载资源包的典型方式进行解析)可以包含键/值对,如以下示例所示:

filterPatterns.example=payload > 100

所有这些用作属性或子元素的示例也可以应用于转换器、路由器、拆分器、服务激活器和标头丰富器元素。 给定组件类型的语义和角色将影响评估结果的解释,就像解释方法调用的返回值一样。 例如,表达式可以返回要由路由器组件视为消息通道名称的字符串。 但是,根据作为根对象的消息评估表达式并解析 Bean 名称(如果前缀为“@”)的底层功能在 Spring 集成中的所有核心 EIP 组件中是一致的。​​expression​

配置带有批注的过滤器

下面的示例演示如何使用批注配置筛选器:

public class PetFilter {
...
@Filter
public boolean dogsOnly(String input) {
...
}
}

指示此方法将用作筛选器的批注。 如果要将此类用作筛选器,则必须指定它。

XML 元素提供的所有配置选项也可用于注释。​​@Filter​

过滤器可以从 XML 显式引用,或者,如果在类上定义了注释,则通过类路径扫描自动检测。​​@MessageEndpoint​

另请参阅使用注释为端点提供建议。

分配器

拆分器是一个组件,其作用是将消息划分为多个部分,并发送要独立处理的结果消息。 通常,它们是包含聚合器的管道中的上游生产者。

编程模型

用于执行拆分的 API 由一个基类组成。 它是一种封装拆分器通用功能的实现,例如在生成的消息上填充适当的消息标头(、和)。 此填充可以跟踪消息及其处理结果(在典型方案中,这些标头将复制到各种转换终结点生成的消息中)。 然后,例如,组合消息处理器可以使用这些值。​​AbstractMessageSplitter​​​​MessageHandler​​​​CORRELATION_ID​​​​SEQUENCE_SIZE​​​​SEQUENCE_NUMBER​

以下示例显示摘自:​​AbstractMessageSplitter​

public abstract class AbstractMessageSplitter
extends AbstractReplyProducingMessageConsumer {
...
protected abstract Object splitMessage(Message<?> message);

}

若要在应用程序中实现特定的拆分器,可以扩展和实现该方法,该方法包含用于拆分消息的逻辑。 返回值可以是以下值之一:​​AbstractMessageSplitter​​​​splitMessage​

  • Aor 消息数组或迭代消息的 an(or)。 在这种情况下,消息作为消息发送(在填充之后)。 使用此方法可以为您提供更多控制,例如,在拆分过程中填充自定义邮件头。CollectionIterableIteratorCORRELATION_IDSEQUENCE_SIZESEQUENCE_NUMBER
  • Aor 非消息对象的数组或循环访问非消息对象的 an(or)。 它的工作方式与前面的情况类似,只是每个集合元素都用作消息有效负载。 使用此方法,您可以专注于域对象,而不必考虑邮件系统,并生成更易于测试的代码。CollectionIterableIterator
  • aor 非消息对象(但不是集合或数组)。 它的工作方式与前面的情况类似,只是发送了一条消息。Message

在 Spring Integration 中,任何 POJO 都可以实现拆分算法,前提是它定义了一个接受单个参数并具有返回值的方法。 在这种情况下,方法的返回值将如前所述进行解释。 输入参数可以是 a,也可以是简单的 POJO。 在后一种情况下,拆分器接收传入消息的有效负载。 我们推荐这种方法,因为它将代码与 Spring 集成 API 分离,并且通常更容易测试。​​Message​

迭代器

从版本 4.1 开始,该 支持 该类型 为 拆分。 请注意,在 an(or) 的情况下,我们无法访问基础项的数量,并且标头设置为。 这意味着默认的 anwon 不起作用,并且来自的组不会被释放;它将保持原样。 在这种情况下,您应该使用适当的定制或依赖或 a。​​AbstractMessageSplitter​​​​Iterator​​​​value​​​​Iterator​​​​Iterable​​​​SEQUENCE_SIZE​​​​0​​​​SequenceSizeReleaseStrategy​​​​<aggregator>​​​​CORRELATION_ID​​​​splitter​​​​incomplete​​​​ReleaseStrategy​​​​send-partial-result-on-expiry​​​​group-timeout​​​​MessageGroupStoreReaper​

从版本 5.0 开始,提供了允许确定 theandobjects 大小的方法(如果可能的话)。 例如可以确定标的物体的大小。 从版本 5.0.9 开始,此方法还正确返回大小 .​​AbstractMessageSplitter​​​​protected obtainSizeIfPossible()​​​​Iterable​​​​Iterator​​​​XPathMessageSplitter​​​​NodeList​​​​com.fasterxml.jackson.core.TreeNode​

Anobject 对于避免在拆分之前在内存中构建整个集合非常有用。 例如,当使用迭代或流从某些外部系统(例如数据库或 FTP)填充基础项时。​​Iterator​​​​MGET​

流和通量

从版本 5.0 开始,支持 Javaand Reactive Streamstypes 进行拆分。 在这种情况下,目标基于其迭代功能构建。​​AbstractMessageSplitter​​​​Stream​​​​Publisher​​​​value​​​​Iterator​

此外,如果分路器的输出通道是 a 的实例,则产生结果而不是 an,并且输出通道订阅此用于基于背压的分流下游流量需求。​​ReactiveStreamsSubscribableChannel​​​​AbstractMessageSplitter​​​​Flux​​​​Iterator​​​​Flux​

从版本 5.2 开始,拆分器支持 aoption 用于发送 split 函数返回空容器(集合、数组、流等)的请求消息。 在这种情况下,没有要迭代的项目要发送到。 拆分结果仍作为流量结束指示器。​​discardChannel​​​​Flux​​​​outputChannel​​​​null​

使用 XML 配置拆分器

可以通过 XML 配置拆分器,如下所示:

<int:channel id="inputChannel"/>

<int:splitter id="splitter"
ref="splitterBean"
method="split"
input-channel="inputChannel"
output-channel="outputChannel"
discard-channel="discardChannel" />

<int:channel id="outputChannel"/>

<beans:bean id="splitterBean" class="sample.PojoSplitter"/>

拆分器的 ID 是可选的。

对应用程序上下文中定义的 Bean 的引用。 Bean 必须实现拆分逻辑,如前面的一节所述。 自选。 如果未提供对 Bean 的引用,则假定到达 上的消息的有效负载是 的实现,并且默认拆分逻辑应用于集合,将每个单独的元素合并到消息中并将其发送到 。​​input-channel​​​​java.util.Collection​​​​output-channel​

实现拆分逻辑的方法(在 Bean 上定义)。 自选。

分配器的输入通道。 必填。

拆分器将拆分传入消息的结果发送到的通道。 可选(因为传入消息可以自行指定回复通道)。

在出现空拆分结果时请求消息发送到的通道。 可选(它们将在结果的情况下停止)。​​null​

如果可以在其他定义中引用自定义拆分器实现,我们建议使用 aattribute。 但是,如果自定义拆分器处理程序实现的范围应限定为 的单个定义,则可以配置内部 Bean 定义,如以下示例所示:​​ref​​​​<splitter>​​​​<splitter>​

<int:splitter id="testSplitter" input-channel="inChannel" method="split"
output-channel="outChannel">
<beans:bean class="org.foo.TestSplitter"/>
</int:splitter>

不允许在同一配置中同时使用 aattribute 和内部处理程序定义,因为它会产生不明确的条件并导致引发异常。​​ref​​​​<int:splitter>​

如果属性引用扩展的 bean(例如框架本身提供的拆分器),则通过将输出通道直接注入处理程序来优化配置。 在这种情况下,每个实例必须是单独的 Bean 实例(或作用域 Bean)或使用内部配置类型。 但是,仅当未在拆分器 XML 定义中提供任何特定于拆分器的属性时,此优化才适用。 如果无意中从多个 Bean 引用了相同的消息处理程序,则会出现配置异常。​​ref​​​​AbstractMessageProducingHandler​​​​ref​​​​prototype​​​​<bean/>​

配置带注释的拆分器

注释适用于需要类型或消息有效负载类型的方法,该方法的返回值应为任何类型。 如果返回的值不是实际对象,则每个项目都包装在 aas 的有效负载中。 每个结果都发送到定义该结果的端点的指定输出通道。​​@Splitter​​​​Message​​​​Collection​​​​Message​​​​Message​​​​Message​​​​Message​​​​@Splitter​

以下示例演示如何使用注释配置拆分器:​​@Splitter​

@Splitter
List<LineItem> extractItems(Order order) {
return order.getItems()
}

另请参阅使用注释、拆分器和文件拆分器通知端点。

聚合

聚合器基本上是拆分器的镜像,是一种消息处理程序,它接收多条消息并将它们组合成一条消息。 事实上,聚合器通常是包含拆分器的管道中的下游使用者。

从技术上讲,聚合器比拆分器更复杂,因为它是有状态的。 它必须保存要聚合的消息,并确定何时准备好聚合完整的消息组。 为了做到这一点,它需要一个。​​MessageStore​

功能性

聚合器通过关联和存储一组相关消息来组合它们,直到该组被视为完整。 此时,聚合器通过处理整个组来创建单个消息,并将聚合的消息作为输出发送。

实现聚合器需要提供执行聚合的逻辑(即,从多个消息创建单个消息)。 两个相关的概念是关联和发布。

相关性确定如何对消息进行分组以进行聚合。 在 Spring 集成中,默认情况下,关联是基于消息头完成的。 具有相同内容的消息分组在一起。 但是,您可以自定义关联策略,以允许使用其他方式指定消息应如何组合在一起。 为此,您可以实现 a(在本章后面介绍)。​​IntegrationMessageHeaderAccessor.CORRELATION_ID​​​​IntegrationMessageHeaderAccessor.CORRELATION_ID​​​​CorrelationStrategy​

为了确定一组消息准备处理的时间点,ais 进行了咨询。 聚合器的默认发布策略在序列中包含的所有消息都存在时根据标头释放组。 您可以通过提供对自定义实现的引用来覆盖此默认策略。​​ReleaseStrategy​​​​IntegrationMessageHeaderAccessor.SEQUENCE_SIZE​​​​ReleaseStrategy​

编程模型

聚合 API 由许多类组成:

  • 接口及其子类:和MessageGroupProcessorMethodInvokingAggregatingMessageGroupProcessorExpressionEvaluatingMessageGroupProcessor
  • 接口及其默认实现:ReleaseStrategySimpleSequenceSizeReleaseStrategy
  • 接口及其默认实现:CorrelationStrategyHeaderAttributeCorrelationStrategy
​AggregatingMessageHandler​

(一个子类)是一个实现,封装了聚合器的通用功能(和其他相关用例),如下所示:​​AggregatingMessageHandler​​​​AbstractCorrelatingMessageHandler​​​​MessageHandler​

  • 将消息关联到要聚合的组中
  • 在组可以释放之前维护这些消息MessageStore
  • 决定何时可以释放该组
  • 将已发布的组聚合为单个消息
  • 识别和响应过期的组

决定如何将消息组合在一起的责任委托给实例。 决定是否可以释放消息组的责任委托给实例。​​CorrelationStrategy​​​​ReleaseStrategy​

下面的清单显示了基础的简要亮点(实现该方法的责任留给开发人员):​​AbstractAggregatingMessageGroupProcessor​​​​aggregatePayloads​

public abstract class AbstractAggregatingMessageGroupProcessor
implements MessageGroupProcessor {

protected Map<String, Object> aggregateHeaders(MessageGroup group) {
// default implementation exists
}

protected abstract Object aggregatePayloads(MessageGroup group, Map<String, Object> defaultHeaders);

}

看,andas开箱即用的实现。​​DefaultAggregatingMessageGroupProcessor​​​​ExpressionEvaluatingMessageGroupProcessor​​​​MethodInvokingMessageGroupProcessor​​​​AbstractAggregatingMessageGroupProcessor​

从版本 5.2 开始,astrategy 可用于输出消息的合并和计算(聚合)标头。 该实现具有返回组之间没有冲突的所有标头的逻辑;组中一封或多封邮件中缺少标头不被视为冲突。 省略冲突的标头。 与新引入的函数一起,此函数用于任何任意(非)实现。 本质上,框架将提供的函数注入到实例中,并将所有其他实现包装到实例中。 两者之间的逻辑差异后者在调用委托策略之前不会提前计算标头,并且在委托返回 aor 时不会调用函数。 在这种情况下,框架假定目标实现已负责生成一组填充到返回结果中的适当标头。 该策略可用作XML配置的引用属性,作为Java DSL的选项和纯Java配置的选项。​​Function<MessageGroup, Map<String, Object>>​​​​AbstractAggregatingMessageGroupProcessor​​​​DefaultAggregateHeadersFunction​​​​DelegatingMessageGroupProcessor​​​​AbstractAggregatingMessageGroupProcessor​​​​MessageGroupProcessor​​​​AbstractAggregatingMessageGroupProcessor​​​​DelegatingMessageGroupProcessor​​​​AbstractAggregatingMessageGroupProcessor​​​​DelegatingMessageGroupProcessor​​​​Message​​​​AbstractIntegrationMessageBuilder​​​​Function<MessageGroup, Map<String, Object>>​​​​headers-function​​​​AggregatorSpec.headersFunction()​​​​AggregatorFactoryBean.setHeadersFunction()​

Theis 拥有的 and 具有基于消息标头的默认值,如以下示例所示:​​CorrelationStrategy​​​​AbstractCorrelatingMessageHandler​​​​IntegrationMessageHeaderAccessor.CORRELATION_ID​

public AbstractCorrelatingMessageHandler(MessageGroupProcessor processor, MessageGroupStore store,
CorrelationStrategy correlationStrategy, ReleaseStrategy releaseStrategy) {
...
this.correlationStrategy = correlationStrategy == null ?
new HeaderAttributeCorrelationStrategy(IntegrationMessageHeaderAccessor.CORRELATION_ID) : correlationStrategy;
this.releaseStrategy = releaseStrategy == null ? new SimpleSequenceSizeReleaseStrategy() : releaseStrategy;
...
}

至于消息组的实际处理,默认实现是。 它创建一个单曲,其有效载荷是给定组接收的有效载荷。 这适用于具有拆分器、发布-订阅通道或上游收件人列表路由器的简单分散-收集实现。​​DefaultAggregatingMessageGroupProcessor​​​​Message​​​​List​

在此类方案中使用发布-订阅通道或收件人列表路由器时,请务必启用 theflag。 这样做会添加必要的标头:,和。 默认情况下,Spring 集成中的拆分器启用了该行为,但发布-订阅通道或收件人列表路由器未启用该行为,因为这些组件可能用于不需要这些标头的各种上下文中。​​apply-sequence​​​​CORRELATION_ID​​​​SEQUENCE_NUMBER​​​​SEQUENCE_SIZE​

为应用程序实现特定的聚合器策略时,可以扩展和实现该方法。 但是,有更好的解决方案(与 API 耦合较少)来实现聚合逻辑,可以通过 XML 或注释进行配置。​​AbstractAggregatingMessageGroupProcessor​​​​aggregatePayloads​

通常,如果任何 POJO 提供接受 singleas 参数的方法(也支持参数化列表),则任何 POJO 都可以实现聚合算法。 调用此方法以聚合消息,如下所示:​​java.util.List​

  • 如果参数为 a,并且参数类型 T 可分配给该参数,则为聚合而累积的整个消息列表将发送到聚合器。java.util.Collection<T>Message
  • 如果参数是非参数化的或参数类型不可分配给,则该方法将接收累积消息的有效负载。java.util.CollectionMessage
  • 如果返回类型不可分配给,则将其视为框架自动创建的有效负载。MessageMessage

为了简化代码并促进最佳实践(如低耦合、可测试性等),实现聚合逻辑的首选方法是通过 POJO 并使用 XML 或注释支持在应用程序中对其进行配置。

从版本 5.3 开始,在处理消息组后,针对具有多个嵌套级别的正确拆分器聚合器方案执行消息标头修改。 仅当消息组发布结果不是消息集合时,才会执行此操作。 在这种情况下,目标负责在构建这些消息时进行调用。​​AbstractCorrelatingMessageHandler​​​​MessageBuilder.popSequenceDetails()​​​​MessageGroupProcessor​​​​MessageBuilder.popSequenceDetails()​

如果返回 a,则仅当与组中的第一条消息匹配时,才会对输出消息执行 a。 (以前,仅当从中返回普通有效负载或 anhas 时才执行此操作。​​MessageGroupProcessor​​​​Message​​​​MessageBuilder.popSequenceDetails()​​​​sequenceDetails​​​​AbstractIntegrationMessageBuilder​​​​MessageGroupProcessor​

此功能可以由 newproperty 控制,因此在某些情况下,当标准拆分器未填充关联详细信息时,可以禁用此功能。 从本质上讲,此属性撤消了最近的上游所做的工作。 有关详细信息,请参阅拆分器。​​popSequence​​​​boolean​​​​MessageBuilder.popSequenceDetails()​​​​applySequence = true​​​​AbstractMessageSplitter​

该方法返回一个。 因此,如果聚合 POJO 方法具有 a参数,则传入的参数恰好是该实例,并且当您使用 afor 聚合器时,该原始参数在释放组后被清除。 因此,如果 POJO 中的变量从聚合器中传递出来,它也会被清除。 如果希望仅按原样发布该集合以进行进一步处理,则必须生成一个新的(例如,)。 从版本 4.3 开始,框架不再将消息复制到新集合,以避免创建不需要的额外对象。​​SimpleMessageGroup.getMessages()​​​​unmodifiableCollection​​​​Collection<Message>​​​​Collection​​​​SimpleMessageStore​​​​Collection<Message>​​​​Collection<Message>​​​​Collection​​​​new ArrayList<Message>(messages)​

在版本 4.2 之前,无法使用 XML 配置提供 a。 只有 POJO 方法可用于聚合。 现在,如果框架检测到引用的(或内部的)bean 实现,它将用作聚合器的输出处理器。​​MessageGroupProcessor​​​​MessageProcessor​

如果您希望从 customas 释放对象集合作为消息的有效负载,您的类应该扩展和实现。​​MessageGroupProcessor​​​​AbstractAggregatingMessageGroupProcessor​​​​aggregatePayloads()​

此外,从版本 4.2 开始,提供了 ais 。 它返回来自组的消息集合,如前所述,这会导致单独发送释放的消息。​​SimpleMessageGroupProcessor​

这允许聚合器充当消息屏障,其中将保留到达的消息,直到发布策略触发并且组作为单个消息序列发布。

从版本 6.0 开始,上述拆分行为仅在组处理器为 a 时才有效。 否则,对于返回 a 的任何其他实现,只会发出一条回复消息,并将整个消息集合作为其有效负载。 这种逻辑是由聚合器的规范目的决定的 - 按某个键收集请求消息并生成单个分组消息。​​SimpleMessageGroupProcessor​​​​MessageGroupProcessor​​​​Collection<Message>​

​ReleaseStrategy​

接口定义如下:​​ReleaseStrategy​

public interface ReleaseStrategy {

boolean canRelease(MessageGroup group);

}

通常,如果任何 POJO 提供接受 singleas 参数(也支持参数化列表)并返回布尔值的方法,则任何 POJO 都可以实现完成决策逻辑。 此方法在每个新消息到达后调用,以确定组是否完整,如下所示:​​java.util.List​

  • 如果参数为 a,并且参数类型可分配给,则组中累积的消息的整个列表将发送到该方法。java.util.List<T>TMessage
  • 如果参数是非参数化的或参数类型不可分配给,则该方法将接收累积消息的有效负载。java.util.ListMessage
  • 该方法必须返回消息组已准备好进行聚合,否则返回 false。true

下面的示例演示如何对 aof 类型使用注释:​​@ReleaseStrategy​​​​List​​​​Message​

public class MyReleaseStrategy {

@ReleaseStrategy
public boolean canMessagesBeReleased(List<Message<?>>) {...}
}

下面的示例演示如何对 aof 类型使用注释:​​@ReleaseStrategy​​​​List​​​​String​

public class MyReleaseStrategy {

@ReleaseStrategy
public boolean canMessagesBeReleased(List<String>) {...}
}

根据前两个示例中的签名,基于 POJO 的发布策略将传递 aof 尚未发布的消息(如果需要访问整个)或 aof 有效负载对象(如果类型参数不是任何内容)。 这满足了大多数用例。 但是,如果由于某种原因,您需要访问完整版,则应提供接口的实现。​​Collection​​​​Message​​​​Collection​​​​Message​​​​MessageGroup​​​​ReleaseStrategy​



处理可能较大的组时,应了解如何调用这些方法,因为在释放组之前可能会多次调用发布策略。 最有效的是实现,因为聚合器可以直接调用它。 第二种最有效的方法是具有参数类型的 POJO 方法。 效率最低的是带有 atype 的 POJO 方法。 每次调用发布策略时,框架都必须将有效负载从组中的消息复制到新集合中(并可能尝试将有效负载转换为)。 使用避免了转换,但仍需要创建新的。​​ReleaseStrategy​​​​Collection<Message<?>>​​​​Collection<Something>​​​​Something​​​​Collection<?>​​​​Collection​



出于这些原因,对于大型组,我们建议您实施。​​ReleaseStrategy​


释放组进行聚合时,将处理其所有尚未发布的消息并将其从组中删除。 如果组也是完整的(即,如果序列中的所有消息都已到达,或者没有定义序列),则该组将标记为完成。 此组的任何新消息都将发送到丢弃通道(如果已定义)。 设置(默认值为)删除整个组,任何新消息(与删除的组具有相同的相关 ID)将形成一个新组。 您可以通过一起使用而不是设置为释放部分序列。​​expire-groups-upon-completion​​​​true​​​​false​​​​MessageGroupStoreReaper​​​​send-partial-result-on-expiry​​​​true​

为了便于丢弃延迟到达的消息,聚合器必须在组发布后维护有关组的状态。 这最终会导致内存不足的情况。 为避免此类情况,应考虑配置 ato 以删除组元数据。 过期参数应设置为在达到某个点后使组过期,在此点之后,预计不会有延迟的消息到达。 有关配置收割机的信息,请参阅管理聚合器中的状态:消息组存储​。​​MessageGroupStoreReaper​

Spring 集成提供了以下实现: 此实现会查阅每个到达消息的 andheaders,以确定消息组何时完成并准备好进行聚合。 如前所述,它也是默认策略。​​ReleaseStrategy​​​​SimpleSequenceSizeReleaseStrategy​​​​SEQUENCE_NUMBER​​​​SEQUENCE_SIZE​

在 5.0 版之前,默认发布策略是,它不能很好地处理大型组。 使用该策略,可以检测并拒绝重复的序列号。 此操作可能很昂贵。​​SequenceSizeReleaseStrategy​

如果要聚合大型组,则不需要释放部分组,也不需要检测/拒绝重复序列,请考虑使用 - 对于这些用例,它的效率要高得多,并且是自版本 5.0以来未指定部分组发布时的默认设置。​​SimpleSequenceSizeReleaseStrategy​

聚合大型组

4.3 版本更改了 ato(以前是 a)中的默认值。 从大型组中删除单个邮件时,这很昂贵(需要 O(n) 线性扫描)。 尽管哈希集的删除速度通常要快得多,但对于大型消息来说,它的成本可能很高,因为必须在插入和删除时计算哈希。 如果您有哈希成本高昂的消息,请考虑使用其他一些集合类型。 如使用MessageGroupFactory 中所述,提供了 ais 以便您可以选择最适合您需求的。 您还可以提供自己的工厂实现来创建其他一些实现。​​Collection​​​​SimpleMessageGroup​​​​HashSet​​​​BlockingQueue​​​​SimpleMessageGroupFactory​​​​Collection​​​​Collection<Message<?>>​

以下示例演示如何使用前面的实现和 a 配置聚合器:​​SimpleSequenceSizeReleaseStrategy​

<int:aggregator input-channel="aggregate"
output-channel="out" message-store="store" release-strategy="releaser" />

<bean id="store" class="org.springframework.integration.store.SimpleMessageStore">
<property name="messageGroupFactory">
<bean class="org.springframework.integration.store.SimpleMessageGroupFactory">
<constructor-arg value="BLOCKING_QUEUE"/>
</bean>
</property>
</bean>

<bean id="releaser" class="SimpleSequenceSizeReleaseStrategy" />

如果筛选器终结点涉及聚合器上游的流,则序列大小释放策略(固定或基于标头)将无法实现其目的,因为筛选器可能会丢弃序列中的某些消息。 在这种情况下,建议选择另一个,或者使用从丢弃子流发送的补偿消息,该子流在其内容中携带一些信息,以便在自定义的完整组函数中跳过。 有关详细信息,请参阅筛选器​。​​sequenceSize​​​​ReleaseStrategy​

关联策略

接口定义如下:​​CorrelationStrategy​

public interface CorrelationStrategy {

Object getCorrelationKey(Message<?> message);

}

该方法返回 an,表示用于将消息与消息组相关联的相关键。 密钥必须满足用于密钥的标准,以实现 and。​​Object​​​​Map​​​​equals()​​​​hashCode()​

通常,任何 POJO 都可以实现关联逻辑,并且将消息映射到方法的参数(或参数)的规则与 a(包括支持注释)的规则相同。 该方法必须返回一个值,并且该值不能返回。​​ServiceActivator​​​​@Header​​​​null​

Spring 集成提供了以下实现: 此实现返回其中一个消息标头(其名称由构造函数参数指定)的值作为相关键。 默认情况下,关联策略是返回标头属性值的 a。 如果您有要用于关联的自定义标头名称,则可以在 的实例上配置它,并将其作为聚合器关联策略的参考。​​CorrelationStrategy​​​​HeaderAttributeCorrelationStrategy​​​​HeaderAttributeCorrelationStrategy​​​​CORRELATION_ID​​​​HeaderAttributeCorrelationStrategy​

锁定注册表

对组的更改是线程安全的。 因此,当您同时发送同一相关 ID 的消息时,聚合器中只会处理其中一个消息,从而有效地将其作为每个消息组的单线程。 AI 用于获取已解析的相关 ID 的锁。 默认情况下使用 Ais(在内存中)。 若要在使用共享的服务器之间同步更新,必须配置共享锁注册表。​​LockRegistry​​​​DefaultLockRegistry​​​​MessageGroupStore​

避免死锁

如上所述,当消息组发生突变(添加或释放消息)时,会保持锁定。

请考虑以程:

...->aggregator1-> ... ->aggregator2-> ...

如果有多个线程,并且聚合器共享一个公共锁注册表,则可能会出现死锁。 这将导致挂起的线程,并可能出现如下结果:​​jstack <pid>​

Found one Java-level deadlock:
=============================
"t2":
waiting for ownable synchronizer 0x000000076c1cbfa0, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
which is held by "t1"
"t1":
waiting for ownable synchronizer 0x000000076c1ccc00, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
which is held by "t2"

有几种方法可以避免此问题:

  • 确保每个聚合器都有自己的锁注册表(这可以是跨应用程序实例的共享注册表,但流中的两个或多个聚合器必须各自具有不同的注册表)
  • 使用聚合器的输出通道 Anoras,以便下游流在新线程上运行ExecutorChannelQueueChannel
  • 从版本 5.1.1 开始,将聚合器属性设置为releaseLockBeforeSendtrue

如果由于某种原因,单个聚合器的输出最终路由回同一聚合器,也可能导致此问题。 当然,上述第一个解决方案不适用于这种情况。

在 Java DSL 中配置聚合器

有关如何在 Java DSL 中配置聚合器的信息,请参阅聚合器和重新排序器。

使用 XML 配置聚合器

Spring 集成支持通过元素使用 XML 配置聚合器。 以下示例显示了一个聚合器示例:​​<aggregator/>​

<channel id="inputChannel"/>

<int:aggregator id="myAggregator"
auto-startup="true"
input-channel="inputChannel"
output-channel="outputChannel"
discard-channel="throwAwayChannel"
message-store="persistentMessageStore"
order="1"
send-partial-result-on-expiry="false"
send-timeout="1000"

correlation-strategy="correlationStrategyBean"
correlation-strategy-method="correlate"
correlation-strategy-expression="headers['foo']"

ref="aggregatorBean"
method="aggregate"

release-strategy="releaseStrategyBean"
release-strategy-method="release"
release-strategy-expression="size() == 5"

expire-groups-upon-completion="false"
empty-group-min-timeout="60000"

lock-registry="lockRegistry"

group-timeout="60000"
group-timeout-expression="size() ge 2 ? 100 : -1"
expire-groups-upon-timeout="true"

scheduler="taskScheduler" >
<expire-transactional/>
<expire-advice-chain/>
</aggregator>

<int:channel id="outputChannel"/>

<int:channel id="throwAwayChannel"/>

<bean id="persistentMessageStore" class="org.springframework.integration.jdbc.store.JdbcMessageStore">
<constructor-arg ref="dataSource"/>
</bean>

<bean id="aggregatorBean" class="sample.PojoAggregator"/>

<bean id="releaseStrategyBean" class="sample.PojoReleaseStrategy"/>

<bean id="correlationStrategyBean" class="sample.PojoCorrelationStrategy"/>

聚合器的 id 是可选的。

生命周期属性,指示是否应在应用程序上下文启动期间启动聚合器。 可选(默认值为“true”)。

聚合器从中接收消息的通道。 必填。

聚合器将聚合结果发送到的通道。 可选(因为传入消息本身可以在“回复通道”消息标头中指定回复通道)。

聚合器向其发送超时消息 (ifis) 的通道。 自选。​​send-partial-result-on-expiry​​​​false​

对 aused 的引用,用于将消息组存储在其相关键下,直到它们完成。 自选。 默认情况下,它是易失性内存中存储。 有关详细信息,请参阅消息存储​。​​MessageGroupStore​

订阅多个句柄时此聚合器的顺序(用于负载平衡目的)。 自选。​​DirectChannel​

指示过期的消息应聚合并发送到“输出通道”或“回复通道”,一旦其包含过期(请参阅MessageGroupStore.expireMessageGroups(long)​)。 通过配置 a 使 ais 过期的一种方法。 但是,您也可以通过调用来过期。 您可以通过控制总线操作来实现此目的,或者,如果您有对实例的引用,则通过调用来实现。 否则,此属性本身不执行任何操作。 它仅用作是否丢弃或发送到输出或回复通道的任何消息的指示器 即将过期。 可选(默认值为)。 注意:调用此属性可能更合适,因为该组可能实际上不会使 ifis 设置为过期。​​MessageGroup​​​​MessageGroup​​​​MessageGroupStoreReaper​​​​MessageGroup​​​​MessageGroupStore.expireMessageGroups(timeout)​​​​MessageGroupStore​​​​expireMessageGroups(timeout)​​​​MessageGroup​​​​false​​​​send-partial-result-on-timeout​​​​expire-groups-upon-timeout​​​​false​

发送回复理论时等待的超时间隔。 默认值为 ,这将导致无限期阻止。 仅当输出通道具有某些“发送”限制(例如具有固定的“容量”)时,才应用它。 在这种情况下,ais 抛出。 对于实现,这是忽略的. 对于,从计划的即将到期的任务会导致此任务被重新计划。 自选。​​Message​​​​output-channel​​​​discard-channel​​​​-1​​​​QueueChannel​​​​MessageDeliveryException​​​​AbstractSubscribableChannel​​​​send-timeout​​​​group-timeout(-expression)​​​​MessageDeliveryException​

对实现消息关联(分组)算法的 Bean 的引用。 Bean 可以是接口的实现,也可以是 POJO。 在后一种情况下,还必须定义属性。 可选(默认情况下,聚合器使用标头)。​​CorrelationStrategy​​​​correlation-strategy-method​​​​IntegrationMessageHeaderAccessor.CORRELATION_ID​

在引用的 Bean 上定义的方法。 它实现了相关决策算法。 可选,有限制(必须存在)。​​correlation-strategy​​​​correlation-strategy​

表示相关策略的 SpEL 表达式。 例:。 只允许一个奥福里斯。​​"headers['something']"​​​​correlation-strategy​​​​correlation-strategy-expression​

对应用程序上下文中定义的 Bean 的引用。 如前所述,Bean 必须实现聚合逻辑。 可选(默认情况下,聚合消息列表将成为输出消息的有效负载)。

在属性引用的 Bean 上定义的方法。 它实现消息聚合算法。 可选(取决于定义的属性)。​​ref​​​​ref​

对实现发布策略的 Bean 的引用。 Bean 可以是接口的实现,也可以是 POJO。 在后一种情况下,还必须定义属性。 可选(默认情况下,聚合器使用 header 属性)。​​ReleaseStrategy​​​​release-strategy-method​​​​IntegrationMessageHeaderAccessor.SEQUENCE_SIZE​

在属性引用的 Bean 上定义的方法。 它实现了完成决策算法。 可选,有限制(必须存在)。​​release-strategy​​​​release-strategy​

表示发布策略的 SpEL 表达式。 表达式的根对象是 a。 例:。 只允许一个奥福里斯。​​MessageGroup​​​​"size() == 5"​​​​release-strategy​​​​release-strategy-expression​

设置为 (默认值为 ) 时,已完成的组将从邮件存储区中删除,让具有相同相关性的后续消息形成一个新组。 默认行为是将与已完成的组具有相同相关性的消息发送到。​​true​​​​false​​​​discard-channel​

仅当为 的 ais 配置时才适用。 默认情况下,当 ais 配置为使部分组过期时,也会删除空组。 正常释放组后存在空组。 空组允许检测和丢弃延迟到达的消息。 如果希望空组的过期时间长于使部分组过期,请设置此属性。 然后,不会从中删除空组,直到它们至少在此毫秒数内未被修改。 请注意,使空组过期的实际时间也受收割者属性的影响,并且可能与此值加上超时一样多。​​MessageGroupStoreReaper​​​​MessageStore​​​​<aggregator>​​​​MessageGroupStoreReaper​​​​MessageStore​​​​timeout​

对阿豆的引用。 它用于获取基于并发操作的。 默认情况下,使用内部。 使用分布式(例如 )可确保聚合器的一个实例可以同时对组进行操作。 请参阅Redis​Lock Registry 或Zookeeper Lock Registry​了解更多信息。​​org.springframework.integration.util.LockRegistry​​​​Lock​​​​groupId​​​​MessageGroup​​​​DefaultLockRegistry​​​​LockRegistry​​​​ZookeeperLockRegistry​

超时(以毫秒为单位),用于在当前邮件到达时不释放组时强制完成。 此属性为聚合器提供了一个内置的基于时间的发布策略,当需要发出部分结果(或丢弃组)时,如果新消息未在超时(从最后一条消息到达的时间开始计算)内到达。 要设置从创建时间开始计算的超时,请参阅信息。 当新消息到达聚合器时,任何现有的消息都将被取消。 如果有(意味着不释放)和,则计划将新任务使组过期。 我们不建议将此属性设置为零(或负值)。 这样做会有效地禁用聚合器,因为每个消息组都会立即完成。 但是,您可以使用表达式有条件地将其设置为零(或负值)。 请参阅信息。 在完成期间执行的操作取决于 和 属性。 有关详细信息,请参阅聚合器和组超时​。 它与“组超时表达式”属性互斥。​​MessageGroup​​​​ReleaseStrategy​​​​MessageGroup​​​​MessageGroup​​​​group-timeout-expression​​​​ScheduledFuture<?>​​​​MessageGroup​​​​ReleaseStrategy​​​​false​​​​groupTimeout > 0​​​​group-timeout-expression​​​​ReleaseStrategy​​​​send-partial-group-on-expiry​

计算结果为 aas 评估上下文对象的 SpEL 表达式。 用于调度强制完成。 如果表达式的计算结果为 ,则不计划完成。 如果计算结果为零,则组将立即在当前线程上完成。 实际上,这提供了一个动态属性。 例如,如果您希望在创建组后经过 10 秒后强制完成,则可以考虑使用以下 SpEL 表达式:whereis 由此处提供,因为这里是评估上下文对象。 但请记住,组创建时间可能与首次到达消息的时间不同,具体取决于其他组过期属性的配置。 请参阅以获取更多信息。 与“组超时”属性互斥。​​groupTimeout​​​​MessageGroup​​​​#root​​​​MessageGroup​​​​null​​​​group-timeout​​​​MessageGroup​​​​timestamp + 10000 - T(System).currentTimeMillis()​​​​timestamp​​​​MessageGroup.getTimestamp()​​​​MessageGroup​​​​#root​​​​group-timeout​

当某个组因超时(或超时)而完成时,默认情况下该组已过期(完全删除)。 迟到的消息将启动一个新组。 您可以设置此项以完成组,但保留其元数据,以便丢弃延迟到达的消息。 空组可以在以后与属性一起使用过期。 它默认为“真”。​​MessageGroupStoreReaper​​​​false​​​​MessageGroupStoreReaper​​​​empty-group-min-timeout​

Abean 引用到调度 thebe 在没有新消息到达的情况下强制完成。 如果未提供,则使用在 () 中注册的默认调度程序 ()。 此属性不适用未指定的 iforis。​​TaskScheduler​​​​MessageGroup​​​​MessageGroup​​​​groupTimeout​​​​taskScheduler​​​​ApplicationContext​​​​ThreadPoolTaskScheduler​​​​group-timeout​​​​group-timeout-expression​

从版本 4.1 开始。 它允许为操作启动事务。 它由 aor 发起,并且不适用于正常、和操作。 仅允许此子元素 or。​​forceComplete​​​​group-timeout(-expression)​​​​MessageGroupStoreReaper​​​​add​​​​release​​​​discard​​​​<expire-advice-chain/>​

版本 4.1 开始。 它允许配置任何操作。 它由 aor 发起,并且不适用于正常、和操作。 仅允许此子元素 or。 也可以在此处使用 Springnamespace 配置事务。​​Advice​​​​forceComplete​​​​group-timeout(-expression)​​​​MessageGroupStoreReaper​​​​add​​​​release​​​​discard​​​​<expire-transactional/>​​​​Advice​​​​tx​

即将过期的组


有两个属性与即将过期(完全删除)组相关。 当组过期时,没有该组的记录,并且,如果具有相同相关性的新消息到达,则会启动一个新组。 当组完成(不过期)时,空组将保留,延迟到达的消息将被丢弃。 稍后可以通过将 ain 与属性结合使用来删除空组。​​MessageGroupStoreReaper​​​​empty-group-min-timeout​



​expire-groups-upon-completion​​​与释放组时的“正常”完成有关。 这默认为。​​ReleaseStrategy​​​​false​



如果某个组未正常完成,但由于超时而被释放或丢弃,则该组通常已过期。 从版本 4.1 开始,您可以使用来控制此行为。 它默认为向后兼容。​​expire-groups-upon-timeout​​​​true​



从版本 5.0 开始,空组也计划在之后删除。 如果,则在正常或部分序列发布发生时计划删除组的任务。​​empty-group-min-timeout​​​​expireGroupsUponCompletion == false​​​​minimumTimeoutForEmptyGroups > 0​



从版本 5.4 开始,可以将聚合器(和重新排序器)配置为使孤立组(持久性邮件存储中可能不会释放的组)过期。 (如果大于)指示应清除存储中早于此值的组。 该方法在启动时调用,并与提供的方法一起在计划任务中定期调用。 此方法也可以随时在外部调用。 过期逻辑根据上面提到的过期选项完全委托给功能。 当需要从那些不再使用常规邮件到达逻辑释放的旧组中清理邮件存储时,这种定期清除功能非常有用。 在大多数情况下,当使用持久性消息组存储时,这发生在应用程序重新启动后。 该功能类似于计划任务,但在使用组超时而不是收割器时,提供了一种处理特定组件中的旧组的便捷方法。 必须专门为当前关联终结点提供。 否则,一个聚合器可能会从另一个聚合器中清除组。 使用聚合器时,使用此技术过期的组将被丢弃或作为部分组发布,具体取决于属性。​​expireTimeout​​​​0​​​​purgeOrphanedGroups()​​​​expireDuration​​​​forceComplete(MessageGroup)​​​​MessageGroupStoreReaper​​​​MessageGroupStore​​​​expireGroupsUponCompletion​


当一个组超时时,再给一个释放组的机会。 如果这样做并且为 false,则由 控制过期。 如果在超时期间释放策略未释放组,则过期由 控制。 超时的组要么被丢弃,要么发生部分释放(基于)。​​ReleaseStrategy​​​​expire-groups-upon-timeout​​​​expire-groups-upon-completion​​​​expire-groups-upon-timeout​​​​send-partial-result-on-expiry​

如果自定义聚合器处理程序实现可以在其他定义中引用,我们通常建议使用 aattribute。 但是,如果自定义聚合器实现仅由单个定义使用,则可以使用内部 Bean 定义(从版本 1.0.3 开始)在元素中配置聚合 POJO,如以下示例所示:​​ref​​​​<aggregator>​​​​<aggregator>​​​​<aggregator>​

<aggregator input-channel="input" method="sum" output-channel="output">
<beans:bean class="org.foo.PojoAggregator"/>
</aggregator>

不允许在同一配置中同时使用 aattribute 和内部 bean 定义,因为它会产生不明确的条件。 在这种情况下,将引发异常。​​ref​​​​<aggregator>​

以下示例显示了聚合器 Bean 的实现:

public class PojoAggregator {

public Long add(List<Long> results) {
long total = 0l;
for (long partialResult: results) {
total += partialResult;
}
return total;
}
}

前面示例的完成策略 Bean 的实现可能如下所示:

public class PojoReleaseStrategy {
...
public boolean canRelease(List<Long> numbers) {
int sum = 0;
for (long number: numbers) {
sum += number;
}
return sum >= maxValue;
}
}

只要这样做有意义,发布策略方法和聚合器方法都可以组合成一个 Bean。

上面示例中的相关策略 Bean 的实现可能如下所示:

public class PojoCorrelationStrategy {
...
public Long groupNumbersByLastDigit(Long number) {
return number % 10;
}
}

前面示例中的聚合器将按某个条件对数字进行分组(在本例中为除以 10 后的余数),并保留该组,直到有效负载提供的数字之和超过某个值。

只要有必要这样做,发布策略方法、相关策略方法和聚合器方法都可以组合在一个 Bean 中。 (实际上,它们或其中任何两个都可以组合在一起。

聚合器和 Spring 表达式语言 (SpEL)

从 Spring Integration 2.0 开始,您可以使用SpEL 处理各种策略(关联、发布和聚合),如果这种发布策略背后的逻辑相对简单,我们建议这样做。 假设您有一个旨在接收对象数组的旧组件。 我们知道默认的发布策略将所有聚合的消息组合在 中。 现在我们有两个问题。 首先,我们需要从列表中提取单个消息。 其次,我们需要提取每条消息的有效负载并组装对象数组。 以下示例解决了这两个问题:​​List​

public String[] processRelease(List<Message<String>> messages){
List<String> stringList = new ArrayList<String>();
for (Message<String> message : messages) {
stringList.add(message.getPayload());
}
return stringList.toArray(new String[]{});
}

但是,使用 SpEL,这样的需求实际上可以通过单行表达式相对容易地处理,从而使您免于编写自定义类并将其配置为 Bean。 以下示例演示如何执行此操作:

<int:aggregator input-channel="aggChannel"
output-channel="replyChannel"
expression="#this.![payload].toArray()"/>

在前面的配置中,我们使用集合投影表达式从列表中所有消息的有效负载中组装一个新的集合,然后将其转换为数组,从而获得与早期 Java 代码相同的结果。

在处理自定义发布和关联策略时,可以应用相同的基于表达式的方法。

您可以实现简单的相关逻辑作为 SpEL 表达式并在属性中配置它,而不是为 customin 属性定义 bean,如以下示例所示:​​CorrelationStrategy​​​​correlation-strategy​​​​correlation-strategy-expression​

correlation-strategy-expression="payload.person.id"

在前面的示例中,我们假设有效负载具有 aattribute with an,该属性将用于关联消息。​​person​​​​id​

同样,对于 ,您可以将发布逻辑实现为 SpEL 表达式并在属性中对其进行配置。 评估上下文的根对象是自身。 可以使用表达式中组的属性来引用 Of 消息。​​ReleaseStrategy​​​​release-strategy-expression​​​​MessageGroup​​​​List​​​​message​

在 5.0 版之前的版本中,根对象是 的集合,如前面的示例所示:​​Message<?>​

release-strategy-expression="!messages.?[payload==5].empty"

在前面的示例中,SpEL 评估上下文的根对象是 the,并且您声明,只要该组中存在有效负载的消息,就应该释放该组。​​MessageGroup​​​​5​

聚合器和组超时

从版本 4.0 开始,引入了两个新的互斥属性:and。 请参阅使用 XML 配置聚合器。 在某些情况下,如果当前消息到达时未释放,则可能需要在超时后发出聚合器结果(或丢弃组)。 为此,该选项允许调度强制完成,如以下示例所示:​​group-timeout​​​​group-timeout-expression​​​​ReleaseStrategy​​​​groupTimeout​​​​MessageGroup​

<aggregator input-channel="input" output-channel="output"
send-partial-result-on-expiry="true"
group-timeout-expression="size() ge 2 ? 10000 : -1"
release-strategy-expression="messages[0].headers.sequenceNumber == messages[0].headers.sequenceSize"/>

在此示例中,如果聚合器按定义的顺序接收最后一条消息,则可以正常发布。 如果该特定消息未到达,则强制组在十秒后完成,只要该组至少包含两条消息。​​release-strategy-expression​​​​groupTimeout​

强制组完成的结果取决于和。 首先,再次咨询发布策略,以确定是否要进行正常发布。 虽然该组没有更改,但该组决定此时释放该组。 如果发布策略仍未释放组,则表示该组已过期。 Ifis,现有消息中的(部分)被释放为正常的聚合器回复消息。 否则,它将被丢弃。​​ReleaseStrategy​​​​send-partial-result-on-expiry​​​​ReleaseStrategy​​​​send-partial-result-on-expiry​​​​true​​​​MessageGroup​​​​output-channel​

行为和(请参阅使用 XML 配置聚合器)之间存在差异。 收割者定期启动所有人的强制完成。 如果在此期间没有收到新消息,则为每个单独执行此操作。 此外,收割者可用于删除空组(保留空组以丢弃延迟消息ifis false)。​​groupTimeout​​​​MessageGroupStoreReaper​​​​MessageGroup​​​​MessageGroupStore​​​​groupTimeout​​​​MessageGroup​​​​groupTimeout​​​​expire-groups-upon-completion​

从版本 5.5 开始,可以评估为 ainstance。 这在根据组创建时间 () 而不是当前消息到达确定计划任务时刻等情况下非常有用,因为它的计算时间计算为:​​groupTimeoutExpression​​​​java.util.Date​​​​MessageGroup.getTimestamp()​​​​groupTimeoutExpression​​​​long​

group-timeout-expression="size() ge 2 ? new java.util.Date(timestamp + 200) : null"

使用注释配置聚合器

以下示例显示了配置了注释的聚合器:

public class Waiter {
...

@Aggregator
public Delivery aggregatingMethod(List<OrderItem> items) {
...
}

@ReleaseStrategy
public boolean releaseChecker(List<Message<?>> messages) {
...
}

@CorrelationStrategy
public String correlateBy(OrderItem item) {
...
}
}

指示此方法应用作聚合器的注释。 如果将此类用作聚合器,则必须指定它。

指示此方法用作聚合器的发布策略的注释。 如果任何方法上都不存在,则聚合器使用 。​​SimpleSequenceSizeReleaseStrategy​

指示此方法应用作聚合器的关联策略的注释。 如果未指示关联策略,则聚合器使用基于。​​HeaderAttributeCorrelationStrategy​​​​CORRELATION_ID​

XML 元素提供的所有配置选项也可用于注释。​​@Aggregator​

聚合器可以从 XML 显式引用,或者,如果在类上定义,则通过类路径扫描自动检测。​​@MessageEndpoint​

聚合器组件的注释配置(和其他配置)仅涵盖简单的用例,其中大多数默认选项就足够了。 如果在使用注释配置时需要对这些选项进行更多控制,请考虑对 and 标记其方法使用 adefinition,如以下示例所示:​​@Aggregator​​​​@Bean​​​​AggregatingMessageHandler​​​​@Bean​​​​@ServiceActivator​

@ServiceActivator(inputChannel = "aggregatorChannel")
@Bean
public MessageHandler aggregator(MessageGroupStore jdbcMessageGroupStore) {
AggregatingMessageHandler aggregator =
new AggregatingMessageHandler(new DefaultAggregatingMessageGroupProcessor(),
jdbcMessageGroupStore);
aggregator.setOutputChannel(resultsChannel());
aggregator.setGroupTimeoutExpression(new ValueExpression<>(500L));
aggregator.setTaskScheduler(this.taskScheduler);
return aggregator;
}

有关详细信息,请参阅编程模型和注释@Bean方法。

从 4.2 版开始,可用于简化 Java 配置。​​AggregatorFactoryBean​​​​AggregatingMessageHandler​

在聚合器中管理状态:​​MessageGroupStore​

聚合器(以及 Spring 集成中的其他一些模式)是一种有状态模式,它要求根据一段时间内到达的一组消息做出决策,所有这些消息都具有相同的相关键。 有状态模式(例如)中的接口设计由以下原则驱动:组件(无论是由框架还是由用户定义)都应该能够保持无状态。 所有状态都由 和 其管理被委托给。 接口定义如下:​​ReleaseStrategy​​​​MessageGroup​​​​MessageGroupStore​​​​MessageGroupStore​

public interface MessageGroupStore {

int getMessageCountForAllMessageGroups();

int getMarkedMessageCountForAllMessageGroups();

int getMessageGroupCount();

MessageGroup getMessageGroup(Object groupId);

MessageGroup addMessageToGroup(Object groupId, Message<?> message);

MessageGroup markMessageGroup(MessageGroup group);

MessageGroup removeMessageFromGroup(Object key, Message<?> messageToRemove);

MessageGroup markMessageFromGroup(Object key, Message<?> messageToMark);

void removeMessageGroup(Object groupId);

void registerMessageGroupExpiryCallback(MessageGroupCallback callback);

int expireMessageGroups(long timeout);
}

有关更多信息,请参阅​​Javadoc​​。

在等待触发发布策略时累积状态信息,并且该事件可能永远不会发生。 因此,为了防止过时的消息滞留,并为易失性存储提供在应用程序关闭时进行清理的钩子,允许您注册回调以在它们过期时应用于它。 界面非常简单,如以下列表所示:​​MessageGroupStore​​​​MessageGroups​​​​MessageGroupStore​​​​MessageGroups​

public interface MessageGroupCallback {

void execute(MessageGroupStore messageGroupStore, MessageGroup group);

}

回调可以直接访问存储和消息组,以便它可以管理持久状态(例如,通过从存储中完全删除组)。

维护这些回调的列表,按需将其应用于时间戳早于作为参数提供的时间的所有消息(请参阅前面描述的 theand方法)。​​MessageGroupStore​​​​registerMessageGroupExpiryCallback(..)​​​​expireMessageGroups(..)​

当您打算依赖该功能时,不要在不同的聚合器组件中使用相同的实例,这一点很重要。 每个注册自己的基于回调。 这样,每个过期组都可能被错误的聚合器完成或丢弃。 从版本 5.0.10 开始,ais 从 for 注册回调中使用。 反过来,检查此类的实例是否存在,如果回调集中已存在一个错误,则使用适当的消息记录错误。 这样,框架不允许在不同的聚合器/重新排序器中使用实例,以避免上述未由特定关联处理程序创建的组过期的副作用。​​MessageGroupStore​​​​expireMessageGroups​​​​AbstractCorrelatingMessageHandler​​​​MessageGroupCallback​​​​forceComplete()​​​​UniqueExpiryCallback​​​​AbstractCorrelatingMessageHandler​​​​MessageGroupStore​​​​MessageGroupStore​​​​MessageGroupStore​

可以使用超时值调用该方法。 任何早于当前时间减去此值的消息都将过期并应用回调。 因此,定义消息组“过期”含义的是存储的用户。​​expireMessageGroups​

为了方便用户,Spring 集成以 a 的形式为消息到期提供了一个包装器,如以下示例所示:​​MessageGroupStoreReaper​

<bean id="reaper" class="org...MessageGroupStoreReaper">
<property name="messageGroupStore" ref="messageStore"/>
<property name="timeout" value="30000"/>
</bean>

<task:scheduled-tasks scheduler="scheduler">
<task:scheduled ref="reaper" method="run" fixed-rate="10000"/>
</task:scheduled-tasks>

死神是一个。 在前面的示例中,每十秒调用一次消息组存储的 expire 方法。 超时本身为 30 秒。​​Runnable​


重要的是要了解 的 'timeout' 属性是一个近似值,受任务调度程序速率的影响,因为此属性仅在任务的下一次计划执行时被选中。 例如,如果超时设置为 10 分钟,但任务计划每小时运行一次,并且任务的最后一次执行发生在超时前一分钟,则在接下来的 59 分钟内不会过期。 因此,我们建议将速率设置为至少等于超时值或更短。​​MessageGroupStoreReaper​​​​MessageGroupStoreReaper​​​​MessageGroupStoreReaper​​​​MessageGroupStoreReaper​​​​MessageGroup​

除了收割者之外,当应用程序通过生命周期回调关闭时,还会调用到期回调。​​AbstractCorrelatingMessageHandler​

注册自己的到期回调,这是聚合器的 XML 配置中与布尔标志的链接。 如果该标志设置为 ,则在调用到期回调时,组中尚未释放的任何未标记的消息都可以发送到输出通道。​​AbstractCorrelatingMessageHandler​​​​send-partial-result-on-expiry​​​​true​


由于是从计划任务调用的,并且可能导致向下游集成流生成消息(取决于选项),因此建议通过 an 提供 customwith ato 处理程序异常,因为常规聚合器发布功能可能会期望这样做。 同样的逻辑也适用于同样依赖于 a 的组超时功能。 有关详细信息​​,请参阅错误处理​​​。​​MessageGroupStoreReaper​​​​sendPartialResultOnExpiry​​​​TaskScheduler​​​​MessagePublishingErrorHandler​​​​errorChannel​​​​TaskScheduler​



当共享用于不同的关联端点时,必须配置适当的确保组 ID 的唯一性。 否则,当一个关联终结点释放或终止来自其他关联的终结点的消息时,可能会发生意外行为。 具有相同相关键的消息存储在同一个消息组中。​​MessageStore​​​​CorrelationStrategy​



某些实现允许通过对数据进行分区来使用相同的物理资源。 例如,thehas aproperty和thehas aproperty。​​MessageStore​​​​JdbcMessageStore​​​​region​​​​MongoDbMessageStore​​​​collectionName​



有关接口及其实现的详细信息,请参阅​​消息存储​​​。​​MessageStore​


助焊剂聚合器

在 5.2 版中,引入了该组件。 它基于项目反应堆和操作员。 传入的消息被发送到由此组件的构造函数启动。 如果未提供或不是实例,则从实现中完成对mainis的订阅。 否则,它将推迟到实现完成的订阅。 消息按使用 afor 组键进行分组。 默认情况下,将查询消息的标头。​​FluxAggregatorMessageHandler​​​​Flux.groupBy()​​​​Flux.window()​​​​FluxSink​​​​Flux.create()​​​​outputChannel​​​​ReactiveStreamsSubscribableChannel​​​​Flux​​​​Lifecycle.start()​​​​ReactiveStreamsSubscribableChannel​​​​Flux.groupBy()​​​​CorrelationStrategy​​​​IntegrationMessageHeaderAccessor.CORRELATION_ID​

默认情况下,每个关闭的窗口都作为要生成的消息的有效负载发布。 此消息包含窗口中第一条消息的所有标头。 这在输出消息有效负载中必须在下游订阅和处理。 这样的逻辑可以被配置选项定制(或取代)。 例如,如果我们想在最终消息中有 aof 有效负载,我们可以这样配置:​​Flux​​​​Flux​​​​setCombineFunction(Function<Flux<Message<?>>, Mono<Message<?>>>)​​​​FluxAggregatorMessageHandler​​​​List​​​​Flux.collectList()​

fluxAggregatorMessageHandler.setCombineFunction(
(messageFlux) ->
messageFlux
.map(Message::getPayload)
.collectList()
.map(GenericMessage::new));

要选择适当的窗口策略,请提供多个选项:​​FluxAggregatorMessageHandler​

  • ​setBoundaryTrigger(Predicate<Message<?>>)​​- 传播给操作员。 有关更多信息,请参阅其 JavaDocs。 优先于所有其他窗口选项。Flux.windowUntil()
  • ​setWindowSize(int)​​和 - 传播到理论。 默认情况下,窗口大小是根据组中的第一条消息及其标头计算的。setWindowSizeFunction(Function<Message<?>, Integer>)Flux.window(int)windowTimeout(int, Duration)IntegrationMessageHeaderAccessor.SEQUENCE_SIZE
  • ​setWindowTimespan(Duration)​​- 传播到理论上取决于窗口大小配置。Flux.window(Duration)windowTimeout(int, Duration)
  • ​setWindowConfigurer(Function<Flux<Message<?>>, Flux<Flux<Message<?>>>>)​​- 一个函数,用于将转换应用于公开选项未涵盖的任何自定义窗口操作的分组通量。

由于此组件是实现,因此可以简单地与消息传递注释一起用作定义。 使用Java DSL,可以从EIP方法使用它。 下面的示例演示了我们如何注册 anat 运行时以及如何将 acan 与上游的拆分器相关联:​​MessageHandler​​​​@Bean​​​​@ServiceActivator​​​​.handle()​​​​IntegrationFlow​​​​FluxAggregatorMessageHandler​

IntegrationFlow fluxFlow =
(flow) -> flow
.split()
.channel(MessageChannels.flux())
.handle(new FluxAggregatorMessageHandler());

IntegrationFlowContext.IntegrationFlowRegistration registration =
this.integrationFlowContext.registration(fluxFlow)
.register();

Flux<Message<?>> window =
registration.getMessagingTemplate()
.convertSendAndReceive(new Integer[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }, Flux.class);

消息组的条件

从5.5版本开始,一个(包括它的Java和XML DSL)公开了实现的选项。 此函数用于添加到组的每条消息,并将结果条件句子存储到组中以供将来考虑。 可以查阅此条件,而不是循环访问组中的所有消息。 有关更多信息,请参阅JavaDocs 和消息组条件。​​AbstractCorrelatingMessageHandler​​​​groupConditionSupplier​​​​BiFunction<Message<?>, String, String>​​​​ReleaseStrategy​​​​GroupConditionProvider​

另请参阅文件聚合器。

重新排序器

重新排序器与聚合器相关,但用途不同。 当聚合器合并消息时,重新排序器传递消息而不更改它们。

功能性

resequencer 的工作方式与聚合器类似,因为它使用 thes 将消息存储在组中。 不同之处在于,重新排序器不会以任何方式处理消息。 相反,它会按其标头值的顺序释放它们。​​CORRELATION_ID​​​​SEQUENCE_NUMBER​

关于这一点,您可以选择一次释放所有消息(在整个序列之后,根据和其他可能性),或者在有效序列可用时立即释放。 (我们将在本章后面介绍“有效序列”的含义。​​SEQUENCE_SIZE​

重新排序器旨在对间隙较小的相对较短的消息序列进行重新排序。 如果有大量具有许多间隙的不相交序列,则可能会遇到性能问题。

配置重新排序器

有关在 Java DSL 中配置重新排序器的信息,请参阅聚合器和重新排序器。

配置重新排序器只需要在 XML 中包含相应的元素。

以下示例显示了重新排序器配置:

<int:channel id="inputChannel"/>

<int:channel id="outputChannel"/>

<int:resequencer id="completelyDefinedResequencer"
input-channel="inputChannel"
output-channel="outputChannel"
discard-channel="discardChannel"
release-partial-sequences="true"
message-store="messageStore"
send-partial-result-on-expiry="true"
send-timeout="86420000"
correlation-strategy="correlationStrategyBean"
correlation-strategy-method="correlate"
correlation-strategy-expression="headers['something']"
release-strategy="releaseStrategyBean"
release-strategy-method="release"
release-strategy-expression="size() == 10"
empty-group-min-timeout="60000"

lock-registry="lockRegistry"

group-timeout="60000"
group-timeout-expression="size() ge 2 ? 100 : -1"
scheduler="taskScheduler" />
expire-group-upon-timeout="false" />

重新排序器的 id 是可选的。

重新排序器的输入通道。 必填。

重新排序程序将重新排序的消息发送到的通道。 自选。

重新排序程序将超时的消息发送到的通道(ifis 设置为)。 自选。​​send-partial-result-on-timeout​​​​false​

是尽快发送有序序列,还是仅在整个消息组到达后发送。 自选。 (默认值为。​​false​

对 的引用可用于在其相关键下存储消息组,直到它们完成。 自选。 (默认值为易失性内存中存储。​​MessageGroupStore​

在组过期后,是否应发送有序的组(即使缺少某些消息)。 自选。 (默认值为 false。 请参阅管理聚合器中的状态:消息组存储。

发送回复理论时等待的超时间隔。 默认为 ,无限期阻止。 仅当输出通道具有某些“发送”限制(例如具有固定的“容量”)时,才应用它。 在这种情况下,ais 抛出。 泰斯忽略了实现。 对于,从计划的过期任务会导致此任务重新计划。 自选。​​Message​​​​output-channel​​​​discard-channel​​​​-1​​​​QueueChannel​​​​MessageDeliveryException​​​​send-timeout​​​​AbstractSubscribableChannel​​​​group-timeout(-expression)​​​​MessageDeliveryException​

对实现消息关联(分组)算法的 Bean 的引用。 Bean 可以是接口的实现,也可以是 POJO。 在后一种情况下,还必须定义属性。 自选。 (默认情况下,聚合器使用标头。​​CorrelationStrategy​​​​correlation-strategy-method​​​​IntegrationMessageHeaderAccessor.CORRELATION_ID​

在 bean 引用的 beand 上定义的方法,用于实现相关决策算法。 可选,有限制(要求存在)。​​correlation-strategy​​​​correlation-strategy​

表示相关策略的 SpEL 表达式。 例:。 只允许一个奥福里斯。​​"headers['something']"​​​​correlation-strategy​​​​correlation-strategy-expression​

对实现发布策略的 Bean 的引用。 Bean 可以是接口的实现,也可以是 POJO。 在后一种情况下,还必须定义属性。 可选(默认情况下,聚合器将使用标头属性)。​​ReleaseStrategy​​​​release-strategy-method​​​​IntegrationMessageHeaderAccessor.SEQUENCE_SIZE​

在 Bean 引用的 beand 上定义的方法,它实现完成决策算法。 可选,有限制(要求存在)。​​release-strategy​​​​release-strategy​

表示发布策略的 SpEL 表达式。 表达式的根对象是 a。 例:。 只允许一个奥福里斯。​​MessageGroup​​​​"size() == 5"​​​​release-strategy​​​​release-strategy-expression​

仅当 ais 配置为 时才适用。 默认情况下,当 ais 配置为使部分组过期时,也会删除空组。 正常释放组后,存在空组。 这是为了能够检测和丢弃延迟到达的消息。 如果希望空组的过期时间长于使部分组过期,请设置此属性。 然后,不会从中删除空组,直到它们至少在此毫秒数内未被修改。 请注意,使空组过期的实际时间也受收割者的超时属性的影响,它可能与此值加上超时一样多。​​MessageGroupStoreReaper​​​​<resequencer>​​​​MessageStore​​​​MessageGroupStoreReaper​​​​MessageStore​

请参阅使用 XML 配置聚合器。

请参阅使用 XML 配置聚合器。

请参阅使用 XML 配置聚合器。

请参阅使用 XML 配置聚合器。

默认情况下,当组因超时(或超时)而完成时,将保留空组的元数据。 迟到的消息将立即被丢弃。 设置此项可完全删除组。 然后,延迟到达的消息将启动一个新组,并且在该组再次超时之前不会被丢弃。 由于序列范围内的“漏洞”导致超时,新组永远不会正常释放。 空组可以在以后通过与属性一起使用来过期(完全删除)。 从版本 5.0 开始,空组也计划在过期后删除。 默认值为“假”。​​MessageGroupStoreReaper​​​​true​​​​MessageGroupStoreReaper​​​​empty-group-min-timeout​​​​empty-group-min-timeout​

另请参阅聚合器过期组以了解更多信息。

由于在 Java 类中没有要为重新排序器实现的自定义行为,因此没有对它的注释支持。

消息处理程序链

The是可以配置为单个消息端点的实现,同时实际委托给其他处理程序链,例如过滤器、转换器、拆分器等。 当多个处理程序需要以固定的线性级数连接时,这可能会导致更简单的配置。 例如,在其他组件之前提供变压器是相当常见的。 同样,当您在链中的其他组件之前提供筛选器时,实质上是创建选择性使用者。 无论哪种情况,链条都只需要一个和单个,无需为每个单独的组件定义通道。​​MessageHandlerChain​​​​MessageHandler​​​​input-channel​​​​output-channel​

这主要是为XML配置而设计的。 对于Java DSL,定义可以被视为链组件,但它与下面本章中描述的概念和原则无关。 有关更多信息,请参阅Java DSL​。​​MessageHandlerChain​​​​IntegrationFlow​

Spring Integration提供了一个布尔属性: 当您在同一点对点通道上为多个具有不同接受标准的选择性使用者提供时,应将此值设置为“true”(默认值为),以便调度程序知道消息被拒绝,并因此尝试将消息传递给其他订阅者。 如果未引发异常,则调度程序会认为消息已成功传递,即使筛选器已丢弃消息以防止进一步处理。 如果您确实想要“丢弃”消息,过滤器的“丢弃通道”可能会很有用,因为它确实让您有机会对丢弃的消息执行一些操作(例如将其发送到 JMS 队列或将其写入日志)。​​Filter​​​​throwExceptionOnRejection​​​​false​

处理程序链简化了配置,同时在内部保持组件之间相同程度的松散耦合,如果在某些时候需要非线性排列,则修改配置是微不足道的。

在内部,链扩展为所列端点的线性设置,由匿名通道分隔。 链中不考虑回复通道标头。 只有在调用最后一个处理程序后,生成的消息才会转发到回复通道或链的输出通道。 由于此设置,除最后一个处理程序之外的所有处理程序都必须实现接口(它提供了“setOutputChannel()”方法)。 如果设置了theon,则最后一个处理程序只需要一个输出通道。​​MessageProducer​​​​outputChannel​​​​MessageHandlerChain​

与其他端点一样,它们是可选的。 如果链的末尾有回复消息,则输出通道优先。 但是,如果不可用,链处理程序将检查入站消息上的回复通道标头作为回退。​​output-channel​

在大多数情况下,您不需要自己实现。 下一节重点介绍链元素的命名空间支持。 大多数 Spring 集成端点,例如服务激活器和转换器,都适合在 a 中使用。​​MessageHandler​​​​MessageHandlerChain​

配置链

元素提供了一个属性。 如果链中的最后一个元素能够生成回复消息(可选),则它还支持 anattribute。 然后,子元素是筛选器、转换器、拆分器和服务激活器。 最后一个元素也可以是路由器或出站通道适配器。 以下示例显示了一个链定义:​​<chain>​​​​input-channel​​​​output-channel​

<int:chain input-channel="input" output-channel="output">
<int:filter ref="someSelector" throw-exception-on-rejection="true"/>
<int:header-enricher>
<int:header name="thing1" value="thing2"/>
</int:header-enricher>
<int:service-activator ref="someService" method="someMethod"/>
</int:chain>

前面的示例中使用的元素设置一个消息标头,该标头的名称为消息上的值。 标头扩充器是仅涉及标头值的专用化。 您可以通过实现 athat 进行标头修改并将其连接为 bean 来获得相同的结果,但标头丰富器是一个更简单的选择。​​<header-enricher>​​​​thing1​​​​thing2​​​​Transformer​​​​MessageHandler​

可以配置为消息流的最后一个“封闭框”使用者。 对于此解决方案,您可以将其放在<链>某个<出站通道适配器>的末尾,如以下示例所示:​​<chain>​

<int:chain input-channel="input">
<int-xml:marshalling-transformer marshaller="marshaller" result-type="StringResult" />
<int:service-activator ref="someService" method="someMethod"/>
<int:header-enricher>
<int:header name="thing1" value="thing2"/>
</int:header-enricher>
<int:logging-channel-adapter level="INFO" log-full-message="true"/>
</int:chain>

不允许的属性和元素


某些属性(如 and)不允许在链中使用的组件上指定。 轮询器子元素也是如此。​​order​​​​input-channel​



对于 Spring 集成核心组件,XML 模式本身强制执行其中一些约束。 但是,对于非核心组件或您自己的自定义组件,这些约束由 XML 命名空间分析器而不是 XML 架构强制实施。



这些 XML 命名空间解析器约束是在 Spring Integration 2.2 中添加的。 如果尝试使用不允许的属性和元素,XML 命名空间分析器将引发 a。​​BeanDefinitionParsingException​


使用“id”属性

从 Spring Integration 3.0 开始,如果给链元素一个属性,则该元素的 bean 名称是链和元素本身的组合。 没有属性的元素不会注册为 bean,但每个元素都被赋予一个包含链的元素。 请考虑以下示例:​​id​​​​id​​​​id​​​​id​​​​componentName​​​​id​

<int:chain id="somethingChain" input-channel="input">
<int:service-activator id="somethingService" ref="someService" method="someMethod"/>
<int:object-to-json-transformer/>
</int:chain>

在前面的示例中:

  • 根元素有一个'somethingChain'。 因此,实现(或,取决于类型)bean 将此值作为其 Bean 名称。<chain>idAbstractEndpointPollingConsumerEventDrivenConsumerinput-channel
  • Thebean 获取了一个 bean 别名('somethingChain.handler'),它允许从 bean 直接访问这个 bean。MessageHandlerChainBeanFactory
  • 这不是一个成熟的消息传递端点(它不是aor)。 它是在内。 在这种情况下,注册的 bean 名称是 'somethingChain$child.somethingService.handler'。<service-activator>PollingConsumerEventDrivenConsumerMessageHandler<chain>BeanFactory
  • 这采用相同的值,但没有“.handler”后缀。 它变成了'somethingChain$child.somethingService'。componentNameServiceActivatingHandler
  • 最后一个子组件 ,没有属性。 它基于它在中的位置。 在这种情况下,它是'somethingChain$child#1'。 (名称的最后一个元素是链中的顺序,以“#0”开头)。 请注意,此转换器未在应用程序上下文中注册为 Bean,因此它不会获得 a。 但是,它具有一个对日志记录和其他目的有用的值。<chain><object-to-json-transformer>idcomponentName<chain>beanNamecomponentName

属性 forelements 使它们有资格进行JMX 导出,并且可以在消息历史记录中跟踪它们。 如前所述,您可以使用适当的 Bean 名称从 访问它们。​​id​​​​<chain>​​​​BeanFactory​

在元素上提供一个显式属性以简化日志中子组件的识别并提供从 etc 对它们的访问非常有用。​​id​​​​<chain>​​​​BeanFactory​

从链内调用链

有时,您需要从链内对另一条链进行嵌套调用,然后返回并继续在原始链内执行。 为此,您可以通过包含 <gateway> 元素来使用消息传递网关,如以下示例所示:

<int:chain id="main-chain" input-channel="in" output-channel="out">
<int:header-enricher>
<int:header name="name" value="Many" />
</int:header-enricher>
<int:service-activator>
<bean class="org.foo.SampleService" />
</int:service-activator>
<int:gateway request-channel="inputA"/>
</int:chain>

<int:chain id="nested-chain-a" input-channel="inputA">
<int:header-enricher>
<int:header name="name" value="Moe" />
</int:header-enricher>
<int:gateway request-channel="inputB"/>
<int:service-activator>
<bean class="org.foo.SampleService" />
</int:service-activator>
</int:chain>

<int:chain id="nested-chain-b" input-channel="inputB">
<int:header-enricher>
<int:header name="name" value="Jack" />
</int:header-enricher>
<int:service-activator>
<bean class="org.foo.SampleService" />
</int:service-activator>
</int:chain>

在前面的示例中,在处理结束时由那里配置的“网关”元素调用。 在标头扩充后对 ais 的调用。 然后流返回以完成执行。 最后,流返回到。 当在链中定义元素的嵌套版本时,它不需要属性。 相反,它将消息置于其当前状态,并将其放置在属性中定义的通道上。 当该网关启动的下游流完成时,ais 返回到网关并继续其在当前链中的旅程。​​nested-chain-a​​​​main-chain​​​​nested-chain-a​​​​nested-chain-b​​​​nested-chain-b​​​​main-chain​​​​<gateway>​​​​service-interface​​​​request-channel​​​​Message​

分散-聚集

从版本 4.1 开始,Spring 集成提供了分散-聚集​企业集成模式的实现。 它是一个复合终结点,其目标是向收件人发送消息并聚合结果。 如企业集成模式中所述,它是“最佳报价”等场景的组件,我们需要从多个供应商请求信息,并确定哪一个为我们提供所请求项目的最佳术语。

以前,可以使用分立元件配置模式。 此增强功能带来了更方便的配置。

这是一个结合了 a(或 a) 和 an 的请求-答复终结点。 请求消息被发送到通道,并等待聚合器发送到的回复。​​ScatterGatherHandler​​​​PublishSubscribeChannel​​​​RecipientListRouter​​​​AggregatingMessageHandler​​​​scatter​​​​ScatterGatherHandler​​​​outputChannel​

功能性

该模式提出了两种情况:“拍卖”和“分配”。 在这两种情况下,函数是相同的,并提供所有可用的选项。 (实际上,只需要构造函数参数。 有关详细信息,请参阅聚合器。​​Scatter-Gather​​​​aggregation​​​​AggregatingMessageHandler​​​​ScatterGatherHandler​​​​AggregatingMessageHandler​

拍卖

拍卖变体对请求消息使用“发布-订阅”逻辑,其中“分散”通道是 awith。 但是,此通道可以是任何实现(就像 thein 的情况一样 — 请参阅内容丰富器)。 但是,在这种情况下,您应该为函数创建自己的自定义。​​Scatter-Gather​​​​PublishSubscribeChannel​​​​apply-sequence="true"​​​​MessageChannel​​​​request-channel​​​​ContentEnricher​​​​correlationStrategy​​​​aggregation​

分配

该发行版变体基于(请参阅收件人列表路由器)具有所有可用的选项。 这是第二个构造函数参数。 如果你只想依赖默认值对于和,你应该指定。 否则,您应该提供一个自定义。 与变体(拍卖变体)不同,具有选项允许根据消息过滤目标供应商。 有了,提供了默认值,并且可以正确释放组。 分配选项与拍卖选项相互排斥。​​Scatter-Gather​​​​RecipientListRouter​​​​RecipientListRouter​​​​ScatterGatherHandler​​​​correlationStrategy​​​​recipient-list-router​​​​aggregator​​​​apply-sequence="true"​​​​correlationStrategy​​​​aggregator​​​​PublishSubscribeChannel​​​​recipient-list-router​​​​selector​​​​apply-sequence="true"​​​​sequenceSize​​​​aggregator​

只有基于构造函数配置的纯 Java 配置才需要,因为框架不能改变外部提供的组件。 为了方便起见,XML 和 Java DSL 从版本 6.0 开始为 true。​​applySequence=true​​​​ScatterGatherHandler(MessageHandler scatterer, MessageHandler gatherer)​​​​Scatter-Gather​​​​applySequence​

对于竞价和分发变体,请求(分散)消息都使用标头进行扩充,以等待来自的回复消息。​​gatherResultChannel​​​​aggregator​

默认情况下,所有供应商都应将其结果发送到标头(通常通过省略最终端点)。 但是,还提供了该选项,允许供应商将其回复发送到该通道以进行聚合。​​replyChannel​​​​output-channel​​​​gatherChannel​

配置分散-聚集端点

以下示例显示了 Bean 定义的 Java 配置:​​Scatter-Gather​

@Bean
public MessageHandler distributor() {
RecipientListRouter router = new RecipientListRouter();
router.setApplySequence(true);
router.setChannels(Arrays.asList(distributionChannel1(), distributionChannel2(),
distributionChannel3()));
return router;
}

@Bean
public MessageHandler gatherer() {
return new AggregatingMessageHandler(
new ExpressionEvaluatingMessageGroupProcessor("^[payload gt 5] ?: -1D"),
new SimpleMessageStore(),
new HeaderAttributeCorrelationStrategy(
IntegrationMessageHeaderAccessor.CORRELATION_ID),
new ExpressionEvaluatingReleaseStrategy("size() == 2"));
}

@Bean
@ServiceActivator(inputChannel = "distributionChannel")
public MessageHandler scatterGatherDistribution() {
ScatterGatherHandler handler = new ScatterGatherHandler(distributor(), gatherer());
handler.setOutputChannel(output());
return handler;
}

在前面的示例中,我们配置了 bean withand 接收通道列表。 下一个豆子是用于一个。 最后,我们将这两个 bean 注入到bean定义中,并将其标记为a,将分散-聚集组件连接到集成流中。​​RecipientListRouter​​​​distributor​​​​applySequence="true"​​​​AggregatingMessageHandler​​​​ScatterGatherHandler​​​​@ServiceActivator​

下面的示例演示如何使用 XML 命名空间配置终结点:​​<scatter-gather>​

<scatter-gather
id=""
auto-startup=""
input-channel=""
output-channel=""
scatter-channel=""
gather-channel=""
order=""
phase=""
send-timeout=""
gather-timeout=""
requires-reply="" >
<scatterer/>
<gatherer/>
</scatter-gather>


终结点的 ID。 Thebean 注册了别名 的。 Thebean 注册了别名 的。 这。 自选。 (生成默认值。​​ScatterGatherHandler​​​​id + '.handler'​​​​RecipientListRouter​​​​id + '.scatterer'​​​​AggregatingMessageHandler`bean is registered with an alias of `id + '.gatherer'​​​​BeanFactory​​​​id​

生命周期属性,指示是否应在应用程序上下文初始化期间启动终结点。 此外,还实现并启动和停止,如果提供 ais 在内部创建。 自选。 (默认值为。​​ScatterGatherHandler​​​​Lifecycle​​​​gatherEndpoint​​​​gather-channel​​​​true​

接收请求消息以在其中处理它们的通道。 必填。​​ScatterGatherHandler​

将聚合结果发送到的通道。 自选。 (传入消息可以在消息标头中自行指定回复通道)。​​ScatterGatherHandler​​​​replyChannel​

拍卖方案的分散消息发送到的通道。 自选。 与子元素互斥。​​<scatterer>​

从每个供应商接收聚合答复的渠道。 它用作分散消息中的标头。 自选。 默认情况下,创建。​​replyChannel​​​​FixedSubscriberChannel​

当多个处理程序订阅相同时此组件的顺序(用于负载平衡目的)。 自选。​​DirectChannel​

指定应启动和停止终结点的阶段。 启动顺序从最低到最高,关闭顺序从最高到最低。 默认情况下,此值为 ,表示此容器尽可能晚地启动并尽快停止。 自选。​​Integer.MAX_VALUE​

发送答复时等待的超时间隔。 默认情况下,块为一秒钟。 仅当输出通道具有某些“发送”限制时,它才适用,例如,具有已满的固定“容量”。 在这种情况下,ais 抛出。 泰斯忽略了实现。 对于,从计划的过期任务会导致此任务重新计划。 自选。​​Message​​​​output-channel​​​​send()​​​​QueueChannel​​​​MessageDeliveryException​​​​send-timeout​​​​AbstractSubscribableChannel​​​​group-timeout(-expression)​​​​MessageDeliveryException​

允许您指定分散-收集在返回之前等待回复消息的时间。 默认情况下,它会无限期等待。 如果回复超时,则返回“null”。 自选。 它默认为,表示无限期等待。​​-1​

指定分散-聚集是否必须返回非空值。 默认情况下,此值为。 因此,当基础聚合器在之后返回空值时,将引发 ais 。 注意,ifis 是一种可能性,应该指定以避免无限期等待。​​true​​​​ReplyRequiredException​​​​gather-timeout​​​​null​​​​gather-timeout​

选项。 自选。 与属性互斥。​​<recipient-list-router>​​​​scatter-channel​

选项。 必填。​​<aggregator>​

错误处理

由于分散-收集是一个多请求-答复组件,因此错误处理具有一些额外的复杂性。 在某些情况下,如果允许流程以少于请求的回复完成,则最好只捕获并忽略下游异常。 在其他情况下,当发生错误时,应考虑从子流返回“补偿消息”之类的内容。​​ReleaseStrategy​

每个异步子流都应配置一个标头,以便从中发送正确的错误消息。 否则,错误将使用通用错误处理逻辑发送到全局。 有关异步错误处理的详细信息,请参阅错误处理。​​errorChannel​​​​MessagePublishingErrorHandler​​​​errorChannel​

同步流可能会使用 anfor 忽略异常或返回补偿消息。 当异常从其中一个子流抛出到 时,它只是重新抛出到上游。 这样,所有其他子流将一无所获,并且它们的回复将被忽略。 有时这可能是预期行为,但在大多数情况下,最好处理特定子流中的错误,而不会影响所有其他子流和收集器中的预期。​​ExpressionEvaluatingRequestHandlerAdvice​​​​ScatterGatherHandler​​​​ScatterGatherHandler​

从版本 5.1.3 开始,随选项一起提供。 它填充到分散消息的标头中,并在发生异步错误时使用,或者可以在常规同步子流中使用,以直接发送错误消息。​​ScatterGatherHandler​​​​errorChannelName​​​​errorChannel​

下面的示例配置演示了通过返回补偿消息进行异步错误处理:

@Bean
public IntegrationFlow scatterGatherAndExecutorChannelSubFlow(TaskExecutor taskExecutor) {
return f -> f
.scatterGather(
scatterer -> scatterer
.recipientFlow(f1 -> f1.transform(p -> "Sub-flow#1"))
.recipientFlow(f2 -> f2
.channel(c -> c.executor(taskExecutor))
.transform(p -> {
throw new RuntimeException("Sub-flow#2");
})),
null,
s -> s.errorChannel("scatterGatherErrorChannel"));
}

@ServiceActivator(inputChannel = "scatterGatherErrorChannel")
public Message<?> processAsyncScatterError(MessagingException payload) {
return MessageBuilder.withPayload(payload.getCause().getCause())
.copyHeaders(payload.getFailedMessage().getHeaders())
.build();
}

为了产生正确的回复,我们必须从 的 复制标头(包括和)的 已经发送到的。 这样,目标异常将返回给 for 回复消息组完成的收集器。 这样的异常可以在收集器中过滤掉,或者在分散-收集端点之后以其他方式处理下游。​​replyChannel​​​​errorChannel​​​​failedMessage​​​​MessagingException​​​​scatterGatherErrorChannel​​​​MessagePublishingErrorHandler​​​​ScatterGatherHandler​​​​payload​​​​MessageGroupProcessor​

在将分散结果发送到收集器之前,恢复请求消息标头,包括回复和错误通道(如果有)。 这样,来自 theare 的错误将传播到调用方,即使在分散的接收者子流中应用了异步切换也是如此。 要成功操作,必须将 a,andheader 传输回分散收件人子流的回复。 在这种情况下,必须配置合理的、有限的。 否则,默认情况下,它将永远被阻止等待收集者的回复。​​ScatterGatherHandler​​​​AggregatingMessageHandler​​​​gatherResultChannel​​​​originalReplyChannel​​​​originalErrorChannel​​​​gatherTimeout​​​​ScatterGatherHandler​

螺纹屏障

有时,我们需要挂起消息流线程,直到发生其他异步事件。 例如,考虑一个将消息发布到 RabbitMQ 的 HTTP 请求。 我们可能希望在 RabbitMQ 代理发出消息已收到确认之前不回复用户。

在 4.2 版中,Spring Integration 为此目的引入了组件。 底层是。 此类还实现,其中传递给方法的消息释放方法中的相应线程(如果存在)。​​<barrier/>​​​​MessageHandler​​​​BarrierMessageHandler​​​​MessageTriggerAction​​​​trigger()​​​​handleRequestMessage()​

挂起的线程和触发器线程通过调用 aon 消息来关联。 当消息发送到时,线程将暂停长达几毫秒,等待相应的触发消息。 默认关联策略使用标头。 当具有相同相关性的触发器消息到达时,将释放线程。 发送到后版本的消息是使用 a 构造的。 默认情况下,消息是两个有效负载,标头使用 a 合并。​​CorrelationStrategy​​​​input-channel​​​​requestTimeout​​​​IntegrationMessageHeaderAccessor.CORRELATION_ID​​​​output-channel​​​​MessageGroupProcessor​​​​Collection<?>​​​​DefaultAggregatingMessageGroupProcessor​

如果首先调用该方法(或在主线程超时后),则会挂起该方法,直到等待挂起消息到达。 如果不想挂起触发器线程,请考虑移交给 ainstead 以便其线程挂起。​​trigger()​​​​triggerTimeout​​​​TaskExecutor​

在版本 5.4 之前,请求和触发器消息只有一个选项,但在某些用例中,最好为这些操作设置不同的超时。 因此,引入了选项。​​timeout​​​​requestTimeout​​​​triggerTimeout​

该属性确定在触发器消息到达之前挂起的线程超时时要执行的操作。 默认情况下,它是,这意味着终结点返回,流结束,线程返回到调用方。 当,艾斯扔了。​​requires-reply​​​​false​​​​null​​​​true​​​​ReplyRequiredException​

可以通过编程方式调用该方法(通过使用名称获取 Bean 引用,— 其中屏障端点的 Bean 名称)。 或者,您可以配置 anto 触发器发布。​​trigger()​​​​barrier.handler​​​​barrier​​​​<outbound-channel-adapter/>​

只能挂起一个具有相同相关性的线程。 同一相关性可以多次使用,但只能同时使用一次。 如果第二个线程到达时具有相同的相关性,则会引发异常。

以下示例演示如何使用自定义标头进行关联:

@ServiceActivator(inputChannel="in")
@Bean
public BarrierMessageHandler barrier(MessageChannel out, MessageChannel lateTriggerChannel) {
BarrierMessageHandler barrier = new BarrierMessageHandler(10000);
barrier.setOutputChannel(out());
barrier.setDiscardChannel(lateTriggerChannel);
return barrier;
}

@ServiceActivator (inputChannel="release")
@Bean
public MessageHandler releaser(MessageTriggerAction barrier) {
return barrier::trigger;
}

根据哪个消息先到达,要么是发送消息的线程,要么是发送消息的线程等待最多十秒钟,直到另一条消息到达。 释放消息时,将向通道发送一条消息,该消息结合了调用自定义 Bean(命名)的结果。 如果主线程超时并且触发器稍后到达,则可以配置将延迟触发器发送到的丢弃通道。​​in​​​​release​​​​out​​​​MessageGroupProcessor​​​​myOutputProcessor​

举报

相关推荐

0 条评论