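Introduction
A gRPC service can also be used inside a Spring Boot application, by means of grpc-spring-boot-starter. Note that the code in this post takes @GRpcService from the org.lognet starter and @GrpcClient from the net.devh starter; both artifacts are named grpc-spring-boot-starter. An example is here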
1. The gRPC service is defined as a Spring component
2. Wherever the service is needed, it is injected with the @GrpcClient annotation
Example
To make the service a Spring component, we do the following
import io.grpc.stub.StreamObserver;
import org.lognet.springboot.grpc.GRpcService;

@GRpcService
public class UserDetailsService
    extends UserDetailsServiceGrpc.UserDetailsServiceImplBase {
  ...
}
For the service methods, we do the following. Both a unary call and a bidirectional stream can be seen here.
@Override
public void generateRandomUser(UserDetailsRequest request,
    StreamObserver<UserDetailsResponse> responseObserver) {
  // Unary: send exactly one response, then complete the call
  UserDetailsResponse output = UserDetailsResponse.newBuilder()
      ...
      .setCity(...)
      ...
      .build();
  responseObserver.onNext(output);
  responseObserver.onCompleted();
}

@Override
public StreamObserver<UserDetailsRequest> generateRandomUserStream(
    StreamObserver<UserDetailsResponse> responseObserver) {
  // Bidirectional: the returned observer receives the client's requests
  return new StreamObserver<UserDetailsRequest>() {
    @Override
    public void onNext(UserDetailsRequest input) {
      // Answer each request as it arrives
      UserDetailsResponse output = UserDetailsResponse.newBuilder()
          .setCity(...)
          ...
          .build();
      responseObserver.onNext(output);
    }

    @Override
    public void onError(Throwable throwable) {
      // The client failed or cancelled; the error is ignored here
    }

    @Override
    public void onCompleted() {
      // The client is done sending, so close the response stream too
      responseObserver.onCompleted();
    }
  };
}
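The difference between the two method shapes above can be tried without a gRPC runtime. The following is only a minimal sketch: Observer is a hypothetical local stand-in for io.grpc.stub.StreamObserver, and RecordingObserver and EchoHandlers are illustrative names that do not come from the original service.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Hypothetical stand-in for io.grpc.stub.StreamObserver, declared locally
// so that the sketch compiles without gRPC on the classpath.
interface Observer<T> {
    void onNext(T value);
    void onError(Throwable t);
    void onCompleted();
}

// Records every callback it receives, in arrival order.
class RecordingObserver implements Observer<String> {
    final List<String> events = new ArrayList<>();

    @Override public void onNext(String value) { events.add("next:" + value); }
    @Override public void onError(Throwable t) { events.add("error:" + t.getMessage()); }
    @Override public void onCompleted() { events.add("completed"); }
}

class EchoHandlers {
    // Unary shape: one request in, one onNext out, then onCompleted.
    static void unary(String request, Observer<String> responses) {
        responses.onNext(request.toUpperCase(Locale.ROOT));
        responses.onCompleted();
    }

    // Bidirectional shape: return an observer for the incoming requests,
    // answer each one as it arrives, complete when the caller completes.
    static Observer<String> bidi(Observer<String> responses) {
        return new Observer<String>() {
            @Override public void onNext(String request) {
                responses.onNext(request.toUpperCase(Locale.ROOT));
            }
            @Override public void onError(Throwable t) {
                // The caller failed or cancelled; nothing more is sent.
            }
            @Override public void onCompleted() {
                responses.onCompleted();
            }
        };
    }
}
```

Calling EchoHandlers.bidi(recorder) and then onNext("a"), onNext("b") and onCompleted() on the returned observer leaves recorder.events as next:A, next:B, completed. The unary handler always produces exactly one next event followed by completed.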
To call the generateRandomUser() method one request at a time, we do the following
import java.util.Map;
import net.devh.boot.grpc.client.inject.GrpcClient;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

@Service
public class UserDetailsGrpcBlockingClient {

  @GrpcClient("UserDetailsService")
  UserDetailsServiceGrpc.UserDetailsServiceBlockingStub userDetailsServiceBlockingStub;

  public Flux<Object> getUserDetailsResponse(Integer range) {
    return Flux.range(1, range)
        .map(i -> UserDetailsRequest.newBuilder().setCity(...)....build())
        .map(i -> {
          // One blocking unary call per request
          UserDetailsResponse response =
              this.userDetailsServiceBlockingStub.generateRandomUser(i);
          return (Object) Map.of(response.getId(), new UserDetails(response));
        })
        // Keep the blocking calls off the subscriber's thread
        .subscribeOn(Schedulers.boundedElastic());
  }
}
To call it as a stream, we do the following. Here the gRPC StreamObserver object is converted into a Flux object.
import io.grpc.stub.StreamObserver;
import net.devh.boot.grpc.client.inject.GrpcClient;
import org.springframework.stereotype.Service;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.scheduler.Schedulers;

@Service
public class UserDetailsGrpcStreamClient {

  @GrpcClient("UserDetailsService")
  private UserDetailsServiceGrpc.UserDetailsServiceStub stub;

  public Flux<Object> generateUserStreamResponse(Integer range) {
    // Note: DirectProcessor is deprecated as of Reactor 3.4 in favor of Sinks
    DirectProcessor<Object> processor = DirectProcessor.create();
    // Responses from the server are pushed into the processor's sink
    StreamObserver<UserDetailsResponse> observer =
        new StreamObserverImpl(processor.sink());
    // Requests are written to the observer returned by the async stub
    StreamObserver<UserDetailsRequest> inputStreamObserver =
        this.stub.generateRandomUserStream(observer);
    return Flux.range(1, range)
        .map(i -> UserDetailsRequest.newBuilder().setCity(...)....build())
        .doOnNext(inputStreamObserver::onNext)
        // Pair each request with a response and keep only the response
        .zipWith(processor, (a, b) -> b)
        .doOnComplete(inputStreamObserver::onCompleted)
        .subscribeOn(Schedulers.boundedElastic());
  }
}
To do the conversion, we do the following.
import io.grpc.stub.StreamObserver;
import java.util.Map;
import reactor.core.publisher.FluxSink;

class StreamObserverImpl implements StreamObserver<UserDetailsResponse> {

  final FluxSink<Object> sink;

  public StreamObserverImpl(FluxSink<Object> sink) {
    this.sink = sink;
  }

  @Override
  public void onNext(UserDetailsResponse output) {
    // Forward each gRPC response into the Flux
    this.sink.next(Map.of(output.getId(), new UserDetails(output)));
  }

  @Override
  public void onError(Throwable throwable) {
    this.sink.error(throwable);
  }

  @Override
  public void onCompleted() {
    this.sink.complete();
  }
}
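The same adapter idea, forwarding the three callbacks into another abstraction, can be shown with the JDK alone. The sketch below deliberately swaps Reactor's FluxSink for a CompletableFuture that resolves to the full list of responses, so it is not streaming and not a replacement for the class above. CallbackObserver is a hypothetical local stand-in for io.grpc.stub.StreamObserver, and CollectingObserver is an illustrative name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for io.grpc.stub.StreamObserver, declared locally
// so that the sketch compiles without gRPC on the classpath.
interface CallbackObserver<T> {
    void onNext(T value);
    void onError(Throwable t);
    void onCompleted();
}

// Buffers streamed values and exposes them through a future, in the same
// spirit as StreamObserverImpl forwarding callbacks to a FluxSink.
class CollectingObserver<T> implements CallbackObserver<T> {
    private final List<T> buffer = new ArrayList<>();
    private final CompletableFuture<List<T>> result = new CompletableFuture<>();

    @Override
    public synchronized void onNext(T value) {
        buffer.add(value);
    }

    @Override
    public void onError(Throwable t) {
        // A failed stream fails the future; buffered values are dropped.
        result.completeExceptionally(t);
    }

    @Override
    public synchronized void onCompleted() {
        // Publish an immutable snapshot of everything received.
        result.complete(List.copyOf(buffer));
    }

    CompletableFuture<List<T>> future() {
        return result;
    }
}
```

After onNext(1), onNext(2) and onCompleted(), future().join() returns a list containing 1 and 2. After onError(...), the future completes exceptionally instead. The trade-off against the Flux version is that the caller sees nothing until the stream has ended.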