Thursday, 11 February 2021

Apache Kafka ksqlDB

Pull Queries
Pull queries work like ordinary SQL statements and return a one-time result. The explanation is as follows:
A pull query is a form of query issued by a client that retrieves a result as of “now”, like a query against a traditional RDBMS. Pull queries enable you to fetch the current state of a materialized view. Because materialized views are incrementally updated as new events arrive, pull queries run with predictably low latency.
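To make the idea concrete, here is a minimal plain-Java sketch of a materialized view that is updated incrementally per event and answered with a point-in-time lookup. It is only an illustration under stated assumptions: the class `MaterializedViewSketch` and its methods `onEvent` and `pull` are hypothetical names, and this is not how ksqlDB is implemented internally.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only; this is not ksqlDB's implementation.
public class MaterializedViewSketch {
    // Current state of the view: total shares per symbol.
    private final Map<String, Integer> totals = new HashMap<>();

    // Called once per incoming event; updates the view incrementally
    // instead of recomputing the aggregate from the full history.
    public void onEvent(String symbol, int shares) {
        totals.merge(symbol, shares, Integer::sum);
    }

    // "Pull query": a one-time lookup of the state as of now.
    public int pull(String symbol) {
        return totals.getOrDefault(symbol, 0);
    }

    public static void main(String[] args) {
        MaterializedViewSketch view = new MaterializedViewSketch();
        view.onEvent("ABC", 10);
        view.onEvent("XYZ", 5);
        view.onEvent("ABC", 7);
        System.out.println(view.pull("ABC")); // 17
        System.out.println(view.pull("XYZ")); // 5
    }
}
```

Because the aggregate is maintained as events arrive, the lookup is a simple map read, which is why such a query can be answered quickly.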
Unlike a plain Kafka topic, ksqlDB supports update semantics: a table keeps the latest value for each key. The explanation is as follows:
Kafka does not allow the events to be updated, so we cannot mark the top event as failed or associate any metadata with it.
Removing the event and queueing it back (at the end) doesn’t work either, as this will break the ordering constraints — event #3 will end up before event #1,...
Working with ksqlDB is easier than writing the equivalent code.

kSQL CLI

WINDOW TUMBLING - Aggregation
Example
We do it like this:
CREATE TABLE anomaly_detection AS
  SELECT temperature_spike_id, COUNT(*) AS total_spikes,
  AVG(temperature) AS avg_temperature
  FROM sensor_data
  WINDOW TUMBLING (SIZE 1 HOUR)
  GROUP BY temperature_spike_id
  EMIT CHANGES;
The explanation is as follows:
A one-hour tumbling window continuously aggregates the temperature spikes from sensors.
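The WINDOW TUMBLING clause in the query above groups events into fixed-size, non-overlapping time buckets. The following plain-Java sketch shows one way such bucketing can be computed. It is an illustration only: `TumblingWindowSketch`, `windowStart`, and `countPerWindow` are hypothetical names, timestamps are assumed to be non-negative epoch milliseconds, and ksqlDB's actual implementation is not shown here.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration only; this is not ksqlDB's implementation.
public class TumblingWindowSketch {
    // Start of the tumbling window that contains the given timestamp.
    // Assumes timestampMs >= 0 and sizeMs > 0.
    public static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // Counts events per window; each event falls into exactly one window.
    public static Map<Long, Integer> countPerWindow(long[] timestampsMs, long sizeMs) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (long ts : timestampsMs) {
            counts.merge(windowStart(ts, sizeMs), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        long hour = 60 * 60 * 1000L;
        // Two events in the first hour, two in the second.
        long[] events = {0L, 10 * 60 * 1000L, hour + 1, 2 * hour - 1};
        System.out.println(countPerWindow(events, hour)); // {0=2, 3600000=2}
    }
}
```

Since every timestamp maps to exactly one window start, an event is never counted twice, which is what distinguishes tumbling windows from overlapping (hopping) ones.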
Example
We do it like this:
SELECT symbol, SUM(shares) FROM stock_txn_stream
WINDOW TUMBLING(SIZE 10 SECONDS)
GROUP BY symbol
EMIT CHANGES;
If we did this in code, it would look like this:
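import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.WindowStore;

KStream<String,StockTransaction> stream = ...;

// Adds the shares of each transaction to the running total of its window
Aggregator<String,StockTransaction,Integer> aggregator =
 (k,v,i) -> v.getShares() + i;

stream.groupByKey()
  // 10-second tumbling window, matching WINDOW TUMBLING (SIZE 10 SECONDS).
  // On Kafka Streams 3.0+, prefer TimeWindows.ofSizeWithNoGrace / ofSizeAndGrace.
  .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
  .aggregate(()->0,aggregator,
           Materialized.<String,Integer,WindowStore<Bytes,byte[]>>as("SharesPerPeriod")
           .withKeySerde(Serdes.String())
           .withValueSerde(Serdes.Integer()))
  .toStream()
  .peek((k,v) -> ...);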
PRINT
We do it like this:
PRINT 'pg_dev.public.mini_ticker' FROM BEGINNING;
SHOW QUERIES
Example
We do it like this:
ksql> SHOW QUERIES;

 Query ID              | Query Type | Status    | Sink Name      | Sink Kafka Topic | Query String
--------------------------------------------------------------------------------------------------------------------------
 CSAS_SHIPPED_ORDERS_0 | PERSISTENT | RUNNING:1 | SHIPPED_ORDERS | SHIPPED_ORDERS   | CREATE STREAM SHIPPED_ORDERS WITH 
 ...
To inspect the execution plan of one of the listed queries, we do it like this:
ksql> EXPLAIN CSAS_SHIPPED_ORDERS_0;
SHOW TOPICS
Example
We do it like this:
ksql> show topics;
Kafka Topic                   | Partitions | Partition Replicas
-----------------------------------------------------------------
dbserver1                     | 1          | 1
dbserver1.inventory.customers | 1          | 1
dbserver1.inventory.orders    | 1          | 1
dbserver1.inventory.products  | 1          | 1
default_ksql_processing_log   | 1          | 1
my_connect_configs            | 1          | 1
my_connect_offsets            | 25         | 1
my_connect_statuses           | 5          | 1
schema-changes.inventory      | 1          | 1
-----------------------------------------------------------------
Example
We do it like this:
> docker exec -it ksqldb ksql http://localhost:8088


> show topics;
Kafka Topic                    | Partitions | Partition Replicas
------------------------------------------------------------------
crypto-ksqlksql_processing_log | 1          | 1
docker-connect-configs         | 1          | 1
docker-connect-offsets         | 25         | 1
docker-connect-status          | 5          | 1
------------------------------------------------------------------

2 comments:

  1. Thank you for your effort, but if I may humbly say so, it looks as if you took these notes for yourself. Thanks anyway.