Refactor Endpoint API

- Allow reads to complete immediately
- Unify read/write signatures
- Simplify memory management to allow future optimization work
pull/3025/head
Craig Tiller 9 years ago
parent 4f21d3549c
commit 592e7f2dd0
  1. 2
      include/grpc/support/slice_buffer.h
  2. 95
      src/core/httpcli/httpcli.c
  3. 17
      src/core/iomgr/endpoint.c
  4. 51
      src/core/iomgr/endpoint.h
  5. 520
      src/core/iomgr/tcp_posix.c
  6. 188
      src/core/security/secure_endpoint.c
  7. 119
      src/core/security/secure_transport_setup.c
  8. 22
      src/core/support/slice_buffer.c
  9. 10
      src/core/transport/chttp2/internal.h
  10. 21
      src/core/transport/chttp2/writing.c
  11. 125
      src/core/transport/chttp2_transport.c
  12. 17
      test/core/bad_client/bad_client.c
  13. 204
      test/core/iomgr/endpoint_tests.c
  14. 147
      test/core/iomgr/tcp_posix_test.c
  15. 55
      test/core/security/secure_endpoint_test.c

@ -86,6 +86,8 @@ void gpr_slice_buffer_reset_and_unref(gpr_slice_buffer *sb);
void gpr_slice_buffer_swap(gpr_slice_buffer *a, gpr_slice_buffer *b); void gpr_slice_buffer_swap(gpr_slice_buffer *a, gpr_slice_buffer *b);
/* move all of the elements of src into dst */ /* move all of the elements of src into dst */
void gpr_slice_buffer_move_into(gpr_slice_buffer *src, gpr_slice_buffer *dst); void gpr_slice_buffer_move_into(gpr_slice_buffer *src, gpr_slice_buffer *dst);
/* remove n bytes from the end of a slice buffer */
void gpr_slice_buffer_trim_end(gpr_slice_buffer *src, size_t n);
#ifdef __cplusplus #ifdef __cplusplus
} }

