Merge pull request #16418 from yashykt/status

Status propagation changes
pull/16524/merge
Yash Tibrewal 6 years ago committed by GitHub
commit 774deae2ba
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 9
      src/core/ext/filters/http/client/http_client_filter.cc
  2. 25
      src/core/ext/filters/http/server/http_server_filter.cc
  3. 36
      src/core/ext/filters/message_size/message_size_filter.cc
  4. 21
      src/core/lib/iomgr/error.cc
  5. 8
      src/core/lib/iomgr/error.h
  6. 25
      src/core/lib/security/transport/server_auth_filter.cc
  7. 362
      src/core/lib/surface/call.cc
  8. 23
      src/core/lib/surface/server.cc
  9. 11
      test/core/iomgr/error_test.cc

@ -51,6 +51,7 @@ struct call_data {
grpc_linked_mdelem user_agent;
// State for handling recv_initial_metadata ops.
grpc_metadata_batch* recv_initial_metadata;
grpc_error* recv_initial_metadata_error;
grpc_closure* original_recv_initial_metadata_ready;
grpc_closure recv_initial_metadata_ready;
// State for handling recv_trailing_metadata ops.
@ -147,6 +148,7 @@ static void recv_initial_metadata_ready(void* user_data, grpc_error* error) {
call_data* calld = static_cast<call_data*>(elem->call_data);
if (error == GRPC_ERROR_NONE) {
error = client_filter_incoming_metadata(elem, calld->recv_initial_metadata);
calld->recv_initial_metadata_error = GRPC_ERROR_REF(error);
} else {
GRPC_ERROR_REF(error);
}
@ -162,6 +164,8 @@ static void recv_trailing_metadata_ready(void* user_data, grpc_error* error) {
} else {
GRPC_ERROR_REF(error);
}
error = grpc_error_add_child(
error, GRPC_ERROR_REF(calld->recv_initial_metadata_error));
GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, error);
}
@ -434,7 +438,10 @@ static grpc_error* init_call_elem(grpc_call_element* elem,
/* Destructor for call_data */
static void destroy_call_elem(grpc_call_element* elem,
const grpc_call_final_info* final_info,
grpc_closure* ignored) {}
grpc_closure* ignored) {
call_data* calld = static_cast<call_data*>(elem->call_data);
GRPC_ERROR_UNREF(calld->recv_initial_metadata_error);
}
static grpc_mdelem scheme_from_args(const grpc_channel_args* args) {
unsigned i;

@ -50,6 +50,7 @@ struct call_data {
// State for intercepting recv_initial_metadata.
grpc_closure recv_initial_metadata_ready;
grpc_error* recv_initial_metadata_ready_error;
grpc_closure* original_recv_initial_metadata_ready;
grpc_metadata_batch* recv_initial_metadata;
uint32_t* recv_initial_metadata_flags;
@ -60,6 +61,9 @@ struct call_data {
grpc_closure recv_message_ready;
grpc_core::OrphanablePtr<grpc_core::ByteStream>* recv_message;
bool seen_recv_message_ready;
grpc_closure recv_trailing_metadata_ready;
grpc_closure* original_recv_trailing_metadata_ready;
};
} // namespace
@ -267,6 +271,7 @@ static void hs_recv_initial_metadata_ready(void* user_data, grpc_error* err) {
calld->seen_recv_initial_metadata_ready = true;
if (err == GRPC_ERROR_NONE) {
err = hs_filter_incoming_metadata(elem, calld->recv_initial_metadata);
calld->recv_initial_metadata_ready_error = GRPC_ERROR_REF(err);
if (calld->seen_recv_message_ready) {
// We've already seen the recv_message callback, but we previously
// deferred it, so we need to return it here.
@ -313,6 +318,15 @@ static void hs_recv_message_ready(void* user_data, grpc_error* err) {
}
}
static void hs_recv_trailing_metadata_ready(void* user_data, grpc_error* err) {
grpc_call_element* elem = static_cast<grpc_call_element*>(user_data);
call_data* calld = static_cast<call_data*>(elem->call_data);
err = grpc_error_add_child(
GRPC_ERROR_REF(err),
GRPC_ERROR_REF(calld->recv_initial_metadata_ready_error));
GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, err);
}
static grpc_error* hs_mutate_op(grpc_call_element* elem,
grpc_transport_stream_op_batch* op) {
/* grab pointers to our data from the call element */
@ -357,6 +371,13 @@ static grpc_error* hs_mutate_op(grpc_call_element* elem,
op->payload->recv_message.recv_message_ready = &calld->recv_message_ready;
}
if (op->recv_trailing_metadata) {
calld->original_recv_trailing_metadata_ready =
op->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
op->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
&calld->recv_trailing_metadata_ready;
}
if (op->send_trailing_metadata) {
grpc_error* error = hs_filter_outgoing_metadata(
elem, op->payload->send_trailing_metadata.send_trailing_metadata);
@ -389,6 +410,9 @@ static grpc_error* hs_init_call_elem(grpc_call_element* elem,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&calld->recv_message_ready, hs_recv_message_ready, elem,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready,
hs_recv_trailing_metadata_ready, elem,
grpc_schedule_on_exec_ctx);
return GRPC_ERROR_NONE;
}
@ -397,6 +421,7 @@ static void hs_destroy_call_elem(grpc_call_element* elem,
const grpc_call_final_info* final_info,
grpc_closure* ignored) {
call_data* calld = static_cast<call_data*>(elem->call_data);
GRPC_ERROR_UNREF(calld->recv_initial_metadata_ready_error);
if (calld->have_read_stream) {
calld->read_stream->Orphan();
}

@ -99,10 +99,15 @@ struct call_data {
// recv_message_ready up-call on transport_stream_op, and remember to
// call our next_recv_message_ready member after handling it.
grpc_closure recv_message_ready;
grpc_closure recv_trailing_metadata_ready;
// The error caused by a message that is too large, or GRPC_ERROR_NONE
grpc_error* error;
// Used by recv_message_ready.
grpc_core::OrphanablePtr<grpc_core::ByteStream>* recv_message;
// Original recv_message_ready callback, invoked after our own.
grpc_closure* next_recv_message_ready;
// Original recv_trailing_metadata callback, invoked after our own.
grpc_closure* original_recv_trailing_metadata_ready;
};
struct channel_data {
@ -130,12 +135,13 @@ static void recv_message_ready(void* user_data, grpc_error* error) {
grpc_error* new_error = grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_COPIED_STRING(message_string),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED);
GRPC_ERROR_UNREF(calld->error);
if (error == GRPC_ERROR_NONE) {
error = new_error;
} else {
error = grpc_error_add_child(error, new_error);
GRPC_ERROR_UNREF(new_error);
}
calld->error = GRPC_ERROR_REF(error);
gpr_free(message_string);
} else {
GRPC_ERROR_REF(error);
@ -144,6 +150,17 @@ static void recv_message_ready(void* user_data, grpc_error* error) {
GRPC_CLOSURE_RUN(calld->next_recv_message_ready, error);
}
// Callback invoked on completion of recv_trailing_metadata
// Notifies the recv_trailing_metadata batch of any message size failures
static void recv_trailing_metadata_ready(void* user_data, grpc_error* error) {
grpc_call_element* elem = static_cast<grpc_call_element*>(user_data);
call_data* calld = static_cast<call_data*>(elem->call_data);
error =
grpc_error_add_child(GRPC_ERROR_REF(error), GRPC_ERROR_REF(calld->error));
// Invoke the next callback.
GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, error);
}
// Start transport stream op.
static void start_transport_stream_op_batch(
grpc_call_element* elem, grpc_transport_stream_op_batch* op) {
@ -172,6 +189,13 @@ static void start_transport_stream_op_batch(
calld->recv_message = op->payload->recv_message.recv_message;
op->payload->recv_message.recv_message_ready = &calld->recv_message_ready;
}
// Inject callback for receiving trailing metadata.
if (op->recv_trailing_metadata) {
calld->original_recv_trailing_metadata_ready =
op->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
op->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
&calld->recv_trailing_metadata_ready;
}
// Chain to the next filter.
grpc_call_next_op(elem, op);
}
@ -183,8 +207,13 @@ static grpc_error* init_call_elem(grpc_call_element* elem,
call_data* calld = static_cast<call_data*>(elem->call_data);
calld->call_combiner = args->call_combiner;
calld->next_recv_message_ready = nullptr;
calld->original_recv_trailing_metadata_ready = nullptr;
calld->error = GRPC_ERROR_NONE;
GRPC_CLOSURE_INIT(&calld->recv_message_ready, recv_message_ready, elem,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready,
recv_trailing_metadata_ready, elem,
grpc_schedule_on_exec_ctx);
// Get max sizes from channel data, then merge in per-method config values.
// Note: Per-method config is only available on the client, so we
// apply the max request size to the send limit and the max response
@ -213,7 +242,10 @@ static grpc_error* init_call_elem(grpc_call_element* elem,
// Destructor for call_data.
static void destroy_call_elem(grpc_call_element* elem,
const grpc_call_final_info* final_info,
grpc_closure* ignored) {}
grpc_closure* ignored) {
call_data* calld = (call_data*)elem->call_data;
GRPC_ERROR_UNREF(calld->error);
}
static int default_size(const grpc_channel_args* args,
int without_minimal_stack) {

@ -513,9 +513,24 @@ bool grpc_error_get_str(grpc_error* err, grpc_error_strs which,
grpc_error* grpc_error_add_child(grpc_error* src, grpc_error* child) {
GPR_TIMER_SCOPE("grpc_error_add_child", 0);
grpc_error* new_err = copy_error_and_unref(src);
internal_add_error(&new_err, child);
return new_err;
if (src != GRPC_ERROR_NONE) {
if (child == GRPC_ERROR_NONE) {
/* \a child is empty. Simply return the ref to \a src */
return src;
} else if (child != src) {
grpc_error* new_err = copy_error_and_unref(src);
internal_add_error(&new_err, child);
return new_err;
} else {
/* \a src and \a child are the same. Drop one of the references and return
* the other */
GRPC_ERROR_UNREF(child);
return src;
}
} else {
/* \a src is empty. Simply return the ref to \a child */
return child;
}
}
static const char* no_error_string = "\"No Error\"";

@ -185,8 +185,16 @@ bool grpc_error_get_str(grpc_error* error, grpc_error_strs which,
/// error occurring. Allows root causing high level errors from lower level
/// errors that contributed to them. The src error takes ownership of the
/// child error.
///
/// Edge Conditions -
/// 1) If either of \a src or \a child is GRPC_ERROR_NONE, returns a reference
/// to the other argument. 2) If both \a src and \a child are GRPC_ERROR_NONE,
/// returns GRPC_ERROR_NONE. 3) If \a src and \a child point to the same error,
/// returns a single reference. (Note that, 2 references should have been
/// received to the error in this case.)
grpc_error* grpc_error_add_child(grpc_error* src,
grpc_error* child) GRPC_MUST_USE_RESULT;
grpc_error* grpc_os_error(const char* file, int line, int err,
const char* call_name) GRPC_MUST_USE_RESULT;

@ -41,6 +41,9 @@ struct call_data {
grpc_transport_stream_op_batch* recv_initial_metadata_batch;
grpc_closure* original_recv_initial_metadata_ready;
grpc_closure recv_initial_metadata_ready;
grpc_error* error;
grpc_closure recv_trailing_metadata_ready;
grpc_closure* original_recv_trailing_metadata_ready;
grpc_metadata_array md;
const grpc_metadata* consumed_md;
size_t num_consumed_md;
@ -111,6 +114,7 @@ static void on_md_processing_done_inner(grpc_call_element* elem,
batch->payload->recv_initial_metadata.recv_initial_metadata,
remove_consumed_md, elem, "Response metadata filtering error");
}
calld->error = GRPC_ERROR_REF(error);
GRPC_CLOSURE_SCHED(calld->original_recv_initial_metadata_ready, error);
}
@ -184,6 +188,13 @@ static void recv_initial_metadata_ready(void* arg, grpc_error* error) {
GRPC_ERROR_REF(error));
}
static void recv_trailing_metadata_ready(void* user_data, grpc_error* err) {
grpc_call_element* elem = static_cast<grpc_call_element*>(user_data);
call_data* calld = static_cast<call_data*>(elem->call_data);
err = grpc_error_add_child(GRPC_ERROR_REF(err), GRPC_ERROR_REF(calld->error));
GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, err);
}
static void auth_start_transport_stream_op_batch(
grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
call_data* calld = static_cast<call_data*>(elem->call_data);
@ -195,6 +206,12 @@ static void auth_start_transport_stream_op_batch(
batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
&calld->recv_initial_metadata_ready;
}
if (batch->recv_trailing_metadata) {
calld->original_recv_trailing_metadata_ready =
batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
&calld->recv_trailing_metadata_ready;
}
grpc_call_next_op(elem, batch);
}
@ -208,6 +225,9 @@ static grpc_error* init_call_elem(grpc_call_element* elem,
GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready,
recv_initial_metadata_ready, elem,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready,
recv_trailing_metadata_ready, elem,
grpc_schedule_on_exec_ctx);
// Create server security context. Set its auth context from channel
// data and save it in the call context.
grpc_server_security_context* server_ctx =
@ -227,7 +247,10 @@ static grpc_error* init_call_elem(grpc_call_element* elem,
/* Destructor for call_data */
static void destroy_call_elem(grpc_call_element* elem,
const grpc_call_final_info* final_info,
grpc_closure* ignored) {}
grpc_closure* ignored) {
call_data* calld = static_cast<call_data*>(elem->call_data);
GRPC_ERROR_UNREF(calld->error);
}
/* Constructor for channel_data */
static grpc_error* init_channel_elem(grpc_channel_element* elem,

@ -71,46 +71,6 @@
// Used to create arena for the first call.
#define ESTIMATED_MDELEM_COUNT 16
/* Status data for a request can come from several sources; this
enumerates them all, and acts as a priority sorting for which
status to return to the application - earlier entries override
later ones */
typedef enum {
/* Status came from the application layer overriding whatever
the wire says */
STATUS_FROM_API_OVERRIDE = 0,
/* Status came from 'the wire' - or somewhere below the surface
layer */
STATUS_FROM_WIRE,
/* Status was created by some internal channel stack operation: must come via
add_batch_error */
STATUS_FROM_CORE,
/* Status was created by some surface error */
STATUS_FROM_SURFACE,
/* Status came from the server sending status */
STATUS_FROM_SERVER_STATUS,
STATUS_SOURCE_COUNT
} status_source;
typedef struct {
bool is_set;
grpc_error* error;
} received_status;
static gpr_atm pack_received_status(received_status r) {
return r.is_set ? (1 | (gpr_atm)r.error) : 0;
}
static received_status unpack_received_status(gpr_atm atm) {
if ((atm & 1) == 0) {
return {false, GRPC_ERROR_NONE};
} else {
return {true, (grpc_error*)(atm & ~static_cast<gpr_atm>(1))};
}
}
#define MAX_ERRORS_PER_BATCH 4
typedef struct batch_control {
grpc_call* call;
/* Share memory for cq_completion and notify_tag as they are never needed
@ -135,10 +95,7 @@ typedef struct batch_control {
grpc_closure start_batch;
grpc_closure finish_batch;
gpr_refcount steps_to_complete;
grpc_error* errors[MAX_ERRORS_PER_BATCH];
gpr_atm num_errors;
grpc_error* batch_error;
grpc_transport_stream_op_batch op;
} batch_control;
@ -201,9 +158,6 @@ struct grpc_call {
// A char* indicating the peer name.
gpr_atm peer_string;
/* Packed received call statuses from various sources */
gpr_atm status[STATUS_SOURCE_COUNT];
/* Call data useful used for reporting. Only valid after the call has
* completed */
grpc_call_final_info final_info;
@ -236,6 +190,7 @@ struct grpc_call {
grpc_closure receiving_initial_metadata_ready;
grpc_closure receiving_trailing_metadata_ready;
uint32_t test_only_last_message_flags;
gpr_atm cancelled;
grpc_closure release_call;
@ -249,6 +204,7 @@ struct grpc_call {
int* cancelled;
} server;
} final_op;
grpc_error* status_error;
/* recv_state can contain one of the following values:
RECV_NONE : : no initial metadata and messages received
@ -286,23 +242,15 @@ grpc_core::TraceFlag grpc_compression_trace(false, "compression");
static void execute_batch(grpc_call* call, grpc_transport_stream_op_batch* op,
grpc_closure* start_batch_closure);
static void cancel_with_status(grpc_call* c, status_source source,
grpc_status_code status,
static void cancel_with_status(grpc_call* c, grpc_status_code status,
const char* description);
static void cancel_with_error(grpc_call* c, status_source source,
grpc_error* error);
static void cancel_with_error(grpc_call* c, grpc_error* error);
static void destroy_call(void* call_stack, grpc_error* error);
static void receiving_slice_ready(void* bctlp, grpc_error* error);
static void get_final_status(
grpc_call* call, void (*set_value)(grpc_status_code code, void* user_data),
void* set_value_user_data, grpc_slice* details, const char** error_string);
static void set_status_value_directly(grpc_status_code status, void* dest);
static void set_status_from_error(grpc_call* call, status_source source,
grpc_error* error);
static void set_final_status(grpc_call* call, grpc_error* error);
static void process_data_after_md(batch_control* bctl);
static void post_batch_completion(batch_control* bctl);
static void add_batch_error(batch_control* bctl, grpc_error* error,
bool has_cancelled);
static void add_init_error(grpc_error** composite, grpc_error* new_err) {
if (new_err == GRPC_ERROR_NONE) return;
@ -353,6 +301,7 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args,
gpr_arena_alloc(arena, GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call)) +
channel_stack->call_stack_size));
gpr_ref_init(&call->ext_ref, 1);
gpr_atm_no_barrier_store(&call->cancelled, 0);
call->arena = arena;
grpc_call_combiner_init(&call->call_combiner);
*out_call = call;
@ -464,10 +413,10 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args,
gpr_mu_unlock(&pc->child_list_mu);
}
if (error != GRPC_ERROR_NONE) {
cancel_with_error(call, STATUS_FROM_SURFACE, GRPC_ERROR_REF(error));
cancel_with_error(call, GRPC_ERROR_REF(error));
}
if (immediately_cancel) {
cancel_with_error(call, STATUS_FROM_API_OVERRIDE, GRPC_ERROR_CANCELLED);
cancel_with_error(call, GRPC_ERROR_CANCELLED);
}
if (args->cq != nullptr) {
GPR_ASSERT(args->pollset_set_alternative == nullptr &&
@ -561,16 +510,13 @@ static void destroy_call(void* call, grpc_error* error) {
GRPC_CQ_INTERNAL_UNREF(c->cq, "bind");
}
get_final_status(c, set_status_value_directly, &c->final_info.final_status,
nullptr, &(c->final_info.error_string));
grpc_error_get_status(c->status_error, c->send_deadline,
&c->final_info.final_status, nullptr, nullptr,
&(c->final_info.error_string));
GRPC_ERROR_UNREF(c->status_error);
c->final_info.stats.latency =
gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), c->start_time);
for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
GRPC_ERROR_UNREF(
unpack_received_status(gpr_atm_acq_load(&c->status[i])).error);
}
grpc_call_stack_destroy(CALL_STACK_FROM_CALL(c), &c->final_info,
GRPC_CLOSURE_INIT(&c->release_call, release_call, c,
grpc_schedule_on_exec_ctx));
@ -608,7 +554,7 @@ void grpc_call_unref(grpc_call* c) {
bool cancel = gpr_atm_acq_load(&c->any_ops_sent_atm) != 0 &&
gpr_atm_acq_load(&c->received_final_op_atm) == 0;
if (cancel) {
cancel_with_error(c, STATUS_FROM_API_OVERRIDE, GRPC_ERROR_CANCELLED);
cancel_with_error(c, GRPC_ERROR_CANCELLED);
} else {
// Unset the call combiner cancellation closure. This has the
// effect of scheduling the previously set cancellation closure, if
@ -626,8 +572,7 @@ grpc_call_error grpc_call_cancel(grpc_call* call, void* reserved) {
GRPC_API_TRACE("grpc_call_cancel(call=%p, reserved=%p)", 2, (call, reserved));
GPR_ASSERT(!reserved);
grpc_core::ExecCtx exec_ctx;
cancel_with_error(call, STATUS_FROM_API_OVERRIDE, GRPC_ERROR_CANCELLED);
cancel_with_error(call, GRPC_ERROR_CANCELLED);
return GRPC_CALL_OK;
}
@ -681,8 +626,7 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call* c,
"c=%p, status=%d, description=%s, reserved=%p)",
4, (c, (int)status, description, reserved));
GPR_ASSERT(reserved == nullptr);
cancel_with_status(c, STATUS_FROM_API_OVERRIDE, status, description);
cancel_with_status(c, status, description);
return GRPC_CALL_OK;
}
@ -702,15 +646,17 @@ static void done_termination(void* arg, grpc_error* error) {
gpr_free(state);
}
static void cancel_with_error(grpc_call* c, status_source source,
grpc_error* error) {
static void cancel_with_error(grpc_call* c, grpc_error* error) {
if (!gpr_atm_rel_cas(&c->cancelled, 0, 1)) {
GRPC_ERROR_UNREF(error);
return;
}
GRPC_CALL_INTERNAL_REF(c, "termination");
// Inform the call combiner of the cancellation, so that it can cancel
// any in-flight asynchronous actions that may be holding the call
// combiner. This ensures that the cancel_stream batch can be sent
// down the filter stack in a timely manner.
grpc_call_combiner_cancel(&c->call_combiner, GRPC_ERROR_REF(error));
set_status_from_error(c, source, GRPC_ERROR_REF(error));
cancel_state* state = static_cast<cancel_state*>(gpr_malloc(sizeof(*state)));
state->call = c;
GRPC_CLOSURE_INIT(&state->finish_batch, done_termination, state,
@ -733,90 +679,44 @@ static grpc_error* error_from_status(grpc_status_code status,
GRPC_ERROR_INT_GRPC_STATUS, status);
}
static void cancel_with_status(grpc_call* c, status_source source,
grpc_status_code status,
static void cancel_with_status(grpc_call* c, grpc_status_code status,
const char* description) {
cancel_with_error(c, source, error_from_status(status, description));
cancel_with_error(c, error_from_status(status, description));
}
/*******************************************************************************
* FINAL STATUS CODE MANIPULATION
*/
static bool get_final_status_from(
grpc_call* call, grpc_error* error, bool allow_ok_status,
void (*set_value)(grpc_status_code code, void* user_data),
void* set_value_user_data, grpc_slice* details, const char** error_string) {
grpc_status_code code;
grpc_slice slice = grpc_empty_slice();
grpc_error_get_status(error, call->send_deadline, &code, &slice, nullptr,
error_string);
if (code == GRPC_STATUS_OK && !allow_ok_status) {
return false;
}
set_value(code, set_value_user_data);
if (details != nullptr) {
*details = grpc_slice_ref_internal(slice);
}
return true;
}
static void get_final_status(
grpc_call* call, void (*set_value)(grpc_status_code code, void* user_data),
void* set_value_user_data, grpc_slice* details, const char** error_string) {
int i;
received_status status[STATUS_SOURCE_COUNT];
for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
status[i] = unpack_received_status(gpr_atm_acq_load(&call->status[i]));
}
static void set_final_status(grpc_call* call, grpc_error* error) {
if (grpc_call_error_trace.enabled()) {
gpr_log(GPR_INFO, "get_final_status %s", call->is_client ? "CLI" : "SVR");
for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
if (status[i].is_set) {
gpr_log(GPR_INFO, " %d: %s", i, grpc_error_string(status[i].error));
}
}
gpr_log(GPR_DEBUG, "set_final_status %s", call->is_client ? "CLI" : "SVR");
gpr_log(GPR_DEBUG, "%s", grpc_error_string(error));
}
/* first search through ignoring "OK" statuses: if something went wrong,
* ensure we report it */
for (int allow_ok_status = 0; allow_ok_status < 2; allow_ok_status++) {
/* search for the best status we can present: ideally the error we use has a
clearly defined grpc-status, and we'll prefer that. */
for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
if (status[i].is_set &&
grpc_error_has_clear_grpc_status(status[i].error)) {
if (get_final_status_from(call, status[i].error, allow_ok_status != 0,
set_value, set_value_user_data, details,
error_string)) {
return;
}
}
}
/* If no clearly defined status exists, search for 'anything' */
for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
if (status[i].is_set) {
if (get_final_status_from(call, status[i].error, allow_ok_status != 0,
set_value, set_value_user_data, details,
error_string)) {
return;
}
if (call->is_client) {
grpc_error_get_status(error, call->send_deadline,
call->final_op.client.status,
call->final_op.client.status_details, nullptr,
call->final_op.client.error_string);
// explicitly take a ref
grpc_slice_ref_internal(*call->final_op.client.status_details);
call->status_error = error;
grpc_core::channelz::ChannelNode* channelz_channel =
grpc_channel_get_channelz_node(call->channel);
if (channelz_channel != nullptr) {
if (*call->final_op.client.status != GRPC_STATUS_OK) {
channelz_channel->RecordCallFailed();
} else {
channelz_channel->RecordCallSucceeded();
}
}
}
/* If nothing exists, set some default */
if (call->is_client) {
set_value(GRPC_STATUS_UNKNOWN, set_value_user_data);
} else {
set_value(GRPC_STATUS_OK, set_value_user_data);
}
}
static void set_status_from_error(grpc_call* call, status_source source,
grpc_error* error) {
if (!gpr_atm_rel_cas(&call->status[source],
pack_received_status({false, GRPC_ERROR_NONE}),
pack_received_status({true, error}))) {
*call->final_op.server.cancelled =
error != GRPC_ERROR_NONE || call->status_error != GRPC_ERROR_NONE;
/* TODO(ncteisen) : Update channelz handling for server
if (channelz_channel != nullptr) {
if (*call->final_op.server.cancelled) {
channelz_channel->RecordCallFailed();
} else {
channelz_channel->RecordCallSucceeded();
}
} */
GRPC_ERROR_UNREF(error);
}
}
@ -1035,6 +935,7 @@ static grpc_stream_compression_algorithm decode_stream_compression(
static void publish_app_metadata(grpc_call* call, grpc_metadata_batch* b,
int is_trailing) {
if (b->list.count == 0) return;
if (!call->is_client && is_trailing) return;
if (is_trailing && call->buffered_metadata[1] == nullptr) return;
GPR_TIMER_SCOPE("publish_app_metadata", 0);
grpc_metadata_array* dest;
@ -1088,9 +989,12 @@ static void recv_initial_filter(grpc_call* call, grpc_metadata_batch* b) {
publish_app_metadata(call, b, false);
}
static void recv_trailing_filter(void* args, grpc_metadata_batch* b) {
static void recv_trailing_filter(void* args, grpc_metadata_batch* b,
grpc_error* batch_error) {
grpc_call* call = static_cast<grpc_call*>(args);
if (b->idx.named.grpc_status != nullptr) {
if (batch_error != GRPC_ERROR_NONE) {
set_final_status(call, batch_error);
} else if (b->idx.named.grpc_status != nullptr) {
grpc_status_code status_code =
grpc_get_status_code_from_metadata(b->idx.named.grpc_status->md);
grpc_error* error = GRPC_ERROR_NONE;
@ -1108,8 +1012,18 @@ static void recv_trailing_filter(void* args, grpc_metadata_batch* b) {
error = grpc_error_set_str(error, GRPC_ERROR_STR_GRPC_MESSAGE,
grpc_empty_slice());
}
set_status_from_error(call, STATUS_FROM_WIRE, error);
set_final_status(call, GRPC_ERROR_REF(error));
grpc_metadata_batch_remove(b, b->idx.named.grpc_status);
GRPC_ERROR_UNREF(error);
} else if (!call->is_client) {
set_final_status(call, GRPC_ERROR_NONE);
} else {
gpr_log(GPR_DEBUG,
"Received trailing metadata with no error and no status");
set_final_status(
call, grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("No status received"),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNKNOWN));
}
publish_app_metadata(call, b, true);
}
@ -1124,14 +1038,6 @@ grpc_call_stack* grpc_call_get_call_stack(grpc_call* call) {
* BATCH API IMPLEMENTATION
*/
static void set_status_value_directly(grpc_status_code status, void* dest) {
*static_cast<grpc_status_code*>(dest) = status;
}
static void set_cancelled_value(grpc_status_code status, void* dest) {
*static_cast<int*>(dest) = (status != GRPC_STATUS_OK);
}
static bool are_write_flags_valid(uint32_t flags) {
/* check that only bits in GRPC_WRITE_(INTERNAL?)_USED_MASK are set */
const uint32_t allowed_write_positions =
@ -1199,31 +1105,15 @@ static void finish_batch_completion(void* user_data,
GRPC_CALL_INTERNAL_UNREF(call, "completion");
}
static grpc_error* consolidate_batch_errors(batch_control* bctl) {
size_t n = static_cast<size_t>(gpr_atm_acq_load(&bctl->num_errors));
if (n == 0) {
return GRPC_ERROR_NONE;
} else if (n == 1) {
/* Skip creating a composite error in the case that only one error was
logged */
grpc_error* e = bctl->errors[0];
bctl->errors[0] = nullptr;
return e;
} else {
grpc_error* error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Call batch failed", bctl->errors, n);
for (size_t i = 0; i < n; i++) {
GRPC_ERROR_UNREF(bctl->errors[i]);
bctl->errors[i] = nullptr;
}
return error;
}
static void reset_batch_errors(batch_control* bctl) {
GRPC_ERROR_UNREF(bctl->batch_error);
bctl->batch_error = GRPC_ERROR_NONE;
}
static void post_batch_completion(batch_control* bctl) {
grpc_call* next_child_call;
grpc_call* call = bctl->call;
grpc_error* error = consolidate_batch_errors(bctl);
grpc_error* error = GRPC_ERROR_REF(bctl->batch_error);
if (bctl->op.send_initial_metadata) {
grpc_metadata_batch_destroy(
@ -1249,8 +1139,7 @@ static void post_batch_completion(batch_control* bctl) {
next_child_call = child->child->sibling_next;
if (child->cancellation_is_inherited) {
GRPC_CALL_INTERNAL_REF(child, "propagate_cancel");
cancel_with_error(child, STATUS_FROM_API_OVERRIDE,
GRPC_ERROR_CANCELLED);
cancel_with_error(child, GRPC_ERROR_CANCELLED);
GRPC_CALL_INTERNAL_UNREF(child, "propagate_cancel");
}
child = next_child_call;
@ -1258,24 +1147,6 @@ static void post_batch_completion(batch_control* bctl) {
}
gpr_mu_unlock(&pc->child_list_mu);
}
if (call->is_client) {
get_final_status(call, set_status_value_directly,
call->final_op.client.status,
call->final_op.client.status_details,
call->final_op.client.error_string);
} else {
get_final_status(call, set_cancelled_value,
call->final_op.server.cancelled, nullptr, nullptr);
}
grpc_core::channelz::ChannelNode* channelz_channel =
grpc_channel_get_channelz_node(call->channel);
if (channelz_channel != nullptr) {
if (*call->final_op.client.status != GRPC_STATUS_OK) {
channelz_channel->RecordCallFailed();
} else {
channelz_channel->RecordCallSucceeded();
}
}
GRPC_ERROR_UNREF(error);
error = GRPC_ERROR_NONE;
}
@ -1284,9 +1155,10 @@ static void post_batch_completion(batch_control* bctl) {
grpc_byte_buffer_destroy(*call->receiving_buffer);
*call->receiving_buffer = nullptr;
}
reset_batch_errors(bctl);
if (bctl->completion_data.notify_tag.is_closure) {
/* unrefs bctl->error */
/* unrefs error */
bctl->call = nullptr;
/* This closure may be meant to be run within some combiner. Since we aren't
* running in any combiner here, we need to use GRPC_CLOSURE_SCHED instead
@ -1296,7 +1168,7 @@ static void post_batch_completion(batch_control* bctl) {
error);
GRPC_CALL_INTERNAL_UNREF(call, "completion");
} else {
/* unrefs bctl->error */
/* unrefs error */
grpc_cq_end_op(bctl->call->cq, bctl->completion_data.notify_tag.tag, error,
finish_batch_completion, bctl,
&bctl->completion_data.cq_completion);
@ -1405,8 +1277,10 @@ static void receiving_stream_ready(void* bctlp, grpc_error* error) {
grpc_call* call = bctl->call;
if (error != GRPC_ERROR_NONE) {
call->receiving_stream.reset();
add_batch_error(bctl, GRPC_ERROR_REF(error), true);
cancel_with_error(call, STATUS_FROM_SURFACE, GRPC_ERROR_REF(error));
if (bctl->batch_error == GRPC_ERROR_NONE) {
bctl->batch_error = GRPC_ERROR_REF(error);
}
cancel_with_error(call, GRPC_ERROR_REF(error));
}
/* If recv_state is RECV_NONE, we will save the batch_control
* object with rel_cas, and will not use it after the cas. Its corresponding
@ -1442,8 +1316,7 @@ static void validate_filtered_metadata(batch_control* bctl) {
call->incoming_stream_compression_algorithm,
call->incoming_message_compression_algorithm);
gpr_log(GPR_ERROR, "%s", error_msg);
cancel_with_status(call, STATUS_FROM_SURFACE, GRPC_STATUS_INTERNAL,
error_msg);
cancel_with_status(call, GRPC_STATUS_INTERNAL, error_msg);
gpr_free(error_msg);
} else if (
grpc_compression_algorithm_from_message_stream_compression_algorithm(
@ -1455,8 +1328,7 @@ static void validate_filtered_metadata(batch_control* bctl) {
"compression (%d).",
call->incoming_stream_compression_algorithm,
call->incoming_message_compression_algorithm);
cancel_with_status(call, STATUS_FROM_SURFACE, GRPC_STATUS_INTERNAL,
error_msg);
cancel_with_status(call, GRPC_STATUS_INTERNAL, error_msg);
gpr_free(error_msg);
} else {
char* error_msg = nullptr;
@ -1466,8 +1338,7 @@ static void validate_filtered_metadata(batch_control* bctl) {
gpr_asprintf(&error_msg, "Invalid compression algorithm value '%d'.",
compression_algorithm);
gpr_log(GPR_ERROR, "%s", error_msg);
cancel_with_status(call, STATUS_FROM_SURFACE, GRPC_STATUS_UNIMPLEMENTED,
error_msg);
cancel_with_status(call, GRPC_STATUS_UNIMPLEMENTED, error_msg);
} else if (grpc_compression_options_is_algorithm_enabled(
&compression_options, compression_algorithm) == 0) {
/* check if algorithm is supported by current channel config */
@ -1476,8 +1347,7 @@ static void validate_filtered_metadata(batch_control* bctl) {
gpr_asprintf(&error_msg, "Compression algorithm '%s' is disabled.",
algo_name);
gpr_log(GPR_ERROR, "%s", error_msg);
cancel_with_status(call, STATUS_FROM_SURFACE, GRPC_STATUS_UNIMPLEMENTED,
error_msg);
cancel_with_status(call, GRPC_STATUS_UNIMPLEMENTED, error_msg);
}
gpr_free(error_msg);
@ -1495,23 +1365,12 @@ static void validate_filtered_metadata(batch_control* bctl) {
}
}
static void add_batch_error(batch_control* bctl, grpc_error* error,
bool has_cancelled) {
if (error == GRPC_ERROR_NONE) return;
int idx = static_cast<int>(gpr_atm_full_fetch_add(&bctl->num_errors, 1));
if (idx == 0 && !has_cancelled) {
cancel_with_error(bctl->call, STATUS_FROM_CORE, GRPC_ERROR_REF(error));
}
bctl->errors[idx] = error;
}
static void receiving_initial_metadata_ready(void* bctlp, grpc_error* error) {
batch_control* bctl = static_cast<batch_control*>(bctlp);
grpc_call* call = bctl->call;
GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_initial_metadata_ready");
add_batch_error(bctl, GRPC_ERROR_REF(error), false);
if (error == GRPC_ERROR_NONE) {
grpc_metadata_batch* md =
&call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
@ -1524,6 +1383,11 @@ static void receiving_initial_metadata_ready(void* bctlp, grpc_error* error) {
if (md->deadline != GRPC_MILLIS_INF_FUTURE && !call->is_client) {
call->send_deadline = md->deadline;
}
} else {
if (bctl->batch_error == GRPC_ERROR_NONE) {
bctl->batch_error = GRPC_ERROR_REF(error);
}
cancel_with_error(call, GRPC_ERROR_REF(error));
}
grpc_closure* saved_rsr_closure = nullptr;
@ -1561,10 +1425,9 @@ static void receiving_trailing_metadata_ready(void* bctlp, grpc_error* error) {
batch_control* bctl = static_cast<batch_control*>(bctlp);
grpc_call* call = bctl->call;
GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_trailing_metadata_ready");
add_batch_error(bctl, GRPC_ERROR_REF(error), false);
grpc_metadata_batch* md =
&call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
recv_trailing_filter(call, md);
recv_trailing_filter(call, md, GRPC_ERROR_REF(error));
finish_batch_step(bctl);
}
@ -1572,7 +1435,12 @@ static void finish_batch(void* bctlp, grpc_error* error) {
batch_control* bctl = static_cast<batch_control*>(bctlp);
grpc_call* call = bctl->call;
GRPC_CALL_COMBINER_STOP(&call->call_combiner, "on_complete");
add_batch_error(bctl, GRPC_ERROR_REF(error), false);
if (bctl->batch_error == GRPC_ERROR_NONE) {
bctl->batch_error = GRPC_ERROR_REF(error);
}
if (error != GRPC_ERROR_NONE) {
cancel_with_error(call, GRPC_ERROR_REF(error));
}
finish_batch_step(bctl);
}
@ -1774,28 +1642,32 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
call->send_extra_metadata_count = 1;
call->send_extra_metadata[0].md = grpc_channel_get_reffed_status_elem(
call->channel, op->data.send_status_from_server.status);
{
grpc_error* override_error = GRPC_ERROR_NONE;
if (op->data.send_status_from_server.status != GRPC_STATUS_OK) {
override_error =
error_from_status(op->data.send_status_from_server.status,
"Returned non-ok status");
}
if (op->data.send_status_from_server.status_details != nullptr) {
call->send_extra_metadata[1].md = grpc_mdelem_from_slices(
GRPC_MDSTR_GRPC_MESSAGE,
grpc_slice_ref_internal(
*op->data.send_status_from_server.status_details));
call->send_extra_metadata_count++;
grpc_error* status_error =
op->data.send_status_from_server.status == GRPC_STATUS_OK
? GRPC_ERROR_NONE
: grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Server returned error"),
GRPC_ERROR_INT_GRPC_STATUS,
static_cast<intptr_t>(
op->data.send_status_from_server.status));
if (op->data.send_status_from_server.status_details != nullptr) {
call->send_extra_metadata[1].md = grpc_mdelem_from_slices(
GRPC_MDSTR_GRPC_MESSAGE,
grpc_slice_ref_internal(
*op->data.send_status_from_server.status_details));
call->send_extra_metadata_count++;
if (status_error != GRPC_ERROR_NONE) {
char* msg = grpc_slice_to_c_string(
GRPC_MDVALUE(call->send_extra_metadata[1].md));
override_error =
grpc_error_set_str(override_error, GRPC_ERROR_STR_GRPC_MESSAGE,
status_error =
grpc_error_set_str(status_error, GRPC_ERROR_STR_GRPC_MESSAGE,
grpc_slice_from_copied_string(msg));
gpr_free(msg);
}
set_status_from_error(call, STATUS_FROM_API_OVERRIDE, override_error);
}
call->status_error = status_error;
if (!prepare_application_metadata(
call,
static_cast<int>(

@ -149,6 +149,9 @@ struct call_data {
grpc_closure server_on_recv_initial_metadata;
grpc_closure kill_zombie_closure;
grpc_closure* on_done_recv_initial_metadata;
grpc_closure recv_trailing_metadata_ready;
grpc_error* error;
grpc_closure* original_recv_trailing_metadata_ready;
grpc_closure publish;
@ -730,6 +733,14 @@ static void server_on_recv_initial_metadata(void* ptr, grpc_error* error) {
GRPC_CLOSURE_RUN(calld->on_done_recv_initial_metadata, error);
}
static void server_recv_trailing_metadata_ready(void* user_data,
grpc_error* err) {
grpc_call_element* elem = static_cast<grpc_call_element*>(user_data);
call_data* calld = static_cast<call_data*>(elem->call_data);
err = grpc_error_add_child(GRPC_ERROR_REF(err), GRPC_ERROR_REF(calld->error));
GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, err);
}
static void server_mutate_op(grpc_call_element* elem,
grpc_transport_stream_op_batch* op) {
call_data* calld = static_cast<call_data*>(elem->call_data);
@ -745,6 +756,12 @@ static void server_mutate_op(grpc_call_element* elem,
op->payload->recv_initial_metadata.recv_flags =
&calld->recv_initial_metadata_flags;
}
if (op->recv_trailing_metadata) {
calld->original_recv_trailing_metadata_ready =
op->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
op->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
&calld->recv_trailing_metadata_ready;
}
}
static void server_start_transport_stream_op_batch(
@ -828,7 +845,9 @@ static grpc_error* init_call_elem(grpc_call_element* elem,
GRPC_CLOSURE_INIT(&calld->server_on_recv_initial_metadata,
server_on_recv_initial_metadata, elem,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready,
server_recv_trailing_metadata_ready, elem,
grpc_schedule_on_exec_ctx);
server_ref(chand->server);
return GRPC_ERROR_NONE;
}
@ -840,7 +859,7 @@ static void destroy_call_elem(grpc_call_element* elem,
call_data* calld = static_cast<call_data*>(elem->call_data);
GPR_ASSERT(calld->state != PENDING);
GRPC_ERROR_UNREF(calld->error);
if (calld->host_set) {
grpc_slice_unref_internal(calld->host);
}

@ -187,16 +187,6 @@ static void test_os_error() {
GRPC_ERROR_UNREF(error);
}
static void test_special() {
grpc_error* error = GRPC_ERROR_NONE;
error = grpc_error_add_child(
error, GRPC_ERROR_CREATE_FROM_STATIC_STRING("test child"));
intptr_t i;
GPR_ASSERT(grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, &i));
GPR_ASSERT(i == GRPC_STATUS_OK);
GRPC_ERROR_UNREF(error);
}
static void test_overflow() {
grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Overflow");
@ -235,7 +225,6 @@ int main(int argc, char** argv) {
test_os_error();
test_create_referencing();
test_create_referencing_many();
test_special();
test_overflow();
grpc_shutdown();

Loading…
Cancel
Save