系统管理

系统管理

指标和管理

本节介绍如何捕获 Spring Integration 的指标。在最近的版本中,我们更多地依赖 Micrometer(参见 https://micrometer.io),我们计划在未来的版本中更多地使用 Micrometer。spring-doc.cadn.net.cn

在高容量环境中禁用日志记录

您可以在主消息流中控制调试日志记录。在非常大容量的应用程序中,对isDebugEnabled()对于某些日志记录子系统,成本可能相当昂贵。您可以禁用所有此类日志记录以避免此开销。异常日志记录(调试或其他)不受此设置的影响。spring-doc.cadn.net.cn

以下列表显示了用于控制日志记录的可用选项:spring-doc.cadn.net.cn

Java
@Configuration
@EnableIntegration
@EnableIntegrationManagement(
    defaultLoggingEnabled = "true" <1>)

public static class ContextConfiguration {
...
}
XML
<int:management default-logging-enabled="true"/> (1)
1 设置为false以禁用主消息流中的所有日志记录,而不考虑日志系统类别设置。设置为 'true' 以启用调试日志记录(如果日志记录子系统也启用了)。仅当尚未在 Bean 定义中显式配置设置时才应用。缺省值为true.
defaultLoggingEnabled仅当未在 Bean 定义中显式配置相应设置时才应用。

千分尺集成

概述

从 5.0.3 版开始,千分尺的存在 MeterRegistry在应用程序上下文中触发对微米度量的支持。spring-doc.cadn.net.cn

要使用 Micrometer,请添加MeterRegistrybean 添加到应用程序上下文中。spring-doc.cadn.net.cn

对于每个MessageHandlerMessageChannel,则计时器被注册。对于每个MessageSource,则注册了计数器。spring-doc.cadn.net.cn

这仅适用于延伸的对象AbstractMessageHandler,AbstractMessageChannelAbstractMessageSource(大多数框架组件都是这种情况)。spring-doc.cadn.net.cn

Timer用于对消息通道进行发送作的计量具有以下名称或标记:spring-doc.cadn.net.cn

(一个failure结果,并带有none异常表示通道的send()返回作false.)spring-doc.cadn.net.cn

Counter用于对可轮询消息通道进行接收作的计量具有以下名称或标记:spring-doc.cadn.net.cn

Timer对消息处理程序进行作的计量具有以下名称或标记:spring-doc.cadn.net.cn

Counter消息源的计量具有以下名称/标记:spring-doc.cadn.net.cn

此外,还有三个Gauge米:spring-doc.cadn.net.cn

可以自定义的名称和标签Meters由集成组件通过提供MicrometerMetricsCaptor. MicrometerCustomMetricsTests 测试用例显示了如何执行此作的简单示例。 您还可以通过重载build()构建器子类上的方法。spring-doc.cadn.net.cn

从版本 5.1.13 开始,QueueChannel显示队列大小和剩余容量的千分尺仪表:spring-doc.cadn.net.cn

禁用仪表

默认情况下,所有仪表在首次使用时都会注册。 现在,使用 Micrometer,您可以添加MeterFilters 到MeterRegistry以防止部分或全部被注册。 您可以按提供的任何属性过滤掉(拒绝)计量,name,tag等。 有关更多信息,请参阅 Micrometer 文档中的 Meter Filtersspring-doc.cadn.net.cn

例如,给定:spring-doc.cadn.net.cn

@Bean
public QueueChannel noMeters() {
    return new QueueChannel(10);
}

您可以使用以下命令禁止仅此通道的仪表注册:spring-doc.cadn.net.cn

registry.config().meterFilter(MeterFilter.deny(id ->
        "channel".equals(id.getTag("type")) &&
        "noMeters".equals(id.getTag("name"))));

千分尺观察

从版本 6.0 开始,Spring Integration 利用 Micrometer Observation 抽象,该抽象可以通过适当的方式处理指标以及跟踪ObservationHandler配置。spring-doc.cadn.net.cn

观察处理在IntegrationManagement每当ObservationRegistrybean 存在于应用程序上下文中,并且@EnableIntegrationManagement已配置。 要自定义应检测的组件集,请将observationPatterns()属性在@EnableIntegrationManagement注解。 有关模式匹配算法,请参阅其 javadocs。spring-doc.cadn.net.cn

默认情况下,没有IntegrationManagement组件使用ObservationRegistry豆。 可以配置为匹配所有组件。*

在这种情况下,仪表不是独立收集的,而是委托给适当的ObservationHandler在提供的ObservationRegistry.spring-doc.cadn.net.cn

以下 Spring Integration 组件使用观察逻辑进行检测,每个逻辑都有各自的约定:spring-doc.cadn.net.cn

  • MessageProducerSupport,作为流的入站端点,被视为CONSUMERspan 类型,并使用IntegrationObservation.HANDLER应用程序接口;spring-doc.cadn.net.cn

  • MessagingGatewaySupport' 是一个入站请求-回复端点,被视为SERVERspan 类型。 它使用IntegrationObservation.GATEWAY应用程序接口;spring-doc.cadn.net.cn

  • AbstractMessageChannel.send()作是唯一生成消息的 Spring Integration API。 因此,它被视为PRODUCERspan 类型,并使用IntegrationObservation.PRODCUER应用程序接口。 当通道是分布式实现时,这更有意义(例如PublishSubscribeKafkaChannelZeroMqChannel),并且必须将跟踪信息添加到消息中。 因此,IntegrationObservation.PRODUCER观察基于MessageSenderContext其中 Spring Integration 提供了一个MutableMessage以允许后续跟踪Propagator添加标头,以便消费者可以使用它们;spring-doc.cadn.net.cn

  • AbstractMessageHandler是一个CONSUMERspan 类型,并使用IntegrationObservation.HANDLER应用程序接口。spring-doc.cadn.net.cn

IntegrationManagement组件可以通过以下方式进行定制ObservationConvention配置。 例如,一个AbstractMessageHandler期望MessageReceiverObservationConvention通过其setObservationConvention()应用程序接口。spring-doc.cadn.net.cn

以下是 Observation API 支持的指标、跨度和约定:spring-doc.cadn.net.cn

可观测性 - 指标

您可以在下面找到此项目声明的所有指标的列表。spring-doc.cadn.net.cn

网关

入站消息网关的观察。spring-doc.cadn.net.cn

指标名称 spring.integration.gateway(由约定类定义o.s.i.support.management.observation.DefaultMessageRequestReplyReceiverObservationConvention).类型 timer.spring-doc.cadn.net.cn

指标名称 spring.integration.gateway.active(由约定类定义o.s.i.support.management.observation.DefaultMessageRequestReplyReceiverObservationConvention).类型 long task timer.spring-doc.cadn.net.cn

启动观察后添加的 KeyValues 可能在 *.active 指标中缺失。
千分尺内部使用nanoseconds用于基站。但是,每个后端都确定了实际的基数单元。(即普罗米修斯使用秒)

封闭类的完全限定名称o.s.i.support.management.observation.IntegrationObservation.spring-doc.cadn.net.cn

所有标签必须以spring.integration.前缀!
表 1.低基数键

spring.integration.name (必填)spring-doc.cadn.net.cn

消息网关组件的名称。spring-doc.cadn.net.cn

spring.integration.outcome (必填)spring-doc.cadn.net.cn

请求/回复执行的结果。spring-doc.cadn.net.cn

spring.integration.type (必填)spring-doc.cadn.net.cn

组件类型 - “网关”。spring-doc.cadn.net.cn

处理器

消息处理程序的观察。spring-doc.cadn.net.cn

指标名称 spring.integration.handler(由约定类定义o.s.i.support.management.observation.DefaultMessageReceiverObservationConvention).类型 timer.spring-doc.cadn.net.cn

指标名称 spring.integration.handler.active(由约定类定义o.s.i.support.management.observation.DefaultMessageReceiverObservationConvention).类型 long task timer.spring-doc.cadn.net.cn

启动观察后添加的 KeyValues 可能在 *.active 指标中缺失。
千分尺内部使用nanoseconds用于基站。但是,每个后端都确定了实际的基数单元。(即普罗米修斯使用秒)

封闭类的完全限定名称o.s.i.support.management.observation.IntegrationObservation.spring-doc.cadn.net.cn

所有标签必须以spring.integration.前缀!
表 2.低基数键

spring.integration.name (必填)spring-doc.cadn.net.cn

消息处理程序组件的名称。spring-doc.cadn.net.cn

spring.integration.type (必填)spring-doc.cadn.net.cn

组件类型 - 'handler'。spring-doc.cadn.net.cn

制作人

对消息生产者的观察,例如通道。spring-doc.cadn.net.cn

指标名称 spring.integration.producer(由约定类定义o.s.i.support.management.observation.DefaultMessageSenderObservationConvention).类型 timer.spring-doc.cadn.net.cn

指标名称 spring.integration.producer.active(由约定类定义o.s.i.support.management.observation.DefaultMessageSenderObservationConvention).类型 long task timer.spring-doc.cadn.net.cn

启动观察后添加的 KeyValues 可能在 *.active 指标中缺失。
千分尺内部使用nanoseconds用于基站。但是,每个后端都确定了实际的基数单元。(即普罗米修斯使用秒)

封闭类的完全限定名称o.s.i.support.management.observation.IntegrationObservation.spring-doc.cadn.net.cn

所有标签必须以spring.integration.前缀!
表 3.低基数键

spring.integration.name (必填)spring-doc.cadn.net.cn

消息处理程序组件的名称。spring-doc.cadn.net.cn

spring.integration.type (必填)spring-doc.cadn.net.cn

组件类型 - 'producer'。spring-doc.cadn.net.cn

可观测性 - 跨度

您可以在下面找到此项目声明的所有跨度的列表。spring-doc.cadn.net.cn

网关跨度

入站消息网关的观察。spring-doc.cadn.net.cn

跨度名称 spring.integration.gateway(由约定类定义o.s.i.support.management.observation.DefaultMessageRequestReplyReceiverObservationConvention).spring-doc.cadn.net.cn

封闭类的完全限定名称o.s.i.support.management.observation.IntegrationObservation.spring-doc.cadn.net.cn

所有标签必须以spring.integration.前缀!
表 4.标记键

名称spring-doc.cadn.net.cn

描述spring-doc.cadn.net.cn

spring.integration.name (必填)spring-doc.cadn.net.cn

消息网关组件的名称。spring-doc.cadn.net.cn

spring.integration.outcome (必填)spring-doc.cadn.net.cn

请求/回复执行的结果。spring-doc.cadn.net.cn

spring.integration.type (必填)spring-doc.cadn.net.cn

组件类型 - “网关”。spring-doc.cadn.net.cn

处理程序跨度

消息处理程序的观察。spring-doc.cadn.net.cn

跨度名称 spring.integration.handler(由约定类定义o.s.i.support.management.observation.DefaultMessageReceiverObservationConvention).spring-doc.cadn.net.cn

封闭类的完全限定名称o.s.i.support.management.observation.IntegrationObservation.spring-doc.cadn.net.cn

所有标签必须以spring.integration.前缀!
表 5.标记键

名称spring-doc.cadn.net.cn

描述spring-doc.cadn.net.cn

spring.integration.name (必填)spring-doc.cadn.net.cn

消息处理程序组件的名称。spring-doc.cadn.net.cn

spring.integration.type (必填)spring-doc.cadn.net.cn

组件类型 - 'handler'。spring-doc.cadn.net.cn

生产者跨度

对消息生产者的观察,例如通道。spring-doc.cadn.net.cn

跨度名称 spring.integration.producer(由约定类定义o.s.i.support.management.observation.DefaultMessageSenderObservationConvention).spring-doc.cadn.net.cn

封闭类的完全限定名称o.s.i.support.management.observation.IntegrationObservation.spring-doc.cadn.net.cn

所有标签必须以spring.integration.前缀!
表 6.标记键

名称spring-doc.cadn.net.cn

描述spring-doc.cadn.net.cn

spring.integration.name (必填)spring-doc.cadn.net.cn

消息处理程序组件的名称。spring-doc.cadn.net.cn

spring.integration.type (必填)spring-doc.cadn.net.cn

组件类型 - 'producer'。spring-doc.cadn.net.cn

可观测性 - 约定

您可以在下面找到所有列表GlobalObservationConventionObservationConvention由该项目声明。spring-doc.cadn.net.cn

表7.观察公约实现

ObservationConvention 类名称spring-doc.cadn.net.cn

适用的 ObservationContext 类名称spring-doc.cadn.net.cn

o.s.i.support.management.observation.DefaultMessageReceiverObservationConventionspring-doc.cadn.net.cn

MessageReceiverContextspring-doc.cadn.net.cn

o.s.i.support.management.observation.MessageReceiverObservationConventionspring-doc.cadn.net.cn

MessageReceiverContextspring-doc.cadn.net.cn

o.s.i.support.management.observation.DefaultMessageRequestReplyReceiverObservationConventionspring-doc.cadn.net.cn

MessageRequestReplyReceiverContextspring-doc.cadn.net.cn

o.s.i.support.management.observation.MessageRequestReplyReceiverObservationConventionspring-doc.cadn.net.cn

MessageRequestReplyReceiverContextspring-doc.cadn.net.cn

o.s.i.support.management.observation.DefaultMessageSenderObservationConventionspring-doc.cadn.net.cn

MessageSenderContextspring-doc.cadn.net.cn

o.s.i.support.management.observation.MessageSenderObservationConventionspring-doc.cadn.net.cn

MessageSenderContextspring-doc.cadn.net.cn

观测传播

为了在一个跟踪中提供连接的跨度链,与消息传递流的性质无关,Spring Integration提供了一个ObservationPropagationChannelInterceptor实现。 这可以在MessageChannnel豆子单独或作为@GlobalChannelInterceptor与各自MessageChannnelbean 名称模式匹配。此拦截器的目标是传播Observation从生产者线程到消费者线程,独立于MessageChannnel实施和性质。 一个DirectChannel但是,由于其使用者直接在生产者线程上执行,因此会被忽略。spring-doc.cadn.net.cn

Spring Integration JMX 支持

消息历史记录

消息传递体系结构的主要好处是松散耦合,以便参与的组件不会保持对彼此的任何感知。仅这一事实就使应用程序变得非常灵活,允许您在不影响其余流的情况下更改组件、更改消息传递路由、更改消息使用样式(轮询与事件驱动)等等。但是,当出现问题时,这种不起眼的体系结构风格可能会被证明是困难的。在调试时,您可能希望尽可能多地获得有关消息的信息(其来源、它遍历的通道和其他详细信息)。spring-doc.cadn.net.cn

消息历史记录是其中一种模式,它为您提供了一种模式,它为您提供了一种选项来保持对消息路径的某种程度的感知,以用于调试目的或维护审计跟踪。Spring 集成提供了一种简单的方法来配置消息流以维护消息历史记录,方法是向消息添加标头并在每次消息通过跟踪组件时更新该标头。spring-doc.cadn.net.cn

消息历史记录配置

要启用消息历史记录,您只需定义message-history元素(或@EnableMessageHistory) 中,如以下示例所示:spring-doc.cadn.net.cn

Java
@Configuration
@EnableIntegration
@EnableMessageHistory
XML
<int:message-history/>

现在,每个命名组件(定义了 'id')都会被跟踪。框架在消息中设置 'history' 标头。它的值 aList<Properties>.spring-doc.cadn.net.cn

请考虑以下配置示例:spring-doc.cadn.net.cn

Java
@MessagingGateway(defaultRequestChannel = "bridgeInChannel")
public interface SampleGateway {
   ...
}

@Bean
@Transformer(inputChannel = "enricherChannel", outputChannel="filterChannel")
HeaderEnricher sampleEnricher() {
    HeaderEnricher enricher =
           new HeaderEnricher(Collections.singletonMap("baz", new StaticHeaderValueMessageProcessor("baz")));
    return enricher;
}
XML
<int:gateway id="sampleGateway"
    service-interface="org.springframework.integration.history.sample.SampleGateway"
    default-request-channel="bridgeInChannel"/>

<int:header-enricher id="sampleEnricher" input-channel="enricherChannel" output-channel="filterChannel">
    <int:header name="baz" value="baz"/>
</int:header-enricher>

上述配置生成一个简单的消息历史记录结构,输出类似于以下内容:spring-doc.cadn.net.cn

[{name=sampleGateway, type=gateway, timestamp=1283281668091},
 {name=sampleEnricher, type=header-enricher, timestamp=1283281668094}]

要访问消息历史记录,您只需访问MessageHistory页眉。 以下示例显示了如何执行此作:spring-doc.cadn.net.cn

Iterator<Properties> historyIterator =
    message.getHeaders().get(MessageHistory.HEADER_NAME, MessageHistory.class).iterator();
assertTrue(historyIterator.hasNext());
Properties gatewayHistory = historyIterator.next();
assertEquals("sampleGateway", gatewayHistory.get("name"));
assertTrue(historyIterator.hasNext());
Properties chainHistory = historyIterator.next();
assertEquals("sampleChain", chainHistory.get("name"));

您可能不想跟踪所有组件。 要根据某些组件的名称将历史记录限制为某些组件,您可以提供tracked-components属性,并指定与要跟踪的组件匹配的组件名称和模式的逗号分隔列表。 以下示例显示了如何执行此作:spring-doc.cadn.net.cn

Java
@Configuration
@EnableIntegration
@EnableMessageHistory("*Gateway", "sample*", "aName")
XML
<int:message-history tracked-components="*Gateway, sample*, aName"/>

在前面的示例中,仅维护以“Gateway”结尾、以“sample”开头或与名称“aName”完全匹配的组件的消息历史记录。spring-doc.cadn.net.cn

此外,MessageHistoryConfigurerbean 现在被IntegrationMBeanExporter(参见 MBean 导出器),允许您在运行时更改模式。 但是请注意,必须停止 Bean(关闭消息历史记录)才能更改模式。 此功能对于临时打开历史记录以分析系统可能很有用。 MBean 的对象名称为<domain>:name=messageHistoryConfigurer,type=MessageHistoryConfigurer.spring-doc.cadn.net.cn

只有一个@EnableMessageHistory(或<message-history/>) 必须在应用程序上下文中声明为组件跟踪配置的单一源。 不要对MessageHistoryConfigurer.
根据定义,消息历史记录标头是不可变的(您无法重写历史记录)。 因此,在写入消息历史记录值时,组件要么创建新消息(当组件是源时),要么从请求消息中复制历史记录,对其进行修改并在回复消息上设置新列表。 无论哪种情况,即使消息本身跨越线程边界,也可以追加值。 这意味着历史记录值可以极大地简化异步消息流中的调试。

消息存储

企业集成模式 (EIP) 一书确定了几种能够缓冲消息的模式。 例如,聚合器会缓冲消息,直到可以释放它们,并且QueueChannel缓冲消息,直到使用者从该通道显式接收这些消息。 由于在消息流中的任何时间点都可能发生故障,因此缓冲消息的 EIP 组件还会引入消息可能丢失的点。spring-doc.cadn.net.cn

为了降低丢失消息的风险,EIP 定义了消息存储模式,它允许 EIP 组件存储消息,通常存储在某种类型的持久存储(例如 RDBMS)中。spring-doc.cadn.net.cn

Spring Integration 通过以下方式为消息存储模式提供支持:spring-doc.cadn.net.cn

有关如何配置特定消息存储实现以及如何注入MessageStore整个手册都描述了特定缓冲组件的实现(请参阅特定组件,例如 QueueChannelAggregatorDelayer 等)。 以下一对示例演示如何添加对消息存储的引用QueueChannel对于聚合器:spring-doc.cadn.net.cn

示例 1.队列通道
<int:channel id="myQueueChannel">
    <int:queue message-store="refToMessageStore"/>
<int:channel>
示例 2.聚合
<int:aggregator … message-store="refToMessageStore"/>

默认情况下,消息通过使用o.s.i.store.SimpleMessageStore,实现MessageStore. 这对于开发或简单的低容量环境来说可能没问题,因为这些环境不会担心非持久性消息的潜在丢失。 但是,典型的生产应用程序需要一个更可靠的选项,不仅可以降低消息丢失的风险,还可以避免潜在的内存不足错误。 因此,我们还提供MessageStore各种数据存储的实现。 以下是受支持实现的完整列表:spring-doc.cadn.net.cn

但是,在使用MessageStore.spring-doc.cadn.net.cn

消息数据(有效负载和标头)通过使用不同的序列化策略进行序列化和反序列化,具体取决于MessageStore. 例如,当使用JdbcMessageStoreSerializable默认情况下,数据是持久化的。 在这种情况下,在序列化发生之前,将删除不可序列化标头。 此外,请注意传输适配器(例如 FTP、HTTP、JMS 等)注入的特定于协议的标头。 例如<http:inbound-channel-adapter/>将 HTTP 标头映射到消息标头中,其中之一是ArrayList的不可序列化org.springframework.http.MediaType实例。 但是,您可以注入自己的SerializerDeserializer策略接口到一些MessageStore实现(例如JdbcMessageStore) 来更改序列化和反序列化的行为。spring-doc.cadn.net.cn

特别注意表示某些类型数据的标头。 例如,如果其中一个标头包含某个 Spring bean 的实例,则在反序列化时,您最终可能会得到该 bean 的不同实例,这直接影响框架创建的一些隐式标头(例如REPLY_CHANNELERROR_CHANNEL). 目前,它们不可序列化,但即使可序列化,反序列化通道也不会表示预期的实例。spring-doc.cadn.net.cn

从 Spring Integration 版本 3.0 开始,您可以使用配置为在向HeaderChannelRegistry.spring-doc.cadn.net.cn

此外,请考虑按如下方式配置消息流时会发生什么:网关→队列通道(由持久性消息存储支持)→服务激活器。 该网关创建一个临时应答通道,当服务激活器的轮询器从队列中读取时,该通道将丢失。 同样,您可以使用标头扩充器将标头替换为String表示法。spring-doc.cadn.net.cn

有关详细信息,请参阅标头扩充器spring-doc.cadn.net.cn

Spring Integration 4.0 引入了两个新接口:spring-doc.cadn.net.cn

  • ChannelMessageStore:实现特定于以下内容的作QueueChannel实例spring-doc.cadn.net.cn

  • PriorityCapableChannelMessageStore:标记MessageStore用于的实现PriorityChannel实例,并为持久化消息提供优先级顺序。spring-doc.cadn.net.cn

实际行为取决于实现。 该框架提供了以下实现,可以作为持久化MessageStoreQueueChannelPriorityChannel:spring-doc.cadn.net.cn

注意事项SimpleMessageStore

从 4.1 版本开始,SimpleMessageStore调用时不再复制消息组getMessageGroup(). 对于大型消息组,这是一个重大的性能问题。 4.0.1 引入了一个布尔值copyOnGet属性,用于控制此行为。 当聚合器在内部使用时,此属性设置为false以提高性能。 现在false默认情况下。spring-doc.cadn.net.cn

在组件(如聚合器)之外访问组存储的用户现在会获得对聚合器正在使用的组的直接引用,而不是副本。在聚合器外部作组可能会导致不可预知的结果。spring-doc.cadn.net.cn

因此,您不应该执行此类作或将copyOnGet属性设置为true.spring-doc.cadn.net.cn

MessageGroupFactory

从 4.3 版本开始,一些MessageGroupStore实现可以注入自定义MessageGroupFactory策略来创建和自定义MessageGroup实例MessageGroupStore. 这默认为SimpleMessageGroupFactory,产生SimpleMessageGroup实例基于GroupType.HASH_SET (LinkedHashSet) 内部集合。 其他可能的选项是SYNCHRONISED_SETBLOCKING_QUEUE,其中最后一个可用于恢复前一个SimpleMessageGroup行为。 此外,PERSISTENT选项可用。 有关详细信息,请参阅下一节。 从版本 5.0.1 开始,LIST当组中消息的顺序和唯一性无关紧要时,选项也可用。spring-doc.cadn.net.cn

持续MessageGroupStore和延迟加载

从 4.3 版本开始,所有持久性MessageGroupStore实例检索MessageGroup实例及其messages以延迟加载的方式从商店。 在大多数情况下,它对相关性很有用MessageHandler实例(参见 AggregatorResequencer),这会增加加载整个MessageGroup从存储中进行每个关联作。spring-doc.cadn.net.cn

您可以使用AbstractMessageGroupStore.setLazyLoadMessageGroups(false)选项以从配置中关闭延迟加载行为。spring-doc.cadn.net.cn

我们对 MongoDB 上的延迟加载进行性能测试MessageStore (MongoDB 消息存储)和<aggregator> (聚合)使用自定义release-strategy类似于以下内容:spring-doc.cadn.net.cn

<int:aggregator input-channel="inputChannel"
                output-channel="outputChannel"
                message-store="mongoStore"
                release-strategy-expression="size() == 1000"/>

对于 1000 条简单消息,它会产生类似于以下结果的结果:spring-doc.cadn.net.cn

...
StopWatch 'Lazy-Load Performance': running time (millis) = 38918
-----------------------------------------
ms     %     Task name
-----------------------------------------
02652  007%  Lazy-Load
36266  093%  Eager
...

但是,从 5.5 版本开始,所有持久的MessageGroupStore实现提供了一个streamMessagesForGroup(Object groupId)基于目标数据库流 API 的合约。 当存储中的组非常大时,这提高了资源利用率。 在框架内部,当 Delayer 在启动时重新安排持久化消息时,将使用此新 API(例如)。 返回Stream<Message<?>>必须在处理结束时关闭,例如通过try-with-resources. 每当PersistentMessageGroup,则其streamMessages()委托到MessageGroupStore.streamMessagesForGroup().spring-doc.cadn.net.cn

消息组条件

从 5.5 版本开始,MessageGroup抽象提供了一个conditionstring 选项。 此选项的值可以是稍后出于任何原因可以解析以为组做出决策的任何值。 例如,一个ReleaseStrategy相关消息处理程序可以从组中查询此属性,而不是循环访问组中的所有消息。 这MessageGroupStore公开一个setGroupCondition(Object groupId, String condition)应用程序接口。 为此,一个setGroupConditionSupplier(BiFunction<Message<?>, String, String>)选项已添加到AbstractCorrelatingMessageHandler. 在将每条消息添加到组以及组的现有条件后,将根据每条消息进行评估。实现可以决定返回新值、现有值或将目标条件重置为null. 一个condition可以是 JSON、SpEL 表达式、数字或任何可以序列化为字符串并在之后解析的内容。例如,FileMarkerReleaseStrategy文件聚合器组件中,将条件填充到组中FileHeaders.LINE_COUNTFileSplitter.FileMarker.Mark.END消息并从其canRelease()将组大小与此条件下的值进行比较。这样,它就不会循环访问组中的所有消息来查找FileSplitter.FileMarker.Mark.END消息与FileHeaders.LINE_COUNT页眉。 它还允许结束标记在所有其他记录之前到达聚合器;例如,在多线程环境中处理文件时。spring-doc.cadn.net.cn

此外,为了配置方便,一个GroupConditionProvider合同已经引入。 这AbstractCorrelatingMessageHandler检查是否提供了ReleaseStrategy实现此接口并提取conditionSupplier用于组条件评估逻辑。spring-doc.cadn.net.cn

元数据存储

许多外部系统、服务或资源(Twitter、RSS、文件系统等)不是事务性的,并且无法将数据标记为已读。 此外,有时,您可能需要在某些集成解决方案中实现企业集成模式幂等接收器。 为了实现此目标并在下次与外部系统交互之前存储端点的一些先前状态或处理下一条消息,Spring Integration 提供了元数据存储组件作为org.springframework.integration.metadata.MetadataStore与通用键值合约的接口。spring-doc.cadn.net.cn

元数据存储旨在存储各种类型的通用元数据(例如,已处理的最后一个源条目的发布日期),以帮助源适配器等组件处理重复项。 如果组件没有直接提供对MetadataStore,定位元数据存储的算法如下:首先,寻找一个带有metadataStore应用程序上下文中的 ID。 如果找到,请使用它。 否则,请创建一个SimpleMetadataStore,这是一种内存中实现,仅在当前正在运行的应用程序上下文的生命周期内保留元数据。 这意味着,在重新启动时,您最终可能会收到重复的条目。spring-doc.cadn.net.cn

如果需要在应用程序上下文重启之间保留元数据,则框架会提供以下持久性MetadataStores:spring-doc.cadn.net.cn

PropertiesPersistingMetadataStore由属性文件和PropertiesPersister.spring-doc.cadn.net.cn

默认情况下,它仅在应用程序上下文正常关闭时保留状态。 它实现Flushable这样您就可以通过调用flush(). 以下示例演示如何使用 XML 配置“PropertiesPersistingMetadataStore”:spring-doc.cadn.net.cn

<bean id="metadataStore"
    class="org.springframework.integration.metadata.PropertiesPersistingMetadataStore"/>

或者,您可以提供自己的MetadataStore接口(例如JdbcMetadataStore),并将其配置为应用程序上下文中的 bean。spring-doc.cadn.net.cn

从 4.0 版本开始,SimpleMetadataStore,PropertiesPersistingMetadataStoreRedisMetadataStore实现ConcurrentMetadataStore. 这些提供原子更新,并且可以跨多个组件或应用程序实例使用。spring-doc.cadn.net.cn

幂等接收器和元数据存储

元数据存储对于实现 EIP 幂等接收器模式非常有用,当需要过滤传入消息(如果它已被处理)并且您可以丢弃它或在丢弃时执行其他逻辑时。 以下配置显示了如何执行此作的示例:spring-doc.cadn.net.cn

<int:filter input-channel="serviceChannel"
			output-channel="idempotentServiceChannel"
			discard-channel="discardChannel"
			expression="@metadataStore.get(headers.businessKey) == null"/>

<int:publish-subscribe-channel id="idempotentServiceChannel"/>

<int:outbound-channel-adapter channel="idempotentServiceChannel"
                              expression="@metadataStore.put(headers.businessKey, '')"/>

<int:service-activator input-channel="idempotentServiceChannel" ref="service"/>

value幂等条目可能是一个到期日期,之后该条目应由某个计划的收割者从元数据存储中删除。spring-doc.cadn.net.cn

MetadataStoreListener

某些元数据存储(目前仅限 zookeeper)支持注册侦听器以在项目更改时接收事件,如以下示例所示:spring-doc.cadn.net.cn

public interface MetadataStoreListener {

	void onAdd(String key, String value);

	void onRemove(String key, String oldValue);

	void onUpdate(String key, String newValue);
}

有关更多信息,请参阅 Javadoc。 这MetadataStoreListenerAdapter如果您只对事件的子集感兴趣,则可以进行子类化。spring-doc.cadn.net.cn

控制总线

正如 Enterprise Integration Patterns (EIP) 一书中所述,控制总线背后的思想是,与用于“应用程序级”消息传递相同的消息传递系统可用于监视和管理框架内的组件。 在 Spring Integration 中,我们基于上述适配器进行构建,以便您可以发送消息作为调用公开作的一种方式。spring-doc.cadn.net.cn

以下示例显示如何使用 XML 配置控制总线:spring-doc.cadn.net.cn

<int:control-bus input-channel="operationChannel"/>

控制总线有一个输入通道,可以访问该通道以调用应用程序上下文中对 bean 的作。 它还具有服务激活终结点的所有公共属性。 例如,如果作结果具有要发送到下游通道的返回值,则可以指定输出通道。spring-doc.cadn.net.cn

控制总线在输入通道上作为 Spring Expression Language (SpEL) 表达式运行消息。 它接受一条消息,将正文编译为表达式,添加一些上下文,然后运行它。 默认上下文支持任何已使用@ManagedAttribute@ManagedOperation. 它还支持 Spring 的Lifecycle接口(及其Pausable扩展,从 5.2 版开始),并且它支持用于配置 Spring 的几个TaskExecutorTaskScheduler实现。 确保控制总线可以使用您自己的方法的最简单方法是使用@ManagedAttribute@ManagedOperation附注。 由于这些注释也用于将方法公开给 JMX MBean 注册表,因此它们提供了一个方便的副产品:通常,您想要向控制总线公开的相同类型的作对于通过 JMX 公开是合理的)。 应用程序上下文中任何特定实例的解析都是在典型的 SpEL 语法中实现的。 为此,请为 bean 名称提供 bean 名称和 bean 的 SpEL 前缀 ()。 例如,要在 Spring Bean 上执行方法,客户端可以向作通道发送消息,如下所示:@spring-doc.cadn.net.cn

