|
|
@ -112,7 +112,8 @@ bool StreamContext::Read(google::protobuf::Message* msg) { |
|
|
|
if (read_ev->data.read) { |
|
|
|
if (read_ev->data.read) { |
|
|
|
if (!DeserializeProto(read_ev->data.read, msg)) { |
|
|
|
if (!DeserializeProto(read_ev->data.read, msg)) { |
|
|
|
ret = false; |
|
|
|
ret = false; |
|
|
|
grpc_call_cancel_with_status(call(), GRPC_STATUS_DATA_LOSS, "Failed to parse incoming proto"); |
|
|
|
grpc_call_cancel_with_status(call(), GRPC_STATUS_DATA_LOSS, |
|
|
|
|
|
|
|
"Failed to parse incoming proto"); |
|
|
|
} |
|
|
|
} |
|
|
|
} else { |
|
|
|
} else { |
|
|
|
ret = false; |
|
|
|
ret = false; |
|
|
@ -130,7 +131,8 @@ bool StreamContext::Write(const google::protobuf::Message* msg, bool is_last) { |
|
|
|
if (msg) { |
|
|
|
if (msg) { |
|
|
|
grpc_byte_buffer* out_buf = nullptr; |
|
|
|
grpc_byte_buffer* out_buf = nullptr; |
|
|
|
if (!SerializeProto(*msg, &out_buf)) { |
|
|
|
if (!SerializeProto(*msg, &out_buf)) { |
|
|
|
grpc_call_cancel_with_status(call(), GRPC_STATUS_INVALID_ARGUMENT, "Failed to serialize outgoing proto"); |
|
|
|
grpc_call_cancel_with_status(call(), GRPC_STATUS_INVALID_ARGUMENT, |
|
|
|
|
|
|
|
"Failed to serialize outgoing proto"); |
|
|
|
return false; |
|
|
|
return false; |
|
|
|
} |
|
|
|
} |
|
|
|
int flag = is_last ? GRPC_WRITE_BUFFER_HINT : 0; |
|
|
|
int flag = is_last ? GRPC_WRITE_BUFFER_HINT : 0; |
|
|
@ -169,20 +171,17 @@ const Status& StreamContext::Wait() { |
|
|
|
// TODO(yangg) protect states by a mutex, including other places.
|
|
|
|
// TODO(yangg) protect states by a mutex, including other places.
|
|
|
|
if (!self_halfclosed_ || !peer_halfclosed_) { |
|
|
|
if (!self_halfclosed_ || !peer_halfclosed_) { |
|
|
|
Cancel(); |
|
|
|
Cancel(); |
|
|
|
}
|
|
|
|
} |
|
|
|
grpc_event* finish_ev = |
|
|
|
grpc_event* finish_ev = |
|
|
|
grpc_completion_queue_pluck(cq(), finished_tag(), gpr_inf_future); |
|
|
|
grpc_completion_queue_pluck(cq(), finished_tag(), gpr_inf_future); |
|
|
|
GPR_ASSERT(finish_ev->type == GRPC_FINISHED); |
|
|
|
GPR_ASSERT(finish_ev->type == GRPC_FINISHED); |
|
|
|
final_status_ = Status( |
|
|
|
final_status_ = Status( |
|
|
|
static_cast<StatusCode>(finish_ev->data.finished.status), |
|
|
|
static_cast<StatusCode>(finish_ev->data.finished.status), |
|
|
|
finish_ev->data.finished.details ? finish_ev->data.finished.details |
|
|
|
finish_ev->data.finished.details ? finish_ev->data.finished.details : ""); |
|
|
|
: ""); |
|
|
|
|
|
|
|
grpc_event_finish(finish_ev); |
|
|
|
grpc_event_finish(finish_ev); |
|
|
|
return final_status_; |
|
|
|
return final_status_; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
void StreamContext::Cancel() { |
|
|
|
void StreamContext::Cancel() { grpc_call_cancel(call()); } |
|
|
|
grpc_call_cancel(call()); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
} // namespace grpc
|
|
|
|
} // namespace grpc
|
|
|
|