此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring Integration 6.3.1Spring中文文档

此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring Integration 6.3.1Spring中文文档

分层器是一个简单的端点,它允许消息流延迟一定的时间间隔。 当邮件延迟时,原始发件人不会阻止。 相反,延迟消息将安排为在延迟过后发送到输出通道的实例。 即使对于相当长的延迟,这种方法也是可扩展的,因为它不会导致大量阻塞的发送方线程。 相反,在典型情况下,线程池用于实际执行释放消息。 本节包含配置分层器的几个示例。org.springframework.scheduling.TaskSchedulerSpring中文文档

配置 Delayer

该元素用于延迟两个消息通道之间的消息流。 与其他端点一样,您可以提供“input-channel”和“output-channel”属性,但延迟器还具有“default-delay”和“expression”属性(以及“expression”元素),用于确定每条消息应延迟的毫秒数。 以下示例将所有消息延迟三秒钟:<delayer>Spring中文文档

<int:delayer id="delayer" input-channel="input"
             default-delay="3000" output-channel="output"/>

如果需要确定每条消息的延迟,还可以使用“expression”属性提供 SpEL 表达式,如以下表达式所示:Spring中文文档

@Bean
public IntegrationFlow flow() {
    return IntegrationFlow.from("input")
            .delay(d -> d
                    .messageGroupId("delayer.messageGroupId")
                    .defaultDelay(3_000L)
                    .delayExpression("headers['delay']"))
            .channel("output")
            .get();
}
@Bean
fun flow() =
    integrationFlow("input") {
        delay {
            messageGroupId("delayer.messageGroupId")
            defaultDelay(3000L)
            delayExpression("headers['delay']")
        }
        channel("output")
    }
@ServiceActivator(inputChannel = "input")
@Bean
public DelayHandler delayer() {
    DelayHandler handler = new DelayHandler("delayer.messageGroupId");
    handler.setDefaultDelay(3_000L);
    handler.setDelayExpressionString("headers['delay']");
    handler.setOutputChannelName("output");
    return handler;
}
<int:delayer id="delayer" input-channel="input" output-channel="output"
             default-delay="3000" expression="headers['delay']"/>

在前面的示例中,仅当表达式的计算结果为给定入站消息为 null 时,三秒延迟才适用。 如果只想将延迟应用于具有有效表达式计算结果的消息,则可以使用“default-delay”(默认值)。 对于延迟为(或更短)的任何消息,该消息将立即在调用线程上发送。00Spring中文文档

XML 解析器使用的消息组 ID。<beanName>.messageGroupId
延迟处理程序支持表示以毫秒为单位的间隔的表达式计算结果(任何其方法生成可解析为 的值)以及表示绝对时间的实例。 在第一种情况下,毫秒是从当前时间开始计算的(例如,值为 将从延迟器接收消息的时间起延迟消息至少五秒钟)。 对于实例,消息在该对象表示的时间之前不会释放。 等于非正延迟或过去的日期的值不会导致延迟。 相反,它直接发送到原始发送方线程上的输出通道。 如果表达式计算结果不是 a 且不能解析为 ,则应用默认延迟(如果有 — 默认为 )。ObjecttoString()Longjava.util.Date5000DateDateDateLong0
表达式计算可能会出于各种原因(包括无效表达式或其他条件)引发评估异常。 默认情况下,此类异常将被忽略(尽管记录在 DEBUG 级别),并且延迟器会回退到默认延迟(如果有)。 您可以通过设置属性来修改此行为。 默认情况下,此属性设置为,并且 delayer 行为如前所述。 但是,如果您不希望忽略表达式计算异常并将其抛给分层程序的调用方,请将该属性设置为 。ignore-expression-failurestrueignore-expression-failuresfalse

在前面的示例中,delay 表达式指定为 。 这是访问元素( implements )的 SpEL 语法。 它调用:. 对于简单的地图元素名称(不包含 '.'),还可以使用 SpEL“dot accessor”语法,其中前面显示的标题表达式可以指定为 。 但是,如果缺少标头,则会获得不同的结果。 在第一种情况下,表达式的计算结果为 。 第二个结果类似于以下内容:headers['delay']IndexerMapMessageHeadersMapheaders.get("delay")headers.delaynullSpring中文文档

 org.springframework.expression.spel.SpelEvaluationException: EL1008E:(pos 8):
		   Field or property 'delay' cannot be found on object of type 'org.springframework.messaging.MessageHeaders'

因此,如果有可能省略标头,并且您想要回退到默认延迟,则使用索引器语法而不是点属性访问器语法通常更有效(并且建议这样做),因为检测 null 比捕获异常更快。Spring中文文档

