15 Mart 2022 Salı

Apache Kafka Message Delivery Semantics - Exactly Once Yani Kafka Transactions

Giriş
Spring ile nasıl yapılacağını gösteren bir yazı burada. Kafka kısmını detaylı açıklayan yazı da burada.

Exactly Once Nedir?
Açıklaması şöyle
So what about exactly once semantics (i.e. the thing you actually want)? When consuming from a Kafka topic and producing to another topic (as in a Kafka Streams application), we can leverage the new transactional producer capabilities in 0.11.0.0 that were mentioned above. The consumer's position is stored as a message in a topic, so we can write the offset to Kafka in the same transaction as the output topics receiving the processed data. If the transaction is aborted, the consumer's position will revert to its old value and the produced data on the output topics will not be visible to other consumers, depending on their "isolation level." In the default "read_uncommitted" isolation level, all messages are visible to consumers even if they were part of an aborted transaction, but in "read_committed," the consumer will only return messages from transactions which were committed (and any messages which were not part of a transaction).
...
So effectively Kafka supports exactly-once delivery in Kafka Streams, and the transactional producer/consumer can be used generally to provide exactly-once delivery when transferring and processing data between Kafka topics. Exactly-once delivery for other destination systems generally requires cooperation with such systems, but Kafka provides the offset which makes implementing this feasible (see also Kafka Connect). Otherwise, Kafka guarantees at-least-once delivery by default, and allows the user to implement at-most-once delivery by disabling retries on the producer and committing offsets in the consumer prior to processing a batch of messages.
Exactly Once Sadece Kafka Stream İçin Geçerlidir
Açıklaması şöyle
One very important and often missed detail is that Kafka supports exactly-once delivery only in Kafka Streams. To turn it on just change a config option processing.guarantee from at_least_once (default option) to exactly_once_v2.

But even Streams applications have limitations. If your consumer reads events from Kafka and makes changes in the relational database, Kafka won’t revert it. And if your consumer sends SMS-notifications Kafka can’t revert them either, even when Kafka Streams library is used. These are limitations the developer should always keep in mind.

Why do we talk about “reverting” changes? It’s because the only way to handle the message exactly once is to do it in one transaction.
Exactly Once İçin Gereken Şeyler
Exactly once için iki tane şey gerekir
1. Producer İçin Idempotent Writes 
2. Producer veConsumer İçin Transactions

Producer İçin Idempotent writes
Idempotent Write için genel iki çözüm var. Açıklama şöyle. Kafka 2. maddeyi uyguluyor
There are two approaches to getting exactly-once semantics during data production:

1. Use a single writer per partition and every time you get a network error check the last message in that partition to see if your last write succeeded
2. Include a primary key (UUID or something) in the message and deduplicate it to the consumer.
Producer açısında hata 3 tane hata durumu olabilir. Açıklaması şöyle
... three cases, when the producer doesn’t receive the acknowledgement from the broker and decides to send the message again:

1. The broker didn’t receive the message, so obviously there is no ack
2. The broker received the message, but sending an ack failed
3. The broker received the message and also successfully sent the ack, but it took more than the producer’s waiting timeout

The producer will retry in all the cases, but in two of them(2 and 3) it will lead to a duplicate.

Nor I, nor probably Kafka developers know the way to solve this problem on the producer’s side. Thus all the work for deduplication lies on the broker, who guarantees that the message will be written to the log only once. To achieve this, there is a sequence number assigned to messages (I described a similar approach in the article about the Idempotent Consumer pattern [6]). So, to be exact, it’s not the idempotent producer, but the smart broker, that deduplicates messages.