Message operation = MessageBuilder.withPayload("@myServiceBean.shutdown()").build();
operationChannel.send(operation)

表达式上下文的根是Message本身,因此您还可以访问payloadheaders作为表达式中的变量。这与 Spring Integration 端点中的所有其他表达式支持一致。spring-doc.cadn.net.cn

使用 Java 注释,您可以按如下方式配置控制总线:spring-doc.cadn.net.cn

@Bean
@ServiceActivator(inputChannel = "operationChannel")
public ExpressionControlBusFactoryBean controlBus() {
    return new ExpressionControlBusFactoryBean();
}

同样,您可以按如下方式配置 Java DSL 流定义:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow controlBusFlow() {
    return IntegrationFlow.from("controlBus")
              .controlBus()
              .get();
}

如果您更喜欢将 lambda 与 automatic 一起使用DirectChannel创建时,您可以按如下方式创建控制总线:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow controlBus() {
    return IntegrationFlowDefinition::controlBus;
}

在这种情况下,通道名为controlBus.input.spring-doc.cadn.net.cn

有序关机

如“MBean Exporter”中所述,MBean 导出器提供了一个名为stopActiveComponents,用于有序停止应用程序。 该作具有单个Long参数。 该参数指示作等待允许正在进行的消息完成的时间(以毫秒为单位)。 该作的工作原理如下:spring-doc.cadn.net.cn

  1. beforeShutdown()在实现的所有 bean 上OrderlyShutdownCapable.spring-doc.cadn.net.cn

    这样做可以让此类组件为关闭做好准备。 实现此接口的组件及其对此调用执行的作的示例包括停止其侦听器容器的 JMS 和 AMQP 消息驱动适配器、停止接受新连接(同时保持现有连接打开)的 TCP 服务器连接工厂、删除(记录)收到的任何新消息的 TCP 入站端点以及返回503 - Service Unavailable对于任何新请求。spring-doc.cadn.net.cn

  2. 停止任何活动通道,例如 JMS 或 AMQP 支持的通道。spring-doc.cadn.net.cn

  3. 全部停止MessageSource实例。spring-doc.cadn.net.cn

  4. 停止所有入站MessageProducers(不是OrderlyShutdownCapable).spring-doc.cadn.net.cn

  5. 等待剩余时间,由Long参数传递给作。spring-doc.cadn.net.cn

    这样做可以让任何正在进行的消息完成它们的旅程。 因此,在调用此作时选择适当的超时非常重要。spring-doc.cadn.net.cn

  6. afterShutdown()在所有OrderlyShutdownCapable组件。spring-doc.cadn.net.cn

    这样做可以让此类组件执行最终关闭任务(例如,关闭所有打开的套接字)。spring-doc.cadn.net.cn

