|
|
|
@ -80,9 +80,8 @@ void StreamContext::Start(bool buffered) { |
|
|
|
|
if (is_client_) { |
|
|
|
|
// TODO(yangg) handle metadata send path
|
|
|
|
|
int flag = buffered ? GRPC_WRITE_BUFFER_HINT : 0; |
|
|
|
|
grpc_call_error error = grpc_call_invoke(call(), cq(),
|
|
|
|
|
client_metadata_read_tag(), |
|
|
|
|
finished_tag(), flag); |
|
|
|
|
grpc_call_error error = grpc_call_invoke( |
|
|
|
|
call(), cq(), client_metadata_read_tag(), finished_tag(), flag); |
|
|
|
|
GPR_ASSERT(GRPC_CALL_OK == error); |
|
|
|
|
} else { |
|
|
|
|
// TODO(yangg) metadata needs to be added before accept
|
|
|
|
@ -105,7 +104,8 @@ bool StreamContext::Read(google::protobuf::Message* msg) { |
|
|
|
|
if (read_ev->data.read) { |
|
|
|
|
if (!DeserializeProto(read_ev->data.read, msg)) { |
|
|
|
|
ret = false; |
|
|
|
|
grpc_call_cancel_with_status(call(), GRPC_STATUS_DATA_LOSS, "Failed to parse incoming proto"); |
|
|
|
|
grpc_call_cancel_with_status(call(), GRPC_STATUS_DATA_LOSS, |
|
|
|
|
"Failed to parse incoming proto"); |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
ret = false; |
|
|
|
@ -123,7 +123,8 @@ bool StreamContext::Write(const google::protobuf::Message* msg, bool is_last) { |
|
|
|
|
if (msg) { |
|
|
|
|
grpc_byte_buffer* out_buf = nullptr; |
|
|
|
|
if (!SerializeProto(*msg, &out_buf)) { |
|
|
|
|
grpc_call_cancel_with_status(call(), GRPC_STATUS_INVALID_ARGUMENT, "Failed to serialize outgoing proto"); |
|
|
|
|
grpc_call_cancel_with_status(call(), GRPC_STATUS_INVALID_ARGUMENT, |
|
|
|
|
"Failed to serialize outgoing proto"); |
|
|
|
|
return false; |
|
|
|
|
} |
|
|
|
|
int flag = is_last ? GRPC_WRITE_BUFFER_HINT : 0; |
|
|
|
@ -162,20 +163,17 @@ const Status& StreamContext::Wait() { |
|
|
|
|
// TODO(yangg) protect states by a mutex, including other places.
|
|
|
|
|
if (!self_halfclosed_ || !peer_halfclosed_) { |
|
|
|
|
Cancel(); |
|
|
|
|
}
|
|
|
|
|
} |
|
|
|
|
grpc_event* finish_ev = |
|
|
|
|
grpc_completion_queue_pluck(cq(), finished_tag(), gpr_inf_future); |
|
|
|
|
GPR_ASSERT(finish_ev->type == GRPC_FINISHED); |
|
|
|
|
final_status_ = Status( |
|
|
|
|
static_cast<StatusCode>(finish_ev->data.finished.status), |
|
|
|
|
finish_ev->data.finished.details ? finish_ev->data.finished.details |
|
|
|
|
: ""); |
|
|
|
|
finish_ev->data.finished.details ? finish_ev->data.finished.details : ""); |
|
|
|
|
grpc_event_finish(finish_ev); |
|
|
|
|
return final_status_; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void StreamContext::Cancel() { |
|
|
|
|
grpc_call_cancel(call()); |
|
|
|
|
} |
|
|
|
|
void StreamContext::Cancel() { grpc_call_cancel(call()); } |
|
|
|
|
|
|
|
|
|
} // namespace grpc
|
|
|
|
|