|
|
@ -637,7 +637,8 @@ static void on_response_trailers_received( |
|
|
|
Utility function that takes the data from s->write_slice_buffer and assembles |
|
|
|
Utility function that takes the data from s->write_slice_buffer and assembles |
|
|
|
into a contiguous byte stream with 5 byte gRPC header prepended. |
|
|
|
into a contiguous byte stream with 5 byte gRPC header prepended. |
|
|
|
*/ |
|
|
|
*/ |
|
|
|
static void create_grpc_frame(grpc_slice_buffer *write_slice_buffer, |
|
|
|
static void create_grpc_frame(grpc_exec_ctx *exec_ctx, |
|
|
|
|
|
|
|
grpc_slice_buffer *write_slice_buffer, |
|
|
|
char **pp_write_buffer, |
|
|
|
char **pp_write_buffer, |
|
|
|
size_t *p_write_buffer_size, uint32_t flags) { |
|
|
|
size_t *p_write_buffer_size, uint32_t flags) { |
|
|
|
grpc_slice slice = grpc_slice_buffer_take_first(write_slice_buffer); |
|
|
|
grpc_slice slice = grpc_slice_buffer_take_first(write_slice_buffer); |
|
|
@ -657,6 +658,7 @@ static void create_grpc_frame(grpc_slice_buffer *write_slice_buffer, |
|
|
|
*p++ = (uint8_t)(length); |
|
|
|
*p++ = (uint8_t)(length); |
|
|
|
/* append actual data */ |
|
|
|
/* append actual data */ |
|
|
|
memcpy(p, GRPC_SLICE_START_PTR(slice), length); |
|
|
|
memcpy(p, GRPC_SLICE_START_PTR(slice), length); |
|
|
|
|
|
|
|
grpc_slice_unref_internal(exec_ctx, slice); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
/*
|
|
|
@ -1017,14 +1019,15 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, |
|
|
|
} |
|
|
|
} |
|
|
|
if (write_slice_buffer.count > 0) { |
|
|
|
if (write_slice_buffer.count > 0) { |
|
|
|
size_t write_buffer_size; |
|
|
|
size_t write_buffer_size; |
|
|
|
create_grpc_frame(&write_slice_buffer, &stream_state->ws.write_buffer, |
|
|
|
create_grpc_frame(exec_ctx, &write_slice_buffer, |
|
|
|
&write_buffer_size, |
|
|
|
&stream_state->ws.write_buffer, &write_buffer_size, |
|
|
|
stream_op->payload->send_message.send_message->flags); |
|
|
|
stream_op->payload->send_message.send_message->flags); |
|
|
|
CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, %p)", s->cbs, |
|
|
|
CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, %p)", s->cbs, |
|
|
|
stream_state->ws.write_buffer); |
|
|
|
stream_state->ws.write_buffer); |
|
|
|
stream_state->state_callback_received[OP_SEND_MESSAGE] = false; |
|
|
|
stream_state->state_callback_received[OP_SEND_MESSAGE] = false; |
|
|
|
bidirectional_stream_write(s->cbs, stream_state->ws.write_buffer, |
|
|
|
bidirectional_stream_write(s->cbs, stream_state->ws.write_buffer, |
|
|
|
(int)write_buffer_size, false); |
|
|
|
(int)write_buffer_size, false); |
|
|
|
|
|
|
|
grpc_slice_buffer_destroy_internal(exec_ctx, &write_slice_buffer); |
|
|
|
if (t->use_packet_coalescing) { |
|
|
|
if (t->use_packet_coalescing) { |
|
|
|
if (!stream_op->send_trailing_metadata) { |
|
|
|
if (!stream_op->send_trailing_metadata) { |
|
|
|
CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs); |
|
|
|
CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs); |
|
|
@ -1153,6 +1156,9 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, |
|
|
|
} else { |
|
|
|
} else { |
|
|
|
stream_state->rs.remaining_bytes = 0; |
|
|
|
stream_state->rs.remaining_bytes = 0; |
|
|
|
CRONET_LOG(GPR_DEBUG, "read operation complete. Empty response."); |
|
|
|
CRONET_LOG(GPR_DEBUG, "read operation complete. Empty response."); |
|
|
|
|
|
|
|
/* Clean up read_slice_buffer in case there is unread data. */ |
|
|
|
|
|
|
|
grpc_slice_buffer_destroy_internal( |
|
|
|
|
|
|
|
exec_ctx, &stream_state->rs.read_slice_buffer); |
|
|
|
grpc_slice_buffer_init(&stream_state->rs.read_slice_buffer); |
|
|
|
grpc_slice_buffer_init(&stream_state->rs.read_slice_buffer); |
|
|
|
grpc_slice_buffer_stream_init(&stream_state->rs.sbs, |
|
|
|
grpc_slice_buffer_stream_init(&stream_state->rs.sbs, |
|
|
|
&stream_state->rs.read_slice_buffer, 0); |
|
|
|
&stream_state->rs.read_slice_buffer, 0); |
|
|
@ -1206,6 +1212,9 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, |
|
|
|
memcpy(dst_p, stream_state->rs.read_buffer, |
|
|
|
memcpy(dst_p, stream_state->rs.read_buffer, |
|
|
|
(size_t)stream_state->rs.length_field); |
|
|
|
(size_t)stream_state->rs.length_field); |
|
|
|
null_and_maybe_free_read_buffer(s); |
|
|
|
null_and_maybe_free_read_buffer(s); |
|
|
|
|
|
|
|
/* Clean up read_slice_buffer in case there is unread data. */ |
|
|
|
|
|
|
|
grpc_slice_buffer_destroy_internal(exec_ctx, |
|
|
|
|
|
|
|
&stream_state->rs.read_slice_buffer); |
|
|
|
grpc_slice_buffer_init(&stream_state->rs.read_slice_buffer); |
|
|
|
grpc_slice_buffer_init(&stream_state->rs.read_slice_buffer); |
|
|
|
grpc_slice_buffer_add(&stream_state->rs.read_slice_buffer, |
|
|
|
grpc_slice_buffer_add(&stream_state->rs.read_slice_buffer, |
|
|
|
read_data_slice); |
|
|
|
read_data_slice); |
|
|
@ -1369,6 +1378,8 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, |
|
|
|
grpc_closure *then_schedule_closure) { |
|
|
|
grpc_closure *then_schedule_closure) { |
|
|
|
stream_obj *s = (stream_obj *)gs; |
|
|
|
stream_obj *s = (stream_obj *)gs; |
|
|
|
null_and_maybe_free_read_buffer(s); |
|
|
|
null_and_maybe_free_read_buffer(s); |
|
|
|
|
|
|
|
/* Clean up read_slice_buffer in case there is unread data. */ |
|
|
|
|
|
|
|
grpc_slice_buffer_destroy_internal(exec_ctx, &s->state.rs.read_slice_buffer); |
|
|
|
GRPC_ERROR_UNREF(s->state.cancel_error); |
|
|
|
GRPC_ERROR_UNREF(s->state.cancel_error); |
|
|
|
GRPC_CLOSURE_SCHED(exec_ctx, then_schedule_closure, GRPC_ERROR_NONE); |
|
|
|
GRPC_CLOSURE_SCHED(exec_ctx, then_schedule_closure, GRPC_ERROR_NONE); |
|
|
|
} |
|
|
|
} |
|
|
|