系统管理
系统管理
指标和管理
本节介绍如何捕获 Spring Integration 的指标。 在最近的版本中,我们更加依赖 Micrometer(参见 https://micrometer.io),我们计划在未来的版本中更多地使用 Micrometer。
在高容量环境中禁用日志记录
您可以在主消息流中控制调试日志记录。
在非常大容量的应用程序中,对于某些日志记录子系统,调用 to 可能非常昂贵。
您可以禁用所有此类日志记录以避免此开销。
异常日志记录 (debug 或其他) 不受此设置的影响。isDebugEnabled()
以下清单显示了用于控制日志记录的可用选项:
@Configuration
@EnableIntegration
@EnableIntegrationManagement(
defaultLoggingEnabled = "true" <1>)
public static class ContextConfiguration {
...
}
<int:management default-logging-enabled="true"/> (1)
1 | 设置为 以禁用主消息流中的所有日志记录,而不管日志系统类别设置如何。
设置为 'true' 以启用调试日志记录(如果日志记录子系统也启用了)。
仅在未在 Bean 定义中显式配置设置时应用。
默认值为 .false true |
defaultLoggingEnabled 仅当尚未在 Bean 定义中显式配置相应的设置时,才适用。 |
千分尺集成
概述
从版本 5.0.3 开始,应用程序上下文中存在的 Micrometer 会触发对 Micrometer 度量的支持。MeterRegistry
要使用 Micrometer,请将其中一个 bean 添加到应用程序上下文中。MeterRegistry
对于每个 和 ,都会注册计时器。
对于每个 ,都会注册一个计数器。MessageHandler
MessageChannel
MessageSource
这仅适用于扩展 、 、 和 的对象(大多数框架组件都是这种情况)。AbstractMessageHandler
AbstractMessageChannel
AbstractMessageSource
消息通道上发送操作的 Meter 具有以下名称或标签:Timer
-
name
:spring.integration.send
-
tag
:type:channel
-
tag
:name:<componentName>
-
tag
:result:(success|failure)
-
tag
:exception:(none|exception simple class name)
-
description
:Send processing time
(结果异常表示通道的操作返回了 。failure
none
send()
false
用于轮询消息通道上的接收操作的 Meters 具有以下名称或标签:Counter
-
name
:spring.integration.receive
-
tag
:type:channel
-
tag
:name:<componentName>
-
tag
:result:(success|failure)
-
tag
:exception:(none|exception simple class name)
-
description
:Messages received
消息处理程序上的操作计量具有以下名称或标签:Timer
-
name
:spring.integration.send
-
tag
:type:handler
-
tag
:name:<componentName>
-
tag
:result:(success|failure)
-
tag
:exception:(none|exception simple class name)
-
description
:Send processing time
消息源的计量器具有以下名称/标签:Counter
-
name
:spring.integration.receive
-
tag
:type:source
-
tag
:name:<componentName>
-
tag
:result:success
-
tag
:exception:none
-
description
:Messages received
此外,还有三个 Meter:Gauge
-
spring.integration.channels
:应用程序中的 数量。MessageChannels
-
spring.integration.handlers
:应用程序中的 数量。MessageHandlers
-
spring.integration.sources
:应用程序中的 数量。MessageSources
通过提供 的子类,可以自定义由集成组件创建的名称和标记。
MicrometerCustomMetricsTests 测试用例显示了如何执行此操作的简单示例。
您还可以通过重载 builder 子类上的方法来进一步自定义计量器。Meters
MicrometerMetricsCaptor
build()
从版本 5.1.13 开始,公开了队列大小和剩余容量的 Micrometer 仪表:QueueChannel
-
name
:spring.integration.channel.queue.size
-
tag
:type:channel
-
tag
:name:<componentName>
-
description
:The size of the queue channel
和
-
name
:spring.integration.channel.queue.remaining.capacity
-
tag
:type:channel
-
tag
:name:<componentName>
-
description
:The remaining capacity of the queue channel
禁用仪表
默认情况下,所有计量在首次使用时都会注册。
现在,使用 Micrometer,您可以向 中添加 s 以防止部分或全部被注册。
您可以按提供的任何属性、 、 等过滤 (拒绝) 仪表。
有关更多信息,请参阅 Micrometer 文档中的 Meter Filters。MeterFilter
MeterRegistry
name
tag
例如,给定:
@Bean
public QueueChannel noMeters() {
return new QueueChannel(10);
}
您可以通过以下方式禁止此通道的计量注册:
registry.config().meterFilter(MeterFilter.deny(id ->
"channel".equals(id.getTag("type")) &&
"noMeters".equals(id.getTag("name"))));
千分尺观察
从版本 6.0 开始, Spring 集成利用了千分尺观察抽象,它可以通过适当的配置来处理度量和跟踪。ObservationHandler
只要应用程序上下文中存在 bean 并且配置了 ,就会在组件上启用观察处理。
要自定义应检测的组件集,需要在 Comments 上公开一个属性。
有关模式匹配算法,请参阅其 javadocs。IntegrationManagement
ObservationRegistry
@EnableIntegrationManagement
observationPatterns()
@EnableIntegrationManagement
默认情况下,没有组件使用 Bean 进行检测。
可以配置为匹配所有组件。IntegrationManagement ObservationRegistry * |
在这种情况下,计量不是独立收集的,而是委托给提供的 .ObservationHandler
ObservationRegistry
以下 Spring 集成组件使用 observe logic 进行检测,每个组件都有各自的约定:
-
MessageProducerSupport
作为流的入站终端节点,被视为 span 类型并使用 API;CONSUMER
IntegrationObservation.HANDLER
-
MessagingGatewaySupport' 是入站请求-回复终端节点,被视为 span 类型。 它使用 API;
SERVER
IntegrationObservation.GATEWAY
-
操作是它生成消息的唯一 Spring 集成 API。 因此,它被视为 span 类型并使用 API。 当通道是分布式实现时(例如 或 ),并且必须将跟踪信息添加到消息中。 因此,观察是基于 Spring Integration 提供的地方,以允许后续跟踪添加 Headers,以便它们可供消费者使用;
AbstractMessageChannel.send()
PRODUCER
IntegrationObservation.PRODCUER
PublishSubscribeKafkaChannel
ZeroMqChannel
IntegrationObservation.PRODUCER
MessageSenderContext
MutableMessage
Propagator
-
An 是 span 类型,使用 API。
AbstractMessageHandler
CONSUMER
IntegrationObservation.HANDLER
组件的观察生产可以通过配置进行定制。
例如,an 期望通过其 API.IntegrationManagement
ObservationConvention
AbstractMessageHandler
MessageReceiverObservationConvention
setObservationConvention()
以下是观察 API 支持的指标、跨度和约定:
可观测性 - 指标
您可以在下面找到此项目声明的所有指标的列表。
网关
对入站消息网关的观察。
度量名称(由 convention class 定义)。键入 。spring.integration.gateway
o.s.i.support.management.observation.DefaultMessageRequestReplyReceiverObservationConvention
timer
度量名称(由 convention class 定义)。键入 。spring.integration.gateway.active
o.s.i.support.management.observation.DefaultMessageRequestReplyReceiverObservationConvention
long task timer
*.active 指标中可能缺少在开始观察后添加的 KeyValue。 |
Micrometer 内部用于 baseunit。但是,每个后端都决定了实际的 baseunit。(即 Prometheus 使用秒)nanoseconds |
封闭类 的完全限定名称 。o.s.i.support.management.observation.IntegrationObservation
所有标签都必须以 prefix 为前缀!spring.integration. |
名字 |
描述 |
|
消息网关组件的名称。 |
|
请求/回复执行的结果。 |
|
组件类型 - 'gateway'。 |
处理器
消息处理程序的观察。
度量名称(由 convention class 定义)。键入 。spring.integration.handler
o.s.i.support.management.observation.DefaultMessageReceiverObservationConvention
timer
度量名称(由 convention class 定义)。键入 。spring.integration.handler.active
o.s.i.support.management.observation.DefaultMessageReceiverObservationConvention
long task timer
*.active 指标中可能缺少在开始观察后添加的 KeyValue。 |
Micrometer 内部用于 baseunit。但是,每个后端都决定了实际的 baseunit。(即 Prometheus 使用秒)nanoseconds |
封闭类 的完全限定名称 。o.s.i.support.management.observation.IntegrationObservation
所有标签都必须以 prefix 为前缀!spring.integration. |
名字 |
描述 |
|
消息处理程序组件的名称。 |
|
组件的类型 - 'handler'。 |
制作人
对消息生产者(例如通道)的观察。
度量名称(由 convention class 定义)。键入 。spring.integration.producer
o.s.i.support.management.observation.DefaultMessageSenderObservationConvention
timer
度量名称(由 convention class 定义)。键入 。spring.integration.producer.active
o.s.i.support.management.observation.DefaultMessageSenderObservationConvention
long task timer
*.active 指标中可能缺少在开始观察后添加的 KeyValue。 |
Micrometer 内部用于 baseunit。但是,每个后端都决定了实际的 baseunit。(即 Prometheus 使用秒)nanoseconds |
封闭类 的完全限定名称 。o.s.i.support.management.observation.IntegrationObservation
所有标签都必须以 prefix 为前缀!spring.integration. |
名字 |
描述 |
|
消息处理程序组件的名称。 |
|
组件的类型 - 'producer'。 |
可观测性 - Span
您可以在下面找到此项目声明的所有 span 的列表。
网关跨度
对入站消息网关的观察。
Span name (由 convention class 定义)。spring.integration.gateway
o.s.i.support.management.observation.DefaultMessageRequestReplyReceiverObservationConvention
封闭类 的完全限定名称 。o.s.i.support.management.observation.IntegrationObservation
所有标签都必须以 prefix 为前缀!spring.integration. |
名字 |
描述 |
|
消息网关组件的名称。 |
|
请求/回复执行的结果。 |
|
组件类型 - 'gateway'。 |
处理程序 Span
消息处理程序的观察。
Span name (由 convention class 定义)。spring.integration.handler
o.s.i.support.management.observation.DefaultMessageReceiverObservationConvention
封闭类 的完全限定名称 。o.s.i.support.management.observation.IntegrationObservation
所有标签都必须以 prefix 为前缀!spring.integration. |
名字 |
描述 |
|
消息处理程序组件的名称。 |
|
组件的类型 - 'handler'。 |
生产者跨度
对消息生产者(例如通道)的观察。
Span name (由 convention class 定义)。spring.integration.producer
o.s.i.support.management.observation.DefaultMessageSenderObservationConvention
封闭类 的完全限定名称 。o.s.i.support.management.observation.IntegrationObservation
所有标签都必须以 prefix 为前缀!spring.integration. |
名字 |
描述 |
|
消息处理程序组件的名称。 |
|
组件的类型 - 'producer'。 |
可观测性 - 约定
您可以在下面找到该项目声明的所有列表。GlobalObservationConvention
ObservationConvention
ObservationConvention 类名 |
适用的 ObservationContext 类名 |
|
|
|
|
|
|
|
|
|
|
|
|
观测传播
为了在一个跟踪中提供连接的 span 链,独立于消息传递流的性质, Spring 集成提供了一个实现。
这可以在 bean 上单独配置,也可以作为具有相应 bean 名称模式匹配的 bean 进行配置。
这个拦截器的目标是将一个从 producer 线程传播到消费者线程,而与实现和性质无关。
但是,A 会被忽略,因为它的使用者直接在 producer 线程上执行。ObservationPropagationChannelInterceptor
MessageChannnel
@GlobalChannelInterceptor
MessageChannnel
Observation
MessageChannnel
DirectChannel
Spring 集成 JMX 支持
另请参阅 JMX 支持。
消息历史记录
消息传递体系结构的主要优点是松散耦合,因此参与的组件不会保持对彼此的任何感知。 仅这一事实就使应用程序非常灵活,允许您在不影响流的其余部分的情况下更改组件、更改消息传递路由、更改消息使用样式(轮询与事件驱动)等等。 然而,当出现问题时,这种不起眼的架构风格可能会很困难。 在调试时,您可能希望获得尽可能多的有关消息的信息 (其来源、它遍历的通道和其他详细信息)。
Message History 是一种有用的模式,它为您提供了一个选项来保持对消息路径的某种程度的感知,用于调试目的或维护审计跟踪。 Spring 集成提供了一种简单的方法来配置消息流以维护消息历史记录,方法是向消息添加 Headers 并在每次消息通过跟踪组件时更新该 Header。
消息历史记录配置
要启用消息历史记录,您只需在配置中定义元素 (或 ),如以下示例所示:message-history
@EnableMessageHistory
@Configuration
@EnableIntegration
@EnableMessageHistory
<int:message-history/>
现在,每个命名组件(定义了 'id')都会被跟踪。
框架会在您的消息中设置 'history' 标头。
其值 a .List<Properties>
请考虑以下配置示例:
@MessagingGateway(defaultRequestChannel = "bridgeInChannel")
public interface SampleGateway {
...
}
@Bean
@Transformer(inputChannel = "enricherChannel", outputChannel="filterChannel")
HeaderEnricher sampleEnricher() {
HeaderEnricher enricher =
new HeaderEnricher(Collections.singletonMap("baz", new StaticHeaderValueMessageProcessor("baz")));
return enricher;
}
<int:gateway id="sampleGateway"
service-interface="org.springframework.integration.history.sample.SampleGateway"
default-request-channel="bridgeInChannel"/>
<int:header-enricher id="sampleEnricher" input-channel="enricherChannel" output-channel="filterChannel">
<int:header name="baz" value="baz"/>
</int:header-enricher>
前面的配置生成一个简单的消息历史记录结构,其输出类似于以下内容:
[{name=sampleGateway, type=gateway, timestamp=1283281668091},
{name=sampleEnricher, type=header-enricher, timestamp=1283281668094}]
要访问消息历史记录,您只需访问标头。
以下示例显示了如何执行此操作:MessageHistory
Iterator<Properties> historyIterator =
message.getHeaders().get(MessageHistory.HEADER_NAME, MessageHistory.class).iterator();
assertTrue(historyIterator.hasNext());
Properties gatewayHistory = historyIterator.next();
assertEquals("sampleGateway", gatewayHistory.get("name"));
assertTrue(historyIterator.hasNext());
Properties chainHistory = historyIterator.next();
assertEquals("sampleChain", chainHistory.get("name"));
您可能不想跟踪所有组件。
要根据组件的名称将历史记录限制为某些组件,您可以提供该属性并指定与要跟踪的组件匹配的组件名称和模式的逗号分隔列表。
以下示例显示了如何执行此操作:tracked-components
@Configuration
@EnableIntegration
@EnableMessageHistory("*Gateway", "sample*", "aName")
<int:message-history tracked-components="*Gateway, sample*, aName"/>
在前面的示例中,仅维护以“Gateway”结尾、以“sample”开头或与名称“aName”完全匹配的组件的消息历史记录。
此外,Bean 现在由 (请参阅 MBean Exporter) 作为 JMX MBean 公开,允许您在运行时更改模式。
但是请注意,必须停止 Bean (关闭消息历史记录) 才能更改模式。
此功能对于临时打开历史记录以分析系统可能很有用。
MBean 的对象名称为 。MessageHistoryConfigurer
IntegrationMBeanExporter
<domain>:name=messageHistoryConfigurer,type=MessageHistoryConfigurer
在应用程序上下文中,只有一个 (或 ) 必须声明为组件跟踪配置的单一源。
不要对 .@EnableMessageHistory <message-history/> MessageHistoryConfigurer |
根据定义,消息历史记录标头是不可变的(您不能重写历史记录)。 因此,在写入消息历史记录值时,组件要么创建新消息(当组件是源时),要么从请求消息中复制历史记录,对其进行修改并在回复消息上设置新列表。 在任一情况下,即使消息本身跨越线程边界,也可以追加这些值。 这意味着 history 值可以极大地简化异步消息流中的调试。 |
消息存储
Enterprise Integration Patterns (EIP) 一书确定了几种能够缓冲消息的模式。
例如,聚合器会缓冲消息,直到它们可以被释放,而 会缓冲消息,直到使用者明确地从该通道接收这些消息。
由于消息流中的任何点都可能发生故障,因此缓冲消息的 EIP 组件也会引入消息可能丢失的点。QueueChannel
为了降低丢失消息的风险,EIP 定义了消息存储模式,该模式允许 EIP 组件存储消息,通常存储在某种类型的持久性存储(例如 RDBMS)中。
Spring 集成通过以下方式为消息存储模式提供支持:
-
定义策略接口
org.springframework.integration.store.MessageStore
-
提供此接口的多种实现
-
在具有缓冲消息功能的所有组件上公开属性,以便您可以注入实现该接口的任何实例。
message-store
MessageStore
手册中详细介绍了如何配置特定的消息存储实现以及如何将实现注入到特定的缓冲组件中(请参阅特定组件,例如 QueueChannel、Aggregator、Delayer 等)。
以下一对示例显示了如何为 和 聚合器添加对消息存储的引用:MessageStore
QueueChannel
<int:channel id="myQueueChannel">
<int:queue message-store="refToMessageStore"/>
<int:channel>
<int:aggregator … message-store="refToMessageStore"/>
默认情况下,消息使用 、 的实现存储在内存中。
这对于开发或简单的低容量环境来说可能很好,在这些环境中,非持久性消息的潜在丢失并不是一个问题。
但是,典型的生产应用程序需要一个更健壮的选项,这不仅是为了降低消息丢失的风险,也是为了避免潜在的内存不足错误。
因此,我们还为各种数据存储提供了实现。
以下是支持的实施的完整列表:o.s.i.store.SimpleMessageStore
MessageStore
MessageStore
-
Hazelcast 消息存储:使用 Hazelcast 分布式缓存来存储消息
-
JDBC 消息存储:使用 RDBMS 存储消息
-
Redis 消息存储:使用 Redis 键/值数据存储来存储消息
-
MongoDB 消息存储:使用 MongoDB 文档存储来存储消息
但是,在使用 . Message 数据(payload 和 headers)使用不同的序列化策略进行序列化和反序列化,具体取决于 .
例如,使用 时,默认情况下仅保留数据。
在这种情况下,在序列化发生之前会删除 non-Serializable headers。
此外,请注意传输适配器(如 FTP、HTTP、JMS 等)注入的特定于协议的标头。
例如,将 HTTP 标头映射到消息标头,其中一个是不可序列化的实例。
但是,您可以将自己的 和 strategy 接口的实现注入到某些实现(例如 )中,以更改序列化和反序列化的行为。 请特别注意表示某些类型数据的标头。
例如,如果其中一个 Headers 包含某个 Spring bean 的实例,则在反序列化时,您最终可能会得到该 Bean 的不同实例,这直接影响框架创建的某些隐式 Headers(例如 or )。
目前,它们是不可序列化的,但是,即使它们是可序列化的,反序列化的通道也不会表示预期的实例。 从 Spring 集成版本 3.0 开始,您可以通过配置标头丰富器来解决此问题,以便在向 . 此外,请考虑按如下方式配置 message-flow 时会发生什么:gateway → queue-channel(由持久 Message Store 支持)→ service-activator。
该网关会创建一个临时回复通道,当服务激活器的 Poller 从队列中读取时,该通道将丢失。
同样,您可以使用 header enricher 将 Headers 替换为表示形式。 有关更多信息,请参阅 Header Enricher。 |
Spring Integration 4.0 引入了两个新接口:
-
ChannelMessageStore
:实现特定于实例的操作QueueChannel
-
PriorityCapableChannelMessageStore
:标记要用于实例的实现,并为持久化消息提供优先级顺序。MessageStore
PriorityChannel
实际行为取决于实现。
该框架提供了以下实现,这些实现可用作 和 的持久性 :MessageStore
QueueChannel
PriorityChannel
注意事项
SimpleMessageStore 从版本 4.1 开始,在调用 .
对于大型消息组,这是一个严重的性能问题。
4.0.1 引入了一个 boolean 属性,可让您控制此行为。
当聚合器在内部使用时,此属性设置为以提高性能。
现在是默认的。 现在,在组件(如聚合器)之外访问组存储的用户可以直接引用聚合器正在使用的组,而不是副本。 在聚合器外部操纵组可能会导致不可预知的结果。 因此,您不应执行此类操作或将属性设置为 。 |
用MessageGroupFactory
从版本 4.3 开始,一些实现可以注入自定义策略来创建和自定义 .
这默认为 a ,它基于 () 内部集合生成实例。
其他可能的选项包括 和 ,其中最后一个选项可用于恢复之前的行为。
此外,该选项也可用。
有关更多信息,请参阅下一节。
从版本 5.0.1 开始,当组中消息的顺序和唯一性无关紧要时,该选项也可用。MessageGroupStore
MessageGroupFactory
MessageGroup
MessageGroupStore
SimpleMessageGroupFactory
SimpleMessageGroup
GroupType.HASH_SET
LinkedHashSet
SYNCHRONISED_SET
BLOCKING_QUEUE
SimpleMessageGroup
PERSISTENT
LIST
持久加载和延迟加载MessageGroupStore
从版本 4.3 开始,所有持久性实例都以延迟加载方式从存储中检索实例及其实例。
在大多数情况下,它对于关联实例(请参阅 Aggregator 和 Resequencer)很有用,此时它会增加在每个关联操作上从存储加载整个的开销。MessageGroupStore
MessageGroup
messages
MessageHandler
MessageGroup
您可以使用该选项从配置中关闭延迟加载行为。AbstractMessageGroupStore.setLazyLoadMessageGroups(false)
我们对 MongoDB (MongoDB Message Store) 和 (Aggregator) 上的延迟加载性能测试使用类似于以下内容的自定义:MessageStore
<aggregator>
release-strategy
<int:aggregator input-channel="inputChannel"
output-channel="outputChannel"
message-store="mongoStore"
release-strategy-expression="size() == 1000"/>
对于 1000 条简单消息,它会生成类似于以下内容的结果:
...
StopWatch 'Lazy-Load Performance': running time (millis) = 38918
-----------------------------------------
ms % Task name
-----------------------------------------
02652 007% Lazy-Load
36266 093% Eager
...
但是,从版本 5.5 开始,所有持久化实现都提供基于目标数据库流式处理 API 的协定。
当 store 中的组非常大时,这可以提高资源利用率。
在框架内部,当 Delayer 在启动时重新安排持久消息时,将使用此新 API(例如)。
返回的 A 必须在处理结束时关闭,例如通过 .
每当使用 a 时,它都会委托给 .MessageGroupStore
streamMessagesForGroup(Object groupId)
Stream<Message<?>>
try-with-resources
PersistentMessageGroup
streamMessages()
MessageGroupStore.streamMessagesForGroup()
消息组条件
从版本 5.5 开始,抽象提供了一个 string 选项。
此选项的值可以是以后可以出于任何原因解析以为组做出决策的任何内容。
例如,来自关联消息处理程序的 a 可以从组中查阅此属性,而不是迭代组中的所有消息。
这暴露了一个 API。
为此,已向 .
在将每条消息添加到组后,将根据每条消息以及组的现有条件评估此函数。
实现可以决定返回新值、现有值或将目标条件重置为 。
的值可以是 JSON、SPEL 表达式、数字或任何可以序列化为字符串并在之后解析的东西。
例如,从 File Aggregator 组件,将条件从消息的标头填充到组中,并通过将组大小与此条件中的值进行比较来咨询该条件。
这样,它不会迭代组中的所有消息来查找带有 header 的消息。
它还允许结束标记在所有其他记录之前到达聚合器;例如,在多线程环境中处理文件时。MessageGroup
condition
ReleaseStrategy
MessageGroupStore
setGroupCondition(Object groupId, String condition)
setGroupConditionSupplier(BiFunction<Message<?>, String, String>)
AbstractCorrelatingMessageHandler
null
condition
FileMarkerReleaseStrategy
FileHeaders.LINE_COUNT
FileSplitter.FileMarker.Mark.END
canRelease()
FileSplitter.FileMarker.Mark.END
FileHeaders.LINE_COUNT
此外,为了便于配置,还引入了一个 Contract。
检查提供的是否实现了此接口,并提取了 for group 条件评估逻辑。GroupConditionProvider
AbstractCorrelatingMessageHandler
ReleaseStrategy
conditionSupplier
元数据存储
许多外部系统、服务或资源不是事务性的(Twitter、RSS、文件系统等),并且无法将数据标记为已读。
此外,有时,您可能需要在某些集成解决方案中实现 Enterprise Integration Pattern 幂等接收器。
为了实现这个目标并在与外部系统的下一次交互之前存储端点的一些先前状态或处理下一条消息, Spring 集成提供了元数据存储组件作为具有通用键值契约的接口的实现。org.springframework.integration.metadata.MetadataStore
元数据存储旨在存储各种类型的通用元数据(例如,已处理的最后一个源条目的发布日期),以帮助源适配器等组件处理重复项。
如果未直接向组件提供对 a 的引用,则查找元数据存储的算法如下:首先,在应用程序上下文中查找具有 ID 的 bean。
如果找到,请使用它。
否则,请创建一个新实例 ,该实例是一个内存中实现,它仅在当前正在运行的应用程序上下文的生命周期内保留元数据。
这意味着,在重新启动时,您最终可能会得到重复的条目。MetadataStore
metadataStore
SimpleMetadataStore
如果需要在应用程序上下文重新启动之间保留元数据,框架将提供以下持久性:MetadataStores
-
PropertiesPersistingMetadataStore
它由属性文件和 PropertiesPersister
提供支持。PropertiesPersistingMetadataStore
默认情况下,它仅在应用程序上下文正常关闭时保留状态。
它实现后,您可以通过调用 .
以下示例显示了如何使用 XML 配置 'PropertiesPersistingMetadataStore':Flushable
flush()
<bean id="metadataStore"
class="org.springframework.integration.metadata.PropertiesPersistingMetadataStore"/>
或者,您可以提供自己的接口实现(例如,),并在应用程序上下文中将其配置为 bean。MetadataStore
JdbcMetadataStore
从版本 4.0 开始,、 和 implement .
这些实例提供原子更新,并且可以跨多个组件或应用程序实例使用。SimpleMetadataStore
PropertiesPersistingMetadataStore
RedisMetadataStore
ConcurrentMetadataStore
幂等接收器和元数据存储
当需要筛选传入消息(如果已处理)时,元数据存储对于实施 EIP 幂等接收器模式非常有用,您可以丢弃该消息或在丢弃时执行一些其他逻辑。 以下配置显示了如何执行此操作的示例:
<int:filter input-channel="serviceChannel"
output-channel="idempotentServiceChannel"
discard-channel="discardChannel"
expression="@metadataStore.get(headers.businessKey) == null"/>
<int:publish-subscribe-channel id="idempotentServiceChannel"/>
<int:outbound-channel-adapter channel="idempotentServiceChannel"
expression="@metadataStore.put(headers.businessKey, '')"/>
<int:service-activator input-channel="idempotentServiceChannel" ref="service"/>
幂等条目的 可以是过期日期,在此日期之后,该条目应由某个预定的收割者从元数据存储中删除。value
MetadataStoreListener
一些元数据存储(目前只有 zookeeper)支持注册侦听器以在项目更改时接收事件,如下例所示:
public interface MetadataStoreListener {
void onAdd(String key, String value);
void onRemove(String key, String oldValue);
void onUpdate(String key, String newValue);
}
有关更多信息,请参阅 Javadoc。
如果您只对事件的子集感兴趣,则可以将其子类化。MetadataStoreListenerAdapter
控制总线
正如 Enterprise Integration Patterns (EIP) 一书中描述的,控制总线背后的思想是,可以使用与用于 “应用程序级” 消息传递相同的消息传递系统来监控和管理框架内的组件。 在 Spring 集成中,我们构建在上述适配器的基础上,以便您可以发送消息作为调用公开操作的一种方式。
以下示例说明如何使用 XML 配置控制总线:
<int:control-bus input-channel="operationChannel"/>
控制总线有一个 Importing 通道,可以访问该通道以调用应用程序上下文中的 bean 上的操作。 它还具有服务激活终端节点的所有通用属性。 例如,如果操作结果具有要发送到下游通道的返回值,则可以指定输出通道。
控制总线将 input 通道上的消息作为 Spring 表达式语言 (SpEL) 表达式运行。
它接受一条消息,将 body 编译为表达式,添加一些上下文,然后运行它。
默认上下文支持已使用 或 注释的任何方法。
它还支持 Spring 接口上的方法(以及自 5.2 版以来的扩展),并且它支持用于配置多个 Spring 和实现的方法。
确保你自己的方法对 control 总线可用的最简单方法是使用 or 注解。
由于这些注解也用于向 JMX MBean 注册表公开方法,因此它们提供了一个方便的副产品:通常,您希望向控制总线公开的相同类型的操作对于通过 JMX 公开是合理的。
应用程序上下文中任何特定实例的解析都是在典型的 SPEL 语法中实现的。
为此,请为 bean name 提供 bean 的 SPEL 前缀 () 。
例如,要在 Spring Bean 上执行方法,客户端可以向操作通道发送消息,如下所示:@ManagedAttribute
@ManagedOperation
Lifecycle
Pausable
TaskExecutor
TaskScheduler
@ManagedAttribute
@ManagedOperation
@
Message operation = MessageBuilder.withPayload("@myServiceBean.shutdown()").build();
operationChannel.send(operation)
表达式上下文的根是 itself,因此您还可以访问表达式中的 the 和 as 变量。
这与 Spring 集成端点中的所有其他表达式支持一致。Message
payload
headers
使用 Java 注释,您可以按如下方式配置控制总线:
@Bean
@ServiceActivator(inputChannel = "operationChannel")
public ExpressionControlBusFactoryBean controlBus() {
return new ExpressionControlBusFactoryBean();
}
同样,您可以按如下方式配置 Java DSL 流定义:
@Bean
public IntegrationFlow controlBusFlow() {
return IntegrationFlow.from("controlBus")
.controlBus()
.get();
}
如果您更喜欢将 lambda 与自动创建结合使用,则可以按如下方式创建控制总线:DirectChannel
@Bean
public IntegrationFlow controlBus() {
return IntegrationFlowDefinition::controlBus;
}
在本例中,通道名为 。controlBus.input
有序关机
如“MBean 导出器”中所述,MBean 导出器提供了一个名为 的 JMX 操作,该操作用于有序地停止应用程序。
该操作只有一个参数。
该参数指示操作等待多长时间(以毫秒为单位)以允许正在进行的消息完成。
该操作的工作原理如下:stopActiveComponents
Long
-
调用所有实现 .
beforeShutdown()
OrderlyShutdownCapable
这样做可以让此类组件为关闭做好准备。 实现此接口的组件示例以及它们对此调用执行的操作包括:停止其侦听器容器的 JMS 和 AMQP 消息驱动适配器、停止接受新连接(同时保持现有连接打开)的 TCP 服务器连接工厂、丢弃(记录)收到的任何新消息的 TCP 入站终端节点,以及为任何新请求返回的 HTTP 入站终端节点。
503 - Service Unavailable
-
停止任何活动通道,例如 JMS 或 AMQP 支持的通道。
-
停止所有实例。
MessageSource
-
停止所有入站 s(不是 )。
MessageProducer
OrderlyShutdownCapable
-
等待剩余的任何时间,如传递给操作的参数值所定义。
Long
这样做可以让任何正在进行的消息完成其旅程。 因此,在调用此操作时,选择适当的超时非常重要。
-
调用所有组件。
afterShutdown()
OrderlyShutdownCapable
这样做可以让这些组件执行最终的关闭任务(例如,关闭所有打开的套接字)。
如 Orderly Shutdown Managed Operation 中所述,可以使用 JMX 调用此操作。
如果您希望以编程方式调用该方法,则需要注入或以其他方式获取对 .
如果定义上未提供任何属性,则 Bean 具有生成的名称。
如果同一个 JVM() 中存在多个 Spring 集成上下文,则此名称包含一个随机组件,以避免冲突。IntegrationMBeanExporter
id
<int-jmx:mbean-export/>
ObjectName
MBeanServer
因此,如果您希望以编程方式调用该方法,我们建议您为 exporter 提供一个属性,以便您可以在应用程序上下文中轻松访问它。id
最后,可以使用 element 调用该操作。
有关详细信息,请参阅监视 Spring 集成示例应用程序。<control-bus>
前面描述的算法在版本 4.1 中得到了改进。
以前,所有任务执行程序和计划程序都已停止。
这可能会导致实例中的中间流消息保留。
现在关闭使 poller 保持运行状态,以便排空和处理这些消息。QueueChannel |
集成图
从版本 4.3 开始, Spring 集成提供了对应用程序的运行时对象模型的访问,该模型可以选择包括组件度量。
它以图表的形式公开,可用于可视化集成应用程序的当前状态。
该包包含收集、构建和呈现 Spring 集成组件的运行时状态所需的所有类,作为单个树状对象。
应该将 声明为 bean 来构建、检索和刷新对象。
生成的对象可以序列化为任何格式,尽管 JSON 在客户端解析和表示起来很灵活和方便。
仅具有默认组件的 Spring 集成应用程序将公开一个图形,如下所示:o.s.i.support.management.graph
Graph
IntegrationGraphServer
Graph
Graph
{
"contentDescriptor" : {
"providerVersion" : "6.0.9",
"providerFormatVersion" : 1.2,
"provider" : "spring-integration",
"name" : "myAppName:1.0"
},
"nodes" : [ {
"nodeId" : 1,
"componentType" : "null-channel",
"integrationPatternType" : "null_channel",
"integrationPatternCategory" : "messaging_channel",
"properties" : { },
"sendTimers" : {
"successes" : {
"count" : 1,
"mean" : 0.0,
"max" : 0.0
},
"failures" : {
"count" : 0,
"mean" : 0.0,
"max" : 0.0
}
},
"receiveCounters" : {
"successes" : 0,
"failures" : 0
},
"name" : "nullChannel"
}, {
"nodeId" : 2,
"componentType" : "publish-subscribe-channel",
"integrationPatternType" : "publish_subscribe_channel",
"integrationPatternCategory" : "messaging_channel",
"properties" : { },
"sendTimers" : {
"successes" : {
"count" : 1,
"mean" : 7.807002,
"max" : 7.807002
},
"failures" : {
"count" : 0,
"mean" : 0.0,
"max" : 0.0
}
},
"name" : "errorChannel"
}, {
"nodeId" : 3,
"componentType" : "logging-channel-adapter",
"integrationPatternType" : "outbound_channel_adapter",
"integrationPatternCategory" : "messaging_endpoint",
"properties" : { },
"output" : null,
"input" : "errorChannel",
"sendTimers" : {
"successes" : {
"count" : 1,
"mean" : 6.742722,
"max" : 6.742722
},
"failures" : {
"count" : 0,
"mean" : 0.0,
"max" : 0.0
}
},
"name" : "errorLogger"
} ],
"links" : [ {
"from" : 2,
"to" : 3,
"type" : "input"
} ]
}
版本 5.2 弃用了旧指标,转而使用千分尺,如 Metrics Management 中所述。 旧版指标已在版本 5.4 中删除,将不再显示在图表中。 |
在前面的示例中,图形由三个顶级元素组成。
graph 元素包含有关提供数据的应用程序的常规信息。
可以在 Bean 上或在应用程序上下文环境属性中进行自定义。
框架提供了其他属性,并允许您将类似模型与其他源区分开来。contentDescriptor
name
IntegrationGraphServer
spring.application.name
graph 元素表示 graph 元素中节点之间的连接,因此,也表示源 Spring Integration 应用程序中的集成组件之间的连接。
例如,从 a 到 an with some 或从 an 到 a 。
为了方便起见并让您确定链接的用途,该模型包括 the attribute。
可能的类型包括:links
nodes
MessageChannel
EventDrivenConsumer
MessageHandler
AbstractReplyProducingMessageHandler
MessageChannel
type
-
input
:标识从端点、 或属性的方向MessageChannel
inputChannel
requestChannel
-
output
:从 、 或通过 或 属性到MessageHandler
MessageProducer
SourcePollingChannelAdapter
MessageChannel
outputChannel
replyChannel
-
error
: 从 on 或 to the through an property;MessageHandler
PollingConsumer
MessageProducer
SourcePollingChannelAdapter
MessageChannel
errorChannel
-
discard
:从(例如 )到通过属性。DiscardingMessageHandler
MessageFilter
MessageChannel
errorChannel
-
route
:从 (例如 ) 到 . 类似于但在运行时确定。 可能是配置的 channel mapping 或动态解析的 channel。 为此,路由器通常最多只保留 100 个动态路由,但您可以通过设置该属性来修改此值。AbstractMappingMessageRouter
HeaderValueRouter
MessageChannel
output
dynamicChannelLimit
可视化工具可以使用此元素中的信息来呈现 graph 元素中节点之间的连接,其中 和 数字表示链接节点属性中的值。
例如,该元素可用于确定目标节点上的适当值。nodes
from
to
nodeId
link
port
下面的 “text image” 显示了类型之间的关系:
+---(discard) | +----o----+ | | | | | | (input)--o o---(output) | | | | | | +----o----+ | +---(error)
graph 元素可能是最有趣的,因为它的元素不仅包含运行时组件及其实例和值,还可以选择包含组件公开的度量。
Node 元素包含各种属性,这些属性通常一目了然。
例如,基于表达式的组件包括包含组件的主表达式字符串的属性。
要启用指标,请将 添加到 类 或将元素添加到 XML 配置中。
有关完整信息,请参阅 Metrics and Management。nodes
componentType
name
expression
@EnableIntegrationManagement
@Configuration
<int:management/>
这表示一个唯一的增量标识符,以便您区分一个组件和另一个组件。
它还在元素中用于表示此组件与其他组件的关系(连接)(如果有)。
的 and 属性用于 、 、 或 的 和 属性。
有关更多信息,请参阅下一节。nodeId
links
input
output
inputChannel
outputChannel
AbstractEndpoint
MessageHandler
SourcePollingChannelAdapter
MessageProducerSupport
从版本 5.1 开始, 接受 for for 特定 .
例如,您可以将 and 属性公开到目标图中:IntegrationGraphServer
Function<NamedComponent, Map<String, Object>> additionalPropertiesCallback
IntegrationNode
NamedComponent
SmartLifecycle
autoStartup
running
server.setAdditionalPropertiesCallback(namedComponent -> {
Map<String, Object> properties = null;
if (namedComponent instanceof SmartLifecycle) {
SmartLifecycle smartLifecycle = (SmartLifecycle) namedComponent;
properties = new HashMap<>();
properties.put("auto-startup", smartLifecycle.isAutoStartup());
properties.put("running", smartLifecycle.isRunning());
}
return properties;
});
Graph 运行时模型
Spring 集成组件具有不同的复杂程度。
例如,任何轮询的 a 和 a 都会定期从源数据向其发送消息。
其他组件可能是中间件请求-回复组件(例如 ),其中 consume 用于订阅(或轮询)() 以获取消息,以及 () 用于生成回复消息以发送到下游。
同时,任何实现(例如 )都会包装一些源协议侦听逻辑,并将消息发送到 .MessageSource
SourcePollingChannelAdapter
MessageChannel
JmsOutboundGateway
AbstractEndpoint
requestChannel
input
replyChannel
output
MessageProducerSupport
ApplicationEventListeningMessageProducer
outputChannel
在图中, Spring 集成组件通过使用类层次结构来表示,您可以在包中找到该层次结构。
例如,您可以将 用于 (因为它有一个选项),并且在使用 时可能会产生错误。
另一个示例是 — for a 当使用 .IntegrationNode
o.s.i.support.management.graph
ErrorCapableDiscardingMessageHandlerNode
AggregatingMessageHandler
discardChannel
PollableChannel
PollingConsumer
CompositeMessageHandlerNode
MessageHandlerChain
SubscribableChannel
EventDrivenConsumer
(请参见消息网关)为其每个方法提供节点,其中属性基于网关的 Bean 名称和短方法签名。
请考虑以下网关示例:@MessagingGateway name |
@MessagingGateway(defaultRequestChannel = "four")
public interface Gate {
void foo(String foo);
void foo(Integer foo);
void bar(String bar);
}
前面的网关生成类似于以下内容的节点:
{
"nodeId" : 10,
"name" : "gate.bar(class java.lang.String)",
"stats" : null,
"componentType" : "gateway",
"integrationPatternType" : "gateway",
"integrationPatternCategory" : "messaging_endpoint",
"output" : "four",
"errors" : null
},
{
"nodeId" : 11,
"name" : "gate.foo(class java.lang.String)",
"stats" : null,
"componentType" : "gateway",
"integrationPatternType" : "gateway",
"integrationPatternCategory" : "messaging_endpoint",
"output" : "four",
"errors" : null
},
{
"nodeId" : 12,
"name" : "gate.foo(class java.lang.Integer)",
"stats" : null,
"componentType" : "gateway",
"integrationPatternType" : "gateway",
"integrationPatternCategory" : "messaging_endpoint",
"output" : "four",
"errors" : null
}
你可以使用这个层次结构来解析 Client 端的图形模型,以及了解一般的 Spring Integration 运行时行为。
有关更多信息,另请参阅编程提示和技巧。IntegrationNode
版本 5.3 引入了一个抽象,所有代表企业集成模式 (EIP) 的开箱即用组件都实现了这个抽象并提供一个枚举值。
此信息对于目标应用程序中的某些分类逻辑非常有用,或者,如果公开到图形节点中,则 UI 可以使用此信息来确定如何绘制组件。IntegrationPattern
IntegrationPatternType
集成图控制器
如果你的应用程序是基于 Web 的(或者构建在 Spring Boot 之上,带有嵌入式 Web 容器),并且 Spring 集成 HTTP 或 WebFlux 模块(分别参见 HTTP 支持和 WebFlux 支持)存在于 Classpath 上,则可以使用a将功能公开为 REST 服务。
为此,HTTP 模块中提供了 and 类注释和 XML 元素。
与 annotation (或 XML definitions) 一起,此配置注册了一个可以在 annotation 或 element 上配置 its 的位置。
默认路径为 .IntegrationGraphController
IntegrationGraphServer
@EnableIntegrationGraphController
@Configuration
<int-http:graph-controller/>
@EnableWebMvc
<mvc:annotation-driven/>
IntegrationGraphController
@RestController
@RequestMapping.path
@EnableIntegrationGraphController
<int-http:graph-controller/>
/integration
提供以下服务:IntegrationGraphController
@RestController
-
@GetMapping(name = "getGraph")
:检索自上次刷新以来 Spring 集成组件的状态。 作为 REST 服务的 a 返回。IntegrationGraphServer
o.s.i.support.management.graph.Graph
@ResponseBody
-
@GetMapping(path = "/refresh", name = "refreshGraph")
:刷新实际运行时状态的当前状态并将其作为 REST 响应返回。 无需刷新指标的图表。 检索图形时,会实时提供这些指标。 如果自上次检索图形以来修改了应用程序上下文,则可以调用 Refresh。 在这种情况下,图形将完全重新构建。Graph
您可以使用 Spring Security 和 Spring MVC 项目提供的标准配置选项和组件为 设置安全性和跨域限制。
以下示例可实现这些目标:IntegrationGraphController
<mvc:annotation-driven />
<mvc:cors>
<mvc:mapping path="/myIntegration/**"
allowed-origins="http://localhost:9090"
allowed-methods="GET" />
</mvc:cors>
<security:http>
<security:intercept-url pattern="/myIntegration/**" access="ROLE_ADMIN" />
</security:http>
<int-http:graph-controller path="/myIntegration" />
以下示例显示了如何对 Java 配置执行相同的操作:
@Configuration
@EnableWebMvc // or @EnableWebFlux
@EnableWebSecurity // or @EnableWebFluxSecurity
@EnableIntegration
@EnableIntegrationGraphController(path = "/testIntegration", allowedOrigins="http://localhost:9090")
public class IntegrationConfiguration extends WebSecurityConfigurerAdapter {
@Override
protected void configure(HttpSecurity http) throws Exception {
http
.authorizeRequests()
.antMatchers("/testIntegration/**").hasRole("ADMIN")
// ...
.formLogin();
}
//...
}
请注意,为方便起见,注解提供了一个属性。
这提供了对 .
为了更复杂,你可以使用标准的 Spring MVC 机制来配置 CORS 映射。@EnableIntegrationGraphController
allowedOrigins
GET
path