@ -36,41 +36,29 @@
static const size_t kMaxPayloadSizeForGet = 2048 ;
typedef struct call_data {
// State for handling send_initial_metadata ops.
grpc_linked_mdelem method ;
grpc_linked_mdelem scheme ;
grpc_linked_mdelem authority ;
grpc_linked_mdelem te_trailers ;
grpc_linked_mdelem content_type ;
grpc_linked_mdelem user_agent ;
// State for handling recv_initial_metadata ops.
grpc_metadata_batch * recv_initial_metadata ;
grpc_closure * original_recv_initial_metadata_ready ;
grpc_closure recv_initial_metadata_ready ;
// State for handling recv_trailing_metadata ops.
grpc_metadata_batch * recv_trailing_metadata ;
uint8_t * payload_bytes ;
/* Vars to read data off of send_message */
grpc_transport_stream_op_batch * send_op ;
uint32_t send_length ;
uint32_t send_flags ;
grpc_slice incoming_slice ;
grpc_slice_buffer_stream replacement_stream ;
grpc_slice_buffer slices ;
/* flag that indicates that all slices of send_messages aren't availble */
bool send_message_blocked ;
/** Closure to call when finished with the hc_on_recv hook */
grpc_closure * on_done_recv_initial_metadata ;
grpc_closure * on_done_recv_trailing_metadata ;
grpc_closure * on_complete ;
grpc_closure * post_send ;
/** Receive closures are chained: we inject this closure as the on_done_recv
up - call on transport_op , and remember to call our on_done_recv member
after handling it . */
grpc_closure hc_on_recv_initial_metadata ;
grpc_closure hc_on_recv_trailing_metadata ;
grpc_closure hc_on_complete ;
grpc_closure got_slice ;
grpc_closure send_done ;
grpc_closure * original_recv_trailing_metadata_on_complete ;
grpc_closure recv_trailing_metadata_on_complete ;
// State for handling send_message ops.
grpc_transport_stream_op_batch * send_message_batch ;
size_t send_message_bytes_read ;
grpc_byte_stream_cache send_message_cache ;
grpc_caching_byte_stream send_message_caching_stream ;
grpc_closure on_send_message_next_done ;
grpc_closure * original_send_message_on_complete ;
grpc_closure send_message_on_complete ;
} call_data ;
typedef struct channel_data {
@ -148,7 +136,7 @@ static grpc_error *client_filter_incoming_metadata(grpc_exec_ctx *exec_ctx,
return GRPC_ERROR_NONE ;
}
static void hc_on_ recv_initial_metadata( grpc_exec_ctx * exec_ctx ,
static void recv_initial_metadata_ready ( grpc_exec_ctx * exec_ctx ,
void * user_data , grpc_error * error ) {
grpc_call_element * elem = user_data ;
call_data * calld = elem - > call_data ;
@ -158,11 +146,13 @@ static void hc_on_recv_initial_metadata(grpc_exec_ctx *exec_ctx,
} else {
GRPC_ERROR_REF ( error ) ;
}
GRPC_CLOSURE_RUN ( exec_ctx , calld - > on_done_recv_initial_metadata , error ) ;
GRPC_CLOSURE_RUN ( exec_ctx , calld - > original_recv_initial_metadata_ready ,
error ) ;
}
static void hc_on_recv_trailing_metadata ( grpc_exec_ctx * exec_ctx ,
void * user_data , grpc_error * error ) {
static void recv_trailing_metadata_on_complete ( grpc_exec_ctx * exec_ctx ,
void * user_data ,
grpc_error * error ) {
grpc_call_element * elem = user_data ;
call_data * calld = elem - > call_data ;
if ( error = = GRPC_ERROR_NONE ) {
@ -171,25 +161,131 @@ static void hc_on_recv_trailing_metadata(grpc_exec_ctx *exec_ctx,
} else {
GRPC_ERROR_REF ( error ) ;
}
GRPC_CLOSURE_RUN ( exec_ctx , calld - > on_done_recv_trailing_metadata , error ) ;
GRPC_CLOSURE_RUN ( exec_ctx , calld - > original_recv_trailing_metadata_on_complete ,
error ) ;
}
static void hc_on_complete ( grpc_exec_ctx * exec_ctx , void * user_data ,
grpc_error * error ) {
grpc_call_element * elem = user_data ;
call_data * calld = elem - > call_data ;
if ( calld - > payload_bytes ) {
gpr_free ( calld - > payload_bytes ) ;
calld - > payload_bytes = NULL ;
static void send_message_on_complete ( grpc_exec_ctx * exec_ctx , void * arg ,
grpc_error * error ) {
grpc_call_element * elem = ( grpc_call_element * ) arg ;
call_data * calld = ( call_data * ) elem - > call_data ;
grpc_byte_stream_cache_destroy ( exec_ctx , & calld - > send_message_cache ) ;
GRPC_CLOSURE_RUN ( exec_ctx , calld - > original_send_message_on_complete ,
GRPC_ERROR_REF ( error ) ) ;
}
// Pulls a slice from the send_message byte stream, updating
// calld->send_message_bytes_read.
static grpc_error * pull_slice_from_send_message ( grpc_exec_ctx * exec_ctx ,
call_data * calld ) {
grpc_slice incoming_slice ;
grpc_error * error = grpc_byte_stream_pull (
exec_ctx , & calld - > send_message_caching_stream . base , & incoming_slice ) ;
if ( error = = GRPC_ERROR_NONE ) {
calld - > send_message_bytes_read + = GRPC_SLICE_LENGTH ( incoming_slice ) ;
grpc_slice_unref_internal ( exec_ctx , incoming_slice ) ;
}
calld - > on_complete - > cb ( exec_ctx , calld - > on_complete - > cb_arg , error ) ;
return error ;
}
static void send_done ( grpc_exec_ctx * exec_ctx , void * elemp , grpc_error * error ) {
grpc_call_element * elem = elemp ;
call_data * calld = elem - > call_data ;
grpc_slice_buffer_reset_and_unref_internal ( exec_ctx , & calld - > slices ) ;
calld - > post_send - > cb ( exec_ctx , calld - > post_send - > cb_arg , error ) ;
// Reads as many slices as possible from the send_message byte stream.
// Upon successful return, if calld->send_message_bytes_read ==
// calld->send_message_caching_stream.base.length, then we have completed
// reading from the byte stream; otherwise, an async read has been dispatched
// and on_send_message_next_done() will be invoked when it is complete.
static grpc_error * read_all_available_send_message_data ( grpc_exec_ctx * exec_ctx ,
call_data * calld ) {
while ( grpc_byte_stream_next ( exec_ctx ,
& calld - > send_message_caching_stream . base ,
~ ( size_t ) 0 , & calld - > on_send_message_next_done ) ) {
grpc_error * error = pull_slice_from_send_message ( exec_ctx , calld ) ;
if ( error ! = GRPC_ERROR_NONE ) return error ;
if ( calld - > send_message_bytes_read = =
calld - > send_message_caching_stream . base . length ) {
break ;
}
}
return GRPC_ERROR_NONE ;
}
// Async callback for grpc_byte_stream_next().
static void on_send_message_next_done ( grpc_exec_ctx * exec_ctx , void * arg ,
grpc_error * error ) {
grpc_call_element * elem = ( grpc_call_element * ) arg ;
call_data * calld = ( call_data * ) elem - > call_data ;
if ( error ! = GRPC_ERROR_NONE ) {
grpc_transport_stream_op_batch_finish_with_failure (
exec_ctx , calld - > send_message_batch , error ) ;
return ;
}
error = pull_slice_from_send_message ( exec_ctx , calld ) ;
if ( error ! = GRPC_ERROR_NONE ) {
grpc_transport_stream_op_batch_finish_with_failure (
exec_ctx , calld - > send_message_batch , error ) ;
return ;
}
// There may or may not be more to read, but we don't care. If we got
// here, then we know that all of the data was not available
// synchronously, so we were not able to do a cached call. Instead,
// we just reset the byte stream and then send down the batch as-is.
grpc_caching_byte_stream_reset ( & calld - > send_message_caching_stream ) ;
grpc_call_next_op ( exec_ctx , elem , calld - > send_message_batch ) ;
}
static char * slice_buffer_to_string ( grpc_slice_buffer * slice_buffer ) {
char * payload_bytes = gpr_malloc ( slice_buffer - > length + 1 ) ;
size_t offset = 0 ;
for ( size_t i = 0 ; i < slice_buffer - > count ; + + i ) {
memcpy ( payload_bytes + offset ,
GRPC_SLICE_START_PTR ( slice_buffer - > slices [ i ] ) ,
GRPC_SLICE_LENGTH ( slice_buffer - > slices [ i ] ) ) ;
offset + = GRPC_SLICE_LENGTH ( slice_buffer - > slices [ i ] ) ;
}
* ( payload_bytes + offset ) = ' \0 ' ;
return payload_bytes ;
}
// Modifies the path entry in the batch's send_initial_metadata to
// append the base64-encoded query for a GET request.
static grpc_error * update_path_for_get ( grpc_exec_ctx * exec_ctx ,
grpc_call_element * elem ,
grpc_transport_stream_op_batch * batch ) {
call_data * calld = ( call_data * ) elem - > call_data ;
grpc_slice path_slice =
GRPC_MDVALUE ( batch - > payload - > send_initial_metadata . send_initial_metadata
- > idx . named . path - > md ) ;
/* sum up individual component's lengths and allocate enough memory to
* hold combined path + query */
size_t estimated_len = GRPC_SLICE_LENGTH ( path_slice ) ;
estimated_len + + ; /* for the '?' */
estimated_len + = grpc_base64_estimate_encoded_size (
batch - > payload - > send_message . send_message - > length , true /* url_safe */ ,
false /* multi_line */ ) ;
grpc_slice path_with_query_slice = GRPC_SLICE_MALLOC ( estimated_len ) ;
/* memcopy individual pieces into this slice */
char * write_ptr = ( char * ) GRPC_SLICE_START_PTR ( path_with_query_slice ) ;
char * original_path = ( char * ) GRPC_SLICE_START_PTR ( path_slice ) ;
memcpy ( write_ptr , original_path , GRPC_SLICE_LENGTH ( path_slice ) ) ;
write_ptr + = GRPC_SLICE_LENGTH ( path_slice ) ;
* write_ptr + + = ' ? ' ;
char * payload_bytes =
slice_buffer_to_string ( & calld - > send_message_cache . cache_buffer ) ;
grpc_base64_encode_core ( ( char * ) write_ptr , payload_bytes ,
batch - > payload - > send_message . send_message - > length ,
true /* url_safe */ , false /* multi_line */ ) ;
gpr_free ( payload_bytes ) ;
/* remove trailing unused memory and add trailing 0 to terminate string */
char * t = ( char * ) GRPC_SLICE_START_PTR ( path_with_query_slice ) ;
/* safe to use strlen since base64_encode will always add '\0' */
path_with_query_slice =
grpc_slice_sub_no_ref ( path_with_query_slice , 0 , strlen ( t ) ) ;
/* substitute previous path with the new path+query */
grpc_mdelem mdelem_path_and_query =
grpc_mdelem_from_slices ( exec_ctx , GRPC_MDSTR_PATH , path_with_query_slice ) ;
grpc_metadata_batch * b =
batch - > payload - > send_initial_metadata . send_initial_metadata ;
return grpc_metadata_batch_substitute ( exec_ctx , b , b - > idx . named . path ,
mdelem_path_and_query ) ;
}
static void remove_if_present ( grpc_exec_ctx * exec_ctx ,
@ -200,273 +296,153 @@ static void remove_if_present(grpc_exec_ctx *exec_ctx,
}
}
static void continue_send_message ( grpc_exec_ctx * exec_ctx ,
grpc_call_element * elem ) {
static void hc_start_transport_stream_op_batch (
grpc_exec_ctx * exec_ctx , grpc_call_element * elem ,
grpc_transport_stream_op_batch * batch ) {
call_data * calld = elem - > call_data ;
uint8_t * wrptr = calld - > payload_bytes ;
while ( grpc_byte_stream_next (
exec_ctx , calld - > send_op - > payload - > send_message . send_message , ~ ( size_t ) 0 ,
& calld - > got_slice ) ) {
grpc_byte_stream_pull ( exec_ctx ,
calld - > send_op - > payload - > send_message . send_message ,
& calld - > incoming_slice ) ;
if ( GRPC_SLICE_LENGTH ( calld - > incoming_slice ) > 0 ) {
memcpy ( wrptr , GRPC_SLICE_START_PTR ( calld - > incoming_slice ) ,
GRPC_SLICE_LENGTH ( calld - > incoming_slice ) ) ;
}
wrptr + = GRPC_SLICE_LENGTH ( calld - > incoming_slice ) ;
grpc_slice_buffer_add ( & calld - > slices , calld - > incoming_slice ) ;
if ( calld - > send_length = = calld - > slices . length ) {
calld - > send_message_blocked = false ;
break ;
}
}
}
channel_data * channeld = elem - > channel_data ;
GPR_TIMER_BEGIN ( " hc_start_transport_stream_op_batch " , 0 ) ;
GRPC_CALL_LOG_OP ( GPR_INFO , elem , batch ) ;
static void got_slice ( grpc_exec_ctx * exec_ctx , void * elemp , grpc_error * error ) {
grpc_call_element * elem = elemp ;
call_data * calld = elem - > call_data ;
calld - > send_message_blocked = false ;
if ( GRPC_ERROR_NONE ! =
grpc_byte_stream_pull ( exec_ctx ,
calld - > send_op - > payload - > send_message . send_message ,
& calld - > incoming_slice ) ) {
/* Should never reach here */
abort ( ) ;
}
grpc_slice_buffer_add ( & calld - > slices , calld - > incoming_slice ) ;
if ( calld - > send_length = = calld - > slices . length ) {
/* Pass down the original send_message op that was blocked.*/
grpc_slice_buffer_stream_init ( & calld - > replacement_stream , & calld - > slices ,
calld - > send_flags ) ;
calld - > send_op - > payload - > send_message . send_message =
& calld - > replacement_stream . base ;
calld - > post_send = calld - > send_op - > on_complete ;
calld - > send_op - > on_complete = & calld - > send_done ;
grpc_call_next_op ( exec_ctx , elem , calld - > send_op ) ;
} else {
continue_send_message ( exec_ctx , elem ) ;
if ( batch - > recv_initial_metadata ) {
/* substitute our callback for the higher callback */
calld - > recv_initial_metadata =
batch - > payload - > recv_initial_metadata . recv_initial_metadata ;
calld - > original_recv_initial_metadata_ready =
batch - > payload - > recv_initial_metadata . recv_initial_metadata_ready ;
batch - > payload - > recv_initial_metadata . recv_initial_metadata_ready =
& calld - > recv_initial_metadata_ready ;
}
}
static grpc_error * hc_mutate_op ( grpc_exec_ctx * exec_ctx ,
grpc_call_element * elem ,
grpc_transport_stream_op_batch * op ) {
/* grab pointers to our data from the call element */
call_data * calld = elem - > call_data ;
channel_data * channeld = elem - > channel_data ;
grpc_error * error ;
if ( batch - > recv_trailing_metadata ) {
/* substitute our callback for the higher callback */
calld - > recv_trailing_metadata =
batch - > payload - > recv_trailing_metadata . recv_trailing_metadata ;
calld - > original_recv_trailing_metadata_on_complete = batch - > on_complete ;
batch - > on_complete = & calld - > recv_trailing_metadata_on_complete ;
}
if ( op - > send_initial_metadata ) {
/* Decide which HTTP VERB to use. We use GET if the request is marked
cacheable , and the operation contains both initial metadata and send
message , and the payload is below the size threshold , and all the data
for this request is immediately available . */
grpc_error * error = GRPC_ERROR_NONE ;
bool batch_will_be_handled_asynchronously = false ;
if ( batch - > send_initial_metadata ) {
// Decide which HTTP VERB to use. We use GET if the request is marked
// cacheable, and the operation contains both initial metadata and send
// message, and the payload is below the size threshold, and all the data
// for this request is immediately available.
grpc_mdelem method = GRPC_MDELEM_METHOD_POST ;
if ( op - > send_message & &
( op - > payload - > send_initial_metadata . send_initial_metadata_flags &
if ( batch - > send_message & &
( batch - > payload - > send_initial_metadata . send_initial_metadata_flags &
GRPC_INITIAL_METADATA_CACHEABLE_REQUEST ) & &
op - > payload - > send_message . send_message - > length <
batch - > payload - > send_message . send_message - > length <
channeld - > max_payload_size_for_get ) {
method = GRPC_MDELEM_METHOD_GET ;
/* The following write to calld->send_message_blocked isn't racy with
reads in hc_start_transport_op ( which deals with SEND_MESSAGE ops ) because
being here means ops - > send_message is not NULL , which is primarily
guarding the read there . */
calld - > send_message_blocked = true ;
} else if ( op - > payload - > send_initial_metadata . send_initial_metadata_flags &
GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST ) {
method = GRPC_MDELEM_METHOD_PUT ;
}
/* Attempt to read the data from send_message and create a header field. */
if ( grpc_mdelem_eq ( method , GRPC_MDELEM_METHOD_GET ) ) {
/* allocate memory to hold the entire payload */
calld - > payload_bytes =
gpr_malloc ( op - > payload - > send_message . send_message - > length ) ;
/* read slices of send_message and copy into payload_bytes */
calld - > send_op = op ;
calld - > send_length = op - > payload - > send_message . send_message - > length ;
calld - > send_flags = op - > payload - > send_message . send_message - > flags ;
continue_send_message ( exec_ctx , elem ) ;
if ( calld - > send_message_blocked = = false ) {
/* when all the send_message data is available, then modify the path
* MDELEM by appending base64 encoded query to the path */
const int k_url_safe = 1 ;
const int k_multi_line = 0 ;
const unsigned char k_query_separator = ' ? ' ;
grpc_slice path_slice =
GRPC_MDVALUE ( op - > payload - > send_initial_metadata
. send_initial_metadata - > idx . named . path - > md ) ;
/* sum up individual component's lengths and allocate enough memory to
* hold combined path + query */
size_t estimated_len = GRPC_SLICE_LENGTH ( path_slice ) ;
estimated_len + + ; /* for the '?' */
estimated_len + = grpc_base64_estimate_encoded_size (
op - > payload - > send_message . send_message - > length , k_url_safe ,
k_multi_line ) ;
grpc_slice path_with_query_slice = GRPC_SLICE_MALLOC ( estimated_len ) ;
/* memcopy individual pieces into this slice */
uint8_t * write_ptr =
( uint8_t * ) GRPC_SLICE_START_PTR ( path_with_query_slice ) ;
uint8_t * original_path = ( uint8_t * ) GRPC_SLICE_START_PTR ( path_slice ) ;
memcpy ( write_ptr , original_path , GRPC_SLICE_LENGTH ( path_slice ) ) ;
write_ptr + = GRPC_SLICE_LENGTH ( path_slice ) ;
* write_ptr = k_query_separator ;
write_ptr + + ; /* for the '?' */
grpc_base64_encode_core ( ( char * ) write_ptr , calld - > payload_bytes ,
op - > payload - > send_message . send_message - > length ,
k_url_safe , k_multi_line ) ;
/* remove trailing unused memory and add trailing 0 to terminate string
*/
char * t = ( char * ) GRPC_SLICE_START_PTR ( path_with_query_slice ) ;
/* safe to use strlen since base64_encode will always add '\0' */
path_with_query_slice =
grpc_slice_sub_no_ref ( path_with_query_slice , 0 , strlen ( t ) ) ;
/* substitute previous path with the new path+query */
grpc_mdelem mdelem_path_and_query = grpc_mdelem_from_slices (
exec_ctx , GRPC_MDSTR_PATH , path_with_query_slice ) ;
grpc_metadata_batch * b =
op - > payload - > send_initial_metadata . send_initial_metadata ;
error = grpc_metadata_batch_substitute ( exec_ctx , b , b - > idx . named . path ,
mdelem_path_and_query ) ;
if ( error ! = GRPC_ERROR_NONE ) return error ;
calld - > on_complete = op - > on_complete ;
op - > on_complete = & calld - > hc_on_complete ;
op - > send_message = false ;
calld - > send_message_bytes_read = 0 ;
grpc_byte_stream_cache_init ( & calld - > send_message_cache ,
batch - > payload - > send_message . send_message ) ;
grpc_caching_byte_stream_init ( & calld - > send_message_caching_stream ,
& calld - > send_message_cache ) ;
batch - > payload - > send_message . send_message =
& calld - > send_message_caching_stream . base ;
calld - > original_send_message_on_complete = batch - > on_complete ;
batch - > on_complete = & calld - > send_message_on_complete ;
calld - > send_message_batch = batch ;
error = read_all_available_send_message_data ( exec_ctx , calld ) ;
if ( error ! = GRPC_ERROR_NONE ) goto done ;
// If all the data has been read, then we can use GET.
if ( calld - > send_message_bytes_read = =
calld - > send_message_caching_stream . base . length ) {
method = GRPC_MDELEM_METHOD_GET ;
error = update_path_for_get ( exec_ctx , elem , batch ) ;
if ( error ! = GRPC_ERROR_NONE ) goto done ;
batch - > send_message = false ;
grpc_byte_stream_destroy ( exec_ctx ,
& calld - > send_message_caching_stream . base ) ;
} else {
/* Not all data is available. Fall back to POST. */
// Not all data is available. The batch will be sent down
// asynchronously in on_send_message_next_done().
batch_will_be_handled_asynchronously = true ;
// Fall back to POST.
gpr_log ( GPR_DEBUG ,
" Request is marked Cacheable but not all data is available. \
Falling back to POST " );
method = GRPC_MDELEM_METHOD_POST ;
" Request is marked Cacheable but not all data is available. "
" Falling back to POST " ) ;
}
} else if ( batch - > payload - > send_initial_metadata
. send_initial_metadata_flags &
GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST ) {
method = GRPC_MDELEM_METHOD_PUT ;
}
remove_if_present ( exec_ctx ,
op - > payload - > send_initial_metadata . send_initial_metadata ,
GRPC_BATCH_METHOD ) ;
remove_if_present ( exec_ctx ,
op - > payload - > send_initial_metadata . send_initial_metadata ,
GRPC_BATCH_SCHEME ) ;
remove_if_present ( exec_ctx ,
op - > payload - > send_initial_metadata . send_initial_metadata ,
GRPC_BATCH_TE ) ;
remove_if_present ( exec_ctx ,
op - > payload - > send_initial_metadata . send_initial_metadata ,
GRPC_BATCH_CONTENT_TYPE ) ;
remove_if_present ( exec_ctx ,
op - > payload - > send_initial_metadata . send_initial_metadata ,
GRPC_BATCH_USER_AGENT ) ;
remove_if_present (
exec_ctx , batch - > payload - > send_initial_metadata . send_initial_metadata ,
GRPC_BATCH_METHOD ) ;
remove_if_present (
exec_ctx , batch - > payload - > send_initial_metadata . send_initial_metadata ,
GRPC_BATCH_SCHEME ) ;
remove_if_present (
exec_ctx , batch - > payload - > send_initial_metadata . send_initial_metadata ,
GRPC_BATCH_TE ) ;
remove_if_present (
exec_ctx , batch - > payload - > send_initial_metadata . send_initial_metadata ,
GRPC_BATCH_CONTENT_TYPE ) ;
remove_if_present (
exec_ctx , batch - > payload - > send_initial_metadata . send_initial_metadata ,
GRPC_BATCH_USER_AGENT ) ;
/* Send : prefixed headers, which have to be before any application
layer headers . */
error = grpc_metadata_batch_add_head (
exec_ctx , op - > payload - > send_initial_metadata . send_initial_metadata ,
exec_ctx , batch - > payload - > send_initial_metadata . send_initial_metadata ,
& calld - > method , method ) ;
if ( error ! = GRPC_ERROR_NONE ) return error ;
if ( error ! = GRPC_ERROR_NONE ) goto done ;
error = grpc_metadata_batch_add_head (
exec_ctx , op - > payload - > send_initial_metadata . send_initial_metadata ,
exec_ctx , batch - > payload - > send_initial_metadata . send_initial_metadata ,
& calld - > scheme , channeld - > static_scheme ) ;
if ( error ! = GRPC_ERROR_NONE ) return error ;
if ( error ! = GRPC_ERROR_NONE ) goto done ;
error = grpc_metadata_batch_add_tail (
exec_ctx , op - > payload - > send_initial_metadata . send_initial_metadata ,
exec_ctx , batch - > payload - > send_initial_metadata . send_initial_metadata ,
& calld - > te_trailers , GRPC_MDELEM_TE_TRAILERS ) ;
if ( error ! = GRPC_ERROR_NONE ) return error ;
if ( error ! = GRPC_ERROR_NONE ) goto done ;
error = grpc_metadata_batch_add_tail (
exec_ctx , op - > payload - > send_initial_metadata . send_initial_metadata ,
exec_ctx , batch - > payload - > send_initial_metadata . send_initial_metadata ,
& calld - > content_type , GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC ) ;
if ( error ! = GRPC_ERROR_NONE ) return error ;
if ( error ! = GRPC_ERROR_NONE ) goto done ;
error = grpc_metadata_batch_add_tail (
exec_ctx , op - > payload - > send_initial_metadata . send_initial_metadata ,
exec_ctx , batch - > payload - > send_initial_metadata . send_initial_metadata ,
& calld - > user_agent , GRPC_MDELEM_REF ( channeld - > user_agent ) ) ;
if ( error ! = GRPC_ERROR_NONE ) return error ;
if ( error ! = GRPC_ERROR_NONE ) goto done ;
}
if ( op - > recv_initial_metadata ) {
/* substitute our callback for the higher callback */
calld - > recv_initial_metadata =
op - > payload - > recv_initial_metadata . recv_initial_metadata ;
calld - > on_done_recv_initial_metadata =
op - > payload - > recv_initial_metadata . recv_initial_metadata_ready ;
op - > payload - > recv_initial_metadata . recv_initial_metadata_ready =
& calld - > hc_on_recv_initial_metadata ;
}
if ( op - > recv_trailing_metadata ) {
/* substitute our callback for the higher callback */
calld - > recv_trailing_metadata =
op - > payload - > recv_trailing_metadata . recv_trailing_metadata ;
calld - > on_done_recv_trailing_metadata = op - > on_complete ;
op - > on_complete = & calld - > hc_on_recv_trailing_metadata ;
}
return GRPC_ERROR_NONE ;
}
static void hc_start_transport_op ( grpc_exec_ctx * exec_ctx ,
grpc_call_element * elem ,
grpc_transport_stream_op_batch * op ) {
GPR_TIMER_BEGIN ( " hc_start_transport_op " , 0 ) ;
GRPC_CALL_LOG_OP ( GPR_INFO , elem , op ) ;
grpc_error * error = hc_mutate_op ( exec_ctx , elem , op ) ;
done :
if ( error ! = GRPC_ERROR_NONE ) {
grpc_transport_stream_op_batch_finish_with_failure ( exec_ctx , op , error ) ;
} else {
call_data * calld = elem - > call_data ;
if ( op - > send_message & & calld - > send_message_blocked ) {
/* Don't forward the op. send_message contains slices that aren't ready
yet . The call will be forwarded by the op_complete of slice read call .
*/
} else {
grpc_call_next_op ( exec_ctx , elem , op ) ;
}
grpc_transport_stream_op_batch_finish_with_failure (
exec_ctx , calld - > send_message_batch , error ) ;
} else if ( ! batch_will_be_handled_asynchronously ) {
grpc_call_next_op ( exec_ctx , elem , batch ) ;
}
GPR_TIMER_END ( " hc_start_transport_op " , 0 ) ;
GPR_TIMER_END ( " hc_start_transport_stream_op_batch " , 0 ) ;
}
/* Constructor for call_data */
static grpc_error * init_call_elem ( grpc_exec_ctx * exec_ctx ,
grpc_call_element * elem ,
const grpc_call_element_args * args ) {
call_data * calld = elem - > call_data ;
calld - > on_done_recv_initial_metadata = NULL ;
calld - > on_done_recv_trailing_metadata = NULL ;
calld - > on_complete = NULL ;
calld - > payload_bytes = NULL ;
calld - > send_message_blocked = false ;
grpc_slice_buffer_init ( & calld - > slices ) ;
GRPC_CLOSURE_INIT ( & calld - > hc_on_recv_initial_metadata ,
hc_on_recv_initial_metadata , elem ,
grpc_schedule_on_exec_ctx ) ;
GRPC_CLOSURE_INIT ( & calld - > hc_on_recv_trailing_metadata ,
hc_on_recv_trailing_metadata , elem ,
call_data * calld = ( call_data * ) elem - > call_data ;
GRPC_CLOSURE_INIT ( & calld - > recv_initial_metadata_ready ,
recv_initial_metadata_ready , elem ,
grpc_schedule_on_exec_ctx ) ;
GRPC_CLOSURE_INIT ( & calld - > hc_on_complete , hc_on_complete , elem ,
grpc_schedule_on_exec_ctx ) ;
GRPC_CLOSURE_INIT ( & calld - > got_slice , got_slice , elem ,
grpc_schedule_on_exec_ctx ) ;
GRPC_CLOSURE_INIT ( & calld - > send_done , send_done , elem ,
GRPC_CLOSURE_INIT ( & calld - > recv_trailing_metadata_on_complete ,
recv_trailing_metadata_on_complete , elem ,
grpc_schedule_on_exec_ctx ) ;
GRPC_CLOSURE_INIT ( & calld - > send_message_on_complete , send_message_on_complete ,
elem , grpc_schedule_on_exec_ctx ) ;
GRPC_CLOSURE_INIT ( & calld - > on_send_message_next_done ,
on_send_message_next_done , elem , grpc_schedule_on_exec_ctx ) ;
return GRPC_ERROR_NONE ;
}
/* Destructor for call_data */
static void destroy_call_elem ( grpc_exec_ctx * exec_ctx , grpc_call_element * elem ,
const grpc_call_final_info * final_info ,
grpc_closure * ignored ) {
call_data * calld = elem - > call_data ;
grpc_slice_buffer_destroy_internal ( exec_ctx , & calld - > slices ) ;
}
grpc_closure * ignored ) { }
static grpc_mdelem scheme_from_args ( const grpc_channel_args * args ) {
unsigned i ;
@ -580,7 +556,7 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
}
const grpc_channel_filter grpc_http_client_filter = {
hc_start_transport_op ,
hc_start_transport_stream_ op_batch ,
grpc_channel_next_op ,
sizeof ( call_data ) ,
init_call_elem ,