|
|
|
@ -91,10 +91,9 @@ typedef enum { |
|
|
|
|
/* streams that are waiting to start because there are too many concurrent
|
|
|
|
|
streams on the connection */ |
|
|
|
|
WAITING_FOR_CONCURRENCY, |
|
|
|
|
/* streams that want to callback the application */ |
|
|
|
|
PENDING_CALLBACKS, |
|
|
|
|
/* streams that *ARE* calling back to the application */ |
|
|
|
|
EXECUTING_CALLBACKS, |
|
|
|
|
/* streams that have finished reading: we wait until unlock to coalesce
|
|
|
|
|
all changes into one callback */ |
|
|
|
|
FINISHED_READ_OP, |
|
|
|
|
STREAM_LIST_COUNT /* must be last */ |
|
|
|
|
} stream_list_id; |
|
|
|
|
|
|
|
|
@ -141,6 +140,12 @@ typedef enum { |
|
|
|
|
DTS_FRAME |
|
|
|
|
} deframe_transport_state; |
|
|
|
|
|
|
|
|
|
typedef enum { |
|
|
|
|
WRITE_STATE_OPEN, |
|
|
|
|
WRITE_STATE_QUEUED_CLOSE, |
|
|
|
|
WRITE_STATE_SENT_CLOSE |
|
|
|
|
} WRITE_STATE; |
|
|
|
|
|
|
|
|
|
typedef struct { |
|
|
|
|
stream *head; |
|
|
|
|
stream *tail; |
|
|
|
@ -182,6 +187,18 @@ typedef struct { |
|
|
|
|
gpr_slice debug; |
|
|
|
|
} pending_goaway; |
|
|
|
|
|
|
|
|
|
typedef struct { |
|
|
|
|
void (*cb)(void *user_data, int success); |
|
|
|
|
void *user_data; |
|
|
|
|
int success; |
|
|
|
|
} op_closure; |
|
|
|
|
|
|
|
|
|
typedef struct { |
|
|
|
|
op_closure *callbacks; |
|
|
|
|
size_t count; |
|
|
|
|
size_t capacity; |
|
|
|
|
} op_closure_array; |
|
|
|
|
|
|
|
|
|
struct transport { |
|
|
|
|
grpc_transport base; /* must be first */ |
|
|
|
|
const grpc_transport_callbacks *cb; |
|
|
|
@ -202,6 +219,10 @@ struct transport { |
|
|
|
|
gpr_uint8 closed; |
|
|
|
|
error_state error_state; |
|
|
|
|
|
|
|
|
|
/* queued callbacks */ |
|
|
|
|
op_closure_array pending_callbacks; |
|
|
|
|
op_closure_array executing_callbacks; |
|
|
|
|
|
|
|
|
|
/* stream indexing */ |
|
|
|
|
gpr_uint32 next_stream_id; |
|
|
|
|
gpr_uint32 last_incoming_stream_id; |
|
|
|
@ -281,13 +302,13 @@ struct stream { |
|
|
|
|
/* when the application requests writes be closed, the write_closed is
|
|
|
|
|
'queued'; when the close is flow controlled into the send path, we are |
|
|
|
|
'sending' it; when the write has been performed it is 'sent' */ |
|
|
|
|
gpr_uint8 queued_write_closed; |
|
|
|
|
gpr_uint8 sending_write_closed; |
|
|
|
|
gpr_uint8 sent_write_closed; |
|
|
|
|
WRITE_STATE write_state; |
|
|
|
|
gpr_uint8 send_closed; |
|
|
|
|
gpr_uint8 read_closed; |
|
|
|
|
gpr_uint8 cancelled; |
|
|
|
|
gpr_uint8 allow_window_updates; |
|
|
|
|
gpr_uint8 published_close; |
|
|
|
|
|
|
|
|
|
op_closure send_done_closure; |
|
|
|
|
op_closure recv_done_closure; |
|
|
|
|
|
|
|
|
|
stream_link links[STREAM_LIST_COUNT]; |
|
|
|
|
gpr_uint8 included[STREAM_LIST_COUNT]; |
|
|
|
@ -296,10 +317,14 @@ struct stream { |
|
|
|
|
grpc_linked_mdelem *incoming_metadata; |
|
|
|
|
size_t incoming_metadata_count; |
|
|
|
|
size_t incoming_metadata_capacity; |
|
|
|
|
grpc_linked_mdelem *old_incoming_metadata; |
|
|
|
|
gpr_timespec incoming_deadline; |
|
|
|
|
|
|
|
|
|
/* sops from application */ |
|
|
|
|
grpc_stream_op_buffer outgoing_sopb; |
|
|
|
|
grpc_stream_op_buffer *outgoing_sopb; |
|
|
|
|
grpc_stream_op_buffer *incoming_sopb; |
|
|
|
|
grpc_stream_state *publish_state; |
|
|
|
|
grpc_stream_state published_state; |
|
|
|
|
/* sops that have passed flow control to be written */ |
|
|
|
|
grpc_stream_op_buffer writing_sopb; |
|
|
|
|
|
|
|
|
@ -337,7 +362,8 @@ static void cancel_stream_id(transport *t, gpr_uint32 id, |
|
|
|
|
grpc_chttp2_error_code error_code, int send_rst); |
|
|
|
|
static void cancel_stream(transport *t, stream *s, |
|
|
|
|
grpc_status_code local_status, |
|
|
|
|
grpc_chttp2_error_code error_code, int send_rst); |
|
|
|
|
grpc_chttp2_error_code error_code, |
|
|
|
|
grpc_mdstr *optional_message, int send_rst); |
|
|
|
|
static void finalize_cancellations(transport *t); |
|
|
|
|
static stream *lookup_stream(transport *t, gpr_uint32 id); |
|
|
|
|
static void remove_from_stream_map(transport *t, stream *s); |
|
|
|
@ -348,6 +374,14 @@ static void become_skip_parser(transport *t); |
|
|
|
|
static void recv_data(void *tp, gpr_slice *slices, size_t nslices, |
|
|
|
|
grpc_endpoint_cb_status error); |
|
|
|
|
|
|
|
|
|
static void schedule_cb(transport *t, op_closure closure, int success); |
|
|
|
|
static void maybe_finish_read(transport *t, stream *s); |
|
|
|
|
static void maybe_join_window_updates(transport *t, stream *s); |
|
|
|
|
static void finish_reads(transport *t); |
|
|
|
|
static void add_to_pollset_locked(transport *t, grpc_pollset *pollset); |
|
|
|
|
static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op); |
|
|
|
|
static void add_metadata_batch(transport *t, stream *s); |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* CONSTRUCTION/DESTRUCTION/REFCOUNTING |
|
|
|
|
*/ |
|
|
|
@ -387,6 +421,9 @@ static void destruct_transport(transport *t) { |
|
|
|
|
} |
|
|
|
|
gpr_free(t->pings); |
|
|
|
|
|
|
|
|
|
gpr_free(t->pending_callbacks.callbacks); |
|
|
|
|
gpr_free(t->executing_callbacks.callbacks); |
|
|
|
|
|
|
|
|
|
for (i = 0; i < t->num_pending_goaways; i++) { |
|
|
|
|
gpr_slice_unref(t->pending_goaways[i].debug); |
|
|
|
|
} |
|
|
|
@ -416,6 +453,8 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup, |
|
|
|
|
|
|
|
|
|
GPR_ASSERT(strlen(CLIENT_CONNECT_STRING) == CLIENT_CONNECT_STRLEN); |
|
|
|
|
|
|
|
|
|
memset(t, 0, sizeof(*t)); |
|
|
|
|
|
|
|
|
|
t->base.vtable = &vtable; |
|
|
|
|
t->ep = ep; |
|
|
|
|
/* one ref is for destroy, the other for when ep becomes NULL */ |
|
|
|
@ -427,27 +466,16 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup, |
|
|
|
|
t->str_grpc_timeout = |
|
|
|
|
grpc_mdstr_from_string(t->metadata_context, "grpc-timeout"); |
|
|
|
|
t->reading = 1; |
|
|
|
|
t->writing = 0; |
|
|
|
|
t->error_state = ERROR_STATE_NONE; |
|
|
|
|
t->next_stream_id = is_client ? 1 : 2; |
|
|
|
|
t->last_incoming_stream_id = 0; |
|
|
|
|
t->destroying = 0; |
|
|
|
|
t->closed = 0; |
|
|
|
|
t->is_client = is_client; |
|
|
|
|
t->outgoing_window = DEFAULT_WINDOW; |
|
|
|
|
t->incoming_window = DEFAULT_WINDOW; |
|
|
|
|
t->connection_window_target = DEFAULT_CONNECTION_WINDOW_TARGET; |
|
|
|
|
t->deframe_state = is_client ? DTS_FH_0 : DTS_CLIENT_PREFIX_0; |
|
|
|
|
t->expect_continuation_stream_id = 0; |
|
|
|
|
t->pings = NULL; |
|
|
|
|
t->ping_count = 0; |
|
|
|
|
t->ping_capacity = 0; |
|
|
|
|
t->ping_counter = gpr_now().tv_nsec; |
|
|
|
|
grpc_chttp2_hpack_compressor_init(&t->hpack_compressor, mdctx); |
|
|
|
|
grpc_chttp2_goaway_parser_init(&t->goaway_parser); |
|
|
|
|
t->pending_goaways = NULL; |
|
|
|
|
t->num_pending_goaways = 0; |
|
|
|
|
t->cap_pending_goaways = 0; |
|
|
|
|
gpr_slice_buffer_init(&t->outbuf); |
|
|
|
|
gpr_slice_buffer_init(&t->qbuf); |
|
|
|
|
grpc_sopb_init(&t->nuke_later_sopb); |
|
|
|
@ -462,7 +490,6 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup, |
|
|
|
|
needed. |
|
|
|
|
TODO(ctiller): tune this */ |
|
|
|
|
grpc_chttp2_stream_map_init(&t->stream_map, 8); |
|
|
|
|
memset(&t->lists, 0, sizeof(t->lists)); |
|
|
|
|
|
|
|
|
|
/* copy in initial settings to all setting sets */ |
|
|
|
|
for (i = 0; i < NUM_SETTING_SETS; i++) { |
|
|
|
@ -503,7 +530,7 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup, |
|
|
|
|
|
|
|
|
|
gpr_mu_lock(&t->mu); |
|
|
|
|
t->calling_back = 1; |
|
|
|
|
ref_transport(t); |
|
|
|
|
ref_transport(t); /* matches unref at end of this function */ |
|
|
|
|
gpr_mu_unlock(&t->mu); |
|
|
|
|
|
|
|
|
|
sr = setup(arg, &t->base, t->metadata_context); |
|
|
|
@ -515,7 +542,7 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup, |
|
|
|
|
if (t->destroying) gpr_cv_signal(&t->cv); |
|
|
|
|
unlock(t); |
|
|
|
|
|
|
|
|
|
ref_transport(t); |
|
|
|
|
ref_transport(t); /* matches unref inside recv_data */ |
|
|
|
|
recv_data(t, slices, nslices, GRPC_ENDPOINT_CB_OK); |
|
|
|
|
|
|
|
|
|
unref_transport(t); |
|
|
|
@ -573,10 +600,12 @@ static void goaway(grpc_transport *gt, grpc_status_code status, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static int init_stream(grpc_transport *gt, grpc_stream *gs, |
|
|
|
|
const void *server_data) { |
|
|
|
|
const void *server_data, grpc_transport_op *initial_op) { |
|
|
|
|
transport *t = (transport *)gt; |
|
|
|
|
stream *s = (stream *)gs; |
|
|
|
|
|
|
|
|
|
memset(s, 0, sizeof(*s)); |
|
|
|
|
|
|
|
|
|
ref_transport(t); |
|
|
|
|
|
|
|
|
|
if (!server_data) { |
|
|
|
@ -585,6 +614,7 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs, |
|
|
|
|
s->outgoing_window = 0; |
|
|
|
|
s->incoming_window = 0; |
|
|
|
|
} else { |
|
|
|
|
/* already locked */ |
|
|
|
|
s->id = (gpr_uint32)(gpr_uintptr)server_data; |
|
|
|
|
s->outgoing_window = |
|
|
|
|
t->settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; |
|
|
|
@ -594,24 +624,13 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs, |
|
|
|
|
grpc_chttp2_stream_map_add(&t->stream_map, s->id, s); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
s->queued_write_closed = 0; |
|
|
|
|
s->sending_write_closed = 0; |
|
|
|
|
s->sent_write_closed = 0; |
|
|
|
|
s->read_closed = 0; |
|
|
|
|
s->cancelled = 0; |
|
|
|
|
s->allow_window_updates = 0; |
|
|
|
|
s->published_close = 0; |
|
|
|
|
s->incoming_metadata_count = 0; |
|
|
|
|
s->incoming_metadata_capacity = 0; |
|
|
|
|
s->incoming_metadata = NULL; |
|
|
|
|
s->incoming_deadline = gpr_inf_future; |
|
|
|
|
memset(&s->links, 0, sizeof(s->links)); |
|
|
|
|
memset(&s->included, 0, sizeof(s->included)); |
|
|
|
|
grpc_sopb_init(&s->outgoing_sopb); |
|
|
|
|
grpc_sopb_init(&s->writing_sopb); |
|
|
|
|
grpc_sopb_init(&s->callback_sopb); |
|
|
|
|
grpc_chttp2_data_parser_init(&s->parser); |
|
|
|
|
|
|
|
|
|
if (initial_op) perform_op_locked(t, s, initial_op); |
|
|
|
|
|
|
|
|
|
if (!server_data) { |
|
|
|
|
unlock(t); |
|
|
|
|
} |
|
|
|
@ -644,10 +663,16 @@ static void destroy_stream(grpc_transport *gt, grpc_stream *gs) { |
|
|
|
|
|
|
|
|
|
gpr_mu_unlock(&t->mu); |
|
|
|
|
|
|
|
|
|
grpc_sopb_destroy(&s->outgoing_sopb); |
|
|
|
|
GPR_ASSERT(s->outgoing_sopb == NULL); |
|
|
|
|
GPR_ASSERT(s->incoming_sopb == NULL); |
|
|
|
|
grpc_sopb_destroy(&s->writing_sopb); |
|
|
|
|
grpc_sopb_destroy(&s->callback_sopb); |
|
|
|
|
grpc_chttp2_data_parser_destroy(&s->parser); |
|
|
|
|
for (i = 0; i < s->incoming_metadata_count; i++) { |
|
|
|
|
grpc_mdelem_unref(s->incoming_metadata[i].md); |
|
|
|
|
} |
|
|
|
|
gpr_free(s->incoming_metadata); |
|
|
|
|
gpr_free(s->old_incoming_metadata); |
|
|
|
|
|
|
|
|
|
unref_transport(t); |
|
|
|
|
} |
|
|
|
@ -710,8 +735,6 @@ static void stream_list_add_tail(transport *t, stream *s, stream_list_id id) { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void stream_list_join(transport *t, stream *s, stream_list_id id) { |
|
|
|
|
if (id == PENDING_CALLBACKS) |
|
|
|
|
GPR_ASSERT(t->cb != NULL || t->error_state == ERROR_STATE_NONE); |
|
|
|
|
if (s->included[id]) { |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
@ -720,6 +743,8 @@ static void stream_list_join(transport *t, stream *s, stream_list_id id) { |
|
|
|
|
|
|
|
|
|
static void remove_from_stream_map(transport *t, stream *s) { |
|
|
|
|
if (s->id == 0) return; |
|
|
|
|
IF_TRACING(gpr_log(GPR_DEBUG, "HTTP:%s: Removing stream %d", |
|
|
|
|
t->is_client ? "CLI" : "SVR", s->id)); |
|
|
|
|
if (grpc_chttp2_stream_map_delete(&t->stream_map, s->id)) { |
|
|
|
|
maybe_start_some_streams(t); |
|
|
|
|
} |
|
|
|
@ -764,6 +789,8 @@ static void unlock(transport *t) { |
|
|
|
|
finalize_cancellations(t); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
finish_reads(t); |
|
|
|
|
|
|
|
|
|
/* gather any callbacks that need to be made */ |
|
|
|
|
if (!t->calling_back && cb) { |
|
|
|
|
perform_callbacks = prepare_callbacks(t); |
|
|
|
@ -867,21 +894,24 @@ static int prepare_write(transport *t) { |
|
|
|
|
while (t->outgoing_window && (s = stream_list_remove_head(t, WRITABLE)) && |
|
|
|
|
s->outgoing_window > 0) { |
|
|
|
|
window_delta = grpc_chttp2_preencode( |
|
|
|
|
s->outgoing_sopb.ops, &s->outgoing_sopb.nops, |
|
|
|
|
s->outgoing_sopb->ops, &s->outgoing_sopb->nops, |
|
|
|
|
GPR_MIN(t->outgoing_window, s->outgoing_window), &s->writing_sopb); |
|
|
|
|
t->outgoing_window -= window_delta; |
|
|
|
|
s->outgoing_window -= window_delta; |
|
|
|
|
|
|
|
|
|
s->sending_write_closed = |
|
|
|
|
s->queued_write_closed && s->outgoing_sopb.nops == 0; |
|
|
|
|
if (s->writing_sopb.nops > 0 || s->sending_write_closed) { |
|
|
|
|
if (s->write_state == WRITE_STATE_QUEUED_CLOSE && |
|
|
|
|
s->outgoing_sopb->nops == 0) { |
|
|
|
|
s->send_closed = 1; |
|
|
|
|
} |
|
|
|
|
if (s->writing_sopb.nops > 0 || s->send_closed) { |
|
|
|
|
stream_list_join(t, s, WRITING); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* if there are still writes to do and the stream still has window
|
|
|
|
|
available, then schedule a further write */ |
|
|
|
|
if (s->outgoing_sopb.nops > 0 && s->outgoing_window > 0) { |
|
|
|
|
GPR_ASSERT(!t->outgoing_window); |
|
|
|
|
/* we should either exhaust window or have no ops left, but not both */ |
|
|
|
|
if (s->outgoing_sopb->nops == 0) { |
|
|
|
|
s->outgoing_sopb = NULL; |
|
|
|
|
schedule_cb(t, s->send_done_closure, 1); |
|
|
|
|
} else if (s->outgoing_window) { |
|
|
|
|
stream_list_add_tail(t, s, WRITABLE); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -914,10 +944,9 @@ static void finalize_outbuf(transport *t) { |
|
|
|
|
|
|
|
|
|
while ((s = stream_list_remove_head(t, WRITING))) { |
|
|
|
|
grpc_chttp2_encode(s->writing_sopb.ops, s->writing_sopb.nops, |
|
|
|
|
s->sending_write_closed, s->id, &t->hpack_compressor, |
|
|
|
|
&t->outbuf); |
|
|
|
|
s->send_closed, s->id, &t->hpack_compressor, &t->outbuf); |
|
|
|
|
s->writing_sopb.nops = 0; |
|
|
|
|
if (s->sending_write_closed) { |
|
|
|
|
if (s->send_closed) { |
|
|
|
|
stream_list_join(t, s, WRITTEN_CLOSED); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -931,8 +960,10 @@ static void finish_write_common(transport *t, int success) { |
|
|
|
|
drop_connection(t); |
|
|
|
|
} |
|
|
|
|
while ((s = stream_list_remove_head(t, WRITTEN_CLOSED))) { |
|
|
|
|
s->sent_write_closed = 1; |
|
|
|
|
if (!s->cancelled) stream_list_join(t, s, PENDING_CALLBACKS); |
|
|
|
|
s->write_state = WRITE_STATE_SENT_CLOSE; |
|
|
|
|
if (1||!s->cancelled) { |
|
|
|
|
maybe_finish_read(t, s); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
t->outbuf.count = 0; |
|
|
|
|
t->outbuf.length = 0; |
|
|
|
@ -982,6 +1013,9 @@ static void maybe_start_some_streams(transport *t) { |
|
|
|
|
stream *s = stream_list_remove_head(t, WAITING_FOR_CONCURRENCY); |
|
|
|
|
if (!s) break; |
|
|
|
|
|
|
|
|
|
IF_TRACING(gpr_log(GPR_DEBUG, "HTTP:%s: Allocating new stream %p to id %d", |
|
|
|
|
t->is_client ? "CLI" : "SVR", s, t->next_stream_id)); |
|
|
|
|
|
|
|
|
|
GPR_ASSERT(s->id == 0); |
|
|
|
|
s->id = t->next_stream_id; |
|
|
|
|
t->next_stream_id += 2; |
|
|
|
@ -994,43 +1028,63 @@ static void maybe_start_some_streams(transport *t) { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void send_batch(grpc_transport *gt, grpc_stream *gs, grpc_stream_op *ops, |
|
|
|
|
size_t ops_count, int is_last) { |
|
|
|
|
transport *t = (transport *)gt; |
|
|
|
|
stream *s = (stream *)gs; |
|
|
|
|
|
|
|
|
|
lock(t); |
|
|
|
|
|
|
|
|
|
if (is_last) { |
|
|
|
|
s->queued_write_closed = 1; |
|
|
|
|
} |
|
|
|
|
if (!s->cancelled) { |
|
|
|
|
grpc_sopb_append(&s->outgoing_sopb, ops, ops_count); |
|
|
|
|
if (s->id == 0) { |
|
|
|
|
stream_list_join(t, s, WAITING_FOR_CONCURRENCY); |
|
|
|
|
maybe_start_some_streams(t); |
|
|
|
|
static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op) { |
|
|
|
|
if (op->cancel_with_status != GRPC_STATUS_OK) { |
|
|
|
|
cancel_stream( |
|
|
|
|
t, s, op->cancel_with_status, |
|
|
|
|
grpc_chttp2_grpc_status_to_http2_error(op->cancel_with_status), |
|
|
|
|
op->cancel_message, 1); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (op->send_ops) { |
|
|
|
|
GPR_ASSERT(s->outgoing_sopb == NULL); |
|
|
|
|
s->send_done_closure.cb = op->on_done_send; |
|
|
|
|
s->send_done_closure.user_data = op->send_user_data; |
|
|
|
|
if (!s->cancelled) { |
|
|
|
|
s->outgoing_sopb = op->send_ops; |
|
|
|
|
if (op->is_last_send && s->write_state == WRITE_STATE_OPEN) { |
|
|
|
|
s->write_state = WRITE_STATE_QUEUED_CLOSE; |
|
|
|
|
} |
|
|
|
|
if (s->id == 0) { |
|
|
|
|
IF_TRACING(gpr_log(GPR_DEBUG, |
|
|
|
|
"HTTP:%s: New stream %p waiting for concurrency", |
|
|
|
|
t->is_client ? "CLI" : "SVR", s)); |
|
|
|
|
stream_list_join(t, s, WAITING_FOR_CONCURRENCY); |
|
|
|
|
maybe_start_some_streams(t); |
|
|
|
|
} else if (s->outgoing_window > 0) { |
|
|
|
|
stream_list_join(t, s, WRITABLE); |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
stream_list_join(t, s, WRITABLE); |
|
|
|
|
schedule_nuke_sopb(t, op->send_ops); |
|
|
|
|
schedule_cb(t, s->send_done_closure, 0); |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
grpc_sopb_append(&t->nuke_later_sopb, ops, ops_count); |
|
|
|
|
} |
|
|
|
|
if (is_last && s->outgoing_sopb.nops == 0 && s->read_closed && |
|
|
|
|
!s->published_close) { |
|
|
|
|
stream_list_join(t, s, PENDING_CALLBACKS); |
|
|
|
|
|
|
|
|
|
if (op->recv_ops) { |
|
|
|
|
GPR_ASSERT(s->incoming_sopb == NULL); |
|
|
|
|
s->recv_done_closure.cb = op->on_done_recv; |
|
|
|
|
s->recv_done_closure.user_data = op->recv_user_data; |
|
|
|
|
s->incoming_sopb = op->recv_ops; |
|
|
|
|
s->incoming_sopb->nops = 0; |
|
|
|
|
s->publish_state = op->recv_state; |
|
|
|
|
gpr_free(s->old_incoming_metadata); |
|
|
|
|
s->old_incoming_metadata = NULL; |
|
|
|
|
maybe_finish_read(t, s); |
|
|
|
|
maybe_join_window_updates(t, s); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
unlock(t); |
|
|
|
|
if (op->bind_pollset) { |
|
|
|
|
add_to_pollset_locked(t, op->bind_pollset); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void abort_stream(grpc_transport *gt, grpc_stream *gs, |
|
|
|
|
grpc_status_code status) { |
|
|
|
|
static void perform_op(grpc_transport *gt, grpc_stream *gs, |
|
|
|
|
grpc_transport_op *op) { |
|
|
|
|
transport *t = (transport *)gt; |
|
|
|
|
stream *s = (stream *)gs; |
|
|
|
|
|
|
|
|
|
lock(t); |
|
|
|
|
cancel_stream(t, s, status, grpc_chttp2_grpc_status_to_http2_error(status), |
|
|
|
|
1); |
|
|
|
|
perform_op_locked(t, s, op); |
|
|
|
|
unlock(t); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -1069,8 +1123,8 @@ static void finalize_cancellations(transport *t) { |
|
|
|
|
|
|
|
|
|
while ((s = stream_list_remove_head(t, CANCELLED))) { |
|
|
|
|
s->read_closed = 1; |
|
|
|
|
s->sent_write_closed = 1; |
|
|
|
|
stream_list_join(t, s, PENDING_CALLBACKS); |
|
|
|
|
s->write_state = WRITE_STATE_SENT_CLOSE; |
|
|
|
|
maybe_finish_read(t, s); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -1088,18 +1142,24 @@ static void add_incoming_metadata(transport *t, stream *s, grpc_mdelem *elem) { |
|
|
|
|
static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id, |
|
|
|
|
grpc_status_code local_status, |
|
|
|
|
grpc_chttp2_error_code error_code, |
|
|
|
|
int send_rst) { |
|
|
|
|
grpc_mdstr *optional_message, int send_rst) { |
|
|
|
|
int had_outgoing; |
|
|
|
|
char buffer[GPR_LTOA_MIN_BUFSIZE]; |
|
|
|
|
|
|
|
|
|
if (s) { |
|
|
|
|
/* clear out any unreported input & output: nobody cares anymore */ |
|
|
|
|
had_outgoing = s->outgoing_sopb.nops != 0; |
|
|
|
|
had_outgoing = s->outgoing_sopb && s->outgoing_sopb->nops != 0; |
|
|
|
|
schedule_nuke_sopb(t, &s->parser.incoming_sopb); |
|
|
|
|
schedule_nuke_sopb(t, &s->outgoing_sopb); |
|
|
|
|
if (s->outgoing_sopb) { |
|
|
|
|
schedule_nuke_sopb(t, s->outgoing_sopb); |
|
|
|
|
s->outgoing_sopb = NULL; |
|
|
|
|
stream_list_remove(t, s, WRITABLE); |
|
|
|
|
schedule_cb(t, s->send_done_closure, 0); |
|
|
|
|
} |
|
|
|
|
if (s->cancelled) { |
|
|
|
|
send_rst = 0; |
|
|
|
|
} else if (!s->read_closed || !s->sent_write_closed || had_outgoing) { |
|
|
|
|
} else if (!s->read_closed || s->write_state != WRITE_STATE_SENT_CLOSE || |
|
|
|
|
had_outgoing) { |
|
|
|
|
s->cancelled = 1; |
|
|
|
|
stream_list_join(t, s, CANCELLED); |
|
|
|
|
|
|
|
|
@ -1107,17 +1167,26 @@ static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id, |
|
|
|
|
add_incoming_metadata( |
|
|
|
|
t, s, |
|
|
|
|
grpc_mdelem_from_strings(t->metadata_context, "grpc-status", buffer)); |
|
|
|
|
switch (local_status) { |
|
|
|
|
case GRPC_STATUS_CANCELLED: |
|
|
|
|
add_incoming_metadata( |
|
|
|
|
t, s, grpc_mdelem_from_strings(t->metadata_context, |
|
|
|
|
"grpc-message", "Cancelled")); |
|
|
|
|
break; |
|
|
|
|
default: |
|
|
|
|
break; |
|
|
|
|
if (!optional_message) { |
|
|
|
|
switch (local_status) { |
|
|
|
|
case GRPC_STATUS_CANCELLED: |
|
|
|
|
add_incoming_metadata( |
|
|
|
|
t, s, grpc_mdelem_from_strings(t->metadata_context, |
|
|
|
|
"grpc-message", "Cancelled")); |
|
|
|
|
break; |
|
|
|
|
default: |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
add_incoming_metadata( |
|
|
|
|
t, s, |
|
|
|
|
grpc_mdelem_from_metadata_strings( |
|
|
|
|
t->metadata_context, |
|
|
|
|
grpc_mdstr_from_string(t->metadata_context, "grpc-message"), |
|
|
|
|
grpc_mdstr_ref(optional_message))); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
stream_list_join(t, s, PENDING_CALLBACKS); |
|
|
|
|
add_metadata_batch(t, s); |
|
|
|
|
maybe_finish_read(t, s); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
if (!id) send_rst = 0; |
|
|
|
@ -1125,24 +1194,29 @@ static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id, |
|
|
|
|
gpr_slice_buffer_add(&t->qbuf, |
|
|
|
|
grpc_chttp2_rst_stream_create(id, error_code)); |
|
|
|
|
} |
|
|
|
|
if (optional_message) { |
|
|
|
|
grpc_mdstr_unref(optional_message); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void cancel_stream_id(transport *t, gpr_uint32 id, |
|
|
|
|
grpc_status_code local_status, |
|
|
|
|
grpc_chttp2_error_code error_code, int send_rst) { |
|
|
|
|
cancel_stream_inner(t, lookup_stream(t, id), id, local_status, error_code, |
|
|
|
|
send_rst); |
|
|
|
|
NULL, send_rst); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void cancel_stream(transport *t, stream *s, |
|
|
|
|
grpc_status_code local_status, |
|
|
|
|
grpc_chttp2_error_code error_code, int send_rst) { |
|
|
|
|
cancel_stream_inner(t, s, s->id, local_status, error_code, send_rst); |
|
|
|
|
grpc_chttp2_error_code error_code, |
|
|
|
|
grpc_mdstr *optional_message, int send_rst) { |
|
|
|
|
cancel_stream_inner(t, s, s->id, local_status, error_code, optional_message, |
|
|
|
|
send_rst); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void cancel_stream_cb(void *user_data, gpr_uint32 id, void *stream) { |
|
|
|
|
cancel_stream(user_data, stream, GRPC_STATUS_UNAVAILABLE, |
|
|
|
|
GRPC_CHTTP2_INTERNAL_ERROR, 0); |
|
|
|
|
GRPC_CHTTP2_INTERNAL_ERROR, NULL, 0); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void end_all_the_calls(transport *t) { |
|
|
|
@ -1156,8 +1230,14 @@ static void drop_connection(transport *t) { |
|
|
|
|
end_all_the_calls(t); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void maybe_finish_read(transport *t, stream *s) { |
|
|
|
|
if (s->incoming_sopb) { |
|
|
|
|
stream_list_join(t, s, FINISHED_READ_OP); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void maybe_join_window_updates(transport *t, stream *s) { |
|
|
|
|
if (s->allow_window_updates && |
|
|
|
|
if (s->incoming_sopb != NULL && |
|
|
|
|
s->incoming_window < |
|
|
|
|
t->settings[LOCAL_SETTINGS] |
|
|
|
|
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] * |
|
|
|
@ -1166,21 +1246,6 @@ static void maybe_join_window_updates(transport *t, stream *s) { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void set_allow_window_updates(grpc_transport *tp, grpc_stream *sp, |
|
|
|
|
int allow) { |
|
|
|
|
transport *t = (transport *)tp; |
|
|
|
|
stream *s = (stream *)sp; |
|
|
|
|
|
|
|
|
|
lock(t); |
|
|
|
|
s->allow_window_updates = allow; |
|
|
|
|
if (allow) { |
|
|
|
|
maybe_join_window_updates(t, s); |
|
|
|
|
} else { |
|
|
|
|
stream_list_remove(t, s, WINDOW_UPDATE); |
|
|
|
|
} |
|
|
|
|
unlock(t); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static grpc_chttp2_parse_error update_incoming_window(transport *t, stream *s) { |
|
|
|
|
if (t->incoming_frame_size > t->incoming_window) { |
|
|
|
|
gpr_log(GPR_ERROR, "frame of size %d overflows incoming window of %d", |
|
|
|
@ -1254,7 +1319,7 @@ static int init_data_frame_parser(transport *t) { |
|
|
|
|
case GRPC_CHTTP2_STREAM_ERROR: |
|
|
|
|
cancel_stream(t, s, grpc_chttp2_http2_error_to_grpc_status( |
|
|
|
|
GRPC_CHTTP2_INTERNAL_ERROR), |
|
|
|
|
GRPC_CHTTP2_INTERNAL_ERROR, 1); |
|
|
|
|
GRPC_CHTTP2_INTERNAL_ERROR, NULL, 1); |
|
|
|
|
return init_skip_frame(t, 0); |
|
|
|
|
case GRPC_CHTTP2_CONNECTION_ERROR: |
|
|
|
|
drop_connection(t); |
|
|
|
@ -1273,11 +1338,10 @@ static void on_header(void *tp, grpc_mdelem *md) { |
|
|
|
|
|
|
|
|
|
GPR_ASSERT(s); |
|
|
|
|
|
|
|
|
|
IF_TRACING(gpr_log(GPR_INFO, "HTTP:%d:HDR: %s: %s", s->id, |
|
|
|
|
grpc_mdstr_as_c_string(md->key), |
|
|
|
|
grpc_mdstr_as_c_string(md->value))); |
|
|
|
|
IF_TRACING(gpr_log( |
|
|
|
|
GPR_INFO, "HTTP:%d:%s:HDR: %s: %s", s->id, t->is_client ? "CLI" : "SVR", |
|
|
|
|
grpc_mdstr_as_c_string(md->key), grpc_mdstr_as_c_string(md->value))); |
|
|
|
|
|
|
|
|
|
stream_list_join(t, s, PENDING_CALLBACKS); |
|
|
|
|
if (md->key == t->str_grpc_timeout) { |
|
|
|
|
gpr_timespec *cached_timeout = grpc_mdelem_get_user_data(md, free_timeout); |
|
|
|
|
if (!cached_timeout) { |
|
|
|
@ -1296,6 +1360,7 @@ static void on_header(void *tp, grpc_mdelem *md) { |
|
|
|
|
} else { |
|
|
|
|
add_incoming_metadata(t, s, md); |
|
|
|
|
} |
|
|
|
|
maybe_finish_read(t, s); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static int init_header_frame_parser(transport *t, int is_continuation) { |
|
|
|
@ -1333,7 +1398,10 @@ static int init_header_frame_parser(transport *t, int is_continuation) { |
|
|
|
|
gpr_log(GPR_ERROR, |
|
|
|
|
"ignoring out of order new stream request on server; last stream " |
|
|
|
|
"id=%d, new stream id=%d", |
|
|
|
|
t->last_incoming_stream_id, t->incoming_stream); |
|
|
|
|
t->last_incoming_stream_id, t->incoming_stream_id); |
|
|
|
|
return init_skip_frame(t, 1); |
|
|
|
|
} else if ((t->incoming_stream_id & 1) == 0) { |
|
|
|
|
gpr_log(GPR_ERROR, "ignoring stream with non-client generated index %d", t->incoming_stream_id); |
|
|
|
|
return init_skip_frame(t, 1); |
|
|
|
|
} |
|
|
|
|
t->incoming_stream = NULL; |
|
|
|
@ -1470,33 +1538,20 @@ static int is_window_update_legal(gpr_int64 window_update, gpr_int64 window) { |
|
|
|
|
return window + window_update < MAX_WINDOW; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void free_md(void *p, grpc_op_error result) { gpr_free(p); } |
|
|
|
|
|
|
|
|
|
static void add_metadata_batch(transport *t, stream *s) { |
|
|
|
|
grpc_metadata_batch b; |
|
|
|
|
size_t i; |
|
|
|
|
|
|
|
|
|
b.list.head = &s->incoming_metadata[0]; |
|
|
|
|
b.list.tail = &s->incoming_metadata[s->incoming_metadata_count - 1]; |
|
|
|
|
b.list.head = NULL; |
|
|
|
|
/* Store away the last element of the list, so that in patch_metadata_ops
|
|
|
|
|
we can reconstitute the list. |
|
|
|
|
We can't do list building here as later incoming metadata may reallocate |
|
|
|
|
the underlying array. */ |
|
|
|
|
b.list.tail = (void*)(gpr_intptr)s->incoming_metadata_count; |
|
|
|
|
b.garbage.head = b.garbage.tail = NULL; |
|
|
|
|
b.deadline = s->incoming_deadline; |
|
|
|
|
|
|
|
|
|
for (i = 1; i < s->incoming_metadata_count; i++) { |
|
|
|
|
s->incoming_metadata[i].prev = &s->incoming_metadata[i - 1]; |
|
|
|
|
s->incoming_metadata[i - 1].next = &s->incoming_metadata[i]; |
|
|
|
|
} |
|
|
|
|
s->incoming_metadata[0].prev = NULL; |
|
|
|
|
s->incoming_metadata[s->incoming_metadata_count - 1].next = NULL; |
|
|
|
|
s->incoming_deadline = gpr_inf_future; |
|
|
|
|
|
|
|
|
|
grpc_sopb_add_metadata(&s->parser.incoming_sopb, b); |
|
|
|
|
grpc_sopb_add_flow_ctl_cb(&s->parser.incoming_sopb, free_md, |
|
|
|
|
s->incoming_metadata); |
|
|
|
|
|
|
|
|
|
/* reset */ |
|
|
|
|
s->incoming_deadline = gpr_inf_future; |
|
|
|
|
s->incoming_metadata = NULL; |
|
|
|
|
s->incoming_metadata_count = 0; |
|
|
|
|
s->incoming_metadata_capacity = 0; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) { |
|
|
|
@ -1507,14 +1562,14 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) { |
|
|
|
|
case GRPC_CHTTP2_PARSE_OK: |
|
|
|
|
if (st.end_of_stream) { |
|
|
|
|
t->incoming_stream->read_closed = 1; |
|
|
|
|
stream_list_join(t, t->incoming_stream, PENDING_CALLBACKS); |
|
|
|
|
maybe_finish_read(t, t->incoming_stream); |
|
|
|
|
} |
|
|
|
|
if (st.need_flush_reads) { |
|
|
|
|
stream_list_join(t, t->incoming_stream, PENDING_CALLBACKS); |
|
|
|
|
maybe_finish_read(t, t->incoming_stream); |
|
|
|
|
} |
|
|
|
|
if (st.metadata_boundary) { |
|
|
|
|
add_metadata_batch(t, t->incoming_stream); |
|
|
|
|
stream_list_join(t, t->incoming_stream, PENDING_CALLBACKS); |
|
|
|
|
maybe_finish_read(t, t->incoming_stream); |
|
|
|
|
} |
|
|
|
|
if (st.ack_settings) { |
|
|
|
|
gpr_slice_buffer_add(&t->qbuf, grpc_chttp2_settings_ack_create()); |
|
|
|
@ -1551,11 +1606,11 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) { |
|
|
|
|
} |
|
|
|
|
if (st.initial_window_update) { |
|
|
|
|
for (i = 0; i < t->stream_map.count; i++) { |
|
|
|
|
stream *s = (stream*)(t->stream_map.values[i]); |
|
|
|
|
stream *s = (stream *)(t->stream_map.values[i]); |
|
|
|
|
int was_window_empty = s->outgoing_window <= 0; |
|
|
|
|
s->outgoing_window += st.initial_window_update; |
|
|
|
|
if (was_window_empty && s->outgoing_window > 0 && |
|
|
|
|
s->outgoing_sopb.nops > 0) { |
|
|
|
|
if (was_window_empty && s->outgoing_window > 0 && s->outgoing_sopb && |
|
|
|
|
s->outgoing_sopb->nops > 0) { |
|
|
|
|
stream_list_join(t, s, WRITABLE); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -1569,12 +1624,13 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) { |
|
|
|
|
if (!is_window_update_legal(st.window_update, s->outgoing_window)) { |
|
|
|
|
cancel_stream(t, s, grpc_chttp2_http2_error_to_grpc_status( |
|
|
|
|
GRPC_CHTTP2_FLOW_CONTROL_ERROR), |
|
|
|
|
GRPC_CHTTP2_FLOW_CONTROL_ERROR, 1); |
|
|
|
|
GRPC_CHTTP2_FLOW_CONTROL_ERROR, NULL, 1); |
|
|
|
|
} else { |
|
|
|
|
s->outgoing_window += st.window_update; |
|
|
|
|
/* if this window update makes outgoing ops writable again,
|
|
|
|
|
flag that */ |
|
|
|
|
if (was_window_empty && s->outgoing_sopb.nops) { |
|
|
|
|
if (was_window_empty && s->outgoing_sopb && |
|
|
|
|
s->outgoing_sopb->nops > 0) { |
|
|
|
|
stream_list_join(t, s, WRITABLE); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -1836,53 +1892,135 @@ static grpc_stream_state compute_state(gpr_uint8 write_closed, |
|
|
|
|
return GRPC_STREAM_OPEN; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static int prepare_callbacks(transport *t) { |
|
|
|
|
stream *s; |
|
|
|
|
int n = 0; |
|
|
|
|
while ((s = stream_list_remove_head(t, PENDING_CALLBACKS))) { |
|
|
|
|
int execute = 1; |
|
|
|
|
|
|
|
|
|
s->callback_state = compute_state(s->sent_write_closed, s->read_closed); |
|
|
|
|
if (s->callback_state == GRPC_STREAM_CLOSED) { |
|
|
|
|
remove_from_stream_map(t, s); |
|
|
|
|
if (s->published_close) { |
|
|
|
|
execute = 0; |
|
|
|
|
} else if (s->incoming_metadata_count) { |
|
|
|
|
add_metadata_batch(t, s); |
|
|
|
|
} |
|
|
|
|
s->published_close = 1; |
|
|
|
|
static void patch_metadata_ops(stream *s) { |
|
|
|
|
grpc_stream_op *ops = s->incoming_sopb->ops; |
|
|
|
|
size_t nops = s->incoming_sopb->nops; |
|
|
|
|
size_t i; |
|
|
|
|
size_t j; |
|
|
|
|
size_t mdidx = 0; |
|
|
|
|
size_t last_mdidx; |
|
|
|
|
int found_metadata = 0; |
|
|
|
|
|
|
|
|
|
/* rework the array of metadata into a linked list, making use
|
|
|
|
|
of the breadcrumbs we left in metadata batches during
|
|
|
|
|
add_metadata_batch */ |
|
|
|
|
for (i = 0; i < nops; i++) { |
|
|
|
|
grpc_stream_op *op = &ops[i]; |
|
|
|
|
if (op->type != GRPC_OP_METADATA) continue; |
|
|
|
|
found_metadata = 1; |
|
|
|
|
/* we left a breadcrumb indicating where the end of this list is,
|
|
|
|
|
and since we add sequentially, we know from the end of the last |
|
|
|
|
segment where this segment begins */ |
|
|
|
|
last_mdidx = (size_t)(gpr_intptr)(op->data.metadata.list.tail); |
|
|
|
|
GPR_ASSERT(last_mdidx > mdidx); |
|
|
|
|
GPR_ASSERT(last_mdidx <= s->incoming_metadata_count); |
|
|
|
|
/* turn the array into a doubly linked list */ |
|
|
|
|
op->data.metadata.list.head = &s->incoming_metadata[mdidx]; |
|
|
|
|
op->data.metadata.list.tail = &s->incoming_metadata[last_mdidx - 1]; |
|
|
|
|
for (j = mdidx + 1; j < last_mdidx; j++) { |
|
|
|
|
s->incoming_metadata[j].prev = &s->incoming_metadata[j-1]; |
|
|
|
|
s->incoming_metadata[j-1].next = &s->incoming_metadata[j]; |
|
|
|
|
} |
|
|
|
|
s->incoming_metadata[mdidx].prev = NULL; |
|
|
|
|
s->incoming_metadata[last_mdidx-1].next = NULL; |
|
|
|
|
/* track where we're up to */ |
|
|
|
|
mdidx = last_mdidx; |
|
|
|
|
} |
|
|
|
|
if (found_metadata) { |
|
|
|
|
s->old_incoming_metadata = s->incoming_metadata; |
|
|
|
|
if (mdidx != s->incoming_metadata_count) { |
|
|
|
|
/* we have a partially read metadata batch still in incoming_metadata */ |
|
|
|
|
size_t new_count = s->incoming_metadata_count - mdidx; |
|
|
|
|
size_t copy_bytes = sizeof(*s->incoming_metadata) * new_count; |
|
|
|
|
GPR_ASSERT(mdidx < s->incoming_metadata_count); |
|
|
|
|
s->incoming_metadata = gpr_malloc(copy_bytes); |
|
|
|
|
memcpy(s->old_incoming_metadata + mdidx, s->incoming_metadata, copy_bytes); |
|
|
|
|
s->incoming_metadata_count = s->incoming_metadata_capacity = new_count; |
|
|
|
|
} else { |
|
|
|
|
s->incoming_metadata = NULL; |
|
|
|
|
s->incoming_metadata_count = 0; |
|
|
|
|
s->incoming_metadata_capacity = 0; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
grpc_sopb_swap(&s->parser.incoming_sopb, &s->callback_sopb); |
|
|
|
|
static void finish_reads(transport *t) { |
|
|
|
|
stream *s; |
|
|
|
|
|
|
|
|
|
if (execute) { |
|
|
|
|
stream_list_add_tail(t, s, EXECUTING_CALLBACKS); |
|
|
|
|
n = 1; |
|
|
|
|
while ((s = stream_list_remove_head(t, FINISHED_READ_OP)) != NULL) { |
|
|
|
|
int publish = 0; |
|
|
|
|
GPR_ASSERT(s->incoming_sopb); |
|
|
|
|
*s->publish_state = |
|
|
|
|
compute_state(s->write_state == WRITE_STATE_SENT_CLOSE, s->read_closed); |
|
|
|
|
if (*s->publish_state != s->published_state) { |
|
|
|
|
s->published_state = *s->publish_state; |
|
|
|
|
publish = 1; |
|
|
|
|
if (s->published_state == GRPC_STREAM_CLOSED) { |
|
|
|
|
remove_from_stream_map(t, s); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
if (s->parser.incoming_sopb.nops > 0) { |
|
|
|
|
grpc_sopb_swap(s->incoming_sopb, &s->parser.incoming_sopb); |
|
|
|
|
publish = 1; |
|
|
|
|
} |
|
|
|
|
if (publish) { |
|
|
|
|
if (s->incoming_metadata_count > 0) { |
|
|
|
|
patch_metadata_ops(s); |
|
|
|
|
} |
|
|
|
|
s->incoming_sopb = NULL; |
|
|
|
|
schedule_cb(t, s->recv_done_closure, 1); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
return n; |
|
|
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void schedule_cb(transport *t, op_closure closure, int success) { |
|
|
|
|
if (t->pending_callbacks.capacity == t->pending_callbacks.count) { |
|
|
|
|
t->pending_callbacks.capacity = |
|
|
|
|
GPR_MAX(t->pending_callbacks.capacity * 2, 8); |
|
|
|
|
t->pending_callbacks.callbacks = |
|
|
|
|
gpr_realloc(t->pending_callbacks.callbacks, |
|
|
|
|
t->pending_callbacks.capacity * |
|
|
|
|
sizeof(*t->pending_callbacks.callbacks)); |
|
|
|
|
} |
|
|
|
|
closure.success = success; |
|
|
|
|
t->pending_callbacks.callbacks[t->pending_callbacks.count++] = closure; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static int prepare_callbacks(transport *t) { |
|
|
|
|
op_closure_array temp = t->pending_callbacks; |
|
|
|
|
t->pending_callbacks = t->executing_callbacks; |
|
|
|
|
t->executing_callbacks = temp; |
|
|
|
|
return t->executing_callbacks.count > 0; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void run_callbacks(transport *t, const grpc_transport_callbacks *cb) { |
|
|
|
|
stream *s; |
|
|
|
|
while ((s = stream_list_remove_head(t, EXECUTING_CALLBACKS))) { |
|
|
|
|
size_t nops = s->callback_sopb.nops; |
|
|
|
|
s->callback_sopb.nops = 0; |
|
|
|
|
cb->recv_batch(t->cb_user_data, &t->base, (grpc_stream *)s, |
|
|
|
|
s->callback_sopb.ops, nops, s->callback_state); |
|
|
|
|
size_t i; |
|
|
|
|
for (i = 0; i < t->executing_callbacks.count; i++) { |
|
|
|
|
op_closure c = t->executing_callbacks.callbacks[i]; |
|
|
|
|
c.cb(c.user_data, c.success); |
|
|
|
|
} |
|
|
|
|
t->executing_callbacks.count = 0; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void call_cb_closed(transport *t, const grpc_transport_callbacks *cb) { |
|
|
|
|
cb->closed(t->cb_user_data, &t->base); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void add_to_pollset(grpc_transport *gt, grpc_pollset *pollset) { |
|
|
|
|
transport *t = (transport *)gt; |
|
|
|
|
lock(t); |
|
|
|
|
/*
|
|
|
|
|
* POLLSET STUFF |
|
|
|
|
*/ |
|
|
|
|
|
|
|
|
|
static void add_to_pollset_locked(transport *t, grpc_pollset *pollset) { |
|
|
|
|
if (t->ep) { |
|
|
|
|
grpc_endpoint_add_to_pollset(t->ep, pollset); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void add_to_pollset(grpc_transport *gt, grpc_pollset *pollset) { |
|
|
|
|
transport *t = (transport *)gt; |
|
|
|
|
lock(t); |
|
|
|
|
add_to_pollset_locked(t, pollset); |
|
|
|
|
unlock(t); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -1891,9 +2029,9 @@ static void add_to_pollset(grpc_transport *gt, grpc_pollset *pollset) { |
|
|
|
|
*/ |
|
|
|
|
|
|
|
|
|
static const grpc_transport_vtable vtable = { |
|
|
|
|
sizeof(stream), init_stream, send_batch, set_allow_window_updates, |
|
|
|
|
add_to_pollset, destroy_stream, abort_stream, goaway, close_transport, |
|
|
|
|
send_ping, destroy_transport}; |
|
|
|
|
sizeof(stream), init_stream, perform_op, |
|
|
|
|
add_to_pollset, destroy_stream, goaway, |
|
|
|
|
close_transport, send_ping, destroy_transport}; |
|
|
|
|
|
|
|
|
|
void grpc_create_chttp2_transport(grpc_transport_setup_callback setup, |
|
|
|
|
void *arg, |
|
|
|
|