Update Windows to new endpoint API

Also solve an infinite recursion in chttp2_transport
pull/3025/head
Craig Tiller, 10 years ago
parent 592e7f2dd0
commit ec9acabb4f

Changed files:
  src/core/iomgr/tcp_windows.c           (124 changes)
  src/core/transport/chttp2/internal.h   (2 changes)
  src/core/transport/chttp2_transport.c  (64 changes)
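For orientation: the hunks below move the Windows endpoint from the old callback-registration API (grpc_endpoint_notify_on_read / grpc_endpoint_write_cb) to operations that can complete synchronously. A minimal sketch of the contract the new code targets, inferred from the diff itself; the actual gRPC headers are not shown here, so treat the exact shapes as assumptions:

    /* Tri-state result for endpoint operations (names appear in the diff). */
    typedef enum {
      GRPC_ENDPOINT_DONE,    /* completed synchronously; closure is NOT run */
      GRPC_ENDPOINT_ERROR,   /* failed synchronously; closure is NOT run */
      GRPC_ENDPOINT_PENDING  /* queued; closure fires later with a success flag */
    } grpc_endpoint_op_status;

    /* Closure shape as used below (cb->cb(cb->cb_arg, success)); the real
       struct may carry more fields. */
    typedef struct grpc_iomgr_closure {
      void (*cb)(void *cb_arg, int success);
      void *cb_arg;
    } grpc_iomgr_closure;

    typedef struct grpc_endpoint grpc_endpoint;       /* opaque here */
    typedef struct gpr_slice_buffer gpr_slice_buffer; /* opaque here */

    /* Reads append into the caller's slice buffer; writes consume a
       caller-owned slice buffer. Assumed from win_read/win_write below. */
    grpc_endpoint_op_status grpc_endpoint_read(grpc_endpoint *ep,
                                               gpr_slice_buffer *slices,
                                               grpc_iomgr_closure *cb);
    grpc_endpoint_op_status grpc_endpoint_write(grpc_endpoint *ep,
                                                gpr_slice_buffer *slices,
                                                grpc_iomgr_closure *cb);

The key property: a DONE or ERROR return replaces a synchronous callback invocation, which is what lets callers drive reads in a loop instead of recursing.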

--- a/src/core/iomgr/tcp_windows.c
+++ b/src/core/iomgr/tcp_windows.c
@@ -82,13 +82,11 @@ typedef struct grpc_tcp {
   /* Refcounting how many operations are in progress. */
   gpr_refcount refcount;

-  grpc_endpoint_read_cb read_cb;
-  void *read_user_data;
+  grpc_iomgr_closure *read_cb;
+  grpc_iomgr_closure *write_cb;
   gpr_slice read_slice;
-
-  grpc_endpoint_write_cb write_cb;
-  void *write_user_data;
-  gpr_slice_buffer write_slices;
+  gpr_slice_buffer *write_slices;
+  gpr_slice_buffer *read_slices;

   /* The IO Completion Port runs from another thread. We need some mechanism
      to protect ourselves when requesting a shutdown. */
@@ -102,7 +100,6 @@ static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); }
 static void tcp_unref(grpc_tcp *tcp) {
   if (gpr_unref(&tcp->refcount)) {
-    gpr_slice_buffer_destroy(&tcp->write_slices);
     grpc_winsocket_orphan(tcp->socket);
     gpr_mu_destroy(&tcp->mu);
     gpr_free(tcp->peer_string);
@@ -111,21 +108,16 @@ static void tcp_unref(grpc_tcp *tcp) {
 }

 /* Asynchronous callback from the IOCP, or the background thread. */
-static void on_read(void *tcpp, int from_iocp) {
-  grpc_tcp *tcp = (grpc_tcp *)tcpp;
+static int on_read(grpc_tcp *tcp, int from_iocp) {
   grpc_winsocket *socket = tcp->socket;
   gpr_slice sub;
   gpr_slice *slice = NULL;
   size_t nslices = 0;
-  grpc_endpoint_cb_status status;
-  grpc_endpoint_read_cb cb;
+  int success;
   grpc_winsocket_callback_info *info = &socket->read_info;
-  void *opaque = tcp->read_user_data;
   int do_abort = 0;

   gpr_mu_lock(&tcp->mu);
-  cb = tcp->read_cb;
-  tcp->read_cb = NULL;
   if (!from_iocp || tcp->shutting_down) {
     /* If we are here with from_iocp set to true, it means we got raced to
        shutting down the endpoint. No actual abort callback will happen
@@ -140,8 +132,7 @@ static void on_read(void *tcpp, int from_iocp) {
       gpr_slice_unref(tcp->read_slice);
     }
     tcp_unref(tcp);
-    if (cb) cb(opaque, NULL, 0, GRPC_ENDPOINT_CB_SHUTDOWN);
-    return;
+    return 0;
   }

   GPR_ASSERT(tcp->socket->read_info.outstanding);
@@ -152,27 +143,33 @@ static void on_read(void *tcpp, int from_iocp) {
       gpr_log(GPR_ERROR, "ReadFile overlapped error: %s", utf8_message);
       gpr_free(utf8_message);
     }
-    status = GRPC_ENDPOINT_CB_ERROR;
+    success = 0;
   } else {
     if (info->bytes_transfered != 0) {
       sub = gpr_slice_sub_no_ref(tcp->read_slice, 0, info->bytes_transfered);
-      status = GRPC_ENDPOINT_CB_OK;
-      slice = &sub;
-      nslices = 1;
+      gpr_slice_buffer_add(tcp->read_slices, sub);
+      success = 1;
     } else {
       gpr_slice_unref(tcp->read_slice);
-      status = GRPC_ENDPOINT_CB_EOF;
+      success = 0;
     }
   }

   tcp->socket->read_info.outstanding = 0;

+  return success;
+}
+
+static void on_read_cb(void *tcpp, int from_iocp) {
+  grpc_tcp *tcp = tcpp;
+  grpc_iomgr_closure *cb = tcp->read_cb;
+  int success = on_read(tcp, from_iocp);
+  tcp->read_cb = NULL;
   tcp_unref(tcp);
-  cb(opaque, slice, nslices, status);
+  cb->cb(cb->cb_arg, success);
 }

-static void win_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb,
-                               void *arg) {
+static grpc_endpoint_op_status win_read(grpc_endpoint *ep, gpr_slice_buffer *read_slices, grpc_iomgr_closure *cb) {
   grpc_tcp *tcp = (grpc_tcp *)ep;
   grpc_winsocket *handle = tcp->socket;
   grpc_winsocket_callback_info *info = &handle->read_info;
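The hunk above is the heart of the Windows change: the old on_read both consumed the IOCP completion and invoked the user callback, so even a synchronous completion had to go through callback dispatch (and could recurse). It is now split into a status-returning worker (on_read) that win_read can call inline, and a thin IOCP-side wrapper (on_read_cb) that fires the stored closure. A stripped-down sketch of that split, with hypothetical names standing in for the gRPC types:

    /* Hypothetical miniature of the on_read / on_read_cb split. */
    typedef struct {
      void (*cb)(void *arg, int success);
      void *arg;
    } closure;

    typedef struct {
      closure *read_cb; /* stored only while an async read is in flight */
      /* ... socket handle, slice buffers, refcount ... */
    } conn;

    /* Worker: consume the completed read, report success. Never calls back,
       so a synchronous caller can just use the return value. */
    static int finish_read(conn *c, int from_iocp) {
      (void)c;
      (void)from_iocp;
      /* ... append received bytes to the caller's slice buffer ... */
      return 1; /* 0 on error or EOF */
    }

    /* Async entry point, run by the IOCP thread for PENDING reads only. */
    static void finish_read_cb(void *connp, int from_iocp) {
      conn *c = connp;
      closure *cb = c->read_cb;
      int success = finish_read(c, from_iocp);
      c->read_cb = NULL;
      cb->cb(cb->arg, success);
    }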
@@ -183,13 +180,11 @@ static void win_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb,
   GPR_ASSERT(!tcp->socket->read_info.outstanding);
   if (tcp->shutting_down) {
-    cb(arg, NULL, 0, GRPC_ENDPOINT_CB_SHUTDOWN);
-    return;
+    return GRPC_ENDPOINT_ERROR;
   }
-  tcp_ref(tcp);

   tcp->socket->read_info.outstanding = 1;
   tcp->read_cb = cb;
-  tcp->read_user_data = arg;
+  tcp->read_slices = read_slices;

   tcp->read_slice = gpr_slice_malloc(8192);
@@ -204,9 +199,8 @@ static void win_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb,
   /* Did we get data immediately ? Yay. */
   if (info->wsa_error != WSAEWOULDBLOCK) {
     info->bytes_transfered = bytes_read;
-    /* This might heavily recurse. */
-    on_read(tcp, 1);
-    return;
+    gpr_log(GPR_DEBUG, "immread: %d bytes", bytes_read);
+    return on_read(tcp, 1) ? GRPC_ENDPOINT_DONE : GRPC_ENDPOINT_ERROR;
   }

   /* Otherwise, let's retry, by queuing a read. */
@@ -218,12 +212,14 @@ static void win_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb,
     int wsa_error = WSAGetLastError();
     if (wsa_error != WSA_IO_PENDING) {
       info->wsa_error = wsa_error;
-      on_read(tcp, 1);
-      return;
+      gpr_log(GPR_DEBUG, "immread: err=%d", wsa_error);
+      return on_read(tcp, 1) ? GRPC_ENDPOINT_DONE : GRPC_ENDPOINT_ERROR;
     }
   }

-  grpc_socket_notify_on_read(tcp->socket, on_read, tcp);
+  tcp_ref(tcp);
+  grpc_socket_notify_on_read(tcp->socket, on_read_cb, tcp);
+  return GRPC_ENDPOINT_PENDING;
 }

 /* Asynchronous callback from the IOCP, or the background thread. */
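Two details worth noting in the immediate-completion paths above: the "immread" debug logs make synchronous completions visible, and tcp_ref(tcp) now happens only on the path that queues an IOCP notification and returns GRPC_ENDPOINT_PENDING. Both synchronous outcomes return DONE or ERROR straight from on_read, so the "This might heavily recurse" re-entry of the old code is gone; the caller sees a return value instead of a nested callback.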
@@ -231,9 +227,8 @@ static void on_write(void *tcpp, int from_iocp) {
   grpc_tcp *tcp = (grpc_tcp *)tcpp;
   grpc_winsocket *handle = tcp->socket;
   grpc_winsocket_callback_info *info = &handle->write_info;
-  grpc_endpoint_cb_status status = GRPC_ENDPOINT_CB_OK;
-  grpc_endpoint_write_cb cb;
-  void *opaque = tcp->write_user_data;
+  grpc_iomgr_closure *cb;
+  int success;
   int do_abort = 0;

   gpr_mu_lock(&tcp->mu);
@@ -250,10 +245,11 @@ static void on_write(void *tcpp, int from_iocp) {
   if (do_abort) {
     if (from_iocp) {
       tcp->socket->write_info.outstanding = 0;
-      gpr_slice_buffer_reset_and_unref(&tcp->write_slices);
     }
     tcp_unref(tcp);
-    if (cb) cb(opaque, GRPC_ENDPOINT_CB_SHUTDOWN);
+    if (cb) {
+      cb->cb(cb->cb_arg, 0);
+    }
     return;
   }
@@ -265,23 +261,22 @@ static void on_write(void *tcpp, int from_iocp) {
       gpr_log(GPR_ERROR, "WSASend overlapped error: %s", utf8_message);
       gpr_free(utf8_message);
     }
-    status = GRPC_ENDPOINT_CB_ERROR;
+    success = 0;
   } else {
-    GPR_ASSERT(info->bytes_transfered == tcp->write_slices.length);
+    GPR_ASSERT(info->bytes_transfered == tcp->write_slices->length);
+    success = 1;
   }

-  gpr_slice_buffer_reset_and_unref(&tcp->write_slices);
   tcp->socket->write_info.outstanding = 0;

   tcp_unref(tcp);
-  cb(opaque, status);
+  cb->cb(cb->cb_arg, success);
 }

 /* Initiates a write. */
-static grpc_endpoint_write_status win_write(grpc_endpoint *ep,
-                                            gpr_slice *slices, size_t nslices,
-                                            grpc_endpoint_write_cb cb,
-                                            void *arg) {
+static grpc_endpoint_op_status win_write(grpc_endpoint *ep,
+                                         gpr_slice_buffer *slices,
+                                         grpc_iomgr_closure *cb) {
   grpc_tcp *tcp = (grpc_tcp *)ep;
   grpc_winsocket *socket = tcp->socket;
   grpc_winsocket_callback_info *info = &socket->write_info;
@@ -294,28 +289,26 @@ static grpc_endpoint_write_status win_write(grpc_endpoint *ep,
   GPR_ASSERT(!tcp->socket->write_info.outstanding);
   if (tcp->shutting_down) {
-    return GRPC_ENDPOINT_WRITE_ERROR;
+    return GRPC_ENDPOINT_ERROR;
   }
   tcp_ref(tcp);

   tcp->socket->write_info.outstanding = 1;
   tcp->write_cb = cb;
-  tcp->write_user_data = arg;
-
-  gpr_slice_buffer_addn(&tcp->write_slices, slices, nslices);
+  tcp->write_slices = slices;

-  if (tcp->write_slices.count > GPR_ARRAY_SIZE(local_buffers)) {
-    buffers = (WSABUF *)gpr_malloc(sizeof(WSABUF) * tcp->write_slices.count);
+  if (tcp->write_slices->count > GPR_ARRAY_SIZE(local_buffers)) {
+    buffers = (WSABUF *)gpr_malloc(sizeof(WSABUF) * tcp->write_slices->count);
     allocated = buffers;
   }

-  for (i = 0; i < tcp->write_slices.count; i++) {
-    buffers[i].len = GPR_SLICE_LENGTH(tcp->write_slices.slices[i]);
-    buffers[i].buf = (char *)GPR_SLICE_START_PTR(tcp->write_slices.slices[i]);
+  for (i = 0; i < tcp->write_slices->count; i++) {
+    buffers[i].len = GPR_SLICE_LENGTH(tcp->write_slices->slices[i]);
+    buffers[i].buf = (char *)GPR_SLICE_START_PTR(tcp->write_slices->slices[i]);
   }

   /* First, let's try a synchronous, non-blocking write. */
-  status = WSASend(socket->socket, buffers, tcp->write_slices.count,
+  status = WSASend(socket->socket, buffers, tcp->write_slices->count,
                    &bytes_sent, 0, NULL, NULL);
   info->wsa_error = status == 0 ? 0 : WSAGetLastError();
@@ -323,10 +316,10 @@ static grpc_endpoint_write_status win_write(grpc_endpoint *ep,
      connection that has its send queue filled up. But if we don't, then we can
      avoid doing an async write operation at all. */
   if (info->wsa_error != WSAEWOULDBLOCK) {
-    grpc_endpoint_write_status ret = GRPC_ENDPOINT_WRITE_ERROR;
+    grpc_endpoint_op_status ret = GRPC_ENDPOINT_ERROR;
     if (status == 0) {
-      ret = GRPC_ENDPOINT_WRITE_DONE;
-      GPR_ASSERT(bytes_sent == tcp->write_slices.length);
+      ret = GRPC_ENDPOINT_DONE;
+      GPR_ASSERT(bytes_sent == tcp->write_slices->length);
     } else {
       if (socket->read_info.wsa_error != WSAECONNRESET) {
         char *utf8_message = gpr_format_message(info->wsa_error);
@@ -335,7 +328,6 @@ static grpc_endpoint_write_status win_write(grpc_endpoint *ep,
       }
     }
     if (allocated) gpr_free(allocated);
-    gpr_slice_buffer_reset_and_unref(&tcp->write_slices);
     tcp->socket->write_info.outstanding = 0;
     tcp_unref(tcp);
     return ret;
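Ownership of the outgoing data also flips in these hunks: tcp->write_slices is now a pointer to the caller's gpr_slice_buffer, so win_write no longer copies slices in with gpr_slice_buffer_addn, and neither the error paths nor on_write call gpr_slice_buffer_reset_and_unref anymore. The buffer stays the caller's to reset or reuse once the write completes.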
@@ -344,24 +336,23 @@ static grpc_endpoint_write_status win_write(grpc_endpoint *ep,
   /* If we got a WSAEWOULDBLOCK earlier, then we need to re-do the same
      operation, this time asynchronously. */
   memset(&socket->write_info.overlapped, 0, sizeof(OVERLAPPED));
-  status = WSASend(socket->socket, buffers, tcp->write_slices.count,
+  status = WSASend(socket->socket, buffers, tcp->write_slices->count,
                    &bytes_sent, 0, &socket->write_info.overlapped, NULL);
   if (allocated) gpr_free(allocated);

   if (status != 0) {
     int wsa_error = WSAGetLastError();
     if (wsa_error != WSA_IO_PENDING) {
-      gpr_slice_buffer_reset_and_unref(&tcp->write_slices);
       tcp->socket->write_info.outstanding = 0;
       tcp_unref(tcp);
-      return GRPC_ENDPOINT_WRITE_ERROR;
+      return GRPC_ENDPOINT_ERROR;
     }
   }

   /* As all is now setup, we can now ask for the IOCP notification. It may
      trigger the callback immediately however, but no matter. */
   grpc_socket_notify_on_write(socket, on_write, tcp);
-  return GRPC_ENDPOINT_WRITE_PENDING;
+  return GRPC_ENDPOINT_PENDING;
 }

 static void win_add_to_pollset(grpc_endpoint *ep, grpc_pollset *ps) {
@@ -407,7 +398,7 @@ static char *win_get_peer(grpc_endpoint *ep) {
 }

 static grpc_endpoint_vtable vtable = {
-    win_notify_on_read, win_write, win_add_to_pollset, win_add_to_pollset_set,
+    win_read, win_write, win_add_to_pollset, win_add_to_pollset_set,
     win_shutdown, win_destroy, win_get_peer};

 grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket, char *peer_string) {
@@ -416,7 +407,6 @@ grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket, char *peer_string) {
   tcp->base.vtable = &vtable;
   tcp->socket = socket;
   gpr_mu_init(&tcp->mu);
-  gpr_slice_buffer_init(&tcp->write_slices);
   gpr_ref_init(&tcp->refcount, 1);
   tcp->peer_string = gpr_strdup(peer_string);
   return &tcp->base;

--- a/src/core/transport/chttp2/internal.h
+++ b/src/core/transport/chttp2/internal.h
@@ -331,8 +331,6 @@ struct grpc_chttp2_transport {
   /** closure to execute writing */
   grpc_iomgr_closure writing_action;

-  /** closure to start reading from the endpoint */
-  grpc_iomgr_closure reading_action;

   /** closure to finish reading from the endpoint */
   grpc_iomgr_closure recv_data;
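With reads now driven directly by the recv_data closure and the loop in chttp2_transport.c below, the intermediate reading_action closure has no remaining users, so it drops out of the transport struct here.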

--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -84,7 +84,6 @@ static void unlock_check_read_write_state(grpc_chttp2_transport *t);
 /* forward declarations of various callbacks that we'll build closures around */
 static void writing_action(void *t, int iomgr_success_ignored);
-static void reading_action(void *t, int iomgr_success_ignored);

 /** Set a transport level setting, and push it to our peer */
 static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id,
@@ -249,7 +248,6 @@ static void init_transport(grpc_chttp2_transport *t,
   gpr_slice_buffer_init(&t->writing.outbuf);
   grpc_chttp2_hpack_compressor_init(&t->writing.hpack_compressor, mdctx);
   grpc_iomgr_closure_init(&t->writing_action, writing_action, t);
-  grpc_iomgr_closure_init(&t->reading_action, reading_action, t);

   gpr_slice_buffer_init(&t->parsing.qbuf);
   grpc_chttp2_goaway_parser_init(&t->parsing.goaway_parser);
@@ -1065,10 +1063,9 @@ static void read_error_locked(grpc_chttp2_transport *t) {
 }

 /* tcp read callback */
-static void recv_data(void *tp, int success) {
-  grpc_chttp2_transport *t = tp;
+static int recv_data_loop(grpc_chttp2_transport *t, int *success) {
   size_t i;
-  int unref = 0;
+  int keep_reading = 0;

   lock(t);
   i = 0;
@@ -1077,12 +1074,12 @@ static void recv_data(void *tp, int success) {
     t->parsing_active = 1;
     /* merge stream lists */
     grpc_chttp2_stream_map_move_into(&t->new_stream_map,
                                      &t->parsing_stream_map);
     grpc_chttp2_prepare_to_read(&t->global, &t->parsing);
     gpr_mu_unlock(&t->mu);
     for (; i < t->read_buffer.count &&
            grpc_chttp2_perform_read(&t->parsing, t->read_buffer.slices[i]);
          i++)
       ;
     gpr_mu_lock(&t->mu);
     if (i != t->read_buffer.count) {
@@ -1090,48 +1087,53 @@ static void recv_data(void *tp, int success) {
     }
     /* merge stream lists */
     grpc_chttp2_stream_map_move_into(&t->new_stream_map,
                                      &t->parsing_stream_map);
     t->global.concurrent_stream_count =
         grpc_chttp2_stream_map_size(&t->parsing_stream_map);
     if (t->parsing.initial_window_update != 0) {
       grpc_chttp2_stream_map_for_each(&t->parsing_stream_map,
                                       update_global_window, t);
       t->parsing.initial_window_update = 0;
     }
     /* handle higher level things */
     grpc_chttp2_publish_reads(&t->global, &t->parsing);
     t->parsing_active = 0;
   }
-  if (!success) {
+  if (!*success || i != t->read_buffer.count) {
     drop_connection(t);
     read_error_locked(t);
-    unref = 1;
-  } else if (i == t->read_buffer.count) {
-    grpc_chttp2_schedule_closure(&t->global, &t->reading_action, 1);
-  } else {
-    read_error_locked(t);
-    unref = 1;
+  }
+  else {
+    keep_reading = 1;
   }
   gpr_slice_buffer_reset_and_unref(&t->read_buffer);
   unlock(t);

-  if (unref) {
-    UNREF_TRANSPORT(t, "recv_data");
-  }
-}
-
-static void reading_action(void *pt, int iomgr_success_ignored) {
-  grpc_chttp2_transport *t = pt;
-  switch (grpc_endpoint_read(t->ep, &t->read_buffer, &t->recv_data)) {
-    case GRPC_ENDPOINT_DONE:
-      recv_data(t, 1);
-      break;
-    case GRPC_ENDPOINT_ERROR:
-      recv_data(t, 0);
-      break;
-    case GRPC_ENDPOINT_PENDING:
-      break;
+  if (keep_reading) {
+    switch (grpc_endpoint_read(t->ep, &t->read_buffer, &t->recv_data)) {
+      case GRPC_ENDPOINT_DONE:
+        *success = 1;
+        return 1;
+      case GRPC_ENDPOINT_ERROR:
+        *success = 0;
+        return 1;
+      case GRPC_ENDPOINT_PENDING:
+        return 0;
+    }
+  }
+  else {
+    UNREF_TRANSPORT(t, "recv_data");
+    return 0;
   }
+
+  gpr_log(GPR_ERROR, "should never reach here");
+  abort();
+}
+
+static void recv_data(void *tp, int success) {
+  grpc_chttp2_transport *t = tp;
+  while (recv_data_loop(t, &success));
 }

 /*
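The chttp2 change is the classic callback-to-loop conversion that the commit message calls out: previously, a read that completed synchronously made reading_action call recv_data, which scheduled reading_action again, so a fast peer could grow the stack without bound. recv_data_loop instead returns 1 when grpc_endpoint_read completed synchronously, and recv_data re-drives it with a while loop, keeping stack depth constant. A self-contained sketch of the same transformation, with read_some as a hypothetical stand-in for grpc_endpoint_read:

    #include <stdio.h>

    typedef enum { OP_DONE, OP_ERROR, OP_PENDING } op_status;

    /* Hypothetical endpoint: completes synchronously many times in a row,
       then fails -- the exact pattern that blew the stack when recursive. */
    static op_status read_some(int *budget) {
      return (*budget)-- > 0 ? OP_DONE : OP_ERROR;
    }

    /* One iteration: process the previous result, try to start the next read.
       Returns 1 if the read finished synchronously (caller should loop again),
       0 if it went async or reading stopped. */
    static int recv_data_loop(int *budget, int *success) {
      if (!*success) return 0; /* previous read failed: stop reading */
      /* ... parse the received slices here ... */
      switch (read_some(budget)) {
        case OP_DONE:    *success = 1; return 1;
        case OP_ERROR:   *success = 0; return 1;
        case OP_PENDING: return 0; /* the closure will re-enter later */
      }
      return 0;
    }

    /* The closure body: a loop instead of recursion, so a million synchronous
       completions cost O(1) stack instead of a million frames. */
    static void recv_data(int success) {
      int budget = 1000000;
      while (recv_data_loop(&budget, &success));
      printf("stopped with success=%d\n", success);
    }

    int main(void) {
      recv_data(1);
      return 0;
    }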
