diff --git a/src/core/surface/byte_buffer_queue.c b/src/core/surface/byte_buffer_queue.c index 9709a665bac..aab7fd2ffea 100644 --- a/src/core/surface/byte_buffer_queue.c +++ b/src/core/surface/byte_buffer_queue.c @@ -65,6 +65,13 @@ void grpc_bbq_push(grpc_byte_buffer_queue *q, grpc_byte_buffer *buffer) { bba_push(&q->filling, buffer); } +void grpc_bbq_flush(grpc_byte_buffer_queue *q) { + grpc_byte_buffer *bb; + while ((bb = grpc_bbq_pop(q))) { + grpc_byte_buffer_destroy(bb); + } +} + grpc_byte_buffer *grpc_bbq_pop(grpc_byte_buffer_queue *q) { grpc_bbq_array temp_array; diff --git a/src/core/surface/byte_buffer_queue.h b/src/core/surface/byte_buffer_queue.h index 358a42d5af0..3420dc5caba 100644 --- a/src/core/surface/byte_buffer_queue.h +++ b/src/core/surface/byte_buffer_queue.h @@ -53,6 +53,7 @@ typedef struct { void grpc_bbq_destroy(grpc_byte_buffer_queue *q); grpc_byte_buffer *grpc_bbq_pop(grpc_byte_buffer_queue *q); +void grpc_bbq_flush(grpc_byte_buffer_queue *q); int grpc_bbq_empty(grpc_byte_buffer_queue *q); void grpc_bbq_push(grpc_byte_buffer_queue *q, grpc_byte_buffer *bb); diff --git a/src/core/surface/call.c b/src/core/surface/call.c index b47d5f4aecb..bcb7c877334 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -297,6 +297,10 @@ static void set_status_code(grpc_call *call, status_source source, gpr_uint32 status) { call->status[source].is_set = 1; call->status[source].code = status; + + if (status != GRPC_OP_OK) { + grpc_bbq_flush(&call->incoming_queue); + } } static void set_status_details(grpc_call *call, status_source source, diff --git a/test/core/end2end/tests/cancel_after_accept.c b/test/core/end2end/tests/cancel_after_accept.c index 3dc34ff5062..7e13021dc8e 100644 --- a/test/core/end2end/tests/cancel_after_accept.c +++ b/test/core/end2end/tests/cancel_after_accept.c @@ -184,7 +184,7 @@ static void test_cancel_after_accept(grpc_end2end_test_config config, op->op = GRPC_OP_RECV_MESSAGE; op->data.recv_message = &request_payload_recv; op++; - GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(3))); + GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, server_ops, tag(3))); GPR_ASSERT(GRPC_CALL_OK == mode.initiate_cancel(c));