Revert "Start supporting a callback-based RPC under lock"

pull/19175/head
Karthik Ravi Shankar 6 years ago committed by GitHub
parent 837a99e1d4
commit b790c24e5c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 85
      src/core/lib/surface/completion_queue.cc
  2. 3
      src/core/lib/surface/completion_queue.h
  3. 2
      src/core/lib/surface/server.cc
  4. 44
      test/core/surface/completion_queue_test.cc
  5. 28
      test/cpp/end2end/client_callback_end2end_test.cc
  6. 35
      test/cpp/microbenchmarks/bm_cq.cc
  7. 2
      test/cpp/microbenchmarks/callback_streaming_ping_pong.h

@ -34,7 +34,6 @@
#include "src/core/lib/gpr/string.h" #include "src/core/lib/gpr/string.h"
#include "src/core/lib/gpr/tls.h" #include "src/core/lib/gpr/tls.h"
#include "src/core/lib/gprpp/atomic.h" #include "src/core/lib/gprpp/atomic.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/pollset.h" #include "src/core/lib/iomgr/pollset.h"
#include "src/core/lib/iomgr/timer.h" #include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/profiling/timers.h" #include "src/core/lib/profiling/timers.h"
@ -201,7 +200,7 @@ struct cq_vtable {
bool (*begin_op)(grpc_completion_queue* cq, void* tag); bool (*begin_op)(grpc_completion_queue* cq, void* tag);
void (*end_op)(grpc_completion_queue* cq, void* tag, grpc_error* error, void (*end_op)(grpc_completion_queue* cq, void* tag, grpc_error* error,
void (*done)(void* done_arg, grpc_cq_completion* storage), void (*done)(void* done_arg, grpc_cq_completion* storage),
void* done_arg, grpc_cq_completion* storage, bool internal); void* done_arg, grpc_cq_completion* storage);
grpc_event (*next)(grpc_completion_queue* cq, gpr_timespec deadline, grpc_event (*next)(grpc_completion_queue* cq, gpr_timespec deadline,
void* reserved); void* reserved);
grpc_event (*pluck)(grpc_completion_queue* cq, void* tag, grpc_event (*pluck)(grpc_completion_queue* cq, void* tag,
@ -355,20 +354,23 @@ static bool cq_begin_op_for_callback(grpc_completion_queue* cq, void* tag);
// queue. The done argument is a callback that will be invoked when it is // queue. The done argument is a callback that will be invoked when it is
// safe to free up that storage. The storage MUST NOT be freed until the // safe to free up that storage. The storage MUST NOT be freed until the
// done callback is invoked. // done callback is invoked.
static void cq_end_op_for_next( static void cq_end_op_for_next(grpc_completion_queue* cq, void* tag,
grpc_completion_queue* cq, void* tag, grpc_error* error, grpc_error* error,
void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg, void (*done)(void* done_arg,
grpc_cq_completion* storage, bool internal); grpc_cq_completion* storage),
void* done_arg, grpc_cq_completion* storage);
static void cq_end_op_for_pluck(
grpc_completion_queue* cq, void* tag, grpc_error* error, static void cq_end_op_for_pluck(grpc_completion_queue* cq, void* tag,
void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg, grpc_error* error,
grpc_cq_completion* storage, bool internal); void (*done)(void* done_arg,
grpc_cq_completion* storage),
static void cq_end_op_for_callback( void* done_arg, grpc_cq_completion* storage);
grpc_completion_queue* cq, void* tag, grpc_error* error,
void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg, static void cq_end_op_for_callback(grpc_completion_queue* cq, void* tag,
grpc_cq_completion* storage, bool internal); grpc_error* error,
void (*done)(void* done_arg,
grpc_cq_completion* storage),
void* done_arg, grpc_cq_completion* storage);
static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline, static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,
void* reserved); void* reserved);
@ -672,10 +674,11 @@ bool grpc_cq_begin_op(grpc_completion_queue* cq, void* tag) {
/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a /* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a
* completion * completion
* type of GRPC_CQ_NEXT) */ * type of GRPC_CQ_NEXT) */
static void cq_end_op_for_next( static void cq_end_op_for_next(grpc_completion_queue* cq, void* tag,
grpc_completion_queue* cq, void* tag, grpc_error* error, grpc_error* error,
void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg, void (*done)(void* done_arg,
grpc_cq_completion* storage, bool internal) { grpc_cq_completion* storage),
void* done_arg, grpc_cq_completion* storage) {
GPR_TIMER_SCOPE("cq_end_op_for_next", 0); GPR_TIMER_SCOPE("cq_end_op_for_next", 0);
if (GRPC_TRACE_FLAG_ENABLED(grpc_api_trace) || if (GRPC_TRACE_FLAG_ENABLED(grpc_api_trace) ||
@ -751,10 +754,11 @@ static void cq_end_op_for_next(
/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a /* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a
* completion * completion
* type of GRPC_CQ_PLUCK) */ * type of GRPC_CQ_PLUCK) */
static void cq_end_op_for_pluck( static void cq_end_op_for_pluck(grpc_completion_queue* cq, void* tag,
grpc_completion_queue* cq, void* tag, grpc_error* error, grpc_error* error,
void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg, void (*done)(void* done_arg,
grpc_cq_completion* storage, bool internal) { grpc_cq_completion* storage),
void* done_arg, grpc_cq_completion* storage) {
GPR_TIMER_SCOPE("cq_end_op_for_pluck", 0); GPR_TIMER_SCOPE("cq_end_op_for_pluck", 0);
cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq); cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
@ -817,19 +821,15 @@ static void cq_end_op_for_pluck(
GRPC_ERROR_UNREF(error); GRPC_ERROR_UNREF(error);
} }
static void functor_callback(void* arg, grpc_error* error) {
auto* functor = static_cast<grpc_experimental_completion_queue_functor*>(arg);
functor->functor_run(functor, error == GRPC_ERROR_NONE);
}
/* Complete an event on a completion queue of type GRPC_CQ_CALLBACK */ /* Complete an event on a completion queue of type GRPC_CQ_CALLBACK */
static void cq_end_op_for_callback( static void cq_end_op_for_callback(
grpc_completion_queue* cq, void* tag, grpc_error* error, grpc_completion_queue* cq, void* tag, grpc_error* error,
void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg, void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
grpc_cq_completion* storage, bool internal) { grpc_cq_completion* storage) {
GPR_TIMER_SCOPE("cq_end_op_for_callback", 0); GPR_TIMER_SCOPE("cq_end_op_for_callback", 0);
cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq); cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq);
bool is_success = (error == GRPC_ERROR_NONE);
if (GRPC_TRACE_FLAG_ENABLED(grpc_api_trace) || if (GRPC_TRACE_FLAG_ENABLED(grpc_api_trace) ||
(GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures) && (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures) &&
@ -856,25 +856,16 @@ static void cq_end_op_for_callback(
cq_finish_shutdown_callback(cq); cq_finish_shutdown_callback(cq);
} }
auto* functor = static_cast<grpc_experimental_completion_queue_functor*>(tag);
if (internal) {
grpc_core::ApplicationCallbackExecCtx::Enqueue(functor,
(error == GRPC_ERROR_NONE));
} else {
GRPC_CLOSURE_SCHED(
GRPC_CLOSURE_CREATE(
functor_callback, functor,
grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT)),
GRPC_ERROR_REF(error));
}
GRPC_ERROR_UNREF(error); GRPC_ERROR_UNREF(error);
auto* functor = static_cast<grpc_experimental_completion_queue_functor*>(tag);
grpc_core::ApplicationCallbackExecCtx::Enqueue(functor, is_success);
} }
void grpc_cq_end_op(grpc_completion_queue* cq, void* tag, grpc_error* error, void grpc_cq_end_op(grpc_completion_queue* cq, void* tag, grpc_error* error,
void (*done)(void* done_arg, grpc_cq_completion* storage), void (*done)(void* done_arg, grpc_cq_completion* storage),
void* done_arg, grpc_cq_completion* storage, void* done_arg, grpc_cq_completion* storage) {
bool internal) { cq->vtable->end_op(cq, tag, error, done, done_arg, storage);
cq->vtable->end_op(cq, tag, error, done, done_arg, storage, internal);
} }
typedef struct { typedef struct {
@ -1352,11 +1343,7 @@ static void cq_finish_shutdown_callback(grpc_completion_queue* cq) {
GPR_ASSERT(cqd->shutdown_called); GPR_ASSERT(cqd->shutdown_called);
cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done); cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done);
GRPC_CLOSURE_SCHED( grpc_core::ApplicationCallbackExecCtx::Enqueue(callback, true);
GRPC_CLOSURE_CREATE(
functor_callback, callback,
grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT)),
GRPC_ERROR_NONE);
} }
static void cq_shutdown_callback(grpc_completion_queue* cq) { static void cq_shutdown_callback(grpc_completion_queue* cq) {

@ -77,8 +77,7 @@ bool grpc_cq_begin_op(grpc_completion_queue* cc, void* tag);
grpc_cq_begin_op */ grpc_cq_begin_op */
void grpc_cq_end_op(grpc_completion_queue* cc, void* tag, grpc_error* error, void grpc_cq_end_op(grpc_completion_queue* cc, void* tag, grpc_error* error,
void (*done)(void* done_arg, grpc_cq_completion* storage), void (*done)(void* done_arg, grpc_cq_completion* storage),
void* done_arg, grpc_cq_completion* storage, void* done_arg, grpc_cq_completion* storage);
bool internal = false);
grpc_pollset* grpc_cq_pollset(grpc_completion_queue* cc); grpc_pollset* grpc_cq_pollset(grpc_completion_queue* cc);

@ -513,7 +513,7 @@ static void publish_call(grpc_server* server, call_data* calld, size_t cq_idx,
} }
grpc_cq_end_op(calld->cq_new, rc->tag, GRPC_ERROR_NONE, done_request_event, grpc_cq_end_op(calld->cq_new, rc->tag, GRPC_ERROR_NONE, done_request_event,
rc, &rc->completion, true); rc, &rc->completion);
} }
static void publish_new_rpc(void* arg, grpc_error* error) { static void publish_new_rpc(void* arg, grpc_error* error) {

@ -23,7 +23,6 @@
#include <grpc/support/time.h> #include <grpc/support/time.h>
#include "src/core/lib/gpr/useful.h" #include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/memory.h" #include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/iomgr/iomgr.h"
#include "test/core/util/test_config.h" #include "test/core/util/test_config.h"
@ -360,19 +359,12 @@ static void test_pluck_after_shutdown(void) {
static void test_callback(void) { static void test_callback(void) {
grpc_completion_queue* cc; grpc_completion_queue* cc;
static void* tags[128]; void* tags[128];
grpc_cq_completion completions[GPR_ARRAY_SIZE(tags)]; grpc_cq_completion completions[GPR_ARRAY_SIZE(tags)];
grpc_cq_polling_type polling_types[] = { grpc_cq_polling_type polling_types[] = {
GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING}; GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
grpc_completion_queue_attributes attr; grpc_completion_queue_attributes attr;
unsigned i; unsigned i;
static gpr_mu mu, shutdown_mu;
static gpr_cv cv, shutdown_cv;
static int cb_counter;
gpr_mu_init(&mu);
gpr_mu_init(&shutdown_mu);
gpr_cv_init(&cv);
gpr_cv_init(&shutdown_cv);
LOG_TEST("test_callback"); LOG_TEST("test_callback");
@ -384,11 +376,7 @@ static void test_callback(void) {
} }
~ShutdownCallback() {} ~ShutdownCallback() {}
static void Run(grpc_experimental_completion_queue_functor* cb, int ok) { static void Run(grpc_experimental_completion_queue_functor* cb, int ok) {
gpr_mu_lock(&shutdown_mu);
*static_cast<ShutdownCallback*>(cb)->done_ = static_cast<bool>(ok); *static_cast<ShutdownCallback*>(cb)->done_ = static_cast<bool>(ok);
// Signal when the shutdown callback is completed.
gpr_cv_signal(&shutdown_cv);
gpr_mu_unlock(&shutdown_mu);
} }
private: private:
@ -403,9 +391,9 @@ static void test_callback(void) {
for (size_t pidx = 0; pidx < GPR_ARRAY_SIZE(polling_types); pidx++) { for (size_t pidx = 0; pidx < GPR_ARRAY_SIZE(polling_types); pidx++) {
int sumtags = 0; int sumtags = 0;
int counter = 0; int counter = 0;
cb_counter = 0;
{ {
// reset exec_ctx types // reset exec_ctx types
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx; grpc_core::ExecCtx exec_ctx;
attr.cq_polling_type = polling_types[pidx]; attr.cq_polling_type = polling_types[pidx];
cc = grpc_completion_queue_create( cc = grpc_completion_queue_create(
@ -421,13 +409,7 @@ static void test_callback(void) {
int ok) { int ok) {
GPR_ASSERT(static_cast<bool>(ok)); GPR_ASSERT(static_cast<bool>(ok));
auto* callback = static_cast<TagCallback*>(cb); auto* callback = static_cast<TagCallback*>(cb);
gpr_mu_lock(&mu);
cb_counter++;
*callback->counter_ += callback->tag_; *callback->counter_ += callback->tag_;
if (cb_counter == GPR_ARRAY_SIZE(tags)) {
gpr_cv_signal(&cv);
}
gpr_mu_unlock(&mu);
grpc_core::Delete(callback); grpc_core::Delete(callback);
}; };
@ -447,34 +429,12 @@ static void test_callback(void) {
nullptr, &completions[i]); nullptr, &completions[i]);
} }
gpr_mu_lock(&mu);
while (cb_counter != GPR_ARRAY_SIZE(tags)) {
// Wait for all the callbacks to complete.
gpr_cv_wait(&cv, &mu, gpr_inf_future(GPR_CLOCK_REALTIME));
}
gpr_mu_unlock(&mu);
shutdown_and_destroy(cc); shutdown_and_destroy(cc);
gpr_mu_lock(&shutdown_mu);
while (!got_shutdown) {
// Wait for the shutdown callback to complete.
gpr_cv_wait(&shutdown_cv, &shutdown_mu,
gpr_inf_future(GPR_CLOCK_REALTIME));
}
gpr_mu_unlock(&shutdown_mu);
} }
// Run the assertions to check if the test ran successfully.
GPR_ASSERT(sumtags == counter); GPR_ASSERT(sumtags == counter);
GPR_ASSERT(got_shutdown); GPR_ASSERT(got_shutdown);
got_shutdown = false; got_shutdown = false;
} }
gpr_cv_destroy(&cv);
gpr_cv_destroy(&shutdown_cv);
gpr_mu_destroy(&mu);
gpr_mu_destroy(&shutdown_mu);
} }
struct thread_state { struct thread_state {

@ -374,34 +374,6 @@ TEST_P(ClientCallbackEnd2endTest, SimpleRpc) {
SendRpcs(1, false); SendRpcs(1, false);
} }
TEST_P(ClientCallbackEnd2endTest, SimpleRpcUnderLock) {
MAYBE_SKIP_TEST;
ResetStub();
std::mutex mu;
std::condition_variable cv;
bool done = false;
EchoRequest request;
request.set_message("Hello locked world.");
EchoResponse response;
ClientContext cli_ctx;
{
std::lock_guard<std::mutex> l(mu);
stub_->experimental_async()->Echo(
&cli_ctx, &request, &response,
[&mu, &cv, &done, &request, &response](Status s) {
std::lock_guard<std::mutex> l(mu);
EXPECT_TRUE(s.ok());
EXPECT_EQ(request.message(), response.message());
done = true;
cv.notify_one();
});
}
std::unique_lock<std::mutex> l(mu);
while (!done) {
cv.wait(l);
}
}
TEST_P(ClientCallbackEnd2endTest, SequentialRpcs) { TEST_P(ClientCallbackEnd2endTest, SequentialRpcs) {
MAYBE_SKIP_TEST; MAYBE_SKIP_TEST;
ResetStub(); ResetStub();

@ -150,9 +150,6 @@ static void shutdown_and_destroy(grpc_completion_queue* cc) {
grpc_completion_queue_destroy(cc); grpc_completion_queue_destroy(cc);
} }
static gpr_mu shutdown_mu, mu;
static gpr_cv shutdown_cv, cv;
// Tag completion queue iterate times // Tag completion queue iterate times
class TagCallback : public grpc_experimental_completion_queue_functor { class TagCallback : public grpc_experimental_completion_queue_functor {
public: public:
@ -161,11 +158,8 @@ class TagCallback : public grpc_experimental_completion_queue_functor {
} }
~TagCallback() {} ~TagCallback() {}
static void Run(grpc_experimental_completion_queue_functor* cb, int ok) { static void Run(grpc_experimental_completion_queue_functor* cb, int ok) {
gpr_mu_lock(&mu);
GPR_ASSERT(static_cast<bool>(ok)); GPR_ASSERT(static_cast<bool>(ok));
*static_cast<TagCallback*>(cb)->iter_ += 1; *static_cast<TagCallback*>(cb)->iter_ += 1;
gpr_cv_signal(&cv);
gpr_mu_unlock(&mu);
}; };
private: private:
@ -180,10 +174,7 @@ class ShutdownCallback : public grpc_experimental_completion_queue_functor {
} }
~ShutdownCallback() {} ~ShutdownCallback() {}
static void Run(grpc_experimental_completion_queue_functor* cb, int ok) { static void Run(grpc_experimental_completion_queue_functor* cb, int ok) {
gpr_mu_lock(&shutdown_mu);
*static_cast<ShutdownCallback*>(cb)->done_ = static_cast<bool>(ok); *static_cast<ShutdownCallback*>(cb)->done_ = static_cast<bool>(ok);
gpr_cv_signal(&shutdown_cv);
gpr_mu_unlock(&shutdown_mu);
} }
private: private:
@ -192,12 +183,8 @@ class ShutdownCallback : public grpc_experimental_completion_queue_functor {
static void BM_Callback_CQ_Pass1Core(benchmark::State& state) { static void BM_Callback_CQ_Pass1Core(benchmark::State& state) {
TrackCounters track_counters; TrackCounters track_counters;
int iteration = 0, current_iterations = 0; int iteration = 0;
TagCallback tag_cb(&iteration); TagCallback tag_cb(&iteration);
gpr_mu_init(&mu);
gpr_cv_init(&cv);
gpr_mu_init(&shutdown_mu);
gpr_cv_init(&shutdown_cv);
bool got_shutdown = false; bool got_shutdown = false;
ShutdownCallback shutdown_cb(&got_shutdown); ShutdownCallback shutdown_cb(&got_shutdown);
grpc_completion_queue* cc = grpc_completion_queue* cc =
@ -211,29 +198,9 @@ static void BM_Callback_CQ_Pass1Core(benchmark::State& state) {
nullptr, &completion); nullptr, &completion);
} }
shutdown_and_destroy(cc); shutdown_and_destroy(cc);
gpr_mu_lock(&mu);
current_iterations = static_cast<int>(state.iterations());
while (current_iterations != iteration) {
// Wait for all the callbacks to complete.
gpr_cv_wait(&cv, &mu, gpr_inf_future(GPR_CLOCK_REALTIME));
}
gpr_mu_unlock(&mu);
gpr_mu_lock(&shutdown_mu);
while (!got_shutdown) {
// Wait for the shutdown callback to complete.
gpr_cv_wait(&shutdown_cv, &shutdown_mu, gpr_inf_future(GPR_CLOCK_REALTIME));
}
gpr_mu_unlock(&shutdown_mu);
GPR_ASSERT(got_shutdown); GPR_ASSERT(got_shutdown);
GPR_ASSERT(iteration == static_cast<int>(state.iterations())); GPR_ASSERT(iteration == static_cast<int>(state.iterations()));
track_counters.Finish(state); track_counters.Finish(state);
gpr_cv_destroy(&cv);
gpr_mu_destroy(&mu);
gpr_cv_destroy(&shutdown_cv);
gpr_mu_destroy(&shutdown_mu);
} }
BENCHMARK(BM_Callback_CQ_Pass1Core); BENCHMARK(BM_Callback_CQ_Pass1Core);

@ -115,7 +115,7 @@ class BidiClient
int msgs_size_; int msgs_size_;
std::mutex mu; std::mutex mu;
std::condition_variable cv; std::condition_variable cv;
bool done = false; bool done;
}; };
template <class Fixture, class ClientContextMutator, class ServerContextMutator> template <class Fixture, class ClientContextMutator, class ServerContextMutator>

Loading…
Cancel
Save