Giriş
Bir çeşit message queue. Amazon tarafından sunulan ilk servislerden birisi. Açıklaması
şöyle
The simple query service (SQS) was one of the first services AWS offered.
Internally, SQS uses elastic-mq to perform all the operations
SQS provides two types of messaging queues :
- FIFO Queue (First In First Out Order)
- Standard Queue (Best Effort Ordering)
SQS'in bazı özellikleri
şöyle
AWS Simple Queue Service (SQS) offers a reliable, highly scalable serverless hosted queue for storing messages and easily moving data between distributed application components.
- Allows relatively smaller message size up to 256 KB
- Each message can be processed independently
- Auto scales to dynamically increase read throughput by scaling the number of tasks reading from the queue
- Provides messaging semantics (such as message-level acknowledgment/fail) and message visibility timeout
- 120,000 limit for the number of inflight messages for a standard queue and 20,000 for a FIFO queu
Bazı SQS Attribute'ları şöyle
Naming: A FIFO queue can be identified with the name having an extension of “.fifo”, else it is a Standard Queue
Visibility Timeout: This is the length of time that a message received from a queue (by one consumer) will not be visible to the other message consumers. This can be from 0 seconds to max 12 hours.
Basically this means, once a message has been popped out of the queue, it will be visible to the consumer only who requested it and will not be visible for the configured time to anyone else. Once, the time period is completed the message goes back to the queue.
Message retention period: This is the time period till which the message will be retained in the queue. This can be between 1 minute to max 4 days.
Say, we send a message to the queue, then the message will be retained in the queue for the configured retain period and after that it will be automatically deleted.
Delivery delay: This is the time period which states that after you make a send message call to queue, the message will be physically present after a certain delay. This can be configured to be in between 0 seconds to 15 minutes.
Maximum message size: This is self explanatory. The message size can be configured in between 1 KB to 256 KB.
Receive message wait time: This is the maximum amount of time that polling or consumer will wait for messages to become available to receive. The time period is configurable between 0 seconds to 20 seconds
Content Based Deduplication: This is an optional configuration only for FIFO queues. If this is enabled, it makes message de-duplication optional.
Dead Letter Queue: This is one of the features AWS SQS provides which helps in moving trouble-some messages automatically.
Let’s say, a message in the queue is unable to get processed due to some faulty message content and keeps on failing. This message can be moved into a separate queue after some particular retries and can be debugged separately.
Duplicate SQS Messages
Açıklaması
şöyle. Yani SQS çift mesajları anlayıp elemek için iyi bir seçenek değil. DynamoDB daha iyi bir seçenek olabilir.
You’re adding messages to a queue and don’t know if you already added a message. This issue can’t be solved with SQS; you would have to either use another service or filter messages before adding them to the queue.
SQS isn’t made for such a use case, so you can’t prevent a queue from accepting items it already holds. If you have this constraint, you need to choose a more complex service like DynamoDB, which allows you to mark documents as unique. Step Functions might also be an option here, but this depends mainly on your use case.
Örnek
SQS + Lambda + Dynamo kullanan bir örnek var. Şeklen
şöyle. SQS düğümleri kırmızı renkte simgelerle gösteriliyor.
AWS Kinesis v/s AWS SQS
Açıklaması şöyle
Message retention
Kinesis — Stores records for 24 hours by default, can retain streaming data for up to 7 days
SQS — Can configure message retention period from 1 minute to 14 days, default is 4 days
Message retry
Kinesis — No delivery guarantee, service consuming data can retry as long as data is present, consumer doesn’t remove data, ordering is also guaranteed in case of re-drive items
SQS — At-least-once delivery guarantee, continues to republish unacknowledged message until the message reaches max age as per Message Retention Period
Failure handling
Kinesis — No Dead Letter Queue (DLQ) out of the box, every application that consumes the stream has to deal with failure on its own
SQS — If consumer fails to process message within visibility timeout, the message is sent to a configured DLQ and can be received again in SQS
Number of consumers
Kinesis — Supports multiple consumers capabilities, same data records can be processed at the same time by multiple consumers or different time by the same consumer
SQS — Supports only single consumer at a time, once a message has been acknowledged it’s deleted from the queue
Ordering of records
Kinesis — Supports in-order processing within a shard however, no kind of ordering between shards can be guaranteed
SQS — Standard SQS queue doesn’t guarantee in-order processing however FIFO queues support it but with limitations
Routing of records
Kinesis — Related records can be routed to the same record processor
SQS — No support for routing
Other AWS services support
Kinesis — Can send stream records directly to services such as Amazon S3, Amazon Redshift, Amazon ElasticSearch, Splunk, AWS Lambda
SQS — Other services can be integrated through AWS Lambda
Sample use-cases
Kinesis — Log and Event Data Collection, Real-time Analytics, Mobile Data Capture, Internet of Things Data Feed
SQS — Application integration, Decouple microservices, Decouple live user requests from intensive background work, Batch messages for future processing
Maven
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-sqs</artifactId>
<version>1.12.341</version>
</dependency>
Gradle
implementation 'io.awspring.cloud:spring-cloud-starter-aws-messaging:2.3.5'
SQS API
AmazonSQS Sınıfı
AmazonSQSClientBuilder.standard() metodu kullanılır
Örnek
@Configuration
public class AWSConfig {
public AWSCredentials credentials() {
AWSCredentials credentials = new BasicAWSCredentials("accesskey","secretkey");
return credentials;
}
@Bean
public AmazonSQS amazonSQS() {
return AmazonSQSClientBuilder
.standard()
.withEndpointConfiguration(getEndpointConfiguration("http://localhost:4566"))
.withCredentials(new AWSStaticCredentialsProvider(credentials()))
.build();
}
private AwsClientBuilder.EndpointConfiguration getEndpointConfiguration(String url) {
return new AwsClientBuilder.EndpointConfiguration(url, Regions.US_EAST_1.getName());
}
}
Örnek
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
@Configuration
public class AWSConfig {
@Value("${cloud.aws.access-key}")
private String awsAccessKey;
@Value("${cloud.aws.secret-key}")
private String awsSecretKey;
@Bean
public AWSCredentials credentials() {
return new BasicAWSCredentials(awsAccessKey, awsSecretKey);
}
}
@Configuration
public class SQSConfig {
private final AWSCredentials credentials;
@Value("${cloud.aws.region.static}")
private String awsRegion;
@Bean
public AmazonSQS amazonSQSClient() {
return AmazonSQSClientBuilder.standard()
.withCredentials(new AWSStaticCredentialsProvider(credentials))
.withRegion(awsRegion)
.build();
}
}
createQueue metodu
Örnek
import com.amazonaws.services.sqs.model.*;
public CreateQueueResult createQueue(final String queueName) {
//TODO: create with createObjectRequest params
//CreateQueueRequest createRequest = new CreateQueueRequest(queueName)
// .addAttributesEntry("DelaySeconds", "60")
// .addAttributesEntry("MessageRetentionPeriod", "86400");
return amazonSQS.createQueue(queueName);
}
deleteMessage metodu
Örnek
void deleteMessage(Message message) {
log.info("Deleting message with id {}", message.getMessageId());
amazonSQSClient.deleteMessage(
new DeleteMessageRequest(tutorialSQS, message.getReceiptHandle()));
log.info("Message with id {} successfully deleted.", message.getMessageId());
}
deleteMessageBatch metodu
Örnek
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest;
import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry;
import com.amazonaws.services.sqs.model.DeleteMessageBatchResult;
import com.amazonaws.services.sqs.model.DeleteMessageRequest;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
@Service
public class PurchaseTransactionListenerService {
private final AmazonSQS amazonSQSClient;
@Value("${cloud.aws.sqs.url}")
private String tutorialSQS;
@Value("${cloud.aws.sqs.batch-size}")
private Integer batchSize;
@Value("${cloud.aws.sqs.poll-wait-time-sec}")
private Integer pollWaitTimeInSeconds;
@Value("${cloud.aws.sqs.parallel-processing}")
private boolean isParallelProcessing;
@Scheduled(fixedDelayString = "${cloud.aws.sqs.fixed-poll-rate}")
public void messageInBatchListener() {
ReceiveMessageRequest receiveRequest = new ReceiveMessageRequest(tutorialSQS)
.withMaxNumberOfMessages(batchSize)
.withWaitTimeSeconds(pollWaitTimeInSeconds);
List<Message> messages = amazonSQSClient.receiveMessage(receiveRequest)
.getMessages();
List<PurchaseTransactionEntity> purchaseTransactionEntities = messages.stream()
.map(this::processMessage)
.filter(Objects::nonNull)
.collect(Collectors.toList());
List<Message> processed = messages.stream()
.filter(m -> Boolean.parseBoolean(m.getAttributes().get("processed"))).toList();
deleteMessagesBatch(processed);
}
private PurchaseTransactionEntity processMessage(Message message) {
try {
String body = message.getBody();
PurchaseTransactionEntity purchaseTransactionEntity = mapper.readValue(body,
PurchaseTransactionEntity.class);
//Do processing here
//Mark the message as processed
message.addAttributesEntry("processed", "true");
return purchaseTransactionEntity;
} catch (Exception ex) {
// Return null so message won't be deleted from SQS
return null;
}
}
private void deleteMessagesBatch(List<Message> messages) {
List<DeleteMessageBatchRequestEntry> entries = messages.stream()
.map(msg ->
new DeleteMessageBatchRequestEntry(msg.getMessageId(),msg.getReceiptHandle()))
.collect(Collectors.toList());
// batch delete can return success, but individual messages can fail,
//we have to retry failed messages
DeleteMessageBatchResult deleteMessageBatchResult = amazonSQSClient
.deleteMessageBatch(new DeleteMessageBatchRequest(tutorialSQS, entries));
deleteMessageBatchResult.getFailed()
.forEach(m ->
amazonSQSClient.deleteMessage(
new DeleteMessageRequest(tutorialSQS,
messages.get(Integer.parseInt(m.getId())).getReceiptHandle())));
}
}
deleteQueue metoduÖrnek
public DeleteQueueResult removeQueue(String queueName) {
return amazonSQS.deleteQueue(queueName);
}
listQueues metodu
Örnek
import com.amazonaws.services.sqs.model.*;
public ListQueuesResult listQueues() {
return amazonSQS.listQueues();
}
receiveMessage metodu
Örnek
import com.amazonaws.services.sqs.AmazonSQS;
public List<Message> receiveMessages(final String queueUrl) {
ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest();
receiveMessageRequest.setQueueUrl(queueUrl);
receiveMessageRequest.setWaitTimeSeconds(5);
receiveMessageRequest.setMaxNumberOfMessages(5);
return amazonSQS.receiveMessage(receiveMessageRequest).getMessages();
}
sendMessage metodu
Örnek
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.*;
public SendMessageResult publishMessage(String queueUrl, String message) {
ObjectMapper objectMapper = new ObjectMapper();
SendMessageRequest sendMessageRequest = null;
try {
sendMessageRequest = new SendMessageRequest().withQueueUrl(queueUrl)
.withMessageBody(objectMapper.writeValueAsString(message))
.withMessageDeduplicationId(UUID.randomUUID().toString());
return amazonSQS.sendMessage(sendMessageRequest);
} catch (JsonProcessingException e) {
...
} catch (Exception e) {
...
}
return null;
}
AmazonSQSAsync Sınıfı
constructor
AmazonSQSAsyncClientBuilder.standard() metodu kullanılır
Örnek
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.sqs.AmazonSQSAsync;
import com.amazonaws.services.sqs.AmazonSQSAsyncClientBuilder;
import io.awspring.cloud.messaging.core.QueueMessagingTemplate;
@Configuration
public class SQSConfig {
@Value("${cloud.aws.region.static}")
private String region;
@Value("${cloud.aws.credentials.access-key}")
private String accessKey;
@Value("${cloud.aws.credentials.secret-key}")
private String secretKey;
@Bean
public QueueMessagingTemplate queueMessagingTemplate() {
return new QueueMessagingTemplate(amazonSQSAsync());
}
@Bean
public AmazonSQSAsync amazonSQSAsync() {
return AmazonSQSAsyncClientBuilder
.standard()
.withRegion(region)
.withCredentials(
new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKey, secretKey)))
.build();
}
}
Örnek
Mesajların niteliklerini (attribute) almak için şöyle
yaparızsqs.receiveMessage(receiveMessageRequest.withMessageAttributeNames(“myAttribute”))
.getMessages()
Açıklaması
şöyle. Yani SQS üzerinden geçen her trafik için para ödeniyor. Bu yüzden attribute'ları teker teker çekebilme imkanı var. Sadece hangilerini çektiğimizi veya çekmediğimizi hatırlamak gerekiyor.
For background, AWS services in general, and SQS in particular, are optimized to lower traffic because you pay for all data you want to get out of AWS. If you used an SQS queue from outside of AWS, you would like to make sure you only get the data you use to save money and increase throughput.
The AWS SDK lets you define which attributes of the message you need to get this optimization going. But later, when you access attributes, you have to keep in mind which you have fetched; otherwise, you will find something akin to a null-pointer exception: accessing data that isn’t there.
QueueMessageChannel SınıfıMesaj gönderir.
send metodu
Örnek
import com.amazonaws.services.sqs.AmazonSQSAsync;
import com.amazonaws.services.sqs.AmazonSQSClient;
import com.amazonaws.services.sqs.model.SendMessageRequest;
import io.awspring.cloud.messaging.core.QueueMessageChannel;
import io.awspring.cloud.messaging.listener.SqsMessageDeletionPolicy;
import io.awspring.cloud.messaging.listener.annotation.SqsListener;
@RestController
public class SQSController {
private String queueUrl = "---YOUR_QUEUE_URL---";
@Autowired
private final AmazonSQSAsync amazonSqs;
@Autowired
public SQSController(AmazonSQSAsync amazonSQSAsync) {
this.amazonSqs = amazonSQSAsync;
}
//listener
@SqsListener(value = "---YOUR_QUEUE_NAME---",
deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
public void processMessage(String message) {
System.out.print("Message from SQS {" + message + "}");
}
//add new message in queue
@PostMapping("send-message")
public ResponseEntity<Object> sendMessage(@RequestBody Map<String,String> msg){
MessageChannel messageChannel = new QueueMessageChannel(amazonSqs, queueUrl);
Message msgs = MessageBuilder.withPayload(msg.get("msg"))
.build();
long waitTimeoutMillis = 5_000;
boolean sentStatus = messageChannel.send(msgs,waitTimeoutMillis);
return new ResponseEntity<>("Message Send Successfully", HttpStatus.OK);
}
}