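Kafka'daki "myserver.retail.orders_info" topic'ini Azure Cosmos DB Cassandra API'sine yazan DataStax sink bağlayıcısını (connector) tanımlamak için şöyle yaparız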
{
  "name": "kafka-cosmosdb-sink",
  "config": {
    "connector.class": "com.datastax.oss.kafka.sink.CassandraSinkConnector",
    "tasks.max": "1",
    "topics": "myserver.retail.orders_info",
    "contactPoints": "<Azure Cosmos DB account name>.cassandra.cosmos.azure.com",
    "loadBalancing.localDc": "<Azure Cosmos DB region e.g. Southeast Asia>",
    "datastax-java-driver.advanced.connection.init-query-timeout": 5000,
    "ssl.hostnameValidation": true,
    "ssl.provider": "JDK",
    "ssl.keystore.path": "<path to JDK keystore path e.g. <JAVA_HOME>/jre/lib/security/cacerts>",
    "ssl.keystore.password": "<keystore password: it is 'changeit' by default>",
    "port": 10350,
    "maxConcurrentRequests": 500,
    "maxNumberOfRecordsInBatch": 32,
    "queryExecutionTimeout": 30,
    "connectionPoolLocalSize": 4,
    "auth.username": "<Azure Cosmos DB user name (same as account name)>",
    "auth.password": "<Azure Cosmos DB password>",
    "topic.myserver.retail.orders_info.retail.orders_by_customer.mapping": "order_id=value.orderid, customer_id=value.custid, purchase_amount=value.amount, city=value.city, purchase_time=value.purchase_time",
    "topic.myserver.retail.orders_info.retail.orders_by_city.mapping": "order_id=value.orderid, customer_id=value.custid, purchase_amount=value.amount, city=value.city, purchase_time=value.purchase_time",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "offset.flush.interval.ms": 10000
  }
}
Hiç yorum yok:
Yorum Gönder