Introduction
For example, io.debezium.connector.mysql.MySqlConnector is used to read from MySQL, while io.confluent.connect.jdbc.JdbcSinkConnector is used to write to PostgreSQL.
1. The connector is given a name.
2. connector.class is always set to io.confluent.connect.jdbc.JdbcSinkConnector.
3. The database connection details are defined. The fields are as follows:
connection.url
connection.user
connection.password
4. The topics to consume are specified with topics. The description is as follows:
Topics to subscribe for updates.
5. If the table does not exist in the target database, auto.create can be used to have it created. The description is as follows:
Auto-create the schema objects if they do not exist e.g. the table will be auto-created during the initial sync.
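The five steps above can be sketched as a small Python helper that assembles the registration payload. This is only an illustration: the function name build_jdbc_sink_config and the sample values (connector name, JDBC URL, credentials, topic names) are assumptions, not part of the original setup.

```python
import json


def build_jdbc_sink_config(name, connection_url, user, password, topics, auto_create=True):
    """Assemble a Kafka Connect registration payload following the five steps."""
    return {
        "name": name,  # step 1: connector name
        "config": {
            # step 2: the sink connector class
            "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
            # step 3: database connection details
            "connection.url": connection_url,
            "connection.user": user,
            "connection.password": password,
            # step 4: topics is a comma-separated list
            "topics": ",".join(topics),
            # step 5: let the connector create missing tables
            "auto.create": "true" if auto_create else "false",
        },
    }


if __name__ == "__main__":
    # Hypothetical values, for illustration only.
    payload = build_jdbc_sink_config(
        "demo-sink",
        "jdbc:postgresql://localhost:5432/test",
        "postgres",
        "secret",
        ["students"],
    )
    print(json.dumps(payload, indent=2))
```

The printed JSON can be saved to a file and submitted to the Kafka Connect REST API.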
We do it like this:
$ curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
    --data @postgres-sink-btc.json http://localhost:8083/connectors
The file contents are as follows:
{
  "name": "postgres-sink-btc",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "topic_BTC",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "connection.url": "jdbc:postgresql://questdb:8812/qdb?useSSL=false",
    "connection.user": "admin",
    "connection.password": "quest",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "true",
    "auto.create": "true",
    "insert.mode": "insert",
    "pk.mode": "none"
  }
}
The description is as follows:
topics: Kafka topic to consume and convert into Postgres format.
connection: Using default credentials for QuestDB (admin/quest) on port 8812.
value.converter: This example uses JSON with schema, but you can also use Avro or raw JSON.
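Because value.converter.schemas.enable is set to true, JsonConverter expects each message value to carry a schema next to its payload. A minimal sketch of that envelope follows. The helper name wrap_with_schema, the schema name example.Record, and the fields currency and amount are hypothetical and chosen only for illustration.

```python
import json


def wrap_with_schema(record, field_types, schema_name="example.Record"):
    """Wrap a flat dict in the schema/payload envelope read by JsonConverter
    when schemas.enable is true."""
    fields = [
        {"field": name, "type": field_types[name], "optional": False}
        for name in record
    ]
    return {
        "schema": {
            "type": "struct",
            "name": schema_name,
            "optional": False,
            "fields": fields,
        },
        "payload": record,
    }


if __name__ == "__main__":
    # Hypothetical record; Connect type names such as "string" and "double"
    # describe each field so the sink can map it to a column.
    message = wrap_with_schema(
        {"currency": "BTC", "amount": 1.5},
        {"currency": "string", "amount": "double"},
    )
    print(json.dumps(message))
```

A producer would send the printed JSON string as the message value on the topic the sink consumes.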
Example
We do it like this:
name=test-jdbc-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
connection.url=jdbc:postgresql://postgres-postgresql:5432/test?user=postgres&password=<postgresql-password>
topics=students
dialect.name=PostgreSqlDatabaseDialect
auto.create=true
insert.mode=upsert
pk.fields=id
pk.mode=record_value
transforms=unwrap
transforms.unwrap.type=io.debezium.transforms.UnwrapFromEnvelope
The description of the transforms* fields is as follows (in more recent Debezium releases the same SMT is provided as io.debezium.transforms.ExtractNewRecordState):
This is another SMT provided by Debezium that we are going to use. By default, the structure of debezium is complex and consists of multiple levels of information including event key schema, event key payload, event value schema, event value payload (For details refer to Connector Documentation). Even in the event value payload section, we have multiple structures for values before and after. Of these, we are only interested in the final payload and that is what this SMT provides us with. It unwraps the original message and provides a relevant section. Note that we have applied this SMT after the data is saved in Kafka and before it is inserted in PostgreSQL so that Kafka remains a source of truth and has all the information (if and when required).
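The effect of the unwrap step can be illustrated with a simplified Python sketch. This is not the SMT's actual implementation, only a model of the idea: from a Debezium-style change event value, keep the after section and discard the rest. The function name unwrap_envelope and the sample name field are assumptions.

```python
def unwrap_envelope(event_value):
    """Return only the post-change row state from a Debezium-style event value.

    Simplified illustration: the real SMT has additional options
    (for example for delete events) that are not modelled here.
    """
    if event_value is None:
        return None  # tombstone record: nothing to unwrap
    # With schemas enabled the envelope sits under "payload";
    # otherwise the value is the envelope itself.
    envelope = event_value.get("payload", event_value)
    return envelope.get("after")


if __name__ == "__main__":
    # Hypothetical insert event for the students table.
    event = {
        "payload": {
            "before": None,
            "after": {"id": 1, "name": "Ada"},
            "source": {"table": "students"},
            "op": "c",
            "ts_ms": 0,
        }
    }
    print(unwrap_envelope(event))
```

The flat row returned here is what the JDBC sink can map to table columns, with id serving as the key named in pk.fields.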