Giriş
1. gRPC Servisi Spring Component olarak tanımlanır
2. Bu servis, kullanılmak istenen yerde
@GrpcClient anotasyonu ile koda dahil edilir
Örnek
Servisi Spring component yapmak için şöyle yaparız.
import io.grpc.stub.StreamObserver;
import org.lognet.springboot.grpc.GRpcService;
/**
 * gRPC service implementation annotated with {@code @GRpcService}.
 *
 * <p>Extends {@code UserDetailsServiceGrpc.UserDetailsServiceImplBase}; the RPC
 * method overrides are elided in this snippet.
 */
@GRpcService
public class UserDetailsService extends
UserDetailsServiceGrpc.UserDetailsServiceImplBase {
...
}
Servisin metodları için şöyle
yaparız. Burada hem Unary hem de Bi-Directional Stream görülebilir.
/**
 * Unary RPC: builds one response for the given request, emits it, and
 * completes the call.
 *
 * @param request the incoming request
 * @param responseObserver observer that receives the single response followed
 *     by the completion signal
 */
@Override
public void generateRandomUser(UserDetailsRequest request,
    StreamObserver<UserDetailsResponse> responseObserver) {
  // The builder chain is one expression that ends at build(); it must not be
  // terminated right after newBuilder().
  UserDetailsResponse output = UserDetailsResponse.newBuilder()
      .setCity(...)
      ...
      .build();
  responseObserver.onNext(output);
  responseObserver.onCompleted();
}
/**
 * Bi-directional streaming RPC: emits one response for every request received
 * and completes the response stream when the request stream completes.
 *
 * @param responseObserver observer used to send responses back to the caller
 * @return the observer that receives the caller's request stream
 */
@Override
public StreamObserver<UserDetailsRequest> generateRandomUserStream(
    StreamObserver<UserDetailsResponse> responseObserver) {
  // Fully qualified names keep this block free of additional imports.
  final java.util.logging.Logger logger =
      java.util.logging.Logger.getLogger(getClass().getName());
  return new StreamObserver<UserDetailsRequest>() {
    @Override
    public void onNext(UserDetailsRequest input) {
      UserDetailsResponse output = UserDetailsResponse.newBuilder()
          .setCity(...)
          ...
          .build();
      responseObserver.onNext(output);
    }

    @Override
    public void onError(Throwable throwable) {
      // The request stream has failed, so there is nothing left to answer.
      // Record the cause instead of silently dropping it.
      logger.log(java.util.logging.Level.WARNING,
          "generateRandomUserStream request stream failed", throwable);
    }

    @Override
    public void onCompleted() {
      responseObserver.onCompleted();
    }
  };
}
generateRandomUser() metodunu teker teker çağırmak için şöyle yaparız.
import net.devh.boot.grpc.client.inject.GrpcClient;
import reactor.core.publisher.Flux;
/**
 * Client that calls the unary {@code generateRandomUser} RPC once per request
 * through the blocking stub and exposes the results as a {@link Flux}.
 */
@Service
public class UserDetailsGrpcBlockingClient {

  @GrpcClient("UserDetailsService")
  UserDetailsServiceGrpc.UserDetailsServiceBlockingStub userDetailsServiceBlockingStub;

  /**
   * Issues {@code range} unary calls, one after another.
   *
   * @param range number of requests to send
   * @return a Flux emitting, per response, a single-entry map from the response
   *     id to a {@code UserDetails} built from that response
   */
  public Flux<Object> getUserDetailsResponse(Integer range) {
    Flux<UserDetailsRequest> requests = Flux.range(1, range)
        .map(index -> UserDetailsRequest.newBuilder()
            .setCity(...)
            ...
            .build());
    return requests
        .map(this::fetchUserDetails)
        // The stub call blocks, so the pipeline is subscribed on boundedElastic.
        .subscribeOn(Schedulers.boundedElastic());
  }

  /** Performs one blocking RPC and wraps the response in a single-entry map. */
  private Object fetchUserDetails(UserDetailsRequest request) {
    UserDetailsResponse response =
        this.userDetailsServiceBlockingStub.generateRandomUser(request);
    return Map.of(response.getId(), new UserDetails(response));
  }
}
Metodu stream olarak çağırmak için şöyle
yaparız. Burada gRPC StreamObserver nesnesi Flux nesnesine çevriliyor.
import io.grpc.stub.StreamObserver;
import net.devh.boot.grpc.client.inject.GrpcClient;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.scheduler.Schedulers;
/**
 * Client that drives the bi-directional {@code generateRandomUserStream} RPC
 * and exposes the responses as a {@link Flux}.
 */
@Service
public class UserDetailsGrpcStreamClient {

  @GrpcClient("UserDetailsService")
  private UserDetailsServiceGrpc.UserDetailsServiceStub stub;

  /**
   * Sends {@code range} requests over one streaming call and emits the mapped
   * responses.
   *
   * <p>The call is opened inside {@link Flux#defer}, so it starts only when the
   * returned Flux is subscribed, and every subscription gets its own call and
   * processor. Opening it eagerly would leave a call open for a Flux that is
   * never subscribed and would reuse a finished request stream on a second
   * subscription.
   *
   * @param range number of requests to send
   * @return a Flux emitting one item per request, taken from the response stream
   */
  public Flux<Object> generateUserStreamResponse(Integer range){
    return Flux.<Object>defer(() -> {
      DirectProcessor<Object> processor = DirectProcessor.create();
      StreamObserver<UserDetailsResponse> observer =
          new StreamObserverImpl(processor.sink());
      StreamObserver<UserDetailsRequest> inputStreamObserver =
          this.stub.generateRandomUserStream(observer);
      return Flux.range(1, range)
          .map(i -> UserDetailsRequest.newBuilder()
              .setCity(...)
              ...
              .build())
          // Push each request onto the gRPC request stream.
          .doOnNext(inputStreamObserver::onNext)
          // Pair each request with a response and keep only the response.
          .zipWith(processor, (a, b) -> b)
          // Close the request stream once the zipped sequence completes.
          .doOnComplete(inputStreamObserver::onCompleted);
    })
    .subscribeOn(Schedulers.boundedElastic());
  }
}
/**
 * Adapter that forwards signals from a gRPC {@link StreamObserver} to a Reactor
 * {@link FluxSink}.
 */
class StreamObserverImpl implements StreamObserver<UserDetailsResponse> {

  final FluxSink<Object> sink;

  /**
   * @param sink the sink that receives every forwarded signal
   */
  public StreamObserverImpl(FluxSink<Object> sink) {
    this.sink = sink;
  }

  /** Emits a single-entry map from the response id to its {@code UserDetails}. */
  @Override
  public void onNext(UserDetailsResponse output) {
    Object payload = Map.of(output.getId(), new UserDetails(output));
    sink.next(payload);
  }

  /** Forwards the failure to the sink. */
  @Override
  public void onError(Throwable throwable) {
    sink.error(throwable);
  }

  /** Completes the sink. */
  @Override
  public void onCompleted() {
    sink.complete();
  }
}