mirror of https://github.com/grpc/grpc.git
parent
1d2e21962e
commit
40fcdaff0a
7 changed files with 7 additions and 285 deletions
@ -1,179 +0,0 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2014, Google Inc. |
||||
* All rights reserved. |
||||
* |
||||
* Redistribution and use in source and binary forms, with or without |
||||
* modification, are permitted provided that the following conditions are |
||||
* met: |
||||
* |
||||
* * Redistributions of source code must retain the above copyright |
||||
* notice, this list of conditions and the following disclaimer. |
||||
* * Redistributions in binary form must reproduce the above |
||||
* copyright notice, this list of conditions and the following disclaimer |
||||
* in the documentation and/or other materials provided with the |
||||
* distribution. |
||||
* * Neither the name of Google Inc. nor the names of its |
||||
* contributors may be used to endorse or promote products derived from |
||||
* this software without specific prior written permission. |
||||
* |
||||
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
||||
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
||||
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
||||
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
||||
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
||||
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
||||
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
||||
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
||||
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
||||
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
||||
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
||||
* |
||||
*/ |
||||
|
||||
#include "src/cpp/stream/stream_context.h" |
||||
|
||||
#include <grpc/support/log.h> |
||||
#include "src/cpp/proto/proto_utils.h" |
||||
#include "src/cpp/util/time.h" |
||||
#include <grpc++/client_context.h> |
||||
#include <grpc++/config.h> |
||||
#include <grpc++/impl/rpc_method.h> |
||||
#include <google/protobuf/message.h> |
||||
|
||||
namespace grpc { |
||||
|
||||
// Client only ctor
|
||||
StreamContext::StreamContext(const RpcMethod &method, ClientContext *context, |
||||
const google::protobuf::Message *request, |
||||
google::protobuf::Message *result) |
||||
: is_client_(true), |
||||
method_(&method), |
||||
call_(context->call()), |
||||
cq_(context->cq()), |
||||
request_(const_cast<google::protobuf::Message *>(request)), |
||||
result_(result), |
||||
peer_halfclosed_(false), |
||||
self_halfclosed_(false) { |
||||
GPR_ASSERT(method_->method_type() != RpcMethod::RpcType::NORMAL_RPC); |
||||
} |
||||
|
||||
// Server only ctor
|
||||
StreamContext::StreamContext(const RpcMethod &method, grpc_call *call, |
||||
grpc_completion_queue *cq, |
||||
google::protobuf::Message *request, |
||||
google::protobuf::Message *result) |
||||
: is_client_(false), |
||||
method_(&method), |
||||
call_(call), |
||||
cq_(cq), |
||||
request_(request), |
||||
result_(result), |
||||
peer_halfclosed_(false), |
||||
self_halfclosed_(false) { |
||||
GPR_ASSERT(method_->method_type() != RpcMethod::RpcType::NORMAL_RPC); |
||||
} |
||||
|
||||
StreamContext::~StreamContext() {} |
||||
|
||||
void StreamContext::Start(bool buffered) { |
||||
if (is_client_) { |
||||
// TODO(yangg) handle metadata send path
|
||||
int flag = buffered ? GRPC_WRITE_BUFFER_HINT : 0; |
||||
grpc_call_error error = grpc_call_invoke_old( |
||||
call(), cq(), client_metadata_read_tag(), finished_tag(), flag); |
||||
GPR_ASSERT(GRPC_CALL_OK == error); |
||||
} else { |
||||
// TODO(yangg) metadata needs to be added before accept
|
||||
// TODO(yangg) correctly set flag to accept
|
||||
GPR_ASSERT(grpc_call_server_accept_old(call(), cq(), finished_tag()) == |
||||
GRPC_CALL_OK); |
||||
GPR_ASSERT(grpc_call_server_end_initial_metadata_old(call(), 0) == |
||||
GRPC_CALL_OK); |
||||
} |
||||
} |
||||
|
||||
bool StreamContext::Read(google::protobuf::Message *msg) { |
||||
// TODO(yangg) check peer_halfclosed_ here for possible early return.
|
||||
grpc_call_error err = grpc_call_start_read_old(call(), read_tag()); |
||||
GPR_ASSERT(err == GRPC_CALL_OK); |
||||
grpc_event *read_ev = |
||||
grpc_completion_queue_pluck(cq(), read_tag(), gpr_inf_future); |
||||
GPR_ASSERT(read_ev->type == GRPC_READ); |
||||
bool ret = true; |
||||
if (read_ev->data.read) { |
||||
if (!DeserializeProto(read_ev->data.read, msg)) { |
||||
ret = false; |
||||
grpc_call_cancel_with_status(call(), GRPC_STATUS_DATA_LOSS, |
||||
"Failed to parse incoming proto"); |
||||
} |
||||
} else { |
||||
ret = false; |
||||
peer_halfclosed_ = true; |
||||
} |
||||
grpc_event_finish(read_ev); |
||||
return ret; |
||||
} |
||||
|
||||
bool StreamContext::Write(const google::protobuf::Message *msg, bool is_last) { |
||||
// TODO(yangg) check self_halfclosed_ for possible early return.
|
||||
bool ret = true; |
||||
grpc_event *ev = nullptr; |
||||
|
||||
if (msg) { |
||||
grpc_byte_buffer *out_buf = nullptr; |
||||
if (!SerializeProto(*msg, &out_buf)) { |
||||
grpc_call_cancel_with_status(call(), GRPC_STATUS_INVALID_ARGUMENT, |
||||
"Failed to serialize outgoing proto"); |
||||
return false; |
||||
} |
||||
int flag = is_last ? GRPC_WRITE_BUFFER_HINT : 0; |
||||
grpc_call_error err = |
||||
grpc_call_start_write_old(call(), out_buf, write_tag(), flag); |
||||
grpc_byte_buffer_destroy(out_buf); |
||||
GPR_ASSERT(err == GRPC_CALL_OK); |
||||
|
||||
ev = grpc_completion_queue_pluck(cq(), write_tag(), gpr_inf_future); |
||||
GPR_ASSERT(ev->type == GRPC_WRITE_ACCEPTED); |
||||
|
||||
ret = ev->data.write_accepted == GRPC_OP_OK; |
||||
grpc_event_finish(ev); |
||||
} |
||||
if (ret && is_last) { |
||||
grpc_call_error err = grpc_call_writes_done_old(call(), halfclose_tag()); |
||||
GPR_ASSERT(err == GRPC_CALL_OK); |
||||
ev = grpc_completion_queue_pluck(cq(), halfclose_tag(), gpr_inf_future); |
||||
GPR_ASSERT(ev->type == GRPC_FINISH_ACCEPTED); |
||||
grpc_event_finish(ev); |
||||
|
||||
self_halfclosed_ = true; |
||||
} else if (!ret) { // Stream broken
|
||||
self_halfclosed_ = true; |
||||
peer_halfclosed_ = true; |
||||
} |
||||
|
||||
return ret; |
||||
} |
||||
|
||||
const Status &StreamContext::Wait() { |
||||
// TODO(yangg) properly support metadata
|
||||
grpc_event *metadata_ev = grpc_completion_queue_pluck( |
||||
cq(), client_metadata_read_tag(), gpr_inf_future); |
||||
grpc_event_finish(metadata_ev); |
||||
// TODO(yangg) protect states by a mutex, including other places.
|
||||
if (!self_halfclosed_ || !peer_halfclosed_) { |
||||
Cancel(); |
||||
} |
||||
grpc_event *finish_ev = |
||||
grpc_completion_queue_pluck(cq(), finished_tag(), gpr_inf_future); |
||||
GPR_ASSERT(finish_ev->type == GRPC_FINISHED); |
||||
final_status_ = Status( |
||||
static_cast<StatusCode>(finish_ev->data.finished.status), |
||||
finish_ev->data.finished.details ? finish_ev->data.finished.details : ""); |
||||
grpc_event_finish(finish_ev); |
||||
return final_status_; |
||||
} |
||||
|
||||
void StreamContext::Cancel() { grpc_call_cancel(call()); } |
||||
|
||||
} // namespace grpc
|
@ -1,99 +0,0 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2014, Google Inc. |
||||
* All rights reserved. |
||||
* |
||||
* Redistribution and use in source and binary forms, with or without |
||||
* modification, are permitted provided that the following conditions are |
||||
* met: |
||||
* |
||||
* * Redistributions of source code must retain the above copyright |
||||
* notice, this list of conditions and the following disclaimer. |
||||
* * Redistributions in binary form must reproduce the above |
||||
* copyright notice, this list of conditions and the following disclaimer |
||||
* in the documentation and/or other materials provided with the |
||||
* distribution. |
||||
* * Neither the name of Google Inc. nor the names of its |
||||
* contributors may be used to endorse or promote products derived from |
||||
* this software without specific prior written permission. |
||||
* |
||||
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
||||
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
||||
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
||||
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
||||
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
||||
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
||||
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
||||
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
||||
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
||||
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
||||
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
||||
* |
||||
*/ |
||||
|
||||
#ifndef __GRPCPP_INTERNAL_STREAM_STREAM_CONTEXT_H__ |
||||
#define __GRPCPP_INTERNAL_STREAM_STREAM_CONTEXT_H__ |
||||
|
||||
#include <grpc/grpc.h> |
||||
#include <grpc++/status.h> |
||||
#include <grpc++/stream_context_interface.h> |
||||
|
||||
namespace google { |
||||
namespace protobuf { |
||||
class Message; |
||||
} |
||||
} |
||||
|
||||
namespace grpc { |
||||
class ClientContext; |
||||
class RpcMethod; |
||||
|
||||
class StreamContext final : public StreamContextInterface { |
||||
public: |
||||
StreamContext(const RpcMethod &method, ClientContext *context, |
||||
const google::protobuf::Message *request, |
||||
google::protobuf::Message *result); |
||||
StreamContext(const RpcMethod &method, grpc_call *call, |
||||
grpc_completion_queue *cq, google::protobuf::Message *request, |
||||
google::protobuf::Message *result); |
||||
~StreamContext(); |
||||
// Start the stream, if there is a final write following immediately, set
|
||||
// buffered so that the messages can be sent in batch.
|
||||
void Start(bool buffered) override; |
||||
bool Read(google::protobuf::Message *msg) override; |
||||
bool Write(const google::protobuf::Message *msg, bool is_last) override; |
||||
const Status &Wait() override; |
||||
void Cancel() override; |
||||
|
||||
google::protobuf::Message *request() override { return request_; } |
||||
google::protobuf::Message *response() override { return result_; } |
||||
|
||||
private: |
||||
// Unique tags for plucking events from the c layer. this pointer is casted
|
||||
// to char* to create single byte step between tags. It implicitly relies on
|
||||
// that StreamContext is large enough to contain all the pointers.
|
||||
void *finished_tag() { return reinterpret_cast<char *>(this); } |
||||
void *read_tag() { return reinterpret_cast<char *>(this) + 1; } |
||||
void *write_tag() { return reinterpret_cast<char *>(this) + 2; } |
||||
void *halfclose_tag() { return reinterpret_cast<char *>(this) + 3; } |
||||
void *client_metadata_read_tag() { |
||||
return reinterpret_cast<char *>(this) + 5; |
||||
} |
||||
grpc_call *call() { return call_; } |
||||
grpc_completion_queue *cq() { return cq_; } |
||||
|
||||
bool is_client_; |
||||
const RpcMethod *method_; // not owned
|
||||
grpc_call *call_; // not owned
|
||||
grpc_completion_queue *cq_; // not owned
|
||||
google::protobuf::Message *request_; // first request, not owned
|
||||
google::protobuf::Message *result_; // last response, not owned
|
||||
|
||||
bool peer_halfclosed_; |
||||
bool self_halfclosed_; |
||||
Status final_status_; |
||||
}; |
||||
|
||||
} // namespace grpc
|
||||
|
||||
#endif // __GRPCPP_INTERNAL_STREAM_STREAM_CONTEXT_H__
|
Loading…
Reference in new issue