0
点赞
收藏
分享

微信扫一扫

Spring Integration (集成)消息传递终结点(二)

Spring Integration (集成)消息传递终结点(二)_Groovy

服务激活器

服务激活器是端点类型,用于将任何 Spring 管理的对象连接到输入通道,以便它可以扮演服务的角色。 如果服务生成输出,则还可以连接到输出通道。 或者,输出生成服务可能位于处理管道或消息流的末尾,在这种情况下,可以使用入站消息的标头。 如果未定义输出通道,这是默认行为。 与此处描述的大多数配置选项一样,相同的行为实际上适用于大多数其他组件。​​replyChannel​

配置服务激活器

要创建服务激活器,请使用具有“输入通道”和“ref”属性的“服务激活器”元素,如以下示例所示:

<int:service-activator input-channel="exampleChannel" ref="exampleHandler"/>

上述配置从中选择满足消息传递要求之一的所有方法,如下所示:​​exampleHandler​

  • 注释为@ServiceActivator
  • public
  • 不返回如果voidrequiresReply == true

运行时调用的目标方法是按其类型为每个请求消息选择的,或者作为回退到类型(如果目标类上存在此类方法)。​​payload​​​​Message<?>​

从版本 5.0 开始,可以使用 thes 标记一种服务方法作为所有不匹配情况的回退。 当使用内容类型转换并在转换后调用目标方法时,这可能很有用。​​@org.springframework.integration.annotation.Default​

若要委托给任何对象的显式定义的方法,可以添加属性,如以下示例所示:​​method​

<int:service-activator input-channel="exampleChannel" ref="somePojo" method="someMethod"/>

在任一情况下,当服务方法返回非 null 值时,终结点会尝试将回复消息发送到相应的回复通道。 要确定回复通道,它首先检查终结点配置中是否提供了 anwas,如以下示例所示:​​output-channel​

<int:service-activator input-channel="exampleChannel" output-channel="replyChannel"
ref="somePojo" method="someMethod"/>

如果该方法返回结果并定义了 nois,则框架将检查请求消息的标头值。 如果该值可用,则检查其类型。 如果是 a,则回复消息将发送到该通道。 如果是 a,则终端节点尝试将通道名称解析为通道实例。 如果无法解析通道,则抛出 ais 。 如果可以解决,则消息将发送到那里。 如果请求消息没有标头并且对象为 a,则查询其标头以获取目标目标。 这是 Spring 集成中用于请求-回复消息传递的技术,也是返回地址模式的一个例子。​​output-channel​​​​replyChannel​​​​MessageChannel​​​​String​​​​DestinationResolutionException​​​​replyChannel​​​​reply​​​​Message​​​​replyChannel​

如果您的方法返回结果,并且您想要丢弃它并结束流,则应将 thet 配置为发送到 a。 为方便起见,框架注册了一个名称,。 有关详细信息,请参阅特殊频道。​​output-channel​​​​NullChannel​​​​nullChannel​

服务激活器是生成回复消息不需要的组件之一。 如果方法 returnsor 具有 areturn 类型,则服务激活器将在方法调用后退出,没有任何信号。 此行为可以通过该选项进行控制,该选项在使用 XML 命名空间进行配置时也会公开。 如果该标志设置为 并且该方法返回 null,则抛出 ais 。​​null​​​​void​​​​AbstractReplyProducingMessageHandler.requiresReply​​​​requires-reply​​​​true​​​​ReplyRequiredException​

服务方法中的参数可以是消息,也可以是任意类型。 如果是后者,则假定它是消息有效负载,从消息中提取并注入到服务方法中。 我们通常推荐这种方法,因为它在使用 Spring 集成时遵循并推广了 POJO 模型。 参数也可以具有注释,如注释支持中所述。​​@Header​​​​@Headers​

服务方法不需要有任何参数,这意味着您可以实现事件样式的服务激活器(其中您关心的只是对服务方法的调用),而不必担心消息的内容。 可以将其视为空的 JMS 消息。 这种实现的一个示例用例是存放在输入通道上的消息的简单计数器或监视器。

从版本 4.1 开始,框架正确地将消息属性(和)转换为 Java 8POJO 方法参数,如以下示例所示:​​payload​​​​headers​​​​Optional​

public class MyBean {
public String computeValue(Optional<String> payload,
@Header(value="foo", required=false) String foo1,
@Header(value="foo") Optional<String> foo2) {
if (payload.isPresent()) {
String value = payload.get();
...
}
else {
...
}
}

}

如果自定义服务激活器处理程序实现可以在其他定义中重用,则通常建议使用 aattribute。 但是,如果定制服务激活器处理程序实现仅在单个定义中使用,则可以提供内部 Bean 定义,如以下示例所示:​​ref​​​​<service-activator>​​​​<service-activator>​

<int:service-activator id="exampleServiceActivator" input-channel="inChannel"
output-channel = "outChannel" method="someMethod">
<beans:bean class="org.something.ExampleServiceActivator"/>
</int:service-activator>


不允许在同一配置中同时使用 theattribute 和内部处理程序定义,因为它会产生不明确的条件并导致引发异常。​​ref​​​​<service-activator>​

如果属性引用扩展的 bean(例如框架本身提供的处理程序),则通过将输出通道直接注入处理程序来优化配置。 在这种情况下,每个都必须是单独的 Bean 实例(或 a-scope Bean)或使用内部配置类型。 如果无意中从多个 Bean 引用了相同的消息处理程序,则会出现配置异常。​​ref​​​​AbstractMessageProducingHandler​​​​ref​​​​prototype​​​​<bean/>​

服务激活器和 Spring 表达式语言 (SpEL)

从Spring Integration 2.0开始,服务激活器也可以从SpEL中受益。

例如,您可以调用任何 Bean 方法,而无需指向属性中的 Bean 或将其作为内部 Bean 定义包含在内,如下所示:​​ref​

<int:service-activator input-channel="in" output-channel="out"
expression="@accountService.processAccount(payload, headers.accountId)"/>

<bean id="accountService" class="thing1.thing2.Account"/>

在前面的配置中,我们没有使用 aor 作为内 bean 来注入 'accountService',而是使用 SpEL 的表示法并调用一个采用与消息有效负载兼容类型的方法。 我们还传递一个标头值。 可以根据消息中的任何内容计算任何有效的 SpEL 表达式。 对于简单方案,如果所有逻辑都可以封装在此类表达式中,则服务激活器不需要引用 Bean,如以下示例所示:​​ref​​​​@beanId​

<int:service-activator input-channel="in" output-channel="out" expression="payload * 2"/>

在前面的配置中,我们的服务逻辑是将有效负载值乘以 2。 SpEL让我们相对容易地处理它。

有关配置服务激活器的更多信息,请参阅 Java DSL 一章中的服务激活器和 .handle() 方法。

异步服务激活器

服务激活器由调用线程调用。 如果输入通道是 a,或者 是 的轮询器线程,则这是一个上游线程。 如果服务返回 a,则默认操作是将其作为发送到输出(或回复)通道的消息的有效负载发送。 从版本 4.3 开始,您现在可以将属性设置为 (通过使用使用 Java 配置时)。 如果服务在此设置此属性时返回,则会立即释放调用线程,并在完成将来的线程(从服务内部)上发送回复消息。 这对于使用 a 的长时间运行的服务特别有利,因为释放轮询器线程以在框架内执行其他服务。​​SubscribableChannel​​​​PollableChannel​​​​CompletableFuture<?>​​​​async​​​​true​​​​setAsync(true)​​​​CompletableFuture<?>​​​​async​​​​true​​​​PollableChannel​

如果服务以 完成将来,则会发生正常的错误处理。 Anis 发送到消息标头(如果存在)。 否则,anis 将发送到默认值(如果可用)。​​Exception​​​​ErrorMessage​​​​errorChannel​​​​ErrorMessage​​​​errorChannel​

服务激活器和方法返回类型

服务方法可以返回任何成为回复消息有效负载的类型。 在这种情况下,将创建一个新对象,并复制请求消息中的所有标头。 对于大多数 Spring 集成实现,当交互基于 POJO 方法调用时,这的工作方式相同。​​Message<?>​​​​MessageHandler​

也可以从该方法返回完整对象。 但是,请记住,与转换器不同,对于服务激活器,如果返回的消息中尚不存在请求消息中的标头,则将通过复制请求消息中的标头来修改此消息。 因此,如果您的方法参数为 a,并且您在服务方法中复制了一些(但不是全部)现有标头,它们将重新出现在回复消息中。 从回复消息中删除标头不是服务激活器的责任,遵循松散耦合原则,最好在集成流中添加标头。 或者,可以使用转换器代替服务激活器,但在这种情况下,当返回 fullthe 方法时,该方法完全负责消息,包括复制请求消息标头(如果需要)。 必须确保必须保留重要的框架标头(例如),如果存在。​​Message<?>​​​​Message<?>​​​​HeaderFilter​​​​Message<?>​​​​replyChannel​​​​errorChannel​

脱模剂

delayer 是一个简单的端点,它允许消息流延迟一定的间隔。 当邮件延迟时,原始发件人不会阻止。 相反,延迟的消息被安排为一个实例,以便在延迟过后发送到输出通道。 这种方法即使对于相当长的延迟也是可扩展的,因为它不会导致大量阻塞的发送方线程。 相反,在典型情况下,线程池用于实际执行释放消息。 本节包含配置分层器的几个示例。​​org.springframework.scheduling.TaskScheduler​

配置中继器

该元素用于延迟两个消息通道之间的消息流。 与其他端点一样,您可以提供“输入通道”和“输出通道”属性,但 delayer 还具有“默认延迟”和“表达式”属性(以及“表达式”元素),用于确定每条消息应延迟的毫秒数。 以下示例将所有消息延迟三秒:​​<delayer>​

