此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring Integration 6.3.1Spring中文文档

此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring Integration 6.3.1Spring中文文档

Spring Integration 是数据的通用容器。 任何对象都可以作为有效负载提供,并且每个实例都包含包含用户可扩展属性的标头,作为键值对。MessageMessageSpring中文文档

界面Message

以下列表显示了接口的定义:MessageSpring中文文档

public interface Message<T> {

    T getPayload();

    MessageHeaders getHeaders();

}

接口是 API 的核心部分。 通过将数据封装在通用包装器中,消息传递系统可以在不了解数据类型的情况下传递数据。 当应用程序发展为支持新类型时,或者当类型本身被修改或扩展时,消息传递系统不受影响。 另一方面,当消息传递系统中的某些组件确实需要访问有关 的信息时,通常可以将此类元数据存储到消息标头中的元数据中并从中检索。MessageMessageSpring中文文档

邮件头

正如 Spring Integration 允许将 any 用作 的有效负载一样,它也支持将任何类型用作标头值。 实际上,该类实现了 ,如以下类定义所示:ObjectMessageObjectMessageHeadersjava.util.Map_ interfaceSpring中文文档

public final class MessageHeaders implements Map<String, Object>, Serializable {
  ...
}
即使该类实现了 ,它实际上也是一个只读实现。 对 Map 中的某个值的任何尝试都会导致 . 这同样适用于 和 。 由于消息可以传递给多个使用者,因此无法修改其结构。 同样,消息的有效负载不能在初始创建之后。 但是,标头值本身(或有效负载对象)的可变性有意留给框架用户决定。MessageHeadersMapputUnsupportedOperationExceptionremoveclearMapObjectset

作为 的实现,可以通过使用标头的名称调用来检索标头。 或者,您可以提供预期的作为附加参数。 更好的是,在检索预定义值之一时,可以使用方便的 getter。 以下示例显示了这三个选项中的每一个:Mapget(..)ClassSpring中文文档

Object someValue = message.getHeaders().get("someKey");

CustomerId customerId = message.getHeaders().get("customerId", CustomerId.class);

Long timestamp = message.getHeaders().getTimestamp();

下表描述了预定义的消息标头:Spring中文文档

表 1.预定义的消息头
标头名称 标头类型 用法
 MessageHeaders.ID
 java.util.UUID

此消息实例的标识符。 每次消息发生突变时都会更改。Spring中文文档

 MessageHeaders.
TIMESTAMP
 java.lang.Long

创建消息的时间。 每次消息发生突变时都会更改。Spring中文文档

 MessageHeaders.
REPLY_CHANNEL
 java.lang.Object
(String or
MessageChannel)

当未配置显式输出通道且没有或已耗尽时,将应答(如果有)发送到的通道。 如果值为 ,则它必须表示 Bean 名称或由ROUTING_SLIPROUTING_SLIPStringChannelRegistry.Spring中文文档

 MessageHeaders.
ERROR_CHANNEL
 java.lang.Object
(String or
MessageChannel)

将错误发送到的通道。 如果值为 ,则它必须表示 Bean 名称或由StringChannelRegistry.Spring中文文档

许多入站和出站适配器实现还提供或期望某些标头,您可以配置其他用户定义的标头。 例如,可以在存在此类标头的模块中找到这些标头的常量。、 等。AmqpHeadersJmsHeadersSpring中文文档

MessageHeaderAccessor应用程序接口

从 Spring Framework 4.0 和 Spring Integration 4.0 开始,核心消息传递抽象已移至模块中,并引入了 API 以提供消息传递实现的附加抽象。 所有(核心)特定于 Spring Integration 的消息标头常量现在都在类中声明。 下表描述了预定义的消息标头:spring-messagingMessageHeaderAccessorIntegrationMessageHeaderAccessorSpring中文文档

表 2.预定义的消息头
标头名称 标头类型 用法
 IntegrationMessageHeaderAccessor.
CORRELATION_ID
 java.lang.Object

用于关联两条或多条消息。Spring中文文档

 IntegrationMessageHeaderAccessor.
