Add an implementation firewall against pollset_set

So multiple implementations can exist in one binary
pull/5386/head
Craig Tiller 9 years ago
parent ee1f1f347d
commit c46beaaa29
  1. 45
      src/core/channel/client_channel.c
  2. 31
      src/core/client_config/lb_policies/pick_first.c
  3. 25
      src/core/client_config/lb_policies/round_robin.c
  4. 4
      src/core/client_config/lb_policy.c
  5. 3
      src/core/client_config/lb_policy.h
  6. 30
      src/core/client_config/subchannel.c
  7. 22
      src/core/httpcli/httpcli.c
  8. 3
      src/core/httpcli/httpcli.h
  9. 10
      src/core/iomgr/pollset_set.h
  10. 23
      src/core/iomgr/pollset_set_posix.c
  11. 17
      src/core/iomgr/pollset_set_posix.h
  12. 4
      src/core/iomgr/pollset_set_windows.c
  13. 2
      src/core/iomgr/pollset_set_windows.h
  14. 12
      src/core/iomgr/tcp_client_posix.c
  15. 5
      src/core/iomgr/tcp_posix.c

@ -78,8 +78,8 @@ typedef struct client_channel_channel_data {
int exit_idle_when_lb_policy_arrives;
/** owning stack */
grpc_channel_stack *owning_stack;
/** interested parties */
grpc_pollset_set interested_parties;
/** interested parties (owned) */
grpc_pollset_set *interested_parties;
} channel_data;
/** We create one watcher for each new lb_policy that is returned from a
@ -183,8 +183,8 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg,
chand->incoming_configuration = NULL;
if (lb_policy != NULL) {
grpc_pollset_set_add_pollset_set(exec_ctx, &lb_policy->interested_parties,
&chand->interested_parties);
grpc_pollset_set_add_pollset_set(exec_ctx, lb_policy->interested_parties,
chand->interested_parties);
}
gpr_mu_lock(&chand->mu_config);
@ -231,9 +231,8 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg,
}
if (old_lb_policy != NULL) {
grpc_pollset_set_del_pollset_set(exec_ctx,
&old_lb_policy->interested_parties,
&chand->interested_parties);
grpc_pollset_set_del_pollset_set(
exec_ctx, old_lb_policy->interested_parties, chand->interested_parties);
GRPC_LB_POLICY_UNREF(exec_ctx, old_lb_policy, "channel");
}
@ -254,7 +253,7 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(op->set_accept_stream == NULL);
if (op->bind_pollset != NULL) {
grpc_pollset_set_add_pollset(exec_ctx, &chand->interested_parties,
grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties,
op->bind_pollset);
}
@ -284,8 +283,8 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
chand->resolver = NULL;
if (chand->lb_policy != NULL) {
grpc_pollset_set_del_pollset_set(exec_ctx,
&chand->lb_policy->interested_parties,
&chand->interested_parties);
chand->lb_policy->interested_parties,
chand->interested_parties);
GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel");
chand->lb_policy = NULL;
}
@ -411,7 +410,7 @@ static void init_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE,
"client_channel");
grpc_pollset_set_init(&chand->interested_parties);
chand->interested_parties = grpc_pollset_set_create();
}
/* Destructor for channel_data */
@ -425,12 +424,12 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
}
if (chand->lb_policy != NULL) {
grpc_pollset_set_del_pollset_set(exec_ctx,
&chand->lb_policy->interested_parties,
&chand->interested_parties);
chand->lb_policy->interested_parties,
chand->interested_parties);
GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel");
}
grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker);
grpc_pollset_set_destroy(&chand->interested_parties);
grpc_pollset_set_destroy(chand->interested_parties);
gpr_mu_destroy(&chand->mu_config);
}
@ -441,9 +440,17 @@ static void cc_set_pollset(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
}
const grpc_channel_filter grpc_client_channel_filter = {
cc_start_transport_stream_op, cc_start_transport_op, sizeof(call_data),
init_call_elem, cc_set_pollset, destroy_call_elem, sizeof(channel_data),
init_channel_elem, destroy_channel_elem, cc_get_peer, "client-channel",
cc_start_transport_stream_op,
cc_start_transport_op,
sizeof(call_data),
init_call_elem,
cc_set_pollset,
destroy_call_elem,
sizeof(channel_data),
init_channel_elem,
destroy_channel_elem,
cc_get_peer,
"client-channel",
};
void grpc_client_channel_set_resolver(grpc_exec_ctx *exec_ctx,
@ -501,7 +508,7 @@ static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg,
bool iomgr_success) {
external_connectivity_watcher *w = arg;
grpc_closure *follow_up = w->on_complete;
grpc_pollset_set_del_pollset(exec_ctx, &w->chand->interested_parties,
grpc_pollset_set_del_pollset(exec_ctx, w->chand->interested_parties,
w->pollset);
GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack,
"external_connectivity_watcher");
@ -517,7 +524,7 @@ void grpc_client_channel_watch_connectivity_state(
w->chand = chand;
w->pollset = pollset;
w->on_complete = on_complete;
grpc_pollset_set_add_pollset(exec_ctx, &chand->interested_parties, pollset);
grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties, pollset);
grpc_closure_init(&w->my_closure, on_external_watch_complete, w);
GRPC_CHANNEL_STACK_REF(w->chand->owning_stack,
"external_connectivity_watcher");

@ -31,8 +31,8 @@
*
*/
#include "src/core/client_config/lb_policy_factory.h"
#include "src/core/client_config/lb_policies/pick_first.h"
#include "src/core/client_config/lb_policy_factory.h"
#include <string.h>
@ -119,7 +119,7 @@ void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
while (pp != NULL) {
pending_pick *next = pp->next;
*pp->target = NULL;
grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties,
grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties,
pp->pollset);
grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL);
gpr_free(pp);
@ -137,7 +137,7 @@ static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
while (pp != NULL) {
pending_pick *next = pp->next;
if (pp->target == target) {
grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties,
grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties,
pp->pollset);
*target = NULL;
grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, false, NULL);
@ -158,7 +158,7 @@ static void start_picking(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p) {
GRPC_LB_POLICY_WEAK_REF(&p->base, "pick_first_connectivity");
grpc_subchannel_notify_on_state_change(
exec_ctx, p->subchannels[p->checking_subchannel],
&p->base.interested_parties, &p->checking_connectivity,
p->base.interested_parties, &p->checking_connectivity,
&p->connectivity_changed);
}
@ -195,8 +195,7 @@ int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset,
if (!p->started_picking) {
start_picking(exec_ctx, p);
}
grpc_pollset_set_add_pollset(exec_ctx, &p->base.interested_parties,
pollset);
grpc_pollset_set_add_pollset(exec_ctx, p->base.interested_parties, pollset);
pp = gpr_malloc(sizeof(*pp));
pp->next = p->pending_picks;
pp->pollset = pollset;
@ -253,7 +252,7 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
p->checking_connectivity, "selected_changed");
if (p->checking_connectivity != GRPC_CHANNEL_FATAL_FAILURE) {
grpc_connected_subchannel_notify_on_state_change(
exec_ctx, selected, &p->base.interested_parties,
exec_ctx, selected, p->base.interested_parties,
&p->checking_connectivity, &p->connectivity_changed);
} else {
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pick_first_connectivity");
@ -278,13 +277,13 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = selected;
grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties,
grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties,
pp->pollset);
grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL);
gpr_free(pp);
}
grpc_connected_subchannel_notify_on_state_change(
exec_ctx, selected, &p->base.interested_parties,
exec_ctx, selected, p->base.interested_parties,
&p->checking_connectivity, &p->connectivity_changed);
break;
case GRPC_CHANNEL_TRANSIENT_FAILURE:
@ -298,7 +297,7 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) {
grpc_subchannel_notify_on_state_change(
exec_ctx, p->subchannels[p->checking_subchannel],
&p->base.interested_parties, &p->checking_connectivity,
p->base.interested_parties, &p->checking_connectivity,
&p->connectivity_changed);
} else {
goto loop;
@ -311,7 +310,7 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
"connecting_changed");
grpc_subchannel_notify_on_state_change(
exec_ctx, p->subchannels[p->checking_subchannel],
&p->base.interested_parties, &p->checking_connectivity,
p->base.interested_parties, &p->checking_connectivity,
&p->connectivity_changed);
break;
case GRPC_CHANNEL_FATAL_FAILURE:
@ -379,8 +378,14 @@ void pf_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
}
static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = {
pf_destroy, pf_shutdown, pf_pick, pf_cancel_pick, pf_ping_one, pf_exit_idle,
pf_check_connectivity, pf_notify_on_state_change};
pf_destroy,
pf_shutdown,
pf_pick,
pf_cancel_pick,
pf_ping_one,
pf_exit_idle,
pf_check_connectivity,
pf_notify_on_state_change};
static void pick_first_factory_ref(grpc_lb_policy_factory *factory) {}

