Introduce a new memory reclamation scheme for channel stacks

Allows the bottom member of the stack to schedule the actual deallocation, allowing a final transport lock to be entered when destroying a call.
pull/2286/head
Craig Tiller 9 years ago
parent a67a83ebde
commit 2c8063cc72
  1. 4
      src/core/census/grpc_filter.c
  2. 6
      src/core/channel/channel_stack.c
  3. 11
      src/core/channel/channel_stack.h
  4. 5
      src/core/channel/client_channel.c
  5. 4
      src/core/channel/compress_filter.c
  6. 13
      src/core/channel/connected_channel.c
  7. 4
      src/core/channel/http_client_filter.c
  8. 4
      src/core/channel/http_server_filter.c
  9. 6
      src/core/client_config/subchannel.c
  10. 4
      src/core/security/client_auth_filter.c
  11. 4
      src/core/security/server_auth_filter.c
  12. 3
      src/core/surface/call.c
  13. 12
      src/core/surface/lame_client.c
  14. 4
      src/core/surface/server.c
  15. 57
      src/core/transport/chttp2_transport.c
  16. 5
      src/core/transport/transport.c
  17. 2
      src/core/transport/transport.h
  18. 2
      src/core/transport/transport_impl.h
  19. 6
      test/core/channel/channel_stack_test.c

@ -134,7 +134,7 @@ static void client_init_call_elem(grpc_exec_ctx *exec_ctx,
}
static void client_destroy_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem) {
grpc_call_element *elem, void *ignored) {
call_data *d = elem->call_data;
GPR_ASSERT(d != NULL);
/* TODO(hongyu): record rpc client stats and census_rpc_end_op here */
@ -152,7 +152,7 @@ static void server_init_call_elem(grpc_exec_ctx *exec_ctx,
}
static void server_destroy_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem) {
grpc_call_element *elem, void *ignored) {
call_data *d = elem->call_data;
GPR_ASSERT(d != NULL);
/* TODO(hongyu): record rpc server stats and census_tracing_end_op here */

@ -213,14 +213,16 @@ void grpc_call_stack_ignore_set_pollset(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_pollset *pollset) {}
void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack) {
void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack,
void *and_free_memory) {
grpc_call_element *elems = CALL_ELEMS_FROM_STACK(stack);
size_t count = stack->count;
size_t i;
/* destroy per-filter data */
for (i = 0; i < count; i++) {
elems[i].filter->destroy_call_elem(exec_ctx, &elems[i]);
elems[i].filter->destroy_call_elem(exec_ctx, &elems[i],
i == count - 1 ? and_free_memory : NULL);
}
}

@ -104,8 +104,12 @@ typedef struct {
void (*set_pollset)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_pollset *pollset);
/* Destroy per call data.
The filter does not need to do any chaining */
void (*destroy_call_elem)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem);
The filter does not need to do any chaining.
The bottom filter of a stack will be passed a non-NULL pointer to
\a and_free_memory that should be passed to gpr_free when destruction
is complete. */
void (*destroy_call_elem)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
void *and_free_memory);
/* sizeof(per channel data) */
size_t sizeof_channel_data;
@ -223,7 +227,8 @@ void grpc_call_stack_set_pollset(grpc_exec_ctx *exec_ctx,
#endif
/* Destroy a call stack */
void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack);
void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack,
void *and_free_memory);
/* Ignore set pollset - used by filters to implement the set_pollset method
if they don't care about pollsets at all. Does nothing. */

@ -379,9 +379,10 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
}
/* Destructor for call_data */
static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem) {
static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
void *and_free_memory) {
grpc_subchannel_call_holder_destroy(exec_ctx, elem->call_data);
gpr_free(and_free_memory);
}
/* Constructor for channel_data */

@ -246,8 +246,8 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
}
/* Destructor for call_data */
static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem) {
static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
void *ignored) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
gpr_slice_buffer_destroy(&calld->slices);

@ -37,13 +37,13 @@
#include <stdio.h>
#include <string.h>
#include "src/core/support/string.h"
#include "src/core/transport/transport.h"
#include "src/core/profiling/timers.h"
#include <grpc/byte_buffer.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/slice_buffer.h>
#include "src/core/profiling/timers.h"
#include "src/core/support/string.h"
#include "src/core/transport/transport.h"
#define MAX_BUFFER_LENGTH 8192
@ -102,12 +102,13 @@ static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
}
/* Destructor for call_data */
static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem) {
static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
void *and_free_memory) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
grpc_transport_destroy_stream(exec_ctx, chand->transport,
TRANSPORT_STREAM_FROM_CALL_DATA(calld));
TRANSPORT_STREAM_FROM_CALL_DATA(calld),
and_free_memory);
}
/* Constructor for channel_data */

