此版本仍在开发中,尚未被视为稳定版本。最新的稳定版本请使用 Spring Framework 6.1.13spring-doc.cn

此版本仍在开发中,尚未被视为稳定版本。最新的稳定版本请使用 Spring Framework 6.1.13spring-doc.cn

这描述了如何在 Spring 中使用 JMS 接收消息。spring-doc.cn

同步接收

虽然 JMS 通常与异步处理相关联,但您可以 同步消费消息。重载方法提供了这个 功能性。在同步接收期间,调用线程会阻塞,直到出现一条消息 变为可用。这可能是一个危险的操作,因为调用线程可以 可能会无限期阻止。该属性指定多长时间 接收方应该在放弃等待消息之前等待。receive(..)receiveTimeoutspring-doc.cn

异步接收:消息驱动的 POJO

Spring 还通过使用 Comments 来支持 annotated-listener 端点,并提供了一个开放的基础设施来以编程方式注册端点。 到目前为止,这是设置 asynchronous receiver 最方便的方法。 有关更多详细信息,请参阅启用侦听器终端节点注释@JmsListener

与EJB世界中的消息驱动Bean (MDB) 类似,消息驱动的 POJO (MDP) 充当 JMS 消息的接收方。MDP 上的一个限制(但请参阅使用 MessageListenerAdapter)是它必须实现 界面。请注意,如果您的 POJO 收到消息 在多个线程上,确保您的实现是线程安全的非常重要。jakarta.jms.MessageListenerspring-doc.cn

以下示例显示了 MDP 的简单实现:spring-doc.cn

public class ExampleListener implements MessageListener {

	public void onMessage(Message message) {
		if (message instanceof TextMessage textMessage) {
			try {
				System.out.println(textMessage.getText());
			}
			catch (JMSException ex) {
				throw new RuntimeException(ex);
			}
		}
		else {
			throw new IllegalArgumentException("Message must be of type TextMessage");
		}
	}
}
class ExampleListener : MessageListener {

	override fun onMessage(message: Message) {
		if (message is TextMessage) {
			try {
				println(message.text)
			} catch (ex: JMSException) {
				throw RuntimeException(ex)
			}
		} else {
			throw IllegalArgumentException("Message must be of type TextMessage")
		}
	}
}

实现 后,就可以创建消息侦听器 容器。MessageListenerspring-doc.cn

以下示例说明如何定义和配置其中一个消息侦听器 随 Spring 一起提供的容器(在本例中为 ):DefaultMessageListenerContainerspring-doc.cn

@Bean
ExampleListener messageListener() {
	return new ExampleListener();
}

@Bean
DefaultMessageListenerContainer jmsContainer(ConnectionFactory connectionFactory, Destination destination,
		ExampleListener messageListener) {

	DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
	jmsContainer.setConnectionFactory(connectionFactory);
	jmsContainer.setDestination(destination);
	jmsContainer.setMessageListener(messageListener);
	return jmsContainer;
}
@Bean
fun messageListener() = ExampleListener()

@Bean
fun jmsContainer(connectionFactory: ConnectionFactory, destination: Destination, messageListener: ExampleListener) =
	DefaultMessageListenerContainer().apply {
		setConnectionFactory(connectionFactory)
		setDestination(destination)
		setMessageListener(messageListener)
	}
<!-- this is the Message Driven POJO (MDP) -->
<bean id="messageListener" class="jmsexample.ExampleListener"/>

<!-- and this is the message listener container -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
	<property name="connectionFactory" ref="connectionFactory"/>
	<property name="destination" ref="destination"/>
	<property name="messageListener" ref="messageListener"/>
</bean>

请参阅各种消息侦听器容器(所有这些容器都实现了MessageListenerContainer)的 Spring javadoc ,了解每种实施所支持的功能的完整描述。spring-doc.cn

Spring 还通过使用 Comments 来支持 annotated-listener 端点,并提供了一个开放的基础设施来以编程方式注册端点。 到目前为止,这是设置 asynchronous receiver 最方便的方法。 有关更多详细信息,请参阅启用侦听器终端节点注释@JmsListener

