Giriş
Bir çeşit message queue. Amazon tarafından sunulan ilk servislerden birisi. Açıklaması şöyle
The simple query service (SQS) was one of the first services AWS offered.
Açıklaması şöyle
Internally, SQS uses elastic-mq to perform all the operationsSQS provides two types of messaging queues :- FIFO Queue (First In First Out Order)- Standard Queue (Best Effort Ordering)
SQS'in bazı özellikleri şöyle
AWS Simple Queue Service (SQS) offers a reliable, highly scalable serverless hosted queue for storing messages and easily moving data between distributed application components.- Allows relatively smaller message size up to 256 KB- Each message can be processed independently- Auto scales to dynamically increase read throughput by scaling the number of tasks reading from the queue- Provides messaging semantics (such as message-level acknowledgment/fail) and message visibility timeout- 120,000 limit for the number of inflight messages for a standard queue and 20,000 for a FIFO queu
Bazı SQS Attribute'ları şöyle
Naming: A FIFO queue can be identified with the name having an extension of “.fifo”, else it is a Standard QueueVisibility Timeout: This is the length of time that a message received from a queue (by one consumer) will not be visible to the other message consumers. This can be from 0 seconds to max 12 hours.Basically this means, once a message has been popped out of the queue, it will be visible to the consumer only who requested it and will not be visible for the configured time to anyone else. Once, the time period is completed the message goes back to the queue.Message retention period: This is the time period till which the message will be retained in the queue. This can be between 1 minute to max 4 days.Say, we send a message to the queue, then the message will be retained in the queue for the configured retain period and after that it will be automatically deleted.Delivery delay: This is the time period which states that after you make a send message call to queue, the message will be physically present after a certain delay. This can be configured to be in between 0 seconds to 15 minutes.Maximum message size: This is self explanatory. The message size can be configured in between 1 KB to 256 KB.Receive message wait time: This is the maximum amount of time that polling or consumer will wait for messages to become available to receive. The time period is configurable between 0 seconds to 20 secondsContent Based Deduplication: This is an optional configuration only for FIFO queues. If this is enabled, it makes message de-duplication optional.Dead Letter Queue: This is one of the features AWS SQS provides which helps in moving trouble-some messages automatically.Let’s say, a message in the queue is unable to get processed due to some faulty message content and keeps on failing. This message can be moved into a separate queue after some particular retries and can be debugged separately.
Duplicate SQS Messages
Açıklaması şöyle. Yani SQS çift mesajları anlayıp elemek için iyi bir seçenek değil. DynamoDB daha iyi bir seçenek olabilir.
You’re adding messages to a queue and don’t know if you already added a message. This issue can’t be solved with SQS; you would have to either use another service or filter messages before adding them to the queue.
SQS isn’t made for such a use case, so you can’t prevent a queue from accepting items it already holds. If you have this constraint, you need to choose a more complex service like DynamoDB, which allows you to mark documents as unique. Step Functions might also be an option here, but this depends mainly on your use case.
Örnek
SQS + Lambda + Dynamo kullanan bir örnek var. Şeklen şöyle. SQS düğümleri kırmızı renkte simgelerle gösteriliyor.
AWS Kinesis v/s AWS SQS
Açıklaması şöyle
Message retentionKinesis — Stores records for 24 hours by default, can retain streaming data for up to 7 daysSQS — Can configure message retention period from 1 minute to 14 days, default is 4 daysMessage retryKinesis — No delivery guarantee, service consuming data can retry as long as data is present, consumer doesn’t remove data, ordering is also guaranteed in case of re-drive itemsSQS — At-least-once delivery guarantee, continues to republish unacknowledged message until the message reaches max age as per Message Retention PeriodFailure handlingKinesis — No Dead Letter Queue (DLQ) out of the box, every application that consumes the stream has to deal with failure on its ownSQS — If consumer fails to process message within visibility timeout, the message is sent to a configured DLQ and can be received again in SQSNumber of consumersKinesis — Supports multiple consumers capabilities, same data records can be processed at the same time by multiple consumers or different time by the same consumerSQS — Supports only single consumer at a time, once a message has been acknowledged it’s deleted from the queueOrdering of recordsKinesis — Supports in-order processing within a shard however, no kind of ordering between shards can be guaranteedSQS — Standard SQS queue doesn’t guarantee in-order processing however FIFO queues support it but with limitationsRouting of recordsKinesis — Related records can be routed to the same record processorSQS — No support for routingOther AWS services supportKinesis — Can send stream records directly to services such as Amazon S3, Amazon Redshift, Amazon ElasticSearch, Splunk, AWS LambdaSQS — Other services can be integrated through AWS Lambda
Sample use-cases
Açıklaması şöyle
Kinesis — Log and Event Data Collection, Real-time Analytics, Mobile Data Capture, Internet of Things Data FeedSQS — Application integration, Decouple microservices, Decouple live user requests from intensive background work, Batch messages for future processing
Maven
Şu satırı dahil ederiz
<dependency> <groupId>com.amazonaws</groupId> <artifactId>aws-java-sdk-sqs</artifactId> <version>1.12.341</version> </dependency>
Gradle
Şu satırı dahil ederiz
implementation 'io.awspring.cloud:spring-cloud-starter-aws-messaging:2.3.5'
SQS API
AmazonSQS Sınıfı
AmazonSQSClientBuilder.standard() metodu kullanılır
Örnek
Şöyle yaparız
@Configuration public class AWSConfig { public AWSCredentials credentials() { AWSCredentials credentials = new BasicAWSCredentials("accesskey","secretkey"); return credentials; } @Bean public AmazonSQS amazonSQS() { return AmazonSQSClientBuilder .standard() .withEndpointConfiguration(getEndpointConfiguration("http://localhost:4566")) .withCredentials(new AWSStaticCredentialsProvider(credentials())) .build(); } private AwsClientBuilder.EndpointConfiguration getEndpointConfiguration(String url) { return new AwsClientBuilder.EndpointConfiguration(url, Regions.US_EAST_1.getName()); } }
Örnek
Şöyle yaparız
import com.amazonaws.auth.AWSCredentials; import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.services.sqs.AmazonSQS; import com.amazonaws.services.sqs.AmazonSQSClientBuilder; @Configuration public class AWSConfig { @Value("${cloud.aws.access-key}") private String awsAccessKey; @Value("${cloud.aws.secret-key}") private String awsSecretKey; @Bean public AWSCredentials credentials() { return new BasicAWSCredentials(awsAccessKey, awsSecretKey); } } @Configuration public class SQSConfig { private final AWSCredentials credentials; @Value("${cloud.aws.region.static}") private String awsRegion; @Bean public AmazonSQS amazonSQSClient() { return AmazonSQSClientBuilder.standard() .withCredentials(new AWSStaticCredentialsProvider(credentials)) .withRegion(awsRegion) .build(); } }
createQueue metodu
Örnek
Şöyle yaparız
import com.amazonaws.services.sqs.model.*; public CreateQueueResult createQueue(final String queueName) { //TODO: create with createObjectRequest params //CreateQueueRequest createRequest = new CreateQueueRequest(queueName) // .addAttributesEntry("DelaySeconds", "60") // .addAttributesEntry("MessageRetentionPeriod", "86400"); return amazonSQS.createQueue(queueName); }
deleteMessage metodu
Örnek
Şöyle yaparız
void deleteMessage(Message message) { log.info("Deleting message with id {}", message.getMessageId()); amazonSQSClient.deleteMessage( new DeleteMessageRequest(tutorialSQS, message.getReceiptHandle())); log.info("Message with id {} successfully deleted.", message.getMessageId()); }
deleteMessageBatch metodu
Örnek
Şöyle yaparız
import com.amazonaws.services.sqs.AmazonSQS; import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest; import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry; import com.amazonaws.services.sqs.model.DeleteMessageBatchResult; import com.amazonaws.services.sqs.model.DeleteMessageRequest; import com.amazonaws.services.sqs.model.Message; import com.amazonaws.services.sqs.model.ReceiveMessageRequest; @Service public class PurchaseTransactionListenerService { private final AmazonSQS amazonSQSClient; @Value("${cloud.aws.sqs.url}") private String tutorialSQS; @Value("${cloud.aws.sqs.batch-size}") private Integer batchSize; @Value("${cloud.aws.sqs.poll-wait-time-sec}") private Integer pollWaitTimeInSeconds; @Value("${cloud.aws.sqs.parallel-processing}") private boolean isParallelProcessing; @Scheduled(fixedDelayString = "${cloud.aws.sqs.fixed-poll-rate}") public void messageInBatchListener() { ReceiveMessageRequest receiveRequest = new ReceiveMessageRequest(tutorialSQS) .withMaxNumberOfMessages(batchSize) .withWaitTimeSeconds(pollWaitTimeInSeconds); List<Message> messages = amazonSQSClient.receiveMessage(receiveRequest) .getMessages(); List<PurchaseTransactionEntity> purchaseTransactionEntities = messages.stream() .map(this::processMessage) .filter(Objects::nonNull) .collect(Collectors.toList()); List<Message> processed = messages.stream() .filter(m -> Boolean.parseBoolean(m.getAttributes().get("processed"))).toList(); deleteMessagesBatch(processed); } private PurchaseTransactionEntity processMessage(Message message) { try { String body = message.getBody(); PurchaseTransactionEntity purchaseTransactionEntity = mapper.readValue(body, PurchaseTransactionEntity.class); //Do processing here //Mark the message as processed message.addAttributesEntry("processed", "true"); return purchaseTransactionEntity; } catch (Exception ex) { // Return null so message won't be deleted from SQS return null; } } private void deleteMessagesBatch(List<Message> messages) { List<DeleteMessageBatchRequestEntry> entries = messages.stream() .map(msg -> new DeleteMessageBatchRequestEntry(msg.getMessageId(),msg.getReceiptHandle())) .collect(Collectors.toList()); // batch delete can return success, but individual messages can fail, //we have to retry failed messages DeleteMessageBatchResult deleteMessageBatchResult = amazonSQSClient .deleteMessageBatch(new DeleteMessageBatchRequest(tutorialSQS, entries)); deleteMessageBatchResult.getFailed() .forEach(m -> amazonSQSClient.deleteMessage( new DeleteMessageRequest(tutorialSQS, messages.get(Integer.parseInt(m.getId())).getReceiptHandle()))); } }
Örnek
Şöyle yaparız
public DeleteQueueResult removeQueue(String queueName) { return amazonSQS.deleteQueue(queueName); }
listQueues metodu
Örnek
Şöyle yaparız
import com.amazonaws.services.sqs.model.*; public ListQueuesResult listQueues() { return amazonSQS.listQueues(); }
receiveMessage metodu
Örnek
Şöyle yaparız
import com.amazonaws.services.sqs.AmazonSQS; public List<Message> receiveMessages(final String queueUrl) { ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(); receiveMessageRequest.setQueueUrl(queueUrl); receiveMessageRequest.setWaitTimeSeconds(5); receiveMessageRequest.setMaxNumberOfMessages(5); return amazonSQS.receiveMessage(receiveMessageRequest).getMessages(); }
sendMessage metodu
Örnek
Şöyle yaparız
import com.amazonaws.services.sqs.AmazonSQS; import com.amazonaws.services.sqs.model.*; public SendMessageResult publishMessage(String queueUrl, String message) { ObjectMapper objectMapper = new ObjectMapper(); SendMessageRequest sendMessageRequest = null; try { sendMessageRequest = new SendMessageRequest().withQueueUrl(queueUrl) .withMessageBody(objectMapper.writeValueAsString(message)) .withMessageDeduplicationId(UUID.randomUUID().toString()); return amazonSQS.sendMessage(sendMessageRequest); } catch (JsonProcessingException e) { ... } catch (Exception e) { ... } return null; }
AmazonSQSAsync Sınıfı
constructor
AmazonSQSAsyncClientBuilder.standard() metodu kullanılırÖrnek
Şöyle yaparız
import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.BasicAWSCredentials; import com.amazonaws.regions.Regions; import com.amazonaws.services.sqs.AmazonSQSAsync; import com.amazonaws.services.sqs.AmazonSQSAsyncClientBuilder; import io.awspring.cloud.messaging.core.QueueMessagingTemplate; @Configuration public class SQSConfig { @Value("${cloud.aws.region.static}") private String region; @Value("${cloud.aws.credentials.access-key}") private String accessKey; @Value("${cloud.aws.credentials.secret-key}") private String secretKey; @Bean public QueueMessagingTemplate queueMessagingTemplate() { return new QueueMessagingTemplate(amazonSQSAsync()); } @Bean public AmazonSQSAsync amazonSQSAsync() { return AmazonSQSAsyncClientBuilder .standard() .withRegion(region) .withCredentials( new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKey, secretKey))) .build(); } }
Örnek
Mesajların niteliklerini (attribute) almak için şöyle yaparız
sqs.receiveMessage(receiveMessageRequest.withMessageAttributeNames(“myAttribute”)).getMessages()
Açıklaması şöyle. Yani SQS üzerinden geçen her trafik için para ödeniyor. Bu yüzden attribute'ları teker teker çekebilme imkanı var. Sadece hangilerini çektiğimizi veya çekmediğimizi hatırlamak gerekiyor.
For background, AWS services in general, and SQS in particular, are optimized to lower traffic because you pay for all data you want to get out of AWS. If you used an SQS queue from outside of AWS, you would like to make sure you only get the data you use to save money and increase throughput.QueueMessageChannel Sınıfı
The AWS SDK lets you define which attributes of the message you need to get this optimization going. But later, when you access attributes, you have to keep in mind which you have fetched; otherwise, you will find something akin to a null-pointer exception: accessing data that isn’t there.
Mesaj gönderir.
send metodu
Örnek
Şöyle yaparız
import com.amazonaws.services.sqs.AmazonSQSAsync; import com.amazonaws.services.sqs.AmazonSQSClient; import com.amazonaws.services.sqs.model.SendMessageRequest; import io.awspring.cloud.messaging.core.QueueMessageChannel; import io.awspring.cloud.messaging.listener.SqsMessageDeletionPolicy; import io.awspring.cloud.messaging.listener.annotation.SqsListener; @RestController public class SQSController { private String queueUrl = "---YOUR_QUEUE_URL---"; @Autowired private final AmazonSQSAsync amazonSqs; @Autowired public SQSController(AmazonSQSAsync amazonSQSAsync) { this.amazonSqs = amazonSQSAsync; } //listener @SqsListener(value = "---YOUR_QUEUE_NAME---", deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS) public void processMessage(String message) { System.out.print("Message from SQS {" + message + "}"); } //add new message in queue @PostMapping("send-message") public ResponseEntity<Object> sendMessage(@RequestBody Map<String,String> msg){ MessageChannel messageChannel = new QueueMessageChannel(amazonSqs, queueUrl); Message msgs = MessageBuilder.withPayload(msg.get("msg")) .build(); long waitTimeoutMillis = 5_000; boolean sentStatus = messageChannel.send(msgs,waitTimeoutMillis); return new ResponseEntity<>("Message Send Successfully", HttpStatus.OK); } }
Hiç yorum yok:
Yorum Gönder