Cleanup, debug support

reviewable/pr8008/r2
Craig Tiller 9 years ago
parent fd4c6471ce
commit bd37a21c26
  1. 98
      src/core/ext/transport/chttp2/transport/chttp2_transport.c
  2. 29
      src/core/ext/transport/chttp2/transport/frame_data.c
  3. 3
      src/core/ext/transport/chttp2/transport/hpack_parser.c
  4. 11
      src/core/ext/transport/chttp2/transport/internal.h
  5. 2
      src/core/ext/transport/chttp2/transport/stream_map.c
  6. 5
      test/cpp/end2end/thread_stress_test.cc
  7. 3
      test/cpp/qps/client_sync.cc

@ -447,7 +447,6 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
*t->accepting_stream = s;
grpc_chttp2_stream_map_add(&t->stream_map, s->id, s);
s->in_stream_map = true;
}
GPR_TIMER_END("init_stream", 0);
@ -464,7 +463,6 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
GPR_TIMER_BEGIN("destroy_stream", 0);
GPR_ASSERT((s->write_closed && s->read_closed) || s->id == 0);
GPR_ASSERT(!s->in_stream_map);
if (s->id != 0) {
GPR_ASSERT(grpc_chttp2_stream_map_find(&t->stream_map, s->id) == NULL);
}
@ -540,16 +538,23 @@ grpc_chttp2_stream *grpc_chttp2_parsing_accept_stream(grpc_exec_ctx *exec_ctx,
static const char *write_state_name(grpc_chttp2_write_state st) {
switch (st) {
case GRPC_CHTTP2_WRITE_STATE_IDLE: return "IDLE";
case GRPC_CHTTP2_WRITE_STATE_WRITING: return "WRITING";
case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_TO_COME: return "WRITING+MORE";
case GRPC_CHTTP2_WRITE_STATE_IDLE:
return "IDLE";
case GRPC_CHTTP2_WRITE_STATE_WRITING:
return "WRITING";
case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_TO_COME:
return "WRITING+MORE";
}
GPR_UNREACHABLE_CODE(return "UNKNOWN");
}
static void set_write_state(grpc_chttp2_transport *t, grpc_chttp2_write_state st) {
GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_DEBUG, "W:%p %s state %s -> %s", t, t->is_client ? "CLIENT" : "SERVER", write_state_name(t->write_state), write_state_name(st)));
t->write_state = st;
static void set_write_state(grpc_chttp2_transport *t,
grpc_chttp2_write_state st) {
GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_DEBUG, "W:%p %s state %s -> %s", t,
t->is_client ? "CLIENT" : "SERVER",
write_state_name(t->write_state),
write_state_name(st)));
t->write_state = st;
}
void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx,
@ -718,7 +723,6 @@ static void maybe_start_some_streams(grpc_exec_ctx *exec_ctx,
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
s->max_recv_bytes = GPR_MAX(stream_incoming_window, s->max_recv_bytes);
grpc_chttp2_stream_map_add(&t->stream_map, s->id, s);
s->in_stream_map = true;
grpc_chttp2_become_writable(exec_ctx, t, s, true, "new_stream");
}
/* cancel out streams that will never be started */
@ -740,7 +744,8 @@ static grpc_closure *add_closure_barrier(grpc_closure *closure) {
return closure;
}
static void run_closure_and_null(grpc_exec_ctx *exec_ctx, grpc_closure **closure, grpc_error *error) {
static void null_then_run_closure(grpc_exec_ctx *exec_ctx,
grpc_closure **closure, grpc_error *error) {
grpc_closure *c = *closure;
*closure = NULL;
grpc_closure_run(exec_ctx, c, error);
@ -805,7 +810,8 @@ static void continue_fetching_send_locked(grpc_exec_ctx *exec_ctx,
ssize_t notify_offset = s->fetching_slice_end_offset;
if (notify_offset <= 0) {
grpc_chttp2_complete_closure_step(
exec_ctx, t, s, &s->fetching_send_message_finished, GRPC_ERROR_NONE, "fetching_send_message_finished");
exec_ctx, t, s, &s->fetching_send_message_finished, GRPC_ERROR_NONE,
"fetching_send_message_finished");
} else {
grpc_chttp2_write_cb *cb = t->write_cb_pool;
if (cb == NULL) {
@ -947,7 +953,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
grpc_chttp2_complete_closure_step(
exec_ctx, t, s, &s->send_initial_metadata_finished,
GRPC_ERROR_CREATE(
"Attempt to send initial metadata after stream was closed"), "send_initial_metadata_finished");
"Attempt to send initial metadata after stream was closed"),
"send_initial_metadata_finished");
}
}
}
@ -957,7 +964,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
if (s->write_closed) {
grpc_chttp2_complete_closure_step(
exec_ctx, t, s, &s->fetching_send_message_finished,
GRPC_ERROR_CREATE("Attempt to send message after stream was closed"), "fetching_send_message_finished");
GRPC_ERROR_CREATE("Attempt to send message after stream was closed"),
"fetching_send_message_finished");
} else {
GPR_ASSERT(s->fetching_send_message == NULL);
uint8_t *frame_hdr =
@ -1015,7 +1023,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
grpc_metadata_batch_is_empty(op->send_trailing_metadata)
? GRPC_ERROR_NONE
: GRPC_ERROR_CREATE("Attempt to send trailing metadata after "
"stream was closed"), "send_trailing_metadata_finished");
"stream was closed"),
"send_trailing_metadata_finished");
} else if (s->id != 0) {
/* TODO(ctiller): check if there's flow control for any outstanding
bytes before going writable */
@ -1193,16 +1202,19 @@ void grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s) {
grpc_byte_stream *bs;
if (s->recv_initial_metadata_ready != NULL && s->published_metadata[0]) {
if (s->recv_initial_metadata_ready != NULL &&
s->published_metadata[0] != GRPC_METADATA_NOT_PUBLISHED) {
if (s->seen_error) {
while ((bs = grpc_chttp2_incoming_frame_queue_pop(&s->incoming_frames)) !=
NULL) {
gpr_log(GPR_DEBUG, "discard %p", bs);
incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE);
}
}
grpc_chttp2_incoming_metadata_buffer_publish(&s->metadata_buffer[0],
s->recv_initial_metadata);
run_closure_and_null(exec_ctx, &s->recv_initial_metadata_ready, GRPC_ERROR_NONE);
null_then_run_closure(exec_ctx, &s->recv_initial_metadata_ready,
GRPC_ERROR_NONE);
}
}
@ -1214,16 +1226,22 @@ void grpc_chttp2_maybe_complete_recv_message(grpc_exec_ctx *exec_ctx,
while (s->final_metadata_requested && s->seen_error &&
(bs = grpc_chttp2_incoming_frame_queue_pop(&s->incoming_frames)) !=
NULL) {
gpr_log(GPR_DEBUG, "discard %p", bs);
incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE);
}
if (s->incoming_frames.head != NULL) {
*s->recv_message =
grpc_chttp2_incoming_frame_queue_pop(&s->incoming_frames);
GPR_ASSERT(*s->recv_message != NULL);
run_closure_and_null(exec_ctx, &s->recv_message_ready, GRPC_ERROR_NONE);
} else if (s->published_metadata[1]) {
null_then_run_closure(exec_ctx, &s->recv_message_ready, GRPC_ERROR_NONE);
} else if (s->published_metadata[1] != GRPC_METADATA_NOT_PUBLISHED) {
const char *m = grpc_error_string(s->read_closed_error);
gpr_log(GPR_ERROR, "publish null :: %s", m);
abort();
grpc_error_free_string(m);
*s->recv_message = NULL;
run_closure_and_null(exec_ctx, &s->recv_message_ready, GRPC_ERROR_NONE);
null_then_run_closure(exec_ctx, &s->recv_message_ready, GRPC_ERROR_NONE);
}
}
}
@ -1238,14 +1256,17 @@ void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_exec_ctx *exec_ctx,
if (s->seen_error) {
while ((bs = grpc_chttp2_incoming_frame_queue_pop(&s->incoming_frames)) !=
NULL) {
gpr_log(GPR_DEBUG, "discard %p", bs);
incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE);
}
}
if (s->all_incoming_byte_streams_finished && s->recv_trailing_metadata_finished != NULL) {
if (s->all_incoming_byte_streams_finished &&
s->recv_trailing_metadata_finished != NULL) {
grpc_chttp2_incoming_metadata_buffer_publish(&s->metadata_buffer[1],
s->recv_trailing_metadata);
grpc_chttp2_complete_closure_step(
exec_ctx, t, s, &s->recv_trailing_metadata_finished, GRPC_ERROR_NONE, "recv_trailing_metadata_finished");
exec_ctx, t, s, &s->recv_trailing_metadata_finished, GRPC_ERROR_NONE,
"recv_trailing_metadata_finished");
}
}
}
@ -1262,7 +1283,6 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
uint32_t id, grpc_error *error) {
grpc_chttp2_stream *s = grpc_chttp2_stream_map_delete(&t->stream_map, id);
GPR_ASSERT(s);
s->in_stream_map = false;
if (t->incoming_stream == s) {
t->incoming_stream = NULL;
grpc_chttp2_parsing_become_skip_parser(exec_ctx, t);
@ -1359,7 +1379,8 @@ void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
to the upper layers - drop what we've got, and then publish
what we want - which is safe because we haven't told anyone
about the metadata yet */
if (!s->published_metadata[1] || s->recv_trailing_metadata_finished != NULL) {
if (s->published_metadata[1] == GRPC_METADATA_NOT_PUBLISHED ||
s->recv_trailing_metadata_finished != NULL) {
char status_string[GPR_LTOA_MIN_BUFSIZE];
gpr_ltoa(status, status_string);
grpc_chttp2_incoming_metadata_buffer_add(
@ -1373,7 +1394,8 @@ void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
GRPC_MDSTR_GRPC_MESSAGE,
grpc_mdstr_from_slice(gpr_slice_ref(*slice))));
}
s->published_metadata[1] = true;
gpr_log(GPR_DEBUG, "published_metadata from fake");
s->published_metadata[1] = GRPC_METADATA_SYNTHESIZED_FROM_FAKE;
grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s);
}
if (slice) {
@ -1413,20 +1435,21 @@ static void fail_pending_writes(grpc_exec_ctx *exec_ctx,
grpc_error *error) {
error = removal_error(error, s);
s->fetching_send_message = NULL;
grpc_chttp2_complete_closure_step(exec_ctx, t, s,
&s->send_initial_metadata_finished,
GRPC_ERROR_REF(error), "send_initial_metadata_finished");
grpc_chttp2_complete_closure_step(exec_ctx, t, s,
&s->send_trailing_metadata_finished,
GRPC_ERROR_REF(error), "send_trailing_metadata_finished");
grpc_chttp2_complete_closure_step(exec_ctx, t, s,
&s->fetching_send_message_finished,
GRPC_ERROR_REF(error), "fetching_send_message_finished");
grpc_chttp2_complete_closure_step(
exec_ctx, t, s, &s->send_initial_metadata_finished, GRPC_ERROR_REF(error),
"send_initial_metadata_finished");
grpc_chttp2_complete_closure_step(
exec_ctx, t, s, &s->send_trailing_metadata_finished,
GRPC_ERROR_REF(error), "send_trailing_metadata_finished");
grpc_chttp2_complete_closure_step(
exec_ctx, t, s, &s->fetching_send_message_finished, GRPC_ERROR_REF(error),
"fetching_send_message_finished");
while (s->on_write_finished_cbs) {
grpc_chttp2_write_cb *cb = s->on_write_finished_cbs;
s->on_write_finished_cbs = cb->next;
grpc_chttp2_complete_closure_step(exec_ctx, t, s, &cb->closure,
GRPC_ERROR_REF(error), "on_write_finished_cb");
GRPC_ERROR_REF(error),
"on_write_finished_cb");
cb->next = t->write_cb_pool;
t->write_cb_pool = cb;
}
@ -1445,8 +1468,11 @@ void grpc_chttp2_mark_stream_closed(grpc_exec_ctx *exec_ctx,
if (close_reads && !s->read_closed) {
s->read_closed_error = GRPC_ERROR_REF(error);
s->read_closed = true;
s->published_metadata[0] = true;
s->published_metadata[1] = true;
for (int i = 0; i < 2; i++) {
if (s->published_metadata[i] == GRPC_METADATA_NOT_PUBLISHED) {
s->published_metadata[i] = GPRC_METADATA_PUBLISHED_AT_CLOSE;
}
}
decrement_active_streams_locked(exec_ctx, t, s);
grpc_chttp2_maybe_complete_recv_initial_metadata(exec_ctx, t, s);
grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);

@ -140,23 +140,17 @@ void grpc_chttp2_encode_data(uint32_t id, gpr_slice_buffer *inbuf,
stats->data_bytes += write_bytes;
}
grpc_error *grpc_chttp2_data_parser_parse(grpc_exec_ctx *exec_ctx, void *parser,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s,
gpr_slice slice, int is_last) {
static grpc_error *parse_inner(grpc_exec_ctx *exec_ctx,
grpc_chttp2_data_parser *p,
grpc_chttp2_transport *t, grpc_chttp2_stream *s,
gpr_slice slice) {
uint8_t *const beg = GPR_SLICE_START_PTR(slice);
uint8_t *const end = GPR_SLICE_END_PTR(slice);
uint8_t *cur = beg;
grpc_chttp2_data_parser *p = parser;
uint32_t message_flags;
grpc_chttp2_incoming_byte_stream *incoming_byte_stream;
char *msg;
if (is_last && p->is_last_frame) {
grpc_chttp2_mark_stream_closed(exec_ctx, t, s, true, false,
GRPC_ERROR_NONE);
}
if (cur == end) {
return GRPC_ERROR_NONE;
}
@ -272,3 +266,18 @@ grpc_error *grpc_chttp2_data_parser_parse(grpc_exec_ctx *exec_ctx, void *parser,
GPR_UNREACHABLE_CODE(return GRPC_ERROR_CREATE("Should never reach here"));
}
grpc_error *grpc_chttp2_data_parser_parse(grpc_exec_ctx *exec_ctx, void *parser,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s,
gpr_slice slice, int is_last) {
grpc_chttp2_data_parser *p = parser;
grpc_error *error = parse_inner(exec_ctx, p, t, s, slice);
if (is_last && p->is_last_frame) {
grpc_chttp2_mark_stream_closed(exec_ctx, t, s, true, false,
GRPC_ERROR_NONE);
}
return error;
}

@ -1607,7 +1607,8 @@ grpc_error *grpc_chttp2_header_parser_parse(grpc_exec_ctx *exec_ctx,
if (s->header_frames_received == GPR_ARRAY_SIZE(s->metadata_buffer)) {
return GRPC_ERROR_CREATE("Too many trailer frames");
}
s->published_metadata[s->header_frames_received] = true;
s->published_metadata[s->header_frames_received] =
GRPC_METADATA_PUBLISHED_FROM_WIRE;
maybe_complete_funcs[s->header_frames_received](exec_ctx, t, s);
s->header_frames_received++;
}

@ -317,6 +317,13 @@ struct grpc_chttp2_transport {
grpc_chttp2_write_cb *write_cb_pool;
};
typedef enum {
GRPC_METADATA_NOT_PUBLISHED,
GRPC_METADATA_SYNTHESIZED_FROM_FAKE,
GRPC_METADATA_PUBLISHED_FROM_WIRE,
GPRC_METADATA_PUBLISHED_AT_CLOSE
} grpc_published_metadata_method;
struct grpc_chttp2_stream {
grpc_chttp2_transport *t;
grpc_stream_refcount *refcount;
@ -370,8 +377,6 @@ struct grpc_chttp2_stream {
bool read_closed;
/** Are all published incoming byte streams closed. */
bool all_incoming_byte_streams_finished;
/** Is this stream in the stream map. */
bool in_stream_map;
/** Has this stream seen an error.
If true, then pending incoming frames can be thrown away. */
bool seen_error;
@ -381,7 +386,7 @@ struct grpc_chttp2_stream {
/** the error that resulted in this stream being write-closed */
grpc_error *write_closed_error;
bool published_metadata[2];
grpc_published_metadata_method published_metadata[2];
bool final_metadata_requested;
grpc_chttp2_incoming_metadata_buffer metadata_buffer[2];

@ -77,6 +77,7 @@ void grpc_chttp2_stream_map_add(grpc_chttp2_stream_map *map, uint32_t key,
GPR_ASSERT(count == 0 || keys[count - 1] < key);
GPR_ASSERT(value);
GPR_ASSERT(grpc_chttp2_stream_map_find(map, key) == NULL);
if (count == capacity) {
if (map->free > capacity / 4) {
@ -170,6 +171,7 @@ void *grpc_chttp2_stream_map_delete(grpc_chttp2_stream_map *map, uint32_t key) {
if (map->free == map->count) {
map->free = map->count = 0;
}
GPR_ASSERT(grpc_chttp2_stream_map_find(map, key) == NULL);
}
return out;
}

@ -339,7 +339,10 @@ static void SendRpc(grpc::testing::EchoTestService::Stub* stub, int num_rpcs) {
ClientContext context;
Status s = stub->Echo(&context, request, &response);
EXPECT_EQ(response.message(), request.message());
EXPECT_TRUE(s.ok());
if (!s.ok()) {
gpr_log(GPR_ERROR, "RPC error: %d: %s", s.error_code(), s.error_message().c_str());
}
ASSERT_TRUE(s.ok());
}
}

@ -130,6 +130,9 @@ class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient {
grpc::Status s =
stub->UnaryCall(&context, request_, &responses_[thread_idx]);
entry->set_value((UsageTimer::Now() - start) * 1e9);
if (!s.ok()) {
gpr_log(GPR_ERROR, "RPC error: %d: %s", s.error_code(), s.error_message().c_str());
}
return s.ok();
}
};

Loading…
Cancel
Save