@ -151,8 +151,8 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
}
/* Destructor for call_data */
static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem) {}
static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
void *ignored) {}
static grpc_mdelem *scheme_from_args(const grpc_channel_args *args) {
unsigned i;

@ -212,8 +212,8 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
}
/* Destructor for call_data */
static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem) {}
static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
void *ignored) {}
/* Constructor for channel_data */
static void init_channel_elem(grpc_exec_ctx *exec_ctx,

@ -620,9 +620,9 @@ static void subchannel_call_destroy(grpc_exec_ctx *exec_ctx, void *call,
bool success) {
grpc_subchannel_call *c = call;
GPR_TIMER_BEGIN("grpc_subchannel_call_unref.destroy", 0);
grpc_call_stack_destroy(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c));
GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, c->connection, "subchannel_call");
gpr_free(c);
grpc_connected_subchannel *connection = c->connection;
grpc_call_stack_destroy(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c), c);
GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, connection, "subchannel_call");
GPR_TIMER_END("grpc_subchannel_call_unref.destroy", 0);
}

@ -277,8 +277,8 @@ static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
}
/* Destructor for call_data */
static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem) {
static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
void *ignored) {
call_data *calld = elem->call_data;
grpc_call_credentials_unref(calld->creds);
if (calld->host != NULL) {

@ -224,8 +224,8 @@ static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_pollset *pollset) {}
/* Destructor for call_data */
static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem) {}
static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
void *ignored) {}
/* Constructor for channel_data */
static void init_channel_elem(grpc_exec_ctx *exec_ctx,

@ -375,7 +375,6 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, bool success) {
if (c->receiving_stream != NULL) {
grpc_byte_stream_destroy(exec_ctx, c->receiving_stream);
}
grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c));
GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->channel, "call");
gpr_mu_destroy(&c->mu);
for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
@ -394,7 +393,7 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, bool success) {
if (c->cq) {
GRPC_CQ_INTERNAL_UNREF(c->cq, "bind");
}
gpr_free(c);
grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c), c);
GPR_TIMER_END("destroy_call", 0);
}

@ -37,13 +37,13 @@
#include <string.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include "src/core/channel/channel_stack.h"
#include "src/core/support/string.h"
#include "src/core/surface/api_trace.h"
#include "src/core/surface/channel.h"
#include "src/core/surface/call.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include "src/core/surface/channel.h"
typedef struct {
grpc_linked_mdelem status;
@ -104,8 +104,10 @@ static void lame_start_transport_op(grpc_exec_ctx *exec_ctx,
static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_call_element_args *args) {}
static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem) {}
static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
void *and_free_memory) {
gpr_free(and_free_memory);
}
static void init_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem,