<int:delayer id="delayer" input-channel="input"
default-delay="3000" output-channel="output"/>

如果需要确定每条消息的延迟,还可以使用“表达式”属性提供 SpEL 表达式,如以下表达式所示:

@Bean
public IntegrationFlow flow() {
return IntegrationFlow.from("input")
.delay("delayer.messageGroupId", d -> d
.defaultDelay(3_000L)
.delayExpression("headers['delay']"))
.channel("output")
.get();
}

在前面的示例中,仅当给定入站消息的表达式计算结果为 null 时,三秒延迟才适用。 如果只想对具有有效表达式计算结果的消息应用延迟,则可以使用 “default-delay” of(default)。 对于延迟为 (或更少) 的任何消息,消息将立即在调用线程上发送。​​0​​​​0​

XML 解析器使用消息组标识 。​​<beanName>.messageGroupId​

延迟处理程序支持表示以毫秒为单位的间隔的表达式计算结果(anywhose方法生成可解析为的值)以及表示绝对时间的实例。 在第一种情况下,毫秒从当前时间开始计数(例如,值 of将使消息从 delayer 接收到消息的时间起至少延迟五秒)。 对于 ainstance,消息直到该对象表示的时间才会释放。 等同于非正延迟或过去的日期的值不会导致任何延迟。 相反,它被直接发送到原始发送方线程上的输出通道。 如果表达式求值结果不是 a,并且无法解析为 a,则应用默认延迟(如果有 — 默认值为)。​​Object​​​​toString()​​​​Long​​​​java.util.Date​​​​5000​​​​Date​​​​Date​​​​Date​​​​Long​​​​0​

表达式计算可能会由于各种原因(包括无效表达式或其他条件)引发计算异常。 默认情况下,此类异常将被忽略(尽管记录在 DEBUG 级别),并且 delayer 回退到默认延迟(如果有)。 您可以通过设置属性来修改此行为。 默认情况下,此属性设置为 并且分层行为如前所述。 但是,如果您不希望忽略表达式求值异常并将其抛给 delayer 的调用方,请将属性设置为 。​​ignore-expression-failures​​​​true​​​​ignore-expression-failures​​​​false​


在前面的示例中,延迟表达式指定为 。 这是访问 aelement (implements) 的 SpEL语法。 它调用:。 对于简单的映射元素名称(不包含“.”),您还可以使用 SpEL “dot accessor” 语法,其中前面显示的标头表达式可以指定为 。 但是,如果缺少标头,则会获得不同的结果。 在第一种情况下,表达式的计算结果为 。 第二个结果类似于以下内容:​​headers['delay']​​​​Indexer​​​​Map​​​​MessageHeaders​​​​Map​​​​headers.get("delay")​​​​headers.delay​​​​null​






org.springframework.expression.spel.SpelEvaluationException: EL1008E:(pos 8):
Field or property 'delay' cannot be found on object of type 'org.springframework.messaging.MessageHeaders'






因此,如果有可能省略标头,并且您希望回退到默认延迟,则通常使用索引器语法而不是点属性访问器语法更有效(并建议这样做),因为检测 null 比捕获异常更快。


分解器委托给春天抽象的一个实例。 Delayer 使用的默认调度程序是 Spring 集成在启动时提供的实例。 请参阅配置任务计划程序。 如果要委托给其他调度程序,可以通过 delayer 元素的“调度程序”属性提供引用,如以下示例所示:​​TaskScheduler​​​​ThreadPoolTaskScheduler​

<int:delayer id="delayer" input-channel="input" output-channel="output"
expression="headers.delay"
scheduler="exampleTaskScheduler"/>

<task:scheduler id="exampleTaskScheduler" pool-size="3"/>

如果配置外部,则可以设置此属性。 它允许在应用程序关闭时成功完成已处于执行状态(释放消息)的“延迟”任务。 在Spring Integration 2.2之前,此属性在element上可用,因为可以在后台创建自己的调度程序。 从 2.2 开始,分层器需要一个外部调度程序实例,并且被删除了。 您应该使用调度程序自己的配置。​​ThreadPoolTaskScheduler​​​​waitForTasksToCompleteOnShutdown = true​​​​<delayer>​​​​DelayHandler​​​​waitForTasksToCompleteOnShutdown​

​ThreadPoolTaskScheduler​​​有一个属性,可以注入一些实现。 此处理程序允许从发送延迟消息的计划任务的线程进行处理。 默认情况下,它使用 an,您可以在日志中看到堆栈跟踪。 您可能需要考虑使用 an,它将 aninto 从失败消息的标头或默认消息的标头发送。 此错误处理在事务回滚(如果存在)后执行。 请参阅发布失败​。​​errorHandler​​​​org.springframework.util.ErrorHandler​​​​Exception​​​​org.springframework.scheduling.support.TaskUtils$LoggingErrorHandler​​​​org.springframework.integration.channel.MessagePublishingErrorHandler​​​​ErrorMessage​​​​error-channel​​​​error-channel​

删除器和消息存储

将延迟的消息持久化到提供的消息组中。 (“groupId”基于元素所需的“id”属性。 延迟的消息在计划任务之前立即从计划任务中删除。 如果提供的是持久性的(例如),则它提供了在应用程序关闭时不会丢失消息的功能。 应用程序启动后,来自其消息组的消息,并根据消息的原始到达时间(如果延迟为数字)以延迟重新调度它们。 对于延迟标头为 a 的消息,在重新调度时使用。 如果延迟消息停留在其“延迟”之外,则会在启动后立即发送。​​DelayHandler​​​​MessageStore​​​​<delayer>​​​​MessageStore​​​​DelayHandler​​​​output-channel​​​​MessageStore​​​​JdbcMessageStore​​​​DelayHandler​​​​MessageStore​​​​Date​​​​Date​​​​MessageStore​

可以使用两个相互排斥的元素之一来丰富:和。 这些 AOP 建议适用于代理内部,它有责任在延迟后在计划任务上发布消息。 例如,当下游消息流引发异常并且事务回滚时,可以使用它。 在这种情况下,延迟的消息将保留在持久消息中。 您可以使用 中的任何自定义实现。 Theelement 定义了一个简单的建议链,该链仅包含事务性建议。 以下示例显示 a:​​<delayer>​​​​<transactional>​​​​<advice-chain>​​​​List​​​​DelayHandler.ReleaseMessageHandler​​​​Thread​​​​ReleaseMessageHandler​​​​MessageStore​​​​org.aopalliance.aop.Advice​​​​<advice-chain>​​​​<transactional>​​​​advice-chain​​​​<delayer>​

<int:delayer id="delayer" input-channel="input" output-channel="output"
expression="headers.delay"
message-store="jdbcMessageStore">
<int:advice-chain>
<beans:ref bean="customAdviceBean"/>
<tx:advice>
<tx:attributes>
<tx:method name="*" read-only="true"/>
</tx:attributes>
</tx:advice>
</int:advice-chain>
</int:delayer>

可以导出为 JMX具有托管操作(和),这允许在运行时重新调度延迟的持久消息 - 例如,如果以前已停止。 这些操作可以通过命令调用,如以下示例所示:​​DelayHandler​​​​MBean​​​​getDelayedMessageCount​​​​reschedulePersistedMessages​​​​TaskScheduler​​​​Control Bus​

Message<String> delayerReschedulingMessage =
MessageBuilder.withPayload("@'delayer.handler'.reschedulePersistedMessages()").build();
controlBusChannel.send(delayerReschedulingMessage);

有关消息存储库、JMX 和控制总线的详细信息,请参阅系统管理。

从版本 5.3.7 开始,如果将消息存储到 a 中时事务处于活动状态,则在回调中调度释放任务。 这对于防止争用条件是必要的,在该争用条件中,计划的发布可能在事务提交之前运行,并且找不到消息。 在这种情况下,消息将在延迟后或事务提交后释放,以较晚者为准。​​MessageStore​​​​TransactionSynchronization.afterCommit()​

发布失败

从版本 5.0.8 开始,分层器上有两个新属性:

  • ​maxAttempts​​(默认值 5)
  • ​retryDelay​​(默认为 1 秒)

当消息被释放时,如果下游流失败,之后将尝试释放。 如果达到,则丢弃该消息(除非发布是事务性的,在这种情况下,消息将保留在存储中,但将不再计划发布,直到重新启动应用程序或调用该方法,如上所述)。​​retryDelay​​​​maxAttempts​​​​reschedulePersistedMessages()​

此外,您可以配置;当发布失败时,ANIS 将发送到该通道,但作为有效负载除外,并具有属性。 包含包含当前计数的标头。​​delayedMessageErrorChannel​​​​ErrorMessage​​​​originalMessage​​​​ErrorMessage​​​​IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT​

如果错误流使用错误消息并正常退出,则不会执行进一步的操作;如果发布是事务性的,则事务将提交并从存储中删除消息。 如果错误流引发异常,将按照上述方式重试发布。​​maxAttempts​

脚本支持

Spring Integration 2.1 增加了对 Java 版本 6 中引入的JSR223 Scripting for Java 规范的支持。 它允许您使用用任何受支持的语言(包括Ruby,JRuby,Groovy和Kotlin)编写的脚本来为各种集成组件提供逻辑,类似于Spring Expression Language(SpEL)在Spring Integration中的使用方式。 有关 JSR223 的更多信息,请参阅文档。

您需要将此依赖项包含在项目中:

<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-scripting</artifactId>
<version>6.0.0</version>
</dependency>

此外,你需要添加一个脚本引擎实现,例如JRuby,Jython。

从版本 5.2 开始,Spring Integration 提供了 Kotlin Jsr223 支持。 您需要将这些依赖项添加到项目中才能使其正常工作:

<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-script-util</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-compiler-embeddable</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-scripting-compiler-embeddable</artifactId>
<scope>runtime</scope>
</dependency>

由提供的语言指示符或脚本文件选择的扩展名。​​KotlinScriptExecutor​​​​kotlin​​​​.kts​

为了使用 JVM 脚本语言,该语言的 JSR223 实现必须包含在类路径中。 Groovy和JRuby项目在其标准发行版中提供了JSR233支持。

各种JSR223语言实现已经由第三方开发。 特定实现与 Spring 集成的兼容性取决于它是否符合规范以及实现者对规范的解释。

如果您打算使用Groovy作为脚本语言,我们建议您使用Spring-Integration的Groovy支持,因为它提供了特定于Groovy的附加功能。 但是,本节也是相关的。

脚本配置

根据集成需求的复杂性,脚本可以作为 XML 配置中的 CDATA 内联提供,也可以作为对包含脚本的 Spring 资源的引用提供。 为了启用脚本支持,Spring 集成定义了 a,它将消息有效负载绑定到名为 d 的变量,将消息头绑定到 avariable,两者都可以在脚本执行上下文中访问。 您需要做的就是编写一个使用这些变量的脚本。 以下一对示例显示了创建筛选器的示例配置:​​ScriptExecutingMessageProcessor​​​​payload​​​​headers​

@Bean
public IntegrationFlow scriptFilter() {
return f -> f.filter(Scripts.processor("some/path/to/ruby/script/RubyFilterTests.rb"));
}
...
@Bean
public Resource scriptResource() {
return new ByteArrayResource("headers.type == 'good'".getBytes());
}

@Bean
public IntegrationFlow scriptFilter() {
return f -> f.filter(Scripts.processor(scriptResource()).lang("groovy"));
}

如前面的示例所示,脚本可以内联包含,也可以通过引用资源位置(通过使用属性)来包含。 此外,该属性对应于语言名称(或其 JSR223 别名)。​​location​​​​lang​

其他支持脚本的 Spring 集成端点元素包括,,,和。 每种情况下的脚本配置都与上述配置相同(除了端点元素)。​​router​​​​service-activator​​​​transformer​​​​splitter​

脚本支持的另一个有用功能是能够更新(重新加载)脚本,而无需重新启动应用程序上下文。 为此,请在元素上指定属性,如以下示例所示:​​refresh-check-delay​​​​script​

Scripts.processor(...).refreshCheckDelay(5000)
}

