Merge pull request #22757 from vjpai/core_server_cxx

C++ify grpc_server
pull/23571/head
Vijay Pai 4 years ago committed by GitHub
commit 172f61a3a2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 10
      src/core/ext/transport/chttp2/server/chttp2_server.cc
  2. 8
      src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc
  3. 7
      src/core/lib/iomgr/tcp_server.cc
  4. 12
      src/core/lib/iomgr/tcp_server.h
  5. 6
      src/core/lib/iomgr/tcp_server_custom.cc
  6. 31
      src/core/lib/iomgr/tcp_server_posix.cc
  7. 7
      src/core/lib/iomgr/tcp_server_utils_posix.h
  8. 6
      src/core/lib/iomgr/tcp_server_windows.cc
  9. 38
      src/core/lib/iomgr/udp_server.cc
  10. 7
      src/core/lib/iomgr/udp_server.h
  11. 1113
      src/core/lib/surface/server.cc
  12. 17
      src/core/lib/surface/server.h
  13. 2
      test/core/end2end/bad_server_response_test.cc
  14. 23
      test/core/end2end/fixtures/http_proxy_fixture.cc
  15. 10
      test/core/iomgr/tcp_server_posix_test.cc
  16. 14
      test/core/iomgr/udp_server_test.cc
  17. 43
      test/core/surface/concurrent_connectivity_test.cc
  18. 21
      test/core/util/test_tcp_server.cc
  19. 16
      test/core/util/test_tcp_server.h
  20. 9
      test/cpp/microbenchmarks/fullstack_fixtures.h
  21. 9
      test/cpp/performance/writes_per_rpc_test.cc

