diff --git a/examples/BUILD b/examples/BUILD index 4fee663bd9e..0a1ca94a649 100644 --- a/examples/BUILD +++ b/examples/BUILD @@ -101,7 +101,8 @@ cc_binary( cc_binary( name = "keyvaluestore_client", - srcs = ["cpp/keyvaluestore/client.cc"], + srcs = ["cpp/keyvaluestore/caching_interceptor.h", + "cpp/keyvaluestore/client.cc"], defines = ["BAZEL_BUILD"], deps = [":keyvaluestore", "//:grpc++"], ) diff --git a/examples/cpp/keyvaluestore/caching_interceptor.h b/examples/cpp/keyvaluestore/caching_interceptor.h new file mode 100644 index 00000000000..5a31afe0f01 --- /dev/null +++ b/examples/cpp/keyvaluestore/caching_interceptor.h @@ -0,0 +1,134 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include + +#include + +#ifdef BAZEL_BUILD +#include "examples/protos/keyvaluestore.grpc.pb.h" +#else +#include "keyvaluestore.grpc.pb.h" +#endif + +// This is a naive implementation of a cache. A new cache is for each call. For +// each new key request, the key is first searched in the map and if found, the +// interceptor fills in the return value without making a request to the server. +// Only if the key is not found in the cache do we make a request. +class CachingInterceptor : public grpc::experimental::Interceptor { + public: + CachingInterceptor(grpc::experimental::ClientRpcInfo* info) {} + + void Intercept( + ::grpc::experimental::InterceptorBatchMethods* methods) override { + bool hijack = false; + if (methods->QueryInterceptionHookPoint( + grpc::experimental::InterceptionHookPoints:: + PRE_SEND_INITIAL_METADATA)) { + // Hijack all calls + hijack = true; + // Create a stream on which this interceptor can make requests + stub_ = keyvaluestore::KeyValueStore::NewStub( + methods->GetInterceptedChannel()); + stream_ = stub_->GetValues(&context_); + } + if (methods->QueryInterceptionHookPoint( + grpc::experimental::InterceptionHookPoints::PRE_SEND_MESSAGE)) { + // We know that clients perform a Read and a Write in a loop, so we don't + // need to maintain a list of the responses. 
+ std::string requested_key; + const keyvaluestore::Request* req_msg = + static_cast<const keyvaluestore::Request*>(methods->GetSendMessage()); + if (req_msg != nullptr) { + requested_key = req_msg->key(); + } else { + // The non-serialized form would not be available in certain scenarios, + // so add a fallback + keyvaluestore::Request req_msg; + auto* buffer = methods->GetSerializedSendMessage(); + auto copied_buffer = *buffer; + GPR_ASSERT( + grpc::SerializationTraits<keyvaluestore::Request>::Deserialize( + &copied_buffer, &req_msg) + .ok()); + requested_key = req_msg.key(); + } + + // Check if the key is present in the map + auto search = cached_map_.find(requested_key); + if (search != cached_map_.end()) { + std::cout << "Key " << requested_key << " found in map"; + response_ = search->second; + } else { + std::cout << "Key " << requested_key << " not found in cache"; + // Key was not found in the cache, so make a request + keyvaluestore::Request req; + req.set_key(requested_key); + stream_->Write(req); + keyvaluestore::Response resp; + stream_->Read(&resp); + response_ = resp.value(); + // Insert the pair in the cache for future requests + cached_map_.insert({requested_key, response_}); + } + } + if (methods->QueryInterceptionHookPoint( + grpc::experimental::InterceptionHookPoints::PRE_SEND_CLOSE)) { + stream_->WritesDone(); + } + if (methods->QueryInterceptionHookPoint( + grpc::experimental::InterceptionHookPoints::PRE_RECV_MESSAGE)) { + keyvaluestore::Response* resp = + static_cast<keyvaluestore::Response*>(methods->GetRecvMessage()); + resp->set_value(response_); + } + if (methods->QueryInterceptionHookPoint( + grpc::experimental::InterceptionHookPoints::PRE_RECV_STATUS)) { + auto* status = methods->GetRecvStatus(); + *status = grpc::Status::OK; + } + // One of Hijack or Proceed always needs to be called to make progress. + if (hijack) { + // Hijack is called only once when PRE_SEND_INITIAL_METADATA is present in + // the hook points + methods->Hijack(); + } else { + // Proceed is an indicator that the interceptor is done intercepting the + // batch. + methods->Proceed(); + } + } + + private: + grpc::ClientContext context_; + std::unique_ptr<keyvaluestore::KeyValueStore::Stub> stub_; + std::unique_ptr< + grpc::ClientReaderWriter<keyvaluestore::Request, keyvaluestore::Response>> + stream_; + std::map<std::string, std::string> cached_map_; + std::string response_; +}; + +class CachingInterceptorFactory + : public grpc::experimental::ClientInterceptorFactoryInterface { + public: + grpc::experimental::Interceptor* CreateClientInterceptor( + grpc::experimental::ClientRpcInfo* info) override { + return new CachingInterceptor(info); + } +}; diff --git a/examples/cpp/keyvaluestore/client.cc b/examples/cpp/keyvaluestore/client.cc index 17e407c273b..57c451cadf3 100644 --- a/examples/cpp/keyvaluestore/client.cc +++ b/examples/cpp/keyvaluestore/client.cc @@ -23,6 +23,8 @@ #include <grpcpp/grpcpp.h> + +#include "caching_interceptor.h" + #ifdef BAZEL_BUILD #include "examples/protos/keyvaluestore.grpc.pb.h" #else @@ -77,9 +79,20 @@ int main(int argc, char** argv) { // are created. This channel models a connection to an endpoint (in this case, // localhost at port 50051). We indicate that the channel isn't authenticated // (use of InsecureChannelCredentials()). - KeyValueStoreClient client(grpc::CreateChannel( - "localhost:50051", grpc::InsecureChannelCredentials())); - std::vector<std::string> keys = {"key1", "key2", "key3", "key4", "key5"}; + // In this example, we are using a cache which has been added in as an + // interceptor.
+ grpc::ChannelArguments args; + std::vector< + std::unique_ptr<grpc::experimental::ClientInterceptorFactoryInterface>> + interceptor_creators; + interceptor_creators.push_back(std::unique_ptr<CachingInterceptorFactory>( + new CachingInterceptorFactory())); + auto channel = grpc::experimental::CreateCustomChannelWithInterceptors( + "localhost:50051", grpc::InsecureChannelCredentials(), args, + std::move(interceptor_creators)); + KeyValueStoreClient client(channel); + std::vector<std::string> keys = {"key1", "key2", "key3", "key4", + "key5", "key1", "key2", "key4"}; client.GetValues(keys); return 0; diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index 7f4627fa773..fe88d4818e4 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -968,19 +968,19 @@ static grpc_closure_scheduler* write_scheduler(grpc_chttp2_transport* t, get better latency overall if we switch writing work elsewhere and continue with application work above */ if (!t->is_first_write_in_batch) { - return grpc_executor_scheduler(GRPC_EXECUTOR_SHORT); + return grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT); } /* equivalently, if it's a partial write, we *know* we're going to be taking a thread jump to write it because of the above, may as well do so immediately */ if (partial_write) { - return grpc_executor_scheduler(GRPC_EXECUTOR_SHORT); + return grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT); } switch (t->opt_target) { case GRPC_CHTTP2_OPTIMIZE_FOR_THROUGHPUT: /* executor gives us the largest probability of being able to batch a * write with others on this transport */ - return grpc_executor_scheduler(GRPC_EXECUTOR_SHORT); + return grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT); case GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY: return grpc_schedule_on_exec_ctx; } diff --git a/src/core/lib/iomgr/combiner.cc b/src/core/lib/iomgr/combiner.cc index 402f8904eae..4fc4a9dccf4 100644 --- a/src/core/lib/iomgr/combiner.cc +++ b/src/core/lib/iomgr/combiner.cc @@ -83,8 +83,9 @@ grpc_combiner* grpc_combiner_create(void) { gpr_atm_no_barrier_store(&lock->state, STATE_UNORPHANED); gpr_mpscq_init(&lock->queue); grpc_closure_list_init(&lock->final_list); - GRPC_CLOSURE_INIT(&lock->offload, offload, lock, - grpc_executor_scheduler(GRPC_EXECUTOR_SHORT)); + GRPC_CLOSURE_INIT( + &lock->offload, offload, lock, + grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT)); GRPC_COMBINER_TRACE(gpr_log(GPR_INFO, "C:%p create", lock)); return lock; } @@ -235,7 +236,7 @@ bool grpc_combiner_continue_exec_ctx() { // 3. the DEFAULT executor is threaded // 4.
the current thread is not a worker for any background poller if (contended && grpc_core::ExecCtx::Get()->IsReadyToFinish() && - grpc_executor_is_threaded() && + grpc_core::Executor::IsThreadedDefault() && !grpc_iomgr_is_any_background_poller_thread()) { GPR_TIMER_MARK("offload_from_finished_exec_ctx", 0); // this execution context wants to move on: schedule remaining work to be diff --git a/src/core/lib/iomgr/executor.cc b/src/core/lib/iomgr/executor.cc index 45d96b80eb4..2703e1a0b77 100644 --- a/src/core/lib/iomgr/executor.cc +++ b/src/core/lib/iomgr/executor.cc @@ -45,20 +45,70 @@ gpr_log(GPR_INFO, "EXECUTOR " str); \ } -grpc_core::TraceFlag executor_trace(false, "executor"); +namespace grpc_core { +namespace { GPR_TLS_DECL(g_this_thread_state); -GrpcExecutor::GrpcExecutor(const char* name) : name_(name) { +Executor* executors[static_cast(ExecutorType::NUM_EXECUTORS)]; + +void default_enqueue_short(grpc_closure* closure, grpc_error* error) { + executors[static_cast(ExecutorType::DEFAULT)]->Enqueue( + closure, error, true /* is_short */); +} + +void default_enqueue_long(grpc_closure* closure, grpc_error* error) { + executors[static_cast(ExecutorType::DEFAULT)]->Enqueue( + closure, error, false /* is_short */); +} + +void resolver_enqueue_short(grpc_closure* closure, grpc_error* error) { + executors[static_cast(ExecutorType::RESOLVER)]->Enqueue( + closure, error, true /* is_short */); +} + +void resolver_enqueue_long(grpc_closure* closure, grpc_error* error) { + executors[static_cast(ExecutorType::RESOLVER)]->Enqueue( + closure, error, false /* is_short */); +} + +const grpc_closure_scheduler_vtable + vtables_[static_cast(ExecutorType::NUM_EXECUTORS)] + [static_cast(ExecutorJobType::NUM_JOB_TYPES)] = { + {{&default_enqueue_short, &default_enqueue_short, + "def-ex-short"}, + {&default_enqueue_long, &default_enqueue_long, "def-ex-long"}}, + {{&resolver_enqueue_short, &resolver_enqueue_short, + "res-ex-short"}, + {&resolver_enqueue_long, &resolver_enqueue_long, + "res-ex-long"}}}; + +grpc_closure_scheduler + schedulers_[static_cast(ExecutorType::NUM_EXECUTORS)] + [static_cast(ExecutorJobType::NUM_JOB_TYPES)] = { + {{&vtables_[static_cast(ExecutorType::DEFAULT)] + [static_cast(ExecutorJobType::SHORT)]}, + {&vtables_[static_cast(ExecutorType::DEFAULT)] + [static_cast(ExecutorJobType::LONG)]}}, + {{&vtables_[static_cast(ExecutorType::RESOLVER)] + [static_cast(ExecutorJobType::SHORT)]}, + {&vtables_[static_cast(ExecutorType::RESOLVER)] + [static_cast(ExecutorJobType::LONG)]}}}; + +} // namespace + +TraceFlag executor_trace(false, "executor"); + +Executor::Executor(const char* name) : name_(name) { adding_thread_lock_ = GPR_SPINLOCK_STATIC_INITIALIZER; gpr_atm_rel_store(&num_threads_, 0); max_threads_ = GPR_MAX(1, 2 * gpr_cpu_num_cores()); } -void GrpcExecutor::Init() { SetThreading(true); } +void Executor::Init() { SetThreading(true); } -size_t GrpcExecutor::RunClosures(const char* executor_name, - grpc_closure_list list) { +size_t Executor::RunClosures(const char* executor_name, + grpc_closure_list list) { size_t n = 0; grpc_closure* c = list.head; @@ -82,11 +132,11 @@ size_t GrpcExecutor::RunClosures(const char* executor_name, return n; } -bool GrpcExecutor::IsThreaded() const { +bool Executor::IsThreaded() const { return gpr_atm_acq_load(&num_threads_) > 0; } -void GrpcExecutor::SetThreading(bool threading) { +void Executor::SetThreading(bool threading) { gpr_atm curr_num_threads = gpr_atm_acq_load(&num_threads_); EXECUTOR_TRACE("(%s) SetThreading(%d) begin", name_, threading); @@ -112,7 
+162,7 @@ void GrpcExecutor::SetThreading(bool threading) { } thd_state_[0].thd = - grpc_core::Thread(name_, &GrpcExecutor::ThreadMain, &thd_state_[0]); + grpc_core::Thread(name_, &Executor::ThreadMain, &thd_state_[0]); thd_state_[0].thd.Start(); } else { // !threading if (curr_num_threads == 0) { @@ -153,9 +203,9 @@ void GrpcExecutor::SetThreading(bool threading) { EXECUTOR_TRACE("(%s) SetThreading(%d) done", name_, threading); } -void GrpcExecutor::Shutdown() { SetThreading(false); } +void Executor::Shutdown() { SetThreading(false); } -void GrpcExecutor::ThreadMain(void* arg) { +void Executor::ThreadMain(void* arg) { ThreadState* ts = static_cast(arg); gpr_tls_set(&g_this_thread_state, reinterpret_cast(ts)); @@ -192,8 +242,8 @@ void GrpcExecutor::ThreadMain(void* arg) { } } -void GrpcExecutor::Enqueue(grpc_closure* closure, grpc_error* error, - bool is_short) { +void Executor::Enqueue(grpc_closure* closure, grpc_error* error, + bool is_short) { bool retry_push; if (is_short) { GRPC_STATS_INC_EXECUTOR_SCHEDULED_SHORT_ITEMS(); @@ -304,7 +354,7 @@ void GrpcExecutor::Enqueue(grpc_closure* closure, grpc_error* error, gpr_atm_rel_store(&num_threads_, cur_thread_count + 1); thd_state_[cur_thread_count].thd = grpc_core::Thread( - name_, &GrpcExecutor::ThreadMain, &thd_state_[cur_thread_count]); + name_, &Executor::ThreadMain, &thd_state_[cur_thread_count]); thd_state_[cur_thread_count].thd.Start(); } gpr_spinlock_unlock(&adding_thread_lock_); @@ -316,85 +366,52 @@ void GrpcExecutor::Enqueue(grpc_closure* closure, grpc_error* error, } while (retry_push); } -static GrpcExecutor* executors[GRPC_NUM_EXECUTORS]; - -void default_enqueue_short(grpc_closure* closure, grpc_error* error) { - executors[GRPC_DEFAULT_EXECUTOR]->Enqueue(closure, error, - true /* is_short */); -} - -void default_enqueue_long(grpc_closure* closure, grpc_error* error) { - executors[GRPC_DEFAULT_EXECUTOR]->Enqueue(closure, error, - false /* is_short */); -} - -void resolver_enqueue_short(grpc_closure* closure, grpc_error* error) { - executors[GRPC_RESOLVER_EXECUTOR]->Enqueue(closure, error, - true /* is_short */); -} - -void resolver_enqueue_long(grpc_closure* closure, grpc_error* error) { - executors[GRPC_RESOLVER_EXECUTOR]->Enqueue(closure, error, - false /* is_short */); -} - -static const grpc_closure_scheduler_vtable - vtables_[GRPC_NUM_EXECUTORS][GRPC_NUM_EXECUTOR_JOB_TYPES] = { - {{&default_enqueue_short, &default_enqueue_short, "def-ex-short"}, - {&default_enqueue_long, &default_enqueue_long, "def-ex-long"}}, - {{&resolver_enqueue_short, &resolver_enqueue_short, "res-ex-short"}, - {&resolver_enqueue_long, &resolver_enqueue_long, "res-ex-long"}}}; - -static grpc_closure_scheduler - schedulers_[GRPC_NUM_EXECUTORS][GRPC_NUM_EXECUTOR_JOB_TYPES] = { - {{&vtables_[GRPC_DEFAULT_EXECUTOR][GRPC_EXECUTOR_SHORT]}, - {&vtables_[GRPC_DEFAULT_EXECUTOR][GRPC_EXECUTOR_LONG]}}, - {{&vtables_[GRPC_RESOLVER_EXECUTOR][GRPC_EXECUTOR_SHORT]}, - {&vtables_[GRPC_RESOLVER_EXECUTOR][GRPC_EXECUTOR_LONG]}}}; - -// grpc_executor_init() and grpc_executor_shutdown() functions are called in the +// Executor::InitAll() and Executor::ShutdownAll() functions are called in the // the grpc_init() and grpc_shutdown() code paths which are protected by a // global mutex. 
So it is okay to assume that these functions are thread-safe -void grpc_executor_init() { - EXECUTOR_TRACE0("grpc_executor_init() enter"); +void Executor::InitAll() { + EXECUTOR_TRACE0("Executor::InitAll() enter"); - // Return if grpc_executor_init() is already called earlier - if (executors[GRPC_DEFAULT_EXECUTOR] != nullptr) { - GPR_ASSERT(executors[GRPC_RESOLVER_EXECUTOR] != nullptr); + // Return if Executor::InitAll() is already called earlier + if (executors[static_cast<size_t>(ExecutorType::DEFAULT)] != nullptr) { + GPR_ASSERT(executors[static_cast<size_t>(ExecutorType::RESOLVER)] != + nullptr); return; } - executors[GRPC_DEFAULT_EXECUTOR] = - grpc_core::New<GrpcExecutor>("default-executor"); - executors[GRPC_RESOLVER_EXECUTOR] = - grpc_core::New<GrpcExecutor>("resolver-executor"); + executors[static_cast<size_t>(ExecutorType::DEFAULT)] = + grpc_core::New<Executor>("default-executor"); + executors[static_cast<size_t>(ExecutorType::RESOLVER)] = + grpc_core::New<Executor>("resolver-executor"); - executors[GRPC_DEFAULT_EXECUTOR]->Init(); - executors[GRPC_RESOLVER_EXECUTOR]->Init(); + executors[static_cast<size_t>(ExecutorType::DEFAULT)]->Init(); + executors[static_cast<size_t>(ExecutorType::RESOLVER)]->Init(); - EXECUTOR_TRACE0("grpc_executor_init() done"); + EXECUTOR_TRACE0("Executor::InitAll() done"); } -grpc_closure_scheduler* grpc_executor_scheduler(GrpcExecutorType executor_type, - GrpcExecutorJobType job_type) { - return &schedulers_[executor_type][job_type]; +grpc_closure_scheduler* Executor::Scheduler(ExecutorType executor_type, + ExecutorJobType job_type) { + return &schedulers_[static_cast<size_t>(executor_type)] + [static_cast<size_t>(job_type)]; } -grpc_closure_scheduler* grpc_executor_scheduler(GrpcExecutorJobType job_type) { - return grpc_executor_scheduler(GRPC_DEFAULT_EXECUTOR, job_type); +grpc_closure_scheduler* Executor::Scheduler(ExecutorJobType job_type) { + return Executor::Scheduler(ExecutorType::DEFAULT, job_type); } -void grpc_executor_shutdown() { - EXECUTOR_TRACE0("grpc_executor_shutdown() enter"); +void Executor::ShutdownAll() { + EXECUTOR_TRACE0("Executor::ShutdownAll() enter"); - // Return if grpc_executor_shutdown() is already called earlier - if (executors[GRPC_DEFAULT_EXECUTOR] == nullptr) { - GPR_ASSERT(executors[GRPC_RESOLVER_EXECUTOR] == nullptr); + // Return if Executor::ShutdownAll() is already called earlier + if (executors[static_cast<size_t>(ExecutorType::DEFAULT)] == nullptr) { + GPR_ASSERT(executors[static_cast<size_t>(ExecutorType::RESOLVER)] == + nullptr); return; } - executors[GRPC_DEFAULT_EXECUTOR]->Shutdown(); - executors[GRPC_RESOLVER_EXECUTOR]->Shutdown(); + executors[static_cast<size_t>(ExecutorType::DEFAULT)]->Shutdown(); + executors[static_cast<size_t>(ExecutorType::RESOLVER)]->Shutdown(); // Delete the executor objects. // @@ -408,26 +425,36 @@ void grpc_executor_shutdown() { // By ensuring that all executors are shutdown first, we are also ensuring // that no thread is active across all executors.
- grpc_core::Delete(executors[GRPC_DEFAULT_EXECUTOR]); - grpc_core::Delete(executors[GRPC_RESOLVER_EXECUTOR]); - executors[GRPC_DEFAULT_EXECUTOR] = nullptr; - executors[GRPC_RESOLVER_EXECUTOR] = nullptr; + grpc_core::Delete( + executors[static_cast(ExecutorType::DEFAULT)]); + grpc_core::Delete( + executors[static_cast(ExecutorType::RESOLVER)]); + executors[static_cast(ExecutorType::DEFAULT)] = nullptr; + executors[static_cast(ExecutorType::RESOLVER)] = nullptr; - EXECUTOR_TRACE0("grpc_executor_shutdown() done"); + EXECUTOR_TRACE0("Executor::ShutdownAll() done"); } -bool grpc_executor_is_threaded(GrpcExecutorType executor_type) { - GPR_ASSERT(executor_type < GRPC_NUM_EXECUTORS); - return executors[executor_type]->IsThreaded(); +bool Executor::IsThreaded(ExecutorType executor_type) { + GPR_ASSERT(executor_type < ExecutorType::NUM_EXECUTORS); + return executors[static_cast(executor_type)]->IsThreaded(); } -bool grpc_executor_is_threaded() { - return grpc_executor_is_threaded(GRPC_DEFAULT_EXECUTOR); +bool Executor::IsThreadedDefault() { + return Executor::IsThreaded(ExecutorType::DEFAULT); } -void grpc_executor_set_threading(bool enable) { - EXECUTOR_TRACE("grpc_executor_set_threading(%d) called", enable); - for (int i = 0; i < GRPC_NUM_EXECUTORS; i++) { +void Executor::SetThreadingAll(bool enable) { + EXECUTOR_TRACE("Executor::SetThreadingAll(%d) called", enable); + for (size_t i = 0; i < static_cast(ExecutorType::NUM_EXECUTORS); + i++) { executors[i]->SetThreading(enable); } } + +void Executor::SetThreadingDefault(bool enable) { + EXECUTOR_TRACE("Executor::SetThreadingDefault(%d) called", enable); + executors[static_cast(ExecutorType::DEFAULT)]->SetThreading(enable); +} + +} // namespace grpc_core diff --git a/src/core/lib/iomgr/executor.h b/src/core/lib/iomgr/executor.h index 8829138c5fa..9e472279b7b 100644 --- a/src/core/lib/iomgr/executor.h +++ b/src/core/lib/iomgr/executor.h @@ -25,7 +25,9 @@ #include "src/core/lib/gprpp/thd.h" #include "src/core/lib/iomgr/closure.h" -typedef struct { +namespace grpc_core { + +struct ThreadState { gpr_mu mu; size_t id; // For debugging purposes const char* name; // Thread state name @@ -35,17 +37,24 @@ typedef struct { bool shutdown; bool queued_long_job; grpc_core::Thread thd; -} ThreadState; +}; -typedef enum { - GRPC_EXECUTOR_SHORT = 0, - GRPC_EXECUTOR_LONG, - GRPC_NUM_EXECUTOR_JOB_TYPES // Add new values above this -} GrpcExecutorJobType; +enum class ExecutorType { + DEFAULT = 0, + RESOLVER, + + NUM_EXECUTORS // Add new values above this +}; -class GrpcExecutor { +enum class ExecutorJobType { + SHORT = 0, + LONG, + NUM_JOB_TYPES // Add new values above this +}; + +class Executor { public: - GrpcExecutor(const char* executor_name); + Executor(const char* executor_name); void Init(); @@ -62,55 +71,51 @@ class GrpcExecutor { * a short job (i.e expected to not block and complete quickly) */ void Enqueue(grpc_closure* closure, grpc_error* error, bool is_short); - private: - static size_t RunClosures(const char* executor_name, grpc_closure_list list); - static void ThreadMain(void* arg); + // TODO(sreek): Currently we have two executors (available globally): The + // default executor and the resolver executor. + // + // Some of the functions below operate on the DEFAULT executor only while some + // operate of ALL the executors. 
This is a bit confusing and should be cleaned + // up in future (where we make all the following functions take ExecutorType + // and/or JobType) - const char* name_; - ThreadState* thd_state_; - size_t max_threads_; - gpr_atm num_threads_; - gpr_spinlock adding_thread_lock_; -}; - -// == Global executor functions == + // Initialize ALL the executors + static void InitAll(); -typedef enum { - GRPC_DEFAULT_EXECUTOR = 0, - GRPC_RESOLVER_EXECUTOR, + // Shutdown ALL the executors + static void ShutdownAll(); - GRPC_NUM_EXECUTORS // Add new values above this -} GrpcExecutorType; + // Set the threading mode for ALL the executors + static void SetThreadingAll(bool enable); -// TODO(sreek): Currently we have two executors (available globally): The -// default executor and the resolver executor. -// -// Some of the functions below operate on the DEFAULT executor only while some -// operate of ALL the executors. This is a bit confusing and should be cleaned -// up in future (where we make all the following functions take executor_type -// and/or job_type) + // Set the threading mode for ALL the executors + static void SetThreadingDefault(bool enable); -// Initialize ALL the executors -void grpc_executor_init(); + // Get the DEFAULT executor scheduler for the given job_type + static grpc_closure_scheduler* Scheduler(ExecutorJobType job_type); -// Shutdown ALL the executors -void grpc_executor_shutdown(); + // Get the executor scheduler for a given executor_type and a job_type + static grpc_closure_scheduler* Scheduler(ExecutorType executor_type, + ExecutorJobType job_type); -// Set the threading mode for ALL the executors -void grpc_executor_set_threading(bool enable); + // Return if a given executor is running in threaded mode (i.e if + // SetThreading(true) was called previously on that executor) + static bool IsThreaded(ExecutorType executor_type); -// Get the DEFAULT executor scheduler for the given job_type -grpc_closure_scheduler* grpc_executor_scheduler(GrpcExecutorJobType job_type); + // Return if the DEFAULT executor is threaded + static bool IsThreadedDefault(); -// Get the executor scheduler for a given executor_type and a job_type -grpc_closure_scheduler* grpc_executor_scheduler(GrpcExecutorType executor_type, - GrpcExecutorJobType job_type); + private: + static size_t RunClosures(const char* executor_name, grpc_closure_list list); + static void ThreadMain(void* arg); -// Return if a given executor is running in threaded mode (i.e if -// grpc_executor_set_threading(true) was called previously on that executor) -bool grpc_executor_is_threaded(GrpcExecutorType executor_type); + const char* name_; + ThreadState* thd_state_; + size_t max_threads_; + gpr_atm num_threads_; + gpr_spinlock adding_thread_lock_; +}; -// Return if the DEFAULT executor is threaded -bool grpc_executor_is_threaded(); +} // namespace grpc_core #endif /* GRPC_CORE_LIB_IOMGR_EXECUTOR_H */ diff --git a/src/core/lib/iomgr/fork_posix.cc b/src/core/lib/iomgr/fork_posix.cc index 05ecd2a49b7..2eebe3f26f6 100644 --- a/src/core/lib/iomgr/fork_posix.cc +++ b/src/core/lib/iomgr/fork_posix.cc @@ -71,7 +71,7 @@ void grpc_prefork() { return; } grpc_timer_manager_set_threading(false); - grpc_executor_set_threading(false); + grpc_core::Executor::SetThreadingAll(false); grpc_core::ExecCtx::Get()->Flush(); grpc_core::Fork::AwaitThreads(); skipped_handler = false; @@ -82,7 +82,7 @@ void grpc_postfork_parent() { grpc_core::Fork::AllowExecCtx(); grpc_core::ExecCtx exec_ctx; grpc_timer_manager_set_threading(true); - 
grpc_executor_set_threading(true); + grpc_core::Executor::SetThreadingAll(true); } } @@ -96,7 +96,7 @@ void grpc_postfork_child() { reset_polling_engine(); } grpc_timer_manager_set_threading(true); - grpc_executor_set_threading(true); + grpc_core::Executor::SetThreadingAll(true); } } diff --git a/src/core/lib/iomgr/iomgr.cc b/src/core/lib/iomgr/iomgr.cc index dcc69332e0b..33153d9cc3b 100644 --- a/src/core/lib/iomgr/iomgr.cc +++ b/src/core/lib/iomgr/iomgr.cc @@ -52,7 +52,7 @@ void grpc_iomgr_init() { g_shutdown = 0; gpr_mu_init(&g_mu); gpr_cv_init(&g_rcv); - grpc_executor_init(); + grpc_core::Executor::InitAll(); grpc_timer_list_init(); g_root_object.next = g_root_object.prev = &g_root_object; g_root_object.name = (char*)"root"; @@ -88,7 +88,7 @@ void grpc_iomgr_shutdown() { { grpc_timer_manager_shutdown(); grpc_iomgr_platform_flush(); - grpc_executor_shutdown(); + grpc_core::Executor::ShutdownAll(); gpr_mu_lock(&g_mu); g_shutdown = 1; diff --git a/src/core/lib/iomgr/iomgr_custom.cc b/src/core/lib/iomgr/iomgr_custom.cc index e1cd8f73104..3d07f1abe9a 100644 --- a/src/core/lib/iomgr/iomgr_custom.cc +++ b/src/core/lib/iomgr/iomgr_custom.cc @@ -34,7 +34,7 @@ gpr_thd_id g_init_thread; static void iomgr_platform_init(void) { grpc_core::ExecCtx exec_ctx; - grpc_executor_set_threading(false); + grpc_core::Executor::SetThreadingAll(false); g_init_thread = gpr_thd_currentid(); grpc_pollset_global_init(); } diff --git a/src/core/lib/iomgr/resolve_address_posix.cc b/src/core/lib/iomgr/resolve_address_posix.cc index 2a03244ff7d..e6dd8f1ceab 100644 --- a/src/core/lib/iomgr/resolve_address_posix.cc +++ b/src/core/lib/iomgr/resolve_address_posix.cc @@ -150,7 +150,7 @@ typedef struct { void* arg; } request; -/* Callback to be passed to grpc_executor to asynch-ify +/* Callback to be passed to grpc Executor to asynch-ify * grpc_blocking_resolve_address */ static void do_request_thread(void* rp, grpc_error* error) { request* r = static_cast(rp); @@ -168,7 +168,8 @@ static void posix_resolve_address(const char* name, const char* default_port, request* r = static_cast(gpr_malloc(sizeof(request))); GRPC_CLOSURE_INIT( &r->request_closure, do_request_thread, r, - grpc_executor_scheduler(GRPC_RESOLVER_EXECUTOR, GRPC_EXECUTOR_SHORT)); + grpc_core::Executor::Scheduler(grpc_core::ExecutorType::RESOLVER, + grpc_core::ExecutorJobType::SHORT)); r->name = gpr_strdup(name); r->default_port = gpr_strdup(default_port); r->on_done = on_done; diff --git a/src/core/lib/iomgr/resolve_address_windows.cc b/src/core/lib/iomgr/resolve_address_windows.cc index 3e977dca2da..64351c38a8f 100644 --- a/src/core/lib/iomgr/resolve_address_windows.cc +++ b/src/core/lib/iomgr/resolve_address_windows.cc @@ -153,7 +153,8 @@ static void windows_resolve_address(const char* name, const char* default_port, request* r = (request*)gpr_malloc(sizeof(request)); GRPC_CLOSURE_INIT( &r->request_closure, do_request_thread, r, - grpc_executor_scheduler(GRPC_RESOLVER_EXECUTOR, GRPC_EXECUTOR_SHORT)); + grpc_core::Executor::Scheduler(grpc_core::ExecutorType::RESOLVER, + grpc_core::ExecutorJobType::SHORT)); r->name = gpr_strdup(name); r->default_port = gpr_strdup(default_port); r->on_done = on_done; diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc index d0642c015ff..92f163b58e9 100644 --- a/src/core/lib/iomgr/tcp_posix.cc +++ b/src/core/lib/iomgr/tcp_posix.cc @@ -227,10 +227,10 @@ static void cover_self(grpc_tcp* tcp) { } grpc_pollset_init(BACKUP_POLLER_POLLSET(p), &p->pollset_mu); gpr_atm_rel_store(&g_backup_poller, (gpr_atm)p); - 
GRPC_CLOSURE_SCHED( - GRPC_CLOSURE_INIT(&p->run_poller, run_poller, p, - grpc_executor_scheduler(GRPC_EXECUTOR_LONG)), - GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(GRPC_CLOSURE_INIT(&p->run_poller, run_poller, p, + grpc_core::Executor::Scheduler( + grpc_core::ExecutorJobType::LONG)), + GRPC_ERROR_NONE); } else { while ((p = (backup_poller*)gpr_atm_acq_load(&g_backup_poller)) == nullptr) { diff --git a/src/core/lib/iomgr/udp_server.cc b/src/core/lib/iomgr/udp_server.cc index 3dd7cab855c..5f8865ca57f 100644 --- a/src/core/lib/iomgr/udp_server.cc +++ b/src/core/lib/iomgr/udp_server.cc @@ -481,8 +481,9 @@ void GrpcUdpListener::OnRead(grpc_error* error, void* do_read_arg) { if (udp_handler_->Read()) { /* There maybe more packets to read. Schedule read_more_cb_ closure to run * after finishing this event loop. */ - GRPC_CLOSURE_INIT(&do_read_closure_, do_read, do_read_arg, - grpc_executor_scheduler(GRPC_EXECUTOR_LONG)); + GRPC_CLOSURE_INIT( + &do_read_closure_, do_read, do_read_arg, + grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::LONG)); GRPC_CLOSURE_SCHED(&do_read_closure_, GRPC_ERROR_NONE); } else { /* Finish reading all the packets, re-arm the notification event so we can @@ -542,8 +543,9 @@ void GrpcUdpListener::OnCanWrite(grpc_error* error, void* do_write_arg) { } /* Schedule actual write in another thread. */ - GRPC_CLOSURE_INIT(&do_write_closure_, do_write, do_write_arg, - grpc_executor_scheduler(GRPC_EXECUTOR_LONG)); + GRPC_CLOSURE_INIT( + &do_write_closure_, do_write, do_write_arg, + grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::LONG)); GRPC_CLOSURE_SCHED(&do_write_closure_, GRPC_ERROR_NONE); } diff --git a/src/core/lib/surface/init.cc b/src/core/lib/surface/init.cc index 67cf5d89bff..60f506ef5e2 100644 --- a/src/core/lib/surface/init.cc +++ b/src/core/lib/surface/init.cc @@ -165,7 +165,7 @@ void grpc_shutdown(void) { { grpc_timer_manager_set_threading( false); // shutdown timer_manager thread - grpc_executor_shutdown(); + grpc_core::Executor::ShutdownAll(); for (i = g_number_of_plugins; i >= 0; i--) { if (g_all_of_the_plugins[i].destroy != nullptr) { g_all_of_the_plugins[i].destroy(); diff --git a/src/core/lib/surface/server.cc b/src/core/lib/surface/server.cc index 7ae6e51a5fb..cdfd3336437 100644 --- a/src/core/lib/surface/server.cc +++ b/src/core/lib/surface/server.cc @@ -1134,8 +1134,9 @@ void grpc_server_start(grpc_server* server) { server_ref(server); server->starting = true; GRPC_CLOSURE_SCHED( - GRPC_CLOSURE_CREATE(start_listeners, server, - grpc_executor_scheduler(GRPC_EXECUTOR_SHORT)), + GRPC_CLOSURE_CREATE( + start_listeners, server, + grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT)), GRPC_ERROR_NONE); } diff --git a/src/core/lib/transport/transport.cc b/src/core/lib/transport/transport.cc index b32f9c6ec1a..43add28ce03 100644 --- a/src/core/lib/transport/transport.cc +++ b/src/core/lib/transport/transport.cc @@ -73,7 +73,7 @@ void grpc_stream_unref(grpc_stream_refcount* refcount) { Throw this over to the executor (on a core-owned thread) and process it there. 
*/ refcount->destroy.scheduler = - grpc_executor_scheduler(GRPC_EXECUTOR_SHORT); + grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT); } GRPC_CLOSURE_SCHED(&refcount->destroy, GRPC_ERROR_NONE); } diff --git a/src/cpp/client/client_context.cc b/src/cpp/client/client_context.cc index c9ea3e5f83b..efb59c71a8c 100644 --- a/src/cpp/client/client_context.cc +++ b/src/cpp/client/client_context.cc @@ -57,6 +57,7 @@ ClientContext::ClientContext() deadline_(gpr_inf_future(GPR_CLOCK_REALTIME)), census_context_(nullptr), propagate_from_call_(nullptr), + compression_algorithm_(GRPC_COMPRESS_NONE), initial_metadata_corked_(false) { g_client_callbacks->DefaultConstructor(this); } diff --git a/src/csharp/Grpc.Core.Testing/TestServerCallContext.cs b/src/csharp/Grpc.Core.Testing/TestServerCallContext.cs index 5418417d7ed..e6297e61226 100644 --- a/src/csharp/Grpc.Core.Testing/TestServerCallContext.cs +++ b/src/csharp/Grpc.Core.Testing/TestServerCallContext.cs @@ -19,7 +19,6 @@ using System; using System.Threading; using System.Threading.Tasks; -using Grpc.Core; namespace Grpc.Core.Testing { @@ -36,23 +35,73 @@ namespace Grpc.Core.Testing string peer, AuthContext authContext, ContextPropagationToken contextPropagationToken, Func writeHeadersFunc, Func writeOptionsGetter, Action writeOptionsSetter) { - return new ServerCallContext(null, method, host, deadline, requestHeaders, cancellationToken, - writeHeadersFunc, new WriteOptionsHolder(writeOptionsGetter, writeOptionsSetter), - () => peer, () => authContext, () => contextPropagationToken); + return new TestingServerCallContext(method, host, deadline, requestHeaders, cancellationToken, peer, + authContext, contextPropagationToken, writeHeadersFunc, writeOptionsGetter, writeOptionsSetter); } - private class WriteOptionsHolder : IHasWriteOptions + private class TestingServerCallContext : ServerCallContext { - Func writeOptionsGetter; - Action writeOptionsSetter; + private readonly string method; + private readonly string host; + private readonly DateTime deadline; + private readonly Metadata requestHeaders; + private readonly CancellationToken cancellationToken; + private readonly Metadata responseTrailers = new Metadata(); + private Status status; + private readonly string peer; + private readonly AuthContext authContext; + private readonly ContextPropagationToken contextPropagationToken; + private readonly Func writeHeadersFunc; + private readonly Func writeOptionsGetter; + private readonly Action writeOptionsSetter; - public WriteOptionsHolder(Func writeOptionsGetter, Action writeOptionsSetter) + public TestingServerCallContext(string method, string host, DateTime deadline, Metadata requestHeaders, CancellationToken cancellationToken, + string peer, AuthContext authContext, ContextPropagationToken contextPropagationToken, + Func writeHeadersFunc, Func writeOptionsGetter, Action writeOptionsSetter) { + this.method = method; + this.host = host; + this.deadline = deadline; + this.requestHeaders = requestHeaders; + this.cancellationToken = cancellationToken; + this.responseTrailers = new Metadata(); + this.status = Status.DefaultSuccess; + this.peer = peer; + this.authContext = authContext; + this.contextPropagationToken = contextPropagationToken; + this.writeHeadersFunc = writeHeadersFunc; this.writeOptionsGetter = writeOptionsGetter; this.writeOptionsSetter = writeOptionsSetter; } - public WriteOptions WriteOptions { get => writeOptionsGetter(); set => writeOptionsSetter(value); } + protected override string MethodCore => method; + + protected 
override string HostCore => host; + + protected override string PeerCore => peer; + + protected override DateTime DeadlineCore => deadline; + + protected override Metadata RequestHeadersCore => requestHeaders; + + protected override CancellationToken CancellationTokenCore => cancellationToken; + + protected override Metadata ResponseTrailersCore => responseTrailers; + + protected override Status StatusCore { get => status; set => status = value; } + protected override WriteOptions WriteOptionsCore { get => writeOptionsGetter(); set => writeOptionsSetter(value); } + + protected override AuthContext AuthContextCore => authContext; + + protected override ContextPropagationToken CreatePropagationTokenCore(ContextPropagationOptions options) + { + return contextPropagationToken; + } + + protected override Task WriteResponseHeadersAsyncCore(Metadata responseHeaders) + { + return writeHeadersFunc(responseHeaders); + } } } } diff --git a/src/csharp/Grpc.Core/Internal/DefaultServerCallContext.cs b/src/csharp/Grpc.Core/Internal/DefaultServerCallContext.cs new file mode 100644 index 00000000000..8220e599f92 --- /dev/null +++ b/src/csharp/Grpc.Core/Internal/DefaultServerCallContext.cs @@ -0,0 +1,110 @@ +#region Copyright notice and license + +// Copyright 2019 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#endregion + +using System; +using System.Threading; +using System.Threading.Tasks; + +using Grpc.Core.Internal; + +namespace Grpc.Core +{ + /// + /// Default implementation of ServerCallContext. + /// + internal class DefaultServerCallContext : ServerCallContext + { + private readonly CallSafeHandle callHandle; + private readonly string method; + private readonly string host; + private readonly DateTime deadline; + private readonly Metadata requestHeaders; + private readonly CancellationToken cancellationToken; + private readonly Metadata responseTrailers; + private Status status; + private readonly IServerResponseStream serverResponseStream; + private readonly Lazy authContext; + + /// + /// Creates a new instance of ServerCallContext. + /// To allow reuse of ServerCallContext API by different gRPC implementations, the implementation of some members is provided externally. + /// To provide state, this ServerCallContext instance and extraData will be passed to the member implementations. 
+ /// + internal DefaultServerCallContext(CallSafeHandle callHandle, string method, string host, DateTime deadline, + Metadata requestHeaders, CancellationToken cancellationToken, IServerResponseStream serverResponseStream) + { + this.callHandle = callHandle; + this.method = method; + this.host = host; + this.deadline = deadline; + this.requestHeaders = requestHeaders; + this.cancellationToken = cancellationToken; + this.responseTrailers = new Metadata(); + this.status = Status.DefaultSuccess; + this.serverResponseStream = serverResponseStream; + // TODO(jtattermusch): avoid unnecessary allocation of factory function and the lazy object + this.authContext = new Lazy(GetAuthContextEager); + } + + protected override ContextPropagationToken CreatePropagationTokenCore(ContextPropagationOptions options) + { + return new ContextPropagationToken(callHandle, deadline, cancellationToken, options); + } + + protected override Task WriteResponseHeadersAsyncCore(Metadata responseHeaders) + { + return serverResponseStream.WriteResponseHeadersAsync(responseHeaders); + } + + protected override string MethodCore => method; + + protected override string HostCore => host; + + protected override string PeerCore => callHandle.GetPeer(); + + protected override DateTime DeadlineCore => deadline; + + protected override Metadata RequestHeadersCore => requestHeaders; + + protected override CancellationToken CancellationTokenCore => cancellationToken; + + protected override Metadata ResponseTrailersCore => responseTrailers; + + protected override Status StatusCore + { + get => status; + set => status = value; + } + + protected override WriteOptions WriteOptionsCore + { + get => serverResponseStream.WriteOptions; + set => serverResponseStream.WriteOptions = value; + } + + protected override AuthContext AuthContextCore => authContext.Value; + + private AuthContext GetAuthContextEager() + { + using (var authContextNative = callHandle.GetAuthContext()) + { + return authContextNative.ToAuthContext(); + } + } + } +} diff --git a/src/csharp/Grpc.Core/Internal/IServerResponseStream.cs b/src/csharp/Grpc.Core/Internal/IServerResponseStream.cs new file mode 100644 index 00000000000..874aae703a2 --- /dev/null +++ b/src/csharp/Grpc.Core/Internal/IServerResponseStream.cs @@ -0,0 +1,38 @@ +#region Copyright notice and license +// Copyright 2019 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#endregion + +using System; +using System.Threading.Tasks; +using Grpc.Core.Internal; + +namespace Grpc.Core.Internal +{ + /// + /// Exposes non-generic members of ServerReponseStream. + /// + internal interface IServerResponseStream + { + /// + /// Asynchronously sends response headers for the current call to the client. See ServerCallContext.WriteResponseHeadersAsync for exact semantics. + /// + Task WriteResponseHeadersAsync(Metadata responseHeaders); + + /// + /// Gets or sets the write options. 
+ /// + WriteOptions WriteOptions { get; set; } + } +} diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs index ec732e8c7f4..c3859f1de27 100644 --- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs +++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs @@ -71,7 +71,7 @@ namespace Grpc.Core.Internal var response = await handler(request, context).ConfigureAwait(false); status = context.Status; responseWithFlags = new AsyncCallServer.ResponseWithFlags(response, HandlerUtils.GetWriteFlags(context.WriteOptions)); - } + } catch (Exception e) { if (!(e is RpcException)) @@ -345,14 +345,10 @@ namespace Grpc.Core.Internal return writeOptions != null ? writeOptions.Flags : default(WriteFlags); } - public static ServerCallContext NewContext(ServerRpcNew newRpc, ServerResponseStream serverResponseStream, CancellationToken cancellationToken) - where TRequest : class - where TResponse : class + public static ServerCallContext NewContext(ServerRpcNew newRpc, IServerResponseStream serverResponseStream, CancellationToken cancellationToken) { DateTime realtimeDeadline = newRpc.Deadline.ToClockType(ClockType.Realtime).ToDateTime(); - - return new ServerCallContext(newRpc.Call, newRpc.Method, newRpc.Host, realtimeDeadline, - newRpc.RequestMetadata, cancellationToken, serverResponseStream.WriteResponseHeadersAsync, serverResponseStream); + return new DefaultServerCallContext(newRpc.Call, newRpc.Method, newRpc.Host, realtimeDeadline, newRpc.RequestMetadata, cancellationToken, serverResponseStream); } } } diff --git a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs index 352b98829c7..079849e4c61 100644 --- a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs +++ b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs @@ -23,7 +23,7 @@ namespace Grpc.Core.Internal /// /// Writes responses asynchronously to an underlying AsyncCallServer object. /// - internal class ServerResponseStream : IServerStreamWriter, IHasWriteOptions + internal class ServerResponseStream : IServerStreamWriter, IServerResponseStream where TRequest : class where TResponse : class { diff --git a/src/csharp/Grpc.Core/ServerCallContext.cs b/src/csharp/Grpc.Core/ServerCallContext.cs index 74a7deabea0..90b6e9419f0 100644 --- a/src/csharp/Grpc.Core/ServerCallContext.cs +++ b/src/csharp/Grpc.Core/ServerCallContext.cs @@ -20,54 +20,18 @@ using System; using System.Threading; using System.Threading.Tasks; -using Grpc.Core.Internal; - namespace Grpc.Core { /// /// Context for a server-side call. 
/// - public class ServerCallContext + public abstract class ServerCallContext { - private readonly CallSafeHandle callHandle; - private readonly string method; - private readonly string host; - private readonly DateTime deadline; - private readonly Metadata requestHeaders; - private readonly CancellationToken cancellationToken; - private readonly Metadata responseTrailers = new Metadata(); - private readonly Func writeHeadersFunc; - private readonly IHasWriteOptions writeOptionsHolder; - private readonly Lazy authContext; - private readonly Func testingOnlyPeerGetter; - private readonly Func testingOnlyAuthContextGetter; - private readonly Func testingOnlyContextPropagationTokenFactory; - - private Status status = Status.DefaultSuccess; - - internal ServerCallContext(CallSafeHandle callHandle, string method, string host, DateTime deadline, Metadata requestHeaders, CancellationToken cancellationToken, - Func writeHeadersFunc, IHasWriteOptions writeOptionsHolder) - : this(callHandle, method, host, deadline, requestHeaders, cancellationToken, writeHeadersFunc, writeOptionsHolder, null, null, null) - { - } - - // Additional constructor params should be used for testing only - internal ServerCallContext(CallSafeHandle callHandle, string method, string host, DateTime deadline, Metadata requestHeaders, CancellationToken cancellationToken, - Func writeHeadersFunc, IHasWriteOptions writeOptionsHolder, - Func testingOnlyPeerGetter, Func testingOnlyAuthContextGetter, Func testingOnlyContextPropagationTokenFactory) + /// + /// Creates a new instance of ServerCallContext. + /// + protected ServerCallContext() { - this.callHandle = callHandle; - this.method = method; - this.host = host; - this.deadline = deadline; - this.requestHeaders = requestHeaders; - this.cancellationToken = cancellationToken; - this.writeHeadersFunc = writeHeadersFunc; - this.writeOptionsHolder = writeOptionsHolder; - this.authContext = new Lazy(GetAuthContextEager); - this.testingOnlyPeerGetter = testingOnlyPeerGetter; - this.testingOnlyAuthContextGetter = testingOnlyAuthContextGetter; - this.testingOnlyContextPropagationTokenFactory = testingOnlyContextPropagationTokenFactory; } /// @@ -79,7 +43,7 @@ namespace Grpc.Core /// The task that finished once response headers have been written. public Task WriteResponseHeadersAsync(Metadata responseHeaders) { - return writeHeadersFunc(responseHeaders); + return WriteResponseHeadersAsyncCore(responseHeaders); } /// @@ -87,94 +51,41 @@ namespace Grpc.Core /// public ContextPropagationToken CreatePropagationToken(ContextPropagationOptions options = null) { - if (testingOnlyContextPropagationTokenFactory != null) - { - return testingOnlyContextPropagationTokenFactory(); - } - return new ContextPropagationToken(callHandle, deadline, cancellationToken, options); + return CreatePropagationTokenCore(options); } - + /// Name of method called in this RPC. - public string Method - { - get - { - return this.method; - } - } + public string Method => MethodCore; /// Name of host called in this RPC. - public string Host - { - get - { - return this.host; - } - } + public string Host => HostCore; /// Address of the remote endpoint in URI format. - public string Peer - { - get - { - if (testingOnlyPeerGetter != null) - { - return testingOnlyPeerGetter(); - } - // Getting the peer lazily is fine as the native call is guaranteed - // not to be disposed before user-supplied server side handler returns. - // Most users won't need to read this field anyway. 
- return this.callHandle.GetPeer(); - } - } + public string Peer => PeerCore; /// Deadline for this RPC. - public DateTime Deadline - { - get - { - return this.deadline; - } - } + public DateTime Deadline => DeadlineCore; /// Initial metadata sent by client. - public Metadata RequestHeaders - { - get - { - return this.requestHeaders; - } - } + public Metadata RequestHeaders => RequestHeadersCore; /// Cancellation token signals when call is cancelled. - public CancellationToken CancellationToken - { - get - { - return this.cancellationToken; - } - } + public CancellationToken CancellationToken => CancellationTokenCore; /// Trailers to send back to client after RPC finishes. - public Metadata ResponseTrailers - { - get - { - return this.responseTrailers; - } - } + public Metadata ResponseTrailers => ResponseTrailersCore; /// Status to send back to client after RPC finishes. public Status Status { get { - return this.status; + return StatusCore; } set { - status = value; + StatusCore = value; } } @@ -187,12 +98,12 @@ namespace Grpc.Core { get { - return writeOptionsHolder.WriteOptions; + return WriteOptionsCore; } set { - writeOptionsHolder.WriteOptions = value; + WriteOptionsCore = value; } } @@ -200,35 +111,31 @@ namespace Grpc.Core /// Gets the AuthContext associated with this call. /// Note: Access to AuthContext is an experimental API that can change without any prior notice. /// - public AuthContext AuthContext - { - get - { - if (testingOnlyAuthContextGetter != null) - { - return testingOnlyAuthContextGetter(); - } - return authContext.Value; - } - } - - private AuthContext GetAuthContextEager() - { - using (var authContextNative = callHandle.GetAuthContext()) - { - return authContextNative.ToAuthContext(); - } - } - } - - /// - /// Allows sharing write options between ServerCallContext and other objects. - /// - internal interface IHasWriteOptions - { - /// - /// Gets or sets the write options. - /// - WriteOptions WriteOptions { get; set; } + public AuthContext AuthContext => AuthContextCore; + + /// Provides implementation of a non-virtual public member. + protected abstract Task WriteResponseHeadersAsyncCore(Metadata responseHeaders); + /// Provides implementation of a non-virtual public member. + protected abstract ContextPropagationToken CreatePropagationTokenCore(ContextPropagationOptions options); + /// Provides implementation of a non-virtual public member. + protected abstract string MethodCore { get; } + /// Provides implementation of a non-virtual public member. + protected abstract string HostCore { get; } + /// Provides implementation of a non-virtual public member. + protected abstract string PeerCore { get; } + /// Provides implementation of a non-virtual public member. + protected abstract DateTime DeadlineCore { get; } + /// Provides implementation of a non-virtual public member. + protected abstract Metadata RequestHeadersCore { get; } + /// Provides implementation of a non-virtual public member. + protected abstract CancellationToken CancellationTokenCore { get; } + /// Provides implementation of a non-virtual public member. + protected abstract Metadata ResponseTrailersCore { get; } + /// Provides implementation of a non-virtual public member. + protected abstract Status StatusCore { get; set; } + /// Provides implementation of a non-virtual public member. + protected abstract WriteOptions WriteOptionsCore { get; set; } + /// Provides implementation of a non-virtual public member. 
+ protected abstract AuthContext AuthContextCore { get; } } } diff --git a/src/python/grpcio/grpc/_channel.py b/src/python/grpcio/grpc/_channel.py index 3685969c7fe..062cb9ada86 100644 --- a/src/python/grpcio/grpc/_channel.py +++ b/src/python/grpcio/grpc/_channel.py @@ -22,7 +22,6 @@ import grpc from grpc import _common from grpc import _grpcio_metadata from grpc._cython import cygrpc -from grpc.framework.foundation import callable_util _LOGGER = logging.getLogger(__name__) @@ -871,9 +870,11 @@ def _deliver(state, initial_connectivity, initial_callbacks): while True: for callback in callbacks: cygrpc.block_if_fork_in_progress(state) - callable_util.call_logging_exceptions( - callback, _CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE, - connectivity) + try: + callback(connectivity) + except Exception: # pylint: disable=broad-except + _LOGGER.exception( + _CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE) with state.lock: callbacks = _deliveries(state) if callbacks: diff --git a/src/python/grpcio/grpc/_server.py b/src/python/grpcio/grpc/_server.py index c3ff1fa6bd3..e5db3fc9dcb 100644 --- a/src/python/grpcio/grpc/_server.py +++ b/src/python/grpcio/grpc/_server.py @@ -25,7 +25,6 @@ import grpc from grpc import _common from grpc import _interceptor from grpc._cython import cygrpc -from grpc.framework.foundation import callable_util _LOGGER = logging.getLogger(__name__) @@ -748,8 +747,10 @@ def _process_event_and_continue(state, event): else: rpc_state, callbacks = event.tag(event) for callback in callbacks: - callable_util.call_logging_exceptions(callback, - 'Exception calling callback!') + try: + callback() + except Exception: # pylint: disable=broad-except + _LOGGER.exception('Exception calling callback!') if rpc_state is not None: with state.lock: state.rpc_states.remove(rpc_state) diff --git a/src/python/grpcio/grpc/_utilities.py b/src/python/grpcio/grpc/_utilities.py index 2938a38b44e..c48aaf60a2f 100644 --- a/src/python/grpcio/grpc/_utilities.py +++ b/src/python/grpcio/grpc/_utilities.py @@ -16,12 +16,14 @@ import collections import threading import time +import logging import six import grpc from grpc import _common -from grpc.framework.foundation import callable_util + +_LOGGER = logging.getLogger(__name__) _DONE_CALLBACK_EXCEPTION_LOG_MESSAGE = ( 'Exception calling connectivity future "done" callback!') @@ -98,8 +100,10 @@ class _ChannelReadyFuture(grpc.Future): return for done_callback in done_callbacks: - callable_util.call_logging_exceptions( - done_callback, _DONE_CALLBACK_EXCEPTION_LOG_MESSAGE, self) + try: + done_callback(self) + except Exception: # pylint: disable=broad-except + _LOGGER.exception(_DONE_CALLBACK_EXCEPTION_LOG_MESSAGE) def cancel(self): with self._condition: @@ -113,8 +117,10 @@ class _ChannelReadyFuture(grpc.Future): return False for done_callback in done_callbacks: - callable_util.call_logging_exceptions( - done_callback, _DONE_CALLBACK_EXCEPTION_LOG_MESSAGE, self) + try: + done_callback(self) + except Exception: # pylint: disable=broad-except + _LOGGER.exception(_DONE_CALLBACK_EXCEPTION_LOG_MESSAGE) return True diff --git a/test/core/end2end/fuzzers/api_fuzzer.cc b/test/core/end2end/fuzzers/api_fuzzer.cc index a0b82904753..57bc8ad768c 100644 --- a/test/core/end2end/fuzzers/api_fuzzer.cc +++ b/test/core/end2end/fuzzers/api_fuzzer.cc @@ -706,7 +706,7 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) { grpc_timer_manager_set_threading(false); { grpc_core::ExecCtx exec_ctx; - grpc_executor_set_threading(false); + 
grpc_core::Executor::SetThreadingAll(false); } grpc_set_resolver_impl(&fuzzer_resolver); grpc_dns_lookup_ares_locked = my_dns_lookup_ares_locked; diff --git a/test/core/end2end/fuzzers/client_fuzzer.cc b/test/core/end2end/fuzzers/client_fuzzer.cc index e21006bb673..8520fb53755 100644 --- a/test/core/end2end/fuzzers/client_fuzzer.cc +++ b/test/core/end2end/fuzzers/client_fuzzer.cc @@ -46,7 +46,7 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) { grpc_init(); { grpc_core::ExecCtx exec_ctx; - grpc_executor_set_threading(false); + grpc_core::Executor::SetThreadingAll(false); grpc_resource_quota* resource_quota = grpc_resource_quota_create("client_fuzzer"); diff --git a/test/core/end2end/fuzzers/server_fuzzer.cc b/test/core/end2end/fuzzers/server_fuzzer.cc index d370dc7de85..644f98e37ac 100644 --- a/test/core/end2end/fuzzers/server_fuzzer.cc +++ b/test/core/end2end/fuzzers/server_fuzzer.cc @@ -43,7 +43,7 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) { grpc_init(); { grpc_core::ExecCtx exec_ctx; - grpc_executor_set_threading(false); + grpc_core::Executor::SetThreadingAll(false); grpc_resource_quota* resource_quota = grpc_resource_quota_create("server_fuzzer"); diff --git a/test/core/iomgr/resolve_address_test.cc b/test/core/iomgr/resolve_address_test.cc index 1d9e1ee27e2..0ae0ec888b6 100644 --- a/test/core/iomgr/resolve_address_test.cc +++ b/test/core/iomgr/resolve_address_test.cc @@ -290,7 +290,7 @@ int main(int argc, char** argv) { test_invalid_ip_addresses(); test_unparseable_hostports(); } - grpc_executor_shutdown(); + grpc_core::Executor::ShutdownAll(); } gpr_cmdline_destroy(cl); diff --git a/tools/internal_ci/helper_scripts/prepare_build_linux_perf_rc b/tools/internal_ci/helper_scripts/prepare_build_linux_perf_rc index ec1ec1179d3..ff5593e031a 100644 --- a/tools/internal_ci/helper_scripts/prepare_build_linux_perf_rc +++ b/tools/internal_ci/helper_scripts/prepare_build_linux_perf_rc @@ -21,15 +21,8 @@ ulimit -c unlimited # Performance PR testing needs GH API key and PR metadata to comment results if [ -n "$KOKORO_GITHUB_PULL_REQUEST_NUMBER" ]; then - set +x sudo apt-get install -y jq export ghprbTargetBranch=$(curl -s https://api.github.com/repos/grpc/grpc/pulls/$KOKORO_GITHUB_PULL_REQUEST_NUMBER | jq -r .base.ref) - - gsutil cp gs://grpc-testing-secrets/github_credentials/oauth_token.txt ~/ - # TODO(matt-kwong): rename this to GITHUB_OAUTH_TOKEN after Jenkins deprecation - export JENKINS_OAUTH_TOKEN=$(cat ~/oauth_token.txt) - export ghprbPullId=$KOKORO_GITHUB_PULL_REQUEST_NUMBER - set -x fi sudo pip install tabulate diff --git a/tools/internal_ci/helper_scripts/prepare_build_macos_rc b/tools/internal_ci/helper_scripts/prepare_build_macos_rc index 5b6b2569393..23619ecbb8b 100644 --- a/tools/internal_ci/helper_scripts/prepare_build_macos_rc +++ b/tools/internal_ci/helper_scripts/prepare_build_macos_rc @@ -25,21 +25,16 @@ export GOOGLE_APPLICATION_CREDENTIALS=${KOKORO_GFILE_DIR}/GrpcTesting-d0eeee2db3 # If this is a PR using RUN_TESTS_FLAGS var, then add flags to filter tests if [ -n "$KOKORO_GITHUB_PULL_REQUEST_NUMBER" ]; then - set +x brew update brew install jq || brew upgrade jq ghprbTargetBranch=$(curl -s https://api.github.com/repos/grpc/grpc/pulls/$KOKORO_GITHUB_PULL_REQUEST_NUMBER | jq -r .base.ref) export RUN_TESTS_FLAGS="$RUN_TESTS_FLAGS --filter_pr_tests --base_branch origin/$ghprbTargetBranch" - - # TODO(matt-kwong): rename this to GITHUB_OAUTH_TOKEN after Jenkins deprecation - export JENKINS_OAUTH_TOKEN=$(cat 
${KOKORO_GFILE_DIR}/oauth_token.txt) - export ghprbPullId=$KOKORO_GITHUB_PULL_REQUEST_NUMBER - set -x fi set +ex # rvm script is very verbose and exits with errorcode # Advice from https://github.com/Homebrew/homebrew-cask/issues/8629#issuecomment-68641176 brew update && brew upgrade brew-cask && brew cleanup && brew cask cleanup +rvm --debug requirements ruby-2.5.0 source $HOME/.rvm/scripts/rvm set -e # rvm commands are very verbose time rvm install 2.5.0 diff --git a/tools/internal_ci/macos/pull_request/grpc_ios_binary_size.cfg b/tools/internal_ci/macos/pull_request/grpc_ios_binary_size.cfg index 1c4f7b23109..dc35ce81ffd 100644 --- a/tools/internal_ci/macos/pull_request/grpc_ios_binary_size.cfg +++ b/tools/internal_ci/macos/pull_request/grpc_ios_binary_size.cfg @@ -17,7 +17,6 @@ # Location of the continuous shell script in repository. build_file: "grpc/tools/internal_ci/macos/grpc_ios_binary_size.sh" timeout_mins: 60 -gfile_resources: "/bigstore/grpc-testing-secrets/github_credentials/oauth_token.txt" before_action { fetch_keystore { keystore_resource { diff --git a/tools/interop_matrix/client_matrix.py b/tools/interop_matrix/client_matrix.py index cd542b0f4c5..07f144d8250 100644 --- a/tools/interop_matrix/client_matrix.py +++ b/tools/interop_matrix/client_matrix.py @@ -98,6 +98,7 @@ LANG_RELEASE_MATRIX = { ('v1.15.0', ReleaseInfo()), ('v1.16.0', ReleaseInfo()), ('v1.17.1', ReleaseInfo()), + ('v1.18.0', ReleaseInfo()), ]), 'go': OrderedDict([ @@ -161,6 +162,7 @@ LANG_RELEASE_MATRIX = { ('v1.15.0', ReleaseInfo()), ('v1.16.0', ReleaseInfo()), ('v1.17.1', ReleaseInfo()), + ('v1.18.0', ReleaseInfo()), ]), 'node': OrderedDict([ @@ -201,6 +203,10 @@ LANG_RELEASE_MATRIX = { ('v1.15.0', ReleaseInfo()), ('v1.16.0', ReleaseInfo()), ('v1.17.1', ReleaseInfo()), + ('v1.18.0', + ReleaseInfo(patch=[ + 'tools/dockerfile/interoptest/grpc_interop_ruby/build_interop.sh', + ])), ]), 'php': OrderedDict([ @@ -221,6 +227,7 @@ LANG_RELEASE_MATRIX = { ('v1.15.0', ReleaseInfo()), ('v1.16.0', ReleaseInfo()), ('v1.17.1', ReleaseInfo()), + ('v1.18.0', ReleaseInfo()), ]), 'csharp': OrderedDict([ @@ -245,6 +252,7 @@ LANG_RELEASE_MATRIX = { ('v1.15.0', ReleaseInfo()), ('v1.16.0', ReleaseInfo()), ('v1.17.1', ReleaseInfo()), + ('v1.18.0', ReleaseInfo()), ]), } diff --git a/tools/interop_matrix/patches/ruby_v1.18.0/git_repo.patch b/tools/interop_matrix/patches/ruby_v1.18.0/git_repo.patch new file mode 100644 index 00000000000..dfa3cfc031a --- /dev/null +++ b/tools/interop_matrix/patches/ruby_v1.18.0/git_repo.patch @@ -0,0 +1,10 @@ +diff --git a/tools/dockerfile/interoptest/grpc_interop_ruby/build_interop.sh b/tools/dockerfile/interoptest/grpc_interop_ruby/build_interop.sh +index 67f66090ae..e71ad91499 100755 +--- a/tools/dockerfile/interoptest/grpc_interop_ruby/build_interop.sh ++++ b/tools/dockerfile/interoptest/grpc_interop_ruby/build_interop.sh +@@ -30,4 +30,4 @@ cd /var/local/git/grpc + rvm --default use ruby-2.5 + + # build Ruby interop client and server +-(cd src/ruby && gem update bundler && bundle && rake compile) ++(cd src/ruby && gem install bundler -v 1.17.3 && bundle && rake compile) diff --git a/tools/run_tests/python_utils/comment_on_pr.py b/tools/run_tests/python_utils/comment_on_pr.py deleted file mode 100644 index 399c996d4db..00000000000 --- a/tools/run_tests/python_utils/comment_on_pr.py +++ /dev/null @@ -1,37 +0,0 @@ -# Copyright 2017 gRPC authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import os -import json -import urllib2 - - -def comment_on_pr(text): - if 'JENKINS_OAUTH_TOKEN' not in os.environ: - print 'Missing JENKINS_OAUTH_TOKEN env var: not commenting' - return - if 'ghprbPullId' not in os.environ: - print 'Missing ghprbPullId env var: not commenting' - return - req = urllib2.Request( - url='https://api.github.com/repos/grpc/grpc/issues/%s/comments' % - os.environ['ghprbPullId'], - data=json.dumps({ - 'body': text - }), - headers={ - 'Authorization': 'token %s' % os.environ['JENKINS_OAUTH_TOKEN'], - 'Content-Type': 'application/json', - }) - print urllib2.urlopen(req).read()
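
For reference, the interceptor-registration pattern that the updated examples/cpp/keyvaluestore/client.cc relies on can also be exercised outside this tree. The sketch below is illustrative only and is not part of the change above: it assumes gRPC C++ with the experimental interception API (v1.17 or later), the header path grpcpp/support/client_interceptor.h may vary between releases, and NoOpInterceptor, NoOpInterceptorFactory, and MakeInterceptedChannel are hypothetical names introduced here for the example.

// Illustrative sketch (not part of the diff above): registering a client
// interceptor on a channel via the experimental interception API.
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include <grpcpp/grpcpp.h>
#include <grpcpp/support/client_interceptor.h>

// A pass-through interceptor: every hook point simply resumes the batch.
class NoOpInterceptor : public grpc::experimental::Interceptor {
 public:
  void Intercept(
      grpc::experimental::InterceptorBatchMethods* methods) override {
    methods->Proceed();  // hand control back to gRPC at every hook point
  }
};

// Factory invoked once per RPC; the channel owns the returned interceptor.
class NoOpInterceptorFactory
    : public grpc::experimental::ClientInterceptorFactoryInterface {
 public:
  grpc::experimental::Interceptor* CreateClientInterceptor(
      grpc::experimental::ClientRpcInfo* /*info*/) override {
    return new NoOpInterceptor();
  }
};

// Builds a channel with the interceptor attached, mirroring client.cc above.
std::shared_ptr<grpc::Channel> MakeInterceptedChannel(
    const std::string& target) {
  grpc::ChannelArguments args;
  std::vector<
      std::unique_ptr<grpc::experimental::ClientInterceptorFactoryInterface>>
      creators;
  creators.push_back(std::unique_ptr<NoOpInterceptorFactory>(
      new NoOpInterceptorFactory()));
  return grpc::experimental::CreateCustomChannelWithInterceptors(
      target, grpc::InsecureChannelCredentials(), args, std::move(creators));
}

int main() {
  auto channel = MakeInterceptedChannel("localhost:50051");
  return channel != nullptr ? 0 : 1;
}

Swapping NoOpInterceptorFactory for the CachingInterceptorFactory defined in caching_interceptor.h yields the same channel that client.cc's main() now constructs.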