在前面的示例中,每 5 秒检查一次脚本位置的更新。 如果脚本已更新,则自更新后 5 秒后发生的任何调用都会导致运行新脚本。

请考虑以下示例:

Java DSL

.XML

Scripts.processor(...).refreshCheckDelay(0)
}

在前面的示例中,一旦发生任何脚本修改,就会使用此类修改更新上下文,从而为“实时”配置提供简单的机制。 任何负值表示在初始化应用程序上下文后不会重新加载脚本。 这是默认行为。 以下示例显示了一个永不更新的脚本:

Scripts.processor(...).refreshCheckDelay(-1)
}

无法重新加载内联脚本。

脚本变量绑定

需要变量绑定才能使脚本能够引用外部提供给脚本执行上下文的变量。 默认情况下,和用作绑定变量。 您可以使用元素(或选项)将其他变量绑定到脚本,如以下示例所示:​​payload​​​​headers​​​​<variable>​​​​ScriptSpec.variables()​

Java DSL

.XML

Scripts.processor("foo/bar/MyScript.py")
.variables(Map.of("var1", "thing1", "var2", "thing2", "date", date))
}

如前面的示例所示,您可以将脚本变量绑定到标量值或 Spring Bean 引用。 请注意,and仍作为绑定变量包含在内。​​payload​​​​headers​

在Spring Integration 3.0中,除了元素之外,还引入了属性。 此属性和元素并不相互排斥,您可以将它们组合在一个组件中。 但是,变量必须是唯一的,无论它们在何处定义。 此外,从 Spring Integration 3.0 开始,内联脚本也允许变量绑定,如以下示例所示:​​variable​​​​variables​​​​variable​​​​script​

<service-activator input-channel="input">
<script:script lang="ruby" variables="thing1=THING1, date-ref=dateBean">
<script:variable name="thing2" ref="thing2Bean"/>
<script:variable name="thing3" value="thing2"/>
<![CDATA[
payload.foo = thing1
payload.date = date
payload.bar = thing2
payload.baz = thing3
payload
]]>
</script:script>
</service-activator>

前面的示例显示了内联脚本、aelement 和 aattribute 的组合。 该属性包含一个逗号分隔的值,其中每个段都包含变量及其值的“=”分隔对。 变量名称可以带有后缀,如前面示例中的变量所示。 这意味着绑定变量具有 name,,但该值是从应用程序上下文中对 bean 的引用。 这在使用属性占位符配置或命令行参数时可能很有用。​​variable​​​​variables​​​​variables​​​​-ref​​​​date-ref​​​​date​​​​dateBean​

如果您需要更好地控制变量的生成方式,则可以实现自己的使用策略的 Java 类,该策略由以下接口定义:​​ScriptVariableGenerator​

public interface ScriptVariableGenerator {

Map<String, Object> generateScriptVariables(Message<?> message);

}

此接口要求您实现该方法。 message 参数允许您访问消息有效负载和标头中可用的任何数据,返回值是绑定变量。 每次为消息执行脚本时都会调用此方法。 下面的示例演示如何提供 的实现并使用属性引用它:​​generateScriptVariables(Message)​​​​Map​​​​ScriptVariableGenerator​​​​script-variable-generator​

Java DSL

.XML

Scripts.processor("foo/bar/MyScript.groovy")
.variableGenerator(new foo.bar.MyScriptVariableGenerator())
}

如果未提供 ais ,则使用脚本组件,它将任何提供的元素与 andvariables 合并。​​script-variable-generator​​​​DefaultScriptVariableGenerator​​​​<variable>​​​​payload​​​​headers​​​​Message​​​​generateScriptVariables(Message)​

不能同时提供属性和元素。 它们是相互排斥的。​​script-variable-generator​​​​<variable>​

GraalVM Polyglot

从版本 6.0 开始,该框架提供了基于GraalVM Polyglot API 的 API。 JavaScript 的 JSR223 引擎实现,从 Java 中移除,已被使用这个新的脚本执行器所取代。 查看有关在 GraalVM 中启用 JavaScript 支持以及可以通过脚本变量传播哪些配置选项的更多信息。 默认情况下,框架设置共享多语言,这将启用与主机 JVM 的交互:​​PolyglotScriptExecutor​​​​allowAllAccess​​​​true​​​​Context​

  • 新线程的创建和使用。
  • 对公共宿主类的访问。
  • 通过向类路径添加条目来加载新的主机类。
  • 将新成员导出到多语言绑定中。
  • 主机系统上不受限制的 IO 操作。
  • 通过实验选项。
  • 创建和使用新的子流程。
  • 对进程环境变量的访问。

这可以通过接受 a 的重载构造函数进行自定义。​​PolyglotScriptExecutor​​​​org.graalvm.polyglot.Context.Builder​

JavaScript在框架组件中的用法保持不变,不同的是,现在它只能在已安装组件的GraalVM上运行。​​js​

时髦的支持

在Spring Integration 2.0中,我们添加了Groovy支持,允许您使用Groovy脚本语言为各种集成组件提供逻辑 - 类似于支持Spring Expression Language(SpEL)进行路由,转换和其他集成问题的方式。 有关Groovy的更多信息,请参阅Groovy文档,您可以在项目网站上找到该文档。

您需要将此依赖项包含在项目中:

<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-groovy</artifactId>
<version>6.0.0</version>
</dependency>

此外,从版本 6.0 开始,还提供了用于集成流配置的Groovy DSL。

时髦配置

在 Spring Integration 2.1 中,Groovy 支持的配置命名空间是 Spring Integration 脚本支持的扩展,并共享脚本支持部分中详细描述的核心配置和行为。 尽管 Groovy 脚本得到了通用脚本支持的良好支持,但 Groovy 支持提供了配置命名空间,该命名空间由 Spring Framework 的 sand 相关组件提供支持,提供了使用 Groovy 的扩展功能。 以下清单显示了两个示例配置:​​Groovy​​​​org.springframework.scripting.groovy.GroovyScriptFactory​

例 1.滤波器

<int:filter input-channel="referencedScriptInput">
<int-groovy:script location="some/path/to/groovy/file/GroovyFilterTests.groovy"/>
</int:filter>

<int:filter input-channel="inlineScriptInput">
<int-groovy:script><![CDATA[
return payload == 'good'
]]></int-groovy:script>
</int:filter>

如前面的示例所示,该配置看起来与常规脚本支持配置相同。 唯一的区别是使用 Groovy 命名空间,如命名空间前缀所示。 另请注意,标签上的属性在此命名空间中无效。​​int-groovy​​​​lang​​​​<script>​

时髦的对象定制

如果你需要自定义Groovy对象本身(除了设置变量之外),你可以引用一个通过使用属性实现的bean。 例如,如果要通过修改和注册函数来实现域特定语言 (DSL) 以在脚本中可用,这可能很有用。 以下示例演示如何执行此操作:​​GroovyObjectCustomizer​​​​customizer​​​​MetaClass​

<int:service-activator input-channel="groovyChannel">
<int-groovy:script location="somewhere/SomeScript.groovy" customizer="groovyCustomizer"/>
</int:service-activator>

<beans:bean id="groovyCustomizer" class="org.something.MyGroovyObjectCustomizer"/>