@ -23,6 +23,7 @@
#include <inttypes.h>
#include <limits.h>
#include <string.h>
#include <vector>
#include "absl/strings/str_cat.h"
#include "absl/strings/str_format.h"
@ -64,8 +65,8 @@ class Chttp2ServerListener : public ServerListenerInterface {
Chttp2ServerListener(grpc_server* server, grpc_channel_args* args);
~Chttp2ServerListener();
void Start(grpc_server* server, grpc_pollset** pollsets,
size_t npollsets) override;
void Start(grpc_server* server,
const std::vector<grpc_pollset*>* pollsets) override;
channelz::ListenSocketNode* channelz_listen_socket_node() const override {
return channelz_listen_socket_.get();
@ -383,13 +384,12 @@ Chttp2ServerListener::~Chttp2ServerListener() {
/* Server callback: start listening on our ports */
void Chttp2ServerListener::Start(grpc_server* /*server*/,
grpc_pollset** pollsets,
size_t pollset_count) {
const std::vector<grpc_pollset*>* pollsets) {
{
MutexLock lock(&mu_);
shutdown_ = false;
}
grpc_tcp_server_start(tcp_server_, pollsets, pollset_count, OnAccept, this);
grpc_tcp_server_start(tcp_server_, pollsets, OnAccept, this);
}
void Chttp2ServerListener::SetOnDestroyDone(grpc_closure* on_destroy_done) {

@ -51,12 +51,8 @@ void grpc_server_add_insecure_channel_from_fd(grpc_server* server,
grpc_transport* transport = grpc_create_chttp2_transport(
server_args, server_endpoint, false /* is_client */);
grpc_pollset** pollsets;
size_t num_pollsets = 0;
grpc_server_get_pollsets(server, &pollsets, &num_pollsets);
for (size_t i = 0; i < num_pollsets; i++) {
grpc_endpoint_add_to_pollset(server_endpoint, pollsets[i]);
for (grpc_pollset* pollset : grpc_server_get_pollsets(server)) {
grpc_endpoint_add_to_pollset(server_endpoint, pollset);
}
grpc_server_setup_transport(server, transport, nullptr, server_args, nullptr);

@ -28,11 +28,10 @@ grpc_error* grpc_tcp_server_create(grpc_closure* shutdown_complete,
return grpc_tcp_server_impl->create(shutdown_complete, args, server);
}
void grpc_tcp_server_start(grpc_tcp_server* server, grpc_pollset** pollsets,
size_t pollset_count,
void grpc_tcp_server_start(grpc_tcp_server* server,
const std::vector<grpc_pollset*>* pollsets,
grpc_tcp_server_cb on_accept_cb, void* cb_arg) {
grpc_tcp_server_impl->start(server, pollsets, pollset_count, on_accept_cb,
cb_arg);
grpc_tcp_server_impl->start(server, pollsets, on_accept_cb, cb_arg);
}
grpc_error* grpc_tcp_server_add_port(grpc_tcp_server* s,

@ -24,6 +24,8 @@
#include <grpc/grpc.h>
#include <grpc/impl/codegen/grpc_types.h>
#include <vector>
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/resolve_address.h"
@ -64,9 +66,9 @@ typedef struct grpc_tcp_server_vtable {
grpc_error* (*create)(grpc_closure* shutdown_complete,
const grpc_channel_args* args,
grpc_tcp_server** server);
void (*start)(grpc_tcp_server* server, grpc_pollset** pollsets,
size_t pollset_count, grpc_tcp_server_cb on_accept_cb,
void* cb_arg);
void (*start)(grpc_tcp_server* server,
const std::vector<grpc_pollset*>* pollsets,
grpc_tcp_server_cb on_accept_cb, void* cb_arg);
grpc_error* (*add_port)(grpc_tcp_server* s, const grpc_resolved_address* addr,
int* out_port);
grpc_core::TcpServerFdHandler* (*create_fd_handler)(grpc_tcp_server* s);
@ -87,8 +89,8 @@ grpc_error* grpc_tcp_server_create(grpc_closure* shutdown_complete,
grpc_tcp_server** server);
/* Start listening to bound ports */
void grpc_tcp_server_start(grpc_tcp_server* server, grpc_pollset** pollsets,
size_t pollset_count,
void grpc_tcp_server_start(grpc_tcp_server* server,
const std::vector<grpc_pollset*>* pollsets,
grpc_tcp_server_cb on_accept_cb, void* cb_arg);
/* Add a port to the server, returning the newly allocated port on success, or

@ -417,12 +417,10 @@ static grpc_error* tcp_server_add_port(grpc_tcp_server* s,
return error;
}
static void tcp_server_start(grpc_tcp_server* server, grpc_pollset** pollsets,
size_t pollset_count,
static void tcp_server_start(grpc_tcp_server* server,
const std::vector<grpc_pollset*>* /*pollsets*/,
grpc_tcp_server_cb on_accept_cb, void* cb_arg) {
grpc_tcp_listener* sp;
(void)pollsets;
(void)pollset_count;
GRPC_CUSTOM_IOMGR_ASSERT_SAME_THREAD();
if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
gpr_log(GPR_INFO, "SERVER_START %p", server);

@ -247,10 +247,10 @@ static void on_read(void* arg, grpc_error* err) {
std::string name = absl::StrCat("tcp-server-connection:", addr_str);
grpc_fd* fdobj = grpc_fd_create(fd, name.c_str(), true);
read_notifier_pollset =
sp->server->pollsets[static_cast<size_t>(gpr_atm_no_barrier_fetch_add(
&sp->server->next_pollset_to_assign, 1)) %
sp->server->pollset_count];
read_notifier_pollset = (*(sp->server->pollsets))
[static_cast<size_t>(gpr_atm_no_barrier_fetch_add(
&sp->server->next_pollset_to_assign, 1)) %
sp->server->pollsets->size()];
grpc_pollset_add_fd(read_notifier_pollset, fdobj);
@ -487,8 +487,8 @@ static int tcp_server_port_fd(grpc_tcp_server* s, unsigned port_index,
return -1;
}
static void tcp_server_start(grpc_tcp_server* s, grpc_pollset** pollsets,
size_t pollset_count,
static void tcp_server_start(grpc_tcp_server* s,
const std::vector<grpc_pollset*>* pollsets,
grpc_tcp_server_cb on_accept_cb,
void* on_accept_cb_arg) {
size_t i;
@ -500,15 +500,14 @@ static void tcp_server_start(grpc_tcp_server* s, grpc_pollset** pollsets,
s->on_accept_cb = on_accept_cb;
s->on_accept_cb_arg = on_accept_cb_arg;
s->pollsets = pollsets;
s->pollset_count = pollset_count;
sp = s->head;
while (sp != nullptr) {
if (s->so_reuseport && !grpc_is_unix_socket(&sp->addr) &&
pollset_count > 1) {
pollsets->size() > 1) {
GPR_ASSERT(GRPC_LOG_IF_ERROR(
"clone_port", clone_port(sp, (unsigned)(pollset_count - 1))));
for (i = 0; i < pollset_count; i++) {
grpc_pollset_add_fd(pollsets[i], sp->emfd);
"clone_port", clone_port(sp, (unsigned)(pollsets->size() - 1))));
for (i = 0; i < pollsets->size(); i++) {
grpc_pollset_add_fd((*pollsets)[i], sp->emfd);
GRPC_CLOSURE_INIT(&sp->read_closure, on_read, sp,
grpc_schedule_on_exec_ctx);
grpc_fd_notify_on_read(sp->emfd, &sp->read_closure);
@ -516,8 +515,8 @@ static void tcp_server_start(grpc_tcp_server* s, grpc_pollset** pollsets,
sp = sp->next;
}
} else {
for (i = 0; i < pollset_count; i++) {
grpc_pollset_add_fd(pollsets[i], sp->emfd);
for (i = 0; i < pollsets->size(); i++) {
grpc_pollset_add_fd((*pollsets)[i], sp->emfd);
}
GRPC_CLOSURE_INIT(&sp->read_closure, on_read, sp,
grpc_schedule_on_exec_ctx);
@ -594,9 +593,9 @@ class ExternalConnectionHandler : public grpc_core::TcpServerFdHandler {
std::string name = absl::StrCat("tcp-server-connection:", addr_str);
grpc_fd* fdobj = grpc_fd_create(fd, name.c_str(), true);
read_notifier_pollset =
s_->pollsets[static_cast<size_t>(gpr_atm_no_barrier_fetch_add(
&s_->next_pollset_to_assign, 1)) %
s_->pollset_count];
(*(s_->pollsets))[static_cast<size_t>(gpr_atm_no_barrier_fetch_add(
&s_->next_pollset_to_assign, 1)) %
s_->pollsets->size()];
grpc_pollset_add_fd(read_notifier_pollset, fdobj);
grpc_tcp_server_acceptor* acceptor =
static_cast<grpc_tcp_server_acceptor*>(gpr_malloc(sizeof(*acceptor)));

@ -82,10 +82,9 @@ struct grpc_tcp_server {
/* shutdown callback */
grpc_closure* shutdown_complete;
/* all pollsets interested in new connections */
grpc_pollset** pollsets;
/* number of pollsets in the pollsets array */
size_t pollset_count;
/* all pollsets interested in new connections. The object pointed at is not
* owned by this struct */
const std::vector<grpc_pollset*>* pollsets;
/* next pollset to assign a channel to */
gpr_atm next_pollset_to_assign;

@ -27,6 +27,8 @@
#include <inttypes.h>
#include <io.h>
#include <vector>
#include "absl/strings/str_cat.h"
#include <grpc/support/alloc.h>
@ -518,8 +520,8 @@ done:
return error;
}
static void tcp_server_start(grpc_tcp_server* s, grpc_pollset** pollset,
size_t pollset_count,
static void tcp_server_start(grpc_tcp_server* s,
const std::vector<grpc_pollset*>* /*pollsets*/,
grpc_tcp_server_cb on_accept_cb,
void* on_accept_cb_arg) {
grpc_tcp_listener* sp;

@ -45,6 +45,7 @@
#include <unistd.h>
#include <string>
#include <vector>
#include "absl/container/inlined_vector.h"
#include "absl/strings/str_cat.h"
@ -77,7 +78,7 @@ class GrpcUdpListener {
~GrpcUdpListener();
/* Called when grpc server starts to listening on the grpc_fd. */
void StartListening(grpc_pollset** pollsets, size_t pollset_count,
void StartListening(const std::vector<grpc_pollset*>* pollsets,
GrpcUdpHandlerFactory* handler_factory);
/* Called when data is available to read from the socket.
@ -185,10 +186,9 @@ struct grpc_udp_server {
/* shutdown callback */
grpc_closure* shutdown_complete;
/* all pollsets interested in new connections */
grpc_pollset** pollsets;
/* number of pollsets in the pollsets array */
size_t pollset_count;
/* all pollsets interested in new connections. The object pointed at is not
* owned by this struct. */
const std::vector<grpc_pollset*>* pollsets;
/* opaque object to pass to callbacks */
void* user_data;
@ -282,7 +282,7 @@ static void deactivated_all_ports(grpc_udp_server* s) {
GPR_ASSERT(s->shutdown);
if (s->listeners.size() == 0) {
if (s->listeners.empty()) {
gpr_mu_unlock(&s->mu);
finish_shutdown(s);
return;
@ -701,29 +701,29 @@ int grpc_udp_server_get_fd(grpc_udp_server* s, unsigned port_index) {
return s->listeners[port_index].fd();
}
void grpc_udp_server_start(grpc_udp_server* s, grpc_pollset** pollsets,
size_t pollset_count, void* user_data) {
void grpc_udp_server_start(grpc_udp_server* udp_server,
const std::vector<grpc_pollset*>* pollsets,
void* user_data) {
gpr_log(GPR_DEBUG, "grpc_udp_server_start");
gpr_mu_lock(&s->mu);
GPR_ASSERT(s->active_ports == 0);
s->pollsets = pollsets;
s->user_data = user_data;
gpr_mu_lock(&udp_server->mu);
GPR_ASSERT(udp_server->active_ports == 0);
udp_server->pollsets = pollsets;
udp_server->user_data = user_data;
for (size_t i = 0; i < s->listeners.size(); ++i) {
s->listeners[i].StartListening(pollsets, pollset_count, s->handler_factory);
for (auto& listener : udp_server->listeners) {
listener.StartListening(pollsets, udp_server->handler_factory);
}
gpr_mu_unlock(&s->mu);
gpr_mu_unlock(&udp_server->mu);
}
void GrpcUdpListener::StartListening(grpc_pollset** pollsets,
size_t pollset_count,
void GrpcUdpListener::StartListening(const std::vector<grpc_pollset*>* pollsets,
GrpcUdpHandlerFactory* handler_factory) {
gpr_mu_lock(&mutex_);
handler_factory_ = handler_factory;
udp_handler_ = handler_factory->CreateUdpHandler(emfd_, server_->user_data);
for (size_t i = 0; i < pollset_count; i++) {
grpc_pollset_add_fd(pollsets[i], emfd_);
for (grpc_pollset* pollset : *pollsets) {
grpc_pollset_add_fd(pollset, emfd_);
}
GRPC_CLOSURE_INIT(&read_closure_, on_read, this, grpc_schedule_on_exec_ctx);
grpc_fd_notify_on_read(emfd_, &read_closure_);

@ -21,6 +21,8 @@
#include <grpc/support/port_platform.h>
#include <vector>
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/resolve_address.h"
@ -72,8 +74,9 @@ class GrpcUdpHandlerFactory {
grpc_udp_server* grpc_udp_server_create(const grpc_channel_args* args);
/* Start listening to bound ports. user_data is passed to callbacks. */
void grpc_udp_server_start(grpc_udp_server* udp_server, grpc_pollset** pollsets,
size_t pollset_count, void* user_data);
void grpc_udp_server_start(grpc_udp_server* udp_server,
const std::vector<grpc_pollset*>* pollsets,
void* user_data);
int grpc_udp_server_get_fd(grpc_udp_server* s, unsigned port_index);

File diff suppressed because it is too large Load Diff

@ -41,9 +41,10 @@ class ServerListenerInterface : public Orphanable {
public:
virtual ~ServerListenerInterface() = default;
/// Starts listening.
virtual void Start(grpc_server* server, grpc_pollset** pollsets,
size_t npollsets) = 0;
/// Starts listening. This listener may refer to the pollset object beyond
/// this call, so it is a pointer rather than a reference.
virtual void Start(grpc_server* server,
const std::vector<grpc_pollset*>* pollsets) = 0;
/// Returns the channelz node for the listen socket, or null if not
/// supported.
@ -78,12 +79,12 @@ const grpc_channel_args* grpc_server_get_channel_args(grpc_server* server);
grpc_resource_user* grpc_server_get_default_resource_user(grpc_server* server);
int grpc_server_has_open_connections(grpc_server* server);
bool grpc_server_has_open_connections(grpc_server* server);
/* Do not call this before grpc_server_start. Returns the pollsets and the
* number of pollsets via 'pollsets' and 'pollset_count'. */
void grpc_server_get_pollsets(grpc_server* server, grpc_pollset*** pollsets,
size_t* pollset_count);
// Do not call this before grpc_server_start. Returns the pollsets. The vector
// itself is immutable, but the pollsets inside are mutable. The result is valid
// for the lifetime of the server.
const std::vector<grpc_pollset*>& grpc_server_get_pollsets(grpc_server* server);
namespace grpc_core {

@ -142,7 +142,7 @@ static void on_connect(void* arg, grpc_endpoint* tcp,
grpc_slice_buffer_init(&state.outgoing_buffer);
state.tcp = tcp;
state.incoming_data_length = 0;
grpc_endpoint_add_to_pollset(tcp, server->pollset);
grpc_endpoint_add_to_pollset(tcp, server->pollset[0]);
grpc_endpoint_read(tcp, &state.temp_incoming_buffer, &on_read,
/*urgent=*/false);
}

@ -54,11 +54,7 @@
struct grpc_end2end_http_proxy {
grpc_end2end_http_proxy()
: server(nullptr),
channel_args(nullptr),
mu(nullptr),
pollset(nullptr),
combiner(nullptr) {
: server(nullptr), channel_args(nullptr), mu(nullptr), combiner(nullptr) {
gpr_ref_init(&users, 1);
combiner = grpc_combiner_create();
}
@ -67,7 +63,7 @@ struct grpc_end2end_http_proxy {
grpc_tcp_server* server;
grpc_channel_args* channel_args;
gpr_mu* mu;
grpc_pollset* pollset;
std::vector<grpc_pollset*> pollset;
gpr_refcount users;
grpc_core::Combiner* combiner;
@ -568,7 +564,7 @@ static void on_accept(void* arg, grpc_endpoint* endpoint,
conn->proxy = proxy;
gpr_ref_init(&conn->refcount, 1);
conn->pollset_set = grpc_pollset_set_create();
grpc_pollset_set_add_pollset(conn->pollset_set, proxy->pollset);
grpc_pollset_set_add_pollset(conn->pollset_set, proxy->pollset[0]);
grpc_endpoint_add_to_pollset_set(endpoint, conn->pollset_set);
grpc_slice_buffer_init(&conn->client_read_buffer);
grpc_slice_buffer_init(&conn->client_deferred_write_buffer);
@ -599,7 +595,7 @@ static void thread_main(void* arg) {
gpr_mu_lock(proxy->mu);
GRPC_LOG_IF_ERROR(
"grpc_pollset_work",
grpc_pollset_work(proxy->pollset, &worker,
grpc_pollset_work(proxy->pollset[0], &worker,
grpc_core::ExecCtx::Get()->Now() + GPR_MS_PER_SEC));
gpr_mu_unlock(proxy->mu);
grpc_core::ExecCtx::Get()->Flush();
@ -631,9 +627,10 @@ grpc_end2end_http_proxy* grpc_end2end_http_proxy_create(
GPR_ASSERT(error == GRPC_ERROR_NONE);
GPR_ASSERT(port == proxy_port);
// Start server.
proxy->pollset = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
grpc_pollset_init(proxy->pollset, &proxy->mu);
grpc_tcp_server_start(proxy->server, &proxy->pollset, 1, on_accept, proxy);
auto* pollset = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
grpc_pollset_init(pollset, &proxy->mu);
proxy->pollset.push_back(pollset);
grpc_tcp_server_start(proxy->server, &proxy->pollset, on_accept, proxy);
// Start proxy thread.
proxy->thd = grpc_core::Thread("grpc_http_proxy", thread_main, proxy);
@ -654,8 +651,8 @@ void grpc_end2end_http_proxy_destroy(grpc_end2end_http_proxy* proxy) {
grpc_tcp_server_shutdown_listeners(proxy->server);
grpc_tcp_server_unref(proxy->server);
grpc_channel_args_destroy(proxy->channel_args);
grpc_pollset_shutdown(proxy->pollset,
GRPC_CLOSURE_CREATE(destroy_pollset, proxy->pollset,
grpc_pollset_shutdown(proxy->pollset[0],
GRPC_CLOSURE_CREATE(destroy_pollset, proxy->pollset[0],
grpc_schedule_on_exec_ctx));
GRPC_COMBINER_UNREF(proxy->combiner, "test");
delete proxy;

@ -172,7 +172,8 @@ static void test_no_op_with_start(void) {
grpc_tcp_server* s;
GPR_ASSERT(GRPC_ERROR_NONE == grpc_tcp_server_create(nullptr, nullptr, &s));
LOG_TEST("test_no_op_with_start");
grpc_tcp_server_start(s, nullptr, 0, on_connect, nullptr);
std::vector<grpc_pollset*> empty_pollset;
grpc_tcp_server_start(s, &empty_pollset, on_connect, nullptr);
grpc_tcp_server_unref(s);
}
@ -213,7 +214,8 @@ static void test_no_op_with_port_and_start(void) {
GRPC_ERROR_NONE &&
port > 0);
grpc_tcp_server_start(s, nullptr, 0, on_connect, nullptr);
std::vector<grpc_pollset*> empty_pollset;
grpc_tcp_server_start(s, &empty_pollset, on_connect, nullptr);
grpc_tcp_server_unref(s);
}
@ -344,7 +346,9 @@ static void test_connect(size_t num_connects,
svr1_fd_count = grpc_tcp_server_port_fd_count(s, 1);
GPR_ASSERT(svr1_fd_count >= 1);
grpc_tcp_server_start(s, &g_pollset, 1, on_connect, nullptr);
std::vector<grpc_pollset*> test_pollset;
test_pollset.push_back(g_pollset);
grpc_tcp_server_start(s, &test_pollset, on_connect, nullptr);
if (dst_addrs != nullptr) {
int ports[] = {svr_port, svr1_port};

@ -28,6 +28,8 @@
#include <sys/socket.h>
#include <unistd.h>
#include <vector>
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
@ -198,7 +200,8 @@ static void test_no_op_with_start(void) {
grpc_core::ExecCtx exec_ctx;
grpc_udp_server* s = grpc_udp_server_create(nullptr);
LOG_TEST("test_no_op_with_start");
grpc_udp_server_start(s, nullptr, 0, nullptr);
std::vector<grpc_pollset*> empty_pollset;
grpc_udp_server_start(s, &empty_pollset, nullptr);
grpc_udp_server_destroy(s, nullptr);
shutdown_and_destroy_pollset();
}
@ -280,7 +283,8 @@ static void test_no_op_with_port_and_start(void) {
snd_buf_size, &handler_factory,
g_num_listeners) > 0);
grpc_udp_server_start(s, nullptr, 0, nullptr);
std::vector<grpc_pollset*> empty_pollset;
grpc_udp_server_start(s, &empty_pollset, nullptr);
GPR_ASSERT(g_number_of_starts == g_num_listeners);
grpc_udp_server_destroy(s, nullptr);
@ -300,7 +304,6 @@ static void test_receive(int number_of_clients) {
grpc_udp_server* s = grpc_udp_server_create(nullptr);
int i;
grpc_millis deadline;
grpc_pollset* pollsets[1];
LOG_TEST("test_receive");
gpr_log(GPR_INFO, "clients=%d", number_of_clients);
@ -320,8 +323,9 @@ static void test_receive(int number_of_clients) {
(socklen_t*)&resolved_addr.len) == 0);
GPR_ASSERT(resolved_addr.len <= sizeof(struct sockaddr_storage));
pollsets[0] = g_pollset;
grpc_udp_server_start(s, pollsets, 1, nullptr);
std::vector<grpc_pollset*> test_pollsets;
test_pollsets.emplace_back(g_pollset);
grpc_udp_server_start(s, &test_pollsets, nullptr);
gpr_mu_lock(g_mu);

@ -24,6 +24,7 @@
#include <memory.h>
#include <stdio.h>
#include <atomic>
#include <string>
@ -93,19 +94,20 @@ void create_loop_destroy(void* addr) {
}
}
struct server_thread_args {
// Always stack-allocate or new ServerThreadArgs; never use gpr_malloc since
// this contains C++ objects.
struct ServerThreadArgs {
std::string addr;
grpc_server* server = nullptr;
grpc_completion_queue* cq = nullptr;
grpc_pollset* pollset = nullptr;
std::vector<grpc_pollset*> pollset;
gpr_mu* mu = nullptr;
gpr_event ready;
gpr_atm stop = 0;
std::atomic_bool stop{false};
};
void server_thread(void* vargs) {
struct server_thread_args* args =
static_cast<struct server_thread_args*>(vargs);
struct ServerThreadArgs* args = static_cast<struct ServerThreadArgs*>(vargs);
grpc_event ev;
gpr_timespec deadline =
grpc_timeout_milliseconds_to_deadline(SERVER_SHUTDOWN_TIMEOUT);
@ -118,19 +120,18 @@ static void on_connect(void* vargs, grpc_endpoint* tcp,
grpc_pollset* /*accepting_pollset*/,
grpc_tcp_server_acceptor* acceptor) {
gpr_free(acceptor);
struct server_thread_args* args =
static_cast<struct server_thread_args*>(vargs);
struct ServerThreadArgs* args = static_cast<struct ServerThreadArgs*>(vargs);
grpc_endpoint_shutdown(tcp,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Connected"));
grpc_endpoint_destroy(tcp);
gpr_mu_lock(args->mu);
GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(args->pollset, nullptr));
GRPC_LOG_IF_ERROR("pollset_kick",
grpc_pollset_kick(args->pollset[0], nullptr));
gpr_mu_unlock(args->mu);
}
void bad_server_thread(void* vargs) {
struct server_thread_args* args =
static_cast<struct server_thread_args*>(vargs);
struct ServerThreadArgs* args = static_cast<struct ServerThreadArgs*>(vargs);
grpc_core::ExecCtx exec_ctx;
grpc_resolved_address resolved_addr;
@ -146,18 +147,18 @@ void bad_server_thread(void* vargs) {
GPR_ASSERT(port > 0);
args->addr = absl::StrCat("localhost:", port);
grpc_tcp_server_start(s, &args->pollset, 1, on_connect, args);
grpc_tcp_server_start(s, &args->pollset, on_connect, args);
gpr_event_set(&args->ready, (void*)1);
gpr_mu_lock(args->mu);
while (gpr_atm_acq_load(&args->stop) == 0) {
while (args->stop.load(std::memory_order_acquire) == false) {
grpc_millis deadline = grpc_core::ExecCtx::Get()->Now() + 100;
grpc_pollset_worker* worker = nullptr;
if (!GRPC_LOG_IF_ERROR(
"pollset_work",
grpc_pollset_work(args->pollset, &worker, deadline))) {
gpr_atm_rel_store(&args->stop, 1);
grpc_pollset_work(args->pollset[0], &worker, deadline))) {
args->stop.store(true, std::memory_order_release);
}
gpr_mu_unlock(args->mu);
@ -174,7 +175,7 @@ static void done_pollset_shutdown(void* pollset, grpc_error* /*error*/) {
}
int run_concurrent_connectivity_test() {
struct server_thread_args args;
struct ServerThreadArgs args;
grpc_init();
@ -225,8 +226,9 @@ int run_concurrent_connectivity_test() {
{
/* Third round, bogus tcp server */
gpr_log(GPR_DEBUG, "Wave 3");
args.pollset = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
grpc_pollset_init(args.pollset, &args.mu);
auto* pollset = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
grpc_pollset_init(pollset, &args.mu);
args.pollset.push_back(pollset);
gpr_event_init(&args.ready);
grpc_core::Thread server3("grpc_wave_3_server", bad_server_thread, &args);
server3.Start();
@ -242,13 +244,14 @@ int run_concurrent_connectivity_test() {
th.Join();
}
gpr_atm_rel_store(&args.stop, 1);
args.stop.store(true, std::memory_order_release);
server3.Join();
{
grpc_core::ExecCtx exec_ctx;
grpc_pollset_shutdown(
args.pollset, GRPC_CLOSURE_CREATE(done_pollset_shutdown, args.pollset,
grpc_schedule_on_exec_ctx));
args.pollset[0],
GRPC_CLOSURE_CREATE(done_pollset_shutdown, args.pollset[0],
grpc_schedule_on_exec_ctx));
}
}

@ -36,18 +36,19 @@
static void on_server_destroyed(void* data, grpc_error* /*error*/) {
test_tcp_server* server = static_cast<test_tcp_server*>(data);
server->shutdown = 1;
server->shutdown = true;
}
void test_tcp_server_init(test_tcp_server* server,
grpc_tcp_server_cb on_connect, void* user_data) {
grpc_init();
server->tcp_server = nullptr;
GRPC_CLOSURE_INIT(&server->shutdown_complete, on_server_destroyed, server,
grpc_schedule_on_exec_ctx);
server->shutdown = 0;
server->pollset = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
grpc_pollset_init(server->pollset, &server->mu);
grpc_pollset* pollset =
static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
grpc_pollset_init(pollset, &server->mu);
server->pollset.push_back(pollset);
server->on_connect = on_connect;
server->cb_data = user_data;
}
@ -71,7 +72,7 @@ void test_tcp_server_start(test_tcp_server* server, int port) {
GPR_ASSERT(error == GRPC_ERROR_NONE);
GPR_ASSERT(port_added == port);
grpc_tcp_server_start(server->tcp_server, &server->pollset, 1,
grpc_tcp_server_start(server->tcp_server, &server->pollset,
server->on_connect, server->cb_data);
gpr_log(GPR_INFO, "test tcp server listening on 0.0.0.0:%d", port);
}
@ -83,7 +84,7 @@ void test_tcp_server_poll(test_tcp_server* server, int milliseconds) {
grpc_timeout_milliseconds_to_deadline(milliseconds));
gpr_mu_lock(server->mu);
GRPC_LOG_IF_ERROR("pollset_work",
grpc_pollset_work(server->pollset, &worker, deadline));
grpc_pollset_work(server->pollset[0], &worker, deadline));
gpr_mu_unlock(server->mu);
}
@ -106,10 +107,10 @@ void test_tcp_server_destroy(test_tcp_server* server) {
gpr_time_cmp(gpr_now(GPR_CLOCK_MONOTONIC), shutdown_deadline) < 0) {
test_tcp_server_poll(server, 1000);
}
grpc_pollset_shutdown(server->pollset,
GRPC_CLOSURE_CREATE(finish_pollset, server->pollset,
grpc_pollset_shutdown(server->pollset[0],
GRPC_CLOSURE_CREATE(finish_pollset, server->pollset[0],
grpc_schedule_on_exec_ctx));
grpc_core::ExecCtx::Get()->Flush();
gpr_free(server->pollset);
gpr_free(server->pollset[0]);
grpc_shutdown();
}

@ -19,18 +19,24 @@
#ifndef GRPC_TEST_CORE_UTIL_TEST_TCP_SERVER_H
#define GRPC_TEST_CORE_UTIL_TEST_TCP_SERVER_H
#include <vector>
#include <grpc/support/sync.h>
#include "src/core/lib/iomgr/tcp_server.h"
typedef struct test_tcp_server {
grpc_tcp_server* tcp_server;
// test_tcp_server should be stack-allocated or new'ed, never gpr_malloc'ed
// since it contains C++ objects.
struct test_tcp_server {
grpc_tcp_server* tcp_server = nullptr;
grpc_closure shutdown_complete;
int shutdown;
bool shutdown = false;
// mu is filled in by grpc_pollset_init and controlls the pollset.
// TODO: Switch this to a Mutex once pollset_init can provide a Mutex
gpr_mu* mu;
grpc_pollset* pollset;
std::vector<grpc_pollset*> pollset;
grpc_tcp_server_cb on_connect;
void* cb_data;
} test_tcp_server;
};
void test_tcp_server_init(test_tcp_server* server,
grpc_tcp_server_cb on_connect, void* user_data);

@ -178,12 +178,9 @@ class EndpointPairFixture : public BaseFixture {
server_transport_ = grpc_create_chttp2_transport(
server_args, endpoints.server, false /* is_client */);
grpc_pollset** pollsets;
size_t num_pollsets = 0;
grpc_server_get_pollsets(server_->c_server(), &pollsets, &num_pollsets);
for (size_t i = 0; i < num_pollsets; i++) {
grpc_endpoint_add_to_pollset(endpoints.server, pollsets[i]);
for (grpc_pollset* pollset :
grpc_server_get_pollsets(server_->c_server())) {
grpc_endpoint_add_to_pollset(endpoints.server, pollset);
}
grpc_server_setup_transport(server_->c_server(), server_transport_,

@ -75,12 +75,9 @@ class EndpointPairFixture {
grpc_transport* transport = grpc_create_chttp2_transport(
server_args, endpoints.server, false /* is_client */);
grpc_pollset** pollsets;
size_t num_pollsets = 0;
grpc_server_get_pollsets(server_->c_server(), &pollsets, &num_pollsets);
for (size_t i = 0; i < num_pollsets; i++) {
grpc_endpoint_add_to_pollset(endpoints.server, pollsets[i]);
for (grpc_pollset* pollset :
grpc_server_get_pollsets(server_->c_server())) {
grpc_endpoint_add_to_pollset(endpoints.server, pollset);
}
grpc_server_setup_transport(server_->c_server(), transport, nullptr,

Loading…
Cancel
Save