Progress - need to add http filters

pull/2303/head
Craig Tiller 10 years ago
parent ff54c92adc
commit 04c5d4b8fd
  1. 11
      src/core/client_config/connector.c
  2. 38
      src/core/client_config/connector.h
  3. 41
      src/core/client_config/subchannel.c
  4. 24
      src/core/surface/channel_create.c
  5. 2
      src/core/surface/server.c

@ -42,10 +42,9 @@ void grpc_connector_unref(grpc_connector *connector) {
}
void grpc_connector_connect(
grpc_connector *connector, grpc_pollset_set *pollset_set,
const struct sockaddr *addr, int addr_len, gpr_timespec deadline,
const grpc_channel_args *channel_args, grpc_mdctx *metadata_context,
grpc_transport **transport, grpc_iomgr_closure *notify) {
connector->vtable->connect(connector, pollset_set, addr, addr_len, deadline,
channel_args, metadata_context, transport, notify);
grpc_connector *connector,
const grpc_connect_in_args *in_args,
grpc_connect_out_args *out_args,
grpc_iomgr_closure *notify) {
connector->vtable->connect(connector, in_args, out_args, notify);
}

@ -34,6 +34,7 @@
#ifndef GRPC_INTERNAL_CORE_CLIENT_CONFIG_CONNECTOR_H
#define GRPC_INTERNAL_CORE_CLIENT_CONFIG_CONNECTOR_H
#include "src/core/channel/channel_stack.h"
#include "src/core/iomgr/sockaddr.h"
#include "src/core/transport/transport.h"
@ -44,22 +45,43 @@ struct grpc_connector {
const grpc_connector_vtable *vtable;
};
typedef struct {
/** set of pollsets interested in this connection */
grpc_pollset_set *interested_parties;
/** address to connect to */
const struct sockaddr *addr;
int addr_len;
/** deadline for connection */
gpr_timespec deadline;
/** channel arguments (to be passed to transport) */
const grpc_channel_args *channel_args;
/** metadata context */
grpc_mdctx *metadata_context;
} grpc_connect_in_args;
typedef struct {
/** the connected transport */
grpc_transport *transport;
/** any additional filters (owned by the caller of connect) */
const grpc_channel_filter **filters;
size_t num_filters;
} grpc_connect_out_args;
struct grpc_connector_vtable {
void (*ref)(grpc_connector *connector);
void (*unref)(grpc_connector *connector);
void (*connect)(grpc_connector *connector, grpc_pollset_set *pollset_set,
const struct sockaddr *addr, int addr_len,
gpr_timespec deadline, const grpc_channel_args *channel_args,
grpc_mdctx *metadata_context, grpc_transport **transport,
void (*connect)(grpc_connector *connector,
const grpc_connect_in_args *in_args,
grpc_connect_out_args *out_args,
grpc_iomgr_closure *notify);
};
void grpc_connector_ref(grpc_connector *connector);
void grpc_connector_unref(grpc_connector *connector);
void grpc_connector_connect(
grpc_connector *connector, grpc_pollset_set *pollset_set,
const struct sockaddr *addr, int addr_len, gpr_timespec deadline,
const grpc_channel_args *channel_args, grpc_mdctx *metadata_context,
grpc_transport **transport, grpc_iomgr_closure *notify);
grpc_connector *connector,
const grpc_connect_in_args *in_args,
grpc_connect_out_args *out_args,
grpc_iomgr_closure *notify);
#endif

