Moving chttp2 to new transport interface

pull/2303/head
Craig Tiller 10 years ago
parent 13ec8719ca
commit 1064f8b97e
  1. 8
      src/core/channel/client_channel.h
  2. 8
      src/core/channel/connected_channel.c
  3. 4
      src/core/channel/connected_channel.h
  4. 15
      src/core/surface/server.c
  5. 8
      src/core/surface/server.h
  6. 18
      src/core/surface/server_chttp2.c
  7. 36
      src/core/transport/chttp2/internal.h
  8. 229
      src/core/transport/chttp2_transport.c
  9. 9
      src/core/transport/chttp2_transport.h
  10. 18
      src/core/transport/transport.c
  11. 47
      src/core/transport/transport.h
  12. 3
      src/core/transport/transport_impl.h

@ -52,12 +52,4 @@ extern const grpc_channel_filter grpc_client_channel_filter;
void grpc_client_channel_set_resolver(grpc_channel_stack *channel_stack, void grpc_client_channel_set_resolver(grpc_channel_stack *channel_stack,
grpc_resolver *resolver); grpc_resolver *resolver);
/* grpc_transport_setup_callback for binding new transports into a client
channel - user_data should be the channel stack containing the client
channel */
grpc_transport_setup_result grpc_client_channel_transport_setup_complete(
grpc_channel_stack *channel_stack, grpc_transport *transport,
grpc_channel_filter const **channel_filters, size_t num_channel_filters,
grpc_mdctx *mdctx);
#endif /* GRPC_INTERNAL_CORE_CHANNEL_CLIENT_CHANNEL_H */ #endif /* GRPC_INTERNAL_CORE_CHANNEL_CLIENT_CHANNEL_H */

