Unifying error handling within core

pull/6897/head
Craig Tiller 9 years ago
parent d5c6eca64b
commit 27f59afecb
  1. 6
      BUILD
  2. 2
      Makefile
  3. 1
      binding.gyp
  4. 2
      build.yaml
  5. 1
      config.m4
  6. 3
      gRPC.podspec
  7. 2
      grpc.gemspec
  8. 2
      package.xml
  9. 26
      src/core/lib/iomgr/closure.c
  10. 21
      src/core/lib/iomgr/closure.h
  11. 346
      src/core/lib/iomgr/error.c
  12. 69
      src/core/lib/iomgr/error.h
  13. 26
      src/core/lib/iomgr/ev_poll_and_epoll_posix.c
  14. 15
      src/core/lib/iomgr/exec_ctx.c
  15. 6
      src/core/lib/iomgr/exec_ctx.h
  16. 4
      src/core/lib/iomgr/executor.c
  17. 2
      src/core/lib/iomgr/executor.h
  18. 2
      src/core/lib/iomgr/resolve_address_posix.c
  19. 118
      src/core/lib/iomgr/socket_utils_common_posix.c
  20. 22
      src/core/lib/iomgr/socket_utils_posix.h
  21. 53
      src/core/lib/iomgr/tcp_client_posix.c
  22. 32
      src/core/lib/iomgr/tcp_posix.c
  23. 1
      src/python/grpcio/grpc_core_dependencies.py
  24. 2
      tools/doxygen/Doxyfile.core.internal
  25. 3
      tools/run_tests/sources_and_headers.json
  26. 3
      vsprojects/vcxproj/grpc/grpc.vcxproj
  27. 6
      vsprojects/vcxproj/grpc/grpc.vcxproj.filters
  28. 3
      vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj
  29. 6
      vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters

