Introduction
Kinesis was actually modeled after Kafka: what Kafka calls a partition is called a shard in Kinesis. The explanation is as follows:
Kinesis is a promising technology modeled after Kafka; a partition in the latter is a shard in Kinesis.
A shard has upper limits. The explanation is as follows:
Each shard can support up to 5 transactions per second for reads, up to a maximum total data read rate of 2 MB per second and up to 1,000 records per second for writes, up to a maximum total data write rate of 1 MB per second (including partition keys)
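As a rough worked example (not an official AWS figure), these per-shard quotas scale linearly with the shard count. A minimal sketch, assuming the numbers quoted above; always check the current AWS quotas:

// Illustrative capacity arithmetic for a stream with 3 shards
int shardCount = 3;
int readTransactionsPerSec = 5 * shardCount;    // 15 read transactions/second
int readMegabytesPerSec    = 2 * shardCount;    // 6 MB/second total read
int writeRecordsPerSec     = 1000 * shardCount; // 3,000 records/second write
int writeMegabytesPerSec   = 1 * shardCount;    // 3 MB/second total write (incl. partition keys)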
Some features of Kinesis are as follows:
- It provides ordering of records, as well as the ability to read and/or replay records in the same order.
- Allows record size up to 1 MB
- Works at shard level not message level
- No auto scaling, developer needs to track shard usage and re-shard the Kinesis stream when necessary
- Limited read throughput (5 transactions per second per shard)
- Number of shards in a stream determines the maximum throughput
- Multiple consumers can be attached to a single stream, and each consumer can process every record individually (thanks to shard-iterators)
Using Kinesis requires the KCL client. The explanation is as follows:
Streaming data such as video processing/uploading is a good fit for Kinesis. But not all data/records/events should go into Kinesis. It is not a general purpose enterprise event bus or queue.

A little context: We once joined a project midway, and had to support an ETL pipeline that included Kinesis in the deployment. The application collected small JSON records and stuffed them into Kinesis with the Python boto3 API. On the other side, worker processes running inside EC2/ECS were pulling these records with boto3 and processing them. We then discovered that retrieving records out of Kinesis Streams when you have multiple workers is non-trivial.

Imagine you are running 4 workers, and they are all listening to the same Kinesis stream for events. How do we prevent them from working on the same events? In traditional queueing systems, you use locks and owner columns to ensure each task is handled by only one worker. Amazon SQS will mark a message as busy when one of the workers is working on it, so that other workers do not do duplicate work.

To implement the same pattern with Kinesis Streams, you must use the KCL, which is written in Java (nothing wrong with Java). The KCL agent is spun up as a second daemon process in the background and sends events to your application when it itself receives events. This means: all of your Python/Ruby/Node Docker images that handle records from Kinesis must install Java dependencies and run the KCL Java process in the background.
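For the JVM side, a minimal sketch of what KCL-based consumption looks like, assuming the amazon-kinesis-client 1.x library is on the classpath; the application name, stream name and worker id below are illustrative:

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;

IRecordProcessorFactory factory = () -> new IRecordProcessor() {
  public void initialize(InitializationInput input) { }
  public void processRecords(ProcessRecordsInput input) {
    // Each worker only sees the records of the shards currently leased to it
    input.getRecords().forEach(record -> System.out.println(record.getPartitionKey()));
  }
  public void shutdown(ShutdownInput input) { }
};

// KCL coordinates shard leases between workers through a DynamoDB table
// named after the application name ("my-app" is illustrative)
KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(
    "my-app", "TestStream", new DefaultAWSCredentialsProviderChain(), "worker-1");
Worker worker = new Worker.Builder()
    .recordProcessorFactory(factory)
    .config(config)
    .build();
worker.run(); // blocks; typically started on its own thread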
Kinesis as a Message Queue
SNS or SQS can be preferred instead of Kinesis. The explanation is as follows:
It is both cumbersome and difficult to apply Kinesis stream correctly, especially when you are looking for an enterprise queue. You would do better by using SNS/SQS combination, or using a queuing framework that sits on top of Redis or traditional databases.
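For comparison, a minimal SQS sketch using the aws-java-sdk-sqs client; the queue name is illustrative. SQS hides a message from other consumers while one worker processes it, which is exactly the coordination Kinesis leaves to the KCL:

import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;

AmazonSQS sqs = AmazonSQSClientBuilder.defaultClient();
String queueUrl = sqs.createQueue("my-task-queue").getQueueUrl();

sqs.sendMessage(queueUrl, "{\"task\":\"example\"}");

// While a worker holds a message it stays invisible to other workers
// (visibility timeout), so no extra locking code is needed
ReceiveMessageRequest receive = new ReceiveMessageRequest(queueUrl).withMaxNumberOfMessages(1);
for (Message message : sqs.receiveMessage(receive).getMessages()) {
  System.out.println(message.getBody());
  sqs.deleteMessage(queueUrl, message.getReceiptHandle()); // acknowledge
}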
Kinesis for Data Ingestion or Streaming
This usage is a better fit for Kinesis. However, instead of SQS + Kinesis, Kafka and Apache Flink can be used directly. An example can be found in the article Migration from Amazon SQS and Kinesis to Apache Kafka and Flink.
Unit Test
LocalStackContainer is used. We include the following dependency:
<dependency>
  <groupId>org.testcontainers</groupId>
  <artifactId>localstack</artifactId>
  <version>1.17.3</version>
  <scope>test</scope>
</dependency>
Example
We do it like this:
import org.testcontainers.containers.localstack.LocalStackContainer;
import org.testcontainers.containers.localstack.LocalStackContainer.Service;
import static org.testcontainers.utility.DockerImageName.parse;

// Start a LocalStack container exposing only the Kinesis service
LocalStackContainer localStack = new LocalStackContainer(parse("localstack/localstack")
    .withTag("0.12.3"))
  .withEnv("KINESIS_LATENCY", "1000")
  .withServices(Service.KINESIS);
localStack.start();
Then we do it like this:
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.kinesis.AmazonKinesisAsync;
import com.amazonaws.services.kinesis.AmazonKinesisAsyncClientBuilder;

String endpoint = "http://" + localStack.getHost() + ":" + localStack.getMappedPort(4566);
String region = localStack.getRegion();
String accessKey = localStack.getAccessKey();
String secretKey = localStack.getSecretKey();

AmazonKinesisAsyncClientBuilder builder = AmazonKinesisAsyncClientBuilder.standard();
builder.withEndpointConfiguration(
    new AwsClientBuilder.EndpointConfiguration(endpoint, region));
builder.withCredentials(
    accessKey == null
        ? new DefaultAWSCredentialsProviderChain()
        : new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKey, secretKey)));
builder.withClientConfiguration(new ClientConfiguration());
builder.withExecutorFactory(() -> executorServiceSupplier.get()); // application-provided ExecutorService supplier
AmazonKinesisAsync kinesis = builder.build();
// Clean up after the test
kinesis.shutdown();
localStack.stop();
Kinesis API
We include the following dependency:
<dependency>
  <groupId>com.amazonaws</groupId>
  <artifactId>aws-java-sdk-kinesis</artifactId>
  <version>1.12.276</version>
</dependency>
AmazonKinesisAsync Class
We include the following imports:
import com.amazonaws.SDKGlobalConfiguration;
import com.amazonaws.services.kinesis.AmazonKinesisAsync;
import com.amazonaws.services.kinesis.model.PutRecordsResult;
import com.amazonaws.services.kinesis.model.Shard;
import static com.amazonaws.services.kinesis.model.ShardIteratorType.AFTER_SEQUENCE_NUMBER;
import static com.amazonaws.services.kinesis.model.ShardIteratorType.AT_SEQUENCE_NUMBER;
import static com.amazonaws.services.kinesis.model.ShardIteratorType.AT_TIMESTAMP;
import static com.amazonaws.services.kinesis.model.ShardIteratorType.LATEST;
import static com.amazonaws.services.kinesis.model.ShardIteratorType.TRIM_HORIZON;
createStream method
Example
We do it like this:
import com.amazonaws.services.kinesis.model.CreateStreamRequest;
import com.amazonaws.services.kinesis.model.CreateStreamResult;

CreateStreamRequest request = new CreateStreamRequest();
int shardCount = 3;
request.setShardCount(shardCount);
String stream = "TestStream";
request.setStreamName(stream);
CreateStreamResult result = kinesis.createStream(request);
Example
We do it like this. StreamStatus can be CREATING, DELETING, ACTIVE, or UPDATING.
import com.amazonaws.services.kinesis.model.DescribeStreamSummaryRequest;
import com.amazonaws.services.kinesis.model.StreamDescriptionSummary;
import com.amazonaws.services.kinesis.model.StreamStatus;

DescribeStreamSummaryRequest request = new DescribeStreamSummaryRequest();
request.setStreamName(stream);
StreamDescriptionSummary description = kinesis.describeStreamSummary(request)
    .getStreamDescriptionSummary();
String statusString = description.getStreamStatus();
StreamStatus status = StreamStatus.valueOf(statusString);
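A newly created stream is not usable until it becomes ACTIVE. A minimal polling sketch built on the describeStreamSummary call above; the timeout and sleep values are illustrative, and Thread.sleep's InterruptedException must be handled by the surrounding code:

// Poll until the stream becomes ACTIVE
long deadline = System.currentTimeMillis() + 30_000;
while (System.currentTimeMillis() < deadline) {
  DescribeStreamSummaryRequest describeRequest = new DescribeStreamSummaryRequest();
  describeRequest.setStreamName(stream);
  String current = kinesis.describeStreamSummary(describeRequest)
      .getStreamDescriptionSummary()
      .getStreamStatus();
  if (StreamStatus.ACTIVE.name().equals(current)) {
    break;
  }
  Thread.sleep(500);
}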
listShards method
Example
We do it like this:
import com.amazonaws.services.kinesis.model.ListShardsRequest;
import com.amazonaws.services.kinesis.model.ListShardsResult;
import com.amazonaws.services.kinesis.model.Shard;
import com.amazonaws.services.kinesis.model.ShardFilterType;
import java.util.ArrayList;
import java.util.List;

List<Shard> shards = new ArrayList<>();
String nextToken = null;
do {
  ShardFilterType filterType = ShardFilterType.AT_LATEST; // only the currently open shards
  ListShardsRequest request = ...; // build the request with the stream name, filter and nextToken
  ListShardsResult response = kinesis.listShards(request);
  shards.addAll(response.getShards());
  nextToken = response.getNextToken();
} while (nextToken != null);
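To complete the picture, a hedged end-to-end sketch that writes a few records with putRecords and reads them back through a shard iterator. It reuses the kinesis client, stream name and shards list from the examples above; the partition keys, payloads and limit are illustrative:

import com.amazonaws.services.kinesis.model.GetRecordsRequest;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
import com.amazonaws.services.kinesis.model.PutRecordsRequest;
import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry;
import com.amazonaws.services.kinesis.model.PutRecordsResult;
import com.amazonaws.services.kinesis.model.Record;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Producer: write a small batch of records
List<PutRecordsRequestEntry> entries = new ArrayList<>();
for (int i = 0; i < 3; i++) {
  entries.add(new PutRecordsRequestEntry()
      .withPartitionKey("key-" + i)
      .withData(ByteBuffer.wrap(("record-" + i).getBytes(StandardCharsets.UTF_8))));
}
PutRecordsRequest putRequest = new PutRecordsRequest()
    .withStreamName(stream)
    .withRecords(entries);
PutRecordsResult putResult = kinesis.putRecords(putRequest);
System.out.println("Failed records: " + putResult.getFailedRecordCount());

// Consumer: read each open shard from the beginning through its own iterator
for (Shard shard : shards) {
  GetShardIteratorRequest iteratorRequest = new GetShardIteratorRequest()
      .withStreamName(stream)
      .withShardId(shard.getShardId())
      .withShardIteratorType(TRIM_HORIZON); // statically imported above
  String shardIterator = kinesis.getShardIterator(iteratorRequest).getShardIterator();

  GetRecordsRequest recordsRequest = new GetRecordsRequest()
      .withShardIterator(shardIterator)
      .withLimit(100);
  GetRecordsResult recordsResult = kinesis.getRecords(recordsRequest);
  for (Record record : recordsResult.getRecords()) {
    System.out.println(new String(record.getData().array(), StandardCharsets.UTF_8));
  }
  // recordsResult.getNextShardIterator() would be used to keep reading
}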