1 Mart 2018 Perşembe

ZeroMQ C++ API

Giriş
Şu satırı dahil ederiz.
#include <zmq.hpp>
Linklemek için şöyle yaparız.
g++ ... -lzmq
C++ API açıklaması burada.  "ZeroMQ Guide" ise burada.

ZeroMQ şu farklı mimaride kullanılabilir.
1. Senkron request + response. Request <-> Response şeklindedir.
2. Asenkron request + response. Request <-> Router <-> Response şeklindedir.
3. Publish + subscribe. Publish -> Subscribe şeklindedir.
4. Push + pull. Push -> Pull şeklindedir.
5. Exclusive pair. Pair -> Pair -> Pair şeklindedir.
6. Dealer router

ZeroMQ brokerless mimari kullandığı için RabbitMQ, ActiveMQ gibi arakatmanlardan farklıdır.

1. Senkron Request ve Response
Açıklaması şöyle. Remote Procedure Call (RPC) tarzı (gönder/sonucu al) işler için kullanılır.
sync pair of sockets.
pros: doesn't drop messages when HWM is reached.
cons: this pair of sockets is sync and blocking, it means that if REQ socket sent a message, it will wait for a reply forever and there is no reply, you can use it again only after recreating.
Socket'lerin birbirlerini nasıl bulduklarının açıklaması şöyle
All zeromq sockets implicitly have an identity associated with them. (You can obtain this identity with zmq_getsockopt().)

For bi-directional socket types, this identity is automatically transferred as part of every message sent over the socket. The REP socket uses this identity to route the response message back to the appropriate socket. This has the effect of automatic routing.

Under the hood, identities are transferred via multipart messages. The first message in a multipart message will contain the socket identity. An empty message will follow, followed by all messages specified by the user. The REQ and REP sockets deal with these prefixed messages automatically. However, if you are using XREQ or XREP sockets, you need to populate these identity messages yourself.
XREQ ve XREP Nedir ?
Açıklaması şöyle.
XREQ/XREP are aliases for ROUTER/DEALER. XREQ/XREP were used in ZeroMQ 2.x.

2. Asenkron Request ve Response
Açıklama yaz

3. Pub Ve Sub
Sub socket filtre kullanarak belirlediği mesajları işleyebilir. Açıklaması şöyle
Note that when you use a SUB socket you must set a subscription using zmq_setsockopt() and SUBSCRIBE, as in this code. If you don't set any subscription, you won't get any messages. It's a common mistake for beginners. The subscriber can set many subscriptions, which are added together. That is, if an update matches ANY subscription, the subscriber receives it. The subscriber can also cancel specific subscriptions. A subscription is often, but not necessarily a printable string.
Benzer bir açıklama şöyle
The ZMQ_SUBSCRIBE option shall establish a new message filter on a ZMQ_SUB socket. Newly created ZMQ_SUB sockets shall filter out all incoming messages, therefore you should call this option to establish an initial message filter.

An empty option_value of length zero shall subscribe to all incoming messages. A non-empty option_value shall subscribe to all messages beginning with the specified prefix. Multiple filters may be attached to a single ZMQ_SUB socket, in which case a message shall be accepted if it matches at least one filter.
Ayrıca önce SUB socketi açılır ve bir müddet beklenir. Daha sonra PUB socketi açılır ve veri gönderilir. Buna "slow joiner" denilir.

4. Push Ve Pull
Push socket isteği başlatır. Pull socket cevapları oluşturur. Bu mimari load balancing kullanmaz. Açıklaması şöyle.
It's not a load balancer, this was a faulty explanation that stayed in the 0MQ docs for a while. To do load-balancing, you have to get some information back from the workers about their availability. PUSH, like DEALER, is a round-robin distributor. It's useful for its raw speed, and simplicity. You don't need any kind of chatter, just pump tasks down the pipeline and they're sprayed out to all available workers as rapidly as the network can handle them.

