延迟器
延迟器是一个简单的端点,它允许消息流延迟一定的时间间隔。
当消息延迟时,原始发件人不会阻止。
相反,延迟消息会使用 实例 of 进行计划,以便在延迟过后发送到输出通道。
这种方法即使对于相当长的延迟也是可扩展的,因为它不会导致大量阻塞的发送方线程。
相反,在典型情况下,线程池用于实际执行释放消息。
本节包含配置延迟器的几个示例。org.springframework.scheduling.TaskScheduler
配置 Delayer
该元素用于延迟两个消息通道之间的消息流。
与其他终端节点一样,您可以提供 'input-channel' 和 'output-channel' 属性,但延迟器还具有 'default-delay' 和 'expression' 属性(以及 'expression' 元素),用于确定每条消息应延迟的毫秒数。
以下示例将所有消息延迟 3 秒:<delayer>
<int:delayer id="delayer" input-channel="input"
default-delay="3000" output-channel="output"/>
如果需要确定每条消息的延迟,还可以使用'expression'属性提供 SPEL 表达式,如以下表达式所示:
-
Java DSL
-
Kotlin DSL
-
Java
-
XML
@Bean
public IntegrationFlow flow() {
return IntegrationFlow.from("input")
.delay(d -> d
.messageGroupId("delayer.messageGroupId")
.defaultDelay(3_000L)
.delayExpression("headers['delay']"))
.channel("output")
.get();
}
@Bean
fun flow() =
integrationFlow("input") {
delay {
messageGroupId("delayer.messageGroupId")
defaultDelay(3000L)
delayExpression("headers['delay']")
}
channel("output")
}
@ServiceActivator(inputChannel = "input")
@Bean
public DelayHandler delayer() {
DelayHandler handler = new DelayHandler("delayer.messageGroupId");
handler.setDefaultDelay(3_000L);
handler.setDelayExpressionString("headers['delay']");
handler.setOutputChannelName("output");
return handler;
}
<int:delayer id="delayer" input-channel="input" output-channel="output"
default-delay="3000" expression="headers['delay']"/>
在前面的示例中,仅当表达式的计算结果为给定入站消息的 null 时,三秒延迟才适用。
如果您只想将延迟应用于具有表达式评估有效结果的消息,则可以使用 'default-delay' (默认值)。
对于延迟为 (或更小) 的任何消息,该消息将立即在调用线程上发送。0
0
XML 解析器使用消息组 ID。<beanName>.messageGroupId |
延迟处理程序支持表示以毫秒为单位的间隔的表达式计算结果(其方法生成可解析为 a 的值的任何结果)以及表示绝对时间的实例。
在第一种情况下,毫秒从当前时间开始计算(例如,值 of 会将消息从延迟器收到消息的时间开始延迟至少 5 秒)。
对于实例,消息在该对象表示的时间之前不会释放。
等于非正延迟或过去的 Date 的值不会导致延迟。
相反,它被直接发送到原始发送方线程上的 output 通道。
如果表达式评估结果不是 a 且无法解析为 a ,则应用默认延迟(如果有 — 默认值为 )。Object toString() Long java.util.Date 5000 Date Date Date Long 0 |
表达式计算可能会因各种原因(包括无效的表达式或其他条件)而引发计算异常。
默认情况下,此类异常将被忽略(尽管记录在 DEBUG 级别),并且延迟器会回退到默认延迟(如果有)。
您可以通过设置属性来修改此行为。
默认情况下,此属性设置为 ,并且延迟器行为如前所述。
但是,如果不希望忽略表达式计算异常并将其抛给延迟器的调用方,请将该属性设置为 .ignore-expression-failures true ignore-expression-failures false |
在前面的示例中,延迟表达式指定为 。
这是访问元素( implements )的 SPEL 语法。
它调用: .
对于简单的映射元素名称(不包含 '.'),您还可以使用 SPEL“点访问器”语法,其中前面显示的标头表达式可以指定为 .
但是,如果缺少标头,则会获得不同的结果。
在第一种情况下,表达式的计算结果为 。
第二个结果类似于以下内容:
因此,如果可能会省略标头,并且你想要回退到默认延迟,则使用索引器语法而不是点属性访问器语法通常更有效(并且建议使用),因为检测 null 比捕获异常更快。 |
延迟器委托给 Spring 抽象的实例。
延迟器使用的默认调度程序是 Spring 集成在启动时提供的实例。
请参阅配置 Task Scheduler。
如果要委托给不同的 scheduler,可以通过 delayer 元素的 'scheduler' 属性提供引用,如下例所示:TaskScheduler
ThreadPoolTaskScheduler
<int:delayer id="delayer" input-channel="input" output-channel="output"
expression="headers.delay"
scheduler="exampleTaskScheduler"/>
<task:scheduler id="exampleTaskScheduler" pool-size="3"/>
如果配置外部 ,则可以在此属性上设置。
它允许在应用程序关闭时成功完成已经处于执行状态(释放消息)的 'delay' 任务。
在 Spring Integration 2.2 之前,这个属性在元素上是可用的,因为可以在后台创建自己的调度程序。
从 2.2 开始,延迟器需要外部调度程序实例,并且已被删除。
您应该使用调度程序自己的配置。ThreadPoolTaskScheduler waitForTasksToCompleteOnShutdown = true <delayer> DelayHandler waitForTasksToCompleteOnShutdown |
ThreadPoolTaskScheduler 具有一个属性 ,该属性可以与 的某个实现一起注入。
此处理程序允许从发送延迟消息的计划任务的线程中处理 an 。
默认情况下,它使用 ,您可以在日志中看到堆栈跟踪。
您可能需要考虑使用 ,它将 发送到 ,无论是从失败消息的报头还是发送到默认的 。
此错误处理在事务回滚(如果存在)后执行。
请参阅 发布失败。errorHandler org.springframework.util.ErrorHandler Exception org.springframework.scheduling.support.TaskUtils$LoggingErrorHandler org.springframework.integration.channel.MessagePublishingErrorHandler ErrorMessage error-channel error-channel |
Delayer 和 Message Store
将延迟消息保存到提供的 .
('groupId' 基于元素所需的 'id' 属性。
另请参阅 。
在将延迟消息发送到 之前,计划任务会从 中删除该消息。
如果提供的是持久性的(例如 ),则它提供了在应用程序关闭时不丢失消息的能力。
应用程序启动后,将从 中的消息组读取消息,并根据消息的原始到达时间(如果延迟为数字)延迟重新安排这些消息。
对于延迟报头为 的邮件,在重新调度时使用。
如果延迟消息仍处于超过其 'delay' 状态,则会在启动后立即发送该消息。
这是必需的,不能依赖于可以生成的 bean 名称。
这样,在应用程序重新启动后, a 可能会获得新生成的 bean 名称。
因此,延迟的消息可能会因重新调度而丢失,因为它们的组不再由应用程序管理。DelayHandler
MessageStore
<delayer>
DelayHandler.setMessageGroupId(String)
MessageStore
DelayHandler
output-channel
MessageStore
JdbcMessageStore
DelayHandler
MessageStore
Date
Date
MessageStore
messageGroupId
DelayHandler
DelayHandler
可以使用两个互斥元素之一来丰富 : 和 。
这些 AOP 建议应用于代理的 internal ,它负责在延迟后在计划任务上发布消息。
例如,当下游消息流抛出异常并且 的事务回滚时,可能会使用它。
在这种情况下,延迟消息将保留在持久性 .
您可以在 .
该元素定义了一个简单的通知链,该链只有事务性建议。
以下示例显示了 中的 :<delayer>
<transactional>
<advice-chain>
List
DelayHandler.ReleaseMessageHandler
Thread
ReleaseMessageHandler
MessageStore
org.aopalliance.aop.Advice
<advice-chain>
<transactional>
advice-chain
<delayer>
<int:delayer id="delayer" input-channel="input" output-channel="output"
expression="headers.delay"
message-store="jdbcMessageStore">
<int:advice-chain>
<beans:ref bean="customAdviceBean"/>
<tx:advice>
<tx:attributes>
<tx:method name="*" read-only="true"/>
</tx:attributes>
</tx:advice>
</int:advice-chain>
</int:delayer>
这可以导出为具有托管操作 ( 和 )的 JMX,这允许在运行时重新安排延迟的持久消息 — 例如,如果 之前已停止。
这些操作可以通过命令调用,如下例所示:DelayHandler
MBean
getDelayedMessageCount
reschedulePersistedMessages
TaskScheduler
Control Bus
Message<String> delayerReschedulingMessage =
MessageBuilder.withPayload("'delayer.handler'.reschedulePersistedMessages").build();
controlBusChannel.send(delayerReschedulingMessage);
有关消息存储、JMX 和控制总线的更多信息,请参阅系统管理。 |
从版本 5.3.7 开始,如果在将消息存储到 a 时事务处于活动状态,则会在回调中安排发布任务。
这对于防止争用情况是必要的,在这种情况下,计划的发布可能会在事务提交之前运行,并且找不到消息。
在这种情况下,消息将在延迟后或事务提交后(以较晚者为准)发布。MessageStore
TransactionSynchronization.afterCommit()
发布失败
从版本 5.0.8 开始,延迟器上有两个新属性:
-
maxAttempts
(默认 5) -
retryDelay
(默认 1 秒)
发布消息时,如果下游流失败,将在 .
如果达到 ,则丢弃消息(除非发布是事务性的,在这种情况下,消息将保留在存储中,但不会再计划发布,直到重新启动应用程序或调用该方法,如上所述)。retryDelay
maxAttempts
reschedulePersistedMessages()
此外,您还可以配置 ;当发布失败时,将 an 发送到该通道,并将 exception 作为 payload 并具有 property 。
它包含一个包含当前计数的标头。delayedMessageErrorChannel
ErrorMessage
originalMessage
ErrorMessage
IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT
如果错误流使用错误消息并正常退出,则不会执行进一步的操作;如果发布是事务性的,则将提交事务并从存储中删除消息。
如果错误流引发异常,则将重试发布,直到如上所述。maxAttempts