Merge pull request #5099 from vjpai/cpp_races

Move internals of many C++ Async classes to ref-counted struct
pull/5138/head
Nicolas Noble 9 years ago
commit e2c7671170
  1. 46
      include/grpc++/impl/codegen/async_unary_call.h
  2. 18
      include/grpc++/impl/codegen/call.h

@ -62,40 +62,50 @@ class ClientAsyncResponseReader GRPC_FINAL
ClientAsyncResponseReader(ChannelInterface* channel, CompletionQueue* cq,
const RpcMethod& method, ClientContext* context,
const W& request)
: context_(context), call_(channel->CreateCall(method, context, cq)) {
init_buf_.SendInitialMetadata(context->send_initial_metadata_);
: context_(context),
call_(channel->CreateCall(method, context, cq)),
collection_(new CallOpSetCollection) {
collection_->init_buf_.SetCollection(collection_);
collection_->init_buf_.SendInitialMetadata(context->send_initial_metadata_);
// TODO(ctiller): don't assert
GPR_ASSERT(init_buf_.SendMessage(request).ok());
init_buf_.ClientSendClose();
call_.PerformOps(&init_buf_);
GPR_ASSERT(collection_->init_buf_.SendMessage(request).ok());
collection_->init_buf_.ClientSendClose();
call_.PerformOps(&collection_->init_buf_);
}
void ReadInitialMetadata(void* tag) {
GPR_ASSERT(!context_->initial_metadata_received_);
meta_buf_.set_output_tag(tag);
meta_buf_.RecvInitialMetadata(context_);
call_.PerformOps(&meta_buf_);
collection_->meta_buf_.SetCollection(collection_);
collection_->meta_buf_.set_output_tag(tag);
collection_->meta_buf_.RecvInitialMetadata(context_);
call_.PerformOps(&collection_->meta_buf_);
}
void Finish(R* msg, Status* status, void* tag) {
finish_buf_.set_output_tag(tag);
collection_->finish_buf_.SetCollection(collection_);
collection_->finish_buf_.set_output_tag(tag);
if (!context_->initial_metadata_received_) {
finish_buf_.RecvInitialMetadata(context_);
collection_->finish_buf_.RecvInitialMetadata(context_);
}
finish_buf_.RecvMessage(msg);
finish_buf_.ClientRecvStatus(context_, status);
call_.PerformOps(&finish_buf_);
collection_->finish_buf_.RecvMessage(msg);
collection_->finish_buf_.ClientRecvStatus(context_, status);
call_.PerformOps(&collection_->finish_buf_);
}
private:
ClientContext* context_;
Call call_;
SneakyCallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
CallOpClientSendClose> init_buf_;
CallOpSet<CallOpRecvInitialMetadata> meta_buf_;
CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>,
CallOpClientRecvStatus> finish_buf_;
class CallOpSetCollection : public CallOpSetCollectionInterface {
public:
SneakyCallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
CallOpClientSendClose> init_buf_;
CallOpSet<CallOpRecvInitialMetadata> meta_buf_;
CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>,
CallOpClientRecvStatus> finish_buf_;
};
std::shared_ptr<CallOpSetCollection> collection_;
};
template <class W>

@ -472,6 +472,17 @@ class CallOpClientRecvStatus {
size_t status_details_capacity_;
};
/// An abstract collection of CallOpSet's, to be used whenever
/// CallOpSet objects must be thought of as a group. Each member
/// of the group should have a shared_ptr back to the collection,
/// as will the object that instantiates the collection, allowing
/// for automatic ref-counting. In practice, any actual use should
/// derive from this base class. This is specifically necessary if
/// some of the CallOpSet's in the collection are "Sneaky" and don't
/// report back to the C++ layer CQ operations
class CallOpSetCollectionInterface
: public std::enable_shared_from_this<CallOpSetCollectionInterface> {};
/// An abstract collection of call ops, used to generate the
/// grpc_call_op structure to pass down to the lower layers,
/// and as it is-a CompletionQueueTag, also massages the final
@ -488,8 +499,14 @@ class CallOpSetInterface : public CompletionQueueTag {
max_message_size_ = max_message_size;
}
/// Mark this as belonging to a collection if needed
void SetCollection(std::shared_ptr<CallOpSetCollectionInterface> collection) {
collection_ = collection;
}
protected:
int max_message_size_;
std::shared_ptr<CallOpSetCollectionInterface> collection_;
};
/// Primary implementaiton of CallOpSetInterface.
@ -527,6 +544,7 @@ class CallOpSet : public CallOpSetInterface,
this->Op5::FinishOp(status, max_message_size_);
this->Op6::FinishOp(status, max_message_size_);
*tag = return_tag_;
collection_.reset(); // drop the ref at this point
return true;
}

Loading…
Cancel
Save