@ -61,6 +61,10 @@ typedef struct {
grpc_httpcli_context *context; grpc_httpcli_context *context;
grpc_pollset *pollset; grpc_pollset *pollset;
grpc_iomgr_object iomgr_obj; grpc_iomgr_object iomgr_obj;
gpr_slice_buffer incoming;
gpr_slice_buffer outgoing;
grpc_iomgr_closure on_read;
grpc_iomgr_closure done_write;
} internal_request; } internal_request;
static grpc_httpcli_get_override g_get_override = NULL; static grpc_httpcli_get_override g_get_override = NULL;
@ -99,73 +103,70 @@ static void finish(internal_request *req, int success) {
gpr_slice_unref(req->request_text); gpr_slice_unref(req->request_text);
gpr_free(req->host); gpr_free(req->host);
grpc_iomgr_unregister_object(&req->iomgr_obj); grpc_iomgr_unregister_object(&req->iomgr_obj);
gpr_slice_buffer_destroy(&req->incoming);
gpr_slice_buffer_destroy(&req->outgoing);
gpr_free(req); gpr_free(req);
} }
static void on_read(void *user_data, gpr_slice *slices, size_t nslices, static void on_read(void *user_data, int success);
grpc_endpoint_cb_status status) {
static void do_read(internal_request *req) {
switch (grpc_endpoint_read(req->ep, &req->incoming, &req->on_read)) {
case GRPC_ENDPOINT_DONE:
on_read(req, 1);
break;
case GRPC_ENDPOINT_PENDING:
break;
case GRPC_ENDPOINT_ERROR:
on_read(req, 0);
break;
}
}
static void on_read(void *user_data, int success) {
internal_request *req = user_data; internal_request *req = user_data;
size_t i; size_t i;
for (i = 0; i < nslices; i++) { for (i = 0; i < req->incoming.count; i++) {
if (GPR_SLICE_LENGTH(slices[i])) { if (GPR_SLICE_LENGTH(req->incoming.slices[i])) {
req->have_read_byte = 1; req->have_read_byte = 1;
if (!grpc_httpcli_parser_parse(&req->parser, slices[i])) { if (!grpc_httpcli_parser_parse(&req->parser, req->incoming.slices[i])) {
finish(req, 0); finish(req, 0);
goto done; return;
} }
} }
} }
switch (status) { if (success) {
case GRPC_ENDPOINT_CB_OK: do_read(req);
grpc_endpoint_notify_on_read(req->ep, on_read, req); } else if (!req->have_read_byte) {
break; next_address(req);
case GRPC_ENDPOINT_CB_EOF: } else {
case GRPC_ENDPOINT_CB_ERROR: finish(req, grpc_httpcli_parser_eof(&req->parser));
case GRPC_ENDPOINT_CB_SHUTDOWN:
if (!req->have_read_byte) {
next_address(req);
} else {
finish(req, grpc_httpcli_parser_eof(&req->parser));
}
break;
}
done:
for (i = 0; i < nslices; i++) {
gpr_slice_unref(slices[i]);
} }
} }
static void on_written(internal_request *req) { static void on_written(internal_request *req) { do_read(req); }
grpc_endpoint_notify_on_read(req->ep, on_read, req);
}
static void done_write(void *arg, grpc_endpoint_cb_status status) { static void done_write(void *arg, int success) {
internal_request *req = arg; internal_request *req = arg;
switch (status) { if (success) {
case GRPC_ENDPOINT_CB_OK: on_written(req);
on_written(req); } else {
break; next_address(req);
case GRPC_ENDPOINT_CB_EOF:
case GRPC_ENDPOINT_CB_SHUTDOWN:
case GRPC_ENDPOINT_CB_ERROR:
next_address(req);
break;
} }
} }
static void start_write(internal_request *req) { static void start_write(internal_request *req) {
gpr_slice_ref(req->request_text); gpr_slice_ref(req->request_text);
switch ( gpr_slice_buffer_add(&req->outgoing, req->request_text);
grpc_endpoint_write(req->ep, &req->request_text, 1, done_write, req)) { switch (grpc_endpoint_write(req->ep, &req->outgoing, &req->done_write)) {
case GRPC_ENDPOINT_WRITE_DONE: case GRPC_ENDPOINT_DONE:
on_written(req); on_written(req);
break; break;
case GRPC_ENDPOINT_WRITE_PENDING: case GRPC_ENDPOINT_PENDING:
break; break;
case GRPC_ENDPOINT_WRITE_ERROR: case GRPC_ENDPOINT_ERROR:
finish(req, 0); finish(req, 0);
break; break;
} }
@ -237,6 +238,10 @@ void grpc_httpcli_get(grpc_httpcli_context *context, grpc_pollset *pollset,
request->handshaker ? request->handshaker : &grpc_httpcli_plaintext; request->handshaker ? request->handshaker : &grpc_httpcli_plaintext;
req->context = context; req->context = context;
req->pollset = pollset; req->pollset = pollset;
grpc_iomgr_closure_init(&req->on_read, on_read, req);
grpc_iomgr_closure_init(&req->done_write, done_write, req);
gpr_slice_buffer_init(&req->incoming);
gpr_slice_buffer_init(&req->outgoing);
gpr_asprintf(&name, "HTTP:GET:%s:%s", request->host, request->path); gpr_asprintf(&name, "HTTP:GET:%s:%s", request->host, request->path);
grpc_iomgr_register_object(&req->iomgr_obj, name); grpc_iomgr_register_object(&req->iomgr_obj, name);
gpr_free(name); gpr_free(name);
@ -270,7 +275,11 @@ void grpc_httpcli_post(grpc_httpcli_context *context, grpc_pollset *pollset,
request->handshaker ? request->handshaker : &grpc_httpcli_plaintext; request->handshaker ? request->handshaker : &grpc_httpcli_plaintext;
req->context = context; req->context = context;
req->pollset = pollset; req->pollset = pollset;
gpr_asprintf(&name, "HTTP:GET:%s:%s", request->host, request->path); grpc_iomgr_closure_init(&req->on_read, on_read, req);
grpc_iomgr_closure_init(&req->done_write, done_write, req);
gpr_slice_buffer_init(&req->incoming);
gpr_slice_buffer_init(&req->outgoing);
gpr_asprintf(&name, "HTTP:POST:%s:%s", request->host, request->path);
grpc_iomgr_register_object(&req->iomgr_obj, name); grpc_iomgr_register_object(&req->iomgr_obj, name);
gpr_free(name); gpr_free(name);
req->host = gpr_strdup(request->host); req->host = gpr_strdup(request->host);

@ -33,17 +33,16 @@
#include "src/core/iomgr/endpoint.h" #include "src/core/iomgr/endpoint.h"
void grpc_endpoint_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb, grpc_endpoint_op_status grpc_endpoint_read(grpc_endpoint *ep,
void *user_data) { gpr_slice_buffer *slices,
ep->vtable->notify_on_read(ep, cb, user_data); grpc_iomgr_closure *cb) {
return ep->vtable->read(ep, slices, cb);
} }
grpc_endpoint_write_status grpc_endpoint_write(grpc_endpoint *ep, grpc_endpoint_op_status grpc_endpoint_write(grpc_endpoint *ep,
gpr_slice *slices, gpr_slice_buffer *slices,
size_t nslices, grpc_iomgr_closure *cb) {
grpc_endpoint_write_cb cb, return ep->vtable->write(ep, slices, cb);
void *user_data) {
return ep->vtable->write(ep, slices, nslices, cb, user_data);
} }
void grpc_endpoint_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) { void grpc_endpoint_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) {

@ -37,6 +37,7 @@
#include "src/core/iomgr/pollset.h" #include "src/core/iomgr/pollset.h"
#include "src/core/iomgr/pollset_set.h" #include "src/core/iomgr/pollset_set.h"
#include <grpc/support/slice.h> #include <grpc/support/slice.h>
#include <grpc/support/slice_buffer.h>
#include <grpc/support/time.h> #include <grpc/support/time.h>
/* An endpoint caps a streaming channel between two communicating processes. /* An endpoint caps a streaming channel between two communicating processes.
@ -45,31 +46,17 @@
typedef struct grpc_endpoint grpc_endpoint; typedef struct grpc_endpoint grpc_endpoint;
typedef struct grpc_endpoint_vtable grpc_endpoint_vtable; typedef struct grpc_endpoint_vtable grpc_endpoint_vtable;
typedef enum grpc_endpoint_cb_status { typedef enum grpc_endpoint_op_status {
GRPC_ENDPOINT_CB_OK = 0, /* Call completed successfully */ GRPC_ENDPOINT_DONE, /* completed immediately, cb won't be called */
GRPC_ENDPOINT_CB_EOF, /* Call completed successfully, end of file reached */ GRPC_ENDPOINT_PENDING, /* cb will be called when completed */
GRPC_ENDPOINT_CB_SHUTDOWN, /* Call interrupted by shutdown */ GRPC_ENDPOINT_ERROR /* write errored out, cb won't be called */
GRPC_ENDPOINT_CB_ERROR /* Call interrupted by socket error */ } grpc_endpoint_op_status;
} grpc_endpoint_cb_status;
typedef enum grpc_endpoint_write_status {
GRPC_ENDPOINT_WRITE_DONE, /* completed immediately, cb won't be called */
GRPC_ENDPOINT_WRITE_PENDING, /* cb will be called when completed */
GRPC_ENDPOINT_WRITE_ERROR /* write errored out, cb won't be called */
} grpc_endpoint_write_status;
typedef void (*grpc_endpoint_read_cb)(void *user_data, gpr_slice *slices,
size_t nslices,
grpc_endpoint_cb_status error);
typedef void (*grpc_endpoint_write_cb)(void *user_data,
grpc_endpoint_cb_status error);
struct grpc_endpoint_vtable { struct grpc_endpoint_vtable {
void (*notify_on_read)(grpc_endpoint *ep, grpc_endpoint_read_cb cb, grpc_endpoint_op_status (*read)(grpc_endpoint *ep, gpr_slice_buffer *slices,
void *user_data); grpc_iomgr_closure *cb);
grpc_endpoint_write_status (*write)(grpc_endpoint *ep, gpr_slice *slices, grpc_endpoint_op_status (*write)(grpc_endpoint *ep, gpr_slice_buffer *slices,
size_t nslices, grpc_endpoint_write_cb cb, grpc_iomgr_closure *cb);
void *user_data);
void (*add_to_pollset)(grpc_endpoint *ep, grpc_pollset *pollset); void (*add_to_pollset)(grpc_endpoint *ep, grpc_pollset *pollset);
void (*add_to_pollset_set)(grpc_endpoint *ep, grpc_pollset_set *pollset); void (*add_to_pollset_set)(grpc_endpoint *ep, grpc_pollset_set *pollset);
void (*shutdown)(grpc_endpoint *ep); void (*shutdown)(grpc_endpoint *ep);
@ -77,9 +64,13 @@ struct grpc_endpoint_vtable {
char *(*get_peer)(grpc_endpoint *ep); char *(*get_peer)(grpc_endpoint *ep);
}; };
/* When data is available on the connection, calls the callback with slices. */ /* When data is available on the connection, calls the callback with slices.
void grpc_endpoint_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb, Callback success indicates that the endpoint can accept more reads, failure
void *user_data); indicates the endpoint is closed.
Valid slices may be placed into \a slices even on callback success == 0. */
grpc_endpoint_op_status grpc_endpoint_read(
grpc_endpoint *ep, gpr_slice_buffer *slices,
grpc_iomgr_closure *cb) GRPC_MUST_USE_RESULT;
char *grpc_endpoint_get_peer(grpc_endpoint *ep); char *grpc_endpoint_get_peer(grpc_endpoint *ep);
@ -89,11 +80,9 @@ char *grpc_endpoint_get_peer(grpc_endpoint *ep);
returns GRPC_ENDPOINT_WRITE_DONE. returns GRPC_ENDPOINT_WRITE_DONE.
Otherwise it returns GRPC_ENDPOINT_WRITE_PENDING and calls cb when the Otherwise it returns GRPC_ENDPOINT_WRITE_PENDING and calls cb when the
connection is ready for more data. */ connection is ready for more data. */
grpc_endpoint_write_status grpc_endpoint_write(grpc_endpoint *ep, grpc_endpoint_op_status grpc_endpoint_write(
gpr_slice *slices, grpc_endpoint *ep, gpr_slice_buffer *slices,
size_t nslices, grpc_iomgr_closure *cb) GRPC_MUST_USE_RESULT;
grpc_endpoint_write_cb cb,
void *user_data);
/* Causes any pending read/write callbacks to run immediately with /* Causes any pending read/write callbacks to run immediately with
GRPC_ENDPOINT_CB_SHUTDOWN status */ GRPC_ENDPOINT_CB_SHUTDOWN status */

@ -61,209 +61,8 @@
#define SENDMSG_FLAGS 0 #define SENDMSG_FLAGS 0
#endif #endif
/* Holds a slice array and associated state. */
typedef struct grpc_tcp_slice_state {
gpr_slice *slices; /* Array of slices */
size_t nslices; /* Size of slices array. */
ssize_t first_slice; /* First valid slice in array */
ssize_t last_slice; /* Last valid slice in array */
gpr_slice working_slice; /* pointer to original final slice */
int working_slice_valid; /* True if there is a working slice */
int memory_owned; /* True if slices array is owned */
} grpc_tcp_slice_state;
int grpc_tcp_trace = 0; int grpc_tcp_trace = 0;
static void slice_state_init(grpc_tcp_slice_state *state, gpr_slice *slices,
size_t nslices, size_t valid_slices) {
state->slices = slices;
state->nslices = nslices;
if (valid_slices == 0) {
state->first_slice = -1;
} else {
state->first_slice = 0;
}
state->last_slice = valid_slices - 1;
state->working_slice_valid = 0;
state->memory_owned = 0;
}
/* Returns true if there is still available data */
static int slice_state_has_available(grpc_tcp_slice_state *state) {
return state->first_slice != -1 && state->last_slice >= state->first_slice;
}
static ssize_t slice_state_slices_allocated(grpc_tcp_slice_state *state) {
if (state->first_slice == -1) {
return 0;
} else {
return state->last_slice - state->first_slice + 1;
}
}
static void slice_state_realloc(grpc_tcp_slice_state *state, size_t new_size) {
/* TODO(klempner): use realloc instead when first_slice is 0 */
/* TODO(klempner): Avoid a realloc in cases where it is unnecessary */
gpr_slice *slices = state->slices;
size_t original_size = slice_state_slices_allocated(state);
size_t i;
gpr_slice *new_slices = gpr_malloc(sizeof(gpr_slice) * new_size);
for (i = 0; i < original_size; ++i) {
new_slices[i] = slices[i + state->first_slice];
}
state->slices = new_slices;
state->last_slice = original_size - 1;
if (original_size > 0) {
state->first_slice = 0;
} else {
state->first_slice = -1;
}
state->nslices = new_size;
if (state->memory_owned) {
gpr_free(slices);
}
state->memory_owned = 1;
}
static void slice_state_remove_prefix(grpc_tcp_slice_state *state,
size_t prefix_bytes) {
gpr_slice *current_slice = &state->slices[state->first_slice];
size_t current_slice_size;
while (slice_state_has_available(state)) {
current_slice_size = GPR_SLICE_LENGTH(*current_slice);
if (current_slice_size > prefix_bytes) {
/* TODO(klempner): Get rid of the extra refcount created here by adding a
native "trim the first N bytes" operation to splice */
/* TODO(klempner): This really shouldn't be modifying the current slice
unless we own the slices array. */
gpr_slice tail;
tail = gpr_slice_split_tail(current_slice, prefix_bytes);
gpr_slice_unref(*current_slice);
*current_slice = tail;
return;
} else {
gpr_slice_unref(*current_slice);
++state->first_slice;
++current_slice;
prefix_bytes -= current_slice_size;
}
}
}
static void slice_state_destroy(grpc_tcp_slice_state *state) {
while (slice_state_has_available(state)) {
gpr_slice_unref(state->slices[state->first_slice]);
++state->first_slice;
}
if (state->memory_owned) {
gpr_free(state->slices);
state->memory_owned = 0;
}
}
void slice_state_transfer_ownership(grpc_tcp_slice_state *state,
gpr_slice **slices, size_t *nslices) {
*slices = state->slices + state->first_slice;
*nslices = state->last_slice - state->first_slice + 1;
state->first_slice = -1;
state->last_slice = -1;
}
/* Fills iov with the first min(iov_size, available) slices, returns number
filled */
static size_t slice_state_to_iovec(grpc_tcp_slice_state *state,
struct iovec *iov, size_t iov_size) {
size_t nslices = state->last_slice - state->first_slice + 1;
gpr_slice *slices = state->slices + state->first_slice;
size_t i;
if (nslices < iov_size) {
iov_size = nslices;
}
for (i = 0; i < iov_size; ++i) {
iov[i].iov_base = GPR_SLICE_START_PTR(slices[i]);
iov[i].iov_len = GPR_SLICE_LENGTH(slices[i]);
}
return iov_size;
}
/* Makes n blocks available at the end of state, writes them into iov, and
returns the number of bytes allocated */
static size_t slice_state_append_blocks_into_iovec(grpc_tcp_slice_state *state,
struct iovec *iov, size_t n,
size_t slice_size) {
size_t target_size;
size_t i;
size_t allocated_bytes;
ssize_t allocated_slices = slice_state_slices_allocated(state);
if (n - state->working_slice_valid >= state->nslices - state->last_slice) {
/* Need to grow the slice array */
target_size = state->nslices;
do {
target_size = target_size * 2;
} while (target_size < allocated_slices + n - state->working_slice_valid);
/* TODO(klempner): If this ever needs to support both prefix removal and
append, we should be smarter about the growth logic here */
slice_state_realloc(state, target_size);
}
i = 0;
allocated_bytes = 0;
if (state->working_slice_valid) {
iov[0].iov_base = GPR_SLICE_END_PTR(state->slices[state->last_slice]);
iov[0].iov_len = GPR_SLICE_LENGTH(state->working_slice) -
GPR_SLICE_LENGTH(state->slices[state->last_slice]);
allocated_bytes += iov[0].iov_len;
++i;
state->slices[state->last_slice] = state->working_slice;
state->working_slice_valid = 0;
}
for (; i < n; ++i) {
++state->last_slice;
state->slices[state->last_slice] = gpr_slice_malloc(slice_size);
iov[i].iov_base = GPR_SLICE_START_PTR(state->slices[state->last_slice]);
iov[i].iov_len = slice_size;
allocated_bytes += slice_size;
}
if (state->first_slice == -1) {
state->first_slice = 0;
}
return allocated_bytes;
}
/* Remove the last n bytes from state */
/* TODO(klempner): Consider having this defer actual deletion until later */
static void slice_state_remove_last(grpc_tcp_slice_state *state, size_t bytes) {
while (bytes > 0 && slice_state_has_available(state)) {
if (GPR_SLICE_LENGTH(state->slices[state->last_slice]) > bytes) {
state->working_slice = state->slices[state->last_slice];
state->working_slice_valid = 1;
/* TODO(klempner): Combine these into a single operation that doesn't need
to refcount */
gpr_slice_unref(gpr_slice_split_tail(
&state->slices[state->last_slice],
GPR_SLICE_LENGTH(state->slices[state->last_slice]) - bytes));
bytes = 0;
} else {
bytes -= GPR_SLICE_LENGTH(state->slices[state->last_slice]);
gpr_slice_unref(state->slices[state->last_slice]);
--state->last_slice;
if (state->last_slice == -1) {
state->first_slice = -1;
}
}
}
}
typedef struct { typedef struct {
grpc_endpoint base; grpc_endpoint base;
grpc_fd *em_fd; grpc_fd *em_fd;
@ -273,12 +72,15 @@ typedef struct {
size_t slice_size; size_t slice_size;
gpr_refcount refcount; gpr_refcount refcount;
grpc_endpoint_read_cb read_cb; gpr_slice_buffer *incoming_buffer;
void *read_user_data; gpr_slice_buffer *outgoing_buffer;
grpc_endpoint_write_cb write_cb; /** slice within outgoing_buffer to write next */
void *write_user_data; size_t outgoing_slice_idx;
/** byte within outgoing_buffer->slices[outgoing_slice_idx] to write next */
size_t outgoing_byte_idx;
grpc_tcp_slice_state write_state; grpc_iomgr_closure *read_cb;
grpc_iomgr_closure *write_cb;
grpc_iomgr_closure read_closure; grpc_iomgr_closure read_closure;
grpc_iomgr_closure write_closure; grpc_iomgr_closure write_closure;
@ -288,65 +90,95 @@ typedef struct {
char *peer_string; char *peer_string;
} grpc_tcp; } grpc_tcp;
static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success); static void tcp_handle_read(void *arg /* grpc_tcp */, int success);
static void grpc_tcp_handle_write(void *arg /* grpc_tcp */, int success); static void tcp_handle_write(void *arg /* grpc_tcp */, int success);
static void grpc_tcp_shutdown(grpc_endpoint *ep) { static void tcp_shutdown(grpc_endpoint *ep) {
grpc_tcp *tcp = (grpc_tcp *)ep; grpc_tcp *tcp = (grpc_tcp *)ep;
grpc_fd_shutdown(tcp->em_fd); grpc_fd_shutdown(tcp->em_fd);
} }
static void grpc_tcp_unref(grpc_tcp *tcp) { static void tcp_free(grpc_tcp *tcp) {
int refcount_zero = gpr_unref(&tcp->refcount); grpc_fd_orphan(tcp->em_fd, NULL, "tcp_unref_orphan");
if (refcount_zero) { gpr_free(tcp->peer_string);
grpc_fd_orphan(tcp->em_fd, NULL, "tcp_unref_orphan"); gpr_free(tcp);
gpr_free(tcp->peer_string); }
gpr_free(tcp);
#define GRPC_TCP_REFCOUNT_DEBUG
#define TCP_UNREF(tcp, reason) tcp_unref((tcp), (reason), __FILE__, __LINE__)
#define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), __FILE__, __LINE__)
static void tcp_unref(grpc_tcp *tcp, const char *reason, const char *file,
int line) {
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP unref %p : %s %d -> %d", tcp,
reason, tcp->refcount.count, tcp->refcount.count - 1);
if (gpr_unref(&tcp->refcount)) {
tcp_free(tcp);
}
}
static void tcp_ref(grpc_tcp *tcp, const char *reason, const char *file,
int line) {
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP ref %p : %s %d -> %d", tcp,
reason, tcp->refcount.count, tcp->refcount.count + 1);
gpr_ref(&tcp->refcount);
}
#ifdef GRPC_TCP_REFCOUNT_DEBUG
#else
#define TCP_UNREF(tcp, reason) tcp_unref((tcp))
#define TCP_REF(tcp, reason) tcp_ref((tcp))
static void tcp_unref(grpc_tcp *tcp) {
if (gpr_unref(&tcp->refcount)) {
tcp_free(tcp);
} }
} }
static void grpc_tcp_destroy(grpc_endpoint *ep) { static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); }
#endif
static void tcp_destroy(grpc_endpoint *ep) {
grpc_tcp *tcp = (grpc_tcp *)ep; grpc_tcp *tcp = (grpc_tcp *)ep;
grpc_tcp_unref(tcp); TCP_UNREF(tcp, "destroy");
} }
static void call_read_cb(grpc_tcp *tcp, gpr_slice *slices, size_t nslices, static void call_read_cb(grpc_tcp *tcp, int success) {
grpc_endpoint_cb_status status) { grpc_iomgr_closure *cb = tcp->read_cb;
grpc_endpoint_read_cb cb = tcp->read_cb;
if (grpc_tcp_trace) { if (grpc_tcp_trace) {
size_t i; size_t i;
gpr_log(GPR_DEBUG, "read: status=%d", status); gpr_log(GPR_DEBUG, "read: success=%d", success);
for (i = 0; i < nslices; i++) { for (i = 0; i < tcp->incoming_buffer->count; i++) {
char *dump = gpr_dump_slice(slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII); char *dump = gpr_dump_slice(tcp->incoming_buffer->slices[i],
GPR_DUMP_HEX | GPR_DUMP_ASCII);
gpr_log(GPR_DEBUG, "READ %p: %s", tcp, dump); gpr_log(GPR_DEBUG, "READ %p: %s", tcp, dump);
gpr_free(dump); gpr_free(dump);
} }
} }
tcp->read_cb = NULL; tcp->read_cb = NULL;
cb(tcp->read_user_data, slices, nslices, status); tcp->incoming_buffer = NULL;
cb->cb(cb->cb_arg, success);
} }
#define INLINE_SLICE_BUFFER_SIZE 8
#define MAX_READ_IOVEC 4 #define MAX_READ_IOVEC 4
static void grpc_tcp_continue_read(grpc_tcp *tcp) { static void tcp_continue_read(grpc_tcp *tcp) {
gpr_slice static_read_slices[INLINE_SLICE_BUFFER_SIZE];
struct msghdr msg; struct msghdr msg;
struct iovec iov[MAX_READ_IOVEC]; struct iovec iov[MAX_READ_IOVEC];
ssize_t read_bytes; ssize_t read_bytes;
ssize_t allocated_bytes; size_t i;
struct grpc_tcp_slice_state read_state;
gpr_slice *final_slices;
size_t final_nslices;
GPR_ASSERT(!tcp->finished_edge); GPR_ASSERT(!tcp->finished_edge);
GPR_ASSERT(tcp->iov_size <= MAX_READ_IOVEC);
GPR_ASSERT(tcp->incoming_buffer->count <= MAX_READ_IOVEC);
GRPC_TIMER_BEGIN(GRPC_PTAG_HANDLE_READ, 0); GRPC_TIMER_BEGIN(GRPC_PTAG_HANDLE_READ, 0);
slice_state_init(&read_state, static_read_slices, INLINE_SLICE_BUFFER_SIZE,
0);
allocated_bytes = slice_state_append_blocks_into_iovec( while (tcp->incoming_buffer->count < (size_t)tcp->iov_size) {
&read_state, iov, tcp->iov_size, tcp->slice_size); gpr_slice_buffer_add_indexed(tcp->incoming_buffer,
gpr_slice_malloc(tcp->slice_size));
}
for (i = 0; i < tcp->incoming_buffer->count; i++) {
iov[i].iov_base = GPR_SLICE_START_PTR(tcp->incoming_buffer->slices[i]);
iov[i].iov_len = GPR_SLICE_LENGTH(tcp->incoming_buffer->slices[i]);
}
msg.msg_name = NULL; msg.msg_name = NULL;
msg.msg_namelen = 0; msg.msg_namelen = 0;
@ -362,87 +194,63 @@ static void grpc_tcp_continue_read(grpc_tcp *tcp) {
} while (read_bytes < 0 && errno == EINTR); } while (read_bytes < 0 && errno == EINTR);
GRPC_TIMER_END(GRPC_PTAG_RECVMSG, 0); GRPC_TIMER_END(GRPC_PTAG_RECVMSG, 0);
if (read_bytes < allocated_bytes) {
/* TODO(klempner): Consider a second read first, in hopes of getting a
* quick EAGAIN and saving a bunch of allocations. */
slice_state_remove_last(&read_state, read_bytes < 0
? allocated_bytes
: allocated_bytes - read_bytes);
}
if (read_bytes < 0) { if (read_bytes < 0) {
/* NB: After calling the user_cb a parallel call of the read handler may /* NB: After calling call_read_cb a parallel call of the read handler may
* be running. */ * be running. */
if (errno == EAGAIN) { if (errno == EAGAIN) {
if (tcp->iov_size > 1) { if (tcp->iov_size > 1) {
tcp->iov_size /= 2; tcp->iov_size /= 2;
} }
if (slice_state_has_available(&read_state)) { /* We've consumed the edge, request a new one */
/* TODO(klempner): We should probably do the call into the application grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure);
without all this junk on the stack */
/* FIXME(klempner): Refcount properly */
slice_state_transfer_ownership(&read_state, &final_slices,
&final_nslices);
tcp->finished_edge = 1;
call_read_cb(tcp, final_slices, final_nslices, GRPC_ENDPOINT_CB_OK);
slice_state_destroy(&read_state);
grpc_tcp_unref(tcp);
} else {
/* We've consumed the edge, request a new one */
slice_state_destroy(&read_state);
grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure);
}
} else { } else {
/* TODO(klempner): Log interesting errors */ /* TODO(klempner): Log interesting errors */
call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_ERROR); gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer);
slice_state_destroy(&read_state); call_read_cb(tcp, 0);
grpc_tcp_unref(tcp); TCP_UNREF(tcp, "read");
} }
} else if (read_bytes == 0) { } else if (read_bytes == 0) {
/* 0 read size ==> end of stream */ /* 0 read size ==> end of stream */
if (slice_state_has_available(&read_state)) { gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer);
/* there were bytes already read: pass them up to the application */ call_read_cb(tcp, 0);
slice_state_transfer_ownership(&read_state, &final_slices, TCP_UNREF(tcp, "read");
&final_nslices);
call_read_cb(tcp, final_slices, final_nslices, GRPC_ENDPOINT_CB_EOF);
} else {
call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_EOF);
}
slice_state_destroy(&read_state);
grpc_tcp_unref(tcp);
} else { } else {
if (tcp->iov_size < MAX_READ_IOVEC) { GPR_ASSERT((size_t)read_bytes <= tcp->incoming_buffer->length);
if ((size_t)read_bytes < tcp->incoming_buffer->length) {
gpr_slice_buffer_trim_end(tcp->incoming_buffer,
tcp->incoming_buffer->length - read_bytes);
} else if (tcp->iov_size < MAX_READ_IOVEC) {
++tcp->iov_size; ++tcp->iov_size;
} }
GPR_ASSERT(slice_state_has_available(&read_state)); GPR_ASSERT((size_t)read_bytes == tcp->incoming_buffer->length);
slice_state_transfer_ownership(&read_state, &final_slices, &final_nslices); call_read_cb(tcp, 1);
call_read_cb(tcp, final_slices, final_nslices, GRPC_ENDPOINT_CB_OK); TCP_UNREF(tcp, "read");
slice_state_destroy(&read_state);
grpc_tcp_unref(tcp);
} }
GRPC_TIMER_END(GRPC_PTAG_HANDLE_READ, 0); GRPC_TIMER_END(GRPC_PTAG_HANDLE_READ, 0);
} }
static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success) { static void tcp_handle_read(void *arg /* grpc_tcp */, int success) {
grpc_tcp *tcp = (grpc_tcp *)arg; grpc_tcp *tcp = (grpc_tcp *)arg;
GPR_ASSERT(!tcp->finished_edge); GPR_ASSERT(!tcp->finished_edge);
if (!success) { if (!success) {
call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_SHUTDOWN); call_read_cb(tcp, 0);
grpc_tcp_unref(tcp); TCP_UNREF(tcp, "read");
} else { } else {
grpc_tcp_continue_read(tcp); tcp_continue_read(tcp);
} }
} }
static void grpc_tcp_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb, static grpc_endpoint_op_status tcp_read(grpc_endpoint *ep,
void *user_data) { gpr_slice_buffer *incoming_buffer,
grpc_iomgr_closure *cb) {
grpc_tcp *tcp = (grpc_tcp *)ep; grpc_tcp *tcp = (grpc_tcp *)ep;
GPR_ASSERT(tcp->read_cb == NULL); GPR_ASSERT(tcp->read_cb == NULL);
tcp->read_cb = cb; tcp->read_cb = cb;
tcp->read_user_data = user_data; tcp->incoming_buffer = incoming_buffer;
gpr_ref(&tcp->refcount); gpr_slice_buffer_reset_and_unref(incoming_buffer);
TCP_REF(tcp, "read");
if (tcp->finished_edge) { if (tcp->finished_edge) {
tcp->finished_edge = 0; tcp->finished_edge = 0;
grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure); grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure);
@ -450,18 +258,41 @@ static void grpc_tcp_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb,
tcp->handle_read_closure.cb_arg = tcp; tcp->handle_read_closure.cb_arg = tcp;
grpc_iomgr_add_delayed_callback(&tcp->handle_read_closure, 1); grpc_iomgr_add_delayed_callback(&tcp->handle_read_closure, 1);
} }
/* TODO(ctiller): immediate return */
return GRPC_ENDPOINT_PENDING;
} }
#define MAX_WRITE_IOVEC 16 #define MAX_WRITE_IOVEC 16
static grpc_endpoint_write_status grpc_tcp_flush(grpc_tcp *tcp) { static grpc_endpoint_op_status tcp_flush(grpc_tcp *tcp) {
struct msghdr msg; struct msghdr msg;
struct iovec iov[MAX_WRITE_IOVEC]; struct iovec iov[MAX_WRITE_IOVEC];
int iov_size; int iov_size;
ssize_t sent_length; ssize_t sent_length;
grpc_tcp_slice_state *state = &tcp->write_state; ssize_t sending_length;
ssize_t trailing;
ssize_t unwind_slice_idx;
ssize_t unwind_byte_idx;
for (;;) { for (;;) {
iov_size = slice_state_to_iovec(state, iov, MAX_WRITE_IOVEC); sending_length = 0;
unwind_slice_idx = tcp->outgoing_slice_idx;
unwind_byte_idx = tcp->outgoing_byte_idx;
for (iov_size = 0; tcp->outgoing_slice_idx != tcp->outgoing_buffer->count &&
iov_size != MAX_WRITE_IOVEC;
iov_size++) {
iov[iov_size].iov_base =
GPR_SLICE_START_PTR(
tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]) +
tcp->outgoing_byte_idx;
iov[iov_size].iov_len =
GPR_SLICE_LENGTH(
tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]) -
tcp->outgoing_byte_idx;
sending_length += iov[iov_size].iov_len;
tcp->outgoing_slice_idx++;
tcp->outgoing_byte_idx = 0;
}
GPR_ASSERT(iov_size > 0);
msg.msg_name = NULL; msg.msg_name = NULL;
msg.msg_namelen = 0; msg.msg_namelen = 0;
@ -480,70 +311,75 @@ static grpc_endpoint_write_status grpc_tcp_flush(grpc_tcp *tcp) {
if (sent_length < 0) { if (sent_length < 0) {
if (errno == EAGAIN) { if (errno == EAGAIN) {
return GRPC_ENDPOINT_WRITE_PENDING; tcp->outgoing_slice_idx = unwind_slice_idx;
tcp->outgoing_byte_idx = unwind_byte_idx;
return GRPC_ENDPOINT_PENDING;
} else { } else {
/* TODO(klempner): Log some of these */ /* TODO(klempner): Log some of these */
slice_state_destroy(state); return GRPC_ENDPOINT_ERROR;
return GRPC_ENDPOINT_WRITE_ERROR;
} }
} }
/* TODO(klempner): Probably better to batch this after we finish flushing */ GPR_ASSERT(tcp->outgoing_byte_idx == 0);
slice_state_remove_prefix(state, sent_length); trailing = sending_length - sent_length;
while (trailing > 0) {
ssize_t slice_length;
tcp->outgoing_slice_idx--;
slice_length = GPR_SLICE_LENGTH(
tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]);
if (slice_length > trailing) {
tcp->outgoing_byte_idx = slice_length - trailing;
break;
} else {
trailing -= slice_length;
}
}
if (!slice_state_has_available(state)) { if (tcp->outgoing_slice_idx == tcp->outgoing_buffer->count) {
return GRPC_ENDPOINT_WRITE_DONE; return GRPC_ENDPOINT_DONE;
} }
}; };
} }
static void grpc_tcp_handle_write(void *arg /* grpc_tcp */, int success) { static void tcp_handle_write(void *arg /* grpc_tcp */, int success) {
grpc_tcp *tcp = (grpc_tcp *)arg; grpc_tcp *tcp = (grpc_tcp *)arg;
grpc_endpoint_write_status write_status; grpc_endpoint_op_status status;
grpc_endpoint_cb_status cb_status; grpc_iomgr_closure *cb;
grpc_endpoint_write_cb cb;
if (!success) { if (!success) {
slice_state_destroy(&tcp->write_state);
cb = tcp->write_cb; cb = tcp->write_cb;
tcp->write_cb = NULL; tcp->write_cb = NULL;
cb(tcp->write_user_data, GRPC_ENDPOINT_CB_SHUTDOWN); cb->cb(cb->cb_arg, 0);
grpc_tcp_unref(tcp); TCP_UNREF(tcp, "write");
return; return;
} }
GRPC_TIMER_BEGIN(GRPC_PTAG_TCP_CB_WRITE, 0); GRPC_TIMER_BEGIN(GRPC_PTAG_TCP_CB_WRITE, 0);
write_status = grpc_tcp_flush(tcp); status = tcp_flush(tcp);
if (write_status == GRPC_ENDPOINT_WRITE_PENDING) { if (status == GRPC_ENDPOINT_PENDING) {
grpc_fd_notify_on_write(tcp->em_fd, &tcp->write_closure); grpc_fd_notify_on_write(tcp->em_fd, &tcp->write_closure);
} else { } else {
slice_state_destroy(&tcp->write_state);
if (write_status == GRPC_ENDPOINT_WRITE_DONE) {
cb_status = GRPC_ENDPOINT_CB_OK;
} else {
cb_status = GRPC_ENDPOINT_CB_ERROR;
}
cb = tcp->write_cb; cb = tcp->write_cb;
tcp->write_cb = NULL; tcp->write_cb = NULL;
cb(tcp->write_user_data, cb_status); cb->cb(cb->cb_arg, status == GRPC_ENDPOINT_DONE);
grpc_tcp_unref(tcp); TCP_UNREF(tcp, "write");
} }
GRPC_TIMER_END(GRPC_PTAG_TCP_CB_WRITE, 0); GRPC_TIMER_END(GRPC_PTAG_TCP_CB_WRITE, 0);
} }
static grpc_endpoint_write_status grpc_tcp_write(grpc_endpoint *ep, static grpc_endpoint_op_status tcp_write(grpc_endpoint *ep,
gpr_slice *slices, gpr_slice_buffer *buf,
size_t nslices, grpc_iomgr_closure *cb) {
grpc_endpoint_write_cb cb,
void *user_data) {
grpc_tcp *tcp = (grpc_tcp *)ep; grpc_tcp *tcp = (grpc_tcp *)ep;
grpc_endpoint_write_status status; grpc_endpoint_op_status status;
if (grpc_tcp_trace) { if (grpc_tcp_trace) {
size_t i; size_t i;
for (i = 0; i < nslices; i++) { for (i = 0; i < buf->count; i++) {
char *data = gpr_dump_slice(slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII); char *data =
gpr_dump_slice(buf->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
gpr_log(GPR_DEBUG, "WRITE %p: %s", tcp, data); gpr_log(GPR_DEBUG, "WRITE %p: %s", tcp, data);
gpr_free(data); gpr_free(data);
} }
@ -551,15 +387,19 @@ static grpc_endpoint_write_status grpc_tcp_write(grpc_endpoint *ep,
GRPC_TIMER_BEGIN(GRPC_PTAG_TCP_WRITE, 0); GRPC_TIMER_BEGIN(GRPC_PTAG_TCP_WRITE, 0);
GPR_ASSERT(tcp->write_cb == NULL); GPR_ASSERT(tcp->write_cb == NULL);
slice_state_init(&tcp->write_state, slices, nslices, nslices);
status = grpc_tcp_flush(tcp); if (buf->length == 0) {
if (status == GRPC_ENDPOINT_WRITE_PENDING) { GRPC_TIMER_END(GRPC_PTAG_TCP_WRITE, 0);
/* TODO(klempner): Consider inlining rather than malloc for small nslices */ return GRPC_ENDPOINT_DONE;
slice_state_realloc(&tcp->write_state, nslices); }
gpr_ref(&tcp->refcount); tcp->outgoing_buffer = buf;
tcp->outgoing_slice_idx = 0;
tcp->outgoing_byte_idx = 0;
status = tcp_flush(tcp);
if (status == GRPC_ENDPOINT_PENDING) {
TCP_REF(tcp, "write");
tcp->write_cb = cb; tcp->write_cb = cb;
tcp->write_user_data = user_data;
grpc_fd_notify_on_write(tcp->em_fd, &tcp->write_closure); grpc_fd_notify_on_write(tcp->em_fd, &tcp->write_closure);
} }
@ -567,27 +407,25 @@ static grpc_endpoint_write_status grpc_tcp_write(grpc_endpoint *ep,
return status; return status;
} }
static void grpc_tcp_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) { static void tcp_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) {
grpc_tcp *tcp = (grpc_tcp *)ep; grpc_tcp *tcp = (grpc_tcp *)ep;
grpc_pollset_add_fd(pollset, tcp->em_fd); grpc_pollset_add_fd(pollset, tcp->em_fd);
} }
static void grpc_tcp_add_to_pollset_set(grpc_endpoint *ep, static void tcp_add_to_pollset_set(grpc_endpoint *ep,
grpc_pollset_set *pollset_set) { grpc_pollset_set *pollset_set) {
grpc_tcp *tcp = (grpc_tcp *)ep; grpc_tcp *tcp = (grpc_tcp *)ep;
grpc_pollset_set_add_fd(pollset_set, tcp->em_fd); grpc_pollset_set_add_fd(pollset_set, tcp->em_fd);
} }
static char *grpc_tcp_get_peer(grpc_endpoint *ep) { static char *tcp_get_peer(grpc_endpoint *ep) {
grpc_tcp *tcp = (grpc_tcp *)ep; grpc_tcp *tcp = (grpc_tcp *)ep;
return gpr_strdup(tcp->peer_string); return gpr_strdup(tcp->peer_string);
} }
static const grpc_endpoint_vtable vtable = { static const grpc_endpoint_vtable vtable = {
grpc_tcp_notify_on_read, grpc_tcp_write, tcp_read, tcp_write, tcp_add_to_pollset, tcp_add_to_pollset_set,
grpc_tcp_add_to_pollset, grpc_tcp_add_to_pollset_set, tcp_shutdown, tcp_destroy, tcp_get_peer};
grpc_tcp_shutdown, grpc_tcp_destroy,
grpc_tcp_get_peer};
grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size, grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size,
const char *peer_string) { const char *peer_string) {
@ -597,21 +435,19 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size,
tcp->fd = em_fd->fd; tcp->fd = em_fd->fd;
tcp->read_cb = NULL; tcp->read_cb = NULL;
tcp->write_cb = NULL; tcp->write_cb = NULL;
tcp->read_user_data = NULL; tcp->incoming_buffer = NULL;
tcp->write_user_data = NULL;
tcp->slice_size = slice_size; tcp->slice_size = slice_size;
tcp->iov_size = 1; tcp->iov_size = 1;
tcp->finished_edge = 1; tcp->finished_edge = 1;
slice_state_init(&tcp->write_state, NULL, 0, 0);
/* paired with unref in grpc_tcp_destroy */ /* paired with unref in grpc_tcp_destroy */
gpr_ref_init(&tcp->refcount, 1); gpr_ref_init(&tcp->refcount, 1);
tcp->em_fd = em_fd; tcp->em_fd = em_fd;
tcp->read_closure.cb = grpc_tcp_handle_read; tcp->read_closure.cb = tcp_handle_read;
tcp->read_closure.cb_arg = tcp; tcp->read_closure.cb_arg = tcp;
tcp->write_closure.cb = grpc_tcp_handle_write; tcp->write_closure.cb = tcp_handle_write;
tcp->write_closure.cb_arg = tcp; tcp->write_closure.cb_arg = tcp;
tcp->handle_read_closure.cb = grpc_tcp_handle_read; tcp->handle_read_closure.cb = tcp_handle_read;
return &tcp->base; return &tcp->base;
} }

@ -49,15 +49,15 @@ typedef struct {
struct tsi_frame_protector *protector; struct tsi_frame_protector *protector;
gpr_mu protector_mu; gpr_mu protector_mu;
/* saved upper level callbacks and user_data. */ /* saved upper level callbacks and user_data. */
grpc_endpoint_read_cb read_cb; grpc_iomgr_closure *read_cb;
void *read_user_data; grpc_iomgr_closure *write_cb;
grpc_endpoint_write_cb write_cb; grpc_iomgr_closure on_read;
void *write_user_data; gpr_slice_buffer *read_buffer;
gpr_slice_buffer source_buffer;
/* saved handshaker leftover data to unprotect. */ /* saved handshaker leftover data to unprotect. */
gpr_slice_buffer leftover_bytes; gpr_slice_buffer leftover_bytes;
/* buffers for read and write */ /* buffers for read and write */
gpr_slice read_staging_buffer; gpr_slice read_staging_buffer;
gpr_slice_buffer input_buffer;
gpr_slice write_staging_buffer; gpr_slice write_staging_buffer;
gpr_slice_buffer output_buffer; gpr_slice_buffer output_buffer;
@ -67,62 +67,91 @@ typedef struct {
int grpc_trace_secure_endpoint = 0; int grpc_trace_secure_endpoint = 0;
static void secure_endpoint_ref(secure_endpoint *ep) { gpr_ref(&ep->ref); }
static void destroy(secure_endpoint *secure_ep) { static void destroy(secure_endpoint *secure_ep) {
secure_endpoint *ep = secure_ep; secure_endpoint *ep = secure_ep;
grpc_endpoint_destroy(ep->wrapped_ep); grpc_endpoint_destroy(ep->wrapped_ep);
tsi_frame_protector_destroy(ep->protector); tsi_frame_protector_destroy(ep->protector);
gpr_slice_buffer_destroy(&ep->leftover_bytes); gpr_slice_buffer_destroy(&ep->leftover_bytes);
gpr_slice_unref(ep->read_staging_buffer); gpr_slice_unref(ep->read_staging_buffer);
gpr_slice_buffer_destroy(&ep->input_buffer);
gpr_slice_unref(ep->write_staging_buffer); gpr_slice_unref(ep->write_staging_buffer);
gpr_slice_buffer_destroy(&ep->output_buffer); gpr_slice_buffer_destroy(&ep->output_buffer);
gpr_slice_buffer_destroy(&ep->source_buffer);
gpr_mu_destroy(&ep->protector_mu); gpr_mu_destroy(&ep->protector_mu);
gpr_free(ep); gpr_free(ep);
} }
#define GRPC_SECURE_ENDPOINT_REFCOUNT_DEBUG
#define SECURE_ENDPOINT_UNREF(ep, reason) \
secure_endpoint_unref((ep), (reason), __FILE__, __LINE__)
#define SECURE_ENDPOINT_REF(ep, reason) \
secure_endpoint_ref((ep), (reason), __FILE__, __LINE__)
static void secure_endpoint_unref(secure_endpoint *ep, const char *reason,
const char *file, int line) {
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "SECENDP unref %p : %s %d -> %d",
ep, reason, ep->ref.count, ep->ref.count - 1);
if (gpr_unref(&ep->ref)) {
destroy(ep);
}
}
static void secure_endpoint_ref(secure_endpoint *ep, const char *reason,
const char *file, int line) {
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "SECENDP ref %p : %s %d -> %d",
ep, reason, ep->ref.count, ep->ref.count + 1);
gpr_ref(&ep->ref);
}
#ifdef GRPC_SECURE_ENDPOINT_REFCOUNT_DEBUG
#else
#define SECURE_ENDPOINT_UNREF(ep, reason) secure_endpoint_unref((ep))
#define SECURE_ENDPOINT_REF(ep, reason) secure_endpoint_ref((ep))
static void secure_endpoint_unref(secure_endpoint *ep) { static void secure_endpoint_unref(secure_endpoint *ep) {
if (gpr_unref(&ep->ref)) { if (gpr_unref(&ep->ref)) {
destroy(ep); destroy(ep);
} }
} }
static void secure_endpoint_ref(secure_endpoint *ep) { gpr_ref(&ep->ref); }
#endif
static void flush_read_staging_buffer(secure_endpoint *ep, gpr_uint8 **cur, static void flush_read_staging_buffer(secure_endpoint *ep, gpr_uint8 **cur,
gpr_uint8 **end) { gpr_uint8 **end) {
gpr_slice_buffer_add(&ep->input_buffer, ep->read_staging_buffer); gpr_slice_buffer_add(ep->read_buffer, ep->read_staging_buffer);
ep->read_staging_buffer = gpr_slice_malloc(STAGING_BUFFER_SIZE); ep->read_staging_buffer = gpr_slice_malloc(STAGING_BUFFER_SIZE);
*cur = GPR_SLICE_START_PTR(ep->read_staging_buffer); *cur = GPR_SLICE_START_PTR(ep->read_staging_buffer);
*end = GPR_SLICE_END_PTR(ep->read_staging_buffer); *end = GPR_SLICE_END_PTR(ep->read_staging_buffer);
} }
static void call_read_cb(secure_endpoint *ep, gpr_slice *slices, size_t nslices, static void call_read_cb(secure_endpoint *ep, int success) {
grpc_endpoint_cb_status error) {
if (grpc_trace_secure_endpoint) { if (grpc_trace_secure_endpoint) {
size_t i; size_t i;
for (i = 0; i < nslices; i++) { for (i = 0; i < ep->read_buffer->count; i++) {
char *data = gpr_dump_slice(slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII); char *data = gpr_dump_slice(ep->read_buffer->slices[i],
GPR_DUMP_HEX | GPR_DUMP_ASCII);
gpr_log(GPR_DEBUG, "READ %p: %s", ep, data); gpr_log(GPR_DEBUG, "READ %p: %s", ep, data);
gpr_free(data); gpr_free(data);
} }
} }
ep->read_cb(ep->read_user_data, slices, nslices, error); ep->read_buffer = NULL;
secure_endpoint_unref(ep); ep->read_cb->cb(ep->read_cb->cb_arg, success);
SECURE_ENDPOINT_UNREF(ep, "read");
} }
static void on_read(void *user_data, gpr_slice *slices, size_t nslices, static int on_read(void *user_data, int success) {
grpc_endpoint_cb_status error) {
unsigned i; unsigned i;
gpr_uint8 keep_looping = 0; gpr_uint8 keep_looping = 0;
size_t input_buffer_count = 0;
tsi_result result = TSI_OK; tsi_result result = TSI_OK;
secure_endpoint *ep = (secure_endpoint *)user_data; secure_endpoint *ep = (secure_endpoint *)user_data;
gpr_uint8 *cur = GPR_SLICE_START_PTR(ep->read_staging_buffer); gpr_uint8 *cur = GPR_SLICE_START_PTR(ep->read_staging_buffer);
gpr_uint8 *end = GPR_SLICE_END_PTR(ep->read_staging_buffer); gpr_uint8 *end = GPR_SLICE_END_PTR(ep->read_staging_buffer);
if (!success) {
gpr_slice_buffer_reset_and_unref(ep->read_buffer);
return 0;
}
/* TODO(yangg) check error, maybe bail out early */ /* TODO(yangg) check error, maybe bail out early */
for (i = 0; i < nslices; i++) { for (i = 0; i < ep->source_buffer.count; i++) {
gpr_slice encrypted = slices[i]; gpr_slice encrypted = ep->source_buffer.slices[i];
gpr_uint8 *message_bytes = GPR_SLICE_START_PTR(encrypted); gpr_uint8 *message_bytes = GPR_SLICE_START_PTR(encrypted);
size_t message_size = GPR_SLICE_LENGTH(encrypted); size_t message_size = GPR_SLICE_LENGTH(encrypted);
@ -161,7 +190,7 @@ static void on_read(void *user_data, gpr_slice *slices, size_t nslices,
if (cur != GPR_SLICE_START_PTR(ep->read_staging_buffer)) { if (cur != GPR_SLICE_START_PTR(ep->read_staging_buffer)) {
gpr_slice_buffer_add( gpr_slice_buffer_add(
&ep->input_buffer, ep->read_buffer,
gpr_slice_split_head( gpr_slice_split_head(
&ep->read_staging_buffer, &ep->read_staging_buffer,
(size_t)(cur - GPR_SLICE_START_PTR(ep->read_staging_buffer)))); (size_t)(cur - GPR_SLICE_START_PTR(ep->read_staging_buffer))));
@ -169,38 +198,53 @@ static void on_read(void *user_data, gpr_slice *slices, size_t nslices,
/* TODO(yangg) experiment with moving this block after read_cb to see if it /* TODO(yangg) experiment with moving this block after read_cb to see if it
helps latency */ helps latency */
for (i = 0; i < nslices; i++) { gpr_slice_buffer_reset_and_unref(&ep->source_buffer);
gpr_slice_unref(slices[i]);
}
if (result != TSI_OK) { if (result != TSI_OK) {
gpr_slice_buffer_reset_and_unref(&ep->input_buffer); gpr_slice_buffer_reset_and_unref(ep->read_buffer);
call_read_cb(ep, NULL, 0, GRPC_ENDPOINT_CB_ERROR); return 0;
return;
} }
/* The upper level will unref the slices. */
input_buffer_count = ep->input_buffer.count; return 1;
ep->input_buffer.count = 0; }
call_read_cb(ep, ep->input_buffer.slices, input_buffer_count, error);
static void on_read_cb(void *user_data, int success) {
call_read_cb(user_data, on_read(user_data, success));
} }
static void endpoint_notify_on_read(grpc_endpoint *secure_ep, static grpc_endpoint_op_status endpoint_read(grpc_endpoint *secure_ep,
grpc_endpoint_read_cb cb, void *user_data) { gpr_slice_buffer *slices,
grpc_iomgr_closure *cb) {
secure_endpoint *ep = (secure_endpoint *)secure_ep; secure_endpoint *ep = (secure_endpoint *)secure_ep;
int immediate_read_success = -1;
ep->read_cb = cb; ep->read_cb = cb;
ep->read_user_data = user_data; ep->read_buffer = slices;
gpr_slice_buffer_reset_and_unref(ep->read_buffer);
secure_endpoint_ref(ep);
if (ep->leftover_bytes.count) { if (ep->leftover_bytes.count) {
size_t leftover_nslices = ep->leftover_bytes.count; gpr_slice_buffer_swap(&ep->leftover_bytes, &ep->source_buffer);
ep->leftover_bytes.count = 0; GPR_ASSERT(ep->leftover_bytes.count == 0);
on_read(ep, ep->leftover_bytes.slices, leftover_nslices, return on_read(ep, 1) ? GRPC_ENDPOINT_DONE : GRPC_ENDPOINT_ERROR;
GRPC_ENDPOINT_CB_OK);
return;
} }
grpc_endpoint_notify_on_read(ep->wrapped_ep, on_read, ep); SECURE_ENDPOINT_REF(ep, "read");
switch (
grpc_endpoint_read(ep->wrapped_ep, &ep->source_buffer, &ep->on_read)) {
case GRPC_ENDPOINT_DONE:
immediate_read_success = on_read(ep, 1);
break;
case GRPC_ENDPOINT_PENDING:
return GRPC_ENDPOINT_PENDING;
case GRPC_ENDPOINT_ERROR:
immediate_read_success = on_read(ep, 0);
break;
}
GPR_ASSERT(immediate_read_success != -1);
SECURE_ENDPOINT_UNREF(ep, "read");
return immediate_read_success ? GRPC_ENDPOINT_DONE : GRPC_ENDPOINT_ERROR;
} }
static void flush_write_staging_buffer(secure_endpoint *ep, gpr_uint8 **cur, static void flush_write_staging_buffer(secure_endpoint *ep, gpr_uint8 **cur,
@ -211,36 +255,28 @@ static void flush_write_staging_buffer(secure_endpoint *ep, gpr_uint8 **cur,
*end = GPR_SLICE_END_PTR(ep->write_staging_buffer); *end = GPR_SLICE_END_PTR(ep->write_staging_buffer);
} }
static void on_write(void *data, grpc_endpoint_cb_status error) { static grpc_endpoint_op_status endpoint_write(grpc_endpoint *secure_ep,
secure_endpoint *ep = data; gpr_slice_buffer *slices,
ep->write_cb(ep->write_user_data, error); grpc_iomgr_closure *cb) {
secure_endpoint_unref(ep);
}
static grpc_endpoint_write_status endpoint_write(grpc_endpoint *secure_ep,
gpr_slice *slices,
size_t nslices,
grpc_endpoint_write_cb cb,
void *user_data) {
unsigned i; unsigned i;
size_t output_buffer_count = 0;
tsi_result result = TSI_OK; tsi_result result = TSI_OK;
secure_endpoint *ep = (secure_endpoint *)secure_ep; secure_endpoint *ep = (secure_endpoint *)secure_ep;
gpr_uint8 *cur = GPR_SLICE_START_PTR(ep->write_staging_buffer); gpr_uint8 *cur = GPR_SLICE_START_PTR(ep->write_staging_buffer);
gpr_uint8 *end = GPR_SLICE_END_PTR(ep->write_staging_buffer); gpr_uint8 *end = GPR_SLICE_END_PTR(ep->write_staging_buffer);
grpc_endpoint_write_status status;
GPR_ASSERT(ep->output_buffer.count == 0); gpr_slice_buffer_reset_and_unref(&ep->output_buffer);
if (grpc_trace_secure_endpoint) { if (grpc_trace_secure_endpoint) {
for (i = 0; i < nslices; i++) { for (i = 0; i < slices->count; i++) {
char *data = gpr_dump_slice(slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII); char *data =
gpr_dump_slice(slices->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
gpr_log(GPR_DEBUG, "WRITE %p: %s", ep, data); gpr_log(GPR_DEBUG, "WRITE %p: %s", ep, data);
gpr_free(data); gpr_free(data);
} }
} }
for (i = 0; i < nslices; i++) { for (i = 0; i < slices->count; i++) {
gpr_slice plain = slices[i]; gpr_slice plain = slices->slices[i];
gpr_uint8 *message_bytes = GPR_SLICE_START_PTR(plain); gpr_uint8 *message_bytes = GPR_SLICE_START_PTR(plain);
size_t message_size = GPR_SLICE_LENGTH(plain); size_t message_size = GPR_SLICE_LENGTH(plain);
while (message_size > 0) { while (message_size > 0) {
@ -290,29 +326,13 @@ static grpc_endpoint_write_status endpoint_write(grpc_endpoint *secure_ep,
} }
} }
for (i = 0; i < nslices; i++) {
gpr_slice_unref(slices[i]);
}
if (result != TSI_OK) { if (result != TSI_OK) {
/* TODO(yangg) do different things according to the error type? */ /* TODO(yangg) do different things according to the error type? */
gpr_slice_buffer_reset_and_unref(&ep->output_buffer); gpr_slice_buffer_reset_and_unref(&ep->output_buffer);
return GRPC_ENDPOINT_WRITE_ERROR; return GRPC_ENDPOINT_ERROR;
} }
/* clear output_buffer and let the lower level handle its slices. */ return grpc_endpoint_write(ep->wrapped_ep, &ep->output_buffer, cb);
output_buffer_count = ep->output_buffer.count;
ep->output_buffer.count = 0;
ep->write_cb = cb;
ep->write_user_data = user_data;
/* Need to keep the endpoint alive across a transport */
secure_endpoint_ref(ep);
status = grpc_endpoint_write(ep->wrapped_ep, ep->output_buffer.slices,
output_buffer_count, on_write, ep);
if (status != GRPC_ENDPOINT_WRITE_PENDING) {
secure_endpoint_unref(ep);
}
return status;
} }
static void endpoint_shutdown(grpc_endpoint *secure_ep) { static void endpoint_shutdown(grpc_endpoint *secure_ep) {
@ -320,9 +340,9 @@ static void endpoint_shutdown(grpc_endpoint *secure_ep) {
grpc_endpoint_shutdown(ep->wrapped_ep); grpc_endpoint_shutdown(ep->wrapped_ep);
} }
static void endpoint_unref(grpc_endpoint *secure_ep) { static void endpoint_destroy(grpc_endpoint *secure_ep) {
secure_endpoint *ep = (secure_endpoint *)secure_ep; secure_endpoint *ep = (secure_endpoint *)secure_ep;
secure_endpoint_unref(ep); SECURE_ENDPOINT_UNREF(ep, "destroy");
} }
static void endpoint_add_to_pollset(grpc_endpoint *secure_ep, static void endpoint_add_to_pollset(grpc_endpoint *secure_ep,
@ -343,9 +363,9 @@ static char *endpoint_get_peer(grpc_endpoint *secure_ep) {
} }
static const grpc_endpoint_vtable vtable = { static const grpc_endpoint_vtable vtable = {
endpoint_notify_on_read, endpoint_write, endpoint_read, endpoint_write,
endpoint_add_to_pollset, endpoint_add_to_pollset_set, endpoint_add_to_pollset, endpoint_add_to_pollset_set,
endpoint_shutdown, endpoint_unref, endpoint_shutdown, endpoint_destroy,
endpoint_get_peer}; endpoint_get_peer};
grpc_endpoint *grpc_secure_endpoint_create( grpc_endpoint *grpc_secure_endpoint_create(
@ -363,8 +383,10 @@ grpc_endpoint *grpc_secure_endpoint_create(
} }
ep->write_staging_buffer = gpr_slice_malloc(STAGING_BUFFER_SIZE); ep->write_staging_buffer = gpr_slice_malloc(STAGING_BUFFER_SIZE);
ep->read_staging_buffer = gpr_slice_malloc(STAGING_BUFFER_SIZE); ep->read_staging_buffer = gpr_slice_malloc(STAGING_BUFFER_SIZE);
gpr_slice_buffer_init(&ep->input_buffer);
gpr_slice_buffer_init(&ep->output_buffer); gpr_slice_buffer_init(&ep->output_buffer);
gpr_slice_buffer_init(&ep->source_buffer);
ep->read_buffer = NULL;
grpc_iomgr_closure_init(&ep->on_read, on_read_cb, ep);
gpr_mu_init(&ep->protector_mu); gpr_mu_init(&ep->protector_mu);
gpr_ref_init(&ep->ref, 1); gpr_ref_init(&ep->ref, 1);
return &ep->base; return &ep->base;

@ -50,16 +50,17 @@ typedef struct {
grpc_endpoint *wrapped_endpoint; grpc_endpoint *wrapped_endpoint;
grpc_endpoint *secure_endpoint; grpc_endpoint *secure_endpoint;
gpr_slice_buffer left_overs; gpr_slice_buffer left_overs;
gpr_slice_buffer incoming;
gpr_slice_buffer outgoing;
grpc_secure_transport_setup_done_cb cb; grpc_secure_transport_setup_done_cb cb;
void *user_data; void *user_data;
grpc_iomgr_closure on_handshake_data_sent_to_peer;
grpc_iomgr_closure on_handshake_data_received_from_peer;
} grpc_secure_transport_setup; } grpc_secure_transport_setup;
static void on_handshake_data_received_from_peer(void *setup, gpr_slice *slices, static void on_handshake_data_received_from_peer(void *setup, int success);
size_t nslices,
grpc_endpoint_cb_status error);
static void on_handshake_data_sent_to_peer(void *setup, static void on_handshake_data_sent_to_peer(void *setup, int success);
grpc_endpoint_cb_status error);
static void secure_transport_setup_done(grpc_secure_transport_setup *s, static void secure_transport_setup_done(grpc_secure_transport_setup *s,
int is_success) { int is_success) {
@ -78,6 +79,8 @@ static void secure_transport_setup_done(grpc_secure_transport_setup *s,
if (s->handshaker != NULL) tsi_handshaker_destroy(s->handshaker); if (s->handshaker != NULL) tsi_handshaker_destroy(s->handshaker);
if (s->handshake_buffer != NULL) gpr_free(s->handshake_buffer); if (s->handshake_buffer != NULL) gpr_free(s->handshake_buffer);
gpr_slice_buffer_destroy(&s->left_overs); gpr_slice_buffer_destroy(&s->left_overs);
gpr_slice_buffer_destroy(&s->outgoing);
gpr_slice_buffer_destroy(&s->incoming);
GRPC_SECURITY_CONNECTOR_UNREF(s->connector, "secure_transport_setup"); GRPC_SECURITY_CONNECTOR_UNREF(s->connector, "secure_transport_setup");
gpr_free(s); gpr_free(s);
} }
@ -102,6 +105,8 @@ static void on_peer_checked(void *user_data, grpc_security_status status) {
s->secure_endpoint = s->secure_endpoint =
grpc_secure_endpoint_create(protector, s->wrapped_endpoint, grpc_secure_endpoint_create(protector, s->wrapped_endpoint,
s->left_overs.slices, s->left_overs.count); s->left_overs.slices, s->left_overs.count);
s->left_overs.count = 0;
s->left_overs.length = 0;
secure_transport_setup_done(s, 1); secure_transport_setup_done(s, 1);
return; return;
} }
@ -132,7 +137,6 @@ static void send_handshake_bytes_to_peer(grpc_secure_transport_setup *s) {
size_t offset = 0; size_t offset = 0;
tsi_result result = TSI_OK; tsi_result result = TSI_OK;
gpr_slice to_send; gpr_slice to_send;
grpc_endpoint_write_status write_status;
do { do {
size_t to_send_size = s->handshake_buffer_size - offset; size_t to_send_size = s->handshake_buffer_size - offset;
@ -155,28 +159,25 @@ static void send_handshake_bytes_to_peer(grpc_secure_transport_setup *s) {
to_send = to_send =
gpr_slice_from_copied_buffer((const char *)s->handshake_buffer, offset); gpr_slice_from_copied_buffer((const char *)s->handshake_buffer, offset);
gpr_slice_buffer_reset_and_unref(&s->outgoing);
gpr_slice_buffer_add(&s->outgoing, to_send);
/* TODO(klempner,jboeuf): This should probably use the client setup /* TODO(klempner,jboeuf): This should probably use the client setup
deadline */ deadline */
write_status = grpc_endpoint_write(s->wrapped_endpoint, &to_send, 1, switch (grpc_endpoint_write(s->wrapped_endpoint, &s->outgoing,
on_handshake_data_sent_to_peer, s); &s->on_handshake_data_sent_to_peer)) {
if (write_status == GRPC_ENDPOINT_WRITE_ERROR) { case GRPC_ENDPOINT_ERROR:
gpr_log(GPR_ERROR, "Could not send handshake data to peer."); gpr_log(GPR_ERROR, "Could not send handshake data to peer.");
secure_transport_setup_done(s, 0); secure_transport_setup_done(s, 0);
} else if (write_status == GRPC_ENDPOINT_WRITE_DONE) { break;
on_handshake_data_sent_to_peer(s, GRPC_ENDPOINT_CB_OK); case GRPC_ENDPOINT_DONE:
} on_handshake_data_sent_to_peer(s, 1);
} break;
case GRPC_ENDPOINT_PENDING:
static void cleanup_slices(gpr_slice *slices, size_t num_slices) { break;
size_t i;
for (i = 0; i < num_slices; i++) {
gpr_slice_unref(slices[i]);
} }
} }
static void on_handshake_data_received_from_peer( static void on_handshake_data_received_from_peer(void *setup, int success) {
void *setup, gpr_slice *slices, size_t nslices,
grpc_endpoint_cb_status error) {
grpc_secure_transport_setup *s = setup; grpc_secure_transport_setup *s = setup;
size_t consumed_slice_size = 0; size_t consumed_slice_size = 0;
tsi_result result = TSI_OK; tsi_result result = TSI_OK;
@ -184,32 +185,37 @@ static void on_handshake_data_received_from_peer(
size_t num_left_overs; size_t num_left_overs;
int has_left_overs_in_current_slice = 0; int has_left_overs_in_current_slice = 0;
if (error != GRPC_ENDPOINT_CB_OK) { if (!success) {
gpr_log(GPR_ERROR, "Read failed."); gpr_log(GPR_ERROR, "Read failed.");
cleanup_slices(slices, nslices);
secure_transport_setup_done(s, 0); secure_transport_setup_done(s, 0);
return; return;
} }
for (i = 0; i < nslices; i++) { for (i = 0; i < s->incoming.count; i++) {
consumed_slice_size = GPR_SLICE_LENGTH(slices[i]); consumed_slice_size = GPR_SLICE_LENGTH(s->incoming.slices[i]);
result = tsi_handshaker_process_bytes_from_peer( result = tsi_handshaker_process_bytes_from_peer(
s->handshaker, GPR_SLICE_START_PTR(slices[i]), &consumed_slice_size); s->handshaker, GPR_SLICE_START_PTR(s->incoming.slices[i]),
&consumed_slice_size);
if (!tsi_handshaker_is_in_progress(s->handshaker)) break; if (!tsi_handshaker_is_in_progress(s->handshaker)) break;
} }
if (tsi_handshaker_is_in_progress(s->handshaker)) { if (tsi_handshaker_is_in_progress(s->handshaker)) {
/* We may need more data. */ /* We may need more data. */
if (result == TSI_INCOMPLETE_DATA) { if (result == TSI_INCOMPLETE_DATA) {
/* TODO(klempner,jboeuf): This should probably use the client setup switch (grpc_endpoint_read(s->wrapped_endpoint, &s->incoming,
deadline */ &s->on_handshake_data_received_from_peer)) {
grpc_endpoint_notify_on_read(s->wrapped_endpoint, case GRPC_ENDPOINT_DONE:
on_handshake_data_received_from_peer, setup); on_handshake_data_received_from_peer(s, 1);
cleanup_slices(slices, nslices); break;
case GRPC_ENDPOINT_ERROR:
on_handshake_data_received_from_peer(s, 0);
break;
case GRPC_ENDPOINT_PENDING:
break;
}
return; return;
} else { } else {
send_handshake_bytes_to_peer(s); send_handshake_bytes_to_peer(s);
cleanup_slices(slices, nslices);
return; return;
} }
} }
@ -217,42 +223,40 @@ static void on_handshake_data_received_from_peer(
if (result != TSI_OK) { if (result != TSI_OK) {
gpr_log(GPR_ERROR, "Handshake failed with error %s", gpr_log(GPR_ERROR, "Handshake failed with error %s",
tsi_result_to_string(result)); tsi_result_to_string(result));
cleanup_slices(slices, nslices);
secure_transport_setup_done(s, 0); secure_transport_setup_done(s, 0);
return; return;
} }
/* Handshake is done and successful this point. */ /* Handshake is done and successful this point. */
has_left_overs_in_current_slice = has_left_overs_in_current_slice =
(consumed_slice_size < GPR_SLICE_LENGTH(slices[i])); (consumed_slice_size < GPR_SLICE_LENGTH(s->incoming.slices[i]));
num_left_overs = (has_left_overs_in_current_slice ? 1 : 0) + nslices - i - 1; num_left_overs =
(has_left_overs_in_current_slice ? 1 : 0) + s->incoming.count - i - 1;
if (num_left_overs == 0) { if (num_left_overs == 0) {
cleanup_slices(slices, nslices);
check_peer(s); check_peer(s);
return; return;
} }
cleanup_slices(slices, nslices - num_left_overs);
/* Put the leftovers in our buffer (ownership transfered). */ /* Put the leftovers in our buffer (ownership transfered). */
if (has_left_overs_in_current_slice) { if (has_left_overs_in_current_slice) {
gpr_slice_buffer_add(&s->left_overs, gpr_slice_buffer_add(
gpr_slice_split_tail(&slices[i], consumed_slice_size)); &s->left_overs,
gpr_slice_unref(slices[i]); /* split_tail above increments refcount. */ gpr_slice_split_tail(&s->incoming.slices[i], consumed_slice_size));
gpr_slice_unref(
s->incoming.slices[i]); /* split_tail above increments refcount. */
} }
gpr_slice_buffer_addn( gpr_slice_buffer_addn(
&s->left_overs, &slices[i + 1], &s->left_overs, &s->incoming.slices[i + 1],
num_left_overs - (size_t)has_left_overs_in_current_slice); num_left_overs - (size_t)has_left_overs_in_current_slice);
check_peer(s); check_peer(s);
} }
/* If setup is NULL, the setup is done. */ /* If setup is NULL, the setup is done. */
static void on_handshake_data_sent_to_peer(void *setup, static void on_handshake_data_sent_to_peer(void *setup, int success) {
grpc_endpoint_cb_status error) {
grpc_secure_transport_setup *s = setup; grpc_secure_transport_setup *s = setup;
/* Make sure that write is OK. */ /* Make sure that write is OK. */
if (error != GRPC_ENDPOINT_CB_OK) { if (!success) {
gpr_log(GPR_ERROR, "Write failed with error %d.", error); gpr_log(GPR_ERROR, "Write failed.");
if (setup != NULL) secure_transport_setup_done(s, 0); if (setup != NULL) secure_transport_setup_done(s, 0);
return; return;
} }
@ -261,8 +265,17 @@ static void on_handshake_data_sent_to_peer(void *setup,
if (tsi_handshaker_is_in_progress(s->handshaker)) { if (tsi_handshaker_is_in_progress(s->handshaker)) {
/* TODO(klempner,jboeuf): This should probably use the client setup /* TODO(klempner,jboeuf): This should probably use the client setup
deadline */ deadline */
grpc_endpoint_notify_on_read(s->wrapped_endpoint, switch (grpc_endpoint_read(s->wrapped_endpoint, &s->incoming,
on_handshake_data_received_from_peer, setup); &s->on_handshake_data_received_from_peer)) {
case GRPC_ENDPOINT_ERROR:
on_handshake_data_received_from_peer(s, 0);
break;
case GRPC_ENDPOINT_PENDING:
break;
case GRPC_ENDPOINT_DONE:
on_handshake_data_received_from_peer(s, 1);
break;
}
} else { } else {
check_peer(s); check_peer(s);
} }
@ -288,6 +301,12 @@ void grpc_setup_secure_transport(grpc_security_connector *connector,
s->wrapped_endpoint = nonsecure_endpoint; s->wrapped_endpoint = nonsecure_endpoint;
s->user_data = user_data; s->user_data = user_data;
s->cb = cb; s->cb = cb;
grpc_iomgr_closure_init(&s->on_handshake_data_sent_to_peer,
on_handshake_data_sent_to_peer, s);
grpc_iomgr_closure_init(&s->on_handshake_data_received_from_peer,
on_handshake_data_received_from_peer, s);
gpr_slice_buffer_init(&s->left_overs); gpr_slice_buffer_init(&s->left_overs);
gpr_slice_buffer_init(&s->outgoing);
gpr_slice_buffer_init(&s->incoming);
send_handshake_bytes_to_peer(s); send_handshake_bytes_to_peer(s);
} }

@ -207,3 +207,25 @@ void gpr_slice_buffer_move_into(gpr_slice_buffer *src, gpr_slice_buffer *dst) {
src->count = 0; src->count = 0;
src->length = 0; src->length = 0;
} }
void gpr_slice_buffer_trim_end(gpr_slice_buffer *sb, size_t n) {
GPR_ASSERT(n <= sb->length);
sb->length -= n;
for (;;) {
size_t idx = sb->count - 1;
gpr_slice slice = sb->slices[idx];
size_t slice_len = GPR_SLICE_LENGTH(slice);
if (slice_len > n) {
sb->slices[idx] = gpr_slice_sub_no_ref(slice, 0, slice_len - n);
return;
} else if (slice_len == n) {
gpr_slice_unref(slice);
sb->count = idx;
return;
} else {
gpr_slice_unref(slice);
n -= slice_len;
sb->count = idx;
}
}
}

@ -214,6 +214,8 @@ typedef struct {
grpc_chttp2_hpack_compressor hpack_compressor; grpc_chttp2_hpack_compressor hpack_compressor;
/** is this a client? */ /** is this a client? */
gpr_uint8 is_client; gpr_uint8 is_client;
/** callback for when writing is done */
grpc_iomgr_closure done_cb;
} grpc_chttp2_transport_writing; } grpc_chttp2_transport_writing;
struct grpc_chttp2_transport_parsing { struct grpc_chttp2_transport_parsing {
@ -331,6 +333,11 @@ struct grpc_chttp2_transport {
grpc_iomgr_closure writing_action; grpc_iomgr_closure writing_action;
/** closure to start reading from the endpoint */ /** closure to start reading from the endpoint */
grpc_iomgr_closure reading_action; grpc_iomgr_closure reading_action;
/** closure to finish reading from the endpoint */
grpc_iomgr_closure recv_data;
/** incoming read bytes */
gpr_slice_buffer read_buffer;
/** address to place a newly accepted stream - set and unset by /** address to place a newly accepted stream - set and unset by
grpc_chttp2_parsing_accept_stream; used by init_stream to grpc_chttp2_parsing_accept_stream; used by init_stream to
@ -463,8 +470,7 @@ int grpc_chttp2_unlocking_check_writes(grpc_chttp2_transport_global *global,
grpc_chttp2_transport_writing *writing); grpc_chttp2_transport_writing *writing);
void grpc_chttp2_perform_writes( void grpc_chttp2_perform_writes(
grpc_chttp2_transport_writing *transport_writing, grpc_endpoint *endpoint); grpc_chttp2_transport_writing *transport_writing, grpc_endpoint *endpoint);
void grpc_chttp2_terminate_writing( void grpc_chttp2_terminate_writing(void *transport_writing, int success);
grpc_chttp2_transport_writing *transport_writing, int success);
void grpc_chttp2_cleanup_writing(grpc_chttp2_transport_global *global, void grpc_chttp2_cleanup_writing(grpc_chttp2_transport_global *global,
grpc_chttp2_transport_writing *writing); grpc_chttp2_transport_writing *writing);

@ -37,7 +37,6 @@
#include <grpc/support/log.h> #include <grpc/support/log.h>
static void finalize_outbuf(grpc_chttp2_transport_writing *transport_writing); static void finalize_outbuf(grpc_chttp2_transport_writing *transport_writing);
static void finish_write_cb(void *tw, grpc_endpoint_cb_status write_status);
int grpc_chttp2_unlocking_check_writes( int grpc_chttp2_unlocking_check_writes(
grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_global *transport_global,
@ -165,16 +164,15 @@ void grpc_chttp2_perform_writes(
GPR_ASSERT(transport_writing->outbuf.count > 0); GPR_ASSERT(transport_writing->outbuf.count > 0);
GPR_ASSERT(endpoint); GPR_ASSERT(endpoint);
switch (grpc_endpoint_write(endpoint, transport_writing->outbuf.slices, switch (grpc_endpoint_write(endpoint, &transport_writing->outbuf,
transport_writing->outbuf.count, finish_write_cb, &transport_writing->done_cb)) {
transport_writing)) { case GRPC_ENDPOINT_DONE:
case GRPC_ENDPOINT_WRITE_DONE:
grpc_chttp2_terminate_writing(transport_writing, 1); grpc_chttp2_terminate_writing(transport_writing, 1);
break; break;
case GRPC_ENDPOINT_WRITE_ERROR: case GRPC_ENDPOINT_ERROR:
grpc_chttp2_terminate_writing(transport_writing, 0); grpc_chttp2_terminate_writing(transport_writing, 0);
break; break;
case GRPC_ENDPOINT_WRITE_PENDING: case GRPC_ENDPOINT_PENDING:
break; break;
} }
} }
@ -209,12 +207,6 @@ static void finalize_outbuf(grpc_chttp2_transport_writing *transport_writing) {
} }
} }
static void finish_write_cb(void *tw, grpc_endpoint_cb_status write_status) {
grpc_chttp2_transport_writing *transport_writing = tw;
grpc_chttp2_terminate_writing(transport_writing,
write_status == GRPC_ENDPOINT_CB_OK);
}
void grpc_chttp2_cleanup_writing( void grpc_chttp2_cleanup_writing(
grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_global *transport_global,
grpc_chttp2_transport_writing *transport_writing) { grpc_chttp2_transport_writing *transport_writing) {
@ -243,6 +235,5 @@ void grpc_chttp2_cleanup_writing(
grpc_chttp2_list_add_read_write_state_changed(transport_global, grpc_chttp2_list_add_read_write_state_changed(transport_global,
stream_global); stream_global);
} }
transport_writing->outbuf.count = 0; gpr_slice_buffer_reset_and_unref(&transport_writing->outbuf);
transport_writing->outbuf.length = 0;
} }

@ -91,8 +91,7 @@ static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id,
gpr_uint32 value); gpr_uint32 value);
/** Endpoint callback to process incoming data */ /** Endpoint callback to process incoming data */
static void recv_data(void *tp, gpr_slice *slices, size_t nslices, static void recv_data(void *tp, int success);
grpc_endpoint_cb_status error);
/** Start disconnection chain */ /** Start disconnection chain */
static void drop_connection(grpc_chttp2_transport *t); static void drop_connection(grpc_chttp2_transport *t);
@ -143,6 +142,7 @@ static void destruct_transport(grpc_chttp2_transport *t) {
grpc_chttp2_hpack_compressor_destroy(&t->writing.hpack_compressor); grpc_chttp2_hpack_compressor_destroy(&t->writing.hpack_compressor);
gpr_slice_buffer_destroy(&t->parsing.qbuf); gpr_slice_buffer_destroy(&t->parsing.qbuf);
gpr_slice_buffer_destroy(&t->read_buffer);
grpc_chttp2_hpack_parser_destroy(&t->parsing.hpack_parser); grpc_chttp2_hpack_parser_destroy(&t->parsing.hpack_parser);
grpc_chttp2_goaway_parser_destroy(&t->parsing.goaway_parser); grpc_chttp2_goaway_parser_destroy(&t->parsing.goaway_parser);
@ -255,6 +255,11 @@ static void init_transport(grpc_chttp2_transport *t,
grpc_chttp2_goaway_parser_init(&t->parsing.goaway_parser); grpc_chttp2_goaway_parser_init(&t->parsing.goaway_parser);
grpc_chttp2_hpack_parser_init(&t->parsing.hpack_parser, t->metadata_context); grpc_chttp2_hpack_parser_init(&t->parsing.hpack_parser, t->metadata_context);
grpc_iomgr_closure_init(&t->writing.done_cb, grpc_chttp2_terminate_writing,
&t->writing);
grpc_iomgr_closure_init(&t->recv_data, recv_data, t);
gpr_slice_buffer_init(&t->read_buffer);
if (is_client) { if (is_client) {
gpr_slice_buffer_add( gpr_slice_buffer_add(
&t->global.qbuf, &t->global.qbuf,
@ -502,8 +507,8 @@ static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id,
} }
} }
void grpc_chttp2_terminate_writing( void grpc_chttp2_terminate_writing(void *transport_writing_ptr, int success) {
grpc_chttp2_transport_writing *transport_writing, int success) { grpc_chttp2_transport_writing *transport_writing = transport_writing_ptr;
grpc_chttp2_transport *t = TRANSPORT_FROM_WRITING(transport_writing); grpc_chttp2_transport *t = TRANSPORT_FROM_WRITING(transport_writing);
lock(t); lock(t);
@ -1060,66 +1065,56 @@ static void read_error_locked(grpc_chttp2_transport *t) {
} }
/* tcp read callback */ /* tcp read callback */
static void recv_data(void *tp, gpr_slice *slices, size_t nslices, static void recv_data(void *tp, int success) {
grpc_endpoint_cb_status error) {
grpc_chttp2_transport *t = tp; grpc_chttp2_transport *t = tp;
size_t i; size_t i;
int unref = 0; int unref = 0;
switch (error) { lock(t);
case GRPC_ENDPOINT_CB_SHUTDOWN: i = 0;
case GRPC_ENDPOINT_CB_EOF: GPR_ASSERT(!t->parsing_active);
case GRPC_ENDPOINT_CB_ERROR: if (!t->closed) {
lock(t); t->parsing_active = 1;
/* merge stream lists */
grpc_chttp2_stream_map_move_into(&t->new_stream_map,
&t->parsing_stream_map);
grpc_chttp2_prepare_to_read(&t->global, &t->parsing);
gpr_mu_unlock(&t->mu);
for (; i < t->read_buffer.count &&
grpc_chttp2_perform_read(&t->parsing, t->read_buffer.slices[i]);
i++)
;
gpr_mu_lock(&t->mu);
if (i != t->read_buffer.count) {
drop_connection(t); drop_connection(t);
read_error_locked(t); }
unlock(t); /* merge stream lists */
unref = 1; grpc_chttp2_stream_map_move_into(&t->new_stream_map,
for (i = 0; i < nslices; i++) gpr_slice_unref(slices[i]); &t->parsing_stream_map);
break; t->global.concurrent_stream_count =
case GRPC_ENDPOINT_CB_OK: grpc_chttp2_stream_map_size(&t->parsing_stream_map);
lock(t); if (t->parsing.initial_window_update != 0) {
i = 0; grpc_chttp2_stream_map_for_each(&t->parsing_stream_map,
GPR_ASSERT(!t->parsing_active); update_global_window, t);
if (!t->closed) { t->parsing.initial_window_update = 0;
t->parsing_active = 1; }
/* merge stream lists */ /* handle higher level things */
grpc_chttp2_stream_map_move_into(&t->new_stream_map, grpc_chttp2_publish_reads(&t->global, &t->parsing);
&t->parsing_stream_map); t->parsing_active = 0;
grpc_chttp2_prepare_to_read(&t->global, &t->parsing); }
gpr_mu_unlock(&t->mu); if (!success) {
for (; i < nslices && grpc_chttp2_perform_read(&t->parsing, slices[i]); drop_connection(t);
i++) { read_error_locked(t);
gpr_slice_unref(slices[i]); unref = 1;
} } else if (i == t->read_buffer.count) {
gpr_mu_lock(&t->mu); grpc_chttp2_schedule_closure(&t->global, &t->reading_action, 1);
if (i != nslices) { } else {
drop_connection(t); read_error_locked(t);
} unref = 1;
/* merge stream lists */
grpc_chttp2_stream_map_move_into(&t->new_stream_map,
&t->parsing_stream_map);
t->global.concurrent_stream_count =
grpc_chttp2_stream_map_size(&t->parsing_stream_map);
if (t->parsing.initial_window_update != 0) {
grpc_chttp2_stream_map_for_each(&t->parsing_stream_map,
update_global_window, t);
t->parsing.initial_window_update = 0;
}
/* handle higher level things */
grpc_chttp2_publish_reads(&t->global, &t->parsing);
t->parsing_active = 0;
}
if (i == nslices) {
grpc_chttp2_schedule_closure(&t->global, &t->reading_action, 1);
} else {
read_error_locked(t);
unref = 1;
}
unlock(t);
for (; i < nslices; i++) gpr_slice_unref(slices[i]);
break;
} }
gpr_slice_buffer_reset_and_unref(&t->read_buffer);
unlock(t);
if (unref) { if (unref) {
UNREF_TRANSPORT(t, "recv_data"); UNREF_TRANSPORT(t, "recv_data");
} }
@ -1127,7 +1122,16 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
static void reading_action(void *pt, int iomgr_success_ignored) { static void reading_action(void *pt, int iomgr_success_ignored) {
grpc_chttp2_transport *t = pt; grpc_chttp2_transport *t = pt;
grpc_endpoint_notify_on_read(t->ep, recv_data, t); switch (grpc_endpoint_read(t->ep, &t->read_buffer, &t->recv_data)) {
case GRPC_ENDPOINT_DONE:
recv_data(t, 1);
break;
case GRPC_ENDPOINT_ERROR:
recv_data(t, 0);
break;
case GRPC_ENDPOINT_PENDING:
break;
}
} }
/* /*
@ -1240,5 +1244,6 @@ void grpc_chttp2_transport_start_reading(grpc_transport *transport,
gpr_slice *slices, size_t nslices) { gpr_slice *slices, size_t nslices) {
grpc_chttp2_transport *t = (grpc_chttp2_transport *)transport; grpc_chttp2_transport *t = (grpc_chttp2_transport *)transport;
REF_TRANSPORT(t, "recv_data"); /* matches unref inside recv_data */ REF_TRANSPORT(t, "recv_data"); /* matches unref inside recv_data */
recv_data(t, slices, nslices, GRPC_ENDPOINT_CB_OK); gpr_slice_buffer_addn(&t->read_buffer, slices, nslices);
recv_data(t, 1);
} }

@ -59,7 +59,7 @@ static void thd_func(void *arg) {
gpr_event_set(&a->done_thd, (void *)1); gpr_event_set(&a->done_thd, (void *)1);
} }
static void done_write(void *arg, grpc_endpoint_cb_status status) { static void done_write(void *arg, int success) {
thd_args *a = arg; thd_args *a = arg;
gpr_event_set(&a->done_write, (void *)1); gpr_event_set(&a->done_write, (void *)1);
} }
@ -85,6 +85,8 @@ void grpc_run_bad_client_test(grpc_bad_client_server_side_validator validator,
grpc_mdctx *mdctx = grpc_mdctx_create(); grpc_mdctx *mdctx = grpc_mdctx_create();
gpr_slice slice = gpr_slice slice =
gpr_slice_from_copied_buffer(client_payload, client_payload_length); gpr_slice_from_copied_buffer(client_payload, client_payload_length);
gpr_slice_buffer outgoing;
grpc_iomgr_closure done_write_closure;
hex = gpr_dump(client_payload, client_payload_length, hex = gpr_dump(client_payload, client_payload_length,
GPR_DUMP_HEX | GPR_DUMP_ASCII); GPR_DUMP_HEX | GPR_DUMP_ASCII);
@ -122,14 +124,18 @@ void grpc_run_bad_client_test(grpc_bad_client_server_side_validator validator,
/* Start validator */ /* Start validator */
gpr_thd_new(&id, thd_func, &a, NULL); gpr_thd_new(&id, thd_func, &a, NULL);
gpr_slice_buffer_init(&outgoing);
gpr_slice_buffer_add(&outgoing, slice);
grpc_iomgr_closure_init(&done_write_closure, done_write, &a);
/* Write data */ /* Write data */
switch (grpc_endpoint_write(sfd.client, &slice, 1, done_write, &a)) { switch (grpc_endpoint_write(sfd.client, &outgoing, &done_write_closure)) {
case GRPC_ENDPOINT_WRITE_DONE: case GRPC_ENDPOINT_DONE:
done_write(&a, 1); done_write(&a, 1);
break; break;
case GRPC_ENDPOINT_WRITE_PENDING: case GRPC_ENDPOINT_PENDING:
break; break;
case GRPC_ENDPOINT_WRITE_ERROR: case GRPC_ENDPOINT_ERROR:
done_write(&a, 0); done_write(&a, 0);
break; break;
} }
@ -155,6 +161,7 @@ void grpc_run_bad_client_test(grpc_bad_client_server_side_validator validator,
.type == GRPC_OP_COMPLETE); .type == GRPC_OP_COMPLETE);
grpc_server_destroy(a.server); grpc_server_destroy(a.server);
grpc_completion_queue_destroy(a.cq); grpc_completion_queue_destroy(a.cq);
gpr_slice_buffer_destroy(&outgoing);
grpc_shutdown(); grpc_shutdown();
} }

@ -59,8 +59,7 @@
static grpc_pollset *g_pollset; static grpc_pollset *g_pollset;
size_t count_and_unref_slices(gpr_slice *slices, size_t nslices, size_t count_slices(gpr_slice *slices, size_t nslices, int *current_data) {
int *current_data) {
size_t num_bytes = 0; size_t num_bytes = 0;
size_t i; size_t i;
size_t j; size_t j;
@ -72,7 +71,6 @@ size_t count_and_unref_slices(gpr_slice *slices, size_t nslices,
*current_data = (*current_data + 1) % 256; *current_data = (*current_data + 1) % 256;
} }
num_bytes += GPR_SLICE_LENGTH(slices[i]); num_bytes += GPR_SLICE_LENGTH(slices[i]);
gpr_slice_unref(slices[i]);
} }
return num_bytes; return num_bytes;
} }
@ -121,86 +119,76 @@ struct read_and_write_test_state {
int current_write_data; int current_write_data;
int read_done; int read_done;
int write_done; int write_done;
gpr_slice_buffer incoming;
gpr_slice_buffer outgoing;
grpc_iomgr_closure done_read;
grpc_iomgr_closure done_write;
}; };
static void read_and_write_test_read_handler(void *data, gpr_slice *slices, static void read_and_write_test_read_handler(void *data, int success) {
size_t nslices,
grpc_endpoint_cb_status error) {
struct read_and_write_test_state *state = data; struct read_and_write_test_state *state = data;
GPR_ASSERT(error != GRPC_ENDPOINT_CB_ERROR);
if (error == GRPC_ENDPOINT_CB_SHUTDOWN) {
gpr_log(GPR_INFO, "Read handler shutdown");
gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
state->read_done = 1;
grpc_pollset_kick(g_pollset, NULL);
gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
return;
}
state->bytes_read += state->bytes_read += count_slices(
count_and_unref_slices(slices, nslices, &state->current_read_data); state->incoming.slices, state->incoming.count, &state->current_read_data);
if (state->bytes_read == state->target_bytes) { if (state->bytes_read == state->target_bytes || !success) {
gpr_log(GPR_INFO, "Read handler done"); gpr_log(GPR_INFO, "Read handler done");
gpr_mu_lock(GRPC_POLLSET_MU(g_pollset)); gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
state->read_done = 1; state->read_done = 1 + success;
grpc_pollset_kick(g_pollset, NULL); grpc_pollset_kick(g_pollset, NULL);
gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset)); gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
} else { } else if (success) {
grpc_endpoint_notify_on_read(state->read_ep, switch (grpc_endpoint_read(state->read_ep, &state->incoming,
read_and_write_test_read_handler, data); &state->done_read)) {
case GRPC_ENDPOINT_ERROR:
read_and_write_test_read_handler(data, 0);
break;
case GRPC_ENDPOINT_DONE:
read_and_write_test_read_handler(data, 1);
break;
case GRPC_ENDPOINT_PENDING:
break;
}
} }
} }
static void read_and_write_test_write_handler(void *data, static void read_and_write_test_write_handler(void *data, int success) {
grpc_endpoint_cb_status error) {
struct read_and_write_test_state *state = data; struct read_and_write_test_state *state = data;
gpr_slice *slices = NULL; gpr_slice *slices = NULL;
size_t nslices; size_t nslices;
grpc_endpoint_write_status write_status; grpc_endpoint_op_status write_status;
GPR_ASSERT(error != GRPC_ENDPOINT_CB_ERROR); if (success) {
for (;;) {
gpr_log(GPR_DEBUG, "%s: error=%d", "read_and_write_test_write_handler", /* Need to do inline writes until they don't succeed synchronously or we
error); finish writing */
state->bytes_written += state->current_write_size;
if (error == GRPC_ENDPOINT_CB_SHUTDOWN) { if (state->target_bytes - state->bytes_written <
gpr_log(GPR_INFO, "Write handler shutdown"); state->current_write_size) {
gpr_mu_lock(GRPC_POLLSET_MU(g_pollset)); state->current_write_size = state->target_bytes - state->bytes_written;
state->write_done = 1; }
grpc_pollset_kick(g_pollset, NULL); if (state->current_write_size == 0) {
gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset)); break;
return; }
}
slices = allocate_blocks(state->current_write_size, 8192, &nslices,
for (;;) { &state->current_write_data);
/* Need to do inline writes until they don't succeed synchronously or we gpr_slice_buffer_reset_and_unref(&state->outgoing);
finish writing */ gpr_slice_buffer_addn(&state->outgoing, slices, nslices);
state->bytes_written += state->current_write_size; write_status = grpc_endpoint_write(state->write_ep, &state->outgoing,
if (state->target_bytes - state->bytes_written < &state->done_write);
state->current_write_size) { gpr_log(GPR_DEBUG, "write_status=%d", write_status);
state->current_write_size = state->target_bytes - state->bytes_written; GPR_ASSERT(write_status != GRPC_ENDPOINT_ERROR);
} free(slices);
if (state->current_write_size == 0) { if (write_status == GRPC_ENDPOINT_PENDING) {
break; return;
} }
slices = allocate_blocks(state->current_write_size, 8192, &nslices,
&state->current_write_data);
write_status =
grpc_endpoint_write(state->write_ep, slices, nslices,
read_and_write_test_write_handler, state);
gpr_log(GPR_DEBUG, "write_status=%d", write_status);
GPR_ASSERT(write_status != GRPC_ENDPOINT_WRITE_ERROR);
free(slices);
if (write_status == GRPC_ENDPOINT_WRITE_PENDING) {
return;
} }
GPR_ASSERT(state->bytes_written == state->target_bytes);
} }
GPR_ASSERT(state->bytes_written == state->target_bytes);
gpr_log(GPR_INFO, "Write handler done"); gpr_log(GPR_INFO, "Write handler done");
gpr_mu_lock(GRPC_POLLSET_MU(g_pollset)); gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
state->write_done = 1; state->write_done = 1 + success;
grpc_pollset_kick(g_pollset, NULL); grpc_pollset_kick(g_pollset, NULL);
gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset)); gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
} }
@ -234,16 +222,31 @@ static void read_and_write_test(grpc_endpoint_test_config config,
state.write_done = 0; state.write_done = 0;
state.current_read_data = 0; state.current_read_data = 0;
state.current_write_data = 0; state.current_write_data = 0;
grpc_iomgr_closure_init(&state.done_read, read_and_write_test_read_handler,
&state);
grpc_iomgr_closure_init(&state.done_write, read_and_write_test_write_handler,
&state);
gpr_slice_buffer_init(&state.outgoing);
gpr_slice_buffer_init(&state.incoming);
/* Get started by pretending an initial write completed */ /* Get started by pretending an initial write completed */
/* NOTE: Sets up initial conditions so we can have the same write handler /* NOTE: Sets up initial conditions so we can have the same write handler
for the first iteration as for later iterations. It does the right thing for the first iteration as for later iterations. It does the right thing
even when bytes_written is unsigned. */ even when bytes_written is unsigned. */
state.bytes_written -= state.current_write_size; state.bytes_written -= state.current_write_size;
read_and_write_test_write_handler(&state, GRPC_ENDPOINT_CB_OK); read_and_write_test_write_handler(&state, 1);
grpc_endpoint_notify_on_read(state.read_ep, read_and_write_test_read_handler, switch (
&state); grpc_endpoint_read(state.read_ep, &state.incoming, &state.done_read)) {
case GRPC_ENDPOINT_PENDING:
break;
case GRPC_ENDPOINT_ERROR:
read_and_write_test_read_handler(&state, 0);
break;
case GRPC_ENDPOINT_DONE:
read_and_write_test_read_handler(&state, 1);
break;
}
if (shutdown) { if (shutdown) {
gpr_log(GPR_DEBUG, "shutdown read"); gpr_log(GPR_DEBUG, "shutdown read");
@ -262,6 +265,8 @@ static void read_and_write_test(grpc_endpoint_test_config config,
grpc_endpoint_destroy(state.read_ep); grpc_endpoint_destroy(state.read_ep);
grpc_endpoint_destroy(state.write_ep); grpc_endpoint_destroy(state.write_ep);
gpr_slice_buffer_destroy(&state.outgoing);
gpr_slice_buffer_destroy(&state.incoming);
end_test(config); end_test(config);
} }
@ -272,36 +277,40 @@ struct timeout_test_state {
typedef struct { typedef struct {
int done; int done;
grpc_endpoint *ep; grpc_endpoint *ep;
gpr_slice_buffer incoming;
grpc_iomgr_closure done_read;
} shutdown_during_write_test_state; } shutdown_during_write_test_state;
static void shutdown_during_write_test_read_handler( static void shutdown_during_write_test_read_handler(void *user_data,
void *user_data, gpr_slice *slices, size_t nslices, int success) {
grpc_endpoint_cb_status error) {
size_t i;
shutdown_during_write_test_state *st = user_data; shutdown_during_write_test_state *st = user_data;
for (i = 0; i < nslices; i++) { if (!success) {
gpr_slice_unref(slices[i]);
}
if (error != GRPC_ENDPOINT_CB_OK) {
grpc_endpoint_destroy(st->ep); grpc_endpoint_destroy(st->ep);
gpr_mu_lock(GRPC_POLLSET_MU(g_pollset)); gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
st->done = error; st->done = 1;
grpc_pollset_kick(g_pollset, NULL); grpc_pollset_kick(g_pollset, NULL);
gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset)); gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
} else { } else {
grpc_endpoint_notify_on_read( switch (grpc_endpoint_read(st->ep, &st->incoming, &st->done_read)) {
st->ep, shutdown_during_write_test_read_handler, user_data); case GRPC_ENDPOINT_PENDING:
break;
case GRPC_ENDPOINT_ERROR:
shutdown_during_write_test_read_handler(user_data, 0);
break;
case GRPC_ENDPOINT_DONE:
shutdown_during_write_test_read_handler(user_data, 1);
break;
}
} }
} }
static void shutdown_during_write_test_write_handler( static void shutdown_during_write_test_write_handler(void *user_data,
void *user_data, grpc_endpoint_cb_status error) { int success) {
shutdown_during_write_test_state *st = user_data; shutdown_during_write_test_state *st = user_data;
gpr_log(GPR_INFO, "shutdown_during_write_test_write_handler: error = %d", gpr_log(GPR_INFO, "shutdown_during_write_test_write_handler: success = %d",
error); success);
if (error == 0) { if (success) {
/* This happens about 0.5% of the time when run under TSAN, and is entirely /* This happens about 0.5% of the time when run under TSAN, and is entirely
legitimate, but means we aren't testing the path we think we are. */ legitimate, but means we aren't testing the path we think we are. */
/* TODO(klempner): Change this test to retry the write in that case */ /* TODO(klempner): Change this test to retry the write in that case */
@ -324,6 +333,8 @@ static void shutdown_during_write_test(grpc_endpoint_test_config config,
shutdown_during_write_test_state read_st; shutdown_during_write_test_state read_st;
shutdown_during_write_test_state write_st; shutdown_during_write_test_state write_st;
gpr_slice *slices; gpr_slice *slices;
gpr_slice_buffer outgoing;
grpc_iomgr_closure done_write;
grpc_endpoint_test_fixture f = grpc_endpoint_test_fixture f =
begin_test(config, "shutdown_during_write_test", slice_size); begin_test(config, "shutdown_during_write_test", slice_size);
@ -334,19 +345,26 @@ static void shutdown_during_write_test(grpc_endpoint_test_config config,
read_st.done = 0; read_st.done = 0;
write_st.done = 0; write_st.done = 0;
grpc_endpoint_notify_on_read( grpc_iomgr_closure_init(&done_write, shutdown_during_write_test_write_handler,
read_st.ep, shutdown_during_write_test_read_handler, &read_st); &write_st);
grpc_iomgr_closure_init(&read_st.done_read,
shutdown_during_write_test_read_handler, &read_st);
gpr_slice_buffer_init(&read_st.incoming);
gpr_slice_buffer_init(&outgoing);
GPR_ASSERT(grpc_endpoint_read(read_st.ep, &read_st.incoming,
&read_st.done_read) == GRPC_ENDPOINT_PENDING);
for (size = 1;; size *= 2) { for (size = 1;; size *= 2) {
slices = allocate_blocks(size, 1, &nblocks, &current_data); slices = allocate_blocks(size, 1, &nblocks, &current_data);
switch (grpc_endpoint_write(write_st.ep, slices, nblocks, gpr_slice_buffer_reset_and_unref(&outgoing);
shutdown_during_write_test_write_handler, gpr_slice_buffer_addn(&outgoing, slices, nblocks);
&write_st)) { switch (grpc_endpoint_write(write_st.ep, &outgoing, &done_write)) {
case GRPC_ENDPOINT_WRITE_DONE: case GRPC_ENDPOINT_DONE:
break; break;
case GRPC_ENDPOINT_WRITE_ERROR: case GRPC_ENDPOINT_ERROR:
gpr_log(GPR_ERROR, "error writing"); gpr_log(GPR_ERROR, "error writing");
abort(); abort();
case GRPC_ENDPOINT_WRITE_PENDING: case GRPC_ENDPOINT_PENDING:
grpc_endpoint_shutdown(write_st.ep); grpc_endpoint_shutdown(write_st.ep);
deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10); deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10);
gpr_mu_lock(GRPC_POLLSET_MU(g_pollset)); gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
@ -365,6 +383,8 @@ static void shutdown_during_write_test(grpc_endpoint_test_config config,
} }
gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset)); gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
gpr_free(slices); gpr_free(slices);
gpr_slice_buffer_destroy(&read_st.incoming);
gpr_slice_buffer_destroy(&outgoing);
end_test(config); end_test(config);
return; return;
} }

@ -118,10 +118,12 @@ struct read_socket_state {
grpc_endpoint *ep; grpc_endpoint *ep;
ssize_t read_bytes; ssize_t read_bytes;
ssize_t target_read_bytes; ssize_t target_read_bytes;
gpr_slice_buffer incoming;
grpc_iomgr_closure read_cb;
}; };
static ssize_t count_and_unref_slices(gpr_slice *slices, size_t nslices, static ssize_t count_slices(gpr_slice *slices, size_t nslices,
int *current_data) { int *current_data) {
ssize_t num_bytes = 0; ssize_t num_bytes = 0;
unsigned i, j; unsigned i, j;
unsigned char *buf; unsigned char *buf;
@ -132,31 +134,41 @@ static ssize_t count_and_unref_slices(gpr_slice *slices, size_t nslices,
*current_data = (*current_data + 1) % 256; *current_data = (*current_data + 1) % 256;
} }
num_bytes += GPR_SLICE_LENGTH(slices[i]); num_bytes += GPR_SLICE_LENGTH(slices[i]);
gpr_slice_unref(slices[i]);
} }
return num_bytes; return num_bytes;
} }
static void read_cb(void *user_data, gpr_slice *slices, size_t nslices, static void read_cb(void *user_data, int success) {
grpc_endpoint_cb_status error) {
struct read_socket_state *state = (struct read_socket_state *)user_data; struct read_socket_state *state = (struct read_socket_state *)user_data;
ssize_t read_bytes; ssize_t read_bytes;
int current_data; int current_data;
GPR_ASSERT(error == GRPC_ENDPOINT_CB_OK); GPR_ASSERT(success);
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
current_data = state->read_bytes % 256; current_data = state->read_bytes % 256;
read_bytes = count_and_unref_slices(slices, nslices, &current_data); read_bytes = count_slices(state->incoming.slices, state->incoming.count,
&current_data);
state->read_bytes += read_bytes; state->read_bytes += read_bytes;
gpr_log(GPR_INFO, "Read %d bytes of %d", read_bytes, gpr_log(GPR_INFO, "Read %d bytes of %d", read_bytes,
state->target_read_bytes); state->target_read_bytes);
if (state->read_bytes >= state->target_read_bytes) { if (state->read_bytes >= state->target_read_bytes) {
/* empty */ gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
} else { } else {
grpc_endpoint_notify_on_read(state->ep, read_cb, state); switch (grpc_endpoint_read(state->ep, &state->incoming, &state->read_cb)) {
case GRPC_ENDPOINT_DONE:
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
read_cb(user_data, 1);
break;
case GRPC_ENDPOINT_ERROR:
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
read_cb(user_data, 0);
break;
case GRPC_ENDPOINT_PENDING:
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
break;
}
} }
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
} }
/* Write to a socket, then read from it using the grpc_tcp API. */ /* Write to a socket, then read from it using the grpc_tcp API. */
@ -181,8 +193,19 @@ static void read_test(ssize_t num_bytes, ssize_t slice_size) {
state.ep = ep; state.ep = ep;
state.read_bytes = 0; state.read_bytes = 0;
state.target_read_bytes = written_bytes; state.target_read_bytes = written_bytes;
gpr_slice_buffer_init(&state.incoming);
grpc_iomgr_closure_init(&state.read_cb, read_cb, &state);
grpc_endpoint_notify_on_read(ep, read_cb, &state); switch (grpc_endpoint_read(ep, &state.incoming, &state.read_cb)) {
case GRPC_ENDPOINT_DONE:
read_cb(&state, 1);
break;
case GRPC_ENDPOINT_ERROR:
read_cb(&state, 0);
break;
case GRPC_ENDPOINT_PENDING:
break;
}
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
while (state.read_bytes < state.target_read_bytes) { while (state.read_bytes < state.target_read_bytes) {
@ -192,6 +215,7 @@ static void read_test(ssize_t num_bytes, ssize_t slice_size) {
GPR_ASSERT(state.read_bytes == state.target_read_bytes); GPR_ASSERT(state.read_bytes == state.target_read_bytes);
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
gpr_slice_buffer_destroy(&state.incoming);
grpc_endpoint_destroy(ep); grpc_endpoint_destroy(ep);
} }
@ -218,8 +242,19 @@ static void large_read_test(ssize_t slice_size) {
state.ep = ep; state.ep = ep;
state.read_bytes = 0; state.read_bytes = 0;
state.target_read_bytes = written_bytes; state.target_read_bytes = written_bytes;
gpr_slice_buffer_init(&state.incoming);
grpc_iomgr_closure_init(&state.read_cb, read_cb, &state);
grpc_endpoint_notify_on_read(ep, read_cb, &state); switch (grpc_endpoint_read(ep, &state.incoming, &state.read_cb)) {
case GRPC_ENDPOINT_DONE:
read_cb(&state, 1);
break;
case GRPC_ENDPOINT_ERROR:
read_cb(&state, 0);
break;
case GRPC_ENDPOINT_PENDING:
break;
}
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
while (state.read_bytes < state.target_read_bytes) { while (state.read_bytes < state.target_read_bytes) {
@ -229,6 +264,7 @@ static void large_read_test(ssize_t slice_size) {
GPR_ASSERT(state.read_bytes == state.target_read_bytes); GPR_ASSERT(state.read_bytes == state.target_read_bytes);
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
gpr_slice_buffer_destroy(&state.incoming);
grpc_endpoint_destroy(ep); grpc_endpoint_destroy(ep);
} }
@ -260,8 +296,7 @@ static gpr_slice *allocate_blocks(ssize_t num_bytes, ssize_t slice_size,
return slices; return slices;
} }
static void write_done(void *user_data /* write_socket_state */, static void write_done(void *user_data /* write_socket_state */, int success) {
grpc_endpoint_cb_status error) {
struct write_socket_state *state = (struct write_socket_state *)user_data; struct write_socket_state *state = (struct write_socket_state *)user_data;
gpr_log(GPR_INFO, "Write done callback called"); gpr_log(GPR_INFO, "Write done callback called");
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
@ -336,6 +371,8 @@ static void write_test(ssize_t num_bytes, ssize_t slice_size) {
size_t num_blocks; size_t num_blocks;
gpr_slice *slices; gpr_slice *slices;
int current_data = 0; int current_data = 0;
gpr_slice_buffer outgoing;
grpc_iomgr_closure write_done_closure;
gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(20); gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(20);
gpr_log(GPR_INFO, "Start write test with %d bytes, slice size %d", num_bytes, gpr_log(GPR_INFO, "Start write test with %d bytes, slice size %d", num_bytes,
@ -352,73 +389,21 @@ static void write_test(ssize_t num_bytes, ssize_t slice_size) {
slices = allocate_blocks(num_bytes, slice_size, &num_blocks, &current_data); slices = allocate_blocks(num_bytes, slice_size, &num_blocks, &current_data);
if (grpc_endpoint_write(ep, slices, num_blocks, write_done, &state) == gpr_slice_buffer_init(&outgoing);
GRPC_ENDPOINT_WRITE_DONE) { gpr_slice_buffer_addn(&outgoing, slices, num_blocks);
/* Write completed immediately */ grpc_iomgr_closure_init(&write_done_closure, write_done, &state);
read_bytes = drain_socket(sv[0]);
GPR_ASSERT(read_bytes == num_bytes);
} else {
drain_socket_blocking(sv[0], num_bytes, num_bytes);
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
for (;;) {
grpc_pollset_worker worker;
if (state.write_done) {
break;
}
grpc_pollset_work(&g_pollset, &worker, deadline);
}
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
}
grpc_endpoint_destroy(ep);
gpr_free(slices);
}
static void read_done_for_write_error(void *ud, gpr_slice *slices,
size_t nslices,
grpc_endpoint_cb_status error) {
GPR_ASSERT(error != GRPC_ENDPOINT_CB_OK);
GPR_ASSERT(nslices == 0);
}
/* Write to a socket using the grpc_tcp API, then drain it directly.
Note that if the write does not complete immediately we need to drain the
socket in parallel with the read. */
static void write_error_test(ssize_t num_bytes, ssize_t slice_size) {
int sv[2];
grpc_endpoint *ep;
struct write_socket_state state;
size_t num_blocks;
gpr_slice *slices;
int current_data = 0;
grpc_pollset_worker worker;
gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(20);
gpr_log(GPR_INFO, "Start write error test with %d bytes, slice size %d",
num_bytes, slice_size);
create_sockets(sv);
ep = grpc_tcp_create(grpc_fd_create(sv[1], "write_error_test"), switch (grpc_endpoint_write(ep, &outgoing, &write_done_closure)) {
GRPC_TCP_DEFAULT_READ_SLICE_SIZE, "test"); case GRPC_ENDPOINT_DONE:
grpc_endpoint_add_to_pollset(ep, &g_pollset);
close(sv[0]);
state.ep = ep;
state.write_done = 0;
slices = allocate_blocks(num_bytes, slice_size, &num_blocks, &current_data);
switch (grpc_endpoint_write(ep, slices, num_blocks, write_done, &state)) {
case GRPC_ENDPOINT_WRITE_DONE:
case GRPC_ENDPOINT_WRITE_ERROR:
/* Write completed immediately */ /* Write completed immediately */
read_bytes = drain_socket(sv[0]);
GPR_ASSERT(read_bytes == num_bytes);
break; break;
case GRPC_ENDPOINT_WRITE_PENDING: case GRPC_ENDPOINT_PENDING:
grpc_endpoint_notify_on_read(ep, read_done_for_write_error, NULL); drain_socket_blocking(sv[0], num_bytes, num_bytes);
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
for (;;) { for (;;) {
grpc_pollset_worker worker;
if (state.write_done) { if (state.write_done) {
break; break;
} }
@ -426,10 +411,14 @@ static void write_error_test(ssize_t num_bytes, ssize_t slice_size) {
} }
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
break; break;
case GRPC_ENDPOINT_ERROR:
gpr_log(GPR_ERROR, "endpoint got error");
abort();
} }
gpr_slice_buffer_destroy(&outgoing);
grpc_endpoint_destroy(ep); grpc_endpoint_destroy(ep);
free(slices); gpr_free(slices);
} }
void run_tests(void) { void run_tests(void) {
@ -448,10 +437,6 @@ void run_tests(void) {
write_test(100000, 1); write_test(100000, 1);
write_test(100000, 137); write_test(100000, 137);
for (i = 1; i < 1000; i = GPR_MAX(i + 1, i * 5 / 4)) {
write_error_test(40320, i);
}
for (i = 1; i < 1000; i = GPR_MAX(i + 1, i * 5 / 4)) { for (i = 1; i < 1000; i = GPR_MAX(i + 1, i * 5 / 4)) {
write_test(40320, i); write_test(40320, i);
} }

@ -135,62 +135,26 @@ static grpc_endpoint_test_config configs[] = {
secure_endpoint_create_fixture_tcp_socketpair_leftover, clean_up}, secure_endpoint_create_fixture_tcp_socketpair_leftover, clean_up},
}; };
static void verify_leftover(void *user_data, gpr_slice *slices, size_t nslices,
grpc_endpoint_cb_status error) {
gpr_slice s =
gpr_slice_from_copied_string("hello world 12345678900987654321");
GPR_ASSERT(error == GRPC_ENDPOINT_CB_OK);
GPR_ASSERT(nslices == 1);
GPR_ASSERT(0 == gpr_slice_cmp(s, slices[0]));
gpr_slice_unref(slices[0]);
gpr_slice_unref(s);
*(int *)user_data = 1;
}
static void test_leftover(grpc_endpoint_test_config config, size_t slice_size) { static void test_leftover(grpc_endpoint_test_config config, size_t slice_size) {
grpc_endpoint_test_fixture f = config.create_fixture(slice_size); grpc_endpoint_test_fixture f = config.create_fixture(slice_size);
int verified = 0; gpr_slice_buffer incoming;
gpr_slice s =
gpr_slice_from_copied_string("hello world 12345678900987654321");
gpr_log(GPR_INFO, "Start test left over"); gpr_log(GPR_INFO, "Start test left over");
grpc_endpoint_notify_on_read(f.client_ep, verify_leftover, &verified); gpr_slice_buffer_init(&incoming);
GPR_ASSERT(verified == 1); GPR_ASSERT(grpc_endpoint_read(f.client_ep, &incoming, NULL) ==
GRPC_ENDPOINT_DONE);
GPR_ASSERT(incoming.count == 1);
GPR_ASSERT(0 == gpr_slice_cmp(s, incoming.slices[0]));
grpc_endpoint_shutdown(f.client_ep); grpc_endpoint_shutdown(f.client_ep);
grpc_endpoint_shutdown(f.server_ep); grpc_endpoint_shutdown(f.server_ep);
grpc_endpoint_destroy(f.client_ep); grpc_endpoint_destroy(f.client_ep);
grpc_endpoint_destroy(f.server_ep); grpc_endpoint_destroy(f.server_ep);
clean_up();
}
static void destroy_early(void *user_data, gpr_slice *slices, size_t nslices,
grpc_endpoint_cb_status error) {
grpc_endpoint_test_fixture *f = user_data;
gpr_slice s =
gpr_slice_from_copied_string("hello world 12345678900987654321");
GPR_ASSERT(error == GRPC_ENDPOINT_CB_OK);
GPR_ASSERT(nslices == 1);
grpc_endpoint_shutdown(f->client_ep);
grpc_endpoint_destroy(f->client_ep);
GPR_ASSERT(0 == gpr_slice_cmp(s, slices[0]));
gpr_slice_unref(slices[0]);
gpr_slice_unref(s); gpr_slice_unref(s);
} gpr_slice_buffer_destroy(&incoming);
/* test which destroys the ep before finishing reading */
static void test_destroy_ep_early(grpc_endpoint_test_config config,
size_t slice_size) {
grpc_endpoint_test_fixture f = config.create_fixture(slice_size);
gpr_log(GPR_INFO, "Start test destroy early");
grpc_endpoint_notify_on_read(f.client_ep, destroy_early, &f);
grpc_endpoint_shutdown(f.server_ep);
grpc_endpoint_destroy(f.server_ep);
clean_up(); clean_up();
} }
@ -203,7 +167,6 @@ int main(int argc, char **argv) {
grpc_pollset_init(&g_pollset); grpc_pollset_init(&g_pollset);
grpc_endpoint_tests(configs[0], &g_pollset); grpc_endpoint_tests(configs[0], &g_pollset);
test_leftover(configs[1], 1); test_leftover(configs[1], 1);
test_destroy_ep_early(configs[1], 1);
grpc_pollset_shutdown(&g_pollset, destroy_pollset, &g_pollset); grpc_pollset_shutdown(&g_pollset, destroy_pollset, &g_pollset);
grpc_iomgr_shutdown(); grpc_iomgr_shutdown();

Loading…
Cancel
Save