Add a mechanism for tagging threads that might be owned by

calls/channels

Use it to ensure we don't delete the call from that thread: doing so
would create a cycle that's kind of bad.
reviewable/pr8842/r4
Craig Tiller 8 years ago
parent 45d318351b
commit 8cf88f1eb1
  1. 15
      src/core/lib/iomgr/exec_ctx.c
  2. 15
      src/core/lib/iomgr/exec_ctx.h
  3. 4
      src/core/lib/security/credentials/plugin/plugin_credentials.c
  4. 8
      src/core/lib/surface/completion_queue.c
  5. 11
      src/core/lib/transport/transport.c

@ -42,11 +42,16 @@
#include "src/core/lib/profiling/timers.h"
bool grpc_exec_ctx_ready_to_finish(grpc_exec_ctx *exec_ctx) {
if (!exec_ctx->cached_ready_to_finish) {
exec_ctx->cached_ready_to_finish = exec_ctx->check_ready_to_finish(
exec_ctx, exec_ctx->check_ready_to_finish_arg);
if ((exec_ctx->flags & GRPC_EXEC_CTX_FLAG_IS_FINISHED) == 0) {
if (exec_ctx->check_ready_to_finish(exec_ctx,
exec_ctx->check_ready_to_finish_arg)) {
exec_ctx->flags |= GRPC_EXEC_CTX_FLAG_IS_FINISHED;
return true;
}
return false;
} else {
return true;
}
return exec_ctx->cached_ready_to_finish;
}
bool grpc_never_ready_to_finish(grpc_exec_ctx *exec_ctx, void *arg_ignored) {
@ -82,7 +87,7 @@ bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) {
}
void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx) {
exec_ctx->cached_ready_to_finish = true;
exec_ctx->flags |= GRPC_EXEC_CTX_FLAG_IS_FINISHED;
grpc_exec_ctx_flush(exec_ctx);
}

@ -43,6 +43,13 @@
typedef struct grpc_workqueue grpc_workqueue;
typedef struct grpc_combiner grpc_combiner;
/* This exec_ctx is ready to return: either pre-populated, or cached as soon as
the finish_check returns true */
#define GRPC_EXEC_CTX_FLAG_IS_FINISHED 1
/* The exec_ctx's thread is (potentially) owned by a call or channel: care
should be given to not delete said call/channel from this exec_ctx */
#define GRPC_EXEC_CTX_FLAG_THREAD_RESOURCE_LOOP 2
/** Execution context.
* A bag of data that collects information along a callstack.
* Generally created at public API entry points, and passed down as
@ -69,20 +76,20 @@ struct grpc_exec_ctx {
grpc_combiner *active_combiner;
/** last active combiner in the active combiner list */
grpc_combiner *last_combiner;
bool cached_ready_to_finish;
uintptr_t flags;
void *check_ready_to_finish_arg;
bool (*check_ready_to_finish)(grpc_exec_ctx *exec_ctx, void *arg);
};
/* initializer for grpc_exec_ctx:
prefer to use GRPC_EXEC_CTX_INIT whenever possible */
#define GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK(finish_check, finish_check_arg) \
{ GRPC_CLOSURE_LIST_INIT, NULL, NULL, false, finish_check_arg, finish_check }
#define GRPC_EXEC_CTX_INITIALIZER(flags, finish_check, finish_check_arg) \
{ GRPC_CLOSURE_LIST_INIT, NULL, NULL, flags, finish_check_arg, finish_check }
/* initialize an execution context at the top level of an API call into grpc
(this is safe to use elsewhere, though possibly not as efficient) */
#define GRPC_EXEC_CTX_INIT \
GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK(grpc_always_ready_to_finish, NULL)
GRPC_EXEC_CTX_INITIALIZER(GRPC_EXEC_CTX_FLAG_IS_FINISHED, NULL, NULL)
extern grpc_closure_scheduler *grpc_schedule_on_exec_ctx;

@ -65,7 +65,9 @@ static void plugin_md_request_metadata_ready(void *request,
grpc_status_code status,
const char *error_details) {
/* called from application code */
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INITIALIZER(
GRPC_EXEC_CTX_FLAG_IS_FINISHED | GRPC_EXEC_CTX_FLAG_THREAD_RESOURCE_LOOP,
NULL, NULL);
grpc_metadata_plugin_request *r = (grpc_metadata_plugin_request *)request;
if (status != GRPC_STATUS_OK) {
if (error_details != NULL) {

@ -402,8 +402,8 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
.stolen_completion = NULL,
.tag = NULL,
.first_loop = true};
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK(
cq_is_next_finished, &is_finished_arg);
grpc_exec_ctx exec_ctx =
GRPC_EXEC_CTX_INITIALIZER(0, cq_is_next_finished, &is_finished_arg);
for (;;) {
if (is_finished_arg.stolen_completion != NULL) {
gpr_mu_unlock(cc->mu);
@ -571,8 +571,8 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
.stolen_completion = NULL,
.tag = tag,
.first_loop = true};
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK(
cq_is_pluck_finished, &is_finished_arg);
grpc_exec_ctx exec_ctx =
GRPC_EXEC_CTX_INITIALIZER(0, cq_is_pluck_finished, &is_finished_arg);
for (;;) {
if (is_finished_arg.stolen_completion != NULL) {
gpr_mu_unlock(cc->mu);

@ -40,6 +40,7 @@
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/support/string.h"
@ -69,6 +70,16 @@ void grpc_stream_unref(grpc_exec_ctx *exec_ctx,
grpc_stream_refcount *refcount) {
#endif
if (gpr_unref(&refcount->refs)) {
if (exec_ctx->flags & GRPC_EXEC_CTX_FLAG_THREAD_RESOURCE_LOOP) {
/* Ick.
The thread we're running on MAY be owned (indirectly) by a call-stack.
If that's the case, destroying the call-stack MAY try to destroy the
thread, which is a tangled mess that we just don't want to ever have to
cope with.
Throw this over to the executor (on a core-owned thread) and process it
there. */
refcount->destroy.scheduler = grpc_executor_scheduler;
}
grpc_closure_sched(exec_ctx, &refcount->destroy, GRPC_ERROR_NONE);
}
}

Loading…
Cancel
Save