Orderly Shutdown Managed Operation 中所述,可以使用 JMX 调用此作。 如果您希望以编程方式调用该方法,则需要注入或以其他方式获取对IntegrationMBeanExporter. 如果没有id属性在<int-jmx:mbean-export/>定义,bean 有一个生成的名称。 此名称包含一个随机组件,以避免ObjectName如果同一个JVM中存在多个Spring Integration上下文(MBeanServer).spring-doc.cadn.net.cn

因此,如果您希望以编程方式调用该方法,我们建议您向导出器提供id属性,以便您可以在应用程序上下文中轻松访问它。spring-doc.cadn.net.cn

最后,可以使用<control-bus>元素。 有关详细信息,请参阅监控 Spring Integration 示例应用程序spring-doc.cadn.net.cn

前面描述的算法在 4.1 版本中得到了改进。 以前,所有任务执行器和调度程序都已停止。 这可能会导致流中消息QueueChannel实例保留。 现在,关闭使轮询器运行,让这些消息被清空和处理。

积分图

从版本 4.3 开始,Spring Integration 提供对应用程序运行时对象模型的访问,该模型可以选择包含组件指标。 它以图形的形式公开,可用于可视化集成应用程序的当前状态。 这o.s.i.support.management.graphpackage 包含收集、构建和呈现 Spring Integration 组件的运行时状态所需的所有类,作为单个树状Graph对象。 这IntegrationGraphServer应声明为 bean 来构建、检索和刷新Graph对象。 由此产生的Graph对象可以序列化为任何格式,尽管 JSON 在客户端解析和表示起来灵活且方便。仅具有默认组件的 Spring Integration 应用程序将公开如下图:spring-doc.cadn.net.cn