设置自定义不是与元素或属性相互排斥的。 定义内联脚本时也可以提供它。​​GroovyObjectCustomizer​​​​<variable>​​​​script-variable-generator​

Spring Integration 3.0引入了theattribute,它与theelement一起工作。 此外,groovy 脚本能够将变量解析为 bean,如果未提供绑定变量的名称。 以下示例演示如何使用变量 ():​​variables​​​​variable​​​​BeanFactory​​​​entityManager​

<int-groovy:script>
<![CDATA[
entityManager.persist(payload)
payload
]]>
</int-groovy:script>

​entityManager​​必须是应用程序上下文中的适当 Bean。

有关元素、属性和属性的详细信息,请参阅脚本变量绑定。<variable>variablesscript-variable-generator

时髦脚本编译器定制

Thehint是最受欢迎的Groovy编译器自定义选项。 它可以在类或方法级别使用。 有关更多信息,请参阅 Groovy参考手册,特别是@CompileStatic。 为了将此功能用于短脚本(在集成场景中),我们被迫将简单脚本更改为更类似于 Java 的代码。 请考虑以下脚本:​​@CompileStatic​​​​<filter>​

headers.type == 'good'

上述脚本在 Spring 集成中变为以下方法:

@groovy.transform.CompileStatic
String filter(Map headers) {
headers.type == 'good'
}

filter(headers)

这样,该方法被转换并编译为静态Java代码,绕过Groovy。 调用的动态阶段,例如工厂和代理。​​filter()​​​​getProperty()​​​​CallSite​

从版本 4.3 开始,您可以使用选项配置 Spring Integration Groovy 组件,指定应将 for添加到内部。 有了这个,你可以在我们的脚本代码中省略方法声明,仍然可以获得编译的纯 Java 代码。 在这种情况下,前面的脚本可以很短,但仍需要比解释脚本更详细一些,如以下示例所示:​​compile-static​​​​boolean​​​​ASTTransformationCustomizer​​​​@CompileStatic​​​​CompilerConfiguration​​​​@CompileStatic​

binding.variables.headers.type == 'good'

您必须通过属性访问 and(或任何其他)变量,因为我们没有动态能力。​​headers​​​​payload​​​​groovy.lang.Script​​​​binding​​​​@CompileStatic​​​​GroovyObject.getProperty()​

此外,我们还引入了 thebean 引用。 使用此属性,您可以提供任何其他必需的 Groovy 编译器自定义项,例如。 有关此功能的详细信息,请参阅 Groovy 文档以获取高级编译器配置。​​compiler-configuration​​​​ImportCustomizer​

使用不会自动添加注释,并且会覆盖该选项。 如果仍然需要,则应手动将a添加到该自定义中。​​compilerConfiguration​​​​ASTTransformationCustomizer​​​​@CompileStatic​​​​compileStatic​​​​CompileStatic​​​​new ASTTransformationCustomizer(CompileStatic.class)​​​​CompilationCustomizers​​​​compilerConfiguration​

Groovy编译器自定义对选项没有任何影响,可重新加载的脚本也可以静态编译。​​refresh-check-delay​

控制总线

如(企业集成模式)中所述,控制总线背后的思想是,您可以使用与“应用程序级”消息传递相同的消息传递系统来监视和管理框架中的组件。 在 Spring 集成中,我们基于前面描述的适配器进行构建,以便您可以发送消息作为调用公开操作的一种方式。 这些操作的一个选项是Groovy脚本。 以下示例为控制总线配置 Groovy 脚本:

<int-groovy:control-bus input-channel="operationChannel"/>

控制总线有一个输入通道,可以访问该通道以调用应用程序上下文中对 Bean 的操作。

Groovy控制总线在输入通道上以Groovy脚本的形式运行消息。 它接受一条消息,将正文编译为脚本,使用 a 对其进行自定义,然后运行它。 控制总线公开应用程序上下文中所有用 Spring 的接口注释的 bean,并实现 Spring 的接口或扩展 Spring 的基类(例如,几个 and 实现)。​​GroovyObjectCustomizer​​​​MessageProcessor​​​​@ManagedResource​​​​Lifecycle​​​​CustomizableThreadCreator​​​​TaskExecutor​​​​TaskScheduler​

在控制总线命令脚本中使用带有定制作用域(例如“request”)的托管 Bean 时要小心,尤其是在异步消息流中。 如果控制总线无法从应用程序上下文中公开 bean,则在命令脚本运行期间可能会出现一些问题。 例如,如果未建立自定义作用域的上下文,则尝试在该作用域内获取 Bean 会触发 。​​MessageProcessor​​​​BeansException​​​​BeanCreationException​

如果需要进一步自定义 Groovy 对象,还可以提供对通过属性实现的 Bean 的引用,如以下示例所示:​​GroovyObjectCustomizer​​​​customizer​

<int-groovy:control-bus input-channel="input"
output-channel="output"
customizer="groovyCustomizer"/>

<beans:bean id="groovyCustomizer" class="org.foo.MyGroovyObjectCustomizer"/>

向终结点添加行为

在 Spring Integration 2.2 之前,您可以通过向轮询器的元素添加 AOP 建议来向整个集成流添加行为。 但是,假设您要重试,例如,仅重试 REST Web 服务调用,而不重试任何下游终结点。​​<advice-chain/>​

例如,请考虑以:

inbound-adapter->poller->http-gateway1->http-gateway2->jdbc-outbound-adapter

如果在轮询器上的建议链中配置了一些重试逻辑,并且由于网络故障而调用失败,则重试会导致第二次调用两者。 同样,在 jdbc-outout-adapter 中出现暂时性故障后,在再次调用 之前,将再次调用两个 HTTP 网关。​​http-gateway2​​​​http-gateway1​​​​http-gateway2​​​​jdbc-outbound-adapter​

Spring Integration 2.2 增加了向各个端点添加行为的功能。 这是通过将元素添加到许多端点来实现的。 下面的示例演示如何在 a 中元素:​​<request-handler-advice-chain/>​​​​<request-handler-advice-chain/>​​​​outbound-gateway​

<int-http:outbound-gateway id="withAdvice"
url-expression="'http://localhost/test1'"
request-channel="requests"
reply-channel="nextChannel">
<int-http:request-handler-advice-chain>
<ref bean="myRetryAdvice" />
</int-http:request-handler-advice-chain>
</int-http:outbound-gateway>

在这种情况下,仅本地应用于此网关,不适用于在将回复发送到下游执行的进一步操作。 建议的范围仅限于终结点本身。​​myRetryAdvice​​​​nextChannel​


目前,您无法建议整个端点。 架构不允许 aas 链本身的子元素。​​<chain/>​​​​<request-handler-advice-chain>​



但是,a可以添加到元素中的各个回复生成端点。 一个例外是,在未产生应答的链中,由于链中的最后一个元素是 an,因此无法建议最后一个元素。 如果您需要建议这样的元素,则必须将其移动到链外(链是适配器)。 然后可以像往常一样建议适配器。 对于生成回复的链,可以建议每个子元素。​​<request-handler-advice-chain>​​​​<chain>​​​​outbound-channel-adapter​​​​output-channel​​​​input-channel​


提供咨询课程

除了提供应用 AOP 建议类的一般机制外,Spring 集成还提供了这些开箱即用的建议实现:

  • ​RequestHandlerRetryAdvice​​(在重试建议中所述)
  • ​RequestHandlerCircuitBreakerAdvice​​(在断路器建议中描述)
  • ​ExpressionEvaluatingRequestHandlerAdvice​​(在表达式评估建议中描述)
  • ​RateLimiterRequestHandlerAdvice​​(在速率限制器建议中所述)
  • ​CacheRequestHandlerAdvice​​(在缓存建议中描述)
  • ​ReactiveRequestHandlerAdvice​​(在反应性建议中描述)
重试建议

重试建议 () 利用了 Spring 重试项目提供的丰富重试机制。 的核心组件是,它允许配置复杂的重试场景,包括andstrategies(具有许多实现)以及用于确定重试用尽时要采取的操作的策略。​​o.s.i.handler.advice.RequestHandlerRetryAdvice​​​​spring-retry​​​​RetryTemplate​​​​RetryPolicy​​​​BackoffPolicy​​​​RecoveryCallback​

无状态重试

无状态重试是指重试活动完全在建议中处理的情况。 线程暂停(如果配置为这样做)并重试操作。

有状态重试

有状态重试是指在建议中管理重试状态,但引发异常并且调用方重新提交请求的情况。 有状态重试的一个示例是,我们希望消息发起方(例如,JMS)负责重新提交,而不是在当前线程上执行它。 有状态重试需要某种机制来检测重试的提交。

有关更多信息,请参阅项目的 Javadoc和Spring Batch 的参考文档,其中起源。​​spring-retry​​​​spring-retry​

默认的退避行为是不回退。 将立即尝试重试。 使用导致线程在尝试之间暂停的退避策略可能会导致性能问题,包括内存使用过多和线程不足。 在高容量环境中,应谨慎使用退避策略。

配置重试建议

本节中的示例使用以下始终引发异常的内容:​​<service-activator>​

public class FailingService {

public void service(String message) {
throw new RuntimeException("error");
}
}

简单无状态重试

默认有尝试三次。 没有,因此三次尝试是背靠背进行的,尝试之间没有延迟。 没有,因此结果是在最后一次失败的重试发生后向调用方抛出异常。 在 Spring 集成环境中,可以使用入站端点 anon 来处理此最终异常。 以下示例使用并显示其输出:​​RetryTemplate​​​​SimpleRetryPolicy​​​​BackOffPolicy​​​​RecoveryCallback​​​​error-channel​​​​RetryTemplate​​​​DEBUG​