delayer 委托给 Spring 抽象的实例。 delayer 使用的默认调度程序是 Spring Integration 在启动时提供的实例。 请参阅配置任务计划程序。 如果要委托给其他调度程序,可以通过 delayer 元素的“scheduler”属性提供引用,如以下示例所示:TaskSchedulerThreadPoolTaskSchedulerSpring中文文档

<int:delayer id="delayer" input-channel="input" output-channel="output"
    expression="headers.delay"
    scheduler="exampleTaskScheduler"/>

<task:scheduler id="exampleTaskScheduler" pool-size="3"/>
如果配置外部 ,则可以设置此属性。 它允许在应用程序关闭时成功完成已处于执行状态(释放消息)的“延迟”任务。 在 Spring Integration 2.2 之前,此属性在元素上可用,因为可以在后台创建自己的调度程序。 从 2.2 开始,该分层程序需要外部调度程序实例,因此已被删除。 您应该使用调度程序自己的配置。ThreadPoolTaskSchedulerwaitForTasksToCompleteOnShutdown = true<delayer>DelayHandlerwaitForTasksToCompleteOnShutdown
ThreadPoolTaskScheduler有一个属性,可以注入一些实现。 此处理程序允许从发送延迟消息的计划任务的线程中处理。 默认情况下,它使用 ,您可以在日志中看到堆栈跟踪。 您可能需要考虑使用 ,它将 发送到 ,无论是从失败消息的标头还是默认的 。 此错误处理在事务回滚(如果存在)后执行。 请参阅发布失败errorHandlerorg.springframework.util.ErrorHandlerExceptionorg.springframework.scheduling.support.TaskUtils$LoggingErrorHandlerorg.springframework.integration.channel.MessagePublishingErrorHandlerErrorMessageerror-channelerror-channel
XML 解析器使用的消息组 ID。<beanName>.messageGroupId
延迟处理程序支持表示以毫秒为单位的间隔的表达式计算结果(任何其方法生成可解析为 的值)以及表示绝对时间的实例。 在第一种情况下,毫秒是从当前时间开始计算的(例如,值为 将从延迟器接收消息的时间起延迟消息至少五秒钟)。 对于实例,消息在该对象表示的时间之前不会释放。 等于非正延迟或过去的日期的值不会导致延迟。 相反,它直接发送到原始发送方线程上的输出通道。 如果表达式计算结果不是 a 且不能解析为 ,则应用默认延迟(如果有 — 默认为 )。ObjecttoString()Longjava.util.Date5000DateDateDateLong0
表达式计算可能会出于各种原因(包括无效表达式或其他条件)引发评估异常。 默认情况下,此类异常将被忽略(尽管记录在 DEBUG 级别),并且延迟器会回退到默认延迟(如果有)。 您可以通过设置属性来修改此行为。 默认情况下,此属性设置为,并且 delayer 行为如前所述。 但是,如果您不希望忽略表达式计算异常并将其抛给分层程序的调用方,请将该属性设置为 。ignore-expression-failurestrueignore-expression-failuresfalse

在前面的示例中,delay 表达式指定为 。 这是访问元素( implements )的 SpEL 语法。 它调用:. 对于简单的地图元素名称(不包含 '.'),还可以使用 SpEL“dot accessor”语法,其中前面显示的标题表达式可以指定为 。 但是,如果缺少标头,则会获得不同的结果。 在第一种情况下,表达式的计算结果为 。 第二个结果类似于以下内容:headers['delay']IndexerMapMessageHeadersMapheaders.get("delay")headers.delaynullSpring中文文档

 org.springframework.expression.spel.SpelEvaluationException: EL1008E:(pos 8):
		   Field or property 'delay' cannot be found on object of type 'org.springframework.messaging.MessageHeaders'

因此,如果有可能省略标头,并且您想要回退到默认延迟,则使用索引器语法而不是点属性访问器语法通常更有效(并且建议这样做),因为检测 null 比捕获异常更快。Spring中文文档

如果配置外部 ,则可以设置此属性。 它允许在应用程序关闭时成功完成已处于执行状态(释放消息)的“延迟”任务。 在 Spring Integration 2.2 之前,此属性在元素上可用,因为可以在后台创建自己的调度程序。 从 2.2 开始,该分层程序需要外部调度程序实例,因此已被删除。 您应该使用调度程序自己的配置。ThreadPoolTaskSchedulerwaitForTasksToCompleteOnShutdown = true<delayer>DelayHandlerwaitForTasksToCompleteOnShutdown
ThreadPoolTaskScheduler有一个属性,可以注入一些实现。 此处理程序允许从发送延迟消息的计划任务的线程中处理。 默认情况下,它使用 ,您可以在日志中看到堆栈跟踪。 您可能需要考虑使用 ,它将 发送到 ,无论是从失败消息的标头还是默认的 。 此错误处理在事务回滚(如果存在)后执行。 请参阅发布失败errorHandlerorg.springframework.util.ErrorHandlerExceptionorg.springframework.scheduling.support.TaskUtils$LoggingErrorHandlerorg.springframework.integration.channel.MessagePublishingErrorHandlerErrorMessageerror-channelerror-channel