{
  "contentDescriptor" : {
    "providerVersion" : "6.0.9",
    "providerFormatVersion" : 1.2,
    "provider" : "spring-integration",
    "name" : "myAppName:1.0"
  },
  "nodes" : [ {
    "nodeId" : 1,
    "componentType" : "null-channel",
    "integrationPatternType" : "null_channel",
    "integrationPatternCategory" : "messaging_channel",
    "properties" : { },
    "sendTimers" : {
      "successes" : {
        "count" : 1,
        "mean" : 0.0,
        "max" : 0.0
      },
      "failures" : {
        "count" : 0,
        "mean" : 0.0,
        "max" : 0.0
      }
    },
    "receiveCounters" : {
      "successes" : 0,
      "failures" : 0
    },
    "name" : "nullChannel"
  }, {
    "nodeId" : 2,
    "componentType" : "publish-subscribe-channel",
    "integrationPatternType" : "publish_subscribe_channel",
    "integrationPatternCategory" : "messaging_channel",
    "properties" : { },
    "sendTimers" : {
      "successes" : {
        "count" : 1,
        "mean" : 7.807002,
        "max" : 7.807002
      },
      "failures" : {
        "count" : 0,
        "mean" : 0.0,
        "max" : 0.0
      }
    },
    "name" : "errorChannel"
  }, {
    "nodeId" : 3,
    "componentType" : "logging-channel-adapter",
    "integrationPatternType" : "outbound_channel_adapter",
    "integrationPatternCategory" : "messaging_endpoint",
    "properties" : { },
    "output" : null,
    "input" : "errorChannel",
    "sendTimers" : {
      "successes" : {
        "count" : 1,
        "mean" : 6.742722,
        "max" : 6.742722
      },
      "failures" : {
        "count" : 0,
        "mean" : 0.0,
        "max" : 0.0
      }
    },
    "name" : "errorLogger"
  } ],
  "links" : [ {
    "from" : 2,
    "to" : 3,
    "type" : "input"
  } ]
}
5.2 版弃用了旧指标,转而使用微米表,如指标管理所述。 旧版指标已在 5.4 版中删除,将不再显示在图表中。