To enable this functionality in Kafka it’s enough to configure the producer with the enable.idempotence=true option.
Kafka 5 tane producer için Idempotent Write destekler. Açıklaması şöyle
One of the at least once guarantee scenarios given above covered the case of a producer that is unable to determine if a previous publish call succeeded, so pushes the batch of messages again. In previous versions of Kafka, the broker had no means of determining if the second batch is a retry of the previous batch. From Kafka 0.11 onwards, producers can opt-in to idempotent writes (it’s disabled by default), by setting the configuration flag enable.idempotence to true. This causes the client to request a producer id (pid) from a broker. The pid helps the Kafka cluster identify the producer. With idempotence enabled, the producer sends the pid along with a sequence number with each batch of records. The sequence number logically increases by one for each record sent by the same producer. Given the sequence number of the first record in the batch along with the batch record count, the broker can figure out all the sequence numbers for a batch. With idempotence enabled, when the broker receives a new batch of records, if the sequence numbers provided are ones it has already committed, the batch is treated as a retry and ignored (a ‘duplicate’ acknowledgement is sent back to the client).

When idempotent writes first came out in v0.11 the brokers could only deal with one inflight batch at a time per producer in order to guarantee ordering of messages from the same producer. From Kafka 1.0.0, support for idempotent writes with up to 5 concurrent requests (max.in.flight.requests.per.connection=5) from the same producer are now supported. This means you can have up to 5 inflight requests and still be sure they will be written to the log in the correct order. This works even in the face of batch retries or Kafka partition leader changes since in these cases the cluster will reorder them for you.
Varsayılan sayının 5 yerine 1 olması gerektiğini söyleyen bir açıklama şöyle
max.in.flight.requests.per.connection — defaults to 5, which may result in messages being published out-of-order if one (or more) of the enqueued messages times out and is retried. This should have been defaulted to 1.

Producer ve Consumer İçin Transactions
İki başlıkta incelemek lazım

1. Producer İçin Transactions
Açıklaması şöyle
Transactions give us the ability to atomically update data in multiple topic partitions. All the records included in a transaction will be successfully saved, or none of them will be.

Transactions are enabled through producer configuration. Clients need to first enable idempotent writes (enable.idempotence=true) and provide a transactional id (transactional.id=my-tx-id). The producer then needs to register itself with the Kafka cluster by calling initTransactions. The transactional id is used to identify the same producer across process restarts. When reconnecting with the same transactional id, a producer will be assigned the same pid and an epoch number associated with that pid will be incremented. Kafka will then guarantee that any pending transactions from previous sessions for that pid will either be committed or aborted before the producer can send any new data. Any attempt by an old zombie instance of the producer with an older epoch number to perform operations will now fail.

Once registered, a producer can send data as normal (outside a transaction) or initiate a new transaction by calling beginTransaction. Only one transaction can be active at a time per producer. From within the transaction, the standard send method can be called to add data to the transaction. Additionally, if a producer is sourcing data from Kafka itself, it can include the progress it is making reading from the source in the transaction by calling sendOffsetsToTransaction. In Kafka the default method for saving consumer progress is to save offsets back to an internal topic in Kafka and hence this action can be included in a transaction.

Once all required messages and offsets are added to the transaction, the client calls commitTransaction to attempt to commit the changes atomically to the Kafka cluster. The client is also able to call abortTransaction if they no longer wish to go ahead with the transaction, likely due to some error.

The producer and brokers do not wait until the transaction is committed before writing the data to the data logs. Instead the brokers write records to the logs as they arrive. Transactional messages are also bracketed with special control messages that indicate where a transaction has started and either committed or aborted. Consumers now have an additional configuration parameter called isolation.level that must be set to either read_uncommitted (the default) or read_committed. When set to read_uncommitted, all messages are consumed as they become available in offset ordering. When set to read_committed, only messages not in a transaction, or messages from committed transactions are read in offset ordering. If a consumer with isolation.level=read_committed reaches a control message for a transaction that has not completed, it will not deliver any more messages from this partition until the producer commits or aborts the transaction or a transaction timeout occurs. The transaction timeout is determined by the producer using the configuration transaction.timeout.ms (default 1 minute).
Açıklaması şöyle
How Kafka transactions work

After the message is written to the Kafka log and the broker guarantees that it was done without duplicates, it should be just handled and written to the next topic in one transaction. But how to do it?

A Kafka transaction is a set of changes written in the log, which itself is stored in the internal Kafka topic. This log is managed by a special entity called Transaction Coordinator. In order to invoke a transaction several steps should be completed:

