Fix client_crash_test, implement idempotency, fail_fast for C++

pull/5726/head
Craig Tiller 9 years ago
parent 2fe814d0ce
commit 399b3c42de
  1. 36
      include/grpc++/impl/codegen/async_stream.h
  2. 9
      include/grpc++/impl/codegen/async_unary_call.h
  3. 7
      include/grpc++/impl/codegen/call.h
  4. 13
      include/grpc++/impl/codegen/client_context.h
  5. 3
      include/grpc++/impl/codegen/client_unary_call.h
  6. 29
      include/grpc++/impl/codegen/method_handler_impl.h
  7. 2
      include/grpc++/impl/codegen/server_context.h
  8. 24
      include/grpc++/impl/codegen/sync_stream.h
  9. 2
      src/cpp/client/client_context.cc
  10. 2
      test/cpp/end2end/client_crash_test.cc

@ -108,7 +108,8 @@ class ClientAsyncReader GRPC_FINAL : public ClientAsyncReaderInterface<R> {
const W& request, void* tag)
: context_(context), call_(channel->CreateCall(method, context, cq)) {
init_ops_.set_output_tag(tag);
init_ops_.SendInitialMetadata(context->send_initial_metadata_);
init_ops_.SendInitialMetadata(context->send_initial_metadata_,
context->initial_metadata_flags());
// TODO(ctiller): don't assert
GPR_CODEGEN_ASSERT(init_ops_.SendMessage(request).ok());
init_ops_.ClientSendClose();
@ -173,7 +174,8 @@ class ClientAsyncWriter GRPC_FINAL : public ClientAsyncWriterInterface<W> {
finish_ops_.RecvMessage(response);
init_ops_.set_output_tag(tag);
init_ops_.SendInitialMetadata(context->send_initial_metadata_);
init_ops_.SendInitialMetadata(context->send_initial_metadata_,
context->initial_metadata_flags());
call_.PerformOps(&init_ops_);
}
@ -240,7 +242,8 @@ class ClientAsyncReaderWriter GRPC_FINAL
void* tag)
: context_(context), call_(channel->CreateCall(method, context, cq)) {
init_ops_.set_output_tag(tag);
init_ops_.SendInitialMetadata(context->send_initial_metadata_);
init_ops_.SendInitialMetadata(context->send_initial_metadata_,
context->initial_metadata_flags());
call_.PerformOps(&init_ops_);
}
@ -305,7 +308,8 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface,
GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
meta_ops_.set_output_tag(tag);
meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
meta_ops_.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
ctx_->sent_initial_metadata_ = true;
call_.PerformOps(&meta_ops_);
}
@ -319,7 +323,8 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface,
void Finish(const W& msg, const Status& status, void* tag) {
finish_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
finish_ops_.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
ctx_->sent_initial_metadata_ = true;
}
// The response is dropped if the status is not OK.
@ -336,7 +341,8 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface,
GPR_CODEGEN_ASSERT(!status.ok());
finish_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
finish_ops_.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
ctx_->sent_initial_metadata_ = true;
}
finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
@ -366,7 +372,8 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
meta_ops_.set_output_tag(tag);
meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
meta_ops_.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
ctx_->sent_initial_metadata_ = true;
call_.PerformOps(&meta_ops_);
}
@ -374,7 +381,8 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
void Write(const W& msg, void* tag) GRPC_OVERRIDE {
write_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
write_ops_.SendInitialMetadata(ctx_->initial_metadata_);
write_ops_.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
ctx_->sent_initial_metadata_ = true;
}
// TODO(ctiller): don't assert
@ -385,7 +393,8 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
void Finish(const Status& status, void* tag) {
finish_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
finish_ops_.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
ctx_->sent_initial_metadata_ = true;
}
finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
@ -415,7 +424,8 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
meta_ops_.set_output_tag(tag);
meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
meta_ops_.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
ctx_->sent_initial_metadata_ = true;
call_.PerformOps(&meta_ops_);
}
@ -429,7 +439,8 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
void Write(const W& msg, void* tag) GRPC_OVERRIDE {
write_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
write_ops_.SendInitialMetadata(ctx_->initial_metadata_);
write_ops_.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
ctx_->sent_initial_metadata_ = true;
}
// TODO(ctiller): don't assert
@ -440,7 +451,8 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
void Finish(const Status& status, void* tag) {
finish_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
finish_ops_.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
ctx_->sent_initial_metadata_ = true;
}
finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);

