s/grpc_transport_op/grpc_transport_stream_op/g

pull/2303/head
Craig Tiller 10 years ago
parent f5f1712e1f
commit b7959a0b36
  1. 14
      src/core/channel/census_filter.c
  2. 6
      src/core/channel/channel_stack.c
  3. 11
      src/core/channel/channel_stack.h
  4. 16
      src/core/channel/client_channel.c
  5. 4
      src/core/channel/connected_channel.c
  6. 7
      src/core/channel/http_client_filter.c
  7. 7
      src/core/channel/http_server_filter.c
  8. 7
      src/core/channel/noop_filter.c
  9. 7
      src/core/client_config/subchannel.h
  10. 12
      src/core/security/client_auth_filter.c
  11. 4
      src/core/security/server_auth_filter.c
  12. 14
      src/core/surface/call.c
  13. 4
      src/core/surface/client.c
  14. 6
      src/core/surface/lame_client.c
  15. 7
      src/core/surface/server.c
  16. 13
      src/core/transport/chttp2_transport.c
  17. 13
      src/core/transport/transport.c
  18. 20
      src/core/transport/transport.h
  19. 5
      src/core/transport/transport_impl.h
  20. 6
      src/core/transport/transport_op_string.c
  21. 4
      test/core/channel/channel_stack_test.c

