Tuesday, 10 August 2021

Debezium

Debezium is probably the most popular CDC (change data capture) tool. It is described as follows.
Its name comes from the combination of “DB” (a.k.a. database) and “-ium” (a very common suffix found in many elements of the periodic table). The Debezium platform uses Kafka as its log of data change events. Since version 1.2.x, Debezium has included support for the CloudEvents format, which opens the possibility of seamless integration with event-driven architectures.

The pluggable model used by many CDC tools like Debezium can be useful for database replication, feeding analytics systems, producing KPIs, and populating caches. A monolithic system can, for instance, start to produce CDC events that feed new individual databases during a migration from a monolith to microservices. Or an event from a relational database can seamlessly feed a NoSQL database.
The Relationship Between Debezium and Kafka Connect

Source Connectors
The explanation is as follows
Debezium currently ships connectors for MySQL, PostgreSQL, SQL Server, Oracle, Db2, and MongoDB.
Debezium as a library
We add the following dependencies
<dependency>
 <groupId>io.debezium</groupId>
 <artifactId>debezium-api</artifactId>
 <version>${version.debezium}</version>
</dependency>

<dependency>
 <groupId>io.debezium</groupId>
 <artifactId>debezium-embedded</artifactId>
 <version>${version.debezium}</version>
</dependency>
For the MySQL connector, we also add the following dependency
<dependency>
 <groupId>io.debezium</groupId>
 <artifactId>debezium-connector-mysql</artifactId>
 <version>${version.debezium}</version>
</dependency>
Debezium as a standalone server
The explanation is as follows
The Debezium server is configured to use one of the Debezium source connectors to capture changes from the source database. Change events can be serialized to different formats like JSON or Apache Avro and then sent to one of the various messaging infrastructures such as Amazon Kinesis, Google Cloud Pub/Sub, or Apache Pulsar.
PostgreSQL Connector
1. If the Debezium Version Is 0.10 or Later
The explanation is as follows
A logical decoding output plugin is no longer needed if you use a Debezium version greater than 0.10. As of Debezium 0.10, the connector supports PostgreSQL 10+ logical replication streaming using pgoutput, which emits changes directly from the replication stream.
2. If the Debezium Version Is Older
The explanation is as follows
As of PostgreSQL 9.4, logical decoding is implemented by decoding the contents of the write-ahead log and processing them in a user-friendly manner with the help of an output plugin. The output plugin enables clients to consume the changes.
Debezium has a Postgres Connector that works with the following output plugins.
- protobuf to encode changes in Protobuf format.
- wal2json to encode changes in JSON format.
Installing wal2json
On the PostgreSQL host (or inside its container), we run the following
$ apt-get update && apt-get install postgresql-13-wal2json
Registering the PostgreSQL connector with Kafka Connect
We need to POST a JSON document to Debezium, through the Kafka Connect REST API.

The explanation is as follows. In other words, a snapshot is taken first, and then the changes start being processed.
Once connected, Debezium will perform an initial snapshot of your data and emit change events to a Kafka Topic. Then, services can consume the topics and act on them.
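The registration step described above can be sketched in Java with the JDK's built-in HTTP client. This is only a minimal sketch under assumptions: the Kafka Connect URL (http://localhost:8083), the connector name, and every value in the JSON are placeholders that do not come from the original post, and the property names follow the style of the MySQL example in this post, so they may differ between Debezium versions.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

final class RegisterConnector {

    // Hypothetical connector definition; all names and credentials are placeholders.
    static final String CONNECTOR_JSON = String.join("\n",
        "{",
        "  \"name\": \"inventory-connector\",",
        "  \"config\": {",
        "    \"connector.class\": \"io.debezium.connector.postgresql.PostgresConnector\",",
        "    \"database.hostname\": \"postgres\",",
        "    \"database.port\": \"5432\",",
        "    \"database.user\": \"postgres\",",
        "    \"database.password\": \"postgres\",",
        "    \"database.dbname\": \"inventory\",",
        "    \"database.server.name\": \"dbserver1\",",
        "    \"plugin.name\": \"pgoutput\"",
        "  }",
        "}");

    // Builds the POST request for the Kafka Connect "/connectors" endpoint.
    static HttpRequest buildRequest(String connectUrl, String json) {
        return HttpRequest.newBuilder(URI.create(connectUrl + "/connectors"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    public static void main(String[] args) throws Exception {
        // Assumes Kafka Connect is reachable at this address.
        HttpRequest request = buildRequest("http://localhost:8083", CONNECTOR_JSON);
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```

Keeping the request construction in its own method makes it possible to inspect the request without a running Kafka Connect instance.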
Example
Suppose we have the following code
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.transforms.Transformation;

public class OutboxTransformer<R extends ConnectRecord<R>> implements Transformation<R> {

  @Override
  public R apply(R record) {
    Struct kStruct = (Struct) record.value();
    // Records without a value (for example tombstones) are passed through unchanged.
    if (kStruct == null) {
      return record;
    }
    String databaseOperation = kStruct.getString("op");

    // Only inserts ("c" = create) are turned into outbox events.
    if ("c".equalsIgnoreCase(databaseOperation)) {
      Struct after = (Struct) kStruct.get("after");
      String eventId = after.getString("id");
      String payload = after.getString("payload");
      // The lower-cased event name is used as the target topic.
      String topic = after.getString("event_name").toLowerCase();

      Headers headers = record.headers();
      headers.addString("eventId", eventId);

      // Prepare the event to be published.
      record = record.newRecord(topic, null, Schema.STRING_SCHEMA, eventId,
          null, payload, record.timestamp(), headers);
    }
    return record;
  }

  @Override
  public ConfigDef config() {
    return new ConfigDef();
  }

  @Override
  public void close() {}

  @Override
  public void configure(Map<String, ?> configs) {}
}
To build this code, we run the following
$ cd outbox-transformer
$ ./gradlew clean build
$ docker build -t outbox-transformer .
Our Dockerfile is as follows
FROM debezium/connect
ENV DEBEZIUM_DIR=$KAFKA_CONNECT_PLUGINS_DIR/debezium-transformer

RUN mkdir $DEBEZIUM_DIR
COPY build/libs/outbox-transformer-0.0.1-SNAPSHOT.jar $DEBEZIUM_DIR
Docker Compose
I moved this to the "Docker Compose and Debezium" post

PostgreSQL Connector
I moved this to the "PostgreSQL Connector" post

2. MySQL Connector

Registering the MySQL connector with Kafka Connect
The explanation is as follows
You can register the MySQL connector by sending a POST request to the Kafka Connect API.
Example
We do the following
{
  "name": "delayed-email-message",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "root",
    "database.password": "rootpass",
    "database.server.id": "184054",
    "database.server.name": "dbserver1",
    "database.whitelist": "emails",
    "database.history.kafka.bootstrap.servers": "kafka:9093",
    "database.history.kafka.topic": "delayed.emails.history",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "table.whitelist": "emails.delayed_messages",
    "transforms": "Reroute, filter, unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "true",
    "transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter",
    "transforms.Reroute.topic.regex": "(.*)(delayed_messages)$",
    "transforms.Reroute.topic.replacement": "email.execution",
    "transforms.filter.type": "io.debezium.transforms.Filter",
    "transforms.filter.language": "jsr223.groovy",
    "transforms.filter.topic.regex": "email.execution",
    "transforms.filter.condition": "value.op == \"u\" && value.before.is_ready == false && value.after.is_ready == true"
  }
}
The explanation is as follows
The connector is configured on the delayed_messages table (line 19, table.whitelist) and only responds to an update operation that changes is_ready from false to true.

Let’s have a closer look at the transformers section:
- ExtractNewRecordState: used for message flattening.
- Reroute: changes the default Debezium Kafka topic to “email.execution”.
- Filter: keeps only update operations that change the is_ready flag from false to true. This ensures that the Debezium connector produces a message for this specific update operation only and ignores all other operations.
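The filter condition in the configuration above is a Groovy expression evaluated by Debezium. To make its logic concrete, here is the same predicate as a small Java sketch. It is not how Debezium evaluates the expression: the change event is modeled as plain nested maps, the class and method names are hypothetical, and is_ready is assumed to arrive as a boolean.

```java
import java.util.Map;

final class ReadyFilter {

    // Mirrors: value.op == "u" && value.before.is_ready == false && value.after.is_ready == true
    static boolean passes(Map<String, Object> value) {
        if (!"u".equals(value.get("op"))) {
            return false; // only update operations are kept
        }
        Object before = value.get("before");
        Object after = value.get("after");
        if (!(before instanceof Map) || !(after instanceof Map)) {
            return false; // both row states are needed to compare the flag
        }
        return Boolean.FALSE.equals(((Map<?, ?>) before).get("is_ready"))
                && Boolean.TRUE.equals(((Map<?, ?>) after).get("is_ready"));
    }

    public static void main(String[] args) {
        Map<String, Object> event = Map.of(
                "op", "u",
                "before", Map.of("is_ready", false),
                "after", Map.of("is_ready", true));
        System.out.println(passes(event));
    }
}
```

An insert ("c") or an update that leaves is_ready unchanged does not satisfy the predicate, which matches the behavior described for the Filter transform.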
Oracle Connector
I moved this to the "Debezium Connector" post

Kafka topics
The explanation is as follows
By default, Debezium creates a Kafka topic for each table in a database. The naming convention is similar to the following.
server.database.table
So, for the customers table in the inventory database on the dbserver1 server, the topic name is
dbserver1.inventory.customers
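The naming convention above can be expressed as a tiny helper. This is just an illustrative sketch; the class and method names are hypothetical and not part of Debezium.

```java
final class TopicNames {

    // Joins the logical server name, database name, and table name with dots.
    static String topicFor(String server, String database, String table) {
        return String.join(".", server, database, table);
    }

    public static void main(String[] args) {
        System.out.println(topicFor("dbserver1", "inventory", "customers"));
    }
}
```

A consumer can use such a helper to derive the topic it should subscribe to for a given table.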



