Sample Applications
The Spring AMQP Samples project includes two sample applications. The first is a simple “Hello World” example that demonstrates both synchronous and asynchronous message reception. It provides an excellent starting point for understanding the essential components. The second sample is based on a stock-trading use case and demonstrates the types of interaction that are common in real-world applications. In this chapter, we provide a quick walk-through of each sample so that you can focus on the most important components. Both samples are Maven-based, so you should be able to import them directly into any Maven-aware IDE (such as SpringSource Tool Suite).
The “Hello World” Sample
The “Hello World” sample demonstrates both synchronous and asynchronous message reception.
You can import the spring-rabbit-helloworld sample into the IDE and then follow the discussion below.
Synchronous Example
Within the src/main/java directory, navigate to the org.springframework.amqp.helloworld package.
Open the HelloWorldConfiguration class and notice that it contains the @Configuration annotation at the class level and some @Bean annotations at the method level.
This is an example of Spring’s Java-based configuration.
You can read more about that in the Spring Framework reference documentation.
The following listing shows how the connection factory is created:
@Bean
public CachingConnectionFactory connectionFactory() {
    CachingConnectionFactory connectionFactory =
        new CachingConnectionFactory("localhost");
    connectionFactory.setUsername("guest");
    connectionFactory.setPassword("guest");
    return connectionFactory;
}
The configuration also contains an instance of RabbitAdmin, which, by default, looks for any beans of type Exchange, Queue, or Binding and then declares them on the broker.
In fact, the helloWorldQueue bean that is generated in HelloWorldConfiguration is an example, because it is an instance of Queue.
The following listing shows the helloWorldQueue bean definition:
@Bean
public Queue helloWorldQueue() {
    return new Queue(this.helloWorldQueueName);
}
Looking back at the rabbitTemplate bean configuration, you can see that it has the name of helloWorldQueue set as its queue property (for receiving messages) and as its routingKey property (for sending messages).
Now that we have explored the configuration, we can look at the code that actually uses these components.
First, open the Producer class from within the same package.
It contains a main() method where the Spring ApplicationContext is created.
The following listing shows the main method:
public static void main(String[] args) {
    // Bootstrap the context from the Java configuration class described above.
    ApplicationContext context =
        new AnnotationConfigApplicationContext(HelloWorldConfiguration.class);
    AmqpTemplate amqpTemplate = context.getBean(AmqpTemplate.class);
    amqpTemplate.convertAndSend("Hello World");
    System.out.println("Sent: Hello World");
}
In the preceding example, the AmqpTemplate bean is retrieved and used for sending a Message.
Since the client code should rely on interfaces whenever possible, the type is AmqpTemplate rather than RabbitTemplate.
Even though the bean created in HelloWorldConfiguration is an instance of RabbitTemplate, relying on the interface means that this code is more portable (you can change the configuration independently of the code).
Since the convertAndSend() method is invoked, the template delegates to its MessageConverter instance.
In this case, it uses the default SimpleMessageConverter, but a different implementation could be provided to the rabbitTemplate bean, as defined in HelloWorldConfiguration.
Now open the Consumer class.
It actually shares the same configuration base class, which means it shares the rabbitTemplate bean.
That is why we configured that template with both a routingKey (for sending) and a queue (for receiving).
As described in the AmqpTemplate section, you could instead pass the routingKey argument to the send method and the queue argument to the receive method.
The Consumer code is basically a mirror image of the Producer, calling receiveAndConvert() rather than convertAndSend().
The following listing shows the main method for the Consumer:
public static void main(String[] args) {
    // Same configuration class as the Producer, so the same rabbitTemplate bean is used.
    ApplicationContext context =
        new AnnotationConfigApplicationContext(HelloWorldConfiguration.class);
    AmqpTemplate amqpTemplate = context.getBean(AmqpTemplate.class);
    System.out.println("Received: " + amqpTemplate.receiveAndConvert());
}
If you run the Producer and then run the Consumer, you should see Received: Hello World in the console output.
Asynchronous Example
The Synchronous Example section walked through the synchronous Hello World sample.
This section describes a slightly more advanced but significantly more powerful option.
With a few modifications, the Hello World sample can provide an example of asynchronous reception, also known as message-driven POJOs.
In fact, there is a sub-package that provides exactly that: org.springframework.amqp.helloworld.async.
Again, we start with the sending side.
Open the ProducerConfiguration class and notice that it creates a connectionFactory and a rabbitTemplate bean.
This time, since the configuration is dedicated to the message sending side, we do not even need any queue definitions, and the RabbitTemplate has only the routingKey property set.
Recall that messages are sent to an exchange rather than being sent directly to a queue.
The AMQP default exchange is a direct exchange with no name.
All queues are bound to that default exchange with their name as the routing key.
That is why we only need to provide the routing key here.
The following listing shows the rabbitTemplate definition:
@Bean
public RabbitTemplate rabbitTemplate() {
    RabbitTemplate template = new RabbitTemplate(connectionFactory());
    // Only the routing key is set; messages go to the default (no-name) exchange.
    template.setRoutingKey(this.helloWorldQueueName);
    return template;
}
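The default-exchange rule described above (every queue is bound under its own name, so the routing key alone selects the queue) can be pictured with a toy in-memory model. This is only a sketch in plain Java: the ToyDefaultExchange class, its methods, and the queue name used in main are hypothetical and do not correspond to any Spring AMQP or RabbitMQ API.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Toy model of a broker's default exchange; hypothetical, for illustration only.
final class ToyDefaultExchange {

    // Each declared queue is stored under its own name, which doubles as its binding key.
    private final Map<String, Deque<String>> queues = new HashMap<>();

    // Declaring a queue implicitly "binds" it to the default exchange under its name.
    void declareQueue(String name) {
        queues.putIfAbsent(name, new ArrayDeque<>());
    }

    // Publishing needs only a routing key: it selects the queue with the same name.
    // Returns false when no such queue exists (the message is unroutable).
    boolean publish(String routingKey, String body) {
        Deque<String> queue = queues.get(routingKey);
        if (queue == null) {
            return false;
        }
        queue.addLast(body);
        return true;
    }

    // Returns the oldest message in the queue, or null if the queue is empty or unknown.
    String receive(String queueName) {
        Deque<String> queue = queues.get(queueName);
        return (queue == null) ? null : queue.pollFirst();
    }

    public static void main(String[] args) {
        ToyDefaultExchange broker = new ToyDefaultExchange();
        broker.declareQueue("hello.world.queue"); // assumed queue name
        broker.publish("hello.world.queue", "Hello World 1");
        System.out.println("Received: " + broker.receive("hello.world.queue"));
    }
}
```

In this model the producer never names an exchange, which mirrors why the template in the listing above needs nothing more than a routing key.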
Since this sample demonstrates asynchronous message reception, the producing side is designed to send messages continuously. (If it followed a message-per-execution model like the synchronous version, it would not be so obvious that the consumer is, in fact, message-driven.)
The component responsible for continuously sending messages is defined as an inner class within the ProducerConfiguration.
It is configured to run every three seconds.
The following listing shows the component:
static class ScheduledProducer {

    @Autowired
    private volatile RabbitTemplate rabbitTemplate;

    private final AtomicInteger counter = new AtomicInteger();

    @Scheduled(fixedRate = 3000)
    public void sendMessage() {
        rabbitTemplate.convertAndSend("Hello World " + counter.incrementAndGet());
    }
}
You do not need to understand all of the details, since the real focus should be on the receiving side (which we cover next).
However, if you are not yet familiar with Spring task scheduling support, you can learn more in the Spring Framework reference documentation.
The short story is that the postProcessor bean in the ProducerConfiguration registers the task with a scheduler.
Now we can turn to the receiving side.
To emphasize the message-driven POJO behavior, we start with the component that reacts to the messages.
The class is called HelloWorldHandler and is shown in the following listing:
public class HelloWorldHandler {

    public void handleMessage(String text) {
        System.out.println("Received: " + text);
    }
}
That class is a POJO.
It does not extend any base class, it does not implement any interfaces, and it does not even contain any imports.
It is being “adapted” to the MessageListener interface by the Spring AMQP MessageListenerAdapter.
You can then configure that adapter on a SimpleMessageListenerContainer.
For this sample, the container is created in the ConsumerConfiguration class.
You can see the POJO wrapped in the adapter there.
The following listing shows how the listenerContainer is defined:
@Bean
public SimpleMessageListenerContainer listenerContainer() {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory());
    container.setQueueNames(this.helloWorldQueueName);
    // Wrap the POJO so that the container can deliver messages to it.
    container.setMessageListener(new MessageListenerAdapter(new HelloWorldHandler()));
    return container;
}
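To make the idea of “adapting” a POJO more concrete, the following plain-Java sketch looks up a method named handleMessage on an arbitrary object through reflection and invokes it with the message payload. The PojoListenerAdapter and RecordingHandler classes are hypothetical names used only for illustration. This is not how MessageListenerAdapter is implemented, and it skips message conversion entirely.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a listener adapter; not the Spring AMQP implementation.
final class PojoListenerAdapter {

    private final Object delegate;

    PojoListenerAdapter(Object delegate) {
        this.delegate = delegate;
    }

    // Finds a public one-argument "handleMessage" method that accepts the payload
    // and invokes it on the delegate, returning whatever the method returns.
    Object onMessage(Object payload) {
        for (Method method : delegate.getClass().getMethods()) {
            if (method.getName().equals("handleMessage")
                    && method.getParameterCount() == 1
                    && method.getParameterTypes()[0].isInstance(payload)) {
                try {
                    method.setAccessible(true); // allows handlers declared in non-public classes
                    return method.invoke(delegate, payload);
                }
                catch (ReflectiveOperationException e) {
                    throw new IllegalStateException("handleMessage invocation failed", e);
                }
            }
        }
        throw new IllegalArgumentException("No handleMessage method accepts " + String.valueOf(payload));
    }

    public static void main(String[] args) {
        RecordingHandler handler = new RecordingHandler();
        new PojoListenerAdapter(handler).onMessage("Hello World 1");
        System.out.println("Received: " + handler.received);
    }
}

// A POJO in the spirit of HelloWorldHandler; it records what it receives.
final class RecordingHandler {

    final List<String> received = new ArrayList<>();

    public void handleMessage(String text) {
        received.add(text);
    }
}
```

The handler knows nothing about the adapter, which is the point of the message-driven POJO style: only the adapter depends on the messaging infrastructure.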
The SimpleMessageListenerContainer is a Spring lifecycle component and, by default, starts automatically.
If you look in the Consumer class, you can see that its main() method consists of nothing more than a one-line bootstrap to create the ApplicationContext.
The Producer’s main() method is also a one-line bootstrap, since the component whose method is annotated with @Scheduled also starts automatically.
You can start the Producer and Consumer in any order, and you should see messages being sent and received every three seconds.
Stock Trading
The Stock Trading sample demonstrates more advanced messaging scenarios than the Hello World sample.
However, the configuration is very similar, if a bit more involved.
Since we walked through the Hello World configuration in detail, here we focus on what makes this sample different.
There is a server that pushes market data (stock quotations) to a topic exchange.
Then, clients can subscribe to the market data feed by binding a queue with a routing pattern (for example, app.stock.quotes.nasdaq.*).
The other main feature of this demo is a request-reply “stock trade” interaction that is initiated by the client and handled by the server.
That involves a private replyTo queue whose address the client sends within the order request message itself.
The server’s core configuration is in the RabbitServerConfiguration class within the org.springframework.amqp.rabbit.stocks.config.server package.
It extends the AbstractStockAppRabbitConfiguration.
That is where the resources common to the server and client are defined, including the market data topic exchange (whose name is 'app.stock.marketdata') and the queue that the server exposes for stock trades (whose name is 'app.stock.request').
In that common configuration file, you also see that a Jackson2JsonMessageConverter is configured on the RabbitTemplate.
The server-specific configuration consists of two things.
First, it configures the market data exchange on the RabbitTemplate so that it does not need to provide that exchange name with every call to send a Message.
It does this within an abstract callback method defined in the base configuration class.
The following listing shows that method:
public void configureRabbitTemplate(RabbitTemplate rabbitTemplate) {
    rabbitTemplate.setExchange(MARKET_DATA_EXCHANGE_NAME);
}
Second, the stock request queue is declared.
It does not require any explicit bindings in this case, because it is bound to the default no-name exchange with its own name as the routing key.
As mentioned earlier, the AMQP specification defines that behavior.
The following listing shows the definition of the stockRequestQueue bean:
@Bean
public Queue stockRequestQueue() {
    return new Queue(STOCK_REQUEST_QUEUE_NAME);
}
Now that you have seen the configuration of the server’s AMQP resources, navigate to the org.springframework.amqp.rabbit.stocks package under the src/test/java directory.
There, you can see the actual Server class that provides a main() method.
It creates an ApplicationContext based on the server-bootstrap.xml config file.
In that file, you can see the scheduled task that publishes dummy market data.
That configuration relies upon Spring’s task namespace support.
The bootstrap config file also imports a few other files.
The most interesting one is server-messaging.xml, which is directly under src/main/resources.
There, you can see the messageListenerContainer bean that is responsible for handling the stock trade requests.
Finally, have a look at the serverHandler bean that is defined in server-handlers.xml (which is also in src/main/resources).
That bean is an instance of the ServerHandler class and is a good example of a message-driven POJO that can also send reply messages.
Notice that it is not itself coupled to the framework or any of the AMQP concepts.
It accepts a TradeRequest and returns a TradeResponse.
The following listing shows the definition of the handleMessage method:
public TradeResponse handleMessage(TradeRequest tradeRequest) { ...
}
Now that we have seen the most important configuration and code for the server, we can turn to the client.
The best starting point is probably RabbitClientConfiguration, in the org.springframework.amqp.rabbit.stocks.config.client package.
Notice that it declares two queues without providing explicit names.
The following listing shows the bean definitions for the two queues:
@Bean
public Queue marketDataQueue() {
    return amqpAdmin().declareQueue();
}

@Bean
public Queue traderJoeQueue() {
    return amqpAdmin().declareQueue();
}
Those are private queues, and unique names are generated automatically.
The first generated queue is used by the client to bind to the market data exchange that has been exposed by the server.
Recall that, in AMQP, consumers interact with queues while producers interact with exchanges.
The “binding” of queues to exchanges is what tells the broker to deliver (or route) messages from a given exchange to a queue.
Since the market data exchange is a topic exchange, the binding can be expressed with a routing pattern.
The RabbitClientConfiguration does so with a Binding object, and that object is generated with the BindingBuilder fluent API.
The following listing shows the Binding:
@Value("${stocks.quote.pattern}")
private String marketDataRoutingKey;

@Bean
public Binding marketDataBinding() {
    return BindingBuilder.bind(marketDataQueue())
            .to(marketDataExchange())
            .with(marketDataRoutingKey);
}
Notice that the actual value has been externalized in a properties file (client.properties under src/main/resources), and that we use Spring’s @Value annotation to inject that value.
This is generally a good idea.
Otherwise, the value would have been hardcoded in a class and unmodifiable without recompilation.
In this case, it is much easier to run multiple versions of the client while making changes to the routing pattern used for binding.
We can try that now.
Start by running org.springframework.amqp.rabbit.stocks.Server and then org.springframework.amqp.rabbit.stocks.Client.
You should see dummy quotations for NASDAQ stocks, because the current value associated with the 'stocks.quote.pattern' key in client.properties is 'app.stock.quotes.nasdaq.*'.
Now, while keeping the existing Server and Client running, change that property value to 'app.stock.quotes.nyse.*' and start a second Client instance.
You should see that the first client still receives NASDAQ quotes while the second client receives NYSE quotes.
You could instead change the pattern to get all stocks or even an individual ticker.
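To see how such routing patterns select quotes, the following plain-Java sketch applies the topic-exchange matching rules: routing keys are dot-separated words, * stands for exactly one word, and # stands for zero or more words. The TopicPattern class is a hypothetical helper and not part of the sample or of Spring AMQP. The routing keys and ticker symbols in main are assumed for illustration, and keys with empty words are not handled.

```java
// Hypothetical helper that mimics topic-exchange pattern matching; for illustration only.
final class TopicPattern {

    private TopicPattern() {
    }

    // Returns true if the dot-separated routing key matches the binding pattern.
    static boolean matches(String pattern, String routingKey) {
        return matches(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean matches(String[] pattern, int p, String[] key, int k) {
        if (p == pattern.length) {
            // The pattern is exhausted: it matches only if the key is exhausted too.
            return k == key.length;
        }
        if (pattern[p].equals("#")) {
            // '#' may absorb zero or more words, so try every possible continuation point.
            for (int next = k; next <= key.length; next++) {
                if (matches(pattern, p + 1, key, next)) {
                    return true;
                }
            }
            return false;
        }
        if (k == key.length) {
            // Pattern words remain but the key has no words left.
            return false;
        }
        // '*' matches any single word; otherwise the words must be identical.
        return (pattern[p].equals("*") || pattern[p].equals(key[k]))
                && matches(pattern, p + 1, key, k + 1);
    }

    public static void main(String[] args) {
        String nasdaqOnly = "app.stock.quotes.nasdaq.*";
        String allStocks = "app.stock.quotes.#";
        System.out.println(matches(nasdaqOnly, "app.stock.quotes.nasdaq.AAPL"));
        System.out.println(matches(nasdaqOnly, "app.stock.quotes.nyse.IBM"));
        System.out.println(matches(allStocks, "app.stock.quotes.nyse.IBM"));
    }
}
```

Under these rules, a pattern ending in # would cover every market, while replacing the trailing * with a specific symbol would narrow the binding to a single ticker.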
The final feature we explore is the request-reply interaction from the client’s perspective.
Recall that we have already seen the ServerHandler that accepts TradeRequest objects and returns TradeResponse objects.
The corresponding code on the Client side is RabbitStockServiceGateway in the org.springframework.amqp.rabbit.stocks.gateway package.
It delegates to the RabbitTemplate in order to send messages.
The following listing shows the send method:
public void send(TradeRequest tradeRequest) {
    getRabbitTemplate().convertAndSend(tradeRequest, new MessagePostProcessor() {
        public Message postProcessMessage(Message message) throws AmqpException {
            // Tell the server which queue to send the reply to.
            message.getMessageProperties().setReplyToAddress(new Address(defaultReplyToQueue));
            // Tag the request with a unique id so that the reply can be correlated with it.
            message.getMessageProperties().setCorrelationId(UUID.randomUUID().toString());
            return message;
        }
    });
}
Notice that, prior to sending the message, it sets the replyTo address.
It provides the queue that was generated by the traderJoeQueue bean definition (shown earlier).
The following listing shows the @Bean definition for the StockServiceGateway class itself:
@Bean
public StockServiceGateway stockServiceGateway() {
    RabbitStockServiceGateway gateway = new RabbitStockServiceGateway();
    gateway.setRabbitTemplate(rabbitTemplate());
    gateway.setDefaultReplyToQueue(traderJoeQueue());
    return gateway;
}
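The correlation id set by the send method matters once several requests are outstanding at the same time: replies arrive on one reply queue, and the id is what ties each reply back to its request. The following plain-Java sketch shows one way a client could do that bookkeeping. The ReplyCorrelator class and its methods are hypothetical and are not taken from the sample, which may handle replies differently.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical bookkeeping for request-reply correlation; not part of the sample.
final class ReplyCorrelator {

    // Outstanding requests, keyed by the correlation id that was sent with each one.
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Generates a unique id to attach to an outgoing request.
    String newCorrelationId() {
        return UUID.randomUUID().toString();
    }

    // Registers an outstanding request and returns a future for its eventual reply.
    CompletableFuture<String> expectReply(String correlationId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(correlationId, future);
        return future;
    }

    // Called for each message taken from the reply queue.
    // Returns false if no outstanding request carries that correlation id.
    boolean onReply(String correlationId, String body) {
        CompletableFuture<String> future = pending.remove(correlationId);
        if (future == null) {
            return false;
        }
        future.complete(body);
        return true;
    }

    public static void main(String[] args) {
        ReplyCorrelator correlator = new ReplyCorrelator();
        String id = correlator.newCorrelationId();
        CompletableFuture<String> reply = correlator.expectReply(id);
        correlator.onReply(id, "trade confirmed");
        System.out.println("Reply: " + reply.join());
    }
}
```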
If you are no longer running the server and client, start them now. Try sending a request with the format of '100 TCKR'. After a brief artificial delay that simulates “processing” of the request, you should see a confirmation message appear on the client.
Receiving JSON from Non-Spring Applications
Spring applications, when sending JSON, set the __TypeId__ header to the fully qualified class name to assist the receiving application in converting the JSON back to a Java object.
The spring-rabbit-json sample explores several techniques to convert the JSON from a non-Spring application.
See also the Jackson2JsonMessageConverter section as well as the Javadoc for the DefaultClassMapper.
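The underlying problem can be sketched in plain Java: a receiver has to decide which class to deserialize into, and a non-Spring sender either omits the type header or sends an id that is not a Java class name. The TypeIdResolver class below is a hypothetical illustration of two common fallbacks (an explicit id-to-class mapping and a default type). It assumes the header is named __TypeId__ and is not the DefaultClassMapper implementation or one of the sample's techniques.

```java
import java.util.Map;

// Hypothetical resolver for the target type of an incoming JSON message; illustration only.
final class TypeIdResolver {

    private static final String TYPE_ID_HEADER = "__TypeId__"; // assumed header name

    private final Map<String, Class<?>> idToClass;
    private final Class<?> defaultType;

    TypeIdResolver(Map<String, Class<?>> idToClass, Class<?> defaultType) {
        this.idToClass = idToClass;
        this.defaultType = defaultType;
    }

    // Picks the class to deserialize into, based on the message headers.
    Class<?> resolve(Map<String, ?> headers) {
        Object typeId = headers.get(TYPE_ID_HEADER);
        if (typeId == null) {
            // The sender set no type header, so fall back to the configured default.
            return defaultType;
        }
        Class<?> mapped = idToClass.get(typeId.toString());
        if (mapped == null) {
            // Unknown ids are rejected rather than loaded by name from untrusted input.
            throw new IllegalArgumentException("Unmapped type id: " + typeId);
        }
        return mapped;
    }

    public static void main(String[] args) {
        // StringBuilder and String stand in for real payload classes here.
        TypeIdResolver resolver = new TypeIdResolver(Map.of("trade", StringBuilder.class), String.class);
        System.out.println(resolver.resolve(Map.of("__TypeId__", "trade")));
        System.out.println(resolver.resolve(Map.of()));
    }
}
```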