|
|
|
@ -69,11 +69,16 @@ class ShutdownCallback : public grpc_core::CQCallbackInterface { |
|
|
|
|
gpr_cv_broadcast(&cv_); |
|
|
|
|
gpr_mu_unlock(&mu_); |
|
|
|
|
} |
|
|
|
|
void Wait(gpr_timespec deadline) { |
|
|
|
|
// The Wait function waits for a specified amount of
|
|
|
|
|
// time for the completion of the shutdown and returns
|
|
|
|
|
// whether it was successfully shut down
|
|
|
|
|
bool Wait(gpr_timespec deadline) { |
|
|
|
|
gpr_mu_lock(&mu_); |
|
|
|
|
while (!done_ && !gpr_cv_wait(&cv_, &mu_, deadline)) { |
|
|
|
|
} |
|
|
|
|
bool ret = done_; |
|
|
|
|
gpr_mu_unlock(&mu_); |
|
|
|
|
return ret; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
@ -85,6 +90,12 @@ class ShutdownCallback : public grpc_core::CQCallbackInterface { |
|
|
|
|
ShutdownCallback* g_shutdown_callback; |
|
|
|
|
} // namespace
|
|
|
|
|
|
|
|
|
|
// The following global structure is the tag collection. It holds
|
|
|
|
|
// all information related to tags expected and tags received
|
|
|
|
|
// during the execution, with each callback setting a tag.
|
|
|
|
|
// The tag sets are implemented and checked using arrays and
|
|
|
|
|
// linear lookups (rather than maps) so that this test doesn't
|
|
|
|
|
// need the C++ standard library.
|
|
|
|
|
static gpr_mu tags_mu; |
|
|
|
|
static gpr_cv tags_cv; |
|
|
|
|
const size_t kAvailableTags = 4; |
|
|
|
@ -93,6 +104,10 @@ bool tags_valid[kAvailableTags]; |
|
|
|
|
bool tags_expected[kAvailableTags]; |
|
|
|
|
bool tags_needed[kAvailableTags]; |
|
|
|
|
|
|
|
|
|
// Mark that a tag is expected; this function must be
|
|
|
|
|
// executed in the main thread only while there are no
|
|
|
|
|
// other threads altering the expectation set (e.g.,
|
|
|
|
|
// running callbacks).
|
|
|
|
|
static void expect_tag(intptr_t tag, bool ok) { |
|
|
|
|
size_t idx = static_cast<size_t>(tag); |
|
|
|
|
GPR_ASSERT(idx < kAvailableTags); |
|
|
|
@ -100,6 +115,10 @@ static void expect_tag(intptr_t tag, bool ok) { |
|
|
|
|
tags_expected[idx] = ok; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// The tag verifier doesn't have to drive the CQ at all (unlike the
|
|
|
|
|
// next-based end2end tests) because the tags will get set when the
|
|
|
|
|
// callbacks are executed, which happens when a particular batch
|
|
|
|
|
// related to a callback is complete
|
|
|
|
|
static void verify_tags(gpr_timespec deadline) { |
|
|
|
|
bool done = false; |
|
|
|
|
|
|
|
|
@ -143,6 +162,30 @@ static void verify_tags(gpr_timespec deadline) { |
|
|
|
|
gpr_mu_unlock(&tags_mu); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// This function creates a callback functor that emits the
|
|
|
|
|
// desired tag into the global tag set
|
|
|
|
|
static grpc_core::CQCallbackInterface* tag(intptr_t t) { |
|
|
|
|
auto func = [t](bool ok) { |
|
|
|
|
gpr_mu_lock(&tags_mu); |
|
|
|
|
gpr_log(GPR_DEBUG, "Completing operation %" PRIdPTR, t); |
|
|
|
|
bool was_empty = true; |
|
|
|
|
for (size_t i = 0; i < kAvailableTags; i++) { |
|
|
|
|
if (tags_valid[i]) { |
|
|
|
|
was_empty = false; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
size_t idx = static_cast<size_t>(t); |
|
|
|
|
tags[idx] = ok; |
|
|
|
|
tags_valid[idx] = true; |
|
|
|
|
if (was_empty) { |
|
|
|
|
gpr_cv_signal(&tags_cv); |
|
|
|
|
} |
|
|
|
|
gpr_mu_unlock(&tags_mu); |
|
|
|
|
}; |
|
|
|
|
auto cb = NewDeletingCallback(func); |
|
|
|
|
return cb; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static grpc_end2end_test_fixture inproc_create_fixture( |
|
|
|
|
grpc_channel_args* client_args, grpc_channel_args* server_args) { |
|
|
|
|
grpc_end2end_test_fixture f; |
|
|
|
@ -180,28 +223,6 @@ void inproc_tear_down(grpc_end2end_test_fixture* f) { |
|
|
|
|
gpr_free(ffd); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static grpc_core::CQCallbackInterface* tag(intptr_t t) { |
|
|
|
|
auto func = [t](bool ok) { |
|
|
|
|
gpr_mu_lock(&tags_mu); |
|
|
|
|
gpr_log(GPR_DEBUG, "Completing operation %" PRIdPTR, t); |
|
|
|
|
bool was_empty = true; |
|
|
|
|
for (size_t i = 0; i < kAvailableTags; i++) { |
|
|
|
|
if (tags_valid[i]) { |
|
|
|
|
was_empty = false; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
size_t idx = static_cast<size_t>(t); |
|
|
|
|
tags[idx] = ok; |
|
|
|
|
tags_valid[idx] = true; |
|
|
|
|
if (was_empty) { |
|
|
|
|
gpr_cv_signal(&tags_cv); |
|
|
|
|
} |
|
|
|
|
gpr_mu_unlock(&tags_mu); |
|
|
|
|
}; |
|
|
|
|
auto cb = NewDeletingCallback(func); |
|
|
|
|
return cb; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config, |
|
|
|
|
const char* test_name, |
|
|
|
|
grpc_channel_args* client_args, |
|
|
|
@ -221,7 +242,8 @@ static gpr_timespec n_seconds_from_now(int n) { |
|
|
|
|
static gpr_timespec five_seconds_from_now() { return n_seconds_from_now(5); } |
|
|
|
|
|
|
|
|
|
static void drain_cq(grpc_completion_queue* cq) { |
|
|
|
|
g_shutdown_callback->Wait(five_seconds_from_now()); |
|
|
|
|
// Wait for the shutdown callback to arrive, or fail the test
|
|
|
|
|
GPR_ASSERT(g_shutdown_callback->Wait(five_seconds_from_now())); |
|
|
|
|
gpr_log(GPR_DEBUG, "CQ shutdown wait complete"); |
|
|
|
|
grpc_core::Delete(g_shutdown_callback); |
|
|
|
|
} |
|
|
|
@ -288,6 +310,7 @@ static void simple_request_body(grpc_end2end_test_config config, |
|
|
|
|
grpc_metadata_array_init(&request_metadata_recv); |
|
|
|
|
grpc_call_details_init(&call_details); |
|
|
|
|
|
|
|
|
|
// Create a basic client unary request batch (no payload)
|
|
|
|
|
memset(ops, 0, sizeof(ops)); |
|
|
|
|
op = ops; |
|
|
|
|
op->op = GRPC_OP_SEND_INITIAL_METADATA; |
|
|
|
@ -316,9 +339,13 @@ static void simple_request_body(grpc_end2end_test_config config, |
|
|
|
|
nullptr); |
|
|
|
|
GPR_ASSERT(GRPC_CALL_OK == error); |
|
|
|
|
|
|
|
|
|
// Register a call at the server-side to match the incoming client call
|
|
|
|
|
error = grpc_server_request_call(f.server, &s, &call_details, |
|
|
|
|
&request_metadata_recv, f.cq, f.cq, tag(2)); |
|
|
|
|
GPR_ASSERT(GRPC_CALL_OK == error); |
|
|
|
|
|
|
|
|
|
// We expect that the server call creation callback (and no others) will
|
|
|
|
|
// execute now since no other batch should be complete.
|
|
|
|
|
expect_tag(2, true); |
|
|
|
|
verify_tags(deadline); |
|
|
|
|
|
|
|
|
@ -331,6 +358,7 @@ static void simple_request_body(grpc_end2end_test_config config, |
|
|
|
|
gpr_log(GPR_DEBUG, "client_peer=%s", peer); |
|
|
|
|
gpr_free(peer); |
|
|
|
|
|
|
|
|
|
// Create the server response batch (no payload)
|
|
|
|
|
memset(ops, 0, sizeof(ops)); |
|
|
|
|
op = ops; |
|
|
|
|
op->op = GRPC_OP_SEND_INITIAL_METADATA; |
|
|
|
@ -355,6 +383,8 @@ static void simple_request_body(grpc_end2end_test_config config, |
|
|
|
|
nullptr); |
|
|
|
|
GPR_ASSERT(GRPC_CALL_OK == error); |
|
|
|
|
|
|
|
|
|
// Both the client request and server response batches should get complete
|
|
|
|
|
// now and we should see that their callbacks have been executed
|
|
|
|
|
expect_tag(3, true); |
|
|
|
|
expect_tag(1, true); |
|
|
|
|
verify_tags(deadline); |
|
|
|
|