此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring Integration 6.3.4! |
此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring Integration 6.3.4! |
从版本 5.0 开始,由于实现,引入了一个新的功能,使整个下游流具有事务性。
当在元素中使用 regular 时(例如,通过 配置 ),启动的事务仅应用于内部,而不会传播到下游流。TransactionHandleMessageAdvice
HandleMessageAdvice
TransactionInterceptor
<request-handler-advice-chain>
<tx:advice>
AbstractReplyProducingMessageHandler.handleRequestMessage()
为了简化 XML 配置,除了 之外,还向 all and and 相关组件添加了一个元素。
以下示例显示了使用情况:<request-handler-advice-chain>
<transactional>
<outbound-gateway>
<service-activator>
<transactional>
<int-jdbc:outbound-gateway query="select * from things where id=:headers[id]">
<int-jdbc:transactional/>
</int-jdbc:outbound-gateway>
<bean id="transactionManager" class="org.mockito.Mockito" factory-method="mock">
<constructor-arg value="org.springframework.transaction.TransactionManager"/>
</bean>
可以使用 ,可以使用 Java 配置,并且可以在 messaging annotations 属性中使用结果 Bean 名称,如下例所示:TransactionInterceptorBuilder
adviceChain
@Bean
public ConcurrentMetadataStore store() {
return new SimpleMetadataStore(hazelcastInstance()
.getMap("idempotentReceiverMetadataStore"));
}
@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
return new IdempotentReceiverInterceptor(
new MetadataStoreSelector(
message -> message.getPayload().toString(),
message -> message.getPayload().toString().toUpperCase(), store()));
}
@Bean
public TransactionInterceptor transactionInterceptor() {
return new TransactionInterceptorBuilder(true)
.transactionManager(this.transactionManager)
.isolation(Isolation.READ_COMMITTED)
.propagation(Propagation.REQUIRES_NEW)
.build();
}
@Bean
@org.springframework.integration.annotation.Transformer(inputChannel = "input",
outputChannel = "output",
adviceChain = { "idempotentReceiverInterceptor",
"transactionInterceptor" })
public Transformer transformer() {
return message -> message;
}
请注意构造函数上的参数。
它会导致创建 ,而不是常规的 。true
TransactionInterceptorBuilder
TransactionHandleMessageAdvice
TransactionInterceptor
Java DSL 支持端点配置上的直通选项,如下例所示:Advice
.transactional()
@Bean
public IntegrationFlow updatingGatewayFlow() {
return f -> f
.handle(Jpa.updatingGateway(this.entityManagerFactory),
e -> e.transactional(true))
.channel(c -> c.queue("persistResults"));
}