Merge remote-tracking branch 'upstream/master' into fix-authorizer

pull/17751/head
Muxi Yan 6 years ago
commit e8eac7c107
  1. 3
      examples/BUILD
  2. 134
      examples/cpp/keyvaluestore/caching_interceptor.h
  3. 19
      examples/cpp/keyvaluestore/client.cc
  4. 6
      src/core/ext/transport/chttp2/transport/chttp2_transport.cc
  5. 7
      src/core/lib/iomgr/combiner.cc
  6. 201
      src/core/lib/iomgr/executor.cc
  7. 101
      src/core/lib/iomgr/executor.h
  8. 6
      src/core/lib/iomgr/fork_posix.cc
  9. 4
      src/core/lib/iomgr/iomgr.cc
  10. 2
      src/core/lib/iomgr/iomgr_custom.cc
  11. 5
      src/core/lib/iomgr/resolve_address_posix.cc
  12. 3
      src/core/lib/iomgr/resolve_address_windows.cc
  13. 8
      src/core/lib/iomgr/tcp_posix.cc
  14. 10
      src/core/lib/iomgr/udp_server.cc
  15. 2
      src/core/lib/surface/init.cc
  16. 5
      src/core/lib/surface/server.cc
  17. 2
      src/core/lib/transport/transport.cc
  18. 1
      src/cpp/client/client_context.cc
  19. 67
      src/csharp/Grpc.Core.Testing/TestServerCallContext.cs
  20. 110
      src/csharp/Grpc.Core/Internal/DefaultServerCallContext.cs
  21. 38
      src/csharp/Grpc.Core/Internal/IServerResponseStream.cs
  22. 10
      src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
  23. 2
      src/csharp/Grpc.Core/Internal/ServerResponseStream.cs
  24. 183
      src/csharp/Grpc.Core/ServerCallContext.cs
  25. 9
      src/python/grpcio/grpc/_channel.py
  26. 7
      src/python/grpcio/grpc/_server.py
  27. 16
      src/python/grpcio/grpc/_utilities.py
  28. 2
      test/core/end2end/fuzzers/api_fuzzer.cc
  29. 2
      test/core/end2end/fuzzers/client_fuzzer.cc
  30. 2
      test/core/end2end/fuzzers/server_fuzzer.cc
  31. 2
      test/core/iomgr/resolve_address_test.cc
  32. 7
      tools/internal_ci/helper_scripts/prepare_build_linux_perf_rc
  33. 7
      tools/internal_ci/helper_scripts/prepare_build_macos_rc
  34. 1
      tools/internal_ci/macos/pull_request/grpc_ios_binary_size.cfg
  35. 8
      tools/interop_matrix/client_matrix.py
  36. 10
      tools/interop_matrix/patches/ruby_v1.18.0/git_repo.patch
  37. 37
      tools/run_tests/python_utils/comment_on_pr.py

@ -101,7 +101,8 @@ cc_binary(
cc_binary(
name = "keyvaluestore_client",
srcs = ["cpp/keyvaluestore/client.cc"],
srcs = ["cpp/keyvaluestore/caching_interceptor.h",
"cpp/keyvaluestore/client.cc"],
defines = ["BAZEL_BUILD"],
deps = [":keyvaluestore", "//:grpc++"],
)

@ -0,0 +1,134 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <map>
#include <grpcpp/support/client_interceptor.h>
#ifdef BAZEL_BUILD
#include "examples/protos/keyvaluestore.grpc.pb.h"
#else
#include "keyvaluestore.grpc.pb.h"
#endif
// This is a naive implementation of a cache. A new cache is for each call. For
// each new key request, the key is first searched in the map and if found, the
// interceptor fills in the return value without making a request to the server.
// Only if the key is not found in the cache do we make a request.
class CachingInterceptor : public grpc::experimental::Interceptor {
public:
CachingInterceptor(grpc::experimental::ClientRpcInfo* info) {}
void Intercept(
::grpc::experimental::InterceptorBatchMethods* methods) override {
bool hijack = false;
if (methods->QueryInterceptionHookPoint(
grpc::experimental::InterceptionHookPoints::
PRE_SEND_INITIAL_METADATA)) {
// Hijack all calls
hijack = true;
// Create a stream on which this interceptor can make requests
stub_ = keyvaluestore::KeyValueStore::NewStub(
methods->GetInterceptedChannel());
stream_ = stub_->GetValues(&context_);
}
if (methods->QueryInterceptionHookPoint(
grpc::experimental::InterceptionHookPoints::PRE_SEND_MESSAGE)) {
// We know that clients perform a Read and a Write in a loop, so we don't
// need to maintain a list of the responses.
std::string requested_key;
const keyvaluestore::Request* req_msg =
static_cast<const keyvaluestore::Request*>(methods->GetSendMessage());
if (req_msg != nullptr) {
requested_key = req_msg->key();
} else {
// The non-serialized form would not be available in certain scenarios,
// so add a fallback
keyvaluestore::Request req_msg;
auto* buffer = methods->GetSerializedSendMessage();
auto copied_buffer = *buffer;
GPR_ASSERT(
grpc::SerializationTraits<keyvaluestore::Request>::Deserialize(
&copied_buffer, &req_msg)
.ok());
requested_key = req_msg.key();
}
// Check if the key is present in the map
auto search = cached_map_.find(requested_key);
if (search != cached_map_.end()) {
std::cout << "Key " << requested_key << "found in map";
response_ = search->second;
} else {
std::cout << "Key " << requested_key << "not found in cache";
// Key was not found in the cache, so make a request
keyvaluestore::Request req;
req.set_key(requested_key);
stream_->Write(req);
keyvaluestore::Response resp;
stream_->Read(&resp);
response_ = resp.value();
// Insert the pair in the cache for future requests
cached_map_.insert({requested_key, response_});
}
}
if (methods->QueryInterceptionHookPoint(
grpc::experimental::InterceptionHookPoints::PRE_SEND_CLOSE)) {
stream_->WritesDone();
}
if (methods->QueryInterceptionHookPoint(
grpc::experimental::InterceptionHookPoints::PRE_RECV_MESSAGE)) {
keyvaluestore::Response* resp =
static_cast<keyvaluestore::Response*>(methods->GetRecvMessage());
resp->set_value(response_);
}
if (methods->QueryInterceptionHookPoint(
grpc::experimental::InterceptionHookPoints::PRE_RECV_STATUS)) {
auto* status = methods->GetRecvStatus();
*status = grpc::Status::OK;
}
// One of Hijack or Proceed always needs to be called to make progress.
if (hijack) {
// Hijack is called only once when PRE_SEND_INITIAL_METADATA is present in
// the hook points
methods->Hijack();
} else {
// Proceed is an indicator that the interceptor is done intercepting the
// batch.
methods->Proceed();
}
}
private:
grpc::ClientContext context_;
std::unique_ptr<keyvaluestore::KeyValueStore::Stub> stub_;
std::unique_ptr<
grpc::ClientReaderWriter<keyvaluestore::Request, keyvaluestore::Response>>
stream_;
std::map<std::string, std::string> cached_map_;
std::string response_;
};
class CachingInterceptorFactory
: public grpc::experimental::ClientInterceptorFactoryInterface {
public:
grpc::experimental::Interceptor* CreateClientInterceptor(
grpc::experimental::ClientRpcInfo* info) override {
return new CachingInterceptor(info);
}
};

@ -23,6 +23,8 @@
#include <grpcpp/grpcpp.h>
#include "caching_interceptor.h"
#ifdef BAZEL_BUILD
#include "examples/protos/keyvaluestore.grpc.pb.h"
#else
@ -77,9 +79,20 @@ int main(int argc, char** argv) {
// are created. This channel models a connection to an endpoint (in this case,
// localhost at port 50051). We indicate that the channel isn't authenticated
// (use of InsecureChannelCredentials()).
KeyValueStoreClient client(grpc::CreateChannel(
"localhost:50051", grpc::InsecureChannelCredentials()));
std::vector<std::string> keys = {"key1", "key2", "key3", "key4", "key5"};
// In this example, we are using a cache which has been added in as an
// interceptor.
grpc::ChannelArguments args;
std::vector<
std::unique_ptr<grpc::experimental::ClientInterceptorFactoryInterface>>
interceptor_creators;
interceptor_creators.push_back(std::unique_ptr<CachingInterceptorFactory>(
new CachingInterceptorFactory()));
auto channel = grpc::experimental::CreateCustomChannelWithInterceptors(
"localhost:50051", grpc::InsecureChannelCredentials(), args,
std::move(interceptor_creators));
KeyValueStoreClient client(channel);
std::vector<std::string> keys = {"key1", "key2", "key3", "key4",
"key5", "key1", "key2", "key4"};
client.GetValues(keys);
return 0;

@ -968,19 +968,19 @@ static grpc_closure_scheduler* write_scheduler(grpc_chttp2_transport* t,
get better latency overall if we switch writing work elsewhere and continue
with application work above */
if (!t->is_first_write_in_batch) {
return grpc_executor_scheduler(GRPC_EXECUTOR_SHORT);
return grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT);
}
/* equivalently, if it's a partial write, we *know* we're going to be taking a
thread jump to write it because of the above, may as well do so
immediately */
if (partial_write) {
return grpc_executor_scheduler(GRPC_EXECUTOR_SHORT);
return grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT);
}
switch (t->opt_target) {
case GRPC_CHTTP2_OPTIMIZE_FOR_THROUGHPUT:
/* executor gives us the largest probability of being able to batch a
* write with others on this transport */
return grpc_executor_scheduler(GRPC_EXECUTOR_SHORT);
return grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT);
case GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY:
return grpc_schedule_on_exec_ctx;
}

