Spring AMQP 示例项目包括两个示例应用程序。 第一个是一个简单的“Hello World”示例,它演示了同步和异步消息接收。 它为了解基本组件提供了一个很好的起点。 第二个示例基于股票交易用例,用于演示实际应用程序中常见的交互类型。 在本章中,我们将快速浏览每个示例,以便您可以专注于最重要的组件。 这些示例都是基于 Maven 的,因此您应该能够将它们直接导入到任何可识别 Maven 的 IDE(例如 SpringSource 工具套件)中。Spring中文文档

“Hello World”示例

“Hello World”示例演示了同步和异步消息接收。 您可以将示例导入到 IDE 中,然后按照下面的讨论进行操作。spring-rabbit-helloworldSpring中文文档

同步示例

在目录中,导航到包。 打开类,注意它包含类级别的注释,并注意方法级别的一些注释。 这是 Spring 基于 Java 的配置的一个示例。 您可以在此处阅读有关此内容的更多信息。src/main/javaorg.springframework.amqp.helloworldHelloWorldConfiguration@Configuration@BeanSpring中文文档

以下列表显示了如何创建连接工厂:Spring中文文档

@Bean
public CachingConnectionFactory connectionFactory() {
    CachingConnectionFactory connectionFactory =
        new CachingConnectionFactory("localhost");
    connectionFactory.setUsername("guest");
    connectionFactory.setPassword("guest");
    return connectionFactory;
}

该配置还包含一个 的实例,默认情况下,该实例查找 exchange、queue 或 binding 类型的任何 bean,然后在代理上声明它们。 实际上,生成的 Bean 就是一个示例,因为它是 的实例。RabbitAdminhelloWorldQueueHelloWorldConfigurationQueueSpring中文文档

以下列表显示了 Bean 定义:helloWorldQueueSpring中文文档

@Bean
public Queue helloWorldQueue() {
    return new Queue(this.helloWorldQueueName);
}

回顾 Bean 配置,您可以看到它的名称 set 作为其属性(用于接收消息)和其属性(用于发送消息)。rabbitTemplatehelloWorldQueuequeueroutingKeySpring中文文档

现在我们已经探索了配置,我们可以查看实际使用这些组件的代码。 首先,从同一包中打开类。 它包含创建 Spring 的方法。Producermain()ApplicationContextSpring中文文档

下面的列表显示了该方法:mainSpring中文文档

public static void main(String[] args) {
    ApplicationContext context =
        new AnnotationConfigApplicationContext(RabbitConfiguration.class);
    AmqpTemplate amqpTemplate = context.getBean(AmqpTemplate.class);
    amqpTemplate.convertAndSend("Hello World");
    System.out.println("Sent: Hello World");
}

在前面的示例中,将检索 Bean 并用于发送 . 由于客户端代码应尽可能依赖于接口,因此类型为 而不是 。 即使创建的 Bean 是 的实例,依赖接口也意味着此代码更具可移植性(您可以独立于代码更改配置)。 由于该方法被调用,因此模板将委托给其实例。 在本例中,它使用默认值 ,但可以向 Bean 提供不同的实现,如 中所定义。AmqpTemplateMessageAmqpTemplateRabbitTemplateHelloWorldConfigurationRabbitTemplateconvertAndSend()MessageConverterSimpleMessageConverterrabbitTemplateHelloWorldConfigurationSpring中文文档

现在打开课程。 它实际上共享相同的配置基类,这意味着它共享 bean。 这就是为什么我们同时使用 a(用于发送)和 a(用于接收)配置该模板的原因。 正如我们在 AmqpTemplate 中所述,您可以将“routingKey”参数传递给 send 方法,将“queue”参数传递给接收方法。 代码基本上是 Producer 的镜像,调用而不是 .ConsumerrabbitTemplateroutingKeyqueueConsumerreceiveAndConvert()convertAndSend()Spring中文文档

以下列表显示了 :ConsumerSpring中文文档

