Update C++ code to set status via the C api.

This prevents mismatches from breaking tests.
pull/30/head
Craig Tiller 10 years ago
parent 4de9b7d4a3
commit d248c24546
  1. 6
      include/grpc++/stream.h
  2. 2
      include/grpc++/stream_context_interface.h
  3. 8
      include/grpc/grpc.h
  4. 33
      src/core/surface/call.c
  5. 38
      src/cpp/stream/stream_context.cc
  6. 4
      src/cpp/stream/stream_context.h
  7. 1
      third_party/libevent

@ -96,7 +96,7 @@ class ClientReader : public ClientStreamingInterface,
virtual bool Read(R* msg) { return context_->Read(msg); }
virtual void Cancel() { context_->FinishStream(Status::Cancelled, true); }
virtual void Cancel() { context_->Cancel(); }
virtual const Status& Wait() { return context_->Wait(); }
@ -122,7 +122,7 @@ class ClientWriter : public ClientStreamingInterface,
virtual void WritesDone() { context_->Write(nullptr, true); }
virtual void Cancel() { context_->FinishStream(Status::Cancelled, true); }
virtual void Cancel() { context_->Cancel(); }
// Read the final response and wait for the final status.
virtual const Status& Wait() {
@ -165,7 +165,7 @@ class ClientReaderWriter : public ClientStreamingInterface,
virtual void WritesDone() { context_->Write(nullptr, true); }
virtual void Cancel() { context_->FinishStream(Status::Cancelled, true); }
virtual void Cancel() { context_->Cancel(); }
virtual const Status& Wait() { return context_->Wait(); }

@ -53,7 +53,7 @@ class StreamContextInterface {
virtual bool Read(google::protobuf::Message* msg) = 0;
virtual bool Write(const google::protobuf::Message* msg, bool is_last) = 0;
virtual const Status& Wait() = 0;
virtual void FinishStream(const Status& status, bool send) = 0;
virtual void Cancel() = 0;
virtual google::protobuf::Message* request() = 0;
virtual google::protobuf::Message* response() = 0;

@ -365,6 +365,14 @@ grpc_call_error grpc_call_server_end_initial_metadata(grpc_call *call,
Can be called multiple times, from any thread. */
grpc_call_error grpc_call_cancel(grpc_call *call);
/* Called by clients to cancel an RPC on the server.
Can be called multiple times, from any thread.
If a status has not been received for the call, set it to the status code
and description passed in.
Importantly, this function does not send status nor description to the
remote endpoint. */
grpc_call_error grpc_call_cancel_with_status(grpc_call *call, grpc_status_code status, const char *description);
/* Queue a byte buffer for writing.
flags is a bit-field combination of the write flags defined above.
A write with byte_buffer null is allowed, and will not send any bytes on the

@ -270,6 +270,19 @@ void grpc_call_destroy(grpc_call *c) {
grpc_call_internal_unref(c);
}
static void maybe_set_status_code(grpc_call *call, gpr_uint32 status) {
if (!call->got_status_code) {
call->status_code = status;
call->got_status_code = 1;
}
}
static void maybe_set_status_details(grpc_call *call, grpc_mdstr *status) {
if (!call->status_details) {
call->status_details = grpc_mdstr_ref(status);
}
}
grpc_call_error grpc_call_cancel(grpc_call *c) {
grpc_call_element *elem;
grpc_call_op op;
@ -286,6 +299,17 @@ grpc_call_error grpc_call_cancel(grpc_call *c) {
return GRPC_CALL_OK;
}
grpc_call_error grpc_call_cancel_with_status(grpc_call *c, grpc_status_code status, const char *description) {
grpc_mdstr *details = description? grpc_mdstr_from_string(c->metadata_context, description) : NULL;
gpr_mu_lock(&c->read_mu);
maybe_set_status_code(c, status);
if (details) {
maybe_set_status_details(c, details);
}
gpr_mu_unlock(&c->read_mu);
return grpc_call_cancel(c);
}
void grpc_call_execute_op(grpc_call *call, grpc_call_op *op) {
grpc_call_element *elem;
GPR_ASSERT(op->dir == GRPC_CALL_DOWN);
@ -803,16 +827,11 @@ void grpc_call_recv_metadata(grpc_call_element *elem, grpc_call_op *op) {
grpc_mdstr *key = md->key;
gpr_log(GPR_DEBUG, "call %p got metadata %s %s", call, grpc_mdstr_as_c_string(md->key), grpc_mdstr_as_c_string(md->value));
if (key == grpc_channel_get_status_string(call->channel)) {
if (!call->got_status_code) {
call->status_code = decode_status(md);
call->got_status_code = 1;
}
maybe_set_status_code(call, decode_status(md));
grpc_mdelem_unref(md);
op->done_cb(op->user_data, GRPC_OP_OK);
} else if (key == grpc_channel_get_message_string(call->channel)) {
if (!call->status_details) {
call->status_details = grpc_mdstr_ref(md->value);
}
maybe_set_status_details(call, md->value);
grpc_mdelem_unref(md);
op->done_cb(op->user_data, GRPC_OP_OK);
} else {

@ -112,9 +112,7 @@ bool StreamContext::Read(google::protobuf::Message* msg) {
if (read_ev->data.read) {
if (!DeserializeProto(read_ev->data.read, msg)) {
ret = false;
FinishStream(
Status(StatusCode::DATA_LOSS, "Failed to parse incoming proto"),
true);
grpc_call_cancel_with_status(call(), GRPC_STATUS_DATA_LOSS, "Failed to parse incoming proto");
}
} else {
ret = false;
@ -132,9 +130,7 @@ bool StreamContext::Write(const google::protobuf::Message* msg, bool is_last) {
if (msg) {
grpc_byte_buffer* out_buf = nullptr;
if (!SerializeProto(*msg, &out_buf)) {
FinishStream(Status(StatusCode::INVALID_ARGUMENT,
"Failed to serialize outgoing proto"),
true);
grpc_call_cancel_with_status(call(), GRPC_STATUS_INVALID_ARGUMENT, "Failed to serialize outgoing proto");
return false;
}
int flag = is_last ? GRPC_WRITE_BUFFER_HINT : 0;
@ -172,29 +168,21 @@ const Status& StreamContext::Wait() {
grpc_event_finish(metadata_ev);
// TODO(yangg) protect states by a mutex, including other places.
if (!self_halfclosed_ || !peer_halfclosed_) {
FinishStream(Status::Cancelled, true);
} else {
grpc_event* finish_ev =
grpc_completion_queue_pluck(cq(), finished_tag(), gpr_inf_future);
GPR_ASSERT(finish_ev->type == GRPC_FINISHED);
final_status_ = Status(
static_cast<StatusCode>(finish_ev->data.finished.status),
finish_ev->data.finished.details ? finish_ev->data.finished.details
: "");
grpc_event_finish(finish_ev);
}
return final_status_;
}
void StreamContext::FinishStream(const Status& status, bool send) {
if (send) {
grpc_call_cancel(call());
}
Cancel();
}
grpc_event* finish_ev =
grpc_completion_queue_pluck(cq(), finished_tag(), gpr_inf_future);
GPR_ASSERT(finish_ev->type == GRPC_FINISHED);
final_status_ = Status(
static_cast<StatusCode>(finish_ev->data.finished.status),
finish_ev->data.finished.details ? finish_ev->data.finished.details
: "");
grpc_event_finish(finish_ev);
final_status_ = status;
return final_status_;
}
void StreamContext::Cancel() {
grpc_call_cancel(call());
}
} // namespace grpc

@ -48,7 +48,7 @@ namespace grpc {
class ClientContext;
class RpcMethod;
class StreamContext : public StreamContextInterface {
class StreamContext final : public StreamContextInterface {
public:
StreamContext(const RpcMethod& method, ClientContext* context,
const google::protobuf::Message* request,
@ -63,7 +63,7 @@ class StreamContext : public StreamContextInterface {
bool Read(google::protobuf::Message* msg) override;
bool Write(const google::protobuf::Message* msg, bool is_last) override;
const Status& Wait() override;
void FinishStream(const Status& status, bool send) override;
void Cancel() override;
google::protobuf::Message* request() override { return request_; }
google::protobuf::Message* response() override { return result_; }

@ -0,0 +1 @@
Subproject commit f7d92c63928a1460f3d99b9bc418bd3b686a0dca
Loading…
Cancel
Save