Thursday, February 23, 2023

Apache Kafka Connect JdbcSinkConnector

Introduction
For example, io.debezium.connector.mysql.MySqlConnector is used to read from MySQL, whereas io.confluent.connect.jdbc.JdbcSinkConnector is used to write to PostgreSQL.

1. The connector is given a name.
2. connector.class is always set to io.confluent.connect.jdbc.JdbcSinkConnector.

3. The database connection details are defined. These fields are:
connection.url
connection.user
connection.password

4. The topics to consume are specified with topics. Its description is as follows:
Topics to subscribe for updates.

5. If the table does not exist in the target database, auto.create can be used to have it created automatically. Its description is as follows:
Auto-create the schema objects if they do not exist e.g. the table will be auto-created during the initial sync.
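The steps above can be collected into a minimal standalone configuration. This is only a sketch; the connector name, host, database, credentials, and topic below are placeholder assumptions:

```properties
# Step 1: connector name (placeholder)
name=my-jdbc-sink
# Step 2: always the JDBC sink connector class
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
# Step 3: database connection details (placeholder host, database, and credentials)
connection.url=jdbc:postgresql://localhost:5432/mydb
connection.user=myuser
connection.password=mypassword
# Step 4: Kafka topic(s) to consume
topics=my_topic
# Step 5: auto-create the target table if it does not exist
auto.create=true
```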

Example
We do it like this:
$ curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
--data @postgres-sink-btc.json http://localhost:8083/connectors
The contents of the file are as follows:
{
  "name": "postgres-sink-btc",
  "config": {
    "connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max":"1",
    "topics": "topic_BTC",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "connection.url": "jdbc:postgresql://questdb:8812/qdb?useSSL=false",
    "connection.user": "admin",
    "connection.password": "quest",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "true",
    "auto.create": "true",
    "insert.mode": "insert",
    "pk.mode": "none"
  }
}
The explanation is as follows:
topics: Kafka topic to consume and convert into Postgres format.

connection: Using default credentials for QuestDB (admin/quest) on port 8812.

value.converter: This example uses JSON with schema, but you can also use Avro or raw JSON. 
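Once the connector has been created, its state can be checked through the Kafka Connect REST API (the connector name below matches the example above; this assumes a Connect worker running on localhost:8083):

```shell
# Query the connector's status; both the connector and its task
# should report "state": "RUNNING"
curl -s http://localhost:8083/connectors/postgres-sink-btc/status
```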
Example
We do it like this:
name=test-jdbc-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
connection.url=jdbc:postgresql://postgres-postgresql:5432/test?user=postgres&password=<postgresql-password>
topics=students
dialect.name=PostgreSqlDatabaseDialect
auto.create=true
insert.mode=upsert
pk.fields=id
pk.mode=record_value
transforms=unwrap
transforms.unwrap.type=io.debezium.transforms.UnwrapFromEnvelope
The explanation for the transforms* fields is as follows:
This is another SMT provided by Debezium that we are going to use. By default, the structure of debezium is complex and consists of multiple levels of information including event key schema, event key payload, event value schema, event value payload (For details refer to Connector Documentation). Even in the event value payload section, we have multiple structures for values before and after. Of these, we are only interested in the final payload and that is what this SMT provides us with. It unwraps the original message and provides a relevant section. Note that we have applied this SMT after the data is saved in Kafka and before it is inserted in PostgreSQL so that Kafka remains a source of truth and has all the information (if and when required).
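As a rough sketch (the field names follow the Debezium envelope structure; the actual values here are made-up assumptions), the value payload of a change event for an insert into students might look like this, and the unwrap SMT reduces it to just the after section:

```json
{
  "payload": {
    "before": null,
    "after": { "id": 1, "name": "Alice" },
    "source": { "connector": "mysql", "table": "students" },
    "op": "c",
    "ts_ms": 1677139200000
  }
}
```

After unwrapping, only { "id": 1, "name": "Alice" } is passed on, which is the row the JDBC sink inserts into PostgreSQL.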