使用界面SessionAwareMessageListener

该接口是特定于 Spring 的接口,它提供 与 JMS 接口类似的契约,但也提供了消息处理 方法访问从中接收 的 JMS。 下面的清单显示了接口的定义:SessionAwareMessageListenerMessageListenerSessionMessageSessionAwareMessageListenerspring-doc.cn

package org.springframework.jms.listener;

public interface SessionAwareMessageListener {

	void onMessage(Message message, Session session) throws JMSException;
}

您可以选择让您的 MDP 实现此接口(优先于标准 JMS 接口),如果您希望 MDP 能够响应任何 received messages (通过使用方法中提供的 )。Spring 附带的所有消息侦听器容器实现 支持实现 OR 接口的 MDP。实现 the 的类需要注意的是,它们随后会绑定到 Spring 通过界面。是否使用它的选择完全取决于您 作为应用程序开发人员或架构师。MessageListenerSessiononMessage(Message, Session)MessageListenerSessionAwareMessageListenerSessionAwareMessageListenerspring-doc.cn

请注意,接口的方法会引发 .与标准 JMS 接口相比,使用该接口时,它是 客户端代码负责处理任何引发的异常。onMessage(..)SessionAwareMessageListenerJMSExceptionMessageListenerSessionAwareMessageListenerspring-doc.cn

MessageListenerAdapter

该类是 Spring 的 asynchronous 中的最后一个组件 消息支持。简而言之,它允许您将几乎任何类公开为 MDP (尽管有一些限制)。MessageListenerAdapterspring-doc.cn

请考虑以下接口定义:spring-doc.cn

public interface MessageDelegate {

	void handleMessage(String message);

	void handleMessage(Map message);

	void handleMessage(byte[] message);

	void handleMessage(Serializable message);
}
interface MessageDelegate {
	fun handleMessage(message: String)
	fun handleMessage(message: Map<*, *>)
	fun handleMessage(message: ByteArray)
	fun handleMessage(message: Serializable)
}

请注意,尽管该接口既不扩展 the 也不扩展 interface,但您仍然可以通过使用类将其用作 MDP。还要注意各种消息处理方法的 根据他们可以的各种类型的内容进行强类型 接收和处理。MessageListenerSessionAwareMessageListenerMessageListenerAdapterMessagespring-doc.cn

现在考虑接口的以下实现:MessageDelegatespring-doc.cn

public class DefaultMessageDelegate implements MessageDelegate {

	@Override
	public void handleMessage(String message) {
		// ...
	}

	@Override
	public void handleMessage(Map message) {
		// ...
	}

	@Override
	public void handleMessage(byte[] message) {
		// ...
	}

	@Override
	public void handleMessage(Serializable message) {
		// ...
	}
}
class DefaultMessageDelegate : MessageDelegate {

	override fun handleMessage(message: String) {
		// ...
	}

	override fun handleMessage(message: Map<*, *>) {
		// ...
	}

	override fun handleMessage(message: ByteArray) {
		// ...
	}

	override fun handleMessage(message: Serializable) {
		// ...
	}
}

特别要注意的是,前面的接口实现(类)根本没有 JMS 依赖项。它确实是一个 POJO 中,我们可以通过以下配置将其制作成 MDP:MessageDelegateDefaultMessageDelegatespring-doc.cn

@Bean
MessageListenerAdapter messageListener(DefaultMessageDelegate messageDelegate) {
	return new MessageListenerAdapter(messageDelegate);
}

@Bean
DefaultMessageListenerContainer jmsContainer(ConnectionFactory connectionFactory, Destination destination,
		ExampleListener messageListener) {

	DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
	jmsContainer.setConnectionFactory(connectionFactory);
	jmsContainer.setDestination(destination);
	jmsContainer.setMessageListener(messageListener);
	return jmsContainer;
}
@Bean
fun messageListener(messageDelegate: DefaultMessageDelegate): MessageListenerAdapter {
	return MessageListenerAdapter(messageDelegate)
}

