@ -72,29 +72,31 @@ struct call_data {
GRPC_CLOSURE_INIT ( & start_send_message_batch_in_call_combiner ,
start_send_message_batch , elem ,
grpc_schedule_on_exec_ctx ) ;
grpc_slice_buffer_init ( & slices ) ;
GRPC_CLOSURE_INIT ( & send_message_on_complete , : : send_message_on_complete ,
elem , grpc_schedule_on_exec_ctx ) ;
GRPC_CLOSURE_INIT ( & on_send_message_next_done , : : on_send_message_next_done ,
elem , grpc_schedule_on_exec_ctx ) ;
}
~ call_data ( ) {
if ( state_initialized ) {
grpc_slice_buffer_destroy_internal ( & slices ) ;
}
GRPC_ERROR_UNREF ( cancel_error ) ;
}
grpc_core : : CallCombiner * call_combiner ;
grpc_linked_mdelem message_compression_algorithm_storage ;
grpc_linked_mdelem stream_compression_algorithm_storage ;
grpc_linked_mdelem accept_encoding_storage ;
grpc_linked_mdelem accept_stream_encoding_storage ;
grpc_message_compression_algorithm message_compression_algorithm =
GRPC_MESSAGE_COMPRESS_NONE ;
bool seen_initial_metadata = false ;
grpc_error * cancel_error = GRPC_ERROR_NONE ;
grpc_closure start_send_message_batch_in_call_combiner ;
grpc_transport_stream_op_batch * send_message_batch = nullptr ;
bool seen_initial_metadata = false ;
/* Set to true, if the fields below are initialized. */
bool state_initialized = false ;
grpc_closure start_send_message_batch_in_call_combiner ;
/* The fields below are only initialized when we compress the payload.
* Keep them at the bottom of the struct , so they don ' t pollute the
* cache - lines . */
grpc_linked_mdelem message_compression_algorithm_storage ;
grpc_linked_mdelem stream_compression_algorithm_storage ;
grpc_linked_mdelem accept_encoding_storage ;
grpc_linked_mdelem accept_stream_encoding_storage ;
grpc_slice_buffer slices ; /**< Buffers up input slices to be compressed */
grpc_core : : ManualConstructor < grpc_core : : SliceBufferByteStream >
replacement_stream ;
@ -157,6 +159,18 @@ static grpc_compression_algorithm find_compression_algorithm(
return GRPC_COMPRESS_NONE ;
}
static void initialize_state ( grpc_call_element * elem , call_data * calld ) {
GPR_DEBUG_ASSERT ( ! calld - > state_initialized ) ;
calld - > state_initialized = true ;
grpc_slice_buffer_init ( & calld - > slices ) ;
GRPC_CLOSURE_INIT ( & calld - > send_message_on_complete ,
: : send_message_on_complete , elem ,
grpc_schedule_on_exec_ctx ) ;
GRPC_CLOSURE_INIT ( & calld - > on_send_message_next_done ,
: : on_send_message_next_done , elem ,
grpc_schedule_on_exec_ctx ) ;
}
static grpc_error * process_send_initial_metadata (
grpc_call_element * elem ,
grpc_metadata_batch * initial_metadata ) GRPC_MUST_USE_RESULT ;
@ -177,11 +191,13 @@ static grpc_error* process_send_initial_metadata(
// Hint compression algorithm.
grpc_error * error = GRPC_ERROR_NONE ;
if ( calld - > message_compression_algorithm ! = GRPC_MESSAGE_COMPRESS_NONE ) {
initialize_state ( elem , calld ) ;
error = grpc_metadata_batch_add_tail (
initial_metadata , & calld - > message_compression_algorithm_storage ,
grpc_message_compression_encoding_mdelem (
calld - > message_compression_algorithm ) ) ;
} else if ( stream_compression_algorithm ! = GRPC_STREAM_COMPRESS_NONE ) {
initialize_state ( elem , calld ) ;
error = grpc_metadata_batch_add_tail (
initial_metadata , & calld - > stream_compression_algorithm_storage ,
grpc_stream_compression_encoding_mdelem ( stream_compression_algorithm ) ) ;
@ -225,6 +241,8 @@ static void send_message_batch_continue(grpc_call_element* elem) {
static void finish_send_message ( grpc_call_element * elem ) {
call_data * calld = static_cast < call_data * > ( elem - > call_data ) ;
GPR_DEBUG_ASSERT ( calld - > message_compression_algorithm ! =
GRPC_MESSAGE_COMPRESS_NONE ) ;
// Compress the data if appropriate.
grpc_slice_buffer tmp ;
grpc_slice_buffer_init ( & tmp ) ;