Merge branch 'master' into bugfix/enable-ptrace-for-asan

pull/17815/head
Bill Feng 6 years ago
commit 1480d600d3
  1. 38
      doc/interop-test-descriptions.md
  2. 4
      gRPC-C++.podspec
  3. 4
      gRPC-Core.podspec
  4. 1
      gRPC-ProtoRPC.podspec
  5. 1
      gRPC-RxLibrary.podspec
  6. 1
      gRPC.podspec
  7. 4
      include/grpc/impl/codegen/grpc_types.h
  8. 27
      include/grpcpp/impl/codegen/interceptor.h
  9. 29
      include/grpcpp/server.h
  10. 3
      src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
  11. 5
      src/core/ext/filters/client_channel/subchannel.cc
  12. 5
      src/core/ext/transport/chttp2/transport/chttp2_transport.cc
  13. 1
      src/core/lib/iomgr/exec_ctx.cc
  14. 57
      src/core/lib/iomgr/exec_ctx.h
  15. 7
      src/core/lib/iomgr/executor.cc
  16. 7
      src/core/lib/iomgr/timer_manager.cc
  17. 6
      src/core/lib/surface/call.cc
  18. 5
      src/core/lib/surface/completion_queue.cc
  19. 23
      src/core/lib/surface/server.cc
  20. 6
      src/core/lib/transport/transport.cc
  21. 3
      src/cpp/common/alarm.cc
  22. 4
      src/cpp/server/load_reporter/load_reporter_async_service_impl.cc
  23. 225
      src/cpp/server/server_cc.cc
  24. 1
      src/objective-c/!ProtoCompiler-gRPCPlugin.podspec
  25. 4
      src/objective-c/BoringSSL-GRPC.podspec
  26. 8
      src/php/bin/run_tests.sh
  27. 2310
      src/php/tests/MemoryLeakTest/MemoryLeakTest.php
  28. 10
      src/php/tests/unit_tests/CallTest.php
  29. 18
      src/proto/grpc/core/BUILD
  30. 94
      src/proto/grpc/testing/BUILD
  31. 2
      src/python/grpcio/_parallel_compile_patch.py
  32. 12
      src/python/grpcio_tests/setup.py
  33. 1
      src/python/grpcio_tests/tests/channelz/_channelz_servicer_test.py
  34. 103
      src/python/grpcio_tests/tests/qps/BUILD
  35. 102
      src/python/grpcio_tests/tests/qps/README.md
  36. 45
      src/python/grpcio_tests/tests/qps/basic_benchmark_test.sh
  37. 96
      src/python/grpcio_tests/tests/qps/scenarios.json
  38. 4
      templates/gRPC-C++.podspec.template
  39. 4
      templates/gRPC-Core.podspec.template
  40. 1
      templates/gRPC-ProtoRPC.podspec.template
  41. 1
      templates/gRPC-RxLibrary.podspec.template
  42. 1
      templates/gRPC.podspec.template
  43. 1
      templates/src/objective-c/!ProtoCompiler-gRPCPlugin.podspec.template
  44. 4
      templates/src/objective-c/BoringSSL-GRPC.podspec.template
  45. 1
      templates/test/cpp/naming/resolver_component_tests_defs.include
  46. 7
      templates/tools/dockerfile/php_valgrind.include
  47. 1
      templates/tools/dockerfile/test/php7_jessie_x64/Dockerfile.template
  48. 1
      templates/tools/dockerfile/test/php_jessie_x64/Dockerfile.template
  49. 2
      test/core/client_channel/resolvers/dns_resolver_test.cc
  50. 2
      test/core/end2end/tests/filter_status_code.cc
  51. 6
      test/core/iomgr/ios/CFStreamTests/CFStreamEndpointTests.mm
  52. 69
      test/core/surface/completion_queue_test.cc
  53. 1
      test/cpp/end2end/BUILD
  54. 6
      test/cpp/interop/client.cc
  55. 19
      test/cpp/interop/interop_client.cc
  56. 2
      test/cpp/interop/interop_client.h
  57. 2
      test/cpp/microbenchmarks/bm_chttp2_transport.cc
  58. 1
      test/cpp/naming/resolver_component_tests_runner.py
  59. 6
      test/cpp/qps/BUILD
  60. 8
      tools/dockerfile/test/php7_jessie_x64/Dockerfile
  61. 8
      tools/dockerfile/test/php_jessie_x64/Dockerfile
  62. 3
      tools/internal_ci/macos/grpc_interop_toprod.sh
  63. 93
      tools/run_tests/run_interop_tests.py

@ -679,6 +679,44 @@ Client asserts:
by the auth library. The client can optionally check the username matches the
email address in the key file.
### google_default_credentials
Similar to the other auth tests, this test should only be run against prod
servers. Different from some of the other auth tests however, this test
may be also run from outside of GCP.
This test verifies unary calls succeed when the client uses
GoogleDefaultCredentials. The path to a service account key file in the
GOOGLE_APPLICATION_CREDENTIALS environment variable may or may not be
provided by the test runner. For example, the test runner might set
this environment when outside of GCP but keep it unset when on GCP.
The test uses `--default_service_account` with GCE service account email.
Server features:
* [UnaryCall][]
* [Echo Authenticated Username][]
Procedure:
1. Client configures the channel to use GoogleDefaultCredentials
* Note: the term `GoogleDefaultCredentials` within the context
of this test description refers to an API which encapsulates
both "transport credentials" and "call credentials" and which
is capable of transport creds auto-selection (including ALTS).
Similar APIs involving only auto-selection of OAuth mechanisms
might work for this test but aren't the intended subjects.
2. Client calls UnaryCall with:
```
{
fill_username: true
}
```
Client asserts:
* call was successful
* received SimpleResponse.username matches the value of
`--default_service_account`
### custom_metadata

@ -24,7 +24,7 @@ Pod::Spec.new do |s|
s.name = 'gRPC-C++'
# TODO (mxyan): use version that match gRPC version when pod is stabilized
# version = '1.19.0-dev'
version = '0.0.6-dev'
version = '0.0.8-dev'
s.version = version
s.summary = 'gRPC C++ library'
s.homepage = 'https://grpc.io'
@ -40,6 +40,8 @@ Pod::Spec.new do |s|
s.ios.deployment_target = '7.0'
s.osx.deployment_target = '10.9'
s.tvos.deployment_target = '10.0'
s.requires_arc = false
name = 'grpcpp'

@ -40,6 +40,8 @@ Pod::Spec.new do |s|
s.ios.deployment_target = '7.0'
s.osx.deployment_target = '10.9'
s.tvos.deployment_target = '10.0'
s.requires_arc = false
name = 'grpc'
@ -181,7 +183,7 @@ Pod::Spec.new do |s|
ss.header_mappings_dir = '.'
ss.libraries = 'z'
ss.dependency "#{s.name}/Interface", version
ss.dependency 'BoringSSL-GRPC', '0.0.2'
ss.dependency 'BoringSSL-GRPC', '0.0.3'
ss.dependency 'nanopb', '~> 0.3'
ss.compiler_flags = '-DGRPC_SHADOW_BORINGSSL_SYMBOLS'

@ -35,6 +35,7 @@ Pod::Spec.new do |s|
s.ios.deployment_target = '7.0'
s.osx.deployment_target = '10.9'
s.tvos.deployment_target = '10.0'
name = 'ProtoRPC'
s.module_name = name

@ -35,6 +35,7 @@ Pod::Spec.new do |s|
s.ios.deployment_target = '7.0'
s.osx.deployment_target = '10.9'
s.tvos.deployment_target = '10.0'
name = 'RxLibrary'
s.module_name = name

@ -34,6 +34,7 @@ Pod::Spec.new do |s|
s.ios.deployment_target = '7.0'
s.osx.deployment_target = '10.9'
s.tvos.deployment_target = '10.0'
name = 'GRPCClient'
s.module_name = name

@ -693,6 +693,10 @@ typedef struct grpc_experimental_completion_queue_functor {
pointer to this functor and a boolean that indicates whether the
operation succeeded (non-zero) or failed (zero) */
void (*functor_run)(struct grpc_experimental_completion_queue_functor*, int);
/** The following fields are not API. They are meant for internal use. */
int internal_success;
struct grpc_experimental_completion_queue_functor* internal_next;
} grpc_experimental_completion_queue_functor;
/* The upgrade to version 2 is currently experimental. */

