From 4ce897eddc994509f54ece9a478831ea276a2871 Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Thu, 2 Apr 2020 18:54:18 -0700 Subject: [PATCH 1/4] C++ize message compress filter --- .../message_compress_filter.cc | 458 ++++++++++-------- 1 file changed, 244 insertions(+), 214 deletions(-) diff --git a/src/core/ext/filters/http/message_compress/message_compress_filter.cc b/src/core/ext/filters/http/message_compress/message_compress_filter.cc index 27f0333bee4..fcee54ebfe3 100644 --- a/src/core/ext/filters/http/message_compress/message_compress_filter.cc +++ b/src/core/ext/filters/http/message_compress/message_compress_filter.cc @@ -40,94 +40,153 @@ #include "src/core/lib/surface/call.h" #include "src/core/lib/transport/static_metadata.h" -static void start_send_message_batch(void* arg, grpc_error* unused); -static void send_message_on_complete(void* arg, grpc_error* error); -static void on_send_message_next_done(void* arg, grpc_error* error); - namespace { -struct channel_data { +class ChannelData { + public: + ChannelData(grpc_channel_element_args* args) { + // Get the enabled and the default algorithms from channel args. + enabled_compression_algorithms_bitset_ = + grpc_channel_args_compression_algorithm_get_states(args->channel_args); + default_compression_algorithm_ = + grpc_channel_args_get_channel_default_compression_algorithm( + args->channel_args); + // Make sure the default is enabled. + if (!GPR_BITGET(enabled_compression_algorithms_bitset_, + default_compression_algorithm_)) { + const char* name; + GPR_ASSERT(grpc_compression_algorithm_name(default_compression_algorithm_, + &name) == 1); + gpr_log(GPR_ERROR, + "default compression algorithm %s not enabled: switching to none", + name); + default_compression_algorithm_ = GRPC_COMPRESS_NONE; + } + enabled_message_compression_algorithms_bitset_ = + grpc_compression_bitset_to_message_bitset( + enabled_compression_algorithms_bitset_); + enabled_stream_compression_algorithms_bitset_ = + grpc_compression_bitset_to_stream_bitset( + enabled_compression_algorithms_bitset_); + GPR_ASSERT(!args->is_last); + } + + grpc_compression_algorithm default_compression_algorithm() const { + return default_compression_algorithm_; + } + + uint32_t enabled_compression_algorithms_bitset() const { + return enabled_compression_algorithms_bitset_; + } + + uint32_t enabled_message_compression_algorithms_bitset() const { + return enabled_message_compression_algorithms_bitset_; + } + + uint32_t enabled_stream_compression_algorithms_bitset() const { + return enabled_stream_compression_algorithms_bitset_; + } + + private: /** The default, channel-level, compression algorithm */ - grpc_compression_algorithm default_compression_algorithm; + grpc_compression_algorithm default_compression_algorithm_; /** Bitset of enabled compression algorithms */ - uint32_t enabled_compression_algorithms_bitset; + uint32_t enabled_compression_algorithms_bitset_; /** Bitset of enabled message compression algorithms */ - uint32_t enabled_message_compression_algorithms_bitset; + uint32_t enabled_message_compression_algorithms_bitset_; /** Bitset of enabled stream compression algorithms */ - uint32_t enabled_stream_compression_algorithms_bitset; + uint32_t enabled_stream_compression_algorithms_bitset_; }; -struct call_data { - call_data(grpc_call_element* elem, const grpc_call_element_args& args) - : call_combiner(args.call_combiner) { - channel_data* channeld = static_cast(elem->channel_data); +class CallData { + public: + CallData(grpc_call_element* elem, const grpc_call_element_args& args) + : elem_(elem), call_combiner_(args.call_combiner) { + ChannelData* channeld = static_cast(elem_->channel_data); // The call's message compression algorithm is set to channel's default // setting. It can be overridden later by initial metadata. - if (GPR_LIKELY(GPR_BITGET(channeld->enabled_compression_algorithms_bitset, - channeld->default_compression_algorithm))) { - message_compression_algorithm = + if (GPR_LIKELY(GPR_BITGET(channeld->enabled_compression_algorithms_bitset(), + channeld->default_compression_algorithm()))) { + message_compression_algorithm_ = grpc_compression_algorithm_to_message_compression_algorithm( - channeld->default_compression_algorithm); + channeld->default_compression_algorithm()); } - GRPC_CLOSURE_INIT(&start_send_message_batch_in_call_combiner, - start_send_message_batch, elem, - grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&start_send_message_batch_in_call_combiner_, + StartSendMessageBatch, this, grpc_schedule_on_exec_ctx); } - ~call_data() { - if (state_initialized) { - grpc_slice_buffer_destroy_internal(&slices); + ~CallData() { + if (state_initialized_) { + grpc_slice_buffer_destroy_internal(&slices_); } - GRPC_ERROR_UNREF(cancel_error); + GRPC_ERROR_UNREF(cancel_error_); } - grpc_core::CallCombiner* call_combiner; - grpc_message_compression_algorithm message_compression_algorithm = + static void CompressStartTransportStreamOpBatch( + grpc_call_element* elem, grpc_transport_stream_op_batch* batch); + + bool SkipMessageCompression(); + void InitializeState(); + + grpc_error* ProcessSendInitialMetadata(grpc_metadata_batch* initial_metadata); + + // Methods for processing a send_message batch + static void StartSendMessageBatch(void* arg, grpc_error* unused); + static void OnSendMessageNextDone(void* arg, grpc_error* error); + grpc_error* PullSliceFromSendMessage(); + void ContinueReadingSendMessage(); + void FinishSendMessage(); + void SendMessageBatchContinue(); + static void FailSendMessageBatchInCallCombiner(void* arg, grpc_error* error); + + static void SendMessageOnComplete(void* arg, grpc_error* error); + + private: + grpc_call_element* elem_ = nullptr; + grpc_core::CallCombiner* call_combiner_ = nullptr; + grpc_message_compression_algorithm message_compression_algorithm_ = GRPC_MESSAGE_COMPRESS_NONE; - grpc_error* cancel_error = GRPC_ERROR_NONE; - grpc_transport_stream_op_batch* send_message_batch = nullptr; - bool seen_initial_metadata = false; + grpc_error* cancel_error_ = GRPC_ERROR_NONE; + grpc_transport_stream_op_batch* send_message_batch_ = nullptr; + bool seen_initial_metadata_ = false; /* Set to true, if the fields below are initialized. */ - bool state_initialized = false; - grpc_closure start_send_message_batch_in_call_combiner; + bool state_initialized_ = false; + grpc_closure start_send_message_batch_in_call_combiner_; /* The fields below are only initialized when we compress the payload. * Keep them at the bottom of the struct, so they don't pollute the * cache-lines. */ - grpc_linked_mdelem message_compression_algorithm_storage; - grpc_linked_mdelem stream_compression_algorithm_storage; - grpc_linked_mdelem accept_encoding_storage; - grpc_linked_mdelem accept_stream_encoding_storage; - grpc_slice_buffer slices; /**< Buffers up input slices to be compressed */ + grpc_linked_mdelem message_compression_algorithm_storage_; + grpc_linked_mdelem stream_compression_algorithm_storage_; + grpc_linked_mdelem accept_encoding_storage_; + grpc_linked_mdelem accept_stream_encoding_storage_; + grpc_slice_buffer slices_; /**< Buffers up input slices to be compressed */ grpc_core::ManualConstructor - replacement_stream; - grpc_closure* original_send_message_on_complete; - grpc_closure send_message_on_complete; - grpc_closure on_send_message_next_done; + replacement_stream_; + grpc_closure* original_send_message_on_complete_ = nullptr; + grpc_closure send_message_on_complete_; + grpc_closure on_send_message_next_done_; }; -} // namespace - // Returns true if we should skip message compression for the current message. -static bool skip_message_compression(grpc_call_element* elem) { - call_data* calld = static_cast(elem->call_data); +bool CallData::SkipMessageCompression() { // If the flags of this message indicate that it shouldn't be compressed, we // skip message compression. uint32_t flags = - calld->send_message_batch->payload->send_message.send_message->flags(); + send_message_batch_->payload->send_message.send_message->flags(); if (flags & (GRPC_WRITE_NO_COMPRESS | GRPC_WRITE_INTERNAL_COMPRESS)) { return true; } // If this call doesn't have any message compression algorithm set, skip // message compression. - return calld->message_compression_algorithm == GRPC_MESSAGE_COMPRESS_NONE; + return message_compression_algorithm_ == GRPC_MESSAGE_COMPRESS_NONE; } // Determines the compression algorithm from the initial metadata and the // channel's default setting. -static grpc_compression_algorithm find_compression_algorithm( - grpc_metadata_batch* initial_metadata, channel_data* channeld) { +grpc_compression_algorithm FindCompressionAlgorithm( + grpc_metadata_batch* initial_metadata, ChannelData* channeld) { if (initial_metadata->idx.named.grpc_internal_encoding_request == nullptr) { - return channeld->default_compression_algorithm; + return channeld->default_compression_algorithm(); } grpc_compression_algorithm compression_algorithm; // Parse the compression algorithm from the initial metadata. @@ -143,7 +202,7 @@ static grpc_compression_algorithm find_compression_algorithm( // enabled. // TODO(juanlishen): Maybe use channel default or abort() if the algorithm // from the initial metadata is disabled. - if (GPR_LIKELY(GPR_BITGET(channeld->enabled_compression_algorithms_bitset, + if (GPR_LIKELY(GPR_BITGET(channeld->enabled_compression_algorithms_bitset(), compression_algorithm))) { return compression_algorithm; } @@ -158,30 +217,24 @@ static grpc_compression_algorithm find_compression_algorithm( return GRPC_COMPRESS_NONE; } -static void initialize_state(grpc_call_element* elem, call_data* calld) { - GPR_DEBUG_ASSERT(!calld->state_initialized); - calld->state_initialized = true; - grpc_slice_buffer_init(&calld->slices); - GRPC_CLOSURE_INIT(&calld->send_message_on_complete, - ::send_message_on_complete, elem, +void CallData::InitializeState() { + GPR_DEBUG_ASSERT(!state_initialized_); + state_initialized_ = true; + grpc_slice_buffer_init(&slices_); + GRPC_CLOSURE_INIT(&send_message_on_complete_, SendMessageOnComplete, this, grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&calld->on_send_message_next_done, - ::on_send_message_next_done, elem, + GRPC_CLOSURE_INIT(&on_send_message_next_done_, OnSendMessageNextDone, this, grpc_schedule_on_exec_ctx); } -static grpc_error* process_send_initial_metadata( - grpc_call_element* elem, - grpc_metadata_batch* initial_metadata) GRPC_MUST_USE_RESULT; -static grpc_error* process_send_initial_metadata( - grpc_call_element* elem, grpc_metadata_batch* initial_metadata) { - call_data* calld = static_cast(elem->call_data); - channel_data* channeld = static_cast(elem->channel_data); +grpc_error* CallData::ProcessSendInitialMetadata( + grpc_metadata_batch* initial_metadata) { + ChannelData* channeld = static_cast(elem_->channel_data); // Find the compression algorithm. grpc_compression_algorithm compression_algorithm = - find_compression_algorithm(initial_metadata, channeld); + FindCompressionAlgorithm(initial_metadata, channeld); // Note that at most one of the following algorithms can be set. - calld->message_compression_algorithm = + message_compression_algorithm_ = grpc_compression_algorithm_to_message_compression_algorithm( compression_algorithm); grpc_stream_compression_algorithm stream_compression_algorithm = @@ -189,257 +242,253 @@ static grpc_error* process_send_initial_metadata( compression_algorithm); // Hint compression algorithm. grpc_error* error = GRPC_ERROR_NONE; - if (calld->message_compression_algorithm != GRPC_MESSAGE_COMPRESS_NONE) { - initialize_state(elem, calld); + if (message_compression_algorithm_ != GRPC_MESSAGE_COMPRESS_NONE) { + InitializeState(); error = grpc_metadata_batch_add_tail( - initial_metadata, &calld->message_compression_algorithm_storage, + initial_metadata, &message_compression_algorithm_storage_, grpc_message_compression_encoding_mdelem( - calld->message_compression_algorithm), + message_compression_algorithm_), GRPC_BATCH_GRPC_ENCODING); } else if (stream_compression_algorithm != GRPC_STREAM_COMPRESS_NONE) { - initialize_state(elem, calld); + InitializeState(); error = grpc_metadata_batch_add_tail( - initial_metadata, &calld->stream_compression_algorithm_storage, + initial_metadata, &stream_compression_algorithm_storage_, grpc_stream_compression_encoding_mdelem(stream_compression_algorithm), GRPC_BATCH_CONTENT_ENCODING); } if (error != GRPC_ERROR_NONE) return error; // Convey supported compression algorithms. error = grpc_metadata_batch_add_tail( - initial_metadata, &calld->accept_encoding_storage, + initial_metadata, &accept_encoding_storage_, GRPC_MDELEM_ACCEPT_ENCODING_FOR_ALGORITHMS( - channeld->enabled_message_compression_algorithms_bitset), + channeld->enabled_message_compression_algorithms_bitset()), GRPC_BATCH_GRPC_ACCEPT_ENCODING); if (error != GRPC_ERROR_NONE) return error; // Do not overwrite accept-encoding header if it already presents (e.g., added // by some proxy). if (!initial_metadata->idx.named.accept_encoding) { error = grpc_metadata_batch_add_tail( - initial_metadata, &calld->accept_stream_encoding_storage, + initial_metadata, &accept_stream_encoding_storage_, GRPC_MDELEM_ACCEPT_STREAM_ENCODING_FOR_ALGORITHMS( - channeld->enabled_stream_compression_algorithms_bitset), + channeld->enabled_stream_compression_algorithms_bitset()), GRPC_BATCH_ACCEPT_ENCODING); } return error; } -static void send_message_on_complete(void* arg, grpc_error* error) { - grpc_call_element* elem = static_cast(arg); - call_data* calld = static_cast(elem->call_data); - grpc_slice_buffer_reset_and_unref_internal(&calld->slices); +void CallData::SendMessageOnComplete(void* arg, grpc_error* error) { + CallData* calld = static_cast(arg); + grpc_slice_buffer_reset_and_unref_internal(&calld->slices_); grpc_core::Closure::Run(DEBUG_LOCATION, - calld->original_send_message_on_complete, + calld->original_send_message_on_complete_, GRPC_ERROR_REF(error)); } -static void send_message_batch_continue(grpc_call_element* elem) { - call_data* calld = static_cast(elem->call_data); +void CallData::SendMessageBatchContinue() { // Note: The call to grpc_call_next_op() results in yielding the - // call combiner, so we need to clear calld->send_message_batch + // call combiner, so we need to clear calld->send_message_batch_ // before we do that. grpc_transport_stream_op_batch* send_message_batch = - calld->send_message_batch; - calld->send_message_batch = nullptr; - grpc_call_next_op(elem, send_message_batch); + this->send_message_batch_; + send_message_batch_ = nullptr; + grpc_call_next_op(elem_, send_message_batch); } -static void finish_send_message(grpc_call_element* elem) { - call_data* calld = static_cast(elem->call_data); - GPR_DEBUG_ASSERT(calld->message_compression_algorithm != +void CallData::FinishSendMessage() { + GPR_DEBUG_ASSERT(message_compression_algorithm_ != GRPC_MESSAGE_COMPRESS_NONE); // Compress the data if appropriate. grpc_slice_buffer tmp; grpc_slice_buffer_init(&tmp); uint32_t send_flags = - calld->send_message_batch->payload->send_message.send_message->flags(); - bool did_compress = grpc_msg_compress(calld->message_compression_algorithm, - &calld->slices, &tmp); + send_message_batch_->payload->send_message.send_message->flags(); + bool did_compress = + grpc_msg_compress(message_compression_algorithm_, &slices_, &tmp); if (did_compress) { if (GRPC_TRACE_FLAG_ENABLED(grpc_compression_trace)) { const char* algo_name; - const size_t before_size = calld->slices.length; + const size_t before_size = slices_.length; const size_t after_size = tmp.length; const float savings_ratio = 1.0f - static_cast(after_size) / static_cast(before_size); GPR_ASSERT(grpc_message_compression_algorithm_name( - calld->message_compression_algorithm, &algo_name)); + message_compression_algorithm_, &algo_name)); gpr_log(GPR_INFO, "Compressed[%s] %" PRIuPTR " bytes vs. %" PRIuPTR " bytes (%.2f%% savings)", algo_name, before_size, after_size, 100 * savings_ratio); } - grpc_slice_buffer_swap(&calld->slices, &tmp); + grpc_slice_buffer_swap(&slices_, &tmp); send_flags |= GRPC_WRITE_INTERNAL_COMPRESS; } else { if (GRPC_TRACE_FLAG_ENABLED(grpc_compression_trace)) { const char* algo_name; GPR_ASSERT(grpc_message_compression_algorithm_name( - calld->message_compression_algorithm, &algo_name)); + message_compression_algorithm_, &algo_name)); gpr_log(GPR_INFO, "Algorithm '%s' enabled but decided not to compress. Input size: " "%" PRIuPTR, - algo_name, calld->slices.length); + algo_name, slices_.length); } } grpc_slice_buffer_destroy_internal(&tmp); // Swap out the original byte stream with our new one and send the // batch down. - calld->replacement_stream.Init(&calld->slices, send_flags); - calld->send_message_batch->payload->send_message.send_message.reset( - calld->replacement_stream.get()); - calld->original_send_message_on_complete = - calld->send_message_batch->on_complete; - calld->send_message_batch->on_complete = &calld->send_message_on_complete; - send_message_batch_continue(elem); + replacement_stream_.Init(&slices_, send_flags); + send_message_batch_->payload->send_message.send_message.reset( + replacement_stream_.get()); + original_send_message_on_complete_ = send_message_batch_->on_complete; + send_message_batch_->on_complete = &send_message_on_complete_; + SendMessageBatchContinue(); } -static void fail_send_message_batch_in_call_combiner(void* arg, - grpc_error* error) { - call_data* calld = static_cast(arg); - if (calld->send_message_batch != nullptr) { +void CallData::FailSendMessageBatchInCallCombiner(void* arg, + grpc_error* error) { + CallData* calld = static_cast(arg); + if (calld->send_message_batch_ != nullptr) { grpc_transport_stream_op_batch_finish_with_failure( - calld->send_message_batch, GRPC_ERROR_REF(error), calld->call_combiner); - calld->send_message_batch = nullptr; + calld->send_message_batch_, GRPC_ERROR_REF(error), + calld->call_combiner_); + calld->send_message_batch_ = nullptr; } } -// Pulls a slice from the send_message byte stream and adds it to calld->slices. -static grpc_error* pull_slice_from_send_message(call_data* calld) { +// Pulls a slice from the send_message byte stream and adds it to +// calld->slices_. +grpc_error* CallData::PullSliceFromSendMessage() { grpc_slice incoming_slice; grpc_error* error = - calld->send_message_batch->payload->send_message.send_message->Pull( + send_message_batch_->payload->send_message.send_message->Pull( &incoming_slice); if (error == GRPC_ERROR_NONE) { - grpc_slice_buffer_add(&calld->slices, incoming_slice); + grpc_slice_buffer_add(&slices_, incoming_slice); } return error; } // Reads as many slices as possible from the send_message byte stream. -// If all data has been read, invokes finish_send_message(). Otherwise, +// If all data has been read, invokes FinishSendMessage(). Otherwise, // an async call to ByteStream::Next() has been started, which will // eventually result in calling on_send_message_next_done(). -static void continue_reading_send_message(grpc_call_element* elem) { - call_data* calld = static_cast(elem->call_data); - if (calld->slices.length == - calld->send_message_batch->payload->send_message.send_message->length()) { - finish_send_message(elem); +void CallData::ContinueReadingSendMessage() { + if (slices_.length == + send_message_batch_->payload->send_message.send_message->length()) { + FinishSendMessage(); return; } - while (calld->send_message_batch->payload->send_message.send_message->Next( - ~static_cast(0), &calld->on_send_message_next_done)) { - grpc_error* error = pull_slice_from_send_message(calld); + while (send_message_batch_->payload->send_message.send_message->Next( + ~static_cast(0), &on_send_message_next_done_)) { + grpc_error* error = PullSliceFromSendMessage(); if (error != GRPC_ERROR_NONE) { // Closure callback; does not take ownership of error. - fail_send_message_batch_in_call_combiner(calld, error); + FailSendMessageBatchInCallCombiner(this, error); GRPC_ERROR_UNREF(error); return; } - if (calld->slices.length == calld->send_message_batch->payload->send_message - .send_message->length()) { - finish_send_message(elem); + if (slices_.length == + send_message_batch_->payload->send_message.send_message->length()) { + FinishSendMessage(); break; } } } // Async callback for ByteStream::Next(). -static void on_send_message_next_done(void* arg, grpc_error* error) { - grpc_call_element* elem = static_cast(arg); - call_data* calld = static_cast(elem->call_data); +void CallData::OnSendMessageNextDone(void* arg, grpc_error* error) { + CallData* calld = static_cast(arg); if (error != GRPC_ERROR_NONE) { // Closure callback; does not take ownership of error. - fail_send_message_batch_in_call_combiner(calld, error); + FailSendMessageBatchInCallCombiner(calld, error); return; } - error = pull_slice_from_send_message(calld); + error = calld->PullSliceFromSendMessage(); if (error != GRPC_ERROR_NONE) { // Closure callback; does not take ownership of error. - fail_send_message_batch_in_call_combiner(calld, error); + FailSendMessageBatchInCallCombiner(calld, error); GRPC_ERROR_UNREF(error); return; } - if (calld->slices.length == - calld->send_message_batch->payload->send_message.send_message->length()) { - finish_send_message(elem); + if (calld->slices_.length == calld->send_message_batch_->payload->send_message + .send_message->length()) { + calld->FinishSendMessage(); } else { - continue_reading_send_message(elem); + calld->ContinueReadingSendMessage(); } } -static void start_send_message_batch(void* arg, grpc_error* /*unused*/) { - grpc_call_element* elem = static_cast(arg); - if (skip_message_compression(elem)) { - send_message_batch_continue(elem); +void CallData::StartSendMessageBatch(void* arg, grpc_error* /*unused*/) { + CallData* calld = static_cast(arg); + if (calld->SkipMessageCompression()) { + calld->SendMessageBatchContinue(); } else { - continue_reading_send_message(elem); + calld->ContinueReadingSendMessage(); } } -static void compress_start_transport_stream_op_batch( +void CallData::CompressStartTransportStreamOpBatch( grpc_call_element* elem, grpc_transport_stream_op_batch* batch) { GPR_TIMER_SCOPE("compress_start_transport_stream_op_batch", 0); - call_data* calld = static_cast(elem->call_data); + CallData* calld = static_cast(elem->call_data); // Handle cancel_stream. if (batch->cancel_stream) { - GRPC_ERROR_UNREF(calld->cancel_error); - calld->cancel_error = + GRPC_ERROR_UNREF(calld->cancel_error_); + calld->cancel_error_ = GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error); - if (calld->send_message_batch != nullptr) { - if (!calld->seen_initial_metadata) { + if (calld->send_message_batch_ != nullptr) { + if (!calld->seen_initial_metadata_) { GRPC_CALL_COMBINER_START( - calld->call_combiner, - GRPC_CLOSURE_CREATE(fail_send_message_batch_in_call_combiner, calld, + calld->call_combiner_, + GRPC_CLOSURE_CREATE(FailSendMessageBatchInCallCombiner, calld, grpc_schedule_on_exec_ctx), - GRPC_ERROR_REF(calld->cancel_error), "failing send_message op"); + GRPC_ERROR_REF(calld->cancel_error_), "failing send_message op"); } else { - calld->send_message_batch->payload->send_message.send_message->Shutdown( - GRPC_ERROR_REF(calld->cancel_error)); + calld->send_message_batch_->payload->send_message.send_message + ->Shutdown(GRPC_ERROR_REF(calld->cancel_error_)); } } - } else if (calld->cancel_error != GRPC_ERROR_NONE) { + } else if (calld->cancel_error_ != GRPC_ERROR_NONE) { grpc_transport_stream_op_batch_finish_with_failure( - batch, GRPC_ERROR_REF(calld->cancel_error), calld->call_combiner); + batch, GRPC_ERROR_REF(calld->cancel_error_), calld->call_combiner_); return; } // Handle send_initial_metadata. if (batch->send_initial_metadata) { - GPR_ASSERT(!calld->seen_initial_metadata); - grpc_error* error = process_send_initial_metadata( - elem, batch->payload->send_initial_metadata.send_initial_metadata); + GPR_ASSERT(!calld->seen_initial_metadata_); + grpc_error* error = calld->ProcessSendInitialMetadata( + batch->payload->send_initial_metadata.send_initial_metadata); if (error != GRPC_ERROR_NONE) { grpc_transport_stream_op_batch_finish_with_failure(batch, error, - calld->call_combiner); + calld->call_combiner_); return; } - calld->seen_initial_metadata = true; + calld->seen_initial_metadata_ = true; // If we had previously received a batch containing a send_message op, // handle it now. Note that we need to re-enter the call combiner // for this, since we can't send two batches down while holding the // call combiner, since the connected_channel filter (at the bottom of // the call stack) will release the call combiner for each batch it sees. - if (calld->send_message_batch != nullptr) { + if (calld->send_message_batch_ != nullptr) { GRPC_CALL_COMBINER_START( - calld->call_combiner, - &calld->start_send_message_batch_in_call_combiner, GRPC_ERROR_NONE, + calld->call_combiner_, + &calld->start_send_message_batch_in_call_combiner_, GRPC_ERROR_NONE, "starting send_message after send_initial_metadata"); } } // Handle send_message. if (batch->send_message) { - GPR_ASSERT(calld->send_message_batch == nullptr); - calld->send_message_batch = batch; + GPR_ASSERT(calld->send_message_batch_ == nullptr); + calld->send_message_batch_ = batch; // If we have not yet seen send_initial_metadata, then we have to // wait. We save the batch in calld and then drop the call // combiner, which we'll have to pick up again later when we get // send_initial_metadata. - if (!calld->seen_initial_metadata) { + if (!calld->seen_initial_metadata_) { GRPC_CALL_COMBINER_STOP( - calld->call_combiner, + calld->call_combiner_, "send_message batch pending send_initial_metadata"); return; } - start_send_message_batch(elem, GRPC_ERROR_NONE); + StartSendMessageBatch(calld, GRPC_ERROR_NONE); } else { // Pass control down the stack. grpc_call_next_op(elem, batch); @@ -447,63 +496,44 @@ static void compress_start_transport_stream_op_batch( } /* Constructor for call_data */ -static grpc_error* compress_init_call_elem(grpc_call_element* elem, - const grpc_call_element_args* args) { - new (elem->call_data) call_data(elem, *args); +static grpc_error* CompressInitCallElem(grpc_call_element* elem, + const grpc_call_element_args* args) { + new (elem->call_data) CallData(elem, *args); return GRPC_ERROR_NONE; } /* Destructor for call_data */ -static void compress_destroy_call_elem( - grpc_call_element* elem, const grpc_call_final_info* /*final_info*/, - grpc_closure* /*ignored*/) { - call_data* calld = static_cast(elem->call_data); - calld->~call_data(); +static void CompressDestroyCallElem(grpc_call_element* elem, + const grpc_call_final_info* /*final_info*/, + grpc_closure* /*ignored*/) { + CallData* calld = static_cast(elem->call_data); + calld->~CallData(); } -/* Constructor for channel_data */ -static grpc_error* compress_init_channel_elem(grpc_channel_element* elem, - grpc_channel_element_args* args) { - channel_data* channeld = static_cast(elem->channel_data); - // Get the enabled and the default algorithms from channel args. - channeld->enabled_compression_algorithms_bitset = - grpc_channel_args_compression_algorithm_get_states(args->channel_args); - channeld->default_compression_algorithm = - grpc_channel_args_get_channel_default_compression_algorithm( - args->channel_args); - // Make sure the default is enabled. - if (!GPR_BITGET(channeld->enabled_compression_algorithms_bitset, - channeld->default_compression_algorithm)) { - const char* name; - GPR_ASSERT(grpc_compression_algorithm_name( - channeld->default_compression_algorithm, &name) == 1); - gpr_log(GPR_ERROR, - "default compression algorithm %s not enabled: switching to none", - name); - channeld->default_compression_algorithm = GRPC_COMPRESS_NONE; - } - channeld->enabled_message_compression_algorithms_bitset = - grpc_compression_bitset_to_message_bitset( - channeld->enabled_compression_algorithms_bitset); - channeld->enabled_stream_compression_algorithms_bitset = - grpc_compression_bitset_to_stream_bitset( - channeld->enabled_compression_algorithms_bitset); - GPR_ASSERT(!args->is_last); +/* Constructor for ChannelData */ +static grpc_error* CompressInitChannelElem(grpc_channel_element* elem, + grpc_channel_element_args* args) { + new (elem->channel_data) ChannelData(args); return GRPC_ERROR_NONE; } /* Destructor for channel data */ -static void compress_destroy_channel_elem(grpc_channel_element* /*elem*/) {} +void CompressDestroyChannelElem(grpc_channel_element* elem) { + ChannelData* channeld = static_cast(elem->channel_data); + channeld->~ChannelData(); +} + +} // namespace const grpc_channel_filter grpc_message_compress_filter = { - compress_start_transport_stream_op_batch, + CallData::CompressStartTransportStreamOpBatch, grpc_channel_next_op, - sizeof(call_data), - compress_init_call_elem, + sizeof(CallData), + CompressInitCallElem, grpc_call_stack_ignore_set_pollset_or_pollset_set, - compress_destroy_call_elem, - sizeof(channel_data), - compress_init_channel_elem, - compress_destroy_channel_elem, + CompressDestroyCallElem, + sizeof(ChannelData), + CompressInitChannelElem, + CompressDestroyChannelElem, grpc_channel_next_get_info, "message_compress"}; From 44bc3b9bd6c030f9ed5983b682ae51e454bf1026 Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Thu, 9 Apr 2020 22:45:08 -0700 Subject: [PATCH 2/4] Reviewer comments --- .../message_compress_filter.cc | 186 +++++++++--------- 1 file changed, 94 insertions(+), 92 deletions(-) diff --git a/src/core/ext/filters/http/message_compress/message_compress_filter.cc b/src/core/ext/filters/http/message_compress/message_compress_filter.cc index fcee54ebfe3..b962c99e1d5 100644 --- a/src/core/ext/filters/http/message_compress/message_compress_filter.cc +++ b/src/core/ext/filters/http/message_compress/message_compress_filter.cc @@ -21,6 +21,8 @@ #include #include +#include "absl/types/optional.h" + #include #include #include @@ -44,7 +46,7 @@ namespace { class ChannelData { public: - ChannelData(grpc_channel_element_args* args) { + explicit ChannelData(grpc_channel_element_args* args) { // Get the enabled and the default algorithms from channel args. enabled_compression_algorithms_bitset_ = grpc_channel_args_compression_algorithm_get_states(args->channel_args); @@ -101,8 +103,8 @@ class ChannelData { class CallData { public: CallData(grpc_call_element* elem, const grpc_call_element_args& args) - : elem_(elem), call_combiner_(args.call_combiner) { - ChannelData* channeld = static_cast(elem_->channel_data); + : call_combiner_(args.call_combiner) { + ChannelData* channeld = static_cast(elem->channel_data); // The call's message compression algorithm is set to channel's default // setting. It can be overridden later by initial metadata. if (GPR_LIKELY(GPR_BITGET(channeld->enabled_compression_algorithms_bitset(), @@ -112,7 +114,7 @@ class CallData { channeld->default_compression_algorithm()); } GRPC_CLOSURE_INIT(&start_send_message_batch_in_call_combiner_, - StartSendMessageBatch, this, grpc_schedule_on_exec_ctx); + StartSendMessageBatch, elem, grpc_schedule_on_exec_ctx); } ~CallData() { @@ -122,28 +124,29 @@ class CallData { GRPC_ERROR_UNREF(cancel_error_); } - static void CompressStartTransportStreamOpBatch( + void CompressStartTransportStreamOpBatch( grpc_call_element* elem, grpc_transport_stream_op_batch* batch); + private: bool SkipMessageCompression(); - void InitializeState(); + void InitializeState(grpc_call_element* elem); - grpc_error* ProcessSendInitialMetadata(grpc_metadata_batch* initial_metadata); + grpc_error* ProcessSendInitialMetadata(grpc_call_element* elem, + grpc_metadata_batch* initial_metadata); // Methods for processing a send_message batch - static void StartSendMessageBatch(void* arg, grpc_error* unused); - static void OnSendMessageNextDone(void* arg, grpc_error* error); + static void StartSendMessageBatch(void* elem_arg, grpc_error* unused); + static void OnSendMessageNextDone(void* elem_arg, grpc_error* error); grpc_error* PullSliceFromSendMessage(); - void ContinueReadingSendMessage(); - void FinishSendMessage(); - void SendMessageBatchContinue(); - static void FailSendMessageBatchInCallCombiner(void* arg, grpc_error* error); + void ContinueReadingSendMessage(grpc_call_element* elem); + void FinishSendMessage(grpc_call_element* elem); + void SendMessageBatchContinue(grpc_call_element* elem); + static void FailSendMessageBatchInCallCombiner(void* calld_arg, + grpc_error* error); - static void SendMessageOnComplete(void* arg, grpc_error* error); + static void SendMessageOnComplete(void* calld_arg, grpc_error* error); - private: - grpc_call_element* elem_ = nullptr; - grpc_core::CallCombiner* call_combiner_ = nullptr; + grpc_core::CallCombiner* call_combiner_; grpc_message_compression_algorithm message_compression_algorithm_ = GRPC_MESSAGE_COMPRESS_NONE; grpc_error* cancel_error_ = GRPC_ERROR_NONE; @@ -160,8 +163,7 @@ class CallData { grpc_linked_mdelem accept_encoding_storage_; grpc_linked_mdelem accept_stream_encoding_storage_; grpc_slice_buffer slices_; /**< Buffers up input slices to be compressed */ - grpc_core::ManualConstructor - replacement_stream_; + absl::optional replacement_stream_; grpc_closure* original_send_message_on_complete_ = nullptr; grpc_closure send_message_on_complete_; grpc_closure on_send_message_next_done_; @@ -217,19 +219,19 @@ grpc_compression_algorithm FindCompressionAlgorithm( return GRPC_COMPRESS_NONE; } -void CallData::InitializeState() { +void CallData::InitializeState(grpc_call_element* elem) { GPR_DEBUG_ASSERT(!state_initialized_); state_initialized_ = true; grpc_slice_buffer_init(&slices_); GRPC_CLOSURE_INIT(&send_message_on_complete_, SendMessageOnComplete, this, grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&on_send_message_next_done_, OnSendMessageNextDone, this, + GRPC_CLOSURE_INIT(&on_send_message_next_done_, OnSendMessageNextDone, elem, grpc_schedule_on_exec_ctx); } grpc_error* CallData::ProcessSendInitialMetadata( - grpc_metadata_batch* initial_metadata) { - ChannelData* channeld = static_cast(elem_->channel_data); + grpc_call_element* elem, grpc_metadata_batch* initial_metadata) { + ChannelData* channeld = static_cast(elem->channel_data); // Find the compression algorithm. grpc_compression_algorithm compression_algorithm = FindCompressionAlgorithm(initial_metadata, channeld); @@ -243,14 +245,14 @@ grpc_error* CallData::ProcessSendInitialMetadata( // Hint compression algorithm. grpc_error* error = GRPC_ERROR_NONE; if (message_compression_algorithm_ != GRPC_MESSAGE_COMPRESS_NONE) { - InitializeState(); + InitializeState(elem); error = grpc_metadata_batch_add_tail( initial_metadata, &message_compression_algorithm_storage_, grpc_message_compression_encoding_mdelem( message_compression_algorithm_), GRPC_BATCH_GRPC_ENCODING); } else if (stream_compression_algorithm != GRPC_STREAM_COMPRESS_NONE) { - InitializeState(); + InitializeState(elem); error = grpc_metadata_batch_add_tail( initial_metadata, &stream_compression_algorithm_storage_, grpc_stream_compression_encoding_mdelem(stream_compression_algorithm), @@ -276,25 +278,23 @@ grpc_error* CallData::ProcessSendInitialMetadata( return error; } -void CallData::SendMessageOnComplete(void* arg, grpc_error* error) { - CallData* calld = static_cast(arg); +void CallData::SendMessageOnComplete(void* calld_arg, grpc_error* error) { + CallData* calld = static_cast(calld_arg); grpc_slice_buffer_reset_and_unref_internal(&calld->slices_); grpc_core::Closure::Run(DEBUG_LOCATION, calld->original_send_message_on_complete_, GRPC_ERROR_REF(error)); } -void CallData::SendMessageBatchContinue() { +void CallData::SendMessageBatchContinue(grpc_call_element* elem) { // Note: The call to grpc_call_next_op() results in yielding the - // call combiner, so we need to clear calld->send_message_batch_ - // before we do that. - grpc_transport_stream_op_batch* send_message_batch = - this->send_message_batch_; + // call combiner, so we need to clear send_message_batch_ before we do that. + grpc_transport_stream_op_batch* send_message_batch = send_message_batch_; send_message_batch_ = nullptr; - grpc_call_next_op(elem_, send_message_batch); + grpc_call_next_op(elem, send_message_batch); } -void CallData::FinishSendMessage() { +void CallData::FinishSendMessage(grpc_call_element* elem) { GPR_DEBUG_ASSERT(message_compression_algorithm_ != GRPC_MESSAGE_COMPRESS_NONE); // Compress the data if appropriate. @@ -334,17 +334,17 @@ void CallData::FinishSendMessage() { grpc_slice_buffer_destroy_internal(&tmp); // Swap out the original byte stream with our new one and send the // batch down. - replacement_stream_.Init(&slices_, send_flags); + replacement_stream_.emplace(&slices_, send_flags); send_message_batch_->payload->send_message.send_message.reset( - replacement_stream_.get()); + &replacement_stream_.value()); original_send_message_on_complete_ = send_message_batch_->on_complete; send_message_batch_->on_complete = &send_message_on_complete_; - SendMessageBatchContinue(); + SendMessageBatchContinue(elem); } -void CallData::FailSendMessageBatchInCallCombiner(void* arg, +void CallData::FailSendMessageBatchInCallCombiner(void* calld_arg, grpc_error* error) { - CallData* calld = static_cast(arg); + CallData* calld = static_cast(calld_arg); if (calld->send_message_batch_ != nullptr) { grpc_transport_stream_op_batch_finish_with_failure( calld->send_message_batch_, GRPC_ERROR_REF(error), @@ -353,8 +353,7 @@ void CallData::FailSendMessageBatchInCallCombiner(void* arg, } } -// Pulls a slice from the send_message byte stream and adds it to -// calld->slices_. +// Pulls a slice from the send_message byte stream and adds it to slices_. grpc_error* CallData::PullSliceFromSendMessage() { grpc_slice incoming_slice; grpc_error* error = @@ -369,11 +368,11 @@ grpc_error* CallData::PullSliceFromSendMessage() { // Reads as many slices as possible from the send_message byte stream. // If all data has been read, invokes FinishSendMessage(). Otherwise, // an async call to ByteStream::Next() has been started, which will -// eventually result in calling on_send_message_next_done(). -void CallData::ContinueReadingSendMessage() { +// eventually result in calling OnSendMessageNextDone(). +void CallData::ContinueReadingSendMessage(grpc_call_element* elem) { if (slices_.length == send_message_batch_->payload->send_message.send_message->length()) { - FinishSendMessage(); + FinishSendMessage(elem); return; } while (send_message_batch_->payload->send_message.send_message->Next( @@ -387,15 +386,16 @@ void CallData::ContinueReadingSendMessage() { } if (slices_.length == send_message_batch_->payload->send_message.send_message->length()) { - FinishSendMessage(); + FinishSendMessage(elem); break; } } } // Async callback for ByteStream::Next(). -void CallData::OnSendMessageNextDone(void* arg, grpc_error* error) { - CallData* calld = static_cast(arg); +void CallData::OnSendMessageNextDone(void* elem_arg, grpc_error* error) { + grpc_call_element* elem = static_cast(elem_arg); + CallData* calld = static_cast(elem->call_data); if (error != GRPC_ERROR_NONE) { // Closure callback; does not take ownership of error. FailSendMessageBatchInCallCombiner(calld, error); @@ -410,109 +410,111 @@ void CallData::OnSendMessageNextDone(void* arg, grpc_error* error) { } if (calld->slices_.length == calld->send_message_batch_->payload->send_message .send_message->length()) { - calld->FinishSendMessage(); + calld->FinishSendMessage(elem); } else { - calld->ContinueReadingSendMessage(); + calld->ContinueReadingSendMessage(elem); } } -void CallData::StartSendMessageBatch(void* arg, grpc_error* /*unused*/) { - CallData* calld = static_cast(arg); +void CallData::StartSendMessageBatch(void* elem_arg, grpc_error* /*unused*/) { + grpc_call_element* elem = static_cast(elem_arg); + CallData* calld = static_cast(elem->call_data); if (calld->SkipMessageCompression()) { - calld->SendMessageBatchContinue(); + calld->SendMessageBatchContinue(elem); } else { - calld->ContinueReadingSendMessage(); + calld->ContinueReadingSendMessage(elem); } } void CallData::CompressStartTransportStreamOpBatch( grpc_call_element* elem, grpc_transport_stream_op_batch* batch) { GPR_TIMER_SCOPE("compress_start_transport_stream_op_batch", 0); - CallData* calld = static_cast(elem->call_data); // Handle cancel_stream. if (batch->cancel_stream) { - GRPC_ERROR_UNREF(calld->cancel_error_); - calld->cancel_error_ = - GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error); - if (calld->send_message_batch_ != nullptr) { - if (!calld->seen_initial_metadata_) { + GRPC_ERROR_UNREF(cancel_error_); + cancel_error_ = GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error); + if (send_message_batch_ != nullptr) { + if (!seen_initial_metadata_) { GRPC_CALL_COMBINER_START( - calld->call_combiner_, - GRPC_CLOSURE_CREATE(FailSendMessageBatchInCallCombiner, calld, + call_combiner_, + GRPC_CLOSURE_CREATE(FailSendMessageBatchInCallCombiner, this, grpc_schedule_on_exec_ctx), - GRPC_ERROR_REF(calld->cancel_error_), "failing send_message op"); + GRPC_ERROR_REF(cancel_error_), "failing send_message op"); } else { - calld->send_message_batch_->payload->send_message.send_message - ->Shutdown(GRPC_ERROR_REF(calld->cancel_error_)); + send_message_batch_->payload->send_message.send_message->Shutdown( + GRPC_ERROR_REF(cancel_error_)); } } - } else if (calld->cancel_error_ != GRPC_ERROR_NONE) { + } else if (cancel_error_ != GRPC_ERROR_NONE) { grpc_transport_stream_op_batch_finish_with_failure( - batch, GRPC_ERROR_REF(calld->cancel_error_), calld->call_combiner_); + batch, GRPC_ERROR_REF(cancel_error_), call_combiner_); return; } // Handle send_initial_metadata. if (batch->send_initial_metadata) { - GPR_ASSERT(!calld->seen_initial_metadata_); - grpc_error* error = calld->ProcessSendInitialMetadata( - batch->payload->send_initial_metadata.send_initial_metadata); + GPR_ASSERT(!seen_initial_metadata_); + grpc_error* error = ProcessSendInitialMetadata( + elem, batch->payload->send_initial_metadata.send_initial_metadata); if (error != GRPC_ERROR_NONE) { grpc_transport_stream_op_batch_finish_with_failure(batch, error, - calld->call_combiner_); + call_combiner_); return; } - calld->seen_initial_metadata_ = true; + seen_initial_metadata_ = true; // If we had previously received a batch containing a send_message op, // handle it now. Note that we need to re-enter the call combiner // for this, since we can't send two batches down while holding the // call combiner, since the connected_channel filter (at the bottom of // the call stack) will release the call combiner for each batch it sees. - if (calld->send_message_batch_ != nullptr) { + if (send_message_batch_ != nullptr) { GRPC_CALL_COMBINER_START( - calld->call_combiner_, - &calld->start_send_message_batch_in_call_combiner_, GRPC_ERROR_NONE, - "starting send_message after send_initial_metadata"); + call_combiner_, &start_send_message_batch_in_call_combiner_, + GRPC_ERROR_NONE, "starting send_message after send_initial_metadata"); } } // Handle send_message. if (batch->send_message) { - GPR_ASSERT(calld->send_message_batch_ == nullptr); - calld->send_message_batch_ = batch; + GPR_ASSERT(send_message_batch_ == nullptr); + send_message_batch_ = batch; // If we have not yet seen send_initial_metadata, then we have to - // wait. We save the batch in calld and then drop the call - // combiner, which we'll have to pick up again later when we get - // send_initial_metadata. - if (!calld->seen_initial_metadata_) { + // wait. We save the batch and then drop the call combiner, which we'll + // have to pick up again later when we get send_initial_metadata. + if (!seen_initial_metadata_) { GRPC_CALL_COMBINER_STOP( - calld->call_combiner_, - "send_message batch pending send_initial_metadata"); + call_combiner_, "send_message batch pending send_initial_metadata"); return; } - StartSendMessageBatch(calld, GRPC_ERROR_NONE); + StartSendMessageBatch(elem, GRPC_ERROR_NONE); } else { // Pass control down the stack. grpc_call_next_op(elem, batch); } } +void CompressStartTransportStreamOpBatch( + grpc_call_element* elem, grpc_transport_stream_op_batch* batch) { + CallData* calld = static_cast(elem->call_data); + calld->CompressStartTransportStreamOpBatch(elem, batch); +} + /* Constructor for call_data */ -static grpc_error* CompressInitCallElem(grpc_call_element* elem, - const grpc_call_element_args* args) { +grpc_error* CompressInitCallElem(grpc_call_element* elem, + const grpc_call_element_args* args) { new (elem->call_data) CallData(elem, *args); return GRPC_ERROR_NONE; } /* Destructor for call_data */ -static void CompressDestroyCallElem(grpc_call_element* elem, - const grpc_call_final_info* /*final_info*/, - grpc_closure* /*ignored*/) { +void CompressDestroyCallElem(grpc_call_element* elem, + const grpc_call_final_info* /*final_info*/, + grpc_closure* /*ignored*/) { CallData* calld = static_cast(elem->call_data); calld->~CallData(); } /* Constructor for ChannelData */ -static grpc_error* CompressInitChannelElem(grpc_channel_element* elem, - grpc_channel_element_args* args) { +grpc_error* CompressInitChannelElem(grpc_channel_element* elem, + grpc_channel_element_args* args) { new (elem->channel_data) ChannelData(args); return GRPC_ERROR_NONE; } @@ -526,7 +528,7 @@ void CompressDestroyChannelElem(grpc_channel_element* elem) { } // namespace const grpc_channel_filter grpc_message_compress_filter = { - CallData::CompressStartTransportStreamOpBatch, + CompressStartTransportStreamOpBatch, grpc_channel_next_op, sizeof(CallData), CompressInitCallElem, From 7a654ed1a5569c5bb3ee443ad24425a265e19669 Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Fri, 10 Apr 2020 03:34:37 -0700 Subject: [PATCH 3/4] Remove const data member from ByteStream to make it copyable --- src/core/lib/transport/byte_stream.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/core/lib/transport/byte_stream.h b/src/core/lib/transport/byte_stream.h index 6988ab843b2..ffa9b47c20d 100644 --- a/src/core/lib/transport/byte_stream.h +++ b/src/core/lib/transport/byte_stream.h @@ -72,7 +72,7 @@ class ByteStream : public Orphanable { : length_(length), flags_(flags) {} private: - const uint32_t length_; + uint32_t length_; uint32_t flags_; }; From cdf7b2cc7a9404d993c43d44ca00b762b7cfa087 Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Fri, 10 Apr 2020 14:03:25 -0700 Subject: [PATCH 4/4] Use std::aligned_storage instead of absl::optional to maintain compatibility with Windows --- .../http/message_compress/message_compress_filter.cc | 11 ++++++++--- src/core/lib/transport/byte_stream.h | 2 +- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/src/core/ext/filters/http/message_compress/message_compress_filter.cc b/src/core/ext/filters/http/message_compress/message_compress_filter.cc index b962c99e1d5..09c914fab07 100644 --- a/src/core/ext/filters/http/message_compress/message_compress_filter.cc +++ b/src/core/ext/filters/http/message_compress/message_compress_filter.cc @@ -163,7 +163,10 @@ class CallData { grpc_linked_mdelem accept_encoding_storage_; grpc_linked_mdelem accept_stream_encoding_storage_; grpc_slice_buffer slices_; /**< Buffers up input slices to be compressed */ - absl::optional replacement_stream_; + // Allocate space for the replacement stream + std::aligned_storage::type + replacement_stream_; grpc_closure* original_send_message_on_complete_ = nullptr; grpc_closure send_message_on_complete_; grpc_closure on_send_message_next_done_; @@ -334,9 +337,11 @@ void CallData::FinishSendMessage(grpc_call_element* elem) { grpc_slice_buffer_destroy_internal(&tmp); // Swap out the original byte stream with our new one and send the // batch down. - replacement_stream_.emplace(&slices_, send_flags); + new (&replacement_stream_) + grpc_core::SliceBufferByteStream(&slices_, send_flags); send_message_batch_->payload->send_message.send_message.reset( - &replacement_stream_.value()); + reinterpret_cast( + &replacement_stream_)); original_send_message_on_complete_ = send_message_batch_->on_complete; send_message_batch_->on_complete = &send_message_on_complete_; SendMessageBatchContinue(elem); diff --git a/src/core/lib/transport/byte_stream.h b/src/core/lib/transport/byte_stream.h index ffa9b47c20d..6988ab843b2 100644 --- a/src/core/lib/transport/byte_stream.h +++ b/src/core/lib/transport/byte_stream.h @@ -72,7 +72,7 @@ class ByteStream : public Orphanable { : length_(length), flags_(flags) {} private: - uint32_t length_; + const uint32_t length_; uint32_t flags_; };