@ -67,7 +67,8 @@ class ClientAsyncResponseReader GRPC_FINAL
call_(channel->CreateCall(method, context, cq)),
collection_(new CallOpSetCollection) {
collection_->init_buf_.SetCollection(collection_);
collection_->init_buf_.SendInitialMetadata(context->send_initial_metadata_);
collection_->init_buf_.SendInitialMetadata(
context->send_initial_metadata_, context->initial_metadata_flags());
// TODO(ctiller): don't assert
GPR_CODEGEN_ASSERT(collection_->init_buf_.SendMessage(request).ok());
collection_->init_buf_.ClientSendClose();
@ -122,7 +123,8 @@ class ServerAsyncResponseWriter GRPC_FINAL
GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
meta_buf_.set_output_tag(tag);
meta_buf_.SendInitialMetadata(ctx_->initial_metadata_);
meta_buf_.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
ctx_->sent_initial_metadata_ = true;
call_.PerformOps(&meta_buf_);
}
@ -130,7 +132,8 @@ class ServerAsyncResponseWriter GRPC_FINAL
void Finish(const W& msg, const Status& status, void* tag) {
finish_buf_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
finish_buf_.SendInitialMetadata(ctx_->initial_metadata_);
finish_buf_.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
ctx_->sent_initial_metadata_ = true;
}
// The response is dropped if the status is not OK.

