Tuesday, June 8, 2021

gRPC Client

Introduction
- First, an io.grpc.ManagedChannel is created. The explanation is as follows:
ManagedChannelBuilder is an abstract class and there are concrete implementations like NettyChannelBuilder and OkHttpChannelBuilder. When you use an API like ManagedChannelBuilder.forTarget() or Grpc.newChannelBuilder() gRPC finds an appropriate concrete implementation based on your platform and returns it (normally OkHttp on Android; Netty otherwise).
Next, a stub is created. There are two kinds of stub: blocking and non-blocking. The explanation is as follows:
gRPC supports two types of client stubs:
1. blocking/synchronous stub: in this stub, the RPC call waits for the server to respond.
2. non-blocking/asynchronous stub: client makes non-blocking calls to the server, where the response is returned asynchronously.
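The difference between the two call styles can be sketched with the JDK alone. This is only an illustration and does not use gRPC: Observer is a hypothetical stand-in for gRPC's StreamObserver, and capitalizeBlocking / capitalizeAsync are invented methods standing in for an RPC.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// JDK-only illustration of the two call styles; none of these types are gRPC classes.
class CallStyles {
  // Hypothetical stand-in for io.grpc.stub.StreamObserver.
  interface Observer<T> {
    void onNext(T value);
    void onError(Throwable cause);
    void onCompleted();
  }

  // Blocking style: the caller waits and gets the result as the return value.
  static String capitalizeBlocking(String data) {
    return data.toUpperCase();
  }

  // Non-blocking style: the call returns at once, the result arrives on another thread.
  static void capitalizeAsync(String data, Observer<String> observer, ExecutorService executor) {
    executor.execute(() -> {
      try {
        observer.onNext(data.toUpperCase());
        observer.onCompleted();
      } catch (RuntimeException e) {
        observer.onError(e);
      }
    });
  }

  // Runs one async call and returns the events the observer saw, in order.
  static List<String> runAsyncOnce(String data) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    List<String> events = new ArrayList<>();
    CountDownLatch done = new CountDownLatch(1);
    capitalizeAsync(data, new Observer<String>() {
      @Override public void onNext(String value) { events.add("next:" + value); }
      @Override public void onError(Throwable cause) { events.add("error"); done.countDown(); }
      @Override public void onCompleted() { events.add("completed"); done.countDown(); }
    }, executor);
    try {
      // The caller could do other work here; this sketch simply waits for the callback.
      done.await(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    executor.shutdown();
    return events;
  }
}
```

With the blocking style the result is simply the return value. With the non-blocking style the caller has to wait for, or react to, the completion signal, which this sketch does with a CountDownLatch.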
Blocking Stub
- A stub is created from the ManagedChannel by calling MyService.newBlockingStub(channel). The stub exposes the service's methods, and gRPC calls are made by invoking them.

- When we are done, the channel is closed by calling channel.shutdown(). awaitTermination() does not close the channel by itself; it only waits for the shutdown to complete.
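ManagedChannel offers shutdown(), shutdownNow() and awaitTermination(timeout, unit), the same method names as java.util.concurrent.ExecutorService. The order of the calls can therefore be tried with the JDK alone. The following is a minimal sketch that uses an ExecutorService as a stand-in for the channel; GracefulShutdown, close and demo are hypothetical names.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// JDK-only sketch: an ExecutorService stands in for a ManagedChannel.
class GracefulShutdown {
  // Returns true if the resource terminated, false on timeout or interruption.
  static boolean close(ExecutorService resource, long timeout, TimeUnit unit) {
    resource.shutdown(); // 1. stop accepting new work, let in-flight work finish
    try {
      if (!resource.awaitTermination(timeout, unit)) {
        resource.shutdownNow(); // 2. timed out: cancel whatever is still running
        return resource.awaitTermination(timeout, unit);
      }
      return true;
    } catch (InterruptedException e) {
      resource.shutdownNow();
      Thread.currentThread().interrupt(); // preserve the interrupt for the caller
      return false;
    }
  }

  // Starts a pool, runs one short task, then closes the pool gracefully.
  static boolean demo() {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    pool.execute(() -> { /* short task */ });
    return close(pool, 4, TimeUnit.SECONDS) && pool.isTerminated();
  }
}
```

Calling shutdown() first and falling back to shutdownNow() only after a timeout gives in-flight work a chance to finish.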

Non-blocking Stub With Callback
A stub is created from the ManagedChannel by calling MyService.newStub(channel). A callback that implements the StreamObserver interface is passed to the stub. The callback runs on gRPC's own threads. The explanation is as follows:
For the callback, gRPC uses a cached thread pool that creates new threads as needed but will reuse previously constructed threads when they are available. 
If we want to use our own threads, this has to be configured at the ManagedChannel level. We do it like this:
var executorService = Executors.newFixedThreadPool(10);
var managedChannel = ManagedChannelBuilder.forAddress(host, port)
  .executor(executorService)
  .usePlaintext()
  .build();
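The practical difference between a cached thread pool and a fixed one can be observed with the JDK alone. The sketch below does not touch gRPC; PoolComparison and distinctThreads are hypothetical names. It submits tasks that all stay pending until released, then counts how many distinct threads ran them.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

// JDK-only sketch comparing how many threads different pools use.
class PoolComparison {
  // Submits `tasks` tasks that block until released; returns the number of distinct threads used.
  static int distinctThreads(ExecutorService pool, int tasks) {
    Set<String> names = ConcurrentHashMap.newKeySet();
    CountDownLatch release = new CountDownLatch(1);
    CountDownLatch done = new CountDownLatch(tasks);
    for (int i = 0; i < tasks; i++) {
      pool.execute(() -> {
        names.add(Thread.currentThread().getName());
        try {
          release.await(); // keep the thread busy until every task has been submitted
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
        done.countDown();
      });
    }
    release.countDown();
    try {
      done.await(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    pool.shutdown();
    return names.size();
  }
}
```

Passing Executors.newCachedThreadPool() yields one thread per pending task, while Executors.newFixedThreadPool(2) never uses more than two threads and queues the rest. That bound is the usual reason for supplying a custom executor.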
Non-blocking Stub With Future
We do it like this:
// Create a new future stub 
var productServiceFutureStub = ProductServiceGrpc.newFutureStub(managedChannel);

var productRequest = GetProductRequest.newBuilder().setProductId("apple-123").build(); 
ListenableFuture<GetProductResponse> listenableFuture = productServiceFutureStub
  .getProduct(productRequest); 
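// ProductCallback must implement Guava's FutureCallback<GetProductResponse> here;
// fixedThreadPool is an Executor created elsewhere.
Futures.addCallback(listenableFuture, new ProductCallback(), fixedThreadPool);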

//or if you want Runnable
listenableFuture.addListener(this::notifyListener, fixedThreadPool);
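The idea of attaching a callback to a future and choosing the executor it runs on can also be tried without gRPC or Guava. The sketch below is an analogue, not the gRPC API: it uses the JDK's CompletableFuture, where whenCompleteAsync plays roughly the role of Futures.addCallback. fetchProduct and FutureCallbackSketch are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// JDK-only analogue of "future stub + callback on a chosen executor".
class FutureCallbackSketch {
  // Hypothetical stand-in for an RPC that returns a future.
  static CompletableFuture<String> fetchProduct(String productId, ExecutorService rpcExecutor) {
    return CompletableFuture.supplyAsync(() -> "product:" + productId, rpcExecutor);
  }

  // Returns { value seen by the callback, name of the thread the callback ran on }.
  static String[] run() {
    ExecutorService rpcExecutor = Executors.newSingleThreadExecutor(r -> new Thread(r, "rpc"));
    ExecutorService callbackPool = Executors.newFixedThreadPool(2, r -> new Thread(r, "callback"));
    AtomicReference<String> seenValue = new AtomicReference<>();
    AtomicReference<String> seenThread = new AtomicReference<>();
    CountDownLatch done = new CountDownLatch(1);

    CompletableFuture<String> future = fetchProduct("apple-123", rpcExecutor);
    // The callback is handed to callbackPool, not run on the thread that completed the future.
    future.whenCompleteAsync((value, error) -> {
      seenValue.set(error == null ? value : "error");
      seenThread.set(Thread.currentThread().getName());
      done.countDown();
    }, callbackPool);

    try {
      done.await(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    rpcExecutor.shutdown();
    callbackPool.shutdown();
    return new String[] { seenValue.get(), seenThread.get() };
  }
}
```

Passing an executor explicitly keeps callback work off the thread that completes the future, which is the same reason the snippet above hands fixedThreadPool to Futures.addCallback.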
Example - Blocking Stub
We do it like this:
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;

import io.octutorials.ocgrpc.Defs.Payload;
import io.octutorials.ocgrpc.FetchGrpc;

import com.google.protobuf.ByteString;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.TimeUnit;

public class TutorialClient {
  private final ManagedChannel channel;
  private final FetchGrpc.FetchBlockingStub stub;

  public TutorialClient(String serverHost, int serverPort) {
    this.channel = ManagedChannelBuilder.forAddress(serverHost, serverPort)
      .usePlaintext() // the boolean overload is deprecated in grpc-java
      .build();
    this.stub = FetchGrpc.newBlockingStub(this.channel);
  }

  public void shutdown() throws InterruptedException {
    this.channel.shutdown().awaitTermination(4, TimeUnit.SECONDS);
  }

  public String capitalize(String data) throws Exception {
    ByteString bs = ByteString.copyFrom(data.getBytes("UTF8"));
    Payload in = Payload.newBuilder().setData(bs).build();
    Payload out = this.stub.capitalize(in);
    return out.getData().toString("UTF8");
  }
}
Example - Blocking Stub
We do it like this:
var productServiceBlockingStub = ProductServiceGrpc.newBlockingStub(managedChannel);
var productRequest = GetProductRequest.newBuilder().setProductId("apple-123").build();
var productResponse = productServiceBlockingStub.getProduct(productRequest);
Example - Non-blocking Stub
We do it like this:
var productServiceAsyncStub = ProductServiceGrpc.newStub(managedChannel);
var productRequest = GetProductRequest.newBuilder().setProductId("apple-123").build();
productServiceAsyncStub.getProduct(productRequest, new ProductCallback());

class ProductCallback implements StreamObserver<GetProductResponse> {
  @Override
  public void onNext(GetProductResponse value) {
    log.info("Received product, {}", value);
  }

  @Override
  public void onError(Throwable cause) {
    log.error("Error occurred, cause {}", cause.getMessage());
  }

  @Override
  public void onCompleted() {
    log.info("Completed");
  }
}