@ -260,7 +260,7 @@ static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
while (pp != NULL) {
pending_pick *next = pp->next;
if (pp->target == target) {
grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties,
grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties,
pp->pollset);
*target = NULL;
grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, false, NULL);
@ -285,7 +285,7 @@ static void start_picking(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) {
subchannel_data *sd = p->subchannels[i];
sd->connectivity_state = GRPC_CHANNEL_IDLE;
grpc_subchannel_notify_on_state_change(
exec_ctx, sd->subchannel, &p->base.interested_parties,
exec_ctx, sd->subchannel, p->base.interested_parties,
&sd->connectivity_state, &sd->connectivity_changed_closure);
GRPC_LB_POLICY_WEAK_REF(&p->base, "round_robin_connectivity");
}
@ -322,8 +322,7 @@ int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset,
if (!p->started_picking) {
start_picking(exec_ctx, p);
}
grpc_pollset_set_add_pollset(exec_ctx, &p->base.interested_parties,
pollset);
grpc_pollset_set_add_pollset(exec_ctx, p->base.interested_parties, pollset);
pp = gpr_malloc(sizeof(*pp));
pp->next = p->pending_picks;
pp->pollset = pollset;
@ -374,13 +373,13 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
"[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)",
selected->subchannel, selected);
}
grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties,
grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties,
pp->pollset);
grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL);
gpr_free(pp);
}
grpc_subchannel_notify_on_state_change(
exec_ctx, sd->subchannel, &p->base.interested_parties,
exec_ctx, sd->subchannel, p->base.interested_parties,
&sd->connectivity_state, &sd->connectivity_changed_closure);
break;
case GRPC_CHANNEL_CONNECTING:
@ -389,13 +388,13 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
sd->connectivity_state,
"connecting_changed");
grpc_subchannel_notify_on_state_change(
exec_ctx, sd->subchannel, &p->base.interested_parties,
exec_ctx, sd->subchannel, p->base.interested_parties,
&sd->connectivity_state, &sd->connectivity_changed_closure);
break;
case GRPC_CHANNEL_TRANSIENT_FAILURE:
/* renew state notification */
grpc_subchannel_notify_on_state_change(
exec_ctx, sd->subchannel, &p->base.interested_parties,
exec_ctx, sd->subchannel, p->base.interested_parties,
&sd->connectivity_state, &sd->connectivity_changed_closure);
/* remove from ready list if still present */
@ -484,8 +483,14 @@ static void rr_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
}
static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = {
rr_destroy, rr_shutdown, rr_pick, rr_cancel_pick, rr_ping_one, rr_exit_idle,
rr_check_connectivity, rr_notify_on_state_change};
rr_destroy,
rr_shutdown,
rr_pick,
rr_cancel_pick,
rr_ping_one,
rr_exit_idle,
rr_check_connectivity,
rr_notify_on_state_change};
static void round_robin_factory_ref(grpc_lb_policy_factory *factory) {}

