0
点赞
收藏
分享

微信扫一扫

Spring Integration Java 配置和 DSL

Spring Integration Java 配置和 DSL _API

Spring Integration Java 配置和 DSL 提供了一组方便的构建器和一个流畅的 API,允许您配置 Spring Integration 消息流。​​@Configuration​

(另请参阅Kotlin DSL。

(另请参阅Groovy DSL。

用于Spring Integration的Java DSL本质上是Spring Integration的门面。 DSL提供了一种简单的方法,通过将fluentpattern与Spring Framework和Spring Integration中的现有Java配置一起使用,将Spring Integration Message Flow嵌入到应用程序中。 我们还使用并支持lambdas(在Java 8中可用)来进一步简化Java配置。​​Builder​

咖啡馆提供了使用DSL的一个很好的例子。

DSL由thefluent API提供(参见)。 这将生成组件,该组件应注册为Spring bean(通过使用注释)。 构建器模式用于将任意复杂的结构表示为可以接受 lambda 作为参数的方法层次结构。​​IntegrationFlow​​​​IntegrationFlowBuilder​​​​IntegrationFlow​​​​@Bean​

Theonly 收集 bean 中的集成组件(实例、实例等),以便通过应用程序上下文进一步解析和注册具体的 bean。​​IntegrationFlowBuilder​​​​MessageChannel​​​​AbstractEndpoint​​​​IntegrationFlow​​​​IntegrationFlowBeanPostProcessor​

Java DSL直接使用Spring Integration类,并绕过任何XML生成和解析。 但是,DSL在XML之上提供的不仅仅是语法糖。 其最引人注目的功能之一是能够定义内联 lambda 来实现终端节点逻辑,无需外部类来实现自定义逻辑。 从某种意义上说,Spring Integration对Spring Expression Language(SpEL)和内联脚本的支持解决了这个问题,但lambda更容易,功能更强大。

以下示例显示了如何使用 Java 配置进行 Spring 集成:

@Configuration
@EnableIntegration
public class MyConfiguration {

@Bean
public AtomicInteger integerSource() {
return new AtomicInteger();
}

@Bean
public IntegrationFlow myFlow() {
return IntegrationFlow.fromSupplier(integerSource()::getAndIncrement,
c -> c.poller(Pollers.fixedRate(100)))
.channel("inputChannel")
.filter((Integer p) -> p > 0)
.transform(Object::toString)
.channel(MessageChannels.queue())
.get();
}
}

前面的配置示例的结果是,它在启动后创建 Spring 集成端点和消息通道。 Java 配置既可用于替换 XML 配置,也可用于扩充 XML 配置。 您无需替换所有现有 XML 配置即可使用 Java 配置。​​ApplicationContext​

DSL 基础知识

该包包含前面提到的 API 和许多实现,它们也是构建器,并提供流畅的 API 来配置具体的端点。 该基础架构为基于消息的应用程序(如通道、端点、轮询器和通道拦截器)提供通用的企业集成模式 (EIP)。​​org.springframework.integration.dsl​​​​IntegrationFlowBuilder​​​​IntegrationComponentSpec​​​​IntegrationFlowBuilder​

终结点在 DSL 中表示为谓词,以提高可读性。 以下列表包括常见的 DSL 方法名称和关联的 EIP 终结点:

  • 转换→Transformer
  • 过滤器→Filter
  • 处理→ServiceActivator
  • 拆分→Splitter
  • 聚合→Aggregator
  • 路线→Router
  • 桥→Bridge

从概念上讲,集成过程是通过将这些端点组合到一个或多个消息流中来构造的。 请注意,EIP 并未正式定义术语“消息流”,但将其视为使用众所周知的消息传递模式的工作单元很有用。 DSL 提供了一个组件来定义通道和它们之间的端点的组合,但现在只扮演配置角色以在应用程序上下文中填充真正的 bean,并且在运行时不使用。 但是,bean for可以自动连线为 ato 控制和整个流程,该流程被委托给与此相关的所有 Spring 集成组件。 以下示例使用流利的 API 通过 EIP 方法定义 anbean:​​IntegrationFlow​​​​IntegrationFlow​​​​IntegrationFlow​​​​Lifecycle​​​​start()​​​​stop()​​​​IntegrationFlow​​​​IntegrationFlow​​​​IntegrationFlow​​​​IntegrationFlowBuilder​

@Bean
public IntegrationFlow integerFlow() {
return IntegrationFlow.from("input")
.<String, Integer>transform(Integer::parseInt)
.get();
}

该方法接受 lambda 作为终端节点参数,以对消息有效负载进行操作。 这种方法的真正论据是实例。 因此,任何提供的变压器(、和其他)都可以在这里使用。​​transform​​​​GenericTransformer<S, T>​​​​ObjectToJsonTransformer​​​​FileToStringTransformer​

在幕后,分别识别和它的端点。 再举一个例子:​​IntegrationFlowBuilder​​​​MessageHandler​​​​MessageTransformingHandler​​​​ConsumerEndpointFactoryBean​

@Bean
public IntegrationFlow myFlow() {
return IntegrationFlow.from("input")
.filter("World"::equals)
.transform("Hello "::concat)
.handle(System.out::println)
.get();
}

前面的示例组成了 的序列。 流程是“单向”的。 也就是说,它不提供回复消息,而只是将有效负载打印到 STDOUT。 终结点使用直接通道自动连接在一起。​​Filter → Transformer → Service Activator​

Lambdas and Arguments​​Message<?>​


在 EIP 方法中使用 lambda 时,“input”参数通常是消息负载。 如果要访问整个消息,请使用采用 aas 第一个参数的重载方法之一。 例如,这将不起作用:​​Class<?>​

.<Message<?>, Foo>transform(m -> newFooFromMessage(m))

这将在运行时失败,因为 lambda 不保留参数类型,框架将尝试将有效负载强制转换为 a。​​ClassCastException​​​​Message<?>​


相反,请使用:

.(Message.class, m -> newFooFromMessage(m))


Bean 定义覆盖


Java DSL 可以为流定义中内联定义的对象注册 bean,也可以重用现有的注入的 bean。 如果为内联对象和现有 Bean 定义定义了相同的 Bean 名称,则抛出 ais 表示此类配置是错误的。 但是,当您处理 bean 时,无法从集成流处理器中检测到现有的 bean 定义,因为每次我们从 abean 调用 abean 时,我们都会得到一个新实例。 这样,在 theas 中使用提供的实例,无需任何 Bean 注册和对现有 bean 定义进行任何可能的检查。 但是,如果此对象具有显式 Bean 定义,则为此对象调用此名称是 inscope。​​BeanDefinitionOverrideException​​​​prototype​​​​prototype​​​​BeanFactory​​​​IntegrationFlow​​​​prototype​​​​BeanFactory.initializeBean()​​​​id​​​​prototype​


消息通道

除了使用 EIP 方法之外,Java DSL 还提供了流畅的 API 来配置实例。 为此,提供了建造者工厂。 以下示例演示如何使用它:​​IntegrationFlowBuilder​​​​MessageChannel​​​​MessageChannels​

@Bean
public MessageChannel priorityChannel() {
return MessageChannels.priority(this.mongoDbChannelMessageStore, "priorityGroup")
.interceptor(wireTap())
.get();
}

同一生成器工厂可用于 EIP 方法从到连接端点,类似于 XML 配置中的连接/对。 缺省情况下,端点与 Bean 名称基于以下模式的实例相连: 此规则也适用于内联生成器工厂使用生成的未命名通道。 但是,所有方法都有一个知道的变体,您可以使用它来设置实例的 Bean 名称。 引用,可用作 Bean 方法调用。 以下示例显示了使用 EIP 方法的可能方法:​​MessageChannels​​​​channel()​​​​IntegrationFlowBuilder​​​​input-channel​​​​output-channel​​​​DirectChannel​​​​[IntegrationFlow.beanName].channel#[channelNameIndex]​​​​MessageChannels​​​​MessageChannels​​​​channelId​​​​MessageChannel​​​​MessageChannel​​​​beanName​​​​channel()​

@Bean
public MessageChannel queueChannel() {
return MessageChannels.queue().get();
}

@Bean
public MessageChannel publishSubscribe() {
return MessageChannels.publishSubscribe().get();
}

@Bean
public IntegrationFlow channelFlow() {
return IntegrationFlow.from("input")
.fixedSubscriberChannel()
.channel("queueChannel")
.channel(publishSubscribe())
.channel(MessageChannels.executor("executorChannel", this.taskExecutor))
.channel("output")
.get();
}

  • ​from("input")​​意思是“'查找并使用带有”输入“ID的ID,或创建一个”。MessageChannel
  • ​fixedSubscriberChannel()​​生成一个实例并使用名称 of 注册它。FixedSubscriberChannelchannelFlow.channel#0
  • ​channel("queueChannel")​​工作方式相同,但使用现有的bean。queueChannel
  • ​channel(publishSubscribe())​​是 Bean 方法引用。
  • ​channel(MessageChannels.executor("executorChannel", this.taskExecutor))​​是 那 暴露给 并注册为。IntegrationFlowBuilderIntegrationComponentSpecExecutorChannelexecutorChannel
  • ​channel("output")​​注册 thebean 作为它的名字,只要不存在具有这个名字的 bean。DirectChanneloutput

注意:前面的定义是有效的,其所有通道都应用于具有实例的端点。​​IntegrationFlow​​​​BridgeHandler​

请注意,从不同的实例中使用相同的内联通道定义。 即使 DSL 解析器将不存在的对象注册为 bean,它也无法从不同的容器中确定相同的对象 ()。 以下示例是错误的:​​MessageChannels​​​​IntegrationFlow​​​​MessageChannel​​​​IntegrationFlow​

@Bean
public IntegrationFlow startFlow() {
return IntegrationFlow.from("input")
.transform(...)
.channel(MessageChannels.queue("queueChannel"))
.get();
}

@Bean
public IntegrationFlow endFlow() {
return IntegrationFlow.from(MessageChannels.queue("queueChannel"))
.handle(...)
.get();
}

该错误示例的结果是以下异常:

Caused by: java.lang.IllegalStateException:
Could not register object [queueChannel] under bean name 'queueChannel':
there is already object [queueChannel] bound
at o.s.b.f.s.DefaultSingletonBeanRegistry.registerSingleton(DefaultSingletonBeanRegistry.java:129)

要使其正常工作,您需要声明该通道并使用来自不同实例的 Bean 方法。​​@Bean​​​​IntegrationFlow​

轮询者

Spring Integration 还提供了一个流畅的 API,可让您配置实现。 您可以使用构建器工厂来配置常见的 Bean 定义或从 EIP 方法创建的 Bean 定义,如以下示例所示:​​PollerMetadata​​​​AbstractPollingEndpoint​​​​Pollers​​​​IntegrationFlowBuilder​

@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerSpec poller() {
return Pollers.fixedRate(500)
.errorChannel("myErrors");
}

有关更多信息,请参阅 Javadoc 中的Pollers和PollerSpec。

如果使用 DSL 构造 aas a,请不要在 Bean 定义中调用该方法。 从规范生成对象并初始化其所有属性。​​PollerSpec​​​​@Bean​​​​get()​​​​PollerSpec​​​​FactoryBean​​​​PollerMetadata​

端点​​reactive()​

从版本 5.5 开始,提供了带有可选定制器的配置属性。 此选项将目标终端节点配置为实例,与转换为 avia 的输入通道类型无关。 提供的函数从运算符用于自定义(,等)来自输入通道的反应性流源。​​ConsumerEndpointSpec​​​​reactive()​​​​Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>>​​​​ReactiveStreamsConsumer​​​​Flux​​​​IntegrationReactiveUtils.messageChannelToFlux()​​​​Flux.transform()​​​​publishOn()​​​​log()​​​​doOnNext()​

下面的示例演示如何将发布线程从独立于最终订阅者和生产者的输入通道更改为该通道:​​DirectChannel​

@Bean
public IntegrationFlow reactiveEndpointFlow() {
return IntegrationFlow
.from("inputChannel")
.<String, Integer>transform(Integer::parseInt,
e -> e.reactive(flux -> flux.publishOn(Schedulers.parallel())))
.get();
}

有关详细信息,请参阅反应式流支持。

DSL 和终结点配置

AllEIP 方法有一个变体,它应用 lambda 参数为实例:,,, 和其他参数提供选项。 它们中的每一个都有泛型参数,因此它允许您配置终结点,甚至在上下文中配置它,如以下示例所示:​​IntegrationFlowBuilder​​​​AbstractEndpoint​​​​SmartLifecycle​​​​PollerMetadata​​​​request-handler-advice-chain​​​​MessageHandler​

@Bean
public IntegrationFlow flow2() {
return IntegrationFlow.from(this.inputChannel)
.transform(new PayloadSerializingTransformer(),
c -> c.autoStartup(false).id("payloadSerializingTransformer"))
.transform((Integer p) -> p * 2, c -> c.advice(this.expressionAdvice()))
.get();
}

此外,还提供了一种方法,允许您使用给定的 Bean 名称(而不是生成的 Bean 名称)注册端点 Bean。​​EndpointSpec​​​​id()​

如果 theis 作为 bean 引用,那么如果该方法存在于 DSL 定义中,则任何现有配置都将被覆盖:​​MessageHandler​​​​adviceChain​​​​.advice()​

@Bean
public TcpOutboundGateway tcpOut() {
TcpOutboundGateway gateway = new TcpOutboundGateway();
gateway.setConnectionFactory(cf());
gateway.setAdviceChain(Collections.singletonList(fooAdvice()));
return gateway;
}

@Bean
public IntegrationFlow clientTcpFlow() {
return f -> f
.handle(tcpOut(), e -> e.advice(testAdvice()))
.transform(Transformers.objectToString());
}

它们没有合并,在这种情况下只使用 thebean。​​testAdvice()​

变形金刚

DSL API 提供了一个方便、流畅的工厂,可用作 EIP 方法中的内联目标对象定义。 以下示例演示如何使用它:​​Transformers​​​​.transform()​

@Bean
public IntegrationFlow transformFlow() {
return IntegrationFlow.from("input")
.transform(Transformers.fromJson(MyPojo.class))
.transform(Transformers.serializer())
.get();
}

它避免了使用资源库进行不方便的编码,并使流定义更加简单。 请注意,您可以使用 将目标实例声明为实例,并再次将它们从定义用作 Bean 方法。 尽管如此,DSL 解析器会处理内联对象的 bean 声明(如果它们尚未定义为 bean)。​​Transformers​​​​Transformer​​​​@Bean​​​​IntegrationFlow​

请参阅 Javadoc 中的转换器,了解更多信息和支持的工厂方法。

另请参阅Lambda 和消息<?>参数。

入站通道适配器

通常,消息流从入站通道适配器(例如)开始。 适配器配置为 ,并要求 ato 定期生成消息。 Java DSL也允许从a开始。 为此,Fluent API 提供了一个重载方法。 您可以将 bean 配置为 bean,并将其作为该方法的参数提供。 的第二个参数是 alambda,它允许您为 提供选项(例如 asor)。 以下示例显示了如何使用流畅的 API 和 lambda 来创建:​​<int-jdbc:inbound-channel-adapter>​​​​<poller>​​​​MessageSource<?>​​​​IntegrationFlow​​​​MessageSource<?>​​​​IntegrationFlow​​​​IntegrationFlow.from(MessageSource<?> messageSource)​​​​MessageSource<?>​​​​IntegrationFlow.from()​​​​Consumer<SourcePollingChannelAdapterSpec>​​​​PollerMetadata​​​​SmartLifecycle​​​​SourcePollingChannelAdapter​​​​IntegrationFlow​

@Bean
public MessageSource<Object> jdbcMessageSource() {
return new JdbcPollingChannelAdapter(this.dataSource, "SELECT * FROM something");
}

@Bean
public IntegrationFlow pollingFlow() {
return IntegrationFlow.from(jdbcMessageSource(),
c -> c.poller(Pollers.fixedRate(100).maxMessagesPerPoll(1)))
.transform(Transformers.toJson())
.channel("furtherProcessChannel")
.get();
}

对于那些不需要直接构建对象的情况,您可以使用基于 的变体。 结果自动包装在 a(如果它还没有 a)。​​Message​​​​IntegrationFlow.fromSupplier()​​​​java.util.function.Supplier​​​​Supplier.get()​​​​Message​​​​Message​

消息路由器

Spring 集成本机提供专用路由器类型,包括:

  • ​HeaderValueRouter​
  • ​PayloadTypeRouter​
  • ​ExceptionTypeRouter​
  • ​RecipientListRouter​
  • ​XPathRouter​

与许多其他DSLEIP方法一样,该方法可以应用任何实现,或者为方便起见,可以应用aas SpEL表达式或a对。 此外,您可以使用 lambda 进行配置,并将 lambda 用于 a。 流畅的 API 还提供配对等选项,如以下示例所示:​​IntegrationFlowBuilder​​​​route()​​​​AbstractMessageRouter​​​​String​​​​ref​​​​method​​​​route()​​​​Consumer<RouterSpec<MethodInvokingRouter>>​​​​AbstractMappingMessageRouter​​​​channelMapping(String key, String channelName)​

@Bean
public IntegrationFlow routeFlowByLambda() {
return IntegrationFlow.from("routerInput")
.<Integer, Boolean>route(p -> p % 2 == 0,
m -> m.suffix("Channel")
.channelMapping(true, "even")
.channelMapping(false, "odd")
)
.get();
}

以下示例显示了一个基于表达式的简单路由器:

@Bean
public IntegrationFlow routeFlowByExpression() {
return IntegrationFlow.from("routerInput")
.route("headers['destChannel']")
.get();
}

该方法采用 a,如以下示例所示:​​routeToRecipients()​​​​Consumer<RecipientListRouterSpec>​

@Bean
public IntegrationFlow recipientListFlow() {
return IntegrationFlow.from("recipientListInput")
.<String, String>transform(p -> p.replaceFirst("Payload", ""))
.routeToRecipients(r -> r
.recipient("thing1-channel", "'thing1' == payload")
.recipientMessageSelector("thing2-channel", m ->
m.getHeaders().containsKey("recipient")
&& (boolean) m.getHeaders().get("recipient"))
.recipientFlow("'thing1' == payload or 'thing2' == payload or 'thing3' == payload",
f -> f.<String, String>transform(String::toUpperCase)
.channel(c -> c.queue("recipientListSubFlow1Result")))
.recipientFlow((String p) -> p.startsWith("thing3"),
f -> f.transform("Hello "::concat)
.channel(c -> c.queue("recipientListSubFlow2Result")))
.recipientFlow(new FunctionExpression<Message<?>>(m ->
"thing3".equals(m.getPayload())),
f -> f.channel(c -> c.queue("recipientListSubFlow3Result")))
.defaultOutputToParentFlow())
.get();
}

定义允许您将路由器设置为网关,以继续处理主流中不匹配的消息。​​.defaultOutputToParentFlow()​​​​.routeToRecipients()​​​​defaultOutput​

另请参阅Lambda 和消息<?>参数。

分配器

若要创建拆分器,请使用 EIP 方法。 默认情况下,如果有效负载是 an、an、an、a 或反应式,该方法会将每个项目输出为单独的消息。 它接受 lambda、SpEL 表达式或任何实现。 或者,您可以使用它而不带参数来提供。 以下示例演示如何通过提供 lambda 来使用该方法:​​split()​​​​Iterable​​​​Iterator​​​​Array​​​​Stream​​​​Publisher​​​​split()​​​​AbstractMessageSplitter​​​​DefaultMessageSplitter​​​​split()​

@Bean
public IntegrationFlow splitFlow() {
return IntegrationFlow.from("splitInput")
.split(s -> s.applySequence(false).delimiters(","))
.channel(MessageChannels.executor(taskExecutor()))
.get();
}

前面的示例创建一个拆分器,用于拆分包含逗号分隔的邮件。​​String​

另请参阅Lambda 和消息<?>参数。

聚合器和重新排序器

Anis 在概念上与 a 相反。 它将单个消息序列聚合为单个消息,并且必然更复杂。 默认情况下,聚合器返回一条消息,其中包含来自传入消息的有效负载集合。 相同的规则适用于。 以下示例显示了拆分器聚合器模式的规范示例:​​Aggregator​​​​Splitter​​​​Resequencer​

@Bean
public IntegrationFlow splitAggregateFlow() {
return IntegrationFlow.from("splitAggregateInput")
.split()
.channel(MessageChannels.executor(this.taskExecutor()))
.resequence()
.aggregate()
.get();
}

该方法将列表拆分为单独的消息并将它们发送到。 该方法按消息头中找到的序列详细信息对消息重新排序。 该方法收集这些消息。​​split()​​​​ExecutorChannel​​​​resequence()​​​​aggregate()​

但是,您可以通过指定发布策略和关联策略等来更改默认行为。 请考虑以下示例:

.aggregate(a ->
a.correlationStrategy(m -> m.getHeaders().get("myCorrelationKey"))
.releaseStrategy(g -> g.size() > 10)
.messageStore(messageStore()))

前面的示例关联具有标头的消息,并在至少累积 10 个消息后释放这些消息。​​myCorrelationKey​

为 EIP 方法提供了类似的 lambda 配置。​​resequence()​

服务激活器和方法​​.handle()​

EIP 方法的目标是在某些 POJO 上调用任何实现或任何方法。 另一种选择是使用 lambda 表达式定义“活动”。 因此,我们引入了一个泛型函数接口。 它的方法需要两个参数:and(从5.1版开始)。 有了这个,我们可以定义一个流,如下所示:​​.handle()​​​​MessageHandler​​​​GenericHandler<P>​​​​handle​​​​P payload​​​​MessageHeaders headers​

@Bean
public IntegrationFlow myFlow() {
return IntegrationFlow.from("flow3Input")
.<Integer>handle((p, h) -> p * 2)
.get();
}

前面的示例将它接收的任何整数加倍。

但是,Spring 集成的一个主要目标是通过运行时类型从消息有效负载到消息处理程序的目标参数的转换。 由于 Java 不支持 lambda 类的泛型类型解析,因此我们为大多数 EIP 方法引入了一种解决方法,其中包含一个额外的参数。 这样做将硬转换工作委托给 Spring 的,Spring 使用提供的消息和请求的消息来定位方法参数。 以下示例显示了结果可能如下所示:​​loose coupling​​​​payloadType​​​​LambdaMessageProcessor​​​​ConversionService​​​​type​​​​IntegrationFlow​

@Bean
public IntegrationFlow integerFlow() {
return IntegrationFlow.from("input")
.<byte[], String>transform(p - > new String(p, "UTF-8"))
.handle(Integer.class, (p, h) -> p * 2)
.get();
}

我们也可以注册一些内容来摆脱额外的:​​BytesToIntegerConverter​​​​ConversionService​​​​.transform()​

@Bean
@IntegrationConverter
public BytesToIntegerConverter bytesToIntegerConverter() {
return new BytesToIntegerConverter();
}

@Bean
public IntegrationFlow integerFlow() {
return IntegrationFlow.from("input")
.handle(Integer.class, (p, h) -> p * 2)
.get();
}

另请参阅Lambda 和消息<?>参数。

操作员网关()

定义中的运算符是一种特殊的服务激活器实现,通过其输入通道调用其他端点或集成流并等待回复。 从技术上讲,它与定义中的嵌套组件扮演着相同的角色(请参阅从链内调用链),并允许流更清晰、更直接。 从逻辑上讲,从业务角度来看,它是一个消息传递网关,允许在目标集成解决方案的不同部分之间分发和重用功能(请参阅消息传递网关)。 此运算符具有多个用于不同目标的重载:​​gateway()​​​​IntegrationFlow​​​​<gateway>​​​​<chain>​

  • ​gateway(String requestChannel)​​按名称将消息发送到某个端点的输入通道;
  • ​gateway(MessageChannel requestChannel)​​通过直接注入将消息发送到某个端点的输入通道;
  • ​gateway(IntegrationFlow flow)​​将消息发送到提供的输入通道。IntegrationFlow

所有这些都有一个带有第二个参数的变体,用于配置目标和相对。 此外,基于方法允许调用现有 bean 或通过用于函数接口的就地 lambda 将流声明为子流,或者以方法清理器代码样式提取它:​​Consumer<GatewayEndpointSpec>​​​​GatewayMessageHandler​​​​AbstractEndpoint​​​​IntegrationFlow​​​​IntegrationFlow​​​​IntegrationFlow​​​​private​

@Bean
IntegrationFlow someFlow() {
return IntegrationFlow
.from(...)
.gateway(subFlow())
.handle(...)
.get();
}

private static IntegrationFlow subFlow() {
return f -> f
.scatterGather(s -> s.recipientFlow(...),
g -> g.outputProcessor(MessageGroup::getOne))
}

If the downstream flow does not always return a reply, you should set the to 0 to prevent hanging the calling thread indefinitely. In that case, the flow will end at that point and the thread released for further work. ​​requestTimeout​

操作员日志()

为方便起见,为了记录通过 Spring 集成流 () 的消息旅程,提供了一个运算符。 在内部,它由 aas 其订户。 它负责将传入消息记录到下一个终结点或当前通道中。 以下示例演示如何使用:​​<logging-channel-adapter>​​​​log()​​​​WireTap​​​​ChannelInterceptor​​​​LoggingHandler​​​​LoggingHandler​

.filter(...)
.log(LoggingHandler.Level.ERROR, "test.category", m -> m.getHeaders().getId())
.route(...)

在前面的示例中,标头仅在 ontoonly 级别记录通过筛选器的消息和路由之前。​​id​​​​ERROR​​​​test.category​

从版本 6.0 开始,此运算符在流结束时的行为与其在中间的用法保持一致。 换句话说,即使删除了运算符,流的行为也保持不变。 因此,如果预计不会在流程结束时生成回复,则建议在最后一个之后使用。​​log()​​​​nullChannel()​​​​log()​

运算符截距()

从版本 5.3 开始,运算符允许在流中的当前注册一个或多个实例。 这是通过 API 创建显式的替代方法。 以下示例使用 ato 拒绝某些邮件,但有例外:​​intercept()​​​​ChannelInterceptor​​​​MessageChannel​​​​MessageChannel​​​​MessageChannels​​​​MessageSelectingInterceptor​

.transform(...)
.intercept(new MessageSelectingInterceptor(m -> m.getPayload().isValid()))
.handle(...)

​MessageChannelSpec.wireTap()​

Spring Integration 包括流畅的 API 构建器。 下面的示例演示如何使用该方法记录输入:​​.wireTap()​​​​MessageChannelSpec​​​​wireTap​

@Bean
public QueueChannelSpec myChannel() {
return MessageChannels.queue()
.wireTap("loggingFlow.input");
}

@Bean
public IntegrationFlow loggingFlow() {
return f -> f.log();
}


如果是 的实例,则 ,或运算符应用于当前。 否则,会将中间项注入到当前配置的终结点的流中。 在下面的示例中,拦截器被直接添加到,因为实现:​​MessageChannel​​​​InterceptableChannel​​​​log()​​​​wireTap()​​​​intercept()​​​​MessageChannel​​​​DirectChannel​​​​WireTap​​​​myChannel​​​​DirectChannel​​​​InterceptableChannel​




@Bean
MessageChannel myChannel() {
return new DirectChannel();
}

...
.channel(myChannel())
.log()
}



当电流不实现时,隐式注入到,并添加到这个新的。 下面的示例没有任何通道声明:​​MessageChannel​​​​InterceptableChannel​​​​DirectChannel​​​​BridgeHandler​​​​IntegrationFlow​​​​WireTap​​​​DirectChannel​

.handle(...)
.log()
}

