Don't consider receiving non-OK status as an error for HTTP2

pull/19545/head
Yash Tibrewal 4 years ago
parent 6b61cb57fe
commit 4331328fc2
  1. 2
      CMakeLists.txt
  2. 2
      Makefile
  3. 2
      build_autogenerated.yaml
  4. 5
      doc/status_ordering.md
  5. 1
      gRPC-Core.podspec
  6. 2
      grpc.gyp
  7. 29
      src/core/ext/transport/chttp2/transport/parsing.cc
  8. 150
      test/core/end2end/cq_verifier.cc
  9. 5
      test/core/end2end/cq_verifier.h
  10. 8
      test/core/end2end/end2end_nosec_tests.cc
  11. 8
      test/core/end2end/end2end_tests.cc
  12. 1
      test/core/end2end/generate_tests.bzl
  13. 280
      test/core/end2end/tests/server_streaming.cc
  14. 19
      test/core/end2end/tests/streaming_error_response.cc

@ -1067,6 +1067,7 @@ add_library(end2end_nosec_tests
test/core/end2end/tests/retry_throttled.cc
test/core/end2end/tests/retry_too_many_attempts.cc
test/core/end2end/tests/server_finishes_request.cc
test/core/end2end/tests/server_streaming.cc
test/core/end2end/tests/shutdown_finishes_calls.cc
test/core/end2end/tests/shutdown_finishes_tags.cc
test/core/end2end/tests/simple_cacheable_request.cc
@ -1200,6 +1201,7 @@ add_library(end2end_tests
test/core/end2end/tests/retry_throttled.cc
test/core/end2end/tests/retry_too_many_attempts.cc
test/core/end2end/tests/server_finishes_request.cc
test/core/end2end/tests/server_streaming.cc
test/core/end2end/tests/shutdown_finishes_calls.cc
test/core/end2end/tests/shutdown_finishes_tags.cc
test/core/end2end/tests/simple_cacheable_request.cc

@ -3349,6 +3349,7 @@ LIBEND2END_NOSEC_TESTS_SRC = \
test/core/end2end/tests/retry_throttled.cc \
test/core/end2end/tests/retry_too_many_attempts.cc \
test/core/end2end/tests/server_finishes_request.cc \
test/core/end2end/tests/server_streaming.cc \
test/core/end2end/tests/shutdown_finishes_calls.cc \
test/core/end2end/tests/shutdown_finishes_tags.cc \
test/core/end2end/tests/simple_cacheable_request.cc \
@ -3462,6 +3463,7 @@ LIBEND2END_TESTS_SRC = \
test/core/end2end/tests/retry_throttled.cc \
test/core/end2end/tests/retry_too_many_attempts.cc \
test/core/end2end/tests/server_finishes_request.cc \
test/core/end2end/tests/server_streaming.cc \
test/core/end2end/tests/shutdown_finishes_calls.cc \
test/core/end2end/tests/shutdown_finishes_tags.cc \
test/core/end2end/tests/simple_cacheable_request.cc \

@ -100,6 +100,7 @@ libs:
- test/core/end2end/tests/retry_throttled.cc
- test/core/end2end/tests/retry_too_many_attempts.cc
- test/core/end2end/tests/server_finishes_request.cc
- test/core/end2end/tests/server_streaming.cc
- test/core/end2end/tests/shutdown_finishes_calls.cc
- test/core/end2end/tests/shutdown_finishes_tags.cc
- test/core/end2end/tests/simple_cacheable_request.cc
@ -209,6 +210,7 @@ libs:
- test/core/end2end/tests/retry_throttled.cc
- test/core/end2end/tests/retry_too_many_attempts.cc
- test/core/end2end/tests/server_finishes_request.cc
- test/core/end2end/tests/server_streaming.cc
- test/core/end2end/tests/shutdown_finishes_calls.cc
- test/core/end2end/tests/shutdown_finishes_tags.cc
- test/core/end2end/tests/simple_cacheable_request.cc

@ -3,11 +3,12 @@ Ordering Status and Reads in the gRPC API
Rules for implementors:
1. Reads and Writes Must not succeed after Status has been delivered.
2. OK Status is only delivered after all buffered messages are read.
2. Status is only delivered after all buffered messages are read.
3. Reads May continue to succeed after a failing write.
However, once a write fails, all subsequent writes Must fail,
and similarly, once a read fails, all subsequent reads Must fail.
4. When an error status is known to the library, if the user asks for status,
4. A non-OK status received from the server is not considered an error status.
5. When an error status is known to the library, if the user asks for status,
the library Should discard messages received in the library but not delivered
to the user and then deliver the status. If the user does not ask for status
but continues reading, the library Should deliver buffered messages before

@ -1665,6 +1665,7 @@ Pod::Spec.new do |s|
'test/core/end2end/tests/retry_throttled.cc',
'test/core/end2end/tests/retry_too_many_attempts.cc',
'test/core/end2end/tests/server_finishes_request.cc',
'test/core/end2end/tests/server_streaming.cc',
'test/core/end2end/tests/shutdown_finishes_calls.cc',
'test/core/end2end/tests/shutdown_finishes_tags.cc',
'test/core/end2end/tests/simple_cacheable_request.cc',

@ -252,6 +252,7 @@
'test/core/end2end/tests/retry_throttled.cc',
'test/core/end2end/tests/retry_too_many_attempts.cc',
'test/core/end2end/tests/server_finishes_request.cc',
'test/core/end2end/tests/server_streaming.cc',
'test/core/end2end/tests/shutdown_finishes_calls.cc',
'test/core/end2end/tests/shutdown_finishes_tags.cc',
'test/core/end2end/tests/simple_cacheable_request.cc',
@ -354,6 +355,7 @@
'test/core/end2end/tests/retry_throttled.cc',
'test/core/end2end/tests/retry_too_many_attempts.cc',
'test/core/end2end/tests/server_finishes_request.cc',
'test/core/end2end/tests/server_streaming.cc',
'test/core/end2end/tests/shutdown_finishes_calls.cc',
'test/core/end2end/tests/shutdown_finishes_tags.cc',
'test/core/end2end/tests/simple_cacheable_request.cc',

@ -391,27 +391,6 @@ static bool md_key_cmp(grpc_mdelem md, const grpc_slice& reference) {
return GRPC_MDKEY(md).refcount == reference.refcount;
}
static bool md_cmp(grpc_mdelem md, grpc_mdelem ref_md,
const grpc_slice& ref_key) {
if (GPR_LIKELY(GRPC_MDELEM_IS_INTERNED(md))) {
return md.payload == ref_md.payload;
}
if (md_key_cmp(md, ref_key)) {
return grpc_slice_eq_static_interned(GRPC_MDVALUE(md),
GRPC_MDVALUE(ref_md));
}
return false;
}
static bool is_nonzero_status(grpc_mdelem md) {
// If md.payload == GRPC_MDELEM_GRPC_STATUS_1 or GRPC_MDELEM_GRPC_STATUS_2,
// then we have seen an error. In fact, if it is a GRPC_STATUS and it's
// not equal to GRPC_MDELEM_GRPC_STATUS_0, then we have seen an error.
// TODO(ctiller): check for a status like " 0"
return md_key_cmp(md, GRPC_MDSTR_GRPC_STATUS) &&
!md_cmp(md, GRPC_MDELEM_GRPC_STATUS_0, GRPC_MDSTR_GRPC_STATUS);
}
static void GPR_ATTRIBUTE_NOINLINE on_initial_header_log(
grpc_chttp2_transport* t, grpc_chttp2_stream* s, grpc_mdelem md) {
char* key = grpc_slice_to_c_string(GRPC_MDKEY(md));
@ -493,9 +472,7 @@ static grpc_error* on_initial_header(void* tp, grpc_mdelem md) {
on_initial_header_log(t, s, md);
}
if (is_nonzero_status(md)) { // not GRPC_MDELEM_GRPC_STATUS_0?
s->seen_error = true;
} else if (md_key_cmp(md, GRPC_MDSTR_GRPC_TIMEOUT)) {
if (md_key_cmp(md, GRPC_MDSTR_GRPC_TIMEOUT)) {
return handle_timeout(s, md);
}
@ -534,10 +511,6 @@ static grpc_error* on_trailing_header(void* tp, grpc_mdelem md) {
gpr_free(value);
}
if (is_nonzero_status(md)) { // not GRPC_MDELEM_GRPC_STATUS_0?
s->seen_error = true;
}
const size_t new_size = s->metadata_buffer[1].size + GRPC_MDELEM_LENGTH(md);
const size_t metadata_size_limit =
t->settings[GRPC_ACKED_SETTINGS]

@ -1,20 +1,20 @@
/*
*
* Copyright 2015 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
//
//
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
#include "test/core/end2end/cq_verifier.h"
@ -23,6 +23,7 @@
#include <stdio.h>
#include <string.h>
#include <list>
#include <string>
#include <vector>
@ -42,7 +43,7 @@
#define ROOT_EXPECTATION 1000
/* a set of metadata we expect to find on an event */
// a set of metadata we expect to find on an event
typedef struct metadata {
size_t count;
size_t cap;
@ -50,36 +51,46 @@ typedef struct metadata {
char** values;
} metadata;
/* details what we expect to find on a single event - and forms a linked
list to detail other expectations */
typedef struct expectation {
struct expectation* next;
// details what we expect to find on a single event
struct Expectation {
Expectation(const char* f, int l, grpc_completion_type t, void* tag_arg,
bool check_success_arg, int success_arg, bool* seen_arg)
: file(f),
line(l),
type(t),
tag(tag_arg),
check_success(check_success_arg),
success(success_arg),
seen(seen_arg) {}
const char* file;
int line;
grpc_completion_type type;
void* tag;
bool check_success;
int success;
} expectation;
bool* seen;
};
/* the verifier itself */
// the verifier itself
struct cq_verifier {
/* bound completion queue */
// bound completion queue
grpc_completion_queue* cq;
/* start of expectation list */
expectation* first_expectation;
// expectation list
std::list<Expectation> expectations;
// maybe expectation list
std::list<Expectation> maybe_expectations;
};
// TODO(yashykt): Convert this to constructor/destructor pair
cq_verifier* cq_verifier_create(grpc_completion_queue* cq) {
cq_verifier* v = static_cast<cq_verifier*>(gpr_malloc(sizeof(cq_verifier)));
cq_verifier* v = new cq_verifier;
v->cq = cq;
v->first_expectation = nullptr;
return v;
}
void cq_verifier_destroy(cq_verifier* v) {
cq_verify(v);
gpr_free(v);
delete v;
}
static int has_metadata(const grpc_metadata* md, size_t count, const char* key,
@ -179,7 +190,7 @@ static bool is_probably_integer(void* p) { return ((uintptr_t)p) < 1000000; }
namespace {
std::string ExpectationString(const expectation& e) {
std::string ExpectationString(const Expectation& e) {
std::string out;
if (is_probably_integer(e.tag)) {
out = absl::StrFormat("tag(%" PRIdPTR ") ", (intptr_t)e.tag);
@ -202,8 +213,8 @@ std::string ExpectationString(const expectation& e) {
std::string ExpectationsString(const cq_verifier& v) {
std::vector<std::string> expectations;
for (expectation* e = v.first_expectation; e != nullptr; e = e->next) {
expectations.push_back(ExpectationString(*e));
for (const auto& e : v.expectations) {
expectations.push_back(ExpectationString(e));
}
return absl::StrJoin(expectations, "\n");
}
@ -216,13 +227,13 @@ static void fail_no_event_received(cq_verifier* v) {
abort();
}
static void verify_matches(expectation* e, grpc_event* ev) {
GPR_ASSERT(e->type == ev->type);
switch (e->type) {
static void verify_matches(const Expectation& e, const grpc_event& ev) {
GPR_ASSERT(e.type == ev.type);
switch (e.type) {
case GRPC_OP_COMPLETE:
if (e->check_success && e->success != ev->success) {
if (e.check_success && e.success != ev.success) {
gpr_log(GPR_ERROR, "actual success does not match expected: %s",
ExpectationString(*e).c_str());
ExpectationString(e).c_str());
abort();
}
break;
@ -237,33 +248,38 @@ static void verify_matches(expectation* e, grpc_event* ev) {
}
}
// Try to find the event in the expectations list
bool FindExpectations(std::list<Expectation>* expectations,
const grpc_event& ev) {
for (auto e = expectations->begin(); e != expectations->end(); ++e) {
if (e->tag == ev.tag) {
verify_matches(*e, ev);
if (e->seen != nullptr) {
*(e->seen) = true;
}
expectations->erase(e);
return true;
}
}
return false;
}
void cq_verify(cq_verifier* v, int timeout_sec) {
const gpr_timespec deadline = grpc_timeout_seconds_to_deadline(timeout_sec);
while (v->first_expectation != nullptr) {
while (!v->expectations.empty()) {
grpc_event ev = grpc_completion_queue_next(v->cq, deadline, nullptr);
if (ev.type == GRPC_QUEUE_TIMEOUT) {
fail_no_event_received(v);
break;
}
expectation* e;
expectation* prev = nullptr;
for (e = v->first_expectation; e != nullptr; e = e->next) {
if (e->tag == ev.tag) {
verify_matches(e, &ev);
if (e == v->first_expectation) v->first_expectation = e->next;
if (prev != nullptr) prev->next = e->next;
gpr_free(e);
break;
}
prev = e;
}
if (e == nullptr) {
gpr_log(GPR_ERROR, "cq returned unexpected event: %s",
grpc_event_string(&ev).c_str());
gpr_log(GPR_ERROR, "expected tags:\n%s", ExpectationsString(*v).c_str());
abort();
}
if (FindExpectations(&v->expectations, ev)) continue;
if (FindExpectations(&v->maybe_expectations, ev)) continue;
gpr_log(GPR_ERROR, "cq returned unexpected event: %s",
grpc_event_string(&ev).c_str());
gpr_log(GPR_ERROR, "expected tags:\n%s", ExpectationsString(*v).c_str());
abort();
}
v->maybe_expectations.clear();
}
void cq_verify_empty_timeout(cq_verifier* v, int timeout_sec) {
@ -272,8 +288,7 @@ void cq_verify_empty_timeout(cq_verifier* v, int timeout_sec) {
gpr_time_from_seconds(timeout_sec, GPR_TIMESPAN));
grpc_event ev;
GPR_ASSERT(v->first_expectation == nullptr &&
"expectation queue must be empty");
GPR_ASSERT(v->expectations.empty() && "expectation queue must be empty");
ev = grpc_completion_queue_next(v->cq, deadline, nullptr);
if (ev.type != GRPC_QUEUE_TIMEOUT) {
@ -285,18 +300,17 @@ void cq_verify_empty_timeout(cq_verifier* v, int timeout_sec) {
void cq_verify_empty(cq_verifier* v) { cq_verify_empty_timeout(v, 1); }
void cq_maybe_expect_completion(cq_verifier* v, const char* file, int line,
void* tag, bool success, bool* seen) {
v->maybe_expectations.emplace_back(file, line, GRPC_OP_COMPLETE, tag, true,
true, seen);
}
static void add(cq_verifier* v, const char* file, int line,
grpc_completion_type type, void* tag, bool check_success,
bool success) {
expectation* e = static_cast<expectation*>(gpr_malloc(sizeof(expectation)));
e->type = type;
e->file = file;
e->line = line;
e->tag = tag;
e->check_success = check_success;
e->success = success;
e->next = v->first_expectation;
v->first_expectation = e;
v->expectations.emplace_back(file, line, type, tag, check_success, success,
nullptr);
}
void cq_expect_completion(cq_verifier* v, const char* file, int line, void* tag,

@ -49,10 +49,15 @@ void cq_verify_empty_timeout(cq_verifier* v, int timeout_sec);
the event. */
void cq_expect_completion(cq_verifier* v, const char* file, int line, void* tag,
bool success);
/* If the \a tag is seen, \a seen is set to true. */
void cq_maybe_expect_completion(cq_verifier* v, const char* file, int line,
void* tag, bool success, bool* seen);
void cq_expect_completion_any_status(cq_verifier* v, const char* file, int line,
void* tag);
#define CQ_EXPECT_COMPLETION(v, tag, success) \
cq_expect_completion(v, __FILE__, __LINE__, tag, success)
#define CQ_MAYBE_EXPECT_COMPLETION(v, tag, success, seen) \
cq_maybe_expect_completion(v, __FILE__, __LINE__, tag, success, seen)
#define CQ_EXPECT_COMPLETION_ANY_STATUS(v, tag) \
cq_expect_completion_any_status(v, __FILE__, __LINE__, tag)

@ -157,6 +157,8 @@ extern void retry_too_many_attempts(grpc_end2end_test_config config);
extern void retry_too_many_attempts_pre_init(void);
extern void server_finishes_request(grpc_end2end_test_config config);
extern void server_finishes_request_pre_init(void);
extern void server_streaming(grpc_end2end_test_config config);
extern void server_streaming_pre_init(void);
extern void shutdown_finishes_calls(grpc_end2end_test_config config);
extern void shutdown_finishes_calls_pre_init(void);
extern void shutdown_finishes_tags(grpc_end2end_test_config config);
@ -253,6 +255,7 @@ void grpc_end2end_tests_pre_init(void) {
retry_throttled_pre_init();
retry_too_many_attempts_pre_init();
server_finishes_request_pre_init();
server_streaming_pre_init();
shutdown_finishes_calls_pre_init();
shutdown_finishes_tags_pre_init();
simple_cacheable_request_pre_init();
@ -340,6 +343,7 @@ void grpc_end2end_tests(int argc, char **argv,
retry_throttled(config);
retry_too_many_attempts(config);
server_finishes_request(config);
server_streaming(config);
shutdown_finishes_calls(config);
shutdown_finishes_tags(config);
simple_cacheable_request(config);
@ -614,6 +618,10 @@ void grpc_end2end_tests(int argc, char **argv,
server_finishes_request(config);
continue;
}
if (0 == strcmp("server_streaming", argv[i])) {
server_streaming(config);
continue;
}
if (0 == strcmp("shutdown_finishes_calls", argv[i])) {
shutdown_finishes_calls(config);
continue;

@ -159,6 +159,8 @@ extern void retry_too_many_attempts(grpc_end2end_test_config config);
extern void retry_too_many_attempts_pre_init(void);
extern void server_finishes_request(grpc_end2end_test_config config);
extern void server_finishes_request_pre_init(void);
extern void server_streaming(grpc_end2end_test_config config);
extern void server_streaming_pre_init(void);
extern void shutdown_finishes_calls(grpc_end2end_test_config config);
extern void shutdown_finishes_calls_pre_init(void);
extern void shutdown_finishes_tags(grpc_end2end_test_config config);
@ -256,6 +258,7 @@ void grpc_end2end_tests_pre_init(void) {
retry_throttled_pre_init();
retry_too_many_attempts_pre_init();
server_finishes_request_pre_init();
server_streaming_pre_init();
shutdown_finishes_calls_pre_init();
shutdown_finishes_tags_pre_init();
simple_cacheable_request_pre_init();
@ -344,6 +347,7 @@ void grpc_end2end_tests(int argc, char **argv,
retry_throttled(config);
retry_too_many_attempts(config);
server_finishes_request(config);
server_streaming(config);
shutdown_finishes_calls(config);
shutdown_finishes_tags(config);
simple_cacheable_request(config);
@ -622,6 +626,10 @@ void grpc_end2end_tests(int argc, char **argv,
server_finishes_request(config);
continue;
}
if (0 == strcmp("server_streaming", argv[i])) {
server_streaming(config);
continue;
}
if (0 == strcmp("shutdown_finishes_calls", argv[i])) {
shutdown_finishes_calls(config);
continue;

@ -343,6 +343,7 @@ END2END_TESTS = {
proxyable = False,
),
"server_finishes_request": _test_options(),
"server_streaming": _test_options(needs_http2 = True),
"shutdown_finishes_calls": _test_options(),
"shutdown_finishes_tags": _test_options(),
"simple_cacheable_request": _test_options(),

@ -0,0 +1,280 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include "test/core/end2end/end2end_tests.h"
#include <stdio.h>
#include <string.h>
#include <grpc/byte_buffer.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include "test/core/end2end/cq_verifier.h"
static void* tag(intptr_t t) { return (void*)t; }
static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config,
const char* test_name,
grpc_channel_args* client_args,
grpc_channel_args* server_args) {
grpc_end2end_test_fixture f;
gpr_log(GPR_INFO, "Running test: %s/%s", test_name, config.name);
f = config.create_fixture(client_args, server_args);
config.init_server(&f, server_args);
config.init_client(&f, client_args);
return f;
}
static gpr_timespec n_seconds_from_now(int n) {
return grpc_timeout_seconds_to_deadline(n);
}
static gpr_timespec five_seconds_from_now(void) {
return n_seconds_from_now(5);
}
static void drain_cq(grpc_completion_queue* cq) {
grpc_event ev;
do {
ev = grpc_completion_queue_next(cq, five_seconds_from_now(), nullptr);
} while (ev.type != GRPC_QUEUE_SHUTDOWN);
}
static void shutdown_server(grpc_end2end_test_fixture* f) {
if (!f->server) return;
grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000),
grpc_timeout_seconds_to_deadline(5),
nullptr)
.type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->server);
f->server = nullptr;
}
static void shutdown_client(grpc_end2end_test_fixture* f) {
if (!f->client) return;
grpc_channel_destroy(f->client);
f->client = nullptr;
}
static void end_test(grpc_end2end_test_fixture* f) {
shutdown_server(f);
shutdown_client(f);
grpc_completion_queue_shutdown(f->cq);
drain_cq(f->cq);
grpc_completion_queue_destroy(f->cq);
grpc_completion_queue_destroy(f->shutdown_cq);
}
/* Client requests status along with the initial metadata. Server streams
* messages and ends with a non-OK status. Client reads after server is done
* writing, and expects to get the status after the messages. */
static void test_server_streaming(grpc_end2end_test_config config,
int num_messages) {
grpc_end2end_test_fixture f =
begin_test(config, "test_server_streaming", nullptr, nullptr);
grpc_call* c;
grpc_call* s;
cq_verifier* cqv = cq_verifier_create(f.cq);
grpc_op ops[6];
grpc_op* op;
grpc_metadata_array initial_metadata_recv;
grpc_metadata_array trailing_metadata_recv;
grpc_metadata_array request_metadata_recv;
grpc_call_details call_details;
grpc_status_code status;
grpc_call_error error;
grpc_slice details;
int was_cancelled = 2;
grpc_byte_buffer* request_payload_recv;
grpc_byte_buffer* response_payload;
grpc_slice response_payload_slice =
grpc_slice_from_copied_string("hello world");
gpr_timespec deadline = five_seconds_from_now();
c = grpc_channel_create_call(f.client, nullptr, GRPC_PROPAGATE_DEFAULTS, f.cq,
grpc_slice_from_static_string("/foo"), nullptr,
deadline, nullptr);
GPR_ASSERT(c);
grpc_metadata_array_init(&initial_metadata_recv);
grpc_metadata_array_init(&trailing_metadata_recv);
grpc_metadata_array_init(&request_metadata_recv);
grpc_call_details_init(&call_details);
memset(ops, 0, sizeof(ops));
op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
op->flags = 0;
op->reserved = nullptr;
op++;
op->op = GRPC_OP_RECV_INITIAL_METADATA;
op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
op->flags = 0;
op->reserved = nullptr;
op++;
// Client requests status early but should not receive status till all the
// messages are received.
op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
op->data.recv_status_on_client.status = &status;
op->data.recv_status_on_client.status_details = &details;
op->flags = 0;
op->reserved = nullptr;
op++;
error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops), tag(1),
nullptr);
GPR_ASSERT(GRPC_CALL_OK == error);
// Client sends close early
memset(ops, 0, sizeof(ops));
op = ops;
op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
op->flags = 0;
op->reserved = nullptr;
op++;
error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops), tag(3),
nullptr);
GPR_ASSERT(GRPC_CALL_OK == error);
CQ_EXPECT_COMPLETION(cqv, tag(3), 1);
cq_verify(cqv);
error =
grpc_server_request_call(f.server, &s, &call_details,
&request_metadata_recv, f.cq, f.cq, tag(100));
GPR_ASSERT(GRPC_CALL_OK == error);
CQ_EXPECT_COMPLETION(cqv, tag(100), 1);
cq_verify(cqv);
memset(ops, 0, sizeof(ops));
op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
op->flags = 0;
op->reserved = nullptr;
op++;
error = grpc_call_start_batch(s, ops, static_cast<size_t>(op - ops), tag(101),
nullptr);
GPR_ASSERT(GRPC_CALL_OK == error);
CQ_EXPECT_COMPLETION(cqv, tag(101), 1);
cq_verify(cqv);
// Server writes bunch of messages
for (int i = 0; i < num_messages; i++) {
response_payload = grpc_raw_byte_buffer_create(&response_payload_slice, 1);
memset(ops, 0, sizeof(ops));
op = ops;
op->op = GRPC_OP_SEND_MESSAGE;
op->data.send_message.send_message = response_payload;
op->flags = 0;
op->reserved = nullptr;
op++;
error = grpc_call_start_batch(s, ops, static_cast<size_t>(op - ops),
tag(103), nullptr);
GPR_ASSERT(GRPC_CALL_OK == error);
CQ_EXPECT_COMPLETION(cqv, tag(103), 1);
cq_verify(cqv);
grpc_byte_buffer_destroy(response_payload);
}
// Server sends status
memset(ops, 0, sizeof(ops));
op = ops;
op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
op->data.send_status_from_server.trailing_metadata_count = 0;
op->data.send_status_from_server.status = GRPC_STATUS_UNIMPLEMENTED;
grpc_slice status_details = grpc_slice_from_static_string("xyz");
op->data.send_status_from_server.status_details = &status_details;
op->flags = 0;
op->reserved = nullptr;
op++;
op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
op->data.recv_close_on_server.cancelled = &was_cancelled;
op->flags = 0;
op->reserved = nullptr;
op++;
error = grpc_call_start_batch(s, ops, static_cast<size_t>(op - ops), tag(104),
nullptr);
GPR_ASSERT(GRPC_CALL_OK == error);
bool seen_status = false;
CQ_MAYBE_EXPECT_COMPLETION(cqv, tag(1), true, &seen_status);
CQ_EXPECT_COMPLETION(cqv, tag(104), 1);
cq_verify(cqv);
// Client keeps reading messages till it gets the status
int num_messages_received = 0;
while (true) {
memset(ops, 0, sizeof(ops));
op = ops;
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message.recv_message = &request_payload_recv;
op->flags = 0;
op->reserved = nullptr;
op++;
error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops),
tag(102), nullptr);
GPR_ASSERT(GRPC_CALL_OK == error);
CQ_MAYBE_EXPECT_COMPLETION(cqv, tag(1), true, &seen_status);
CQ_EXPECT_COMPLETION(cqv, tag(102), true);
cq_verify(cqv);
if (request_payload_recv == nullptr) {
// The transport has received the trailing metadata.
break;
}
GPR_ASSERT(byte_buffer_eq_string(request_payload_recv, "hello world"));
grpc_byte_buffer_destroy(request_payload_recv);
num_messages_received++;
}
GPR_ASSERT(num_messages_received == num_messages);
if (!seen_status) {
CQ_EXPECT_COMPLETION(cqv, tag(1), true);
cq_verify(cqv);
}
GPR_ASSERT(status == GRPC_STATUS_UNIMPLEMENTED);
GPR_ASSERT(0 == grpc_slice_str_cmp(details, "xyz"));
grpc_slice_unref(response_payload_slice);
grpc_call_unref(c);
grpc_call_unref(s);
cq_verifier_destroy(cqv);
grpc_metadata_array_destroy(&initial_metadata_recv);
grpc_metadata_array_destroy(&trailing_metadata_recv);
grpc_metadata_array_destroy(&request_metadata_recv);
grpc_call_details_destroy(&call_details);
grpc_slice_unref(details);
end_test(&f);
config.tear_down_data(&f);
}
void server_streaming(grpc_end2end_test_config config) {
test_server_streaming(config, 0);
test_server_streaming(config, 1);
test_server_streaming(config, 10);
}
void server_streaming_pre_init(void) {}

@ -88,7 +88,10 @@ static void end_test(grpc_end2end_test_fixture* f) {
grpc_completion_queue_destroy(f->shutdown_cq);
}
// Client sends a request with payload, server reads then returns status.
// Client sends a request with payload, potentially requesting status early. The
// server reads and streams responses. The client cancels the RPC to get an
// error status. (Server sending a non-OK status is not considered an error
// status.)
static void test(grpc_end2end_test_config config, bool request_status_early,
bool recv_message_separately) {
grpc_call* c;
@ -221,17 +224,13 @@ static void test(grpc_end2end_test_config config, bool request_status_early,
cq_verify(cqv);
}
// Cancel the call so that the client sets up an error status.
grpc_call_cancel(c, nullptr);
memset(ops, 0, sizeof(ops));
op = ops;
op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
op->data.recv_close_on_server.cancelled = &was_cancelled;
op++;
op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
op->data.send_status_from_server.trailing_metadata_count = 0;
op->data.send_status_from_server.status = GRPC_STATUS_FAILED_PRECONDITION;
grpc_slice status_details = grpc_slice_from_static_string("xyz");
op->data.send_status_from_server.status_details = &status_details;
op++;
error = grpc_call_start_batch(s, ops, static_cast<size_t>(op - ops), tag(104),
nullptr);
GPR_ASSERT(GRPC_CALL_OK == error);
@ -261,10 +260,8 @@ static void test(grpc_end2end_test_config config, bool request_status_early,
GPR_ASSERT(response_payload2_recv != nullptr);
}
GPR_ASSERT(status == GRPC_STATUS_FAILED_PRECONDITION);
GPR_ASSERT(0 == grpc_slice_str_cmp(details, "xyz"));
GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/foo"));
GPR_ASSERT(was_cancelled == 0);
GPR_ASSERT(status == GRPC_STATUS_CANCELLED);
GPR_ASSERT(was_cancelled == 1);
grpc_slice_unref(details);
grpc_metadata_array_destroy(&initial_metadata_recv);

Loading…
Cancel
Save