public static void main(String[] args) {
    ApplicationContext context =
        new AnnotationConfigApplicationContext(RabbitConfiguration.class);
    AmqpTemplate amqpTemplate = context.getBean(AmqpTemplate.class);
    System.out.println("Received: " + amqpTemplate.receiveAndConvert());
}

如果运行 ,然后运行 ,您应该会在控制台输出中看到。ProducerConsumerReceived: Hello WorldSpring中文文档

异步示例

同步示例演练了同步 Hello World 示例。 本节介绍一个稍微高级但功能更强大的选项。 通过一些修改,Hello World 示例可以提供异步接收(也称为消息驱动的 POJO)的示例。 事实上,有一个子包正好提供了这一点:.org.springframework.amqp.samples.helloworld.asyncSpring中文文档

同样,我们从发送方开始。 打开该类,并注意它创建了一个 bean。 这一次,由于配置专用于消息发送端,我们甚至不需要任何队列定义,并且只有“routingKey”属性集。 回想一下,消息是发送到交换的,而不是直接发送到队列。 AMQP 默认交换是没有名称的直接交换。 所有队列都绑定到该默认交换,其名称为路由密钥。 这就是为什么我们只需要在这里提供路由密钥。ProducerConfigurationconnectionFactoryrabbitTemplateRabbitTemplateSpring中文文档

以下列表显示了定义:rabbitTemplateSpring中文文档

public RabbitTemplate rabbitTemplate() {
    RabbitTemplate template = new RabbitTemplate(connectionFactory());
    template.setRoutingKey(this.helloWorldQueueName);
    return template;
}

由于此示例演示了异步消息接收,因此生产端被设计为连续发送消息(如果它是像同步版本那样的每次执行消息的模型,那么它实际上并不是消息驱动的使用者)。 负责连续发送消息的组件被定义为 中的内部类。 它配置为每三秒运行一次。ProducerConfigurationSpring中文文档

以下列表显示了该组件:Spring中文文档

static class ScheduledProducer {

    @Autowired
    private volatile RabbitTemplate rabbitTemplate;

    private final AtomicInteger counter = new AtomicInteger();

    @Scheduled(fixedRate = 3000)
    public void sendMessage() {
        rabbitTemplate.convertAndSend("Hello World " + counter.incrementAndGet());
    }
}

您不需要了解所有细节,因为真正的重点应该在接收端(我们将在下面介绍)。 但是,如果您还不熟悉 Spring 任务调度支持,可以在此处了解更多信息。 简而言之,Bean 中的 Bean 使用调度程序注册任务。postProcessorProducerConfigurationSpring中文文档

现在我们可以转向接收方。 为了强调消息驱动的 POJO 行为,我们从对消息做出反应的组件开始。 该类被调用,并显示在以下列表中:HelloWorldHandlerSpring中文文档

public class HelloWorldHandler {

    public void handleMessage(String text) {
        System.out.println("Received: " + text);
    }

}

该类是 POJO。 它不扩展任何基类,不实现任何接口,甚至不包含任何导入。 它正在被Spring AMQP“适应”到界面。 然后,您可以在 . 对于此示例,容器是在类中创建的。 您可以在那里看到包裹在适配器中的 POJO。MessageListenerMessageListenerAdapterSimpleMessageListenerContainerConsumerConfigurationSpring中文文档

以下列表显示了如何定义:listenerContainerSpring中文文档

@Bean
public SimpleMessageListenerContainer listenerContainer() {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory());
    container.setQueueName(this.helloWorldQueueName);
    container.setMessageListener(new MessageListenerAdapter(new HelloWorldHandler()));
    return container;
}

这是一个 Spring 生命周期组件,默认情况下会自动启动。 如果你看一下这个类,你可以看到它的方法只包含一个单行引导程序来创建 . Producer 的方法也是一个单行引导程序,因为其方法注释的组件也会自动启动。 您可以按任意顺序启动 and,并且应该看到每三秒钟发送和接收一次消息。SimpleMessageListenerContainerConsumermain()ApplicationContextmain()@ScheduledProducerConsumerSpring中文文档