在前面的示例中,图形由三个顶级元素组成。spring-doc.cadn.net.cn

contentDescriptorgraph 元素包含有关提供数据的应用程序的一般信息。 这name可以在IntegrationGraphServerbean 或spring.application.name应用程序上下文环境属性。 框架提供了其他属性,可让您将类似模型与其他来源区分开来。spring-doc.cadn.net.cn

linksgraph 元素表示节点之间的连接,来自nodesgraph 元素,因此,在源 Spring Integration 应用程序中的集成组件之间。 例如,从MessageChannel设置为EventDrivenConsumer与一些MessageHandler或从AbstractReplyProducingMessageHandler设置为MessageChannel. 为方便起见并让您确定链接的用途,该模型包括type属性。 可能的类型包括:spring-doc.cadn.net.cn

  • input:标识方向MessageChannel到端点,inputChannelrequestChannel属性spring-doc.cadn.net.cn

  • output:从MessageHandler,MessageProducerSourcePollingChannelAdapterMessageChannel通过outputChannelreplyChannel属性spring-doc.cadn.net.cn

  • error:从MessageHandlerPollingConsumerMessageProducerSourcePollingChannelAdapterMessageChannel通过errorChannel财产;spring-doc.cadn.net.cn

  • discard:从DiscardingMessageHandler(例如MessageFilter) 设置为MessageChannel通过errorChannel财产。spring-doc.cadn.net.cn

  • route:从AbstractMappingMessageRouter(例如HeaderValueRouter) 设置为MessageChannel. 似output但在运行时确定。 可能是配置的通道映射或动态解析的通道。 为此,路由器通常最多只保留 100 个动态路由,但您可以通过将dynamicChannelLimit财产。spring-doc.cadn.net.cn

