Remove code from network_status_tracker.c. Pull the hash function to useful.h.

pull/9696/head
yang-g 8 years ago
parent ee31910271
commit 6955c5e8d2
Changed files (5):
  1. include/grpc/support/useful.h (3 changed lines)
  2. src/core/lib/iomgr/network_status_tracker.c (87 changed lines)
  3. src/core/lib/iomgr/timer_generic.c (10 changed lines)
  4. src/core/lib/support/cpu_posix.c (9 changed lines)
  5. test/core/end2end/tests/network_status_change.c (5 changed lines)

include/grpc/support/useful.h
@@ -74,4 +74,7 @@
 #define GPR_ICMP(a, b) ((a) < (b) ? -1 : ((a) > (b) ? 1 : 0))
 
+#define GPR_HASH_POINTER(x, range) \
+  ((((size_t)x) >> 4) ^ (((size_t)x) >> 9) ^ (((size_t)x) >> 14)) % (range)
+
 #endif /* GRPC_SUPPORT_USEFUL_H */
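The new GPR_HASH_POINTER macro is the same shift-and-xor pointer hash that previously lived as private copies in timer_generic.c and cpu_posix.c (see the hunks below); shifting away the low four bits matters because allocator alignment keeps them nearly constant across pointers. A minimal standalone sketch of how the macro behaves; main() and the bucket count of 8 are illustrative, not part of the change:

#include <stdio.h>
#include <stdlib.h>
#include <stddef.h>

/* Copied from include/grpc/support/useful.h as added by this commit. */
#define GPR_HASH_POINTER(x, range) \
  ((((size_t)x) >> 4) ^ (((size_t)x) >> 9) ^ (((size_t)x) >> 14)) % (range)

int main(void) {
  /* Hash a few heap pointers into 8 buckets, the way callers shard work. */
  void *ptrs[4];
  for (int i = 0; i < 4; i++) ptrs[i] = malloc(64);
  for (int i = 0; i < 4; i++) {
    printf("%p -> bucket %zu\n", ptrs[i], (size_t)GPR_HASH_POINTER(ptrs[i], 8));
  }
  for (int i = 0; i < 4; i++) free(ptrs[i]);
  return 0;
}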

src/core/lib/iomgr/network_status_tracker.c
@@ -31,95 +31,18 @@
  *
  */
 
 #include <grpc/support/alloc.h>
 #include <grpc/support/log.h>
 #include "src/core/lib/iomgr/endpoint.h"
 
-typedef struct endpoint_ll_node {
-  grpc_endpoint *ep;
-  struct endpoint_ll_node *next;
-} endpoint_ll_node;
-
-static endpoint_ll_node *head = NULL;
-static gpr_mu g_endpoint_mutex;
-
-void grpc_network_status_shutdown(void) {
-  if (head != NULL) {
-    gpr_log(GPR_ERROR,
-            "Memory leaked as not all network endpoints were shut down");
-  }
-  gpr_mu_destroy(&g_endpoint_mutex);
-}
+void grpc_network_status_shutdown(void) {}
 
 void grpc_network_status_init(void) {
-  gpr_mu_init(&g_endpoint_mutex);
   // TODO(makarandd): Install callback with OS to monitor network status.
 }
 
-void grpc_destroy_network_status_monitor() {
-  for (endpoint_ll_node *curr = head; curr != NULL;) {
-    endpoint_ll_node *next = curr->next;
-    gpr_free(curr);
-    curr = next;
-  }
-  gpr_mu_destroy(&g_endpoint_mutex);
-}
+void grpc_destroy_network_status_monitor() {}
 
-void grpc_network_status_register_endpoint(grpc_endpoint *ep) {
-  gpr_mu_lock(&g_endpoint_mutex);
-  if (head == NULL) {
-    head = (endpoint_ll_node *)gpr_malloc(sizeof(endpoint_ll_node));
-    head->ep = ep;
-    head->next = NULL;
-  } else {
-    endpoint_ll_node *prev_head = head;
-    head = (endpoint_ll_node *)gpr_malloc(sizeof(endpoint_ll_node));
-    head->ep = ep;
-    head->next = prev_head;
-  }
-  gpr_mu_unlock(&g_endpoint_mutex);
-}
+void grpc_network_status_register_endpoint(grpc_endpoint *ep) { (void)ep; }
 
-void grpc_network_status_unregister_endpoint(grpc_endpoint *ep) {
-  gpr_mu_lock(&g_endpoint_mutex);
-  GPR_ASSERT(head);
-  bool found = false;
-  endpoint_ll_node *prev = head;
-  // if we're unregistering the head, just move head to the next
-  if (ep == head->ep) {
-    head = head->next;
-    gpr_free(prev);
-    found = true;
-  } else {
-    for (endpoint_ll_node *curr = head->next; curr != NULL; curr = curr->next) {
-      if (ep == curr->ep) {
-        prev->next = curr->next;
-        gpr_free(curr);
-        found = true;
-        break;
-      }
-      prev = curr;
-    }
-  }
-  gpr_mu_unlock(&g_endpoint_mutex);
-  GPR_ASSERT(found);
-}
+void grpc_network_status_unregister_endpoint(grpc_endpoint *ep) { (void)ep; }
 
-// Walk the linked-list from head and execute shutdown. It is possible that
-// other threads might be in the process of shutdown as well, but that has
-// no side effect since endpoint shutdown is idempotent.
-void grpc_network_status_shutdown_all_endpoints() {
-  gpr_mu_lock(&g_endpoint_mutex);
-  if (head == NULL) {
-    gpr_mu_unlock(&g_endpoint_mutex);
-    return;
-  }
-  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
-
-  for (endpoint_ll_node *curr = head; curr != NULL; curr = curr->next) {
-    curr->ep->vtable->shutdown(&exec_ctx, curr->ep,
-                               GRPC_ERROR_CREATE("Network unavailable"));
-  }
-  gpr_mu_unlock(&g_endpoint_mutex);
-  grpc_exec_ctx_finish(&exec_ctx);
-}
+void grpc_network_status_shutdown_all_endpoints() {}
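The stubs keep the externally visible signatures intact, so existing call sites compile and link unchanged; only the behavior becomes a no-op. A hypothetical call-site sketch, assuming the usual register/unregister pattern around an endpoint's lifetime (the hook function names here are illustrative, only the tracker functions are real):

#include "src/core/lib/iomgr/network_status_tracker.h"

/* Hypothetical endpoint lifecycle hooks: with the stubbed tracker these
   calls still compile and link, but now do nothing. */
static void on_endpoint_created(grpc_endpoint *ep) {
  grpc_network_status_register_endpoint(ep);
}

static void on_endpoint_destroyed(grpc_endpoint *ep) {
  grpc_network_status_unregister_endpoint(ep);
}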

src/core/lib/iomgr/timer_generic.c
@@ -121,12 +121,6 @@ void grpc_timer_list_shutdown(grpc_exec_ctx *exec_ctx) {
   g_initialized = false;
 }
 
-/* This is a cheap, but good enough, pointer hash for sharding the tasks: */
-static size_t shard_idx(const grpc_timer *info) {
-  size_t x = (size_t)info;
-  return ((x >> 4) ^ (x >> 9) ^ (x >> 14)) & (NUM_SHARDS - 1);
-}
-
 static double ts_to_dbl(gpr_timespec ts) {
   return (double)ts.tv_sec + 1e-9 * ts.tv_nsec;
 }
@@ -181,7 +175,7 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
                      gpr_timespec deadline, grpc_closure *closure,
                      gpr_timespec now) {
   int is_first_timer = 0;
-  shard_type *shard = &g_shards[shard_idx(timer)];
+  shard_type *shard = &g_shards[GPR_HASH_POINTER(timer, NUM_SHARDS)];
   GPR_ASSERT(deadline.clock_type == g_clock_type);
   GPR_ASSERT(now.clock_type == g_clock_type);
   timer->closure = closure;
@@ -247,7 +241,7 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) {
     return;
   }
 
-  shard_type *shard = &g_shards[shard_idx(timer)];
+  shard_type *shard = &g_shards[GPR_HASH_POINTER(timer, NUM_SHARDS)];
   gpr_mu_lock(&shard->mu);
   if (!timer->triggered) {
     grpc_closure_sched(exec_ctx, timer->closure, GRPC_ERROR_CANCELLED);
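One behavioral detail in the timer hunks: the removed shard_idx masked with & (NUM_SHARDS - 1), while GPR_HASH_POINTER reduces with % (range). The two agree exactly when the shard count is a power of two, which NUM_SHARDS must be for the old masking to have been correct. A quick self-contained check of that identity; the constant 32 is an assumed stand-in for the real NUM_SHARDS in timer_generic.c:

#include <assert.h>
#include <stddef.h>

#define NUM_SHARDS 32 /* assumed power-of-two stand-in for the real constant */

int main(void) {
  /* For any x, x % n == x & (n - 1) whenever n is a power of two. */
  for (size_t x = 0; x < 100000; x++) {
    assert((x % NUM_SHARDS) == (x & (NUM_SHARDS - 1)));
  }
  return 0;
}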

src/core/lib/support/cpu_posix.c
@@ -41,6 +41,7 @@
 #include <grpc/support/log.h>
 #include <grpc/support/sync.h>
+#include <grpc/support/useful.h>
 
 static __thread char magic_thread_local;
@@ -60,18 +61,12 @@ unsigned gpr_cpu_num_cores(void) {
   return (unsigned)ncpus;
 }
 
-/* This is a cheap, but good enough, pointer hash for sharding things: */
-static size_t shard_ptr(const void *info) {
-  size_t x = (size_t)info;
-  return ((x >> 4) ^ (x >> 9) ^ (x >> 14)) % gpr_cpu_num_cores();
-}
-
 unsigned gpr_cpu_current_cpu(void) {
   /* NOTE: there's no way I know to return the actual cpu index portably...
      most code that's using this is using it to shard across work queues though,
      so here we use thread identity instead to achieve a similar though not
      identical effect */
-  return (unsigned)shard_ptr(&magic_thread_local);
+  return (unsigned)GPR_HASH_POINTER(&magic_thread_local, gpr_cpu_num_cores());
 }
 
 #endif /* GPR_CPU_POSIX */
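The NOTE in that hunk is the whole trick: POSIX offers no portable "which CPU am I on" call, so the code hashes the address of a __thread variable, which is stable within a thread and distinct across threads. A small sketch of that property, assuming a pthreads build (the demo harness is illustrative):

#include <pthread.h>
#include <stdio.h>

static __thread char magic_thread_local;

/* Each thread sees a different address for its own copy of the __thread
   variable, so the address serves as a cheap per-thread identity. */
static void *print_identity(void *arg) {
  (void)arg;
  printf("thread-local address: %p\n", (void *)&magic_thread_local);
  return NULL;
}

int main(void) {
  pthread_t t1, t2;
  pthread_create(&t1, NULL, print_identity, NULL);
  pthread_create(&t2, NULL, print_identity, NULL);
  pthread_join(t1, NULL);
  pthread_join(t2, NULL);
  return 0;
}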

test/core/end2end/tests/network_status_change.c
@@ -212,8 +212,11 @@ static void test_invoke_network_status_change(grpc_end2end_test_config config) {
   CQ_EXPECT_COMPLETION(cqv, tag(1), 1);
   cq_verify(cqv);
 
+  // TODO(makdharma) Update this when the shutdown_all_endpoints is implemented.
   // Expected behavior of a RPC when network is lost.
-  GPR_ASSERT(status == GRPC_STATUS_UNAVAILABLE);
+  // GPR_ASSERT(status == GRPC_STATUS_UNAVAILABLE);
+  GPR_ASSERT(status == GRPC_STATUS_OK);
 
   GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/foo"));
   validate_host_override_string("foo.test.google.fr:1234", call_details.host,
                                 config);
