Add tracing, fix some transport bugs wrt buffer_pools

reviewable/pr8239/r2
Craig Tiller 9 years ago
parent 82509936ae
commit ef6b97659e
  1. 2
      include/grpc/grpc.h
  2. 24
      src/core/ext/transport/chttp2/transport/chttp2_transport.c
  3. 66
      src/core/lib/iomgr/buffer_pool.c
  4. 6
      src/core/lib/iomgr/buffer_pool.h
  5. 2
      src/core/lib/iomgr/tcp_client_posix.c
  6. 2
      src/core/lib/iomgr/tcp_posix.c
  7. 2
      src/core/lib/iomgr/tcp_server_posix.c
  8. 3
      src/core/lib/security/credentials/google_default/google_default_credentials.c
  9. 4
      src/core/lib/security/credentials/jwt/jwt_verifier.c
  10. 5
      src/core/lib/security/credentials/oauth2/oauth2_credentials.c
  11. 2
      src/core/lib/surface/init.c
  12. 2
      src/ruby/ext/grpc/rb_grpc_imports.generated.h
  13. 228
      test/core/end2end/tests/buffer_pool_server.c
  14. 12
      test/core/util/mock_endpoint.c
  15. 16
      test/core/util/passthru_endpoint.c
  16. 11
      test/core/util/port_server_client.c

@ -402,7 +402,7 @@ GRPCAPI int grpc_is_binary_header(const char *key, size_t length);
GRPCAPI const char *grpc_call_error_to_string(grpc_call_error error);
/** Create a buffer pool */
GRPCAPI grpc_buffer_pool *grpc_buffer_pool_create(void);
GRPCAPI grpc_buffer_pool *grpc_buffer_pool_create(const char *trace_name);
/** Add a reference to a buffer pool */
GRPCAPI void grpc_buffer_pool_ref(grpc_buffer_pool *buffer_pool);

@ -257,7 +257,8 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_closure_init(&t->benign_reclaimer, benign_reclaimer, t);
grpc_closure_init(&t->destructive_reclaimer, destructive_reclaimer, t);
grpc_closure_init(&t->benign_reclaimer_locked, benign_reclaimer_locked, t);
grpc_closure_init(&t->destructive_reclaimer_locked, destructive_reclaimer_locked, t);
grpc_closure_init(&t->destructive_reclaimer_locked,
destructive_reclaimer_locked, t);
grpc_chttp2_goaway_parser_init(&t->goaway_parser);
grpc_chttp2_hpack_parser_init(&t->hpack_parser);
@ -2124,10 +2125,21 @@ static void benign_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_chttp2_transport *t = arg;
if (error == GRPC_ERROR_NONE &&
grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
if (grpc_buffer_pool_trace) {
gpr_log(GPR_DEBUG, "HTTP2: %s - send goaway to free memory",
t->peer_string);
}
send_goaway(exec_ctx, t, GRPC_CHTTP2_ENHANCE_YOUR_CALM,
gpr_slice_from_static_string("Buffers full"));
} else if (grpc_buffer_pool_trace) {
gpr_log(GPR_DEBUG,
"HTTP2: %s - skip benign reclaimation, there are still %" PRIdPTR
" streams",
t->peer_string, grpc_chttp2_stream_map_size(&t->stream_map));
}
t->benign_reclaimer_registered = false;
grpc_buffer_user_finish_reclaimation(exec_ctx,
grpc_endpoint_get_buffer_user(t->ep));
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "benign_reclaimer");
}
@ -2138,18 +2150,20 @@ static void destructive_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *arg,
t->destructive_reclaimer_registered = false;
if (error == GRPC_ERROR_NONE && n > 0) {
grpc_chttp2_stream *s = grpc_chttp2_stream_map_rand(&t->stream_map);
if (grpc_buffer_pool_trace) {
gpr_log(GPR_DEBUG, "HTTP2: %s - abandon stream id %d", t->peer_string,
s->id);
}
grpc_chttp2_cancel_stream(
exec_ctx, t, s, grpc_error_set_int(GRPC_ERROR_CREATE("Buffers full"),
GRPC_ERROR_INT_HTTP2_ERROR,
GRPC_CHTTP2_ENHANCE_YOUR_CALM));
if (n > 1) {
post_destructive_reclaimer(exec_ctx, t);
t->destructive_reclaimer_registered = true;
grpc_buffer_user_post_reclaimer(exec_ctx,
grpc_endpoint_get_buffer_user(t->ep),
true, &t->destructive_reclaimer);
}
}
grpc_buffer_user_finish_reclaimation(exec_ctx,
grpc_endpoint_get_buffer_user(t->ep));
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "destructive_reclaimer");
}

