Converting code

pull/11888/head
Craig Tiller 8 years ago
parent 909a984a92
commit dc3998e710
  1. 3
      src/core/ext/filters/client_channel/channel_connectivity.c
  2. 3
      src/core/ext/filters/client_channel/subchannel.c
  3. 4
      src/core/ext/transport/chttp2/server/chttp2_server.c
  4. 112
      src/core/ext/transport/chttp2/transport/chttp2_transport.c
  5. 14
      src/core/ext/transport/chttp2/transport/frame_ping.c
  6. 12
      src/core/ext/transport/chttp2/transport/internal.h
  7. 27
      src/core/ext/transport/chttp2/transport/writing.c
  8. 6
      src/core/lib/channel/handshaker.c
  9. 2
      src/core/lib/channel/handshaker.h
  10. 13
      src/core/lib/http/httpcli.c
  11. 12
      src/core/lib/http/httpcli.h
  12. 2
      src/core/lib/http/httpcli_security_connector.c
  13. 37
      src/core/lib/iomgr/ev_poll_posix.c
  14. 6
      src/core/lib/iomgr/ev_posix.c
  15. 4
      src/core/lib/iomgr/ev_posix.h
  16. 35
      src/core/lib/iomgr/exec_ctx.c
  17. 23
      src/core/lib/iomgr/exec_ctx.h
  18. 8
      src/core/lib/iomgr/iomgr.c
  19. 4
      src/core/lib/iomgr/pollset.h
  20. 2
      src/core/lib/iomgr/tcp_client.h
  21. 10
      src/core/lib/iomgr/tcp_client_posix.c
  22. 8
      src/core/lib/iomgr/timer.h
  23. 114
      src/core/lib/iomgr/timer_generic.c
  24. 17
      src/core/lib/iomgr/timer_manager.c
  25. 7
      src/core/lib/security/credentials/google_default/google_default_credentials.c
  26. 12
      src/core/lib/security/credentials/jwt/jwt_verifier.c
  27. 2
      src/core/lib/security/credentials/jwt/jwt_verifier.h
  28. 30
      src/core/lib/security/credentials/oauth2/oauth2_credentials.c
  29. 6
      src/core/lib/security/credentials/oauth2/oauth2_credentials.h
  30. 4
      src/core/lib/surface/alarm.c
  31. 45
      src/core/lib/surface/completion_queue.c
  32. 10
      test/core/end2end/fixtures/http_proxy_fixture.c
  33. 14
      test/core/iomgr/endpoint_tests.c
  34. 3
      test/core/security/oauth2_utils.c
  35. 14
      test/core/util/port_server_client.c

@ -210,8 +210,7 @@ void grpc_channel_watch_connectivity_state(
w->channel = channel;
grpc_timer_init(&exec_ctx, &w->alarm,
gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC),
&w->on_timeout, gpr_now(GPR_CLOCK_MONOTONIC));
grpc_timespec_to_millis(&exec_ctx, deadline), &w->on_timeout);
if (client_channel_elem->filter == &grpc_client_channel_filter) {
GRPC_CHANNEL_INTERNAL_REF(channel, "watch_channel_connectivity");

@ -131,7 +131,7 @@ struct grpc_subchannel {
external_state_watcher root_external_state_watcher;
/** next connect attempt time */
gpr_timespec next_attempt;
grpc_millis next_attempt;
/** backoff state */
gpr_backoff backoff_state;
/** do we have an active alarm? */
@ -486,7 +486,6 @@ static void maybe_start_connecting_locked(grpc_exec_ctx *exec_ctx,
c->connecting = true;
GRPC_SUBCHANNEL_WEAK_REF(c, "connecting");
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
if (!c->backoff_begun) {
c->backoff_begun = true;
c->next_attempt = gpr_backoff_begin(&c->backoff_state, now);

@ -146,8 +146,8 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp,
connection_state->handshake_mgr);
// TODO(roth): We should really get this timeout value from channel
// args instead of hard-coding it.
const gpr_timespec deadline = gpr_time_add(
gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_seconds(120, GPR_TIMESPAN));
const grpc_millis deadline =
grpc_exec_ctx_now(exec_ctx) + 120 * GPR_MS_PER_SEC;
grpc_handshake_manager_do_handshake(exec_ctx, connection_state->handshake_mgr,
tcp, state->args, deadline, acceptor,
on_handshake_done, connection_state);

@ -370,36 +370,27 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
t->ping_policy = (grpc_chttp2_repeated_ping_policy){
.max_pings_without_data = DEFAULT_MAX_PINGS_BETWEEN_DATA,
.min_time_between_pings =
gpr_time_from_millis(DEFAULT_MIN_TIME_BETWEEN_PINGS_MS, GPR_TIMESPAN),
.min_time_between_pings = DEFAULT_MIN_TIME_BETWEEN_PINGS_MS,
.max_ping_strikes = DEFAULT_MAX_PING_STRIKES,
.min_ping_interval_without_data = gpr_time_from_millis(
DEFAULT_MIN_PING_INTERVAL_WITHOUT_DATA_MS, GPR_TIMESPAN),
.min_ping_interval_without_data =
DEFAULT_MIN_PING_INTERVAL_WITHOUT_DATA_MS,
};
/* Keepalive setting */
if (t->is_client) {
t->keepalive_time =
g_default_client_keepalive_time_ms == INT_MAX
? gpr_inf_future(GPR_TIMESPAN)
: gpr_time_from_millis(g_default_client_keepalive_time_ms,
GPR_TIMESPAN);
t->keepalive_timeout =
g_default_client_keepalive_timeout_ms == INT_MAX
? gpr_inf_future(GPR_TIMESPAN)
: gpr_time_from_millis(g_default_client_keepalive_timeout_ms,
GPR_TIMESPAN);
t->keepalive_time = g_default_client_keepalive_time_ms == INT_MAX
? GRPC_MILLIS_INF_FUTURE
: g_default_client_keepalive_time_ms;
t->keepalive_timeout = g_default_client_keepalive_timeout_ms == INT_MAX
? GRPC_MILLIS_INF_FUTURE
: g_default_client_keepalive_timeout_ms;
} else {
t->keepalive_time =
g_default_server_keepalive_time_ms == INT_MAX
? gpr_inf_future(GPR_TIMESPAN)
: gpr_time_from_millis(g_default_server_keepalive_time_ms,
GPR_TIMESPAN);
t->keepalive_timeout =
g_default_server_keepalive_timeout_ms == INT_MAX
? gpr_inf_future(GPR_TIMESPAN)
: gpr_time_from_millis(g_default_server_keepalive_timeout_ms,
GPR_TIMESPAN);
t->keepalive_time = g_default_server_keepalive_time_ms == INT_MAX
? GRPC_MILLIS_INF_FUTURE
: g_default_server_keepalive_time_ms;
t->keepalive_timeout = g_default_server_keepalive_timeout_ms == INT_MAX
? GRPC_MILLIS_INF_FUTURE
: g_default_server_keepalive_timeout_ms;
}
t->keepalive_permit_without_calls = g_default_keepalive_permit_without_calls;
@ -440,21 +431,18 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
(grpc_integer_options){DEFAULT_MAX_PING_STRIKES, 0, INT_MAX});
} else if (0 == strcmp(channel_args->args[i].key,
GRPC_ARG_HTTP2_MIN_TIME_BETWEEN_PINGS_MS)) {
t->ping_policy.min_time_between_pings = gpr_time_from_millis(
grpc_channel_arg_get_integer(
&channel_args->args[i],
(grpc_integer_options){DEFAULT_MIN_TIME_BETWEEN_PINGS_MS, 0,
INT_MAX}),
GPR_TIMESPAN);
t->ping_policy.min_time_between_pings = grpc_channel_arg_get_integer(
&channel_args->args[i],
(grpc_integer_options){DEFAULT_MIN_TIME_BETWEEN_PINGS_MS, 0,
INT_MAX});
} else if (0 ==
strcmp(channel_args->args[i].key,
GRPC_ARG_HTTP2_MIN_PING_INTERVAL_WITHOUT_DATA_MS)) {
t->ping_policy.min_ping_interval_without_data = gpr_time_from_millis(
t->ping_policy.min_ping_interval_without_data =
grpc_channel_arg_get_integer(
&channel_args->args[i],
(grpc_integer_options){
DEFAULT_MIN_PING_INTERVAL_WITHOUT_DATA_MS, 0, INT_MAX}),
GPR_TIMESPAN);
DEFAULT_MIN_PING_INTERVAL_WITHOUT_DATA_MS, 0, INT_MAX});
} else if (0 == strcmp(channel_args->args[i].key,
GRPC_ARG_HTTP2_WRITE_BUFFER_SIZE)) {
t->write_buffer_size = (uint32_t)grpc_channel_arg_get_integer(
@ -472,9 +460,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
? g_default_client_keepalive_time_ms
: g_default_server_keepalive_time_ms,
1, INT_MAX});
t->keepalive_time = value == INT_MAX
? gpr_inf_future(GPR_TIMESPAN)
: gpr_time_from_millis(value, GPR_TIMESPAN);
t->keepalive_time = value == INT_MAX ? GRPC_MILLIS_INF_FUTURE : value;
} else if (0 == strcmp(channel_args->args[i].key,
GRPC_ARG_KEEPALIVE_TIMEOUT_MS)) {
const int value = grpc_channel_arg_get_integer(
@ -483,9 +469,8 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
? g_default_client_keepalive_timeout_ms
: g_default_server_keepalive_timeout_ms,
0, INT_MAX});
t->keepalive_timeout = value == INT_MAX
? gpr_inf_future(GPR_TIMESPAN)
: gpr_time_from_millis(value, GPR_TIMESPAN);
t->keepalive_timeout =
value == INT_MAX ? GRPC_MILLIS_INF_FUTURE : value;
} else if (0 == strcmp(channel_args->args[i].key,
GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS)) {
t->keepalive_permit_without_calls =
@ -548,17 +533,16 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
t->ping_policy.max_pings_without_data;
t->ping_state.is_delayed_ping_timer_set = false;
t->ping_recv_state.last_ping_recv_time = gpr_inf_past(GPR_CLOCK_MONOTONIC);
t->ping_recv_state.last_ping_recv_time = 0;
t->ping_recv_state.ping_strikes = 0;
/* Start keepalive pings */
if (gpr_time_cmp(t->keepalive_time, gpr_inf_future(GPR_TIMESPAN)) != 0) {
if (t->keepalive_time != GRPC_MILLIS_INF_FUTURE) {
t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING;
GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
grpc_timer_init(
exec_ctx, &t->keepalive_ping_timer,
gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), t->keepalive_time),
&t->init_keepalive_ping_locked, gpr_now(GPR_CLOCK_MONOTONIC));
grpc_timer_init(exec_ctx, &t->keepalive_ping_timer,
grpc_exec_ctx_now(exec_ctx) + t->keepalive_time,
&t->init_keepalive_ping_locked);
} else {
/* Use GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED to indicate there are no
inflight keeaplive timers */
@ -1005,14 +989,12 @@ void grpc_chttp2_add_incoming_goaway(grpc_exec_ctx *exec_ctx,
gpr_log(GPR_ERROR,
"Received a GOAWAY with error code ENHANCE_YOUR_CALM and debug "
"data equal to \"too_many_pings\"");
double current_keepalive_time_ms =
gpr_timespec_to_micros(t->keepalive_time) / 1000;
double current_keepalive_time_ms = (double)t->keepalive_time;
t->keepalive_time =
current_keepalive_time_ms > INT_MAX / KEEPALIVE_TIME_BACKOFF_MULTIPLIER
? gpr_inf_future(GPR_TIMESPAN)
: gpr_time_from_millis((int64_t)(current_keepalive_time_ms *
KEEPALIVE_TIME_BACKOFF_MULTIPLIER),
GPR_TIMESPAN);
? GRPC_MILLIS_INF_FUTURE
: (grpc_millis)(current_keepalive_time_ms *
KEEPALIVE_TIME_BACKOFF_MULTIPLIER);
}
/* lie: use transient failure from the transport to indicate goaway has been
@ -2391,18 +2373,16 @@ static void init_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg,
&t->finish_keepalive_ping_locked);
} else {
GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
grpc_timer_init(
exec_ctx, &t->keepalive_ping_timer,
gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), t->keepalive_time),
&t->init_keepalive_ping_locked, gpr_now(GPR_CLOCK_MONOTONIC));
grpc_timer_init(exec_ctx, &t->keepalive_ping_timer,
grpc_exec_ctx_now(exec_ctx) + t->keepalive_time,
&t->init_keepalive_ping_locked);
}
} else if (error == GRPC_ERROR_CANCELLED) {
/* The keepalive ping timer may be cancelled by bdp */
GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
grpc_timer_init(
exec_ctx, &t->keepalive_ping_timer,
gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), t->keepalive_time),
&t->init_keepalive_ping_locked, gpr_now(GPR_CLOCK_MONOTONIC));
grpc_timer_init(exec_ctx, &t->keepalive_ping_timer,
grpc_exec_ctx_now(exec_ctx) + t->keepalive_time,
&t->init_keepalive_ping_locked);
}
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "init keepalive ping");
}
@ -2411,10 +2391,9 @@ static void start_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
grpc_chttp2_transport *t = arg;
GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive watchdog");
grpc_timer_init(
exec_ctx, &t->keepalive_watchdog_timer,
gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), t->keepalive_timeout),
&t->keepalive_watchdog_fired_locked, gpr_now(GPR_CLOCK_MONOTONIC));
grpc_timer_init(exec_ctx, &t->keepalive_watchdog_timer,
grpc_exec_ctx_now(exec_ctx) + t->keepalive_time,
&t->keepalive_watchdog_fired_locked);
}
static void finish_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg,
@ -2425,10 +2404,9 @@ static void finish_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg,
t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING;
grpc_timer_cancel(exec_ctx, &t->keepalive_watchdog_timer);
GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
grpc_timer_init(
exec_ctx, &t->keepalive_ping_timer,
gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), t->keepalive_time),
&t->init_keepalive_ping_locked, gpr_now(GPR_CLOCK_MONOTONIC));
grpc_timer_init(exec_ctx, &t->keepalive_ping_timer,
grpc_exec_ctx_now(exec_ctx) + t->keepalive_time,
&t->init_keepalive_ping_locked);
}
}
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "keepalive ping end");