@Bean
fun jmsContainer(connectionFactory: ConnectionFactory, destination: Destination, messageListener: ExampleListener) =
	DefaultMessageListenerContainer().apply {
		setConnectionFactory(connectionFactory)
		setDestination(destination)
		setMessageListener(messageListener)
	}
<!-- this is the Message Driven POJO (MDP) -->
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
	<constructor-arg>
		<bean class="jmsexample.DefaultMessageDelegate"/>
	</constructor-arg>
</bean>

<!-- and this is the message listener container... -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
	<property name="connectionFactory" ref="connectionFactory"/>
	<property name="destination" ref="destination"/>
	<property name="messageListener" ref="messageListener"/>
</bean>

下一个示例显示了另一个只能处理接收 JMS 消息的 MDP。请注意消息处理方法的实际调用方式(消息处理方法的名称默认为 ),但它是可配置的(如本节后面所示)。通知 以及如何对方法进行强类型化以仅接收和响应 JMS 消息。 下面的清单显示了接口的定义:TextMessagereceiveMessageListenerAdapterhandleMessagereceive(..)TextMessageTextMessageDelegatespring-doc.cn

public interface TextMessageDelegate {

	void receive(TextMessage message);
}
interface TextMessageDelegate {
	fun receive(message: TextMessage)
}

下面的清单显示了实现该接口的类:TextMessageDelegatespring-doc.cn

public class DefaultTextMessageDelegate implements TextMessageDelegate {

	@Override
	public void receive(TextMessage message) {
		// ...
	}
}
class DefaultTextMessageDelegate : TextMessageDelegate {

	override fun receive(message: TextMessage) {
		// ...
	}
}

然后,attendant 的配置将如下所示:MessageListenerAdapterspring-doc.cn

@Bean
MessageListenerAdapter messageListener(DefaultTextMessageDelegate messageDelegate) {
	MessageListenerAdapter messageListener = new MessageListenerAdapter(messageDelegate);
	messageListener.setDefaultListenerMethod("receive");
	// We don't want automatic message context extraction
	messageListener.setMessageConverter(null);
	return messageListener;
}
@Bean
fun messageListener(messageDelegate: DefaultTextMessageDelegate) = MessageListenerAdapter(messageDelegate).apply {
	setDefaultListenerMethod("receive")
	// We don't want automatic message context extraction
	setMessageConverter(null)
}
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
	<constructor-arg>
		<bean class="jmsexample.DefaultTextMessageDelegate"/>
	</constructor-arg>
	<property name="defaultListenerMethod" value="receive"/>
	<!-- we don't want automatic message context extraction -->
	<property name="messageConverter">
		<null/>
	</property>
</bean>

请注意,如果 接收到类型为 除了 之外,还会抛出 an (然后 吞下)。该类的另一个功能是 能够在处理程序方法返回 non-void 值。请考虑以下接口和类:messageListenerMessageTextMessageIllegalStateExceptionMessageListenerAdapterMessagespring-doc.cn

public interface ResponsiveTextMessageDelegate {

	// Notice the return type...
	String receive(TextMessage message);
}
interface ResponsiveTextMessageDelegate {

	// Notice the return type...
	fun receive(message: TextMessage): String
}
public class DefaultResponsiveTextMessageDelegate implements ResponsiveTextMessageDelegate {

	@Override
	public String receive(TextMessage message) {
		return "message";
	}
}
class DefaultResponsiveTextMessageDelegate : ResponsiveTextMessageDelegate {

	override fun receive(message: TextMessage): String {
		return "message"
	}
}

如果将 与 结合使用,则从执行 该方法(在默认配置中)转换为 .然后,结果将发送到 (如果 一个存在)定义在原始 API 的 JMS 属性中定义,或者 default set on (如果已配置)。 如果未找到,则抛出 (请注意,此异常不会被吞噬,而是会向上传播 call 堆栈)。DefaultResponsiveTextMessageDelegateMessageListenerAdapter'receive(..)'TextMessageTextMessageDestinationReply-ToMessageDestinationMessageListenerAdapterDestinationInvalidDestinationExceptionspring-doc.cn

