Refactoring progress towards integrating client configs

pull/2303/head
Craig Tiller 10 years ago
parent 3bc8ebd48e
commit f5f1712e1f
  1. 8
      BUILD
  2. 4
      Makefile
  3. 4
      build.json
  4. 4
      gRPC.podspec
  5. 308
      src/core/channel/child_channel.c
  6. 65
      src/core/channel/child_channel.h
  7. 134
      src/core/channel/client_channel.c
  8. 5
      src/core/channel/client_channel.h
  9. 302
      src/core/channel/client_setup.c
  10. 77
      src/core/channel/client_setup.h
  11. 3
      src/core/client_config/lb_policy.h
  12. 13
      src/core/client_config/subchannel.h
  13. 5
      src/core/surface/channel.h
  14. 163
      src/core/surface/channel_create.c
  15. 2
      tools/doxygen/Doxyfile.core.internal
  16. 6
      vsprojects/grpc/grpc.vcxproj
  17. 12
      vsprojects/grpc/grpc.vcxproj.filters
  18. 6
      vsprojects/grpc_unsecure/grpc_unsecure.vcxproj
  19. 12
      vsprojects/grpc_unsecure/grpc_unsecure.vcxproj.filters

@ -150,9 +150,7 @@ cc_library(
"src/core/channel/census_filter.h",
"src/core/channel/channel_args.h",
"src/core/channel/channel_stack.h",
"src/core/channel/child_channel.h",
"src/core/channel/client_channel.h",
"src/core/channel/client_setup.h",
"src/core/channel/connected_channel.h",
"src/core/channel/context.h",
"src/core/channel/http_client_filter.h",
@ -266,9 +264,7 @@ cc_library(
"src/core/census/grpc_context.c",
"src/core/channel/channel_args.c",
"src/core/channel/channel_stack.c",
"src/core/channel/child_channel.c",
"src/core/channel/client_channel.c",
"src/core/channel/client_setup.c",
"src/core/channel/connected_channel.c",
"src/core/channel/http_client_filter.c",
"src/core/channel/http_server_filter.c",
@ -396,9 +392,7 @@ cc_library(
"src/core/channel/census_filter.h",
"src/core/channel/channel_args.h",
"src/core/channel/channel_stack.h",
"src/core/channel/child_channel.h",
"src/core/channel/client_channel.h",
"src/core/channel/client_setup.h",
"src/core/channel/connected_channel.h",
"src/core/channel/context.h",
"src/core/channel/http_client_filter.h",
@ -490,9 +484,7 @@ cc_library(
"src/core/census/grpc_context.c",
"src/core/channel/channel_args.c",
"src/core/channel/channel_stack.c",
"src/core/channel/child_channel.c",
"src/core/channel/client_channel.c",
"src/core/channel/client_setup.c",
"src/core/channel/connected_channel.c",
"src/core/channel/http_client_filter.c",
"src/core/channel/http_server_filter.c",

@ -3016,9 +3016,7 @@ LIBGRPC_SRC = \
src/core/census/grpc_context.c \
src/core/channel/channel_args.c \
src/core/channel/channel_stack.c \
src/core/channel/child_channel.c \
src/core/channel/client_channel.c \
src/core/channel/client_setup.c \
src/core/channel/connected_channel.c \
src/core/channel/http_client_filter.c \
src/core/channel/http_server_filter.c \
@ -3272,9 +3270,7 @@ LIBGRPC_UNSECURE_SRC = \
src/core/census/grpc_context.c \
src/core/channel/channel_args.c \
src/core/channel/channel_stack.c \
src/core/channel/child_channel.c \
src/core/channel/client_channel.c \
src/core/channel/client_setup.c \
src/core/channel/connected_channel.c \
src/core/channel/http_client_filter.c \
src/core/channel/http_server_filter.c \

@ -111,9 +111,7 @@
"src/core/channel/census_filter.h",
"src/core/channel/channel_args.h",
"src/core/channel/channel_stack.h",
"src/core/channel/child_channel.h",
"src/core/channel/client_channel.h",
"src/core/channel/client_setup.h",
"src/core/channel/connected_channel.h",
"src/core/channel/context.h",
"src/core/channel/http_client_filter.h",
@ -205,9 +203,7 @@
"src/core/census/grpc_context.c",
"src/core/channel/channel_args.c",
"src/core/channel/channel_stack.c",
"src/core/channel/child_channel.c",
"src/core/channel/client_channel.c",
"src/core/channel/client_setup.c",
"src/core/channel/connected_channel.c",
"src/core/channel/http_client_filter.c",
"src/core/channel/http_server_filter.c",

File diff suppressed because one or more lines are too long

@ -1,308 +0,0 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include "src/core/channel/child_channel.h"
#include "src/core/iomgr/iomgr.h"
#include <grpc/support/alloc.h>
/* Link back filter: passes up calls to the client channel, pushes down calls
down */
static void maybe_destroy_channel(grpc_child_channel *channel);
typedef struct {
gpr_mu mu;
gpr_cv cv;
grpc_channel_element *back;
/* # of active calls on the channel */
gpr_uint32 active_calls;
/* has grpc_child_channel_destroy been called? */
gpr_uint8 destroyed;
/* has the transport reported itself disconnected? */
gpr_uint8 disconnected;
/* are we calling 'back' - our parent channel */
gpr_uint8 calling_back;
/* have we or our parent sent goaway yet? - dup suppression */
gpr_uint8 sent_goaway;
/* are we currently sending farewell (in this file: goaway + disconnect) */
gpr_uint8 sending_farewell;
/* have we sent farewell (goaway + disconnect) */
gpr_uint8 sent_farewell;
grpc_iomgr_closure finally_destroy_channel_closure;
grpc_iomgr_closure send_farewells_closure;
} lb_channel_data;
typedef struct { grpc_child_channel *channel; } lb_call_data;
static void lb_start_transport_op(grpc_call_element *elem,
grpc_transport_op *op) {
grpc_call_next_op(elem, op);
}
/* Currently we assume all channel operations should just be pushed up. */
static void lb_channel_op(grpc_channel_element *elem,
grpc_channel_element *from_elem,
grpc_channel_op *op) {
lb_channel_data *chand = elem->channel_data;
grpc_channel_element *back;
int calling_back = 0;
switch (op->dir) {
case GRPC_CALL_UP:
gpr_mu_lock(&chand->mu);
back = chand->back;
if (back) {
chand->calling_back++;
calling_back = 1;
}
gpr_mu_unlock(&chand->mu);
if (back) {
back->filter->channel_op(chand->back, elem, op);
} else if (op->type == GRPC_TRANSPORT_GOAWAY) {
gpr_slice_unref(op->data.goaway.message);
}
break;
case GRPC_CALL_DOWN:
grpc_channel_next_op(elem, op);
break;
}
gpr_mu_lock(&chand->mu);
switch (op->type) {
case GRPC_TRANSPORT_CLOSED:
chand->disconnected = 1;
maybe_destroy_channel(grpc_channel_stack_from_top_element(elem));
break;
case GRPC_CHANNEL_GOAWAY:
chand->sent_goaway = 1;
break;
case GRPC_CHANNEL_DISCONNECT:
case GRPC_TRANSPORT_GOAWAY:
case GRPC_ACCEPT_CALL:
break;
}
if (calling_back) {
chand->calling_back--;
gpr_cv_signal(&chand->cv);
maybe_destroy_channel(grpc_channel_stack_from_top_element(elem));
}
gpr_mu_unlock(&chand->mu);
}
/* Constructor for call_data */
static void lb_init_call_elem(grpc_call_element *elem,
const void *server_transport_data,
grpc_transport_op *initial_op) {}
/* Destructor for call_data */
static void lb_destroy_call_elem(grpc_call_element *elem) {}
/* Constructor for channel_data */
static void lb_init_channel_elem(grpc_channel_element *elem,
const grpc_channel_args *args,
grpc_mdctx *metadata_context, int is_first,
int is_last) {
lb_channel_data *chand = elem->channel_data;
GPR_ASSERT(is_first);
GPR_ASSERT(!is_last);
gpr_mu_init(&chand->mu);
gpr_cv_init(&chand->cv);
chand->back = NULL;
chand->destroyed = 0;
chand->disconnected = 0;
chand->active_calls = 0;
chand->sent_goaway = 0;
chand->calling_back = 0;
chand->sending_farewell = 0;
chand->sent_farewell = 0;
}
/* Destructor for channel_data */
static void lb_destroy_channel_elem(grpc_channel_element *elem) {
lb_channel_data *chand = elem->channel_data;
gpr_mu_destroy(&chand->mu);
gpr_cv_destroy(&chand->cv);
}
const grpc_channel_filter grpc_child_channel_top_filter = {
lb_start_transport_op, lb_channel_op,
sizeof(lb_call_data), lb_init_call_elem, lb_destroy_call_elem,
sizeof(lb_channel_data), lb_init_channel_elem, lb_destroy_channel_elem,
"child-channel",
};
/* grpc_child_channel proper */
#define LINK_BACK_ELEM_FROM_CHANNEL(channel) \
grpc_channel_stack_element((channel), 0)
#define LINK_BACK_ELEM_FROM_CALL(call) grpc_call_stack_element((call), 0)
static void finally_destroy_channel(void *c, int success) {
/* ignore success or not... this is a destruction callback and will only
happen once - the only purpose here is to release resources */
grpc_child_channel *channel = c;
lb_channel_data *chand = LINK_BACK_ELEM_FROM_CHANNEL(channel)->channel_data;
/* wait for the initiator to leave the mutex */
gpr_mu_lock(&chand->mu);
gpr_mu_unlock(&chand->mu);
grpc_channel_stack_destroy(channel);
gpr_free(channel);
}
static void send_farewells(void *c, int success) {
grpc_child_channel *channel = c;
grpc_channel_element *lbelem = LINK_BACK_ELEM_FROM_CHANNEL(channel);
lb_channel_data *chand = lbelem->channel_data;
int send_goaway;
grpc_channel_op op;
gpr_mu_lock(&chand->mu);
send_goaway = !chand->sent_goaway;
chand->sent_goaway = 1;
gpr_mu_unlock(&chand->mu);
if (send_goaway) {
op.type = GRPC_CHANNEL_GOAWAY;
op.dir = GRPC_CALL_DOWN;
op.data.goaway.status = GRPC_STATUS_OK;
op.data.goaway.message = gpr_slice_from_copied_string("Client disconnect");
grpc_channel_next_op(lbelem, &op);
}
op.type = GRPC_CHANNEL_DISCONNECT;
op.dir = GRPC_CALL_DOWN;
grpc_channel_next_op(lbelem, &op);
gpr_mu_lock(&chand->mu);
chand->sending_farewell = 0;
chand->sent_farewell = 1;
maybe_destroy_channel(channel);
gpr_mu_unlock(&chand->mu);
}
static void maybe_destroy_channel(grpc_child_channel *channel) {
lb_channel_data *chand = LINK_BACK_ELEM_FROM_CHANNEL(channel)->channel_data;
if (chand->destroyed && chand->disconnected && chand->active_calls == 0 &&
!chand->sending_farewell && !chand->calling_back) {
chand->finally_destroy_channel_closure.cb = finally_destroy_channel;
chand->finally_destroy_channel_closure.cb_arg = channel;
grpc_iomgr_add_callback(&chand->finally_destroy_channel_closure);
} else if (chand->destroyed && !chand->disconnected &&
chand->active_calls == 0 && !chand->sending_farewell &&
!chand->sent_farewell) {
chand->sending_farewell = 1;
chand->send_farewells_closure.cb = send_farewells;
chand->send_farewells_closure.cb_arg = channel;
grpc_iomgr_add_callback(&chand->send_farewells_closure);
}
}
grpc_child_channel *grpc_child_channel_create(
grpc_channel_element *parent, const grpc_channel_filter **filters,
size_t filter_count, const grpc_channel_args *args,
grpc_mdctx *metadata_context) {
grpc_channel_stack *stk =
gpr_malloc(grpc_channel_stack_size(filters, filter_count));
lb_channel_data *lb;
grpc_channel_stack_init(filters, filter_count, args, metadata_context, stk);
lb = LINK_BACK_ELEM_FROM_CHANNEL(stk)->channel_data;
gpr_mu_lock(&lb->mu);
lb->back = parent;
gpr_mu_unlock(&lb->mu);
return stk;
}
void grpc_child_channel_destroy(grpc_child_channel *channel,
int wait_for_callbacks) {
grpc_channel_element *lbelem = LINK_BACK_ELEM_FROM_CHANNEL(channel);
lb_channel_data *chand = lbelem->channel_data;
gpr_mu_lock(&chand->mu);
while (wait_for_callbacks && chand->calling_back) {
gpr_cv_wait(&chand->cv, &chand->mu, gpr_inf_future);
}
chand->back = NULL;
chand->destroyed = 1;
maybe_destroy_channel(channel);
gpr_mu_unlock(&chand->mu);
}
void grpc_child_channel_handle_op(grpc_child_channel *channel,
grpc_channel_op *op) {
grpc_channel_next_op(LINK_BACK_ELEM_FROM_CHANNEL(channel), op);
}
grpc_child_call *grpc_child_channel_create_call(grpc_child_channel *channel,
grpc_call_element *parent,
grpc_transport_op *initial_op) {
grpc_call_stack *stk = gpr_malloc((channel)->call_stack_size);
grpc_call_element *lbelem;
lb_call_data *lbcalld;
lb_channel_data *lbchand;
grpc_call_stack_init(channel, NULL, initial_op, stk);
lbelem = LINK_BACK_ELEM_FROM_CALL(stk);
lbchand = lbelem->channel_data;
lbcalld = lbelem->call_data;
lbcalld->channel = channel;
gpr_mu_lock(&lbchand->mu);
lbchand->active_calls++;
gpr_mu_unlock(&lbchand->mu);
return stk;
}
void grpc_child_call_destroy(grpc_child_call *call) {
grpc_call_element *lbelem = LINK_BACK_ELEM_FROM_CALL(call);
lb_call_data *calld = lbelem->call_data;
lb_channel_data *chand = lbelem->channel_data;
grpc_child_channel *channel = calld->channel;
grpc_call_stack_destroy(call);
gpr_free(call);
gpr_mu_lock(&chand->mu);
chand->active_calls--;
maybe_destroy_channel(channel);
gpr_mu_unlock(&chand->mu);
}
grpc_call_element *grpc_child_call_get_top_element(grpc_child_call *call) {
return LINK_BACK_ELEM_FROM_CALL(call);
}

@ -1,65 +0,0 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef GRPC_INTERNAL_CORE_CHANNEL_CHILD_CHANNEL_H
#define GRPC_INTERNAL_CORE_CHANNEL_CHILD_CHANNEL_H
#include "src/core/channel/channel_stack.h"
/* helper for filters that need to host child channel stacks... handles
lifetime and upwards propagation cleanly */
extern const grpc_channel_filter grpc_child_channel_top_filter;
typedef grpc_channel_stack grpc_child_channel;
typedef grpc_call_stack grpc_child_call;
/* filters[0] must be &grpc_child_channel_top_filter */
grpc_child_channel *grpc_child_channel_create(
grpc_channel_element *parent, const grpc_channel_filter **filters,
size_t filter_count, const grpc_channel_args *args,
grpc_mdctx *metadata_context);
void grpc_child_channel_handle_op(grpc_child_channel *channel,
grpc_channel_op *op);
grpc_channel_element *grpc_child_channel_get_bottom_element(
grpc_child_channel *channel);
void grpc_child_channel_destroy(grpc_child_channel *channel,
int wait_for_callbacks);
grpc_child_call *grpc_child_channel_create_call(grpc_child_channel *channel,
grpc_call_element *parent,
grpc_transport_op *initial_op);
grpc_call_element *grpc_child_call_get_top_element(grpc_child_call *call);
void grpc_child_call_destroy(grpc_child_call *call);
#endif /* GRPC_INTERNAL_CORE_CHANNEL_CHILD_CHANNEL_H */

@ -36,7 +36,6 @@
#include <stdio.h>
#include "src/core/channel/channel_args.h"
#include "src/core/channel/child_channel.h"
#include "src/core/channel/connected_channel.h"
#include "src/core/iomgr/iomgr.h"
#include "src/core/iomgr/pollset_set.h"
@ -51,26 +50,25 @@
typedef struct call_data call_data;
typedef struct {
/* protects children, child_count, child_capacity, active_child,
transport_setup_initiated
does not protect channel stacks held by children
transport_setup is assumed to be set once during construction */
gpr_mu mu;
/* the sending child (may be null) */
grpc_child_channel *active_child;
/** metadata context for this channel */
grpc_mdctx *mdctx;
/** resolver for this channel */
grpc_resolver *resolver;
/** channel arguments for this channel
TODO(ctiller): still needed? */
grpc_channel_args *args;
/* calls waiting for a channel to be ready */
call_data **waiting_children;
size_t waiting_child_count;
size_t waiting_child_capacity;
/** mutex protecting waiting list */
gpr_mu mu_waiting;
/** mutex protecting client configuration, resolution state */
gpr_mu mu_config;
/* transport setup for this channel */
grpc_transport_setup *transport_setup;
int transport_setup_initiated;
/** currently active load balancer - guarded by mu_config */
grpc_lb_policy *lb_policy;
grpc_channel_args *args;
/** incoming configuration - set by resolver.next
guarded by mu_config */
grpc_client_config *incoming_configuration;
} channel_data;
typedef enum {
@ -84,12 +82,14 @@ struct call_data {
/* owning element */
grpc_call_element *elem;
gpr_mu mu_state;
call_state state;
gpr_timespec deadline;
union {
struct {
/* our child call stack */
grpc_child_call *child_call;
grpc_subchannel_call *subchannel_call;
} active;
grpc_transport_op waiting_op;
struct {
@ -99,6 +99,7 @@ struct call_data {
} s;
};
#if 0
static int prepare_activate(grpc_call_element *elem,
grpc_child_channel *on_child) {
call_data *calld = elem->call_data;
@ -150,6 +151,7 @@ static void remove_waiting_child(channel_data *chand, call_data *calld) {
new_count == chand->waiting_child_count);
chand->waiting_child_count = new_count;
}
#endif
static void handle_op_after_cancellation(grpc_call_element *elem,
grpc_transport_op *op) {
@ -183,15 +185,99 @@ static void handle_op_after_cancellation(grpc_call_element *elem,
}
}
static void add_to_lb_policy_wait_queue_locked_state_config(channel_data *chand, call_data *calld) {
abort();
}
static void pick_target(grpc_lb_policy *lb_policy, call_data *calld) {
abort();
}
static void cc_start_transport_op(grpc_call_element *elem,
grpc_transport_op *op) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
grpc_call_element *child_elem;
grpc_subchannel_call *subchannel_call;
grpc_lb_policy *lb_policy;
grpc_transport_op waiting_op;
GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
gpr_mu_lock(&calld->mu_state);
switch (calld->state) {
case CALL_ACTIVE:
subchannel_call = calld->s.active.subchannel_call;
grpc_subchannel_call_ref(subchannel_call);
gpr_mu_unlock(&calld->mu_state);
grpc_subchannel_call_process_op(subchannel_call, op);
grpc_subchannel_call_unref(subchannel_call);
break;
case CALL_CANCELLED:
gpr_mu_unlock(&calld->mu_state);
handle_op_after_cancellation(elem, op);
break;
case CALL_CREATED:
if (op->cancel_with_status != GRPC_STATUS_OK) {
calld->state = CALL_CANCELLED;
gpr_mu_unlock(&calld->mu_state);
handle_op_after_cancellation(elem, op);
} else {
calld->state = CALL_WAITING;
calld->s.waiting_op = *op;
gpr_mu_lock(&chand->mu_config);
lb_policy = chand->lb_policy;
if (lb_policy) {
grpc_lb_policy_ref(lb_policy);
gpr_mu_unlock(&chand->mu_config);
gpr_mu_unlock(&calld->mu_state);
pick_target(lb_policy, calld);
grpc_lb_policy_unref(lb_policy);
} else {
add_to_lb_policy_wait_queue_locked_state_config(chand, calld);
gpr_mu_unlock(&chand->mu_config);
gpr_mu_unlock(&calld->mu_state);
}
}
break;
case CALL_WAITING:
if (op->cancel_with_status != GRPC_STATUS_OK) {
waiting_op = calld->s.waiting_op;
calld->state = CALL_CANCELLED;
gpr_mu_unlock(&calld->mu_state);
handle_op_after_cancellation(elem, &waiting_op);
handle_op_after_cancellation(elem, op);
} else {
GPR_ASSERT((calld->s.waiting_op.send_ops == NULL) !=
(op->send_ops == NULL));
GPR_ASSERT((calld->s.waiting_op.recv_ops == NULL) !=
(op->recv_ops == NULL));
if (op->send_ops) {
calld->s.waiting_op.send_ops = op->send_ops;
calld->s.waiting_op.is_last_send = op->is_last_send;
calld->s.waiting_op.on_done_send = op->on_done_send;
calld->s.waiting_op.send_user_data = op->send_user_data;
}
if (op->recv_ops) {
calld->s.waiting_op.recv_ops = op->recv_ops;
calld->s.waiting_op.recv_state = op->recv_state;
calld->s.waiting_op.on_done_recv = op->on_done_recv;
calld->s.waiting_op.recv_user_data = op->recv_user_data;
}
gpr_mu_unlock(&calld->mu_state);
if (op->on_consumed) {
op->on_consumed(op->on_consumed_user_data, 0);
}
}
break;
}
#if 0
gpr_mu_lock(&chand->mu);
switch (calld->state) {
case CALL_ACTIVE:
@ -285,6 +371,7 @@ static void cc_start_transport_op(grpc_call_element *elem,
handle_op_after_cancellation(elem, op);
break;
}
#endif
}
static void channel_op(grpc_channel_element *elem,
@ -536,11 +623,12 @@ grpc_transport_setup_result grpc_client_channel_transport_setup_complete(
return result;
}
void grpc_client_channel_set_transport_setup(grpc_channel_stack *channel_stack,
grpc_transport_setup *setup) {
void grpc_client_channel_set_resolver(grpc_channel_stack *channel_stack,
grpc_resolver *resolver) {
/* post construction initialization: set the transport setup pointer */
grpc_channel_element *elem = grpc_channel_stack_last_element(channel_stack);
channel_data *chand = elem->channel_data;
GPR_ASSERT(!chand->transport_setup);
chand->transport_setup = setup;
GPR_ASSERT(!chand->resolver);
chand->resolver = resolver;
grpc_resolver_next(resolver, &chand->incoming_configuration, &chand->on_config_changed);
}

@ -35,6 +35,7 @@
#define GRPC_INTERNAL_CORE_CHANNEL_CLIENT_CHANNEL_H
#include "src/core/channel/channel_stack.h"
#include "src/core/client_config/resolver.h"
/* A client channel is a channel that begins disconnected, and can connect
to some endpoint on demand. If that endpoint disconnects, it will be
@ -48,8 +49,8 @@ extern const grpc_channel_filter grpc_client_channel_filter;
/* post-construction initializer to let the client channel know which
transport setup it should cancel upon destruction, or initiate when it needs
a connection */
void grpc_client_channel_set_transport_setup(grpc_channel_stack *channel_stack,
grpc_transport_setup *setup);
void grpc_client_channel_set_resolver(grpc_channel_stack *channel_stack,
grpc_resolver *resolver);
/* grpc_transport_setup_callback for binding new transports into a client
channel - user_data should be the channel stack containing the client

@ -1,302 +0,0 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include "src/core/channel/client_setup.h"
#include "src/core/channel/channel_args.h"
#include "src/core/channel/channel_stack.h"
#include "src/core/iomgr/alarm.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
struct grpc_client_setup {
grpc_transport_setup base; /* must be first */
void (*initiate)(void *user_data, grpc_client_setup_request *request);
void (*done)(void *user_data);
void *user_data;
grpc_channel_args *args;
grpc_mdctx *mdctx;
grpc_alarm backoff_alarm;
gpr_timespec current_backoff_interval;
int in_alarm;
int in_cb;
int cancelled;
gpr_mu mu;
gpr_cv cv;
grpc_client_setup_request *active_request;
int refs;
/** The set of pollsets that are currently interested in this
connection being established */
grpc_pollset_set interested_parties;
};
struct grpc_client_setup_request {
/* pointer back to the setup object */
grpc_client_setup *setup;
gpr_timespec deadline;
};
gpr_timespec grpc_client_setup_request_deadline(grpc_client_setup_request *r) {
return r->deadline;
}
grpc_pollset_set *grpc_client_setup_get_interested_parties(
grpc_client_setup_request *r) {
return &r->setup->interested_parties;
}
static void destroy_setup(grpc_client_setup *s) {
gpr_mu_destroy(&s->mu);
gpr_cv_destroy(&s->cv);
s->done(s->user_data);
grpc_channel_args_destroy(s->args);
grpc_pollset_set_destroy(&s->interested_parties);
gpr_free(s);
}
static void destroy_request(grpc_client_setup_request *r) { gpr_free(r); }
/* initiate handshaking */
static void setup_initiate(grpc_transport_setup *sp) {
grpc_client_setup *s = (grpc_client_setup *)sp;
grpc_client_setup_request *r = gpr_malloc(sizeof(grpc_client_setup_request));
int in_alarm = 0;
r->setup = s;
r->deadline = gpr_time_add(gpr_now(), gpr_time_from_seconds(60));
gpr_mu_lock(&s->mu);
GPR_ASSERT(s->refs > 0);
/* there might be more than one request outstanding if the caller calls
initiate in some kind of rapid-fire way: we try to connect each time,
and keep track of the latest request (which is the only one that gets
to finish) */
if (!s->in_alarm) {
s->active_request = r;
s->refs++;
} else {
/* TODO(klempner): Maybe do something more clever here */
in_alarm = 1;
}
gpr_mu_unlock(&s->mu);
if (!in_alarm) {
s->initiate(s->user_data, r);
} else {
destroy_request(r);
}
}
/** implementation of add_interested_party for setup vtable */
static void setup_add_interested_party(grpc_transport_setup *sp,
grpc_pollset *pollset) {
grpc_client_setup *s = (grpc_client_setup *)sp;
gpr_mu_lock(&s->mu);
grpc_pollset_set_add_pollset(&s->interested_parties, pollset);
gpr_mu_unlock(&s->mu);
}
/** implementation of del_interested_party for setup vtable */
static void setup_del_interested_party(grpc_transport_setup *sp,
grpc_pollset *pollset) {
grpc_client_setup *s = (grpc_client_setup *)sp;
gpr_mu_lock(&s->mu);
grpc_pollset_set_del_pollset(&s->interested_parties, pollset);
gpr_mu_unlock(&s->mu);
}
/* cancel handshaking: cancel all requests, and shutdown (the caller promises
not to initiate again) */
static void setup_cancel(grpc_transport_setup *sp) {
grpc_client_setup *s = (grpc_client_setup *)sp;
int cancel_alarm = 0;
gpr_mu_lock(&s->mu);
s->cancelled = 1;
while (s->in_cb) {
gpr_cv_wait(&s->cv, &s->mu, gpr_inf_future);
}
GPR_ASSERT(s->refs > 0);
/* effectively cancels the current request (if any) */
s->active_request = NULL;
if (s->in_alarm) {
cancel_alarm = 1;
}
if (--s->refs == 0) {
gpr_mu_unlock(&s->mu);
destroy_setup(s);
} else {
gpr_mu_unlock(&s->mu);
}
if (cancel_alarm) {
grpc_alarm_cancel(&s->backoff_alarm);
}
}
int grpc_client_setup_cb_begin(grpc_client_setup_request *r,
const char *reason) {
gpr_mu_lock(&r->setup->mu);
if (r->setup->cancelled) {
gpr_mu_unlock(&r->setup->mu);
return 0;
}
r->setup->in_cb++;
gpr_mu_unlock(&r->setup->mu);
return 1;
}
void grpc_client_setup_cb_end(grpc_client_setup_request *r,
const char *reason) {
gpr_mu_lock(&r->setup->mu);
r->setup->in_cb--;
if (r->setup->cancelled) gpr_cv_signal(&r->setup->cv);
gpr_mu_unlock(&r->setup->mu);
}
/* vtable for transport setup */
static const grpc_transport_setup_vtable setup_vtable = {
setup_initiate, setup_add_interested_party, setup_del_interested_party,
setup_cancel};
void grpc_client_setup_create_and_attach(
grpc_channel_stack *newly_minted_channel, const grpc_channel_args *args,
grpc_mdctx *mdctx,
void (*initiate)(void *user_data, grpc_client_setup_request *request),
void (*done)(void *user_data), void *user_data) {
grpc_client_setup *s = gpr_malloc(sizeof(grpc_client_setup));
s->base.vtable = &setup_vtable;
gpr_mu_init(&s->mu);
gpr_cv_init(&s->cv);
s->refs = 1;
s->mdctx = mdctx;
s->initiate = initiate;
s->done = done;
s->user_data = user_data;
s->active_request = NULL;
s->args = grpc_channel_args_copy(args);
s->current_backoff_interval = gpr_time_from_micros(1000000);
s->in_alarm = 0;
s->in_cb = 0;
s->cancelled = 0;
grpc_pollset_set_init(&s->interested_parties);
grpc_client_channel_set_transport_setup(newly_minted_channel, &s->base);
}
int grpc_client_setup_request_should_continue(grpc_client_setup_request *r,
const char *reason) {
int result;
if (gpr_time_cmp(gpr_now(), r->deadline) > 0) {
result = 0;
} else {
gpr_mu_lock(&r->setup->mu);
result = r->setup->active_request == r;
gpr_mu_unlock(&r->setup->mu);
}
return result;
}
static void backoff_alarm_done(void *arg /* grpc_client_setup_request */,
int success) {
grpc_client_setup_request *r = arg;
grpc_client_setup *s = r->setup;
/* Handle status cancelled? */
gpr_mu_lock(&s->mu);
s->in_alarm = 0;
if (s->active_request != NULL || !success) {
if (0 == --s->refs) {
gpr_mu_unlock(&s->mu);
destroy_setup(s);
destroy_request(r);
return;
} else {
gpr_mu_unlock(&s->mu);
destroy_request(r);
return;
}
}
s->active_request = r;
gpr_mu_unlock(&s->mu);
s->initiate(s->user_data, r);
}
void grpc_client_setup_request_finish(grpc_client_setup_request *r,
int was_successful) {
int retry = !was_successful;
grpc_client_setup *s = r->setup;
gpr_mu_lock(&s->mu);
if (s->active_request == r) {
s->active_request = NULL;
} else {
retry = 0;
}
if (!retry && 0 == --s->refs) {
gpr_mu_unlock(&s->mu);
destroy_setup(s);
destroy_request(r);
} else if (retry) {
/* TODO(klempner): Replace these values with further consideration. 2x is
probably too aggressive of a backoff. */
gpr_timespec max_backoff = gpr_time_from_minutes(2);
gpr_timespec now = gpr_now();
gpr_timespec deadline = gpr_time_add(s->current_backoff_interval, now);
GPR_ASSERT(!s->in_alarm);
s->in_alarm = 1;
grpc_alarm_init(&s->backoff_alarm, deadline, backoff_alarm_done, r, now);
s->current_backoff_interval =
gpr_time_add(s->current_backoff_interval, s->current_backoff_interval);
if (gpr_time_cmp(s->current_backoff_interval, max_backoff) > 0) {
s->current_backoff_interval = max_backoff;
}
gpr_mu_unlock(&s->mu);
} else {
gpr_mu_unlock(&s->mu);
destroy_request(r);
}
}
const grpc_channel_args *grpc_client_setup_get_channel_args(
grpc_client_setup_request *r) {
return r->setup->args;
}
grpc_mdctx *grpc_client_setup_get_mdctx(grpc_client_setup_request *r) {
return r->setup->mdctx;
}

@ -1,77 +0,0 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef GRPC_INTERNAL_CORE_CHANNEL_CLIENT_SETUP_H
#define GRPC_INTERNAL_CORE_CHANNEL_CLIENT_SETUP_H
#include "src/core/channel/client_channel.h"
#include "src/core/transport/metadata.h"
#include <grpc/support/time.h>
/* Convenience API's to simplify transport setup */
typedef struct grpc_client_setup grpc_client_setup;
typedef struct grpc_client_setup_request grpc_client_setup_request;
void grpc_client_setup_create_and_attach(
grpc_channel_stack *newly_minted_channel, const grpc_channel_args *args,
grpc_mdctx *mdctx,
void (*initiate)(void *user_data, grpc_client_setup_request *request),
void (*done)(void *user_data), void *user_data);
/* Check that r is the active request: needs to be performed at each callback.
If this races, we'll have two connection attempts running at once and the
old one will get cleaned up in due course, which is fine. */
int grpc_client_setup_request_should_continue(grpc_client_setup_request *r,
const char *reason);
void grpc_client_setup_request_finish(grpc_client_setup_request *r,
int was_successful);
const grpc_channel_args *grpc_client_setup_get_channel_args(
grpc_client_setup_request *r);
/* Call before calling back into the setup listener, and call only if
this function returns 1. If it returns 1, also promise to call
grpc_client_setup_cb_end */
int grpc_client_setup_cb_begin(grpc_client_setup_request *r,
const char *reason);
void grpc_client_setup_cb_end(grpc_client_setup_request *r, const char *reason);
/* Get the deadline for a request passed in to initiate. Implementations should
make a best effort to honor this deadline. */
gpr_timespec grpc_client_setup_request_deadline(grpc_client_setup_request *r);
grpc_pollset_set *grpc_client_setup_get_interested_parties(
grpc_client_setup_request *r);
grpc_mdctx *grpc_client_setup_get_mdctx(grpc_client_setup_request *r);
#endif /* GRPC_INTERNAL_CORE_CHANNEL_CLIENT_SETUP_H */

@ -56,6 +56,9 @@ struct grpc_lb_policy_vtable {
grpc_iomgr_closure *on_complete);
};
void grpc_lb_policy_ref(grpc_lb_policy *policy);
void grpc_lb_policy_unref(grpc_lb_policy *policy);
/** Start shutting down (fail any pending picks) */
void grpc_lb_policy_shutdown(grpc_lb_policy *policy);

@ -34,6 +34,7 @@
#ifndef GRPC_INTERNAL_CORE_CLIENT_CONFIG_SUBCHANNEL_H
#define GRPC_INTERNAL_CORE_CLIENT_CONFIG_SUBCHANNEL_H
#include "src/core/channel/channel_stack.h"
#include "src/core/iomgr/iomgr.h"
#include "src/core/iomgr/sockaddr.h"
#include "src/core/transport/transport.h"
@ -41,6 +42,7 @@
/** A (sub-)channel that knows how to connect to exactly one target
address. Provides a target for load balancing. */
typedef struct grpc_subchannel grpc_subchannel;
typedef struct grpc_subchannel_call grpc_subchannel_call;
/** Connectivity state of a channel.
TODO(ctiller): move to grpc.h when we implement the public
@ -61,6 +63,9 @@ typedef enum {
void grpc_subchannel_ref(grpc_subchannel *channel);
void grpc_subchannel_unref(grpc_subchannel *channel);
void grpc_subchannel_call_ref(grpc_subchannel_call *call);
void grpc_subchannel_call_unref(grpc_subchannel_call *call);
/** poll the current connectivity state of a channel */
grpc_connectivity_state grpc_subchannel_check_connectivity(
grpc_subchannel *channel);
@ -71,8 +76,10 @@ void grpc_subchannel_notify_on_state_change(grpc_subchannel *channel,
grpc_connectivity_state *state,
grpc_iomgr_closure *notify);
/** continue processing of transport operation \a op */
void grpc_subchannel_continue_op(grpc_subchannel *channel,
grpc_transport_op *op);
/** construct a call */
grpc_subchannel_call *grpc_subchannel_create_call(grpc_subchannel *subchannel, grpc_call_element *parent, grpc_transport_op *initial_op);
/** continue processing a transport op */
void grpc_subchannel_call_process_op(grpc_subchannel_call *subchannel_call, grpc_transport_op *op);
#endif /* GRPC_INTERNAL_CORE_CLIENT_CONFIG_SUBCHANNEL_H */

@ -44,11 +44,6 @@ grpc_channel *grpc_channel_create_from_filters(
/** Get a (borrowed) pointer to this channels underlying channel stack */
grpc_channel_stack *grpc_channel_get_channel_stack(grpc_channel *channel);
/** Get a (borrowed) pointer to this channels subchannel factory (if it exists)
*/
grpc_subchannel_factory *grpc_channel_get_subchannel_factory(
grpc_channel *channel);
/** Get a (borrowed) pointer to the channel wide metadata context */
grpc_mdctx *grpc_channel_get_metadata_context(grpc_channel *channel);

@ -31,159 +31,16 @@
*
*/
#include "src/core/iomgr/sockaddr.h"
#include <grpc/grpc.h>
#include <stdlib.h>
#include <string.h>
#include "src/core/channel/census_filter.h"
#include "src/core/channel/channel_args.h"
#include "src/core/channel/client_channel.h"
#include "src/core/channel/client_setup.h"
#include "src/core/channel/connected_channel.h"
#include "src/core/channel/http_client_filter.h"
#include "src/core/iomgr/endpoint.h"
#include "src/core/iomgr/resolve_address.h"
#include "src/core/iomgr/tcp_client.h"
#include "src/core/client_config/resolver_registry.h"
#include "src/core/client_config/subchannels/tcp_subchannel.h"
#include "src/core/surface/channel.h"
#include "src/core/surface/client.h"
#include "src/core/support/string.h"
#include "src/core/transport/chttp2_transport.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/sync.h>
#include <grpc/support/useful.h>
typedef struct setup setup;
/* A single setup request (started via initiate) */
typedef struct {
grpc_client_setup_request *cs_request;
setup *setup;
/* Resolved addresses, or null if resolution not yet completed */
grpc_resolved_addresses *resolved;
/* which address in resolved should we pick for the next connection attempt */
size_t resolved_index;
} request;
/* Global setup logic (may be running many simultaneous setup requests, but
with only one 'active' */
struct setup {
const char *target;
grpc_transport_setup_callback setup_callback;
void *setup_user_data;
};
static int maybe_try_next_resolved(request *r);
static void done(request *r, int was_successful) {
grpc_client_setup_request_finish(r->cs_request, was_successful);
if (r->resolved) {
grpc_resolved_addresses_destroy(r->resolved);
}
gpr_free(r);
}
/* connection callback: tcp is either valid, or null on error */
static void on_connect(void *rp, grpc_endpoint *tcp) {
request *r = rp;
if (!grpc_client_setup_request_should_continue(r->cs_request, "on_connect")) {
if (tcp) {
grpc_endpoint_shutdown(tcp);
grpc_endpoint_destroy(tcp);
}
done(r, 0);
return;
}
if (!tcp) {
if (!maybe_try_next_resolved(r)) {
done(r, 0);
return;
} else {
return;
}
} else if (grpc_client_setup_cb_begin(r->cs_request, "on_connect")) {
grpc_create_chttp2_transport(
r->setup->setup_callback, r->setup->setup_user_data,
grpc_client_setup_get_channel_args(r->cs_request), tcp, NULL, 0,
grpc_client_setup_get_mdctx(r->cs_request), 1);
grpc_client_setup_cb_end(r->cs_request, "on_connect");
done(r, 1);
return;
} else {
done(r, 0);
}
}
/* attempt to connect to the next available resolved address */
static int maybe_try_next_resolved(request *r) {
grpc_resolved_address *addr;
if (!r->resolved) return 0;
if (r->resolved_index == r->resolved->naddrs) return 0;
addr = &r->resolved->addrs[r->resolved_index++];
grpc_tcp_client_connect(
on_connect, r, grpc_client_setup_get_interested_parties(r->cs_request),
(struct sockaddr *)&addr->addr, addr->len,
grpc_client_setup_request_deadline(r->cs_request));
return 1;
}
/* callback for when our target address has been resolved */
static void on_resolved(void *rp, grpc_resolved_addresses *resolved) {
request *r = rp;
/* if we're not still the active request, abort */
if (!grpc_client_setup_request_should_continue(r->cs_request,
"on_resolved")) {
if (resolved) {
grpc_resolved_addresses_destroy(resolved);
}
done(r, 0);
return;
}
if (!resolved) {
done(r, 0);
return;
} else {
r->resolved = resolved;
r->resolved_index = 0;
if (!maybe_try_next_resolved(r)) {
done(r, 0);
}
}
}
static void initiate_setup(void *sp, grpc_client_setup_request *cs_request) {
request *r = gpr_malloc(sizeof(request));
r->setup = sp;
r->cs_request = cs_request;
r->resolved = NULL;
r->resolved_index = 0;
/* TODO(klempner): Make grpc_resolve_address respect deadline */
grpc_resolve_address(r->setup->target, "http", on_resolved, r);
}
static void done_setup(void *sp) {
setup *s = sp;
gpr_free((void *)s->target);
gpr_free(s);
}
static grpc_transport_setup_result complete_setup(void *channel_stack,
grpc_transport *transport,
grpc_mdctx *mdctx) {
static grpc_channel_filter const *extra_filters[] = {
&grpc_http_client_filter};
return grpc_client_channel_transport_setup_complete(
channel_stack, transport, extra_filters, GPR_ARRAY_SIZE(extra_filters),
mdctx);
}
/* Create a client channel:
Asynchronously: - resolve target
@ -191,11 +48,10 @@ static grpc_transport_setup_result complete_setup(void *channel_stack,
- perform handshakes */
grpc_channel *grpc_channel_create(const char *target,
const grpc_channel_args *args) {
setup *s = gpr_malloc(sizeof(setup));
grpc_mdctx *mdctx = grpc_mdctx_create();
grpc_channel *channel = NULL;
#define MAX_FILTERS 3
const grpc_channel_filter *filters[MAX_FILTERS];
grpc_resolver *resolver;
int n = 0;
filters[n++] = &grpc_client_surface_filter;
/* TODO(census)
@ -204,15 +60,14 @@ grpc_channel *grpc_channel_create(const char *target,
} */
filters[n++] = &grpc_client_channel_filter;
GPR_ASSERT(n <= MAX_FILTERS);
channel = grpc_channel_create_from_filters(filters, n, args, mdctx, 1);
s->target = gpr_strdup(target);
s->setup_callback = complete_setup;
s->setup_user_data = grpc_channel_get_channel_stack(channel);
resolver = grpc_resolver_create(target, grpc_create_tcp_subchannel_factory());
if (!resolver) {
return NULL;
}
grpc_client_setup_create_and_attach(grpc_channel_get_channel_stack(channel),
args, mdctx, initiate_setup, done_setup,
s);
channel = grpc_channel_create_from_filters(filters, n, args, grpc_mdctx_create(), 1);
grpc_client_channel_set_resolver(grpc_channel_get_channel_stack(channel), resolver);
return channel;
}

File diff suppressed because one or more lines are too long

@ -176,9 +176,7 @@
<ClInclude Include="..\..\src\core\channel\census_filter.h" />
<ClInclude Include="..\..\src\core\channel\channel_args.h" />
<ClInclude Include="..\..\src\core\channel\channel_stack.h" />
<ClInclude Include="..\..\src\core\channel\child_channel.h" />
<ClInclude Include="..\..\src\core\channel\client_channel.h" />
<ClInclude Include="..\..\src\core\channel\client_setup.h" />
<ClInclude Include="..\..\src\core\channel\connected_channel.h" />
<ClInclude Include="..\..\src\core\channel\context.h" />
<ClInclude Include="..\..\src\core\channel\http_client_filter.h" />
@ -320,12 +318,8 @@
</ClCompile>
<ClCompile Include="..\..\src\core\channel\channel_stack.c">
</ClCompile>
<ClCompile Include="..\..\src\core\channel\child_channel.c">
</ClCompile>
<ClCompile Include="..\..\src\core\channel\client_channel.c">
</ClCompile>
<ClCompile Include="..\..\src\core\channel\client_setup.c">
</ClCompile>
<ClCompile Include="..\..\src\core\channel\connected_channel.c">
</ClCompile>
<ClCompile Include="..\..\src\core\channel\http_client_filter.c">

@ -79,15 +79,9 @@
<ClCompile Include="..\..\src\core\channel\channel_stack.c">
<Filter>src\core\channel</Filter>
</ClCompile>
<ClCompile Include="..\..\src\core\channel\child_channel.c">
<Filter>src\core\channel</Filter>
</ClCompile>
<ClCompile Include="..\..\src\core\channel\client_channel.c">
<Filter>src\core\channel</Filter>
</ClCompile>
<ClCompile Include="..\..\src\core\channel\client_setup.c">
<Filter>src\core\channel</Filter>
</ClCompile>
<ClCompile Include="..\..\src\core\channel\connected_channel.c">
<Filter>src\core\channel</Filter>
</ClCompile>
@ -470,15 +464,9 @@
<ClInclude Include="..\..\src\core\channel\channel_stack.h">
<Filter>src\core\channel</Filter>
</ClInclude>
<ClInclude Include="..\..\src\core\channel\child_channel.h">
<Filter>src\core\channel</Filter>
</ClInclude>
<ClInclude Include="..\..\src\core\channel\client_channel.h">
<Filter>src\core\channel</Filter>
</ClInclude>
<ClInclude Include="..\..\src\core\channel\client_setup.h">
<Filter>src\core\channel</Filter>
</ClInclude>
<ClInclude Include="..\..\src\core\channel\connected_channel.h">
<Filter>src\core\channel</Filter>
</ClInclude>

@ -158,9 +158,7 @@
<ClInclude Include="..\..\src\core\channel\census_filter.h" />
<ClInclude Include="..\..\src\core\channel\channel_args.h" />
<ClInclude Include="..\..\src\core\channel\channel_stack.h" />
<ClInclude Include="..\..\src\core\channel\child_channel.h" />
<ClInclude Include="..\..\src\core\channel\client_channel.h" />
<ClInclude Include="..\..\src\core\channel\client_setup.h" />
<ClInclude Include="..\..\src\core\channel\connected_channel.h" />
<ClInclude Include="..\..\src\core\channel\context.h" />
<ClInclude Include="..\..\src\core\channel\http_client_filter.h" />
@ -258,12 +256,8 @@
</ClCompile>
<ClCompile Include="..\..\src\core\channel\channel_stack.c">
</ClCompile>
<ClCompile Include="..\..\src\core\channel\child_channel.c">
</ClCompile>
<ClCompile Include="..\..\src\core\channel\client_channel.c">
</ClCompile>
<ClCompile Include="..\..\src\core\channel\client_setup.c">
</ClCompile>
<ClCompile Include="..\..\src\core\channel\connected_channel.c">
</ClCompile>
<ClCompile Include="..\..\src\core\channel\http_client_filter.c">

@ -13,15 +13,9 @@
<ClCompile Include="..\..\src\core\channel\channel_stack.c">
<Filter>src\core\channel</Filter>
</ClCompile>
<ClCompile Include="..\..\src\core\channel\child_channel.c">
<Filter>src\core\channel</Filter>
</ClCompile>
<ClCompile Include="..\..\src\core\channel\client_channel.c">
<Filter>src\core\channel</Filter>
</ClCompile>
<ClCompile Include="..\..\src\core\channel\client_setup.c">
<Filter>src\core\channel</Filter>
</ClCompile>
<ClCompile Include="..\..\src\core\channel\connected_channel.c">
<Filter>src\core\channel</Filter>
</ClCompile>
@ -353,15 +347,9 @@
<ClInclude Include="..\..\src\core\channel\channel_stack.h">
<Filter>src\core\channel</Filter>
</ClInclude>
<ClInclude Include="..\..\src\core\channel\child_channel.h">
<Filter>src\core\channel</Filter>
</ClInclude>
<ClInclude Include="..\..\src\core\channel\client_channel.h">
<Filter>src\core\channel</Filter>
</ClInclude>
<ClInclude Include="..\..\src\core\channel\client_setup.h">
<Filter>src\core\channel</Filter>
</ClInclude>
<ClInclude Include="..\..\src\core\channel\connected_channel.h">
<Filter>src\core\channel</Filter>
</ClInclude>

Loading…
Cancel
Save