Rename some things

pull/2149/head
Craig Tiller, 10 years ago
parent 9b8671c739
commit b084d90151
  1. src/core/transport/chttp2/internal.h (54 changed lines)
  2. src/core/transport/chttp2_transport.c (292 changed lines)

src/core/transport/chttp2/internal.h
@@ -3,8 +3,8 @@
#include "src/core/transport/transport_impl.h"
typedef struct transport transport;
typedef struct stream stream;
typedef struct grpc_chttp2_transport grpc_chttp2_transport;
typedef struct grpc_chttp2_stream grpc_chttp2_stream;
/* streams are kept in various linked lists depending on what things need to
happen to them... this enum labels each list */
@@ -31,7 +31,7 @@ typedef enum {
OTHER_CHECK_WINDOW_UPDATES_AFTER_PARSE,
NEW_OUTGOING_WINDOW,
STREAM_LIST_COUNT /* must be last */
} stream_list_id;
} grpc_chttp2_stream_list_id;
/* deframer state for the overall http2 stream of bytes */
typedef enum {
@@ -74,35 +74,35 @@ typedef enum {
DTS_FH_8,
/* inside a http2 frame */
DTS_FRAME
} deframe_transport_state;
} grpc_chttp2_deframe_transport_state;
typedef enum {
WRITE_STATE_OPEN,
WRITE_STATE_QUEUED_CLOSE,
WRITE_STATE_SENT_CLOSE
} write_state;
} grpc_chttp2_write_state;
typedef enum {
DONT_SEND_CLOSED = 0,
SEND_CLOSED,
SEND_CLOSED_WITH_RST_STREAM
} send_closed;
} grpc_chttp2_send_closed;
typedef struct {
stream *head;
stream *tail;
} stream_list;
grpc_chttp2_stream *head;
grpc_chttp2_stream *tail;
} grpc_chttp2_stream_list;
typedef struct {
stream *next;
stream *prev;
} stream_link;
grpc_chttp2_stream *next;
grpc_chttp2_stream *prev;
} grpc_chttp2_stream_link;
typedef enum {
ERROR_STATE_NONE,
ERROR_STATE_SEEN,
ERROR_STATE_NOTIFIED
} error_state;
} grpc_chttp2_error_state;
/* We keep several sets of connection wide parameters */
typedef enum {
@@ -115,21 +115,21 @@ typedef enum {
/* The settings the peer has acked */
ACKED_SETTINGS,
NUM_SETTING_SETS
} setting_set;
} grpc_chttp2_setting_set;
/* Outstanding ping request data */
typedef struct {
gpr_uint8 id[8];
void (*cb)(void *user_data);
void *user_data;
} outstanding_ping;
} grpc_chttp2_outstanding_ping;
typedef struct {
grpc_status_code status;
gpr_slice debug;
} pending_goaway;
} grpc_chttp2_pending_goaway;
struct transport {
struct grpc_chttp2_transport {
grpc_transport base; /* must be first */
grpc_endpoint *ep;
grpc_mdctx *metadata_context;
@@ -145,7 +145,7 @@ struct transport {
gpr_uint8 calling_back_ops;
gpr_uint8 destroying;
gpr_uint8 closed;
error_state error_state;
grpc_chttp2_error_state error_state;
/* stream indexing */
gpr_uint32 next_stream_id;
@@ -164,7 +164,7 @@ struct transport {
gpr_uint32 connection_window_target;
/* deframing */
deframe_transport_state deframe_state;
grpc_chttp2_deframe_transport_state deframe_state;
gpr_uint8 incoming_frame_type;
gpr_uint8 incoming_frame_flags;
gpr_uint8 header_eof;
@@ -173,7 +173,7 @@ struct transport {
gpr_uint32 incoming_stream_id;
/* goaway */
pending_goaway *pending_goaways;
grpc_chttp2_pending_goaway *pending_goaways;
size_t num_pending_goaways;
size_t cap_pending_goaways;
@@ -185,16 +185,16 @@ struct transport {
/* active parser */
void *parser_data;
stream *incoming_stream;
grpc_chttp2_stream *incoming_stream;
grpc_chttp2_parse_error (*parser)(void *parser_user_data,
grpc_chttp2_parse_state *state,
gpr_slice slice, int is_last);
stream_list lists[STREAM_LIST_COUNT];
grpc_chttp2_stream_list lists[STREAM_LIST_COUNT];
grpc_chttp2_stream_map stream_map;
/* pings */
outstanding_ping *pings;
grpc_chttp2_outstanding_ping *pings;
size_t ping_count;
size_t ping_capacity;
gpr_int64 ping_counter;
@@ -252,7 +252,7 @@ struct transport {
} channel_callback;
};
struct stream {
struct grpc_chttp2_stream {
struct {
grpc_iomgr_closure *send_done_closure;
grpc_iomgr_closure *recv_done_closure;
@@ -262,7 +262,7 @@ struct stream {
/* sops that have passed flow control to be written */
grpc_stream_op_buffer sopb;
/* how strongly should we indicate closure with the next write */
send_closed send_closed;
grpc_chttp2_send_closed send_closed;
} writing;
struct {
@@ -277,11 +277,11 @@ struct stream {
/* when the application requests writes be closed, the write_closed is
'queued'; when the close is flow controlled into the send path, we are
'sending' it; when the write has been performed it is 'sent' */
write_state write_state;
grpc_chttp2_write_state write_state;
gpr_uint8 read_closed;
gpr_uint8 cancelled;
stream_link links[STREAM_LIST_COUNT];
grpc_chttp2_stream_link links[STREAM_LIST_COUNT];
gpr_uint8 included[STREAM_LIST_COUNT];
/* incoming metadata */

src/core/transport/chttp2_transport.c
@@ -84,52 +84,52 @@ int grpc_flowctl_trace = 0;
static const grpc_transport_vtable vtable;
static void push_setting(transport *t, grpc_chttp2_setting_id id,
static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id,
gpr_uint32 value);
static void lock(transport *t);
static void unlock(transport *t);
static void lock(grpc_chttp2_transport *t);
static void unlock(grpc_chttp2_transport *t);
static void unlock_check_writes(transport* t);
static void unlock_check_cancellations(transport* t);
static void unlock_check_parser(transport* t);
static void unlock_check_channel_callbacks(transport* t);
static void unlock_check_writes(grpc_chttp2_transport* t);
static void unlock_check_cancellations(grpc_chttp2_transport* t);
static void unlock_check_parser(grpc_chttp2_transport* t);
static void unlock_check_channel_callbacks(grpc_chttp2_transport* t);
static void writing_action(void *t, int iomgr_success_ignored);
static void notify_closed(void *t, int iomgr_success_ignored);
static void drop_connection(transport *t);
static void end_all_the_calls(transport *t);
static void drop_connection(grpc_chttp2_transport *t);
static void end_all_the_calls(grpc_chttp2_transport *t);
static stream *stream_list_remove_head(transport *t, stream_list_id id);
static void stream_list_remove(transport *t, stream *s, stream_list_id id);
static void stream_list_add_tail(transport *t, stream *s, stream_list_id id);
static void stream_list_join(transport *t, stream *s, stream_list_id id);
static grpc_chttp2_stream *stream_list_remove_head(grpc_chttp2_transport *t, grpc_chttp2_stream_list_id id);
static void stream_list_remove(grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_chttp2_stream_list_id id);
static void stream_list_add_tail(grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_chttp2_stream_list_id id);
static void stream_list_join(grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_chttp2_stream_list_id id);
static void cancel_stream_id(transport *t, gpr_uint32 id,
static void cancel_stream_id(grpc_chttp2_transport *t, gpr_uint32 id,
grpc_status_code local_status,
grpc_chttp2_error_code error_code, int send_rst);
static void cancel_stream(transport *t, stream *s,
static void cancel_stream(grpc_chttp2_transport *t, grpc_chttp2_stream *s,
grpc_status_code local_status,
grpc_chttp2_error_code error_code,
grpc_mdstr *optional_message, int send_rst);
static stream *lookup_stream(transport *t, gpr_uint32 id);
static void remove_from_stream_map(transport *t, stream *s);
static void maybe_start_some_streams(transport *t);
static grpc_chttp2_stream *lookup_stream(grpc_chttp2_transport *t, gpr_uint32 id);
static void remove_from_stream_map(grpc_chttp2_transport *t, grpc_chttp2_stream *s);
static void maybe_start_some_streams(grpc_chttp2_transport *t);
static void parsing_become_skip_parser(transport *t);
static void parsing_become_skip_parser(grpc_chttp2_transport *t);
static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
grpc_endpoint_cb_status error);
static void schedule_cb(transport *t, grpc_iomgr_closure *closure, int success);
static void maybe_finish_read(transport *t, stream *s, int is_parser);
static void maybe_join_window_updates(transport *t, stream *s);
static void add_to_pollset_locked(transport *t, grpc_pollset *pollset);
static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op);
static void add_metadata_batch(transport *t, stream *s);
static void schedule_cb(grpc_chttp2_transport *t, grpc_iomgr_closure *closure, int success);
static void maybe_finish_read(grpc_chttp2_transport *t, grpc_chttp2_stream *s, int is_parser);
static void maybe_join_window_updates(grpc_chttp2_transport *t, grpc_chttp2_stream *s);
static void add_to_pollset_locked(grpc_chttp2_transport *t, grpc_pollset *pollset);
static void perform_op_locked(grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_transport_op *op);
static void add_metadata_batch(grpc_chttp2_transport *t, grpc_chttp2_stream *s);
static void flowctl_trace(transport *t, const char *flow, gpr_int32 window,
static void flowctl_trace(grpc_chttp2_transport *t, const char *flow, gpr_int32 window,
gpr_uint32 id, gpr_int32 delta) {
gpr_log(GPR_DEBUG, "HTTP:FLOW:%p:%d:%s: %d + %d = %d", t, id, flow, window,
delta, window + delta);
@@ -139,7 +139,7 @@ static void flowctl_trace(transport *t, const char *flow, gpr_int32 window,
* CONSTRUCTION/DESTRUCTION/REFCOUNTING
*/
static void destruct_transport(transport *t) {
static void destruct_transport(grpc_chttp2_transport *t) {
size_t i;
gpr_mu_lock(&t->mu);
@@ -189,14 +189,14 @@ static void destruct_transport(transport *t) {
gpr_free(t);
}
static void unref_transport(transport *t) {
static void unref_transport(grpc_chttp2_transport *t) {
if (!gpr_unref(&t->refs)) return;
destruct_transport(t);
}
static void ref_transport(transport *t) { gpr_ref(&t->refs); }
static void ref_transport(grpc_chttp2_transport *t) { gpr_ref(&t->refs); }
static void init_transport(transport *t, grpc_transport_setup_callback setup,
static void init_transport(grpc_chttp2_transport *t, grpc_transport_setup_callback setup,
void *arg, const grpc_channel_args *channel_args,
grpc_endpoint *ep, gpr_slice *slices, size_t nslices,
grpc_mdctx *mdctx, int is_client) {
@@ -322,7 +322,7 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup,
}
static void destroy_transport(grpc_transport *gt) {
transport *t = (transport *)gt;
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
lock(t);
t->destroying = 1;
@@ -338,7 +338,7 @@ static void destroy_transport(grpc_transport *gt) {
drop_connection(t);
unlock(t);
/* The drop_connection() above puts the transport into an error state, and
/* The drop_connection() above puts the grpc_chttp2_transport into an error state, and
the follow-up unlock should then (as part of the cleanup work it does)
ensure that cb is NULL, and therefore not call back anything further.
This check validates this very subtle behavior.
@@ -351,7 +351,7 @@ static void destroy_transport(grpc_transport *gt) {
unref_transport(t);
}
static void close_transport_locked(transport *t) {
static void close_transport_locked(grpc_chttp2_transport *t) {
if (!t->closed) {
t->closed = 1;
if (t->ep) {
@@ -361,7 +361,7 @@ static void close_transport_locked(transport *t) {
}
static void close_transport(grpc_transport *gt) {
transport *t = (transport *)gt;
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
gpr_mu_lock(&t->mu);
close_transport_locked(t);
gpr_mu_unlock(&t->mu);
@@ -369,7 +369,7 @@ static void close_transport(grpc_transport *gt) {
static void goaway(grpc_transport *gt, grpc_status_code status,
gpr_slice debug_data) {
transport *t = (transport *)gt;
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
lock(t);
grpc_chttp2_goaway_append(t->last_incoming_stream_id,
grpc_chttp2_grpc_status_to_http2_error(status),
@@ -379,8 +379,8 @@ static void goaway(grpc_transport *gt, grpc_status_code status,
static int init_stream(grpc_transport *gt, grpc_stream *gs,
const void *server_data, grpc_transport_op *initial_op) {
transport *t = (transport *)gt;
stream *s = (stream *)gs;
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
memset(s, 0, sizeof(*s));
@@ -414,14 +414,14 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs,
return 0;
}
static void schedule_nuke_sopb(transport *t, grpc_stream_op_buffer *sopb) {
static void schedule_nuke_sopb(grpc_chttp2_transport *t, grpc_stream_op_buffer *sopb) {
grpc_sopb_append(&t->nuke_later_sopb, sopb->ops, sopb->nops);
sopb->nops = 0;
}
static void destroy_stream(grpc_transport *gt, grpc_stream *gs) {
transport *t = (transport *)gt;
stream *s = (stream *)gs;
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
size_t i;
gpr_mu_lock(&t->mu);
@@ -452,14 +452,14 @@ static void destroy_stream(grpc_transport *gt, grpc_stream *gs) {
* LIST MANAGEMENT
*/
static int stream_list_empty(transport *t, stream_list_id id) {
static int stream_list_empty(grpc_chttp2_transport *t, grpc_chttp2_stream_list_id id) {
return t->lists[id].head == NULL;
}
static stream *stream_list_remove_head(transport *t, stream_list_id id) {
stream *s = t->lists[id].head;
static grpc_chttp2_stream *stream_list_remove_head(grpc_chttp2_transport *t, grpc_chttp2_stream_list_id id) {
grpc_chttp2_stream *s = t->lists[id].head;
if (s) {
stream *new_head = s->links[id].next;
grpc_chttp2_stream *new_head = s->links[id].next;
GPR_ASSERT(s->included[id]);
if (new_head) {
t->lists[id].head = new_head;
@@ -473,7 +473,7 @@ static stream *stream_list_remove_head(transport *t, stream_list_id id) {
return s;
}
static void stream_list_remove(transport *t, stream *s, stream_list_id id) {
static void stream_list_remove(grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_chttp2_stream_list_id id) {
if (!s->included[id]) return;
s->included[id] = 0;
if (s->links[id].prev) {
@@ -489,8 +489,8 @@ static void stream_list_remove(transport *t, stream *s, stream_list_id id) {
}
}
static void stream_list_add_tail(transport *t, stream *s, stream_list_id id) {
stream *old_tail;
static void stream_list_add_tail(grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_chttp2_stream_list_id id) {
grpc_chttp2_stream *old_tail;
GPR_ASSERT(!s->included[id]);
old_tail = t->lists[id].tail;
s->links[id].next = NULL;
@@ -505,16 +505,16 @@ static void stream_list_add_tail(transport *t, stream *s, stream_list_id id) {
s->included[id] = 1;
}
static void stream_list_join(transport *t, stream *s, stream_list_id id) {
static void stream_list_join(grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_chttp2_stream_list_id id) {
if (s->included[id]) {
return;
}
stream_list_add_tail(t, s, id);
}
static void remove_from_stream_map(transport *t, stream *s) {
static void remove_from_stream_map(grpc_chttp2_transport *t, grpc_chttp2_stream *s) {
if (s->id == 0) return;
IF_TRACING(gpr_log(GPR_DEBUG, "HTTP:%s: Removing stream %d",
IF_TRACING(gpr_log(GPR_DEBUG, "HTTP:%s: Removing grpc_chttp2_stream %d",
t->is_client ? "CLI" : "SVR", s->id));
if (grpc_chttp2_stream_map_delete(&t->stream_map, s->id)) {
maybe_start_some_streams(t);
@@ -525,14 +525,14 @@ static void remove_from_stream_map(transport *t, stream *s) {
* LOCK MANAGEMENT
*/
/* We take a transport-global lock in response to calls coming in from above,
/* We take a grpc_chttp2_transport-global lock in response to calls coming in from above,
and in response to data being received from below. New data to be written
is always queued, as are callbacks to process data. During unlock() we
check our todo lists and initiate callbacks and flush writes. */
static void lock(transport *t) { gpr_mu_lock(&t->mu); }
static void lock(grpc_chttp2_transport *t) { gpr_mu_lock(&t->mu); }
static void unlock(transport *t) {
static void unlock(grpc_chttp2_transport *t) {
grpc_iomgr_closure *run_closures;
unlock_check_writes(t);
@@ -556,7 +556,7 @@ static void unlock(transport *t) {
* OUTPUT PROCESSING
*/
static void push_setting(transport *t, grpc_chttp2_setting_id id,
static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id,
gpr_uint32 value) {
const grpc_chttp2_setting_parameters *sp =
&grpc_chttp2_settings_parameters[id];
@@ -571,8 +571,8 @@ static void push_setting(transport *t, grpc_chttp2_setting_id id,
}
}
static void unlock_check_writes(transport *t) {
stream *s;
static void unlock_check_writes(grpc_chttp2_transport *t) {
grpc_chttp2_stream *s;
gpr_uint32 window_delta;
/* don't do anything if we are already writing */
@@ -594,7 +594,7 @@ static void unlock_check_writes(transport *t) {
t->sent_local_settings = 1;
}
/* for each stream that's become writable, frame it's data (according to
/* for each grpc_chttp2_stream that's become writable, frame it's data (according to
available window sizes) and add to the output buffer */
while (t->outgoing_window && (s = stream_list_remove_head(t, WRITABLE)) &&
s->outgoing_window > 0) {
@@ -628,7 +628,7 @@ static void unlock_check_writes(transport *t) {
}
if (!t->parsing.executing) {
/* for each stream that wants to update its window, add that window here */
/* for each grpc_chttp2_stream that wants to update its window, add that window here */
while ((s = stream_list_remove_head(t, WINDOW_UPDATE))) {
window_delta =
t->settings[LOCAL_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] -
@@ -641,7 +641,7 @@ static void unlock_check_writes(transport *t) {
}
}
/* if the transport is ready to send a window update, do so here also */
/* if the grpc_chttp2_transport is ready to send a window update, do so here also */
if (t->incoming_window < t->connection_window_target * 3 / 4) {
window_delta = t->connection_window_target - t->incoming_window;
gpr_slice_buffer_add(&t->writing.outbuf,
@@ -659,8 +659,8 @@ static void unlock_check_writes(transport *t) {
}
}
static void writing_finalize_outbuf(transport *t) {
stream *s;
static void writing_finalize_outbuf(grpc_chttp2_transport *t) {
grpc_chttp2_stream *s;
while ((s = stream_list_remove_head(t, WRITING))) {
grpc_chttp2_encode(s->writing.sopb.ops, s->writing.sopb.nops,
@@ -677,8 +677,8 @@ static void writing_finalize_outbuf(transport *t) {
}
}
static void writing_finish(transport *t, int success) {
stream *s;
static void writing_finish(grpc_chttp2_transport *t, int success) {
grpc_chttp2_stream *s;
lock(t);
if (!success) {
@@ -710,12 +710,12 @@ static void writing_finish(transport *t, int success) {
}
static void writing_finish_write_cb(void *tp, grpc_endpoint_cb_status error) {
transport *t = tp;
grpc_chttp2_transport *t = tp;
writing_finish(t, error == GRPC_ENDPOINT_CB_OK);
}
static void writing_action(void *gt, int iomgr_success_ignored) {
transport *t = gt;
grpc_chttp2_transport *t = gt;
gpr_log(GPR_DEBUG, "writing_action");
@@ -736,12 +736,12 @@ static void writing_action(void *gt, int iomgr_success_ignored) {
}
}
static void add_goaway(transport *t, gpr_uint32 goaway_error,
static void add_goaway(grpc_chttp2_transport *t, gpr_uint32 goaway_error,
gpr_slice goaway_text) {
if (t->num_pending_goaways == t->cap_pending_goaways) {
t->cap_pending_goaways = GPR_MAX(1, t->cap_pending_goaways * 2);
t->pending_goaways = gpr_realloc(
t->pending_goaways, sizeof(pending_goaway) * t->cap_pending_goaways);
t->pending_goaways, sizeof(grpc_chttp2_pending_goaway) * t->cap_pending_goaways);
}
t->pending_goaways[t->num_pending_goaways].status =
grpc_chttp2_http2_error_to_grpc_status(goaway_error);
@@ -749,16 +749,16 @@ static void add_goaway(transport *t, gpr_uint32 goaway_error,
t->num_pending_goaways++;
}
static void maybe_start_some_streams(transport *t) {
/* start streams where we have free stream ids and free concurrency */
static void maybe_start_some_streams(grpc_chttp2_transport *t) {
/* start streams where we have free grpc_chttp2_stream ids and free concurrency */
while (!t->parsing.executing && t->next_stream_id <= MAX_CLIENT_STREAM_ID &&
grpc_chttp2_stream_map_size(&t->stream_map) <
t->settings[PEER_SETTINGS]
[GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS]) {
stream *s = stream_list_remove_head(t, WAITING_FOR_CONCURRENCY);
grpc_chttp2_stream *s = stream_list_remove_head(t, WAITING_FOR_CONCURRENCY);
if (!s) return;
IF_TRACING(gpr_log(GPR_DEBUG, "HTTP:%s: Allocating new stream %p to id %d",
IF_TRACING(gpr_log(GPR_DEBUG, "HTTP:%s: Allocating new grpc_chttp2_stream %p to id %d",
t->is_client ? "CLI" : "SVR", s, t->next_stream_id));
if (t->next_stream_id == MAX_CLIENT_STREAM_ID) {
@@ -779,7 +779,7 @@ static void maybe_start_some_streams(transport *t) {
}
/* cancel out streams that will never be started */
while (t->next_stream_id > MAX_CLIENT_STREAM_ID) {
stream *s = stream_list_remove_head(t, WAITING_FOR_CONCURRENCY);
grpc_chttp2_stream *s = stream_list_remove_head(t, WAITING_FOR_CONCURRENCY);
if (!s) return;
cancel_stream(
@@ -789,7 +789,7 @@ static void maybe_start_some_streams(transport *t) {
}
}
static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op) {
static void perform_op_locked(grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_transport_op *op) {
if (op->cancel_with_status != GRPC_STATUS_OK) {
cancel_stream(
t, s, op->cancel_with_status,
@@ -807,7 +807,7 @@ static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op) {
}
if (s->id == 0) {
IF_TRACING(gpr_log(GPR_DEBUG,
"HTTP:%s: New stream %p waiting for concurrency",
"HTTP:%s: New grpc_chttp2_stream %p waiting for concurrency",
t->is_client ? "CLI" : "SVR", s));
stream_list_join(t, s, WAITING_FOR_CONCURRENCY);
maybe_start_some_streams(t);
@@ -844,8 +844,8 @@ static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op) {
static void perform_op(grpc_transport *gt, grpc_stream *gs,
grpc_transport_op *op) {
transport *t = (transport *)gt;
stream *s = (stream *)gs;
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
lock(t);
perform_op_locked(t, s, op);
@@ -854,14 +854,14 @@ static void perform_op(grpc_transport *gt, grpc_stream *gs,
static void send_ping(grpc_transport *gt, void (*cb)(void *user_data),
void *user_data) {
transport *t = (transport *)gt;
outstanding_ping *p;
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
grpc_chttp2_outstanding_ping *p;
lock(t);
if (t->ping_capacity == t->ping_count) {
t->ping_capacity = GPR_MAX(1, t->ping_capacity * 3 / 2);
t->pings =
gpr_realloc(t->pings, sizeof(outstanding_ping) * t->ping_capacity);
gpr_realloc(t->pings, sizeof(grpc_chttp2_outstanding_ping) * t->ping_capacity);
}
p = &t->pings[t->ping_count++];
p->id[0] = (t->ping_counter >> 56) & 0xff;
@@ -882,8 +882,8 @@ static void send_ping(grpc_transport *gt, void (*cb)(void *user_data),
* INPUT PROCESSING
*/
static void unlock_check_cancellations(transport *t) {
stream *s;
static void unlock_check_cancellations(grpc_chttp2_transport *t) {
grpc_chttp2_stream *s;
if (t->writing.executing) {
return;
@@ -896,7 +896,7 @@ static void unlock_check_cancellations(transport *t) {
}
}
static void add_incoming_metadata(transport *t, stream *s, grpc_mdelem *elem) {
static void add_incoming_metadata(grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_mdelem *elem) {
if (s->incoming_metadata_capacity == s->incoming_metadata_count) {
s->incoming_metadata_capacity =
GPR_MAX(8, 2 * s->incoming_metadata_capacity);
@@ -907,7 +907,7 @@ static void add_incoming_metadata(transport *t, stream *s, grpc_mdelem *elem) {
s->incoming_metadata[s->incoming_metadata_count++].md = elem;
}
static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id,
static void cancel_stream_inner(grpc_chttp2_transport *t, grpc_chttp2_stream *s, gpr_uint32 id,
grpc_status_code local_status,
grpc_chttp2_error_code error_code,
grpc_mdstr *optional_message, int send_rst,
@@ -973,7 +973,7 @@ static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id,
}
}
static void cancel_stream_id(transport *t, gpr_uint32 id,
static void cancel_stream_id(grpc_chttp2_transport *t, gpr_uint32 id,
grpc_status_code local_status,
grpc_chttp2_error_code error_code, int send_rst) {
lock(t);
@@ -982,7 +982,7 @@ static void cancel_stream_id(transport *t, gpr_uint32 id,
unlock(t);
}
static void cancel_stream(transport *t, stream *s,
static void cancel_stream(grpc_chttp2_transport *t, grpc_chttp2_stream *s,
grpc_status_code local_status,
grpc_chttp2_error_code error_code,
grpc_mdstr *optional_message, int send_rst) {
@@ -990,16 +990,16 @@ static void cancel_stream(transport *t, stream *s,
send_rst, 0);
}
static void cancel_stream_cb(void *user_data, gpr_uint32 id, void *stream) {
cancel_stream(user_data, stream, GRPC_STATUS_UNAVAILABLE,
static void cancel_stream_cb(void *user_data, gpr_uint32 id, void *grpc_chttp2_stream) {
cancel_stream(user_data, grpc_chttp2_stream, GRPC_STATUS_UNAVAILABLE,
GRPC_CHTTP2_INTERNAL_ERROR, NULL, 0);
}
static void end_all_the_calls(transport *t) {
static void end_all_the_calls(grpc_chttp2_transport *t) {
grpc_chttp2_stream_map_for_each(&t->stream_map, cancel_stream_cb, t);
}
static void drop_connection(transport *t) {
static void drop_connection(grpc_chttp2_transport *t) {
if (t->error_state == ERROR_STATE_NONE) {
t->error_state = ERROR_STATE_SEEN;
}
@@ -1007,7 +1007,7 @@ static void drop_connection(transport *t) {
end_all_the_calls(t);
}
static void maybe_finish_read(transport *t, stream *s, int is_parser) {
static void maybe_finish_read(grpc_chttp2_transport *t, grpc_chttp2_stream *s, int is_parser) {
if (is_parser) {
stream_list_join(t, s, MAYBE_FINISH_READ_AFTER_PARSE);
} else if (s->incoming_sopb) {
@@ -1015,7 +1015,7 @@ static void maybe_finish_read(transport *t, stream *s, int is_parser) {
}
}
static void maybe_join_window_updates(transport *t, stream *s) {
static void maybe_join_window_updates(grpc_chttp2_transport *t, grpc_chttp2_stream *s) {
if (t->parsing.executing) {
stream_list_join(t, s, OTHER_CHECK_WINDOW_UPDATES_AFTER_PARSE);
return;
@@ -1029,7 +1029,7 @@ static void maybe_join_window_updates(transport *t, stream *s) {
}
}
static grpc_chttp2_parse_error update_incoming_window(transport *t, stream *s) {
static grpc_chttp2_parse_error update_incoming_window(grpc_chttp2_transport *t, grpc_chttp2_stream *s) {
if (t->incoming_frame_size > t->incoming_window) {
gpr_log(GPR_ERROR, "frame of size %d overflows incoming window of %d",
t->incoming_frame_size, t->incoming_window);
@@ -1047,13 +1047,13 @@ static grpc_chttp2_parse_error update_incoming_window(transport *t, stream *s) {
t->incoming_window -= t->incoming_frame_size;
s->incoming_window -= t->incoming_frame_size;
/* if the stream incoming window is getting low, schedule an update */
/* if the grpc_chttp2_stream incoming window is getting low, schedule an update */
stream_list_join(t, s, PARSER_CHECK_WINDOW_UPDATES_AFTER_PARSE);
return GRPC_CHTTP2_PARSE_OK;
}
static stream *lookup_stream(transport *t, gpr_uint32 id) {
static grpc_chttp2_stream *lookup_stream(grpc_chttp2_transport *t, gpr_uint32 id) {
return grpc_chttp2_stream_map_find(&t->stream_map, id);
}
@@ -1065,7 +1065,7 @@ static grpc_chttp2_parse_error skip_parser(void *parser,
static void skip_header(void *tp, grpc_mdelem *md) { grpc_mdelem_unref(md); }
static int parsing_init_skip_frame(transport *t, int is_header) {
static int parsing_init_skip_frame(grpc_chttp2_transport *t, int is_header) {
if (is_header) {
int is_eoh = t->expect_continuation_stream_id != 0;
t->parser = grpc_chttp2_header_parser_parse;
@@ -1080,12 +1080,12 @@ static int parsing_init_skip_frame(transport *t, int is_header) {
return 1;
}
static void parsing_become_skip_parser(transport *t) {
static void parsing_become_skip_parser(grpc_chttp2_transport *t) {
parsing_init_skip_frame(t, t->parser == grpc_chttp2_header_parser_parse);
}
static int parsing_init_data_frame_parser(transport *t) {
stream *s = lookup_stream(t, t->incoming_stream_id);
static int parsing_init_data_frame_parser(grpc_chttp2_transport *t) {
grpc_chttp2_stream *s = lookup_stream(t, t->incoming_stream_id);
grpc_chttp2_parse_error err = GRPC_CHTTP2_PARSE_OK;
if (!s || s->read_closed) return parsing_init_skip_frame(t, 0);
if (err == GRPC_CHTTP2_PARSE_OK) {
@@ -1118,8 +1118,8 @@ static int parsing_init_data_frame_parser(transport *t) {
static void free_timeout(void *p) { gpr_free(p); }
static void parsing_on_header(void *tp, grpc_mdelem *md) {
transport *t = tp;
stream *s = t->incoming_stream;
grpc_chttp2_transport *t = tp;
grpc_chttp2_stream *s = t->incoming_stream;
GPR_ASSERT(s);
@@ -1148,10 +1148,10 @@ static void parsing_on_header(void *tp, grpc_mdelem *md) {
maybe_finish_read(t, s, 1);
}
static int parsing_init_header_frame_parser(transport *t, int is_continuation) {
static int parsing_init_header_frame_parser(grpc_chttp2_transport *t, int is_continuation) {
int is_eoh =
(t->incoming_frame_flags & GRPC_CHTTP2_DATA_FLAG_END_HEADERS) != 0;
stream *s;
grpc_chttp2_stream *s;
if (is_eoh) {
t->expect_continuation_stream_id = 0;
@@ -1164,46 +1164,46 @@ static int parsing_init_header_frame_parser(transport *t, int is_continuation) {
(t->incoming_frame_flags & GRPC_CHTTP2_DATA_FLAG_END_STREAM) != 0;
}
/* could be a new stream or an existing stream */
/* could be a new grpc_chttp2_stream or an existing grpc_chttp2_stream */
s = lookup_stream(t, t->incoming_stream_id);
if (!s) {
if (is_continuation) {
gpr_log(GPR_ERROR, "stream disbanded before CONTINUATION received");
gpr_log(GPR_ERROR, "grpc_chttp2_stream disbanded before CONTINUATION received");
return parsing_init_skip_frame(t, 1);
}
if (t->is_client) {
if ((t->incoming_stream_id & 1) &&
t->incoming_stream_id < t->next_stream_id) {
/* this is an old (probably cancelled) stream */
/* this is an old (probably cancelled) grpc_chttp2_stream */
} else {
gpr_log(GPR_ERROR, "ignoring new stream creation on client");
gpr_log(GPR_ERROR, "ignoring new grpc_chttp2_stream creation on client");
}
return parsing_init_skip_frame(t, 1);
} else if (t->last_incoming_stream_id > t->incoming_stream_id) {
gpr_log(GPR_ERROR,
"ignoring out of order new stream request on server; last stream "
"id=%d, new stream id=%d",
"ignoring out of order new grpc_chttp2_stream request on server; last grpc_chttp2_stream "
"id=%d, new grpc_chttp2_stream id=%d",
t->last_incoming_stream_id, t->incoming_stream_id);
return parsing_init_skip_frame(t, 1);
} else if ((t->incoming_stream_id & 1) == 0) {
gpr_log(GPR_ERROR, "ignoring stream with non-client generated index %d",
gpr_log(GPR_ERROR, "ignoring grpc_chttp2_stream with non-client generated index %d",
t->incoming_stream_id);
return parsing_init_skip_frame(t, 1);
}
t->incoming_stream = NULL;
/* if stream is accepted, we set incoming_stream in init_stream */
/* if grpc_chttp2_stream is accepted, we set incoming_stream in init_stream */
t->channel_callback.cb->accept_stream(t->channel_callback.cb_user_data, &t->base,
(void *)(gpr_uintptr)t->incoming_stream_id);
s = t->incoming_stream;
if (!s) {
gpr_log(GPR_ERROR, "stream not accepted");
gpr_log(GPR_ERROR, "grpc_chttp2_stream not accepted");
return parsing_init_skip_frame(t, 1);
}
} else {
t->incoming_stream = s;
}
if (t->incoming_stream->read_closed) {
gpr_log(GPR_ERROR, "skipping already closed stream header");
gpr_log(GPR_ERROR, "skipping already closed grpc_chttp2_stream header");
t->incoming_stream = NULL;
return parsing_init_skip_frame(t, 1);
}
@@ -1220,7 +1220,7 @@ static int parsing_init_header_frame_parser(transport *t, int is_continuation) {
return 1;
}
static int parsing_init_window_update_frame_parser(transport *t) {
static int parsing_init_window_update_frame_parser(grpc_chttp2_transport *t) {
int ok = GRPC_CHTTP2_PARSE_OK == grpc_chttp2_window_update_parser_begin_frame(
&t->parsing.simple.window_update,
t->incoming_frame_size,
@@ -1233,7 +1233,7 @@ static int parsing_init_window_update_frame_parser(transport *t) {
return ok;
}
static int parsing_init_ping_parser(transport *t) {
static int parsing_init_ping_parser(grpc_chttp2_transport *t) {
int ok = GRPC_CHTTP2_PARSE_OK ==
grpc_chttp2_ping_parser_begin_frame(&t->parsing.simple.ping,
t->incoming_frame_size,
@@ -1246,7 +1246,7 @@ static int parsing_init_ping_parser(transport *t) {
return ok;
}
static int parsing_init_rst_stream_parser(transport *t) {
static int parsing_init_rst_stream_parser(grpc_chttp2_transport *t) {
int ok = GRPC_CHTTP2_PARSE_OK == grpc_chttp2_rst_stream_parser_begin_frame(
&t->parsing.simple.rst_stream,
t->incoming_frame_size,
@@ -1259,7 +1259,7 @@ static int parsing_init_rst_stream_parser(transport *t) {
return ok;
}
static int parsing_init_goaway_parser(transport *t) {
static int parsing_init_goaway_parser(grpc_chttp2_transport *t) {
int ok =
GRPC_CHTTP2_PARSE_OK ==
grpc_chttp2_goaway_parser_begin_frame(
@@ -1272,11 +1272,11 @@ static int parsing_init_goaway_parser(transport *t) {
return ok;
}
static int parsing_init_settings_frame_parser(transport *t) {
static int parsing_init_settings_frame_parser(grpc_chttp2_transport *t) {
int ok;
if (t->incoming_stream_id != 0) {
gpr_log(GPR_ERROR, "settings frame received for stream %d", t->incoming_stream_id);
gpr_log(GPR_ERROR, "settings frame received for grpc_chttp2_stream %d", t->incoming_stream_id);
drop_connection(t);
return 0;
}
@@ -1298,7 +1298,7 @@ static int parsing_init_settings_frame_parser(transport *t) {
return ok;
}
static int init_frame_parser(transport *t) {
static int init_frame_parser(grpc_chttp2_transport *t) {
if (t->expect_continuation_stream_id != 0) {
if (t->incoming_frame_type != GRPC_CHTTP2_FRAME_CONTINUATION) {
gpr_log(GPR_ERROR, "Expected CONTINUATION frame, got frame type %02x",
@@ -1307,7 +1307,7 @@ static int init_frame_parser(transport *t) {
}
if (t->expect_continuation_stream_id != t->incoming_stream_id) {
gpr_log(GPR_ERROR,
"Expected CONTINUATION frame for stream %08x, got stream %08x",
"Expected CONTINUATION frame for grpc_chttp2_stream %08x, got grpc_chttp2_stream %08x",
t->expect_continuation_stream_id, t->incoming_stream_id);
return 0;
}
@@ -1343,7 +1343,7 @@ static int is_window_update_legal(gpr_int64 window_update, gpr_int64 window) {
}
*/
static void add_metadata_batch(transport *t, stream *s) {
static void add_metadata_batch(grpc_chttp2_transport *t, grpc_chttp2_stream *s) {
grpc_metadata_batch b;
b.list.head = NULL;
@@ -1359,7 +1359,7 @@ static void add_metadata_batch(transport *t, stream *s) {
grpc_sopb_add_metadata(&s->parser.incoming_sopb, b);
}
static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) {
static int parse_frame_slice(grpc_chttp2_transport *t, gpr_slice slice, int is_last) {
grpc_chttp2_parse_state st;
size_t i;
memset(&st, 0, sizeof(st));
@@ -1399,7 +1399,7 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) {
memcmp(t->pings[i].id, t->parsing.simple.ping.opaque_8bytes, 8)) {
t->pings[i].cb(t->pings[i].user_data);
memmove(&t->pings[i], &t->pings[i + 1],
(t->ping_count - i - 1) * sizeof(outstanding_ping));
(t->ping_count - i - 1) * sizeof(grpc_chttp2_outstanding_ping));
t->ping_count--;
break;
}
@@ -1407,21 +1407,21 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) {
}
if (st.initial_window_update) {
for (i = 0; i < t->stream_map.count; i++) {
stream *s = (stream *)(t->stream_map.values[i]);
grpc_chttp2_stream *s = (grpc_chttp2_stream *)(t->stream_map.values[i]);
s->outgoing_window_update += st.initial_window_update;
stream_list_join(t, s, NEW_OUTGOING_WINDOW);
}
}
if (st.window_update) {
if (t->incoming_stream_id) {
/* if there was a stream id, this is for some stream */
stream *s = lookup_stream(t, t->incoming_stream_id);
/* if there was a grpc_chttp2_stream id, this is for some grpc_chttp2_stream */
grpc_chttp2_stream *s = lookup_stream(t, t->incoming_stream_id);
if (s) {
s->outgoing_window_update += st.window_update;
stream_list_join(t, s, NEW_OUTGOING_WINDOW);
}
} else {
/* transport level window update */
/* grpc_chttp2_transport level window update */
t->outgoing_window_update += st.window_update;
}
}
@@ -1442,7 +1442,7 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) {
return 0;
}
static int process_read(transport *t, gpr_slice slice) {
static int process_read(grpc_chttp2_transport *t, gpr_slice slice) {
gpr_uint8 *beg = GPR_SLICE_START_PTR(slice);
gpr_uint8 *end = GPR_SLICE_END_PTR(slice);
gpr_uint8 *cur = beg;
@@ -1564,12 +1564,12 @@ static int process_read(transport *t, gpr_slice slice) {
if (!init_frame_parser(t)) {
return 0;
}
/* t->last_incoming_stream_id is used as last-stream-id when
/* t->last_incoming_stream_id is used as last-grpc_chttp2_stream-id when
sending GOAWAY frame.
https://tools.ietf.org/html/draft-ietf-httpbis-http2-17#section-6.8
says that last-stream-id is peer-initiated stream ID. So,
says that last-grpc_chttp2_stream-id is peer-initiated grpc_chttp2_stream ID. So,
since we don't have server pushed streams, client should send
GOAWAY last-stream-id=0 in this case. */
GOAWAY last-grpc_chttp2_stream-id=0 in this case. */
if (!t->is_client) {
t->last_incoming_stream_id = t->incoming_stream_id;
}
@@ -1626,8 +1626,8 @@ static int process_read(transport *t, gpr_slice slice) {
/* tcp read callback */
static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
grpc_endpoint_cb_status error) {
transport *t = tp;
stream *s;
grpc_chttp2_transport *t = tp;
grpc_chttp2_stream *s;
size_t i;
int keep_reading = 0;
@@ -1705,7 +1705,7 @@ static grpc_stream_state compute_state(gpr_uint8 write_closed,
return GRPC_STREAM_OPEN;
}
static void patch_metadata_ops(stream *s) {
static void patch_metadata_ops(grpc_chttp2_stream *s) {
grpc_stream_op *ops = s->incoming_sopb->ops;
size_t nops = s->incoming_sopb->nops;
size_t i;
@@ -1758,8 +1758,8 @@ static void patch_metadata_ops(stream *s) {
}
}
static void unlock_check_parser(transport *t) {
stream *s;
static void unlock_check_parser(grpc_chttp2_transport *t) {
grpc_chttp2_stream *s;
if (t->parsing.executing) {
return;
@@ -1792,8 +1792,8 @@ static void unlock_check_parser(transport *t) {
}
typedef struct {
transport *t;
pending_goaway *goaways;
grpc_chttp2_transport *t;
grpc_chttp2_pending_goaway *goaways;
size_t num_goaways;
grpc_iomgr_closure closure;
} notify_goaways_args;
@@ -1801,7 +1801,7 @@ typedef struct {
static void notify_goaways(void *p, int iomgr_success_ignored) {
size_t i;
notify_goaways_args *a = p;
transport *t = a->t;
grpc_chttp2_transport *t = a->t;
for (i = 0; i < a->num_goaways; i++) {
t->channel_callback.cb->goaway(
@@ -1821,7 +1821,7 @@ static void notify_goaways(void *p, int iomgr_success_ignored) {
unref_transport(t);
}
static void unlock_check_channel_callbacks(transport *t) {
static void unlock_check_channel_callbacks(grpc_chttp2_transport *t) {
if (t->channel_callback.executing) {
return;
}
@@ -1853,7 +1853,7 @@ static void unlock_check_channel_callbacks(transport *t) {
}
static void notify_closed(void *gt, int iomgr_success_ignored) {
transport *t = gt;
grpc_chttp2_transport *t = gt;
t->channel_callback.cb->closed(t->channel_callback.cb_user_data, &t->base);
lock(t);
@@ -1863,7 +1863,7 @@ static void notify_closed(void *gt, int iomgr_success_ignored) {
unref_transport(t);
}
static void schedule_cb(transport *t, grpc_iomgr_closure *closure, int success) {
static void schedule_cb(grpc_chttp2_transport *t, grpc_iomgr_closure *closure, int success) {
closure->success = success;
closure->next = t->global.pending_closures;
t->global.pending_closures = closure;
@@ -1873,14 +1873,14 @@ static void schedule_cb(transport *t, grpc_iomgr_closure *closure, int success)
* POLLSET STUFF
*/
static void add_to_pollset_locked(transport *t, grpc_pollset *pollset) {
static void add_to_pollset_locked(grpc_chttp2_transport *t, grpc_pollset *pollset) {
if (t->ep) {
grpc_endpoint_add_to_pollset(t->ep, pollset);
}
}
static void add_to_pollset(grpc_transport *gt, grpc_pollset *pollset) {
transport *t = (transport *)gt;
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
lock(t);
add_to_pollset_locked(t, pollset);
unlock(t);
@@ -1891,7 +1891,7 @@ static void add_to_pollset(grpc_transport *gt, grpc_pollset *pollset) {
*/
static const grpc_transport_vtable vtable = {
sizeof(stream), init_stream, perform_op,
sizeof(grpc_chttp2_stream), init_stream, perform_op,
add_to_pollset, destroy_stream, goaway,
close_transport, send_ping, destroy_transport};
@@ -1901,7 +1901,7 @@ void grpc_create_chttp2_transport(grpc_transport_setup_callback setup,
grpc_endpoint *ep, gpr_slice *slices,
size_t nslices, grpc_mdctx *mdctx,
int is_client) {
transport *t = gpr_malloc(sizeof(transport));
grpc_chttp2_transport *t = gpr_malloc(sizeof(grpc_chttp2_transport));
init_transport(t, setup, arg, channel_args, ep, slices, nslices, mdctx,
is_client);
}
