Merge branch 'master' into fray

Ref: pull/3037/head
Author: Vijay Pai
Commit: 0660eaca00
51 files changed (lines changed per file):

  1. Makefile (48)
  2. build.json (18)
  3. include/grpc++/impl/call.h (3)
  4. include/grpc++/impl/rpc_service_method.h (16)
  5. include/grpc++/server.h (17)
  6. include/grpc++/server_builder.h (4)
  7. include/grpc++/stream.h (2)
  8. include/grpc/compression.h (26)
  9. include/grpc/grpc_security.h (14)
  10. src/core/channel/channel_args.c (63)
  11. src/core/channel/channel_args.h (20)
  12. src/core/iomgr/pollset.h (7)
  13. src/core/iomgr/pollset_multipoller_with_epoll.c (2)
  14. src/core/iomgr/pollset_multipoller_with_poll_posix.c (2)
  15. src/core/iomgr/pollset_posix.c (15)
  16. src/core/iomgr/pollset_posix.h (6)
  17. src/core/iomgr/pollset_windows.c (10)
  18. src/core/security/google_default_credentials.c (2)
  19. src/core/security/server_auth_filter.c (28)
  20. src/core/surface/completion_queue.c (16)
  21. src/cpp/server/server.cc (99)
  22. src/cpp/server/server_builder.cc (8)
  23. src/python/grpcio/grpc/_adapter/_c/types.h (2)
  24. src/python/grpcio/grpc/_adapter/_c/types/server.c (16)
  25. src/python/grpcio/grpc/_adapter/_low.py (3)
  26. src/python/grpcio/grpc/framework/interfaces/face/__init__.py (30)
  27. src/python/grpcio/grpc/framework/interfaces/face/face.py (933)
  28. src/python/grpcio/grpc/framework/interfaces/face/utilities.py (178)
  29. src/python/grpcio_test/grpc_test/_adapter/_low_test.py (199)
  30. test/core/channel/channel_args_test.c (141)
  31. test/core/end2end/fixtures/chttp2_fake_security.c (2)
  32. test/core/end2end/fixtures/chttp2_simple_ssl_fullstack.c (2)
  33. test/core/end2end/fixtures/chttp2_simple_ssl_fullstack_with_poll.c (2)
  34. test/core/end2end/fixtures/chttp2_simple_ssl_fullstack_with_proxy.c (2)
  35. test/core/end2end/fixtures/chttp2_simple_ssl_with_oauth2_fullstack.c (4)
  36. test/core/httpcli/httpcli_test.c (6)
  37. test/core/iomgr/endpoint_tests.c (9)
  38. test/core/iomgr/fd_posix_test.c (12)
  39. test/core/iomgr/tcp_client_posix_test.c (9)
  40. test/core/iomgr/tcp_posix_test.c (15)
  41. test/core/iomgr/tcp_server_posix_test.c (3)
  42. test/core/iomgr/udp_server_test.c (3)
  43. test/core/security/oauth2_utils.c (2)
  44. test/core/security/print_google_default_creds_token.c (4)
  45. test/core/security/verify_jwt.c (2)
  46. test/core/util/port_posix.c (2)
  47. test/core/util/reconnect_server.c (3)
  48. test/cpp/end2end/async_end2end_test.cc (235)
  49. tools/run_tests/sources_and_headers.json (17)
  50. tools/run_tests/tests.json (18)
  51. vsprojects/Grpc.mak (14)

File diff suppressed because one or more lines are too long

build.json
@@ -1365,6 +1365,20 @@
         "gpr"
       ]
     },
+    {
+      "name": "grpc_channel_args_test",
+      "build": "test",
+      "language": "c",
+      "src": [
+        "test/core/channel/channel_args_test.c"
+      ],
+      "deps": [
+        "grpc_test_util",
+        "grpc",
+        "gpr_test_util",
+        "gpr"
+      ]
+    },
     {
       "name": "grpc_channel_stack_test",
       "build": "test",
@@ -2594,13 +2608,9 @@
         "grpc++_test_util",
         "grpc_test_util",
         "grpc++",
-        "grpc_zookeeper",
         "grpc",
         "gpr_test_util",
         "gpr"
-      ],
-      "external_deps": [
-        "zookeeper"
       ]
     },
     {

include/grpc++/impl/call.h
@@ -541,8 +541,7 @@ class CallOpSet : public CallOpSetInterface,
 template <class Op1 = CallNoOp<1>, class Op2 = CallNoOp<2>,
           class Op3 = CallNoOp<3>, class Op4 = CallNoOp<4>,
           class Op5 = CallNoOp<5>, class Op6 = CallNoOp<6>>
-class SneakyCallOpSet GRPC_FINAL
-    : public CallOpSet<Op1, Op2, Op3, Op4, Op5, Op6> {
+class SneakyCallOpSet : public CallOpSet<Op1, Op2, Op3, Op4, Op5, Op6> {
  public:
  bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE {
    typedef CallOpSet<Op1, Op2, Op3, Op4, Op5, Op6> Base;

include/grpc++/impl/rpc_service_method.h
@@ -211,13 +211,19 @@ class BidiStreamingHandler : public MethodHandler {
 // Handle unknown method by returning UNIMPLEMENTED error.
 class UnknownMethodHandler : public MethodHandler {
  public:
-  void RunHandler(const HandlerParameter& param) GRPC_FINAL {
+  template <class T>
+  static void FillOps(ServerContext* context, T* ops) {
     Status status(StatusCode::UNIMPLEMENTED, "");
-    CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops;
-    if (!param.server_context->sent_initial_metadata_) {
-      ops.SendInitialMetadata(param.server_context->initial_metadata_);
+    if (!context->sent_initial_metadata_) {
+      ops->SendInitialMetadata(context->initial_metadata_);
+      context->sent_initial_metadata_ = true;
     }
-    ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
+    ops->ServerSendStatus(context->trailing_metadata_, status);
+  }
+
+  void RunHandler(const HandlerParameter& param) GRPC_FINAL {
+    CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops;
+    FillOps(param.server_context, &ops);
     param.call->PerformOps(&ops);
     param.call->cq()->Pluck(&ops);
   }

include/grpc++/server.h
@@ -98,7 +98,7 @@ class Server GRPC_FINAL : public GrpcLibrary, private CallHook {
   // Add a listening port. Can be called multiple times.
   int AddListeningPort(const grpc::string& addr, ServerCredentials* creds);
   // Start the server.
-  bool Start();
+  bool Start(ServerCompletionQueue** cqs, size_t num_cqs);

   void HandleQueueClosed();
   void RunRpc();
@@ -112,7 +112,8 @@ class Server GRPC_FINAL : public GrpcLibrary, private CallHook {
    public:
     BaseAsyncRequest(Server* server, ServerContext* context,
                      ServerAsyncStreamingInterface* stream,
-                     CompletionQueue* call_cq, void* tag);
+                     CompletionQueue* call_cq, void* tag,
+                     bool delete_on_finalize);
     virtual ~BaseAsyncRequest();
     bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE;
@@ -123,6 +124,7 @@ class Server GRPC_FINAL : public GrpcLibrary, private CallHook {
     ServerAsyncStreamingInterface* const stream_;
     CompletionQueue* const call_cq_;
     void* const tag_;
+    const bool delete_on_finalize_;
     grpc_call* call_;
     grpc_metadata_array initial_metadata_array_;
   };
@@ -184,12 +186,13 @@ class Server GRPC_FINAL : public GrpcLibrary, private CallHook {
     Message* const request_;
   };

-  class GenericAsyncRequest GRPC_FINAL : public BaseAsyncRequest {
+  class GenericAsyncRequest : public BaseAsyncRequest {
    public:
     GenericAsyncRequest(Server* server, GenericServerContext* context,
                         ServerAsyncStreamingInterface* stream,
                         CompletionQueue* call_cq,
-                        ServerCompletionQueue* notification_cq, void* tag);
+                        ServerCompletionQueue* notification_cq, void* tag,
+                        bool delete_on_finalize);

     bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE;
@@ -197,6 +200,10 @@ class Server GRPC_FINAL : public GrpcLibrary, private CallHook {
     grpc_call_details call_details_;
   };

+  class UnimplementedAsyncRequestContext;
+  class UnimplementedAsyncRequest;
+  class UnimplementedAsyncResponse;
+
   template <class Message>
   void RequestAsyncCall(void* registered_method, ServerContext* context,
                         ServerAsyncStreamingInterface* stream,
@@ -221,7 +228,7 @@ class Server GRPC_FINAL : public GrpcLibrary, private CallHook {
                         ServerCompletionQueue* notification_cq,
                         void* tag) {
     new GenericAsyncRequest(this, context, stream, call_cq, notification_cq,
-                            tag);
+                            tag, true);
   }

   const int max_message_size_;

include/grpc++/server_builder.h
@@ -97,8 +97,8 @@ class ServerBuilder {
                        int* selected_port = nullptr);

   // Add a completion queue for handling asynchronous services
-  // Caller is required to keep this completion queue live until calling
-  // BuildAndStart()
+  // Caller is required to keep this completion queue live until
+  // the server is destroyed.
   std::unique_ptr<ServerCompletionQueue> AddCompletionQueue();

   // Return a running server which is ready for processing rpcs.

include/grpc++/stream.h
@@ -761,6 +761,8 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
   }

  private:
+  friend class ::grpc::Server;
+
   void BindCall(Call* call) GRPC_OVERRIDE { call_ = *call; }

   Call call_;

include/grpc/compression.h
@@ -36,12 +36,15 @@
 #include <stdlib.h>

+#include <grpc/support/port_platform.h>
+
 #ifdef __cplusplus
 extern "C" {
 #endif

 /** To be used in channel arguments */
 #define GRPC_COMPRESSION_ALGORITHM_ARG "grpc.compression_algorithm"
+#define GRPC_COMPRESSION_ALGORITHM_STATE_ARG "grpc.compression_algorithm_state"

 /* The various compression algorithms supported by GRPC */
 typedef enum {
@@ -60,6 +63,11 @@ typedef enum {
   GRPC_COMPRESS_LEVEL_COUNT
 } grpc_compression_level;

+typedef struct grpc_compression_options {
+  gpr_uint32 enabled_algorithms_bitset; /**< All algs are enabled by default */
+  grpc_compression_algorithm default_compression_algorithm; /**< for channel */
+} grpc_compression_options;
+
 /** Parses the first \a name_length bytes of \a name as a
  * grpc_compression_algorithm instance, updating \a algorithm. Returns 1 upon
  * success, 0 otherwise. */
@@ -67,9 +75,7 @@ int grpc_compression_algorithm_parse(const char *name, size_t name_length,
                                      grpc_compression_algorithm *algorithm);

 /** Updates \a name with the encoding name corresponding to a valid \a
- * algorithm. Note that the string returned through \a name upon success is
- * statically allocated and shouldn't be freed. Returns 1 upon success, 0
- * otherwise. */
+ * algorithm. Returns 1 upon success, 0 otherwise. */
 int grpc_compression_algorithm_name(grpc_compression_algorithm algorithm,
                                     char **name);
@@ -85,6 +91,20 @@ grpc_compression_level grpc_compression_level_for_algorithm(
 grpc_compression_algorithm grpc_compression_algorithm_for_level(
     grpc_compression_level level);

+void grpc_compression_options_init(grpc_compression_options *opts);
+
+/** Mark \a algorithm as enabled in \a opts. */
+void grpc_compression_options_enable_algorithm(
+    grpc_compression_options *opts, grpc_compression_algorithm algorithm);
+
+/** Mark \a algorithm as disabled in \a opts. */
+void grpc_compression_options_disable_algorithm(
+    grpc_compression_options *opts, grpc_compression_algorithm algorithm);
+
+/** Returns true if \a algorithm is marked as enabled in \a opts. */
+int grpc_compression_options_is_algorithm_enabled(
+    const grpc_compression_options *opts, grpc_compression_algorithm algorithm);
+
 #ifdef __cplusplus
 }
 #endif

include/grpc/grpc_security.h
@@ -275,12 +275,18 @@ int grpc_auth_context_set_peer_identity_property_name(grpc_auth_context *ctx,

 /* --- Auth Metadata Processing --- */

 /* Callback function that is called when the metadata processing is done.
-   success is 1 if processing succeeded, 0 otherwise.
-   Consumed metadata will be removed from the set of metadata available on the
-   call. */
+   - Consumed metadata will be removed from the set of metadata available on
+     the call. consumed_md may be NULL if no metadata has been consumed.
+   - Response metadata will be set on the response. response_md may be NULL.
+   - status is GRPC_STATUS_OK for success or a specific status for an error.
+     Common error status for auth metadata processing is either
+     GRPC_STATUS_UNAUTHENTICATED in case of an authentication failure or
+     GRPC_STATUS_PERMISSION_DENIED in case of an authorization failure.
+   - error_details gives details about the error. May be NULL. */
 typedef void (*grpc_process_auth_metadata_done_cb)(
     void *user_data, const grpc_metadata *consumed_md, size_t num_consumed_md,
-    int success);
+    const grpc_metadata *response_md, size_t num_response_md,
+    grpc_status_code status, const char *error_details);

 /* Pluggable server-side metadata processor object. */
 typedef struct {
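
For illustration only, a sketch of a processor completing through the new callback shape; the enclosing process-function signature and the check_token helper are assumptions, not part of this diff:

    /* Hypothetical process function for the pluggable processor above. */
    static void my_process(void *state, grpc_auth_context *ctx,
                           const grpc_metadata *md, size_t num_md,
                           grpc_process_auth_metadata_done_cb cb,
                           void *user_data) {
      size_t i;
      for (i = 0; i < num_md; i++) {
        if (strcmp(md[i].key, "authorization") == 0 && check_token(&md[i])) {
          /* Consume the credential, send no response metadata, succeed. */
          cb(user_data, &md[i], 1, NULL, 0, GRPC_STATUS_OK, NULL);
          return;
        }
      }
      cb(user_data, NULL, 0, NULL, 0, GRPC_STATUS_UNAUTHENTICATED,
         "missing or invalid authorization metadata");
    }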

src/core/channel/channel_args.c
@@ -37,6 +37,7 @@

 #include <grpc/support/alloc.h>
 #include <grpc/support/string_util.h>
+#include <grpc/support/useful.h>

 #include <string.h>
@@ -146,3 +147,65 @@ grpc_channel_args *grpc_channel_args_set_compression_algorithm(
   tmp.value.integer = algorithm;
   return grpc_channel_args_copy_and_add(a, &tmp, 1);
 }
+
+/** Returns 1 if the argument for compression algorithm's enabled states bitset
+ * was found in \a a, returning the arg's value in \a states. Otherwise,
+ * returns 0. */
+static int find_compression_algorithm_states_bitset(const grpc_channel_args *a,
+                                                    int **states_arg) {
+  if (a != NULL) {
+    size_t i;
+    for (i = 0; i < a->num_args; ++i) {
+      if (a->args[i].type == GRPC_ARG_INTEGER &&
+          !strcmp(GRPC_COMPRESSION_ALGORITHM_STATE_ARG, a->args[i].key)) {
+        *states_arg = &a->args[i].value.integer;
+        return 1; /* GPR_TRUE */
+      }
+    }
+  }
+  return 0; /* GPR_FALSE */
+}
+
+grpc_channel_args *grpc_channel_args_compression_algorithm_set_state(
+    grpc_channel_args **a, grpc_compression_algorithm algorithm, int state) {
+  int *states_arg;
+  grpc_channel_args *result = *a;
+  const int states_arg_found =
+      find_compression_algorithm_states_bitset(*a, &states_arg);
+
+  if (states_arg_found) {
+    if (state != 0) {
+      GPR_BITSET(states_arg, algorithm);
+    } else {
+      GPR_BITCLEAR(states_arg, algorithm);
+    }
+  } else {
+    /* create a new arg */
+    grpc_arg tmp;
+    tmp.type = GRPC_ARG_INTEGER;
+    tmp.key = GRPC_COMPRESSION_ALGORITHM_STATE_ARG;
+    /* all enabled by default */
+    tmp.value.integer = (1u << GRPC_COMPRESS_ALGORITHMS_COUNT) - 1;
+    if (state != 0) {
+      GPR_BITSET(&tmp.value.integer, algorithm);
+    } else {
+      GPR_BITCLEAR(&tmp.value.integer, algorithm);
+    }
+    result = grpc_channel_args_copy_and_add(*a, &tmp, 1);
+    grpc_channel_args_destroy(*a);
+    *a = result;
+  }
+  return result;
+}
+
+int grpc_channel_args_compression_algorithm_get_states(
+    const grpc_channel_args *a) {
+  int *states_arg;
+  if (find_compression_algorithm_states_bitset(a, &states_arg)) {
+    return *states_arg;
+  } else {
+    return (1u << GRPC_COMPRESS_ALGORITHMS_COUNT) - 1; /* All algs. enabled */
+  }
+}

src/core/channel/channel_args.h
@@ -67,4 +67,24 @@ grpc_compression_algorithm grpc_channel_args_get_compression_algorithm(
 grpc_channel_args *grpc_channel_args_set_compression_algorithm(
     grpc_channel_args *a, grpc_compression_algorithm algorithm);

+/** Sets the support for the given compression algorithm. By default, all
+ * compression algorithms are enabled. It's an error to disable an algorithm
+ * set by grpc_channel_args_set_compression_algorithm.
+ *
+ * Returns an instance with the updated algorithm states. The \a a pointer is
+ * modified to point to the returned instance (which may be different from the
+ * input value of \a a). */
+grpc_channel_args *grpc_channel_args_compression_algorithm_set_state(
+    grpc_channel_args **a, grpc_compression_algorithm algorithm, int enabled);
+
+/** Returns the bitset representing the support state (true for enabled, false
+ * for disabled) for compression algorithms.
+ *
+ * The i-th bit of the returned bitset corresponds to the i-th entry in the
+ * grpc_compression_algorithm enum. */
+int grpc_channel_args_compression_algorithm_get_states(
+    const grpc_channel_args *a);
+
 #endif /* GRPC_INTERNAL_CORE_CHANNEL_CHANNEL_ARGS_H */
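
A sketch of the call pattern these declarations describe, assuming grpc_channel_args_copy(NULL) yields an empty, caller-owned instance (set_state destroys and replaces *a when it has to allocate a new one):

    grpc_channel_args *args = grpc_channel_args_copy(NULL); /* empty args */

    /* Disable gzip; args is reallocated and updated through the pointer. */
    grpc_channel_args_compression_algorithm_set_state(&args,
                                                      GRPC_COMPRESS_GZIP, 0);

    int states = grpc_channel_args_compression_algorithm_get_states(args);
    if ((states & (1 << GRPC_COMPRESS_GZIP)) == 0) {
      /* gzip is disabled; every other algorithm is still enabled by default */
    }

    grpc_channel_args_destroy(args);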

src/core/iomgr/pollset.h
@@ -74,10 +74,9 @@ void grpc_pollset_destroy(grpc_pollset *pollset);
    grpc_pollset_work, and it is guaranteed that GRPC_POLLSET_MU(pollset) will
    not be released by grpc_pollset_work AFTER worker has been destroyed.

-   Returns true if some work has been done, and false if the deadline
-   expired. */
-int grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker,
-                      gpr_timespec deadline);
+   Tries not to block past deadline. */
+void grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker,
+                       gpr_timespec now, gpr_timespec deadline);

 /* Break one polling thread out of polling work for this pollset.
    If specific_worker is GRPC_POLLSET_KICK_BROADCAST, kick ALL the workers.

src/core/iomgr/pollset_multipoller_with_epoll.c
@@ -181,7 +181,7 @@ static void multipoll_with_epoll_pollset_maybe_work(
   pfds[1].events = POLLIN;
   pfds[1].revents = 0;

-  poll_rv = poll(pfds, 2, timeout_ms);
+  poll_rv = grpc_poll_function(pfds, 2, timeout_ms);

   if (poll_rv < 0) {
     if (errno != EINTR) {

src/core/iomgr/pollset_multipoller_with_poll_posix.c
@@ -144,7 +144,7 @@ static void multipoll_with_poll_pollset_maybe_work(
                         POLLOUT, &watchers[i]);
   }

-  r = poll(pfds, pfd_count, timeout);
+  r = grpc_poll_function(pfds, pfd_count, timeout);

   for (i = 1; i < pfd_count; i++) {
     grpc_fd_end_poll(&watchers[i], pfds[i].revents & POLLIN,

src/core/iomgr/pollset_posix.c
@@ -38,7 +38,6 @@
 #include "src/core/iomgr/pollset_posix.h"

 #include <errno.h>
-#include <poll.h>
 #include <stdlib.h>
 #include <string.h>
 #include <unistd.h>
@@ -57,6 +56,8 @@
 GPR_TLS_DECL(g_current_thread_poller);
 GPR_TLS_DECL(g_current_thread_worker);

+grpc_poll_function_type grpc_poll_function = poll;
+
 static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
   worker->prev->next = worker->next;
   worker->next->prev = worker->prev;
@@ -89,6 +90,7 @@ static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
 }

 void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) {
+  /* pollset->mu already held */
   if (specific_worker != NULL) {
     if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) {
       for (specific_worker = p->root_worker.next;
@@ -168,14 +170,10 @@ static void finish_shutdown(grpc_pollset *pollset) {
   pollset->shutdown_done_cb(pollset->shutdown_done_arg);
 }

-int grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker,
-                      gpr_timespec deadline) {
+void grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker,
+                       gpr_timespec now, gpr_timespec deadline) {
   /* pollset->mu already held */
-  gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
   int added_worker = 0;
-  if (gpr_time_cmp(now, deadline) > 0) {
-    return 0;
-  }
   /* this must happen before we (potentially) drop pollset->mu */
   worker->next = worker->prev = NULL;
   /* TODO(ctiller): pool these */
@@ -217,7 +215,6 @@ done:
       gpr_mu_lock(&pollset->mu);
     }
   }
-  return 1;
 }

 void grpc_pollset_shutdown(grpc_pollset *pollset,
@@ -456,7 +453,7 @@ static void basic_pollset_maybe_work(grpc_pollset *pollset,
   /* poll fd count (argument 2) is shortened by one if we have no events
      to poll on - such that it only includes the kicker */
-  r = poll(pfd, nfds, timeout);
+  r = grpc_poll_function(pfd, nfds, timeout);
   GRPC_TIMER_MARK(GRPC_PTAG_POLL_FINISHED, r);

   if (fd) {

src/core/iomgr/pollset_posix.h
@@ -34,6 +34,8 @@
 #ifndef GRPC_INTERNAL_CORE_IOMGR_POLLSET_POSIX_H
 #define GRPC_INTERNAL_CORE_IOMGR_POLLSET_POSIX_H

+#include <poll.h>
+
 #include <grpc/support/sync.h>

 #include "src/core/iomgr/wakeup_fd_posix.h"
@@ -118,4 +120,8 @@ void grpc_poll_become_multipoller(grpc_pollset *pollset, struct grpc_fd **fds,
  * be locked) */
 int grpc_pollset_has_workers(grpc_pollset *pollset);

+/* override to allow tests to hook poll() usage */
+typedef int (*grpc_poll_function_type)(struct pollfd *, nfds_t, int);
+extern grpc_poll_function_type grpc_poll_function;
+
 #endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_POSIX_H */
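
This hook is what lets the test changes later in the commit substitute their own poll(2). A minimal counting wrapper as a sketch (counting_poll, g_real_poll, and g_poll_calls are hypothetical test-side names):

    static grpc_poll_function_type g_real_poll;
    static int g_poll_calls = 0;

    /* Counts each invocation, then delegates to the saved real poll(2). */
    static int counting_poll(struct pollfd *fds, nfds_t nfds, int timeout) {
      g_poll_calls++;
      return g_real_poll(fds, nfds, timeout);
    }

    static void install_counting_poll(void) {
      g_real_poll = grpc_poll_function;
      grpc_poll_function = counting_poll;
    }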

src/core/iomgr/pollset_windows.c
@@ -99,14 +99,9 @@ void grpc_pollset_destroy(grpc_pollset *pollset) {
   gpr_mu_destroy(&pollset->mu);
 }

-int grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker,
-                      gpr_timespec deadline) {
-  gpr_timespec now;
+void grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker,
+                       gpr_timespec now, gpr_timespec deadline) {
   int added_worker = 0;
-  now = gpr_now(GPR_CLOCK_MONOTONIC);
-  if (gpr_time_cmp(now, deadline) > 0) {
-    return 0 /* GPR_FALSE */;
-  }
   worker->next = worker->prev = NULL;
   gpr_cv_init(&worker->cv);
   if (grpc_maybe_call_delayed_callbacks(&pollset->mu, 1 /* GPR_TRUE */)) {
@@ -127,7 +122,6 @@ done:
   if (added_worker) {
     remove_worker(pollset, worker);
   }
-  return 1 /* GPR_TRUE */;
 }

 void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) {

src/core/security/google_default_credentials.c
@@ -115,7 +115,7 @@ static int is_stack_running_on_compute_engine(void) {
   gpr_mu_lock(GRPC_POLLSET_MU(&detector.pollset));
   while (!detector.is_done) {
     grpc_pollset_worker worker;
-    grpc_pollset_work(&detector.pollset, &worker,
+    grpc_pollset_work(&detector.pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
                       gpr_inf_future(GPR_CLOCK_MONOTONIC));
   }
   gpr_mu_unlock(GRPC_POLLSET_MU(&detector.pollset));

src/core/security/server_auth_filter.c
@@ -104,24 +104,34 @@ static grpc_mdelem *remove_consumed_md(void *user_data, grpc_mdelem *md) {
   return md;
 }

-static void on_md_processing_done(void *user_data,
-                                  const grpc_metadata *consumed_md,
-                                  size_t num_consumed_md, int success) {
+static void on_md_processing_done(
+    void *user_data, const grpc_metadata *consumed_md, size_t num_consumed_md,
+    const grpc_metadata *response_md, size_t num_response_md,
+    grpc_status_code status, const char *error_details) {
   grpc_call_element *elem = user_data;
   call_data *calld = elem->call_data;
-  if (success) {
+
+  /* TODO(jboeuf): Implement support for response_md. */
+  if (response_md != NULL && num_response_md > 0) {
+    gpr_log(GPR_INFO,
+            "response_md in auth metadata processing not supported for now. "
+            "Ignoring...");
+  }
+
+  if (status == GRPC_STATUS_OK) {
     calld->consumed_md = consumed_md;
     calld->num_consumed_md = num_consumed_md;
     grpc_metadata_batch_filter(&calld->md_op->data.metadata, remove_consumed_md,
                                elem);
-    calld->on_done_recv->cb(calld->on_done_recv->cb_arg, success);
+    calld->on_done_recv->cb(calld->on_done_recv->cb_arg, 1);
   } else {
-    gpr_slice message = gpr_slice_from_copied_string(
-        "Authentication metadata processing failed.");
+    gpr_slice message;
+    error_details = error_details != NULL
+                        ? error_details
+                        : "Authentication metadata processing failed.";
+    message = gpr_slice_from_copied_string(error_details);
     grpc_sopb_reset(calld->recv_ops);
-    grpc_transport_stream_op_add_close(&calld->transport_op,
-                                       GRPC_STATUS_UNAUTHENTICATED, &message);
+    grpc_transport_stream_op_add_close(&calld->transport_op, status, &message);
     grpc_call_next_op(elem, &calld->transport_op);
   }
 }

src/core/surface/completion_queue.c
@@ -170,6 +170,9 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
                                       gpr_timespec deadline, void *reserved) {
   grpc_event ret;
   grpc_pollset_worker worker;
+  int first_loop = 1;
+  gpr_timespec now;
+
   GPR_ASSERT(!reserved);

   deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
@@ -196,12 +199,15 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
       ret.type = GRPC_QUEUE_SHUTDOWN;
       break;
     }
-    if (!grpc_pollset_work(&cc->pollset, &worker, deadline)) {
+    now = gpr_now(GPR_CLOCK_MONOTONIC);
+    if (!first_loop && gpr_time_cmp(now, deadline) >= 0) {
       gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
       memset(&ret, 0, sizeof(ret));
       ret.type = GRPC_QUEUE_TIMEOUT;
       break;
     }
+    first_loop = 0;
+    grpc_pollset_work(&cc->pollset, &worker, now, deadline);
   }
   GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
   GRPC_CQ_INTERNAL_UNREF(cc, "next");
@@ -239,6 +245,9 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
   grpc_cq_completion *c;
   grpc_cq_completion *prev;
   grpc_pollset_worker worker;
+  gpr_timespec now;
+  int first_loop = 1;
+
   GPR_ASSERT(!reserved);

   deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
@@ -281,13 +290,16 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
       ret.type = GRPC_QUEUE_TIMEOUT;
       break;
     }
-    if (!grpc_pollset_work(&cc->pollset, &worker, deadline)) {
+    now = gpr_now(GPR_CLOCK_MONOTONIC);
+    if (!first_loop && gpr_time_cmp(now, deadline) >= 0) {
       del_plucker(cc, tag, &worker);
       gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
       memset(&ret, 0, sizeof(ret));
       ret.type = GRPC_QUEUE_TIMEOUT;
       break;
     }
+    first_loop = 0;
+    grpc_pollset_work(&cc->pollset, &worker, now, deadline);
     del_plucker(cc, tag, &worker);
   }
 done:
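
The caller-visible contract is unchanged: a finite deadline still yields GRPC_QUEUE_TIMEOUT once it passes; the check now simply lives in this surface loop instead of grpc_pollset_work. A sketch of a typical caller, assuming an existing completion queue cq:

    gpr_timespec deadline = gpr_time_add(
        gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_seconds(5, GPR_TIMESPAN));
    grpc_event ev = grpc_completion_queue_next(cq, deadline, NULL);
    switch (ev.type) {
      case GRPC_OP_COMPLETE:    /* ev.tag and ev.success are valid */ break;
      case GRPC_QUEUE_TIMEOUT:  /* deadline passed without an event */ break;
      case GRPC_QUEUE_SHUTDOWN: /* queue fully drained after shutdown */ break;
    }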

src/cpp/server/server.cc
@@ -50,6 +50,52 @@

 namespace grpc {

+class Server::UnimplementedAsyncRequestContext {
+ protected:
+  UnimplementedAsyncRequestContext() : generic_stream_(&server_context_) {}
+
+  GenericServerContext server_context_;
+  GenericServerAsyncReaderWriter generic_stream_;
+};
+
+class Server::UnimplementedAsyncRequest GRPC_FINAL
+    : public UnimplementedAsyncRequestContext,
+      public GenericAsyncRequest {
+ public:
+  UnimplementedAsyncRequest(Server* server, ServerCompletionQueue* cq)
+      : GenericAsyncRequest(server, &server_context_, &generic_stream_, cq, cq,
+                            NULL, false),
+        server_(server),
+        cq_(cq) {}
+
+  bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE;
+
+  ServerContext* context() { return &server_context_; }
+  GenericServerAsyncReaderWriter* stream() { return &generic_stream_; }
+
+ private:
+  Server* const server_;
+  ServerCompletionQueue* const cq_;
+};
+
+typedef SneakyCallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus>
+    UnimplementedAsyncResponseOp;
+class Server::UnimplementedAsyncResponse GRPC_FINAL
+    : public UnimplementedAsyncResponseOp {
+ public:
+  UnimplementedAsyncResponse(UnimplementedAsyncRequest* request);
+  ~UnimplementedAsyncResponse() { delete request_; }
+
+  bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE {
+    bool r = UnimplementedAsyncResponseOp::FinalizeResult(tag, status);
+    delete this;
+    return r;
+  }
+
+ private:
+  UnimplementedAsyncRequest* const request_;
+};
+
 class Server::ShutdownRequest GRPC_FINAL : public CompletionQueueTag {
  public:
   bool FinalizeResult(void** tag, bool* status) {
@@ -297,18 +343,23 @@ int Server::AddListeningPort(const grpc::string& addr,
   return creds->AddPortToServer(addr, server_);
 }

-bool Server::Start() {
+bool Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
   GPR_ASSERT(!started_);
   started_ = true;
   grpc_server_start(server_);

   if (!has_generic_service_) {
-    unknown_method_.reset(new RpcServiceMethod(
-        "unknown", RpcMethod::BIDI_STREAMING, new UnknownMethodHandler));
-    // Use of emplace_back with just constructor arguments is not accepted here
-    // by gcc-4.4 because it can't match the anonymous nullptr with a proper
-    // constructor implicitly. Construct the object and use push_back.
-    sync_methods_->push_back(SyncRequest(unknown_method_.get(), nullptr));
+    if (!sync_methods_->empty()) {
+      unknown_method_.reset(new RpcServiceMethod(
+          "unknown", RpcMethod::BIDI_STREAMING, new UnknownMethodHandler));
+      // Use of emplace_back with just constructor arguments is not accepted
+      // here by gcc-4.4 because it can't match the anonymous nullptr with a
+      // proper constructor implicitly. Construct the object and use push_back.
+      sync_methods_->push_back(SyncRequest(unknown_method_.get(), nullptr));
+    }
+    for (size_t i = 0; i < num_cqs; i++) {
+      new UnimplementedAsyncRequest(this, cqs[i]);
+    }
   }

   // Start processing rpcs.
   if (!sync_methods_->empty()) {
@@ -370,12 +421,14 @@ void Server::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) {

 Server::BaseAsyncRequest::BaseAsyncRequest(
     Server* server, ServerContext* context,
-    ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag)
+    ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag,
+    bool delete_on_finalize)
     : server_(server),
       context_(context),
       stream_(stream),
       call_cq_(call_cq),
       tag_(tag),
+      delete_on_finalize_(delete_on_finalize),
       call_(nullptr) {
   memset(&initial_metadata_array_, 0, sizeof(initial_metadata_array_));
 }
@@ -402,14 +455,16 @@ bool Server::BaseAsyncRequest::FinalizeResult(void** tag, bool* status) {
   // just the pointers inside call are copied here
   stream_->BindCall(&call);
   *tag = tag_;
-  delete this;
+  if (delete_on_finalize_) {
+    delete this;
+  }
   return true;
 }

 Server::RegisteredAsyncRequest::RegisteredAsyncRequest(
     Server* server, ServerContext* context,
     ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag)
-    : BaseAsyncRequest(server, context, stream, call_cq, tag) {}
+    : BaseAsyncRequest(server, context, stream, call_cq, tag, true) {}

 void Server::RegisteredAsyncRequest::IssueRequest(
     void* registered_method, grpc_byte_buffer** payload,
@@ -423,8 +478,9 @@ void Server::RegisteredAsyncRequest::IssueRequest(
 Server::GenericAsyncRequest::GenericAsyncRequest(
     Server* server, GenericServerContext* context,
     ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
-    ServerCompletionQueue* notification_cq, void* tag)
-    : BaseAsyncRequest(server, context, stream, call_cq, tag) {
+    ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize)
+    : BaseAsyncRequest(server, context, stream, call_cq, tag,
+                       delete_on_finalize) {
   grpc_call_details_init(&call_details_);
   GPR_ASSERT(notification_cq);
   GPR_ASSERT(call_cq);
@@ -445,6 +501,25 @@ bool Server::GenericAsyncRequest::FinalizeResult(void** tag, bool* status) {
   return BaseAsyncRequest::FinalizeResult(tag, status);
 }

+bool Server::UnimplementedAsyncRequest::FinalizeResult(void** tag,
+                                                       bool* status) {
+  if (GenericAsyncRequest::FinalizeResult(tag, status) && *status) {
+    new UnimplementedAsyncRequest(server_, cq_);
+    new UnimplementedAsyncResponse(this);
+  } else {
+    delete this;
+  }
+  return false;
+}
+
+Server::UnimplementedAsyncResponse::UnimplementedAsyncResponse(
+    UnimplementedAsyncRequest* request)
+    : request_(request) {
+  Status status(StatusCode::UNIMPLEMENTED, "");
+  UnknownMethodHandler::FillOps(request_->context(), this);
+  request_->stream()->call_.PerformOps(this);
+}
+
 void Server::ScheduleCallback() {
   {
     grpc::unique_lock<grpc::mutex> lock(mu_);

src/cpp/server/server_builder.cc
@@ -99,12 +99,6 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
     thread_pool_ = CreateDefaultThreadPool();
     thread_pool_owned = true;
   }
-  // Async services only, create a thread pool to handle requests to unknown
-  // services.
-  if (!thread_pool_ && !generic_service_ && !async_services_.empty()) {
-    thread_pool_ = new FixedSizeThreadPool(1);
-    thread_pool_owned = true;
-  }
   std::unique_ptr<Server> server(
       new Server(thread_pool_, thread_pool_owned, max_message_size_));
   for (auto cq = cqs_.begin(); cq != cqs_.end(); ++cq) {
@@ -134,7 +128,7 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
       *port->selected_port = r;
     }
   }
-  if (!server->Start()) {
+  if (!server->Start(&cqs_[0], cqs_.size())) {
     return nullptr;
   }
   return server;

src/python/grpcio/grpc/_adapter/_c/types.h
@@ -146,6 +146,7 @@ typedef struct Server {
   PyObject_HEAD
   grpc_server *c_serv;
   CompletionQueue *cq;
+  int shutdown_called;
 } Server;
 Server *pygrpc_Server_new(PyTypeObject *type, PyObject *args, PyObject *kwargs);
 void pygrpc_Server_dealloc(Server *self);
@@ -156,6 +157,7 @@ PyObject *pygrpc_Server_add_http2_port(
 PyObject *pygrpc_Server_start(Server *self, PyObject *ignored);
 PyObject *pygrpc_Server_shutdown(
     Server *self, PyObject *args, PyObject *kwargs);
+PyObject *pygrpc_Server_cancel_all_calls(Server *self, PyObject *unused);
 extern PyTypeObject pygrpc_Server_type;

 /*=========*/

src/python/grpcio/grpc/_adapter/_c/types/server.c
@@ -45,6 +45,8 @@ PyMethodDef pygrpc_Server_methods[] = {
      METH_KEYWORDS, ""},
     {"start", (PyCFunction)pygrpc_Server_start, METH_NOARGS, ""},
     {"shutdown", (PyCFunction)pygrpc_Server_shutdown, METH_KEYWORDS, ""},
+    {"cancel_all_calls", (PyCFunction)pygrpc_Server_cancel_all_calls,
+     METH_NOARGS, ""},
     {NULL}
 };
 const char pygrpc_Server_doc[] = "See grpc._adapter._types.Server.";
@@ -109,6 +111,7 @@ Server *pygrpc_Server_new(PyTypeObject *type, PyObject *args, PyObject *kwargs)
   pygrpc_discard_channel_args(c_args);
   self->cq = cq;
   Py_INCREF(self->cq);
+  self->shutdown_called = 0;
   return self;
 }
@@ -163,6 +166,7 @@ PyObject *pygrpc_Server_add_http2_port(

 PyObject *pygrpc_Server_start(Server *self, PyObject *ignored) {
   grpc_server_start(self->c_serv);
+  self->shutdown_called = 0;
   Py_RETURN_NONE;
 }
@@ -176,5 +180,17 @@ PyObject *pygrpc_Server_shutdown(
   }
   tag = pygrpc_produce_server_shutdown_tag(user_tag);
   grpc_server_shutdown_and_notify(self->c_serv, self->cq->c_cq, tag);
+  self->shutdown_called = 1;
+  Py_RETURN_NONE;
+}
+
+PyObject *pygrpc_Server_cancel_all_calls(Server *self, PyObject *unused) {
+  if (!self->shutdown_called) {
+    PyErr_SetString(
+        PyExc_RuntimeError,
+        "shutdown must have been called prior to calling cancel_all_calls!");
+    return NULL;
+  }
+  grpc_server_cancel_all_calls(self->c_serv);
   Py_RETURN_NONE;
 }

src/python/grpcio/grpc/_adapter/_low.py
@@ -124,3 +124,6 @@ class Server(_types.Server):

   def request_call(self, completion_queue, tag):
     return self.server.request_call(completion_queue.completion_queue, tag)
+
+  def cancel_all_calls(self):
+    return self.server.cancel_all_calls()
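
The guard in the C wrapper mirrors the core API's precondition; at the C level the sequence the wrapper enforces looks like this (the cq and shutdown_tag variables are assumed to exist):

    /* Request shutdown first; cancel_all_calls then forcibly terminates any
       in-flight calls so the shutdown tag can complete on the queue. */
    grpc_server_shutdown_and_notify(server, cq, shutdown_tag);
    grpc_server_cancel_all_calls(server);
    /* ... drain cq until shutdown_tag is returned, then ... */
    grpc_server_destroy(server);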

src/python/grpcio/grpc/framework/interfaces/face/__init__.py (new file)
@@ -0,0 +1,30 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

src/python/grpcio/grpc/framework/interfaces/face/face.py (new file)
@@ -0,0 +1,933 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Interfaces defining the Face layer of RPC Framework."""
import abc
import collections
import enum
# cardinality, style, abandonment, future, and stream are
# referenced from specification in this module.
from grpc.framework.common import cardinality # pylint: disable=unused-import
from grpc.framework.common import style # pylint: disable=unused-import
from grpc.framework.foundation import abandonment # pylint: disable=unused-import
from grpc.framework.foundation import future # pylint: disable=unused-import
from grpc.framework.foundation import stream # pylint: disable=unused-import
class NoSuchMethodError(Exception):
"""Raised by customer code to indicate an unrecognized method.
Attributes:
group: The group of the unrecognized method.
    method: The method identifier of the unrecognized method.
"""
def __init__(self, group, method):
"""Constructor.
Args:
group: The group identifier of the unrecognized RPC name.
method: The method identifier of the unrecognized RPC name.
"""
super(NoSuchMethodError, self).__init__()
self.group = group
self.method = method
def __repr__(self):
return 'face.NoSuchMethodError(%s, %s)' % (self.group, self.method,)
class Abortion(
collections.namedtuple(
'Abortion',
('kind', 'initial_metadata', 'terminal_metadata', 'code', 'details',))):
"""A value describing RPC abortion.
Attributes:
kind: A Kind value identifying how the RPC failed.
initial_metadata: The initial metadata from the other side of the RPC or
None if no initial metadata value was received.
terminal_metadata: The terminal metadata from the other side of the RPC or
None if no terminal metadata value was received.
code: The code value from the other side of the RPC or None if no code value
was received.
details: The details value from the other side of the RPC or None if no
details value was received.
"""
@enum.unique
class Kind(enum.Enum):
"""Types of RPC abortion."""
CANCELLED = 'cancelled'
EXPIRED = 'expired'
LOCAL_SHUTDOWN = 'local shutdown'
REMOTE_SHUTDOWN = 'remote shutdown'
NETWORK_FAILURE = 'network failure'
LOCAL_FAILURE = 'local failure'
REMOTE_FAILURE = 'remote failure'
class AbortionError(Exception):
"""Common super type for exceptions indicating RPC abortion.
initial_metadata: The initial metadata from the other side of the RPC or
None if no initial metadata value was received.
terminal_metadata: The terminal metadata from the other side of the RPC or
None if no terminal metadata value was received.
code: The code value from the other side of the RPC or None if no code value
was received.
details: The details value from the other side of the RPC or None if no
details value was received.
"""
__metaclass__ = abc.ABCMeta
def __init__(self, initial_metadata, terminal_metadata, code, details):
super(AbortionError, self).__init__()
self.initial_metadata = initial_metadata
self.terminal_metadata = terminal_metadata
self.code = code
self.details = details
class CancellationError(AbortionError):
"""Indicates that an RPC has been cancelled."""
class ExpirationError(AbortionError):
"""Indicates that an RPC has expired ("timed out")."""
class LocalShutdownError(AbortionError):
"""Indicates that an RPC has terminated due to local shutdown of RPCs."""
class RemoteShutdownError(AbortionError):
"""Indicates that an RPC has terminated due to remote shutdown of RPCs."""
class NetworkError(AbortionError):
"""Indicates that some error occurred on the network."""
class LocalError(AbortionError):
"""Indicates that an RPC has terminated due to a local defect."""
class RemoteError(AbortionError):
"""Indicates that an RPC has terminated due to a remote defect."""
class RpcContext(object):
"""Provides RPC-related information and action."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def is_active(self):
"""Describes whether the RPC is active or has terminated."""
raise NotImplementedError()
@abc.abstractmethod
def time_remaining(self):
"""Describes the length of allowed time remaining for the RPC.
Returns:
A nonnegative float indicating the length of allowed time in seconds
remaining for the RPC to complete before it is considered to have timed
out.
"""
raise NotImplementedError()
@abc.abstractmethod
def add_abortion_callback(self, abortion_callback):
"""Registers a callback to be called if the RPC is aborted.
Args:
abortion_callback: A callable to be called and passed an Abortion value
in the event of RPC abortion.
"""
raise NotImplementedError()
@abc.abstractmethod
def cancel(self):
"""Cancels the RPC.
Idempotent and has no effect if the RPC has already terminated.
"""
raise NotImplementedError()
class Call(RpcContext):
"""Invocation-side utility object for an RPC."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def initial_metadata(self):
"""Accesses the initial metadata from the service-side of the RPC.
This method blocks until the value is available or is known not to have been
emitted from the service-side of the RPC.
Returns:
The initial metadata object emitted by the service-side of the RPC, or
None if there was no such value.
"""
raise NotImplementedError()
@abc.abstractmethod
def terminal_metadata(self):
"""Accesses the terminal metadata from the service-side of the RPC.
This method blocks until the value is available or is known not to have been
emitted from the service-side of the RPC.
Returns:
The terminal metadata object emitted by the service-side of the RPC, or
None if there was no such value.
"""
raise NotImplementedError()
@abc.abstractmethod
def code(self):
"""Accesses the code emitted by the service-side of the RPC.
This method blocks until the value is available or is known not to have been
emitted from the service-side of the RPC.
Returns:
The code object emitted by the service-side of the RPC, or None if there
was no such value.
"""
raise NotImplementedError()
@abc.abstractmethod
def details(self):
"""Accesses the details value emitted by the service-side of the RPC.
This method blocks until the value is available or is known not to have been
emitted from the service-side of the RPC.
Returns:
The details value emitted by the service-side of the RPC, or None if there
was no such value.
"""
raise NotImplementedError()
class ServicerContext(RpcContext):
"""A context object passed to method implementations."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def invocation_metadata(self):
"""Accesses the metadata from the invocation-side of the RPC.
This method blocks until the value is available or is known not to have been
emitted from the invocation-side of the RPC.
Returns:
The metadata object emitted by the invocation-side of the RPC, or None if
there was no such value.
"""
raise NotImplementedError()
@abc.abstractmethod
def initial_metadata(self, initial_metadata):
"""Accepts the service-side initial metadata value of the RPC.
This method need not be called by method implementations if they have no
service-side initial metadata to transmit.
Args:
initial_metadata: The service-side initial metadata value of the RPC to
be transmitted to the invocation side of the RPC.
"""
raise NotImplementedError()
@abc.abstractmethod
def terminal_metadata(self, terminal_metadata):
"""Accepts the service-side terminal metadata value of the RPC.
This method need not be called by method implementations if they have no
service-side terminal metadata to transmit.
Args:
terminal_metadata: The service-side terminal metadata value of the RPC to
be transmitted to the invocation side of the RPC.
"""
raise NotImplementedError()
@abc.abstractmethod
def code(self, code):
"""Accepts the service-side code of the RPC.
This method need not be called by method implementations if they have no
code to transmit.
Args:
code: The code of the RPC to be transmitted to the invocation side of the
RPC.
"""
raise NotImplementedError()
@abc.abstractmethod
def details(self, details):
"""Accepts the service-side details of the RPC.
This method need not be called by method implementations if they have no
service-side details to transmit.
Args:
details: The service-side details value of the RPC to be transmitted to
the invocation side of the RPC.
"""
raise NotImplementedError()
class ResponseReceiver(object):
"""Invocation-side object used to accept the output of an RPC."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def initial_metadata(self, initial_metadata):
"""Receives the initial metadata from the service-side of the RPC.
Args:
initial_metadata: The initial metadata object emitted from the
service-side of the RPC.
"""
raise NotImplementedError()
@abc.abstractmethod
def response(self, response):
"""Receives a response from the service-side of the RPC.
Args:
response: A response object emitted from the service-side of the RPC.
"""
raise NotImplementedError()
@abc.abstractmethod
def complete(self, terminal_metadata, code, details):
"""Receives the completion values emitted from the service-side of the RPC.
Args:
terminal_metadata: The terminal metadata object emitted from the
service-side of the RPC.
code: The code object emitted from the service-side of the RPC.
details: The details object emitted from the service-side of the RPC.
"""
raise NotImplementedError()
class UnaryUnaryMultiCallable(object):
"""Affords invoking a unary-unary RPC in any call style."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def __call__(
self, request, timeout, metadata=None, with_call=False):
"""Synchronously invokes the underlying RPC.
Args:
request: The request value for the RPC.
timeout: A duration of time in seconds to allow for the RPC.
metadata: A metadata value to be passed to the service-side of
the RPC.
      with_call: Whether or not to also return a Call for the RPC in addition
        to the response.
Returns:
The response value for the RPC, and a Call for the RPC if with_call was
set to True at invocation.
Raises:
AbortionError: Indicating that the RPC was aborted.
"""
raise NotImplementedError()
@abc.abstractmethod
def future(self, request, timeout, metadata=None):
"""Asynchronously invokes the underlying RPC.
Args:
request: The request value for the RPC.
timeout: A duration of time in seconds to allow for the RPC.
metadata: A metadata value to be passed to the service-side of
the RPC.
Returns:
An object that is both a Call for the RPC and a future.Future. In the
      event of RPC completion, the returned Future's result value will be the
response value of the RPC. In the event of RPC abortion, the returned
Future's exception value will be an AbortionError.
"""
raise NotImplementedError()
@abc.abstractmethod
def event(
self, request, receiver, abortion_callback, timeout,
metadata=None):
"""Asynchronously invokes the underlying RPC.
Args:
request: The request value for the RPC.
receiver: A ResponseReceiver to be passed the response data of the RPC.
abortion_callback: A callback to be called and passed an Abortion value
in the event of RPC abortion.
timeout: A duration of time in seconds to allow for the RPC.
metadata: A metadata value to be passed to the service-side of
the RPC.
Returns:
A Call for the RPC.
"""
raise NotImplementedError()
class UnaryStreamMultiCallable(object):
"""Affords invoking a unary-stream RPC in any call style."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def __call__(self, request, timeout, metadata=None):
"""Invokes the underlying RPC.
Args:
request: The request value for the RPC.
timeout: A duration of time in seconds to allow for the RPC.
metadata: A metadata value to be passed to the service-side of
the RPC.
Returns:
An object that is both a Call for the RPC and an iterator of response
values. Drawing response values from the returned iterator may raise
AbortionError indicating abortion of the RPC.
"""
raise NotImplementedError()
@abc.abstractmethod
def event(
self, request, receiver, abortion_callback, timeout,
metadata=None):
"""Asynchronously invokes the underlying RPC.
Args:
request: The request value for the RPC.
receiver: A ResponseReceiver to be passed the response data of the RPC.
abortion_callback: A callback to be called and passed an Abortion value
in the event of RPC abortion.
timeout: A duration of time in seconds to allow for the RPC.
metadata: A metadata value to be passed to the service-side of
the RPC.
Returns:
A Call object for the RPC.
"""
raise NotImplementedError()
class StreamUnaryMultiCallable(object):
"""Affords invoking a stream-unary RPC in any call style."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def __call__(
self, request_iterator, timeout, metadata=None,
with_call=False):
"""Synchronously invokes the underlying RPC.
Args:
request_iterator: An iterator that yields request values for the RPC.
timeout: A duration of time in seconds to allow for the RPC.
metadata: A metadata value to be passed to the service-side of
the RPC.
      with_call: Whether or not to also return a Call for the RPC in addition
        to the response.
Returns:
The response value for the RPC, and a Call for the RPC if with_call was
set to True at invocation.
Raises:
AbortionError: Indicating that the RPC was aborted.
"""
raise NotImplementedError()
@abc.abstractmethod
def future(self, request_iterator, timeout, metadata=None):
"""Asynchronously invokes the underlying RPC.
Args:
request_iterator: An iterator that yields request values for the RPC.
timeout: A duration of time in seconds to allow for the RPC.
metadata: A metadata value to be passed to the service-side of
the RPC.
Returns:
An object that is both a Call for the RPC and a future.Future. In the
      event of RPC completion, the returned Future's result value will be the
response value of the RPC. In the event of RPC abortion, the returned
Future's exception value will be an AbortionError.
"""
raise NotImplementedError()
@abc.abstractmethod
def event(
self, receiver, abortion_callback, timeout, metadata=None):
"""Asynchronously invokes the underlying RPC.
Args:
receiver: A ResponseReceiver to be passed the response data of the RPC.
abortion_callback: A callback to be called and passed an Abortion value
in the event of RPC abortion.
timeout: A duration of time in seconds to allow for the RPC.
metadata: A metadata value to be passed to the service-side of
the RPC.
Returns:
A single object that is both a Call object for the RPC and a
stream.Consumer to which the request values of the RPC should be passed.
"""
raise NotImplementedError()
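# Illustrative sketch (not part of the interface above): event-style
# stream-unary invocation. The returned object serves both as the Call and
# as the stream.Consumer fed with request values. `multi_callable`,
# `receiver`, `on_abortion`, and the request values are hypothetical, and
# stream.Consumer's consume/consume_and_terminate methods are assumed.
#
#   call_consumer = multi_callable.event(receiver, on_abortion, 10)
#   call_consumer.consume(first_request)
#   call_consumer.consume_and_terminate(last_request)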
class StreamStreamMultiCallable(object):
"""Affords invoking a stream-stream RPC in any call style."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def __call__(self, request_iterator, timeout, metadata=None):
"""Invokes the underlying RPC.
Args:
request_iterator: An iterator that yields request values for the RPC.
timeout: A duration of time in seconds to allow for the RPC.
metadata: A metadata value to be passed to the service-side of
the RPC.
Returns:
An object that is both a Call for the RPC and an iterator of response
values. Drawing response values from the returned iterator may raise
AbortionError indicating abortion of the RPC.
"""
raise NotImplementedError()
@abc.abstractmethod
def event(
self, receiver, abortion_callback, timeout, metadata=None):
"""Asynchronously invokes the underlying RPC.
Args:
receiver: A ResponseReceiver to be passed the response data of the RPC.
abortion_callback: A callback to be called and passed an Abortion value
in the event of RPC abortion.
timeout: A duration of time in seconds to allow for the RPC.
metadata: A metadata value to be passed to the service-side of
the RPC.
Returns:
A single object that is both a Call object for the RPC and a
stream.Consumer to which the request values of the RPC should be passed.
"""
raise NotImplementedError()
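# Illustrative sketch (not part of the interface above): inline
# stream-stream invocation. `multi_callable`, `requests` (an iterable of
# request values), and `handle` are hypothetical; drawing from the returned
# iterator may raise AbortionError.
#
#   response_iterator = multi_callable(iter(requests), timeout=10)
#   for response in response_iterator:
#     handle(response)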
class MethodImplementation(object):
"""A sum type that describes a method implementation.
Attributes:
cardinality: A cardinality.Cardinality value.
style: A style.Service value.
unary_unary_inline: The implementation of the method as a callable value
that takes a request value and a ServicerContext object and returns a
response value. Only non-None if cardinality is
cardinality.Cardinality.UNARY_UNARY and style is style.Service.INLINE.
unary_stream_inline: The implementation of the method as a callable value
that takes a request value and a ServicerContext object and returns an
iterator of response values. Only non-None if cardinality is
cardinality.Cardinality.UNARY_STREAM and style is style.Service.INLINE.
stream_unary_inline: The implementation of the method as a callable value
that takes an iterator of request values and a ServicerContext object and
returns a response value. Only non-None if cardinality is
cardinality.Cardinality.STREAM_UNARY and style is style.Service.INLINE.
stream_stream_inline: The implementation of the method as a callable value
that takes an iterator of request values and a ServicerContext object and
returns an iterator of response values. Only non-None if cardinality is
cardinality.Cardinality.STREAM_STREAM and style is style.Service.INLINE.
unary_unary_event: The implementation of the method as a callable value that
takes a request value, a response callback to which to pass the response
value of the RPC, and a ServicerContext. Only non-None if cardinality is
cardinality.Cardinality.UNARY_UNARY and style is style.Service.EVENT.
unary_stream_event: The implementation of the method as a callable value
that takes a request value, a stream.Consumer to which to pass the
response values of the RPC, and a ServicerContext. Only non-None if
cardinality is cardinality.Cardinality.UNARY_STREAM and style is
style.Service.EVENT.
stream_unary_event: The implementation of the method as a callable value
that takes a response callback to which to pass the response value of the
RPC and a ServicerContext and returns a stream.Consumer to which the
request values of the RPC should be passed. Only non-None if cardinality
is cardinality.Cardinality.STREAM_UNARY and style is style.Service.EVENT.
stream_stream_event: The implementation of the method as a callable value
that takes a stream.Consumer to which to pass the response values of the
RPC and a ServicerContext and returns a stream.Consumer to which the
request values of the RPC should be passed. Only non-None if cardinality
is cardinality.Cardinality.STREAM_STREAM and style is
style.Service.EVENT.
"""
__metaclass__ = abc.ABCMeta
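# Illustrative sketch (not part of the type above): a behavior suitable for
# the unary_unary_inline attribute is simply a callable taking a request
# value and a ServicerContext and returning a response value. The function
# below is a hypothetical example.
#
#   def echo_behavior(request, servicer_context):
#     return request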
class MultiMethodImplementation(object):
"""A general type able to service many methods."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def service(self, group, method, response_consumer, context):
"""Services an RPC.
Args:
group: The group identifier of the RPC.
method: The method identifier of the RPC.
response_consumer: A stream.Consumer to be called to accept the response
values of the RPC.
context: a ServicerContext object.
Returns:
A stream.Consumer with which to accept the request values of the RPC. The
consumer returned from this method may or may not be invoked to
completion: in the case of RPC abortion, RPC Framework will simply stop
passing values to this object. Implementations must not assume that this
object will be called to completion of the request stream or even called
at all.
Raises:
abandonment.Abandoned: May or may not be raised when the RPC has been
aborted.
NoSuchMethodError: If this MultiMethod does not recognize the given group
and name for the RPC and is not able to service the RPC.
"""
raise NotImplementedError()
class GenericStub(object):
"""Affords RPC invocation via generic methods."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def blocking_unary_unary(
self, group, method, request, timeout, metadata=None,
with_call=False):
"""Invokes a unary-request-unary-response method.
This method blocks until either returning the response value of the RPC
(in the event of RPC completion) or raising an exception (in the event of
RPC abortion).
Args:
group: The group identifier of the RPC.
method: The method identifier of the RPC.
request: The request value for the RPC.
timeout: A duration of time in seconds to allow for the RPC.
metadata: A metadata value to be passed to the service-side of the RPC.
with_call: Whether or not to also return a Call for the RPC in addition
to the response.
Returns:
The response value for the RPC, and a Call for the RPC if with_call was
set to True at invocation.
Raises:
AbortionError: Indicating that the RPC was aborted.
"""
raise NotImplementedError()
@abc.abstractmethod
def future_unary_unary(
self, group, method, request, timeout, metadata=None):
"""Invokes a unary-request-unary-response method.
Args:
group: The group identifier of the RPC.
method: The method identifier of the RPC.
request: The request value for the RPC.
timeout: A duration of time in seconds to allow for the RPC.
metadata: A metadata value to be passed to the service-side of the RPC.
Returns:
An object that is both a Call for the RPC and a future.Future. In the
event of RPC completion, the returned Future's result value will be the
response value of the RPC. In the event of RPC abortion, the returned
Future's exception value will be an AbortionError.
"""
raise NotImplementedError()
@abc.abstractmethod
def inline_unary_stream(
self, group, method, request, timeout, metadata=None):
"""Invokes a unary-request-stream-response method.
Args:
group: The group identifier of the RPC.
method: The method identifier of the RPC.
request: The request value for the RPC.
timeout: A duration of time in seconds to allow for the RPC.
metadata: A metadata value to be passed to the service-side of the RPC.
Returns:
An object that is both a Call for the RPC and an iterator of response
values. Drawing response values from the returned iterator may raise
AbortionError indicating abortion of the RPC.
"""
raise NotImplementedError()
@abc.abstractmethod
def blocking_stream_unary(
self, group, method, request_iterator, timeout, metadata=None,
with_call=False):
"""Invokes a stream-request-unary-response method.
This method blocks until either returning the response value of the RPC
(in the event of RPC completion) or raising an exception (in the event of
RPC abortion).
Args:
group: The group identifier of the RPC.
method: The method identifier of the RPC.
request_iterator: An iterator that yields request values for the RPC.
timeout: A duration of time in seconds to allow for the RPC.
metadata: A metadata value to be passed to the service-side of the RPC.
with_call: Whether or not to also return a Call for the RPC in addition
to the response.
Returns:
The response value for the RPC, and a Call for the RPC if with_call was
set to True at invocation.
Raises:
AbortionError: Indicating that the RPC was aborted.
"""
raise NotImplementedError()
@abc.abstractmethod
def future_stream_unary(
self, group, method, request_iterator, timeout, metadata=None):
"""Invokes a stream-request-unary-response method.
Args:
group: The group identifier of the RPC.
method: The method identifier of the RPC.
request_iterator: An iterator that yields request values for the RPC.
timeout: A duration of time in seconds to allow for the RPC.
metadata: A metadata value to be passed to the service-side of the RPC.
Returns:
An object that is both a Call for the RPC and a future.Future. In the
event of RPC completion, the returned Future's result value will be the
response value of the RPC. In the event of RPC abortion, the returned
Future's exception value will be an AbortionError.
"""
raise NotImplementedError()
@abc.abstractmethod
def inline_stream_stream(
self, group, method, request_iterator, timeout, metadata=None):
"""Invokes a stream-request-stream-response method.
Args:
group: The group identifier of the RPC.
method: The method identifier of the RPC.
request_iterator: An iterator that yields request values for the RPC.
timeout: A duration of time in seconds to allow for the RPC.
metadata: A metadata value to be passed to the service-side of the RPC.
Returns:
An object that is both a Call for the RPC and an iterator of response
values. Drawing response values from the returned iterator may raise
AbortionError indicating abortion of the RPC.
"""
raise NotImplementedError()
@abc.abstractmethod
def event_unary_unary(
self, group, method, request, receiver, abortion_callback, timeout,
metadata=None):
"""Event-driven invocation of a unary-request-unary-response method.
Args:
group: The group identifier of the RPC.
method: The method identifier of the RPC.
request: The request value for the RPC.
receiver: A ResponseReceiver to be passed the response data of the RPC.
abortion_callback: A callback to be called and passed an Abortion value
in the event of RPC abortion.
timeout: A duration of time in seconds to allow for the RPC.
metadata: A metadata value to be passed to the service-side of the RPC.
Returns:
A Call for the RPC.
"""
raise NotImplementedError()
@abc.abstractmethod
def event_unary_stream(
self, group, method, request, receiver, abortion_callback, timeout,
metadata=None):
"""Event-driven invocation of a unary-request-stream-response method.
Args:
group: The group identifier of the RPC.
method: The method identifier of the RPC.
request: The request value for the RPC.
receiver: A ResponseReceiver to be passed the response data of the RPC.
abortion_callback: A callback to be called and passed an Abortion value
in the event of RPC abortion.
timeout: A duration of time in seconds to allow for the RPC.
metadata: A metadata value to be passed to the service-side of the RPC.
Returns:
A Call for the RPC.
"""
raise NotImplementedError()
@abc.abstractmethod
def event_stream_unary(
self, group, method, receiver, abortion_callback, timeout,
metadata=None):
"""Event-driven invocation of a unary-request-unary-response method.
Args:
group: The group identifier of the RPC.
method: The method identifier of the RPC.
receiver: A ResponseReceiver to be passed the response data of the RPC.
abortion_callback: A callback to be called and passed an Abortion value
in the event of RPC abortion.
timeout: A duration of time in seconds to allow for the RPC.
metadata: A metadata value to be passed to the service-side of the RPC.
Returns:
A pair of a Call object for the RPC and a stream.Consumer to which the
request values of the RPC should be passed.
"""
raise NotImplementedError()
@abc.abstractmethod
def event_stream_stream(
self, group, method, receiver, abortion_callback, timeout,
metadata=None):
"""Event-driven invocation of a unary-request-stream-response method.
Args:
group: The group identifier of the RPC.
method: The method identifier of the RPC.
receiver: A ResponseReceiver to be passed the response data of the RPC.
abortion_callback: A callback to be called and passed an Abortion value
in the event of RPC abortion.
timeout: A duration of time in seconds to allow for the RPC.
metadata: A metadata value to be passed to the service-side of the RPC.
Returns:
A pair of a Call object for the RPC and a stream.Consumer to which the
request values of the RPC should be passed.
"""
raise NotImplementedError()
@abc.abstractmethod
def unary_unary(self, group, method):
"""Creates a UnaryUnaryMultiCallable for a unary-unary method.
Args:
group: The group identifier of the RPC.
method: The method identifier of the RPC.
Returns:
A UnaryUnaryMultiCallable value for the named unary-unary method.
"""
raise NotImplementedError()
@abc.abstractmethod
def unary_stream(self, group, method):
"""Creates a UnaryStreamMultiCallable for a unary-stream method.
Args:
group: The group identifier of the RPC.
method: The method identifier of the RPC.
Returns:
A UnaryStreamMultiCallable value for the named unary-stream method.
"""
raise NotImplementedError()
@abc.abstractmethod
def stream_unary(self, group, method):
"""Creates a StreamUnaryMultiCallable for a stream-unary method.
Args:
group: The group identifier of the RPC.
method: The method identifier of the RPC.
Returns:
A StreamUnaryMultiCallable value for the named stream-unary method.
"""
raise NotImplementedError()
@abc.abstractmethod
def stream_stream(self, group, method):
"""Creates a StreamStreamMultiCallable for a stream-stream method.
Args:
group: The group identifier of the RPC.
method: The method identifier of the RPC.
Returns:
A StreamStreamMultiCallable value for the named stream-stream method.
"""
raise NotImplementedError()
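# Illustrative sketch (not part of the interface above): invoking RPCs
# through a GenericStub. `stub`, `group`, `method`, and `request` are
# hypothetical placeholders; signatures follow the docstrings above.
#
#   response = stub.blocking_unary_unary(group, method, request, 10)
#   response, call = stub.blocking_unary_unary(
#       group, method, request, 10, with_call=True)
#   response_future = stub.future_unary_unary(group, method, request, 10)
#   multi_callable = stub.unary_unary(group, method)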
class DynamicStub(object):
"""Affords RPC invocation via attributes corresponding to afforded methods.
Instances of this type may be scoped to a single group so that attribute
access is unambiguous.
Instances of this type respond to attribute access as follows: if the
requested attribute is the name of a unary-unary method, the value of the
attribute will be a UnaryUnaryMultiCallable with which to invoke an RPC; if
the requested attribute is the name of a unary-stream method, the value of the
attribute will be a UnaryStreamMultiCallable with which to invoke an RPC; if
the requested attribute is the name of a stream-unary method, the value of the
attribute will be a StreamUnaryMultiCallable with which to invoke an RPC; and
if the requested attribute is the name of a stream-stream method, the value of
the attribute will be a StreamStreamMultiCallable with which to invoke an RPC.
"""
__metaclass__ = abc.ABCMeta
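# Illustrative sketch (not part of the interface above): with a DynamicStub
# scoped to a single group, attribute access names the method directly.
# `dynamic_stub`, `request`, and the method name 'Echo' are hypothetical.
#
#   response = dynamic_stub.Echo(request, timeout=10)
#   response_future = dynamic_stub.Echo.future(request, 10)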

@@ -0,0 +1,178 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Utilities for RPC Framework's Face interface."""
import collections
# stream is referenced from specification in this module.
from grpc.framework.common import cardinality
from grpc.framework.common import style
from grpc.framework.foundation import stream # pylint: disable=unused-import
from grpc.framework.interfaces.face import face
class _MethodImplementation(
face.MethodImplementation,
collections.namedtuple(
'_MethodImplementation',
['cardinality', 'style', 'unary_unary_inline', 'unary_stream_inline',
'stream_unary_inline', 'stream_stream_inline', 'unary_unary_event',
'unary_stream_event', 'stream_unary_event', 'stream_stream_event',])):
pass
def unary_unary_inline(behavior):
"""Creates an face.MethodImplementation for the given behavior.
Args:
behavior: The implementation of a unary-unary RPC method as a callable value
that takes a request value and an face.ServicerContext object and
returns a response value.
Returns:
An face.MethodImplementation derived from the given behavior.
"""
return _MethodImplementation(
cardinality.Cardinality.UNARY_UNARY, style.Service.INLINE, behavior,
None, None, None, None, None, None, None)
def unary_stream_inline(behavior):
"""Creates an face.MethodImplementation for the given behavior.
Args:
behavior: The implementation of a unary-stream RPC method as a callable
value that takes a request value and an face.ServicerContext object and
returns an iterator of response values.
Returns:
An face.MethodImplementation derived from the given behavior.
"""
return _MethodImplementation(
cardinality.Cardinality.UNARY_STREAM, style.Service.INLINE, None,
behavior, None, None, None, None, None, None)
def stream_unary_inline(behavior):
"""Creates an face.MethodImplementation for the given behavior.
Args:
behavior: The implementation of a stream-unary RPC method as a callable
value that takes an iterator of request values and an
face.ServicerContext object and returns a response value.
Returns:
An face.MethodImplementation derived from the given behavior.
"""
return _MethodImplementation(
cardinality.Cardinality.STREAM_UNARY, style.Service.INLINE, None, None,
behavior, None, None, None, None, None)
def stream_stream_inline(behavior):
"""Creates an face.MethodImplementation for the given behavior.
Args:
behavior: The implementation of a stream-stream RPC method as a callable
value that takes an iterator of request values and an
face.ServicerContext object and returns an iterator of response values.
Returns:
An face.MethodImplementation derived from the given behavior.
"""
return _MethodImplementation(
cardinality.Cardinality.STREAM_STREAM, style.Service.INLINE, None, None,
None, behavior, None, None, None, None)
def unary_unary_event(behavior):
"""Creates an face.MethodImplementation for the given behavior.
Args:
behavior: The implementation of a unary-unary RPC method as a callable
value that takes a request value, a response callback to which to pass
the response value of the RPC, and an face.ServicerContext.
Returns:
An face.MethodImplementation derived from the given behavior.
"""
return _MethodImplementation(
cardinality.Cardinality.UNARY_UNARY, style.Service.EVENT, None, None,
None, None, behavior, None, None, None)
def unary_stream_event(behavior):
"""Creates an face.MethodImplementation for the given behavior.
Args:
behavior: The implementation of a unary-stream RPC method as a callable
value that takes a request value, a stream.Consumer to which to pass the
the response values of the RPC, and an face.ServicerContext.
Returns:
An face.MethodImplementation derived from the given behavior.
"""
return _MethodImplementation(
cardinality.Cardinality.UNARY_STREAM, style.Service.EVENT, None, None,
None, None, None, behavior, None, None)
def stream_unary_event(behavior):
"""Creates an face.MethodImplementation for the given behavior.
Args:
behavior: The implementation of a stream-unary RPC method as a callable
value that takes a response callback to which to pass the response value
of the RPC and an face.ServicerContext and returns a stream.Consumer to
which the request values of the RPC should be passed.
Returns:
An face.MethodImplementation derived from the given behavior.
"""
return _MethodImplementation(
cardinality.Cardinality.STREAM_UNARY, style.Service.EVENT, None, None,
None, None, None, None, behavior, None)
def stream_stream_event(behavior):
"""Creates an face.MethodImplementation for the given behavior.
Args:
behavior: The implementation of a stream-stream RPC method as a callable
value that takes a stream.Consumer to which to pass the response values
of the RPC and an face.ServicerContext and returns a stream.Consumer to
which the request values of the RPC should be passed.
Returns:
An face.MethodImplementation derived from the given behavior.
"""
return _MethodImplementation(
cardinality.Cardinality.STREAM_STREAM, style.Service.EVENT, None, None,
None, None, None, None, None, behavior)
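# Illustrative usage sketch (not part of this module): wiring a plain
# function into a face.MethodImplementation with the constructors above.
# The `_echo` behavior is hypothetical.
#
#   def _echo(request, servicer_context):
#     return request
#
#   implementation = unary_unary_inline(_echo)
#   assert implementation.cardinality is cardinality.Cardinality.UNARY_UNARY
#   assert implementation.style is style.Service.INLINE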

@@ -52,7 +52,6 @@ def wait_for_events(completion_queues, deadline):
  def set_ith_result(i, completion_queue):
    result = completion_queue.next(deadline)
    with lock:
-      print i, completion_queue, result, time.time() - deadline
      results[i] = result
  for i, completion_queue in enumerate(completion_queues):
    thread = threading.Thread(target=set_ith_result,
@@ -80,10 +79,12 @@ class InsecureServerInsecureClient(unittest.TestCase):
    del self.client_channel
    self.client_completion_queue.shutdown()
-    while self.client_completion_queue.next().type != _types.EventType.QUEUE_SHUTDOWN:
+    while (self.client_completion_queue.next().type !=
+           _types.EventType.QUEUE_SHUTDOWN):
      pass
    self.server_completion_queue.shutdown()
-    while self.server_completion_queue.next().type != _types.EventType.QUEUE_SHUTDOWN:
+    while (self.server_completion_queue.next().type !=
+           _types.EventType.QUEUE_SHUTDOWN):
      pass
    del self.client_completion_queue
@@ -91,58 +92,68 @@ class InsecureServerInsecureClient(unittest.TestCase):
    del self.server
  def testEcho(self):
-    DEADLINE = time.time()+5
-    DEADLINE_TOLERANCE = 0.25
-    CLIENT_METADATA_ASCII_KEY = 'key'
-    CLIENT_METADATA_ASCII_VALUE = 'val'
-    CLIENT_METADATA_BIN_KEY = 'key-bin'
-    CLIENT_METADATA_BIN_VALUE = b'\0'*1000
-    SERVER_INITIAL_METADATA_KEY = 'init_me_me_me'
-    SERVER_INITIAL_METADATA_VALUE = 'whodawha?'
-    SERVER_TRAILING_METADATA_KEY = 'california_is_in_a_drought'
-    SERVER_TRAILING_METADATA_VALUE = 'zomg it is'
-    SERVER_STATUS_CODE = _types.StatusCode.OK
-    SERVER_STATUS_DETAILS = 'our work is never over'
-    REQUEST = 'in death a member of project mayhem has a name'
-    RESPONSE = 'his name is robert paulson'
-    METHOD = 'twinkies'
-    HOST = 'hostess'
+    deadline = time.time() + 5
+    event_time_tolerance = 2
+    deadline_tolerance = 0.25
+    client_metadata_ascii_key = 'key'
+    client_metadata_ascii_value = 'val'
+    client_metadata_bin_key = 'key-bin'
+    client_metadata_bin_value = b'\0'*1000
+    server_initial_metadata_key = 'init_me_me_me'
+    server_initial_metadata_value = 'whodawha?'
+    server_trailing_metadata_key = 'california_is_in_a_drought'
+    server_trailing_metadata_value = 'zomg it is'
+    server_status_code = _types.StatusCode.OK
+    server_status_details = 'our work is never over'
+    request = 'blarghaflargh'
+    response = 'his name is robert paulson'
+    method = 'twinkies'
+    host = 'hostess'
    server_request_tag = object()
-    request_call_result = self.server.request_call(self.server_completion_queue, server_request_tag)
-    self.assertEquals(_types.CallError.OK, request_call_result)
+    request_call_result = self.server.request_call(self.server_completion_queue,
+                                                   server_request_tag)
+    self.assertEqual(_types.CallError.OK, request_call_result)
    client_call_tag = object()
-    client_call = self.client_channel.create_call(self.client_completion_queue, METHOD, HOST, DEADLINE)
-    client_initial_metadata = [(CLIENT_METADATA_ASCII_KEY, CLIENT_METADATA_ASCII_VALUE), (CLIENT_METADATA_BIN_KEY, CLIENT_METADATA_BIN_VALUE)]
+    client_call = self.client_channel.create_call(
+        self.client_completion_queue, method, host, deadline)
+    client_initial_metadata = [
+        (client_metadata_ascii_key, client_metadata_ascii_value),
+        (client_metadata_bin_key, client_metadata_bin_value)
+    ]
    client_start_batch_result = client_call.start_batch([
        _types.OpArgs.send_initial_metadata(client_initial_metadata),
-        _types.OpArgs.send_message(REQUEST, 0),
+        _types.OpArgs.send_message(request, 0),
        _types.OpArgs.send_close_from_client(),
        _types.OpArgs.recv_initial_metadata(),
        _types.OpArgs.recv_message(),
        _types.OpArgs.recv_status_on_client()
    ], client_call_tag)
-    self.assertEquals(_types.CallError.OK, client_start_batch_result)
-    client_no_event, request_event, = wait_for_events([self.client_completion_queue, self.server_completion_queue], time.time() + 2)
-    self.assertEquals(client_no_event, None)
-    self.assertEquals(_types.EventType.OP_COMPLETE, request_event.type)
+    self.assertEqual(_types.CallError.OK, client_start_batch_result)
+    client_no_event, request_event, = wait_for_events(
+        [self.client_completion_queue, self.server_completion_queue],
+        time.time() + event_time_tolerance)
+    self.assertEqual(client_no_event, None)
+    self.assertEqual(_types.EventType.OP_COMPLETE, request_event.type)
    self.assertIsInstance(request_event.call, _low.Call)
    self.assertIs(server_request_tag, request_event.tag)
-    self.assertEquals(1, len(request_event.results))
+    self.assertEqual(1, len(request_event.results))
    received_initial_metadata = dict(request_event.results[0].initial_metadata)
    # Check that our metadata were transmitted
-    self.assertEquals(
+    self.assertEqual(
        dict(client_initial_metadata),
-        dict((x, received_initial_metadata[x]) for x in zip(*client_initial_metadata)[0]))
+        dict((x, received_initial_metadata[x])
+             for x in zip(*client_initial_metadata)[0]))
    # Check that Python's user agent string is a part of the full user agent
    # string
    self.assertIn('Python-gRPC-{}'.format(_grpcio_metadata.__version__),
                  received_initial_metadata['user-agent'])
-    self.assertEquals(METHOD, request_event.call_details.method)
-    self.assertEquals(HOST, request_event.call_details.host)
-    self.assertLess(abs(DEADLINE - request_event.call_details.deadline), DEADLINE_TOLERANCE)
+    self.assertEqual(method, request_event.call_details.method)
+    self.assertEqual(host, request_event.call_details.host)
+    self.assertLess(abs(deadline - request_event.call_details.deadline),
+                    deadline_tolerance)
    # Check that the channel is connected, and that both it and the call have
    # the proper target and peer; do this after the first flurry of messages to
@@ -155,33 +166,43 @@ class InsecureServerInsecureClient(unittest.TestCase):
    server_call_tag = object()
    server_call = request_event.call
-    server_initial_metadata = [(SERVER_INITIAL_METADATA_KEY, SERVER_INITIAL_METADATA_VALUE)]
-    server_trailing_metadata = [(SERVER_TRAILING_METADATA_KEY, SERVER_TRAILING_METADATA_VALUE)]
+    server_initial_metadata = [
+        (server_initial_metadata_key, server_initial_metadata_value)
+    ]
+    server_trailing_metadata = [
+        (server_trailing_metadata_key, server_trailing_metadata_value)
+    ]
    server_start_batch_result = server_call.start_batch([
        _types.OpArgs.send_initial_metadata(server_initial_metadata),
        _types.OpArgs.recv_message(),
-        _types.OpArgs.send_message(RESPONSE, 0),
+        _types.OpArgs.send_message(response, 0),
        _types.OpArgs.recv_close_on_server(),
-        _types.OpArgs.send_status_from_server(server_trailing_metadata, SERVER_STATUS_CODE, SERVER_STATUS_DETAILS)
+        _types.OpArgs.send_status_from_server(
+            server_trailing_metadata, server_status_code, server_status_details)
    ], server_call_tag)
-    self.assertEquals(_types.CallError.OK, server_start_batch_result)
-    client_event, server_event, = wait_for_events([self.client_completion_queue, self.server_completion_queue], time.time() + 1)
-    self.assertEquals(6, len(client_event.results))
+    self.assertEqual(_types.CallError.OK, server_start_batch_result)
+    client_event, server_event, = wait_for_events(
+        [self.client_completion_queue, self.server_completion_queue],
+        time.time() + event_time_tolerance)
+    self.assertEqual(6, len(client_event.results))
    found_client_op_types = set()
    for client_result in client_event.results:
-      self.assertNotIn(client_result.type, found_client_op_types)  # we expect each op type to be unique
+      # we expect each op type to be unique
+      self.assertNotIn(client_result.type, found_client_op_types)
      found_client_op_types.add(client_result.type)
      if client_result.type == _types.OpType.RECV_INITIAL_METADATA:
-        self.assertEquals(dict(server_initial_metadata), dict(client_result.initial_metadata))
+        self.assertEqual(dict(server_initial_metadata),
+                         dict(client_result.initial_metadata))
      elif client_result.type == _types.OpType.RECV_MESSAGE:
-        self.assertEquals(RESPONSE, client_result.message)
+        self.assertEqual(response, client_result.message)
      elif client_result.type == _types.OpType.RECV_STATUS_ON_CLIENT:
-        self.assertEquals(dict(server_trailing_metadata), dict(client_result.trailing_metadata))
-        self.assertEquals(SERVER_STATUS_DETAILS, client_result.status.details)
-        self.assertEquals(SERVER_STATUS_CODE, client_result.status.code)
-    self.assertEquals(set([
+        self.assertEqual(dict(server_trailing_metadata),
+                         dict(client_result.trailing_metadata))
+        self.assertEqual(server_status_details, client_result.status.details)
+        self.assertEqual(server_status_code, client_result.status.code)
+    self.assertEqual(set([
        _types.OpType.SEND_INITIAL_METADATA,
        _types.OpType.SEND_MESSAGE,
        _types.OpType.SEND_CLOSE_FROM_CLIENT,
@@ -190,16 +211,16 @@ class InsecureServerInsecureClient(unittest.TestCase):
        _types.OpType.RECV_STATUS_ON_CLIENT
    ]), found_client_op_types)
-    self.assertEquals(5, len(server_event.results))
+    self.assertEqual(5, len(server_event.results))
    found_server_op_types = set()
    for server_result in server_event.results:
      self.assertNotIn(client_result.type, found_server_op_types)
      found_server_op_types.add(server_result.type)
      if server_result.type == _types.OpType.RECV_MESSAGE:
-        self.assertEquals(REQUEST, server_result.message)
+        self.assertEqual(request, server_result.message)
      elif server_result.type == _types.OpType.RECV_CLOSE_ON_SERVER:
        self.assertFalse(server_result.cancelled)
-    self.assertEquals(set([
+    self.assertEqual(set([
        _types.OpType.SEND_INITIAL_METADATA,
        _types.OpType.RECV_MESSAGE,
        _types.OpType.SEND_MESSAGE,
@@ -211,5 +232,81 @@ class InsecureServerInsecureClient(unittest.TestCase):
    del server_call
+class HangingServerShutdown(unittest.TestCase):
+  def setUp(self):
+    self.server_completion_queue = _low.CompletionQueue()
+    self.server = _low.Server(self.server_completion_queue, [])
+    self.port = self.server.add_http2_port('[::]:0')
+    self.client_completion_queue = _low.CompletionQueue()
+    self.client_channel = _low.Channel('localhost:%d'%self.port, [])
+    self.server.start()
+  def tearDown(self):
+    self.server.shutdown()
+    del self.client_channel
+    self.client_completion_queue.shutdown()
+    self.server_completion_queue.shutdown()
+    while True:
+      client_event, server_event = wait_for_events(
+          [self.client_completion_queue, self.server_completion_queue],
+          float("+inf"))
+      if (client_event.type == _types.EventType.QUEUE_SHUTDOWN and
+          server_event.type == _types.EventType.QUEUE_SHUTDOWN):
+        break
+    del self.client_completion_queue
+    del self.server_completion_queue
+    del self.server
+  def testHangingServerCall(self):
+    deadline = time.time() + 5
+    deadline_tolerance = 0.25
+    event_time_tolerance = 2
+    cancel_all_calls_time_tolerance = 0.5
+    request = 'blarghaflargh'
+    method = 'twinkies'
+    host = 'hostess'
+    server_request_tag = object()
+    request_call_result = self.server.request_call(self.server_completion_queue,
+                                                   server_request_tag)
+    client_call_tag = object()
+    client_call = self.client_channel.create_call(self.client_completion_queue,
+                                                  method, host, deadline)
+    client_start_batch_result = client_call.start_batch([
+        _types.OpArgs.send_initial_metadata([]),
+        _types.OpArgs.send_message(request, 0),
+        _types.OpArgs.send_close_from_client(),
+        _types.OpArgs.recv_initial_metadata(),
+        _types.OpArgs.recv_message(),
+        _types.OpArgs.recv_status_on_client()
+    ], client_call_tag)
+    client_no_event, request_event, = wait_for_events(
+        [self.client_completion_queue, self.server_completion_queue],
+        time.time() + event_time_tolerance)
+    # Now try to shutdown the server and expect that we see server shutdown
+    # almost immediately after calling cancel_all_calls.
+    with self.assertRaises(RuntimeError):
+      self.server.cancel_all_calls()
+    shutdown_tag = object()
+    self.server.shutdown(shutdown_tag)
+    pre_cancel_timestamp = time.time()
+    self.server.cancel_all_calls()
+    finish_shutdown_timestamp = None
+    client_call_event, server_shutdown_event = wait_for_events(
+        [self.client_completion_queue, self.server_completion_queue],
+        time.time() + event_time_tolerance)
+    self.assertIs(shutdown_tag, server_shutdown_event.tag)
+    self.assertGreater(pre_cancel_timestamp + cancel_all_calls_time_tolerance,
+                       time.time())
+    del client_call
 if __name__ == '__main__':
  unittest.main(verbosity=2)

@@ -0,0 +1,141 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include <string.h>
#include <grpc/support/log.h>
#include <grpc/support/useful.h>
#include "src/core/channel/channel_args.h"
#include "test/core/util/test_config.h"
static void test_create(void) {
grpc_arg arg_int;
grpc_arg arg_string;
grpc_arg to_add[2];
grpc_channel_args *ch_args;
arg_int.key = "int_arg";
arg_int.type = GRPC_ARG_INTEGER;
arg_int.value.integer = 123;
arg_string.key = "str key";
arg_string.type = GRPC_ARG_STRING;
arg_string.value.string = "str value";
to_add[0] = arg_int;
to_add[1] = arg_string;
ch_args = grpc_channel_args_copy_and_add(NULL, to_add, 2);
GPR_ASSERT(ch_args->num_args == 2);
GPR_ASSERT(strcmp(ch_args->args[0].key, arg_int.key) == 0);
GPR_ASSERT(ch_args->args[0].type == arg_int.type);
GPR_ASSERT(ch_args->args[0].value.integer == arg_int.value.integer);
GPR_ASSERT(strcmp(ch_args->args[1].key, arg_string.key) == 0);
GPR_ASSERT(ch_args->args[1].type == arg_string.type);
GPR_ASSERT(strcmp(ch_args->args[1].value.string, arg_string.value.string) ==
0);
grpc_channel_args_destroy(ch_args);
}
static void test_set_compression_algorithm(void) {
grpc_channel_args *ch_args;
ch_args =
grpc_channel_args_set_compression_algorithm(NULL, GRPC_COMPRESS_GZIP);
GPR_ASSERT(ch_args->num_args == 1);
GPR_ASSERT(strcmp(ch_args->args[0].key, GRPC_COMPRESSION_ALGORITHM_ARG) == 0);
GPR_ASSERT(ch_args->args[0].type == GRPC_ARG_INTEGER);
grpc_channel_args_destroy(ch_args);
}
static void test_compression_algorithm_states(void) {
grpc_channel_args *ch_args, *ch_args_wo_gzip, *ch_args_wo_gzip_deflate;
int states_bitset;
size_t i;
ch_args = grpc_channel_args_copy_and_add(NULL, NULL, 0);
/* by default, all enabled */
states_bitset = grpc_channel_args_compression_algorithm_get_states(ch_args);
for (i = 0; i < GRPC_COMPRESS_ALGORITHMS_COUNT; i++) {
GPR_ASSERT(GPR_BITGET(states_bitset, i));
}
/* disable gzip and deflate */
ch_args_wo_gzip = grpc_channel_args_compression_algorithm_set_state(
&ch_args, GRPC_COMPRESS_GZIP, 0);
GPR_ASSERT(ch_args == ch_args_wo_gzip);
ch_args_wo_gzip_deflate = grpc_channel_args_compression_algorithm_set_state(
&ch_args_wo_gzip, GRPC_COMPRESS_DEFLATE, 0);
GPR_ASSERT(ch_args_wo_gzip == ch_args_wo_gzip_deflate);
states_bitset = grpc_channel_args_compression_algorithm_get_states(
ch_args_wo_gzip_deflate);
for (i = 0; i < GRPC_COMPRESS_ALGORITHMS_COUNT; i++) {
if (i == GRPC_COMPRESS_GZIP || i == GRPC_COMPRESS_DEFLATE) {
GPR_ASSERT(GPR_BITGET(states_bitset, i) == 0);
} else {
GPR_ASSERT(GPR_BITGET(states_bitset, i) != 0);
}
}
/* re-enable gzip only */
ch_args_wo_gzip = grpc_channel_args_compression_algorithm_set_state(
&ch_args_wo_gzip_deflate, GRPC_COMPRESS_GZIP, 1);
GPR_ASSERT(ch_args_wo_gzip == ch_args_wo_gzip_deflate);
states_bitset =
grpc_channel_args_compression_algorithm_get_states(ch_args_wo_gzip);
for (i = 0; i < GRPC_COMPRESS_ALGORITHMS_COUNT; i++) {
if (i == GRPC_COMPRESS_DEFLATE) {
GPR_ASSERT(GPR_BITGET(states_bitset, i) == 0);
} else {
GPR_ASSERT(GPR_BITGET(states_bitset, i) != 0);
}
}
grpc_channel_args_destroy(ch_args);
}
int main(int argc, char **argv) {
grpc_test_init(argc, argv);
test_create();
test_set_compression_algorithm();
test_compression_algorithm_states();
return 0;
}

@@ -70,7 +70,7 @@ static void process_auth_failure(void *state, grpc_auth_context *ctx,
                                  grpc_process_auth_metadata_done_cb cb,
                                  void *user_data) {
   GPR_ASSERT(state == NULL);
-  cb(user_data, NULL, 0, 0);
+  cb(user_data, NULL, 0, NULL, 0, GRPC_STATUS_UNAUTHENTICATED, NULL);
 }
 static void chttp2_init_client_secure_fullstack(grpc_end2end_test_fixture *f,

@@ -73,7 +73,7 @@ static void process_auth_failure(void *state, grpc_auth_context *ctx,
                                  grpc_process_auth_metadata_done_cb cb,
                                  void *user_data) {
   GPR_ASSERT(state == NULL);
-  cb(user_data, NULL, 0, 0);
+  cb(user_data, NULL, 0, NULL, 0, GRPC_STATUS_UNAUTHENTICATED, NULL);
 }
 static void chttp2_init_client_secure_fullstack(grpc_end2end_test_fixture *f,

@@ -73,7 +73,7 @@ static void process_auth_failure(void *state, grpc_auth_context *ctx,
                                  grpc_process_auth_metadata_done_cb cb,
                                  void *user_data) {
   GPR_ASSERT(state == NULL);
-  cb(user_data, NULL, 0, 0);
+  cb(user_data, NULL, 0, NULL, 0, GRPC_STATUS_UNAUTHENTICATED, NULL);
 }
 static void chttp2_init_client_secure_fullstack(grpc_end2end_test_fixture *f,

@@ -101,7 +101,7 @@ static void process_auth_failure(void *state, grpc_auth_context *ctx,
                                  grpc_process_auth_metadata_done_cb cb,
                                  void *user_data) {
   GPR_ASSERT(state == NULL);
-  cb(user_data, NULL, 0, 0);
+  cb(user_data, NULL, 0, NULL, 0, GRPC_STATUS_UNAUTHENTICATED, NULL);
 }
 static void chttp2_init_client_secure_fullstack(grpc_end2end_test_fixture *f,

@@ -79,7 +79,7 @@ static void process_oauth2_success(void *state, grpc_auth_context *ctx,
                                    client_identity);
   GPR_ASSERT(grpc_auth_context_set_peer_identity_property_name(
                  ctx, client_identity_property_name) == 1);
-  cb(user_data, oauth2, 1, 1);
+  cb(user_data, oauth2, 1, NULL, 0, GRPC_STATUS_OK, NULL);
 }
 static void process_oauth2_failure(void *state, grpc_auth_context *ctx,
@@ -90,7 +90,7 @@ static void process_oauth2_failure(void *state, grpc_auth_context *ctx,
   find_metadata(md, md_count, "Authorization", oauth2_md);
   GPR_ASSERT(state == NULL);
   GPR_ASSERT(oauth2 != NULL);
-  cb(user_data, oauth2, 1, 0);
+  cb(user_data, oauth2, 1, NULL, 0, GRPC_STATUS_UNAUTHENTICATED, NULL);
 }
 static grpc_end2end_test_fixture chttp2_create_fixture_secure_fullstack(

@@ -88,7 +88,8 @@ static void test_get(int use_ssl, int port) {
   gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
   while (!g_done) {
     grpc_pollset_worker worker;
-    grpc_pollset_work(&g_pollset, &worker, n_seconds_time(20));
+    grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
+                      n_seconds_time(20));
   }
   gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
   gpr_free(host);
@@ -114,7 +115,8 @@ static void test_post(int use_ssl, int port) {
   gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
   while (!g_done) {
     grpc_pollset_worker worker;
-    grpc_pollset_work(&g_pollset, &worker, n_seconds_time(20));
+    grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
+                      n_seconds_time(20));
   }
   gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
   gpr_free(host);

@@ -256,7 +256,8 @@ static void read_and_write_test(grpc_endpoint_test_config config,
   while (!state.read_done || !state.write_done) {
     grpc_pollset_worker worker;
     GPR_ASSERT(gpr_time_cmp(gpr_now(GPR_CLOCK_MONOTONIC), deadline) < 0);
-    grpc_pollset_work(g_pollset, &worker, deadline);
+    grpc_pollset_work(g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
+                      deadline);
   }
   gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
@@ -353,7 +354,8 @@ static void shutdown_during_write_test(grpc_endpoint_test_config config,
   while (!write_st.done) {
     grpc_pollset_worker worker;
     GPR_ASSERT(gpr_time_cmp(gpr_now(deadline.clock_type), deadline) < 0);
-    grpc_pollset_work(g_pollset, &worker, deadline);
+    grpc_pollset_work(g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
+                      deadline);
   }
   gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
   grpc_endpoint_destroy(write_st.ep);
@@ -361,7 +363,8 @@ static void shutdown_during_write_test(grpc_endpoint_test_config config,
   while (!read_st.done) {
     grpc_pollset_worker worker;
     GPR_ASSERT(gpr_time_cmp(gpr_now(deadline.clock_type), deadline) < 0);
-    grpc_pollset_work(g_pollset, &worker, deadline);
+    grpc_pollset_work(g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
+                      deadline);
   }
   gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
   gpr_free(slices);

@@ -250,7 +250,8 @@ static void server_wait_and_shutdown(server *sv) {
   gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
   while (!sv->done) {
     grpc_pollset_worker worker;
-    grpc_pollset_work(&g_pollset, &worker, gpr_inf_future(GPR_CLOCK_MONOTONIC));
+    grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
+                      gpr_inf_future(GPR_CLOCK_MONOTONIC));
   }
   gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
 }
@@ -358,7 +359,8 @@ static void client_wait_and_shutdown(client *cl) {
   gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
   while (!cl->done) {
     grpc_pollset_worker worker;
-    grpc_pollset_work(&g_pollset, &worker, gpr_inf_future(GPR_CLOCK_MONOTONIC));
+    grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
+                      gpr_inf_future(GPR_CLOCK_MONOTONIC));
   }
   gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
 }
@@ -448,7 +450,8 @@ static void test_grpc_fd_change(void) {
   gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
   while (a.cb_that_ran == NULL) {
     grpc_pollset_worker worker;
-    grpc_pollset_work(&g_pollset, &worker, gpr_inf_future(GPR_CLOCK_MONOTONIC));
+    grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
+                      gpr_inf_future(GPR_CLOCK_MONOTONIC));
   }
   GPR_ASSERT(a.cb_that_ran == first_read_callback);
   gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
@@ -467,7 +470,8 @@ static void test_grpc_fd_change(void) {
   gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
   while (b.cb_that_ran == NULL) {
     grpc_pollset_worker worker;
-    grpc_pollset_work(&g_pollset, &worker, gpr_inf_future(GPR_CLOCK_MONOTONIC));
+    grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
+                      gpr_inf_future(GPR_CLOCK_MONOTONIC));
   }
   /* Except now we verify that second_read_callback ran instead */
   GPR_ASSERT(b.cb_that_ran == second_read_callback);

@@ -112,7 +112,8 @@ void test_succeeds(void) {
   while (g_connections_complete == connections_complete_before) {
     grpc_pollset_worker worker;
-    grpc_pollset_work(&g_pollset, &worker, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5));
+    grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
+                      GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5));
   }
   gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
@@ -142,7 +143,8 @@ void test_fails(void) {
   /* wait for the connection callback to finish */
   while (g_connections_complete == connections_complete_before) {
     grpc_pollset_worker worker;
-    grpc_pollset_work(&g_pollset, &worker, test_deadline());
+    grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
+                      test_deadline());
   }
   gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
@@ -211,7 +213,8 @@ void test_times_out(void) {
       GPR_ASSERT(g_connections_complete ==
                  connections_complete_before + is_after_deadline);
     }
-    grpc_pollset_work(&g_pollset, &worker, GRPC_TIMEOUT_MILLIS_TO_DEADLINE(10));
+    grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
+                      GRPC_TIMEOUT_MILLIS_TO_DEADLINE(10));
   }
   gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));

@@ -187,7 +187,8 @@ static void read_test(ssize_t num_bytes, ssize_t slice_size) {
   gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
   while (state.read_bytes < state.target_read_bytes) {
     grpc_pollset_worker worker;
-    grpc_pollset_work(&g_pollset, &worker, deadline);
+    grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
+                      deadline);
   }
   GPR_ASSERT(state.read_bytes == state.target_read_bytes);
   gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
@@ -224,7 +225,8 @@ static void large_read_test(ssize_t slice_size) {
   gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
   while (state.read_bytes < state.target_read_bytes) {
     grpc_pollset_worker worker;
-    grpc_pollset_work(&g_pollset, &worker, deadline);
+    grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
+                      deadline);
   }
   GPR_ASSERT(state.read_bytes == state.target_read_bytes);
   gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
@@ -285,7 +287,8 @@ void drain_socket_blocking(int fd, size_t num_bytes, size_t read_size) {
   for (;;) {
     grpc_pollset_worker worker;
     gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
-    grpc_pollset_work(&g_pollset, &worker, GRPC_TIMEOUT_MILLIS_TO_DEADLINE(10));
+    grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
+                      GRPC_TIMEOUT_MILLIS_TO_DEADLINE(10));
     gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
     do {
       bytes_read =
@@ -365,7 +368,8 @@ static void write_test(ssize_t num_bytes, ssize_t slice_size) {
     if (state.write_done) {
       break;
     }
-    grpc_pollset_work(&g_pollset, &worker, deadline);
+    grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
+                      deadline);
   }
   gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
 }
@@ -422,7 +426,8 @@ static void write_error_test(ssize_t num_bytes, ssize_t slice_size) {
       if (state.write_done) {
        break;
      }
-      grpc_pollset_work(&g_pollset, &worker, deadline);
+      grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
+                        deadline);
     }
     gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
     break;

@@ -137,7 +137,8 @@ static void test_connect(int n) {
   while (g_nconnects == nconnects_before &&
          gpr_time_cmp(deadline, gpr_now(deadline.clock_type)) > 0) {
     grpc_pollset_worker worker;
-    grpc_pollset_work(&g_pollset, &worker, deadline);
+    grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
+                      deadline);
   }
   gpr_log(GPR_DEBUG, "wait done");

@@ -146,7 +146,8 @@ static void test_receive(int number_of_clients) {
   while (g_number_of_reads == number_of_reads_before &&
          gpr_time_cmp(deadline, gpr_now(deadline.clock_type)) > 0) {
     grpc_pollset_worker worker;
-    grpc_pollset_work(&g_pollset, &worker, deadline);
+    grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
+                      deadline);
   }
   GPR_ASSERT(g_number_of_reads == number_of_reads_before + 1);
   close(clifd);

@@ -85,7 +85,7 @@ char *grpc_test_fetch_oauth2_token_with_credentials(grpc_credentials *creds) {
   gpr_mu_lock(GRPC_POLLSET_MU(&request.pollset));
   while (!request.is_done) {
     grpc_pollset_worker worker;
-    grpc_pollset_work(&request.pollset, &worker,
+    grpc_pollset_work(&request.pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
                       gpr_inf_future(GPR_CLOCK_MONOTONIC));
   }
   gpr_mu_unlock(GRPC_POLLSET_MU(&request.pollset));

@@ -96,8 +96,8 @@ int main(int argc, char **argv) {
   gpr_mu_lock(GRPC_POLLSET_MU(&sync.pollset));
   while (!sync.is_done) {
     grpc_pollset_worker worker;
-    grpc_pollset_work(&sync.pollset, &worker,
-                      gpr_inf_future(GPR_CLOCK_REALTIME));
+    grpc_pollset_work(&sync.pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
+                      gpr_inf_future(GPR_CLOCK_MONOTONIC));
   }
   gpr_mu_unlock(GRPC_POLLSET_MU(&sync.pollset));

@@ -111,7 +111,7 @@ int main(int argc, char **argv) {
   gpr_mu_lock(GRPC_POLLSET_MU(&sync.pollset));
   while (!sync.is_done) {
     grpc_pollset_worker worker;
-    grpc_pollset_work(&sync.pollset, &worker,
+    grpc_pollset_work(&sync.pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
                       gpr_inf_future(GPR_CLOCK_MONOTONIC));
   }
   gpr_mu_unlock(GRPC_POLLSET_MU(&sync.pollset));

@@ -178,7 +178,7 @@ static int pick_port_using_server(char *server) {
   gpr_mu_lock(GRPC_POLLSET_MU(&pr.pollset));
   while (pr.port == -1) {
     grpc_pollset_worker worker;
-    grpc_pollset_work(&pr.pollset, &worker,
+    grpc_pollset_work(&pr.pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
                       GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1));
   }
   gpr_mu_unlock(GRPC_POLLSET_MU(&pr.pollset));

@ -134,7 +134,8 @@ void reconnect_server_poll(reconnect_server *server, int seconds) {
gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
gpr_time_from_seconds(seconds, GPR_TIMESPAN)); gpr_time_from_seconds(seconds, GPR_TIMESPAN));
gpr_mu_lock(GRPC_POLLSET_MU(&server->pollset)); gpr_mu_lock(GRPC_POLLSET_MU(&server->pollset));
grpc_pollset_work(&server->pollset, &worker, deadline); grpc_pollset_work(&server->pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
deadline);
gpr_mu_unlock(GRPC_POLLSET_MU(&server->pollset)); gpr_mu_unlock(GRPC_POLLSET_MU(&server->pollset));
} }
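
Every hunk above is the same mechanical migration: grpc_pollset_work now takes the current time as an explicit argument next to the deadline, so each call site samples gpr_now(GPR_CLOCK_MONOTONIC) on every loop iteration instead of relying on the pollset to read the clock internally. A minimal sketch of the updated caller pattern, assuming the four-argument signature shown in these hunks; the helper name and done flag are placeholders for illustration:

    #include <grpc/support/time.h>
    #include "src/core/iomgr/pollset.h"
    #include "test/core/util/test_config.h" /* GRPC_TIMEOUT_SECONDS_TO_DEADLINE */

    static void wait_until_done(grpc_pollset* pollset, int* done) {
      gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10);
      gpr_mu_lock(GRPC_POLLSET_MU(pollset));
      while (!*done &&
             gpr_time_cmp(deadline, gpr_now(deadline.clock_type)) > 0) {
        grpc_pollset_worker worker;
        /* Callers now sample the monotonic clock themselves on every
           iteration and pass it alongside the deadline. */
        grpc_pollset_work(pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
                          deadline);
      }
      gpr_mu_unlock(GRPC_POLLSET_MU(pollset));
    }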

@ -56,6 +56,10 @@
#include <grpc/support/thd.h> #include <grpc/support/thd.h>
#include <grpc/support/time.h> #include <grpc/support/time.h>
#ifdef GPR_POSIX_SOCKET
#include "src/core/iomgr/pollset_posix.h"
#endif
using grpc::cpp::test::util::EchoRequest; using grpc::cpp::test::util::EchoRequest;
using grpc::cpp::test::util::EchoResponse; using grpc::cpp::test::util::EchoResponse;
using std::chrono::system_clock; using std::chrono::system_clock;
@ -67,8 +71,41 @@ namespace {
void* tag(int i) { return (void*)(gpr_intptr)i; } void* tag(int i) { return (void*)(gpr_intptr)i; }
class Verifier { #ifdef GPR_POSIX_SOCKET
static int assert_non_blocking_poll(struct pollfd* pfds, nfds_t nfds,
int timeout) {
GPR_ASSERT(timeout == 0);
return poll(pfds, nfds, timeout);
}
class PollOverride {
public:
PollOverride(grpc_poll_function_type f) {
prev_ = grpc_poll_function;
grpc_poll_function = f;
}
~PollOverride() { grpc_poll_function = prev_; }
private:
grpc_poll_function_type prev_;
};
class PollingCheckRegion : public PollOverride {
public: public:
explicit PollingCheckRegion(bool allow_blocking)
: PollOverride(allow_blocking ? poll : assert_non_blocking_poll) {}
};
#else
class PollingCheckRegion {
public:
explicit PollingCheckRegion(bool allow_blocking) {}
};
#endif
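
The block above installs a test-only hook: PollOverride is an RAII guard that swaps the process-wide grpc_poll_function pointer and restores the previous value on scope exit, and PollingCheckRegion(false) substitutes assert_non_blocking_poll so that any blocking poll(2) call (nonzero timeout) aborts the test. A hedged usage sketch; the scope and variable name are illustrative:

    {
      PollingCheckRegion region(false /* allow_blocking */);
      // While `region` is alive, any poll() the library issues with a
      // nonzero timeout trips the GPR_ASSERT in assert_non_blocking_poll.
    }  // destructor restores the previous grpc_poll_function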
class Verifier : public PollingCheckRegion {
public:
explicit Verifier(bool spin) : PollingCheckRegion(!spin), spin_(spin) {}
Verifier& Expect(int i, bool expect_ok) { Verifier& Expect(int i, bool expect_ok) {
expectations_[tag(i)] = expect_ok; expectations_[tag(i)] = expect_ok;
return *this; return *this;
@ -78,7 +115,17 @@ class Verifier {
while (!expectations_.empty()) { while (!expectations_.empty()) {
bool ok; bool ok;
void* got_tag; void* got_tag;
EXPECT_TRUE(cq->Next(&got_tag, &ok)); if (spin_) {
for (;;) {
auto r = cq->AsyncNext(&got_tag, &ok, gpr_time_0(GPR_CLOCK_REALTIME));
if (r == CompletionQueue::TIMEOUT) continue;
if (r == CompletionQueue::GOT_EVENT) break;
gpr_log(GPR_ERROR, "unexpected result from AsyncNext");
abort();
}
} else {
EXPECT_TRUE(cq->Next(&got_tag, &ok));
}
auto it = expectations_.find(got_tag); auto it = expectations_.find(got_tag);
EXPECT_TRUE(it != expectations_.end()); EXPECT_TRUE(it != expectations_.end());
EXPECT_EQ(it->second, ok); EXPECT_EQ(it->second, ok);
@ -90,14 +137,34 @@ class Verifier {
if (expectations_.empty()) { if (expectations_.empty()) {
bool ok; bool ok;
void* got_tag; void* got_tag;
EXPECT_EQ(cq->AsyncNext(&got_tag, &ok, deadline), if (spin_) {
CompletionQueue::TIMEOUT); while (std::chrono::system_clock::now() < deadline) {
EXPECT_EQ(
cq->AsyncNext(&got_tag, &ok, gpr_time_0(GPR_CLOCK_REALTIME)),
CompletionQueue::TIMEOUT);
}
} else {
EXPECT_EQ(cq->AsyncNext(&got_tag, &ok, deadline),
CompletionQueue::TIMEOUT);
}
} else { } else {
while (!expectations_.empty()) { while (!expectations_.empty()) {
bool ok; bool ok;
void* got_tag; void* got_tag;
EXPECT_EQ(cq->AsyncNext(&got_tag, &ok, deadline), if (spin_) {
CompletionQueue::GOT_EVENT); for (;;) {
GPR_ASSERT(std::chrono::system_clock::now() < deadline);
auto r =
cq->AsyncNext(&got_tag, &ok, gpr_time_0(GPR_CLOCK_REALTIME));
if (r == CompletionQueue::TIMEOUT) continue;
if (r == CompletionQueue::GOT_EVENT) break;
gpr_log(GPR_ERROR, "unexpected result from AsyncNext");
abort();
}
} else {
EXPECT_EQ(cq->AsyncNext(&got_tag, &ok, deadline),
CompletionQueue::GOT_EVENT);
}
auto it = expectations_.find(got_tag); auto it = expectations_.find(got_tag);
EXPECT_TRUE(it != expectations_.end()); EXPECT_TRUE(it != expectations_.end());
EXPECT_EQ(it->second, ok); EXPECT_EQ(it->second, ok);
@ -108,9 +175,10 @@ class Verifier {
private: private:
std::map<void*, bool> expectations_; std::map<void*, bool> expectations_;
bool spin_;
}; };
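
Verifier now drives the completion queue in one of two modes chosen at construction: spin mode busy-loops on AsyncNext with a zero deadline (and, via the PollingCheckRegion base, asserts that no blocking poll happens underneath), while the default mode blocks in Next/AsyncNext as before. A hedged usage sketch, with cq standing in for the test's grpc::CompletionQueue:

    // Spin mode: busy-wait with zero-timeout AsyncNext; blocking poll is
    // disallowed for the lifetime of the Verifier.
    Verifier(true).Expect(1, true).Expect(2, true).Verify(cq.get());
    // Blocking mode: equivalent to the old Verifier, waiting in cq->Next.
    Verifier(false).Expect(3, true).Verify(cq.get());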
class AsyncEnd2endTest : public ::testing::Test { class AsyncEnd2endTest : public ::testing::TestWithParam<bool> {
protected: protected:
AsyncEnd2endTest() {} AsyncEnd2endTest() {}
@ -160,15 +228,15 @@ class AsyncEnd2endTest : public ::testing::Test {
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
cq_.get(), tag(2)); cq_.get(), tag(2));
Verifier().Expect(2, true).Verify(cq_.get()); Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message()); EXPECT_EQ(send_request.message(), recv_request.message());
send_response.set_message(recv_request.message()); send_response.set_message(recv_request.message());
response_writer.Finish(send_response, Status::OK, tag(3)); response_writer.Finish(send_response, Status::OK, tag(3));
Verifier().Expect(3, true).Verify(cq_.get()); Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
response_reader->Finish(&recv_response, &recv_status, tag(4)); response_reader->Finish(&recv_response, &recv_status, tag(4));
Verifier().Expect(4, true).Verify(cq_.get()); Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.ok()); EXPECT_TRUE(recv_status.ok());
@ -182,18 +250,18 @@ class AsyncEnd2endTest : public ::testing::Test {
std::ostringstream server_address_; std::ostringstream server_address_;
}; };
TEST_F(AsyncEnd2endTest, SimpleRpc) { TEST_P(AsyncEnd2endTest, SimpleRpc) {
ResetStub(); ResetStub();
SendRpc(1); SendRpc(1);
} }
TEST_F(AsyncEnd2endTest, SequentialRpcs) { TEST_P(AsyncEnd2endTest, SequentialRpcs) {
ResetStub(); ResetStub();
SendRpc(10); SendRpc(10);
} }
// Test a simple RPC using the async version of Next // Test a simple RPC using the async version of Next
TEST_F(AsyncEnd2endTest, AsyncNextRpc) { TEST_P(AsyncEnd2endTest, AsyncNextRpc) {
ResetStub(); ResetStub();
EchoRequest send_request; EchoRequest send_request;
@ -214,30 +282,32 @@ TEST_F(AsyncEnd2endTest, AsyncNextRpc) {
std::chrono::system_clock::now()); std::chrono::system_clock::now());
std::chrono::system_clock::time_point time_limit( std::chrono::system_clock::time_point time_limit(
std::chrono::system_clock::now() + std::chrono::seconds(10)); std::chrono::system_clock::now() + std::chrono::seconds(10));
Verifier().Verify(cq_.get(), time_now); Verifier(GetParam()).Verify(cq_.get(), time_now);
Verifier().Verify(cq_.get(), time_now); Verifier(GetParam()).Verify(cq_.get(), time_now);
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
cq_.get(), tag(2)); cq_.get(), tag(2));
Verifier().Expect(2, true).Verify(cq_.get(), time_limit); Verifier(GetParam()).Expect(2, true).Verify(cq_.get(), time_limit);
EXPECT_EQ(send_request.message(), recv_request.message()); EXPECT_EQ(send_request.message(), recv_request.message());
send_response.set_message(recv_request.message()); send_response.set_message(recv_request.message());
response_writer.Finish(send_response, Status::OK, tag(3)); response_writer.Finish(send_response, Status::OK, tag(3));
Verifier().Expect(3, true).Verify( Verifier(GetParam())
cq_.get(), std::chrono::system_clock::time_point::max()); .Expect(3, true)
.Verify(cq_.get(), std::chrono::system_clock::time_point::max());
response_reader->Finish(&recv_response, &recv_status, tag(4)); response_reader->Finish(&recv_response, &recv_status, tag(4));
Verifier().Expect(4, true).Verify( Verifier(GetParam())
cq_.get(), std::chrono::system_clock::time_point::max()); .Expect(4, true)
.Verify(cq_.get(), std::chrono::system_clock::time_point::max());
EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.ok()); EXPECT_TRUE(recv_status.ok());
} }
// Two pings and a final pong. // Two pings and a final pong.
TEST_F(AsyncEnd2endTest, SimpleClientStreaming) { TEST_P(AsyncEnd2endTest, SimpleClientStreaming) {
ResetStub(); ResetStub();
EchoRequest send_request; EchoRequest send_request;
@ -256,41 +326,41 @@ TEST_F(AsyncEnd2endTest, SimpleClientStreaming) {
service_.RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(), service_.RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
tag(2)); tag(2));
Verifier().Expect(2, true).Expect(1, true).Verify(cq_.get()); Verifier(GetParam()).Expect(2, true).Expect(1, true).Verify(cq_.get());
cli_stream->Write(send_request, tag(3)); cli_stream->Write(send_request, tag(3));
Verifier().Expect(3, true).Verify(cq_.get()); Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
srv_stream.Read(&recv_request, tag(4)); srv_stream.Read(&recv_request, tag(4));
Verifier().Expect(4, true).Verify(cq_.get()); Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message()); EXPECT_EQ(send_request.message(), recv_request.message());
cli_stream->Write(send_request, tag(5)); cli_stream->Write(send_request, tag(5));
Verifier().Expect(5, true).Verify(cq_.get()); Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
srv_stream.Read(&recv_request, tag(6)); srv_stream.Read(&recv_request, tag(6));
Verifier().Expect(6, true).Verify(cq_.get()); Verifier(GetParam()).Expect(6, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message()); EXPECT_EQ(send_request.message(), recv_request.message());
cli_stream->WritesDone(tag(7)); cli_stream->WritesDone(tag(7));
Verifier().Expect(7, true).Verify(cq_.get()); Verifier(GetParam()).Expect(7, true).Verify(cq_.get());
srv_stream.Read(&recv_request, tag(8)); srv_stream.Read(&recv_request, tag(8));
Verifier().Expect(8, false).Verify(cq_.get()); Verifier(GetParam()).Expect(8, false).Verify(cq_.get());
send_response.set_message(recv_request.message()); send_response.set_message(recv_request.message());
srv_stream.Finish(send_response, Status::OK, tag(9)); srv_stream.Finish(send_response, Status::OK, tag(9));
Verifier().Expect(9, true).Verify(cq_.get()); Verifier(GetParam()).Expect(9, true).Verify(cq_.get());
cli_stream->Finish(&recv_status, tag(10)); cli_stream->Finish(&recv_status, tag(10));
Verifier().Expect(10, true).Verify(cq_.get()); Verifier(GetParam()).Expect(10, true).Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.ok()); EXPECT_TRUE(recv_status.ok());
} }
// One ping, two pongs. // One ping, two pongs.
TEST_F(AsyncEnd2endTest, SimpleServerStreaming) { TEST_P(AsyncEnd2endTest, SimpleServerStreaming) {
ResetStub(); ResetStub();
EchoRequest send_request; EchoRequest send_request;
@ -309,38 +379,38 @@ TEST_F(AsyncEnd2endTest, SimpleServerStreaming) {
service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream, service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
cq_.get(), cq_.get(), tag(2)); cq_.get(), cq_.get(), tag(2));
Verifier().Expect(1, true).Expect(2, true).Verify(cq_.get()); Verifier(GetParam()).Expect(1, true).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message()); EXPECT_EQ(send_request.message(), recv_request.message());
send_response.set_message(recv_request.message()); send_response.set_message(recv_request.message());
srv_stream.Write(send_response, tag(3)); srv_stream.Write(send_response, tag(3));
Verifier().Expect(3, true).Verify(cq_.get()); Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
cli_stream->Read(&recv_response, tag(4)); cli_stream->Read(&recv_response, tag(4));
Verifier().Expect(4, true).Verify(cq_.get()); Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_EQ(send_response.message(), recv_response.message());
srv_stream.Write(send_response, tag(5)); srv_stream.Write(send_response, tag(5));
Verifier().Expect(5, true).Verify(cq_.get()); Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
cli_stream->Read(&recv_response, tag(6)); cli_stream->Read(&recv_response, tag(6));
Verifier().Expect(6, true).Verify(cq_.get()); Verifier(GetParam()).Expect(6, true).Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_EQ(send_response.message(), recv_response.message());
srv_stream.Finish(Status::OK, tag(7)); srv_stream.Finish(Status::OK, tag(7));
Verifier().Expect(7, true).Verify(cq_.get()); Verifier(GetParam()).Expect(7, true).Verify(cq_.get());
cli_stream->Read(&recv_response, tag(8)); cli_stream->Read(&recv_response, tag(8));
Verifier().Expect(8, false).Verify(cq_.get()); Verifier(GetParam()).Expect(8, false).Verify(cq_.get());
cli_stream->Finish(&recv_status, tag(9)); cli_stream->Finish(&recv_status, tag(9));
Verifier().Expect(9, true).Verify(cq_.get()); Verifier(GetParam()).Expect(9, true).Verify(cq_.get());
EXPECT_TRUE(recv_status.ok()); EXPECT_TRUE(recv_status.ok());
} }
// One ping, one pong. // One ping, one pong.
TEST_F(AsyncEnd2endTest, SimpleBidiStreaming) { TEST_P(AsyncEnd2endTest, SimpleBidiStreaming) {
ResetStub(); ResetStub();
EchoRequest send_request; EchoRequest send_request;
@ -359,40 +429,40 @@ TEST_F(AsyncEnd2endTest, SimpleBidiStreaming) {
service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(), service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
tag(2)); tag(2));
Verifier().Expect(1, true).Expect(2, true).Verify(cq_.get()); Verifier(GetParam()).Expect(1, true).Expect(2, true).Verify(cq_.get());
cli_stream->Write(send_request, tag(3)); cli_stream->Write(send_request, tag(3));
Verifier().Expect(3, true).Verify(cq_.get()); Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
srv_stream.Read(&recv_request, tag(4)); srv_stream.Read(&recv_request, tag(4));
Verifier().Expect(4, true).Verify(cq_.get()); Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message()); EXPECT_EQ(send_request.message(), recv_request.message());
send_response.set_message(recv_request.message()); send_response.set_message(recv_request.message());
srv_stream.Write(send_response, tag(5)); srv_stream.Write(send_response, tag(5));
Verifier().Expect(5, true).Verify(cq_.get()); Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
cli_stream->Read(&recv_response, tag(6)); cli_stream->Read(&recv_response, tag(6));
Verifier().Expect(6, true).Verify(cq_.get()); Verifier(GetParam()).Expect(6, true).Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_EQ(send_response.message(), recv_response.message());
cli_stream->WritesDone(tag(7)); cli_stream->WritesDone(tag(7));
Verifier().Expect(7, true).Verify(cq_.get()); Verifier(GetParam()).Expect(7, true).Verify(cq_.get());
srv_stream.Read(&recv_request, tag(8)); srv_stream.Read(&recv_request, tag(8));
Verifier().Expect(8, false).Verify(cq_.get()); Verifier(GetParam()).Expect(8, false).Verify(cq_.get());
srv_stream.Finish(Status::OK, tag(9)); srv_stream.Finish(Status::OK, tag(9));
Verifier().Expect(9, true).Verify(cq_.get()); Verifier(GetParam()).Expect(9, true).Verify(cq_.get());
cli_stream->Finish(&recv_status, tag(10)); cli_stream->Finish(&recv_status, tag(10));
Verifier().Expect(10, true).Verify(cq_.get()); Verifier(GetParam()).Expect(10, true).Verify(cq_.get());
EXPECT_TRUE(recv_status.ok()); EXPECT_TRUE(recv_status.ok());
} }
// Metadata tests // Metadata tests
TEST_F(AsyncEnd2endTest, ClientInitialMetadataRpc) { TEST_P(AsyncEnd2endTest, ClientInitialMetadataRpc) {
ResetStub(); ResetStub();
EchoRequest send_request; EchoRequest send_request;
@ -416,7 +486,7 @@ TEST_F(AsyncEnd2endTest, ClientInitialMetadataRpc) {
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
cq_.get(), tag(2)); cq_.get(), tag(2));
Verifier().Expect(2, true).Verify(cq_.get()); Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message()); EXPECT_EQ(send_request.message(), recv_request.message());
auto client_initial_metadata = srv_ctx.client_metadata(); auto client_initial_metadata = srv_ctx.client_metadata();
EXPECT_EQ(meta1.second, client_initial_metadata.find(meta1.first)->second); EXPECT_EQ(meta1.second, client_initial_metadata.find(meta1.first)->second);
@ -426,16 +496,16 @@ TEST_F(AsyncEnd2endTest, ClientInitialMetadataRpc) {
send_response.set_message(recv_request.message()); send_response.set_message(recv_request.message());
response_writer.Finish(send_response, Status::OK, tag(3)); response_writer.Finish(send_response, Status::OK, tag(3));
Verifier().Expect(3, true).Verify(cq_.get()); Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
response_reader->Finish(&recv_response, &recv_status, tag(4)); response_reader->Finish(&recv_response, &recv_status, tag(4));
Verifier().Expect(4, true).Verify(cq_.get()); Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.ok()); EXPECT_TRUE(recv_status.ok());
} }
TEST_F(AsyncEnd2endTest, ServerInitialMetadataRpc) { TEST_P(AsyncEnd2endTest, ServerInitialMetadataRpc) {
ResetStub(); ResetStub();
EchoRequest send_request; EchoRequest send_request;
@ -457,15 +527,15 @@ TEST_F(AsyncEnd2endTest, ServerInitialMetadataRpc) {
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
cq_.get(), tag(2)); cq_.get(), tag(2));
Verifier().Expect(2, true).Verify(cq_.get()); Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message()); EXPECT_EQ(send_request.message(), recv_request.message());
srv_ctx.AddInitialMetadata(meta1.first, meta1.second); srv_ctx.AddInitialMetadata(meta1.first, meta1.second);
srv_ctx.AddInitialMetadata(meta2.first, meta2.second); srv_ctx.AddInitialMetadata(meta2.first, meta2.second);
response_writer.SendInitialMetadata(tag(3)); response_writer.SendInitialMetadata(tag(3));
Verifier().Expect(3, true).Verify(cq_.get()); Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
response_reader->ReadInitialMetadata(tag(4)); response_reader->ReadInitialMetadata(tag(4));
Verifier().Expect(4, true).Verify(cq_.get()); Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
auto server_initial_metadata = cli_ctx.GetServerInitialMetadata(); auto server_initial_metadata = cli_ctx.GetServerInitialMetadata();
EXPECT_EQ(meta1.second, server_initial_metadata.find(meta1.first)->second); EXPECT_EQ(meta1.second, server_initial_metadata.find(meta1.first)->second);
EXPECT_EQ(meta2.second, server_initial_metadata.find(meta2.first)->second); EXPECT_EQ(meta2.second, server_initial_metadata.find(meta2.first)->second);
@ -473,16 +543,16 @@ TEST_F(AsyncEnd2endTest, ServerInitialMetadataRpc) {
send_response.set_message(recv_request.message()); send_response.set_message(recv_request.message());
response_writer.Finish(send_response, Status::OK, tag(5)); response_writer.Finish(send_response, Status::OK, tag(5));
Verifier().Expect(5, true).Verify(cq_.get()); Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
response_reader->Finish(&recv_response, &recv_status, tag(6)); response_reader->Finish(&recv_response, &recv_status, tag(6));
Verifier().Expect(6, true).Verify(cq_.get()); Verifier(GetParam()).Expect(6, true).Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.ok()); EXPECT_TRUE(recv_status.ok());
} }
TEST_F(AsyncEnd2endTest, ServerTrailingMetadataRpc) { TEST_P(AsyncEnd2endTest, ServerTrailingMetadataRpc) {
ResetStub(); ResetStub();
EchoRequest send_request; EchoRequest send_request;
@ -504,20 +574,20 @@ TEST_F(AsyncEnd2endTest, ServerTrailingMetadataRpc) {
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
cq_.get(), tag(2)); cq_.get(), tag(2));
Verifier().Expect(2, true).Verify(cq_.get()); Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message()); EXPECT_EQ(send_request.message(), recv_request.message());
response_writer.SendInitialMetadata(tag(3)); response_writer.SendInitialMetadata(tag(3));
Verifier().Expect(3, true).Verify(cq_.get()); Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
send_response.set_message(recv_request.message()); send_response.set_message(recv_request.message());
srv_ctx.AddTrailingMetadata(meta1.first, meta1.second); srv_ctx.AddTrailingMetadata(meta1.first, meta1.second);
srv_ctx.AddTrailingMetadata(meta2.first, meta2.second); srv_ctx.AddTrailingMetadata(meta2.first, meta2.second);
response_writer.Finish(send_response, Status::OK, tag(4)); response_writer.Finish(send_response, Status::OK, tag(4));
Verifier().Expect(4, true).Verify(cq_.get()); Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
response_reader->Finish(&recv_response, &recv_status, tag(5)); response_reader->Finish(&recv_response, &recv_status, tag(5));
Verifier().Expect(5, true).Verify(cq_.get()); Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.ok()); EXPECT_TRUE(recv_status.ok());
auto server_trailing_metadata = cli_ctx.GetServerTrailingMetadata(); auto server_trailing_metadata = cli_ctx.GetServerTrailingMetadata();
@ -526,7 +596,7 @@ TEST_F(AsyncEnd2endTest, ServerTrailingMetadataRpc) {
EXPECT_EQ(static_cast<size_t>(2), server_trailing_metadata.size()); EXPECT_EQ(static_cast<size_t>(2), server_trailing_metadata.size());
} }
TEST_F(AsyncEnd2endTest, MetadataRpc) { TEST_P(AsyncEnd2endTest, MetadataRpc) {
ResetStub(); ResetStub();
EchoRequest send_request; EchoRequest send_request;
@ -563,7 +633,7 @@ TEST_F(AsyncEnd2endTest, MetadataRpc) {
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
cq_.get(), tag(2)); cq_.get(), tag(2));
Verifier().Expect(2, true).Verify(cq_.get()); Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message()); EXPECT_EQ(send_request.message(), recv_request.message());
auto client_initial_metadata = srv_ctx.client_metadata(); auto client_initial_metadata = srv_ctx.client_metadata();
EXPECT_EQ(meta1.second, client_initial_metadata.find(meta1.first)->second); EXPECT_EQ(meta1.second, client_initial_metadata.find(meta1.first)->second);
@ -573,9 +643,9 @@ TEST_F(AsyncEnd2endTest, MetadataRpc) {
srv_ctx.AddInitialMetadata(meta3.first, meta3.second); srv_ctx.AddInitialMetadata(meta3.first, meta3.second);
srv_ctx.AddInitialMetadata(meta4.first, meta4.second); srv_ctx.AddInitialMetadata(meta4.first, meta4.second);
response_writer.SendInitialMetadata(tag(3)); response_writer.SendInitialMetadata(tag(3));
Verifier().Expect(3, true).Verify(cq_.get()); Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
response_reader->ReadInitialMetadata(tag(4)); response_reader->ReadInitialMetadata(tag(4));
Verifier().Expect(4, true).Verify(cq_.get()); Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
auto server_initial_metadata = cli_ctx.GetServerInitialMetadata(); auto server_initial_metadata = cli_ctx.GetServerInitialMetadata();
EXPECT_EQ(meta3.second, server_initial_metadata.find(meta3.first)->second); EXPECT_EQ(meta3.second, server_initial_metadata.find(meta3.first)->second);
EXPECT_EQ(meta4.second, server_initial_metadata.find(meta4.first)->second); EXPECT_EQ(meta4.second, server_initial_metadata.find(meta4.first)->second);
@ -586,10 +656,10 @@ TEST_F(AsyncEnd2endTest, MetadataRpc) {
srv_ctx.AddTrailingMetadata(meta6.first, meta6.second); srv_ctx.AddTrailingMetadata(meta6.first, meta6.second);
response_writer.Finish(send_response, Status::OK, tag(5)); response_writer.Finish(send_response, Status::OK, tag(5));
Verifier().Expect(5, true).Verify(cq_.get()); Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
response_reader->Finish(&recv_response, &recv_status, tag(6)); response_reader->Finish(&recv_response, &recv_status, tag(6));
Verifier().Expect(6, true).Verify(cq_.get()); Verifier(GetParam()).Expect(6, true).Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.ok()); EXPECT_TRUE(recv_status.ok());
auto server_trailing_metadata = cli_ctx.GetServerTrailingMetadata(); auto server_trailing_metadata = cli_ctx.GetServerTrailingMetadata();
@ -599,7 +669,7 @@ TEST_F(AsyncEnd2endTest, MetadataRpc) {
} }
// Server uses AsyncNotifyWhenDone API to check for cancellation // Server uses AsyncNotifyWhenDone API to check for cancellation
TEST_F(AsyncEnd2endTest, ServerCheckCancellation) { TEST_P(AsyncEnd2endTest, ServerCheckCancellation) {
ResetStub(); ResetStub();
EchoRequest send_request; EchoRequest send_request;
@ -620,21 +690,21 @@ TEST_F(AsyncEnd2endTest, ServerCheckCancellation) {
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
cq_.get(), tag(2)); cq_.get(), tag(2));
Verifier().Expect(2, true).Verify(cq_.get()); Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message()); EXPECT_EQ(send_request.message(), recv_request.message());
cli_ctx.TryCancel(); cli_ctx.TryCancel();
Verifier().Expect(5, true).Verify(cq_.get()); Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
EXPECT_TRUE(srv_ctx.IsCancelled()); EXPECT_TRUE(srv_ctx.IsCancelled());
response_reader->Finish(&recv_response, &recv_status, tag(4)); response_reader->Finish(&recv_response, &recv_status, tag(4));
Verifier().Expect(4, false).Verify(cq_.get()); Verifier(GetParam()).Expect(4, false).Verify(cq_.get());
EXPECT_EQ(StatusCode::CANCELLED, recv_status.error_code()); EXPECT_EQ(StatusCode::CANCELLED, recv_status.error_code());
} }
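
Note that tag(5) completes in this hunk without its registration being visible; the context outside the diff presumably calls AsyncNotifyWhenDone on the server context before the RPC is requested. A hedged sketch of that elided setup:

    grpc::ServerContext srv_ctx;
    // Must be registered before the call starts; tag(5) then completes
    // once the client's TryCancel is observed, with srv_ctx.IsCancelled()
    // returning true.
    srv_ctx.AsyncNotifyWhenDone(tag(5));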
// Server uses AsyncNotifyWhenDone API to check for normal finish // Server uses AsyncNotifyWhenDone API to check for normal finish
TEST_F(AsyncEnd2endTest, ServerCheckDone) { TEST_P(AsyncEnd2endTest, ServerCheckDone) {
ResetStub(); ResetStub();
EchoRequest send_request; EchoRequest send_request;
@ -655,23 +725,23 @@ TEST_F(AsyncEnd2endTest, ServerCheckDone) {
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
cq_.get(), tag(2)); cq_.get(), tag(2));
Verifier().Expect(2, true).Verify(cq_.get()); Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message()); EXPECT_EQ(send_request.message(), recv_request.message());
send_response.set_message(recv_request.message()); send_response.set_message(recv_request.message());
response_writer.Finish(send_response, Status::OK, tag(3)); response_writer.Finish(send_response, Status::OK, tag(3));
Verifier().Expect(3, true).Verify(cq_.get()); Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
Verifier().Expect(5, true).Verify(cq_.get()); Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
EXPECT_FALSE(srv_ctx.IsCancelled()); EXPECT_FALSE(srv_ctx.IsCancelled());
response_reader->Finish(&recv_response, &recv_status, tag(4)); response_reader->Finish(&recv_response, &recv_status, tag(4));
Verifier().Expect(4, true).Verify(cq_.get()); Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.ok()); EXPECT_TRUE(recv_status.ok());
} }
TEST_F(AsyncEnd2endTest, UnimplementedRpc) { TEST_P(AsyncEnd2endTest, UnimplementedRpc) {
std::shared_ptr<ChannelInterface> channel = CreateChannel( std::shared_ptr<ChannelInterface> channel = CreateChannel(
server_address_.str(), InsecureCredentials(), ChannelArguments()); server_address_.str(), InsecureCredentials(), ChannelArguments());
std::unique_ptr<grpc::cpp::test::util::UnimplementedService::Stub> stub; std::unique_ptr<grpc::cpp::test::util::UnimplementedService::Stub> stub;
@ -687,12 +757,15 @@ TEST_F(AsyncEnd2endTest, UnimplementedRpc) {
stub->AsyncUnimplemented(&cli_ctx, send_request, cq_.get())); stub->AsyncUnimplemented(&cli_ctx, send_request, cq_.get()));
response_reader->Finish(&recv_response, &recv_status, tag(4)); response_reader->Finish(&recv_response, &recv_status, tag(4));
Verifier().Expect(4, false).Verify(cq_.get()); Verifier(GetParam()).Expect(4, false).Verify(cq_.get());
EXPECT_EQ(StatusCode::UNIMPLEMENTED, recv_status.error_code()); EXPECT_EQ(StatusCode::UNIMPLEMENTED, recv_status.error_code());
EXPECT_EQ("", recv_status.error_message()); EXPECT_EQ("", recv_status.error_message());
} }
INSTANTIATE_TEST_CASE_P(AsyncEnd2end, AsyncEnd2endTest,
::testing::Values(false, true));
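
With the fixture changed to ::testing::TestWithParam&lt;bool&gt; and each TEST_F converted to TEST_P, this single INSTANTIATE_TEST_CASE_P line runs the whole suite twice: once with GetParam() == false (blocking completion-queue waits) and once with true (spinning). Condensed, the pattern is (test name hypothetical):

    TEST_P(AsyncEnd2endTest, SomeRpc) {
      // GetParam() selects the Verifier mode: false = block, true = spin.
      Verifier(GetParam()).Expect(1, true).Verify(cq_.get());
    }
    INSTANTIATE_TEST_CASE_P(AsyncEnd2end, AsyncEnd2endTest,
                            ::testing::Values(false, true));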
} // namespace } // namespace
} // namespace testing } // namespace testing
} // namespace grpc } // namespace grpc

@ -459,6 +459,20 @@
"test/core/surface/byte_buffer_reader_test.c" "test/core/surface/byte_buffer_reader_test.c"
] ]
}, },
{
"deps": [
"gpr",
"gpr_test_util",
"grpc",
"grpc_test_util"
],
"headers": [],
"language": "c",
"name": "grpc_channel_args_test",
"src": [
"test/core/channel/channel_args_test.c"
]
},
{ {
"deps": [ "deps": [
"gpr", "gpr",
@ -1597,8 +1611,7 @@
"grpc", "grpc",
"grpc++", "grpc++",
"grpc++_test_util", "grpc++_test_util",
"grpc_test_util", "grpc_test_util"
"grpc_zookeeper"
], ],
"headers": [], "headers": [],
"language": "c++", "language": "c++",

@ -567,6 +567,24 @@
"windows" "windows"
] ]
}, },
{
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"exclude_configs": [],
"flaky": false,
"language": "c",
"name": "grpc_channel_args_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
]
},
{ {
"ci_platforms": [ "ci_platforms": [
"linux", "linux",

File diff suppressed because one or more lines are too long