Review feedback: bikeshedding round

reviewable/pr9949/r4
Craig Tiller 8 years ago
parent e198b71989
commit a0f3abd925
  1. 8
      src/core/ext/census/grpc_filter.c
  2. 56
      src/core/ext/client_channel/client_channel.c
  3. 4
      src/core/ext/client_channel/subchannel.c
  4. 2
      src/core/ext/client_channel/subchannel.h
  5. 10
      src/core/ext/load_reporting/load_reporting_filter.c
  6. 11
      src/core/ext/transport/chttp2/transport/chttp2_transport.c
  7. 14
      src/core/ext/transport/cronet/transport/cronet_transport.c
  8. 8
      src/core/lib/channel/channel_stack.c
  9. 8
      src/core/lib/channel/channel_stack.h
  10. 14
      src/core/lib/channel/compress_filter.c
  11. 6
      src/core/lib/channel/connected_channel.c
  12. 20
      src/core/lib/channel/deadline_filter.c
  13. 6
      src/core/lib/channel/deadline_filter.h
  14. 8
      src/core/lib/channel/http_client_filter.c
  15. 8
      src/core/lib/channel/http_server_filter.c
  16. 8
      src/core/lib/channel/message_size_filter.c
  17. 12
      src/core/lib/security/transport/client_auth_filter.c
  18. 6
      src/core/lib/security/transport/server_auth_filter.c
  19. 16
      src/core/lib/surface/call.c
  20. 8
      src/core/lib/surface/lame_client.c
  21. 10
      src/core/lib/surface/server.c
  22. 12
      src/core/lib/transport/transport.c
  23. 30
      src/core/lib/transport/transport.h
  24. 2
      src/core/lib/transport/transport_impl.h
  25. 6
      src/core/lib/transport/transport_op_string.c
  26. 10
      src/cpp/common/channel_filter.h
  27. 2
      test/core/channel/channel_stack_test.c
  28. 6
      test/core/end2end/tests/filter_causes_close.c
  29. 10
      test/cpp/microbenchmarks/bm_call_create.cc
  30. 18
      test/cpp/microbenchmarks/bm_chttp2_transport.cc

