这描述了如何在 Spring 中使用 JMS 接收消息。

同步接收

虽然 JMS 通常与异步处理相关联,但您可以 同步使用消息。重载方法提供了这一点 功能性。在同步接收期间,调用线程将阻塞,直到消息 变为可用。这可能是一个危险的操作,因为调用线程可以 可能会被无限期阻止。该属性指定多长时间 接收者应该在放弃等待消息之前等待。receive(..)receiveTimeout

异步接收:消息驱动的 POJO

Spring 还通过使用注解来支持注解侦听器端点,并提供以编程方式注册端点的开放基础设施。 到目前为止,这是设置异步接收器的最便捷方法。 有关详细信息,请参阅启用侦听器终结点注释@JmsListener

在 EJB 世界中,以类似于消息驱动 Bean (MDB) 的方式,消息驱动 POJO (MDP) 充当 JMS 消息的接收方。MDP 上的一个限制(但请参阅使用 MessageListenerAdapter)是它必须实现 接口。请注意,如果您的 POJO 收到消息 在多个线程上,请务必确保实现是线程安全的。jakarta.jms.MessageListener

下面的示例演示了 MDP 的简单实现:

import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageListener;
import jakarta.jms.TextMessage;

public class ExampleListener implements MessageListener {

	public void onMessage(Message message) {
		if (message instanceof TextMessage textMessage) {
			try {
				System.out.println(textMessage.getText());
			}
			catch (JMSException ex) {
				throw new RuntimeException(ex);
			}
		}
		else {
			throw new IllegalArgumentException("Message must be of type TextMessage");
		}
	}
}

一旦你实现了你的,就该创建一个消息侦听器了 容器。MessageListener

以下示例演示如何定义和配置其中一个消息侦听器 随 Spring 一起提供的容器(在本例中为 ):DefaultMessageListenerContainer

<!-- this is the Message Driven POJO (MDP) -->
<bean id="messageListener" class="jmsexample.ExampleListener"/>

<!-- and this is the message listener container -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
	<property name="connectionFactory" ref="connectionFactory"/>
	<property name="destination" ref="destination"/>
	<property name="messageListener" ref="messageListener"/>
</bean>

请参阅各种消息侦听器容器的 Spring javadoc(所有这些容器都实现了 MessageListenerContainer) 了解每个实现支持的功能的完整说明。

使用界面SessionAwareMessageListener

该接口是特定于 Spring 的接口,它提供 与 JMS 接口类似的协定,但也提供了消息处理 方法访问从中接收 的 JMS。 以下列表显示了接口的定义:SessionAwareMessageListenerMessageListenerSessionMessageSessionAwareMessageListener

package org.springframework.jms.listener;

public interface SessionAwareMessageListener {

	void onMessage(Message message, Session session) throws JMSException;
}

您可以选择让 MDP 实现此接口(优先于标准 JMS 接口),如果您希望 MDP 能够响应任何 接收的消息(通过使用方法中提供的)。Spring 附带的所有消息侦听器容器实现 支持实现 or 接口的 MDP。实现 的类附带一个警告,即它们随后被绑定到 Spring 通过界面。是否使用它的选择完全由您决定 作为应用程序开发人员或架构师。MessageListenerSessiononMessage(Message, Session)MessageListenerSessionAwareMessageListenerSessionAwareMessageListener

请注意,接口的方法抛出 .与标准 JMS 接口相比,使用该接口时,它是 客户端代码处理任何引发的异常的责任。onMessage(..)SessionAwareMessageListenerJMSExceptionMessageListenerSessionAwareMessageListener

MessageListenerAdapter

该类是 Spring 异步的最后一个组件 消息支持。简而言之,它允许您将几乎任何类公开为 MDP (尽管有一些限制)。MessageListenerAdapter

请考虑以下接口定义:

public interface MessageDelegate {

	void handleMessage(String message);

	void handleMessage(Map message);

	void handleMessage(byte[] message);

	void handleMessage(Serializable message);
}

请注意,尽管接口既不扩展接口,也不扩展接口,但仍可以使用类将其用作 MDP。还要注意各种消息处理方法 根据各种类型的内容进行强类型化 接收和处理。MessageListenerSessionAwareMessageListenerMessageListenerAdapterMessage

现在考虑接口的以下实现:MessageDelegate

public class DefaultMessageDelegate implements MessageDelegate {
	// implementation elided for clarity...
}

特别要注意接口的前面实现(类)根本没有 JMS 依赖关系。这确实是一个 我们可以通过以下配置将其制作成 MDP 的 POJO:MessageDelegateDefaultMessageDelegate