在前面的示例中(以及任何时间都没有声明通道),在当前位置注入隐式炎的 和 用作当前配置的输出通道(来自前面描述的)。​​DirectChannel​​​​IntegrationFlow​​​​ServiceActivatingHandler​​​​.handle()​

使用消息流

​IntegrationFlowBuilder​​提供顶级 API 以生成连接到消息流的集成组件。 当您的集成可以通过单个流完成时(通常是这种情况),这很方便。 或者,可以通过实例连接实例。​​IntegrationFlow​​​​MessageChannel​

默认情况下,在 Spring 集成用语中表现为“链”。 也就是说,终结点由实例自动隐式连接。 消息流实际上并不是构造为链,这提供了更大的灵活性。 例如,如果您知道流中的任何组件名称(即,如果您显式定义它),则可以向流中的任何组件发送消息。 您还可以引用流中的外部定义的通道,以允许使用通道适配器(以启用远程传输协议、文件 I/O 等),而不是直接通道。 因此,DSL不支持Spring Integrationelement,因为在这种情况下它不会增加太多价值。​​MessageFlow​​​​DirectChannel​​​​inputChannel​​​​chain​

由于Spring Integration Java DSL生成与任何其他配置选项相同的Bean定义模型,并且基于现有的Spring Framework基础架构,因此它可以与XML定义一起使用,并与Spring Integration消息传递注释配置连接。​​@Configuration​

