diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index 95afc0d2e37..a4de59efb1e 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -77,6 +77,7 @@ typedef struct { typedef enum { CALL_CREATED, + CALL_WAITING_FOR_SEND, CALL_WAITING_FOR_CONFIG, CALL_WAITING_FOR_PICK, CALL_WAITING_FOR_CALL, @@ -101,6 +102,9 @@ struct call_data { grpc_linked_mdelem details; }; +static grpc_iomgr_closure *merge_into_waiting_op(grpc_call_element *elem, + grpc_transport_stream_op *new_op) GRPC_MUST_USE_RESULT; + static void handle_op_after_cancellation(grpc_call_element *elem, grpc_transport_stream_op *op) { call_data *calld = elem->call_data; @@ -241,12 +245,13 @@ static void pick_target(grpc_lb_policy *lb_policy, call_data *calld) { &calld->picked_channel, &calld->async_setup_task); } -static void merge_into_waiting_op(grpc_call_element *elem, +static grpc_iomgr_closure *merge_into_waiting_op(grpc_call_element *elem, grpc_transport_stream_op *new_op) { call_data *calld = elem->call_data; + grpc_iomgr_closure *consumed_op = NULL; grpc_transport_stream_op *waiting_op = &calld->waiting_op; - GPR_ASSERT((waiting_op->send_ops == NULL) != (new_op->send_ops == NULL)); - GPR_ASSERT((waiting_op->recv_ops == NULL) != (new_op->recv_ops == NULL)); + GPR_ASSERT((waiting_op->send_ops == NULL) != (new_op->send_ops == NULL) || waiting_op->send_ops == NULL); + GPR_ASSERT((waiting_op->recv_ops == NULL) != (new_op->recv_ops == NULL) || waiting_op->recv_ops == NULL); if (new_op->send_ops != NULL) { waiting_op->send_ops = new_op->send_ops; waiting_op->is_last_send = new_op->is_last_send; @@ -257,13 +262,16 @@ static void merge_into_waiting_op(grpc_call_element *elem, waiting_op->recv_state = new_op->recv_state; waiting_op->on_done_recv = new_op->on_done_recv; } - if (waiting_op->on_consumed == NULL) { + if (new_op->on_consumed != NULL) { + if (waiting_op->on_consumed != NULL) { + consumed_op = waiting_op->on_consumed; + } waiting_op->on_consumed = new_op->on_consumed; - new_op->on_consumed = NULL; } if (new_op->cancel_with_status != GRPC_STATUS_OK) { waiting_op->cancel_with_status = new_op->cancel_with_status; } + return consumed_op; } static void perform_transport_stream_op(grpc_call_element *elem, @@ -274,6 +282,7 @@ static void perform_transport_stream_op(grpc_call_element *elem, grpc_subchannel_call *subchannel_call; grpc_lb_policy *lb_policy; grpc_transport_stream_op op2; + grpc_iomgr_closure *consumed_op = NULL; GPR_ASSERT(elem->filter == &grpc_client_channel_filter); GRPC_CALL_LOG_OP(GPR_INFO, elem, op); @@ -289,6 +298,17 @@ static void perform_transport_stream_op(grpc_call_element *elem, gpr_mu_unlock(&calld->mu_state); handle_op_after_cancellation(elem, op); break; + case CALL_WAITING_FOR_SEND: + GPR_ASSERT(!continuation); + consumed_op = merge_into_waiting_op(elem, op); + if (!calld->waiting_op.send_ops && calld->waiting_op.cancel_with_status == GRPC_STATUS_OK) { + gpr_mu_unlock(&calld->mu_state); + break; + } + *op = calld->waiting_op; + memset(&calld->waiting_op, 0, sizeof(calld->waiting_op)); + continuation = 1; + /* fall through */ case CALL_WAITING_FOR_CONFIG: case CALL_WAITING_FOR_PICK: case CALL_WAITING_FOR_CALL: @@ -308,7 +328,7 @@ static void perform_transport_stream_op(grpc_call_element *elem, handle_op_after_cancellation(elem, op); handle_op_after_cancellation(elem, &op2); } else { - merge_into_waiting_op(elem, op); + consumed_op = merge_into_waiting_op(elem, op); gpr_mu_unlock(&calld->mu_state); if (op->on_consumed != NULL) { op->on_consumed->cb(op->on_consumed->cb_arg, 0); @@ -325,26 +345,37 @@ static void perform_transport_stream_op(grpc_call_element *elem, } else { calld->waiting_op = *op; - gpr_mu_lock(&chand->mu_config); - lb_policy = chand->lb_policy; - if (lb_policy) { - GRPC_LB_POLICY_REF(lb_policy, "pick"); - gpr_mu_unlock(&chand->mu_config); - calld->state = CALL_WAITING_FOR_PICK; + if (op->send_ops == NULL) { + /* need to have some send ops before we can select the + lb target */ + calld->state = CALL_WAITING_FOR_SEND; gpr_mu_unlock(&calld->mu_state); - - pick_target(lb_policy, calld); - - GRPC_LB_POLICY_UNREF(lb_policy, "pick"); } else { - calld->state = CALL_WAITING_FOR_CONFIG; - add_to_lb_policy_wait_queue_locked_state_config(elem); - gpr_mu_unlock(&chand->mu_config); - gpr_mu_unlock(&calld->mu_state); + gpr_mu_lock(&chand->mu_config); + lb_policy = chand->lb_policy; + if (lb_policy) { + GRPC_LB_POLICY_REF(lb_policy, "pick"); + gpr_mu_unlock(&chand->mu_config); + calld->state = CALL_WAITING_FOR_PICK; + gpr_mu_unlock(&calld->mu_state); + + pick_target(lb_policy, calld); + + GRPC_LB_POLICY_UNREF(lb_policy, "pick"); + } else { + calld->state = CALL_WAITING_FOR_CONFIG; + add_to_lb_policy_wait_queue_locked_state_config(elem); + gpr_mu_unlock(&chand->mu_config); + gpr_mu_unlock(&calld->mu_state); + } } } break; } + + if (consumed_op != NULL) { + consumed_op->cb(consumed_op->cb_arg, 1); + } } static void cc_start_transport_stream_op(grpc_call_element *elem, @@ -503,6 +534,7 @@ static void destroy_call_elem(grpc_call_element *elem) { case CALL_WAITING_FOR_PICK: case CALL_WAITING_FOR_CONFIG: case CALL_WAITING_FOR_CALL: + case CALL_WAITING_FOR_SEND: gpr_log(GPR_ERROR, "should never reach here"); abort(); break; diff --git a/src/core/httpcli/httpcli.c b/src/core/httpcli/httpcli.c index 914355a408b..3f5557e08ee 100644 --- a/src/core/httpcli/httpcli.c +++ b/src/core/httpcli/httpcli.c @@ -198,7 +198,7 @@ static void on_connected(void *arg, grpc_endpoint *tcp) { GRPC_SECURITY_OK); grpc_setup_secure_transport(&sc->base, tcp, on_secure_transport_setup_done, req); - grpc_security_connector_unref(&sc->base); + GRPC_SECURITY_CONNECTOR_UNREF(&sc->base, "httpcli"); } else { start_write(req); } diff --git a/src/core/security/client_auth_filter.c b/src/core/security/client_auth_filter.c index 5d04ec49b24..6816fbcfa1e 100644 --- a/src/core/security/client_auth_filter.c +++ b/src/core/security/client_auth_filter.c @@ -297,7 +297,7 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, /* initialize members */ GPR_ASSERT(sc->is_client_side); chand->security_connector = - (grpc_channel_security_connector *)grpc_security_connector_ref(sc); + (grpc_channel_security_connector *)GRPC_SECURITY_CONNECTOR_REF(sc, "client_auth_filter"); chand->md_ctx = metadata_context; chand->authority_string = grpc_mdstr_from_string(chand->md_ctx, ":authority"); chand->path_string = grpc_mdstr_from_string(chand->md_ctx, ":path"); @@ -310,7 +310,7 @@ static void destroy_channel_elem(grpc_channel_element *elem) { /* grab pointers to our data from the channel element */ channel_data *chand = elem->channel_data; grpc_channel_security_connector *ctx = chand->security_connector; - if (ctx != NULL) grpc_security_connector_unref(&ctx->base); + if (ctx != NULL) GRPC_SECURITY_CONNECTOR_UNREF(&ctx->base, "client_auth_filter"); if (chand->authority_string != NULL) { grpc_mdstr_unref(chand->authority_string); } diff --git a/src/core/security/secure_transport_setup.c b/src/core/security/secure_transport_setup.c index 1b39ab141ec..becc23bf7f5 100644 --- a/src/core/security/secure_transport_setup.c +++ b/src/core/security/secure_transport_setup.c @@ -74,7 +74,7 @@ static void secure_transport_setup_done(grpc_secure_transport_setup *s, if (s->handshaker != NULL) tsi_handshaker_destroy(s->handshaker); if (s->handshake_buffer != NULL) gpr_free(s->handshake_buffer); gpr_slice_buffer_destroy(&s->left_overs); - grpc_security_connector_unref(s->connector); + GRPC_SECURITY_CONNECTOR_UNREF(s->connector, "secure_transport_setup"); gpr_free(s); } @@ -275,7 +275,7 @@ void grpc_setup_secure_transport(grpc_security_connector *connector, secure_transport_setup_done(s, 0); return; } - s->connector = grpc_security_connector_ref(connector); + s->connector = GRPC_SECURITY_CONNECTOR_REF(connector, "secure_transport_setup"); s->handshake_buffer_size = GRPC_INITIAL_HANDSHAKE_BUFFER_SIZE; s->handshake_buffer = gpr_malloc(s->handshake_buffer_size); s->endpoint = nonsecure_endpoint; diff --git a/src/core/security/security_connector.c b/src/core/security/security_connector.c index 34cb0395a24..f53e005d5bb 100644 --- a/src/core/security/security_connector.c +++ b/src/core/security/security_connector.c @@ -124,24 +124,42 @@ grpc_security_status grpc_channel_security_connector_check_call_host( return sc->check_call_host(sc, host, cb, user_data); } -void grpc_security_connector_unref(grpc_security_connector *sc) { - if (sc == NULL) return; - if (gpr_unref(&sc->refcount)) sc->vtable->destroy(sc); -} - -grpc_security_connector *grpc_security_connector_ref( - grpc_security_connector *sc) { +#ifdef GRPC_SECURITY_CONNECTOR_REFCOUNT_DEBUG +grpc_security_connector *grpc_security_connector_ref(grpc_security_connector *sc, + const char *file, int line, + const char *reason) { + if (sc == NULL) return NULL; + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, + "SECURITY_CONNECTOR:%p ref %d -> %d %s", sc, (int)sc->refcount.count, + (int)sc->refcount.count + 1, reason); +#else +grpc_security_connector *grpc_security_connector_ref(grpc_security_connector *sc) { if (sc == NULL) return NULL; +#endif gpr_ref(&sc->refcount); return sc; } +#ifdef GRPC_SECURITY_CONNECTOR_REFCOUNT_DEBUG +void grpc_security_connector_unref(grpc_security_connector *sc, const char *file, int line, + const char *reason) { + if (sc == NULL) return; + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, + "SECURITY_CONNECTOR:%p unref %d -> %d %s", sc, (int)sc->refcount.count, + (int)sc->refcount.count - 1, reason); +#else +void grpc_security_connector_unref(grpc_security_connector *sc) { + if (sc == NULL) return; +#endif + if (gpr_unref(&sc->refcount)) sc->vtable->destroy(sc); +} + static void connector_pointer_arg_destroy(void *p) { - grpc_security_connector_unref(p); + GRPC_SECURITY_CONNECTOR_UNREF(p, "connector_pointer_arg"); } static void *connector_pointer_arg_copy(void *p) { - return grpc_security_connector_ref(p); + return GRPC_SECURITY_CONNECTOR_REF(p, "connector_pointer_arg"); } grpc_arg grpc_security_connector_to_arg(grpc_security_connector *sc) { diff --git a/src/core/security/security_connector.h b/src/core/security/security_connector.h index ee3057b43ba..f258b86b28e 100644 --- a/src/core/security/security_connector.h +++ b/src/core/security/security_connector.h @@ -80,12 +80,23 @@ struct grpc_security_connector { grpc_auth_context *auth_context; /* Populated after the peer is checked. */ }; -/* Increments the refcount. */ -grpc_security_connector *grpc_security_connector_ref( - grpc_security_connector *sc); - -/* Decrements the refcount and destroys the object if it reaches 0. */ -void grpc_security_connector_unref(grpc_security_connector *sc); +/* Refcounting. */ +#ifdef GRPC_SECURITY_CONNECTOR_REFCOUNT_DEBUG +#define GRPC_SECURITY_CONNECTOR_REF(p, r) \ + grpc_security_connector_ref((p), __FILE__, __LINE__, (r)) +#define GRPC_SECURITY_CONNECTOR_UNREF(p, r) \ + grpc_security_connector_unref((p), __FILE__, __LINE__, (r)) +grpc_security_connector *grpc_security_connector_ref(grpc_security_connector *policy, + const char *file, int line, + const char *reason); +void grpc_security_connector_unref(grpc_security_connector *policy, const char *file, + int line, const char *reason); +#else +#define GRPC_SECURITY_CONNECTOR_REF(p, r) grpc_security_connector_ref((p)) +#define GRPC_SECURITY_CONNECTOR_UNREF(p, r) grpc_security_connector_unref((p)) +grpc_security_connector *grpc_security_connector_ref(grpc_security_connector *policy); +void grpc_security_connector_unref(grpc_security_connector *policy); +#endif /* Handshake creation. */ grpc_security_status grpc_security_connector_create_handshaker( diff --git a/src/core/security/server_auth_filter.c b/src/core/security/server_auth_filter.c index 5675c064023..51a4b32a669 100644 --- a/src/core/security/server_auth_filter.c +++ b/src/core/security/server_auth_filter.c @@ -107,14 +107,14 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, /* initialize members */ GPR_ASSERT(!sc->is_client_side); - chand->security_connector = grpc_security_connector_ref(sc); + chand->security_connector = GRPC_SECURITY_CONNECTOR_REF(sc, "server_auth_filter"); } /* Destructor for channel data */ static void destroy_channel_elem(grpc_channel_element *elem) { /* grab pointers to our data from the channel element */ channel_data *chand = elem->channel_data; - grpc_security_connector_unref(chand->security_connector); + GRPC_SECURITY_CONNECTOR_UNREF(chand->security_connector, "server_auth_filter"); } const grpc_channel_filter grpc_server_auth_filter = { diff --git a/src/core/security/server_secure_chttp2.c b/src/core/security/server_secure_chttp2.c index 2e49e370f75..6a99324da6b 100644 --- a/src/core/security/server_secure_chttp2.c +++ b/src/core/security/server_secure_chttp2.c @@ -70,7 +70,7 @@ static void state_unref(grpc_server_secure_state *state) { gpr_mu_lock(&state->mu); gpr_mu_unlock(&state->mu); /* clean up */ - grpc_security_connector_unref(state->sc); + GRPC_SECURITY_CONNECTOR_UNREF(state->sc, "server"); gpr_free(state); } } @@ -220,7 +220,7 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr, /* Error path: cleanup and return */ error: if (sc) { - grpc_security_connector_unref(sc); + GRPC_SECURITY_CONNECTOR_UNREF(sc, "server"); } if (resolved) { grpc_resolved_addresses_destroy(resolved); diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c index 1dd9e61d0fc..927c678c673 100644 --- a/src/core/surface/secure_channel_create.c +++ b/src/core/surface/secure_channel_create.c @@ -142,6 +142,7 @@ static void subchannel_factory_ref(grpc_subchannel_factory *scf) { static void subchannel_factory_unref(grpc_subchannel_factory *scf) { subchannel_factory *f = (subchannel_factory *)scf; if (gpr_unref(&f->refs)) { + GRPC_SECURITY_CONNECTOR_UNREF(&f->security_connector->base, "subchannel_factory"); grpc_channel_args_destroy(f->merge_args); grpc_mdctx_unref(f->mdctx); gpr_free(f); @@ -218,6 +219,7 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds, gpr_ref_init(&f->refs, 1); grpc_mdctx_ref(mdctx); f->mdctx = mdctx; + GRPC_SECURITY_CONNECTOR_REF(&connector->base, "subchannel_factory"); f->security_connector = connector; f->merge_args = grpc_channel_args_copy(args_copy); resolver = grpc_resolver_create(target, &f->base); @@ -229,6 +231,8 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds, grpc_client_channel_set_resolver(grpc_channel_get_channel_stack(channel), resolver); GRPC_RESOLVER_UNREF(resolver, "create"); + grpc_subchannel_factory_unref(&f->base); + GRPC_SECURITY_CONNECTOR_UNREF(&connector->base, "channel_create"); grpc_channel_args_destroy(args_copy); if (new_args_from_connector != NULL) {