From 2d29269f8ccb903343c03921c1d164bfbf0b401e Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Tue, 14 Apr 2020 13:56:52 -0700 Subject: [PATCH] Make proper use of anonymous namespace --- src/core/lib/surface/server.cc | 363 ++++++++++++++++----------------- 1 file changed, 178 insertions(+), 185 deletions(-) diff --git a/src/core/lib/surface/server.cc b/src/core/lib/surface/server.cc index 08b15ac001a..b4825489291 100644 --- a/src/core/lib/surface/server.cc +++ b/src/core/lib/surface/server.cc @@ -52,11 +52,11 @@ grpc_core::TraceFlag grpc_server_channel_trace(false, "server_channel"); using grpc_core::LockedMultiProducerSingleConsumerQueue; -static void server_on_recv_initial_metadata(void* ptr, grpc_error* error); -static void server_recv_trailing_metadata_ready(void* user_data, - grpc_error* error); - namespace { + +void server_on_recv_initial_metadata(void* ptr, grpc_error* error); +void server_recv_trailing_metadata_ready(void* user_data, grpc_error* error); + struct listener { void* arg; void (*start)(grpc_server* server, void* arg, grpc_pollset** pollsets, @@ -117,13 +117,13 @@ struct channel_data { intptr_t channelz_socket_uuid; }; -typedef struct shutdown_tag { +struct shutdown_tag { void* tag; grpc_completion_queue* cq; grpc_cq_completion completion; -} shutdown_tag; +}; -typedef enum { +enum call_state { /* waiting for metadata */ NOT_STARTED, /* initial metadata read, not flow controlled in yet */ @@ -132,9 +132,9 @@ typedef enum { ACTIVATED, /* cancelled before being queued */ ZOMBIED -} call_state; +}; -typedef struct request_matcher request_matcher; +struct request_matcher; struct call_data { call_data(grpc_call_element* elem, const grpc_call_element_args& args) @@ -144,7 +144,7 @@ struct call_data { ::server_on_recv_initial_metadata, elem, grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready, - server_recv_trailing_metadata_ready, elem, + ::server_recv_trailing_metadata_ready, elem, grpc_schedule_on_exec_ctx); } ~call_data() { @@ -213,10 +213,10 @@ struct registered_method { registered_method* next; }; -typedef struct { +struct channel_broadcaster { grpc_channel** channels; size_t num_channels; -} channel_broadcaster; +}; } // namespace struct grpc_server { @@ -269,19 +269,20 @@ struct grpc_server { #define SERVER_FROM_CALL_ELEM(elem) \ (((channel_data*)(elem)->channel_data)->server) -static void publish_new_rpc(void* calld, grpc_error* error); -static void fail_call(grpc_server* server, size_t cq_idx, requested_call* rc, - grpc_error* error); +namespace { +void publish_new_rpc(void* calld, grpc_error* error); +void fail_call(grpc_server* server, size_t cq_idx, requested_call* rc, + grpc_error* error); /* Before calling maybe_finish_shutdown, we must hold mu_global and not hold mu_call */ -static void maybe_finish_shutdown(grpc_server* server); +void maybe_finish_shutdown(grpc_server* server); /* * channel broadcaster */ /* assumes server locked */ -static void channel_broadcaster_init(grpc_server* s, channel_broadcaster* cb) { +void channel_broadcaster_init(grpc_server* s, channel_broadcaster* cb) { channel_data* c; size_t count = 0; for (c = s->root_channel_data.next; c != &s->root_channel_data; c = c->next) { @@ -302,15 +303,15 @@ struct shutdown_cleanup_args { grpc_slice slice; }; -static void shutdown_cleanup(void* arg, grpc_error* /*error*/) { +void shutdown_cleanup(void* arg, grpc_error* /*error*/) { struct shutdown_cleanup_args* a = static_cast(arg); grpc_slice_unref_internal(a->slice); gpr_free(a); } -static void send_shutdown(grpc_channel* channel, bool send_goaway, - grpc_error* send_disconnect) { +void send_shutdown(grpc_channel* channel, bool send_goaway, + grpc_error* send_disconnect) { struct shutdown_cleanup_args* sc = static_cast(gpr_malloc(sizeof(*sc))); GRPC_CLOSURE_INIT(&sc->closure, shutdown_cleanup, sc, @@ -331,9 +332,8 @@ static void send_shutdown(grpc_channel* channel, bool send_goaway, elem->filter->start_transport_op(elem, op); } -static void channel_broadcaster_shutdown(channel_broadcaster* cb, - bool send_goaway, - grpc_error* force_disconnect) { +void channel_broadcaster_shutdown(channel_broadcaster* cb, bool send_goaway, + grpc_error* force_disconnect) { size_t i; for (i = 0; i < cb->num_channels; i++) { @@ -349,7 +349,7 @@ static void channel_broadcaster_shutdown(channel_broadcaster* cb, * request_matcher */ -static void request_matcher_init(request_matcher* rm, grpc_server* server) { +void request_matcher_init(request_matcher* rm, grpc_server* server) { rm->server = server; rm->pending_head = rm->pending_tail = nullptr; rm->requests_per_cq = static_cast( @@ -359,7 +359,7 @@ static void request_matcher_init(request_matcher* rm, grpc_server* server) { } } -static void request_matcher_destroy(request_matcher* rm) { +void request_matcher_destroy(request_matcher* rm) { for (size_t i = 0; i < rm->server->cq_count; i++) { GPR_ASSERT(rm->requests_per_cq[i].Pop() == nullptr); rm->requests_per_cq[i].~LockedMultiProducerSingleConsumerQueue(); @@ -367,12 +367,12 @@ static void request_matcher_destroy(request_matcher* rm) { gpr_free(rm->requests_per_cq); } -static void kill_zombie(void* elem, grpc_error* /*error*/) { +void kill_zombie(void* elem, grpc_error* /*error*/) { grpc_call_unref( grpc_call_from_top_element(static_cast(elem))); } -static void request_matcher_zombify_all_pending_calls(request_matcher* rm) { +void request_matcher_zombify_all_pending_calls(request_matcher* rm) { while (rm->pending_head) { call_data* calld = rm->pending_head; rm->pending_head = calld->pending_next; @@ -386,9 +386,8 @@ static void request_matcher_zombify_all_pending_calls(request_matcher* rm) { } } -static void request_matcher_kill_requests(grpc_server* server, - request_matcher* rm, - grpc_error* error) { +void request_matcher_kill_requests(grpc_server* server, request_matcher* rm, + grpc_error* error) { requested_call* rc; for (size_t i = 0; i < server->cq_count; i++) { while ((rc = reinterpret_cast( @@ -403,9 +402,9 @@ static void request_matcher_kill_requests(grpc_server* server, * server proper */ -static void server_ref(grpc_server* server) { server->internal_refcount.Ref(); } +void server_ref(grpc_server* server) { server->internal_refcount.Ref(); } -static void server_delete(grpc_server* server) { +void server_delete(grpc_server* server) { registered_method* rm; size_t i; server->channelz_server.reset(); @@ -434,30 +433,28 @@ static void server_delete(grpc_server* server) { gpr_free(server); } -static void server_unref(grpc_server* server) { +void server_unref(grpc_server* server) { if (GPR_UNLIKELY(server->internal_refcount.Unref())) { server_delete(server); } } -static int is_channel_orphaned(channel_data* chand) { - return chand->next == chand; -} +int is_channel_orphaned(channel_data* chand) { return chand->next == chand; } -static void orphan_channel(channel_data* chand) { +void orphan_channel(channel_data* chand) { chand->next->prev = chand->prev; chand->prev->next = chand->next; chand->next = chand->prev = chand; } -static void finish_destroy_channel(void* cd, grpc_error* /*error*/) { +void finish_destroy_channel(void* cd, grpc_error* /*error*/) { channel_data* chand = static_cast(cd); grpc_server* server = chand->server; GRPC_CHANNEL_INTERNAL_UNREF(chand->channel, "server"); server_unref(server); } -static void destroy_channel(channel_data* chand) { +void destroy_channel(channel_data* chand) { if (is_channel_orphaned(chand)) return; GPR_ASSERT(chand->server != nullptr); orphan_channel(chand); @@ -478,12 +475,10 @@ static void destroy_channel(channel_data* chand) { op); } -static void done_request_event(void* req, grpc_cq_completion* /*c*/) { - gpr_free(req); -} +void done_request_event(void* req, grpc_cq_completion* /*c*/) { gpr_free(req); } -static void publish_call(grpc_server* server, call_data* calld, size_t cq_idx, - requested_call* rc) { +void publish_call(grpc_server* server, call_data* calld, size_t cq_idx, + requested_call* rc) { grpc_call_set_completion_queue(calld->call, rc->cq_bound_to_call); grpc_call* call = calld->call; *rc->call = call; @@ -515,7 +510,7 @@ static void publish_call(grpc_server* server, call_data* calld, size_t cq_idx, rc, &rc->completion, true); } -static void publish_new_rpc(void* arg, grpc_error* error) { +void publish_new_rpc(void* arg, grpc_error* error) { grpc_call_element* call_elem = static_cast(arg); call_data* calld = static_cast(call_elem->call_data); channel_data* chand = static_cast(call_elem->channel_data); @@ -580,7 +575,7 @@ static void publish_new_rpc(void* arg, grpc_error* error) { gpr_mu_unlock(&server->mu_call); } -static void finish_start_new_rpc( +void finish_start_new_rpc( grpc_server* server, grpc_call_element* elem, request_matcher* rm, grpc_server_register_method_payload_handling payload_handling) { call_data* calld = static_cast(elem->call_data); @@ -614,7 +609,7 @@ static void finish_start_new_rpc( } } -static void start_new_rpc(grpc_call_element* elem) { +void start_new_rpc(grpc_call_element* elem) { channel_data* chand = static_cast(elem->channel_data); call_data* calld = static_cast(elem->call_data); grpc_server* server = chand->server; @@ -665,7 +660,7 @@ static void start_new_rpc(grpc_call_element* elem) { GRPC_SRM_PAYLOAD_NONE); } -static int num_listeners(grpc_server* server) { +int num_listeners(grpc_server* server) { listener* l; int n = 0; for (l = server->listeners; l; l = l->next) { @@ -674,12 +669,11 @@ static int num_listeners(grpc_server* server) { return n; } -static void done_shutdown_event(void* server, - grpc_cq_completion* /*completion*/) { +void done_shutdown_event(void* server, grpc_cq_completion* /*completion*/) { server_unref(static_cast(server)); } -static int num_channels(grpc_server* server) { +int num_channels(grpc_server* server) { channel_data* chand; int n = 0; for (chand = server->root_channel_data.next; @@ -689,7 +683,7 @@ static int num_channels(grpc_server* server) { return n; } -static void kill_pending_work_locked(grpc_server* server, grpc_error* error) { +void kill_pending_work_locked(grpc_server* server, grpc_error* error) { if (server->started) { request_matcher_kill_requests(server, &server->unregistered_request_matcher, GRPC_ERROR_REF(error)); @@ -705,7 +699,7 @@ static void kill_pending_work_locked(grpc_server* server, grpc_error* error) { GRPC_ERROR_UNREF(error); } -static void maybe_finish_shutdown(grpc_server* server) { +void maybe_finish_shutdown(grpc_server* server) { size_t i; if (!gpr_atm_acq_load(&server->shutdown_flag) || server->shutdown_published) { return; @@ -740,7 +734,7 @@ static void maybe_finish_shutdown(grpc_server* server) { } } -static void server_on_recv_initial_metadata(void* ptr, grpc_error* error) { +void server_on_recv_initial_metadata(void* ptr, grpc_error* error) { grpc_call_element* elem = static_cast(ptr); call_data* calld = static_cast(elem->call_data); grpc_millis op_deadline; @@ -786,8 +780,7 @@ static void server_on_recv_initial_metadata(void* ptr, grpc_error* error) { grpc_core::Closure::Run(DEBUG_LOCATION, closure, error); } -static void server_recv_trailing_metadata_ready(void* user_data, - grpc_error* error) { +void server_recv_trailing_metadata_ready(void* user_data, grpc_error* error) { grpc_call_element* elem = static_cast(user_data); call_data* calld = static_cast(elem->call_data); if (calld->on_done_recv_initial_metadata != nullptr) { @@ -808,8 +801,8 @@ static void server_recv_trailing_metadata_ready(void* user_data, calld->original_recv_trailing_metadata_ready, error); } -static void server_mutate_op(grpc_call_element* elem, - grpc_transport_stream_op_batch* op) { +void server_mutate_op(grpc_call_element* elem, + grpc_transport_stream_op_batch* op) { call_data* calld = static_cast(elem->call_data); if (op->recv_initial_metadata) { @@ -831,13 +824,13 @@ static void server_mutate_op(grpc_call_element* elem, } } -static void server_start_transport_stream_op_batch( +void server_start_transport_stream_op_batch( grpc_call_element* elem, grpc_transport_stream_op_batch* op) { server_mutate_op(elem, op); grpc_call_next_op(elem, op); } -static void got_initial_metadata(void* ptr, grpc_error* error) { +void got_initial_metadata(void* ptr, grpc_error* error) { grpc_call_element* elem = static_cast(ptr); call_data* calld = static_cast(elem->call_data); if (error == GRPC_ERROR_NONE) { @@ -855,8 +848,8 @@ static void got_initial_metadata(void* ptr, grpc_error* error) { } } -static void accept_stream(void* cd, grpc_transport* /*transport*/, - const void* transport_server_data) { +void accept_stream(void* cd, grpc_transport* /*transport*/, + const void* transport_server_data) { channel_data* chand = static_cast(cd); /* create a call */ grpc_call_create_args args; @@ -891,25 +884,25 @@ static void accept_stream(void* cd, grpc_transport* /*transport*/, grpc_call_start_batch_and_execute(call, &op, 1, &calld->got_initial_metadata); } -static grpc_error* server_init_call_elem(grpc_call_element* elem, - const grpc_call_element_args* args) { +grpc_error* server_init_call_elem(grpc_call_element* elem, + const grpc_call_element_args* args) { channel_data* chand = static_cast(elem->channel_data); server_ref(chand->server); new (elem->call_data) call_data(elem, *args); return GRPC_ERROR_NONE; } -static void server_destroy_call_elem(grpc_call_element* elem, - const grpc_call_final_info* /*final_info*/, - grpc_closure* /*ignored*/) { +void server_destroy_call_elem(grpc_call_element* elem, + const grpc_call_final_info* /*final_info*/, + grpc_closure* /*ignored*/) { call_data* calld = static_cast(elem->call_data); calld->~call_data(); channel_data* chand = static_cast(elem->channel_data); server_unref(chand->server); } -static grpc_error* server_init_channel_elem(grpc_channel_element* elem, - grpc_channel_element_args* args) { +grpc_error* server_init_channel_elem(grpc_channel_element* elem, + grpc_channel_element_args* args) { channel_data* chand = static_cast(elem->channel_data); GPR_ASSERT(args->is_first); GPR_ASSERT(!args->is_last); @@ -920,7 +913,7 @@ static grpc_error* server_init_channel_elem(grpc_channel_element* elem, return GRPC_ERROR_NONE; } -static void server_destroy_channel_elem(grpc_channel_element* elem) { +void server_destroy_channel_elem(grpc_channel_element* elem) { size_t i; channel_data* chand = static_cast(elem->channel_data); if (chand->registered_methods) { @@ -954,6 +947,121 @@ static void server_destroy_channel_elem(grpc_channel_element* elem) { } } +void register_completion_queue(grpc_server* server, grpc_completion_queue* cq, + void* reserved) { + size_t i, n; + GPR_ASSERT(!reserved); + for (i = 0; i < server->cq_count; i++) { + if (server->cqs[i] == cq) return; + } + + GRPC_CQ_INTERNAL_REF(cq, "server"); + n = server->cq_count++; + server->cqs = static_cast(gpr_realloc( + server->cqs, server->cq_count * sizeof(grpc_completion_queue*))); + server->cqs[n] = cq; +} + +int streq(const char* a, const char* b) { + if (a == nullptr && b == nullptr) return 1; + if (a == nullptr) return 0; + if (b == nullptr) return 0; + return 0 == strcmp(a, b); +} + +class ConnectivityWatcher + : public grpc_core::AsyncConnectivityStateWatcherInterface { + public: + explicit ConnectivityWatcher(channel_data* chand) : chand_(chand) { + GRPC_CHANNEL_INTERNAL_REF(chand_->channel, "connectivity"); + } + + ~ConnectivityWatcher() { + GRPC_CHANNEL_INTERNAL_UNREF(chand_->channel, "connectivity"); + } + + private: + void OnConnectivityStateChange(grpc_connectivity_state new_state) override { + // Don't do anything until we are being shut down. + if (new_state != GRPC_CHANNEL_SHUTDOWN) return; + // Shut down channel. + grpc_server* server = chand_->server; + gpr_mu_lock(&server->mu_global); + destroy_channel(chand_); + gpr_mu_unlock(&server->mu_global); + } + + channel_data* chand_; +}; + +void done_published_shutdown(void* done_arg, grpc_cq_completion* storage) { + (void)done_arg; + gpr_free(storage); +} + +void listener_destroy_done(void* s, grpc_error* /*error*/) { + grpc_server* server = static_cast(s); + gpr_mu_lock(&server->mu_global); + server->listeners_destroyed++; + maybe_finish_shutdown(server); + gpr_mu_unlock(&server->mu_global); +} + +grpc_call_error queue_call_request(grpc_server* server, size_t cq_idx, + requested_call* rc) { + call_data* calld = nullptr; + request_matcher* rm = nullptr; + if (gpr_atm_acq_load(&server->shutdown_flag)) { + fail_call(server, cq_idx, rc, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server Shutdown")); + return GRPC_CALL_OK; + } + switch (rc->type) { + case BATCH_CALL: + rm = &server->unregistered_request_matcher; + break; + case REGISTERED_CALL: + rm = &rc->data.registered.method->matcher; + break; + } + if (rm->requests_per_cq[cq_idx].Push(rc->mpscq_node.get())) { + /* this was the first queued request: we need to lock and start + matching calls */ + gpr_mu_lock(&server->mu_call); + while ((calld = rm->pending_head) != nullptr) { + rc = reinterpret_cast(rm->requests_per_cq[cq_idx].Pop()); + if (rc == nullptr) break; + rm->pending_head = calld->pending_next; + gpr_mu_unlock(&server->mu_call); + if (!gpr_atm_full_cas(&calld->state, PENDING, ACTIVATED)) { + // Zombied Call + GRPC_CLOSURE_INIT( + &calld->kill_zombie_closure, kill_zombie, + grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0), + grpc_schedule_on_exec_ctx); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, &calld->kill_zombie_closure, + GRPC_ERROR_NONE); + } else { + publish_call(server, calld, cq_idx, rc); + } + gpr_mu_lock(&server->mu_call); + } + gpr_mu_unlock(&server->mu_call); + } + return GRPC_CALL_OK; +} + +void fail_call(grpc_server* server, size_t cq_idx, requested_call* rc, + grpc_error* error) { + *rc->call = nullptr; + rc->initial_metadata->count = 0; + GPR_ASSERT(error != GRPC_ERROR_NONE); + + grpc_cq_end_op(server->cqs[cq_idx], rc->tag, error, done_request_event, rc, + &rc->completion); +} +} // namespace + const grpc_channel_filter grpc_server_top_filter = { server_start_transport_stream_op_batch, grpc_channel_next_op, @@ -968,22 +1076,6 @@ const grpc_channel_filter grpc_server_top_filter = { "server", }; -static void register_completion_queue(grpc_server* server, - grpc_completion_queue* cq, - void* reserved) { - size_t i, n; - GPR_ASSERT(!reserved); - for (i = 0; i < server->cq_count; i++) { - if (server->cqs[i] == cq) return; - } - - GRPC_CQ_INTERNAL_REF(cq, "server"); - n = server->cq_count++; - server->cqs = static_cast(gpr_realloc( - server->cqs, server->cq_count * sizeof(grpc_completion_queue*))); - server->cqs[n] = cq; -} - void grpc_server_register_completion_queue(grpc_server* server, grpc_completion_queue* cq, void* reserved) { @@ -1049,13 +1141,6 @@ grpc_server* grpc_server_create(const grpc_channel_args* args, void* reserved) { return server; } -static int streq(const char* a, const char* b) { - if (a == nullptr && b == nullptr) return 1; - if (a == nullptr) return 0; - if (b == nullptr) return 0; - return 0 == strcmp(a, b); -} - void* grpc_server_register_method( grpc_server* server, const char* method, const char* host, grpc_server_register_method_payload_handling payload_handling, @@ -1133,31 +1218,6 @@ void grpc_server_get_pollsets(grpc_server* server, grpc_pollset*** pollsets, *pollsets = server->pollsets; } -class ConnectivityWatcher - : public grpc_core::AsyncConnectivityStateWatcherInterface { - public: - explicit ConnectivityWatcher(channel_data* chand) : chand_(chand) { - GRPC_CHANNEL_INTERNAL_REF(chand_->channel, "connectivity"); - } - - ~ConnectivityWatcher() { - GRPC_CHANNEL_INTERNAL_UNREF(chand_->channel, "connectivity"); - } - - private: - void OnConnectivityStateChange(grpc_connectivity_state new_state) override { - // Don't do anything until we are being shut down. - if (new_state != GRPC_CHANNEL_SHUTDOWN) return; - // Shut down channel. - grpc_server* server = chand_->server; - gpr_mu_lock(&server->mu_global); - destroy_channel(chand_); - gpr_mu_unlock(&server->mu_global); - } - - channel_data* chand_; -}; - void grpc_server_setup_transport( grpc_server* s, grpc_transport* transport, grpc_pollset* accepting_pollset, const grpc_channel_args* args, @@ -1257,19 +1317,6 @@ void grpc_server_setup_transport( grpc_transport_perform_op(transport, op); } -void done_published_shutdown(void* done_arg, grpc_cq_completion* storage) { - (void)done_arg; - gpr_free(storage); -} - -static void listener_destroy_done(void* s, grpc_error* /*error*/) { - grpc_server* server = static_cast(s); - gpr_mu_lock(&server->mu_global); - server->listeners_destroyed++; - maybe_finish_shutdown(server); - gpr_mu_unlock(&server->mu_global); -} - /* - Kills all pending requests-for-incoming-RPC-calls (i.e the requests made via grpc_server_request_call and grpc_server_request_registered call will now be @@ -1419,50 +1466,6 @@ void grpc_server_add_listener( server->listeners = l; } -static grpc_call_error queue_call_request(grpc_server* server, size_t cq_idx, - requested_call* rc) { - call_data* calld = nullptr; - request_matcher* rm = nullptr; - if (gpr_atm_acq_load(&server->shutdown_flag)) { - fail_call(server, cq_idx, rc, - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server Shutdown")); - return GRPC_CALL_OK; - } - switch (rc->type) { - case BATCH_CALL: - rm = &server->unregistered_request_matcher; - break; - case REGISTERED_CALL: - rm = &rc->data.registered.method->matcher; - break; - } - if (rm->requests_per_cq[cq_idx].Push(rc->mpscq_node.get())) { - /* this was the first queued request: we need to lock and start - matching calls */ - gpr_mu_lock(&server->mu_call); - while ((calld = rm->pending_head) != nullptr) { - rc = reinterpret_cast(rm->requests_per_cq[cq_idx].Pop()); - if (rc == nullptr) break; - rm->pending_head = calld->pending_next; - gpr_mu_unlock(&server->mu_call); - if (!gpr_atm_full_cas(&calld->state, PENDING, ACTIVATED)) { - // Zombied Call - GRPC_CLOSURE_INIT( - &calld->kill_zombie_closure, kill_zombie, - grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0), - grpc_schedule_on_exec_ctx); - grpc_core::ExecCtx::Run(DEBUG_LOCATION, &calld->kill_zombie_closure, - GRPC_ERROR_NONE); - } else { - publish_call(server, calld, cq_idx, rc); - } - gpr_mu_lock(&server->mu_call); - } - gpr_mu_unlock(&server->mu_call); - } - return GRPC_CALL_OK; -} - grpc_call_error grpc_server_request_call( grpc_server* server, grpc_call** call, grpc_call_details* details, grpc_metadata_array* initial_metadata, @@ -1563,16 +1566,6 @@ grpc_call_error grpc_server_request_registered_call( return queue_call_request(server, cq_idx, rc); } -static void fail_call(grpc_server* server, size_t cq_idx, requested_call* rc, - grpc_error* error) { - *rc->call = nullptr; - rc->initial_metadata->count = 0; - GPR_ASSERT(error != GRPC_ERROR_NONE); - - grpc_cq_end_op(server->cqs[cq_idx], rc->tag, error, done_request_event, rc, - &rc->completion); -} - const grpc_channel_args* grpc_server_get_channel_args(grpc_server* server) { return server->channel_args; }