@ -84,7 +84,8 @@ static void extract_and_annotate_method_tag(grpc_stream_op_buffer* sopb,
}
}
static void client_mutate_op(grpc_call_element* elem, grpc_transport_op* op) {
static void client_mutate_op(grpc_call_element* elem,
grpc_transport_stream_op* op) {
call_data* calld = elem->call_data;
channel_data* chand = elem->channel_data;
if (op->send_ops) {
@ -93,7 +94,7 @@ static void client_mutate_op(grpc_call_element* elem, grpc_transport_op* op) {
}
static void client_start_transport_op(grpc_call_element* elem,
grpc_transport_op* op) {
grpc_transport_stream_op* op) {
call_data* calld = elem->call_data;
GPR_ASSERT((calld->op_id.upper != 0) || (calld->op_id.lower != 0));
client_mutate_op(elem, op);
@ -110,7 +111,8 @@ static void server_on_done_recv(void* ptr, int success) {
calld->on_done_recv(calld->recv_user_data, success);
}
static void server_mutate_op(grpc_call_element* elem, grpc_transport_op* op) {
static void server_mutate_op(grpc_call_element* elem,
grpc_transport_stream_op* op) {
call_data* calld = elem->call_data;
if (op->recv_ops) {
/* substitute our callback for the op callback */
@ -123,7 +125,7 @@ static void server_mutate_op(grpc_call_element* elem, grpc_transport_op* op) {
}
static void server_start_transport_op(grpc_call_element* elem,
grpc_transport_op* op) {
grpc_transport_stream_op* op) {
call_data* calld = elem->call_data;
GPR_ASSERT((calld->op_id.upper != 0) || (calld->op_id.lower != 0));
server_mutate_op(elem, op);
@ -145,7 +147,7 @@ static void channel_op(grpc_channel_element* elem,
static void client_init_call_elem(grpc_call_element* elem,
const void* server_transport_data,
grpc_transport_op* initial_op) {
grpc_transport_stream_op* initial_op) {
call_data* d = elem->call_data;
GPR_ASSERT(d != NULL);
init_rpc_stats(&d->stats);
@ -163,7 +165,7 @@ static void client_destroy_call_elem(grpc_call_element* elem) {
static void server_init_call_elem(grpc_call_element* elem,
const void* server_transport_data,
grpc_transport_op* initial_op) {
grpc_transport_stream_op* initial_op) {
call_data* d = elem->call_data;
GPR_ASSERT(d != NULL);
init_rpc_stats(&d->stats);

@ -148,7 +148,7 @@ void grpc_channel_stack_destroy(grpc_channel_stack *stack) {
void grpc_call_stack_init(grpc_channel_stack *channel_stack,
const void *transport_server_data,
grpc_transport_op *initial_op,
grpc_transport_stream_op *initial_op,
grpc_call_stack *call_stack) {
grpc_channel_element *channel_elems = CHANNEL_ELEMS_FROM_STACK(channel_stack);
size_t count = channel_stack->count;
@ -184,7 +184,7 @@ void grpc_call_stack_destroy(grpc_call_stack *stack) {
}
}
void grpc_call_next_op(grpc_call_element *elem, grpc_transport_op *op) {
void grpc_call_next_op(grpc_call_element *elem, grpc_transport_stream_op *op) {
grpc_call_element *next_elem = elem + 1;
next_elem->filter->start_transport_op(next_elem, op);
}
@ -206,7 +206,7 @@ grpc_call_stack *grpc_call_stack_from_top_element(grpc_call_element *elem) {
}
void grpc_call_element_send_cancel(grpc_call_element *cur_elem) {
grpc_transport_op op;
grpc_transport_stream_op op;
memset(&op, 0, sizeof(op));
op.cancel_with_status = GRPC_STATUS_CANCELLED;
grpc_call_next_op(cur_elem, &op);

@ -103,7 +103,8 @@ typedef struct {
typedef struct {
/* Called to eg. send/receive data on a call.
See grpc_call_next_op on how to call the next element in the stack */
void (*start_transport_op)(grpc_call_element *elem, grpc_transport_op *op);
void (*start_transport_op)(grpc_call_element *elem,
grpc_transport_stream_op *op);
/* Called to handle channel level operations - e.g. new calls, or transport
closure.
See grpc_channel_next_op on how to call the next element in the stack */
@ -122,7 +123,7 @@ typedef struct {
argument.*/
void (*init_call_elem)(grpc_call_element *elem,
const void *server_transport_data,
grpc_transport_op *initial_op);
grpc_transport_stream_op *initial_op);
/* Destroy per call data.
The filter does not need to do any chaining */
void (*destroy_call_elem)(grpc_call_element *elem);
@ -201,13 +202,13 @@ void grpc_channel_stack_destroy(grpc_channel_stack *stack);
server. */
void grpc_call_stack_init(grpc_channel_stack *channel_stack,
const void *transport_server_data,
grpc_transport_op *initial_op,
grpc_transport_stream_op *initial_op,
grpc_call_stack *call_stack);
/* Destroy a call stack */
void grpc_call_stack_destroy(grpc_call_stack *stack);
/* Call the next operation in a call stack */
void grpc_call_next_op(grpc_call_element *elem, grpc_transport_op *op);
void grpc_call_next_op(grpc_call_element *elem, grpc_transport_stream_op *op);
/* Call the next operation (depending on call directionality) in a channel
stack */
void grpc_channel_next_op(grpc_channel_element *elem, grpc_channel_op *op);
@ -219,7 +220,7 @@ grpc_channel_stack *grpc_channel_stack_from_top_element(
grpc_call_stack *grpc_call_stack_from_top_element(grpc_call_element *elem);
void grpc_call_log_op(char *file, int line, gpr_log_severity severity,
grpc_call_element *elem, grpc_transport_op *op);
grpc_call_element *elem, grpc_transport_stream_op *op);
void grpc_call_element_send_cancel(grpc_call_element *cur_elem);

@ -91,7 +91,7 @@ struct call_data {
/* our child call stack */
grpc_subchannel_call *subchannel_call;
} active;
grpc_transport_op waiting_op;
grpc_transport_stream_op waiting_op;
struct {
grpc_linked_mdelem status;
grpc_linked_mdelem details;
@ -124,7 +124,7 @@ static int prepare_activate(grpc_call_element *elem,
return 1;
}
static void complete_activate(grpc_call_element *elem, grpc_transport_op *op) {
static void complete_activate(grpc_call_element *elem, grpc_transport_stream_op *op) {
call_data *calld = elem->call_data;
grpc_call_element *child_elem =
grpc_child_call_get_top_element(calld->s.active.child_call);
@ -154,7 +154,7 @@ static void remove_waiting_child(channel_data *chand, call_data *calld) {
#endif
static void handle_op_after_cancellation(grpc_call_element *elem,
grpc_transport_op *op) {
grpc_transport_stream_op *op) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
if (op->send_ops) {
@ -194,12 +194,12 @@ static void pick_target(grpc_lb_policy *lb_policy, call_data *calld) {
}
static void cc_start_transport_op(grpc_call_element *elem,
grpc_transport_op *op) {
grpc_transport_stream_op *op) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
grpc_subchannel_call *subchannel_call;
grpc_lb_policy *lb_policy;
grpc_transport_op waiting_op;
grpc_transport_stream_op waiting_op;
GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
@ -457,7 +457,7 @@ static void channel_op(grpc_channel_element *elem,
/* Constructor for call_data */
static void init_call_elem(grpc_call_element *elem,
const void *server_transport_data,
grpc_transport_op *initial_op) {
grpc_transport_stream_op *initial_op) {
call_data *calld = elem->call_data;
/* TODO(ctiller): is there something useful we can do here? */
@ -555,7 +555,7 @@ grpc_transport_setup_result grpc_client_channel_transport_setup_complete(
call_data **waiting_children;
size_t waiting_child_count;
size_t i;
grpc_transport_op *call_ops;
grpc_transport_stream_op *call_ops;
/* build the child filter stack */
child_filters = gpr_malloc(sizeof(grpc_channel_filter *) * num_child_filters);
@ -597,7 +597,7 @@ grpc_transport_setup_result grpc_client_channel_transport_setup_complete(
call_ops[i] = waiting_children[i]->s.waiting_op;
if (!prepare_activate(waiting_children[i]->elem, chand->active_child)) {
waiting_children[i] = NULL;
grpc_transport_op_finish_with_failure(&call_ops[i]);
grpc_transport_stream_op_finish_with_failure(&call_ops[i]);
}
}

@ -62,7 +62,7 @@ typedef struct connected_channel_call_data { void *unused; } call_data;
/* Intercept a call operation and either push it directly up or translate it
into transport stream operations */
static void con_start_transport_op(grpc_call_element *elem,
grpc_transport_op *op) {
grpc_transport_stream_op *op) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
@ -96,7 +96,7 @@ static void channel_op(grpc_channel_element *elem,
/* Constructor for call_data */
static void init_call_elem(grpc_call_element *elem,
const void *server_transport_data,
grpc_transport_op *initial_op) {
grpc_transport_stream_op *initial_op) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
int r;

@ -87,7 +87,8 @@ static void hc_on_recv(void *user_data, int success) {
calld->on_done_recv(calld->recv_user_data, success);
}
static void hc_mutate_op(grpc_call_element *elem, grpc_transport_op *op) {
static void hc_mutate_op(grpc_call_element *elem,
grpc_transport_stream_op *op) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
@ -124,7 +125,7 @@ static void hc_mutate_op(grpc_call_element *elem, grpc_transport_op *op) {
}
static void hc_start_transport_op(grpc_call_element *elem,
grpc_transport_op *op) {
grpc_transport_stream_op *op) {
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
hc_mutate_op(elem, op);
grpc_call_next_op(elem, op);
@ -150,7 +151,7 @@ static void channel_op(grpc_channel_element *elem,
/* Constructor for call_data */
static void init_call_elem(grpc_call_element *elem,
const void *server_transport_data,
grpc_transport_op *initial_op) {
grpc_transport_stream_op *initial_op) {
call_data *calld = elem->call_data;
calld->sent_initial_metadata = 0;
calld->got_initial_metadata = 0;

@ -177,7 +177,8 @@ static void hs_on_recv(void *user_data, int success) {
calld->on_done_recv(calld->recv_user_data, success);
}
static void hs_mutate_op(grpc_call_element *elem, grpc_transport_op *op) {
static void hs_mutate_op(grpc_call_element *elem,
grpc_transport_stream_op *op) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
@ -207,7 +208,7 @@ static void hs_mutate_op(grpc_call_element *elem, grpc_transport_op *op) {
}
static void hs_start_transport_op(grpc_call_element *elem,
grpc_transport_op *op) {
grpc_transport_stream_op *op) {
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
hs_mutate_op(elem, op);
grpc_call_next_op(elem, op);
@ -233,7 +234,7 @@ static void channel_op(grpc_channel_element *elem,
/* Constructor for call_data */
static void init_call_elem(grpc_call_element *elem,
const void *server_transport_data,
grpc_transport_op *initial_op) {
grpc_transport_stream_op *initial_op) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
/* initialize members */

@ -45,7 +45,8 @@ typedef struct channel_data {
/* used to silence 'variable not used' warnings */
static void ignore_unused(void *ignored) {}
static void noop_mutate_op(grpc_call_element *elem, grpc_transport_op *op) {
static void noop_mutate_op(grpc_call_element *elem,
grpc_transport_stream_op *op) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
@ -62,7 +63,7 @@ static void noop_mutate_op(grpc_call_element *elem, grpc_transport_op *op) {
op contains type and call direction information, in addition to the data
that is being sent or received. */
static void noop_start_transport_op(grpc_call_element *elem,
grpc_transport_op *op) {
grpc_transport_stream_op *op) {
noop_mutate_op(elem, op);
/* pass control down the stack */
@ -89,7 +90,7 @@ static void channel_op(grpc_channel_element *elem,
/* Constructor for call_data */
static void init_call_elem(grpc_call_element *elem,
const void *server_transport_data,
grpc_transport_op *initial_op) {
grpc_transport_stream_op *initial_op) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;

@ -77,9 +77,12 @@ void grpc_subchannel_notify_on_state_change(grpc_subchannel *channel,
grpc_iomgr_closure *notify);
/** construct a call */
grpc_subchannel_call *grpc_subchannel_create_call(grpc_subchannel *subchannel, grpc_call_element *parent, grpc_transport_op *initial_op);
grpc_subchannel_call *grpc_subchannel_create_call(
grpc_subchannel *subchannel, grpc_call_element *parent,
grpc_transport_stream_op *initial_op);
/** continue processing a transport op */
void grpc_subchannel_call_process_op(grpc_subchannel_call *subchannel_call, grpc_transport_op *op);
void grpc_subchannel_call_process_op(grpc_subchannel_call *subchannel_call,
grpc_transport_stream_op *op);
#endif /* GRPC_INTERNAL_CORE_CLIENT_CONFIG_SUBCHANNEL_H */

@ -58,7 +58,7 @@ typedef struct {
so that work can progress when this call wants work to
progress */
grpc_pollset *pollset;
grpc_transport_op op;
grpc_transport_stream_op op;
size_t op_md_idx;
int sent_initial_metadata;
grpc_linked_mdelem md_links[MAX_CREDENTIALS_METADATA_COUNT];
@ -77,7 +77,7 @@ typedef struct {
static void bubble_up_error(grpc_call_element *elem, const char *error_msg) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
grpc_transport_op_add_cancellation(
grpc_transport_stream_op_add_cancellation(
&calld->op, GRPC_STATUS_UNAUTHENTICATED,
grpc_mdstr_from_string(chand->md_ctx, error_msg));
grpc_call_next_op(elem, &calld->op);
@ -90,7 +90,7 @@ static void on_credentials_metadata(void *user_data,
grpc_call_element *elem = (grpc_call_element *)user_data;
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
grpc_transport_op *op = &calld->op;
grpc_transport_stream_op *op = &calld->op;
grpc_metadata_batch *mdb;
size_t i;
if (status != GRPC_CREDENTIALS_OK) {
@ -131,7 +131,7 @@ static char *build_service_url(const char *url_scheme, call_data *calld) {
}
static void send_security_metadata(grpc_call_element *elem,
grpc_transport_op *op) {
grpc_transport_stream_op *op) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
grpc_client_security_context *ctx =
@ -193,7 +193,7 @@ static void on_host_checked(void *user_data, grpc_security_status status) {
op contains type and call direction information, in addition to the data
that is being sent or received. */
static void auth_start_transport_op(grpc_call_element *elem,
grpc_transport_op *op) {
grpc_transport_stream_op *op) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
@ -263,7 +263,7 @@ static void channel_op(grpc_channel_element *elem,
/* Constructor for call_data */
static void init_call_elem(grpc_call_element *elem,
const void *server_transport_data,
grpc_transport_op *initial_op) {
grpc_transport_stream_op *initial_op) {
call_data *calld = elem->call_data;
calld->creds = NULL;
calld->host = NULL;

@ -51,7 +51,7 @@ typedef struct channel_data {
op contains type and call direction information, in addition to the data
that is being sent or received. */
static void auth_start_transport_op(grpc_call_element *elem,
grpc_transport_op *op) {
grpc_transport_stream_op *op) {
/* TODO(jboeuf): Get the metadata and get a new context from it. */
/* pass control down the stack */
@ -68,7 +68,7 @@ static void channel_op(grpc_channel_element *elem,
/* Constructor for call_data */
static void init_call_elem(grpc_call_element *elem,
const void *server_transport_data,
grpc_transport_op *initial_op) {
grpc_transport_stream_op *initial_op) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;

@ -252,8 +252,8 @@ struct grpc_call {
static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline);
static void call_on_done_recv(void *call, int success);
static void call_on_done_send(void *call, int success);
static int fill_send_ops(grpc_call *call, grpc_transport_op *op);
static void execute_op(grpc_call *call, grpc_transport_op *op);
static int fill_send_ops(grpc_call *call, grpc_transport_stream_op *op);
static void execute_op(grpc_call *call, grpc_transport_stream_op *op);
static void recv_metadata(grpc_call *call, grpc_metadata_batch *metadata);
static void finish_read_ops(grpc_call *call);
static grpc_call_error cancel_with_status(grpc_call *c, grpc_status_code status,
@ -268,8 +268,8 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
size_t add_initial_metadata_count,
gpr_timespec send_deadline) {
size_t i;
grpc_transport_op initial_op;
grpc_transport_op *initial_op_ptr = NULL;
grpc_transport_stream_op initial_op;
grpc_transport_stream_op *initial_op_ptr = NULL;
grpc_channel_stack *channel_stack = grpc_channel_get_channel_stack(channel);
grpc_call *call =
gpr_malloc(sizeof(grpc_call) + channel_stack->call_stack_size);
@ -454,7 +454,7 @@ static int need_more_data(grpc_call *call) {
}
static void unlock(grpc_call *call) {
grpc_transport_op op;
grpc_transport_stream_op op;
completed_request completed_requests[GRPC_IOREQ_OP_COUNT];
int completing_requests = 0;
int start_op = 0;
@ -868,7 +868,7 @@ static void copy_byte_buffer_to_stream_ops(grpc_byte_buffer *byte_buffer,
}
}
static int fill_send_ops(grpc_call *call, grpc_transport_op *op) {
static int fill_send_ops(grpc_call *call, grpc_transport_stream_op *op) {
grpc_ioreq_data data;
gpr_uint32 flags;
grpc_metadata_batch mdb;
@ -1115,7 +1115,7 @@ static void finished_loose_op(void *call, int success_ignored) {
GRPC_CALL_INTERNAL_UNREF(call, "loose-op", 0);
}
static void execute_op(grpc_call *call, grpc_transport_op *op) {
static void execute_op(grpc_call *call, grpc_transport_stream_op *op) {
grpc_call_element *elem;
GPR_ASSERT(op->on_consumed == NULL);

@ -44,7 +44,7 @@ typedef struct { void *unused; } call_data;
typedef struct { void *unused; } channel_data;
static void client_start_transport_op(grpc_call_element *elem,
grpc_transport_op *op) {
grpc_transport_stream_op *op) {
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
grpc_call_next_op(elem, op);
}
@ -69,7 +69,7 @@ static void channel_op(grpc_channel_element *elem,
static void init_call_elem(grpc_call_element *elem,
const void *transport_server_data,
grpc_transport_op *initial_op) {}
grpc_transport_stream_op *initial_op) {}
static void destroy_call_elem(grpc_call_element *elem) {}

@ -50,7 +50,7 @@ typedef struct {
typedef struct { grpc_mdctx *mdctx; } channel_data;
static void lame_start_transport_op(grpc_call_element *elem,
grpc_transport_op *op) {
grpc_transport_stream_op *op) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
@ -98,9 +98,9 @@ static void channel_op(grpc_channel_element *elem,
static void init_call_elem(grpc_call_element *elem,
const void *transport_server_data,
grpc_transport_op *initial_op) {
grpc_transport_stream_op *initial_op) {
if (initial_op) {
grpc_transport_op_finish_with_failure(initial_op);
grpc_transport_stream_op_finish_with_failure(initial_op);
}
}

@ -526,7 +526,8 @@ static void server_on_recv(void *ptr, int success) {
calld->on_done_recv(calld->recv_user_data, success);
}
static void server_mutate_op(grpc_call_element *elem, grpc_transport_op *op) {
static void server_mutate_op(grpc_call_element *elem,
grpc_transport_stream_op *op) {
call_data *calld = elem->call_data;
if (op->recv_ops) {
@ -541,7 +542,7 @@ static void server_mutate_op(grpc_call_element *elem, grpc_transport_op *op) {
}
static void server_start_transport_op(grpc_call_element *elem,
grpc_transport_op *op) {
grpc_transport_stream_op *op) {
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
server_mutate_op(elem, op);
grpc_call_next_op(elem, op);
@ -625,7 +626,7 @@ static void shutdown_channel(channel_data *chand, int send_goaway,
static void init_call_elem(grpc_call_element *elem,
const void *server_transport_data,
grpc_transport_op *initial_op) {
grpc_transport_stream_op *initial_op) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
memset(calld, 0, sizeof(call_data));

@ -232,7 +232,7 @@ struct transport {
gpr_uint8 writing;
/** are we calling back (via cb) with a channel-level event */
gpr_uint8 calling_back_channel;
/** are we calling back any grpc_transport_op completion events */
/** are we calling back any grpc_transport_stream_op completion events */
gpr_uint8 calling_back_ops;
gpr_uint8 destroying;
gpr_uint8 closed;
@ -399,7 +399,8 @@ static void maybe_finish_read(transport *t, stream *s);
static void maybe_join_window_updates(transport *t, stream *s);
static void finish_reads(transport *t);
static void add_to_pollset_locked(transport *t, grpc_pollset *pollset);
static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op);
static void perform_op_locked(transport *t, stream *s,
grpc_transport_stream_op *op);
static void add_metadata_batch(transport *t, stream *s);
static void flowctl_trace(transport *t, const char *flow, gpr_int32 window,
@ -644,7 +645,8 @@ static void goaway(grpc_transport *gt, grpc_status_code status,
}
static int init_stream(grpc_transport *gt, grpc_stream *gs,
const void *server_data, grpc_transport_op *initial_op) {
const void *server_data,
grpc_transport_stream_op *initial_op) {
transport *t = (transport *)gt;
stream *s = (stream *)gs;
@ -1127,7 +1129,8 @@ static void maybe_start_some_streams(transport *t) {
}
}
static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op) {
static void perform_op_locked(transport *t, stream *s,
grpc_transport_stream_op *op) {
if (op->cancel_with_status != GRPC_STATUS_OK) {
cancel_stream(
t, s, op->cancel_with_status,
@ -1186,7 +1189,7 @@ static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op) {
}
static void perform_op(grpc_transport *gt, grpc_stream *gs,
grpc_transport_op *op) {
grpc_transport_stream_op *op) {
transport *t = (transport *)gt;
stream *s = (stream *)gs;

@ -53,13 +53,13 @@ void grpc_transport_destroy(grpc_transport *transport) {
int grpc_transport_init_stream(grpc_transport *transport, grpc_stream *stream,
const void *server_data,
grpc_transport_op *initial_op) {
grpc_transport_stream_op *initial_op) {
return transport->vtable->init_stream(transport, stream, server_data,
initial_op);
}
void grpc_transport_perform_op(grpc_transport *transport, grpc_stream *stream,
grpc_transport_op *op) {
grpc_transport_stream_op *op) {
transport->vtable->perform_op(transport, stream, op);
}
@ -96,7 +96,8 @@ void grpc_transport_setup_del_interested_party(grpc_transport_setup *setup,
setup->vtable->del_interested_party(setup, pollset);
}
void grpc_transport_op_finish_with_failure(grpc_transport_op *op) {
void grpc_transport_stream_op_finish_with_failure(
grpc_transport_stream_op *op) {
if (op->send_ops) {
op->on_done_send(op->send_user_data, 0);
}
@ -108,9 +109,9 @@ void grpc_transport_op_finish_with_failure(grpc_transport_op *op) {
}
}
void grpc_transport_op_add_cancellation(grpc_transport_op *op,
grpc_status_code status,
grpc_mdstr *message) {
void grpc_transport_stream_op_add_cancellation(grpc_transport_stream_op *op,
grpc_status_code status,
grpc_mdstr *message) {
if (op->cancel_with_status == GRPC_STATUS_OK) {
op->cancel_with_status = status;
op->cancel_message = message;

@ -63,7 +63,7 @@ typedef enum grpc_stream_state {
} grpc_stream_state;
/* Transport op: a set of operations to perform on a transport */
typedef struct grpc_transport_op {
typedef struct grpc_transport_stream_op {
void (*on_consumed)(void *user_data, int success);
void *on_consumed_user_data;
@ -84,7 +84,7 @@ typedef struct grpc_transport_op {
/* Indexes correspond to grpc_context_index enum values */
grpc_call_context_element *context;
} grpc_transport_op;
} grpc_transport_stream_op;
/* Callbacks made from the transport to the upper layers of grpc. */
struct grpc_transport_callbacks {
@ -126,7 +126,7 @@ size_t grpc_transport_stream_size(grpc_transport *transport);
supplied from the accept_stream callback function */
int grpc_transport_init_stream(grpc_transport *transport, grpc_stream *stream,
const void *server_data,
grpc_transport_op *initial_op);
grpc_transport_stream_op *initial_op);
/* Destroy transport data for a stream.
@ -141,17 +141,17 @@ int grpc_transport_init_stream(grpc_transport *transport, grpc_stream *stream,
void grpc_transport_destroy_stream(grpc_transport *transport,
grpc_stream *stream);
void grpc_transport_op_finish_with_failure(grpc_transport_op *op);
void grpc_transport_stream_op_finish_with_failure(grpc_transport_stream_op *op);
void grpc_transport_op_add_cancellation(grpc_transport_op *op,
grpc_status_code status,
grpc_mdstr *message);
void grpc_transport_stream_op_add_cancellation(grpc_transport_stream_op *op,
grpc_status_code status,
grpc_mdstr *message);
/* TODO(ctiller): remove this */
void grpc_transport_add_to_pollset(grpc_transport *transport,
grpc_pollset *pollset);
char *grpc_transport_op_string(grpc_transport_op *op);
char *grpc_transport_stream_op_string(grpc_transport_stream_op *op);
/* Send a batch of operations on a transport
@ -161,9 +161,9 @@ char *grpc_transport_op_string(grpc_transport_op *op);
transport - the transport on which to initiate the stream
stream - the stream on which to send the operations. This must be
non-NULL and previously initialized by the same transport.
op - a grpc_transport_op specifying the op to perform */
op - a grpc_transport_stream_op specifying the op to perform */
void grpc_transport_perform_op(grpc_transport *transport, grpc_stream *stream,
grpc_transport_op *op);
grpc_transport_stream_op *op);
/* Send a ping on a transport

@ -43,11 +43,12 @@ typedef struct grpc_transport_vtable {
/* implementation of grpc_transport_init_stream */
int (*init_stream)(grpc_transport *self, grpc_stream *stream,
const void *server_data, grpc_transport_op *initial_op);
const void *server_data,
grpc_transport_stream_op *initial_op);
/* implementation of grpc_transport_send_batch */
void (*perform_op)(grpc_transport *self, grpc_stream *stream,
grpc_transport_op *op);
grpc_transport_stream_op *op);
/* implementation of grpc_transport_add_to_pollset */
void (*add_to_pollset)(grpc_transport *self, grpc_pollset *pollset);

@ -107,7 +107,7 @@ char *grpc_sopb_string(grpc_stream_op_buffer *sopb) {
return out;
}
char *grpc_transport_op_string(grpc_transport_op *op) {
char *grpc_transport_stream_op_string(grpc_transport_stream_op *op) {
char *tmp;
char *out;
int first = 1;
@ -158,8 +158,8 @@ char *grpc_transport_op_string(grpc_transport_op *op) {
}
void grpc_call_log_op(char *file, int line, gpr_log_severity severity,
grpc_call_element *elem, grpc_transport_op *op) {
char *str = grpc_transport_op_string(op);
grpc_call_element *elem, grpc_transport_stream_op *op) {
char *str = grpc_transport_stream_op_string(op);
gpr_log(file, line, severity, "OP[%s:%p]: %s", elem->filter->name, elem, str);
gpr_free(str);
}

@ -54,7 +54,7 @@ static void channel_init_func(grpc_channel_element *elem,
static void call_init_func(grpc_call_element *elem,
const void *server_transport_data,
grpc_transport_op *initial_op) {
grpc_transport_stream_op *initial_op) {
++*(int *)(elem->channel_data);
*(int *)(elem->call_data) = 0;
}
@ -65,7 +65,7 @@ static void call_destroy_func(grpc_call_element *elem) {
++*(int *)(elem->channel_data);
}
static void call_func(grpc_call_element *elem, grpc_transport_op *op) {
static void call_func(grpc_call_element *elem, grpc_transport_stream_op *op) {
++*(int *)(elem->call_data);
}

Loading…
Cancel
Save