处理事务中的消息

在事务中调用消息侦听器只需要重新配置 listener 容器。spring-doc.cn

您可以通过标志激活本地资源事务 在侦听器容器定义上。然后,每个消息侦听器调用都会运行 在活动的 JMS 事务中,如果侦听器,则回滚消息接收 执行失败。发送响应消息(通过 )是 部分,但任何其他资源操作(例如 数据库访问)独立运行。这通常需要重复的消息 detection 来覆盖数据库处理 已提交,但消息处理无法提交。sessionTransactedSessionAwareMessageListenerspring-doc.cn

考虑以下 bean 定义:spring-doc.cn

@Bean
DefaultMessageListenerContainer jmsContainer(ConnectionFactory connectionFactory, Destination destination,
		ExampleListener messageListener) {

	DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
	jmsContainer.setConnectionFactory(connectionFactory);
	jmsContainer.setDestination(destination);
	jmsContainer.setMessageListener(messageListener);
	jmsContainer.setSessionTransacted(true);
	return jmsContainer;
}
@Bean
fun jmsContainer(connectionFactory: ConnectionFactory, destination: Destination, messageListener: ExampleListener) =
	DefaultMessageListenerContainer().apply {
		setConnectionFactory(connectionFactory)
		setDestination(destination)
		setMessageListener(messageListener)
		isSessionTransacted = true
	}
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
	<property name="connectionFactory" ref="connectionFactory"/>
	<property name="destination" ref="destination"/>
	<property name="messageListener" ref="messageListener"/>
	<property name="sessionTransacted" value="true"/>
</bean>

要参与外部管理的事务,您需要配置 事务管理器,并使用支持外部管理的侦听器容器 transactions (通常为 )。DefaultMessageListenerContainerspring-doc.cn

要为 XA 事务参与配置消息侦听器容器,您需要 配置一个(默认情况下,委托给 Jakarta EE 服务器的事务子系统)。请注意,底层 JMS 需要 支持 XA 并已正确注册到 JTA 事务协调器。(检查您的 Jakarta EE 服务器的 JNDI 资源的配置。这也允许消息接收 因为(例如)数据库访问是同一事务的一部分(使用统一提交 语义,但代价是 XA 事务日志开销)。JtaTransactionManagerConnectionFactoryspring-doc.cn

以下 bean 定义创建一个事务管理器:spring-doc.cn

@Bean
JtaTransactionManager transactionManager()  {
	return new JtaTransactionManager();
}
@Bean
fun transactionManager() = JtaTransactionManager()
<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/>

然后我们需要将其添加到我们之前的容器配置中。容器 负责其余的事情。以下示例显示了如何执行此操作:spring-doc.cn

@Bean
DefaultMessageListenerContainer jmsContainer(ConnectionFactory connectionFactory, Destination destination,
		ExampleListener messageListener) {

	DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
	jmsContainer.setConnectionFactory(connectionFactory);
	jmsContainer.setDestination(destination);
	jmsContainer.setMessageListener(messageListener);
	jmsContainer.setSessionTransacted(true);
	return jmsContainer;
}
@Bean
fun jmsContainer(connectionFactory: ConnectionFactory, destination: Destination, messageListener: ExampleListener,
				 transactionManager: JtaTransactionManager) =
	DefaultMessageListenerContainer().apply {
		setConnectionFactory(connectionFactory)
		setDestination(destination)
		setMessageListener(messageListener)
		setTransactionManager(transactionManager)
	}
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
	<property name="connectionFactory" ref="connectionFactory"/>
	<property name="destination" ref="destination"/>
	<property name="messageListener" ref="messageListener"/>
	<property name="transactionManager" ref="transactionManager"/>
</bean>