mirror of https://github.com/grpc/grpc.git
Remove UDP code - it's unused (#27595)
* Remove UDP code - its unused * Automated change: Fix sanity tests Co-authored-by: ctiller <ctiller@users.noreply.github.com>pull/27684/head
parent
0c6586523f
commit
a21ad2c296
19 changed files with 0 additions and 1349 deletions
@ -1,747 +0,0 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2015 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
/* FIXME: "posix" files shouldn't be depending on _GNU_SOURCE */ |
||||
#ifndef _GNU_SOURCE |
||||
#define _GNU_SOURCE |
||||
#endif |
||||
|
||||
#ifndef SO_RXQ_OVFL |
||||
#define SO_RXQ_OVFL 40 |
||||
#endif |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/lib/iomgr/port.h" |
||||
|
||||
#ifdef GRPC_POSIX_SOCKET_UDP_SERVER |
||||
|
||||
#include <errno.h> |
||||
#include <fcntl.h> |
||||
#include <limits.h> |
||||
#include <netinet/in.h> |
||||
#include <netinet/tcp.h> |
||||
#include <string.h> |
||||
#include <sys/socket.h> |
||||
#include <sys/stat.h> |
||||
#include <sys/types.h> |
||||
#include <unistd.h> |
||||
|
||||
#include <string> |
||||
#include <vector> |
||||
|
||||
#include "absl/container/inlined_vector.h" |
||||
#include "absl/strings/str_cat.h" |
||||
|
||||
#include <grpc/grpc.h> |
||||
#include <grpc/support/alloc.h> |
||||
#include <grpc/support/log.h> |
||||
#include <grpc/support/string_util.h> |
||||
#include <grpc/support/sync.h> |
||||
#include <grpc/support/time.h> |
||||
|
||||
#include "src/core/lib/address_utils/sockaddr_utils.h" |
||||
#include "src/core/lib/channel/channel_args.h" |
||||
#include "src/core/lib/gpr/string.h" |
||||
#include "src/core/lib/gprpp/memory.h" |
||||
#include "src/core/lib/iomgr/error.h" |
||||
#include "src/core/lib/iomgr/ev_posix.h" |
||||
#include "src/core/lib/iomgr/executor.h" |
||||
#include "src/core/lib/iomgr/resolve_address.h" |
||||
#include "src/core/lib/iomgr/sockaddr.h" |
||||
#include "src/core/lib/iomgr/socket_factory_posix.h" |
||||
#include "src/core/lib/iomgr/socket_utils_posix.h" |
||||
#include "src/core/lib/iomgr/udp_server.h" |
||||
#include "src/core/lib/iomgr/unix_sockets_posix.h" |
||||
|
||||
/* A listener which implements basic features of Listening on a port for
|
||||
* I/O events*/ |
||||
class GrpcUdpListener { |
||||
public: |
||||
GrpcUdpListener(grpc_udp_server* server, int fd, |
||||
const grpc_resolved_address* addr); |
||||
~GrpcUdpListener(); |
||||
|
||||
/* Called when grpc server starts to listening on the grpc_fd. */ |
||||
void StartListening(const std::vector<grpc_pollset*>* pollsets, |
||||
GrpcUdpHandlerFactory* handler_factory); |
||||
|
||||
/* Called when data is available to read from the socket.
|
||||
* Return true if there is more data to read from fd. */ |
||||
void OnRead(grpc_error_handle error, void* do_read_arg); |
||||
|
||||
/* Called when the socket is writeable. The given closure should be scheduled
|
||||
* when the socket becomes blocked next time. */ |
||||
void OnCanWrite(grpc_error_handle error, void* do_write_arg); |
||||
|
||||
/* Called when the grpc_fd is about to be orphaned (and the FD closed). */ |
||||
void OnFdAboutToOrphan(); |
||||
|
||||
/* Called to orphan fd of this listener.*/ |
||||
void OrphanFd(); |
||||
|
||||
/* Called when this listener is going to be destroyed. */ |
||||
void OnDestroy(); |
||||
|
||||
int fd() const { return fd_; } |
||||
|
||||
protected: |
||||
grpc_fd* emfd() const { return emfd_; } |
||||
|
||||
gpr_mu* mutex() { return &mutex_; } |
||||
|
||||
private: |
||||
/* event manager callback when reads are ready */ |
||||
static void on_read(void* arg, grpc_error_handle error); |
||||
static void on_write(void* arg, grpc_error_handle error); |
||||
|
||||
static void do_read(void* arg, grpc_error_handle error); |
||||
static void do_write(void* arg, grpc_error_handle error); |
||||
// Wrapper of grpc_fd_notify_on_write() with a grpc_closure callback
|
||||
// interface.
|
||||
static void fd_notify_on_write_wrapper(void* arg, grpc_error_handle error); |
||||
|
||||
static void shutdown_fd(void* args, grpc_error_handle error); |
||||
|
||||
int fd_; |
||||
grpc_fd* emfd_; |
||||
grpc_udp_server* server_; |
||||
grpc_resolved_address addr_; |
||||
grpc_closure read_closure_; |
||||
grpc_closure write_closure_; |
||||
// To be called when corresponding QuicGrpcServer closes all active
|
||||
// connections.
|
||||
grpc_closure orphan_fd_closure_; |
||||
grpc_closure destroyed_closure_; |
||||
// To be scheduled on another thread to actually read/write.
|
||||
grpc_closure do_read_closure_; |
||||
grpc_closure do_write_closure_; |
||||
grpc_closure notify_on_write_closure_; |
||||
// True if orphan_cb is trigered.
|
||||
bool orphan_notified_; |
||||
// True if grpc_fd_notify_on_write() is called after on_write() call.
|
||||
bool notify_on_write_armed_; |
||||
// True if fd has been shutdown.
|
||||
bool already_shutdown_; |
||||
// Object actually handles I/O events. Assigned in StartListening().
|
||||
GrpcUdpHandler* udp_handler_ = nullptr; |
||||
// To be notified on destruction.
|
||||
GrpcUdpHandlerFactory* handler_factory_ = nullptr; |
||||
// Required to access above fields.
|
||||
gpr_mu mutex_; |
||||
}; |
||||
|
||||
GrpcUdpListener::GrpcUdpListener(grpc_udp_server* server, int fd, |
||||
const grpc_resolved_address* addr) |
||||
: fd_(fd), |
||||
server_(server), |
||||
orphan_notified_(false), |
||||
already_shutdown_(false) { |
||||
std::string addr_str = grpc_sockaddr_to_string(addr, true); |
||||
std::string name = absl::StrCat("udp-server-listener:", addr_str); |
||||
emfd_ = grpc_fd_create(fd, name.c_str(), true); |
||||
memcpy(&addr_, addr, sizeof(grpc_resolved_address)); |
||||
GPR_ASSERT(emfd_); |
||||
gpr_mu_init(&mutex_); |
||||
} |
||||
|
||||
GrpcUdpListener::~GrpcUdpListener() { gpr_mu_destroy(&mutex_); } |
||||
|
||||
/* the overall server */ |
||||
struct grpc_udp_server { |
||||
gpr_mu mu; |
||||
|
||||
/* factory to use for creating and binding sockets, or NULL */ |
||||
grpc_socket_factory* socket_factory; |
||||
|
||||
/* active port count: how many ports are actually still listening */ |
||||
size_t active_ports; |
||||
/* destroyed port count: how many ports are completely destroyed */ |
||||
size_t destroyed_ports; |
||||
|
||||
/* is this server shutting down? (boolean) */ |
||||
int shutdown; |
||||
|
||||
/* An array of listeners */ |
||||
absl::InlinedVector<GrpcUdpListener, 16> listeners; |
||||
|
||||
/* factory for use to create udp listeners */ |
||||
GrpcUdpHandlerFactory* handler_factory; |
||||
|
||||
/* shutdown callback */ |
||||
grpc_closure* shutdown_complete; |
||||
|
||||
/* all pollsets interested in new connections. The object pointed at is not
|
||||
* owned by this struct. */ |
||||
const std::vector<grpc_pollset*>* pollsets; |
||||
/* opaque object to pass to callbacks */ |
||||
void* user_data; |
||||
|
||||
/* latch has_so_reuseport during server creation */ |
||||
bool so_reuseport; |
||||
}; |
||||
|
||||
static grpc_socket_factory* get_socket_factory(const grpc_channel_args* args) { |
||||
if (args) { |
||||
const grpc_arg* arg = grpc_channel_args_find(args, GRPC_ARG_SOCKET_FACTORY); |
||||
if (arg) { |
||||
GPR_ASSERT(arg->type == GRPC_ARG_POINTER); |
||||
return static_cast<grpc_socket_factory*>(arg->value.pointer.p); |
||||
} |
||||
} |
||||
return nullptr; |
||||
} |
||||
|
||||
grpc_udp_server* grpc_udp_server_create(const grpc_channel_args* args) { |
||||
grpc_udp_server* s = new grpc_udp_server(); |
||||
gpr_mu_init(&s->mu); |
||||
s->socket_factory = get_socket_factory(args); |
||||
if (s->socket_factory) { |
||||
grpc_socket_factory_ref(s->socket_factory); |
||||
} |
||||
s->active_ports = 0; |
||||
s->destroyed_ports = 0; |
||||
s->shutdown = 0; |
||||
s->so_reuseport = grpc_is_socket_reuse_port_supported(); |
||||
return s; |
||||
} |
||||
|
||||
// static
|
||||
void GrpcUdpListener::shutdown_fd(void* args, grpc_error_handle error) { |
||||
if (args == nullptr) { |
||||
// No-op if shutdown args are null.
|
||||
return; |
||||
} |
||||
auto sp = static_cast<GrpcUdpListener*>(args); |
||||
gpr_mu_lock(sp->mutex()); |
||||
gpr_log(GPR_DEBUG, "shutdown fd %d", sp->fd_); |
||||
grpc_fd_shutdown(sp->emfd_, GRPC_ERROR_REF(error)); |
||||
sp->already_shutdown_ = true; |
||||
if (!sp->notify_on_write_armed_) { |
||||
// Re-arm write notification to notify listener with error. This is
|
||||
// necessary to decrement active_ports.
|
||||
sp->notify_on_write_armed_ = true; |
||||
grpc_fd_notify_on_write(sp->emfd_, &sp->write_closure_); |
||||
} |
||||
gpr_mu_unlock(sp->mutex()); |
||||
} |
||||
|
||||
static void finish_shutdown(grpc_udp_server* s) { |
||||
if (s->shutdown_complete != nullptr) { |
||||
grpc_core::ExecCtx::Run(DEBUG_LOCATION, s->shutdown_complete, |
||||
GRPC_ERROR_NONE); |
||||
} |
||||
|
||||
gpr_mu_destroy(&s->mu); |
||||
|
||||
gpr_log(GPR_DEBUG, "Destroy all listeners."); |
||||
for (size_t i = 0; i < s->listeners.size(); ++i) { |
||||
s->listeners[i].OnDestroy(); |
||||
} |
||||
|
||||
if (s->socket_factory) { |
||||
grpc_socket_factory_unref(s->socket_factory); |
||||
} |
||||
|
||||
delete s; |
||||
} |
||||
|
||||
static void destroyed_port(void* server, grpc_error_handle /*error*/) { |
||||
grpc_udp_server* s = static_cast<grpc_udp_server*>(server); |
||||
gpr_mu_lock(&s->mu); |
||||
s->destroyed_ports++; |
||||
if (s->destroyed_ports == s->listeners.size()) { |
||||
gpr_mu_unlock(&s->mu); |
||||
finish_shutdown(s); |
||||
} else { |
||||
gpr_mu_unlock(&s->mu); |
||||
} |
||||
} |
||||
|
||||
/* called when all listening endpoints have been shutdown, so no further
|
||||
events will be received on them - at this point it's safe to destroy |
||||
things */ |
||||
static void deactivated_all_ports(grpc_udp_server* s) { |
||||
/* delete ALL the things */ |
||||
gpr_mu_lock(&s->mu); |
||||
|
||||
GPR_ASSERT(s->shutdown); |
||||
|
||||
if (s->listeners.empty()) { |
||||
gpr_mu_unlock(&s->mu); |
||||
finish_shutdown(s); |
||||
return; |
||||
} |
||||
for (size_t i = 0; i < s->listeners.size(); ++i) { |
||||
s->listeners[i].OrphanFd(); |
||||
} |
||||
gpr_mu_unlock(&s->mu); |
||||
} |
||||
|
||||
void GrpcUdpListener::OrphanFd() { |
||||
gpr_log(GPR_DEBUG, "Orphan fd %d, emfd %p", fd_, emfd_); |
||||
grpc_unlink_if_unix_domain_socket(&addr_); |
||||
|
||||
GRPC_CLOSURE_INIT(&destroyed_closure_, destroyed_port, server_, |
||||
grpc_schedule_on_exec_ctx); |
||||
/* Because at this point, all listening sockets have been shutdown already, no
|
||||
* need to call OnFdAboutToOrphan() to notify the handler again. */ |
||||
grpc_fd_orphan(emfd_, &destroyed_closure_, nullptr, "udp_listener_shutdown"); |
||||
} |
||||
|
||||
void grpc_udp_server_destroy(grpc_udp_server* s, grpc_closure* on_done) { |
||||
gpr_mu_lock(&s->mu); |
||||
|
||||
GPR_ASSERT(!s->shutdown); |
||||
s->shutdown = 1; |
||||
|
||||
s->shutdown_complete = on_done; |
||||
|
||||
gpr_log(GPR_DEBUG, "start to destroy udp_server"); |
||||
/* shutdown all fd's */ |
||||
if (s->active_ports) { |
||||
for (size_t i = 0; i < s->listeners.size(); ++i) { |
||||
GrpcUdpListener* sp = &s->listeners[i]; |
||||
sp->OnFdAboutToOrphan(); |
||||
} |
||||
gpr_mu_unlock(&s->mu); |
||||
} else { |
||||
gpr_mu_unlock(&s->mu); |
||||
deactivated_all_ports(s); |
||||
} |
||||
} |
||||
|
||||
void GrpcUdpListener::OnFdAboutToOrphan() { |
||||
gpr_mu_lock(&mutex_); |
||||
grpc_unlink_if_unix_domain_socket(&addr_); |
||||
|
||||
GRPC_CLOSURE_INIT(&destroyed_closure_, destroyed_port, server_, |
||||
grpc_schedule_on_exec_ctx); |
||||
if (!orphan_notified_ && udp_handler_ != nullptr) { |
||||
/* Signals udp_handler that the FD is about to be closed and
|
||||
* should no longer be used. */ |
||||
GRPC_CLOSURE_INIT(&orphan_fd_closure_, shutdown_fd, this, |
||||
grpc_schedule_on_exec_ctx); |
||||
gpr_log(GPR_DEBUG, "fd %d about to be orphaned", fd_); |
||||
udp_handler_->OnFdAboutToOrphan(&orphan_fd_closure_, server_->user_data); |
||||
orphan_notified_ = true; |
||||
} |
||||
gpr_mu_unlock(&mutex_); |
||||
} |
||||
|
||||
static int bind_socket(grpc_socket_factory* socket_factory, int sockfd, |
||||
const grpc_resolved_address* addr) { |
||||
return (socket_factory != nullptr) |
||||
? grpc_socket_factory_bind(socket_factory, sockfd, addr) |
||||
: bind(sockfd, |
||||
reinterpret_cast<grpc_sockaddr*>( |
||||
const_cast<char*>(addr->addr)), |
||||
addr->len); |
||||
} |
||||
|
||||
/* Prepare a recently-created socket for listening. */ |
||||
static int prepare_socket(grpc_socket_factory* socket_factory, int fd, |
||||
const grpc_resolved_address* addr, int rcv_buf_size, |
||||
int snd_buf_size, bool so_reuseport) { |
||||
grpc_resolved_address sockname_temp; |
||||
grpc_sockaddr* addr_ptr = |
||||
reinterpret_cast<grpc_sockaddr*>(const_cast<char*>(addr->addr)); |
||||
|
||||
if (fd < 0) { |
||||
goto error; |
||||
} |
||||
|
||||
if (grpc_set_socket_nonblocking(fd, 1) != GRPC_ERROR_NONE) { |
||||
gpr_log(GPR_ERROR, "Unable to set nonblocking %d: %s", fd, strerror(errno)); |
||||
goto error; |
||||
} |
||||
if (grpc_set_socket_cloexec(fd, 1) != GRPC_ERROR_NONE) { |
||||
gpr_log(GPR_ERROR, "Unable to set cloexec %d: %s", fd, strerror(errno)); |
||||
goto error; |
||||
} |
||||
|
||||
if (grpc_set_socket_ip_pktinfo_if_possible(fd) != GRPC_ERROR_NONE) { |
||||
gpr_log(GPR_ERROR, "Unable to set ip_pktinfo."); |
||||
goto error; |
||||
} else if (addr_ptr->sa_family == AF_INET6) { |
||||
if (grpc_set_socket_ipv6_recvpktinfo_if_possible(fd) != GRPC_ERROR_NONE) { |
||||
gpr_log(GPR_ERROR, "Unable to set ipv6_recvpktinfo."); |
||||
goto error; |
||||
} |
||||
} |
||||
|
||||
if (grpc_set_socket_sndbuf(fd, snd_buf_size) != GRPC_ERROR_NONE) { |
||||
gpr_log(GPR_ERROR, "Failed to set send buffer size to %d bytes", |
||||
snd_buf_size); |
||||
goto error; |
||||
} |
||||
|
||||
if (grpc_set_socket_rcvbuf(fd, rcv_buf_size) != GRPC_ERROR_NONE) { |
||||
gpr_log(GPR_ERROR, "Failed to set receive buffer size to %d bytes", |
||||
rcv_buf_size); |
||||
goto error; |
||||
} |
||||
|
||||
{ |
||||
int get_overflow = 1; |
||||
if (0 != setsockopt(fd, SOL_SOCKET, SO_RXQ_OVFL, &get_overflow, |
||||
sizeof(get_overflow))) { |
||||
gpr_log(GPR_INFO, "Failed to set socket overflow support"); |
||||
} |
||||
} |
||||
|
||||
if (so_reuseport && !grpc_is_unix_socket(addr) && |
||||
grpc_set_socket_reuse_port(fd, 1) != GRPC_ERROR_NONE) { |
||||
gpr_log(GPR_ERROR, "Failed to set SO_REUSEPORT for fd %d", fd); |
||||
goto error; |
||||
} |
||||
|
||||
if (bind_socket(socket_factory, fd, addr) < 0) { |
||||
std::string addr_str = grpc_sockaddr_to_string(addr, false); |
||||
gpr_log(GPR_ERROR, "bind addr=%s: %s", addr_str.c_str(), strerror(errno)); |
||||
goto error; |
||||
} |
||||
|
||||
sockname_temp.len = static_cast<socklen_t>(sizeof(struct sockaddr_storage)); |
||||
|
||||
if (getsockname(fd, reinterpret_cast<grpc_sockaddr*>(sockname_temp.addr), |
||||
&sockname_temp.len) < 0) { |
||||
gpr_log(GPR_ERROR, "Unable to get the address socket %d is bound to: %s", |
||||
fd, strerror(errno)); |
||||
goto error; |
||||
} |
||||
|
||||
return grpc_sockaddr_get_port(&sockname_temp); |
||||
|
||||
error: |
||||
if (fd >= 0) { |
||||
close(fd); |
||||
} |
||||
return -1; |
||||
} |
||||
|
||||
// static
|
||||
void GrpcUdpListener::do_read(void* arg, grpc_error_handle error) { |
||||
GrpcUdpListener* sp = static_cast<GrpcUdpListener*>(arg); |
||||
GPR_ASSERT(error == GRPC_ERROR_NONE); |
||||
/* TODO: the reason we hold server->mu here is merely to prevent fd
|
||||
* shutdown while we are reading. However, it blocks do_write(). Switch to |
||||
* read lock if available. */ |
||||
gpr_mu_lock(sp->mutex()); |
||||
/* Tell the registered callback that data is available to read. */ |
||||
if (!sp->already_shutdown_ && sp->udp_handler_->Read()) { |
||||
/* There maybe more packets to read. Schedule read_more_cb_ closure to run
|
||||
* after finishing this event loop. */ |
||||
grpc_core::Executor::Run(&sp->do_read_closure_, GRPC_ERROR_NONE, |
||||
grpc_core::ExecutorType::DEFAULT, |
||||
grpc_core::ExecutorJobType::LONG); |
||||
} else { |
||||
/* Finish reading all the packets, re-arm the notification event so we can
|
||||
* get another chance to read. Or fd already shutdown, re-arm to get a |
||||
* notification with shutdown error. */ |
||||
grpc_fd_notify_on_read(sp->emfd_, &sp->read_closure_); |
||||
} |
||||
gpr_mu_unlock(sp->mutex()); |
||||
} |
||||
|
||||
// static
|
||||
void GrpcUdpListener::on_read(void* arg, grpc_error_handle error) { |
||||
GrpcUdpListener* sp = static_cast<GrpcUdpListener*>(arg); |
||||
sp->OnRead(error, arg); |
||||
} |
||||
|
||||
void GrpcUdpListener::OnRead(grpc_error_handle error, void* do_read_arg) { |
||||
if (error != GRPC_ERROR_NONE) { |
||||
gpr_mu_lock(&server_->mu); |
||||
if (0 == --server_->active_ports && server_->shutdown) { |
||||
gpr_mu_unlock(&server_->mu); |
||||
deactivated_all_ports(server_); |
||||
} else { |
||||
gpr_mu_unlock(&server_->mu); |
||||
} |
||||
return; |
||||
} |
||||
|
||||
/* Read once. If there is more data to read, off load the work to another
|
||||
* thread to finish. */ |
||||
if (udp_handler_->Read()) { |
||||
/* There maybe more packets to read. Schedule read_more_cb_ closure to run
|
||||
* after finishing this event loop. */ |
||||
GRPC_CLOSURE_INIT(&do_read_closure_, do_read, do_read_arg, nullptr); |
||||
grpc_core::Executor::Run(&do_read_closure_, GRPC_ERROR_NONE, |
||||
grpc_core::ExecutorType::DEFAULT, |
||||
grpc_core::ExecutorJobType::LONG); |
||||
} else { |
||||
/* Finish reading all the packets, re-arm the notification event so we can
|
||||
* get another chance to read. Or fd already shutdown, re-arm to get a |
||||
* notification with shutdown error. */ |
||||
grpc_fd_notify_on_read(emfd_, &read_closure_); |
||||
} |
||||
} |
||||
|
||||
// static
|
||||
// Wrapper of grpc_fd_notify_on_write() with a grpc_closure callback interface.
|
||||
void GrpcUdpListener::fd_notify_on_write_wrapper(void* arg, |
||||
grpc_error_handle /*error*/) { |
||||
GrpcUdpListener* sp = static_cast<GrpcUdpListener*>(arg); |
||||
gpr_mu_lock(sp->mutex()); |
||||
if (!sp->notify_on_write_armed_) { |
||||
grpc_fd_notify_on_write(sp->emfd_, &sp->write_closure_); |
||||
sp->notify_on_write_armed_ = true; |
||||
} |
||||
gpr_mu_unlock(sp->mutex()); |
||||
} |
||||
|
||||
// static
|
||||
void GrpcUdpListener::do_write(void* arg, grpc_error_handle error) { |
||||
GrpcUdpListener* sp = static_cast<GrpcUdpListener*>(arg); |
||||
gpr_mu_lock(sp->mutex()); |
||||
if (sp->already_shutdown_) { |
||||
// If fd has been shutdown, don't write any more and re-arm notification.
|
||||
grpc_fd_notify_on_write(sp->emfd_, &sp->write_closure_); |
||||
} else { |
||||
sp->notify_on_write_armed_ = false; |
||||
/* Tell the registered callback that the socket is writeable. */ |
||||
GPR_ASSERT(error == GRPC_ERROR_NONE); |
||||
GRPC_CLOSURE_INIT(&sp->notify_on_write_closure_, fd_notify_on_write_wrapper, |
||||
arg, grpc_schedule_on_exec_ctx); |
||||
sp->udp_handler_->OnCanWrite(sp->server_->user_data, |
||||
&sp->notify_on_write_closure_); |
||||
} |
||||
gpr_mu_unlock(sp->mutex()); |
||||
} |
||||
|
||||
// static
|
||||
void GrpcUdpListener::on_write(void* arg, grpc_error_handle error) { |
||||
GrpcUdpListener* sp = static_cast<GrpcUdpListener*>(arg); |
||||
sp->OnCanWrite(error, arg); |
||||
} |
||||
|
||||
void GrpcUdpListener::OnCanWrite(grpc_error_handle error, void* do_write_arg) { |
||||
if (error != GRPC_ERROR_NONE) { |
||||
gpr_mu_lock(&server_->mu); |
||||
if (0 == --server_->active_ports && server_->shutdown) { |
||||
gpr_mu_unlock(&server_->mu); |
||||
deactivated_all_ports(server_); |
||||
} else { |
||||
gpr_mu_unlock(&server_->mu); |
||||
} |
||||
return; |
||||
} |
||||
|
||||
/* Schedule actual write in another thread. */ |
||||
GRPC_CLOSURE_INIT(&do_write_closure_, do_write, do_write_arg, nullptr); |
||||
|
||||
grpc_core::Executor::Run(&do_write_closure_, GRPC_ERROR_NONE, |
||||
grpc_core::ExecutorType::DEFAULT, |
||||
grpc_core::ExecutorJobType::LONG); |
||||
} |
||||
|
||||
static int add_socket_to_server(grpc_udp_server* s, int fd, |
||||
const grpc_resolved_address* addr, |
||||
int rcv_buf_size, int snd_buf_size) { |
||||
gpr_log(GPR_DEBUG, "add socket %d to server", fd); |
||||
|
||||
int port = prepare_socket(s->socket_factory, fd, addr, rcv_buf_size, |
||||
snd_buf_size, s->so_reuseport); |
||||
if (port >= 0) { |
||||
gpr_mu_lock(&s->mu); |
||||
s->listeners.emplace_back(s, fd, addr); |
||||
gpr_log(GPR_DEBUG, |
||||
"add socket %d to server for port %d, %zu listener(s) in total", fd, |
||||
port, s->listeners.size()); |
||||
gpr_mu_unlock(&s->mu); |
||||
} |
||||
return port; |
||||
} |
||||
|
||||
int grpc_udp_server_add_port(grpc_udp_server* s, grpc_resolved_address* addr, |
||||
int rcv_buf_size, int snd_buf_size, |
||||
GrpcUdpHandlerFactory* handler_factory, |
||||
size_t num_listeners) { |
||||
if (num_listeners > 1 && !s->so_reuseport) { |
||||
gpr_log(GPR_ERROR, |
||||
"Try to have multiple listeners on same port, but SO_REUSEPORT is " |
||||
"not supported. Only create 1 listener."); |
||||
} |
||||
std::string addr_str = grpc_sockaddr_to_string(addr, true); |
||||
gpr_log(GPR_DEBUG, "add address: %s to server", addr_str.c_str()); |
||||
|
||||
int allocated_port1 = -1; |
||||
int allocated_port2 = -1; |
||||
int fd; |
||||
grpc_dualstack_mode dsmode; |
||||
grpc_resolved_address addr6_v4mapped; |
||||
grpc_resolved_address wild4; |
||||
grpc_resolved_address wild6; |
||||
grpc_resolved_address addr4_copy; |
||||
grpc_resolved_address* allocated_addr = nullptr; |
||||
grpc_resolved_address sockname_temp; |
||||
int port = 0; |
||||
|
||||
/* Check if this is a wildcard port, and if so, try to keep the port the same
|
||||
as some previously created listener. */ |
||||
if (grpc_sockaddr_get_port(addr) == 0) { |
||||
/* Loop through existing listeners to find the port in use. */ |
||||
for (size_t i = 0; i < s->listeners.size(); ++i) { |
||||
sockname_temp.len = |
||||
static_cast<socklen_t>(sizeof(struct sockaddr_storage)); |
||||
if (0 == getsockname(s->listeners[i].fd(), |
||||
reinterpret_cast<grpc_sockaddr*>(sockname_temp.addr), |
||||
&sockname_temp.len)) { |
||||
port = grpc_sockaddr_get_port(&sockname_temp); |
||||
if (port > 0) { |
||||
/* Found such a port, update |addr| to reflects this port. */ |
||||
allocated_addr = static_cast<grpc_resolved_address*>( |
||||
gpr_malloc(sizeof(grpc_resolved_address))); |
||||
memcpy(allocated_addr, addr, sizeof(grpc_resolved_address)); |
||||
grpc_sockaddr_set_port(allocated_addr, port); |
||||
addr = allocated_addr; |
||||
break; |
||||
} |
||||
} |
||||
} |
||||
} |
||||
|
||||
if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) { |
||||
addr = &addr6_v4mapped; |
||||
} |
||||
|
||||
s->handler_factory = handler_factory; |
||||
for (size_t i = 0; i < num_listeners; ++i) { |
||||
/* Treat :: or 0.0.0.0 as a family-agnostic wildcard. */ |
||||
if (grpc_sockaddr_is_wildcard(addr, &port)) { |
||||
grpc_sockaddr_make_wildcards(port, &wild4, &wild6); |
||||
|
||||
/* Try listening on IPv6 first. */ |
||||
addr = &wild6; |
||||
// TODO(rjshade): Test and propagate the returned grpc_error_handle:
|
||||
GRPC_ERROR_UNREF(grpc_create_dualstack_socket_using_factory( |
||||
s->socket_factory, addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode, &fd)); |
||||
allocated_port1 = |
||||
add_socket_to_server(s, fd, addr, rcv_buf_size, snd_buf_size); |
||||
if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) { |
||||
if (port == 0) { |
||||
/* This is the first time to bind to |addr|. If its port is still
|
||||
* wildcard port, update |addr| with the ephermeral port returned by |
||||
* kernel. Thus |addr| can have a specific port in following |
||||
* iterations. */ |
||||
grpc_sockaddr_set_port(addr, allocated_port1); |
||||
port = allocated_port1; |
||||
} else if (allocated_port1 >= 0) { |
||||
/* The following successfully created socket should have same port as
|
||||
* the first one. */ |
||||
GPR_ASSERT(port == allocated_port1); |
||||
} |
||||
/* A dualstack socket is created, no need to create corresponding IPV4
|
||||
* socket. */ |
||||
continue; |
||||
} |
||||
|
||||
/* If we didn't get a dualstack socket, also listen on 0.0.0.0. */ |
||||
if (port == 0 && allocated_port1 > 0) { |
||||
/* |port| hasn't been assigned to an emphemeral port yet, |wild4| must
|
||||
* have a wildcard port. Update it with the emphemeral port created |
||||
* during binding.*/ |
||||
grpc_sockaddr_set_port(&wild4, allocated_port1); |
||||
port = allocated_port1; |
||||
} |
||||
/* |wild4| should have been updated with an emphemeral port by now. Use
|
||||
* this IPV4 address to create a IPV4 socket. */ |
||||
addr = &wild4; |
||||
} |
||||
|
||||
// TODO(rjshade): Test and propagate the returned grpc_error_handle:
|
||||
GRPC_ERROR_UNREF(grpc_create_dualstack_socket_using_factory( |
||||
s->socket_factory, addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode, &fd)); |
||||
if (fd < 0) { |
||||
gpr_log(GPR_ERROR, "Unable to create socket: %s", strerror(errno)); |
||||
} |
||||
if (dsmode == GRPC_DSMODE_IPV4 && |
||||
grpc_sockaddr_is_v4mapped(addr, &addr4_copy)) { |
||||
addr = &addr4_copy; |
||||
} |
||||
allocated_port2 = |
||||
add_socket_to_server(s, fd, addr, rcv_buf_size, snd_buf_size); |
||||
if (port == 0) { |
||||
/* Update |addr| with the ephermeral port returned by kernel. So |addr|
|
||||
* can have a specific port in following iterations. */ |
||||
grpc_sockaddr_set_port(addr, allocated_port2); |
||||
port = allocated_port2; |
||||
} else if (allocated_port2 >= 0) { |
||||
GPR_ASSERT(port == allocated_port2); |
||||
} |
||||
} |
||||
|
||||
gpr_free(allocated_addr); |
||||
return port; |
||||
} |
||||
|
||||
int grpc_udp_server_get_fd(grpc_udp_server* s, unsigned port_index) { |
||||
if (port_index >= s->listeners.size()) { |
||||
return -1; |
||||
} |
||||
|
||||
return s->listeners[port_index].fd(); |
||||
} |
||||
|
||||
void grpc_udp_server_start(grpc_udp_server* udp_server, |
||||
const std::vector<grpc_pollset*>* pollsets, |
||||
void* user_data) { |
||||
gpr_log(GPR_DEBUG, "grpc_udp_server_start"); |
||||
gpr_mu_lock(&udp_server->mu); |
||||
GPR_ASSERT(udp_server->active_ports == 0); |
||||
udp_server->pollsets = pollsets; |
||||
udp_server->user_data = user_data; |
||||
|
||||
for (auto& listener : udp_server->listeners) { |
||||
listener.StartListening(pollsets, udp_server->handler_factory); |
||||
} |
||||
|
||||
gpr_mu_unlock(&udp_server->mu); |
||||
} |
||||
|
||||
void GrpcUdpListener::StartListening(const std::vector<grpc_pollset*>* pollsets, |
||||
GrpcUdpHandlerFactory* handler_factory) { |
||||
gpr_mu_lock(&mutex_); |
||||
handler_factory_ = handler_factory; |
||||
udp_handler_ = handler_factory->CreateUdpHandler(emfd_, server_->user_data); |
||||
for (grpc_pollset* pollset : *pollsets) { |
||||
grpc_pollset_add_fd(pollset, emfd_); |
||||
} |
||||
GRPC_CLOSURE_INIT(&read_closure_, on_read, this, grpc_schedule_on_exec_ctx); |
||||
grpc_fd_notify_on_read(emfd_, &read_closure_); |
||||
|
||||
GRPC_CLOSURE_INIT(&write_closure_, on_write, this, grpc_schedule_on_exec_ctx); |
||||
notify_on_write_armed_ = true; |
||||
grpc_fd_notify_on_write(emfd_, &write_closure_); |
||||
|
||||
/* Registered for both read and write callbacks: increment active_ports
|
||||
* twice to account for this, and delay free-ing of memory until both |
||||
* on_read and on_write have fired. */ |
||||
server_->active_ports += 2; |
||||
gpr_mu_unlock(&mutex_); |
||||
} |
||||
|
||||
void GrpcUdpListener::OnDestroy() { |
||||
if (udp_handler_ != nullptr) { |
||||
handler_factory_->DestroyUdpHandler(udp_handler_); |
||||
} |
||||
} |
||||
|
||||
#endif |
@ -1,103 +0,0 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2015 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#ifndef GRPC_CORE_LIB_IOMGR_UDP_SERVER_H |
||||
#define GRPC_CORE_LIB_IOMGR_UDP_SERVER_H |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include <vector> |
||||
|
||||
#include "src/core/lib/iomgr/endpoint.h" |
||||
#include "src/core/lib/iomgr/ev_posix.h" |
||||
#include "src/core/lib/iomgr/resolve_address.h" |
||||
|
||||
/* Forward decl of struct grpc_server */ |
||||
/* This is not typedef'ed to avoid a typedef-redefinition error */ |
||||
struct grpc_server; |
||||
|
||||
/* Forward decl of grpc_udp_server */ |
||||
typedef struct grpc_udp_server grpc_udp_server; |
||||
|
||||
/* An interface associated with a socket. udp server delivers I/O event on that
|
||||
* socket to the subclass of this interface which is created through |
||||
* GrpcUdpHandlerFactory. |
||||
* Its implementation should do the real IO work, e.g. read packet and write. */ |
||||
class GrpcUdpHandler { |
||||
public: |
||||
GrpcUdpHandler(grpc_fd* /* emfd */, void* /* user_data */) {} |
||||
virtual ~GrpcUdpHandler() {} |
||||
|
||||
// Interfaces to be implemented by subclasses to do the actual setup/tear down
|
||||
// or I/O.
|
||||
|
||||
// Called when data is available to read from the socket. Returns true if
|
||||
// there is more data to read after this call.
|
||||
virtual bool Read() = 0; |
||||
// Called when socket becomes write unblocked. The given closure should be
|
||||
// scheduled when the socket becomes blocked next time.
|
||||
virtual void OnCanWrite(void* user_data, |
||||
grpc_closure* notify_on_write_closure) = 0; |
||||
// Called before the gRPC FD is orphaned. Notify udp server to continue
|
||||
// orphaning fd by scheduling the given closure, afterwards the associated fd
|
||||
// will be closed.
|
||||
virtual void OnFdAboutToOrphan(grpc_closure* orphan_fd_closure, |
||||
void* user_data) = 0; |
||||
}; |
||||
|
||||
class GrpcUdpHandlerFactory { |
||||
public: |
||||
virtual ~GrpcUdpHandlerFactory() {} |
||||
/* Called when start to listen on a socket.
|
||||
* Return an instance of the implementation of GrpcUdpHandler interface which |
||||
* will process I/O events for this socket from now on. */ |
||||
virtual GrpcUdpHandler* CreateUdpHandler(grpc_fd* emfd, void* user_data) = 0; |
||||
virtual void DestroyUdpHandler(GrpcUdpHandler* handler) = 0; |
||||
}; |
||||
|
||||
/* Create a server, initially not bound to any ports */ |
||||
grpc_udp_server* grpc_udp_server_create(const grpc_channel_args* args); |
||||
|
||||
/* Start listening to bound ports. user_data is passed to callbacks. */ |
||||
void grpc_udp_server_start(grpc_udp_server* udp_server, |
||||
const std::vector<grpc_pollset*>* pollsets, |
||||
void* user_data); |
||||
|
||||
int grpc_udp_server_get_fd(grpc_udp_server* s, unsigned port_index); |
||||
|
||||
/* Add a port to the server, returning port number on success, or negative
|
||||
on failure. |
||||
|
||||
Create |num_listeners| sockets for given address to listen on using |
||||
SO_REUSEPORT if supported. |
||||
|
||||
The :: and 0.0.0.0 wildcard addresses are treated identically, accepting |
||||
both IPv4 and IPv6 connections, but :: is the preferred style. This usually |
||||
creates |num_listeners| sockets, but possibly 2 * |num_listeners| on systems |
||||
which support IPv6, but not dualstack sockets. */ |
||||
|
||||
/* TODO(ctiller): deprecate this, and make grpc_udp_server_add_ports to handle
|
||||
all of the multiple socket port matching logic in one place */ |
||||
int grpc_udp_server_add_port(grpc_udp_server* s, grpc_resolved_address* addr, |
||||
int rcv_buf_size, int snd_buf_size, |
||||
GrpcUdpHandlerFactory* handler_factory, |
||||
size_t num_listeners); |
||||
|
||||
void grpc_udp_server_destroy(grpc_udp_server* server, grpc_closure* on_done); |
||||
|
||||
#endif /* GRPC_CORE_LIB_IOMGR_UDP_SERVER_H */ |
@ -1,393 +0,0 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2015 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#include "src/core/lib/iomgr/port.h" |
||||
|
||||
// This test won't work except with posix sockets enabled
|
||||
#ifdef GRPC_POSIX_SOCKET_UDP_SERVER |
||||
|
||||
#include <netinet/in.h> |
||||
#include <string.h> |
||||
#include <sys/socket.h> |
||||
#include <unistd.h> |
||||
|
||||
#include <vector> |
||||
|
||||
#include <grpc/grpc.h> |
||||
#include <grpc/support/alloc.h> |
||||
#include <grpc/support/log.h> |
||||
#include <grpc/support/sync.h> |
||||
#include <grpc/support/time.h> |
||||
|
||||
#include "src/core/lib/channel/channel_args.h" |
||||
#include "src/core/lib/gpr/useful.h" |
||||
#include "src/core/lib/gprpp/memory.h" |
||||
#include "src/core/lib/iomgr/ev_posix.h" |
||||
#include "src/core/lib/iomgr/iomgr.h" |
||||
#include "src/core/lib/iomgr/socket_factory_posix.h" |
||||
#include "src/core/lib/iomgr/socket_utils_posix.h" |
||||
#include "src/core/lib/iomgr/udp_server.h" |
||||
#include "test/core/util/test_config.h" |
||||
|
||||
#define LOG_TEST(x) gpr_log(GPR_INFO, "%s", #x) |
||||
|
||||
static grpc_pollset* g_pollset; |
||||
static gpr_mu* g_mu; |
||||
static int g_number_of_reads = 0; |
||||
static int g_number_of_writes = 0; |
||||
static int g_number_of_bytes_read = 0; |
||||
static int g_number_of_orphan_calls = 0; |
||||
static int g_number_of_starts = 0; |
||||
|
||||
int rcv_buf_size = 1024; |
||||
int snd_buf_size = 1024; |
||||
|
||||
static int g_num_listeners = 1; |
||||
|
||||
class TestGrpcUdpHandler : public GrpcUdpHandler { |
||||
public: |
||||
TestGrpcUdpHandler(grpc_fd* emfd, void* user_data) |
||||
: GrpcUdpHandler(emfd, user_data), emfd_(emfd) { |
||||
g_number_of_starts++; |
||||
} |
||||
~TestGrpcUdpHandler() override {} |
||||
|
||||
protected: |
||||
bool Read() override { |
||||
char read_buffer[512]; |
||||
ssize_t byte_count; |
||||
|
||||
gpr_mu_lock(g_mu); |
||||
byte_count = |
||||
recv(grpc_fd_wrapped_fd(emfd()), read_buffer, sizeof(read_buffer), 0); |
||||
|
||||
g_number_of_reads++; |
||||
g_number_of_bytes_read += static_cast<int>(byte_count); |
||||
|
||||
gpr_log(GPR_DEBUG, "receive %zu on handler %p", byte_count, this); |
||||
GPR_ASSERT(GRPC_LOG_IF_ERROR("pollset_kick", |
||||
grpc_pollset_kick(g_pollset, nullptr))); |
||||
gpr_mu_unlock(g_mu); |
||||
return false; |
||||
} |
||||
|
||||
void OnCanWrite(void* /*user_data*/, |
||||
grpc_closure* /*notify_on_write_closure*/) override { |
||||
gpr_mu_lock(g_mu); |
||||
g_number_of_writes++; |
||||
|
||||
GPR_ASSERT(GRPC_LOG_IF_ERROR("pollset_kick", |
||||
grpc_pollset_kick(g_pollset, nullptr))); |
||||
gpr_mu_unlock(g_mu); |
||||
} |
||||
|
||||
void OnFdAboutToOrphan(grpc_closure* orphan_fd_closure, |
||||
void* /*user_data*/) override { |
||||
gpr_log(GPR_INFO, "gRPC FD about to be orphaned: %d", |
||||
grpc_fd_wrapped_fd(emfd())); |
||||
grpc_core::ExecCtx::Run(DEBUG_LOCATION, orphan_fd_closure, GRPC_ERROR_NONE); |
||||
g_number_of_orphan_calls++; |
||||
} |
||||
|
||||
grpc_fd* emfd() { return emfd_; } |
||||
|
||||
private: |
||||
grpc_fd* emfd_; |
||||
}; |
||||
|
||||
class TestGrpcUdpHandlerFactory : public GrpcUdpHandlerFactory { |
||||
public: |
||||
GrpcUdpHandler* CreateUdpHandler(grpc_fd* emfd, void* user_data) override { |
||||
gpr_log(GPR_INFO, "create udp handler for fd %d", grpc_fd_wrapped_fd(emfd)); |
||||
return new TestGrpcUdpHandler(emfd, user_data); |
||||
} |
||||
|
||||
void DestroyUdpHandler(GrpcUdpHandler* handler) override { |
||||
gpr_log(GPR_INFO, "Destroy handler"); |
||||
delete reinterpret_cast<TestGrpcUdpHandler*>(handler); |
||||
} |
||||
}; |
||||
|
||||
TestGrpcUdpHandlerFactory handler_factory; |
||||
|
||||
struct test_socket_factory { |
||||
grpc_socket_factory base; |
||||
int number_of_socket_calls; |
||||
int number_of_bind_calls; |
||||
}; |
||||
typedef struct test_socket_factory test_socket_factory; |
||||
|
||||
static int test_socket_factory_socket(grpc_socket_factory* factory, int domain, |
||||
int type, int protocol) { |
||||
test_socket_factory* f = reinterpret_cast<test_socket_factory*>(factory); |
||||
f->number_of_socket_calls++; |
||||
return socket(domain, type, protocol); |
||||
} |
||||
|
||||
static int test_socket_factory_bind(grpc_socket_factory* factory, int sockfd, |
||||
const grpc_resolved_address* addr) { |
||||
test_socket_factory* f = reinterpret_cast<test_socket_factory*>(factory); |
||||
f->number_of_bind_calls++; |
||||
return bind(sockfd, |
||||
reinterpret_cast<struct sockaddr*>(const_cast<char*>(addr->addr)), |
||||
static_cast<socklen_t>(addr->len)); |
||||
} |
||||
|
||||
static int test_socket_factory_compare(grpc_socket_factory* a, |
||||
grpc_socket_factory* b) { |
||||
return grpc_core::QsortCompare(a, b); |
||||
} |
||||
|
||||
static void test_socket_factory_destroy(grpc_socket_factory* factory) { |
||||
test_socket_factory* f = reinterpret_cast<test_socket_factory*>(factory); |
||||
gpr_free(f); |
||||
} |
||||
|
||||
static const grpc_socket_factory_vtable test_socket_factory_vtable = { |
||||
test_socket_factory_socket, test_socket_factory_bind, |
||||
test_socket_factory_compare, test_socket_factory_destroy}; |
||||
|
||||
static test_socket_factory* test_socket_factory_create(void) { |
||||
test_socket_factory* factory = static_cast<test_socket_factory*>( |
||||
gpr_malloc(sizeof(test_socket_factory))); |
||||
grpc_socket_factory_init(&factory->base, &test_socket_factory_vtable); |
||||
factory->number_of_socket_calls = 0; |
||||
factory->number_of_bind_calls = 0; |
||||
return factory; |
||||
} |
||||
|
||||
static void destroy_pollset(void* p, grpc_error_handle /*error*/) { |
||||
grpc_pollset_destroy(static_cast<grpc_pollset*>(p)); |
||||
} |
||||
|
||||
static void shutdown_and_destroy_pollset() { |
||||
gpr_mu_lock(g_mu); |
||||
auto closure = GRPC_CLOSURE_CREATE(destroy_pollset, g_pollset, |
||||
grpc_schedule_on_exec_ctx); |
||||
grpc_pollset_shutdown(g_pollset, closure); |
||||
gpr_mu_unlock(g_mu); |
||||
/* Flush exec_ctx to run |destroyed| */ |
||||
grpc_core::ExecCtx::Get()->Flush(); |
||||
} |
||||
|
||||
static void test_no_op(void) { |
||||
grpc_pollset_init(g_pollset, &g_mu); |
||||
grpc_core::ExecCtx exec_ctx; |
||||
grpc_udp_server* s = grpc_udp_server_create(nullptr); |
||||
LOG_TEST("test_no_op"); |
||||
grpc_udp_server_destroy(s, nullptr); |
||||
shutdown_and_destroy_pollset(); |
||||
} |
||||
|
||||
static void test_no_op_with_start(void) { |
||||
grpc_pollset_init(g_pollset, &g_mu); |
||||
grpc_core::ExecCtx exec_ctx; |
||||
grpc_udp_server* s = grpc_udp_server_create(nullptr); |
||||
LOG_TEST("test_no_op_with_start"); |
||||
std::vector<grpc_pollset*> empty_pollset; |
||||
grpc_udp_server_start(s, &empty_pollset, nullptr); |
||||
grpc_udp_server_destroy(s, nullptr); |
||||
shutdown_and_destroy_pollset(); |
||||
} |
||||
|
||||
static void test_no_op_with_port(void) { |
||||
grpc_pollset_init(g_pollset, &g_mu); |
||||
g_number_of_orphan_calls = 0; |
||||
grpc_core::ExecCtx exec_ctx; |
||||
grpc_resolved_address resolved_addr; |
||||
struct sockaddr_in* addr = |
||||
reinterpret_cast<struct sockaddr_in*>(resolved_addr.addr); |
||||
grpc_udp_server* s = grpc_udp_server_create(nullptr); |
||||
LOG_TEST("test_no_op_with_port"); |
||||
|
||||
memset(&resolved_addr, 0, sizeof(resolved_addr)); |
||||
resolved_addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_in)); |
||||
addr->sin_family = AF_INET; |
||||
GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, rcv_buf_size, |
||||
snd_buf_size, &handler_factory, |
||||
g_num_listeners) > 0); |
||||
|
||||
grpc_udp_server_destroy(s, nullptr); |
||||
|
||||
/* The server haven't start listening, so no udp handler to be notified. */ |
||||
GPR_ASSERT(g_number_of_orphan_calls == 0); |
||||
shutdown_and_destroy_pollset(); |
||||
} |
||||
|
||||
static void test_no_op_with_port_and_socket_factory(void) { |
||||
grpc_pollset_init(g_pollset, &g_mu); |
||||
g_number_of_orphan_calls = 0; |
||||
grpc_core::ExecCtx exec_ctx; |
||||
grpc_resolved_address resolved_addr; |
||||
struct sockaddr_in* addr = |
||||
reinterpret_cast<struct sockaddr_in*>(resolved_addr.addr); |
||||
|
||||
test_socket_factory* socket_factory = test_socket_factory_create(); |
||||
grpc_arg socket_factory_arg = |
||||
grpc_socket_factory_to_arg(&socket_factory->base); |
||||
grpc_channel_args* channel_args = |
||||
grpc_channel_args_copy_and_add(nullptr, &socket_factory_arg, 1); |
||||
grpc_udp_server* s = grpc_udp_server_create(channel_args); |
||||
grpc_channel_args_destroy(channel_args); |
||||
|
||||
LOG_TEST("test_no_op_with_port_and_socket_factory"); |
||||
|
||||
memset(&resolved_addr, 0, sizeof(resolved_addr)); |
||||
resolved_addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_in)); |
||||
addr->sin_family = AF_INET; |
||||
GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, rcv_buf_size, |
||||
snd_buf_size, &handler_factory, |
||||
g_num_listeners) > 0); |
||||
GPR_ASSERT(socket_factory->number_of_socket_calls == g_num_listeners); |
||||
GPR_ASSERT(socket_factory->number_of_bind_calls == g_num_listeners); |
||||
|
||||
grpc_udp_server_destroy(s, nullptr); |
||||
|
||||
grpc_socket_factory_unref(&socket_factory->base); |
||||
|
||||
/* The server haven't start listening, so no udp handler to be notified. */ |
||||
GPR_ASSERT(g_number_of_orphan_calls == 0); |
||||
shutdown_and_destroy_pollset(); |
||||
} |
||||
|
||||
static void test_no_op_with_port_and_start(void) { |
||||
grpc_pollset_init(g_pollset, &g_mu); |
||||
g_number_of_orphan_calls = 0; |
||||
grpc_core::ExecCtx exec_ctx; |
||||
grpc_resolved_address resolved_addr; |
||||
struct sockaddr_in* addr = |
||||
reinterpret_cast<struct sockaddr_in*>(resolved_addr.addr); |
||||
grpc_udp_server* s = grpc_udp_server_create(nullptr); |
||||
LOG_TEST("test_no_op_with_port_and_start"); |
||||
|
||||
memset(&resolved_addr, 0, sizeof(resolved_addr)); |
||||
resolved_addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_in)); |
||||
addr->sin_family = AF_INET; |
||||
GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, rcv_buf_size, |
||||
snd_buf_size, &handler_factory, |
||||
g_num_listeners) > 0); |
||||
|
||||
std::vector<grpc_pollset*> empty_pollset; |
||||
grpc_udp_server_start(s, &empty_pollset, nullptr); |
||||
GPR_ASSERT(g_number_of_starts == g_num_listeners); |
||||
grpc_udp_server_destroy(s, nullptr); |
||||
|
||||
/* The server had a single FD, which is orphaned exactly once in *
|
||||
* grpc_udp_server_destroy. */ |
||||
GPR_ASSERT(g_number_of_orphan_calls == g_num_listeners); |
||||
shutdown_and_destroy_pollset(); |
||||
} |
||||
|
||||
static void test_receive(int number_of_clients) { |
||||
grpc_pollset_init(g_pollset, &g_mu); |
||||
grpc_core::ExecCtx exec_ctx; |
||||
grpc_resolved_address resolved_addr; |
||||
struct sockaddr_storage* addr = |
||||
reinterpret_cast<struct sockaddr_storage*>(resolved_addr.addr); |
||||
int clifd, svrfd; |
||||
grpc_udp_server* s = grpc_udp_server_create(nullptr); |
||||
int i; |
||||
grpc_millis deadline; |
||||
LOG_TEST("test_receive"); |
||||
gpr_log(GPR_INFO, "clients=%d", number_of_clients); |
||||
|
||||
g_number_of_bytes_read = 0; |
||||
g_number_of_orphan_calls = 0; |
||||
|
||||
memset(&resolved_addr, 0, sizeof(resolved_addr)); |
||||
resolved_addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_storage)); |
||||
addr->ss_family = AF_INET; |
||||
GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, rcv_buf_size, |
||||
snd_buf_size, &handler_factory, |
||||
g_num_listeners) > 0); |
||||
|
||||
svrfd = grpc_udp_server_get_fd(s, 0); |
||||
GPR_ASSERT(svrfd >= 0); |
||||
GPR_ASSERT(getsockname(svrfd, (struct sockaddr*)addr, |
||||
(socklen_t*)&resolved_addr.len) == 0); |
||||
GPR_ASSERT(resolved_addr.len <= sizeof(struct sockaddr_storage)); |
||||
|
||||
std::vector<grpc_pollset*> test_pollsets; |
||||
test_pollsets.emplace_back(g_pollset); |
||||
grpc_udp_server_start(s, &test_pollsets, nullptr); |
||||
|
||||
gpr_mu_lock(g_mu); |
||||
|
||||
for (i = 0; i < number_of_clients; i++) { |
||||
deadline = |
||||
grpc_timespec_to_millis_round_up(grpc_timeout_seconds_to_deadline(10)); |
||||
|
||||
int number_of_bytes_read_before = g_number_of_bytes_read; |
||||
/* Create a socket, send a packet to the UDP server. */ |
||||
clifd = socket(addr->ss_family, SOCK_DGRAM, 0); |
||||
GPR_ASSERT(clifd >= 0); |
||||
GPR_ASSERT(connect(clifd, (struct sockaddr*)addr, |
||||
(socklen_t)resolved_addr.len) == 0); |
||||
GPR_ASSERT(5 == write(clifd, "hello", 5)); |
||||
while (g_number_of_bytes_read < (number_of_bytes_read_before + 5) && |
||||
deadline > grpc_core::ExecCtx::Get()->Now()) { |
||||
grpc_pollset_worker* worker = nullptr; |
||||
GPR_ASSERT(GRPC_LOG_IF_ERROR( |
||||
"pollset_work", grpc_pollset_work(g_pollset, &worker, deadline))); |
||||
gpr_mu_unlock(g_mu); |
||||
grpc_core::ExecCtx::Get()->Flush(); |
||||
gpr_mu_lock(g_mu); |
||||
} |
||||
close(clifd); |
||||
} |
||||
GPR_ASSERT(g_number_of_bytes_read == 5 * number_of_clients); |
||||
|
||||
gpr_mu_unlock(g_mu); |
||||
|
||||
grpc_udp_server_destroy(s, nullptr); |
||||
|
||||
/* The server had a single FD, which is orphaned exactly once in *
|
||||
* grpc_udp_server_destroy. */ |
||||
GPR_ASSERT(g_number_of_orphan_calls == g_num_listeners); |
||||
shutdown_and_destroy_pollset(); |
||||
} |
||||
|
||||
int main(int argc, char** argv) { |
||||
grpc::testing::TestEnvironment env(argc, argv); |
||||
grpc_init(); |
||||
if (grpc_is_socket_reuse_port_supported()) { |
||||
g_num_listeners = 10; |
||||
} |
||||
{ |
||||
grpc_core::ExecCtx exec_ctx; |
||||
g_pollset = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size())); |
||||
|
||||
test_no_op(); |
||||
test_no_op_with_start(); |
||||
test_no_op_with_port(); |
||||
test_no_op_with_port_and_socket_factory(); |
||||
test_no_op_with_port_and_start(); |
||||
test_receive(1); |
||||
test_receive(10); |
||||
|
||||
gpr_free(g_pollset); |
||||
} |
||||
grpc_shutdown(); |
||||
return 0; |
||||
} |
||||
|
||||
#else /* GRPC_POSIX_SOCKET_UDP_SERVER */ |
||||
|
||||
int main(int argc, char** argv) { return 1; } |
||||
|
||||
#endif /* GRPC_POSIX_SOCKET_UDP_SERVER */ |
Loading…
Reference in new issue