Spring AMQP Samples 项目包括两个示例应用程序。 第一个是一个简单的 “Hello World” 示例,它演示了同步和异步消息接收。 它为了解基本组件提供了一个很好的起点。 第二个示例基于股票交易用例,用于演示实际应用程序中常见的交互类型。 在本章中,我们提供了每个示例的快速演练,以便您可以专注于最重要的组件。 这些示例都是基于 Maven 的,因此您应该能够将它们直接导入到任何 Maven 感知的 IDE(比如 SpringSource Tool Suite)中。
“Hello World” 示例
“Hello World” 示例演示了同步和异步消息接收。
您可以将样例导入到 IDE 中,然后按照下面的讨论进行操作。spring-rabbit-helloworld
同步示例
在目录中,导航到包。
打开该类,注意它包含类级别的注释,并注意方法级别的一些注释。
这是 Spring 基于 Java 的配置的一个例子。
您可以在此处阅读更多相关信息。src/main/java
org.springframework.amqp.helloworld
HelloWorldConfiguration
@Configuration
@Bean
下面的清单显示了如何创建 connection factory:
@Bean
public CachingConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory =
new CachingConnectionFactory("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
return connectionFactory;
}
该配置还包含一个实例,默认情况下,该实例查找 exchange、queue 或 binding 类型的任何 bean,然后在代理上声明它们。
实际上,生成的 bean 是一个示例,因为它是 的实例。RabbitAdmin
helloWorldQueue
HelloWorldConfiguration
Queue
下面的清单显示了 bean 定义:helloWorldQueue
@Bean
public Queue helloWorldQueue() {
return new Queue(this.helloWorldQueueName);
}
回顾一下 bean 配置,你可以看到它的属性(用于接收消息)和属性(用于发送消息)的名称为 set。rabbitTemplate
helloWorldQueue
queue
routingKey
现在我们已经探索了配置,我们可以查看实际使用这些组件的代码。
首先,从同一个包中打开该类。
它包含一个创建 Spring 的方法。Producer
main()
ApplicationContext
下面的清单显示了该方法:main
public static void main(String[] args) {
ApplicationContext context =
new AnnotationConfigApplicationContext(RabbitConfiguration.class);
AmqpTemplate amqpTemplate = context.getBean(AmqpTemplate.class);
amqpTemplate.convertAndSend("Hello World");
System.out.println("Sent: Hello World");
}
在前面的示例中,检索 bean 并用于发送 .
由于客户端代码应尽可能依赖接口,因此类型为 而不是 .
即使 中创建的 bean 是 的实例,依赖该接口也意味着此代码更具可移植性(您可以独立于代码更改配置)。
由于调用了该方法,因此模板将委托给其实例。
在这种情况下,它使用默认的 ,但可以向 bean 提供不同的实现,如 中所定义。AmqpTemplate
Message
AmqpTemplate
RabbitTemplate
HelloWorldConfiguration
RabbitTemplate
convertAndSend()
MessageConverter
SimpleMessageConverter
rabbitTemplate
HelloWorldConfiguration
现在打开该类。
它实际上共享相同的配置基类,这意味着它共享 bean。
这就是为什么我们为该模板配置了 a (用于发送) 和 a (用于接收)。
正如我们在 AmqpTemplate
中描述的,你可以将 'routingKey' 参数传递给 send 方法,将 'queue' 参数传递给 receive 方法。
代码基本上是 Producer 的镜像,调用 而不是 .Consumer
rabbitTemplate
routingKey
queue
Consumer
receiveAndConvert()
convertAndSend()
下面的清单显示了 的主要方法:Consumer
public static void main(String[] args) {
ApplicationContext context =
new AnnotationConfigApplicationContext(RabbitConfiguration.class);
AmqpTemplate amqpTemplate = context.getBean(AmqpTemplate.class);
System.out.println("Received: " + amqpTemplate.receiveAndConvert());
}
如果您运行 ,然后运行 ,您应该会在控制台输出中看到。Producer
Consumer
Received: Hello World
异步示例
同步示例演练了同步 Hello World 示例。
本节介绍一个稍微高级一些但功能更强大的选项。
通过一些修改,Hello World 示例可以提供异步接收(也称为消息驱动的 POJO)的示例。
事实上,有一个子包恰恰提供了:.org.springframework.amqp.samples.helloworld.async
同样,我们从发送方开始。
打开该类,请注意它会创建一个 and 一个 bean。
这一次,由于配置专用于消息发送端,我们甚至不需要任何队列定义,并且只设置了 'routingKey' 属性。
回想一下,消息是发送到 Exchange,而不是直接发送到队列。
AMQP 默认交换是没有名称的直接交换。
所有队列都绑定到该 default 交换,并将其名称作为路由键。
这就是为什么我们只需要在此处提供路由密钥。ProducerConfiguration
connectionFactory
rabbitTemplate
RabbitTemplate
下面的清单显示了定义:rabbitTemplate
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
template.setRoutingKey(this.helloWorldQueueName);
return template;
}
由于此示例演示了异步消息接收,因此生产方设计为连续发送消息(如果它是像同步版本一样的每次执行消息模型,那么它实际上是消息驱动的使用者就不会那么明显)。
负责持续发送消息的组件定义为 .
它配置为每 3 秒运行一次。ProducerConfiguration
下面的清单显示了该组件:
static class ScheduledProducer {
@Autowired
private volatile RabbitTemplate rabbitTemplate;
private final AtomicInteger counter = new AtomicInteger();
@Scheduled(fixedRate = 3000)
public void sendMessage() {
rabbitTemplate.convertAndSend("Hello World " + counter.incrementAndGet());
}
}
您不需要了解所有细节,因为真正的重点应该是接收方(我们接下来将介绍)。
但是,如果您还不熟悉 Spring 任务调度支持,可以在此处了解更多信息。
简单的例子是,中的 bean 向调度程序注册任务。postProcessor
ProducerConfiguration
现在我们可以转向接收方。
为了强调消息驱动的 POJO 行为,我们从对消息做出反应的组件开始。
该类被调用,如下面的清单所示:HelloWorldHandler
public class HelloWorldHandler {
public void handleMessage(String text) {
System.out.println("Received: " + text);
}
}
该类是 POJO。
它不扩展任何基类,不实现任何接口,甚至不包含任何导入。
它被 Spring AMQP “适应”到接口。
然后,您可以在 .
对于此示例,容器是在类中创建的。
你可以在那里看到 POJO 包装在适配器中。MessageListener
MessageListenerAdapter
SimpleMessageListenerContainer
ConsumerConfiguration
下面的清单显示了如何定义:listenerContainer
@Bean
public SimpleMessageListenerContainer listenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory());
container.setQueueName(this.helloWorldQueueName);
container.setMessageListener(new MessageListenerAdapter(new HelloWorldHandler()));
return container;
}
它是一个 Spring 生命周期组件,默认情况下,它会自动启动。
如果您查看该类,则可以看到它的方法只包含一个用于创建 .
Producer 的方法也是一个单行引导程序,因为其方法被注释的组件也会自动启动。
您可以按任何顺序启动 and,您应该会看到每 3 秒发送和接收一次的消息。SimpleMessageListenerContainer
Consumer
main()
ApplicationContext
main()
@Scheduled
Producer
Consumer
股票交易
Stock Trading 示例演示了比 Hello World 示例更高级的消息传送方案。
但是,配置非常相似,只是稍微复杂一些。
由于我们详细介绍了 Hello World 配置,因此,我们在这里重点介绍此示例的不同之处。
有一个服务器将市场数据(股票报价)推送到主题交易所。
然后,客户端可以通过将队列与路由模式(例如)绑定来订阅市场数据馈送。
此演示的另一个主要功能是由客户端发起并由服务器处理的请求-回复“股票交易”交互。
这涉及客户端在订单请求消息本身内发送的私有队列。app.stock.quotes.nasdaq.*
replyTo
服务器的核心配置位于 package 内的 class 中。
它扩展了 .
这是定义服务器和客户端通用资源的地方,包括市场数据主题交换(其名称为“app.stock.marketdata”)和服务器为股票交易公开的队列(其名称为“app.stock.request”)。
在该通用配置文件中,您还可以看到 a 在 .RabbitServerConfiguration
org.springframework.amqp.rabbit.stocks.config.server
AbstractStockAppRabbitConfiguration
Jackson2JsonMessageConverter
RabbitTemplate
特定于服务器的配置由两部分组成。
首先,它在 上配置市场数据交换,以便它不需要在每次调用时提供该交换名称来发送 .
它在 base configuration 类中定义的抽象回调方法中执行此操作。
下面的清单显示了该方法:RabbitTemplate
Message
public void configureRabbitTemplate(RabbitTemplate rabbitTemplate) {
rabbitTemplate.setExchange(MARKET_DATA_EXCHANGE_NAME);
}
其次,声明股票请求队列。
在这种情况下,它不需要任何显式绑定,因为它绑定到默认的 no-name 交换,并将自己的名称作为路由键。
如前所述,AMQP 规范定义了该行为。
下面的清单显示了 bean 的定义:stockRequestQueue
@Bean
public Queue stockRequestQueue() {
return new Queue(STOCK_REQUEST_QUEUE_NAME);
}
现在,您已经看到了服务器的 AMQP 资源的配置,请导航到目录下的包。
在那里,您可以看到提供方法的实际类。
它会根据配置文件创建一个 base.
在那里,您可以看到发布虚拟市场数据的计划任务。
该配置依赖于 Spring 的名称空间支持。
引导配置文件还导入了一些其他文件。
最有趣的是 ,它就在 下。
在那里,您可以看到负责处理股票交易请求的 bean。
最后,看看 中定义的 bean(也在 'src/main/resources' 中)。
该 bean 是该类的一个实例,并且是消息驱动的 POJO 的一个很好的示例,它也可以发送回复消息。
请注意,它本身并不与框架或任何 AMQP 概念耦合。
它接受 a 并返回 .
下面的清单显示了该方法的定义:org.springframework.amqp.rabbit.stocks
src/test/java
Server
main()
ApplicationContext
server-bootstrap.xml
task
server-messaging.xml
src/main/resources
messageListenerContainer
serverHandler
server-handlers.xml
ServerHandler
TradeRequest
TradeResponse
handleMessage
public TradeResponse handleMessage(TradeRequest tradeRequest) { ...
}
现在我们已经看到了服务器最重要的配置和代码,我们可以转向客户端。
最好的起点可能是 包中的 。
请注意,它声明了两个队列,但没有提供显式名称。
下面的清单显示了两个队列的 bean 定义:RabbitClientConfiguration
org.springframework.amqp.rabbit.stocks.config.client
@Bean
public Queue marketDataQueue() {
return amqpAdmin().declareQueue();
}
@Bean
public Queue traderJoeQueue() {
return amqpAdmin().declareQueue();
}
这些是私有队列,唯一名称是自动生成的。
客户端使用第一个生成的队列绑定到服务器已公开的市场数据交换。
回想一下,在 AMQP 中,使用者与队列交互,而生产者与交换交互。
队列与交易所的 “绑定” 是告诉 broker 将消息从给定交易所投递 (或路由) 到队列。
由于市场数据交换是主题交换,因此可以使用路由模式来表示绑定。
它对对象执行此操作,该对象是使用 Fluent API 生成的。
以下清单显示了 :RabbitClientConfiguration
Binding
BindingBuilder
Binding
@Value("${stocks.quote.pattern}")
private String marketDataRoutingKey;
@Bean
public Binding marketDataBinding() {
return BindingBuilder.bind(
marketDataQueue()).to(marketDataExchange()).with(marketDataRoutingKey);
}
请注意,实际值已在属性文件(下)中外部化,并且我们使用 Spring 的 Comments 来注入该值。
这通常是一个好主意。
否则,该值将在类中硬编码,并且无需重新编译即可不可修改。
在这种情况下,在更改用于绑定的路由模式的同时运行多个版本的客户端要容易得多。
我们现在可以试试。client.properties
src/main/resources
@Value
首先运行 ,然后 。
您应该会看到股票的虚拟报价,因为与 client.properties 中的 'stocks.quote.pattern' 键关联的当前值是 'app.stock.quotes.nasdaq.'.
现在,在保持现有 Server
和 Client
运行的同时,将该属性值更改为 'app.stock.quotes.nyse'。并启动第二个实例。
您应该看到,第一个客户仍然接收纳斯达克报价,而第二个客户接收纽约证券交易所报价。
相反,您可以更改模式以获取所有股票甚至单个股票代码。org.springframework.amqp.rabbit.stocks.Server
org.springframework.amqp.rabbit.stocks.Client
NASDAQ
Client
我们探索的最后一个功能是从客户端的角度进行请求 - 回复交互。
回想一下,我们已经看到了 接受对象并返回对象的 。
侧面对应的代码在包里。
它委托给 为了发送消息。
下面的清单显示了该方法:ServerHandler
TradeRequest
TradeResponse
Client
RabbitStockServiceGateway
org.springframework.amqp.rabbit.stocks.gateway
RabbitTemplate
send
public void send(TradeRequest tradeRequest) {
getRabbitTemplate().convertAndSend(tradeRequest, new MessagePostProcessor() {
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setReplyTo(new Address(defaultReplyToQueue));
try {
message.getMessageProperties().setCorrelationId(
UUID.randomUUID().toString().getBytes("UTF-8"));
}
catch (UnsupportedEncodingException e) {
throw new AmqpException(e);
}
return message;
}
});
}
请注意,在发送消息之前,它会设置地址。
它提供由 Bean 定义生成的队列(如前所示)。
下面的清单显示了类本身的定义:replyTo
traderJoeQueue
@Bean
StockServiceGateway
@Bean
public StockServiceGateway stockServiceGateway() {
RabbitStockServiceGateway gateway = new RabbitStockServiceGateway();
gateway.setRabbitTemplate(rabbitTemplate());
gateway.setDefaultReplyToQueue(traderJoeQueue());
return gateway;
}
如果您不再运行服务器和客户端,请立即启动它们。 尝试发送格式为 '100 TCKR' 的请求。 在模拟请求的 “处理” 的短暂人为延迟之后,您应该会看到客户端上出现一条确认消息。
从非 Spring 应用程序接收 JSON
Spring 应用程序在发送 JSON 时,将 Header 设置为完全限定的类名,以帮助接收应用程序将 JSON 转换回 Java 对象。TypeId
该示例探讨了从非 Spring 应用程序转换 JSON 的几种技术。spring-rabbit-json