消息
消息
Spring 集成是数据的通用容器。
任何对象都可以作为有效负载提供,并且每个实例都包含包含用户可扩展属性的标头(作为键值对)。Message
Message
界面Message
下面的清单显示了接口的定义:Message
public interface Message<T> {
T getPayload();
MessageHeaders getHeaders();
}
该接口是 API 的核心部分。
通过将数据封装在通用包装器中,消息传递系统可以在不了解数据类型的情况下传递数据。
当应用程序发展以支持新类型时,或者当类型本身被修改或扩展时,消息传递系统不会受到影响。
另一方面,当消息系统中的某些组件确实需要访问有关 的信息时,通常可以将此类元数据存储到消息标头中的元数据中并从中检索。Message
Message
消息报头
就像 Spring Integration 允许 any 用作 a 的有效负载一样,它也支持将任何类型作为 Headers 值。
实际上,该类实现了 ,如下面的类定义所示:Object
Message
Object
MessageHeaders
java.util.Map_ interface
public final class MessageHeaders implements Map<String, Object>, Serializable {
...
}
尽管该类 implements ,但它实际上是一个只读实现。
对 Map 中的值的任何尝试都会导致 .
这同样适用于 和 。
由于消息可能会传递给多个使用者,因此无法修改 的结构。
同样,消息的有效负载不能在初始创建之后。
但是,标头值本身(或有效负载 Object)的可变性是有意保留给框架用户决定的。MessageHeaders Map put UnsupportedOperationException remove clear Map Object set |
作为 的实现,可以通过使用标头的名称进行调用来检索标头。
或者,您也可以将 expected 作为附加参数提供。
更好的是,当检索预定义值之一时,可以使用方便的 getter。
以下示例显示了这三个选项中的每一个:Map
get(..)
Class
Object someValue = message.getHeaders().get("someKey");
CustomerId customerId = message.getHeaders().get("customerId", CustomerId.class);
Long timestamp = message.getHeaders().getTimestamp();
下表描述了预定义的消息标头:
标头名称 | 标头类型 | 用法 |
---|---|---|
MessageHeaders.ID |
java.util.UUID |
此消息实例的标识符。 每次更改消息时都会更改。 |
MessageHeaders. TIMESTAMP |
java.lang.Long |
消息的创建时间。 每次更改消息时都会更改。 |
MessageHeaders. REPLY_CHANNEL |
java.lang.Object (String or MessageChannel) |
当未配置显式输出通道且没有或已耗尽时,将回复(如果有)发送到的通道。
如果值是 a ,则它必须表示 bean 名称或由 |
MessageHeaders. ERROR_CHANNEL |
java.lang.Object (String or MessageChannel) |
将错误发送到的通道。
如果值是 a ,则它必须表示 bean 名称或由 |
许多入站和出站适配器实现还提供或期望某些 Headers,您可以配置其他用户定义的 Headers。
例如,这些 Headers 的常量可以在存在此类 Headers 的那些模块中找到。、 、 等。AmqpHeaders
JmsHeaders
MessageHeaderAccessor
应用程序接口
从 Spring Framework 4.0 和 Spring Integration 4.0 开始,核心消息传递抽象已移至该模块,并且引入了 API 以提供对消息传递实现的额外抽象。
所有(核心)特定于 Spring 集成的消息头常量现在都在类中声明。
下表描述了预定义的消息标头:spring-messaging
MessageHeaderAccessor
IntegrationMessageHeaderAccessor
标头名称 | 标头类型 | 用法 |
---|---|---|
IntegrationMessageHeaderAccessor. CORRELATION_ID |
java.lang.Object |
用于关联两个或多个消息。 |
IntegrationMessageHeaderAccessor. SEQUENCE_NUMBER |
java.lang.Integer |
通常,带有一组消息的序列号带有 a 但也可以在 a 中用于对无界的消息组重新排序。 |
IntegrationMessageHeaderAccessor. SEQUENCE_SIZE |
java.lang.Integer |
一组相关消息中的消息数。 |
IntegrationMessageHeaderAccessor. EXPIRATION_DATE |
java.lang.Long |
指示消息何时过期。
不直接由框架使用,但可以使用标头扩充器进行设置,并在配置了 . |
IntegrationMessageHeaderAccessor. PRIORITY |
java.lang.Integer |
消息优先级 — 例如,在 . |
IntegrationMessageHeaderAccessor. DUPLICATE_MESSAGE |
java.lang.Boolean |
如果消息被幂等接收方侦听器检测到为重复消息,则为 True。 请参阅 Idempotent Receiver Enterprise Integration Pattern。 |
IntegrationMessageHeaderAccessor. CLOSEABLE_RESOURCE |
java.io.Closeable |
如果消息与消息处理完成时应关闭的 关联,则存在此标头。
例如,与使用 FTP、SFTP 等的流式文件传输相关联。 |
IntegrationMessageHeaderAccessor. DELIVERY_ATTEMPT |
java.lang. AtomicInteger |
如果消息驱动的通道适配器支持 的配置 ,则此标头包含当前的传送尝试。 |
IntegrationMessageHeaderAccessor. ACKNOWLEDGMENT_CALLBACK |
o.s.i.support. Acknowledgment Callback |
如果入站终端节点支持,则回调以接受、拒绝或重新排队消息。 请参阅 Deferred Acknowledgment Pollable Message Source 和 MQTT Manual Acks。 |
类中为其中一些标头提供了方便的类型化 getter,如下例所示:IntegrationMessageHeaderAccessor
IntegrationMessageHeaderAccessor accessor = new IntegrationMessageHeaderAccessor(message);
int sequenceNumber = accessor.getSequenceNumber();
Object correlationId = accessor.getCorrelationId();
...
下表描述了也出现在用户代码中但通常不被用户代码使用的 Headers(也就是说,它们通常由 Spring Integration 的内部部分使用——为了完整起见,在这里包含它们):IntegrationMessageHeaderAccessor
标头名称 | 标头类型 | 用法 |
---|---|---|
IntegrationMessageHeaderAccessor. SEQUENCE_DETAILS |
java.util. List<List<Object>> |
需要嵌套相关时使用的相关数据堆栈(例如 )。 |
IntegrationMessageHeaderAccessor. ROUTING_SLIP |
java.util. Map<List<Object>, Integer> |
请参阅 Routing Slip。 |
消息 ID 生成
当消息通过应用程序转换时,每次更改(例如
by a transformer) 分配新的消息 ID。
消息 ID 是一个 .
从 Spring Integration 3.0 开始,用于 IS 生成的默认策略比以前的实现更有效。
它使用基于安全随机种子的简单随机数,而不是每次都创建一个安全的随机数。UUID
java.util.UUID.randomUUID()
可以通过声明在应用程序上下文中实现的 bean 来选择不同的 UUID 生成策略。org.springframework.util.IdGenerator
类加载器中只能使用一种 UUID 生成策略。
这意味着,如果两个或多个应用程序上下文在同一个 classloader 中运行,它们将共享相同的策略。
如果其中一个上下文更改了策略,则所有上下文都会使用它。
如果同一类加载器中的两个或多个上下文声明 type 为 bean ,则它们都必须是同一类的实例。
否则,尝试替换自定义策略的上下文将无法初始化。
如果策略相同,但已参数化,则使用要初始化的第一个上下文中的策略。org.springframework.util.IdGenerator |
除了 default 策略之外,还提供了两个额外的策略。 使用前面的机制。
当并不真正需要 UUID 并且简单的递增值就足够时,可以使用。IdGenerators
org.springframework.util.JdkIdGenerator
UUID.randomUUID()
o.s.i.support.IdGenerators.SimpleIncrementingIdGenerator
只读标头
和 是只读标头,无法覆盖。MessageHeaders.ID
MessageHeaders.TIMESTAMP
从版本 4.3.2 开始,提供了 API 来自定义不应从上游复制的标头列表。
默认情况下,只有 和 是只读的。
提供 global 属性(请参阅 Global Properties)以自定义框架组件。
当您不想填充某些开箱即用的标头时,这可能很有用,例如(请参阅 JSON 转换器)。MessageBuilder
readOnlyHeaders(String… readOnlyHeaders)
Message
MessageHeaders.ID
MessageHeaders.TIMESTAMP
spring.integration.readOnly.headers
DefaultMessageBuilderFactory
contentType
ObjectToJsonTransformer
当您尝试使用 构建新消息时,将忽略此类标头,并将特定消息发送到日志中。MessageBuilder
INFO
从版本 5.0 开始,Messaging Gateway、Header Enricher、Content Enricher 和 Header Filter 不允许在使用 时配置 和 Header 名称,它们会抛出 .MessageHeaders.ID
MessageHeaders.TIMESTAMP
DefaultMessageBuilderFactory
BeanInitializationException
标头传播
当消息生成终端节点(例如服务激活器)处理(和修改)消息时,入站标头通常会传播到出站消息。 一个例外是转换器,当完整的消息返回到框架时。 在这种情况下,用户代码负责整个出站消息。 当转换器仅返回有效负载时,入站标头将被传播。 此外,仅当出站邮件中尚不存在报头时,才会传播报头,从而允许您根据需要更改报头值。
从版本 4.3.10 开始,您可以配置消息处理程序(修改消息并生成输出)以抑制特定标头的传播。
要配置不想复制的标头,请对抽象类调用 or 方法。setNotPropagatedHeaders()
addNotPropagatedHeaders()
MessageProducingMessageHandler
您还可以通过将 in 属性设置为以逗号分隔的报头列表来全局禁止特定邮件报头的传播。readOnlyHeaders
META-INF/spring.integration.properties
从版本 5.0 开始,上的实现应用简单模式(, , *xxx
或 )以允许过滤具有通用后缀或前缀的 Headers。
有关更多信息,请参见 PatternMatchUtils
Javadoc。
当其中一个模式为 (星号) 时,不会传播任何标头。
所有其他模式都将被忽略。
在这种情况下,service activator 的行为方式与 transformer 相同,并且必须在从 service 方法返回的 中提供任何必需的 headers。
该选项在 Java DSL 的
它还可用于组件的 XML 配置作为属性。setNotPropagatedHeaders()
AbstractMessageProducingHandler
xxx*
xxx
xxx*yyy
*
Message
notPropagatedHeaders()
ConsumerEndpointSpec
<service-activator>
not-propagated-headers
消息实现
接口的基本实现是 ,它提供了两个构造函数,如下面的清单所示:Message
GenericMessage<T>
new GenericMessage<T>(T payload);
new GenericMessage<T>(T payload, Map<String, Object> headers)
创建 a 时,将生成一个随机的唯一 ID。
接受 of 标头的构造函数将提供的标头复制到新创建的 .Message
Map
Message
还有一个方便的 designed 实现来传达错误条件。
此实现将对象作为其有效负载,如下例所示:Message
Throwable
ErrorMessage message = new ErrorMessage(someThrowable);
Throwable t = message.getPayload();
请注意,此实现利用了基类已参数化的事实。
因此,如两个示例所示,检索 payload 时不需要强制转换 。GenericMessage
Message
Object
Helper 类MessageBuilder
您可能会注意到,该接口为其有效负载和 Headers 定义了检索方法,但没有提供 setter。
这样做的原因是 a 在初始创建后无法修改。
因此,当一个实例被发送给多个使用者(例如,
通过 publish-subscribe Channel),如果其中一个使用者需要发送具有不同有效负载类型的回复,则必须创建一个新的.
因此,其他使用者不会受到这些更改的影响。
请记住,多个使用者可以访问相同的有效负载实例或标头值,并且此类实例本身是否不可变由您决定。
换句话说,instances 的 Contract 类似于 unmodifiable 的 Contract ,并且 map 进一步说明了这一点。
即使该类实现了 ,任何对实例调用操作(或“remove”或“clear”)的尝试都会导致 .Message
Message
Message
Message
Message
Collection
MessageHeaders
MessageHeaders
java.util.Map
put
MessageHeaders
UnsupportedOperationException
Spring 集成不需要将 Map 的创建和填充传递到 GenericMessage 构造函数中,而是提供了一种更方便的方式来构造消息:。
它提供了两种工厂方法,用于从 existing 或 with a payload 创建实例。
从现有 构建时,该标头和有效负载将复制到新的 ,如下例所示:MessageBuilder
MessageBuilder
Message
Message
Object
Message
Message
Message
Message<String> message1 = MessageBuilder.withPayload("test")
.setHeader("foo", "bar")
.build();
Message<String> message2 = MessageBuilder.fromMessage(message1).build();
assertEquals("test", message2.getPayload());
assertEquals("bar", message2.getHeaders().get("foo"));
如果您需要创建具有新有效负载的 ,但仍希望从现有 复制标头 ,则可以使用 'copy' 方法之一,如下例所示:Message
Message
Message<String> message3 = MessageBuilder.withPayload("test3")
.copyHeaders(message1.getHeaders())
.build();
Message<String> message4 = MessageBuilder.withPayload("test4")
.setHeader("foo", 123)
.copyHeadersIfAbsent(message1.getHeaders())
.build();
assertEquals("bar", message3.getHeaders().get("foo"));
assertEquals(123, message4.getHeaders().get("foo"));
请注意,该方法不会覆盖现有值。
此外,在前面的示例中,您可以看到如何使用 .
最后,有一些方法可用于预定义的 Headers,以及用于设置任何 Headers 的非破坏性方法(还为预定义的 Headers 名称定义常量)。copyHeadersIfAbsent
setHeader
set
MessageHeaders
您还可以使用 来设置消息的优先级,如下例所示:MessageBuilder
Message<Integer> importantMessage = MessageBuilder.withPayload(99)
.setPriority(5)
.build();
assertEquals(5, importantMessage.getHeaders().getPriority());
Message<Integer> lessImportantMessage = MessageBuilder.fromMessage(importantMessage)
.setHeaderIfAbsent(IntegrationMessageHeaderAccessor.PRIORITY, 2)
.build();
assertEquals(2, lessImportantMessage.getHeaders().getPriority());
仅在使用 时考虑 Headers(如下章所述)。
它被定义为 .priority
PriorityChannel
java.lang.Integer