The pattern is useful when you are doing really high numbers of small tasks, and where workers come and go infrequently. The pattern is not good for larger tasks that take time to complete because then you want a single queue that sends new tasks only to available workers. It also suffers from an anti-pattern where if a client sends many tasks and then workers connect, the first worker will grab 1,000 or so messages while the others are still busy connecting.
Açıklaması şöyle.
async pair of sockets.
pros: no blocks, no message drops, async,
cons: no routing, so its ideal for p2p, but if you have 1-to-N connection all messages will be distributed by round robin
Klasik kullanımda 1 tane Push socket, N tane Pull socket bulunur.  N tane Pull socket istekleri düzgün bir dağılımla alırlar. Yani hepsi eşittir. Açıklaması şöyle.
Use a PUSH socket to send messages to workers, which are receiving through PULL, so the messages get uniformly distributed amongst workers. .... In the PUSH/PULL method, there's no special treatment for each worker, they're all equal.
Eğer istenirse bu durum değiştirilebilir ve 20 tane push socket, merkezi bir pull sockete veri gönderebilir.

Push ve Pull büyük bir dosyayı daha küçük parçalar halinde göndermek için de kullanılabilir ancak tek bir push socket olmalı.

5. Exclusive Pair
Açıklama yaz. Aslında bu mimari pek kullanılmıyor.

6. Dealer ve Router
Açıklaması şöyle. Router istekleri başlatır ve Dealer socket'e gönderir. Dealer cevapları toplar ve Router'a gönderir.
DEALER and ROUTER are sockets, which can allow easy scaling of REQ / REP pairs.

In direct communication, REQ and REP are talking in blocking manner.

ROUTER - accept requests - from REQ side

Router is able accepting requests, adds an envelop with information about that requestee, and makes this new message available for further processing by interconnecting code). When the response comes back (in an envelop), it can pass the response back to the requestee.

DEALER - talks to workers - on REP side

Dealer cares about workers. Note, that to make the whole solution usable, workers has to connect to the dealer, not the other way around.

DEALER also allows non blocking connection with REP.

Some connecting code passes request in an envelop to the DEALER, dealer manages distributing requests to workers (without the envelope) and later responses back to the interconnecting code (again in an envelope).
Router High Water Mark seviyesine gelirse paketleri düşürür. Açıklaması şöyle
But keep in mind that ROUTER socket can drop your messages if its HWM is reached.
Açıklaması şöyle
async pair of sockets.
pros: these socket are not blocking and you can route your messages, but
cons: it HWM of ROUTER socket is reached it will drop messages and there is no API to let you know about it.
C#
C#'ta kullanmak için ZeroMQ C# API yazısına bakabilirsiniz.

Kurulum
Linux'ta kaynak koddan derlemek için şöyle yaparız.
cd /tmp/
git clone https://github.com/zeromq/zeromq4-1.git
cd zeromq4-1/
./autogen.sh
./configure
make
make install
context_t Sınıfı
Constructor - I/O thread sayısı
Parametre olarak kaç tane I/O thread istediğimizi belirtiriz. Şöyle yaparız
zmq::context_t context (1);
Bu kod C API'sindeki şu koda benzer.
void *ctx = zmq_ctx_new ();
socket_t Sınıfı
Constructor
Response - Request
Sunucu tarafı için şöyle yaparız. Daha sonra bind() metodunu çağırmak gerekir.
zmq::socket_t socket (context, ZMQ_REP);
İstemci tarafı için şöyle yaparız.
zmq::socket_t socket (context, ZMQ_REQ);
Publish - Subcsribe
Sunucu tarafı için şöyle yaparız.
zmq::socket_t socket (context, ZMQ_PUB);
İstem tarafı için şöyle yaparız.
zmq::socket_t socket (context, ZMQ_SUB);
Sadece Publisher mesaj gönderebilir. Subscriber mesaj gönderemez!
one to many (One PUB can talk to many SUB)
Dealer - Router
Şöyle yaparız.
zmq::socket_t socket (context, ZMQ_ROUTER);
Şöyle yaparız.
zmq::socket_t socket (context, ZMQ_DEALER);
Pair
Thread'ler arası kullanılır. Hem sunucu hem de istemci de şöyle yaparız.
zmq::socket_t socket (context, ZMQ_PAIR);
Sunucu şu adresi dinler.
socket.bind("tcp://127.0.0.1:20000");
İstemci şu adrese bağlanır
socket.connect ("tcp://127.0.0.1:20000");
bind metodu
Klasik kullanımda sunucu socket tarafından kullanılır. Ancal ZeroMQ'da işler tersten de çalışabiliyor. Yani istemci bind() yapıp sunucu da connect() yapabiliyor.

1. tcp
Açıklaması şöyle.
Assigning a local address to a socket