@ -132,13 +132,12 @@ const grpc_channel_filter grpc_connected_channel_filter = {
"connected", "connected",
}; };
grpc_transport_setup_result grpc_connected_channel_bind_transport( void grpc_connected_channel_bind_transport(grpc_channel_stack *channel_stack,
grpc_channel_stack *channel_stack, grpc_transport *transport) { grpc_transport *transport) {
/* Assumes that the connected channel filter is always the last filter /* Assumes that the connected channel filter is always the last filter
in a channel stack */ in a channel stack */
grpc_channel_element *elem = grpc_channel_stack_last_element(channel_stack); grpc_channel_element *elem = grpc_channel_stack_last_element(channel_stack);
channel_data *cd = (channel_data *)elem->channel_data; channel_data *cd = (channel_data *)elem->channel_data;
grpc_transport_setup_result ret;
GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
GPR_ASSERT(cd->transport == NULL); GPR_ASSERT(cd->transport == NULL);
cd->transport = transport; cd->transport = transport;
@ -150,7 +149,4 @@ grpc_transport_setup_result grpc_connected_channel_bind_transport(
the last call element, and the last call element MUST be the connected the last call element, and the last call element MUST be the connected
channel. */ channel. */
channel_stack->call_stack_size += grpc_transport_stream_size(transport); channel_stack->call_stack_size += grpc_transport_stream_size(transport);
ret.user_data = elem;
return ret;
} }

@ -43,7 +43,7 @@ extern const grpc_channel_filter grpc_connected_channel_filter;
/* Post construction fixup: set the transport in the connected channel. /* Post construction fixup: set the transport in the connected channel.
Must be called before any call stack using this filter is used. */ Must be called before any call stack using this filter is used. */
grpc_transport_setup_result grpc_connected_channel_bind_transport( void grpc_connected_channel_bind_transport(grpc_channel_stack *channel_stack,
grpc_channel_stack *channel_stack, grpc_transport *transport); grpc_transport *transport);
#endif /* GRPC_INTERNAL_CORE_CHANNEL_CONNECTED_CHANNEL_H */ #endif /* GRPC_INTERNAL_CORE_CHANNEL_CONNECTED_CHANNEL_H */

@ -871,10 +871,10 @@ void grpc_server_start(grpc_server *server) {
} }
} }
grpc_transport_setup_result grpc_server_setup_transport( void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport,
grpc_server *s, grpc_transport *transport, grpc_channel_filter const **extra_filters,
grpc_channel_filter const **extra_filters, size_t num_extra_filters, size_t num_extra_filters, grpc_mdctx *mdctx,
grpc_mdctx *mdctx, const grpc_channel_args *args) { const grpc_channel_args *args) {
size_t num_filters = s->channel_filter_count + num_extra_filters + 1; size_t num_filters = s->channel_filter_count + num_extra_filters + 1;
grpc_channel_filter const **filters = grpc_channel_filter const **filters =
gpr_malloc(sizeof(grpc_channel_filter *) * num_filters); gpr_malloc(sizeof(grpc_channel_filter *) * num_filters);
@ -892,7 +892,6 @@ grpc_transport_setup_result grpc_server_setup_transport(
gpr_uint32 probes; gpr_uint32 probes;
gpr_uint32 max_probes = 0; gpr_uint32 max_probes = 0;
grpc_transport_op op; grpc_transport_op op;
grpc_transport_setup_result result;
for (i = 0; i < s->channel_filter_count; i++) { for (i = 0; i < s->channel_filter_count; i++) {
filters[i] = s->channel_filters[i]; filters[i] = s->channel_filters[i];
@ -954,8 +953,8 @@ grpc_transport_setup_result grpc_server_setup_transport(
chand->registered_method_max_probes = max_probes; chand->registered_method_max_probes = max_probes;
} }
result = grpc_connected_channel_bind_transport( grpc_connected_channel_bind_transport(grpc_channel_get_channel_stack(channel),
grpc_channel_get_channel_stack(channel), transport); transport);
gpr_mu_lock(&s->mu_global); gpr_mu_lock(&s->mu_global);
chand->next = &s->root_channel_data; chand->next = &s->root_channel_data;
@ -964,8 +963,6 @@ grpc_transport_setup_result grpc_server_setup_transport(
gpr_mu_unlock(&s->mu_global); gpr_mu_unlock(&s->mu_global);
gpr_free(filters); gpr_free(filters);
return result;
} }
void grpc_server_shutdown_and_notify(grpc_server *server, void grpc_server_shutdown_and_notify(grpc_server *server,

@ -55,10 +55,10 @@ void grpc_server_listener_destroy_done(void *server);
/* Setup a transport - creates a channel stack, binds the transport to the /* Setup a transport - creates a channel stack, binds the transport to the
server */ server */
grpc_transport_setup_result grpc_server_setup_transport( void grpc_server_setup_transport(grpc_server *server, grpc_transport *transport,
grpc_server *server, grpc_transport *transport, grpc_channel_filter const **extra_filters,
grpc_channel_filter const **extra_filters, size_t num_extra_filters, size_t num_extra_filters, grpc_mdctx *mdctx,
grpc_mdctx *mdctx, const grpc_channel_args *args); const grpc_channel_args *args);
const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server); const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server);

@ -42,14 +42,13 @@
#include <grpc/support/log.h> #include <grpc/support/log.h>
#include <grpc/support/useful.h> #include <grpc/support/useful.h>
static grpc_transport_setup_result setup_transport(void *server, static void setup_transport(void *server, grpc_transport *transport,
grpc_transport *transport, grpc_mdctx *mdctx) {
grpc_mdctx *mdctx) {
static grpc_channel_filter const *extra_filters[] = { static grpc_channel_filter const *extra_filters[] = {
&grpc_http_server_filter}; &grpc_http_server_filter};
return grpc_server_setup_transport(server, transport, extra_filters, grpc_server_setup_transport(server, transport, extra_filters,
GPR_ARRAY_SIZE(extra_filters), mdctx, GPR_ARRAY_SIZE(extra_filters), mdctx,
grpc_server_get_channel_args(server)); grpc_server_get_channel_args(server));
} }
static void new_transport(void *server, grpc_endpoint *tcp) { static void new_transport(void *server, grpc_endpoint *tcp) {
@ -60,9 +59,10 @@ static void new_transport(void *server, grpc_endpoint *tcp) {
* (as in server_secure_chttp2.c) needs to add synchronization to avoid this * (as in server_secure_chttp2.c) needs to add synchronization to avoid this
* case. * case.
*/ */
grpc_create_chttp2_transport(setup_transport, server, grpc_mdctx *mdctx = grpc_mdctx_create();
grpc_server_get_channel_args(server), tcp, NULL, grpc_transport *transport = grpc_create_chttp2_transport(
0, grpc_mdctx_create(), 0); grpc_server_get_channel_args(server), tcp, NULL, 0, mdctx, 0);
setup_transport(server, transport, mdctx);
} }
/* Server callback: start listening on our ports */ /* Server callback: start listening on our ports */

@ -134,12 +134,6 @@ typedef struct {
grpc_chttp2_stream *prev; grpc_chttp2_stream *prev;
} grpc_chttp2_stream_link; } grpc_chttp2_stream_link;
typedef enum {
GRPC_CHTTP2_ERROR_STATE_NONE,
GRPC_CHTTP2_ERROR_STATE_SEEN,
GRPC_CHTTP2_ERROR_STATE_NOTIFIED
} grpc_chttp2_error_state;
/* We keep several sets of connection wide parameters */ /* We keep several sets of connection wide parameters */
typedef enum { typedef enum {
/* The settings our peer has asked for (and we have acked) */ /* The settings our peer has asked for (and we have acked) */
@ -174,6 +168,9 @@ typedef struct {
/** how much window would we like to have for incoming_window */ /** how much window would we like to have for incoming_window */
gpr_uint32 connection_window_target; gpr_uint32 connection_window_target;
/** have we seen a goaway */
gpr_uint8 seen_goaway;
/** is this transport a client? */ /** is this transport a client? */
gpr_uint8 is_client; gpr_uint8 is_client;
/** are the local settings dirty and need to be sent? */ /** are the local settings dirty and need to be sent? */
@ -185,10 +182,6 @@ typedef struct {
/** settings values */ /** settings values */
gpr_uint32 settings[GRPC_NUM_SETTING_SETS][GRPC_CHTTP2_NUM_SETTINGS]; gpr_uint32 settings[GRPC_NUM_SETTING_SETS][GRPC_CHTTP2_NUM_SETTINGS];
/** has there been a connection level error, and have we notified
anyone about it? */
grpc_chttp2_error_state error_state;
/** what is the next stream id to be allocated by this peer? /** what is the next stream id to be allocated by this peer?
copied to next_stream_id in parsing when parsing commences */ copied to next_stream_id in parsing when parsing commences */
gpr_uint32 next_stream_id; gpr_uint32 next_stream_id;
@ -204,13 +197,6 @@ typedef struct {
/** concurrent stream count: updated when not parsing, /** concurrent stream count: updated when not parsing,
so this is a strict over-estimation on the client */ so this is a strict over-estimation on the client */
gpr_uint32 concurrent_stream_count; gpr_uint32 concurrent_stream_count;
/** is there a goaway available? (boolean) */
grpc_chttp2_error_state goaway_state;
/** what is the debug text of the goaway? */
gpr_slice goaway_text;
/** what is the status code of the goaway? */
grpc_status_code goaway_error;
} grpc_chttp2_transport_global; } grpc_chttp2_transport_global;
typedef struct { typedef struct {
@ -343,14 +329,14 @@ struct grpc_chttp2_transport {
grpc_chttp2_stream **accepting_stream; grpc_chttp2_stream **accepting_stream;
struct { struct {
/** is a thread currently performing channel callbacks */ /* accept stream callback */
gpr_uint8 executing; void (*accept_stream)(void *user_data, grpc_transport *transport,
/** transport channel-level callback */ const void *server_data);
const grpc_transport_callbacks *cb; void *accept_stream_user_data;
/** user data for cb calls */
void *cb_user_data; /** connectivity tracking */
/** closure for notifying transport closure */ grpc_iomgr_closure *on_connectivity_changed;
grpc_iomgr_closure notify_closed; grpc_connectivity_state *connectivity;
} channel_callback; } channel_callback;
}; };

@ -87,7 +87,6 @@ static void unlock_check_read_write_state(grpc_chttp2_transport *t);
/* forward declarations of various callbacks that we'll build closures around */ /* forward declarations of various callbacks that we'll build closures around */
static void writing_action(void *t, int iomgr_success_ignored); static void writing_action(void *t, int iomgr_success_ignored);
static void reading_action(void *t, int iomgr_success_ignored); static void reading_action(void *t, int iomgr_success_ignored);
static void notify_closed(void *t, int iomgr_success_ignored);
/** Set a transport level setting, and push it to our peer */ /** Set a transport level setting, and push it to our peer */
static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id, static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id,
@ -101,9 +100,9 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
static void drop_connection(grpc_chttp2_transport *t); static void drop_connection(grpc_chttp2_transport *t);
/** Perform a transport_op */ /** Perform a transport_op */
static void perform_op_locked(grpc_chttp2_transport_global *transport_global, static void perform_stream_op_locked(
grpc_chttp2_stream_global *stream_global, grpc_chttp2_transport_global *transport_global,
grpc_transport_stream_op *op); grpc_chttp2_stream_global *stream_global, grpc_transport_stream_op *op);
/** Cancel a stream: coming from the transport API */ /** Cancel a stream: coming from the transport API */
static void cancel_from_api(grpc_chttp2_transport_global *transport_global, static void cancel_from_api(grpc_chttp2_transport_global *transport_global,
@ -198,13 +197,11 @@ static void ref_transport(grpc_chttp2_transport *t) { gpr_ref(&t->refs); }
#endif #endif
static void init_transport(grpc_chttp2_transport *t, static void init_transport(grpc_chttp2_transport *t,
grpc_transport_setup_callback setup, void *arg,
const grpc_channel_args *channel_args, const grpc_channel_args *channel_args,
grpc_endpoint *ep, gpr_slice *slices, size_t nslices, grpc_endpoint *ep, gpr_slice *slices, size_t nslices,
grpc_mdctx *mdctx, int is_client) { grpc_mdctx *mdctx, int is_client) {
size_t i; size_t i;
int j; int j;
grpc_transport_setup_result sr;
GPR_ASSERT(strlen(GRPC_CHTTP2_CLIENT_CONNECT_STRING) == GPR_ASSERT(strlen(GRPC_CHTTP2_CLIENT_CONNECT_STRING) ==
GRPC_CHTTP2_CLIENT_CONNECT_STRLEN); GRPC_CHTTP2_CLIENT_CONNECT_STRLEN);
@ -219,7 +216,6 @@ static void init_transport(grpc_chttp2_transport *t,
grpc_mdctx_ref(mdctx); grpc_mdctx_ref(mdctx);
t->metadata_context = mdctx; t->metadata_context = mdctx;
t->endpoint_reading = 1; t->endpoint_reading = 1;
t->global.error_state = GRPC_CHTTP2_ERROR_STATE_NONE;
t->global.next_stream_id = is_client ? 1 : 2; t->global.next_stream_id = is_client ? 1 : 2;
t->global.is_client = is_client; t->global.is_client = is_client;
t->global.outgoing_window = DEFAULT_WINDOW; t->global.outgoing_window = DEFAULT_WINDOW;
@ -245,7 +241,6 @@ static void init_transport(grpc_chttp2_transport *t,
grpc_chttp2_goaway_parser_init(&t->parsing.goaway_parser); grpc_chttp2_goaway_parser_init(&t->parsing.goaway_parser);
grpc_chttp2_hpack_parser_init(&t->parsing.hpack_parser, t->metadata_context); grpc_chttp2_hpack_parser_init(&t->parsing.hpack_parser, t->metadata_context);
grpc_iomgr_closure_init(&t->channel_callback.notify_closed, notify_closed, t);
if (is_client) { if (is_client) {
gpr_slice_buffer_add( gpr_slice_buffer_add(
&t->global.qbuf, &t->global.qbuf,
@ -312,23 +307,8 @@ static void init_transport(grpc_chttp2_transport *t,
} }
} }
gpr_mu_lock(&t->mu);
t->channel_callback.executing = 1;
REF_TRANSPORT(t, "init"); /* matches unref at end of this function */
gpr_mu_unlock(&t->mu);
sr = setup(arg, &t->base, t->metadata_context);
lock(t);
t->channel_callback.cb = sr.callbacks;
t->channel_callback.cb_user_data = sr.user_data;
t->channel_callback.executing = 0;
unlock(t);
REF_TRANSPORT(t, "recv_data"); /* matches unref inside recv_data */ REF_TRANSPORT(t, "recv_data"); /* matches unref inside recv_data */
recv_data(t, slices, nslices, GRPC_ENDPOINT_CB_OK); recv_data(t, slices, nslices, GRPC_ENDPOINT_CB_OK);
UNREF_TRANSPORT(t, "init");
} }
static void destroy_transport(grpc_transport *gt) { static void destroy_transport(grpc_transport *gt) {
@ -351,23 +331,6 @@ static void close_transport_locked(grpc_chttp2_transport *t) {
} }
} }
static void close_transport(grpc_transport *gt) {
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
gpr_mu_lock(&t->mu);
close_transport_locked(t);
gpr_mu_unlock(&t->mu);
}
static void goaway(grpc_transport *gt, grpc_status_code status,
gpr_slice debug_data) {
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
lock(t);
grpc_chttp2_goaway_append(t->global.last_incoming_stream_id,
grpc_chttp2_grpc_status_to_http2_error(status),
debug_data, &t->global.qbuf);
unlock(t);
}
static int init_stream(grpc_transport *gt, grpc_stream *gs, static int init_stream(grpc_transport *gt, grpc_stream *gs,
const void *server_data, grpc_transport_stream_op *initial_op) { const void *server_data, grpc_transport_stream_op *initial_op) {
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
@ -399,7 +362,7 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs,
s->global.in_stream_map = 1; s->global.in_stream_map = 1;
} }
if (initial_op) perform_op_locked(&t->global, &s->global, initial_op); if (initial_op) perform_stream_op_locked(&t->global, &s->global, initial_op);
unlock(t); unlock(t);
return 0; return 0;
@ -454,8 +417,8 @@ grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream(
grpc_chttp2_transport *t = TRANSPORT_FROM_PARSING(transport_parsing); grpc_chttp2_transport *t = TRANSPORT_FROM_PARSING(transport_parsing);
GPR_ASSERT(t->accepting_stream == NULL); GPR_ASSERT(t->accepting_stream == NULL);
t->accepting_stream = &accepting; t->accepting_stream = &accepting;
t->channel_callback.cb->accept_stream(t->channel_callback.cb_user_data, t->channel_callback.accept_stream(t->channel_callback.accept_stream_user_data,
&t->base, (void *)(gpr_uintptr)id); &t->base, (void *)(gpr_uintptr)id);
t->accepting_stream = NULL; t->accepting_stream = NULL;
return &accepting->parsing; return &accepting->parsing;
} }
@ -476,7 +439,7 @@ static void unlock(grpc_chttp2_transport *t) {
grpc_iomgr_closure *run_closures; grpc_iomgr_closure *run_closures;
unlock_check_read_write_state(t); unlock_check_read_write_state(t);
if (!t->writing_active && t->global.error_state == GRPC_CHTTP2_ERROR_STATE_NONE && if (!t->writing_active && !t->closed &&
grpc_chttp2_unlocking_check_writes(&t->global, &t->writing)) { grpc_chttp2_unlocking_check_writes(&t->global, &t->writing)) {
t->writing_active = 1; t->writing_active = 1;
REF_TRANSPORT(t, "writing"); REF_TRANSPORT(t, "writing");
@ -553,14 +516,10 @@ void grpc_chttp2_add_incoming_goaway(
grpc_chttp2_transport_global *transport_global, gpr_uint32 goaway_error, grpc_chttp2_transport_global *transport_global, gpr_uint32 goaway_error,
gpr_slice goaway_text) { gpr_slice goaway_text) {
char *msg = gpr_hexdump((char*)GPR_SLICE_START_PTR(goaway_text), GPR_SLICE_LENGTH(goaway_text), GPR_HEXDUMP_PLAINTEXT); char *msg = gpr_hexdump((char*)GPR_SLICE_START_PTR(goaway_text), GPR_SLICE_LENGTH(goaway_text), GPR_HEXDUMP_PLAINTEXT);
gpr_log(GPR_DEBUG, "got goaway [%d]: %s", goaway_error, msg);
gpr_free(msg); gpr_free(msg);
if (transport_global->goaway_state == GRPC_CHTTP2_ERROR_STATE_NONE) { gpr_slice_unref(goaway_text);
transport_global->goaway_state = GRPC_CHTTP2_ERROR_STATE_SEEN; transport_global->seen_goaway = 1;
transport_global->goaway_text = goaway_text;
transport_global->goaway_error = goaway_error;
} else {
gpr_slice_unref(goaway_text);
}
} }
static void maybe_start_some_streams( static void maybe_start_some_streams(
@ -613,9 +572,9 @@ static void maybe_start_some_streams(
} }
} }
static void perform_op_locked(grpc_chttp2_transport_global *transport_global, static void perform_stream_op_locked(
grpc_chttp2_stream_global *stream_global, grpc_chttp2_transport_global *transport_global,
grpc_transport_stream_op *op) { grpc_chttp2_stream_global *stream_global, grpc_transport_stream_op *op) {
if (op->cancel_with_status != GRPC_STATUS_OK) { if (op->cancel_with_status != GRPC_STATUS_OK) {
cancel_from_api(transport_global, stream_global, op->cancel_with_status); cancel_from_api(transport_global, stream_global, op->cancel_with_status);
} }
@ -672,21 +631,19 @@ static void perform_op_locked(grpc_chttp2_transport_global *transport_global,
} }
} }
static void perform_op(grpc_transport *gt, grpc_stream *gs, static void perform_stream_op(grpc_transport *gt, grpc_stream *gs,
grpc_transport_stream_op *op) { grpc_transport_stream_op *op) {
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs; grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
lock(t); lock(t);
perform_op_locked(&t->global, &s->global, op); perform_stream_op_locked(&t->global, &s->global, op);
unlock(t); unlock(t);
} }
static void send_ping(grpc_transport *gt, grpc_iomgr_closure *on_recv) { static void send_ping_locked(grpc_chttp2_transport *t,
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; grpc_iomgr_closure *on_recv) {
grpc_chttp2_outstanding_ping *p = gpr_malloc(sizeof(*p)); grpc_chttp2_outstanding_ping *p = gpr_malloc(sizeof(*p));
lock(t);
p->next = &t->global.pings; p->next = &t->global.pings;
p->prev = p->next->prev; p->prev = p->next->prev;
p->prev->next = p->next->prev = p; p->prev->next = p->next->prev = p;
@ -700,6 +657,49 @@ static void send_ping(grpc_transport *gt, grpc_iomgr_closure *on_recv) {
p->id[7] = t->global.ping_counter & 0xff; p->id[7] = t->global.ping_counter & 0xff;
p->on_recv = on_recv; p->on_recv = on_recv;
gpr_slice_buffer_add(&t->global.qbuf, grpc_chttp2_ping_create(0, p->id)); gpr_slice_buffer_add(&t->global.qbuf, grpc_chttp2_ping_create(0, p->id));
}
static void perform_transport_op(grpc_transport *gt, grpc_transport_op *op) {
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
lock(t);
if (op->on_consumed) {
grpc_chttp2_schedule_closure(&t->global, op->on_consumed, 1);
}
if (op->on_connectivity_state_change) {
GPR_ASSERT(t->channel_callback.on_connectivity_changed == NULL);
t->channel_callback.on_connectivity_changed =
op->on_connectivity_state_change;
t->channel_callback.connectivity = op->connectivity_state;
}
if (op->send_goaway) {
grpc_chttp2_goaway_append(
t->global.last_incoming_stream_id,
grpc_chttp2_grpc_status_to_http2_error(op->goaway_status),
*op->goaway_message, &t->global.qbuf);
}
if (op->set_accept_stream != NULL) {
t->channel_callback.accept_stream = op->set_accept_stream;
t->channel_callback.accept_stream_user_data =
op->set_accept_stream_user_data;
}
if (op->bind_pollset) {
add_to_pollset_locked(t, op->bind_pollset);
}
if (op->send_ping) {
send_ping_locked(t, op->send_ping);
}
if (op->disconnect) {
close_transport_locked(t);
}
unlock(t); unlock(t);
} }
@ -839,9 +839,6 @@ static void end_all_the_calls(grpc_chttp2_transport *t) {
} }
static void drop_connection(grpc_chttp2_transport *t) { static void drop_connection(grpc_chttp2_transport *t) {
if (t->global.error_state == GRPC_CHTTP2_ERROR_STATE_NONE) {
t->global.error_state = GRPC_CHTTP2_ERROR_STATE_SEEN;
}
close_transport_locked(t); close_transport_locked(t);
end_all_the_calls(t); end_all_the_calls(t);
} }
@ -886,7 +883,7 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
lock(t); lock(t);
i = 0; i = 0;
GPR_ASSERT(!t->parsing_active); GPR_ASSERT(!t->parsing_active);
if (t->global.error_state == GRPC_CHTTP2_ERROR_STATE_NONE) { if (!t->closed) {
t->parsing_active = 1; t->parsing_active = 1;
/* merge stream lists */ /* merge stream lists */
grpc_chttp2_stream_map_move_into(&t->new_stream_map, grpc_chttp2_stream_map_move_into(&t->new_stream_map,
@ -931,67 +928,21 @@ static void reading_action(void *pt, int iomgr_success_ignored) {
* CALLBACK LOOP * CALLBACK LOOP
*/ */
typedef struct {
grpc_chttp2_transport *t;
gpr_uint32 error;
gpr_slice text;
grpc_iomgr_closure closure;
} notify_goaways_args;
static void notify_goaways(void *p, int iomgr_success_ignored) {
notify_goaways_args *a = p;
grpc_chttp2_transport *t = a->t;
t->channel_callback.cb->goaway(t->channel_callback.cb_user_data, &t->base,
a->error, a->text);
gpr_free(a);
lock(t);
t->channel_callback.executing = 0;
unlock(t);
UNREF_TRANSPORT(t, "notify_goaways");
}
static void notify_closed(void *gt, int iomgr_success_ignored) {
grpc_chttp2_transport *t = gt;
t->channel_callback.cb->closed(t->channel_callback.cb_user_data, &t->base);
lock(t);
t->channel_callback.executing = 0;
unlock(t);
UNREF_TRANSPORT(t, "notify_closed");
}
static void unlock_check_channel_callbacks(grpc_chttp2_transport *t) { static void unlock_check_channel_callbacks(grpc_chttp2_transport *t) {
if (t->channel_callback.executing) { if (t->channel_callback.on_connectivity_changed != NULL) {
return; grpc_connectivity_state current;
} if (t->closed || t->global.seen_goaway) {
if (t->global.goaway_state != GRPC_CHTTP2_ERROR_STATE_NONE) { current = GRPC_CHANNEL_FATAL_FAILURE;
if (t->global.goaway_state == GRPC_CHTTP2_ERROR_STATE_SEEN && } else {
t->global.error_state != GRPC_CHTTP2_ERROR_STATE_NOTIFIED) { current = GRPC_CHANNEL_READY;
notify_goaways_args *a = gpr_malloc(sizeof(*a)); }
a->t = t; if (current != *t->channel_callback.connectivity) {
a->error = t->global.goaway_error; *t->channel_callback.connectivity = current;
a->text = t->global.goaway_text; grpc_chttp2_schedule_closure(
t->global.goaway_state = GRPC_CHTTP2_ERROR_STATE_NOTIFIED; &t->global, t->channel_callback.on_connectivity_changed, 1);
t->channel_callback.executing = 1; t->channel_callback.on_connectivity_changed = NULL;
grpc_iomgr_closure_init(&a->closure, notify_goaways, a); t->channel_callback.connectivity = NULL;
REF_TRANSPORT(t, "notify_goaways");
grpc_chttp2_schedule_closure(&t->global, &a->closure, 1);
return;
} else if (t->global.goaway_state != GRPC_CHTTP2_ERROR_STATE_NOTIFIED) {
return;
} }
}
if (t->global.error_state == GRPC_CHTTP2_ERROR_STATE_SEEN) {
t->global.error_state = GRPC_CHTTP2_ERROR_STATE_NOTIFIED;
t->channel_callback.executing = 1;
REF_TRANSPORT(t, "notify_closed");
grpc_chttp2_schedule_closure(&t->global, &t->channel_callback.notify_closed,
1);
} }
} }
@ -1014,13 +965,6 @@ static void add_to_pollset_locked(grpc_chttp2_transport *t,
} }
} }
static void add_to_pollset(grpc_transport *gt, grpc_pollset *pollset) {
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
lock(t);
add_to_pollset_locked(t, pollset);
unlock(t);
}
/* /*
* TRACING * TRACING
*/ */
@ -1056,19 +1000,14 @@ void grpc_chttp2_flowctl_trace(const char *file, int line, const char *reason,
* INTEGRATION GLUE * INTEGRATION GLUE
*/ */
static const grpc_transport_vtable vtable = {sizeof(grpc_chttp2_stream), static const grpc_transport_vtable vtable = {
init_stream, sizeof(grpc_chttp2_stream), init_stream, perform_stream_op,
perform_op, perform_transport_op, destroy_stream, destroy_transport};
destroy_stream,
destroy_transport}; grpc_transport *grpc_create_chttp2_transport(
const grpc_channel_args *channel_args, grpc_endpoint *ep, gpr_slice *slices,
void grpc_create_chttp2_transport(grpc_transport_setup_callback setup, size_t nslices, grpc_mdctx *mdctx, int is_client) {
void *arg,
const grpc_channel_args *channel_args,
grpc_endpoint *ep, gpr_slice *slices,
size_t nslices, grpc_mdctx *mdctx,
int is_client) {
grpc_chttp2_transport *t = gpr_malloc(sizeof(grpc_chttp2_transport)); grpc_chttp2_transport *t = gpr_malloc(sizeof(grpc_chttp2_transport));
init_transport(t, setup, arg, channel_args, ep, slices, nslices, mdctx, init_transport(t, channel_args, ep, slices, nslices, mdctx, is_client);
is_client); return &t->base;
} }

@ -40,11 +40,8 @@
extern int grpc_http_trace; extern int grpc_http_trace;
extern int grpc_flowctl_trace; extern int grpc_flowctl_trace;
void grpc_create_chttp2_transport(grpc_transport_setup_callback setup, grpc_transport *grpc_create_chttp2_transport(
void *arg, const grpc_channel_args *channel_args, grpc_endpoint *ep, gpr_slice *slices,
const grpc_channel_args *channel_args, size_t nslices, grpc_mdctx *metadata_context, int is_client);
grpc_endpoint *ep, gpr_slice *slices,
size_t nslices, grpc_mdctx *metadata_context,
int is_client);
#endif /* GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_TRANSPORT_H */ #endif /* GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_TRANSPORT_H */

@ -64,24 +64,6 @@ void grpc_transport_destroy_stream(grpc_transport *transport,
transport->vtable->destroy_stream(transport, stream); transport->vtable->destroy_stream(transport, stream);
} }
void grpc_transport_setup_cancel(grpc_transport_setup *setup) {
setup->vtable->cancel(setup);
}
void grpc_transport_setup_initiate(grpc_transport_setup *setup) {
setup->vtable->initiate(setup);
}
void grpc_transport_setup_add_interested_party(grpc_transport_setup *setup,
grpc_pollset *pollset) {
setup->vtable->add_interested_party(setup, pollset);
}
void grpc_transport_setup_del_interested_party(grpc_transport_setup *setup,
grpc_pollset *pollset) {
setup->vtable->del_interested_party(setup, pollset);
}
void grpc_transport_stream_op_finish_with_failure( void grpc_transport_stream_op_finish_with_failure(
grpc_transport_stream_op *op) { grpc_transport_stream_op *op) {
if (op->send_ops) { if (op->send_ops) {

@ -174,51 +174,4 @@ void grpc_transport_close(grpc_transport *transport);
/* Destroy the transport */ /* Destroy the transport */
void grpc_transport_destroy(grpc_transport *transport); void grpc_transport_destroy(grpc_transport *transport);
/* Return type for grpc_transport_setup_callback */
typedef struct grpc_transport_setup_result {
void *user_data;
} grpc_transport_setup_result;
/* Given a transport, return callbacks for that transport. Used to finalize
setup as a transport is being created */
typedef grpc_transport_setup_result (*grpc_transport_setup_callback)(
void *setup_arg, grpc_transport *transport, grpc_mdctx *mdctx);
typedef struct grpc_transport_setup grpc_transport_setup;
typedef struct grpc_transport_setup_vtable grpc_transport_setup_vtable;
struct grpc_transport_setup_vtable {
void (*initiate)(grpc_transport_setup *setup);
void (*add_interested_party)(grpc_transport_setup *setup,
grpc_pollset *pollset);
void (*del_interested_party)(grpc_transport_setup *setup,
grpc_pollset *pollset);
void (*cancel)(grpc_transport_setup *setup);
};
/* Transport setup is an asynchronous utility interface for client channels to
establish connections. It's transport agnostic. */
struct grpc_transport_setup {
const grpc_transport_setup_vtable *vtable;
};
/* Initiate transport setup: e.g. for TCP+DNS trigger a resolve of the name
given at transport construction time, create the tcp connection, perform
handshakes, and call some grpc_transport_setup_result function provided at
setup construction time.
This *may* be implemented as a no-op if the setup process monitors something
continuously. */
void grpc_transport_setup_initiate(grpc_transport_setup *setup);
void grpc_transport_setup_add_interested_party(grpc_transport_setup *setup,
grpc_pollset *pollset);
void grpc_transport_setup_del_interested_party(grpc_transport_setup *setup,
grpc_pollset *pollset);
/* Cancel transport setup. After this returns, no new transports should be
created, and all pending transport setup callbacks should be completed.
After this call completes, setup should be considered invalid (this can be
used as a destruction call by setup). */
void grpc_transport_setup_cancel(grpc_transport_setup *setup);
#endif /* GRPC_INTERNAL_CORE_TRANSPORT_TRANSPORT_H */ #endif /* GRPC_INTERNAL_CORE_TRANSPORT_TRANSPORT_H */

@ -51,8 +51,7 @@ typedef struct grpc_transport_vtable {
grpc_transport_stream_op *op); grpc_transport_stream_op *op);
/* implementation of grpc_transport_perform_op */ /* implementation of grpc_transport_perform_op */
void (*perform_op)(grpc_transport *self, grpc_stream *stream, void (*perform_op)(grpc_transport *self, grpc_transport_op *op);
grpc_transport_op *op);
/* implementation of grpc_transport_destroy_stream */ /* implementation of grpc_transport_destroy_stream */
void (*destroy_stream)(grpc_transport *self, grpc_stream *stream); void (*destroy_stream)(grpc_transport *self, grpc_stream *stream);

Loading…
Cancel
Save