@ -181,8 +181,10 @@ class CallOpSendInitialMetadata {
CallOpSendInitialMetadata() : send_(false) {}
void SendInitialMetadata(
const std::multimap<grpc::string, grpc::string>& metadata) {
const std::multimap<grpc::string, grpc::string>& metadata,
uint32_t flags) {
send_ = true;
flags_ = flags;
initial_metadata_count_ = metadata.size();
initial_metadata_ = FillMetadataArray(metadata);
}
@ -192,7 +194,7 @@ class CallOpSendInitialMetadata {
if (!send_) return;
grpc_op* op = &ops[(*nops)++];
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->flags = 0;
op->flags = flags_;
op->reserved = NULL;
op->data.send_initial_metadata.count = initial_metadata_count_;
op->data.send_initial_metadata.metadata = initial_metadata_;
@ -204,6 +206,7 @@ class CallOpSendInitialMetadata {
}
bool send_;
uint32_t flags_;
size_t initial_metadata_count_;
grpc_metadata* initial_metadata_;
};

@ -221,6 +221,12 @@ class ClientContext {
deadline_ = deadline_tp.raw_time();
}
/// EXPERIMENTAL: Set this request to be idempotent
void set_idempotent(bool idempotent) { idempotent_ = idempotent; }
/// EXPERIMENTAL: Trigger fail-fast or not on this request
void set_fail_fast(bool fail_fast) { fail_fast_ = fail_fast; }
#ifndef GRPC_CXX0X_NO_CHRONO
/// Return the deadline for the client call.
std::chrono::system_clock::time_point deadline() {
@ -328,9 +334,16 @@ class ClientContext {
grpc_call* call() { return call_; }
void set_call(grpc_call* call, const std::shared_ptr<Channel>& channel);
uint32_t initial_metadata_flags() const {
return (idempotent_ ? GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST : 0) |
(fail_fast_ ? 0 : GRPC_INITIAL_METADATA_IGNORE_CONNECTIVITY);
}
grpc::string authority() { return authority_; }
bool initial_metadata_received_;
bool fail_fast_;
bool idempotent_;
std::shared_ptr<Channel> channel_;
grpc::mutex mu_;
grpc_call* call_;

@ -62,7 +62,8 @@ Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method,
if (!status.ok()) {
return status;
}
ops.SendInitialMetadata(context->send_initial_metadata_);
ops.SendInitialMetadata(context->send_initial_metadata_,
context->initial_metadata_flags());
ops.RecvInitialMetadata(context);
ops.RecvMessage(result);
ops.ClientSendClose();

@ -44,10 +44,10 @@ namespace grpc {
template <class ServiceType, class RequestType, class ResponseType>
class RpcMethodHandler : public MethodHandler {
public:
RpcMethodHandler(
std::function<Status(ServiceType*, ServerContext*, const RequestType*,
ResponseType*)> func,
ServiceType* service)
RpcMethodHandler(std::function<Status(ServiceType*, ServerContext*,
const RequestType*, ResponseType*)>
func,
ServiceType* service)
: func_(func), service_(service) {}
void RunHandler(const HandlerParameter& param) GRPC_FINAL {
@ -63,7 +63,8 @@ class RpcMethodHandler : public MethodHandler {
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
CallOpServerSendStatus>
ops;
ops.SendInitialMetadata(param.server_context->initial_metadata_);
ops.SendInitialMetadata(param.server_context->initial_metadata_,
param.server_context->initial_metadata_flags());
if (status.ok()) {
status = ops.SendMessage(rsp);
}
@ -87,7 +88,8 @@ class ClientStreamingHandler : public MethodHandler {
public:
ClientStreamingHandler(
std::function<Status(ServiceType*, ServerContext*,
ServerReader<RequestType>*, ResponseType*)> func,
ServerReader<RequestType>*, ResponseType*)>
func,
ServiceType* service)
: func_(func), service_(service) {}
@ -100,7 +102,8 @@ class ClientStreamingHandler : public MethodHandler {
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
CallOpServerSendStatus>
ops;
ops.SendInitialMetadata(param.server_context->initial_metadata_);
ops.SendInitialMetadata(param.server_context->initial_metadata_,
param.server_context->initial_metadata_flags());
if (status.ok()) {
status = ops.SendMessage(rsp);
}
@ -122,7 +125,8 @@ class ServerStreamingHandler : public MethodHandler {
public:
ServerStreamingHandler(
std::function<Status(ServiceType*, ServerContext*, const RequestType*,
ServerWriter<ResponseType>*)> func,
ServerWriter<ResponseType>*)>
func,
ServiceType* service)
: func_(func), service_(service) {}
@ -138,7 +142,8 @@ class ServerStreamingHandler : public MethodHandler {
CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops;
if (!param.server_context->sent_initial_metadata_) {
ops.SendInitialMetadata(param.server_context->initial_metadata_);
ops.SendInitialMetadata(param.server_context->initial_metadata_,
param.server_context->initial_metadata_flags());
}
ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
param.call->PerformOps(&ops);
@ -170,7 +175,8 @@ class BidiStreamingHandler : public MethodHandler {
CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops;
if (!param.server_context->sent_initial_metadata_) {
ops.SendInitialMetadata(param.server_context->initial_metadata_);
ops.SendInitialMetadata(param.server_context->initial_metadata_,
param.server_context->initial_metadata_flags());
}
ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
param.call->PerformOps(&ops);
@ -191,7 +197,8 @@ class UnknownMethodHandler : public MethodHandler {
static void FillOps(ServerContext* context, T* ops) {
Status status(StatusCode::UNIMPLEMENTED, "");
if (!context->sent_initial_metadata_) {
ops->SendInitialMetadata(context->initial_metadata_);
ops->SendInitialMetadata(context->initial_metadata_,
context->initial_metadata_flags());
context->sent_initial_metadata_ = true;
}
ops->ServerSendStatus(context->trailing_metadata_, status);

@ -195,6 +195,8 @@ class ServerContext {
void set_call(grpc_call* call);
uint32_t initial_metadata_flags() const { return 0; }
CompletionOp* completion_op_;
bool has_notify_when_done_tag_;
void* async_notify_when_done_tag_;

@ -125,7 +125,8 @@ class ClientReader GRPC_FINAL : public ClientReaderInterface<R> {
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
CallOpClientSendClose>
ops;
ops.SendInitialMetadata(context->send_initial_metadata_);
ops.SendInitialMetadata(context->send_initial_metadata_,
context->initial_metadata_flags());
// TODO(ctiller): don't assert
GPR_CODEGEN_ASSERT(ops.SendMessage(request).ok());
ops.ClientSendClose();
@ -190,7 +191,8 @@ class ClientWriter : public ClientWriterInterface<W> {
finish_ops_.RecvMessage(response);
CallOpSet<CallOpSendInitialMetadata> ops;
ops.SendInitialMetadata(context->send_initial_metadata_);
ops.SendInitialMetadata(context->send_initial_metadata_,
context->initial_metadata_flags());
call_.PerformOps(&ops);
cq_.Pluck(&ops);
}
@ -268,7 +270,8 @@ class ClientReaderWriter GRPC_FINAL : public ClientReaderWriterInterface<W, R> {
ClientContext* context)
: context_(context), call_(channel->CreateCall(method, context, &cq_)) {
CallOpSet<CallOpSendInitialMetadata> ops;
ops.SendInitialMetadata(context->send_initial_metadata_);
ops.SendInitialMetadata(context->send_initial_metadata_,
context->initial_metadata_flags());
call_.PerformOps(&ops);
cq_.Pluck(&ops);
}
@ -334,7 +337,8 @@ class ServerReader GRPC_FINAL : public ReaderInterface<R> {
GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
CallOpSet<CallOpSendInitialMetadata> ops;
ops.SendInitialMetadata(ctx_->initial_metadata_);
ops.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
ctx_->sent_initial_metadata_ = true;
call_->PerformOps(&ops);
call_->cq()->Pluck(&ops);
@ -361,7 +365,8 @@ class ServerWriter GRPC_FINAL : public WriterInterface<W> {
GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
CallOpSet<CallOpSendInitialMetadata> ops;
ops.SendInitialMetadata(ctx_->initial_metadata_);
ops.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
ctx_->sent_initial_metadata_ = true;
call_->PerformOps(&ops);
call_->cq()->Pluck(&ops);
@ -374,7 +379,8 @@ class ServerWriter GRPC_FINAL : public WriterInterface<W> {
return false;
}
if (!ctx_->sent_initial_metadata_) {
ops.SendInitialMetadata(ctx_->initial_metadata_);
ops.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
ctx_->sent_initial_metadata_ = true;
}
call_->PerformOps(&ops);
@ -397,7 +403,8 @@ class ServerReaderWriter GRPC_FINAL : public WriterInterface<W>,
GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
CallOpSet<CallOpSendInitialMetadata> ops;
ops.SendInitialMetadata(ctx_->initial_metadata_);
ops.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
ctx_->sent_initial_metadata_ = true;
call_->PerformOps(&ops);
call_->cq()->Pluck(&ops);
@ -417,7 +424,8 @@ class ServerReaderWriter GRPC_FINAL : public WriterInterface<W>,
return false;
}
if (!ctx_->sent_initial_metadata_) {
ops.SendInitialMetadata(ctx_->initial_metadata_);
ops.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
ctx_->sent_initial_metadata_ = true;
}
call_->PerformOps(&ops);

@ -60,6 +60,8 @@ static ClientContext::GlobalCallbacks* g_client_callbacks =
ClientContext::ClientContext()
: initial_metadata_received_(false),
fail_fast_(true),
idempotent_(false),
call_(nullptr),
call_canceled_(false),
deadline_(gpr_inf_future(GPR_CLOCK_REALTIME)),

@ -88,6 +88,7 @@ TEST_F(CrashTest, KillBeforeWrite) {
EchoRequest request;
EchoResponse response;
ClientContext context;
context.set_fail_fast(false);
auto stream = stub->BidiStream(&context);
@ -113,6 +114,7 @@ TEST_F(CrashTest, KillAfterWrite) {
EchoRequest request;
EchoResponse response;
ClientContext context;
context.set_fail_fast(false);
auto stream = stub->BidiStream(&context);

Loading…
Cancel
Save