SEQUENCE_NUMBER
 java.lang.Integer

通常,带有一组消息的序列号,带有 但也可以用于对无界消息组进行重新排序。SEQUENCE_SIZE<resequencer/>Spring中文文档

 IntegrationMessageHeaderAccessor.
SEQUENCE_SIZE
 java.lang.Integer

一组相关消息中的消息数。Spring中文文档

 IntegrationMessageHeaderAccessor.
EXPIRATION_DATE
 java.lang.Long

指示邮件何时过期。 框架不直接使用,但可以使用标头 enricher 进行设置,并在配置了 .<filter/>UnexpiredMessageSelectorSpring中文文档

 IntegrationMessageHeaderAccessor.
PRIORITY
 java.lang.Integer

消息优先级 — 例如,在 .PriorityChannelSpring中文文档

 IntegrationMessageHeaderAccessor.
DUPLICATE_MESSAGE
 java.lang.Boolean

如果幂等接收器拦截器将消息检测为重复消息,则为 True。 请参阅幂等接收方企业集成模式Spring中文文档

 IntegrationMessageHeaderAccessor.
CLOSEABLE_RESOURCE
 java.io.Closeable

如果消息与消息处理完成时应关闭的消息相关联,则存在此标头。 例如,与使用 FTP、SFTP 等的流式文件传输相关联。CloseableSessionSpring中文文档

 IntegrationMessageHeaderAccessor.
DELIVERY_ATTEMPT
 java.lang.
AtomicInteger

如果消息驱动的通道适配器支持配置 ,则此标头包含当前传递尝试。RetryTemplateSpring中文文档

 IntegrationMessageHeaderAccessor.
ACKNOWLEDGMENT_CALLBACK
 o.s.i.support.
Acknowledgment
Callback

如果入站终结点支持它,则回调以接受、拒绝或重新排队消息。 请参阅延迟确认可轮询消息源MQTT 手动确认Spring中文文档

类中提供了其中一些标头的方便类型化 getter,如以下示例所示:IntegrationMessageHeaderAccessorSpring中文文档

IntegrationMessageHeaderAccessor accessor = new IntegrationMessageHeaderAccessor(message);
int sequenceNumber = accessor.getSequenceNumber();
Object correlationId = accessor.getCorrelationId();
...

下表描述了也出现在 但通常不被用户代码使用的标题(也就是说,它们通常由 Spring Integration 的内部部分使用 - 此处包含它们是为了完整性):IntegrationMessageHeaderAccessorSpring中文文档

表 3.预定义的消息头
标头名称 标头类型 用法
 IntegrationMessageHeaderAccessor.
SEQUENCE_DETAILS
 java.util.
List<List<Object>>

需要嵌套关联时使用的关联数据堆栈(例如,)。splitter→…​→splitter→…​→aggregator→…​→aggregatorSpring中文文档

 IntegrationMessageHeaderAccessor.
ROUTING_SLIP
 java.util.
Map<List<Object>, Integer>

请参阅 Routing SlipSpring中文文档

消息 ID 生成

当消息通过应用程序转换时,每次它都会发生突变(例如, 通过转换器)分配一个新的消息 ID。 消息 ID 为 . 从 Spring Integration 3.0 开始,用于 IS 生成的默认策略比以前的实现更有效。 它使用基于安全随机种子的简单随机数,而不是每次都创建安全随机数。UUIDjava.util.UUID.randomUUID()Spring中文文档

可以通过声明在应用程序上下文中实现的 Bean 来选择不同的 UUID 生成策略。org.springframework.util.IdGeneratorSpring中文文档

类装入器中只能使用一种 UUID 生成策略。 这意味着,如果两个或多个应用程序上下文在同一个类装入器中运行,则它们共享相同的策略。 如果其中一个上下文更改了策略,则所有上下文都将使用它。 如果同一类装入器中的两个或多个上下文声明一个 bean 类型为 ,那么它们都必须是同一类的实例。 否则,尝试替换自定义策略的上下文将无法初始化。 如果策略相同,但已参数化,则使用要初始化的第一个上下文中的策略。org.springframework.util.IdGenerator