@ -74,7 +74,7 @@ static void extract_and_annotate_method_tag(grpc_metadata_batch *md,
}
static void client_mutate_op(grpc_call_element *elem,
grpc_transport_stream_op *op) {
grpc_transport_stream_op_batch *op) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
if (op->send_initial_metadata) {
@ -85,7 +85,7 @@ static void client_mutate_op(grpc_call_element *elem,
static void client_start_transport_op(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_transport_stream_op *op) {
grpc_transport_stream_op_batch *op) {
client_mutate_op(elem, op);
grpc_call_next_op(exec_ctx, elem, op);
}
@ -104,7 +104,7 @@ static void server_on_done_recv(grpc_exec_ctx *exec_ctx, void *ptr,
}
static void server_mutate_op(grpc_call_element *elem,
grpc_transport_stream_op *op) {
grpc_transport_stream_op_batch *op) {
call_data *calld = elem->call_data;
if (op->recv_initial_metadata) {
/* substitute our callback for the op callback */
@ -119,7 +119,7 @@ static void server_mutate_op(grpc_call_element *elem,
static void server_start_transport_op(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_transport_stream_op *op) {
grpc_transport_stream_op_batch *op) {
/* TODO(ctiller): this code fails. I don't know why. I expect it's
incomplete, and someone should look at it soon.

@ -755,7 +755,7 @@ typedef struct client_channel_call_data {
grpc_connected_subchannel *connected_subchannel;
grpc_polling_entity *pollent;
grpc_transport_stream_op **waiting_ops;
grpc_transport_stream_op_batch **waiting_ops;
size_t waiting_ops_count;
size_t waiting_ops_capacity;
@ -775,7 +775,7 @@ grpc_subchannel_call *grpc_client_channel_get_subchannel_call(
return scc == CANCELLED_CALL ? NULL : scc;
}
static void add_waiting_locked(call_data *calld, grpc_transport_stream_op *op) {
static void add_waiting_locked(call_data *calld, grpc_transport_stream_op_batch *op) {
GPR_TIMER_BEGIN("add_waiting_locked", 0);
if (calld->waiting_ops_count == calld->waiting_ops_capacity) {
calld->waiting_ops_capacity = GPR_MAX(3, 2 * calld->waiting_ops_capacity);
@ -791,7 +791,7 @@ static void fail_locked(grpc_exec_ctx *exec_ctx, call_data *calld,
grpc_error *error) {
size_t i;
for (i = 0; i < calld->waiting_ops_count; i++) {
grpc_transport_stream_op_finish_with_failure(
grpc_transport_stream_op_batch_finish_with_failure(
exec_ctx, calld->waiting_ops[i], GRPC_ERROR_REF(error));
}
calld->waiting_ops_count = 0;
@ -804,7 +804,7 @@ static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, call_data *calld) {
}
grpc_subchannel_call *call = GET_CALL(calld);
grpc_transport_stream_op **ops = calld->waiting_ops;
grpc_transport_stream_op_batch **ops = calld->waiting_ops;
size_t nops = calld->waiting_ops_count;
if (call == CANCELLED_CALL) {
fail_locked(exec_ctx, calld, GRPC_ERROR_CANCELLED);
@ -1052,8 +1052,8 @@ static bool pick_subchannel_locked(
return false;
}
static void start_transport_stream_op_locked_inner(grpc_exec_ctx *exec_ctx,
grpc_transport_stream_op *op,
static void start_transport_stream_op_batch_locked_inner(grpc_exec_ctx *exec_ctx,
grpc_transport_stream_op_batch *op,
grpc_call_element *elem) {
channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
@ -1062,7 +1062,7 @@ static void start_transport_stream_op_locked_inner(grpc_exec_ctx *exec_ctx,
/* need to recheck that another thread hasn't set the call */
call = GET_CALL(calld);
if (call == CANCELLED_CALL) {
grpc_transport_stream_op_finish_with_failure(
grpc_transport_stream_op_batch_finish_with_failure(
exec_ctx, op, GRPC_ERROR_REF(calld->cancel_error));
/* early out */
return;
@ -1077,7 +1077,7 @@ static void start_transport_stream_op_locked_inner(grpc_exec_ctx *exec_ctx,
if (!gpr_atm_rel_cas(&calld->subchannel_call, 0,
(gpr_atm)(uintptr_t)CANCELLED_CALL)) {
/* recurse to retry */
start_transport_stream_op_locked_inner(exec_ctx, op, elem);
start_transport_stream_op_batch_locked_inner(exec_ctx, op, elem);
/* early out */
return;
} else {
@ -1099,7 +1099,7 @@ static void start_transport_stream_op_locked_inner(grpc_exec_ctx *exec_ctx,
GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error));
break;
}
grpc_transport_stream_op_finish_with_failure(
grpc_transport_stream_op_batch_finish_with_failure(
exec_ctx, op,
GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error));
/* early out */
@ -1143,13 +1143,13 @@ static void start_transport_stream_op_locked_inner(grpc_exec_ctx *exec_ctx,
if (error != GRPC_ERROR_NONE) {
subchannel_call = CANCELLED_CALL;
fail_locked(exec_ctx, calld, GRPC_ERROR_REF(error));
grpc_transport_stream_op_finish_with_failure(exec_ctx, op, error);
grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error);
}
gpr_atm_rel_store(&calld->subchannel_call,
(gpr_atm)(uintptr_t)subchannel_call);
retry_waiting_locked(exec_ctx, calld);
/* recurse to retry */
start_transport_stream_op_locked_inner(exec_ctx, op, elem);
start_transport_stream_op_batch_locked_inner(exec_ctx, op, elem);
/* early out */
return;
}
@ -1177,11 +1177,11 @@ static void on_complete(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
GRPC_ERROR_REF(error));
}
static void start_transport_stream_op_locked(grpc_exec_ctx *exec_ctx, void *arg,
static void start_transport_stream_op_batch_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error_ignored) {
GPR_TIMER_BEGIN("start_transport_stream_op_locked", 0);
GPR_TIMER_BEGIN("start_transport_stream_op_batch_locked", 0);
grpc_transport_stream_op *op = arg;
grpc_transport_stream_op_batch *op = arg;
grpc_call_element *elem = op->handler_private.extra_arg;
call_data *calld = elem->call_data;
@ -1193,11 +1193,11 @@ static void start_transport_stream_op_locked(grpc_exec_ctx *exec_ctx, void *arg,
op->on_complete = &calld->on_complete;
}
start_transport_stream_op_locked_inner(exec_ctx, op, elem);
start_transport_stream_op_batch_locked_inner(exec_ctx, op, elem);
GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call,
"start_transport_stream_op");
GPR_TIMER_END("start_transport_stream_op_locked", 0);
"start_transport_stream_op_batch");
GPR_TIMER_END("start_transport_stream_op_batch_locked", 0);
}
/* The logic here is fairly complicated, due to (a) the fact that we
@ -1208,39 +1208,39 @@ static void start_transport_stream_op_locked(grpc_exec_ctx *exec_ctx, void *arg,
We use double-checked locking to initially see if initialization has been
performed. If it has not, we acquire the combiner and perform initialization.
If it has, we proceed on the fast path. */
static void cc_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
static void cc_start_transport_stream_op_batch(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_transport_stream_op *op) {
grpc_transport_stream_op_batch *op) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
grpc_deadline_state_client_start_transport_stream_op(exec_ctx, elem, op);
grpc_deadline_state_client_start_transport_stream_op_batch(exec_ctx, elem, op);
/* try to (atomically) get the call */
grpc_subchannel_call *call = GET_CALL(calld);
GPR_TIMER_BEGIN("cc_start_transport_stream_op", 0);
GPR_TIMER_BEGIN("cc_start_transport_stream_op_batch", 0);
if (call == CANCELLED_CALL) {
grpc_transport_stream_op_finish_with_failure(
grpc_transport_stream_op_batch_finish_with_failure(
exec_ctx, op, GRPC_ERROR_REF(calld->cancel_error));
GPR_TIMER_END("cc_start_transport_stream_op", 0);
GPR_TIMER_END("cc_start_transport_stream_op_batch", 0);
/* early out */
return;
}
if (call != NULL) {
grpc_subchannel_call_process_op(exec_ctx, call, op);
GPR_TIMER_END("cc_start_transport_stream_op", 0);
GPR_TIMER_END("cc_start_transport_stream_op_batch", 0);
/* early out */
return;
}
/* we failed; lock and figure out what to do */
GRPC_CALL_STACK_REF(calld->owning_call, "start_transport_stream_op");
GRPC_CALL_STACK_REF(calld->owning_call, "start_transport_stream_op_batch");
op->handler_private.extra_arg = elem;
grpc_closure_sched(
exec_ctx,
grpc_closure_init(&op->handler_private.closure,
start_transport_stream_op_locked, op,
start_transport_stream_op_batch_locked, op,
grpc_combiner_scheduler(chand->combiner, false)),
GRPC_ERROR_NONE);
GPR_TIMER_END("cc_start_transport_stream_op", 0);
GPR_TIMER_END("cc_start_transport_stream_op_batch", 0);
}
/* Constructor for call_data */
@ -1299,7 +1299,7 @@ static void cc_set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx,
*/
const grpc_channel_filter grpc_client_channel_filter = {
cc_start_transport_stream_op,
cc_start_transport_stream_op_batch,
cc_start_transport_op,
sizeof(call_data),
cc_init_call_elem,

@ -749,11 +749,11 @@ char *grpc_subchannel_call_get_peer(grpc_exec_ctx *exec_ctx,
void grpc_subchannel_call_process_op(grpc_exec_ctx *exec_ctx,
grpc_subchannel_call *call,
grpc_transport_stream_op *op) {
grpc_transport_stream_op_batch *op) {
GPR_TIMER_BEGIN("grpc_subchannel_call_process_op", 0);
grpc_call_stack *call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(call);
grpc_call_element *top_elem = grpc_call_stack_element(call_stack, 0);
top_elem->filter->start_transport_stream_op(exec_ctx, top_elem, op);
top_elem->filter->start_transport_stream_op_batch(exec_ctx, top_elem, op);
GPR_TIMER_END("grpc_subchannel_call_process_op", 0);
}

@ -157,7 +157,7 @@ grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel(
/** continue processing a transport op */
void grpc_subchannel_call_process_op(grpc_exec_ctx *exec_ctx,
grpc_subchannel_call *subchannel_call,
grpc_transport_stream_op *op);
grpc_transport_stream_op_batch *op);
/** continue querying for peer */
char *grpc_subchannel_call_get_peer(grpc_exec_ctx *exec_ctx,

@ -183,10 +183,10 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
*/
}
static void lr_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
static void lr_start_transport_stream_op_batch(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_transport_stream_op *op) {
GPR_TIMER_BEGIN("lr_start_transport_stream_op", 0);
grpc_transport_stream_op_batch *op) {
GPR_TIMER_BEGIN("lr_start_transport_stream_op_batch", 0);
call_data *calld = elem->call_data;
if (op->recv_initial_metadata) {
@ -200,11 +200,11 @@ static void lr_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
}
grpc_call_next_op(exec_ctx, elem, op);
GPR_TIMER_END("lr_start_transport_stream_op", 0);
GPR_TIMER_END("lr_start_transport_stream_op_batch", 0);
}
const grpc_channel_filter grpc_load_reporting_filter = {
lr_start_transport_stream_op,
lr_start_transport_stream_op_batch,
grpc_channel_next_op,
sizeof(call_data),
init_call_elem,

@ -1140,13 +1140,13 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
grpc_error *error_ignored) {
GPR_TIMER_BEGIN("perform_stream_op_locked", 0);
grpc_transport_stream_op *op = stream_op;
grpc_transport_stream_op_batch *op = stream_op;
grpc_chttp2_stream *s = op->handler_private.extra_arg;
grpc_transport_stream_op_payload *op_payload = op->payload;
grpc_transport_stream_op_batch_payload *op_payload = op->payload;
grpc_chttp2_transport *t = s->t;
if (grpc_http_trace) {
char *str = grpc_transport_stream_op_string(op);
char *str = grpc_transport_stream_op_batch_string(op);
gpr_log(GPR_DEBUG, "perform_stream_op_locked: %s; on_complete = %p", str,
op->on_complete);
gpr_free(str);
@ -1374,13 +1374,14 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
}
static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_stream *gs, grpc_transport_stream_op *op) {
grpc_stream *gs,
grpc_transport_stream_op_batch *op) {
GPR_TIMER_BEGIN("perform_stream_op", 0);
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
if (grpc_http_trace) {
char *str = grpc_transport_stream_op_string(op);
char *str = grpc_transport_stream_op_batch_string(op);
gpr_log(GPR_DEBUG, "perform_stream_op[s=%p/%d]: %s", s, s->id, str);
gpr_free(str);
}

@ -172,7 +172,7 @@ struct op_state {
};
struct op_and_state {
grpc_transport_stream_op op;
grpc_transport_stream_op_batch op;
struct op_state state;
bool done;
struct stream_obj *s; /* Pointer back to the stream object */
@ -187,7 +187,7 @@ struct op_storage {
struct stream_obj {
gpr_arena *arena;
struct op_and_state *oas;
grpc_transport_stream_op *curr_op;
grpc_transport_stream_op_batch *curr_op;
grpc_cronet_transport *curr_ct;
grpc_stream *curr_gs;
bidirectional_stream *cbs;
@ -298,12 +298,12 @@ static grpc_error *make_error_with_desc(int error_code, const char *desc) {
/*
Add a new stream op to op storage.
*/
static void add_to_storage(struct stream_obj *s, grpc_transport_stream_op *op) {
static void add_to_storage(struct stream_obj *s, grpc_transport_stream_op_batch *op) {
struct op_storage *storage = &s->storage;
/* add new op at the beginning of the linked list. The memory is freed
in remove_from_storage */
struct op_and_state *new_op = gpr_malloc(sizeof(struct op_and_state));
memcpy(&new_op->op, op, sizeof(grpc_transport_stream_op));
memcpy(&new_op->op, op, sizeof(grpc_transport_stream_op_batch));
memset(&new_op->state, 0, sizeof(new_op->state));
new_op->s = s;
new_op->done = false;
@ -768,7 +768,7 @@ static bool header_has_authority(grpc_linked_mdelem *head) {
Op Execution: Decide if one of the actions contained in the stream op can be
executed. This is the heart of the state machine.
*/
static bool op_can_be_run(grpc_transport_stream_op *curr_op,
static bool op_can_be_run(grpc_transport_stream_op_batch *curr_op,
struct stream_obj *s, struct op_state *op_state,
enum e_op_id op_id) {
struct op_state *stream_state = &s->state;
@ -919,7 +919,7 @@ static bool op_can_be_run(grpc_transport_stream_op *curr_op,
*/
static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
struct op_and_state *oas) {
grpc_transport_stream_op *stream_op = &oas->op;
grpc_transport_stream_op_batch *stream_op = &oas->op;
struct stream_obj *s = oas->s;
grpc_cronet_transport *t = (grpc_cronet_transport *)s->curr_ct;
struct op_state *stream_state = &s->state;
@ -1301,7 +1301,7 @@ static void set_pollset_set_do_nothing(grpc_exec_ctx *exec_ctx,
grpc_pollset_set *pollset_set) {}
static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_stream *gs, grpc_transport_stream_op *op) {
grpc_stream *gs, grpc_transport_stream_op_batch *op) {
CRONET_LOG(GPR_DEBUG, "perform_stream_op");
if (op->send_initial_metadata &&
header_has_authority(op->payload->send_initial_metadata

@ -246,9 +246,9 @@ void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack,
}
void grpc_call_next_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_transport_stream_op *op) {
grpc_transport_stream_op_batch *op) {
grpc_call_element *next_elem = elem + 1;
next_elem->filter->start_transport_stream_op(exec_ctx, next_elem, op);
next_elem->filter->start_transport_stream_op_batch(exec_ctx, next_elem, op);
}
char *grpc_call_next_get_peer(grpc_exec_ctx *exec_ctx,
@ -284,8 +284,8 @@ grpc_call_stack *grpc_call_stack_from_top_element(grpc_call_element *elem) {
void grpc_call_element_signal_error(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_error *error) {
grpc_transport_stream_op *op = grpc_make_transport_stream_op(NULL);
grpc_transport_stream_op_batch *op = grpc_make_transport_stream_op(NULL);
op->cancel_stream = true;
op->payload->cancel_stream.cancel_error = error;
elem->filter->start_transport_stream_op(exec_ctx, elem, op);
elem->filter->start_transport_stream_op_batch(exec_ctx, elem, op);
}

@ -112,9 +112,9 @@ typedef struct {
typedef struct {
/* Called to eg. send/receive data on a call.
See grpc_call_next_op on how to call the next element in the stack */
void (*start_transport_stream_op)(grpc_exec_ctx *exec_ctx,
void (*start_transport_stream_op_batch)(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_transport_stream_op *op);
grpc_transport_stream_op_batch *op);
/* Called to handle channel level operations - e.g. new calls, or transport
closure.
See grpc_channel_next_op on how to call the next element in the stack */
@ -281,7 +281,7 @@ void grpc_call_stack_ignore_set_pollset_or_pollset_set(
grpc_polling_entity *pollent);
/* Call the next operation in a call stack */
void grpc_call_next_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_transport_stream_op *op);
grpc_transport_stream_op_batch *op);
/* Call the next operation (depending on call directionality) in a channel
stack */
void grpc_channel_next_op(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem,
@ -300,7 +300,7 @@ grpc_channel_stack *grpc_channel_stack_from_top_element(
grpc_call_stack *grpc_call_stack_from_top_element(grpc_call_element *elem);
void grpc_call_log_op(char *file, int line, gpr_log_severity severity,
grpc_call_element *elem, grpc_transport_stream_op *op);
grpc_call_element *elem, grpc_transport_stream_op_batch *op);
void grpc_call_element_signal_error(grpc_exec_ctx *exec_ctx,
grpc_call_element *cur_elem,

@ -62,7 +62,7 @@ typedef struct call_data {
/** If true, contents of \a compression_algorithm are authoritative */
int has_compression_algorithm;
grpc_transport_stream_op *send_op;
grpc_transport_stream_op_batch *send_op;
uint32_t send_length;
uint32_t send_flags;
grpc_slice incoming_slice;
@ -243,19 +243,19 @@ static void continue_send_message(grpc_exec_ctx *exec_ctx,
}
}
static void compress_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
static void compress_start_transport_stream_op_batch(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_transport_stream_op *op) {
grpc_transport_stream_op_batch *op) {
call_data *calld = elem->call_data;
GPR_TIMER_BEGIN("compress_start_transport_stream_op", 0);
GPR_TIMER_BEGIN("compress_start_transport_stream_op_batch", 0);
if (op->send_initial_metadata) {
grpc_error *error = process_send_initial_metadata(
exec_ctx, elem,
op->payload->send_initial_metadata.send_initial_metadata);
if (error != GRPC_ERROR_NONE) {
grpc_transport_stream_op_finish_with_failure(exec_ctx, op, error);
grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error);
return;
}
}
@ -270,7 +270,7 @@ static void compress_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
grpc_call_next_op(exec_ctx, elem, op);
}
GPR_TIMER_END("compress_start_transport_stream_op", 0);
GPR_TIMER_END("compress_start_transport_stream_op_batch", 0);
}
/* Constructor for call_data */
@ -339,7 +339,7 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem) {}
const grpc_channel_filter grpc_compress_filter = {
compress_start_transport_stream_op,
compress_start_transport_stream_op_batch,
grpc_channel_next_op,
sizeof(call_data),
init_call_elem,

@ -62,9 +62,9 @@ typedef struct connected_channel_call_data { void *unused; } call_data;
/* Intercept a call operation and either push it directly up or translate it
into transport stream operations */
static void con_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
static void con_start_transport_stream_op_batch(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_transport_stream_op *op) {
grpc_transport_stream_op_batch *op) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
@ -142,7 +142,7 @@ static void con_get_channel_info(grpc_exec_ctx *exec_ctx,
const grpc_channel_info *channel_info) {}
const grpc_channel_filter grpc_connected_filter = {
con_start_transport_stream_op,
con_start_transport_stream_op_batch,
con_start_transport_op,
sizeof(call_data),
init_call_elem,

@ -134,7 +134,7 @@ static void on_complete(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
// Inject our own on_complete callback into op.
static void inject_on_complete_cb(grpc_deadline_state* deadline_state,
grpc_transport_stream_op* op) {
grpc_transport_stream_op_batch* op) {
deadline_state->next_on_complete = op->on_complete;
grpc_closure_init(&deadline_state->on_complete, on_complete, deadline_state,
grpc_schedule_on_exec_ctx);
@ -196,9 +196,9 @@ void grpc_deadline_state_reset(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
start_timer_if_needed(exec_ctx, elem, new_deadline);
}
void grpc_deadline_state_client_start_transport_stream_op(
void grpc_deadline_state_client_start_transport_stream_op_batch(
grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
grpc_transport_stream_op* op) {
grpc_transport_stream_op_batch* op) {
grpc_deadline_state* deadline_state = elem->call_data;
if (op->cancel_stream) {
cancel_timer_if_needed(exec_ctx, deadline_state);
@ -261,10 +261,10 @@ static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
}
// Method for starting a call op for client filter.
static void client_start_transport_stream_op(grpc_exec_ctx* exec_ctx,
static void client_start_transport_stream_op_batch(grpc_exec_ctx* exec_ctx,
grpc_call_element* elem,
grpc_transport_stream_op* op) {
grpc_deadline_state_client_start_transport_stream_op(exec_ctx, elem, op);
grpc_transport_stream_op_batch* op) {
grpc_deadline_state_client_start_transport_stream_op_batch(exec_ctx, elem, op);
// Chain to next filter.
grpc_call_next_op(exec_ctx, elem, op);
}
@ -282,9 +282,9 @@ static void recv_initial_metadata_ready(grpc_exec_ctx* exec_ctx, void* arg,
}
// Method for starting a call op for server filter.
static void server_start_transport_stream_op(grpc_exec_ctx* exec_ctx,
static void server_start_transport_stream_op_batch(grpc_exec_ctx* exec_ctx,
grpc_call_element* elem,
grpc_transport_stream_op* op) {
grpc_transport_stream_op_batch* op) {
server_call_data* calld = elem->call_data;
if (op->cancel_stream) {
cancel_timer_if_needed(exec_ctx, &calld->base.deadline_state);
@ -317,7 +317,7 @@ static void server_start_transport_stream_op(grpc_exec_ctx* exec_ctx,
}
const grpc_channel_filter grpc_client_deadline_filter = {
client_start_transport_stream_op,
client_start_transport_stream_op_batch,
grpc_channel_next_op,
sizeof(base_call_data),
init_call_elem,
@ -332,7 +332,7 @@ const grpc_channel_filter grpc_client_deadline_filter = {
};
const grpc_channel_filter grpc_server_deadline_filter = {
server_start_transport_stream_op,
server_start_transport_stream_op_batch,
grpc_channel_next_op,
sizeof(server_call_data),
init_call_elem,

@ -83,15 +83,15 @@ void grpc_deadline_state_start(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
void grpc_deadline_state_reset(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
gpr_timespec new_deadline);
// To be called from the client-side filter's start_transport_stream_op()
// To be called from the client-side filter's start_transport_stream_op_batch()
// method. Ensures that the deadline timer is cancelled when the call
// is completed.
//
// Note: It is the caller's responsibility to chain to the next filter if
// necessary after this function returns.
void grpc_deadline_state_client_start_transport_stream_op(
void grpc_deadline_state_client_start_transport_stream_op_batch(
grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
grpc_transport_stream_op* op);
grpc_transport_stream_op_batch* op);
// Deadline filters for direct client channels and server channels.
// Note: Deadlines for non-direct client channels are handled by the

@ -63,7 +63,7 @@ typedef struct call_data {
uint8_t *payload_bytes;
/* Vars to read data off of send_message */
grpc_transport_stream_op *send_op;
grpc_transport_stream_op_batch *send_op;
uint32_t send_length;
uint32_t send_flags;
grpc_slice incoming_slice;
@ -254,7 +254,7 @@ static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) {
static grpc_error *hc_mutate_op(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_transport_stream_op *op) {
grpc_transport_stream_op_batch *op) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
@ -422,12 +422,12 @@ static grpc_error *hc_mutate_op(grpc_exec_ctx *exec_ctx,
static void hc_start_transport_op(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_transport_stream_op *op) {
grpc_transport_stream_op_batch *op) {
GPR_TIMER_BEGIN("hc_start_transport_op", 0);
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
grpc_error *error = hc_mutate_op(exec_ctx, elem, op);
if (error != GRPC_ERROR_NONE) {
grpc_transport_stream_op_finish_with_failure(exec_ctx, op, error);
grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error);
} else {
call_data *calld = elem->call_data;
if (op->send_message && calld->send_message_blocked) {

@ -318,7 +318,7 @@ static void hs_recv_message_ready(grpc_exec_ctx *exec_ctx, void *user_data,
}
static void hs_mutate_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_transport_stream_op *op) {
grpc_transport_stream_op_batch *op) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
@ -341,7 +341,7 @@ static void hs_mutate_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
exec_ctx, elem,
op->payload->send_initial_metadata.send_initial_metadata));
if (error != GRPC_ERROR_NONE) {
grpc_transport_stream_op_finish_with_failure(exec_ctx, op, error);
grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error);
return;
}
}
@ -377,7 +377,7 @@ static void hs_mutate_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
exec_ctx, elem,
op->payload->send_trailing_metadata.send_trailing_metadata);
if (error != GRPC_ERROR_NONE) {
grpc_transport_stream_op_finish_with_failure(exec_ctx, op, error);
grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error);
return;
}
}
@ -385,7 +385,7 @@ static void hs_mutate_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
static void hs_start_transport_op(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_transport_stream_op *op) {
grpc_transport_stream_op_batch *op) {
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
GPR_TIMER_BEGIN("hs_start_transport_op", 0);
hs_mutate_op(exec_ctx, elem, op);

@ -136,9 +136,9 @@ static void recv_message_ready(grpc_exec_ctx* exec_ctx, void* user_data,
}
// Start transport stream op.
static void start_transport_stream_op(grpc_exec_ctx* exec_ctx,
static void start_transport_stream_op_batch(grpc_exec_ctx* exec_ctx,
grpc_call_element* elem,
grpc_transport_stream_op* op) {
grpc_transport_stream_op_batch* op) {
call_data* calld = elem->call_data;
// Check max send message size.
if (op->send_message && calld->max_send_size >= 0 &&
@ -148,7 +148,7 @@ static void start_transport_stream_op(grpc_exec_ctx* exec_ctx,
gpr_asprintf(&message_string, "Sent message larger than max (%u vs. %d)",
op->payload->send_message.send_message->length,
calld->max_send_size);
grpc_transport_stream_op_finish_with_failure(
grpc_transport_stream_op_batch_finish_with_failure(
exec_ctx, op,
grpc_error_set_int(GRPC_ERROR_CREATE_FROM_COPIED_STRING(message_string),
GRPC_ERROR_INT_GRPC_STATUS,
@ -256,7 +256,7 @@ static void destroy_channel_elem(grpc_exec_ctx* exec_ctx,
}
const grpc_channel_filter grpc_message_size_filter = {
start_transport_stream_op,
start_transport_stream_op_batch,
grpc_channel_next_op,
sizeof(call_data),
init_call_elem,

@ -64,7 +64,7 @@ typedef struct {
pollset_set so that work can progress when this call wants work to progress
*/
grpc_polling_entity *pollent;
grpc_transport_stream_op op;
grpc_transport_stream_op_batch op;
uint8_t security_context_set;
grpc_linked_mdelem md_links[MAX_CREDENTIALS_METADATA_COUNT];
grpc_auth_metadata_context auth_md_context;
@ -108,7 +108,7 @@ static void on_credentials_metadata(grpc_exec_ctx *exec_ctx, void *user_data,
const char *error_details) {
grpc_call_element *elem = (grpc_call_element *)user_data;
call_data *calld = elem->call_data;
grpc_transport_stream_op *op = &calld->op;
grpc_transport_stream_op_batch *op = &calld->op;
grpc_metadata_batch *mdb;
size_t i;
reset_auth_metadata_context(&calld->auth_md_context);
@ -136,7 +136,7 @@ static void on_credentials_metadata(grpc_exec_ctx *exec_ctx, void *user_data,
if (error == GRPC_ERROR_NONE) {
grpc_call_next_op(exec_ctx, elem, op);
} else {
grpc_transport_stream_op_finish_with_failure(exec_ctx, op, error);
grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error);
}
}
@ -172,7 +172,7 @@ void build_auth_metadata_context(grpc_security_connector *sc,
static void send_security_metadata(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_transport_stream_op *op) {
grpc_transport_stream_op_batch *op) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
grpc_client_security_context *ctx =
@ -193,7 +193,7 @@ static void send_security_metadata(grpc_exec_ctx *exec_ctx,
calld->creds = grpc_composite_call_credentials_create(channel_call_creds,
ctx->creds, NULL);
if (calld->creds == NULL) {
grpc_transport_stream_op_finish_with_failure(
grpc_transport_stream_op_batch_finish_with_failure(
exec_ctx, op,
grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_STATIC_STRING(
@ -244,7 +244,7 @@ static void on_host_checked(grpc_exec_ctx *exec_ctx, void *user_data,
that is being sent or received. */
static void auth_start_transport_op(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_transport_stream_op *op) {
grpc_transport_stream_op_batch *op) {
GPR_TIMER_BEGIN("auth_start_transport_op", 0);
/* grab pointers to our data from the call element */

@ -49,7 +49,7 @@ typedef struct call_data {
up-call on transport_op, and remember to call our on_done_recv member after
handling it. */
grpc_closure auth_on_recv;
grpc_transport_stream_op *transport_op;
grpc_transport_stream_op_batch *transport_op;
grpc_metadata_array md;
const grpc_metadata *consumed_md;
size_t num_consumed_md;
@ -172,7 +172,7 @@ static void auth_on_recv(grpc_exec_ctx *exec_ctx, void *user_data,
}
static void set_recv_ops_md_callbacks(grpc_call_element *elem,
grpc_transport_stream_op *op) {
grpc_transport_stream_op_batch *op) {
call_data *calld = elem->call_data;
if (op->recv_initial_metadata) {
@ -194,7 +194,7 @@ static void set_recv_ops_md_callbacks(grpc_call_element *elem,
that is being sent or received. */
static void auth_start_transport_op(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_transport_stream_op *op) {
grpc_transport_stream_op_batch *op) {
set_recv_ops_md_callbacks(elem, op);
grpc_call_next_op(exec_ctx, elem, op);
}

@ -138,7 +138,7 @@ typedef struct batch_control {
grpc_error *errors[MAX_ERRORS_PER_BATCH];
gpr_atm num_errors;
grpc_transport_stream_op op;
grpc_transport_stream_op_batch op;
} batch_control;
struct grpc_call {
@ -172,7 +172,7 @@ struct grpc_call {
bool has_initial_md_been_received;
batch_control active_batches[MAX_CONCURRENT_BATCHES];
grpc_transport_stream_op_payload stream_op_payload;
grpc_transport_stream_op_batch_payload stream_op_payload;
/* first idx: is_receiving, second idx: is_trailing */
grpc_metadata_batch metadata_batch[2][2];
@ -243,7 +243,7 @@ int grpc_call_error_trace = 0;
CALL_FROM_CALL_STACK(grpc_call_stack_from_top_element(top_elem))
static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call,
grpc_transport_stream_op *op);
grpc_transport_stream_op_batch *op);
static void cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c,
status_source source, grpc_status_code status,
const char *description);
@ -540,12 +540,12 @@ grpc_call_error grpc_call_cancel(grpc_call *call, void *reserved) {
}
static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call,
grpc_transport_stream_op *op) {
grpc_transport_stream_op_batch *op) {
grpc_call_element *elem;
GPR_TIMER_BEGIN("execute_op", 0);
elem = CALL_ELEM_FROM_CALL(call, 0);
elem->filter->start_transport_stream_op(exec_ctx, elem, op);
elem->filter->start_transport_stream_op_batch(exec_ctx, elem, op);
GPR_TIMER_END("execute_op", 0);
}
@ -598,7 +598,7 @@ static void cancel_with_error(grpc_exec_ctx *exec_ctx, grpc_call *c,
status_source source, grpc_error *error) {
GRPC_CALL_INTERNAL_REF(c, "termination");
set_status_from_error(exec_ctx, c, source, GRPC_ERROR_REF(error));
grpc_transport_stream_op *op = grpc_make_transport_stream_op(
grpc_transport_stream_op_batch *op = grpc_make_transport_stream_op(
grpc_closure_create(done_termination, c, grpc_schedule_on_exec_ctx));
op->cancel_stream = true;
op->payload->cancel_stream.cancel_error = error;
@ -1381,8 +1381,8 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
bctl->completion_data.notify_tag.is_closure =
(uint8_t)(is_notify_tag_closure != 0);
grpc_transport_stream_op *stream_op = &bctl->op;
grpc_transport_stream_op_payload *stream_op_payload =
grpc_transport_stream_op_batch *stream_op = &bctl->op;
grpc_transport_stream_op_batch_payload *stream_op_payload =
&call->stream_op_payload;
stream_op->covered_by_poller = true;

@ -80,9 +80,9 @@ static void fill_metadata(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
mdb->deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
}
static void lame_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
static void lame_start_transport_stream_op_batch(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_transport_stream_op *op) {
grpc_transport_stream_op_batch *op) {
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
if (op->recv_initial_metadata) {
fill_metadata(exec_ctx, elem,
@ -91,7 +91,7 @@ static void lame_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
fill_metadata(exec_ctx, elem,
op->payload->recv_trailing_metadata.recv_trailing_metadata);
}
grpc_transport_stream_op_finish_with_failure(
grpc_transport_stream_op_batch_finish_with_failure(
exec_ctx, op,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("lame client channel"));
}
@ -150,7 +150,7 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem) {}
const grpc_channel_filter grpc_lame_filter = {
lame_start_transport_stream_op,
lame_start_transport_stream_op_batch,
lame_start_transport_op,
sizeof(call_data),
init_call_elem,

@ -776,7 +776,7 @@ static void server_on_recv_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr,
}
static void server_mutate_op(grpc_call_element *elem,
grpc_transport_stream_op *op) {
grpc_transport_stream_op_batch *op) {
call_data *calld = elem->call_data;
if (op->recv_initial_metadata) {
@ -792,9 +792,9 @@ static void server_mutate_op(grpc_call_element *elem,
}
}
static void server_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_transport_stream_op *op) {
static void server_start_transport_stream_op_batch(
grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_transport_stream_op_batch *op) {
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
server_mutate_op(elem, op);
grpc_call_next_op(exec_ctx, elem, op);
@ -958,7 +958,7 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
}
const grpc_channel_filter grpc_server_top_filter = {
server_start_transport_stream_op,
server_start_transport_stream_op_batch,
grpc_channel_next_op,
sizeof(call_data),
init_call_elem,

@ -170,7 +170,7 @@ int grpc_transport_init_stream(grpc_exec_ctx *exec_ctx,
void grpc_transport_perform_stream_op(grpc_exec_ctx *exec_ctx,
grpc_transport *transport,
grpc_stream *stream,
grpc_transport_stream_op *op) {
grpc_transport_stream_op_batch *op) {
transport->vtable->perform_stream_op(exec_ctx, transport, stream, op);
}
@ -213,8 +213,8 @@ grpc_endpoint *grpc_transport_get_endpoint(grpc_exec_ctx *exec_ctx,
return transport->vtable->get_endpoint(exec_ctx, transport);
}
void grpc_transport_stream_op_finish_with_failure(grpc_exec_ctx *exec_ctx,
grpc_transport_stream_op *op,
void grpc_transport_stream_op_batch_finish_with_failure(grpc_exec_ctx *exec_ctx,
grpc_transport_stream_op_batch *op,
grpc_error *error) {
if (op->recv_message) {
grpc_closure_sched(exec_ctx, op->payload->recv_message.recv_message_ready,
@ -258,8 +258,8 @@ grpc_transport_op *grpc_make_transport_op(grpc_closure *on_complete) {
typedef struct {
grpc_closure outer_on_complete;
grpc_closure *inner_on_complete;
grpc_transport_stream_op op;
grpc_transport_stream_op_payload payload;
grpc_transport_stream_op_batch op;
grpc_transport_stream_op_batch_payload payload;
} made_transport_stream_op;
static void destroy_made_transport_stream_op(grpc_exec_ctx *exec_ctx, void *arg,
@ -270,7 +270,7 @@ static void destroy_made_transport_stream_op(grpc_exec_ctx *exec_ctx, void *arg,
grpc_closure_run(exec_ctx, c, GRPC_ERROR_REF(error));
}
grpc_transport_stream_op *grpc_make_transport_stream_op(
grpc_transport_stream_op_batch *grpc_make_transport_stream_op(
grpc_closure *on_complete) {
made_transport_stream_op *op = gpr_zalloc(sizeof(*op));
op->op.payload = &op->payload;

@ -113,19 +113,19 @@ typedef struct {
grpc_closure closure;
} grpc_handler_private_op_data;
typedef struct grpc_transport_stream_op_payload
grpc_transport_stream_op_payload;
typedef struct grpc_transport_stream_op_batch_payload
grpc_transport_stream_op_batch_payload;
/* Transport stream op: a set of operations to perform on a transport
against a single stream */
typedef struct grpc_transport_stream_op {
typedef struct grpc_transport_stream_op_batch {
/** Should be enqueued when all requested operations (excluding recv_message
and recv_initial_metadata which have their own closures) in a given batch
have been completed. */
grpc_closure *on_complete;
/** Values for the stream op (fields set are determined by flags above) */
grpc_transport_stream_op_payload *payload;
grpc_transport_stream_op_batch_payload *payload;
/** Is the completion of this op covered by a poller (if false: the op should
complete independently of some pollset being polled) */
@ -161,9 +161,9 @@ typedef struct grpc_transport_stream_op {
* current handler of the op */
grpc_handler_private_op_data handler_private;
} grpc_transport_stream_op;
} grpc_transport_stream_op_batch;
struct grpc_transport_stream_op_payload {
struct grpc_transport_stream_op_batch_payload {
struct {
grpc_metadata_batch *send_initial_metadata;
/** Iff send_initial_metadata != NULL, flags associated with
@ -289,11 +289,11 @@ void grpc_transport_destroy_stream(grpc_exec_ctx *exec_ctx,
grpc_stream *stream,
grpc_closure *then_schedule_closure);
void grpc_transport_stream_op_finish_with_failure(grpc_exec_ctx *exec_ctx,
grpc_transport_stream_op *op,
grpc_error *error);
void grpc_transport_stream_op_batch_finish_with_failure(
grpc_exec_ctx *exec_ctx, grpc_transport_stream_op_batch *op,
grpc_error *error);
char *grpc_transport_stream_op_string(grpc_transport_stream_op *op);
char *grpc_transport_stream_op_batch_string(grpc_transport_stream_op_batch *op);
char *grpc_transport_op_string(grpc_transport_op *op);
/* Send a batch of operations on a transport
@ -304,11 +304,12 @@ char *grpc_transport_op_string(grpc_transport_op *op);
transport - the transport on which to initiate the stream
stream - the stream on which to send the operations. This must be
non-NULL and previously initialized by the same transport.
op - a grpc_transport_stream_op specifying the op to perform */
op - a grpc_transport_stream_op_batch specifying the op to perform
*/
void grpc_transport_perform_stream_op(grpc_exec_ctx *exec_ctx,
grpc_transport *transport,
grpc_stream *stream,
grpc_transport_stream_op *op);
grpc_transport_stream_op_batch *op);
void grpc_transport_perform_op(grpc_exec_ctx *exec_ctx,
grpc_transport *transport,
@ -340,9 +341,10 @@ grpc_endpoint *grpc_transport_get_endpoint(grpc_exec_ctx *exec_ctx,
/* Allocate a grpc_transport_op, and preconfigure the on_consumed closure to
\a on_consumed and then delete the returned transport op */
grpc_transport_op *grpc_make_transport_op(grpc_closure *on_consumed);
/* Allocate a grpc_transport_stream_op, and preconfigure the on_consumed closure
/* Allocate a grpc_transport_stream_op_batch, and preconfigure the on_consumed
closure
to \a on_consumed and then delete the returned transport op */
grpc_transport_stream_op *grpc_make_transport_stream_op(
grpc_transport_stream_op_batch *grpc_make_transport_stream_op(
grpc_closure *on_consumed);
#ifdef __cplusplus

@ -59,7 +59,7 @@ typedef struct grpc_transport_vtable {
/* implementation of grpc_transport_perform_stream_op */
void (*perform_stream_op)(grpc_exec_ctx *exec_ctx, grpc_transport *self,
grpc_stream *stream, grpc_transport_stream_op *op);
grpc_stream *stream, grpc_transport_stream_op_batch *op);
/* implementation of grpc_transport_perform_op */
void (*perform_op)(grpc_exec_ctx *exec_ctx, grpc_transport *self,

@ -71,7 +71,7 @@ static void put_metadata_list(gpr_strvec *b, grpc_metadata_batch md) {
}
}
char *grpc_transport_stream_op_string(grpc_transport_stream_op *op) {
char *grpc_transport_stream_op_batch_string(grpc_transport_stream_op_batch *op) {
char *tmp;
char *out;
@ -208,8 +208,8 @@ char *grpc_transport_op_string(grpc_transport_op *op) {
}
void grpc_call_log_op(char *file, int line, gpr_log_severity severity,
grpc_call_element *elem, grpc_transport_stream_op *op) {
char *str = grpc_transport_stream_op_string(op);
grpc_call_element *elem, grpc_transport_stream_op_batch *op) {
char *str = grpc_transport_stream_op_batch_string(op);
gpr_log(file, line, severity, "OP[%s:%p]: %s", elem->filter->name, elem, str);
gpr_free(str);
}

@ -141,13 +141,13 @@ class TransportOp {
grpc_transport_op *op_; // Not owned.
};
/// A C++ wrapper for the \c grpc_transport_stream_op struct.
/// A C++ wrapper for the \c grpc_transport_stream_op_batch struct.
class TransportStreamOp {
public:
/// Borrows a pointer to \a op, but does NOT take ownership.
/// The caller must ensure that \a op continues to exist for as
/// long as the TransportStreamOp object does.
explicit TransportStreamOp(grpc_transport_stream_op *op)
explicit TransportStreamOp(grpc_transport_stream_op_batch *op)
: op_(op),
send_initial_metadata_(
op->send_initial_metadata
@ -166,7 +166,7 @@ class TransportStreamOp {
? op->payload->recv_trailing_metadata.recv_trailing_metadata
: nullptr) {}
grpc_transport_stream_op *op() const { return op_; }
grpc_transport_stream_op_batch *op() const { return op_; }
grpc_closure *on_complete() const { return op_->on_complete; }
void set_on_complete(grpc_closure *closure) { op_->on_complete = closure; }
@ -226,7 +226,7 @@ class TransportStreamOp {
}
private:
grpc_transport_stream_op *op_; // Not owned.
grpc_transport_stream_op_batch *op_; // Not owned.
MetadataBatch send_initial_metadata_;
MetadataBatch send_trailing_metadata_;
MetadataBatch recv_initial_metadata_;
@ -344,7 +344,7 @@ class ChannelFilter final {
static void StartTransportStreamOp(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_transport_stream_op *op) {
grpc_transport_stream_op_batch *op) {
CallDataType *call_data = (CallDataType *)elem->call_data;
TransportStreamOp op_wrapper(op);
call_data->StartTransportStreamOp(exec_ctx, elem, &op_wrapper);

@ -73,7 +73,7 @@ static void call_destroy_func(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
}
static void call_func(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_transport_stream_op *op) {
grpc_transport_stream_op_batch *op) {
++*(int *)(elem->call_data);
}

@ -216,9 +216,9 @@ static void recv_im_ready(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_STATUS_PERMISSION_DENIED));
}
static void start_transport_stream_op(grpc_exec_ctx *exec_ctx,
static void start_transport_stream_op_batch(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_transport_stream_op *op) {
grpc_transport_stream_op_batch *op) {
call_data *calld = elem->call_data;
if (op->recv_initial_metadata) {
calld->recv_im_ready =
@ -249,7 +249,7 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem) {}
static const grpc_channel_filter test_filter = {
start_transport_stream_op,
start_transport_stream_op_batch,
grpc_channel_next_op,
sizeof(call_data),
init_call_elem,

@ -221,7 +221,7 @@ namespace dummy_filter {
static void StartTransportStreamOp(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_transport_stream_op *op) {}
grpc_transport_stream_op_batch *op) {}
static void StartTransportOp(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem,
@ -296,7 +296,7 @@ void SetPollsetSet(grpc_exec_ctx *exec_ctx, grpc_transport *self,
/* implementation of grpc_transport_perform_stream_op */
void PerformStreamOp(grpc_exec_ctx *exec_ctx, grpc_transport *self,
grpc_stream *stream, grpc_transport_stream_op *op) {
grpc_stream *stream, grpc_transport_stream_op_batch *op) {
grpc_closure_sched(exec_ctx, op->on_complete, GRPC_ERROR_NONE);
}
@ -368,8 +368,8 @@ class SendEmptyMetadata {
const gpr_timespec deadline_ = gpr_inf_future(GPR_CLOCK_MONOTONIC);
const gpr_timespec start_time_ = gpr_now(GPR_CLOCK_MONOTONIC);
const grpc_slice method_ = grpc_slice_from_static_string("/foo/bar");
grpc_transport_stream_op op_;
grpc_transport_stream_op_payload op_payload_;
grpc_transport_stream_op_batch op_;
grpc_transport_stream_op_batch_payload op_payload_;
grpc_closure closure_;
};
@ -491,7 +491,7 @@ namespace isolated_call_filter {
static void StartTransportStreamOp(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_transport_stream_op *op) {
grpc_transport_stream_op_batch *op) {
if (op->recv_initial_metadata) {
grpc_closure_sched(
exec_ctx,

@ -207,7 +207,7 @@ class Stream {
static_cast<grpc_stream *>(stream_), closure);
}
void Op(grpc_transport_stream_op *op) {
void Op(grpc_transport_stream_op_batch *op) {
grpc_transport_perform_stream_op(f_->exec_ctx(), f_->transport(),
static_cast<grpc_stream *>(stream_), op);
}
@ -305,8 +305,8 @@ static void BM_StreamCreateSendInitialMetadataDestroy(benchmark::State &state) {
TrackCounters track_counters;
Fixture f(grpc::ChannelArguments(), true);
Stream s(&f);
grpc_transport_stream_op op;
grpc_transport_stream_op_payload op_payload;
grpc_transport_stream_op_batch op;
grpc_transport_stream_op_batch_payload op_payload;
std::unique_ptr<Closure> start;
std::unique_ptr<Closure> done;
@ -356,8 +356,8 @@ static void BM_TransportEmptyOp(benchmark::State &state) {
Fixture f(grpc::ChannelArguments(), true);
Stream s(&f);
s.Init(state);
grpc_transport_stream_op op;
grpc_transport_stream_op_payload op_payload;
grpc_transport_stream_op_batch op;
grpc_transport_stream_op_batch_payload op_payload;
auto reset_op = [&]() {
memset(&op, 0, sizeof(op));
op.payload = &op_payload;
@ -383,8 +383,8 @@ static void BM_TransportStreamSend(benchmark::State &state) {
Fixture f(grpc::ChannelArguments(), true);
Stream s(&f);
s.Init(state);
grpc_transport_stream_op op;
grpc_transport_stream_op_payload op_payload;
grpc_transport_stream_op_batch op;
grpc_transport_stream_op_batch_payload op_payload;
auto reset_op = [&]() {
memset(&op, 0, sizeof(op));
op.payload = &op_payload;
@ -504,8 +504,8 @@ static void BM_TransportStreamRecv(benchmark::State &state) {
Fixture f(grpc::ChannelArguments(), true);
Stream s(&f);
s.Init(state);
grpc_transport_stream_op_payload op_payload;
grpc_transport_stream_op op;
grpc_transport_stream_op_batch_payload op_payload;
grpc_transport_stream_op_batch op;
grpc_byte_stream *recv_stream;
grpc_slice incoming_data = CreateIncomingDataSlice(state.range(0), 16384);

Loading…
Cancel
Save