|
|
|
@ -34,6 +34,7 @@ |
|
|
|
|
#include "src/core/channel/client_channel.h" |
|
|
|
|
|
|
|
|
|
#include <stdio.h> |
|
|
|
|
#include <string.h> |
|
|
|
|
|
|
|
|
|
#include "src/core/channel/channel_args.h" |
|
|
|
|
#include "src/core/channel/connected_channel.h" |
|
|
|
@ -75,6 +76,7 @@ typedef enum { |
|
|
|
|
CALL_CREATED, |
|
|
|
|
CALL_WAITING_FOR_CONFIG, |
|
|
|
|
CALL_WAITING_FOR_PICK, |
|
|
|
|
CALL_WAITING_FOR_CALL, |
|
|
|
|
CALL_ACTIVE, |
|
|
|
|
CALL_CANCELLED |
|
|
|
|
} call_state; |
|
|
|
@ -87,17 +89,13 @@ struct call_data { |
|
|
|
|
|
|
|
|
|
call_state state; |
|
|
|
|
gpr_timespec deadline; |
|
|
|
|
union { |
|
|
|
|
struct { |
|
|
|
|
/* our child call stack */ |
|
|
|
|
grpc_subchannel_call *subchannel_call; |
|
|
|
|
} active; |
|
|
|
|
grpc_transport_stream_op waiting_op; |
|
|
|
|
struct { |
|
|
|
|
grpc_linked_mdelem status; |
|
|
|
|
grpc_linked_mdelem details; |
|
|
|
|
} cancelled; |
|
|
|
|
} s; |
|
|
|
|
grpc_subchannel *picked_channel; |
|
|
|
|
grpc_iomgr_closure async_setup_task; |
|
|
|
|
grpc_transport_stream_op waiting_op; |
|
|
|
|
/* our child call stack */ |
|
|
|
|
grpc_subchannel_call *subchannel_call; |
|
|
|
|
grpc_linked_mdelem status; |
|
|
|
|
grpc_linked_mdelem details; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
#if 0 |
|
|
|
@ -110,9 +108,9 @@ static int prepare_activate(grpc_call_element *elem, |
|
|
|
|
/* no more access to calld->s.waiting allowed */ |
|
|
|
|
GPR_ASSERT(calld->state == CALL_WAITING); |
|
|
|
|
|
|
|
|
|
if (calld->s.waiting_op.bind_pollset) { |
|
|
|
|
if (calld->waiting_op.bind_pollset) { |
|
|
|
|
grpc_transport_setup_del_interested_party(chand->transport_setup, |
|
|
|
|
calld->s.waiting_op.bind_pollset); |
|
|
|
|
calld->waiting_op.bind_pollset); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
calld->state = CALL_ACTIVE; |
|
|
|
@ -143,7 +141,7 @@ static void remove_waiting_child(channel_data *chand, call_data *calld) { |
|
|
|
|
for (i = 0, new_count = 0; i < chand->waiting_child_count; i++) { |
|
|
|
|
if (chand->waiting_children[i] == calld) { |
|
|
|
|
grpc_transport_setup_del_interested_party( |
|
|
|
|
chand->transport_setup, calld->s.waiting_op.bind_pollset); |
|
|
|
|
chand->transport_setup, calld->waiting_op.bind_pollset); |
|
|
|
|
continue; |
|
|
|
|
} |
|
|
|
|
chand->waiting_children[new_count++] = chand->waiting_children[i]; |
|
|
|
@ -166,15 +164,15 @@ static void handle_op_after_cancellation(grpc_call_element *elem, |
|
|
|
|
char status[GPR_LTOA_MIN_BUFSIZE]; |
|
|
|
|
grpc_metadata_batch mdb; |
|
|
|
|
gpr_ltoa(GRPC_STATUS_CANCELLED, status); |
|
|
|
|
calld->s.cancelled.status.md = |
|
|
|
|
calld->status.md = |
|
|
|
|
grpc_mdelem_from_strings(chand->mdctx, "grpc-status", status); |
|
|
|
|
calld->s.cancelled.details.md = |
|
|
|
|
calld->details.md = |
|
|
|
|
grpc_mdelem_from_strings(chand->mdctx, "grpc-message", "Cancelled"); |
|
|
|
|
calld->s.cancelled.status.prev = calld->s.cancelled.details.next = NULL; |
|
|
|
|
calld->s.cancelled.status.next = &calld->s.cancelled.details; |
|
|
|
|
calld->s.cancelled.details.prev = &calld->s.cancelled.status; |
|
|
|
|
mdb.list.head = &calld->s.cancelled.status; |
|
|
|
|
mdb.list.tail = &calld->s.cancelled.details; |
|
|
|
|
calld->status.prev = calld->details.next = NULL; |
|
|
|
|
calld->status.next = &calld->details; |
|
|
|
|
calld->details.prev = &calld->status; |
|
|
|
|
mdb.list.head = &calld->status; |
|
|
|
|
mdb.list.tail = &calld->details; |
|
|
|
|
mdb.garbage.head = mdb.garbage.tail = NULL; |
|
|
|
|
mdb.deadline = gpr_inf_future; |
|
|
|
|
grpc_sopb_add_metadata(op->recv_ops, mdb); |
|
|
|
@ -186,16 +184,111 @@ static void handle_op_after_cancellation(grpc_call_element *elem, |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void add_to_lb_policy_wait_queue_locked_state_config(channel_data *chand, call_data *calld) { |
|
|
|
|
abort(); |
|
|
|
|
typedef struct { |
|
|
|
|
grpc_iomgr_closure closure; |
|
|
|
|
grpc_call_element *elem; |
|
|
|
|
} waiting_call; |
|
|
|
|
|
|
|
|
|
static void perform_transport_stream_op(grpc_call_element *elem, grpc_transport_stream_op *op, int continuation); |
|
|
|
|
|
|
|
|
|
static void continue_with_pick(void *arg, int iomgr_success) { |
|
|
|
|
waiting_call *wc = arg; |
|
|
|
|
call_data *calld = wc->elem->call_data; |
|
|
|
|
perform_transport_stream_op(wc->elem, &calld->waiting_op, 1); |
|
|
|
|
gpr_free(wc); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void add_to_lb_policy_wait_queue_locked_state_config(grpc_call_element *elem) { |
|
|
|
|
channel_data *chand = elem->channel_data; |
|
|
|
|
waiting_call *wc = gpr_malloc(sizeof(*wc)); |
|
|
|
|
grpc_iomgr_closure_init(&wc->closure, continue_with_pick, wc); |
|
|
|
|
wc->elem = elem; |
|
|
|
|
wc->closure.next = chand->waiting_for_config_closures; |
|
|
|
|
chand->waiting_for_config_closures = &wc->closure; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static int is_empty(void *p, int len) { |
|
|
|
|
char *ptr = p; |
|
|
|
|
int i; |
|
|
|
|
for (i = 0; i < len; i++) { |
|
|
|
|
if (ptr[i] != 0) return 0; |
|
|
|
|
} |
|
|
|
|
return 1; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void started_call(void *arg, int iomgr_success) { |
|
|
|
|
call_data *calld = arg; |
|
|
|
|
grpc_transport_stream_op op; |
|
|
|
|
int have_waiting; |
|
|
|
|
|
|
|
|
|
gpr_mu_lock(&calld->mu_state); |
|
|
|
|
if (calld->state == CALL_CANCELLED && calld->subchannel_call != NULL) { |
|
|
|
|
memset(&op, 0, sizeof(op)); |
|
|
|
|
op.cancel_with_status = GRPC_STATUS_CANCELLED; |
|
|
|
|
gpr_mu_unlock(&calld->mu_state); |
|
|
|
|
grpc_subchannel_call_process_op(calld->subchannel_call, &op); |
|
|
|
|
} else if (calld->state == CALL_WAITING_FOR_CALL) { |
|
|
|
|
have_waiting = !is_empty(&calld->waiting_op, sizeof(calld->waiting_op)); |
|
|
|
|
if (calld->subchannel_call != NULL) { |
|
|
|
|
calld->state = CALL_ACTIVE; |
|
|
|
|
gpr_mu_unlock(&calld->mu_state); |
|
|
|
|
if (have_waiting) { |
|
|
|
|
grpc_subchannel_call_process_op(calld->subchannel_call, &calld->waiting_op); |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
calld->state = CALL_CANCELLED; |
|
|
|
|
gpr_mu_unlock(&calld->mu_state); |
|
|
|
|
if (have_waiting) { |
|
|
|
|
handle_op_after_cancellation(calld->elem, &calld->waiting_op); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
GPR_ASSERT(calld->state == CALL_CANCELLED); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void picked_target(void *arg, int iomgr_success) { |
|
|
|
|
call_data *calld = arg; |
|
|
|
|
channel_data *chand = calld->elem->channel_data; |
|
|
|
|
grpc_transport_stream_op op; |
|
|
|
|
|
|
|
|
|
if (calld->picked_channel == NULL) { |
|
|
|
|
/* treat this like a cancellation */ |
|
|
|
|
calld->waiting_op.cancel_with_status = GRPC_STATUS_UNAVAILABLE; |
|
|
|
|
perform_transport_stream_op(calld->elem, &calld->waiting_op, 1); |
|
|
|
|
} else { |
|
|
|
|
gpr_mu_lock(&calld->mu_state); |
|
|
|
|
if (calld->state == CALL_CANCELLED) { |
|
|
|
|
gpr_mu_unlock(&calld->mu_state); |
|
|
|
|
handle_op_after_cancellation(calld->elem, &calld->waiting_op); |
|
|
|
|
} else { |
|
|
|
|
GPR_ASSERT(calld->state == CALL_WAITING_FOR_PICK); |
|
|
|
|
calld->state = CALL_WAITING_FOR_CALL; |
|
|
|
|
op = calld->waiting_op; |
|
|
|
|
memset(&calld->waiting_op, 0, sizeof(calld->waiting_op)); |
|
|
|
|
gpr_mu_unlock(&calld->mu_state); |
|
|
|
|
grpc_iomgr_closure_init(&calld->async_setup_task, started_call, calld); |
|
|
|
|
grpc_subchannel_create_call(calld->picked_channel, chand->mdctx, &op, &calld->subchannel_call, &calld->async_setup_task); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void pick_target(grpc_lb_policy *lb_policy, call_data *calld) { |
|
|
|
|
abort(); |
|
|
|
|
grpc_metadata_batch *initial_metadata; |
|
|
|
|
grpc_transport_stream_op *op = &calld->waiting_op; |
|
|
|
|
|
|
|
|
|
GPR_ASSERT(op->bind_pollset); |
|
|
|
|
GPR_ASSERT(op->send_ops); |
|
|
|
|
GPR_ASSERT(op->send_ops->nops >= 1); |
|
|
|
|
GPR_ASSERT(op->send_ops->ops[0].type == GRPC_OP_METADATA); |
|
|
|
|
initial_metadata = &op->send_ops->ops[0].data.metadata; |
|
|
|
|
|
|
|
|
|
grpc_iomgr_closure_init(&calld->async_setup_task, picked_target, calld); |
|
|
|
|
grpc_lb_policy_pick(lb_policy, op->bind_pollset,
|
|
|
|
|
initial_metadata, &calld->picked_channel, &calld->async_setup_task); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void cc_start_transport_stream_op(grpc_call_element *elem, |
|
|
|
|
grpc_transport_stream_op *op) { |
|
|
|
|
static void perform_transport_stream_op(grpc_call_element *elem, grpc_transport_stream_op *op, int continuation) { |
|
|
|
|
call_data *calld = elem->call_data; |
|
|
|
|
channel_data *chand = elem->channel_data; |
|
|
|
|
grpc_subchannel_call *subchannel_call; |
|
|
|
@ -206,7 +299,8 @@ static void cc_start_transport_stream_op(grpc_call_element *elem, |
|
|
|
|
gpr_mu_lock(&calld->mu_state); |
|
|
|
|
switch (calld->state) { |
|
|
|
|
case CALL_ACTIVE: |
|
|
|
|
subchannel_call = calld->s.active.subchannel_call; |
|
|
|
|
GPR_ASSERT(!continuation); |
|
|
|
|
subchannel_call = calld->subchannel_call; |
|
|
|
|
gpr_mu_unlock(&calld->mu_state); |
|
|
|
|
grpc_subchannel_call_process_op(subchannel_call, op); |
|
|
|
|
break; |
|
|
|
@ -214,13 +308,44 @@ static void cc_start_transport_stream_op(grpc_call_element *elem, |
|
|
|
|
gpr_mu_unlock(&calld->mu_state); |
|
|
|
|
handle_op_after_cancellation(elem, op); |
|
|
|
|
break; |
|
|
|
|
case CALL_WAITING_FOR_CONFIG: |
|
|
|
|
case CALL_WAITING_FOR_PICK: |
|
|
|
|
case CALL_WAITING_FOR_CALL: |
|
|
|
|
if (!continuation) { |
|
|
|
|
if (op->cancel_with_status != GRPC_STATUS_OK) { |
|
|
|
|
calld->state = CALL_CANCELLED; |
|
|
|
|
gpr_mu_unlock(&calld->mu_state); |
|
|
|
|
handle_op_after_cancellation(elem, op); |
|
|
|
|
} else { |
|
|
|
|
GPR_ASSERT((calld->waiting_op.send_ops == NULL) != |
|
|
|
|
(op->send_ops == NULL)); |
|
|
|
|
GPR_ASSERT((calld->waiting_op.recv_ops == NULL) != |
|
|
|
|
(op->recv_ops == NULL)); |
|
|
|
|
if (op->send_ops != NULL) { |
|
|
|
|
calld->waiting_op.send_ops = op->send_ops; |
|
|
|
|
calld->waiting_op.is_last_send = op->is_last_send; |
|
|
|
|
calld->waiting_op.on_done_send = op->on_done_send; |
|
|
|
|
} |
|
|
|
|
if (op->recv_ops != NULL) { |
|
|
|
|
calld->waiting_op.recv_ops = op->recv_ops; |
|
|
|
|
calld->waiting_op.recv_state = op->recv_state; |
|
|
|
|
calld->waiting_op.on_done_recv = op->on_done_recv; |
|
|
|
|
} |
|
|
|
|
gpr_mu_unlock(&calld->mu_state); |
|
|
|
|
if (op->on_consumed != NULL) { |
|
|
|
|
op->on_consumed->cb(op->on_consumed->cb_arg, 0); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
/* fall through */ |
|
|
|
|
case CALL_CREATED: |
|
|
|
|
if (op->cancel_with_status != GRPC_STATUS_OK) { |
|
|
|
|
calld->state = CALL_CANCELLED; |
|
|
|
|
gpr_mu_unlock(&calld->mu_state); |
|
|
|
|
handle_op_after_cancellation(elem, op); |
|
|
|
|
} else { |
|
|
|
|
calld->s.waiting_op = *op; |
|
|
|
|
calld->waiting_op = *op; |
|
|
|
|
|
|
|
|
|
gpr_mu_lock(&chand->mu_config); |
|
|
|
|
lb_policy = chand->lb_policy; |
|
|
|
@ -235,141 +360,22 @@ static void cc_start_transport_stream_op(grpc_call_element *elem, |
|
|
|
|
grpc_lb_policy_unref(lb_policy); |
|
|
|
|
} else { |
|
|
|
|
calld->state = CALL_WAITING_FOR_CONFIG; |
|
|
|
|
add_to_lb_policy_wait_queue_locked_state_config(chand, calld); |
|
|
|
|
add_to_lb_policy_wait_queue_locked_state_config(elem); |
|
|
|
|
gpr_mu_unlock(&chand->mu_config); |
|
|
|
|
gpr_mu_unlock(&calld->mu_state); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
break; |
|
|
|
|
case CALL_WAITING_FOR_CONFIG: |
|
|
|
|
case CALL_WAITING_FOR_PICK: |
|
|
|
|
if (op->cancel_with_status != GRPC_STATUS_OK) { |
|
|
|
|
calld->state = CALL_CANCELLED; |
|
|
|
|
gpr_mu_unlock(&calld->mu_state); |
|
|
|
|
handle_op_after_cancellation(elem, op); |
|
|
|
|
} else { |
|
|
|
|
GPR_ASSERT((calld->s.waiting_op.send_ops == NULL) != |
|
|
|
|
(op->send_ops == NULL)); |
|
|
|
|
GPR_ASSERT((calld->s.waiting_op.recv_ops == NULL) != |
|
|
|
|
(op->recv_ops == NULL)); |
|
|
|
|
if (op->send_ops != NULL) { |
|
|
|
|
calld->s.waiting_op.send_ops = op->send_ops; |
|
|
|
|
calld->s.waiting_op.is_last_send = op->is_last_send; |
|
|
|
|
calld->s.waiting_op.on_done_send = op->on_done_send; |
|
|
|
|
} |
|
|
|
|
if (op->recv_ops != NULL) { |
|
|
|
|
calld->s.waiting_op.recv_ops = op->recv_ops; |
|
|
|
|
calld->s.waiting_op.recv_state = op->recv_state; |
|
|
|
|
calld->s.waiting_op.on_done_recv = op->on_done_recv; |
|
|
|
|
} |
|
|
|
|
gpr_mu_unlock(&calld->mu_state); |
|
|
|
|
if (op->on_consumed != NULL) { |
|
|
|
|
op->on_consumed->cb(op->on_consumed->cb_arg, 0); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#if 0 |
|
|
|
|
gpr_mu_lock(&chand->mu); |
|
|
|
|
switch (calld->state) { |
|
|
|
|
case CALL_ACTIVE: |
|
|
|
|
child_elem = grpc_child_call_get_top_element(calld->s.active.child_call); |
|
|
|
|
gpr_mu_unlock(&chand->mu); |
|
|
|
|
child_elem->filter->start_transport_op(child_elem, op); |
|
|
|
|
break; |
|
|
|
|
case CALL_CREATED: |
|
|
|
|
if (op->cancel_with_status != GRPC_STATUS_OK) { |
|
|
|
|
calld->state = CALL_CANCELLED; |
|
|
|
|
gpr_mu_unlock(&chand->mu); |
|
|
|
|
handle_op_after_cancellation(elem, op); |
|
|
|
|
} else { |
|
|
|
|
calld->state = CALL_WAITING; |
|
|
|
|
calld->s.waiting_op.bind_pollset = NULL; |
|
|
|
|
if (chand->active_child) { |
|
|
|
|
/* channel is connected - use the connected stack */ |
|
|
|
|
if (prepare_activate(elem, chand->active_child)) { |
|
|
|
|
gpr_mu_unlock(&chand->mu); |
|
|
|
|
/* activate the request (pass it down) outside the lock */ |
|
|
|
|
complete_activate(elem, op); |
|
|
|
|
} else { |
|
|
|
|
gpr_mu_unlock(&chand->mu); |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
/* check to see if we should initiate a connection (if we're not
|
|
|
|
|
already), |
|
|
|
|
but don't do so until outside the lock to avoid re-entrancy |
|
|
|
|
problems if |
|
|
|
|
the callback is immediate */ |
|
|
|
|
int initiate_transport_setup = 0; |
|
|
|
|
if (!chand->transport_setup_initiated) { |
|
|
|
|
chand->transport_setup_initiated = 1; |
|
|
|
|
initiate_transport_setup = 1; |
|
|
|
|
} |
|
|
|
|
/* add this call to the waiting set to be resumed once we have a child
|
|
|
|
|
channel stack, growing the waiting set if needed */ |
|
|
|
|
if (chand->waiting_child_count == chand->waiting_child_capacity) { |
|
|
|
|
chand->waiting_child_capacity = |
|
|
|
|
GPR_MAX(chand->waiting_child_capacity * 2, 8); |
|
|
|
|
chand->waiting_children = gpr_realloc( |
|
|
|
|
chand->waiting_children, |
|
|
|
|
chand->waiting_child_capacity * sizeof(call_data *)); |
|
|
|
|
} |
|
|
|
|
calld->s.waiting_op = *op; |
|
|
|
|
chand->waiting_children[chand->waiting_child_count++] = calld; |
|
|
|
|
grpc_transport_setup_add_interested_party(chand->transport_setup, |
|
|
|
|
op->bind_pollset); |
|
|
|
|
gpr_mu_unlock(&chand->mu); |
|
|
|
|
|
|
|
|
|
/* finally initiate transport setup if needed */ |
|
|
|
|
if (initiate_transport_setup) { |
|
|
|
|
grpc_transport_setup_initiate(chand->transport_setup); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
break; |
|
|
|
|
case CALL_WAITING: |
|
|
|
|
if (op->cancel_with_status != GRPC_STATUS_OK) { |
|
|
|
|
waiting_op = calld->s.waiting_op; |
|
|
|
|
remove_waiting_child(chand, calld); |
|
|
|
|
calld->state = CALL_CANCELLED; |
|
|
|
|
gpr_mu_unlock(&chand->mu); |
|
|
|
|
handle_op_after_cancellation(elem, &waiting_op); |
|
|
|
|
handle_op_after_cancellation(elem, op); |
|
|
|
|
} else { |
|
|
|
|
GPR_ASSERT((calld->s.waiting_op.send_ops == NULL) != |
|
|
|
|
(op->send_ops == NULL)); |
|
|
|
|
GPR_ASSERT((calld->s.waiting_op.recv_ops == NULL) != |
|
|
|
|
(op->recv_ops == NULL)); |
|
|
|
|
if (op->send_ops) { |
|
|
|
|
calld->s.waiting_op.send_ops = op->send_ops; |
|
|
|
|
calld->s.waiting_op.is_last_send = op->is_last_send; |
|
|
|
|
calld->s.waiting_op.on_done_send = op->on_done_send; |
|
|
|
|
} |
|
|
|
|
if (op->recv_ops) { |
|
|
|
|
calld->s.waiting_op.recv_ops = op->recv_ops; |
|
|
|
|
calld->s.waiting_op.recv_state = op->recv_state; |
|
|
|
|
calld->s.waiting_op.on_done_recv = op->on_done_recv; |
|
|
|
|
} |
|
|
|
|
gpr_mu_unlock(&chand->mu); |
|
|
|
|
if (op->on_consumed) { |
|
|
|
|
op->on_consumed->cb(op->on_consumed->cb_arg, 0); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
break; |
|
|
|
|
case CALL_CANCELLED: |
|
|
|
|
gpr_mu_unlock(&chand->mu); |
|
|
|
|
handle_op_after_cancellation(elem, op); |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
#endif |
|
|
|
|
static void cc_start_transport_stream_op(grpc_call_element *elem, |
|
|
|
|
grpc_transport_stream_op *op) { |
|
|
|
|
perform_transport_stream_op(elem, op, 0); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void update_state_locked(channel_data *chand) { |
|
|
|
|
|
|
|
|
|
gpr_log(GPR_ERROR, "update_state_locked not implemented"); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void cc_on_config_changed(void *arg, int iomgr_success) { |
|
|
|
@ -382,9 +388,10 @@ static void cc_on_config_changed(void *arg, int iomgr_success) { |
|
|
|
|
if (chand->incoming_configuration) { |
|
|
|
|
lb_policy = grpc_client_config_get_lb_policy(chand->incoming_configuration); |
|
|
|
|
grpc_lb_policy_ref(lb_policy); |
|
|
|
|
|
|
|
|
|
grpc_client_config_unref(chand->incoming_configuration); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
grpc_client_config_unref(chand->incoming_configuration); |
|
|
|
|
chand->incoming_configuration = NULL; |
|
|
|
|
|
|
|
|
|
gpr_mu_lock(&chand->mu_config); |
|
|
|
@ -402,7 +409,9 @@ static void cc_on_config_changed(void *arg, int iomgr_success) { |
|
|
|
|
wakeup_closures = next; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
grpc_lb_policy_unref(old_lb_policy); |
|
|
|
|
if (old_lb_policy) { |
|
|
|
|
grpc_lb_policy_unref(old_lb_policy); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (iomgr_success) { |
|
|
|
|
grpc_resolver_next(chand->resolver, &chand->incoming_configuration, &chand->on_config_changed); |
|
|
|
@ -511,6 +520,7 @@ static void init_call_elem(grpc_call_element *elem, |
|
|
|
|
|
|
|
|
|
GPR_ASSERT(elem->filter == &grpc_client_channel_filter); |
|
|
|
|
GPR_ASSERT(server_transport_data == NULL); |
|
|
|
|
gpr_mu_init(&calld->mu_state); |
|
|
|
|
calld->elem = elem; |
|
|
|
|
calld->state = CALL_CREATED; |
|
|
|
|
calld->deadline = gpr_inf_future; |
|
|
|
@ -527,7 +537,7 @@ static void destroy_call_elem(grpc_call_element *elem) { |
|
|
|
|
gpr_mu_lock(&calld->mu_state); |
|
|
|
|
switch (calld->state) { |
|
|
|
|
case CALL_ACTIVE: |
|
|
|
|
subchannel_call = calld->s.active.subchannel_call; |
|
|
|
|
subchannel_call = calld->subchannel_call; |
|
|
|
|
gpr_mu_unlock(&calld->mu_state); |
|
|
|
|
grpc_subchannel_call_unref(subchannel_call); |
|
|
|
|
break; |
|
|
|
@ -537,6 +547,7 @@ static void destroy_call_elem(grpc_call_element *elem) { |
|
|
|
|
break; |
|
|
|
|
case CALL_WAITING_FOR_PICK: |
|
|
|
|
case CALL_WAITING_FOR_CONFIG: |
|
|
|
|
case CALL_WAITING_FOR_CALL: |
|
|
|
|
gpr_log(GPR_ERROR, "should never reach here"); |
|
|
|
|
abort(); |
|
|
|
|
break; |
|
|
|
@ -550,12 +561,12 @@ static void init_channel_elem(grpc_channel_element *elem, |
|
|
|
|
int is_last) { |
|
|
|
|
channel_data *chand = elem->channel_data; |
|
|
|
|
|
|
|
|
|
GPR_ASSERT(!is_first); |
|
|
|
|
memset(chand, 0, sizeof(*chand)); |
|
|
|
|
|
|
|
|
|
GPR_ASSERT(is_last); |
|
|
|
|
GPR_ASSERT(elem->filter == &grpc_client_channel_filter); |
|
|
|
|
|
|
|
|
|
gpr_mu_init(&chand->mu_config); |
|
|
|
|
chand->resolver = NULL; |
|
|
|
|
chand->mdctx = metadata_context; |
|
|
|
|
grpc_iomgr_closure_init(&chand->on_config_changed, cc_on_config_changed, chand); |
|
|
|
|
} |
|
|
|
@ -633,7 +644,7 @@ grpc_transport_setup_result grpc_client_channel_transport_setup_complete( |
|
|
|
|
call_ops = gpr_malloc(sizeof(*call_ops) * waiting_child_count); |
|
|
|
|
|
|
|
|
|
for (i = 0; i < waiting_child_count; i++) { |
|
|
|
|
call_ops[i] = waiting_children[i]->s.waiting_op; |
|
|
|
|
call_ops[i] = waiting_children[i]->waiting_op; |
|
|
|
|
if (!prepare_activate(waiting_children[i]->elem, chand->active_child)) { |
|
|
|
|
waiting_children[i] = NULL; |
|
|
|
|
grpc_transport_stream_op_finish_with_failure(&call_ops[i]); |
|
|
|
|