From f3bd7a477db89bb26d4127c6c0570c7e0f007f0d Mon Sep 17 00:00:00 2001 From: Lisa Carey Date: Thu, 26 Feb 2015 17:06:14 +0000 Subject: [PATCH] Added client implementation details --- java/javatutorial.md | 132 ++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 131 insertions(+), 1 deletion(-) diff --git a/java/javatutorial.md b/java/javatutorial.md index d4f3216d464..1a79f31dd53 100644 --- a/java/javatutorial.md +++ b/java/javatutorial.md @@ -106,7 +106,9 @@ For simplicity, we've provided a [Gradle build file](https://github.com/grpc/grp which actually runs: -[actual command] +```shell +protoc -I examples/src/main/proto -I examples/build/extracted-protos/main --java_out=examples/build/generated-sources/main --java_plugin_out=examples/build/generated-sources/main --plugin=protoc-gen-java_plugin=compiler/build/binaries/java_pluginExecutable/java_plugin examples/src/main/proto/route_guide.proto +``` Running this command generates the following files: - `RouteGuideOuterClass.java`, which contains all the protocol buffer code to populate, serialize, and retrieve our request and response message types @@ -330,6 +332,8 @@ First we need to create a gRPC *channel* for our stub, specifying the server add .build(); ``` +As with our server, we're using the [Netty](http://netty.io/) transport framework, so we use a `NettyChannelBuilder`. + Now we can use the channel to create our stubs using the `newStub` and `newBlockingStub` methods provided in the `RouteGuideGrpc` class we generated from our .proto. ```java @@ -343,17 +347,143 @@ Now let's look at how we call our service methods. #### Simple RPC +Calling the simple RPC `GetFeature` on the blocking stub is as straightforward as calling a local method. + +```java + Point request = Point.newBuilder().setLatitude(lat).setLongitude(lon).build(); + Feature feature = blockingStub.getFeature(request); +``` +We create and populate a request protocol buffer object (in our case `Point`), pass it to the `getFeature()` method on our blocking stub, and get back a `Feature`. #### Server-side streaming RPC +Next, let's look at a server-side streaming call to `ListFeatures`, which returns a stream of geographical `Feature`s: +```java + Rectangle request = + Rectangle.newBuilder() + .setLo(Point.newBuilder().setLatitude(lowLat).setLongitude(lowLon).build()) + .setHi(Point.newBuilder().setLatitude(hiLat).setLongitude(hiLon).build()).build(); + Iterator features = blockingStub.listFeatures(request); +``` + +As you can see, it's very similar to the simple RPC we just looked at, except instead of returning a single `Feature`, the method returns an `Iterator` that the client can use to read all the returned `Feature`s. #### Client-side streaming RPC +Now for something a little more complicated: the client-side streaming method `RecordRoute`, where we send a stream of `Point`s to the server and get back a single `RouteSummary`. For this method we need to use the asynchronous stub. If you've already read [Creating the server](#server) some of this may look very familiar - asynchronous streaming RPCs are implemented in a similar way on both sides. + +```java + public void recordRoute(List features, int numPoints) throws Exception { + info("*** RecordRoute"); + final SettableFuture finishFuture = SettableFuture.create(); + StreamObserver responseObserver = new StreamObserver() { + @Override + public void onValue(RouteSummary summary) { + info("Finished trip with {0} points. Passed {1} features. " + + "Travelled {2} meters. It took {3} seconds.", summary.getPointCount(), + summary.getFeatureCount(), summary.getDistance(), summary.getElapsedTime()); + } + + @Override + public void onError(Throwable t) { + finishFuture.setException(t); + } + + @Override + public void onCompleted() { + finishFuture.set(null); + } + }; + + StreamObserver requestObserver = asyncStub.recordRoute(responseObserver); + try { + // Send numPoints points randomly selected from the features list. + StringBuilder numMsg = new StringBuilder(); + Random rand = new Random(); + for (int i = 0; i < numPoints; ++i) { + int index = rand.nextInt(features.size()); + Point point = features.get(index).getLocation(); + info("Visiting point {0}, {1}", RouteGuideUtil.getLatitude(point), + RouteGuideUtil.getLongitude(point)); + requestObserver.onValue(point); + // Sleep for a bit before sending the next one. + Thread.sleep(rand.nextInt(1000) + 500); + if (finishFuture.isDone()) { + break; + } + } + info(numMsg.toString()); + requestObserver.onCompleted(); + + finishFuture.get(); + info("Finished RecordRoute"); + } catch (Exception e) { + requestObserver.onError(e); + logger.log(Level.WARNING, "RecordRoute Failed", e); + throw e; + } + } +``` + +As you can see, to call this method we need to create a `StreamObserver`, which implements a special interface for the server to call with its `RouteSummary` response. In our `StreamObserver` we: +- Override the `onValue()` method to print out the returned information when the server writes a `RouteSummary` to the message stream. +- Override the `onCompleted()` method (called when the *server* has completed the call on its side) to set a `SettableFuture` that we can check to see if the server has finished writing. + +We then pass the `StreamObserver` to the asynchronous stub's `recordRoute()` method and get back our own `StreamObserver` request observer to write our `Point`s to send to the server. Once we've finished writing points, we use the request observer's `onCompleted()` method to tell gRPC that we've finished writing on the client side. Once we're done, we check our `SettableFuture` to check that the server has completed on its side. #### Bidirectional streaming RPC +Finally, let's look at our bidirectional streaming RPC `RouteChat()`. + +```java + public void routeChat() throws Exception { + info("*** RoutChat"); + final SettableFuture finishFuture = SettableFuture.create(); + StreamObserver requestObserver = + asyncStub.routeChat(new StreamObserver() { + @Override + public void onValue(RouteNote note) { + info("Got message \"{0}\" at {1}, {2}", note.getMessage(), note.getLocation() + .getLatitude(), note.getLocation().getLongitude()); + } + + @Override + public void onError(Throwable t) { + finishFuture.setException(t); + } + + @Override + public void onCompleted() { + finishFuture.set(null); + } + }); + + try { + RouteNote[] requests = + {newNote("First message", 0, 0), newNote("Second message", 0, 1), + newNote("Third message", 1, 0), newNote("Fourth message", 1, 1)}; + + for (RouteNote request : requests) { + info("Sending message \"{0}\" at {1}, {2}", request.getMessage(), request.getLocation() + .getLatitude(), request.getLocation().getLongitude()); + requestObserver.onValue(request); + } + requestObserver.onCompleted(); + + finishFuture.get(); + info("Finished RouteChat"); + } catch (Exception t) { + requestObserver.onError(t); + logger.log(Level.WARNING, "RouteChat Failed", t); + throw t; + } + } +``` + +As with our client-side streaming example, we both get and return a `StreamObserver` response observer, except this time we send values via our method's response observer while the server is still writing messages to *their* message stream. The syntax for reading and writing here is exactly the same as for our client-streaming method. Although each side will always get the other's messages in the order they were written, both the client and server can read and write in any order — the streams operate completely independently. + ## Try it out!