Merge pull request #2370 from ctiller/now-i-get-to-show-it-to-EVERYBODY

User agent string sending support
pull/2604/head
Yang Gao 9 years ago
commit 61e647819c
  1. 12
      include/grpc++/channel_arguments.h
  2. 6
      include/grpc/grpc.h
  3. 12
      include/grpc/support/port_platform.h
  4. 61
      src/core/channel/http_client_filter.c
  5. 7
      src/core/surface/call.h
  6. 8
      src/core/surface/call_log_batch.c
  7. 2
      src/core/surface/server.c
  8. 2
      src/core/transport/chttp2/parsing.c
  9. 38
      src/cpp/client/channel_arguments.cc
  10. 9
      src/cpp/client/create_channel.cc
  11. 1
      src/php/ext/grpc/server.c
  12. 2
      src/php/tests/unit_tests/EndToEndTest.php
  13. 1
      src/php/tests/unit_tests/SecureEndToEndTest.php
  14. 5
      src/python/src/grpc/_adapter/_low_test.py
  15. 9
      src/python/src/grpc/_links/_transmission_test.py
  16. 11
      src/python/src/grpc/framework/interfaces/links/test_cases.py
  17. 18
      src/ruby/spec/generic/rpc_server_spec.rb
  18. 4
      test/core/end2end/dualstack_socket_test.c
  19. 8
      test/cpp/end2end/async_end2end_test.cc
  20. 7
      test/cpp/end2end/end2end_test.cc
  21. 10
      test/cpp/qps/qps_test.cc

