Merge branch 'master' into statuscaution

pull/16582/head
Yash Tibrewal 6 years ago
commit e085aee990
  1. 1
      AUTHORS
  2. 4
      include/grpcpp/impl/codegen/callback_common.h
  3. 12
      src/compiler/ruby_generator.cc
  4. 21
      src/core/ext/filters/client_channel/parse_address.cc
  5. 17
      src/core/lib/surface/call.cc
  6. 28
      src/cpp/common/callback_common.cc
  7. 2
      src/cpp/ext/filters/census/server_filter.cc
  8. 28
      src/proto/grpc/testing/package_options.proto
  9. 10
      src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi
  10. 1
      src/ruby/ext/grpc/rb_call.c
  11. 3
      src/ruby/ext/grpc/rb_channel.c
  12. 32
      src/ruby/ext/grpc/rb_grpc.c
  13. 2
      src/ruby/ext/grpc/rb_grpc.h
  14. 2
      src/ruby/ext/grpc/rb_server.c
  15. 2
      src/ruby/lib/grpc/generic/rpc_server.rb
  16. 44
      src/ruby/spec/channel_spec.rb
  17. 53
      src/ruby/spec/pb/codegen/package_option_spec.rb
  18. 20
      test/cpp/end2end/client_callback_end2end_test.cc
  19. 2
      test/cpp/microbenchmarks/helpers.cc
  20. 3
      test/distrib/csharp/run_distrib_test.sh
  21. 15
      tools/interop_matrix/client_matrix.py
  22. 15
      tools/run_tests/run_tests_matrix.py

@ -1 +1,2 @@
Google Inc.
WeWork Companies Inc.