除了默认策略外,还提供了两个附加策略。 使用以前的机制。 当真正不需要 UUID 并且简单的递增值就足够了时,可以使用。IdGeneratorsorg.springframework.util.JdkIdGeneratorUUID.randomUUID()o.s.i.support.IdGenerators.SimpleIncrementingIdGeneratorSpring中文文档

只读标头

和 是只读标头,不能被覆盖。MessageHeaders.IDMessageHeaders.TIMESTAMPSpring中文文档

从版本 4.3.2 开始,提供了 API 来自定义不应从上游复制的标头列表。 默认情况下,只有 和 是只读的。 提供全局属性(请参阅全局属性)用于自定义框架组件。 当您希望不填充某些现成的标头时,这可能很有用,例如 (请参阅 JSON Transformers)。MessageBuilderreadOnlyHeaders(String…​ readOnlyHeaders)MessageMessageHeaders.IDMessageHeaders.TIMESTAMPspring.integration.readOnly.headersDefaultMessageBuilderFactorycontentTypeObjectToJsonTransformerSpring中文文档

当您尝试使用 构建新消息时,将忽略此类标头,并将特定消息发送到日志。MessageBuilderINFOSpring中文文档

从版本 5.0 开始,Messaging GatewayHeader EnricherContent EnricherHeader Filter 不允许您在使用时配置 和 标头名称,并且它们会抛出 .MessageHeaders.IDMessageHeaders.TIMESTAMPDefaultMessageBuilderFactoryBeanInitializationExceptionSpring中文文档

标头传播

当消息生成端点(如服务激活器)处理(和修改)消息时,通常,入站标头将传播到出站消息。 一个例外是转换器,当一个完整的消息返回到框架时。 在这种情况下,用户代码负责整个出站消息。 当转换器仅返回有效负载时,将传播入站标头。 此外,仅当出站邮件中尚不存在标头时,才会传播标头,从而允许您根据需要更改标头值。Spring中文文档

从版本 4.3.10 开始,您可以配置消息处理程序(修改消息并生成输出)以抑制特定标头的传播。 若要配置不希望复制的标头,请调用抽象类上的 or 方法。setNotPropagatedHeaders()addNotPropagatedHeaders()MessageProducingMessageHandlerSpring中文文档

还可以通过将属性设置为逗号分隔的标头列表来全局禁止特定邮件标头的传播。readOnlyHeadersMETA-INF/spring.integration.propertiesSpring中文文档

从 5.0 版开始,实现 应用简单模式(、*xxx 或 ),以允许使用通用后缀或前缀过滤标头。 有关更多信息,请参见 PatternMatchUtils Javadoc。 当其中一个模式为(星号)时,不会传播任何标头。 所有其他模式都将被忽略。 在这种情况下,服务激活器的行为方式与转换器相同,并且必须在从服务方法返回的标头中提供任何必需的标头。 该选项在 Java DSL 中可用 它也可用于组件的 XML 配置作为属性。setNotPropagatedHeaders()AbstractMessageProducingHandlerxxx*xxxxxx*yyy*MessagenotPropagatedHeaders()ConsumerEndpointSpec<service-activator>not-propagated-headersSpring中文文档

报头传播抑制不适用于那些不修改消息的端点,例如网桥路由器
即使该类实现了 ,它实际上也是一个只读实现。 对 Map 中的某个值的任何尝试都会导致 . 这同样适用于 和 。 由于消息可以传递给多个使用者,因此无法修改其结构。 同样,消息的有效负载不能在初始创建之后。 但是,标头值本身(或有效负载对象)的可变性有意留给框架用户决定。MessageHeadersMapputUnsupportedOperationExceptionremoveclearMapObjectset
表 1.预定义的消息头
标头名称 标头类型 用法
 MessageHeaders.ID
 java.util.UUID

此消息实例的标识符。 每次消息发生突变时都会更改。Spring中文文档

 MessageHeaders.
TIMESTAMP
 java.lang.Long

