此版本仍在开发中,尚未被视为稳定版本。最新的稳定版本请使用 Spring Framework 6.1.13! |
此版本仍在开发中,尚未被视为稳定版本。最新的稳定版本请使用 Spring Framework 6.1.13! |
这描述了如何在 Spring 中使用 JMS 接收消息。
同步接收
虽然 JMS 通常与异步处理相关联,但您可以
同步消费消息。重载方法提供了这个
功能性。在同步接收期间,调用线程会阻塞,直到出现一条消息
变为可用。这可能是一个危险的操作,因为调用线程可以
可能会无限期阻止。该属性指定多长时间
接收方应该在放弃等待消息之前等待。receive(..)
receiveTimeout
异步接收:消息驱动的 POJO
Spring 还通过使用 Comments 来支持 annotated-listener 端点,并提供了一个开放的基础设施来以编程方式注册端点。
到目前为止,这是设置 asynchronous receiver 最方便的方法。
有关更多详细信息,请参阅启用侦听器终端节点注释。@JmsListener |
与EJB世界中的消息驱动Bean (MDB) 类似,消息驱动的
POJO (MDP) 充当 JMS 消息的接收方。MDP 上的一个限制(但请参阅使用 MessageListenerAdapter
)是它必须实现
界面。请注意,如果您的 POJO 收到消息
在多个线程上,确保您的实现是线程安全的非常重要。jakarta.jms.MessageListener
以下示例显示了 MDP 的简单实现:
-
Java
-
Kotlin
public class ExampleListener implements MessageListener {
public void onMessage(Message message) {
if (message instanceof TextMessage textMessage) {
try {
System.out.println(textMessage.getText());
}
catch (JMSException ex) {
throw new RuntimeException(ex);
}
}
else {
throw new IllegalArgumentException("Message must be of type TextMessage");
}
}
}
class ExampleListener : MessageListener {
override fun onMessage(message: Message) {
if (message is TextMessage) {
try {
println(message.text)
} catch (ex: JMSException) {
throw RuntimeException(ex)
}
} else {
throw IllegalArgumentException("Message must be of type TextMessage")
}
}
}
实现 后,就可以创建消息侦听器
容器。MessageListener
以下示例说明如何定义和配置其中一个消息侦听器
随 Spring 一起提供的容器(在本例中为 ):DefaultMessageListenerContainer
-
Java
-
Kotlin
-
Xml
@Bean
ExampleListener messageListener() {
return new ExampleListener();
}
@Bean
DefaultMessageListenerContainer jmsContainer(ConnectionFactory connectionFactory, Destination destination,
ExampleListener messageListener) {
DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
jmsContainer.setConnectionFactory(connectionFactory);
jmsContainer.setDestination(destination);
jmsContainer.setMessageListener(messageListener);
return jmsContainer;
}
@Bean
fun messageListener() = ExampleListener()
@Bean
fun jmsContainer(connectionFactory: ConnectionFactory, destination: Destination, messageListener: ExampleListener) =
DefaultMessageListenerContainer().apply {
setConnectionFactory(connectionFactory)
setDestination(destination)
setMessageListener(messageListener)
}
<!-- this is the Message Driven POJO (MDP) -->
<bean id="messageListener" class="jmsexample.ExampleListener"/>
<!-- and this is the message listener container -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
</bean>
请参阅各种消息侦听器容器(所有这些容器都实现了MessageListenerContainer)的 Spring javadoc ,了解每种实施所支持的功能的完整描述。
Spring 还通过使用 Comments 来支持 annotated-listener 端点,并提供了一个开放的基础设施来以编程方式注册端点。
到目前为止,这是设置 asynchronous receiver 最方便的方法。
有关更多详细信息,请参阅启用侦听器终端节点注释。@JmsListener |
使用界面SessionAwareMessageListener
该接口是特定于 Spring 的接口,它提供
与 JMS 接口类似的契约,但也提供了消息处理
方法访问从中接收 的 JMS。
下面的清单显示了接口的定义:SessionAwareMessageListener
MessageListener
Session
Message
SessionAwareMessageListener
package org.springframework.jms.listener;
public interface SessionAwareMessageListener {
void onMessage(Message message, Session session) throws JMSException;
}
您可以选择让您的 MDP 实现此接口(优先于标准
JMS 接口),如果您希望 MDP 能够响应任何
received messages (通过使用方法中提供的 )。Spring 附带的所有消息侦听器容器实现
支持实现 OR 接口的 MDP。实现 the 的类需要注意的是,它们随后会绑定到 Spring
通过界面。是否使用它的选择完全取决于您
作为应用程序开发人员或架构师。MessageListener
Session
onMessage(Message, Session)
MessageListener
SessionAwareMessageListener
SessionAwareMessageListener
请注意,接口的方法会引发 .与标准 JMS 接口相比,使用该接口时,它是
客户端代码负责处理任何引发的异常。onMessage(..)
SessionAwareMessageListener
JMSException
MessageListener
SessionAwareMessageListener
用MessageListenerAdapter
该类是 Spring 的 asynchronous 中的最后一个组件
消息支持。简而言之,它允许您将几乎任何类公开为 MDP
(尽管有一些限制)。MessageListenerAdapter
请考虑以下接口定义:
-
Java
-
Kotlin
public interface MessageDelegate {
void handleMessage(String message);
void handleMessage(Map message);
void handleMessage(byte[] message);
void handleMessage(Serializable message);
}
interface MessageDelegate {
fun handleMessage(message: String)
fun handleMessage(message: Map<*, *>)
fun handleMessage(message: ByteArray)
fun handleMessage(message: Serializable)
}
请注意,尽管该接口既不扩展 the 也不扩展 interface,但您仍然可以通过使用类将其用作 MDP。还要注意各种消息处理方法的
根据他们可以的各种类型的内容进行强类型
接收和处理。MessageListener
SessionAwareMessageListener
MessageListenerAdapter
Message
现在考虑接口的以下实现:MessageDelegate
-
Java
-
Kotlin
public class DefaultMessageDelegate implements MessageDelegate {
@Override
public void handleMessage(String message) {
// ...
}
@Override
public void handleMessage(Map message) {
// ...
}
@Override
public void handleMessage(byte[] message) {
// ...
}
@Override
public void handleMessage(Serializable message) {
// ...
}
}
class DefaultMessageDelegate : MessageDelegate {
override fun handleMessage(message: String) {
// ...
}
override fun handleMessage(message: Map<*, *>) {
// ...
}
override fun handleMessage(message: ByteArray) {
// ...
}
override fun handleMessage(message: Serializable) {
// ...
}
}
特别要注意的是,前面的接口实现(类)根本没有 JMS 依赖项。它确实是一个
POJO 中,我们可以通过以下配置将其制作成 MDP:MessageDelegate
DefaultMessageDelegate
-
Java
-
Kotlin
-
Xml
@Bean
MessageListenerAdapter messageListener(DefaultMessageDelegate messageDelegate) {
return new MessageListenerAdapter(messageDelegate);
}
@Bean
DefaultMessageListenerContainer jmsContainer(ConnectionFactory connectionFactory, Destination destination,
ExampleListener messageListener) {
DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
jmsContainer.setConnectionFactory(connectionFactory);
jmsContainer.setDestination(destination);
jmsContainer.setMessageListener(messageListener);
return jmsContainer;
}
@Bean
fun messageListener(messageDelegate: DefaultMessageDelegate): MessageListenerAdapter {
return MessageListenerAdapter(messageDelegate)
}
@Bean
fun jmsContainer(connectionFactory: ConnectionFactory, destination: Destination, messageListener: ExampleListener) =
DefaultMessageListenerContainer().apply {
setConnectionFactory(connectionFactory)
setDestination(destination)
setMessageListener(messageListener)
}
<!-- this is the Message Driven POJO (MDP) -->
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg>
<bean class="jmsexample.DefaultMessageDelegate"/>
</constructor-arg>
</bean>
<!-- and this is the message listener container... -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
</bean>
下一个示例显示了另一个只能处理接收 JMS 消息的 MDP。请注意消息处理方法的实际调用方式(消息处理方法的名称默认为 ),但它是可配置的(如本节后面所示)。通知
以及如何对方法进行强类型化以仅接收和响应 JMS 消息。
下面的清单显示了接口的定义:TextMessage
receive
MessageListenerAdapter
handleMessage
receive(..)
TextMessage
TextMessageDelegate
-
Java
-
Kotlin
public interface TextMessageDelegate {
void receive(TextMessage message);
}
interface TextMessageDelegate {
fun receive(message: TextMessage)
}
下面的清单显示了实现该接口的类:TextMessageDelegate
-
Java
-
Kotlin
public class DefaultTextMessageDelegate implements TextMessageDelegate {
@Override
public void receive(TextMessage message) {
// ...
}
}
class DefaultTextMessageDelegate : TextMessageDelegate {
override fun receive(message: TextMessage) {
// ...
}
}
然后,attendant 的配置将如下所示:MessageListenerAdapter
-
Java
-
Kotlin
-
Xml
@Bean
MessageListenerAdapter messageListener(DefaultTextMessageDelegate messageDelegate) {
MessageListenerAdapter messageListener = new MessageListenerAdapter(messageDelegate);
messageListener.setDefaultListenerMethod("receive");
// We don't want automatic message context extraction
messageListener.setMessageConverter(null);
return messageListener;
}
@Bean
fun messageListener(messageDelegate: DefaultTextMessageDelegate) = MessageListenerAdapter(messageDelegate).apply {
setDefaultListenerMethod("receive")
// We don't want automatic message context extraction
setMessageConverter(null)
}
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg>
<bean class="jmsexample.DefaultTextMessageDelegate"/>
</constructor-arg>
<property name="defaultListenerMethod" value="receive"/>
<!-- we don't want automatic message context extraction -->
<property name="messageConverter">
<null/>
</property>
</bean>
请注意,如果 接收到类型为
除了 之外,还会抛出 an (然后
吞下)。该类的另一个功能是
能够在处理程序方法返回
non-void 值。请考虑以下接口和类:messageListener
Message
TextMessage
IllegalStateException
MessageListenerAdapter
Message
-
Java
-
Kotlin
public interface ResponsiveTextMessageDelegate {
// Notice the return type...
String receive(TextMessage message);
}
interface ResponsiveTextMessageDelegate {
// Notice the return type...
fun receive(message: TextMessage): String
}
-
Java
-
Kotlin
public class DefaultResponsiveTextMessageDelegate implements ResponsiveTextMessageDelegate {
@Override
public String receive(TextMessage message) {
return "message";
}
}
class DefaultResponsiveTextMessageDelegate : ResponsiveTextMessageDelegate {
override fun receive(message: TextMessage): String {
return "message"
}
}
如果将 与 结合使用,则从执行
该方法(在默认配置中)转换为 .然后,结果将发送到 (如果
一个存在)定义在原始 API 的 JMS 属性中定义,或者
default set on (如果已配置)。
如果未找到,则抛出
(请注意,此异常不会被吞噬,而是会向上传播
call 堆栈)。DefaultResponsiveTextMessageDelegate
MessageListenerAdapter
'receive(..)'
TextMessage
TextMessage
Destination
Reply-To
Message
Destination
MessageListenerAdapter
Destination
InvalidDestinationException
处理事务中的消息
在事务中调用消息侦听器只需要重新配置 listener 容器。
您可以通过标志激活本地资源事务
在侦听器容器定义上。然后,每个消息侦听器调用都会运行
在活动的 JMS 事务中,如果侦听器,则回滚消息接收
执行失败。发送响应消息(通过 )是
部分,但任何其他资源操作(例如
数据库访问)独立运行。这通常需要重复的消息
detection 来覆盖数据库处理
已提交,但消息处理无法提交。sessionTransacted
SessionAwareMessageListener
考虑以下 bean 定义:
-
Java
-
Kotlin
-
Xml
@Bean
DefaultMessageListenerContainer jmsContainer(ConnectionFactory connectionFactory, Destination destination,
ExampleListener messageListener) {
DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
jmsContainer.setConnectionFactory(connectionFactory);
jmsContainer.setDestination(destination);
jmsContainer.setMessageListener(messageListener);
jmsContainer.setSessionTransacted(true);
return jmsContainer;
}
@Bean
fun jmsContainer(connectionFactory: ConnectionFactory, destination: Destination, messageListener: ExampleListener) =
DefaultMessageListenerContainer().apply {
setConnectionFactory(connectionFactory)
setDestination(destination)
setMessageListener(messageListener)
isSessionTransacted = true
}
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
<property name="sessionTransacted" value="true"/>
</bean>
要参与外部管理的事务,您需要配置
事务管理器,并使用支持外部管理的侦听器容器
transactions (通常为 )。DefaultMessageListenerContainer
要为 XA 事务参与配置消息侦听器容器,您需要
配置一个(默认情况下,委托给 Jakarta EE
服务器的事务子系统)。请注意,底层 JMS 需要
支持 XA 并已正确注册到 JTA 事务协调器。(检查您的
Jakarta EE 服务器的 JNDI 资源的配置。这也允许消息接收
因为(例如)数据库访问是同一事务的一部分(使用统一提交
语义,但代价是 XA 事务日志开销)。JtaTransactionManager
ConnectionFactory
以下 bean 定义创建一个事务管理器:
-
Java
-
Kotlin
-
Xml
@Bean
JtaTransactionManager transactionManager() {
return new JtaTransactionManager();
}
@Bean
fun transactionManager() = JtaTransactionManager()
<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/>
然后我们需要将其添加到我们之前的容器配置中。容器 负责其余的事情。以下示例显示了如何执行此操作:
-
Java
-
Kotlin
-
Xml
@Bean
DefaultMessageListenerContainer jmsContainer(ConnectionFactory connectionFactory, Destination destination,
ExampleListener messageListener) {
DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
jmsContainer.setConnectionFactory(connectionFactory);
jmsContainer.setDestination(destination);
jmsContainer.setMessageListener(messageListener);
jmsContainer.setSessionTransacted(true);
return jmsContainer;
}
@Bean
fun jmsContainer(connectionFactory: ConnectionFactory, destination: Destination, messageListener: ExampleListener,
transactionManager: JtaTransactionManager) =
DefaultMessageListenerContainer().apply {
setConnectionFactory(connectionFactory)
setDestination(destination)
setMessageListener(messageListener)
setTransactionManager(transactionManager)
}
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
<property name="transactionManager" ref="transactionManager"/>
</bean>