可视化工具可以使用来自此元素的信息来渲染节点之间的连接,从nodesgraph 元素,其中fromto数字表示nodeId链接节点的属性。 例如,link元素可用于确定适当的port在目标节点上。spring-doc.cadn.net.cn

以下“文本图像”显示了类型之间的关系:spring-doc.cadn.net.cn

              +---(discard)
              |
         +----o----+
         |         |
         |         |
         |         |
(input)--o         o---(output)
         |         |
         |         |
         |         |
         +----o----+
              |
              +---(error)

nodesgraph 元素可能是最有趣的,因为它的元素不仅包含运行时组件及其componentTypeinstances 和name值,但也可以选择包含组件公开的指标。 节点元素包含各种通常不言自明的属性。 例如,基于表达式的组件包括expression包含组件的主表达式字符串的属性。 要启用指标,请添加@EnableIntegrationManagement设置为@Configuration类或添加<int:management/>元素添加到您的 XML 配置中。 有关完整信息,请参阅指标和管理。spring-doc.cadn.net.cn

nodeId表示唯一的增量标识符,以便将一个组件与另一个组件区分开来。 它也用于links元素来表示此组件与其他组件(如果有)的关系(连接)。 这inputoutput属性用于inputChanneloutputChannel属性AbstractEndpoint,MessageHandler,SourcePollingChannelAdapterMessageProducerSupport. 有关详细信息,请参阅下一节。spring-doc.cadn.net.cn

