Delete custom iomgr (#28816)
* Squashed version of gnossen's gevent pR * rm * Automated change: Fix sanity tests Co-authored-by: drfloob <drfloob@users.noreply.github.com>pull/28878/head
parent
1fee3d72be
commit
84ed11459d
31 changed files with 7 additions and 2066 deletions
@ -1,82 +0,0 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2018 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/lib/iomgr/iomgr_custom.h" |
||||
|
||||
#include <grpc/support/thd_id.h> |
||||
|
||||
#include "src/core/lib/iomgr/exec_ctx.h" |
||||
#include "src/core/lib/iomgr/executor.h" |
||||
#include "src/core/lib/iomgr/iomgr_internal.h" |
||||
#include "src/core/lib/iomgr/pollset_custom.h" |
||||
#include "src/core/lib/iomgr/pollset_set_custom.h" |
||||
#include "src/core/lib/iomgr/port.h" |
||||
#include "src/core/lib/iomgr/resolve_address_custom.h" |
||||
|
||||
gpr_thd_id g_init_thread; |
||||
|
||||
static void iomgr_platform_init(void) { |
||||
grpc_core::ExecCtx exec_ctx; |
||||
grpc_core::Executor::SetThreadingAll(false); |
||||
g_init_thread = gpr_thd_currentid(); |
||||
grpc_pollset_global_init(); |
||||
} |
||||
static void iomgr_platform_flush(void) {} |
||||
static void iomgr_platform_shutdown(void) { grpc_pollset_global_shutdown(); } |
||||
static void iomgr_platform_shutdown_background_closure(void) {} |
||||
static bool iomgr_platform_is_any_background_poller_thread(void) { |
||||
return false; |
||||
} |
||||
static bool iomgr_platform_add_closure_to_background_poller( |
||||
grpc_closure* /*closure*/, grpc_error_handle /*error*/) { |
||||
return false; |
||||
} |
||||
|
||||
bool g_custom_iomgr_enabled = false; |
||||
|
||||
static grpc_iomgr_platform_vtable vtable = { |
||||
iomgr_platform_init, |
||||
iomgr_platform_flush, |
||||
iomgr_platform_shutdown, |
||||
iomgr_platform_shutdown_background_closure, |
||||
iomgr_platform_is_any_background_poller_thread, |
||||
iomgr_platform_add_closure_to_background_poller}; |
||||
|
||||
void grpc_custom_iomgr_init(grpc_socket_vtable* socket, |
||||
grpc_custom_resolver_vtable* resolver, |
||||
grpc_custom_timer_vtable* timer, |
||||
grpc_custom_poller_vtable* poller) { |
||||
g_custom_iomgr_enabled = true; |
||||
grpc_custom_endpoint_init(socket); |
||||
grpc_custom_timer_init(timer); |
||||
grpc_custom_pollset_init(poller); |
||||
grpc_custom_pollset_set_init(); |
||||
grpc_core::CustomDNSResolver::Create(resolver); |
||||
grpc_core::CustomDNSResolver* custom_dns_resolver = |
||||
grpc_core::CustomDNSResolver::Get(); |
||||
grpc_core::SetDNSResolver(custom_dns_resolver); |
||||
grpc_set_iomgr_platform_vtable(&vtable); |
||||
} |
||||
|
||||
#ifdef GRPC_CUSTOM_SOCKET |
||||
grpc_iomgr_platform_vtable* grpc_default_iomgr_platform_vtable() { |
||||
return &vtable; |
||||
} |
||||
#endif |
@ -1,49 +0,0 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2017 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#ifndef GRPC_CORE_LIB_IOMGR_IOMGR_CUSTOM_H |
||||
#define GRPC_CORE_LIB_IOMGR_IOMGR_CUSTOM_H |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include <grpc/support/thd_id.h> |
||||
|
||||
#include "src/core/lib/iomgr/pollset_custom.h" |
||||
#include "src/core/lib/iomgr/resolve_address_custom.h" |
||||
#include "src/core/lib/iomgr/tcp_custom.h" |
||||
#include "src/core/lib/iomgr/timer_custom.h" |
||||
|
||||
/* The thread ID of the thread on which grpc was initialized. Used to verify
|
||||
* that all calls into the custom iomgr are made on that same thread */ |
||||
extern gpr_thd_id g_init_thread; |
||||
|
||||
#ifdef GRPC_CUSTOM_IOMGR_THREAD_CHECK |
||||
#define GRPC_CUSTOM_IOMGR_ASSERT_SAME_THREAD() \ |
||||
GPR_ASSERT(gpr_thd_currentid() == g_init_thread) |
||||
#else |
||||
#define GRPC_CUSTOM_IOMGR_ASSERT_SAME_THREAD() |
||||
#endif /* GRPC_CUSTOM_IOMGR_THREAD_CHECK */ |
||||
|
||||
extern bool g_custom_iomgr_enabled; |
||||
|
||||
void grpc_custom_iomgr_init(grpc_socket_vtable* socket, |
||||
grpc_custom_resolver_vtable* resolver, |
||||
grpc_custom_timer_vtable* timer, |
||||
grpc_custom_poller_vtable* poller); |
||||
|
||||
#endif /* GRPC_CORE_LIB_IOMGR_IOMGR_CUSTOM_H */ |
@ -1,105 +0,0 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2018 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/lib/iomgr/pollset_custom.h" |
||||
|
||||
#include <stddef.h> |
||||
#include <string.h> |
||||
|
||||
#include <grpc/support/alloc.h> |
||||
#include <grpc/support/log.h> |
||||
#include <grpc/support/sync.h> |
||||
|
||||
#include "src/core/lib/debug/trace.h" |
||||
#include "src/core/lib/iomgr/closure.h" |
||||
#include "src/core/lib/iomgr/iomgr_custom.h" |
||||
#include "src/core/lib/iomgr/pollset.h" |
||||
#include "src/core/lib/iomgr/port.h" |
||||
#include "src/core/lib/iomgr/timer.h" |
||||
|
||||
static grpc_custom_poller_vtable* poller_vtable; |
||||
|
||||
struct grpc_pollset { |
||||
gpr_mu mu; |
||||
}; |
||||
|
||||
static size_t pollset_size() { return sizeof(grpc_pollset); } |
||||
|
||||
static void pollset_global_init() { poller_vtable->init(); } |
||||
|
||||
static void pollset_global_shutdown() { poller_vtable->shutdown(); } |
||||
|
||||
static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) { |
||||
GRPC_CUSTOM_IOMGR_ASSERT_SAME_THREAD(); |
||||
gpr_mu_init(&pollset->mu); |
||||
*mu = &pollset->mu; |
||||
} |
||||
|
||||
static void pollset_shutdown(grpc_pollset* /*pollset*/, grpc_closure* closure) { |
||||
GRPC_CUSTOM_IOMGR_ASSERT_SAME_THREAD(); |
||||
grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, GRPC_ERROR_NONE); |
||||
} |
||||
|
||||
static void pollset_destroy(grpc_pollset* pollset) { |
||||
GRPC_CUSTOM_IOMGR_ASSERT_SAME_THREAD(); |
||||
gpr_mu_destroy(&pollset->mu); |
||||
} |
||||
|
||||
static grpc_error_handle pollset_work(grpc_pollset* pollset, |
||||
grpc_pollset_worker** /*worker_hdl*/, |
||||
grpc_millis deadline) { |
||||
GRPC_CUSTOM_IOMGR_ASSERT_SAME_THREAD(); |
||||
gpr_mu_unlock(&pollset->mu); |
||||
grpc_millis now = grpc_core::ExecCtx::Get()->Now(); |
||||
grpc_millis timeout = 0; |
||||
if (deadline > now) { |
||||
timeout = deadline - now; |
||||
} |
||||
// We yield here because the poll() call might yield
|
||||
// control back to the application
|
||||
grpc_core::ExecCtx* curr = grpc_core::ExecCtx::Get(); |
||||
grpc_core::ExecCtx::Set(nullptr); |
||||
grpc_error_handle err = poller_vtable->poll(static_cast<size_t>(timeout)); |
||||
grpc_core::ExecCtx::Set(curr); |
||||
grpc_core::ExecCtx::Get()->InvalidateNow(); |
||||
if (grpc_core::ExecCtx::Get()->HasWork()) { |
||||
grpc_core::ExecCtx::Get()->Flush(); |
||||
} |
||||
gpr_mu_lock(&pollset->mu); |
||||
return err; |
||||
} |
||||
|
||||
static grpc_error_handle pollset_kick( |
||||
grpc_pollset* /*pollset*/, grpc_pollset_worker* /*specific_worker*/) { |
||||
GRPC_CUSTOM_IOMGR_ASSERT_SAME_THREAD(); |
||||
poller_vtable->kick(); |
||||
return GRPC_ERROR_NONE; |
||||
} |
||||
|
||||
grpc_pollset_vtable custom_pollset_vtable = { |
||||
pollset_global_init, pollset_global_shutdown, |
||||
pollset_init, pollset_shutdown, |
||||
pollset_destroy, pollset_work, |
||||
pollset_kick, pollset_size}; |
||||
|
||||
void grpc_custom_pollset_init(grpc_custom_poller_vtable* vtable) { |
||||
poller_vtable = vtable; |
||||
grpc_set_pollset_vtable(&custom_pollset_vtable); |
||||
} |
@ -1,37 +0,0 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2018 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#ifndef GRPC_CORE_LIB_IOMGR_POLLSET_CUSTOM_H |
||||
#define GRPC_CORE_LIB_IOMGR_POLLSET_CUSTOM_H |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include <stddef.h> |
||||
|
||||
#include "src/core/lib/iomgr/error.h" |
||||
|
||||
typedef struct grpc_custom_poller_vtable { |
||||
void (*init)(); |
||||
grpc_error_handle (*poll)(size_t timeout_ms); |
||||
void (*kick)(); |
||||
void (*shutdown)(); |
||||
} grpc_custom_poller_vtable; |
||||
|
||||
void grpc_custom_pollset_init(grpc_custom_poller_vtable* vtable); |
||||
|
||||
#endif /* GRPC_CORE_LIB_IOMGR_POLLSET_CUSTOM_H */ |
@ -1,47 +0,0 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2016 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/lib/iomgr/pollset_set.h" |
||||
#include "src/core/lib/iomgr/port.h" |
||||
|
||||
static grpc_pollset_set* pollset_set_create(void) { |
||||
return reinterpret_cast<grpc_pollset_set*>(static_cast<intptr_t>(0xdeafbeef)); |
||||
} |
||||
|
||||
static void pollset_set_destroy(grpc_pollset_set* /*pollset_set*/) {} |
||||
|
||||
static void pollset_set_add_pollset(grpc_pollset_set* /*pollset_set*/, |
||||
grpc_pollset* /*pollset*/) {} |
||||
|
||||
static void pollset_set_del_pollset(grpc_pollset_set* /*pollset_set*/, |
||||
grpc_pollset* /*pollset*/) {} |
||||
|
||||
static void pollset_set_add_pollset_set(grpc_pollset_set* /*bag*/, |
||||
grpc_pollset_set* /*item*/) {} |
||||
|
||||
static void pollset_set_del_pollset_set(grpc_pollset_set* /*bag*/, |
||||
grpc_pollset_set* /*item*/) {} |
||||
|
||||
static grpc_pollset_set_vtable vtable = { |
||||
pollset_set_create, pollset_set_destroy, |
||||
pollset_set_add_pollset, pollset_set_del_pollset, |
||||
pollset_set_add_pollset_set, pollset_set_del_pollset_set}; |
||||
|
||||
void grpc_custom_pollset_set_init() { grpc_set_pollset_set_vtable(&vtable); } |
@ -1,26 +0,0 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2018 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#ifndef GRPC_CORE_LIB_IOMGR_POLLSET_SET_CUSTOM_H |
||||
#define GRPC_CORE_LIB_IOMGR_POLLSET_SET_CUSTOM_H |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
void grpc_custom_pollset_set_init(); |
||||
|
||||
#endif /* GRPC_CORE_LIB_IOMGR_POLLSET_SET_CUSTOM_H */ |
@ -1,191 +0,0 @@ |
||||
//
|
||||
// Copyright 2018 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/lib/iomgr/resolve_address_custom.h" |
||||
|
||||
#include <string.h> |
||||
|
||||
#include <cstdio> |
||||
#include <string> |
||||
|
||||
#include "absl/strings/str_format.h" |
||||
|
||||
#include <grpc/support/alloc.h> |
||||
#include <grpc/support/log.h> |
||||
|
||||
#include "src/core/lib/address_utils/sockaddr_utils.h" |
||||
#include "src/core/lib/gpr/string.h" |
||||
#include "src/core/lib/gpr/useful.h" |
||||
#include "src/core/lib/gprpp/host_port.h" |
||||
#include "src/core/lib/iomgr/iomgr_custom.h" |
||||
#include "src/core/lib/iomgr/port.h" |
||||
#include "src/core/lib/iomgr/resolve_address_impl.h" |
||||
#include "src/core/lib/transport/error_utils.h" |
||||
|
||||
void grpc_resolved_addresses_destroy(grpc_resolved_addresses* addresses) { |
||||
if (addresses != nullptr) { |
||||
gpr_free(addresses->addrs); |
||||
} |
||||
gpr_free(addresses); |
||||
} |
||||
|
||||
void grpc_custom_resolve_callback(grpc_custom_resolver* resolver, |
||||
grpc_resolved_addresses* result, |
||||
grpc_error_handle error) { |
||||
GRPC_CUSTOM_IOMGR_ASSERT_SAME_THREAD(); |
||||
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; |
||||
grpc_core::ExecCtx exec_ctx; |
||||
grpc_core::CustomDNSResolver::Request* request = |
||||
grpc_core::CustomDNSResolver::Request::FromC(resolver); |
||||
if (error != GRPC_ERROR_NONE) { |
||||
request->ResolveCallback(grpc_error_to_absl_status(error)); |
||||
} else { |
||||
std::vector<grpc_resolved_address> addresses; |
||||
for (size_t i = 0; i < result->naddrs; i++) { |
||||
addresses.push_back(result->addrs[i]); |
||||
} |
||||
request->ResolveCallback(std::move(addresses)); |
||||
grpc_resolved_addresses_destroy(result); |
||||
} |
||||
GRPC_ERROR_UNREF(error); |
||||
} |
||||
|
||||
namespace grpc_core { |
||||
|
||||
namespace { |
||||
|
||||
absl::Status TrySplitHostPort(absl::string_view name, |
||||
absl::string_view default_port, std::string* host, |
||||
std::string* port) { |
||||
// parse name, splitting it into host and port parts
|
||||
SplitHostPort(name, host, port); |
||||
if (host->empty()) { |
||||
return absl::UnknownError( |
||||
absl::StrFormat("unparseable host:port: '%s'", name)); |
||||
} |
||||
if (port->empty()) { |
||||
// TODO(murgatroid99): add tests for this case
|
||||
if (default_port.empty()) { |
||||
return absl::UnknownError(absl::StrFormat("no port in name '%s'", name)); |
||||
} |
||||
*port = std::string(default_port); |
||||
} |
||||
return absl::OkStatus(); |
||||
} |
||||
|
||||
absl::StatusOr<std::string> NamedPortToNumeric(absl::string_view named_port) { |
||||
if (named_port == "http") { |
||||
return "80"; |
||||
} else if (named_port == "https") { |
||||
return "443"; |
||||
} else { |
||||
return absl::UnknownError(absl::StrCat("unknown named port: ", named_port)); |
||||
} |
||||
} |
||||
|
||||
} // namespace
|
||||
|
||||
void CustomDNSResolver::Request::ResolveCallback( |
||||
absl::StatusOr<std::vector<grpc_resolved_address>> result) { |
||||
if (!result.ok()) { |
||||
auto numeric_port_or = NamedPortToNumeric(port_); |
||||
if (numeric_port_or.ok()) { |
||||
port_ = *numeric_port_or; |
||||
resolve_address_vtable_->resolve_async(c_ptr(), host_.c_str(), |
||||
port_.c_str()); |
||||
// keep holding ref for active resolution
|
||||
return; |
||||
} |
||||
} |
||||
// since we can't guarantee that we're not being called inline from
|
||||
// Start(), run the callback on the ExecCtx.
|
||||
new DNSCallbackExecCtxScheduler(std::move(on_done_), std::move(result)); |
||||
Unref(); |
||||
} |
||||
|
||||
namespace { |
||||
CustomDNSResolver* g_custom_dns_resolver; |
||||
} // namespace
|
||||
|
||||
// Creates the global custom resolver with the specified vtable.
|
||||
void CustomDNSResolver::Create(grpc_custom_resolver_vtable* vtable) { |
||||
if (g_custom_dns_resolver != nullptr) return; |
||||
g_custom_dns_resolver = new CustomDNSResolver(vtable); |
||||
} |
||||
|
||||
// Gets the singleton instance.
|
||||
CustomDNSResolver* CustomDNSResolver::Get() { return g_custom_dns_resolver; } |
||||
|
||||
absl::StatusOr<std::vector<grpc_resolved_address>> |
||||
CustomDNSResolver::ResolveNameBlocking(absl::string_view name, |
||||
absl::string_view default_port) { |
||||
GRPC_CUSTOM_IOMGR_ASSERT_SAME_THREAD(); |
||||
|
||||
std::string host; |
||||
std::string port; |
||||
absl::Status parse_status = |
||||
TrySplitHostPort(name, default_port, &host, &port); |
||||
if (!parse_status.ok()) { |
||||
return parse_status; |
||||
} |
||||
|
||||
// Call getaddrinfo
|
||||
ExecCtx* curr = ExecCtx::Get(); |
||||
ExecCtx::Set(nullptr); |
||||
grpc_resolved_addresses* addrs; |
||||
grpc_error_handle err = |
||||
resolve_address_vtable_->resolve(host.c_str(), port.c_str(), &addrs); |
||||
if (err != GRPC_ERROR_NONE) { |
||||
auto numeric_port_or = NamedPortToNumeric(port); |
||||
if (numeric_port_or.ok()) { |
||||
port = *numeric_port_or; |
||||
GRPC_ERROR_UNREF(err); |
||||
err = |
||||
resolve_address_vtable_->resolve(host.c_str(), port.c_str(), &addrs); |
||||
} |
||||
} |
||||
ExecCtx::Set(curr); |
||||
if (err == GRPC_ERROR_NONE) { |
||||
GPR_ASSERT(addrs != nullptr); |
||||
std::vector<grpc_resolved_address> result; |
||||
for (size_t i = 0; i < addrs->naddrs; i++) { |
||||
result.push_back(addrs->addrs[i]); |
||||
} |
||||
grpc_resolved_addresses_destroy(addrs); |
||||
return result; |
||||
} |
||||
auto error_result = grpc_error_to_absl_status(err); |
||||
GRPC_ERROR_UNREF(err); |
||||
return error_result; |
||||
} |
||||
|
||||
void CustomDNSResolver::Request::Start() { |
||||
GRPC_CUSTOM_IOMGR_ASSERT_SAME_THREAD(); |
||||
absl::Status parse_status = |
||||
TrySplitHostPort(name_, default_port_, &host_, &port_); |
||||
if (!parse_status.ok()) { |
||||
new DNSCallbackExecCtxScheduler(std::move(on_done_), |
||||
std::move(parse_status)); |
||||
return; |
||||
} |
||||
// Call getaddrinfo
|
||||
Ref().release(); // ref held by resolution
|
||||
resolve_address_vtable_->resolve_async(c_ptr(), host_.c_str(), port_.c_str()); |
||||
} |
||||
|
||||
} // namespace grpc_core
|
@ -1,126 +0,0 @@ |
||||
//
|
||||
// Copyright 2018 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
|
||||
#ifndef GRPC_CORE_LIB_IOMGR_RESOLVE_ADDRESS_CUSTOM_H |
||||
#define GRPC_CORE_LIB_IOMGR_RESOLVE_ADDRESS_CUSTOM_H |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include <grpc/support/sync.h> |
||||
|
||||
#include "src/core/lib/gprpp/cpp_impl_of.h" |
||||
#include "src/core/lib/iomgr/port.h" |
||||
#include "src/core/lib/iomgr/resolve_address.h" |
||||
#include "src/core/lib/iomgr/sockaddr.h" |
||||
|
||||
// User-configured custom DNS resolution APIs
|
||||
|
||||
// TODO(apolcyn): This type could be deleted as a part of converting
|
||||
// this grpc_custom_resolver API to C++.
|
||||
struct grpc_resolved_addresses { |
||||
size_t naddrs; |
||||
grpc_resolved_address* addrs; |
||||
}; |
||||
|
||||
// Destroy resolved addresses
|
||||
void grpc_resolved_addresses_destroy(grpc_resolved_addresses* addresses); |
||||
|
||||
typedef struct grpc_custom_resolver grpc_custom_resolver; |
||||
|
||||
typedef struct grpc_custom_resolver_vtable { |
||||
grpc_error_handle (*resolve)(const char* host, const char* port, |
||||
grpc_resolved_addresses** res); |
||||
void (*resolve_async)(grpc_custom_resolver* resolver, const char* host, |
||||
const char* port); |
||||
} grpc_custom_resolver_vtable; |
||||
|
||||
// TODO(apolcyn): as a part of converting this API to C++,
|
||||
// callers of \a grpc_custom_resolve_callback could instead just invoke
|
||||
// CustomDNSResolver::Request::ResolveCallback directly.
|
||||
void grpc_custom_resolve_callback(grpc_custom_resolver* resolver, |
||||
grpc_resolved_addresses* result, |
||||
grpc_error_handle error); |
||||
|
||||
// Internal APIs
|
||||
|
||||
namespace grpc_core { |
||||
|
||||
class CustomDNSResolver : public DNSResolver { |
||||
public: |
||||
class Request : public DNSResolver::Request, |
||||
public CppImplOf<Request, grpc_custom_resolver> { |
||||
public: |
||||
Request( |
||||
absl::string_view name, absl::string_view default_port, |
||||
std::function<void(absl::StatusOr<std::vector<grpc_resolved_address>>)> |
||||
on_done, |
||||
const grpc_custom_resolver_vtable* resolve_address_vtable) |
||||
: name_(name), |
||||
default_port_(default_port), |
||||
on_done_(std::move(on_done)), |
||||
resolve_address_vtable_(resolve_address_vtable) {} |
||||
|
||||
// Starts the resolution
|
||||
void Start() override; |
||||
|
||||
// This is a no-op for the native resolver. Note
|
||||
// that no I/O polling is required for the resolution to finish.
|
||||
void Orphan() override { Unref(); } |
||||
|
||||
// Continues async resolution with the results passed first in to
|
||||
// grpc_custom_resolve_callback.
|
||||
void ResolveCallback( |
||||
absl::StatusOr<std::vector<grpc_resolved_address>> result); |
||||
|
||||
private: |
||||
const std::string name_; |
||||
const std::string default_port_; |
||||
std::string host_; |
||||
std::string port_; |
||||
std::function<void(absl::StatusOr<std::vector<grpc_resolved_address>>)> |
||||
on_done_; |
||||
// user-defined DNS methods
|
||||
const grpc_custom_resolver_vtable* resolve_address_vtable_; |
||||
}; |
||||
|
||||
// Creates the global custom resolver with the specified vtable.
|
||||
static void Create(grpc_custom_resolver_vtable* vtable); |
||||
|
||||
// Gets the singleton instance.
|
||||
static CustomDNSResolver* Get(); |
||||
|
||||
explicit CustomDNSResolver(const grpc_custom_resolver_vtable* vtable) |
||||
: resolve_address_vtable_(vtable) {} |
||||
|
||||
OrphanablePtr<DNSResolver::Request> ResolveName( |
||||
absl::string_view name, absl::string_view default_port, |
||||
grpc_pollset_set* /* interested_parties */, |
||||
std::function<void(absl::StatusOr<std::vector<grpc_resolved_address>>)> |
||||
on_done) override { |
||||
return MakeOrphanable<Request>(name, default_port, std::move(on_done), |
||||
resolve_address_vtable_); |
||||
} |
||||
|
||||
absl::StatusOr<std::vector<grpc_resolved_address>> ResolveNameBlocking( |
||||
absl::string_view name, absl::string_view default_port) override; |
||||
|
||||
private: |
||||
// user-defined DNS methods
|
||||
const grpc_custom_resolver_vtable* resolve_address_vtable_; |
||||
}; |
||||
|
||||
} // namespace grpc_core
|
||||
#endif /* GRPC_CORE_LIB_IOMGR_RESOLVE_ADDRESS_CUSTOM_H */ |
@ -1,146 +0,0 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2018 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include <string.h> |
||||
|
||||
#include <grpc/support/alloc.h> |
||||
#include <grpc/support/log.h> |
||||
|
||||
#include "src/core/lib/address_utils/sockaddr_utils.h" |
||||
#include "src/core/lib/iomgr/error.h" |
||||
#include "src/core/lib/iomgr/iomgr_custom.h" |
||||
#include "src/core/lib/iomgr/port.h" |
||||
#include "src/core/lib/iomgr/tcp_client.h" |
||||
#include "src/core/lib/iomgr/tcp_custom.h" |
||||
#include "src/core/lib/iomgr/timer.h" |
||||
|
||||
extern grpc_core::TraceFlag grpc_tcp_trace; |
||||
extern grpc_socket_vtable* grpc_custom_socket_vtable; |
||||
|
||||
struct grpc_custom_tcp_connect { |
||||
grpc_custom_socket* socket; |
||||
grpc_timer alarm; |
||||
grpc_closure on_alarm; |
||||
grpc_closure* closure; |
||||
grpc_endpoint** endpoint; |
||||
int refs; |
||||
std::string addr_name; |
||||
}; |
||||
|
||||
static void custom_tcp_connect_cleanup(grpc_custom_tcp_connect* connect) { |
||||
grpc_custom_socket* socket = connect->socket; |
||||
delete connect; |
||||
if (socket->refs == 0) { |
||||
grpc_custom_socket_vtable->destroy(socket); |
||||
gpr_free(socket); |
||||
} |
||||
} |
||||
|
||||
static void custom_close_callback(grpc_custom_socket* /*socket*/) {} |
||||
|
||||
static void on_alarm(void* acp, grpc_error_handle error) { |
||||
int done; |
||||
grpc_custom_socket* socket = static_cast<grpc_custom_socket*>(acp); |
||||
grpc_custom_tcp_connect* connect = socket->connector; |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) { |
||||
gpr_log(GPR_INFO, "CLIENT_CONNECT: %s: on_alarm: error=%s", |
||||
connect->addr_name.c_str(), grpc_error_std_string(error).c_str()); |
||||
} |
||||
if (error == GRPC_ERROR_NONE) { |
||||
/* error == NONE implies that the timer ran out, and wasn't cancelled. If
|
||||
it was cancelled, then the handler that cancelled it also should close |
||||
the handle, if applicable */ |
||||
grpc_custom_socket_vtable->close(socket, custom_close_callback); |
||||
} |
||||
done = (--connect->refs == 0); |
||||
socket->refs--; |
||||
if (done) { |
||||
custom_tcp_connect_cleanup(connect); |
||||
} |
||||
} |
||||
|
||||
static void custom_connect_callback_internal(grpc_custom_socket* socket, |
||||
grpc_error_handle error) { |
||||
grpc_custom_tcp_connect* connect = socket->connector; |
||||
int done; |
||||
grpc_closure* closure = connect->closure; |
||||
grpc_timer_cancel(&connect->alarm); |
||||
if (error == GRPC_ERROR_NONE) { |
||||
*connect->endpoint = |
||||
custom_tcp_endpoint_create(socket, connect->addr_name.c_str()); |
||||
} |
||||
done = (--connect->refs == 0); |
||||
socket->refs--; |
||||
if (done) { |
||||
grpc_core::ExecCtx::Get()->Flush(); |
||||
custom_tcp_connect_cleanup(connect); |
||||
} |
||||
grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, error); |
||||
} |
||||
|
||||
static void custom_connect_callback(grpc_custom_socket* socket, |
||||
grpc_error_handle error) { |
||||
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; |
||||
if (grpc_core::ExecCtx::Get() == nullptr) { |
||||
/* If we are being run on a thread which does not have an exec_ctx created
|
||||
* yet, we should create one. */ |
||||
grpc_core::ExecCtx exec_ctx; |
||||
custom_connect_callback_internal(socket, error); |
||||
} else { |
||||
custom_connect_callback_internal(socket, error); |
||||
} |
||||
} |
||||
|
||||
static void tcp_connect(grpc_closure* closure, grpc_endpoint** ep, |
||||
grpc_pollset_set* interested_parties, |
||||
const grpc_channel_args* channel_args, |
||||
const grpc_resolved_address* resolved_addr, |
||||
grpc_millis deadline) { |
||||
GRPC_CUSTOM_IOMGR_ASSERT_SAME_THREAD(); |
||||
(void)channel_args; |
||||
(void)interested_parties; |
||||
grpc_custom_socket* socket = |
||||
static_cast<grpc_custom_socket*>(gpr_malloc(sizeof(grpc_custom_socket))); |
||||
socket->refs = 2; |
||||
(void)grpc_custom_socket_vtable->init(socket, GRPC_AF_UNSPEC); |
||||
grpc_custom_tcp_connect* connect = new grpc_custom_tcp_connect(); |
||||
connect->closure = closure; |
||||
connect->endpoint = ep; |
||||
connect->addr_name = grpc_sockaddr_to_uri(resolved_addr); |
||||
connect->socket = socket; |
||||
socket->connector = connect; |
||||
socket->endpoint = nullptr; |
||||
socket->listener = nullptr; |
||||
connect->refs = 2; |
||||
|
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) { |
||||
gpr_log(GPR_INFO, "CLIENT_CONNECT: %p %s: asynchronously connecting", |
||||
socket, connect->addr_name.c_str()); |
||||
} |
||||
|
||||
GRPC_CLOSURE_INIT(&connect->on_alarm, on_alarm, socket, |
||||
grpc_schedule_on_exec_ctx); |
||||
grpc_timer_init(&connect->alarm, deadline, &connect->on_alarm); |
||||
grpc_custom_socket_vtable->connect( |
||||
socket, reinterpret_cast<const grpc_sockaddr*>(resolved_addr->addr), |
||||
resolved_addr->len, custom_connect_callback); |
||||
} |
||||
|
||||
grpc_tcp_client_vtable custom_tcp_client_vtable = {tcp_connect}; |
@ -1,350 +0,0 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2018 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/lib/iomgr/tcp_custom.h" |
||||
|
||||
#include <limits.h> |
||||
#include <string.h> |
||||
|
||||
#include <grpc/slice_buffer.h> |
||||
#include <grpc/support/alloc.h> |
||||
#include <grpc/support/log.h> |
||||
#include <grpc/support/string_util.h> |
||||
|
||||
#include "src/core/lib/address_utils/sockaddr_utils.h" |
||||
#include "src/core/lib/iomgr/error.h" |
||||
#include "src/core/lib/iomgr/iomgr_custom.h" |
||||
#include "src/core/lib/iomgr/port.h" |
||||
#include "src/core/lib/iomgr/tcp_client.h" |
||||
#include "src/core/lib/iomgr/tcp_server.h" |
||||
#include "src/core/lib/slice/slice_internal.h" |
||||
#include "src/core/lib/slice/slice_string_helpers.h" |
||||
|
||||
#define GRPC_TCP_DEFAULT_READ_SLICE_SIZE 8192 |
||||
|
||||
extern grpc_core::TraceFlag grpc_tcp_trace; |
||||
|
||||
grpc_socket_vtable* grpc_custom_socket_vtable = nullptr; |
||||
extern grpc_tcp_server_vtable custom_tcp_server_vtable; |
||||
extern grpc_tcp_client_vtable custom_tcp_client_vtable; |
||||
|
||||
void grpc_custom_endpoint_init(grpc_socket_vtable* impl) { |
||||
grpc_custom_socket_vtable = impl; |
||||
grpc_set_tcp_client_impl(&custom_tcp_client_vtable); |
||||
grpc_set_tcp_server_impl(&custom_tcp_server_vtable); |
||||
} |
||||
|
||||
struct custom_tcp_endpoint { |
||||
grpc_endpoint base; |
||||
gpr_refcount refcount; |
||||
grpc_custom_socket* socket; |
||||
|
||||
grpc_closure* read_cb = nullptr; |
||||
grpc_closure* write_cb = nullptr; |
||||
|
||||
grpc_slice_buffer* read_slices = nullptr; |
||||
grpc_slice_buffer* write_slices = nullptr; |
||||
|
||||
bool shutting_down; |
||||
|
||||
std::string peer_string; |
||||
std::string local_address; |
||||
}; |
||||
static void tcp_free(grpc_custom_socket* s) { |
||||
custom_tcp_endpoint* tcp = |
||||
reinterpret_cast<custom_tcp_endpoint*>(s->endpoint); |
||||
delete tcp; |
||||
s->refs--; |
||||
if (s->refs == 0) { |
||||
grpc_custom_socket_vtable->destroy(s); |
||||
gpr_free(s); |
||||
} |
||||
} |
||||
|
||||
#ifndef NDEBUG |
||||
#define TCP_UNREF(tcp, reason) tcp_unref((tcp), (reason), __FILE__, __LINE__) |
||||
#define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), __FILE__, __LINE__) |
||||
static void tcp_unref(custom_tcp_endpoint* tcp, const char* reason, |
||||
const char* file, int line) { |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) { |
||||
gpr_atm val = gpr_atm_no_barrier_load(&tcp->refcount.count); |
||||
gpr_log(file, line, GPR_LOG_SEVERITY_ERROR, |
||||
"TCP unref %p : %s %" PRIdPTR " -> %" PRIdPTR, tcp->socket, reason, |
||||
val, val - 1); |
||||
} |
||||
if (gpr_unref(&tcp->refcount)) { |
||||
tcp_free(tcp->socket); |
||||
} |
||||
} |
||||
|
||||
static void tcp_ref(custom_tcp_endpoint* tcp, const char* reason, |
||||
const char* file, int line) { |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) { |
||||
gpr_atm val = gpr_atm_no_barrier_load(&tcp->refcount.count); |
||||
gpr_log(file, line, GPR_LOG_SEVERITY_ERROR, |
||||
"TCP ref %p : %s %" PRIdPTR " -> %" PRIdPTR, tcp->socket, reason, |
||||
val, val + 1); |
||||
} |
||||
gpr_ref(&tcp->refcount); |
||||
} |
||||
#else |
||||
#define TCP_UNREF(tcp, reason) tcp_unref((tcp)) |
||||
#define TCP_REF(tcp, reason) tcp_ref((tcp)) |
||||
static void tcp_unref(custom_tcp_endpoint* tcp) { |
||||
if (gpr_unref(&tcp->refcount)) { |
||||
tcp_free(tcp->socket); |
||||
} |
||||
} |
||||
|
||||
static void tcp_ref(custom_tcp_endpoint* tcp) { gpr_ref(&tcp->refcount); } |
||||
#endif |
||||
|
||||
static void call_read_cb(custom_tcp_endpoint* tcp, grpc_error_handle error) { |
||||
grpc_closure* cb = tcp->read_cb; |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) { |
||||
gpr_log(GPR_INFO, "TCP:%p call_cb %p %p:%p", tcp->socket, cb, cb->cb, |
||||
cb->cb_arg); |
||||
size_t i; |
||||
gpr_log(GPR_INFO, "read: error=%s", grpc_error_std_string(error).c_str()); |
||||
for (i = 0; i < tcp->read_slices->count; i++) { |
||||
char* dump = grpc_dump_slice(tcp->read_slices->slices[i], |
||||
GPR_DUMP_HEX | GPR_DUMP_ASCII); |
||||
gpr_log(GPR_INFO, "READ %p (peer=%s): %s", tcp, tcp->peer_string.c_str(), |
||||
dump); |
||||
gpr_free(dump); |
||||
} |
||||
} |
||||
TCP_UNREF(tcp, "read"); |
||||
tcp->read_slices = nullptr; |
||||
tcp->read_cb = nullptr; |
||||
grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, error); |
||||
} |
||||
|
||||
static void custom_read_callback(grpc_custom_socket* socket, size_t nread, |
||||
grpc_error_handle error) { |
||||
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; |
||||
grpc_core::ExecCtx exec_ctx; |
||||
grpc_slice_buffer garbage; |
||||
custom_tcp_endpoint* tcp = |
||||
reinterpret_cast<custom_tcp_endpoint*>(socket->endpoint); |
||||
if (error == GRPC_ERROR_NONE && nread == 0) { |
||||
error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("EOF"); |
||||
} |
||||
if (error == GRPC_ERROR_NONE) { |
||||
// Successful read
|
||||
if (nread < tcp->read_slices->length) { |
||||
/* TODO(murgatroid99): Instead of discarding the unused part of the read
|
||||
* buffer, reuse it as the next read buffer. */ |
||||
grpc_slice_buffer_init(&garbage); |
||||
grpc_slice_buffer_trim_end(tcp->read_slices, |
||||
tcp->read_slices->length - nread, &garbage); |
||||
grpc_slice_buffer_reset_and_unref_internal(&garbage); |
||||
} |
||||
} else { |
||||
grpc_slice_buffer_reset_and_unref_internal(tcp->read_slices); |
||||
} |
||||
call_read_cb(tcp, error); |
||||
} |
||||
|
||||
static void endpoint_read(grpc_endpoint* ep, grpc_slice_buffer* read_slices, |
||||
grpc_closure* cb, bool /*urgent*/) { |
||||
custom_tcp_endpoint* tcp = reinterpret_cast<custom_tcp_endpoint*>(ep); |
||||
GRPC_CUSTOM_IOMGR_ASSERT_SAME_THREAD(); |
||||
GPR_ASSERT(tcp->read_cb == nullptr); |
||||
tcp->read_cb = cb; |
||||
tcp->read_slices = read_slices; |
||||
grpc_slice_buffer_reset_and_unref_internal(read_slices); |
||||
TCP_REF(tcp, "read"); |
||||
if (tcp->read_slices->length < GRPC_TCP_DEFAULT_READ_SLICE_SIZE) { |
||||
grpc_slice_buffer_add_indexed( |
||||
tcp->read_slices, GRPC_SLICE_MALLOC(GRPC_TCP_DEFAULT_READ_SLICE_SIZE)); |
||||
} |
||||
/* slices[0] should always exist here since we just added it if it did not */ |
||||
char* buffer = reinterpret_cast<char*>( |
||||
GRPC_SLICE_START_PTR(tcp->read_slices->slices[0])); |
||||
size_t len = GRPC_SLICE_LENGTH(tcp->read_slices->slices[0]); |
||||
grpc_custom_socket_vtable->read(tcp->socket, buffer, len, |
||||
custom_read_callback); |
||||
} |
||||
|
||||
static void custom_write_callback(grpc_custom_socket* socket, |
||||
grpc_error_handle error) { |
||||
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; |
||||
grpc_core::ExecCtx exec_ctx; |
||||
custom_tcp_endpoint* tcp = |
||||
reinterpret_cast<custom_tcp_endpoint*>(socket->endpoint); |
||||
grpc_closure* cb = tcp->write_cb; |
||||
tcp->write_cb = nullptr; |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) { |
||||
gpr_log(GPR_INFO, "write complete on %p: error=%s", tcp->socket, |
||||
grpc_error_std_string(error).c_str()); |
||||
} |
||||
TCP_UNREF(tcp, "write"); |
||||
grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, error); |
||||
} |
||||
|
||||
static void endpoint_write(grpc_endpoint* ep, grpc_slice_buffer* write_slices, |
||||
grpc_closure* cb, void* /*arg*/) { |
||||
custom_tcp_endpoint* tcp = reinterpret_cast<custom_tcp_endpoint*>(ep); |
||||
GRPC_CUSTOM_IOMGR_ASSERT_SAME_THREAD(); |
||||
|
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) { |
||||
size_t j; |
||||
|
||||
for (j = 0; j < write_slices->count; j++) { |
||||
char* data = grpc_dump_slice(write_slices->slices[j], |
||||
GPR_DUMP_HEX | GPR_DUMP_ASCII); |
||||
gpr_log(GPR_INFO, "WRITE %p (peer=%s): %s", tcp->socket, |
||||
tcp->peer_string.c_str(), data); |
||||
gpr_free(data); |
||||
} |
||||
} |
||||
|
||||
if (tcp->shutting_down) { |
||||
grpc_core::ExecCtx::Run( |
||||
DEBUG_LOCATION, cb, |
||||
GRPC_ERROR_CREATE_FROM_STATIC_STRING("TCP socket is shutting down")); |
||||
return; |
||||
} |
||||
|
||||
GPR_ASSERT(tcp->write_cb == nullptr); |
||||
tcp->write_slices = write_slices; |
||||
GPR_ASSERT(tcp->write_slices->count <= UINT_MAX); |
||||
if (tcp->write_slices->count == 0) { |
||||
// No slices means we don't have to do anything
|
||||
grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, GRPC_ERROR_NONE); |
||||
return; |
||||
} |
||||
tcp->write_cb = cb; |
||||
TCP_REF(tcp, "write"); |
||||
grpc_custom_socket_vtable->write(tcp->socket, tcp->write_slices, |
||||
custom_write_callback); |
||||
} |
||||
|
||||
static void endpoint_add_to_pollset(grpc_endpoint* ep, grpc_pollset* pollset) { |
||||
// No-op. We're ignoring pollsets currently
|
||||
(void)ep; |
||||
(void)pollset; |
||||
} |
||||
|
||||
static void endpoint_add_to_pollset_set(grpc_endpoint* ep, |
||||
grpc_pollset_set* pollset) { |
||||
// No-op. We're ignoring pollsets currently
|
||||
(void)ep; |
||||
(void)pollset; |
||||
} |
||||
|
||||
static void endpoint_delete_from_pollset_set(grpc_endpoint* ep, |
||||
grpc_pollset_set* pollset) { |
||||
// No-op. We're ignoring pollsets currently
|
||||
(void)ep; |
||||
(void)pollset; |
||||
} |
||||
|
||||
static void endpoint_shutdown(grpc_endpoint* ep, grpc_error_handle why) { |
||||
custom_tcp_endpoint* tcp = reinterpret_cast<custom_tcp_endpoint*>(ep); |
||||
if (!tcp->shutting_down) { |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) { |
||||
gpr_log(GPR_INFO, "TCP %p shutdown why=%s", tcp->socket, |
||||
grpc_error_std_string(why).c_str()); |
||||
} |
||||
tcp->shutting_down = true; |
||||
// grpc_core::ExecCtx::Run(DEBUG_LOCATION,tcp->read_cb,
|
||||
// GRPC_ERROR_REF(why));
|
||||
// grpc_core::ExecCtx::Run(DEBUG_LOCATION,tcp->write_cb,
|
||||
// GRPC_ERROR_REF(why)); tcp->read_cb = nullptr; tcp->write_cb = nullptr;
|
||||
grpc_custom_socket_vtable->shutdown(tcp->socket); |
||||
} |
||||
GRPC_ERROR_UNREF(why); |
||||
} |
||||
|
||||
static void custom_close_callback(grpc_custom_socket* socket) { |
||||
socket->refs--; |
||||
if (socket->refs == 0) { |
||||
grpc_custom_socket_vtable->destroy(socket); |
||||
gpr_free(socket); |
||||
} else if (socket->endpoint) { |
||||
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; |
||||
grpc_core::ExecCtx exec_ctx; |
||||
custom_tcp_endpoint* tcp = |
||||
reinterpret_cast<custom_tcp_endpoint*>(socket->endpoint); |
||||
TCP_UNREF(tcp, "destroy"); |
||||
} |
||||
} |
||||
|
||||
static void endpoint_destroy(grpc_endpoint* ep) { |
||||
custom_tcp_endpoint* tcp = reinterpret_cast<custom_tcp_endpoint*>(ep); |
||||
grpc_custom_socket_vtable->close(tcp->socket, custom_close_callback); |
||||
} |
||||
|
||||
static absl::string_view endpoint_get_peer(grpc_endpoint* ep) { |
||||
custom_tcp_endpoint* tcp = reinterpret_cast<custom_tcp_endpoint*>(ep); |
||||
return tcp->peer_string; |
||||
} |
||||
|
||||
static absl::string_view endpoint_get_local_address(grpc_endpoint* ep) { |
||||
custom_tcp_endpoint* tcp = reinterpret_cast<custom_tcp_endpoint*>(ep); |
||||
return tcp->local_address; |
||||
} |
||||
|
||||
static int endpoint_get_fd(grpc_endpoint* /*ep*/) { return -1; } |
||||
|
||||
static bool endpoint_can_track_err(grpc_endpoint* /*ep*/) { return false; } |
||||
|
||||
static grpc_endpoint_vtable vtable = {endpoint_read, |
||||
endpoint_write, |
||||
endpoint_add_to_pollset, |
||||
endpoint_add_to_pollset_set, |
||||
endpoint_delete_from_pollset_set, |
||||
endpoint_shutdown, |
||||
endpoint_destroy, |
||||
endpoint_get_peer, |
||||
endpoint_get_local_address, |
||||
endpoint_get_fd, |
||||
endpoint_can_track_err}; |
||||
|
||||
grpc_endpoint* custom_tcp_endpoint_create(grpc_custom_socket* socket, |
||||
const char* peer_string) { |
||||
custom_tcp_endpoint* tcp = new custom_tcp_endpoint; |
||||
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; |
||||
grpc_core::ExecCtx exec_ctx; |
||||
|
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) { |
||||
gpr_log(GPR_INFO, "Creating TCP endpoint %p", socket); |
||||
} |
||||
socket->refs++; |
||||
socket->endpoint = reinterpret_cast<grpc_endpoint*>(tcp); |
||||
tcp->socket = socket; |
||||
tcp->base.vtable = &vtable; |
||||
gpr_ref_init(&tcp->refcount, 1); |
||||
tcp->peer_string = peer_string; |
||||
grpc_resolved_address resolved_local_addr; |
||||
resolved_local_addr.len = sizeof(resolved_local_addr.addr); |
||||
if (grpc_custom_socket_vtable->getsockname( |
||||
socket, reinterpret_cast<sockaddr*>(resolved_local_addr.addr), |
||||
reinterpret_cast<int*>(&resolved_local_addr.len)) != |
||||
GRPC_ERROR_NONE) { |
||||
tcp->local_address = ""; |
||||
} else { |
||||
tcp->local_address = grpc_sockaddr_to_uri(&resolved_local_addr); |
||||
} |
||||
tcp->shutting_down = false; |
||||
return &tcp->base; |
||||
} |
@ -1,85 +0,0 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2018 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#ifndef GRPC_CORE_LIB_IOMGR_TCP_CUSTOM_H |
||||
#define GRPC_CORE_LIB_IOMGR_TCP_CUSTOM_H |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/lib/iomgr/endpoint.h" |
||||
#include "src/core/lib/iomgr/sockaddr.h" |
||||
|
||||
// Same number as the micro of SO_REUSEPORT in kernel
|
||||
#define GRPC_CUSTOM_SOCKET_OPT_SO_REUSEPORT (0x00000200u) |
||||
|
||||
typedef struct grpc_tcp_listener grpc_tcp_listener; |
||||
typedef struct grpc_custom_tcp_connect grpc_custom_tcp_connect; |
||||
|
||||
typedef struct grpc_custom_socket { |
||||
// Implementation defined
|
||||
void* impl; |
||||
grpc_endpoint* endpoint; |
||||
grpc_tcp_listener* listener; |
||||
grpc_custom_tcp_connect* connector; |
||||
int refs; |
||||
} grpc_custom_socket; |
||||
|
||||
typedef void (*grpc_custom_connect_callback)(grpc_custom_socket* socket, |
||||
grpc_error_handle error); |
||||
typedef void (*grpc_custom_write_callback)(grpc_custom_socket* socket, |
||||
grpc_error_handle error); |
||||
typedef void (*grpc_custom_read_callback)(grpc_custom_socket* socket, |
||||
size_t nread, |
||||
grpc_error_handle error); |
||||
typedef void (*grpc_custom_accept_callback)(grpc_custom_socket* socket, |
||||
grpc_custom_socket* client, |
||||
grpc_error_handle error); |
||||
typedef void (*grpc_custom_close_callback)(grpc_custom_socket* socket); |
||||
|
||||
typedef struct grpc_socket_vtable { |
||||
grpc_error_handle (*init)(grpc_custom_socket* socket, int domain); |
||||
void (*connect)(grpc_custom_socket* socket, const grpc_sockaddr* addr, |
||||
size_t len, grpc_custom_connect_callback cb); |
||||
void (*destroy)(grpc_custom_socket* socket); |
||||
void (*shutdown)(grpc_custom_socket* socket); |
||||
void (*close)(grpc_custom_socket* socket, grpc_custom_close_callback cb); |
||||
void (*write)(grpc_custom_socket* socket, grpc_slice_buffer* slices, |
||||
grpc_custom_write_callback cb); |
||||
void (*read)(grpc_custom_socket* socket, char* buffer, size_t length, |
||||
grpc_custom_read_callback cb); |
||||
grpc_error_handle (*getpeername)(grpc_custom_socket* socket, |
||||
const grpc_sockaddr* addr, int* len); |
||||
grpc_error_handle (*getsockname)(grpc_custom_socket* socket, |
||||
const grpc_sockaddr* addr, int* len); |
||||
grpc_error_handle (*bind)(grpc_custom_socket* socket, |
||||
const grpc_sockaddr* addr, size_t len, int flags); |
||||
grpc_error_handle (*listen)(grpc_custom_socket* socket); |
||||
void (*accept)(grpc_custom_socket* socket, grpc_custom_socket* client, |
||||
grpc_custom_accept_callback cb); |
||||
} grpc_socket_vtable; |
||||
|
||||
/* Internal APIs */ |
||||
void grpc_custom_endpoint_init(grpc_socket_vtable* impl); |
||||
|
||||
void grpc_custom_close_server_callback(grpc_tcp_listener* listener); |
||||
|
||||
/// Takes ownership of \a slice_allocator.
|
||||
grpc_endpoint* custom_tcp_endpoint_create(grpc_custom_socket* socket, |
||||
const char* peer_string); |
||||
|
||||
#endif /* GRPC_CORE_LIB_IOMGR_TCP_CUSTOM_H */ |
@ -1,457 +0,0 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2018 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include <assert.h> |
||||
#include <string.h> |
||||
|
||||
#include <string> |
||||
|
||||
#include <grpc/support/alloc.h> |
||||
#include <grpc/support/log.h> |
||||
|
||||
#include "src/core/lib/address_utils/sockaddr_utils.h" |
||||
#include "src/core/lib/channel/channel_args.h" |
||||
#include "src/core/lib/gprpp/memory.h" |
||||
#include "src/core/lib/iomgr/error.h" |
||||
#include "src/core/lib/iomgr/exec_ctx.h" |
||||
#include "src/core/lib/iomgr/iomgr_custom.h" |
||||
#include "src/core/lib/iomgr/port.h" |
||||
#include "src/core/lib/iomgr/sockaddr.h" |
||||
#include "src/core/lib/iomgr/tcp_custom.h" |
||||
#include "src/core/lib/iomgr/tcp_server.h" |
||||
|
||||
extern grpc_core::TraceFlag grpc_tcp_trace; |
||||
|
||||
extern grpc_socket_vtable* grpc_custom_socket_vtable; |
||||
|
||||
/* one listening port */ |
||||
struct grpc_tcp_listener { |
||||
grpc_tcp_server* server; |
||||
unsigned port_index; |
||||
int port; |
||||
|
||||
grpc_custom_socket* socket; |
||||
|
||||
/* linked list */ |
||||
struct grpc_tcp_listener* next; |
||||
|
||||
bool closed; |
||||
}; |
||||
|
||||
struct grpc_tcp_server { |
||||
gpr_refcount refs; |
||||
|
||||
/* Called whenever accept() succeeds on a server port. */ |
||||
grpc_tcp_server_cb on_accept_cb; |
||||
void* on_accept_cb_arg; |
||||
|
||||
int open_ports; |
||||
|
||||
/* linked list of server ports */ |
||||
grpc_tcp_listener* head; |
||||
grpc_tcp_listener* tail; |
||||
|
||||
/* List of closures passed to shutdown_starting_add(). */ |
||||
grpc_closure_list shutdown_starting; |
||||
|
||||
/* shutdown callback */ |
||||
grpc_closure* shutdown_complete; |
||||
|
||||
bool shutdown; |
||||
bool so_reuseport; |
||||
}; |
||||
|
||||
static grpc_error_handle tcp_server_create(grpc_closure* shutdown_complete, |
||||
const grpc_channel_args* args, |
||||
grpc_tcp_server** server) { |
||||
grpc_tcp_server* s = new grpc_tcp_server(); |
||||
s->so_reuseport = |
||||
grpc_channel_args_find_bool(args, GRPC_ARG_ALLOW_REUSEPORT, true); |
||||
gpr_ref_init(&s->refs, 1); |
||||
s->on_accept_cb = nullptr; |
||||
s->on_accept_cb_arg = nullptr; |
||||
s->open_ports = 0; |
||||
s->head = nullptr; |
||||
s->tail = nullptr; |
||||
s->shutdown_starting.head = nullptr; |
||||
s->shutdown_starting.tail = nullptr; |
||||
s->shutdown_complete = shutdown_complete; |
||||
s->shutdown = false; |
||||
*server = s; |
||||
return GRPC_ERROR_NONE; |
||||
} |
||||
|
||||
static grpc_tcp_server* tcp_server_ref(grpc_tcp_server* s) { |
||||
GRPC_CUSTOM_IOMGR_ASSERT_SAME_THREAD(); |
||||
gpr_ref(&s->refs); |
||||
return s; |
||||
} |
||||
|
||||
static void tcp_server_shutdown_starting_add(grpc_tcp_server* s, |
||||
grpc_closure* shutdown_starting) { |
||||
grpc_closure_list_append(&s->shutdown_starting, shutdown_starting, |
||||
GRPC_ERROR_NONE); |
||||
} |
||||
|
||||
static void finish_shutdown(grpc_tcp_server* s) { |
||||
GPR_ASSERT(s->shutdown); |
||||
if (s->shutdown_complete != nullptr) { |
||||
grpc_core::ExecCtx::Run(DEBUG_LOCATION, s->shutdown_complete, |
||||
GRPC_ERROR_NONE); |
||||
} |
||||
|
||||
while (s->head) { |
||||
grpc_tcp_listener* sp = s->head; |
||||
s->head = sp->next; |
||||
sp->next = nullptr; |
||||
gpr_free(sp); |
||||
} |
||||
gpr_free(s); |
||||
} |
||||
|
||||
static void custom_close_callback(grpc_custom_socket* socket) { |
||||
grpc_tcp_listener* sp = socket->listener; |
||||
if (sp) { |
||||
grpc_core::ExecCtx exec_ctx; |
||||
sp->server->open_ports--; |
||||
if (sp->server->open_ports == 0 && sp->server->shutdown) { |
||||
finish_shutdown(sp->server); |
||||
} |
||||
} |
||||
socket->refs--; |
||||
if (socket->refs == 0) { |
||||
grpc_custom_socket_vtable->destroy(socket); |
||||
gpr_free(socket); |
||||
} |
||||
} |
||||
|
||||
void grpc_custom_close_server_callback(grpc_tcp_listener* listener) { |
||||
if (listener) { |
||||
grpc_core::ExecCtx exec_ctx; |
||||
listener->server->open_ports--; |
||||
if (listener->server->open_ports == 0 && listener->server->shutdown) { |
||||
finish_shutdown(listener->server); |
||||
} |
||||
} |
||||
} |
||||
|
||||
static void close_listener(grpc_tcp_listener* sp) { |
||||
grpc_custom_socket* socket = sp->socket; |
||||
if (!sp->closed) { |
||||
sp->closed = true; |
||||
grpc_custom_socket_vtable->close(socket, custom_close_callback); |
||||
} |
||||
} |
||||
|
||||
static void tcp_server_destroy(grpc_tcp_server* s) { |
||||
int immediately_done = 0; |
||||
grpc_tcp_listener* sp; |
||||
|
||||
GPR_ASSERT(!s->shutdown); |
||||
s->shutdown = true; |
||||
|
||||
if (s->open_ports == 0) { |
||||
immediately_done = 1; |
||||
} |
||||
for (sp = s->head; sp; sp = sp->next) { |
||||
close_listener(sp); |
||||
} |
||||
|
||||
if (immediately_done) { |
||||
finish_shutdown(s); |
||||
} |
||||
} |
||||
|
||||
static void tcp_server_unref(grpc_tcp_server* s) { |
||||
GRPC_CUSTOM_IOMGR_ASSERT_SAME_THREAD(); |
||||
if (gpr_unref(&s->refs)) { |
||||
/* Complete shutdown_starting work before destroying. */ |
||||
grpc_core::ExecCtx exec_ctx; |
||||
grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &s->shutdown_starting); |
||||
grpc_core::ExecCtx::Get()->Flush(); |
||||
tcp_server_destroy(s); |
||||
} |
||||
} |
||||
|
||||
static void finish_accept(grpc_tcp_listener* sp, grpc_custom_socket* socket) { |
||||
grpc_tcp_server_acceptor* acceptor = |
||||
static_cast<grpc_tcp_server_acceptor*>(gpr_malloc(sizeof(*acceptor))); |
||||
grpc_endpoint* ep = nullptr; |
||||
grpc_resolved_address peer_name; |
||||
std::string peer_name_string; |
||||
grpc_error_handle err; |
||||
|
||||
memset(&peer_name, 0, sizeof(grpc_resolved_address)); |
||||
peer_name.len = GRPC_MAX_SOCKADDR_SIZE; |
||||
err = grpc_custom_socket_vtable->getpeername( |
||||
socket, reinterpret_cast<grpc_sockaddr*>(&peer_name.addr), |
||||
reinterpret_cast<int*>(&peer_name.len)); |
||||
if (err == GRPC_ERROR_NONE) { |
||||
peer_name_string = grpc_sockaddr_to_uri(&peer_name); |
||||
} else { |
||||
GRPC_LOG_IF_ERROR("getpeername error", err); |
||||
GRPC_ERROR_UNREF(err); |
||||
} |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) { |
||||
gpr_log(GPR_INFO, "SERVER_CONNECT: %p accepted connection: %s", sp->server, |
||||
peer_name_string.c_str()); |
||||
} |
||||
ep = custom_tcp_endpoint_create(socket, peer_name_string.c_str()); |
||||
acceptor->from_server = sp->server; |
||||
acceptor->port_index = sp->port_index; |
||||
acceptor->fd_index = 0; |
||||
acceptor->external_connection = false; |
||||
sp->server->on_accept_cb(sp->server->on_accept_cb_arg, ep, nullptr, acceptor); |
||||
} |
||||
|
||||
static void custom_accept_callback(grpc_custom_socket* socket, |
||||
grpc_custom_socket* client, |
||||
grpc_error_handle error); |
||||
|
||||
static void custom_accept_callback(grpc_custom_socket* socket, |
||||
grpc_custom_socket* client, |
||||
grpc_error_handle error) { |
||||
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; |
||||
grpc_core::ExecCtx exec_ctx; |
||||
grpc_tcp_listener* sp = socket->listener; |
||||
if (error != GRPC_ERROR_NONE) { |
||||
if (!sp->closed) { |
||||
gpr_log(GPR_ERROR, "Accept failed: %s", |
||||
grpc_error_std_string(error).c_str()); |
||||
} |
||||
gpr_free(client); |
||||
GRPC_ERROR_UNREF(error); |
||||
return; |
||||
} |
||||
finish_accept(sp, client); |
||||
if (!sp->closed) { |
||||
grpc_custom_socket* new_socket = static_cast<grpc_custom_socket*>( |
||||
gpr_malloc(sizeof(grpc_custom_socket))); |
||||
new_socket->endpoint = nullptr; |
||||
new_socket->listener = nullptr; |
||||
new_socket->connector = nullptr; |
||||
new_socket->refs = 1; |
||||
grpc_custom_socket_vtable->accept(sp->socket, new_socket, |
||||
custom_accept_callback); |
||||
} |
||||
} |
||||
|
||||
static grpc_error_handle add_socket_to_server(grpc_tcp_server* s, |
||||
grpc_custom_socket* socket, |
||||
const grpc_resolved_address* addr, |
||||
unsigned port_index, |
||||
grpc_tcp_listener** listener) { |
||||
grpc_tcp_listener* sp = nullptr; |
||||
int port = -1; |
||||
grpc_error_handle error; |
||||
grpc_resolved_address sockname_temp; |
||||
|
||||
// NOTE(lidiz) The last argument is "flags" which is unused by other
|
||||
// implementations. Python IO managers uses it to specify SO_REUSEPORT.
|
||||
int flags = 0; |
||||
if (s->so_reuseport) { |
||||
flags |= GRPC_CUSTOM_SOCKET_OPT_SO_REUSEPORT; |
||||
} |
||||
|
||||
error = grpc_custom_socket_vtable->bind( |
||||
socket, reinterpret_cast<grpc_sockaddr*>(const_cast<char*>(addr->addr)), |
||||
addr->len, flags); |
||||
if (error != GRPC_ERROR_NONE) { |
||||
return error; |
||||
} |
||||
|
||||
error = grpc_custom_socket_vtable->listen(socket); |
||||
if (error != GRPC_ERROR_NONE) { |
||||
return error; |
||||
} |
||||
|
||||
sockname_temp.len = GRPC_MAX_SOCKADDR_SIZE; |
||||
error = grpc_custom_socket_vtable->getsockname( |
||||
socket, reinterpret_cast<grpc_sockaddr*>(&sockname_temp.addr), |
||||
reinterpret_cast<int*>(&sockname_temp.len)); |
||||
if (error != GRPC_ERROR_NONE) { |
||||
return error; |
||||
} |
||||
|
||||
port = grpc_sockaddr_get_port(&sockname_temp); |
||||
|
||||
GPR_ASSERT(port >= 0); |
||||
GPR_ASSERT(!s->on_accept_cb && "must add ports before starting server"); |
||||
sp = grpc_core::Zalloc<grpc_tcp_listener>(); |
||||
sp->next = nullptr; |
||||
if (s->head == nullptr) { |
||||
s->head = sp; |
||||
} else { |
||||
s->tail->next = sp; |
||||
} |
||||
s->tail = sp; |
||||
sp->server = s; |
||||
sp->socket = socket; |
||||
sp->port = port; |
||||
sp->port_index = port_index; |
||||
sp->closed = false; |
||||
s->open_ports++; |
||||
*listener = sp; |
||||
|
||||
return GRPC_ERROR_NONE; |
||||
} |
||||
|
||||
static grpc_error_handle tcp_server_add_port(grpc_tcp_server* s, |
||||
const grpc_resolved_address* addr, |
||||
int* port) { |
||||
// This function is mostly copied from tcp_server_windows.c
|
||||
grpc_tcp_listener* sp = nullptr; |
||||
grpc_custom_socket* socket; |
||||
grpc_resolved_address addr6_v4mapped; |
||||
grpc_resolved_address wildcard; |
||||
grpc_resolved_address* allocated_addr = nullptr; |
||||
grpc_resolved_address sockname_temp; |
||||
unsigned port_index = 0; |
||||
grpc_error_handle error = GRPC_ERROR_NONE; |
||||
int family; |
||||
|
||||
GRPC_CUSTOM_IOMGR_ASSERT_SAME_THREAD(); |
||||
|
||||
if (s->tail != nullptr) { |
||||
port_index = s->tail->port_index + 1; |
||||
} |
||||
|
||||
/* Check if this is a wildcard port, and if so, try to keep the port the same
|
||||
as some previously created listener. */ |
||||
if (grpc_sockaddr_get_port(addr) == 0) { |
||||
for (sp = s->head; sp; sp = sp->next) { |
||||
socket = sp->socket; |
||||
sockname_temp.len = GRPC_MAX_SOCKADDR_SIZE; |
||||
if (grpc_custom_socket_vtable->getsockname( |
||||
socket, reinterpret_cast<grpc_sockaddr*>(&sockname_temp.addr), |
||||
reinterpret_cast<int*>(&sockname_temp.len)) == GRPC_ERROR_NONE) { |
||||
*port = grpc_sockaddr_get_port(&sockname_temp); |
||||
if (*port > 0) { |
||||
allocated_addr = static_cast<grpc_resolved_address*>( |
||||
gpr_malloc(sizeof(grpc_resolved_address))); |
||||
memcpy(allocated_addr, addr, sizeof(grpc_resolved_address)); |
||||
grpc_sockaddr_set_port(allocated_addr, *port); |
||||
addr = allocated_addr; |
||||
break; |
||||
} |
||||
} |
||||
} |
||||
} |
||||
|
||||
if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) { |
||||
addr = &addr6_v4mapped; |
||||
} |
||||
|
||||
/* Treat :: or 0.0.0.0 as a family-agnostic wildcard. */ |
||||
if (grpc_sockaddr_is_wildcard(addr, port)) { |
||||
grpc_sockaddr_make_wildcard6(*port, &wildcard); |
||||
|
||||
addr = &wildcard; |
||||
} |
||||
|
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) { |
||||
gpr_log(GPR_INFO, "SERVER %p add_port %s error=%s", s, |
||||
grpc_sockaddr_to_string(addr, false).c_str(), |
||||
grpc_error_std_string(error).c_str()); |
||||
} |
||||
|
||||
family = grpc_sockaddr_get_family(addr); |
||||
socket = |
||||
static_cast<grpc_custom_socket*>(gpr_malloc(sizeof(grpc_custom_socket))); |
||||
socket->refs = 1; |
||||
socket->endpoint = nullptr; |
||||
socket->listener = nullptr; |
||||
socket->connector = nullptr; |
||||
error = grpc_custom_socket_vtable->init(socket, family); |
||||
|
||||
if (error == GRPC_ERROR_NONE) { |
||||
error = add_socket_to_server(s, socket, addr, port_index, &sp); |
||||
} |
||||
gpr_free(allocated_addr); |
||||
|
||||
if (error != GRPC_ERROR_NONE) { |
||||
grpc_error_handle error_out = |
||||
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( |
||||
"Failed to add port to server", &error, 1); |
||||
GRPC_ERROR_UNREF(error); |
||||
error = error_out; |
||||
*port = -1; |
||||
} else { |
||||
GPR_ASSERT(sp != nullptr); |
||||
*port = sp->port; |
||||
} |
||||
socket->listener = sp; |
||||
return error; |
||||
} |
||||
|
||||
static void tcp_server_start(grpc_tcp_server* server, |
||||
const std::vector<grpc_pollset*>* /*pollsets*/, |
||||
grpc_tcp_server_cb on_accept_cb, void* cb_arg) { |
||||
grpc_tcp_listener* sp; |
||||
GRPC_CUSTOM_IOMGR_ASSERT_SAME_THREAD(); |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) { |
||||
gpr_log(GPR_INFO, "SERVER_START %p", server); |
||||
} |
||||
GPR_ASSERT(on_accept_cb); |
||||
GPR_ASSERT(!server->on_accept_cb); |
||||
server->on_accept_cb = on_accept_cb; |
||||
server->on_accept_cb_arg = cb_arg; |
||||
for (sp = server->head; sp; sp = sp->next) { |
||||
grpc_custom_socket* new_socket = static_cast<grpc_custom_socket*>( |
||||
gpr_malloc(sizeof(grpc_custom_socket))); |
||||
new_socket->endpoint = nullptr; |
||||
new_socket->listener = nullptr; |
||||
new_socket->connector = nullptr; |
||||
new_socket->refs = 1; |
||||
grpc_custom_socket_vtable->accept(sp->socket, new_socket, |
||||
custom_accept_callback); |
||||
} |
||||
} |
||||
|
||||
static unsigned tcp_server_port_fd_count(grpc_tcp_server* /*s*/, |
||||
unsigned /*port_index*/) { |
||||
return 0; |
||||
} |
||||
|
||||
static int tcp_server_port_fd(grpc_tcp_server* /*s*/, unsigned /*port_index*/, |
||||
unsigned /*fd_index*/) { |
||||
return -1; |
||||
} |
||||
|
||||
static void tcp_server_shutdown_listeners(grpc_tcp_server* s) { |
||||
for (grpc_tcp_listener* sp = s->head; sp; sp = sp->next) { |
||||
if (!sp->closed) { |
||||
sp->closed = true; |
||||
grpc_custom_socket_vtable->close(sp->socket, custom_close_callback); |
||||
} |
||||
} |
||||
} |
||||
|
||||
static grpc_core::TcpServerFdHandler* tcp_server_create_fd_handler( |
||||
grpc_tcp_server* /*s*/) { |
||||
return nullptr; |
||||
} |
||||
|
||||
grpc_tcp_server_vtable custom_tcp_server_vtable = { |
||||
tcp_server_create, tcp_server_start, |
||||
tcp_server_add_port, tcp_server_create_fd_handler, |
||||
tcp_server_port_fd_count, tcp_server_port_fd, |
||||
tcp_server_ref, tcp_server_shutdown_starting_add, |
||||
tcp_server_unref, tcp_server_shutdown_listeners}; |
@ -1,96 +0,0 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2017 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/lib/iomgr/timer_custom.h" |
||||
|
||||
#include <grpc/support/alloc.h> |
||||
#include <grpc/support/log.h> |
||||
|
||||
#include "src/core/lib/debug/trace.h" |
||||
#include "src/core/lib/iomgr/iomgr_custom.h" |
||||
#include "src/core/lib/iomgr/port.h" |
||||
#include "src/core/lib/iomgr/timer.h" |
||||
|
||||
static grpc_custom_timer_vtable* custom_timer_impl; |
||||
|
||||
void grpc_custom_timer_callback(grpc_custom_timer* t, |
||||
grpc_error_handle /*error*/) { |
||||
GRPC_CUSTOM_IOMGR_ASSERT_SAME_THREAD(); |
||||
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; |
||||
grpc_core::ExecCtx exec_ctx; |
||||
grpc_timer* timer = t->original; |
||||
GPR_ASSERT(timer->pending); |
||||
timer->pending = false; |
||||
grpc_core::ExecCtx::Run(DEBUG_LOCATION, timer->closure, GRPC_ERROR_NONE); |
||||
custom_timer_impl->stop(t); |
||||
gpr_free(t); |
||||
} |
||||
|
||||
static void timer_init(grpc_timer* timer, grpc_millis deadline, |
||||
grpc_closure* closure) { |
||||
uint64_t timeout; |
||||
GRPC_CUSTOM_IOMGR_ASSERT_SAME_THREAD(); |
||||
grpc_millis now = grpc_core::ExecCtx::Get()->Now(); |
||||
if (deadline <= grpc_core::ExecCtx::Get()->Now()) { |
||||
grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, GRPC_ERROR_NONE); |
||||
timer->pending = false; |
||||
return; |
||||
} else { |
||||
timeout = deadline - now; |
||||
} |
||||
timer->pending = true; |
||||
timer->closure = closure; |
||||
grpc_custom_timer* timer_wrapper = |
||||
static_cast<grpc_custom_timer*>(gpr_malloc(sizeof(grpc_custom_timer))); |
||||
timer_wrapper->timeout_ms = timeout; |
||||
timer->custom_timer = timer_wrapper; |
||||
timer_wrapper->original = timer; |
||||
custom_timer_impl->start(timer_wrapper); |
||||
} |
||||
|
||||
static void timer_cancel(grpc_timer* timer) { |
||||
GRPC_CUSTOM_IOMGR_ASSERT_SAME_THREAD(); |
||||
grpc_custom_timer* tw = static_cast<grpc_custom_timer*>(timer->custom_timer); |
||||
if (timer->pending) { |
||||
timer->pending = false; |
||||
grpc_core::ExecCtx::Run(DEBUG_LOCATION, timer->closure, |
||||
GRPC_ERROR_CANCELLED); |
||||
custom_timer_impl->stop(tw); |
||||
gpr_free(tw); |
||||
} |
||||
} |
||||
|
||||
static grpc_timer_check_result timer_check(grpc_millis* /*next*/) { |
||||
return GRPC_TIMERS_NOT_CHECKED; |
||||
} |
||||
|
||||
static void timer_list_init() {} |
||||
static void timer_list_shutdown() {} |
||||
|
||||
static void timer_consume_kick(void) {} |
||||
|
||||
static grpc_timer_vtable custom_timer_vtable = { |
||||
timer_init, timer_cancel, timer_check, |
||||
timer_list_init, timer_list_shutdown, timer_consume_kick}; |
||||
|
||||
void grpc_custom_timer_init(grpc_custom_timer_vtable* impl) { |
||||
custom_timer_impl = impl; |
||||
grpc_set_timer_impl(&custom_timer_vtable); |
||||
} |
@ -1,43 +0,0 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2018 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#ifndef GRPC_CORE_LIB_IOMGR_TIMER_CUSTOM_H |
||||
#define GRPC_CORE_LIB_IOMGR_TIMER_CUSTOM_H |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/lib/iomgr/timer.h" |
||||
|
||||
typedef struct grpc_custom_timer { |
||||
// Implementation defined
|
||||
void* timer; |
||||
uint64_t timeout_ms; |
||||
|
||||
grpc_timer* original; |
||||
} grpc_custom_timer; |
||||
|
||||
typedef struct grpc_custom_timer_vtable { |
||||
void (*start)(grpc_custom_timer* t); |
||||
void (*stop)(grpc_custom_timer* t); |
||||
} grpc_custom_timer_vtable; |
||||
|
||||
void grpc_custom_timer_init(grpc_custom_timer_vtable* impl); |
||||
|
||||
void grpc_custom_timer_callback(grpc_custom_timer* t, grpc_error_handle error); |
||||
|
||||
#endif /* GRPC_CORE_LIB_IOMGR_TIMER_CUSTOM_H */ |
Loading…
Reference in new issue