diff --git a/CMakeLists.txt b/CMakeLists.txt index 088eb58a094..5f7d582b0f1 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1067,6 +1067,7 @@ add_library(end2end_nosec_tests test/core/end2end/tests/retry_throttled.cc test/core/end2end/tests/retry_too_many_attempts.cc test/core/end2end/tests/server_finishes_request.cc + test/core/end2end/tests/server_streaming.cc test/core/end2end/tests/shutdown_finishes_calls.cc test/core/end2end/tests/shutdown_finishes_tags.cc test/core/end2end/tests/simple_cacheable_request.cc @@ -1200,6 +1201,7 @@ add_library(end2end_tests test/core/end2end/tests/retry_throttled.cc test/core/end2end/tests/retry_too_many_attempts.cc test/core/end2end/tests/server_finishes_request.cc + test/core/end2end/tests/server_streaming.cc test/core/end2end/tests/shutdown_finishes_calls.cc test/core/end2end/tests/shutdown_finishes_tags.cc test/core/end2end/tests/simple_cacheable_request.cc diff --git a/Makefile b/Makefile index dd61842d777..944167672d4 100644 --- a/Makefile +++ b/Makefile @@ -3349,6 +3349,7 @@ LIBEND2END_NOSEC_TESTS_SRC = \ test/core/end2end/tests/retry_throttled.cc \ test/core/end2end/tests/retry_too_many_attempts.cc \ test/core/end2end/tests/server_finishes_request.cc \ + test/core/end2end/tests/server_streaming.cc \ test/core/end2end/tests/shutdown_finishes_calls.cc \ test/core/end2end/tests/shutdown_finishes_tags.cc \ test/core/end2end/tests/simple_cacheable_request.cc \ @@ -3462,6 +3463,7 @@ LIBEND2END_TESTS_SRC = \ test/core/end2end/tests/retry_throttled.cc \ test/core/end2end/tests/retry_too_many_attempts.cc \ test/core/end2end/tests/server_finishes_request.cc \ + test/core/end2end/tests/server_streaming.cc \ test/core/end2end/tests/shutdown_finishes_calls.cc \ test/core/end2end/tests/shutdown_finishes_tags.cc \ test/core/end2end/tests/simple_cacheable_request.cc \ diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index 81f6c833975..b8c506f3b3e 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -100,6 +100,7 @@ libs: - test/core/end2end/tests/retry_throttled.cc - test/core/end2end/tests/retry_too_many_attempts.cc - test/core/end2end/tests/server_finishes_request.cc + - test/core/end2end/tests/server_streaming.cc - test/core/end2end/tests/shutdown_finishes_calls.cc - test/core/end2end/tests/shutdown_finishes_tags.cc - test/core/end2end/tests/simple_cacheable_request.cc @@ -209,6 +210,7 @@ libs: - test/core/end2end/tests/retry_throttled.cc - test/core/end2end/tests/retry_too_many_attempts.cc - test/core/end2end/tests/server_finishes_request.cc + - test/core/end2end/tests/server_streaming.cc - test/core/end2end/tests/shutdown_finishes_calls.cc - test/core/end2end/tests/shutdown_finishes_tags.cc - test/core/end2end/tests/simple_cacheable_request.cc diff --git a/doc/status_ordering.md b/doc/status_ordering.md index fccfa863a3e..d72235b7c7e 100644 --- a/doc/status_ordering.md +++ b/doc/status_ordering.md @@ -3,11 +3,12 @@ Ordering Status and Reads in the gRPC API Rules for implementors: 1. Reads and Writes Must not succeed after Status has been delivered. -2. OK Status is only delivered after all buffered messages are read. +2. Status is only delivered after all buffered messages are read. 3. Reads May continue to succeed after a failing write. However, once a write fails, all subsequent writes Must fail, and similarly, once a read fails, all subsequent reads Must fail. -4. When an error status is known to the library, if the user asks for status, +4. A non-OK status received from the server is not considered an error status. +5. When an error status is known to the library, if the user asks for status, the library Should discard messages received in the library but not delivered to the user and then deliver the status. If the user does not ask for status but continues reading, the library Should deliver buffered messages before diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index b71bc03ba4f..3934338eb2e 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -1665,6 +1665,7 @@ Pod::Spec.new do |s| 'test/core/end2end/tests/retry_throttled.cc', 'test/core/end2end/tests/retry_too_many_attempts.cc', 'test/core/end2end/tests/server_finishes_request.cc', + 'test/core/end2end/tests/server_streaming.cc', 'test/core/end2end/tests/shutdown_finishes_calls.cc', 'test/core/end2end/tests/shutdown_finishes_tags.cc', 'test/core/end2end/tests/simple_cacheable_request.cc', diff --git a/grpc.gyp b/grpc.gyp index d5b9709c35b..bc5cd898169 100644 --- a/grpc.gyp +++ b/grpc.gyp @@ -252,6 +252,7 @@ 'test/core/end2end/tests/retry_throttled.cc', 'test/core/end2end/tests/retry_too_many_attempts.cc', 'test/core/end2end/tests/server_finishes_request.cc', + 'test/core/end2end/tests/server_streaming.cc', 'test/core/end2end/tests/shutdown_finishes_calls.cc', 'test/core/end2end/tests/shutdown_finishes_tags.cc', 'test/core/end2end/tests/simple_cacheable_request.cc', @@ -354,6 +355,7 @@ 'test/core/end2end/tests/retry_throttled.cc', 'test/core/end2end/tests/retry_too_many_attempts.cc', 'test/core/end2end/tests/server_finishes_request.cc', + 'test/core/end2end/tests/server_streaming.cc', 'test/core/end2end/tests/shutdown_finishes_calls.cc', 'test/core/end2end/tests/shutdown_finishes_tags.cc', 'test/core/end2end/tests/simple_cacheable_request.cc', diff --git a/src/core/ext/transport/chttp2/transport/parsing.cc b/src/core/ext/transport/chttp2/transport/parsing.cc index 4bb53a74423..7721b16e91f 100644 --- a/src/core/ext/transport/chttp2/transport/parsing.cc +++ b/src/core/ext/transport/chttp2/transport/parsing.cc @@ -391,27 +391,6 @@ static bool md_key_cmp(grpc_mdelem md, const grpc_slice& reference) { return GRPC_MDKEY(md).refcount == reference.refcount; } -static bool md_cmp(grpc_mdelem md, grpc_mdelem ref_md, - const grpc_slice& ref_key) { - if (GPR_LIKELY(GRPC_MDELEM_IS_INTERNED(md))) { - return md.payload == ref_md.payload; - } - if (md_key_cmp(md, ref_key)) { - return grpc_slice_eq_static_interned(GRPC_MDVALUE(md), - GRPC_MDVALUE(ref_md)); - } - return false; -} - -static bool is_nonzero_status(grpc_mdelem md) { - // If md.payload == GRPC_MDELEM_GRPC_STATUS_1 or GRPC_MDELEM_GRPC_STATUS_2, - // then we have seen an error. In fact, if it is a GRPC_STATUS and it's - // not equal to GRPC_MDELEM_GRPC_STATUS_0, then we have seen an error. - // TODO(ctiller): check for a status like " 0" - return md_key_cmp(md, GRPC_MDSTR_GRPC_STATUS) && - !md_cmp(md, GRPC_MDELEM_GRPC_STATUS_0, GRPC_MDSTR_GRPC_STATUS); -} - static void GPR_ATTRIBUTE_NOINLINE on_initial_header_log( grpc_chttp2_transport* t, grpc_chttp2_stream* s, grpc_mdelem md) { char* key = grpc_slice_to_c_string(GRPC_MDKEY(md)); @@ -493,9 +472,7 @@ static grpc_error* on_initial_header(void* tp, grpc_mdelem md) { on_initial_header_log(t, s, md); } - if (is_nonzero_status(md)) { // not GRPC_MDELEM_GRPC_STATUS_0? - s->seen_error = true; - } else if (md_key_cmp(md, GRPC_MDSTR_GRPC_TIMEOUT)) { + if (md_key_cmp(md, GRPC_MDSTR_GRPC_TIMEOUT)) { return handle_timeout(s, md); } @@ -534,10 +511,6 @@ static grpc_error* on_trailing_header(void* tp, grpc_mdelem md) { gpr_free(value); } - if (is_nonzero_status(md)) { // not GRPC_MDELEM_GRPC_STATUS_0? - s->seen_error = true; - } - const size_t new_size = s->metadata_buffer[1].size + GRPC_MDELEM_LENGTH(md); const size_t metadata_size_limit = t->settings[GRPC_ACKED_SETTINGS] diff --git a/test/core/end2end/cq_verifier.cc b/test/core/end2end/cq_verifier.cc index 959315d6e69..b8da655fd56 100644 --- a/test/core/end2end/cq_verifier.cc +++ b/test/core/end2end/cq_verifier.cc @@ -1,20 +1,20 @@ -/* - * - * Copyright 2015 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ +// +// +// Copyright 2015 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// #include "test/core/end2end/cq_verifier.h" @@ -23,6 +23,7 @@ #include #include +#include #include #include @@ -42,7 +43,7 @@ #define ROOT_EXPECTATION 1000 -/* a set of metadata we expect to find on an event */ +// a set of metadata we expect to find on an event typedef struct metadata { size_t count; size_t cap; @@ -50,36 +51,46 @@ typedef struct metadata { char** values; } metadata; -/* details what we expect to find on a single event - and forms a linked - list to detail other expectations */ -typedef struct expectation { - struct expectation* next; +// details what we expect to find on a single event +struct Expectation { + Expectation(const char* f, int l, grpc_completion_type t, void* tag_arg, + bool check_success_arg, int success_arg, bool* seen_arg) + : file(f), + line(l), + type(t), + tag(tag_arg), + check_success(check_success_arg), + success(success_arg), + seen(seen_arg) {} const char* file; int line; grpc_completion_type type; void* tag; bool check_success; int success; -} expectation; + bool* seen; +}; -/* the verifier itself */ +// the verifier itself struct cq_verifier { - /* bound completion queue */ + // bound completion queue grpc_completion_queue* cq; - /* start of expectation list */ - expectation* first_expectation; + // expectation list + std::list expectations; + // maybe expectation list + std::list maybe_expectations; }; +// TODO(yashykt): Convert this to constructor/destructor pair cq_verifier* cq_verifier_create(grpc_completion_queue* cq) { - cq_verifier* v = static_cast(gpr_malloc(sizeof(cq_verifier))); + cq_verifier* v = new cq_verifier; v->cq = cq; - v->first_expectation = nullptr; return v; } void cq_verifier_destroy(cq_verifier* v) { cq_verify(v); - gpr_free(v); + delete v; } static int has_metadata(const grpc_metadata* md, size_t count, const char* key, @@ -179,7 +190,7 @@ static bool is_probably_integer(void* p) { return ((uintptr_t)p) < 1000000; } namespace { -std::string ExpectationString(const expectation& e) { +std::string ExpectationString(const Expectation& e) { std::string out; if (is_probably_integer(e.tag)) { out = absl::StrFormat("tag(%" PRIdPTR ") ", (intptr_t)e.tag); @@ -202,8 +213,8 @@ std::string ExpectationString(const expectation& e) { std::string ExpectationsString(const cq_verifier& v) { std::vector expectations; - for (expectation* e = v.first_expectation; e != nullptr; e = e->next) { - expectations.push_back(ExpectationString(*e)); + for (const auto& e : v.expectations) { + expectations.push_back(ExpectationString(e)); } return absl::StrJoin(expectations, "\n"); } @@ -216,13 +227,13 @@ static void fail_no_event_received(cq_verifier* v) { abort(); } -static void verify_matches(expectation* e, grpc_event* ev) { - GPR_ASSERT(e->type == ev->type); - switch (e->type) { +static void verify_matches(const Expectation& e, const grpc_event& ev) { + GPR_ASSERT(e.type == ev.type); + switch (e.type) { case GRPC_OP_COMPLETE: - if (e->check_success && e->success != ev->success) { + if (e.check_success && e.success != ev.success) { gpr_log(GPR_ERROR, "actual success does not match expected: %s", - ExpectationString(*e).c_str()); + ExpectationString(e).c_str()); abort(); } break; @@ -237,33 +248,38 @@ static void verify_matches(expectation* e, grpc_event* ev) { } } +// Try to find the event in the expectations list +bool FindExpectations(std::list* expectations, + const grpc_event& ev) { + for (auto e = expectations->begin(); e != expectations->end(); ++e) { + if (e->tag == ev.tag) { + verify_matches(*e, ev); + if (e->seen != nullptr) { + *(e->seen) = true; + } + expectations->erase(e); + return true; + } + } + return false; +} + void cq_verify(cq_verifier* v, int timeout_sec) { const gpr_timespec deadline = grpc_timeout_seconds_to_deadline(timeout_sec); - while (v->first_expectation != nullptr) { + while (!v->expectations.empty()) { grpc_event ev = grpc_completion_queue_next(v->cq, deadline, nullptr); if (ev.type == GRPC_QUEUE_TIMEOUT) { fail_no_event_received(v); break; } - expectation* e; - expectation* prev = nullptr; - for (e = v->first_expectation; e != nullptr; e = e->next) { - if (e->tag == ev.tag) { - verify_matches(e, &ev); - if (e == v->first_expectation) v->first_expectation = e->next; - if (prev != nullptr) prev->next = e->next; - gpr_free(e); - break; - } - prev = e; - } - if (e == nullptr) { - gpr_log(GPR_ERROR, "cq returned unexpected event: %s", - grpc_event_string(&ev).c_str()); - gpr_log(GPR_ERROR, "expected tags:\n%s", ExpectationsString(*v).c_str()); - abort(); - } + if (FindExpectations(&v->expectations, ev)) continue; + if (FindExpectations(&v->maybe_expectations, ev)) continue; + gpr_log(GPR_ERROR, "cq returned unexpected event: %s", + grpc_event_string(&ev).c_str()); + gpr_log(GPR_ERROR, "expected tags:\n%s", ExpectationsString(*v).c_str()); + abort(); } + v->maybe_expectations.clear(); } void cq_verify_empty_timeout(cq_verifier* v, int timeout_sec) { @@ -272,8 +288,7 @@ void cq_verify_empty_timeout(cq_verifier* v, int timeout_sec) { gpr_time_from_seconds(timeout_sec, GPR_TIMESPAN)); grpc_event ev; - GPR_ASSERT(v->first_expectation == nullptr && - "expectation queue must be empty"); + GPR_ASSERT(v->expectations.empty() && "expectation queue must be empty"); ev = grpc_completion_queue_next(v->cq, deadline, nullptr); if (ev.type != GRPC_QUEUE_TIMEOUT) { @@ -285,18 +300,17 @@ void cq_verify_empty_timeout(cq_verifier* v, int timeout_sec) { void cq_verify_empty(cq_verifier* v) { cq_verify_empty_timeout(v, 1); } +void cq_maybe_expect_completion(cq_verifier* v, const char* file, int line, + void* tag, bool success, bool* seen) { + v->maybe_expectations.emplace_back(file, line, GRPC_OP_COMPLETE, tag, true, + true, seen); +} + static void add(cq_verifier* v, const char* file, int line, grpc_completion_type type, void* tag, bool check_success, bool success) { - expectation* e = static_cast(gpr_malloc(sizeof(expectation))); - e->type = type; - e->file = file; - e->line = line; - e->tag = tag; - e->check_success = check_success; - e->success = success; - e->next = v->first_expectation; - v->first_expectation = e; + v->expectations.emplace_back(file, line, type, tag, check_success, success, + nullptr); } void cq_expect_completion(cq_verifier* v, const char* file, int line, void* tag, diff --git a/test/core/end2end/cq_verifier.h b/test/core/end2end/cq_verifier.h index fd0a7380f91..9976919a00c 100644 --- a/test/core/end2end/cq_verifier.h +++ b/test/core/end2end/cq_verifier.h @@ -49,10 +49,15 @@ void cq_verify_empty_timeout(cq_verifier* v, int timeout_sec); the event. */ void cq_expect_completion(cq_verifier* v, const char* file, int line, void* tag, bool success); +/* If the \a tag is seen, \a seen is set to true. */ +void cq_maybe_expect_completion(cq_verifier* v, const char* file, int line, + void* tag, bool success, bool* seen); void cq_expect_completion_any_status(cq_verifier* v, const char* file, int line, void* tag); #define CQ_EXPECT_COMPLETION(v, tag, success) \ cq_expect_completion(v, __FILE__, __LINE__, tag, success) +#define CQ_MAYBE_EXPECT_COMPLETION(v, tag, success, seen) \ + cq_maybe_expect_completion(v, __FILE__, __LINE__, tag, success, seen) #define CQ_EXPECT_COMPLETION_ANY_STATUS(v, tag) \ cq_expect_completion_any_status(v, __FILE__, __LINE__, tag) diff --git a/test/core/end2end/end2end_nosec_tests.cc b/test/core/end2end/end2end_nosec_tests.cc index f5c9c5ca294..eacb2c64c58 100644 --- a/test/core/end2end/end2end_nosec_tests.cc +++ b/test/core/end2end/end2end_nosec_tests.cc @@ -157,6 +157,8 @@ extern void retry_too_many_attempts(grpc_end2end_test_config config); extern void retry_too_many_attempts_pre_init(void); extern void server_finishes_request(grpc_end2end_test_config config); extern void server_finishes_request_pre_init(void); +extern void server_streaming(grpc_end2end_test_config config); +extern void server_streaming_pre_init(void); extern void shutdown_finishes_calls(grpc_end2end_test_config config); extern void shutdown_finishes_calls_pre_init(void); extern void shutdown_finishes_tags(grpc_end2end_test_config config); @@ -253,6 +255,7 @@ void grpc_end2end_tests_pre_init(void) { retry_throttled_pre_init(); retry_too_many_attempts_pre_init(); server_finishes_request_pre_init(); + server_streaming_pre_init(); shutdown_finishes_calls_pre_init(); shutdown_finishes_tags_pre_init(); simple_cacheable_request_pre_init(); @@ -340,6 +343,7 @@ void grpc_end2end_tests(int argc, char **argv, retry_throttled(config); retry_too_many_attempts(config); server_finishes_request(config); + server_streaming(config); shutdown_finishes_calls(config); shutdown_finishes_tags(config); simple_cacheable_request(config); @@ -614,6 +618,10 @@ void grpc_end2end_tests(int argc, char **argv, server_finishes_request(config); continue; } + if (0 == strcmp("server_streaming", argv[i])) { + server_streaming(config); + continue; + } if (0 == strcmp("shutdown_finishes_calls", argv[i])) { shutdown_finishes_calls(config); continue; diff --git a/test/core/end2end/end2end_tests.cc b/test/core/end2end/end2end_tests.cc index 3d81b61eb85..ec8a55e489d 100644 --- a/test/core/end2end/end2end_tests.cc +++ b/test/core/end2end/end2end_tests.cc @@ -159,6 +159,8 @@ extern void retry_too_many_attempts(grpc_end2end_test_config config); extern void retry_too_many_attempts_pre_init(void); extern void server_finishes_request(grpc_end2end_test_config config); extern void server_finishes_request_pre_init(void); +extern void server_streaming(grpc_end2end_test_config config); +extern void server_streaming_pre_init(void); extern void shutdown_finishes_calls(grpc_end2end_test_config config); extern void shutdown_finishes_calls_pre_init(void); extern void shutdown_finishes_tags(grpc_end2end_test_config config); @@ -256,6 +258,7 @@ void grpc_end2end_tests_pre_init(void) { retry_throttled_pre_init(); retry_too_many_attempts_pre_init(); server_finishes_request_pre_init(); + server_streaming_pre_init(); shutdown_finishes_calls_pre_init(); shutdown_finishes_tags_pre_init(); simple_cacheable_request_pre_init(); @@ -344,6 +347,7 @@ void grpc_end2end_tests(int argc, char **argv, retry_throttled(config); retry_too_many_attempts(config); server_finishes_request(config); + server_streaming(config); shutdown_finishes_calls(config); shutdown_finishes_tags(config); simple_cacheable_request(config); @@ -622,6 +626,10 @@ void grpc_end2end_tests(int argc, char **argv, server_finishes_request(config); continue; } + if (0 == strcmp("server_streaming", argv[i])) { + server_streaming(config); + continue; + } if (0 == strcmp("shutdown_finishes_calls", argv[i])) { shutdown_finishes_calls(config); continue; diff --git a/test/core/end2end/generate_tests.bzl b/test/core/end2end/generate_tests.bzl index 7eabb251413..c4191accd23 100755 --- a/test/core/end2end/generate_tests.bzl +++ b/test/core/end2end/generate_tests.bzl @@ -343,6 +343,7 @@ END2END_TESTS = { proxyable = False, ), "server_finishes_request": _test_options(), + "server_streaming": _test_options(needs_http2 = True), "shutdown_finishes_calls": _test_options(), "shutdown_finishes_tags": _test_options(), "simple_cacheable_request": _test_options(), diff --git a/test/core/end2end/tests/server_streaming.cc b/test/core/end2end/tests/server_streaming.cc new file mode 100644 index 00000000000..ece86f80951 --- /dev/null +++ b/test/core/end2end/tests/server_streaming.cc @@ -0,0 +1,280 @@ +/* + * + * Copyright 2020 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "test/core/end2end/end2end_tests.h" + +#include +#include + +#include +#include +#include +#include +#include "test/core/end2end/cq_verifier.h" + +static void* tag(intptr_t t) { return (void*)t; } + +static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config, + const char* test_name, + grpc_channel_args* client_args, + grpc_channel_args* server_args) { + grpc_end2end_test_fixture f; + gpr_log(GPR_INFO, "Running test: %s/%s", test_name, config.name); + f = config.create_fixture(client_args, server_args); + config.init_server(&f, server_args); + config.init_client(&f, client_args); + return f; +} + +static gpr_timespec n_seconds_from_now(int n) { + return grpc_timeout_seconds_to_deadline(n); +} + +static gpr_timespec five_seconds_from_now(void) { + return n_seconds_from_now(5); +} + +static void drain_cq(grpc_completion_queue* cq) { + grpc_event ev; + do { + ev = grpc_completion_queue_next(cq, five_seconds_from_now(), nullptr); + } while (ev.type != GRPC_QUEUE_SHUTDOWN); +} + +static void shutdown_server(grpc_end2end_test_fixture* f) { + if (!f->server) return; + grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000)); + GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000), + grpc_timeout_seconds_to_deadline(5), + nullptr) + .type == GRPC_OP_COMPLETE); + grpc_server_destroy(f->server); + f->server = nullptr; +} + +static void shutdown_client(grpc_end2end_test_fixture* f) { + if (!f->client) return; + grpc_channel_destroy(f->client); + f->client = nullptr; +} + +static void end_test(grpc_end2end_test_fixture* f) { + shutdown_server(f); + shutdown_client(f); + + grpc_completion_queue_shutdown(f->cq); + drain_cq(f->cq); + grpc_completion_queue_destroy(f->cq); + grpc_completion_queue_destroy(f->shutdown_cq); +} + +/* Client requests status along with the initial metadata. Server streams + * messages and ends with a non-OK status. Client reads after server is done + * writing, and expects to get the status after the messages. */ +static void test_server_streaming(grpc_end2end_test_config config, + int num_messages) { + grpc_end2end_test_fixture f = + begin_test(config, "test_server_streaming", nullptr, nullptr); + grpc_call* c; + grpc_call* s; + cq_verifier* cqv = cq_verifier_create(f.cq); + grpc_op ops[6]; + grpc_op* op; + grpc_metadata_array initial_metadata_recv; + grpc_metadata_array trailing_metadata_recv; + grpc_metadata_array request_metadata_recv; + grpc_call_details call_details; + grpc_status_code status; + grpc_call_error error; + grpc_slice details; + int was_cancelled = 2; + grpc_byte_buffer* request_payload_recv; + grpc_byte_buffer* response_payload; + grpc_slice response_payload_slice = + grpc_slice_from_copied_string("hello world"); + + gpr_timespec deadline = five_seconds_from_now(); + c = grpc_channel_create_call(f.client, nullptr, GRPC_PROPAGATE_DEFAULTS, f.cq, + grpc_slice_from_static_string("/foo"), nullptr, + deadline, nullptr); + GPR_ASSERT(c); + + grpc_metadata_array_init(&initial_metadata_recv); + grpc_metadata_array_init(&trailing_metadata_recv); + grpc_metadata_array_init(&request_metadata_recv); + grpc_call_details_init(&call_details); + + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_SEND_INITIAL_METADATA; + op->data.send_initial_metadata.count = 0; + op->flags = 0; + op->reserved = nullptr; + op++; + op->op = GRPC_OP_RECV_INITIAL_METADATA; + op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv; + op->flags = 0; + op->reserved = nullptr; + op++; + // Client requests status early but should not receive status till all the + // messages are received. + op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; + op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv; + op->data.recv_status_on_client.status = &status; + op->data.recv_status_on_client.status_details = &details; + op->flags = 0; + op->reserved = nullptr; + op++; + error = grpc_call_start_batch(c, ops, static_cast(op - ops), tag(1), + nullptr); + GPR_ASSERT(GRPC_CALL_OK == error); + + // Client sends close early + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + op->flags = 0; + op->reserved = nullptr; + op++; + error = grpc_call_start_batch(c, ops, static_cast(op - ops), tag(3), + nullptr); + GPR_ASSERT(GRPC_CALL_OK == error); + CQ_EXPECT_COMPLETION(cqv, tag(3), 1); + cq_verify(cqv); + + error = + grpc_server_request_call(f.server, &s, &call_details, + &request_metadata_recv, f.cq, f.cq, tag(100)); + GPR_ASSERT(GRPC_CALL_OK == error); + CQ_EXPECT_COMPLETION(cqv, tag(100), 1); + cq_verify(cqv); + + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_SEND_INITIAL_METADATA; + op->data.send_initial_metadata.count = 0; + op->flags = 0; + op->reserved = nullptr; + op++; + error = grpc_call_start_batch(s, ops, static_cast(op - ops), tag(101), + nullptr); + GPR_ASSERT(GRPC_CALL_OK == error); + + CQ_EXPECT_COMPLETION(cqv, tag(101), 1); + cq_verify(cqv); + + // Server writes bunch of messages + for (int i = 0; i < num_messages; i++) { + response_payload = grpc_raw_byte_buffer_create(&response_payload_slice, 1); + + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_SEND_MESSAGE; + op->data.send_message.send_message = response_payload; + op->flags = 0; + op->reserved = nullptr; + op++; + error = grpc_call_start_batch(s, ops, static_cast(op - ops), + tag(103), nullptr); + GPR_ASSERT(GRPC_CALL_OK == error); + CQ_EXPECT_COMPLETION(cqv, tag(103), 1); + cq_verify(cqv); + + grpc_byte_buffer_destroy(response_payload); + } + + // Server sends status + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_SEND_STATUS_FROM_SERVER; + op->data.send_status_from_server.trailing_metadata_count = 0; + op->data.send_status_from_server.status = GRPC_STATUS_UNIMPLEMENTED; + grpc_slice status_details = grpc_slice_from_static_string("xyz"); + op->data.send_status_from_server.status_details = &status_details; + op->flags = 0; + op->reserved = nullptr; + op++; + op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; + op->data.recv_close_on_server.cancelled = &was_cancelled; + op->flags = 0; + op->reserved = nullptr; + op++; + error = grpc_call_start_batch(s, ops, static_cast(op - ops), tag(104), + nullptr); + GPR_ASSERT(GRPC_CALL_OK == error); + bool seen_status = false; + CQ_MAYBE_EXPECT_COMPLETION(cqv, tag(1), true, &seen_status); + CQ_EXPECT_COMPLETION(cqv, tag(104), 1); + cq_verify(cqv); + + // Client keeps reading messages till it gets the status + int num_messages_received = 0; + while (true) { + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_RECV_MESSAGE; + op->data.recv_message.recv_message = &request_payload_recv; + op->flags = 0; + op->reserved = nullptr; + op++; + error = grpc_call_start_batch(c, ops, static_cast(op - ops), + tag(102), nullptr); + GPR_ASSERT(GRPC_CALL_OK == error); + CQ_MAYBE_EXPECT_COMPLETION(cqv, tag(1), true, &seen_status); + CQ_EXPECT_COMPLETION(cqv, tag(102), true); + cq_verify(cqv); + if (request_payload_recv == nullptr) { + // The transport has received the trailing metadata. + break; + } + GPR_ASSERT(byte_buffer_eq_string(request_payload_recv, "hello world")); + grpc_byte_buffer_destroy(request_payload_recv); + num_messages_received++; + } + GPR_ASSERT(num_messages_received == num_messages); + if (!seen_status) { + CQ_EXPECT_COMPLETION(cqv, tag(1), true); + cq_verify(cqv); + } + GPR_ASSERT(status == GRPC_STATUS_UNIMPLEMENTED); + GPR_ASSERT(0 == grpc_slice_str_cmp(details, "xyz")); + + grpc_slice_unref(response_payload_slice); + + grpc_call_unref(c); + grpc_call_unref(s); + + cq_verifier_destroy(cqv); + + grpc_metadata_array_destroy(&initial_metadata_recv); + grpc_metadata_array_destroy(&trailing_metadata_recv); + grpc_metadata_array_destroy(&request_metadata_recv); + grpc_call_details_destroy(&call_details); + grpc_slice_unref(details); + + end_test(&f); + config.tear_down_data(&f); +} + +void server_streaming(grpc_end2end_test_config config) { + test_server_streaming(config, 0); + test_server_streaming(config, 1); + test_server_streaming(config, 10); +} + +void server_streaming_pre_init(void) {} diff --git a/test/core/end2end/tests/streaming_error_response.cc b/test/core/end2end/tests/streaming_error_response.cc index 8d195be6abf..0502bba15cb 100644 --- a/test/core/end2end/tests/streaming_error_response.cc +++ b/test/core/end2end/tests/streaming_error_response.cc @@ -88,7 +88,10 @@ static void end_test(grpc_end2end_test_fixture* f) { grpc_completion_queue_destroy(f->shutdown_cq); } -// Client sends a request with payload, server reads then returns status. +// Client sends a request with payload, potentially requesting status early. The +// server reads and streams responses. The client cancels the RPC to get an +// error status. (Server sending a non-OK status is not considered an error +// status.) static void test(grpc_end2end_test_config config, bool request_status_early, bool recv_message_separately) { grpc_call* c; @@ -221,17 +224,13 @@ static void test(grpc_end2end_test_config config, bool request_status_early, cq_verify(cqv); } + // Cancel the call so that the client sets up an error status. + grpc_call_cancel(c, nullptr); memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; op->data.recv_close_on_server.cancelled = &was_cancelled; op++; - op->op = GRPC_OP_SEND_STATUS_FROM_SERVER; - op->data.send_status_from_server.trailing_metadata_count = 0; - op->data.send_status_from_server.status = GRPC_STATUS_FAILED_PRECONDITION; - grpc_slice status_details = grpc_slice_from_static_string("xyz"); - op->data.send_status_from_server.status_details = &status_details; - op++; error = grpc_call_start_batch(s, ops, static_cast(op - ops), tag(104), nullptr); GPR_ASSERT(GRPC_CALL_OK == error); @@ -261,10 +260,8 @@ static void test(grpc_end2end_test_config config, bool request_status_early, GPR_ASSERT(response_payload2_recv != nullptr); } - GPR_ASSERT(status == GRPC_STATUS_FAILED_PRECONDITION); - GPR_ASSERT(0 == grpc_slice_str_cmp(details, "xyz")); - GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/foo")); - GPR_ASSERT(was_cancelled == 0); + GPR_ASSERT(status == GRPC_STATUS_CANCELLED); + GPR_ASSERT(was_cancelled == 1); grpc_slice_unref(details); grpc_metadata_array_destroy(&initial_metadata_recv);