clang-format affected files

pull/2303/head
Craig Tiller 10 years ago
parent 6fa9148d51
commit 079a11bb9b
  1. 4
      include/grpc/grpc.h
  2. 24
      src/core/channel/census_filter.c
  3. 4
      src/core/channel/channel_args.c
  4. 9
      src/core/channel/channel_args.h
  5. 8
      src/core/channel/channel_stack.c
  6. 8
      src/core/channel/channel_stack.h
  7. 67
      src/core/channel/client_channel.c
  8. 2
      src/core/channel/client_channel.h
  9. 2
      src/core/channel/connected_channel.c
  10. 2
      src/core/channel/connected_channel.h
  11. 2
      src/core/channel/http_client_filter.c
  12. 2
      src/core/channel/http_server_filter.c
  13. 2
      src/core/channel/noop_filter.c
  14. 12
      src/core/client_config/resolver.c
  15. 9
      src/core/client_config/resolver.h
  16. 2
      src/core/client_config/resolvers/unix_resolver_posix.c
  17. 58
      src/core/client_config/subchannel.c
  18. 27
      src/core/client_config/subchannel.h
  19. 3
      src/core/client_config/uri_parser.c
  20. 3
      src/core/iomgr/fd_posix.c
  21. 4
      src/core/security/client_auth_filter.c
  22. 9
      src/core/security/server_auth_filter.c
  23. 12
      src/core/security/server_secure_chttp2.c
  24. 17
      src/core/surface/call.c
  25. 4
      src/core/surface/channel.c
  26. 29
      src/core/surface/channel_create.c
  27. 2
      src/core/surface/lame_client.c
  28. 29
      src/core/surface/secure_channel_create.c
  29. 23
      src/core/surface/server.c
  30. 58
      src/core/transport/chttp2_transport.c
  31. 6
      src/core/transport/connectivity_state.c
  32. 3
      src/core/transport/connectivity_state.h
  33. 5
      src/core/transport/transport.c
  34. 11
      src/core/transport/transport.h
  35. 2
      src/core/transport/transport_impl.h
  36. 11
      test/core/bad_client/bad_client.c
  37. 7
      test/core/channel/channel_stack_test.c
  38. 24
      test/core/end2end/fixtures/chttp2_socket_pair.c
  39. 24
      test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c
  40. 24
      test/core/end2end/fixtures/chttp2_socket_pair_with_grpc_trace.c
  41. 2
      test/core/end2end/multiple_server_queues_test.c
  42. 12
      test/core/iomgr/fd_conservation_posix_test.c

@ -441,7 +441,7 @@ void grpc_channel_destroy(grpc_channel *channel);
has been made. */
/* Called by clients to cancel an RPC on the server.
Can be called multiple times, from any thread.
Can be called multiple times, from any thread.
THREAD-SAFETY grpc_call_cancel and grpc_call_cancel_with_status
are thread-safe, and can be called at any point before grpc_call_destroy
is called.*/
@ -457,7 +457,7 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call *call,
grpc_status_code status,
const char *description);
/* Destroy a call.
/* Destroy a call.
THREAD SAFETY: grpc_call_destroy is thread-compatible */
void grpc_call_destroy(grpc_call *call);

@ -202,11 +202,23 @@ static void destroy_channel_elem(grpc_channel_element* elem) {
}
const grpc_channel_filter grpc_client_census_filter = {
client_start_transport_op, channel_op, sizeof(call_data),
client_init_call_elem, client_destroy_call_elem, sizeof(channel_data),
init_channel_elem, destroy_channel_elem, "census-client"};
client_start_transport_op,
channel_op,
sizeof(call_data),
client_init_call_elem,
client_destroy_call_elem,
sizeof(channel_data),
init_channel_elem,
destroy_channel_elem,
"census-client"};
const grpc_channel_filter grpc_server_census_filter = {
server_start_transport_op, channel_op, sizeof(call_data),
server_init_call_elem, server_destroy_call_elem, sizeof(channel_data),
init_channel_elem, destroy_channel_elem, "census-server"};
server_start_transport_op,
channel_op,
sizeof(call_data),
server_init_call_elem,
server_destroy_call_elem,
sizeof(channel_data),
init_channel_elem,
destroy_channel_elem,
"census-server"};

