Mechanism for reporting stats up

pull/5699/head
Craig Tiller 9 years ago
parent e2d6a61e35
commit 466129eb24
  1. 5
      src/core/surface/call.c
  2. 4
      src/core/transport/chttp2/internal.h
  3. 8
      src/core/transport/chttp2/writing.c
  4. 51
      src/core/transport/chttp2_transport.c
  5. 21
      src/core/transport/transport.c
  6. 33
      src/core/transport/transport.h

@ -174,6 +174,9 @@ struct grpc_call {
/* Received call statuses from various sources */
received_status status[STATUS_SOURCE_COUNT];
/* Call stats: only valid after trailing metadata received */
grpc_transport_stream_stats stats;
/* Compression algorithm for the call */
grpc_compression_algorithm compression_algorithm;
/* Supported encodings (compression algorithms), a bitset */
@ -1371,6 +1374,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
bctl->recv_final_op = 1;
stream_op.recv_trailing_metadata =
&call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
stream_op.collect_stats = &call->stats;
break;
case GRPC_OP_RECV_CLOSE_ON_SERVER:
/* Flag validation: currently allow no flags */
@ -1392,6 +1396,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
bctl->recv_final_op = 1;
stream_op.recv_trailing_metadata =
&call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
stream_op.collect_stats = &call->stats;
break;
}
}

@ -394,6 +394,9 @@ typedef struct {
grpc_metadata_batch *recv_trailing_metadata;
grpc_closure *recv_trailing_metadata_finished;
grpc_transport_stream_stats *collecting_stats;
grpc_transport_stream_stats stats;
/** when the application requests writes be closed, the write_closed is
'queued'; when the close is flow controlled into the send path, we are
'sending' it; when the write has been performed it is 'sent' */
@ -635,6 +638,7 @@ void grpc_chttp2_parsing_become_skip_parser(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing);
void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx,
grpc_chttp2_stream_global *stream_global,
grpc_closure **pclosure, int success);
#define GRPC_CHTTP2_CLIENT_CONNECT_STRING "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"

