diff --git a/src/core/ext/filters/http/client/http_client_filter.cc b/src/core/ext/filters/http/client/http_client_filter.cc index 1678051beb1..04ac4ac947c 100644 --- a/src/core/ext/filters/http/client/http_client_filter.cc +++ b/src/core/ext/filters/http/client/http_client_filter.cc @@ -51,6 +51,7 @@ struct call_data { grpc_linked_mdelem user_agent; // State for handling recv_initial_metadata ops. grpc_metadata_batch* recv_initial_metadata; + grpc_error* recv_initial_metadata_error; grpc_closure* original_recv_initial_metadata_ready; grpc_closure recv_initial_metadata_ready; // State for handling recv_trailing_metadata ops. @@ -147,6 +148,7 @@ static void recv_initial_metadata_ready(void* user_data, grpc_error* error) { call_data* calld = static_cast(elem->call_data); if (error == GRPC_ERROR_NONE) { error = client_filter_incoming_metadata(elem, calld->recv_initial_metadata); + calld->recv_initial_metadata_error = GRPC_ERROR_REF(error); } else { GRPC_ERROR_REF(error); } @@ -162,6 +164,13 @@ static void recv_trailing_metadata_ready(void* user_data, grpc_error* error) { } else { GRPC_ERROR_REF(error); } + if (calld->recv_initial_metadata_error != GRPC_ERROR_NONE) { + if (error == GRPC_ERROR_NONE) { + error = GRPC_ERROR_REF(calld->recv_initial_metadata_error); + } else if (error != calld->recv_initial_metadata_error) { + error = grpc_error_add_child(error, calld->recv_initial_metadata_error); + } + } GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, error); } @@ -434,7 +443,10 @@ static grpc_error* init_call_elem(grpc_call_element* elem, /* Destructor for call_data */ static void destroy_call_elem(grpc_call_element* elem, const grpc_call_final_info* final_info, - grpc_closure* ignored) {} + grpc_closure* ignored) { + call_data* calld = static_cast(elem->call_data); + GRPC_ERROR_UNREF(calld->recv_initial_metadata_error); +} static grpc_mdelem scheme_from_args(const grpc_channel_args* args) { unsigned i; diff --git a/src/core/ext/filters/http/server/http_server_filter.cc b/src/core/ext/filters/http/server/http_server_filter.cc index 3919447f264..01e5aa445eb 100644 --- a/src/core/ext/filters/http/server/http_server_filter.cc +++ b/src/core/ext/filters/http/server/http_server_filter.cc @@ -50,6 +50,7 @@ struct call_data { // State for intercepting recv_initial_metadata. grpc_closure recv_initial_metadata_ready; + grpc_error* recv_initial_metadata_ready_error; grpc_closure* original_recv_initial_metadata_ready; grpc_metadata_batch* recv_initial_metadata; uint32_t* recv_initial_metadata_flags; @@ -60,6 +61,9 @@ struct call_data { grpc_closure recv_message_ready; grpc_core::OrphanablePtr* recv_message; bool seen_recv_message_ready; + + grpc_closure recv_trailing_metadata_ready; + grpc_closure* original_recv_trailing_metadata_ready; }; } // namespace @@ -267,6 +271,7 @@ static void hs_recv_initial_metadata_ready(void* user_data, grpc_error* err) { calld->seen_recv_initial_metadata_ready = true; if (err == GRPC_ERROR_NONE) { err = hs_filter_incoming_metadata(elem, calld->recv_initial_metadata); + calld->recv_initial_metadata_ready_error = GRPC_ERROR_REF(err); if (calld->seen_recv_message_ready) { // We've already seen the recv_message callback, but we previously // deferred it, so we need to return it here. @@ -313,6 +318,23 @@ static void hs_recv_message_ready(void* user_data, grpc_error* err) { } } +static void hs_recv_trailing_metadata_ready(void* user_data, grpc_error* err) { + grpc_call_element* elem = static_cast(user_data); + call_data* calld = static_cast(elem->call_data); + if (calld->recv_initial_metadata_ready_error != GRPC_ERROR_NONE) { + if (err == GRPC_ERROR_NONE) { + err = GRPC_ERROR_REF(calld->recv_initial_metadata_ready_error); + } else if (err != calld->recv_initial_metadata_ready_error) { + err = grpc_error_add_child(err, calld->recv_initial_metadata_ready_error); + } else { + err = GRPC_ERROR_REF(err); + } + } else { + err = GRPC_ERROR_REF(err); + } + GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, err); +} + static grpc_error* hs_mutate_op(grpc_call_element* elem, grpc_transport_stream_op_batch* op) { /* grab pointers to our data from the call element */ @@ -357,6 +379,11 @@ static grpc_error* hs_mutate_op(grpc_call_element* elem, op->payload->recv_message.recv_message_ready = &calld->recv_message_ready; } + if (op->recv_trailing_metadata) { + calld->original_recv_trailing_metadata_ready = + op->payload->recv_trailing_metadata.recv_trailing_metadata_ready; + } + if (op->send_trailing_metadata) { grpc_error* error = hs_filter_outgoing_metadata( elem, op->payload->send_trailing_metadata.send_trailing_metadata); @@ -389,6 +416,9 @@ static grpc_error* hs_init_call_elem(grpc_call_element* elem, grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&calld->recv_message_ready, hs_recv_message_ready, elem, grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready, + hs_recv_trailing_metadata_ready, elem, + grpc_schedule_on_exec_ctx); return GRPC_ERROR_NONE; } @@ -397,6 +427,7 @@ static void hs_destroy_call_elem(grpc_call_element* elem, const grpc_call_final_info* final_info, grpc_closure* ignored) { call_data* calld = static_cast(elem->call_data); + GRPC_ERROR_UNREF(calld->recv_initial_metadata_ready_error); if (calld->have_read_stream) { calld->read_stream->Orphan(); } diff --git a/src/core/ext/filters/message_size/message_size_filter.cc b/src/core/ext/filters/message_size/message_size_filter.cc index c7fc3f2e627..deb5ae70ecc 100644 --- a/src/core/ext/filters/message_size/message_size_filter.cc +++ b/src/core/ext/filters/message_size/message_size_filter.cc @@ -99,10 +99,15 @@ struct call_data { // recv_message_ready up-call on transport_stream_op, and remember to // call our next_recv_message_ready member after handling it. grpc_closure recv_message_ready; + grpc_closure recv_trailing_metadata_ready; + // The error caused by a message that is too large, or GRPC_ERROR_NONE + grpc_error* error; // Used by recv_message_ready. grpc_core::OrphanablePtr* recv_message; // Original recv_message_ready callback, invoked after our own. grpc_closure* next_recv_message_ready; + // Original recv_trailing_metadata callback, invoked after our own. + grpc_closure* next_recv_trailing_metadata_ready; }; struct channel_data { @@ -130,12 +135,13 @@ static void recv_message_ready(void* user_data, grpc_error* error) { grpc_error* new_error = grpc_error_set_int( GRPC_ERROR_CREATE_FROM_COPIED_STRING(message_string), GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED); + GRPC_ERROR_UNREF(calld->error); if (error == GRPC_ERROR_NONE) { error = new_error; } else { error = grpc_error_add_child(error, new_error); - GRPC_ERROR_UNREF(new_error); } + calld->error = GRPC_ERROR_REF(error); gpr_free(message_string); } else { GRPC_ERROR_REF(error); @@ -144,6 +150,26 @@ static void recv_message_ready(void* user_data, grpc_error* error) { GRPC_CLOSURE_RUN(calld->next_recv_message_ready, error); } +// Callback invoked on completion of recv_trailing_metadata +// Notifies the recv_trailing_metadata batch of any message size failures +static void recv_trailing_metadata_ready(void* user_data, grpc_error* error) { + grpc_call_element* elem = static_cast(user_data); + call_data* calld = static_cast(elem->call_data); + if (calld->error != GRPC_ERROR_NONE) { + if (error == GRPC_ERROR_NONE) { + error = GRPC_ERROR_REF(calld->error); + } else if (error != calld->error) { + error = grpc_error_add_child(error, GRPC_ERROR_REF(calld->error)); + } else { + error = GRPC_ERROR_REF(error); + } + } else { + error = GRPC_ERROR_REF(error); + } + // Invoke the next callback. + GRPC_CLOSURE_RUN(calld->next_recv_trailing_metadata_ready, error); +} + // Start transport stream op. static void start_transport_stream_op_batch( grpc_call_element* elem, grpc_transport_stream_op_batch* op) { @@ -172,6 +198,13 @@ static void start_transport_stream_op_batch( calld->recv_message = op->payload->recv_message.recv_message; op->payload->recv_message.recv_message_ready = &calld->recv_message_ready; } + // Inject callback for receiving trailing metadata. + if (op->recv_trailing_metadata) { + calld->next_recv_trailing_metadata_ready = + op->payload->recv_trailing_metadata.recv_trailing_metadata_ready; + op->payload->recv_trailing_metadata.recv_trailing_metadata_ready = + &calld->recv_trailing_metadata_ready; + } // Chain to the next filter. grpc_call_next_op(elem, op); } @@ -183,8 +216,13 @@ static grpc_error* init_call_elem(grpc_call_element* elem, call_data* calld = static_cast(elem->call_data); calld->call_combiner = args->call_combiner; calld->next_recv_message_ready = nullptr; + calld->next_recv_trailing_metadata_ready = nullptr; + calld->error = GRPC_ERROR_NONE; GRPC_CLOSURE_INIT(&calld->recv_message_ready, recv_message_ready, elem, grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready, + recv_trailing_metadata_ready, elem, + grpc_schedule_on_exec_ctx); // Get max sizes from channel data, then merge in per-method config values. // Note: Per-method config is only available on the client, so we // apply the max request size to the send limit and the max response @@ -213,7 +251,10 @@ static grpc_error* init_call_elem(grpc_call_element* elem, // Destructor for call_data. static void destroy_call_elem(grpc_call_element* elem, const grpc_call_final_info* final_info, - grpc_closure* ignored) {} + grpc_closure* ignored) { + call_data* calld = (call_data*)elem->call_data; + GRPC_ERROR_UNREF(calld->error); +} static int default_size(const grpc_channel_args* args, int without_minimal_stack) {