Merge branch 'one-pass' into one-read

pull/1396/head
Craig Tiller 10 years ago
commit 42d7aa98ad
  1. 5
      include/grpc++/server.h
  2. 44
      src/core/surface/call.c
  3. 9
      src/core/surface/call.h
  4. 4
      src/core/surface/completion_queue.c
  5. 1
      src/core/surface/lame_client.c
  6. 2
      src/core/surface/server.c
  7. 9
      src/cpp/server/server.cc
  8. 3
      src/cpp/server/server_builder.cc

@ -80,7 +80,6 @@ class Server GRPC_FINAL : public GrpcLibrary,
// ServerBuilder use only // ServerBuilder use only
Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned); Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned);
Server() = delete;
// Register a service. This call does not take ownership of the service. // Register a service. This call does not take ownership of the service.
// The service must exist for the lifetime of the Server instance. // The service must exist for the lifetime of the Server instance.
bool RegisterService(RpcService* service); bool RegisterService(RpcService* service);
@ -118,7 +117,7 @@ class Server GRPC_FINAL : public GrpcLibrary,
int num_running_cb_; int num_running_cb_;
grpc::condition_variable callback_cv_; grpc::condition_variable callback_cv_;
std::list<SyncRequest> sync_methods_; std::list<SyncRequest>* sync_methods_;
// Pointer to the c grpc server. // Pointer to the c grpc server.
grpc_server* const server_; grpc_server* const server_;
@ -126,6 +125,8 @@ class Server GRPC_FINAL : public GrpcLibrary,
ThreadPoolInterface* thread_pool_; ThreadPoolInterface* thread_pool_;
// Whether the thread pool is created and owned by the server. // Whether the thread pool is created and owned by the server.
bool thread_pool_owned_; bool thread_pool_owned_;
private:
Server() : server_(NULL) { abort(); }
}; };
} // namespace grpc } // namespace grpc

