Conversion progress

reviewable/pr9949/r1
Craig Tiller 8 years ago
parent 2e34bd7ca3
commit c55c102ed4
  1. 43
      src/core/ext/client_channel/client_channel.c
  2. 9
      src/core/ext/load_reporting/load_reporting_filter.c
  3. 6
      src/core/ext/transport/chttp2/transport/chttp2_transport.c
  4. 6
      src/core/lib/transport/transport.h

@ -449,7 +449,7 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
static void start_transport_op_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error_ignored) {
grpc_transport_op *op = arg;
grpc_channel_element *elem = op->transport_private.extra_arg;
grpc_channel_element *elem = op->handler_private.extra_arg;
channel_data *chand = elem->channel_data;
if (op->on_connectivity_state_change != NULL) {
@ -510,12 +510,12 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
op->bind_pollset);
}
op->transport_private.args[0] = elem;
op->handler_private.extra_arg = elem;
GRPC_CHANNEL_STACK_REF(chand->owning_stack, "start_transport_op");
grpc_closure_sched(
exec_ctx, grpc_closure_init(
&op->transport_private.closure, start_transport_op_locked,
op, grpc_combiner_scheduler(chand->combiner, false)),
exec_ctx,
grpc_closure_init(&op->handler_private.closure, start_transport_op_locked,
op, grpc_combiner_scheduler(chand->combiner, false)),
GRPC_ERROR_NONE);
}
@ -926,7 +926,7 @@ static void start_transport_stream_op_locked_inner(grpc_exec_ctx *exec_ctx,
return;
}
/* if this is a cancellation, then we can raise our cancelled flag */
if (op->cancel_error != GRPC_ERROR_NONE) {
if (op->cancel_stream) {
if (!gpr_atm_rel_cas(&calld->subchannel_call, 0,
(gpr_atm)(uintptr_t)CANCELLED_CALL)) {
/* recurse to retry */
@ -939,27 +939,29 @@ static void start_transport_stream_op_locked_inner(grpc_exec_ctx *exec_ctx,
cancelled before any ops are passed down (e.g., if the deadline
is in the past when the call starts), we can return the right
error to the caller when the first op does get passed down. */
calld->cancel_error = GRPC_ERROR_REF(op->cancel_error);
calld->cancel_error =
GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error);
switch (calld->creation_phase) {
case GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING:
fail_locked(exec_ctx, calld, GRPC_ERROR_REF(op->cancel_error));
fail_locked(exec_ctx, calld,
GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error));
break;
case GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL:
pick_subchannel_locked(exec_ctx, elem, NULL, 0,
&calld->connected_subchannel, NULL,
GRPC_ERROR_REF(op->cancel_error));
pick_subchannel_locked(
exec_ctx, elem, NULL, 0, &calld->connected_subchannel, NULL,
GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error));
break;
}
grpc_transport_stream_op_finish_with_failure(
exec_ctx, op, GRPC_ERROR_REF(op->cancel_error));
exec_ctx, op,
GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error));
/* early out */
return;
}
}
/* if we don't have a subchannel, try to get one */
if (calld->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING &&
calld->connected_subchannel == NULL &&
op->send_initial_metadata != NULL) {
calld->connected_subchannel == NULL && op->send_initial_metadata) {
calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL;
grpc_closure_init(&calld->next_step, subchannel_ready_locked, elem,
grpc_combiner_scheduler(chand->combiner, true));
@ -967,10 +969,11 @@ static void start_transport_stream_op_locked_inner(grpc_exec_ctx *exec_ctx,
/* If a subchannel is not available immediately, the polling entity from
call_data should be provided to channel_data's interested_parties, so
that IO of the lb_policy and resolver could be done under it. */
if (pick_subchannel_locked(exec_ctx, elem, op->send_initial_metadata,
op->send_initial_metadata_flags,
&calld->connected_subchannel, &calld->next_step,
GRPC_ERROR_NONE)) {
if (pick_subchannel_locked(
exec_ctx, elem,
op->payload->send_initial_metadata.send_initial_metadata,
op->payload->send_initial_metadata.send_initial_metadata_flags,
&calld->connected_subchannel, &calld->next_step, GRPC_ERROR_NONE)) {
calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel");
} else {
@ -1007,7 +1010,7 @@ static void start_transport_stream_op_locked(grpc_exec_ctx *exec_ctx, void *arg,
GPR_TIMER_BEGIN("start_transport_stream_op_locked", 0);
grpc_transport_stream_op *op = arg;
grpc_call_element *elem = op->handler_private.args[0];
grpc_call_element *elem = op->handler_private.extra_arg;
call_data *calld = elem->call_data;
start_transport_stream_op_locked_inner(exec_ctx, op, elem);
@ -1050,7 +1053,7 @@ static void cc_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
}
/* we failed; lock and figure out what to do */
GRPC_CALL_STACK_REF(calld->owning_call, "start_transport_stream_op");
op->handler_private.args[0] = elem;
op->handler_private.extra_arg = elem;
grpc_closure_sched(
exec_ctx,
grpc_closure_init(&op->handler_private.closure,

@ -190,10 +190,13 @@ static void lr_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
call_data *calld = elem->call_data;
if (op->recv_initial_metadata) {
calld->recv_initial_metadata = op->recv_initial_metadata;
calld->recv_initial_metadata =
op->payload->recv_initial_metadata.recv_initial_metadata;
/* substitute our callback for the higher callback */
calld->ops_recv_initial_metadata_ready = op->recv_initial_metadata_ready;
op->recv_initial_metadata_ready = &calld->on_initial_md_ready;
calld->ops_recv_initial_metadata_ready =
op->payload->recv_initial_metadata.recv_initial_metadata_ready;
op->payload->recv_initial_metadata.recv_initial_metadata_ready =
&calld->on_initial_md_ready;
}
grpc_call_next_op(exec_ctx, elem, op);

@ -1429,7 +1429,7 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
void *stream_op,
grpc_error *error_ignored) {
grpc_transport_op *op = stream_op;
grpc_chttp2_transport *t = op->transport_private.extra_arg;
grpc_chttp2_transport *t = op->handler_private.extra_arg;
grpc_error *close_transport = op->disconnect_with_error;
if (op->on_connectivity_state_change != NULL) {
@ -1475,10 +1475,10 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
char *msg = grpc_transport_op_string(op);
gpr_free(msg);
op->transport_private.extra_arg = gt;
op->handler_private.extra_arg = gt;
GRPC_CHTTP2_REF_TRANSPORT(t, "transport_op");
grpc_closure_sched(
exec_ctx, grpc_closure_init(&op->transport_private.closure,
exec_ctx, grpc_closure_init(&op->handler_private.closure,
perform_transport_op_locked, op,
grpc_combiner_scheduler(t->combiner, false)),
GRPC_ERROR_NONE);

@ -110,7 +110,7 @@ void grpc_transport_move_stats(grpc_transport_stream_stats *from,
typedef struct {
void *extra_arg;
grpc_closure closure;
} grpc_transport_private_op_data;
} grpc_handler_private_op_data;
typedef struct grpc_transport_stream_op_payload
grpc_transport_stream_op_payload;
@ -159,7 +159,7 @@ typedef struct grpc_transport_stream_op {
* remaining fields are initialized and used at the discretion of the
* current handler of the op */
grpc_transport_private_op_data handler_private;
grpc_handler_private_op_data handler_private;
} grpc_transport_stream_op;
struct grpc_transport_stream_op_payload {
@ -248,7 +248,7 @@ typedef struct grpc_transport_op {
* remaining fields are initialized and used at the discretion of the
* transport implementation */
grpc_transport_private_op_data transport_private;
grpc_handler_private_op_data handler_private;
} grpc_transport_op;
/* Returns the amount of memory required to store a grpc_stream for this

Loading…
Cancel
Save