Merge pull request #12369 from markdroth/call_combiner
Second attempt at call combiner PRpull/12436/head
commit
8941f607d6
66 changed files with 1366 additions and 903 deletions
@ -0,0 +1,202 @@ |
|||||||
|
/*
|
||||||
|
* |
||||||
|
* Copyright 2017 gRPC authors. |
||||||
|
* |
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||||
|
* you may not use this file except in compliance with the License. |
||||||
|
* You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
* |
||||||
|
*/ |
||||||
|
|
||||||
|
#include "src/core/lib/iomgr/call_combiner.h" |
||||||
|
|
||||||
|
#include <grpc/support/log.h> |
||||||
|
|
||||||
|
grpc_tracer_flag grpc_call_combiner_trace = |
||||||
|
GRPC_TRACER_INITIALIZER(false, "call_combiner"); |
||||||
|
|
||||||
|
static grpc_error* decode_cancel_state_error(gpr_atm cancel_state) { |
||||||
|
if (cancel_state & 1) { |
||||||
|
return (grpc_error*)(cancel_state & ~(gpr_atm)1); |
||||||
|
} |
||||||
|
return GRPC_ERROR_NONE; |
||||||
|
} |
||||||
|
|
||||||
|
static gpr_atm encode_cancel_state_error(grpc_error* error) { |
||||||
|
return (gpr_atm)1 | (gpr_atm)error; |
||||||
|
} |
||||||
|
|
||||||
|
void grpc_call_combiner_init(grpc_call_combiner* call_combiner) { |
||||||
|
gpr_mpscq_init(&call_combiner->queue); |
||||||
|
} |
||||||
|
|
||||||
|
void grpc_call_combiner_destroy(grpc_call_combiner* call_combiner) { |
||||||
|
gpr_mpscq_destroy(&call_combiner->queue); |
||||||
|
GRPC_ERROR_UNREF(decode_cancel_state_error(call_combiner->cancel_state)); |
||||||
|
} |
||||||
|
|
||||||
|
#ifndef NDEBUG |
||||||
|
#define DEBUG_ARGS , const char *file, int line |
||||||
|
#define DEBUG_FMT_STR "%s:%d: " |
||||||
|
#define DEBUG_FMT_ARGS , file, line |
||||||
|
#else |
||||||
|
#define DEBUG_ARGS |
||||||
|
#define DEBUG_FMT_STR |
||||||
|
#define DEBUG_FMT_ARGS |
||||||
|
#endif |
||||||
|
|
||||||
|
void grpc_call_combiner_start(grpc_exec_ctx* exec_ctx, |
||||||
|
grpc_call_combiner* call_combiner, |
||||||
|
grpc_closure* closure, |
||||||
|
grpc_error* error DEBUG_ARGS, |
||||||
|
const char* reason) { |
||||||
|
if (GRPC_TRACER_ON(grpc_call_combiner_trace)) { |
||||||
|
gpr_log(GPR_DEBUG, |
||||||
|
"==> grpc_call_combiner_start() [%p] closure=%p [" DEBUG_FMT_STR |
||||||
|
"%s] error=%s", |
||||||
|
call_combiner, closure DEBUG_FMT_ARGS, reason, |
||||||
|
grpc_error_string(error)); |
||||||
|
} |
||||||
|
size_t prev_size = |
||||||
|
(size_t)gpr_atm_full_fetch_add(&call_combiner->size, (gpr_atm)1); |
||||||
|
if (GRPC_TRACER_ON(grpc_call_combiner_trace)) { |
||||||
|
gpr_log(GPR_DEBUG, " size: %" PRIdPTR " -> %" PRIdPTR, prev_size, |
||||||
|
prev_size + 1); |
||||||
|
} |
||||||
|
if (prev_size == 0) { |
||||||
|
if (GRPC_TRACER_ON(grpc_call_combiner_trace)) { |
||||||
|
gpr_log(GPR_DEBUG, " EXECUTING IMMEDIATELY"); |
||||||
|
} |
||||||
|
// Queue was empty, so execute this closure immediately.
|
||||||
|
GRPC_CLOSURE_SCHED(exec_ctx, closure, error); |
||||||
|
} else { |
||||||
|
if (GRPC_TRACER_ON(grpc_call_combiner_trace)) { |
||||||
|
gpr_log(GPR_INFO, " QUEUING"); |
||||||
|
} |
||||||
|
// Queue was not empty, so add closure to queue.
|
||||||
|
closure->error_data.error = error; |
||||||
|
gpr_mpscq_push(&call_combiner->queue, (gpr_mpscq_node*)closure); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
void grpc_call_combiner_stop(grpc_exec_ctx* exec_ctx, |
||||||
|
grpc_call_combiner* call_combiner DEBUG_ARGS, |
||||||
|
const char* reason) { |
||||||
|
if (GRPC_TRACER_ON(grpc_call_combiner_trace)) { |
||||||
|
gpr_log(GPR_DEBUG, |
||||||
|
"==> grpc_call_combiner_stop() [%p] [" DEBUG_FMT_STR "%s]", |
||||||
|
call_combiner DEBUG_FMT_ARGS, reason); |
||||||
|
} |
||||||
|
size_t prev_size = |
||||||
|
(size_t)gpr_atm_full_fetch_add(&call_combiner->size, (gpr_atm)-1); |
||||||
|
if (GRPC_TRACER_ON(grpc_call_combiner_trace)) { |
||||||
|
gpr_log(GPR_DEBUG, " size: %" PRIdPTR " -> %" PRIdPTR, prev_size, |
||||||
|
prev_size - 1); |
||||||
|
} |
||||||
|
GPR_ASSERT(prev_size >= 1); |
||||||
|
if (prev_size > 1) { |
||||||
|
while (true) { |
||||||
|
if (GRPC_TRACER_ON(grpc_call_combiner_trace)) { |
||||||
|
gpr_log(GPR_DEBUG, " checking queue"); |
||||||
|
} |
||||||
|
bool empty; |
||||||
|
grpc_closure* closure = (grpc_closure*)gpr_mpscq_pop_and_check_end( |
||||||
|
&call_combiner->queue, &empty); |
||||||
|
if (closure == NULL) { |
||||||
|
// This can happen either due to a race condition within the mpscq
|
||||||
|
// code or because of a race with grpc_call_combiner_start().
|
||||||
|
if (GRPC_TRACER_ON(grpc_call_combiner_trace)) { |
||||||
|
gpr_log(GPR_DEBUG, " queue returned no result; checking again"); |
||||||
|
} |
||||||
|
continue; |
||||||
|
} |
||||||
|
if (GRPC_TRACER_ON(grpc_call_combiner_trace)) { |
||||||
|
gpr_log(GPR_DEBUG, " EXECUTING FROM QUEUE: closure=%p error=%s", |
||||||
|
closure, grpc_error_string(closure->error_data.error)); |
||||||
|
} |
||||||
|
GRPC_CLOSURE_SCHED(exec_ctx, closure, closure->error_data.error); |
||||||
|
break; |
||||||
|
} |
||||||
|
} else if (GRPC_TRACER_ON(grpc_call_combiner_trace)) { |
||||||
|
gpr_log(GPR_DEBUG, " queue empty"); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
void grpc_call_combiner_set_notify_on_cancel(grpc_exec_ctx* exec_ctx, |
||||||
|
grpc_call_combiner* call_combiner, |
||||||
|
grpc_closure* closure) { |
||||||
|
while (true) { |
||||||
|
// Decode original state.
|
||||||
|
gpr_atm original_state = gpr_atm_acq_load(&call_combiner->cancel_state); |
||||||
|
grpc_error* original_error = decode_cancel_state_error(original_state); |
||||||
|
// If error is set, invoke the cancellation closure immediately.
|
||||||
|
// Otherwise, store the new closure.
|
||||||
|
if (original_error != GRPC_ERROR_NONE) { |
||||||
|
if (GRPC_TRACER_ON(grpc_call_combiner_trace)) { |
||||||
|
gpr_log(GPR_DEBUG, |
||||||
|
"call_combiner=%p: scheduling notify_on_cancel callback=%p " |
||||||
|
"for pre-existing cancellation", |
||||||
|
call_combiner, closure); |
||||||
|
} |
||||||
|
GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_REF(original_error)); |
||||||
|
break; |
||||||
|
} else { |
||||||
|
if (gpr_atm_full_cas(&call_combiner->cancel_state, original_state, |
||||||
|
(gpr_atm)closure)) { |
||||||
|
if (GRPC_TRACER_ON(grpc_call_combiner_trace)) { |
||||||
|
gpr_log(GPR_DEBUG, "call_combiner=%p: setting notify_on_cancel=%p", |
||||||
|
call_combiner, closure); |
||||||
|
} |
||||||
|
// If we replaced an earlier closure, invoke the original
|
||||||
|
// closure with GRPC_ERROR_NONE. This allows callers to clean
|
||||||
|
// up any resources they may be holding for the callback.
|
||||||
|
if (original_state != 0) { |
||||||
|
closure = (grpc_closure*)original_state; |
||||||
|
if (GRPC_TRACER_ON(grpc_call_combiner_trace)) { |
||||||
|
gpr_log(GPR_DEBUG, |
||||||
|
"call_combiner=%p: scheduling old cancel callback=%p", |
||||||
|
call_combiner, closure); |
||||||
|
} |
||||||
|
GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_NONE); |
||||||
|
} |
||||||
|
break; |
||||||
|
} |
||||||
|
} |
||||||
|
// cas failed, try again.
|
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
void grpc_call_combiner_cancel(grpc_exec_ctx* exec_ctx, |
||||||
|
grpc_call_combiner* call_combiner, |
||||||
|
grpc_error* error) { |
||||||
|
while (true) { |
||||||
|
gpr_atm original_state = gpr_atm_acq_load(&call_combiner->cancel_state); |
||||||
|
grpc_error* original_error = decode_cancel_state_error(original_state); |
||||||
|
if (original_error != GRPC_ERROR_NONE) { |
||||||
|
GRPC_ERROR_UNREF(error); |
||||||
|
break; |
||||||
|
} |
||||||
|
if (gpr_atm_full_cas(&call_combiner->cancel_state, original_state, |
||||||
|
encode_cancel_state_error(error))) { |
||||||
|
if (original_state != 0) { |
||||||
|
grpc_closure* notify_on_cancel = (grpc_closure*)original_state; |
||||||
|
if (GRPC_TRACER_ON(grpc_call_combiner_trace)) { |
||||||
|
gpr_log(GPR_DEBUG, |
||||||
|
"call_combiner=%p: scheduling notify_on_cancel callback=%p", |
||||||
|
call_combiner, notify_on_cancel); |
||||||
|
} |
||||||
|
GRPC_CLOSURE_SCHED(exec_ctx, notify_on_cancel, GRPC_ERROR_REF(error)); |
||||||
|
} |
||||||
|
break; |
||||||
|
} |
||||||
|
// cas failed, try again.
|
||||||
|
} |
||||||
|
} |
@ -0,0 +1,121 @@ |
|||||||
|
/*
|
||||||
|
* |
||||||
|
* Copyright 2017 gRPC authors. |
||||||
|
* |
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||||
|
* you may not use this file except in compliance with the License. |
||||||
|
* You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
* |
||||||
|
*/ |
||||||
|
|
||||||
|
#ifndef GRPC_CORE_LIB_IOMGR_CALL_COMBINER_H |
||||||
|
#define GRPC_CORE_LIB_IOMGR_CALL_COMBINER_H |
||||||
|
|
||||||
|
#include <stddef.h> |
||||||
|
|
||||||
|
#include <grpc/support/atm.h> |
||||||
|
|
||||||
|
#include "src/core/lib/iomgr/closure.h" |
||||||
|
#include "src/core/lib/iomgr/exec_ctx.h" |
||||||
|
#include "src/core/lib/support/mpscq.h" |
||||||
|
|
||||||
|
// A simple, lock-free mechanism for serializing activity related to a
|
||||||
|
// single call. This is similar to a combiner but is more lightweight.
|
||||||
|
//
|
||||||
|
// It requires the callback (or, in the common case where the callback
|
||||||
|
// actually kicks off a chain of callbacks, the last callback in that
|
||||||
|
// chain) to explicitly indicate (by calling GRPC_CALL_COMBINER_STOP())
|
||||||
|
// when it is done with the action that was kicked off by the original
|
||||||
|
// callback.
|
||||||
|
|
||||||
|
extern grpc_tracer_flag grpc_call_combiner_trace; |
||||||
|
|
||||||
|
typedef struct { |
||||||
|
gpr_atm size; // size_t, num closures in queue or currently executing
|
||||||
|
gpr_mpscq queue; |
||||||
|
// Either 0 (if not cancelled and no cancellation closure set),
|
||||||
|
// a grpc_closure* (if the lowest bit is 0),
|
||||||
|
// or a grpc_error* (if the lowest bit is 1).
|
||||||
|
gpr_atm cancel_state; |
||||||
|
} grpc_call_combiner; |
||||||
|
|
||||||
|
// Assumes memory was initialized to zero.
|
||||||
|
void grpc_call_combiner_init(grpc_call_combiner* call_combiner); |
||||||
|
|
||||||
|
void grpc_call_combiner_destroy(grpc_call_combiner* call_combiner); |
||||||
|
|
||||||
|
#ifndef NDEBUG |
||||||
|
#define GRPC_CALL_COMBINER_START(exec_ctx, call_combiner, closure, error, \ |
||||||
|
reason) \
|
||||||
|
grpc_call_combiner_start((exec_ctx), (call_combiner), (closure), (error), \
|
||||||
|
__FILE__, __LINE__, (reason)) |
||||||
|
#define GRPC_CALL_COMBINER_STOP(exec_ctx, call_combiner, reason) \ |
||||||
|
grpc_call_combiner_stop((exec_ctx), (call_combiner), __FILE__, __LINE__, \
|
||||||
|
(reason)) |
||||||
|
/// Starts processing \a closure on \a call_combiner.
|
||||||
|
void grpc_call_combiner_start(grpc_exec_ctx* exec_ctx, |
||||||
|
grpc_call_combiner* call_combiner, |
||||||
|
grpc_closure* closure, grpc_error* error, |
||||||
|
const char* file, int line, const char* reason); |
||||||
|
/// Yields the call combiner to the next closure in the queue, if any.
|
||||||
|
void grpc_call_combiner_stop(grpc_exec_ctx* exec_ctx, |
||||||
|
grpc_call_combiner* call_combiner, |
||||||
|
const char* file, int line, const char* reason); |
||||||
|
#else |
||||||
|
#define GRPC_CALL_COMBINER_START(exec_ctx, call_combiner, closure, error, \ |
||||||
|
reason) \
|
||||||
|
grpc_call_combiner_start((exec_ctx), (call_combiner), (closure), (error), \
|
||||||
|
(reason)) |
||||||
|
#define GRPC_CALL_COMBINER_STOP(exec_ctx, call_combiner, reason) \ |
||||||
|
grpc_call_combiner_stop((exec_ctx), (call_combiner), (reason)) |
||||||
|
/// Starts processing \a closure on \a call_combiner.
|
||||||
|
void grpc_call_combiner_start(grpc_exec_ctx* exec_ctx, |
||||||
|
grpc_call_combiner* call_combiner, |
||||||
|
grpc_closure* closure, grpc_error* error, |
||||||
|
const char* reason); |
||||||
|
/// Yields the call combiner to the next closure in the queue, if any.
|
||||||
|
void grpc_call_combiner_stop(grpc_exec_ctx* exec_ctx, |
||||||
|
grpc_call_combiner* call_combiner, |
||||||
|
const char* reason); |
||||||
|
#endif |
||||||
|
|
||||||
|
/// Registers \a closure to be invoked by \a call_combiner when
|
||||||
|
/// grpc_call_combiner_cancel() is called.
|
||||||
|
///
|
||||||
|
/// Once a closure is registered, it will always be scheduled exactly
|
||||||
|
/// once; this allows the closure to hold references that will be freed
|
||||||
|
/// regardless of whether or not the call was cancelled. If a cancellation
|
||||||
|
/// does occur, the closure will be scheduled with the cancellation error;
|
||||||
|
/// otherwise, it will be scheduled with GRPC_ERROR_NONE.
|
||||||
|
///
|
||||||
|
/// The closure will be scheduled in the following cases:
|
||||||
|
/// - If grpc_call_combiner_cancel() was called prior to registering the
|
||||||
|
/// closure, it will be scheduled immediately with the cancelation error.
|
||||||
|
/// - If grpc_call_combiner_cancel() is called after registering the
|
||||||
|
/// closure, the closure will be scheduled with the cancellation error.
|
||||||
|
/// - If grpc_call_combiner_set_notify_on_cancel() is called again to
|
||||||
|
/// register a new cancellation closure, the previous cancellation
|
||||||
|
/// closure will be scheduled with GRPC_ERROR_NONE.
|
||||||
|
///
|
||||||
|
/// If \a closure is NULL, then no closure will be invoked on
|
||||||
|
/// cancellation; this effectively unregisters the previously set closure.
|
||||||
|
/// However, most filters will not need to explicitly unregister their
|
||||||
|
/// callbacks, as this is done automatically when the call is destroyed.
|
||||||
|
void grpc_call_combiner_set_notify_on_cancel(grpc_exec_ctx* exec_ctx, |
||||||
|
grpc_call_combiner* call_combiner, |
||||||
|
grpc_closure* closure); |
||||||
|
|
||||||
|
/// Indicates that the call has been cancelled.
|
||||||
|
void grpc_call_combiner_cancel(grpc_exec_ctx* exec_ctx, |
||||||
|
grpc_call_combiner* call_combiner, |
||||||
|
grpc_error* error); |
||||||
|
|
||||||
|
#endif /* GRPC_CORE_LIB_IOMGR_CALL_COMBINER_H */ |
Loading…
Reference in new issue