创建消息的时间。 每次消息发生突变时都会更改。Spring中文文档

 MessageHeaders.
REPLY_CHANNEL
 java.lang.Object
(String or
MessageChannel)

当未配置显式输出通道且没有或已耗尽时,将应答(如果有)发送到的通道。 如果值为 ,则它必须表示 Bean 名称或由ROUTING_SLIPROUTING_SLIPStringChannelRegistry.Spring中文文档

 MessageHeaders.
ERROR_CHANNEL
 java.lang.Object
(String or
MessageChannel)

将错误发送到的通道。 如果值为 ,则它必须表示 Bean 名称或由StringChannelRegistry.Spring中文文档

表 2.预定义的消息头
标头名称 标头类型 用法
 IntegrationMessageHeaderAccessor.
CORRELATION_ID
 java.lang.Object

用于关联两条或多条消息。Spring中文文档

 IntegrationMessageHeaderAccessor.
SEQUENCE_NUMBER
 java.lang.Integer

通常,带有一组消息的序列号,带有 但也可以用于对无界消息组进行重新排序。SEQUENCE_SIZE<resequencer/>Spring中文文档

 IntegrationMessageHeaderAccessor.
SEQUENCE_SIZE
 java.lang.Integer

一组相关消息中的消息数。Spring中文文档

 IntegrationMessageHeaderAccessor.
EXPIRATION_DATE
 java.lang.Long

指示邮件何时过期。 框架不直接使用,但可以使用标头 enricher 进行设置,并在配置了 .<filter/>UnexpiredMessageSelectorSpring中文文档

 IntegrationMessageHeaderAccessor.
PRIORITY
 java.lang.Integer

消息优先级 — 例如,在 .PriorityChannelSpring中文文档

 IntegrationMessageHeaderAccessor.
DUPLICATE_MESSAGE
 java.lang.Boolean

如果幂等接收器拦截器将消息检测为重复消息,则为 True。 请参阅幂等接收方企业集成模式Spring中文文档

 IntegrationMessageHeaderAccessor.
CLOSEABLE_RESOURCE
 java.io.Closeable

如果消息与消息处理完成时应关闭的消息相关联,则存在此标头。 例如,与使用 FTP、SFTP 等的流式文件传输相关联。CloseableSessionSpring中文文档

 IntegrationMessageHeaderAccessor.
DELIVERY_ATTEMPT
 java.lang.
AtomicInteger

如果消息驱动的通道适配器支持配置 ,则此标头包含当前传递尝试。RetryTemplateSpring中文文档

 IntegrationMessageHeaderAccessor.
ACKNOWLEDGMENT_CALLBACK
 o.s.i.support.
Acknowledgment
Callback

如果入站终结点支持它,则回调以接受、拒绝或重新排队消息。 请参阅延迟确认可轮询消息源MQTT 手动确认Spring中文文档

表 3.预定义的消息头
标头名称 标头类型 用法
 IntegrationMessageHeaderAccessor.
SEQUENCE_DETAILS
 java.util.
List<List<Object>>

需要嵌套关联时使用的关联数据堆栈(例如,)。splitter→…​→splitter→…​→aggregator→…​→aggregatorSpring中文文档

 IntegrationMessageHeaderAccessor.
ROUTING_SLIP
 java.util.
Map<List<Object>, Integer>

请参阅 Routing SlipSpring中文文档

类装入器中只能使用一种 UUID 生成策略。 这意味着,如果两个或多个应用程序上下文在同一个类装入器中运行,则它们共享相同的策略。 如果其中一个上下文更改了策略,则所有上下文都将使用它。 如果同一类装入器中的两个或多个上下文声明一个 bean 类型为 ,那么它们都必须是同一类的实例。 否则,尝试替换自定义策略的上下文将无法初始化。 如果策略相同,但已参数化,则使用要初始化的第一个上下文中的策略。org.springframework.util.IdGenerator
报头传播抑制不适用于那些不修改消息的端点,例如网桥路由器

消息实现

