Merge branch 'merge-parse' of github.com:ctiller/grpc into merge-parse

pull/7793/head
Craig Tiller 9 years ago
commit d2e161413d
  1. 1
      src/core/ext/transport/chttp2/transport/frame_data.c
  2. 2
      src/core/ext/transport/chttp2/transport/parsing.c
  3. 34
      src/core/lib/surface/completion_queue.c
  4. 3
      src/core/lib/surface/completion_queue.h
  5. 3
      src/core/lib/surface/init.c
  6. 2
      src/core/lib/transport/transport.h

@ -56,6 +56,7 @@ void grpc_chttp2_data_parser_destroy(grpc_exec_ctx *exec_ctx,
exec_ctx, parser->parsing_frame, GRPC_ERROR_CREATE("Parser destroyed"), exec_ctx, parser->parsing_frame, GRPC_ERROR_CREATE("Parser destroyed"),
1); 1);
} }
GRPC_ERROR_UNREF(parser->error);
} }
grpc_error *grpc_chttp2_data_parser_begin_frame(grpc_chttp2_data_parser *parser, grpc_error *grpc_chttp2_data_parser_begin_frame(grpc_chttp2_data_parser *parser,

@ -444,7 +444,7 @@ static grpc_error *init_data_frame_parser(
} else if (grpc_error_get_int(err, GRPC_ERROR_INT_STREAM_ID, NULL)) { } else if (grpc_error_get_int(err, GRPC_ERROR_INT_STREAM_ID, NULL)) {
/* handle stream errors by closing the stream */ /* handle stream errors by closing the stream */
grpc_chttp2_mark_stream_closed(exec_ctx, transport_global, stream_global, grpc_chttp2_mark_stream_closed(exec_ctx, transport_global, stream_global,
true, false, GRPC_ERROR_REF(err)); true, false, err);
gpr_slice_buffer_add( gpr_slice_buffer_add(
&transport_global->qbuf, &transport_global->qbuf,
grpc_chttp2_rst_stream_create(transport_global->incoming_stream_id, grpc_chttp2_rst_stream_create(transport_global->incoming_stream_id,

@ -39,6 +39,7 @@
#include <grpc/support/alloc.h> #include <grpc/support/alloc.h>
#include <grpc/support/atm.h> #include <grpc/support/atm.h>
#include <grpc/support/log.h> #include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/time.h> #include <grpc/support/time.h>
#include "src/core/lib/iomgr/pollset.h" #include "src/core/lib/iomgr/pollset.h"
@ -50,6 +51,9 @@
#include "src/core/lib/surface/event_string.h" #include "src/core/lib/surface/event_string.h"
int grpc_trace_operation_failures; int grpc_trace_operation_failures;
#ifndef NDEBUG
int grpc_trace_pending_tags;
#endif
typedef struct { typedef struct {
grpc_pollset_worker **worker; grpc_pollset_worker **worker;
@ -338,6 +342,27 @@ static bool cq_is_next_finished(grpc_exec_ctx *exec_ctx, void *arg) {
return gpr_time_cmp(a->deadline, gpr_now(a->deadline.clock_type)) < 0; return gpr_time_cmp(a->deadline, gpr_now(a->deadline.clock_type)) < 0;
} }
#ifndef NDEBUG
static void dump_pending_tags(grpc_completion_queue *cc) {
if (!grpc_trace_pending_tags) return;
gpr_strvec v;
gpr_strvec_init(&v);
gpr_strvec_add(&v, gpr_strdup("PENDING TAGS:"));
for (size_t i = 0; i < cc->outstanding_tag_count; i++) {
char *s;
gpr_asprintf(&s, " %p", cc->outstanding_tags[i]);
gpr_strvec_add(&v, s);
}
char *out = gpr_strvec_flatten(&v, NULL);
gpr_strvec_destroy(&v);
gpr_log(GPR_DEBUG, "%s", out);
gpr_free(out);
}
#else
static void dump_pending_tags(grpc_completion_queue *cc) {}
#endif
grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
gpr_timespec deadline, void *reserved) { gpr_timespec deadline, void *reserved) {
grpc_event ret; grpc_event ret;
@ -357,6 +382,8 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
reserved)); reserved));
GPR_ASSERT(!reserved); GPR_ASSERT(!reserved);
dump_pending_tags(cc);
deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
cq_is_finished_arg is_finished_arg = {cc, deadline, NULL, NULL}; cq_is_finished_arg is_finished_arg = {cc, deadline, NULL, NULL};
@ -400,6 +427,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
gpr_mu_unlock(cc->mu); gpr_mu_unlock(cc->mu);
memset(&ret, 0, sizeof(ret)); memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_TIMEOUT; ret.type = GRPC_QUEUE_TIMEOUT;
dump_pending_tags(cc);
break; break;
} }
first_loop = 0; first_loop = 0;
@ -425,6 +453,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
GRPC_ERROR_UNREF(err); GRPC_ERROR_UNREF(err);
memset(&ret, 0, sizeof(ret)); memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_TIMEOUT; ret.type = GRPC_QUEUE_TIMEOUT;
dump_pending_tags(cc);
break; break;
} }
} }
@ -510,6 +539,8 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
} }
GPR_ASSERT(!reserved); GPR_ASSERT(!reserved);
dump_pending_tags(cc);
deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
cq_is_finished_arg is_finished_arg = {cc, deadline, NULL, tag}; cq_is_finished_arg is_finished_arg = {cc, deadline, NULL, tag};
@ -561,6 +592,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
memset(&ret, 0, sizeof(ret)); memset(&ret, 0, sizeof(ret));
/* TODO(ctiller): should we use a different result here */ /* TODO(ctiller): should we use a different result here */
ret.type = GRPC_QUEUE_TIMEOUT; ret.type = GRPC_QUEUE_TIMEOUT;
dump_pending_tags(cc);
break; break;
} }
now = gpr_now(GPR_CLOCK_MONOTONIC); now = gpr_now(GPR_CLOCK_MONOTONIC);
@ -569,6 +601,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
gpr_mu_unlock(cc->mu); gpr_mu_unlock(cc->mu);
memset(&ret, 0, sizeof(ret)); memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_TIMEOUT; ret.type = GRPC_QUEUE_TIMEOUT;
dump_pending_tags(cc);
break; break;
} }
first_loop = 0; first_loop = 0;
@ -594,6 +627,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
GRPC_ERROR_UNREF(err); GRPC_ERROR_UNREF(err);
memset(&ret, 0, sizeof(ret)); memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_TIMEOUT; ret.type = GRPC_QUEUE_TIMEOUT;
dump_pending_tags(cc);
break; break;
} }
} }