When assigning a local address to a socket using zmq_bind() with the tcp:// transport, the endpoint shall be interpreted as an interface followed by a colon and the TCP port number to use.

An interface may be specified by either of the following:
- The wild-card *, meaning all available interfaces.
- The primary IPv4 address assigned to the interface, in its numeric representation.
- The interface name as defined by the operating system.

Interface names are not standardised in any way and should be assumed to be arbitrary and platform dependent. On Win32 platforms no short interface names exist, thus only the primary IPv4 address may be used to specify an interface.
Tcp kullanımı firewall'u tetikleyebilir.

Örnek
Transport olarak tcp kullanmak ve tüm arayüzleri dinlemek için şöyle yaparız.
socket.bind ("tcp://*:5565");
Benzer bir şekilde tüm arayüzleri dinlemek için şöyle yaparız.
socket.bind ("tcp://localhost:5565");
Eğer sadece loopback arayüzünü kullanmak istersek şöyle yaparız.
socket.bind ("tcp://127.0.0.1:5565");
Örnek
2 sunucu açmak için şöyle yaparız.
//  Prepare our context and socket
zmq::context_t context (1);

zmq::socket_t socket (context, ZMQ_REP);
socket.bind ("tcp://*:6666");
zmq::socket_t socket2 (context, ZMQ_REP);
socket2.bind ("tcp://*:6661");
2. ipc
Aynı bilgisayardaki iki farklı uygulama arasında kullanılır. Açıklaması şöyle. ipc Windows'ta çalışmaz, Linux'ta çalışır.
With a normal transfer, you have to copy data into a zmq message, that has to be copied into an ipc pipe, copied out again, and copied back into a new zmq message at the receiving end.
Şöyle yaparız.
socket.bind ("ipc://...");
3. inproc
Aynı uygulama içinde thread'ler arası kullanılır. Şöyle yaparız.
socket.bind ("inproc://#1");
4. vmci
Bunun ne olduğunu bilmiyorum.

close metodu
Şöyle yaparız.
socket.close();
connect metodu
Örnek
Request istemcisini bağlamak için şöyle yaparız.
socket.connect ("tcp://localhost:5555");
Örnek
Şöyle yaparız.
zmq::socket_t socket (context, ZMQ_REQ );

std::string addr     = "tcp://127.0.0.1";
std::string req_port = "55555";

req.connect( addr + req_port );
Örnek
Publisher istemcisini bağlamak için şöyle yaparız.
zmq::context_t ctx(1);
zmq::socket_t pub (ctx, ZMQ_PUB);
pub.connect("tcp://0.0.0.0:8000");
recv metodu
Mesaj almak için şöyle yaparız.
zmq::message_t msg;
bool recieved = socket.recv (&msg);
Şöyle yaparız.
zmq::message_t msg;
bool received = socket.recv(&msg, ZMQ_DONTWAIT);
if(received) {...}
Veriyi string'e çevirmek için şöyle yaparız.
std::string str (static_cast<char*>(msg.data()),msg.size());
Bu kod C API'sindeki şu koda benzer.
char buffer[256];
//  Wait for next request from client
int size = zmq_recv (socket, buffer, 255, 0);
if (size == -1) {
  ...error...
}
send metodu - msg
Mesaj göndermek içindir.
Örnek
şöyle yaparız.
zmq::message_t msg = ...;
socket.send (msg);
Örnek
Şöyle yaparız.
zmq::message_t msg (10);

memcpy(msg.data(), &mydata, 10);
socket.send(msg);
send metodu - msg + option
Örnek
Şöyle yaparız.
zmq::message_t msg = ...;

socket.send(msg, ZMQ_SNDMORE);
setsockopt metodu
Şöyle yaparız.
int linger = 0;// ms
socket.setsockopt (ZMQ_LINGER, &linger, sizeof(linger));
Şöyle yaparız.
socket.setsockopt (ZMQ_IDENTITY, "Hello", 5);
Subscribe socketlerde şöyle yaparız.
socket.setsockopt(ZMQ_SUBSCRIBE, "", 0);
Subscribe socketlerde şöyle yaparız.
int confl = 1;
socket.setsockopt(ZMQ_CONFLATE, &confl, sizeof(confl)); // Keep only last message
message_t Sınıfı
message_t Sınıfı yazısına taşıdım.

Hiç yorum yok:

Yorum Gönder