@ -328,17 +328,19 @@ void grpc_chttp2_cleanup_writing(
transport_global, transport_writing, &stream_global, &stream_writing)) {
if (stream_writing->sent_initial_metadata) {
grpc_chttp2_complete_closure_step(
exec_ctx, &stream_global->send_initial_metadata_finished, 1);
exec_ctx, stream_global,
&stream_global->send_initial_metadata_finished, 1);
}
if (stream_writing->sent_message) {
GPR_ASSERT(stream_writing->send_message == NULL);
grpc_chttp2_complete_closure_step(
exec_ctx, &stream_global->send_message_finished, 1);
exec_ctx, stream_global, &stream_global->send_message_finished, 1);
stream_writing->sent_message = 0;
}
if (stream_writing->sent_trailing_metadata) {
grpc_chttp2_complete_closure_step(
exec_ctx, &stream_global->send_trailing_metadata_finished, 1);
exec_ctx, stream_global,
&stream_global->send_trailing_metadata_finished, 1);
}
if (stream_writing->sent_trailing_metadata) {
grpc_chttp2_mark_stream_closed(exec_ctx, transport_global, stream_global,

@ -758,23 +758,35 @@ static void maybe_start_some_streams(
}
}
#define CLOSURE_BARRIER_STATS_BIT (1 << 0)
#define CLOSURE_BARRIER_FAILURE_BIT (1 << 1)
#define CLOSURE_BARRIER_FIRST_REF_BIT (1 << 16)
static grpc_closure *add_closure_barrier(grpc_closure *closure) {
closure->final_data += 2;
closure->final_data += CLOSURE_BARRIER_FIRST_REF_BIT;
return closure;
}
void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx,
grpc_chttp2_stream_global *stream_global,
grpc_closure **pclosure, int success) {
grpc_closure *closure = *pclosure;
if (closure == NULL) {
return;
}
closure->final_data -= 2;
closure->final_data -= CLOSURE_BARRIER_FIRST_REF_BIT;
if (!success) {
closure->final_data |= 1;
closure->final_data |= CLOSURE_BARRIER_FAILURE_BIT;
}
if (closure->final_data < 2) {
grpc_exec_ctx_enqueue(exec_ctx, closure, closure->final_data == 0, NULL);
if (closure->final_data < CLOSURE_BARRIER_FIRST_REF_BIT) {
if (closure->final_data & CLOSURE_BARRIER_STATS_BIT) {
grpc_transport_move_stats(&stream_global->stats,
stream_global->collecting_stats);
stream_global->collecting_stats = NULL;
}
grpc_exec_ctx_enqueue(
exec_ctx, closure,
(closure->final_data & CLOSURE_BARRIER_FAILURE_BIT) == 0, NULL);
}
*pclosure = NULL;
}
@ -807,7 +819,13 @@ static void perform_stream_op_locked(
}
/* use final_data as a barrier until enqueue time; the inital counter is
dropped at the end of this function */
on_complete->final_data = 2;
on_complete->final_data = CLOSURE_BARRIER_FIRST_REF_BIT;
if (op->collect_stats != NULL) {
GPR_ASSERT(stream_global->collecting_stats == NULL);
stream_global->collecting_stats = op->collect_stats;
on_complete->final_data |= CLOSURE_BARRIER_STATS_BIT;
}
if (op->cancel_with_status != GRPC_STATUS_OK) {
cancel_from_api(exec_ctx, transport_global, stream_global,
@ -840,7 +858,8 @@ static void perform_stream_op_locked(
}
} else {
grpc_chttp2_complete_closure_step(
exec_ctx, &stream_global->send_initial_metadata_finished, 0);
exec_ctx, stream_global,
&stream_global->send_initial_metadata_finished, 0);
}
}
@ -850,7 +869,7 @@ static void perform_stream_op_locked(
stream_global->send_message_finished = add_closure_barrier(on_complete);
if (stream_global->write_closed) {
grpc_chttp2_complete_closure_step(
exec_ctx, &stream_global->send_message_finished, 0);
exec_ctx, stream_global, &stream_global->send_message_finished, 0);
} else if (stream_global->id != 0) {
stream_global->send_message = op->send_message;
grpc_chttp2_become_writable(transport_global, stream_global);
@ -868,7 +887,8 @@ static void perform_stream_op_locked(
}
if (stream_global->write_closed) {
grpc_chttp2_complete_closure_step(
exec_ctx, &stream_global->send_trailing_metadata_finished,
exec_ctx, stream_global,
&stream_global->send_trailing_metadata_finished,
grpc_metadata_batch_is_empty(op->send_trailing_metadata));
} else if (stream_global->id != 0) {
/* TODO(ctiller): check if there's flow control for any outstanding
@ -907,7 +927,7 @@ static void perform_stream_op_locked(
grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
}
grpc_chttp2_complete_closure_step(exec_ctx, &on_complete, 1);
grpc_chttp2_complete_closure_step(exec_ctx, stream_global, &on_complete, 1);
GPR_TIMER_END("perform_stream_op_locked", 0);
}
@ -1078,7 +1098,8 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx,
&stream_global->received_trailing_metadata,
stream_global->recv_trailing_metadata);
grpc_chttp2_complete_closure_step(
exec_ctx, &stream_global->recv_trailing_metadata_finished, 1);
exec_ctx, stream_global,
&stream_global->recv_trailing_metadata_finished, 1);
}
}
}
@ -1177,10 +1198,12 @@ void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx,
static void fail_pending_writes(grpc_exec_ctx *exec_ctx,
grpc_chttp2_stream_global *stream_global) {
grpc_chttp2_complete_closure_step(
exec_ctx, &stream_global->send_initial_metadata_finished, 0);
exec_ctx, stream_global, &stream_global->send_initial_metadata_finished,
0);
grpc_chttp2_complete_closure_step(
exec_ctx, &stream_global->send_trailing_metadata_finished, 0);
grpc_chttp2_complete_closure_step(exec_ctx,
exec_ctx, stream_global, &stream_global->send_trailing_metadata_finished,
0);
grpc_chttp2_complete_closure_step(exec_ctx, stream_global,
&stream_global->send_message_finished, 0);
}

@ -77,15 +77,22 @@ void grpc_stream_ref_init(grpc_stream_refcount *refcount, int initial_refs,
grpc_closure_init(&refcount->destroy, cb, cb_arg);
}
static void one_way_stats_init(grpc_transport_one_way_stats *stats) {
gpr_stats_init(&stats->framing_bytes, 0);
gpr_stats_init(&stats->data_bytes, 0);
gpr_stats_init(&stats->header_bytes, 0);
static void move64(uint64_t *from, uint64_t *to) {
*to += *from;
*from = 0;
}
void grpc_transport_stream_stats_init(grpc_transport_stream_stats *stats) {
one_way_stats_init(&stats->incoming);
one_way_stats_init(&stats->outgoing);
void grpc_transport_move_one_way_stats(grpc_transport_one_way_stats *from,
grpc_transport_one_way_stats *to) {
move64(&from->framing_bytes, &to->framing_bytes);
move64(&from->data_bytes, &to->data_bytes);
move64(&from->header_bytes, &to->header_bytes);
}
void grpc_transport_move_stats(grpc_transport_stream_stats *from,
grpc_transport_stream_stats *to) {
grpc_transport_move_one_way_stats(&from->incoming, &to->incoming);
grpc_transport_move_one_way_stats(&from->outgoing, &to->outgoing);
}
size_t grpc_transport_stream_size(grpc_transport *transport) {

@ -78,6 +78,23 @@ void grpc_stream_unref(grpc_exec_ctx *exec_ctx, grpc_stream_refcount *refcount);
grpc_stream_ref_init(rc, ir, cb, cb_arg)
#endif
typedef struct {
uint64_t framing_bytes;
uint64_t data_bytes;
uint64_t header_bytes;
} grpc_transport_one_way_stats;
typedef struct grpc_transport_stream_stats {
grpc_transport_one_way_stats incoming;
grpc_transport_one_way_stats outgoing;
} grpc_transport_stream_stats;
void grpc_transport_move_one_way_stats(grpc_transport_one_way_stats *from,
grpc_transport_one_way_stats *to);
void grpc_transport_move_stats(grpc_transport_stream_stats *from,
grpc_transport_stream_stats *to);
/* Transport stream op: a set of operations to perform on a transport
against a single stream */
typedef struct grpc_transport_stream_op {
@ -104,6 +121,9 @@ typedef struct grpc_transport_stream_op {
*/
grpc_metadata_batch *recv_trailing_metadata;
/** Collect any stats into provided buffer, zero internal stat counters */
grpc_transport_stream_stats *collect_stats;
/** Should be enqueued when all requested operations (excluding recv_message
and recv_initial_metadata which have their own closures) in a given batch
have been completed. */
@ -154,19 +174,6 @@ typedef struct grpc_transport_op {
grpc_closure *send_ping;
} grpc_transport_op;
typedef struct {
gpr_stats_counter framing_bytes;
gpr_stats_counter data_bytes;
gpr_stats_counter header_bytes;
} grpc_transport_one_way_stats;
typedef struct grpc_transport_stream_stats {
grpc_transport_one_way_stats incoming;
grpc_transport_one_way_stats outgoing;
} grpc_transport_stream_stats;
void grpc_transport_stream_stats_init(grpc_transport_stream_stats *stats);
/* Returns the amount of memory required to store a grpc_stream for this
transport */
size_t grpc_transport_stream_size(grpc_transport *transport);

Loading…
Cancel
Save