Merge branch 'new_op' into hpack_fix

pull/4033/head
Craig Tiller 9 years ago
commit 83e73373cb
  1. 140
      Makefile
  2. 23
      build.yaml
  3. 91
      doc/PROTOCOL-HTTP2.md
  4. 2
      include/grpc/support/histogram.h
  5. 3
      src/core/channel/channel_stack.h
  6. 19
      src/core/channel/subchannel_call_holder.h
  7. 8
      src/core/client_config/subchannel.c
  8. 11
      src/core/client_config/subchannel.h
  9. 2
      src/core/support/histogram.c
  10. 2
      src/core/surface/call.c
  11. 13
      src/core/transport/chttp2/frame_data.c
  12. 3
      src/core/transport/chttp2/frame_data.h
  13. 2
      src/core/transport/chttp2/internal.h
  14. 88
      src/core/transport/chttp2_transport.c
  15. 120
      test/core/client_config/lb_policies_test.c
  16. 70
      test/core/network_benchmarks/low_level_ping_pong.c
  17. 4
      test/core/surface/byte_buffer_reader_test.c
  18. 9
      test/cpp/qps/async_streaming_ping_pong_test.cc
  19. 9
      test/cpp/qps/async_unary_ping_pong_test.cc
  20. 154
      test/cpp/qps/client.h
  21. 41
      test/cpp/qps/client_async.cc
  22. 2
      test/cpp/qps/client_sync.cc
  23. 27
      test/cpp/qps/driver.cc
  24. 8
      test/cpp/qps/driver.h
  25. 4
      test/cpp/qps/histogram.h
  26. 2
      test/cpp/qps/perf_db.proto
  27. 18
      test/cpp/qps/qps-sweep.sh
  28. 119
      test/cpp/qps/qps_driver.cc
  29. 2
      test/cpp/qps/qps_interarrival_test.cc
  30. 9
      test/cpp/qps/qps_openloop_test.cc
  31. 9
      test/cpp/qps/qps_test.cc
  32. 9
      test/cpp/qps/qps_test_with_poll.cc
  33. 54
      test/cpp/qps/qps_worker.cc
  34. 6
      test/cpp/qps/qps_worker.h
  35. 7
      test/cpp/qps/report.cc
  36. 1
      test/cpp/qps/report.h
  37. 84
      test/cpp/qps/secure_sync_unary_ping_pong_test.cc
  38. 52
      test/cpp/qps/server.h
  39. 35
      test/cpp/qps/server_async.cc
  40. 25
      test/cpp/qps/server_sync.cc
  41. 4
      test/cpp/qps/single_run_localhost.sh
  42. 12
      test/cpp/qps/sync_streaming_ping_pong_test.cc
  43. 12
      test/cpp/qps/sync_unary_ping_pong_test.cc
  44. 2
      test/cpp/qps/timer.cc
  45. 2
      test/cpp/qps/timer.h
  46. 5
      test/cpp/qps/worker.cc
  47. 148
      test/proto/benchmarks/control.proto
  48. 55
      test/proto/benchmarks/payloads.proto
  49. 55
      test/proto/benchmarks/services.proto
  50. 59
      test/proto/benchmarks/stats.proto
  51. BIN
      tools/http2_interop/http2_interop.test
  52. 137
      tools/http2_interop/http2interop.go
  53. 95
      tools/http2_interop/http2interop_test.go
  54. 1
      tools/jenkins/build_docker_and_run_tests.sh
  55. 1
      tools/jenkins/build_interop_image.sh
  56. 36
      tools/jenkins/grpc_interop_http2/Dockerfile
  57. 42
      tools/jenkins/grpc_interop_http2/build_interop.sh
  58. 23
      tools/run_tests/jobset.py
  59. 28
      tools/run_tests/report_utils.py
  60. 57
      tools/run_tests/run_interop_tests.py
  61. 61
      tools/run_tests/run_tests.py
  62. 29
      tools/run_tests/sources_and_headers.json
  63. 16
      tools/run_tests/tests.json
  64. 40
      vsprojects/vcxproj/qps/qps.vcxproj
  65. 17
      vsprojects/vcxproj/qps/qps.vcxproj.filters

File diff suppressed because one or more lines are too long

@ -753,7 +753,11 @@ libs:
- test/cpp/qps/timer.h
- test/cpp/util/benchmark_config.h
src:
- test/proto/qpstest.proto
- test/proto/messages.proto
- test/proto/benchmarks/control.proto
- test/proto/benchmarks/payloads.proto
- test/proto/benchmarks/services.proto
- test/proto/benchmarks/stats.proto
- test/cpp/qps/perf_db.proto
- test/cpp/qps/client_async.cc
- test/cpp/qps/client_sync.cc
@ -2030,6 +2034,23 @@ targets:
- grpc
- gpr_test_util
- gpr
- name: secure_sync_unary_ping_pong_test
build: test
language: c++
src:
- test/cpp/qps/secure_sync_unary_ping_pong_test.cc
deps:
- qps
- grpc++_test_util
- grpc_test_util
- grpc++
- grpc
- gpr_test_util
- gpr
platforms:
- mac
- linux
- posix
- name: server_crash_test
build: test
language: c++

@ -10,58 +10,64 @@ Production rules are using <a href="http://tools.ietf.org/html/rfc5234">ABNF syn
The following is the general sequence of message atoms in a GRPC request & response message stream
* Request → Request-Headers *Delimited-Message EOS
* Response → (Response-Headers *Delimited-Message Trailers) / Trailers-Only
* Request → Request-Headers \*Length-Prefixed-Message EOS
* Response → (Response-Headers \*Length-Prefixed-Message Trailers) / Trailers-Only
### Requests
* Request → Request-Headers *Delimited-Message EOS
* Request → Request-Headers \*Length-Prefixed-Message EOS
Request-Headers are delivered as HTTP2 headers in HEADERS + CONTINUATION frames.
* **Request-Headers** → Call-Definition *Custom-Metadata
* **Call-Definition** → Method Scheme Path TE [Authority] [Timeout] [Content-Type] [Message-Type] [Message-Encoding] [Message-Accept-Encoding] [User-Agent]
* **Method**“:method POST”
* **Scheme**“:scheme ” (“http” / “https”)
* **Path**“:path” {_path identifying method within exposed API_}
* **Authority**“:authority” {_virtual host name of authority_}
* **TE**“te” “trailers” # Used to detect incompatible proxies
* **Timeout**“grpc-timeout” TimeoutValue TimeoutUnit
* **Request-Headers** → Call-Definition \*Custom-Metadata
* **Call-Definition** → Method Scheme Path TE [Authority] [Timeout] Content-Type [Message-Type] [Message-Encoding] [Message-Accept-Encoding] [User-Agent]
* **Method**":method POST"
* **Scheme**":scheme " ("http" / "https")
* **Path**":path" {_path identifying method within exposed API_}
* **Authority**":authority" {_virtual host name of authority_}
* **TE**"te" "trailers" # Used to detect incompatible proxies
* **Timeout**"grpc-timeout" TimeoutValue TimeoutUnit
* **TimeoutValue** → {_positive integer as ASCII string of at most 8 digits_}
* **TimeoutUnit** → Hour / Minute / Second / Millisecond / Microsecond / Nanosecond
* **Hour**“H”
* **Minute**“M”
* **Second**“S”
* **Millisecond**“m”
* **Microsecond**“u”
* **Nanosecond**“n”
* **Content-Type**“content-type” “application/grpc” [(“+proto” / “+json” / {_custom_})]
* **Content-Coding**“gzip” / “deflate” / “snappy” / {_custom_}
* **Message-Encoding**“grpc-encoding” Content-Coding
* **Message-Accept-Encoding**“grpc-accept-encoding” Content-Coding *("," Content-Coding)
* **User-Agent**“user-agent” {_structured user-agent string_}
* **Message-Type**“grpc-message-type” {_type name for message schema_}
* **Hour**"H"
* **Minute**"M"
* **Second**"S"
* **Millisecond**"m"
* **Microsecond**"u"
* **Nanosecond**"n"
* **Content-Type**"content-type" "application/grpc" [("+proto" / "+json" / {_custom_})]
* **Content-Coding**"identity" / "gzip" / "deflate" / "snappy" / {_custom_}
* **Message-Encoding**"grpc-encoding" Content-Coding
* **Message-Accept-Encoding**"grpc-accept-encoding" Content-Coding \*("," Content-Coding)
* **User-Agent**"user-agent" {_structured user-agent string_}
* **Message-Type**"grpc-message-type" {_type name for message schema_}
* **Custom-Metadata** → Binary-Header / ASCII-Header
* **Binary-Header** → {Header-Name “-bin” } {_base64 encoded value_}
* **Binary-Header** → {Header-Name "-bin" } {_base64 encoded value_}
* **ASCII-Header** → Header-Name {_value_}
* **Header-Name** → 1*( %x30-39 / %x61-7A / “_” / “-”) ; 0-9 a-z
* **Header-Name** → 1\*( %x30-39 / %x61-7A / "\_" / "-") ; 0-9 a-z \_ -
HTTP2 requires that reserved headers, ones starting with “:” appear before all other headers. Additionally implementations should send **Timeout** immediately after the reserved headers and they should send the **Call-Definition** headers before sending **Custom-Metadata**.
HTTP2 requires that reserved headers, ones starting with ":" appear before all other headers. Additionally implementations should send **Timeout** immediately after the reserved headers and they should send the **Call-Definition** headers before sending **Custom-Metadata**.
If **Timeout** is omitted a server should assume an infinite timeout. Client implementations are free to send a default minimum timeout based on their deployment requirements.
**Custom-Metadata** is an arbitrary set of key-value pairs defined by the application layer. Aside from transport limits on the total length of HTTP2 HEADERS the only other constraint is that header names starting with “grpc-” are reserved for future use.
**Custom-Metadata** is an arbitrary set of key-value pairs defined by the application layer. Header names starting with "grpc-" but not listed here are reserved for future GRPC use and should not be used by applications as **Custom-Metadata**.
Note that HTTP2 does not allow arbitrary octet sequences for header values so binary header values must be encoded using Base64 as per https://tools.ietf.org/html/rfc4648#section-4. Implementations MUST accept padded and un-padded values and should emit un-padded values. Applications define binary headers by having their names end with “-bin”. Runtime libraries use this suffix to detect binary headers and properly apply base64 encoding & decoding as headers are sent and received.
Note that HTTP2 does not allow arbitrary octet sequences for header values so binary header values must be encoded using Base64 as per https://tools.ietf.org/html/rfc4648#section-4. Implementations MUST accept padded and un-padded values and should emit un-padded values. Applications define binary headers by having their names end with "-bin". Runtime libraries use this suffix to detect binary headers and properly apply base64 encoding & decoding as headers are sent and received.
The repeated sequence of **Delimited-Message** items is delivered in DATA frames
Servers may limit the size of **Request-Headers**, with a default of 8 KiB
suggested. Implementations are encouraged to compute total header size like
HTTP/2's `SETTINGS_MAX_HEADER_LIST_SIZE`: the sum of all header fields, for each
field the sum of the uncompressed field name and value lengths plus 32, with
binary values' lengths being post-Base64.
* **Delimited-Message** → Compressed-Flag Message-Length Message
The repeated sequence of **Length-Prefixed-Message** items is delivered in DATA frames
* **Length-Prefixed-Message** → Compressed-Flag Message-Length Message
* **Compressed-Flag** → 0 / 1 # encoded as 1 byte unsigned integer
* **Message-Length** → {_length of Message_} # encoded as 4 byte unsigned integer
* **Message** → *{binary octet}
* **Message**\*{binary octet}
A **Compressed-Flag** value of 1 indicates that the binary octet sequence of **Message** is compressed using the mechanism declared by the **Message-Encoding** header. A value of 0 indicates that no encoding of **Message** bytes has occurred. Compression contexts are NOT maintained over message boundaries, implementations must create a new context for each message in the stream. If the **Message-Encoding** header is omitted then the **Compressed-Flag** must be 0.
@ -69,13 +75,13 @@ For requests, **EOS** (end-of-stream) is indicated by the presence of the END_ST
###Responses
* **Response** → (Response-Headers *Delimited-Message Trailers) / Trailers-Only
* **Response-Headers** → HTTP-Status [Message-Encoding] [Message-Accept-Encoding] Content-Type *Custom-Metadata
* **Response** → (Response-Headers \*Length-Prefixed-Message Trailers) / Trailers-Only
* **Response-Headers** → HTTP-Status [Message-Encoding] [Message-Accept-Encoding] Content-Type \*Custom-Metadata
* **Trailers-Only** → HTTP-Status Content-Type Trailers
* **Trailers** → Status [Status-Message] *Custom-Metadata
* **HTTP-Status**“:status 200”
* **Status**“grpc-status” <status-code-as-ASCII-string>
* **Status-Message**“grpc-message” <descriptive text for status as ASCII string>
* **Trailers** → Status [Status-Message] \*Custom-Metadata
* **HTTP-Status**":status 200"
* **Status**"grpc-status" <status-code-as-ASCII-string>
* **Status-Message**"grpc-message" <descriptive text for status as ASCII string>
**Response-Headers** & **Trailers-Only** are each delivered in a single HTTP2 HEADERS frame block. Most responses are expected to have both headers and trailers but **Trailers-Only** is permitted for calls that produce an immediate error. Status must be sent in **Trailers** even if the status code is OK.
@ -83,6 +89,9 @@ For responses end-of-stream is indicated by the presence of the END_STREAM flag
Implementations should expect broken deployments to send non-200 HTTP status codes in responses as well as a variety of non-GRPC content-types and to omit **Status** & **Status-Message**. Implementations must synthesize a **Status** & **Status-Message** to propagate to the application layer when this occurs.
Clients may limit the size of **Response-Headers**, **Trailers**, and
**Trailers-Only**, with a default of 8 KiB each suggested.
####Example
Sample unary-call showing HTTP2 framing sequence
@ -101,7 +110,7 @@ grpc-encoding = gzip
authorization = Bearer y235.wef315yfh138vh31hv93hv8h3v
DATA (flags = END_STREAM)
<Delimited Message>
<Length-Prefixed Message>
```
**Response**
```
@ -110,7 +119,7 @@ HEADERS (flags = END_HEADERS)
grpc-encoding = gzip
DATA
<Delimited Message>
<Length-Prefixed Message>
HEADERS (flags = END_STREAM, END_HEADERS)
grpc-status = 0 # OK
@ -120,7 +129,7 @@ trace-proto-bin = jher831yy13JHy3hc
While the protocol does not require a user-agent to function it is recommended that clients provide a structured user-agent string that provides a basic description of the calling library, version & platform to facilitate issue diagnosis in heterogeneous environments. The following structure is recommended to library developers
```
User-Agent → “grpc-” Language ?(“-” Variant) “/” Version ?( “ (“ *(AdditionalProperty “;”) “)” )
User-Agent → "grpc-" Language ?("-" Variant) "/" Version ?( " (" *(AdditionalProperty ";") ")" )
```
E.g.
@ -136,7 +145,7 @@ grpc-java-android/0.9.1 (gingerbread/1.2.4; nexus5; tmobile)
All GRPC calls need to specify an internal ID. We will use HTTP2 stream-ids as call identifiers in this scheme. NOTE: These id’s are contextual to an open HTTP2 session and will not be unique within a given process that is handling more than one HTTP2 session nor can they be used as GUIDs.
#####Data Frames
DATA frame boundaries have no relation to **Delimited-Message** boundaries and implementations should make no assumptions about their alignment.
DATA frame boundaries have no relation to **Length-Prefixed-Message** boundaries and implementations should make no assumptions about their alignment.
#####Errors

@ -50,7 +50,7 @@ void gpr_histogram_add(gpr_histogram *h, double x);
/* The following merges the second histogram into the first. It only works
if they have the same buckets and resolution. Returns 0 on failure, 1
on success */
int gpr_histogram_merge(gpr_histogram *dst, gpr_histogram *src);
int gpr_histogram_merge(gpr_histogram *dst, const gpr_histogram *src);
double gpr_histogram_percentile(gpr_histogram *histogram, double percentile);
double gpr_histogram_mean(gpr_histogram *histogram);

@ -214,7 +214,8 @@ void grpc_call_stack_set_pollset(grpc_exec_ctx *exec_ctx,
/* Destroy a call stack */
void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack);
/* Ignore set pollset */
/* Ignore set pollset - used by filters to implement the set_pollset method
if they don't care about pollsets at all. Does nothing. */
void grpc_call_stack_ignore_set_pollset(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_pollset *pollset);