您还可以使用 lambda 定义直接实例。 以下示例演示如何执行此操作:​​IntegrationFlow​

@Bean
public IntegrationFlow lambdaFlow() {
return f -> f.filter("World"::equals)
.transform("Hello "::concat)
.handle(System.out::println);
}

此定义的结果是使用隐式直接通道连接的同一组集成组件。 这里唯一的限制是此流以命名的直接通道 - 启动。 此外,Lambda 流无法从 from 启动。​​lambdaFlow.input​​​​MessageSource​​​​MessageProducer​

从版本 5.1 开始,这种 of 包装到代理以公开生命周期控制并提供对内部关联的访问。​​IntegrationFlow​​​​inputChannel​​​​StandardIntegrationFlow​

从 V5.0.6 开始,为 an 中的组件生成的 Bean 名称包括流 Bean,后跟点 () 作为前缀。 例如,在前面的示例中,生成的是 Bean 名称。 (这是缩短的 from,以适合页面。 该端点的实现 Bean 具有 Bean 名称 (从版本 5.1 开始),其中使用其组件类型,而不是类的完全限定名称。 当必须在流中生成 Bean 名称时,相同的模式将应用于所有 the。 这些生成的 Bean 名称前面加上流 ID,用于解析日志或在某些分析工具中将组件分组在一起,以及避免在运行时同时注册集成流时出现争用情况。 有关更多信息,请参阅动态和运行时集成流。​​IntegrationFlow​​​​.​​​​ConsumerEndpointFactoryBean​​​​.transform("Hello "::concat)​​​​lambdaFlow.o.s.i.config.ConsumerEndpointFactoryBean#0​​​​o.s.i​​​​org.springframework.integration​​​​Transformer​​​​lambdaFlow.transformer#0​​​​MethodInvokingTransformer​​​​NamedComponent​

​FunctionExpression​

我们引入了 theclass(SpEL 接口的实现)来让我们使用 lambda 和。 当存在来自核心弹簧集成的隐式变体时,为 DSL 组件提供了该选项以及一个选项。 以下示例演示如何使用函数表达式:​​FunctionExpression​​​​Expression​​​​generics​​​​Function<T, R>​​​​expression​​​​Strategy​

.enrich(e -> e.requestChannel("enrichChannel")
.requestPayload(Message::getPayload)
.propertyFunction("date", m -> new Date()))

还支持运行时类型转换,如 中所做的那样。​​FunctionExpression​​​​SpelExpression​

子流支持

某些 ofand 组件提供了使用子流指定其逻辑或映射的功能。 最简单的示例是,如以下示例所示:​​if…else​​​​publish-subscribe​​​​.publishSubscribeChannel()​

@Bean
public IntegrationFlow subscribersFlow() {
return flow -> flow
.publishSubscribeChannel(Executors.newCachedThreadPool(), s -> s
.subscribe(f -> f
.<Integer>handle((p, h) -> p / 2)
.channel(c -> c.queue("subscriber1Results")))
.subscribe(f -> f
.<Integer>handle((p, h) -> p * 2)
.channel(c -> c.queue("subscriber2Results"))))
.<Integer>handle((p, h) -> p * 3)
.channel(c -> c.queue("subscriber3Results"));
}

您可以使用单独的定义获得相同的结果,但我们希望您发现逻辑组合的子流样式有用。 我们发现它会产生更短(因此更具可读性)的代码。​​IntegrationFlow​​​​@Bean​

从版本 5.3 开始,提供了基于 a 的实现,用于在代理支持的消息通道上配置子流订阅者。 例如,我们现在可以在以下位置上配置多个订阅者作为子流:​​BroadcastCapableChannel​​​​publishSubscribeChannel()​​​​Jms.publishSubscribeChannel()​

@Bean
public BroadcastCapableChannel jmsPublishSubscribeChannel() {
return Jms.publishSubscribeChannel(jmsConnectionFactory())
.destination("pubsub")
.get();
}

@Bean
public IntegrationFlow pubSubFlow() {
return f -> f
.publishSubscribeChannel(jmsPublishSubscribeChannel(),
pubsub -> pubsub
.subscribe(subFlow -> subFlow
.channel(c -> c.queue("jmsPubSubBridgeChannel1")))
.subscribe(subFlow -> subFlow
.channel(c -> c.queue("jmsPubSubBridgeChannel2"))));
}

@Bean
public BroadcastCapableChannel jmsPublishSubscribeChannel(ConnectionFactory jmsConnectionFactory) {
return (BroadcastCapableChannel) Jms.publishSubscribeChannel(jmsConnectionFactory)
.destination("pubsub")
.get();
}

类似的子流组合提供了该方法。​​publish-subscribe​​​​.routeToRecipients()​

另一个例子是使用代替方法。​​.discardFlow()​​​​.discardChannel()​​​​.filter()​

值得特别注意。 请考虑以下示例:​​.route()​

@Bean
public IntegrationFlow routeFlow() {
return f -> f
.<Integer, Boolean>route(p -> p % 2 == 0,
m -> m.channelMapping("true", "evenChannel")
.subFlowMapping("false", sf ->
sf.<Integer>handle((p, h) -> p * 3)))
.transform(Object::toString)
.channel(c -> c.queue("oddChannel"));
}

继续像在常规映射中一样工作,但将该子流绑定到主流。 换句话说,任何路由器的子流都会返回到主流之后。​​.channelMapping()​​​​Router​​​​.subFlowMapping()​​​​.route()​


有时,您需要引用现有的。 以下示例演示如何执行此操作:​​IntegrationFlow​​​​@Bean​​​​.subFlowMapping()​




@Bean
public IntegrationFlow splitRouteAggregate() {
return f -> f
.split()
.<Integer, Boolean>route(o -> o % 2 == 0,
m -> m
.subFlowMapping(true, oddFlow())
.subFlowMapping(false, sf -> sf.gateway(evenFlow())))
.aggregate();
}

@Bean
public IntegrationFlow oddFlow() {
return f -> f.handle(m -> System.out.println("odd"));
}

@Bean
public IntegrationFlow evenFlow() {
return f -> f.handle((p, h) -> "even");
}




在这种情况下,当您需要从此类子流接收回复并继续主流时,必须使用 aas 包装 thisbean 引用(或其输入通道),如上例所示。 前面示例中的引用未包装到 。 因此,我们不需要来自此路由分支的回复。 否则,您最终会得到类似于以下内容的异常:​​IntegrationFlow​​​​.gateway()​​​​oddFlow()​​​​.gateway()​




Caused by: org.springframework.beans.factory.BeanCreationException:
The 'currentComponent' (org.springframework.integration.router.MethodInvokingRouter@7965a51c)
is a one-way 'MessageHandler' and it isn't appropriate to configure 'outputChannel'.
This is the end of the integration flow.




将子流配置为 lambda 时,框架会处理与子流的请求-回复交互,并且不需要网关。


子流可以嵌套到任何深度,但我们不建议这样做。 事实上,即使在路由器的情况下,在流中添加复杂的子流也会很快开始看起来像一盘意大利面,并且很难被人类解析。


在 DSL 支持子流配置的情况下,当正在配置的组件通常需要通道,并且该子流以 aelement 开头时,框架会隐式放置在组件输出通道和流的输入通道之间。 例如,在此定义中:​​channel()​​​​bridge()​​​​filter​




.filter(p -> p instanceof String, e -> e
.discardFlow(df -> df
.channel(MessageChannels.queue())
...)




框架在内部创建用于注入的 abean。 然后,它将子流包装成 anstart,其中包含订阅的此隐式通道,并将子流放在流中指定的通道之前。 当现有 Bean 用作子流引用(而不是内联子流,例如 lambda)时,不需要这样的桥接,因为框架可以从流 Bean 解析第一个通道。 对于内联子流,输入通道尚不可用。​​DirectChannel​​​​MessageFilter.discardChannel​​​​IntegrationFlow​​​​bridge​​​​channel()​​​​IntegrationFlow​


使用协议适配器

到目前为止显示的所有示例都说明了DSL如何使用Spring Integration编程模型支持消息传递体系结构。 然而,我们还没有进行任何真正的整合。 这样做需要通过 HTTP、JMS、AMQP、TCP、JDBC、FTP、SMTP 等访问远程资源,或者访问本地文件系统。 Spring 集成支持所有这些以及更多。 理想情况下,DSL应该为所有这些提供一流的支持,但是实现所有这些并跟上新的适配器添加到Spring Integration是一项艰巨的任务。 因此,人们期望DSL不断赶上Spring Integration。

因此,我们提供高级 API 来无缝定义特定于协议的消息传递。 我们使用工厂和构建器模式以及 lambda 来做到这一点。 您可以将工厂类视为“命名空间工厂”,因为它们与特定于具体协议的 Spring 集成模块中的组件的 XML 命名空间扮演相同的角色。 目前,Spring Integration Java DSL支持,,,,,,,,,,,和命名空间工厂。 以下示例演示如何使用其中三个(、和):​​Amqp​​​​Feed​​​​Jms​​​​Files​​​​(S)Ftp​​​​Http​​​​JPA​​​​MongoDb​​​​TCP/UDP​​​​Mail​​​​WebFlux​​​​Scripts​​​​Amqp​​​​Jms​​​​Mail​

@Bean
public IntegrationFlow amqpFlow() {
return IntegrationFlow.from(Amqp.inboundGateway(this.rabbitConnectionFactory, queue()))
.transform("hello "::concat)
.transform(String.class, String::toUpperCase)
.get();
}

@Bean
public IntegrationFlow jmsOutboundGatewayFlow() {
return IntegrationFlow.from("jmsOutboundGatewayChannel")
.handle(Jms.outboundGateway(this.jmsConnectionFactory)
.replyContainer(c ->
c.concurrentConsumers(3)
.sessionTransacted(true))
.requestDestination("jmsPipelineTest"))
.get();
}

@Bean
public IntegrationFlow sendMailFlow() {
return IntegrationFlow.from("sendMailChannel")
.handle(Mail.outboundAdapter("localhost")
.port(smtpPort)
.credentials("user", "pw")
.protocol("smtp")
.javaMailProperties(p -> p.put("mail.debug", "true")),
e -> e.id("sendMailEndpoint"))
.get();
}

前面的示例演示如何使用“命名空间工厂”作为内联适配器声明。 但是,您可以从定义中使用它们来使方法链更具可读性。​​@Bean​​​​IntegrationFlow​

我们正在征求社区对这些命名空间工厂的反馈,然后再将精力花在其他工厂上。 我们也感谢对接下来应该支持的适配器和网关的优先级的任何输入。

您可以在本参考手册中特定于协议的章节中找到更多 Java DSL 示例。

所有其他协议通道适配器可以配置为通用 bean 并连接到 ,如以下示例所示:​​IntegrationFlow​

@Bean
public QueueChannelSpec wrongMessagesChannel() {
return MessageChannels
.queue()
.wireTap("wrongMessagesWireTapChannel");
}

@Bean
public IntegrationFlow xpathFlow(MessageChannel wrongMessagesChannel) {
return IntegrationFlow.from("inputChannel")
.filter(new StringValueTestXPathMessageSelector("namespace-uri(/*)", "my:namespace"),
e -> e.discardChannel(wrongMessagesChannel))
.log(LoggingHandler.Level.ERROR, "test.category", m -> m.getHeaders().getId())
.route(xpathRouter(wrongMessagesChannel))
.get();
}

@Bean
public AbstractMappingMessageRouter xpathRouter(MessageChannel wrongMessagesChannel) {
XPathRouter router = new XPathRouter("local-name(/*)");
router.setEvaluateAsString(true);
router.setResolutionRequired(false);
router.setDefaultOutputChannel(wrongMessagesChannel);
router.setChannelMapping("Tags", "splittingChannel");
router.setChannelMapping("Tag", "receivedChannel");
return router;
}

​IntegrationFlowAdapter​

该接口可以直接实现并指定为扫描组件,如以下示例所示:​​IntegrationFlow​

@Component
public class MyFlow implements IntegrationFlow {

@Override
public void configure(IntegrationFlowDefinition<?> f) {
f.<String, String>transform(String::toUpperCase);
}

}

它由应用程序拾取并在应用程序上下文中正确解析和注册。​​IntegrationFlowBeanPostProcessor​

为了方便和获得松散耦合架构的好处,我们提供了基类实现。 它需要方法实现才能使用方法之一生成 anby,如以下示例所示:​​IntegrationFlowAdapter​​​​buildFlow()​​​​IntegrationFlowDefinition​​​​from()​

@Component
public class MyFlowAdapter extends IntegrationFlowAdapter {

private final AtomicBoolean invoked = new AtomicBoolean();

public Date nextExecutionTime(TriggerContext triggerContext) {
return this.invoked.getAndSet(true) ? null : new Date();
}

@Override
protected IntegrationFlowDefinition<?> buildFlow() {
return from(this::messageSource,
e -> e.poller(p -> p.trigger(this::nextExecutionTime)))
.split(this)
.transform(this)
.aggregate(a -> a.processor(this, null), null)
.enrichHeaders(Collections.singletonMap("thing1", "THING1"))
.filter(this)
.handle(this)
.channel(c -> c.queue("myFlowAdapterOutput"));
}

public String messageSource() {
return "T,H,I,N,G,2";
}

@Splitter
public String[] split(String payload) {
return StringUtils.commaDelimitedListToStringArray(payload);
}

@Transformer
public String transform(String payload) {
return payload.toLowerCase();
}

@Aggregator
public String aggregate(List<String> payloads) {
return payloads.stream().collect(Collectors.joining());
}

@Filter
public boolean filter(@Header Optional<String> thing1) {
return thing1.isPresent();
}

@ServiceActivator
public String handle(String payload, @Header String thing1) {
return payload + ":" + thing1;
}

}

动态和运行时集成流

​IntegrationFlow​​并且其所有依赖组件都可以在运行时注册。 在 5.0 版本之前,我们使用钩子。 从 Spring 框架开始,我们使用 thehook 进行编程注册。 下面的示例演示如何以编程方式注册 Bean:​​BeanFactory.registerSingleton()​​​​5.0​​​​instanceSupplier​​​​BeanDefinition​

BeanDefinition beanDefinition =
BeanDefinitionBuilder.genericBeanDefinition((Class<Object>) bean.getClass(), () -> bean)
.getRawBeanDefinition();

((BeanDefinitionRegistry) this.beanFactory).registerBeanDefinition(beanName, beanDefinition);

请注意,在前面的示例中,thehook 是方法的最后一个参数,在本例中由 lambda 提供。​​instanceSupplier​​​​genericBeanDefinition​

所有必要的 Bean 初始化和生命周期都是自动完成的,就像标准上下文配置 Bean 定义一样。

为了简化开发体验,Spring 集成引入了在运行时注册和管理实例,如以下示例所示:​​IntegrationFlowContext​​​​IntegrationFlow​

@Autowired
private AbstractServerConnectionFactory server1;

@Autowired
private IntegrationFlowContext flowContext;

...

@Test
public void testTcpGateways() {
TestingUtilities.waitListening(this.server1, null);

IntegrationFlow flow = f -> f
.handle(Tcp.outboundGateway(Tcp.netClient("localhost", this.server1.getPort())
.serializer(TcpCodecs.crlf())
.deserializer(TcpCodecs.lengthHeader1())
.id("client1"))
.remoteTimeout(m -> 5000))
.transform(Transformers.objectToString());

IntegrationFlowRegistration theFlow = this.flowContext.registration(flow).register();
assertThat(theFlow.getMessagingTemplate().convertSendAndReceive("foo", String.class), equalTo("FOO"));
}

当我们有多个配置选项并且必须创建多个类似流的实例时,这很有用。 为此,我们可以迭代我们的选项并在循环中创建和注册实例。 另一种变体是当我们的数据源不是基于 Spring 时,因此我们必须动态创建它。 此类示例是反应式流事件源,如以下示例所示:​​IntegrationFlow​

Flux<Message<?>> messageFlux =
Flux.just("1,2,3,4")
.map(v -> v.split(","))
.flatMapIterable(Arrays::asList)
.map(Integer::parseInt)
.map(GenericMessage<Integer>::new);

QueueChannel resultChannel = new QueueChannel();

IntegrationFlow integrationFlow =
IntegrationFlow.from(messageFlux)
.<Integer, Integer>transform(p -> p * 2)
.channel(resultChannel)
.get();

this.integrationFlowContext.registration(integrationFlow)
.register();

The(作为结果)可用于为寄存器指定 bean 名称,控制其,以及寄存非 Spring 集成 bean。 通常,这些附加的 bean 是连接工厂(AMQP、JMS、(S)FTP、TCP/UDP 等)、序列化程序和解序列化程序或任何其他所需的支持组件。​​IntegrationFlowRegistrationBuilder​​​​IntegrationFlowContext.registration()​​​​IntegrationFlow​​​​autoStartup​

当您不再需要动态注册的 bean 及其所有依赖的 bean 时,您可以使用回调来删除它们。 有关更多信息,请参阅IntegrationFlowContextJavadoc。​​IntegrationFlowRegistration.destroy()​​​​IntegrationFlow​

从版本 5.0.6 开始,定义中生成的所有 Bean 名称都以流 ID 作为前缀作为前缀。 我们建议始终指定显式流 ID。 否则,将在 中启动同步屏障,以生成 和 的 Bean 名称,并注册其 bean。 我们在这两个操作上同步,以避免当相同的生成的 Bean 名称可能用于不同的实例时出现争用条件。​​IntegrationFlow​​​​IntegrationFlowContext​​​​IntegrationFlow​​​​IntegrationFlow​

此外,从版本 5.0.6 开始,注册生成器 API 具有一个新方法: 如果您希望声明同一流的多个实例,并在流中的组件具有相同 ID 时避免 Bean 名称冲突,这将非常有用,如以下示例所示:​​useFlowIdAsPrefix()​

private void registerFlows() {
IntegrationFlowRegistration flow1 =
this.flowContext.registration(buildFlow(1234))
.id("tcp1")
.useFlowIdAsPrefix()
.register();

IntegrationFlowRegistration flow2 =
this.flowContext.registration(buildFlow(1235))
.id("tcp2")
.useFlowIdAsPrefix()
.register();
}

private IntegrationFlow buildFlow(int port) {
return f -> f
.handle(Tcp.outboundGateway(Tcp.netClient("localhost", port)
.serializer(TcpCodecs.crlf())
.deserializer(TcpCodecs.lengthHeader1())
.id("client"))
.remoteTimeout(m -> 5000))
.transform(Transformers.objectToString());
}

在这种情况下,可以使用 bean 的名称来引用第一个流的消息处理程序。​​tcp1.client.handler​

当您使用时,需要属性。​​id​​​​useFlowIdAsPrefix()​

​IntegrationFlow​​作为网关

可以从提供组件的服务接口开始,如以下示例所示:​​IntegrationFlow​​​​GatewayProxyFactoryBean​

public interface ControlBusGateway {

void send(String command);
}

...

@Bean
public IntegrationFlow controlBusFlow() {
return IntegrationFlow.from(ControlBusGateway.class)
.controlBus()
.get();
}

所有用于接口方法的代理都随通道一起提供,以将消息发送到下一个集成组件中。 您可以使用注释标记服务接口,并使用注释标记方法。 尽管如此,theis 被忽略并被该内部通道覆盖为下一个组件。 否则,使用 创建此类配置没有意义。​​IntegrationFlow​​​​@MessagingGateway​​​​@Gateway​​​​requestChannel​​​​IntegrationFlow​​​​IntegrationFlow​

默认情况下, 获取一个传统的 Bean 名称,例如。 可以使用属性或重载工厂方法更改该 ID。 此外,接口上注释中的所有属性都将应用于目标。 当注释配置不适用时,可以使用变体为目标代理提供适当的选项。 此 DSL 方法从版本 5.2 开始可用。​​GatewayProxyFactoryBean​​​​[FLOW_BEAN_NAME.gateway]​​​​@MessagingGateway.name()​​​​IntegrationFlow.from(Class<?> serviceInterface, Consumer<GatewayProxySpec> endpointConfigurer)​​​​@MessagingGateway​​​​GatewayProxyFactoryBean​​​​Consumer<GatewayProxySpec>​

在 Java 8 中,您甚至可以创建带有接口的集成网关,如以下示例所示:​​java.util.function​

@Bean
public IntegrationFlow errorRecovererFlow() {
return IntegrationFlow.from(Function.class, (gateway) -> gateway.beanName("errorRecovererFunction"))
.<Object>handle((p, h) -> {
throw new RuntimeException("intentional");
}, e -> e.advice(retryAdvice()))
.get();
}

可以按如下方式使用:​​errorRecovererFlow​

@Autowired
@Qualifier("errorRecovererFunction")
private Function<String, String> errorRecovererFlowGateway;

DSL 扩展

从版本 5.3 开始,引入了 an,允许使用自定义或组合 EIP 运算符扩展现有的 Java DSL。 所需要的只是这个类的扩展,它提供了可以在bean定义中使用的方法。 扩展类也可用于自定义配置;例如,可以在现有扩展中实现错过或默认选项。 下面的示例演示了一个复合自定义运算符以及默认自定义扩展的用法:​​IntegrationFlowExtension​​​​IntegrationFlow​​​​IntegrationComponentSpec​​​​IntegrationComponentSpec​​​​AggregatorSpec​​​​outputProcessor​

public class CustomIntegrationFlowDefinition
extends IntegrationFlowExtension<CustomIntegrationFlowDefinition> {

public CustomIntegrationFlowDefinition upperCaseAfterSplit() {
return split()
.transform("payload.toUpperCase()");
}

public CustomIntegrationFlowDefinition customAggregate(Consumer<CustomAggregatorSpec> aggregator) {
return register(new CustomAggregatorSpec(), aggregator);
}

}

public class CustomAggregatorSpec extends AggregatorSpec {

CustomAggregatorSpec() {
outputProcessor(group ->
group.getMessages()
.stream()
.map(Message::getPayload)
.map(String.class::cast)
.collect(Collectors.joining(", ")));
}

}

对于方法链流,这些扩展中的新 DSL 运算符必须返回扩展类。 这样,目标定义将与新的和现有的DSL运算符一起使用:​​IntegrationFlow​

@Bean
public IntegrationFlow customFlowDefinition() {
return
new CustomIntegrationFlowDefinition()
.log()
.upperCaseAfterSplit()
.channel("innerChannel")
.customAggregate(customAggregatorSpec ->
customAggregatorSpec.expireGroupsUponCompletion(true))
.logAndReply();
}

集成流组成

在《春游》中,抽象是一等公民,总是假设整合流的构成。 流中任何终结点的输入通道都可用于从任何其他终结点发送消息,而不仅仅是从具有此通道作为输出的终结点发送消息。 此外,使用 acontract、Content Enricher 组件、复合端点(如 a)以及现在的 withbeans(例如),在较短、可重用的部分之间分发业务逻辑非常简单。 最终合成所需的只是有关 ato 发送到或接收自的知识。​​MessageChannel​​​​@MessagingGateway​​​​<chain>​​​​IntegrationFlow​​​​IntegrationFlowAdapter​​​​MessageChannel​

从版本开始,为了从最终用户中抽象更多并隐藏实现细节,引入了工厂方法,以允许从现有流的输出启动当前:​​5.5.4​​​​MessageChannel​​​​IntegrationFlow​​​​from(IntegrationFlow)​​​​IntegrationFlow​

@Bean
IntegrationFlow templateSourceFlow() {
return IntegrationFlow.fromSupplier(() -> "test data")
.channel("sourceChannel")
.get();
}

@Bean
IntegrationFlow compositionMainFlow(IntegrationFlow templateSourceFlow) {
return IntegrationFlow.from(templateSourceFlow)
.<String, String>transform(String::toUpperCase)
.channel(c -> c.queue("compositionMainFlowResult"))
.get();
}

另一方面,添加了终端算子以继续在某个其他流的输入通道处的电流流:​​IntegrationFlowDefinition​​​​to(IntegrationFlow)​

@Bean
IntegrationFlow mainFlow(IntegrationFlow otherFlow) {
return f -> f
.<String, String>transform(String::toUpperCase)
.to(otherFlow);
}

@Bean
IntegrationFlow otherFlow() {
return f -> f
.<String, String>transform(p -> p + " from other flow")
.channel(c -> c.queue("otherFlowResultChannel"));
}

流中间的成分可以通过现有的EIP方法简单地实现。 通过这种方式,我们可以通过从更简单、可重用的逻辑块组合它们来构建具有任何复杂性的流。 例如,您可以添加一个 of beans 库作为依赖项,只需将它们的配置类导入到最终项目中并为您的定义自动连接就足够了。​​gateway(IntegrationFlow)​​​​IntegrationFlow​​​​IntegrationFlow​

举报

相关推荐

0 条评论