这描述了如何在 Spring 中使用 JMS 接收消息。
同步接收
虽然 JMS 通常与异步处理相关联,但您可以
同步使用消息。重载方法提供了这一点
功能性。在同步接收期间,调用线程将阻塞,直到消息
变为可用。这可能是一个危险的操作,因为调用线程可以
可能会被无限期阻止。该属性指定多长时间
接收者应该在放弃等待消息之前等待。receive(..)
receiveTimeout
异步接收:消息驱动的 POJO
Spring 还通过使用注解来支持注解侦听器端点,并提供以编程方式注册端点的开放基础设施。
到目前为止,这是设置异步接收器的最便捷方法。
有关详细信息,请参阅启用侦听器终结点注释。@JmsListener |
在 EJB 世界中,以类似于消息驱动 Bean (MDB) 的方式,消息驱动
POJO (MDP) 充当 JMS 消息的接收方。MDP 上的一个限制(但请参阅使用 MessageListenerAdapter
)是它必须实现
接口。请注意,如果您的 POJO 收到消息
在多个线程上,请务必确保实现是线程安全的。jakarta.jms.MessageListener
下面的示例演示了 MDP 的简单实现:
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageListener;
import jakarta.jms.TextMessage;
public class ExampleListener implements MessageListener {
public void onMessage(Message message) {
if (message instanceof TextMessage textMessage) {
try {
System.out.println(textMessage.getText());
}
catch (JMSException ex) {
throw new RuntimeException(ex);
}
}
else {
throw new IllegalArgumentException("Message must be of type TextMessage");
}
}
}
一旦你实现了你的,就该创建一个消息侦听器了
容器。MessageListener
以下示例演示如何定义和配置其中一个消息侦听器
随 Spring 一起提供的容器(在本例中为 ):DefaultMessageListenerContainer
<!-- this is the Message Driven POJO (MDP) -->
<bean id="messageListener" class="jmsexample.ExampleListener"/>
<!-- and this is the message listener container -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
</bean>
请参阅各种消息侦听器容器的 Spring javadoc(所有这些容器都实现了 MessageListenerContainer) 了解每个实现支持的功能的完整说明。
使用界面SessionAwareMessageListener
该接口是特定于 Spring 的接口,它提供
与 JMS 接口类似的协定,但也提供了消息处理
方法访问从中接收 的 JMS。
以下列表显示了接口的定义:SessionAwareMessageListener
MessageListener
Session
Message
SessionAwareMessageListener
package org.springframework.jms.listener;
public interface SessionAwareMessageListener {
void onMessage(Message message, Session session) throws JMSException;
}
您可以选择让 MDP 实现此接口(优先于标准
JMS 接口),如果您希望 MDP 能够响应任何
接收的消息(通过使用方法中提供的)。Spring 附带的所有消息侦听器容器实现
支持实现 or 接口的 MDP。实现 的类附带一个警告,即它们随后被绑定到 Spring
通过界面。是否使用它的选择完全由您决定
作为应用程序开发人员或架构师。MessageListener
Session
onMessage(Message, Session)
MessageListener
SessionAwareMessageListener
SessionAwareMessageListener
请注意,接口的方法抛出 .与标准 JMS 接口相比,使用该接口时,它是
客户端代码处理任何引发的异常的责任。onMessage(..)
SessionAwareMessageListener
JMSException
MessageListener
SessionAwareMessageListener
用MessageListenerAdapter
该类是 Spring 异步的最后一个组件
消息支持。简而言之,它允许您将几乎任何类公开为 MDP
(尽管有一些限制)。MessageListenerAdapter
请考虑以下接口定义:
public interface MessageDelegate {
void handleMessage(String message);
void handleMessage(Map message);
void handleMessage(byte[] message);
void handleMessage(Serializable message);
}
请注意,尽管接口既不扩展接口,也不扩展接口,但仍可以使用类将其用作 MDP。还要注意各种消息处理方法
根据各种类型的内容进行强类型化
接收和处理。MessageListener
SessionAwareMessageListener
MessageListenerAdapter
Message
现在考虑接口的以下实现:MessageDelegate
public class DefaultMessageDelegate implements MessageDelegate {
// implementation elided for clarity...
}
特别要注意接口的前面实现(类)根本没有 JMS 依赖关系。这确实是一个
我们可以通过以下配置将其制作成 MDP 的 POJO:MessageDelegate
DefaultMessageDelegate
<!-- this is the Message Driven POJO (MDP) -->
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg>
<bean class="jmsexample.DefaultMessageDelegate"/>
</constructor-arg>
</bean>
<!-- and this is the message listener container... -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
</bean>
下一个示例显示了另一个只能处理接收 JMS 消息的 MDP。请注意消息处理方法的实际调用方式(消息处理方法的名称默认为 ),但它是可配置的(如本节后面所示)。通知
以及如何将方法强类型化为仅接收和响应 JMS 消息。
以下列表显示了接口的定义:TextMessage
receive
MessageListenerAdapter
handleMessage
receive(..)
TextMessage
TextMessageDelegate
public interface TextMessageDelegate {
void receive(TextMessage message);
}
下面的清单显示了实现该接口的类:TextMessageDelegate
public class DefaultTextMessageDelegate implements TextMessageDelegate {
// implementation elided for clarity...
}
然后,服务员的配置如下:MessageListenerAdapter
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg>
<bean class="jmsexample.DefaultTextMessageDelegate"/>
</constructor-arg>
<property name="defaultListenerMethod" value="receive"/>
<!-- we don't want automatic message context extraction -->
<property name="messageConverter">
<null/>
</property>
</bean>
请注意,如果接收到某种类型的 JMS
除了 ,an 被抛出(随后
吞下)。该类的另一个功能是
如果处理程序方法返回
非 void 值。请考虑以下接口和类:messageListener
Message
TextMessage
IllegalStateException
MessageListenerAdapter
Message
public interface ResponsiveTextMessageDelegate {
// notice the return type...
String receive(TextMessage message);
}
public class DefaultResponsiveTextMessageDelegate implements ResponsiveTextMessageDelegate {
// implementation elided for clarity...
}
如果将 与 结合使用 ,则从执行
该方法(在默认配置中)转换为 .然后将结果发送到 (如果
一个存在)在原始或
默认设置在(如果已配置)。
如果未找到,则抛出
(请注意,此异常不会被吞并,而是向上传播
调用堆栈)。DefaultResponsiveTextMessageDelegate
MessageListenerAdapter
'receive(..)'
TextMessage
TextMessage
Destination
Reply-To
Message
Destination
MessageListenerAdapter
Destination
InvalidDestinationException
处理事务中的消息
在事务中调用消息侦听器只需要重新配置 侦听器容器。
您可以通过标志激活本地资源事务
在侦听器容器定义上。然后,每个消息侦听器调用都会运行
在活动 JMS 事务中,在侦听器的情况下回滚消息接收
执行失败。发送响应消息(通过)是
属于同一本地事务的一部分,但任何其他资源操作(例如
数据库访问)独立运行。这通常需要重复的消息
侦听器实现中的检测,以涵盖数据库处理的情况
已提交,但消息处理未能提交。sessionTransacted
SessionAwareMessageListener
请考虑以下 bean 定义:
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
<property name="sessionTransacted" value="true"/>
</bean>
要参与外部管理事务,您需要配置
事务管理器,并使用支持外部托管的侦听器容器
事务(通常为 )。DefaultMessageListenerContainer
要配置消息侦听器容器以参与 XA 事务,您需要
配置一个(默认情况下,委托给 Jakarta EE
服务器的事务子系统)。请注意,底层 JMS 需要
具有 XA 功能,并在您的 JTA 事务协调器中正确注册。(检查你的
Jakarta EE 服务器的 JNDI 资源配置。这也允许消息接收
作为(例如)数据库访问是同一事务的一部分(具有统一提交
语义,以牺牲 XA 事务日志开销为代价)。JtaTransactionManager
ConnectionFactory
以下 Bean 定义创建事务管理器:
<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/>
然后,我们需要将其添加到之前的容器配置中。容器 剩下的就交给你了。以下示例演示如何执行此操作:
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
<property name="transactionManager" ref="transactionManager"/> (1)
</bean>
1 | 我们的交易经理。 |