@ -36,6 +36,10 @@
#include "src/core/client_config/subchannel.h"
/** Pick a subchannel for grpc_subchannel_call_holder;
Return 1 if subchannel is available immediately (in which case on_ready
should not be called), or 0 otherwise (in which case on_ready should be
called when the subchannel is available) */
typedef int (*grpc_subchannel_call_holder_pick_subchannel)(
grpc_exec_ctx *exec_ctx, void *arg, grpc_metadata_batch *initial_metadata,
grpc_subchannel **subchannel, grpc_closure *on_ready);
@ -46,10 +50,21 @@ typedef enum {
GRPC_SUBCHANNEL_CALL_HOLDER_CREATING_CALL
} grpc_subchannel_call_holder_creation_phase;
/** Wrapper for holding a pointer to grpc_subchannel_call, and the
associated machinery to create such a pointer.
Handles queueing of stream ops until a call object is ready, waiting
for initial metadata before trying to create a call object,
and handling cancellation gracefully.
Both the channel and uchannel filter use this as their call_data. */
typedef struct grpc_subchannel_call_holder {
/* either 0 for no call, 1 for cancelled, or a pointer to a
grpc_subchannel_call */
/** either 0 for no call, 1 for cancelled, or a pointer to a
grpc_subchannel_call */
gpr_atm subchannel_call;
/** Helper function to choose the subchannel on which to create
the call object. Channel filter delegates to the load
balancing policy (once it's ready); uchannel returns
immediately */
grpc_subchannel_call_holder_pick_subchannel pick_subchannel;
void *pick_subchannel_arg;

@ -22,7 +22,7 @@
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMA`S (INCLUDING, BUT NOT
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
@ -395,12 +395,12 @@ static void start_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
static void continue_creating_call(grpc_exec_ctx *exec_ctx, void *arg,
int iomgr_success) {
int call_creation_status;
int call_creation_finished_ok;
waiting_for_connect *w4c = arg;
grpc_subchannel_del_interested_party(exec_ctx, w4c->subchannel, w4c->pollset);
call_creation_status = grpc_subchannel_create_call(
call_creation_finished_ok = grpc_subchannel_create_call(
exec_ctx, w4c->subchannel, w4c->pollset, w4c->target, w4c->notify);
GPR_ASSERT(call_creation_status == 1);
GPR_ASSERT(call_creation_finished_ok == 1);
w4c->notify->cb(exec_ctx, w4c->notify->cb_arg, iomgr_success);
GRPC_SUBCHANNEL_UNREF(exec_ctx, w4c->subchannel, "waiting_for_connect");
gpr_free(w4c);

@ -77,12 +77,11 @@ void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx,
/** construct a subchannel call (possibly asynchronously).
*
* If the returned status is \a GRPC_SUBCHANNEL_CALL_CREATE_READY, the call will
* return immediately and \a target will point to a connected \a subchannel_call
* instance. Note that \a notify will \em not be invoked in this case.
* Otherwise, if the returned status is GRPC_SUBCHANNEL_CALL_CREATE_PENDING, the
* subchannel call will be created asynchronously, invoking the \a notify
* callback upon completion. */
* If the returned status is 1, the call will return immediately and \a target
* will point to a connected \a subchannel_call instance. Note that \a notify
* will \em not be invoked in this case.
* Otherwise, if the returned status is 0, the subchannel call will be created
* asynchronously, invoking the \a notify callback upon completion. */
int grpc_subchannel_create_call(grpc_exec_ctx *exec_ctx,
grpc_subchannel *subchannel,
grpc_pollset *pollset, gpr_atm *target,

@ -125,7 +125,7 @@ void gpr_histogram_add(gpr_histogram *h, double x) {
h->buckets[bucket_for(h, x)]++;
}
int gpr_histogram_merge(gpr_histogram *dst, gpr_histogram *src) {
int gpr_histogram_merge(gpr_histogram *dst, const gpr_histogram *src) {
if ((dst->num_buckets != src->num_buckets) ||
(dst->multiplier != src->multiplier)) {
/* Fail because these histograms don't match */

@ -944,12 +944,12 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx,
batch_control *bctl) {
grpc_call *call = bctl->call;
if (bctl->is_notify_tag_closure) {
grpc_exec_ctx_enqueue(exec_ctx, bctl->notify_tag, bctl->success);
gpr_mu_lock(&call->mu);
bctl->call->used_batches =
(gpr_uint8)(bctl->call->used_batches &
~(gpr_uint8)(1 << (bctl - bctl->call->active_batches)));
gpr_mu_unlock(&call->mu);
grpc_exec_ctx_enqueue(exec_ctx, bctl->notify_tag, bctl->success);
GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completion");
} else {
grpc_cq_end_op(exec_ctx, bctl->call->cq, bctl->notify_tag, bctl->success,

@ -45,11 +45,16 @@
grpc_chttp2_parse_error grpc_chttp2_data_parser_init(
grpc_chttp2_data_parser *parser) {
parser->state = GRPC_CHTTP2_DATA_FH_0;
parser->parsing_frame = NULL;
return GRPC_CHTTP2_PARSE_OK;
}
void grpc_chttp2_data_parser_destroy(grpc_chttp2_data_parser *parser) {
void grpc_chttp2_data_parser_destroy(grpc_exec_ctx *exec_ctx,
grpc_chttp2_data_parser *parser) {
grpc_byte_stream *bs;
if (parser->parsing_frame) {
grpc_chttp2_incoming_byte_stream_finished(exec_ctx, parser->parsing_frame);
}
while (
(bs = grpc_chttp2_incoming_frame_queue_pop(&parser->incoming_frames))) {
grpc_byte_stream_destroy(bs);
@ -198,8 +203,8 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse(
}
p->parsing_frame = incoming_byte_stream =
grpc_chttp2_incoming_byte_stream_create(
transport_parsing, stream_parsing, p->frame_size, message_flags,
&p->incoming_frames);
exec_ctx, transport_parsing, stream_parsing, p->frame_size,
message_flags, &p->incoming_frames);
/* fallthrough */
case GRPC_CHTTP2_DATA_FRAME:
if (cur == end) {
@ -214,6 +219,7 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse(
exec_ctx, p->parsing_frame,
gpr_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg)));
grpc_chttp2_incoming_byte_stream_finished(exec_ctx, p->parsing_frame);
p->parsing_frame = NULL;
p->state = GRPC_CHTTP2_DATA_FH_0;
return GRPC_CHTTP2_PARSE_OK;
} else if ((gpr_uint32)(end - cur) > p->frame_size) {
@ -222,6 +228,7 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse(
gpr_slice_sub(slice, (size_t)(cur - beg),
(size_t)(cur + p->frame_size - beg)));
grpc_chttp2_incoming_byte_stream_finished(exec_ctx, p->parsing_frame);
p->parsing_frame = NULL;
cur += p->frame_size;
goto fh_0; /* loop */
} else {

@ -80,7 +80,8 @@ grpc_byte_stream *grpc_chttp2_incoming_frame_queue_pop(
grpc_chttp2_parse_error grpc_chttp2_data_parser_init(
grpc_chttp2_data_parser *parser);
void grpc_chttp2_data_parser_destroy(grpc_chttp2_data_parser *parser);
void grpc_chttp2_data_parser_destroy(grpc_exec_ctx *exec_ctx,
grpc_chttp2_data_parser *parser);
/* start processing a new data frame */
grpc_chttp2_parse_error grpc_chttp2_data_parser_begin_frame(

@ -738,7 +738,7 @@ void grpc_chttp2_stream_unref(grpc_exec_ctx *exec_ctx,
#endif
grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
grpc_chttp2_transport_parsing *transport_parsing,
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_parsing *stream_parsing, gpr_uint32 frame_size,
gpr_uint32 flags, grpc_chttp2_incoming_frame_queue *add_to_queue);
void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx,

@ -512,7 +512,7 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
GPR_ASSERT(s->global.recv_initial_metadata_finished == NULL);
GPR_ASSERT(s->global.recv_message_ready == NULL);
GPR_ASSERT(s->global.recv_trailing_metadata_finished == NULL);
grpc_chttp2_data_parser_destroy(&s->parsing.data_parser);
grpc_chttp2_data_parser_destroy(exec_ctx, &s->parsing.data_parser);
grpc_chttp2_incoming_metadata_buffer_destroy(&s->parsing.metadata_buffer[0]);
grpc_chttp2_incoming_metadata_buffer_destroy(&s->parsing.metadata_buffer[1]);
grpc_chttp2_incoming_metadata_buffer_destroy(
@ -806,7 +806,8 @@ static void perform_stream_op_locked(
}
if (stream_global->write_closed) {
grpc_chttp2_complete_closure_step(
exec_ctx, &stream_global->send_trailing_metadata_finished, 0);
exec_ctx, &stream_global->send_trailing_metadata_finished,
grpc_metadata_batch_is_empty(op->send_trailing_metadata));
} else if (stream_global->id != 0) {
/* TODO(ctiller): check if there's flow control for any outstanding
bytes before going writable */
@ -1364,6 +1365,46 @@ static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
* BYTE STREAM
*/
static void incoming_byte_stream_update_flow_control(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global, size_t max_size_hint,
size_t have_already) {
gpr_uint32 max_recv_bytes;
/* clamp max recv hint to an allowable size */
if (max_size_hint >= GPR_UINT32_MAX - GRPC_CHTTP2_STREAM_LOOKAHEAD) {
max_recv_bytes = GPR_UINT32_MAX - GRPC_CHTTP2_STREAM_LOOKAHEAD;
} else {
max_recv_bytes = (gpr_uint32)max_size_hint;
}
/* account for bytes already received but unknown to higher layers */
if (max_recv_bytes >= have_already) {
max_recv_bytes -= (gpr_uint32)have_already;
} else {
max_recv_bytes = 0;
}
/* add some small lookahead to keep pipelines flowing */
GPR_ASSERT(max_recv_bytes <= GPR_UINT32_MAX - GRPC_CHTTP2_STREAM_LOOKAHEAD);
max_recv_bytes += GRPC_CHTTP2_STREAM_LOOKAHEAD;
if (stream_global->max_recv_bytes < max_recv_bytes) {
gpr_uint32 add_max_recv_bytes =
max_recv_bytes - stream_global->max_recv_bytes;
GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", transport_global, stream_global,
max_recv_bytes, add_max_recv_bytes);
GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", transport_global, stream_global,
unannounced_incoming_window_for_parse,
add_max_recv_bytes);
GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", transport_global, stream_global,
unannounced_incoming_window_for_writing,
add_max_recv_bytes);
grpc_chttp2_list_add_unannounced_incoming_window_available(transport_global,
stream_global);
grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
}
}
static int incoming_byte_stream_next(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream,
gpr_slice *slice, size_t max_size_hint,
@ -1372,41 +1413,11 @@ static int incoming_byte_stream_next(grpc_exec_ctx *exec_ctx,
(grpc_chttp2_incoming_byte_stream *)byte_stream;
grpc_chttp2_transport_global *transport_global = &bs->transport->global;
grpc_chttp2_stream_global *stream_global = &bs->stream->global;
gpr_uint32 max_recv_bytes;
lock(bs->transport);
if (bs->is_tail) {
/* clamp max recv hint to an allowable size */
if (max_size_hint >= GPR_UINT32_MAX - GRPC_CHTTP2_STREAM_LOOKAHEAD) {
max_recv_bytes = GPR_UINT32_MAX - GRPC_CHTTP2_STREAM_LOOKAHEAD;
} else {
max_recv_bytes = (gpr_uint32)max_size_hint;
}
/* account for bytes already received but unknown to higher layers */
if (max_recv_bytes >= bs->slices.length) {
max_recv_bytes -= (gpr_uint32)bs->slices.length;
} else {
max_recv_bytes = 0;
}
/* add some small lookahead to keep pipelines flowing */
GPR_ASSERT(max_recv_bytes <= GPR_UINT32_MAX - GRPC_CHTTP2_STREAM_LOOKAHEAD);
max_recv_bytes += GRPC_CHTTP2_STREAM_LOOKAHEAD;
if (stream_global->max_recv_bytes < max_recv_bytes) {
gpr_uint32 add_max_recv_bytes =
max_recv_bytes - stream_global->max_recv_bytes;
GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", transport_global, stream_global,
max_recv_bytes, add_max_recv_bytes);
GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", transport_global, stream_global,
unannounced_incoming_window_for_parse,
add_max_recv_bytes);
GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", transport_global, stream_global,
unannounced_incoming_window_for_writing,
add_max_recv_bytes);
grpc_chttp2_list_add_unannounced_incoming_window_available(
transport_global, stream_global);
grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
}
incoming_byte_stream_update_flow_control(transport_global, stream_global,
max_size_hint, bs->slices.length);
}
if (bs->slices.count > 0) {
*slice = gpr_slice_buffer_take_first(&bs->slices);
@ -1451,7 +1462,7 @@ void grpc_chttp2_incoming_byte_stream_finished(
}
grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
grpc_chttp2_transport_parsing *transport_parsing,
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_parsing *stream_parsing, gpr_uint32 frame_size,
gpr_uint32 flags, grpc_chttp2_incoming_frame_queue *add_to_queue) {
grpc_chttp2_incoming_byte_stream *incoming_byte_stream =
@ -1474,6 +1485,13 @@ grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
add_to_queue->tail->next_message = incoming_byte_stream;
}
add_to_queue->tail = incoming_byte_stream;
if (frame_size == 0) {
lock(TRANSPORT_FROM_PARSING(transport_parsing));
incoming_byte_stream_update_flow_control(
&TRANSPORT_FROM_PARSING(transport_parsing)->global,
&STREAM_FROM_PARSING(stream_parsing)->global, 0, 0);
unlock(exec_ctx, TRANSPORT_FROM_PARSING(transport_parsing));
}
return incoming_byte_stream;
}

@ -119,14 +119,15 @@ static void test_spec_destroy(test_spec *spec) {
static void *tag(gpr_intptr t) { return (void *)t; }
static gpr_timespec n_seconds_time(int n) {
return GRPC_TIMEOUT_SECONDS_TO_DEADLINE(n);
static gpr_timespec n_millis_time(int n) {
return gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
gpr_time_from_millis(n, GPR_TIMESPAN));
}
static void drain_cq(grpc_completion_queue *cq) {
grpc_event ev;
do {
ev = grpc_completion_queue_next(cq, n_seconds_time(5), NULL);
ev = grpc_completion_queue_next(cq, n_millis_time(5000), NULL);
} while (ev.type != GRPC_QUEUE_SHUTDOWN);
}
@ -134,29 +135,47 @@ static void kill_server(const servers_fixture *f, size_t i) {
gpr_log(GPR_INFO, "KILLING SERVER %d", i);
GPR_ASSERT(f->servers[i] != NULL);
grpc_server_shutdown_and_notify(f->servers[i], f->cq, tag(10000));
GPR_ASSERT(grpc_completion_queue_pluck(f->cq, tag(10000),
GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5),
NULL).type == GRPC_OP_COMPLETE);
GPR_ASSERT(
grpc_completion_queue_pluck(f->cq, tag(10000), n_millis_time(5000), NULL)
.type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->servers[i]);
f->servers[i] = NULL;
}
static void revive_server(const servers_fixture *f, size_t i) {
typedef struct request_data {
grpc_metadata_array initial_metadata_recv;
grpc_metadata_array trailing_metadata_recv;
char *details;
size_t details_capacity;
grpc_status_code status;
grpc_call_details *call_details;
} request_data;
static void revive_server(const servers_fixture *f, request_data *rdata,
size_t i) {
int got_port;
gpr_log(GPR_INFO, "RAISE AGAIN SERVER %d", i);
GPR_ASSERT(f->servers[i] == NULL);
gpr_log(GPR_DEBUG, "revive: %s", f->servers_hostports[i]);
f->servers[i] = grpc_server_create(NULL, NULL);
grpc_server_register_completion_queue(f->servers[i], f->cq, NULL);
GPR_ASSERT((got_port = grpc_server_add_insecure_http2_port(
f->servers[i], f->servers_hostports[i])) > 0);
grpc_server_start(f->servers[i]);
GPR_ASSERT(GRPC_CALL_OK ==
grpc_server_request_call(f->servers[i], &f->server_calls[i],
&rdata->call_details[i],
&f->request_metadata_recv[i], f->cq,
f->cq, tag(1000 + (int)i)));
}
static servers_fixture *setup_servers(const char *server_host,
request_data *rdata,
const size_t num_servers) {
servers_fixture *f = gpr_malloc(sizeof(servers_fixture));
int *ports;
int got_port;
size_t i;
f->num_servers = num_servers;
@ -164,23 +183,16 @@ static servers_fixture *setup_servers(const char *server_host,
f->request_metadata_recv =
gpr_malloc(sizeof(grpc_metadata_array) * num_servers);
/* Create servers. */
ports = gpr_malloc(sizeof(int *) * num_servers);
f->servers = gpr_malloc(sizeof(grpc_server *) * num_servers);
f->servers_hostports = gpr_malloc(sizeof(char *) * num_servers);
f->cq = grpc_completion_queue_create(NULL);
for (i = 0; i < num_servers; i++) {
ports[i] = grpc_pick_unused_port_or_die();
gpr_join_host_port(&f->servers_hostports[i], server_host, ports[i]);
f->servers[i] = grpc_server_create(NULL, NULL);
grpc_server_register_completion_queue(f->servers[i], f->cq, NULL);
GPR_ASSERT((got_port = grpc_server_add_insecure_http2_port(
f->servers[i], f->servers_hostports[i])) > 0);
GPR_ASSERT(ports[i] == got_port);
grpc_server_start(f->servers[i]);
grpc_metadata_array_init(&f->request_metadata_recv[i]);
gpr_join_host_port(&f->servers_hostports[i], server_host,
grpc_pick_unused_port_or_die());
f->servers[i] = 0;
revive_server(f, rdata, i);
}
gpr_free(ports);
return f;
}
@ -191,8 +203,8 @@ static void teardown_servers(servers_fixture *f) {
if (f->servers[i] == NULL) continue;
grpc_server_shutdown_and_notify(f->servers[i], f->cq, tag(10000));
GPR_ASSERT(grpc_completion_queue_pluck(f->cq, tag(10000),
GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5),
NULL).type == GRPC_OP_COMPLETE);
n_millis_time(5000), NULL)
.type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->servers[i]);
}
grpc_completion_queue_shutdown(f->cq);
@ -203,6 +215,7 @@ static void teardown_servers(servers_fixture *f) {
for (i = 0; i < f->num_servers; i++) {
gpr_free(f->servers_hostports[i]);
grpc_metadata_array_destroy(&f->request_metadata_recv[i]);
}
gpr_free(f->servers_hostports);
@ -211,22 +224,12 @@ static void teardown_servers(servers_fixture *f) {
gpr_free(f);
}
typedef struct request_data {
grpc_metadata_array initial_metadata_recv;
grpc_metadata_array trailing_metadata_recv;
char *details;
size_t details_capacity;
grpc_status_code status;
grpc_call_details *call_details;
} request_data;
/** Returns connection sequence (server indices), which must be freed */
int *perform_request(servers_fixture *f, grpc_channel *client,
request_data *rdata, const test_spec *spec) {
grpc_call *c;
int s_idx;
int *s_valid;
gpr_timespec deadline;
grpc_op ops[6];
grpc_op *op;
int was_cancelled;
@ -237,7 +240,6 @@ int *perform_request(servers_fixture *f, grpc_channel *client,
int completed_client;
s_valid = gpr_malloc(sizeof(int) * f->num_servers);
rdata->call_details = gpr_malloc(sizeof(grpc_call_details) * f->num_servers);
connection_sequence = gpr_malloc(sizeof(int) * spec->num_iters);
/* Send a trivial request. */
@ -253,7 +255,7 @@ int *perform_request(servers_fixture *f, grpc_channel *client,
kill_server(f, i);
} else if (spec->revive_at[iter_num][i] != 0) {
/* killing takes precedence */
revive_server(f, i);
revive_server(f, rdata, i);
}
}
@ -266,9 +268,9 @@ int *perform_request(servers_fixture *f, grpc_channel *client,
}
memset(s_valid, 0, f->num_servers * sizeof(int));
deadline = n_seconds_time(1);
c = grpc_channel_create_call(client, NULL, GRPC_PROPAGATE_DEFAULTS, f->cq,
"/foo", "foo.test.google.fr", deadline, NULL);
"/foo", "foo.test.google.fr", gpr_inf_future(GPR_CLOCK_REALTIME),
NULL);
GPR_ASSERT(c);
completed_client = 0;
@ -300,22 +302,9 @@ int *perform_request(servers_fixture *f, grpc_channel *client,
GPR_ASSERT(GRPC_CALL_OK ==
grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(1), NULL));
/* "listen" on all servers */
for (i = 0; i < f->num_servers; i++) {
grpc_metadata_array_init(&f->request_metadata_recv[i]);
if (f->servers[i] != NULL) {
GPR_ASSERT(GRPC_CALL_OK ==
grpc_server_request_call(f->servers[i], &f->server_calls[i],
&rdata->call_details[i],
&f->request_metadata_recv[i], f->cq,
f->cq, tag(1000 + (int)i)));
}
}
s_idx = -1;
while ((ev = grpc_completion_queue_next(
f->cq, GRPC_TIMEOUT_MILLIS_TO_DEADLINE(300), NULL)).type !=
GRPC_QUEUE_TIMEOUT) {
while ((ev = grpc_completion_queue_next(f->cq, n_millis_time(s_idx == -1 ? 3000 : 200), NULL))
.type != GRPC_QUEUE_TIMEOUT) {
GPR_ASSERT(ev.type == GRPC_OP_COMPLETE);
read_tag = ((int)(gpr_intptr)ev.tag);
gpr_log(GPR_DEBUG, "EVENT: success:%d, type:%d, tag:%d iter:%d",
@ -327,11 +316,14 @@ int *perform_request(servers_fixture *f, grpc_channel *client,
s_valid[s_idx] = 1;
connection_sequence[iter_num] = s_idx;
} else if (read_tag == 1) {
gpr_log(GPR_DEBUG, "client timed out");
GPR_ASSERT(ev.success);
completed_client = 1;
}
}
gpr_log(GPR_DEBUG, "s_idx=%d", s_idx);
if (s_idx >= 0) {
op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
@ -361,25 +353,30 @@ int *perform_request(servers_fixture *f, grpc_channel *client,
}
cq_verify(cqv);
gpr_log(GPR_DEBUG, "status=%d; %s", rdata->status, rdata->details);
GPR_ASSERT(rdata->status == GRPC_STATUS_UNIMPLEMENTED);
GPR_ASSERT(0 == strcmp(rdata->details, "xyz"));
GPR_ASSERT(0 == strcmp(rdata->call_details[s_idx].method, "/foo"));
GPR_ASSERT(0 ==
strcmp(rdata->call_details[s_idx].host, "foo.test.google.fr"));
GPR_ASSERT(was_cancelled == 1);
grpc_call_destroy(f->server_calls[s_idx]);
/* ask for the next request on this server */
GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(
f->servers[s_idx], &f->server_calls[s_idx],
&rdata->call_details[s_idx],
&f->request_metadata_recv[s_idx], f->cq,
f->cq, tag(1000 + (int)s_idx)));
} else {
grpc_call_cancel(c, NULL);
if (!completed_client) {
cq_expect_completion(cqv, tag(1), 1);
cq_verify(cqv);
}
}
for (i = 0; i < f->num_servers; i++) {
if (s_valid[i] != 0) {
grpc_call_destroy(f->server_calls[i]);
}
grpc_metadata_array_destroy(&f->request_metadata_recv[i]);
}
grpc_metadata_array_destroy(&rdata->initial_metadata_recv);
grpc_metadata_array_destroy(&rdata->trailing_metadata_recv);
@ -393,7 +390,6 @@ int *perform_request(servers_fixture *f, grpc_channel *client,
gpr_free(rdata->details);
}
gpr_free(rdata->call_details);
gpr_free(s_valid);
return connection_sequence;
@ -456,7 +452,10 @@ void run_spec(const test_spec *spec) {
char *servers_hostports_str;
int *actual_connection_sequence;
request_data rdata;
servers_fixture *f = setup_servers("127.0.0.1", spec->num_servers);
servers_fixture *f;
rdata.call_details =
gpr_malloc(sizeof(grpc_call_details) * spec->num_servers);
f = setup_servers("127.0.0.1", &rdata, spec->num_servers);
/* Create client. */
servers_hostports_str = gpr_strjoin_sep((const char **)f->servers_hostports,
@ -475,6 +474,7 @@ void run_spec(const test_spec *spec) {
gpr_free(client_hostport);
gpr_free(servers_hostports_str);
gpr_free(actual_connection_sequence);
gpr_free(rdata.call_details);
grpc_channel_destroy(client);
teardown_servers(f);

@ -82,9 +82,9 @@ typedef struct thread_args {
/* Basic call to read() */
static int read_bytes(int fd, char *buf, size_t read_size, int spin) {
size_t bytes_read = 0;
int err;
ssize_t err;
do {
err = (int)read(fd, buf + bytes_read, read_size - bytes_read);
err = read(fd, buf + bytes_read, read_size - bytes_read);
if (err < 0) {
if (errno == EINTR) {
continue;
@ -115,6 +115,7 @@ static int poll_read_bytes(int fd, char *buf, size_t read_size, int spin) {
struct pollfd pfd;
size_t bytes_read = 0;
int err;
ssize_t err2;
pfd.fd = fd;
pfd.events = POLLIN;
@ -132,13 +133,13 @@ static int poll_read_bytes(int fd, char *buf, size_t read_size, int spin) {
GPR_ASSERT(err == 1);
GPR_ASSERT(pfd.revents == POLLIN);
do {
err = (int)read(fd, buf + bytes_read, read_size - bytes_read);
} while (err < 0 && errno == EINTR);
if (err < 0 && errno != EAGAIN) {
err2 = read(fd, buf + bytes_read, read_size - bytes_read);
} while (err2 < 0 && errno == EINTR);
if (err2 < 0 && errno != EAGAIN) {
gpr_log(GPR_ERROR, "Read failed: %s", strerror(errno));
return -1;
}
bytes_read += (size_t)err;
bytes_read += (size_t) err2;
} while (bytes_read < read_size);
return 0;
}
@ -157,6 +158,7 @@ static int epoll_read_bytes(struct thread_args *args, char *buf, int spin) {
struct epoll_event ev;
size_t bytes_read = 0;
int err;
ssize_t err2;
size_t read_size = args->msg_size;
do {
@ -172,11 +174,11 @@ static int epoll_read_bytes(struct thread_args *args, char *buf, int spin) {
GPR_ASSERT(ev.data.fd == args->fds.read_fd);
do {
do {
err = (int)read(args->fds.read_fd, buf + bytes_read,
read_size - bytes_read);
} while (err < 0 && errno == EINTR);
err2 = read(args->fds.read_fd, buf + bytes_read,
read_size - bytes_read);
} while (err2 < 0 && errno == EINTR);
if (errno == EAGAIN) break;
bytes_read += (size_t)err;
bytes_read += (size_t) err2;
/* TODO(klempner): This should really be doing an extra call after we are
done to ensure we see an EAGAIN */
} while (bytes_read < read_size);
@ -200,11 +202,11 @@ static int epoll_read_bytes_spin(struct thread_args *args, char *buf) {
*/
static int blocking_write_bytes(struct thread_args *args, char *buf) {
size_t bytes_written = 0;
int err;
ssize_t err;
size_t write_size = args->msg_size;
do {
err = (int)write(args->fds.write_fd, buf + bytes_written,
write_size - bytes_written);
err = write(args->fds.write_fd, buf + bytes_written,
write_size - bytes_written);
if (err < 0) {
if (errno == EINTR) {
continue;
@ -298,7 +300,7 @@ static void print_histogram(gpr_histogram *histogram) {
static double now(void) {
gpr_timespec tv = gpr_now(GPR_CLOCK_REALTIME);
return 1e9 * (double)tv.tv_sec + tv.tv_nsec;
return 1e9 * (double)tv.tv_sec + (double)tv.tv_nsec;
}
static void client_thread(thread_args *args) {
@ -374,7 +376,7 @@ error:
return -1;
}
static int connect_client(struct sockaddr *addr, int len) {
static int connect_client(struct sockaddr *addr, socklen_t len) {
int fd = socket(addr->sa_family, SOCK_STREAM, 0);
int err;
if (fd < 0) {
@ -388,7 +390,7 @@ static int connect_client(struct sockaddr *addr, int len) {
}
do {
err = (int)connect(fd, addr, (socklen_t)len);
err = connect(fd, addr, len);
} while (err < 0 && errno == EINTR);
if (err < 0) {
@ -587,27 +589,27 @@ static int run_benchmark(char *socket_type, thread_args *client_args,
return 0;
}
static int run_all_benchmarks(int msg_size) {
static int run_all_benchmarks(size_t msg_size) {
int error = 0;
size_t i;
for (i = 0; i < GPR_ARRAY_SIZE(test_strategies); ++i) {
test_strategy *ts = &test_strategies[i];
test_strategy *strategy = &test_strategies[i];
size_t j;
for (j = 0; j < GPR_ARRAY_SIZE(socket_types); ++j) {
thread_args *client_args = malloc(sizeof(thread_args));
thread_args *server_args = malloc(sizeof(thread_args));
char *socket_type = socket_types[j];
client_args->read_bytes = ts->read_strategy;
client_args->read_bytes = strategy->read_strategy;
client_args->write_bytes = blocking_write_bytes;
client_args->setup = ts->setup;
client_args->msg_size = (size_t)msg_size;
client_args->strategy_name = ts->name;
server_args->read_bytes = ts->read_strategy;
client_args->setup = strategy->setup;
client_args->msg_size = msg_size;
client_args->strategy_name = strategy->name;
server_args->read_bytes = strategy->read_strategy;
server_args->write_bytes = blocking_write_bytes;
server_args->setup = ts->setup;
server_args->msg_size = (size_t)msg_size;
server_args->strategy_name = ts->name;
server_args->setup = strategy->setup;
server_args->msg_size = msg_size;
server_args->strategy_name = strategy->name;
error = run_benchmark(socket_type, client_args, server_args);
if (error < 0) {
return error;
@ -624,7 +626,7 @@ int main(int argc, char **argv) {
char *read_strategy = NULL;
char *socket_type = NULL;
size_t i;
const test_strategy *ts = NULL;
const test_strategy *strategy = NULL;
int error = 0;
gpr_cmdline *cmdline =
@ -644,7 +646,7 @@ int main(int argc, char **argv) {
if (read_strategy == NULL) {
gpr_log(GPR_INFO, "No strategy specified, running all benchmarks");
return run_all_benchmarks(msg_size);
return run_all_benchmarks((size_t)msg_size);
}
if (socket_type == NULL) {
@ -658,22 +660,22 @@ int main(int argc, char **argv) {
for (i = 0; i < GPR_ARRAY_SIZE(test_strategies); ++i) {
if (strcmp(test_strategies[i].name, read_strategy) == 0) {
ts = &test_strategies[i];
strategy = &test_strategies[i];
}
}
if (ts == NULL) {
if (strategy == NULL) {
fprintf(stderr, "Invalid read strategy %s\n", read_strategy);
return -1;
}
client_args->read_bytes = ts->read_strategy;
client_args->read_bytes = strategy->read_strategy;
client_args->write_bytes = blocking_write_bytes;
client_args->setup = ts->setup;
client_args->setup = strategy->setup;
client_args->msg_size = (size_t)msg_size;
client_args->strategy_name = read_strategy;
server_args->read_bytes = ts->read_strategy;
server_args->read_bytes = strategy->read_strategy;
server_args->write_bytes = blocking_write_bytes;
server_args->setup = ts->setup;
server_args->setup = strategy->setup;
server_args->msg_size = (size_t)msg_size;
server_args->strategy_name = read_strategy;

@ -185,8 +185,8 @@ static void test_byte_buffer_from_reader(void) {
}
static void test_readall(void) {
const char* lotsa_as[512];
const char* lotsa_bs[1024];
char* lotsa_as[512];
char* lotsa_bs[1024];
gpr_slice slices[2];
grpc_byte_buffer *buffer;
grpc_byte_buffer_reader reader;

@ -35,8 +35,6 @@
#include <grpc/support/log.h>
#include <signal.h>
#include "test/cpp/qps/driver.h"
#include "test/cpp/qps/report.h"
#include "test/cpp/util/benchmark_config.h"
@ -52,17 +50,15 @@ static void RunAsyncStreamingPingPong() {
ClientConfig client_config;
client_config.set_client_type(ASYNC_CLIENT);
client_config.set_enable_ssl(false);
client_config.set_outstanding_rpcs_per_channel(1);
client_config.set_client_channels(1);
client_config.set_payload_size(1);
client_config.set_async_client_threads(1);
client_config.set_rpc_type(STREAMING);
client_config.mutable_load_params()->mutable_closed_loop();
ServerConfig server_config;
server_config.set_server_type(ASYNC_SERVER);
server_config.set_enable_ssl(false);
server_config.set_threads(1);
server_config.set_async_server_threads(1);
const auto result =
RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2);
@ -77,7 +73,6 @@ static void RunAsyncStreamingPingPong() {
int main(int argc, char** argv) {
grpc::testing::InitBenchmark(&argc, &argv, true);
signal(SIGPIPE, SIG_IGN);
grpc::testing::RunAsyncStreamingPingPong();
return 0;
}

@ -35,8 +35,6 @@
#include <grpc/support/log.h>
#include <signal.h>
#include "test/cpp/qps/driver.h"
#include "test/cpp/qps/report.h"
#include "test/cpp/util/benchmark_config.h"
@ -52,17 +50,15 @@ static void RunAsyncUnaryPingPong() {
ClientConfig client_config;
client_config.set_client_type(ASYNC_CLIENT);
client_config.set_enable_ssl(false);
client_config.set_outstanding_rpcs_per_channel(1);
client_config.set_client_channels(1);
client_config.set_payload_size(1);
client_config.set_async_client_threads(1);
client_config.set_rpc_type(UNARY);
client_config.mutable_load_params()->mutable_closed_loop();
ServerConfig server_config;
server_config.set_server_type(ASYNC_SERVER);
server_config.set_enable_ssl(false);
server_config.set_threads(1);
server_config.set_async_server_threads(1);
const auto result =
RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2);
@ -75,7 +71,6 @@ static void RunAsyncUnaryPingPong() {
int main(int argc, char** argv) {
grpc::testing::InitBenchmark(&argc, &argv, true);
signal(SIGPIPE, SIG_IGN);
grpc::testing::RunAsyncUnaryPingPong();
return 0;

@ -40,8 +40,9 @@
#include "test/cpp/qps/histogram.h"
#include "test/cpp/qps/interarrival.h"
#include "test/cpp/qps/timer.h"
#include "test/proto/qpstest.grpc.pb.h"
#include "test/cpp/util/create_test_channel.h"
#include "test/proto/benchmarks/payloads.grpc.pb.h"
#include "test/proto/benchmarks/services.grpc.pb.h"
namespace grpc {
@ -75,27 +76,54 @@ class Client {
channels_[i].init(config.server_targets(i % config.server_targets_size()),
config);
}
request_.set_response_type(grpc::testing::PayloadType::COMPRESSABLE);
request_.set_response_size(config.payload_size());
if (config.payload_config().has_bytebuf_params()) {
GPR_ASSERT(false); // not yet implemented
} else if (config.payload_config().has_simple_params()) {
request_.set_response_type(grpc::testing::PayloadType::COMPRESSABLE);
request_.set_response_size(
config.payload_config().simple_params().resp_size());
request_.mutable_payload()->set_type(
grpc::testing::PayloadType::COMPRESSABLE);
int size = config.payload_config().simple_params().req_size();
std::unique_ptr<char[]> body(new char[size]);
request_.mutable_payload()->set_body(body.get(), size);
} else if (config.payload_config().has_complex_params()) {
GPR_ASSERT(false); // not yet implemented
} else {
// default should be simple proto without payloads
request_.set_response_type(grpc::testing::PayloadType::COMPRESSABLE);
request_.set_response_size(0);
request_.mutable_payload()->set_type(
grpc::testing::PayloadType::COMPRESSABLE);
}
}
virtual ~Client() {}
ClientStats Mark() {
ClientStats Mark(bool reset) {
Histogram latencies;
Timer::Result timer_result;
// avoid std::vector for old compilers that expect a copy constructor
Histogram* to_merge = new Histogram[threads_.size()];
for (size_t i = 0; i < threads_.size(); i++) {
threads_[i]->BeginSwap(&to_merge[i]);
}
std::unique_ptr<Timer> timer(new Timer);
timer_.swap(timer);
for (size_t i = 0; i < threads_.size(); i++) {
threads_[i]->EndSwap();
latencies.Merge(&to_merge[i]);
if (reset) {
Histogram* to_merge = new Histogram[threads_.size()];
for (size_t i = 0; i < threads_.size(); i++) {
threads_[i]->BeginSwap(&to_merge[i]);
}
std::unique_ptr<Timer> timer(new Timer);
timer_.swap(timer);
for (size_t i = 0; i < threads_.size(); i++) {
threads_[i]->EndSwap();
latencies.Merge(to_merge[i]);
}
delete[] to_merge;
timer_result = timer->Mark();
} else {
// merge snapshots of each thread histogram
for (size_t i = 0; i < threads_.size(); i++) {
threads_[i]->MergeStatsInto(&latencies);
}
timer_result = timer_->Mark();
}
delete[] to_merge;
auto timer_result = timer->Mark();
ClientStats stats;
latencies.FillProto(stats.mutable_latencies());
@ -122,15 +150,18 @@ class Client {
// We have to use a 2-phase init like this with a default
// constructor followed by an initializer function to make
// old compilers happy with using this in std::vector
channel_ = CreateTestChannel(target, config.enable_ssl());
stub_ = TestService::NewStub(channel_);
channel_ = CreateTestChannel(
target, config.security_params().server_host_override(),
config.has_security_params(),
!config.security_params().use_test_ca());
stub_ = BenchmarkService::NewStub(channel_);
}
Channel* get_channel() { return channel_.get(); }
TestService::Stub* get_stub() { return stub_.get(); }
BenchmarkService::Stub* get_stub() { return stub_.get(); }
private:
std::shared_ptr<Channel> channel_;
std::unique_ptr<TestService::Stub> stub_;
std::unique_ptr<BenchmarkService::Stub> stub_;
};
std::vector<ClientChannelInfo> channels_;
@ -146,37 +177,41 @@ class Client {
void SetupLoadTest(const ClientConfig& config, size_t num_threads) {
// Set up the load distribution based on the number of threads
if (config.load_type() == CLOSED_LOOP) {
const auto& load = config.load_params();
std::unique_ptr<RandomDist> random_dist;
switch (load.load_case()) {
case LoadParams::kClosedLoop:
// Closed-loop doesn't use random dist at all
break;
case LoadParams::kPoisson:
random_dist.reset(
new ExpDist(load.poisson().offered_load() / num_threads));
break;
case LoadParams::kUniform:
random_dist.reset(
new UniformDist(load.uniform().interarrival_lo() * num_threads,
load.uniform().interarrival_hi() * num_threads));
break;
case LoadParams::kDeterm:
random_dist.reset(
new DetDist(num_threads / load.determ().offered_load()));
break;
case LoadParams::kPareto:
random_dist.reset(
new ParetoDist(load.pareto().interarrival_base() * num_threads,
load.pareto().alpha()));
break;
default:
GPR_ASSERT(false);
}
// Set closed_loop_ based on whether or not random_dist is set
if (!random_dist) {
closed_loop_ = true;
} else {
closed_loop_ = false;
std::unique_ptr<RandomDist> random_dist;
const auto& load = config.load_params();
switch (config.load_type()) {
case POISSON:
random_dist.reset(
new ExpDist(load.poisson().offered_load() / num_threads));
break;
case UNIFORM:
random_dist.reset(
new UniformDist(load.uniform().interarrival_lo() * num_threads,
load.uniform().interarrival_hi() * num_threads));
break;
case DETERMINISTIC:
random_dist.reset(
new DetDist(num_threads / load.determ().offered_load()));
break;
case PARETO:
random_dist.reset(
new ParetoDist(load.pareto().interarrival_base() * num_threads,
load.pareto().alpha()));
break;
default:
GPR_ASSERT(false);
break;
}
// set up interarrival timer according to random dist
interarrival_timer_.init(*random_dist, num_threads);
for (size_t i = 0; i < num_threads; i++) {
next_time_.push_back(
@ -204,7 +239,7 @@ class Client {
public:
Thread(Client* client, size_t idx)
: done_(false),
new_(nullptr),
new_stats_(nullptr),
client_(client),
idx_(idx),
impl_(&Thread::ThreadFunc, this) {}
@ -219,16 +254,21 @@ class Client {
void BeginSwap(Histogram* n) {
std::lock_guard<std::mutex> g(mu_);
new_ = n;
new_stats_ = n;
}
void EndSwap() {
std::unique_lock<std::mutex> g(mu_);
while (new_ != nullptr) {
while (new_stats_ != nullptr) {
cv_.wait(g);
};
}
void MergeStatsInto(Histogram* hist) {
std::unique_lock<std::mutex> g(mu_);
hist->Merge(histogram_);
}
private:
Thread(const Thread&);
Thread& operator=(const Thread&);
@ -246,21 +286,21 @@ class Client {
if (done_) {
return;
}
// check if we're marking, swap out the histogram if so
if (new_) {
new_->Swap(&histogram_);
new_ = nullptr;
// check if we're resetting stats, swap out the histogram if so
if (new_stats_) {
new_stats_->Swap(&histogram_);
new_stats_ = nullptr;
cv_.notify_one();
}
}
}
TestService::Stub* stub_;
BenchmarkService::Stub* stub_;
ClientConfig config_;
std::mutex mu_;
std::condition_variable cv_;
bool done_;
Histogram* new_;
Histogram* new_stats_;
Histogram histogram_;
Client* client_;
size_t idx_;

@ -48,10 +48,10 @@
#include <gflags/gflags.h>
#include <grpc++/client_context.h>
#include "test/proto/qpstest.grpc.pb.h"
#include "test/cpp/qps/timer.h"
#include "test/cpp/qps/client.h"
#include "test/cpp/util/create_test_channel.h"
#include "test/proto/benchmarks/services.grpc.pb.h"
namespace grpc {
namespace testing {
@ -88,10 +88,10 @@ template <class RequestType, class ResponseType>
class ClientRpcContextUnaryImpl : public ClientRpcContext {
public:
ClientRpcContextUnaryImpl(
int channel_id, TestService::Stub* stub, const RequestType& req,
int channel_id, BenchmarkService::Stub* stub, const RequestType& req,
std::function<
std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>(
TestService::Stub*, grpc::ClientContext*, const RequestType&,
BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&,
CompletionQueue*)> start_req,
std::function<void(grpc::Status, ResponseType*)> on_done)
: ClientRpcContext(channel_id),
@ -131,13 +131,13 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
return true; // we're done, this'll be ignored
}
grpc::ClientContext context_;
TestService::Stub* stub_;
BenchmarkService::Stub* stub_;
RequestType req_;
ResponseType response_;
bool (ClientRpcContextUnaryImpl::*next_state_)(bool);
std::function<void(grpc::Status, ResponseType*)> callback_;
std::function<std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>(
TestService::Stub*, grpc::ClientContext*, const RequestType&,
BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&,
CompletionQueue*)> start_req_;
grpc::Status status_;
double start_;
@ -151,7 +151,7 @@ class AsyncClient : public Client {
public:
explicit AsyncClient(
const ClientConfig& config,
std::function<ClientRpcContext*(int, TestService::Stub*,
std::function<ClientRpcContext*(int, BenchmarkService::Stub*,
const SimpleRequest&)> setup_ctx)
: Client(config),
channel_lock_(new std::mutex[config.client_channels()]),
@ -354,11 +354,12 @@ class AsyncUnaryClient GRPC_FINAL : public AsyncClient {
private:
static void CheckDone(grpc::Status s, SimpleResponse* response) {}
static std::unique_ptr<grpc::ClientAsyncResponseReader<SimpleResponse>>
StartReq(TestService::Stub* stub, grpc::ClientContext* ctx,
StartReq(BenchmarkService::Stub* stub, grpc::ClientContext* ctx,
const SimpleRequest& request, CompletionQueue* cq) {
return stub->AsyncUnaryCall(ctx, request, cq);
};
static ClientRpcContext* SetupCtx(int channel_id, TestService::Stub* stub,
static ClientRpcContext* SetupCtx(int channel_id,
BenchmarkService::Stub* stub,
const SimpleRequest& req) {
return new ClientRpcContextUnaryImpl<SimpleRequest, SimpleResponse>(
channel_id, stub, req, AsyncUnaryClient::StartReq,
@ -370,10 +371,11 @@ template <class RequestType, class ResponseType>
class ClientRpcContextStreamingImpl : public ClientRpcContext {
public:
ClientRpcContextStreamingImpl(
int channel_id, TestService::Stub* stub, const RequestType& req,
std::function<std::unique_ptr<grpc::ClientAsyncReaderWriter<
RequestType, ResponseType>>(TestService::Stub*, grpc::ClientContext*,
CompletionQueue*, void*)> start_req,
int channel_id, BenchmarkService::Stub* stub, const RequestType& req,
std::function<std::unique_ptr<
grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>(
BenchmarkService::Stub*, grpc::ClientContext*, CompletionQueue*,
void*)> start_req,
std::function<void(grpc::Status, ResponseType*)> on_done)
: ClientRpcContext(channel_id),
context_(),
@ -420,15 +422,15 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext {
return StartWrite(ok);
}
grpc::ClientContext context_;
TestService::Stub* stub_;
BenchmarkService::Stub* stub_;
RequestType req_;
ResponseType response_;
bool (ClientRpcContextStreamingImpl::*next_state_)(bool, Histogram*);
std::function<void(grpc::Status, ResponseType*)> callback_;
std::function<
std::unique_ptr<grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>(
TestService::Stub*, grpc::ClientContext*, CompletionQueue*, void*)>
start_req_;
BenchmarkService::Stub*, grpc::ClientContext*, CompletionQueue*,
void*)> start_req_;
grpc::Status status_;
double start_;
std::unique_ptr<grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>
@ -439,8 +441,8 @@ class AsyncStreamingClient GRPC_FINAL : public AsyncClient {
public:
explicit AsyncStreamingClient(const ClientConfig& config)
: AsyncClient(config, SetupCtx) {
// async streaming currently only supported closed loop
GPR_ASSERT(config.load_type() == CLOSED_LOOP);
// async streaming currently only supports closed loop
GPR_ASSERT(closed_loop_);
StartThreads(config.async_client_threads());
}
@ -451,12 +453,13 @@ class AsyncStreamingClient GRPC_FINAL : public AsyncClient {
static void CheckDone(grpc::Status s, SimpleResponse* response) {}
static std::unique_ptr<
grpc::ClientAsyncReaderWriter<SimpleRequest, SimpleResponse>>
StartReq(TestService::Stub* stub, grpc::ClientContext* ctx,
StartReq(BenchmarkService::Stub* stub, grpc::ClientContext* ctx,
CompletionQueue* cq, void* tag) {
auto stream = stub->AsyncStreamingCall(ctx, cq, tag);
return stream;
};
static ClientRpcContext* SetupCtx(int channel_id, TestService::Stub* stub,
static ClientRpcContext* SetupCtx(int channel_id,
BenchmarkService::Stub* stub,
const SimpleRequest& req) {
return new ClientRpcContextStreamingImpl<SimpleRequest, SimpleResponse>(
channel_id, stub, req, AsyncStreamingClient::StartReq,

@ -54,10 +54,10 @@
#include "test/cpp/util/create_test_channel.h"
#include "test/cpp/qps/client.h"
#include "test/proto/qpstest.grpc.pb.h"
#include "test/cpp/qps/histogram.h"
#include "test/cpp/qps/interarrival.h"
#include "test/cpp/qps/timer.h"
#include "test/proto/benchmarks/services.grpc.pb.h"
#include "src/core/profiling/timers.h"

@ -48,6 +48,7 @@
#include "test/cpp/qps/driver.h"
#include "test/cpp/qps/histogram.h"
#include "test/cpp/qps/qps_worker.h"
#include "test/proto/benchmarks/services.grpc.pb.h"
using std::list;
using std::thread;
@ -91,12 +92,12 @@ static ClientContext* AllocContext(list<ClientContext>* contexts, T deadline) {
}
struct ServerData {
unique_ptr<Worker::Stub> stub;
unique_ptr<WorkerService::Stub> stub;
unique_ptr<ClientReaderWriter<ServerArgs, ServerStatus>> stream;
};
struct ClientData {
unique_ptr<Worker::Stub> stub;
unique_ptr<WorkerService::Stub> stub;
unique_ptr<ClientReaderWriter<ClientArgs, ClientStatus>> stream;
};
} // namespace runsc
@ -131,8 +132,7 @@ std::unique_ptr<ScenarioResult> RunScenario(
}
int driver_port = grpc_pick_unused_port_or_die();
int benchmark_port = grpc_pick_unused_port_or_die();
local_workers.emplace_back(new QpsWorker(driver_port, benchmark_port));
local_workers.emplace_back(new QpsWorker(driver_port));
char addr[256];
sprintf(addr, "localhost:%d", driver_port);
if (spawn_local_worker_count < 0) {
@ -161,11 +161,10 @@ std::unique_ptr<ScenarioResult> RunScenario(
// where class contained in std::vector must have a copy constructor
auto* servers = new ServerData[num_servers];
for (size_t i = 0; i < num_servers; i++) {
servers[i].stub =
Worker::NewStub(CreateChannel(workers[i], InsecureCredentials()));
servers[i].stub = WorkerService::NewStub(
CreateChannel(workers[i], InsecureCredentials()));
ServerArgs args;
result_server_config = server_config;
result_server_config.set_host(workers[i]);
*args.mutable_setup() = server_config;
servers[i].stream =
servers[i].stub->RunServer(runsc::AllocContext(&contexts, deadline));
@ -189,14 +188,13 @@ std::unique_ptr<ScenarioResult> RunScenario(
// where class contained in std::vector must have a copy constructor
auto* clients = new ClientData[num_clients];
for (size_t i = 0; i < num_clients; i++) {
clients[i].stub = Worker::NewStub(
clients[i].stub = WorkerService::NewStub(
CreateChannel(workers[i + num_servers], InsecureCredentials()));
ClientArgs args;
result_client_config = client_config;
result_client_config.set_host(workers[i + num_servers]);
*args.mutable_setup() = client_config;
clients[i].stream =
clients[i].stub->RunTest(runsc::AllocContext(&contexts, deadline));
clients[i].stub->RunClient(runsc::AllocContext(&contexts, deadline));
GPR_ASSERT(clients[i].stream->Write(args));
ClientStatus init_status;
GPR_ASSERT(clients[i].stream->Read(&init_status));
@ -211,9 +209,9 @@ std::unique_ptr<ScenarioResult> RunScenario(
// Start a run
gpr_log(GPR_INFO, "Starting");
ServerArgs server_mark;
server_mark.mutable_mark();
server_mark.mutable_mark()->set_reset(true);
ClientArgs client_mark;
client_mark.mutable_mark();
client_mark.mutable_mark()->set_reset(true);
for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
GPR_ASSERT(server->stream->Write(server_mark));
}
@ -251,14 +249,15 @@ std::unique_ptr<ScenarioResult> RunScenario(
GPR_ASSERT(server->stream->Read(&server_status));
const auto& stats = server_status.stats();
result->server_resources.emplace_back(
stats.time_elapsed(), stats.time_user(), stats.time_system());
stats.time_elapsed(), stats.time_user(), stats.time_system(),
server_status.cores());
}
for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
GPR_ASSERT(client->stream->Read(&client_status));
const auto& stats = client_status.stats();
result->latencies.MergeProto(stats.latencies());
result->client_resources.emplace_back(
stats.time_elapsed(), stats.time_user(), stats.time_system());
stats.time_elapsed(), stats.time_user(), stats.time_system(), -1);
}
for (auto client = &clients[0]; client != &clients[num_clients]; client++) {

@ -37,22 +37,24 @@
#include <memory>
#include "test/cpp/qps/histogram.h"
#include "test/proto/qpstest.grpc.pb.h"
#include "test/proto/benchmarks/control.grpc.pb.h"
namespace grpc {
namespace testing {
class ResourceUsage {
public:
ResourceUsage(double w, double u, double s)
: wall_time_(w), user_time_(u), system_time_(s) {}
ResourceUsage(double w, double u, double s, int c)
: wall_time_(w), user_time_(u), system_time_(s), cores_(c) {}
double wall_time() const { return wall_time_; }
double user_time() const { return user_time_; }
double system_time() const { return system_time_; }
int cores() const { return cores_; }
private:
double wall_time_;
double user_time_;
double system_time_;
int cores_;
};
struct ScenarioResult {

@ -35,7 +35,7 @@
#define TEST_QPS_HISTOGRAM_H
#include <grpc/support/histogram.h>
#include "test/proto/qpstest.grpc.pb.h"
#include "test/proto/benchmarks/stats.grpc.pb.h"
namespace grpc {
namespace testing {
@ -48,7 +48,7 @@ class Histogram {
}
Histogram(Histogram&& other) : impl_(other.impl_) { other.impl_ = nullptr; }
void Merge(Histogram* h) { gpr_histogram_merge(impl_, h->impl_); }
void Merge(const Histogram& h) { gpr_histogram_merge(impl_, h.impl_); }
void Add(double value) { gpr_histogram_add(impl_, value); }
double Percentile(double pctile) const {
return gpr_histogram_percentile(impl_, pctile);

@ -29,7 +29,7 @@
syntax = "proto3";
import "test/proto/qpstest.proto";
import "test/proto/benchmarks/control.proto";
package grpc.testing;

@ -37,17 +37,21 @@ fi
bins=`find . .. ../.. ../../.. -name bins | head -1`
for channels in 1 2 4 8
for secure in true false
do
for client in SYNCHRONOUS_CLIENT ASYNC_CLIENT
for channels in 1 2 4 8
do
for server in SYNCHRONOUS_SERVER ASYNC_SERVER
for client in SYNC_CLIENT ASYNC_CLIENT
do
for rpc in UNARY STREAMING
for server in SYNC_SERVER ASYNC_SERVER
do
echo "Test $rpc $client $server , $channels channels"
"$bins"/opt/qps_driver --rpc_type=$rpc \
--client_type=$client --server_type=$server
for rpc in UNARY STREAMING
do
echo "Test $rpc $client $server, $channels channels, secure=$secure"
"$bins"/opt/qps_driver --rpc_type=$rpc \
--client_type=$client --server_type=$server \
--secure_test=$secure
done
done
done
done

@ -33,7 +33,6 @@
#include <memory>
#include <set>
#include <signal.h>
#include <gflags/gflags.h>
#include <grpc/support/log.h>
@ -50,31 +49,39 @@ DEFINE_int32(benchmark_seconds, 30, "Benchmark time (in seconds)");
DEFINE_int32(local_workers, 0, "Number of local workers to start");
// Common config
DEFINE_bool(enable_ssl, false, "Use SSL");
DEFINE_string(rpc_type, "UNARY", "Type of RPC: UNARY or STREAMING");
// Server config
DEFINE_int32(server_threads, 1, "Number of server threads");
DEFINE_string(server_type, "SYNCHRONOUS_SERVER", "Server type");
DEFINE_int32(async_server_threads, 1, "Number of threads for async servers");
DEFINE_string(server_type, "SYNC_SERVER", "Server type");
// Client config
DEFINE_int32(outstanding_rpcs_per_channel, 1,
"Number of outstanding rpcs per channel");
DEFINE_int32(client_channels, 1, "Number of client channels");
DEFINE_int32(payload_size, 1, "Payload size");
DEFINE_string(client_type, "SYNCHRONOUS_CLIENT", "Client type");
DEFINE_int32(simple_req_size, -1, "Simple proto request payload size");
DEFINE_int32(simple_resp_size, -1, "Simple proto response payload size");
DEFINE_string(client_type, "SYNC_CLIENT", "Client type");
DEFINE_int32(async_client_threads, 1, "Async client threads");
DEFINE_string(load_type, "CLOSED_LOOP", "Load type");
DEFINE_double(load_param_1, 0.0, "Load parameter 1");
DEFINE_double(load_param_2, 0.0, "Load parameter 2");
DEFINE_double(poisson_load, -1.0, "Poisson offered load (qps)");
DEFINE_double(uniform_lo, -1.0, "Uniform low interarrival time (us)");
DEFINE_double(uniform_hi, -1.0, "Uniform high interarrival time (us)");
DEFINE_double(determ_load, -1.0, "Deterministic offered load (qps)");
DEFINE_double(pareto_base, -1.0, "Pareto base interarrival time (us)");
DEFINE_double(pareto_alpha, -1.0, "Pareto alpha value");
DEFINE_bool(secure_test, false, "Run a secure test");
using grpc::testing::ClientConfig;
using grpc::testing::ServerConfig;
using grpc::testing::ClientType;
using grpc::testing::ServerType;
using grpc::testing::LoadType;
using grpc::testing::RpcType;
using grpc::testing::ResourceUsage;
using grpc::testing::SecurityParams;
namespace grpc {
namespace testing {
@ -85,72 +92,63 @@ static void QpsDriver() {
ClientType client_type;
ServerType server_type;
LoadType load_type;
GPR_ASSERT(ClientType_Parse(FLAGS_client_type, &client_type));
GPR_ASSERT(ServerType_Parse(FLAGS_server_type, &server_type));
GPR_ASSERT(LoadType_Parse(FLAGS_load_type, &load_type));
ClientConfig client_config;
client_config.set_client_type(client_type);
client_config.set_load_type(load_type);
client_config.set_enable_ssl(FLAGS_enable_ssl);
client_config.set_outstanding_rpcs_per_channel(
FLAGS_outstanding_rpcs_per_channel);
client_config.set_client_channels(FLAGS_client_channels);
client_config.set_payload_size(FLAGS_payload_size);
// Decide which type to use based on the response type
if (FLAGS_simple_resp_size >= 0) {
auto params =
client_config.mutable_payload_config()->mutable_simple_params();
params->set_resp_size(FLAGS_simple_resp_size);
if (FLAGS_simple_req_size >= 0) {
params->set_req_size(FLAGS_simple_req_size);
}
} else {
// set a reasonable default: proto but no payload
client_config.mutable_payload_config()->mutable_simple_params();
}
client_config.set_async_client_threads(FLAGS_async_client_threads);
client_config.set_rpc_type(rpc_type);
// set up the load parameters
switch (load_type) {
case grpc::testing::CLOSED_LOOP:
break;
case grpc::testing::POISSON: {
auto poisson = client_config.mutable_load_params()->mutable_poisson();
GPR_ASSERT(FLAGS_load_param_1 != 0.0);
poisson->set_offered_load(FLAGS_load_param_1);
break;
}
case grpc::testing::UNIFORM: {
auto uniform = client_config.mutable_load_params()->mutable_uniform();
GPR_ASSERT(FLAGS_load_param_1 != 0.0);
GPR_ASSERT(FLAGS_load_param_2 != 0.0);
uniform->set_interarrival_lo(FLAGS_load_param_1 / 1e6);
uniform->set_interarrival_hi(FLAGS_load_param_2 / 1e6);
break;
}
case grpc::testing::DETERMINISTIC: {
auto determ = client_config.mutable_load_params()->mutable_determ();
GPR_ASSERT(FLAGS_load_param_1 != 0.0);
determ->set_offered_load(FLAGS_load_param_1);
break;
}
case grpc::testing::PARETO: {
auto pareto = client_config.mutable_load_params()->mutable_pareto();
GPR_ASSERT(FLAGS_load_param_1 != 0.0);
GPR_ASSERT(FLAGS_load_param_2 != 0.0);
pareto->set_interarrival_base(FLAGS_load_param_1 / 1e6);
pareto->set_alpha(FLAGS_load_param_2);
break;
}
default:
GPR_ASSERT(false);
break;
if (FLAGS_poisson_load > 0.0) {
auto poisson = client_config.mutable_load_params()->mutable_poisson();
poisson->set_offered_load(FLAGS_poisson_load);
} else if (FLAGS_uniform_lo > 0.0) {
auto uniform = client_config.mutable_load_params()->mutable_uniform();
uniform->set_interarrival_lo(FLAGS_uniform_lo / 1e6);
uniform->set_interarrival_hi(FLAGS_uniform_hi / 1e6);
} else if (FLAGS_determ_load > 0.0) {
auto determ = client_config.mutable_load_params()->mutable_determ();
determ->set_offered_load(FLAGS_determ_load);
} else if (FLAGS_pareto_base > 0.0) {
auto pareto = client_config.mutable_load_params()->mutable_pareto();
pareto->set_interarrival_base(FLAGS_pareto_base / 1e6);
pareto->set_alpha(FLAGS_pareto_alpha);
} else {
client_config.mutable_load_params()->mutable_closed_loop();
// No further load parameters to set up for closed loop
}
ServerConfig server_config;
server_config.set_server_type(server_type);
server_config.set_threads(FLAGS_server_threads);
server_config.set_enable_ssl(FLAGS_enable_ssl);
// If we're running a sync-server streaming test, make sure
// that we have at least as many threads as the active streams
// or else threads will be blocked from forward progress and the
// client will deadlock on a timer.
GPR_ASSERT(!(server_type == grpc::testing::SYNCHRONOUS_SERVER &&
rpc_type == grpc::testing::STREAMING &&
FLAGS_server_threads <
FLAGS_client_channels * FLAGS_outstanding_rpcs_per_channel));
server_config.set_async_server_threads(FLAGS_async_server_threads);
if (FLAGS_secure_test) {
// Set up security params
SecurityParams security;
security.set_use_test_ca(true);
security.set_server_host_override("foo.test.google.fr");
client_config.mutable_security_params()->CopyFrom(security);
server_config.mutable_security_params()->CopyFrom(security);
}
const auto result = RunScenario(
client_config, FLAGS_num_clients, server_config, FLAGS_num_servers,
@ -168,7 +166,6 @@ static void QpsDriver() {
int main(int argc, char** argv) {
grpc::testing::InitBenchmark(&argc, &argv, true);
signal(SIGPIPE, SIG_IGN);
grpc::testing::QpsDriver();
return 0;

@ -42,7 +42,7 @@
using grpc::testing::RandomDist;
using grpc::testing::InterarrivalTimer;
void RunTest(RandomDist &&r, int threads, std::string title) {
static void RunTest(RandomDist &&r, int threads, std::string title) {
InterarrivalTimer timer;
timer.init(r, threads);
gpr_histogram *h(gpr_histogram_create(0.01, 60e9));

@ -31,8 +31,6 @@
*
*/
#include <signal.h>
#include <set>
#include <grpc/support/log.h>
@ -52,20 +50,16 @@ static void RunQPS() {
ClientConfig client_config;
client_config.set_client_type(ASYNC_CLIENT);
client_config.set_enable_ssl(false);
client_config.set_outstanding_rpcs_per_channel(1000);
client_config.set_client_channels(8);
client_config.set_payload_size(1);
client_config.set_async_client_threads(8);
client_config.set_rpc_type(UNARY);
client_config.set_load_type(POISSON);
client_config.mutable_load_params()->mutable_poisson()->set_offered_load(
1000.0);
ServerConfig server_config;
server_config.set_server_type(ASYNC_SERVER);
server_config.set_enable_ssl(false);
server_config.set_threads(4);
server_config.set_async_server_threads(4);
const auto result =
RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2);
@ -80,7 +74,6 @@ static void RunQPS() {
int main(int argc, char** argv) {
grpc::testing::InitBenchmark(&argc, &argv, true);
signal(SIGPIPE, SIG_IGN);
grpc::testing::RunQPS();
return 0;

@ -31,8 +31,6 @@
*
*/
#include <signal.h>
#include <set>
#include <grpc/support/log.h>
@ -52,17 +50,15 @@ static void RunQPS() {
ClientConfig client_config;
client_config.set_client_type(ASYNC_CLIENT);
client_config.set_enable_ssl(false);
client_config.set_outstanding_rpcs_per_channel(1000);
client_config.set_client_channels(8);
client_config.set_payload_size(1);
client_config.set_async_client_threads(8);
client_config.set_rpc_type(UNARY);
client_config.mutable_load_params()->mutable_closed_loop();
ServerConfig server_config;
server_config.set_server_type(ASYNC_SERVER);
server_config.set_enable_ssl(false);
server_config.set_threads(8);
server_config.set_async_server_threads(8);
const auto result =
RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2);
@ -77,7 +73,6 @@ static void RunQPS() {
int main(int argc, char** argv) {
grpc::testing::InitBenchmark(&argc, &argv, true);
signal(SIGPIPE, SIG_IGN);
grpc::testing::RunQPS();
return 0;

@ -31,8 +31,6 @@
*
*/
#include <signal.h>
#include <set>
#include <grpc/support/log.h>
@ -56,17 +54,15 @@ static void RunQPS() {
ClientConfig client_config;
client_config.set_client_type(ASYNC_CLIENT);
client_config.set_enable_ssl(false);
client_config.set_outstanding_rpcs_per_channel(1000);
client_config.set_client_channels(8);
client_config.set_payload_size(1);
client_config.set_async_client_threads(8);
client_config.set_rpc_type(UNARY);
client_config.mutable_load_params()->mutable_closed_loop();
ServerConfig server_config;
server_config.set_server_type(ASYNC_SERVER);
server_config.set_enable_ssl(false);
server_config.set_threads(4);
server_config.set_async_server_threads(4);
const auto result =
RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2);
@ -83,7 +79,6 @@ int main(int argc, char** argv) {
grpc_platform_become_multipoller = grpc_poll_become_multipoller;
signal(SIGPIPE, SIG_IGN);
grpc::testing::RunQPS();
return 0;

@ -52,17 +52,17 @@
#include <grpc++/security/server_credentials.h>
#include "test/core/util/grpc_profiler.h"
#include "test/proto/qpstest.pb.h"
#include "test/cpp/qps/client.h"
#include "test/cpp/qps/server.h"
#include "test/cpp/util/create_test_channel.h"
#include "test/proto/benchmarks/services.pb.h"
namespace grpc {
namespace testing {
std::unique_ptr<Client> CreateClient(const ClientConfig& config) {
static std::unique_ptr<Client> CreateClient(const ClientConfig& config) {
switch (config.client_type()) {
case ClientType::SYNCHRONOUS_CLIENT:
case ClientType::SYNC_CLIENT:
return (config.rpc_type() == RpcType::UNARY)
? CreateSynchronousUnaryClient(config)
: CreateSynchronousStreamingClient(config);
@ -76,26 +76,29 @@ std::unique_ptr<Client> CreateClient(const ClientConfig& config) {
abort();
}
std::unique_ptr<Server> CreateServer(const ServerConfig& config,
int server_port) {
static void LimitCores(int cores) {}
static std::unique_ptr<Server> CreateServer(const ServerConfig& config) {
if (config.core_limit() > 0) {
LimitCores(config.core_limit());
}
switch (config.server_type()) {
case ServerType::SYNCHRONOUS_SERVER:
return CreateSynchronousServer(config, server_port);
case ServerType::SYNC_SERVER:
return CreateSynchronousServer(config);
case ServerType::ASYNC_SERVER:
return CreateAsyncServer(config, server_port);
return CreateAsyncServer(config);
default:
abort();
}
abort();
}
class WorkerImpl GRPC_FINAL : public Worker::Service {
class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service {
public:
explicit WorkerImpl(int server_port)
: server_port_(server_port), acquired_(false) {}
explicit WorkerServiceImpl() : acquired_(false) {}
Status RunTest(ServerContext* ctx,
ServerReaderWriter<ClientStatus, ClientArgs>* stream)
Status RunClient(ServerContext* ctx,
ServerReaderWriter<ClientStatus, ClientArgs>* stream)
GRPC_OVERRIDE {
InstanceGuard g(this);
if (!g.Acquired()) {
@ -103,7 +106,7 @@ class WorkerImpl GRPC_FINAL : public Worker::Service {
}
grpc_profiler_start("qps_client.prof");
Status ret = RunTestBody(ctx, stream);
Status ret = RunClientBody(ctx, stream);
grpc_profiler_stop();
return ret;
}
@ -126,7 +129,7 @@ class WorkerImpl GRPC_FINAL : public Worker::Service {
// Protect against multiple clients using this worker at once.
class InstanceGuard {
public:
InstanceGuard(WorkerImpl* impl)
InstanceGuard(WorkerServiceImpl* impl)
: impl_(impl), acquired_(impl->TryAcquireInstance()) {}
~InstanceGuard() {
if (acquired_) {
@ -137,7 +140,7 @@ class WorkerImpl GRPC_FINAL : public Worker::Service {
bool Acquired() const { return acquired_; }
private:
WorkerImpl* const impl_;
WorkerServiceImpl* const impl_;
const bool acquired_;
};
@ -154,8 +157,8 @@ class WorkerImpl GRPC_FINAL : public Worker::Service {
acquired_ = false;
}
Status RunTestBody(ServerContext* ctx,
ServerReaderWriter<ClientStatus, ClientArgs>* stream) {
Status RunClientBody(ServerContext* ctx,
ServerReaderWriter<ClientStatus, ClientArgs>* stream) {
ClientArgs args;
if (!stream->Read(&args)) {
return Status(StatusCode::INVALID_ARGUMENT, "");
@ -175,7 +178,7 @@ class WorkerImpl GRPC_FINAL : public Worker::Service {
if (!args.has_mark()) {
return Status(StatusCode::INVALID_ARGUMENT, "");
}
*status.mutable_stats() = client->Mark();
*status.mutable_stats() = client->Mark(args.mark().reset());
stream->Write(status);
}
@ -191,12 +194,13 @@ class WorkerImpl GRPC_FINAL : public Worker::Service {
if (!args.has_setup()) {
return Status(StatusCode::INVALID_ARGUMENT, "");
}
auto server = CreateServer(args.setup(), server_port_);
auto server = CreateServer(args.setup());
if (!server) {
return Status(StatusCode::INVALID_ARGUMENT, "");
}
ServerStatus status;
status.set_port(server_port_);
status.set_port(server->port());
status.set_cores(server->cores());
if (!stream->Write(status)) {
return Status(StatusCode::UNKNOWN, "");
}
@ -204,21 +208,19 @@ class WorkerImpl GRPC_FINAL : public Worker::Service {
if (!args.has_mark()) {
return Status(StatusCode::INVALID_ARGUMENT, "");
}
*status.mutable_stats() = server->Mark();
*status.mutable_stats() = server->Mark(args.mark().reset());
stream->Write(status);
}
return Status::OK;
}
const int server_port_;
std::mutex mu_;
bool acquired_;
};
QpsWorker::QpsWorker(int driver_port, int server_port) {
impl_.reset(new WorkerImpl(server_port));
QpsWorker::QpsWorker(int driver_port) {
impl_.reset(new WorkerServiceImpl());
char* server_address = NULL;
gpr_join_host_port(&server_address, "::", driver_port);

@ -42,15 +42,15 @@ class Server;
namespace testing {
class WorkerImpl;
class WorkerServiceImpl;
class QpsWorker {
public:
QpsWorker(int driver_port, int server_port);
explicit QpsWorker(int driver_port);
~QpsWorker();
private:
std::unique_ptr<WorkerImpl> impl_;
std::unique_ptr<WorkerServiceImpl> impl_;
std::unique_ptr<Server> server_;
};

@ -43,6 +43,7 @@ namespace testing {
static double WallTime(ResourceUsage u) { return u.wall_time(); }
static double UserTime(ResourceUsage u) { return u.user_time(); }
static double SystemTime(ResourceUsage u) { return u.system_time(); }
static int Cores(ResourceUsage u) { return u.cores(); }
void CompositeReporter::add(std::unique_ptr<Reporter> reporter) {
reporters_.emplace_back(std::move(reporter));
@ -83,7 +84,7 @@ void GprLogReporter::ReportQPSPerCore(const ScenarioResult& result) {
result.latencies.Count() / average(result.client_resources, WallTime);
gpr_log(GPR_INFO, "QPS: %.1f (%.1f/server core)", qps,
qps / result.server_config.threads());
qps / sum(result.server_resources, Cores));
}
void GprLogReporter::ReportLatency(const ScenarioResult& result) {
@ -123,10 +124,10 @@ void PerfDbReporter::ReportQPSPerCore(const ScenarioResult& result) {
auto qps =
result.latencies.Count() / average(result.client_resources, WallTime);
auto qpsPerCore = qps / result.server_config.threads();
auto qps_per_core = qps / sum(result.server_resources, Cores);
perf_db_client_.setQps(qps);
perf_db_client_.setQpsPerCore(qpsPerCore);
perf_db_client_.setQpsPerCore(qps_per_core);
perf_db_client_.setConfigs(result.client_config, result.server_config);
}

@ -41,7 +41,6 @@
#include <grpc++/support/config.h>
#include "test/cpp/qps/driver.h"
#include "test/proto/qpstest.grpc.pb.h"
#include "test/cpp/qps/perf_db_client.h"
namespace grpc {

@ -0,0 +1,84 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include <set>
#include <grpc/support/log.h>
#include "test/cpp/qps/driver.h"
#include "test/cpp/qps/report.h"
#include "test/cpp/util/benchmark_config.h"
namespace grpc {
namespace testing {
static const int WARMUP = 5;
static const int BENCHMARK = 10;
static void RunSynchronousUnaryPingPong() {
gpr_log(GPR_INFO, "Running Synchronous Unary Ping Pong");
ClientConfig client_config;
client_config.set_client_type(SYNC_CLIENT);
client_config.set_outstanding_rpcs_per_channel(1);
client_config.set_client_channels(1);
client_config.set_rpc_type(UNARY);
client_config.mutable_load_params()->mutable_closed_loop();
ServerConfig server_config;
server_config.set_server_type(SYNC_SERVER);
// Set up security params
SecurityParams security;
security.set_use_test_ca(true);
security.set_server_host_override("foo.test.google.fr");
client_config.mutable_security_params()->CopyFrom(security);
server_config.mutable_security_params()->CopyFrom(security);
const auto result =
RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2);
GetReporter()->ReportQPS(*result);
GetReporter()->ReportLatency(*result);
}
} // namespace testing
} // namespace grpc
int main(int argc, char** argv) {
grpc::testing::InitBenchmark(&argc, &argv, true);
grpc::testing::RunSynchronousUnaryPingPong();
return 0;
}

@ -34,22 +34,38 @@
#ifndef TEST_QPS_SERVER_H
#define TEST_QPS_SERVER_H
#include <grpc/support/cpu.h>
#include <grpc++/security/server_credentials.h>
#include "test/core/end2end/data/ssl_test_data.h"
#include "test/core/util/port.h"
#include "test/cpp/qps/timer.h"
#include "test/proto/qpstest.grpc.pb.h"
#include "test/proto/messages.grpc.pb.h"
#include "test/proto/benchmarks/control.grpc.pb.h"
namespace grpc {
namespace testing {
class Server {
public:
Server() : timer_(new Timer) {}
explicit Server(const ServerConfig& config) : timer_(new Timer) {
if (config.port()) {
port_ = config.port();
} else {
port_ = grpc_pick_unused_port_or_die();
}
}
virtual ~Server() {}
ServerStats Mark() {
std::unique_ptr<Timer> timer(new Timer);
timer.swap(timer_);
auto timer_result = timer->Mark();
ServerStats Mark(bool reset) {
Timer::Result timer_result;
if (reset) {
std::unique_ptr<Timer> timer(new Timer);
timer.swap(timer_);
timer_result = timer->Mark();
} else {
timer_result = timer_->Mark();
}
ServerStats stats;
stats.set_time_elapsed(timer_result.wall);
@ -70,13 +86,29 @@ class Server {
return true;
}
int port() const { return port_; }
int cores() const { return gpr_cpu_num_cores(); }
static std::shared_ptr<ServerCredentials> CreateServerCredentials(
const ServerConfig& config) {
if (config.has_security_params()) {
SslServerCredentialsOptions::PemKeyCertPair pkcp = {test_server1_key,
test_server1_cert};
SslServerCredentialsOptions ssl_opts;
ssl_opts.pem_root_certs = "";
ssl_opts.pem_key_cert_pairs.push_back(pkcp);
return SslServerCredentials(ssl_opts);
} else {
return InsecureServerCredentials();
}
}
private:
int port_;
std::unique_ptr<Timer> timer_;
};
std::unique_ptr<Server> CreateSynchronousServer(const ServerConfig& config,
int port);
std::unique_ptr<Server> CreateAsyncServer(const ServerConfig& config, int port);
std::unique_ptr<Server> CreateSynchronousServer(const ServerConfig& config);
std::unique_ptr<Server> CreateAsyncServer(const ServerConfig& config);
} // namespace testing
} // namespace grpc

@ -49,38 +49,40 @@
#include <grpc++/security/server_credentials.h>
#include <gtest/gtest.h>
#include "test/proto/qpstest.grpc.pb.h"
#include "test/cpp/qps/server.h"
#include "test/proto/benchmarks/services.grpc.pb.h"
namespace grpc {
namespace testing {
class AsyncQpsServerTest : public Server {
public:
AsyncQpsServerTest(const ServerConfig &config, int port) {
explicit AsyncQpsServerTest(const ServerConfig &config) : Server(config) {
char *server_address = NULL;
gpr_join_host_port(&server_address, "::", port);
gpr_join_host_port(&server_address, "::", port());
ServerBuilder builder;
builder.AddListeningPort(server_address, InsecureServerCredentials());
builder.AddListeningPort(server_address,
Server::CreateServerCredentials(config));
gpr_free(server_address);
builder.RegisterAsyncService(&async_service_);
for (int i = 0; i < config.threads(); i++) {
for (int i = 0; i < config.async_server_threads(); i++) {
srv_cqs_.emplace_back(builder.AddCompletionQueue());
}
server_ = builder.BuildAndStart();
using namespace std::placeholders;
for (int i = 0; i < 10000 / config.threads(); i++) {
for (int j = 0; j < config.threads(); j++) {
for (int i = 0; i < 10000 / config.async_server_threads(); i++) {
for (int j = 0; j < config.async_server_threads(); j++) {
auto request_unary = std::bind(
&TestService::AsyncService::RequestUnaryCall, &async_service_, _1,
_2, _3, srv_cqs_[j].get(), srv_cqs_[j].get(), _4);
&BenchmarkService::AsyncService::RequestUnaryCall, &async_service_,
_1, _2, _3, srv_cqs_[j].get(), srv_cqs_[j].get(), _4);
auto request_streaming = std::bind(
&TestService::AsyncService::RequestStreamingCall, &async_service_,
_1, _2, srv_cqs_[j].get(), srv_cqs_[j].get(), _3);
&BenchmarkService::AsyncService::RequestStreamingCall,
&async_service_, _1, _2, srv_cqs_[j].get(), srv_cqs_[j].get(), _3);
contexts_.push_front(
new ServerRpcContextUnaryImpl<SimpleRequest, SimpleResponse>(
request_unary, ProcessRPC));
@ -89,10 +91,10 @@ class AsyncQpsServerTest : public Server {
request_streaming, ProcessRPC));
}
}
for (int i = 0; i < config.threads(); i++) {
for (int i = 0; i < config.async_server_threads(); i++) {
shutdown_state_.emplace_back(new PerThreadShutdownState());
}
for (int i = 0; i < config.threads(); i++) {
for (int i = 0; i < config.async_server_threads(); i++) {
threads_.emplace_back(&AsyncQpsServerTest::ThreadFunc, this, i);
}
}
@ -309,7 +311,7 @@ class AsyncQpsServerTest : public Server {
std::vector<std::thread> threads_;
std::unique_ptr<grpc::Server> server_;
std::vector<std::unique_ptr<grpc::ServerCompletionQueue>> srv_cqs_;
TestService::AsyncService async_service_;
BenchmarkService::AsyncService async_service_;
std::forward_list<ServerRpcContext *> contexts_;
class PerThreadShutdownState {
@ -333,9 +335,8 @@ class AsyncQpsServerTest : public Server {
std::vector<std::unique_ptr<PerThreadShutdownState>> shutdown_state_;
};
std::unique_ptr<Server> CreateAsyncServer(const ServerConfig &config,
int port) {
return std::unique_ptr<Server>(new AsyncQpsServerTest(config, port));
std::unique_ptr<Server> CreateAsyncServer(const ServerConfig &config) {
return std::unique_ptr<Server>(new AsyncQpsServerTest(config));
}
} // namespace testing

@ -43,14 +43,14 @@
#include <grpc++/server_context.h>
#include <grpc++/security/server_credentials.h>
#include "test/proto/qpstest.grpc.pb.h"
#include "test/cpp/qps/server.h"
#include "test/cpp/qps/timer.h"
#include "test/proto/benchmarks/services.grpc.pb.h"
namespace grpc {
namespace testing {
class TestServiceImpl GRPC_FINAL : public TestService::Service {
class BenchmarkServiceImpl GRPC_FINAL : public BenchmarkService::Service {
public:
Status UnaryCall(ServerContext* context, const SimpleRequest* request,
SimpleResponse* response) GRPC_OVERRIDE {
@ -84,30 +84,29 @@ class TestServiceImpl GRPC_FINAL : public TestService::Service {
class SynchronousServer GRPC_FINAL : public grpc::testing::Server {
public:
SynchronousServer(const ServerConfig& config, int port)
: impl_(MakeImpl(port)) {}
private:
std::unique_ptr<grpc::Server> MakeImpl(int port) {
explicit SynchronousServer(const ServerConfig& config) : Server(config) {
ServerBuilder builder;
char* server_address = NULL;
gpr_join_host_port(&server_address, "::", port);
builder.AddListeningPort(server_address, InsecureServerCredentials());
gpr_join_host_port(&server_address, "::", port());
builder.AddListeningPort(server_address,
Server::CreateServerCredentials(config));
gpr_free(server_address);
builder.RegisterService(&service_);
return builder.BuildAndStart();
impl_ = builder.BuildAndStart();
}
TestServiceImpl service_;
private:
BenchmarkServiceImpl service_;
std::unique_ptr<grpc::Server> impl_;
};
std::unique_ptr<grpc::testing::Server> CreateSynchronousServer(
const ServerConfig& config, int port) {
return std::unique_ptr<Server>(new SynchronousServer(config, port));
const ServerConfig& config) {
return std::unique_ptr<Server>(new SynchronousServer(config));
}
} // namespace testing

@ -42,9 +42,9 @@ NUMCPUS=`python2.7 -c 'import multiprocessing; print multiprocessing.cpu_count()
make CONFIG=$config qps_worker qps_driver -j$NUMCPUS
bins/$config/qps_worker -driver_port 10000 -server_port 10001 &
bins/$config/qps_worker -driver_port 10000 &
PID1=$!
bins/$config/qps_worker -driver_port 10010 -server_port 10011 &
bins/$config/qps_worker -driver_port 10010 &
PID2=$!
export QPS_WORKERS="localhost:10000,localhost:10010"

@ -31,8 +31,6 @@
*
*/
#include <signal.h>
#include <set>
#include <grpc/support/log.h>
@ -51,17 +49,14 @@ static void RunSynchronousStreamingPingPong() {
gpr_log(GPR_INFO, "Running Synchronous Streaming Ping Pong");
ClientConfig client_config;
client_config.set_client_type(SYNCHRONOUS_CLIENT);
client_config.set_enable_ssl(false);
client_config.set_client_type(SYNC_CLIENT);
client_config.set_outstanding_rpcs_per_channel(1);
client_config.set_client_channels(1);
client_config.set_payload_size(1);
client_config.set_rpc_type(STREAMING);
client_config.mutable_load_params()->mutable_closed_loop();
ServerConfig server_config;
server_config.set_server_type(SYNCHRONOUS_SERVER);
server_config.set_enable_ssl(false);
server_config.set_threads(1);
server_config.set_server_type(SYNC_SERVER);
const auto result =
RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2);
@ -75,7 +70,6 @@ static void RunSynchronousStreamingPingPong() {
int main(int argc, char** argv) {
grpc::testing::InitBenchmark(&argc, &argv, true);
signal(SIGPIPE, SIG_IGN);
grpc::testing::RunSynchronousStreamingPingPong();
return 0;

@ -31,8 +31,6 @@
*
*/
#include <signal.h>
#include <set>
#include <grpc/support/log.h>
@ -51,17 +49,14 @@ static void RunSynchronousUnaryPingPong() {
gpr_log(GPR_INFO, "Running Synchronous Unary Ping Pong");
ClientConfig client_config;
client_config.set_client_type(SYNCHRONOUS_CLIENT);
client_config.set_enable_ssl(false);
client_config.set_client_type(SYNC_CLIENT);
client_config.set_outstanding_rpcs_per_channel(1);
client_config.set_client_channels(1);
client_config.set_payload_size(1);
client_config.set_rpc_type(UNARY);
client_config.mutable_load_params()->mutable_closed_loop();
ServerConfig server_config;
server_config.set_server_type(SYNCHRONOUS_SERVER);
server_config.set_enable_ssl(false);
server_config.set_threads(1);
server_config.set_server_type(SYNC_SERVER);
const auto result =
RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2);
@ -76,7 +71,6 @@ static void RunSynchronousUnaryPingPong() {
int main(int argc, char** argv) {
grpc::testing::InitBenchmark(&argc, &argv, true);
signal(SIGPIPE, SIG_IGN);
grpc::testing::RunSynchronousUnaryPingPong();
return 0;

@ -61,7 +61,7 @@ Timer::Result Timer::Sample() {
return r;
}
Timer::Result Timer::Mark() {
Timer::Result Timer::Mark() const {
Result s = Sample();
Result r;
r.wall = s.wall - start_.wall;

@ -44,7 +44,7 @@ class Timer {
double system;
};
Result Mark();
Result Mark() const;
static double Now();

@ -43,8 +43,7 @@
#include "test/cpp/qps/qps_worker.h"
#include "test/cpp/util/test_config.h"
DEFINE_int32(driver_port, 0, "Driver server port.");
DEFINE_int32(server_port, 0, "Spawned server port.");
DEFINE_int32(driver_port, 0, "Port for communication with driver");
static bool got_sigint = false;
@ -54,7 +53,7 @@ namespace grpc {
namespace testing {
static void RunServer() {
QpsWorker worker(FLAGS_driver_port, FLAGS_server_port);
QpsWorker worker(FLAGS_driver_port);
while (!got_sigint) {
gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),

@ -1,4 +1,3 @@
// Copyright 2015, Google Inc.
// All rights reserved.
//
@ -28,62 +27,20 @@
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
// An integration test service that covers all the method signature permutations
// of unary/streaming requests/responses.
syntax = "proto3";
package grpc.testing;
enum PayloadType {
// Compressable text format.
COMPRESSABLE = 0;
// Uncompressable binary format.
UNCOMPRESSABLE = 1;
// Randomly chosen from all other formats defined in this enum.
RANDOM = 2;
}
message StatsRequest {
// run number
int32 test_num = 1;
}
message ServerStats {
// wall clock time
double time_elapsed = 1;
// user time used by the server process and threads
double time_user = 2;
// server time used by the server process and all threads
double time_system = 3;
}
message Payload {
// The type of data in body.
PayloadType type = 1;
// Primary contents of payload.
bytes body = 2;
}
import "test/proto/benchmarks/payloads.proto";
import "test/proto/benchmarks/stats.proto";
message HistogramData {
repeated uint32 bucket = 1;
double min_seen = 2;
double max_seen = 3;
double sum = 4;
double sum_of_squares = 5;
double count = 6;
}
package grpc.testing;
enum ClientType {
SYNCHRONOUS_CLIENT = 0;
SYNC_CLIENT = 0;
ASYNC_CLIENT = 1;
}
enum ServerType {
SYNCHRONOUS_SERVER = 0;
SYNC_SERVER = 0;
ASYNC_SERVER = 1;
}
@ -92,14 +49,6 @@ enum RpcType {
STREAMING = 1;
}
enum LoadType {
CLOSED_LOOP = 0;
POISSON = 1;
UNIFORM = 2;
DETERMINISTIC = 3;
PARETO = 4;
}
message PoissonParams {
double offered_load = 1;
}
@ -118,32 +67,45 @@ message ParetoParams {
double alpha = 2;
}
message ClosedLoopParams {
}
message LoadParams {
oneof load {
PoissonParams poisson = 1;
UniformParams uniform = 2;
DeterministicParams determ = 3;
ParetoParams pareto = 4;
ClosedLoopParams closed_loop = 1;
PoissonParams poisson = 2;
UniformParams uniform = 3;
DeterministicParams determ = 4;
ParetoParams pareto = 5;
};
}
// presence of SecurityParams implies use of TLS
message SecurityParams {
bool use_test_ca = 1;
string server_host_override = 2;
}
message ClientConfig {
repeated string server_targets = 1;
ClientType client_type = 2;
bool enable_ssl = 3;
SecurityParams security_params = 3;
int32 outstanding_rpcs_per_channel = 4;
int32 client_channels = 5;
int32 payload_size = 6;
// only for async client:
int32 async_client_threads = 7;
RpcType rpc_type = 8;
string host = 9;
LoadType load_type = 10;
LoadParams load_params = 11;
LoadParams load_params = 10;
PayloadConfig payload_config = 11;
}
message ClientStatus {
ClientStats stats = 1;
}
// Request current stats
message Mark {
bool reset = 1;
}
message ClientArgs {
@ -153,22 +115,15 @@ message ClientArgs {
}
}
message ClientStats {
HistogramData latencies = 1;
double time_elapsed = 2;
double time_user = 3;
double time_system = 4;
}
message ClientStatus {
ClientStats stats = 1;
}
message ServerConfig {
ServerType server_type = 1;
int32 threads = 2;
bool enable_ssl = 3;
string host = 4;
SecurityParams security_params = 2;
int32 port = 4;
// only for async server
int32 async_server_threads = 7;
// restrict core usage
int32 core_limit = 8;
PayloadConfig payload_config = 9;
}
message ServerArgs {
@ -181,38 +136,5 @@ message ServerArgs {
message ServerStatus {
ServerStats stats = 1;
int32 port = 2;
}
message SimpleRequest {
// Desired payload type in the response from the server.
// If response_type is RANDOM, server randomly chooses one from other formats.
PayloadType response_type = 1;
// Desired payload size in the response from the server.
// If response_type is COMPRESSABLE, this denotes the size before compression.
int32 response_size = 2;
// Optional input payload sent along with the request.
Payload payload = 3;
}
message SimpleResponse {
Payload payload = 1;
}
service TestService {
// One request followed by one response.
// The server returns the client payload as-is.
rpc UnaryCall(SimpleRequest) returns (SimpleResponse);
// One request followed by one response.
// The server returns the client payload as-is.
rpc StreamingCall(stream SimpleRequest) returns (stream SimpleResponse);
}
service Worker {
// Start test with specified workload
rpc RunTest(stream ClientArgs) returns (stream ClientStatus);
// Start test with specified workload
rpc RunServer(stream ServerArgs) returns (stream ServerStatus);
int32 cores = 3;
}

@ -0,0 +1,55 @@
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
syntax = "proto3";
package grpc.testing;
message ByteBufferParams {
int32 req_size = 1;
int32 resp_size = 2;
}
message SimpleProtoParams {
int32 req_size = 1;
int32 resp_size = 2;
}
message ComplexProtoParams {
// TODO (vpai): Fill this in once the details of complex, representative
// protos are decided
}
message PayloadConfig {
oneof payload {
ByteBufferParams bytebuf_params = 1;
SimpleProtoParams simple_params = 2;
ComplexProtoParams complex_params = 3;
}
}

@ -0,0 +1,55 @@
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
// An integration test service that covers all the method signature permutations
// of unary/streaming requests/responses.
syntax = "proto3";
import "test/proto/messages.proto";
import "test/proto/benchmarks/control.proto";
package grpc.testing;
service BenchmarkService {
// One request followed by one response.
// The server returns the client payload as-is.
rpc UnaryCall(SimpleRequest) returns (SimpleResponse);
// One request followed by one response.
// The server returns the client payload as-is.
rpc StreamingCall(stream SimpleRequest) returns (stream SimpleResponse);
}
service WorkerService {
// Start server with specified workload
rpc RunServer(stream ServerArgs) returns (stream ServerStatus);
// Start client with specified workload
rpc RunClient(stream ClientArgs) returns (stream ClientStatus);
}

@ -0,0 +1,59 @@
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
syntax = "proto3";
package grpc.testing;
message ServerStats {
// wall clock time
double time_elapsed = 1;
// user time used by the server process and threads
double time_user = 2;
// server time used by the server process and all threads
double time_system = 3;
}
message HistogramData {
repeated uint32 bucket = 1;
double min_seen = 2;
double max_seen = 3;
double sum = 4;
double sum_of_squares = 5;
double count = 6;
}
message ClientStats {
HistogramData latencies = 1;
double time_elapsed = 2;
double time_user = 3;
double time_system = 4;
}

@ -2,15 +2,38 @@ package http2interop
import (
"crypto/tls"
"crypto/x509"
"fmt"
"io"
"log"
"net"
"testing"
"time"
)
const (
Preface = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
)
var (
defaultTimeout = 1 * time.Second
)
type HTTP2InteropCtx struct {
// Inputs
ServerHost string
ServerPort int
UseTLS bool
UseTestCa bool
ServerHostnameOverride string
T *testing.T
// Derived
serverSpec string
authority string
rootCAs *x509.CertPool
}
func parseFrame(r io.Reader) (Frame, error) {
fh := FrameHeader{}
if err := fh.Parse(r); err != nil {
@ -49,22 +72,8 @@ func streamFrame(w io.Writer, f Frame) error {
return nil
}
func getHttp2Conn(addr string) (*tls.Conn, error) {
config := &tls.Config{
InsecureSkipVerify: true,
NextProtos: []string{"h2"},
}
conn, err := tls.Dial("tcp", addr, config)
if err != nil {
return nil, err
}
return conn, nil
}
func testClientShortSettings(addr string, length int) error {
c, err := getHttp2Conn(addr)
func testClientShortSettings(ctx *HTTP2InteropCtx, length int) error {
c, err := connect(ctx)
if err != nil {
return err
}
@ -82,22 +91,22 @@ func testClientShortSettings(addr string, length int) error {
Data: make([]byte, length),
}
if err := streamFrame(c, sf); err != nil {
ctx.T.Log("Unable to stream frame", sf)
return err
}
for {
frame, err := parseFrame(c)
if err != nil {
if _, err := parseFrame(c); err != nil {
ctx.T.Log("Unable to parse frame")
return err
}
log.Println(frame)
}
return nil
}
func testClientPrefaceWithStreamId(addr string) error {
c, err := getHttp2Conn(addr)
func testClientPrefaceWithStreamId(ctx *HTTP2InteropCtx) error {
c, err := connect(ctx)
if err != nil {
return err
}
@ -119,18 +128,16 @@ func testClientPrefaceWithStreamId(addr string) error {
}
for {
frame, err := parseFrame(c)
if err != nil {
if _, err := parseFrame(c); err != nil {
return err
}
log.Println(frame)
}
return nil
}
func testUnknownFrameType(addr string) error {
c, err := getHttp2Conn(addr)
func testUnknownFrameType(ctx *HTTP2InteropCtx) error {
c, err := connect(ctx)
if err != nil {
return err
}
@ -143,6 +150,7 @@ func testUnknownFrameType(addr string) error {
// Send some settings, which are part of the client preface
sf := &SettingsFrame{}
if err := streamFrame(c, sf); err != nil {
ctx.T.Log("Unable to stream frame", sf)
return err
}
@ -154,6 +162,7 @@ func testUnknownFrameType(addr string) error {
},
}
if err := streamFrame(c, fh); err != nil {
ctx.T.Log("Unable to stream frame", fh)
return err
}
}
@ -162,12 +171,14 @@ func testUnknownFrameType(addr string) error {
Data: []byte("01234567"),
}
if err := streamFrame(c, pf); err != nil {
ctx.T.Log("Unable to stream frame", sf)
return err
}
for {
frame, err := parseFrame(c)
if err != nil {
ctx.T.Log("Unable to parse frame")
return err
}
if npf, ok := frame.(*PingFrame); !ok {
@ -183,8 +194,8 @@ func testUnknownFrameType(addr string) error {
return nil
}
func testShortPreface(addr string, prefacePrefix string) error {
c, err := getHttp2Conn(addr)
func testShortPreface(ctx *HTTP2InteropCtx, prefacePrefix string) error {
c, err := connect(ctx)
if err != nil {
return err
}
@ -201,17 +212,15 @@ func testShortPreface(addr string, prefacePrefix string) error {
return err
}
func testTLSMaxVersion(addr string, version uint16) error {
config := &tls.Config{
InsecureSkipVerify: true,
NextProtos: []string{"h2"},
MaxVersion: version,
}
conn, err := tls.Dial("tcp", addr, config)
func testTLSMaxVersion(ctx *HTTP2InteropCtx, version uint16) error {
config := buildTlsConfig(ctx)
config.MaxVersion = version
conn, err := connectWithTls(ctx, config)
if err != nil {
return err
}
defer conn.Close()
conn.SetDeadline(time.Now().Add(defaultTimeout))
buf := make([]byte, 256)
if n, err := conn.Read(buf); err != nil {
@ -223,16 +232,15 @@ func testTLSMaxVersion(addr string, version uint16) error {
return nil
}
func testTLSApplicationProtocol(addr string) error {
config := &tls.Config{
InsecureSkipVerify: true,
NextProtos: []string{"h2c"},
}
conn, err := tls.Dial("tcp", addr, config)
func testTLSApplicationProtocol(ctx *HTTP2InteropCtx) error {
config := buildTlsConfig(ctx)
config.NextProtos = []string{"h2c"}
conn, err := connectWithTls(ctx, config)
if err != nil {
return err
}
defer conn.Close()
conn.SetDeadline(time.Now().Add(defaultTimeout))
buf := make([]byte, 256)
if n, err := conn.Read(buf); err != nil {
@ -243,3 +251,48 @@ func testTLSApplicationProtocol(addr string) error {
}
return nil
}
func connect(ctx *HTTP2InteropCtx) (net.Conn, error) {
var conn net.Conn
var err error
if !ctx.UseTLS {
conn, err = connectWithoutTls(ctx)
} else {
config := buildTlsConfig(ctx)
conn, err = connectWithTls(ctx, config)
}
if err != nil {
return nil, err
}
conn.SetDeadline(time.Now().Add(defaultTimeout))
return conn, nil
}
func buildTlsConfig(ctx *HTTP2InteropCtx) *tls.Config {
return &tls.Config{
RootCAs: ctx.rootCAs,
NextProtos: []string{"h2"},
ServerName: ctx.authority,
MinVersion: tls.VersionTLS12,
// TODO(carl-mastrangelo): remove this once all test certificates have been updated.
InsecureSkipVerify: true,
}
}
func connectWithoutTls(ctx *HTTP2InteropCtx) (net.Conn, error) {
conn, err := net.DialTimeout("tcp", ctx.serverSpec, defaultTimeout)
if err != nil {
return nil, err
}
return conn, nil
}
func connectWithTls(ctx *HTTP2InteropCtx, config *tls.Config) (*tls.Conn, error) {
conn, err := connectWithoutTls(ctx)
if err != nil {
return nil, err
}
return tls.Client(conn, config), nil
}

@ -2,46 +2,117 @@ package http2interop
import (
"crypto/tls"
"crypto/x509"
"strings"
"flag"
"fmt"
"io"
"io/ioutil"
"os"
"strconv"
"testing"
)
var (
serverSpec = flag.String("spec", ":50051", "The server spec to test")
serverHost = flag.String("server_host", "", "The host to test")
serverPort = flag.Int("server_port", 443, "The port to test")
useTls = flag.Bool("use_tls", true, "Should TLS tests be run")
// TODO: implement
testCase = flag.String("test_case", "", "What test cases to run")
// The rest of these are unused, but present to fulfill the client interface
serverHostOverride = flag.String("server_host_override", "", "Unused")
useTestCa = flag.Bool("use_test_ca", false, "Unused")
defaultServiceAccount = flag.String("default_service_account", "", "Unused")
oauthScope = flag.String("oauth_scope", "", "Unused")
serviceAccountKeyFile = flag.String("service_account_key_file", "", "Unused")
)
func InteropCtx(t *testing.T) *HTTP2InteropCtx {
ctx := &HTTP2InteropCtx{
ServerHost: *serverHost,
ServerPort: *serverPort,
ServerHostnameOverride: *serverHostOverride,
UseTLS: *useTls,
UseTestCa: *useTestCa,
T: t,
}
ctx.serverSpec = ctx.ServerHost
if ctx.ServerPort != -1 {
ctx.serverSpec += ":" + strconv.Itoa(ctx.ServerPort)
}
if ctx.ServerHostnameOverride == "" {
ctx.authority = ctx.ServerHost
} else {
ctx.authority = ctx.ServerHostnameOverride
}
if ctx.UseTestCa {
// It would be odd if useTestCa was true, but not useTls. meh
certData, err := ioutil.ReadFile("src/core/tsi/test_creds/ca.pem")
if err != nil {
t.Fatal(err)
}
ctx.rootCAs = x509.NewCertPool()
if !ctx.rootCAs.AppendCertsFromPEM(certData) {
t.Fatal(fmt.Errorf("Unable to parse pem data"))
}
}
return ctx
}
func (ctx *HTTP2InteropCtx) Close() error {
// currently a noop
return nil
}
func TestShortPreface(t *testing.T) {
ctx := InteropCtx(t)
for i := 0; i < len(Preface)-1; i++ {
if err := testShortPreface(*serverSpec, Preface[:i]+"X"); err != io.EOF {
if err := testShortPreface(ctx, Preface[:i]+"X"); err != io.EOF {
t.Error("Expected an EOF but was", err)
}
}
}
func TestUnknownFrameType(t *testing.T) {
if err := testUnknownFrameType(*serverSpec); err != nil {
ctx := InteropCtx(t)
if err := testUnknownFrameType(ctx); err != nil {
t.Fatal(err)
}
}
func TestTLSApplicationProtocol(t *testing.T) {
if err := testTLSApplicationProtocol(*serverSpec); err != io.EOF {
t.Fatal("Expected an EOF but was", err)
}
ctx := InteropCtx(t)
err := testTLSApplicationProtocol(ctx);
matchError(t, err, "EOF")
}
func TestTLSMaxVersion(t *testing.T) {
if err := testTLSMaxVersion(*serverSpec, tls.VersionTLS11); err != io.EOF {
t.Fatal("Expected an EOF but was", err)
}
ctx := InteropCtx(t)
err := testTLSMaxVersion(ctx, tls.VersionTLS11);
matchError(t, err, "EOF", "server selected unsupported protocol")
}
func TestClientPrefaceWithStreamId(t *testing.T) {
if err := testClientPrefaceWithStreamId(*serverSpec); err != io.EOF {
t.Fatal("Expected an EOF but was", err)
}
ctx := InteropCtx(t)
err := testClientPrefaceWithStreamId(ctx)
matchError(t, err, "EOF")
}
func matchError(t *testing.T, err error, matches ... string) {
if err == nil {
t.Fatal("Expected an error")
}
for _, s := range matches {
if strings.Contains(err.Error(), s) {
return
}
}
t.Fatalf("Error %v not in %+v", err, matches)
}
func TestMain(m *testing.M) {

@ -63,6 +63,7 @@ docker run \
-e "arch=$arch" \
-e CCACHE_DIR=/tmp/ccache \
-e XDG_CACHE_HOME=/tmp/xdg-cache-home \
-e THIS_IS_REALLY_NEEDED='see https://github.com/docker/docker/issues/14203 for why docker is awful' \
-i $TTY_FLAG \
-v "$git_root:/var/local/jenkins/grpc" \
-v /tmp/ccache:/tmp/ccache \

@ -84,6 +84,7 @@ CONTAINER_NAME="build_${BASE_NAME}_$(uuidgen)"
# Prepare image for interop tests, commit it on success.
(docker run \
-e CCACHE_DIR=/tmp/ccache \
-e THIS_IS_REALLY_NEEDED='see https://github.com/docker/docker/issues/14203 for why docker is awful' \
-i $TTY_FLAG \
$MOUNT_ARGS \
$BUILD_INTEROP_DOCKER_EXTRA_ARGS \

@ -0,0 +1,36 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
FROM golang:1.4
# Using login shell removes Go from path, so we add it.
RUN ln -s /usr/src/go/bin/go /usr/local/bin
# Define the default command.
CMD ["bash"]

@ -0,0 +1,42 @@
#!/bin/bash
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
# Builds http2 interop client in a base image.
set -e
mkdir -p /var/local/git
git clone --recursive /var/local/jenkins/grpc /var/local/git/grpc
# copy service account keys if available
cp -r /var/local/jenkins/service_account $HOME || true
# compile the tests
(cd /var/local/git/grpc/tools/http2_interop && go test -c)

@ -203,12 +203,23 @@ class Job(object):
env.update(self._spec.environ)
env.update(self._add_env)
self._start = time.time()
self._process = subprocess.Popen(args=self._spec.cmdline,
stderr=subprocess.STDOUT,
stdout=self._tempfile,
cwd=self._spec.cwd,
shell=self._spec.shell,
env=env)
try_start = lambda: subprocess.Popen(args=self._spec.cmdline,
stderr=subprocess.STDOUT,
stdout=self._tempfile,
cwd=self._spec.cwd,
shell=self._spec.shell,
env=env)
delay = 0.3
for i in range(0, 4):
try:
self._process = try_start()
break
except OSError:
message('WARNING', 'Failed to start %s, retrying in %f seconds' % (self._spec.shortname, delay))
time.sleep(delay)
delay *= 2
else:
self._process = try_start()
self._state = _RUNNING
def state(self, update_cache):

@ -108,10 +108,12 @@ def fill_one_test_result(shortname, resultset, html_str):
def render_html_report(client_langs, server_langs, test_cases, auth_test_cases,
resultset, num_failures, cloud_to_prod):
http2_cases, resultset, num_failures, cloud_to_prod,
http2_interop):
"""Generate html report."""
sorted_test_cases = sorted(test_cases)
sorted_auth_test_cases = sorted(auth_test_cases)
sorted_http2_cases = sorted(http2_cases)
sorted_client_langs = sorted(client_langs)
sorted_server_langs = sorted(server_langs)
html_str = ('<!DOCTYPE html>\n'
@ -149,6 +151,30 @@ def render_html_report(client_langs, server_langs, test_cases, auth_test_cases,
html_str = fill_one_test_result(shortname, resultset, html_str)
html_str = '%s</tr>\n' % html_str
html_str = '%s</table>\n' % html_str
if http2_interop:
# Each column header is the server language.
html_str = ('%s<h2>HTTP/2 Interop</h2>\n'
'<table style=\"width:100%%\" border=\"1\">\n'
'<tr bgcolor=\"#00BFFF\">\n'
'<th>Servers &#9658;<br/>'
'Test Cases &#9660;</th>\n') % html_str
for server_lang in sorted_server_langs:
html_str = '%s<th>%s\n' % (html_str, server_lang)
if cloud_to_prod:
html_str = '%s<th>%s\n' % (html_str, "prod")
html_str = '%s</tr>\n' % html_str
for test_case in sorted_http2_cases:
html_str = '%s<tr><td><b>%s</b></td>\n' % (html_str, test_case)
# Fill up the cells with test result.
for server_lang in sorted_server_langs:
shortname = 'cloud_to_cloud:%s:%s_server:%s' % (
"http2", server_lang, test_case)
html_str = fill_one_test_result(shortname, resultset, html_str)
if cloud_to_prod:
shortname = 'cloud_to_prod:%s:%s' % ("http2", test_case)
html_str = fill_one_test_result(shortname, resultset, html_str)
html_str = '%s</tr>\n' % html_str
html_str = '%s</table>\n' % html_str
if server_langs:
for test_case in sorted_test_cases:
# Each column header is the client language.

@ -159,6 +159,31 @@ class GoLanguage:
return 'go'
class Http2Client:
"""Represents the HTTP/2 Interop Test
This pretends to be a language in order to be built and run, but really it
isn't.
"""
def __init__(self):
self.client_cwd = None
self.safename = str(self)
def client_args(self):
return ['tools/http2_interop/http2_interop.test']
def cloud_to_prod_env(self):
return {}
def global_env(self):
return {}
def unimplemented_test_cases(self):
return _TEST_CASES
def __str__(self):
return 'http2'
class NodeLanguage:
def __init__(self):
@ -281,6 +306,7 @@ _TEST_CASES = ['large_unary', 'empty_unary', 'ping_pong',
_AUTH_TEST_CASES = ['compute_engine_creds', 'jwt_token_creds',
'oauth2_auth_token', 'per_rpc_creds']
_HTTP2_TEST_CASES = ["tls"]
def docker_run_cmdline(cmdline, image, docker_args=[], cwd=None, environ=None):
"""Wraps given cmdline array to create 'docker run' cmdline from it."""
@ -439,6 +465,7 @@ def server_jobspec(language, docker_image):
environ=environ,
docker_args=['-p', str(_DEFAULT_SERVER_PORT),
'--name', container_name])
server_job = jobset.JobSpec(
cmdline=docker_cmdline,
environ=environ,
@ -516,6 +543,12 @@ argp.add_argument('--allow_flakes',
action='store_const',
const=True,
help='Allow flaky tests to show as passing (re-runs failed tests up to five times)')
argp.add_argument('--http2_interop',
default=False,
action='store_const',
const=True,
help='Enable HTTP/2 interop tests')
args = argp.parse_args()
servers = set(s for s in itertools.chain.from_iterable(_SERVERS
@ -539,12 +572,16 @@ languages = set(_LANGUAGES[l]
for l in itertools.chain.from_iterable(
_LANGUAGES.iterkeys() if x == 'all' else [x]
for x in args.language))
http2Interop = Http2Client() if args.http2_interop else None
docker_images={}
if args.use_docker:
# languages for which to build docker images
languages_to_build = set(_LANGUAGES[k] for k in set([str(l) for l in languages] +
[s for s in servers]))
if args.http2_interop:
languages_to_build.add(http2Interop)
build_jobs = []
for l in languages_to_build:
@ -586,6 +623,13 @@ try:
test_job = cloud_to_prod_jobspec(language, test_case,
docker_image=docker_images.get(str(language)))
jobs.append(test_job)
if args.http2_interop:
for test_case in _HTTP2_TEST_CASES:
test_job = cloud_to_prod_jobspec(http2Interop, test_case,
docker_image=docker_images.get(str(http2Interop)))
jobs.append(test_job)
if args.cloud_to_prod_auth:
for language in languages:
@ -613,6 +657,16 @@ try:
server_port,
docker_image=docker_images.get(str(language)))
jobs.append(test_job)
if args.http2_interop:
for test_case in _HTTP2_TEST_CASES:
test_job = cloud_to_cloud_jobspec(http2Interop,
test_case,
server_name,
server_host,
server_port,
docker_image=docker_images.get(str(http2Interop)))
jobs.append(test_job)
if not jobs:
print 'No jobs to run.'
@ -631,7 +685,8 @@ try:
report_utils.render_html_report(
set([str(l) for l in languages]), servers, _TEST_CASES, _AUTH_TEST_CASES,
resultset, num_failures, args.cloud_to_prod_auth or args.cloud_to_prod)
_HTTP2_TEST_CASES, resultset, num_failures,
args.cloud_to_prod_auth or args.cloud_to_prod, args.http2_interop)
finally:
# Check if servers are still running.

@ -147,9 +147,9 @@ class CLanguage(object):
self.platform = platform_string()
self.test_lang = test_lang
def test_specs(self, config, travis):
def test_specs(self, config, args):
out = []
binaries = get_c_tests(travis, self.test_lang)
binaries = get_c_tests(args.travis, self.test_lang)
for target in binaries:
if config.build_config in target['exclude_configs']:
continue
@ -160,11 +160,16 @@ class CLanguage(object):
binary = 'bins/%s/%s' % (config.build_config, target['name'])
if os.path.isfile(binary):
out.append(config.job_spec([binary], [binary]))
else:
elif args.regex == '.*' or platform_string() == 'windows':
print '\nWARNING: binary not found, skipping', binary
return sorted(out)
def make_targets(self):
def make_targets(self, test_regex):
if platform_string() != 'windows' and test_regex != '.*':
# use the regex to minimize the number of things to build
return [target['name']
for target in get_c_tests(False, self.test_lang)
if re.search(test_regex, target['name'])]
if platform_string() == 'windows':
# don't build tools on windows just yet
return ['buildtests_%s' % self.make_target]
@ -196,7 +201,7 @@ class CLanguage(object):
class NodeLanguage(object):
def test_specs(self, config, travis):
def test_specs(self, config, args):
return [config.job_spec(['tools/run_tests/run_node.sh'], None,
environ=_FORCE_ENVIRON_FOR_WRAPPERS)]
@ -204,7 +209,7 @@ class NodeLanguage(object):
# Default to 1 week cache expiration
return [['tools/run_tests/pre_build_node.sh']]
def make_targets(self):
def make_targets(self, test_regex):
return []
def build_steps(self):
@ -225,14 +230,14 @@ class NodeLanguage(object):
class PhpLanguage(object):
def test_specs(self, config, travis):
def test_specs(self, config, args):
return [config.job_spec(['src/php/bin/run_tests.sh'], None,
environ=_FORCE_ENVIRON_FOR_WRAPPERS)]
def pre_build_steps(self):
return []
def make_targets(self):
def make_targets(self, test_regex):
return ['static_c', 'shared_c']
def build_steps(self):
@ -257,7 +262,7 @@ class PythonLanguage(object):
self._build_python_versions = ['2.7']
self._has_python_versions = []
def test_specs(self, config, travis):
def test_specs(self, config, args):
environment = dict(_FORCE_ENVIRON_FOR_WRAPPERS)
environment['PYVER'] = '2.7'
return [config.job_spec(
@ -271,7 +276,7 @@ class PythonLanguage(object):
def pre_build_steps(self):
return []
def make_targets(self):
def make_targets(self, test_regex):
return ['static_c', 'grpc_python_plugin', 'shared_c']
def build_steps(self):
@ -303,14 +308,14 @@ class PythonLanguage(object):
class RubyLanguage(object):
def test_specs(self, config, travis):
def test_specs(self, config, args):
return [config.job_spec(['tools/run_tests/run_ruby.sh'], None,
environ=_FORCE_ENVIRON_FOR_WRAPPERS)]
def pre_build_steps(self):
return [['tools/run_tests/pre_build_ruby.sh']]
def make_targets(self):
def make_targets(self, test_regex):
return ['static_c']
def build_steps(self):
@ -333,7 +338,7 @@ class CSharpLanguage(object):
def __init__(self):
self.platform = platform_string()
def test_specs(self, config, travis):
def test_specs(self, config, args):
assemblies = ['Grpc.Core.Tests',
'Grpc.Examples.Tests',
'Grpc.HealthCheck.Tests',
@ -361,7 +366,7 @@ class CSharpLanguage(object):
else:
return [['tools/run_tests/pre_build_csharp.sh']]
def make_targets(self):
def make_targets(self, test_regex):
# For Windows, this target doesn't really build anything,
# everything is build by buildall script later.
if self.platform == 'windows':
@ -390,14 +395,14 @@ class CSharpLanguage(object):
class ObjCLanguage(object):
def test_specs(self, config, travis):
def test_specs(self, config, args):
return [config.job_spec(['src/objective-c/tests/run_tests.sh'], None,
environ=_FORCE_ENVIRON_FOR_WRAPPERS)]
def pre_build_steps(self):
return []
def make_targets(self):
def make_targets(self, test_regex):
return ['grpc_objective_c_plugin', 'interop_server']
def build_steps(self):
@ -418,14 +423,14 @@ class ObjCLanguage(object):
class Sanity(object):
def test_specs(self, config, travis):
def test_specs(self, config, args):
return [config.job_spec(['tools/run_tests/run_sanity.sh'], None),
config.job_spec(['tools/run_tests/check_sources_and_headers.py'], None)]
def pre_build_steps(self):
return []
def make_targets(self):
def make_targets(self, test_regex):
return ['run_dep_checks']
def build_steps(self):
@ -446,13 +451,13 @@ class Sanity(object):
class Build(object):
def test_specs(self, config, travis):
def test_specs(self, config, args):
return []
def pre_build_steps(self):
return []
def make_targets(self):
def make_targets(self, test_regex):
return ['static']
def build_steps(self):
@ -662,7 +667,7 @@ make_targets = {}
for l in languages:
makefile = l.makefile_name()
make_targets[makefile] = make_targets.get(makefile, set()).union(
set(l.make_targets()))
set(l.make_targets(args.regex)))
build_steps = list(set(
jobset.JobSpec(cmdline, environ={'CONFIG': cfg}, flake_retries=5)
@ -830,12 +835,12 @@ def _calculate_num_runs_failures(list_of_results):
return num_runs, num_failures
def _build_and_run(
check_cancelled, newline_on_success, travis, cache, xml_report=None):
check_cancelled, newline_on_success, cache, xml_report=None):
"""Do one pass of building & running tests."""
# build latest sequentially
num_failures, _ = jobset.run(
build_steps, maxjobs=1, stop_on_failure=True,
newline_on_success=newline_on_success, travis=travis)
newline_on_success=newline_on_success, travis=args.travis)
if num_failures:
return 1
@ -851,11 +856,11 @@ def _build_and_run(
spec
for config in run_configs
for language in languages
for spec in language.test_specs(config, args.travis)
for spec in language.test_specs(config, args)
if re.search(args.regex, spec.shortname))
# When running on travis, we want out test runs to be as similar as possible
# for reproducibility purposes.
if travis:
if args.travis:
massaged_one_run = sorted(one_run, key=lambda x: x.shortname)
else:
# whereas otherwise, we want to shuffle things up to give all tests a
@ -870,7 +875,7 @@ def _build_and_run(
number_failures, resultset = jobset.run(
all_runs, check_cancelled, newline_on_success=newline_on_success,
travis=travis, infinite_runs=infinite_runs, maxjobs=args.jobs,
travis=args.travis, infinite_runs=infinite_runs, maxjobs=args.jobs,
stop_on_failure=args.stop_on_failure,
cache=cache if not xml_report else None,
add_env={'GRPC_TEST_PORT_SERVER': 'localhost:%d' % port_server_port})
@ -895,7 +900,7 @@ def _build_and_run(
number_failures, _ = jobset.run(
post_tests_steps, maxjobs=1, stop_on_failure=True,
newline_on_success=newline_on_success, travis=travis)
newline_on_success=newline_on_success, travis=args.travis)
if number_failures:
return 3
@ -916,7 +921,6 @@ if forever:
previous_success = success
success = _build_and_run(check_cancelled=have_files_changed,
newline_on_success=False,
travis=args.travis,
cache=test_cache) == 0
if not previous_success and success:
jobset.message('SUCCESS',
@ -928,7 +932,6 @@ if forever:
else:
result = _build_and_run(check_cancelled=lambda: False,
newline_on_success=args.newline_on_success,
travis=args.travis,
cache=test_cache,
xml_report=args.xml_report)
if result == 0:

@ -1573,6 +1573,23 @@
"test/cpp/common/secure_auth_context_test.cc"
]
},
{
"deps": [
"gpr",
"gpr_test_util",
"grpc",
"grpc++",
"grpc++_test_util",
"grpc_test_util",
"qps"
],
"headers": [],
"language": "c++",
"name": "secure_sync_unary_ping_pong_test",
"src": [
"test/cpp/qps/secure_sync_unary_ping_pong_test.cc"
]
},
{
"deps": [
"gpr",
@ -15371,8 +15388,16 @@
"test/cpp/qps/stats.h",
"test/cpp/qps/timer.h",
"test/cpp/util/benchmark_config.h",
"test/proto/qpstest.grpc.pb.h",
"test/proto/qpstest.pb.h"
"test/proto/benchmarks/control.grpc.pb.h",
"test/proto/benchmarks/control.pb.h",
"test/proto/benchmarks/payloads.grpc.pb.h",
"test/proto/benchmarks/payloads.pb.h",
"test/proto/benchmarks/services.grpc.pb.h",
"test/proto/benchmarks/services.pb.h",
"test/proto/benchmarks/stats.grpc.pb.h",
"test/proto/benchmarks/stats.pb.h",
"test/proto/messages.grpc.pb.h",
"test/proto/messages.pb.h"
],
"language": "c++",
"name": "qps",

@ -1515,6 +1515,22 @@
"windows"
]
},
{
"ci_platforms": [
"linux",
"mac",
"posix"
],
"exclude_configs": [],
"flaky": false,
"language": "c++",
"name": "secure_sync_unary_ping_pong_test",
"platforms": [
"linux",
"mac",
"posix"
]
},
{
"ci_platforms": [
"linux",

@ -147,13 +147,45 @@
<ClInclude Include="..\..\..\test\cpp\util\benchmark_config.h" />
</ItemGroup>
<ItemGroup>
<ClCompile Include="..\..\..\test\proto\qpstest.pb.cc">
<ClCompile Include="..\..\..\test\proto\messages.pb.cc">
</ClCompile>
<ClInclude Include="..\..\..\test\proto\qpstest.pb.h">
<ClInclude Include="..\..\..\test\proto\messages.pb.h">
</ClInclude>
<ClCompile Include="..\..\..\test\proto\qpstest.grpc.pb.cc">
<ClCompile Include="..\..\..\test\proto\messages.grpc.pb.cc">
</ClCompile>
<ClInclude Include="..\..\..\test\proto\qpstest.grpc.pb.h">
<ClInclude Include="..\..\..\test\proto\messages.grpc.pb.h">
</ClInclude>
<ClCompile Include="..\..\..\test\proto\benchmarks\control.pb.cc">
</ClCompile>
<ClInclude Include="..\..\..\test\proto\benchmarks\control.pb.h">
</ClInclude>
<ClCompile Include="..\..\..\test\proto\benchmarks\control.grpc.pb.cc">
</ClCompile>
<ClInclude Include="..\..\..\test\proto\benchmarks\control.grpc.pb.h">
</ClInclude>
<ClCompile Include="..\..\..\test\proto\benchmarks\payloads.pb.cc">
</ClCompile>
<ClInclude Include="..\..\..\test\proto\benchmarks\payloads.pb.h">
</ClInclude>
<ClCompile Include="..\..\..\test\proto\benchmarks\payloads.grpc.pb.cc">
</ClCompile>
<ClInclude Include="..\..\..\test\proto\benchmarks\payloads.grpc.pb.h">
</ClInclude>
<ClCompile Include="..\..\..\test\proto\benchmarks\services.pb.cc">
</ClCompile>
<ClInclude Include="..\..\..\test\proto\benchmarks\services.pb.h">
</ClInclude>
<ClCompile Include="..\..\..\test\proto\benchmarks\services.grpc.pb.cc">
</ClCompile>
<ClInclude Include="..\..\..\test\proto\benchmarks\services.grpc.pb.h">
</ClInclude>
<ClCompile Include="..\..\..\test\proto\benchmarks\stats.pb.cc">
</ClCompile>
<ClInclude Include="..\..\..\test\proto\benchmarks\stats.pb.h">
</ClInclude>
<ClCompile Include="..\..\..\test\proto\benchmarks\stats.grpc.pb.cc">
</ClCompile>
<ClInclude Include="..\..\..\test\proto\benchmarks\stats.grpc.pb.h">
</ClInclude>
<ClCompile Include="..\..\..\test\cpp\qps\perf_db.pb.cc">
</ClCompile>

@ -1,9 +1,21 @@
<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<ItemGroup>
<ClCompile Include="..\..\..\test\proto\qpstest.proto">
<ClCompile Include="..\..\..\test\proto\messages.proto">
<Filter>test\proto</Filter>
</ClCompile>
<ClCompile Include="..\..\..\test\proto\benchmarks\control.proto">
<Filter>test\proto\benchmarks</Filter>
</ClCompile>
<ClCompile Include="..\..\..\test\proto\benchmarks\payloads.proto">
<Filter>test\proto\benchmarks</Filter>
</ClCompile>
<ClCompile Include="..\..\..\test\proto\benchmarks\services.proto">
<Filter>test\proto\benchmarks</Filter>
</ClCompile>
<ClCompile Include="..\..\..\test\proto\benchmarks\stats.proto">
<Filter>test\proto\benchmarks</Filter>
</ClCompile>
<ClCompile Include="..\..\..\test\cpp\qps\perf_db.proto">
<Filter>test\cpp\qps</Filter>
</ClCompile>
@ -90,6 +102,9 @@
<Filter Include="test\proto">
<UniqueIdentifier>{44e63a33-67f4-0575-e87a-711a7c9111e2}</UniqueIdentifier>
</Filter>
<Filter Include="test\proto\benchmarks">
<UniqueIdentifier>{4180a094-39b4-e46c-1576-940bfe87d284}</UniqueIdentifier>
</Filter>
</ItemGroup>
</Project>

Loading…
Cancel
Save