@ -54,6 +54,14 @@ class ChannelArguments {
ChannelArguments() {} ChannelArguments() {}
~ChannelArguments() {} ~ChannelArguments() {}
ChannelArguments(const ChannelArguments& other);
ChannelArguments& operator=(ChannelArguments other) {
Swap(other);
return *this;
}
void Swap(ChannelArguments& other);
// grpc specific channel argument setters // grpc specific channel argument setters
// Set target name override for SSL host name checking. // Set target name override for SSL host name checking.
void SetSslTargetNameOverride(const grpc::string& name); void SetSslTargetNameOverride(const grpc::string& name);
@ -73,10 +81,6 @@ class ChannelArguments {
friend class SecureCredentials; friend class SecureCredentials;
friend class testing::ChannelArgumentsTest; friend class testing::ChannelArgumentsTest;
// TODO(yangg) implement copy and assign
ChannelArguments(const ChannelArguments&);
ChannelArguments& operator=(const ChannelArguments&);
// Returns empty string when it is not set. // Returns empty string when it is not set.
grpc::string GetSslTargetNameOverride() const; grpc::string GetSslTargetNameOverride() const;

@ -126,6 +126,12 @@ typedef struct {
/** Initial sequence number for http2 transports */ /** Initial sequence number for http2 transports */
#define GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER \ #define GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER \
"grpc.http2.initial_sequence_number" "grpc.http2.initial_sequence_number"
/** Primary user agent: goes at the start of the user-agent metadata
sent on each request */
#define GRPC_ARG_PRIMARY_USER_AGENT_STRING "grpc.primary_user_agent"
/** Secondary user agent: goes at the end of the user-agent metadata
sent on each request */
#define GRPC_ARG_SECONDARY_USER_AGENT_STRING "grpc.secondary_user_agent"
/** Connectivity state of a channel. */ /** Connectivity state of a channel. */
typedef enum { typedef enum {

@ -71,6 +71,7 @@
#if !defined(GPR_NO_AUTODETECT_PLATFORM) #if !defined(GPR_NO_AUTODETECT_PLATFORM)
#if defined(_WIN64) || defined(WIN64) #if defined(_WIN64) || defined(WIN64)
#define GPR_PLATFORM_STRING "windows"
#define GPR_WIN32 1 #define GPR_WIN32 1
#define GPR_ARCH_64 1 #define GPR_ARCH_64 1
#define GPR_GETPID_IN_PROCESS_H 1 #define GPR_GETPID_IN_PROCESS_H 1
@ -84,6 +85,7 @@
#endif #endif
#define GPR_WINDOWS_CRASH_HANDLER 1 #define GPR_WINDOWS_CRASH_HANDLER 1
#elif defined(_WIN32) || defined(WIN32) #elif defined(_WIN32) || defined(WIN32)
#define GPR_PLATFORM_STRING "windows"
#define GPR_ARCH_32 1 #define GPR_ARCH_32 1
#define GPR_WIN32 1 #define GPR_WIN32 1
#define GPR_GETPID_IN_PROCESS_H 1 #define GPR_GETPID_IN_PROCESS_H 1
@ -97,6 +99,7 @@
#endif #endif
#define GPR_WINDOWS_CRASH_HANDLER 1 #define GPR_WINDOWS_CRASH_HANDLER 1
#elif defined(ANDROID) || defined(__ANDROID__) #elif defined(ANDROID) || defined(__ANDROID__)
#define GPR_PLATFORM_STRING "android"
#define GPR_ANDROID 1 #define GPR_ANDROID 1
#define GPR_ARCH_32 1 #define GPR_ARCH_32 1
#define GPR_CPU_LINUX 1 #define GPR_CPU_LINUX 1
@ -117,6 +120,7 @@
#define GPR_GETPID_IN_UNISTD_H 1 #define GPR_GETPID_IN_UNISTD_H 1
#define GPR_HAVE_MSG_NOSIGNAL 1 #define GPR_HAVE_MSG_NOSIGNAL 1
#elif defined(__linux__) #elif defined(__linux__)
#define GPR_PLATFORM_STRING "linux"
#ifndef _BSD_SOURCE #ifndef _BSD_SOURCE
#define _BSD_SOURCE #define _BSD_SOURCE
#endif #endif
@ -173,9 +177,11 @@
#define _BSD_SOURCE #define _BSD_SOURCE
#endif #endif
#if TARGET_OS_IPHONE #if TARGET_OS_IPHONE
#define GPR_PLATFORM_STRING "ios"
#define GPR_CPU_IPHONE 1 #define GPR_CPU_IPHONE 1
#define GPR_PTHREAD_TLS 1 #define GPR_PTHREAD_TLS 1
#else /* TARGET_OS_IPHONE */ #else /* TARGET_OS_IPHONE */
#define GPR_PLATFORM_STRING "osx"
#define GPR_CPU_POSIX 1 #define GPR_CPU_POSIX 1
#define GPR_GCC_TLS 1 #define GPR_GCC_TLS 1
#endif #endif
@ -201,6 +207,7 @@
#define GPR_ARCH_32 1 #define GPR_ARCH_32 1
#endif /* _LP64 */ #endif /* _LP64 */
#elif defined(__FreeBSD__) #elif defined(__FreeBSD__)
#define GPR_PLATFORM_STRING "freebsd"
#ifndef _BSD_SOURCE #ifndef _BSD_SOURCE
#define _BSD_SOURCE #define _BSD_SOURCE
#endif #endif
@ -232,6 +239,11 @@
#endif #endif
#endif /* GPR_NO_AUTODETECT_PLATFORM */ #endif /* GPR_NO_AUTODETECT_PLATFORM */
#ifndef GPR_PLATFORM_STRING
#warning "GPR_PLATFORM_STRING not auto-detected"
#define GPR_PLATFORM_STRING "unknown"
#endif
/* For a common case, assume that the platform has a C99-like stdint.h */ /* For a common case, assume that the platform has a C99-like stdint.h */
#include <stdint.h> #include <stdint.h>

@ -32,13 +32,17 @@
#include "src/core/channel/http_client_filter.h" #include "src/core/channel/http_client_filter.h"
#include <string.h> #include <string.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h> #include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include "src/core/support/string.h"
typedef struct call_data { typedef struct call_data {
grpc_linked_mdelem method; grpc_linked_mdelem method;
grpc_linked_mdelem scheme; grpc_linked_mdelem scheme;
grpc_linked_mdelem te_trailers; grpc_linked_mdelem te_trailers;
grpc_linked_mdelem content_type; grpc_linked_mdelem content_type;
grpc_linked_mdelem user_agent;
int sent_initial_metadata; int sent_initial_metadata;
int got_initial_metadata; int got_initial_metadata;
@ -58,6 +62,8 @@ typedef struct channel_data {
grpc_mdelem *scheme; grpc_mdelem *scheme;
grpc_mdelem *content_type; grpc_mdelem *content_type;
grpc_mdelem *status; grpc_mdelem *status;
/** complete user agent mdelem */
grpc_mdelem *user_agent;
} channel_data; } channel_data;
/* used to silence 'variable not used' warnings */ /* used to silence 'variable not used' warnings */
@ -115,6 +121,8 @@ static void hc_mutate_op(grpc_call_element *elem,
GRPC_MDELEM_REF(channeld->te_trailers)); GRPC_MDELEM_REF(channeld->te_trailers));
grpc_metadata_batch_add_tail(&op->data.metadata, &calld->content_type, grpc_metadata_batch_add_tail(&op->data.metadata, &calld->content_type,
GRPC_MDELEM_REF(channeld->content_type)); GRPC_MDELEM_REF(channeld->content_type));
grpc_metadata_batch_add_tail(&op->data.metadata, &calld->user_agent,
GRPC_MDELEM_REF(channeld->user_agent));
break; break;
} }
} }
@ -169,6 +177,55 @@ static const char *scheme_from_args(const grpc_channel_args *args) {
return "http"; return "http";
} }
static grpc_mdstr *user_agent_from_args(grpc_mdctx *mdctx,
const grpc_channel_args *args) {
gpr_strvec v;
size_t i;
int is_first = 1;
char *tmp;
grpc_mdstr *result;
gpr_strvec_init(&v);
for (i = 0; args && i < args->num_args; i++) {
if (0 == strcmp(args->args[i].key, GRPC_ARG_PRIMARY_USER_AGENT_STRING)) {
if (args->args[i].type != GRPC_ARG_STRING) {
gpr_log(GPR_ERROR, "Channel argument '%s' should be a string",
GRPC_ARG_PRIMARY_USER_AGENT_STRING);
} else {
if (!is_first) gpr_strvec_add(&v, gpr_strdup(" "));
is_first = 0;
gpr_strvec_add(&v, gpr_strdup(args->args[i].value.string));
}
}
}
gpr_asprintf(&tmp, "%sgrpc-c/%s (%s)", is_first ? "" : " ",
grpc_version_string(), GPR_PLATFORM_STRING);
is_first = 0;
gpr_strvec_add(&v, tmp);
for (i = 0; args && i < args->num_args; i++) {
if (0 == strcmp(args->args[i].key, GRPC_ARG_SECONDARY_USER_AGENT_STRING)) {
if (args->args[i].type != GRPC_ARG_STRING) {
gpr_log(GPR_ERROR, "Channel argument '%s' should be a string",
GRPC_ARG_SECONDARY_USER_AGENT_STRING);
} else {
if (!is_first) gpr_strvec_add(&v, gpr_strdup(" "));
is_first = 0;
gpr_strvec_add(&v, gpr_strdup(args->args[i].value.string));
}
}
}
tmp = gpr_strvec_flatten(&v, NULL);
gpr_strvec_destroy(&v);
result = grpc_mdstr_from_string(mdctx, tmp);
gpr_free(tmp);
return result;
}
/* Constructor for channel_data */ /* Constructor for channel_data */
static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
const grpc_channel_args *args, grpc_mdctx *mdctx, const grpc_channel_args *args, grpc_mdctx *mdctx,
@ -189,6 +246,9 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
channeld->content_type = channeld->content_type =
grpc_mdelem_from_strings(mdctx, "content-type", "application/grpc"); grpc_mdelem_from_strings(mdctx, "content-type", "application/grpc");
channeld->status = grpc_mdelem_from_strings(mdctx, ":status", "200"); channeld->status = grpc_mdelem_from_strings(mdctx, ":status", "200");
channeld->user_agent = grpc_mdelem_from_metadata_strings(
mdctx, grpc_mdstr_from_string(mdctx, "user-agent"),
user_agent_from_args(mdctx, args));
} }
/* Destructor for channel data */ /* Destructor for channel data */
@ -201,6 +261,7 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
GRPC_MDELEM_UNREF(channeld->scheme); GRPC_MDELEM_UNREF(channeld->scheme);
GRPC_MDELEM_UNREF(channeld->content_type); GRPC_MDELEM_UNREF(channeld->content_type);
GRPC_MDELEM_UNREF(channeld->status); GRPC_MDELEM_UNREF(channeld->status);
GRPC_MDELEM_UNREF(channeld->user_agent);
} }
const grpc_channel_filter grpc_http_client_filter = { const grpc_channel_filter grpc_http_client_filter = {

@ -134,6 +134,10 @@ void grpc_server_log_request_call(char *file, int line,
grpc_completion_queue *cq_for_notification, grpc_completion_queue *cq_for_notification,
void *tag); void *tag);
void grpc_server_log_shutdown(char *file, int line, gpr_log_severity severity,
grpc_server *server, grpc_completion_queue *cq,
void *tag);
/* Set a context pointer. /* Set a context pointer.
No thread safety guarantees are made wrt this value. */ No thread safety guarantees are made wrt this value. */
void grpc_call_context_set(grpc_call *call, grpc_context_index elem, void grpc_call_context_set(grpc_call *call, grpc_context_index elem,
@ -151,6 +155,9 @@ void *grpc_call_context_get(grpc_call *call, grpc_context_index elem);
grpc_server_log_request_call(sev, server, call, details, initial_metadata, \ grpc_server_log_request_call(sev, server, call, details, initial_metadata, \
cq_bound_to_call, cq_for_notifications, tag) cq_bound_to_call, cq_for_notifications, tag)
#define GRPC_SERVER_LOG_SHUTDOWN(sev, server, cq, tag) \
if (grpc_trace_batch) grpc_server_log_shutdown(sev, server, cq, tag)
gpr_uint8 grpc_call_is_client(grpc_call *call); gpr_uint8 grpc_call_is_client(grpc_call *call);
#endif /* GRPC_INTERNAL_CORE_SURFACE_CALL_H */ #endif /* GRPC_INTERNAL_CORE_SURFACE_CALL_H */

@ -136,3 +136,11 @@ void grpc_server_log_request_call(char *file, int line,
"tag=%p)", server, call, details, initial_metadata, "tag=%p)", server, call, details, initial_metadata,
cq_bound_to_call, cq_for_notification, tag); cq_bound_to_call, cq_for_notification, tag);
} }
void grpc_server_log_shutdown(char *file, int line, gpr_log_severity severity,
grpc_server *server, grpc_completion_queue *cq,
void *tag) {
gpr_log(file, line, severity,
"grpc_server_shutdown_and_notify(server=%p, cq=%p, tag=%p)", server,
cq, tag);
}