股票交易

股票交易示例演示了比 Hello World 示例更高级的消息传递方案。 但是,配置非常相似,如果涉及更多的话。 由于我们详细介绍了 Hello World 配置,因此在这里,我们将重点介绍此示例的不同之处。 有一个服务器将市场数据(股票报价)推送到主题交易所。 然后,客户端可以通过绑定具有路由模式的队列来订阅市场数据馈送(例如,)。 此演示的另一个主要功能是由客户端发起并由服务器处理的请求-回复“股票交易”交互。 这涉及客户端在订单请求消息本身中发送的专用队列。app.stock.quotes.nasdaq.*replyToSpring中文文档

服务器的核心配置位于包中的类中。 它扩展了 . 这是定义服务器和客户端通用资源的地方,包括市场数据主题交换(其名称为“app.stock.marketdata”)和服务器为股票交易公开的队列(其名称为“app.stock.request”)。 在该通用配置文件中,您还会看到 a 配置在 .RabbitServerConfigurationorg.springframework.amqp.rabbit.stocks.config.serverAbstractStockAppRabbitConfigurationJackson2JsonMessageConverterRabbitTemplateSpring中文文档

特定于服务器的配置由两个部分组成。 首先,它配置了市场数据交换,这样它就不需要在每次调用时提供该交换名称来发送 . 它在基配置类中定义的抽象回调方法中执行此操作。 以下列表显示了该方法:RabbitTemplateMessageSpring中文文档

public void configureRabbitTemplate(RabbitTemplate rabbitTemplate) {
    rabbitTemplate.setExchange(MARKET_DATA_EXCHANGE_NAME);
}

其次,声明库存请求队列。 在这种情况下,它不需要任何显式绑定,因为它绑定到默认的无名交换,其名称作为路由键。 如前所述,AMQP 规范定义了该行为。 以下列表显示了 Bean 的定义:stockRequestQueueSpring中文文档

@Bean
public Queue stockRequestQueue() {
    return new Queue(STOCK_REQUEST_QUEUE_NAME);
}

现在,您已经看到了服务器的 AMQP 资源的配置,请导航到目录下的包。 在那里,您可以看到提供方法的实际类。 它根据配置文件创建一个 base。 在那里,您可以看到发布虚拟市场数据的计划任务。 该配置依赖于 Spring 的命名空间支持。 引导配置文件还会导入一些其他文件。 最有趣的是 ,它位于 的正下方。 在那里,您可以看到负责处理股票交易请求的 bean。 最后,看一下 bean 中定义的 bean(也在 'src/main/resources' 中)。 该 Bean 是该类的一个实例,并且是消息驱动的 POJO 的一个很好的示例,它也可以发送回复消息。 请注意,它本身并不与框架或任何 AMQP 概念耦合。 它接受 a 并返回 a 。 下面的列表显示了该方法的定义:org.springframework.amqp.rabbit.stockssrc/test/javaServermain()ApplicationContextserver-bootstrap.xmltaskserver-messaging.xmlsrc/main/resourcesmessageListenerContainerserverHandlerserver-handlers.xmlServerHandlerTradeRequestTradeResponsehandleMessageSpring中文文档

public TradeResponse handleMessage(TradeRequest tradeRequest) { ...
}

现在我们已经看到了服务器最重要的配置和代码,我们可以转向客户端。 最好的起点可能是 ,在包中。 请注意,它声明了两个队列,而没有提供显式名称。 以下列表显示了两个队列的 Bean 定义:RabbitClientConfigurationorg.springframework.amqp.rabbit.stocks.config.clientSpring中文文档

@Bean
public Queue marketDataQueue() {
    return amqpAdmin().declareQueue();
}

@Bean
public Queue traderJoeQueue() {
    return amqpAdmin().declareQueue();
}