The consumer finds the Transaction Coordinator. This happens when the application starts. It sends its configured transactionalID (if it exists) to the coordinator and receives the producerID. This is needed in case the application restarts and tries to register itself again with the same transactionalID. When the restarted application starts a new transaction, the Transaction Coordinator aborts all the pending transactions started by the previous instance.
When the application consumes new messages it starts the transaction
When the application writes messages to any other topics it sends this information to its Transaction Coordinator. The coordinator stores information about all the changed partitions in its internal topic.
This is an important detail. Using Kafka Streams API you don’t have to send these messages to the coordinator manually, Streams library will do it for you. But if you write messages to the topic directly, it won’t be written into the transaction log even if this topic is in the same cluster.

Another important thing about transactions is that all the messages written during the transaction will not be exposed to the consumers until this transaction is committed.

4. The transaction commits or fails. If it’s aborted, the coordinator adds an “Abort” mark to the transaction in the internal topic and the same mark to all the messages written during the transaction.

5. When the transaction commits, the process is almost the same. The coordinator adds a “Commit” mark to the transaction and to all the messages. That mark will make these messages available for the consumers.

Don’t you forget that consumer offsets are also stored in their own topic? It means that committing offsets is the same as writing a message to the output topic. And this message can also be marked “Abort” or “Commit” which affects whether the same message will be consumed the second time or not. Obviously, when it’s marked as “Commit”, it will not, and when it’s marked as “Abort” the whole transaction will start from the beginning — consuming messages.
Producer şöyle yapar
producer.initTransactions();
producer.beginTransaction();
sourceOffsetRecords.forEach(producer::send);
outputRecords.forEach(producer::send);
producer.commitTransaction();
Kafka Transactions İle Bazı Problemler
Açıklaması şöyle
A critical point to understand, and why this pattern is often not a good fit to meet the requirements of a messaging application, is that all other actions occurring as part of the processing can still happen multiple times, on the occasions where the original message is redelivered. If for example the application performs REST calls to other applications, or performs writes to a database, these can still happen multiple times. The guarantee is that the resulting events from the processing will only be written once, so downstream transaction aware consumers will not have to cater for duplicates being written to the topic.

...
However database transactions and Kafka transactions are separate, and in order to perform them atomically would need to be done as a distributed transaction, using a ChainedTransactionManager for example in Spring. Using distributed transactions should generally be avoided as there is a significant performance penalty, increased code complexity, and failure scenarios that could leave the two resources (the Kafka broker and the database) with an inconsistent view of the data

Enabling Kafka Transactions For Producer
Açıklaması şöyle
To enable transactions the producer must be configured to enable transactions, which requires setting the producer transactional Id on the producer factory. With this in place, Kafka can now write messages using transactions. This setting also implicitly sets the producer to be idempotent. This means that any transient errors occurring during the message produce does not result in duplicate messages being written. 
...
Finally a transaction manager must be implemented to manage the transaction.

The producing of any outbound message must be surrounded by a transaction. The following is the transactional flow:
  1. First beginTransaction is called
  2. Messages are published by the Producer
  3. The consumer offsets are also sent to the Producer in order that these are included in the transaction.
  4. The commitTransaction is called to complete the transaction.
Bunu Spring ile yapmak kolay. Şöyle yaparız
When using Spring Kafka this boilerplate code is taken care of for the developer. They need only annotate the method responsible for writing the outbound events with @Transactional. Finally wire in a KafkaTransactionManager to the Spring context to make this available for managing the transaction. 
Enabling Kafka Transactions For Consumer
Açıklaması şöyle
In order to guarantee the exactly-once semantics a consumer must be configured with read isolation.level of READ_COMMITTED. This ensures it will not read transactional messages written to topic partitions until the message is marked as committed. (The consumer can however consume non-transactional messages that are written to the topic partition).
...
By default consumers are configured with a read isolation.level of READ_UNCOMMITTED. If a transactional message was written to a topic, for such a consumer this is therefore immediately available for consumption, whether or not the transaction is subsequently committed or aborted.



Hiç yorum yok:

Yorum Gönder