消息转换
消息转换
转换器
消息转换器在实现消息生成者和消息使用者的松散耦合方面起着非常重要的作用。
您可以在这些组件之间添加转换器,而不是要求每个消息生成组件都知道下一个使用者期望的类型。
泛型转换器(例如将 a 转换为 XML 文档的转换器)也是高度可重用的。String
对于某些系统,最好提供规范的数据模型,但 Spring 集成的一般理念是不需要任何特定的格式。 相反,为了获得最大的灵活性, Spring 集成旨在提供最简单的扩展模型。 与其他端点类型一样,在 XML 或 Java 注释中使用声明性配置使简单的 POJO 能够适应消息转换器的角色。 本章的其余部分介绍了这些配置选项。
为了最大限度地提高灵活性, Spring 不需要基于 XML 的消息有效负载。 尽管如此,如果这确实是您的应用程序的正确选择,那么该框架确实提供了一些方便的转换器来处理基于 XML 的有效负载。 有关这些转换器的更多信息,请参见 XML 支持 - 处理 XML 负载。 |
使用 XML 配置 Transformer
该元素用于创建消息转换终端节点。
除了 and 属性之外,它还需要一个 attribute。
该 可以指向包含单个方法上的 Comments 的对象(请参见使用 Comments 配置 Transformer),也可以与属性中提供的显式方法名称值组合。<transformer>
input-channel
output-channel
ref
ref
@Transformer
method
<int:transformer id="testTransformer" ref="testTransformerBean" input-channel="inChannel"
method="transform" output-channel="outChannel"/>
<beans:bean id="testTransformerBean" class="org.foo.TestTransformer" />
如果自定义转换器处理程序实现可以在其他定义中重用,则通常建议使用属性。
但是,如果自定义转换器处理程序实现的范围应限定为单个定义的定义,则可以定义内部 Bean 定义,如下例所示:ref
<transformer>
<transformer>
<int:transformer id="testTransformer" input-channel="inChannel" method="transform"
output-channel="outChannel">
<beans:bean class="org.foo.TestTransformer"/>
</transformer>
不允许在同一配置中同时使用 attribute 和内部处理程序定义,因为它会创建不明确的条件并导致引发异常。ref <transformer> |
如果该属性引用了扩展的 bean(例如框架本身提供的转换器),则通过将输出通道直接注入处理程序来优化配置。
在这种情况下,每个都必须是单独的 bean 实例(或-scoped bean)或使用内部配置类型。
如果您无意中从多个 bean 引用了相同的消息处理程序,则会收到配置异常。ref AbstractMessageProducingHandler ref prototype <bean/> |
使用 POJO 时,用于转换的方法可能需要入站消息的类型或有效负载类型。
它还可以单独接受消息头值,也可以分别使用 和 parameter 注释作为完整 map 接受消息头值。
该方法的返回值可以是任何类型。
如果返回值本身是 ,则将其传递给 transformer 的 output channel。Message
@Header
@Headers
Message
从 Spring Integration 2.0 开始,消息转换器的 transformation method 不能再返回。
返回会导致异常,因为消息转换器应始终将每个源消息转换为有效的目标消息。
换句话说,消息转换器不应用作消息过滤器,因为有一个专用选项。
但是,如果您确实需要这种类型的行为(组件可能会返回,并且不应将其视为错误),则可以使用服务激活器。
其值是 default 的,但可以设置为该值,以便为返回值引发异常,就像 transformer 一样。null
null
<filter>
null
requires-reply
false
true
null
转换器和 Spring 表达式语言 (SpEL)
与路由器、聚合器和其他组件一样,从 Spring Integration 2.0 开始,只要转换逻辑相对简单,转换器也可以从 SPEL 支持中受益。 下面的示例展示了如何使用 SPEL 表达式:
<int:transformer input-channel="inChannel"
output-channel="outChannel"
expression="payload.toUpperCase() + '- [' + T(System).currentTimeMillis() + ']'"/>
前面的示例在不编写自定义转换器的情况下转换有效负载。
我们的有效负载(假设为 a )是大写的,与当前时间戳连接,并应用了一些格式。String
共 电 转换器
Spring 集成提供了一些 transformer 实现。
对象到字符串转换器
因为使用表示形式相当普遍,所以Spring Integration提供了一个输出是带有String的 a 的。
这是对入站 Message 的有效负载调用操作的结果。
下面的示例演示如何声明 object-to-string 转换器的实例:toString()
Object
ObjectToStringTransformer
Message
payload
String
toString()
<int:object-to-string-transformer input-channel="in" output-channel="out"/>
这个转换器的一个潜在用途是将一些任意对象发送到命名空间中的 'outbound-channel-adapter'。
虽然该 channel adapter 默认仅支持 , byte-array 或 payloads,但在适配器处理必要的转换之前立即添加此 transformer 。
只要调用的结果是你想要写入文件的内容,就可以正常工作。
否则,您可以使用前面显示的通用 'transformer' 元素来提供基于 POJO 的自定义 transformer。file
String
java.io.File
toString()
调试时,通常不需要此转换器,因为 能够记录消息有效负载。
有关更多详细信息,请参见 Wire Tap。logging-channel-adapter |
对象到字符串转换器非常简单。
它对入站有效负载进行调用。
从 Spring Integration 3.0 开始,这条规则有两个例外:
对于更复杂的操作(例如在运行时动态选择字符集),可以改用基于 SPEL 表达式的转换器,如下例所示:
|
如果需要将 an 序列化为字节数组或将 byte 数组反序列化回 ,Spring Integration 提供了对称序列化转换器。
默认情况下,这些使用标准 Java 序列化,但您可以分别使用 和 属性来提供 Spring 或策略的实现。
下面的示例展示了如何使用 Spring 的序列化器和反序列化器:Object
Object
Serializer
Deserializer
serializer
deserializer
<int:payload-serializing-transformer input-channel="objectsIn" output-channel="bytesOut"/>
<int:payload-deserializing-transformer input-channel="bytesIn" output-channel="objectsOut"
allow-list="com.mycom.*,com.yourcom.*"/>
当反序列化来自不受信任的来源的数据时,您应该考虑添加 of 包和类模式。
默认情况下,所有类都被反序列化。allow-list |
Object
转换器 - 到 - 和 - 转换器Map
Map
Object
Spring 集成还提供了 -to- 和 -to- 转换器,它们使用 JSON 来序列化和反序列化对象图。
对象层次结构被内省到最原始的类型 (、 和 等)。
此类型的路径用 SpEL 描述,它在转换后的 .
基元类型变为值。Object
Map
Map
Object
String
int
key
Map
请考虑以下示例:
public class Parent{
private Child child;
private String name;
// setters and getters are omitted
}
public class Child{
private String name;
private List<String> nickNames;
// setters and getters are omitted
}
前面示例中的两个类将转换为以下内容:Map
{person.name=George, person.child.name=Jenna, person.child.nickNames[0]=Jen ...}
基于 JSON 的 JSON 允许您描述对象结构,而无需共享实际类型,这样您就可以将对象图还原并重新构建为不同类型的对象图,只要您维护该结构即可。Map
例如,可以使用 -to- transformer 将前面的结构还原回以下对象图:Map
Object
public class Father {
private Kid child;
private String name;
// setters and getters are omitted
}
public class Kid {
private String name;
private List<String> nickNames;
// setters and getters are omitted
}
如果需要创建 “结构化” 映射,则可以提供 attribute。
默认值为 'true'。
如果将其设置为 'false',则结构是对象的。flatten
Map
Map
请考虑以下示例:
public class Parent {
private Child child;
private String name;
// setters and getters are omitted
}
public class Child {
private String name;
private List<String> nickNames;
// setters and getters are omitted
}
前面示例中的两个类将转换为以下内容:Map
{name=George, child={name=Jenna, nickNames=[Bimbo, ...]}}
为了配置这些转换器, Spring 集成为 Object-to-Map 提供了名称空间支持,如下例所示:
<int:object-to-map-transformer input-channel="directInput" output-channel="output"/>
您还可以将 attribute 设置为 false,如下所示:flatten
<int:object-to-map-transformer input-channel="directInput" output-channel="output" flatten="false"/>
Spring 集成为 Map-to-Object 提供名称空间支持,如下例所示:
<int:map-to-object-transformer input-channel="input"
output-channel="output"
type="org.something.Person"/>
或者,你可以使用属性和原型范围的 bean,如下例所示:ref
<int:map-to-object-transformer input-channel="inputA"
output-channel="outputA"
ref="person"/>
<bean id="person" class="org.something.Person" scope="prototype"/>
'ref' 和 'type' 属性是互斥的。
此外,如果使用 'ref' 属性,则必须指向 'prototype' 范围的 bean。
否则,将引发 a。BeanCreationException |
从版本 5.0 开始,你可以提供自定义的 — 当你需要特殊格式的日期或空集合的 null(和其他用途)时。
有关实施的更多信息,请参阅 JSON 转换器。ObjectToMapTransformer
JsonObjectMapper
JsonObjectMapper
Stream Transformer
将有效负载转换为 a ( 或 a 如果提供了 a)。StreamTransformer
InputStream
byte[]
String
charset
下面的示例演示如何在 XML 中使用该元素:stream-transformer
<int:stream-transformer input-channel="directInput" output-channel="output"/> <!-- byte[] -->
<int:stream-transformer id="withCharset" charset="UTF-8"
input-channel="charsetChannel" output-channel="output"/> <!-- String -->
以下示例演示如何使用类和注释在 Java 中配置流转换器:StreamTransformer
@Transformer
@Bean
@Transformer(inputChannel = "stream", outputChannel = "data")
public StreamTransformer streamToBytes() {
return new StreamTransformer(); // transforms to byte[]
}
@Bean
@Transformer(inputChannel = "stream", outputChannel = "data")
public StreamTransformer streamToString() {
return new StreamTransformer("UTF-8"); // transforms to String
}
JSON 转换器
Spring 集成提供了 Object-to-JSON 和 JSON to-Object 转换器。 下面的一对示例演示如何在 XML 中声明它们:
<int:object-to-json-transformer input-channel="objectMapperInput"/>
<int:json-to-object-transformer input-channel="objectMapperInput"
type="foo.MyDomainObject"/>
默认情况下,前面清单中的 transformer 使用 vanilla .
它基于 Classpath 中的实现。
您可以使用适当的选项或基于所需的库(例如 GSON)提供自己的自定义实现,如下例所示:JsonObjectMapper
JsonObjectMapper
<int:json-to-object-transformer input-channel="objectMapperInput"
type="something.MyDomainObject" object-mapper="customObjectMapper"/>
从版本 3.0 开始,该属性引用新策略接口的实例:.
此抽象允许使用 JSON 映射器的多个实现。
提供了包装 Jackson 2 的实现,并在 Classpath 上检测到版本。
类分别为 。 |
您可能希望考虑使用 或 factory 方法来创建具有所需 trait 的 。
下面的示例展示了如何使用这样的工厂:FactoryBean
JsonObjectMapper
public class ObjectMapperFactory {
public static Jackson2JsonObjectMapper getMapper() {
ObjectMapper mapper = new ObjectMapper();
mapper.configure(JsonParser.Feature.ALLOW_COMMENTS, true);
return new Jackson2JsonObjectMapper(mapper);
}
}
以下示例演示如何在 XML 中执行相同的操作
<bean id="customObjectMapper" class="something.ObjectMapperFactory"
factory-method="getMapper"/>
从版本 2.2 开始,如果 input 消息还没有该 Headers,则默认情况下将 Headers 设置为 。 如果您希望将 Headers 设置为其他值或用某个值(包括 )显式覆盖任何现有 Headers,请使用 attribute 。
如果要禁止显示标头的设置,请将该属性设置为空字符串 ()。
这样做会导致消息没有 Headers,除非 Importing 消息中存在这样的 Header。 |
从版本 3.0 开始,向消息添加反映源类型的 Headers。
同样,在将 JSON 转换为对象时,可以使用这些类型的标头。
这些标头在 AMQP 适配器中映射,因此它们与 Spring-AMQP JsonMessageConverter
完全兼容。ObjectToJsonTransformer
JsonToObjectTransformer
这使以无需任何特殊配置即可工作:
-
…→amqp-outbound-adapter---→
-
---→amqp-inbound-adapter→json-to-object-transformer→…
其中出站适配器配置了 a,入站适配器使用默认的 .
JsonMessageConverter
SimpleMessageConverter
-
…→object-to-json-transformer→amqp-outbound-adapter---→
-
---→amqp-inbound-adapter→…
其中出站适配器配置了 a,入站适配器使用默认的 .
SimpleMessageConverter
JsonMessageConverter
-
…→object-to-json-transformer→amqp-outbound-adapter---→
-
---→amqp-inbound-adapter→json-to-object-transformer→
其中两个适配器都配置了 .
SimpleMessageConverter
使用标头确定类型时,不应提供属性,因为它优先于标头。class |
除了 JSON 转换器之外, Spring 集成还提供了一个内置的 SPEL 函数,用于表达式。
有关更多信息,请参阅 Spring 表达式语言 (SpEL)。#jsonPath
从版本 3.0 开始, Spring 集成还提供了一个内置的 SPEL 函数,用于表达式。
有关更多信息,请参阅 #xpath SPEL 函数。#xpath
从版本 4.0 开始,支持该属性来指定节点 JSON 表示形式。
结果节点树表示形式取决于提供的 .
默认情况下,使用 a 并将对象到节点树的转换委托给该方法。
节点 JSON 表示形式为下游消息流使用具有 JSON 数据属性访问权限的 SPEL 表达式提供了效率。
有关更多信息,请参见 Property Accessors 。ObjectToJsonTransformer
resultType
JsonObjectMapper
ObjectToJsonTransformer
Jackson2JsonObjectMapper
ObjectMapper#valueToTree
JsonPropertyAccessor
从版本 5.1 开始,可以配置为生成带有有效负载的消息,以便在使用使用此数据类型的下游处理程序时方便使用。resultType
BYTES
byte[]
从版本 5.2 开始,可以使用 配置为在使用目标 JSON 处理器进行反序列化期间支持泛型。
此外,此组件现在首先查询请求消息标头是否存在 or,否则回退到配置的类型。
现在,它还根据请求消息负载填充任何可能的下游场景的标头。JsonToObjectTransformer
ResolvableType
JsonHeaders.RESOLVABLE_TYPE
JsonHeaders.TYPE_ID
ObjectToJsonTransformer
JsonHeaders.RESOLVABLE_TYPE
从版本 5.2.6 开始,可以提供 a 来解析有效负载,以便在运行时针对请求消息从 JSON 转换。
默认情况下,它在请求消息中进行咨询。
如果此表达式返回或构建抛出 a ,则转换器将回退到提供的 .
此 logic 以表达式的形式存在,因为可能没有真正的类值,而是一些类型 ID,必须根据某些外部注册表映射到目标类。JsonToObjectTransformer
valueTypeExpression
ResolvableType
JsonHeaders
null
ResolvableType
ClassNotFoundException
targetType
JsonHeaders
Apache Avro 转换器
版本 5.2 添加了简单的转换器,用于与 Apache Avro 进行转换。
它们并不复杂,因为没有 schema 注册表;转换器只使用嵌入在从 Avro 架构生成的实现中的架构。SpecificRecord
发送到 的消息必须具有实现 ;transformer 可以处理多种类型。
必须使用一个类进行配置,该类用作反序列化的默认类型。
您还可以指定 SPEL 表达式来确定要使用该方法反序列化的类型。
默认的 SPEL 表达式是 () ,默认情况下,它由 填充源类的完全限定类名。
如果表达式返回 ,则使用 。SimpleToAvroTransformer
SpecificRecord
SimpleFromAvroTransformer
SpecificRecord
setTypeExpression
headers[avro_type]
AvroHeaders.TYPE
SimpleToAvroTransformer
null
defaultType
还有一个方法。
这允许生产者和使用者分离,发送者可以将标头设置为表示类型的某个令牌,然后使用者将该令牌映射到类型。SimpleToAvroTransformer
setTypeExpression
配置带有注释的转换器
您可以将注释添加到需要 type 或 message payload 类型的方法中。
返回值的处理方式与前面描述 <transformer>
元素的部分中描述的完全相同。
以下示例演示如何使用注释将 a 转换为 :@Transformer
Message
@Transformer
String
Order
@Transformer
Order generateOrder(String productId) {
return new Order(productId);
}
转换器方法也可以接受 和 注释,如 中所述。
以下示例演示如何使用注释:@Header
@Headers
Annotation Support
@Header
@Transformer
Order generateOrder(String productId, @Header("customerName") String customer) {
return new Order(productId, customer);
}
另请参阅使用注释通知终端节点。
标头筛选器
有时,您的转换使用案例可能就像删除几个标头一样简单。 对于这样的用例, Spring 集成提供了一个 Headers 过滤器,它允许你指定应该从输出消息中删除的某些 Headers 名称(例如,出于安全原因删除 Headers 或仅临时需要的值)。 基本上,Header 过滤器与 Header Enricher 相反。 后者在 Header Enricher 中讨论。 以下示例定义了一个标头过滤器:
<int:header-filter input-channel="inputChannel"
output-channel="outputChannel" header-names="lastName, state"/>
如您所见,报头过滤器的配置非常简单。
它是具有输入和输出通道以及属性的典型终端节点。
该属性接受需要删除的 Headers 的名称(如果有多个,则用逗号分隔)。
因此,在前面的示例中,出站消息中不存在名为 'lastName' 和 'state' 的标头。header-names
基于编解码器的转换器
请参阅 编解码器。
内容丰富器
有时,您可能需要使用比目标系统提供的信息更多的信息来增强请求。 数据扩充器模式描述了各种方案以及允许您满足此类需求的组件 (Enricher)。
Spring 集成模块包括两个丰富器:Core
它还包括三个特定于适配器的标头扩充器:
请参阅本参考手册中特定于适配器的部分,以了解有关这些适配器的更多信息。
有关表达式支持的更多信息,请参阅 Spring 表达式语言 (SpEL)。
标头扩充器
如果您只需要向消息添加 Headers,并且 Headers 不是由消息内容动态确定的,则引用转换器的自定义实现可能有点矫枉过正。
出于这个原因, Spring 集成提供了对 Headers Enricher 模式的支持。
它通过元素公开。
以下示例演示如何使用它:<header-enricher>
<int:header-enricher input-channel="in" output-channel="out">
<int:header name="foo" value="123"/>
<int:header name="bar" ref="someBean"/>
</int:header-enricher>
Header Enricher 还提供了有用的子元素来设置众所周知的 Headers 名称,如下例所示:
<int:header-enricher input-channel="in" output-channel="out">
<int:error-channel ref="applicationErrorChannel"/>
<int:reply-channel ref="quoteReplyChannel"/>
<int:correlation-id value="123"/>
<int:priority value="HIGHEST"/>
<routing-slip value="channel1; routingSlipRoutingStrategy; request.headers[myRoutingSlipChannel]"/>
<int:header name="bar" ref="someBean"/>
</int:header-enricher>
前面的配置表明,对于众所周知的 Headers(例如 , , , , 等),您可以使用方便的子元素直接设置这些值,而不是使用必须同时提供 Headers 'name' 和 'value' 的通用子元素。errorChannel
correlationId
priority
replyChannel
routing-slip
<header>
从版本 4.1 开始,Header Enricher 提供了一个 sub-element。
有关更多信息,请参阅路由单。routing-slip
POJO 支持
通常,标头值不能静态定义,必须根据消息中的某些内容动态确定。
这就是为什么 Header Enricher 还允许您使用 and 属性指定 bean 引用的原因。
指定的方法计算 header 值。
考虑以下配置和一个 bean,其方法修改了:ref
method
String
<int:header-enricher input-channel="in" output-channel="out">
<int:header name="something" method="computeValue" ref="myBean"/>
</int:header-enricher>
<bean id="myBean" class="thing1.thing2.MyBean"/>
public class MyBean {
public String computeValue(String payload){
return payload.toUpperCase() + "_US";
}
}
你还可以将 POJO 配置为内部 Bean,如下例所示:
<int:header-enricher input-channel="inputChannel" output-channel="outputChannel">
<int:header name="some_header">
<bean class="org.MyEnricher"/>
</int:header>
</int:header-enricher>
同样,您可以指向 Groovy 脚本,如下例所示:
<int:header-enricher input-channel="inputChannel" output-channel="outputChannel">
<int:header name="some_header">
<int-groovy:script location="org/SampleGroovyHeaderEnricher.groovy"/>
</int:header>
</int:header-enricher>
SPEL 支持
在 Spring Integration 2.0 中,我们引入了 Spring 表达式语言 (SpEL) 的便利性,以帮助配置许多不同的组件。 header enricher 就是其中之一。 再看一下前面显示的 POJO 示例。 你可以看到,确定 header 值的计算逻辑非常简单。 一个自然而然的问题是:“有没有更简单的方法可以做到这一点? 这就是 SpEL 展示其真正力量的地方。 请考虑以下示例:
<int:header-enricher input-channel="in" output-channel="out">
<int:header name="foo" expression="payload.toUpperCase() + '_US'"/>
</int:header-enricher>
通过将 SPEL 用于此类简单情况,您不再需要提供单独的类并在应用程序上下文中对其进行配置。
您需要做的就是使用有效的 SPEL 表达式配置属性。
'payload'和'headers'变量绑定到 SPEL 评估上下文,为您提供对传入消息的完全访问权限。expression
使用 Java 配置配置 Header Enricher
以下两个示例显示了如何将 Java 配置用于 Headers 扩充器:
@Bean
@Transformer(inputChannel = "enrichHeadersChannel", outputChannel = "emailChannel")
public HeaderEnricher enrichHeaders() {
Map<String, ? extends HeaderValueMessageProcessor<?>> headersToAdd =
Collections.singletonMap("emailUrl",
new StaticHeaderValueMessageProcessor<>(this.imapUrl));
HeaderEnricher enricher = new HeaderEnricher(headersToAdd);
return enricher;
}
@Bean
@Transformer(inputChannel="enrichHeadersChannel", outputChannel="emailChannel")
public HeaderEnricher enrichHeaders() {
Map<String, HeaderValueMessageProcessor<?>> headersToAdd = new HashMap<>();
headersToAdd.put("emailUrl", new StaticHeaderValueMessageProcessor<String>(this.imapUrl));
Expression expression = new SpelExpressionParser().parseExpression("payload.from[0].toString()");
headersToAdd.put("from",
new ExpressionEvaluatingHeaderValueMessageProcessor<>(expression, String.class));
HeaderEnricher enricher = new HeaderEnricher(headersToAdd);
return enricher;
}
第一个示例添加单个文本标头。 第二个示例添加了两个 Headers,一个 Literal Headers 和一个基于 SpEL 表达式。
使用 Java DSL 配置头文件丰富器
以下示例显示了标头扩充器的 Java DSL 配置:
@Bean
public IntegrationFlow enrichHeadersInFlow() {
return f -> f
...
.enrichHeaders(h -> h.header("emailUrl", this.emailUrl)
.headerExpression("from", "payload.from[0].toString()"))
.handle(...);
}
Header Channel Registry
从 Spring Integration 3.0 开始,可以使用一个新的子元素。
它没有属性。
这个新的子元素将 existing 和 headers(当它们是 a 时)转换为 a ,并将通道存储在注册表中,以便以后在发送回复或处理错误时解决。
这在标头可能丢失的情况下非常有用,例如,在将消息序列化到消息存储中或通过 JMS 传输消息时。
如果报头尚不存在或不是 ,则不会进行任何更改。<int:header-channels-to-string/>
replyChannel
errorChannel
MessageChannel
String
MessageChannel
使用此功能需要存在 Bean。
默认情况下,框架会创建一个具有默认过期时间(60 秒)的 。
在此时间之后,将从注册表中删除通道。
要更改此行为,请使用 of 定义一个 bean,并使用 constructor 参数(以毫秒为单位)配置所需的默认延迟。HeaderChannelRegistry
DefaultHeaderChannelRegistry
id
integrationHeaderChannelRegistry
从版本 4.1 开始,您可以在定义上设置一个名为 to 的属性,并且 mapping 条目在首次使用时会立即删除。
这在高容量环境中以及当 channel 只使用一次时可能很有用,而不是等待 reaper 将其删除。removeOnGet
true
<bean/>
具有确定注册表当前大小的方法。
该方法取消当前计划任务并立即运行 reaper。
然后,根据当前延迟计划任务再次运行。
可以通过获取对注册表的引用来直接调用这些方法,也可以将包含以下内容的消息发送到控制总线:HeaderChannelRegistry
size()
runReaper()
"@integrationHeaderChannelRegistry.runReaper()"
此 sub-element 是一种方便,相当于指定以下配置:
<int:reply-channel
expression="@integrationHeaderChannelRegistry.channelToChannelName(headers.replyChannel)"
overwrite="true" />
<int:error-channel
expression="@integrationHeaderChannelRegistry.channelToChannelName(headers.errorChannel)"
overwrite="true" />
从版本 4.1 开始,您现在可以覆盖 registry 配置的 reaper delay,以便 channel mapping 至少保留指定的时间,而不管 reaper 延迟如何。 以下示例显示了如何执行此操作:
<int:header-enricher input-channel="inputTtl" output-channel="next">
<int:header-channels-to-string time-to-live-expression="120000" />
</int:header-enricher>
<int:header-enricher input-channel="inputCustomTtl" output-channel="next">
<int:header-channels-to-string
time-to-live-expression="headers['channelTTL'] ?: 120000" />
</int:header-enricher>
在第一种情况下,每个标头通道映射的生存时间为 2 分钟。 在第二种情况下,生存时间在消息报头中指定,如果没有报头,则使用 Elvis 运算符使用两分钟。
Payload Enricher
在某些情况下,如前所述,标头扩充器可能不够,有效负载本身可能必须使用其他信息进行扩充。 例如,进入 Spring Integration 消息传递系统的订单消息必须根据提供的客户编号查找订单的客户,然后用该信息丰富原始有效负载。
Spring Integration 2.1 引入了有效负载丰富器。
有效负载扩充器定义一个终端节点,该终端节点将 a 传递给公开的请求通道,然后需要回复消息。
然后,回复消息成为用于评估表达式以丰富目标负载的根对象。Message
有效负载扩充器通过元素提供完整的 XML 命名空间支持。
为了发送请求消息,有效负载扩充器具有一个属性,该属性允许您将消息分派到请求通道。enricher
request-channel
基本上,通过定义请求通道,有效负载丰富器充当网关,等待发送到请求通道的消息返回。 然后,扩充器使用回复消息提供的数据来扩充消息的有效负载。
在向请求通道发送消息时,您还可以选择使用 属性仅发送原始负载的子集。request-payload-expression
有效负载的丰富是通过 SPEL 表达式配置的,从而提供了最大程度的灵活性。
因此,您不仅可以使用回复通道中的直接值来丰富有效负载,还可以使用 SPEL 表达式从该消息中提取子集或应用其他内联转换,从而进一步操作数据。Message
如果只需要使用静态值扩充有效负载,则无需提供该属性。request-channel
Enrichers 是 transformer 的一种变体。 在许多情况下,您可以使用有效负载扩充器或通用转换器实现将其他数据添加到消息有效负载中。 你应该熟悉 Spring Integration 提供的所有支持转换的组件,并仔细选择在语义上最适合你的业务案例的实现。 |
配置
以下示例显示了有效负载扩充器的所有可用配置选项:
<int:enricher request-channel="" (1)
auto-startup="true" (2)
id="" (3)
order="" (4)
output-channel="" (5)
request-payload-expression="" (6)
reply-channel="" (7)
error-channel="" (8)
send-timeout="" (9)
should-clone-payload="false"> (10)
<int:poller></int:poller> (11)
<int:property name="" expression="" null-result-expression="'Could not determine the name'"/> (12)
<int:property name="" value="23" type="java.lang.Integer" null-result-expression="'0'"/>
<int:header name="" expression="" null-result-expression=""/> (13)
<int:header name="" value="" overwrite="" type="" null-result-expression=""/>
</int:enricher>
1 | 将消息发送到的通道,以获取用于扩充的数据。 自选。 |
2 | 生命周期属性指示是否应在应用程序上下文启动期间启动此组件。 默认为 true。 自选。 |
3 | 基础 Bean 定义的 ID,即 an 或 a 。
自选。EventDrivenConsumer PollingConsumer |
4 | 指定此终端节点作为订阅者连接到通道时的调用顺序。 当该通道使用 “failover” 分派策略时,这一点尤其重要。 当此终端节点本身是具有队列的通道的轮询使用者时,它不起作用。 自选。 |
5 | 标识此终端节点处理消息后将消息发送到的消息通道。 自选。 |
6 | 默认情况下,原始消息的有效负载用作发送到 的有效负载。
通过将 SPEL 表达式指定为属性的值,您可以使用原始有效负载的子集、标头值或任何其他可解析的 SPEL 表达式作为发送到请求通道的有效负载的基础。
对于表达式计算,完整消息可用作 'root object'。
例如,可以使用以下 SpEL 表达式(以及其他表达式):、、、request-channel request-payload-expression payload.something headers.something new java.util.Date() 'thing1' + 'thing2' |
7 | 需要回复消息的频道。 这是可选的。 通常,自动生成的临时回复通道就足够了。 自选。 |
8 | 如果 发生在 的下游,则将 an 发送到的通道。
这使您能够返回用于扩充的替代对象。
如果未设置,则向调用方抛出 an。
自选。ErrorMessage Exception request-channel Exception |
9 | 如果通道可能阻塞,则向通道发送消息时要等待的最长时间(以毫秒为单位)。
例如,如果已达到其最大容量,则队列通道可以阻塞,直到有可用空间为止。
在内部,超时在 上设置,并最终在 上调用 send 操作时应用 。
默认情况下,根据实现情况,将无限期地阻止。
自选。send() MessagingTemplate MessageChannel send() timeout is set to '-1', which can cause the send operation on the `MessageChannel |
10 | 布尔值,指示在将消息发送到请求通道以获取丰富数据之前,是否应克隆任何实现的有效负载。
克隆的版本将用作最终回复的目标负载。
默认值为 .
自选。Cloneable false |
11 | 允许您配置消息轮询器(如果此端点是轮询使用者)。 自选。 |
12 | 每个子元素都提供属性的名称(通过 mandatory 属性)。
该属性应在目标负载实例上设置。
还必须提供 or 属性之一——前者用于设置文字值,后者用于要求值的 SPEL 表达式。
评估上下文的根对象是从此扩充器启动的流返回的消息 — 如果没有请求通道或应用程序上下文(使用 SPEL 语法),则为 input 消息。
从版本 4.0 开始,在指定属性时,您还可以指定 optional 属性。
当目标是类型化 setter 方法时,框架会适当地强制值(只要 存在 )以处理转换。
但是,如果目标有效负载是 a ,则条目中填充值而不进行转化。
例如,该属性允许您将包含数字的 a 转换为目标有效负载中的值。
从版本 4.1 开始,您还可以指定 optional 属性。
当返回 null 时,将对其进行评估,并返回评估的输出。property name value expression @<beanName>.<beanProperty> value type PropertyEditor Map type String Integer null-result-expression enricher |
13 | 每个子元素都提供消息头的名称(通过 mandatory 属性)。
还必须提供 or 属性之一——前者用于设置文字值,后者用于要求值的 SPEL 表达式。
评估上下文的根对象是从此丰富器启动的流返回的消息 — 如果没有请求通道或应用程序上下文(使用 '@<beanName>.<beanProperty>' SPEL 语法,则为输入消息)。
请注意,与 类似,元素的元素具有 和 属性。
但是,一个关键的区别是,默认情况下,使用 , 该属性与元素的子元素一致。
从版本 4.1 开始,您还可以指定 optional 属性。
当返回 null 时,将对其进行评估,并返回评估的输出。header name value expression <header-enricher> <enricher> header type overwrite <enricher> overwrite true <enricher> <property> null-result-expression enricher |
例子
本节包含在各种情况下使用有效负载扩充器的几个示例。
此处显示的代码示例是 Spring 集成示例项目的一部分。 参见 Spring 集成示例。 |
在以下示例中,对象作为 的有效负载传递:User
Message
<int:enricher id="findUserEnricher"
input-channel="findUserEnricherChannel"
request-channel="findUserServiceChannel">
<int:property name="email" expression="payload.email"/>
<int:property name="password" expression="payload.password"/>
</int:enricher>
this 具有多个属性,但最初仅设置 the 。
enricher 的属性配置为将 传递给 .User
username
request-channel
User
findUserServiceChannel
通过 implicitly set ,返回一个对象,并使用 sub-element 提取回复中的属性,并将其用于丰富原始有效载荷。reply-channel
User
property
如何仅将数据子集传递给请求通道?
使用属性时,可以将有效负载的单个属性而不是完整消息传递给请求通道。
在以下示例中,将 username 属性传递给请求通道:request-payload-expression
<int:enricher id="findUserByUsernameEnricher"
input-channel="findUserByUsernameEnricherChannel"
request-channel="findUserByUsernameServiceChannel"
request-payload-expression="payload.username">
<int:property name="email" expression="payload.email"/>
<int:property name="password" expression="payload.password"/>
</int:enricher>
请记住,虽然仅传递用户名,但发送到请求通道的结果消息包含完整的 .MessageHeaders
如何丰富包含集合数据的有效负载?
在下面的示例中,传入的不是 object,而是 a:User
Map
<int:enricher id="findUserWithMapEnricher"
input-channel="findUserWithMapEnricherChannel"
request-channel="findUserByUsernameServiceChannel"
request-payload-expression="payload.username">
<int:property name="user" expression="payload"/>
</int:enricher>
该 包含 map 键下的用户名。
只有 the 被传递到请求通道。
回复包含一个完整的对象,该对象最终被添加到 key 下。Map
username
username
User
Map
user
如何在不使用请求通道的情况下使用静态信息丰富有效负载?
以下示例根本不使用请求通道,而仅使用静态值丰富消息的有效负载:
<int:enricher id="userEnricher"
input-channel="input">
<int:property name="user.updateDate" expression="new java.util.Date()"/>
<int:property name="user.firstName" value="William"/>
<int:property name="user.lastName" value="Shakespeare"/>
<int:property name="user.age" value="42"/>
</int:enricher>
请注意,“static”一词在这里使用得很松散。 您仍然可以使用 SPEL 表达式来设置这些值。
索赔检查
在前面的部分中,我们介绍了几个内容扩充器组件,这些组件可以帮助您处理消息缺少一条数据的情况。 我们还讨论了内容筛选,它允许您从消息中删除数据项。 但是,有时我们想暂时隐藏数据。 例如,在分布式系统中,我们可能会收到一条 payload 非常大的消息。 一些间歇性的消息处理步骤可能不需要访问这个 payload,有些可能只需要访问某些 header,因此在每个处理步骤中携带大消息 payload 可能会导致性能下降,可能会产生安全风险,并可能使调试更加困难。
存储在库中(或声明检查)模式描述了一种机制,该机制允许您将数据存储在众所周知的位置,同时仅维护指向该数据所在位置的指针(声明检查)。 您可以将该指针作为新消息的有效负载传递,从而让消息流中的任何组件在需要时立即获取实际数据。 这种方法与认证邮件流程非常相似,在认证邮件流程中,您会在邮箱中获得索赔支票,然后必须前往邮局领取您的实际包裹。 这与航班后或酒店领取行李的想法相同。
Spring 集成提供了两种类型的索赔检查转换器:
-
Incoming Claim Check Transformer
-
传出索赔检查转换器
可以使用方便的基于命名空间的机制来配置它们。
Incoming Claim Check Transformer
传入声明检查转换器通过将传入消息存储在由其属性标识的消息存储中来转换该消息。
以下示例定义了传入的索赔检查转换器:message-store
<int:claim-check-in id="checkin"
input-channel="checkinChannel"
message-store="testMessageStore"
output-channel="output"/>
在前面的配置中,在 上收到的消息将保存到使用属性标识的消息存储中,并使用生成的 ID 编制索引。
该 ID 是该邮件的声明检查。
声明检查还成为发送到 的新(转换的)消息的有效负载。input-channel
message-store
output-channel
现在,假设您在某个时候确实需要访问实际消息。 您可以手动访问邮件存储并获取邮件的内容,也可以使用相同的方法(创建转换器),只不过现在您使用传出索赔检查转换器将索赔检查转换为实际邮件。
以下清单概述了 incoming claim check transformer 的所有可用参数:
<int:claim-check-in auto-startup="true" (1)
id="" (2)
input-channel="" (3)
message-store="messageStore" (4)
order="" (5)
output-channel="" (6)
send-timeout=""> (7)
<int:poller></int:poller> (8)
</int:claim-check-in>
1 | 生命周期属性指示是否应在应用程序上下文启动期间启动此组件。
它默认为 .
此属性在元素中不可用。
自选。true Chain |
2 | 标识基础 Bean 定义的 ID ()。
此属性在元素中不可用。
自选。MessageTransformingHandler Chain |
3 | 此终端节点的接收消息通道。
此属性在元素中不可用。
自选。Chain |
4 | 对此索赔检查转换器要使用的 的引用。
如果未指定,则默认引用名为 .
自选。MessageStore messageStore |
5 | 指定此终端节点作为订阅者连接到通道时的调用顺序。
当该通道使用 dispatching 策略时,这一点尤其重要。
当此终端节点本身是具有队列的通道的轮询使用者时,它不起作用。
此属性在元素中不可用。
自选。failover Chain |
6 | 标识消息在此终端节点处理后发送到的消息通道。
此属性在元素中不可用。
自选。Chain |
7 | 指定向输出通道发送回复消息时要等待的最长时间(以毫秒为单位)。
默认为 — 无限期阻塞。
此属性在元素中不可用。
自选。-1 Chain |
8 | 定义 poller。
此元素在元素中不可用。
自选。Chain |
传出索赔检查转换器
传出声明检查转换器允许您将具有声明检查负载的消息转换为以原始内容作为其负载的消息。
<int:claim-check-out id="checkout"
input-channel="checkoutChannel"
message-store="testMessageStore"
output-channel="output"/>
在前面的配置中,在 上收到的消息应将声明检查作为其有效负载。
传出声明检查转换器通过查询消息存储以查找由提供的声明检查标识的消息,将其转换为具有原始负载的消息。
然后,它将新签出的消息发送到 .input-channel
output-channel
下面的清单提供了传出索赔检查转换器的所有可用参数的概述:
<int:claim-check-out auto-startup="true" (1)
id="" (2)
input-channel="" (3)
message-store="messageStore" (4)
order="" (5)
output-channel="" (6)
remove-message="false" (7)
send-timeout=""> (8)
<int:poller></int:poller> (9)
</int:claim-check-out>
1 | 生命周期属性指示是否应在应用程序上下文启动期间启动此组件。
它默认为 .
此属性在元素中不可用。
自选。true Chain |
2 | 标识基础 Bean 定义的 ID ()。
此属性在元素中不可用。
自选。MessageTransformingHandler Chain |
3 | 此终端节点的接收消息通道。
此属性在元素中不可用。
自选。Chain |
4 | 对此索赔检查转换器要使用的 的引用。
如果未指定,则默认引用名为 .
自选。MessageStore messageStore |
5 | 指定此终端节点作为订阅者连接到通道时的调用顺序。
当该通道使用 dispatching 策略时,这一点尤其重要。
当此终端节点本身是具有队列的通道的轮询使用者时,它不起作用。
此属性在元素中不可用。
自选。failover Chain |
6 | 标识消息在此终端节点处理后发送到的消息通道。
此属性在元素中不可用。
自选。Chain |
7 | 如果设置为 ,则此转换器将从 中删除该消息。
当 Message 只能被 “认领” 一次时,此设置非常有用。
它默认为 .
自选。true MessageStore false |
8 | 指定向输出通道发送回复消息时要等待的最长时间(以毫秒为单位)。
它默认为 — 无限期阻止。
此属性在元素中不可用。
自选。-1 Chain |
9 | 定义 poller。
此元素在元素中不可用。
自选。Chain |
领取一次
有时,特定消息只能声明一次。
打个比方,考虑处理飞机行李的过程。
您在出发时托运行李,并在抵达时领取行李。
行李一旦被认领,如果不先重新办理托运,就不能再次领取行李。
为了适应这种情况,我们在 transformer 上引入了一个 boolean 属性。
此属性默认设置为 。
但是,如果设置为 ,则会从 中删除已认领的消息,以便无法再次认领该消息。remove-message
claim-check-out
false
true
MessageStore
此功能对存储空间有影响,尤其是在基于 内存中 的情况下,如果无法删除消息,最终可能会导致 .
因此,如果您不希望进行多次声明,我们建议您将属性的值设置为 .
以下示例演示如何使用该属性:Map
SimpleMessageStore
OutOfMemoryException
remove-message
true
remove-message
<int:claim-check-out id="checkout"
input-channel="checkoutChannel"
message-store="testMessageStore"
output-channel="output"
remove-message="true"/>
Codec
Spring Integration 的 4.2 版引入了抽象。
编解码器对对象进行编码和解码。
它们提供了 Java 序列化的替代方案。
一个优点是,对象通常不需要实现 .
我们提供了一个使用 Kryo 进行序列化的实现,但您可以提供自己的实现,以便在以下任何组件中使用:Codec
byte[]
Serializable
-
EncodingPayloadTransformer
-
DecodingTransformer
-
CodecMessageConverter
DecodingTransformer
此转换器使用 codec 解码 a。
它需要配置对象应解码到的 (或解析为 a 的表达式)。
如果生成的对象是 ,则不会保留入站标头。byte[]
Class
Class
Message<?>
有关更多信息,请参阅 Javadoc。
CodecMessageConverter
某些终端节点(如 TCP 和 Redis)没有消息标头的概念。
它们支持使用 ,并且 可用于将消息与 转换为 或从 进行传输。MessageConverter
CodecMessageConverter
byte[]
有关更多信息,请参阅 Javadoc。
克良
目前,这是 的唯一实现,它提供了两种 :Codec
Codec
-
PojoCodec
: 用于转换器 -
MessageCodec
:用于CodecMessageConverter
该框架提供了几个自定义序列化程序:
-
FileSerializer
-
MessageHeadersSerializer
-
MutableMessageHeadersSerializer
第一个可以通过使用 .
第二个和第三个与 一起使用,后者使用 .PojoCodec
FileKryoRegistrar
MessageCodec
MessageKryoRegistrar
自定义 Kryo
默认情况下,Kryo 将未知的 Java 类型委托给其 .
Kryo 还为每种原始类型注册默认序列化器,以及 、 和 。 使用 Reflection 导航对象图。
更有效的方法是实现一个自定义序列化器,该序列化器知道对象的结构,并且可以直接序列化选定的基元字段。
以下示例显示了这样的序列化程序:FieldSerializer
String
Collection
Map
FieldSerializer
public class AddressSerializer extends Serializer<Address> {
@Override
public void write(Kryo kryo, Output output, Address address) {
output.writeString(address.getStreet());
output.writeString(address.getCity());
output.writeString(address.getCountry());
}
@Override
public Address read(Kryo kryo, Input input, Class<Address> type) {
return new Address(input.readString(), input.readString(), input.readString());
}
}
该界面公开了 、 、 和 ,它们提供了对包含哪些字段和其他内部设置的完全控制,如 Kryo 文档中所述。Serializer
Kryo
Input
Output
注册自定义序列化程序时,您需要注册 ID。 注册 ID 是任意的。 但是,在我们的例子中,必须显式定义 ID,因为分布式应用程序中的每个 Kryo 实例都必须使用相同的 ID。 Kryo 建议使用小的正整数,并保留一些 id(值< 10)。 Spring 集成目前默认使用 40、41 和 42(对于前面提到的文件和消息头序列化器)。 我们建议您从 60 开始,以便在框架中进行扩展。 您可以通过配置前面提到的 registrar 来覆盖这些框架默认值。 |
使用自定义 Kryo 序列化器
如果您需要自定义序列化,请参阅 Kryo 文档,因为您需要使用本机 API 进行自定义。
有关示例,请参阅 MessageCodec
实现。
实现 KryoSerializable
如果您有权访问域对象源代码,则可以按照此处所述进行实施。
在这种情况下,该类本身提供序列化方法,无需进一步配置。
但是,基准测试表明,这不如显式注册自定义序列化程序有效。
以下示例显示了一个自定义 Kryo 序列化器:write
KryoSerializable
public class Address implements KryoSerializable {
...
@Override
public void write(Kryo kryo, Output output) {
output.writeString(this.street);
output.writeString(this.city);
output.writeString(this.country);
}
@Override
public void read(Kryo kryo, Input input) {
this.street = input.readString();
this.city = input.readString();
this.country = input.readString();
}
}
您还可以使用此技术包装 Kryo 以外的序列化库。
使用注释@DefaultSerializer
Kryo 还提供了一个注释,如此处所述。@DefaultSerializer
@DefaultSerializer(SomeClassSerializer.class)
public class SomeClass {
// ...
}
如果您有权访问 domain 对象,这可能是指定自定义序列化程序的更简单方法。
请注意,这不会使用 ID 注册类,这可能会使该技术在某些情况下没有帮助。write