
Friday, 16 December 2022

Debezium JSON Examples

Introduction
A Debezium change event has two parts, schema and payload. The fields in the payload are explained as follows:
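before: This field contains the state of the record before the operation. It is optional because it is not present for all events; for example, it is null for INSERT (create) and snapshot read events.
after: This field contains the state of the record after the operation. It is present for read, INSERT and UPDATE events and is null for DELETE events.
source: This field contains metadata about where the event came from, such as the Debezium connector that generated it, the database and table, and the position in the log.
op: This field specifies the type of operation performed on the record: r (read, during a snapshot), c (create), u (update) or d (delete).
ts_ms: This field specifies the time, in milliseconds since the Unix epoch, at which the connector processed the event.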
transaction: This field contains information about the transaction in which the event occurred. It is optional because it can be null for non-transactional operations.
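A consumer usually starts by looking at op. A minimal Java sketch that maps the one-letter codes listed above to readable names; the class DebeziumOps and the enum Operation are hypothetical names, and only the four codes discussed in this post are covered.

```java
import java.util.Arrays;

// Maps the one-letter "op" codes of a Debezium change event to readable names.
// Only r, c, u and d are covered here; newer Debezium versions may emit other
// codes (for example for truncate), which this sketch rejects.
public class DebeziumOps {

    enum Operation {
        READ("r"),    // row read during the initial snapshot
        CREATE("c"),  // INSERT
        UPDATE("u"),  // UPDATE
        DELETE("d");  // DELETE

        private final String code;

        Operation(String code) {
            this.code = code;
        }

        static Operation fromCode(String code) {
            return Arrays.stream(values())
                    .filter(op -> op.code.equals(code))
                    .findFirst()
                    .orElseThrow(() -> new IllegalArgumentException("Unknown op code: " + code));
        }
    }

    public static void main(String[] args) {
        System.out.println(Operation.fromCode("c")); // CREATE
        System.out.println(Operation.fromCode("r")); // READ
    }
}
```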

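Read
The op field is r, i.e. READ. These events are produced during the initial snapshot (note "snapshot": "true" in source), so before is null.
Example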
{
  "schema": {
    "type": "struct",
    "fields": [ ... ],
    "optional": false,
    "name": "dbserver1.inventory.customers.Envelope"
  },
  "payload": {
    "before": null,
    "after": {
      "id": 1004,
      "first_name": "Anne",
      "last_name": "Kretchmar",
      "email": "annek@noanswer.org"
    },
    "source": {
      "version": "1.6.1.Final",
      "connector": "mysql",
      "name": "dbserver1",
      "ts_ms": 1630246982521,
      "snapshot": "true",
      "db": "inventory",
      "sequence": null,
      "table": "customers",
      "server_id": 0,
      "gtid": null,
      "file": "mysql-bin.000008",
      "pos": 154,
      "row": 0,
      "thread": null,
      "query": null
    },
    "op": "r",
    "ts_ms": 1630246982521,
    "transaction": null
  }
}
New Row via Insert - Create
The op field is c, i.e. CREATE. The payload has before and after sections, which contain the column names and their values. For a new row, the before field is null.
Example
{
   "payload":{ 
      "before":null,
      "after":{ 
         "id":1005,
         "first_name":"Vlad",
         "last_name":"Mihalcea",
         "email":"vlad@acme.org"
      },
      "source":{ 
         "name":"dbserver1",
         "server_id":223344,
         "ts_sec":1500369632,
         "gtid":null,
         "file":"mysql-bin.000003",
         "pos":364,
         "row":0,
         "snapshot":null,
         "thread":13,
         "db":"inventory",
         "table":"customers"
      },
      "op":"c",
      "ts_ms":1500369632095
   }
}
Update
The op field is u, i.e. UPDATE. Both before and after are populated.
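Because an update carries both the old and the new row state, a consumer can work out which columns actually changed. A minimal Java sketch, assuming the JSON has already been parsed so that before and after are plain maps; UpdateDiff and changedColumns are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

// Compares the "before" and "after" sections of an update event and reports
// the columns whose values differ. Columns present only in "before" are ignored.
public class UpdateDiff {

    static Map<String, String> changedColumns(Map<String, Object> before, Map<String, Object> after) {
        Map<String, String> changes = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : after.entrySet()) {
            Object oldValue = before.get(entry.getKey());
            if (!Objects.equals(oldValue, entry.getValue())) {
                changes.put(entry.getKey(), oldValue + " -> " + entry.getValue());
            }
        }
        return changes;
    }

    public static void main(String[] args) {
        Map<String, Object> before = Map.of("id", 1005, "first_name", "Vlad",
                "last_name", "Mihalcea", "email", "vlad@acme.org");
        Map<String, Object> after = Map.of("id", 1005, "first_name", "Vlad",
                "last_name", "Mihalcea", "email", "vlad.mihalcea@acme.org");
        System.out.println(changedColumns(before, after));
        // {email=vlad@acme.org -> vlad.mihalcea@acme.org}
    }
}
```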
Example
{
   "payload":{
      "before":{
         "id":1005,
         "first_name":"Vlad",
         "last_name":"Mihalcea",
         "email":"vlad@acme.org"
      },
      "after":{
         "id":1005,
         "first_name":"Vlad",
         "last_name":"Mihalcea",
         "email":"vlad.mihalcea@acme.org"
      },
      "source":{
         "name":"dbserver1",
         "server_id":223344,
         "ts_sec":1500369929,
         "gtid":null,
         "file":"mysql-bin.000003",
         "pos":673,
         "row":0,
         "snapshot":null,
         "thread":13,
         "db":"inventory",
         "table":"customers"
      },
      "op":"u",
      "ts_ms":1500369929464
   }
}
Delete
The op field is d, i.e. DELETE. The after section is null.
Example
{
    "payload":{ 
      "before":{ 
         "id":1005,
         "first_name":"Vlad",
         "last_name":"Mihalcea",
         "email":"vlad.mihalcea@acme.org"
      },
      "after":null,
      "source":{ 
         "name":"dbserver1",
         "server_id":223344,
         "ts_sec":1500370394,
         "gtid":null,
         "file":"mysql-bin.000003",
         "pos":1025,
         "row":0,
         "snapshot":null,
         "thread":13,
         "db":"inventory",
         "table":"customers"
      },
      "op":"d",
      "ts_ms":1500370394589
   }
}
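Putting the read, create, update and delete cases together, a consumer can keep an in-memory copy of the customers table up to date. A minimal Java sketch, assuming each event has already been parsed into its op code and before/after maps and that id is the primary key; CustomerCache is a hypothetical class, and tombstone records are not handled.

```java
import java.util.HashMap;
import java.util.Map;

// In-memory "materialized view" of the customers table, kept current by
// applying Debezium change events one by one.
public class CustomerCache {

    private final Map<Object, Map<String, Object>> rows = new HashMap<>();

    void apply(String op, Map<String, Object> before, Map<String, Object> after) {
        switch (op) {
            case "r":
            case "c":
            case "u":
                // Snapshot reads, inserts and updates carry the new row state in "after".
                rows.put(after.get("id"), after);
                break;
            case "d":
                // Deletes have after == null, so the key comes from "before".
                rows.remove(before.get("id"));
                break;
            default:
                throw new IllegalArgumentException("Unsupported op: " + op);
        }
    }

    Map<String, Object> get(Object id) {
        return rows.get(id);
    }

    int size() {
        return rows.size();
    }

    public static void main(String[] args) {
        CustomerCache cache = new CustomerCache();
        Map<String, Object> created = Map.of("id", 1005, "email", "vlad@acme.org");
        Map<String, Object> updated = Map.of("id", 1005, "email", "vlad.mihalcea@acme.org");
        cache.apply("c", null, created);
        cache.apply("u", created, updated);
        System.out.println(cache.get(1005).get("email")); // vlad.mihalcea@acme.org
        cache.apply("d", updated, null);
        System.out.println(cache.size()); // 0
    }
}
```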

Docker and Debezium

Example
We do it like this:
> docker run -it \
--name zookeeper \
-p 2181:2181 \
-p 2888:2888 \
-p 3888:3888 \
debezium/zookeeper:0.5
 
> docker run -it \
--name kafka \
-p 9092:9092 \
--link zookeeper:zookeeper \
debezium/kafka:0.5
 
> docker run -it \
--name mysql \
-p 3306:3306 \
-e MYSQL_ROOT_PASSWORD=debezium \
-e MYSQL_USER=mysqluser \
-e MYSQL_PASSWORD=mysqlpw \
debezium/example-mysql:0.5
 
> docker run -it \
--name kafka-connect \
-p 8083:8083 \
-e GROUP_ID=1 \
-e CONFIG_STORAGE_TOPIC=my_connect_configs \
-e OFFSET_STORAGE_TOPIC=my_connect_offsets \
--link zookeeper:zookeeper \
--link kafka:kafka \
--link mysql:mysql \
debezium/connect:0.5
 
> docker run -it \
--name kafka-watcher \
--link zookeeper:zookeeper \
--link kafka:kafka \
debezium/kafka:0.5 watch-topic -a -k dbserver1.inventory.customers


Monday, 7 November 2022

Docker Compose and Debezium

Introduction
A Debezium setup actually consists of four things running at the same time:
1. zookeeper
2. kafka
3. kafka-connect
4. debezium

The following images can be used:
debezium/server
debezium/connect
outbox-transformer (a custom image built on top of debezium/connect)

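The following environment variables are set:
BOOTSTRAP_SERVERS: the Kafka address
GROUP_ID: the ID of the Kafka Connect cluster group
CONFIG_STORAGE_TOPIC: the topic in which connector configurations are stored
OFFSET_STORAGE_TOPIC: the topic in which connector offsets are stored
STATUS_STORAGE_TOPIC: the topic in which connector and task statuses are stored

In addition, Avro serialization and the Schema Registry server are configured with these variables:
KEY_CONVERTER
VALUE_CONVERTER
CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL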
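The two SCHEMA_REGISTRY_URL variables carry a CONNECT_ prefix. My understanding of the debezium/connect image documentation is that such variables are turned into Kafka Connect worker properties by dropping the prefix, lowercasing the name and replacing underscores with dots. A minimal Java sketch of that mapping under this assumption; ConnectEnvMapping and toWorkerProperty are hypothetical names, not part of the image.

```java
import java.util.Locale;

// Assumed mapping from a CONNECT_* environment variable to a worker property:
// remove the prefix, lowercase, and turn '_' into '.'.
public class ConnectEnvMapping {

    static String toWorkerProperty(String envName) {
        String prefix = "CONNECT_";
        if (!envName.startsWith(prefix)) {
            throw new IllegalArgumentException("Not a CONNECT_ variable: " + envName);
        }
        return envName.substring(prefix.length())
                .toLowerCase(Locale.ROOT)
                .replace('_', '.');
    }

    public static void main(String[] args) {
        System.out.println(toWorkerProperty("CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL"));
        // key.converter.schema.registry.url
    }
}
```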

Example - postgres + debezium + kafka
We do it like this:
version: '3.1'
services:
    postgres:
        ...
    zookeeper:
        ...
    kafka:
        ...
    connector:
        image: debezium/connect:latest
        ports:
          - "8083:8083"
        environment:
          GROUP_ID: 1
          CONFIG_STORAGE_TOPIC: my_connect_configs
          OFFSET_STORAGE_TOPIC: my_connect_offsets
          BOOTSTRAP_SERVERS: kafka:9092
        depends_on:
          - zookeeper
          - postgres
          - kafka
Example - postgres + debezium + kafka
We do it like this:
services:
  db:
    ...

  zookeeper:
    ...

  kafka:
    ...

  connect:
    image: debezium/connect
    ports:
      - "8083:8083"
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
    depends_on:
      - zookeeper
      - kafka
Example - postgres + debezium + kafka
We do it like this:
version: "3.5"

services:
  # Install postgres and setup the user service database
  postgres:
    ...

  # Install zookeeper.
  zookeeper:
   ...

  # Install kafka and create needed topics.
  kafka:
    ...

  # Install debezium-connect and add outbox-transformer here.
  debezium-connect:
    container_name: custom-debezium-connect
    image: outbox-transformer
    hostname: debezium-connect
    ports:
      - '8083:8083'
    environment:
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: debezium_connect_config
      OFFSET_STORAGE_TOPIC: debezium_connect_offsets
      STATUS_STORAGE_TOPIC: debezium_connect_status
      BOOTSTRAP_SERVERS: kafka:29092
    depends_on:
      - kafka
      - postgres
Example - postgres + debezium + kafka + avro
We do it like this:
version: "3.7"
services:
  postgres:
    ...
  zookeeper:
    ...
  kafka:
    ...
  kafka-ui:
    ...
  debezium:
    image: debezium/connect:1.4
    environment:
      BOOTSTRAP_SERVERS: kafka:9092
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: connect_configs
      OFFSET_STORAGE_TOPIC: connect_offsets
      KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
    depends_on: [kafka]
    ports:
      - 8083:8083
  schema-registry:
    ...
Example
We do it like this. Here Debezium writes to Redis instead of Kafka. The connector settings are in the application.properties file in the conf directory.
version: '3.1'
services:
  redis:
    image: redis
    ports:
      - 6379:6379
    depends_on:
      - postgres
  postgres:
    image: postgres
    restart: always
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
    volumes:
      - ./postgresql.conf:/etc/postgresql/postgresql.conf
      - ./init:/docker-entrypoint-initdb.d
    command:
      - "-c"
      - "config_file=/etc/postgresql/postgresql.conf"
    ports:
      - 5432:5432
  debezium:
    image: debezium/server
    volumes:
      - ./conf:/debezium/conf
      - ./data:/debezium/data
    depends_on:
      - redis


Tuesday, 10 August 2021

Debezium

Debezium is probably the most popular CDC (change data capture) tool. The explanation is as follows:
Its name comes from the combination of “DB” (a.k.a. database) and “-ium” (a very common suffix found in many elements of the periodic table). The Debezium platform uses Kafka as its data change event log. Starting with version 1.2.x, Debezium includes support for the CloudEvents format, which opens the possibility of seamless integration with event-driven architectures.

The pluggable model, used by many CDC tools like Debezium, can be useful for Database replication, feed analytics systems, produce KPIs, and populate caches. A monolith system can, for instance, start to produce CDC events that can feed new individual databases during migration from Monoliths to Microservices. Or an event from a relational Database can seamlessly feed a NoSQL database.
The Relationship Between Debezium and Kafka Connect

Source Connectors
The explanation is as follows:
Debezium currently ships connectors for MySQL, PostgreSQL, SQL Server, Oracle, Db2, and MongoDB.
Debezium as a library
We include the following dependencies:
<dependency>
 <groupId>io.debezium</groupId>
 <artifactId>debezium-api</artifactId>
 <version>${version.debezium}</version>
</dependency>

<dependency>
 <groupId>io.debezium</groupId>
 <artifactId>debezium-embedded</artifactId>
 <version>${version.debezium}</version>
</dependency>
For the MySQL connector, we also include the following dependency:
<dependency>
 <groupId>io.debezium</groupId>
 <artifactId>debezium-connector-mysql</artifactId>
 <version>${version.debezium}</version>
</dependency>
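With these dependencies, the connector runs inside the application through the embedded engine instead of Kafka Connect, so offsets and schema history must be stored somewhere else, for example in local files. A minimal Java sketch of such a configuration; host names, credentials, file paths and the engine name are placeholder assumptions, and the property names follow the Debezium 1.x style used elsewhere in this post (2.x renamed some of them, e.g. database.server.name became topic.prefix).

```java
import java.util.Properties;

// Configuration for running the MySQL connector through the embedded Debezium engine.
// All values below are placeholders.
public class EmbeddedEngineConfig {

    static Properties mysqlEngineProperties() {
        Properties props = new Properties();
        props.setProperty("name", "mysql-engine");
        props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");
        // Without Kafka, offsets and schema history are kept in local files.
        props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
        props.setProperty("offset.storage.file.filename", "/tmp/offsets.dat");
        props.setProperty("offset.flush.interval.ms", "60000");
        props.setProperty("database.history", "io.debezium.relational.history.FileDatabaseHistory");
        props.setProperty("database.history.file.filename", "/tmp/dbhistory.dat");
        props.setProperty("database.hostname", "localhost");
        props.setProperty("database.port", "3306");
        props.setProperty("database.user", "mysqluser");
        props.setProperty("database.password", "mysqlpw");
        props.setProperty("database.server.id", "85744");
        props.setProperty("database.server.name", "dbserver1");
        return props;
    }

    public static void main(String[] args) {
        Properties props = mysqlEngineProperties();
        // With debezium-api and debezium-embedded on the classpath, these properties would be
        // passed to DebeziumEngine.create(Json.class).using(props).notifying(...).build(),
        // and the resulting engine (a Runnable) would be submitted to an ExecutorService.
        props.list(System.out);
    }
}
```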
Debezium as a standalone server
The explanation is as follows:
The Debezium server is configured to use one of the Debezium source connectors to capture changes from the source database. Change events can be serialized to different formats like JSON or Apache Avro and then sent to one of the various messaging infrastructures such as Amazon Kinesis, Google Cloud Pub/Sub, or Apache Pulsar.
PostgreSQL Connector
1. Debezium Version 0.10 or Later
The explanation is as follows:
A logical decoding output plugin is no longer needed if you use Debezium 0.10 or later. As of Debezium 0.10, the connector supports PostgreSQL 10+ logical replication streaming using pgoutput, which emits changes directly from the replication stream.
2. Earlier Debezium Versions
The explanation is as follows:
As of PostgreSQL 9.4, logical decoding is implemented by decoding the contents of the write-ahead log and processing them in a user-friendly manner with the help of an output plugin. The output plugin enables clients to consume the changes.
Debezium has a Postgres Connector that works with the following output plugins.
- decoderbufs, to encode changes in Protobuf format.
- wal2json, to encode changes in JSON format.
Installing wal2json
On the PostgreSQL server, we run:
$ apt-get update && apt-get install postgresql-13-wal2json
Registering the PostgreSQL connector with Kafka Connect
This requires POSTing a JSON connector configuration to the Kafka Connect REST API.

The explanation is as follows. In other words, an initial snapshot is taken first, and after that changes start being processed.
Once connected, Debezium will perform an initial snapshot of your data and emit change events to a Kafka Topic. Then, services can consume the topics and act on them.
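The registration itself is a single HTTP POST to the /connectors endpoint of Kafka Connect. A minimal Java sketch using java.net.http (Java 11+); the connector name, host name, credentials and database name are placeholder assumptions (the credentials match the postgres service in the Docker Compose post), and plugin.name is set to pgoutput as described above for Debezium 0.10 and later.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Registers a PostgreSQL connector with the Kafka Connect REST API.
public class RegisterPostgresConnector {

    static final String CONNECTOR_JSON = "{"
            + "\"name\": \"inventory-connector\","
            + "\"config\": {"
            + "\"connector.class\": \"io.debezium.connector.postgresql.PostgresConnector\","
            + "\"plugin.name\": \"pgoutput\","
            + "\"database.hostname\": \"postgres\","
            + "\"database.port\": \"5432\","
            + "\"database.user\": \"postgres\","
            + "\"database.password\": \"postgres\","
            + "\"database.dbname\": \"postgres\","
            + "\"database.server.name\": \"dbserver1\""
            + "}}";

    static HttpRequest buildRequest(String connectUrl) {
        return HttpRequest.newBuilder()
                .uri(URI.create(connectUrl + "/connectors"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(CONNECTOR_JSON))
                .build();
    }

    // Sends the request; this needs a running Kafka Connect instance.
    static int register(String connectUrl) throws Exception {
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(buildRequest(connectUrl), HttpResponse.BodyHandlers.ofString());
        return response.statusCode();
    }

    public static void main(String[] args) {
        HttpRequest request = buildRequest("http://localhost:8083");
        System.out.println(request.method() + " " + request.uri()); // POST http://localhost:8083/connectors
    }
}
```

Calling register("http://localhost:8083") against a running Connect service should return 201 Created when the connector is created; after that, the initial snapshot starts.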
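Example
Suppose we have the following code. It is a Kafka Connect transformation that turns inserts into an outbox table into events on a topic named after the event_name column.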
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.transforms.Transformation;

public class OutboxTransformer<R extends ConnectRecord<R>> implements Transformation<R> {

    @Override
    public R apply(R record) {
        // Tombstone records (null value) are passed through unchanged.
        if (record.value() == null) {
            return record;
        }
        Struct kStruct = (Struct) record.value();
        String databaseOperation = kStruct.getString("op");

        // Only inserts into the outbox table ("c") become events.
        if ("c".equalsIgnoreCase(databaseOperation)) {
            Struct after = (Struct) kStruct.get("after");
            String uuid = after.getString("id");
            String payload = after.getString("payload");
            // The target topic is the lowercased event name.
            String topic = after.getString("event_name").toLowerCase();

            Headers headers = record.headers();
            headers.addString("eventId", uuid);

            // Prepare the event to be published: key = event id, value = payload.
            record = record.newRecord(topic, null, Schema.STRING_SCHEMA, uuid,
                    null, payload, record.timestamp(), headers);
        }
        return record;
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef();
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}
To build this code, we run:
$ cd outbox-transformer
$ ./gradlew clean build
$ docker build -t outbox-transformer .
Our Dockerfile is as follows:
FROM debezium/connect
ENV DEBEZIUM_DIR=$KAFKA_CONNECT_PLUGINS_DIR/debezium-transformer

RUN mkdir $DEBEZIUM_DIR
COPY build/libs/outbox-transformer-0.0.1-SNAPSHOT.jar $DEBEZIUM_DIR
Docker Compose
I moved this to the Docker Compose and Debezium post.

PostgreSQL Connector
I moved this to the PostgreSQL Connector post.

2. MySQL Connector

Registering the MySQL connector with Kafka Connect
The explanation is as follows:
You can register the MySQL connector by sending a POST request to the Kafka Connect API.
Example
We do it like this:
{
  "name": "delayed-email-message",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "root",
    "database.password": "rootpass",
    "database.server.id": "184054",
    "database.server.name": "dbserver1",
    "database.whitelist": "emails",
    "database.history.kafka.bootstrap.servers": "kafka:9093",
    "database.history.kafka.topic": "delayed.emails.history",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "table.whitelist": "emails.delayed_messages",
    "transforms": "Reroute, filter, unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "true",
    "transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter",
    "transforms.Reroute.topic.regex": "(.*)(delayed_messages)$",
    "transforms.Reroute.topic.replacement": "email.execution",
    "transforms.filter.type": "io.debezium.transforms.Filter",
    "transforms.filter.language": "jsr223.groovy",
    "transforms.filter.topic.regex": "email.execution",
    "transforms.filter.condition": "value.op == \"u\" && value.before.is_ready == false && value.after.is_ready == true"
  }
}
The explanation is as follows:
The connector is configured on the delayed_messages table (the table.whitelist setting), and it only responds to an update operation that changes is_ready from false to true.

Let’s have a closer look at the transforms section:
unwrap (ExtractNewRecordState): used for message flattening
Reroute (ByLogicalTableRouter): changes the default Debezium Kafka topic to “email.execution”
filter (Filter): keeps only update operations that change the is_ready flag from false to true. This ensures that the Debezium connector produces a message for this specific update operation only and ignores all other operations.
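The Groovy condition in the filter transform can be hard to read inside JSON. The same rule written as a plain Java predicate, as a sketch that assumes the event has already been parsed into its op code and before/after maps, and that is_ready arrives as a boolean; ReadyFlagFilter and shouldForward are hypothetical names.

```java
import java.util.Map;

// Plain-Java equivalent of the filter condition:
// value.op == "u" && value.before.is_ready == false && value.after.is_ready == true
public class ReadyFlagFilter {

    static boolean shouldForward(String op, Map<String, Object> before, Map<String, Object> after) {
        return "u".equals(op)
                && before != null && Boolean.FALSE.equals(before.get("is_ready"))
                && after != null && Boolean.TRUE.equals(after.get("is_ready"));
    }

    public static void main(String[] args) {
        Map<String, Object> notReady = Map.of("id", 1, "is_ready", false);
        Map<String, Object> ready = Map.of("id", 1, "is_ready", true);
        System.out.println(shouldForward("u", notReady, ready)); // true
        System.out.println(shouldForward("u", ready, ready));    // false
        System.out.println(shouldForward("c", null, ready));     // false
    }
}
```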
Oracle Connector
I moved this to the Debezium Connector post.

Kafka topics
The explanation is as follows:
By default, Debezium creates a Kafka topic for each table in a database. The naming convention is similar to the following.
server.database.table
So for the customers table in the inventory database on the dbserver1 server, the topic is:
dbserver1.inventory.customers
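The naming convention, and the way the Reroute transform in the MySQL example renames such a topic, can be sketched in Java. This is an approximation that assumes the router replaces the topic name only when the whole name matches topic.regex; TopicNames, defaultTopic and reroute are hypothetical names.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Default Debezium topic naming (server.database.table), plus an approximation of how a
// topic.regex / topic.replacement pair rewrites that name.
public class TopicNames {

    static String defaultTopic(String server, String database, String table) {
        return server + "." + database + "." + table;
    }

    // Returns the replacement if the whole topic name matches the regex, otherwise the original name.
    static String reroute(String topic, String regex, String replacement) {
        Matcher matcher = Pattern.compile(regex).matcher(topic);
        return matcher.matches() ? matcher.replaceFirst(replacement) : topic;
    }

    public static void main(String[] args) {
        System.out.println(defaultTopic("dbserver1", "inventory", "customers"));
        // dbserver1.inventory.customers
        String emailsTopic = defaultTopic("dbserver1", "emails", "delayed_messages");
        System.out.println(reroute(emailsTopic, "(.*)(delayed_messages)$", "email.execution"));
        // email.execution
    }
}
```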