Starting to convert code to work queues

pull/3423/head
Craig Tiller 9 years ago
parent 73b6606629
commit 8664ca6463
  1. 3
      src/core/channel/client_channel.c
  2. 30
      src/core/iomgr/iomgr.c
  3. 9
      src/core/iomgr/iomgr.h
  4. 3
      src/core/iomgr/iomgr_internal.h
  5. 3
      src/core/iomgr/pollset_posix.c
  6. 13
      src/core/security/credentials.c
  7. 3
      src/core/surface/channel.h
  8. 4
      src/core/surface/secure_channel_create.c

@ -600,7 +600,8 @@ static void cc_start_transport_op(grpc_channel_element *elem,
}
if (on_consumed) {
grpc_iomgr_add_callback(on_consumed);
grpc_workqueue_push(grpc_channel_get_workqueue(chand->master), on_consumed,
1);
}
}

@ -46,39 +46,9 @@
static gpr_mu g_mu;
static gpr_cv g_rcv;
static grpc_iomgr_closure *g_cbs_head = NULL;
static grpc_iomgr_closure *g_cbs_tail = NULL;
static int g_shutdown;
static gpr_event g_background_callback_executor_done;
static grpc_iomgr_object g_root_object;
/* Execute followup callbacks continuously.
Other threads may check in and help during pollset_work() */
static void background_callback_executor(void *ignored) {
gpr_mu_lock(&g_mu);
while (!g_shutdown) {
gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
gpr_timespec short_deadline = gpr_time_add(
gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_millis(100, GPR_TIMESPAN));
if (g_cbs_head) {
grpc_iomgr_closure *closure = g_cbs_head;
g_cbs_head = closure->next;
if (!g_cbs_head) g_cbs_tail = NULL;
gpr_mu_unlock(&g_mu);
closure->cb(closure->cb_arg, closure->success);
gpr_mu_lock(&g_mu);
} else if (grpc_alarm_check(&g_mu, gpr_now(GPR_CLOCK_MONOTONIC),
&deadline)) {
} else {
gpr_mu_unlock(&g_mu);
gpr_sleep_until(gpr_time_min(short_deadline, deadline));
gpr_mu_lock(&g_mu);
}
}
gpr_mu_unlock(&g_mu);
gpr_event_set(&g_background_callback_executor_done, (void *)1);
}
void grpc_kick_poller(void) {
/* Empty. The background callback executor polls periodically. The activity
* the kicker is trying to draw the executor's attention to will be picked up

@ -68,13 +68,4 @@ void grpc_iomgr_init(void);
/** Signals the intention to shutdown the iomgr. */
void grpc_iomgr_shutdown(void);
/** Registers a closure to be invoked at some point in the future.
*
* Can be called from within a callback or from anywhere else */
void grpc_iomgr_add_callback(grpc_iomgr_closure *closure);
/** As per grpc_iomgr_add_callback, with the ability to set the success
argument. */
void grpc_iomgr_add_delayed_callback(grpc_iomgr_closure *iocb, int success);
#endif /* GRPC_INTERNAL_CORE_IOMGR_IOMGR_H */

@ -43,9 +43,6 @@ typedef struct grpc_iomgr_object {
struct grpc_iomgr_object *prev;
} grpc_iomgr_object;
int grpc_maybe_call_delayed_callbacks(gpr_mu *drop_mu, int success);
void grpc_iomgr_add_delayed_callback(grpc_iomgr_closure *iocb, int success);
void grpc_iomgr_register_object(grpc_iomgr_object *obj, const char *name);
void grpc_iomgr_unregister_object(grpc_iomgr_object *obj);

@ -178,9 +178,6 @@ void grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker,
worker->next = worker->prev = NULL;
/* TODO(ctiller): pool these */
grpc_wakeup_fd_init(&worker->wakeup_fd);
if (grpc_maybe_call_delayed_callbacks(&pollset->mu, 1)) {
goto done;
}
if (grpc_alarm_check(&pollset->mu, now, &deadline)) {
goto done;
}

@ -47,6 +47,7 @@
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/sync.h>
#include <grpc/support/thd.h>
#include <grpc/support/time.h>
/* -- Common. -- */
@ -54,7 +55,6 @@
struct grpc_credentials_metadata_request {
grpc_credentials *creds;
grpc_credentials_metadata_cb cb;
grpc_iomgr_closure *on_simulated_token_fetch_done_closure;
void *user_data;
};
@ -66,8 +66,6 @@ grpc_credentials_metadata_request_create(grpc_credentials *creds,
gpr_malloc(sizeof(grpc_credentials_metadata_request));
r->creds = grpc_credentials_ref(creds);
r->cb = cb;
r->on_simulated_token_fetch_done_closure =
gpr_malloc(sizeof(grpc_iomgr_closure));
r->user_data = user_data;
return r;
}
@ -75,7 +73,6 @@ grpc_credentials_metadata_request_create(grpc_credentials *creds,
static void grpc_credentials_metadata_request_destroy(
grpc_credentials_metadata_request *r) {
grpc_credentials_unref(r->creds);
gpr_free(r->on_simulated_token_fetch_done_closure);
gpr_free(r);
}
@ -746,11 +743,10 @@ static int md_only_test_has_request_metadata_only(
return 1;
}
void on_simulated_token_fetch_done(void *user_data, int success) {
void on_simulated_token_fetch_done(void *user_data) {
grpc_credentials_metadata_request *r =
(grpc_credentials_metadata_request *)user_data;
grpc_md_only_test_credentials *c = (grpc_md_only_test_credentials *)r->creds;
GPR_ASSERT(success);
r->cb(r->user_data, c->md_store->entries, c->md_store->num_entries,
GRPC_CREDENTIALS_OK);
grpc_credentials_metadata_request_destroy(r);
@ -764,11 +760,10 @@ static void md_only_test_get_request_metadata(grpc_credentials *creds,
grpc_md_only_test_credentials *c = (grpc_md_only_test_credentials *)creds;
if (c->is_async) {
gpr_thd_id thd_id;
grpc_credentials_metadata_request *cb_arg =
grpc_credentials_metadata_request_create(creds, cb, user_data);
grpc_iomgr_closure_init(cb_arg->on_simulated_token_fetch_done_closure,
on_simulated_token_fetch_done, cb_arg);
grpc_iomgr_add_callback(cb_arg->on_simulated_token_fetch_done_closure);
gpr_thd_new(&thd_id, on_simulated_token_fetch_done, cb_arg, NULL);
} else {
cb(user_data, c->md_store->entries, 1, GRPC_CREDENTIALS_OK);
}

@ -36,6 +36,7 @@
#include "src/core/channel/channel_stack.h"
#include "src/core/client_config/subchannel_factory.h"
#include "src/core/iomgr/workqueue.h"
grpc_channel *grpc_channel_create_from_filters(
const char *target, const grpc_channel_filter **filters, size_t count,
@ -61,6 +62,8 @@ grpc_mdstr *grpc_channel_get_encodings_accepted_by_peer_string(
grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel);
gpr_uint32 grpc_channel_get_max_message_length(grpc_channel *channel);
grpc_workqueue *grpc_channel_get_workqueue(grpc_channel *channel);
#ifdef GRPC_CHANNEL_REF_COUNT_DEBUG
void grpc_channel_internal_ref(grpc_channel *channel, const char *reason);
void grpc_channel_internal_unref(grpc_channel *channel, const char *reason);

@ -95,7 +95,7 @@ static void on_secure_transport_setup_done(void *arg,
}
notify = c->notify;
c->notify = NULL;
grpc_iomgr_add_callback(notify);
notify->cb(notify->cb_arg, 1);
}
static void connected(void *arg, grpc_endpoint *tcp) {
@ -108,7 +108,7 @@ static void connected(void *arg, grpc_endpoint *tcp) {
memset(c->result, 0, sizeof(*c->result));
notify = c->notify;
c->notify = NULL;
grpc_iomgr_add_callback(notify);
notify->cb(notify->cb_arg, 1);
}
}

Loading…
Cancel
Save