Merge branch 'master' into epoll_changes

pull/6803/head
Sree Kuchibhotla 9 years ago
commit f325ee367a
  1. 5
      BUILD
  2. 2
      Makefile
  3. 4
      build.yaml
  4. 127
      doc/interop-test-descriptions.md
  5. 43
      include/grpc++/impl/codegen/async_stream.h
  6. 0
      include/grpc++/impl/codegen/core_codegen.h
  7. 43
      include/grpc++/impl/codegen/impl/async_stream.h
  8. 3
      include/grpc++/impl/grpc_library.h
  9. 2
      setup.py
  10. 4
      src/compiler/objective_c_generator.cc
  11. 20
      src/core/lib/channel/http_client_filter.c
  12. 2
      src/core/lib/channel/http_server_filter.c
  13. 39
      src/core/lib/transport/metadata.c
  14. 2
      src/cpp/common/core_codegen.cc
  15. 5
      src/csharp/Grpc.Core/Metadata.cs
  16. 2
      src/objective-c/GRPCClient/GRPCCall.h
  17. 9
      src/objective-c/ProtoRPC/ProtoMethod.h
  18. 4
      src/objective-c/ProtoRPC/ProtoMethod.m
  19. 15
      src/objective-c/ProtoRPC/ProtoRPC.h
  20. 6
      src/objective-c/ProtoRPC/ProtoRPC.m
  21. 15
      src/objective-c/ProtoRPC/ProtoService.h
  22. 4
      src/objective-c/ProtoRPC/ProtoService.m
  23. 19
      src/proto/grpc/testing/messages.proto
  24. 15
      src/python/grpcio/grpc/_auth.py
  25. 3
      src/python/grpcio/tests/interop/client.py
  26. 13
      src/python/grpcio/tests/interop/methods.py
  27. 88
      src/python/grpcio/tests/qps/benchmark_client.py
  28. 5
      src/python/grpcio/tests/qps/client_runner.py
  29. 4
      src/python/grpcio/tests/qps/qps_worker.py
  30. 5
      src/python/grpcio/tests/qps/worker_server.py
  31. 105
      test/cpp/interop/interop_client.cc
  32. 24
      test/cpp/interop/server_main.cc
  33. 1
      tools/doxygen/Doxyfile.c++
  34. 3
      tools/doxygen/Doxyfile.c++.internal
  35. 2505
      tools/doxygen/Doxyfile.c++.internal.orig
  36. 10
      tools/run_tests/performance/scenario_config.py
  37. 2
      tools/run_tests/run_interop_tests.py
  38. 8
      tools/run_tests/sources_and_headers.json
  39. 3
      vsprojects/vcxproj/grpc++/grpc++.vcxproj
  40. 9
      vsprojects/vcxproj/grpc++/grpc++.vcxproj.filters
  41. 2
      vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj
  42. 6
      vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj.filters