@ -299,7 +299,7 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
initial_op.on_done_recv = call_on_done_recv; initial_op.on_done_recv = call_on_done_recv;
initial_op.recv_user_data = call; initial_op.recv_user_data = call;
call->receiving = 1; call->receiving = 1;
grpc_call_internal_ref(call); GRPC_CALL_INTERNAL_REF(call, "receiving");
initial_op_ptr = &initial_op; initial_op_ptr = &initial_op;
} }
grpc_call_stack_init(channel_stack, server_transport_data, initial_op_ptr, grpc_call_stack_init(channel_stack, server_transport_data, initial_op_ptr,
@ -319,8 +319,15 @@ grpc_completion_queue *grpc_call_get_completion_queue(grpc_call *call) {
return call->cq; return call->cq;
} }
void grpc_call_internal_ref(grpc_call *c) { #ifdef GRPC_CALL_REF_COUNT_DEBUG
gpr_ref(&c->internal_refcount); } void grpc_call_internal_ref(grpc_call *c, const char *reason) {
gpr_log(GPR_DEBUG, "CALL: ref %p %d -> %d [%s]", c,
c->internal_refcount.count, c->internal_refcount.count + 1, reason);
#else
void grpc_call_internal_ref(grpc_call *c) {
#endif
gpr_ref(&c->internal_refcount);
}
static void destroy_call(void *call, int ignored_success) { static void destroy_call(void *call, int ignored_success) {
size_t i; size_t i;
@ -353,7 +360,14 @@ static void destroy_call(void *call, int ignored_success) {
gpr_free(c); gpr_free(c);
} }
#ifdef GRPC_CALL_REF_COUNT_DEBUG
void grpc_call_internal_unref(grpc_call *c, const char *reason,
int allow_immediate_deletion) {
gpr_log(GPR_DEBUG, "CALL: unref %p %d -> %d [%s]", c,
c->internal_refcount.count, c->internal_refcount.count - 1, reason);
#else
void grpc_call_internal_unref(grpc_call *c, int allow_immediate_deletion) { void grpc_call_internal_unref(grpc_call *c, int allow_immediate_deletion) {
#endif
if (gpr_unref(&c->internal_refcount)) { if (gpr_unref(&c->internal_refcount)) {
if (allow_immediate_deletion) { if (allow_immediate_deletion) {
destroy_call(c, 1); destroy_call(c, 1);
@ -411,8 +425,10 @@ static int need_more_data(grpc_call *call) {
is_op_live(call, GRPC_IOREQ_RECV_TRAILING_METADATA) || is_op_live(call, GRPC_IOREQ_RECV_TRAILING_METADATA) ||
is_op_live(call, GRPC_IOREQ_RECV_STATUS) || is_op_live(call, GRPC_IOREQ_RECV_STATUS) ||
is_op_live(call, GRPC_IOREQ_RECV_STATUS_DETAILS) || is_op_live(call, GRPC_IOREQ_RECV_STATUS_DETAILS) ||
(is_op_live(call, GRPC_IOREQ_RECV_CLOSE) && grpc_bbq_empty(&call->incoming_queue)) || (is_op_live(call, GRPC_IOREQ_RECV_CLOSE) &&
(call->write_state == WRITE_STATE_INITIAL && !call->is_client && call->read_state != READ_STATE_STREAM_CLOSED); grpc_bbq_empty(&call->incoming_queue)) ||
(call->write_state == WRITE_STATE_INITIAL && !call->is_client &&
call->read_state != READ_STATE_STREAM_CLOSED);
} }
static void unlock(grpc_call *call) { static void unlock(grpc_call *call) {
@ -430,14 +446,14 @@ static void unlock(grpc_call *call) {
op.on_done_recv = call_on_done_recv; op.on_done_recv = call_on_done_recv;
op.recv_user_data = call; op.recv_user_data = call;
call->receiving = 1; call->receiving = 1;
grpc_call_internal_ref(call); GRPC_CALL_INTERNAL_REF(call, "receiving");
start_op = 1; start_op = 1;
} }
if (!call->sending) { if (!call->sending) {
if (fill_send_ops(call, &op)) { if (fill_send_ops(call, &op)) {
call->sending = 1; call->sending = 1;
grpc_call_internal_ref(call); GRPC_CALL_INTERNAL_REF(call, "sending");
start_op = 1; start_op = 1;
} }
} }
@ -448,7 +464,7 @@ static void unlock(grpc_call *call) {
sizeof(completed_requests)); sizeof(completed_requests));
call->num_completed_requests = 0; call->num_completed_requests = 0;
call->completing = 1; call->completing = 1;
grpc_call_internal_ref(call); GRPC_CALL_INTERNAL_REF(call, "completing");
} }
gpr_mu_unlock(&call->mu); gpr_mu_unlock(&call->mu);
@ -465,7 +481,7 @@ static void unlock(grpc_call *call) {
lock(call); lock(call);
call->completing = 0; call->completing = 0;
unlock(call); unlock(call);
grpc_call_internal_unref(call, 0); GRPC_CALL_INTERNAL_UNREF(call, "completing", 0);
} }
} }
@ -605,7 +621,7 @@ static void call_on_done_send(void *pc, int success) {
call->last_send_contains = 0; call->last_send_contains = 0;
call->sending = 0; call->sending = 0;
unlock(call); unlock(call);
grpc_call_internal_unref(call, 0); GRPC_CALL_INTERNAL_UNREF(call, "sending", 0);
} }
static void finish_message(grpc_call *call) { static void finish_message(grpc_call *call) {
@ -724,7 +740,7 @@ static void call_on_done_recv(void *pc, int success) {
call->recv_ops.nops = 0; call->recv_ops.nops = 0;
unlock(call); unlock(call);
grpc_call_internal_unref(call, 0); GRPC_CALL_INTERNAL_UNREF(call, "receiving", 0);
} }
static grpc_mdelem_list chain_metadata_from_app(grpc_call *call, size_t count, static grpc_mdelem_list chain_metadata_from_app(grpc_call *call, size_t count,
@ -982,7 +998,7 @@ void grpc_call_destroy(grpc_call *c) {
cancel = c->read_state != READ_STATE_STREAM_CLOSED; cancel = c->read_state != READ_STATE_STREAM_CLOSED;
unlock(c); unlock(c);
if (cancel) grpc_call_cancel(c); if (cancel) grpc_call_cancel(c);
grpc_call_internal_unref(c, 1); GRPC_CALL_INTERNAL_UNREF(c, "destroy", 1);
} }
grpc_call_error grpc_call_cancel(grpc_call *call) { grpc_call_error grpc_call_cancel(grpc_call *call) {
@ -1029,7 +1045,7 @@ static void call_alarm(void *arg, int success) {
grpc_call_cancel(call); grpc_call_cancel(call);
} }
} }
grpc_call_internal_unref(call, 1); GRPC_CALL_INTERNAL_UNREF(call, "alarm", 1);
} }
static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline) { static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline) {
@ -1038,7 +1054,7 @@ static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline) {
assert(0); assert(0);
return; return;
} }
grpc_call_internal_ref(call); GRPC_CALL_INTERNAL_REF(call, "alarm");
call->have_alarm = 1; call->have_alarm = 1;
grpc_alarm_init(&call->alarm, deadline, call_alarm, call, gpr_now()); grpc_alarm_init(&call->alarm, deadline, call_alarm, call, gpr_now());
} }

