5. Messaging

Spring Cloud AWS provides Amazon SQS and Amazon SNS integration that simplifies the publication and consumption of messages over SQS or SNS. While SQS fully relies on the messaging API introduced with Spring 4.0, SNS only partially implements it as the receiving part must be handled differently for push notifications.

5.1. Configuring messaging

Before using and configuring the messaging support, the application has to include the respective module dependency in its Maven configuration. Spring Cloud AWS messaging support ships as a separate module so that applications can pull in only the modules they need.

5.1.1. Maven dependency configuration

The Spring Cloud AWS messaging module comes as a standalone module and can be imported with the following dependency declaration:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-aws-messaging</artifactId>
    <version>{spring-cloud-version}</version>
</dependency>

5.2. SQS support

Amazon SQS is a hosted messaging service on the Amazon Web Services platform that provides point-to-point communication with queues. Compared to JMS or other message services, Amazon SQS has several features and limitations that should be taken into consideration.

  • Amazon SQS allows only String payloads, so any Object must be transformed into a String representation. Spring Cloud AWS has dedicated support to transfer Java objects with Amazon SQS messages by converting them to JSON.

  • Amazon SQS has no transaction support, so messages might be retrieved twice. Applications have to be written in an idempotent way so that they can safely receive the same message twice.

  • Amazon SQS has a maximum message size of 256 KB per message, so bigger messages will fail to be sent.
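
The last two constraints can be handled in application code. The following is a minimal sketch in plain Java, not part of Spring Cloud AWS. The class names SqsPayloadLimits and IdempotentReceiver are hypothetical, the 256 KB limit is taken from the list above and interpreted as 256 * 1024 bytes of UTF-8 text, and an in-memory set stands in for whatever durable store a real application would use to remember processed message IDs.

```java
import java.nio.charset.StandardCharsets;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical helper: checks a String payload against the size limit stated above.
final class SqsPayloadLimits {

    // 256 KB, interpreted here as 256 * 1024 bytes (assumption of this sketch).
    static final int MAX_MESSAGE_BYTES = 256 * 1024;

    private SqsPayloadLimits() {
    }

    // Counts UTF-8 bytes rather than characters, since the limit is a size limit.
    static boolean fitsInSingleMessage(String payload) {
        return payload.getBytes(StandardCharsets.UTF_8).length <= MAX_MESSAGE_BYTES;
    }
}

// Hypothetical wrapper that makes a handler tolerant of duplicate deliveries.
final class IdempotentReceiver {

    // In-memory only; a real application would likely use a durable store.
    private final Set<String> processedMessageIds = ConcurrentHashMap.newKeySet();
    private final Consumer<String> handler;

    IdempotentReceiver(Consumer<String> handler) {
        this.handler = handler;
    }

    // Returns true if the handler ran, false if the message ID was seen before.
    boolean receive(String messageId, String payload) {
        if (!processedMessageIds.add(messageId)) {
            return false;
        }
        try {
            handler.accept(payload);
        } catch (RuntimeException e) {
            // Forget the ID again so that a redelivery can be processed.
            processedMessageIds.remove(messageId);
            throw e;
        }
        return true;
    }
}
```

A sender could call SqsPayloadLimits.fitsInSingleMessage before sending, and a listener could route every incoming message through IdempotentReceiver.receive so that a second delivery of the same message ID is ignored.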

5.2.1. Sending a message

The QueueMessagingTemplate contains many convenience methods to send a message. There are send methods that specify the destination using a QueueMessageChannel object and those that specify the destination using a string which is going to be resolved against the SQS API. The send method that takes no destination argument uses the default destination.

import com.amazonaws.services.sqs.AmazonSQSAsync;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.aws.messaging.core.QueueMessagingTemplate;
import org.springframework.messaging.support.MessageBuilder;

public class SqsQueueSender {

    private final QueueMessagingTemplate queueMessagingTemplate;

    @Autowired
    public SqsQueueSender(AmazonSQSAsync amazonSQSAsync) {
        this.queueMessagingTemplate = new QueueMessagingTemplate(amazonSQSAsync);
    }

    public void send(String message) {
        this.queueMessagingTemplate.send("physicalQueueName", MessageBuilder.withPayload(message).build());
    }
}

This example uses the MessageBuilder class to create a message with a string payload. The QueueMessagingTemplate is constructed by passing a reference to the AmazonSQSAsync client. The destination in the send method is a string value that must match the queue name defined on AWS. This value will be resolved at runtime by the Amazon SQS client. Optionally a ResourceIdResolver implementation can be passed to the QueueMessagingTemplate constructor to resolve resources by logical name when running inside a CloudFormation stack (see Managing cloud environments for more information about resource name resolution).

With the messaging namespace a QueueMessagingTemplate can be defined in an XML configuration file.

<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:aws-context="http://www.springframework.org/schema/cloud/aws/context"
    xmlns:aws-messaging="http://www.springframework.org/schema/cloud/aws/messaging"
    xmlns="http://www.springframework.org/schema/beans"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
        https://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/cloud/aws/context
        http://www.springframework.org/schema/cloud/aws/context/spring-cloud-aws-context.xsd
        http://www.springframework.org/schema/cloud/aws/messaging
        http://www.springframework.org/schema/cloud/aws/messaging/spring-cloud-aws-messaging.xsd">

    <aws-context:context-credentials>
        <aws-context:instance-profile-credentials />
    </aws-context:context-credentials>

    <aws-messaging:queue-messaging-template id="queueMessagingTemplate" />

</beans>

In this example the messaging namespace handler constructs a new QueueMessagingTemplate. The AmazonSQSAsync client is automatically created and passed to the template’s constructor based on the provided credentials. If the application runs inside a configured CloudFormation stack a ResourceIdResolver is passed to the constructor (see Managing cloud environments for more information about resource name resolution).

Using message converters

In order to facilitate the sending of domain model objects, the QueueMessagingTemplate has various send methods that take a Java object as an argument for a message’s data content. The overloaded methods convertAndSend() and receiveAndConvert() in QueueMessagingTemplate delegate the conversion process to an instance of the MessageConverter interface. This interface defines a simple contract to convert between Java objects and SQS messages. The default implementation SimpleMessageConverter simply unwraps the message payload as long as it matches the target type. By using the converter, you and your application code can focus on the business object that is being sent or received via SQS and not be concerned with the details of how it is represented as an SQS message.

As SQS is only able to send String payloads, the default converter SimpleMessageConverter should only be used to send String payloads. For more complex objects a custom converter should be used, like the one created by the messaging namespace handler.
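
To illustrate why a converter that only unwraps the payload is limited to String payloads, the sketch below models the behaviour described above in plain Java. It is not the Spring SimpleMessageConverter itself; PassThroughConverter is a hypothetical name, and Person is a hypothetical domain class matching the one used in the examples of this section.

```java
// Hypothetical stand-in for a converter that only unwraps the payload.
final class PassThroughConverter {

    private PassThroughConverter() {
    }

    // Returns the payload if it already has the requested type, otherwise null.
    static <T> T fromMessage(Object payload, Class<T> targetType) {
        return targetType.isInstance(payload) ? targetType.cast(payload) : null;
    }

    // Without any conversion step, only a String can be used as an SQS payload.
    static boolean canSendUnchanged(Object payload) {
        return payload instanceof String;
    }
}

// Hypothetical domain object, used only for this illustration.
final class Person {

    final String firstName;
    final String lastName;

    Person(String firstName, String lastName) {
        this.firstName = firstName;
        this.lastName = lastName;
    }
}
```

In this model a Person can neither be sent unchanged nor be recovered from a String payload, which is the gap a JSON-capable converter fills.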

It is recommended to use the XML messaging namespace to create QueueMessagingTemplate as it will set a more sophisticated MessageConverter that converts objects into JSON when Jackson is on the classpath.

<aws-messaging:queue-messaging-template id="queueMessagingTemplate" />

this.queueMessagingTemplate.convertAndSend("queueName", new Person("John", "Doe"));

In this example a QueueMessagingTemplate is created using the messaging namespace. The convertAndSend method converts the payload Person using the configured MessageConverter and sends the message.

5.2.2. Receiving a message

There are two ways to receive SQS messages: either use the receive methods of the QueueMessagingTemplate or use annotation-driven listener endpoints. The latter is by far the more convenient way to receive messages.

Person person = this.queueMessagingTemplate.receiveAndConvert("queueName", Person.class);

In this example the QueueMessagingTemplate will get one message from the SQS queue and convert it to the target class passed as argument.

5.2.3. Annotation-driven listener endpoints

Annotation-driven listener endpoints are the easiest way to listen for SQS messages. Simply annotate methods with @SqsListener and the QueueMessageHandler will route the messages to the annotated methods.

<aws-messaging:annotation-driven-queue-listener />

@SqsListener("queueName")
public void queueListener(Person person) {
    // ...
}

In this example a queue listener container is started that polls the SQS queueName passed to the @SqsListener annotation. The incoming messages are converted to the target type and then the annotated method queueListener is invoked.

In addition to the payload, headers can be injected in the listener methods with the @Header or @Headers annotations. @Header is used to inject a specific header value while @Headers injects a Map<String, String> containing all headers.

Only the standard message attributes sent with an SQS message are supported. Custom attributes are currently not supported.

In addition to the provided argument resolvers, custom ones can be registered on the aws-messaging:annotation-driven-queue-listener element using the nested aws-messaging:argument-resolvers element (see example below).

<aws-messaging:annotation-driven-queue-listener>
    <aws-messaging:argument-resolvers>
        <bean class="org.custom.CustomArgumentResolver" />
    </aws-messaging:argument-resolvers>
</aws-messaging:annotation-driven-queue-listener>

By default the SimpleMessageListenerContainer creates a ThreadPoolTaskExecutor with computed values for the core and max pool sizes. The core pool size is set to twice the number of queues and the max pool size is obtained by multiplying the number of queues by the value of the maxNumberOfMessages field. If these default values do not meet the needs of the application, a custom task executor can be set with the task-executor attribute (see example below).

<aws-messaging:annotation-driven-queue-listener task-executor="simpleTaskExecutor" />
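
As a worked example of the default sizing rule described above, the following plain Java sketch computes the two pool sizes from the number of queues and the maxNumberOfMessages value. ListenerPoolSizes is a hypothetical helper written for this illustration; it follows the rule as stated in the text, and the arithmetic inside the library may differ in detail. The guard that keeps the maximum at or above the core size is an assumption of this sketch.

```java
// Hypothetical helper that mirrors the sizing rule described in the text above.
final class ListenerPoolSizes {

    final int corePoolSize;
    final int maxPoolSize;

    private ListenerPoolSizes(int corePoolSize, int maxPoolSize) {
        this.corePoolSize = corePoolSize;
        this.maxPoolSize = maxPoolSize;
    }

    static ListenerPoolSizes forQueues(int numberOfQueues, int maxNumberOfMessages) {
        if (numberOfQueues < 1 || maxNumberOfMessages < 1) {
            throw new IllegalArgumentException("both values must be at least 1");
        }
        // Core size: twice the number of queues.
        int core = 2 * numberOfQueues;
        // Max size: number of queues multiplied by maxNumberOfMessages.
        int max = numberOfQueues * maxNumberOfMessages;
        // Assumption of this sketch: never report a maximum below the core size.
        return new ListenerPoolSizes(core, Math.max(core, max));
    }
}
```

For three queues and a maxNumberOfMessages value of 10, this yields a core pool size of 6 and a max pool size of 30. If numbers of that order do not suit the workload, a custom task executor is the way to override them.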

Message reply

Message listener methods can be annotated with @SendTo to send their return value to another channel. The SendToHandlerMethodReturnValueHandler uses the defined messaging template set on the aws-messaging:annotation-driven-queue-listener element to send the return value. The messaging template must implement the DestinationResolvingMessageSendingOperations interface.

<aws-messaging:annotation-driven-queue-listener send-to-message-template="queueMessagingTemplate"/>

@SqsListener("treeQueue")
@SendTo("leafsQueue")
public List<Leaf> extractLeafs(Tree tree) {
    // ...
}

In this example the extractLeafs method will receive messages coming from the treeQueue and then return a List of Leafs which is going to be sent to the leafsQueue. Note that on the aws-messaging:annotation-driven-queue-listener XML element there is an attribute send-to-message-template that specifies QueueMessagingTemplate as the messaging template to be used to send the return value of the message listener method.

Handling Exceptions

Exceptions thrown inside @SqsListener annotated methods can be handled by methods annotated with @MessageExceptionHandler.

import org.springframework.cloud.aws.messaging.listener.annotation.SqsListener;
import org.springframework.messaging.handler.annotation.MessageExceptionHandler;
import org.springframework.stereotype.Component;

@Component
public class MyMessageHandler {

    @SqsListener("queueName")
    void handle(String message) {
        // ...
        throw new MyException("something went wrong");
    }

    @MessageExceptionHandler(MyException.class)
    void handleException(MyException e) {
        // ...
    }
}

5.2.4. The SimpleMessageListenerContainerFactory

The SimpleMessageListenerContainer can also be configured with Java by creating a bean of type SimpleMessageListenerContainerFactory.

@Bean
public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory(AmazonSQSAsync amazonSqs) {
    SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
    factory.setAmazonSqs(amazonSqs);
    factory.setAutoStartup(false);
    factory.setMaxNumberOfMessages(5);
    // ...

    return factory;
}

5.2.5. Consuming AWS Event messages with Amazon SQS

It is also possible to receive AWS generated event messages with the SQS message listeners. Because AWS messages do not contain the mime-type header, the Jackson message converter has to be configured with the strictContentTypeMatch property set to false so that it also parses messages without the proper mime type.

The following code shows the configuration of the message converter using the QueueMessageHandlerFactory and a re-configured MappingJackson2MessageConverter.

@Bean
public QueueMessageHandlerFactory queueMessageHandlerFactory() {
    QueueMessageHandlerFactory factory = new QueueMessageHandlerFactory();
    MappingJackson2MessageConverter messageConverter = new MappingJackson2MessageConverter();

    //set strict content type match to false
    messageConverter.setStrictContentTypeMatch(false);
    factory.setArgumentResolvers(Collections.<HandlerMethodArgumentResolver>singletonList(new PayloadArgumentResolver(messageConverter)));
    return factory;
}

With the configuration above, it is possible to receive event notifications for S3 buckets (and also other event notifications like Elastic Transcoder messages) inside @SqsListener annotated methods, as shown below.

@SqsListener("testQueue")
public void receive(S3EventNotification s3EventNotificationRecord) {
    S3EventNotification.S3Entity s3Entity = s3EventNotificationRecord.getRecords().get(0).getS3();
}
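
The effect of the strictContentTypeMatch property described in this section can be modelled in a few lines of plain Java. This is a simplified sketch and not the Spring implementation: ContentTypeCheck is a hypothetical class, and treating only values that start with application/json as JSON is an assumption made to keep the example short.

```java
import java.util.Locale;

// Hypothetical model of the content-type check a JSON message converter performs.
final class ContentTypeCheck {

    private ContentTypeCheck() {
    }

    static boolean canConvert(String contentTypeHeader, boolean strictContentTypeMatch) {
        if (contentTypeHeader == null) {
            // No mime type header: accepted only when strict matching is switched off.
            return !strictContentTypeMatch;
        }
        // Simplification: only plain JSON content types are treated as supported.
        return contentTypeHeader.toLowerCase(Locale.ROOT).startsWith("application/json");
    }
}
```

With strict matching enabled, a message without a mime-type header is rejected before its body is parsed, which is why AWS generated events need the property set to false.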

5.3. SNS support

Amazon SNS is a publish-subscribe messaging system that allows clients to publish notifications to a particular topic. Other interested clients may subscribe using different protocols like HTTP/HTTPS, e-mail or an Amazon SQS queue to receive the messages.

The next graphic shows a typical example of an Amazon SNS architecture.

SNS Overview

Spring Cloud AWS supports Amazon SNS by providing support to send notifications with a NotificationMessagingTemplate and to receive notifications with the HTTP/HTTPS endpoint using the Spring Web MVC @Controller based programming model. Amazon SQS based subscriptions can be used with the annotation-driven message support that is provided by the Spring Cloud AWS messaging module.

5.3.1. Sending a message

The NotificationMessagingTemplate contains two convenience methods to send a notification. The first one specifies the destination using a String which is going to be resolved against the SNS API. The second one takes no destination argument and uses the default destination. All the usual send methods that are available on the MessageSendingOperations are implemented, but they are less convenient for sending notifications because the subject must be passed as a header.

Currently only String payloads can be sent using the NotificationMessagingTemplate as this is the type expected by the SNS API.

import com.amazonaws.services.sns.AmazonSNS;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.aws.messaging.core.NotificationMessagingTemplate;

public class SnsNotificationSender {

    private final NotificationMessagingTemplate notificationMessagingTemplate;

    @Autowired
    public SnsNotificationSender(AmazonSNS amazonSns) {
        this.notificationMessagingTemplate = new NotificationMessagingTemplate(amazonSns);
    }

    public void send(String subject, String message) {
        this.notificationMessagingTemplate.sendNotification("physicalTopicName", message, subject);
    }
}

This example constructs a new NotificationMessagingTemplate by passing an AmazonSNS client as argument. In the send method the convenience sendNotification method is used to send a message with subject to an SNS topic. The destination in the sendNotification method is a string value that must match the topic name defined on AWS. This value is resolved at runtime by the Amazon SNS client. Optionally a ResourceIdResolver implementation can be passed to the NotificationMessagingTemplate constructor to resolve resources by logical name when running inside a CloudFormation stack. (See Managing cloud environments for more information about resource name resolution.)

It is recommended to use the XML messaging namespace to create NotificationMessagingTemplate as it will automatically configure the SNS client to set up the default converter.

<aws-messaging:notification-messaging-template id="notificationMessagingTemplate" />

5.3.2. Annotation-driven HTTP notification endpoint

SNS supports multiple endpoint types (SQS, email, HTTP, HTTPS); Spring Cloud AWS provides support for HTTP(S) endpoints. SNS sends three types of requests to an HTTP topic listener endpoint, and an annotation is provided for each of them:

  • Subscription request → @NotificationSubscriptionMapping

  • Notification request → @NotificationMessageMapping

  • Unsubscription request → @NotificationUnsubscribeConfirmationMapping
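
SNS marks each of these requests with the x-amz-sns-message-type HTTP header, whose value is SubscriptionConfirmation, Notification or UnsubscribeConfirmation. The plain Java sketch below shows how such a header value maps to the three request types. SnsRequestType is a hypothetical enum written for this illustration and is not a Spring Cloud AWS type; the annotation names follow the controller example in this section.

```java
import java.util.Optional;

// Hypothetical enum that maps the SNS message type header to the request types above.
enum SnsRequestType {

    SUBSCRIPTION("SubscriptionConfirmation", "@NotificationSubscriptionMapping"),
    NOTIFICATION("Notification", "@NotificationMessageMapping"),
    UNSUBSCRIBE("UnsubscribeConfirmation", "@NotificationUnsubscribeConfirmationMapping");

    // HTTP header that SNS sets on every request to an HTTP(S) endpoint.
    static final String HEADER_NAME = "x-amz-sns-message-type";

    private final String headerValue;
    private final String annotation;

    SnsRequestType(String headerValue, String annotation) {
        this.headerValue = headerValue;
        this.annotation = annotation;
    }

    // Name of the annotation that handles this request type.
    String annotation() {
        return annotation;
    }

    // Looks up the request type for a header value; empty if the value is unknown.
    static Optional<SnsRequestType> fromHeader(String value) {
        for (SnsRequestType type : values()) {
            if (type.headerValue.equals(value)) {
                return Optional.of(type);
            }
        }
        return Optional.empty();
    }
}
```

For example, SnsRequestType.fromHeader("Notification") yields NOTIFICATION, the request type handled by the method annotated with @NotificationMessageMapping.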

HTTP endpoints are based on Spring MVC controllers. Spring Cloud AWS adds some custom argument resolvers to extract the message and subject out of the notification requests.

@Controller
@RequestMapping("/topicName")
public class NotificationTestController {

    @NotificationSubscriptionMapping
    public void handleSubscriptionMessage(NotificationStatus status) throws IOException {
        // Confirm the subscription to start receiving messages
        status.confirmSubscription();
    }

    @NotificationMessageMapping
    public void handleNotificationMessage(@NotificationSubject String subject, @NotificationMessage String message) {
        // ...
    }

    @NotificationUnsubscribeConfirmationMapping
    public void handleUnsubscribeMessage(NotificationStatus status) {
        //e.g. the client has been unsubscribed and we want to "re-subscribe"
        status.confirmSubscription();
    }
}

Currently it is not possible to define the mapping URL at the method level; therefore the RequestMapping must be declared at the type level and must contain the full path of the endpoint.

This example creates a new Spring MVC controller with three methods to handle the three requests listed above. In order to resolve the arguments of the handleNotificationMessage method a custom argument resolver must be registered. The XML configuration is listed below.

<mvc:annotation-driven>
    <mvc:argument-resolvers>
        <ref bean="notificationResolver" />
    </mvc:argument-resolvers>
</mvc:annotation-driven>

<aws-messaging:notification-argument-resolver id="notificationResolver" />

The aws-messaging:notification-argument-resolver element registers three argument resolvers: NotificationStatusHandlerMethodArgumentResolver, NotificationMessageHandlerMethodArgumentResolver, and NotificationSubjectHandlerMethodArgumentResolver.

5.4. Using CloudFormation

Amazon SQS queues and SNS topics can be configured within a stack and then be used by applications. Spring Cloud AWS also supports the lookup of stack-configured queues and topics by their logical name with the resolution to the physical name. The example below shows an SNS topic and SQS queue configuration inside a CloudFormation template.

"LogicalQueueName": {
    "Type": "AWS::SQS::Queue",
    "Properties": {
    }
},
"LogicalTopicName": {
    "Type": "AWS::SNS::Topic",
    "Properties": {
    }
}

The logical names LogicalQueueName and LogicalTopicName can then be used in the configuration and in the application as shown below:

<aws-messaging:queue-messaging-template default-destination="LogicalQueueName" />

<aws-messaging:notification-messaging-template default-destination="LogicalTopicName" />

@SqsListener("LogicalQueueName")
public void receiveQueueMessages(Person person) {
    // Logical names can also be used with messaging templates
    this.notificationMessagingTemplate.sendNotification("anotherLogicalTopicName", "Message", "Subject");
}

When using the logical names like in the example above, the stack can be created in different environments without any configuration or code changes inside the application.
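
The idea behind logical name resolution can be sketched in plain Java as a lookup from logical to physical names. StackNameResolver is a hypothetical class for illustration only and is not the ResourceIdResolver shipped with Spring Cloud AWS. Returning the input unchanged when it is not a known logical name is an assumption of this sketch, and the physical name in the usage note is invented.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical resolver that maps logical stack names to physical resource names.
final class StackNameResolver {

    private final Map<String, String> physicalNamesByLogicalName;

    StackNameResolver(Map<String, String> physicalNamesByLogicalName) {
        // Defensive copy so later changes to the caller's map have no effect.
        this.physicalNamesByLogicalName = new HashMap<>(physicalNamesByLogicalName);
    }

    // Returns the physical name for a logical name, or the input itself if unknown
    // (assumption: an unknown name is treated as already being a physical name).
    String resolve(String name) {
        return physicalNamesByLogicalName.getOrDefault(name, name);
    }
}
```

With a map entry from LogicalQueueName to an invented physical name such as myStack-LogicalQueueName-1A2B3C, resolve("LogicalQueueName") returns the physical name, while resolve("physicalQueueName") returns its argument unchanged. Each environment only needs a different map; the application code that refers to LogicalQueueName stays the same.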