@ -1216,8 +1216,8 @@ cc_library(
cc_library(
name = "grpc++",
srcs = [
"include/grpc++/impl/codegen/core_codegen.h",
"src/cpp/client/secure_credentials.h",
"src/cpp/common/core_codegen.h",
"src/cpp/common/secure_auth_context.h",
"src/cpp/server/secure_server_credentials.h",
"src/cpp/client/create_channel_internal.h",
@ -1266,6 +1266,7 @@ cc_library(
"include/grpc++/grpc++.h",
"include/grpc++/impl/call.h",
"include/grpc++/impl/client_unary_call.h",
"include/grpc++/impl/codegen/core_codegen.h",
"include/grpc++/impl/grpc_library.h",
"include/grpc++/impl/method_handler_impl.h",
"include/grpc++/impl/rpc_method.h",
@ -1369,7 +1370,6 @@ cc_library(
name = "grpc++_unsecure",
srcs = [
"src/cpp/client/create_channel_internal.h",
"src/cpp/common/core_codegen.h",
"src/cpp/server/dynamic_thread_pool.h",
"src/cpp/server/thread_pool_interface.h",
"src/cpp/common/insecure_create_auth_context.cc",
@ -1410,6 +1410,7 @@ cc_library(
"include/grpc++/grpc++.h",
"include/grpc++/impl/call.h",
"include/grpc++/impl/client_unary_call.h",
"include/grpc++/impl/codegen/core_codegen.h",
"include/grpc++/impl/grpc_library.h",
"include/grpc++/impl/method_handler_impl.h",
"include/grpc++/impl/rpc_method.h",

@ -3455,6 +3455,7 @@ PUBLIC_HEADERS_CXX += \
include/grpc++/grpc++.h \
include/grpc++/impl/call.h \
include/grpc++/impl/client_unary_call.h \
include/grpc++/impl/codegen/core_codegen.h \
include/grpc++/impl/grpc_library.h \
include/grpc++/impl/method_handler_impl.h \
include/grpc++/impl/rpc_method.h \
@ -3810,6 +3811,7 @@ PUBLIC_HEADERS_CXX += \
include/grpc++/grpc++.h \
include/grpc++/impl/call.h \
include/grpc++/impl/client_unary_call.h \
include/grpc++/impl/codegen/core_codegen.h \
include/grpc++/impl/grpc_library.h \
include/grpc++/impl/method_handler_impl.h \
include/grpc++/impl/rpc_method.h \

@ -639,6 +639,7 @@ filegroups:
- include/grpc++/grpc++.h
- include/grpc++/impl/call.h
- include/grpc++/impl/client_unary_call.h
- include/grpc++/impl/codegen/core_codegen.h
- include/grpc++/impl/grpc_library.h
- include/grpc++/impl/method_handler_impl.h
- include/grpc++/impl/rpc_method.h
@ -675,7 +676,6 @@ filegroups:
- include/grpc++/support/time.h
headers:
- src/cpp/client/create_channel_internal.h
- src/cpp/common/core_codegen.h
- src/cpp/server/dynamic_thread_pool.h
- src/cpp/server/thread_pool_interface.h
src:
@ -934,8 +934,8 @@ libs:
build: all
language: c++
headers:
- include/grpc++/impl/codegen/core_codegen.h
- src/cpp/client/secure_credentials.h
- src/cpp/common/core_codegen.h
- src/cpp/common/secure_auth_context.h
- src/cpp/server/secure_server_credentials.h
src:

@ -93,26 +93,24 @@ Client asserts:
### large_compressed_unary
This test verifies compressed unary calls succeed in sending messages. It
sends one unary request for every combination of compression algorithm and
payload type.
sends one unary request for every payload type, with and without requesting a
compressed response from the server.
In all scenarios, whether compression was actually performed is determined by
the compression bit in the response's message flags. The response's compression
value indicates which algorithm was used if said compression bit is set.
the compression bit in the response's message flags.
Server features:
* [UnaryCall][]
* [Compressable Payload][]
* [Uncompressable Payload][]
* [Random Payload][]
Procedure:
1. Client calls UnaryCall with:
```
{
response_compression: <one of {NONE, GZIP, DEFLATE}>
request_compressed_response: bool
response_type: COMPRESSABLE
response_size: 314159
payload:{
@ -123,11 +121,10 @@ Procedure:
Client asserts:
* call was successful
* response payload type is COMPRESSABLE
* response compression is consistent with the requested one.
* if `response_compression == NONE`, the response MUST NOT have the
* if `request_compressed_response` is false, the response MUST NOT have the
compressed message flag set.
* if `request_compressed_response` is true, the response MUST have the
compressed message flag set.
* if `response_compression != NONE`, the response MUST have the compressed
message flag set.
* response payload body is 314159 bytes in size
* clients are free to assert that the response payload body contents are
zero and comparing the entire response message against a golden response
@ -136,7 +133,7 @@ Procedure:
2. Client calls UnaryCall with:
```
{
response_compression: <one of {NONE, GZIP, DEFLATE}>
request_compressed_response: bool
response_type: UNCOMPRESSABLE
response_size: 314159
payload:{
@ -147,29 +144,11 @@ Procedure:
Client asserts:
* call was successful
* response payload type is UNCOMPRESSABLE
* response compression is consistent with the requested one.
* the response MUST NOT have the compressed message flag set.
* the response MAY have the compressed message flag set. Some
implementations will choose to compress the payload even when the output
size if larger than the input.
* response payload body is 314159 bytes in size
* clients are free to assert that the response payload body contents are
identical to the golden uncompressable data at `test/cpp/interop/rnd.dat`.
3. Client calls UnaryCall with:
```
{
response_compression: <one of {NONE, GZIP, DEFLATE}>
response_type: RANDOM
response_size: 314159
payload:{
body: 271828 bytes of zeros
}
}
```
Client asserts:
* call was successful
* response payload type is either COMPRESSABLE or UNCOMPRESSABLE
* the behavior is consistent with the randomly chosen incoming payload type,
as described in their respective sections.
### client_streaming
@ -245,7 +224,7 @@ Procedure:
size: 31415
}
response_parameters:{
size: 9
size: 59
}
response_parameters:{
size: 2653
@ -272,7 +251,6 @@ Server features:
* [StreamingOutputCall][]
* [Compressable Payload][]
* [Uncompressable Payload][]
* [Random Payload][]
Procedure:
@ -280,13 +258,13 @@ Procedure:
```
{
response_compression: <one of {NONE, GZIP, DEFLATE}>
request_compressed_response: bool
response_type:COMPRESSABLE
response_parameters:{
size: 31415
}
response_parameters:{
size: 9
size: 59
}
response_parameters:{
size: 2653
@ -301,12 +279,11 @@ Procedure:
* call was successful
* exactly four responses
* response payloads are COMPRESSABLE
* response compression is consistent with the requested one.
* if `response_compression == NONE`, the response MUST NOT have the
compressed message flag set.
* if `response_compression != NONE`, the response MUST have the compressed
message flag set.
* response payload bodies are sized (in order): 31415, 9, 2653, 58979
* if `request_compressed_response` is false, the response's messages MUST
NOT have the compressed message flag set.
* if `request_compressed_response` is true, the response's messages MUST
have the compressed message flag set.
* response payload bodies are sized (in order): 31415, 59, 2653, 58979
* clients are free to assert that the response payload body contents are
zero and comparing the entire response messages against golden responses
@ -315,13 +292,13 @@ Procedure:
```
{
response_compression: <one of {NONE, GZIP, DEFLATE}>
request_compressed_response: bool
response_type:UNCOMPRESSABLE
response_parameters:{
size: 31415
}
response_parameters:{
size: 9
size: 59
}
response_parameters:{
size: 2653
@ -336,40 +313,14 @@ Procedure:
* call was successful
* exactly four responses
* response payloads are UNCOMPRESSABLE
* response compressions are consistent with the requested one.
* the responses MUST NOT have the compressed message flag set.
* response payload bodies are sized (in order): 31415, 9, 2653, 58979
* the response MAY have the compressed message flag set. Some
implementations will choose to compress the payload even when the output
size if larger than the input.
* response payload bodies are sized (in order): 31415, 59, 2653, 58979
* clients are free to assert that the body of the responses are identical to
the golden uncompressable data at `test/cpp/interop/rnd.dat`.
3. Client calls StreamingOutputCall with:
```
{
response_compression: <one of {NONE, GZIP, DEFLATE}>
response_type:RANDOM
response_parameters:{
size: 31415
}
response_parameters:{
size: 9
}
response_parameters:{
size: 2653
}
response_parameters:{
size: 58979
}
}
```
Client asserts:
* call was successful
* response payload type is either COMPRESSABLE or UNCOMPRESSABLE
* the behavior is consistent with the randomly chosen incoming payload type,
as described in their respective sections.
### ping_pong
This test verifies that full duplex bidi is supported.
@ -399,7 +350,7 @@ Procedure:
{
response_type: COMPRESSABLE
response_parameters:{
size: 9
size: 59
}
payload:{
body: 8 bytes of zeros
@ -932,9 +883,9 @@ Server implements EmptyCall which immediately returns the empty message.
[UnaryCall]: #unarycall
Server implements UnaryCall which immediately returns a SimpleResponse with a
payload body of size SimpleRequest.response_size bytes and type as appropriate
for the SimpleRequest.response_type. If the server does not support the
response_type, then it should fail the RPC with INVALID_ARGUMENT.
payload body of size `SimpleRequest.response_size` bytes and type as appropriate
for the `SimpleRequest.response_type`. If the server does not support the
`response_type`, then it should fail the RPC with `INVALID_ARGUMENT`.
### StreamingInputCall
[StreamingInputCall]: #streaminginputcall
@ -974,15 +925,7 @@ COMPRESSABLE.
When the client requests UNCOMPRESSABLE payload, the response includes a payload
of the size requested containing uncompressable data and the payload type is
UNCOMPRESSABLE. A 512 kB dump from /dev/urandom is the current golden data,
stored at `test/cpp/interop/rnd.dat`
### Random Payload
[Random Payload]: #random-payload
When the client requests RANDOM payload, the response includes either a randomly
chosen COMPRESSABLE or UNCOMPRESSABLE payload. The data and the payload type
will be consistent with this choice.
UNCOMPRESSABLE.
### Echo Status
[Echo Status]: #echo-status
@ -1004,8 +947,8 @@ key and the corresponding value back to the client as trailing metadata.
[Observe ResponseParameters.interval_us]: #observe-responseparametersinterval_us
In StreamingOutputCall and FullDuplexCall, server delays sending a
StreamingOutputCallResponse by the ResponseParameters's interval_us for that
particular response, relative to the last response sent. That is, interval_us
StreamingOutputCallResponse by the ResponseParameters's `interval_us` for that
particular response, relative to the last response sent. That is, `interval_us`
acts like a sleep *before* sending the response and accumulates from one
response to the next.
@ -1027,13 +970,13 @@ an email address.
#### Echo OAuth scope
[Echo OAuth Scope]: #echo-oauth-scope
If a SimpleRequest has fill_oauth_scope=true and that request was successfully
If a SimpleRequest has `fill_oauth_scope=true` and that request was successfully
authenticated via OAuth, then the SimpleResponse should have oauth_scope filled
with the scope of the method being invoked.
Although a general server-side feature, most test servers won't implement this
feature. The TLS server grpc-test.sandbox.googleapis.com:443 supports this feature.
It requires at least the OAuth scope
feature. The TLS server `grpc-test.sandbox.googleapis.com:443` supports this
feature. It requires at least the OAuth scope
`https://www.googleapis.com/auth/xapi.zoo` for authentication to succeed.
Discussion:

@ -299,8 +299,16 @@ class ClientAsyncReaderWriter GRPC_FINAL
};
template <class W, class R>
class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface,
public AsyncReaderInterface<R> {
class ServerAsyncReaderInterface : public ServerAsyncStreamingInterface,
public AsyncReaderInterface<R> {
public:
virtual void Finish(const W& msg, const Status& status, void* tag) = 0;
virtual void FinishWithError(const Status& status, void* tag) = 0;
};
template <class W, class R>
class ServerAsyncReader GRPC_FINAL : public ServerAsyncReaderInterface<W, R> {
public:
explicit ServerAsyncReader(ServerContext* ctx)
: call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
@ -321,7 +329,7 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface,
call_.PerformOps(&read_ops_);
}
void Finish(const W& msg, const Status& status, void* tag) {
void Finish(const W& msg, const Status& status, void* tag) GRPC_OVERRIDE {
finish_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
finish_ops_.SendInitialMetadata(ctx_->initial_metadata_,
@ -338,7 +346,7 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface,
call_.PerformOps(&finish_ops_);
}
void FinishWithError(const Status& status, void* tag) {
void FinishWithError(const Status& status, void* tag) GRPC_OVERRIDE {
GPR_CODEGEN_ASSERT(!status.ok());
finish_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
@ -363,8 +371,14 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface,
};
template <class W>
class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
public AsyncWriterInterface<W> {
class ServerAsyncWriterInterface : public ServerAsyncStreamingInterface,
public AsyncWriterInterface<W> {
public:
virtual void Finish(const Status& status, void* tag) = 0;
};
template <class W>
class ServerAsyncWriter GRPC_FINAL : public ServerAsyncWriterInterface<W> {
public:
explicit ServerAsyncWriter(ServerContext* ctx)
: call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
@ -391,7 +405,7 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
call_.PerformOps(&write_ops_);
}
void Finish(const Status& status, void* tag) {
void Finish(const Status& status, void* tag) GRPC_OVERRIDE {
finish_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
finish_ops_.SendInitialMetadata(ctx_->initial_metadata_,
@ -414,9 +428,16 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
/// Server-side interface for asynchronous bi-directional streaming.
template <class W, class R>
class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
public AsyncWriterInterface<W>,
public AsyncReaderInterface<R> {
class ServerAsyncReaderWriterInterface : public ServerAsyncStreamingInterface,
public AsyncWriterInterface<W>,
public AsyncReaderInterface<R> {
public:
virtual void Finish(const Status& status, void* tag) = 0;
};
template <class W, class R>
class ServerAsyncReaderWriter GRPC_FINAL
: public ServerAsyncReaderWriterInterface<W, R> {
public:
explicit ServerAsyncReaderWriter(ServerContext* ctx)
: call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
@ -449,7 +470,7 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
call_.PerformOps(&write_ops_);
}
void Finish(const Status& status, void* tag) {
void Finish(const Status& status, void* tag) GRPC_OVERRIDE {
finish_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
finish_ops_.SendInitialMetadata(ctx_->initial_metadata_,

@ -295,8 +295,16 @@ class ClientAsyncReaderWriter GRPC_FINAL
};
template <class W, class R>
class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface,
public AsyncReaderInterface<R> {
class ServerAsyncReaderInterface : public ServerAsyncStreamingInterface,
public AsyncReaderInterface<R> {
public:
virtual void Finish(const W& msg, const Status& status, void* tag) = 0;
virtual void FinishWithError(const Status& status, void* tag) = 0;
};
template <class W, class R>
class ServerAsyncReader GRPC_FINAL : public ServerAsyncReaderInterface<W, R> {
public:
explicit ServerAsyncReader(ServerContext* ctx)
: call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
@ -316,7 +324,7 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface,
call_.PerformOps(&read_ops_);
}
void Finish(const W& msg, const Status& status, void* tag) {
void Finish(const W& msg, const Status& status, void* tag) GRPC_OVERRIDE {
finish_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
@ -332,7 +340,7 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface,
call_.PerformOps(&finish_ops_);
}
void FinishWithError(const Status& status, void* tag) {
void FinishWithError(const Status& status, void* tag) GRPC_OVERRIDE {
GPR_CODEGEN_ASSERT(!status.ok());
finish_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
@ -356,8 +364,14 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface,
};
template <class W>
class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
public AsyncWriterInterface<W> {
class ServerAsyncWriterInterface : public ServerAsyncStreamingInterface,
public AsyncWriterInterface<W> {
public:
virtual void Finish(const Status& status, void* tag) = 0;
};
template <class W>
class ServerAsyncWriter GRPC_FINAL : public ServerAsyncWriterInterface<W> {
public:
explicit ServerAsyncWriter(ServerContext* ctx)
: call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
@ -382,7 +396,7 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
call_.PerformOps(&write_ops_);
}
void Finish(const Status& status, void* tag) {
void Finish(const Status& status, void* tag) GRPC_OVERRIDE {
finish_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
@ -404,9 +418,16 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
/// Server-side interface for asynchronous bi-directional streaming.
template <class W, class R>
class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
public AsyncWriterInterface<W>,
public AsyncReaderInterface<R> {
class ServerAsyncReaderWriterInterface : public ServerAsyncStreamingInterface,
public AsyncWriterInterface<W>,
public AsyncReaderInterface<R> {
public:
virtual void Finish(const Status& status, void* tag) = 0;
};
template <class W, class R>
class ServerAsyncReaderWriter GRPC_FINAL
: public ServerAsyncReaderWriterInterface<W, R> {
public:
explicit ServerAsyncReaderWriter(ServerContext* ctx)
: call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
@ -437,7 +458,7 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
call_.PerformOps(&write_ops_);
}
void Finish(const Status& status, void* tag) {
void Finish(const Status& status, void* tag) GRPC_OVERRIDE {
finish_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);

@ -37,11 +37,10 @@
#include <iostream>
#include <grpc++/impl/codegen/config.h>
#include <grpc++/impl/codegen/core_codegen.h>
#include <grpc++/impl/codegen/grpc_library.h>
#include <grpc/grpc.h>
#include "src/cpp/common/core_codegen.h"
namespace grpc {
namespace internal {

@ -202,7 +202,7 @@ TEST_PACKAGE_DATA = {
}
TESTS_REQUIRE = (
'oauth2client>=1.4.7',
'oauth2client>=2.1.0',
'protobuf>=3.0.0a3',
'coverage>=4.0',
) + INSTALL_REQUIRES

@ -94,7 +94,7 @@ void PrintSimpleSignature(Printer *printer, const MethodDescriptor *method,
void PrintAdvancedSignature(Printer *printer, const MethodDescriptor *method,
map< ::grpc::string, ::grpc::string> vars) {
vars["method_name"] = "RPCTo" + vars["method_name"];
vars["return_type"] = "ProtoRPC *";
vars["return_type"] = "GRPCProtoCall *";
PrintMethodSignature(printer, method, vars);
}
@ -199,7 +199,7 @@ void PrintMethodImplementations(Printer *printer,
" marshalling and parsing.\n");
printer.Print(vars,
"@interface $service_class$ :"
" ProtoService<$service_class$>\n");
" GRPCProtoService<$service_class$>\n");
printer.Print(
"- (instancetype)initWithHost:(NSString *)host"
" NS_DESIGNATED_INITIALIZER;\n");

@ -39,6 +39,9 @@
#include "src/core/lib/support/string.h"
#include "src/core/lib/transport/static_metadata.h"
#define EXPECTED_CONTENT_TYPE "application/grpc"
#define EXPECTED_CONTENT_TYPE_LENGTH sizeof(EXPECTED_CONTENT_TYPE) - 1
typedef struct call_data {
grpc_linked_mdelem method;
grpc_linked_mdelem scheme;
@ -74,7 +77,24 @@ static grpc_mdelem *client_recv_filter(void *user_data, grpc_mdelem *md) {
} else if (md->key == GRPC_MDSTR_STATUS) {
grpc_call_element_send_cancel(a->exec_ctx, a->elem);
return NULL;
} else if (md == GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC) {
return NULL;
} else if (md->key == GRPC_MDSTR_CONTENT_TYPE) {
const char *value_str = grpc_mdstr_as_c_string(md->value);
if (strncmp(value_str, EXPECTED_CONTENT_TYPE,
EXPECTED_CONTENT_TYPE_LENGTH) == 0 &&
(value_str[EXPECTED_CONTENT_TYPE_LENGTH] == '+' ||
value_str[EXPECTED_CONTENT_TYPE_LENGTH] == ';')) {
/* Although the C implementation doesn't (currently) generate them,
any custom +-suffix is explicitly valid. */
/* TODO(klempner): We should consider preallocating common values such
as +proto or +json, or at least stashing them if we see them. */
/* TODO(klempner): Should we be surfacing this to application code? */
} else {
/* TODO(klempner): We're currently allowing this, but we shouldn't
see it without a proxy so log for now. */
gpr_log(GPR_INFO, "Unexpected content-type '%s'", value_str);
}
return NULL;
}
return md;

@ -108,7 +108,7 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
} else {
/* TODO(klempner): We're currently allowing this, but we shouldn't
see it without a proxy so log for now. */
gpr_log(GPR_INFO, "Unexpected content-type %s", value_str);
gpr_log(GPR_INFO, "Unexpected content-type '%s'", value_str);
}
return NULL;
} else if (md->key == GRPC_MDSTR_TE || md->key == GRPC_MDSTR_METHOD ||

@ -129,7 +129,10 @@ typedef struct mdtab_shard {
internal_metadata **elems;
size_t count;
size_t capacity;
size_t free;
/** Estimate of the number of unreferenced mdelems in the hash table.
This will eventually converge to the exact number, but it's instantaneous
accuracy is not guaranteed */
gpr_atm free_estimate;
} mdtab_shard;
#define LOG2_STRTAB_SHARD_COUNT 5
@ -217,7 +220,7 @@ void grpc_mdctx_global_init(void) {
mdtab_shard *shard = &g_mdtab_shard[i];
gpr_mu_init(&shard->mu);
shard->count = 0;
shard->free = 0;
gpr_atm_no_barrier_store(&shard->free_estimate, 0);
shard->capacity = INITIAL_MDTAB_CAPACITY;
shard->elems = gpr_malloc(sizeof(*shard->elems) * shard->capacity);
memset(shard->elems, 0, sizeof(*shard->elems) * shard->capacity);
@ -281,10 +284,8 @@ static void ref_md_locked(mdtab_shard *shard,
grpc_mdstr_as_c_string((grpc_mdstr *)md->key),
grpc_mdstr_as_c_string((grpc_mdstr *)md->value));
#endif
if (0 == gpr_atm_no_barrier_fetch_add(&md->refcnt, 2)) {
shard->free--;
} else {
GPR_ASSERT(1 != gpr_atm_no_barrier_fetch_add(&md->refcnt, -1));
if (0 == gpr_atm_no_barrier_fetch_add(&md->refcnt, 1)) {
gpr_atm_no_barrier_fetch_add(&shard->free_estimate, -1);
}
}
@ -447,6 +448,7 @@ static void gc_mdtab(mdtab_shard *shard) {
size_t i;
internal_metadata **prev_next;
internal_metadata *md, *next;
gpr_atm num_freed = 0;
GPR_TIMER_BEGIN("gc_mdtab", 0);
for (i = 0; i < shard->capacity; i++) {
@ -463,13 +465,14 @@ static void gc_mdtab(mdtab_shard *shard) {
}
gpr_free(md);
*prev_next = next;
shard->free--;
num_freed++;
shard->count--;
} else {
prev_next = &md->bucket_next;
}
}
}
gpr_atm_no_barrier_fetch_add(&shard->free_estimate, -num_freed);
GPR_TIMER_END("gc_mdtab", 0);
}
@ -504,7 +507,8 @@ static void grow_mdtab(mdtab_shard *shard) {
}
static void rehash_mdtab(mdtab_shard *shard) {
if (shard->free > shard->capacity / 4) {
if (gpr_atm_no_barrier_load(&shard->free_estimate) >
(gpr_atm)(shard->capacity / 4)) {
gc_mdtab(shard);
} else {
grow_mdtab(shard);
@ -553,7 +557,7 @@ grpc_mdelem *grpc_mdelem_from_metadata_strings(grpc_mdstr *mkey,
/* not found: create a new pair */
md = gpr_malloc(sizeof(internal_metadata));
gpr_atm_rel_store(&md->refcnt, 2);
gpr_atm_rel_store(&md->refcnt, 1);
md->key = key;
md->value = value;
md->user_data = 0;
@ -645,7 +649,7 @@ grpc_mdelem *grpc_mdelem_ref(grpc_mdelem *gmd DEBUG_ARGS) {
this function - meaning that no adjustment to mdtab_free is necessary,
simplifying the logic here to be just an atomic increment */
/* use C assert to have this removed in opt builds */
assert(gpr_atm_no_barrier_load(&md->refcnt) >= 2);
assert(gpr_atm_no_barrier_load(&md->refcnt) >= 1);
gpr_atm_no_barrier_fetch_add(&md->refcnt, 1);
return gmd;
}
@ -662,18 +666,13 @@ void grpc_mdelem_unref(grpc_mdelem *gmd DEBUG_ARGS) {
grpc_mdstr_as_c_string((grpc_mdstr *)md->key),
grpc_mdstr_as_c_string((grpc_mdstr *)md->value));
#endif
if (2 == gpr_atm_full_fetch_add(&md->refcnt, -1)) {
uint32_t hash = GRPC_MDSTR_KV_HASH(md->key->hash, md->value->hash);
uint32_t hash = GRPC_MDSTR_KV_HASH(md->key->hash, md->value->hash);
if (1 == gpr_atm_full_fetch_add(&md->refcnt, -1)) {
/* once the refcount hits zero, some other thread can come along and
free md at any time: it's unsafe from this point on to access it */
mdtab_shard *shard =
&g_mdtab_shard[SHARD_IDX(hash, LOG2_MDTAB_SHARD_COUNT)];
GPR_TIMER_BEGIN("grpc_mdelem_unref.to_zero", 0);
gpr_mu_lock(&shard->mu);
if (1 == gpr_atm_no_barrier_load(&md->refcnt)) {
shard->free++;
gpr_atm_no_barrier_store(&md->refcnt, 0);
}
gpr_mu_unlock(&shard->mu);
GPR_TIMER_END("grpc_mdelem_unref.to_zero", 0);
gpr_atm_no_barrier_fetch_add(&shard->free_estimate, 1);
}
}

@ -31,7 +31,7 @@
*
*/
#include "src/cpp/common/core_codegen.h"
#include <grpc++/impl/codegen/core_codegen.h>
#include <stdlib.h>

@ -95,6 +95,7 @@ namespace Grpc.Core
public void Insert(int index, Metadata.Entry item)
{
GrpcPreconditions.CheckNotNull(item);
CheckWriteable();
entries.Insert(index, item);
}
@ -114,6 +115,7 @@ namespace Grpc.Core
set
{
GrpcPreconditions.CheckNotNull(value);
CheckWriteable();
entries[index] = value;
}
@ -121,6 +123,7 @@ namespace Grpc.Core
public void Add(Metadata.Entry item)
{
GrpcPreconditions.CheckNotNull(item);
CheckWriteable();
entries.Add(item);
}
@ -187,7 +190,7 @@ namespace Grpc.Core
/// <summary>
/// Metadata entry
/// </summary>
public struct Entry
public class Entry
{
private static readonly Encoding Encoding = Encoding.ASCII;
private static readonly Regex ValidKeyRegex = new Regex("^[a-z0-9_-]+$");

@ -220,6 +220,8 @@ extern id const kGRPCTrailersKey;
* messages to the response side of the call indefinitely (depending on the semantics of the
* specific remote method called).
* To finish a call right away, invoke cancel.
* host parameter should not contain the scheme (http:// or https://), only the name or IP addr
* and the port number, for example @"localhost:5050".
*/
- (instancetype)initWithHost:(NSString *)host
path:(NSString *)path

@ -37,6 +37,7 @@
* A fully-qualified proto service method name. Full qualification is needed because a gRPC endpoint
* can implement multiple services.
*/
__attribute__((deprecated("Please use GRPCProtoMethod.")))
@interface ProtoMethod : NSObject
@property(nonatomic, readonly) NSString *package;
@property(nonatomic, readonly) NSString *service;
@ -48,3 +49,11 @@
service:(NSString *)service
method:(NSString *)method;
@end
/**
* This subclass is empty now. Eventually we'll remove ProtoMethod class
* to avoid potential naming conflict
*/
@interface GRPCProtoMethod : ProtoMethod
@end

@ -53,3 +53,7 @@
}
}
@end
@implementation GRPCProtoMethod
@end

@ -36,13 +36,26 @@
#import "ProtoMethod.h"
__attribute__((deprecated("Please use GRPCProtoCall.")))
@interface ProtoRPC : GRPCCall
/**
* host parameter should not contain the scheme (http:// or https://), only the name or IP addr
* and the port number, for example @"localhost:5050".
*/
- (instancetype)initWithHost:(NSString *)host
method:(ProtoMethod *)method
method:(GRPCProtoMethod *)method
requestsWriter:(GRXWriter *)requestsWriter
responseClass:(Class)responseClass
responsesWriteable:(id<GRXWriteable>)responsesWriteable NS_DESIGNATED_INITIALIZER;
- (void)start;
@end
/**
* This subclass is empty now. Eventually we'll remove ProtoRPC class
* to avoid potential naming conflict
*/
@interface GRPCProtoCall : ProtoRPC
@end

@ -70,7 +70,7 @@ static NSError *ErrorForBadProto(id proto, Class expectedClass, NSError *parsing
// Designated initializer
- (instancetype)initWithHost:(NSString *)host
method:(ProtoMethod *)method
method:(GRPCProtoMethod *)method
requestsWriter:(GRXWriter *)requestsWriter
responseClass:(Class)responseClass
responsesWriteable:(id<GRXWriteable>)responsesWriteable {
@ -117,3 +117,7 @@ static NSError *ErrorForBadProto(id proto, Class expectedClass, NSError *parsing
_responseWriteable = nil;
}
@end
@implementation GRPCProtoCall
@end

@ -33,17 +33,28 @@
#import <Foundation/Foundation.h>
@class ProtoRPC;
@class GRPCProtoCall;
@protocol GRXWriteable;
@class GRXWriter;
__attribute__((deprecated("Please use GRPCProtoService.")))
@interface ProtoService : NSObject
- (instancetype)initWithHost:(NSString *)host
packageName:(NSString *)packageName
serviceName:(NSString *)serviceName NS_DESIGNATED_INITIALIZER;
- (ProtoRPC *)RPCToMethod:(NSString *)method
- (GRPCProtoCall *)RPCToMethod:(NSString *)method
requestsWriter:(GRXWriter *)requestsWriter
responseClass:(Class)responseClass
responsesWriteable:(id<GRXWriteable>)responsesWriteable;
@end
/**
* This subclass is empty now. Eventually we'll remove ProtoService class
* to avoid potential naming conflict
*/
@interface GRPCProtoService : ProtoService
@end

@ -79,3 +79,7 @@
responsesWriteable:responsesWriteable];
}
@end
@implementation GRPCProtoService
@end

@ -41,17 +41,6 @@ enum PayloadType {
// Uncompressable binary format.
UNCOMPRESSABLE = 1;
// Randomly chosen from all other formats defined in this enum.
RANDOM = 2;
}
// Compression algorithms
enum CompressionType {
// No compression
NONE = 0;
GZIP = 1;
DEFLATE = 2;
}
// A block of data, to simply increase gRPC message size.
@ -88,8 +77,8 @@ message SimpleRequest {
// Whether SimpleResponse should include OAuth scope.
bool fill_oauth_scope = 5;
// Compression algorithm to be used by the server for the response (stream)
CompressionType response_compression = 6;
// Whether to request the server to compress the response.
bool request_compressed_response = 6;
// Whether server should return a given status
EchoStatus response_status = 7;
@ -145,8 +134,8 @@ message StreamingOutputCallRequest {
// Optional input payload sent along with the request.
Payload payload = 3;
// Compression algorithm to be used by the server for the response (stream)
CompressionType response_compression = 6;
// Whether to request the server to compress the response.
bool request_compressed_response = 6;
// Whether server should return a given status
EchoStatus response_status = 7;

@ -29,6 +29,7 @@
"""GRPCAuthMetadataPlugins for standard authentication."""
import inspect
from concurrent import futures
import grpc
@ -46,9 +47,21 @@ class GoogleCallCredentials(grpc.AuthMetadataPlugin):
self._credentials = credentials
self._pool = futures.ThreadPoolExecutor(max_workers=1)
# Hack to determine if these are JWT creds and we need to pass
# additional_claims when getting a token
if 'additional_claims' in inspect.getargspec(
credentials.get_access_token).args:
self._is_jwt = True
else:
self._is_jwt = False
def __call__(self, context, callback):
# MetadataPlugins cannot block (see grpc.beta.interfaces.py)
future = self._pool.submit(self._credentials.get_access_token)
if self._is_jwt:
future = self._pool.submit(self._credentials.get_access_token,
additional_claims={'aud': context.service_url})
else:
future = self._pool.submit(self._credentials.get_access_token)
future.add_done_callback(lambda x: self._get_token_callback(callback, x))
def _get_token_callback(self, callback, future):

@ -76,6 +76,9 @@ def _stub(args):
creds = oauth2client_client.GoogleCredentials.get_application_default()
scoped_creds = creds.create_scoped([args.oauth_scope])
call_creds = implementations.google_call_credentials(scoped_creds)
elif args.test_case == 'jwt_token_creds':
creds = oauth2client_client.GoogleCredentials.get_application_default()
call_creds = implementations.google_call_credentials(creds)
else:
call_creds = None
if args.use_tls:

@ -310,6 +310,16 @@ def _oauth2_auth_token(stub, args):
(response.oauth_scope, args.oauth_scope))
def _jwt_token_creds(stub, args):
json_key_filename = os.environ[
oauth2client_client.GOOGLE_APPLICATION_CREDENTIALS]
wanted_email = json.load(open(json_key_filename, 'rb'))['client_email']
response = _large_unary_common_behavior(stub, True, False)
if wanted_email != response.username:
raise ValueError(
'expected username %s, got %s' % (wanted_email, response.username))
def _per_rpc_creds(stub, args):
json_key_filename = os.environ[
oauth2client_client.GOOGLE_APPLICATION_CREDENTIALS]
@ -338,6 +348,7 @@ class TestCase(enum.Enum):
EMPTY_STREAM = 'empty_stream'
COMPUTE_ENGINE_CREDS = 'compute_engine_creds'
OAUTH2_AUTH_TOKEN = 'oauth2_auth_token'
JWT_TOKEN_CREDS = 'jwt_token_creds'
PER_RPC_CREDS = 'per_rpc_creds'
TIMEOUT_ON_SLEEPING_SERVER = 'timeout_on_sleeping_server'
@ -364,6 +375,8 @@ class TestCase(enum.Enum):
_compute_engine_creds(stub, args)
elif self is TestCase.OAUTH2_AUTH_TOKEN:
_oauth2_auth_token(stub, args)
elif self is TestCase.JWT_TOKEN_CREDS:
_jwt_token_creds(stub, args)
elif self is TestCase.PER_RPC_CREDS:
_per_rpc_creds(stub, args)
else:

@ -82,6 +82,7 @@ class BenchmarkClient:
self._response_callbacks = []
def add_response_callback(self, callback):
"""callback will be invoked as callback(client, query_time)"""
self._response_callbacks.append(callback)
@abc.abstractmethod
@ -95,10 +96,10 @@ class BenchmarkClient:
def stop(self):
pass
def _handle_response(self, query_time):
def _handle_response(self, client, query_time):
self._hist.add(query_time * 1e9) # Report times in nanoseconds
for callback in self._response_callbacks:
callback(query_time)
callback(client, query_time)
class UnarySyncBenchmarkClient(BenchmarkClient):
@ -121,7 +122,7 @@ class UnarySyncBenchmarkClient(BenchmarkClient):
start_time = time.time()
self._stub.UnaryCall(self._request, _TIMEOUT)
end_time = time.time()
self._handle_response(end_time - start_time)
self._handle_response(self, end_time - start_time)
class UnaryAsyncBenchmarkClient(BenchmarkClient):
@ -136,19 +137,20 @@ class UnaryAsyncBenchmarkClient(BenchmarkClient):
def _response_received(self, start_time, resp):
resp.result()
end_time = time.time()
self._handle_response(end_time - start_time)
self._handle_response(self, end_time - start_time)
def stop(self):
self._stub = None
class StreamingSyncBenchmarkClient(BenchmarkClient):
class _SyncStream(object):
def __init__(self, server, config, hist):
super(StreamingSyncBenchmarkClient, self).__init__(server, config, hist)
def __init__(self, stub, generic, request, handle_response):
self._stub = stub
self._generic = generic
self._request = request
self._handle_response = handle_response
self._is_streaming = False
self._pool = futures.ThreadPoolExecutor(max_workers=1)
# Use a thread-safe queue to put requests on the stream
self._request_queue = queue.Queue()
self._send_time_queue = queue.Queue()
@ -157,15 +159,6 @@ class StreamingSyncBenchmarkClient(BenchmarkClient):
self._request_queue.put(self._request)
def start(self):
self._is_streaming = True
self._pool.submit(self._request_stream)
def stop(self):
self._is_streaming = False
self._pool.shutdown(wait=True)
self._stub = None
def _request_stream(self):
self._is_streaming = True
if self._generic:
stream_callable = self._stub.stream_stream(
@ -175,8 +168,11 @@ class StreamingSyncBenchmarkClient(BenchmarkClient):
response_stream = stream_callable(self._request_generator(), _TIMEOUT)
for _ in response_stream:
end_time = time.time()
self._handle_response(end_time - self._send_time_queue.get_nowait())
self._handle_response(
self, time.time() - self._send_time_queue.get_nowait())
def stop(self):
self._is_streaming = False
def _request_generator(self):
while self._is_streaming:
@ -187,46 +183,28 @@ class StreamingSyncBenchmarkClient(BenchmarkClient):
pass
class AsyncReceiver(face.ResponseReceiver):
"""Receiver for async stream responses."""
def __init__(self, send_time_queue, response_handler):
self._send_time_queue = send_time_queue
self._response_handler = response_handler
def initial_metadata(self, initial_mdetadata):
pass
def response(self, response):
end_time = time.time()
self._response_handler(end_time - self._send_time_queue.get_nowait())
def complete(self, terminal_metadata, code, details):
pass
class StreamingAsyncBenchmarkClient(BenchmarkClient):
class StreamingSyncBenchmarkClient(BenchmarkClient):
def __init__(self, server, config, hist):
super(StreamingAsyncBenchmarkClient, self).__init__(server, config, hist)
self._send_time_queue = queue.Queue()
self._receiver = AsyncReceiver(self._send_time_queue, self._handle_response)
self._rendezvous = None
super(StreamingSyncBenchmarkClient, self).__init__(server, config, hist)
self._pool = futures.ThreadPoolExecutor(
max_workers=config.outstanding_rpcs_per_channel)
self._streams = [_SyncStream(self._stub, self._generic,
self._request, self._handle_response)
for _ in xrange(config.outstanding_rpcs_per_channel)]
self._curr_stream = 0
def send_request(self):
if self._rendezvous is not None:
self._send_time_queue.put(time.time())
self._rendezvous.consume(self._request)
# Use a round_robin scheduler to determine what stream to send on
self._streams[self._curr_stream].send_request()
self._curr_stream = (self._curr_stream + 1) % len(self._streams)
def start(self):
if self._generic:
stream_callable = self._stub.stream_stream(
'grpc.testing.BenchmarkService', 'StreamingCall')
else:
stream_callable = self._stub.StreamingCall
self._rendezvous = stream_callable.event(
self._receiver, lambda *args: None, _TIMEOUT)
for stream in self._streams:
self._pool.submit(stream.start)
def stop(self):
self._rendezvous.terminate()
self._rendezvous = None
for stream in self._streams:
stream.stop()
self._pool.shutdown(wait=True)
self._stub = None

@ -98,7 +98,6 @@ class ClosedLoopClientRunner(ClientRunner):
self._client.stop()
self._client = None
def _send_request(self, response_time):
def _send_request(self, client, response_time):
if self._is_running:
self._client.send_request()
client.send_request()

@ -43,9 +43,7 @@ def run_worker_server(port):
server.add_insecure_port('[::]:{}'.format(port))
server.start()
servicer.wait_for_quit()
# Drain outstanding requests for clean exit
time.sleep(2)
server.stop(0)
server.stop(2)
if __name__ == '__main__':

@ -153,9 +153,8 @@ class WorkerServer(services_pb2.BetaWorkerServiceServicer):
if config.rpc_type == control_pb2.UNARY:
client = benchmark_client.UnaryAsyncBenchmarkClient(
server, config, qps_data)
elif config.rpc_type == control_pb2.STREAMING:
client = benchmark_client.StreamingAsyncBenchmarkClient(
server, config, qps_data)
else:
raise Exception('Async streaming client not supported')
else:
raise Exception('Unsupported client type {}'.format(config.client_type))

@ -55,8 +55,6 @@
namespace grpc {
namespace testing {
static const char* kRandomFile = "test/cpp/interop/rnd.dat";
namespace {
// The same value is defined by the Java client.
const std::vector<int> request_stream_sizes = {27182, 8, 1828, 45904};
@ -67,30 +65,26 @@ const int kReceiveDelayMilliSeconds = 20;
const int kLargeRequestSize = 271828;
const int kLargeResponseSize = 314159;
CompressionType GetInteropCompressionTypeFromCompressionAlgorithm(
grpc_compression_algorithm algorithm) {
switch (algorithm) {
case GRPC_COMPRESS_NONE:
return CompressionType::NONE;
case GRPC_COMPRESS_GZIP:
return CompressionType::GZIP;
case GRPC_COMPRESS_DEFLATE:
return CompressionType::DEFLATE;
default:
GPR_ASSERT(false);
}
}
void NoopChecks(const InteropClientContextInspector& inspector,
const SimpleRequest* request, const SimpleResponse* response) {}
void CompressionChecks(const InteropClientContextInspector& inspector,
const SimpleRequest* request,
const SimpleResponse* response) {
GPR_ASSERT(request->response_compression() ==
GetInteropCompressionTypeFromCompressionAlgorithm(
inspector.GetCallCompressionAlgorithm()));
if (request->response_compression() == NONE) {
const grpc_compression_algorithm received_compression =
inspector.GetCallCompressionAlgorithm();
if (request->request_compressed_response() &&
received_compression == GRPC_COMPRESS_NONE) {
if (request->request_compressed_response() &&
received_compression == GRPC_COMPRESS_NONE) {
// Requested some compression, got NONE. This is an error.
gpr_log(GPR_ERROR,
"Failure: Requested compression but got uncompressed response "
"from server.");
abort();
}
}
if (!request->request_compressed_response()) {
GPR_ASSERT(!(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS));
} else if (request->response_type() == PayloadType::COMPRESSABLE) {
// requested compression and compressable response => results should always
@ -211,20 +205,22 @@ bool InteropClient::PerformLargeUnary(SimpleRequest* request,
custom_checks_fn(inspector, request, response);
// Payload related checks.
if (request->response_type() != PayloadType::RANDOM) {
GPR_ASSERT(response->payload().type() == request->response_type());
}
GPR_ASSERT(response->payload().type() == request->response_type());
switch (response->payload().type()) {
case PayloadType::COMPRESSABLE:
GPR_ASSERT(response->payload().body() ==
grpc::string(kLargeResponseSize, '\0'));
break;
case PayloadType::UNCOMPRESSABLE: {
std::ifstream rnd_file(kRandomFile);
GPR_ASSERT(rnd_file.good());
for (int i = 0; i < kLargeResponseSize; i++) {
GPR_ASSERT(response->payload().body()[i] == (char)rnd_file.get());
}
// We don't really check anything: We can't assert that the payload is
// uncompressed because it's the server's prerogative to decide on that,
// and different implementations decide differently (ie, Java always
// compresses when requested to do so, whereas C core throws away the
// compressed payload if the output is larger than the input).
// In addition, we don't compare the actual random bytes received because
// asserting that data is sent/received properly isn't the purpose of this
// test. Moreover, different implementations are also free to use
// different sets of random bytes.
} break;
default:
GPR_ASSERT(false);
@ -341,13 +337,13 @@ bool InteropClient::DoLargeUnary() {
}
bool InteropClient::DoLargeCompressedUnary() {
const CompressionType compression_types[] = {NONE, GZIP, DEFLATE};
const PayloadType payload_types[] = {COMPRESSABLE, UNCOMPRESSABLE, RANDOM};
const bool request_compression[] = {false, true};
const PayloadType payload_types[] = {COMPRESSABLE, UNCOMPRESSABLE};
for (size_t i = 0; i < GPR_ARRAY_SIZE(payload_types); i++) {
for (size_t j = 0; j < GPR_ARRAY_SIZE(compression_types); j++) {
for (size_t j = 0; j < GPR_ARRAY_SIZE(request_compression); j++) {
char* log_suffix;
gpr_asprintf(&log_suffix, "(compression=%s; payload=%s)",
CompressionType_Name(compression_types[j]).c_str(),
request_compression[j] ? "true" : "false",
PayloadType_Name(payload_types[i]).c_str());
gpr_log(GPR_DEBUG, "Sending a large compressed unary rpc %s.",
@ -355,7 +351,7 @@ bool InteropClient::DoLargeCompressedUnary() {
SimpleRequest request;
SimpleResponse response;
request.set_response_type(payload_types[i]);
request.set_response_compression(compression_types[j]);
request.set_request_compressed_response(request_compression[j]);
if (!PerformLargeUnary(&request, &response, CompressionChecks)) {
gpr_log(GPR_ERROR, "Large compressed unary failed %s", log_suffix);
@ -452,23 +448,23 @@ bool InteropClient::DoResponseStreaming() {
}
bool InteropClient::DoResponseCompressedStreaming() {
const CompressionType compression_types[] = {NONE, GZIP, DEFLATE};
const PayloadType payload_types[] = {COMPRESSABLE, UNCOMPRESSABLE, RANDOM};
const bool request_compression[] = {false, true};
const PayloadType payload_types[] = {COMPRESSABLE, UNCOMPRESSABLE};
for (size_t i = 0; i < GPR_ARRAY_SIZE(payload_types); i++) {
for (size_t j = 0; j < GPR_ARRAY_SIZE(compression_types); j++) {
for (size_t j = 0; j < GPR_ARRAY_SIZE(request_compression); j++) {
ClientContext context;
InteropClientContextInspector inspector(context);
StreamingOutputCallRequest request;
char* log_suffix;
gpr_asprintf(&log_suffix, "(compression=%s; payload=%s)",
CompressionType_Name(compression_types[j]).c_str(),
request_compression[j] ? "true" : "false",
PayloadType_Name(payload_types[i]).c_str());
gpr_log(GPR_DEBUG, "Receiving response streaming rpc %s.", log_suffix);
request.set_response_type(payload_types[i]);
request.set_response_compression(compression_types[j]);
request.set_request_compressed_response(request_compression[j]);
for (size_t k = 0; k < response_stream_sizes.size(); ++k) {
ResponseParameters* response_parameter =
@ -483,37 +479,32 @@ bool InteropClient::DoResponseCompressedStreaming() {
size_t k = 0;
while (stream->Read(&response)) {
// Payload related checks.
if (request.response_type() != PayloadType::RANDOM) {
GPR_ASSERT(response.payload().type() == request.response_type());
}
GPR_ASSERT(response.payload().type() == request.response_type());
switch (response.payload().type()) {
case PayloadType::COMPRESSABLE:
GPR_ASSERT(response.payload().body() ==
grpc::string(response_stream_sizes[k], '\0'));
break;
case PayloadType::UNCOMPRESSABLE: {
std::ifstream rnd_file(kRandomFile);
GPR_ASSERT(rnd_file.good());
for (int n = 0; n < response_stream_sizes[k]; n++) {
GPR_ASSERT(response.payload().body()[n] == (char)rnd_file.get());
}
} break;
case PayloadType::UNCOMPRESSABLE:
break;
default:
GPR_ASSERT(false);
}
// Compression related checks.
GPR_ASSERT(request.response_compression() ==
GetInteropCompressionTypeFromCompressionAlgorithm(
inspector.GetCallCompressionAlgorithm()));
if (request.response_compression() == NONE) {
if (request.request_compressed_response()) {
GPR_ASSERT(inspector.GetCallCompressionAlgorithm() >
GRPC_COMPRESS_NONE);
if (request.response_type() == PayloadType::COMPRESSABLE) {
// requested compression and compressable response => results should
// always be compressed.
GPR_ASSERT(inspector.GetMessageFlags() &
GRPC_WRITE_INTERNAL_COMPRESS);
}
} else {
// requested *no* compression.
GPR_ASSERT(
!(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS));
} else if (request.response_type() == PayloadType::COMPRESSABLE) {
// requested compression and compressable response => results should
// always be compressed.
GPR_ASSERT(inspector.GetMessageFlags() &
GRPC_WRITE_INTERNAL_COMPRESS);
}
++k;

@ -110,14 +110,7 @@ void MaybeEchoMetadata(ServerContext* context) {
}
}
bool SetPayload(PayloadType type, int size, Payload* payload) {
PayloadType response_type;
if (type == PayloadType::RANDOM) {
response_type =
rand() & 0x1 ? PayloadType::COMPRESSABLE : PayloadType::UNCOMPRESSABLE;
} else {
response_type = type;
}
bool SetPayload(PayloadType response_type, int size, Payload* payload) {
payload->set_type(response_type);
switch (response_type) {
case PayloadType::COMPRESSABLE: {
@ -141,18 +134,9 @@ bool SetPayload(PayloadType type, int size, Payload* payload) {
template <typename RequestType>
void SetResponseCompression(ServerContext* context,
const RequestType& request) {
switch (request.response_compression()) {
case grpc::testing::NONE:
context->set_compression_algorithm(GRPC_COMPRESS_NONE);
break;
case grpc::testing::GZIP:
context->set_compression_algorithm(GRPC_COMPRESS_GZIP);
break;
case grpc::testing::DEFLATE:
context->set_compression_algorithm(GRPC_COMPRESS_DEFLATE);
break;
default:
abort();
if (request.request_compressed_response()) {
// Any level would do, let's go for HIGH because we are overachievers.
context->set_compression_level(GRPC_COMPRESS_LEVEL_HIGH);
}
}

@ -770,6 +770,7 @@ include/grpc++/generic/generic_stub.h \
include/grpc++/grpc++.h \
include/grpc++/impl/call.h \
include/grpc++/impl/client_unary_call.h \
include/grpc++/impl/codegen/core_codegen.h \
include/grpc++/impl/grpc_library.h \
include/grpc++/impl/method_handler_impl.h \
include/grpc++/impl/rpc_method.h \

@ -770,6 +770,7 @@ include/grpc++/generic/generic_stub.h \
include/grpc++/grpc++.h \
include/grpc++/impl/call.h \
include/grpc++/impl/client_unary_call.h \
include/grpc++/impl/codegen/core_codegen.h \
include/grpc++/impl/grpc_library.h \
include/grpc++/impl/method_handler_impl.h \
include/grpc++/impl/rpc_method.h \
@ -855,8 +856,8 @@ include/grpc/impl/codegen/sync_generic.h \
include/grpc/impl/codegen/sync_posix.h \
include/grpc/impl/codegen/sync_windows.h \
include/grpc/impl/codegen/time.h \
include/grpc++/impl/codegen/core_codegen.h \
src/cpp/client/secure_credentials.h \
src/cpp/common/core_codegen.h \
src/cpp/common/secure_auth_context.h \
src/cpp/server/secure_server_credentials.h \
src/cpp/client/create_channel_internal.h \

File diff suppressed because it is too large Load Diff

@ -395,8 +395,8 @@ class PythonLanguage:
# categories=[SMOKETEST])
yield _ping_pong_scenario(
'python_protobuf_async_streaming_ping_pong', rpc_type='STREAMING',
client_type='ASYNC_CLIENT', server_type='SYNC_SERVER')
'python_protobuf_sync_streaming_ping_pong', rpc_type='STREAMING',
client_type='SYNC_CLIENT', server_type='SYNC_SERVER')
yield _ping_pong_scenario(
'python_protobuf_async_unary_ping_pong', rpc_type='UNARY',
@ -413,9 +413,9 @@ class PythonLanguage:
unconstrained_client='sync')
yield _ping_pong_scenario(
'python_protobuf_async_streaming_qps_unconstrained', rpc_type='STREAMING',
client_type='ASYNC_CLIENT', server_type='SYNC_SERVER',
unconstrained_client='async')
'python_protobuf_sync_streaming_qps_unconstrained', rpc_type='STREAMING',
client_type='SYNC_CLIENT', server_type='SYNC_SERVER',
unconstrained_client='sync')
yield _ping_pong_scenario(
'python_to_cpp_protobuf_sync_unary_ping_pong', rpc_type='UNARY',

@ -317,7 +317,7 @@ class PythonLanguage:
'PYTHONPATH': '{}/src/python/gens'.format(DOCKER_WORKDIR_ROOT)}
def unimplemented_test_cases(self):
return _SKIP_ADVANCED + _SKIP_COMPRESSION + ['jwt_token_creds']
return _SKIP_ADVANCED + _SKIP_COMPRESSION
def unimplemented_test_cases_server(self):
return _SKIP_ADVANCED + _SKIP_COMPRESSION

@ -4261,18 +4261,18 @@
"grpc++_codegen_base_src"
],
"headers": [
"include/grpc++/impl/codegen/core_codegen.h",
"src/cpp/client/secure_credentials.h",
"src/cpp/common/core_codegen.h",
"src/cpp/common/secure_auth_context.h",
"src/cpp/server/secure_server_credentials.h"
],
"language": "c++",
"name": "grpc++",
"src": [
"include/grpc++/impl/codegen/core_codegen.h",
"src/cpp/client/secure_credentials.cc",
"src/cpp/client/secure_credentials.h",
"src/cpp/common/auth_property_iterator.cc",
"src/cpp/common/core_codegen.h",
"src/cpp/common/secure_auth_context.cc",
"src/cpp/common/secure_auth_context.h",
"src/cpp/common/secure_channel_arguments.cc",
@ -6405,6 +6405,7 @@
"include/grpc++/grpc++.h",
"include/grpc++/impl/call.h",
"include/grpc++/impl/client_unary_call.h",
"include/grpc++/impl/codegen/core_codegen.h",
"include/grpc++/impl/grpc_library.h",
"include/grpc++/impl/method_handler_impl.h",
"include/grpc++/impl/rpc_method.h",
@ -6440,7 +6441,6 @@
"include/grpc++/support/sync_stream.h",
"include/grpc++/support/time.h",
"src/cpp/client/create_channel_internal.h",
"src/cpp/common/core_codegen.h",
"src/cpp/server/dynamic_thread_pool.h",
"src/cpp/server/thread_pool_interface.h"
],
@ -6457,6 +6457,7 @@
"include/grpc++/grpc++.h",
"include/grpc++/impl/call.h",
"include/grpc++/impl/client_unary_call.h",
"include/grpc++/impl/codegen/core_codegen.h",
"include/grpc++/impl/grpc_library.h",
"include/grpc++/impl/method_handler_impl.h",
"include/grpc++/impl/rpc_method.h",
@ -6502,7 +6503,6 @@
"src/cpp/common/channel_arguments.cc",
"src/cpp/common/completion_queue.cc",
"src/cpp/common/core_codegen.cc",
"src/cpp/common/core_codegen.h",
"src/cpp/common/rpc_method.cc",
"src/cpp/server/async_generic_service.cc",
"src/cpp/server/create_default_thread_pool.cc",

@ -268,6 +268,7 @@
<ClInclude Include="$(SolutionDir)\..\include\grpc++\grpc++.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc++\impl\call.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc++\impl\client_unary_call.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc++\impl\codegen\core_codegen.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc++\impl\grpc_library.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc++\impl\method_handler_impl.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc++\impl\rpc_method.h" />
@ -355,8 +356,8 @@
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\time.h" />
</ItemGroup>
<ItemGroup>
<ClInclude Include="$(SolutionDir)\..\include\grpc++\impl\codegen\core_codegen.h" />
<ClInclude Include="$(SolutionDir)\..\src\cpp\client\secure_credentials.h" />
<ClInclude Include="$(SolutionDir)\..\src\cpp\common\core_codegen.h" />
<ClInclude Include="$(SolutionDir)\..\src\cpp\common\secure_auth_context.h" />
<ClInclude Include="$(SolutionDir)\..\src\cpp\server\secure_server_credentials.h" />
<ClInclude Include="$(SolutionDir)\..\src\cpp\client\create_channel_internal.h" />

@ -126,6 +126,9 @@
<ClInclude Include="$(SolutionDir)\..\include\grpc++\impl\client_unary_call.h">
<Filter>include\grpc++\impl</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\include\grpc++\impl\codegen\core_codegen.h">
<Filter>include\grpc++\impl\codegen</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\include\grpc++\impl\grpc_library.h">
<Filter>include\grpc++\impl</Filter>
</ClInclude>
@ -383,12 +386,12 @@
</ClInclude>
</ItemGroup>
<ItemGroup>
<ClInclude Include="$(SolutionDir)\..\include\grpc++\impl\codegen\core_codegen.h">
<Filter>include\grpc++\impl\codegen</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\cpp\client\secure_credentials.h">
<Filter>src\cpp\client</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\cpp\common\core_codegen.h">
<Filter>src\cpp\common</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\cpp\common\secure_auth_context.h">
<Filter>src\cpp\common</Filter>
</ClInclude>

@ -268,6 +268,7 @@
<ClInclude Include="$(SolutionDir)\..\include\grpc++\grpc++.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc++\impl\call.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc++\impl\client_unary_call.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc++\impl\codegen\core_codegen.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc++\impl\grpc_library.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc++\impl\method_handler_impl.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc++\impl\rpc_method.h" />
@ -356,7 +357,6 @@
</ItemGroup>
<ItemGroup>
<ClInclude Include="$(SolutionDir)\..\src\cpp\client\create_channel_internal.h" />
<ClInclude Include="$(SolutionDir)\..\src\cpp\common\core_codegen.h" />
<ClInclude Include="$(SolutionDir)\..\src\cpp\server\dynamic_thread_pool.h" />
<ClInclude Include="$(SolutionDir)\..\src\cpp\server\thread_pool_interface.h" />
</ItemGroup>

@ -111,6 +111,9 @@
<ClInclude Include="$(SolutionDir)\..\include\grpc++\impl\client_unary_call.h">
<Filter>include\grpc++\impl</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\include\grpc++\impl\codegen\core_codegen.h">
<Filter>include\grpc++\impl\codegen</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\include\grpc++\impl\grpc_library.h">
<Filter>include\grpc++\impl</Filter>
</ClInclude>
@ -371,9 +374,6 @@
<ClInclude Include="$(SolutionDir)\..\src\cpp\client\create_channel_internal.h">
<Filter>src\cpp\client</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\cpp\common\core_codegen.h">
<Filter>src\cpp\common</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\cpp\server\dynamic_thread_pool.h">
<Filter>src\cpp\server</Filter>
</ClInclude>

Loading…
Cancel
Save