Merge remote-tracking branch 'origin/master' into dedupe_dual_stack_tests

pull/20838/head
Richard Belleville 5 years ago
commit d78563eb43
  1. 4
      .gitignore
  2. 4
      CONTRIBUTING.md
  3. 28
      src/compiler/cpp_generator.cc
  4. 2
      src/compiler/generator_helpers.h
  5. 4
      src/compiler/ruby_plugin.cc
  6. 2
      src/core/ext/filters/client_channel/backup_poller.cc
  7. 4
      src/core/ext/filters/client_channel/channel_connectivity.cc
  8. 49
      src/core/ext/filters/client_channel/client_channel.cc
  9. 2
      src/core/ext/filters/client_channel/client_channel_factory.cc
  10. 10
      src/core/ext/filters/client_channel/global_subchannel_pool.cc
  11. 22
      src/core/ext/filters/client_channel/health/health_check_client.cc
  12. 6
      src/core/ext/filters/client_channel/http_connect_handshaker.cc
  13. 8
      src/core/ext/transport/chttp2/transport/parsing.cc
  14. 2
      src/core/ext/transport/chttp2/transport/writing.cc
  15. 19
      src/core/ext/transport/inproc/inproc_transport.cc
  16. 2
      src/core/lib/avl/avl.cc
  17. 2
      src/core/lib/channel/channel_stack.cc
  18. 20
      src/core/lib/channel/channel_stack.h
  19. 4
      src/core/lib/channel/channelz.cc
  20. 5
      src/core/lib/channel/connected_channel.cc
  21. 5
      src/core/lib/compression/message_compress.cc
  22. 12
      src/core/lib/compression/stream_compression_identity.cc
  23. 2
      src/core/lib/surface/channel_ping.cc
  24. 23
      src/core/lib/surface/completion_queue.cc
  25. 2
      src/core/lib/surface/completion_queue_factory.cc
  26. 2
      src/core/lib/surface/init.cc
  27. 4
      src/core/lib/surface/init_secure.cc
  28. 8
      src/core/lib/surface/lame_client.cc
  29. 19
      src/core/lib/surface/server.cc
  30. 4
      src/core/lib/transport/byte_stream.cc
  31. 2
      src/core/lib/transport/connectivity_state.cc
  32. 6
      src/core/lib/transport/metadata_batch.cc
  33. 20
      src/cpp/thread_manager/thread_manager.cc
  34. 4
      src/cpp/thread_manager/thread_manager.h
  35. 6
      src/python/grpcio_tests/commands.py
  36. 52
      src/python/grpcio_tests/tests/_runner.py
  37. 3
      src/python/grpcio_tests/tests_aio/tests.json
  38. 26
      src/python/grpcio_tests/tests_aio/unit/BUILD.bazel
  39. 29
      src/python/grpcio_tests/tests_aio/unit/_test_base.py
  40. 35
      src/python/grpcio_tests/tests_aio/unit/_test_server.py
  41. 18
      src/python/grpcio_tests/tests_aio/unit/channel_test.py
  42. 10
      src/python/grpcio_tests/tests_aio/unit/init_test.py
  43. 7
      src/python/grpcio_tests/tests_aio/unit/server_test.py
  44. 92
      src/python/grpcio_tests/tests_aio/unit/test_base.py
  45. 2
      test/core/end2end/tests/max_concurrent_streams.cc
  46. 2
      test/core/end2end/tests/max_connection_idle.cc
  47. 2
      test/core/end2end/tests/negative_deadline.cc
  48. 2
      test/core/end2end/tests/payload.cc
  49. 2
      test/core/end2end/tests/proxy_auth.cc
  50. 2
      test/core/end2end/tests/registered_call.cc
  51. 2
      test/core/end2end/tests/server_finishes_request.cc
  52. 2
      test/core/end2end/tests/simple_delayed_request.cc
  53. 2
      test/core/gpr/alloc_test.cc
  54. 2
      test/core/iomgr/buffer_list_test.cc
  55. 2
      test/core/iomgr/ev_epollex_linux_test.cc
  56. 2
      test/core/iomgr/tcp_client_uv_test.cc
  57. 2
      test/core/iomgr/tcp_server_uv_test.cc
  58. 6
      test/core/json/json_stream_error_test.cc
  59. 4
      test/core/security/check_gcp_environment_linux_test.cc
  60. 2
      test/core/security/check_gcp_environment_windows_test.cc
  61. 2
      test/core/security/grpc_alts_credentials_options_test.cc
  62. 2
      test/core/security/oauth2_utils.cc
  63. 2
      test/core/transport/chttp2/bin_encoder_test.cc
  64. 3
      tools/run_tests/helper_scripts/build_python.sh
  65. 81
      tools/run_tests/run_tests.py
  66. 2
      tools/run_tests/run_tests_matrix.py

4
.gitignore vendored

@ -147,3 +147,7 @@ cmake-build-debug/
# Benchmark outputs
BenchmarkDotNet.Artifacts/
# pyenv config
.python-version

@ -72,6 +72,10 @@ How to get your contributions merged smoothly and quickly.
- Don't fix code style and formatting unless you are already changing that line
to address an issue. PRs with irrelevant changes won't be merged. If you do
want to fix formatting or style, do that in a separate PR.
- If you are adding a new file, make sure it has the copyright message template
at the top as a comment. You can copy over the message from an existing file
and update the year.
- Unless your PR is trivial, you should expect there will be reviewer comments
that you'll need to address before merging. We expect you to be reasonably