从 5.1 版本开始,IntegrationGraphServer接受Function<NamedComponent, Map<String, Object>> additionalPropertiesCallback用于填充IntegrationNode对于特定的NamedComponent. 例如,您可以公开SmartLifecycle autoStartuprunningproperties 添加到目标图中:spring-doc.cadn.net.cn

server.setAdditionalPropertiesCallback(namedComponent -> {
            Map<String, Object> properties = null;
            if (namedComponent instanceof SmartLifecycle) {
                SmartLifecycle smartLifecycle = (SmartLifecycle) namedComponent;
                properties = new HashMap<>();
                properties.put("auto-startup", smartLifecycle.isAutoStartup());
                properties.put("running", smartLifecycle.isRunning());
            }
            return properties;
        });

图形运行时模型

Spring Integration 组件具有不同级别的复杂性。 例如,任何轮询的MessageSource还有一个SourcePollingChannelAdapterMessageChannel定期从源数据向其发送消息。 其他组件可能是中间件请求-回复组件(例如JmsOutboundGateway) 与消耗AbstractEndpoint订阅(或轮询)的requestChannel (input) 表示消息,以及replyChannel (output) 生成要向下游发送的回复消息。 同时,任何MessageProducerSupport实现(例如ApplicationEventListeningMessageProducer) 包装一些源协议监听逻辑并将消息发送到outputChannel.spring-doc.cadn.net.cn