删除程序和消息存储

将延迟的消息持久化到提供的消息组中。 (“groupId”基于元素所需的“id”属性。 另请参见 。 在将消息发送到 之前,计划任务会从 中删除延迟的消息。 如果提供的是持久的(例如 ),则它提供了在应用程序关闭时不会丢失消息的功能。 应用程序启动后,从其消息组中读取消息,并根据消息的原始到达时间(如果延迟为数字)以延迟重新安排消息。 对于延迟标头为 的邮件,在重新调度时使用。 如果延迟消息的剩余时间超过其“延迟”,则在启动后立即发送。 是必需的,不能依赖于可以生成的 Bean 名称。 这样,在应用程序重新启动后,可能会获得一个新生成的 Bean 名称。 因此,延迟的消息可能会因重新计划而丢失,因为其组不再由应用程序管理。DelayHandlerMessageStore<delayer>DelayHandler.setMessageGroupId(String)MessageStoreDelayHandleroutput-channelMessageStoreJdbcMessageStoreDelayHandlerMessageStoreDateDateMessageStoremessageGroupIdDelayHandlerDelayHandlerSpring中文文档

可以使用两个相互排斥的元素之一来丰富:和 。 这些 AOP 建议适用于代理内部,该内部负责在延迟后在计划任务上发布消息。 例如,当下游消息流引发异常并且 的事务回滚时,可能会使用它。 在这种情况下,延迟的消息将保留在持久性 . 您可以在 中使用 . 该元素定义了一个简单的建议链,该链仅包含事务性建议。 以下示例显示了 within :<delayer><transactional><advice-chain>ListDelayHandler.ReleaseMessageHandlerThreadReleaseMessageHandlerMessageStoreorg.aopalliance.aop.Advice<advice-chain><transactional>advice-chain<delayer>Spring中文文档

<int:delayer id="delayer" input-channel="input" output-channel="output"
    expression="headers.delay"
    message-store="jdbcMessageStore">
    <int:advice-chain>
        <beans:ref bean="customAdviceBean"/>
        <tx:advice>
            <tx:attributes>
                <tx:method name="*" read-only="true"/>
            </tx:attributes>
        </tx:advice>
    </int:advice-chain>
</int:delayer>

可以导出为具有托管操作 ( 和 ) 的 JMX,它允许在运行时重新调度延迟的持久化消息 — 例如,如果 之前已停止。 这些操作可以通过命令调用,如以下示例所示:DelayHandlerMBeangetDelayedMessageCountreschedulePersistedMessagesTaskSchedulerControl BusSpring中文文档

Message<String> delayerReschedulingMessage =
    MessageBuilder.withPayload("@'delayer.handler'.reschedulePersistedMessages()").build();
controlBusChannel.send(delayerReschedulingMessage);
有关消息存储库、JMX 和控制总线的更多信息,请参阅系统管理

从版本 5.3.7 开始,如果在将消息存储到 时事务处于活动状态,则发布任务将在回调中调度。 这是防止争用条件所必需的,在这种情况下,计划的发布可能会在事务提交之前运行,并且找不到消息。 在这种情况下,消息将在延迟后或事务提交后释放,以较晚者为准。MessageStoreTransactionSynchronization.afterCommit()Spring中文文档

有关消息存储库、JMX 和控制总线的更多信息,请参阅系统管理

发布失败

从版本 5.0.8 开始,分层器上有两个新属性:Spring中文文档

发布消息时,如果下游流失败,则会在 . 如果达到 ,则丢弃该消息(除非发布是事务性的,在这种情况下,消息将保留在存储中,但不再计划发布,直到重新启动应用程序或调用该方法,如上所述)。retryDelaymaxAttemptsreschedulePersistedMessages()Spring中文文档

此外,您还可以配置一个 ;当发布失败时,将向该通道发送一个,但作为有效负载的例外,并具有该属性。 包含包含当前计数的标头。delayedMessageErrorChannelErrorMessageoriginalMessageErrorMessageIntegrationMessageHeaderAccessor.DELIVERY_ATTEMPTSpring中文文档

如果错误流使用错误消息并正常退出,则不会执行进一步操作;如果发布是事务性的,则事务将提交,并且从存储中删除消息。 如果错误流引发异常,则将重试发布,直至如上所述。maxAttemptsSpring中文文档