@ -37,6 +37,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/useful.h>
#include "src/core/lib/iomgr/combiner.h"
@ -62,6 +63,8 @@ struct grpc_buffer_pool {
grpc_closure bpreclaimation_done_closure;
grpc_buffer_user *roots[GRPC_BULIST_COUNT];
char *name;
};
/*******************************************************************************
@ -175,8 +178,18 @@ static bool bpalloc(grpc_exec_ctx *exec_ctx, grpc_buffer_pool *buffer_pool) {
gpr_mu_lock(&buffer_user->mu);
if (buffer_user->free_pool < 0 &&
-buffer_user->free_pool <= buffer_pool->free_pool) {
buffer_pool->free_pool += buffer_user->free_pool;
int64_t amt = -buffer_user->free_pool;
buffer_user->free_pool = 0;
buffer_pool->free_pool -= amt;
if (grpc_buffer_pool_trace) {
gpr_log(GPR_DEBUG, "BP %s %s: grant alloc %" PRId64
" bytes; bp_free_pool -> %" PRId64,
buffer_pool->name, buffer_user->name, amt,
buffer_pool->free_pool);
}
} else if (grpc_buffer_pool_trace && buffer_user->free_pool >= 0) {
gpr_log(GPR_DEBUG, "BP %s %s: discard already satisfied alloc request",
buffer_pool->name, buffer_user->name);
}
if (buffer_user->free_pool >= 0) {
buffer_user->allocating = false;
@ -198,8 +211,15 @@ static bool bpscavenge(grpc_exec_ctx *exec_ctx, grpc_buffer_pool *buffer_pool) {
bulist_pop(buffer_pool, GRPC_BULIST_NON_EMPTY_FREE_POOL))) {
gpr_mu_lock(&buffer_user->mu);
if (buffer_user->free_pool > 0) {
buffer_pool->free_pool += buffer_user->free_pool;
int64_t amt = buffer_user->free_pool;
buffer_user->free_pool = 0;
buffer_pool->free_pool += amt;
if (grpc_buffer_pool_trace) {
gpr_log(GPR_DEBUG, "BP %s %s: scavenge %" PRId64
" bytes; bp_free_pool -> %" PRId64,
buffer_pool->name, buffer_user->name, amt,
buffer_pool->free_pool);
}
gpr_mu_unlock(&buffer_user->mu);
return true;
} else {
@ -217,6 +237,10 @@ static bool bpreclaim(grpc_exec_ctx *exec_ctx, grpc_buffer_pool *buffer_pool,
: GRPC_BULIST_RECLAIMER_BENIGN;
grpc_buffer_user *buffer_user = bulist_pop(buffer_pool, list);
if (buffer_user == NULL) return false;
if (grpc_buffer_pool_trace) {
gpr_log(GPR_DEBUG, "BP %s %s: initiate %s reclaimation", buffer_pool->name,
buffer_user->name, destructive ? "destructive" : "benign");
}
buffer_pool->reclaiming = true;
grpc_closure *c = buffer_user->reclaimers[destructive];
buffer_user->reclaimers[destructive] = NULL;
@ -384,7 +408,7 @@ static void bp_reclaimation_done(grpc_exec_ctx *exec_ctx, void *bp,
* grpc_buffer_pool api
*/
grpc_buffer_pool *grpc_buffer_pool_create(void) {
grpc_buffer_pool *grpc_buffer_pool_create(const char *name) {
grpc_buffer_pool *buffer_pool = gpr_malloc(sizeof(*buffer_pool));
gpr_ref_init(&buffer_pool->refs, 1);
buffer_pool->combiner = grpc_combiner_create(NULL);
@ -392,6 +416,12 @@ grpc_buffer_pool *grpc_buffer_pool_create(void) {
buffer_pool->size = INT64_MAX;
buffer_pool->step_scheduled = false;
buffer_pool->reclaiming = false;
if (name != NULL) {
buffer_pool->name = gpr_strdup(name);
} else {
gpr_asprintf(&buffer_pool->name, "anonymous_pool_%" PRIxPTR,
(intptr_t)buffer_pool);
}
grpc_closure_init(&buffer_pool->bpstep_closure, bpstep, buffer_pool);
grpc_closure_init(&buffer_pool->bpreclaimation_done_closure,
bp_reclaimation_done, buffer_pool);
@ -451,7 +481,7 @@ grpc_buffer_pool *grpc_buffer_pool_from_channel_args(
}
}
}
return grpc_buffer_pool_create();
return grpc_buffer_pool_create(NULL);
}
static void *bp_copy(void *bp) {
@ -473,7 +503,7 @@ const grpc_arg_pointer_vtable *grpc_buffer_pool_arg_vtable(void) {
*/
void grpc_buffer_user_init(grpc_buffer_user *buffer_user,
grpc_buffer_pool *buffer_pool) {
grpc_buffer_pool *buffer_pool, const char *name) {
buffer_user->buffer_pool = grpc_buffer_pool_internal_ref(buffer_pool);
grpc_closure_init(&buffer_user->allocate_closure, &bu_allocate, buffer_user);
grpc_closure_init(&buffer_user->add_to_free_pool_closure,
@ -498,6 +528,12 @@ void grpc_buffer_user_init(grpc_buffer_user *buffer_user,
#ifndef NDEBUG
buffer_user->asan_canary = gpr_malloc(1);
#endif
if (name != NULL) {
buffer_user->name = gpr_strdup(name);
} else {
gpr_asprintf(&buffer_user->name, "anonymous_buffer_user_%" PRIxPTR,
(intptr_t)buffer_user);
}
}
void grpc_buffer_user_shutdown(grpc_exec_ctx *exec_ctx,
@ -533,6 +569,10 @@ void grpc_buffer_user_alloc(grpc_exec_ctx *exec_ctx,
&buffer_user->on_done_destroy_closure);
if (on_done_destroy != NULL) {
/* already shutdown */
if (grpc_buffer_pool_trace) {
gpr_log(GPR_DEBUG, "BP %s %s: alloc %" PRIdPTR " after shutdown",
buffer_user->buffer_pool->name, buffer_user->name, size);
}
grpc_exec_ctx_sched(
exec_ctx, optional_on_done,
GRPC_ERROR_CREATE("Buffer pool user is already shutdown"), NULL);
@ -541,6 +581,12 @@ void grpc_buffer_user_alloc(grpc_exec_ctx *exec_ctx,
}
buffer_user->allocated += (int64_t)size;
buffer_user->free_pool -= (int64_t)size;
if (grpc_buffer_pool_trace) {
gpr_log(GPR_DEBUG, "BP %s %s: alloc %" PRIdPTR "; allocated -> %" PRId64
", free_pool -> %" PRId64,
buffer_user->buffer_pool->name, buffer_user->name, size,
buffer_user->allocated, buffer_user->free_pool);
}
if (buffer_user->free_pool < 0) {
grpc_closure_list_append(&buffer_user->on_allocated, optional_on_done,
GRPC_ERROR_NONE);
@ -563,6 +609,12 @@ void grpc_buffer_user_free(grpc_exec_ctx *exec_ctx,
bool was_zero_or_negative = buffer_user->free_pool <= 0;
buffer_user->free_pool += (int64_t)size;
buffer_user->allocated -= (int64_t)size;
if (grpc_buffer_pool_trace) {
gpr_log(GPR_DEBUG, "BP %s %s: free %" PRIdPTR "; allocated -> %" PRId64
", free_pool -> %" PRId64,
buffer_user->buffer_pool->name, buffer_user->name, size,
buffer_user->allocated, buffer_user->free_pool);
}
bool is_bigger_than_zero = buffer_user->free_pool > 0;
if (is_bigger_than_zero && was_zero_or_negative &&
!buffer_user->added_to_free_pool) {
@ -597,6 +649,10 @@ void grpc_buffer_user_post_reclaimer(grpc_exec_ctx *exec_ctx,
void grpc_buffer_user_finish_reclaimation(grpc_exec_ctx *exec_ctx,
grpc_buffer_user *buffer_user) {
if (grpc_buffer_pool_trace) {
gpr_log(GPR_DEBUG, "BP %s %s: reclaimation complete",
buffer_user->buffer_pool->name, buffer_user->name);
}
grpc_combiner_execute(exec_ctx, buffer_user->buffer_pool->combiner,
&buffer_user->buffer_pool->bpreclaimation_done_closure,
GRPC_ERROR_NONE, false);

@ -38,6 +38,8 @@
#include "src/core/lib/iomgr/exec_ctx.h"
extern int grpc_buffer_pool_trace;
grpc_buffer_pool *grpc_buffer_pool_internal_ref(grpc_buffer_pool *buffer_pool);
void grpc_buffer_pool_internal_unref(grpc_exec_ctx *exec_ctx,
grpc_buffer_pool *buffer_pool);
@ -83,10 +85,12 @@ struct grpc_buffer_user {
gpr_atm on_done_destroy_closure;
grpc_buffer_user_link links[GRPC_BULIST_COUNT];
char *name;
};
void grpc_buffer_user_init(grpc_buffer_user *buffer_user,
grpc_buffer_pool *buffer_pool);
grpc_buffer_pool *buffer_pool, const char *name);
void grpc_buffer_user_shutdown(grpc_exec_ctx *exec_ctx,
grpc_buffer_user *buffer_user,
grpc_closure *on_done);

@ -125,7 +125,7 @@ grpc_endpoint *grpc_tcp_client_create_from_fd(
grpc_exec_ctx *exec_ctx, grpc_fd *fd, const grpc_channel_args *channel_args,
const char *addr_str) {
size_t tcp_read_chunk_size = GRPC_TCP_DEFAULT_READ_SLICE_SIZE;
grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create();
grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create(NULL);
if (channel_args != NULL) {
for (size_t i = 0; i < channel_args->num_args; i++) {
if (0 ==

@ -544,7 +544,7 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, grpc_buffer_pool *buffer_pool,
tcp->write_closure.cb = tcp_handle_write;
tcp->write_closure.cb_arg = tcp;
gpr_slice_buffer_init(&tcp->last_read_buffer);
grpc_buffer_user_init(&tcp->buffer_user, buffer_pool);
grpc_buffer_user_init(&tcp->buffer_user, buffer_pool, peer_string);
grpc_buffer_user_slice_allocator_init(
&tcp->slice_allocator, &tcp->buffer_user, tcp_read_allocation_done, tcp);
/* Tell network status tracker about new endpoint */

@ -163,7 +163,7 @@ grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx,
grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server));
s->so_reuseport = has_so_reuseport;
s->buffer_pool = grpc_buffer_pool_create();
s->buffer_pool = grpc_buffer_pool_create(NULL);
for (size_t i = 0; i < (args == NULL ? 0 : args->num_args); i++) {
if (0 == strcmp(GRPC_ARG_ALLOW_REUSEPORT, args->args[i].key)) {
if (args->args[i].type == GRPC_ARG_INTEGER) {

@ -124,7 +124,8 @@ static int is_stack_running_on_compute_engine(void) {
grpc_httpcli_context_init(&context);
grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create();
grpc_buffer_pool *buffer_pool =
grpc_buffer_pool_create("google_default_credentials");
grpc_httpcli_get(
&exec_ctx, &context, &detector.pollent, buffer_pool, &request,
gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), max_detection_delay),

@ -660,7 +660,7 @@ static void on_openid_config_retrieved(grpc_exec_ctx *exec_ctx, void *user_data,
/* TODO(ctiller): Carry the buffer_pool in ctx and share it with the host
channel. This would allow us to cancel an authentication query when under
extreme memory pressure. */
grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create();
grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create("jwt_verifier");
grpc_httpcli_get(
exec_ctx, &ctx->verifier->http_ctx, &ctx->pollent, buffer_pool, &req,
gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), grpc_jwt_verifier_max_delay),
@ -772,7 +772,7 @@ static void retrieve_key_and_verify(grpc_exec_ctx *exec_ctx,
/* TODO(ctiller): Carry the buffer_pool in ctx and share it with the host
channel. This would allow us to cancel an authentication query when under
extreme memory pressure. */
grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create();
grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create("jwt_verifier");
grpc_httpcli_get(
exec_ctx, &ctx->verifier->http_ctx, &ctx->pollent, buffer_pool, &req,
gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), grpc_jwt_verifier_max_delay),

@ -310,7 +310,7 @@ static void compute_engine_fetch_oauth2(
/* TODO(ctiller): Carry the buffer_pool in ctx and share it with the host
channel. This would allow us to cancel an authentication query when under
extreme memory pressure. */
grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create();
grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create("oauth2_credentials");
grpc_httpcli_get(exec_ctx, httpcli_context, pollent, buffer_pool, &request,
deadline, grpc_closure_create(response_cb, metadata_req),
&metadata_req->response);
@ -365,7 +365,8 @@ static void refresh_token_fetch_oauth2(
/* TODO(ctiller): Carry the buffer_pool in ctx and share it with the host
channel. This would allow us to cancel an authentication query when under
extreme memory pressure. */
grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create();
grpc_buffer_pool *buffer_pool =
grpc_buffer_pool_create("oauth2_credentials_refresh");
grpc_httpcli_post(exec_ctx, httpcli_context, pollent, buffer_pool, &request,
body, strlen(body), deadline,
grpc_closure_create(response_cb, metadata_req),

@ -48,6 +48,7 @@
#include "src/core/lib/channel/message_size_filter.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/http/parser.h"
#include "src/core/lib/iomgr/buffer_pool.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/iomgr.h"
@ -184,6 +185,7 @@ void grpc_init(void) {
// Default timeout trace to 1
grpc_cq_event_timeout_trace = 1;
grpc_register_tracer("op_failure", &grpc_trace_operation_failures);
grpc_register_tracer("buffer_pool", &grpc_buffer_pool_trace);
#ifndef NDEBUG
grpc_register_tracer("pending_tags", &grpc_trace_pending_tags);
#endif

@ -347,7 +347,7 @@ extern grpc_is_binary_header_type grpc_is_binary_header_import;
typedef const char *(*grpc_call_error_to_string_type)(grpc_call_error error);
extern grpc_call_error_to_string_type grpc_call_error_to_string_import;
#define grpc_call_error_to_string grpc_call_error_to_string_import
typedef grpc_buffer_pool *(*grpc_buffer_pool_create_type)(void);
typedef grpc_buffer_pool *(*grpc_buffer_pool_create_type)(const char *trace_name);
extern grpc_buffer_pool_create_type grpc_buffer_pool_create_import;
#define grpc_buffer_pool_create grpc_buffer_pool_create_import
typedef void(*grpc_buffer_pool_ref_type)(grpc_buffer_pool *buffer_pool);

@ -95,9 +95,235 @@ static void end_test(grpc_end2end_test_fixture *f) {
grpc_completion_queue_destroy(f->cq);
}
/* Creates and returns a gpr_slice containing random alphanumeric characters. */
static gpr_slice generate_random_slice() {
size_t i;
static const char chars[] = "abcdefghijklmnopqrstuvwxyz1234567890";
char output[1024 * 1024];
for (i = 0; i < GPR_ARRAY_SIZE(output) - 1; ++i) {
output[i] = chars[rand() % (int)(sizeof(chars) - 1)];
}
output[GPR_ARRAY_SIZE(output) - 1] = '\0';
return gpr_slice_from_copied_string(output);
}
void buffer_pool_server(grpc_end2end_test_config config) {
grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create("test_server");
grpc_buffer_pool_resize(buffer_pool, 5 * 1024 * 1024);
#define NUM_CALLS 100
#define CLIENT_BASE_TAG 1000
#define SERVER_START_BASE_TAG 2000
#define SERVER_RECV_BASE_TAG 3000
#define SERVER_END_BASE_TAG 4000
grpc_arg arg;
arg.key = GRPC_ARG_BUFFER_POOL;
arg.type = GRPC_ARG_POINTER;
arg.value.pointer.p = buffer_pool;
arg.value.pointer.vtable = grpc_buffer_pool_arg_vtable();
grpc_channel_args args = {1, &arg};
grpc_end2end_test_fixture f =
begin_test(config, "buffer_pool_server", NULL, NULL);
begin_test(config, "buffer_pool_server", NULL, &args);
/* Create large request and response bodies. These are big enough to require
* multiple round trips to deliver to the peer, and their exact contents of
* will be verified on completion. */
gpr_slice request_payload_slice = generate_random_slice();
grpc_call *client_calls[NUM_CALLS];
grpc_call *server_calls[NUM_CALLS];
grpc_metadata_array initial_metadata_recv[NUM_CALLS];
grpc_metadata_array trailing_metadata_recv[NUM_CALLS];
grpc_metadata_array request_metadata_recv[NUM_CALLS];
grpc_call_details call_details[NUM_CALLS];
grpc_status_code status[NUM_CALLS];
char *details[NUM_CALLS];
size_t details_capacity[NUM_CALLS];
grpc_byte_buffer *request_payload_recv[NUM_CALLS];
int was_cancelled[NUM_CALLS];
grpc_call_error error;
int pending_client_calls = 0;
int pending_server_start_calls = 0;
int pending_server_recv_calls = 0;
int pending_server_end_calls = 0;
int cancelled_calls_on_client = 0;
int cancelled_calls_on_server = 0;
grpc_byte_buffer *request_payload =
grpc_raw_byte_buffer_create(&request_payload_slice, 1);
grpc_op ops[6];
grpc_op *op;
for (int i = 0; i < NUM_CALLS; i++) {
grpc_metadata_array_init(&initial_metadata_recv[i]);
grpc_metadata_array_init(&trailing_metadata_recv[i]);
grpc_metadata_array_init(&request_metadata_recv[i]);
grpc_call_details_init(&call_details[i]);
details[i] = NULL;
details_capacity[i] = 0;
request_payload_recv[i] = NULL;
was_cancelled[i] = 0;
}
for (int i = 0; i < NUM_CALLS; i++) {
error = grpc_server_request_call(
f.server, &server_calls[i], &call_details[i], &request_metadata_recv[i],
f.cq, f.cq, tag(SERVER_START_BASE_TAG + i));
GPR_ASSERT(GRPC_CALL_OK == error);
pending_server_start_calls++;
}
for (int i = 0; i < NUM_CALLS; i++) {
client_calls[i] = grpc_channel_create_call(
f.client, NULL, GRPC_PROPAGATE_DEFAULTS, f.cq, "/foo",
"foo.test.google.fr", n_seconds_time(60), NULL);
memset(ops, 0, sizeof(ops));
op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
op->flags = 0;
op->reserved = NULL;
op++;
op->op = GRPC_OP_SEND_MESSAGE;
op->data.send_message = request_payload;
op->flags = 0;
op->reserved = NULL;
op++;
op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
op->flags = 0;
op->reserved = NULL;
op++;
op->op = GRPC_OP_RECV_INITIAL_METADATA;
op->data.recv_initial_metadata = &initial_metadata_recv[i];
op->flags = 0;
op->reserved = NULL;
op++;
op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
op->data.recv_status_on_client.trailing_metadata =
&trailing_metadata_recv[i];
op->data.recv_status_on_client.status = &status[i];
op->data.recv_status_on_client.status_details = &details[i];
op->data.recv_status_on_client.status_details_capacity =
&details_capacity[i];
op->flags = 0;
op->reserved = NULL;
op++;
error = grpc_call_start_batch(client_calls[i], ops, (size_t)(op - ops),
tag(CLIENT_BASE_TAG + i), NULL);
GPR_ASSERT(GRPC_CALL_OK == error);
pending_client_calls++;
}
while (pending_client_calls + pending_server_recv_calls +
pending_server_end_calls >
0) {
gpr_log(GPR_DEBUG,
"pending: client_calls=%d server_start_calls=%d "
"server_recv_calls=%d server_end_calls=%d",
pending_client_calls, pending_server_start_calls,
pending_server_recv_calls, pending_server_end_calls);
grpc_event ev = grpc_completion_queue_next(f.cq, n_seconds_time(10), NULL);
GPR_ASSERT(ev.type == GRPC_OP_COMPLETE);
int ev_tag = (int)(intptr_t)ev.tag;
if (ev_tag < CLIENT_BASE_TAG) {
abort(); /* illegal tag */
} else if (ev_tag < SERVER_START_BASE_TAG) {
/* client call finished */
int call_id = ev_tag - CLIENT_BASE_TAG;
GPR_ASSERT(call_id >= 0);
GPR_ASSERT(call_id < NUM_CALLS);
switch (status[call_id]) {
case GRPC_STATUS_RESOURCE_EXHAUSTED:
cancelled_calls_on_client++;
break;
case GRPC_STATUS_OK:
break;
default:
gpr_log(GPR_ERROR, "Unexpected status code: %d", status[call_id]);
abort();
}
GPR_ASSERT(pending_client_calls > 0);
pending_client_calls--;
} else if (ev_tag < SERVER_RECV_BASE_TAG) {
/* new incoming call to the server */
int call_id = ev_tag - SERVER_START_BASE_TAG;
GPR_ASSERT(call_id >= 0);
GPR_ASSERT(call_id < NUM_CALLS);
memset(ops, 0, sizeof(ops));
op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
op->flags = 0;
op->reserved = NULL;
op++;
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message = &request_payload_recv[call_id];
op->flags = 0;
op->reserved = NULL;
op++;
error =
grpc_call_start_batch(server_calls[call_id], ops, (size_t)(op - ops),
tag(SERVER_RECV_BASE_TAG + call_id), NULL);
GPR_ASSERT(GRPC_CALL_OK == error);
GPR_ASSERT(pending_server_start_calls > 0);
pending_server_start_calls--;
pending_server_recv_calls++;
} else if (ev_tag < SERVER_END_BASE_TAG) {
/* finished read on the server */
int call_id = ev_tag - SERVER_RECV_BASE_TAG;
GPR_ASSERT(call_id >= 0);
GPR_ASSERT(call_id < NUM_CALLS);
memset(ops, 0, sizeof(ops));
op = ops;
op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
op->data.recv_close_on_server.cancelled = &was_cancelled[call_id];
op->flags = 0;
op->reserved = NULL;
op++;
op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
op->data.send_status_from_server.trailing_metadata_count = 0;
op->data.send_status_from_server.status = GRPC_STATUS_OK;
op->data.send_status_from_server.status_details = "xyz";
op->flags = 0;
op->reserved = NULL;
op++;
error =
grpc_call_start_batch(server_calls[call_id], ops, (size_t)(op - ops),
tag(SERVER_END_BASE_TAG + call_id), NULL);
GPR_ASSERT(GRPC_CALL_OK == error);
GPR_ASSERT(pending_server_recv_calls > 0);
pending_server_recv_calls--;
pending_server_end_calls++;
} else {
int call_id = ev_tag - SERVER_END_BASE_TAG;
GPR_ASSERT(call_id >= 0);
GPR_ASSERT(call_id < NUM_CALLS);
if (was_cancelled[call_id]) {
cancelled_calls_on_server++;
}
GPR_ASSERT(pending_server_end_calls > 0);
pending_server_end_calls--;
}
}
gpr_log(
GPR_INFO,
"Done. %d total calls: %d cancelled at server, %d cancelled at client.",
NUM_CALLS, cancelled_calls_on_server, cancelled_calls_on_client);
end_test(&f);
config.tear_down_data(&f);
}

@ -33,6 +33,8 @@
#include "test/core/util/mock_endpoint.h"
#include <inttypes.h>
#include <grpc/support/alloc.h>
#include <grpc/support/string_util.h>
@ -88,7 +90,8 @@ static void unref(grpc_exec_ctx *exec_ctx, grpc_mock_endpoint *m) {
}
}
static void me_finish_shutdown(grpc_exec_ctx *exec_ctx, void *me, grpc_error *error) {
static void me_finish_shutdown(grpc_exec_ctx *exec_ctx, void *me,
grpc_error *error) {
grpc_mock_endpoint *m = me;
unref(exec_ctx, m);
}
@ -108,7 +111,7 @@ static void me_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
static void me_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
grpc_mock_endpoint *m = (grpc_mock_endpoint *)ep;
unref(exec_ctx,m);
unref(exec_ctx, m);
}
static char *me_get_peer(grpc_endpoint *ep) {
@ -139,7 +142,10 @@ grpc_endpoint *grpc_mock_endpoint_create(void (*on_write)(gpr_slice slice),
grpc_mock_endpoint *m = gpr_malloc(sizeof(*m));
m->base.vtable = &vtable;
m->refs = 2;
grpc_buffer_user_init(&m->buffer_user, buffer_pool);
char *name;
gpr_asprintf(&name, "mock_endpoint_%" PRIxPTR, (intptr_t)m);
grpc_buffer_user_init(&m->buffer_user, buffer_pool, name);
gpr_free(name);
gpr_slice_buffer_init(&m->read_buffer);
gpr_mu_init(&m->mu);
m->on_write = on_write;

@ -33,6 +33,8 @@
#include "test/core/util/passthru_endpoint.h"
#include <inttypes.h>
#include <grpc/support/alloc.h>
#include <grpc/support/string_util.h>
@ -141,7 +143,7 @@ static void me_really_destroy(grpc_exec_ctx *exec_ctx, void *ep,
static void me_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
half *m = (half *)ep;
grpc_buffer_user_shutdown(exec_ctx, &m->buffer_user,
grpc_closure_create(me_really_destroy, m));
grpc_closure_create(me_really_destroy, m));
}
static char *me_get_peer(grpc_endpoint *ep) {
@ -168,12 +170,16 @@ static const grpc_endpoint_vtable vtable = {
};
static void half_init(half *m, passthru_endpoint *parent,
grpc_buffer_pool *buffer_pool) {
grpc_buffer_pool *buffer_pool, const char *half_name) {
m->base.vtable = &vtable;
m->parent = parent;
gpr_slice_buffer_init(&m->read_buffer);
m->on_read = NULL;
grpc_buffer_user_init(&m->buffer_user, buffer_pool);
char *name;
gpr_asprintf(&name, "passthru_endpoint_%s_%" PRIxPTR, half_name,
(intptr_t)parent);
grpc_buffer_user_init(&m->buffer_user, buffer_pool, name);
gpr_free(name);
}
void grpc_passthru_endpoint_create(grpc_endpoint **client,
@ -182,8 +188,8 @@ void grpc_passthru_endpoint_create(grpc_endpoint **client,
passthru_endpoint *m = gpr_malloc(sizeof(*m));
m->halves = 2;
m->shutdown = 0;
half_init(&m->client, m, buffer_pool);
half_init(&m->server, m, buffer_pool);
half_init(&m->client, m, buffer_pool, "client");
half_init(&m->server, m, buffer_pool, "server");
gpr_mu_init(&m->mu);
*client = &m->client.base;
*server = &m->server.base;

@ -49,6 +49,8 @@
#include "src/core/lib/http/httpcli.h"
int grpc_buffer_pool_trace = 0;
typedef struct freereq {
gpr_mu *mu;
grpc_polling_entity pops;
@ -99,7 +101,8 @@ void grpc_free_port_using_server(char *server, int port) {
req.http.path = path;
grpc_httpcli_context_init(&context);
grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create();
grpc_buffer_pool *buffer_pool =
grpc_buffer_pool_create("port_server_client/free");
grpc_httpcli_get(&exec_ctx, &context, &pr.pops, buffer_pool, &req,
GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10),
grpc_closure_create(freed_port_from_server, &pr), &rsp);
@ -169,7 +172,8 @@ static void got_port_from_server(grpc_exec_ctx *exec_ctx, void *arg,
req.http.path = "/get";
grpc_http_response_destroy(&pr->response);
memset(&pr->response, 0, sizeof(pr->response));
grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create();
grpc_buffer_pool *buffer_pool =
grpc_buffer_pool_create("port_server_client/pick_retry");
grpc_httpcli_get(exec_ctx, pr->ctx, &pr->pops, buffer_pool, &req,
GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10),
grpc_closure_create(got_port_from_server, pr),
@ -215,7 +219,8 @@ int grpc_pick_port_using_server(char *server) {
req.http.path = "/get";
grpc_httpcli_context_init(&context);
grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create();
grpc_buffer_pool *buffer_pool =
grpc_buffer_pool_create("port_server_client/pick");
grpc_httpcli_get(&exec_ctx, &context, &pr.pops, buffer_pool, &req,
GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10),
grpc_closure_create(got_port_from_server, &pr),

Loading…
Cancel
Save