5. Messaging
Spring Cloud AWS provides Amazon SQS and Amazon SNS integration that simplifies the publication and consumption of messages over SQS or SNS. While SQS fully relies on the messaging API introduced with Spring 4.0, SNS only partially implements it as the receiving part must be handled differently for push notifications.
5.1. Configuring messaging
Before using the messaging support, the application has to include the respective module dependency in its Maven configuration. Spring Cloud AWS messaging support comes as a separate module so that applications can include only the modules they need.
5.1.1. Maven dependency configuration
The Spring Cloud AWS messaging module comes as a standalone module and can be imported with the following dependency declaration:
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-aws-messaging</artifactId>
    <version>{spring-cloud-version}</version>
</dependency>
5.2. SQS support
Amazon SQS is a hosted messaging service on the Amazon Web Services platform that provides point-to-point communication with queues. Compared to JMS or other message services, Amazon SQS has several features and limitations that should be taken into consideration.
- Amazon SQS allows only String payloads, so any Object must be transformed into a String representation. Spring Cloud AWS has dedicated support to transfer Java objects with Amazon SQS messages by converting them to JSON.
- Amazon SQS has no transaction support, so messages might be retrieved twice. Applications have to be written in an idempotent way so that they can safely receive a message twice.
- Amazon SQS has a maximum message size of 256 KB per message, so bigger messages will fail to be sent.
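Because a message might be delivered twice, a consumer can guard its processing logic with a duplicate check. The following is a minimal sketch of that idea in plain Java. The IdempotentMessageProcessor class is hypothetical and not part of Spring Cloud AWS, and it assumes that each message carries a stable ID and that an in-memory set is sufficient (a real application would likely need a durable store).

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical helper (not part of Spring Cloud AWS): skips messages whose ID was already seen.
class IdempotentMessageProcessor {

    // IDs of messages that have already been handed to the delegate (in-memory only).
    private final Set<String> processedIds = ConcurrentHashMap.newKeySet();
    private final Consumer<String> delegate;

    IdempotentMessageProcessor(Consumer<String> delegate) {
        this.delegate = delegate;
    }

    // Returns true if the payload was processed, false if the message ID is a duplicate.
    boolean process(String messageId, String payload) {
        if (!processedIds.add(messageId)) {
            return false; // duplicate delivery, nothing to do
        }
        delegate.accept(payload);
        return true;
    }
}
```

Note that this sketch marks an ID as processed before the delegate runs, so a failing delegate would not be retried. Whether that is acceptable depends on the application.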
5.2.1. Sending a message
The QueueMessagingTemplate contains many convenience methods to send a message. There are send methods that specify the destination using a QueueMessageChannel object and those that specify the destination using a string which is going to be resolved against the SQS API. The send method that takes no destination argument uses the default destination.
import com.amazonaws.services.sqs.AmazonSQSAsync;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.aws.messaging.core.QueueMessagingTemplate;
import org.springframework.messaging.support.MessageBuilder;

public class SqsQueueSender {

    private final QueueMessagingTemplate queueMessagingTemplate;

    @Autowired
    public SqsQueueSender(AmazonSQSAsync amazonSQSAsync) {
        this.queueMessagingTemplate = new QueueMessagingTemplate(amazonSQSAsync);
    }

    public void send(String message) {
        this.queueMessagingTemplate.send("physicalQueueName", MessageBuilder.withPayload(message).build());
    }
}
This example uses the MessageBuilder class to create a message with a string payload. The QueueMessagingTemplate is constructed by passing a reference to the AmazonSQSAsync client. The destination in the send method is a string value that must match the queue name defined on AWS. This value will be resolved at runtime by the Amazon SQS client. Optionally a ResourceIdResolver implementation can be passed to the QueueMessagingTemplate constructor to resolve resources by logical name when running inside a CloudFormation stack (see Managing cloud environments for more information about resource name resolution).
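Since messages above the SQS size limit fail to be sent, a sender may want to check the payload size before calling send. The sketch below is plain Java and not part of Spring Cloud AWS. The SqsPayloadSize class is hypothetical, and it assumes that the 256 KB limit means 256 * 1024 bytes of UTF-8 encoded payload and ignores any message attributes.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper (not part of Spring Cloud AWS): pre-checks the payload size.
class SqsPayloadSize {

    // Assumption: the 256 KB limit is interpreted as 256 * 1024 bytes.
    static final int MAX_BYTES = 256 * 1024;

    // Returns true if the UTF-8 encoded payload fits within the assumed limit.
    static boolean fitsInSingleMessage(String payload) {
        return payload.getBytes(StandardCharsets.UTF_8).length <= MAX_BYTES;
    }
}
```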
With the messaging namespace a QueueMessagingTemplate
can be defined in an XML configuration file.
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:aws-context="http://www.springframework.org/schema/cloud/aws/context"
    xmlns:aws-messaging="http://www.springframework.org/schema/cloud/aws/messaging"
    xmlns="http://www.springframework.org/schema/beans"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
        https://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/cloud/aws/context
        http://www.springframework.org/schema/cloud/aws/context/spring-cloud-aws-context.xsd
        http://www.springframework.org/schema/cloud/aws/messaging
        http://www.springframework.org/schema/cloud/aws/messaging/spring-cloud-aws-messaging.xsd">

    <aws-context:context-credentials>
        <aws-context:instance-profile-credentials />
    </aws-context:context-credentials>

    <aws-messaging:queue-messaging-template id="queueMessagingTemplate" />

</beans>
In this example the messaging namespace handler constructs a new QueueMessagingTemplate. The AmazonSQSAsync client is automatically created and passed to the template's constructor based on the provided credentials. If the application runs inside a configured CloudFormation stack a ResourceIdResolver is passed to the constructor (see Managing cloud environments for more information about resource name resolution).
Using message converters
In order to facilitate the sending of domain model objects, the QueueMessagingTemplate has various send methods that take a Java object as an argument for a message's data content. The overloaded methods convertAndSend() and receiveAndConvert() in QueueMessagingTemplate delegate the conversion process to an instance of the MessageConverter interface. This interface defines a simple contract to convert between Java objects and SQS messages. The default implementation SimpleMessageConverter simply unwraps the message payload as long as it matches the target type. By using the converter, you and your application code can focus on the business object that is being sent or received via SQS and not be concerned with the details of how it is represented as an SQS message.
As SQS is only able to send |
It is recommended to use the XML messaging namespace to create the QueueMessagingTemplate, as it will set a more sophisticated MessageConverter that converts objects into JSON when Jackson is on the classpath.
<aws-messaging:queue-messaging-template id="queueMessagingTemplate" />
this.queueMessagingTemplate.convertAndSend("queueName", new Person("John", "Doe"));
In this example a QueueMessagingTemplate is created using the messaging namespace. The convertAndSend method converts the payload Person using the configured MessageConverter and sends the message.
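To make the conversion contract more concrete, the following plain Java sketch converts a Person to a JSON string payload and back. It is only an illustration: PayloadConverter and PersonJsonConverter are hypothetical names and not the Spring MessageConverter API, the Person class with its firstName and lastName fields is assumed, and the hand-written JSON handling assumes names without quotes or backslashes. With the messaging namespace, a Jackson-based converter is expected to do this work instead.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Assumed domain class for illustration only.
class Person {
    final String firstName;
    final String lastName;

    Person(String firstName, String lastName) {
        this.firstName = firstName;
        this.lastName = lastName;
    }
}

// Hypothetical contract: converts between a Java object and a String payload.
interface PayloadConverter<T> {
    String toPayload(T value);
    T fromPayload(String payload);
}

// Hand-written JSON conversion; assumes names contain no quotes or backslashes.
class PersonJsonConverter implements PayloadConverter<Person> {

    private static final Pattern JSON = Pattern.compile(
            "\\{\"firstName\":\"([^\"]*)\",\"lastName\":\"([^\"]*)\"\\}");

    @Override
    public String toPayload(Person person) {
        return "{\"firstName\":\"" + person.firstName + "\",\"lastName\":\"" + person.lastName + "\"}";
    }

    @Override
    public Person fromPayload(String payload) {
        Matcher matcher = JSON.matcher(payload);
        if (!matcher.matches()) {
            throw new IllegalArgumentException("Unexpected payload: " + payload);
        }
        return new Person(matcher.group(1), matcher.group(2));
    }
}
```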
5.2.2. Receiving a message
There are two ways to receive SQS messages: either use the receive methods of the QueueMessagingTemplate or use annotation-driven listener endpoints. The latter is by far the more convenient way to receive messages.
Person person = this.queueMessagingTemplate.receiveAndConvert("queueName", Person.class);
In this example the QueueMessagingTemplate will get one message from the SQS queue and convert it to the target class passed as argument.
5.2.3. Annotation-driven listener endpoints
Annotation-driven listener endpoints are the easiest way to listen for SQS messages. Simply annotate methods with @SqsListener and the QueueMessageHandler will route the messages to the annotated methods.
<aws-messaging:annotation-driven-queue-listener />
@SqsListener("queueName")
public void queueListener(Person person) {
    // ...
}
In this example a queue listener container is started that polls the SQS queue named queueName, the value passed to the @SqsListener annotation. The incoming messages are converted to the target type and then the annotated method queueListener is invoked.
In addition to the payload, headers can be injected in the listener methods with the @Header or @Headers annotations. @Header is used to inject a specific header value while @Headers injects a Map<String, String> containing all headers.
Only the standard message attributes sent with an SQS message are supported. Custom attributes are currently not supported.
In addition to the provided argument resolvers, custom ones can be registered on the aws-messaging:annotation-driven-queue-listener element using the nested aws-messaging:argument-resolvers element (see example below).
<aws-messaging:annotation-driven-queue-listener>
    <aws-messaging:argument-resolvers>
        <bean class="org.custom.CustomArgumentResolver" />
    </aws-messaging:argument-resolvers>
</aws-messaging:annotation-driven-queue-listener>
By default the SimpleMessageListenerContainer creates a ThreadPoolTaskExecutor with computed values for the core and max pool sizes. The core pool size is set to twice the number of queues and the max pool size is obtained by multiplying the number of queues by the value of the maxNumberOfMessages field. If these default values do not meet the needs of the application, a custom task executor can be set with the task-executor attribute (see example below).
<aws-messaging:annotation-driven-queue-listener task-executor="simpleTaskExecutor" />
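The default sizing rule described above can be worked through with a small calculation. The sketch below is plain Java that only mirrors the rule as stated in this section. ListenerPoolSizing is a hypothetical class, and the actual computation inside SimpleMessageListenerContainer may differ between versions.

```java
// Hypothetical helper that mirrors the sizing rule described in the text.
class ListenerPoolSizing {

    // Core pool size: twice the number of queues.
    static int corePoolSize(int queueCount) {
        return queueCount * 2;
    }

    // Max pool size: number of queues multiplied by maxNumberOfMessages.
    static int maxPoolSize(int queueCount, int maxNumberOfMessages) {
        return queueCount * maxNumberOfMessages;
    }
}
```

For example, under this rule three queues with maxNumberOfMessages set to 10 yield a core pool size of 6 and a max pool size of 30. If that is too many or too few threads for the workload, a custom task executor is the way to override it.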
Message reply
Message listener methods can be annotated with @SendTo to send their return value to another channel. The SendToHandlerMethodReturnValueHandler uses the defined messaging template set on the aws-messaging:annotation-driven-queue-listener element to send the return value. The messaging template must implement the DestinationResolvingMessageSendingOperations interface.
<aws-messaging:annotation-driven-queue-listener send-to-message-template="queueMessagingTemplate"/>
@SqsListener("treeQueue")
@SendTo("leafsQueue")
public List<Leaf> extractLeafs(Tree tree) {
    // ...
}
In this example the extractLeafs method receives messages coming from the treeQueue and returns a List of Leaf objects, which is then sent to the leafsQueue. Note that the aws-messaging:annotation-driven-queue-listener XML element has a send-to-message-template attribute that specifies the QueueMessagingTemplate as the messaging template used to send the return value of the message listener method.
Handling Exceptions
Exceptions thrown inside @SqsListener annotated methods can be handled by methods annotated with @MessageExceptionHandler.
import org.springframework.cloud.aws.messaging.listener.annotation.SqsListener;
import org.springframework.messaging.handler.annotation.MessageExceptionHandler;
import org.springframework.stereotype.Component;

@Component
public class MyMessageHandler {

    @SqsListener("queueName")
    void handle(String message) {
        // ...
        // MyException is an application-defined exception type
        throw new MyException("something went wrong");
    }

    @MessageExceptionHandler(MyException.class)
    void handleException(MyException e) {
        // ...
    }
}
5.2.4. The SimpleMessageListenerContainerFactory
The SimpleMessageListenerContainer can also be configured with Java by creating a bean of type SimpleMessageListenerContainerFactory.
@Bean
public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory(AmazonSQSAsync amazonSqs) {
    SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
    factory.setAmazonSqs(amazonSqs);
    factory.setAutoStartup(false);
    factory.setMaxNumberOfMessages(5);
    // ...
    return factory;
}
5.2.5. Consuming AWS Event messages with Amazon SQS
It is also possible to receive AWS-generated event messages with the SQS message listeners. Because AWS messages do not contain the mime-type header, the Jackson message converter has to be configured with the strictContentTypeMatch property set to false so that it also parses messages without the proper mime type.
The following code shows the configuration of the message converter using the QueueMessageHandlerFactory and a re-configured MappingJackson2MessageConverter:
@Bean
public QueueMessageHandlerFactory queueMessageHandlerFactory() {
    QueueMessageHandlerFactory factory = new QueueMessageHandlerFactory();
    MappingJackson2MessageConverter messageConverter = new MappingJackson2MessageConverter();

    // Set strict content type match to false so messages without a mime type are parsed too
    messageConverter.setStrictContentTypeMatch(false);
    factory.setArgumentResolvers(Collections.<HandlerMethodArgumentResolver>singletonList(
            new PayloadArgumentResolver(messageConverter)));
    return factory;
}
With the configuration above, it is possible to receive event notifications for S3 buckets (and also other event notifications like Elastic Transcoder messages) inside @SqsListener annotated methods as shown below.
@SqsListener("testQueue")
public void receive(S3EventNotification s3EventNotificationRecord) {
    S3EventNotification.S3Entity s3Entity = s3EventNotificationRecord.getRecords().get(0).getS3();
}
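The effect of the strictContentTypeMatch setting can be illustrated with a simplified check. The sketch below is plain Java and not the Spring converter itself. ContentTypeMatcher is a hypothetical class, and the header name contentType and the exact matching rules are assumptions made for illustration.

```java
import java.util.Map;

// Hypothetical, simplified model of a converter's content type check.
class ContentTypeMatcher {

    private final String supportedType;
    private final boolean strictContentTypeMatch;

    ContentTypeMatcher(String supportedType, boolean strictContentTypeMatch) {
        this.supportedType = supportedType;
        this.strictContentTypeMatch = strictContentTypeMatch;
    }

    // Decides whether a message with the given headers would be handled.
    boolean supports(Map<String, String> headers) {
        String contentType = headers.get("contentType"); // assumed header name
        if (contentType == null) {
            // No content type header: only accepted when strict matching is off
            return !strictContentTypeMatch;
        }
        return contentType.equalsIgnoreCase(supportedType);
    }
}
```

In this model a message without a content type header, such as an AWS-generated event, is rejected by a strict matcher and accepted by a lenient one.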
5.3. SNS support
Amazon SNS is a publish-subscribe messaging system that allows clients to publish notifications to a particular topic. Other interested clients may subscribe using different protocols like HTTP/HTTPS, e-mail or an Amazon SQS queue to receive the messages.
The next graphic shows a typical example of an Amazon SNS architecture.
Spring Cloud AWS supports Amazon SNS by providing support to send notifications with a NotificationMessagingTemplate and to receive notifications with the HTTP/HTTPS endpoint using the Spring Web MVC @Controller based programming model. Amazon SQS based subscriptions can be used with the annotation-driven message support that is provided by the Spring Cloud AWS messaging module.
5.3.1. Sending a message
The NotificationMessagingTemplate contains two convenience methods to send a notification. The first one specifies the destination using a String which is going to be resolved against the SNS API. The second one takes no destination argument and uses the default destination. All the usual send methods that are available on the MessageSendingOperations are implemented but are less convenient to send notifications because the subject must be passed as a header.
Currently only |
import com.amazonaws.services.sns.AmazonSNS;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.aws.messaging.core.NotificationMessagingTemplate;

public class SnsNotificationSender {

    private final NotificationMessagingTemplate notificationMessagingTemplate;

    @Autowired
    public SnsNotificationSender(AmazonSNS amazonSns) {
        this.notificationMessagingTemplate = new NotificationMessagingTemplate(amazonSns);
    }

    public void send(String subject, String message) {
        this.notificationMessagingTemplate.sendNotification("physicalTopicName", message, subject);
    }
}
This example constructs a new NotificationMessagingTemplate by passing an AmazonSNS client as argument. In the send method the convenience sendNotification method is used to send a message with subject to an SNS topic. The destination in the sendNotification method is a string value that must match the topic name defined on AWS. This value is resolved at runtime by the Amazon SNS client. Optionally a ResourceIdResolver implementation can be passed to the NotificationMessagingTemplate constructor to resolve resources by logical name when running inside a CloudFormation stack. (See Managing cloud environments for more information about resource name resolution.)
It is recommended to use the XML messaging namespace to create the NotificationMessagingTemplate, as it will automatically configure the SNS client and set up the default converter.
<aws-messaging:notification-messaging-template id="notificationMessagingTemplate" />
5.3.2. Annotation-driven HTTP notification endpoint
SNS supports multiple endpoint types (SQS, e-mail, HTTP, HTTPS). Spring Cloud AWS provides support for HTTP(S) endpoints. SNS sends three types of requests to an HTTP topic listener endpoint, and an annotation is provided for each of them:
- Subscription request → @NotificationSubscriptionMapping
- Notification request → @NotificationMessageMapping
- Unsubscription request → @NotificationUnsubscribeConfirmationMapping
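Amazon SNS marks each HTTP(S) request with an x-amz-sns-message-type header whose value is SubscriptionConfirmation, Notification or UnsubscribeConfirmation. The plain Java sketch below maps that header value to the three request types listed above. SnsRequestType is a hypothetical enum for illustration, and the sketch makes no claim about how the Spring Cloud AWS annotations are implemented internally.

```java
// Hypothetical enum: classifies an SNS HTTP request by its message type header value.
enum SnsRequestType {
    SUBSCRIPTION("SubscriptionConfirmation"),
    NOTIFICATION("Notification"),
    UNSUBSCRIPTION("UnsubscribeConfirmation");

    private final String headerValue;

    SnsRequestType(String headerValue) {
        this.headerValue = headerValue;
    }

    // Resolves the request type from the value of the x-amz-sns-message-type header.
    static SnsRequestType fromHeader(String value) {
        for (SnsRequestType type : values()) {
            if (type.headerValue.equals(value)) {
                return type;
            }
        }
        throw new IllegalArgumentException("Unknown SNS message type: " + value);
    }
}
```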
HTTP endpoints are based on Spring MVC controllers. Spring Cloud AWS added some custom argument resolvers to extract the message and subject out of the notification requests.
@Controller
@RequestMapping("/topicName")
public class NotificationTestController {

    @NotificationSubscriptionMapping
    public void handleSubscriptionMessage(NotificationStatus status) throws IOException {
        // Confirm the subscription to start receiving messages
        status.confirmSubscription();
    }

    @NotificationMessageMapping
    public void handleNotificationMessage(@NotificationSubject String subject, @NotificationMessage String message) {
        // ...
    }

    @NotificationUnsubscribeConfirmationMapping
    public void handleUnsubscribeMessage(NotificationStatus status) {
        // e.g. the client has been unsubscribed and we want to "re-subscribe"
        status.confirmSubscription();
    }
}
Currently it is not possible to define the mapping URL on the method level therefore the |
This example creates a new Spring MVC controller with three methods to handle the three requests listed above. In order to resolve the arguments of the handleNotificationMessage method, a custom argument resolver must be registered. The XML configuration is listed below.
<mvc:annotation-driven>
    <mvc:argument-resolvers>
        <ref bean="notificationResolver" />
    </mvc:argument-resolvers>
</mvc:annotation-driven>

<aws-messaging:notification-argument-resolver id="notificationResolver" />
The aws-messaging:notification-argument-resolver element registers three argument resolvers: NotificationStatusHandlerMethodArgumentResolver, NotificationMessageHandlerMethodArgumentResolver, and NotificationSubjectHandlerMethodArgumentResolver.
5.4. Using CloudFormation
Amazon SQS queues and SNS topics can be configured within a stack and then be used by applications. Spring Cloud AWS also supports the lookup of stack-configured queues and topics by their logical name with the resolution to the physical name. The example below shows an SNS topic and SQS queue configuration inside a CloudFormation template.
"LogicalQueueName": {
    "Type": "AWS::SQS::Queue",
    "Properties": {
    }
},
"LogicalTopicName": {
    "Type": "AWS::SNS::Topic",
    "Properties": {
    }
}
The logical names LogicalQueueName and LogicalTopicName can then be used in the configuration and in the application as shown below:
<aws-messaging:queue-messaging-template default-destination="LogicalQueueName" />
<aws-messaging:notification-messaging-template default-destination="LogicalTopicName" />
@SqsListener("LogicalQueueName")
public void receiveQueueMessages(Person person) {
    // Logical names can also be used with messaging templates
    this.notificationMessagingTemplate.sendNotification("anotherLogicalTopicName", "Message", "Subject");
}
When the logical names are used as in the example above, the stack can be created in different environments without any configuration or code changes inside the application.
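The idea of resolving a logical name to a physical name can be sketched with a simple lookup. The plain Java example below is only an illustration. StackNameResolver is a hypothetical class and not the ResourceIdResolver shipped with Spring Cloud AWS, the physical names used in the usage note are made up, and falling back to the given name when no mapping exists is an assumption.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical resolver: maps logical stack resource names to physical names.
class StackNameResolver {

    private final Map<String, String> physicalNamesByLogicalName;

    StackNameResolver(Map<String, String> physicalNamesByLogicalName) {
        // Defensive copy so later changes to the caller's map have no effect
        this.physicalNamesByLogicalName = new HashMap<>(physicalNamesByLogicalName);
    }

    // Returns the physical name, or the given name unchanged if no mapping exists (assumption).
    String resolve(String name) {
        return physicalNamesByLogicalName.getOrDefault(name, name);
    }
}
```

With a mapping from LogicalQueueName to a made-up physical name such as dev-stack-LogicalQueueName-ABC123, the application code keeps using LogicalQueueName in every environment and only the mapping differs per stack.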