该接口的基本实现是 ,它提供了两个构造函数,如以下清单所示:MessageGenericMessage<T>Spring中文文档

new GenericMessage<T>(T payload);

new GenericMessage<T>(T payload, Map<String, Object> headers)

创建 a 时,会生成一个随机的唯一 ID。 接受 of 标头的构造函数将提供的标头复制到新创建的 .MessageMapMessageSpring中文文档

还有一个方便的实现,旨在传达错误条件。 此实现将对象作为其有效负载,如以下示例所示:MessageThrowableSpring中文文档

ErrorMessage message = new ErrorMessage(someThrowable);

Throwable t = message.getPayload();

请注意,此实现利用了基类参数化的事实。 因此,如两个示例所示,在检索有效载荷时不需要强制转换。GenericMessageMessageObjectSpring中文文档

Helper 类MessageBuilder

您可能会注意到,该接口为其有效负载和标头定义了检索方法,但没有提供 setter。 这样做的原因是 a 在初始创建后无法修改。 因此,当一个实例被发送到多个消费者(例如, 通过发布-订阅频道),如果其中一个使用者需要发送具有不同有效负载类型的回复,则必须创建一个新的 . 因此,其他使用者不受这些更改的影响。 请记住,多个使用者可以访问相同的有效负载实例或标头值,并且此类实例本身是否不可变由您决定。 换言之,实例的契约类似于不可修改的契约,地图进一步举例说明了这一点。 即使该类实现了 ,任何对实例调用操作(或“remove”或“clear”)的尝试都会导致 .MessageMessageMessageMessageMessageCollectionMessageHeadersMessageHeadersjava.util.MapputMessageHeadersUnsupportedOperationExceptionSpring中文文档

Spring Integration 不需要将 Map 的创建和填充传递到 GenericMessage 构造函数中,而是提供了一种更方便的方法来构造 Messages: 。 提供了两种工厂方法,用于从现有实例或具有有效负载创建实例。 当从现有构建时,其标头和有效负载被复制到新的,如以下示例所示:MessageBuilderMessageBuilderMessageMessageObjectMessageMessageMessageSpring中文文档

Message<String> message1 = MessageBuilder.withPayload("test")
        .setHeader("foo", "bar")
        .build();

Message<String> message2 = MessageBuilder.fromMessage(message1).build();

assertEquals("test", message2.getPayload());
assertEquals("bar", message2.getHeaders().get("foo"));

如果您需要创建一个具有新有效负载的标头,但仍想从现有 中复制标头,则可以使用“copy”方法之一,如以下示例所示:MessageMessageSpring中文文档

Message<String> message3 = MessageBuilder.withPayload("test3")
        .copyHeaders(message1.getHeaders())
        .build();

Message<String> message4 = MessageBuilder.withPayload("test4")
        .setHeader("foo", 123)
        .copyHeadersIfAbsent(message1.getHeaders())
        .build();

assertEquals("bar", message3.getHeaders().get("foo"));
assertEquals(123, message4.getHeaders().get("foo"));

请注意,该方法不会覆盖现有值。 此外,在前面的示例中,您可以看到如何使用 来设置任何用户定义的标头。 最后,有一些可用于预定义标头的方法,以及用于设置任何标头的非破坏性方法(还定义了预定义标头名称的常量)。copyHeadersIfAbsentsetHeadersetMessageHeadersSpring中文文档

您还可以用于设置消息的优先级,如以下示例所示:MessageBuilderSpring中文文档

Message<Integer> importantMessage = MessageBuilder.withPayload(99)
        .setPriority(5)
        .build();

assertEquals(5, importantMessage.getHeaders().getPriority());

Message<Integer> lessImportantMessage = MessageBuilder.fromMessage(importantMessage)
        .setHeaderIfAbsent(IntegrationMessageHeaderAccessor.PRIORITY, 2)
        .build();

assertEquals(2, lessImportantMessage.getHeaders().getPriority());

仅当使用 (如下一章所述) 时,才会考虑标头。 它被定义为 .priorityPriorityChanneljava.lang.IntegerSpring中文文档