diff --git a/src/core/channel/client_channel.h b/src/core/channel/client_channel.h index da02073353f..5ab64b9c46b 100644 --- a/src/core/channel/client_channel.h +++ b/src/core/channel/client_channel.h @@ -52,12 +52,4 @@ extern const grpc_channel_filter grpc_client_channel_filter; void grpc_client_channel_set_resolver(grpc_channel_stack *channel_stack, grpc_resolver *resolver); -/* grpc_transport_setup_callback for binding new transports into a client - channel - user_data should be the channel stack containing the client - channel */ -grpc_transport_setup_result grpc_client_channel_transport_setup_complete( - grpc_channel_stack *channel_stack, grpc_transport *transport, - grpc_channel_filter const **channel_filters, size_t num_channel_filters, - grpc_mdctx *mdctx); - #endif /* GRPC_INTERNAL_CORE_CHANNEL_CLIENT_CHANNEL_H */ diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c index 1d30b073abb..80a3100af05 100644 --- a/src/core/channel/connected_channel.c +++ b/src/core/channel/connected_channel.c @@ -132,13 +132,12 @@ const grpc_channel_filter grpc_connected_channel_filter = { "connected", }; -grpc_transport_setup_result grpc_connected_channel_bind_transport( - grpc_channel_stack *channel_stack, grpc_transport *transport) { +void grpc_connected_channel_bind_transport(grpc_channel_stack *channel_stack, + grpc_transport *transport) { /* Assumes that the connected channel filter is always the last filter in a channel stack */ grpc_channel_element *elem = grpc_channel_stack_last_element(channel_stack); channel_data *cd = (channel_data *)elem->channel_data; - grpc_transport_setup_result ret; GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); GPR_ASSERT(cd->transport == NULL); cd->transport = transport; @@ -150,7 +149,4 @@ grpc_transport_setup_result grpc_connected_channel_bind_transport( the last call element, and the last call element MUST be the connected channel. */ channel_stack->call_stack_size += grpc_transport_stream_size(transport); - - ret.user_data = elem; - return ret; } diff --git a/src/core/channel/connected_channel.h b/src/core/channel/connected_channel.h index 8b35f69b26f..d1e2c195cb7 100644 --- a/src/core/channel/connected_channel.h +++ b/src/core/channel/connected_channel.h @@ -43,7 +43,7 @@ extern const grpc_channel_filter grpc_connected_channel_filter; /* Post construction fixup: set the transport in the connected channel. Must be called before any call stack using this filter is used. */ -grpc_transport_setup_result grpc_connected_channel_bind_transport( - grpc_channel_stack *channel_stack, grpc_transport *transport); +void grpc_connected_channel_bind_transport(grpc_channel_stack *channel_stack, + grpc_transport *transport); #endif /* GRPC_INTERNAL_CORE_CHANNEL_CONNECTED_CHANNEL_H */ diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 568f7925dd5..98e0e81eaae 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -871,10 +871,10 @@ void grpc_server_start(grpc_server *server) { } } -grpc_transport_setup_result grpc_server_setup_transport( - grpc_server *s, grpc_transport *transport, - grpc_channel_filter const **extra_filters, size_t num_extra_filters, - grpc_mdctx *mdctx, const grpc_channel_args *args) { +void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport, + grpc_channel_filter const **extra_filters, + size_t num_extra_filters, grpc_mdctx *mdctx, + const grpc_channel_args *args) { size_t num_filters = s->channel_filter_count + num_extra_filters + 1; grpc_channel_filter const **filters = gpr_malloc(sizeof(grpc_channel_filter *) * num_filters); @@ -892,7 +892,6 @@ grpc_transport_setup_result grpc_server_setup_transport( gpr_uint32 probes; gpr_uint32 max_probes = 0; grpc_transport_op op; - grpc_transport_setup_result result; for (i = 0; i < s->channel_filter_count; i++) { filters[i] = s->channel_filters[i]; @@ -954,8 +953,8 @@ grpc_transport_setup_result grpc_server_setup_transport( chand->registered_method_max_probes = max_probes; } - result = grpc_connected_channel_bind_transport( - grpc_channel_get_channel_stack(channel), transport); + grpc_connected_channel_bind_transport(grpc_channel_get_channel_stack(channel), + transport); gpr_mu_lock(&s->mu_global); chand->next = &s->root_channel_data; @@ -964,8 +963,6 @@ grpc_transport_setup_result grpc_server_setup_transport( gpr_mu_unlock(&s->mu_global); gpr_free(filters); - - return result; } void grpc_server_shutdown_and_notify(grpc_server *server, diff --git a/src/core/surface/server.h b/src/core/surface/server.h index 91a1a2a7f65..2899c6dea38 100644 --- a/src/core/surface/server.h +++ b/src/core/surface/server.h @@ -55,10 +55,10 @@ void grpc_server_listener_destroy_done(void *server); /* Setup a transport - creates a channel stack, binds the transport to the server */ -grpc_transport_setup_result grpc_server_setup_transport( - grpc_server *server, grpc_transport *transport, - grpc_channel_filter const **extra_filters, size_t num_extra_filters, - grpc_mdctx *mdctx, const grpc_channel_args *args); +void grpc_server_setup_transport(grpc_server *server, grpc_transport *transport, + grpc_channel_filter const **extra_filters, + size_t num_extra_filters, grpc_mdctx *mdctx, + const grpc_channel_args *args); const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server); diff --git a/src/core/surface/server_chttp2.c b/src/core/surface/server_chttp2.c index 7e49a531df2..9c02c3ef297 100644 --- a/src/core/surface/server_chttp2.c +++ b/src/core/surface/server_chttp2.c @@ -42,14 +42,13 @@ #include #include -static grpc_transport_setup_result setup_transport(void *server, - grpc_transport *transport, - grpc_mdctx *mdctx) { +static void setup_transport(void *server, grpc_transport *transport, + grpc_mdctx *mdctx) { static grpc_channel_filter const *extra_filters[] = { &grpc_http_server_filter}; - return grpc_server_setup_transport(server, transport, extra_filters, - GPR_ARRAY_SIZE(extra_filters), mdctx, - grpc_server_get_channel_args(server)); + grpc_server_setup_transport(server, transport, extra_filters, + GPR_ARRAY_SIZE(extra_filters), mdctx, + grpc_server_get_channel_args(server)); } static void new_transport(void *server, grpc_endpoint *tcp) { @@ -60,9 +59,10 @@ static void new_transport(void *server, grpc_endpoint *tcp) { * (as in server_secure_chttp2.c) needs to add synchronization to avoid this * case. */ - grpc_create_chttp2_transport(setup_transport, server, - grpc_server_get_channel_args(server), tcp, NULL, - 0, grpc_mdctx_create(), 0); + grpc_mdctx *mdctx = grpc_mdctx_create(); + grpc_transport *transport = grpc_create_chttp2_transport( + grpc_server_get_channel_args(server), tcp, NULL, 0, mdctx, 0); + setup_transport(server, transport, mdctx); } /* Server callback: start listening on our ports */ diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h index 02c94744ee2..93235aef552 100644 --- a/src/core/transport/chttp2/internal.h +++ b/src/core/transport/chttp2/internal.h @@ -134,12 +134,6 @@ typedef struct { grpc_chttp2_stream *prev; } grpc_chttp2_stream_link; -typedef enum { - GRPC_CHTTP2_ERROR_STATE_NONE, - GRPC_CHTTP2_ERROR_STATE_SEEN, - GRPC_CHTTP2_ERROR_STATE_NOTIFIED -} grpc_chttp2_error_state; - /* We keep several sets of connection wide parameters */ typedef enum { /* The settings our peer has asked for (and we have acked) */ @@ -174,6 +168,9 @@ typedef struct { /** how much window would we like to have for incoming_window */ gpr_uint32 connection_window_target; + /** have we seen a goaway */ + gpr_uint8 seen_goaway; + /** is this transport a client? */ gpr_uint8 is_client; /** are the local settings dirty and need to be sent? */ @@ -185,10 +182,6 @@ typedef struct { /** settings values */ gpr_uint32 settings[GRPC_NUM_SETTING_SETS][GRPC_CHTTP2_NUM_SETTINGS]; - /** has there been a connection level error, and have we notified - anyone about it? */ - grpc_chttp2_error_state error_state; - /** what is the next stream id to be allocated by this peer? copied to next_stream_id in parsing when parsing commences */ gpr_uint32 next_stream_id; @@ -204,13 +197,6 @@ typedef struct { /** concurrent stream count: updated when not parsing, so this is a strict over-estimation on the client */ gpr_uint32 concurrent_stream_count; - - /** is there a goaway available? (boolean) */ - grpc_chttp2_error_state goaway_state; - /** what is the debug text of the goaway? */ - gpr_slice goaway_text; - /** what is the status code of the goaway? */ - grpc_status_code goaway_error; } grpc_chttp2_transport_global; typedef struct { @@ -343,14 +329,14 @@ struct grpc_chttp2_transport { grpc_chttp2_stream **accepting_stream; struct { - /** is a thread currently performing channel callbacks */ - gpr_uint8 executing; - /** transport channel-level callback */ - const grpc_transport_callbacks *cb; - /** user data for cb calls */ - void *cb_user_data; - /** closure for notifying transport closure */ - grpc_iomgr_closure notify_closed; + /* accept stream callback */ + void (*accept_stream)(void *user_data, grpc_transport *transport, + const void *server_data); + void *accept_stream_user_data; + + /** connectivity tracking */ + grpc_iomgr_closure *on_connectivity_changed; + grpc_connectivity_state *connectivity; } channel_callback; }; diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 6e61af6f191..11dd60bbb9c 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -87,7 +87,6 @@ static void unlock_check_read_write_state(grpc_chttp2_transport *t); /* forward declarations of various callbacks that we'll build closures around */ static void writing_action(void *t, int iomgr_success_ignored); static void reading_action(void *t, int iomgr_success_ignored); -static void notify_closed(void *t, int iomgr_success_ignored); /** Set a transport level setting, and push it to our peer */ static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id, @@ -101,9 +100,9 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices, static void drop_connection(grpc_chttp2_transport *t); /** Perform a transport_op */ -static void perform_op_locked(grpc_chttp2_transport_global *transport_global, - grpc_chttp2_stream_global *stream_global, - grpc_transport_stream_op *op); +static void perform_stream_op_locked( + grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global, grpc_transport_stream_op *op); /** Cancel a stream: coming from the transport API */ static void cancel_from_api(grpc_chttp2_transport_global *transport_global, @@ -198,13 +197,11 @@ static void ref_transport(grpc_chttp2_transport *t) { gpr_ref(&t->refs); } #endif static void init_transport(grpc_chttp2_transport *t, - grpc_transport_setup_callback setup, void *arg, const grpc_channel_args *channel_args, grpc_endpoint *ep, gpr_slice *slices, size_t nslices, grpc_mdctx *mdctx, int is_client) { size_t i; int j; - grpc_transport_setup_result sr; GPR_ASSERT(strlen(GRPC_CHTTP2_CLIENT_CONNECT_STRING) == GRPC_CHTTP2_CLIENT_CONNECT_STRLEN); @@ -219,7 +216,6 @@ static void init_transport(grpc_chttp2_transport *t, grpc_mdctx_ref(mdctx); t->metadata_context = mdctx; t->endpoint_reading = 1; - t->global.error_state = GRPC_CHTTP2_ERROR_STATE_NONE; t->global.next_stream_id = is_client ? 1 : 2; t->global.is_client = is_client; t->global.outgoing_window = DEFAULT_WINDOW; @@ -245,7 +241,6 @@ static void init_transport(grpc_chttp2_transport *t, grpc_chttp2_goaway_parser_init(&t->parsing.goaway_parser); grpc_chttp2_hpack_parser_init(&t->parsing.hpack_parser, t->metadata_context); - grpc_iomgr_closure_init(&t->channel_callback.notify_closed, notify_closed, t); if (is_client) { gpr_slice_buffer_add( &t->global.qbuf, @@ -312,23 +307,8 @@ static void init_transport(grpc_chttp2_transport *t, } } - gpr_mu_lock(&t->mu); - t->channel_callback.executing = 1; - REF_TRANSPORT(t, "init"); /* matches unref at end of this function */ - gpr_mu_unlock(&t->mu); - - sr = setup(arg, &t->base, t->metadata_context); - - lock(t); - t->channel_callback.cb = sr.callbacks; - t->channel_callback.cb_user_data = sr.user_data; - t->channel_callback.executing = 0; - unlock(t); - REF_TRANSPORT(t, "recv_data"); /* matches unref inside recv_data */ recv_data(t, slices, nslices, GRPC_ENDPOINT_CB_OK); - - UNREF_TRANSPORT(t, "init"); } static void destroy_transport(grpc_transport *gt) { @@ -351,23 +331,6 @@ static void close_transport_locked(grpc_chttp2_transport *t) { } } -static void close_transport(grpc_transport *gt) { - grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; - gpr_mu_lock(&t->mu); - close_transport_locked(t); - gpr_mu_unlock(&t->mu); -} - -static void goaway(grpc_transport *gt, grpc_status_code status, - gpr_slice debug_data) { - grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; - lock(t); - grpc_chttp2_goaway_append(t->global.last_incoming_stream_id, - grpc_chttp2_grpc_status_to_http2_error(status), - debug_data, &t->global.qbuf); - unlock(t); -} - static int init_stream(grpc_transport *gt, grpc_stream *gs, const void *server_data, grpc_transport_stream_op *initial_op) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; @@ -399,7 +362,7 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs, s->global.in_stream_map = 1; } - if (initial_op) perform_op_locked(&t->global, &s->global, initial_op); + if (initial_op) perform_stream_op_locked(&t->global, &s->global, initial_op); unlock(t); return 0; @@ -454,8 +417,8 @@ grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream( grpc_chttp2_transport *t = TRANSPORT_FROM_PARSING(transport_parsing); GPR_ASSERT(t->accepting_stream == NULL); t->accepting_stream = &accepting; - t->channel_callback.cb->accept_stream(t->channel_callback.cb_user_data, - &t->base, (void *)(gpr_uintptr)id); + t->channel_callback.accept_stream(t->channel_callback.accept_stream_user_data, + &t->base, (void *)(gpr_uintptr)id); t->accepting_stream = NULL; return &accepting->parsing; } @@ -476,7 +439,7 @@ static void unlock(grpc_chttp2_transport *t) { grpc_iomgr_closure *run_closures; unlock_check_read_write_state(t); - if (!t->writing_active && t->global.error_state == GRPC_CHTTP2_ERROR_STATE_NONE && + if (!t->writing_active && !t->closed && grpc_chttp2_unlocking_check_writes(&t->global, &t->writing)) { t->writing_active = 1; REF_TRANSPORT(t, "writing"); @@ -553,14 +516,10 @@ void grpc_chttp2_add_incoming_goaway( grpc_chttp2_transport_global *transport_global, gpr_uint32 goaway_error, gpr_slice goaway_text) { char *msg = gpr_hexdump((char*)GPR_SLICE_START_PTR(goaway_text), GPR_SLICE_LENGTH(goaway_text), GPR_HEXDUMP_PLAINTEXT); + gpr_log(GPR_DEBUG, "got goaway [%d]: %s", goaway_error, msg); gpr_free(msg); - if (transport_global->goaway_state == GRPC_CHTTP2_ERROR_STATE_NONE) { - transport_global->goaway_state = GRPC_CHTTP2_ERROR_STATE_SEEN; - transport_global->goaway_text = goaway_text; - transport_global->goaway_error = goaway_error; - } else { - gpr_slice_unref(goaway_text); - } + gpr_slice_unref(goaway_text); + transport_global->seen_goaway = 1; } static void maybe_start_some_streams( @@ -613,9 +572,9 @@ static void maybe_start_some_streams( } } -static void perform_op_locked(grpc_chttp2_transport_global *transport_global, - grpc_chttp2_stream_global *stream_global, - grpc_transport_stream_op *op) { +static void perform_stream_op_locked( + grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global, grpc_transport_stream_op *op) { if (op->cancel_with_status != GRPC_STATUS_OK) { cancel_from_api(transport_global, stream_global, op->cancel_with_status); } @@ -672,21 +631,19 @@ static void perform_op_locked(grpc_chttp2_transport_global *transport_global, } } -static void perform_op(grpc_transport *gt, grpc_stream *gs, - grpc_transport_stream_op *op) { +static void perform_stream_op(grpc_transport *gt, grpc_stream *gs, + grpc_transport_stream_op *op) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs; lock(t); - perform_op_locked(&t->global, &s->global, op); + perform_stream_op_locked(&t->global, &s->global, op); unlock(t); } -static void send_ping(grpc_transport *gt, grpc_iomgr_closure *on_recv) { - grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; +static void send_ping_locked(grpc_chttp2_transport *t, + grpc_iomgr_closure *on_recv) { grpc_chttp2_outstanding_ping *p = gpr_malloc(sizeof(*p)); - - lock(t); p->next = &t->global.pings; p->prev = p->next->prev; p->prev->next = p->next->prev = p; @@ -700,6 +657,49 @@ static void send_ping(grpc_transport *gt, grpc_iomgr_closure *on_recv) { p->id[7] = t->global.ping_counter & 0xff; p->on_recv = on_recv; gpr_slice_buffer_add(&t->global.qbuf, grpc_chttp2_ping_create(0, p->id)); +} + +static void perform_transport_op(grpc_transport *gt, grpc_transport_op *op) { + grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; + + lock(t); + + if (op->on_consumed) { + grpc_chttp2_schedule_closure(&t->global, op->on_consumed, 1); + } + + if (op->on_connectivity_state_change) { + GPR_ASSERT(t->channel_callback.on_connectivity_changed == NULL); + t->channel_callback.on_connectivity_changed = + op->on_connectivity_state_change; + t->channel_callback.connectivity = op->connectivity_state; + } + + if (op->send_goaway) { + grpc_chttp2_goaway_append( + t->global.last_incoming_stream_id, + grpc_chttp2_grpc_status_to_http2_error(op->goaway_status), + *op->goaway_message, &t->global.qbuf); + } + + if (op->set_accept_stream != NULL) { + t->channel_callback.accept_stream = op->set_accept_stream; + t->channel_callback.accept_stream_user_data = + op->set_accept_stream_user_data; + } + + if (op->bind_pollset) { + add_to_pollset_locked(t, op->bind_pollset); + } + + if (op->send_ping) { + send_ping_locked(t, op->send_ping); + } + + if (op->disconnect) { + close_transport_locked(t); + } + unlock(t); } @@ -839,9 +839,6 @@ static void end_all_the_calls(grpc_chttp2_transport *t) { } static void drop_connection(grpc_chttp2_transport *t) { - if (t->global.error_state == GRPC_CHTTP2_ERROR_STATE_NONE) { - t->global.error_state = GRPC_CHTTP2_ERROR_STATE_SEEN; - } close_transport_locked(t); end_all_the_calls(t); } @@ -886,7 +883,7 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices, lock(t); i = 0; GPR_ASSERT(!t->parsing_active); - if (t->global.error_state == GRPC_CHTTP2_ERROR_STATE_NONE) { + if (!t->closed) { t->parsing_active = 1; /* merge stream lists */ grpc_chttp2_stream_map_move_into(&t->new_stream_map, @@ -931,67 +928,21 @@ static void reading_action(void *pt, int iomgr_success_ignored) { * CALLBACK LOOP */ -typedef struct { - grpc_chttp2_transport *t; - gpr_uint32 error; - gpr_slice text; - grpc_iomgr_closure closure; -} notify_goaways_args; - -static void notify_goaways(void *p, int iomgr_success_ignored) { - notify_goaways_args *a = p; - grpc_chttp2_transport *t = a->t; - - t->channel_callback.cb->goaway(t->channel_callback.cb_user_data, &t->base, - a->error, a->text); - - gpr_free(a); - - lock(t); - t->channel_callback.executing = 0; - unlock(t); - - UNREF_TRANSPORT(t, "notify_goaways"); -} - -static void notify_closed(void *gt, int iomgr_success_ignored) { - grpc_chttp2_transport *t = gt; - t->channel_callback.cb->closed(t->channel_callback.cb_user_data, &t->base); - - lock(t); - t->channel_callback.executing = 0; - unlock(t); - - UNREF_TRANSPORT(t, "notify_closed"); -} - static void unlock_check_channel_callbacks(grpc_chttp2_transport *t) { - if (t->channel_callback.executing) { - return; - } - if (t->global.goaway_state != GRPC_CHTTP2_ERROR_STATE_NONE) { - if (t->global.goaway_state == GRPC_CHTTP2_ERROR_STATE_SEEN && - t->global.error_state != GRPC_CHTTP2_ERROR_STATE_NOTIFIED) { - notify_goaways_args *a = gpr_malloc(sizeof(*a)); - a->t = t; - a->error = t->global.goaway_error; - a->text = t->global.goaway_text; - t->global.goaway_state = GRPC_CHTTP2_ERROR_STATE_NOTIFIED; - t->channel_callback.executing = 1; - grpc_iomgr_closure_init(&a->closure, notify_goaways, a); - REF_TRANSPORT(t, "notify_goaways"); - grpc_chttp2_schedule_closure(&t->global, &a->closure, 1); - return; - } else if (t->global.goaway_state != GRPC_CHTTP2_ERROR_STATE_NOTIFIED) { - return; + if (t->channel_callback.on_connectivity_changed != NULL) { + grpc_connectivity_state current; + if (t->closed || t->global.seen_goaway) { + current = GRPC_CHANNEL_FATAL_FAILURE; + } else { + current = GRPC_CHANNEL_READY; + } + if (current != *t->channel_callback.connectivity) { + *t->channel_callback.connectivity = current; + grpc_chttp2_schedule_closure( + &t->global, t->channel_callback.on_connectivity_changed, 1); + t->channel_callback.on_connectivity_changed = NULL; + t->channel_callback.connectivity = NULL; } - } - if (t->global.error_state == GRPC_CHTTP2_ERROR_STATE_SEEN) { - t->global.error_state = GRPC_CHTTP2_ERROR_STATE_NOTIFIED; - t->channel_callback.executing = 1; - REF_TRANSPORT(t, "notify_closed"); - grpc_chttp2_schedule_closure(&t->global, &t->channel_callback.notify_closed, - 1); } } @@ -1014,13 +965,6 @@ static void add_to_pollset_locked(grpc_chttp2_transport *t, } } -static void add_to_pollset(grpc_transport *gt, grpc_pollset *pollset) { - grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; - lock(t); - add_to_pollset_locked(t, pollset); - unlock(t); -} - /* * TRACING */ @@ -1056,19 +1000,14 @@ void grpc_chttp2_flowctl_trace(const char *file, int line, const char *reason, * INTEGRATION GLUE */ -static const grpc_transport_vtable vtable = {sizeof(grpc_chttp2_stream), - init_stream, - perform_op, - destroy_stream, - destroy_transport}; - -void grpc_create_chttp2_transport(grpc_transport_setup_callback setup, - void *arg, - const grpc_channel_args *channel_args, - grpc_endpoint *ep, gpr_slice *slices, - size_t nslices, grpc_mdctx *mdctx, - int is_client) { +static const grpc_transport_vtable vtable = { + sizeof(grpc_chttp2_stream), init_stream, perform_stream_op, + perform_transport_op, destroy_stream, destroy_transport}; + +grpc_transport *grpc_create_chttp2_transport( + const grpc_channel_args *channel_args, grpc_endpoint *ep, gpr_slice *slices, + size_t nslices, grpc_mdctx *mdctx, int is_client) { grpc_chttp2_transport *t = gpr_malloc(sizeof(grpc_chttp2_transport)); - init_transport(t, setup, arg, channel_args, ep, slices, nslices, mdctx, - is_client); + init_transport(t, channel_args, ep, slices, nslices, mdctx, is_client); + return &t->base; } diff --git a/src/core/transport/chttp2_transport.h b/src/core/transport/chttp2_transport.h index 18e19f03afe..1747792b95e 100644 --- a/src/core/transport/chttp2_transport.h +++ b/src/core/transport/chttp2_transport.h @@ -40,11 +40,8 @@ extern int grpc_http_trace; extern int grpc_flowctl_trace; -void grpc_create_chttp2_transport(grpc_transport_setup_callback setup, - void *arg, - const grpc_channel_args *channel_args, - grpc_endpoint *ep, gpr_slice *slices, - size_t nslices, grpc_mdctx *metadata_context, - int is_client); +grpc_transport *grpc_create_chttp2_transport( + const grpc_channel_args *channel_args, grpc_endpoint *ep, gpr_slice *slices, + size_t nslices, grpc_mdctx *metadata_context, int is_client); #endif /* GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_TRANSPORT_H */ diff --git a/src/core/transport/transport.c b/src/core/transport/transport.c index a570cba33e8..c29217599e0 100644 --- a/src/core/transport/transport.c +++ b/src/core/transport/transport.c @@ -64,24 +64,6 @@ void grpc_transport_destroy_stream(grpc_transport *transport, transport->vtable->destroy_stream(transport, stream); } -void grpc_transport_setup_cancel(grpc_transport_setup *setup) { - setup->vtable->cancel(setup); -} - -void grpc_transport_setup_initiate(grpc_transport_setup *setup) { - setup->vtable->initiate(setup); -} - -void grpc_transport_setup_add_interested_party(grpc_transport_setup *setup, - grpc_pollset *pollset) { - setup->vtable->add_interested_party(setup, pollset); -} - -void grpc_transport_setup_del_interested_party(grpc_transport_setup *setup, - grpc_pollset *pollset) { - setup->vtable->del_interested_party(setup, pollset); -} - void grpc_transport_stream_op_finish_with_failure( grpc_transport_stream_op *op) { if (op->send_ops) { diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h index 1acd665a1df..24a02132e95 100644 --- a/src/core/transport/transport.h +++ b/src/core/transport/transport.h @@ -174,51 +174,4 @@ void grpc_transport_close(grpc_transport *transport); /* Destroy the transport */ void grpc_transport_destroy(grpc_transport *transport); -/* Return type for grpc_transport_setup_callback */ -typedef struct grpc_transport_setup_result { - void *user_data; -} grpc_transport_setup_result; - -/* Given a transport, return callbacks for that transport. Used to finalize - setup as a transport is being created */ -typedef grpc_transport_setup_result (*grpc_transport_setup_callback)( - void *setup_arg, grpc_transport *transport, grpc_mdctx *mdctx); - -typedef struct grpc_transport_setup grpc_transport_setup; -typedef struct grpc_transport_setup_vtable grpc_transport_setup_vtable; - -struct grpc_transport_setup_vtable { - void (*initiate)(grpc_transport_setup *setup); - void (*add_interested_party)(grpc_transport_setup *setup, - grpc_pollset *pollset); - void (*del_interested_party)(grpc_transport_setup *setup, - grpc_pollset *pollset); - void (*cancel)(grpc_transport_setup *setup); -}; - -/* Transport setup is an asynchronous utility interface for client channels to - establish connections. It's transport agnostic. */ -struct grpc_transport_setup { - const grpc_transport_setup_vtable *vtable; -}; - -/* Initiate transport setup: e.g. for TCP+DNS trigger a resolve of the name - given at transport construction time, create the tcp connection, perform - handshakes, and call some grpc_transport_setup_result function provided at - setup construction time. - This *may* be implemented as a no-op if the setup process monitors something - continuously. */ -void grpc_transport_setup_initiate(grpc_transport_setup *setup); - -void grpc_transport_setup_add_interested_party(grpc_transport_setup *setup, - grpc_pollset *pollset); -void grpc_transport_setup_del_interested_party(grpc_transport_setup *setup, - grpc_pollset *pollset); - -/* Cancel transport setup. After this returns, no new transports should be - created, and all pending transport setup callbacks should be completed. - After this call completes, setup should be considered invalid (this can be - used as a destruction call by setup). */ -void grpc_transport_setup_cancel(grpc_transport_setup *setup); - #endif /* GRPC_INTERNAL_CORE_TRANSPORT_TRANSPORT_H */ diff --git a/src/core/transport/transport_impl.h b/src/core/transport/transport_impl.h index 82839390502..b18f9570095 100644 --- a/src/core/transport/transport_impl.h +++ b/src/core/transport/transport_impl.h @@ -51,8 +51,7 @@ typedef struct grpc_transport_vtable { grpc_transport_stream_op *op); /* implementation of grpc_transport_perform_op */ - void (*perform_op)(grpc_transport *self, grpc_stream *stream, - grpc_transport_op *op); + void (*perform_op)(grpc_transport *self, grpc_transport_op *op); /* implementation of grpc_transport_destroy_stream */ void (*destroy_stream)(grpc_transport *self, grpc_stream *stream);