@ -692,8 +692,8 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
server_ref(chand->server);
}
static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem) {
static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
void *ignored) {
channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;

@ -512,26 +512,20 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
return 0;
}
static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_stream *gs) {
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
static void destroy_stream_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s, void *arg) {
grpc_byte_stream *bs;
#if 0
int i;
GPR_TIMER_BEGIN("destroy_stream", 0);
gpr_mu_lock(&t->mu);
GPR_ASSERT((s->global.write_closed && s->global.read_closed) ||
s->global.id == 0);
GPR_ASSERT(!s->global.in_stream_map);
if (grpc_chttp2_unregister_stream(t, s) && t->global.sent_goaway) {
close_transport_locked(exec_ctx, t);
close_transport_locked(exec_ctx, t, NULL, NULL);
}
if (!t->parsing_active && s->global.id) {
if (!t->executor.parsing_active && s->global.id) {
GPR_ASSERT(grpc_chttp2_stream_map_find(&t->parsing_stream_map,
s->global.id) == NULL);
}
@ -539,11 +533,11 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_chttp2_list_remove_unannounced_incoming_window_available(&t->global,
&s->global);
grpc_chttp2_list_remove_stalled_by_transport(&t->global, &s->global);
#endif
int i;
/* TODO(ctiller): the remainder of this function could be done without the
global lock */
for (i = 0; i < STREAM_LIST_COUNT; i++) {
for (int i = 0; i < STREAM_LIST_COUNT; i++) {
if (s->included[i]) {
gpr_log(GPR_ERROR, "%s stream %d still included in list %d",
t->global.is_client ? "client" : "server", s->global.id, i);
@ -574,6 +568,17 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
UNREF_TRANSPORT(exec_ctx, t, "stream");
GPR_TIMER_END("destroy_stream", 0);
gpr_free(arg);
}
static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_stream *gs, void *and_free_memory) {
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
grpc_chttp2_run_with_global_lock(exec_ctx, t, s, destroy_stream_locked,
and_free_memory, 0);
}
grpc_chttp2_stream_parsing *grpc_chttp2_parsing_lookup_stream(
@ -1509,8 +1514,8 @@ static void parsing_action(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
GPR_TIMER_BEGIN("reading_action.parse", 0);
size_t i = 0;
for (; i < t->read_buffer.count &&
grpc_chttp2_perform_read(exec_ctx, &t->parsing,
t->read_buffer.slices[i]);
grpc_chttp2_perform_read(exec_ctx, &t->parsing,
t->read_buffer.slices[i]);
i++)
;
if (i != t->read_buffer.count) {
@ -1602,10 +1607,9 @@ static void connectivity_state_set(
grpc_connectivity_state state, const char *reason) {
GRPC_CHTTP2_IF_TRACING(
gpr_log(GPR_DEBUG, "set connectivity_state=%d", state));
grpc_connectivity_state_set(
exec_ctx,
&TRANSPORT_FROM_GLOBAL(transport_global)->channel_callback.state_tracker,
state, reason);
grpc_connectivity_state_set(exec_ctx, &TRANSPORT_FROM_GLOBAL(transport_global)
->channel_callback.state_tracker,
state, reason);
}
/*******************************************************************************
@ -1915,15 +1919,10 @@ static char *chttp2_get_peer(grpc_exec_ctx *exec_ctx, grpc_transport *t) {
return gpr_strdup(((grpc_chttp2_transport *)t)->peer_string);
}
static const grpc_transport_vtable vtable = {sizeof(grpc_chttp2_stream),
"chttp2",
init_stream,
set_pollset,
perform_stream_op,
perform_transport_op,
destroy_stream,
destroy_transport,
chttp2_get_peer};
static const grpc_transport_vtable vtable = {
sizeof(grpc_chttp2_stream), "chttp2", init_stream, set_pollset,
perform_stream_op, perform_transport_op, destroy_stream, destroy_transport,
chttp2_get_peer};
grpc_transport *grpc_create_chttp2_transport(
grpc_exec_ctx *exec_ctx, const grpc_channel_args *channel_args,

@ -133,8 +133,9 @@ void grpc_transport_set_pollset(grpc_exec_ctx *exec_ctx,
void grpc_transport_destroy_stream(grpc_exec_ctx *exec_ctx,
grpc_transport *transport,
grpc_stream *stream) {
transport->vtable->destroy_stream(exec_ctx, transport, stream);
grpc_stream *stream, void *and_free_memory) {
transport->vtable->destroy_stream(exec_ctx, transport, stream,
and_free_memory);
}
char *grpc_transport_get_peer(grpc_exec_ctx *exec_ctx,

@ -208,7 +208,7 @@ void grpc_transport_set_pollset(grpc_exec_ctx *exec_ctx,
caller, but any child memory must be cleaned up) */
void grpc_transport_destroy_stream(grpc_exec_ctx *exec_ctx,
grpc_transport *transport,
grpc_stream *stream);
grpc_stream *stream, void *and_free_memory);
void grpc_transport_stream_op_finish_with_failure(grpc_exec_ctx *exec_ctx,
grpc_transport_stream_op *op);

@ -63,7 +63,7 @@ typedef struct grpc_transport_vtable {
/* implementation of grpc_transport_destroy_stream */
void (*destroy_stream)(grpc_exec_ctx *exec_ctx, grpc_transport *self,
grpc_stream *stream);
grpc_stream *stream, void *and_free_memory);
/* implementation of grpc_transport_destroy */
void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_transport *self);

@ -62,8 +62,8 @@ static void call_init_func(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
static void channel_destroy_func(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem) {}
static void call_destroy_func(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem) {
static void call_destroy_func(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
void *ignored) {
++*(int *)(elem->channel_data);
}
@ -87,7 +87,7 @@ static void free_channel(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
}
static void free_call(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
grpc_call_stack_destroy(exec_ctx, arg);
grpc_call_stack_destroy(exec_ctx, arg, NULL);
gpr_free(arg);
}

Loading…
Cancel
Save