在图中,Spring Integration 组件通过使用IntegrationNode类层次结构,您可以在o.s.i.support.management.graph包。 例如,您可以使用ErrorCapableDiscardingMessageHandlerNode对于AggregatingMessageHandler(因为它有一个discardChannel选项),并且在从PollableChannel通过使用PollingConsumer. 另一个例子是CompositeMessageHandlerNode— 对于一个MessageHandlerChain当订阅SubscribableChannel通过使用EventDrivenConsumer.spring-doc.cadn.net.cn

@MessagingGateway(参见 消息传递网关)为其每个方法提供节点,其中name属性基于网关的 Bean 名称和短方法签名。 请考虑以下网关示例:
@MessagingGateway(defaultRequestChannel = "four")
public interface Gate {

	void foo(String foo);

	void foo(Integer foo);

	void bar(String bar);

}

上述网关生成类似于以下内容的节点:spring-doc.cadn.net.cn

{
  "nodeId" : 10,
  "name" : "gate.bar(class java.lang.String)",
  "stats" : null,
  "componentType" : "gateway",
  "integrationPatternType" : "gateway",
  "integrationPatternCategory" : "messaging_endpoint",
  "output" : "four",
  "errors" : null
},
{
  "nodeId" : 11,
  "name" : "gate.foo(class java.lang.String)",
  "stats" : null,
  "componentType" : "gateway",
  "integrationPatternType" : "gateway",
  "integrationPatternCategory" : "messaging_endpoint",
  "output" : "four",
  "errors" : null
},
{
  "nodeId" : 12,
  "name" : "gate.foo(class java.lang.Integer)",
  "stats" : null,
  "componentType" : "gateway",
  "integrationPatternType" : "gateway",
  "integrationPatternCategory" : "messaging_endpoint",
  "output" : "four",
  "errors" : null
}

您可以使用此IntegrationNode用于在客户端解析图形模型以及了解一般 Spring Integration 运行时行为的层次结构。 另请参阅编程提示和技巧以获取更多信息。spring-doc.cadn.net.cn

5.3 版引入了IntegrationPattern抽象和所有开箱即用的组件(代表企业集成模式 (EIP))实现此抽象并提供IntegrationPatternType枚举值。 此信息对于目标应用程序中的某些分类逻辑很有用,或者,在图形节点中公开时,UI 可以使用它来确定如何绘制组件。spring-doc.cadn.net.cn

集成图控制器

如果您的应用程序是基于 Web 的(或构建在具有嵌入式 Web 容器的 Spring Boot 之上),并且 Spring Integration HTTP 或 WebFlux 模块(分别参见 HTTP SupportWebFlux Support)存在于类路径上,则可以使用IntegrationGraphController以公开IntegrationGraphServer功能作为 REST 服务。 为此,该@EnableIntegrationGraphController@Configuration类注释和<int-http:graph-controller/>XML 元素在 HTTP 模块中可用。 与@EnableWebMvc注释(或<mvc:annotation-driven/>对于 XML 定义),此配置注册一个IntegrationGraphController @RestController其中,其@RequestMapping.path可以在@EnableIntegrationGraphController注释或<int-http:graph-controller/>元素。 默认路径是/integration.spring-doc.cadn.net.cn

IntegrationGraphController @RestController提供以下服务:spring-doc.cadn.net.cn

  • @GetMapping(name = "getGraph"):检索自上次以来 Spring Integration 组件的状态IntegrationGraphServer刷新。 这o.s.i.support.management.graph.Graph作为@ResponseBodyREST 服务的。spring-doc.cadn.net.cn

  • @GetMapping(path = "/refresh", name = "refreshGraph"):刷新当前Graph为实际运行时状态,并将其作为 REST 响应返回。 无需刷新指标的图形。 检索图形时,它们是实时提供的。 如果自上次检索图形以来已修改应用程序上下文,则可以调用 Refresh。 在这种情况下,图形将完全重建。spring-doc.cadn.net.cn

您可以为IntegrationGraphController使用 Spring Security 和 Spring MVC 项目提供的标准配置选项和组件。 以下示例实现了这些目标:spring-doc.cadn.net.cn

<mvc:annotation-driven />

<mvc:cors>
	<mvc:mapping path="/myIntegration/**"
				 allowed-origins="http://localhost:9090"
				 allowed-methods="GET" />
</mvc:cors>

<security:http>
    <security:intercept-url pattern="/myIntegration/**" access="ROLE_ADMIN" />
</security:http>


<int-http:graph-controller path="/myIntegration" />

以下示例显示了如何对 Java 配置执行相同的作:spring-doc.cadn.net.cn

@Configuration
@EnableWebMvc // or @EnableWebFlux
@EnableWebSecurity // or @EnableWebFluxSecurity
@EnableIntegration
@EnableIntegrationGraphController(path = "/testIntegration", allowedOrigins="http://localhost:9090")
public class IntegrationConfiguration extends WebSecurityConfigurerAdapter {

    @Override
    protected void configure(HttpSecurity http) throws Exception {
	    http
            .authorizeRequests()
               .antMatchers("/testIntegration/**").hasRole("ADMIN")
            // ...
            .formLogin();
    }

    //...

}

请注意,为方便起见,该@EnableIntegrationGraphController注释提供了一个allowedOrigins属性。 这提供了GET访问path. 为了更复杂,您可以使用标准 Spring MVC 机制配置 CORS 映射。spring-doc.cadn.net.cn