Merge branch 'master' into sreek-epoll1

pull/11816/head
Sree Kuchibhotla 7 years ago
commit 471aa62716
  1. 1
      BUILD
  2. 6
      bazel/grpc_build_system.bzl
  3. 12
      src/compiler/php_generator.cc
  4. 3
      src/compiler/php_generator.h
  5. 17
      src/compiler/php_generator_helpers.h
  6. 5
      src/compiler/php_plugin.cc
  7. 5
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
  8. 7
      src/core/ext/transport/inproc/inproc_transport.c
  9. 5
      src/core/lib/iomgr/sockaddr_utils.c
  10. 2
      src/core/lib/iomgr/sockaddr_utils.h
  11. 14
      src/core/lib/iomgr/tcp_server_uv.c
  12. 6
      src/core/lib/support/env.h
  13. 32
      src/core/lib/support/env_linux.c
  14. 5
      src/core/lib/support/env_posix.c
  15. 5
      src/core/lib/support/env_windows.c
  16. 8
      src/core/lib/support/log.c
  17. 18
      src/cpp/server/server_builder.cc
  18. 148
      src/ruby/lib/grpc/generic/active_call.rb
  19. 62
      src/ruby/lib/grpc/generic/bidi_call.rb
  20. 4
      src/ruby/lib/grpc/generic/rpc_desc.rb
  21. 1
      src/ruby/lib/grpc/generic/rpc_server.rb
  22. 137
      src/ruby/spec/client_auth_spec.rb
  23. 4
      src/ruby/spec/generic/active_call_spec.rb
  24. 347
      src/ruby/spec/generic/client_stub_spec.rb
  25. 10
      src/ruby/spec/generic/rpc_desc_spec.rb
  26. 145
      src/ruby/spec/generic/rpc_server_spec.rb
  27. 16
      src/ruby/spec/testdata/client.key
  28. 14
      src/ruby/spec/testdata/client.pem
  29. 9
      tools/internal_ci/linux/grpc_build_artifacts.sh
  30. 5
      tools/profiling/microbenchmarks/bm_diff/bm_diff.py
  31. 14
      tools/profiling/microbenchmarks/bm_diff/bm_main.py
  32. 21
      tools/profiling/microbenchmarks/bm_diff/bm_run.py
  33. 2
      tools/run_tests/sanity/core_banned_functions.py