@ -44,6 +44,9 @@
extern int grpc_cq_pluck_trace; extern int grpc_cq_pluck_trace;
extern int grpc_cq_event_timeout_trace; extern int grpc_cq_event_timeout_trace;
extern int grpc_trace_operation_failures; extern int grpc_trace_operation_failures;
#ifndef NDEBUG
extern int grpc_trace_pending_tags;
#endif
typedef struct grpc_cq_completion { typedef struct grpc_cq_completion {
/** user supplied tag */ /** user supplied tag */

@ -173,6 +173,9 @@ void grpc_init(void) {
// Default timeout trace to 1 // Default timeout trace to 1
grpc_cq_event_timeout_trace = 1; grpc_cq_event_timeout_trace = 1;
grpc_register_tracer("op_failure", &grpc_trace_operation_failures); grpc_register_tracer("op_failure", &grpc_trace_operation_failures);
#ifndef NDEBUG
grpc_register_tracer("pending_tags", &grpc_trace_pending_tags);
#endif
grpc_security_pre_init(); grpc_security_pre_init();
grpc_iomgr_init(); grpc_iomgr_init();
grpc_executor_init(); grpc_executor_init();

@ -55,7 +55,7 @@ typedef struct grpc_transport grpc_transport;
for a stream. */ for a stream. */
typedef struct grpc_stream grpc_stream; typedef struct grpc_stream grpc_stream;
#define GRPC_STREAM_REFCOUNT_DEBUG //#define GRPC_STREAM_REFCOUNT_DEBUG
typedef struct grpc_stream_refcount { typedef struct grpc_stream_refcount {
gpr_refcount refs; gpr_refcount refs;

Loading…
Cancel
Save