@ -93,8 +93,17 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
void grpc_call_set_completion_queue(grpc_call *call, grpc_completion_queue *cq); void grpc_call_set_completion_queue(grpc_call *call, grpc_completion_queue *cq);
grpc_completion_queue *grpc_call_get_completion_queue(grpc_call *call); grpc_completion_queue *grpc_call_get_completion_queue(grpc_call *call);
#ifdef GRPC_CALL_REF_COUNT_DEBUG
void grpc_call_internal_ref(grpc_call *call, const char *reason);
void grpc_call_internal_unref(grpc_call *call, const char *reason, int allow_immediate_deletion);
#define GRPC_CALL_INTERNAL_REF(call, reason) grpc_call_internal_ref(call, reason)
#define GRPC_CALL_INTERNAL_UNREF(call, reason, allow_immediate_deletion) grpc_call_internal_unref(call, reason, allow_immediate_deletion)
#else
void grpc_call_internal_ref(grpc_call *call); void grpc_call_internal_ref(grpc_call *call);
void grpc_call_internal_unref(grpc_call *call, int allow_immediate_deletion); void grpc_call_internal_unref(grpc_call *call, int allow_immediate_deletion);
#define GRPC_CALL_INTERNAL_REF(call, reason) grpc_call_internal_ref(call)
#define GRPC_CALL_INTERNAL_UNREF(call, reason, allow_immediate_deletion) grpc_call_internal_unref(call, allow_immediate_deletion)
#endif
grpc_call_error grpc_call_start_ioreq_and_call_back( grpc_call_error grpc_call_start_ioreq_and_call_back(
grpc_call *call, const grpc_ioreq *reqs, size_t nreqs, grpc_call *call, const grpc_ioreq *reqs, size_t nreqs,

@ -135,7 +135,7 @@ static event *add_locked(grpc_completion_queue *cc, grpc_completion_type type,
void grpc_cq_begin_op(grpc_completion_queue *cc, grpc_call *call, void grpc_cq_begin_op(grpc_completion_queue *cc, grpc_call *call,
grpc_completion_type type) { grpc_completion_type type) {
gpr_ref(&cc->refs); gpr_ref(&cc->refs);
if (call) grpc_call_internal_ref(call); if (call) GRPC_CALL_INTERNAL_REF(call, "cq");
#ifndef NDEBUG #ifndef NDEBUG
gpr_atm_no_barrier_fetch_add(&cc->pending_op_count[type], 1); gpr_atm_no_barrier_fetch_add(&cc->pending_op_count[type], 1);
#endif #endif
@ -411,7 +411,7 @@ void grpc_event_finish(grpc_event *base) {
event *ev = (event *)base; event *ev = (event *)base;
ev->on_finish(ev->on_finish_user_data, GRPC_OP_OK); ev->on_finish(ev->on_finish_user_data, GRPC_OP_OK);
if (ev->base.call) { if (ev->base.call) {
grpc_call_internal_unref(ev->base.call, 1); GRPC_CALL_INTERNAL_UNREF(ev->base.call, "cq", 1);
} }
gpr_free(ev); gpr_free(ev);
} }

@ -76,7 +76,6 @@ static void lame_start_transport_op(grpc_call_element *elem,
*op->recv_state = GRPC_STREAM_CLOSED; *op->recv_state = GRPC_STREAM_CLOSED;
op->on_done_recv(op->recv_user_data, 1); op->on_done_recv(op->recv_user_data, 1);
} }
grpc_transport_op_finish_with_failure(op);
} }
static void channel_op(grpc_channel_element *elem, static void channel_op(grpc_channel_element *elem,

@ -1132,7 +1132,7 @@ static void begin_call(grpc_server *server, call_data *calld,
break; break;
} }
grpc_call_internal_ref(calld->call); GRPC_CALL_INTERNAL_REF(calld->call, "server");
grpc_call_start_ioreq_and_call_back(calld->call, req, r - req, publish, grpc_call_start_ioreq_and_call_back(calld->call, req, r - req, publish,
rc->tag); rc->tag);
} }

