diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index 505b5019683..7a1f3a2e54a 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -527,6 +527,8 @@ static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
     if (request_id == -1) {
       continue;
     } else {
+      gpr_log(GPR_DEBUG, "queue lockfree, retries=%d chose=%d", i, cq_idx);
+
       gpr_mu_lock(&calld->mu_state);
       calld->state = ACTIVATED;
       gpr_mu_unlock(&calld->mu_state);
@@ -537,6 +539,7 @@ static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
   }
 
   /* no cq to take the request found: queue it on the slow list */
+  gpr_log(GPR_DEBUG, "queue slowpath");
   gpr_mu_lock(&server->mu_call);
   gpr_mu_lock(&calld->mu_state);
   calld->state = PENDING;
@@ -1300,12 +1303,14 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx,
   server->requested_calls[request_id] = *rc;
   gpr_free(rc);
   if (gpr_stack_lockfree_push(rm->requests_per_cq[cq_idx], request_id)) {
+    gpr_log(GPR_DEBUG, "request against empty");
     /* this was the first queued request: we need to lock and start
        matching calls */
     gpr_mu_lock(&server->mu_call);
     while ((calld = rm->pending_head) != NULL) {
       request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[cq_idx]);
       if (request_id == -1) break;
+      gpr_log(GPR_DEBUG, "drain1");
       rm->pending_head = calld->pending_next;
       gpr_mu_unlock(&server->mu_call);
       gpr_mu_lock(&calld->mu_state);
@@ -1326,6 +1331,8 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx,
       gpr_mu_lock(&server->mu_call);
     }
     gpr_mu_unlock(&server->mu_call);
+  } else {
+    gpr_log(GPR_DEBUG, "request lockfree");
   }
   return GRPC_CALL_OK;
 }
@@ -1379,6 +1386,7 @@ grpc_call_error grpc_server_request_registered_call(
   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
   requested_call *rc = gpr_malloc(sizeof(*rc));
   registered_method *rm = rmp;
+  gpr_log(GPR_DEBUG, "method: %s", rm->method);
   GRPC_API_TRACE(
       "grpc_server_request_registered_call("
       "server=%p, rmp=%p, call=%p, deadline=%p, initial_metadata=%p, "
@@ -1393,6 +1401,7 @@ grpc_call_error grpc_server_request_registered_call(
       break;
     }
   }
+  gpr_log(GPR_DEBUG, "cq_idx=%d, cq_count=%d", cq_idx, server->cq_count);
   if (cq_idx == server->cq_count) {
     gpr_free(rc);
     error = GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
diff --git a/test/cpp/end2end/hybrid_end2end_test.cc b/test/cpp/end2end/hybrid_end2end_test.cc
index a19fccbb6ba..b4270070e2f 100644
--- a/test/cpp/end2end/hybrid_end2end_test.cc
+++ b/test/cpp/end2end/hybrid_end2end_test.cc
@@ -217,8 +217,7 @@ class HybridEnd2endTest : public ::testing::Test {
     }
     // Create a separate cq for each potential handler.
     for (int i = 0; i < 5; i++) {
-      cqs_.push_back(
-          builder.AddCompletionQueue(i == num_cqs_frequently_polled - 1));
+      cqs_.push_back(builder.AddCompletionQueue(i == num_cqs_frequently_polled - 1));
     }
     server_ = builder.BuildAndStart();
   }
@@ -511,12 +510,22 @@ TEST_F(HybridEnd2endTest, GenericEchoAsyncRequestStreamResponseStream) {
   SetUpServer(&service, nullptr, &generic_service, 3);
   ResetStub();
   std::thread generic_handler_thread([this, &generic_service] {
+    gpr_log(GPR_DEBUG, "t0 start");
     HandleGenericCall(&generic_service, cqs_[0].get());
+    gpr_log(GPR_DEBUG, "t0 done");
   });
   std::thread request_stream_handler_thread(
-      [this, &service] { HandleClientStreaming(&service, cqs_[1].get()); });
+      [this, &service] {
+        gpr_log(GPR_DEBUG, "t1 start");
+        HandleClientStreaming(&service, cqs_[1].get());
+        gpr_log(GPR_DEBUG, "t1 done");
+      });
   std::thread response_stream_handler_thread(
-      [this, &service] { HandleServerStreaming(&service, cqs_[2].get()); });
+      [this, &service] {
+        gpr_log(GPR_DEBUG, "t2 start");
+        HandleServerStreaming(&service, cqs_[2].get());
+        gpr_log(GPR_DEBUG, "t2 done");
+      });
   TestAllMethods();
   generic_handler_thread.join();
   request_stream_handler_thread.join();
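
Note (not part of the patch): GPR_DEBUG messages like the ones added above are suppressed at gRPC's default log verbosity. A minimal standalone sketch of how they can be surfaced, assuming only the public <grpc/support/log.h> API; the program and its message text are illustrative:

    #include <grpc/support/log.h>

    int main(void) {
      /* Lower the minimum printed severity so GPR_DEBUG output appears.
         Running the test binary with GRPC_VERBOSITY=DEBUG set in the
         environment has the same effect without a code change. */
      gpr_set_log_verbosity(GPR_LOG_SEVERITY_DEBUG);
      gpr_log(GPR_DEBUG, "queue slowpath"); /* same form as the added log lines */
      return 0;
    }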