@ -39,7 +39,7 @@ void grpc_lb_policy_init(grpc_lb_policy *policy,
const grpc_lb_policy_vtable *vtable) {
policy->vtable = vtable;
gpr_atm_no_barrier_store(&policy->ref_pair, 1 << WEAK_REF_BITS);
grpc_pollset_set_init(&policy->interested_parties);
policy->interested_parties = grpc_pollset_set_create();
}
#ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG
@ -93,7 +93,7 @@ void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx,
gpr_atm old_val =
ref_mutate(policy, -(gpr_atm)1, 1 REF_MUTATE_PASS_ARGS("WEAK_UNREF"));
if (old_val == 1) {
grpc_pollset_set_destroy(&policy->interested_parties);
grpc_pollset_set_destroy(policy->interested_parties);
policy->vtable->destroy(exec_ctx, policy);
}
}

@ -48,7 +48,8 @@ typedef void (*grpc_lb_completion)(void *cb_arg, grpc_subchannel *subchannel,
struct grpc_lb_policy {
const grpc_lb_policy_vtable *vtable;
gpr_atm ref_pair;
grpc_pollset_set interested_parties;
/* owned pointer to interested parties in load balancing decisions */
grpc_pollset_set *interested_parties;
};
struct grpc_lb_policy_vtable {

@ -108,7 +108,7 @@ struct grpc_subchannel {
/** pollset_set tracking who's interested in a connection
being setup */
grpc_pollset_set pollset_set;
grpc_pollset_set *pollset_set;
/** active connection, or null; of type grpc_connected_subchannel */
gpr_atm connected_subchannel;
@ -184,8 +184,8 @@ static void connection_destroy(grpc_exec_ctx *exec_ctx, void *arg,
gpr_free(c);
}
void grpc_connected_subchannel_ref(grpc_connected_subchannel *c
GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
void grpc_connected_subchannel_ref(
grpc_connected_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
GRPC_CHANNEL_STACK_REF(CHANNEL_STACK_FROM_CONNECTION(c), REF_REASON);
}
@ -209,7 +209,7 @@ static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg,
gpr_slice_unref(c->initial_connect_string);
grpc_connectivity_state_destroy(exec_ctx, &c->state_tracker);
grpc_connector_unref(exec_ctx, c->connector);
grpc_pollset_set_destroy(&c->pollset_set);
grpc_pollset_set_destroy(c->pollset_set);
grpc_subchannel_key_destroy(exec_ctx, c->key);
gpr_free(c);
}
@ -226,8 +226,8 @@ static gpr_atm ref_mutate(grpc_subchannel *c, gpr_atm delta,
return old_val;
}
grpc_subchannel *grpc_subchannel_ref(grpc_subchannel *c
GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
grpc_subchannel *grpc_subchannel_ref(
grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
gpr_atm old_refs;
old_refs = ref_mutate(c, (1 << INTERNAL_REF_BITS),
0 REF_MUTATE_PURPOSE("STRONG_REF"));
@ -235,8 +235,8 @@ grpc_subchannel *grpc_subchannel_ref(grpc_subchannel *c
return c;
}
grpc_subchannel *grpc_subchannel_weak_ref(grpc_subchannel *c
GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
grpc_subchannel *grpc_subchannel_weak_ref(
grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
gpr_atm old_refs;
old_refs = ref_mutate(c, 1, 0 REF_MUTATE_PURPOSE("WEAK_REF"));
GPR_ASSERT(old_refs != 0);
@ -326,7 +326,7 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx,
}
c->addr = gpr_malloc(args->addr_len);
memcpy(c->addr, args->addr, args->addr_len);
grpc_pollset_set_init(&c->pollset_set);
c->pollset_set = grpc_pollset_set_create();
c->addr_len = args->addr_len;
grpc_set_initial_connect_string(&c->addr, &c->addr_len,
&c->initial_connect_string);
@ -345,7 +345,7 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx,
static void continue_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
grpc_connect_in_args args;
args.interested_parties = &c->pollset_set;
args.interested_parties = c->pollset_set;
args.addr = c->addr;
args.addr_len = c->addr_len;
args.deadline = compute_connect_deadline(c);
@ -379,7 +379,7 @@ static void on_external_state_watcher_done(grpc_exec_ctx *exec_ctx, void *arg,
external_state_watcher *w = arg;
grpc_closure *follow_up = w->notify;
if (w->pollset_set != NULL) {
grpc_pollset_set_del_pollset_set(exec_ctx, &w->subchannel->pollset_set,
grpc_pollset_set_del_pollset_set(exec_ctx, w->subchannel->pollset_set,
w->pollset_set);
}
gpr_mu_lock(&w->subchannel->mu);
@ -415,7 +415,7 @@ void grpc_subchannel_notify_on_state_change(
w->notify = notify;
grpc_closure_init(&w->closure, on_external_state_watcher_done, w);
if (interested_parties != NULL) {
grpc_pollset_set_add_pollset_set(exec_ctx, &c->pollset_set,
grpc_pollset_set_add_pollset_set(exec_ctx, c->pollset_set,
interested_parties);
}
GRPC_SUBCHANNEL_WEAK_REF(c, "external_state_watcher");
@ -573,7 +573,7 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher");
GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting");
grpc_connected_subchannel_notify_on_state_change(
exec_ctx, con, &c->pollset_set, &sw_subchannel->connectivity_state,
exec_ctx, con, c->pollset_set, &sw_subchannel->connectivity_state,
&sw_subchannel->closure);
/* signal completion */
@ -690,8 +690,8 @@ static void subchannel_call_destroy(grpc_exec_ctx *exec_ctx, void *call,
GPR_TIMER_END("grpc_subchannel_call_unref.destroy", 0);
}
void grpc_subchannel_call_ref(grpc_subchannel_call *c
GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
void grpc_subchannel_call_ref(
grpc_subchannel_call *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
GRPC_CALL_STACK_REF(SUBCHANNEL_CALL_TO_CALL_STACK(c), REF_REASON);
}

@ -31,20 +31,20 @@
*
*/
#include "src/core/iomgr/sockaddr.h"
#include "src/core/httpcli/httpcli.h"
#include "src/core/iomgr/sockaddr.h"
#include <string.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include "src/core/httpcli/format_request.h"
#include "src/core/httpcli/parser.h"
#include "src/core/iomgr/endpoint.h"
#include "src/core/iomgr/resolve_address.h"
#include "src/core/iomgr/tcp_client.h"
#include "src/core/httpcli/format_request.h"
#include "src/core/httpcli/parser.h"
#include "src/core/support/string.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
typedef struct {
gpr_slice request_text;
@ -84,18 +84,18 @@ const grpc_httpcli_handshaker grpc_httpcli_plaintext = {"http",
plaintext_handshake};
void grpc_httpcli_context_init(grpc_httpcli_context *context) {
grpc_pollset_set_init(&context->pollset_set);
context->pollset_set = grpc_pollset_set_create();
}
void grpc_httpcli_context_destroy(grpc_httpcli_context *context) {
grpc_pollset_set_destroy(&context->pollset_set);
grpc_pollset_set_destroy(context->pollset_set);
}
static void next_address(grpc_exec_ctx *exec_ctx, internal_request *req);
static void finish(grpc_exec_ctx *exec_ctx, internal_request *req,
int success) {
grpc_pollset_set_del_pollset(exec_ctx, &req->context->pollset_set,
grpc_pollset_set_del_pollset(exec_ctx, req->context->pollset_set,
req->pollset);
req->on_response(exec_ctx, req->user_data, success ? &req->parser.r : NULL);
grpc_httpcli_parser_destroy(&req->parser);
@ -197,7 +197,7 @@ static void next_address(grpc_exec_ctx *exec_ctx, internal_request *req) {
addr = &req->addresses->addrs[req->next_address++];
grpc_closure_init(&req->connected, on_connected, req);
grpc_tcp_client_connect(
exec_ctx, &req->connected, &req->ep, &req->context->pollset_set,
exec_ctx, &req->connected, &req->ep, req->context->pollset_set,
(struct sockaddr *)&addr->addr, addr->len, req->deadline);
}
@ -237,7 +237,7 @@ static void internal_request_begin(
req->host = gpr_strdup(request->host);
req->ssl_host_override = gpr_strdup(request->ssl_host_override);
grpc_pollset_set_add_pollset(exec_ctx, &req->context->pollset_set,
grpc_pollset_set_add_pollset(exec_ctx, req->context->pollset_set,
req->pollset);
grpc_resolve_address(request->host, req->handshaker->default_port,
on_resolved, req);

@ -39,6 +39,7 @@
#include <grpc/support/time.h>
#include "src/core/iomgr/endpoint.h"
#include "src/core/iomgr/iomgr_internal.h"
#include "src/core/iomgr/pollset_set.h"
/* User agent this library reports */
@ -56,7 +57,7 @@ typedef struct grpc_httpcli_header {
TODO(ctiller): allow caching and capturing multiple requests for the
same content and combining them */
typedef struct grpc_httpcli_context {
grpc_pollset_set pollset_set;
grpc_pollset_set *pollset_set;
} grpc_httpcli_context;
typedef struct {

@ -41,15 +41,9 @@
fd's (etc) that have been registered with the set_set to that pollset.
Registering fd's automatically adds them to all current pollsets. */
#ifdef GPR_POSIX_SOCKET
#include "src/core/iomgr/pollset_set_posix.h"
#endif
typedef struct grpc_pollset_set grpc_pollset_set;
#ifdef GPR_WIN32
#include "src/core/iomgr/pollset_set_windows.h"
#endif
void grpc_pollset_set_init(grpc_pollset_set *pollset_set);
grpc_pollset_set *grpc_pollset_set_create(void);
void grpc_pollset_set_destroy(grpc_pollset_set *pollset_set);
void grpc_pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
grpc_pollset_set *pollset_set,

@ -42,11 +42,29 @@
#include <grpc/support/useful.h>
#include "src/core/iomgr/pollset_posix.h"
#include "src/core/iomgr/pollset_set.h"
#include "src/core/iomgr/pollset_set_posix.h"
void grpc_pollset_set_init(grpc_pollset_set *pollset_set) {
struct grpc_pollset_set {
gpr_mu mu;
size_t pollset_count;
size_t pollset_capacity;
grpc_pollset **pollsets;
size_t pollset_set_count;
size_t pollset_set_capacity;
struct grpc_pollset_set **pollset_sets;
size_t fd_count;
size_t fd_capacity;
grpc_fd **fds;
};
grpc_pollset_set *grpc_pollset_set_create(void) {
grpc_pollset_set *pollset_set = gpr_malloc(sizeof(*pollset_set));
memset(pollset_set, 0, sizeof(*pollset_set));
gpr_mu_init(&pollset_set->mu);
return pollset_set;
}
void grpc_pollset_set_destroy(grpc_pollset_set *pollset_set) {
@ -58,6 +76,7 @@ void grpc_pollset_set_destroy(grpc_pollset_set *pollset_set) {
gpr_free(pollset_set->pollsets);
gpr_free(pollset_set->pollset_sets);
gpr_free(pollset_set->fds);
gpr_free(pollset_set);
}
void grpc_pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,

@ -35,22 +35,7 @@
#define GRPC_INTERNAL_CORE_IOMGR_POLLSET_SET_POSIX_H
#include "src/core/iomgr/fd_posix.h"
typedef struct grpc_pollset_set {
gpr_mu mu;
size_t pollset_count;
size_t pollset_capacity;
grpc_pollset **pollsets;
size_t pollset_set_count;
size_t pollset_set_capacity;
struct grpc_pollset_set **pollset_sets;
size_t fd_count;
size_t fd_capacity;
grpc_fd **fds;
} grpc_pollset_set;
#include "src/core/iomgr/pollset_set.h"
void grpc_pollset_set_add_fd(grpc_exec_ctx *exec_ctx,
grpc_pollset_set *pollset_set, grpc_fd *fd);

@ -35,9 +35,9 @@
#ifdef GPR_WINSOCK_SOCKET
#include "src/core/iomgr/pollset_set.h"
#include "src/core/iomgr/pollset_set_windows.h"
void grpc_pollset_set_init(grpc_pollset_set* pollset_set) {}
grpc_pollset_set* grpc_pollset_set_create(pollset_set) { return NULL; }
void grpc_pollset_set_destroy(grpc_pollset_set* pollset_set) {}

@ -34,6 +34,6 @@
#ifndef GRPC_INTERNAL_CORE_IOMGR_POLLSET_SET_WINDOWS_H
#define GRPC_INTERNAL_CORE_IOMGR_POLLSET_SET_WINDOWS_H
typedef struct grpc_pollset_set { void *unused; } grpc_pollset_set;
#include "src/core/iomgr/pollset_set.h"
#endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_WINDOWS_H */

@ -42,17 +42,19 @@
#include <string.h>
#include <unistd.h>
#include "src/core/iomgr/timer.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/time.h>
#include "src/core/iomgr/iomgr_posix.h"
#include "src/core/iomgr/pollset_posix.h"
#include "src/core/iomgr/pollset_set_posix.h"
#include "src/core/iomgr/sockaddr_utils.h"
#include "src/core/iomgr/socket_utils_posix.h"
#include "src/core/iomgr/tcp_posix.h"
#include "src/core/iomgr/timer.h"
#include "src/core/support/string.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/time.h>
extern int grpc_tcp_trace;

@ -53,6 +53,7 @@
#include "src/core/debug/trace.h"
#include "src/core/iomgr/pollset_posix.h"
#include "src/core/iomgr/pollset_set_posix.h"
#include "src/core/profiling/timers.h"
#include "src/core/support/string.h"
@ -296,7 +297,7 @@ static flush_result tcp_flush(grpc_tcp *tcp) {
unwind_slice_idx = tcp->outgoing_slice_idx;
unwind_byte_idx = tcp->outgoing_byte_idx;
for (iov_size = 0; tcp->outgoing_slice_idx != tcp->outgoing_buffer->count &&
iov_size != MAX_WRITE_IOVEC;
iov_size != MAX_WRITE_IOVEC;
iov_size++) {
iov[iov_size].iov_base =
GPR_SLICE_START_PTR(
@ -445,7 +446,7 @@ static char *tcp_get_peer(grpc_endpoint *ep) {
}
static const grpc_endpoint_vtable vtable = {
tcp_read, tcp_write, tcp_add_to_pollset, tcp_add_to_pollset_set,
tcp_read, tcp_write, tcp_add_to_pollset, tcp_add_to_pollset_set,
tcp_shutdown, tcp_destroy, tcp_get_peer};
grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size,

Loading…
Cancel
Save