<int:service-activator input-channel="input" ref="failer" method="service">
<int:request-handler-advice-chain>
<bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice"/>
</int:request-handler-advice-chain>
</int:service-activator>

DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
DEBUG [task-scheduler-2]Retry: count=0
DEBUG [task-scheduler-2]Checking for rethrow: count=1
DEBUG [task-scheduler-2]Retry: count=1
DEBUG [task-scheduler-2]Checking for rethrow: count=2
DEBUG [task-scheduler-2]Retry: count=2
DEBUG [task-scheduler-2]Checking for rethrow: count=3
DEBUG [task-scheduler-2]Retry failed last attempt: count=3

使用恢复进行简单的无状态重试

以下示例将 aa 添加到前面的示例,并使用 anto 向 anto 发送通道:​​RecoveryCallback​​​​ErrorMessageSendingRecoverer​​​​ErrorMessage​

<int:service-activator input-channel="input" ref="failer" method="service">
<int:request-handler-advice-chain>
<bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice">
<property name="recoveryCallback">
<bean class="o.s.i.handler.advice.ErrorMessageSendingRecoverer">
<constructor-arg ref="myErrorChannel" />
</bean>
</property>
</bean>
</int:request-handler-advice-chain>
</int:service-activator>

DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
DEBUG [task-scheduler-2]Retry: count=0
DEBUG [task-scheduler-2]Checking for rethrow: count=1
DEBUG [task-scheduler-2]Retry: count=1
DEBUG [task-scheduler-2]Checking for rethrow: count=2
DEBUG [task-scheduler-2]Retry: count=2
DEBUG [task-scheduler-2]Checking for rethrow: count=3
DEBUG [task-scheduler-2]Retry failed last attempt: count=3
DEBUG [task-scheduler-2]Sending ErrorMessage :failedMessage:[Payload=...]

使用自定义策略进行无状态重试和恢复

对于更复杂的情况,我们可以提供定制的建议。 此示例继续使用,但将尝试增加到四个。 它还添加了第一次重试等待一秒钟,第二次重试等待五秒,第三次重试等待 25 次(总共四次尝试)。 以下清单显示了该示例及其输出:​​RetryTemplate​​​​SimpleRetryPolicy​​​​ExponentialBackoffPolicy​​​​DEBUG​

<int:service-activator input-channel="input" ref="failer" method="service">
<int:request-handler-advice-chain>
<bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice">
<property name="recoveryCallback">
<bean class="o.s.i.handler.advice.ErrorMessageSendingRecoverer">
<constructor-arg ref="myErrorChannel" />
</bean>
</property>
<property name="retryTemplate" ref="retryTemplate" />
</bean>
</int:request-handler-advice-chain>
</int:service-activator>

<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
<property name="retryPolicy">
<bean class="org.springframework.retry.policy.SimpleRetryPolicy">
<property name="maxAttempts" value="4" />
</bean>
</property>
<property name="backOffPolicy">
<bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
<property name="initialInterval" value="1000" />
<property name="multiplier" value="5.0" />
<property name="maxInterval" value="60000" />
</bean>
</property>
</bean>

27.058 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=...]
27.071 DEBUG [task-scheduler-1]Retry: count=0
27.080 DEBUG [task-scheduler-1]Sleeping for 1000
28.081 DEBUG [task-scheduler-1]Checking for rethrow: count=1
28.081 DEBUG [task-scheduler-1]Retry: count=1
28.081 DEBUG [task-scheduler-1]Sleeping for 5000
33.082 DEBUG [task-scheduler-1]Checking for rethrow: count=2
33.082 DEBUG [task-scheduler-1]Retry: count=2
33.083 DEBUG [task-scheduler-1]Sleeping for 25000
58.083 DEBUG [task-scheduler-1]Checking for rethrow: count=3
58.083 DEBUG [task-scheduler-1]Retry: count=3
58.084 DEBUG [task-scheduler-1]Checking for rethrow: count=4
58.084 DEBUG [task-scheduler-1]Retry failed last attempt: count=4
58.086 DEBUG [task-scheduler-1]Sending ErrorMessage :failedMessage:[Payload=...]

命名空间支持无状态重试

从版本 4.0 开始,由于命名空间支持重试建议,可以大大简化上述配置,如以下示例所示:

<int:service-activator input-channel="input" ref="failer" method="service">
<int:request-handler-advice-chain>
<ref bean="retrier" />
</int:request-handler-advice-chain>
</int:service-activator>

<int:handler-retry-advice id="retrier" max-attempts="4" recovery-channel="myErrorChannel">
<int:exponential-back-off initial="1000" multiplier="5.0" maximum="60000" />
</int:handler-retry-advice>

在前面的示例中,建议被定义为顶级 Bean,以便它可以在多个实例中使用。 您还可以直接在链中定义建议,如以下示例所示:​​request-handler-advice-chain​

<int:service-activator input-channel="input" ref="failer" method="service">
<int:request-handler-advice-chain>
<int:retry-advice id="retrier" max-attempts="4" recovery-channel="myErrorChannel">
<int:exponential-back-off initial="1000" multiplier="5.0" maximum="60000" />
</int:retry-advice>
</int:request-handler-advice-chain>
</int:service-activator>

A可以有 aorchild 元素或没有子元素。 没有子元素的 A不使用退避。 如果没有,则在重试用尽时引发异常。 命名空间只能与无状态重试一起使用。​​<handler-retry-advice>​​​​<fixed-back-off>​​​​<exponential-back-off>​​​​<handler-retry-advice>​​​​recovery-channel​

对于更复杂的环境(自定义策略等),请使用规范定义。​​<bean>​

使用恢复进行简单的有状态重试

为了使重试有状态,我们需要提供带有实现的建议。 此类用于将邮件标识为重新提交,以便可以确定此邮件的当前重试状态。 框架提供了一个,它使用 SpEL 表达式确定消息标识符。 此示例再次使用默认策略(三次尝试,无退避)。 与无状态重试一样,可以自定义这些策略。 以下清单显示了该示例及其输出:​​RetryStateGenerator​​​​RetryTemplate​​​​SpelExpressionRetryStateGenerator​​​​DEBUG​

<int:service-activator input-channel="input" ref="failer" method="service">
<int:request-handler-advice-chain>
<bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice">
<property name="retryStateGenerator">
<bean class="o.s.i.handler.advice.SpelExpressionRetryStateGenerator">
<constructor-arg value="headers['jms_messageId']" />
</bean>
</property>
<property name="recoveryCallback">
<bean class="o.s.i.handler.advice.ErrorMessageSendingRecoverer">
<constructor-arg ref="myErrorChannel" />
</bean>
</property>
</bean>
</int:request-handler-advice-chain>
</int:service-activator>