@ -74,7 +74,7 @@ struct grpc_subchannel {
grpc_mdctx *mdctx;
/** set during connection */
grpc_transport *connecting_transport;
grpc_connect_out_args connecting_result;
/** callback for connection finishing */
grpc_iomgr_closure connected;
@ -101,7 +101,8 @@ struct grpc_subchannel_call {
gpr_refcount refs;
};
#define SUBCHANNEL_CALL_TO_CALL_STACK(call) (((grpc_call_stack *)(call)) + 1)
#define SUBCHANNEL_CALL_TO_CALL_STACK(call) ((grpc_call_stack *)((call) + 1))
#define CHANNEL_STACK_FROM_CONNECTION(con) ((grpc_channel_stack *)((con) + 1))
static grpc_subchannel_call *create_call(connection *con, grpc_transport_stream_op *initial_op);
static void connectivity_state_changed_locked(grpc_subchannel *c);
@ -160,6 +161,19 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector,
return c;
}
static void start_connect(grpc_subchannel *c) {
grpc_connect_in_args args;
args.interested_parties = &c->pollset_set;
args.addr = c->addr;
args.addr_len = c->addr_len;
args.deadline = compute_connect_deadline(c);
args.channel_args = c->args;
args.metadata_context = c->mdctx;
grpc_connector_connect(c->connector, &args, &c->connecting_result, &c->connected);
}
void grpc_subchannel_create_call(grpc_subchannel *c,
grpc_transport_stream_op *initial_op,
grpc_subchannel_call **target,
@ -187,9 +201,7 @@ void grpc_subchannel_create_call(grpc_subchannel *c,
grpc_subchannel_ref(c);
gpr_mu_unlock(&c->mu);
grpc_connector_connect(c->connector, &c->pollset_set, c->addr,
c->addr_len, compute_connect_deadline(c), c->args,
c->mdctx, &c->connecting_transport, &c->connected);
start_connect(c);
} else {
gpr_mu_unlock(&c->mu);
}
@ -232,9 +244,7 @@ void grpc_subchannel_notify_on_state_change(grpc_subchannel *c,
gpr_mu_unlock(&c->mu);
}
if (do_connect) {
grpc_connector_connect(c->connector, &c->pollset_set, c->addr, c->addr_len,
compute_connect_deadline(c), c->args, c->mdctx,
&c->connecting_transport, &c->connected);
start_connect(c);
}
}
@ -246,8 +256,8 @@ static void publish_transport(grpc_subchannel *c) {
gpr_ref_init(&con->refs, 1);
con->subchannel = c;
grpc_channel_stack_init(c->filters, c->filter_count, c->args, c->mdctx, stk);
grpc_connected_channel_bind_transport(stk, c->connecting_transport);
c->connecting_transport = NULL;
grpc_connected_channel_bind_transport(stk, c->connecting_result.transport);
memset(&c->connecting_result, 0, sizeof(c->connecting_result));
gpr_mu_lock(&c->mu);
GPR_ASSERT(c->active == NULL);
@ -262,7 +272,7 @@ static void publish_transport(grpc_subchannel *c) {
static void subchannel_connected(void *arg, int iomgr_success) {
grpc_subchannel *c = arg;
if (c->connecting_transport) {
if (c->connecting_result.transport) {
publish_transport(c);
} else {
grpc_subchannel_unref(c);
@ -330,6 +340,11 @@ void grpc_subchannel_call_process_op(grpc_subchannel_call *call,
}
grpc_subchannel_call *create_call(connection *con, grpc_transport_stream_op *initial_op) {
abort();
return NULL;
grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con);
grpc_subchannel_call *call = gpr_malloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size);
grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK(call);
call->connection = con;
gpr_ref_init(&call->refs, 1);
grpc_call_stack_init(chanstk, NULL, initial_op, callstk);
return call;
}

@ -49,10 +49,9 @@ typedef struct {
grpc_connector base;
gpr_refcount refs;
grpc_transport **transport;
grpc_iomgr_closure *notify;
const grpc_channel_args *args;
grpc_mdctx *mdctx;
grpc_connect_in_args args;
grpc_connect_out_args *result;
} connector;
static void connector_ref(grpc_connector *con) {
@ -71,10 +70,10 @@ static void connected(void *arg, grpc_endpoint *tcp) {
connector *c = arg;
grpc_iomgr_closure *notify;
if (tcp != NULL) {
*c->transport =
grpc_create_chttp2_transport(c->args, tcp, NULL, 0, c->mdctx, 1);
c->result->transport =
grpc_create_chttp2_transport(c->args.channel_args, tcp, NULL, 0, c->args.metadata_context, 1);
} else {
*c->transport = NULL;
c->result->transport = NULL;
}
notify = c->notify;
c->notify = NULL;
@ -82,18 +81,15 @@ static void connected(void *arg, grpc_endpoint *tcp) {
}
static void connector_connect(
grpc_connector *con, grpc_pollset_set *pollset_set,
const struct sockaddr *addr, int addr_len, gpr_timespec deadline,
const grpc_channel_args *channel_args, grpc_mdctx *metadata_context,
grpc_transport **transport, grpc_iomgr_closure *notify) {
grpc_connector *con, const grpc_connect_in_args *args,
grpc_connect_out_args *result, grpc_iomgr_closure *notify) {
connector *c = (connector *)con;
GPR_ASSERT(c->notify == NULL);
GPR_ASSERT(notify->cb);
c->notify = notify;
c->args = channel_args;
c->mdctx = metadata_context;
c->transport = transport;
grpc_tcp_client_connect(connected, c, pollset_set, addr, addr_len, deadline);
c->args = *args;
c->result = result;
grpc_tcp_client_connect(connected, c, args->interested_parties, args->addr, args->addr_len, args->deadline);
}
static const grpc_connector_vtable connector_vtable = {connector_ref, connector_unref, connector_connect};

@ -1229,6 +1229,8 @@ static void begin_call(grpc_server *server, call_data *calld,
calld->cq_new = rc->cq_for_notification;
switch (rc->type) {
case BATCH_CALL:
GPR_ASSERT(calld->host != NULL);
GPR_ASSERT(calld->path != NULL);
cpstr(&rc->data.batch.details->host,
&rc->data.batch.details->host_capacity, calld->host);
cpstr(&rc->data.batch.details->method,

Loading…
Cancel
Save