@ -180,6 +180,7 @@ Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned)
: started_(false), : started_(false),
shutdown_(false), shutdown_(false),
num_running_cb_(0), num_running_cb_(0),
sync_methods_(new std::list<SyncRequest>),
server_(grpc_server_create(cq_.cq(), nullptr)), server_(grpc_server_create(cq_.cq(), nullptr)),
thread_pool_(thread_pool), thread_pool_(thread_pool),
thread_pool_owned_(thread_pool_owned) {} thread_pool_owned_(thread_pool_owned) {}
@ -196,6 +197,7 @@ Server::~Server() {
if (thread_pool_owned_) { if (thread_pool_owned_) {
delete thread_pool_; delete thread_pool_;
} }
delete sync_methods_;
} }
bool Server::RegisterService(RpcService* service) { bool Server::RegisterService(RpcService* service) {
@ -208,7 +210,8 @@ bool Server::RegisterService(RpcService* service) {
method->name()); method->name());
return false; return false;
} }
sync_methods_.emplace_back(method, tag); SyncRequest request(method, tag);
sync_methods_->emplace_back(request);
} }
return true; return true;
} }
@ -250,8 +253,8 @@ bool Server::Start() {
grpc_server_start(server_); grpc_server_start(server_);
// Start processing rpcs. // Start processing rpcs.
if (!sync_methods_.empty()) { if (!sync_methods_->empty()) {
for (auto m = sync_methods_.begin(); m != sync_methods_.end(); m++) { for (auto m = sync_methods_->begin(); m != sync_methods_->end(); m++) {
m->Request(server_); m->Request(server_);
} }

@ -66,7 +66,8 @@ void ServerBuilder::RegisterAsyncGenericService(AsyncGenericService* service) {
void ServerBuilder::AddListeningPort(const grpc::string& addr, void ServerBuilder::AddListeningPort(const grpc::string& addr,
std::shared_ptr<ServerCredentials> creds, std::shared_ptr<ServerCredentials> creds,
int* selected_port) { int* selected_port) {
ports_.push_back(Port{addr, creds, selected_port}); Port port = {addr, creds, selected_port};
ports_.push_back(port);
} }
void ServerBuilder::SetThreadPool(ThreadPoolInterface* thread_pool) { void ServerBuilder::SetThreadPool(ThreadPoolInterface* thread_pool) {

Loading…
Cancel
Save