Changing grpc_tcp_client_vtable to include TCP cancel connect method (#29968)

* Changing grpc_tcp_client_vtable to include TCP cancel connect method

* fix unused parameter error

* update

* fix sanity checks
pull/29986/head
Vignesh Babu 3 years ago committed by GitHub
parent a0da0ab243
commit f8eedac1fc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 9
      src/core/lib/iomgr/tcp_client.cc
  2. 14
      src/core/lib/iomgr/tcp_client.h
  3. 12
      src/core/lib/iomgr/tcp_client_cfstream.cc
  4. 10
      src/core/lib/iomgr/tcp_client_posix.cc
  5. 10
      src/core/lib/iomgr/tcp_client_windows.cc
  6. 10
      test/core/end2end/fuzzers/api_fuzzer.cc
  7. 15
      test/cpp/end2end/connection_delay_injector.cc

@ -22,15 +22,20 @@
grpc_tcp_client_vtable* grpc_tcp_client_impl;
void grpc_tcp_client_connect(grpc_closure* on_connect, grpc_endpoint** endpoint,
int64_t grpc_tcp_client_connect(grpc_closure* on_connect,
grpc_endpoint** endpoint,
grpc_pollset_set* interested_parties,
const grpc_channel_args* channel_args,
const grpc_resolved_address* addr,
grpc_core::Timestamp deadline) {
grpc_tcp_client_impl->connect(on_connect, endpoint, interested_parties,
return grpc_tcp_client_impl->connect(on_connect, endpoint, interested_parties,
channel_args, addr, deadline);
}
bool grpc_tcp_client_cancel_connect(int64_t connection_handle) {
return grpc_tcp_client_impl->cancel_connect(connection_handle);
}
void grpc_set_tcp_client_impl(grpc_tcp_client_vtable* impl) {
grpc_tcp_client_impl = impl;
}

@ -30,24 +30,32 @@
#include "src/core/lib/resource_quota/memory_quota.h"
typedef struct grpc_tcp_client_vtable {
void (*connect)(grpc_closure* on_connect, grpc_endpoint** endpoint,
int64_t (*connect)(grpc_closure* on_connect, grpc_endpoint** endpoint,
grpc_pollset_set* interested_parties,
const grpc_channel_args* channel_args,
const grpc_resolved_address* addr,
grpc_core::Timestamp deadline);
bool (*cancel_connect)(int64_t connection_handle);
} grpc_tcp_client_vtable;
/* Asynchronously connect to an address (specified as (addr, len)), and call
cb with arg and the completed connection when done (or call cb with arg and
NULL on failure).
interested_parties points to a set of pollsets that would be interested
in this connection being established (in order to continue their work) */
void grpc_tcp_client_connect(grpc_closure* on_connect, grpc_endpoint** endpoint,
in this connection being established (in order to continue their work). It
returns a handle to the connect operation which can be used to cancel the
connection attempt. */
int64_t grpc_tcp_client_connect(grpc_closure* on_connect,
grpc_endpoint** endpoint,
grpc_pollset_set* interested_parties,
const grpc_channel_args* channel_args,
const grpc_resolved_address* addr,
grpc_core::Timestamp deadline);
// Returns true if a connect attempt corresponding to the provided handle
// is successfully cancelled. Otherwise it returns false.
bool grpc_tcp_client_cancel_connect(int64_t connection_handle);
void grpc_tcp_client_global_init();
void grpc_set_tcp_client_impl(grpc_tcp_client_vtable* impl);

@ -149,7 +149,7 @@ static void ParseResolvedAddress(const grpc_resolved_address* addr,
*port = grpc_sockaddr_get_port(addr);
}
static void CFStreamClientConnect(grpc_closure* closure, grpc_endpoint** ep,
static int64_t CFStreamClientConnect(grpc_closure* closure, grpc_endpoint** ep,
grpc_pollset_set* interested_parties,
const grpc_channel_args* channel_args,
const grpc_resolved_address* resolved_addr,
@ -159,7 +159,7 @@ static void CFStreamClientConnect(grpc_closure* closure, grpc_endpoint** ep,
grpc_error_handle error =
GRPC_ERROR_CREATE_FROM_CPP_STRING(addr_uri.status().ToString());
grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, error);
return;
return 0;
}
CFStreamConnect* connect = new CFStreamConnect();
@ -198,8 +198,14 @@ static void CFStreamClientConnect(grpc_closure* closure, grpc_endpoint** ep,
CFWriteStreamOpen(write_stream);
grpc_timer_init(&connect->alarm, deadline, &connect->on_alarm);
gpr_mu_unlock(&connect->mu);
return 0;
}
grpc_tcp_client_vtable grpc_cfstream_client_vtable = {CFStreamClientConnect};
static bool CFStreamClientCancelConnect(int64_t /*connection_handle*/) {
return false;
}
grpc_tcp_client_vtable grpc_cfstream_client_vtable = {
CFStreamClientConnect, CFStreamClientCancelConnect};
#endif /* GRPC_CFSTREAM_CLIENT */

@ -328,7 +328,7 @@ void grpc_tcp_client_create_from_prepared_fd(
gpr_mu_unlock(&ac->mu);
}
static void tcp_connect(grpc_closure* closure, grpc_endpoint** ep,
static int64_t tcp_connect(grpc_closure* closure, grpc_endpoint** ep,
grpc_pollset_set* interested_parties,
const grpc_channel_args* channel_args,
const grpc_resolved_address* addr,
@ -340,12 +340,16 @@ static void tcp_connect(grpc_closure* closure, grpc_endpoint** ep,
if ((error = grpc_tcp_client_prepare_fd(channel_args, addr, &mapped_addr,
&fd)) != GRPC_ERROR_NONE) {
grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, error);
return;
return 0;
}
grpc_tcp_client_create_from_prepared_fd(interested_parties, closure, fd,
channel_args, &mapped_addr, deadline,
ep);
return 0;
}
grpc_tcp_client_vtable grpc_posix_tcp_client_vtable = {tcp_connect};
static bool tcp_cancel_connect(int64_t /*connection_handle*/) { return false; }
grpc_tcp_client_vtable grpc_posix_tcp_client_vtable = {tcp_connect,
tcp_cancel_connect};
#endif

@ -121,7 +121,7 @@ static void on_connect(void* acp, grpc_error_handle error) {
/* Tries to issue one async connection, then schedules both an IOCP
notification request for the connection, and one timeout alert. */
static void tcp_connect(grpc_closure* on_done, grpc_endpoint** endpoint,
static int64_t tcp_connect(grpc_closure* on_done, grpc_endpoint** endpoint,
grpc_pollset_set* interested_parties,
const grpc_channel_args* channel_args,
const grpc_resolved_address* addr,
@ -216,7 +216,7 @@ static void tcp_connect(grpc_closure* on_done, grpc_endpoint** endpoint,
grpc_timer_init(&ac->alarm, deadline, &ac->on_alarm);
grpc_socket_notify_on_write(socket, &ac->on_connect);
gpr_mu_unlock(&ac->mu);
return;
return 0;
failure:
GPR_ASSERT(!GRPC_ERROR_IS_NONE(error));
@ -232,8 +232,12 @@ failure:
closesocket(sock);
}
grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_done, final_error);
return 0;
}
grpc_tcp_client_vtable grpc_windows_tcp_client_vtable = {tcp_connect};
static bool tcp_cancel_connect(int64_t /*connection_handle*/) { return false; }
grpc_tcp_client_vtable grpc_windows_tcp_client_vtable = {tcp_connect,
tcp_cancel_connect};
#endif /* GRPC_WINSOCK_SOCKET */

@ -258,15 +258,21 @@ static void sched_connect(grpc_closure* closure, grpc_endpoint** ep,
GRPC_CLOSURE_CREATE(do_connect, fc, grpc_schedule_on_exec_ctx));
}
static void my_tcp_client_connect(grpc_closure* closure, grpc_endpoint** ep,
static int64_t my_tcp_client_connect(grpc_closure* closure, grpc_endpoint** ep,
grpc_pollset_set* /*interested_parties*/,
const grpc_channel_args* /*channel_args*/,
const grpc_resolved_address* /*addr*/,
grpc_core::Timestamp deadline) {
sched_connect(closure, ep, deadline.as_timespec(GPR_CLOCK_MONOTONIC));
return 0;
}
grpc_tcp_client_vtable fuzz_tcp_client_vtable = {my_tcp_client_connect};
static bool my_tcp_cancel_connect(int64_t /*connection_handle*/) {
return false;
}
grpc_tcp_client_vtable fuzz_tcp_client_vtable = {my_tcp_client_connect,
my_tcp_cancel_connect};
////////////////////////////////////////////////////////////////////////////////
// test driver

@ -38,7 +38,7 @@ grpc_tcp_client_vtable* g_original_vtable = nullptr;
grpc_core::Mutex* g_mu = nullptr;
ConnectionAttemptInjector* g_injector ABSL_GUARDED_BY(*g_mu) = nullptr;
void TcpConnectWithDelay(grpc_closure* closure, grpc_endpoint** ep,
int64_t TcpConnectWithDelay(grpc_closure* closure, grpc_endpoint** ep,
grpc_pollset_set* interested_parties,
const grpc_channel_args* channel_args,
const grpc_resolved_address* addr,
@ -47,13 +47,22 @@ void TcpConnectWithDelay(grpc_closure* closure, grpc_endpoint** ep,
if (g_injector == nullptr) {
g_original_vtable->connect(closure, ep, interested_parties, channel_args,
addr, deadline);
return;
return 0;
}
g_injector->HandleConnection(closure, ep, interested_parties, channel_args,
addr, deadline);
return 0;
}
grpc_tcp_client_vtable kDelayedConnectVTable = {TcpConnectWithDelay};
// TODO(vigneshbabu): This method should check whether the connect attempt has
// actually been started, and if so, it should call
// g_original_vtable->cancel_connect(). If the attempt has not actually been
// started, it should mark the connect request as cancelled, so that when the
// request is resumed, it will not actually proceed.
bool TcpConnectCancel(int64_t /*connection_handle*/) { return false; }
grpc_tcp_client_vtable kDelayedConnectVTable = {TcpConnectWithDelay,
TcpConnectCancel};
} // namespace

Loading…
Cancel
Save