<!-- this is the Message Driven POJO (MDP) -->
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
	<constructor-arg>
		<bean class="jmsexample.DefaultMessageDelegate"/>
	</constructor-arg>
</bean>

<!-- and this is the message listener container... -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
	<property name="connectionFactory" ref="connectionFactory"/>
	<property name="destination" ref="destination"/>
	<property name="messageListener" ref="messageListener"/>
</bean>

下一个示例显示了另一个只能处理接收 JMS 消息的 MDP。请注意消息处理方法的实际调用方式(消息处理方法的名称默认为 ),但它是可配置的(如本节后面所示)。通知 以及如何将方法强类型化为仅接收和响应 JMS 消息。 以下列表显示了接口的定义:TextMessagereceiveMessageListenerAdapterhandleMessagereceive(..)TextMessageTextMessageDelegate

public interface TextMessageDelegate {

	void receive(TextMessage message);
}

下面的清单显示了实现该接口的类:TextMessageDelegate

public class DefaultTextMessageDelegate implements TextMessageDelegate {
	// implementation elided for clarity...
}

然后,服务员的配置如下:MessageListenerAdapter

<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
	<constructor-arg>
		<bean class="jmsexample.DefaultTextMessageDelegate"/>
	</constructor-arg>
	<property name="defaultListenerMethod" value="receive"/>
	<!-- we don't want automatic message context extraction -->
	<property name="messageConverter">
		<null/>
	</property>
</bean>

请注意,如果接收到某种类型的 JMS 除了 ,an 被抛出(随后 吞下)。该类的另一个功能是 如果处理程序方法返回 非 void 值。请考虑以下接口和类:messageListenerMessageTextMessageIllegalStateExceptionMessageListenerAdapterMessage

public interface ResponsiveTextMessageDelegate {

	// notice the return type...
	String receive(TextMessage message);
}
public class DefaultResponsiveTextMessageDelegate implements ResponsiveTextMessageDelegate {
	// implementation elided for clarity...
}

如果将 与 结合使用 ,则从执行 该方法(在默认配置中)转换为 .然后将结果发送到 (如果 一个存在)在原始或 默认设置在(如果已配置)。 如果未找到,则抛出 (请注意,此异常不会被吞并,而是向上传播 调用堆栈)。DefaultResponsiveTextMessageDelegateMessageListenerAdapter'receive(..)'TextMessageTextMessageDestinationReply-ToMessageDestinationMessageListenerAdapterDestinationInvalidDestinationException

处理事务中的消息

在事务中调用消息侦听器只需要重新配置 侦听器容器。

您可以通过标志激活本地资源事务 在侦听器容器定义上。然后,每个消息侦听器调用都会运行 在活动 JMS 事务中,在侦听器的情况下回滚消息接收 执行失败。发送响应消息(通过)是 属于同一本地事务的一部分,但任何其他资源操作(例如 数据库访问)独立运行。这通常需要重复的消息 侦听器实现中的检测,以涵盖数据库处理的情况 已提交,但消息处理未能提交。sessionTransactedSessionAwareMessageListener

请考虑以下 bean 定义:

<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
	<property name="connectionFactory" ref="connectionFactory"/>
	<property name="destination" ref="destination"/>
	<property name="messageListener" ref="messageListener"/>
	<property name="sessionTransacted" value="true"/>
</bean>

要参与外部管理事务,您需要配置 事务管理器,并使用支持外部托管的侦听器容器 事务(通常为 )。DefaultMessageListenerContainer

要配置消息侦听器容器以参与 XA 事务,您需要 配置一个(默认情况下,委托给 Jakarta EE 服务器的事务子系统)。请注意,底层 JMS 需要 具有 XA 功能,并在您的 JTA 事务协调器中正确注册。(检查你的 Jakarta EE 服务器的 JNDI 资源配置。这也允许消息接收 作为(例如)数据库访问是同一事务的一部分(具有统一提交 语义,以牺牲 XA 事务日志开销为代价)。JtaTransactionManagerConnectionFactory

以下 Bean 定义创建事务管理器:

<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/>

然后,我们需要将其添加到之前的容器配置中。容器 剩下的就交给你了。以下示例演示如何执行此操作:

<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
	<property name="connectionFactory" ref="connectionFactory"/>
	<property name="destination" ref="destination"/>
	<property name="messageListener" ref="messageListener"/>
	<property name="transactionManager" ref="transactionManager"/> (1)
</bean>
1 我们的交易经理。