@ -176,6 +176,7 @@ cc_library(
"src/core/lib/iomgr/closure.h",
"src/core/lib/iomgr/endpoint.h",
"src/core/lib/iomgr/endpoint_pair.h",
"src/core/lib/iomgr/error.h",
"src/core/lib/iomgr/ev_poll_and_epoll_posix.h",
"src/core/lib/iomgr/ev_posix.h",
"src/core/lib/iomgr/exec_ctx.h",
@ -309,6 +310,7 @@ cc_library(
"src/core/lib/iomgr/endpoint.c",
"src/core/lib/iomgr/endpoint_pair_posix.c",
"src/core/lib/iomgr/endpoint_pair_windows.c",
"src/core/lib/iomgr/error.c",
"src/core/lib/iomgr/ev_poll_and_epoll_posix.c",
"src/core/lib/iomgr/ev_posix.c",
"src/core/lib/iomgr/exec_ctx.c",
@ -521,6 +523,7 @@ cc_library(
"src/core/lib/iomgr/closure.h",
"src/core/lib/iomgr/endpoint.h",
"src/core/lib/iomgr/endpoint_pair.h",
"src/core/lib/iomgr/error.h",
"src/core/lib/iomgr/ev_poll_and_epoll_posix.h",
"src/core/lib/iomgr/ev_posix.h",
"src/core/lib/iomgr/exec_ctx.h",
@ -641,6 +644,7 @@ cc_library(
"src/core/lib/iomgr/endpoint.c",
"src/core/lib/iomgr/endpoint_pair_posix.c",
"src/core/lib/iomgr/endpoint_pair_windows.c",
"src/core/lib/iomgr/error.c",
"src/core/lib/iomgr/ev_poll_and_epoll_posix.c",
"src/core/lib/iomgr/ev_posix.c",
"src/core/lib/iomgr/exec_ctx.c",
@ -1321,6 +1325,7 @@ objc_library(
"src/core/lib/iomgr/endpoint.c",
"src/core/lib/iomgr/endpoint_pair_posix.c",
"src/core/lib/iomgr/endpoint_pair_windows.c",
"src/core/lib/iomgr/error.c",
"src/core/lib/iomgr/ev_poll_and_epoll_posix.c",
"src/core/lib/iomgr/ev_posix.c",
"src/core/lib/iomgr/exec_ctx.c",
@ -1512,6 +1517,7 @@ objc_library(
"src/core/lib/iomgr/closure.h",
"src/core/lib/iomgr/endpoint.h",
"src/core/lib/iomgr/endpoint_pair.h",
"src/core/lib/iomgr/error.h",
"src/core/lib/iomgr/ev_poll_and_epoll_posix.h",
"src/core/lib/iomgr/ev_posix.h",
"src/core/lib/iomgr/exec_ctx.h",

@ -2471,6 +2471,7 @@ LIBGRPC_SRC = \
src/core/lib/iomgr/endpoint.c \
src/core/lib/iomgr/endpoint_pair_posix.c \
src/core/lib/iomgr/endpoint_pair_windows.c \
src/core/lib/iomgr/error.c \
src/core/lib/iomgr/ev_poll_and_epoll_posix.c \
src/core/lib/iomgr/ev_posix.c \
src/core/lib/iomgr/exec_ctx.c \
@ -2812,6 +2813,7 @@ LIBGRPC_UNSECURE_SRC = \
src/core/lib/iomgr/endpoint.c \
src/core/lib/iomgr/endpoint_pair_posix.c \
src/core/lib/iomgr/endpoint_pair_windows.c \
src/core/lib/iomgr/error.c \
src/core/lib/iomgr/ev_poll_and_epoll_posix.c \
src/core/lib/iomgr/ev_posix.c \
src/core/lib/iomgr/exec_ctx.c \

@ -579,6 +579,7 @@
'src/core/lib/iomgr/endpoint.c',
'src/core/lib/iomgr/endpoint_pair_posix.c',
'src/core/lib/iomgr/endpoint_pair_windows.c',
'src/core/lib/iomgr/error.c',
'src/core/lib/iomgr/ev_poll_and_epoll_posix.c',
'src/core/lib/iomgr/ev_posix.c',
'src/core/lib/iomgr/exec_ctx.c',

@ -163,6 +163,7 @@ filegroups:
- src/core/lib/iomgr/closure.h
- src/core/lib/iomgr/endpoint.h
- src/core/lib/iomgr/endpoint_pair.h
- src/core/lib/iomgr/error.h
- src/core/lib/iomgr/ev_poll_and_epoll_posix.h
- src/core/lib/iomgr/ev_posix.h
- src/core/lib/iomgr/exec_ctx.h
@ -237,6 +238,7 @@ filegroups:
- src/core/lib/iomgr/endpoint.c
- src/core/lib/iomgr/endpoint_pair_posix.c
- src/core/lib/iomgr/endpoint_pair_windows.c
- src/core/lib/iomgr/error.c
- src/core/lib/iomgr/ev_poll_and_epoll_posix.c
- src/core/lib/iomgr/ev_posix.c
- src/core/lib/iomgr/exec_ctx.c

@ -98,6 +98,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/lib/iomgr/endpoint.c \
src/core/lib/iomgr/endpoint_pair_posix.c \
src/core/lib/iomgr/endpoint_pair_windows.c \
src/core/lib/iomgr/error.c \
src/core/lib/iomgr/ev_poll_and_epoll_posix.c \
src/core/lib/iomgr/ev_posix.c \
src/core/lib/iomgr/exec_ctx.c \

@ -178,6 +178,7 @@ Pod::Spec.new do |s|
'src/core/lib/iomgr/closure.h',
'src/core/lib/iomgr/endpoint.h',
'src/core/lib/iomgr/endpoint_pair.h',
'src/core/lib/iomgr/error.h',
'src/core/lib/iomgr/ev_poll_and_epoll_posix.h',
'src/core/lib/iomgr/ev_posix.h',
'src/core/lib/iomgr/exec_ctx.h',
@ -343,6 +344,7 @@ Pod::Spec.new do |s|
'src/core/lib/iomgr/endpoint.c',
'src/core/lib/iomgr/endpoint_pair_posix.c',
'src/core/lib/iomgr/endpoint_pair_windows.c',
'src/core/lib/iomgr/error.c',
'src/core/lib/iomgr/ev_poll_and_epoll_posix.c',
'src/core/lib/iomgr/ev_posix.c',
'src/core/lib/iomgr/exec_ctx.c',
@ -520,6 +522,7 @@ Pod::Spec.new do |s|
'src/core/lib/iomgr/closure.h',
'src/core/lib/iomgr/endpoint.h',
'src/core/lib/iomgr/endpoint_pair.h',
'src/core/lib/iomgr/error.h',
'src/core/lib/iomgr/ev_poll_and_epoll_posix.h',
'src/core/lib/iomgr/ev_posix.h',
'src/core/lib/iomgr/exec_ctx.h',

@ -190,6 +190,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/iomgr/closure.h )
s.files += %w( src/core/lib/iomgr/endpoint.h )
s.files += %w( src/core/lib/iomgr/endpoint_pair.h )
s.files += %w( src/core/lib/iomgr/error.h )
s.files += %w( src/core/lib/iomgr/ev_poll_and_epoll_posix.h )
s.files += %w( src/core/lib/iomgr/ev_posix.h )
s.files += %w( src/core/lib/iomgr/exec_ctx.h )
@ -327,6 +328,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/iomgr/endpoint.c )
s.files += %w( src/core/lib/iomgr/endpoint_pair_posix.c )
s.files += %w( src/core/lib/iomgr/endpoint_pair_windows.c )
s.files += %w( src/core/lib/iomgr/error.c )
s.files += %w( src/core/lib/iomgr/ev_poll_and_epoll_posix.c )
s.files += %w( src/core/lib/iomgr/ev_posix.c )
s.files += %w( src/core/lib/iomgr/exec_ctx.c )

@ -193,6 +193,7 @@
<file baseinstalldir="/" name="src/core/lib/iomgr/closure.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/endpoint.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/endpoint_pair.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/error.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/ev_poll_and_epoll_posix.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/ev_posix.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/exec_ctx.h" role="src" />
@ -330,6 +331,7 @@
<file baseinstalldir="/" name="src/core/lib/iomgr/endpoint.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/endpoint_pair_posix.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/endpoint_pair_windows.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/error.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/ev_poll_and_epoll_posix.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/ev_posix.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/exec_ctx.c" role="src" />

@ -39,25 +39,29 @@ void grpc_closure_init(grpc_closure *closure, grpc_iomgr_cb_func cb,
void *cb_arg) {
closure->cb = cb;
closure->cb_arg = cb_arg;
closure->final_data = 0;
}
void grpc_closure_list_add(grpc_closure_list *closure_list,
grpc_closure *closure, bool success) {
void grpc_closure_list_append(grpc_closure_list *closure_list,
grpc_closure *closure, grpc_error *error) {
if (closure == NULL) return;
closure->final_data = (success != 0);
closure->final_data.error = error;
closure->next = NULL;
if (closure_list->head == NULL) {
closure_list->head = closure;
} else {
closure_list->tail->final_data |= (uintptr_t)closure;
closure_list->tail->next = closure;
}
closure_list->tail = closure;
}
void grpc_closure_list_fail_all(grpc_closure_list *list) {
for (grpc_closure *c = list->head; c != NULL; c = grpc_closure_next(c)) {
c->final_data &= ~(uintptr_t)1;
void grpc_closure_list_fail_all(grpc_closure_list *list,
grpc_error *forced_failure) {
for (grpc_closure *c = list->head; c != NULL; c = c->next) {
if (c->final_data.error == NULL) {
c->final_data.error = grpc_error_ref(forced_failure);
}
}
grpc_error_unref(forced_failure);
}
bool grpc_closure_list_empty(grpc_closure_list closure_list) {
@ -71,7 +75,7 @@ void grpc_closure_list_move(grpc_closure_list *src, grpc_closure_list *dst) {
if (dst->head == NULL) {
*dst = *src;
} else {
dst->tail->final_data |= (uintptr_t)src->head;
dst->tail->next = src->head;
dst->tail = src->tail;
}
src->head = src->tail = NULL;
@ -98,7 +102,3 @@ grpc_closure *grpc_closure_create(grpc_iomgr_cb_func cb, void *cb_arg) {
grpc_closure_init(&wc->wrapper, closure_wrapper, wc);
return &wc->wrapper;
}
grpc_closure *grpc_closure_next(grpc_closure *closure) {
return (grpc_closure *)(closure->final_data & ~(uintptr_t)1);
}

@ -36,6 +36,7 @@
#include <grpc/support/port_platform.h>
#include <stdbool.h>
#include "src/core/lib/iomgr/error.h"
struct grpc_closure;
typedef struct grpc_closure grpc_closure;
@ -65,10 +66,12 @@ struct grpc_closure {
/** Arguments to be passed to "cb". */
void *cb_arg;
/** Once enqueued, contains in the lower bit the success of the closure,
and in the upper bits the pointer to the next closure in the list.
Before enqueing for execution, this is usable for scratch data. */
uintptr_t final_data;
union {
grpc_error *error;
uintptr_t scratch;
} final_data;
grpc_closure *next;
};
/** Initializes \a closure with \a cb and \a cb_arg. */
@ -83,11 +86,12 @@ grpc_closure *grpc_closure_create(grpc_iomgr_cb_func cb, void *cb_arg);
/** add \a closure to the end of \a list and set \a closure's success to \a
* success */
void grpc_closure_list_add(grpc_closure_list *list, grpc_closure *closure,
bool success);
void grpc_closure_list_append(grpc_closure_list *list, grpc_closure *closure,
grpc_error *error);
/** force all success bits in \a list to false */
void grpc_closure_list_fail_all(grpc_closure_list *list);
void grpc_closure_list_fail_all(grpc_closure_list *list,
grpc_error *forced_failure);
/** append all closures from \a src to \a dst and empty \a src. */
void grpc_closure_list_move(grpc_closure_list *src, grpc_closure_list *dst);
@ -95,7 +99,4 @@ void grpc_closure_list_move(grpc_closure_list *src, grpc_closure_list *dst);
/** return whether \a list is empty. */
bool grpc_closure_list_empty(grpc_closure_list list);
/** return the next pointer for a queued closure list */
grpc_closure *grpc_closure_next(grpc_closure *closure);
#endif /* GRPC_CORE_LIB_IOMGR_CLOSURE_H */

@ -0,0 +1,346 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include "src/core/lib/iomgr/error.h"
#include <stdbool.h>
#include <string.h>
#include <grpc/support/alloc.h>
#include <grpc/support/avl.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/useful.h>
static void destroy_integer(void *key) {}
static void *copy_integer(void *key) { return key; }
static long compare_integers(void *key1, void *key2) {
return GPR_ICMP((uintptr_t)key1, (uintptr_t)key2);
}
static void destroy_string(void *str) { gpr_free(str); }
static void *copy_string(void *str) { return gpr_strdup(str); }
static void destroy_err(void *err) { grpc_error_unref(err); }
static void *copy_err(void *err) { return grpc_error_ref(err); }
static const gpr_avl_vtable avl_vtable_ints = {destroy_integer, copy_integer,
compare_integers,
destroy_integer, copy_integer};
static const gpr_avl_vtable avl_vtable_strs = {destroy_integer, copy_integer,
compare_integers, destroy_string,
copy_string};
static const gpr_avl_vtable avl_vtable_errs = {
destroy_integer, copy_integer, compare_integers, destroy_err, copy_err};
static const char *error_int_name(grpc_error_ints key) {
switch (key) {
case GRPC_ERROR_INT_STATUS_CODE:
return "status_code";
case GRPC_ERROR_INT_ERRNO:
return "errno";
}
GPR_UNREACHABLE_CODE(return "unknown");
}
static const char *error_str_name(grpc_error_strs key) {
switch (key) {
case GRPC_ERROR_STR_DESCRIPTION:
return "description";
case GRPC_ERROR_STR_OS_ERROR:
return "os_error";
case GRPC_ERROR_STR_TARGET_ADDRESS:
return "target_address";
case GRPC_ERROR_STR_SYSCALL:
return "syscall";
}
GPR_UNREACHABLE_CODE(return "unknown");
}
struct grpc_error {
gpr_refcount refs;
gpr_avl ints;
gpr_avl strs;
gpr_avl errs;
uintptr_t next_err;
};
static bool is_special(grpc_error *err) {
return err == GRPC_ERROR_NONE || err == GRPC_ERROR_OOM;
}
grpc_error *grpc_error_ref(grpc_error *err) {
if (is_special(err)) return err;
gpr_ref(&err->refs);
return err;
}
static void error_destroy(grpc_error *err) {
GPR_ASSERT(!is_special(err));
gpr_avl_unref(err->ints);
gpr_avl_unref(err->strs);
gpr_avl_unref(err->errs);
}
void grpc_error_unref(grpc_error *err) {
if (!is_special(err) && gpr_unref(&err->refs)) {
error_destroy(err);
}
}
grpc_error *grpc_error_create(void) {
grpc_error *err = gpr_malloc(sizeof(*err));
if (err == NULL) { // TODO(ctiller): make gpr_malloc return NULL
return GRPC_ERROR_OOM;
}
err->ints = gpr_avl_create(&avl_vtable_ints);
err->strs = gpr_avl_create(&avl_vtable_strs);
err->errs = gpr_avl_create(&avl_vtable_errs);
err->next_err = 0;
gpr_ref_init(&err->refs, 1);
return err;
}
static grpc_error *copy_error_and_unref(grpc_error *in) {
if (is_special(in)) {
return grpc_error_create();
}
grpc_error *out = gpr_malloc(sizeof(*out));
out->ints = gpr_avl_ref(in->ints);
out->strs = gpr_avl_ref(in->strs);
out->errs = gpr_avl_ref(in->errs);
out->next_err = in->next_err;
gpr_ref_init(&out->refs, 1);
grpc_error_unref(in);
return out;
}
grpc_error *grpc_error_set_int(grpc_error *src, grpc_error_ints which,
intptr_t value) {
grpc_error *new = copy_error_and_unref(src);
new->ints = gpr_avl_add(new->ints, (void *)(uintptr_t)which, (void *)value);
return new;
}
grpc_error *grpc_error_set_str(grpc_error *src, grpc_error_strs which,
const char *value) {
grpc_error *new = copy_error_and_unref(src);
new->strs = gpr_avl_add(new->strs, (void *)(uintptr_t)which, (void *)value);
return new;
}
grpc_error *grpc_error_add_child(grpc_error *src, grpc_error *child) {
grpc_error *new = copy_error_and_unref(src);
new->errs = gpr_avl_add(new->errs, (void *)(new->next_err++), child);
return new;
}
static const char *no_error_string = "null";
static const char *oom_error_string = "\"Out of memory\"";
typedef struct {
char *key;
char *value;
} kv_pair;
typedef struct {
kv_pair *kvs;
size_t num_kvs;
size_t cap_kvs;
} kv_pairs;
static void append_kv(kv_pairs *kvs, char *key, char *value) {
if (kvs->num_kvs == kvs->cap_kvs) {
kvs->cap_kvs = GPR_MAX(3 * kvs->cap_kvs / 2, 4);
kvs->kvs = gpr_realloc(kvs->kvs, sizeof(*kvs->kvs) * kvs->cap_kvs);
}
kvs->kvs[kvs->num_kvs].key = key;
kvs->kvs[kvs->num_kvs].value = value;
kvs->num_kvs++;
}
static void collect_kvs(gpr_avl_node *node, char *key(void *k),
char *fmt(void *v), kv_pairs *kvs) {
if (node == NULL) return;
append_kv(kvs, key(node->key), fmt(node->value));
collect_kvs(node->left, key, fmt, kvs);
collect_kvs(node->right, key, fmt, kvs);
}
static char *key_int(void *p) {
return gpr_strdup(error_int_name((grpc_error_ints)(uintptr_t)p));
}
static char *key_str(void *p) {
return gpr_strdup(error_str_name((grpc_error_strs)(uintptr_t)p));
}
static char *fmt_int(void *p) {
char *s;
gpr_asprintf(&s, "%lld", (intptr_t)p);
return s;
}
static void append_chr(char c, char **s, size_t *sz, size_t *cap) {
if (*sz == *cap) {
*cap = GPR_MAX(8, 3 * *cap / 2);
*s = gpr_realloc(*s, *cap);
}
(*s)[(*sz)++] = c;
}
static void append_str(const char *str, char **s, size_t *sz, size_t *cap) {
for (const char *c = str; *c; c++) {
append_chr(*c, s, sz, cap);
}
}
static void append_esc_str(const char *str, char **s, size_t *sz, size_t *cap) {
static const char *hex = "0123456789abcdef";
append_chr('"', s, sz, cap);
for (const uint8_t *c = (const uint8_t *)str; *c; c++) {
if (*c < 32 || *c >= 127) {
append_chr('\\', s, sz, cap);
switch (*c) {
case '\b':
append_chr('b', s, sz, cap);
break;
case '\f':
append_chr('f', s, sz, cap);
break;
case '\n':
append_chr('n', s, sz, cap);
break;
case '\r':
append_chr('r', s, sz, cap);
break;
case '\t':
append_chr('t', s, sz, cap);
break;
default:
append_chr('u', s, sz, cap);
append_chr('0', s, sz, cap);
append_chr('0', s, sz, cap);
append_chr(hex[*c >> 4], s, sz, cap);
append_chr(hex[*c & 0x0f], s, sz, cap);
break;
}
} else {
append_chr((char)*c, s, sz, cap);
}
}
append_chr('"', s, sz, cap);
append_chr(0, s, sz, cap);
}
static char *fmt_str(void *p) {
char *s = NULL;
size_t sz = 0;
size_t cap = 0;
append_esc_str(p, &s, &sz, &cap);
return s;
}
static void add_errs(gpr_avl_node *n, char **s, size_t *sz, size_t *cap) {
if (n == NULL) return;
add_errs(n->left, s, sz, cap);
const char *e = grpc_error_string(n->value);
append_str(e, s, sz, cap);
grpc_error_free_string(e);
add_errs(n->right, s, sz, cap);
}
static char *errs_string(grpc_error *err) {
char *s = NULL;
size_t sz = 0;
size_t cap = 0;
append_chr('[', &s, &sz, &cap);
add_errs(err->errs.root, &s, &sz, &cap);
append_chr(']', &s, &sz, &cap);
return s;
}
static int cmp_kvs(const void *a, const void *b) {
const kv_pair *ka = a;
const kv_pair *kb = b;
return strcmp(ka->key, kb->key);
}
static const char *finish_kvs(kv_pairs *kvs) {
char *s = NULL;
size_t sz = 0;
size_t cap = 0;
append_chr('{', &s, &sz, &cap);
for (size_t i = 0; i < kvs->num_kvs; i++) {
append_esc_str(kvs->kvs[i].key, &s, &sz, &cap);
gpr_free(kvs->kvs[i].key);
append_chr(':', &s, &sz, &cap);
append_str(kvs->kvs[i].value, &s, &sz, &cap);
gpr_free(kvs->kvs[i].value);
}
append_chr('}', &s, &sz, &cap);
gpr_free(kvs->kvs);
return s;
}
const char *grpc_error_string(grpc_error *err) {
if (err == GRPC_ERROR_NONE) return no_error_string;
if (err == GRPC_ERROR_OOM) return oom_error_string;
kv_pairs kvs;
memset(&kvs, 0, sizeof(kvs));
collect_kvs(err->ints.root, key_int, fmt_int, &kvs);
collect_kvs(err->strs.root, key_str, fmt_str, &kvs);
append_kv(&kvs, gpr_strdup("referenced_errors"), errs_string(err));
qsort(kvs.kvs, kvs.num_kvs, sizeof(kv_pair), cmp_kvs);
return finish_kvs(&kvs);
}
grpc_error *grpc_os_error(int err, const char *call_name) {
return grpc_error_set_str(
grpc_error_set_str(
grpc_error_set_int(grpc_error_create(), GRPC_ERROR_INT_ERRNO, err),
GRPC_ERROR_STR_OS_ERROR, strerror(err)),
GRPC_ERROR_STR_SYSCALL, call_name);
}

@ -0,0 +1,69 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef GRPC_CORE_LIB_IOMGR_ERROR_H
#define GRPC_CORE_LIB_IOMGR_ERROR_H
#include <stdint.h>
typedef struct grpc_error grpc_error;
typedef enum {
GRPC_ERROR_INT_STATUS_CODE,
GRPC_ERROR_INT_ERRNO
} grpc_error_ints;
typedef enum {
GRPC_ERROR_STR_DESCRIPTION,
GRPC_ERROR_STR_TARGET_ADDRESS,
GRPC_ERROR_STR_OS_ERROR,
GRPC_ERROR_STR_SYSCALL
} grpc_error_strs;
#define GRPC_ERROR_NONE ((grpc_error *)NULL)
#define GRPC_ERROR_OOM ((grpc_error *)1)
const char *grpc_error_string(grpc_error *error);
void grpc_error_free_string(const char *str);
grpc_error *grpc_error_create(void);
grpc_error *grpc_error_ref(grpc_error *err);
void grpc_error_unref(grpc_error *err);
grpc_error *grpc_error_set_int(grpc_error *src, grpc_error_ints which,
intptr_t value);
grpc_error *grpc_error_set_str(grpc_error *src, grpc_error_strs which,
const char *value);
grpc_error *grpc_error_add_child(grpc_error *src, grpc_error *child);
grpc_error *grpc_os_error(int err, const char *call_name);
#endif /* GRPC_CORE_LIB_IOMGR_ERROR_H */

@ -459,7 +459,7 @@ static void close_fd_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
} else {
remove_fd_from_all_epoll_sets(fd->fd);
}
grpc_exec_ctx_enqueue(exec_ctx, fd->on_done_closure, true, NULL);
grpc_exec_ctx_push(exec_ctx, fd->on_done_closure, GRPC_ERROR_NONE, NULL);
}
static int fd_wrapped_fd(grpc_fd *fd) {
@ -508,6 +508,15 @@ static void fd_ref(grpc_fd *fd) { ref_by(fd, 2); }
static void fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
#endif
static grpc_error *fd_shutdown_error(bool shutdown) {
if (!shutdown) {
return GRPC_ERROR_NONE;
} else {
return grpc_error_set_str(grpc_error_create(), GRPC_ERROR_STR_DESCRIPTION,
"FD shutdown");
}
}
static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure **st, grpc_closure *closure) {
if (*st == CLOSURE_NOT_READY) {
@ -516,7 +525,8 @@ static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
} else if (*st == CLOSURE_READY) {
/* already ready ==> queue the closure to run immediately */
*st = CLOSURE_NOT_READY;
grpc_exec_ctx_enqueue(exec_ctx, closure, !fd->shutdown, NULL);
grpc_exec_ctx_push(exec_ctx, closure, fd_shutdown_error(fd->shutdown),
NULL);
maybe_wake_one_watcher_locked(fd);
} else {
/* upcallptr was set to a different closure. This is an error! */
@ -539,7 +549,7 @@ static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
return 0;
} else {
/* waiting ==> queue closure */
grpc_exec_ctx_enqueue(exec_ctx, *st, !fd->shutdown, NULL);
grpc_exec_ctx_push(exec_ctx, *st, fd_shutdown_error(fd->shutdown), NULL);
*st = CLOSURE_NOT_READY;
return 1;
}
@ -864,7 +874,7 @@ static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
GPR_ASSERT(grpc_closure_list_empty(pollset->idle_jobs));
pollset->vtable->finish_shutdown(pollset);
grpc_exec_ctx_enqueue(exec_ctx, pollset->shutdown_done, true, NULL);
grpc_exec_ctx_push(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE, NULL);
}
static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
@ -1137,7 +1147,8 @@ static void basic_pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
up_args->promotion_closure.cb = basic_do_promote;
up_args->promotion_closure.cb_arg = up_args;
grpc_closure_list_add(&pollset->idle_jobs, &up_args->promotion_closure, 1);
grpc_closure_list_append(&pollset->idle_jobs, &up_args->promotion_closure,
GRPC_ERROR_NONE);
pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
exit:
@ -1573,7 +1584,8 @@ static void perform_delayed_add(grpc_exec_ctx *exec_ctx, void *arg,
/* We don't care about this pollset anymore. */
if (da->pollset->in_flight_cbs == 0 && !da->pollset->called_shutdown) {
da->pollset->called_shutdown = 1;
grpc_exec_ctx_enqueue(exec_ctx, da->pollset->shutdown_done, true, NULL);
grpc_exec_ctx_push(exec_ctx, da->pollset->shutdown_done, GRPC_ERROR_NONE,
NULL);
}
}
gpr_mu_unlock(&da->pollset->mu);
@ -1597,7 +1609,7 @@ static void multipoll_with_epoll_pollset_add_fd(grpc_exec_ctx *exec_ctx,
GRPC_FD_REF(fd, "delayed_add");
grpc_closure_init(&da->closure, perform_delayed_add, da);
pollset->in_flight_cbs++;
grpc_exec_ctx_enqueue(exec_ctx, &da->closure, true, NULL);
grpc_exec_ctx_push(exec_ctx, &da->closure, GRPC_ERROR_NONE, NULL);
}
}

@ -47,11 +47,12 @@ bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) {
grpc_closure *c = exec_ctx->closure_list.head;
exec_ctx->closure_list.head = exec_ctx->closure_list.tail = NULL;
while (c != NULL) {
bool success = (bool)(c->final_data & 1);
grpc_closure *next = (grpc_closure *)(c->final_data & ~(uintptr_t)1);
grpc_closure *next = c->next;
grpc_error *error = c->final_data.error;
did_something = true;
GPR_TIMER_BEGIN("grpc_exec_ctx_flush.cb", 0);
c->cb(exec_ctx, c->cb_arg, success);
c->cb(exec_ctx, c->cb_arg, error);
grpc_error_unref(error);
GPR_TIMER_END("grpc_exec_ctx_flush.cb", 0);
c = next;
}
@ -64,11 +65,11 @@ void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx) {
grpc_exec_ctx_flush(exec_ctx);
}
void grpc_exec_ctx_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
bool success,
grpc_workqueue *offload_target_or_null) {
void grpc_exec_ctx_push(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
grpc_error *error,
grpc_workqueue *offload_target_or_null) {
GPR_ASSERT(offload_target_or_null == NULL);
grpc_closure_list_add(&exec_ctx->closure_list, closure, success);
grpc_closure_list_append(&exec_ctx->closure_list, closure, error);
}
void grpc_exec_ctx_enqueue_list(grpc_exec_ctx *exec_ctx,

@ -83,9 +83,9 @@ bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx);
* the instance is destroyed, or work may be lost. */
void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx);
/** Add a closure to be executed at the next flush/finish point */
void grpc_exec_ctx_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
bool success,
grpc_workqueue *offload_target_or_null);
void grpc_exec_ctx_push(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
grpc_error *error,
grpc_workqueue *offload_target_or_null);
/** Add a list of closures to be executed at the next flush/finish point.
* Leaves \a list empty. */
void grpc_exec_ctx_enqueue_list(grpc_exec_ctx *exec_ctx,

@ -112,10 +112,10 @@ static void maybe_spawn_locked() {
g_executor.pending_join = 1;
}
void grpc_executor_enqueue(grpc_closure *closure, bool success) {
void grpc_executor_push(grpc_closure *closure, grpc_error *error) {
gpr_mu_lock(&g_executor.mu);
if (g_executor.shutting_down == 0) {
grpc_closure_list_add(&g_executor.closures, closure, success);
grpc_closure_list_append(&g_executor.closures, closure, error);
maybe_spawn_locked();
}
gpr_mu_unlock(&g_executor.mu);

@ -45,7 +45,7 @@ void grpc_executor_init();
/** Enqueue \a closure for its eventual execution of \a f(arg) on a separate
* thread */
void grpc_executor_enqueue(grpc_closure *closure, bool success);
void grpc_executor_push(grpc_closure *closure, grpc_error *error);
/** Shutdown the executor, running all pending work as part of the call */
void grpc_executor_shutdown();

@ -173,7 +173,7 @@ static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name,
r->default_port = gpr_strdup(default_port);
r->cb = cb;
r->arg = arg;
grpc_executor_enqueue(&r->request_closure, 1);
grpc_executor_push(&r->request_closure, GRPC_ERROR_NONE);
}
void (*grpc_resolve_address)(grpc_exec_ctx *exec_ctx, const char *name,

@ -57,10 +57,10 @@
#include "src/core/lib/support/string.h"
/* set a socket to non blocking mode */
int grpc_set_socket_nonblocking(int fd, int non_blocking) {
grpc_error *grpc_set_socket_nonblocking(int fd, int non_blocking) {
int oldflags = fcntl(fd, F_GETFL, 0);
if (oldflags < 0) {
return 0;
return grpc_os_error(errno, "fcntl");
}
if (non_blocking) {
@ -70,52 +70,58 @@ int grpc_set_socket_nonblocking(int fd, int non_blocking) {
}
if (fcntl(fd, F_SETFL, oldflags) != 0) {
return 0;
return grpc_os_error(errno, "fcntl");
}
return 1;
return GRPC_ERROR_NONE;
}
int grpc_set_socket_no_sigpipe_if_possible(int fd) {
grpc_error *grpc_set_socket_no_sigpipe_if_possible(int fd) {
#ifdef GPR_HAVE_SO_NOSIGPIPE
int val = 1;
int newval;
socklen_t intlen = sizeof(newval);
return 0 == setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &val, sizeof(val)) &&
0 == getsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &newval, &intlen) &&
(newval != 0) == val;
#else
return 1;
if (0 != setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &val, sizeof(val))) {
return grpc_os_error(errno, "setsockopt(SO_NOSIGPIPE)");
}
if (0 == getsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &newval, &intlen)) {
return grpc_os_error(errno, "getsockopt(SO_NOSIGPIPE)");
}
if ((newval != 0) == val) {
return grpc_error_set_str(grpc_error_create(), GRPC_ERROR_STR_grpc_os_error,
"Failed to set SO_NOSIGPIPE");
}
#endif
return GRPC_ERROR_NONE;
}
int grpc_set_socket_ip_pktinfo_if_possible(int fd) {
grpc_error *grpc_set_socket_ip_pktinfo_if_possible(int fd) {
#ifdef GPR_HAVE_IP_PKTINFO
int get_local_ip = 1;
return 0 == setsockopt(fd, IPPROTO_IP, IP_PKTINFO, &get_local_ip,
sizeof(get_local_ip));
#else
(void)fd;
return 1;
if (0 != setsockopt(fd, IPPROTO_IP, IP_PKTINFO, &get_local_ip,
sizeof(get_local_ip))) {
return grpc_os_error(errno, "setsockopt(IP_PKTINFO)");
}
#endif
return GRPC_ERROR_NONE;
}
int grpc_set_socket_ipv6_recvpktinfo_if_possible(int fd) {
grpc_error *grpc_set_socket_ipv6_recvpktinfo_if_possible(int fd) {
#ifdef GPR_HAVE_IPV6_RECVPKTINFO
int get_local_ip = 1;
return 0 == setsockopt(fd, IPPROTO_IPV6, IPV6_RECVPKTINFO, &get_local_ip,
sizeof(get_local_ip));
#else
(void)fd;
return 1;
if (0 != setsockopt(fd, IPPROTO_IPV6, IPV6_RECVPKTINFO, &get_local_ip,
sizeof(get_local_ip))) {
return grpc_os_error(errno, "setsockopt(IPV6_RECVPKTINFO)");
}
#endif
return GRPC_ERROR_NONE;
}
/* set a socket to close on exec */
int grpc_set_socket_cloexec(int fd, int close_on_exec) {
grpc_error *grpc_set_socket_cloexec(int fd, int close_on_exec) {
int oldflags = fcntl(fd, F_GETFD, 0);
if (oldflags < 0) {
return 0;
return grpc_os_error(errno, "fcntl");
}
if (close_on_exec) {
@ -125,30 +131,47 @@ int grpc_set_socket_cloexec(int fd, int close_on_exec) {
}
if (fcntl(fd, F_SETFD, oldflags) != 0) {
return 0;
return grpc_os_error(errno, "fcntl");
}
return 1;
return GRPC_ERROR_NONE;
}
/* set a socket to reuse old addresses */
int grpc_set_socket_reuse_addr(int fd, int reuse) {
grpc_error *grpc_set_socket_reuse_addr(int fd, int reuse) {
int val = (reuse != 0);
int newval;
socklen_t intlen = sizeof(newval);
return 0 == setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) &&
0 == getsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &newval, &intlen) &&
(newval != 0) == val;
if (0 != setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val))) {
return grpc_os_error(errno, "setsockopt(SO_REUSEADDR)");
}
if (0 != getsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &newval, &intlen)) {
return grpc_os_error(errno, "getsockopt(SO_REUSEADDR)");
}
if ((newval != 0) != val) {
return grpc_error_set_str(grpc_error_create(), GRPC_ERROR_STR_OS_ERROR,
"Failed to set SO_REUSEADDR");
}
return GRPC_ERROR_NONE;
}
/* disable nagle */
int grpc_set_socket_low_latency(int fd, int low_latency) {
grpc_error *grpc_set_socket_low_latency(int fd, int low_latency) {
int val = (low_latency != 0);
int newval;
socklen_t intlen = sizeof(newval);
return 0 == setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val)) &&
0 == getsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &newval, &intlen) &&
(newval != 0) == val;
if (0 != setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val))) {
return grpc_os_error(errno, "setsockopt(TCP_NODELAY)");
}
if (0 != getsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &newval, &intlen)) {
return grpc_os_error(errno, "getsockopt(TCP_NODELAY)");
}
if ((newval != 0) != val) {
return grpc_error_set_str(grpc_error_create(), GRPC_ERROR_STR_OS_ERROR,
"Failed to set TCP_NODELAY");
}
return GRPC_ERROR_NONE;
}
static gpr_once g_probe_ipv6_once = GPR_ONCE_INIT;
@ -196,35 +219,40 @@ static int set_socket_dualstack(int fd) {
}
}
int grpc_create_dualstack_socket(const struct sockaddr *addr, int type,
int protocol, grpc_dualstack_mode *dsmode) {
grpc_error *grpc_create_dualstack_socket(const struct sockaddr *addr, int type,
int protocol,
grpc_dualstack_mode *dsmode,
int *newfd) {
int family = addr->sa_family;
if (family == AF_INET6) {
int fd;
if (grpc_ipv6_loopback_available()) {
fd = socket(family, type, protocol);
*newfd = socket(family, type, protocol);
} else {
fd = -1;
*newfd = -1;
errno = EAFNOSUPPORT;
}
/* Check if we've got a valid dualstack socket. */
if (fd >= 0 && set_socket_dualstack(fd)) {
if (*newfd >= 0 && set_socket_dualstack(*newfd)) {
*dsmode = GRPC_DSMODE_DUALSTACK;
return fd;
return GRPC_ERROR_NONE;
}
/* If this isn't an IPv4 address, then return whatever we've got. */
if (!grpc_sockaddr_is_v4mapped(addr, NULL)) {
*dsmode = GRPC_DSMODE_IPV6;
return fd;
return GRPC_ERROR_NONE;
}
/* Fall back to AF_INET. */
if (fd >= 0) {
close(fd);
if (*newfd >= 0) {
close(*newfd);
}
family = AF_INET;
}
*dsmode = family == AF_INET ? GRPC_DSMODE_IPV4 : GRPC_DSMODE_NONE;
return socket(family, type, protocol);
*newfd = socket(family, type, protocol);
if (*newfd == -1) {
return grpc_os_error(errno, "socket");
}
return GRPC_ERROR_NONE;
}
#endif

@ -37,21 +37,23 @@
#include <sys/socket.h>
#include <unistd.h>
#include "src/core/lib/iomgr/error.h"
/* a wrapper for accept or accept4 */
int grpc_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen,
int nonblock, int cloexec);
/* set a socket to non blocking mode */
int grpc_set_socket_nonblocking(int fd, int non_blocking);
grpc_error *grpc_set_socket_nonblocking(int fd, int non_blocking);
/* set a socket to close on exec */
int grpc_set_socket_cloexec(int fd, int close_on_exec);
grpc_error *grpc_set_socket_cloexec(int fd, int close_on_exec);
/* set a socket to reuse old addresses */
int grpc_set_socket_reuse_addr(int fd, int reuse);
grpc_error *grpc_set_socket_reuse_addr(int fd, int reuse);
/* disable nagle */
int grpc_set_socket_low_latency(int fd, int low_latency);
grpc_error *grpc_set_socket_low_latency(int fd, int low_latency);
/* Returns true if this system can create AF_INET6 sockets bound to ::1.
The value is probed once, and cached for the life of the process.
@ -66,17 +68,17 @@ int grpc_ipv6_loopback_available(void);
/* Tries to set SO_NOSIGPIPE if available on this platform.
Returns 1 on success, 0 on failure.
If SO_NO_SIGPIPE is not available, returns 1. */
int grpc_set_socket_no_sigpipe_if_possible(int fd);
grpc_error *grpc_set_socket_no_sigpipe_if_possible(int fd);
/* Tries to set IP_PKTINFO if available on this platform.
Returns 1 on success, 0 on failure.
If IP_PKTINFO is not available, returns 1. */
int grpc_set_socket_ip_pktinfo_if_possible(int fd);
grpc_error *grpc_set_socket_ip_pktinfo_if_possible(int fd);
/* Tries to set IPV6_RECVPKTINFO if available on this platform.
Returns 1 on success, 0 on failure.
If IPV6_RECVPKTINFO is not available, returns 1. */
int grpc_set_socket_ipv6_recvpktinfo_if_possible(int fd);
grpc_error *grpc_set_socket_ipv6_recvpktinfo_if_possible(int fd);
/* An enum to keep track of IPv4/IPv6 socket modes.
@ -117,7 +119,9 @@ extern int grpc_forbid_dualstack_sockets_for_testing;
IPv4, so that bind() or connect() see the correct family.
Also, it's important to distinguish between DUALSTACK and IPV6 when
listening on the [::] wildcard address. */
int grpc_create_dualstack_socket(const struct sockaddr *addr, int type,
int protocol, grpc_dualstack_mode *dsmode);
grpc_error *grpc_create_dualstack_socket(const struct sockaddr *addr, int type,
int protocol,
grpc_dualstack_mode *dsmode,
int *newfd);
#endif /* GRPC_CORE_LIB_IOMGR_SOCKET_UTILS_POSIX_H */

@ -71,7 +71,9 @@ typedef struct {
grpc_closure *closure;
} async_connect;
static int prepare_socket(const struct sockaddr *addr, int fd) {
static grpc_error *prepare_socket(const struct sockaddr *addr, int fd) {
grpc_error *err = GRPC_ERROR_NONE;
if (fd < 0) {
goto error;
}
@ -83,13 +85,14 @@ static int prepare_socket(const struct sockaddr *addr, int fd) {
strerror(errno));
goto error;
}
return 1;
goto done;
error:
if (fd >= 0) {
close(fd);
}
return 0;
done:
return err;
}
static void tc_on_alarm(grpc_exec_ctx *exec_ctx, void *acp, bool success) {
@ -121,6 +124,7 @@ static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, bool success) {
grpc_endpoint **ep = ac->ep;
grpc_closure *closure = ac->closure;
grpc_fd *fd;
grpc_error *error = GRPC_ERROR_NONE;
if (grpc_tcp_trace) {
gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: on_writable: success=%d",
@ -143,8 +147,7 @@ static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, bool success) {
&so_error_size);
} while (err < 0 && errno == EINTR);
if (err < 0) {
gpr_log(GPR_ERROR, "failed to connect to '%s': getsockopt(ERROR): %s",
ac->addr_str, strerror(errno));
error = grpc_os_error(errno, "getsockopt");
goto finish;
} else if (so_error != 0) {
if (so_error == ENOBUFS) {
@ -169,14 +172,12 @@ static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, bool success) {
} else {
switch (so_error) {
case ECONNREFUSED:
gpr_log(
GPR_ERROR,
"failed to connect to '%s': socket error: connection refused",
ac->addr_str);
error = grpc_error_set_int(error, GRPC_ERROR_INT_ERRNO, errno);
error = grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR,
"Connection refused");
break;
default:
gpr_log(GPR_ERROR, "failed to connect to '%s': socket error: %d",
ac->addr_str, so_error);
error = grpc_os_error(errno, "getsockopt(SO_ERROR)");
break;
}
goto finish;
@ -188,8 +189,8 @@ static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, bool success) {
goto finish;
}
} else {
gpr_log(GPR_ERROR, "failed to connect to '%s': timeout occurred",
ac->addr_str);
error =
grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, "Timeout occurred");
goto finish;
}
@ -203,12 +204,18 @@ finish:
}
done = (--ac->refs == 0);
gpr_mu_unlock(&ac->mu);
if (error != GRPC_ERROR_NONE) {
error = grpc_error_set_str(error, GRPC_ERROR_STR_DESCRIPTION,
"Failed to connect to remote host");
error =
grpc_error_set_str(error, GRPC_ERROR_STR_TARGET_ADDRESS, ac->addr_str);
}
if (done) {
gpr_mu_destroy(&ac->mu);
gpr_free(ac->addr_str);
gpr_free(ac);
}
grpc_exec_ctx_enqueue(exec_ctx, closure, *ep != NULL, NULL);
grpc_exec_ctx_push(exec_ctx, closure, error, NULL);
}
static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
@ -225,6 +232,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
grpc_fd *fdobj;
char *name;
char *addr_str;
grpc_error *error;
*ep = NULL;
@ -234,9 +242,10 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
addr_len = sizeof(addr6_v4mapped);
}
fd = grpc_create_dualstack_socket(addr, SOCK_STREAM, 0, &dsmode);
if (fd < 0) {
gpr_log(GPR_ERROR, "Unable to create socket: %s", strerror(errno));
error = grpc_create_dualstack_socket(addr, SOCK_STREAM, 0, &dsmode, &fd);
if (error != GRPC_ERROR_NONE) {
grpc_exec_ctx_push(exec_ctx, closure, error, NULL);
return;
}
if (dsmode == GRPC_DSMODE_IPV4) {
/* If we got an AF_INET socket, map the address back to IPv4. */
@ -244,8 +253,8 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
addr = (struct sockaddr *)&addr4_copy;
addr_len = sizeof(addr4_copy);
}
if (!prepare_socket(addr, fd)) {
grpc_exec_ctx_enqueue(exec_ctx, closure, false, NULL);
if ((error = prepare_socket(addr, fd)) != GRPC_ERROR_NONE) {
grpc_exec_ctx_push(exec_ctx, closure, error, NULL);
return;
}
@ -261,14 +270,14 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
if (err >= 0) {
*ep = grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str);
grpc_exec_ctx_enqueue(exec_ctx, closure, true, NULL);
grpc_exec_ctx_push(exec_ctx, closure, GRPC_ERROR_NONE, NULL);
goto done;
}
if (errno != EWOULDBLOCK && errno != EINPROGRESS) {
gpr_log(GPR_ERROR, "connect error to '%s': %s", addr_str, strerror(errno));
grpc_fd_orphan(exec_ctx, fdobj, NULL, NULL, "tcp_client_connect_error");
grpc_exec_ctx_enqueue(exec_ctx, closure, false, NULL);
grpc_exec_ctx_push(exec_ctx, closure, grpc_os_error(errno, "connect"),
NULL);
goto done;
}

@ -274,14 +274,13 @@ static void tcp_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
tcp->finished_edge = 0;
grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_closure);
} else {
grpc_exec_ctx_enqueue(exec_ctx, &tcp->read_closure, true, NULL);
grpc_exec_ctx_push(exec_ctx, &tcp->read_closure, GRPC_ERROR_NONE, NULL);
}
}
typedef enum { FLUSH_DONE, FLUSH_PENDING, FLUSH_ERROR } flush_result;
/* returns true if done, false if pending; if returning true, *error is set */
#define MAX_WRITE_IOVEC 16
static flush_result tcp_flush(grpc_tcp *tcp) {
static bool tcp_flush(grpc_tcp *tcp, grpc_error **error) {
struct msghdr msg;
struct iovec iov[MAX_WRITE_IOVEC];
msg_iovlen_type iov_size;
@ -331,10 +330,10 @@ static flush_result tcp_flush(grpc_tcp *tcp) {
if (errno == EAGAIN) {
tcp->outgoing_slice_idx = unwind_slice_idx;
tcp->outgoing_byte_idx = unwind_byte_idx;
return FLUSH_PENDING;
return false;
} else {
/* TODO(klempner): Log some of these */
return FLUSH_ERROR;
*error = grpc_os_error(errno, "sendmsg");
return true;
}
}
@ -355,7 +354,8 @@ static flush_result tcp_flush(grpc_tcp *tcp) {
}
if (tcp->outgoing_slice_idx == tcp->outgoing_buffer->count) {
return FLUSH_DONE;
*error = GRPC_ERROR_NONE;
return true;
}
};
}
@ -363,7 +363,7 @@ static flush_result tcp_flush(grpc_tcp *tcp) {
static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
bool success) {
grpc_tcp *tcp = (grpc_tcp *)arg;
flush_result status;
grpc_error *error = GRPC_ERROR_NONE;
grpc_closure *cb;
if (!success) {
@ -374,14 +374,13 @@ static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
return;
}
status = tcp_flush(tcp);
if (status == FLUSH_PENDING) {
if (!tcp_flush(tcp, &error)) {
grpc_fd_notify_on_write(exec_ctx, tcp->em_fd, &tcp->write_closure);
} else {
cb = tcp->write_cb;
tcp->write_cb = NULL;
GPR_TIMER_BEGIN("tcp_handle_write.cb", 0);
cb->cb(exec_ctx, cb->cb_arg, status == FLUSH_DONE);
cb->cb(exec_ctx, cb->cb_arg, error);
GPR_TIMER_END("tcp_handle_write.cb", 0);
TCP_UNREF(exec_ctx, tcp, "write");
}
@ -390,7 +389,7 @@ static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
gpr_slice_buffer *buf, grpc_closure *cb) {
grpc_tcp *tcp = (grpc_tcp *)ep;
flush_result status;
grpc_error *error = GRPC_ERROR_NONE;
if (grpc_tcp_trace) {
size_t i;
@ -408,20 +407,19 @@ static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
if (buf->length == 0) {
GPR_TIMER_END("tcp_write", 0);
grpc_exec_ctx_enqueue(exec_ctx, cb, true, NULL);
grpc_exec_ctx_push(exec_ctx, cb, GRPC_ERROR_NONE, NULL);
return;
}
tcp->outgoing_buffer = buf;
tcp->outgoing_slice_idx = 0;
tcp->outgoing_byte_idx = 0;
status = tcp_flush(tcp);
if (status == FLUSH_PENDING) {
if (!tcp_flush(tcp, &error)) {
TCP_REF(tcp, "write");
tcp->write_cb = cb;
grpc_fd_notify_on_write(exec_ctx, tcp->em_fd, &tcp->write_closure);
} else {
grpc_exec_ctx_enqueue(exec_ctx, cb, status == FLUSH_DONE, NULL);
grpc_exec_ctx_push(exec_ctx, cb, error, NULL);
}
GPR_TIMER_END("tcp_write", 0);

@ -92,6 +92,7 @@ CORE_SOURCE_FILES = [
'src/core/lib/iomgr/endpoint.c',
'src/core/lib/iomgr/endpoint_pair_posix.c',
'src/core/lib/iomgr/endpoint_pair_windows.c',
'src/core/lib/iomgr/error.c',
'src/core/lib/iomgr/ev_poll_and_epoll_posix.c',
'src/core/lib/iomgr/ev_posix.c',
'src/core/lib/iomgr/exec_ctx.c',

@ -805,6 +805,7 @@ src/core/lib/http/parser.h \
src/core/lib/iomgr/closure.h \
src/core/lib/iomgr/endpoint.h \
src/core/lib/iomgr/endpoint_pair.h \
src/core/lib/iomgr/error.h \
src/core/lib/iomgr/ev_poll_and_epoll_posix.h \
src/core/lib/iomgr/ev_posix.h \
src/core/lib/iomgr/exec_ctx.h \
@ -942,6 +943,7 @@ src/core/lib/iomgr/closure.c \
src/core/lib/iomgr/endpoint.c \
src/core/lib/iomgr/endpoint_pair_posix.c \
src/core/lib/iomgr/endpoint_pair_windows.c \
src/core/lib/iomgr/error.c \
src/core/lib/iomgr/ev_poll_and_epoll_posix.c \
src/core/lib/iomgr/ev_posix.c \
src/core/lib/iomgr/exec_ctx.c \

@ -5571,6 +5571,7 @@
"src/core/lib/iomgr/closure.h",
"src/core/lib/iomgr/endpoint.h",
"src/core/lib/iomgr/endpoint_pair.h",
"src/core/lib/iomgr/error.h",
"src/core/lib/iomgr/ev_poll_and_epoll_posix.h",
"src/core/lib/iomgr/ev_posix.h",
"src/core/lib/iomgr/exec_ctx.h",
@ -5670,6 +5671,8 @@
"src/core/lib/iomgr/endpoint_pair.h",
"src/core/lib/iomgr/endpoint_pair_posix.c",
"src/core/lib/iomgr/endpoint_pair_windows.c",
"src/core/lib/iomgr/error.c",
"src/core/lib/iomgr/error.h",
"src/core/lib/iomgr/ev_poll_and_epoll_posix.c",
"src/core/lib/iomgr/ev_poll_and_epoll_posix.h",
"src/core/lib/iomgr/ev_posix.c",

@ -314,6 +314,7 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\closure.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\endpoint.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\endpoint_pair.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\error.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\ev_poll_and_epoll_posix.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\ev_posix.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\exec_ctx.h" />
@ -471,6 +472,8 @@
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\endpoint_pair_windows.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\error.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\ev_poll_and_epoll_posix.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\ev_posix.c">

@ -55,6 +55,9 @@
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\endpoint_pair_windows.c">
<Filter>src\core\lib\iomgr</Filter>
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\error.c">
<Filter>src\core\lib\iomgr</Filter>
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\ev_poll_and_epoll_posix.c">
<Filter>src\core\lib\iomgr</Filter>
</ClCompile>
@ -635,6 +638,9 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\endpoint_pair.h">
<Filter>src\core\lib\iomgr</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\error.h">
<Filter>src\core\lib\iomgr</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\ev_poll_and_epoll_posix.h">
<Filter>src\core\lib\iomgr</Filter>
</ClInclude>

@ -303,6 +303,7 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\closure.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\endpoint.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\endpoint_pair.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\error.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\ev_poll_and_epoll_posix.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\ev_posix.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\exec_ctx.h" />
@ -448,6 +449,8 @@
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\endpoint_pair_windows.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\error.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\ev_poll_and_epoll_posix.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\ev_posix.c">

@ -58,6 +58,9 @@
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\endpoint_pair_windows.c">
<Filter>src\core\lib\iomgr</Filter>
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\error.c">
<Filter>src\core\lib\iomgr</Filter>
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\ev_poll_and_epoll_posix.c">
<Filter>src\core\lib\iomgr</Filter>
</ClCompile>
@ -569,6 +572,9 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\endpoint_pair.h">
<Filter>src\core\lib\iomgr</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\error.h">
<Filter>src\core\lib\iomgr</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\ev_poll_and_epoll_posix.h">
<Filter>src\core\lib\iomgr</Filter>
</ClInclude>

Loading…
Cancel
Save