diff --git a/Makefile b/Makefile index cdf9605342d..788d076b89e 100644 --- a/Makefile +++ b/Makefile @@ -2176,6 +2176,8 @@ test_cxx: buildtests_cxx $(Q) $(BINDIR)/$(CONFIG)/bm_error || ( echo test bm_error failed ; exit 1 ) $(E) "[RUN] Testing bm_fullstack_streaming_ping_pong" $(Q) $(BINDIR)/$(CONFIG)/bm_fullstack_streaming_ping_pong || ( echo test bm_fullstack_streaming_ping_pong failed ; exit 1 ) + $(E) "[RUN] Testing bm_fullstack_streaming_pump" + $(Q) $(BINDIR)/$(CONFIG)/bm_fullstack_streaming_pump || ( echo test bm_fullstack_streaming_pump failed ; exit 1 ) $(E) "[RUN] Testing bm_fullstack_unary_ping_pong" $(Q) $(BINDIR)/$(CONFIG)/bm_fullstack_unary_ping_pong || ( echo test bm_fullstack_unary_ping_pong failed ; exit 1 ) $(E) "[RUN] Testing bm_metadata" diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index 38285bcdc2b..56b0b288ad6 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -5133,7 +5133,6 @@ targets: - posix - name: bm_fullstack_streaming_pump build: test - run: false language: c++ headers: - test/cpp/microbenchmarks/fullstack_streaming_pump.h diff --git a/src/core/ext/filters/http/message_compress/message_compress_filter.cc b/src/core/ext/filters/http/message_compress/message_compress_filter.cc index 27f0333bee4..09c914fab07 100644 --- a/src/core/ext/filters/http/message_compress/message_compress_filter.cc +++ b/src/core/ext/filters/http/message_compress/message_compress_filter.cc @@ -21,6 +21,8 @@ #include #include +#include "absl/types/optional.h" + #include #include #include @@ -40,94 +42,156 @@ #include "src/core/lib/surface/call.h" #include "src/core/lib/transport/static_metadata.h" -static void start_send_message_batch(void* arg, grpc_error* unused); -static void send_message_on_complete(void* arg, grpc_error* error); -static void on_send_message_next_done(void* arg, grpc_error* error); - namespace { -struct channel_data { +class ChannelData { + public: + explicit ChannelData(grpc_channel_element_args* args) { + // Get the enabled and the default algorithms from channel args. + enabled_compression_algorithms_bitset_ = + grpc_channel_args_compression_algorithm_get_states(args->channel_args); + default_compression_algorithm_ = + grpc_channel_args_get_channel_default_compression_algorithm( + args->channel_args); + // Make sure the default is enabled. + if (!GPR_BITGET(enabled_compression_algorithms_bitset_, + default_compression_algorithm_)) { + const char* name; + GPR_ASSERT(grpc_compression_algorithm_name(default_compression_algorithm_, + &name) == 1); + gpr_log(GPR_ERROR, + "default compression algorithm %s not enabled: switching to none", + name); + default_compression_algorithm_ = GRPC_COMPRESS_NONE; + } + enabled_message_compression_algorithms_bitset_ = + grpc_compression_bitset_to_message_bitset( + enabled_compression_algorithms_bitset_); + enabled_stream_compression_algorithms_bitset_ = + grpc_compression_bitset_to_stream_bitset( + enabled_compression_algorithms_bitset_); + GPR_ASSERT(!args->is_last); + } + + grpc_compression_algorithm default_compression_algorithm() const { + return default_compression_algorithm_; + } + + uint32_t enabled_compression_algorithms_bitset() const { + return enabled_compression_algorithms_bitset_; + } + + uint32_t enabled_message_compression_algorithms_bitset() const { + return enabled_message_compression_algorithms_bitset_; + } + + uint32_t enabled_stream_compression_algorithms_bitset() const { + return enabled_stream_compression_algorithms_bitset_; + } + + private: /** The default, channel-level, compression algorithm */ - grpc_compression_algorithm default_compression_algorithm; + grpc_compression_algorithm default_compression_algorithm_; /** Bitset of enabled compression algorithms */ - uint32_t enabled_compression_algorithms_bitset; + uint32_t enabled_compression_algorithms_bitset_; /** Bitset of enabled message compression algorithms */ - uint32_t enabled_message_compression_algorithms_bitset; + uint32_t enabled_message_compression_algorithms_bitset_; /** Bitset of enabled stream compression algorithms */ - uint32_t enabled_stream_compression_algorithms_bitset; + uint32_t enabled_stream_compression_algorithms_bitset_; }; -struct call_data { - call_data(grpc_call_element* elem, const grpc_call_element_args& args) - : call_combiner(args.call_combiner) { - channel_data* channeld = static_cast(elem->channel_data); +class CallData { + public: + CallData(grpc_call_element* elem, const grpc_call_element_args& args) + : call_combiner_(args.call_combiner) { + ChannelData* channeld = static_cast(elem->channel_data); // The call's message compression algorithm is set to channel's default // setting. It can be overridden later by initial metadata. - if (GPR_LIKELY(GPR_BITGET(channeld->enabled_compression_algorithms_bitset, - channeld->default_compression_algorithm))) { - message_compression_algorithm = + if (GPR_LIKELY(GPR_BITGET(channeld->enabled_compression_algorithms_bitset(), + channeld->default_compression_algorithm()))) { + message_compression_algorithm_ = grpc_compression_algorithm_to_message_compression_algorithm( - channeld->default_compression_algorithm); + channeld->default_compression_algorithm()); } - GRPC_CLOSURE_INIT(&start_send_message_batch_in_call_combiner, - start_send_message_batch, elem, - grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&start_send_message_batch_in_call_combiner_, + StartSendMessageBatch, elem, grpc_schedule_on_exec_ctx); } - ~call_data() { - if (state_initialized) { - grpc_slice_buffer_destroy_internal(&slices); + ~CallData() { + if (state_initialized_) { + grpc_slice_buffer_destroy_internal(&slices_); } - GRPC_ERROR_UNREF(cancel_error); + GRPC_ERROR_UNREF(cancel_error_); } - grpc_core::CallCombiner* call_combiner; - grpc_message_compression_algorithm message_compression_algorithm = + void CompressStartTransportStreamOpBatch( + grpc_call_element* elem, grpc_transport_stream_op_batch* batch); + + private: + bool SkipMessageCompression(); + void InitializeState(grpc_call_element* elem); + + grpc_error* ProcessSendInitialMetadata(grpc_call_element* elem, + grpc_metadata_batch* initial_metadata); + + // Methods for processing a send_message batch + static void StartSendMessageBatch(void* elem_arg, grpc_error* unused); + static void OnSendMessageNextDone(void* elem_arg, grpc_error* error); + grpc_error* PullSliceFromSendMessage(); + void ContinueReadingSendMessage(grpc_call_element* elem); + void FinishSendMessage(grpc_call_element* elem); + void SendMessageBatchContinue(grpc_call_element* elem); + static void FailSendMessageBatchInCallCombiner(void* calld_arg, + grpc_error* error); + + static void SendMessageOnComplete(void* calld_arg, grpc_error* error); + + grpc_core::CallCombiner* call_combiner_; + grpc_message_compression_algorithm message_compression_algorithm_ = GRPC_MESSAGE_COMPRESS_NONE; - grpc_error* cancel_error = GRPC_ERROR_NONE; - grpc_transport_stream_op_batch* send_message_batch = nullptr; - bool seen_initial_metadata = false; + grpc_error* cancel_error_ = GRPC_ERROR_NONE; + grpc_transport_stream_op_batch* send_message_batch_ = nullptr; + bool seen_initial_metadata_ = false; /* Set to true, if the fields below are initialized. */ - bool state_initialized = false; - grpc_closure start_send_message_batch_in_call_combiner; + bool state_initialized_ = false; + grpc_closure start_send_message_batch_in_call_combiner_; /* The fields below are only initialized when we compress the payload. * Keep them at the bottom of the struct, so they don't pollute the * cache-lines. */ - grpc_linked_mdelem message_compression_algorithm_storage; - grpc_linked_mdelem stream_compression_algorithm_storage; - grpc_linked_mdelem accept_encoding_storage; - grpc_linked_mdelem accept_stream_encoding_storage; - grpc_slice_buffer slices; /**< Buffers up input slices to be compressed */ - grpc_core::ManualConstructor - replacement_stream; - grpc_closure* original_send_message_on_complete; - grpc_closure send_message_on_complete; - grpc_closure on_send_message_next_done; + grpc_linked_mdelem message_compression_algorithm_storage_; + grpc_linked_mdelem stream_compression_algorithm_storage_; + grpc_linked_mdelem accept_encoding_storage_; + grpc_linked_mdelem accept_stream_encoding_storage_; + grpc_slice_buffer slices_; /**< Buffers up input slices to be compressed */ + // Allocate space for the replacement stream + std::aligned_storage::type + replacement_stream_; + grpc_closure* original_send_message_on_complete_ = nullptr; + grpc_closure send_message_on_complete_; + grpc_closure on_send_message_next_done_; }; -} // namespace - // Returns true if we should skip message compression for the current message. -static bool skip_message_compression(grpc_call_element* elem) { - call_data* calld = static_cast(elem->call_data); +bool CallData::SkipMessageCompression() { // If the flags of this message indicate that it shouldn't be compressed, we // skip message compression. uint32_t flags = - calld->send_message_batch->payload->send_message.send_message->flags(); + send_message_batch_->payload->send_message.send_message->flags(); if (flags & (GRPC_WRITE_NO_COMPRESS | GRPC_WRITE_INTERNAL_COMPRESS)) { return true; } // If this call doesn't have any message compression algorithm set, skip // message compression. - return calld->message_compression_algorithm == GRPC_MESSAGE_COMPRESS_NONE; + return message_compression_algorithm_ == GRPC_MESSAGE_COMPRESS_NONE; } // Determines the compression algorithm from the initial metadata and the // channel's default setting. -static grpc_compression_algorithm find_compression_algorithm( - grpc_metadata_batch* initial_metadata, channel_data* channeld) { +grpc_compression_algorithm FindCompressionAlgorithm( + grpc_metadata_batch* initial_metadata, ChannelData* channeld) { if (initial_metadata->idx.named.grpc_internal_encoding_request == nullptr) { - return channeld->default_compression_algorithm; + return channeld->default_compression_algorithm(); } grpc_compression_algorithm compression_algorithm; // Parse the compression algorithm from the initial metadata. @@ -143,7 +207,7 @@ static grpc_compression_algorithm find_compression_algorithm( // enabled. // TODO(juanlishen): Maybe use channel default or abort() if the algorithm // from the initial metadata is disabled. - if (GPR_LIKELY(GPR_BITGET(channeld->enabled_compression_algorithms_bitset, + if (GPR_LIKELY(GPR_BITGET(channeld->enabled_compression_algorithms_bitset(), compression_algorithm))) { return compression_algorithm; } @@ -158,30 +222,24 @@ static grpc_compression_algorithm find_compression_algorithm( return GRPC_COMPRESS_NONE; } -static void initialize_state(grpc_call_element* elem, call_data* calld) { - GPR_DEBUG_ASSERT(!calld->state_initialized); - calld->state_initialized = true; - grpc_slice_buffer_init(&calld->slices); - GRPC_CLOSURE_INIT(&calld->send_message_on_complete, - ::send_message_on_complete, elem, +void CallData::InitializeState(grpc_call_element* elem) { + GPR_DEBUG_ASSERT(!state_initialized_); + state_initialized_ = true; + grpc_slice_buffer_init(&slices_); + GRPC_CLOSURE_INIT(&send_message_on_complete_, SendMessageOnComplete, this, grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&calld->on_send_message_next_done, - ::on_send_message_next_done, elem, + GRPC_CLOSURE_INIT(&on_send_message_next_done_, OnSendMessageNextDone, elem, grpc_schedule_on_exec_ctx); } -static grpc_error* process_send_initial_metadata( - grpc_call_element* elem, - grpc_metadata_batch* initial_metadata) GRPC_MUST_USE_RESULT; -static grpc_error* process_send_initial_metadata( +grpc_error* CallData::ProcessSendInitialMetadata( grpc_call_element* elem, grpc_metadata_batch* initial_metadata) { - call_data* calld = static_cast(elem->call_data); - channel_data* channeld = static_cast(elem->channel_data); + ChannelData* channeld = static_cast(elem->channel_data); // Find the compression algorithm. grpc_compression_algorithm compression_algorithm = - find_compression_algorithm(initial_metadata, channeld); + FindCompressionAlgorithm(initial_metadata, channeld); // Note that at most one of the following algorithms can be set. - calld->message_compression_algorithm = + message_compression_algorithm_ = grpc_compression_algorithm_to_message_compression_algorithm( compression_algorithm); grpc_stream_compression_algorithm stream_compression_algorithm = @@ -189,321 +247,300 @@ static grpc_error* process_send_initial_metadata( compression_algorithm); // Hint compression algorithm. grpc_error* error = GRPC_ERROR_NONE; - if (calld->message_compression_algorithm != GRPC_MESSAGE_COMPRESS_NONE) { - initialize_state(elem, calld); + if (message_compression_algorithm_ != GRPC_MESSAGE_COMPRESS_NONE) { + InitializeState(elem); error = grpc_metadata_batch_add_tail( - initial_metadata, &calld->message_compression_algorithm_storage, + initial_metadata, &message_compression_algorithm_storage_, grpc_message_compression_encoding_mdelem( - calld->message_compression_algorithm), + message_compression_algorithm_), GRPC_BATCH_GRPC_ENCODING); } else if (stream_compression_algorithm != GRPC_STREAM_COMPRESS_NONE) { - initialize_state(elem, calld); + InitializeState(elem); error = grpc_metadata_batch_add_tail( - initial_metadata, &calld->stream_compression_algorithm_storage, + initial_metadata, &stream_compression_algorithm_storage_, grpc_stream_compression_encoding_mdelem(stream_compression_algorithm), GRPC_BATCH_CONTENT_ENCODING); } if (error != GRPC_ERROR_NONE) return error; // Convey supported compression algorithms. error = grpc_metadata_batch_add_tail( - initial_metadata, &calld->accept_encoding_storage, + initial_metadata, &accept_encoding_storage_, GRPC_MDELEM_ACCEPT_ENCODING_FOR_ALGORITHMS( - channeld->enabled_message_compression_algorithms_bitset), + channeld->enabled_message_compression_algorithms_bitset()), GRPC_BATCH_GRPC_ACCEPT_ENCODING); if (error != GRPC_ERROR_NONE) return error; // Do not overwrite accept-encoding header if it already presents (e.g., added // by some proxy). if (!initial_metadata->idx.named.accept_encoding) { error = grpc_metadata_batch_add_tail( - initial_metadata, &calld->accept_stream_encoding_storage, + initial_metadata, &accept_stream_encoding_storage_, GRPC_MDELEM_ACCEPT_STREAM_ENCODING_FOR_ALGORITHMS( - channeld->enabled_stream_compression_algorithms_bitset), + channeld->enabled_stream_compression_algorithms_bitset()), GRPC_BATCH_ACCEPT_ENCODING); } return error; } -static void send_message_on_complete(void* arg, grpc_error* error) { - grpc_call_element* elem = static_cast(arg); - call_data* calld = static_cast(elem->call_data); - grpc_slice_buffer_reset_and_unref_internal(&calld->slices); +void CallData::SendMessageOnComplete(void* calld_arg, grpc_error* error) { + CallData* calld = static_cast(calld_arg); + grpc_slice_buffer_reset_and_unref_internal(&calld->slices_); grpc_core::Closure::Run(DEBUG_LOCATION, - calld->original_send_message_on_complete, + calld->original_send_message_on_complete_, GRPC_ERROR_REF(error)); } -static void send_message_batch_continue(grpc_call_element* elem) { - call_data* calld = static_cast(elem->call_data); +void CallData::SendMessageBatchContinue(grpc_call_element* elem) { // Note: The call to grpc_call_next_op() results in yielding the - // call combiner, so we need to clear calld->send_message_batch - // before we do that. - grpc_transport_stream_op_batch* send_message_batch = - calld->send_message_batch; - calld->send_message_batch = nullptr; + // call combiner, so we need to clear send_message_batch_ before we do that. + grpc_transport_stream_op_batch* send_message_batch = send_message_batch_; + send_message_batch_ = nullptr; grpc_call_next_op(elem, send_message_batch); } -static void finish_send_message(grpc_call_element* elem) { - call_data* calld = static_cast(elem->call_data); - GPR_DEBUG_ASSERT(calld->message_compression_algorithm != +void CallData::FinishSendMessage(grpc_call_element* elem) { + GPR_DEBUG_ASSERT(message_compression_algorithm_ != GRPC_MESSAGE_COMPRESS_NONE); // Compress the data if appropriate. grpc_slice_buffer tmp; grpc_slice_buffer_init(&tmp); uint32_t send_flags = - calld->send_message_batch->payload->send_message.send_message->flags(); - bool did_compress = grpc_msg_compress(calld->message_compression_algorithm, - &calld->slices, &tmp); + send_message_batch_->payload->send_message.send_message->flags(); + bool did_compress = + grpc_msg_compress(message_compression_algorithm_, &slices_, &tmp); if (did_compress) { if (GRPC_TRACE_FLAG_ENABLED(grpc_compression_trace)) { const char* algo_name; - const size_t before_size = calld->slices.length; + const size_t before_size = slices_.length; const size_t after_size = tmp.length; const float savings_ratio = 1.0f - static_cast(after_size) / static_cast(before_size); GPR_ASSERT(grpc_message_compression_algorithm_name( - calld->message_compression_algorithm, &algo_name)); + message_compression_algorithm_, &algo_name)); gpr_log(GPR_INFO, "Compressed[%s] %" PRIuPTR " bytes vs. %" PRIuPTR " bytes (%.2f%% savings)", algo_name, before_size, after_size, 100 * savings_ratio); } - grpc_slice_buffer_swap(&calld->slices, &tmp); + grpc_slice_buffer_swap(&slices_, &tmp); send_flags |= GRPC_WRITE_INTERNAL_COMPRESS; } else { if (GRPC_TRACE_FLAG_ENABLED(grpc_compression_trace)) { const char* algo_name; GPR_ASSERT(grpc_message_compression_algorithm_name( - calld->message_compression_algorithm, &algo_name)); + message_compression_algorithm_, &algo_name)); gpr_log(GPR_INFO, "Algorithm '%s' enabled but decided not to compress. Input size: " "%" PRIuPTR, - algo_name, calld->slices.length); + algo_name, slices_.length); } } grpc_slice_buffer_destroy_internal(&tmp); // Swap out the original byte stream with our new one and send the // batch down. - calld->replacement_stream.Init(&calld->slices, send_flags); - calld->send_message_batch->payload->send_message.send_message.reset( - calld->replacement_stream.get()); - calld->original_send_message_on_complete = - calld->send_message_batch->on_complete; - calld->send_message_batch->on_complete = &calld->send_message_on_complete; - send_message_batch_continue(elem); + new (&replacement_stream_) + grpc_core::SliceBufferByteStream(&slices_, send_flags); + send_message_batch_->payload->send_message.send_message.reset( + reinterpret_cast( + &replacement_stream_)); + original_send_message_on_complete_ = send_message_batch_->on_complete; + send_message_batch_->on_complete = &send_message_on_complete_; + SendMessageBatchContinue(elem); } -static void fail_send_message_batch_in_call_combiner(void* arg, - grpc_error* error) { - call_data* calld = static_cast(arg); - if (calld->send_message_batch != nullptr) { +void CallData::FailSendMessageBatchInCallCombiner(void* calld_arg, + grpc_error* error) { + CallData* calld = static_cast(calld_arg); + if (calld->send_message_batch_ != nullptr) { grpc_transport_stream_op_batch_finish_with_failure( - calld->send_message_batch, GRPC_ERROR_REF(error), calld->call_combiner); - calld->send_message_batch = nullptr; + calld->send_message_batch_, GRPC_ERROR_REF(error), + calld->call_combiner_); + calld->send_message_batch_ = nullptr; } } -// Pulls a slice from the send_message byte stream and adds it to calld->slices. -static grpc_error* pull_slice_from_send_message(call_data* calld) { +// Pulls a slice from the send_message byte stream and adds it to slices_. +grpc_error* CallData::PullSliceFromSendMessage() { grpc_slice incoming_slice; grpc_error* error = - calld->send_message_batch->payload->send_message.send_message->Pull( + send_message_batch_->payload->send_message.send_message->Pull( &incoming_slice); if (error == GRPC_ERROR_NONE) { - grpc_slice_buffer_add(&calld->slices, incoming_slice); + grpc_slice_buffer_add(&slices_, incoming_slice); } return error; } // Reads as many slices as possible from the send_message byte stream. -// If all data has been read, invokes finish_send_message(). Otherwise, +// If all data has been read, invokes FinishSendMessage(). Otherwise, // an async call to ByteStream::Next() has been started, which will -// eventually result in calling on_send_message_next_done(). -static void continue_reading_send_message(grpc_call_element* elem) { - call_data* calld = static_cast(elem->call_data); - if (calld->slices.length == - calld->send_message_batch->payload->send_message.send_message->length()) { - finish_send_message(elem); +// eventually result in calling OnSendMessageNextDone(). +void CallData::ContinueReadingSendMessage(grpc_call_element* elem) { + if (slices_.length == + send_message_batch_->payload->send_message.send_message->length()) { + FinishSendMessage(elem); return; } - while (calld->send_message_batch->payload->send_message.send_message->Next( - ~static_cast(0), &calld->on_send_message_next_done)) { - grpc_error* error = pull_slice_from_send_message(calld); + while (send_message_batch_->payload->send_message.send_message->Next( + ~static_cast(0), &on_send_message_next_done_)) { + grpc_error* error = PullSliceFromSendMessage(); if (error != GRPC_ERROR_NONE) { // Closure callback; does not take ownership of error. - fail_send_message_batch_in_call_combiner(calld, error); + FailSendMessageBatchInCallCombiner(this, error); GRPC_ERROR_UNREF(error); return; } - if (calld->slices.length == calld->send_message_batch->payload->send_message - .send_message->length()) { - finish_send_message(elem); + if (slices_.length == + send_message_batch_->payload->send_message.send_message->length()) { + FinishSendMessage(elem); break; } } } // Async callback for ByteStream::Next(). -static void on_send_message_next_done(void* arg, grpc_error* error) { - grpc_call_element* elem = static_cast(arg); - call_data* calld = static_cast(elem->call_data); +void CallData::OnSendMessageNextDone(void* elem_arg, grpc_error* error) { + grpc_call_element* elem = static_cast(elem_arg); + CallData* calld = static_cast(elem->call_data); if (error != GRPC_ERROR_NONE) { // Closure callback; does not take ownership of error. - fail_send_message_batch_in_call_combiner(calld, error); + FailSendMessageBatchInCallCombiner(calld, error); return; } - error = pull_slice_from_send_message(calld); + error = calld->PullSliceFromSendMessage(); if (error != GRPC_ERROR_NONE) { // Closure callback; does not take ownership of error. - fail_send_message_batch_in_call_combiner(calld, error); + FailSendMessageBatchInCallCombiner(calld, error); GRPC_ERROR_UNREF(error); return; } - if (calld->slices.length == - calld->send_message_batch->payload->send_message.send_message->length()) { - finish_send_message(elem); + if (calld->slices_.length == calld->send_message_batch_->payload->send_message + .send_message->length()) { + calld->FinishSendMessage(elem); } else { - continue_reading_send_message(elem); + calld->ContinueReadingSendMessage(elem); } } -static void start_send_message_batch(void* arg, grpc_error* /*unused*/) { - grpc_call_element* elem = static_cast(arg); - if (skip_message_compression(elem)) { - send_message_batch_continue(elem); +void CallData::StartSendMessageBatch(void* elem_arg, grpc_error* /*unused*/) { + grpc_call_element* elem = static_cast(elem_arg); + CallData* calld = static_cast(elem->call_data); + if (calld->SkipMessageCompression()) { + calld->SendMessageBatchContinue(elem); } else { - continue_reading_send_message(elem); + calld->ContinueReadingSendMessage(elem); } } -static void compress_start_transport_stream_op_batch( +void CallData::CompressStartTransportStreamOpBatch( grpc_call_element* elem, grpc_transport_stream_op_batch* batch) { GPR_TIMER_SCOPE("compress_start_transport_stream_op_batch", 0); - call_data* calld = static_cast(elem->call_data); // Handle cancel_stream. if (batch->cancel_stream) { - GRPC_ERROR_UNREF(calld->cancel_error); - calld->cancel_error = - GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error); - if (calld->send_message_batch != nullptr) { - if (!calld->seen_initial_metadata) { + GRPC_ERROR_UNREF(cancel_error_); + cancel_error_ = GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error); + if (send_message_batch_ != nullptr) { + if (!seen_initial_metadata_) { GRPC_CALL_COMBINER_START( - calld->call_combiner, - GRPC_CLOSURE_CREATE(fail_send_message_batch_in_call_combiner, calld, + call_combiner_, + GRPC_CLOSURE_CREATE(FailSendMessageBatchInCallCombiner, this, grpc_schedule_on_exec_ctx), - GRPC_ERROR_REF(calld->cancel_error), "failing send_message op"); + GRPC_ERROR_REF(cancel_error_), "failing send_message op"); } else { - calld->send_message_batch->payload->send_message.send_message->Shutdown( - GRPC_ERROR_REF(calld->cancel_error)); + send_message_batch_->payload->send_message.send_message->Shutdown( + GRPC_ERROR_REF(cancel_error_)); } } - } else if (calld->cancel_error != GRPC_ERROR_NONE) { + } else if (cancel_error_ != GRPC_ERROR_NONE) { grpc_transport_stream_op_batch_finish_with_failure( - batch, GRPC_ERROR_REF(calld->cancel_error), calld->call_combiner); + batch, GRPC_ERROR_REF(cancel_error_), call_combiner_); return; } // Handle send_initial_metadata. if (batch->send_initial_metadata) { - GPR_ASSERT(!calld->seen_initial_metadata); - grpc_error* error = process_send_initial_metadata( + GPR_ASSERT(!seen_initial_metadata_); + grpc_error* error = ProcessSendInitialMetadata( elem, batch->payload->send_initial_metadata.send_initial_metadata); if (error != GRPC_ERROR_NONE) { grpc_transport_stream_op_batch_finish_with_failure(batch, error, - calld->call_combiner); + call_combiner_); return; } - calld->seen_initial_metadata = true; + seen_initial_metadata_ = true; // If we had previously received a batch containing a send_message op, // handle it now. Note that we need to re-enter the call combiner // for this, since we can't send two batches down while holding the // call combiner, since the connected_channel filter (at the bottom of // the call stack) will release the call combiner for each batch it sees. - if (calld->send_message_batch != nullptr) { + if (send_message_batch_ != nullptr) { GRPC_CALL_COMBINER_START( - calld->call_combiner, - &calld->start_send_message_batch_in_call_combiner, GRPC_ERROR_NONE, - "starting send_message after send_initial_metadata"); + call_combiner_, &start_send_message_batch_in_call_combiner_, + GRPC_ERROR_NONE, "starting send_message after send_initial_metadata"); } } // Handle send_message. if (batch->send_message) { - GPR_ASSERT(calld->send_message_batch == nullptr); - calld->send_message_batch = batch; + GPR_ASSERT(send_message_batch_ == nullptr); + send_message_batch_ = batch; // If we have not yet seen send_initial_metadata, then we have to - // wait. We save the batch in calld and then drop the call - // combiner, which we'll have to pick up again later when we get - // send_initial_metadata. - if (!calld->seen_initial_metadata) { + // wait. We save the batch and then drop the call combiner, which we'll + // have to pick up again later when we get send_initial_metadata. + if (!seen_initial_metadata_) { GRPC_CALL_COMBINER_STOP( - calld->call_combiner, - "send_message batch pending send_initial_metadata"); + call_combiner_, "send_message batch pending send_initial_metadata"); return; } - start_send_message_batch(elem, GRPC_ERROR_NONE); + StartSendMessageBatch(elem, GRPC_ERROR_NONE); } else { // Pass control down the stack. grpc_call_next_op(elem, batch); } } +void CompressStartTransportStreamOpBatch( + grpc_call_element* elem, grpc_transport_stream_op_batch* batch) { + CallData* calld = static_cast(elem->call_data); + calld->CompressStartTransportStreamOpBatch(elem, batch); +} + /* Constructor for call_data */ -static grpc_error* compress_init_call_elem(grpc_call_element* elem, - const grpc_call_element_args* args) { - new (elem->call_data) call_data(elem, *args); +grpc_error* CompressInitCallElem(grpc_call_element* elem, + const grpc_call_element_args* args) { + new (elem->call_data) CallData(elem, *args); return GRPC_ERROR_NONE; } /* Destructor for call_data */ -static void compress_destroy_call_elem( - grpc_call_element* elem, const grpc_call_final_info* /*final_info*/, - grpc_closure* /*ignored*/) { - call_data* calld = static_cast(elem->call_data); - calld->~call_data(); +void CompressDestroyCallElem(grpc_call_element* elem, + const grpc_call_final_info* /*final_info*/, + grpc_closure* /*ignored*/) { + CallData* calld = static_cast(elem->call_data); + calld->~CallData(); } -/* Constructor for channel_data */ -static grpc_error* compress_init_channel_elem(grpc_channel_element* elem, - grpc_channel_element_args* args) { - channel_data* channeld = static_cast(elem->channel_data); - // Get the enabled and the default algorithms from channel args. - channeld->enabled_compression_algorithms_bitset = - grpc_channel_args_compression_algorithm_get_states(args->channel_args); - channeld->default_compression_algorithm = - grpc_channel_args_get_channel_default_compression_algorithm( - args->channel_args); - // Make sure the default is enabled. - if (!GPR_BITGET(channeld->enabled_compression_algorithms_bitset, - channeld->default_compression_algorithm)) { - const char* name; - GPR_ASSERT(grpc_compression_algorithm_name( - channeld->default_compression_algorithm, &name) == 1); - gpr_log(GPR_ERROR, - "default compression algorithm %s not enabled: switching to none", - name); - channeld->default_compression_algorithm = GRPC_COMPRESS_NONE; - } - channeld->enabled_message_compression_algorithms_bitset = - grpc_compression_bitset_to_message_bitset( - channeld->enabled_compression_algorithms_bitset); - channeld->enabled_stream_compression_algorithms_bitset = - grpc_compression_bitset_to_stream_bitset( - channeld->enabled_compression_algorithms_bitset); - GPR_ASSERT(!args->is_last); +/* Constructor for ChannelData */ +grpc_error* CompressInitChannelElem(grpc_channel_element* elem, + grpc_channel_element_args* args) { + new (elem->channel_data) ChannelData(args); return GRPC_ERROR_NONE; } /* Destructor for channel data */ -static void compress_destroy_channel_elem(grpc_channel_element* /*elem*/) {} +void CompressDestroyChannelElem(grpc_channel_element* elem) { + ChannelData* channeld = static_cast(elem->channel_data); + channeld->~ChannelData(); +} + +} // namespace const grpc_channel_filter grpc_message_compress_filter = { - compress_start_transport_stream_op_batch, + CompressStartTransportStreamOpBatch, grpc_channel_next_op, - sizeof(call_data), - compress_init_call_elem, + sizeof(CallData), + CompressInitCallElem, grpc_call_stack_ignore_set_pollset_or_pollset_set, - compress_destroy_call_elem, - sizeof(channel_data), - compress_init_channel_elem, - compress_destroy_channel_elem, + CompressDestroyCallElem, + sizeof(ChannelData), + CompressInitChannelElem, + CompressDestroyChannelElem, grpc_channel_next_get_info, "message_compress"}; diff --git a/src/core/ext/transport/chttp2/transport/flow_control.cc b/src/core/ext/transport/chttp2/transport/flow_control.cc index d53475a1b61..babe564d39d 100644 --- a/src/core/ext/transport/chttp2/transport/flow_control.cc +++ b/src/core/ext/transport/chttp2/transport/flow_control.cc @@ -284,8 +284,8 @@ void StreamFlowControl::IncomingByteStreamUpdate(size_t max_size_hint, [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; /* clamp max recv hint to an allowable size */ - if (max_size_hint >= UINT32_MAX - sent_init_window) { - max_recv_bytes = UINT32_MAX - sent_init_window; + if (max_size_hint >= kMaxWindowUpdateSize - sent_init_window) { + max_recv_bytes = kMaxWindowUpdateSize - sent_init_window; } else { max_recv_bytes = static_cast(max_size_hint); } @@ -298,7 +298,7 @@ void StreamFlowControl::IncomingByteStreamUpdate(size_t max_size_hint, } /* add some small lookahead to keep pipelines flowing */ - GPR_ASSERT(max_recv_bytes <= UINT32_MAX - sent_init_window); + GPR_DEBUG_ASSERT(max_recv_bytes <= kMaxWindowUpdateSize - sent_init_window); if (local_window_delta_ < max_recv_bytes) { uint32_t add_max_recv_bytes = static_cast(max_recv_bytes - local_window_delta_); diff --git a/src/core/lib/security/security_connector/alts/alts_security_connector.cc b/src/core/lib/security/security_connector/alts/alts_security_connector.cc index 1274edb6e6e..6bfe6ea74c3 100644 --- a/src/core/lib/security/security_connector/alts/alts_security_connector.cc +++ b/src/core/lib/security/security_connector/alts/alts_security_connector.cc @@ -82,10 +82,17 @@ class grpc_alts_channel_security_connector final tsi_handshaker* handshaker = nullptr; const grpc_alts_credentials* creds = static_cast(channel_creds()); - GPR_ASSERT(alts_tsi_handshaker_create(creds->options(), target_name_, - creds->handshaker_service_url(), true, - interested_parties, - &handshaker) == TSI_OK); + size_t user_specified_max_frame_size = 0; + const grpc_arg* arg = + grpc_channel_args_find(args, GRPC_ARG_TSI_MAX_FRAME_SIZE); + if (arg != nullptr && arg->type == GRPC_ARG_INTEGER) { + user_specified_max_frame_size = grpc_channel_arg_get_integer( + arg, {0, 0, std::numeric_limits::max()}); + } + GPR_ASSERT(alts_tsi_handshaker_create( + creds->options(), target_name_, + creds->handshaker_service_url(), true, interested_parties, + &handshaker, user_specified_max_frame_size) == TSI_OK); handshake_manager->Add( grpc_core::SecurityHandshakerCreate(handshaker, this, args)); } @@ -140,9 +147,17 @@ class grpc_alts_server_security_connector final tsi_handshaker* handshaker = nullptr; const grpc_alts_server_credentials* creds = static_cast(server_creds()); + size_t user_specified_max_frame_size = 0; + const grpc_arg* arg = + grpc_channel_args_find(args, GRPC_ARG_TSI_MAX_FRAME_SIZE); + if (arg != nullptr && arg->type == GRPC_ARG_INTEGER) { + user_specified_max_frame_size = grpc_channel_arg_get_integer( + arg, {0, 0, std::numeric_limits::max()}); + } GPR_ASSERT(alts_tsi_handshaker_create( creds->options(), nullptr, creds->handshaker_service_url(), - false, interested_parties, &handshaker) == TSI_OK); + false, interested_parties, &handshaker, + user_specified_max_frame_size) == TSI_OK); handshake_manager->Add( grpc_core::SecurityHandshakerCreate(handshaker, this, args)); } diff --git a/src/core/tsi/alts/handshaker/alts_handshaker_client.cc b/src/core/tsi/alts/handshaker/alts_handshaker_client.cc index 2592763e5a2..61927276195 100644 --- a/src/core/tsi/alts/handshaker/alts_handshaker_client.cc +++ b/src/core/tsi/alts/handshaker/alts_handshaker_client.cc @@ -102,6 +102,8 @@ typedef struct alts_grpc_handshaker_client { bool receive_status_finished; /* if non-null, contains arguments to complete a TSI next callback. */ recv_message_result* pending_recv_message_result; + /* Maximum frame size used by frame protector. */ + size_t max_frame_size; } alts_grpc_handshaker_client; static void handshaker_client_send_buffer_destroy( @@ -506,6 +508,8 @@ static grpc_byte_buffer* get_serialized_start_client( upb_strview_makez(ptr->data)); ptr = ptr->next; } + grpc_gcp_StartClientHandshakeReq_set_max_frame_size( + start_client, static_cast(client->max_frame_size)); return get_serialized_handshaker_req(req, arena.ptr()); } @@ -565,6 +569,8 @@ static grpc_byte_buffer* get_serialized_start_server( arena.ptr()); grpc_gcp_RpcProtocolVersions_assign_from_struct( server_version, arena.ptr(), &client->options->rpc_versions); + grpc_gcp_StartServerHandshakeReq_set_max_frame_size( + start_server, static_cast(client->max_frame_size)); return get_serialized_handshaker_req(req, arena.ptr()); } @@ -674,7 +680,7 @@ alts_handshaker_client* alts_grpc_handshaker_client_create( grpc_alts_credentials_options* options, const grpc_slice& target_name, grpc_iomgr_cb_func grpc_cb, tsi_handshaker_on_next_done_cb cb, void* user_data, alts_handshaker_client_vtable* vtable_for_testing, - bool is_client) { + bool is_client, size_t max_frame_size) { if (channel == nullptr || handshaker_service_url == nullptr) { gpr_log(GPR_ERROR, "Invalid arguments to alts_handshaker_client_create()"); return nullptr; @@ -694,6 +700,7 @@ alts_handshaker_client* alts_grpc_handshaker_client_create( client->recv_bytes = grpc_empty_slice(); grpc_metadata_array_init(&client->recv_initial_metadata); client->is_client = is_client; + client->max_frame_size = max_frame_size; client->buffer_size = TSI_ALTS_INITIAL_BUFFER_SIZE; client->buffer = static_cast(gpr_zalloc(client->buffer_size)); grpc_slice slice = grpc_slice_from_copied_string(handshaker_service_url); diff --git a/src/core/tsi/alts/handshaker/alts_handshaker_client.h b/src/core/tsi/alts/handshaker/alts_handshaker_client.h index 319a23c88c7..d8669da01cb 100644 --- a/src/core/tsi/alts/handshaker/alts_handshaker_client.h +++ b/src/core/tsi/alts/handshaker/alts_handshaker_client.h @@ -117,7 +117,7 @@ void alts_handshaker_client_destroy(alts_handshaker_client* client); * This method creates an ALTS handshaker client. * * - handshaker: ALTS TSI handshaker to which the created handshaker client - * belongs to. + * belongs to. * - channel: grpc channel to ALTS handshaker service. * - handshaker_service_url: address of ALTS handshaker service in the format of * "host:port". @@ -132,8 +132,12 @@ void alts_handshaker_client_destroy(alts_handshaker_client* client); * - vtable_for_testing: ALTS handshaker client vtable instance used for * testing purpose. * - is_client: a boolean value indicating if the created handshaker client is - * used at the client (is_client = true) or server (is_client = false) side. It - * returns the created ALTS handshaker client on success, and NULL on failure. + * used at the client (is_client = true) or server (is_client = false) side. + * - max_frame_size: Maximum frame size used by frame protector (User specified + * maximum frame size if present or default max frame size). + * + * It returns the created ALTS handshaker client on success, and NULL + * on failure. */ alts_handshaker_client* alts_grpc_handshaker_client_create( alts_tsi_handshaker* handshaker, grpc_channel* channel, @@ -141,7 +145,7 @@ alts_handshaker_client* alts_grpc_handshaker_client_create( grpc_alts_credentials_options* options, const grpc_slice& target_name, grpc_iomgr_cb_func grpc_cb, tsi_handshaker_on_next_done_cb cb, void* user_data, alts_handshaker_client_vtable* vtable_for_testing, - bool is_client); + bool is_client, size_t max_frame_size); /** * This method handles handshaker response returned from ALTS handshaker diff --git a/src/core/tsi/alts/handshaker/alts_tsi_handshaker.cc b/src/core/tsi/alts/handshaker/alts_tsi_handshaker.cc index 0c700306d8f..2a925182d3f 100644 --- a/src/core/tsi/alts/handshaker/alts_tsi_handshaker.cc +++ b/src/core/tsi/alts/handshaker/alts_tsi_handshaker.cc @@ -63,6 +63,8 @@ struct alts_tsi_handshaker { // shutdown effectively follows base.handshake_shutdown, // but is synchronized by the mutex of this object. bool shutdown; + // Maximum frame size used by frame protector. + size_t max_frame_size; }; /* Main struct for ALTS TSI handshaker result. */ @@ -75,6 +77,8 @@ typedef struct alts_tsi_handshaker_result { grpc_slice rpc_versions; bool is_client; grpc_slice serialized_context; + // Peer's maximum frame size. + size_t max_frame_size; } alts_tsi_handshaker_result; static tsi_result handshaker_result_extract_peer( @@ -156,6 +160,26 @@ static tsi_result handshaker_result_create_zero_copy_grpc_protector( alts_tsi_handshaker_result* result = reinterpret_cast( const_cast(self)); + + // In case the peer does not send max frame size (e.g. peer is gRPC Go or + // peer uses an old binary), the negotiated frame size is set to + // kTsiAltsMinFrameSize (ignoring max_output_protected_frame_size value if + // present). Otherwise, it is based on peer and user specified max frame + // size (if present). + size_t max_frame_size = kTsiAltsMinFrameSize; + if (result->max_frame_size) { + size_t peer_max_frame_size = result->max_frame_size; + max_frame_size = std::min(peer_max_frame_size, + max_output_protected_frame_size == nullptr + ? kTsiAltsMaxFrameSize + : *max_output_protected_frame_size); + max_frame_size = std::max(max_frame_size, kTsiAltsMinFrameSize); + } + max_output_protected_frame_size = &max_frame_size; + gpr_log(GPR_DEBUG, + "After Frame Size Negotiation, maximum frame size used by frame " + "protector equals %zu", + *max_output_protected_frame_size); tsi_result ok = alts_zero_copy_grpc_protector_create( reinterpret_cast(result->key_data), kAltsAes128GcmRekeyKeyLength, /*is_rekey=*/true, result->is_client, @@ -288,6 +312,7 @@ tsi_result alts_tsi_handshaker_result_create(grpc_gcp_HandshakerResp* resp, static_cast(gpr_zalloc(peer_service_account.size + 1)); memcpy(result->peer_identity, peer_service_account.data, peer_service_account.size); + result->max_frame_size = grpc_gcp_HandshakerResult_max_frame_size(hresult); upb::Arena rpc_versions_arena; bool serialized = grpc_gcp_rpc_protocol_versions_encode( peer_rpc_version, rpc_versions_arena.ptr(), &result->rpc_versions); @@ -374,7 +399,8 @@ static tsi_result alts_tsi_handshaker_continue_handshaker_next( handshaker, channel, handshaker->handshaker_service_url, handshaker->interested_parties, handshaker->options, handshaker->target_name, grpc_cb, cb, user_data, - handshaker->client_vtable_for_testing, handshaker->is_client); + handshaker->client_vtable_for_testing, handshaker->is_client, + handshaker->max_frame_size); if (client == nullptr) { gpr_log(GPR_ERROR, "Failed to create ALTS handshaker client"); return TSI_FAILED_PRECONDITION; @@ -570,7 +596,8 @@ bool alts_tsi_handshaker_has_shutdown(alts_tsi_handshaker* handshaker) { tsi_result alts_tsi_handshaker_create( const grpc_alts_credentials_options* options, const char* target_name, const char* handshaker_service_url, bool is_client, - grpc_pollset_set* interested_parties, tsi_handshaker** self) { + grpc_pollset_set* interested_parties, tsi_handshaker** self, + size_t user_specified_max_frame_size) { if (handshaker_service_url == nullptr || self == nullptr || options == nullptr || (is_client && target_name == nullptr)) { gpr_log(GPR_ERROR, "Invalid arguments to alts_tsi_handshaker_create()"); @@ -590,6 +617,9 @@ tsi_result alts_tsi_handshaker_create( handshaker->has_created_handshaker_client = false; handshaker->handshaker_service_url = gpr_strdup(handshaker_service_url); handshaker->options = grpc_alts_credentials_options_copy(options); + handshaker->max_frame_size = user_specified_max_frame_size != 0 + ? user_specified_max_frame_size + : kTsiAltsMaxFrameSize; handshaker->base.vtable = handshaker->use_dedicated_cq ? &handshaker_vtable_dedicated : &handshaker_vtable; diff --git a/src/core/tsi/alts/handshaker/alts_tsi_handshaker.h b/src/core/tsi/alts/handshaker/alts_tsi_handshaker.h index 5bace9affa8..e1ae985a84d 100644 --- a/src/core/tsi/alts/handshaker/alts_tsi_handshaker.h +++ b/src/core/tsi/alts/handshaker/alts_tsi_handshaker.h @@ -38,6 +38,11 @@ const size_t kTsiAltsNumOfPeerProperties = 5; +// Frame size negotiation extends send frame size range to +// [kTsiAltsMinFrameSize, kTsiAltsMaxFrameSize]. +const size_t kTsiAltsMinFrameSize = 16 * 1024; +const size_t kTsiAltsMaxFrameSize = 128 * 1024; + typedef struct alts_tsi_handshaker alts_tsi_handshaker; /** @@ -54,6 +59,8 @@ typedef struct alts_tsi_handshaker alts_tsi_handshaker; * - interested_parties: set of pollsets interested in this connection. * - self: address of ALTS TSI handshaker instance to be returned from the * method. + * - user_specified_max_frame_size: Determines the maximum frame size used by + * frame protector that is specified via user. If unspecified, the value is 0. * * It returns TSI_OK on success and an error status code on failure. Note that * if interested_parties is nullptr, a dedicated TSI thread will be created and @@ -62,7 +69,8 @@ typedef struct alts_tsi_handshaker alts_tsi_handshaker; tsi_result alts_tsi_handshaker_create( const grpc_alts_credentials_options* options, const char* target_name, const char* handshaker_service_url, bool is_client, - grpc_pollset_set* interested_parties, tsi_handshaker** self); + grpc_pollset_set* interested_parties, tsi_handshaker** self, + size_t user_specified_max_frame_size); /** * This method creates an ALTS TSI handshaker result instance. diff --git a/test/core/tsi/alts/handshaker/alts_handshaker_client_test.cc b/test/core/tsi/alts/handshaker/alts_handshaker_client_test.cc index 0e1ab006728..5f9a4b2d745 100644 --- a/test/core/tsi/alts/handshaker/alts_handshaker_client_test.cc +++ b/test/core/tsi/alts/handshaker/alts_handshaker_client_test.cc @@ -31,6 +31,7 @@ #define ALTS_HANDSHAKER_CLIENT_TEST_TARGET_NAME "bigtable.google.api.com" #define ALTS_HANDSHAKER_CLIENT_TEST_TARGET_SERVICE_ACCOUNT1 "A@google.com" #define ALTS_HANDSHAKER_CLIENT_TEST_TARGET_SERVICE_ACCOUNT2 "B@google.com" +#define ALTS_HANDSHAKER_CLIENT_TEST_MAX_FRAME_SIZE 64 * 1024 const size_t kHandshakerClientOpNum = 4; const size_t kMaxRpcVersionMajor = 3; @@ -155,8 +156,8 @@ static grpc_call_error check_must_not_be_called(grpc_call* /*call*/, /** * A mock grpc_caller used to check correct execution of client_start operation. * It checks if the client_start handshaker request is populated with correct - * handshake_security_protocol, application_protocol, and record_protocol, and - * op is correctly populated. + * handshake_security_protocol, application_protocol, record_protocol and + * max_frame_size, and op is correctly populated. */ static grpc_call_error check_client_start_success(grpc_call* /*call*/, const grpc_op* op, @@ -196,7 +197,8 @@ static grpc_call_error check_client_start_success(grpc_call* /*call*/, GPR_ASSERT(upb_strview_eql( grpc_gcp_StartClientHandshakeReq_target_name(client_start), upb_strview_makez(ALTS_HANDSHAKER_CLIENT_TEST_TARGET_NAME))); - + GPR_ASSERT(grpc_gcp_StartClientHandshakeReq_max_frame_size(client_start) == + ALTS_HANDSHAKER_CLIENT_TEST_MAX_FRAME_SIZE); GPR_ASSERT(validate_op(client, op, nops, true /* is_start */)); return GRPC_CALL_OK; } @@ -204,8 +206,8 @@ static grpc_call_error check_client_start_success(grpc_call* /*call*/, /** * A mock grpc_caller used to check correct execution of server_start operation. * It checks if the server_start handshaker request is populated with correct - * handshake_security_protocol, application_protocol, and record_protocol, and - * op is correctly populated. + * handshake_security_protocol, application_protocol, record_protocol and + * max_frame_size, and op is correctly populated. */ static grpc_call_error check_server_start_success(grpc_call* /*call*/, const grpc_op* op, @@ -245,6 +247,8 @@ static grpc_call_error check_server_start_success(grpc_call* /*call*/, upb_strview_makez(ALTS_RECORD_PROTOCOL))); validate_rpc_protocol_versions( grpc_gcp_StartServerHandshakeReq_rpc_versions(server_start)); + GPR_ASSERT(grpc_gcp_StartServerHandshakeReq_max_frame_size(server_start) == + ALTS_HANDSHAKER_CLIENT_TEST_MAX_FRAME_SIZE); GPR_ASSERT(validate_op(client, op, nops, true /* is_start */)); return GRPC_CALL_OK; } @@ -321,12 +325,14 @@ static alts_handshaker_client_test_config* create_config() { nullptr, config->channel, ALTS_HANDSHAKER_SERVICE_URL_FOR_TESTING, nullptr, server_options, grpc_slice_from_static_string(ALTS_HANDSHAKER_CLIENT_TEST_TARGET_NAME), - nullptr, nullptr, nullptr, nullptr, false); + nullptr, nullptr, nullptr, nullptr, false, + ALTS_HANDSHAKER_CLIENT_TEST_MAX_FRAME_SIZE); config->client = alts_grpc_handshaker_client_create( nullptr, config->channel, ALTS_HANDSHAKER_SERVICE_URL_FOR_TESTING, nullptr, client_options, grpc_slice_from_static_string(ALTS_HANDSHAKER_CLIENT_TEST_TARGET_NAME), - nullptr, nullptr, nullptr, nullptr, true); + nullptr, nullptr, nullptr, nullptr, true, + ALTS_HANDSHAKER_CLIENT_TEST_MAX_FRAME_SIZE); GPR_ASSERT(config->client != nullptr); GPR_ASSERT(config->server != nullptr); grpc_alts_credentials_options_destroy(client_options); diff --git a/test/core/tsi/alts/handshaker/alts_tsi_handshaker_test.cc b/test/core/tsi/alts/handshaker/alts_tsi_handshaker_test.cc index 5dd76d82fdc..2127e980488 100644 --- a/test/core/tsi/alts/handshaker/alts_tsi_handshaker_test.cc +++ b/test/core/tsi/alts/handshaker/alts_tsi_handshaker_test.cc @@ -27,6 +27,7 @@ #include "src/core/tsi/alts/handshaker/alts_shared_resource.h" #include "src/core/tsi/alts/handshaker/alts_tsi_handshaker.h" #include "src/core/tsi/alts/handshaker/alts_tsi_handshaker_private.h" +#include "src/core/tsi/transport_security_grpc.h" #include "src/proto/grpc/gcp/altscontext.upb.h" #include "test/core/tsi/alts/handshaker/alts_handshaker_service_api_test_lib.h" #include "test/core/util/test_config.h" @@ -49,6 +50,7 @@ #define ALTS_TSI_HANDSHAKER_TEST_APPLICATION_PROTOCOL \ "test application protocol" #define ALTS_TSI_HANDSHAKER_TEST_RECORD_PROTOCOL "test record protocol" +#define ALTS_TSI_HANDSHAKER_TEST_MAX_FRAME_SIZE 256 * 1024 using grpc_core::internal::alts_handshaker_client_check_fields_for_testing; using grpc_core::internal::alts_handshaker_client_get_handshaker_for_testing; @@ -164,6 +166,8 @@ static grpc_byte_buffer* generate_handshaker_response( upb_strview_makez(ALTS_TSI_HANDSHAKER_TEST_APPLICATION_PROTOCOL)); grpc_gcp_HandshakerResult_set_record_protocol( result, upb_strview_makez(ALTS_TSI_HANDSHAKER_TEST_RECORD_PROTOCOL)); + grpc_gcp_HandshakerResult_set_max_frame_size( + result, ALTS_TSI_HANDSHAKER_TEST_MAX_FRAME_SIZE); break; case SERVER_NEXT: grpc_gcp_HandshakerResp_set_bytes_consumed( @@ -283,6 +287,17 @@ static void on_client_next_success_cb(tsi_result status, void* user_data, GPR_ASSERT(memcmp(bytes_to_send, ALTS_TSI_HANDSHAKER_TEST_OUT_FRAME, bytes_to_send_size) == 0); GPR_ASSERT(result != nullptr); + // Validate max frame size value after Frame Size Negotiation. Here peer max + // frame size is greater than default value, and user specified max frame size + // is absent. + tsi_zero_copy_grpc_protector* zero_copy_protector = nullptr; + GPR_ASSERT(tsi_handshaker_result_create_zero_copy_grpc_protector( + result, nullptr, &zero_copy_protector) == TSI_OK); + size_t actual_max_frame_size; + tsi_zero_copy_grpc_protector_max_frame_size(zero_copy_protector, + &actual_max_frame_size); + GPR_ASSERT(actual_max_frame_size == kTsiAltsMaxFrameSize); + tsi_zero_copy_grpc_protector_destroy(zero_copy_protector); /* Validate peer identity. */ tsi_peer peer; GPR_ASSERT(tsi_handshaker_result_extract_peer(result, &peer) == TSI_OK); @@ -343,6 +358,20 @@ static void on_server_next_success_cb(tsi_result status, void* user_data, GPR_ASSERT(bytes_to_send_size == 0); GPR_ASSERT(bytes_to_send == nullptr); GPR_ASSERT(result != nullptr); + // Validate max frame size value after Frame Size Negotiation. The negotiated + // frame size value equals minimum send frame size, due to the absence of peer + // max frame size. + tsi_zero_copy_grpc_protector* zero_copy_protector = nullptr; + size_t user_specified_max_frame_size = + ALTS_TSI_HANDSHAKER_TEST_MAX_FRAME_SIZE; + GPR_ASSERT(tsi_handshaker_result_create_zero_copy_grpc_protector( + result, &user_specified_max_frame_size, + &zero_copy_protector) == TSI_OK); + size_t actual_max_frame_size; + tsi_zero_copy_grpc_protector_max_frame_size(zero_copy_protector, + &actual_max_frame_size); + GPR_ASSERT(actual_max_frame_size == kTsiAltsMinFrameSize); + tsi_zero_copy_grpc_protector_destroy(zero_copy_protector); /* Validate peer identity. */ tsi_peer peer; GPR_ASSERT(tsi_handshaker_result_extract_peer(result, &peer) == TSI_OK); @@ -478,7 +507,7 @@ static tsi_handshaker* create_test_handshaker(bool is_client) { grpc_alts_credentials_client_options_create(); alts_tsi_handshaker_create(options, "target_name", ALTS_HANDSHAKER_SERVICE_URL_FOR_TESTING, is_client, - nullptr, &handshaker); + nullptr, &handshaker, 0); alts_tsi_handshaker* alts_handshaker = reinterpret_cast(handshaker); alts_tsi_handshaker_set_client_vtable_for_testing(alts_handshaker, &vtable); diff --git a/test/cpp/microbenchmarks/BUILD b/test/cpp/microbenchmarks/BUILD index 7e8d371fc3c..e8065fa8964 100644 --- a/test/cpp/microbenchmarks/BUILD +++ b/test/cpp/microbenchmarks/BUILD @@ -206,7 +206,6 @@ grpc_cc_test( srcs = [ "bm_fullstack_streaming_pump.cc", ], - flaky = True, # TODO(b/150422385) tags = [ "no_mac", # to emulate "excluded_poll_engines: poll" "no_windows", diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json index 7bc5d2575a9..b09efd61926 100644 --- a/tools/run_tests/generated/tests.json +++ b/tools/run_tests/generated/tests.json @@ -3757,6 +3757,26 @@ ], "uses_polling": true }, + { + "args": [], + "benchmark": true, + "ci_platforms": [ + "linux", + "posix" + ], + "cpu_cost": 1.0, + "exclude_configs": [], + "exclude_iomgrs": [], + "flaky": false, + "gtest": false, + "language": "c++", + "name": "bm_fullstack_streaming_pump", + "platforms": [ + "linux", + "posix" + ], + "uses_polling": true + }, { "args": [], "benchmark": true, diff --git a/tools/run_tests/run_xds_tests.py b/tools/run_tests/run_xds_tests.py index 3dc66ab7424..762743e0993 100755 --- a/tools/run_tests/run_xds_tests.py +++ b/tools/run_tests/run_xds_tests.py @@ -182,6 +182,10 @@ argp.add_argument('--log_client_output', help='Log captured client output', default=False, action='store_true') +argp.add_argument('--only_stable_gcp_apis', + help='Do not use alpha compute APIs', + default=False, + action='store_true') args = argp.parse_args() if args.verbose: @@ -577,16 +581,27 @@ def add_instance_group(gcp, zone, name, size): def create_health_check(gcp, name): - config = { - 'name': name, - 'type': 'GRPC', - 'grpcHealthCheck': { - 'portSpecification': 'USE_SERVING_PORT' + if gcp.alpha_compute: + config = { + 'name': name, + 'type': 'GRPC', + 'grpcHealthCheck': { + 'portSpecification': 'USE_SERVING_PORT' + } } - } + compute_to_use = gcp.alpha_compute + else: + config = { + 'name': name, + 'type': 'TCP', + 'tcpHealthCheck': { + 'portName': 'grpc' + } + } + compute_to_use = gcp.compute logger.debug('Sending GCP request with body=%s', config) - result = gcp.alpha_compute.healthChecks().insert(project=gcp.project, - body=config).execute() + result = compute_to_use.healthChecks().insert(project=gcp.project, + body=config).execute() wait_for_global_operation(gcp, result['name']) gcp.health_check = GcpResource(config['name'], result['targetLink']) @@ -610,16 +625,22 @@ def create_health_check_firewall_rule(gcp, name): def add_backend_service(gcp, name): + if gcp.alpha_compute: + protocol = 'GRPC' + compute_to_use = gcp.alpha_compute + else: + protocol = 'HTTP2' + compute_to_use = gcp.compute config = { 'name': name, 'loadBalancingScheme': 'INTERNAL_SELF_MANAGED', 'healthChecks': [gcp.health_check.url], 'portName': 'grpc', - 'protocol': 'GRPC' + 'protocol': protocol } logger.debug('Sending GCP request with body=%s', config) - result = gcp.alpha_compute.backendServices().insert(project=gcp.project, - body=config).execute() + result = compute_to_use.backendServices().insert(project=gcp.project, + body=config).execute() wait_for_global_operation(gcp, result['name']) backend_service = GcpResource(config['name'], result['targetLink']) gcp.backend_services.append(backend_service) @@ -646,7 +667,7 @@ def create_url_map(gcp, name, backend_service, host_name): gcp.url_map = GcpResource(config['name'], result['targetLink']) -def patch_url_map_host_rule(gcp, name, backend_service, host_name): +def patch_url_map_host_rule_with_port(gcp, name, backend_service, host_name): config = { 'hostRules': [{ 'hosts': ['%s:%d' % (host_name, gcp.service_port)], @@ -660,20 +681,33 @@ def patch_url_map_host_rule(gcp, name, backend_service, host_name): wait_for_global_operation(gcp, result['name']) -def create_target_grpc_proxy(gcp, name): - config = { - 'name': name, - 'url_map': gcp.url_map.url, - 'validate_for_proxyless': True, - } - logger.debug('Sending GCP request with body=%s', config) - result = gcp.alpha_compute.targetGrpcProxies().insert( - project=gcp.project, body=config).execute() +def create_target_proxy(gcp, name): + if gcp.alpha_compute: + config = { + 'name': name, + 'url_map': gcp.url_map.url, + 'validate_for_proxyless': True, + } + logger.debug('Sending GCP request with body=%s', config) + result = gcp.alpha_compute.targetGrpcProxies().insert( + project=gcp.project, body=config).execute() + else: + config = { + 'name': name, + 'url_map': gcp.url_map.url, + } + logger.debug('Sending GCP request with body=%s', config) + result = gcp.compute.targetHttpProxies().insert(project=gcp.project, + body=config).execute() wait_for_global_operation(gcp, result['name']) - gcp.target_grpc_proxy = GcpResource(config['name'], result['targetLink']) + gcp.target_proxy = GcpResource(config['name'], result['targetLink']) def create_global_forwarding_rule(gcp, name, potential_ports): + if gcp.alpha_compute: + compute_to_use = gcp.alpha_compute + else: + compute_to_use = gcp.compute for port in potential_ports: try: config = { @@ -682,10 +716,10 @@ def create_global_forwarding_rule(gcp, name, potential_ports): 'portRange': str(port), 'IPAddress': '0.0.0.0', 'network': args.network, - 'target': gcp.target_grpc_proxy.url, + 'target': gcp.target_proxy.url, } logger.debug('Sending GCP request with body=%s', config) - result = gcp.alpha_compute.globalForwardingRules().insert( + result = compute_to_use.globalForwardingRules().insert( project=gcp.project, body=config).execute() wait_for_global_operation(gcp, result['name']) gcp.global_forwarding_rule = GcpResource(config['name'], @@ -708,11 +742,16 @@ def delete_global_forwarding_rule(gcp): logger.info('Delete failed: %s', http_error) -def delete_target_grpc_proxy(gcp): +def delete_target_proxy(gcp): try: - result = gcp.alpha_compute.targetGrpcProxies().delete( - project=gcp.project, - targetGrpcProxy=gcp.target_grpc_proxy.name).execute() + if gcp.alpha_compute: + result = gcp.alpha_compute.targetGrpcProxies().delete( + project=gcp.project, + targetGrpcProxy=gcp.target_proxy.name).execute() + else: + result = gcp.compute.targetHttpProxies().delete( + project=gcp.project, + targetHttpProxy=gcp.target_proxy.name).execute() wait_for_global_operation(gcp, result['name']) except googleapiclient.errors.HttpError as http_error: logger.info('Delete failed: %s', http_error) @@ -786,6 +825,10 @@ def patch_backend_instances(gcp, backend_service, instance_groups, balancing_mode='UTILIZATION'): + if gcp.alpha_compute: + compute_to_use = gcp.alpha_compute + else: + compute_to_use = gcp.compute config = { 'backends': [{ 'group': instance_group.url, @@ -794,10 +837,12 @@ def patch_backend_instances(gcp, } for instance_group in instance_groups], } logger.debug('Sending GCP request with body=%s', config) - result = gcp.alpha_compute.backendServices().patch( + result = compute_to_use.backendServices().patch( project=gcp.project, backendService=backend_service.name, body=config).execute() - wait_for_global_operation(gcp, result['name']) + wait_for_global_operation(gcp, + result['name'], + timeout_sec=_WAIT_FOR_BACKEND_SEC) def resize_instance_group(gcp, @@ -920,8 +965,8 @@ def get_instance_names(gcp, instance_group): def clean_up(gcp): if gcp.global_forwarding_rule: delete_global_forwarding_rule(gcp) - if gcp.target_grpc_proxy: - delete_target_grpc_proxy(gcp) + if gcp.target_proxy: + delete_target_proxy(gcp) if gcp.url_map: delete_url_map(gcp) delete_backend_services(gcp) @@ -959,23 +1004,26 @@ class GcpState(object): self.health_check_firewall_rule = None self.backend_services = [] self.url_map = None - self.target_grpc_proxy = None + self.target_proxy = None self.global_forwarding_rule = None self.service_port = None self.instance_template = None self.instance_groups = [] +alpha_compute = None if args.compute_discovery_document: with open(args.compute_discovery_document, 'r') as discovery_doc: compute = googleapiclient.discovery.build_from_document( discovery_doc.read()) - with open(args.alpha_compute_discovery_document, 'r') as discovery_doc: - alpha_compute = googleapiclient.discovery.build_from_document( - discovery_doc.read()) + if not args.only_stable_gcp_apis and args.alpha_compute_discovery_document: + with open(args.alpha_compute_discovery_document, 'r') as discovery_doc: + alpha_compute = googleapiclient.discovery.build_from_document( + discovery_doc.read()) else: compute = googleapiclient.discovery.build('compute', 'v1') - alpha_compute = googleapiclient.discovery.build('compute', 'alpha') + if not args.only_stable_gcp_apis: + alpha_compute = googleapiclient.discovery.build('compute', 'alpha') try: gcp = GcpState(compute, alpha_compute, args.project_id) @@ -985,7 +1033,7 @@ try: alternate_backend_service_name = _BASE_BACKEND_SERVICE_NAME + '-alternate' + args.gcp_suffix url_map_name = _BASE_URL_MAP_NAME + args.gcp_suffix service_host_name = _BASE_SERVICE_HOST + args.gcp_suffix - target_grpc_proxy_name = _BASE_TARGET_PROXY_NAME + args.gcp_suffix + target_proxy_name = _BASE_TARGET_PROXY_NAME + args.gcp_suffix forwarding_rule_name = _BASE_FORWARDING_RULE_NAME + args.gcp_suffix template_name = _BASE_TEMPLATE_NAME + args.gcp_suffix instance_group_name = _BASE_INSTANCE_GROUP_NAME + args.gcp_suffix @@ -999,7 +1047,7 @@ try: alternate_backend_service = add_backend_service( gcp, alternate_backend_service_name) create_url_map(gcp, url_map_name, backend_service, service_host_name) - create_target_grpc_proxy(gcp, target_grpc_proxy_name) + create_target_proxy(gcp, target_proxy_name) potential_service_ports = list(args.service_port_range) random.shuffle(potential_service_ports) create_global_forwarding_rule(gcp, forwarding_rule_name, @@ -1007,8 +1055,10 @@ try: if not gcp.service_port: raise Exception( 'Failed to find a valid ip:port for the forwarding rule') - patch_url_map_host_rule(gcp, url_map_name, backend_service, - service_host_name) + if gcp.service_port != _DEFAULT_SERVICE_PORT: + patch_url_map_host_rule_with_port(gcp, url_map_name, + backend_service, + service_host_name) startup_script = get_startup_script(args.path_to_server_binary, gcp.service_port) create_instance_template(gcp, template_name, args.network,