stream_op cleanup: channel, client_config changes

reviewable/pr3993/r1
Craig Tiller 9 years ago
parent 0581d129f5
commit 577c9b2f11
19 files changed:
  1. src/core/census/grpc_filter.c (69)
  2. src/core/channel/channel_stack.c (50)
  3. src/core/channel/channel_stack.h (57)
  4. src/core/channel/client_channel.c (494)
  5. src/core/channel/client_uchannel.c (355)
  6. src/core/channel/compress_filter.c (293)
  7. src/core/channel/connected_channel.c (29)
  8. src/core/channel/connected_channel.h (2)
  9. src/core/channel/http_client_filter.c (113)
  10. src/core/channel/http_server_filter.c (148)
  11. src/core/channel/noop_filter.c (22)
  12. src/core/channel/subchannel_call_holder.c (282)
  13. src/core/channel/subchannel_call_holder.h (83)
  14. src/core/client_config/lb_policies/pick_first.c (37)
  15. src/core/client_config/lb_policies/round_robin.c (40)
  16. src/core/client_config/lb_policy.c (17)
  17. src/core/client_config/lb_policy.h (19)
  18. src/core/client_config/subchannel.c (133)
  19. src/core/client_config/subchannel.h (23)

@@ -53,28 +53,24 @@ typedef struct call_data {
int error; int error;
/* recv callback */ /* recv callback */
grpc_stream_op_buffer *recv_ops; grpc_metadata_batch *recv_initial_metadata;
grpc_closure *on_done_recv; grpc_closure *on_done_recv;
grpc_closure finish_recv;
} call_data; } call_data;
typedef struct channel_data { typedef struct channel_data {
grpc_mdstr *path_str; /* pointer to meta data str with key == ":path" */ grpc_mdstr *path_str; /* pointer to meta data str with key == ":path" */
} channel_data; } channel_data;
static void extract_and_annotate_method_tag(grpc_stream_op_buffer *sopb, static void extract_and_annotate_method_tag(grpc_metadata_batch *md,
call_data *calld, call_data *calld,
channel_data *chand) { channel_data *chand) {
grpc_linked_mdelem *m; grpc_linked_mdelem *m;
size_t i; for (m = md->list.head; m != NULL; m = m->next) {
for (i = 0; i < sopb->nops; i++) { if (m->md->key == chand->path_str) {
grpc_stream_op *op = &sopb->ops[i]; gpr_log(GPR_DEBUG, "%s",
if (op->type != GRPC_OP_METADATA) continue; (const char *)GPR_SLICE_START_PTR(m->md->value->slice));
for (m = op->data.metadata.list.head; m != NULL; m = m->next) { /* Add method tag here */
if (m->md->key == chand->path_str) {
gpr_log(GPR_DEBUG, "%s",
(const char *)GPR_SLICE_START_PTR(m->md->value->slice));
/* Add method tag here */
}
} }
} }
} }
@@ -83,8 +79,8 @@ static void client_mutate_op(grpc_call_element *elem,
grpc_transport_stream_op *op) { grpc_transport_stream_op *op) {
call_data *calld = elem->call_data; call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data; channel_data *chand = elem->channel_data;
if (op->send_ops) { if (op->send_initial_metadata) {
extract_and_annotate_method_tag(op->send_ops, calld, chand); extract_and_annotate_method_tag(op->send_initial_metadata, calld, chand);
} }
} }
@@ -101,7 +97,7 @@ static void server_on_done_recv(grpc_exec_ctx *exec_ctx, void *ptr,
call_data *calld = elem->call_data; call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data; channel_data *chand = elem->channel_data;
if (success) { if (success) {
extract_and_annotate_method_tag(calld->recv_ops, calld, chand); extract_and_annotate_method_tag(calld->recv_initial_metadata, calld, chand);
} }
calld->on_done_recv->cb(exec_ctx, calld->on_done_recv->cb_arg, success); calld->on_done_recv->cb(exec_ctx, calld->on_done_recv->cb_arg, success);
} }
@@ -109,11 +105,11 @@ static void server_on_done_recv(grpc_exec_ctx *exec_ctx, void *ptr,
static void server_mutate_op(grpc_call_element *elem, static void server_mutate_op(grpc_call_element *elem,
grpc_transport_stream_op *op) { grpc_transport_stream_op *op) {
call_data *calld = elem->call_data; call_data *calld = elem->call_data;
if (op->recv_ops) { if (op->recv_initial_metadata) {
/* substitute our callback for the op callback */ /* substitute our callback for the op callback */
calld->recv_ops = op->recv_ops; calld->recv_initial_metadata = op->recv_initial_metadata;
calld->on_done_recv = op->on_done_recv; calld->on_done_recv = op->on_complete;
op->on_done_recv = calld->on_done_recv; op->on_complete = &calld->finish_recv;
} }
} }
@@ -128,13 +124,11 @@ static void server_start_transport_op(grpc_exec_ctx *exec_ctx,
static void client_init_call_elem(grpc_exec_ctx *exec_ctx, static void client_init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem, grpc_call_element *elem,
const void *server_transport_data, grpc_call_element_args *args) {
grpc_transport_stream_op *initial_op) {
call_data *d = elem->call_data; call_data *d = elem->call_data;
GPR_ASSERT(d != NULL); GPR_ASSERT(d != NULL);
memset(d, 0, sizeof(*d)); memset(d, 0, sizeof(*d));
d->start_ts = gpr_now(GPR_CLOCK_REALTIME); d->start_ts = gpr_now(GPR_CLOCK_REALTIME);
if (initial_op) client_mutate_op(elem, initial_op);
} }
static void client_destroy_call_elem(grpc_exec_ctx *exec_ctx, static void client_destroy_call_elem(grpc_exec_ctx *exec_ctx,
@@ -146,15 +140,13 @@ static void client_destroy_call_elem(grpc_exec_ctx *exec_ctx,
static void server_init_call_elem(grpc_exec_ctx *exec_ctx, static void server_init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem, grpc_call_element *elem,
const void *server_transport_data, grpc_call_element_args *args) {
grpc_transport_stream_op *initial_op) {
call_data *d = elem->call_data; call_data *d = elem->call_data;
GPR_ASSERT(d != NULL); GPR_ASSERT(d != NULL);
memset(d, 0, sizeof(*d)); memset(d, 0, sizeof(*d));
d->start_ts = gpr_now(GPR_CLOCK_REALTIME); d->start_ts = gpr_now(GPR_CLOCK_REALTIME);
/* TODO(hongyu): call census_tracing_start_op here. */ /* TODO(hongyu): call census_tracing_start_op here. */
grpc_closure_init(d->on_done_recv, server_on_done_recv, elem); grpc_closure_init(&d->finish_recv, server_on_done_recv, elem);
if (initial_op) server_mutate_op(elem, initial_op);
} }
static void server_destroy_call_elem(grpc_exec_ctx *exec_ctx, static void server_destroy_call_elem(grpc_exec_ctx *exec_ctx,
@@ -165,12 +157,11 @@ static void server_destroy_call_elem(grpc_exec_ctx *exec_ctx,
} }
static void init_channel_elem(grpc_exec_ctx *exec_ctx, static void init_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem, grpc_channel *master, grpc_channel_element *elem,
const grpc_channel_args *args, grpc_mdctx *mdctx, grpc_channel_element_args *args) {
int is_first, int is_last) {
channel_data *chand = elem->channel_data; channel_data *chand = elem->channel_data;
GPR_ASSERT(chand != NULL); GPR_ASSERT(chand != NULL);
chand->path_str = grpc_mdstr_from_string(mdctx, ":path"); chand->path_str = grpc_mdstr_from_string(args->metadata_context, ":path");
} }
static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
@@ -183,15 +174,13 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
} }
const grpc_channel_filter grpc_client_census_filter = { const grpc_channel_filter grpc_client_census_filter = {
client_start_transport_op, grpc_channel_next_op, client_start_transport_op, grpc_channel_next_op, sizeof(call_data),
sizeof(call_data), client_init_call_elem, client_init_call_elem, grpc_call_stack_ignore_set_pollset,
client_destroy_call_elem, sizeof(channel_data), client_destroy_call_elem, sizeof(channel_data), init_channel_elem,
init_channel_elem, destroy_channel_elem, destroy_channel_elem, grpc_call_next_get_peer, "census-client"};
grpc_call_next_get_peer, "census-client"};
const grpc_channel_filter grpc_server_census_filter = { const grpc_channel_filter grpc_server_census_filter = {
server_start_transport_op, grpc_channel_next_op, server_start_transport_op, grpc_channel_next_op, sizeof(call_data),
sizeof(call_data), server_init_call_elem, server_init_call_elem, grpc_call_stack_ignore_set_pollset,
server_destroy_call_elem, sizeof(channel_data), server_destroy_call_elem, sizeof(channel_data), init_channel_elem,
init_channel_elem, destroy_channel_elem, destroy_channel_elem, grpc_call_next_get_peer, "census-server"};
grpc_call_next_get_peer, "census-server"};
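
Taken together, the census changes show the shape every filter now has to follow: element constructors receive a single args struct, a set_pollset hook sits between init_call_elem and destroy_call_elem in the filter table, and filters that do not care about pollsets plug in grpc_call_stack_ignore_set_pollset. A minimal pass-through filter against the new interface might look roughly like this (a sketch; the filter name and its no-op bodies are illustrative, not code from this commit):

/* Sketch: a do-nothing filter wired up to the reworked grpc_channel_filter
   table (ordering follows the census filter tables above). */
#include "src/core/channel/channel_stack.h"

typedef struct { char unused; } call_data;
typedef struct { char unused; } channel_data;

static void start_transport_stream_op(grpc_exec_ctx *exec_ctx,
                                      grpc_call_element *elem,
                                      grpc_transport_stream_op *op) {
  grpc_call_next_op(exec_ctx, elem, op); /* pass everything straight down */
}

static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
                           grpc_call_element_args *args) {}
static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
                              grpc_call_element *elem) {}
static void init_channel_elem(grpc_exec_ctx *exec_ctx,
                              grpc_channel_element *elem,
                              grpc_channel_element_args *args) {}
static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
                                 grpc_channel_element *elem) {}

static const grpc_channel_filter passthrough_filter = {
    start_transport_stream_op, grpc_channel_next_op, sizeof(call_data),
    init_call_elem, grpc_call_stack_ignore_set_pollset, destroy_call_elem,
    sizeof(channel_data), init_channel_elem, destroy_channel_elem,
    grpc_call_next_get_peer, "passthrough-example"};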

@@ -104,13 +104,14 @@ grpc_call_element *grpc_call_stack_element(grpc_call_stack *call_stack,
void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx, void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx,
const grpc_channel_filter **filters, const grpc_channel_filter **filters,
size_t filter_count, grpc_channel *master, size_t filter_count, grpc_channel *master,
const grpc_channel_args *args, const grpc_channel_args *channel_args,
grpc_mdctx *metadata_context, grpc_mdctx *metadata_context,
grpc_channel_stack *stack) { grpc_channel_stack *stack) {
size_t call_size = size_t call_size =
ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call_stack)) + ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call_stack)) +
ROUND_UP_TO_ALIGNMENT_SIZE(filter_count * sizeof(grpc_call_element)); ROUND_UP_TO_ALIGNMENT_SIZE(filter_count * sizeof(grpc_call_element));
grpc_channel_element *elems; grpc_channel_element *elems;
grpc_channel_element_args args;
char *user_data; char *user_data;
size_t i; size_t i;
@@ -122,11 +123,14 @@ void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx,
/* init per-filter data */ /* init per-filter data */
for (i = 0; i < filter_count; i++) { for (i = 0; i < filter_count; i++) {
args.master = master;
args.channel_args = channel_args;
args.metadata_context = metadata_context;
args.is_first = i == 0;
args.is_last = i == (filter_count - 1);
elems[i].filter = filters[i]; elems[i].filter = filters[i];
elems[i].channel_data = user_data; elems[i].channel_data = user_data;
elems[i].filter->init_channel_elem(exec_ctx, &elems[i], master, args, elems[i].filter->init_channel_elem(exec_ctx, &elems[i], &args);
metadata_context, i == 0,
i == (filter_count - 1));
user_data += ROUND_UP_TO_ALIGNMENT_SIZE(filters[i]->sizeof_channel_data); user_data += ROUND_UP_TO_ALIGNMENT_SIZE(filters[i]->sizeof_channel_data);
call_size += ROUND_UP_TO_ALIGNMENT_SIZE(filters[i]->sizeof_call_data); call_size += ROUND_UP_TO_ALIGNMENT_SIZE(filters[i]->sizeof_call_data);
} }
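
On the receiving side, a channel constructor reads whatever it needs back out of the args struct; the client_channel constructor later in this commit does essentially the following (trimmed sketch):

static void init_channel_elem(grpc_exec_ctx *exec_ctx,
                              grpc_channel_element *elem,
                              grpc_channel_element_args *args) {
  channel_data *chand = elem->channel_data;
  GPR_ASSERT(args->is_last);              /* this filter must terminate the stack */
  chand->mdctx = args->metadata_context;  /* shared metadata context */
  chand->master = args->master;           /* the owning grpc_channel */
}
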
@@ -151,33 +155,63 @@ void grpc_channel_stack_destroy(grpc_exec_ctx *exec_ctx,
} }
void grpc_call_stack_init(grpc_exec_ctx *exec_ctx, void grpc_call_stack_init(grpc_exec_ctx *exec_ctx,
grpc_channel_stack *channel_stack, grpc_channel_stack *channel_stack, int initial_refs,
grpc_iomgr_cb_func destroy, void *destroy_arg,
grpc_call_context_element *context,
const void *transport_server_data, const void *transport_server_data,
grpc_transport_stream_op *initial_op,
grpc_call_stack *call_stack) { grpc_call_stack *call_stack) {
grpc_channel_element *channel_elems = CHANNEL_ELEMS_FROM_STACK(channel_stack); grpc_channel_element *channel_elems = CHANNEL_ELEMS_FROM_STACK(channel_stack);
grpc_call_element_args args;
size_t count = channel_stack->count; size_t count = channel_stack->count;
grpc_call_element *call_elems; grpc_call_element *call_elems;
char *user_data; char *user_data;
size_t i; size_t i;
call_stack->count = count; call_stack->count = count;
gpr_ref_init(&call_stack->refcount.refs, initial_refs);
grpc_closure_init(&call_stack->refcount.destroy, destroy, destroy_arg);
call_elems = CALL_ELEMS_FROM_STACK(call_stack); call_elems = CALL_ELEMS_FROM_STACK(call_stack);
user_data = ((char *)call_elems) + user_data = ((char *)call_elems) +
ROUND_UP_TO_ALIGNMENT_SIZE(count * sizeof(grpc_call_element)); ROUND_UP_TO_ALIGNMENT_SIZE(count * sizeof(grpc_call_element));
/* init per-filter data */ /* init per-filter data */
for (i = 0; i < count; i++) { for (i = 0; i < count; i++) {
args.refcount = &call_stack->refcount;
args.server_transport_data = transport_server_data;
args.context = context;
call_elems[i].filter = channel_elems[i].filter; call_elems[i].filter = channel_elems[i].filter;
call_elems[i].channel_data = channel_elems[i].channel_data; call_elems[i].channel_data = channel_elems[i].channel_data;
call_elems[i].call_data = user_data; call_elems[i].call_data = user_data;
call_elems[i].filter->init_call_elem(exec_ctx, &call_elems[i], call_elems[i].filter->init_call_elem(exec_ctx, &call_elems[i], &args);
transport_server_data, initial_op);
user_data += user_data +=
ROUND_UP_TO_ALIGNMENT_SIZE(call_elems[i].filter->sizeof_call_data); ROUND_UP_TO_ALIGNMENT_SIZE(call_elems[i].filter->sizeof_call_data);
} }
} }
void grpc_call_stack_set_pollset(grpc_exec_ctx *exec_ctx,
grpc_call_stack *call_stack,
grpc_pollset *pollset) {
size_t count = call_stack->count;
grpc_call_element *call_elems;
char *user_data;
size_t i;
call_elems = CALL_ELEMS_FROM_STACK(call_stack);
user_data = ((char *)call_elems) +
ROUND_UP_TO_ALIGNMENT_SIZE(count * sizeof(grpc_call_element));
/* init per-filter data */
for (i = 0; i < count; i++) {
call_elems[i].filter->set_pollset(exec_ctx, &call_elems[i], pollset);
user_data +=
ROUND_UP_TO_ALIGNMENT_SIZE(call_elems[i].filter->sizeof_call_data);
}
}
void grpc_call_stack_ignore_set_pollset(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_pollset *pollset) {}
void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack) { void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack) {
grpc_call_element *elems = CALL_ELEMS_FROM_STACK(stack); grpc_call_element *elems = CALL_ELEMS_FROM_STACK(stack);
size_t count = stack->count; size_t count = stack->count;
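
Call construction follows the same pattern, with the added twist that the stack now owns a refcount and a pollset hook; a caller might wire it up roughly like this (sketch; the surrounding call object, destroy_call callback and pollset variables are illustrative, not names from this commit):

grpc_call_stack_init(exec_ctx, channel_stack, 1 /* initial_refs */,
                     destroy_call /* run when the last ref is dropped */,
                     call /* destroy_arg */, call->context,
                     server_transport_data, &call->stack);
/* the pollset must be attached before the first op is started */
grpc_call_stack_set_pollset(exec_ctx, &call->stack, pollset);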

@@ -51,6 +51,20 @@
typedef struct grpc_channel_element grpc_channel_element; typedef struct grpc_channel_element grpc_channel_element;
typedef struct grpc_call_element grpc_call_element; typedef struct grpc_call_element grpc_call_element;
typedef struct {
grpc_channel *master;
const grpc_channel_args *channel_args;
grpc_mdctx *metadata_context;
int is_first;
int is_last;
} grpc_channel_element_args;
typedef struct {
grpc_stream_refcount *refcount;
const void *server_transport_data;
grpc_call_context_element *context;
} grpc_call_element_args;
/* Channel filters specify: /* Channel filters specify:
1. the amount of memory needed in the channel & call (via the sizeof_XXX 1. the amount of memory needed in the channel & call (via the sizeof_XXX
members) members)
@@ -84,8 +98,9 @@ typedef struct {
transport and is on the server. Most filters want to ignore this transport and is on the server. Most filters want to ignore this
argument. */ argument. */
void (*init_call_elem)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, void (*init_call_elem)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
const void *server_transport_data, grpc_call_element_args *args);
grpc_transport_stream_op *initial_op); void (*set_pollset)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_pollset *pollset);
/* Destroy per call data. /* Destroy per call data.
The filter does not need to do any chaining */ The filter does not need to do any chaining */
void (*destroy_call_elem)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem); void (*destroy_call_elem)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem);
@@ -99,9 +114,7 @@ typedef struct {
useful for asserting correct configuration by upper layer code. useful for asserting correct configuration by upper layer code.
The filter does not need to do any chaining */ The filter does not need to do any chaining */
void (*init_channel_elem)(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, void (*init_channel_elem)(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem,
grpc_channel *master, const grpc_channel_args *args, grpc_channel_element_args *args);
grpc_mdctx *metadata_context, int is_first,
int is_last);
/* Destroy per channel data. /* Destroy per channel data.
The filter does not need to do any chaining */ The filter does not need to do any chaining */
void (*destroy_channel_elem)(grpc_exec_ctx *exec_ctx, void (*destroy_channel_elem)(grpc_exec_ctx *exec_ctx,
@@ -141,7 +154,14 @@ typedef struct {
/* A call stack tracks a set of related filters for one call, and guarantees /* A call stack tracks a set of related filters for one call, and guarantees
they live within a single malloc() allocation */ they live within a single malloc() allocation */
typedef struct { size_t count; } grpc_call_stack; typedef struct {
/* shared refcount for this channel stack.
MUST be the first element: the underlying code calls destroy
with the address of the refcount, but higher layers prefer to think
about the address of the call stack itself. */
grpc_stream_refcount refcount;
size_t count;
} grpc_call_stack;
/* Get a channel element given a channel stack and its index */ /* Get a channel element given a channel stack and its index */
grpc_channel_element *grpc_channel_stack_element(grpc_channel_stack *stack, grpc_channel_element *grpc_channel_stack_element(grpc_channel_stack *stack,
@@ -170,13 +190,34 @@ void grpc_channel_stack_destroy(grpc_exec_ctx *exec_ctx,
expected to be NULL on a client, or an opaque transport owned pointer on the expected to be NULL on a client, or an opaque transport owned pointer on the
server. */ server. */
void grpc_call_stack_init(grpc_exec_ctx *exec_ctx, void grpc_call_stack_init(grpc_exec_ctx *exec_ctx,
grpc_channel_stack *channel_stack, grpc_channel_stack *channel_stack, int initial_refs,
grpc_iomgr_cb_func destroy, void *destroy_arg,
grpc_call_context_element *context,
const void *transport_server_data, const void *transport_server_data,
grpc_transport_stream_op *initial_op,
grpc_call_stack *call_stack); grpc_call_stack *call_stack);
/* Set a pollset for a call stack: must occur before the first op is started */
void grpc_call_stack_set_pollset(grpc_exec_ctx *exec_ctx,
grpc_call_stack *call_stack,
grpc_pollset *pollset);
#ifdef GRPC_STREAM_REFCOUNT_DEBUG
#define grpc_call_stack_ref(call_stack, reason) \
grpc_stream_ref(&(call_stack)->refcount, reason)
#define grpc_call_stack_unref(exec_ctx, call_stack, reason) \
grpc_stream_unref(exec_ctx, &(call_stack)->refcount, reason)
#else
#define grpc_call_stack_ref(call_stack) grpc_stream_ref(&(call_stack)->refcount)
#define grpc_call_stack_unref(exec_ctx, call_stack) \
grpc_stream_unref(exec_ctx, &(call_stack)->refcount)
#endif
/* Destroy a call stack */ /* Destroy a call stack */
void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack); void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack);
/* Ignore set pollset */
void grpc_call_stack_ignore_set_pollset(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_pollset *pollset);
/* Call the next operation in a call stack */ /* Call the next operation in a call stack */
void grpc_call_next_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, void grpc_call_next_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_transport_stream_op *op); grpc_transport_stream_op *op);
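
Anything that needs the stack to outlive an asynchronous operation can now pin it through the ref/unref macros above instead of rolling its own count; roughly (sketch, with GRPC_STREAM_REFCOUNT_DEBUG undefined and start_async_work as an illustrative placeholder):

grpc_call_stack_ref(call_stack);              /* hold the stack across async work */
start_async_work(exec_ctx, call_stack, done); /* illustrative async call */
/* ... later, in the completion callback ... */
grpc_call_stack_unref(exec_ctx, call_stack);  /* may fire the destroy closure */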

@@ -43,6 +43,7 @@
#include "src/core/channel/channel_args.h" #include "src/core/channel/channel_args.h"
#include "src/core/channel/connected_channel.h" #include "src/core/channel/connected_channel.h"
#include "src/core/channel/subchannel_call_holder.h"
#include "src/core/iomgr/iomgr.h" #include "src/core/iomgr/iomgr.h"
#include "src/core/profiling/timers.h" #include "src/core/profiling/timers.h"
#include "src/core/support/string.h" #include "src/core/support/string.h"
@@ -51,7 +52,7 @@
/* Client channel implementation */ /* Client channel implementation */
typedef struct call_data call_data; typedef grpc_subchannel_call_holder call_data;
typedef struct client_channel_channel_data { typedef struct client_channel_channel_data {
/** metadata context for this channel */ /** metadata context for this channel */
@@ -98,360 +99,22 @@ typedef struct {
grpc_lb_policy *lb_policy; grpc_lb_policy *lb_policy;
} lb_policy_connectivity_watcher; } lb_policy_connectivity_watcher;
typedef enum {
CALL_CREATED,
CALL_WAITING_FOR_SEND,
CALL_WAITING_FOR_CONFIG,
CALL_WAITING_FOR_PICK,
CALL_WAITING_FOR_CALL,
CALL_ACTIVE,
CALL_CANCELLED
} call_state;
struct call_data {
/* owning element */
grpc_call_element *elem;
gpr_mu mu_state;
call_state state;
gpr_timespec deadline;
grpc_subchannel *picked_channel;
grpc_closure async_setup_task;
grpc_transport_stream_op waiting_op;
/* our child call stack */
grpc_subchannel_call *subchannel_call;
grpc_linked_mdelem status;
grpc_linked_mdelem details;
};
static grpc_closure *merge_into_waiting_op(grpc_call_element *elem,
grpc_transport_stream_op *new_op)
GRPC_MUST_USE_RESULT;
static void handle_op_after_cancellation(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_transport_stream_op *op) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
if (op->send_ops) {
grpc_stream_ops_unref_owned_objects(op->send_ops->ops, op->send_ops->nops);
op->on_done_send->cb(exec_ctx, op->on_done_send->cb_arg, 0);
}
if (op->recv_ops) {
char status[GPR_LTOA_MIN_BUFSIZE];
grpc_metadata_batch mdb;
gpr_ltoa(GRPC_STATUS_CANCELLED, status);
calld->status.md =
grpc_mdelem_from_strings(chand->mdctx, "grpc-status", status);
calld->details.md =
grpc_mdelem_from_strings(chand->mdctx, "grpc-message", "Cancelled");
calld->status.prev = calld->details.next = NULL;
calld->status.next = &calld->details;
calld->details.prev = &calld->status;
mdb.list.head = &calld->status;
mdb.list.tail = &calld->details;
mdb.garbage.head = mdb.garbage.tail = NULL;
mdb.deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
grpc_sopb_add_metadata(op->recv_ops, mdb);
*op->recv_state = GRPC_STREAM_CLOSED;
op->on_done_recv->cb(exec_ctx, op->on_done_recv->cb_arg, 1);
}
if (op->on_consumed) {
op->on_consumed->cb(exec_ctx, op->on_consumed->cb_arg, 0);
}
}
typedef struct { typedef struct {
grpc_closure closure; grpc_closure closure;
grpc_call_element *elem; grpc_call_element *elem;
} waiting_call; } waiting_call;
static void perform_transport_stream_op(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_transport_stream_op *op,
int continuation);
static void continue_with_pick(grpc_exec_ctx *exec_ctx, void *arg,
int iomgr_success) {
waiting_call *wc = arg;
call_data *calld = wc->elem->call_data;
perform_transport_stream_op(exec_ctx, wc->elem, &calld->waiting_op, 1);
gpr_free(wc);
}
static void add_to_lb_policy_wait_queue_locked_state_config(
grpc_call_element *elem) {
channel_data *chand = elem->channel_data;
waiting_call *wc = gpr_malloc(sizeof(*wc));
grpc_closure_init(&wc->closure, continue_with_pick, wc);
wc->elem = elem;
grpc_closure_list_add(&chand->waiting_for_config_closures, &wc->closure, 1);
}
static int is_empty(void *p, int len) {
char *ptr = p;
int i;
for (i = 0; i < len; i++) {
if (ptr[i] != 0) return 0;
}
return 1;
}
static void started_call_locked(grpc_exec_ctx *exec_ctx, void *arg,
int iomgr_success) {
call_data *calld = arg;
grpc_transport_stream_op op;
int have_waiting;
if (calld->state == CALL_CANCELLED && calld->subchannel_call != NULL) {
memset(&op, 0, sizeof(op));
op.cancel_with_status = GRPC_STATUS_CANCELLED;
gpr_mu_unlock(&calld->mu_state);
grpc_subchannel_call_process_op(exec_ctx, calld->subchannel_call, &op);
} else if (calld->state == CALL_WAITING_FOR_CALL) {
have_waiting = !is_empty(&calld->waiting_op, sizeof(calld->waiting_op));
if (calld->subchannel_call != NULL) {
calld->state = CALL_ACTIVE;
gpr_mu_unlock(&calld->mu_state);
if (have_waiting) {
grpc_subchannel_call_process_op(exec_ctx, calld->subchannel_call,
&calld->waiting_op);
}
} else {
calld->state = CALL_CANCELLED;
gpr_mu_unlock(&calld->mu_state);
if (have_waiting) {
handle_op_after_cancellation(exec_ctx, calld->elem, &calld->waiting_op);
}
}
} else {
GPR_ASSERT(calld->state == CALL_CANCELLED);
gpr_mu_unlock(&calld->mu_state);
}
}
static void started_call(grpc_exec_ctx *exec_ctx, void *arg,
int iomgr_success) {
call_data *calld = arg;
gpr_mu_lock(&calld->mu_state);
started_call_locked(exec_ctx, arg, iomgr_success);
}
static void picked_target(grpc_exec_ctx *exec_ctx, void *arg,
int iomgr_success) {
call_data *calld = arg;
grpc_pollset *pollset;
grpc_subchannel_call_create_status call_creation_status;
GPR_TIMER_BEGIN("picked_target", 0);
if (calld->picked_channel == NULL) {
/* treat this like a cancellation */
calld->waiting_op.cancel_with_status = GRPC_STATUS_UNAVAILABLE;
perform_transport_stream_op(exec_ctx, calld->elem, &calld->waiting_op, 1);
} else {
gpr_mu_lock(&calld->mu_state);
if (calld->state == CALL_CANCELLED) {
gpr_mu_unlock(&calld->mu_state);
handle_op_after_cancellation(exec_ctx, calld->elem, &calld->waiting_op);
} else {
GPR_ASSERT(calld->state == CALL_WAITING_FOR_PICK);
calld->state = CALL_WAITING_FOR_CALL;
pollset = calld->waiting_op.bind_pollset;
grpc_closure_init(&calld->async_setup_task, started_call, calld);
call_creation_status = grpc_subchannel_create_call(
exec_ctx, calld->picked_channel, pollset, &calld->subchannel_call,
&calld->async_setup_task);
if (call_creation_status == GRPC_SUBCHANNEL_CALL_CREATE_READY) {
started_call_locked(exec_ctx, calld, iomgr_success);
} else {
gpr_mu_unlock(&calld->mu_state);
}
}
}
GPR_TIMER_END("picked_target", 0);
}
static grpc_closure *merge_into_waiting_op(grpc_call_element *elem,
grpc_transport_stream_op *new_op) {
call_data *calld = elem->call_data;
grpc_closure *consumed_op = NULL;
grpc_transport_stream_op *waiting_op = &calld->waiting_op;
GPR_ASSERT((waiting_op->send_ops != NULL) + (new_op->send_ops != NULL) <= 1);
GPR_ASSERT((waiting_op->recv_ops != NULL) + (new_op->recv_ops != NULL) <= 1);
if (new_op->send_ops != NULL) {
waiting_op->send_ops = new_op->send_ops;
waiting_op->is_last_send = new_op->is_last_send;
waiting_op->on_done_send = new_op->on_done_send;
}
if (new_op->recv_ops != NULL) {
waiting_op->recv_ops = new_op->recv_ops;
waiting_op->recv_state = new_op->recv_state;
waiting_op->on_done_recv = new_op->on_done_recv;
}
if (new_op->on_consumed != NULL) {
if (waiting_op->on_consumed != NULL) {
consumed_op = waiting_op->on_consumed;
}
waiting_op->on_consumed = new_op->on_consumed;
}
if (new_op->cancel_with_status != GRPC_STATUS_OK) {
waiting_op->cancel_with_status = new_op->cancel_with_status;
}
return consumed_op;
}
static char *cc_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { static char *cc_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data; channel_data *chand = elem->channel_data;
grpc_subchannel_call *subchannel_call; return grpc_subchannel_call_holder_get_peer(exec_ctx, elem->call_data,
char *result; chand->master);
gpr_mu_lock(&calld->mu_state);
if (calld->state == CALL_ACTIVE) {
subchannel_call = calld->subchannel_call;
GRPC_SUBCHANNEL_CALL_REF(subchannel_call, "get_peer");
gpr_mu_unlock(&calld->mu_state);
result = grpc_subchannel_call_get_peer(exec_ctx, subchannel_call);
GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, subchannel_call, "get_peer");
return result;
} else {
gpr_mu_unlock(&calld->mu_state);
return grpc_channel_get_target(chand->master);
}
}
static void perform_transport_stream_op(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_transport_stream_op *op,
int continuation) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
grpc_subchannel_call *subchannel_call;
grpc_lb_policy *lb_policy;
grpc_transport_stream_op op2;
GPR_TIMER_BEGIN("perform_transport_stream_op", 0);
GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
gpr_mu_lock(&calld->mu_state);
switch (calld->state) {
case CALL_ACTIVE:
GPR_ASSERT(!continuation);
subchannel_call = calld->subchannel_call;
gpr_mu_unlock(&calld->mu_state);
grpc_subchannel_call_process_op(exec_ctx, subchannel_call, op);
break;
case CALL_CANCELLED:
gpr_mu_unlock(&calld->mu_state);
handle_op_after_cancellation(exec_ctx, elem, op);
break;
case CALL_WAITING_FOR_SEND:
GPR_ASSERT(!continuation);
grpc_exec_ctx_enqueue(exec_ctx, merge_into_waiting_op(elem, op), 1);
if (!calld->waiting_op.send_ops &&
calld->waiting_op.cancel_with_status == GRPC_STATUS_OK) {
gpr_mu_unlock(&calld->mu_state);
break;
}
*op = calld->waiting_op;
memset(&calld->waiting_op, 0, sizeof(calld->waiting_op));
continuation = 1;
/* fall through */
case CALL_WAITING_FOR_CONFIG:
case CALL_WAITING_FOR_PICK:
case CALL_WAITING_FOR_CALL:
if (!continuation) {
if (op->cancel_with_status != GRPC_STATUS_OK) {
calld->state = CALL_CANCELLED;
op2 = calld->waiting_op;
memset(&calld->waiting_op, 0, sizeof(calld->waiting_op));
if (op->on_consumed) {
calld->waiting_op.on_consumed = op->on_consumed;
op->on_consumed = NULL;
} else if (op2.on_consumed) {
calld->waiting_op.on_consumed = op2.on_consumed;
op2.on_consumed = NULL;
}
gpr_mu_unlock(&calld->mu_state);
handle_op_after_cancellation(exec_ctx, elem, op);
handle_op_after_cancellation(exec_ctx, elem, &op2);
} else {
grpc_exec_ctx_enqueue(exec_ctx, merge_into_waiting_op(elem, op), 1);
gpr_mu_unlock(&calld->mu_state);
}
break;
}
/* fall through */
case CALL_CREATED:
if (op->cancel_with_status != GRPC_STATUS_OK) {
calld->state = CALL_CANCELLED;
gpr_mu_unlock(&calld->mu_state);
handle_op_after_cancellation(exec_ctx, elem, op);
} else {
calld->waiting_op = *op;
if (op->send_ops == NULL) {
/* need to have some send ops before we can select the
lb target */
calld->state = CALL_WAITING_FOR_SEND;
gpr_mu_unlock(&calld->mu_state);
} else {
gpr_mu_lock(&chand->mu_config);
lb_policy = chand->lb_policy;
if (lb_policy) {
grpc_transport_stream_op *waiting_op = &calld->waiting_op;
grpc_pollset *bind_pollset = waiting_op->bind_pollset;
grpc_metadata_batch *initial_metadata =
&waiting_op->send_ops->ops[0].data.metadata;
GRPC_LB_POLICY_REF(lb_policy, "pick");
gpr_mu_unlock(&chand->mu_config);
calld->state = CALL_WAITING_FOR_PICK;
GPR_ASSERT(waiting_op->bind_pollset);
GPR_ASSERT(waiting_op->send_ops);
GPR_ASSERT(waiting_op->send_ops->nops >= 1);
GPR_ASSERT(waiting_op->send_ops->ops[0].type == GRPC_OP_METADATA);
gpr_mu_unlock(&calld->mu_state);
grpc_closure_init(&calld->async_setup_task, picked_target, calld);
grpc_lb_policy_pick(exec_ctx, lb_policy, bind_pollset,
initial_metadata, &calld->picked_channel,
&calld->async_setup_task);
GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "pick");
} else if (chand->resolver != NULL) {
calld->state = CALL_WAITING_FOR_CONFIG;
add_to_lb_policy_wait_queue_locked_state_config(elem);
if (!chand->started_resolving && chand->resolver != NULL) {
GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
chand->started_resolving = 1;
grpc_resolver_next(exec_ctx, chand->resolver,
&chand->incoming_configuration,
&chand->on_config_changed);
}
gpr_mu_unlock(&chand->mu_config);
gpr_mu_unlock(&calld->mu_state);
} else {
calld->state = CALL_CANCELLED;
gpr_mu_unlock(&chand->mu_config);
gpr_mu_unlock(&calld->mu_state);
handle_op_after_cancellation(exec_ctx, elem, op);
}
}
}
break;
}
GPR_TIMER_END("perform_transport_stream_op", 0);
} }
static void cc_start_transport_stream_op(grpc_exec_ctx *exec_ctx, static void cc_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem, grpc_call_element *elem,
grpc_transport_stream_op *op) { grpc_transport_stream_op *op) {
perform_transport_stream_op(exec_ctx, elem, op, 0); GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
grpc_subchannel_call_holder_perform_op(exec_ctx, elem->call_data, op);
} }
static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand, static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand,
@@ -593,11 +256,9 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
op->connectivity_state = NULL; op->connectivity_state = NULL;
} }
if (!is_empty(op, sizeof(*op))) { lb_policy = chand->lb_policy;
lb_policy = chand->lb_policy; if (lb_policy) {
if (lb_policy) { GRPC_LB_POLICY_REF(lb_policy, "broadcast");
GRPC_LB_POLICY_REF(lb_policy, "broadcast");
}
} }
if (op->disconnect && chand->resolver != NULL) { if (op->disconnect && chand->resolver != NULL) {
@@ -624,67 +285,110 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
} }
} }
/* Constructor for call_data */ typedef struct {
static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_metadata_batch *initial_metadata;
const void *server_transport_data, grpc_subchannel **subchannel;
grpc_transport_stream_op *initial_op) { grpc_closure *on_ready;
grpc_call_element *elem;
grpc_closure closure;
} continue_picking_args;
static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *arg,
grpc_metadata_batch *initial_metadata,
grpc_subchannel **subchannel,
grpc_closure *on_ready);
static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg, int success) {
continue_picking_args *cpa = arg;
if (!success) {
grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, 0);
} else if (cpa->subchannel == NULL) {
/* cancelled, do nothing */
} else if (cc_pick_subchannel(exec_ctx, cpa->elem, cpa->initial_metadata,
cpa->subchannel, cpa->on_ready)) {
grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, 1);
}
gpr_free(cpa);
}
static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp,
grpc_metadata_batch *initial_metadata,
grpc_subchannel **subchannel,
grpc_closure *on_ready) {
grpc_call_element *elem = elemp;
channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data; call_data *calld = elem->call_data;
continue_picking_args *cpa;
grpc_closure *closure;
/* TODO(ctiller): is there something useful we can do here? */ GPR_ASSERT(subchannel);
GPR_ASSERT(initial_op == NULL);
GPR_ASSERT(elem->filter == &grpc_client_channel_filter); gpr_mu_lock(&chand->mu_config);
GPR_ASSERT(server_transport_data == NULL); if (initial_metadata == NULL) {
gpr_mu_init(&calld->mu_state); if (chand->lb_policy != NULL) {
calld->elem = elem; grpc_lb_policy_cancel_pick(exec_ctx, chand->lb_policy, subchannel);
calld->state = CALL_CREATED; }
calld->deadline = gpr_inf_future(GPR_CLOCK_REALTIME); for (closure = chand->waiting_for_config_closures.head; closure != NULL;
closure = grpc_closure_next(closure)) {
cpa = closure->cb_arg;
if (cpa->subchannel == subchannel) {
cpa->subchannel = NULL;
grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, 0);
}
}
gpr_mu_unlock(&chand->mu_config);
return 1;
}
if (chand->lb_policy != NULL) {
int r = grpc_lb_policy_pick(exec_ctx, chand->lb_policy, calld->pollset,
initial_metadata, subchannel, on_ready);
gpr_mu_unlock(&chand->mu_config);
return r;
}
if (chand->resolver != NULL && !chand->started_resolving) {
chand->started_resolving = 1;
GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
grpc_resolver_next(exec_ctx, chand->resolver,
&chand->incoming_configuration,
&chand->on_config_changed);
}
cpa = gpr_malloc(sizeof(*cpa));
cpa->initial_metadata = initial_metadata;
cpa->subchannel = subchannel;
cpa->on_ready = on_ready;
cpa->elem = elem;
grpc_closure_init(&cpa->closure, continue_picking, cpa);
grpc_closure_list_add(&chand->waiting_for_config_closures, &cpa->closure, 1);
gpr_mu_unlock(&chand->mu_config);
return 0;
}
/* Constructor for call_data */
static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_call_element_args *args) {
grpc_subchannel_call_holder_init(elem->call_data, cc_pick_subchannel, elem);
} }
/* Destructor for call_data */ /* Destructor for call_data */
static void destroy_call_elem(grpc_exec_ctx *exec_ctx, static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem) { grpc_call_element *elem) {
call_data *calld = elem->call_data; grpc_subchannel_call_holder_destroy(exec_ctx, elem->call_data);
grpc_subchannel_call *subchannel_call;
/* if the call got activated, we need to destroy the child stack also, and
remove it from the in-flight requests tracked by the child_entry we
picked */
gpr_mu_lock(&calld->mu_state);
switch (calld->state) {
case CALL_ACTIVE:
subchannel_call = calld->subchannel_call;
gpr_mu_unlock(&calld->mu_state);
GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, subchannel_call, "client_channel");
break;
case CALL_CREATED:
case CALL_CANCELLED:
gpr_mu_unlock(&calld->mu_state);
break;
case CALL_WAITING_FOR_PICK:
case CALL_WAITING_FOR_CONFIG:
case CALL_WAITING_FOR_CALL:
case CALL_WAITING_FOR_SEND:
GPR_UNREACHABLE_CODE(return );
}
} }
/* Constructor for channel_data */ /* Constructor for channel_data */
static void init_channel_elem(grpc_exec_ctx *exec_ctx, static void init_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem, grpc_channel *master, grpc_channel_element *elem,
const grpc_channel_args *args, grpc_channel_element_args *args) {
grpc_mdctx *metadata_context, int is_first,
int is_last) {
channel_data *chand = elem->channel_data; channel_data *chand = elem->channel_data;
memset(chand, 0, sizeof(*chand)); memset(chand, 0, sizeof(*chand));
GPR_ASSERT(is_last); GPR_ASSERT(args->is_last);
GPR_ASSERT(elem->filter == &grpc_client_channel_filter); GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
gpr_mu_init(&chand->mu_config); gpr_mu_init(&chand->mu_config);
chand->mdctx = metadata_context; chand->mdctx = args->metadata_context;
chand->master = master; chand->master = args->master;
grpc_pollset_set_init(&chand->pollset_set); grpc_pollset_set_init(&chand->pollset_set);
grpc_closure_init(&chand->on_config_changed, cc_on_config_changed, chand); grpc_closure_init(&chand->on_config_changed, cc_on_config_changed, chand);
@@ -709,10 +413,16 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
gpr_mu_destroy(&chand->mu_config); gpr_mu_destroy(&chand->mu_config);
} }
static void cc_set_pollset(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_pollset *pollset) {
call_data *calld = elem->call_data;
calld->pollset = pollset;
}
const grpc_channel_filter grpc_client_channel_filter = { const grpc_channel_filter grpc_client_channel_filter = {
cc_start_transport_stream_op, cc_start_transport_op, sizeof(call_data), cc_start_transport_stream_op, cc_start_transport_op, sizeof(call_data),
init_call_elem, destroy_call_elem, sizeof(channel_data), init_channel_elem, init_call_elem, cc_set_pollset, destroy_call_elem, sizeof(channel_data),
destroy_channel_elem, cc_get_peer, "client-channel", init_channel_elem, destroy_channel_elem, cc_get_peer, "client-channel",
}; };
void grpc_client_channel_set_resolver(grpc_exec_ctx *exec_ctx, void grpc_client_channel_set_resolver(grpc_exec_ctx *exec_ctx,
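
The net effect on client_channel is that all of the per-call state machinery now lives in grpc_subchannel_call_holder; the filter only supplies a pick callback and a pollset. Judging from cc_pick_subchannel above and the uchannel variant below, the callback contract is: return nonzero when *subchannel is decided synchronously, return zero and schedule on_ready otherwise, and treat a NULL initial_metadata as a request to cancel an outstanding pick. The simplest possible implementation looks roughly like this (sketch; my_channel_data and pick_fixed_subchannel are illustrative names):

static int pick_fixed_subchannel(grpc_exec_ctx *exec_ctx, void *arg,
                                 grpc_metadata_batch *initial_metadata,
                                 grpc_subchannel **subchannel,
                                 grpc_closure *on_ready) {
  my_channel_data *chand = arg;  /* whatever was passed to ..._holder_init */
  if (initial_metadata == NULL) {
    return 1;  /* nothing asynchronous to cancel for a fixed pick */
  }
  *subchannel = chand->subchannel;  /* always pick the one wrapped subchannel */
  return 1;                         /* decided synchronously */
}

A constructor would then register it with grpc_subchannel_call_holder_init(elem->call_data, pick_fixed_subchannel, chand), much as the uchannel constructor below does.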

@@ -39,6 +39,7 @@
#include "src/core/channel/channel_args.h" #include "src/core/channel/channel_args.h"
#include "src/core/channel/client_channel.h" #include "src/core/channel/client_channel.h"
#include "src/core/channel/compress_filter.h" #include "src/core/channel/compress_filter.h"
#include "src/core/channel/subchannel_call_holder.h"
#include "src/core/iomgr/iomgr.h" #include "src/core/iomgr/iomgr.h"
#include "src/core/support/string.h" #include "src/core/support/string.h"
#include "src/core/surface/channel.h" #include "src/core/surface/channel.h"
@@ -52,8 +53,6 @@
/** Microchannel (uchannel) implementation: a lightweight channel without any /** Microchannel (uchannel) implementation: a lightweight channel without any
* load-balancing mechanisms meant for communication from within the core. */ * load-balancing mechanisms meant for communication from within the core. */
typedef struct call_data call_data;
typedef struct client_uchannel_channel_data { typedef struct client_uchannel_channel_data {
/** metadata context for this channel */ /** metadata context for this channel */
grpc_mdctx *mdctx; grpc_mdctx *mdctx;
@@ -80,85 +79,7 @@ typedef struct client_uchannel_channel_data {
gpr_mu mu_state; gpr_mu mu_state;
} channel_data; } channel_data;
typedef enum { typedef grpc_subchannel_call_holder call_data;
CALL_CREATED,
CALL_WAITING_FOR_SEND,
CALL_WAITING_FOR_CALL,
CALL_ACTIVE,
CALL_CANCELLED
} call_state;
struct call_data {
/* owning element */
grpc_call_element *elem;
gpr_mu mu_state;
call_state state;
gpr_timespec deadline;
grpc_closure async_setup_task;
grpc_transport_stream_op waiting_op;
/* our child call stack */
grpc_subchannel_call *subchannel_call;
grpc_linked_mdelem status;
grpc_linked_mdelem details;
};
static grpc_closure *merge_into_waiting_op(grpc_call_element *elem,
grpc_transport_stream_op *new_op)
GRPC_MUST_USE_RESULT;
static void handle_op_after_cancellation(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_transport_stream_op *op) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
if (op->send_ops) {
grpc_stream_ops_unref_owned_objects(op->send_ops->ops, op->send_ops->nops);
op->on_done_send->cb(exec_ctx, op->on_done_send->cb_arg, 0);
}
if (op->recv_ops) {
char status[GPR_LTOA_MIN_BUFSIZE];
grpc_metadata_batch mdb;
gpr_ltoa(GRPC_STATUS_CANCELLED, status);
calld->status.md =
grpc_mdelem_from_strings(chand->mdctx, "grpc-status", status);
calld->details.md =
grpc_mdelem_from_strings(chand->mdctx, "grpc-message", "Cancelled");
calld->status.prev = calld->details.next = NULL;
calld->status.next = &calld->details;
calld->details.prev = &calld->status;
mdb.list.head = &calld->status;
mdb.list.tail = &calld->details;
mdb.garbage.head = mdb.garbage.tail = NULL;
mdb.deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
grpc_sopb_add_metadata(op->recv_ops, mdb);
*op->recv_state = GRPC_STREAM_CLOSED;
op->on_done_recv->cb(exec_ctx, op->on_done_recv->cb_arg, 1);
}
if (op->on_consumed) {
op->on_consumed->cb(exec_ctx, op->on_consumed->cb_arg, 0);
}
}
typedef struct {
grpc_closure closure;
grpc_call_element *elem;
} waiting_call;
static void perform_transport_stream_op(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_transport_stream_op *op,
int continuation);
static int is_empty(void *p, int len) {
char *ptr = p;
int i;
for (i = 0; i < len; i++) {
if (ptr[i] != 0) return 0;
}
return 1;
}
static void monitor_subchannel(grpc_exec_ctx *exec_ctx, void *arg, static void monitor_subchannel(grpc_exec_ctx *exec_ctx, void *arg,
int iomgr_success) { int iomgr_success) {
@@ -171,201 +92,17 @@ static void monitor_subchannel(grpc_exec_ctx *exec_ctx, void *arg,
&chand->connectivity_cb); &chand->connectivity_cb);
} }
static void started_call_locked(grpc_exec_ctx *exec_ctx, void *arg,
int iomgr_success) {
call_data *calld = arg;
grpc_transport_stream_op op;
int have_waiting;
if (calld->state == CALL_CANCELLED && iomgr_success == 0) {
have_waiting = !is_empty(&calld->waiting_op, sizeof(calld->waiting_op));
gpr_mu_unlock(&calld->mu_state);
if (have_waiting) {
handle_op_after_cancellation(exec_ctx, calld->elem, &calld->waiting_op);
}
} else if (calld->state == CALL_CANCELLED && calld->subchannel_call != NULL) {
memset(&op, 0, sizeof(op));
op.cancel_with_status = GRPC_STATUS_CANCELLED;
gpr_mu_unlock(&calld->mu_state);
grpc_subchannel_call_process_op(exec_ctx, calld->subchannel_call, &op);
} else if (calld->state == CALL_WAITING_FOR_CALL) {
have_waiting = !is_empty(&calld->waiting_op, sizeof(calld->waiting_op));
if (calld->subchannel_call != NULL) {
calld->state = CALL_ACTIVE;
gpr_mu_unlock(&calld->mu_state);
if (have_waiting) {
grpc_subchannel_call_process_op(exec_ctx, calld->subchannel_call,
&calld->waiting_op);
}
} else {
calld->state = CALL_CANCELLED;
gpr_mu_unlock(&calld->mu_state);
if (have_waiting) {
handle_op_after_cancellation(exec_ctx, calld->elem, &calld->waiting_op);
}
}
} else {
GPR_ASSERT(calld->state == CALL_CANCELLED);
gpr_mu_unlock(&calld->mu_state);
have_waiting = !is_empty(&calld->waiting_op, sizeof(calld->waiting_op));
if (have_waiting) {
handle_op_after_cancellation(exec_ctx, calld->elem, &calld->waiting_op);
}
}
}
static void started_call(grpc_exec_ctx *exec_ctx, void *arg,
int iomgr_success) {
call_data *calld = arg;
gpr_mu_lock(&calld->mu_state);
started_call_locked(exec_ctx, arg, iomgr_success);
}
static grpc_closure *merge_into_waiting_op(grpc_call_element *elem,
grpc_transport_stream_op *new_op) {
call_data *calld = elem->call_data;
grpc_closure *consumed_op = NULL;
grpc_transport_stream_op *waiting_op = &calld->waiting_op;
GPR_ASSERT((waiting_op->send_ops != NULL) + (new_op->send_ops != NULL) <= 1);
GPR_ASSERT((waiting_op->recv_ops != NULL) + (new_op->recv_ops != NULL) <= 1);
if (new_op->send_ops != NULL) {
waiting_op->send_ops = new_op->send_ops;
waiting_op->is_last_send = new_op->is_last_send;
waiting_op->on_done_send = new_op->on_done_send;
}
if (new_op->recv_ops != NULL) {
waiting_op->recv_ops = new_op->recv_ops;
waiting_op->recv_state = new_op->recv_state;
waiting_op->on_done_recv = new_op->on_done_recv;
}
if (new_op->on_consumed != NULL) {
if (waiting_op->on_consumed != NULL) {
consumed_op = waiting_op->on_consumed;
}
waiting_op->on_consumed = new_op->on_consumed;
}
if (new_op->cancel_with_status != GRPC_STATUS_OK) {
waiting_op->cancel_with_status = new_op->cancel_with_status;
}
return consumed_op;
}
static char *cuc_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { static char *cuc_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
grpc_subchannel_call *subchannel_call;
char *result;
gpr_mu_lock(&calld->mu_state);
if (calld->state == CALL_ACTIVE) {
subchannel_call = calld->subchannel_call;
GRPC_SUBCHANNEL_CALL_REF(subchannel_call, "get_peer");
gpr_mu_unlock(&calld->mu_state);
result = grpc_subchannel_call_get_peer(exec_ctx, subchannel_call);
GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, subchannel_call, "get_peer");
return result;
} else {
gpr_mu_unlock(&calld->mu_state);
return grpc_channel_get_target(chand->master);
}
}
static void perform_transport_stream_op(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_transport_stream_op *op,
int continuation) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data; channel_data *chand = elem->channel_data;
grpc_subchannel_call *subchannel_call; return grpc_subchannel_call_holder_get_peer(exec_ctx, elem->call_data,
grpc_transport_stream_op op2; chand->master);
GPR_ASSERT(elem->filter == &grpc_client_uchannel_filter);
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
gpr_mu_lock(&calld->mu_state);
/* make sure the wrapped subchannel has been set (see
* grpc_client_uchannel_set_subchannel) */
GPR_ASSERT(chand->subchannel != NULL);
switch (calld->state) {
case CALL_ACTIVE:
GPR_ASSERT(!continuation);
subchannel_call = calld->subchannel_call;
gpr_mu_unlock(&calld->mu_state);
grpc_subchannel_call_process_op(exec_ctx, subchannel_call, op);
break;
case CALL_CANCELLED:
gpr_mu_unlock(&calld->mu_state);
handle_op_after_cancellation(exec_ctx, elem, op);
break;
case CALL_WAITING_FOR_SEND:
GPR_ASSERT(!continuation);
grpc_exec_ctx_enqueue(exec_ctx, merge_into_waiting_op(elem, op), 1);
if (!calld->waiting_op.send_ops &&
calld->waiting_op.cancel_with_status == GRPC_STATUS_OK) {
gpr_mu_unlock(&calld->mu_state);
break;
}
*op = calld->waiting_op;
memset(&calld->waiting_op, 0, sizeof(calld->waiting_op));
continuation = 1;
/* fall through */
case CALL_WAITING_FOR_CALL:
if (!continuation) {
if (op->cancel_with_status != GRPC_STATUS_OK) {
calld->state = CALL_CANCELLED;
op2 = calld->waiting_op;
memset(&calld->waiting_op, 0, sizeof(calld->waiting_op));
if (op->on_consumed) {
calld->waiting_op.on_consumed = op->on_consumed;
op->on_consumed = NULL;
} else if (op2.on_consumed) {
calld->waiting_op.on_consumed = op2.on_consumed;
op2.on_consumed = NULL;
}
gpr_mu_unlock(&calld->mu_state);
handle_op_after_cancellation(exec_ctx, elem, op);
handle_op_after_cancellation(exec_ctx, elem, &op2);
grpc_subchannel_cancel_waiting_call(exec_ctx, chand->subchannel, 1);
} else {
grpc_exec_ctx_enqueue(exec_ctx, merge_into_waiting_op(elem, op), 1);
gpr_mu_unlock(&calld->mu_state);
}
break;
}
/* fall through */
case CALL_CREATED:
if (op->cancel_with_status != GRPC_STATUS_OK) {
calld->state = CALL_CANCELLED;
gpr_mu_unlock(&calld->mu_state);
handle_op_after_cancellation(exec_ctx, elem, op);
} else {
calld->waiting_op = *op;
if (op->send_ops == NULL) {
calld->state = CALL_WAITING_FOR_SEND;
gpr_mu_unlock(&calld->mu_state);
} else {
grpc_subchannel_call_create_status call_creation_status;
grpc_pollset *pollset = calld->waiting_op.bind_pollset;
calld->state = CALL_WAITING_FOR_CALL;
grpc_closure_init(&calld->async_setup_task, started_call, calld);
call_creation_status = grpc_subchannel_create_call(
exec_ctx, chand->subchannel, pollset, &calld->subchannel_call,
&calld->async_setup_task);
if (call_creation_status == GRPC_SUBCHANNEL_CALL_CREATE_READY) {
started_call_locked(exec_ctx, calld, 1);
} else {
gpr_mu_unlock(&calld->mu_state);
}
}
}
break;
}
} }
static void cuc_start_transport_stream_op(grpc_exec_ctx *exec_ctx, static void cuc_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem, grpc_call_element *elem,
grpc_transport_stream_op *op) { grpc_transport_stream_op *op) {
perform_transport_stream_op(exec_ctx, elem, op, 0); GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
grpc_subchannel_call_holder_perform_op(exec_ctx, elem->call_data, op);
} }
static void cuc_start_transport_op(grpc_exec_ctx *exec_ctx, static void cuc_start_transport_op(grpc_exec_ctx *exec_ctx,
@@ -392,64 +129,40 @@ static void cuc_start_transport_op(grpc_exec_ctx *exec_ctx,
} }
} }
static int cuc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *arg,
grpc_metadata_batch *initial_metadata,
grpc_subchannel **subchannel,
grpc_closure *on_ready) {
channel_data *chand = arg;
GPR_ASSERT(initial_metadata != NULL);
*subchannel = chand->subchannel;
return 1;
}
/* Constructor for call_data */ /* Constructor for call_data */
static void cuc_init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, static void cuc_init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
const void *server_transport_data, grpc_call_element_args *args) {
grpc_transport_stream_op *initial_op) { grpc_subchannel_call_holder_init(elem->call_data, cuc_pick_subchannel,
call_data *calld = elem->call_data; elem->channel_data);
memset(calld, 0, sizeof(call_data));
/* TODO(ctiller): is there something useful we can do here? */
GPR_ASSERT(initial_op == NULL);
GPR_ASSERT(elem->filter == &grpc_client_uchannel_filter);
GPR_ASSERT(server_transport_data == NULL);
gpr_mu_init(&calld->mu_state);
calld->elem = elem;
calld->state = CALL_CREATED;
calld->deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
} }
/* Destructor for call_data */ /* Destructor for call_data */
static void cuc_destroy_call_elem(grpc_exec_ctx *exec_ctx, static void cuc_destroy_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem) { grpc_call_element *elem) {
call_data *calld = elem->call_data; grpc_subchannel_call_holder_destroy(exec_ctx, elem->call_data);
grpc_subchannel_call *subchannel_call;
/* if the call got activated, we need to destroy the child stack also, and
remove it from the in-flight requests tracked by the child_entry we
picked */
gpr_mu_lock(&calld->mu_state);
switch (calld->state) {
case CALL_ACTIVE:
subchannel_call = calld->subchannel_call;
gpr_mu_unlock(&calld->mu_state);
GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, subchannel_call, "client_uchannel");
break;
case CALL_CREATED:
case CALL_CANCELLED:
gpr_mu_unlock(&calld->mu_state);
break;
case CALL_WAITING_FOR_CALL:
case CALL_WAITING_FOR_SEND:
GPR_UNREACHABLE_CODE(return );
}
} }
/* Constructor for channel_data */ /* Constructor for channel_data */
static void cuc_init_channel_elem(grpc_exec_ctx *exec_ctx, static void cuc_init_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem, grpc_channel_element *elem,
grpc_channel *master, grpc_channel_element_args *args) {
const grpc_channel_args *args,
grpc_mdctx *metadata_context, int is_first,
int is_last) {
channel_data *chand = elem->channel_data; channel_data *chand = elem->channel_data;
memset(chand, 0, sizeof(*chand)); memset(chand, 0, sizeof(*chand));
grpc_closure_init(&chand->connectivity_cb, monitor_subchannel, chand); grpc_closure_init(&chand->connectivity_cb, monitor_subchannel, chand);
GPR_ASSERT(is_last); GPR_ASSERT(args->is_last);
GPR_ASSERT(elem->filter == &grpc_client_uchannel_filter); GPR_ASSERT(elem->filter == &grpc_client_uchannel_filter);
chand->mdctx = metadata_context; chand->mdctx = args->metadata_context;
chand->master = master; chand->master = args->master;
grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE, grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE,
"client_uchannel"); "client_uchannel");
gpr_mu_init(&chand->mu_state); gpr_mu_init(&chand->mu_state);
@@ -465,17 +178,17 @@ static void cuc_destroy_channel_elem(grpc_exec_ctx *exec_ctx,
gpr_mu_destroy(&chand->mu_state); gpr_mu_destroy(&chand->mu_state);
} }
static void cuc_set_pollset(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_pollset *pollset) {
call_data *calld = elem->call_data;
calld->pollset = pollset;
}
const grpc_channel_filter grpc_client_uchannel_filter = { const grpc_channel_filter grpc_client_uchannel_filter = {
cuc_start_transport_stream_op, cuc_start_transport_stream_op, cuc_start_transport_op, sizeof(call_data),
cuc_start_transport_op, cuc_init_call_elem, cuc_set_pollset, cuc_destroy_call_elem,
sizeof(call_data), sizeof(channel_data), cuc_init_channel_elem, cuc_destroy_channel_elem,
cuc_init_call_elem, cuc_get_peer, "client-uchannel",
cuc_destroy_call_elem,
sizeof(channel_data),
cuc_init_channel_elem,
cuc_destroy_channel_elem,
cuc_get_peer,
"client-uchannel",
}; };
grpc_connectivity_state grpc_client_uchannel_check_connectivity_state( grpc_connectivity_state grpc_client_uchannel_check_connectivity_state(

@@ -50,13 +50,20 @@ typedef struct call_data {
grpc_linked_mdelem compression_algorithm_storage; grpc_linked_mdelem compression_algorithm_storage;
grpc_linked_mdelem accept_encoding_storage; grpc_linked_mdelem accept_encoding_storage;
gpr_uint32 remaining_slice_bytes; gpr_uint32 remaining_slice_bytes;
/**< Input data to be read, as per BEGIN_MESSAGE */
int written_initial_metadata; /**< Already processed initial md? */
/** Compression algorithm we'll try to use. It may be given by incoming /** Compression algorithm we'll try to use. It may be given by incoming
* metadata, or by the channel's default compression settings. */ * metadata, or by the channel's default compression settings. */
grpc_compression_algorithm compression_algorithm; grpc_compression_algorithm compression_algorithm;
/** If true, contents of \a compression_algorithm are authoritative */ /** If true, contents of \a compression_algorithm are authoritative */
int has_compression_algorithm; int has_compression_algorithm;
grpc_transport_stream_op send_op;
gpr_uint32 send_length;
gpr_uint32 send_flags;
gpr_slice incoming_slice;
grpc_slice_buffer_stream replacement_stream;
grpc_closure *post_send;
grpc_closure send_done;
grpc_closure got_slice;
} call_data; } call_data;
typedef struct channel_data { typedef struct channel_data {
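
The new fields replace the sopb-rewriting path removed below: the filter now buffers the outgoing slices, compresses them once, and sends them back out as a replacement stream, with send_flags carrying GRPC_WRITE_INTERNAL_COMPRESS when compression actually happened. The compression step presumably reduces to something like this (sketch; finish_compression is an illustrative name, not code from this commit):

static void finish_compression(call_data *calld) {
  gpr_slice_buffer tmp;
  gpr_slice_buffer_init(&tmp);
  /* compress the buffered message into tmp; keep it only if it paid off */
  if (grpc_msg_compress(calld->compression_algorithm, &calld->slices, &tmp)) {
    gpr_slice_buffer_swap(&calld->slices, &tmp);
    calld->send_flags |= GRPC_WRITE_INTERNAL_COMPRESS;
  }
  gpr_slice_buffer_destroy(&tmp);
}
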
@@ -76,24 +83,6 @@ typedef struct channel_data {
grpc_compression_options compression_options; grpc_compression_options compression_options;
} channel_data; } channel_data;
/** Compress \a slices in place using \a algorithm. Returns 1 if compression did
* actually happen, 0 otherwise (for example if the compressed output size was
* larger than the raw input).
*
* Returns 1 if the data was actually compress and 0 otherwise. */
static int compress_send_sb(grpc_compression_algorithm algorithm,
gpr_slice_buffer *slices) {
int did_compress;
gpr_slice_buffer tmp;
gpr_slice_buffer_init(&tmp);
did_compress = grpc_msg_compress(algorithm, slices, &tmp);
if (did_compress) {
gpr_slice_buffer_swap(slices, &tmp);
}
gpr_slice_buffer_destroy(&tmp);
return did_compress;
}
/** For each \a md element from the incoming metadata, filter out the entry for /** For each \a md element from the incoming metadata, filter out the entry for
* "grpc-encoding", using its value to populate the call data's * "grpc-encoding", using its value to populate the call data's
* compression_algorithm field. */ * compression_algorithm field. */
@@ -127,7 +116,9 @@ static grpc_mdelem *compression_md_filter(void *user_data, grpc_mdelem *md) {
return md; return md;
} }
static int skip_compression(channel_data *channeld, call_data *calld) { static int skip_compression(grpc_call_element *elem) {
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
if (calld->has_compression_algorithm) { if (calld->has_compression_algorithm) {
if (calld->compression_algorithm == GRPC_COMPRESS_NONE) { if (calld->compression_algorithm == GRPC_COMPRESS_NONE) {
return 1; return 1;
@@ -138,169 +129,127 @@ static int skip_compression(channel_data *channeld, call_data *calld) {
return channeld->default_compression_algorithm == GRPC_COMPRESS_NONE; return channeld->default_compression_algorithm == GRPC_COMPRESS_NONE;
} }
- /** Assembles a new grpc_stream_op_buffer with the compressed slices, modifying
-  * the associated GRPC_OP_BEGIN_MESSAGE accordingly (new compressed length,
-  * flags indicating compression is in effect) and replaces \a send_ops with it.
-  * */
- static void finish_compressed_sopb(grpc_stream_op_buffer *send_ops,
-                                    grpc_call_element *elem) {
-   size_t i;
-   call_data *calld = elem->call_data;
-   int new_slices_added = 0; /* GPR_FALSE */
-   grpc_metadata_batch metadata;
-   grpc_stream_op_buffer new_send_ops;
-   grpc_sopb_init(&new_send_ops);
-   for (i = 0; i < send_ops->nops; i++) {
-     grpc_stream_op *sop = &send_ops->ops[i];
-     switch (sop->type) {
-       case GRPC_OP_BEGIN_MESSAGE:
-         GPR_ASSERT(calld->slices.length <= GPR_UINT32_MAX);
-         grpc_sopb_add_begin_message(
-             &new_send_ops, (gpr_uint32)calld->slices.length,
-             sop->data.begin_message.flags | GRPC_WRITE_INTERNAL_COMPRESS);
-         break;
-       case GRPC_OP_SLICE:
-         /* Once we reach the slices section of the original buffer, simply add
-          * all the new (compressed) slices. We obviously want to do this only
-          * once, hence the "new_slices_added" guard. */
-         if (!new_slices_added) {
-           size_t j;
-           for (j = 0; j < calld->slices.count; ++j) {
-             grpc_sopb_add_slice(&new_send_ops,
-                                 gpr_slice_ref(calld->slices.slices[j]));
-           }
-           new_slices_added = 1; /* GPR_TRUE */
-         }
-         break;
-       case GRPC_OP_METADATA:
-         /* move the metadata to the new buffer. */
-         grpc_metadata_batch_move(&metadata, &sop->data.metadata);
-         grpc_sopb_add_metadata(&new_send_ops, metadata);
-         break;
-       case GRPC_NO_OP:
-         break;
-     }
-   }
-   grpc_sopb_swap(send_ops, &new_send_ops);
-   grpc_sopb_destroy(&new_send_ops);
- }
+ /** Filter initial metadata */
+ static void process_send_initial_metadata(
+     grpc_call_element *elem, grpc_metadata_batch *initial_metadata) {
+   call_data *calld = elem->call_data;
+   channel_data *channeld = elem->channel_data;
+   /* Parse incoming request for compression. If any, it'll be available
+    * at calld->compression_algorithm */
+   grpc_metadata_batch_filter(initial_metadata, compression_md_filter, elem);
+   if (!calld->has_compression_algorithm) {
+     /* If no algorithm was found in the metadata and we aren't
+      * exceptionally skipping compression, fall back to the channel
+      * default */
+     calld->compression_algorithm = channeld->default_compression_algorithm;
+     calld->has_compression_algorithm = 1; /* GPR_TRUE */
+   }
+   /* hint compression algorithm */
+   grpc_metadata_batch_add_tail(
+       initial_metadata, &calld->compression_algorithm_storage,
+       GRPC_MDELEM_REF(
+           channeld
+               ->mdelem_compression_algorithms[calld->compression_algorithm]));
+   /* convey supported compression algorithms */
+   grpc_metadata_batch_add_tail(
+       initial_metadata, &calld->accept_encoding_storage,
+       GRPC_MDELEM_REF(channeld->mdelem_accept_encoding));
+ }
- /** Filter's "main" function, called for any incoming grpc_transport_stream_op
-  * instance that holds a non-zero number of send operations, accesible to this
-  * function in \a send_ops. */
- static void process_send_ops(grpc_call_element *elem,
-                              grpc_stream_op_buffer *send_ops) {
-   call_data *calld = elem->call_data;
-   channel_data *channeld = elem->channel_data;
-   size_t i;
-   int did_compress = 0;
-   /* In streaming calls, we need to reset the previously accumulated slices */
-   gpr_slice_buffer_reset_and_unref(&calld->slices);
-   for (i = 0; i < send_ops->nops; ++i) {
-     grpc_stream_op *sop = &send_ops->ops[i];
-     switch (sop->type) {
-       case GRPC_OP_BEGIN_MESSAGE:
-         /* buffer up slices until we've processed all the expected ones (as
-          * given by GRPC_OP_BEGIN_MESSAGE) */
-         calld->remaining_slice_bytes = sop->data.begin_message.length;
-         if (sop->data.begin_message.flags & GRPC_WRITE_NO_COMPRESS) {
-           calld->has_compression_algorithm = 1; /* GPR_TRUE */
-           calld->compression_algorithm = GRPC_COMPRESS_NONE;
-         }
-         break;
-       case GRPC_OP_METADATA:
-         if (!calld->written_initial_metadata) {
-           /* Parse incoming request for compression. If any, it'll be available
-            * at calld->compression_algorithm */
-           grpc_metadata_batch_filter(&(sop->data.metadata),
-                                      compression_md_filter, elem);
-           if (!calld->has_compression_algorithm) {
-             /* If no algorithm was found in the metadata and we aren't
-              * exceptionally skipping compression, fall back to the channel
-              * default */
-             calld->compression_algorithm =
-                 channeld->default_compression_algorithm;
-             calld->has_compression_algorithm = 1; /* GPR_TRUE */
-           }
-           /* hint compression algorithm */
-           grpc_metadata_batch_add_tail(
-               &(sop->data.metadata), &calld->compression_algorithm_storage,
-               GRPC_MDELEM_REF(channeld->mdelem_compression_algorithms
-                                   [calld->compression_algorithm]));
-           /* convey supported compression algorithms */
-           grpc_metadata_batch_add_tail(
-               &(sop->data.metadata), &calld->accept_encoding_storage,
-               GRPC_MDELEM_REF(channeld->mdelem_accept_encoding));
-           calld->written_initial_metadata = 1; /* GPR_TRUE */
-         }
-         break;
-       case GRPC_OP_SLICE:
-         if (skip_compression(channeld, calld)) continue;
-         GPR_ASSERT(calld->remaining_slice_bytes > 0);
-         /* Increase input ref count, gpr_slice_buffer_add takes ownership. */
-         gpr_slice_buffer_add(&calld->slices, gpr_slice_ref(sop->data.slice));
-         GPR_ASSERT(GPR_SLICE_LENGTH(sop->data.slice) <=
-                    calld->remaining_slice_bytes);
-         calld->remaining_slice_bytes -=
-             (gpr_uint32)GPR_SLICE_LENGTH(sop->data.slice);
-         if (calld->remaining_slice_bytes == 0) {
-           did_compress =
-               compress_send_sb(calld->compression_algorithm, &calld->slices);
-         }
-         break;
-       case GRPC_NO_OP:
-         break;
-     }
-   }
-   /* Modify the send_ops stream_op_buffer depending on whether compression was
-    * carried out */
-   if (did_compress) {
-     finish_compressed_sopb(send_ops, elem);
-   }
- }
+ static void continue_send_message(grpc_exec_ctx *exec_ctx,
+                                   grpc_call_element *elem);
+
+ static void send_done(grpc_exec_ctx *exec_ctx, void *elemp, int success) {
+   grpc_call_element *elem = elemp;
+   call_data *calld = elem->call_data;
+   gpr_slice_buffer_reset_and_unref(&calld->slices);
+   calld->post_send->cb(exec_ctx, calld->post_send->cb_arg, success);
+ }
+
+ static void finish_send_message(grpc_exec_ctx *exec_ctx,
+                                 grpc_call_element *elem) {
+   call_data *calld = elem->call_data;
+   int did_compress;
+   gpr_slice_buffer tmp;
+   gpr_slice_buffer_init(&tmp);
+   did_compress =
+       grpc_msg_compress(calld->compression_algorithm, &calld->slices, &tmp);
+   if (did_compress) {
+     gpr_slice_buffer_swap(&calld->slices, &tmp);
+     calld->send_flags |= GRPC_WRITE_INTERNAL_COMPRESS;
+   }
+   gpr_slice_buffer_destroy(&tmp);
+   grpc_slice_buffer_stream_init(&calld->replacement_stream, &calld->slices,
+                                 calld->send_flags);
+   calld->send_op.send_message = &calld->replacement_stream.base;
+   calld->post_send = calld->send_op.on_complete;
+   calld->send_op.on_complete = &calld->send_done;
+   grpc_call_next_op(exec_ctx, elem, &calld->send_op);
+ }
+
+ static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, int success) {
+   grpc_call_element *elem = elemp;
+   call_data *calld = elem->call_data;
+   gpr_slice_buffer_add(&calld->slices, calld->incoming_slice);
+   if (calld->send_length == calld->slices.length) {
+     finish_send_message(exec_ctx, elem);
+   } else {
+     continue_send_message(exec_ctx, elem);
+   }
+ }
+
+ static void continue_send_message(grpc_exec_ctx *exec_ctx,
+                                   grpc_call_element *elem) {
+   call_data *calld = elem->call_data;
+   while (grpc_byte_stream_next(exec_ctx, calld->send_op.send_message,
+                                &calld->incoming_slice, ~(size_t)0,
+                                &calld->got_slice)) {
+     gpr_slice_buffer_add(&calld->slices, calld->incoming_slice);
+     if (calld->send_length == calld->slices.length) {
+       finish_send_message(exec_ctx, elem);
+       break;
+     }
+   }
+ }
- /* Called either:
-      - in response to an API call (or similar) from above, to send something
-      - a network event (or similar) from below, to receive something
-    op contains type and call direction information, in addition to the data
-    that is being sent or received. */
static void compress_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
                                               grpc_call_element *elem,
                                               grpc_transport_stream_op *op) {
+   call_data *calld = elem->call_data;
  GPR_TIMER_BEGIN("compress_start_transport_stream_op", 0);
-   if (op->send_ops && op->send_ops->nops > 0) {
-     process_send_ops(elem, op->send_ops);
+   if (op->send_initial_metadata) {
+     process_send_initial_metadata(elem, op->send_initial_metadata);
+   }
+   if (op->send_message != NULL && !skip_compression(elem) &&
+       0 == (op->send_message->flags & GRPC_WRITE_NO_COMPRESS)) {
+     calld->send_op = *op;
+     calld->send_length = op->send_message->length;
+     calld->send_flags = op->send_message->flags;
+     continue_send_message(exec_ctx, elem);
+   } else {
+     /* pass control down the stack */
+     grpc_call_next_op(exec_ctx, elem, op);
  }
  GPR_TIMER_END("compress_start_transport_stream_op", 0);
-   /* pass control down the stack */
-   grpc_call_next_op(exec_ctx, elem, op);
}

/* Constructor for call_data */
static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
-                            const void *server_transport_data,
-                            grpc_transport_stream_op *initial_op) {
+                            grpc_call_element_args *args) {
  /* grab pointers to our data from the call element */
  call_data *calld = elem->call_data;
  /* initialize members */
  gpr_slice_buffer_init(&calld->slices);
  calld->has_compression_algorithm = 0;
-   calld->written_initial_metadata = 0; /* GPR_FALSE */
-   if (initial_op) {
-     if (initial_op->send_ops && initial_op->send_ops->nops > 0) {
-       process_send_ops(elem, initial_op->send_ops);
-     }
-   }
+   grpc_closure_init(&calld->got_slice, got_slice, elem);
+   grpc_closure_init(&calld->send_done, send_done, elem);
}
/* Destructor for call_data */
@ -313,9 +262,8 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx,

/* Constructor for channel_data */
static void init_channel_elem(grpc_exec_ctx *exec_ctx,
-                               grpc_channel_element *elem, grpc_channel *master,
-                               const grpc_channel_args *args, grpc_mdctx *mdctx,
-                               int is_first, int is_last) {
+                               grpc_channel_element *elem,
+                               grpc_channel_element_args *args) {
  channel_data *channeld = elem->channel_data;
  grpc_compression_algorithm algo_idx;
  const char *supported_algorithms_names[GRPC_COMPRESS_ALGORITHMS_COUNT - 1];
@ -325,24 +273,25 @@ static void init_channel_elem(grpc_exec_ctx *exec_ctx,
  grpc_compression_options_init(&channeld->compression_options);
  channeld->compression_options.enabled_algorithms_bitset =
-       (gpr_uint32)grpc_channel_args_compression_algorithm_get_states(args);
+       (gpr_uint32)grpc_channel_args_compression_algorithm_get_states(
+           args->channel_args);
  channeld->default_compression_algorithm =
-       grpc_channel_args_get_compression_algorithm(args);
+       grpc_channel_args_get_compression_algorithm(args->channel_args);
  /* Make sure the default isn't disabled. */
  GPR_ASSERT(grpc_compression_options_is_algorithm_enabled(
      &channeld->compression_options, channeld->default_compression_algorithm));
  channeld->compression_options.default_compression_algorithm =
      channeld->default_compression_algorithm;
-   channeld->mdstr_request_compression_algorithm_key =
-       grpc_mdstr_from_string(mdctx, GRPC_COMPRESS_REQUEST_ALGORITHM_KEY);
+   channeld->mdstr_request_compression_algorithm_key = grpc_mdstr_from_string(
+       args->metadata_context, GRPC_COMPRESS_REQUEST_ALGORITHM_KEY);
  channeld->mdstr_outgoing_compression_algorithm_key =
-       grpc_mdstr_from_string(mdctx, "grpc-encoding");
+       grpc_mdstr_from_string(args->metadata_context, "grpc-encoding");
  channeld->mdstr_compression_capabilities_key =
-       grpc_mdstr_from_string(mdctx, "grpc-accept-encoding");
+       grpc_mdstr_from_string(args->metadata_context, "grpc-accept-encoding");
  for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; ++algo_idx) {
    char *algorithm_name;
@ -354,9 +303,9 @@ static void init_channel_elem(grpc_exec_ctx *exec_ctx,
    GPR_ASSERT(grpc_compression_algorithm_name(algo_idx, &algorithm_name) != 0);
    channeld->mdelem_compression_algorithms[algo_idx] =
        grpc_mdelem_from_metadata_strings(
-             mdctx,
+             args->metadata_context,
            GRPC_MDSTR_REF(channeld->mdstr_outgoing_compression_algorithm_key),
-             grpc_mdstr_from_string(mdctx, algorithm_name));
+             grpc_mdstr_from_string(args->metadata_context, algorithm_name));
    if (algo_idx > 0) {
      supported_algorithms_names[supported_algorithms_idx++] = algorithm_name;
    }
@ -369,11 +318,12 @@ static void init_channel_elem(grpc_exec_ctx *exec_ctx,
      &accept_encoding_str_len);
  channeld->mdelem_accept_encoding = grpc_mdelem_from_metadata_strings(
-       mdctx, GRPC_MDSTR_REF(channeld->mdstr_compression_capabilities_key),
-       grpc_mdstr_from_string(mdctx, accept_encoding_str));
+       args->metadata_context,
+       GRPC_MDSTR_REF(channeld->mdstr_compression_capabilities_key),
+       grpc_mdstr_from_string(args->metadata_context, accept_encoding_str));
  gpr_free(accept_encoding_str);
-   GPR_ASSERT(!is_last);
+   GPR_ASSERT(!args->is_last);
}

/* Destructor for channel data */
@ -393,5 +343,6 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
const grpc_channel_filter grpc_compress_filter = {
    compress_start_transport_stream_op, grpc_channel_next_op, sizeof(call_data),
-     init_call_elem, destroy_call_elem, sizeof(channel_data), init_channel_elem,
-     destroy_channel_elem, grpc_call_next_get_peer, "compress"};
+     init_call_elem, grpc_call_stack_ignore_set_pollset, destroy_call_elem,
+     sizeof(channel_data), init_channel_elem, destroy_channel_elem,
+     grpc_call_next_get_peer, "compress"};
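The rewritten filter keeps the old decision rule, compress into a scratch buffer and adopt the result only if it actually shrank, but now applies it to slices pulled incrementally from the send byte stream. A minimal, self-contained C sketch of that rule follows; the toy buffer type and toy run-length codec stand in for gpr_slice_buffer and grpc_msg_compress and are illustrative only, not gRPC APIs.

/* Illustrative sketch only: plain C, no gRPC types. */
#include <stdio.h>
#include <string.h>

typedef struct {
  unsigned char data[256];
  size_t length;
} toy_buffer;

/* Stand-in for grpc_msg_compress: run-length encode, and report success only
   if the encoded form is strictly smaller than the input. */
static int toy_compress(const toy_buffer *in, toy_buffer *out) {
  size_t i = 0;
  out->length = 0;
  while (i < in->length) {
    size_t run = 1;
    while (i + run < in->length && in->data[i + run] == in->data[i] &&
           run < 255) {
      run++;
    }
    if (out->length + 2 > sizeof(out->data)) return 0;
    out->data[out->length++] = (unsigned char)run;
    out->data[out->length++] = in->data[i];
    i += run;
  }
  return out->length < in->length;
}

/* Mirrors the did_compress / swap logic: adopt the compressed form (and set a
   flag analogous to GRPC_WRITE_INTERNAL_COMPRESS) only when compression helped. */
static int maybe_compress(toy_buffer *buf, int *flags) {
  toy_buffer tmp;
  if (toy_compress(buf, &tmp)) {
    *buf = tmp;      /* "swap" in the smaller representation */
    *flags |= 0x1;   /* hypothetical "internally compressed" flag */
    return 1;
  }
  return 0;          /* keep the original, uncompressed bytes */
}

int main(void) {
  toy_buffer b = {{0}, 0};
  int flags = 0;
  memset(b.data, 'a', 64);
  b.length = 64;
  printf("compressed=%d length=%zu flags=%d\n", maybe_compress(&b, &flags),
         b.length, flags);
  return 0;
}

As in finish_send_message above, the "internally compressed" flag is only raised on the path where the smaller form was adopted; otherwise the original bytes flow through untouched.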

@ -83,8 +83,7 @@ static void con_start_transport_op(grpc_exec_ctx *exec_ctx,
/* Constructor for call_data */
static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
-                            const void *server_transport_data,
-                            grpc_transport_stream_op *initial_op) {
+                            grpc_call_element_args *args) {
  call_data *calld = elem->call_data;
  channel_data *chand = elem->channel_data;
  int r;
@ -92,10 +91,18 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
  GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
  r = grpc_transport_init_stream(exec_ctx, chand->transport,
                                 TRANSPORT_STREAM_FROM_CALL_DATA(calld),
-                                  server_transport_data, initial_op);
+                                  args->refcount, args->server_transport_data);
  GPR_ASSERT(r == 0);
}

+ static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+                         grpc_pollset *pollset) {
+   call_data *calld = elem->call_data;
+   channel_data *chand = elem->channel_data;
+   grpc_transport_set_pollset(exec_ctx, chand->transport,
+                              TRANSPORT_STREAM_FROM_CALL_DATA(calld), pollset);
+ }

/* Destructor for call_data */
static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
                              grpc_call_element *elem) {
@ -108,11 +115,10 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
/* Constructor for channel_data */
static void init_channel_elem(grpc_exec_ctx *exec_ctx,
-                               grpc_channel_element *elem, grpc_channel *master,
-                               const grpc_channel_args *args, grpc_mdctx *mdctx,
-                               int is_first, int is_last) {
+                               grpc_channel_element *elem,
+                               grpc_channel_element_args *args) {
  channel_data *cd = (channel_data *)elem->channel_data;
-   GPR_ASSERT(is_last);
+   GPR_ASSERT(args->is_last);
  GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
  cd->transport = NULL;
}
@ -132,8 +138,8 @@ static char *con_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
const grpc_channel_filter grpc_connected_channel_filter = {
    con_start_transport_stream_op, con_start_transport_op, sizeof(call_data),
-     init_call_elem, destroy_call_elem, sizeof(channel_data), init_channel_elem,
-     destroy_channel_elem, con_get_peer, "connected",
+     init_call_elem, set_pollset, destroy_call_elem, sizeof(channel_data),
+     init_channel_elem, destroy_channel_elem, con_get_peer, "connected",
};

void grpc_connected_channel_bind_transport(grpc_channel_stack *channel_stack,
@ -154,3 +160,8 @@ void grpc_connected_channel_bind_transport(grpc_channel_stack *channel_stack,
     channel. */
  channel_stack->call_stack_size += grpc_transport_stream_size(transport);
}

+ grpc_stream *grpc_connected_channel_get_stream(grpc_call_element *elem) {
+   call_data *calld = elem->call_data;
+   return TRANSPORT_STREAM_FROM_CALL_DATA(calld);
+ }

@ -46,4 +46,6 @@ extern const grpc_channel_filter grpc_connected_channel_filter;
void grpc_connected_channel_bind_transport(grpc_channel_stack* channel_stack,
                                            grpc_transport* transport);

+ grpc_stream* grpc_connected_channel_get_stream(grpc_call_element* elem);

#endif /* GRPC_INTERNAL_CORE_CHANNEL_CONNECTED_CHANNEL_H */

@ -45,10 +45,8 @@ typedef struct call_data {
  grpc_linked_mdelem te_trailers;
  grpc_linked_mdelem content_type;
  grpc_linked_mdelem user_agent;
-   int sent_initial_metadata;
-   int got_initial_metadata;
-   grpc_stream_op_buffer *recv_ops;
+   grpc_metadata_batch *recv_initial_metadata;
  /** Closure to call when finished with the hc_on_recv hook */
  grpc_closure *on_done_recv;
@ -91,18 +89,11 @@ static grpc_mdelem *client_recv_filter(void *user_data, grpc_mdelem *md) {
static void hc_on_recv(grpc_exec_ctx *exec_ctx, void *user_data, int success) {
  grpc_call_element *elem = user_data;
  call_data *calld = elem->call_data;
-   size_t i;
-   size_t nops = calld->recv_ops->nops;
-   grpc_stream_op *ops = calld->recv_ops->ops;
-   for (i = 0; i < nops; i++) {
-     grpc_stream_op *op = &ops[i];
-     client_recv_filter_args a;
-     if (op->type != GRPC_OP_METADATA) continue;
-     calld->got_initial_metadata = 1;
-     a.elem = elem;
-     a.exec_ctx = exec_ctx;
-     grpc_metadata_batch_filter(&op->data.metadata, client_recv_filter, &a);
-   }
+   client_recv_filter_args a;
+   a.elem = elem;
+   a.exec_ctx = exec_ctx;
+   grpc_metadata_batch_filter(calld->recv_initial_metadata, client_recv_filter,
+                              &a);
  calld->on_done_recv->cb(exec_ctx, calld->on_done_recv->cb_arg, success);
}
@ -123,40 +114,29 @@ static void hc_mutate_op(grpc_call_element *elem,
  /* grab pointers to our data from the call element */
  call_data *calld = elem->call_data;
  channel_data *channeld = elem->channel_data;
-   size_t i;
-   if (op->send_ops && !calld->sent_initial_metadata) {
-     size_t nops = op->send_ops->nops;
-     grpc_stream_op *ops = op->send_ops->ops;
-     for (i = 0; i < nops; i++) {
-       grpc_stream_op *stream_op = &ops[i];
-       if (stream_op->type != GRPC_OP_METADATA) continue;
-       calld->sent_initial_metadata = 1;
-       grpc_metadata_batch_filter(&stream_op->data.metadata, client_strip_filter,
-                                  elem);
-       /* Send : prefixed headers, which have to be before any application
-          layer headers. */
-       grpc_metadata_batch_add_head(&stream_op->data.metadata, &calld->method,
-                                    GRPC_MDELEM_REF(channeld->method));
-       grpc_metadata_batch_add_head(&stream_op->data.metadata, &calld->scheme,
-                                    GRPC_MDELEM_REF(channeld->scheme));
-       grpc_metadata_batch_add_tail(&stream_op->data.metadata,
-                                    &calld->te_trailers,
-                                    GRPC_MDELEM_REF(channeld->te_trailers));
-       grpc_metadata_batch_add_tail(&stream_op->data.metadata,
-                                    &calld->content_type,
-                                    GRPC_MDELEM_REF(channeld->content_type));
-       grpc_metadata_batch_add_tail(&stream_op->data.metadata,
-                                    &calld->user_agent,
-                                    GRPC_MDELEM_REF(channeld->user_agent));
-       break;
-     }
+   if (op->send_initial_metadata != NULL) {
+     grpc_metadata_batch_filter(op->send_initial_metadata, client_strip_filter,
+                                elem);
+     /* Send : prefixed headers, which have to be before any application
+        layer headers. */
+     grpc_metadata_batch_add_head(op->send_initial_metadata, &calld->method,
+                                  GRPC_MDELEM_REF(channeld->method));
+     grpc_metadata_batch_add_head(op->send_initial_metadata, &calld->scheme,
+                                  GRPC_MDELEM_REF(channeld->scheme));
+     grpc_metadata_batch_add_tail(op->send_initial_metadata, &calld->te_trailers,
+                                  GRPC_MDELEM_REF(channeld->te_trailers));
+     grpc_metadata_batch_add_tail(op->send_initial_metadata,
+                                  &calld->content_type,
+                                  GRPC_MDELEM_REF(channeld->content_type));
+     grpc_metadata_batch_add_tail(op->send_initial_metadata, &calld->user_agent,
+                                  GRPC_MDELEM_REF(channeld->user_agent));
  }
-   if (op->recv_ops && !calld->got_initial_metadata) {
+   if (op->recv_initial_metadata != NULL) {
    /* substitute our callback for the higher callback */
-     calld->recv_ops = op->recv_ops;
-     calld->on_done_recv = op->on_done_recv;
-     op->on_done_recv = &calld->hc_on_recv;
+     calld->recv_initial_metadata = op->recv_initial_metadata;
+     calld->on_done_recv = op->on_complete;
+     op->on_complete = &calld->hc_on_recv;
  }
}
@ -172,14 +152,10 @@ static void hc_start_transport_op(grpc_exec_ctx *exec_ctx,
/* Constructor for call_data */
static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
-                            const void *server_transport_data,
-                            grpc_transport_stream_op *initial_op) {
+                            grpc_call_element_args *args) {
  call_data *calld = elem->call_data;
-   calld->sent_initial_metadata = 0;
-   calld->got_initial_metadata = 0;
  calld->on_done_recv = NULL;
  grpc_closure_init(&calld->hc_on_recv, hc_on_recv, elem);
-   if (initial_op) hc_mutate_op(elem, initial_op);
}

/* Destructor for call_data */
@ -250,28 +226,31 @@ static grpc_mdstr *user_agent_from_args(grpc_mdctx *mdctx,
/* Constructor for channel_data */
static void init_channel_elem(grpc_exec_ctx *exec_ctx,
-                               grpc_channel_element *elem, grpc_channel *master,
-                               const grpc_channel_args *channel_args,
-                               grpc_mdctx *mdctx, int is_first, int is_last) {
+                               grpc_channel_element *elem,
+                               grpc_channel_element_args *args) {
  /* grab pointers to our data from the channel element */
  channel_data *channeld = elem->channel_data;
  /* The first and the last filters tend to be implemented differently to
     handle the case that there's no 'next' filter to call on the up or down
     path */
-   GPR_ASSERT(!is_last);
+   GPR_ASSERT(!args->is_last);
  /* initialize members */
-   channeld->te_trailers = grpc_mdelem_from_strings(mdctx, "te", "trailers");
-   channeld->method = grpc_mdelem_from_strings(mdctx, ":method", "POST");
-   channeld->scheme = grpc_mdelem_from_strings(mdctx, ":scheme",
-                                               scheme_from_args(channel_args));
-   channeld->content_type =
-       grpc_mdelem_from_strings(mdctx, "content-type", "application/grpc");
-   channeld->status = grpc_mdelem_from_strings(mdctx, ":status", "200");
+   channeld->te_trailers =
+       grpc_mdelem_from_strings(args->metadata_context, "te", "trailers");
+   channeld->method =
+       grpc_mdelem_from_strings(args->metadata_context, ":method", "POST");
+   channeld->scheme = grpc_mdelem_from_strings(
+       args->metadata_context, ":scheme", scheme_from_args(args->channel_args));
+   channeld->content_type = grpc_mdelem_from_strings(
+       args->metadata_context, "content-type", "application/grpc");
+   channeld->status =
+       grpc_mdelem_from_strings(args->metadata_context, ":status", "200");
  channeld->user_agent = grpc_mdelem_from_metadata_strings(
-       mdctx, grpc_mdstr_from_string(mdctx, "user-agent"),
-       user_agent_from_args(mdctx, channel_args));
+       args->metadata_context,
+       grpc_mdstr_from_string(args->metadata_context, "user-agent"),
+       user_agent_from_args(args->metadata_context, args->channel_args));
}

/* Destructor for channel data */
@ -290,6 +269,6 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
const grpc_channel_filter grpc_http_client_filter = {
    hc_start_transport_op, grpc_channel_next_op, sizeof(call_data),
-     init_call_elem, destroy_call_elem, sizeof(channel_data),
-     init_channel_elem, destroy_channel_elem, grpc_call_next_get_peer,
-     "http-client"};
+     init_call_elem, grpc_call_stack_ignore_set_pollset, destroy_call_elem,
+     sizeof(channel_data), init_channel_elem, destroy_channel_elem,
+     grpc_call_next_get_peer, "http-client"};

@ -39,7 +39,6 @@
#include "src/core/profiling/timers.h"

typedef struct call_data {
-   gpr_uint8 got_initial_metadata;
  gpr_uint8 seen_path;
  gpr_uint8 seen_post;
  gpr_uint8 sent_status;
@ -49,7 +48,7 @@ typedef struct call_data {
  grpc_linked_mdelem status;
  grpc_linked_mdelem content_type;
-   grpc_stream_op_buffer *recv_ops;
+   grpc_metadata_batch *recv_initial_metadata;
  /** Closure to call when finished with the hs_on_recv hook */
  grpc_closure *on_done_recv;
  /** Receive closures are chained: we inject this closure as the on_done_recv
@ -154,43 +153,35 @@ static void hs_on_recv(grpc_exec_ctx *exec_ctx, void *user_data, int success) {
  grpc_call_element *elem = user_data;
  call_data *calld = elem->call_data;
  if (success) {
-     size_t i;
-     size_t nops = calld->recv_ops->nops;
-     grpc_stream_op *ops = calld->recv_ops->ops;
-     for (i = 0; i < nops; i++) {
-       grpc_stream_op *op = &ops[i];
-       server_filter_args a;
-       if (op->type != GRPC_OP_METADATA) continue;
-       calld->got_initial_metadata = 1;
-       a.elem = elem;
-       a.exec_ctx = exec_ctx;
-       grpc_metadata_batch_filter(&op->data.metadata, server_filter, &a);
-       /* Have we seen the required http2 transport headers?
-          (:method, :scheme, content-type, with :path and :authority covered
-          at the channel level right now) */
-       if (calld->seen_post && calld->seen_scheme && calld->seen_te_trailers &&
-           calld->seen_path && calld->seen_authority) {
-         /* do nothing */
-       } else {
-         if (!calld->seen_path) {
-           gpr_log(GPR_ERROR, "Missing :path header");
-         }
-         if (!calld->seen_authority) {
-           gpr_log(GPR_ERROR, "Missing :authority header");
-         }
-         if (!calld->seen_post) {
-           gpr_log(GPR_ERROR, "Missing :method header");
-         }
-         if (!calld->seen_scheme) {
-           gpr_log(GPR_ERROR, "Missing :scheme header");
-         }
-         if (!calld->seen_te_trailers) {
-           gpr_log(GPR_ERROR, "Missing te trailers header");
-         }
-         /* Error this call out */
-         success = 0;
-         grpc_call_element_send_cancel(exec_ctx, elem);
-       }
-     }
+     server_filter_args a;
+     a.elem = elem;
+     a.exec_ctx = exec_ctx;
+     grpc_metadata_batch_filter(calld->recv_initial_metadata, server_filter, &a);
+     /* Have we seen the required http2 transport headers?
+        (:method, :scheme, content-type, with :path and :authority covered
+        at the channel level right now) */
+     if (calld->seen_post && calld->seen_scheme && calld->seen_te_trailers &&
+         calld->seen_path && calld->seen_authority) {
+       /* do nothing */
+     } else {
+       if (!calld->seen_path) {
+         gpr_log(GPR_ERROR, "Missing :path header");
+       }
+       if (!calld->seen_authority) {
+         gpr_log(GPR_ERROR, "Missing :authority header");
+       }
+       if (!calld->seen_post) {
+         gpr_log(GPR_ERROR, "Missing :method header");
+       }
+       if (!calld->seen_scheme) {
+         gpr_log(GPR_ERROR, "Missing :scheme header");
+       }
+       if (!calld->seen_te_trailers) {
+         gpr_log(GPR_ERROR, "Missing te trailers header");
+       }
+       /* Error this call out */
+       success = 0;
+       grpc_call_element_send_cancel(exec_ctx, elem);
+     }
  }
  calld->on_done_recv->cb(exec_ctx, calld->on_done_recv->cb_arg, success);
@ -201,29 +192,21 @@ static void hs_mutate_op(grpc_call_element *elem,
  /* grab pointers to our data from the call element */
  call_data *calld = elem->call_data;
  channel_data *channeld = elem->channel_data;
-   size_t i;
-   if (op->send_ops && !calld->sent_status) {
-     size_t nops = op->send_ops->nops;
-     grpc_stream_op *ops = op->send_ops->ops;
-     for (i = 0; i < nops; i++) {
-       grpc_stream_op *stream_op = &ops[i];
-       if (stream_op->type != GRPC_OP_METADATA) continue;
-       calld->sent_status = 1;
-       grpc_metadata_batch_add_head(&stream_op->data.metadata, &calld->status,
-                                    GRPC_MDELEM_REF(channeld->status_ok));
-       grpc_metadata_batch_add_tail(&stream_op->data.metadata,
-                                    &calld->content_type,
-                                    GRPC_MDELEM_REF(channeld->content_type));
-       break;
-     }
+   if (op->send_initial_metadata != NULL && !calld->sent_status) {
+     calld->sent_status = 1;
+     grpc_metadata_batch_add_head(op->send_initial_metadata, &calld->status,
+                                  GRPC_MDELEM_REF(channeld->status_ok));
+     grpc_metadata_batch_add_tail(op->send_initial_metadata,
+                                  &calld->content_type,
+                                  GRPC_MDELEM_REF(channeld->content_type));
  }
-   if (op->recv_ops && !calld->got_initial_metadata) {
+   if (op->recv_initial_metadata) {
    /* substitute our callback for the higher callback */
-     calld->recv_ops = op->recv_ops;
-     calld->on_done_recv = op->on_done_recv;
-     op->on_done_recv = &calld->hs_on_recv;
+     calld->recv_initial_metadata = op->recv_initial_metadata;
+     calld->on_done_recv = op->on_complete;
+     op->on_complete = &calld->hs_on_recv;
  }
}
@ -239,14 +222,12 @@ static void hs_start_transport_op(grpc_exec_ctx *exec_ctx,
/* Constructor for call_data */
static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
-                            const void *server_transport_data,
-                            grpc_transport_stream_op *initial_op) {
+                            grpc_call_element_args *args) {
  /* grab pointers to our data from the call element */
  call_data *calld = elem->call_data;
  /* initialize members */
  memset(calld, 0, sizeof(*calld));
  grpc_closure_init(&calld->hs_on_recv, hs_on_recv, elem);
-   if (initial_op) hs_mutate_op(elem, initial_op);
}

/* Destructor for call_data */
@ -255,34 +236,39 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
/* Constructor for channel_data */
static void init_channel_elem(grpc_exec_ctx *exec_ctx,
-                               grpc_channel_element *elem, grpc_channel *master,
-                               const grpc_channel_args *args, grpc_mdctx *mdctx,
-                               int is_first, int is_last) {
+                               grpc_channel_element *elem,
+                               grpc_channel_element_args *args) {
  /* grab pointers to our data from the channel element */
  channel_data *channeld = elem->channel_data;
  /* The first and the last filters tend to be implemented differently to
     handle the case that there's no 'next' filter to call on the up or down
     path */
-   GPR_ASSERT(!is_first);
-   GPR_ASSERT(!is_last);
+   GPR_ASSERT(!args->is_last);
  /* initialize members */
-   channeld->te_trailers = grpc_mdelem_from_strings(mdctx, "te", "trailers");
-   channeld->status_ok = grpc_mdelem_from_strings(mdctx, ":status", "200");
-   channeld->status_not_found =
-       grpc_mdelem_from_strings(mdctx, ":status", "404");
-   channeld->method_post = grpc_mdelem_from_strings(mdctx, ":method", "POST");
-   channeld->http_scheme = grpc_mdelem_from_strings(mdctx, ":scheme", "http");
-   channeld->https_scheme = grpc_mdelem_from_strings(mdctx, ":scheme", "https");
-   channeld->grpc_scheme = grpc_mdelem_from_strings(mdctx, ":scheme", "grpc");
-   channeld->path_key = grpc_mdstr_from_string(mdctx, ":path");
-   channeld->authority_key = grpc_mdstr_from_string(mdctx, ":authority");
-   channeld->host_key = grpc_mdstr_from_string(mdctx, "host");
-   channeld->content_type =
-       grpc_mdelem_from_strings(mdctx, "content-type", "application/grpc");
-   channeld->mdctx = mdctx;
+   channeld->te_trailers =
+       grpc_mdelem_from_strings(args->metadata_context, "te", "trailers");
+   channeld->status_ok =
+       grpc_mdelem_from_strings(args->metadata_context, ":status", "200");
+   channeld->status_not_found =
+       grpc_mdelem_from_strings(args->metadata_context, ":status", "404");
+   channeld->method_post =
+       grpc_mdelem_from_strings(args->metadata_context, ":method", "POST");
+   channeld->http_scheme =
+       grpc_mdelem_from_strings(args->metadata_context, ":scheme", "http");
+   channeld->https_scheme =
+       grpc_mdelem_from_strings(args->metadata_context, ":scheme", "https");
+   channeld->grpc_scheme =
+       grpc_mdelem_from_strings(args->metadata_context, ":scheme", "grpc");
+   channeld->path_key = grpc_mdstr_from_string(args->metadata_context, ":path");
+   channeld->authority_key =
+       grpc_mdstr_from_string(args->metadata_context, ":authority");
+   channeld->host_key = grpc_mdstr_from_string(args->metadata_context, "host");
+   channeld->content_type = grpc_mdelem_from_strings(
+       args->metadata_context, "content-type", "application/grpc");
+   channeld->mdctx = args->metadata_context;
}

/* Destructor for channel data */
@ -306,6 +292,6 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
const grpc_channel_filter grpc_http_server_filter = {
    hs_start_transport_op, grpc_channel_next_op, sizeof(call_data),
-     init_call_elem, destroy_call_elem, sizeof(channel_data),
-     init_channel_elem, destroy_channel_elem, grpc_call_next_get_peer,
-     "http-server"};
+     init_call_elem, grpc_call_stack_ignore_set_pollset, destroy_call_elem,
+     sizeof(channel_data), init_channel_elem, destroy_channel_elem,
+     grpc_call_next_get_peer, "http-server"};

@ -73,16 +73,13 @@ static void noop_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
/* Constructor for call_data */
static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
-                            const void *server_transport_data,
-                            grpc_transport_stream_op *initial_op) {
+                            grpc_call_element_args *args) {
  /* grab pointers to our data from the call element */
  call_data *calld = elem->call_data;
  channel_data *channeld = elem->channel_data;
  /* initialize members */
  calld->unused = channeld->unused;
-   if (initial_op) noop_mutate_op(elem, initial_op);
}

/* Destructor for call_data */
@ -91,17 +88,15 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
/* Constructor for channel_data */
static void init_channel_elem(grpc_exec_ctx *exec_ctx,
-                               grpc_channel_element *elem, grpc_channel *master,
-                               const grpc_channel_args *args, grpc_mdctx *mdctx,
-                               int is_first, int is_last) {
+                               grpc_channel_element *elem,
+                               grpc_channel_element_args *args) {
  /* grab pointers to our data from the channel element */
  channel_data *channeld = elem->channel_data;
-   /* The first and the last filters tend to be implemented differently to
-      handle the case that there's no 'next' filter to call on the up or down
-      path */
-   GPR_ASSERT(!is_first);
-   GPR_ASSERT(!is_last);
+   /* The last filter tends to be implemented differently to
+      handle the case that there's no 'next' filter to call on the down
+      path */
+   GPR_ASSERT(!args->is_last);
  /* initialize members */
  channeld->unused = 0;
@ -118,5 +113,6 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
const grpc_channel_filter grpc_no_op_filter = {
    noop_start_transport_stream_op, grpc_channel_next_op, sizeof(call_data),
-     init_call_elem, destroy_call_elem, sizeof(channel_data), init_channel_elem,
-     destroy_channel_elem, grpc_call_next_get_peer, "no-op"};
+     init_call_elem, grpc_call_stack_ignore_set_pollset, destroy_call_elem,
+     sizeof(channel_data), init_channel_elem, destroy_channel_elem,
+     grpc_call_next_get_peer, "no-op"};
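Across the filters above, the long constructor parameter packs (master, channel_args, mdctx, is_first, is_last) and (server_transport_data, initial_op) collapse into grpc_channel_element_args / grpc_call_element_args. A hedged, stand-alone C sketch of that consolidation pattern follows; the toy_ names are invented for illustration and are not the real gRPC structs.

/* Illustrative sketch only: toy names, not the gRPC headers. */
#include <stdio.h>

/* Before: every new field means touching every filter's signature.
   After: callers populate one args struct and filters read what they need. */
typedef struct {
  const char *channel_args;  /* stand-in for const grpc_channel_args * */
  void *metadata_context;    /* stand-in for grpc_mdctx * */
  int is_first;
  int is_last;
} toy_channel_element_args;

static void toy_init_channel_elem(const toy_channel_element_args *args) {
  /* Filters assert their position instead of receiving separate flags. */
  if (args->is_last) {
    printf("terminal filter; args=%s\n", args->channel_args);
  } else {
    printf("intermediate filter; args=%s\n", args->channel_args);
  }
}

int main(void) {
  toy_channel_element_args args = {"compression=gzip", NULL, 1, 0};
  toy_init_channel_elem(&args);
  return 0;
}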

@ -0,0 +1,282 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include "src/core/channel/subchannel_call_holder.h"
#include <grpc/support/alloc.h>
#include "src/core/profiling/timers.h"
#define GET_CALL(holder) \
((grpc_subchannel_call *)(gpr_atm_acq_load(&(holder)->subchannel_call)))
#define CANCELLED_CALL ((grpc_subchannel_call *)1)
static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *holder,
int success);
static void call_ready(grpc_exec_ctx *exec_ctx, void *holder, int success);
static void retry_ops(grpc_exec_ctx *exec_ctx, void *retry_ops_args,
int success);
static void add_waiting_locked(grpc_subchannel_call_holder *holder,
grpc_transport_stream_op *op);
static void fail_locked(grpc_exec_ctx *exec_ctx,
grpc_subchannel_call_holder *holder);
static void retry_waiting_locked(grpc_exec_ctx *exec_ctx,
grpc_subchannel_call_holder *holder);
void grpc_subchannel_call_holder_init(
grpc_subchannel_call_holder *holder,
grpc_subchannel_call_holder_pick_subchannel pick_subchannel,
void *pick_subchannel_arg) {
gpr_atm_rel_store(&holder->subchannel_call, 0);
holder->pick_subchannel = pick_subchannel;
holder->pick_subchannel_arg = pick_subchannel_arg;
gpr_mu_init(&holder->mu);
holder->subchannel = NULL;
holder->waiting_ops = NULL;
holder->waiting_ops_count = 0;
holder->waiting_ops_capacity = 0;
holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
}
void grpc_subchannel_call_holder_destroy(grpc_exec_ctx *exec_ctx,
grpc_subchannel_call_holder *holder) {
grpc_subchannel_call *call = GET_CALL(holder);
if (call != NULL && call != CANCELLED_CALL) {
GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, call, "holder");
}
GPR_ASSERT(holder->creation_phase ==
GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING);
gpr_mu_destroy(&holder->mu);
GPR_ASSERT(holder->waiting_ops_count == 0);
gpr_free(holder->waiting_ops);
}
void grpc_subchannel_call_holder_perform_op(grpc_exec_ctx *exec_ctx,
grpc_subchannel_call_holder *holder,
grpc_transport_stream_op *op) {
/* try to (atomically) get the call */
grpc_subchannel_call *call = GET_CALL(holder);
GPR_TIMER_BEGIN("grpc_subchannel_call_holder_perform_op", 0);
if (call == CANCELLED_CALL) {
grpc_transport_stream_op_finish_with_failure(exec_ctx, op);
GPR_TIMER_END("grpc_subchannel_call_holder_perform_op", 0);
return;
}
if (call != NULL) {
grpc_subchannel_call_process_op(exec_ctx, call, op);
GPR_TIMER_END("grpc_subchannel_call_holder_perform_op", 0);
return;
}
/* we failed; lock and figure out what to do */
gpr_mu_lock(&holder->mu);
retry:
/* need to recheck that another thread hasn't set the call */
call = GET_CALL(holder);
if (call == CANCELLED_CALL) {
gpr_mu_unlock(&holder->mu);
grpc_transport_stream_op_finish_with_failure(exec_ctx, op);
GPR_TIMER_END("grpc_subchannel_call_holder_perform_op", 0);
return;
}
if (call != NULL) {
gpr_mu_unlock(&holder->mu);
grpc_subchannel_call_process_op(exec_ctx, call, op);
GPR_TIMER_END("grpc_subchannel_call_holder_perform_op", 0);
return;
}
/* if this is a cancellation, then we can raise our cancelled flag */
if (op->cancel_with_status != GRPC_STATUS_OK) {
if (!gpr_atm_rel_cas(&holder->subchannel_call, 0, 1)) {
goto retry;
} else {
switch (holder->creation_phase) {
case GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING:
break;
case GRPC_SUBCHANNEL_CALL_HOLDER_CREATING_CALL:
grpc_subchannel_cancel_create_call(exec_ctx, holder->subchannel,
&holder->subchannel_call);
break;
case GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL:
holder->pick_subchannel(exec_ctx, holder->pick_subchannel_arg, NULL,
&holder->subchannel, NULL);
break;
}
gpr_mu_unlock(&holder->mu);
grpc_transport_stream_op_finish_with_failure(exec_ctx, op);
GPR_TIMER_END("grpc_subchannel_call_holder_perform_op", 0);
return;
}
}
/* if we don't have a subchannel, try to get one */
if (holder->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING &&
holder->subchannel == NULL && op->send_initial_metadata != NULL) {
holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL;
grpc_closure_init(&holder->next_step, subchannel_ready, holder);
if (holder->pick_subchannel(exec_ctx, holder->pick_subchannel_arg,
op->send_initial_metadata, &holder->subchannel,
&holder->next_step)) {
holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
}
}
/* if we've got a subchannel, then let's ask it to create a call */
if (holder->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING &&
holder->subchannel != NULL) {
holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_CREATING_CALL;
grpc_closure_init(&holder->next_step, call_ready, holder);
if (grpc_subchannel_create_call(exec_ctx, holder->subchannel,
holder->pollset, &holder->subchannel_call,
&holder->next_step)) {
/* got one immediately - continue the op (and any waiting ops) */
holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
retry_waiting_locked(exec_ctx, holder);
goto retry;
}
}
/* nothing to be done but wait */
add_waiting_locked(holder, op);
gpr_mu_unlock(&holder->mu);
GPR_TIMER_END("grpc_subchannel_call_holder_perform_op", 0);
}
static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, int success) {
grpc_subchannel_call_holder *holder = arg;
grpc_subchannel_call *call;
gpr_mu_lock(&holder->mu);
GPR_ASSERT(holder->creation_phase ==
GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL);
call = GET_CALL(holder);
GPR_ASSERT(call == NULL || call == CANCELLED_CALL);
if (holder->subchannel == NULL) {
holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
fail_locked(exec_ctx, holder);
} else {
grpc_closure_init(&holder->next_step, call_ready, holder);
if (grpc_subchannel_create_call(exec_ctx, holder->subchannel,
holder->pollset, &holder->subchannel_call,
&holder->next_step)) {
holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
/* got one immediately - continue the op (and any waiting ops) */
retry_waiting_locked(exec_ctx, holder);
}
}
gpr_mu_unlock(&holder->mu);
}
static void call_ready(grpc_exec_ctx *exec_ctx, void *arg, int success) {
grpc_subchannel_call_holder *holder = arg;
GPR_TIMER_BEGIN("call_ready", 0);
gpr_mu_lock(&holder->mu);
GPR_ASSERT(holder->creation_phase ==
GRPC_SUBCHANNEL_CALL_HOLDER_CREATING_CALL);
holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
if (GET_CALL(holder) != NULL) {
retry_waiting_locked(exec_ctx, holder);
} else {
fail_locked(exec_ctx, holder);
}
gpr_mu_unlock(&holder->mu);
GPR_TIMER_END("call_ready", 0);
}
typedef struct {
grpc_transport_stream_op *ops;
size_t nops;
grpc_subchannel_call *call;
} retry_ops_args;
static void retry_waiting_locked(grpc_exec_ctx *exec_ctx,
grpc_subchannel_call_holder *holder) {
retry_ops_args *a = gpr_malloc(sizeof(*a));
a->ops = holder->waiting_ops;
a->nops = holder->waiting_ops_count;
a->call = GET_CALL(holder);
if (a->call == CANCELLED_CALL) {
gpr_free(a);
fail_locked(exec_ctx, holder);
return;
}
holder->waiting_ops = NULL;
holder->waiting_ops_count = 0;
holder->waiting_ops_capacity = 0;
GRPC_SUBCHANNEL_CALL_REF(a->call, "retry_ops");
grpc_exec_ctx_enqueue(exec_ctx, grpc_closure_create(retry_ops, a), 1);
}
static void retry_ops(grpc_exec_ctx *exec_ctx, void *args, int success) {
retry_ops_args *a = args;
size_t i;
for (i = 0; i < a->nops; i++) {
grpc_subchannel_call_process_op(exec_ctx, a->call, &a->ops[i]);
}
GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, a->call, "retry_ops");
gpr_free(a->ops);
gpr_free(a);
}
static void add_waiting_locked(grpc_subchannel_call_holder *holder,
grpc_transport_stream_op *op) {
GPR_TIMER_BEGIN("add_waiting_locked", 0);
if (holder->waiting_ops_count == holder->waiting_ops_capacity) {
holder->waiting_ops_capacity = GPR_MAX(3, 2 * holder->waiting_ops_capacity);
holder->waiting_ops =
gpr_realloc(holder->waiting_ops, holder->waiting_ops_capacity *
sizeof(*holder->waiting_ops));
}
holder->waiting_ops[holder->waiting_ops_count++] = *op;
GPR_TIMER_END("add_waiting_locked", 0);
}
static void fail_locked(grpc_exec_ctx *exec_ctx,
grpc_subchannel_call_holder *holder) {
size_t i;
for (i = 0; i < holder->waiting_ops_count; i++) {
grpc_exec_ctx_enqueue(exec_ctx, holder->waiting_ops[i].on_complete, 0);
grpc_exec_ctx_enqueue(exec_ctx, holder->waiting_ops[i].recv_message_ready,
0);
}
holder->waiting_ops_count = 0;
}
char *grpc_subchannel_call_holder_get_peer(grpc_exec_ctx *exec_ctx,
grpc_subchannel_call_holder *holder,
grpc_channel *master) {
grpc_subchannel_call *subchannel_call = GET_CALL(holder);
if (subchannel_call) {
return grpc_subchannel_call_get_peer(exec_ctx, subchannel_call);
} else {
return grpc_channel_get_target(master);
}
}
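The holder keeps its call in a single atomic word: 0 means "no call yet", 1 (CANCELLED_CALL) marks a cancelled call, and any other value is a real grpc_subchannel_call pointer, so cancellation and call creation can race without taking the lock on the fast path. A small stand-alone sketch of the same tagged-pointer idea using C11 atomics (an assumed analogue of gpr_atm; toy types, not the gRPC ones):

/* Illustrative sketch only: C11 atomics instead of gpr_atm. */
#include <stdatomic.h>
#include <stdint.h>
#include <stdio.h>

typedef struct { const char *name; } toy_call;

#define TOY_CANCELLED ((uintptr_t)1)

typedef struct {
  atomic_uintptr_t call; /* 0 = none, 1 = cancelled, else toy_call* */
} toy_holder;

/* Publish a created call unless a cancellation already won the race. */
static int toy_set_call(toy_holder *h, toy_call *c) {
  uintptr_t expected = 0;
  return atomic_compare_exchange_strong(&h->call, &expected, (uintptr_t)c);
}

/* Cancel: only succeeds while no call has been published yet. */
static int toy_cancel(toy_holder *h) {
  uintptr_t expected = 0;
  return atomic_compare_exchange_strong(&h->call, &expected, TOY_CANCELLED);
}

static toy_call *toy_get_call(toy_holder *h) {
  uintptr_t v = atomic_load(&h->call);
  return (v == 0 || v == TOY_CANCELLED) ? NULL : (toy_call *)v;
}

int main(void) {
  toy_holder h;
  toy_call c = {"call-1"};
  atomic_init(&h.call, 0);
  printf("cancel first: %d\n", toy_cancel(&h));       /* 1: cancellation wins */
  printf("then publish: %d\n", toy_set_call(&h, &c)); /* 0: rejected */
  printf("call visible: %s\n", toy_get_call(&h) ? "yes" : "no");
  return 0;
}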

@ -0,0 +1,83 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef GRPC_INTERNAL_CORE_CHANNEL_SUBCHANNEL_CALL_HOLDER_H
#define GRPC_INTERNAL_CORE_CHANNEL_SUBCHANNEL_CALL_HOLDER_H
#include "src/core/client_config/subchannel.h"
typedef int (*grpc_subchannel_call_holder_pick_subchannel)(
grpc_exec_ctx *exec_ctx, void *arg, grpc_metadata_batch *initial_metadata,
grpc_subchannel **subchannel, grpc_closure *on_ready);
typedef enum {
GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING,
GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL,
GRPC_SUBCHANNEL_CALL_HOLDER_CREATING_CALL
} grpc_subchannel_call_holder_creation_phase;
typedef struct grpc_subchannel_call_holder {
/* either 0 for no call, 1 for cancelled, or a pointer to a
grpc_subchannel_call */
gpr_atm subchannel_call;
grpc_subchannel_call_holder_pick_subchannel pick_subchannel;
void *pick_subchannel_arg;
gpr_mu mu;
grpc_subchannel_call_holder_creation_phase creation_phase;
grpc_subchannel *subchannel;
grpc_pollset *pollset;
grpc_transport_stream_op *waiting_ops;
size_t waiting_ops_count;
size_t waiting_ops_capacity;
grpc_closure next_step;
} grpc_subchannel_call_holder;
void grpc_subchannel_call_holder_init(
grpc_subchannel_call_holder *holder,
grpc_subchannel_call_holder_pick_subchannel pick_subchannel,
void *pick_subchannel_arg);
void grpc_subchannel_call_holder_destroy(grpc_exec_ctx *exec_ctx,
grpc_subchannel_call_holder *holder);
void grpc_subchannel_call_holder_perform_op(grpc_exec_ctx *exec_ctx,
grpc_subchannel_call_holder *holder,
grpc_transport_stream_op *op);
char *grpc_subchannel_call_holder_get_peer(grpc_exec_ctx *exec_ctx,
grpc_subchannel_call_holder *holder,
grpc_channel *master);
#endif
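Ops that arrive before the subchannel call exists are parked in waiting_ops, a plain array that add_waiting_locked grows with capacity = GPR_MAX(3, 2 * capacity). A stand-alone sketch of that growth rule under toy types (error handling elided for brevity):

/* Illustrative sketch only: a growable op queue like waiting_ops. */
#include <stdio.h>
#include <stdlib.h>

typedef struct { int id; } toy_op;

typedef struct {
  toy_op *ops;
  size_t count;
  size_t capacity;
} toy_queue;

#define TOY_MAX(a, b) ((a) > (b) ? (a) : (b))

/* Same growth rule as add_waiting_locked: at least 3 slots, then double. */
static void toy_enqueue(toy_queue *q, toy_op op) {
  if (q->count == q->capacity) {
    q->capacity = TOY_MAX((size_t)3, 2 * q->capacity);
    q->ops = realloc(q->ops, q->capacity * sizeof(*q->ops)); /* unchecked */
  }
  q->ops[q->count++] = op;
}

int main(void) {
  toy_queue q = {NULL, 0, 0};
  for (int i = 0; i < 10; i++) {
    toy_op op = {i};
    toy_enqueue(&q, op);
  }
  printf("count=%zu capacity=%zu\n", q.count, q.capacity);
  free(q.ops);
  return 0;
}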

@ -130,6 +130,30 @@ void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
} }
} }
static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_subchannel **target) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
pending_pick *pp;
gpr_mu_lock(&p->mu);
pp = p->pending_picks;
p->pending_picks = NULL;
while (pp != NULL) {
pending_pick *next = pp->next;
if (pp->target == target) {
grpc_subchannel_del_interested_party(
exec_ctx, p->subchannels[p->checking_subchannel], pp->pollset);
*target = NULL;
grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 0);
gpr_free(pp);
} else {
pp->next = p->pending_picks;
p->pending_picks = pp;
}
pp = next;
}
gpr_mu_unlock(&p->mu);
}
static void start_picking(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p) {
  p->started_picking = 1;
  p->checking_subchannel = 0;
@ -149,16 +173,16 @@ void pf_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
  gpr_mu_unlock(&p->mu);
}

- void pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
-              grpc_pollset *pollset, grpc_metadata_batch *initial_metadata,
-              grpc_subchannel **target, grpc_closure *on_complete) {
+ int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset,
+             grpc_metadata_batch *initial_metadata, grpc_subchannel **target,
+             grpc_closure *on_complete) {
  pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
  pending_pick *pp;
  gpr_mu_lock(&p->mu);
  if (p->selected) {
    gpr_mu_unlock(&p->mu);
    *target = p->selected;
-     grpc_exec_ctx_enqueue(exec_ctx, on_complete, 1);
+     return 1;
  } else {
    if (!p->started_picking) {
      start_picking(exec_ctx, p);
@ -172,6 +196,7 @@ void pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
    pp->on_complete = on_complete;
    p->pending_picks = pp;
    gpr_mu_unlock(&p->mu);
+     return 0;
  }
}
@ -365,8 +390,8 @@ void pf_notify_on_state_change(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
}

static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = {
-     pf_destroy, pf_shutdown, pf_pick, pf_exit_idle, pf_broadcast,
-     pf_check_connectivity, pf_notify_on_state_change};
+     pf_destroy, pf_shutdown, pf_pick, pf_cancel_pick, pf_exit_idle,
+     pf_broadcast, pf_check_connectivity, pf_notify_on_state_change};

static void pick_first_factory_ref(grpc_lb_policy_factory *factory) {}
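pf_cancel_pick above (and rr_cancel_pick below) use the same detach-and-relink idiom: take the whole pending list, then push back every entry except the ones aimed at the cancelled target. A self-contained C sketch of that idiom with toy types:

/* Illustrative sketch only: the detach-and-relink pattern of cancel_pick. */
#include <stdio.h>
#include <stdlib.h>

typedef struct toy_pick {
  int *target;            /* where the chosen subchannel would be stored */
  struct toy_pick *next;
} toy_pick;

/* Remove every pending pick aimed at 'target', keep the rest (order may be
   reversed, which the real code also tolerates). */
static void toy_cancel_pick(toy_pick **pending, int *target) {
  toy_pick *pp = *pending;
  *pending = NULL;
  while (pp != NULL) {
    toy_pick *next = pp->next;
    if (pp->target == target) {
      *target = 0;          /* signal "no subchannel" to the waiter */
      free(pp);             /* real code also wakes on_complete with failure */
    } else {
      pp->next = *pending;  /* push survivors back onto the list */
      *pending = pp;
    }
    pp = next;
  }
}

int main(void) {
  int a = -1, b = -1;
  toy_pick *pending = NULL;
  for (int i = 0; i < 2; i++) {
    toy_pick *pp = malloc(sizeof(*pp));
    pp->target = (i == 0) ? &a : &b;
    pp->next = pending;
    pending = pp;
  }
  toy_cancel_pick(&pending, &a);
  printf("a=%d, remaining head targets b: %d\n", a, pending->target == &b);
  while (pending) { toy_pick *n = pending->next; free(pending); pending = n; }
  return 0;
}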

@ -264,6 +264,33 @@ void rr_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
  gpr_mu_unlock(&p->mu);
}
static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_subchannel **target) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
pending_pick *pp;
size_t i;
gpr_mu_lock(&p->mu);
pp = p->pending_picks;
p->pending_picks = NULL;
while (pp != NULL) {
pending_pick *next = pp->next;
if (pp->target == target) {
for (i = 0; i < p->num_subchannels; i++) {
grpc_subchannel_add_interested_party(exec_ctx, p->subchannels[i],
pp->pollset);
}
*target = NULL;
grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 0);
gpr_free(pp);
} else {
pp->next = p->pending_picks;
p->pending_picks = pp;
}
pp = next;
}
gpr_mu_unlock(&p->mu);
}
static void start_picking(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) {
  size_t i;
  p->started_picking = 1;
@ -286,9 +313,9 @@ void rr_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
  gpr_mu_unlock(&p->mu);
}

- void rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
-              grpc_pollset *pollset, grpc_metadata_batch *initial_metadata,
-              grpc_subchannel **target, grpc_closure *on_complete) {
+ int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset,
+             grpc_metadata_batch *initial_metadata, grpc_subchannel **target,
+             grpc_closure *on_complete) {
  size_t i;
  round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
  pending_pick *pp;
@ -303,7 +330,7 @@ void rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
    }
    /* only advance the last picked pointer if the selection was used */
    advance_last_picked_locked(p);
-     on_complete->cb(exec_ctx, on_complete->cb_arg, 1);
+     return 1;
  } else {
    if (!p->started_picking) {
      start_picking(exec_ctx, p);
@ -319,6 +346,7 @@ void rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
    pp->on_complete = on_complete;
    p->pending_picks = pp;
    gpr_mu_unlock(&p->mu);
+     return 0;
  }
}
@ -487,8 +515,8 @@ static void rr_notify_on_state_change(grpc_exec_ctx *exec_ctx,
}

static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = {
-     rr_destroy, rr_shutdown, rr_pick, rr_exit_idle, rr_broadcast,
-     rr_check_connectivity, rr_notify_on_state_change};
+     rr_destroy, rr_shutdown, rr_pick, rr_cancel_pick, rr_exit_idle,
+     rr_broadcast, rr_check_connectivity, rr_notify_on_state_change};

static void round_robin_factory_ref(grpc_lb_policy_factory *factory) {}

@ -68,12 +68,17 @@ void grpc_lb_policy_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy) {
policy->vtable->shutdown(exec_ctx, policy); policy->vtable->shutdown(exec_ctx, policy);
} }
void grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_pollset *pollset, grpc_pollset *pollset,
grpc_metadata_batch *initial_metadata, grpc_metadata_batch *initial_metadata,
grpc_subchannel **target, grpc_closure *on_complete) { grpc_subchannel **target, grpc_closure *on_complete) {
policy->vtable->pick(exec_ctx, policy, pollset, initial_metadata, target, return policy->vtable->pick(exec_ctx, policy, pollset, initial_metadata,
on_complete); target, on_complete);
}
void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_subchannel **target) {
policy->vtable->cancel_pick(exec_ctx, policy, target);
} }
void grpc_lb_policy_broadcast(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, void grpc_lb_policy_broadcast(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,

@ -56,9 +56,11 @@ struct grpc_lb_policy_vtable {
void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy); void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
/** implement grpc_lb_policy_pick */ /** implement grpc_lb_policy_pick */
void (*pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, int (*pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_pollset *pollset, grpc_metadata_batch *initial_metadata, grpc_pollset *pollset, grpc_metadata_batch *initial_metadata,
grpc_subchannel **target, grpc_closure *on_complete); grpc_subchannel **target, grpc_closure *on_complete);
void (*cancel_pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_subchannel **target);
/** try to enter a READY connectivity state */ /** try to enter a READY connectivity state */
void (*exit_idle)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy); void (*exit_idle)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
@ -106,10 +108,13 @@ void grpc_lb_policy_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
target for this rpc, and 'return' it by calling \a on_complete after setting target for this rpc, and 'return' it by calling \a on_complete after setting
\a target. \a target.
Picking can be asynchronous. Any IO should be done under \a pollset. */ Picking can be asynchronous. Any IO should be done under \a pollset. */
void grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_pollset *pollset, grpc_pollset *pollset,
grpc_metadata_batch *initial_metadata, grpc_metadata_batch *initial_metadata,
grpc_subchannel **target, grpc_closure *on_complete); grpc_subchannel **target, grpc_closure *on_complete);
void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_subchannel **target);
void grpc_lb_policy_broadcast(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, void grpc_lb_policy_broadcast(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_transport_op *op); grpc_transport_op *op);
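To make the new contract concrete, here is a minimal caller-side sketch of the int-returning pick paired with cancel_pick; the function and variable names are hypothetical and not part of this change. Per the code above, a return of 1 means *target was set synchronously and the policy does not schedule on_complete; a return of 0 means the pick is queued and finishes through on_complete, or can be abandoned with grpc_lb_policy_cancel_pick, which clears *target and schedules on_complete with success == 0.
#include "src/core/client_config/lb_policy.h"
/* hypothetical caller; only the grpc_lb_policy_* calls are from this patch */
static void start_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
                       grpc_pollset *pollset,
                       grpc_metadata_batch *initial_metadata,
                       grpc_subchannel **target, grpc_closure *on_pick) {
  if (grpc_lb_policy_pick(exec_ctx, policy, pollset, initial_metadata, target,
                          on_pick)) {
    /* completed synchronously: *target is already set; the policy will not
       run on_pick, so the caller continues inline */
  } else {
    /* pending: completion arrives via on_pick; to give up early the caller
       invokes grpc_lb_policy_cancel_pick(exec_ctx, policy, target) */
  }
}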

@ -22,7 +22,7 @@
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
@ -41,8 +41,10 @@
#include "src/core/channel/client_channel.h" #include "src/core/channel/client_channel.h"
#include "src/core/channel/connected_channel.h" #include "src/core/channel/connected_channel.h"
#include "src/core/iomgr/timer.h" #include "src/core/iomgr/timer.h"
#include "src/core/transport/connectivity_state.h" #include "src/core/profiling/timers.h"
#include "src/core/surface/channel.h" #include "src/core/surface/channel.h"
#include "src/core/transport/connectivity_state.h"
#include "src/core/transport/connectivity_state.h"
#define GRPC_SUBCHANNEL_MIN_CONNECT_TIMEOUT_SECONDS 20 #define GRPC_SUBCHANNEL_MIN_CONNECT_TIMEOUT_SECONDS 20
#define GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS 1 #define GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS 1
@ -69,7 +71,7 @@ typedef struct waiting_for_connect {
struct waiting_for_connect *next; struct waiting_for_connect *next;
grpc_closure *notify; grpc_closure *notify;
grpc_pollset *pollset; grpc_pollset *pollset;
grpc_subchannel_call **target; gpr_atm *target;
grpc_subchannel *subchannel; grpc_subchannel *subchannel;
grpc_closure continuation; grpc_closure continuation;
} waiting_for_connect; } waiting_for_connect;
@ -137,14 +139,16 @@ struct grpc_subchannel {
struct grpc_subchannel_call { struct grpc_subchannel_call {
connection *connection; connection *connection;
gpr_refcount refs;
}; };
#define SUBCHANNEL_CALL_TO_CALL_STACK(call) ((grpc_call_stack *)((call) + 1)) #define SUBCHANNEL_CALL_TO_CALL_STACK(call) ((grpc_call_stack *)((call) + 1))
#define CHANNEL_STACK_FROM_CONNECTION(con) ((grpc_channel_stack *)((con) + 1)) #define CHANNEL_STACK_FROM_CONNECTION(con) ((grpc_channel_stack *)((con) + 1))
#define CALLSTACK_TO_SUBCHANNEL_CALL(callstack) \
(((grpc_subchannel_call *)(callstack)) - 1)
static grpc_subchannel_call *create_call(grpc_exec_ctx *exec_ctx, static grpc_subchannel_call *create_call(grpc_exec_ctx *exec_ctx,
connection *con); connection *con,
grpc_pollset *pollset);
static void connectivity_state_changed_locked(grpc_exec_ctx *exec_ctx, static void connectivity_state_changed_locked(grpc_exec_ctx *exec_ctx,
grpc_subchannel *c, grpc_subchannel *c,
const char *reason); const char *reason);
@ -163,7 +167,7 @@ static grpc_subchannel *connection_unref_locked(
connection *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) GRPC_MUST_USE_RESULT; connection *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) GRPC_MUST_USE_RESULT;
static void subchannel_destroy(grpc_exec_ctx *exec_ctx, grpc_subchannel *c); static void subchannel_destroy(grpc_exec_ctx *exec_ctx, grpc_subchannel *c);
#ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG #ifdef GRPC_STREAM_REFCOUNT_DEBUG
#define SUBCHANNEL_REF_LOCKED(p, r) \ #define SUBCHANNEL_REF_LOCKED(p, r) \
subchannel_ref_locked((p), __FILE__, __LINE__, (r)) subchannel_ref_locked((p), __FILE__, __LINE__, (r))
#define SUBCHANNEL_UNREF_LOCKED(p, r) \ #define SUBCHANNEL_UNREF_LOCKED(p, r) \
@ -173,6 +177,7 @@ static void subchannel_destroy(grpc_exec_ctx *exec_ctx, grpc_subchannel *c);
#define CONNECTION_UNREF_LOCKED(cl, p, r) \ #define CONNECTION_UNREF_LOCKED(cl, p, r) \
connection_unref_locked((cl), (p), __FILE__, __LINE__, (r)) connection_unref_locked((cl), (p), __FILE__, __LINE__, (r))
#define REF_PASS_ARGS , file, line, reason #define REF_PASS_ARGS , file, line, reason
#define REF_PASS_REASON , reason
#define REF_LOG(name, p) \ #define REF_LOG(name, p) \
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "%s: %p ref %d -> %d %s", \ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "%s: %p ref %d -> %d %s", \
(name), (p), (p)->refs, (p)->refs + 1, reason) (name), (p), (p)->refs, (p)->refs + 1, reason)
@ -185,6 +190,7 @@ static void subchannel_destroy(grpc_exec_ctx *exec_ctx, grpc_subchannel *c);
#define CONNECTION_REF_LOCKED(p, r) connection_ref_locked((p)) #define CONNECTION_REF_LOCKED(p, r) connection_ref_locked((p))
#define CONNECTION_UNREF_LOCKED(cl, p, r) connection_unref_locked((cl), (p)) #define CONNECTION_UNREF_LOCKED(cl, p, r) connection_unref_locked((cl), (p))
#define REF_PASS_ARGS #define REF_PASS_ARGS
#define REF_PASS_REASON
#define REF_LOG(name, p) \ #define REF_LOG(name, p) \
do { \ do { \
} while (0) } while (0)
@ -312,9 +318,9 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector,
return c; return c;
} }
void grpc_subchannel_cancel_waiting_call(grpc_exec_ctx *exec_ctx, static void cancel_waiting_calls(grpc_exec_ctx *exec_ctx,
grpc_subchannel *subchannel, grpc_subchannel *subchannel,
int iomgr_success) { int iomgr_success) {
waiting_for_connect *w4c; waiting_for_connect *w4c;
gpr_mu_lock(&subchannel->mu); gpr_mu_lock(&subchannel->mu);
w4c = subchannel->waiting; w4c = subchannel->waiting;
@ -335,6 +341,37 @@ void grpc_subchannel_cancel_waiting_call(grpc_exec_ctx *exec_ctx,
} }
} }
void grpc_subchannel_cancel_create_call(grpc_exec_ctx *exec_ctx,
grpc_subchannel *subchannel,
gpr_atm *target) {
waiting_for_connect *w4c;
int unref_count = 0;
gpr_mu_lock(&subchannel->mu);
w4c = subchannel->waiting;
subchannel->waiting = NULL;
while (w4c != NULL) {
waiting_for_connect *next = w4c->next;
if (w4c->target == target) {
grpc_subchannel_del_interested_party(exec_ctx, w4c->subchannel,
w4c->pollset);
grpc_exec_ctx_enqueue(exec_ctx, w4c->notify, 0);
unref_count++;
gpr_free(w4c);
} else {
w4c->next = subchannel->waiting;
subchannel->waiting = w4c;
}
w4c = next;
}
gpr_mu_unlock(&subchannel->mu);
while (unref_count-- > 0) {
GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannel, "waiting_for_connect");
}
}
static void continue_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { static void continue_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
grpc_connect_in_args args; grpc_connect_in_args args;
@ -358,29 +395,35 @@ static void start_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
static void continue_creating_call(grpc_exec_ctx *exec_ctx, void *arg, static void continue_creating_call(grpc_exec_ctx *exec_ctx, void *arg,
int iomgr_success) { int iomgr_success) {
grpc_subchannel_call_create_status call_creation_status; int call_creation_status;
waiting_for_connect *w4c = arg; waiting_for_connect *w4c = arg;
grpc_subchannel_del_interested_party(exec_ctx, w4c->subchannel, w4c->pollset); grpc_subchannel_del_interested_party(exec_ctx, w4c->subchannel, w4c->pollset);
call_creation_status = grpc_subchannel_create_call( call_creation_status = grpc_subchannel_create_call(
exec_ctx, w4c->subchannel, w4c->pollset, w4c->target, w4c->notify); exec_ctx, w4c->subchannel, w4c->pollset, w4c->target, w4c->notify);
GPR_ASSERT(call_creation_status == GRPC_SUBCHANNEL_CALL_CREATE_READY); GPR_ASSERT(call_creation_status == 1);
w4c->notify->cb(exec_ctx, w4c->notify->cb_arg, iomgr_success); w4c->notify->cb(exec_ctx, w4c->notify->cb_arg, iomgr_success);
GRPC_SUBCHANNEL_UNREF(exec_ctx, w4c->subchannel, "waiting_for_connect"); GRPC_SUBCHANNEL_UNREF(exec_ctx, w4c->subchannel, "waiting_for_connect");
gpr_free(w4c); gpr_free(w4c);
} }
grpc_subchannel_call_create_status grpc_subchannel_create_call( int grpc_subchannel_create_call(grpc_exec_ctx *exec_ctx, grpc_subchannel *c,
grpc_exec_ctx *exec_ctx, grpc_subchannel *c, grpc_pollset *pollset, grpc_pollset *pollset, gpr_atm *target,
grpc_subchannel_call **target, grpc_closure *notify) { grpc_closure *notify) {
connection *con; connection *con;
grpc_subchannel_call *call;
GPR_TIMER_BEGIN("grpc_subchannel_create_call", 0);
gpr_mu_lock(&c->mu); gpr_mu_lock(&c->mu);
if (c->active != NULL) { if (c->active != NULL) {
con = c->active; con = c->active;
CONNECTION_REF_LOCKED(con, "call"); CONNECTION_REF_LOCKED(con, "call");
gpr_mu_unlock(&c->mu); gpr_mu_unlock(&c->mu);
*target = create_call(exec_ctx, con); call = create_call(exec_ctx, con, pollset);
return GRPC_SUBCHANNEL_CALL_CREATE_READY; if (!gpr_atm_rel_cas(target, 0, (gpr_atm)(gpr_uintptr)call)) {
GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, call, "failed to set");
}
GPR_TIMER_END("grpc_subchannel_create_call", 0);
return 1;
} else { } else {
waiting_for_connect *w4c = gpr_malloc(sizeof(*w4c)); waiting_for_connect *w4c = gpr_malloc(sizeof(*w4c));
w4c->next = c->waiting; w4c->next = c->waiting;
@ -405,7 +448,8 @@ grpc_subchannel_call_create_status grpc_subchannel_create_call(
} else { } else {
gpr_mu_unlock(&c->mu); gpr_mu_unlock(&c->mu);
} }
return GRPC_SUBCHANNEL_CALL_CREATE_PENDING; GPR_TIMER_END("grpc_subchannel_create_call", 0);
return 0;
} }
} }
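On the synchronous path above, the new call is published into the caller-owned gpr_atm slot with a release CAS against an expected value of 0; if the CAS fails (the slot no longer holds 0, presumably because it was claimed concurrently), the freshly created call is unreffed with reason "failed to set". A reader of the slot should pair that release with an acquire load. A minimal reader-side sketch, assuming the caller owns the slot and initialized it to 0; the helper name is hypothetical.
#include <grpc/support/atm.h>
#include "src/core/client_config/subchannel.h"
/* returns NULL until grpc_subchannel_create_call has published a call */
static grpc_subchannel_call *read_published_call(gpr_atm *slot) {
  /* acquire-load pairs with the gpr_atm_rel_cas above, making the call's
     initialization visible before it is used */
  return (grpc_subchannel_call *)(gpr_uintptr)gpr_atm_acq_load(slot);
}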
@ -687,7 +731,7 @@ static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, int iomgr_success) {
update_reconnect_parameters(c); update_reconnect_parameters(c);
continue_connect(exec_ctx, c); continue_connect(exec_ctx, c);
} else { } else {
grpc_subchannel_cancel_waiting_call(exec_ctx, c, iomgr_success); cancel_waiting_calls(exec_ctx, c, iomgr_success);
GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->master, "connecting"); GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->master, "connecting");
GRPC_SUBCHANNEL_UNREF(exec_ctx, c, "connecting"); GRPC_SUBCHANNEL_UNREF(exec_ctx, c, "connecting");
} }
@ -747,26 +791,40 @@ static void connectivity_state_changed_locked(grpc_exec_ctx *exec_ctx,
* grpc_subchannel_call implementation * grpc_subchannel_call implementation
*/ */
static void subchannel_call_destroy(grpc_exec_ctx *exec_ctx, void *call,
int success) {
grpc_subchannel_call *c = call;
gpr_mu *mu = &c->connection->subchannel->mu;
grpc_subchannel *destroy;
GPR_TIMER_BEGIN("grpc_subchannel_call_unref.destroy", 0);
grpc_call_stack_destroy(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c));
gpr_mu_lock(mu);
destroy = CONNECTION_UNREF_LOCKED(exec_ctx, c->connection, "call");
gpr_mu_unlock(mu);
gpr_free(c);
if (destroy != NULL) {
subchannel_destroy(exec_ctx, destroy);
}
GPR_TIMER_END("grpc_subchannel_call_unref.destroy", 0);
}
void grpc_subchannel_call_ref(grpc_subchannel_call *c void grpc_subchannel_call_ref(grpc_subchannel_call *c
GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
gpr_ref(&c->refs); #ifdef GRPC_STREAM_REFCOUNT_DEBUG
grpc_call_stack_ref(SUBCHANNEL_CALL_TO_CALL_STACK(c), reason);
#else
grpc_call_stack_ref(SUBCHANNEL_CALL_TO_CALL_STACK(c));
#endif
} }
void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx, void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx,
grpc_subchannel_call *c grpc_subchannel_call *c
GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
if (gpr_unref(&c->refs)) { #ifdef GRPC_STREAM_REFCOUNT_DEBUG
gpr_mu *mu = &c->connection->subchannel->mu; grpc_call_stack_unref(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c), reason);
grpc_subchannel *destroy; #else
grpc_call_stack_destroy(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c)); grpc_call_stack_unref(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c));
gpr_mu_lock(mu); #endif
destroy = CONNECTION_UNREF_LOCKED(exec_ctx, c->connection, "call");
gpr_mu_unlock(mu);
gpr_free(c);
if (destroy != NULL) {
subchannel_destroy(exec_ctx, destroy);
}
}
} }
char *grpc_subchannel_call_get_peer(grpc_exec_ctx *exec_ctx, char *grpc_subchannel_call_get_peer(grpc_exec_ctx *exec_ctx,
@ -785,14 +843,16 @@ void grpc_subchannel_call_process_op(grpc_exec_ctx *exec_ctx,
} }
static grpc_subchannel_call *create_call(grpc_exec_ctx *exec_ctx, static grpc_subchannel_call *create_call(grpc_exec_ctx *exec_ctx,
connection *con) { connection *con,
grpc_pollset *pollset) {
grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con); grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con);
grpc_subchannel_call *call = grpc_subchannel_call *call =
gpr_malloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size); gpr_malloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size);
grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK(call); grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK(call);
call->connection = con; call->connection = con;
gpr_ref_init(&call->refs, 1); grpc_call_stack_init(exec_ctx, chanstk, 1, subchannel_call_destroy, call,
grpc_call_stack_init(exec_ctx, chanstk, NULL, NULL, callstk); NULL, NULL, callstk);
grpc_call_stack_set_pollset(exec_ctx, callstk, pollset);
return call; return call;
} }
@ -803,3 +863,8 @@ grpc_mdctx *grpc_subchannel_get_mdctx(grpc_subchannel *subchannel) {
grpc_channel *grpc_subchannel_get_master(grpc_subchannel *subchannel) { grpc_channel *grpc_subchannel_get_master(grpc_subchannel *subchannel) {
return subchannel->master; return subchannel->master;
} }
grpc_call_stack *grpc_subchannel_call_get_call_stack(
grpc_subchannel_call *subchannel_call) {
return SUBCHANNEL_CALL_TO_CALL_STACK(subchannel_call);
}
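The new accessor exposes the call stack that is laid out immediately after the grpc_subchannel_call header, so callers can reach the stack without knowing the SUBCHANNEL_CALL_TO_CALL_STACK layout trick. A minimal sketch; the wrapping function is hypothetical.
#include "src/core/channel/channel_stack.h"
#include "src/core/client_config/subchannel.h"
static grpc_call_element *top_element_of(grpc_subchannel_call *call) {
  grpc_call_stack *stk = grpc_subchannel_call_get_call_stack(call);
  return grpc_call_stack_element(stk, 0); /* element 0 is the top filter */
}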

@ -44,7 +44,7 @@ typedef struct grpc_subchannel grpc_subchannel;
typedef struct grpc_subchannel_call grpc_subchannel_call; typedef struct grpc_subchannel_call grpc_subchannel_call;
typedef struct grpc_subchannel_args grpc_subchannel_args; typedef struct grpc_subchannel_args grpc_subchannel_args;
#ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG #ifdef GRPC_STREAM_REFCOUNT_DEBUG
#define GRPC_SUBCHANNEL_REF(p, r) \ #define GRPC_SUBCHANNEL_REF(p, r) \
grpc_subchannel_ref((p), __FILE__, __LINE__, (r)) grpc_subchannel_ref((p), __FILE__, __LINE__, (r))
#define GRPC_SUBCHANNEL_UNREF(cl, p, r) \ #define GRPC_SUBCHANNEL_UNREF(cl, p, r) \
@ -75,11 +75,6 @@ void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx,
grpc_subchannel_call *call grpc_subchannel_call *call
GRPC_SUBCHANNEL_REF_EXTRA_ARGS); GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
typedef enum {
GRPC_SUBCHANNEL_CALL_CREATE_READY,
GRPC_SUBCHANNEL_CALL_CREATE_PENDING
} grpc_subchannel_call_create_status;
/** construct a subchannel call (possibly asynchronously). /** construct a subchannel call (possibly asynchronously).
* *
* If the returned status is \a GRPC_SUBCHANNEL_CALL_CREATE_READY, the call will * If the returned status is \a GRPC_SUBCHANNEL_CALL_CREATE_READY, the call will
@ -88,14 +83,15 @@ typedef enum {
* Otherwise, if the returned status is GRPC_SUBCHANNEL_CALL_CREATE_PENDING, the * Otherwise, if the returned status is GRPC_SUBCHANNEL_CALL_CREATE_PENDING, the
* subchannel call will be created asynchronously, invoking the \a notify * subchannel call will be created asynchronously, invoking the \a notify
* callback upon completion. */ * callback upon completion. */
grpc_subchannel_call_create_status grpc_subchannel_create_call( int grpc_subchannel_create_call(grpc_exec_ctx *exec_ctx,
grpc_exec_ctx *exec_ctx, grpc_subchannel *subchannel, grpc_pollset *pollset, grpc_subchannel *subchannel,
grpc_subchannel_call **target, grpc_closure *notify); grpc_pollset *pollset, gpr_atm *target,
grpc_closure *notify);
/** cancel \a call in the waiting state. */ /** cancel \a call in the waiting state. */
void grpc_subchannel_cancel_waiting_call(grpc_exec_ctx *exec_ctx, void grpc_subchannel_cancel_create_call(grpc_exec_ctx *exec_ctx,
grpc_subchannel *subchannel, grpc_subchannel *subchannel,
int iomgr_success); gpr_atm *target);
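Taking the two declarations together: the caller now supplies a gpr_atm slot instead of a grpc_subchannel_call**; a return of 1 means the call has already been published into the slot, 0 means creation is deferred until the subchannel connects, and a deferred creation is abandoned with grpc_subchannel_cancel_create_call on the same slot. A hedged usage sketch; the holder type and names are illustrative, not part of this patch.
#include <grpc/support/atm.h>
#include "src/core/client_config/subchannel.h"
typedef struct {
  gpr_atm call;            /* 0 until a grpc_subchannel_call is published */
  grpc_closure call_ready; /* run when a deferred creation completes */
} holder;
static void begin_call(grpc_exec_ctx *exec_ctx, grpc_subchannel *subchannel,
                       grpc_pollset *pollset, holder *h) {
  if (!grpc_subchannel_create_call(exec_ctx, subchannel, pollset, &h->call,
                                   &h->call_ready)) {
    /* pending: either h->call_ready fires once connected, or the attempt is
       abandoned with
       grpc_subchannel_cancel_create_call(exec_ctx, subchannel, &h->call) */
  }
}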
/** process a transport level op */ /** process a transport level op */
void grpc_subchannel_process_transport_op(grpc_exec_ctx *exec_ctx, void grpc_subchannel_process_transport_op(grpc_exec_ctx *exec_ctx,
@ -138,6 +134,9 @@ void grpc_subchannel_call_process_op(grpc_exec_ctx *exec_ctx,
char *grpc_subchannel_call_get_peer(grpc_exec_ctx *exec_ctx, char *grpc_subchannel_call_get_peer(grpc_exec_ctx *exec_ctx,
grpc_subchannel_call *subchannel_call); grpc_subchannel_call *subchannel_call);
grpc_call_stack *grpc_subchannel_call_get_call_stack(
grpc_subchannel_call *subchannel_call);
struct grpc_subchannel_args { struct grpc_subchannel_args {
/** Channel filters for this channel - wrapped factories will likely /** Channel filters for this channel - wrapped factories will likely
want to mutate this */ want to mutate this */