@ -35,6 +35,10 @@ class CQCallbackInterface;
namespace grpc {
namespace internal {
// The contract on these tags is that they are single-shot. They must be
// constructed and then fired at exactly one point. There is no expectation
// that they can be reused without reconstruction.
class CallbackWithStatusTag {
public:
// always allocated against a call arena, no memory free required

@ -160,12 +160,20 @@ grpc::string GetServices(const FileDescriptor* file) {
return output;
}
std::string package_name;
if (file->options().has_ruby_package()) {
package_name = file->options().ruby_package();
} else {
package_name = file->package();
}
// Write out a file header.
std::map<grpc::string, grpc::string> header_comment_vars = ListToDict({
"file.name",
file->name(),
"file.package",
file->package(),
package_name,
});
out.Print("# Generated by the protocol buffer compiler. DO NOT EDIT!\n");
out.Print(header_comment_vars,
@ -190,7 +198,7 @@ grpc::string GetServices(const FileDescriptor* file) {
// Write out services within the modules
out.Print("\n");
std::vector<grpc::string> modules = Split(file->package(), '.');
std::vector<grpc::string> modules = Split(package_name, '.');
for (size_t i = 0; i < modules.size(); ++i) {
std::map<grpc::string, grpc::string> module_vars = ListToDict({
"module.name",

@ -129,30 +129,37 @@ bool grpc_parse_ipv6_hostport(const char* hostport, grpc_resolved_address* addr,
size_t host_without_scope_len = static_cast<size_t>(host_end - host);
uint32_t sin6_scope_id = 0;
if (host_without_scope_len > GRPC_INET6_ADDRSTRLEN) {
gpr_log(GPR_ERROR,
"invalid ipv6 address length %zu. Length cannot be greater than "
"GRPC_INET6_ADDRSTRLEN i.e %d)",
host_without_scope_len, GRPC_INET6_ADDRSTRLEN);
if (log_errors) {
gpr_log(
GPR_ERROR,
"invalid ipv6 address length %zu. Length cannot be greater than "
"GRPC_INET6_ADDRSTRLEN i.e %d)",
host_without_scope_len, GRPC_INET6_ADDRSTRLEN);
}
goto done;
}
strncpy(host_without_scope, host, host_without_scope_len);
host_without_scope[host_without_scope_len] = '\0';
if (grpc_inet_pton(GRPC_AF_INET6, host_without_scope, &in6->sin6_addr) ==
0) {
gpr_log(GPR_ERROR, "invalid ipv6 address: '%s'", host_without_scope);
if (log_errors) {
gpr_log(GPR_ERROR, "invalid ipv6 address: '%s'", host_without_scope);
}
goto done;
}
if (gpr_parse_bytes_to_uint32(host_end + 1,
strlen(host) - host_without_scope_len - 1,
&sin6_scope_id) == 0) {
gpr_log(GPR_ERROR, "invalid ipv6 scope id: '%s'", host_end + 1);
if (log_errors) {
gpr_log(GPR_ERROR, "invalid ipv6 scope id: '%s'", host_end + 1);
}
goto done;
}
// Handle "sin6_scope_id" being type "u_long". See grpc issue #10027.
in6->sin6_scope_id = sin6_scope_id;
} else {
if (grpc_inet_pton(GRPC_AF_INET6, host, &in6->sin6_addr) == 0) {
gpr_log(GPR_ERROR, "invalid ipv6 address: '%s'", host);
if (log_errors) gpr_log(GPR_ERROR, "invalid ipv6 address: '%s'", host);
goto done;
}
}

@ -207,7 +207,7 @@ struct grpc_call {
grpc_server* server;
} server;
} final_op;
grpc_error* status_error;
gpr_atm status_error;
/* recv_state can contain one of the following values:
RECV_NONE : : no initial metadata and messages received
@ -519,10 +519,12 @@ static void destroy_call(void* call, grpc_error* error) {
GRPC_CQ_INTERNAL_UNREF(c->cq, "bind");
}
grpc_error_get_status(c->status_error, c->send_deadline,
grpc_error* status_error =
reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&c->status_error));
grpc_error_get_status(status_error, c->send_deadline,
&c->final_info.final_status, nullptr, nullptr,
&(c->final_info.error_string));
GRPC_ERROR_UNREF(c->status_error);
GRPC_ERROR_UNREF(status_error);
c->final_info.stats.latency =
gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), c->start_time);
@ -705,7 +707,7 @@ static void set_final_status(grpc_call* call, grpc_error* error) {
call->final_op.client.error_string);
// explicitly take a ref
grpc_slice_ref_internal(*call->final_op.client.status_details);
call->status_error = error;
gpr_atm_rel_store(&call->status_error, reinterpret_cast<gpr_atm>(error));
grpc_core::channelz::ChannelNode* channelz_channel =
grpc_channel_get_channelz_node(call->channel);
if (channelz_channel != nullptr) {
@ -717,7 +719,9 @@ static void set_final_status(grpc_call* call, grpc_error* error) {
}
} else {
*call->final_op.server.cancelled =
error != GRPC_ERROR_NONE || call->status_error != GRPC_ERROR_NONE;
error != GRPC_ERROR_NONE ||
reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&call->status_error)) !=
GRPC_ERROR_NONE;
grpc_core::channelz::ServerNode* channelz_server =
grpc_server_get_channelz_node(call->final_op.server.server);
if (channelz_server != nullptr) {
@ -1686,7 +1690,8 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
}
}
call->status_error = status_error;
gpr_atm_rel_store(&call->status_error,
reinterpret_cast<gpr_atm>(status_error));
if (!prepare_application_metadata(
call,
static_cast<int>(

@ -26,8 +26,21 @@
namespace grpc {
namespace internal {
namespace {
template <class Func, class Arg>
void CatchingCallback(Func&& func, Arg&& arg) {
#if GRPC_ALLOW_EXCEPTIONS
try {
func(arg);
} catch (...) {
// nothing to return or change here, just don't crash the library
}
#else // GRPC_ALLOW_EXCEPTIONS
func(arg);
#endif // GRPC_ALLOW_EXCEPTIONS
}
class CallbackWithSuccessImpl : public grpc_core::CQCallbackInterface {
public:
static void operator delete(void* ptr, std::size_t size) {
@ -52,8 +65,11 @@ class CallbackWithSuccessImpl : public grpc_core::CQCallbackInterface {
bool new_ok = ok;
GPR_ASSERT(parent_->ops()->FinalizeResult(&ignored, &new_ok));
GPR_ASSERT(ignored == parent_->ops());
func_(ok);
func_ = nullptr; // release the function
// Last use of func_ or ok, so ok to move them out for rvalue call above
CatchingCallback(std::move(func_), std::move(ok));
func_ = nullptr; // reset to clear this out for sure
grpc_call_unref(call_);
}
@ -88,8 +104,10 @@ class CallbackWithStatusImpl : public grpc_core::CQCallbackInterface {
GPR_ASSERT(parent_->ops()->FinalizeResult(&ignored, &ok));
GPR_ASSERT(ignored == parent_->ops());
func_(status_);
func_ = nullptr; // release the function
// Last use of func_ or status_, so ok to move them out
CatchingCallback(std::move(func_), std::move(status_));
func_ = nullptr; // reset to clear this out for sure
grpc_call_unref(call_);
}
Status* status_ptr() { return &status_; }

@ -93,7 +93,7 @@ void CensusServerCallData::OnDoneRecvInitialMetadataCb(void* user_data,
FilterInitialMetadata(initial_metadata, &sml);
calld->path_ = grpc_slice_ref_internal(sml.path);
calld->method_ = GetMethod(&calld->path_);
calld->qualified_method_ = StrCat("Recv.", calld->method_);
calld->qualified_method_ = absl::StrCat("Recv.", calld->method_);
const char* tracing_str =
GRPC_SLICE_IS_EMPTY(sml.tracing_slice)
? ""

@ -0,0 +1,28 @@
// Copyright 2018 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto3";
package grpc.testing;
// For sanity checking package definitions
option ruby_package = "Grpc.Testing.Package.Options";
message TestRequest { }
message TestResponse { }
service TestService {
rpc GetTest(TestRequest) returns (TestResponse) { }
}

@ -144,8 +144,14 @@ cdef class SSLChannelCredentials(ChannelCredentials):
return grpc_ssl_credentials_create(
c_pem_root_certificates, NULL, NULL, NULL)
else:
c_pem_key_certificate_pair.private_key = self._private_key
c_pem_key_certificate_pair.certificate_chain = self._certificate_chain
if self._private_key:
c_pem_key_certificate_pair.private_key = self._private_key
else:
c_pem_key_certificate_pair.private_key = NULL
if self._certificate_chain:
c_pem_key_certificate_pair.certificate_chain = self._certificate_chain
else:
c_pem_key_certificate_pair.certificate_chain = NULL
return grpc_ssl_credentials_create(
c_pem_root_certificates, &c_pem_key_certificate_pair, NULL, NULL)

@ -819,6 +819,7 @@ static VALUE grpc_rb_call_run_batch(VALUE self, VALUE ops_hash) {
unsigned write_flag = 0;
void* tag = (void*)&st;
grpc_ruby_fork_guard();
if (RTYPEDDATA_DATA(self) == NULL) {
rb_raise(grpc_rb_eCallError, "Cannot run batch on closed call");
return Qnil;

@ -217,6 +217,7 @@ static VALUE grpc_rb_channel_init(int argc, VALUE* argv, VALUE self) {
MEMZERO(&args, grpc_channel_args, 1);
grpc_ruby_once_init();
grpc_ruby_fork_guard();
rb_thread_call_without_gvl(
wait_until_channel_polling_thread_started_no_gil,
&stop_waiting_for_thread_start,
@ -374,6 +375,7 @@ static VALUE grpc_rb_channel_watch_connectivity_state(VALUE self,
watch_state_stack stack;
void* op_success = 0;
grpc_ruby_fork_guard();
TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper);
if (wrapper->bg_wrapped == NULL) {
@ -415,6 +417,7 @@ static VALUE grpc_rb_channel_create_call(VALUE self, VALUE parent, VALUE mask,
grpc_slice* host_slice_ptr = NULL;
char* tmp_str = NULL;
grpc_ruby_fork_guard();
if (host != Qnil) {
host_slice =
grpc_slice_from_copied_buffer(RSTRING_PTR(host), RSTRING_LEN(host));

@ -23,9 +23,13 @@
#include <math.h>
#include <ruby/vm.h>
#include <stdbool.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
#include <grpc/grpc.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include "rb_call.h"
#include "rb_call_credentials.h"
@ -255,7 +259,26 @@ static void Init_grpc_time_consts() {
id_tv_nsec = rb_intern("tv_nsec");
}
static void grpc_rb_shutdown(void) { grpc_shutdown(); }
#if GPR_WINDOWS
static void grpc_ruby_set_init_pid(void) {}
static bool grpc_ruby_forked_after_init(void) { return false; }
#else
static pid_t grpc_init_pid;
static void grpc_ruby_set_init_pid(void) {
GPR_ASSERT(grpc_init_pid == 0);
grpc_init_pid = getpid();
}
static bool grpc_ruby_forked_after_init(void) {
GPR_ASSERT(grpc_init_pid != 0);
return grpc_init_pid != getpid();
}
#endif
static void grpc_rb_shutdown(void) {
if (!grpc_ruby_forked_after_init()) grpc_shutdown();
}
/* Initialize the GRPC module structs */
@ -276,10 +299,17 @@ VALUE sym_metadata = Qundef;
static gpr_once g_once_init = GPR_ONCE_INIT;
static void grpc_ruby_once_init_internal() {
grpc_ruby_set_init_pid();
grpc_init();
atexit(grpc_rb_shutdown);
}
void grpc_ruby_fork_guard() {
if (grpc_ruby_forked_after_init()) {
rb_raise(rb_eRuntimeError, "grpc cannot be used before and after forking");
}
}
static VALUE bg_thread_init_rb_mu = Qundef;
static int bg_thread_init_done = 0;

@ -69,4 +69,6 @@ gpr_timespec grpc_rb_time_timeval(VALUE time, int interval);
void grpc_ruby_once_init();
void grpc_ruby_fork_guard();
#endif /* GRPC_RB_H_ */

@ -243,6 +243,8 @@ static VALUE grpc_rb_server_request_call(VALUE self) {
static VALUE grpc_rb_server_start(VALUE self) {
grpc_rb_server* s = NULL;
TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type, s);
grpc_ruby_fork_guard();
if (s->wrapped == NULL) {
rb_raise(rb_eRuntimeError, "destroyed!");
} else {

@ -136,7 +136,7 @@ module GRPC
begin
blk, args = worker_queue.pop
blk.call(*args)
rescue StandardError => e
rescue StandardError, GRPC::Core::CallError => e
GRPC.logger.warn('Error in worker thread')
GRPC.logger.warn(e)
end

@ -13,6 +13,7 @@
# limitations under the License.
require 'spec_helper'
require 'English'
def load_test_certs
test_root = File.join(File.dirname(__FILE__), 'testdata')
@ -27,6 +28,28 @@ describe GRPC::Core::Channel do
GRPC::Core::ChannelCredentials.new(load_test_certs[0])
end
def fork_with_propagated_error_message
pipe_read, pipe_write = IO.pipe
pid = fork do
pipe_read.close
begin
yield
rescue => exc
pipe_write.syswrite(exc.message)
end
pipe_write.close
end
pipe_write.close
exc_message = pipe_read.read
Process.wait(pid)
unless $CHILD_STATUS.success?
raise "forked process failed with #{$CHILD_STATUS}"
end
raise exc_message unless exc_message.empty?
end
shared_examples '#new' do
it 'take a host name without channel args' do
blk = proc do
@ -79,6 +102,14 @@ describe GRPC::Core::Channel do
blk = construct_with_args(args)
expect(&blk).to_not raise_error
end
it 'raises if grpc was initialized in another process' do
blk = construct_with_args({})
expect(&blk).not_to raise_error
expect do
fork_with_propagated_error_message(&blk)
end.to raise_error(RuntimeError, 'grpc cannot be used before and after forking')
end
end
describe '#new for secure channels' do
@ -121,6 +152,19 @@ describe GRPC::Core::Channel do
end
expect(&blk).to raise_error(RuntimeError)
end
it 'raises if grpc was initialized in another process' do
ch = GRPC::Core::Channel.new(fake_host, nil, :this_channel_is_insecure)
deadline = Time.now + 5
blk = proc do
fork_with_propagated_error_message do
ch.create_call(nil, nil, 'dummy_method', nil, deadline)
end
end
expect(&blk).to raise_error(RuntimeError, 'grpc cannot be used before and after forking')
end
end
describe '#destroy' do

@ -0,0 +1,53 @@
# Copyright 2018 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
require 'spec_helper'
require 'open3'
require 'tmpdir'
describe 'Code Generation Options' do
it 'should generate and respect package options' do
fail 'CONFIG env variable unexpectedly unset' unless ENV['CONFIG']
bins_sub_dir = ENV['CONFIG']
src_dir = File.join(File.dirname(__FILE__), '..', '..', '..', '..')
pb_dir = File.join(src_dir, 'proto')
bins_dir = File.join(src_dir, '..', 'bins', bins_sub_dir)
plugin = File.join(bins_dir, 'grpc_ruby_plugin')
protoc = File.join(bins_dir, 'protobuf', 'protoc')
# Generate the service from the proto
Dir.mktmpdir(nil, File.dirname(__FILE__)) do |tmp_dir|
gen_file = system(protoc,
'-I.',
'grpc/testing/package_options.proto',
"--grpc_out=#{tmp_dir}", # generate the service
"--ruby_out=#{tmp_dir}", # generate the definitions
"--plugin=protoc-gen-grpc=#{plugin}",
chdir: pb_dir,
out: File::NULL)
expect(gen_file).to be_truthy
begin
$LOAD_PATH.push(tmp_dir)
expect { Grpc::Testing::Package::Options::TestService::Service }.to raise_error(NameError)
expect(require('grpc/testing/package_options_services_pb')).to be_truthy
expect { Grpc::Testing::Package::Options::TestService::Service }.to_not raise_error
ensure
$LOAD_PATH.delete(tmp_dir)
end
end
end
end

@ -64,7 +64,7 @@ class ClientCallbackEnd2endTest : public ::testing::Test {
}
}
void SendRpcs(int num_rpcs) {
void SendRpcs(int num_rpcs, bool maybe_except) {
const grpc::string kMethodName("/grpc.testing.EchoTestService/Echo");
grpc::string test_string("");
for (int i = 0; i < num_rpcs; i++) {
@ -82,7 +82,7 @@ class ClientCallbackEnd2endTest : public ::testing::Test {
bool done = false;
stub_->experimental().UnaryCall(
&cli_ctx, kMethodName, send_buf.get(), &recv_buf,
[&request, &recv_buf, &done, &mu, &cv](Status s) {
[&request, &recv_buf, &done, &mu, &cv, maybe_except](Status s) {
GPR_ASSERT(s.ok());
EchoResponse response;
@ -91,6 +91,11 @@ class ClientCallbackEnd2endTest : public ::testing::Test {
std::lock_guard<std::mutex> l(mu);
done = true;
cv.notify_one();
#if GRPC_ALLOW_EXCEPTIONS
if (maybe_except) {
throw - 1;
}
#endif
});
std::unique_lock<std::mutex> l(mu);
while (!done) {
@ -107,14 +112,21 @@ class ClientCallbackEnd2endTest : public ::testing::Test {
TEST_F(ClientCallbackEnd2endTest, SimpleRpc) {
ResetStub();
SendRpcs(1);
SendRpcs(1, false);
}
TEST_F(ClientCallbackEnd2endTest, SequentialRpcs) {
ResetStub();
SendRpcs(10);
SendRpcs(10, false);
}
#if GRPC_ALLOW_EXCEPTIONS
TEST_F(ClientCallbackEnd2endTest, ExceptingRpc) {
ResetStub();
SendRpcs(10, true);
}
#endif
} // namespace
} // namespace testing
} // namespace grpc

@ -38,6 +38,7 @@ void TrackCounters::AddLabel(const grpc::string& label) {
}
void TrackCounters::AddToLabel(std::ostream& out, benchmark::State& state) {
#ifdef GRPC_COLLECT_STATS
grpc_stats_data stats_end;
grpc_stats_collect(&stats_end);
grpc_stats_data stats;
@ -53,6 +54,7 @@ void TrackCounters::AddToLabel(std::ostream& out, benchmark::State& state) {
<< " " << grpc_stats_histogram_name[i] << "-99p:"
<< grpc_stats_histo_percentile(&stats, (grpc_stats_histograms)i, 99.0);
}
#endif
#ifdef GPR_LOW_LEVEL_COUNTERS
grpc_memory_counters counters_at_end = grpc_memory_counters_snapshot();
out << " locks/iter:"

@ -21,7 +21,8 @@ unzip -o "$EXTERNAL_GIT_ROOT/input_artifacts/csharp_nugets_windows_dotnetcli.zip
./update_version.sh auto
nuget restore
# Retry "nuget restore" to work around https://github.com/grpc/grpc/issues/16312
nuget restore || nuget restore || nuget restore
xbuild DistribTest.sln

@ -99,6 +99,9 @@ LANG_RELEASE_MATRIX = {
{
'v1.14.1': None
},
{
'v1.15.0': None
},
],
'go': [
{
@ -231,6 +234,9 @@ LANG_RELEASE_MATRIX = {
{
'v1.14.1': None
},
{
'v1.15.0': None
},
],
'node': [
{
@ -319,6 +325,9 @@ LANG_RELEASE_MATRIX = {
{
'v1.14.1': None
},
{
'v1.15.0': None
},
],
'php': [
{
@ -363,6 +372,9 @@ LANG_RELEASE_MATRIX = {
{
'v1.14.1': None
},
{
'v1.15.0': None
},
],
'csharp': [
{
@ -412,6 +424,9 @@ LANG_RELEASE_MATRIX = {
{
'v1.14.1': None
},
{
'v1.15.0': None
},
],
}

@ -53,12 +53,7 @@ def _safe_report_name(name):
def _report_filename(name):
"""Generates report file name"""
return 'report_%s_%s' % (_safe_report_name(name), _REPORT_SUFFIX)
def _report_filename_internal_ci(name):
"""Generates report file name that leads to better presentation by internal CI"""
"""Generates report file name with directory structure that leads to better presentation by internal CI"""
return '%s/%s' % (_safe_report_name(name), _REPORT_SUFFIX)
@ -507,8 +502,9 @@ if __name__ == "__main__":
default=False,
action='store_const',
const=True,
help='Put reports into subdirectories to improve presentation of '
'results by Internal CI.')
help=
'(Deprecated, has no effect) Put reports into subdirectories to improve presentation of '
'results by Kokoro.')
argp.add_argument(
'--bq_result_table',
default='',
@ -517,9 +513,6 @@ if __name__ == "__main__":
help='Upload test results to a specified BQ table.')
args = argp.parse_args()
if args.internal_ci:
_report_filename = _report_filename_internal_ci # override the function
extra_args = []
if args.build_only:
extra_args.append('--build_only')

Loading…
Cancel
Save