Merge github.com:grpc/grpc into credit

Conflicts:
	examples/pubsub/publisher_test.cc
	examples/pubsub/subscriber_test.cc
	include/grpc++/create_channel.h
	src/cpp/client/create_channel.cc
	test/cpp/end2end/async_end2end_test.cc
	test/cpp/end2end/end2end_test.cc
	test/cpp/util/create_test_channel.cc
Craig Tiller, 10 years ago (commit 25f6cd7a97)
21 changed files:
  1. include/grpc/support/atm.h (4)
  2. include/grpc/support/sync.h (2)
  3. include/grpc/support/sync_posix.h (1)
  4. include/grpc/support/sync_win32.h (1)
  5. include/grpc/support/time.h (2)
  6. src/compiler/cpp_generator.cc (169)
  7. src/core/support/log_win32.c (2)
  8. src/core/support/string_posix.c (2)
  9. src/core/support/sync.c (4)
  10. src/core/support/time.c (14)
  11. src/cpp/server/thread_pool.cc (6)
  12. src/node/examples/route_guide_server.js (2)
  13. src/python/src/grpc/_adapter/rear.py (2)
  14. src/python/src/grpc/framework/assembly/implementations.py (8)
  15. src/python/src/grpc/framework/assembly/interfaces.py (23)
  16. test/core/support/time_test.c (8)
  17. test/cpp/end2end/async_end2end_test.cc (2)
  18. test/cpp/end2end/end2end_test.cc (24)
  19. tools/dockerfile/grpc_go/Dockerfile (3)
  20. tools/gce_setup/grpc_docker.sh (30)
  21. tools/gce_setup/shared_startup_funcs.sh (3)

@@ -51,12 +51,12 @@
The routines may be implemented as macros.
// Atomic operations acton an intergral_type gpr_atm that is guaranteed to
// Atomic operations act on an intergral_type gpr_atm that is guaranteed to
// be the same size as a pointer.
typedef gpr_intptr gpr_atm;
// A memory barrier, providing both acquire and release semantics, but not
// otherwise acting no memory.
// otherwise acting on memory.
void gpr_atm_full_barrier(void);
// Atomically return *p, with acquire semantics.
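
As a side note on the gpr_atm API documented in this hunk: the acquire/release pair is what sync.c below relies on for gpr_event. A minimal illustrative sketch (the publish/consume helpers are hypothetical; only the gpr_atm_rel_store/gpr_atm_acq_load calls are taken from this diff):

```cpp
#include <grpc/support/atm.h>

static gpr_atm g_published;  // gpr_atm is pointer-sized, per the comment above

// Publish a pointer with release semantics so readers see fully built data.
void publish(void *p) { gpr_atm_rel_store(&g_published, (gpr_atm)p); }

// Read it back with acquire semantics (returns NULL until published).
void *consume(void) { return (void *)gpr_atm_acq_load(&g_published); }
```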

@@ -206,7 +206,7 @@ void *gpr_event_cancellable_wait(gpr_event *ev, gpr_timespec abs_deadline,
/* --- Reference counting ---
These calls act on the type gpr_refcount. It requires no desctruction. */
These calls act on the type gpr_refcount. It requires no destruction. */
/* Initialize *r to value n. */
void gpr_ref_init(gpr_refcount *r, int n);

@@ -36,7 +36,6 @@
#include <grpc/support/sync_generic.h>
/* Posix variant of gpr_sync_platform.h */
#include <pthread.h>
typedef pthread_mutex_t gpr_mu;

@@ -36,7 +36,6 @@
#include <grpc/support/sync_generic.h>
/* Win32 variant of gpr_sync_platform.h */
#include <windows.h>
typedef struct {

@@ -76,7 +76,7 @@ gpr_timespec gpr_time_min(gpr_timespec a, gpr_timespec b);
gpr_timespec gpr_time_add(gpr_timespec a, gpr_timespec b);
gpr_timespec gpr_time_sub(gpr_timespec a, gpr_timespec b);
/* Return a timespec representing a given number of microseconds. LONG_MIN is
/* Return a timespec representing a given number of time units. LONG_MIN is
interpreted as gpr_inf_past, and LONG_MAX as gpr_inf_future. */
gpr_timespec gpr_time_from_micros(long x);
gpr_timespec gpr_time_from_nanos(long x);

@@ -183,34 +183,40 @@ void PrintHeaderClientMethod(google::protobuf::io::Printer *printer,
printer->Print(*vars,
"::grpc::Status $Method$(::grpc::ClientContext* context, "
"const $Request$& request, $Response$* response);\n");
printer->Print(*vars,
"::grpc::ClientAsyncResponseReader< $Response$>* "
"$Method$(::grpc::ClientContext* context, "
"const $Request$& request, "
"::grpc::CompletionQueue* cq, void* tag);\n");
printer->Print(
*vars,
"std::unique_ptr< ::grpc::ClientAsyncResponseReader< $Response$>> "
"$Method$(::grpc::ClientContext* context, "
"const $Request$& request, "
"::grpc::CompletionQueue* cq, void* tag);\n");
} else if (ClientOnlyStreaming(method)) {
printer->Print(*vars,
"::grpc::ClientWriter< $Request$>* $Method$("
"::grpc::ClientContext* context, $Response$* response);\n");
printer->Print(*vars,
"::grpc::ClientAsyncWriter< $Request$>* $Method$("
"::grpc::ClientContext* context, $Response$* response, "
"::grpc::CompletionQueue* cq, void* tag);\n");
printer->Print(
*vars,
"std::unique_ptr< ::grpc::ClientWriter< $Request$>> $Method$("
"::grpc::ClientContext* context, $Response$* response);\n");
printer->Print(
*vars,
"std::unique_ptr< ::grpc::ClientAsyncWriter< $Request$>> $Method$("
"::grpc::ClientContext* context, $Response$* response, "
"::grpc::CompletionQueue* cq, void* tag);\n");
} else if (ServerOnlyStreaming(method)) {
printer->Print(
*vars,
"::grpc::ClientReader< $Response$>* $Method$("
"std::unique_ptr< ::grpc::ClientReader< $Response$>> $Method$("
"::grpc::ClientContext* context, const $Request$& request);\n");
printer->Print(*vars,
"::grpc::ClientAsyncReader< $Response$>* $Method$("
"::grpc::ClientContext* context, const $Request$& request, "
"::grpc::CompletionQueue* cq, void* tag);\n");
printer->Print(
*vars,
"std::unique_ptr< ::grpc::ClientAsyncReader< $Response$>> $Method$("
"::grpc::ClientContext* context, const $Request$& request, "
"::grpc::CompletionQueue* cq, void* tag);\n");
} else if (BidiStreaming(method)) {
printer->Print(
*vars,
"std::unique_ptr< ::grpc::ClientReaderWriter< $Request$, $Response$>> "
"$Method$(::grpc::ClientContext* context);\n");
printer->Print(*vars,
"::grpc::ClientReaderWriter< $Request$, $Response$>* "
"$Method$(::grpc::ClientContext* context);\n");
printer->Print(*vars,
"::grpc::ClientAsyncReaderWriter< $Request$, $Response$>* "
"std::unique_ptr< ::grpc::ClientAsyncReaderWriter< "
"$Request$, $Response$>> "
"$Method$(::grpc::ClientContext* context, "
"::grpc::CompletionQueue* cq, void* tag);\n");
}
@@ -309,7 +315,8 @@ void PrintHeaderService(google::protobuf::io::Printer *printer,
printer->Outdent();
printer->Print("};\n");
printer->Print(
"static Stub* NewStub(const std::shared_ptr< ::grpc::ChannelInterface>& "
"static std::unique_ptr<Stub> NewStub(const std::shared_ptr< "
"::grpc::ChannelInterface>& "
"channel);\n");
printer->Print("\n");
@@ -380,91 +387,101 @@ void PrintSourceClientMethod(google::protobuf::io::Printer *printer,
"::grpc::RpcMethod($Service$_method_names[$Idx$]), "
"context, request, response);\n"
"}\n\n");
printer->Print(
*vars,
"std::unique_ptr< ::grpc::ClientAsyncResponseReader< $Response$>> "
"$Service$::Stub::$Method$(::grpc::ClientContext* context, "
"const $Request$& request, "
"::grpc::CompletionQueue* cq, void* tag) {\n");
printer->Print(*vars,
"::grpc::ClientAsyncResponseReader< $Response$>* "
"$Service$::Stub::$Method$(::grpc::ClientContext* context, "
"const $Request$& request, "
"::grpc::CompletionQueue* cq, void* tag) {\n");
printer->Print(*vars,
" return new ::grpc::ClientAsyncResponseReader< $Response$>("
" return std::unique_ptr< "
"::grpc::ClientAsyncResponseReader< $Response$>>(new "
"::grpc::ClientAsyncResponseReader< $Response$>("
"channel(), cq, "
"::grpc::RpcMethod($Service$_method_names[$Idx$]), "
"context, request, tag);\n"
"context, request, tag));\n"
"}\n\n");
} else if (ClientOnlyStreaming(method)) {
printer->Print(
*vars,
"::grpc::ClientWriter< $Request$>* $Service$::Stub::$Method$("
"::grpc::ClientContext* context, $Response$* response) {\n");
printer->Print(*vars,
" return new ::grpc::ClientWriter< $Request$>("
"std::unique_ptr< ::grpc::ClientWriter< $Request$>> "
"$Service$::Stub::$Method$("
"::grpc::ClientContext* context, $Response$* response) {\n");
printer->Print(*vars,
" return std::unique_ptr< ::grpc::ClientWriter< "
"$Request$>>(new ::grpc::ClientWriter< $Request$>("
"channel(),"
"::grpc::RpcMethod($Service$_method_names[$Idx$], "
"::grpc::RpcMethod::RpcType::CLIENT_STREAMING), "
"context, response);\n"
"context, response));\n"
"}\n\n");
printer->Print(
*vars,
"::grpc::ClientAsyncWriter< $Request$>* $Service$::Stub::$Method$("
"::grpc::ClientContext* context, $Response$* response, "
"::grpc::CompletionQueue* cq, void* tag) {\n");
printer->Print(*vars,
" return new ::grpc::ClientAsyncWriter< $Request$>("
"std::unique_ptr< ::grpc::ClientAsyncWriter< $Request$>> "
"$Service$::Stub::$Method$("
"::grpc::ClientContext* context, $Response$* response, "
"::grpc::CompletionQueue* cq, void* tag) {\n");
printer->Print(*vars,
" return std::unique_ptr< ::grpc::ClientAsyncWriter< "
"$Request$>>(new ::grpc::ClientAsyncWriter< $Request$>("
"channel(), cq, "
"::grpc::RpcMethod($Service$_method_names[$Idx$], "
"::grpc::RpcMethod::RpcType::CLIENT_STREAMING), "
"context, response, tag);\n"
"context, response, tag));\n"
"}\n\n");
} else if (ServerOnlyStreaming(method)) {
printer->Print(
*vars,
"::grpc::ClientReader< $Response$>* $Service$::Stub::$Method$("
"std::unique_ptr< ::grpc::ClientReader< $Response$>> "
"$Service$::Stub::$Method$("
"::grpc::ClientContext* context, const $Request$& request) {\n");
printer->Print(*vars,
" return new ::grpc::ClientReader< $Response$>("
" return std::unique_ptr< ::grpc::ClientReader< "
"$Response$>>(new ::grpc::ClientReader< $Response$>("
"channel(),"
"::grpc::RpcMethod($Service$_method_names[$Idx$], "
"::grpc::RpcMethod::RpcType::SERVER_STREAMING), "
"context, request);\n"
"context, request));\n"
"}\n\n");
printer->Print(
*vars,
"::grpc::ClientAsyncReader< $Response$>* $Service$::Stub::$Method$("
"::grpc::ClientContext* context, const $Request$& request, "
"::grpc::CompletionQueue* cq, void* tag) {\n");
printer->Print(*vars,
" return new ::grpc::ClientAsyncReader< $Response$>("
"std::unique_ptr< ::grpc::ClientAsyncReader< $Response$>> "
"$Service$::Stub::$Method$("
"::grpc::ClientContext* context, const $Request$& request, "
"::grpc::CompletionQueue* cq, void* tag) {\n");
printer->Print(*vars,
" return std::unique_ptr< ::grpc::ClientAsyncReader< "
"$Response$>>(new ::grpc::ClientAsyncReader< $Response$>("
"channel(), cq, "
"::grpc::RpcMethod($Service$_method_names[$Idx$], "
"::grpc::RpcMethod::RpcType::SERVER_STREAMING), "
"context, request, tag);\n"
"context, request, tag));\n"
"}\n\n");
} else if (BidiStreaming(method)) {
printer->Print(
*vars,
"::grpc::ClientReaderWriter< $Request$, $Response$>* "
"std::unique_ptr< ::grpc::ClientReaderWriter< $Request$, $Response$>> "
"$Service$::Stub::$Method$(::grpc::ClientContext* context) {\n");
printer->Print(
*vars,
" return new ::grpc::ClientReaderWriter< $Request$, $Response$>("
"channel(),"
"::grpc::RpcMethod($Service$_method_names[$Idx$], "
"::grpc::RpcMethod::RpcType::BIDI_STREAMING), "
"context);\n"
"}\n\n");
printer->Print(
*vars,
"::grpc::ClientAsyncReaderWriter< $Request$, $Response$>* "
"$Service$::Stub::$Method$(::grpc::ClientContext* context, "
"::grpc::CompletionQueue* cq, void* tag) {\n");
printer->Print(
*vars,
" return new ::grpc::ClientAsyncReaderWriter< $Request$, $Response$>("
"channel(), cq, "
"::grpc::RpcMethod($Service$_method_names[$Idx$], "
"::grpc::RpcMethod::RpcType::BIDI_STREAMING), "
"context, tag);\n"
"}\n\n");
printer->Print(*vars,
" return std::unique_ptr< ::grpc::ClientReaderWriter< "
"$Request$, $Response$>>(new ::grpc::ClientReaderWriter< "
"$Request$, $Response$>("
"channel(),"
"::grpc::RpcMethod($Service$_method_names[$Idx$], "
"::grpc::RpcMethod::RpcType::BIDI_STREAMING), "
"context));\n"
"}\n\n");
printer->Print(*vars,
"std::unique_ptr< ::grpc::ClientAsyncReaderWriter< "
"$Request$, $Response$>> "
"$Service$::Stub::$Method$(::grpc::ClientContext* context, "
"::grpc::CompletionQueue* cq, void* tag) {\n");
printer->Print(*vars,
" return std::unique_ptr< ::grpc::ClientAsyncReaderWriter< "
"$Request$, $Response$>>(new "
"::grpc::ClientAsyncReaderWriter< $Request$, $Response$>("
"channel(), cq, "
"::grpc::RpcMethod($Service$_method_names[$Idx$], "
"::grpc::RpcMethod::RpcType::BIDI_STREAMING), "
"context, tag));\n"
"}\n\n");
}
}
@@ -587,9 +604,9 @@ void PrintSourceService(google::protobuf::io::Printer *printer,
printer->Print(
*vars,
"$Service$::Stub* $Service$::NewStub("
"std::unique_ptr< $Service$::Stub> $Service$::NewStub("
"const std::shared_ptr< ::grpc::ChannelInterface>& channel) {\n"
" $Service$::Stub* stub = new $Service$::Stub();\n"
" std::unique_ptr< $Service$::Stub> stub(new $Service$::Stub());\n"
" stub->set_channel(channel);\n"
" return stub;\n"
"};\n\n");

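Taken together, the cpp_generator.cc hunks above move the generated C++ client API from raw owning pointers to std::unique_ptr, for NewStub as well as for the sync and async reader/writer factory methods. A rough sketch of what calling code looks like against the regenerated stubs, using the echo TestService from the end2end tests below (the namespaces and the include path for the generated test proto are assumptions, not part of this diff):

```cpp
#include <memory>

#include <grpc++/channel_interface.h>
#include <grpc++/client_context.h>
#include <grpc++/status.h>
#include "test/cpp/util/echo.pb.h"  // assumed location of the generated echo proto

using grpc::cpp::test::util::EchoRequest;
using grpc::cpp::test::util::EchoResponse;
using grpc::cpp::test::util::TestService;

void CallResponseStream(const std::shared_ptr<grpc::ChannelInterface>& channel) {
  // NewStub now returns std::unique_ptr<Stub> instead of a raw Stub*.
  std::unique_ptr<TestService::Stub> stub = TestService::NewStub(channel);

  grpc::ClientContext context;
  EchoRequest request;
  EchoResponse response;
  request.set_message("hello");

  // Streaming entry points also return unique_ptrs, which is why the explicit
  // `delete stream;` calls disappear in the end2end_test.cc hunks below.
  auto stream = stub->ResponseStream(&context, request);
  while (stream->Read(&response)) {
    // consume each response
  }
  grpc::Status s = stream->Finish();
  (void)s;
}
```
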
@@ -90,7 +90,7 @@ void gpr_default_log(gpr_log_func_args *args) {
strcpy(time_buffer, "error:strftime");
}
fprintf(stderr, "%s%s.%09u %5u %s:%d: %s\n",
fprintf(stderr, "%s%s.%09u %5u %s:%d] %s\n",
gpr_log_severity_string(args->severity), time_buffer,
(int)(now.tv_nsec), GetCurrentThreadId(),
args->file, args->line, args->message);

@@ -51,7 +51,7 @@ int gpr_asprintf(char **strp, const char *format, ...) {
va_start(args, format);
ret = vsnprintf(buf, sizeof(buf), format, args);
va_end(args);
if (!(0 <= ret)) {
if (ret < 0) {
*strp = NULL;
return -1;
}

@@ -41,7 +41,7 @@
Should be a prime. */
enum { event_sync_partitions = 31 };
/* Event are partitioned by address to avoid lock contention. */
/* Events are partitioned by address to avoid lock contention. */
static struct sync_array_s {
gpr_mu mu;
gpr_cv cv;
@@ -71,10 +71,10 @@ void gpr_event_set(gpr_event *ev, void *value) {
struct sync_array_s *s = hash(ev);
gpr_mu_lock(&s->mu);
GPR_ASSERT(gpr_atm_acq_load(&ev->state) == 0);
GPR_ASSERT(value != NULL);
gpr_atm_rel_store(&ev->state, (gpr_atm)value);
gpr_cv_broadcast(&s->cv);
gpr_mu_unlock(&s->mu);
GPR_ASSERT(value != NULL);
}
void *gpr_event_get(gpr_event *ev) {

@@ -85,12 +85,12 @@ gpr_timespec gpr_time_from_nanos(long ns) {
} else if (ns == LONG_MIN) {
result = gpr_inf_past;
} else if (ns >= 0) {
result.tv_sec = ns / 1000000000;
result.tv_nsec = ns - result.tv_sec * 1000000000;
result.tv_sec = ns / GPR_NS_PER_SEC;
result.tv_nsec = ns - result.tv_sec * GPR_NS_PER_SEC;
} else {
/* Calculation carefully formulated to avoid any possible under/overflow. */
result.tv_sec = (-(999999999 - (ns + 1000000000)) / 1000000000) - 1;
result.tv_nsec = ns - result.tv_sec * 1000000000;
result.tv_sec = (-(999999999 - (ns + GPR_NS_PER_SEC)) / GPR_NS_PER_SEC) - 1;
result.tv_nsec = ns - result.tv_sec * GPR_NS_PER_SEC;
}
return result;
}
@@ -172,8 +172,8 @@ gpr_timespec gpr_time_add(gpr_timespec a, gpr_timespec b) {
gpr_timespec sum;
int inc = 0;
sum.tv_nsec = a.tv_nsec + b.tv_nsec;
if (sum.tv_nsec >= 1000000000) {
sum.tv_nsec -= 1000000000;
if (sum.tv_nsec >= GPR_NS_PER_SEC) {
sum.tv_nsec -= GPR_NS_PER_SEC;
inc++;
}
if (a.tv_sec == TYPE_MAX(time_t) || a.tv_sec == TYPE_MIN(time_t)) {
@@ -200,7 +200,7 @@ gpr_timespec gpr_time_sub(gpr_timespec a, gpr_timespec b) {
int dec = 0;
diff.tv_nsec = a.tv_nsec - b.tv_nsec;
if (diff.tv_nsec < 0) {
diff.tv_nsec += 1000000000;
diff.tv_nsec += GPR_NS_PER_SEC;
dec++;
}
if (a.tv_sec == TYPE_MAX(time_t) || a.tv_sec == TYPE_MIN(time_t)) {
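
The time.c hunks above only replace the literal 1000000000 with GPR_NS_PER_SEC; the arithmetic is unchanged, and the negative branch of gpr_time_from_nanos still normalizes results so that tv_nsec lands in [0, GPR_NS_PER_SEC). A standalone sketch of that normalization, assuming GPR_NS_PER_SEC is 1000000000 as the replaced literals imply:

```cpp
#include <cassert>

// Simplified stand-in for gpr_time_from_nanos' finite cases (no gpr_inf_* handling).
constexpr long kNsPerSec = 1000000000;  // assumed value of GPR_NS_PER_SEC

struct Ts { long sec; long nsec; };

Ts FromNanos(long ns) {
  Ts r;
  if (ns >= 0) {
    r.sec = ns / kNsPerSec;
    r.nsec = ns - r.sec * kNsPerSec;
  } else {
    // Same formulation as the diff: avoids under/overflow for negative inputs.
    r.sec = (-(999999999 - (ns + kNsPerSec)) / kNsPerSec) - 1;
    r.nsec = ns - r.sec * kNsPerSec;
  }
  return r;
}

int main() {
  Ts t = FromNanos(-1);
  assert(t.sec == -1 && t.nsec == 999999999);  // -1ns normalizes to {-1s, 999999999ns}
  t = FromNanos(kNsPerSec + 1);
  assert(t.sec == 1 && t.nsec == 1);
  return 0;
}
```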

@@ -37,11 +37,11 @@ namespace grpc {
ThreadPool::ThreadPool(int num_threads) {
for (int i = 0; i < num_threads; i++) {
threads_.push_back(std::thread([=]() {
threads_.push_back(std::thread([this]() {
for (;;) {
std::unique_lock<std::mutex> lock(mu_);
// Wait until work is available or we are shutting down.
auto have_work = [=]() { return shutdown_ || !callbacks_.empty(); };
auto have_work = [this]() { return shutdown_ || !callbacks_.empty(); };
std::unique_lock<std::mutex> lock(mu_);
if (!have_work()) {
cv_.wait(lock, have_work);
}
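
For context, the thread_pool.cc hunk above switches the worker lambdas from [=] to an explicit [this] capture and takes the lock before evaluating the work predicate. A minimal, self-contained sketch of that worker-loop shape (a simplified stand-in, not the actual ThreadPool class; its Add/shutdown side is not part of this hunk):

```cpp
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

class MiniPool {
 public:
  explicit MiniPool(int num_threads) {
    for (int i = 0; i < num_threads; i++) {
      // Capture `this` explicitly so the lambda touches pool members directly.
      threads_.emplace_back([this]() {
        for (;;) {
          // Lock first, then wait until work arrives or shutdown is requested.
          std::unique_lock<std::mutex> lock(mu_);
          auto have_work = [this]() { return shutdown_ || !callbacks_.empty(); };
          if (!have_work()) {
            cv_.wait(lock, have_work);
          }
          if (shutdown_ && callbacks_.empty()) return;
          std::function<void()> cb = callbacks_.front();
          callbacks_.pop();
          lock.unlock();
          cb();  // run the callback outside the lock
        }
      });
    }
  }

  ~MiniPool() {
    {
      std::lock_guard<std::mutex> lock(mu_);
      shutdown_ = true;
    }
    cv_.notify_all();
    for (auto& t : threads_) t.join();
  }

  void Add(std::function<void()> cb) {
    {
      std::lock_guard<std::mutex> lock(mu_);
      callbacks_.push(std::move(cb));
    }
    cv_.notify_one();
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  bool shutdown_ = false;
  std::queue<std::function<void()>> callbacks_;
  std::vector<std::thread> threads_;
};
```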

@@ -51,7 +51,7 @@ var COORD_FACTOR = 1e7;
var feature_list = [];
/**
* Get a feature object at the given point, or creates one if it does not exist.
* Get a feature object at the given point.
* @param {point} point The point to check
* @return {feature} The feature object at the point. Note that an empty name
* indicates no feature

@@ -382,6 +382,8 @@ class _ActivatedRearLink(ticket_interfaces.RearLink, activated.Activated):
def join_fore_link(self, fore_link):
with self._lock:
self._fore_link = null.NULL_FORE_LINK if fore_link is None else fore_link
if self._rear_link is not None:
self._rear_link.join_fore_link(self._fore_link)
def _start(self):
with self._lock:

@@ -195,7 +195,7 @@ def _servicer(implementations, pool):
event_stream_in_stream_out_methods=event_stream_in_stream_out_methods)
class _ServiceAssembly(activated.Activated):
class _ServiceAssembly(interfaces.Server):
def __init__(self, implementations, fore_link):
self._implementations = implementations
@@ -237,6 +237,10 @@ class _ServiceAssembly(activated.Activated):
def stop(self):
self._stop()
def port(self):
with self._lock:
return self._fore_link.port()
def assemble_face_stub(activated_rear_link):
"""Assembles a face_interfaces.Stub.
@@ -300,6 +304,6 @@ def assemble_service(implementations, activated_fore_link):
when passed to this method.
Returns:
An activated.Activated value encapsulating RPC service.
An interfaces.Server encapsulating RPC service.
"""
return _ServiceAssembly(implementations, activated_fore_link)

@@ -37,6 +37,7 @@ import abc
# module.
from grpc.framework.common import cardinality # pylint: disable=unused-import
from grpc.framework.common import style # pylint: disable=unused-import
from grpc.framework.foundation import activated
from grpc.framework.foundation import stream # pylint: disable=unused-import
@@ -89,3 +90,25 @@ class MethodImplementation(object):
style.Service.EVENT.
"""
__metaclass__ = abc.ABCMeta
class Server(activated.Activated):
"""The server interface.
Aside from being able to be activated and deactivated, objects of this type
are able to report the port on which they are servicing RPCs.
"""
__metaclass__ = abc.ABCMeta
# TODO(issue 726): This is an abstraction violation; not every Server is
# necessarily serving over a network at all.
@abc.abstractmethod
def port(self):
"""Identifies the port on which this Server is servicing RPCs.
This method may only be called while the server is active.
Returns:
The number of the port on which this Server is servicing RPCs.
"""
raise NotImplementedError()

@@ -81,7 +81,7 @@ static void ts_to_s(gpr_timespec t,
void *arg) {
if (t.tv_sec < 0 && t.tv_nsec != 0) {
t.tv_sec++;
t.tv_nsec = 1000000000 - t.tv_nsec;
t.tv_nsec = GPR_NS_PER_SEC - t.tv_nsec;
}
i_to_s(t.tv_sec, 10, 0, writer, arg);
(*writer)(arg, ".", 1);
@@ -127,15 +127,15 @@ static void test_values(void) {
/* Test possible overflow in conversion of -ve values. */
x = gpr_time_from_micros(-(LONG_MAX - 999997));
GPR_ASSERT(x.tv_sec < 0);
GPR_ASSERT(x.tv_nsec >= 0 && x.tv_nsec < 1000000000);
GPR_ASSERT(x.tv_nsec >= 0 && x.tv_nsec < GPR_NS_PER_SEC);
x = gpr_time_from_nanos(-(LONG_MAX - 999999997));
GPR_ASSERT(x.tv_sec < 0);
GPR_ASSERT(x.tv_nsec >= 0 && x.tv_nsec < 1000000000);
GPR_ASSERT(x.tv_nsec >= 0 && x.tv_nsec < GPR_NS_PER_SEC);
x = gpr_time_from_millis(-(LONG_MAX - 997));
GPR_ASSERT(x.tv_sec < 0);
GPR_ASSERT(x.tv_nsec >= 0 && x.tv_nsec < 1000000000);
GPR_ASSERT(x.tv_nsec >= 0 && x.tv_nsec < GPR_NS_PER_SEC);
/* Test general -ve values. */
for (i = -1; i > -1000 * 1000 * 1000; i *= 7) {

@@ -105,7 +105,7 @@ class AsyncEnd2endTest : public ::testing::Test {
void ResetStub() {
std::shared_ptr<ChannelInterface> channel = CreateChannel(
server_address_.str(), InsecureCredentials(), ChannelArguments());
stub_.reset(grpc::cpp::test::util::TestService::NewStub(channel));
stub_ = std::move(grpc::cpp::test::util::TestService::NewStub(channel));
}
void server_ok(int i) { verify_ok(&srv_cq_, i, true); }

@@ -163,7 +163,7 @@ class End2endTest : public ::testing::Test {
void ResetStub() {
std::shared_ptr<ChannelInterface> channel = CreateChannel(
server_address_.str(), InsecureCredentials(), ChannelArguments());
stub_.reset(grpc::cpp::test::util::TestService::NewStub(channel));
stub_ = std::move(grpc::cpp::test::util::TestService::NewStub(channel));
}
std::unique_ptr<grpc::cpp::test::util::TestService::Stub> stub_;
@@ -293,15 +293,13 @@ TEST_F(End2endTest, RequestStreamOneRequest) {
EchoResponse response;
ClientContext context;
ClientWriter<EchoRequest>* stream = stub_->RequestStream(&context, &response);
auto stream = stub_->RequestStream(&context, &response);
request.set_message("hello");
EXPECT_TRUE(stream->Write(request));
stream->WritesDone();
Status s = stream->Finish();
EXPECT_EQ(response.message(), request.message());
EXPECT_TRUE(s.IsOk());
delete stream;
}
TEST_F(End2endTest, RequestStreamTwoRequests) {
@@ -310,7 +308,7 @@ TEST_F(End2endTest, RequestStreamTwoRequests) {
EchoResponse response;
ClientContext context;
ClientWriter<EchoRequest>* stream = stub_->RequestStream(&context, &response);
auto stream = stub_->RequestStream(&context, &response);
request.set_message("hello");
EXPECT_TRUE(stream->Write(request));
EXPECT_TRUE(stream->Write(request));
@@ -318,8 +316,6 @@
Status s = stream->Finish();
EXPECT_EQ(response.message(), "hellohello");
EXPECT_TRUE(s.IsOk());
delete stream;
}
TEST_F(End2endTest, ResponseStream) {
@@ -329,7 +325,7 @@
ClientContext context;
request.set_message("hello");
ClientReader<EchoResponse>* stream = stub_->ResponseStream(&context, request);
auto stream = stub_->ResponseStream(&context, request);
EXPECT_TRUE(stream->Read(&response));
EXPECT_EQ(response.message(), request.message() + "0");
EXPECT_TRUE(stream->Read(&response));
@@ -340,8 +336,6 @@
Status s = stream->Finish();
EXPECT_TRUE(s.IsOk());
delete stream;
}
TEST_F(End2endTest, BidiStream) {
@@ -351,8 +345,7 @@
ClientContext context;
grpc::string msg("hello");
ClientReaderWriter<EchoRequest, EchoResponse>* stream =
stub_->BidiStream(&context);
auto stream = stub_->BidiStream(&context);
request.set_message(msg + "0");
EXPECT_TRUE(stream->Write(request));
@@ -374,8 +367,6 @@
Status s = stream->Finish();
EXPECT_TRUE(s.IsOk());
delete stream;
}
// Talk to the two services with the same name but different package names.
@@ -425,14 +416,11 @@ TEST_F(End2endTest, BadCredentials) {
EXPECT_EQ("Rpc sent on a lame channel.", s.details());
ClientContext context2;
ClientReaderWriter<EchoRequest, EchoResponse>* stream =
stub->BidiStream(&context2);
auto stream = stub->BidiStream(&context2);
s = stream->Finish();
EXPECT_FALSE(s.IsOk());
EXPECT_EQ(StatusCode::UNKNOWN, s.code());
EXPECT_EQ("Rpc sent on a lame channel.", s.details());
delete stream;
}
} // namespace testing

@@ -48,6 +48,9 @@ RUN git config --global url."git@github.com:".insteadOf "https://github.com/"
# Get the source from GitHub
RUN go get google.golang.org/grpc
# Add a service_account directory containing the auth creds file
ADD service_account service_account
# Build the interop client and server
RUN cd src/google.golang.org/grpc/interop/client && go install
RUN cd src/google.golang.org/grpc/interop/server && go install

@@ -935,6 +935,36 @@ grpc_cloud_prod_gen_ruby_cmd() {
echo $the_cmd
}
# constructs the full dockerized Go interop test cmd.
#
# call-seq:
# flags= .... # generic flags to include the command
# cmd=$($grpc_gen_test_cmd $flags)
grpc_cloud_prod_auth_service_account_creds_gen_go_cmd() {
local cmd_prefix="sudo docker run grpc/go /bin/bash -c"
local test_script="cd src/google.golang.org/grpc/interop/client"
local test_script+=" && go run client.go --use_tls=true"
local gfe_flags=" --tls_ca_file=\"\" --tls_server_name=\"\" --server_port=443 --server_host=grpc-test.sandbox.google.com"
local added_gfe_flags=$(_grpc_svc_acc_test_flags)
local the_cmd="$cmd_prefix '$test_script $gfe_flags $added_gfe_flags $@'"
echo $the_cmd
}
# constructs the full dockerized Go interop test cmd.
#
# call-seq:
# flags= .... # generic flags to include the command
# cmd=$($grpc_gen_test_cmd $flags)
grpc_cloud_prod_auth_compute_engine_creds_gen_go_cmd() {
local cmd_prefix="sudo docker run grpc/go /bin/bash -c"
local test_script="cd src/google.golang.org/grpc/interop/client"
local test_script+=" && go run client.go --use_tls=true"
local gfe_flags=" --tls_ca_file=\"\" --tls_server_name=\"\" --server_port=443 --server_host=grpc-test.sandbox.google.com"
local added_gfe_flags=$(_grpc_gce_test_flags)
local the_cmd="$cmd_prefix '$test_script $gfe_flags $added_gfe_flags $@'"
echo $the_cmd
}
# constructs the full dockerized ruby service_account auth interop test cmd.
#
# call-seq:

@@ -364,7 +364,7 @@ grpc_docker_launch_registry() {
grpc_docker_pull_known() {
local addr=$1
[[ -n $addr ]] || addr="0.0.0.0:5000"
local known="base cxx php_base php ruby_base ruby java_base java go node_base node"
local known="base cxx php_base php ruby_base ruby java_base java go node_base node python_base python"
echo "... pulling docker images for '$known'"
for i in $known
do
@@ -408,6 +408,7 @@ grpc_dockerfile_install() {
}
[[ $image_label == "grpc/go" ]] && {
grpc_docker_sync_github_key $dockerfile_dir/.ssh 'go_ssh_key' || return 1;
grpc_docker_sync_service_account $dockerfile_dir/service_account || return 1;
}
[[ $image_label == "grpc/java_base" ]] && {
grpc_docker_sync_github_key $dockerfile_dir/.ssh 'java_base_ssh_key' || return 1;