@ -104,10 +104,9 @@ grpc_error *grpc_chttp2_ping_parser_parse(grpc_exec_ctx *exec_ctx, void *parser,
grpc_chttp2_ack_ping(exec_ctx, t, p->opaque_8bytes);
} else {
if (!t->is_client) {
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
gpr_timespec next_allowed_ping =
gpr_time_add(t->ping_recv_state.last_ping_recv_time,
t->ping_policy.min_ping_interval_without_data);
grpc_millis next_allowed_ping =
t->ping_recv_state.last_ping_recv_time +
t->ping_policy.min_ping_interval_without_data;
if (t->keepalive_permit_without_calls == 0 &&
grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
@ -115,15 +114,14 @@ grpc_error *grpc_chttp2_ping_parser_parse(grpc_exec_ctx *exec_ctx, void *parser,
no less than two hours. When there is no outstanding streams, we
restrict the number of PINGS equivalent to TCP Keep-Alive. */
next_allowed_ping =
gpr_time_add(t->ping_recv_state.last_ping_recv_time,
gpr_time_from_seconds(7200, GPR_TIMESPAN));
t->ping_recv_state.last_ping_recv_time + 7200 * GPR_MS_PER_SEC;
}
if (gpr_time_cmp(next_allowed_ping, now) > 0) {
if (next_allowed_ping > grpc_exec_ctx_now(exec_ctx)) {
grpc_chttp2_add_ping_strike(exec_ctx, t);
}
t->ping_recv_state.last_ping_recv_time = now;
t->ping_recv_state.last_ping_recv_time = grpc_exec_ctx_now(exec_ctx);
}
if (!g_disable_ping_ack) {
if (t->ping_ack_count == t->ping_ack_capacity) {

@ -95,21 +95,21 @@ typedef struct {
} grpc_chttp2_ping_queue;
typedef struct {
gpr_timespec min_time_between_pings;
grpc_millis min_time_between_pings;
int max_pings_without_data;
int max_ping_strikes;
gpr_timespec min_ping_interval_without_data;
grpc_millis min_ping_interval_without_data;
} grpc_chttp2_repeated_ping_policy;
typedef struct {
gpr_timespec last_ping_sent_time;
grpc_millis last_ping_sent_time;
int pings_before_data_required;
grpc_timer delayed_ping_timer;
bool is_delayed_ping_timer_set;
} grpc_chttp2_repeated_ping_state;
typedef struct {
gpr_timespec last_ping_recv_time;
grpc_millis last_ping_recv_time;
int ping_strikes;
} grpc_chttp2_server_ping_recv_state;
@ -414,9 +414,9 @@ struct grpc_chttp2_transport {
/** watchdog to kill the transport when waiting for the keepalive ping */
grpc_timer keepalive_watchdog_timer;
/** time duration in between pings */
gpr_timespec keepalive_time;
grpc_millis keepalive_time;
/** grace period for a ping to complete before watchdog kicks in */
gpr_timespec keepalive_timeout;
grpc_millis keepalive_timeout;
/** if keepalive pings are allowed when there's no outstanding streams */
bool keepalive_permit_without_calls;
/** keep-alive state machine state */

@ -91,12 +91,12 @@ static void maybe_initiate_ping(grpc_exec_ctx *exec_ctx,
}
return;
}
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
gpr_timespec elapsed = gpr_time_sub(now, t->ping_state.last_ping_sent_time);
grpc_millis elapsed =
grpc_exec_ctx_now(exec_ctx) - t->ping_state.last_ping_sent_time;
/*gpr_log(GPR_DEBUG, "elapsed:%d.%09d min:%d.%09d", (int)elapsed.tv_sec,
elapsed.tv_nsec, (int)t->ping_policy.min_time_between_pings.tv_sec,
(int)t->ping_policy.min_time_between_pings.tv_nsec);*/
if (gpr_time_cmp(elapsed, t->ping_policy.min_time_between_pings) < 0) {
if (elapsed < t->ping_policy.min_time_between_pings) {
/* not enough elapsed time between successive pings */
if (GRPC_TRACER_ON(grpc_http_trace) ||
GRPC_TRACER_ON(grpc_bdp_estimator_trace)) {
@ -107,10 +107,9 @@ static void maybe_initiate_ping(grpc_exec_ctx *exec_ctx,
if (!t->ping_state.is_delayed_ping_timer_set) {
t->ping_state.is_delayed_ping_timer_set = true;
grpc_timer_init(exec_ctx, &t->ping_state.delayed_ping_timer,
gpr_time_add(t->ping_state.last_ping_sent_time,
t->ping_policy.min_time_between_pings),
&t->retry_initiate_ping_locked,
gpr_now(GPR_CLOCK_MONOTONIC));
t->ping_state.last_ping_sent_time +
t->ping_policy.min_time_between_pings,
&t->retry_initiate_ping_locked);
}
return;
}
@ -131,7 +130,7 @@ static void maybe_initiate_ping(grpc_exec_ctx *exec_ctx,
&pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]);
grpc_slice_buffer_add(&t->outbuf,
grpc_chttp2_ping_create(false, pq->inflight_id));
t->ping_state.last_ping_sent_time = now;
t->ping_state.last_ping_sent_time = grpc_exec_ctx_now(exec_ctx);
t->ping_state.pings_before_data_required -=
(t->ping_state.pings_before_data_required != 0);
}
@ -255,8 +254,7 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
t->ping_state.pings_before_data_required =
t->ping_policy.max_pings_without_data;
if (!t->is_client) {
t->ping_recv_state.last_ping_recv_time =
gpr_inf_past(GPR_CLOCK_MONOTONIC);
t->ping_recv_state.last_ping_recv_time = 0;
t->ping_recv_state.ping_strikes = 0;
}
}
@ -269,8 +267,7 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
t->ping_state.pings_before_data_required =
t->ping_policy.max_pings_without_data;
if (!t->is_client) {
t->ping_recv_state.last_ping_recv_time =
gpr_inf_past(GPR_CLOCK_MONOTONIC);
t->ping_recv_state.last_ping_recv_time = 0;
t->ping_recv_state.ping_strikes = 0;
}
GRPC_CHTTP2_FLOW_DEBIT_STREAM("write", t, s, announce_window, announce);
@ -306,8 +303,7 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
t->ping_state.pings_before_data_required =
t->ping_policy.max_pings_without_data;
if (!t->is_client) {
t->ping_recv_state.last_ping_recv_time =
gpr_inf_past(GPR_CLOCK_MONOTONIC);
t->ping_recv_state.last_ping_recv_time = 0;
t->ping_recv_state.ping_strikes = 0;
}
if (is_last_frame) {
@ -396,8 +392,7 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
t->ping_state.pings_before_data_required =
t->ping_policy.max_pings_without_data;
if (!t->is_client) {
t->ping_recv_state.last_ping_recv_time =
gpr_inf_past(GPR_CLOCK_MONOTONIC);
t->ping_recv_state.last_ping_recv_time = 0;
t->ping_recv_state.ping_strikes = 0;
}
}

@ -246,7 +246,7 @@ static void on_timeout(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
void grpc_handshake_manager_do_handshake(
grpc_exec_ctx* exec_ctx, grpc_handshake_manager* mgr,
grpc_endpoint* endpoint, const grpc_channel_args* channel_args,
gpr_timespec deadline, grpc_tcp_server_acceptor* acceptor,
grpc_millis deadline, grpc_tcp_server_acceptor* acceptor,
grpc_iomgr_cb_func on_handshake_done, void* user_data) {
gpr_mu_lock(&mgr->mu);
GPR_ASSERT(mgr->index == 0);
@ -268,9 +268,7 @@ void grpc_handshake_manager_do_handshake(
gpr_ref(&mgr->refs);
grpc_closure_init(&mgr->on_timeout, on_timeout, mgr,
grpc_schedule_on_exec_ctx);
grpc_timer_init(exec_ctx, &mgr->deadline_timer,
gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC),
&mgr->on_timeout, gpr_now(GPR_CLOCK_MONOTONIC));
grpc_timer_init(exec_ctx, &mgr->deadline_timer, deadline, &mgr->on_timeout);
// Start first handshaker, which also owns a ref.
gpr_ref(&mgr->refs);
bool done = call_next_handshaker_locked(exec_ctx, mgr, GRPC_ERROR_NONE);

@ -160,7 +160,7 @@ void grpc_handshake_manager_shutdown(grpc_exec_ctx* exec_ctx,
void grpc_handshake_manager_do_handshake(
grpc_exec_ctx* exec_ctx, grpc_handshake_manager* mgr,
grpc_endpoint* endpoint, const grpc_channel_args* channel_args,
gpr_timespec deadline, grpc_tcp_server_acceptor* acceptor,
grpc_millis deadline, grpc_tcp_server_acceptor* acceptor,
grpc_iomgr_cb_func on_handshake_done, void* user_data);
/// Add \a mgr to the server side list of all pending handshake managers, the

@ -58,7 +58,7 @@ typedef struct {
grpc_endpoint *ep;
char *host;
char *ssl_host_override;
gpr_timespec deadline;
grpc_millis deadline;
int have_read_byte;
const grpc_httpcli_handshaker *handshaker;
grpc_closure *on_done;
@ -79,7 +79,7 @@ static grpc_httpcli_post_override g_post_override = NULL;
static void plaintext_handshake(grpc_exec_ctx *exec_ctx, void *arg,
grpc_endpoint *endpoint, const char *host,
gpr_timespec deadline,
grpc_millis deadline,
void (*on_done)(grpc_exec_ctx *exec_ctx,
void *arg,
grpc_endpoint *endpoint)) {
@ -256,7 +256,7 @@ static void internal_request_begin(grpc_exec_ctx *exec_ctx,
grpc_polling_entity *pollent,
grpc_resource_quota *resource_quota,
const grpc_httpcli_request *request,
gpr_timespec deadline, grpc_closure *on_done,
grpc_millis deadline, grpc_closure *on_done,
grpc_httpcli_response *response,
const char *name, grpc_slice request_text) {
internal_request *req = gpr_malloc(sizeof(internal_request));
@ -293,9 +293,8 @@ static void internal_request_begin(grpc_exec_ctx *exec_ctx,
void grpc_httpcli_get(grpc_exec_ctx *exec_ctx, grpc_httpcli_context *context,
grpc_polling_entity *pollent,
grpc_resource_quota *resource_quota,
const grpc_httpcli_request *request,
gpr_timespec deadline, grpc_closure *on_done,
grpc_httpcli_response *response) {
const grpc_httpcli_request *request, grpc_millis deadline,
grpc_closure *on_done, grpc_httpcli_response *response) {
char *name;
if (g_get_override &&
g_get_override(exec_ctx, request, deadline, on_done, response)) {
@ -313,7 +312,7 @@ void grpc_httpcli_post(grpc_exec_ctx *exec_ctx, grpc_httpcli_context *context,
grpc_resource_quota *resource_quota,
const grpc_httpcli_request *request,
const char *body_bytes, size_t body_size,
gpr_timespec deadline, grpc_closure *on_done,
grpc_millis deadline, grpc_closure *on_done,
grpc_httpcli_response *response) {
char *name;
if (g_post_override &&

@ -57,7 +57,7 @@ typedef struct grpc_httpcli_context {
typedef struct {
const char *default_port;
void (*handshake)(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *endpoint,
const char *host, gpr_timespec deadline,
const char *host, grpc_millis deadline,
void (*on_done)(grpc_exec_ctx *exec_ctx, void *arg,
grpc_endpoint *endpoint));
} grpc_httpcli_handshaker;
@ -98,8 +98,8 @@ void grpc_httpcli_context_destroy(grpc_exec_ctx *exec_ctx,
void grpc_httpcli_get(grpc_exec_ctx *exec_ctx, grpc_httpcli_context *context,
grpc_polling_entity *pollent,
grpc_resource_quota *resource_quota,
const grpc_httpcli_request *request,
gpr_timespec deadline, grpc_closure *on_complete,
const grpc_httpcli_request *request, grpc_millis deadline,
grpc_closure *on_complete,
grpc_httpcli_response *response);
/* Asynchronously perform a HTTP POST.
@ -121,18 +121,18 @@ void grpc_httpcli_post(grpc_exec_ctx *exec_ctx, grpc_httpcli_context *context,
grpc_resource_quota *resource_quota,
const grpc_httpcli_request *request,
const char *body_bytes, size_t body_size,
gpr_timespec deadline, grpc_closure *on_complete,
grpc_millis deadline, grpc_closure *on_complete,
grpc_httpcli_response *response);
/* override functions return 1 if they handled the request, 0 otherwise */
typedef int (*grpc_httpcli_get_override)(grpc_exec_ctx *exec_ctx,
const grpc_httpcli_request *request,
gpr_timespec deadline,
grpc_millis deadline,
grpc_closure *on_complete,
grpc_httpcli_response *response);
typedef int (*grpc_httpcli_post_override)(
grpc_exec_ctx *exec_ctx, const grpc_httpcli_request *request,
const char *body_bytes, size_t body_size, gpr_timespec deadline,
const char *body_bytes, size_t body_size, grpc_millis deadline,
grpc_closure *on_complete, grpc_httpcli_response *response);
void grpc_httpcli_set_override(grpc_httpcli_get_override get,

@ -167,7 +167,7 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
static void ssl_handshake(grpc_exec_ctx *exec_ctx, void *arg,
grpc_endpoint *tcp, const char *host,
gpr_timespec deadline,
grpc_millis deadline,
void (*on_done)(grpc_exec_ctx *exec_ctx, void *arg,
grpc_endpoint *endpoint)) {
grpc_channel_security_connector *sc = NULL;

@ -39,6 +39,7 @@
#include <assert.h>
#include <errno.h>
#include <limits.h>
#include <poll.h>
#include <string.h>
#include <sys/socket.h>
@ -215,8 +216,8 @@ static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx,
- longer than a millisecond polls are rounded up to the next nearest
millisecond to avoid spinning
- infinite timeouts are converted to -1 */
static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
gpr_timespec now);
static int poll_deadline_to_millis_timeout(grpc_exec_ctx *exec_ctx,
grpc_millis deadline);
/* Allow kick to wakeup the currently polling worker */
#define GRPC_POLLSET_CAN_KICK_SELF 1
@ -864,7 +865,7 @@ static void work_combine_error(grpc_error **composite, grpc_error *error) {
static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker **worker_hdl,
gpr_timespec now, gpr_timespec deadline) {
grpc_millis deadline) {
grpc_pollset_worker worker;
if (worker_hdl) *worker_hdl = &worker;
grpc_error *error = GRPC_ERROR_NONE;
@ -932,7 +933,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_fd_watcher *watchers;
struct pollfd *pfds;
timeout = poll_deadline_to_millis_timeout(deadline, now);
timeout = poll_deadline_to_millis_timeout(exec_ctx, deadline);
if (pollset->fd_count + 2 <= inline_elements) {
pfds = pollfd_space;
@ -1042,13 +1043,10 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
if (queued_work || worker.kicked_specifically) {
/* If there's queued work on the list, then set the deadline to be
immediate so we get back out of the polling loop quickly */
deadline = gpr_inf_past(GPR_CLOCK_MONOTONIC);
deadline = 0;
}
keep_polling = 1;
}
if (keep_polling) {
now = gpr_now(now.clock_type);
}
}
gpr_tls_set(&g_current_thread_poller, 0);
if (added_worker) {
@ -1100,21 +1098,14 @@ static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
}
}
static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
gpr_timespec now) {
gpr_timespec timeout;
static const int64_t max_spin_polling_us = 10;
if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
return -1;
}
if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros(
max_spin_polling_us,
GPR_TIMESPAN))) <= 0) {
return 0;
}
timeout = gpr_time_sub(deadline, now);
return gpr_time_to_millis(gpr_time_add(
timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN)));
static int poll_deadline_to_millis_timeout(grpc_exec_ctx *exec_ctx,
grpc_millis deadline) {
if (deadline == GRPC_MILLIS_INF_FUTURE) return -1;
if (deadline == 0) return 0;
grpc_millis n = deadline - grpc_exec_ctx_now(exec_ctx);
if (n < 0) return 0;
if (n > INT_MAX) return -1;
return (int)n;
}
/*******************************************************************************

@ -218,9 +218,9 @@ void grpc_pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
}
grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker **worker, gpr_timespec now,
gpr_timespec deadline) {
return g_event_engine->pollset_work(exec_ctx, pollset, worker, now, deadline);
grpc_pollset_worker **worker,
grpc_millis deadline) {
return g_event_engine->pollset_work(exec_ctx, pollset, worker, deadline);
}
grpc_error *grpc_pollset_kick(grpc_pollset *pollset,

@ -69,8 +69,8 @@ typedef struct grpc_event_engine_vtable {
grpc_closure *closure);
void (*pollset_destroy)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset);
grpc_error *(*pollset_work)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker **worker, gpr_timespec now,
gpr_timespec deadline);
grpc_pollset_worker **worker,
grpc_millis deadline);
grpc_error *(*pollset_kick)(grpc_pollset *pollset,
grpc_pollset_worker *specific_worker);
void (*pollset_add_fd)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,

@ -113,9 +113,42 @@ static void exec_ctx_sched(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
grpc_closure_list_append(&exec_ctx->closure_list, closure, error);
}
void grpc_exec_ctx_global_init(void) {}
static gpr_timespec g_start_time;
void grpc_exec_ctx_global_init(void) {
g_start_time = gpr_now(GPR_CLOCK_MONOTONIC);
}
void grpc_exec_ctx_global_shutdown(void) {}
static gpr_atm timespec_to_atm_round_down(gpr_timespec ts) {
ts = gpr_time_sub(ts, g_start_time);
double x =
GPR_MS_PER_SEC * (double)ts.tv_sec + (double)ts.tv_nsec / GPR_NS_PER_MS;
if (x < 0) return 0;
if (x > GPR_ATM_MAX) return GPR_ATM_MAX;
return (gpr_atm)x;
}
grpc_millis grpc_exec_ctx_now(grpc_exec_ctx *exec_ctx) {
if (!exec_ctx->now_is_valid) {
exec_ctx->now = timespec_to_atm_round_down(gpr_now(GPR_CLOCK_MONOTONIC));
exec_ctx->now_is_valid = true;
}
return exec_ctx->now;
}
void grpc_exec_ctx_invalidate_now(grpc_exec_ctx *exec_ctx) {
exec_ctx->now_is_valid = false;
}
gpr_timespec grpc_millis_to_timespec(grpc_exec_ctx *exec_ctx,
grpc_millis millis,
gpr_clock_type clock_type) {
return gpr_time_add(gpr_convert_clock_type(g_start_time, clock_type),
gpr_time_from_millis(millis, GPR_TIMESPAN));
}
static const grpc_closure_scheduler_vtable exec_ctx_scheduler_vtable = {
exec_ctx_run, exec_ctx_sched, "exec_ctx"};
static grpc_closure_scheduler exec_ctx_scheduler = {&exec_ctx_scheduler_vtable};

@ -34,9 +34,13 @@
#ifndef GRPC_CORE_LIB_IOMGR_EXEC_CTX_H
#define GRPC_CORE_LIB_IOMGR_EXEC_CTX_H
#include <grpc/support/atm.h>
#include "src/core/lib/iomgr/closure.h"
/* #define GRPC_EXECUTION_CONTEXT_SANITIZER 1 */
typedef gpr_atm grpc_millis;
#define GRPC_MILLIS_INF_FUTURE GPR_ATM_MAX
/** A workqueue represents a list of work to be executed asynchronously.
Forward declared here to avoid a circular dependency with workqueue.h. */
@ -79,12 +83,18 @@ struct grpc_exec_ctx {
uintptr_t flags;
void *check_ready_to_finish_arg;
bool (*check_ready_to_finish)(grpc_exec_ctx *exec_ctx, void *arg);
bool now_is_valid;
grpc_millis now;
};
/* initializer for grpc_exec_ctx:
prefer to use GRPC_EXEC_CTX_INIT whenever possible */
#define GRPC_EXEC_CTX_INITIALIZER(flags, finish_check, finish_check_arg) \
{ GRPC_CLOSURE_LIST_INIT, NULL, NULL, flags, finish_check_arg, finish_check }
#define GRPC_EXEC_CTX_INITIALIZER(flags, finish_check, finish_check_arg) \
{ \
GRPC_CLOSURE_LIST_INIT, NULL, NULL, flags, finish_check_arg, finish_check, \
false, 0 \
}
/* initialize an execution context at the top level of an API call into grpc
(this is safe to use elsewhere, though possibly not as efficient) */
@ -116,4 +126,11 @@ void grpc_exec_ctx_global_init(void);
void grpc_exec_ctx_global_init(void);
void grpc_exec_ctx_global_shutdown(void);
grpc_millis grpc_exec_ctx_now(grpc_exec_ctx *exec_ctx);
void grpc_exec_ctx_invalidate_now(grpc_exec_ctx *exec_ctx);
gpr_timespec grpc_millis_to_timespec(grpc_exec_ctx *exec_ctx,
grpc_millis millis, gpr_clock_type clock);
grpc_millis grpc_timespec_to_millis(grpc_exec_ctx *exec_ctx,
gpr_timespec timespec);
#endif /* GRPC_CORE_LIB_IOMGR_EXEC_CTX_H */

@ -61,7 +61,9 @@ void grpc_iomgr_init(void) {
gpr_mu_init(&g_mu);
gpr_cv_init(&g_rcv);
grpc_exec_ctx_global_init();
grpc_timer_list_init(gpr_now(GPR_CLOCK_MONOTONIC));
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_timer_list_init(&exec_ctx);
grpc_exec_ctx_finish(&exec_ctx);
g_root_object.next = g_root_object.prev = &g_root_object;
g_root_object.name = "root";
grpc_network_status_init();
@ -107,7 +109,9 @@ void grpc_iomgr_shutdown(grpc_exec_ctx *exec_ctx) {
}
last_warning_time = gpr_now(GPR_CLOCK_REALTIME);
}
if (grpc_timer_check(exec_ctx, gpr_inf_future(GPR_CLOCK_MONOTONIC), NULL)) {
exec_ctx->now_is_valid = true;
exec_ctx->now = GRPC_MILLIS_INF_FUTURE;
if (grpc_timer_check(exec_ctx, NULL)) {
gpr_mu_unlock(&g_mu);
grpc_exec_ctx_flush(exec_ctx);
grpc_iomgr_platform_flush();

@ -82,8 +82,8 @@ void grpc_pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset);
pollset
lock */
grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker **worker, gpr_timespec now,
gpr_timespec deadline) GRPC_MUST_USE_RESULT;
grpc_pollset_worker **worker,
grpc_millis deadline) GRPC_MUST_USE_RESULT;
/* Break one polling thread out of polling work for this pollset.
If specific_worker is non-NULL, then kick that worker. */

@ -50,6 +50,6 @@ void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *on_connect,
grpc_pollset_set *interested_parties,
const grpc_channel_args *channel_args,
const grpc_resolved_address *addr,
gpr_timespec deadline);
grpc_millis deadline);
#endif /* GRPC_CORE_LIB_IOMGR_TCP_CLIENT_H */

@ -257,7 +257,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
grpc_pollset_set *interested_parties,
const grpc_channel_args *channel_args,
const grpc_resolved_address *addr,
gpr_timespec deadline) {
grpc_millis deadline) {
int fd;
grpc_dualstack_mode dsmode;
int err;
@ -337,9 +337,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
gpr_mu_lock(&ac->mu);
grpc_closure_init(&ac->on_alarm, tc_on_alarm, ac, grpc_schedule_on_exec_ctx);
grpc_timer_init(exec_ctx, &ac->alarm,
gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC),
&ac->on_alarm, gpr_now(GPR_CLOCK_MONOTONIC));
grpc_timer_init(exec_ctx, &ac->alarm, deadline, &ac->on_alarm);
grpc_fd_notify_on_write(exec_ctx, ac->fd, &ac->write_closure);
gpr_mu_unlock(&ac->mu);
@ -353,14 +351,14 @@ void (*grpc_tcp_client_connect_impl)(
grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_endpoint **ep,
grpc_pollset_set *interested_parties, const grpc_channel_args *channel_args,
const grpc_resolved_address *addr,
gpr_timespec deadline) = tcp_client_connect_impl;
grpc_millis deadline) = tcp_client_connect_impl;
void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
grpc_endpoint **ep,
grpc_pollset_set *interested_parties,
const grpc_channel_args *channel_args,
const grpc_resolved_address *addr,
gpr_timespec deadline) {
grpc_millis deadline) {
grpc_tcp_client_connect_impl(exec_ctx, closure, ep, interested_parties,
channel_args, addr, deadline);
}

@ -56,8 +56,7 @@ typedef struct grpc_timer grpc_timer;
application callback is also responsible for maintaining information about
when to free up any user-level state. */
void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
gpr_timespec deadline, grpc_closure *closure,
gpr_timespec now);
grpc_millis deadline, grpc_closure *closure);
/* Note that there is no timer destroy function. This is because the
timer is a one-time occurrence with a guarantee that the callback will
@ -96,9 +95,8 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer);
*next is never guaranteed to be updated on any given execution; however,
with high probability at least one thread in the system will see an update
at any time slice. */
bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now,
gpr_timespec *next);
void grpc_timer_list_init(gpr_timespec now);
bool grpc_timer_check(grpc_exec_ctx *exec_ctx, grpc_millis *next);
void grpc_timer_list_init(grpc_exec_ctx *exec_ctx);
void grpc_timer_list_shutdown(grpc_exec_ctx *exec_ctx);
/* Consume a kick issued by grpc_kick_poller */

@ -86,11 +86,9 @@ struct shared_mutables {
static struct shared_mutables g_shared_mutables = {
.checker_mu = GPR_SPINLOCK_STATIC_INITIALIZER, .initialized = false,
};
static gpr_clock_type g_clock_type;
static shard_type g_shards[NUM_SHARDS];
/* Protected by g_shared_mutables.mu */
static shard_type *g_shard_queue[NUM_SHARDS];
static gpr_timespec g_start_time;
GPR_TLS_DECL(g_last_seen_min_timer);
@ -104,51 +102,18 @@ static gpr_atm saturating_add(gpr_atm a, gpr_atm b) {
static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_atm now,
gpr_atm *next, grpc_error *error);
static gpr_timespec dbl_to_ts(double d) {
gpr_timespec ts;
ts.tv_sec = (int64_t)d;
ts.tv_nsec = (int32_t)(1e9 * (d - (double)ts.tv_sec));
ts.clock_type = GPR_TIMESPAN;
return ts;
}
static gpr_atm timespec_to_atm_round_up(gpr_timespec ts) {
ts = gpr_time_sub(ts, g_start_time);
double x = GPR_MS_PER_SEC * (double)ts.tv_sec +
(double)ts.tv_nsec / GPR_NS_PER_MS +
(double)(GPR_NS_PER_SEC - 1) / (double)GPR_NS_PER_SEC;
if (x < 0) return 0;
if (x > GPR_ATM_MAX) return GPR_ATM_MAX;
return (gpr_atm)x;
}
static gpr_atm timespec_to_atm_round_down(gpr_timespec ts) {
ts = gpr_time_sub(ts, g_start_time);
double x =
GPR_MS_PER_SEC * (double)ts.tv_sec + (double)ts.tv_nsec / GPR_NS_PER_MS;
if (x < 0) return 0;
if (x > GPR_ATM_MAX) return GPR_ATM_MAX;
return (gpr_atm)x;
}
static gpr_timespec atm_to_timespec(gpr_atm x) {
return gpr_time_add(g_start_time, dbl_to_ts((double)x / 1000.0));
}
static gpr_atm compute_min_deadline(shard_type *shard) {
return grpc_timer_heap_is_empty(&shard->heap)
? saturating_add(shard->queue_deadline_cap, 1)
: grpc_timer_heap_top(&shard->heap)->deadline;
}
void grpc_timer_list_init(gpr_timespec now) {
void grpc_timer_list_init(grpc_exec_ctx *exec_ctx) {
uint32_t i;
g_shared_mutables.initialized = true;
gpr_mu_init(&g_shared_mutables.mu);
g_clock_type = now.clock_type;
g_start_time = now;
g_shared_mutables.min_timer = timespec_to_atm_round_down(now);
g_shared_mutables.min_timer = grpc_exec_ctx_now(exec_ctx);
gpr_tls_init(&g_last_seen_min_timer);
gpr_tls_set(&g_last_seen_min_timer, 0);
grpc_register_tracer("timer", &grpc_timer_trace);
@ -183,10 +148,6 @@ void grpc_timer_list_shutdown(grpc_exec_ctx *exec_ctx) {
g_shared_mutables.initialized = false;
}
static double ts_to_dbl(gpr_timespec ts) {
return (double)ts.tv_sec + 1e-9 * ts.tv_nsec;
}
/* returns true if the first element in the list */
static void list_join(grpc_timer *head, grpc_timer *timer) {
timer->next = head;
@ -225,20 +186,15 @@ static void note_deadline_change(shard_type *shard) {
}
void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
gpr_timespec deadline, grpc_closure *closure,
gpr_timespec now) {
grpc_millis deadline, grpc_closure *closure) {
int is_first_timer = 0;
shard_type *shard = &g_shards[GPR_HASH_POINTER(timer, NUM_SHARDS)];
GPR_ASSERT(deadline.clock_type == g_clock_type);
GPR_ASSERT(now.clock_type == g_clock_type);
timer->closure = closure;
gpr_atm deadline_atm = timer->deadline = timespec_to_atm_round_up(deadline);
if (GRPC_TRACER_ON(grpc_timer_trace)) {
gpr_log(GPR_DEBUG, "TIMER %p: SET %" PRId64 ".%09d [%" PRIdPTR
"] now %" PRId64 ".%09d [%" PRIdPTR "] call %p[%p]",
timer, deadline.tv_sec, deadline.tv_nsec, deadline_atm, now.tv_sec,
now.tv_nsec, timespec_to_atm_round_down(now), closure, closure->cb);
gpr_log(GPR_DEBUG,
"TIMER %p: SET %" PRIdPTR " now %" PRIdPTR " call %p[%p]", timer,
deadline, grpc_exec_ctx_now(exec_ctx), closure, closure->cb);
}
if (!g_shared_mutables.initialized) {
@ -251,7 +207,8 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
gpr_mu_lock(&shard->mu);
timer->pending = true;
if (gpr_time_cmp(deadline, now) <= 0) {
grpc_millis now = grpc_exec_ctx_now(exec_ctx);
if (deadline <= now) {
timer->pending = false;
grpc_closure_sched(exec_ctx, timer->closure, GRPC_ERROR_NONE);
gpr_mu_unlock(&shard->mu);
@ -259,9 +216,8 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
return;
}
grpc_time_averaged_stats_add_sample(&shard->stats,
ts_to_dbl(gpr_time_sub(deadline, now)));
if (deadline_atm < shard->queue_deadline_cap) {
grpc_time_averaged_stats_add_sample(&shard->stats, (deadline - now) / 1000.0);
if (deadline < shard->queue_deadline_cap) {
is_first_timer = grpc_timer_heap_add(&shard->heap, timer);
} else {
timer->heap_index = INVALID_HEAP_INDEX;
@ -292,12 +248,12 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
gpr_log(GPR_DEBUG, " .. old shard min_deadline=%" PRIdPTR,
shard->min_deadline);
}
if (deadline_atm < shard->min_deadline) {
if (deadline < shard->min_deadline) {
gpr_atm old_min_deadline = g_shard_queue[0]->min_deadline;
shard->min_deadline = deadline_atm;
shard->min_deadline = deadline;
note_deadline_change(shard);
if (shard->shard_queue_index == 0 && deadline_atm < old_min_deadline) {
gpr_atm_no_barrier_store(&g_shared_mutables.min_timer, deadline_atm);
if (shard->shard_queue_index == 0 && deadline < old_min_deadline) {
gpr_atm_no_barrier_store(&g_shared_mutables.min_timer, deadline);
grpc_kick_poller();
}
}
@ -494,30 +450,27 @@ static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_atm now,
return (int)n;
}
bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now,
gpr_timespec *next) {
bool grpc_timer_check(grpc_exec_ctx *exec_ctx, grpc_millis *next) {
// prelude
GPR_ASSERT(now.clock_type == g_clock_type);
gpr_atm now_atm = timespec_to_atm_round_down(now);
grpc_millis now = grpc_exec_ctx_now(exec_ctx);
/* fetch from a thread-local first: this avoids contention on a globally
mutable cacheline in the common case */
gpr_atm min_timer = gpr_tls_get(&g_last_seen_min_timer);
if (now_atm < min_timer) {
grpc_millis min_timer = gpr_tls_get(&g_last_seen_min_timer);
if (now < min_timer) {
if (next != NULL) {
*next =
atm_to_timespec(GPR_MIN(timespec_to_atm_round_up(*next), min_timer));
*next = GPR_MIN(*next, min_timer);
}
if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
gpr_log(GPR_DEBUG,
"TIMER CHECK SKIP: now_atm=%" PRIdPTR " min_timer=%" PRIdPTR,
now_atm, min_timer);
"TIMER CHECK SKIP: now=%" PRIdPTR " min_timer=%" PRIdPTR, now,
min_timer);
}
return 0;
}
grpc_error *shutdown_error =
gpr_time_cmp(now, gpr_inf_future(now.clock_type)) != 0
now != GRPC_MILLIS_INF_FUTURE
? GRPC_ERROR_NONE
: GRPC_ERROR_CREATE_FROM_STATIC_STRING("Shutting down timer system");
@ -527,34 +480,23 @@ bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now,
if (next == NULL) {
next_str = gpr_strdup("NULL");
} else {
gpr_asprintf(&next_str, "%" PRId64 ".%09d [%" PRIdPTR "]", next->tv_sec,
next->tv_nsec, timespec_to_atm_round_down(*next));
gpr_asprintf(&next_str, "%" PRIdPTR, *next);
}
gpr_log(GPR_DEBUG, "TIMER CHECK BEGIN: now=%" PRId64 ".%09d [%" PRIdPTR
"] next=%s tls_min=%" PRIdPTR " glob_min=%" PRIdPTR,
now.tv_sec, now.tv_nsec, now_atm, next_str,
gpr_tls_get(&g_last_seen_min_timer),
gpr_log(GPR_DEBUG, "TIMER CHECK BEGIN: now=%" PRIdPTR
" next=%s tls_min=%" PRIdPTR " glob_min=%" PRIdPTR,
now, next_str, gpr_tls_get(&g_last_seen_min_timer),
gpr_atm_no_barrier_load(&g_shared_mutables.min_timer));
gpr_free(next_str);
}
// actual code
bool r;
gpr_atm next_atm;
if (next == NULL) {
r = run_some_expired_timers(exec_ctx, now_atm, NULL, shutdown_error);
} else {
next_atm = timespec_to_atm_round_down(*next);
r = run_some_expired_timers(exec_ctx, now_atm, &next_atm, shutdown_error);
*next = atm_to_timespec(next_atm);
}
bool r = run_some_expired_timers(exec_ctx, now, next, shutdown_error);
// tracing
if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
char *next_str;
if (next == NULL) {
next_str = gpr_strdup("NULL");
} else {
gpr_asprintf(&next_str, "%" PRId64 ".%09d [%" PRIdPTR "]", next->tv_sec,
next->tv_nsec, next_atm);
gpr_asprintf(&next_str, "%" PRIdPTR, *next);
}
gpr_log(GPR_DEBUG, "TIMER CHECK END: %d timers triggered; next=%s", r,
next_str);

@ -101,9 +101,8 @@ static void start_timer_thread_and_unlock(void) {
void grpc_timer_manager_tick() {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
gpr_timespec next = gpr_inf_future(GPR_CLOCK_MONOTONIC);
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
grpc_timer_check(&exec_ctx, now, &next);
grpc_millis next = GRPC_MILLIS_INF_FUTURE;
grpc_timer_check(&exec_ctx, &next);
grpc_exec_ctx_finish(&exec_ctx);
}
@ -112,12 +111,10 @@ static void timer_thread(void *unused) {
// since it's easy to spin up new threads
grpc_exec_ctx exec_ctx =
GRPC_EXEC_CTX_INITIALIZER(0, grpc_never_ready_to_finish, NULL);
const gpr_timespec inf_future = gpr_inf_future(GPR_CLOCK_MONOTONIC);
for (;;) {
gpr_timespec next = inf_future;
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
grpc_millis next = GRPC_MILLIS_INF_FUTURE;
// check timer state, updates next to the next time to run a check
if (grpc_timer_check(&exec_ctx, now, &next)) {
if (grpc_timer_check(&exec_ctx, &next)) {
// if there's something to execute...
gpr_mu_lock(&g_mu);
// remove a waiter from the pool, and start another thread if necessary
@ -161,12 +158,14 @@ static void timer_thread(void *unused) {
gpr_log(GPR_DEBUG, "sleep for a while");
}
} else {
next = inf_future;
next = GRPC_MILLIS_INF_FUTURE;
if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
gpr_log(GPR_DEBUG, "sleep until kicked");
}
}
gpr_cv_wait(&g_cv_wait, &g_mu, next);
gpr_cv_wait(&g_cv_wait, &g_mu,
grpc_millis_to_timespec(&exec_ctx, next, GPR_CLOCK_REALTIME));
grpc_exec_ctx_invalidate_now(&exec_ctx);
if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
gpr_log(GPR_DEBUG, "wait ended: was_timed:%d kicked:%d",
my_timed_waiter_generation == g_timed_waiter_generation,

@ -110,7 +110,7 @@ static int is_stack_running_on_compute_engine(grpc_exec_ctx *exec_ctx) {
/* The http call is local. If it takes more than one sec, it is for sure not
on compute engine. */
gpr_timespec max_detection_delay = gpr_time_from_seconds(1, GPR_TIMESPAN);
grpc_millis max_detection_delay = GPR_MS_PER_SEC;
grpc_pollset *pollset = gpr_zalloc(grpc_pollset_size());
grpc_pollset_init(pollset, &g_polling_mu);
@ -129,7 +129,7 @@ static int is_stack_running_on_compute_engine(grpc_exec_ctx *exec_ctx) {
grpc_resource_quota_create("google_default_credentials");
grpc_httpcli_get(
exec_ctx, &context, &detector.pollent, resource_quota, &request,
gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), max_detection_delay),
grpc_exec_ctx_now(exec_ctx) + max_detection_delay,
grpc_closure_create(on_compute_engine_detection_http_response, &detector,
grpc_schedule_on_exec_ctx),
&detector.response);
@ -146,8 +146,7 @@ static int is_stack_running_on_compute_engine(grpc_exec_ctx *exec_ctx) {
"pollset_work",
grpc_pollset_work(exec_ctx,
grpc_polling_entity_pollset(&detector.pollent),
&worker, gpr_now(GPR_CLOCK_MONOTONIC),
gpr_inf_future(GPR_CLOCK_MONOTONIC)))) {
&worker, GRPC_MILLIS_INF_FUTURE))) {
detector.is_done = 1;
detector.success = 0;
}

@ -395,7 +395,7 @@ void verifier_cb_ctx_destroy(grpc_exec_ctx *exec_ctx, verifier_cb_ctx *ctx) {
gpr_timespec grpc_jwt_verifier_clock_skew = {60, 0, GPR_TIMESPAN};
/* Max delay defaults to one minute. */
gpr_timespec grpc_jwt_verifier_max_delay = {60, 0, GPR_TIMESPAN};
grpc_millis grpc_jwt_verifier_max_delay = 60 * GPR_MS_PER_SEC;
typedef struct {
char *email_domain;
@ -682,7 +682,7 @@ static void on_openid_config_retrieved(grpc_exec_ctx *exec_ctx, void *user_data,
grpc_resource_quota_create("jwt_verifier");
grpc_httpcli_get(
exec_ctx, &ctx->verifier->http_ctx, &ctx->pollent, resource_quota, &req,
gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), grpc_jwt_verifier_max_delay),
grpc_exec_ctx_now(exec_ctx) + grpc_jwt_verifier_max_delay,
grpc_closure_create(on_keys_retrieved, ctx, grpc_schedule_on_exec_ctx),
&ctx->responses[HTTP_RESPONSE_KEYS]);
grpc_resource_quota_unref_internal(exec_ctx, resource_quota);
@ -808,10 +808,10 @@ static void retrieve_key_and_verify(grpc_exec_ctx *exec_ctx,
extreme memory pressure. */
grpc_resource_quota *resource_quota =
grpc_resource_quota_create("jwt_verifier");
grpc_httpcli_get(
exec_ctx, &ctx->verifier->http_ctx, &ctx->pollent, resource_quota, &req,
gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), grpc_jwt_verifier_max_delay),
http_cb, &ctx->responses[rsp_idx]);
grpc_httpcli_get(exec_ctx, &ctx->verifier->http_ctx, &ctx->pollent,
resource_quota, &req,
grpc_exec_ctx_now(exec_ctx) + grpc_jwt_verifier_max_delay,
http_cb, &ctx->responses[rsp_idx]);
grpc_resource_quota_unref_internal(exec_ctx, resource_quota);
gpr_free(req.host);
gpr_free(req.http.path);

@ -96,7 +96,7 @@ typedef struct {
/* Globals to control the verifier. Not thread-safe. */
extern gpr_timespec grpc_jwt_verifier_clock_skew;
extern gpr_timespec grpc_jwt_verifier_max_delay;
extern grpc_millis grpc_jwt_verifier_max_delay;
/* The verifier can be created with some custom mappings to help with key
discovery in the case where the issuer is an email address.

@ -130,7 +130,7 @@ static void oauth2_token_fetcher_destruct(grpc_exec_ctx *exec_ctx,
grpc_credentials_status
grpc_oauth2_token_fetcher_credentials_parse_server_response(
grpc_exec_ctx *exec_ctx, const grpc_http_response *response,
grpc_credentials_md_store **token_md, gpr_timespec *token_lifetime) {
grpc_credentials_md_store **token_md, grpc_millis *token_lifetime) {
char *null_terminated_body = NULL;
char *new_access_token = NULL;
grpc_credentials_status status = GRPC_CREDENTIALS_OK;
@ -196,9 +196,7 @@ grpc_oauth2_token_fetcher_credentials_parse_server_response(
}
gpr_asprintf(&new_access_token, "%s %s", token_type->value,
access_token->value);
token_lifetime->tv_sec = strtol(expires_in->value, NULL, 10);
token_lifetime->tv_nsec = 0;
token_lifetime->clock_type = GPR_TIMESPAN;
*token_lifetime = strtol(expires_in->value, NULL, 10) * GPR_MS_PER_SEC;
if (*token_md != NULL) grpc_credentials_md_store_unref(exec_ctx, *token_md);
*token_md = grpc_credentials_md_store_create(1);
grpc_credentials_md_store_add_cstrings(
@ -224,7 +222,7 @@ static void on_oauth2_token_fetcher_http_response(grpc_exec_ctx *exec_ctx,
(grpc_credentials_metadata_request *)user_data;
grpc_oauth2_token_fetcher_credentials *c =
(grpc_oauth2_token_fetcher_credentials *)r->creds;
gpr_timespec token_lifetime;
grpc_millis token_lifetime;
grpc_credentials_status status;
GRPC_LOG_IF_ERROR("oauth_fetch", GRPC_ERROR_REF(error));
@ -233,12 +231,11 @@ static void on_oauth2_token_fetcher_http_response(grpc_exec_ctx *exec_ctx,
status = grpc_oauth2_token_fetcher_credentials_parse_server_response(
exec_ctx, &r->response, &c->access_token_md, &token_lifetime);
if (status == GRPC_CREDENTIALS_OK) {
c->token_expiration =
gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), token_lifetime);
c->token_expiration = grpc_exec_ctx_now(exec_ctx) + token_lifetime;
r->cb(exec_ctx, r->user_data, c->access_token_md->entries,
c->access_token_md->num_entries, GRPC_CREDENTIALS_OK, NULL);
} else {
c->token_expiration = gpr_inf_past(GPR_CLOCK_REALTIME);
c->token_expiration = 0;
r->cb(exec_ctx, r->user_data, NULL, 0, status,
"Error occured when fetching oauth2 token.");
}
@ -252,15 +249,14 @@ static void oauth2_token_fetcher_get_request_metadata(
grpc_credentials_metadata_cb cb, void *user_data) {
grpc_oauth2_token_fetcher_credentials *c =
(grpc_oauth2_token_fetcher_credentials *)creds;
gpr_timespec refresh_threshold = gpr_time_from_seconds(
GRPC_SECURE_TOKEN_REFRESH_THRESHOLD_SECS, GPR_TIMESPAN);
grpc_millis refresh_threshold =
GRPC_SECURE_TOKEN_REFRESH_THRESHOLD_SECS * GPR_MS_PER_SEC;
grpc_credentials_md_store *cached_access_token_md = NULL;
{
gpr_mu_lock(&c->mu);
if (c->access_token_md != NULL &&
(gpr_time_cmp(
gpr_time_sub(c->token_expiration, gpr_now(GPR_CLOCK_REALTIME)),
refresh_threshold) > 0)) {
(c->token_expiration + grpc_exec_ctx_now(exec_ctx) >
refresh_threshold)) {
cached_access_token_md =
grpc_credentials_md_store_ref(c->access_token_md);
}
@ -275,7 +271,7 @@ static void oauth2_token_fetcher_get_request_metadata(
exec_ctx,
grpc_credentials_metadata_request_create(creds, cb, user_data),
&c->httpcli_context, pollent, on_oauth2_token_fetcher_http_response,
gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), refresh_threshold));
grpc_exec_ctx_now(exec_ctx) + refresh_threshold);
}
}
@ -285,7 +281,7 @@ static void init_oauth2_token_fetcher(grpc_oauth2_token_fetcher_credentials *c,
c->base.type = GRPC_CALL_CREDENTIALS_TYPE_OAUTH2;
gpr_ref_init(&c->base.refcount, 1);
gpr_mu_init(&c->mu);
c->token_expiration = gpr_inf_past(GPR_CLOCK_REALTIME);
c->token_expiration = 0;
c->fetch_func = fetch_func;
grpc_httpcli_context_init(&c->httpcli_context);
}
@ -300,7 +296,7 @@ static grpc_call_credentials_vtable compute_engine_vtable = {
static void compute_engine_fetch_oauth2(
grpc_exec_ctx *exec_ctx, grpc_credentials_metadata_request *metadata_req,
grpc_httpcli_context *httpcli_context, grpc_polling_entity *pollent,
grpc_iomgr_cb_func response_cb, gpr_timespec deadline) {
grpc_iomgr_cb_func response_cb, grpc_millis deadline) {
grpc_http_header header = {"Metadata-Flavor", "Google"};
grpc_httpcli_request request;
memset(&request, 0, sizeof(grpc_httpcli_request));
@ -351,7 +347,7 @@ static grpc_call_credentials_vtable refresh_token_vtable = {
static void refresh_token_fetch_oauth2(
grpc_exec_ctx *exec_ctx, grpc_credentials_metadata_request *metadata_req,
grpc_httpcli_context *httpcli_context, grpc_polling_entity *pollent,
grpc_iomgr_cb_func response_cb, gpr_timespec deadline) {
grpc_iomgr_cb_func response_cb, grpc_millis deadline) {
grpc_google_refresh_token_credentials *c =
(grpc_google_refresh_token_credentials *)metadata_req->creds;
grpc_http_header header = {"Content-Type",

@ -72,12 +72,12 @@ typedef void (*grpc_fetch_oauth2_func)(grpc_exec_ctx *exec_ctx,
grpc_httpcli_context *http_context,
grpc_polling_entity *pollent,
grpc_iomgr_cb_func cb,
gpr_timespec deadline);
grpc_millis deadline);
typedef struct {
grpc_call_credentials base;
gpr_mu mu;
grpc_credentials_md_store *access_token_md;
gpr_timespec token_expiration;
grpc_millis token_expiration;
grpc_httpcli_context httpcli_context;
grpc_fetch_oauth2_func fetch_func;
} grpc_oauth2_token_fetcher_credentials;
@ -104,6 +104,6 @@ grpc_refresh_token_credentials_create_from_auth_refresh_token(
grpc_credentials_status
grpc_oauth2_token_fetcher_credentials_parse_server_response(
grpc_exec_ctx *exec_ctx, const struct grpc_http_response *response,
grpc_credentials_md_store **token_md, gpr_timespec *token_lifetime);
grpc_credentials_md_store **token_md, grpc_millis *token_lifetime);
#endif /* GRPC_CORE_LIB_SECURITY_CREDENTIALS_OAUTH2_OAUTH2_CREDENTIALS_H */

@ -68,8 +68,8 @@ grpc_alarm *grpc_alarm_create(grpc_completion_queue *cq, gpr_timespec deadline,
grpc_closure_init(&alarm->on_alarm, alarm_cb, alarm,
grpc_schedule_on_exec_ctx);
grpc_timer_init(&exec_ctx, &alarm->alarm,
gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC),
&alarm->on_alarm, gpr_now(GPR_CLOCK_MONOTONIC));
grpc_timespec_to_millis(&exec_ctx, deadline),
&alarm->on_alarm);
grpc_exec_ctx_finish(&exec_ctx);
return alarm;
}

@ -68,8 +68,7 @@ typedef struct {
grpc_error *(*kick)(grpc_pollset *pollset,
grpc_pollset_worker *specific_worker);
grpc_error *(*work)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker **worker, gpr_timespec now,
gpr_timespec deadline);
grpc_pollset_worker **worker, grpc_millis deadline);
void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_closure *closure);
void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset);
@ -107,8 +106,7 @@ static void non_polling_poller_destroy(grpc_exec_ctx *exec_ctx,
static grpc_error *non_polling_poller_work(grpc_exec_ctx *exec_ctx,
grpc_pollset *pollset,
grpc_pollset_worker **worker,
gpr_timespec now,
gpr_timespec deadline) {
grpc_millis deadline) {
non_polling_poller *npp = (non_polling_poller *)pollset;
if (npp->shutdown) return GRPC_ERROR_NONE;
non_polling_worker w;
@ -122,7 +120,10 @@ static grpc_error *non_polling_poller_work(grpc_exec_ctx *exec_ctx,
w.next->prev = w.prev->next = &w;
}
w.kicked = false;
while (!npp->shutdown && !w.kicked && !gpr_cv_wait(&w.cv, &npp->mu, deadline))
gpr_timespec deadline_ts =
grpc_millis_to_timespec(exec_ctx, deadline, GPR_CLOCK_REALTIME);
while (!npp->shutdown && !w.kicked &&
!gpr_cv_wait(&w.cv, &npp->mu, deadline_ts))
;
if (&w == npp->root) {
npp->root = w.next;
@ -453,7 +454,7 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
typedef struct {
gpr_atm last_seen_things_queued_ever;
grpc_completion_queue *cq;
gpr_timespec deadline;
grpc_millis deadline;
grpc_cq_completion *stolen_completion;
void *tag; /* for pluck */
bool first_loop;
@ -480,8 +481,7 @@ static bool cq_is_next_finished(grpc_exec_ctx *exec_ctx, void *arg) {
}
gpr_mu_unlock(cq->mu);
}
return !a->first_loop &&
gpr_time_cmp(a->deadline, gpr_now(a->deadline.clock_type)) < 0;
return !a->first_loop && a->deadline < grpc_exec_ctx_now(exec_ctx);
}
#ifndef NDEBUG
@ -510,7 +510,6 @@ static void dump_pending_tags(grpc_completion_queue *cc) {}
grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
gpr_timespec deadline, void *reserved) {
grpc_event ret;
gpr_timespec now;
if (cc->completion_type != GRPC_CQ_NEXT) {
gpr_log(GPR_ERROR,
@ -533,20 +532,20 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
dump_pending_tags(cc);
deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
GRPC_CQ_INTERNAL_REF(cc, "next");
gpr_mu_lock(cc->mu);
cq_is_finished_arg is_finished_arg = {
.last_seen_things_queued_ever =
gpr_atm_no_barrier_load(&cc->things_queued_ever),
.cq = cc,
.deadline = deadline,
.deadline = 0, /* set below */
.stolen_completion = NULL,
.tag = NULL,
.first_loop = true};
grpc_exec_ctx exec_ctx =
GRPC_EXEC_CTX_INITIALIZER(0, cq_is_next_finished, &is_finished_arg);
grpc_millis deadline_millis = is_finished_arg.deadline =
grpc_timespec_to_millis(&exec_ctx, deadline);
for (;;) {
if (is_finished_arg.stolen_completion != NULL) {
gpr_mu_unlock(cc->mu);
@ -577,8 +576,8 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
ret.type = GRPC_QUEUE_SHUTDOWN;
break;
}
now = gpr_now(GPR_CLOCK_MONOTONIC);
if (!is_finished_arg.first_loop && gpr_time_cmp(now, deadline) >= 0) {
if (!is_finished_arg.first_loop &&
grpc_exec_ctx_now(&exec_ctx) >= deadline_millis) {
gpr_mu_unlock(cc->mu);
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_TIMEOUT;
@ -586,7 +585,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
break;
}
grpc_error *err = cc->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cc),
NULL, now, deadline);
NULL, deadline_millis);
if (err != GRPC_ERROR_NONE) {
gpr_mu_unlock(cc->mu);
const char *msg = grpc_error_string(err);
@ -661,8 +660,7 @@ static bool cq_is_pluck_finished(grpc_exec_ctx *exec_ctx, void *arg) {
}
gpr_mu_unlock(cq->mu);
}
return !a->first_loop &&
gpr_time_cmp(a->deadline, gpr_now(a->deadline.clock_type)) < 0;
return !a->first_loop && a->deadline < grpc_exec_ctx_now(exec_ctx);
}
grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
@ -671,7 +669,6 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
grpc_cq_completion *c;
grpc_cq_completion *prev;
grpc_pollset_worker *worker = NULL;
gpr_timespec now;
GPR_TIMER_BEGIN("grpc_completion_queue_pluck", 0);
@ -696,20 +693,20 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
dump_pending_tags(cc);
deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
GRPC_CQ_INTERNAL_REF(cc, "pluck");
gpr_mu_lock(cc->mu);
cq_is_finished_arg is_finished_arg = {
.last_seen_things_queued_ever =
gpr_atm_no_barrier_load(&cc->things_queued_ever),
.cq = cc,
.deadline = deadline,
.deadline = 0, /* set below */
.stolen_completion = NULL,
.tag = tag,
.first_loop = true};
grpc_exec_ctx exec_ctx =
GRPC_EXEC_CTX_INITIALIZER(0, cq_is_pluck_finished, &is_finished_arg);
grpc_millis deadline_millis = is_finished_arg.deadline =
grpc_timespec_to_millis(&exec_ctx, deadline);
for (;;) {
if (is_finished_arg.stolen_completion != NULL) {
gpr_mu_unlock(cc->mu);
@ -756,8 +753,8 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
dump_pending_tags(cc);
break;
}
now = gpr_now(GPR_CLOCK_MONOTONIC);
if (!is_finished_arg.first_loop && gpr_time_cmp(now, deadline) >= 0) {
if (!is_finished_arg.first_loop &&
grpc_exec_ctx_now(&exec_ctx) >= deadline_millis) {
del_plucker(cc, tag, &worker);
gpr_mu_unlock(cc->mu);
memset(&ret, 0, sizeof(ret));
@ -766,7 +763,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
break;
}
grpc_error *err = cc->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cc),
&worker, now, deadline);
&worker, deadline_millis);
if (err != GRPC_ERROR_NONE) {
del_plucker(cc, tag, &worker);
gpr_mu_unlock(cc->mu);

@ -377,8 +377,8 @@ static void on_read_request_done(grpc_exec_ctx* exec_ctx, void* arg,
GPR_ASSERT(resolved_addresses->naddrs >= 1);
// Connect to requested address.
// The connection callback inherits our reference to conn.
const gpr_timespec deadline = gpr_time_add(
gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_seconds(10, GPR_TIMESPAN));
const grpc_millis deadline =
grpc_exec_ctx_now(exec_ctx) + 10 * GPR_MS_PER_SEC;
grpc_tcp_client_connect(exec_ctx, &conn->on_server_connect_done,
&conn->server_endpoint, conn->pollset_set, NULL,
&resolved_addresses->addrs[0], deadline);
@ -434,14 +434,12 @@ static void thread_main(void* arg) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
do {
gpr_ref(&proxy->users);
const gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
const gpr_timespec deadline =
gpr_time_add(now, gpr_time_from_seconds(1, GPR_TIMESPAN));
grpc_pollset_worker* worker = NULL;
gpr_mu_lock(proxy->mu);
GRPC_LOG_IF_ERROR(
"grpc_pollset_work",
grpc_pollset_work(&exec_ctx, proxy->pollset, &worker, now, deadline));
grpc_pollset_work(&exec_ctx, proxy->pollset, &worker,
grpc_exec_ctx_now(&exec_ctx) + GPR_MS_PER_SEC));
gpr_mu_unlock(proxy->mu);
grpc_exec_ctx_flush(&exec_ctx);
} while (!gpr_unref(&proxy->users));

@ -187,10 +187,11 @@ static void read_and_write_test(grpc_endpoint_test_config config,
size_t num_bytes, size_t write_size,
size_t slice_size, bool shutdown) {
struct read_and_write_test_state state;
gpr_timespec deadline = grpc_timeout_seconds_to_deadline(20);
grpc_endpoint_test_fixture f =
begin_test(config, "read_and_write_test", slice_size);
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_millis deadline =
grpc_timespec_to_millis(&exec_ctx, grpc_timeout_seconds_to_deadline(20));
gpr_log(GPR_DEBUG, "num_bytes=%" PRIuPTR " write_size=%" PRIuPTR
" slice_size=%" PRIuPTR " shutdown=%d",
num_bytes, write_size, slice_size, shutdown);
@ -246,11 +247,10 @@ static void read_and_write_test(grpc_endpoint_test_config config,
gpr_mu_lock(g_mu);
while (!state.read_done || !state.write_done) {
grpc_pollset_worker *worker = NULL;
GPR_ASSERT(gpr_time_cmp(gpr_now(GPR_CLOCK_MONOTONIC), deadline) < 0);
GPR_ASSERT(grpc_exec_ctx_now(&exec_ctx) < deadline);
GPR_ASSERT(GRPC_LOG_IF_ERROR(
"pollset_work",
grpc_pollset_work(&exec_ctx, g_pollset, &worker,
gpr_now(GPR_CLOCK_MONOTONIC), deadline)));
grpc_pollset_work(&exec_ctx, g_pollset, &worker, deadline)));
}
gpr_mu_unlock(g_mu);
grpc_exec_ctx_flush(&exec_ctx);
@ -273,13 +273,11 @@ static void wait_for_fail_count(grpc_exec_ctx *exec_ctx, int *fail_count,
grpc_exec_ctx_flush(exec_ctx);
for (int i = 0; i < 5 && *fail_count < want_fail_count; i++) {
grpc_pollset_worker *worker = NULL;
gpr_timespec now = gpr_now(GPR_CLOCK_REALTIME);
gpr_timespec deadline =
gpr_time_add(now, gpr_time_from_seconds(1, GPR_TIMESPAN));
gpr_mu_lock(g_mu);
GPR_ASSERT(GRPC_LOG_IF_ERROR(
"pollset_work",
grpc_pollset_work(exec_ctx, g_pollset, &worker, now, deadline)));
grpc_pollset_work(exec_ctx, g_pollset, &worker,
grpc_exec_ctx_now(exec_ctx) + GPR_MS_PER_SEC)));
gpr_mu_unlock(g_mu);
grpc_exec_ctx_flush(exec_ctx);
}

@ -107,8 +107,7 @@ char *grpc_test_fetch_oauth2_token_with_credentials(
"pollset_work",
grpc_pollset_work(&exec_ctx,
grpc_polling_entity_pollset(&request.pops),
&worker, gpr_now(GPR_CLOCK_MONOTONIC),
gpr_inf_future(GPR_CLOCK_MONOTONIC)))) {
&worker, GRPC_MILLIS_INF_FUTURE))) {
request.is_done = 1;
}
}

@ -102,7 +102,7 @@ void grpc_free_port_using_server(int port) {
grpc_resource_quota *resource_quota =
grpc_resource_quota_create("port_server_client/free");
grpc_httpcli_get(&exec_ctx, &context, &pr.pops, resource_quota, &req,
grpc_timeout_seconds_to_deadline(30),
grpc_exec_ctx_now(&exec_ctx) + 30 * GPR_MS_PER_SEC,
grpc_closure_create(freed_port_from_server, &pr,
grpc_schedule_on_exec_ctx),
&rsp);
@ -113,8 +113,8 @@ void grpc_free_port_using_server(int port) {
if (!GRPC_LOG_IF_ERROR(
"pollset_work",
grpc_pollset_work(&exec_ctx, grpc_polling_entity_pollset(&pr.pops),
&worker, gpr_now(GPR_CLOCK_MONOTONIC),
grpc_timeout_seconds_to_deadline(1)))) {
&worker,
grpc_exec_ctx_now(&exec_ctx) + GPR_MS_PER_SEC))) {
pr.done = 1;
}
}
@ -185,7 +185,7 @@ static void got_port_from_server(grpc_exec_ctx *exec_ctx, void *arg,
grpc_resource_quota *resource_quota =
grpc_resource_quota_create("port_server_client/pick_retry");
grpc_httpcli_get(exec_ctx, pr->ctx, &pr->pops, resource_quota, &req,
grpc_timeout_seconds_to_deadline(10),
grpc_exec_ctx_now(exec_ctx) + 30 * GPR_MS_PER_SEC,
grpc_closure_create(got_port_from_server, pr,
grpc_schedule_on_exec_ctx),
&pr->response);
@ -235,7 +235,7 @@ int grpc_pick_port_using_server(void) {
grpc_resource_quota_create("port_server_client/pick");
grpc_httpcli_get(
&exec_ctx, &context, &pr.pops, resource_quota, &req,
grpc_timeout_seconds_to_deadline(30),
grpc_exec_ctx_now(&exec_ctx) + 30 * GPR_MS_PER_SEC,
grpc_closure_create(got_port_from_server, &pr, grpc_schedule_on_exec_ctx),
&pr.response);
grpc_resource_quota_unref_internal(&exec_ctx, resource_quota);
@ -245,8 +245,8 @@ int grpc_pick_port_using_server(void) {
if (!GRPC_LOG_IF_ERROR(
"pollset_work",
grpc_pollset_work(&exec_ctx, grpc_polling_entity_pollset(&pr.pops),
&worker, gpr_now(GPR_CLOCK_MONOTONIC),
grpc_timeout_seconds_to_deadline(1)))) {
&worker,
grpc_exec_ctx_now(&exec_ctx) + GPR_MS_PER_SEC))) {
pr.port = 0;
}
}

Loading…
Cancel
Save