@ -980,6 +980,8 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
channel_broadcaster broadcaster; channel_broadcaster broadcaster;
request_killer reqkill; request_killer reqkill;
GRPC_SERVER_LOG_SHUTDOWN(GPR_INFO, server, cq, tag);
/* lock, and gather up some stuff to do */ /* lock, and gather up some stuff to do */
gpr_mu_lock(&server->mu_global); gpr_mu_lock(&server->mu_global);
grpc_cq_begin_op(cq); grpc_cq_begin_op(cq);

@ -588,7 +588,7 @@ static void on_header(void *tp, grpc_mdelem *md) {
GPR_ASSERT(stream_parsing); GPR_ASSERT(stream_parsing);
GRPC_CHTTP2_IF_TRACING(gpr_log( GRPC_CHTTP2_IF_TRACING(gpr_log(
GPR_INFO, "HTTP:%d:HDR: %s: %s", stream_parsing->id, GPR_INFO, "HTTP:%d:HDR:%s: %s: %s", stream_parsing->id,
transport_parsing->is_client ? "CLI" : "SVR", transport_parsing->is_client ? "CLI" : "SVR",
grpc_mdstr_as_c_string(md->key), grpc_mdstr_as_c_string(md->value))); grpc_mdstr_as_c_string(md->key), grpc_mdstr_as_c_string(md->value)));

@ -33,10 +33,48 @@
#include <grpc++/channel_arguments.h> #include <grpc++/channel_arguments.h>
#include <grpc/support/log.h>
#include "src/core/channel/channel_args.h" #include "src/core/channel/channel_args.h"
namespace grpc { namespace grpc {
ChannelArguments::ChannelArguments(const ChannelArguments& other)
: strings_(other.strings_) {
args_.reserve(other.args_.size());
auto list_it_dst = strings_.begin();
auto list_it_src = other.strings_.begin();
for (auto a = other.args_.begin(); a != other.args_.end(); ++a) {
grpc_arg ap;
ap.type = a->type;
GPR_ASSERT(list_it_src->c_str() == a->key);
ap.key = const_cast<char*>(list_it_dst->c_str());
++list_it_src;
++list_it_dst;
switch (a->type) {
case GRPC_ARG_INTEGER:
ap.value.integer = a->value.integer;
break;
case GRPC_ARG_STRING:
GPR_ASSERT(list_it_src->c_str() == a->value.string);
ap.value.string = const_cast<char*>(list_it_dst->c_str());
++list_it_src;
++list_it_dst;
break;
case GRPC_ARG_POINTER:
ap.value.pointer = a->value.pointer;
ap.value.pointer.p = a->value.pointer.copy(ap.value.pointer.p);
break;
}
args_.push_back(ap);
}
}
void ChannelArguments::Swap(ChannelArguments& other) {
args_.swap(other.args_);
strings_.swap(other.strings_);
}
void ChannelArguments::SetCompressionAlgorithm( void ChannelArguments::SetCompressionAlgorithm(
grpc_compression_algorithm algorithm) { grpc_compression_algorithm algorithm) {
SetInt(GRPC_COMPRESSION_ALGORITHM_ARG, algorithm); SetInt(GRPC_COMPRESSION_ALGORITHM_ARG, algorithm);

@ -32,9 +32,11 @@
*/ */
#include <memory> #include <memory>
#include <sstream>
#include "src/cpp/client/channel.h" #include "src/cpp/client/channel.h"
#include <grpc++/channel_interface.h> #include <grpc++/channel_interface.h>
#include <grpc++/channel_arguments.h>
#include <grpc++/create_channel.h> #include <grpc++/create_channel.h>
namespace grpc { namespace grpc {
@ -43,7 +45,12 @@ class ChannelArguments;
std::shared_ptr<ChannelInterface> CreateChannel( std::shared_ptr<ChannelInterface> CreateChannel(
const grpc::string& target, const std::shared_ptr<Credentials>& creds, const grpc::string& target, const std::shared_ptr<Credentials>& creds,
const ChannelArguments& args) { const ChannelArguments& args) {
return creds ? creds->CreateChannel(target, args) ChannelArguments cp_args = args;
std::ostringstream user_agent_prefix;
user_agent_prefix << "grpc-c++/" << grpc_version_string();
cp_args.SetString(GRPC_ARG_PRIMARY_USER_AGENT_STRING,
user_agent_prefix.str());
return creds ? creds->CreateChannel(target, cp_args)
: std::shared_ptr<ChannelInterface>( : std::shared_ptr<ChannelInterface>(
new Channel(target, grpc_lame_client_channel_create())); new Channel(target, grpc_lame_client_channel_create()));
} }

@ -64,6 +64,7 @@ void free_wrapped_grpc_server(void *object TSRMLS_DC) {
wrapped_grpc_server *server = (wrapped_grpc_server *)object; wrapped_grpc_server *server = (wrapped_grpc_server *)object;
if (server->wrapped != NULL) { if (server->wrapped != NULL) {
grpc_server_shutdown_and_notify(server->wrapped, completion_queue, NULL); grpc_server_shutdown_and_notify(server->wrapped, completion_queue, NULL);
grpc_server_cancel_all_calls(server->wrapped);
grpc_completion_queue_pluck(completion_queue, NULL, grpc_completion_queue_pluck(completion_queue, NULL,
gpr_inf_future(GPR_CLOCK_REALTIME)); gpr_inf_future(GPR_CLOCK_REALTIME));
grpc_server_destroy(server->wrapped); grpc_server_destroy(server->wrapped);

@ -61,7 +61,6 @@ class EndToEndTest extends PHPUnit_Framework_TestCase{
$event = $this->server->requestCall(); $event = $this->server->requestCall();
$this->assertSame('dummy_method', $event->method); $this->assertSame('dummy_method', $event->method);
$this->assertSame([], $event->metadata);
$server_call = $event->call; $server_call = $event->call;
$event = $server_call->startBatch([ $event = $server_call->startBatch([
@ -83,7 +82,6 @@ class EndToEndTest extends PHPUnit_Framework_TestCase{
Grpc\OP_RECV_STATUS_ON_CLIENT => true Grpc\OP_RECV_STATUS_ON_CLIENT => true
]); ]);
$this->assertSame([], $event->metadata);
$status = $event->status; $status = $event->status;
$this->assertSame([], $status->metadata); $this->assertSame([], $status->metadata);
$this->assertSame(Grpc\STATUS_OK, $status->code); $this->assertSame(Grpc\STATUS_OK, $status->code);

@ -73,7 +73,6 @@ class SecureEndToEndTest extends PHPUnit_Framework_TestCase{
$event = $this->server->requestCall(); $event = $this->server->requestCall();
$this->assertSame('dummy_method', $event->method); $this->assertSame('dummy_method', $event->method);
$this->assertSame([], $event->metadata);
$server_call = $event->call; $server_call = $event->call;
$event = $server_call->startBatch([ $event = $server_call->startBatch([

@ -129,7 +129,10 @@ class InsecureServerInsecureClient(unittest.TestCase):
self.assertIsInstance(request_event.call, _low.Call) self.assertIsInstance(request_event.call, _low.Call)
self.assertIs(server_request_tag, request_event.tag) self.assertIs(server_request_tag, request_event.tag)
self.assertEquals(1, len(request_event.results)) self.assertEquals(1, len(request_event.results))
self.assertEquals(dict(client_initial_metadata), dict(request_event.results[0].initial_metadata)) got_initial_metadata = dict(request_event.results[0].initial_metadata)
self.assertEquals(
dict(client_initial_metadata),
dict((x, got_initial_metadata[x]) for x in zip(*client_initial_metadata)[0]))
self.assertEquals(METHOD, request_event.call_details.method) self.assertEquals(METHOD, request_event.call_details.method)
self.assertEquals(HOST, request_event.call_details.host) self.assertEquals(HOST, request_event.call_details.host)
self.assertLess(abs(DEADLINE - request_event.call_details.deadline), DEADLINE_TOLERANCE) self.assertLess(abs(DEADLINE - request_event.call_details.deadline), DEADLINE_TOLERANCE)

@ -93,8 +93,13 @@ class TransmissionTest(test_cases.TransmissionTest, unittest.TestCase):
def create_service_completion(self): def create_service_completion(self):
return _intermediary_low.Code.OK, 'An exuberant test "details" message!' return _intermediary_low.Code.OK, 'An exuberant test "details" message!'
def assertMetadataEqual(self, original_metadata, transmitted_metadata): def assertMetadataTransmitted(self, original_metadata, transmitted_metadata):
self.assertSequenceEqual(original_metadata, transmitted_metadata) # we need to filter out any additional metadata added in transmitted_metadata
# since implementations are allowed to add to what is sent (in any position)
keys, _ = zip(*original_metadata)
self.assertSequenceEqual(
original_metadata,
[x for x in transmitted_metadata if x[0] in keys])
class RoundTripTest(unittest.TestCase): class RoundTripTest(unittest.TestCase):

@ -161,8 +161,8 @@ class TransmissionTest(object):
raise NotImplementedError() raise NotImplementedError()
@abc.abstractmethod @abc.abstractmethod
def assertMetadataEqual(self, original_metadata, transmitted_metadata): def assertMetadataTransmitted(self, original_metadata, transmitted_metadata):
"""Asserts that two metadata objects are equal. """Asserts that transmitted_metadata contains original_metadata.
Args: Args:
original_metadata: A metadata object used in this test. original_metadata: A metadata object used in this test.
@ -170,7 +170,8 @@ class TransmissionTest(object):
through the system under test. through the system under test.
Raises: Raises:
AssertionError: if the two metadata objects are not equal. AssertionError: if the transmitted_metadata object does not contain
original_metadata.
""" """
raise NotImplementedError() raise NotImplementedError()
@ -239,7 +240,7 @@ class TransmissionTest(object):
self.assertFalse(initial_metadata_seen) self.assertFalse(initial_metadata_seen)
self.assertFalse(seen_payloads) self.assertFalse(seen_payloads)
self.assertFalse(terminal_metadata_seen) self.assertFalse(terminal_metadata_seen)
self.assertMetadataEqual(initial_metadata, ticket.initial_metadata) self.assertMetadataTransmitted(initial_metadata, ticket.initial_metadata)
initial_metadata_seen = True initial_metadata_seen = True
if ticket.payload is not None: if ticket.payload is not None:
@ -248,7 +249,7 @@ class TransmissionTest(object):
if ticket.terminal_metadata is not None: if ticket.terminal_metadata is not None:
self.assertFalse(terminal_metadata_seen) self.assertFalse(terminal_metadata_seen)
self.assertMetadataEqual(terminal_metadata, ticket.terminal_metadata) self.assertMetadataTransmitted(terminal_metadata, ticket.terminal_metadata)
terminal_metadata_seen = True terminal_metadata_seen = True
self.assertSequenceEqual(payloads, seen_payloads) self.assertSequenceEqual(payloads, seen_payloads)

@ -35,6 +35,14 @@ def load_test_certs
files.map { |f| File.open(File.join(test_root, f)).read } files.map { |f| File.open(File.join(test_root, f)).read }
end end
def check_md(wanted_md, received_md)
wanted_md.zip(received_md).each do |w, r|
w.each do |key, value|
expect(r[key]).to eq(value)
end
end
end
# A test message # A test message
class EchoMsg class EchoMsg
def self.marshal(_o) def self.marshal(_o)
@ -376,7 +384,7 @@ describe GRPC::RpcServer do
stub = EchoStub.new(@host, **client_opts) stub = EchoStub.new(@host, **client_opts)
expect(stub.an_rpc(req, k1: 'v1', k2: 'v2')).to be_a(EchoMsg) expect(stub.an_rpc(req, k1: 'v1', k2: 'v2')).to be_a(EchoMsg)
wanted_md = [{ 'k1' => 'v1', 'k2' => 'v2' }] wanted_md = [{ 'k1' => 'v1', 'k2' => 'v2' }]
expect(service.received_md).to eq(wanted_md) check_md(wanted_md, service.received_md)
@srv.stop @srv.stop
t.join t.join
end end
@ -391,7 +399,7 @@ describe GRPC::RpcServer do
deadline = service.delay + 1.0 # wait for long enough deadline = service.delay + 1.0 # wait for long enough
expect(stub.an_rpc(req, deadline, k1: 'v1', k2: 'v2')).to be_a(EchoMsg) expect(stub.an_rpc(req, deadline, k1: 'v1', k2: 'v2')).to be_a(EchoMsg)
wanted_md = [{ 'k1' => 'v1', 'k2' => 'v2' }] wanted_md = [{ 'k1' => 'v1', 'k2' => 'v2' }]
expect(service.received_md).to eq(wanted_md) check_md(wanted_md, service.received_md)
@srv.stop @srv.stop
t.join t.join
end end
@ -443,7 +451,7 @@ describe GRPC::RpcServer do
expect(stub.an_rpc(req, k1: 'v1', k2: 'v2')).to be_a(EchoMsg) expect(stub.an_rpc(req, k1: 'v1', k2: 'v2')).to be_a(EchoMsg)
wanted_md = [{ 'k1' => 'updated-v1', 'k2' => 'v2', wanted_md = [{ 'k1' => 'updated-v1', 'k2' => 'v2',
'jwt_aud_uri' => "https://#{@host}/EchoService" }] 'jwt_aud_uri' => "https://#{@host}/EchoService" }]
expect(service.received_md).to eq(wanted_md) check_md(wanted_md, service.received_md)
@srv.stop @srv.stop
t.join t.join
end end
@ -535,7 +543,9 @@ describe GRPC::RpcServer do
'method' => '/EchoService/an_rpc', 'method' => '/EchoService/an_rpc',
'connect_k1' => 'connect_v1' 'connect_k1' => 'connect_v1'
} }
expect(op.metadata).to eq(wanted_md) wanted_md.each do |key, value|
expect(op.metadata[key]).to eq(value)
end
@srv.stop @srv.stop
t.join t.join
end end

@ -211,6 +211,10 @@ void test_connect(const char *server_host, const char *client_host, int port,
drain_cq(cq); drain_cq(cq);
grpc_completion_queue_destroy(cq); grpc_completion_queue_destroy(cq);
grpc_metadata_array_destroy(&initial_metadata_recv);
grpc_metadata_array_destroy(&trailing_metadata_recv);
grpc_metadata_array_destroy(&request_metadata_recv);
grpc_call_details_destroy(&call_details); grpc_call_details_destroy(&call_details);
gpr_free(details); gpr_free(details);
} }

@ -415,7 +415,7 @@ TEST_F(AsyncEnd2endTest, ClientInitialMetadataRpc) {
auto client_initial_metadata = srv_ctx.client_metadata(); auto client_initial_metadata = srv_ctx.client_metadata();
EXPECT_EQ(meta1.second, client_initial_metadata.find(meta1.first)->second); EXPECT_EQ(meta1.second, client_initial_metadata.find(meta1.first)->second);
EXPECT_EQ(meta2.second, client_initial_metadata.find(meta2.first)->second); EXPECT_EQ(meta2.second, client_initial_metadata.find(meta2.first)->second);
EXPECT_EQ(static_cast<size_t>(2), client_initial_metadata.size()); EXPECT_GE(client_initial_metadata.size(), static_cast<size_t>(2));
send_response.set_message(recv_request.message()); send_response.set_message(recv_request.message());
response_writer.Finish(send_response, Status::OK, tag(3)); response_writer.Finish(send_response, Status::OK, tag(3));
@ -563,7 +563,7 @@ TEST_F(AsyncEnd2endTest, MetadataRpc) {
auto client_initial_metadata = srv_ctx.client_metadata(); auto client_initial_metadata = srv_ctx.client_metadata();
EXPECT_EQ(meta1.second, client_initial_metadata.find(meta1.first)->second); EXPECT_EQ(meta1.second, client_initial_metadata.find(meta1.first)->second);
EXPECT_EQ(meta2.second, client_initial_metadata.find(meta2.first)->second); EXPECT_EQ(meta2.second, client_initial_metadata.find(meta2.first)->second);
EXPECT_EQ(static_cast<size_t>(2), client_initial_metadata.size()); EXPECT_GE(client_initial_metadata.size(), static_cast<size_t>(2));
srv_ctx.AddInitialMetadata(meta3.first, meta3.second); srv_ctx.AddInitialMetadata(meta3.first, meta3.second);
srv_ctx.AddInitialMetadata(meta4.first, meta4.second); srv_ctx.AddInitialMetadata(meta4.first, meta4.second);
@ -574,7 +574,7 @@ TEST_F(AsyncEnd2endTest, MetadataRpc) {
auto server_initial_metadata = cli_ctx.GetServerInitialMetadata(); auto server_initial_metadata = cli_ctx.GetServerInitialMetadata();
EXPECT_EQ(meta3.second, server_initial_metadata.find(meta3.first)->second); EXPECT_EQ(meta3.second, server_initial_metadata.find(meta3.first)->second);
EXPECT_EQ(meta4.second, server_initial_metadata.find(meta4.first)->second); EXPECT_EQ(meta4.second, server_initial_metadata.find(meta4.first)->second);
EXPECT_EQ(static_cast<size_t>(2), server_initial_metadata.size()); EXPECT_GE(server_initial_metadata.size(), static_cast<size_t>(2));
send_response.set_message(recv_request.message()); send_response.set_message(recv_request.message());
srv_ctx.AddTrailingMetadata(meta5.first, meta5.second); srv_ctx.AddTrailingMetadata(meta5.first, meta5.second);
@ -590,7 +590,7 @@ TEST_F(AsyncEnd2endTest, MetadataRpc) {
auto server_trailing_metadata = cli_ctx.GetServerTrailingMetadata(); auto server_trailing_metadata = cli_ctx.GetServerTrailingMetadata();
EXPECT_EQ(meta5.second, server_trailing_metadata.find(meta5.first)->second); EXPECT_EQ(meta5.second, server_trailing_metadata.find(meta5.first)->second);
EXPECT_EQ(meta6.second, server_trailing_metadata.find(meta6.first)->second); EXPECT_EQ(meta6.second, server_trailing_metadata.find(meta6.first)->second);
EXPECT_EQ(static_cast<size_t>(2), server_trailing_metadata.size()); EXPECT_GE(server_trailing_metadata.size(), static_cast<size_t>(2));
} }
} // namespace } // namespace
} // namespace testing } // namespace testing

@ -249,9 +249,10 @@ class End2endTest : public ::testing::Test {
void TearDown() GRPC_OVERRIDE { server_->Shutdown(); } void TearDown() GRPC_OVERRIDE { server_->Shutdown(); }
void ResetStub() { void ResetStub() {
std::shared_ptr<ChannelInterface> channel = ChannelArguments args;
CreateChannel(server_address_.str(), FakeTransportSecurityCredentials(), args.SetString(GRPC_ARG_SECONDARY_USER_AGENT_STRING, "end2end_test");
ChannelArguments()); std::shared_ptr<ChannelInterface> channel = CreateChannel(
server_address_.str(), FakeTransportSecurityCredentials(), args);
stub_ = std::move(grpc::cpp::test::util::TestService::NewStub(channel)); stub_ = std::move(grpc::cpp::test::util::TestService::NewStub(channel));
} }

@ -44,8 +44,8 @@
namespace grpc { namespace grpc {
namespace testing { namespace testing {
static const int WARMUP = 5; static const int WARMUP = 20;
static const int BENCHMARK = 10; static const int BENCHMARK = 40;
static void RunQPS() { static void RunQPS() {
gpr_log(GPR_INFO, "Running QPS test"); gpr_log(GPR_INFO, "Running QPS test");
@ -53,8 +53,8 @@ static void RunQPS() {
ClientConfig client_config; ClientConfig client_config;
client_config.set_client_type(ASYNC_CLIENT); client_config.set_client_type(ASYNC_CLIENT);
client_config.set_enable_ssl(false); client_config.set_enable_ssl(false);
client_config.set_outstanding_rpcs_per_channel(1000); client_config.set_outstanding_rpcs_per_channel(10);
client_config.set_client_channels(8); client_config.set_client_channels(800);
client_config.set_payload_size(1); client_config.set_payload_size(1);
client_config.set_async_client_threads(8); client_config.set_async_client_threads(8);
client_config.set_rpc_type(UNARY); client_config.set_rpc_type(UNARY);
@ -62,7 +62,7 @@ static void RunQPS() {
ServerConfig server_config; ServerConfig server_config;
server_config.set_server_type(ASYNC_SERVER); server_config.set_server_type(ASYNC_SERVER);
server_config.set_enable_ssl(false); server_config.set_enable_ssl(false);
server_config.set_threads(4); server_config.set_threads(8);
const auto result = const auto result =
RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2); RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2);

Loading…
Cancel
Save