这些是专用队列,并且会自动生成唯一名称。 客户端使用第一个生成的队列绑定到服务器公开的市场数据交换。 回想一下,在 AMQP 中,消费者与队列交互,而生产者与交易所交互。 队列与交换的“绑定”是告诉代理将消息从给定交换传递(或路由)到队列。 由于市场数据交换是主题交换,因此绑定可以用路由模式表示。 使用对象执行此操作,并且该对象是使用 Fluent API 生成的。 以下列表显示了:RabbitClientConfigurationBindingBindingBuilderBindingSpring中文文档

@Value("${stocks.quote.pattern}")
private String marketDataRoutingKey;

@Bean
public Binding marketDataBinding() {
    return BindingBuilder.bind(
        marketDataQueue()).to(marketDataExchange()).with(marketDataRoutingKey);
}

请注意,实际值已在属性文件(下)中外部化,并且我们使用 Spring 的注解来注入该值。 这通常是个好主意。 否则,该值将在类中被硬编码,并且如果不重新编译就无法修改。 在这种情况下,在更改用于绑定的路由模式时,运行多个版本的客户端要容易得多。 我们现在可以尝试一下。client.propertiessrc/main/resources@ValueSpring中文文档

从运行开始,然后 . 您应该会看到股票的虚拟报价,因为与 client.properties 中的“stocks.quote.pattern”键关联的当前值是“app.stock.quotes.nasdaq”。'。 现在,在保持现有服务器客户端运行的同时,将该属性值更改为“app.stock.quotes.nyse”。并启动第二个实例。 您应该看到,第一个客户仍然收到纳斯达克报价,而第二个客户收到纽约证券交易所报价。 相反,您可以更改模式以获取所有股票甚至单个股票代码。org.springframework.amqp.rabbit.stocks.Serverorg.springframework.amqp.rabbit.stocks.ClientNASDAQClientSpring中文文档

我们探索的最后一个功能是从客户端的角度进行请求-回复交互。 回想一下,我们已经看到了接受对象并返回对象。 侧面的相应代码在包装中。 它委托给 以便发送消息。 下面的列表显示了该方法:ServerHandlerTradeRequestTradeResponseClientRabbitStockServiceGatewayorg.springframework.amqp.rabbit.stocks.gatewayRabbitTemplatesendSpring中文文档

public void send(TradeRequest tradeRequest) {
    getRabbitTemplate().convertAndSend(tradeRequest, new MessagePostProcessor() {
        public Message postProcessMessage(Message message) throws AmqpException {
            message.getMessageProperties().setReplyTo(new Address(defaultReplyToQueue));
            try {
                message.getMessageProperties().setCorrelationId(
                    UUID.randomUUID().toString().getBytes("UTF-8"));
            }
            catch (UnsupportedEncodingException e) {
                throw new AmqpException(e);
            }
            return message;
        }
    });
}

请注意,在发送消息之前,它会设置地址。 它提供由 Bean 定义生成的队列(如上所示)。 下面的列表显示了类本身的定义:replyTotraderJoeQueue@BeanStockServiceGatewaySpring中文文档

@Bean
public StockServiceGateway stockServiceGateway() {
    RabbitStockServiceGateway gateway = new RabbitStockServiceGateway();
    gateway.setRabbitTemplate(rabbitTemplate());
    gateway.setDefaultReplyToQueue(traderJoeQueue());
    return gateway;
}

如果您不再运行服务器和客户端,请立即启动它们。 尝试发送格式为“100 TCKR”的请求。 在模拟请求“处理”的短暂人为延迟之后,您应该会看到客户端上出现一条确认消息。Spring中文文档

从非 Spring 应用程序接收 JSON

Spring 应用程序在发送 JSON 时,将标头设置为完全限定的类名,以帮助接收应用程序将 JSON 转换回 Java 对象。TypeIdSpring中文文档

此示例探讨了从非 Spring 应用程序转换 JSON 的几种技术。spring-rabbit-jsonSpring中文文档