0
点赞
收藏
分享

微信扫一扫

如何捕获 Spring 集成的指标

如何捕获 Spring 集成的指标_spring

本节介绍如何捕获 Spring 集成的指标。 在最近的版本中,我们更多地依赖千分尺(见https://micrometer.io),我们计划在未来的版本中更多地使用千分尺。

在高容量环境中禁用日志记录

您可以在主消息流中控制调试日志记录。 在非常大容量的应用程序中,对于某些日志记录子系统,调用 to 可能非常昂贵。 您可以禁用所有此类日志记录以避免此开销。 异常日志记录(调试或其他)不受此设置的影响。​​isDebugEnabled()​

以下清单显示了用于控制日志记录的可用选项:

爪哇岛

.XML

@Configuration
@EnableIntegration
@EnableIntegrationManagement(
defaultLoggingEnabled = "true" <1>)

public static class ContextConfiguration {
...
}

设置为禁用主消息流中的所有日志记录,而不考虑日志系统类别设置。 设置为“true”以启用调试日志记录(如果日志记录子系统也启用了)。 仅当尚未在 Bean 定义中显式配置设置时才应用。 默认值为。​​false​​​​true​

​defaultLoggingEnabled​​仅当尚未在 Bean 定义中显式配置相应的设置时,才应用。

千分尺集成

概述

从版本 5.0.3 开始,应用程序上下文中存在千分尺会触发对千分尺指标的支持。​​MeterRegistry​

要使用千分尺,请将其中一个 thebean 添加到应用程序上下文中。​​MeterRegistry​

对于每个和,都会注册计时器。 对于每个计数器,将注册一个计数器。​​MessageHandler​​​​MessageChannel​​​​MessageSource​

这仅适用于扩展的对象,和(大多数框架组件都是这种情况)。​​AbstractMessageHandler​​​​AbstractMessageChannel​​​​AbstractMessageSource​

用于消息通道上发送操作的计量具有以下名称或标记:​​Timer​

  • ​name​​: spring.integration.send
  • ​tag​​: type:channel
  • ​tag​​: name:<componentName>
  • ​tag​​: result:(success|failure)
  • ​tag​​: exception:(none|exception simple class name)
  • ​description​​: Send processing time

(带有异常的结果表示返回通道的操作。​​failure​​​​none​​​​send()​​​​false​

用于在可轮询消息通道上执行接收操作的计量具有以下名称或标记:​​Counter​

  • ​name​​: spring.integration.receive
  • ​tag​​: type:channel
  • ​tag​​: name:<componentName>
  • ​tag​​: result:(success|failure)
  • ​tag​​: exception:(none|exception simple class name)
  • ​description​​: Messages received

消息处理程序操作的计量具有以下名称或标记:​​Timer​

  • ​name​​: spring.integration.send
  • ​tag​​: type:handler
  • ​tag​​: name:<componentName>
  • ​tag​​: result:(success|failure)
  • ​tag​​: exception:(none|exception simple class name)
  • ​description​​: Send processing time

消息源的计量具有以下名称/标记:​​Counter​

  • ​name​​: spring.integration.receive
  • ​tag​​: type:source
  • ​tag​​: name:<componentName>
  • ​tag​​: result:success
  • ​tag​​: exception:none
  • ​description​​: Messages received

此外,还有三个仪表:​​Gauge​

  • ​spring.integration.channels​​:应用程序中的编号。MessageChannels
  • ​spring.integration.handlers​​:应用程序中的编号。MessageHandlers
  • ​spring.integration.sources​​:应用程序中的编号。MessageSources

可以通过提供 的子类来自定义由集成组件创建的名称和标记。 MicrometerCustomMetricsTests测试用例展示了如何做到这一点的简单示例。 您还可以通过在构建器子类上重载方法来进一步自定义计量。​​Meters​​​​MicrometerMetricsCaptor​​​​build()​

从版本 5.1.13 开始,公开队列大小和剩余容量的千分尺:​​QueueChannel​

  • ​name​​: spring.integration.channel.queue.size
  • ​tag​​: type:channel
  • ​tag​​: name:<componentName>
  • ​description​​: The size of the queue channel

  • ​name​​: spring.integration.channel.queue.remaining.capacity
  • ​tag​​: type:channel
  • ​tag​​: name:<componentName>
  • ​description​​: The remaining capacity of the queue channel
禁用仪表

默认情况下,所有计量在首次使用时都会注册。 现在,使用千分尺,您可以添加到防止部分或全部注册。 您可以按提供的任何属性过滤(拒绝)仪表,,,等。 有关详细信息,请参阅千分尺文档中的仪表过滤器。​​MeterFilter​​​​MeterRegistry​​​​name​​​​tag​

例如,给定:

@Bean
public QueueChannel noMeters() {
return new QueueChannel(10);
}

您可以使用以下方法仅禁止此通道的计量注册:

registry.config().meterFilter(MeterFilter.deny(id ->
"channel".equals(id.getTag("type")) &&
"noMeters".equals(id.getTag("name"))));

千分尺观察

从6.0版开始,Spring Integration利用了千分尺观察抽象,该抽象可以通过适当的配置处理度量和跟踪。​​ObservationHandler​

只要应用程序上下文中存在 anbean 并配置了 anis,就会在组件上启用观察处理。 要自定义应检测的组件集,属性会在注释中公开。 有关模式匹配算法,请参阅其javadocs。​​IntegrationManagement​​​​ObservationRegistry​​​​@EnableIntegrationManagement​​​​observationPatterns()​​​​@EnableIntegrationManagement​

默认情况下,不会使用 anbean 检测任何组件。 可以配置为匹配所有组件。​​IntegrationManagement​​​​ObservationRegistry​​​​*​

在这种情况下,计量器不是单独收集的,而是委托给提供的适当配置。​​ObservationHandler​​​​ObservationRegistry​

以下 Spring 集成组件使用观察逻辑进行检测,每个组件都有各自的约定:

  • ​MessageProducerSupport​​作为流的入站端点,被视为 aspan 类型并使用 API;CONSUMERIntegrationObservation.HANDLER
  • MessagingGatewaySupport' 是一个入站请求-回复终结点,被视为 aspan 类型。 它使用API;SERVERIntegrationObservation.GATEWAY
  • Anoperation是唯一一个生成消息的Spring Integration API。 因此,它被视为 aspan 类型并使用 API。 当通道是分布式实现(例如 or)并且必须将跟踪信息添加到消息中时,这更有意义。 因此,观察是基于 Spring Integration 提供 ato 允许后续跟踪添加标头,以便消费者可以使用它们;AbstractMessageChannel.send()PRODUCERIntegrationObservation.PRODCUERPublishSubscribeKafkaChannelZeroMqChannelIntegrationObservation.PRODUCERMessageSenderContextMutableMessagePropagator
  • Anis aspan 类型并使用 API。AbstractMessageHandlerCONSUMERIntegrationObservation.HANDLER

组件上的观察生产可以通过配置进行定制。 例如,anexpect avia itsAPI。​​IntegrationManagement​​​​ObservationConvention​​​​AbstractMessageHandler​​​​MessageReceiverObservationConvention​​​​setObservationConvention()​

以下是观察 API 支持的指标、范围和约定:

可观测性 - 指标

您可以在下面找到此项目声明的所有指标的列表。

网关


对入站消息网关的观察。


指标名称(由约定类定义)。类型。​​spring.integration.gateway​​​​o.s.i.support.management.observation.DefaultMessageRequestReplyReceiverObservationConvention​​​​timer​

指标名称(由约定类定义)。类型。​​spring.integration.gateway.active​​​​o.s.i.support.management.observation.DefaultMessageRequestReplyReceiverObservationConvention​​​​long task timer​

启动观察后添加的键值可能从 *.active 指标中丢失。

千分尺内部用于基础单元。但是,每个后端确定实际的基本单位。(即普罗米修斯使用秒)​​nanoseconds​

封闭类的完全限定名称。​​o.s.i.support.management.observation.IntegrationObservation​

所有标签必须以前缀为前缀!​​spring.integration.​

表 1.低基数键



名字





描述





​spring.integration.name​​ (必填)





消息网关组件的名称。





​spring.integration.outcome​​ (必填)





请求/回复执行的结果。





​spring.integration.type​​ (必填)





组件的类型 - “网关”。



处理器


消息处理程序的观察。


指标名称(由约定类定义)。类型。​​spring.integration.handler​​​​o.s.i.support.management.observation.DefaultMessageReceiverObservationConvention​​​​timer​

指标名称(由约定类定义)。类型。​​spring.integration.handler.active​​​​o.s.i.support.management.observation.DefaultMessageReceiverObservationConvention​​​​long task timer​

启动观察后添加的键值可能从 *.active 指标中丢失。

千分尺内部用于基础单元。但是,每个后端确定实际的基本单位。(即普罗米修斯使用秒)​​nanoseconds​

封闭类的完全限定名称。​​o.s.i.support.management.observation.IntegrationObservation​

所有标签必须以前缀为前缀!​​spring.integration.​

表 2.低基数键



名字





描述





​spring.integration.name​​ (必填)





消息处理程序组件的名称。





​spring.integration.type​​ (必填)





组件的类型 - “处理程序”。



制作人


观察消息生产者,例如渠道。


指标名称(由约定类定义)。类型。​​spring.integration.producer​​​​o.s.i.support.management.observation.DefaultMessageSenderObservationConvention​​​​timer​

指标名称(由约定类定义)。类型。​​spring.integration.producer.active​​​​o.s.i.support.management.observation.DefaultMessageSenderObservationConvention​​​​long task timer​

启动观察后添加的键值可能从 *.active 指标中丢失。

千分尺内部用于基础单元。但是,每个后端确定实际的基本单位。(即普罗米修斯使用秒)​​nanoseconds​

封闭类的完全限定名称。​​o.s.i.support.management.observation.IntegrationObservation​

所有标签必须以前缀为前缀!​​spring.integration.​

表 3.低基数键



名字





描述





​spring.integration.name​​ (必填)





消息处理程序组件的名称。





​spring.integration.type​​ (必填)





组件的类型 - “生产者”。



可观测性 - 跨度

您可以在下面找到该项目声明的所有跨度的列表。

网关跨度


对入站消息网关的观察。


跨度名称(由约定类定义)。​​spring.integration.gateway​​​​o.s.i.support.management.observation.DefaultMessageRequestReplyReceiverObservationConvention​

封闭类的完全限定名称。​​o.s.i.support.management.observation.IntegrationObservation​

所有标签必须以前缀为前缀!​​spring.integration.​

表 4.标签键

名字

描述

​spring.integration.name​​ (必填)

消息网关组件的名称。

​spring.integration.outcome​​ (必填)

请求/回复执行的结果。

​spring.integration.type​​ (必填)

组件的类型 - “网关”。

处理器跨度


消息处理程序的观察。


跨度名称(由约定类定义)。​​spring.integration.handler​​​​o.s.i.support.management.observation.DefaultMessageReceiverObservationConvention​

封闭类的完全限定名称。​​o.s.i.support.management.observation.IntegrationObservation​

所有标签必须以前缀为前缀!​​spring.integration.​

表 5.标签键

名字

描述

​spring.integration.name​​ (必填)

消息处理程序组件的名称。

​spring.integration.type​​ (必填)

组件的类型 - “处理程序”。

制片人跨度


观察消息生产者,例如渠道。


跨度名称(由约定类定义)。​​spring.integration.producer​​​​o.s.i.support.management.observation.DefaultMessageSenderObservationConvention​

封闭类的完全限定名称。​​o.s.i.support.management.observation.IntegrationObservation​

所有标签必须以前缀为前缀!​​spring.integration.​

表 6.标签键

名字

描述

​spring.integration.name​​ (必填)

消息处理程序组件的名称。

​spring.integration.type​​ (必填)

组件的类型 - “生产者”。

可观测性 - 约定

您可以在下面找到该项目声明的alland列表。​​GlobalObservabilityConventions​​​​ObservabilityConventions​

表 7.观察公约的实施

观察约定类名

适用的观察上下文类名

​o.s.i.support.management.observation.DefaultMessageReceiverObservationConvention​

​MessageReceiverContext​

​o.s.i.support.management.observation.DefaultMessageRequestReplyReceiverObservationConvention​

​MessageRequestReplyReceiverContext​

​o.s.i.support.management.observation.DefaultMessageSenderObservationConvention​

​MessageSenderContext​

观测传播

为了在一个跟踪中提供连接的跨度链,独立于消息流的性质,Spring 集成提供了一个实现。 这可以单独配置 onbean 或作为具有相关 bean 名称模式匹配的 a。 此拦截器的目标是独立于实现和性质,将生产者线程从生产者线程传播到使用者线程。 但是,A 被忽略,因为它的使用者直接在生产者线程上执行。​​ObservationPropagationChannelInterceptor​​​​MessageChannnel​​​​@GlobalChannelInterceptor​​​​MessageChannnel​​​​Observation​​​​MessageChannnel​​​​DirectChannel​

消息历史记录

消息传递体系结构的主要优点是松散耦合,这样参与的组件就不会保持对彼此的任何感知。 仅这一事实就使应用程序非常灵活,允许您在不影响流的其余部分的情况下更改组件、更改消息传递路由、更改消息使用样式(轮询与事件驱动)等。 然而,当出现问题时,这种不起眼的建筑风格可能会被证明是困难的。 调试时,您可能希望尽可能多地获取有关消息的信息(其来源、它遍历的通道以及其他详细信息)。

消息历史记录是其中一种模式,它为您提供了一个选项,以保持对消息路径的某种程度的感知,以便进行调试或维护审核跟踪。 Spring 集成提供了一种简单的方法来配置消息流以维护消息历史记录,方法是向消息添加标头并在每次消息通过跟踪组件时更新该标头。

消息历史记录配置

要启用消息历史记录,您只需在配置中定义元素 (or),如以下示例所示:​​message-history​​​​@EnableMessageHistory​

@Configuration
@EnableIntegration
@EnableMessageHistory

现在,每个命名组件(定义了“id”)都会被跟踪。 该框架在消息中设置“历史记录”标头。 其值 a。​​List<Properties>​

请考虑以下配置示例:

@MessagingGateway(defaultRequestChannel = "bridgeInChannel")
public interface SampleGateway {
...
}

@Bean
@Transformer(inputChannel = "enricherChannel", outputChannel="filterChannel")
HeaderEnricher sampleEnricher() {
HeaderEnricher enricher =
new HeaderEnricher(Collections.singletonMap("baz", new StaticHeaderValueMessageProcessor("baz")));
return enricher;
}

上述配置生成一个简单的消息历史记录结构,其输出类似于以下内容:

[{name=sampleGateway, type=gateway, timestamp=1283281668091},
{name=sampleEnricher, type=header-enricher, timestamp=1283281668094}]

要访问消息历史记录,您只需访问标头。 以下示例演示如何执行此操作:​​MessageHistory​

Iterator<Properties> historyIterator =
message.getHeaders().get(MessageHistory.HEADER_NAME, MessageHistory.class).iterator();
assertTrue(historyIterator.hasNext());
Properties gatewayHistory = historyIterator.next();
assertEquals("sampleGateway", gatewayHistory.get("name"));
assertTrue(historyIterator.hasNext());
Properties chainHistory = historyIterator.next();
assertEquals("sampleChain", chainHistory.get("name"));

您可能不希望跟踪所有组件。 要根据某些组件的名称将历史记录限制为某些组件,您可以提供属性并指定与要跟踪的组件匹配的组件名称和模式的逗号分隔列表。 以下示例演示如何执行此操作:​​tracked-components​

@Configuration
@EnableIntegration
@EnableMessageHistory("*Gateway", "sample*", "aName")

在前面的示例中,仅为以“网关”结尾、以“sample”开头或与名称“aName”完全匹配的组件维护消息历史记录。

此外,thebean 现在由 JMX MBean 公开(参见MBean 导出器),允许您在运行时更改模式。 但请注意,必须停止 Bean(关闭消息历史记录)才能更改模式。 此功能对于临时打开历史记录以分析系统可能很有用。 MBean 的对象名称为。​​MessageHistoryConfigurer​​​​IntegrationMBeanExporter​​​​<domain>:name=messageHistoryConfigurer,type=MessageHistoryConfigurer​

在应用程序上下文中,必须将一个(或)声明为组件跟踪配置的单一源。 不要使用通用的 Bean 定义。​​@EnableMessageHistory​​​​<message-history/>​​​​MessageHistoryConfigurer​

根据定义,消息历史记录标头是不可变的(不能重写历史记录)。 因此,在写入消息历史记录值时,组件要么创建新消息(当组件是源时),要么从请求消息中复制历史记录,对其进行修改并在回复消息上设置新列表。 在任一情况下,即使消息本身跨越线程边界,也可以追加值。 这意味着历史值可以大大简化异步消息流中的调试。

邮件存储

企业集成模式 (EIP) 一书确定了几种能够缓冲消息的模式。 例如,聚合器缓冲消息直到可以释放,并缓冲消息,直到使用者从该通道显式收到这些消息。 由于消息流中的任何点都可能发生故障,因此缓冲消息的 EIP 组件还会引入消息可能丢失的点。​​QueueChannel​

为了降低丢失消息的风险,EIP 定义了消息存储模式,该模式允许 EIP 组件将消息存储在某种类型的持久存储(如 RDBMS)中。

Spring 集成通过以下方式提供对消息存储库模式的支持:

  • 定义策略接口org.springframework.integration.store.MessageStore
  • 提供此接口的多种实现
  • 在能够缓冲消息的所有组件上公开 aattribute,以便您可以注入实现接口的任何实例。message-storeMessageStore

有关如何配置特定消息存储实现以及如何将实现注入特定缓冲组件的详细信息在整个手册中都有描述(请参阅特定组件,例如QueueChannel、聚合器、Delayer 等)。 以下一对示例演示如何为聚合器添加对 aand 的邮件存储的引用:​​MessageStore​​​​QueueChannel​

例 1.队列通道

<int:channel id="myQueueChannel">
<int:queue message-store="refToMessageStore"/>
<int:channel>

例 2.聚合

<int:aggregator … message-store="refToMessageStore"/>

默认情况下,消息通过使用 的实现存储在内存中。 这对于开发或简单的低容量环境来说可能很好,在这些环境中,非持久性消息的潜在丢失不是问题。 但是,典型的生产应用程序需要一个更可靠的选项,这不仅是为了降低消息丢失的风险,也是为了避免潜在的内存不足错误。 因此,我们还为各种数据存储提供了实现。 以下是支持的实现的完整列表:​​o.s.i.store.SimpleMessageStore​​​​MessageStore​​​​MessageStore​

  • Hazelcast 消息存储库:使用 Hazelcast 分布式缓存来存储消息
  • JDBC 消息存储库:使用 RDBMS 存储消息
  • Redis 消息存储库:使用 Redis 键/值数据存储存储消息
  • MongoDB消息存储:使用MongoDB文档存储来存储消息


但是,在使用持久实现时请注意一些限制。​​MessageStore​



消息数据(有效负载和标头)使用不同的序列化策略进行序列化和反序列化,具体取决于 的实现。 例如,使用时,默认情况下仅保留数据。 在这种情况下,在序列化发生之前将删除不可序列化的标头。 此外,请注意传输适配器(如 FTP、HTTP、JMS 等)注入的特定于协议的标头。 例如,将 HTTP 标头映射到消息标头,其中一个是不可序列化实例。 但是,您可以将自己的 andstrategy 接口实现注入到某些实现中(例如),以更改序列化和反序列化的行为。​​MessageStore​​​​JdbcMessageStore​​​​Serializable​​​​<http:inbound-channel-adapter/>​​​​ArrayList​​​​org.springframework.http.MediaType​​​​Serializer​​​​Deserializer​​​​MessageStore​​​​JdbcMessageStore​



请特别注意表示某些类型数据的标头。 例如,如果其中一个标头包含某个 Spring Bean 的实例,则在反序列化时,您最终可能会得到该 Bean 的不同实例,这直接影响框架创建的一些隐式标头(例如 asor)。 目前,它们不可序列化,但即使可序列化,反序列化通道也不会表示预期的实例。​​REPLY_CHANNEL​​​​ERROR_CHANNEL​



从 Spring 集成版本 3.0 开始,您可以使用配置的标头丰富器来解决此问题,以便在向 注册通道后将这些标头替换为名称。​​HeaderChannelRegistry​



此外,请考虑按如下方式配置消息流时会发生什么情况:网关→队列通道(由持久消息存储库提供支持)→服务激活器。 该网关创建一个临时回复通道,当服务激活器的轮询器从队列中读取时,该通道将丢失。 同样,您可以使用标头扩充器将标头替换为表示形式。​​String​



有关详细信息,请参阅标头扩充器。


Spring Integration 4.0引入了两个新接口:

  • ​ChannelMessageStore​​:实现特定于实例的操作QueueChannel
  • ​PriorityCapableChannelMessageStore​​:标记要用于实例的实现,并为持久化消息提供优先级顺序。MessageStorePriorityChannel

实际行为取决于实现。 该框架提供了以下实现,这些实现可以用作持久化:​​MessageStore​​​​QueueChannel​​​​PriorityChannel​

  • Redis 通道消息存储库
  • MongoDB通道消息存储
  • 支持消息通道

关于以下方面的注意事项​​SimpleMessageStore​


从版本 4.1 开始,呼叫时不再复制消息组。 对于大型消息组,这是一个严重的性能问题。 4.0.1 引入了一个布尔属性,允许您控制此行为。 当聚合器在内部使用时,此属性设置为提高性能。 默认情况下,它是现在。​​SimpleMessageStore​​​​getMessageGroup()​​​​copyOnGet​​​​false​​​​false​



在组件(如聚合器)之外访问组存储的用户现在可以直接引用聚合器正在使用的组,而不是副本。 在聚合器外部操纵组可能会导致不可预知的结果。



因此,不应执行此类操作或将属性设置为。​​copyOnGet​​​​true​


用​​MessageGroupFactory​

从版本 4.3 开始,可以注入一些实现的自定义策略来创建和自定义使用的实例。 这默认为 a,它基于 () 内部集合生成实例。 其他可能的选项是 and,其中最后一个可用于恢复以前的行为。 此外,该选项可用。 有关详细信息,请参阅下一节。 从版本 5.0.1 开始,当组中消息的顺序和唯一性无关紧要时,该选项也可用。​​MessageGroupStore​​​​MessageGroupFactory​​​​MessageGroup​​​​MessageGroupStore​​​​SimpleMessageGroupFactory​​​​SimpleMessageGroup​​​​GroupType.HASH_SET​​​​LinkedHashSet​​​​SYNCHRONISED_SET​​​​BLOCKING_QUEUE​​​​SimpleMessageGroup​​​​PERSISTENT​​​​LIST​

持久和延迟加载​​MessageGroupStore​

从版本 4.3 开始,所有持久实例都以延迟加载方式从存储中检索实例及其实例。 在大多数情况下,它对于关联实例很有用(请参阅聚合器和重新排序器),当它在每个关联操作上从存储加载整个存储时会增加开销。​​MessageGroupStore​​​​MessageGroup​​​​messages​​​​MessageHandler​​​​MessageGroup​

您可以使用该选项从配置中关闭延迟加载行为。​​AbstractMessageGroupStore.setLazyLoadMessageGroups(false)​

我们对MongoDB(MongoDB消息存储)和(聚合器)上的延迟加载的性能测试使用类似于以下内容的自定义:​​MessageStore​​​​<aggregator>​​​​release-strategy​

<int:aggregator input-channel="inputChannel"
output-channel="outputChannel"
message-store="mongoStore"
release-strategy-expression="size() == 1000"/>

对于 1000 条简单消息,它会生成类似于以下内容的结果:

...
StopWatch 'Lazy-Load Performance': running time (millis) = 38918
-----------------------------------------
ms % Task name
-----------------------------------------
02652 007% Lazy-Load
36266 093% Eager
...

但是,从版本 5.5 开始,所有持久实现都提供基于目标数据库流式处理 API 的合约。 当商店中的组非常大时,这可以提高资源利用率。 在框架内部,当它在启动时重新调度持久化的消息时,这个新 API 在Delayer(例如)中使用。 返回的必须在处理结束时关闭,例如通过自动关闭。 无论何时使用,它都会委托给。​​MessageGroupStore​​​​streamMessagesForGroup(Object groupId)​​​​Stream<Message<?>>​​​​try-with-resources​​​​PersistentMessageGroup​​​​streamMessages()​​​​MessageGroupStore.streamMessagesForGroup()​

消息组条件

从版本 5.5 开始,抽象提供了字符串选项。 此选项的值可以是以后出于任何原因可以解析的任何值,以便为组做出决定。 例如,afrom相关消息处理程序可以从组中引用此属性,而不是迭代组中的所有消息。 TheexexexposAPI。 为此,已将选项添加到。 将每条消息添加到组后,将针对该函数以及组的现有条件评估此函数。 实现可能决定返回新值、现有值或将目标条件重置为。 a的值可以是JSON,SpEL表达式,数字或任何可以序列化为字符串并在之后解析的内容。 例如,从文件聚合器组件中,将条件从消息的标头填充到组中,并从它将组大小与此条件中的值进行比较进行咨询。 这样,它就不会迭代组中的所有消息以查找带有标头的消息。 它还允许结束标记在所有其他记录之前到达聚合器;例如,在多线程环境中处理文件时。​​MessageGroup​​​​condition​​​​ReleaseStrategy​​​​MessageGroupStore​​​​setGroupCondition(Object groupId, String condition)​​​​setGroupConditionSupplier(BiFunction<Message<?>, String, String>)​​​​AbstractCorrelatingMessageHandler​​​​null​​​​condition​​​​FileMarkerReleaseStrategy​​​​FileHeaders.LINE_COUNT​​​​FileSplitter.FileMarker.Mark.END​​​​canRelease()​​​​FileSplitter.FileMarker.Mark.END​​​​FileHeaders.LINE_COUNT​

此外,为了便于配置,还引入了契约。 检查所提供的接口是否实现此接口并提取 afor 组条件评估逻辑。​​GroupConditionProvider​​​​AbstractCorrelatingMessageHandler​​​​ReleaseStrategy​​​​conditionSupplier​

元数据存储

许多外部系统、服务或资源不是事务性的(Twitter、RSS、文件系统等),并且无法将数据标记为已读。 此外,有时,您可能需要在某些集成解决方案中实现企业集成模式幂等接收器。 为了实现这个目标,并在下一次与外部系统交互之前存储端点的一些先前状态或处理下一条消息,Spring 集成提供了元数据存储组件作为具有通用键值协定的接口的实现。​​org.springframework.integration.metadata.MetadataStore​

元数据存储旨在存储各种类型的通用元数据(例如,已处理的最后一个源条目的发布日期),以帮助源适配器等组件处理重复项。 如果未直接为组件提供对 a 的引用,则查找元数据存储的算法如下:首先,在应用程序上下文中查找具有 aID 的 Bean。 如果找到,请使用它。 否则,请创建一个新的实例,这是一个内存中实现,仅在当前正在运行的应用程序上下文的生命周期内保留元数据。 这意味着,重新启动后,您最终可能会得到重复的条目。​​MetadataStore​​​​metadataStore​​​​SimpleMetadataStore​

如果需要在应用程序上下文重新启动之间保留元数据,框架将提供以下持久性:​​MetadataStores​

  • ​PropertiesPersistingMetadataStore​
  • 榛子广播元数据存储
  • JDBC 元数据存储
  • MongoDB 元数据存储
  • 红地元数据存储
  • 动物园管理员元数据存储

Theis 由一个属性文件和一个PropertiesPersister 支持。​​PropertiesPersistingMetadataStore​

默认情况下,它仅保留应用程序上下文正常关闭时的状态。 它实现以便您可以通过调用随意保留状态。 下面的示例演示如何使用 XML 配置“属性持久元数据存储”:​​Flushable​​​​flush()​

<bean id="metadataStore"
class="org.springframework.integration.metadata.PropertiesPersistingMetadataStore"/>

或者,您可以提供自己的接口实现(例如),并在应用程序上下文中将其配置为 Bean。​​MetadataStore​​​​JdbcMetadataStore​

从版本 4.0 开始,,,并实施。 它们提供原子更新,可以跨多个组件或应用程序实例使用。​​SimpleMetadataStore​​​​PropertiesPersistingMetadataStore​​​​RedisMetadataStore​​​​ConcurrentMetadataStore​

幂等接收器和元数据存储

当需要筛选传入消息(如果已处理)时,元数据存储对于实现 EIP幂等接收方模式非常有用,您可以丢弃它或在丢弃时执行一些其他逻辑。 以下配置显示了如何执行此操作的示例:

<int:filter input-channel="serviceChannel"
output-channel="idempotentServiceChannel"
discard-channel="discardChannel"
expression="@metadataStore.get(headers.businessKey) == null"/>

<int:publish-subscribe-channel id="idempotentServiceChannel"/>

<int:outbound-channel-adapter channel="idempotentServiceChannel"
expression="@metadataStore.put(headers.businessKey, '')"/>

<int:service-activator input-channel="idempotentServiceChannel" ref="service"/>

幂等条目可能是到期日期,在此日期之后,某些计划的收割者应从元数据存储中删除该条目。​​value​

另请参阅幂等接收器企业集成模式。

​MetadataStoreListener​

某些元数据存储(目前仅 zookeeper)支持注册侦听器以在项目更改时接收事件,如以下示例所示:

public interface MetadataStoreListener {

void onAdd(String key, String value);

void onRemove(String key, String oldValue);

void onUpdate(String key, String newValue);
}

有关更多信息,请参阅Javadoc。 如果您只对事件的子集感兴趣,则可以对其进行子类化。​​MetadataStoreListenerAdapter​

控制总线

企业集成模式 (EIP) 一书中所述,控制总线背后的思想是,与用于“应用程序级”消息传递相同的消息传递系统可用于监视和管理框架中的组件。 在 Spring 集成中,我们基于上述适配器进行构建,以便您可以发送消息作为调用公开操作的一种方式。

以下示例演示如何使用 XML 配置控制总线:

<int:control-bus input-channel="operationChannel"/>

控制总线有一个输入通道,可以访问该通道以在应用程序上下文中调用对 Bean 的操作。 它还具有服务激活终结点的所有通用属性。 例如,如果操作的结果具有要发送到下游通道的返回值,则可以指定输出通道。

控制总线在输入通道上运行消息作为 Spring 表达式语言 (SpEL) 表达式。 它接受一条消息,将正文编译为表达式,添加一些上下文,然后运行它。 默认上下文支持任何已批注 withor 的方法。 它还支持 Spring 接口上的方法(以及从 5.2 版开始的扩展),并且它支持用于配置 Spring 的几个 Spring'sand实现的方法。 确保自己的方法可用于控制总线的最简单方法是使用理论注释。 由于这些注释还用于向 JMX MBean 注册表公开方法,因此它们提供了一个方便的副产品:通常,您希望向控制总线公开的相同类型的操作对于通过 JMX 公开是合理的)。 应用程序上下文中任何特定实例的解析都是通过典型的 SpEL 语法实现的。 为此,请提供 Bean 名称以及 Bean 的 SpEL 前缀 ()。 例如,要在 Spring Bean 上执行方法,客户端可以向操作通道发送消息,如下所示:​​@ManagedAttribute​​​​@ManagedOperation​​​​Lifecycle​​​​Pausable​​​​TaskExecutor​​​​TaskScheduler​​​​@ManagedAttribute​​​​@ManagedOperation​​​​@​

Message operation = MessageBuilder.withPayload("@myServiceBean.shutdown()").build();
operationChannel.send(operation)

表达式上下文的根是 the本身,因此您还可以访问表达式中的 theandas 变量。 这与 Spring 集成端点中的所有其他表达式支持一致。​​Message​​​​payload​​​​headers​

使用 Java 注释,您可以按如下方式配置控制总线:

@Bean
@ServiceActivator(inputChannel = "operationChannel")
public ExpressionControlBusFactoryBean controlBus() {
return new ExpressionControlBusFactoryBean();
}

同样,您可以按如下方式配置 Java DSL 流定义:

@Bean
public IntegrationFlow controlBusFlow() {
return IntegrationFlow.from("controlBus")
.controlBus()
.get();
}

如果您希望将 lambda 与自动创建一起使用,则可以按如下方式创建控制总线:​​DirectChannel​

@Bean
public IntegrationFlow controlBus() {
return IntegrationFlowDefinition::controlBus;
}

在本例中,通道被命名。​​controlBus.input​

有序关机

如“MBean 导出器”中所述,MBean 导出器提供了一个调用的 JMX 操作,该操作用于有序地停止应用程序。 该操作具有单个参数。 该参数指示操作等待多长时间(以毫秒为单位)以允许动态消息完成。 该操作的工作方式如下:​​stopActiveComponents​​​​Long​

  1. 调用所有实现的 bean。beforeShutdown()OrderlyShutdownCapable

    这样做可以让这些组件为关闭做好准备。 实现此接口的组件及其对此调用执行的操作的示例包括停止其侦听器容器的 JMS 和 AMQP 消息驱动的适配器、停止接受新连接(同时保持现有连接打开)的 TCP 服务器连接工厂、丢弃(记录)收到的任何新消息的 TCP 入站终结点,以及为任何新请求返回的 HTTP 入站终结点。503 - Service Unavailable
  2. 停止任何活动通道,例如 JMS 或 AMQP 支持的通道。
  3. 停止所有实例。MessageSource
  4. 停止所有入站(不是)。MessageProducerOrderlyShutdownCapable
  5. 等待剩余时间,由传递给操作的参数值定义。Long

    这样做可以让任何飞行中的消息完成其旅程。 因此,在调用此操作时选择适当的超时非常重要。
  6. 卡隆所有组件。afterShutdown()OrderlyShutdownCapable

    这样做可以让这些组件执行最终的关闭任务(例如,关闭所有打开的套接字)。

如有序关闭托管操作中所述,可以使用 JMX 调用此操作。 如果您希望以编程方式调用该方法,则需要注入或以其他方式获取对该方法的引用。 如果在定义中提供了 noattribute,则 bean 具有生成的名称。 此名称包含一个随机组件,以避免在同一 JVM 中存在多个 Spring 集成上下文时发生冲突()。​​IntegrationMBeanExporter​​​​id​​​​<int-jmx:mbean-export/>​​​​ObjectName​​​​MBeanServer​

因此,如果您希望以编程方式调用该方法,我们建议您为导出器提供属性,以便您可以在应用程序上下文中轻松访问它。​​id​

最后,可以使用元素调用该操作。 有关详细信息,请参阅监视 Spring 集成示例应用程序。​​<control-bus>​

前面描述的算法在版本 4.1 中得到了改进。 以前,所有任务执行程序和计划程序都已停止。 这可能会导致实例中的中流消息保留。 现在,关闭使轮询器保持运行,以便将这些消息排出并进行处理。​​QueueChannel​

集成图

从版本 4.3 开始,Spring 集成提供了对应用程序的运行时对象模型的访问,该模型可以选择包含组件度量。 它以图形形式公开,可用于可视化集成应用程序的当前状态。 Thepackage 包含收集、构建 Spring 集成组件的运行时状态并将其呈现为单个树状对象所需的所有类。 应该声明为一个 bean 来构建、检索和刷新对象。 结果对象可以序列化为任何格式,尽管 JSON 在客户端解析和表示非常灵活且方便。 仅包含默认组件的 Spring 集成应用程序将公开一个图形,如下所示:​​o.s.i.support.management.graph​​​​Graph​​​​IntegrationGraphServer​​​​Graph​​​​Graph​

{
"contentDescriptor" : {
"providerVersion" : "6.0.0",
"providerFormatVersion" : 1.2,
"provider" : "spring-integration",
"name" : "myAppName:1.0"
},
"nodes" : [ {
"nodeId" : 1,
"componentType" : "null-channel",
"integrationPatternType" : "null_channel",
"integrationPatternCategory" : "messaging_channel",
"properties" : { },
"sendTimers" : {
"successes" : {
"count" : 1,
"mean" : 0.0,
"max" : 0.0
},
"failures" : {
"count" : 0,
"mean" : 0.0,
"max" : 0.0
}
},
"receiveCounters" : {
"successes" : 0,
"failures" : 0
},
"name" : "nullChannel"
}, {
"nodeId" : 2,
"componentType" : "publish-subscribe-channel",
"integrationPatternType" : "publish_subscribe_channel",
"integrationPatternCategory" : "messaging_channel",
"properties" : { },
"sendTimers" : {
"successes" : {
"count" : 1,
"mean" : 7.807002,
"max" : 7.807002
},
"failures" : {
"count" : 0,
"mean" : 0.0,
"max" : 0.0
}
},
"name" : "errorChannel"
}, {
"nodeId" : 3,
"componentType" : "logging-channel-adapter",
"integrationPatternType" : "outbound_channel_adapter",
"integrationPatternCategory" : "messaging_endpoint",
"properties" : { },
"output" : null,
"input" : "errorChannel",
"sendTimers" : {
"successes" : {
"count" : 1,
"mean" : 6.742722,
"max" : 6.742722
},
"failures" : {
"count" : 0,
"mean" : 0.0,
"max" : 0.0
}
},
"name" : "errorLogger"
} ],
"links" : [ {
"from" : 2,
"to" : 3,
"type" : "input"
} ]
}

版本 5.2 弃用了旧指标,转而使用千分尺计,如指标管理中所述。 旧衡量指标已在版本 5.4 中移除,将不再显示在图表中。

在前面的示例中,图形由三个顶级元素组成。

元素包含有关提供数据的应用程序的一般信息。 可以在 Bean 上或在应用程序上下文环境属性中自定义。 框架提供了其他属性,使您可以将类似的模型与其他源区分开来。​​contentDescriptor​​​​name​​​​IntegrationGraphServer​​​​spring.application.name​

图形元素表示来自图形元素的节点之间的连接,因此表示源 Spring 集成应用程序中的集成组件之间的连接。 例如,从 ato anwith someor from anto a。 为方便起见并让您确定链接的用途,该模型包含属性。 可能的类型有:​​links​​​​nodes​​​​MessageChannel​​​​EventDrivenConsumer​​​​MessageHandler​​​​AbstractReplyProducingMessageHandler​​​​MessageChannel​​​​type​

  • ​input​​:标识从端点的方向,或属性MessageChannelinputChannelrequestChannel
  • ​output​​:从,,或到通过无属性的方向MessageHandlerMessageProducerSourcePollingChannelAdapterMessageChanneloutputChannelreplyChannel
  • ​error​​: Fromonororto thethrough anproperty;MessageHandlerPollingConsumerMessageProducerSourcePollingChannelAdapterMessageChannelerrorChannel
  • ​discard​​:从(如)到通过属性。DiscardingMessageHandlerMessageFilterMessageChannelerrorChannel
  • ​route​​:从(如)到。 类似于但在运行时确定。 可能是配置的通道映射或动态解析的通道。 为此,路由器通常最多只保留 100 个动态路由,但您可以通过设置属性来修改此值。AbstractMappingMessageRouterHeaderValueRouterMessageChanneloutputdynamicChannelLimit

可视化工具可以使用此元素中的信息来呈现来自图形元素的节点之间的连接,其中 andnumbers 表示来自链接节点属性的值。 例如,元素可用于确定目标节点的正确性。​​nodes​​​​from​​​​to​​​​nodeId​​​​link​​​​port​

以下“文本图像”显示了类型之间的关系:

+---(discard)
|
+----o----+
| |
| |
| |
(input)--o o---(output)
| |
| |
| |
+----o----+
|
+---(error)

Thegraph 元素可能是最有趣的,因为它的元素不仅包含运行时组件及其实例和值,还可以选择性地包含组件公开的指标。 节点元素包含各种属性,这些属性通常是不言自明的。 例如,基于表达式的组件包括包含组件的主表达式字符串的属性。 要启用指标,请添加 anto aclass 或将 anelement 添加到 XML 配置中。 有关完整信息,请参阅指标和管理。​​nodes​​​​componentType​​​​name​​​​expression​​​​@EnableIntegrationManagement​​​​@Configuration​​​​<int:management/>​

其中提供了一个唯一的增量标识符,可让您将一个组件与另一个组件区分开来。 它还在元素中用于表示此组件与其他组件的关系(连接)(如果有)。 和属性是用于,,,或的和属性。 有关详细信息,请参阅下一节。​​nodeId​​​​links​​​​input​​​​output​​​​inputChannel​​​​outputChannel​​​​AbstractEndpoint​​​​MessageHandler​​​​SourcePollingChannelAdapter​​​​MessageProducerSupport​

从版本 5.1 开始,接受 afor 填充的附加属性在 for 特定。 例如,您可以将 andand 属性公开到目标图中:​​IntegrationGraphServer​​​​Function<NamedComponent, Map<String, Object>> additionalPropertiesCallback​​​​IntegrationNode​​​​NamedComponent​​​​SmartLifecycle​​​​autoStartup​​​​running​

server.setAdditionalPropertiesCallback(namedComponent -> {
Map<String, Object> properties = null;
if (namedComponent instanceof SmartLifecycle) {
SmartLifecycle smartLifecycle = (SmartLifecycle) namedComponent;
properties = new HashMap<>();
properties.put("auto-startup", smartLifecycle.isAutoStartup());
properties.put("running", smartLifecycle.isRunning());
}
return properties;
});

图形运行时模型

弹簧集成组件具有不同程度的复杂性。 例如,任何轮询也有 aand ato,可以定期从源数据发送消息。 其他组件可能是中间件请求-回复组件(例如),具有 消耗订阅(或轮询)the() 的消息,以及 a() 生成要发送到下游的回复消息。 同时,任何实现(例如)包装一些源协议侦听逻辑并向其发送消息。​​MessageSource​​​​SourcePollingChannelAdapter​​​​MessageChannel​​​​JmsOutboundGateway​​​​AbstractEndpoint​​​​requestChannel​​​​input​​​​replyChannel​​​​output​​​​MessageProducerSupport​​​​ApplicationEventListeningMessageProducer​​​​outputChannel​

在图中,Spring 集成组件使用类层次结构表示,您可以在包中找到该层次结构。 例如,您可以使用 for the(因为它有 aoption),并且在使用 a 从 aby 消费时会产生错误。 另一个例子是——当使用 an 订阅 aby 时。​​IntegrationNode​​​​o.s.i.support.management.graph​​​​ErrorCapableDiscardingMessageHandlerNode​​​​AggregatingMessageHandler​​​​discardChannel​​​​PollableChannel​​​​PollingConsumer​​​​CompositeMessageHandlerNode​​​​MessageHandlerChain​​​​SubscribableChannel​​​​EventDrivenConsumer​

(请参阅消息网关​)为其每个方法提供节点,其中属性基于网关的 Bean 名称和短方法签名。 请考虑以下网关示例:​​@MessagingGateway​​​​name​

@MessagingGateway(defaultRequestChannel = "four")
public interface Gate {

void foo(String foo);

void foo(Integer foo);

void bar(String bar);

}

上述网关生成类似于以下内容的节点:

{
"nodeId" : 10,
"name" : "gate.bar(class java.lang.String)",
"stats" : null,
"componentType" : "gateway",
"integrationPatternType" : "gateway",
"integrationPatternCategory" : "messaging_endpoint",
"output" : "four",
"errors" : null
},
{
"nodeId" : 11,
"name" : "gate.foo(class java.lang.String)",
"stats" : null,
"componentType" : "gateway",
"integrationPatternType" : "gateway",
"integrationPatternCategory" : "messaging_endpoint",
"output" : "four",
"errors" : null
},
{
"nodeId" : 12,
"name" : "gate.foo(class java.lang.Integer)",
"stats" : null,
"componentType" : "gateway",
"integrationPatternType" : "gateway",
"integrationPatternCategory" : "messaging_endpoint",
"output" : "four",
"errors" : null
}

您可以使用此层次结构在客户端解析图形模型,以及了解一般的 Spring 集成运行时行为。 另请参阅编程提示和技巧以获取更多信息。​​IntegrationNode​

版本 5.3 引入了抽象和所有表示企业集成模式 (EIP) 的开箱即用组件,实现了此抽象并提供 anenum 值。 此信息对于目标应用程序中的某些分类逻辑非常有用,或者,在图形节点中公开时,UI 可以使用它来确定如何绘制组件。​​IntegrationPattern​​​​IntegrationPatternType​

集成图控制器

如果你的应用程序是基于Web的(或者建立在Spring Boot之上,带有一个嵌入式的Web容器),并且Spring Integration HTTP或WebFlux模块(分别参见HTTP支持和WebFlux支持)存在于类路径上,你可以使用a将功能公开为REST服务。 为此,在 HTTP 模块中提供了 theandclass 注释和 XML 元素。 与注释(或对于XML定义)一起,此配置寄存器在注释或元素上配置扫描的位置。 默认路径为。​​IntegrationGraphController​​​​IntegrationGraphServer​​​​@EnableIntegrationGraphController​​​​@Configuration​​​​<int-http:graph-controller/>​​​​@EnableWebMvc​​​​<mvc:annotation-driven/>​​​​IntegrationGraphController​​​​@RestController​​​​@RequestMapping.path​​​​@EnableIntegrationGraphController​​​​<int-http:graph-controller/>​​​​/integration​

提供以下服务:​​IntegrationGraphController​​​​@RestController​

  • ​@GetMapping(name = "getGraph")​​:检索自上次刷新以来 Spring 集成组件的状态。 Theis 作为 REST 服务的 ais 返回。IntegrationGraphServero.s.i.support.management.graph.Graph@ResponseBody
  • ​@GetMapping(path = "/refresh", name = "refreshGraph")​​:刷新实际运行时状态的当前值,并将其作为 REST 响应返回。 无需刷新指标的图表。 检索图形时会实时提供它们。 如果自上次检索图形以来已修改应用程序上下文,则可以调用刷新。 在这种情况下,图形将完全重建。Graph

您可以使用 Spring 安全性和 Spring MVC 项目提供的标准配置选项和组件来设置安全性和跨源限制。 以下示例实现了这些目标:​​IntegrationGraphController​

<mvc:annotation-driven />

<mvc:cors>
<mvc:mapping path="/myIntegration/**"
allowed-origins="http://localhost:9090"
allowed-methods="GET" />
</mvc:cors>

<security:http>
<security:intercept-url pattern="/myIntegration/**" access="ROLE_ADMIN" />
</security:http>


<int-http:graph-controller path="/myIntegration" />

以下示例显示了如何使用 Java 配置执行相同的操作:

@Configuration
@EnableWebMvc // or @EnableWebFlux
@EnableWebSecurity // or @EnableWebFluxSecurity
@EnableIntegration
@EnableIntegrationGraphController(path = "/testIntegration", allowedOrigins="http://localhost:9090")
public class IntegrationConfiguration extends WebSecurityConfigurerAdapter {

@Override
protected void configure(HttpSecurity http) throws Exception {
http
.authorizeRequests()
.antMatchers("/testIntegration/**").hasRole("ADMIN")
// ...
.formLogin();
}

//...

}

请注意,为方便起见,theannotation 提供了属性。 这提供了对 的访问。 为了更复杂,您可以使用标准的Spring MVC机制来配置CORS映射。​​@EnableIntegrationGraphController​​​​allowedOrigins​​​​GET​​​​path​

举报

相关推荐

0 条评论