Thursday, January 12, 2023

Apache Kafka ksqlDB Streams and Tables

Introduction
1. CREATE STREAM creates a stream that reads from a Kafka topic
2. CREATE TABLE turns that stream into a materialized view
Schematically: aggregations are performed over the stream, and joins are performed with the table, as in the sketch below.
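A minimal sketch of these two steps, using a hypothetical pageviews topic and column names rather than the telemetry example that follows:
-- Step 1: a stream over an existing Kafka topic
CREATE STREAM pageviews (userId VARCHAR KEY, page VARCHAR)
  WITH (KAFKA_TOPIC = 'pageviews', VALUE_FORMAT = 'JSON');

-- Step 2: aggregating the stream materializes a table (a view kept up to date)
CREATE TABLE pageviews_per_user AS
  SELECT userId, COUNT(*) AS views
  FROM pageviews
  GROUP BY userId
  EMIT CHANGES;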


CREATE TABLE
Example - Primary Key
We do the following
CREATE TABLE MyTable (
  sensorId  VARCHAR PRIMARY KEY,
  timestamp VARCHAR,
  value     DECIMAL(19,5)
) WITH (
  KAFKA_TOPIC  = '<x>-telemetry',
  VALUE_FORMAT = 'JSON'
);

SELECT * FROM MyTable;

SELECT * FROM MyTable WHERE sensorId = '127';
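As a quick check, a push query can stream every update to the table as new records arrive on the topic; a minimal sketch:
-- Push query: emits a row for each table update; terminate with Ctrl-C
SELECT * FROM MyTable EMIT CHANGES;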
Example - ETL
To define the Debezium source we do the following
-- Create the customers, products and orders topics
CREATE SOURCE CONNECTOR `mysql-connector` WITH(
  "connector.class" = 'io.debezium.connector.mysql.MySqlConnector',
  "tasks.max" = '1',
  "database.hostname" = 'mysql',
  "database.port" = '3306',
  "database.user" = 'root',
  "database.password" = 'debezium',
  "database.server.id" = '184054',
  "database.server.name" = 'dbserver1',
  "database.whitelist" = 'inventory',
  "table.whitelist" = 'inventory.customers,inventory.products,inventory.orders',
  "database.history.kafka.bootstrap.servers" = 'kafka:9092',
  "database.history.kafka.topic" = 'schema-changes.inventory',
  "transforms" = 'unwrap',
  "transforms.unwrap.type" = 'io.debezium.transforms.ExtractNewRecordState',
  "key.converter" = 'org.apache.kafka.connect.json.JsonConverter',
  "key.converter.schemas.enable" = 'false',
  "value.converter" = 'org.apache.kafka.connect.json.JsonConverter',
  "value.converter.schemas.enable" = 'false'
);
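Before building on the change topics, it is worth confirming that Debezium actually produced them. A sketch using ksqlDB's own inspection statements:
-- List all topics visible to ksqlDB, including the Debezium change topics
SHOW TOPICS;

-- Dump the raw change events of one table from the beginning
PRINT 'dbserver1.inventory.customers' FROM BEGINNING;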
To join these 3 tables we do the following
-- Join all streams
CREATE STREAM S_CUSTOMER (ID INT, FIRST_NAME string, LAST_NAME string, EMAIL string)
  WITH (KAFKA_TOPIC='dbserver1.inventory.customers', VALUE_FORMAT='json');

CREATE TABLE T_CUSTOMER AS
  SELECT id,
         latest_by_offset(first_name) as first_name,
         latest_by_offset(last_name)  as last_name,
         latest_by_offset(email)      as email
  FROM s_customer
  GROUP BY id
  EMIT CHANGES;

CREATE STREAM S_PRODUCT (ID INT, NAME string, description string, weight DOUBLE)
  WITH (KAFKA_TOPIC='dbserver1.inventory.products', VALUE_FORMAT='json');

CREATE TABLE T_PRODUCT AS
  SELECT id,
         latest_by_offset(name)        as name,
         latest_by_offset(description) as description,
         latest_by_offset(weight)      as weight
  FROM s_product
  GROUP BY id
  EMIT CHANGES;

CREATE STREAM s_order (
  order_number integer,
  order_date   timestamp,
  purchaser    integer,
  quantity     integer,
  product_id   integer)
  WITH (KAFKA_TOPIC='dbserver1.inventory.orders', VALUE_FORMAT='json');

CREATE STREAM SA_ENRICHED_ORDER WITH (VALUE_FORMAT='AVRO') AS
  select o.order_number,
         o.quantity,
         p.name  as product,
         c.email as customer,
         p.id    as product_id,
         c.id    as customer_id
  from s_order as o
    left join t_product  as p on o.product_id = p.id
    left join t_customer as c on o.purchaser = c.id
  partition by o.order_number
  emit changes;
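To verify the join output before wiring up the sink, a push query against the enriched stream can be used; a minimal sketch:
-- Watch enriched orders as they are produced
SELECT order_number, product, customer, quantity
FROM SA_ENRICHED_ORDER
EMIT CHANGES;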
To write the result to PostgreSQL we do the following
CREATE SINK CONNECTOR `postgres-sink` WITH(
  "connector.class" = 'io.confluent.connect.jdbc.JdbcSinkConnector',
  "tasks.max" = '1',
  "dialect.name" = 'PostgreSqlDatabaseDialect',
  "table.name.format" = 'ENRICHED_ORDER',
  "topics" = 'SA_ENRICHED_ORDER',
  "connection.url" = 'jdbc:postgresql://postgres:5432/inventory?user=postgresuser&password=postgrespw',
  "auto.create" = 'true',
  "insert.mode" = 'upsert',
  "pk.fields" = 'ORDER_NUMBER',
  "pk.mode" = 'record_key',
  "key.converter" = 'org.apache.kafka.connect.converters.IntegerConverter',
  "key.converter.schemas.enable" = 'false',
  "value.converter" = 'io.confluent.connect.avro.AvroConverter',
  "value.converter.schemas.enable" = 'true',
  "value.converter.schema.registry.url" = 'http://schema-registry:8081'
);
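Since auto.create is enabled, the connector creates the target table itself. Assuming it produces the default layout, the result can then be checked from psql; quoting the identifiers, since ksqlDB upper-cases column names, is an assumption about the generated schema:
-- In psql: quoted identifiers, assuming upper-cased column names
SELECT "ORDER_NUMBER", "PRODUCT", "CUSTOMER", "QUANTITY"
FROM "ENRICHED_ORDER";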
CREATE SOURCE TABLE
Creates a read-only table on top of an existing topic; here it is used to read a repartitioned topic back as a table

Example
We do the following. Here the records are repartitioned on a composite key of sensorId and timestamp
-- Step 1
CREATE STREAM MyStream (
  sensorId  VARCHAR KEY,
  timestamp VARCHAR,
  value     DECIMAL(19,5)
) WITH (
  KAFKA_TOPIC  = '<x>-telemetry',
  VALUE_FORMAT = 'JSON'
);

-- Step 2
CREATE STREAM MyStreamRepartitioned WITH (key_format='json') AS
  SELECT STRUCT(sensorId := sensorId, timestamp := timestamp) AS myStruct,
         VALUE
  FROM MyStream
  PARTITION BY STRUCT(sensorId := sensorId, timestamp := timestamp);

-- Step 3
CREATE SOURCE TABLE RepartitionedTable (
  myStruct STRUCT<sensorId VARCHAR, timestamp VARCHAR> PRIMARY KEY,
  value    VARCHAR
) WITH (
  KAFKA_TOPIC  = '<abc>REPARTITIONED',
  VALUE_FORMAT = 'json',
  KEY_FORMAT   = 'json'
);
The explanation is as follows
Note the <abc>REPARTITIONED topic name. This topic name is dynamically created by KSQL DB for the repartitioned stream in step 2.
It is visible in the list of topics in your Confluent cloud environment, so you can take it from there.
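The backing topic name can also be found without leaving ksqlDB; DESCRIBE ... EXTENDED prints, among other details, the Kafka topic behind the stream (a sketch):
-- Shows the Kafka topic backing the repartitioned stream
DESCRIBE MyStreamRepartitioned EXTENDED;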

Now you can query according to the timestamp field too. Note that the query extracts the values from the JSON serving as the primary key in our source table:
We do the following
SELECT EXTRACTJSONFIELD(myStruct -> sensorId, '$.sensorId') AS sensor,
       myStruct -> timestamp AS time,
       VALUE AS val
FROM RepartitionedTable
WHERE EXTRACTJSONFIELD(myStruct -> sensorId, '$.sensorId') = '127'
  AND EXTRACTJSONFIELD(myStruct -> timestamp, '$.timestamp') >= '12:00'
EMIT CHANGES;
