diff --git a/include/grpc++/client_context.h b/include/grpc++/client_context.h index 19630c9b544..a58e9872e60 100644 --- a/include/grpc++/client_context.h +++ b/include/grpc++/client_context.h @@ -35,6 +35,7 @@ #define GRPCXX_CLIENT_CONTEXT_H #include +#include #include #include @@ -126,9 +127,10 @@ class ClientContext { friend class ::grpc::ClientAsyncResponseReader; grpc_call* call() { return call_; } - void set_call(grpc_call* call) { + void set_call(grpc_call* call, const std::shared_ptr& channel) { GPR_ASSERT(call_ == nullptr); call_ = call; + channel_ = channel; } grpc_completion_queue* cq() { return cq_; } @@ -137,6 +139,7 @@ class ClientContext { grpc::string authority() { return authority_; } bool initial_metadata_received_; + std::shared_ptr channel_; grpc_call* call_; grpc_completion_queue* cq_; gpr_timespec deadline_; diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c index 24f4a05071a..f3c2453b5e7 100644 --- a/src/core/surface/completion_queue.c +++ b/src/core/surface/completion_queue.c @@ -401,7 +401,9 @@ static void on_pollset_destroy_done(void *arg) { } void grpc_completion_queue_destroy(grpc_completion_queue *cc) { + gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); GPR_ASSERT(cc->queue == NULL); + gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); grpc_pollset_shutdown(&cc->pollset, on_pollset_destroy_done, cc); } diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 9f9005691b7..26c550c1f17 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -1890,13 +1890,22 @@ static void patch_metadata_ops(stream *s) { size_t j; size_t mdidx = 0; size_t last_mdidx; + int found_metadata = 0; + /* rework the array of metadata into a linked list, making use + of the breadcrumbs we left in metadata batches during + add_metadata_batch */ for (i = 0; i < nops; i++) { grpc_stream_op *op = &ops[i]; if (op->type != GRPC_OP_METADATA) continue; + found_metadata = 1; + /* we left a breadcrumb indicating where the end of this list is, + and since we add sequentially, we know from the end of the last + segment where this segment begins */ last_mdidx = (size_t)(gpr_intptr)(op->data.metadata.list.tail); GPR_ASSERT(last_mdidx > mdidx); GPR_ASSERT(last_mdidx <= s->incoming_metadata_count); + /* turn the array into a doubly linked list */ op->data.metadata.list.head = &s->incoming_metadata[mdidx]; op->data.metadata.list.tail = &s->incoming_metadata[last_mdidx - 1]; for (j = mdidx + 1; j < last_mdidx; j++) { @@ -1905,13 +1914,25 @@ static void patch_metadata_ops(stream *s) { } s->incoming_metadata[mdidx].prev = NULL; s->incoming_metadata[last_mdidx-1].next = NULL; + /* track where we're up to */ mdidx = last_mdidx; } - GPR_ASSERT(mdidx == s->incoming_metadata_count); - s->old_incoming_metadata = s->incoming_metadata; - s->incoming_metadata = NULL; - s->incoming_metadata_count = 0; - s->incoming_metadata_capacity = 0; + if (found_metadata) { + s->old_incoming_metadata = s->incoming_metadata; + if (mdidx != s->incoming_metadata_count) { + /* we have a partially read metadata batch still in incoming_metadata */ + size_t new_count = s->incoming_metadata_count - mdidx; + size_t copy_bytes = sizeof(*s->incoming_metadata) * new_count; + GPR_ASSERT(mdidx < s->incoming_metadata_count); + s->incoming_metadata = gpr_malloc(copy_bytes); + memcpy(s->old_incoming_metadata + mdidx, s->incoming_metadata, copy_bytes); + s->incoming_metadata_count = s->incoming_metadata_capacity = new_count; + } else { + s->incoming_metadata = NULL; + s->incoming_metadata_count = 0; + s->incoming_metadata_capacity = 0; + } + } } static void finish_reads(transport *t) { diff --git a/src/cpp/client/channel.cc b/src/cpp/client/channel.cc index ba8882278f5..c541ddfb487 100644 --- a/src/cpp/client/channel.cc +++ b/src/cpp/client/channel.cc @@ -71,7 +71,7 @@ Call Channel::CreateCall(const RpcMethod& method, ClientContext* context, : context->authority().c_str(), context->raw_deadline()); GRPC_TIMER_MARK(CALL_CREATED, c_call); - context->set_call(c_call); + context->set_call(c_call, shared_from_this()); return Call(c_call, this, cq); } diff --git a/src/cpp/client/channel.h b/src/cpp/client/channel.h index cd239247c82..46009d20bad 100644 --- a/src/cpp/client/channel.h +++ b/src/cpp/client/channel.h @@ -51,6 +51,7 @@ class Credentials; class StreamContextInterface; class Channel GRPC_FINAL : public GrpcLibrary, + public std::enable_shared_from_this, public ChannelInterface { public: Channel(const grpc::string& target, grpc_channel* c_channel); diff --git a/test/core/end2end/dualstack_socket_test.c b/test/core/end2end/dualstack_socket_test.c index 29097661bce..7b3500233bd 100644 --- a/test/core/end2end/dualstack_socket_test.c +++ b/test/core/end2end/dualstack_socket_test.c @@ -158,7 +158,7 @@ void test_connect(const char *server_host, const char *client_host, int port, cq_expect_finished_with_status(v_client, tag(3), GRPC_STATUS_DEADLINE_EXCEEDED, "Deadline Exceeded", NULL); - cq_expect_finish_accepted(v_client, tag(4), GRPC_OP_ERROR); + cq_expect_finish_accepted(v_client, tag(4), GRPC_OP_OK); cq_verify(v_client); grpc_call_destroy(c); diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc index aea5a0fb27a..5dd64d0b134 100644 --- a/test/cpp/qps/client_sync.cc +++ b/test/cpp/qps/client_sync.cc @@ -105,6 +105,7 @@ class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient { StartThreads(num_threads_); } ~SynchronousStreamingClient() { + EndThreads(); if (stream_) { SimpleResponse response; stream_->WritesDone();