diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 2d06715571d..f9733afaafd 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -1754,7 +1754,7 @@ static void update_bdp(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, if (delta == 0 || (bdp != 0 && delta > -1024 && delta < 1024)) { return; } - gpr_log(GPR_DEBUG, "%s: %d %" PRId64, t->peer_string, bdp, delta); + gpr_log(GPR_DEBUG, "%s [%p]: %d %" PRId64, t->peer_string, t, bdp, delta); if (delta < 0) { t->retract_incoming_window += -delta; } else if (delta <= t->retract_incoming_window) { @@ -1811,12 +1811,6 @@ static grpc_error *try_http_parsing(grpc_exec_ctx *exec_ctx, return error; } -static double memory_pressure_to_error(double memory_pressure) { - if (memory_pressure < 0.8) return 0; - return (1.0 - memory_pressure) * 5 /* 1/0.2 */ * - 4096 /* arbitrary scale factor */; -} - static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp, grpc_error *error) { GPR_TIMER_BEGIN("reading_action_locked", 0); @@ -1903,50 +1897,59 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp, } int64_t estimate; + double bdp_error = 0.0; if (grpc_bdp_estimator_get_estimate(&t->bdp_estimator, &estimate)) { - gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); - gpr_timespec dt_timespec = gpr_time_sub(now, t->last_pid_update); - double dt = (double)dt_timespec.tv_sec + dt_timespec.tv_nsec * 1e-9; - if (dt > 3) { - grpc_pid_controller_reset(&t->pid_controller); - } - t->bdp_guess += grpc_pid_controller_update( - &t->pid_controller, - 2.0 * (double)estimate - t->bdp_guess - - memory_pressure_to_error(grpc_resource_quota_get_memory_pressure( - grpc_endpoint_get_resource_user(t->ep)->resource_quota)), - dt); - update_bdp(exec_ctx, t, t->bdp_guess); - if (0) - gpr_log(GPR_DEBUG, "bdp guess %s: %lf (est=%" PRId64 " dt=%lf int=%lf)", - t->peer_string, t->bdp_guess, estimate, dt, - t->pid_controller.error_integral); - t->last_pid_update = now; - - /* - gpr_log( - GPR_DEBUG, "%s BDP estimate: %" PRId64 - " (%d %d) [%d %d %d %d %d %d %d %d %d %d %d %d %d %d %d - %d]", - t->peer_string, estimate, t->bdp_estimator.first_sample_idx, - t->bdp_estimator.num_samples, (int)t->bdp_estimator.samples[0], - (int)t->bdp_estimator.samples[1], - (int)t->bdp_estimator.samples[2], - (int)t->bdp_estimator.samples[3], - (int)t->bdp_estimator.samples[4], - (int)t->bdp_estimator.samples[5], - (int)t->bdp_estimator.samples[6], - (int)t->bdp_estimator.samples[7], - (int)t->bdp_estimator.samples[8], - (int)t->bdp_estimator.samples[9], - (int)t->bdp_estimator.samples[10], - (int)t->bdp_estimator.samples[11], - (int)t->bdp_estimator.samples[12], - (int)t->bdp_estimator.samples[13], - (int)t->bdp_estimator.samples[14], - (int)t->bdp_estimator.samples[15]); - */ + bdp_error = 2.0 * (double)estimate - t->bdp_guess; + } + gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); + gpr_timespec dt_timespec = gpr_time_sub(now, t->last_pid_update); + double dt = (double)dt_timespec.tv_sec + dt_timespec.tv_nsec * 1e-9; + if (dt > 3) { + grpc_pid_controller_reset(&t->pid_controller); } + double memory_pressure = grpc_resource_quota_get_memory_pressure( + grpc_endpoint_get_resource_user(t->ep)->resource_quota); + if (memory_pressure > 0.8) { + bdp_error = -(memory_pressure - 0.8) * 5 * 32768; + } + if (t->bdp_guess < 1e-6 && bdp_error < 0) { + bdp_error = 0; + } + gpr_log(GPR_DEBUG, "memory_pressure = %lf, error = %lf", memory_pressure, + bdp_error); + t->bdp_guess += + grpc_pid_controller_update(&t->pid_controller, bdp_error, dt); + update_bdp(exec_ctx, t, t->bdp_guess); + if (1) + gpr_log(GPR_DEBUG, + "bdp guess %s: %lf (est=%" PRId64 " dt=%lf err=%lf int=%lf)", + t->peer_string, t->bdp_guess, estimate, dt, + t->pid_controller.last_error, t->pid_controller.error_integral); + t->last_pid_update = now; + + /* + gpr_log( + GPR_DEBUG, "%s BDP estimate: %" PRId64 + " (%d %d) [%d %d %d %d %d %d %d %d %d %d %d %d %d %d %d + %d]", + t->peer_string, estimate, t->bdp_estimator.first_sample_idx, + t->bdp_estimator.num_samples, (int)t->bdp_estimator.samples[0], + (int)t->bdp_estimator.samples[1], + (int)t->bdp_estimator.samples[2], + (int)t->bdp_estimator.samples[3], + (int)t->bdp_estimator.samples[4], + (int)t->bdp_estimator.samples[5], + (int)t->bdp_estimator.samples[6], + (int)t->bdp_estimator.samples[7], + (int)t->bdp_estimator.samples[8], + (int)t->bdp_estimator.samples[9], + (int)t->bdp_estimator.samples[10], + (int)t->bdp_estimator.samples[11], + (int)t->bdp_estimator.samples[12], + (int)t->bdp_estimator.samples[13], + (int)t->bdp_estimator.samples[14], + (int)t->bdp_estimator.samples[15]); + */ GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "keep_reading"); } else { @@ -2036,6 +2039,8 @@ static void incoming_byte_stream_update_flow_control(grpc_exec_ctx *exec_ctx, max_recv_bytes = (uint32_t)max_size_hint; } + uint32_t v1 = max_recv_bytes; + /* account for bytes already received but unknown to higher layers */ if (max_recv_bytes >= have_already) { max_recv_bytes -= (uint32_t)have_already; @@ -2043,6 +2048,14 @@ static void incoming_byte_stream_update_flow_control(grpc_exec_ctx *exec_ctx, max_recv_bytes = 0; } + gpr_log(GPR_DEBUG, + "update_flow_control %s s->id=%d hint=%" PRIdPTR " have=%" PRIdPTR + " mrb0=%d mrb1=%d, iwd=%" PRId64 " add=%d cur_retract=%d twin=%d", + t->is_client ? "CLI" : "SVR", s->id, max_size_hint, have_already, v1, + max_recv_bytes, s->incoming_window_delta, + (uint32_t)(max_recv_bytes - s->incoming_window_delta), + (int)t->retract_incoming_window, (int)t->incoming_window); + /* add some small lookahead to keep pipelines flowing */ GPR_ASSERT(max_recv_bytes <= UINT32_MAX - initial_window_size); if (s->incoming_window_delta < max_recv_bytes) { diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index ee94f274f8b..129c7376dd8 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -1072,6 +1072,10 @@ static void continue_receiving_slices(grpc_exec_ctx *exec_ctx, for (;;) { size_t remaining = call->receiving_stream->length - (*call->receiving_buffer)->data.raw.slice_buffer.length; + gpr_log(GPR_DEBUG, "%p len=%d, have=%d, rem=%d", bctl, + (int)call->receiving_stream->length, + (int)(*call->receiving_buffer)->data.raw.slice_buffer.length, + (int)remaining); if (remaining == 0) { call->receiving_message = 0; grpc_byte_stream_destroy(exec_ctx, call->receiving_stream); diff --git a/test/core/end2end/tests/resource_quota_server.c b/test/core/end2end/tests/resource_quota_server.c index a2431eed7e5..d501e2dcc3f 100644 --- a/test/core/end2end/tests/resource_quota_server.c +++ b/test/core/end2end/tests/resource_quota_server.c @@ -112,11 +112,13 @@ void resource_quota_server(grpc_end2end_test_config config) { grpc_resource_quota_create("test_server"); grpc_resource_quota_resize(resource_quota, 5 * 1024 * 1024); -#define NUM_CALLS 100 +#define NUM_CALLS 10 #define CLIENT_BASE_TAG 1000 #define SERVER_START_BASE_TAG 2000 #define SERVER_RECV_BASE_TAG 3000 #define SERVER_END_BASE_TAG 4000 +#define NUM_ROUNDS 1 +#define MAX_READING 4 grpc_arg arg; arg.key = GRPC_ARG_RESOURCE_QUOTA; @@ -132,132 +134,60 @@ void resource_quota_server(grpc_end2end_test_config config) { * multiple round trips to deliver to the peer, and their exact contents of * will be verified on completion. */ gpr_slice request_payload_slice = generate_random_slice(); - - grpc_call *client_calls[NUM_CALLS]; - grpc_call *server_calls[NUM_CALLS]; - grpc_metadata_array initial_metadata_recv[NUM_CALLS]; - grpc_metadata_array trailing_metadata_recv[NUM_CALLS]; - grpc_metadata_array request_metadata_recv[NUM_CALLS]; - grpc_call_details call_details[NUM_CALLS]; - grpc_status_code status[NUM_CALLS]; - char *details[NUM_CALLS]; - size_t details_capacity[NUM_CALLS]; - grpc_byte_buffer *request_payload_recv[NUM_CALLS]; - int was_cancelled[NUM_CALLS]; - grpc_call_error error; - int pending_client_calls = 0; - int pending_server_start_calls = 0; - int pending_server_recv_calls = 0; - int pending_server_end_calls = 0; - int cancelled_calls_on_client = 0; - int cancelled_calls_on_server = 0; - grpc_byte_buffer *request_payload = grpc_raw_byte_buffer_create(&request_payload_slice, 1); - grpc_op ops[6]; - grpc_op *op; - - for (int i = 0; i < NUM_CALLS; i++) { - grpc_metadata_array_init(&initial_metadata_recv[i]); - grpc_metadata_array_init(&trailing_metadata_recv[i]); - grpc_metadata_array_init(&request_metadata_recv[i]); - grpc_call_details_init(&call_details[i]); - details[i] = NULL; - details_capacity[i] = 0; - request_payload_recv[i] = NULL; - was_cancelled[i] = 0; - } - - for (int i = 0; i < NUM_CALLS; i++) { - error = grpc_server_request_call( - f.server, &server_calls[i], &call_details[i], &request_metadata_recv[i], - f.cq, f.cq, tag(SERVER_START_BASE_TAG + i)); - GPR_ASSERT(GRPC_CALL_OK == error); - - pending_server_start_calls++; - } - - for (int i = 0; i < NUM_CALLS; i++) { - client_calls[i] = grpc_channel_create_call( - f.client, NULL, GRPC_PROPAGATE_DEFAULTS, f.cq, "/foo", - "foo.test.google.fr", n_seconds_time(60), NULL); - - memset(ops, 0, sizeof(ops)); - op = ops; - op->op = GRPC_OP_SEND_INITIAL_METADATA; - op->data.send_initial_metadata.count = 0; - op->flags = 0; - op->reserved = NULL; - op++; - op->op = GRPC_OP_SEND_MESSAGE; - op->data.send_message = request_payload; - op->flags = 0; - op->reserved = NULL; - op++; - op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; - op->flags = 0; - op->reserved = NULL; - op++; - op->op = GRPC_OP_RECV_INITIAL_METADATA; - op->data.recv_initial_metadata = &initial_metadata_recv[i]; - op->flags = 0; - op->reserved = NULL; - op++; - op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; - op->data.recv_status_on_client.trailing_metadata = - &trailing_metadata_recv[i]; - op->data.recv_status_on_client.status = &status[i]; - op->data.recv_status_on_client.status_details = &details[i]; - op->data.recv_status_on_client.status_details_capacity = - &details_capacity[i]; - op->flags = 0; - op->reserved = NULL; - op++; - error = grpc_call_start_batch(client_calls[i], ops, (size_t)(op - ops), - tag(CLIENT_BASE_TAG + i), NULL); - GPR_ASSERT(GRPC_CALL_OK == error); - - pending_client_calls++; - } + for (int r = 0; r < NUM_ROUNDS; r++) { + grpc_call *client_calls[NUM_CALLS]; + grpc_call *server_calls[NUM_CALLS]; + grpc_metadata_array initial_metadata_recv[NUM_CALLS]; + grpc_metadata_array trailing_metadata_recv[NUM_CALLS]; + grpc_metadata_array request_metadata_recv[NUM_CALLS]; + grpc_call_details call_details[NUM_CALLS]; + grpc_status_code status[NUM_CALLS]; + char *details[NUM_CALLS]; + size_t details_capacity[NUM_CALLS]; + grpc_byte_buffer *request_payload_recv[NUM_CALLS]; + int was_cancelled[NUM_CALLS]; + grpc_call_error error; + int pending_client_calls = 0; + int pending_server_start_calls = 0; + int pending_server_recv_calls = 0; + int pending_server_end_calls = 0; + int cancelled_calls_on_client = 0; + int cancelled_calls_on_server = 0; + int num_ready_for_reading_on_server = 0; + int num_currently_reading_on_server = 0; + int ready_for_reading[NUM_CALLS]; + + grpc_op ops[6]; + grpc_op *op; + + for (int i = 0; i < NUM_CALLS; i++) { + grpc_metadata_array_init(&initial_metadata_recv[i]); + grpc_metadata_array_init(&trailing_metadata_recv[i]); + grpc_metadata_array_init(&request_metadata_recv[i]); + grpc_call_details_init(&call_details[i]); + details[i] = NULL; + details_capacity[i] = 0; + request_payload_recv[i] = NULL; + was_cancelled[i] = 0; + } - while (pending_client_calls + pending_server_recv_calls + - pending_server_end_calls > - 0) { - grpc_event ev = grpc_completion_queue_next(f.cq, n_seconds_time(10), NULL); - GPR_ASSERT(ev.type == GRPC_OP_COMPLETE); - - int ev_tag = (int)(intptr_t)ev.tag; - if (ev_tag < CLIENT_BASE_TAG) { - abort(); /* illegal tag */ - } else if (ev_tag < SERVER_START_BASE_TAG) { - /* client call finished */ - int call_id = ev_tag - CLIENT_BASE_TAG; - GPR_ASSERT(call_id >= 0); - GPR_ASSERT(call_id < NUM_CALLS); - switch (status[call_id]) { - case GRPC_STATUS_RESOURCE_EXHAUSTED: - cancelled_calls_on_client++; - break; - case GRPC_STATUS_OK: - break; - default: - gpr_log(GPR_ERROR, "Unexpected status code: %d", status[call_id]); - abort(); - } - GPR_ASSERT(pending_client_calls > 0); + for (int i = 0; i < NUM_CALLS; i++) { + error = + grpc_server_request_call(f.server, &server_calls[i], &call_details[i], + &request_metadata_recv[i], f.cq, f.cq, + tag(SERVER_START_BASE_TAG + i)); + GPR_ASSERT(GRPC_CALL_OK == error); - grpc_metadata_array_destroy(&initial_metadata_recv[call_id]); - grpc_metadata_array_destroy(&trailing_metadata_recv[call_id]); - grpc_call_destroy(client_calls[call_id]); - gpr_free(details[call_id]); + pending_server_start_calls++; + } - pending_client_calls--; - } else if (ev_tag < SERVER_RECV_BASE_TAG) { - /* new incoming call to the server */ - int call_id = ev_tag - SERVER_START_BASE_TAG; - GPR_ASSERT(call_id >= 0); - GPR_ASSERT(call_id < NUM_CALLS); + for (int i = 0; i < NUM_CALLS; i++) { + client_calls[i] = grpc_channel_create_call( + f.client, NULL, GRPC_PROPAGATE_DEFAULTS, f.cq, "/foo", + "foo.test.google.fr", n_seconds_time(60), NULL); memset(ops, 0, sizeof(ops)); op = ops; @@ -266,81 +196,174 @@ void resource_quota_server(grpc_end2end_test_config config) { op->flags = 0; op->reserved = NULL; op++; - op->op = GRPC_OP_RECV_MESSAGE; - op->data.recv_message = &request_payload_recv[call_id]; + op->op = GRPC_OP_SEND_MESSAGE; + op->data.send_message = request_payload; op->flags = 0; op->reserved = NULL; op++; - error = - grpc_call_start_batch(server_calls[call_id], ops, (size_t)(op - ops), - tag(SERVER_RECV_BASE_TAG + call_id), NULL); - GPR_ASSERT(GRPC_CALL_OK == error); - - GPR_ASSERT(pending_server_start_calls > 0); - pending_server_start_calls--; - pending_server_recv_calls++; - - grpc_call_details_destroy(&call_details[call_id]); - grpc_metadata_array_destroy(&request_metadata_recv[call_id]); - } else if (ev_tag < SERVER_END_BASE_TAG) { - /* finished read on the server */ - int call_id = ev_tag - SERVER_RECV_BASE_TAG; - GPR_ASSERT(call_id >= 0); - GPR_ASSERT(call_id < NUM_CALLS); - - if (ev.success) { - if (request_payload_recv[call_id] != NULL) { - grpc_byte_buffer_destroy(request_payload_recv[call_id]); - request_payload_recv[call_id] = NULL; - } - } else { - GPR_ASSERT(request_payload_recv[call_id] == NULL); - } - - memset(ops, 0, sizeof(ops)); - op = ops; - op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; - op->data.recv_close_on_server.cancelled = &was_cancelled[call_id]; + op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; op->flags = 0; op->reserved = NULL; op++; - op->op = GRPC_OP_SEND_STATUS_FROM_SERVER; - op->data.send_status_from_server.trailing_metadata_count = 0; - op->data.send_status_from_server.status = GRPC_STATUS_OK; - op->data.send_status_from_server.status_details = "xyz"; + op->op = GRPC_OP_RECV_INITIAL_METADATA; + op->data.recv_initial_metadata = &initial_metadata_recv[i]; op->flags = 0; op->reserved = NULL; op++; - error = - grpc_call_start_batch(server_calls[call_id], ops, (size_t)(op - ops), - tag(SERVER_END_BASE_TAG + call_id), NULL); + op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; + op->data.recv_status_on_client.trailing_metadata = + &trailing_metadata_recv[i]; + op->data.recv_status_on_client.status = &status[i]; + op->data.recv_status_on_client.status_details = &details[i]; + op->data.recv_status_on_client.status_details_capacity = + &details_capacity[i]; + op->flags = 0; + op->reserved = NULL; + op++; + error = grpc_call_start_batch(client_calls[i], ops, (size_t)(op - ops), + tag(CLIENT_BASE_TAG + i), NULL); GPR_ASSERT(GRPC_CALL_OK == error); - GPR_ASSERT(pending_server_recv_calls > 0); - pending_server_recv_calls--; - pending_server_end_calls++; - } else { - int call_id = ev_tag - SERVER_END_BASE_TAG; - GPR_ASSERT(call_id >= 0); - GPR_ASSERT(call_id < NUM_CALLS); + pending_client_calls++; + } - if (was_cancelled[call_id]) { - cancelled_calls_on_server++; + while (pending_client_calls + pending_server_recv_calls + + pending_server_end_calls > + 0) { + gpr_log(GPR_DEBUG, "cur=%d ready=%d", num_currently_reading_on_server, + num_ready_for_reading_on_server); + while (num_currently_reading_on_server < MAX_READING && + num_ready_for_reading_on_server > 0) { + int call_id = ready_for_reading[--num_ready_for_reading_on_server]; + + gpr_log(GPR_DEBUG, "start reading %d", call_id); + + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_SEND_INITIAL_METADATA; + op->data.send_initial_metadata.count = 0; + op->flags = 0; + op->reserved = NULL; + op++; + op->op = GRPC_OP_RECV_MESSAGE; + op->data.recv_message = &request_payload_recv[call_id]; + op->flags = 0; + op->reserved = NULL; + op++; + error = grpc_call_start_batch( + server_calls[call_id], ops, (size_t)(op - ops), + tag(SERVER_RECV_BASE_TAG + call_id), NULL); + GPR_ASSERT(GRPC_CALL_OK == error); + + num_currently_reading_on_server++; + + gpr_log(GPR_DEBUG, "> cur=%d ready=%d", num_currently_reading_on_server, + num_ready_for_reading_on_server); } - GPR_ASSERT(pending_server_end_calls > 0); - pending_server_end_calls--; - grpc_call_destroy(server_calls[call_id]); - } - } + grpc_event ev = + grpc_completion_queue_next(f.cq, n_seconds_time(10), NULL); + GPR_ASSERT(ev.type == GRPC_OP_COMPLETE); + + int ev_tag = (int)(intptr_t)ev.tag; + if (ev_tag < CLIENT_BASE_TAG) { + abort(); /* illegal tag */ + } else if (ev_tag < SERVER_START_BASE_TAG) { + /* client call finished */ + int call_id = ev_tag - CLIENT_BASE_TAG; + GPR_ASSERT(call_id >= 0); + GPR_ASSERT(call_id < NUM_CALLS); + switch (status[call_id]) { + case GRPC_STATUS_RESOURCE_EXHAUSTED: + cancelled_calls_on_client++; + break; + case GRPC_STATUS_OK: + break; + default: + gpr_log(GPR_ERROR, "Unexpected status code: %d", status[call_id]); + abort(); + } + GPR_ASSERT(pending_client_calls > 0); + + grpc_metadata_array_destroy(&initial_metadata_recv[call_id]); + grpc_metadata_array_destroy(&trailing_metadata_recv[call_id]); + grpc_call_destroy(client_calls[call_id]); + gpr_free(details[call_id]); + + pending_client_calls--; + } else if (ev_tag < SERVER_RECV_BASE_TAG) { + /* new incoming call to the server */ + int call_id = ev_tag - SERVER_START_BASE_TAG; + GPR_ASSERT(call_id >= 0); + GPR_ASSERT(call_id < NUM_CALLS); + + ready_for_reading[num_ready_for_reading_on_server++] = call_id; + gpr_log(GPR_DEBUG, "queue read %d", call_id); + + GPR_ASSERT(pending_server_start_calls > 0); + pending_server_start_calls--; + pending_server_recv_calls++; + + grpc_call_details_destroy(&call_details[call_id]); + grpc_metadata_array_destroy(&request_metadata_recv[call_id]); + } else if (ev_tag < SERVER_END_BASE_TAG) { + /* finished read on the server */ + int call_id = ev_tag - SERVER_RECV_BASE_TAG; + GPR_ASSERT(call_id >= 0); + GPR_ASSERT(call_id < NUM_CALLS); + + if (ev.success) { + if (request_payload_recv[call_id] != NULL) { + grpc_byte_buffer_destroy(request_payload_recv[call_id]); + request_payload_recv[call_id] = NULL; + } + } else { + GPR_ASSERT(request_payload_recv[call_id] == NULL); + } - gpr_log( - GPR_INFO, - "Done. %d total calls: %d cancelled at server, %d cancelled at client.", - NUM_CALLS, cancelled_calls_on_server, cancelled_calls_on_client); + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; + op->data.recv_close_on_server.cancelled = &was_cancelled[call_id]; + op->flags = 0; + op->reserved = NULL; + op++; + op->op = GRPC_OP_SEND_STATUS_FROM_SERVER; + op->data.send_status_from_server.trailing_metadata_count = 0; + op->data.send_status_from_server.status = GRPC_STATUS_OK; + op->data.send_status_from_server.status_details = "xyz"; + op->flags = 0; + op->reserved = NULL; + op++; + error = grpc_call_start_batch(server_calls[call_id], ops, + (size_t)(op - ops), + tag(SERVER_END_BASE_TAG + call_id), NULL); + GPR_ASSERT(GRPC_CALL_OK == error); + + GPR_ASSERT(pending_server_recv_calls > 0); + pending_server_recv_calls--; + pending_server_end_calls++; + num_currently_reading_on_server--; + } else { + int call_id = ev_tag - SERVER_END_BASE_TAG; + GPR_ASSERT(call_id >= 0); + GPR_ASSERT(call_id < NUM_CALLS); + + if (was_cancelled[call_id]) { + cancelled_calls_on_server++; + } + GPR_ASSERT(pending_server_end_calls > 0); + pending_server_end_calls--; - GPR_ASSERT(cancelled_calls_on_client >= cancelled_calls_on_server); - GPR_ASSERT(cancelled_calls_on_server >= 0.9 * cancelled_calls_on_client); + grpc_call_destroy(server_calls[call_id]); + } + } + + gpr_log( + GPR_INFO, + "Done. %d total calls: %d cancelled at server, %d cancelled at client.", + NUM_CALLS, cancelled_calls_on_server, cancelled_calls_on_client); + } grpc_byte_buffer_destroy(request_payload); gpr_slice_unref(request_payload_slice);