24.351 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...]
24.368 DEBUG [Container#0-1]Retry: count=0
24.387 DEBUG [Container#0-1]Checking for rethrow: count=1
24.387 DEBUG [Container#0-1]Rethrow in retry for policy: count=1
24.387 WARN [Container#0-1]failure occurred in gateway sendAndReceive
org.springframework.integration.MessagingException: Failed to invoke handler
...
Caused by: java.lang.RuntimeException: foo
...
24.391 DEBUG [Container#0-1]Initiating transaction rollback on application exception
...
25.412 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...]
25.412 DEBUG [Container#0-1]Retry: count=1
25.413 DEBUG [Container#0-1]Checking for rethrow: count=2
25.413 DEBUG [Container#0-1]Rethrow in retry for policy: count=2
25.413 WARN [Container#0-1]failure occurred in gateway sendAndReceive
org.springframework.integration.MessagingException: Failed to invoke handler
...
Caused by: java.lang.RuntimeException: foo
...
25.414 DEBUG [Container#0-1]Initiating transaction rollback on application exception
...
26.418 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...]
26.418 DEBUG [Container#0-1]Retry: count=2
26.419 DEBUG [Container#0-1]Checking for rethrow: count=3
26.419 DEBUG [Container#0-1]Rethrow in retry for policy: count=3
26.419 WARN [Container#0-1]failure occurred in gateway sendAndReceive
org.springframework.integration.MessagingException: Failed to invoke handler
...
Caused by: java.lang.RuntimeException: foo
...
26.420 DEBUG [Container#0-1]Initiating transaction rollback on application exception
...
27.425 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...]
27.426 DEBUG [Container#0-1]Retry failed last attempt: count=3
27.426 DEBUG [Container#0-1]Sending ErrorMessage :failedMessage:[Payload=...]

如果将前面的示例与无状态示例进行比较,可以看到,使用有状态重试时,每次失败时都会向调用方引发异常。

重试的异常分类

Spring 重试在确定哪些异常可以调用重试方面具有很大的灵活性。 默认配置重试所有异常,异常分类器查看顶级异常。 例如,如果将其配置为仅重试,并且应用程序在原因为 a 的位置抛出,则不会发生重试。​​MyException​​​​SomeOtherException​​​​MyException​

从 Spring 重试 1.0.3 开始,有一个属性调用(默认值为)。 当时,它会遍历异常原因,直到找到匹配项或用完遍历的原因。​​BinaryExceptionClassifier​​​​traverseCauses​​​​false​​​​true​

若要使用此分类器重试,请将 acreate 与采用最大尝试次数、theofobjects 和布尔值的构造函数一起使用。 然后,您可以将此策略注入到。​​SimpleRetryPolicy​​​​Map​​​​Exception​​​​traverseCauses​​​​RetryTemplate​

​traverseCauses​​​在这种情况下是必需的,因为用户异常可能包装在 A 中。​​MessagingException​

断路器建议

断路器模式的一般思想是,如果服务当前不可用,请不要浪费时间(和资源)尝试使用它。 实现此模式。 当断路器处于关闭状态时,终结点会尝试调用该服务。 如果连续尝试失败一定次数,断路器将进入打开状态。 当它处于打开状态时,新请求“快速失败”,并且在一段时间到期之前不会尝试调用该服务。​​o.s.i.handler.advice.RequestHandlerCircuitBreakerAdvice​

当该时间到期时,断路器将设置为半开路状态。 处于此状态时,即使单次尝试失败,断路器也会立即进入打开状态。 如果尝试成功,断路器将进入关闭状态,在这种情况下,它不会再次进入打开状态,直到再次发生配置的连续失败次数。 任何成功的尝试都会将状态重置为零故障,以确定断路器何时可能再次进入打开状态。

通常,此建议可能用于外部服务,在这些服务中,可能需要一些时间才能失败(例如尝试建立网络连接超时)。

有两个属性:和。 该属性表示在断路器打开之前需要发生的连续故障数。 它默认为。 该属性表示中断器在尝试另一个请求之前等待的最后一次失败后的时间。 默认值为 1000 毫秒。​​RequestHandlerCircuitBreakerAdvice​​​​threshold​​​​halfOpenAfter​​​​threshold​​​​5​​​​halfOpenAfter​

以下示例配置断路器并显示 itsandoutput:​​DEBUG​​​​ERROR​

<int:service-activator input-channel="input" ref="failer" method="service">
<int:request-handler-advice-chain>
<bean class="o.s.i.handler.advice.RequestHandlerCircuitBreakerAdvice">
<property name="threshold" value="2" />
<property name="halfOpenAfter" value="12000" />
</bean>
</int:request-handler-advice-chain>
</int:service-activator>

05.617 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=...]
05.638 ERROR [task-scheduler-1]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
10.598 DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
10.600 ERROR [task-scheduler-2]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
15.598 DEBUG [task-scheduler-3]preSend on channel 'input', message: [Payload=...]
15.599 ERROR [task-scheduler-3]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
...
20.598 DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
20.598 ERROR [task-scheduler-2]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
...
25.598 DEBUG [task-scheduler-5]preSend on channel 'input', message: [Payload=...]
25.601 ERROR [task-scheduler-5]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
30.598 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=foo...]
30.599 ERROR [task-scheduler-1]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator

在前面的示例中,阈值设置为 andis 设置为秒。 每 5 秒到达一个新请求。 前两次尝试调用了该服务。 第三个和第四个失败,但异常指示断路器已打开。 尝试了第五个请求,因为该请求是在上次失败后 15 秒。 第六次尝试立即失败,因为断路器立即打开。​​2​​​​halfOpenAfter​​​​12​

表达式评估建议

最后提供的建议类是。 这个建议比其他两个建议更笼统。 它提供了一种机制来计算发送到终结点的原始入站消息上的表达式。 在成功或失败后,可以计算单独的表达式。 或者,可以将包含评估结果的消息与输入消息一起发送到消息通道。​​o.s.i.handler.advice.ExpressionEvaluatingRequestHandlerAdvice​

此建议的典型用例可能是使用 ,如果传输成功,则可能将文件移动到一个目录,如果传输失败,则移动到另一个目录:​​<ftp:outbound-channel-adapter/>​

该建议具有在成功时设置表达式、为失败设置表达式以及每个表达式的相应通道的属性。 对于成功案例,发送到 theis an 的消息,有效负载是表达式计算的结果。 调用的附加属性包含发送到处理程序的原始消息。 发送到 the(当处理程序引发异常时)的消息具有有效负载。 与所有实例一样,此有效负载具有属性,以及调用的附加属性,该属性包含表达式计算的结果。​​successChannel​​​​AdviceMessage​​​​inputMessage​​​​failureChannel​​​​ErrorMessage​​​​MessageHandlingExpressionEvaluatingAdviceException​​​​MessagingException​​​​failedMessage​​​​cause​​​​evaluationResult​

从版本 5.1.3 开始,如果配置了通道,但未提供表达式,则默认表达式用于计算消息。​​payload​

在建议范围内引发异常时,默认情况下,在计算 anyis 后,会将该异常抛给调用方。 如果要禁止引发异常,请将属性设置为 。 以下建议显示了如何使用Java DSL进行配置:​​failureExpression​​​​trapException​​​​true​​​​advice​

@SpringBootApplication
public class EerhaApplication {

public static void main(String[] args) {
ConfigurableApplicationContext context = SpringApplication.run(EerhaApplication.class, args);
MessageChannel in = context.getBean("advised.input", MessageChannel.class);
in.send(new GenericMessage<>("good"));
in.send(new GenericMessage<>("bad"));
context.close();
}

@Bean
public IntegrationFlow advised() {
return f -> f.<String>handle((payload, headers) -> {
if (payload.equals("good")) {
return null;
}
else {
throw new RuntimeException("some failure");
}
}, c -> c.advice(expressionAdvice()));
}

@Bean
public Advice expressionAdvice() {
ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setSuccessChannelName("success.input");
advice.setOnSuccessExpressionString("payload + ' was successful'");
advice.setFailureChannelName("failure.input");
advice.setOnFailureExpressionString(
"payload + ' was bad, with reason: ' + #exception.cause.message");
advice.setTrapException(true);
return advice;
}

@Bean
public IntegrationFlow success() {
return f -> f.handle(System.out::println);
}

@Bean
public IntegrationFlow failure() {
return f -> f.handle(System.out::println);
}

}

限速器建议

速率限制器建议 () 允许确保端点不会因请求而过载。 当超出速率限制时,请求将进入阻止状态。​​RateLimiterRequestHandlerAdvice​

此建议的典型用例可能是外部服务提供商不允许每分钟超过请求数。​​n​

该实现完全基于​​Resilience4j​​项目,并且需要非此即彼的注入。 也可以配置默认值和/或自定义名称。​​RateLimiterRequestHandlerAdvice​​​​RateLimiter​​​​RateLimiterConfig​

以下示例配置速率限制器建议,每 1 秒一个请求:

@Bean
public RateLimiterRequestHandlerAdvice rateLimiterRequestHandlerAdvice() {
return new RateLimiterRequestHandlerAdvice(RateLimiterConfig.custom()
.limitRefreshPeriod(Duration.ofSeconds(1))
.limitForPeriod(1)
.build());
}

@ServiceActivator(inputChannel = "requestChannel", outputChannel = "resultChannel",
adviceChain = "rateLimiterRequestHandlerAdvice")
public String handleRequest(String payload) {
...
}

缓存建议

从版本 5.2 开始,引入了。 它基于Spring 框架中的缓存抽象,并与注释系列提供的概念和功能保持一致。 内部逻辑基于扩展,其中缓存操作的代理是围绕方法完成的,请求作为参数。 可以使用 SpEL 表达式或 ato 来评估缓存键来配置此建议。 请求可用作 SpEL 评估上下文的根对象,或用作输入参数。 默认情况下,请求消息用于缓存键。 当默认缓存操作为 a 时,必须配置一组任意操作。 每个都可以单独配置或具有共享选项,例如 a,and,可以从配置中重用。 这个配置功能类似于Spring Framework的sandannotation组合。 如果未提供 ais ,则默认情况下从 thein 解析单个 bean。​​CacheRequestHandlerAdvice​​​​@Caching​​​​CacheAspectSupport​​​​AbstractReplyProducingMessageHandler.RequestHandler.handleRequestMessage​​​​Message<?>​​​​Function​​​​Message<?>​​​​Function​​​​payload​​​​CacheRequestHandlerAdvice​​​​cacheNames​​​​CacheableOperation​​​​CacheOperation​​​​CacheOperation​​​​CacheManager​​​​CacheResolver​​​​CacheErrorHandler​​​​CacheRequestHandlerAdvice​​​​@CacheConfig​​​​@Caching​​​​CacheManager​​​​BeanFactory​​​​CacheAspectSupport​

以下示例使用不同的缓存操作集配置两个建议:

@Bean
public CacheRequestHandlerAdvice cacheAdvice() {
CacheRequestHandlerAdvice cacheRequestHandlerAdvice = new CacheRequestHandlerAdvice(TEST_CACHE);
cacheRequestHandlerAdvice.setKeyExpressionString("payload");
return cacheRequestHandlerAdvice;
}

@Transformer(inputChannel = "transformerChannel", outputChannel = "nullChannel", adviceChain = "cacheAdvice")
public Object transform(Message<?> message) {
...
}

@Bean
public CacheRequestHandlerAdvice cachePutAndEvictAdvice() {
CacheRequestHandlerAdvice cacheRequestHandlerAdvice = new CacheRequestHandlerAdvice();
cacheRequestHandlerAdvice.setKeyExpressionString("payload");
CachePutOperation.Builder cachePutBuilder = new CachePutOperation.Builder();
cachePutBuilder.setCacheName(TEST_PUT_CACHE);
CacheEvictOperation.Builder cacheEvictBuilder = new CacheEvictOperation.Builder();
cacheEvictBuilder.setCacheName(TEST_CACHE);
cacheRequestHandlerAdvice.setCacheOperations(cachePutBuilder.build(), cacheEvictBuilder.build());
return cacheRequestHandlerAdvice;
}

@ServiceActivator(inputChannel = "serviceChannel", outputChannel = "nullChannel",
adviceChain = "cachePutAndEvictAdvice")
public Message<?> service(Message<?> message) {
...
}

反应性建议

从版本 5.3 开始,acan 可用于生成 areplies 的请求消息处理程序。 必须为此建议提供 A,并且根据截获方法实现产生的回复从运算符调用它。 通常,当我们想要通过类似的支持运营商控制网络波动时,这种定制是必要的。 例如,当我们可以通过 WebFlux 客户端发出 HTTP 请求时,我们可以使用以下配置来等待响应不超过 5 秒:​​ReactiveRequestHandlerAdvice​​​​Mono​​​​BiFunction<Message<?>, Mono<?>, Publisher<?>>​​​​Mono.transform()​​​​handleRequestMessage()​​​​Mono​​​​timeout()​​​​retry()​

.handle(WebFlux.outboundGateway("https://somehost/"),
e -> e.customizeMonoReply((message, mono) -> mono.timeout(Duration.ofSeconds(5))));

参数是消息处理程序的请求消息,可用于确定请求范围属性。 参数是此消息处理程序的方法实现的结果。 也可以从此函数调用嵌套以应用,例如,无功断路器。​​message​​​​mono​​​​handleRequestMessage()​​​​Mono.transform()​

定制建议课程

除了前面描述的建议类之外,您还可以实施自己的建议类。 虽然你可以提供任何实现(通常),但我们通常建议你子类。 这样做的好处是避免编写面向方面的低级编程代码,并提供专门为在此环境中使用而定制的起点。​​org.aopalliance.aop.Advice​​​​org.aopalliance.intercept.MethodInterceptor​​​​o.s.i.handler.advice.AbstractRequestHandlerAdvice​

子类需要实现该方法,其定义如下:​​doInvoke()​

/**
* Subclasses implement this method to apply behavior to the {@link MessageHandler} callback.execute()
* invokes the handler method and returns its result, or null).
* @param callback Subclasses invoke the execute() method on this interface to invoke the handler method.
* @param target The target handler.
* @param message The message that will be sent to the handler.
* @return the result after invoking the {@link MessageHandler}.
* @throws Exception
*/
protected abstract Object doInvoke(ExecutionCallback callback, Object target, Message<?> message) throws Exception;

回调参数便于避免直接处理 AOP 的子类。 调用该方法将调用消息处理程序。​​callback.execute()​

为那些需要维护特定处理程序的状态的子类提供了参数,也许是通过在目标的键控中维护该状态。 此功能允许将相同的建议应用于多个处理程序。 建议这样做以保持每个处理程序的断路器状态。​​target​​​​Map​​​​RequestHandlerCircuitBreakerAdvice​

参数是发送到处理程序的消息。 虽然建议无法在调用处理程序之前修改消息,但它可以修改有效负载(如果它具有可变属性)。 通常,建议会使用消息进行日志记录,或者在调用处理程序之前或之后的某个位置发送消息的副本。​​message​

返回值通常是返回的值。 但是,该建议确实能够修改返回值。 请注意,只有实例返回值。 以下示例显示了一个扩展的自定义建议类:​​callback.execute()​​​​AbstractReplyProducingMessageHandler​​​​AbstractRequestHandlerAdvice​

public class MyAdvice extends AbstractRequestHandlerAdvice {

@Override
protected Object doInvoke(ExecutionCallback callback, Object target, Message<?> message) throws Exception {
// add code before the invocation
Object result = callback.execute();
// add code after the invocation
return result;
}
}


除了方法之外,还提供了另外一种方法: 此方法必须在调用可能在单次执行中多次调用的情况下使用,例如在 中。 这是必需的,因为 Spring AOP对象通过跟踪链中的哪个建议上次调用来维护状态。 必须为每个调用重置此状态。​​execute()​​​​ExecutionCallback​​​​cloneAndExecute()​​​​doInvoke()​​​​RequestHandlerRetryAdvice​​​​org.springframework.aop.framework.ReflectiveMethodInvocation​



有关更多信息,请参阅ReflectiveMethodInvocationJavadoc。


其他建议链元素

虽然上面提到的抽象类很方便,但您可以将任何内容(包括交易建议)添加到链中。​​Advice​

处理消息建议

如本节简介中所述,请求处理程序建议链中的建议对象仅应用于当前终结点,而不应用于下游流(如果有)。 对于生成回复的对象(例如扩展的对象),建议将应用于内部方法:(调用自)。 对于其他消息处理程序,将应用该建议。​​MessageHandler​​​​AbstractReplyProducingMessageHandler​​​​handleRequestMessage()​​​​MessageHandler.handleMessage()​​​​MessageHandler.handleMessage()​

在某些情况下,即使消息处理程序是 ,也必须将建议应用于该方法。 例如,幂等接收器可能会返回,如果处理程序的属性设置为 ,这将导致异常。 另一个示例是 — 请参阅严格消息排序。​​AbstractReplyProducingMessageHandler​​​​handleMessage​​​​null​​​​replyRequired​​​​true​​​​BoundRabbitChannelAdvice​

从版本 4.3.1 开始,引入了 newinterface 及其基本实现 ()。​​HandleMessageAdvice​​​​AbstractHandleMessageAdvice​​​​Advice​​​​HandleMessageAdvice​​​​handleMessage()​

重要的是要了解,当应用于返回响应的处理程序时,实现(例如幂等接收器)与方法分离并正确应用于方法。​​HandleMessageAdvice​​​​adviceChain​​​​MessageHandler.handleMessage()​

由于这种脱离关系,建议链订单不被遵守。

请考虑以下配置:

<some-reply-producing-endpoint ... >
<int:request-handler-advice-chain>
<tx:advice ... />
<ref bean="myHandleMessageAdvice" />
</int:request-handler-advice-chain>
</some-reply-producing-endpoint>

在前面的示例中,应用于。 但是,申请到。 因此,在 之前调用它。 要保留顺序,您应该遵循标准的Spring AOP配置方法,并使用端点和后缀来获取目标 Bean。 请注意,在这种情况下,整个下游流都在事务范围内。​​<tx:advice>​​​​AbstractReplyProducingMessageHandler.handleRequestMessage()​​​​myHandleMessageAdvice​​​​MessageHandler.handleMessage()​​​​<tx:advice>​​​​id​​​​.handler​​​​MessageHandler​

在 athat 不返回响应的情况下,将保留建议链顺序。​​MessageHandler​

从版本 5.3 开始,提供了应用任何方法,因此应用整个子流。 例如,a可以应用于从某个端点开始的整个子流;默认情况下,这是不可能的,因为使用方终结点仅将建议应用于。​​HandleMessageAdviceAdapter​​​​MethodInterceptor​​​​MessageHandler.handleMessage()​​​​RetryOperationsInterceptor​​​​AbstractReplyProducingMessageHandler.RequestHandler.handleRequestMessage()​

交易支持

从 5.0 版开始,由于实现了,引入了一个 new,使整个下游流成为事务性。 当在元素中使用 regularis 时(例如,通过配置),启动的事务仅应用于内部事务,并且不会传播到下游流。​​TransactionHandleMessageAdvice​​​​HandleMessageAdvice​​​​TransactionInterceptor​​​​<request-handler-advice-chain>​​​​<tx:advice>​​​​AbstractReplyProducingMessageHandler.handleRequestMessage()​

为了简化XML配置,aelement已添加到allandand相关组件中。 以下示例显示了使用:​​<request-handler-advice-chain>​​​​<transactional>​​​​<outbound-gateway>​​​​<service-activator>​​​​<transactional>​

<int-jdbc:outbound-gateway query="select * from things where id=:headers[id]">
<int-jdbc:transactional/>
</int-jdbc:outbound-gateway>

<bean id="transactionManager" class="org.mockito.Mockito" factory-method="mock">
<constructor-arg value="org.springframework.transaction.TransactionManager"/>
</bean>

如果您熟悉JPA 集成组件,这样的配置并不新鲜,但现在我们可以从流程中的任何点启动事务 — 而不仅仅是从消息驱动的通道适配器(如JMS)启动事务。​​<poller>​

Java 配置可以通过使用 来简化,并且结果 Bean 名称可以在消息传递注释属性中使用,如以下示例所示:​​TransactionInterceptorBuilder​​​​adviceChain​

@Bean
public ConcurrentMetadataStore store() {
return new SimpleMetadataStore(hazelcastInstance()
.getMap("idempotentReceiverMetadataStore"));
}

@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
return new IdempotentReceiverInterceptor(
new MetadataStoreSelector(
message -> message.getPayload().toString(),
message -> message.getPayload().toString().toUpperCase(), store()));
}

@Bean
public TransactionInterceptor transactionInterceptor() {
return new TransactionInterceptorBuilder(true)
.transactionManager(this.transactionManager)
.isolation(Isolation.READ_COMMITTED)
.propagation(Propagation.REQUIRES_NEW)
.build();
}

@Bean
@org.springframework.integration.annotation.Transformer(inputChannel = "input",
outputChannel = "output",
adviceChain = { "idempotentReceiverInterceptor",
"transactionInterceptor" })
public Transformer transformer() {
return message -> message;
}

注意构造函数上的参数。 它导致创建,而不是规则。​​true​​​​TransactionInterceptorBuilder​​​​TransactionHandleMessageAdvice​​​​TransactionInterceptor​

Java DSL 支持端点配置上的选项,如以下示例所示:​​Advice​​​​.transactional()​

@Bean
public IntegrationFlow updatingGatewayFlow() {
return f -> f
.handle(Jpa.updatingGateway(this.entityManagerFactory),
e -> e.transactional(true))
.channel(c -> c.queue("persistResults"));
}

建议过滤器

在提供建议时,还有一个额外的考虑因素。 默认情况下,任何丢弃操作(当筛选器返回时)都在建议链的范围内执行。 这可能包括丢弃通道下游的所有流。 因此,例如,如果丢弃通道下游的元素引发异常,并且存在重试建议,则会重试该过程。 此外,ifis 设置为 (异常在建议范围内抛出)。​​Filter​​​​false​​​​throwExceptionOnRejection​​​​true​

设置修改此行为,丢弃(或异常)在调用建议链后发生。​​discard-within-advice​​​​false​

使用注释为端点提供建议

使用注释(,,, and)配置某些端点时,可以在属性中为建议链提供 Bean 名称。 此外,注释还具有属性,可用于配置丢弃行为,如建议过滤器中所述。 以下示例导致在建议之后执行丢弃:​​@Filter​​​​@ServiceActivator​​​​@Splitter​​​​@Transformer​​​​adviceChain​​​​@Filter​​​​discardWithinAdvice​

@MessageEndpoint
public class MyAdvisedFilter {

@Filter(inputChannel="input", outputChannel="output",
adviceChain="adviceChain", discardWithinAdvice="false")
public boolean filter(String s) {
return s.contains("good");
}
}

在咨询链中订购建议

建议类是“围绕”建议的,并以嵌套的方式应用。 第一个建议是最外层的,而最后一个建议是最内层的(即最接近被建议的处理程序)。 重要的是将建议类按正确的顺序排列,以实现您想要的功能。

例如,假设您要添加重试建议和事务建议。 您可能希望首先放置重试建议,然后放置事务建议。 因此,每次重试都会在新事务中执行。 另一方面,如果您希望将所有尝试和任何恢复操作(在重试中)的范围限定在事务中,则可以将事务建议放在首位。​​RecoveryCallback​

建议的处理程序属性

有时,从建议中访问处理程序属性很有用。 例如,大多数处理程序实现允许您访问组件名称。​​NamedComponent​

目标对象可以通过参数(子类化时)或(实现时)访问。​​target​​​​AbstractRequestHandlerAdvice​​​​invocation.getThis()​​​​org.aopalliance.intercept.MethodInterceptor​

当建议整个处理程序时(例如,当处理程序不生成回复或建议实现时),可以将目标对象强制转换为接口,如以下示例所示:​​HandleMessageAdvice​​​​NamedComponent​

String componentName = ((NamedComponent) target).getComponentName();

直接实现时,可以按如下方式强制转换目标对象:​​MethodInterceptor​

String componentName = ((NamedComponent) invocation.getThis()).getComponentName();

当仅建议使用该方法时(在回复生成处理程序中),您需要访问完整的处理程序,即 an。 以下示例演示如何执行此操作:​​handleRequestMessage()​​​​AbstractReplyProducingMessageHandler​

AbstractReplyProducingMessageHandler handler =
((AbstractReplyProducingMessageHandler.RequestHandler) target).getAdvisedHandler();

String componentName = handler.getComponentName();

幂等接收器企业集成模式

从版本 4.1 开始,Spring 集成提供了幂等接收器企业集成模式的实现。 它是一种功能模式,整个幂等逻辑应在应用程序中实现。 但是,为了简化决策,提供了组件。 这是一个AOP应用于该方法,并且可以根据其配置请求消息或将其标记为a。​​IdempotentReceiverInterceptor​​​​Advice​​​​MessageHandler.handleMessage()​​​​filter​​​​duplicate​

以前,例如,您可以使用自定义 a(请参阅筛选器)来实现此模式。 但是,由于此模式真正定义了终结点的行为,而不是终结点本身,因此幂等接收器实现不提供终结点组件。 相反,它应用于应用程序中声明的终结点。​​MessageSelector​​​​<filter/>​

的逻辑是基于提供的,如果该选择器不接受该消息,则会使用设置为的标头来丰富它。 目标(或下游流)可以参考此标头来实现正确的幂等性逻辑。 如果配置了 aor,则不会将重复的消息发送到目标。 相反,它被丢弃了。 如果要丢弃(不执行任何操作)重复的消息,则应使用 a,例如 defaultbean。​​IdempotentReceiverInterceptor​​​​MessageSelector​​​​duplicateMessage​​​​true​​​​MessageHandler​​​​IdempotentReceiverInterceptor​​​​discardChannel​​​​throwExceptionOnRejection = true​​​​MessageHandler.handleMessage()​​​​discardChannel​​​​NullChannel​​​​nullChannel​

为了维护消息之间的状态并提供比较消息幂等性的能力,我们提供了。 它接受实现(基于创建查找键)和可选(元数据存储)。 有关更多信息,请参阅MetadataStoreSelectorJavadoc。 您还可以使用附加的自定义。 默认情况下,使用消息标头。​​MetadataStoreSelector​​​​MessageProcessor​​​​Message​​​​ConcurrentMetadataStore​​​​value​​​​ConcurrentMetadataStore​​​​MessageProcessor​​​​MetadataStoreSelector​​​​timestamp​

通常,如果键没有现有值,选择器会选择要接受的消息。 在某些情况下,比较键的当前值和新值以确定是否应接受消息非常有用。 从版本 5.3 开始,提供了引用 a;第一个参数是旧值;返回以接受消息并将旧值替换为中的新值。 这对于减少密钥数量很有用;例如,在处理文件中的行时,可以将文件名存储在键中,将当前行号存储在值中。 然后,在重新启动后,您可以跳过已处理的行。 有关示例,请参阅幂等下游处理拆分文件。​​compareValues​​​​BiPredicate<String, String>​​​​true​​​​MetadataStore​

为方便起见,选项可直接在组件上配置。 以下清单显示了所有可能的属性:​​MetadataStoreSelector​​​​<idempotent-receiver>​

<idempotent-receiver
id=""
endpoint=""
selector=""
discard-channel=""
metadata-store=""
key-strategy=""
key-expression=""
value-strategy=""
value-expression=""
compare-values=""
throw-exception-on-rejection="" />

憨豆的 ID。 自选。​​IdempotentReceiverInterceptor​

应用此拦截器的使用者终结点名称或模式。 用逗号 () 分隔名称(模式),例如。 然后,与这些模式匹配的端点 Bean 名称用于检索目标端点的 bean(使用 itss后缀),并将这些 bean 应用于这些 bean。 必填。​​,​​​​endpoint="aaa, bbb*, ccc, *ddd, eee*fff"​​​​MessageHandler​​​​.handler​​​​IdempotentReceiverInterceptor​

阿豆参考。 相互排斥。 未提供时,需要 1 个 oforis。​​MessageSelector​​​​metadata-store​​​​key-strategy (key-expression)​​​​selector​​​​key-strategy​​​​key-strategy-expression​

标识不接受消息时要向其发送消息的通道。 省略时,重复的消息将转发到 aheader 的处理程序。 自选。​​IdempotentReceiverInterceptor​​​​duplicateMessage​

偏好。 由底层证券使用。 相互排斥。 自选。 默认值使用内部的不跨应用程序执行维护状态。​​ConcurrentMetadataStore​​​​MetadataStoreSelector​​​​selector​​​​MetadataStoreSelector​​​​SimpleMetadataStore​

偏好。 由底层证券使用。 从请求消息中计算。 相互排斥。 如果未提供 ais,则需要一个 oforis。​​MessageProcessor​​​​MetadataStoreSelector​​​​idempotentKey​​​​selector​​​​key-expression​​​​selector​​​​key-strategy​​​​key-strategy-expression​

要填充的 SpEL 表达式。 由底层证券使用。 使用请求消息作为评估上下文根对象来评估对象。 相互排斥。 如果未提供 ais,则需要一个 oforis。​​ExpressionEvaluatingMessageProcessor​​​​MetadataStoreSelector​​​​idempotentKey​​​​selector​​​​key-strategy​​​​selector​​​​key-strategy​​​​key-strategy-expression​

偏好。 由底层证券使用。 从请求消息中计算 afor。 相互排斥。 默认情况下,“元数据存储选择器”使用“时间戳”消息标头作为元数据“值”。​​MessageProcessor​​​​MetadataStoreSelector​​​​value​​​​idempotentKey​​​​selector​​​​value-expression​

要填充的 SpEL 表达式。 由底层证券使用。 通过使用请求消息作为评估上下文根对象来评估 afor。 相互排斥。 默认情况下,“元数据存储选择器”使用“时间戳”消息标头作为元数据“值”。​​ExpressionEvaluatingMessageProcessor​​​​MetadataStoreSelector​​​​value​​​​idempotentKey​​​​selector​​​​value-strategy​

对 abean 的引用,默认情况下,它允许您通过比较键的新旧值来选择消息。​​BiPredicate<String, String>​​​​null​

如果出现消息,是否引发异常。 默认为。 无论是否提供 ais 都适用。​​IdempotentReceiverInterceptor​​​​false​​​​discard-channel​

对于 Java 配置,Spring Integration 提供了方法级注释。 它用于标记具有消息传递注释(,对象应用于此终结点。 以下示例演示如何使用注释:​​@IdempotentReceiver​​​​method​​​​@ServiceActivator​​​​@Router, and others) to specify which `IdempotentReceiverInterceptor​​​​@IdempotentReceiver​

@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
return new IdempotentReceiverInterceptor(new MetadataStoreSelector(m ->
m.getHeaders().get(INVOICE_NBR_HEADER)));
}

@Bean
@ServiceActivator(inputChannel = "input", outputChannel = "output")
@IdempotentReceiver("idempotentReceiverInterceptor")
public MessageHandler myService() {
....
}

使用 Java DSL 时,可以将拦截器添加到端点的建议链中,如以下示例所示:

@Bean
public IntegrationFlow flow() {
...
.handle("someBean", "someMethod",
e -> e.advice(idempotentReceiverInterceptor()))
...
}

Theis 专为该方法而设计。 从版本 4.3.1 开始,它以 theas 作为基类实现,以实现更好的解离。 有关详细信息,请参阅处理消息建议​。​​IdempotentReceiverInterceptor​​​​MessageHandler.handleMessage(Message<?>)​​​​HandleMessageAdvice​​​​AbstractHandleMessageAdvice​

举报

相关推荐

0 条评论