@ -1543,6 +1543,7 @@ grpc_cc_library(
":grpc++", ":grpc++",
"//src/proto/grpc/reflection/v1alpha:reflection_proto", "//src/proto/grpc/reflection/v1alpha:reflection_proto",
], ],
alwayslink = 1,
) )
grpc_cc_library( grpc_cc_library(

@ -25,7 +25,8 @@
def grpc_cc_library(name, srcs = [], public_hdrs = [], hdrs = [], def grpc_cc_library(name, srcs = [], public_hdrs = [], hdrs = [],
external_deps = [], deps = [], standalone = False, external_deps = [], deps = [], standalone = False,
language = "C++", testonly = False, visibility = None): language = "C++", testonly = False, visibility = None,
alwayslink = 0):
copts = [] copts = []
if language.upper() == "C": if language.upper() == "C":
copts = ["-std=c99"] copts = ["-std=c99"]
@ -40,7 +41,8 @@ def grpc_cc_library(name, srcs = [], public_hdrs = [], hdrs = [],
linkopts = ["-pthread"], linkopts = ["-pthread"],
includes = [ includes = [
"include" "include"
] ],
alwayslink = alwayslink,
) )
def grpc_proto_plugin(name, srcs = [], deps = []): def grpc_proto_plugin(name, srcs = [], deps = []):

@ -97,13 +97,14 @@ void PrintMethod(const MethodDescriptor *method, Printer *out) {
} }
// Prints out the service descriptor object // Prints out the service descriptor object
void PrintService(const ServiceDescriptor *service, Printer *out) { void PrintService(const ServiceDescriptor *service,
const grpc::string &parameter, Printer *out) {
map<grpc::string, grpc::string> vars; map<grpc::string, grpc::string> vars;
out->Print("/**\n"); out->Print("/**\n");
out->Print(GetPHPComments(service, " *").c_str()); out->Print(GetPHPComments(service, " *").c_str());
out->Print(" */\n"); out->Print(" */\n");
vars["name"] = service->name(); vars["name"] = GetPHPServiceClassname(service, parameter);
out->Print(vars, "class $name$Client extends \\Grpc\\BaseStub {\n\n"); out->Print(vars, "class $name$ extends \\Grpc\\BaseStub {\n\n");
out->Indent(); out->Indent();
out->Indent(); out->Indent();
out->Print( out->Print(
@ -131,7 +132,8 @@ void PrintService(const ServiceDescriptor *service, Printer *out) {
} }
grpc::string GenerateFile(const FileDescriptor *file, grpc::string GenerateFile(const FileDescriptor *file,
const ServiceDescriptor *service) { const ServiceDescriptor *service,
const grpc::string &parameter) {
grpc::string output; grpc::string output;
{ {
StringOutputStream output_stream(&output); StringOutputStream output_stream(&output);
@ -150,7 +152,7 @@ grpc::string GenerateFile(const FileDescriptor *file,
vars["package"] = MessageIdentifierName(file->package()); vars["package"] = MessageIdentifierName(file->package());
out.Print(vars, "namespace $package$;\n\n"); out.Print(vars, "namespace $package$;\n\n");
PrintService(service, &out); PrintService(service, parameter, &out);
} }
return output; return output;
} }

@ -24,7 +24,8 @@
namespace grpc_php_generator { namespace grpc_php_generator {
grpc::string GenerateFile(const grpc::protobuf::FileDescriptor *file, grpc::string GenerateFile(const grpc::protobuf::FileDescriptor *file,
const grpc::protobuf::ServiceDescriptor *service); const grpc::protobuf::ServiceDescriptor *service,
const grpc::string &parameter);
} // namespace grpc_php_generator } // namespace grpc_php_generator

@ -26,9 +26,22 @@
namespace grpc_php_generator { namespace grpc_php_generator {
inline grpc::string GetPHPServiceClassname(
const grpc::protobuf::ServiceDescriptor *service,
const grpc::string &parameter) {
grpc::string suffix;
if (parameter == "") {
suffix = "Client";
} else {
suffix = parameter;
}
return service->name() + suffix;
}
inline grpc::string GetPHPServiceFilename( inline grpc::string GetPHPServiceFilename(
const grpc::protobuf::FileDescriptor *file, const grpc::protobuf::FileDescriptor *file,
const grpc::protobuf::ServiceDescriptor *service) { const grpc::protobuf::ServiceDescriptor *service,
const grpc::string &parameter) {
std::vector<grpc::string> tokens = std::vector<grpc::string> tokens =
grpc_generator::tokenize(file->package(), "."); grpc_generator::tokenize(file->package(), ".");
std::ostringstream oss; std::ostringstream oss;
@ -36,7 +49,7 @@ inline grpc::string GetPHPServiceFilename(
oss << (i == 0 ? "" : "/") oss << (i == 0 ? "" : "/")
<< grpc_generator::CapitalizeFirstLetter(tokens[i]); << grpc_generator::CapitalizeFirstLetter(tokens[i]);
} }
return oss.str() + "/" + service->name() + "Client.php"; return oss.str() + "/" + GetPHPServiceClassname(service, parameter) + ".php";
} }
// ReplaceAll replaces all instances of search with replace in s. // ReplaceAll replaces all instances of search with replace in s.

@ -41,10 +41,11 @@ class PHPGrpcGenerator : public grpc::protobuf::compiler::CodeGenerator {
} }
for (int i = 0; i < file->service_count(); i++) { for (int i = 0; i < file->service_count(); i++) {
grpc::string code = GenerateFile(file, file->service(i)); grpc::string code = GenerateFile(file, file->service(i), parameter);
// Get output file name // Get output file name
grpc::string file_name = GetPHPServiceFilename(file, file->service(i)); grpc::string file_name =
GetPHPServiceFilename(file, file->service(i), parameter);
std::unique_ptr<grpc::protobuf::io::ZeroCopyOutputStream> output( std::unique_ptr<grpc::protobuf::io::ZeroCopyOutputStream> output(
context->Open(file_name)); context->Open(file_name));

@ -1705,7 +1705,6 @@ static void lb_on_server_status_received_locked(grpc_exec_ctx *exec_ctx,
static void glb_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, static void glb_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
const grpc_lb_policy_args *args) { const grpc_lb_policy_args *args) {
glb_lb_policy *glb_policy = (glb_lb_policy *)policy; glb_lb_policy *glb_policy = (glb_lb_policy *)policy;
if (glb_policy->updating_lb_channel) { if (glb_policy->updating_lb_channel) {
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
@ -1813,9 +1812,11 @@ static void glb_lb_channel_on_connectivity_changed_cb(grpc_exec_ctx *exec_ctx,
// lb_on_server_status_received will pick up the cancel and reinit // lb_on_server_status_received will pick up the cancel and reinit
// lb_call. // lb_call.
if (glb_policy->pending_update_args != NULL) { if (glb_policy->pending_update_args != NULL) {
const grpc_lb_policy_args *args = glb_policy->pending_update_args; grpc_lb_policy_args *args = glb_policy->pending_update_args;
glb_policy->pending_update_args = NULL; glb_policy->pending_update_args = NULL;
glb_update_locked(exec_ctx, &glb_policy->base, args); glb_update_locked(exec_ctx, &glb_policy->base, args);
grpc_channel_args_destroy(exec_ctx, args->args);
gpr_free(args);
} }
} else if (glb_policy->started_picking && !glb_policy->shutting_down) { } else if (glb_policy->started_picking && !glb_policy->shutting_down) {
if (glb_policy->retry_timer_active) { if (glb_policy->retry_timer_active) {

@ -190,8 +190,11 @@ typedef struct inproc_stream {
static bool inproc_slice_byte_stream_next(grpc_exec_ctx *exec_ctx, static bool inproc_slice_byte_stream_next(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *bs, size_t max, grpc_byte_stream *bs, size_t max,
grpc_closure *on_complete) { grpc_closure *on_complete) {
inproc_slice_byte_stream *stream = (inproc_slice_byte_stream *)bs; // Because inproc transport always provides the entire message atomically,
return (stream->le->sb.count != 0); // the byte stream always has data available when this function is called.
// Thus, this function always returns true (unlike other transports) and
// there is never any need to schedule a closure
return true;
} }
static grpc_error *inproc_slice_byte_stream_pull(grpc_exec_ctx *exec_ctx, static grpc_error *inproc_slice_byte_stream_pull(grpc_exec_ctx *exec_ctx,

@ -220,6 +220,11 @@ const char *grpc_sockaddr_get_uri_scheme(
return NULL; return NULL;
} }
int grpc_sockaddr_get_family(const grpc_resolved_address *resolved_addr) {
const struct sockaddr *addr = (const struct sockaddr *)resolved_addr->addr;
return addr->sa_family;
}
int grpc_sockaddr_get_port(const grpc_resolved_address *resolved_addr) { int grpc_sockaddr_get_port(const grpc_resolved_address *resolved_addr) {
const struct sockaddr *addr = (const struct sockaddr *)resolved_addr->addr; const struct sockaddr *addr = (const struct sockaddr *)resolved_addr->addr;
switch (addr->sa_family) { switch (addr->sa_family) {

@ -75,4 +75,6 @@ char *grpc_sockaddr_to_uri(const grpc_resolved_address *addr);
/* Returns the URI scheme corresponding to \a addr */ /* Returns the URI scheme corresponding to \a addr */
const char *grpc_sockaddr_get_uri_scheme(const grpc_resolved_address *addr); const char *grpc_sockaddr_get_uri_scheme(const grpc_resolved_address *addr);
int grpc_sockaddr_get_family(const grpc_resolved_address *resolved_addr);
#endif /* GRPC_CORE_LIB_IOMGR_SOCKADDR_UTILS_H */ #endif /* GRPC_CORE_LIB_IOMGR_SOCKADDR_UTILS_H */

@ -316,6 +316,7 @@ grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s,
unsigned port_index = 0; unsigned port_index = 0;
int status; int status;
grpc_error *error = GRPC_ERROR_NONE; grpc_error *error = GRPC_ERROR_NONE;
int family;
if (s->tail != NULL) { if (s->tail != NULL) {
port_index = s->tail->port_index + 1; port_index = s->tail->port_index + 1;
@ -353,7 +354,18 @@ grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s,
} }
handle = gpr_malloc(sizeof(uv_tcp_t)); handle = gpr_malloc(sizeof(uv_tcp_t));
status = uv_tcp_init(uv_default_loop(), handle);
family = grpc_sockaddr_get_family(addr);
status = uv_tcp_init_ex(uv_default_loop(), handle, (unsigned int)family);
#if defined(GPR_LINUX) && defined(SO_REUSEPORT)
if (family == AF_INET || family == AF_INET6) {
int fd;
uv_fileno((uv_handle_t *)handle, &fd);
int enable = 1;
setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &enable, sizeof(enable));
}
#endif /* GPR_LINUX && SO_REUSEPORT */
if (status == 0) { if (status == 0) {
error = add_socket_to_server(s, handle, addr, port_index, &sp); error = add_socket_to_server(s, handle, addr, port_index, &sp);
} else { } else {

@ -36,6 +36,12 @@ char *gpr_getenv(const char *name);
/* Sets the the environment with the specified name to the specified value. */ /* Sets the the environment with the specified name to the specified value. */
void gpr_setenv(const char *name, const char *value); void gpr_setenv(const char *name, const char *value);
/* This is a version of gpr_getenv that does not produce any output if it has to
use an insecure version of the function. It is ONLY to be used to solve the
problem in which we need to check an env variable to configure the verbosity
level of logging. So DO NOT USE THIS. */
const char *gpr_getenv_silent(const char *name, char **dst);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

@ -38,7 +38,9 @@
#include "src/core/lib/support/string.h" #include "src/core/lib/support/string.h"
char *gpr_getenv(const char *name) { const char *gpr_getenv_silent(const char *name, char **dst) {
const char *insecure_func_used = NULL;
char *result = NULL;
#if defined(GPR_BACKWARDS_COMPATIBILITY_MODE) #if defined(GPR_BACKWARDS_COMPATIBILITY_MODE)
typedef char *(*getenv_type)(const char *); typedef char *(*getenv_type)(const char *);
static getenv_type getenv_func = NULL; static getenv_type getenv_func = NULL;
@ -48,22 +50,28 @@ char *gpr_getenv(const char *name) {
for (size_t i = 0; getenv_func == NULL && i < GPR_ARRAY_SIZE(names); i++) { for (size_t i = 0; getenv_func == NULL && i < GPR_ARRAY_SIZE(names); i++) {
getenv_func = (getenv_type)dlsym(RTLD_DEFAULT, names[i]); getenv_func = (getenv_type)dlsym(RTLD_DEFAULT, names[i]);
if (getenv_func != NULL && strstr(names[i], "secure") == NULL) { if (getenv_func != NULL && strstr(names[i], "secure") == NULL) {
gpr_log(GPR_DEBUG, insecure_func_used = names[i];
"Warning: insecure environment read function '%s' used",
names[i]);
} }
} }
char *result = getenv_func(name); result = getenv_func(name);
return result == NULL ? result : gpr_strdup(result);
#elif __GLIBC__ > 2 || (__GLIBC__ == 2 && __GLIBC_MINOR__ >= 17) #elif __GLIBC__ > 2 || (__GLIBC__ == 2 && __GLIBC_MINOR__ >= 17)
char *result = secure_getenv(name); result = secure_getenv(name);
return result == NULL ? result : gpr_strdup(result);
#else #else
gpr_log(GPR_DEBUG, "Warning: insecure environment read function '%s' used", result = getenv(name);
"getenv"); insecure_func_used = "getenv";
char *result = getenv(name);
return result == NULL ? result : gpr_strdup(result);
#endif #endif
*dst = result == NULL ? result : gpr_strdup(result);
return insecure_func_used;
}
char *gpr_getenv(const char *name) {
char *result = NULL;
const char *insecure_func_used = gpr_getenv_silent(name, &result);
if (insecure_func_used != NULL) {
gpr_log(GPR_DEBUG, "Warning: insecure environment read function '%s' used",
insecure_func_used);
}
return result;
} }
void gpr_setenv(const char *name, const char *value) { void gpr_setenv(const char *name, const char *value) {

@ -29,6 +29,11 @@
#include <grpc/support/string_util.h> #include <grpc/support/string_util.h>
#include "src/core/lib/support/string.h" #include "src/core/lib/support/string.h"
const char *gpr_getenv_silent(const char *name, char **dst) {
*dst = gpr_getenv(name);
return NULL;
}
char *gpr_getenv(const char *name) { char *gpr_getenv(const char *name) {
char *result = getenv(name); char *result = getenv(name);
return result == NULL ? result : gpr_strdup(result); return result == NULL ? result : gpr_strdup(result);

@ -30,6 +30,11 @@
#include <grpc/support/log.h> #include <grpc/support/log.h>
#include <grpc/support/string_util.h> #include <grpc/support/string_util.h>
const char *gpr_getenv_silent(const char *name, char **dst) {
*dst = gpr_getenv(name);
return NULL;
}
char *gpr_getenv(const char *name) { char *gpr_getenv(const char *name) {
char *result = NULL; char *result = NULL;
DWORD size; DWORD size;

@ -64,7 +64,8 @@ void gpr_set_log_verbosity(gpr_log_severity min_severity_to_print) {
} }
void gpr_log_verbosity_init() { void gpr_log_verbosity_init() {
char *verbosity = gpr_getenv("GRPC_VERBOSITY"); char *verbosity = NULL;
const char *insecure_getenv = gpr_getenv_silent("GRPC_VERBOSITY", &verbosity);
gpr_atm min_severity_to_print = GPR_LOG_SEVERITY_ERROR; gpr_atm min_severity_to_print = GPR_LOG_SEVERITY_ERROR;
if (verbosity != NULL) { if (verbosity != NULL) {
@ -81,6 +82,11 @@ void gpr_log_verbosity_init() {
GPR_LOG_VERBOSITY_UNSET) { GPR_LOG_VERBOSITY_UNSET) {
gpr_atm_no_barrier_store(&g_min_severity_to_print, min_severity_to_print); gpr_atm_no_barrier_store(&g_min_severity_to_print, min_severity_to_print);
} }
if (insecure_getenv != NULL) {
gpr_log(GPR_DEBUG, "Warning: insecure environment read function '%s' used",
insecure_getenv);
}
} }
void gpr_set_log_function(gpr_log_func f) { void gpr_set_log_function(gpr_log_func f) {

@ -250,14 +250,6 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
has_sync_methods && num_frequently_polled_cqs > 0; has_sync_methods && num_frequently_polled_cqs > 0;
if (has_sync_methods) { if (has_sync_methods) {
// This is a Sync server
gpr_log(GPR_INFO,
"Synchronous server. Num CQs: %d, Min pollers: %d, Max Pollers: "
"%d, CQ timeout (msec): %d",
sync_server_settings_.num_cqs, sync_server_settings_.min_pollers,
sync_server_settings_.max_pollers,
sync_server_settings_.cq_timeout_msec);
grpc_cq_polling_type polling_type = grpc_cq_polling_type polling_type =
is_hybrid_server ? GRPC_CQ_NON_POLLING : GRPC_CQ_DEFAULT_POLLING; is_hybrid_server ? GRPC_CQ_NON_POLLING : GRPC_CQ_DEFAULT_POLLING;
@ -272,6 +264,16 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
sync_server_settings_.min_pollers, sync_server_settings_.max_pollers, sync_server_settings_.min_pollers, sync_server_settings_.max_pollers,
sync_server_settings_.cq_timeout_msec)); sync_server_settings_.cq_timeout_msec));
if (has_sync_methods) {
// This is a Sync server
gpr_log(GPR_INFO,
"Synchronous server. Num CQs: %d, Min pollers: %d, Max Pollers: "
"%d, CQ timeout (msec): %d",
sync_server_settings_.num_cqs, sync_server_settings_.min_pollers,
sync_server_settings_.max_pollers,
sync_server_settings_.cq_timeout_msec);
}
ServerInitializer* initializer = server->initializer(); ServerInitializer* initializer = server->initializer();
// Register all the completion queues with the server. i.e // Register all the completion queues with the server. i.e

@ -40,13 +40,13 @@ end
module GRPC module GRPC
# The ActiveCall class provides simple methods for sending marshallable # The ActiveCall class provides simple methods for sending marshallable
# data to a call # data to a call
class ActiveCall class ActiveCall # rubocop:disable Metrics/ClassLength
include Core::TimeConsts include Core::TimeConsts
include Core::CallOps include Core::CallOps
extend Forwardable extend Forwardable
attr_reader :deadline, :metadata_sent, :metadata_to_send attr_reader :deadline, :metadata_sent, :metadata_to_send, :peer, :peer_cert
def_delegators :@call, :cancel, :metadata, :write_flag, :write_flag=, def_delegators :@call, :cancel, :metadata, :write_flag, :write_flag=,
:peer, :peer_cert, :trailing_metadata :trailing_metadata, :status
# client_invoke begins a client invocation. # client_invoke begins a client invocation.
# #
@ -100,6 +100,18 @@ module GRPC
fail(ArgumentError, 'Already sent md') if started && metadata_to_send fail(ArgumentError, 'Already sent md') if started && metadata_to_send
@metadata_to_send = metadata_to_send || {} unless started @metadata_to_send = metadata_to_send || {} unless started
@send_initial_md_mutex = Mutex.new @send_initial_md_mutex = Mutex.new
@output_stream_done = false
@input_stream_done = false
@call_finished = false
@call_finished_mu = Mutex.new
@client_call_executed = false
@client_call_executed_mu = Mutex.new
# set the peer now so that the accessor can still function
# after the server closes the call
@peer = call.peer
end end
# Sends the initial metadata that has yet to be sent. # Sends the initial metadata that has yet to be sent.
@ -142,11 +154,9 @@ module GRPC
Operation.new(self) Operation.new(self)
end end
# finished waits until a client call is completed. def receive_and_check_status
#
# It blocks until the remote endpoint acknowledges by sending a status.
def finished
batch_result = @call.run_batch(RECV_STATUS_ON_CLIENT => nil) batch_result = @call.run_batch(RECV_STATUS_ON_CLIENT => nil)
set_input_stream_done
attach_status_results_and_complete_call(batch_result) attach_status_results_and_complete_call(batch_result)
end end
@ -155,8 +165,6 @@ module GRPC
@call.trailing_metadata = recv_status_batch_result.status.metadata @call.trailing_metadata = recv_status_batch_result.status.metadata
end end
@call.status = recv_status_batch_result.status @call.status = recv_status_batch_result.status
@call.close
op_is_done
# The RECV_STATUS in run_batch always succeeds # The RECV_STATUS in run_batch always succeeds
# Check the status for a bad status or failed run batch # Check the status for a bad status or failed run batch
@ -193,9 +201,19 @@ module GRPC
} }
ops[RECV_CLOSE_ON_SERVER] = nil if assert_finished ops[RECV_CLOSE_ON_SERVER] = nil if assert_finished
@call.run_batch(ops) @call.run_batch(ops)
set_output_stream_done
nil nil
end end
# Intended for use on server-side calls when a single request from
# the client is expected (i.e., unary and server-streaming RPC types).
def read_unary_request
req = remote_read
set_input_stream_done
req
end
def server_unary_response(req, trailing_metadata: {}, def server_unary_response(req, trailing_metadata: {},
code: Core::StatusCodes::OK, details: 'OK') code: Core::StatusCodes::OK, details: 'OK')
ops = {} ops = {}
@ -211,6 +229,7 @@ module GRPC
ops[RECV_CLOSE_ON_SERVER] = nil ops[RECV_CLOSE_ON_SERVER] = nil
@call.run_batch(ops) @call.run_batch(ops)
set_output_stream_done
end end
# remote_read reads a response from the remote endpoint. # remote_read reads a response from the remote endpoint.
@ -241,6 +260,8 @@ module GRPC
# each_remote_read passes each response to the given block or returns an # each_remote_read passes each response to the given block or returns an
# enumerator the responses if no block is given. # enumerator the responses if no block is given.
# Used to generate the request enumerable for
# server-side client-streaming RPC's.
# #
# == Enumerator == # == Enumerator ==
# #
@ -258,11 +279,15 @@ module GRPC
# @return [Enumerator] if no block was given # @return [Enumerator] if no block was given
def each_remote_read def each_remote_read
return enum_for(:each_remote_read) unless block_given? return enum_for(:each_remote_read) unless block_given?
begin
loop do loop do
resp = remote_read resp = remote_read
break if resp.nil? # the last response was received break if resp.nil? # the last response was received
yield resp yield resp
end end
ensure
set_input_stream_done
end
end end
# each_remote_read_then_finish passes each response to the given block or # each_remote_read_then_finish passes each response to the given block or
@ -287,14 +312,18 @@ module GRPC
# @return [Enumerator] if no block was given # @return [Enumerator] if no block was given
def each_remote_read_then_finish def each_remote_read_then_finish
return enum_for(:each_remote_read_then_finish) unless block_given? return enum_for(:each_remote_read_then_finish) unless block_given?
begin
loop do loop do
resp = remote_read resp = remote_read
if resp.nil? # the last response was received, but not finished yet if resp.nil? # the last response was received
finished receive_and_check_status
break break
end end
yield resp yield resp
end end
ensure
set_input_stream_done
end
end end
# request_response sends a request to a GRPC server, and returns the # request_response sends a request to a GRPC server, and returns the
@ -305,6 +334,7 @@ module GRPC
# a list, multiple metadata for its key are sent # a list, multiple metadata for its key are sent
# @return [Object] the response received from the server # @return [Object] the response received from the server
def request_response(req, metadata: {}) def request_response(req, metadata: {})
raise_error_if_already_executed
ops = { ops = {
SEND_MESSAGE => @marshal.call(req), SEND_MESSAGE => @marshal.call(req),
SEND_CLOSE_FROM_CLIENT => nil, SEND_CLOSE_FROM_CLIENT => nil,
@ -319,7 +349,15 @@ module GRPC
end end
@metadata_sent = true @metadata_sent = true
end end
begin
batch_result = @call.run_batch(ops) batch_result = @call.run_batch(ops)
# no need to check for cancellation after a CallError because this
# batch contains a RECV_STATUS op
ensure
set_input_stream_done
set_output_stream_done
end
@call.metadata = batch_result.metadata @call.metadata = batch_result.metadata
attach_status_results_and_complete_call(batch_result) attach_status_results_and_complete_call(batch_result)
@ -339,10 +377,20 @@ module GRPC
# a list, multiple metadata for its key are sent # a list, multiple metadata for its key are sent
# @return [Object] the response received from the server # @return [Object] the response received from the server
def client_streamer(requests, metadata: {}) def client_streamer(requests, metadata: {})
# Metadata might have already been sent if this is an operation view raise_error_if_already_executed
begin
merge_metadata_and_send_if_not_already_sent(metadata) merge_metadata_and_send_if_not_already_sent(metadata)
requests.each { |r| @call.run_batch(SEND_MESSAGE => @marshal.call(r)) } requests.each { |r| @call.run_batch(SEND_MESSAGE => @marshal.call(r)) }
rescue GRPC::Core::CallError => e
receive_and_check_status # check for Cancelled
raise e
rescue => e
set_input_stream_done
raise e
ensure
set_output_stream_done
end
batch_result = @call.run_batch( batch_result = @call.run_batch(
SEND_CLOSE_FROM_CLIENT => nil, SEND_CLOSE_FROM_CLIENT => nil,
RECV_INITIAL_METADATA => nil, RECV_INITIAL_METADATA => nil,
@ -350,12 +398,11 @@ module GRPC
RECV_STATUS_ON_CLIENT => nil RECV_STATUS_ON_CLIENT => nil
) )
set_input_stream_done
@call.metadata = batch_result.metadata @call.metadata = batch_result.metadata
attach_status_results_and_complete_call(batch_result) attach_status_results_and_complete_call(batch_result)
get_message_from_batch_result(batch_result) get_message_from_batch_result(batch_result)
rescue GRPC::Core::CallError => e
finished # checks for Cancelled
raise e
end end
# server_streamer sends one request to the GRPC server, which yields a # server_streamer sends one request to the GRPC server, which yields a
@ -373,6 +420,7 @@ module GRPC
# a list, multiple metadata for its key are sent # a list, multiple metadata for its key are sent
# @return [Enumerator|nil] a response Enumerator # @return [Enumerator|nil] a response Enumerator
def server_streamer(req, metadata: {}) def server_streamer(req, metadata: {})
raise_error_if_already_executed
ops = { ops = {
SEND_MESSAGE => @marshal.call(req), SEND_MESSAGE => @marshal.call(req),
SEND_CLOSE_FROM_CLIENT => nil SEND_CLOSE_FROM_CLIENT => nil
@ -384,13 +432,22 @@ module GRPC
end end
@metadata_sent = true @metadata_sent = true
end end
begin
@call.run_batch(ops) @call.run_batch(ops)
rescue GRPC::Core::CallError => e
receive_and_check_status # checks for Cancelled
raise e
rescue => e
set_input_stream_done
raise e
ensure
set_output_stream_done
end
replies = enum_for(:each_remote_read_then_finish) replies = enum_for(:each_remote_read_then_finish)
return replies unless block_given? return replies unless block_given?
replies.each { |r| yield r } replies.each { |r| yield r }
rescue GRPC::Core::CallError => e
finished # checks for Cancelled
raise e
end end
# bidi_streamer sends a stream of requests to the GRPC server, and yields # bidi_streamer sends a stream of requests to the GRPC server, and yields
@ -421,6 +478,7 @@ module GRPC
# a list, multiple metadata for its key are sent # a list, multiple metadata for its key are sent
# @return [Enumerator, nil] a response Enumerator # @return [Enumerator, nil] a response Enumerator
def bidi_streamer(requests, metadata: {}, &blk) def bidi_streamer(requests, metadata: {}, &blk)
raise_error_if_already_executed
# Metadata might have already been sent if this is an operation view # Metadata might have already been sent if this is an operation view
merge_metadata_and_send_if_not_already_sent(metadata) merge_metadata_and_send_if_not_already_sent(metadata)
bd = BidiCall.new(@call, bd = BidiCall.new(@call,
@ -428,7 +486,10 @@ module GRPC
@unmarshal, @unmarshal,
metadata_received: @metadata_received) metadata_received: @metadata_received)
bd.run_on_client(requests, @op_notifier, &blk) bd.run_on_client(requests,
proc { set_input_stream_done },
proc { set_output_stream_done },
&blk)
end end
# run_server_bidi orchestrates a BiDi stream processing on a server. # run_server_bidi orchestrates a BiDi stream processing on a server.
@ -449,7 +510,7 @@ module GRPC
metadata_received: @metadata_received, metadata_received: @metadata_received,
req_view: MultiReqView.new(self)) req_view: MultiReqView.new(self))
bd.run_on_server(gen_each_reply) bd.run_on_server(gen_each_reply, proc { set_input_stream_done })
end end
# Waits till an operation completes # Waits till an operation completes
@ -459,7 +520,8 @@ module GRPC
@op_notifier.wait @op_notifier.wait
end end
# Signals that an operation is done # Signals that an operation is done.
# Only relevant on the client-side (this is a no-op on the server-side)
def op_is_done def op_is_done
return if @op_notifier.nil? return if @op_notifier.nil?
@op_notifier.notify(self) @op_notifier.notify(self)
@ -484,8 +546,40 @@ module GRPC
end end
end end
def attach_peer_cert(peer_cert)
@peer_cert = peer_cert
end
private private
# To be called once the "input stream" has been completelly
# read through (i.e, done reading from client or received status)
# note this is idempotent
def set_input_stream_done
@call_finished_mu.synchronize do
@input_stream_done = true
maybe_finish_and_close_call_locked
end
end
# To be called once the "output stream" has been completelly
# sent through (i.e, done sending from client or sent status)
# note this is idempotent
def set_output_stream_done
@call_finished_mu.synchronize do
@output_stream_done = true
maybe_finish_and_close_call_locked
end
end
def maybe_finish_and_close_call_locked
return unless @output_stream_done && @input_stream_done
return if @call_finished
@call_finished = true
op_is_done
@call.close
end
# Starts the call if not already started # Starts the call if not already started
# @param metadata [Hash] metadata to be sent to the server. If a value is # @param metadata [Hash] metadata to be sent to the server. If a value is
# a list, multiple metadata for its key are sent # a list, multiple metadata for its key are sent
@ -493,6 +587,15 @@ module GRPC
merge_metadata_to_send(metadata) && send_initial_metadata merge_metadata_to_send(metadata) && send_initial_metadata
end end
def raise_error_if_already_executed
@client_call_executed_mu.synchronize do
if @client_call_executed
fail GRPC::Core::CallError, 'attempting to re-run a call'
end
@client_call_executed = true
end
end
def self.view_class(*visible_methods) def self.view_class(*visible_methods)
Class.new do Class.new do
extend ::Forwardable extend ::Forwardable
@ -518,6 +621,7 @@ module GRPC
# server client_streamer handlers. # server client_streamer handlers.
MultiReqView = view_class(:cancelled?, :deadline, MultiReqView = view_class(:cancelled?, :deadline,
:each_remote_read, :metadata, :output_metadata, :each_remote_read, :metadata, :output_metadata,
:peer, :peer_cert,
:send_initial_metadata, :send_initial_metadata,
:metadata_to_send, :metadata_to_send,
:merge_metadata_to_send, :merge_metadata_to_send,

@ -62,12 +62,19 @@ module GRPC
# block that can be invoked with each response. # block that can be invoked with each response.
# #
# @param requests the Enumerable of requests to send # @param requests the Enumerable of requests to send
# @param op_notifier a Notifier used to signal completion # @param set_input_stream_done [Proc] called back when we're done
# reading the input stream
# @param set_input_stream_done [Proc] called back when we're done
# sending data on the output stream
# @return an Enumerator of requests to yield # @return an Enumerator of requests to yield
def run_on_client(requests, op_notifier, &blk) def run_on_client(requests,
@op_notifier = op_notifier set_input_stream_done,
@enq_th = Thread.new { write_loop(requests) } set_output_stream_done,
read_loop(&blk) &blk)
@enq_th = Thread.new do
write_loop(requests, set_output_stream_done: set_output_stream_done)
end
read_loop(set_input_stream_done, &blk)
end end
# Begins orchestration of the Bidi stream for a server generating replies. # Begins orchestration of the Bidi stream for a server generating replies.
@ -81,12 +88,17 @@ module GRPC
# produced by gen_each_reply could ignore the received_msgs # produced by gen_each_reply could ignore the received_msgs
# #
# @param gen_each_reply [Proc] generates the BiDi stream replies. # @param gen_each_reply [Proc] generates the BiDi stream replies.
def run_on_server(gen_each_reply) # @param set_input_steam_done [Proc] call back to call when
# the reads have been completely read through.
def run_on_server(gen_each_reply, set_input_stream_done)
# Pass in the optional call object parameter if possible # Pass in the optional call object parameter if possible
if gen_each_reply.arity == 1 if gen_each_reply.arity == 1
replys = gen_each_reply.call(read_loop(is_client: false)) replys = gen_each_reply.call(
read_loop(set_input_stream_done, is_client: false))
elsif gen_each_reply.arity == 2 elsif gen_each_reply.arity == 2
replys = gen_each_reply.call(read_loop(is_client: false), @req_view) replys = gen_each_reply.call(
read_loop(set_input_stream_done, is_client: false),
@req_view)
else else
fail 'Illegal arity of reply generator' fail 'Illegal arity of reply generator'
end end
@ -99,22 +111,6 @@ module GRPC
END_OF_READS = :end_of_reads END_OF_READS = :end_of_reads
END_OF_WRITES = :end_of_writes END_OF_WRITES = :end_of_writes
# signals that bidi operation is complete
def notify_done
return unless @op_notifier
GRPC.logger.debug("bidi-notify-done: notifying #{@op_notifier}")
@op_notifier.notify(self)
end
# signals that a bidi operation is complete (read + write)
def finished
@done_mutex.synchronize do
return unless @reads_complete && @writes_complete && !@complete
@call.close
@complete = true
end
end
# performs a read using @call.run_batch, ensures metadata is set up # performs a read using @call.run_batch, ensures metadata is set up
def read_using_run_batch def read_using_run_batch
ops = { RECV_MESSAGE => nil } ops = { RECV_MESSAGE => nil }
@ -127,7 +123,8 @@ module GRPC
batch_result batch_result
end end
def write_loop(requests, is_client: true) # set_output_stream_done is relevant on client-side
def write_loop(requests, is_client: true, set_output_stream_done: nil)
GRPC.logger.debug('bidi-write-loop: starting') GRPC.logger.debug('bidi-write-loop: starting')
count = 0 count = 0
requests.each do |req| requests.each do |req|
@ -151,23 +148,20 @@ module GRPC
GRPC.logger.debug("bidi-write-loop: client sent #{count}, waiting") GRPC.logger.debug("bidi-write-loop: client sent #{count}, waiting")
@call.run_batch(SEND_CLOSE_FROM_CLIENT => nil) @call.run_batch(SEND_CLOSE_FROM_CLIENT => nil)
GRPC.logger.debug('bidi-write-loop: done') GRPC.logger.debug('bidi-write-loop: done')
notify_done
@writes_complete = true
finished
end end
GRPC.logger.debug('bidi-write-loop: finished') GRPC.logger.debug('bidi-write-loop: finished')
rescue StandardError => e rescue StandardError => e
GRPC.logger.warn('bidi-write-loop: failed') GRPC.logger.warn('bidi-write-loop: failed')
GRPC.logger.warn(e) GRPC.logger.warn(e)
notify_done
@writes_complete = true
finished
raise e raise e
ensure
set_output_stream_done.call if is_client
end end
# Provides an enumerator that yields results of remote reads # Provides an enumerator that yields results of remote reads
def read_loop(is_client: true) def read_loop(set_input_stream_done, is_client: true)
return enum_for(:read_loop, return enum_for(:read_loop,
set_input_stream_done,
is_client: is_client) unless block_given? is_client: is_client) unless block_given?
GRPC.logger.debug('bidi-read-loop: starting') GRPC.logger.debug('bidi-read-loop: starting')
begin begin
@ -201,10 +195,10 @@ module GRPC
GRPC.logger.warn('bidi: read-loop failed') GRPC.logger.warn('bidi: read-loop failed')
GRPC.logger.warn(e) GRPC.logger.warn(e)
raise e raise e
ensure
set_input_stream_done.call
end end
GRPC.logger.debug('bidi-read-loop: finished') GRPC.logger.debug('bidi-read-loop: finished')
@reads_complete = true
finished
# Make sure that the write loop is done done before finishing the call. # Make sure that the write loop is done done before finishing the call.
# Note that blocking is ok at this point because we've already received # Note that blocking is ok at this point because we've already received
# a status # a status

@ -48,7 +48,7 @@ module GRPC
end end
def handle_request_response(active_call, mth) def handle_request_response(active_call, mth)
req = active_call.remote_read req = active_call.read_unary_request
resp = mth.call(req, active_call.single_req_view) resp = mth.call(req, active_call.single_req_view)
active_call.server_unary_response( active_call.server_unary_response(
resp, trailing_metadata: active_call.output_metadata) resp, trailing_metadata: active_call.output_metadata)
@ -61,7 +61,7 @@ module GRPC
end end
def handle_server_streamer(active_call, mth) def handle_server_streamer(active_call, mth)
req = active_call.remote_read req = active_call.read_unary_request
replys = mth.call(req, active_call.single_req_view) replys = mth.call(req, active_call.single_req_view)
replys.each { |r| active_call.remote_send(r) } replys.each { |r| active_call.remote_send(r) }
send_status(active_call, OK, 'OK', active_call.output_metadata) send_status(active_call, OK, 'OK', active_call.output_metadata)

@ -418,6 +418,7 @@ module GRPC
metadata_received: true, metadata_received: true,
started: false, started: false,
metadata_to_send: connect_md) metadata_to_send: connect_md)
c.attach_peer_cert(an_rpc.call.peer_cert)
mth = an_rpc.method.to_sym mth = an_rpc.method.to_sym
[c, mth] [c, mth]
end end

@ -0,0 +1,137 @@
# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
require 'grpc'
def create_channel_creds
test_root = File.join(File.dirname(__FILE__), 'testdata')
files = ['ca.pem', 'client.key', 'client.pem']
creds = files.map { |f| File.open(File.join(test_root, f)).read }
GRPC::Core::ChannelCredentials.new(creds[0], creds[1], creds[2])
end
def client_cert
test_root = File.join(File.dirname(__FILE__), 'testdata')
cert = File.open(File.join(test_root, 'client.pem')).read
fail unless cert.is_a?(String)
cert
end
def create_server_creds
test_root = File.join(File.dirname(__FILE__), 'testdata')
p "test root: #{test_root}"
files = ['ca.pem', 'server1.key', 'server1.pem']
creds = files.map { |f| File.open(File.join(test_root, f)).read }
GRPC::Core::ServerCredentials.new(
creds[0],
[{ private_key: creds[1], cert_chain: creds[2] }],
true) # force client auth
end
# A test message
class EchoMsg
def self.marshal(_o)
''
end
def self.unmarshal(_o)
EchoMsg.new
end
end
# a test service that checks the cert of its peer
class SslTestService
include GRPC::GenericService
rpc :an_rpc, EchoMsg, EchoMsg
rpc :a_client_streaming_rpc, stream(EchoMsg), EchoMsg
rpc :a_server_streaming_rpc, EchoMsg, stream(EchoMsg)
rpc :a_bidi_rpc, stream(EchoMsg), stream(EchoMsg)
def check_peer_cert(call)
error_msg = "want:\n#{client_cert}\n\ngot:\n#{call.peer_cert}"
fail(error_msg) unless call.peer_cert == client_cert
end
def an_rpc(req, call)
check_peer_cert(call)
req
end
def a_client_streaming_rpc(call)
check_peer_cert(call)
call.each_remote_read.each { |r| p r }
EchoMsg.new
end
def a_server_streaming_rpc(_, call)
check_peer_cert(call)
[EchoMsg.new, EchoMsg.new]
end
def a_bidi_rpc(requests, call)
check_peer_cert(call)
requests.each { |r| p r }
[EchoMsg.new, EchoMsg.new]
end
end
SslTestServiceStub = SslTestService.rpc_stub_class
describe 'client-server auth' do
RpcServer = GRPC::RpcServer
before(:all) do
server_opts = {
poll_period: 1
}
@srv = RpcServer.new(**server_opts)
port = @srv.add_http2_port('0.0.0.0:0', create_server_creds)
@srv.handle(SslTestService)
@srv_thd = Thread.new { @srv.run }
@srv.wait_till_running
client_opts = {
channel_args: {
GRPC::Core::Channel::SSL_TARGET => 'foo.test.google.fr'
}
}
@stub = SslTestServiceStub.new("localhost:#{port}",
create_channel_creds,
**client_opts)
end
after(:all) do
expect(@srv.stopped?).to be(false)
@srv.stop
@srv_thd.join
end
it 'client-server auth with unary RPCs' do
@stub.an_rpc(EchoMsg.new)
end
it 'client-server auth with client streaming RPCs' do
@stub.a_client_streaming_rpc([EchoMsg.new, EchoMsg.new])
end
it 'client-server auth with server streaming RPCs' do
responses = @stub.a_server_streaming_rpc(EchoMsg.new)
responses.each { |r| p r }
end
it 'client-server auth with bidi RPCs' do
responses = @stub.a_bidi_rpc([EchoMsg.new, EchoMsg.new])
responses.each { |r| p r }
end
end

@ -473,7 +473,7 @@ describe GRPC::ActiveCall do
server_call.remote_send('server_response') server_call.remote_send('server_response')
expect(client_call.remote_read).to eq('server_response') expect(client_call.remote_read).to eq('server_response')
server_call.send_status(OK, 'status code is OK') server_call.send_status(OK, 'status code is OK')
expect { client_call.finished }.to_not raise_error expect { client_call.receive_and_check_status }.to_not raise_error
end end
it 'finishes ok if the server sends an early status response' do it 'finishes ok if the server sends an early status response' do
@ -490,7 +490,7 @@ describe GRPC::ActiveCall do
expect do expect do
call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil) call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil)
end.to_not raise_error end.to_not raise_error
expect { client_call.finished }.to_not raise_error expect { client_call.receive_and_check_status }.to_not raise_error
end end
it 'finishes ok if SEND_CLOSE and RECV_STATUS has been sent' do it 'finishes ok if SEND_CLOSE and RECV_STATUS has been sent' do

@ -36,6 +36,53 @@ include GRPC::Core::StatusCodes
include GRPC::Core::TimeConsts include GRPC::Core::TimeConsts
include GRPC::Core::CallOps include GRPC::Core::CallOps
# check that methods on a finished/closed call t crash
def check_op_view_of_finished_client_call(op_view,
expected_metadata,
expected_trailing_metadata)
# use read_response_stream to try to iterate through
# possible response stream
fail('need something to attempt reads') unless block_given?
expect do
resp = op_view.execute
yield resp
end.to raise_error(GRPC::Core::CallError)
expect { op_view.start_call }.to raise_error(RuntimeError)
sanity_check_values_of_accessors(op_view,
expected_metadata,
expected_trailing_metadata)
expect do
op_view.wait
op_view.cancel
op_view.write_flag = 1
end.to_not raise_error
end
def sanity_check_values_of_accessors(op_view,
expected_metadata,
expected_trailing_metadata)
expected_status = Struct::Status.new
expected_status.code = 0
expected_status.details = 'OK'
expected_status.metadata = expected_trailing_metadata
expect(op_view.status).to eq(expected_status)
expect(op_view.metadata).to eq(expected_metadata)
expect(op_view.trailing_metadata).to eq(expected_trailing_metadata)
expect(op_view.cancelled?).to be(false)
expect(op_view.write_flag).to be(nil)
# The deadline attribute of a call can be either
# a GRPC::Core::TimeSpec or a Time, which are mutually exclusive.
# TODO: fix so that the accessor always returns the same type.
expect(op_view.deadline.is_a?(GRPC::Core::TimeSpec) ||
op_view.deadline.is_a?(Time)).to be(true)
end
describe 'ClientStub' do describe 'ClientStub' do
let(:noop) { proc { |x| x } } let(:noop) { proc { |x| x } }
@ -45,6 +92,7 @@ describe 'ClientStub' do
@method = 'an_rpc_method' @method = 'an_rpc_method'
@pass = OK @pass = OK
@fail = INTERNAL @fail = INTERNAL
@metadata = { k1: 'v1', k2: 'v2' }
end end
after(:each) do after(:each) do
@ -107,7 +155,7 @@ describe 'ClientStub' do
end end
end end
describe '#request_response' do describe '#request_response', request_response: true do
before(:each) do before(:each) do
@sent_msg, @resp = 'a_msg', 'a_reply' @sent_msg, @resp = 'a_msg', 'a_reply'
end end
@ -126,7 +174,7 @@ describe 'ClientStub' do
server_port = create_test_server server_port = create_test_server
host = "localhost:#{server_port}" host = "localhost:#{server_port}"
th = run_request_response(@sent_msg, @resp, @pass, th = run_request_response(@sent_msg, @resp, @pass,
k1: 'v1', k2: 'v2') expected_metadata: { k1: 'v1', k2: 'v2' })
stub = GRPC::ClientStub.new(host, :this_channel_is_insecure) stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
expect(get_response(stub)).to eq(@resp) expect(get_response(stub)).to eq(@resp)
th.join th.join
@ -187,13 +235,24 @@ describe 'ClientStub' do
# Kill the server thread so tests can complete # Kill the server thread so tests can complete
th.kill th.kill
end end
it 'should raise ArgumentError if metadata contains invalid values' do
@metadata.merge!(k3: 3)
server_port = create_test_server
host = "localhost:#{server_port}"
stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
expect do
get_response(stub)
end.to raise_error(ArgumentError,
/Header values must be of type string or array/)
end
end end
describe 'without a call operation' do describe 'without a call operation' do
def get_response(stub, credentials: nil) def get_response(stub, credentials: nil)
puts credentials.inspect puts credentials.inspect
stub.request_response(@method, @sent_msg, noop, noop, stub.request_response(@method, @sent_msg, noop, noop,
metadata: { k1: 'v1', k2: 'v2' }, metadata: @metadata,
credentials: credentials) credentials: credentials)
end end
@ -201,40 +260,62 @@ describe 'ClientStub' do
end end
describe 'via a call operation' do describe 'via a call operation' do
after(:each) do
# make sure op.wait doesn't hang, even if there's a bad status
@op.wait
end
def get_response(stub, run_start_call_first: false, credentials: nil) def get_response(stub, run_start_call_first: false, credentials: nil)
op = stub.request_response(@method, @sent_msg, noop, noop, @op = stub.request_response(@method, @sent_msg, noop, noop,
return_op: true, return_op: true,
metadata: { k1: 'v1', k2: 'v2' }, metadata: @metadata,
deadline: from_relative_time(2), deadline: from_relative_time(2),
credentials: credentials) credentials: credentials)
expect(op).to be_a(GRPC::ActiveCall::Operation) expect(@op).to be_a(GRPC::ActiveCall::Operation)
op.start_call if run_start_call_first @op.start_call if run_start_call_first
result = op.execute result = @op.execute
op.wait # make sure wait doesn't hang
result result
end end
it_behaves_like 'request response' it_behaves_like 'request response'
it 'sends metadata to the server ok when running start_call first' do def run_op_view_metadata_test(run_start_call_first)
server_port = create_test_server server_port = create_test_server
host = "localhost:#{server_port}" host = "localhost:#{server_port}"
th = run_request_response(@sent_msg, @resp, @pass,
k1: 'v1', k2: 'v2') @server_initial_md = { 'sk1' => 'sv1', 'sk2' => 'sv2' }
@server_trailing_md = { 'tk1' => 'tv1', 'tk2' => 'tv2' }
th = run_request_response(
@sent_msg, @resp, @pass,
expected_metadata: @metadata,
server_initial_md: @server_initial_md,
server_trailing_md: @server_trailing_md)
stub = GRPC::ClientStub.new(host, :this_channel_is_insecure) stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
expect(get_response(stub)).to eq(@resp) expect(
get_response(stub,
run_start_call_first: run_start_call_first)).to eq(@resp)
th.join th.join
end end
it 'sends metadata to the server ok when running start_call first' do
run_op_view_metadata_test(true)
check_op_view_of_finished_client_call(
@op, @server_initial_md, @server_trailing_md) { |r| p r }
end
it 'does not crash when used after the call has been finished' do
run_op_view_metadata_test(false)
check_op_view_of_finished_client_call(
@op, @server_initial_md, @server_trailing_md) { |r| p r }
end
end end
end end
describe '#client_streamer' do describe '#client_streamer', client_streamer: true do
before(:each) do before(:each) do
Thread.abort_on_exception = true Thread.abort_on_exception = true
server_port = create_test_server server_port = create_test_server
host = "localhost:#{server_port}" host = "localhost:#{server_port}"
@stub = GRPC::ClientStub.new(host, :this_channel_is_insecure) @stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
@metadata = { k1: 'v1', k2: 'v2' }
@sent_msgs = Array.new(3) { |i| 'msg_' + (i + 1).to_s } @sent_msgs = Array.new(3) { |i| 'msg_' + (i + 1).to_s }
@resp = 'a_reply' @resp = 'a_reply'
end end
@ -247,7 +328,8 @@ describe 'ClientStub' do
end end
it 'should send metadata to the server ok' do it 'should send metadata to the server ok' do
th = run_client_streamer(@sent_msgs, @resp, @pass, **@metadata) th = run_client_streamer(@sent_msgs, @resp, @pass,
expected_metadata: @metadata)
expect(get_response(@stub)).to eq(@resp) expect(get_response(@stub)).to eq(@resp)
th.join th.join
end end
@ -278,27 +360,50 @@ describe 'ClientStub' do
end end
describe 'via a call operation' do describe 'via a call operation' do
after(:each) do
# make sure op.wait doesn't hang, even if there's a bad status
@op.wait
end
def get_response(stub, run_start_call_first: false) def get_response(stub, run_start_call_first: false)
op = stub.client_streamer(@method, @sent_msgs, noop, noop, @op = stub.client_streamer(@method, @sent_msgs, noop, noop,
return_op: true, metadata: @metadata) return_op: true, metadata: @metadata)
expect(op).to be_a(GRPC::ActiveCall::Operation) expect(@op).to be_a(GRPC::ActiveCall::Operation)
op.start_call if run_start_call_first @op.start_call if run_start_call_first
result = op.execute result = @op.execute
op.wait # make sure wait doesn't hang
result result
end end
it_behaves_like 'client streaming' it_behaves_like 'client streaming'
it 'sends metadata to the server ok when running start_call first' do def run_op_view_metadata_test(run_start_call_first)
th = run_client_streamer(@sent_msgs, @resp, @pass, **@metadata) @server_initial_md = { 'sk1' => 'sv1', 'sk2' => 'sv2' }
expect(get_response(@stub, run_start_call_first: true)).to eq(@resp) @server_trailing_md = { 'tk1' => 'tv1', 'tk2' => 'tv2' }
th = run_client_streamer(
@sent_msgs, @resp, @pass,
expected_metadata: @metadata,
server_initial_md: @server_initial_md,
server_trailing_md: @server_trailing_md)
expect(
get_response(@stub,
run_start_call_first: run_start_call_first)).to eq(@resp)
th.join th.join
end end
it 'sends metadata to the server ok when running start_call first' do
run_op_view_metadata_test(true)
check_op_view_of_finished_client_call(
@op, @server_initial_md, @server_trailing_md) { |r| p r }
end
it 'does not crash when used after the call has been finished' do
run_op_view_metadata_test(false)
check_op_view_of_finished_client_call(
@op, @server_initial_md, @server_trailing_md) { |r| p r }
end
end end
end end
describe '#server_streamer' do describe '#server_streamer', server_streamer: true do
before(:each) do before(:each) do
@sent_msg = 'a_msg' @sent_msg = 'a_msg'
@replys = Array.new(3) { |i| 'reply_' + (i + 1).to_s } @replys = Array.new(3) { |i| 'reply_' + (i + 1).to_s }
@ -328,18 +433,42 @@ describe 'ClientStub' do
server_port = create_test_server server_port = create_test_server
host = "localhost:#{server_port}" host = "localhost:#{server_port}"
th = run_server_streamer(@sent_msg, @replys, @fail, th = run_server_streamer(@sent_msg, @replys, @fail,
k1: 'v1', k2: 'v2') expected_metadata: { k1: 'v1', k2: 'v2' })
stub = GRPC::ClientStub.new(host, :this_channel_is_insecure) stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
e = get_responses(stub) e = get_responses(stub)
expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus) expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus)
th.join th.join
end end
it 'should raise ArgumentError if metadata contains invalid values' do
@metadata.merge!(k3: 3)
server_port = create_test_server
host = "localhost:#{server_port}"
stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
expect do
get_responses(stub)
end.to raise_error(ArgumentError,
/Header values must be of type string or array/)
end
it 'the call terminates when there is an unmarshalling error' do
server_port = create_test_server
host = "localhost:#{server_port}"
th = run_server_streamer(@sent_msg, @replys, @pass)
stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
unmarshal = proc { fail(ArgumentError, 'test unmarshalling error') }
expect do
get_responses(stub, unmarshal: unmarshal).collect { |r| r }
end.to raise_error(ArgumentError, 'test unmarshalling error')
th.join
end
end end
describe 'without a call operation' do describe 'without a call operation' do
def get_responses(stub) def get_responses(stub, unmarshal: noop)
e = stub.server_streamer(@method, @sent_msg, noop, noop, e = stub.server_streamer(@method, @sent_msg, noop, unmarshal,
metadata: { k1: 'v1', k2: 'v2' }) metadata: @metadata)
expect(e).to be_a(Enumerator) expect(e).to be_a(Enumerator)
e e
end end
@ -351,10 +480,10 @@ describe 'ClientStub' do
after(:each) do after(:each) do
@op.wait # make sure wait doesn't hang @op.wait # make sure wait doesn't hang
end end
def get_responses(stub, run_start_call_first: false) def get_responses(stub, run_start_call_first: false, unmarshal: noop)
@op = stub.server_streamer(@method, @sent_msg, noop, noop, @op = stub.server_streamer(@method, @sent_msg, noop, unmarshal,
return_op: true, return_op: true,
metadata: { k1: 'v1', k2: 'v2' }) metadata: @metadata)
expect(@op).to be_a(GRPC::ActiveCall::Operation) expect(@op).to be_a(GRPC::ActiveCall::Operation)
@op.start_call if run_start_call_first @op.start_call if run_start_call_first
e = @op.execute e = @op.execute
@ -364,20 +493,41 @@ describe 'ClientStub' do
it_behaves_like 'server streaming' it_behaves_like 'server streaming'
it 'should send metadata to the server ok when start_call is run first' do def run_op_view_metadata_test(run_start_call_first)
server_port = create_test_server server_port = create_test_server
host = "localhost:#{server_port}" host = "localhost:#{server_port}"
th = run_server_streamer(@sent_msg, @replys, @fail, @server_initial_md = { 'sk1' => 'sv1', 'sk2' => 'sv2' }
k1: 'v1', k2: 'v2') @server_trailing_md = { 'tk1' => 'tv1', 'tk2' => 'tv2' }
th = run_server_streamer(
@sent_msg, @replys, @pass,
expected_metadata: @metadata,
server_initial_md: @server_initial_md,
server_trailing_md: @server_trailing_md)
stub = GRPC::ClientStub.new(host, :this_channel_is_insecure) stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
e = get_responses(stub, run_start_call_first: true) e = get_responses(stub, run_start_call_first: run_start_call_first)
expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus) expect(e.collect { |r| r }).to eq(@replys)
th.join th.join
end end
it 'should send metadata to the server ok when start_call is run first' do
run_op_view_metadata_test(true)
check_op_view_of_finished_client_call(
@op, @server_initial_md, @server_trailing_md) do |responses|
responses.each { |r| p r }
end end
end end
describe '#bidi_streamer' do it 'does not crash when used after the call has been finished' do
run_op_view_metadata_test(false)
check_op_view_of_finished_client_call(
@op, @server_initial_md, @server_trailing_md) do |responses|
responses.each { |r| p r }
end
end
end
end
describe '#bidi_streamer', bidi: true do
before(:each) do before(:each) do
@sent_msgs = Array.new(3) { |i| 'msg_' + (i + 1).to_s } @sent_msgs = Array.new(3) { |i| 'msg_' + (i + 1).to_s }
@replys = Array.new(3) { |i| 'reply_' + (i + 1).to_s } @replys = Array.new(3) { |i| 'reply_' + (i + 1).to_s }
@ -386,7 +536,7 @@ describe 'ClientStub' do
end end
shared_examples 'bidi streaming' do shared_examples 'bidi streaming' do
it 'supports sending all the requests first', bidi: true do it 'supports sending all the requests first' do
th = run_bidi_streamer_handle_inputs_first(@sent_msgs, @replys, th = run_bidi_streamer_handle_inputs_first(@sent_msgs, @replys,
@pass) @pass)
stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure) stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
@ -395,7 +545,7 @@ describe 'ClientStub' do
th.join th.join
end end
it 'supports client-initiated ping pong', bidi: true do it 'supports client-initiated ping pong' do
th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, true) th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, true)
stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure) stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
e = get_responses(stub) e = get_responses(stub)
@ -403,18 +553,39 @@ describe 'ClientStub' do
th.join th.join
end end
it 'supports a server-initiated ping pong', bidi: true do it 'supports a server-initiated ping pong' do
th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, false) th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, false)
stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure) stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
e = get_responses(stub) e = get_responses(stub)
expect(e.collect { |r| r }).to eq(@sent_msgs) expect(e.collect { |r| r }).to eq(@sent_msgs)
th.join th.join
end end
it 'should raise an error if the status is not ok' do
th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @fail, false)
stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
e = get_responses(stub)
expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus)
th.join
end
# TODO: add test for metadata-related ArgumentError in a bidi call once
# issue mentioned in https://github.com/grpc/grpc/issues/10526 is fixed
it 'should send metadata to the server ok' do
th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, true,
expected_metadata: @metadata)
stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
e = get_responses(stub)
expect(e.collect { |r| r }).to eq(@sent_msgs)
th.join
end
end end
describe 'without a call operation' do describe 'without a call operation' do
def get_responses(stub) def get_responses(stub)
e = stub.bidi_streamer(@method, @sent_msgs, noop, noop) e = stub.bidi_streamer(@method, @sent_msgs, noop, noop,
metadata: @metadata)
expect(e).to be_a(Enumerator) expect(e).to be_a(Enumerator)
e e
end end
@ -428,7 +599,8 @@ describe 'ClientStub' do
end end
def get_responses(stub, run_start_call_first: false) def get_responses(stub, run_start_call_first: false)
@op = stub.bidi_streamer(@method, @sent_msgs, noop, noop, @op = stub.bidi_streamer(@method, @sent_msgs, noop, noop,
return_op: true) return_op: true,
metadata: @metadata)
expect(@op).to be_a(GRPC::ActiveCall::Operation) expect(@op).to be_a(GRPC::ActiveCall::Operation)
@op.start_call if run_start_call_first @op.start_call if run_start_call_first
e = @op.execute e = @op.execute
@ -438,27 +610,53 @@ describe 'ClientStub' do
it_behaves_like 'bidi streaming' it_behaves_like 'bidi streaming'
it 'can run start_call before executing the call' do def run_op_view_metadata_test(run_start_call_first)
th = run_bidi_streamer_handle_inputs_first(@sent_msgs, @replys, @server_initial_md = { 'sk1' => 'sv1', 'sk2' => 'sv2' }
@pass) @server_trailing_md = { 'tk1' => 'tv1', 'tk2' => 'tv2' }
th = run_bidi_streamer_echo_ping_pong(
@sent_msgs, @pass, true,
expected_metadata: @metadata,
server_initial_md: @server_initial_md,
server_trailing_md: @server_trailing_md)
stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure) stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
e = get_responses(stub, run_start_call_first: true) e = get_responses(stub, run_start_call_first: run_start_call_first)
expect(e.collect { |r| r }).to eq(@replys) expect(e.collect { |r| r }).to eq(@sent_msgs)
th.join th.join
end end
it 'can run start_call before executing the call' do
run_op_view_metadata_test(true)
check_op_view_of_finished_client_call(
@op, @server_initial_md, @server_trailing_md) do |responses|
responses.each { |r| p r }
end
end
it 'doesnt crash when op_view used after call has finished' do
run_op_view_metadata_test(false)
check_op_view_of_finished_client_call(
@op, @server_initial_md, @server_trailing_md) do |responses|
responses.each { |r| p r }
end
end
end end
end end
def run_server_streamer(expected_input, replys, status, **kw) def run_server_streamer(expected_input, replys, status,
wanted_metadata = kw.clone expected_metadata: {},
server_initial_md: {},
server_trailing_md: {})
wanted_metadata = expected_metadata.clone
wakey_thread do |notifier| wakey_thread do |notifier|
c = expect_server_to_be_invoked(notifier) c = expect_server_to_be_invoked(
notifier, metadata_to_send: server_initial_md)
wanted_metadata.each do |k, v| wanted_metadata.each do |k, v|
expect(c.metadata[k.to_s]).to eq(v) expect(c.metadata[k.to_s]).to eq(v)
end end
expect(c.remote_read).to eq(expected_input) expect(c.remote_read).to eq(expected_input)
replys.each { |r| c.remote_send(r) } replys.each { |r| c.remote_send(r) }
c.send_status(status, status == @pass ? 'OK' : 'NOK', true) c.send_status(status, status == @pass ? 'OK' : 'NOK', true,
metadata: server_trailing_md)
end end
end end
@ -472,9 +670,17 @@ describe 'ClientStub' do
end end
end end
def run_bidi_streamer_echo_ping_pong(expected_inputs, status, client_starts) def run_bidi_streamer_echo_ping_pong(expected_inputs, status, client_starts,
expected_metadata: {},
server_initial_md: {},
server_trailing_md: {})
wanted_metadata = expected_metadata.clone
wakey_thread do |notifier| wakey_thread do |notifier|
c = expect_server_to_be_invoked(notifier) c = expect_server_to_be_invoked(
notifier, metadata_to_send: server_initial_md)
wanted_metadata.each do |k, v|
expect(c.metadata[k.to_s]).to eq(v)
end
expected_inputs.each do |i| expected_inputs.each do |i|
if client_starts if client_starts
expect(c.remote_read).to eq(i) expect(c.remote_read).to eq(i)
@ -484,33 +690,44 @@ describe 'ClientStub' do
expect(c.remote_read).to eq(i) expect(c.remote_read).to eq(i)
end end
end end
c.send_status(status, status == @pass ? 'OK' : 'NOK', true) c.send_status(status, status == @pass ? 'OK' : 'NOK', true,
metadata: server_trailing_md)
end end
end end
def run_client_streamer(expected_inputs, resp, status, **kw) def run_client_streamer(expected_inputs, resp, status,
wanted_metadata = kw.clone expected_metadata: {},
server_initial_md: {},
server_trailing_md: {})
wanted_metadata = expected_metadata.clone
wakey_thread do |notifier| wakey_thread do |notifier|
c = expect_server_to_be_invoked(notifier) c = expect_server_to_be_invoked(
notifier, metadata_to_send: server_initial_md)
expected_inputs.each { |i| expect(c.remote_read).to eq(i) } expected_inputs.each { |i| expect(c.remote_read).to eq(i) }
wanted_metadata.each do |k, v| wanted_metadata.each do |k, v|
expect(c.metadata[k.to_s]).to eq(v) expect(c.metadata[k.to_s]).to eq(v)
end end
c.remote_send(resp) c.remote_send(resp)
c.send_status(status, status == @pass ? 'OK' : 'NOK', true) c.send_status(status, status == @pass ? 'OK' : 'NOK', true,
metadata: server_trailing_md)
end end
end end
def run_request_response(expected_input, resp, status, **kw) def run_request_response(expected_input, resp, status,
wanted_metadata = kw.clone expected_metadata: {},
server_initial_md: {},
server_trailing_md: {})
wanted_metadata = expected_metadata.clone
wakey_thread do |notifier| wakey_thread do |notifier|
c = expect_server_to_be_invoked(notifier) c = expect_server_to_be_invoked(
notifier, metadata_to_send: server_initial_md)
expect(c.remote_read).to eq(expected_input) expect(c.remote_read).to eq(expected_input)
wanted_metadata.each do |k, v| wanted_metadata.each do |k, v|
expect(c.metadata[k.to_s]).to eq(v) expect(c.metadata[k.to_s]).to eq(v)
end end
c.remote_send(resp) c.remote_send(resp)
c.send_status(status, status == @pass ? 'OK' : 'NOK', true) c.send_status(status, status == @pass ? 'OK' : 'NOK', true,
metadata: server_trailing_md)
end end
end end
@ -528,13 +745,13 @@ describe 'ClientStub' do
@server.add_http2_port('0.0.0.0:0', :this_port_is_insecure) @server.add_http2_port('0.0.0.0:0', :this_port_is_insecure)
end end
def expect_server_to_be_invoked(notifier) def expect_server_to_be_invoked(notifier, metadata_to_send: nil)
@server.start @server.start
notifier.notify(nil) notifier.notify(nil)
recvd_rpc = @server.request_call recvd_rpc = @server.request_call
recvd_call = recvd_rpc.call recvd_call = recvd_rpc.call
recvd_call.metadata = recvd_rpc.metadata recvd_call.metadata = recvd_rpc.metadata
recvd_call.run_batch(SEND_INITIAL_METADATA => nil) recvd_call.run_batch(SEND_INITIAL_METADATA => metadata_to_send)
GRPC::ActiveCall.new(recvd_call, noop, noop, INFINITE_FUTURE, GRPC::ActiveCall.new(recvd_call, noop, noop, INFINITE_FUTURE,
metadata_received: true) metadata_received: true)
end end

@ -38,14 +38,14 @@ describe GRPC::RpcDesc do
shared_examples 'it handles errors' do shared_examples 'it handles errors' do
it 'sends the specified status if BadStatus is raised' do it 'sends the specified status if BadStatus is raised' do
expect(@call).to receive(:remote_read).once.and_return(Object.new) expect(@call).to receive(:read_unary_request).once.and_return(Object.new)
expect(@call).to receive(:send_status).once.with(@bs_code, 'NOK', false, expect(@call).to receive(:send_status).once.with(@bs_code, 'NOK', false,
metadata: {}) metadata: {})
this_desc.run_server_method(@call, method(:bad_status)) this_desc.run_server_method(@call, method(:bad_status))
end end
it 'sends status UNKNOWN if other StandardErrors are raised' do it 'sends status UNKNOWN if other StandardErrors are raised' do
expect(@call).to receive(:remote_read).once.and_return(Object.new) expect(@call).to receive(:read_unary_request).once.and_return(Object.new)
expect(@call).to receive(:send_status).once.with(UNKNOWN, expect(@call).to receive(:send_status).once.with(UNKNOWN,
arg_error_msg, arg_error_msg,
false, metadata: {}) false, metadata: {})
@ -53,7 +53,7 @@ describe GRPC::RpcDesc do
end end
it 'absorbs CallError with no further action' do it 'absorbs CallError with no further action' do
expect(@call).to receive(:remote_read).once.and_raise(CallError) expect(@call).to receive(:read_unary_request).once.and_raise(CallError)
blk = proc do blk = proc do
this_desc.run_server_method(@call, method(:fake_reqresp)) this_desc.run_server_method(@call, method(:fake_reqresp))
end end
@ -75,7 +75,7 @@ describe GRPC::RpcDesc do
it 'sends a response and closes the stream if there no errors' do it 'sends a response and closes the stream if there no errors' do
req = Object.new req = Object.new
expect(@call).to receive(:remote_read).once.and_return(req) expect(@call).to receive(:read_unary_request).once.and_return(req)
expect(@call).to receive(:output_metadata).once.and_return(fake_md) expect(@call).to receive(:output_metadata).once.and_return(fake_md)
expect(@call).to receive(:server_unary_response).once expect(@call).to receive(:server_unary_response).once
.with(@ok_response, trailing_metadata: fake_md) .with(@ok_response, trailing_metadata: fake_md)
@ -133,7 +133,7 @@ describe GRPC::RpcDesc do
it 'sends a response and closes the stream if there no errors' do it 'sends a response and closes the stream if there no errors' do
req = Object.new req = Object.new
expect(@call).to receive(:remote_read).once.and_return(req) expect(@call).to receive(:read_unary_request).once.and_return(req)
expect(@call).to receive(:remote_send).twice.with(@ok_response) expect(@call).to receive(:remote_send).twice.with(@ok_response)
expect(@call).to receive(:output_metadata).and_return(fake_md) expect(@call).to receive(:output_metadata).and_return(fake_md)
expect(@call).to receive(:send_status).once.with(OK, 'OK', true, expect(@call).to receive(:send_status).once.with(OK, 'OK', true,

@ -111,6 +111,47 @@ end
SlowStub = SlowService.rpc_stub_class SlowStub = SlowService.rpc_stub_class
# a test service that hangs onto call objects
# and uses them after the server-side call has been
# finished
class CheckCallAfterFinishedService
include GRPC::GenericService
rpc :an_rpc, EchoMsg, EchoMsg
rpc :a_client_streaming_rpc, stream(EchoMsg), EchoMsg
rpc :a_server_streaming_rpc, EchoMsg, stream(EchoMsg)
rpc :a_bidi_rpc, stream(EchoMsg), stream(EchoMsg)
attr_reader :server_side_call
def an_rpc(req, call)
fail 'shouldnt reuse service' unless @server_side_call.nil?
@server_side_call = call
req
end
def a_client_streaming_rpc(call)
fail 'shouldnt reuse service' unless @server_side_call.nil?
@server_side_call = call
# iterate through requests so call can complete
call.each_remote_read.each { |r| p r }
EchoMsg.new
end
def a_server_streaming_rpc(_, call)
fail 'shouldnt reuse service' unless @server_side_call.nil?
@server_side_call = call
[EchoMsg.new, EchoMsg.new]
end
def a_bidi_rpc(requests, call)
fail 'shouldnt reuse service' unless @server_side_call.nil?
@server_side_call = call
requests.each { |r| p r }
[EchoMsg.new, EchoMsg.new]
end
end
CheckCallAfterFinishedServiceStub = CheckCallAfterFinishedService.rpc_stub_class
describe GRPC::RpcServer do describe GRPC::RpcServer do
RpcServer = GRPC::RpcServer RpcServer = GRPC::RpcServer
StatusCodes = GRPC::Core::StatusCodes StatusCodes = GRPC::Core::StatusCodes
@ -505,5 +546,109 @@ describe GRPC::RpcServer do
t.join t.join
end end
end end
context 'when call objects are used after calls have completed' do
before(:each) do
server_opts = {
poll_period: 1
}
@srv = RpcServer.new(**server_opts)
alt_port = @srv.add_http2_port('0.0.0.0:0', :this_port_is_insecure)
@alt_host = "0.0.0.0:#{alt_port}"
@service = CheckCallAfterFinishedService.new
@srv.handle(@service)
@srv_thd = Thread.new { @srv.run }
@srv.wait_till_running
end
# check that the server-side call is still in a usable state even
# after it has finished
def check_single_req_view_of_finished_call(call)
common_check_of_finished_server_call(call)
expect(call.peer).to be_a(String)
expect(call.peer_cert).to be(nil)
end
def check_multi_req_view_of_finished_call(call)
common_check_of_finished_server_call(call)
expect do
call.each_remote_read.each { |r| p r }
end.to raise_error(GRPC::Core::CallError)
end
def common_check_of_finished_server_call(call)
expect do
call.merge_metadata_to_send({})
end.to raise_error(RuntimeError)
expect do
call.send_initial_metadata
end.to_not raise_error
expect(call.cancelled?).to be(false)
expect(call.metadata).to be_a(Hash)
expect(call.metadata['user-agent']).to be_a(String)
expect(call.metadata_sent).to be(true)
expect(call.output_metadata).to eq({})
expect(call.metadata_to_send).to eq({})
expect(call.deadline.is_a?(Time)).to be(true)
end
it 'should not crash when call used after an unary call is finished' do
req = EchoMsg.new
stub = CheckCallAfterFinishedServiceStub.new(@alt_host,
:this_channel_is_insecure)
resp = stub.an_rpc(req)
expect(resp).to be_a(EchoMsg)
@srv.stop
@srv_thd.join
check_single_req_view_of_finished_call(@service.server_side_call)
end
it 'should not crash when call used after client streaming finished' do
requests = [EchoMsg.new, EchoMsg.new]
stub = CheckCallAfterFinishedServiceStub.new(@alt_host,
:this_channel_is_insecure)
resp = stub.a_client_streaming_rpc(requests)
expect(resp).to be_a(EchoMsg)
@srv.stop
@srv_thd.join
check_multi_req_view_of_finished_call(@service.server_side_call)
end
it 'should not crash when call used after server streaming finished' do
req = EchoMsg.new
stub = CheckCallAfterFinishedServiceStub.new(@alt_host,
:this_channel_is_insecure)
responses = stub.a_server_streaming_rpc(req)
responses.each do |r|
expect(r).to be_a(EchoMsg)
end
@srv.stop
@srv_thd.join
check_single_req_view_of_finished_call(@service.server_side_call)
end
it 'should not crash when call used after a bidi call is finished' do
requests = [EchoMsg.new, EchoMsg.new]
stub = CheckCallAfterFinishedServiceStub.new(@alt_host,
:this_channel_is_insecure)
responses = stub.a_bidi_rpc(requests)
responses.each do |r|
expect(r).to be_a(EchoMsg)
end
@srv.stop
@srv_thd.join
check_multi_req_view_of_finished_call(@service.server_side_call)
end
end
end end
end end

@ -0,0 +1,16 @@
-----BEGIN PRIVATE KEY-----
MIICeQIBADANBgkqhkiG9w0BAQEFAASCAmMwggJfAgEAAoGBAOxUR9uhvhbeVUIM
s5WbH0px0mehl2+6sZpNjzvE2KimZpHzMJHukVH0Ffkvhs0b8+S5Ut9VNUAqd3IM
JCCAEGtRNoQhM1t9Yr2zAckSvbRacp+FL/Cj9eDmyo00KsVGaeefA4Dh4OW+ZhkT
NKcldXqkSuj1sEf244JZYuqZp6/tAgMBAAECgYEAi2NSVqpZMafE5YYUTcMGe6QS
k2jtpsqYgggI2RnLJ/2tNZwYI5pwP8QVSbnMaiF4gokD5hGdrNDfTnb2v+yIwYEH
0w8+oG7Z81KodsiZSIDJfTGsAZhVNwOz9y0VD8BBZZ1/274Zh52AUKLjZS/ZwIbS
W2ywya855dPnH/wj+0ECQQD9X8D920kByTNHhBG18biAEZ4pxs9f0OAG8333eVcI
w2lJDLsYDZrCB2ocgA3lUdozlzPC7YDYw8reg0tkiRY5AkEA7sdNzOeQsQRn7++5
0bP9DtT/iON1gbfxRzCfCfXdoOtfQWIzTePWtURt9X/5D9NofI0Rg5W2oGy/MLe5
/sXHVQJBAIup5XrJDkQywNZyAUU2ecn2bCWBFjwtqd+LBmuMciI9fOKsZtEKZrz/
U0lkeMRoSwvXE8wmGLjjrAbdfohrXFkCQQDZEx/LtIl6JINJQiswVe0tWr6k+ASP
1WXoTm+HYpoF/XUvv9LccNF1IazFj34hwRQwhx7w/V52Ieb+p0jUMYGxAkEAjDhd
9pBO1fKXWiXzi9ZKfoyTNcUq3eBSVKwPG2nItg5ycXengjT5sgcWDnciIzW7BIVI
JiqOszq9GWESErAatg==
-----END PRIVATE KEY-----

@ -0,0 +1,14 @@
-----BEGIN CERTIFICATE-----
MIICHzCCAYgCAQEwDQYJKoZIhvcNAQEFBQAwVjELMAkGA1UEBhMCQVUxEzARBgNV
BAgMClNvbWUtU3RhdGUxITAfBgNVBAoMGEludGVybmV0IFdpZGdpdHMgUHR5IEx0
ZDEPMA0GA1UEAwwGdGVzdGNhMB4XDTE0MDcxNzIzNTYwMloXDTI0MDcxNDIzNTYw
MlowWjELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM
GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDETMBEGA1UEAwwKdGVzdGNsaWVudDCB
nzANBgkqhkiG9w0BAQEFAAOBjQAwgYkCgYEA7FRH26G+Ft5VQgyzlZsfSnHSZ6GX
b7qxmk2PO8TYqKZmkfMwke6RUfQV+S+GzRvz5LlS31U1QCp3cgwkIIAQa1E2hCEz
W31ivbMByRK9tFpyn4Uv8KP14ObKjTQqxUZp558DgOHg5b5mGRM0pyV1eqRK6PWw
R/bjglli6pmnr+0CAwEAATANBgkqhkiG9w0BAQUFAAOBgQAStSm5PM7ubROiKK6/
T2FkKlhiTOx+Ryenm3Eio59emq+jXl+1nhPySX5G2PQzSR5vd1dIhwgZSR4Gyttk
tRZ57k/NI1brUW8joiEOMJA/Mr7H7asx7wIRYDE91Fs8GkKWd5LhoPAQj+qdG35C
OO+svdkmqH0KZo320ZUqdl2ooQ==
-----END CERTIFICATE-----

@ -20,9 +20,10 @@ cd $(dirname $0)/../../..
source tools/internal_ci/helper_scripts/prepare_build_linux_rc source tools/internal_ci/helper_scripts/prepare_build_linux_rc
# TODO(jtattermusch): install ruby on the internal_ci worker set +ex
gpg --keyserver hkp://keys.gnupg.net --recv-keys 409B6B1796C275462A1703113804BB82D39DC0E3 [[ -s /etc/profile.d/rvm.sh ]] && . /etc/profile.d/rvm.sh
# TODO(jtattermusch): grep works around https://github.com/rvm/rvm/issues/4068 set -e # rvm commands are very verbose
curl -sSL https://get.rvm.io | grep -v __rvm_print_headline | bash -s stable --ruby rvm --default use ruby-2.4.1
set -ex
tools/run_tests/task_runner.py -f artifact linux tools/run_tests/task_runner.py -f artifact linux

@ -144,7 +144,7 @@ def _read_json(filename, badjson_files, nonexistant_files):
def fmt_dict(d): def fmt_dict(d):
return ''.join([" " + k + ": " + str(d[k]) + "\n" for k in d]) return ''.join([" " + k + ": " + str(d[k]) + "\n" for k in d])
def diff(bms, loops, track, old, new, counters): def diff(bms, loops, regex, track, old, new, counters):
benchmarks = collections.defaultdict(Benchmark) benchmarks = collections.defaultdict(Benchmark)
badjson_files = {} badjson_files = {}
@ -153,7 +153,8 @@ def diff(bms, loops, track, old, new, counters):
for loop in range(0, loops): for loop in range(0, loops):
for line in subprocess.check_output( for line in subprocess.check_output(
['bm_diff_%s/opt/%s' % (old, bm), ['bm_diff_%s/opt/%s' % (old, bm),
'--benchmark_list_tests']).splitlines(): '--benchmark_list_tests',
'--benchmark_filter=%s' % regex]).splitlines():
stripped_line = line.strip().replace("/", "_").replace( stripped_line = line.strip().replace("/", "_").replace(
"<", "_").replace(">", "_").replace(", ", "_") "<", "_").replace(">", "_").replace(", ", "_")
js_new_opt = _read_json('%s.%s.opt.%s.%d.json' % js_new_opt = _read_json('%s.%s.opt.%s.%d.json' %

@ -63,10 +63,10 @@ def _args():
help='Name of baseline run to compare to. Ususally just called "old"') help='Name of baseline run to compare to. Ususally just called "old"')
argp.add_argument( argp.add_argument(
'-r', '-r',
'--repetitions', '--regex',
type=int, type=str,
default=1, default="",
help='Number of repetitions to pass to the benchmarks') help='Regex to filter benchmarks run')
argp.add_argument( argp.add_argument(
'-l', '-l',
'--loops', '--loops',
@ -125,10 +125,10 @@ def main(args):
subprocess.check_call(['git', 'checkout', where_am_i]) subprocess.check_call(['git', 'checkout', where_am_i])
subprocess.check_call(['git', 'submodule', 'update']) subprocess.check_call(['git', 'submodule', 'update'])
bm_run.run('new', args.benchmarks, args.jobs, args.loops, args.repetitions, args.counters) bm_run.run('new', args.benchmarks, args.jobs, args.loops, args.regex, args.counters)
bm_run.run(old, args.benchmarks, args.jobs, args.loops, args.repetitions, args.counters) bm_run.run(old, args.benchmarks, args.jobs, args.loops, args.regex, args.counters)
diff, note = bm_diff.diff(args.benchmarks, args.loops, args.track, old, diff, note = bm_diff.diff(args.benchmarks, args.loops, args.regex, args.track, old,
'new', args.counters) 'new', args.counters)
if diff: if diff:
text = '[%s] Performance differences noted:\n%s' % (args.pr_comment_name, diff) text = '[%s] Performance differences noted:\n%s' % (args.pr_comment_name, diff)

@ -56,10 +56,10 @@ def _args():
) )
argp.add_argument( argp.add_argument(
'-r', '-r',
'--repetitions', '--regex',
type=int, type=str,
default=1, default="",
help='Number of repetitions to pass to the benchmarks') help='Regex to filter benchmarks run')
argp.add_argument( argp.add_argument(
'-l', '-l',
'--loops', '--loops',
@ -77,18 +77,17 @@ def _args():
return args return args
def _collect_bm_data(bm, cfg, name, reps, idx, loops): def _collect_bm_data(bm, cfg, name, regex, idx, loops):
jobs_list = [] jobs_list = []
for line in subprocess.check_output( for line in subprocess.check_output(
['bm_diff_%s/%s/%s' % (name, cfg, bm), ['bm_diff_%s/%s/%s' % (name, cfg, bm),
'--benchmark_list_tests']).splitlines(): '--benchmark_list_tests', '--benchmark_filter=%s' % regex]).splitlines():
stripped_line = line.strip().replace("/", "_").replace( stripped_line = line.strip().replace("/", "_").replace(
"<", "_").replace(">", "_").replace(", ", "_") "<", "_").replace(">", "_").replace(", ", "_")
cmd = [ cmd = [
'bm_diff_%s/%s/%s' % (name, cfg, bm), '--benchmark_filter=^%s$' % 'bm_diff_%s/%s/%s' % (name, cfg, bm), '--benchmark_filter=^%s$' %
line, '--benchmark_out=%s.%s.%s.%s.%d.json' % line, '--benchmark_out=%s.%s.%s.%s.%d.json' %
(bm, stripped_line, cfg, name, idx), '--benchmark_out_format=json', (bm, stripped_line, cfg, name, idx), '--benchmark_out_format=json',
'--benchmark_repetitions=%d' % (reps)
] ]
jobs_list.append( jobs_list.append(
jobset.JobSpec( jobset.JobSpec(
@ -100,13 +99,13 @@ def _collect_bm_data(bm, cfg, name, reps, idx, loops):
return jobs_list return jobs_list
def run(name, benchmarks, jobs, loops, reps, counters): def run(name, benchmarks, jobs, loops, regex, counters):
jobs_list = [] jobs_list = []
for loop in range(0, loops): for loop in range(0, loops):
for bm in benchmarks: for bm in benchmarks:
jobs_list += _collect_bm_data(bm, 'opt', name, reps, loop, loops) jobs_list += _collect_bm_data(bm, 'opt', name, regex, loop, loops)
if counters: if counters:
jobs_list += _collect_bm_data(bm, 'counters', name, reps, loop, jobs_list += _collect_bm_data(bm, 'counters', name, regex, loop,
loops) loops)
random.shuffle(jobs_list, random.SystemRandom().random) random.shuffle(jobs_list, random.SystemRandom().random)
jobset.run(jobs_list, maxjobs=jobs) jobset.run(jobs_list, maxjobs=jobs)
@ -114,4 +113,4 @@ def run(name, benchmarks, jobs, loops, reps, counters):
if __name__ == '__main__': if __name__ == '__main__':
args = _args() args = _args()
run(args.name, args.benchmarks, args.jobs, args.loops, args.repetitions, args.counters) run(args.name, args.benchmarks, args.jobs, args.loops, args.regex, args.counters)

@ -41,6 +41,8 @@ BANNED_EXCEPT = {
'grpc_closure_sched(' : ['src/core/lib/iomgr/closure.c'], 'grpc_closure_sched(' : ['src/core/lib/iomgr/closure.c'],
'grpc_closure_run(' : ['src/core/lib/iomgr/closure.c'], 'grpc_closure_run(' : ['src/core/lib/iomgr/closure.c'],
'grpc_closure_list_sched(' : ['src/core/lib/iomgr/closure.c'], 'grpc_closure_list_sched(' : ['src/core/lib/iomgr/closure.c'],
'gpr_getenv_silent(' : ['src/core/lib/support/log.c', 'src/core/lib/support/env_linux.c',
'src/core/lib/support/env_posix.c', 'src/core/lib/support/env_windows.c'],
} }
errors = 0 errors = 0

Loading…
Cancel
Save