@ -139,8 +139,8 @@ grpc_compression_level grpc_channel_args_get_compression_level(
return GRPC_COMPRESS_LEVEL_NONE;
}
void grpc_channel_args_set_compression_level(
grpc_channel_args **a, grpc_compression_level level) {
void grpc_channel_args_set_compression_level(grpc_channel_args **a,
grpc_compression_level level) {
grpc_arg tmp;
tmp.type = GRPC_ARG_INTEGER;
tmp.key = GRPC_COMPRESSION_LEVEL_ARG;

@ -47,7 +47,8 @@ grpc_channel_args *grpc_channel_args_copy_and_add(const grpc_channel_args *src,
size_t num_to_add);
/** Copy args from a then args from b into a new channel args */
grpc_channel_args *grpc_channel_args_merge(const grpc_channel_args *a, const grpc_channel_args *b);
grpc_channel_args *grpc_channel_args_merge(const grpc_channel_args *a,
const grpc_channel_args *b);
/** Destroy arguments created by grpc_channel_args_copy */
void grpc_channel_args_destroy(grpc_channel_args *a);
@ -62,7 +63,7 @@ grpc_compression_level grpc_channel_args_get_compression_level(
/** Sets the compression level in \a a to \a level. Setting it to
* GRPC_COMPRESS_LEVEL_NONE disables compression for the channel. */
void grpc_channel_args_set_compression_level(
grpc_channel_args **a, grpc_compression_level level);
void grpc_channel_args_set_compression_level(grpc_channel_args **a,
grpc_compression_level level);
#endif /* GRPC_INTERNAL_CORE_CHANNEL_CHANNEL_ARGS_H */
#endif /* GRPC_INTERNAL_CORE_CHANNEL_CHANNEL_ARGS_H */

@ -102,7 +102,8 @@ grpc_call_element *grpc_call_stack_element(grpc_call_stack *call_stack,
}
void grpc_channel_stack_init(const grpc_channel_filter **filters,
size_t filter_count, grpc_channel *master, const grpc_channel_args *args,
size_t filter_count, grpc_channel *master,
const grpc_channel_args *args,
grpc_mdctx *metadata_context,
grpc_channel_stack *stack) {
size_t call_size =
@ -122,8 +123,9 @@ void grpc_channel_stack_init(const grpc_channel_filter **filters,
for (i = 0; i < filter_count; i++) {
elems[i].filter = filters[i];
elems[i].channel_data = user_data;
elems[i].filter->init_channel_elem(&elems[i], master, args, metadata_context,
i == 0, i == (filter_count - 1));
elems[i].filter->init_channel_elem(&elems[i], master, args,
metadata_context, i == 0,
i == (filter_count - 1));
user_data += ROUND_UP_TO_ALIGNMENT_SIZE(filters[i]->sizeof_channel_data);
call_size += ROUND_UP_TO_ALIGNMENT_SIZE(filters[i]->sizeof_call_data);
}

@ -65,7 +65,7 @@ typedef struct {
/* Called to eg. send/receive data on a call.
See grpc_call_next_op on how to call the next element in the stack */
void (*start_transport_stream_op)(grpc_call_element *elem,
grpc_transport_stream_op *op);
grpc_transport_stream_op *op);
/* Called to handle channel level operations - e.g. new calls, or transport
closure.
See grpc_channel_next_op on how to call the next element in the stack */
@ -96,8 +96,7 @@ typedef struct {
is_first, is_last designate this elements position in the stack, and are
useful for asserting correct configuration by upper layer code.
The filter does not need to do any chaining */
void (*init_channel_elem)(grpc_channel_element *elem,
grpc_channel *master,
void (*init_channel_elem)(grpc_channel_element *elem, grpc_channel *master,
const grpc_channel_args *args,
grpc_mdctx *metadata_context, int is_first,
int is_last);
@ -152,7 +151,8 @@ size_t grpc_channel_stack_size(const grpc_channel_filter **filters,
size_t filter_count);
/* Initialize a channel stack given some filters */
void grpc_channel_stack_init(const grpc_channel_filter **filters,
size_t filter_count, grpc_channel *master,const grpc_channel_args *args,
size_t filter_count, grpc_channel *master,
const grpc_channel_args *args,
grpc_mdctx *metadata_context,
grpc_channel_stack *stack);
/* Destroy a channel stack */

@ -138,7 +138,9 @@ typedef struct {
grpc_call_element *elem;
} waiting_call;
static void perform_transport_stream_op(grpc_call_element *elem, grpc_transport_stream_op *op, int continuation);
static void perform_transport_stream_op(grpc_call_element *elem,
grpc_transport_stream_op *op,
int continuation);
static void continue_with_pick(void *arg, int iomgr_success) {
waiting_call *wc = arg;
@ -147,7 +149,8 @@ static void continue_with_pick(void *arg, int iomgr_success) {
gpr_free(wc);
}
static void add_to_lb_policy_wait_queue_locked_state_config(grpc_call_element *elem) {
static void add_to_lb_policy_wait_queue_locked_state_config(
grpc_call_element *elem) {
channel_data *chand = elem->channel_data;
waiting_call *wc = gpr_malloc(sizeof(*wc));
grpc_iomgr_closure_init(&wc->closure, continue_with_pick, wc);
@ -182,7 +185,8 @@ static void started_call(void *arg, int iomgr_success) {
calld->state = CALL_ACTIVE;
gpr_mu_unlock(&calld->mu_state);
if (have_waiting) {
grpc_subchannel_call_process_op(calld->subchannel_call, &calld->waiting_op);
grpc_subchannel_call_process_op(calld->subchannel_call,
&calld->waiting_op);
}
} else {
calld->state = CALL_CANCELLED;
@ -233,17 +237,16 @@ static void pick_target(grpc_lb_policy *lb_policy, call_data *calld) {
initial_metadata = &op->send_ops->ops[0].data.metadata;
grpc_iomgr_closure_init(&calld->async_setup_task, picked_target, calld);
grpc_lb_policy_pick(lb_policy, op->bind_pollset,
initial_metadata, &calld->picked_channel, &calld->async_setup_task);
grpc_lb_policy_pick(lb_policy, op->bind_pollset, initial_metadata,
&calld->picked_channel, &calld->async_setup_task);
}
static void merge_into_waiting_op(grpc_call_element *elem, grpc_transport_stream_op *new_op) {
static void merge_into_waiting_op(grpc_call_element *elem,
grpc_transport_stream_op *new_op) {
call_data *calld = elem->call_data;
grpc_transport_stream_op *waiting_op = &calld->waiting_op;
GPR_ASSERT((waiting_op->send_ops == NULL) !=
(new_op->send_ops == NULL));
GPR_ASSERT((waiting_op->recv_ops == NULL) !=
(new_op->recv_ops == NULL));
GPR_ASSERT((waiting_op->send_ops == NULL) != (new_op->send_ops == NULL));
GPR_ASSERT((waiting_op->recv_ops == NULL) != (new_op->recv_ops == NULL));
if (new_op->send_ops != NULL) {
waiting_op->send_ops = new_op->send_ops;
waiting_op->is_last_send = new_op->is_last_send;
@ -263,7 +266,9 @@ static void merge_into_waiting_op(grpc_call_element *elem, grpc_transport_stream
}
}
static void perform_transport_stream_op(grpc_call_element *elem, grpc_transport_stream_op *op, int continuation) {
static void perform_transport_stream_op(grpc_call_element *elem,
grpc_transport_stream_op *op,
int continuation) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
grpc_subchannel_call *subchannel_call;
@ -311,7 +316,7 @@ static void perform_transport_stream_op(grpc_call_element *elem, grpc_transport_
}
break;
}
/* fall through */
/* fall through */
case CALL_CREATED:
if (op->cancel_with_status != GRPC_STATUS_OK) {
calld->state = CALL_CANCELLED;
@ -343,7 +348,7 @@ static void perform_transport_stream_op(grpc_call_element *elem, grpc_transport_
}
static void cc_start_transport_stream_op(grpc_call_element *elem,
grpc_transport_stream_op *op) {
grpc_transport_stream_op *op) {
perform_transport_stream_op(elem, op, 0);
}
@ -382,12 +387,14 @@ static void cc_on_config_changed(void *arg, int iomgr_success) {
GRPC_RESOLVER_REF(resolver, "channel-next");
gpr_mu_unlock(&chand->mu_config);
GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
grpc_resolver_next(chand->resolver, &chand->incoming_configuration, &chand->on_config_changed);
grpc_resolver_next(chand->resolver, &chand->incoming_configuration,
&chand->on_config_changed);
GRPC_RESOLVER_UNREF(resolver, "channel-next");
} else {
old_resolver = chand->resolver;
chand->resolver = NULL;
grpc_connectivity_state_set(&chand->state_tracker, GRPC_CHANNEL_FATAL_FAILURE);
grpc_connectivity_state_set(&chand->state_tracker,
GRPC_CHANNEL_FATAL_FAILURE);
gpr_mu_unlock(&chand->mu_config);
if (old_resolver != NULL) {
grpc_resolver_shutdown(old_resolver);
@ -404,7 +411,8 @@ static void cc_on_config_changed(void *arg, int iomgr_success) {
GRPC_CHANNEL_INTERNAL_UNREF(chand->master, "resolver");
}
static void cc_start_transport_op(grpc_channel_element *elem, grpc_transport_op *op) {
static void cc_start_transport_op(grpc_channel_element *elem,
grpc_transport_op *op) {
grpc_lb_policy *lb_policy = NULL;
channel_data *chand = elem->channel_data;
grpc_resolver *destroy_resolver = NULL;
@ -416,13 +424,16 @@ static void cc_start_transport_op(grpc_channel_element *elem, grpc_transport_op
gpr_mu_lock(&chand->mu_config);
if (op->on_connectivity_state_change != NULL) {
grpc_connectivity_state_notify_on_state_change(&chand->state_tracker, op->connectivity_state, op->on_connectivity_state_change);
grpc_connectivity_state_notify_on_state_change(
&chand->state_tracker, op->connectivity_state,
op->on_connectivity_state_change);
op->on_connectivity_state_change = NULL;
op->connectivity_state = NULL;
}
if (op->disconnect && chand->resolver != NULL) {
grpc_connectivity_state_set(&chand->state_tracker, GRPC_CHANNEL_FATAL_FAILURE);
grpc_connectivity_state_set(&chand->state_tracker,
GRPC_CHANNEL_FATAL_FAILURE);
destroy_resolver = chand->resolver;
chand->resolver = NULL;
}
@ -496,7 +507,7 @@ static void destroy_call_elem(grpc_call_element *elem) {
}
/* Constructor for channel_data */
static void init_channel_elem(grpc_channel_element *elem,grpc_channel *master,
static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
const grpc_channel_args *args,
grpc_mdctx *metadata_context, int is_first,
int is_last) {
@ -510,7 +521,8 @@ static void init_channel_elem(grpc_channel_element *elem,grpc_channel *master,
gpr_mu_init(&chand->mu_config);
chand->mdctx = metadata_context;
chand->master = master;
grpc_iomgr_closure_init(&chand->on_config_changed, cc_on_config_changed, chand);
grpc_iomgr_closure_init(&chand->on_config_changed, cc_on_config_changed,
chand);
grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE);
}
@ -530,9 +542,15 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
}
const grpc_channel_filter grpc_client_channel_filter = {
cc_start_transport_stream_op, cc_start_transport_op, sizeof(call_data),
init_call_elem, destroy_call_elem, sizeof(channel_data),
init_channel_elem, destroy_channel_elem, "client-channel",
cc_start_transport_stream_op,
cc_start_transport_op,
sizeof(call_data),
init_call_elem,
destroy_call_elem,
sizeof(channel_data),
init_channel_elem,
destroy_channel_elem,
"client-channel",
};
void grpc_client_channel_set_resolver(grpc_channel_stack *channel_stack,
@ -544,5 +562,6 @@ void grpc_client_channel_set_resolver(grpc_channel_stack *channel_stack,
chand->resolver = resolver;
GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
GRPC_RESOLVER_REF(resolver, "channel");
grpc_resolver_next(resolver, &chand->incoming_configuration, &chand->on_config_changed);
grpc_resolver_next(resolver, &chand->incoming_configuration,
&chand->on_config_changed);
}

@ -52,4 +52,4 @@ extern const grpc_channel_filter grpc_client_channel_filter;
void grpc_client_channel_set_resolver(grpc_channel_stack *channel_stack,
grpc_resolver *resolver);
#endif /* GRPC_INTERNAL_CORE_CHANNEL_CLIENT_CHANNEL_H */
#endif /* GRPC_INTERNAL_CORE_CHANNEL_CLIENT_CHANNEL_H */

@ -103,7 +103,7 @@ static void destroy_call_elem(grpc_call_element *elem) {
}
/* Constructor for channel_data */
static void init_channel_elem(grpc_channel_element *elem,grpc_channel *master,
static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
const grpc_channel_args *args, grpc_mdctx *mdctx,
int is_first, int is_last) {
channel_data *cd = (channel_data *)elem->channel_data;

@ -46,4 +46,4 @@ extern const grpc_channel_filter grpc_connected_channel_filter;
void grpc_connected_channel_bind_transport(grpc_channel_stack *channel_stack,
grpc_transport *transport);
#endif /* GRPC_INTERNAL_CORE_CHANNEL_CONNECTED_CHANNEL_H */
#endif /* GRPC_INTERNAL_CORE_CHANNEL_CONNECTED_CHANNEL_H */

@ -170,7 +170,7 @@ static const char *scheme_from_args(const grpc_channel_args *args) {
}
/* Constructor for channel_data */
static void init_channel_elem(grpc_channel_element *elem,grpc_channel *master,
static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
const grpc_channel_args *args, grpc_mdctx *mdctx,
int is_first, int is_last) {
/* grab pointers to our data from the channel element */

@ -229,7 +229,7 @@ static void init_call_elem(grpc_call_element *elem,
static void destroy_call_elem(grpc_call_element *elem) {}
/* Constructor for channel_data */
static void init_channel_elem(grpc_channel_element *elem,grpc_channel *master,
static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
const grpc_channel_args *args, grpc_mdctx *mdctx,
int is_first, int is_last) {
/* grab pointers to our data from the channel element */

@ -95,7 +95,7 @@ static void destroy_call_elem(grpc_call_element *elem) {
}
/* Constructor for channel_data */
static void init_channel_elem(grpc_channel_element *elem,grpc_channel *master,
static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
const grpc_channel_args *args, grpc_mdctx *mdctx,
int is_first, int is_last) {
/* grab pointers to our data from the channel element */

@ -34,16 +34,17 @@
#include "src/core/client_config/resolver.h"
void grpc_resolver_init(grpc_resolver *resolver,
const grpc_resolver_vtable *vtable) {
const grpc_resolver_vtable *vtable) {
resolver->vtable = vtable;
gpr_ref_init(&resolver->refs, 1);
}
#ifdef GRPC_RESOLVER_REFCOUNT_DEBUG
void grpc_resolver_ref(grpc_resolver *resolver, const char *file, int line,
const char *reason) {
const char *reason) {
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "RESOLVER:%p ref %d -> %d %s",
resolver, (int)resolver->refs.count, (int)resolver->refs.count + 1, reason);
resolver, (int)resolver->refs.count, (int)resolver->refs.count + 1,
reason);
#else
void grpc_resolver_ref(grpc_resolver *resolver) {
#endif
@ -52,9 +53,10 @@ void grpc_resolver_ref(grpc_resolver *resolver) {
#ifdef GRPC_RESOLVER_REFCOUNT_DEBUG
void grpc_resolver_unref(grpc_resolver *resolver, const char *file, int line,
const char *reason) {
const char *reason) {
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "RESOLVER:%p unref %d -> %d %s",
resolver, (int)resolver->refs.count, (int)resolver->refs.count - 1, reason);
resolver, (int)resolver->refs.count, (int)resolver->refs.count - 1,
reason);
#else
void grpc_resolver_unref(grpc_resolver *resolver) {
#endif

@ -59,14 +59,13 @@ struct grpc_resolver_vtable {
};
#ifdef GRPC_RESOLVER_REFCOUNT_DEBUG
#define GRPC_RESOLVER_REF(p, r) \
grpc_resolver_ref((p), __FILE__, __LINE__, (r))
#define GRPC_RESOLVER_REF(p, r) grpc_resolver_ref((p), __FILE__, __LINE__, (r))
#define GRPC_RESOLVER_UNREF(p, r) \
grpc_resolver_unref((p), __FILE__, __LINE__, (r))
void grpc_resolver_ref(grpc_resolver *policy, const char *file, int line,
const char *reason);
const char *reason);
void grpc_resolver_unref(grpc_resolver *policy, const char *file, int line,
const char *reason);
const char *reason);
#else
#define GRPC_RESOLVER_REF(p, r) grpc_resolver_ref((p))
#define GRPC_RESOLVER_UNREF(p, r) grpc_resolver_unref((p))
@ -75,7 +74,7 @@ void grpc_resolver_unref(grpc_resolver *policy);
#endif
void grpc_resolver_init(grpc_resolver *resolver,
const grpc_resolver_vtable *vtable);
const grpc_resolver_vtable *vtable);
void grpc_resolver_shutdown(grpc_resolver *resolver);

@ -136,7 +136,7 @@ static void unix_maybe_finish_next_locked(unix_resolver *r) {
}
static void unix_destroy(grpc_resolver *gr) {
unix_resolver *r = (unix_resolver*)gr;
unix_resolver *r = (unix_resolver *)gr;
gpr_mu_destroy(&r->mu);
grpc_subchannel_factory_unref(r->subchannel_factory);
gpr_free(r);

@ -134,21 +134,31 @@ static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c);
static gpr_timespec compute_connect_deadline(grpc_subchannel *c);
static void subchannel_connected(void *subchannel, int iomgr_success);
static void subchannel_ref_locked(grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
static int subchannel_unref_locked(grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) GRPC_MUST_USE_RESULT;
static void subchannel_ref_locked(
grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
static int subchannel_unref_locked(
grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) GRPC_MUST_USE_RESULT;
static void connection_ref_locked(connection *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
static grpc_subchannel *connection_unref_locked(connection *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS)
GRPC_MUST_USE_RESULT;
static grpc_subchannel *connection_unref_locked(
connection *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) GRPC_MUST_USE_RESULT;
static void subchannel_destroy(grpc_subchannel *c);
#ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG
#define SUBCHANNEL_REF_LOCKED(p, r) subchannel_ref_locked((p), __FILE__, __LINE__, (r))
#define SUBCHANNEL_UNREF_LOCKED(p, r) subchannel_unref_locked((p), __FILE__, __LINE__, (r))
#define CONNECTION_REF_LOCKED(p, r) connection_ref_locked((p), __FILE__, __LINE__, (r))
#define CONNECTION_UNREF_LOCKED(p, r) connection_unref_locked((p), __FILE__, __LINE__, (r))
#define SUBCHANNEL_REF_LOCKED(p, r) \
subchannel_ref_locked((p), __FILE__, __LINE__, (r))
#define SUBCHANNEL_UNREF_LOCKED(p, r) \
subchannel_unref_locked((p), __FILE__, __LINE__, (r))
#define CONNECTION_REF_LOCKED(p, r) \
connection_ref_locked((p), __FILE__, __LINE__, (r))
#define CONNECTION_UNREF_LOCKED(p, r) \
connection_unref_locked((p), __FILE__, __LINE__, (r))
#define REF_PASS_ARGS , file, line, reason
#define REF_LOG(name, p) gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "%s: %p ref %d -> %d %s", (name), (p), (p)->refs, (p)->refs + 1, reason)
#define UNREF_LOG(name, p) gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "%s: %p unref %d -> %d %s", (name), (p), (p)->refs, (p)->refs - 1, reason)
#define REF_LOG(name, p) \
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "%s: %p ref %d -> %d %s", \
(name), (p), (p)->refs, (p)->refs + 1, reason)
#define UNREF_LOG(name, p) \
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "%s: %p unref %d -> %d %s", \
(name), (p), (p)->refs, (p)->refs - 1, reason)
#else
#define SUBCHANNEL_REF_LOCKED(p, r) subchannel_ref_locked((p))
#define SUBCHANNEL_UNREF_LOCKED(p, r) subchannel_unref_locked((p))
@ -174,13 +184,15 @@ static void connection_destroy(connection *c) {
gpr_free(c);
}
static void connection_ref_locked(connection *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
static void connection_ref_locked(
connection *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
REF_LOG("CONNECTION", c);
subchannel_ref_locked(c->subchannel REF_PASS_ARGS);
++c->refs;
}
static grpc_subchannel *connection_unref_locked(connection *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
static grpc_subchannel *connection_unref_locked(
connection *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
grpc_subchannel *destroy = NULL;
UNREF_LOG("CONNECTION", c);
if (subchannel_unref_locked(c->subchannel REF_PASS_ARGS)) {
@ -196,12 +208,14 @@ static grpc_subchannel *connection_unref_locked(connection *c GRPC_SUBCHANNEL_RE
* grpc_subchannel implementation
*/
static void subchannel_ref_locked(grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
static void subchannel_ref_locked(
grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
REF_LOG("SUBCHANNEL", c);
++c->refs;
++c->refs;
}
static int subchannel_unref_locked(grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
static int subchannel_unref_locked(
grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
UNREF_LOG("SUBCHANNEL", c);
return --c->refs == 0;
}
@ -385,7 +399,8 @@ void grpc_subchannel_process_transport_op(grpc_subchannel *c,
if (con != NULL) {
grpc_channel_stack *channel_stack = CHANNEL_STACK_FROM_CONNECTION(con);
grpc_channel_element *top_elem = grpc_channel_stack_element(channel_stack, 0);
grpc_channel_element *top_elem =
grpc_channel_stack_element(channel_stack, 0);
top_elem->filter->start_transport_op(top_elem, op);
gpr_mu_lock(&c->mu);
@ -485,7 +500,8 @@ static void publish_transport(grpc_subchannel *c) {
stk = (grpc_channel_stack *)(con + 1);
con->refs = 0;
con->subchannel = c;
grpc_channel_stack_init(filters, num_filters, c->master, c->args, c->mdctx, stk);
grpc_channel_stack_init(filters, num_filters, c->master, c->args, c->mdctx,
stk);
grpc_connected_channel_bind_transport(stk, c->connecting_result.transport);
gpr_free(c->connecting_result.filters);
memset(&c->connecting_result, 0, sizeof(c->connecting_result));
@ -601,11 +617,13 @@ static void connectivity_state_changed_locked(grpc_subchannel *c) {
* grpc_subchannel_call implementation
*/
void grpc_subchannel_call_ref(grpc_subchannel_call *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
gpr_ref(&c->refs);
void grpc_subchannel_call_ref(
grpc_subchannel_call *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
gpr_ref(&c->refs);
}
void grpc_subchannel_call_unref(grpc_subchannel_call *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
void grpc_subchannel_call_unref(
grpc_subchannel_call *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
if (gpr_unref(&c->refs)) {
gpr_mu *mu = &c->connection->subchannel->mu;
grpc_subchannel *destroy;

@ -44,11 +44,16 @@ typedef struct grpc_subchannel_call grpc_subchannel_call;
typedef struct grpc_subchannel_args grpc_subchannel_args;
#ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG
#define GRPC_SUBCHANNEL_REF(p, r) grpc_subchannel_ref((p), __FILE__, __LINE__, (r))
#define GRPC_SUBCHANNEL_UNREF(p, r) grpc_subchannel_unref((p), __FILE__, __LINE__, (r))
#define GRPC_SUBCHANNEL_CALL_REF(p, r) grpc_subchannel_call_ref((p), __FILE__, __LINE__, (r))
#define GRPC_SUBCHANNEL_CALL_UNREF(p, r) grpc_subchannel_call_unref((p), __FILE__, __LINE__, (r))
#define GRPC_SUBCHANNEL_REF_EXTRA_ARGS , const char *file, int line, const char *reason
#define GRPC_SUBCHANNEL_REF(p, r) \
grpc_subchannel_ref((p), __FILE__, __LINE__, (r))
#define GRPC_SUBCHANNEL_UNREF(p, r) \
grpc_subchannel_unref((p), __FILE__, __LINE__, (r))
#define GRPC_SUBCHANNEL_CALL_REF(p, r) \
grpc_subchannel_call_ref((p), __FILE__, __LINE__, (r))
#define GRPC_SUBCHANNEL_CALL_UNREF(p, r) \
grpc_subchannel_call_unref((p), __FILE__, __LINE__, (r))
#define GRPC_SUBCHANNEL_REF_EXTRA_ARGS \
, const char *file, int line, const char *reason
#else
#define GRPC_SUBCHANNEL_REF(p, r) grpc_subchannel_ref((p))
#define GRPC_SUBCHANNEL_UNREF(p, r) grpc_subchannel_unref((p))
@ -57,10 +62,14 @@ typedef struct grpc_subchannel_args grpc_subchannel_args;
#define GRPC_SUBCHANNEL_REF_EXTRA_ARGS
#endif
void grpc_subchannel_ref(grpc_subchannel *channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
void grpc_subchannel_unref(grpc_subchannel *channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
void grpc_subchannel_call_ref(grpc_subchannel_call *call GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
void grpc_subchannel_call_unref(grpc_subchannel_call *call GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
void grpc_subchannel_ref(
grpc_subchannel *channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
void grpc_subchannel_unref(
grpc_subchannel *channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
void grpc_subchannel_call_ref(
grpc_subchannel_call *call GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
void grpc_subchannel_call_unref(
grpc_subchannel_call *call GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
/** construct a call (possibly asynchronously) */
void grpc_subchannel_create_call(grpc_subchannel *subchannel,

@ -39,7 +39,8 @@
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
static grpc_uri *bad_uri(const char *uri_text, int pos, const char *section, int suppress_errors) {
static grpc_uri *bad_uri(const char *uri_text, int pos, const char *section,
int suppress_errors) {
char *line_prefix;
int pfx_len;

@ -199,7 +199,8 @@ static void wake_all_watchers_locked(grpc_fd *fd) {
}
static int has_watchers(grpc_fd *fd) {
return fd->read_watcher != NULL || fd->write_watcher != NULL || fd->inactive_watcher_root.next != &fd->inactive_watcher_root;
return fd->read_watcher != NULL || fd->write_watcher != NULL ||
fd->inactive_watcher_root.next != &fd->inactive_watcher_root;
}
void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_closure *on_done,

@ -280,7 +280,7 @@ static void destroy_call_elem(grpc_call_element *elem) {
}
/* Constructor for channel_data */
static void init_channel_elem(grpc_channel_element *elem,grpc_channel *master,
static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
const grpc_channel_args *args,
grpc_mdctx *metadata_context, int is_first,
int is_last) {
@ -326,6 +326,6 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
}
const grpc_channel_filter grpc_client_auth_filter = {
auth_start_transport_op, grpc_channel_next_op, sizeof(call_data),
auth_start_transport_op, grpc_channel_next_op, sizeof(call_data),
init_call_elem, destroy_call_elem, sizeof(channel_data),
init_channel_elem, destroy_channel_elem, "client-auth"};

@ -84,8 +84,7 @@ static void init_call_elem(grpc_call_element *elem,
}
/* Destructor for call_data */
static void destroy_call_elem(grpc_call_element *elem) {
}
static void destroy_call_elem(grpc_call_element *elem) {}
/* Constructor for channel_data */
static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
@ -115,6 +114,6 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
}
const grpc_channel_filter grpc_server_auth_filter = {
auth_start_transport_op, grpc_channel_next_op, sizeof(call_data), init_call_elem,
destroy_call_elem, sizeof(channel_data), init_channel_elem,
destroy_channel_elem, "server-auth"};
auth_start_transport_op, grpc_channel_next_op, sizeof(call_data),
init_call_elem, destroy_call_elem, sizeof(channel_data),
init_channel_elem, destroy_channel_elem, "server-auth"};

@ -75,9 +75,8 @@ static void state_unref(grpc_server_secure_state *state) {
}
}
static void setup_transport(void *statep,
grpc_transport *transport,
grpc_mdctx *mdctx) {
static void setup_transport(void *statep, grpc_transport *transport,
grpc_mdctx *mdctx) {
static grpc_channel_filter const *extra_filters[] = {
&grpc_server_auth_filter, &grpc_http_server_filter};
grpc_server_secure_state *state = statep;
@ -85,8 +84,7 @@ static void setup_transport(void *statep,
grpc_channel_args *args_copy = grpc_channel_args_copy_and_add(
grpc_server_get_channel_args(state->server), &connector_arg, 1);
grpc_server_setup_transport(state->server, transport, extra_filters,
GPR_ARRAY_SIZE(extra_filters), mdctx,
args_copy);
GPR_ARRAY_SIZE(extra_filters), mdctx, args_copy);
grpc_channel_args_destroy(args_copy);
}
@ -101,8 +99,8 @@ static void on_secure_transport_setup_done(void *statep,
if (!state->is_shutdown) {
mdctx = grpc_mdctx_create();
transport = grpc_create_chttp2_transport(
grpc_server_get_channel_args(state->server),
secure_endpoint, NULL, 0, mdctx, 0);
grpc_server_get_channel_args(state->server), secure_endpoint, NULL, 0,
mdctx, 0);
setup_transport(state, transport, mdctx);
} else {
/* We need to consume this here, because the server may already have gone

@ -462,8 +462,7 @@ static int need_more_data(grpc_call *call) {
(is_op_live(call, GRPC_IOREQ_RECV_CLOSE) &&
grpc_bbq_empty(&call->incoming_queue)) ||
(call->write_state == WRITE_STATE_INITIAL && !call->is_client) ||
(call->cancel_with_status != GRPC_STATUS_OK) ||
call->destroy_called;
(call->cancel_with_status != GRPC_STATUS_OK) || call->destroy_called;
}
static void unlock(grpc_call *call) {
@ -1151,7 +1150,8 @@ static void execute_op(grpc_call *call, grpc_transport_stream_op *op) {
} else {
finished_loose_op_allocated_args *args = gpr_malloc(sizeof(*args));
args->call = call;
grpc_iomgr_closure_init(&args->closure, finished_loose_op_allocated, args);
grpc_iomgr_closure_init(&args->closure, finished_loose_op_allocated,
args);
op->on_consumed = &args->closure;
}
}
@ -1223,13 +1223,13 @@ static gpr_uint32 decode_compression(grpc_mdelem *md) {
} else {
gpr_uint32 parsed_clevel_bytes;
if (gpr_parse_bytes_to_uint32(grpc_mdstr_as_c_string(md->value),
GPR_SLICE_LENGTH(md->value->slice),
&parsed_clevel_bytes)) {
GPR_SLICE_LENGTH(md->value->slice),
&parsed_clevel_bytes)) {
/* the following cast is safe, as a gpr_uint32 should be able to hold all
* possible values of the grpc_compression_level enum */
clevel = (grpc_compression_level) parsed_clevel_bytes;
clevel = (grpc_compression_level)parsed_clevel_bytes;
} else {
clevel = GRPC_COMPRESS_LEVEL_NONE; /* could not parse, no compression */
clevel = GRPC_COMPRESS_LEVEL_NONE; /* could not parse, no compression */
}
grpc_mdelem_set_user_data(md, destroy_compression,
(void *)(gpr_intptr)(clevel + COMPRESS_OFFSET));
@ -1252,7 +1252,8 @@ static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) {
set_status_code(call, STATUS_FROM_WIRE, decode_status(md));
} else if (key == grpc_channel_get_message_string(call->channel)) {
set_status_details(call, STATUS_FROM_WIRE, grpc_mdstr_ref(md->value));
} else if (key == grpc_channel_get_compresssion_level_string(call->channel)) {
} else if (key ==
grpc_channel_get_compresssion_level_string(call->channel)) {
set_decode_compression_level(call, decode_compression(md));
} else {
dest = &call->buffered_metadata[is_trailing];

@ -129,7 +129,8 @@ grpc_channel *grpc_channel_create_from_filters(
}
}
grpc_channel_stack_init(filters, num_filters, channel, args, channel->metadata_context,
grpc_channel_stack_init(filters, num_filters, channel, args,
channel->metadata_context,
CHANNEL_STACK_FROM_CHANNEL(channel));
return channel;
@ -264,7 +265,6 @@ grpc_mdstr *grpc_channel_get_compresssion_level_string(grpc_channel *channel) {
return channel->grpc_compression_level_string;
}
grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel, int i) {
if (i >= 0 && i < NUM_CACHED_STATUS_ELEMS) {
return grpc_mdelem_ref(channel->grpc_status_elem[i]);

@ -71,10 +71,10 @@ static void connected(void *arg, grpc_endpoint *tcp) {
connector *c = arg;
grpc_iomgr_closure *notify;
if (tcp != NULL) {
c->result->transport =
grpc_create_chttp2_transport(c->args.channel_args, tcp, NULL, 0, c->args.metadata_context, 1);
c->result->transport = grpc_create_chttp2_transport(
c->args.channel_args, tcp, NULL, 0, c->args.metadata_context, 1);
GPR_ASSERT(c->result->transport);
c->result->filters = gpr_malloc(sizeof(grpc_channel_filter*));
c->result->filters = gpr_malloc(sizeof(grpc_channel_filter *));
c->result->filters[0] = &grpc_http_client_filter;
c->result->num_filters = 1;
} else {
@ -85,19 +85,22 @@ static void connected(void *arg, grpc_endpoint *tcp) {
grpc_iomgr_add_callback(notify);
}
static void connector_connect(
grpc_connector *con, const grpc_connect_in_args *args,
grpc_connect_out_args *result, grpc_iomgr_closure *notify) {
static void connector_connect(grpc_connector *con,
const grpc_connect_in_args *args,
grpc_connect_out_args *result,
grpc_iomgr_closure *notify) {
connector *c = (connector *)con;
GPR_ASSERT(c->notify == NULL);
GPR_ASSERT(notify->cb);
c->notify = notify;
c->args = *args;
c->result = result;
grpc_tcp_client_connect(connected, c, args->interested_parties, args->addr, args->addr_len, args->deadline);
grpc_tcp_client_connect(connected, c, args->interested_parties, args->addr,
args->addr_len, args->deadline);
}
static const grpc_connector_vtable connector_vtable = {connector_ref, connector_unref, connector_connect};
static const grpc_connector_vtable connector_vtable = {
connector_ref, connector_unref, connector_connect};
typedef struct {
grpc_subchannel_factory base;
@ -119,7 +122,8 @@ static void subchannel_factory_unref(grpc_subchannel_factory *scf) {
}
}
static grpc_subchannel *subchannel_factory_create_subchannel(grpc_subchannel_factory *scf, grpc_subchannel_args *args) {
static grpc_subchannel *subchannel_factory_create_subchannel(
grpc_subchannel_factory *scf, grpc_subchannel_args *args) {
subchannel_factory *f = (subchannel_factory *)scf;
connector *c = gpr_malloc(sizeof(*c));
grpc_channel_args *final_args =
@ -136,7 +140,9 @@ static grpc_subchannel *subchannel_factory_create_subchannel(grpc_subchannel_fac
return s;
}
static const grpc_subchannel_factory_vtable subchannel_factory_vtable = {subchannel_factory_ref, subchannel_factory_unref, subchannel_factory_create_subchannel};
static const grpc_subchannel_factory_vtable subchannel_factory_vtable = {
subchannel_factory_ref, subchannel_factory_unref,
subchannel_factory_create_subchannel};
/* Create a client channel:
Asynchronously: - resolve target
@ -170,7 +176,8 @@ grpc_channel *grpc_channel_create(const char *target,
}
channel = grpc_channel_create_from_filters(filters, n, args, mdctx, 1);
grpc_client_channel_set_resolver(grpc_channel_get_channel_stack(channel), resolver);
grpc_client_channel_set_resolver(grpc_channel_get_channel_stack(channel),
resolver);
GRPC_RESOLVER_UNREF(resolver, "create");
grpc_subchannel_factory_unref(&f->base);

@ -105,7 +105,7 @@ static void init_call_elem(grpc_call_element *elem,
static void destroy_call_elem(grpc_call_element *elem) {}
static void init_channel_elem(grpc_channel_element *elem,grpc_channel *master,
static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
const grpc_channel_args *args, grpc_mdctx *mdctx,
int is_first, int is_last) {
channel_data *chand = elem->channel_data;

@ -82,9 +82,9 @@ static void on_secure_transport_setup_done(void *arg,
gpr_log(GPR_ERROR, "Secure transport setup failed with error %d.", status);
memset(c->result, 0, sizeof(*c->result));
} else {
c->result->transport = grpc_create_chttp2_transport(
c->args.channel_args, secure_endpoint,
NULL, 0, c->args.metadata_context, 1);
c->result->transport =
grpc_create_chttp2_transport(c->args.channel_args, secure_endpoint,
NULL, 0, c->args.metadata_context, 1);
c->result->filters = gpr_malloc(sizeof(grpc_channel_filter *) * 2);
c->result->filters[0] = &grpc_client_auth_filter;
c->result->filters[1] = &grpc_http_client_filter;
@ -109,19 +109,22 @@ static void connected(void *arg, grpc_endpoint *tcp) {
}
}
static void connector_connect(
grpc_connector *con, const grpc_connect_in_args *args,
grpc_connect_out_args *result, grpc_iomgr_closure *notify) {
static void connector_connect(grpc_connector *con,
const grpc_connect_in_args *args,
grpc_connect_out_args *result,
grpc_iomgr_closure *notify) {
connector *c = (connector *)con;
GPR_ASSERT(c->notify == NULL);
GPR_ASSERT(notify->cb);
c->notify = notify;
c->args = *args;
c->result = result;
grpc_tcp_client_connect(connected, c, args->interested_parties, args->addr, args->addr_len, args->deadline);
grpc_tcp_client_connect(connected, c, args->interested_parties, args->addr,
args->addr_len, args->deadline);
}
static const grpc_connector_vtable connector_vtable = {connector_ref, connector_unref, connector_connect};
static const grpc_connector_vtable connector_vtable = {
connector_ref, connector_unref, connector_connect};
typedef struct {
grpc_subchannel_factory base;
@ -144,7 +147,8 @@ static void subchannel_factory_unref(grpc_subchannel_factory *scf) {
}
}
static grpc_subchannel *subchannel_factory_create_subchannel(grpc_subchannel_factory *scf, grpc_subchannel_args *args) {
static grpc_subchannel *subchannel_factory_create_subchannel(
grpc_subchannel_factory *scf, grpc_subchannel_args *args) {
subchannel_factory *f = (subchannel_factory *)scf;
connector *c = gpr_malloc(sizeof(*c));
grpc_channel_args *final_args =
@ -162,7 +166,9 @@ static grpc_subchannel *subchannel_factory_create_subchannel(grpc_subchannel_fac
return s;
}
static const grpc_subchannel_factory_vtable subchannel_factory_vtable = {subchannel_factory_ref, subchannel_factory_unref, subchannel_factory_create_subchannel};
static const grpc_subchannel_factory_vtable subchannel_factory_vtable = {
subchannel_factory_ref, subchannel_factory_unref,
subchannel_factory_create_subchannel};
/* Create a secure client channel:
Asynchronously: - resolve target
@ -219,7 +225,8 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds,
}
channel = grpc_channel_create_from_filters(filters, n, args_copy, mdctx, 1);
grpc_client_channel_set_resolver(grpc_channel_get_channel_stack(channel), resolver);
grpc_client_channel_set_resolver(grpc_channel_get_channel_stack(channel),
resolver);
GRPC_RESOLVER_UNREF(resolver, "create");
grpc_channel_args_destroy(args_copy);

@ -151,7 +151,7 @@ struct grpc_server {
before mu_call. This is currently used in shutdown processing
(grpc_server_shutdown_and_notify and maybe_finish_shutdown) */
gpr_mu mu_global; /* mutex for server and channel state */
gpr_mu mu_call; /* mutex for call-specific state */
gpr_mu mu_call; /* mutex for call-specific state */
registered_method *registered_methods;
requested_call_array requested_calls;
@ -226,11 +226,10 @@ static void channel_broadcaster_init(grpc_server *s, channel_broadcaster *cb) {
channel_data *c;
size_t count = 0;
size_t dc_count = 0;
for (c = s->root_channel_data.next; c != &s->root_channel_data;
c = c->next) {
count ++;
for (c = s->root_channel_data.next; c != &s->root_channel_data; c = c->next) {
count++;
if (c->num_calls == 0) {
dc_count ++;
dc_count++;
}
}
cb->num_channels = count;
@ -239,8 +238,7 @@ static void channel_broadcaster_init(grpc_server *s, channel_broadcaster *cb) {
cb->disconnects = gpr_malloc(sizeof(*cb->channels) * cb->num_disconnects);
count = 0;
dc_count = 0;
for (c = s->root_channel_data.next; c != &s->root_channel_data;
c = c->next) {
for (c = s->root_channel_data.next; c != &s->root_channel_data; c = c->next) {
cb->channels[count++] = c->channel;
GRPC_CHANNEL_INTERNAL_REF(c->channel, "broadcast");
if (c->num_calls == 0) {
@ -261,7 +259,8 @@ static void shutdown_cleanup(void *arg, int iomgr_status_ignored) {
gpr_free(a);
}
static void send_shutdown(grpc_channel *channel, int send_goaway, int send_disconnect) {
static void send_shutdown(grpc_channel *channel, int send_goaway,
int send_disconnect) {
grpc_transport_op op;
struct shutdown_cleanup_args *sc;
grpc_channel_element *elem;
@ -277,12 +276,12 @@ static void send_shutdown(grpc_channel *channel, int send_goaway, int send_disco
grpc_iomgr_closure_init(&sc->closure, shutdown_cleanup, sc);
op.on_consumed = &sc->closure;
elem = grpc_channel_stack_element(
grpc_channel_get_channel_stack(channel), 0);
elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0);
elem->filter->start_transport_op(elem, &op);
}
static void channel_broadcaster_shutdown(channel_broadcaster *cb, int send_goaway, int send_disconnect) {
static void channel_broadcaster_shutdown(channel_broadcaster *cb,
int send_goaway, int send_disconnect) {
size_t i;
for (i = 0; i < cb->num_channels; i++) {
@ -721,7 +720,7 @@ static void destroy_call_elem(grpc_call_element *elem) {
server_unref(chand->server);
}
static void init_channel_elem(grpc_channel_element *elem,grpc_channel *master,
static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
const grpc_channel_args *args,
grpc_mdctx *metadata_context, int is_first,
int is_last) {

@ -117,7 +117,9 @@ static void add_to_pollset_locked(grpc_chttp2_transport *t,
static void maybe_start_some_streams(
grpc_chttp2_transport_global *transport_global);
static void connectivity_state_set(grpc_chttp2_transport_global *transport_global, grpc_connectivity_state state);
static void connectivity_state_set(
grpc_chttp2_transport_global *transport_global,
grpc_connectivity_state state);
/*
* CONSTRUCTION/DESTRUCTION/REFCOUNTING
@ -330,8 +332,7 @@ static void destroy_transport(grpc_transport *gt) {
static void close_transport_locked(grpc_chttp2_transport *t) {
if (!t->closed) {
t->closed = 1;
connectivity_state_set(&t->global,
GRPC_CHANNEL_FATAL_FAILURE);
connectivity_state_set(&t->global, GRPC_CHANNEL_FATAL_FAILURE);
if (t->ep) {
grpc_endpoint_shutdown(t->ep);
}
@ -339,7 +340,8 @@ static void close_transport_locked(grpc_chttp2_transport *t) {
}
static int init_stream(grpc_transport *gt, grpc_stream *gs,
const void *server_data, grpc_transport_stream_op *initial_op) {
const void *server_data,
grpc_transport_stream_op *initial_op) {
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
@ -521,14 +523,13 @@ static void writing_action(void *gt, int iomgr_success_ignored) {
void grpc_chttp2_add_incoming_goaway(
grpc_chttp2_transport_global *transport_global, gpr_uint32 goaway_error,
gpr_slice goaway_text) {
char *msg = gpr_hexdump((char*)GPR_SLICE_START_PTR(goaway_text), GPR_SLICE_LENGTH(goaway_text), GPR_HEXDUMP_PLAINTEXT);
char *msg = gpr_hexdump((char *)GPR_SLICE_START_PTR(goaway_text),
GPR_SLICE_LENGTH(goaway_text), GPR_HEXDUMP_PLAINTEXT);
gpr_log(GPR_DEBUG, "got goaway [%d]: %s", goaway_error, msg);
gpr_free(msg);
gpr_slice_unref(goaway_text);
transport_global->seen_goaway = 1;
connectivity_state_set(
transport_global,
GRPC_CHANNEL_FATAL_FAILURE);
connectivity_state_set(transport_global, GRPC_CHANNEL_FATAL_FAILURE);
}
static void maybe_start_some_streams(
@ -735,9 +736,8 @@ static void remove_stream(grpc_chttp2_transport *t, gpr_uint32 id) {
grpc_chttp2_parsing_become_skip_parser(&t->parsing);
}
new_stream_count =
grpc_chttp2_stream_map_size(&t->parsing_stream_map) +
grpc_chttp2_stream_map_size(&t->new_stream_map);
new_stream_count = grpc_chttp2_stream_map_size(&t->parsing_stream_map) +
grpc_chttp2_stream_map_size(&t->new_stream_map);
if (new_stream_count != t->global.concurrent_stream_count) {
t->global.concurrent_stream_count = new_stream_count;
maybe_start_some_streams(&t->global);
@ -772,11 +772,12 @@ static void unlock_check_read_write_state(grpc_chttp2_transport *t) {
if (!stream_global->published_cancelled) {
char buffer[GPR_LTOA_MIN_BUFSIZE];
gpr_ltoa(stream_global->cancelled_status, buffer);
grpc_chttp2_incoming_metadata_buffer_add(&stream_global->incoming_metadata,
grpc_mdelem_from_strings(t->metadata_context, "grpc-status", buffer));
grpc_chttp2_incoming_metadata_buffer_add(
&stream_global->incoming_metadata,
grpc_mdelem_from_strings(t->metadata_context, "grpc-status",
buffer));
grpc_chttp2_incoming_metadata_buffer_place_metadata_batch_into(
&stream_global->incoming_metadata,
&stream_global->incoming_sopb);
&stream_global->incoming_metadata, &stream_global->incoming_sopb);
stream_global->published_cancelled = 1;
}
}
@ -825,10 +826,10 @@ static void cancel_from_api(grpc_chttp2_transport_global *transport_global,
stream_global->cancelled = 1;
stream_global->cancelled_status = status;
if (stream_global->id != 0) {
gpr_slice_buffer_add(&transport_global->qbuf,
grpc_chttp2_rst_stream_create(
stream_global->id,
grpc_chttp2_grpc_status_to_http2_error(status)));
gpr_slice_buffer_add(
&transport_global->qbuf,
grpc_chttp2_rst_stream_create(
stream_global->id, grpc_chttp2_grpc_status_to_http2_error(status)));
}
grpc_chttp2_list_add_read_write_state_changed(transport_global,
stream_global);
@ -907,7 +908,8 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
/* merge stream lists */
grpc_chttp2_stream_map_move_into(&t->new_stream_map,
&t->parsing_stream_map);
t->global.concurrent_stream_count = grpc_chttp2_stream_map_size(&t->parsing_stream_map);
t->global.concurrent_stream_count =
grpc_chttp2_stream_map_size(&t->parsing_stream_map);
if (t->parsing.initial_window_update != 0) {
grpc_chttp2_stream_map_for_each(&t->parsing_stream_map,
update_global_window, t);
@ -934,17 +936,19 @@ static void reading_action(void *pt, int iomgr_success_ignored) {
* CALLBACK LOOP
*/
static void schedule_closure_for_connectivity(void *a, grpc_iomgr_closure *closure) {
static void schedule_closure_for_connectivity(void *a,
grpc_iomgr_closure *closure) {
grpc_chttp2_schedule_closure(a, closure, 1);
}
static void connectivity_state_set(grpc_chttp2_transport_global *transport_global, grpc_connectivity_state state) {
GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_DEBUG, "set connectivity_state=%d", state));
static void connectivity_state_set(
grpc_chttp2_transport_global *transport_global,
grpc_connectivity_state state) {
GRPC_CHTTP2_IF_TRACING(
gpr_log(GPR_DEBUG, "set connectivity_state=%d", state));
grpc_connectivity_state_set_with_scheduler(
&TRANSPORT_FROM_GLOBAL(transport_global)->channel_callback.state_tracker,
state,
schedule_closure_for_connectivity,
transport_global);
&TRANSPORT_FROM_GLOBAL(transport_global)->channel_callback.state_tracker,
state, schedule_closure_for_connectivity, transport_global);
}
void grpc_chttp2_schedule_closure(

@ -81,8 +81,7 @@ int grpc_connectivity_state_notify_on_state_change(
void grpc_connectivity_state_set_with_scheduler(
grpc_connectivity_state_tracker *tracker, grpc_connectivity_state state,
void (*scheduler)(void *arg, grpc_iomgr_closure *closure),
void *arg) {
void (*scheduler)(void *arg, grpc_iomgr_closure *closure), void *arg) {
grpc_connectivity_state_watcher *new = NULL;
grpc_connectivity_state_watcher *w;
/*gpr_log(GPR_DEBUG, "CS:%p:set:%d", tracker, state);*/
@ -111,5 +110,6 @@ static void default_scheduler(void *ignored, grpc_iomgr_closure *closure) {
void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker,
grpc_connectivity_state state) {
grpc_connectivity_state_set_with_scheduler(tracker, state, default_scheduler, NULL);
grpc_connectivity_state_set_with_scheduler(tracker, state, default_scheduler,
NULL);
}

@ -61,8 +61,7 @@ void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker,
grpc_connectivity_state state);
void grpc_connectivity_state_set_with_scheduler(
grpc_connectivity_state_tracker *tracker, grpc_connectivity_state state,
void (*scheduler)(void *arg, grpc_iomgr_closure *closure),
void *arg);
void (*scheduler)(void *arg, grpc_iomgr_closure *closure), void *arg);
grpc_connectivity_state grpc_connectivity_state_check(
grpc_connectivity_state_tracker *tracker);

@ -49,8 +49,9 @@ int grpc_transport_init_stream(grpc_transport *transport, grpc_stream *stream,
initial_op);
}
void grpc_transport_perform_stream_op(grpc_transport *transport, grpc_stream *stream,
grpc_transport_stream_op *op) {
void grpc_transport_perform_stream_op(grpc_transport *transport,
grpc_stream *stream,
grpc_transport_stream_op *op) {
transport->vtable->perform_stream_op(transport, stream, op);
}

@ -99,7 +99,8 @@ typedef struct grpc_transport_op {
gpr_slice *goaway_message;
/** set the callback for accepting new streams;
this is a permanent callback, unlike the other one-shot closures */
void (*set_accept_stream)(void *user_data, grpc_transport *transport, const void *server_data);
void (*set_accept_stream)(void *user_data, grpc_transport *transport,
const void *server_data);
void *set_accept_stream_user_data;
/** add this transport to a pollset */
grpc_pollset *bind_pollset;
@ -154,10 +155,12 @@ char *grpc_transport_stream_op_string(grpc_transport_stream_op *op);
stream - the stream on which to send the operations. This must be
non-NULL and previously initialized by the same transport.
op - a grpc_transport_stream_op specifying the op to perform */
void grpc_transport_perform_stream_op(grpc_transport *transport, grpc_stream *stream,
grpc_transport_stream_op *op);
void grpc_transport_perform_stream_op(grpc_transport *transport,
grpc_stream *stream,
grpc_transport_stream_op *op);
void grpc_transport_perform_op(grpc_transport *transport, grpc_transport_op *op);
void grpc_transport_perform_op(grpc_transport *transport,
grpc_transport_op *op);
/* Send a ping on a transport

@ -48,7 +48,7 @@ typedef struct grpc_transport_vtable {
/* implementation of grpc_transport_perform_stream_op */
void (*perform_stream_op)(grpc_transport *self, grpc_stream *stream,
grpc_transport_stream_op *op);
grpc_transport_stream_op *op);
/* implementation of grpc_transport_perform_op */
void (*perform_op)(grpc_transport *self, grpc_transport_op *op);

@ -63,14 +63,14 @@ static void done_write(void *arg, grpc_endpoint_cb_status status) {
gpr_event_set(&a->done_write, (void *)1);
}
static void server_setup_transport(
void *ts, grpc_transport *transport, grpc_mdctx *mdctx) {
static void server_setup_transport(void *ts, grpc_transport *transport,
grpc_mdctx *mdctx) {
thd_args *a = ts;
static grpc_channel_filter const *extra_filters[] = {
&grpc_http_server_filter};
grpc_server_setup_transport(a->server, transport, extra_filters,
GPR_ARRAY_SIZE(extra_filters), mdctx,
grpc_server_get_channel_args(a->server));
GPR_ARRAY_SIZE(extra_filters), mdctx,
grpc_server_get_channel_args(a->server));
}
void grpc_run_bad_client_test(grpc_bad_client_server_side_validator validator,
@ -105,8 +105,7 @@ void grpc_run_bad_client_test(grpc_bad_client_server_side_validator validator,
a.validator = validator;
grpc_server_register_completion_queue(a.server, a.cq);
grpc_server_start(a.server);
transport = grpc_create_chttp2_transport(NULL, sfd.server,
NULL, 0, mdctx, 0);
transport = grpc_create_chttp2_transport(NULL, sfd.server, NULL, 0, mdctx, 0);
server_setup_transport(&a, transport, mdctx);
/* Bind everything into the same pollset */

@ -39,7 +39,7 @@
#include <grpc/support/log.h>
#include "test/core/util/test_config.h"
static void channel_init_func(grpc_channel_element *elem,grpc_channel *master,
static void channel_init_func(grpc_channel_element *elem, grpc_channel *master,
const grpc_channel_args *args,
grpc_mdctx *metadata_context, int is_first,
int is_last) {
@ -75,8 +75,9 @@ static void channel_func(grpc_channel_element *elem, grpc_transport_op *op) {
static void test_create_channel_stack(void) {
const grpc_channel_filter filter = {
call_func, channel_func, sizeof(int), call_init_func, call_destroy_func,
sizeof(int), channel_init_func, channel_destroy_func, "some_test_filter"};
call_func, channel_func, sizeof(int),
call_init_func, call_destroy_func, sizeof(int),
channel_init_func, channel_destroy_func, "some_test_filter"};
const grpc_channel_filter *filters = &filter;
grpc_channel_stack *channel_stack;
grpc_call_stack *call_stack;

@ -55,14 +55,14 @@
/* chttp2 transport that is immediately available (used for testing
connected_channel without a client_channel */
static void server_setup_transport(
void *ts, grpc_transport *transport, grpc_mdctx *mdctx) {
static void server_setup_transport(void *ts, grpc_transport *transport,
grpc_mdctx *mdctx) {
grpc_end2end_test_fixture *f = ts;
static grpc_channel_filter const *extra_filters[] = {
&grpc_http_server_filter};
grpc_server_setup_transport(f->server, transport, extra_filters,
GPR_ARRAY_SIZE(extra_filters), mdctx,
grpc_server_get_channel_args(f->server));
GPR_ARRAY_SIZE(extra_filters), mdctx,
grpc_server_get_channel_args(f->server));
}
typedef struct {
@ -70,8 +70,8 @@ typedef struct {
grpc_channel_args *client_args;
} sp_client_setup;
static void client_setup_transport(
void *ts, grpc_transport *transport, grpc_mdctx *mdctx) {
static void client_setup_transport(void *ts, grpc_transport *transport,
grpc_mdctx *mdctx) {
sp_client_setup *cs = ts;
const grpc_channel_filter *filters[] = {&grpc_http_client_filter,
@ -82,8 +82,8 @@ static void client_setup_transport(
cs->f->client = channel;
grpc_connected_channel_bind_transport(
grpc_channel_get_channel_stack(channel), transport);
grpc_connected_channel_bind_transport(grpc_channel_get_channel_stack(channel),
transport);
}
static grpc_end2end_test_fixture chttp2_create_fixture_socketpair(
@ -108,8 +108,8 @@ static void chttp2_init_client_socketpair(grpc_end2end_test_fixture *f,
sp_client_setup cs;
cs.client_args = client_args;
cs.f = f;
transport = grpc_create_chttp2_transport(client_args,
sfd->client, NULL, 0, mdctx, 1);
transport =
grpc_create_chttp2_transport(client_args, sfd->client, NULL, 0, mdctx, 1);
client_setup_transport(&cs, transport, mdctx);
GPR_ASSERT(f->client);
}
@ -123,8 +123,8 @@ static void chttp2_init_server_socketpair(grpc_end2end_test_fixture *f,
f->server = grpc_server_create_from_filters(NULL, 0, server_args);
grpc_server_register_completion_queue(f->server, f->cq);
grpc_server_start(f->server);
transport = grpc_create_chttp2_transport(server_args,
sfd->server, NULL, 0, mdctx, 0);
transport =
grpc_create_chttp2_transport(server_args, sfd->server, NULL, 0, mdctx, 0);
server_setup_transport(f, transport, mdctx);
}

@ -55,14 +55,14 @@
/* chttp2 transport that is immediately available (used for testing
connected_channel without a client_channel */
static void server_setup_transport(
void *ts, grpc_transport *transport, grpc_mdctx *mdctx) {
static void server_setup_transport(void *ts, grpc_transport *transport,
grpc_mdctx *mdctx) {
grpc_end2end_test_fixture *f = ts;
static grpc_channel_filter const *extra_filters[] = {
&grpc_http_server_filter};
grpc_server_setup_transport(f->server, transport, extra_filters,
GPR_ARRAY_SIZE(extra_filters), mdctx,
grpc_server_get_channel_args(f->server));
GPR_ARRAY_SIZE(extra_filters), mdctx,
grpc_server_get_channel_args(f->server));
}
typedef struct {
@ -70,8 +70,8 @@ typedef struct {
grpc_channel_args *client_args;
} sp_client_setup;
static void client_setup_transport(
void *ts, grpc_transport *transport, grpc_mdctx *mdctx) {
static void client_setup_transport(void *ts, grpc_transport *transport,
grpc_mdctx *mdctx) {
sp_client_setup *cs = ts;
const grpc_channel_filter *filters[] = {&grpc_http_client_filter,
@ -82,8 +82,8 @@ static void client_setup_transport(
cs->f->client = channel;
grpc_connected_channel_bind_transport(
grpc_channel_get_channel_stack(channel), transport);
grpc_connected_channel_bind_transport(grpc_channel_get_channel_stack(channel),
transport);
}
static grpc_end2end_test_fixture chttp2_create_fixture_socketpair(
@ -108,8 +108,8 @@ static void chttp2_init_client_socketpair(grpc_end2end_test_fixture *f,
sp_client_setup cs;
cs.client_args = client_args;
cs.f = f;
transport = grpc_create_chttp2_transport(client_args,
sfd->client, NULL, 0, mdctx, 1);
transport =
grpc_create_chttp2_transport(client_args, sfd->client, NULL, 0, mdctx, 1);
client_setup_transport(&cs, transport, mdctx);
GPR_ASSERT(f->client);
}
@ -123,8 +123,8 @@ static void chttp2_init_server_socketpair(grpc_end2end_test_fixture *f,
f->server = grpc_server_create_from_filters(NULL, 0, server_args);
grpc_server_register_completion_queue(f->server, f->cq);
grpc_server_start(f->server);
transport = grpc_create_chttp2_transport(server_args,
sfd->server, NULL, 0, mdctx, 0);
transport =
grpc_create_chttp2_transport(server_args, sfd->server, NULL, 0, mdctx, 0);
server_setup_transport(f, transport, mdctx);
}

@ -56,14 +56,14 @@
/* chttp2 transport that is immediately available (used for testing
connected_channel without a client_channel */
static void server_setup_transport(
void *ts, grpc_transport *transport, grpc_mdctx *mdctx) {
static void server_setup_transport(void *ts, grpc_transport *transport,
grpc_mdctx *mdctx) {
grpc_end2end_test_fixture *f = ts;
static grpc_channel_filter const *extra_filters[] = {
&grpc_http_server_filter};
grpc_server_setup_transport(f->server, transport, extra_filters,
GPR_ARRAY_SIZE(extra_filters), mdctx,
grpc_server_get_channel_args(f->server));
GPR_ARRAY_SIZE(extra_filters), mdctx,
grpc_server_get_channel_args(f->server));
}
typedef struct {
@ -71,8 +71,8 @@ typedef struct {
grpc_channel_args *client_args;
} sp_client_setup;
static void client_setup_transport(
void *ts, grpc_transport *transport, grpc_mdctx *mdctx) {
static void client_setup_transport(void *ts, grpc_transport *transport,
grpc_mdctx *mdctx) {
sp_client_setup *cs = ts;
const grpc_channel_filter *filters[] = {&grpc_http_client_filter,
@ -83,8 +83,8 @@ static void client_setup_transport(
cs->f->client = channel;
grpc_connected_channel_bind_transport(
grpc_channel_get_channel_stack(channel), transport);
grpc_connected_channel_bind_transport(grpc_channel_get_channel_stack(channel),
transport);
}
static grpc_end2end_test_fixture chttp2_create_fixture_socketpair(
@ -109,8 +109,8 @@ static void chttp2_init_client_socketpair(grpc_end2end_test_fixture *f,
sp_client_setup cs;
cs.client_args = client_args;
cs.f = f;
transport = grpc_create_chttp2_transport(client_args,
sfd->client, NULL, 0, mdctx, 1);
transport =
grpc_create_chttp2_transport(client_args, sfd->client, NULL, 0, mdctx, 1);
client_setup_transport(&cs, transport, mdctx);
GPR_ASSERT(f->client);
}
@ -124,8 +124,8 @@ static void chttp2_init_server_socketpair(grpc_end2end_test_fixture *f,
f->server = grpc_server_create_from_filters(NULL, 0, server_args);
grpc_server_register_completion_queue(f->server, f->cq);
grpc_server_start(f->server);
transport = grpc_create_chttp2_transport(server_args,
sfd->server, NULL, 0, mdctx, 0);
transport =
grpc_create_chttp2_transport(server_args, sfd->server, NULL, 0, mdctx, 0);
server_setup_transport(f, transport, mdctx);
}

@ -49,7 +49,7 @@ int main(int argc, char **argv) {
grpc_server_register_completion_queue(server, cq2);
grpc_server_start(server);
grpc_server_shutdown_and_notify(server, cq2, NULL);
grpc_completion_queue_next(cq2, gpr_inf_future); /* cue queue hang */
grpc_completion_queue_next(cq2, gpr_inf_future); /* cue queue hang */
grpc_completion_queue_shutdown(cq1);
grpc_completion_queue_shutdown(cq2);
grpc_completion_queue_next(cq1, gpr_inf_future);

@ -40,9 +40,9 @@
#include "src/core/iomgr/iomgr.h"
int main(int argc, char **argv) {
int i;
struct rlimit rlim;
grpc_endpoint_pair p;
int i;
struct rlimit rlim;
grpc_endpoint_pair p;
grpc_test_init(argc, argv);
grpc_iomgr_init();
@ -53,9 +53,9 @@ int main(int argc, char **argv) {
GPR_ASSERT(0 == setrlimit(RLIMIT_NOFILE, &rlim));
for (i = 0; i < 100; i++) {
p = grpc_iomgr_create_endpoint_pair("test", 1);
grpc_endpoint_destroy(p.client);
grpc_endpoint_destroy(p.server);
p = grpc_iomgr_create_endpoint_pair("test", 1);
grpc_endpoint_destroy(p.client);
grpc_endpoint_destroy(p.server);
}
grpc_iomgr_shutdown();

Loading…
Cancel
Save