Friday, 25 June 2021

gRPC SpringBoot Service

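Introduction
A gRPC service can also be used inside Spring Boot, using grpc-spring-boot-starter. An example is here. Note that in the code below the server-side @GRpcService annotation comes from the org.lognet starter, while the client-side @GrpcClient annotation comes from the net.devh starter.
1. The gRPC service is defined as a Spring component
2. Wherever this service is needed, it is injected into the code with the @GrpcClient annotation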

Example
To make the service a Spring component, we do the following
import io.grpc.stub.StreamObserver;
import org.lognet.springboot.grpc.GRpcService;

@GRpcService
public class UserDetailsService
    extends UserDetailsServiceGrpc.UserDetailsServiceImplBase {
...
}
For its methods, we do the following. Both a unary method and a bi-directional streaming method can be seen here.
// Unary call: one request in, one response out
@Override
public void generateRandomUser(UserDetailsRequest request,
    StreamObserver<UserDetailsResponse> responseObserver) {
  UserDetailsResponse output = UserDetailsResponse.newBuilder()
    .setCity(...)
    ...
    .build();
  // Send the single response, then close the call
  responseObserver.onNext(output);
  responseObserver.onCompleted();
}

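// Bi-directional stream: returns the observer that receives the client's requests
@Override
public StreamObserver<UserDetailsRequest> generateRandomUserStream(
    StreamObserver<UserDetailsResponse> responseObserver) {
  return new StreamObserver<UserDetailsRequest>() {
    @Override
    public void onNext(UserDetailsRequest input) {
      // Answer each incoming request with one response
      UserDetailsResponse output = UserDetailsResponse.newBuilder()
        .setCity(...)
        ...
        .build();
      responseObserver.onNext(output);
    }
    @Override
    public void onError(Throwable throwable) {
      // Left empty in this example: errors from the client side are ignored
    }
    @Override
    public void onCompleted() {
      // The client has finished sending, so close the response stream too
      responseObserver.onCompleted();
    }
  };
}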
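The difference between the two method shapes above can be tried out without a gRPC server. The following is only a minimal sketch: SketchObserver is a hypothetical stand-in for io.grpc.stub.StreamObserver, and RecordingObserver, UserHandlersSketch and the "user-for-" strings are made-up names for illustration, not part of the original service.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for io.grpc.stub.StreamObserver (same three callbacks),
// so the sketch runs without gRPC on the classpath.
interface SketchObserver<T> {
  void onNext(T value);
  void onError(Throwable t);
  void onCompleted();
}

// Plays the role of the client side: records whatever a handler emits.
class RecordingObserver<T> implements SketchObserver<T> {
  final List<T> values = new ArrayList<>();
  boolean completed;
  Throwable error;

  @Override public void onNext(T value) { values.add(value); }
  @Override public void onError(Throwable t) { error = t; }
  @Override public void onCompleted() { completed = true; }
}

class UserHandlersSketch {
  // Unary shape: the request is a parameter; emit one response, then complete.
  static void unary(String request, SketchObserver<String> responseObserver) {
    responseObserver.onNext("user-for-" + request);
    responseObserver.onCompleted();
  }

  // Bi-directional shape: return an observer for the incoming requests and
  // answer each one as it arrives; complete only when the caller completes.
  static SketchObserver<String> bidi(SketchObserver<String> responseObserver) {
    return new SketchObserver<String>() {
      @Override public void onNext(String request) {
        responseObserver.onNext("user-for-" + request);
      }
      @Override public void onError(Throwable t) {
        // Ignored, as in the example above
      }
      @Override public void onCompleted() {
        responseObserver.onCompleted();
      }
    };
  }
}

class ObserverContractDemo {
  public static void main(String[] args) {
    RecordingObserver<String> unaryOut = new RecordingObserver<>();
    UserHandlersSketch.unary("Ankara", unaryOut);
    // prints: [user-for-Ankara] completed=true
    System.out.println(unaryOut.values + " completed=" + unaryOut.completed);

    RecordingObserver<String> streamOut = new RecordingObserver<>();
    SketchObserver<String> requests = UserHandlersSketch.bidi(streamOut);
    requests.onNext("Izmir");
    requests.onNext("Bursa");
    requests.onCompleted();
    // prints: [user-for-Izmir, user-for-Bursa] completed=true
    System.out.println(streamOut.values + " completed=" + streamOut.completed);
  }
}
```

In the streaming shape the response stream stays open until the caller signals onCompleted(), which is why the server method above only forwards the completion instead of closing the call itself.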
To call the generateRandomUser() method one request at a time, we do the following
import java.util.Map;

import net.devh.boot.grpc.client.inject.GrpcClient;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

@Service
public class UserDetailsGrpcBlockingClient {

  @GrpcClient("UserDetailsService")
  UserDetailsServiceGrpc.UserDetailsServiceBlockingStub userDetailsServiceBlockingStub;

  public Flux<Object> getUserDetailsResponse(Integer range) {
    return Flux.range(1, range)
      // Build one request per element of the range
      .map(i -> UserDetailsRequest.newBuilder()
        .setCity(...)
        ...
        .build())
      // Each blocking call returns a single response
      .map(request -> {
        UserDetailsResponse response = this.userDetailsServiceBlockingStub
          .generateRandomUser(request);
        return (Object) Map.of(response.getId(), new UserDetails(response));
      })
      // Run the blocking calls on a worker thread instead of the subscriber's thread
      .subscribeOn(Schedulers.boundedElastic());
  }
}
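The reason for subscribeOn(Schedulers.boundedElastic()) above is that each blocking stub call holds its thread until the response arrives. The same idea can be sketched with only the JDK. Everything here is an assumption for illustration: blockingLookup is a hypothetical stand-in for the blocking stub, and the "city-" values are made up.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

class BlockingCallOffloadSketch {

  // Hypothetical stand-in for the blocking stub: blocks briefly, then answers.
  static String blockingLookup(int id) {
    try {
      Thread.sleep(5);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return "city-" + id;
  }

  // Calls the blocking function once per id, in order, on the worker pool,
  // so the calling thread is free until the whole batch is done.
  static CompletableFuture<Map<Integer, String>> lookupAll(
      int range, Function<Integer, String> stub, ExecutorService workers) {
    return CompletableFuture.supplyAsync(() -> {
      Map<Integer, String> results = new LinkedHashMap<>();
      for (int id = 1; id <= range; id++) {
        results.put(id, stub.apply(id));
      }
      return results;
    }, workers);
  }

  public static void main(String[] args) {
    ExecutorService workers = Executors.newCachedThreadPool();
    try {
      Map<Integer, String> results =
          lookupAll(3, BlockingCallOffloadSketch::blockingLookup, workers).join();
      // prints: {1=city-1, 2=city-2, 3=city-3}
      System.out.println(results);
    } finally {
      workers.shutdown();
    }
  }
}
```

As in the Reactor version, the calls still run one after another. Only the thread that waits for them changes.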
To call it as a stream, we do the following. Here the gRPC StreamObserver is converted into a Flux.
import io.grpc.stub.StreamObserver;
import net.devh.boot.grpc.client.inject.GrpcClient;
import org.springframework.stereotype.Service;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.scheduler.Schedulers;

@Service
public class UserDetailsGrpcStreamClient {

  @GrpcClient("UserDetailsService")
  private UserDetailsServiceGrpc.UserDetailsServiceStub stub;

  public Flux<Object> generateUserStreamResponse(Integer range) {
    // Note: DirectProcessor is deprecated since Reactor 3.4 in favor of Sinks
    DirectProcessor<Object> processor = DirectProcessor.create();
    // Responses from the server are pushed into the processor's sink
    StreamObserver<UserDetailsResponse> observer =
      new StreamObserverImpl(processor.sink());
    // Requests are written to the observer returned by the async stub
    StreamObserver<UserDetailsRequest> inputStreamObserver =
      this.stub.generateRandomUserStream(observer);
    return Flux.range(1, range)
      .map(i -> UserDetailsRequest.newBuilder()
        .setCity(...)
        ...
        .build())
      .doOnNext(inputStreamObserver::onNext)
      // Pair each request with a response and keep only the response
      .zipWith(processor, (a, b) -> b)
      .doOnComplete(inputStreamObserver::onCompleted)
      .subscribeOn(Schedulers.boundedElastic());
  }
}
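The zipWith(processor, (a, b) -> b) step above emits one response per request and drops the request. A rough JDK-only sketch of that pairing is shown below. It is stricter than the Reactor pipeline, which may send several requests ahead before the first response arrives. asyncServer and the "user-" strings are hypothetical names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class RequestResponsePairingSketch {

  // Hypothetical async "server": answers each request on another thread
  // by invoking the given response callback.
  static Consumer<Integer> asyncServer(ExecutorService serverThread,
                                       Consumer<String> onResponse) {
    return request -> serverThread.execute(() -> onResponse.accept("user-" + request));
  }

  // Sends `range` requests and pairs each one with the next response.
  static List<String> exchange(int range) {
    ExecutorService serverThread = Executors.newSingleThreadExecutor();
    BlockingQueue<String> responses = new LinkedBlockingQueue<>();
    Consumer<Integer> requestSink = asyncServer(serverThread, responses::add);
    List<String> paired = new ArrayList<>();
    try {
      for (int i = 1; i <= range; i++) {
        // Roughly doOnNext(inputStreamObserver::onNext): send the request
        requestSink.accept(i);
        // Roughly zipWith(processor, (a, b) -> b): wait for one response, keep it
        String response = responses.poll(5, TimeUnit.SECONDS);
        if (response == null) {
          throw new IllegalStateException("no response for request " + i);
        }
        paired.add(response);
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting", e);
    } finally {
      // Roughly doOnComplete(inputStreamObserver::onCompleted): stop sending
      serverThread.shutdown();
    }
    return paired;
  }

  public static void main(String[] args) {
    // prints: [user-1, user-2, user-3]
    System.out.println(exchange(3));
  }
}
```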
To do the conversion, we do the following.
import java.util.Map;

import io.grpc.stub.StreamObserver;
import reactor.core.publisher.FluxSink;

// Adapter: forwards every gRPC callback to the matching FluxSink signal
class StreamObserverImpl implements StreamObserver<UserDetailsResponse> {

  final FluxSink<Object> sink;

  public StreamObserverImpl(FluxSink<Object> sink) {
    this.sink = sink;
  }

  @Override
  public void onNext(UserDetailsResponse output) {
    this.sink.next(Map.of(output.getId(), new UserDetails(output)));
  }

  @Override
  public void onError(Throwable throwable) {
    this.sink.error(throwable);
  }

  @Override
  public void onCompleted() {
    this.sink.complete();
  }
}
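The adapter above maps the three observer callbacks one-to-one onto another API. The same pattern can be sketched without gRPC or Reactor by collecting the stream into a CompletableFuture. This is only an illustration under assumptions: ResponseObserver is a hypothetical stand-in for io.grpc.stub.StreamObserver, and FutureCollectingObserver is a made-up name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for io.grpc.stub.StreamObserver (same three callbacks).
interface ResponseObserver<T> {
  void onNext(T value);
  void onError(Throwable t);
  void onCompleted();
}

// Adapter: buffers streamed values and exposes the whole stream as a future.
class FutureCollectingObserver<T> implements ResponseObserver<T> {
  private final List<T> buffer = new ArrayList<>();
  private final CompletableFuture<List<T>> result = new CompletableFuture<>();

  CompletableFuture<List<T>> result() {
    return result;
  }

  @Override
  public synchronized void onNext(T value) {
    buffer.add(value);                          // like sink.next(...)
  }

  @Override
  public synchronized void onError(Throwable t) {
    result.completeExceptionally(t);            // like sink.error(...)
  }

  @Override
  public synchronized void onCompleted() {
    result.complete(List.copyOf(buffer));       // like sink.complete()
  }
}

class ObserverAdapterDemo {
  public static void main(String[] args) {
    FutureCollectingObserver<String> observer = new FutureCollectingObserver<>();
    observer.onNext("a");
    observer.onNext("b");
    observer.onCompleted();
    // prints: [a, b]
    System.out.println(observer.result().join());
  }
}
```

A future only delivers the values once the stream has ended, whereas the FluxSink version above passes each response on as soon as it arrives.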