@ -584,7 +584,7 @@ void PrintHeaderClientMethod(grpc_generator::Printer* printer,
void PrintHeaderClientMethodCallbackInterfacesStart(
grpc_generator::Printer* printer,
std::map<grpc::string, grpc::string>* vars) {
std::map<grpc::string, grpc::string>* /*vars*/) {
// This declares the interface for the callback-based API. The components
// are pure; even though this is new (post-1.0) API, it can be pure because
// it is an entirely new interface that happens to be scoped within
@ -599,10 +599,7 @@ void PrintHeaderClientMethodCallbackInterfacesStart(
void PrintHeaderClientMethodCallbackInterfaces(
grpc_generator::Printer* printer, const grpc_generator::Method* method,
std::map<grpc::string, grpc::string>* vars, bool is_public) {
// Reserve is_public for future expansion
assert(is_public);
std::map<grpc::string, grpc::string>* vars) {
(*vars)["Method"] = method->name();
(*vars)["Request"] = method->input_type_name();
(*vars)["Response"] = method->output_type_name();
@ -646,7 +643,7 @@ void PrintHeaderClientMethodCallbackInterfaces(
void PrintHeaderClientMethodCallbackInterfacesEnd(
grpc_generator::Printer* printer,
std::map<grpc::string, grpc::string>* vars) {
std::map<grpc::string, grpc::string>* /*vars*/) {
printer->Outdent();
printer->Print("};\n");
@ -662,7 +659,7 @@ void PrintHeaderClientMethodCallbackInterfacesEnd(
void PrintHeaderClientMethodCallbackStart(
grpc_generator::Printer* printer,
std::map<grpc::string, grpc::string>* vars) {
std::map<grpc::string, grpc::string>* /*vars*/) {
// This declares the stub entry for the callback-based API.
printer->Print("class experimental_async final :\n");
printer->Print(" public StubInterface::experimental_async_interface {\n");
@ -670,13 +667,9 @@ void PrintHeaderClientMethodCallbackStart(
printer->Indent();
}
void PrintHeaderClientMethodCallback(grpc_generator::Printer* printer,
const grpc_generator::Method* method,
std::map<grpc::string, grpc::string>* vars,
bool is_public) {
// Reserve is_public for future expansion
assert(is_public);
void PrintHeaderClientMethodCallback(
grpc_generator::Printer* printer, const grpc_generator::Method* method,
std::map<grpc::string, grpc::string>* vars) {
(*vars)["Method"] = method->name();
(*vars)["Request"] = method->input_type_name();
(*vars)["Response"] = method->output_type_name();
@ -723,7 +716,7 @@ void PrintHeaderClientMethodCallback(grpc_generator::Printer* printer,
void PrintHeaderClientMethodCallbackEnd(
grpc_generator::Printer* printer,
std::map<grpc::string, grpc::string>* vars) {
std::map<grpc::string, grpc::string>* /*vars*/) {
printer->Outdent();
printer->Print(" private:\n");
printer->Indent();
@ -1372,7 +1365,7 @@ void PrintHeaderService(grpc_generator::Printer* printer,
for (int i = 0; i < service->method_count(); ++i) {
printer->Print(service->method(i)->GetLeadingComments("//").c_str());
PrintHeaderClientMethodCallbackInterfaces(printer, service->method(i).get(),
vars, true);
vars);
printer->Print(service->method(i)->GetTrailingComments("//").c_str());
}
PrintHeaderClientMethodCallbackInterfacesEnd(printer, vars);
@ -1397,8 +1390,7 @@ void PrintHeaderService(grpc_generator::Printer* printer,
}
PrintHeaderClientMethodCallbackStart(printer, vars);
for (int i = 0; i < service->method_count(); ++i) {
PrintHeaderClientMethodCallback(printer, service->method(i).get(), vars,
true);
PrintHeaderClientMethodCallback(printer, service->method(i).get(), vars);
}
PrintHeaderClientMethodCallbackEnd(printer, vars);
printer->Outdent();

@ -166,7 +166,7 @@ inline MethodType GetMethodType(
}
}
inline void Split(const grpc::string& s, char delim,
inline void Split(const grpc::string& s, char /*delim*/,
std::vector<grpc::string>* append_to) {
std::istringstream iss(s);
grpc::string piece;

@ -30,9 +30,9 @@ class RubyGrpcGenerator : public grpc::protobuf::compiler::CodeGenerator {
~RubyGrpcGenerator() {}
bool Generate(const grpc::protobuf::FileDescriptor* file,
const grpc::string& parameter,
const grpc::string& /*parameter*/,
grpc::protobuf::compiler::GeneratorContext* context,
grpc::string* error) const {
grpc::string* /*error*/) const {
grpc::string code = grpc_ruby_generator::GetServices(file);
if (code.size() == 0) {
return true; // don't generate a file if there are no services

@ -89,7 +89,7 @@ static void backup_poller_shutdown_unref(backup_poller* p) {
}
}
static void done_poller(void* arg, grpc_error* error) {
static void done_poller(void* arg, grpc_error* /*error*/) {
backup_poller_shutdown_unref(static_cast<backup_poller*>(arg));
}

@ -90,7 +90,7 @@ static void delete_state_watcher(state_watcher* w) {
gpr_free(w);
}
static void finished_completion(void* pw, grpc_cq_completion* ignored) {
static void finished_completion(void* pw, grpc_cq_completion* /*ignored*/) {
bool should_delete = false;
state_watcher* w = static_cast<state_watcher*>(pw);
gpr_mu_lock(&w->mu);
@ -198,7 +198,7 @@ typedef struct watcher_timer_init_arg {
gpr_timespec deadline;
} watcher_timer_init_arg;
static void watcher_timer_init(void* arg, grpc_error* error_ignored) {
static void watcher_timer_init(void* arg, grpc_error* /*error_ignored*/) {
watcher_timer_init_arg* wa = static_cast<watcher_timer_init_arg*>(arg);
grpc_timer_init(&wa->w->alarm, grpc_timespec_to_millis_round_up(wa->deadline),

@ -584,10 +584,10 @@ class CallData {
// A predicate type and some useful implementations for PendingBatchesFail().
typedef bool (*YieldCallCombinerPredicate)(
const CallCombinerClosureList& closures);
static bool YieldCallCombiner(const CallCombinerClosureList& closures) {
static bool YieldCallCombiner(const CallCombinerClosureList& /*closures*/) {
return true;
}
static bool NoYieldCallCombiner(const CallCombinerClosureList& closures) {
static bool NoYieldCallCombiner(const CallCombinerClosureList& /*closures*/) {
return false;
}
static bool YieldCallCombinerIfPendingBatchesFound(
@ -633,8 +633,8 @@ class CallData {
// Sets *status and *server_pushback_md based on md_batch and error.
// Only sets *server_pushback_md if server_pushback_md != nullptr.
void GetCallStatus(grpc_call_element* elem, grpc_metadata_batch* md_batch,
grpc_error* error, grpc_status_code* status,
void GetCallStatus(grpc_metadata_batch* md_batch, grpc_error* error,
grpc_status_code* status,
grpc_mdelem** server_pushback_md);
// Adds recv_trailing_metadata_ready closure to closures.
void AddClosureForRecvTrailingMetadataReady(
@ -663,10 +663,10 @@ class CallData {
// Adds the on_complete closure for the pending batch completed in
// batch_data to closures.
void AddClosuresForCompletedPendingBatch(
grpc_call_element* elem, SubchannelCallBatchData* batch_data,
SubchannelCallRetryState* retry_state, grpc_error* error,
CallCombinerClosureList* closures);
void AddClosuresForCompletedPendingBatch(grpc_call_element* elem,
SubchannelCallBatchData* batch_data,
grpc_error* error,
CallCombinerClosureList* closures);
// If there are any cached ops to replay or pending ops to start on the
// subchannel call, adds a closure to closures to invoke
@ -1052,7 +1052,7 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface {
private:
static void ApplyUpdateInControlPlaneCombiner(void* arg,
grpc_error* error) {
grpc_error* /*error*/) {
Updater* self = static_cast<Updater*>(arg);
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO,
@ -1188,7 +1188,7 @@ void ChannelData::ExternalConnectivityWatcher::Cancel() {
}
void ChannelData::ExternalConnectivityWatcher::AddWatcherLocked(
void* arg, grpc_error* ignored) {
void* arg, grpc_error* /*ignored*/) {
ExternalConnectivityWatcher* self =
static_cast<ExternalConnectivityWatcher*>(arg);
// This assumes that the closure is scheduled on the ExecCtx scheduler
@ -1201,7 +1201,7 @@ void ChannelData::ExternalConnectivityWatcher::AddWatcherLocked(
}
void ChannelData::ExternalConnectivityWatcher::RemoveWatcherLocked(
void* arg, grpc_error* ignored) {
void* arg, grpc_error* /*ignored*/) {
ExternalConnectivityWatcher* self =
static_cast<ExternalConnectivityWatcher*>(arg);
self->chand_->state_tracker_.RemoveWatcher(self);
@ -1228,7 +1228,7 @@ class ChannelData::ConnectivityWatcherAdder {
}
private:
static void AddWatcherLocked(void* arg, grpc_error* error) {
static void AddWatcherLocked(void* arg, grpc_error* /*error*/) {
ConnectivityWatcherAdder* self =
static_cast<ConnectivityWatcherAdder*>(arg);
self->chand_->state_tracker_.AddWatcher(self->initial_state_,
@ -1262,7 +1262,7 @@ class ChannelData::ConnectivityWatcherRemover {
}
private:
static void RemoveWatcherLocked(void* arg, grpc_error* error) {
static void RemoveWatcherLocked(void* arg, grpc_error* /*error*/) {
ConnectivityWatcherRemover* self =
static_cast<ConnectivityWatcherRemover*>(arg);
self->chand_->state_tracker_.RemoveWatcher(self->watcher_);
@ -1809,7 +1809,7 @@ grpc_error* ChannelData::DoPingLocked(grpc_transport_op* op) {
return result.error;
}
void ChannelData::StartTransportOpLocked(void* arg, grpc_error* ignored) {
void ChannelData::StartTransportOpLocked(void* arg, grpc_error* /*ignored*/) {
grpc_transport_op* op = static_cast<grpc_transport_op*>(arg);
grpc_channel_element* elem =
static_cast<grpc_channel_element*>(op->handler_private.extra_arg);
@ -1937,7 +1937,7 @@ ChannelData::GetConnectedSubchannelInDataPlane(
return connected_subchannel->Ref();
}
void ChannelData::TryToConnectLocked(void* arg, grpc_error* error_ignored) {
void ChannelData::TryToConnectLocked(void* arg, grpc_error* /*error_ignored*/) {
auto* chand = static_cast<ChannelData*>(arg);
if (chand->resolving_lb_policy_ != nullptr) {
chand->resolving_lb_policy_->ExitIdleLocked();
@ -2050,7 +2050,7 @@ grpc_error* CallData::Init(grpc_call_element* elem,
}
void CallData::Destroy(grpc_call_element* elem,
const grpc_call_final_info* final_info,
const grpc_call_final_info* /*final_info*/,
grpc_closure* then_schedule_closure) {
CallData* calld = static_cast<CallData*>(elem->call_data);
if (GPR_LIKELY(calld->subchannel_call_ != nullptr)) {
@ -2445,7 +2445,7 @@ void CallData::PendingBatchesFail(
// This is called via the call combiner, so access to calld is synchronized.
void CallData::ResumePendingBatchInCallCombiner(void* arg,
grpc_error* ignored) {
grpc_error* /*ignored*/) {
grpc_transport_stream_op_batch* batch =
static_cast<grpc_transport_stream_op_batch*>(arg);
SubchannelCall* subchannel_call =
@ -2908,8 +2908,7 @@ void CallData::RecvMessageReady(void* arg, grpc_error* error) {
// recv_trailing_metadata handling
//
void CallData::GetCallStatus(grpc_call_element* elem,
grpc_metadata_batch* md_batch, grpc_error* error,
void CallData::GetCallStatus(grpc_metadata_batch* md_batch, grpc_error* error,
grpc_status_code* status,
grpc_mdelem** server_pushback_md) {
if (error != GRPC_ERROR_NONE) {
@ -3078,7 +3077,7 @@ void CallData::RecvTrailingMetadataReady(void* arg, grpc_error* error) {
grpc_mdelem* server_pushback_md = nullptr;
grpc_metadata_batch* md_batch =
batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata;
calld->GetCallStatus(elem, md_batch, GRPC_ERROR_REF(error), &status,
calld->GetCallStatus(md_batch, GRPC_ERROR_REF(error), &status,
&server_pushback_md);
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
gpr_log(GPR_INFO, "chand=%p calld=%p: call finished, status=%s", chand,
@ -3111,8 +3110,7 @@ void CallData::RecvTrailingMetadataReady(void* arg, grpc_error* error) {
void CallData::AddClosuresForCompletedPendingBatch(
grpc_call_element* elem, SubchannelCallBatchData* batch_data,
SubchannelCallRetryState* retry_state, grpc_error* error,
CallCombinerClosureList* closures) {
grpc_error* error, CallCombinerClosureList* closures) {
PendingBatch* pending = PendingBatchFind(
elem, "completed", [batch_data](grpc_transport_stream_op_batch* batch) {
// Match the pending batch with the same set of send ops as the
@ -3210,7 +3208,7 @@ void CallData::OnComplete(void* arg, grpc_error* error) {
if (!retry_state->retry_dispatched) {
// Add closure for the completed pending batch, if any.
calld->AddClosuresForCompletedPendingBatch(
elem, batch_data, retry_state, GRPC_ERROR_REF(error), &closures);
elem, batch_data, GRPC_ERROR_REF(error), &closures);
// If needed, add a callback to start any replay or pending send ops on
// the subchannel call.
if (!retry_state->completed_recv_trailing_metadata) {
@ -3238,7 +3236,7 @@ void CallData::OnComplete(void* arg, grpc_error* error) {
// subchannel batch construction
//
void CallData::StartBatchInCallCombiner(void* arg, grpc_error* ignored) {
void CallData::StartBatchInCallCombiner(void* arg, grpc_error* /*ignored*/) {
grpc_transport_stream_op_batch* batch =
static_cast<grpc_transport_stream_op_batch*>(arg);
SubchannelCall* subchannel_call =
@ -3608,7 +3606,8 @@ void CallData::AddSubchannelBatchesForPendingBatches(
}
}
void CallData::StartRetriableSubchannelBatches(void* arg, grpc_error* ignored) {
void CallData::StartRetriableSubchannelBatches(void* arg,
grpc_error* /*ignored*/) {
grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
CallData* calld = static_cast<CallData*>(elem->call_data);

@ -29,7 +29,7 @@ namespace grpc_core {
namespace {
void* factory_arg_copy(void* f) { return f; }
void factory_arg_destroy(void* f) {}
void factory_arg_destroy(void* /*f*/) {}
int factory_arg_cmp(void* factory1, void* factory2) {
return GPR_ICMP(factory1, factory2);
}

@ -140,28 +140,28 @@ RefCountedPtr<GlobalSubchannelPool>* GlobalSubchannelPool::instance_ = nullptr;
namespace {
void sck_avl_destroy(void* p, void* user_data) {
void sck_avl_destroy(void* p, void* /*user_data*/) {
SubchannelKey* key = static_cast<SubchannelKey*>(p);
Delete(key);
}
void* sck_avl_copy(void* p, void* unused) {
void* sck_avl_copy(void* p, void* /*unused*/) {
const SubchannelKey* key = static_cast<const SubchannelKey*>(p);
auto* new_key = New<SubchannelKey>(*key);
return static_cast<void*>(new_key);
}
long sck_avl_compare(void* a, void* b, void* unused) {
long sck_avl_compare(void* a, void* b, void* /*unused*/) {
const SubchannelKey* key_a = static_cast<const SubchannelKey*>(a);
const SubchannelKey* key_b = static_cast<const SubchannelKey*>(b);
return key_a->Cmp(*key_b);
}
void scv_avl_destroy(void* p, void* user_data) {
void scv_avl_destroy(void* p, void* /*user_data*/) {
GRPC_SUBCHANNEL_WEAK_UNREF((Subchannel*)p, "global_subchannel_pool");
}
void* scv_avl_copy(void* p, void* unused) {
void* scv_avl_copy(void* p, void* /*unused*/) {
GRPC_SUBCHANNEL_WEAK_REF((Subchannel*)p, "global_subchannel_pool");
return p;
}

@ -377,8 +377,8 @@ void HealthCheckClient::CallState::StartCall() {
StartBatch(&recv_trailing_metadata_batch_);
}
void HealthCheckClient::CallState::StartBatchInCallCombiner(void* arg,
grpc_error* error) {
void HealthCheckClient::CallState::StartBatchInCallCombiner(
void* arg, grpc_error* /*error*/) {
grpc_transport_stream_op_batch* batch =
static_cast<grpc_transport_stream_op_batch*>(arg);
SubchannelCall* call =
@ -396,21 +396,22 @@ void HealthCheckClient::CallState::StartBatch(
}
void HealthCheckClient::CallState::AfterCallStackDestruction(
void* arg, grpc_error* error) {
void* arg, grpc_error* /*error*/) {
HealthCheckClient::CallState* self =
static_cast<HealthCheckClient::CallState*>(arg);
Delete(self);
}
void HealthCheckClient::CallState::OnCancelComplete(void* arg,
grpc_error* error) {
grpc_error* /*error*/) {
HealthCheckClient::CallState* self =
static_cast<HealthCheckClient::CallState*>(arg);
GRPC_CALL_COMBINER_STOP(&self->call_combiner_, "health_cancel");
self->call_->Unref(DEBUG_LOCATION, "cancel");
}
void HealthCheckClient::CallState::StartCancel(void* arg, grpc_error* error) {
void HealthCheckClient::CallState::StartCancel(void* arg,
grpc_error* /*error*/) {
HealthCheckClient::CallState* self =
static_cast<HealthCheckClient::CallState*>(arg);
auto* batch = grpc_make_transport_stream_op(
@ -432,7 +433,8 @@ void HealthCheckClient::CallState::Cancel() {
}
}
void HealthCheckClient::CallState::OnComplete(void* arg, grpc_error* error) {
void HealthCheckClient::CallState::OnComplete(void* arg,
grpc_error* /*error*/) {
HealthCheckClient::CallState* self =
static_cast<HealthCheckClient::CallState*>(arg);
GRPC_CALL_COMBINER_STOP(&self->call_combiner_, "on_complete");
@ -441,8 +443,8 @@ void HealthCheckClient::CallState::OnComplete(void* arg, grpc_error* error) {
self->call_->Unref(DEBUG_LOCATION, "on_complete");
}
void HealthCheckClient::CallState::RecvInitialMetadataReady(void* arg,
grpc_error* error) {
void HealthCheckClient::CallState::RecvInitialMetadataReady(
void* arg, grpc_error* /*error*/) {
HealthCheckClient::CallState* self =
static_cast<HealthCheckClient::CallState*>(arg);
GRPC_CALL_COMBINER_STOP(&self->call_combiner_, "recv_initial_metadata_ready");
@ -524,7 +526,7 @@ void HealthCheckClient::CallState::OnByteStreamNext(void* arg,
}
void HealthCheckClient::CallState::RecvMessageReady(void* arg,
grpc_error* error) {
grpc_error* /*error*/) {
HealthCheckClient::CallState* self =
static_cast<HealthCheckClient::CallState*>(arg);
GRPC_CALL_COMBINER_STOP(&self->call_combiner_, "recv_message_ready");
@ -583,7 +585,7 @@ void HealthCheckClient::CallState::RecvTrailingMetadataReady(
}
void HealthCheckClient::CallState::CallEndedRetry(void* arg,
grpc_error* error) {
grpc_error* /*error*/) {
HealthCheckClient::CallState* self =
static_cast<HealthCheckClient::CallState*>(arg);
self->CallEnded(true /* retry */);

@ -246,7 +246,7 @@ void HttpConnectHandshaker::Shutdown(grpc_error* why) {
GRPC_ERROR_UNREF(why);
}
void HttpConnectHandshaker::DoHandshake(grpc_tcp_server_acceptor* acceptor,
void HttpConnectHandshaker::DoHandshake(grpc_tcp_server_acceptor* /*acceptor*/,
grpc_closure* on_handshake_done,
HandshakerArgs* args) {
// Check for HTTP CONNECT channel arg.
@ -340,8 +340,8 @@ HttpConnectHandshaker::HttpConnectHandshaker() {
class HttpConnectHandshakerFactory : public HandshakerFactory {
public:
void AddHandshakers(const grpc_channel_args* args,
grpc_pollset_set* interested_parties,
void AddHandshakers(const grpc_channel_args* /*args*/,
grpc_pollset_set* /*interested_parties*/,
HandshakeManager* handshake_mgr) override {
handshake_mgr->Add(MakeRefCounted<HttpConnectHandshaker>());
}

@ -312,13 +312,13 @@ static grpc_error* init_frame_parser(grpc_chttp2_transport* t) {
}
}
static grpc_error* skip_parser(void* parser, grpc_chttp2_transport* t,
grpc_chttp2_stream* s, const grpc_slice& slice,
int is_last) {
static grpc_error* skip_parser(void* /*parser*/, grpc_chttp2_transport* /*t*/,
grpc_chttp2_stream* /*s*/,
const grpc_slice& /*slice*/, int /*is_last*/) {
return GRPC_ERROR_NONE;
}
static grpc_error* skip_header(void* tp, grpc_mdelem md) {
static grpc_error* skip_header(void* /*tp*/, grpc_mdelem md) {
GRPC_MDELEM_UNREF(md);
return GRPC_ERROR_NONE;
}

@ -176,7 +176,7 @@ static void report_stall(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
}
/* How many bytes would we like to put on the wire during a single syscall */
static uint32_t target_write_size(grpc_chttp2_transport* t) {
static uint32_t target_write_size(grpc_chttp2_transport* /*t*/) {
return 1024 * 1024;
}

@ -900,7 +900,7 @@ bool cancel_stream_locked(inproc_stream* s, grpc_error* error) {
return ret;
}
void do_nothing(void* arg, grpc_error* error) {}
void do_nothing(void* /*arg*/, grpc_error* /*error*/) {}
void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
grpc_transport_stream_op_batch* op) {
@ -1140,7 +1140,7 @@ void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) {
gpr_mu_unlock(&t->mu->mu);
}
void destroy_stream(grpc_transport* gt, grpc_stream* gs,
void destroy_stream(grpc_transport* /*gt*/, grpc_stream* gs,
grpc_closure* then_schedule_closure) {
INPROC_LOG(GPR_INFO, "destroy_stream %p %p", gs, then_schedule_closure);
inproc_stream* s = reinterpret_cast<inproc_stream*>(gs);
@ -1162,16 +1162,17 @@ void destroy_transport(grpc_transport* gt) {
* INTEGRATION GLUE
*/
void set_pollset(grpc_transport* gt, grpc_stream* gs, grpc_pollset* pollset) {
void set_pollset(grpc_transport* /*gt*/, grpc_stream* /*gs*/,
grpc_pollset* /*pollset*/) {
// Nothing to do here
}
void set_pollset_set(grpc_transport* gt, grpc_stream* gs,
grpc_pollset_set* pollset_set) {
void set_pollset_set(grpc_transport* /*gt*/, grpc_stream* /*gs*/,
grpc_pollset_set* /*pollset_set*/) {
// Nothing to do here
}
grpc_endpoint* get_endpoint(grpc_transport* t) { return nullptr; }
grpc_endpoint* get_endpoint(grpc_transport* /*t*/) { return nullptr; }
const grpc_transport_vtable inproc_vtable = {
sizeof(inproc_stream), "inproc", init_stream,
@ -1183,9 +1184,9 @@ const grpc_transport_vtable inproc_vtable = {
* Main inproc transport functions
*/
void inproc_transports_create(grpc_transport** server_transport,
const grpc_channel_args* server_args,
const grpc_channel_args* /*server_args*/,
grpc_transport** client_transport,
const grpc_channel_args* client_args) {
const grpc_channel_args* /*client_args*/) {
INPROC_LOG(GPR_INFO, "inproc_transports_create");
shared_mu* mu = new (gpr_malloc(sizeof(*mu))) shared_mu();
inproc_transport* st = new (gpr_malloc(sizeof(*st)))
@ -1221,7 +1222,7 @@ void grpc_inproc_transport_init(void) {
grpc_channel* grpc_inproc_channel_create(grpc_server* server,
grpc_channel_args* args,
void* reserved) {
void* /*reserved*/) {
GRPC_API_TRACE("grpc_inproc_channel_create(server=%p, args=%p)", 2,
(server, args));

@ -294,7 +294,7 @@ grpc_avl grpc_avl_remove(grpc_avl avl, void* key, void* user_data) {
return avl;
}
grpc_avl grpc_avl_ref(grpc_avl avl, void* user_data) {
grpc_avl grpc_avl_ref(grpc_avl avl, void* /*user_data*/) {
ref_node(avl.root);
return avl;
}

@ -203,7 +203,7 @@ void grpc_call_stack_set_pollset_or_pollset_set(grpc_call_stack* call_stack,
}
void grpc_call_stack_ignore_set_pollset_or_pollset_set(
grpc_call_element* elem, grpc_polling_entity* pollent) {}
grpc_call_element* /*elem*/, grpc_polling_entity* /*pollent*/) {}
void grpc_call_stack_destroy(grpc_call_stack* stack,
const grpc_call_final_info* final_info,

@ -235,13 +235,25 @@ void grpc_call_stack_set_pollset_or_pollset_set(grpc_call_stack* call_stack,
grpc_stream_unref(&(channel_stack)->refcount, reason)
#else
#define GRPC_CALL_STACK_REF(call_stack, reason) \
grpc_stream_ref(&(call_stack)->refcount)
do { \
grpc_stream_ref(&(call_stack)->refcount); \
(void)(reason); \
} while (0);
#define GRPC_CALL_STACK_UNREF(call_stack, reason) \
grpc_stream_unref(&(call_stack)->refcount)
do { \
grpc_stream_unref(&(call_stack)->refcount); \
(void)(reason); \
} while (0);
#define GRPC_CHANNEL_STACK_REF(channel_stack, reason) \
grpc_stream_ref(&(channel_stack)->refcount)
do { \
grpc_stream_ref(&(channel_stack)->refcount); \
(void)(reason); \
} while (0);
#define GRPC_CHANNEL_STACK_UNREF(channel_stack, reason) \
grpc_stream_unref(&(channel_stack)->refcount)
do { \
grpc_stream_unref(&(channel_stack)->refcount); \
(void)(reason); \
} while (0);
#endif
/* Destroy a call stack */

@ -54,7 +54,7 @@ namespace channelz {
namespace {
void* parent_uuid_copy(void* p) { return p; }
void parent_uuid_destroy(void* p) {}
void parent_uuid_destroy(void* /*p*/) {}
int parent_uuid_cmp(void* p1, void* p2) { return GPR_ICMP(p1, p2); }
const grpc_arg_pointer_vtable parent_uuid_vtable = {
parent_uuid_copy, parent_uuid_destroy, parent_uuid_cmp};
@ -315,7 +315,7 @@ void ChannelNode::RemoveChildSubchannel(intptr_t child_uuid) {
// ServerNode
//
ServerNode::ServerNode(grpc_server* server, size_t channel_tracer_max_nodes)
ServerNode::ServerNode(grpc_server* /*server*/, size_t channel_tracer_max_nodes)
: BaseNode(EntityType::kServer, /* name */ nullptr),
trace_(channel_tracer_max_nodes) {}

@ -167,7 +167,7 @@ static void set_pollset_or_pollset_set(grpc_call_element* elem,
/* Destructor for call_data */
static void connected_channel_destroy_call_elem(
grpc_call_element* elem, const grpc_call_final_info* final_info,
grpc_call_element* elem, const grpc_call_final_info* /*final_info*/,
grpc_closure* then_schedule_closure) {
call_data* calld = static_cast<call_data*>(elem->call_data);
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
@ -195,7 +195,8 @@ static void connected_channel_destroy_channel_elem(grpc_channel_element* elem) {
/* No-op. */
static void connected_channel_get_channel_info(
grpc_channel_element* elem, const grpc_channel_info* channel_info) {}
grpc_channel_element* /*elem*/, const grpc_channel_info* /*channel_info*/) {
}
const grpc_channel_filter grpc_connected_filter = {
connected_channel_start_transport_stream_op_batch,

@ -80,11 +80,12 @@ error:
return 0;
}
static void* zalloc_gpr(void* opaque, unsigned int items, unsigned int size) {
static void* zalloc_gpr(void* /*opaque*/, unsigned int items,
unsigned int size) {
return gpr_malloc(items * size);
}
static void zfree_gpr(void* opaque, void* address) { gpr_free(address); }
static void zfree_gpr(void* /*opaque*/, void* address) { gpr_free(address); }
static int zlib_compress(grpc_slice_buffer* input, grpc_slice_buffer* output,
int gzip) {

@ -47,12 +47,10 @@ static void grpc_stream_compression_pass_through(grpc_slice_buffer* in,
}
}
static bool grpc_stream_compress_identity(grpc_stream_compression_context* ctx,
grpc_slice_buffer* in,
grpc_slice_buffer* out,
size_t* output_size,
size_t max_output_size,
grpc_stream_compression_flush flush) {
static bool grpc_stream_compress_identity(
grpc_stream_compression_context* ctx, grpc_slice_buffer* in,
grpc_slice_buffer* out, size_t* output_size, size_t max_output_size,
grpc_stream_compression_flush /*flush*/) {
if (ctx == nullptr) {
return false;
}
@ -84,7 +82,7 @@ grpc_stream_compression_context_create_identity(
}
static void grpc_stream_compression_context_destroy_identity(
grpc_stream_compression_context* ctx) {
grpc_stream_compression_context* /*ctx*/) {
return;
}

@ -35,7 +35,7 @@ typedef struct {
grpc_cq_completion completion_storage;
} ping_result;
static void ping_destroy(void* arg, grpc_cq_completion* storage) {
static void ping_destroy(void* arg, grpc_cq_completion* /*storage*/) {
gpr_free(arg);
}

@ -529,7 +529,8 @@ grpc_completion_queue* grpc_completion_queue_create_internal(
}
static void cq_init_next(
void* data, grpc_experimental_completion_queue_functor* shutdown_callback) {
void* data,
grpc_experimental_completion_queue_functor* /*shutdown_callback*/) {
new (data) cq_next_data();
}
@ -539,7 +540,8 @@ static void cq_destroy_next(void* data) {
}
static void cq_init_pluck(
void* data, grpc_experimental_completion_queue_functor* shutdown_callback) {
void* data,
grpc_experimental_completion_queue_functor* /*shutdown_callback*/) {
new (data) cq_pluck_data();
}
@ -582,7 +584,7 @@ void grpc_cq_internal_ref(grpc_completion_queue* cq) {
cq->owning_refs.Ref(debug_location, reason);
}
static void on_pollset_shutdown_done(void* arg, grpc_error* error) {
static void on_pollset_shutdown_done(void* arg, grpc_error* /*error*/) {
grpc_completion_queue* cq = static_cast<grpc_completion_queue*>(arg);
GRPC_CQ_INTERNAL_UNREF(cq, "pollset_destroy");
}
@ -630,20 +632,21 @@ static void cq_check_tag(grpc_completion_queue* cq, void* tag, bool lock_cq) {
GPR_ASSERT(found);
}
#else
static void cq_check_tag(grpc_completion_queue* cq, void* tag, bool lock_cq) {}
static void cq_check_tag(grpc_completion_queue* /*cq*/, void* /*tag*/,
bool /*lock_cq*/) {}
#endif
static bool cq_begin_op_for_next(grpc_completion_queue* cq, void* tag) {
static bool cq_begin_op_for_next(grpc_completion_queue* cq, void* /*tag*/) {
cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
return cqd->pending_events.IncrementIfNonzero();
}
static bool cq_begin_op_for_pluck(grpc_completion_queue* cq, void* tag) {
static bool cq_begin_op_for_pluck(grpc_completion_queue* cq, void* /*tag*/) {
cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
return cqd->pending_events.IncrementIfNonzero();
}
static bool cq_begin_op_for_callback(grpc_completion_queue* cq, void* tag) {
static bool cq_begin_op_for_callback(grpc_completion_queue* cq, void* /*tag*/) {
cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq);
return cqd->pending_events.IncrementIfNonzero();
}
@ -669,7 +672,7 @@ bool grpc_cq_begin_op(grpc_completion_queue* cq, void* tag) {
static void cq_end_op_for_next(
grpc_completion_queue* cq, void* tag, grpc_error* error,
void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
grpc_cq_completion* storage, bool internal) {
grpc_cq_completion* storage, bool /*internal*/) {
GPR_TIMER_SCOPE("cq_end_op_for_next", 0);
if (GRPC_TRACE_FLAG_ENABLED(grpc_api_trace) ||
@ -748,7 +751,7 @@ static void cq_end_op_for_next(
static void cq_end_op_for_pluck(
grpc_completion_queue* cq, void* tag, grpc_error* error,
void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
grpc_cq_completion* storage, bool internal) {
grpc_cq_completion* storage, bool /*internal*/) {
GPR_TIMER_SCOPE("cq_end_op_for_pluck", 0);
cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
@ -936,7 +939,7 @@ static void dump_pending_tags(grpc_completion_queue* cq) {
gpr_free(out);
}
#else
static void dump_pending_tags(grpc_completion_queue* cq) {}
static void dump_pending_tags(grpc_completion_queue* /*cq*/) {}
#endif
static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,

@ -28,7 +28,7 @@
*/
static grpc_completion_queue* default_create(
const grpc_completion_queue_factory* factory,
const grpc_completion_queue_factory* /*factory*/,
const grpc_completion_queue_attributes* attr) {
return grpc_completion_queue_create_internal(
attr->cq_completion_type, attr->cq_polling_type, attr->cq_shutdown_cb);

@ -197,7 +197,7 @@ void grpc_shutdown_internal_locked(void) {
grpc_destroy_static_metadata_ctx();
}
void grpc_shutdown_internal(void* ignored) {
void grpc_shutdown_internal(void* /*ignored*/) {
GRPC_API_TRACE("grpc_shutdown_internal", 0, ());
grpc_core::MutexLock lock(&g_init_mu);
// We have released lock from the shutdown thread and it is possible that

@ -37,7 +37,7 @@
void grpc_security_pre_init(void) {}
static bool maybe_prepend_client_auth_filter(
grpc_channel_stack_builder* builder, void* arg) {
grpc_channel_stack_builder* builder, void* /*arg*/) {
const grpc_channel_args* args =
grpc_channel_stack_builder_get_channel_arguments(builder);
if (args) {
@ -52,7 +52,7 @@ static bool maybe_prepend_client_auth_filter(
}
static bool maybe_prepend_server_auth_filter(
grpc_channel_stack_builder* builder, void* arg) {
grpc_channel_stack_builder* builder, void* /*arg*/) {
const grpc_channel_args* args =
grpc_channel_stack_builder_get_channel_arguments(builder);
if (args) {

@ -94,8 +94,8 @@ static void lame_start_transport_stream_op_batch(
calld->call_combiner);
}
static void lame_get_channel_info(grpc_channel_element* elem,
const grpc_channel_info* channel_info) {}
static void lame_get_channel_info(grpc_channel_element* /*elem*/,
const grpc_channel_info* /*channel_info*/) {}
static void lame_start_transport_op(grpc_channel_element* elem,
grpc_transport_op* op) {
@ -133,8 +133,8 @@ static grpc_error* lame_init_call_elem(grpc_call_element* elem,
return GRPC_ERROR_NONE;
}
static void lame_destroy_call_elem(grpc_call_element* elem,
const grpc_call_final_info* final_info,
static void lame_destroy_call_elem(grpc_call_element* /*elem*/,
const grpc_call_final_info* /*final_info*/,
grpc_closure* then_schedule_closure) {
GRPC_CLOSURE_SCHED(then_schedule_closure, GRPC_ERROR_NONE);
}

@ -302,7 +302,7 @@ struct shutdown_cleanup_args {
grpc_slice slice;
};
static void shutdown_cleanup(void* arg, grpc_error* error) {
static void shutdown_cleanup(void* arg, grpc_error* /*error*/) {
struct shutdown_cleanup_args* a =
static_cast<struct shutdown_cleanup_args*>(arg);
grpc_slice_unref_internal(a->slice);
@ -367,7 +367,7 @@ static void request_matcher_destroy(request_matcher* rm) {
gpr_free(rm->requests_per_cq);
}
static void kill_zombie(void* elem, grpc_error* error) {
static void kill_zombie(void* elem, grpc_error* /*error*/) {
grpc_call_unref(
grpc_call_from_top_element(static_cast<grpc_call_element*>(elem)));
}
@ -449,7 +449,7 @@ static void orphan_channel(channel_data* chand) {
chand->next = chand->prev = chand;
}
static void finish_destroy_channel(void* cd, grpc_error* error) {
static void finish_destroy_channel(void* cd, grpc_error* /*error*/) {
channel_data* chand = static_cast<channel_data*>(cd);
grpc_server* server = chand->server;
GRPC_CHANNEL_INTERNAL_UNREF(chand->channel, "server");
@ -477,7 +477,7 @@ static void destroy_channel(channel_data* chand) {
op);
}
static void done_request_event(void* req, grpc_cq_completion* c) {
static void done_request_event(void* req, grpc_cq_completion* /*c*/) {
gpr_free(req);
}
@ -672,7 +672,8 @@ static int num_listeners(grpc_server* server) {
return n;
}
static void done_shutdown_event(void* server, grpc_cq_completion* completion) {
static void done_shutdown_event(void* server,
grpc_cq_completion* /*completion*/) {
server_unref(static_cast<grpc_server*>(server));
}
@ -850,7 +851,7 @@ static void got_initial_metadata(void* ptr, grpc_error* error) {
}
}
static void accept_stream(void* cd, grpc_transport* transport,
static void accept_stream(void* cd, grpc_transport* /*transport*/,
const void* transport_server_data) {
channel_data* chand = static_cast<channel_data*>(cd);
/* create a call */
@ -895,8 +896,8 @@ static grpc_error* server_init_call_elem(grpc_call_element* elem,
}
static void server_destroy_call_elem(grpc_call_element* elem,
const grpc_call_final_info* final_info,
grpc_closure* ignored) {
const grpc_call_final_info* /*final_info*/,
grpc_closure* /*ignored*/) {
call_data* calld = static_cast<call_data*>(elem->call_data);
calld->~call_data();
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
@ -1258,7 +1259,7 @@ void done_published_shutdown(void* done_arg, grpc_cq_completion* storage) {
gpr_free(storage);
}
static void listener_destroy_done(void* s, grpc_error* error) {
static void listener_destroy_done(void* s, grpc_error* /*error*/) {
grpc_server* server = static_cast<grpc_server*>(s);
gpr_mu_lock(&server->mu_global);
server->listeners_destroyed++;

@ -53,8 +53,8 @@ void SliceBufferByteStream::Orphan() {
// filter stack.
}
bool SliceBufferByteStream::Next(size_t max_size_hint,
grpc_closure* on_complete) {
bool SliceBufferByteStream::Next(size_t /*max_size_hint*/,
grpc_closure* /*on_complete*/) {
GPR_DEBUG_ASSERT(backing_buffer_.count > 0);
return true;
}

@ -72,7 +72,7 @@ class AsyncConnectivityStateWatcherInterface::Notifier {
}
private:
static void SendNotification(void* arg, grpc_error* ignored) {
static void SendNotification(void* arg, grpc_error* /*ignored*/) {
Notifier* self = static_cast<Notifier*>(arg);
if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) {
gpr_log(GPR_INFO, "watcher %p: delivering async notification for %s",

@ -50,6 +50,9 @@ static void assert_valid_list(grpc_mdelem_list* list) {
verified_count++;
}
GPR_ASSERT(list->count == verified_count);
#else
// Avoid unused-parameter warning for debug-only parameter
(void)list;
#endif /* NDEBUG */
}
@ -64,6 +67,9 @@ static void assert_valid_callouts(grpc_metadata_batch* batch) {
}
grpc_slice_unref_internal(key_interned);
}
#else
// Avoid unused-parameter warning for debug-only parameter
(void)batch;
#endif
}

@ -34,8 +34,10 @@ ThreadManager::WorkerThread::WorkerThread(ThreadManager* thd_mgr)
thd_ = grpc_core::Thread(
"grpcpp_sync_server",
[](void* th) { static_cast<ThreadManager::WorkerThread*>(th)->Run(); },
this);
thd_.Start();
this, &created_);
if (!created_) {
gpr_log(GPR_ERROR, "Could not create grpc_sync_server worker-thread");
}
}
void ThreadManager::WorkerThread::Run() {
@ -139,7 +141,9 @@ void ThreadManager::Initialize() {
}
for (int i = 0; i < min_pollers_; i++) {
new WorkerThread(this);
WorkerThread* worker = new WorkerThread(this);
GPR_ASSERT(worker->created()); // Must be able to create the minimum
worker->Start();
}
}
@ -177,7 +181,15 @@ void ThreadManager::MainWorkLoop() {
}
// Drop lock before spawning thread to avoid contention
lock.Unlock();
new WorkerThread(this);
WorkerThread* worker = new WorkerThread(this);
if (worker->created()) {
worker->Start();
} else {
num_pollers_--;
num_threads_--;
resource_exhausted = true;
delete worker;
}
} else if (num_pollers_ > 0) {
// There is still at least some thread polling, so we can go on
// even though we are below the number of pollers that we would

@ -124,6 +124,9 @@ class ThreadManager {
WorkerThread(ThreadManager* thd_mgr);
~WorkerThread();
bool created() const { return created_; }
void Start() { thd_.Start(); }
private:
// Calls thd_mgr_->MainWorkLoop() and once that completes, calls
// thd_mgr_>MarkAsCompleted(this) to mark the thread as completed
@ -131,6 +134,7 @@ class ThreadManager {
ThreadManager* const thd_mgr_;
grpc_core::Thread thd_;
bool created_;
};
// The main function in ThreadManager

@ -20,7 +20,6 @@ import os.path
import platform
import re
import shutil
import subprocess
import sys
import setuptools
@ -125,7 +124,10 @@ class TestAio(setuptools.Command):
import tests
loader = tests.Loader()
loader.loadTestsFromNames(['tests_aio'])
runner = tests.Runner()
# Even without dedicated threads, the framework will somehow spawn a
# new thread for tests to run upon. New thread doesn't have event loop
# attached by default, so initialization is needed.
runner = tests.Runner(dedicated_threads=False)
result = runner.run(loader.suite)
if not result.wasSuccessful():
sys.exit('Test failure')

@ -117,8 +117,15 @@ class AugmentedCase(collections.namedtuple('AugmentedCase', ['case', 'id'])):
class Runner(object):
def __init__(self):
def __init__(self, dedicated_threads=False):
"""Constructs the Runner object.
Args:
dedicated_threads: A bool indicates whether to spawn each unit test
in separate thread or not.
"""
self._skipped_tests = []
self._dedicated_threads = dedicated_threads
def skip_tests(self, tests):
self._skipped_tests = tests
@ -194,24 +201,31 @@ class Runner(object):
sys.stdout.write('Running {}\n'.format(
augmented_case.case.id()))
sys.stdout.flush()
case_thread = threading.Thread(
target=augmented_case.case.run, args=(result,))
try:
with stdout_pipe, stderr_pipe:
case_thread.start()
while case_thread.is_alive():
check_kill_self()
time.sleep(0)
case_thread.join()
except: # pylint: disable=try-except-raise
# re-raise the exception after forcing the with-block to end
raise
result.set_output(augmented_case.case, stdout_pipe.output(),
stderr_pipe.output())
sys.stdout.write(result_out.getvalue())
sys.stdout.flush()
result_out.truncate(0)
check_kill_self()
if self._dedicated_threads:
# (Deprecated) Spawns dedicated thread for each test case.
case_thread = threading.Thread(
target=augmented_case.case.run, args=(result,))
try:
with stdout_pipe, stderr_pipe:
case_thread.start()
# If the thread is exited unexpected, stop testing.
while case_thread.is_alive():
check_kill_self()
time.sleep(0)
case_thread.join()
except: # pylint: disable=try-except-raise
# re-raise the exception after forcing the with-block to end
raise
# Records the result of the test case run.
result.set_output(augmented_case.case, stdout_pipe.output(),
stderr_pipe.output())
sys.stdout.write(result_out.getvalue())
sys.stdout.flush()
result_out.truncate(0)
check_kill_self()
else:
# Donates current thread to test case execution.
augmented_case.case.run(result)
result.stopTestRun()
stdout_pipe.close()
stderr_pipe.close()

@ -2,5 +2,6 @@
"_sanity._sanity_test.AioSanityTest",
"unit.channel_test.TestChannel",
"unit.init_test.TestAioRpcError",
"unit.init_test.TestInsecureChannel"
"unit.init_test.TestInsecureChannel",
"unit.server_test.TestServer"
]

@ -17,27 +17,25 @@ package(
default_visibility = ["//visibility:public"],
)
GRPC_ASYNC_TESTS = [
"server_test.py",
"init_test.py",
"channel_test.py",
]
GRPC_ASYNC_TESTS = glob(["*_test.py"])
py_library(
name = "test_base",
srcs = ["test_base.py"],
name = "_test_base",
srcs_version = "PY3",
srcs = ["_test_base.py"],
)
py_library(
name = "sync_server",
srcs = ["sync_server.py"],
name = "_test_server",
srcs_version = "PY3",
srcs = ["_test_server.py"],
deps = [
"//src/proto/grpc/testing:empty_py_pb2",
"//src/python/grpcio/grpc:grpcio",
"//src/proto/grpc/testing:py_messages_proto",
"//src/proto/grpc/testing:test_py_pb2_grpc",
],
srcs_version = "PY3",
"//src/proto/grpc/testing:empty_py_pb2",
]
)
[
@ -48,13 +46,13 @@ py_library(
main=test_file_name,
python_version="PY3",
deps=[
":_test_server",
":_test_base",
"//src/python/grpcio/grpc:grpcio",
"//src/proto/grpc/testing:py_messages_proto",
"//src/proto/grpc/testing:benchmark_service_py_pb2_grpc",
"//src/proto/grpc/testing:benchmark_service_py_pb2",
"//src/python/grpcio_tests/tests/unit/framework/common",
":test_base",
":sync_server",
"//external:six"
],
imports=["../../",],

@ -0,0 +1,29 @@
# Copyright 2019 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import unittest
from grpc.experimental import aio
class AioTestBase(unittest.TestCase):
def setUp(self):
self._loop = asyncio.new_event_loop()
asyncio.set_event_loop(self._loop)
aio.init_grpc_aio()
@property
def loop(self):
return self._loop

@ -1,4 +1,4 @@
# Copyright 2019 gRPC authors.
# Copyright 2019 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -18,38 +18,27 @@ from concurrent import futures
from time import sleep
import grpc
from grpc.experimental import aio
from src.proto.grpc.testing import messages_pb2
from src.proto.grpc.testing import test_pb2_grpc
from tests.unit.framework.common import test_constants
# TODO (https://github.com/grpc/grpc/issues/19762)
# Change for an asynchronous server version once it's implemented.
class TestServiceServicer(test_pb2_grpc.TestServiceServicer):
class _TestServiceServicer(test_pb2_grpc.TestServiceServicer):
def UnaryCall(self, request, context):
async def UnaryCall(self, request, context):
return messages_pb2.SimpleResponse()
def EmptyCall(self, request, context):
async def EmptyCall(self, request, context):
while True:
sleep(test_constants.LONG_TIMEOUT)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='Synchronous gRPC server.')
parser.add_argument(
'--host_and_port',
required=True,
type=str,
nargs=1,
help='the host and port to listen.')
args = parser.parse_args()
server = grpc.server(
futures.ThreadPoolExecutor(max_workers=1),
options=(('grpc.so_reuseport', 1),))
test_pb2_grpc.add_TestServiceServicer_to_server(TestServiceServicer(),
async def start_test_server():
server = aio.server(options=(('grpc.so_reuseport', 0),))
test_pb2_grpc.add_TestServiceServicer_to_server(_TestServiceServicer(),
server)
server.add_insecure_port(args.host_and_port[0])
server.start()
server.wait_for_termination()
port = server.add_insecure_port('[::]:0')
await server.start()
# NOTE(lidizheng) returning the server to prevent it from deallocation
return 'localhost:%d' % port, server

@ -12,23 +12,27 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import logging
import unittest
import grpc
from grpc.experimental import aio
from tests_aio.unit import test_base
from src.proto.grpc.testing import messages_pb2
from tests.unit.framework.common import test_constants
from tests_aio.unit._test_server import start_test_server
from tests_aio.unit._test_base import AioTestBase
class TestChannel(test_base.AioTestBase):
class TestChannel(AioTestBase):
def test_async_context(self):
async def coro():
async with aio.insecure_channel(self.server_target) as channel:
server_target, unused_server = await start_test_server()
async with aio.insecure_channel(server_target) as channel:
hi = channel.unary_unary(
'/grpc.testing.TestService/UnaryCall',
request_serializer=messages_pb2.SimpleRequest.
@ -42,7 +46,9 @@ class TestChannel(test_base.AioTestBase):
def test_unary_unary(self):
async def coro():
channel = aio.insecure_channel(self.server_target)
server_target, unused_server = await start_test_server()
channel = aio.insecure_channel(server_target)
hi = channel.unary_unary(
'/grpc.testing.TestService/UnaryCall',
request_serializer=messages_pb2.SimpleRequest.SerializeToString,
@ -58,7 +64,9 @@ class TestChannel(test_base.AioTestBase):
def test_unary_call_times_out(self):
async def coro():
async with aio.insecure_channel(self.server_target) as channel:
server_target, unused_server = await start_test_server()
async with aio.insecure_channel(server_target) as channel:
empty_call_with_sleep = channel.unary_unary(
"/grpc.testing.TestService/EmptyCall",
request_serializer=messages_pb2.SimpleRequest.

@ -12,12 +12,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import logging
import unittest
import grpc
from grpc.experimental import aio
from tests_aio.unit import test_base
from tests_aio.unit._test_server import start_test_server
from tests_aio.unit._test_base import AioTestBase
class TestAioRpcError(unittest.TestCase):
@ -59,12 +61,14 @@ class TestAioRpcError(unittest.TestCase):
second_aio_rpc_error.__class__)
class TestInsecureChannel(test_base.AioTestBase):
class TestInsecureChannel(AioTestBase):
def test_insecure_channel(self):
async def coro():
channel = aio.insecure_channel(self.server_target)
server_target, unused_server = await start_test_server()
channel = aio.insecure_channel(server_target)
self.assertIsInstance(channel, aio.Channel)
self.loop.run_until_complete(coro())

@ -20,6 +20,7 @@ import grpc
from grpc.experimental import aio
from src.proto.grpc.testing import messages_pb2
from src.proto.grpc.testing import benchmark_service_pb2_grpc
from tests_aio.unit._test_base import AioTestBase
_TEST_METHOD_PATH = ''
@ -37,10 +38,9 @@ class GenericHandler(grpc.GenericRpcHandler):
return grpc.unary_unary_rpc_method_handler(unary_unary)
class TestServer(unittest.TestCase):
class TestServer(AioTestBase):
def test_unary_unary(self):
loop = asyncio.get_event_loop()
async def test_unary_unary_body():
server = aio.server()
@ -53,10 +53,9 @@ class TestServer(unittest.TestCase):
response = await unary_call(_REQUEST)
self.assertEqual(response, _RESPONSE)
loop.run_until_complete(test_unary_unary_body())
self.loop.run_until_complete(test_unary_unary_body())
if __name__ == '__main__':
aio.init_grpc_aio()
logging.basicConfig()
unittest.main(verbosity=2)

@ -1,92 +0,0 @@
# Copyright 2019 The gRPC Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys
import subprocess
import asyncio
import unittest
import socket
from grpc.experimental import aio
from tests_aio.unit import sync_server
from tests.unit.framework.common import get_socket
def _get_free_loopback_tcp_port():
host, port, sock = get_socket(listen=False)
return sock, "{}:{}".format(host, port)
class _Server:
"""_Server is an wrapper for a sync-server subprocess.
The synchronous server is executed in another process which initializes
implicitly the grpc using the synchronous configuration. Both worlds
can not coexist within the same process.
"""
def __init__(self, host_and_port): # pylint: disable=W0621
self._host_and_port = host_and_port
self._handle = None
def start(self):
assert self._handle is None
try:
from google3.pyglib import resources
executable = resources.GetResourceFilename(
"google3/third_party/py/grpc/sync_server")
args = [executable, '--host_and_port', self._host_and_port]
except ImportError:
executable = sys.executable
directory, _ = os.path.split(os.path.abspath(__file__))
filename = directory + '/sync_server.py'
args = [
executable, filename, '--host_and_port', self._host_and_port
]
self._handle = subprocess.Popen(args)
def terminate(self):
if not self._handle:
return
self._handle.terminate()
self._handle.wait()
self._handle = None
class AioTestBase(unittest.TestCase):
def setUp(self):
self._socket, self._target = _get_free_loopback_tcp_port()
self._server = _Server(self._target)
self._server.start()
self._loop = asyncio.new_event_loop()
asyncio.set_event_loop(self._loop)
aio.init_grpc_aio()
def tearDown(self):
self._server.terminate()
self._socket.close()
@property
def loop(self):
return self._loop
@property
def server_target(self):
return self._target

@ -83,7 +83,7 @@ static void end_test(grpc_end2end_test_fixture* f) {
grpc_completion_queue_destroy(f->shutdown_cq);
}
static void simple_request_body(grpc_end2end_test_config config,
static void simple_request_body(grpc_end2end_test_config /*config*/,
grpc_end2end_test_fixture f) {
grpc_call* c;
grpc_call* s;

@ -42,7 +42,7 @@ static void drain_cq(grpc_completion_queue* cq) {
} while (ev.type != GRPC_QUEUE_SHUTDOWN);
}
static void simple_request_body(grpc_end2end_test_config config,
static void simple_request_body(grpc_end2end_test_config /*config*/,
grpc_end2end_test_fixture* f) {
grpc_call* c;
grpc_call* s;

@ -86,7 +86,7 @@ static void end_test(grpc_end2end_test_fixture* f) {
grpc_completion_queue_destroy(f->shutdown_cq);
}
static void simple_request_body(grpc_end2end_test_config config,
static void simple_request_body(grpc_end2end_test_config /*config*/,
grpc_end2end_test_fixture f, size_t num_ops) {
grpc_call* c;
cq_verifier* cqv = cq_verifier_create(f.cq);

@ -100,7 +100,7 @@ static grpc_slice generate_random_slice() {
return out;
}
static void request_response_with_payload(grpc_end2end_test_config config,
static void request_response_with_payload(grpc_end2end_test_config /*config*/,
grpc_end2end_test_fixture f) {
/* Create large request and response bodies. These are big enough to require
* multiple round trips to deliver to the peer, and their exact contents of

@ -90,7 +90,7 @@ static void end_test(grpc_end2end_test_fixture* f) {
grpc_completion_queue_destroy(f->shutdown_cq);
}
static void simple_request_body(grpc_end2end_test_config config,
static void simple_request_body(grpc_end2end_test_config /*config*/,
grpc_end2end_test_fixture f) {
grpc_call* c;
grpc_call* s;

@ -86,7 +86,7 @@ static void end_test(grpc_end2end_test_fixture* f) {
grpc_completion_queue_destroy(f->shutdown_cq);
}
static void simple_request_body(grpc_end2end_test_config config,
static void simple_request_body(grpc_end2end_test_config /*config*/,
grpc_end2end_test_fixture f, void* rc) {
grpc_call* c;
grpc_call* s;

@ -85,7 +85,7 @@ static void end_test(grpc_end2end_test_fixture* f) {
grpc_completion_queue_destroy(f->shutdown_cq);
}
static void simple_request_body(grpc_end2end_test_config config,
static void simple_request_body(grpc_end2end_test_config /*config*/,
grpc_end2end_test_fixture f) {
grpc_call* c;
grpc_call* s;

@ -75,7 +75,7 @@ static void simple_delayed_request_body(grpc_end2end_test_config config,
grpc_end2end_test_fixture* f,
grpc_channel_args* client_args,
grpc_channel_args* server_args,
long delay_us) {
long /*delay_us*/) {
grpc_call* c;
grpc_call* s;
cq_verifier* cqv = cq_verifier_create(f->cq);

@ -25,7 +25,7 @@
static void* fake_malloc(size_t size) { return (void*)size; }
static void* fake_realloc(void* addr, size_t size) { return (void*)size; }
static void* fake_realloc(void* /*addr*/, size_t size) { return (void*)size; }
static void fake_free(void* addr) {
*(static_cast<intptr_t*>(addr)) = static_cast<intptr_t>(0xdeadd00d);

@ -131,6 +131,6 @@ int main(int argc, char** argv) {
#else /* GRPC_LINUX_ERRQUEUE */
int main(int argc, char** argv) { return 0; }
int main(int /*argc*/, char** /*argv*/) { return 0; }
#endif /* GRPC_LINUX_ERRQUEUE */

@ -111,5 +111,5 @@ int main(int argc, char** argv) {
return 0;
}
#else /* defined(GRPC_LINUX_EPOLL_CREATE1) && defined(GRPC_LINUX_EVENTFD) */
int main(int argc, char** argv) { return 0; }
int main(int /*argc*/, char** /*argv*/) { return 0; }
#endif

@ -209,6 +209,6 @@ int main(int argc, char** argv) {
#else /* GRPC_UV */
int main(int argc, char** argv) { return 1; }
int main(int /*argc*/, char** /*argv*/) { return 1; }
#endif /* GRPC_UV */

@ -314,6 +314,6 @@ int main(int argc, char** argv) {
#else /* GRPC_UV */
int main(int argc, char** argv) { return 1; }
int main(int /*argc*/, char** /*argv*/) { return 1; }
#endif /* GRPC_UV */

@ -28,12 +28,14 @@
static int g_string_clear_once = 0;
static void string_clear(void* userdata) {
static void string_clear(void* /*userdata*/) {
GPR_ASSERT(!g_string_clear_once);
g_string_clear_once = 1;
}
static uint32_t read_char(void* userdata) { return GRPC_JSON_READ_CHAR_ERROR; }
static uint32_t read_char(void* /*userdata*/) {
return GRPC_JSON_READ_CHAR_ERROR;
}
static grpc_json_reader_vtable reader_vtable = {
string_clear, nullptr, nullptr, read_char, nullptr, nullptr,

@ -72,7 +72,7 @@ static void test_gcp_environment_check_failure() {
GPR_ASSERT(!check_bios_data_linux_test("\n"));
}
int main(int argc, char** argv) {
int main(int /*argc*/, char** /*argv*/) {
/* Tests. */
test_gcp_environment_check_success();
test_gcp_environment_check_failure();
@ -81,6 +81,6 @@ int main(int argc, char** argv) {
#else // GPR_LINUX
int main(int argc, char** argv) { return 0; }
int main(int /*argc*/, char** /*argv*/) { return 0; }
#endif // GPR_LINUX

@ -87,6 +87,6 @@ int main(int argc, char** argv) {
}
#else // GPR_WINDOWS
int main(int argc, char** argv) { return 0; }
int main(int /*argc*/, char** /*argv*/) { return 0; }
#endif // GPR_WINDOWS

@ -87,7 +87,7 @@ static void test_client_options_api_success() {
grpc_alts_credentials_options_destroy(new_options);
}
int main(int argc, char** argv) {
int main(int /*argc*/, char** /*argv*/) {
/* Test. */
test_copy_client_options_failure();
test_client_options_api_success();

@ -63,7 +63,7 @@ static void on_oauth2_response(void* arg, grpc_error* error) {
gpr_mu_unlock(request->mu);
}
static void destroy_after_shutdown(void* pollset, grpc_error* error) {
static void destroy_after_shutdown(void* pollset, grpc_error* /*error*/) {
grpc_pollset_destroy(reinterpret_cast<grpc_pollset*>(pollset));
gpr_free(pollset);
}

@ -98,7 +98,7 @@ static void expect_binary_header(const char* hdr, int binary) {
}
}
int main(int argc, char** argv) {
int main(int /*argc*/, char** /*argv*/) {
grpc_init();
/* Base64 test vectors from RFC 4648, with padding removed */

@ -122,7 +122,8 @@ export LANG=en_US.UTF-8
# Allow build_ext to build C/C++ files in parallel
# by enabling a monkeypatch. It speeds up the build a lot.
export GRPC_PYTHON_BUILD_EXT_COMPILER_JOBS=4
DEFAULT_PARALLEL_JOBS=$(nproc) || DEFAULT_PARALLEL_JOBS=4
export GRPC_PYTHON_BUILD_EXT_COMPILER_JOBS=${GRPC_PYTHON_BUILD_EXT_COMPILER_JOBS:-$DEFAULT_PARALLEL_JOBS}
# If ccache is available on Linux, use it.
if [ "$(is_linux)" ]; then

@ -705,9 +705,16 @@ class PythonConfig(
class PythonLanguage(object):
_DEFAULT_COMMAND = 'test_lite'
_TEST_SPECS_FILE = 'src/python/grpcio_tests/tests/tests.json'
_TEST_FOLDER = 'test'
_TEST_SPECS_FILE = {
'native': 'src/python/grpcio_tests/tests/tests.json',
'gevent': 'src/python/grpcio_tests/tests/tests.json',
'asyncio': 'src/python/grpcio_tests/tests_aio/tests.json',
}
_TEST_FOLDER = {
'native': 'test',
'gevent': 'test',
'asyncio': 'test_aio',
}
def configure(self, config, args):
self.config = config
@ -716,7 +723,8 @@ class PythonLanguage(object):
def test_specs(self):
# load list of known test suites
with open(self._TEST_SPECS_FILE) as tests_json_file:
with open(self._TEST_SPECS_FILE[
self.args.iomgr_platform]) as tests_json_file:
tests_json = json.load(tests_json_file)
environment = dict(_FORCE_ENVIRON_FOR_WRAPPERS)
return [
@ -726,8 +734,9 @@ class PythonLanguage(object):
environ=dict(
list(environment.items()) + [(
'GRPC_PYTHON_TESTRUNNER_FILTER', str(suite_name))]),
shortname='%s.%s.%s' % (config.name, self._TEST_FOLDER,
suite_name),
shortname='%s.%s.%s' %
(config.name, self._TEST_FOLDER[self.args.iomgr_platform],
suite_name),
) for suite_name in tests_json for config in self.pythons
]
@ -795,9 +804,17 @@ class PythonLanguage(object):
venv_relative_python = ['bin/python']
toolchain = ['unix']
test_command = self._DEFAULT_COMMAND
if args.iomgr_platform == 'gevent':
# Selects the corresponding testing mode.
# See src/python/grpcio_tests/commands.py for implementation details.
if args.iomgr_platform == 'native':
test_command = 'test_lite'
elif args.iomgr_platform == 'gevent':
test_command = 'test_gevent'
elif args.iomgr_platform == 'asyncio':
test_command = 'test_aio'
else:
raise ValueError(
'Unsupported IO Manager platform: %s' % args.iomgr_platform)
runner = [
os.path.abspath('tools/run_tests/helper_scripts/run_python.sh')
]
@ -846,15 +863,25 @@ class PythonLanguage(object):
pypy32_config = _pypy_config_generator(
name='pypy3', major='3', config_vars=config_vars)
if args.iomgr_platform == 'asyncio':
if args.compiler not in ('default', 'python3.6', 'python3.7',
'python3.8'):
raise Exception(
'Compiler %s not supported with IO Manager platform: %s' %
(args.compiler, args.iomgr_platform))
if args.compiler == 'default':
if os.name == 'nt':
return (python35_config,)
else:
return (
python27_config,
python36_config,
python37_config,
)
if args.iomgr_platform == 'asyncio':
return (python36_config,)
else:
return (
python27_config,
python36_config,
python37_config,
)
elif args.compiler == 'python2.7':
return (python27_config,)
elif args.compiler == 'python3.4':
@ -889,31 +916,6 @@ class PythonLanguage(object):
return 'python'
class PythonAioLanguage(PythonLanguage):
_DEFAULT_COMMAND = 'test_aio'
_TEST_SPECS_FILE = 'src/python/grpcio_tests/tests_aio/tests.json'
_TEST_FOLDER = 'test_aio'
def configure(self, config, args):
self.config = config
self.args = args
self.pythons = self._get_pythons(self.args)
def _get_pythons(self, args):
"""Get python runtimes to test with, based on current platform, architecture, compiler etc."""
if args.compiler not in ('python3.6', 'python3.7', 'python3.8'):
raise Exception('Compiler %s not supported.' % args.compiler)
if args.iomgr_platform not in ('native'):
raise Exception(
'Iomgr platform %s not supported.' % args.iomgr_platform)
return super()._get_pythons(args)
def __str__(self):
return 'python_aio'
class RubyLanguage(object):
def configure(self, config, args):
@ -1321,7 +1323,6 @@ _LANGUAGES = {
'php': PhpLanguage(),
'php7': Php7Language(),
'python': PythonLanguage(),
'python-aio': PythonAioLanguage(),
'ruby': RubyLanguage(),
'csharp': CSharpLanguage(),
'objc': ObjCLanguage(),
@ -1489,7 +1490,7 @@ argp.add_argument(
)
argp.add_argument(
'--iomgr_platform',
choices=['native', 'uv', 'gevent'],
choices=['native', 'uv', 'gevent', 'asyncio'],
default='native',
help='Selects iomgr platform to build on')
argp.add_argument(

@ -231,7 +231,7 @@ def _create_test_jobs(extra_args=[], inner_jobs=_DEFAULT_INNER_JOBS):
languages=['python'],
configs=['opt'],
platforms=['linux', 'macos', 'windows'],
iomgr_platforms=['native', 'gevent'],
iomgr_platforms=['native', 'gevent', 'asyncio'],
labels=['basictests', 'multilang'],
extra_args=extra_args + ['--report_multi_target'],
inner_jobs=inner_jobs)

Loading…
Cancel
Save