diff --git a/src/core/ext/filters/http/server/http_server_filter.cc b/src/core/ext/filters/http/server/http_server_filter.cc index c2020158756..3919447f264 100644 --- a/src/core/ext/filters/http/server/http_server_filter.cc +++ b/src/core/ext/filters/http/server/http_server_filter.cc @@ -35,43 +35,37 @@ #define EXPECTED_CONTENT_TYPE_LENGTH sizeof(EXPECTED_CONTENT_TYPE) - 1 namespace { + struct call_data { grpc_call_combiner* call_combiner; + // Outgoing headers to add to send_initial_metadata. grpc_linked_mdelem status; grpc_linked_mdelem content_type; - /* did this request come with path query containing request payload */ - bool seen_path_with_query; - /* flag to ensure payload_bin is delivered only once */ - bool payload_bin_delivered; + // If we see the recv_message contents in the GET query string, we + // store it here. + grpc_core::ManualConstructor read_stream; + bool have_read_stream; + // State for intercepting recv_initial_metadata. + grpc_closure recv_initial_metadata_ready; + grpc_closure* original_recv_initial_metadata_ready; grpc_metadata_batch* recv_initial_metadata; uint32_t* recv_initial_metadata_flags; - /** Closure to call when finished with the hs_on_recv hook */ - grpc_closure* on_done_recv; - /** Closure to call when we retrieve read message from the path URI - */ - grpc_closure* recv_message_ready; - grpc_closure* on_complete; - grpc_core::OrphanablePtr* pp_recv_message; - grpc_core::ManualConstructor read_stream; + bool seen_recv_initial_metadata_ready; - /** Receive closures are chained: we inject this closure as the on_done_recv - up-call on transport_op, and remember to call our on_done_recv member - after handling it. */ - grpc_closure hs_on_recv; - grpc_closure hs_on_complete; - grpc_closure hs_recv_message_ready; + // State for intercepting recv_message. + grpc_closure* original_recv_message_ready; + grpc_closure recv_message_ready; + grpc_core::OrphanablePtr* recv_message; + bool seen_recv_message_ready; }; -struct channel_data { - uint8_t unused; -}; } // namespace -static grpc_error* server_filter_outgoing_metadata(grpc_call_element* elem, - grpc_metadata_batch* b) { +static grpc_error* hs_filter_outgoing_metadata(grpc_call_element* elem, + grpc_metadata_batch* b) { if (b->idx.named.grpc_message != nullptr) { grpc_slice pct_encoded_msg = grpc_percent_encode_slice( GRPC_MDVALUE(b->idx.named.grpc_message->md), @@ -86,8 +80,8 @@ static grpc_error* server_filter_outgoing_metadata(grpc_call_element* elem, return GRPC_ERROR_NONE; } -static void add_error(const char* error_name, grpc_error** cumulative, - grpc_error* new_err) { +static void hs_add_error(const char* error_name, grpc_error** cumulative, + grpc_error* new_err) { if (new_err == GRPC_ERROR_NONE) return; if (*cumulative == GRPC_ERROR_NONE) { *cumulative = GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_name); @@ -95,8 +89,8 @@ static void add_error(const char* error_name, grpc_error** cumulative, *cumulative = grpc_error_add_child(*cumulative, new_err); } -static grpc_error* server_filter_incoming_metadata(grpc_call_element* elem, - grpc_metadata_batch* b) { +static grpc_error* hs_filter_incoming_metadata(grpc_call_element* elem, + grpc_metadata_batch* b) { call_data* calld = static_cast(elem->call_data); grpc_error* error = GRPC_ERROR_NONE; static const char* error_name = "Failed processing incoming headers"; @@ -119,14 +113,14 @@ static grpc_error* server_filter_incoming_metadata(grpc_call_element* elem, *calld->recv_initial_metadata_flags &= ~GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST; } else { - add_error(error_name, &error, - grpc_attach_md_to_error( - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Bad header"), - b->idx.named.method->md)); + hs_add_error(error_name, &error, + grpc_attach_md_to_error( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Bad header"), + b->idx.named.method->md)); } grpc_metadata_batch_remove(b, b->idx.named.method); } else { - add_error( + hs_add_error( error_name, &error, grpc_error_set_str( GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing header"), @@ -135,31 +129,31 @@ static grpc_error* server_filter_incoming_metadata(grpc_call_element* elem, if (b->idx.named.te != nullptr) { if (!grpc_mdelem_eq(b->idx.named.te->md, GRPC_MDELEM_TE_TRAILERS)) { - add_error(error_name, &error, - grpc_attach_md_to_error( - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Bad header"), - b->idx.named.te->md)); + hs_add_error(error_name, &error, + grpc_attach_md_to_error( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Bad header"), + b->idx.named.te->md)); } grpc_metadata_batch_remove(b, b->idx.named.te); } else { - add_error(error_name, &error, - grpc_error_set_str( - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing header"), - GRPC_ERROR_STR_KEY, grpc_slice_from_static_string("te"))); + hs_add_error(error_name, &error, + grpc_error_set_str( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing header"), + GRPC_ERROR_STR_KEY, grpc_slice_from_static_string("te"))); } if (b->idx.named.scheme != nullptr) { if (!grpc_mdelem_eq(b->idx.named.scheme->md, GRPC_MDELEM_SCHEME_HTTP) && !grpc_mdelem_eq(b->idx.named.scheme->md, GRPC_MDELEM_SCHEME_HTTPS) && !grpc_mdelem_eq(b->idx.named.scheme->md, GRPC_MDELEM_SCHEME_GRPC)) { - add_error(error_name, &error, - grpc_attach_md_to_error( - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Bad header"), - b->idx.named.scheme->md)); + hs_add_error(error_name, &error, + grpc_attach_md_to_error( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Bad header"), + b->idx.named.scheme->md)); } grpc_metadata_batch_remove(b, b->idx.named.scheme); } else { - add_error( + hs_add_error( error_name, &error, grpc_error_set_str( GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing header"), @@ -196,10 +190,11 @@ static grpc_error* server_filter_incoming_metadata(grpc_call_element* elem, } if (b->idx.named.path == nullptr) { - add_error(error_name, &error, - grpc_error_set_str( - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing header"), - GRPC_ERROR_STR_KEY, grpc_slice_from_static_string(":path"))); + hs_add_error( + error_name, &error, + grpc_error_set_str( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing header"), + GRPC_ERROR_STR_KEY, grpc_slice_from_static_string(":path"))); } else if (*calld->recv_initial_metadata_flags & GRPC_INITIAL_METADATA_CACHEABLE_REQUEST) { /* We have a cacheable request made with GET verb. The path contains the @@ -235,7 +230,7 @@ static grpc_error* server_filter_incoming_metadata(grpc_call_element* elem, GRPC_SLICE_LENGTH(query_slice), k_url_safe)); calld->read_stream.Init(&read_slice_buffer, 0); grpc_slice_buffer_destroy_internal(&read_slice_buffer); - calld->seen_path_with_query = true; + calld->have_read_stream = true; grpc_slice_unref_internal(query_slice); } else { gpr_log(GPR_ERROR, "GET request without QUERY"); @@ -246,17 +241,17 @@ static grpc_error* server_filter_incoming_metadata(grpc_call_element* elem, grpc_linked_mdelem* el = b->idx.named.host; grpc_mdelem md = GRPC_MDELEM_REF(el->md); grpc_metadata_batch_remove(b, el); - add_error(error_name, &error, - grpc_metadata_batch_add_head( - b, el, - grpc_mdelem_from_slices( - GRPC_MDSTR_AUTHORITY, - grpc_slice_ref_internal(GRPC_MDVALUE(md))))); + hs_add_error(error_name, &error, + grpc_metadata_batch_add_head( + b, el, + grpc_mdelem_from_slices( + GRPC_MDSTR_AUTHORITY, + grpc_slice_ref_internal(GRPC_MDVALUE(md))))); GRPC_MDELEM_UNREF(md); } if (b->idx.named.authority == nullptr) { - add_error( + hs_add_error( error_name, &error, grpc_error_set_str( GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing header"), @@ -266,49 +261,55 @@ static grpc_error* server_filter_incoming_metadata(grpc_call_element* elem, return error; } -static void hs_on_recv(void* user_data, grpc_error* err) { +static void hs_recv_initial_metadata_ready(void* user_data, grpc_error* err) { grpc_call_element* elem = static_cast(user_data); call_data* calld = static_cast(elem->call_data); + calld->seen_recv_initial_metadata_ready = true; if (err == GRPC_ERROR_NONE) { - err = server_filter_incoming_metadata(elem, calld->recv_initial_metadata); + err = hs_filter_incoming_metadata(elem, calld->recv_initial_metadata); + if (calld->seen_recv_message_ready) { + // We've already seen the recv_message callback, but we previously + // deferred it, so we need to return it here. + // Replace the recv_message byte stream if needed. + if (calld->have_read_stream) { + calld->recv_message->reset(calld->read_stream.get()); + calld->have_read_stream = false; + } + // Re-enter call combiner for original_recv_message_ready, since the + // surface code will release the call combiner for each callback it + // receives. + GRPC_CALL_COMBINER_START( + calld->call_combiner, calld->original_recv_message_ready, + GRPC_ERROR_REF(err), + "resuming recv_message_ready from recv_initial_metadata_ready"); + } } else { GRPC_ERROR_REF(err); } - GRPC_CLOSURE_RUN(calld->on_done_recv, err); -} - -static void hs_on_complete(void* user_data, grpc_error* err) { - grpc_call_element* elem = static_cast(user_data); - call_data* calld = static_cast(elem->call_data); - /* Call recv_message_ready if we got the payload via the path field */ - if (calld->seen_path_with_query && calld->recv_message_ready != nullptr) { - calld->pp_recv_message->reset( - calld->payload_bin_delivered ? nullptr - : reinterpret_cast( - calld->read_stream.get())); - // Re-enter call combiner for recv_message_ready, since the surface - // code will release the call combiner for each callback it receives. - GRPC_CALL_COMBINER_START(calld->call_combiner, calld->recv_message_ready, - GRPC_ERROR_REF(err), - "resuming recv_message_ready from on_complete"); - calld->recv_message_ready = nullptr; - calld->payload_bin_delivered = true; - } - GRPC_CLOSURE_RUN(calld->on_complete, GRPC_ERROR_REF(err)); + GRPC_CLOSURE_RUN(calld->original_recv_initial_metadata_ready, err); } static void hs_recv_message_ready(void* user_data, grpc_error* err) { grpc_call_element* elem = static_cast(user_data); call_data* calld = static_cast(elem->call_data); - if (calld->seen_path_with_query) { - // Do nothing. This is probably a GET request, and payload will be - // returned in hs_on_complete callback. + calld->seen_recv_message_ready = true; + if (calld->seen_recv_initial_metadata_ready) { + // We've already seen the recv_initial_metadata callback, so + // replace the recv_message byte stream if needed and invoke the + // original recv_message callback immediately. + if (calld->have_read_stream) { + calld->recv_message->reset(calld->read_stream.get()); + calld->have_read_stream = false; + } + GRPC_CLOSURE_RUN(calld->original_recv_message_ready, GRPC_ERROR_REF(err)); + } else { + // We have not yet seen the recv_initial_metadata callback, so we + // need to wait to see if this is a GET request. // Note that we release the call combiner here, so that other // callbacks can run. - GRPC_CALL_COMBINER_STOP(calld->call_combiner, - "pausing recv_message_ready until on_complete"); - } else { - GRPC_CLOSURE_RUN(calld->recv_message_ready, GRPC_ERROR_REF(err)); + GRPC_CALL_COMBINER_STOP( + calld->call_combiner, + "pausing recv_message_ready until recv_initial_metadata_ready"); } } @@ -320,18 +321,18 @@ static grpc_error* hs_mutate_op(grpc_call_element* elem, if (op->send_initial_metadata) { grpc_error* error = GRPC_ERROR_NONE; static const char* error_name = "Failed sending initial metadata"; - add_error(error_name, &error, - grpc_metadata_batch_add_head( - op->payload->send_initial_metadata.send_initial_metadata, - &calld->status, GRPC_MDELEM_STATUS_200)); - add_error(error_name, &error, - grpc_metadata_batch_add_tail( - op->payload->send_initial_metadata.send_initial_metadata, - &calld->content_type, - GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC)); - add_error( + hs_add_error(error_name, &error, + grpc_metadata_batch_add_head( + op->payload->send_initial_metadata.send_initial_metadata, + &calld->status, GRPC_MDELEM_STATUS_200)); + hs_add_error(error_name, &error, + grpc_metadata_batch_add_tail( + op->payload->send_initial_metadata.send_initial_metadata, + &calld->content_type, + GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC)); + hs_add_error( error_name, &error, - server_filter_outgoing_metadata( + hs_filter_outgoing_metadata( elem, op->payload->send_initial_metadata.send_initial_metadata)); if (error != GRPC_ERROR_NONE) return error; } @@ -343,27 +344,21 @@ static grpc_error* hs_mutate_op(grpc_call_element* elem, op->payload->recv_initial_metadata.recv_initial_metadata; calld->recv_initial_metadata_flags = op->payload->recv_initial_metadata.recv_flags; - calld->on_done_recv = + calld->original_recv_initial_metadata_ready = op->payload->recv_initial_metadata.recv_initial_metadata_ready; op->payload->recv_initial_metadata.recv_initial_metadata_ready = - &calld->hs_on_recv; + &calld->recv_initial_metadata_ready; } if (op->recv_message) { - calld->recv_message_ready = op->payload->recv_message.recv_message_ready; - calld->pp_recv_message = op->payload->recv_message.recv_message; - if (op->payload->recv_message.recv_message_ready) { - op->payload->recv_message.recv_message_ready = - &calld->hs_recv_message_ready; - } - if (op->on_complete) { - calld->on_complete = op->on_complete; - op->on_complete = &calld->hs_on_complete; - } + calld->recv_message = op->payload->recv_message.recv_message; + calld->original_recv_message_ready = + op->payload->recv_message.recv_message_ready; + op->payload->recv_message.recv_message_ready = &calld->recv_message_ready; } if (op->send_trailing_metadata) { - grpc_error* error = server_filter_outgoing_metadata( + grpc_error* error = hs_filter_outgoing_metadata( elem, op->payload->send_trailing_metadata.send_trailing_metadata); if (error != GRPC_ERROR_NONE) return error; } @@ -385,50 +380,47 @@ static void hs_start_transport_stream_op_batch( } /* Constructor for call_data */ -static grpc_error* init_call_elem(grpc_call_element* elem, - const grpc_call_element_args* args) { - /* grab pointers to our data from the call element */ +static grpc_error* hs_init_call_elem(grpc_call_element* elem, + const grpc_call_element_args* args) { call_data* calld = static_cast(elem->call_data); - /* initialize members */ calld->call_combiner = args->call_combiner; - GRPC_CLOSURE_INIT(&calld->hs_on_recv, hs_on_recv, elem, - grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&calld->hs_on_complete, hs_on_complete, elem, + GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready, + hs_recv_initial_metadata_ready, elem, grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&calld->hs_recv_message_ready, hs_recv_message_ready, elem, + GRPC_CLOSURE_INIT(&calld->recv_message_ready, hs_recv_message_ready, elem, grpc_schedule_on_exec_ctx); return GRPC_ERROR_NONE; } /* Destructor for call_data */ -static void destroy_call_elem(grpc_call_element* elem, - const grpc_call_final_info* final_info, - grpc_closure* ignored) { +static void hs_destroy_call_elem(grpc_call_element* elem, + const grpc_call_final_info* final_info, + grpc_closure* ignored) { call_data* calld = static_cast(elem->call_data); - if (calld->seen_path_with_query && !calld->payload_bin_delivered) { + if (calld->have_read_stream) { calld->read_stream->Orphan(); } } /* Constructor for channel_data */ -static grpc_error* init_channel_elem(grpc_channel_element* elem, - grpc_channel_element_args* args) { +static grpc_error* hs_init_channel_elem(grpc_channel_element* elem, + grpc_channel_element_args* args) { GPR_ASSERT(!args->is_last); return GRPC_ERROR_NONE; } /* Destructor for channel data */ -static void destroy_channel_elem(grpc_channel_element* elem) {} +static void hs_destroy_channel_elem(grpc_channel_element* elem) {} const grpc_channel_filter grpc_http_server_filter = { hs_start_transport_stream_op_batch, grpc_channel_next_op, sizeof(call_data), - init_call_elem, + hs_init_call_elem, grpc_call_stack_ignore_set_pollset_or_pollset_set, - destroy_call_elem, - sizeof(channel_data), - init_channel_elem, - destroy_channel_elem, + hs_destroy_call_elem, + 0, + hs_init_channel_elem, + hs_destroy_channel_elem, grpc_channel_next_get_info, "http-server"};