@ -107,6 +107,24 @@ class InterceptorBatchMethods {
/// of the hijacking interceptor.
virtual void Hijack() = 0;
/// Send Message Methods
/// GetSerializedSendMessage and GetSendMessage/ModifySendMessage are the
/// available methods to view and modify the request payload. An interceptor
/// can access the payload in either serialized form or non-serialized form
/// but not both at the same time.
/// gRPC performs serialization in a lazy manner, which means
/// that a call to GetSerializedSendMessage will result in a serialization
/// operation if the payload stored is not in the serialized form already; the
/// non-serialized form will be lost and GetSendMessage will no longer return
/// a valid pointer, and this will remain true for later interceptors too.
/// This can change however if ModifySendMessage is used to replace the
/// current payload. Note that ModifySendMessage requires a new payload
/// message in the non-serialized form. This will overwrite the existing
/// payload irrespective of whether it had been serialized earlier. Also note
/// that gRPC Async API requires early serialization of the payload which
/// means that the payload would be available in the serialized form only
/// unless an interceptor replaces the payload with ModifySendMessage.
/// Returns a modifable ByteBuffer holding the serialized form of the message
/// that is going to be sent. Valid for PRE_SEND_MESSAGE interceptions.
/// A return value of nullptr indicates that this ByteBuffer is not valid.
@ -114,15 +132,16 @@ class InterceptorBatchMethods {
/// Returns a non-modifiable pointer to the non-serialized form of the message
/// to be sent. Valid for PRE_SEND_MESSAGE interceptions. A return value of
/// nullptr indicates that this field is not valid. Also note that this is
/// only supported for sync and callback APIs at the present moment.
/// nullptr indicates that this field is not valid.
virtual const void* GetSendMessage() = 0;
/// Overwrites the message to be sent with \a message. \a message should be in
/// the non-serialized form expected by the method. Valid for PRE_SEND_MESSAGE
/// interceptions. Note that the interceptor is responsible for maintaining
/// the life of the message for the duration on the send operation, i.e., till
/// POST_SEND_MESSAGE.
/// the life of the message till it is serialized or it receives the
/// POST_SEND_MESSAGE interception point, whichever happens earlier. The
/// modifying interceptor may itself force early serialization by calling
/// GetSerializedSendMessage.
virtual void ModifySendMessage(const void* message) = 0;
/// Checks whether the SEND MESSAGE op succeeded. Valid for POST_SEND_MESSAGE

@ -248,8 +248,22 @@ class Server : public ServerInterface, private GrpcLibraryCodegen {
/// the \a sync_server_cqs)
std::vector<std::unique_ptr<SyncRequestThreadManager>> sync_req_mgrs_;
/// Outstanding callback requests
std::vector<std::unique_ptr<CallbackRequest>> callback_reqs_;
// Outstanding callback requests. The vector is indexed by method with a list
// per method. Each element should store its own iterator in the list and
// should erase it when the request is actually bound to an RPC. Synchronize
// this list with its own mu_ (not the server mu_) since these must be active
// at Shutdown when the server mu_ is locked.
// TODO(vjpai): Merge with the core request matcher to avoid duplicate work
struct MethodReqList {
std::mutex reqs_mu;
// Maintain our own list size count since list::size is still linear
// for some libraries (supposed to be constant since C++11)
// TODO(vjpai): Remove reqs_list_sz and use list::size when possible
size_t reqs_list_sz{0};
std::list<CallbackRequest*> reqs_list;
using iterator = decltype(reqs_list)::iterator;
};
std::vector<MethodReqList*> callback_reqs_;
// Server status
std::mutex mu_;
@ -259,6 +273,17 @@ class Server : public ServerInterface, private GrpcLibraryCodegen {
std::condition_variable shutdown_cv_;
// It is ok (but not required) to nest callback_reqs_mu_ under mu_ .
// Incrementing callback_reqs_outstanding_ is ok without a lock but it must be
// decremented under the lock in case it is the last request and enables the
// server shutdown. The increment is performance-critical since it happens
// during periods of increasing load; the decrement happens only when memory
// is maxed out, during server shutdown, or (possibly in a future version)
// during decreasing load, so it is less performance-critical.
std::mutex callback_reqs_mu_;
std::condition_variable callback_reqs_done_cv_;
std::atomic_int callback_reqs_outstanding_{0};
std::shared_ptr<GlobalCallbacks> global_callbacks_;
std::vector<grpc::string> services_;

@ -478,8 +478,7 @@ static grpc_address_resolver_vtable ares_resolver = {
grpc_resolve_address_ares, blocking_resolve_address_ares};
static bool should_use_ares(const char* resolver_env) {
return resolver_env == nullptr || strlen(resolver_env) == 0 ||
gpr_stricmp(resolver_env, "ares") == 0;
return resolver_env != nullptr && gpr_stricmp(resolver_env, "ares") == 0;
}
void grpc_resolver_dns_ares_init() {

@ -236,6 +236,7 @@ class ConnectedSubchannelStateWatcher
GRPC_SUBCHANNEL_WEAK_UNREF(subchannel_, "state_watcher");
}
// Must be called while holding subchannel_->mu.
void Orphan() override { health_check_client_.reset(); }
private:
@ -302,12 +303,12 @@ class ConnectedSubchannelStateWatcher
static void OnHealthChanged(void* arg, grpc_error* error) {
auto* self = static_cast<ConnectedSubchannelStateWatcher*>(arg);
grpc_subchannel* c = self->subchannel_;
MutexLock lock(&c->mu);
if (self->health_state_ == GRPC_CHANNEL_SHUTDOWN) {
self->Unref();
return;
}
grpc_subchannel* c = self->subchannel_;
MutexLock lock(&c->mu);
if (self->last_connectivity_state_ == GRPC_CHANNEL_READY) {
grpc_connectivity_state_set(&c->state_and_health_tracker,
self->health_state_, GRPC_ERROR_REF(error),

@ -43,6 +43,7 @@
#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/http/parser.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/slice_internal.h"
@ -963,6 +964,10 @@ void grpc_chttp2_mark_stream_writable(grpc_chttp2_transport* t,
static grpc_closure_scheduler* write_scheduler(grpc_chttp2_transport* t,
bool early_results_scheduled,
bool partial_write) {
// If we're already in a background poller, don't offload this to an executor
if (grpc_iomgr_is_any_background_poller_thread()) {
return grpc_schedule_on_exec_ctx;
}
/* if it's not the first write in a batch, always offload to the executor:
we'll probably end up queuing against the kernel anyway, so we'll likely
get better latency overall if we switch writing work elsewhere and continue

@ -115,6 +115,7 @@ grpc_closure_scheduler* grpc_schedule_on_exec_ctx = &exec_ctx_scheduler;
namespace grpc_core {
GPR_TLS_CLASS_DEF(ExecCtx::exec_ctx_);
GPR_TLS_CLASS_DEF(ApplicationCallbackExecCtx::callback_exec_ctx_);
// WARNING: for testing purposes only!
void ExecCtx::TestOnlyGlobalInit(gpr_timespec new_val) {

@ -21,12 +21,14 @@
#include <grpc/support/port_platform.h>
#include <grpc/impl/codegen/grpc_types.h>
#include <grpc/support/atm.h>
#include <grpc/support/cpu.h>
#include <grpc/support/log.h>
#include "src/core/lib/gpr/tls.h"
#include "src/core/lib/gprpp/fork.h"
#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/iomgr/closure.h"
typedef int64_t grpc_millis;
@ -34,9 +36,8 @@ typedef int64_t grpc_millis;
#define GRPC_MILLIS_INF_FUTURE INT64_MAX
#define GRPC_MILLIS_INF_PAST INT64_MIN
/** A workqueue represents a list of work to be executed asynchronously.
Forward declared here to avoid a circular dependency with workqueue.h. */
typedef struct grpc_workqueue grpc_workqueue;
/** A combiner represents a list of work to be executed later.
Forward declared here to avoid a circular dependency with combiner.h. */
typedef struct grpc_combiner grpc_combiner;
/* This exec_ctx is ready to return: either pre-populated, or cached as soon as
@ -226,6 +227,56 @@ class ExecCtx {
GPR_TLS_CLASS_DECL(exec_ctx_);
ExecCtx* last_exec_ctx_ = Get();
};
class ApplicationCallbackExecCtx {
public:
ApplicationCallbackExecCtx() {
if (reinterpret_cast<ApplicationCallbackExecCtx*>(
gpr_tls_get(&callback_exec_ctx_)) == nullptr) {
grpc_core::Fork::IncExecCtxCount();
gpr_tls_set(&callback_exec_ctx_, reinterpret_cast<intptr_t>(this));
}
}
~ApplicationCallbackExecCtx() {
if (reinterpret_cast<ApplicationCallbackExecCtx*>(
gpr_tls_get(&callback_exec_ctx_)) == this) {
while (head_ != nullptr) {
auto* f = head_;
head_ = f->internal_next;
if (f->internal_next == nullptr) {
tail_ = nullptr;
}
(*f->functor_run)(f, f->internal_success);
}
gpr_tls_set(&callback_exec_ctx_, reinterpret_cast<intptr_t>(nullptr));
grpc_core::Fork::DecExecCtxCount();
} else {
GPR_DEBUG_ASSERT(head_ == nullptr);
GPR_DEBUG_ASSERT(tail_ == nullptr);
}
}
static void Enqueue(grpc_experimental_completion_queue_functor* functor,
int is_success) {
functor->internal_success = is_success;
functor->internal_next = nullptr;
auto* ctx = reinterpret_cast<ApplicationCallbackExecCtx*>(
gpr_tls_get(&callback_exec_ctx_));
if (ctx->head_ == nullptr) {
ctx->head_ = functor;
}
if (ctx->tail_ != nullptr) {
ctx->tail_->internal_next = functor;
}
ctx->tail_ = functor;
}
private:
grpc_experimental_completion_queue_functor* head_{nullptr};
grpc_experimental_completion_queue_functor* tail_{nullptr};
GPR_TLS_CLASS_DECL(callback_exec_ctx_);
};
} // namespace grpc_core
#endif /* GRPC_CORE_LIB_IOMGR_EXEC_CTX_H */

@ -111,6 +111,13 @@ size_t Executor::RunClosures(const char* executor_name,
grpc_closure_list list) {
size_t n = 0;
// In the executor, the ExecCtx for the thread is declared in the executor
// thread itself, but this is the point where we could start seeing
// application-level callbacks. No need to create a new ExecCtx, though,
// since there already is one and it is flushed (but not destructed) in this
// function itself.
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_closure* c = list.head;
while (c != nullptr) {
grpc_closure* next = c->next_data.next;

@ -105,6 +105,13 @@ void grpc_timer_manager_tick() {
}
static void run_some_timers() {
// In the case of timers, the ExecCtx for the thread is declared
// in the timer thread itself, but this is the point where we
// could start seeing application-level callbacks. No need to
// create a new ExecCtx, though, since there already is one and it is
// flushed (but not destructed) in this function itself
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
// if there's something to execute...
gpr_mu_lock(&g_mu);
// remove a waiter from the pool, and start another thread if necessary

@ -556,6 +556,7 @@ void grpc_call_unref(grpc_call* c) {
GPR_TIMER_SCOPE("grpc_call_unref", 0);
child_call* cc = c->child;
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
GRPC_API_TRACE("grpc_call_unref(c=%p)", 1, (c));
@ -597,6 +598,7 @@ void grpc_call_unref(grpc_call* c) {
grpc_call_error grpc_call_cancel(grpc_call* call, void* reserved) {
GRPC_API_TRACE("grpc_call_cancel(call=%p, reserved=%p)", 2, (call, reserved));
GPR_ASSERT(!reserved);
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
cancel_with_error(call, GRPC_ERROR_CANCELLED);
return GRPC_CALL_OK;
@ -646,6 +648,7 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call* c,
grpc_status_code status,
const char* description,
void* reserved) {
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
GRPC_API_TRACE(
"grpc_call_cancel_with_status("
@ -1894,7 +1897,6 @@ done_with_error:
grpc_call_error grpc_call_start_batch(grpc_call* call, const grpc_op* ops,
size_t nops, void* tag, void* reserved) {
grpc_core::ExecCtx exec_ctx;
grpc_call_error err;
GRPC_API_TRACE(
@ -1905,6 +1907,8 @@ grpc_call_error grpc_call_start_batch(grpc_call* call, const grpc_op* ops,
if (reserved != nullptr) {
err = GRPC_CALL_ERROR;
} else {
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
err = call_start_batch(call, ops, nops, tag, 0);
}

@ -868,7 +868,7 @@ static void cq_end_op_for_callback(
GRPC_ERROR_UNREF(error);
auto* functor = static_cast<grpc_experimental_completion_queue_functor*>(tag);
(*functor->functor_run)(functor, is_success);
grpc_core::ApplicationCallbackExecCtx::Enqueue(functor, is_success);
}
void grpc_cq_end_op(grpc_completion_queue* cq, void* tag, grpc_error* error,
@ -1352,7 +1352,7 @@ static void cq_finish_shutdown_callback(grpc_completion_queue* cq) {
GPR_ASSERT(cqd->shutdown_called);
cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done);
(*callback->functor_run)(callback, true);
grpc_core::ApplicationCallbackExecCtx::Enqueue(callback, true);
}
static void cq_shutdown_callback(grpc_completion_queue* cq) {
@ -1385,6 +1385,7 @@ static void cq_shutdown_callback(grpc_completion_queue* cq) {
to zero here, then enter shutdown mode and wake up any waiters */
void grpc_completion_queue_shutdown(grpc_completion_queue* cq) {
GPR_TIMER_SCOPE("grpc_completion_queue_shutdown", 0);
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
GRPC_API_TRACE("grpc_completion_queue_shutdown(cq=%p)", 1, (cq));
cq->vtable->shutdown(cq);

@ -1302,6 +1302,7 @@ void grpc_server_shutdown_and_notify(grpc_server* server,
listener* l;
shutdown_tag* sdt;
channel_broadcaster broadcaster;
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
GRPC_API_TRACE("grpc_server_shutdown_and_notify(server=%p, cq=%p, tag=%p)", 3,
@ -1369,6 +1370,7 @@ void grpc_server_shutdown_and_notify(grpc_server* server,
void grpc_server_cancel_all_calls(grpc_server* server) {
channel_broadcaster broadcaster;
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
GRPC_API_TRACE("grpc_server_cancel_all_calls(server=%p)", 1, (server));
@ -1384,6 +1386,7 @@ void grpc_server_cancel_all_calls(grpc_server* server) {
void grpc_server_destroy(grpc_server* server) {
listener* l;
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
GRPC_API_TRACE("grpc_server_destroy(server=%p)", 1, (server));
@ -1469,6 +1472,7 @@ grpc_call_error grpc_server_request_call(
grpc_completion_queue* cq_bound_to_call,
grpc_completion_queue* cq_for_notification, void* tag) {
grpc_call_error error;
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
requested_call* rc = static_cast<requested_call*>(gpr_malloc(sizeof(*rc)));
GRPC_STATS_INC_SERVER_REQUESTED_CALLS();
@ -1515,11 +1519,11 @@ grpc_call_error grpc_server_request_registered_call(
grpc_metadata_array* initial_metadata, grpc_byte_buffer** optional_payload,
grpc_completion_queue* cq_bound_to_call,
grpc_completion_queue* cq_for_notification, void* tag) {
grpc_call_error error;
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
GRPC_STATS_INC_SERVER_REQUESTED_CALLS();
requested_call* rc = static_cast<requested_call*>(gpr_malloc(sizeof(*rc)));
registered_method* rm = static_cast<registered_method*>(rmp);
GRPC_STATS_INC_SERVER_REQUESTED_CALLS();
GRPC_API_TRACE(
"grpc_server_request_registered_call("
"server=%p, rmp=%p, call=%p, deadline=%p, initial_metadata=%p, "
@ -1537,19 +1541,17 @@ grpc_call_error grpc_server_request_registered_call(
}
if (cq_idx == server->cq_count) {
gpr_free(rc);
error = GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
goto done;
return GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
}
if ((optional_payload == nullptr) !=
(rm->payload_handling == GRPC_SRM_PAYLOAD_NONE)) {
gpr_free(rc);
error = GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH;
goto done;
return GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH;
}
if (grpc_cq_begin_op(cq_for_notification, tag) == false) {
gpr_free(rc);
error = GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN;
goto done;
return GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN;
}
rc->cq_idx = cq_idx;
rc->type = REGISTERED_CALL;
@ -1561,10 +1563,7 @@ grpc_call_error grpc_server_request_registered_call(
rc->data.registered.deadline = deadline;
rc->initial_metadata = initial_metadata;
rc->data.registered.optional_payload = optional_payload;
error = queue_call_request(server, cq_idx, rc);
done:
return error;
return queue_call_request(server, cq_idx, rc);
}
static void fail_call(grpc_server* server, size_t cq_idx, requested_call* rc,

@ -30,6 +30,7 @@
#include "src/core/lib/gpr/alloc.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/transport/transport_impl.h"
@ -63,8 +64,9 @@ void grpc_stream_unref(grpc_stream_refcount* refcount, const char* reason) {
void grpc_stream_unref(grpc_stream_refcount* refcount) {
#endif
if (gpr_unref(&refcount->refs)) {
if (grpc_core::ExecCtx::Get()->flags() &
GRPC_EXEC_CTX_FLAG_THREAD_RESOURCE_LOOP) {
if (!grpc_iomgr_is_any_background_poller_thread() &&
(grpc_core::ExecCtx::Get()->flags() &
GRPC_EXEC_CTX_FLAG_THREAD_RESOURCE_LOOP)) {
/* Ick.
The thread we're running on MAY be owned (indirectly) by a call-stack.
If that's the case, destroying the call-stack MAY try to destroy the

@ -52,6 +52,7 @@ class AlarmImpl : public ::grpc::internal::CompletionQueueTag {
return true;
}
void Set(::grpc::CompletionQueue* cq, gpr_timespec deadline, void* tag) {
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
GRPC_CQ_INTERNAL_REF(cq->cq(), "alarm");
cq_ = cq->cq();
@ -72,6 +73,7 @@ class AlarmImpl : public ::grpc::internal::CompletionQueueTag {
&on_alarm_);
}
void Set(gpr_timespec deadline, std::function<void(bool)> f) {
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
// Don't use any CQ at all. Instead just use the timer to fire the function
callback_ = std::move(f);
@ -87,6 +89,7 @@ class AlarmImpl : public ::grpc::internal::CompletionQueueTag {
&on_alarm_);
}
void Cancel() {
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
grpc_timer_cancel(&timer_);
}

@ -211,8 +211,8 @@ void LoadReporterAsyncServiceImpl::ReportLoadHandler::OnReadDone(
load_key_);
const auto& load_report_interval = initial_request.load_report_interval();
load_report_interval_ms_ =
static_cast<uint64_t>(load_report_interval.seconds() * 1000 +
load_report_interval.nanos() / 1000);
static_cast<unsigned long>(load_report_interval.seconds() * 1000 +
load_report_interval.nanos() / 1000);
gpr_log(
GPR_INFO,
"[LRS %p] Initial request received. Start load reporting (load "

@ -59,7 +59,15 @@ namespace {
#define DEFAULT_MAX_SYNC_SERVER_THREADS INT_MAX
// How many callback requests of each method should we pre-register at start
#define DEFAULT_CALLBACK_REQS_PER_METHOD 32
#define DEFAULT_CALLBACK_REQS_PER_METHOD 512
// What is the (soft) limit for outstanding requests in the server
#define SOFT_MAXIMUM_CALLBACK_REQS_OUTSTANDING 30000
// If the number of unmatched requests for a method drops below this amount, try
// to allocate extra unless it pushes the total number of callbacks above the
// soft maximum
#define SOFT_MINIMUM_SPARE_CALLBACK_REQS_PER_METHOD 128
class DefaultGlobalCallbacks final : public Server::GlobalCallbacks {
public:
@ -177,11 +185,10 @@ class Server::SyncRequest final : public internal::CompletionQueueTag {
GPR_ASSERT(cq_ && !in_flight_);
in_flight_ = true;
if (method_tag_) {
if (GRPC_CALL_OK !=
grpc_server_request_registered_call(
if (grpc_server_request_registered_call(
server, method_tag_, &call_, &deadline_, &request_metadata_,
has_request_payload_ ? &request_payload_ : nullptr, cq_,
notify_cq, this)) {
notify_cq, this) != GRPC_CALL_OK) {
TeardownRequest();
return;
}
@ -343,9 +350,10 @@ class Server::SyncRequest final : public internal::CompletionQueueTag {
class Server::CallbackRequest final : public internal::CompletionQueueTag {
public:
CallbackRequest(Server* server, internal::RpcServiceMethod* method,
void* method_tag)
CallbackRequest(Server* server, Server::MethodReqList* list,
internal::RpcServiceMethod* method, void* method_tag)
: server_(server),
req_list_(list),
method_(method),
method_tag_(method_tag),
has_request_payload_(
@ -353,12 +361,22 @@ class Server::CallbackRequest final : public internal::CompletionQueueTag {
method->method_type() == internal::RpcMethod::SERVER_STREAMING),
cq_(server->CallbackCQ()),
tag_(this) {
server_->callback_reqs_outstanding_++;
Setup();
}
~CallbackRequest() { Clear(); }
~CallbackRequest() {
Clear();
// The counter of outstanding requests must be decremented
// under a lock in case it causes the server shutdown.
std::lock_guard<std::mutex> l(server_->callback_reqs_mu_);
if (--server_->callback_reqs_outstanding_ == 0) {
server_->callback_reqs_done_cv_.notify_one();
}
}
void Request() {
bool Request() {
if (method_tag_) {
if (GRPC_CALL_OK !=
grpc_server_request_registered_call(
@ -366,7 +384,7 @@ class Server::CallbackRequest final : public internal::CompletionQueueTag {
&request_metadata_,
has_request_payload_ ? &request_payload_ : nullptr, cq_->cq(),
cq_->cq(), static_cast<void*>(&tag_))) {
return;
return false;
}
} else {
if (!call_details_) {
@ -376,9 +394,10 @@ class Server::CallbackRequest final : public internal::CompletionQueueTag {
if (grpc_server_request_call(server_->c_server(), &call_, call_details_,
&request_metadata_, cq_->cq(), cq_->cq(),
static_cast<void*>(&tag_)) != GRPC_CALL_OK) {
return;
return false;
}
}
return true;
}
bool FinalizeResult(void** tag, bool* status) override { return false; }
@ -409,10 +428,48 @@ class Server::CallbackRequest final : public internal::CompletionQueueTag {
GPR_ASSERT(!req_->FinalizeResult(&ignored, &new_ok));
GPR_ASSERT(ignored == req_);
if (!ok) {
// The call has been shutdown
req_->Clear();
return;
bool spawn_new = false;
{
std::unique_lock<std::mutex> l(req_->req_list_->reqs_mu);
req_->req_list_->reqs_list.erase(req_->req_list_iterator_);
req_->req_list_->reqs_list_sz--;
if (!ok) {
// The call has been shutdown.
// Delete its contents to free up the request.
// First release the lock in case the deletion of the request
// completes the full server shutdown and allows the destructor
// of the req_list to proceed.
l.unlock();
delete req_;
return;
}
// If this was the last request in the list or it is below the soft
// minimum and there are spare requests available, set up a new one, but
// do it outside the lock since the Request could otherwise deadlock
if (req_->req_list_->reqs_list_sz == 0 ||
(req_->req_list_->reqs_list_sz <
SOFT_MINIMUM_SPARE_CALLBACK_REQS_PER_METHOD &&
req_->server_->callback_reqs_outstanding_ <
SOFT_MAXIMUM_CALLBACK_REQS_OUTSTANDING)) {
spawn_new = true;
}
}
if (spawn_new) {
auto* new_req = new CallbackRequest(req_->server_, req_->req_list_,
req_->method_, req_->method_tag_);
if (!new_req->Request()) {
// The server must have just decided to shutdown. Erase
// from the list under lock but release the lock before
// deleting the new_req (in case that request was what
// would allow the destruction of the req_list)
{
std::lock_guard<std::mutex> l(new_req->req_list_->reqs_mu);
new_req->req_list_->reqs_list.erase(new_req->req_list_iterator_);
new_req->req_list_->reqs_list_sz--;
}
delete new_req;
}
}
// Bind the call, deadline, and metadata from what we got
@ -462,17 +519,30 @@ class Server::CallbackRequest final : public internal::CompletionQueueTag {
internal::MethodHandler::HandlerParameter(
call_, &req_->ctx_, req_->request_, req_->request_status_,
[this] {
req_->Reset();
req_->Request();
// Recycle this request if there aren't too many outstanding.
// Note that we don't have to worry about a case where there
// are no requests waiting to match for this method since that
// is already taken care of when binding a request to a call.
// TODO(vjpai): Also don't recycle this request if the dynamic
// load no longer justifies it. Consider measuring
// dynamic load and setting a target accordingly.
if (req_->server_->callback_reqs_outstanding_ <
SOFT_MAXIMUM_CALLBACK_REQS_OUTSTANDING) {
req_->Clear();
req_->Setup();
} else {
// We can free up this request because there are too many
delete req_;
return;
}
if (!req_->Request()) {
// The server must have just decided to shutdown.
delete req_;
}
}));
}
};
void Reset() {
Clear();
Setup();
}
void Clear() {
if (call_details_) {
delete call_details_;
@ -492,9 +562,15 @@ class Server::CallbackRequest final : public internal::CompletionQueueTag {
request_payload_ = nullptr;
request_ = nullptr;
request_status_ = Status();
std::lock_guard<std::mutex> l(req_list_->reqs_mu);
req_list_->reqs_list.push_front(this);
req_list_->reqs_list_sz++;
req_list_iterator_ = req_list_->reqs_list.begin();
}
Server* const server_;
Server::MethodReqList* req_list_;
Server::MethodReqList::iterator req_list_iterator_;
internal::RpcServiceMethod* const method_;
void* const method_tag_;
const bool has_request_payload_;
@ -715,6 +791,13 @@ Server::~Server() {
}
grpc_server_destroy(server_);
for (auto* method_list : callback_reqs_) {
// The entries of the method_list should have already been emptied
// during Shutdown as each request is failed by Shutdown. Check that
// this actually happened.
GPR_ASSERT(method_list->reqs_list.empty());
delete method_list;
}
}
void Server::SetGlobalCallbacks(GlobalCallbacks* callbacks) {
@ -794,10 +877,12 @@ bool Server::RegisterService(const grpc::string* host, Service* service) {
}
} else {
// a callback method. Register at least some callback requests
callback_reqs_.push_back(new Server::MethodReqList);
auto* method_req_list = callback_reqs_.back();
// TODO(vjpai): Register these dynamically based on need
for (int i = 0; i < DEFAULT_CALLBACK_REQS_PER_METHOD; i++) {
auto* req = new CallbackRequest(this, method, method_registration_tag);
callback_reqs_.emplace_back(req);
new CallbackRequest(this, method_req_list, method,
method_registration_tag);
}
// Enqueue it so that it will be Request'ed later once
// all request matchers are created at core server startup
@ -889,8 +974,10 @@ void Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
(*it)->Start();
}
for (auto& cbreq : callback_reqs_) {
cbreq->Request();
for (auto* cbmethods : callback_reqs_) {
for (auto* cbreq : cbmethods->reqs_list) {
GPR_ASSERT(cbreq->Request());
}
}
if (default_health_check_service_impl != nullptr) {
@ -900,49 +987,69 @@ void Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
void Server::ShutdownInternal(gpr_timespec deadline) {
std::unique_lock<std::mutex> lock(mu_);
if (!shutdown_) {
shutdown_ = true;
if (shutdown_) {
return;
}
/// The completion queue to use for server shutdown completion notification
CompletionQueue shutdown_cq;
ShutdownTag shutdown_tag; // Dummy shutdown tag
grpc_server_shutdown_and_notify(server_, shutdown_cq.cq(), &shutdown_tag);
shutdown_ = true;
shutdown_cq.Shutdown();
/// The completion queue to use for server shutdown completion notification
CompletionQueue shutdown_cq;
ShutdownTag shutdown_tag; // Dummy shutdown tag
grpc_server_shutdown_and_notify(server_, shutdown_cq.cq(), &shutdown_tag);
void* tag;
bool ok;
CompletionQueue::NextStatus status =
shutdown_cq.AsyncNext(&tag, &ok, deadline);
shutdown_cq.Shutdown();
// If this timed out, it means we are done with the grace period for a clean
// shutdown. We should force a shutdown now by cancelling all inflight calls
if (status == CompletionQueue::NextStatus::TIMEOUT) {
grpc_server_cancel_all_calls(server_);
}
// Else in case of SHUTDOWN or GOT_EVENT, it means that the server has
// successfully shutdown
void* tag;
bool ok;
CompletionQueue::NextStatus status =
shutdown_cq.AsyncNext(&tag, &ok, deadline);
// Shutdown all ThreadManagers. This will try to gracefully stop all the
// threads in the ThreadManagers (once they process any inflight requests)
for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
(*it)->Shutdown(); // ThreadManager's Shutdown()
}
// If this timed out, it means we are done with the grace period for a clean
// shutdown. We should force a shutdown now by cancelling all inflight calls
if (status == CompletionQueue::NextStatus::TIMEOUT) {
grpc_server_cancel_all_calls(server_);
}
// Else in case of SHUTDOWN or GOT_EVENT, it means that the server has
// successfully shutdown
// Wait for threads in all ThreadManagers to terminate
for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
(*it)->Wait();
}
// Shutdown all ThreadManagers. This will try to gracefully stop all the
// threads in the ThreadManagers (once they process any inflight requests)
for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
(*it)->Shutdown(); // ThreadManager's Shutdown()
}
// Drain the shutdown queue (if the previous call to AsyncNext() timed out
// and we didn't remove the tag from the queue yet)
while (shutdown_cq.Next(&tag, &ok)) {
// Nothing to be done here. Just ignore ok and tag values
}
// Wait for threads in all ThreadManagers to terminate
for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
(*it)->Wait();
}
shutdown_notified_ = true;
shutdown_cv_.notify_all();
// Wait for all outstanding callback requests to complete
// (whether waiting for a match or already active).
// We know that no new requests will be created after this point
// because they are only created at server startup time or when
// we have a successful match on a request. During the shutdown phase,
// requests that have not yet matched will be failed rather than
// allowed to succeed, which will cause the server to delete the
// request and decrement the count. Possibly a request will match before
// the shutdown but then find that shutdown has already started by the
// time it tries to register a new request. In that case, the registration
// will report a failure, indicating a shutdown and again we won't end
// up incrementing the counter.
{
std::unique_lock<std::mutex> cblock(callback_reqs_mu_);
callback_reqs_done_cv_.wait(
cblock, [this] { return callback_reqs_outstanding_ == 0; });
}
// Drain the shutdown queue (if the previous call to AsyncNext() timed out
// and we didn't remove the tag from the queue yet)
while (shutdown_cq.Next(&tag, &ok)) {
// Nothing to be done here. Just ignore ok and tag values
}
shutdown_notified_ = true;
shutdown_cv_.notify_all();
}
void Server::Wait() {

@ -105,6 +105,7 @@ Pod::Spec.new do |s|
# For the Protobuf dependency not to complain:
s.ios.deployment_target = '7.0'
s.osx.deployment_target = '10.9'
s.tvos.deployment_target = '10.0'
# Restrict the gRPC runtime version to the one supported by this plugin.
s.dependency 'gRPC-ProtoRPC', v

@ -1,4 +1,5 @@
# This file has been automatically generated from a template file.
# Please make modifications to
# `templates/src/objective-c/BoringSSL-GRPC.podspec.template` instead. This
@ -38,7 +39,7 @@
Pod::Spec.new do |s|
s.name = 'BoringSSL-GRPC'
version = '0.0.2'
version = '0.0.3'
s.version = version
s.summary = 'BoringSSL is a fork of OpenSSL that is designed to meet Google\'s needs.'
# Adapted from the homepage:
@ -80,6 +81,7 @@ Pod::Spec.new do |s|
s.ios.deployment_target = '5.0'
s.osx.deployment_target = '10.7'
s.tvos.deployment_target = '10.0'
name = 'openssl_grpc'

@ -22,9 +22,17 @@ cd src/php/bin
source ./determine_extension_dir.sh
# in some jenkins macos machine, somehow the PHP build script can't find libgrpc.dylib
export DYLD_LIBRARY_PATH=$root/libs/$CONFIG
php $extension_dir -d max_execution_time=300 $(which phpunit) -v --debug \
--exclude-group persistent_list_bound_tests ../tests/unit_tests
php $extension_dir -d max_execution_time=300 $(which phpunit) -v --debug \
../tests/unit_tests/PersistentChannelTests
export ZEND_DONT_UNLOAD_MODULES=1
export USE_ZEND_ALLOC=0
# Detect whether valgrind is executable
if [ -x "$(command -v valgrind)" ]; then
valgrind --error-exitcode=10 --leak-check=yes php $extension_dir -d max_execution_time=300 \
../tests/MemoryLeakTest/MemoryLeakTest.php
fi

File diff suppressed because it is too large Load Diff

@ -86,6 +86,16 @@ class CallTest extends PHPUnit_Framework_TestCase
$this->assertTrue($result->send_metadata);
}
public function testAddMultiAndMultiValueMetadata()
{
$batch = [
Grpc\OP_SEND_INITIAL_METADATA => ['key1' => ['value1', 'value2'],
'key2' => ['value3', 'value4'],],
];
$result = $this->call->startBatch($batch);
$this->assertTrue($result->send_metadata);
}
public function testGetPeer()
{
$this->assertTrue(is_string($this->call->getPeer()));

@ -14,11 +14,25 @@
licenses(["notice"]) # Apache v2
load("//bazel:grpc_build_system.bzl", "grpc_proto_library", "grpc_package")
load("//bazel:grpc_build_system.bzl", "grpc_package", "grpc_proto_library")
load("@grpc_python_dependencies//:requirements.bzl", "requirement")
load("@org_pubref_rules_protobuf//python:rules.bzl", "py_proto_library")
grpc_package(name = "core", visibility = "public")
grpc_package(
name = "core",
visibility = "public",
)
grpc_proto_library(
name = "stats_proto",
srcs = ["stats.proto"],
)
py_proto_library(
name = "py_stats_proto",
protos = ["stats.proto"],
with_grpc = True,
deps = [
requirement("protobuf"),
],
)

@ -14,11 +14,14 @@
licenses(["notice"]) # Apache v2
load("//bazel:grpc_build_system.bzl", "grpc_proto_library", "grpc_package")
load("//bazel:grpc_build_system.bzl", "grpc_package", "grpc_proto_library")
load("@grpc_python_dependencies//:requirements.bzl", "requirement")
load("@org_pubref_rules_protobuf//python:rules.bzl", "py_proto_library")
grpc_package(name = "testing", visibility = "public")
grpc_package(
name = "testing",
visibility = "public",
)
exports_files([
"echo.proto",
@ -50,9 +53,11 @@ grpc_proto_library(
grpc_proto_library(
name = "echo_proto",
srcs = ["echo.proto"],
deps = ["echo_messages_proto",
"simple_messages_proto"],
generate_mocks = True,
deps = [
"echo_messages_proto",
"simple_messages_proto",
],
)
grpc_proto_library(
@ -63,10 +68,10 @@ grpc_proto_library(
py_proto_library(
name = "py_empty_proto",
protos = ["empty.proto",],
protos = ["empty.proto"],
with_grpc = True,
deps = [
requirement('protobuf'),
requirement("protobuf"),
],
)
@ -78,10 +83,10 @@ grpc_proto_library(
py_proto_library(
name = "py_messages_proto",
protos = ["messages.proto",],
protos = ["messages.proto"],
with_grpc = True,
deps = [
requirement('protobuf'),
requirement("protobuf"),
],
)
@ -100,7 +105,7 @@ grpc_proto_library(
name = "benchmark_service_proto",
srcs = ["benchmark_service.proto"],
deps = [
"messages_proto",
"messages_proto",
],
)
@ -108,7 +113,7 @@ grpc_proto_library(
name = "report_qps_scenario_service_proto",
srcs = ["report_qps_scenario_service.proto"],
deps = [
"control_proto",
"control_proto",
],
)
@ -116,7 +121,7 @@ grpc_proto_library(
name = "worker_service_proto",
srcs = ["worker_service.proto"],
deps = [
"control_proto",
"control_proto",
],
)
@ -132,7 +137,7 @@ grpc_proto_library(
has_services = False,
deps = [
"//src/proto/grpc/core:stats_proto",
]
],
)
grpc_proto_library(
@ -146,14 +151,71 @@ grpc_proto_library(
py_proto_library(
name = "py_test_proto",
protos = ["test.proto",],
proto_deps = [
":py_empty_proto",
":py_messages_proto",
],
protos = ["test.proto"],
with_grpc = True,
deps = [
requirement('protobuf'),
requirement("protobuf"),
],
)
py_proto_library(
name = "py_benchmark_service_proto",
proto_deps = [
":py_empty_proto",
":py_messages_proto",
]
],
protos = ["benchmark_service.proto"],
with_grpc = True,
deps = [
requirement("protobuf"),
],
)
py_proto_library(
name = "py_payloads_proto",
protos = ["payloads.proto"],
with_grpc = True,
deps = [
requirement("protobuf"),
],
)
py_proto_library(
name = "py_stats_proto",
proto_deps = [
"//src/proto/grpc/core:py_stats_proto",
],
protos = ["stats.proto"],
with_grpc = True,
deps = [
requirement("protobuf"),
],
)
py_proto_library(
name = "py_control_proto",
proto_deps = [
":py_payloads_proto",
":py_stats_proto",
],
protos = ["control.proto"],
with_grpc = True,
deps = [
requirement("protobuf"),
],
)
py_proto_library(
name = "py_worker_service_proto",
proto_deps = [
":py_control_proto",
],
protos = ["worker_service.proto"],
with_grpc = True,
deps = [
requirement("protobuf"),
],
)

@ -40,7 +40,7 @@ def _parallel_compile(self,
# setup the same way as distutils.ccompiler.CCompiler
# https://github.com/python/cpython/blob/31368a4f0e531c19affe2a1becd25fc316bc7501/Lib/distutils/ccompiler.py#L564
macros, objects, extra_postargs, pp_opts, build = self._setup_compile(
output_dir, macros, include_dirs, sources, depends, extra_postargs)
str(output_dir), macros, include_dirs, sources, depends, extra_postargs)
cc_args = self._get_cc_args(pp_opts, debug, extra_preargs)
def _compile_single_file(obj):

@ -37,19 +37,13 @@ PACKAGE_DIRECTORIES = {
}
INSTALL_REQUIRES = (
'coverage>=4.0',
'enum34>=1.0.4',
'coverage>=4.0', 'enum34>=1.0.4',
'grpcio>={version}'.format(version=grpc_version.VERSION),
# TODO(https://github.com/pypa/warehouse/issues/5196)
# Re-enable it once we got the name back
# 'grpcio-channelz>={version}'.format(version=grpc_version.VERSION),
'grpcio-channelz>={version}'.format(version=grpc_version.VERSION),
'grpcio-status>={version}'.format(version=grpc_version.VERSION),
'grpcio-tools>={version}'.format(version=grpc_version.VERSION),
'grpcio-health-checking>={version}'.format(version=grpc_version.VERSION),
'oauth2client>=1.4.7',
'protobuf>=3.6.0',
'six>=1.10',
'google-auth>=1.0.0',
'oauth2client>=1.4.7', 'protobuf>=3.6.0', 'six>=1.10', 'google-auth>=1.0.0',
'requests>=2.14.2')
if not PY3:

@ -91,7 +91,6 @@ def _close_channel_server_pairs(pairs):
pair.channel.close()
@unittest.skip('https://github.com/pypa/warehouse/issues/5196')
class ChannelzServicerTest(unittest.TestCase):
def _send_successful_unary_unary(self, idx):

@ -0,0 +1,103 @@
# Copyright 2019 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
package(default_visibility = ["//visibility:public"])
load("@grpc_python_dependencies//:requirements.bzl", "requirement")
py_library(
name = "benchmark_client",
srcs = ["benchmark_client.py"],
imports = ["../../"],
deps = [
requirement("six"),
"//src/proto/grpc/testing:py_benchmark_service_proto",
"//src/proto/grpc/testing:py_messages_proto",
"//src/python/grpcio/grpc:grpcio",
"//src/python/grpcio_tests/tests/unit:resources",
"//src/python/grpcio_tests/tests/unit:test_common",
],
)
py_library(
name = "benchmark_server",
srcs = ["benchmark_server.py"],
imports = ["../../"],
deps = [
"//src/proto/grpc/testing:py_benchmark_service_proto",
"//src/proto/grpc/testing:py_messages_proto",
],
)
py_library(
name = "client_runner",
srcs = ["client_runner.py"],
imports = ["../../"],
)
py_library(
name = "histogram",
srcs = ["histogram.py"],
imports = ["../../"],
deps = [
"//src/proto/grpc/testing:py_stats_proto",
],
)
py_library(
name = "worker_server",
srcs = ["worker_server.py"],
imports = ["../../"],
deps = [
":benchmark_client",
":benchmark_server",
":client_runner",
":histogram",
"//src/proto/grpc/testing:py_benchmark_service_proto",
"//src/proto/grpc/testing:py_control_proto",
"//src/proto/grpc/testing:py_stats_proto",
"//src/proto/grpc/testing:py_worker_service_proto",
"//src/python/grpcio/grpc:grpcio",
"//src/python/grpcio_tests/tests/unit:resources",
"//src/python/grpcio_tests/tests/unit:test_common",
],
)
py_binary(
name = "qps_worker",
srcs = ["qps_worker.py"],
imports = ["../../"],
main = "qps_worker.py",
deps = [
":worker_server",
"//src/proto/grpc/testing:py_worker_service_proto",
"//src/python/grpcio/grpc:grpcio",
"//src/python/grpcio_tests/tests/unit:test_common",
],
)
filegroup(
name = "scenarios",
srcs = ["scenarios.json"],
)
sh_test(
name = "basic_benchmark_test",
srcs = ["basic_benchmark_test.sh"],
data = [
":qps_worker",
":scenarios",
"//test/cpp/qps:qps_json_driver",
],
)

@ -0,0 +1,102 @@
# Python Benchmark Tools
## Scenarios
In `src/proto/grpc/testing/control.proto`, it defines the fields of a scenario.
In `tools/run_tests/performance/scenario_config.py`, the script generates actual scenario content that usually in json format, or piped to another script.
All Python related benchmark scenarios are:
* netperf
* python_generic_sync_streaming_ping_pong
* python_protobuf_sync_streaming_ping_pong
* python_protobuf_async_unary_ping_pong
* python_protobuf_sync_unary_ping_pong
* python_protobuf_sync_unary_qps_unconstrained
* python_protobuf_sync_streaming_qps_unconstrained
* python_protobuf_sync_unary_ping_pong_1MB
Here we picked a small but representative subset, and reduce their benchmark duration from 30 seconds to 10 seconds:
* python_protobuf_async_unary_ping_pong
* python_protobuf_sync_streaming_ping_pong
## Why keep the scenario file if it can be generated?
Well... The `tools/run_tests/performance/scenario_config.py` is 1274 lines long. The intention of building these benchmark tools is reducing the complexity of existing infrastructure code. So, instead of calling layers of abstraction to generate the scenario file, keeping a valid static copy is preferable.
Also, if the use case for this tool grows beyond simple static scenarios, we can incorporate automatic generation and selection of scenarios into the tool.
## How to run it?
```shell
bazel test --test_output=streamed src/python/grpcio_tests/tests/qps:basic_benchmark_test
```
## What does the output look like?
```
RUNNING SCENARIO: python_protobuf_async_unary_ping_pong
I0123 00:26:04.746195000 140736237159296 driver.cc:288] Starting server on localhost:10086 (worker #0)
D0123 00:26:04.747190000 140736237159296 ev_posix.cc:170] Using polling engine: poll
D0123 00:26:04.747264000 140736237159296 dns_resolver_ares.cc:488] Using ares dns resolver
I0123 00:26:04.748445000 140736237159296 subchannel.cc:869] Connect failed: {"created":"@1548203164.748403000","description":"Failed to connect to remote host: Connection refused","errno":61,"file":"src/core/lib/iomgr/tcp_client_posix.cc","file_line":207,"os_error":"Connection refused","syscall":"connect","target_address":"ipv6:[::1]:10086"}
I0123 00:26:04.748585000 140736237159296 subchannel.cc:869] Connect failed: {"created":"@1548203164.748564000","description":"Failed to connect to remote host: Connection refused","errno":61,"file":"src/core/lib/iomgr/tcp_client_posix.cc","file_line":207,"os_error":"Connection refused","syscall":"connect","target_address":"ipv4:127.0.0.1:10086"}
I0123 00:26:04.748596000 140736237159296 subchannel.cc:751] Subchannel 0x7fca43c19360: Retry in 999 milliseconds
I0123 00:26:05.751251000 123145571299328 subchannel.cc:710] Failed to connect to channel, retrying
I0123 00:26:05.752209000 140736237159296 subchannel.cc:832] New connected subchannel at 0x7fca45000060 for subchannel 0x7fca43c19360
I0123 00:26:05.772291000 140736237159296 driver.cc:349] Starting client on localhost:10087 (worker #1)
D0123 00:26:05.772384000 140736237159296 driver.cc:373] Client 0 gets 1 channels
I0123 00:26:05.773286000 140736237159296 subchannel.cc:832] New connected subchannel at 0x7fca45004a80 for subchannel 0x7fca451034b0
I0123 00:26:05.789797000 140736237159296 driver.cc:394] Initiating
I0123 00:26:05.790858000 140736237159296 driver.cc:415] Warming up
I0123 00:26:07.791078000 140736237159296 driver.cc:421] Starting
I0123 00:26:07.791860000 140736237159296 driver.cc:448] Running
I0123 00:26:17.790915000 140736237159296 driver.cc:462] Finishing clients
I0123 00:26:17.791821000 140736237159296 driver.cc:476] Received final status from client 0
I0123 00:26:17.792148000 140736237159296 driver.cc:508] Finishing servers
I0123 00:26:17.792493000 140736237159296 driver.cc:522] Received final status from server 0
I0123 00:26:17.795786000 140736237159296 report.cc:82] QPS: 2066.6
I0123 00:26:17.795799000 140736237159296 report.cc:122] QPS: 2066.6 (258.3/server core)
I0123 00:26:17.795805000 140736237159296 report.cc:127] Latencies (50/90/95/99/99.9%-ile): 467.9/504.8/539.0/653.3/890.4 us
I0123 00:26:17.795811000 140736237159296 report.cc:137] Server system time: 100.00%
I0123 00:26:17.795815000 140736237159296 report.cc:139] Server user time: 100.00%
I0123 00:26:17.795818000 140736237159296 report.cc:141] Client system time: 100.00%
I0123 00:26:17.795821000 140736237159296 report.cc:143] Client user time: 100.00%
I0123 00:26:17.795825000 140736237159296 report.cc:148] Server CPU usage: 0.00%
I0123 00:26:17.795828000 140736237159296 report.cc:153] Client Polls per Request: 0.00
I0123 00:26:17.795831000 140736237159296 report.cc:155] Server Polls per Request: 0.00
I0123 00:26:17.795834000 140736237159296 report.cc:160] Server Queries/CPU-sec: 1033.19
I0123 00:26:17.795837000 140736237159296 report.cc:162] Client Queries/CPU-sec: 1033.32
RUNNING SCENARIO: python_protobuf_sync_streaming_ping_pong
I0123 00:26:17.795888000 140736237159296 driver.cc:288] Starting server on localhost:10086 (worker #0)
D0123 00:26:17.795964000 140736237159296 ev_posix.cc:170] Using polling engine: poll
D0123 00:26:17.795978000 140736237159296 dns_resolver_ares.cc:488] Using ares dns resolver
I0123 00:26:17.796613000 140736237159296 subchannel.cc:832] New connected subchannel at 0x7fca43c15820 for subchannel 0x7fca43d12140
I0123 00:26:17.810911000 140736237159296 driver.cc:349] Starting client on localhost:10087 (worker #1)
D0123 00:26:17.811037000 140736237159296 driver.cc:373] Client 0 gets 1 channels
I0123 00:26:17.811892000 140736237159296 subchannel.cc:832] New connected subchannel at 0x7fca43d18f40 for subchannel 0x7fca43d16b80
I0123 00:26:17.818902000 140736237159296 driver.cc:394] Initiating
I0123 00:26:17.820776000 140736237159296 driver.cc:415] Warming up
I0123 00:26:19.824685000 140736237159296 driver.cc:421] Starting
I0123 00:26:19.825970000 140736237159296 driver.cc:448] Running
I0123 00:26:29.821866000 140736237159296 driver.cc:462] Finishing clients
I0123 00:26:29.823259000 140736237159296 driver.cc:476] Received final status from client 0
I0123 00:26:29.827195000 140736237159296 driver.cc:508] Finishing servers
I0123 00:26:29.827599000 140736237159296 driver.cc:522] Received final status from server 0
I0123 00:26:29.828739000 140736237159296 report.cc:82] QPS: 619.5
I0123 00:26:29.828752000 140736237159296 report.cc:122] QPS: 619.5 (77.4/server core)
I0123 00:26:29.828760000 140736237159296 report.cc:127] Latencies (50/90/95/99/99.9%-ile): 1589.8/1854.3/1920.4/2015.8/2204.8 us
I0123 00:26:29.828765000 140736237159296 report.cc:137] Server system time: 100.00%
I0123 00:26:29.828769000 140736237159296 report.cc:139] Server user time: 100.00%
I0123 00:26:29.828772000 140736237159296 report.cc:141] Client system time: 100.00%
I0123 00:26:29.828776000 140736237159296 report.cc:143] Client user time: 100.00%
I0123 00:26:29.828780000 140736237159296 report.cc:148] Server CPU usage: 0.00%
I0123 00:26:29.828784000 140736237159296 report.cc:153] Client Polls per Request: 0.00
I0123 00:26:29.828788000 140736237159296 report.cc:155] Server Polls per Request: 0.00
I0123 00:26:29.828792000 140736237159296 report.cc:160] Server Queries/CPU-sec: 309.58
I0123 00:26:29.828795000 140736237159296 report.cc:162] Client Queries/CPU-sec: 309.75
```
## Future Works (TODOs)
1. Generate a target for each scenario.
2. Simplify the main entrance of our benchmark related code, or make it depends on Bazel.

@ -0,0 +1,45 @@
#! /bin/bash
# Copyright 2019 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# This test benchmarks Python client/server.
set -ex
declare -a DRIVER_PORTS=("10086" "10087")
SCENARIOS_FILE=src/python/grpcio_tests/tests/qps/scenarios.json
function join { local IFS="$1"; shift; echo "$*"; }
if [[ -e "${SCENARIOS_FILE}" ]]; then
echo "Running against ${SCENARIOS_FILE}:"
cat "${SCENARIOS_FILE}"
else
echo "Failed to find ${SCENARIOS_FILE}!"
exit 1
fi
echo "Starting Python qps workers..."
qps_workers=()
for DRIVER_PORT in "${DRIVER_PORTS[@]}"
do
echo -e "\tRunning Python qps worker listening at localhost:${DRIVER_PORT}..."
src/python/grpcio_tests/tests/qps/qps_worker \
--driver_port="${DRIVER_PORT}" &
qps_workers+=("localhost:${DRIVER_PORT}")
done
echo "Running qps json driver..."
QPS_WORKERS=$(join , ${qps_workers[@]})
export QPS_WORKERS
test/cpp/qps/qps_json_driver --scenarios_file="${SCENARIOS_FILE}"

@ -0,0 +1,96 @@
{
"scenarios": [
{
"name": "python_protobuf_async_unary_ping_pong",
"clientConfig": {
"clientType": "ASYNC_CLIENT",
"securityParams": {
"useTestCa": true,
"serverHostOverride": "foo.test.google.fr"
},
"outstandingRpcsPerChannel": 1,
"clientChannels": 1,
"asyncClientThreads": 1,
"loadParams": {
"closedLoop": {}
},
"payloadConfig": {
"simpleParams": {}
},
"histogramParams": {
"resolution": 0.01,
"maxPossible": 60000000000
},
"channelArgs": [
{
"name": "grpc.optimization_target",
"strValue": "latency"
}
]
},
"numClients": 1,
"serverConfig": {
"serverType": "ASYNC_SERVER",
"securityParams": {
"useTestCa": true,
"serverHostOverride": "foo.test.google.fr"
},
"channelArgs": [
{
"name": "grpc.optimization_target",
"strValue": "latency"
}
]
},
"numServers": 1,
"warmupSeconds": 2,
"benchmarkSeconds": 10
},
{
"name": "python_protobuf_sync_streaming_ping_pong",
"clientConfig": {
"securityParams": {
"useTestCa": true,
"serverHostOverride": "foo.test.google.fr"
},
"outstandingRpcsPerChannel": 1,
"clientChannels": 1,
"asyncClientThreads": 1,
"rpcType": "STREAMING",
"loadParams": {
"closedLoop": {}
},
"payloadConfig": {
"simpleParams": {}
},
"histogramParams": {
"resolution": 0.01,
"maxPossible": 60000000000
},
"channelArgs": [
{
"name": "grpc.optimization_target",
"strValue": "latency"
}
]
},
"numClients": 1,
"serverConfig": {
"serverType": "ASYNC_SERVER",
"securityParams": {
"useTestCa": true,
"serverHostOverride": "foo.test.google.fr"
},
"channelArgs": [
{
"name": "grpc.optimization_target",
"strValue": "latency"
}
]
},
"numServers": 1,
"warmupSeconds": 2,
"benchmarkSeconds": 10
}
]
}

@ -140,7 +140,7 @@
s.name = 'gRPC-C++'
# TODO (mxyan): use version that match gRPC version when pod is stabilized
# version = '${settings.version}'
version = '${modify_podspec_version_string('0.0.6', settings.version)}'
version = '${modify_podspec_version_string('0.0.8', settings.version)}'
s.version = version
s.summary = 'gRPC C++ library'
s.homepage = 'https://grpc.io'
@ -156,6 +156,8 @@
s.ios.deployment_target = '7.0'
s.osx.deployment_target = '10.9'
s.tvos.deployment_target = '10.0'
s.requires_arc = false
name = 'grpcpp'

@ -99,6 +99,8 @@
s.ios.deployment_target = '7.0'
s.osx.deployment_target = '10.9'
s.tvos.deployment_target = '10.0'
s.requires_arc = false
name = 'grpc'
@ -174,7 +176,7 @@
ss.header_mappings_dir = '.'
ss.libraries = 'z'
ss.dependency "#{s.name}/Interface", version
ss.dependency 'BoringSSL-GRPC', '0.0.2'
ss.dependency 'BoringSSL-GRPC', '0.0.3'
ss.dependency 'nanopb', '~> 0.3'
ss.compiler_flags = '-DGRPC_SHADOW_BORINGSSL_SYMBOLS'

@ -37,6 +37,7 @@
s.ios.deployment_target = '7.0'
s.osx.deployment_target = '10.9'
s.tvos.deployment_target = '10.0'
name = 'ProtoRPC'
s.module_name = name

@ -37,6 +37,7 @@
s.ios.deployment_target = '7.0'
s.osx.deployment_target = '10.9'
s.tvos.deployment_target = '10.0'
name = 'RxLibrary'
s.module_name = name

@ -36,6 +36,7 @@
s.ios.deployment_target = '7.0'
s.osx.deployment_target = '10.9'
s.tvos.deployment_target = '10.0'
name = 'GRPCClient'
s.module_name = name

@ -107,6 +107,7 @@
# For the Protobuf dependency not to complain:
s.ios.deployment_target = '7.0'
s.osx.deployment_target = '10.9'
s.tvos.deployment_target = '10.0'
# Restrict the gRPC runtime version to the one supported by this plugin.
s.dependency 'gRPC-ProtoRPC', v

@ -4,6 +4,7 @@
def expand_symbol_list(symbol_list):
return ',\n '.join("'#define %s GRPC_SHADOW_%s'" % (symbol, symbol) for symbol in symbol_list)
%>
# This file has been automatically generated from a template file.
# Please make modifications to
# `templates/src/objective-c/BoringSSL-GRPC.podspec.template` instead. This
@ -43,7 +44,7 @@
Pod::Spec.new do |s|
s.name = 'BoringSSL-GRPC'
version = '0.0.2'
version = '0.0.3'
s.version = version
s.summary = 'BoringSSL is a fork of OpenSSL that is designed to meet Google\'s needs.'
# Adapted from the homepage:
@ -85,6 +86,7 @@
s.ios.deployment_target = '5.0'
s.osx.deployment_target = '10.7'
s.tvos.deployment_target = '10.0'
name = 'openssl_grpc'

@ -55,6 +55,7 @@ if cur_resolver and cur_resolver != 'ares':
'needs to use GRPC_DNS_RESOLVER=ares.'))
test_runner_log('Exit 1 without running tests.')
sys.exit(1)
os.environ.update({'GRPC_DNS_RESOLVER': 'ares'})
os.environ.update({'GRPC_TRACE': 'cares_resolver'})
def wait_until_dns_server_is_up(args,

@ -0,0 +1,7 @@
#=================
# PHP Test dependencies
# Install dependencies
RUN apt-get update && apt-get install -y ${'\\'}
valgrind

@ -19,6 +19,7 @@
<%include file="../../php7_deps.include"/>
<%include file="../../gcp_api_libraries.include"/>
<%include file="../../python_deps.include"/>
<%include file="../../php_valgrind.include"/>
<%include file="../../run_tests_addons.include"/>
# Define the default command.
CMD ["bash"]

@ -20,6 +20,7 @@
<%include file="../../gcp_api_libraries.include"/>
<%include file="../../python_deps.include"/>
<%include file="../../php_deps.include"/>
<%include file="../../php_valgrind.include"/>
<%include file="../../run_tests_addons.include"/>
# Define the default command.
CMD ["bash"]

@ -75,7 +75,7 @@ int main(int argc, char** argv) {
test_succeeds(dns, "dns:www.google.com");
test_succeeds(dns, "dns:///www.google.com");
char* resolver_env = gpr_getenv("GRPC_DNS_RESOLVER");
if (resolver_env != nullptr && gpr_stricmp(resolver_env, "native") == 0) {
if (resolver_env == nullptr || gpr_stricmp(resolver_env, "native") == 0) {
test_fails(dns, "dns://8.8.8.8/8.8.8.8:8888");
} else {
test_succeeds(dns, "dns://8.8.8.8/8.8.8.8:8888");

@ -260,6 +260,7 @@ typedef struct final_status_data {
static void server_start_transport_stream_op_batch(
grpc_call_element* elem, grpc_transport_stream_op_batch* op) {
auto* data = static_cast<final_status_data*>(elem->call_data);
gpr_mu_lock(&g_mu);
if (data->call == g_server_call_stack) {
if (op->send_initial_metadata) {
auto* batch = op->payload->send_initial_metadata.send_initial_metadata;
@ -270,6 +271,7 @@ static void server_start_transport_stream_op_batch(
}
}
}
gpr_mu_unlock(&g_mu);
grpc_call_next_op(elem, op);
}

@ -167,7 +167,7 @@ static bool compare_slice_buffer_with_buffer(grpc_slice_buffer *slices, const ch
slice = grpc_slice_from_static_buffer(write_buffer, kBufferSize);
grpc_slice_buffer_add(&write_slices, slice);
init_event_closure(&write_done, &write);
grpc_endpoint_write(ep_, &write_slices, &write_done);
grpc_endpoint_write(ep_, &write_slices, &write_done, nullptr);
XCTAssertEqual([self waitForEvent:&write timeout:kWriteTimeout], YES);
XCTAssertEqual(reinterpret_cast<grpc_error *>(write), GRPC_ERROR_NONE);
@ -224,7 +224,7 @@ static bool compare_slice_buffer_with_buffer(grpc_slice_buffer *slices, const ch
slice = grpc_slice_from_static_buffer(write_buffer, kBufferSize);
grpc_slice_buffer_add(&write_slices, slice);
init_event_closure(&write_done, &write);
grpc_endpoint_write(ep_, &write_slices, &write_done);
grpc_endpoint_write(ep_, &write_slices, &write_done, nullptr);
XCTAssertEqual([self waitForEvent:&write timeout:kWriteTimeout], YES);
XCTAssertEqual(reinterpret_cast<grpc_error *>(write), GRPC_ERROR_NONE);
@ -273,7 +273,7 @@ static bool compare_slice_buffer_with_buffer(grpc_slice_buffer *slices, const ch
slice = grpc_slice_from_static_buffer(write_buffer, kBufferSize);
grpc_slice_buffer_add(&write_slices, slice);
init_event_closure(&write_done, &write);
grpc_endpoint_write(ep_, &write_slices, &write_done);
grpc_endpoint_write(ep_, &write_slices, &write_done, nullptr);
XCTAssertEqual([self waitForEvent:&write timeout:kWriteTimeout], YES);
XCTAssertEqual(reinterpret_cast<grpc_error *>(write), GRPC_ERROR_NONE);

@ -389,46 +389,49 @@ static void test_callback(void) {
attr.cq_shutdown_cb = &shutdown_cb;
for (size_t pidx = 0; pidx < GPR_ARRAY_SIZE(polling_types); pidx++) {
grpc_core::ExecCtx exec_ctx; // reset exec_ctx
attr.cq_polling_type = polling_types[pidx];
cc = grpc_completion_queue_create(
grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
int sumtags = 0;
int counter = 0;
class TagCallback : public grpc_experimental_completion_queue_functor {
public:
TagCallback(int* counter, int tag) : counter_(counter), tag_(tag) {
functor_run = &TagCallback::Run;
}
~TagCallback() {}
static void Run(grpc_experimental_completion_queue_functor* cb, int ok) {
GPR_ASSERT(static_cast<bool>(ok));
auto* callback = static_cast<TagCallback*>(cb);
*callback->counter_ += callback->tag_;
grpc_core::Delete(callback);
{
// reset exec_ctx types
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
attr.cq_polling_type = polling_types[pidx];
cc = grpc_completion_queue_create(
grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
class TagCallback : public grpc_experimental_completion_queue_functor {
public:
TagCallback(int* counter, int tag) : counter_(counter), tag_(tag) {
functor_run = &TagCallback::Run;
}
~TagCallback() {}
static void Run(grpc_experimental_completion_queue_functor* cb,
int ok) {
GPR_ASSERT(static_cast<bool>(ok));
auto* callback = static_cast<TagCallback*>(cb);
*callback->counter_ += callback->tag_;
grpc_core::Delete(callback);
};
private:
int* counter_;
int tag_;
};
private:
int* counter_;
int tag_;
};
for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
tags[i] = static_cast<void*>(grpc_core::New<TagCallback>(&counter, i));
sumtags += i;
}
int sumtags = 0;
for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
tags[i] = static_cast<void*>(grpc_core::New<TagCallback>(&counter, i));
sumtags += i;
}
for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
GPR_ASSERT(grpc_cq_begin_op(cc, tags[i]));
grpc_cq_end_op(cc, tags[i], GRPC_ERROR_NONE, do_nothing_end_completion,
nullptr, &completions[i]);
}
for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
GPR_ASSERT(grpc_cq_begin_op(cc, tags[i]));
grpc_cq_end_op(cc, tags[i], GRPC_ERROR_NONE, do_nothing_end_completion,
nullptr, &completions[i]);
shutdown_and_destroy(cc);
}
GPR_ASSERT(sumtags == counter);
shutdown_and_destroy(cc);
GPR_ASSERT(got_shutdown);
got_shutdown = false;
}

@ -223,6 +223,7 @@ grpc_cc_test(
":end2end_test_lib",
],
size = "large", # with poll-cv this takes long, see #17493
timeout = "long",
)
grpc_cc_test(

@ -54,6 +54,7 @@ DEFINE_string(
"custom_metadata: server will echo custom metadata;\n"
"empty_stream : bi-di stream with no request/response;\n"
"empty_unary : empty (zero bytes) request and response;\n"
"google_default_credentials: large unary using GDC;\n"
"half_duplex : half-duplex streaming;\n"
"jwt_token_creds: large_unary with JWT token auth;\n"
"large_unary : single request and (large) response;\n"
@ -151,6 +152,11 @@ int main(int argc, char** argv) {
std::bind(&grpc::testing::InteropClient::DoPerRpcCreds, &client,
GetServiceAccountJsonKey());
}
if (FLAGS_custom_credentials_type == "google_default_credentials") {
actions["google_default_credentials"] =
std::bind(&grpc::testing::InteropClient::DoGoogleDefaultCredentials,
&client, FLAGS_default_service_account);
}
actions["status_code_and_message"] =
std::bind(&grpc::testing::InteropClient::DoStatusWithMessage, &client);
actions["custom_metadata"] =

@ -294,6 +294,25 @@ bool InteropClient::DoJwtTokenCreds(const grpc::string& username) {
return true;
}
bool InteropClient::DoGoogleDefaultCredentials(
const grpc::string& default_service_account) {
gpr_log(GPR_DEBUG,
"Sending a large unary rpc with GoogleDefaultCredentials...");
SimpleRequest request;
SimpleResponse response;
request.set_fill_username(true);
if (!PerformLargeUnary(&request, &response)) {
return false;
}
gpr_log(GPR_DEBUG, "Got username %s", response.username().c_str());
GPR_ASSERT(!response.username().empty());
GPR_ASSERT(response.username().c_str() == default_service_account);
gpr_log(GPR_DEBUG, "Large unary rpc with GoogleDefaultCredentials done.");
return true;
}
bool InteropClient::DoLargeUnary() {
gpr_log(GPR_DEBUG, "Sending a large unary rpc...");
SimpleRequest request;

@ -89,6 +89,8 @@ class InteropClient {
const grpc::string& oauth_scope);
// username is a string containing the user email
bool DoPerRpcCreds(const grpc::string& json_key);
// username is the GCE default service account email
bool DoGoogleDefaultCredentials(const grpc::string& username);
private:
class ServiceStub {

@ -101,8 +101,6 @@ class DummyEndpoint : public grpc_endpoint {
GRPC_CLOSURE_SCHED(cb, GRPC_ERROR_NONE);
}
static grpc_workqueue* get_workqueue(grpc_endpoint* ep) { return nullptr; }
static void add_to_pollset(grpc_endpoint* ep, grpc_pollset* pollset) {}
static void add_to_pollset_set(grpc_endpoint* ep, grpc_pollset_set* pollset) {

@ -55,6 +55,7 @@ if cur_resolver and cur_resolver != 'ares':
'needs to use GRPC_DNS_RESOLVER=ares.'))
test_runner_log('Exit 1 without running tests.')
sys.exit(1)
os.environ.update({'GRPC_DNS_RESOLVER': 'ares'})
os.environ.update({'GRPC_TRACE': 'cares_resolver'})
def wait_until_dns_server_is_up(args,

@ -14,8 +14,10 @@
licenses(["notice"]) # Apache v2
load("//bazel:grpc_build_system.bzl", "grpc_cc_test", "grpc_cc_library", "grpc_cc_binary", "grpc_package")
load("//test/cpp/qps:qps_benchmark_script.bzl", "qps_json_driver_batch", "json_run_localhost_batch")
package(default_visibility = ["//visibility:public"])
load("//bazel:grpc_build_system.bzl", "grpc_cc_binary", "grpc_cc_library", "grpc_cc_test", "grpc_package")
load("//test/cpp/qps:qps_benchmark_script.bzl", "json_run_localhost_batch", "qps_json_driver_batch")
grpc_package(name = "test/cpp/qps")

@ -79,6 +79,14 @@ RUN pip install --upgrade pip==10.0.1
RUN pip install virtualenv
RUN pip install futures==2.2.0 enum34==1.0.4 protobuf==3.5.2.post1 six==1.10.0 twisted==17.5.0
#=================
# PHP Test dependencies
# Install dependencies
RUN apt-get update && apt-get install -y \
valgrind
RUN mkdir /var/local/jenkins

@ -76,6 +76,14 @@ RUN pip install futures==2.2.0 enum34==1.0.4 protobuf==3.5.2.post1 six==1.10.0 t
RUN apt-get update && apt-get install -y \
git php5 php5-dev phpunit unzip
#=================
# PHP Test dependencies
# Install dependencies
RUN apt-get update && apt-get install -y \
valgrind
RUN mkdir /var/local/jenkins

@ -30,7 +30,8 @@ export GRPC_DEFAULT_SSL_ROOTS_FILE_PATH="$(pwd)/etc/roots.pem"
# building all languages in the same working copy can also lead to conflicts
# due to different compilation flags
tools/run_tests/run_interop_tests.py -l c++ \
--cloud_to_prod --cloud_to_prod_auth --prod_servers default gateway_v4 \
--cloud_to_prod --cloud_to_prod_auth --on_gce=false \
--prod_servers default gateway_v4 \
--service_account_key_file="${KOKORO_GFILE_DIR}/GrpcTesting-726eb1347f15.json" \
--skip_compute_engine_creds --internal_ci -t -j 4 || FAILED="true"

@ -65,6 +65,12 @@ _SKIP_ADVANCED = [
_SKIP_SPECIAL_STATUS_MESSAGE = ['special_status_message']
_GOOGLE_DEFAULT_CREDS_TEST_CASE = 'google_default_credentials'
_SKIP_GOOGLE_DEFAULT_CREDS = [
_GOOGLE_DEFAULT_CREDS_TEST_CASE,
]
_TEST_TIMEOUT = 3 * 60
# disable this test on core-based languages,
@ -129,7 +135,7 @@ class CSharpLanguage:
return {}
def unimplemented_test_cases(self):
return _SKIP_SERVER_COMPRESSION + _SKIP_DATA_FRAME_PADDING + _SKIP_SPECIAL_STATUS_MESSAGE
return _SKIP_SERVER_COMPRESSION + _SKIP_DATA_FRAME_PADDING + _SKIP_SPECIAL_STATUS_MESSAGE + _SKIP_GOOGLE_DEFAULT_CREDS
def unimplemented_test_cases_server(self):
return _SKIP_COMPRESSION
@ -158,7 +164,7 @@ class CSharpCoreCLRLanguage:
return {}
def unimplemented_test_cases(self):
return _SKIP_SERVER_COMPRESSION + _SKIP_DATA_FRAME_PADDING + _SKIP_SPECIAL_STATUS_MESSAGE
return _SKIP_SERVER_COMPRESSION + _SKIP_DATA_FRAME_PADDING + _SKIP_SPECIAL_STATUS_MESSAGE + _SKIP_GOOGLE_DEFAULT_CREDS
def unimplemented_test_cases_server(self):
return _SKIP_COMPRESSION
@ -188,7 +194,7 @@ class DartLanguage:
return {}
def unimplemented_test_cases(self):
return _SKIP_COMPRESSION + _SKIP_SPECIAL_STATUS_MESSAGE
return _SKIP_COMPRESSION + _SKIP_SPECIAL_STATUS_MESSAGE + _SKIP_GOOGLE_DEFAULT_CREDS
def unimplemented_test_cases_server(self):
return _SKIP_COMPRESSION + _SKIP_SPECIAL_STATUS_MESSAGE
@ -223,7 +229,7 @@ class JavaLanguage:
return {}
def unimplemented_test_cases(self):
return []
return _SKIP_GOOGLE_DEFAULT_CREDS
def unimplemented_test_cases_server(self):
return _SKIP_COMPRESSION
@ -248,7 +254,7 @@ class JavaOkHttpClient:
return {}
def unimplemented_test_cases(self):
return _SKIP_DATA_FRAME_PADDING + _SKIP_SPECIAL_STATUS_MESSAGE
return _SKIP_DATA_FRAME_PADDING + _SKIP_SPECIAL_STATUS_MESSAGE + _SKIP_GOOGLE_DEFAULT_CREDS
def __str__(self):
return 'javaokhttp'
@ -279,7 +285,7 @@ class GoLanguage:
return {}
def unimplemented_test_cases(self):
return _SKIP_COMPRESSION
return _SKIP_COMPRESSION + _SKIP_GOOGLE_DEFAULT_CREDS
def unimplemented_test_cases_server(self):
return _SKIP_COMPRESSION
@ -309,7 +315,7 @@ class Http2Server:
return {}
def unimplemented_test_cases(self):
return _TEST_CASES + _SKIP_DATA_FRAME_PADDING + _SKIP_SPECIAL_STATUS_MESSAGE
return _TEST_CASES + _SKIP_DATA_FRAME_PADDING + _SKIP_SPECIAL_STATUS_MESSAGE + _SKIP_GOOGLE_DEFAULT_CREDS
def unimplemented_test_cases_server(self):
return _TEST_CASES
@ -339,7 +345,7 @@ class Http2Client:
return {}
def unimplemented_test_cases(self):
return _TEST_CASES + _SKIP_SPECIAL_STATUS_MESSAGE
return _TEST_CASES + _SKIP_SPECIAL_STATUS_MESSAGE + _SKIP_GOOGLE_DEFAULT_CREDS
def unimplemented_test_cases_server(self):
return _TEST_CASES
@ -376,7 +382,7 @@ class NodeLanguage:
return {}
def unimplemented_test_cases(self):
return _SKIP_COMPRESSION + _SKIP_DATA_FRAME_PADDING
return _SKIP_COMPRESSION + _SKIP_DATA_FRAME_PADDING + _SKIP_GOOGLE_DEFAULT_CREDS
def unimplemented_test_cases_server(self):
return _SKIP_COMPRESSION
@ -406,7 +412,7 @@ class NodePureJSLanguage:
return {}
def unimplemented_test_cases(self):
return _SKIP_COMPRESSION + _SKIP_DATA_FRAME_PADDING
return _SKIP_COMPRESSION + _SKIP_DATA_FRAME_PADDING + _SKIP_GOOGLE_DEFAULT_CREDS
def unimplemented_test_cases_server(self):
return []
@ -431,7 +437,7 @@ class PHPLanguage:
return {}
def unimplemented_test_cases(self):
return _SKIP_COMPRESSION + _SKIP_DATA_FRAME_PADDING + _SKIP_SPECIAL_STATUS_MESSAGE
return _SKIP_COMPRESSION + _SKIP_DATA_FRAME_PADDING + _SKIP_SPECIAL_STATUS_MESSAGE + _SKIP_GOOGLE_DEFAULT_CREDS
def unimplemented_test_cases_server(self):
return []
@ -456,7 +462,7 @@ class PHP7Language:
return {}
def unimplemented_test_cases(self):
return _SKIP_COMPRESSION + _SKIP_DATA_FRAME_PADDING + _SKIP_SPECIAL_STATUS_MESSAGE
return _SKIP_COMPRESSION + _SKIP_DATA_FRAME_PADDING + _SKIP_SPECIAL_STATUS_MESSAGE + _SKIP_GOOGLE_DEFAULT_CREDS
def unimplemented_test_cases_server(self):
return []
@ -491,7 +497,7 @@ class ObjcLanguage:
# cmdline argument. Here we return all but one test cases as unimplemented,
# and depend upon ObjC test's behavior that it runs all cases even when
# we tell it to run just one.
return _TEST_CASES[1:] + _SKIP_COMPRESSION + _SKIP_DATA_FRAME_PADDING + _SKIP_SPECIAL_STATUS_MESSAGE
return _TEST_CASES[1:] + _SKIP_COMPRESSION + _SKIP_DATA_FRAME_PADDING + _SKIP_SPECIAL_STATUS_MESSAGE + _SKIP_GOOGLE_DEFAULT_CREDS
def unimplemented_test_cases_server(self):
return _SKIP_COMPRESSION
@ -526,7 +532,7 @@ class RubyLanguage:
return {}
def unimplemented_test_cases(self):
return _SKIP_SERVER_COMPRESSION + _SKIP_DATA_FRAME_PADDING + _SKIP_SPECIAL_STATUS_MESSAGE
return _SKIP_SERVER_COMPRESSION + _SKIP_DATA_FRAME_PADDING + _SKIP_SPECIAL_STATUS_MESSAGE + _SKIP_GOOGLE_DEFAULT_CREDS
def unimplemented_test_cases_server(self):
return _SKIP_COMPRESSION
@ -571,7 +577,7 @@ class PythonLanguage:
}
def unimplemented_test_cases(self):
return _SKIP_COMPRESSION + _SKIP_DATA_FRAME_PADDING
return _SKIP_COMPRESSION + _SKIP_DATA_FRAME_PADDING + _SKIP_GOOGLE_DEFAULT_CREDS
def unimplemented_test_cases_server(self):
return _SKIP_COMPRESSION
@ -614,8 +620,11 @@ _TEST_CASES = [
]
_AUTH_TEST_CASES = [
'compute_engine_creds', 'jwt_token_creds', 'oauth2_auth_token',
'per_rpc_creds'
'compute_engine_creds',
'jwt_token_creds',
'oauth2_auth_token',
'per_rpc_creds',
_GOOGLE_DEFAULT_CREDS_TEST_CASE,
]
_HTTP2_TEST_CASES = ['tls', 'framing']
@ -714,7 +723,7 @@ def compute_engine_creds_required(language, test_case):
return False
def auth_options(language, test_case, service_account_key_file=None):
def auth_options(language, test_case, on_gce, service_account_key_file=None):
"""Returns (cmdline, env) tuple with cloud_to_prod_auth test options."""
language = str(language)
@ -728,9 +737,6 @@ def auth_options(language, test_case, service_account_key_file=None):
key_file_arg = '--service_account_key_file=%s' % service_account_key_file
default_account_arg = '--default_service_account=830293263384-compute@developer.gserviceaccount.com'
# TODO: When using google_default_credentials outside of cloud-to-prod, the environment variable
# 'GOOGLE_APPLICATION_CREDENTIALS' needs to be set for the test case
# 'jwt_token_creds' to work.
if test_case in ['jwt_token_creds', 'per_rpc_creds', 'oauth2_auth_token']:
if language in [
'csharp', 'csharpcoreclr', 'node', 'php', 'php7', 'python',
@ -750,6 +756,11 @@ def auth_options(language, test_case, service_account_key_file=None):
if test_case == 'compute_engine_creds':
cmdargs += [oauth_scope_arg, default_account_arg]
if test_case == _GOOGLE_DEFAULT_CREDS_TEST_CASE:
if not on_gce:
env['GOOGLE_APPLICATION_CREDENTIALS'] = service_account_key_file
cmdargs += [default_account_arg]
return (cmdargs, env)
@ -767,6 +778,7 @@ def cloud_to_prod_jobspec(language,
test_case,
server_host_nickname,
server_host,
on_gce,
docker_image=None,
auth=False,
manual_cmd_log=None,
@ -792,7 +804,7 @@ def cloud_to_prod_jobspec(language,
cmdargs = cmdargs + transport_security_options
environ = dict(language.cloud_to_prod_env(), **language.global_env())
if auth:
auth_cmdargs, auth_env = auth_options(language, test_case,
auth_cmdargs, auth_env = auth_options(language, test_case, on_gce,
service_account_key_file)
cmdargs += auth_cmdargs
environ.update(auth_env)
@ -1070,6 +1082,12 @@ argp.add_argument(
action='store_const',
const=True,
help='Run cloud_to_prod_auth tests.')
argp.add_argument(
'--on_gce',
default=True,
action='store_const',
const=True,
help='Whether or not this test script is running on GCE.')
argp.add_argument(
'--prod_servers',
choices=prod_servers.keys(),
@ -1325,6 +1343,7 @@ try:
test_case,
server_host_nickname,
prod_servers[server_host_nickname],
on_gce=args.on_gce,
docker_image=docker_images.get(str(language)),
manual_cmd_log=client_manual_cmd_log,
service_account_key_file=args.
@ -1339,6 +1358,7 @@ try:
test_case,
server_host_nickname,
prod_servers[server_host_nickname],
on_gce=args.on_gce,
docker_image=docker_images.get(
str(language)),
manual_cmd_log=client_manual_cmd_log,
@ -1355,6 +1375,7 @@ try:
test_case,
server_host_nickname,
prod_servers[server_host_nickname],
on_gce=args.on_gce,
docker_image=docker_images.get(str(http2Interop)),
manual_cmd_log=client_manual_cmd_log,
service_account_key_file=args.service_account_key_file,
@ -1373,36 +1394,22 @@ try:
not compute_engine_creds_required(
language, test_case)):
if not test_case in language.unimplemented_test_cases():
tls_test_job = cloud_to_prod_jobspec(
transport_security = 'tls'
if test_case == _GOOGLE_DEFAULT_CREDS_TEST_CASE:
transport_security = 'google_default_credentials'
test_job = cloud_to_prod_jobspec(
language,
test_case,
server_host_nickname,
prod_servers[server_host_nickname],
on_gce=args.on_gce,
docker_image=docker_images.get(str(language)),
auth=True,
manual_cmd_log=client_manual_cmd_log,
service_account_key_file=args.
service_account_key_file,
transport_security='tls')
jobs.append(tls_test_job)
if str(language) in [
'go'
]: # Add more languages to the list to turn on tests.
google_default_creds_test_job = cloud_to_prod_jobspec(
language,
test_case,
server_host_nickname,
prod_servers[server_host_nickname],
docker_image=docker_images.get(
str(language)),
auth=True,
manual_cmd_log=client_manual_cmd_log,
service_account_key_file=args.
service_account_key_file,
transport_security=
'google_default_credentials')
jobs.append(google_default_creds_test_job)
transport_security=transport_security)
jobs.append(test_job)
for server in args.override_server:
server_name = server[0]
(server_host, server_port) = server[1].split(':')

Loading…
Cancel
Save