@ -83,8 +83,9 @@ grpc_combiner* grpc_combiner_create(void) {
gpr_atm_no_barrier_store(&lock->state, STATE_UNORPHANED);
gpr_mpscq_init(&lock->queue);
grpc_closure_list_init(&lock->final_list);
GRPC_CLOSURE_INIT(&lock->offload, offload, lock,
grpc_executor_scheduler(GRPC_EXECUTOR_SHORT));
GRPC_CLOSURE_INIT(
&lock->offload, offload, lock,
grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT));
GRPC_COMBINER_TRACE(gpr_log(GPR_INFO, "C:%p create", lock));
return lock;
}
@ -235,7 +236,7 @@ bool grpc_combiner_continue_exec_ctx() {
// 3. the DEFAULT executor is threaded
// 4. the current thread is not a worker for any background poller
if (contended && grpc_core::ExecCtx::Get()->IsReadyToFinish() &&
grpc_executor_is_threaded() &&
grpc_core::Executor::IsThreadedDefault() &&
!grpc_iomgr_is_any_background_poller_thread()) {
GPR_TIMER_MARK("offload_from_finished_exec_ctx", 0);
// this execution context wants to move on: schedule remaining work to be

@ -45,20 +45,70 @@
gpr_log(GPR_INFO, "EXECUTOR " str); \
}
grpc_core::TraceFlag executor_trace(false, "executor");
namespace grpc_core {
namespace {
GPR_TLS_DECL(g_this_thread_state);
GrpcExecutor::GrpcExecutor(const char* name) : name_(name) {
Executor* executors[static_cast<size_t>(ExecutorType::NUM_EXECUTORS)];
void default_enqueue_short(grpc_closure* closure, grpc_error* error) {
executors[static_cast<size_t>(ExecutorType::DEFAULT)]->Enqueue(
closure, error, true /* is_short */);
}
void default_enqueue_long(grpc_closure* closure, grpc_error* error) {
executors[static_cast<size_t>(ExecutorType::DEFAULT)]->Enqueue(
closure, error, false /* is_short */);
}
void resolver_enqueue_short(grpc_closure* closure, grpc_error* error) {
executors[static_cast<size_t>(ExecutorType::RESOLVER)]->Enqueue(
closure, error, true /* is_short */);
}
void resolver_enqueue_long(grpc_closure* closure, grpc_error* error) {
executors[static_cast<size_t>(ExecutorType::RESOLVER)]->Enqueue(
closure, error, false /* is_short */);
}
const grpc_closure_scheduler_vtable
vtables_[static_cast<size_t>(ExecutorType::NUM_EXECUTORS)]
[static_cast<size_t>(ExecutorJobType::NUM_JOB_TYPES)] = {
{{&default_enqueue_short, &default_enqueue_short,
"def-ex-short"},
{&default_enqueue_long, &default_enqueue_long, "def-ex-long"}},
{{&resolver_enqueue_short, &resolver_enqueue_short,
"res-ex-short"},
{&resolver_enqueue_long, &resolver_enqueue_long,
"res-ex-long"}}};
grpc_closure_scheduler
schedulers_[static_cast<size_t>(ExecutorType::NUM_EXECUTORS)]
[static_cast<size_t>(ExecutorJobType::NUM_JOB_TYPES)] = {
{{&vtables_[static_cast<size_t>(ExecutorType::DEFAULT)]
[static_cast<size_t>(ExecutorJobType::SHORT)]},
{&vtables_[static_cast<size_t>(ExecutorType::DEFAULT)]
[static_cast<size_t>(ExecutorJobType::LONG)]}},
{{&vtables_[static_cast<size_t>(ExecutorType::RESOLVER)]
[static_cast<size_t>(ExecutorJobType::SHORT)]},
{&vtables_[static_cast<size_t>(ExecutorType::RESOLVER)]
[static_cast<size_t>(ExecutorJobType::LONG)]}}};
} // namespace
TraceFlag executor_trace(false, "executor");
Executor::Executor(const char* name) : name_(name) {
adding_thread_lock_ = GPR_SPINLOCK_STATIC_INITIALIZER;
gpr_atm_rel_store(&num_threads_, 0);
max_threads_ = GPR_MAX(1, 2 * gpr_cpu_num_cores());
}
void GrpcExecutor::Init() { SetThreading(true); }
void Executor::Init() { SetThreading(true); }
size_t GrpcExecutor::RunClosures(const char* executor_name,
grpc_closure_list list) {
size_t Executor::RunClosures(const char* executor_name,
grpc_closure_list list) {
size_t n = 0;
grpc_closure* c = list.head;
@ -82,11 +132,11 @@ size_t GrpcExecutor::RunClosures(const char* executor_name,
return n;
}
bool GrpcExecutor::IsThreaded() const {
bool Executor::IsThreaded() const {
return gpr_atm_acq_load(&num_threads_) > 0;
}
void GrpcExecutor::SetThreading(bool threading) {
void Executor::SetThreading(bool threading) {
gpr_atm curr_num_threads = gpr_atm_acq_load(&num_threads_);
EXECUTOR_TRACE("(%s) SetThreading(%d) begin", name_, threading);
@ -112,7 +162,7 @@ void GrpcExecutor::SetThreading(bool threading) {
}
thd_state_[0].thd =
grpc_core::Thread(name_, &GrpcExecutor::ThreadMain, &thd_state_[0]);
grpc_core::Thread(name_, &Executor::ThreadMain, &thd_state_[0]);
thd_state_[0].thd.Start();
} else { // !threading
if (curr_num_threads == 0) {
@ -153,9 +203,9 @@ void GrpcExecutor::SetThreading(bool threading) {
EXECUTOR_TRACE("(%s) SetThreading(%d) done", name_, threading);
}
void GrpcExecutor::Shutdown() { SetThreading(false); }
void Executor::Shutdown() { SetThreading(false); }
void GrpcExecutor::ThreadMain(void* arg) {
void Executor::ThreadMain(void* arg) {
ThreadState* ts = static_cast<ThreadState*>(arg);
gpr_tls_set(&g_this_thread_state, reinterpret_cast<intptr_t>(ts));
@ -192,8 +242,8 @@ void GrpcExecutor::ThreadMain(void* arg) {
}
}
void GrpcExecutor::Enqueue(grpc_closure* closure, grpc_error* error,
bool is_short) {
void Executor::Enqueue(grpc_closure* closure, grpc_error* error,
bool is_short) {
bool retry_push;
if (is_short) {
GRPC_STATS_INC_EXECUTOR_SCHEDULED_SHORT_ITEMS();
@ -304,7 +354,7 @@ void GrpcExecutor::Enqueue(grpc_closure* closure, grpc_error* error,
gpr_atm_rel_store(&num_threads_, cur_thread_count + 1);
thd_state_[cur_thread_count].thd = grpc_core::Thread(
name_, &GrpcExecutor::ThreadMain, &thd_state_[cur_thread_count]);
name_, &Executor::ThreadMain, &thd_state_[cur_thread_count]);
thd_state_[cur_thread_count].thd.Start();
}
gpr_spinlock_unlock(&adding_thread_lock_);
@ -316,85 +366,52 @@ void GrpcExecutor::Enqueue(grpc_closure* closure, grpc_error* error,
} while (retry_push);
}
static GrpcExecutor* executors[GRPC_NUM_EXECUTORS];
void default_enqueue_short(grpc_closure* closure, grpc_error* error) {
executors[GRPC_DEFAULT_EXECUTOR]->Enqueue(closure, error,
true /* is_short */);
}
void default_enqueue_long(grpc_closure* closure, grpc_error* error) {
executors[GRPC_DEFAULT_EXECUTOR]->Enqueue(closure, error,
false /* is_short */);
}
void resolver_enqueue_short(grpc_closure* closure, grpc_error* error) {
executors[GRPC_RESOLVER_EXECUTOR]->Enqueue(closure, error,
true /* is_short */);
}
void resolver_enqueue_long(grpc_closure* closure, grpc_error* error) {
executors[GRPC_RESOLVER_EXECUTOR]->Enqueue(closure, error,
false /* is_short */);
}
static const grpc_closure_scheduler_vtable
vtables_[GRPC_NUM_EXECUTORS][GRPC_NUM_EXECUTOR_JOB_TYPES] = {
{{&default_enqueue_short, &default_enqueue_short, "def-ex-short"},
{&default_enqueue_long, &default_enqueue_long, "def-ex-long"}},
{{&resolver_enqueue_short, &resolver_enqueue_short, "res-ex-short"},
{&resolver_enqueue_long, &resolver_enqueue_long, "res-ex-long"}}};
static grpc_closure_scheduler
schedulers_[GRPC_NUM_EXECUTORS][GRPC_NUM_EXECUTOR_JOB_TYPES] = {
{{&vtables_[GRPC_DEFAULT_EXECUTOR][GRPC_EXECUTOR_SHORT]},
{&vtables_[GRPC_DEFAULT_EXECUTOR][GRPC_EXECUTOR_LONG]}},
{{&vtables_[GRPC_RESOLVER_EXECUTOR][GRPC_EXECUTOR_SHORT]},
{&vtables_[GRPC_RESOLVER_EXECUTOR][GRPC_EXECUTOR_LONG]}}};
// grpc_executor_init() and grpc_executor_shutdown() functions are called in the
// Executor::InitAll() and Executor::ShutdownAll() functions are called in the
// the grpc_init() and grpc_shutdown() code paths which are protected by a
// global mutex. So it is okay to assume that these functions are thread-safe
void grpc_executor_init() {
EXECUTOR_TRACE0("grpc_executor_init() enter");
void Executor::InitAll() {
EXECUTOR_TRACE0("Executor::InitAll() enter");
// Return if grpc_executor_init() is already called earlier
if (executors[GRPC_DEFAULT_EXECUTOR] != nullptr) {
GPR_ASSERT(executors[GRPC_RESOLVER_EXECUTOR] != nullptr);
// Return if Executor::InitAll() is already called earlier
if (executors[static_cast<size_t>(ExecutorType::DEFAULT)] != nullptr) {
GPR_ASSERT(executors[static_cast<size_t>(ExecutorType::RESOLVER)] !=
nullptr);
return;
}
executors[GRPC_DEFAULT_EXECUTOR] =
grpc_core::New<GrpcExecutor>("default-executor");
executors[GRPC_RESOLVER_EXECUTOR] =
grpc_core::New<GrpcExecutor>("resolver-executor");
executors[static_cast<size_t>(ExecutorType::DEFAULT)] =
grpc_core::New<Executor>("default-executor");
executors[static_cast<size_t>(ExecutorType::RESOLVER)] =
grpc_core::New<Executor>("resolver-executor");
executors[GRPC_DEFAULT_EXECUTOR]->Init();
executors[GRPC_RESOLVER_EXECUTOR]->Init();
executors[static_cast<size_t>(ExecutorType::DEFAULT)]->Init();
executors[static_cast<size_t>(ExecutorType::RESOLVER)]->Init();
EXECUTOR_TRACE0("grpc_executor_init() done");
EXECUTOR_TRACE0("Executor::InitAll() done");
}
grpc_closure_scheduler* grpc_executor_scheduler(GrpcExecutorType executor_type,
GrpcExecutorJobType job_type) {
return &schedulers_[executor_type][job_type];
grpc_closure_scheduler* Executor::Scheduler(ExecutorType executor_type,
ExecutorJobType job_type) {
return &schedulers_[static_cast<size_t>(executor_type)]
[static_cast<size_t>(job_type)];
}
grpc_closure_scheduler* grpc_executor_scheduler(GrpcExecutorJobType job_type) {
return grpc_executor_scheduler(GRPC_DEFAULT_EXECUTOR, job_type);
grpc_closure_scheduler* Executor::Scheduler(ExecutorJobType job_type) {
return Executor::Scheduler(ExecutorType::DEFAULT, job_type);
}
void grpc_executor_shutdown() {
EXECUTOR_TRACE0("grpc_executor_shutdown() enter");
void Executor::ShutdownAll() {
EXECUTOR_TRACE0("Executor::ShutdownAll() enter");
// Return if grpc_executor_shutdown() is already called earlier
if (executors[GRPC_DEFAULT_EXECUTOR] == nullptr) {
GPR_ASSERT(executors[GRPC_RESOLVER_EXECUTOR] == nullptr);
// Return if Executor:SshutdownAll() is already called earlier
if (executors[static_cast<size_t>(ExecutorType::DEFAULT)] == nullptr) {
GPR_ASSERT(executors[static_cast<size_t>(ExecutorType::RESOLVER)] ==
nullptr);
return;
}
executors[GRPC_DEFAULT_EXECUTOR]->Shutdown();
executors[GRPC_RESOLVER_EXECUTOR]->Shutdown();
executors[static_cast<size_t>(ExecutorType::DEFAULT)]->Shutdown();
executors[static_cast<size_t>(ExecutorType::RESOLVER)]->Shutdown();
// Delete the executor objects.
//
@ -408,26 +425,36 @@ void grpc_executor_shutdown() {
// By ensuring that all executors are shutdown first, we are also ensuring
// that no thread is active across all executors.
grpc_core::Delete<GrpcExecutor>(executors[GRPC_DEFAULT_EXECUTOR]);
grpc_core::Delete<GrpcExecutor>(executors[GRPC_RESOLVER_EXECUTOR]);
executors[GRPC_DEFAULT_EXECUTOR] = nullptr;
executors[GRPC_RESOLVER_EXECUTOR] = nullptr;
grpc_core::Delete<Executor>(
executors[static_cast<size_t>(ExecutorType::DEFAULT)]);
grpc_core::Delete<Executor>(
executors[static_cast<size_t>(ExecutorType::RESOLVER)]);
executors[static_cast<size_t>(ExecutorType::DEFAULT)] = nullptr;
executors[static_cast<size_t>(ExecutorType::RESOLVER)] = nullptr;
EXECUTOR_TRACE0("grpc_executor_shutdown() done");
EXECUTOR_TRACE0("Executor::ShutdownAll() done");
}
bool grpc_executor_is_threaded(GrpcExecutorType executor_type) {
GPR_ASSERT(executor_type < GRPC_NUM_EXECUTORS);
return executors[executor_type]->IsThreaded();
bool Executor::IsThreaded(ExecutorType executor_type) {
GPR_ASSERT(executor_type < ExecutorType::NUM_EXECUTORS);
return executors[static_cast<size_t>(executor_type)]->IsThreaded();
}
bool grpc_executor_is_threaded() {
return grpc_executor_is_threaded(GRPC_DEFAULT_EXECUTOR);
bool Executor::IsThreadedDefault() {
return Executor::IsThreaded(ExecutorType::DEFAULT);
}
void grpc_executor_set_threading(bool enable) {
EXECUTOR_TRACE("grpc_executor_set_threading(%d) called", enable);
for (int i = 0; i < GRPC_NUM_EXECUTORS; i++) {
void Executor::SetThreadingAll(bool enable) {
EXECUTOR_TRACE("Executor::SetThreadingAll(%d) called", enable);
for (size_t i = 0; i < static_cast<size_t>(ExecutorType::NUM_EXECUTORS);
i++) {
executors[i]->SetThreading(enable);
}
}
void Executor::SetThreadingDefault(bool enable) {
EXECUTOR_TRACE("Executor::SetThreadingDefault(%d) called", enable);
executors[static_cast<size_t>(ExecutorType::DEFAULT)]->SetThreading(enable);
}
} // namespace grpc_core

@ -25,7 +25,9 @@
#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/iomgr/closure.h"
typedef struct {
namespace grpc_core {
struct ThreadState {
gpr_mu mu;
size_t id; // For debugging purposes
const char* name; // Thread state name
@ -35,17 +37,24 @@ typedef struct {
bool shutdown;
bool queued_long_job;
grpc_core::Thread thd;
} ThreadState;
};
typedef enum {
GRPC_EXECUTOR_SHORT = 0,
GRPC_EXECUTOR_LONG,
GRPC_NUM_EXECUTOR_JOB_TYPES // Add new values above this
} GrpcExecutorJobType;
enum class ExecutorType {
DEFAULT = 0,
RESOLVER,
NUM_EXECUTORS // Add new values above this
};
class GrpcExecutor {
enum class ExecutorJobType {
SHORT = 0,
LONG,
NUM_JOB_TYPES // Add new values above this
};
class Executor {
public:
GrpcExecutor(const char* executor_name);
Executor(const char* executor_name);
void Init();
@ -62,55 +71,51 @@ class GrpcExecutor {
* a short job (i.e expected to not block and complete quickly) */
void Enqueue(grpc_closure* closure, grpc_error* error, bool is_short);
private:
static size_t RunClosures(const char* executor_name, grpc_closure_list list);
static void ThreadMain(void* arg);
// TODO(sreek): Currently we have two executors (available globally): The
// default executor and the resolver executor.
//
// Some of the functions below operate on the DEFAULT executor only while some
// operate of ALL the executors. This is a bit confusing and should be cleaned
// up in future (where we make all the following functions take ExecutorType
// and/or JobType)
const char* name_;
ThreadState* thd_state_;
size_t max_threads_;
gpr_atm num_threads_;
gpr_spinlock adding_thread_lock_;
};
// == Global executor functions ==
// Initialize ALL the executors
static void InitAll();
typedef enum {
GRPC_DEFAULT_EXECUTOR = 0,
GRPC_RESOLVER_EXECUTOR,
// Shutdown ALL the executors
static void ShutdownAll();
GRPC_NUM_EXECUTORS // Add new values above this
} GrpcExecutorType;
// Set the threading mode for ALL the executors
static void SetThreadingAll(bool enable);
// TODO(sreek): Currently we have two executors (available globally): The
// default executor and the resolver executor.
//
// Some of the functions below operate on the DEFAULT executor only while some
// operate of ALL the executors. This is a bit confusing and should be cleaned
// up in future (where we make all the following functions take executor_type
// and/or job_type)
// Set the threading mode for ALL the executors
static void SetThreadingDefault(bool enable);
// Initialize ALL the executors
void grpc_executor_init();
// Get the DEFAULT executor scheduler for the given job_type
static grpc_closure_scheduler* Scheduler(ExecutorJobType job_type);
// Shutdown ALL the executors
void grpc_executor_shutdown();
// Get the executor scheduler for a given executor_type and a job_type
static grpc_closure_scheduler* Scheduler(ExecutorType executor_type,
ExecutorJobType job_type);
// Set the threading mode for ALL the executors
void grpc_executor_set_threading(bool enable);
// Return if a given executor is running in threaded mode (i.e if
// SetThreading(true) was called previously on that executor)
static bool IsThreaded(ExecutorType executor_type);
// Get the DEFAULT executor scheduler for the given job_type
grpc_closure_scheduler* grpc_executor_scheduler(GrpcExecutorJobType job_type);
// Return if the DEFAULT executor is threaded
static bool IsThreadedDefault();
// Get the executor scheduler for a given executor_type and a job_type
grpc_closure_scheduler* grpc_executor_scheduler(GrpcExecutorType executor_type,
GrpcExecutorJobType job_type);
private:
static size_t RunClosures(const char* executor_name, grpc_closure_list list);
static void ThreadMain(void* arg);
// Return if a given executor is running in threaded mode (i.e if
// grpc_executor_set_threading(true) was called previously on that executor)
bool grpc_executor_is_threaded(GrpcExecutorType executor_type);
const char* name_;
ThreadState* thd_state_;
size_t max_threads_;
gpr_atm num_threads_;
gpr_spinlock adding_thread_lock_;
};
// Return if the DEFAULT executor is threaded
bool grpc_executor_is_threaded();
} // namespace grpc_core
#endif /* GRPC_CORE_LIB_IOMGR_EXECUTOR_H */

@ -71,7 +71,7 @@ void grpc_prefork() {
return;
}
grpc_timer_manager_set_threading(false);
grpc_executor_set_threading(false);
grpc_core::Executor::SetThreadingAll(false);
grpc_core::ExecCtx::Get()->Flush();
grpc_core::Fork::AwaitThreads();
skipped_handler = false;
@ -82,7 +82,7 @@ void grpc_postfork_parent() {
grpc_core::Fork::AllowExecCtx();
grpc_core::ExecCtx exec_ctx;
grpc_timer_manager_set_threading(true);
grpc_executor_set_threading(true);
grpc_core::Executor::SetThreadingAll(true);
}
}
@ -96,7 +96,7 @@ void grpc_postfork_child() {
reset_polling_engine();
}
grpc_timer_manager_set_threading(true);
grpc_executor_set_threading(true);
grpc_core::Executor::SetThreadingAll(true);
}
}

@ -52,7 +52,7 @@ void grpc_iomgr_init() {
g_shutdown = 0;
gpr_mu_init(&g_mu);
gpr_cv_init(&g_rcv);
grpc_executor_init();
grpc_core::Executor::InitAll();
grpc_timer_list_init();
g_root_object.next = g_root_object.prev = &g_root_object;
g_root_object.name = (char*)"root";
@ -88,7 +88,7 @@ void grpc_iomgr_shutdown() {
{
grpc_timer_manager_shutdown();
grpc_iomgr_platform_flush();
grpc_executor_shutdown();
grpc_core::Executor::ShutdownAll();
gpr_mu_lock(&g_mu);
g_shutdown = 1;

@ -34,7 +34,7 @@ gpr_thd_id g_init_thread;
static void iomgr_platform_init(void) {
grpc_core::ExecCtx exec_ctx;
grpc_executor_set_threading(false);
grpc_core::Executor::SetThreadingAll(false);
g_init_thread = gpr_thd_currentid();
grpc_pollset_global_init();
}

@ -150,7 +150,7 @@ typedef struct {
void* arg;
} request;
/* Callback to be passed to grpc_executor to asynch-ify
/* Callback to be passed to grpc Executor to asynch-ify
* grpc_blocking_resolve_address */
static void do_request_thread(void* rp, grpc_error* error) {
request* r = static_cast<request*>(rp);
@ -168,7 +168,8 @@ static void posix_resolve_address(const char* name, const char* default_port,
request* r = static_cast<request*>(gpr_malloc(sizeof(request)));
GRPC_CLOSURE_INIT(
&r->request_closure, do_request_thread, r,
grpc_executor_scheduler(GRPC_RESOLVER_EXECUTOR, GRPC_EXECUTOR_SHORT));
grpc_core::Executor::Scheduler(grpc_core::ExecutorType::RESOLVER,
grpc_core::ExecutorJobType::SHORT));
r->name = gpr_strdup(name);
r->default_port = gpr_strdup(default_port);
r->on_done = on_done;

@ -153,7 +153,8 @@ static void windows_resolve_address(const char* name, const char* default_port,
request* r = (request*)gpr_malloc(sizeof(request));
GRPC_CLOSURE_INIT(
&r->request_closure, do_request_thread, r,
grpc_executor_scheduler(GRPC_RESOLVER_EXECUTOR, GRPC_EXECUTOR_SHORT));
grpc_core::Executor::Scheduler(grpc_core::ExecutorType::RESOLVER,
grpc_core::ExecutorJobType::SHORT));
r->name = gpr_strdup(name);
r->default_port = gpr_strdup(default_port);
r->on_done = on_done;

@ -227,10 +227,10 @@ static void cover_self(grpc_tcp* tcp) {
}
grpc_pollset_init(BACKUP_POLLER_POLLSET(p), &p->pollset_mu);
gpr_atm_rel_store(&g_backup_poller, (gpr_atm)p);
GRPC_CLOSURE_SCHED(
GRPC_CLOSURE_INIT(&p->run_poller, run_poller, p,
grpc_executor_scheduler(GRPC_EXECUTOR_LONG)),
GRPC_ERROR_NONE);
GRPC_CLOSURE_SCHED(GRPC_CLOSURE_INIT(&p->run_poller, run_poller, p,
grpc_core::Executor::Scheduler(
grpc_core::ExecutorJobType::LONG)),
GRPC_ERROR_NONE);
} else {
while ((p = (backup_poller*)gpr_atm_acq_load(&g_backup_poller)) ==
nullptr) {

@ -481,8 +481,9 @@ void GrpcUdpListener::OnRead(grpc_error* error, void* do_read_arg) {
if (udp_handler_->Read()) {
/* There maybe more packets to read. Schedule read_more_cb_ closure to run
* after finishing this event loop. */
GRPC_CLOSURE_INIT(&do_read_closure_, do_read, do_read_arg,
grpc_executor_scheduler(GRPC_EXECUTOR_LONG));
GRPC_CLOSURE_INIT(
&do_read_closure_, do_read, do_read_arg,
grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::LONG));
GRPC_CLOSURE_SCHED(&do_read_closure_, GRPC_ERROR_NONE);
} else {
/* Finish reading all the packets, re-arm the notification event so we can
@ -542,8 +543,9 @@ void GrpcUdpListener::OnCanWrite(grpc_error* error, void* do_write_arg) {
}
/* Schedule actual write in another thread. */
GRPC_CLOSURE_INIT(&do_write_closure_, do_write, do_write_arg,
grpc_executor_scheduler(GRPC_EXECUTOR_LONG));
GRPC_CLOSURE_INIT(
&do_write_closure_, do_write, do_write_arg,
grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::LONG));
GRPC_CLOSURE_SCHED(&do_write_closure_, GRPC_ERROR_NONE);
}

@ -165,7 +165,7 @@ void grpc_shutdown(void) {
{
grpc_timer_manager_set_threading(
false); // shutdown timer_manager thread
grpc_executor_shutdown();
grpc_core::Executor::ShutdownAll();
for (i = g_number_of_plugins; i >= 0; i--) {
if (g_all_of_the_plugins[i].destroy != nullptr) {
g_all_of_the_plugins[i].destroy();

@ -1134,8 +1134,9 @@ void grpc_server_start(grpc_server* server) {
server_ref(server);
server->starting = true;
GRPC_CLOSURE_SCHED(
GRPC_CLOSURE_CREATE(start_listeners, server,
grpc_executor_scheduler(GRPC_EXECUTOR_SHORT)),
GRPC_CLOSURE_CREATE(
start_listeners, server,
grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT)),
GRPC_ERROR_NONE);
}

@ -73,7 +73,7 @@ void grpc_stream_unref(grpc_stream_refcount* refcount) {
Throw this over to the executor (on a core-owned thread) and process it
there. */
refcount->destroy.scheduler =
grpc_executor_scheduler(GRPC_EXECUTOR_SHORT);
grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT);
}
GRPC_CLOSURE_SCHED(&refcount->destroy, GRPC_ERROR_NONE);
}

@ -57,6 +57,7 @@ ClientContext::ClientContext()
deadline_(gpr_inf_future(GPR_CLOCK_REALTIME)),
census_context_(nullptr),
propagate_from_call_(nullptr),
compression_algorithm_(GRPC_COMPRESS_NONE),
initial_metadata_corked_(false) {
g_client_callbacks->DefaultConstructor(this);
}

@ -19,7 +19,6 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core;
namespace Grpc.Core.Testing
{
@ -36,23 +35,73 @@ namespace Grpc.Core.Testing
string peer, AuthContext authContext, ContextPropagationToken contextPropagationToken,
Func<Metadata, Task> writeHeadersFunc, Func<WriteOptions> writeOptionsGetter, Action<WriteOptions> writeOptionsSetter)
{
return new ServerCallContext(null, method, host, deadline, requestHeaders, cancellationToken,
writeHeadersFunc, new WriteOptionsHolder(writeOptionsGetter, writeOptionsSetter),
() => peer, () => authContext, () => contextPropagationToken);
return new TestingServerCallContext(method, host, deadline, requestHeaders, cancellationToken, peer,
authContext, contextPropagationToken, writeHeadersFunc, writeOptionsGetter, writeOptionsSetter);
}
private class WriteOptionsHolder : IHasWriteOptions
private class TestingServerCallContext : ServerCallContext
{
Func<WriteOptions> writeOptionsGetter;
Action<WriteOptions> writeOptionsSetter;
private readonly string method;
private readonly string host;
private readonly DateTime deadline;
private readonly Metadata requestHeaders;
private readonly CancellationToken cancellationToken;
private readonly Metadata responseTrailers = new Metadata();
private Status status;
private readonly string peer;
private readonly AuthContext authContext;
private readonly ContextPropagationToken contextPropagationToken;
private readonly Func<Metadata, Task> writeHeadersFunc;
private readonly Func<WriteOptions> writeOptionsGetter;
private readonly Action<WriteOptions> writeOptionsSetter;
public WriteOptionsHolder(Func<WriteOptions> writeOptionsGetter, Action<WriteOptions> writeOptionsSetter)
public TestingServerCallContext(string method, string host, DateTime deadline, Metadata requestHeaders, CancellationToken cancellationToken,
string peer, AuthContext authContext, ContextPropagationToken contextPropagationToken,
Func<Metadata, Task> writeHeadersFunc, Func<WriteOptions> writeOptionsGetter, Action<WriteOptions> writeOptionsSetter)
{
this.method = method;
this.host = host;
this.deadline = deadline;
this.requestHeaders = requestHeaders;
this.cancellationToken = cancellationToken;
this.responseTrailers = new Metadata();
this.status = Status.DefaultSuccess;
this.peer = peer;
this.authContext = authContext;
this.contextPropagationToken = contextPropagationToken;
this.writeHeadersFunc = writeHeadersFunc;
this.writeOptionsGetter = writeOptionsGetter;
this.writeOptionsSetter = writeOptionsSetter;
}
public WriteOptions WriteOptions { get => writeOptionsGetter(); set => writeOptionsSetter(value); }
protected override string MethodCore => method;
protected override string HostCore => host;
protected override string PeerCore => peer;
protected override DateTime DeadlineCore => deadline;
protected override Metadata RequestHeadersCore => requestHeaders;
protected override CancellationToken CancellationTokenCore => cancellationToken;
protected override Metadata ResponseTrailersCore => responseTrailers;
protected override Status StatusCore { get => status; set => status = value; }
protected override WriteOptions WriteOptionsCore { get => writeOptionsGetter(); set => writeOptionsSetter(value); }
protected override AuthContext AuthContextCore => authContext;
protected override ContextPropagationToken CreatePropagationTokenCore(ContextPropagationOptions options)
{
return contextPropagationToken;
}
protected override Task WriteResponseHeadersAsyncCore(Metadata responseHeaders)
{
return writeHeadersFunc(responseHeaders);
}
}
}
}

@ -0,0 +1,110 @@
#region Copyright notice and license
// Copyright 2019 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#endregion
using System;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core.Internal;
namespace Grpc.Core
{
/// <summary>
/// Default implementation of <c>ServerCallContext</c>.
/// </summary>
internal class DefaultServerCallContext : ServerCallContext
{
private readonly CallSafeHandle callHandle;
private readonly string method;
private readonly string host;
private readonly DateTime deadline;
private readonly Metadata requestHeaders;
private readonly CancellationToken cancellationToken;
private readonly Metadata responseTrailers;
private Status status;
private readonly IServerResponseStream serverResponseStream;
private readonly Lazy<AuthContext> authContext;
/// <summary>
/// Creates a new instance of <c>ServerCallContext</c>.
/// To allow reuse of ServerCallContext API by different gRPC implementations, the implementation of some members is provided externally.
/// To provide state, this <c>ServerCallContext</c> instance and <c>extraData</c> will be passed to the member implementations.
/// </summary>
internal DefaultServerCallContext(CallSafeHandle callHandle, string method, string host, DateTime deadline,
Metadata requestHeaders, CancellationToken cancellationToken, IServerResponseStream serverResponseStream)
{
this.callHandle = callHandle;
this.method = method;
this.host = host;
this.deadline = deadline;
this.requestHeaders = requestHeaders;
this.cancellationToken = cancellationToken;
this.responseTrailers = new Metadata();
this.status = Status.DefaultSuccess;
this.serverResponseStream = serverResponseStream;
// TODO(jtattermusch): avoid unnecessary allocation of factory function and the lazy object
this.authContext = new Lazy<AuthContext>(GetAuthContextEager);
}
protected override ContextPropagationToken CreatePropagationTokenCore(ContextPropagationOptions options)
{
return new ContextPropagationToken(callHandle, deadline, cancellationToken, options);
}
protected override Task WriteResponseHeadersAsyncCore(Metadata responseHeaders)
{
return serverResponseStream.WriteResponseHeadersAsync(responseHeaders);
}
protected override string MethodCore => method;
protected override string HostCore => host;
protected override string PeerCore => callHandle.GetPeer();
protected override DateTime DeadlineCore => deadline;
protected override Metadata RequestHeadersCore => requestHeaders;
protected override CancellationToken CancellationTokenCore => cancellationToken;
protected override Metadata ResponseTrailersCore => responseTrailers;
protected override Status StatusCore
{
get => status;
set => status = value;
}
protected override WriteOptions WriteOptionsCore
{
get => serverResponseStream.WriteOptions;
set => serverResponseStream.WriteOptions = value;
}
protected override AuthContext AuthContextCore => authContext.Value;
private AuthContext GetAuthContextEager()
{
using (var authContextNative = callHandle.GetAuthContext())
{
return authContextNative.ToAuthContext();
}
}
}
}

@ -0,0 +1,38 @@
#region Copyright notice and license
// Copyright 2019 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#endregion
using System;
using System.Threading.Tasks;
using Grpc.Core.Internal;
namespace Grpc.Core.Internal
{
/// <summary>
/// Exposes non-generic members of <c>ServerReponseStream</c>.
/// </summary>
internal interface IServerResponseStream
{
/// <summary>
/// Asynchronously sends response headers for the current call to the client. See <c>ServerCallContext.WriteResponseHeadersAsync</c> for exact semantics.
/// </summary>
Task WriteResponseHeadersAsync(Metadata responseHeaders);
/// <summary>
/// Gets or sets the write options.
/// </summary>
WriteOptions WriteOptions { get; set; }
}
}

@ -71,7 +71,7 @@ namespace Grpc.Core.Internal
var response = await handler(request, context).ConfigureAwait(false);
status = context.Status;
responseWithFlags = new AsyncCallServer<TRequest, TResponse>.ResponseWithFlags(response, HandlerUtils.GetWriteFlags(context.WriteOptions));
}
}
catch (Exception e)
{
if (!(e is RpcException))
@ -345,14 +345,10 @@ namespace Grpc.Core.Internal
return writeOptions != null ? writeOptions.Flags : default(WriteFlags);
}
public static ServerCallContext NewContext<TRequest, TResponse>(ServerRpcNew newRpc, ServerResponseStream<TRequest, TResponse> serverResponseStream, CancellationToken cancellationToken)
where TRequest : class
where TResponse : class
public static ServerCallContext NewContext(ServerRpcNew newRpc, IServerResponseStream serverResponseStream, CancellationToken cancellationToken)
{
DateTime realtimeDeadline = newRpc.Deadline.ToClockType(ClockType.Realtime).ToDateTime();
return new ServerCallContext(newRpc.Call, newRpc.Method, newRpc.Host, realtimeDeadline,
newRpc.RequestMetadata, cancellationToken, serverResponseStream.WriteResponseHeadersAsync, serverResponseStream);
return new DefaultServerCallContext(newRpc.Call, newRpc.Method, newRpc.Host, realtimeDeadline, newRpc.RequestMetadata, cancellationToken, serverResponseStream);
}
}
}

@ -23,7 +23,7 @@ namespace Grpc.Core.Internal
/// <summary>
/// Writes responses asynchronously to an underlying AsyncCallServer object.
/// </summary>
internal class ServerResponseStream<TRequest, TResponse> : IServerStreamWriter<TResponse>, IHasWriteOptions
internal class ServerResponseStream<TRequest, TResponse> : IServerStreamWriter<TResponse>, IServerResponseStream
where TRequest : class
where TResponse : class
{

@ -20,54 +20,18 @@ using System;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core.Internal;
namespace Grpc.Core
{
/// <summary>
/// Context for a server-side call.
/// </summary>
public class ServerCallContext
public abstract class ServerCallContext
{
private readonly CallSafeHandle callHandle;
private readonly string method;
private readonly string host;
private readonly DateTime deadline;
private readonly Metadata requestHeaders;
private readonly CancellationToken cancellationToken;
private readonly Metadata responseTrailers = new Metadata();
private readonly Func<Metadata, Task> writeHeadersFunc;
private readonly IHasWriteOptions writeOptionsHolder;
private readonly Lazy<AuthContext> authContext;
private readonly Func<string> testingOnlyPeerGetter;
private readonly Func<AuthContext> testingOnlyAuthContextGetter;
private readonly Func<ContextPropagationToken> testingOnlyContextPropagationTokenFactory;
private Status status = Status.DefaultSuccess;
internal ServerCallContext(CallSafeHandle callHandle, string method, string host, DateTime deadline, Metadata requestHeaders, CancellationToken cancellationToken,
Func<Metadata, Task> writeHeadersFunc, IHasWriteOptions writeOptionsHolder)
: this(callHandle, method, host, deadline, requestHeaders, cancellationToken, writeHeadersFunc, writeOptionsHolder, null, null, null)
{
}
// Additional constructor params should be used for testing only
internal ServerCallContext(CallSafeHandle callHandle, string method, string host, DateTime deadline, Metadata requestHeaders, CancellationToken cancellationToken,
Func<Metadata, Task> writeHeadersFunc, IHasWriteOptions writeOptionsHolder,
Func<string> testingOnlyPeerGetter, Func<AuthContext> testingOnlyAuthContextGetter, Func<ContextPropagationToken> testingOnlyContextPropagationTokenFactory)
/// <summary>
/// Creates a new instance of <c>ServerCallContext</c>.
/// </summary>
protected ServerCallContext()
{
this.callHandle = callHandle;
this.method = method;
this.host = host;
this.deadline = deadline;
this.requestHeaders = requestHeaders;
this.cancellationToken = cancellationToken;
this.writeHeadersFunc = writeHeadersFunc;
this.writeOptionsHolder = writeOptionsHolder;
this.authContext = new Lazy<AuthContext>(GetAuthContextEager);
this.testingOnlyPeerGetter = testingOnlyPeerGetter;
this.testingOnlyAuthContextGetter = testingOnlyAuthContextGetter;
this.testingOnlyContextPropagationTokenFactory = testingOnlyContextPropagationTokenFactory;
}
/// <summary>
@ -79,7 +43,7 @@ namespace Grpc.Core
/// <returns>The task that finished once response headers have been written.</returns>
public Task WriteResponseHeadersAsync(Metadata responseHeaders)
{
return writeHeadersFunc(responseHeaders);
return WriteResponseHeadersAsyncCore(responseHeaders);
}
/// <summary>
@ -87,94 +51,41 @@ namespace Grpc.Core
/// </summary>
public ContextPropagationToken CreatePropagationToken(ContextPropagationOptions options = null)
{
if (testingOnlyContextPropagationTokenFactory != null)
{
return testingOnlyContextPropagationTokenFactory();
}
return new ContextPropagationToken(callHandle, deadline, cancellationToken, options);
return CreatePropagationTokenCore(options);
}
/// <summary>Name of method called in this RPC.</summary>
public string Method
{
get
{
return this.method;
}
}
public string Method => MethodCore;
/// <summary>Name of host called in this RPC.</summary>
public string Host
{
get
{
return this.host;
}
}
public string Host => HostCore;
/// <summary>Address of the remote endpoint in URI format.</summary>
public string Peer
{
get
{
if (testingOnlyPeerGetter != null)
{
return testingOnlyPeerGetter();
}
// Getting the peer lazily is fine as the native call is guaranteed
// not to be disposed before user-supplied server side handler returns.
// Most users won't need to read this field anyway.
return this.callHandle.GetPeer();
}
}
public string Peer => PeerCore;
/// <summary>Deadline for this RPC.</summary>
public DateTime Deadline
{
get
{
return this.deadline;
}
}
public DateTime Deadline => DeadlineCore;
/// <summary>Initial metadata sent by client.</summary>
public Metadata RequestHeaders
{
get
{
return this.requestHeaders;
}
}
public Metadata RequestHeaders => RequestHeadersCore;
/// <summary>Cancellation token signals when call is cancelled.</summary>
public CancellationToken CancellationToken
{
get
{
return this.cancellationToken;
}
}
public CancellationToken CancellationToken => CancellationTokenCore;
/// <summary>Trailers to send back to client after RPC finishes.</summary>
public Metadata ResponseTrailers
{
get
{
return this.responseTrailers;
}
}
public Metadata ResponseTrailers => ResponseTrailersCore;
/// <summary> Status to send back to client after RPC finishes.</summary>
public Status Status
{
get
{
return this.status;
return StatusCore;
}
set
{
status = value;
StatusCore = value;
}
}
@ -187,12 +98,12 @@ namespace Grpc.Core
{
get
{
return writeOptionsHolder.WriteOptions;
return WriteOptionsCore;
}
set
{
writeOptionsHolder.WriteOptions = value;
WriteOptionsCore = value;
}
}
@ -200,35 +111,31 @@ namespace Grpc.Core
/// Gets the <c>AuthContext</c> associated with this call.
/// Note: Access to AuthContext is an experimental API that can change without any prior notice.
/// </summary>
public AuthContext AuthContext
{
get
{
if (testingOnlyAuthContextGetter != null)
{
return testingOnlyAuthContextGetter();
}
return authContext.Value;
}
}
private AuthContext GetAuthContextEager()
{
using (var authContextNative = callHandle.GetAuthContext())
{
return authContextNative.ToAuthContext();
}
}
}
/// <summary>
/// Allows sharing write options between ServerCallContext and other objects.
/// </summary>
internal interface IHasWriteOptions
{
/// <summary>
/// Gets or sets the write options.
/// </summary>
WriteOptions WriteOptions { get; set; }
public AuthContext AuthContext => AuthContextCore;
/// <summary>Provides implementation of a non-virtual public member.</summary>
protected abstract Task WriteResponseHeadersAsyncCore(Metadata responseHeaders);
/// <summary>Provides implementation of a non-virtual public member.</summary>
protected abstract ContextPropagationToken CreatePropagationTokenCore(ContextPropagationOptions options);
/// <summary>Provides implementation of a non-virtual public member.</summary>
protected abstract string MethodCore { get; }
/// <summary>Provides implementation of a non-virtual public member.</summary>
protected abstract string HostCore { get; }
/// <summary>Provides implementation of a non-virtual public member.</summary>
protected abstract string PeerCore { get; }
/// <summary>Provides implementation of a non-virtual public member.</summary>
protected abstract DateTime DeadlineCore { get; }
/// <summary>Provides implementation of a non-virtual public member.</summary>
protected abstract Metadata RequestHeadersCore { get; }
/// <summary>Provides implementation of a non-virtual public member.</summary>
protected abstract CancellationToken CancellationTokenCore { get; }
/// <summary>Provides implementation of a non-virtual public member.</summary>
protected abstract Metadata ResponseTrailersCore { get; }
/// <summary>Provides implementation of a non-virtual public member.</summary>
protected abstract Status StatusCore { get; set; }
/// <summary>Provides implementation of a non-virtual public member.</summary>
protected abstract WriteOptions WriteOptionsCore { get; set; }
/// <summary>Provides implementation of a non-virtual public member.</summary>
protected abstract AuthContext AuthContextCore { get; }
}
}

@ -22,7 +22,6 @@ import grpc
from grpc import _common
from grpc import _grpcio_metadata
from grpc._cython import cygrpc
from grpc.framework.foundation import callable_util
_LOGGER = logging.getLogger(__name__)
@ -871,9 +870,11 @@ def _deliver(state, initial_connectivity, initial_callbacks):
while True:
for callback in callbacks:
cygrpc.block_if_fork_in_progress(state)
callable_util.call_logging_exceptions(
callback, _CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE,
connectivity)
try:
callback(connectivity)
except Exception: # pylint: disable=broad-except
_LOGGER.exception(
_CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE)
with state.lock:
callbacks = _deliveries(state)
if callbacks:

@ -25,7 +25,6 @@ import grpc
from grpc import _common
from grpc import _interceptor
from grpc._cython import cygrpc
from grpc.framework.foundation import callable_util
_LOGGER = logging.getLogger(__name__)
@ -748,8 +747,10 @@ def _process_event_and_continue(state, event):
else:
rpc_state, callbacks = event.tag(event)
for callback in callbacks:
callable_util.call_logging_exceptions(callback,
'Exception calling callback!')
try:
callback()
except Exception: # pylint: disable=broad-except
_LOGGER.exception('Exception calling callback!')
if rpc_state is not None:
with state.lock:
state.rpc_states.remove(rpc_state)

@ -16,12 +16,14 @@
import collections
import threading
import time
import logging
import six
import grpc
from grpc import _common
from grpc.framework.foundation import callable_util
_LOGGER = logging.getLogger(__name__)
_DONE_CALLBACK_EXCEPTION_LOG_MESSAGE = (
'Exception calling connectivity future "done" callback!')
@ -98,8 +100,10 @@ class _ChannelReadyFuture(grpc.Future):
return
for done_callback in done_callbacks:
callable_util.call_logging_exceptions(
done_callback, _DONE_CALLBACK_EXCEPTION_LOG_MESSAGE, self)
try:
done_callback(self)
except Exception: # pylint: disable=broad-except
_LOGGER.exception(_DONE_CALLBACK_EXCEPTION_LOG_MESSAGE)
def cancel(self):
with self._condition:
@ -113,8 +117,10 @@ class _ChannelReadyFuture(grpc.Future):
return False
for done_callback in done_callbacks:
callable_util.call_logging_exceptions(
done_callback, _DONE_CALLBACK_EXCEPTION_LOG_MESSAGE, self)
try:
done_callback(self)
except Exception: # pylint: disable=broad-except
_LOGGER.exception(_DONE_CALLBACK_EXCEPTION_LOG_MESSAGE)
return True

@ -706,7 +706,7 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
grpc_timer_manager_set_threading(false);
{
grpc_core::ExecCtx exec_ctx;
grpc_executor_set_threading(false);
grpc_core::Executor::SetThreadingAll(false);
}
grpc_set_resolver_impl(&fuzzer_resolver);
grpc_dns_lookup_ares_locked = my_dns_lookup_ares_locked;

@ -46,7 +46,7 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
grpc_init();
{
grpc_core::ExecCtx exec_ctx;
grpc_executor_set_threading(false);
grpc_core::Executor::SetThreadingAll(false);
grpc_resource_quota* resource_quota =
grpc_resource_quota_create("client_fuzzer");

@ -43,7 +43,7 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
grpc_init();
{
grpc_core::ExecCtx exec_ctx;
grpc_executor_set_threading(false);
grpc_core::Executor::SetThreadingAll(false);
grpc_resource_quota* resource_quota =
grpc_resource_quota_create("server_fuzzer");

@ -290,7 +290,7 @@ int main(int argc, char** argv) {
test_invalid_ip_addresses();
test_unparseable_hostports();
}
grpc_executor_shutdown();
grpc_core::Executor::ShutdownAll();
}
gpr_cmdline_destroy(cl);

@ -21,15 +21,8 @@ ulimit -c unlimited
# Performance PR testing needs GH API key and PR metadata to comment results
if [ -n "$KOKORO_GITHUB_PULL_REQUEST_NUMBER" ]; then
set +x
sudo apt-get install -y jq
export ghprbTargetBranch=$(curl -s https://api.github.com/repos/grpc/grpc/pulls/$KOKORO_GITHUB_PULL_REQUEST_NUMBER | jq -r .base.ref)
gsutil cp gs://grpc-testing-secrets/github_credentials/oauth_token.txt ~/
# TODO(matt-kwong): rename this to GITHUB_OAUTH_TOKEN after Jenkins deprecation
export JENKINS_OAUTH_TOKEN=$(cat ~/oauth_token.txt)
export ghprbPullId=$KOKORO_GITHUB_PULL_REQUEST_NUMBER
set -x
fi
sudo pip install tabulate

@ -25,21 +25,16 @@ export GOOGLE_APPLICATION_CREDENTIALS=${KOKORO_GFILE_DIR}/GrpcTesting-d0eeee2db3
# If this is a PR using RUN_TESTS_FLAGS var, then add flags to filter tests
if [ -n "$KOKORO_GITHUB_PULL_REQUEST_NUMBER" ]; then
set +x
brew update
brew install jq || brew upgrade jq
ghprbTargetBranch=$(curl -s https://api.github.com/repos/grpc/grpc/pulls/$KOKORO_GITHUB_PULL_REQUEST_NUMBER | jq -r .base.ref)
export RUN_TESTS_FLAGS="$RUN_TESTS_FLAGS --filter_pr_tests --base_branch origin/$ghprbTargetBranch"
# TODO(matt-kwong): rename this to GITHUB_OAUTH_TOKEN after Jenkins deprecation
export JENKINS_OAUTH_TOKEN=$(cat ${KOKORO_GFILE_DIR}/oauth_token.txt)
export ghprbPullId=$KOKORO_GITHUB_PULL_REQUEST_NUMBER
set -x
fi
set +ex # rvm script is very verbose and exits with errorcode
# Advice from https://github.com/Homebrew/homebrew-cask/issues/8629#issuecomment-68641176
brew update && brew upgrade brew-cask && brew cleanup && brew cask cleanup
rvm --debug requirements ruby-2.5.0
source $HOME/.rvm/scripts/rvm
set -e # rvm commands are very verbose
time rvm install 2.5.0

@ -17,7 +17,6 @@
# Location of the continuous shell script in repository.
build_file: "grpc/tools/internal_ci/macos/grpc_ios_binary_size.sh"
timeout_mins: 60
gfile_resources: "/bigstore/grpc-testing-secrets/github_credentials/oauth_token.txt"
before_action {
fetch_keystore {
keystore_resource {

@ -98,6 +98,7 @@ LANG_RELEASE_MATRIX = {
('v1.15.0', ReleaseInfo()),
('v1.16.0', ReleaseInfo()),
('v1.17.1', ReleaseInfo()),
('v1.18.0', ReleaseInfo()),
]),
'go':
OrderedDict([
@ -161,6 +162,7 @@ LANG_RELEASE_MATRIX = {
('v1.15.0', ReleaseInfo()),
('v1.16.0', ReleaseInfo()),
('v1.17.1', ReleaseInfo()),
('v1.18.0', ReleaseInfo()),
]),
'node':
OrderedDict([
@ -201,6 +203,10 @@ LANG_RELEASE_MATRIX = {
('v1.15.0', ReleaseInfo()),
('v1.16.0', ReleaseInfo()),
('v1.17.1', ReleaseInfo()),
('v1.18.0',
ReleaseInfo(patch=[
'tools/dockerfile/interoptest/grpc_interop_ruby/build_interop.sh',
])),
]),
'php':
OrderedDict([
@ -221,6 +227,7 @@ LANG_RELEASE_MATRIX = {
('v1.15.0', ReleaseInfo()),
('v1.16.0', ReleaseInfo()),
('v1.17.1', ReleaseInfo()),
('v1.18.0', ReleaseInfo()),
]),
'csharp':
OrderedDict([
@ -245,6 +252,7 @@ LANG_RELEASE_MATRIX = {
('v1.15.0', ReleaseInfo()),
('v1.16.0', ReleaseInfo()),
('v1.17.1', ReleaseInfo()),
('v1.18.0', ReleaseInfo()),
]),
}

@ -0,0 +1,10 @@
diff --git a/tools/dockerfile/interoptest/grpc_interop_ruby/build_interop.sh b/tools/dockerfile/interoptest/grpc_interop_ruby/build_interop.sh
index 67f66090ae..e71ad91499 100755
--- a/tools/dockerfile/interoptest/grpc_interop_ruby/build_interop.sh
+++ b/tools/dockerfile/interoptest/grpc_interop_ruby/build_interop.sh
@@ -30,4 +30,4 @@ cd /var/local/git/grpc
rvm --default use ruby-2.5
# build Ruby interop client and server
-(cd src/ruby && gem update bundler && bundle && rake compile)
+(cd src/ruby && gem install bundler -v 1.17.3 && bundle && rake compile)

@ -1,37 +0,0 @@
# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import json
import urllib2
def comment_on_pr(text):
if 'JENKINS_OAUTH_TOKEN' not in os.environ:
print 'Missing JENKINS_OAUTH_TOKEN env var: not commenting'
return
if 'ghprbPullId' not in os.environ:
print 'Missing ghprbPullId env var: not commenting'
return
req = urllib2.Request(
url='https://api.github.com/repos/grpc/grpc/issues/%s/comments' %
os.environ['ghprbPullId'],
data=json.dumps({
'body': text
}),
headers={
'Authorization': 'token %s' % os.environ['JENKINS_OAUTH_TOKEN'],
'Content-Type': 'application/json',
})
print urllib2.urlopen(req).read()
Loading…
Cancel
Save