Use per-method max message size limits.

pull/8303/head
Mark D. Roth 8 years ago
parent 6600645135
commit 996e95b4ad
  1. 68
      src/core/lib/channel/message_size_filter.c

@ -39,11 +39,14 @@
#include <grpc/support/string_util.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/ext/client_config/method_config.h"
// The protobuf library will (by default) start warning at 100 megs.
#define DEFAULT_MAX_MESSAGE_LENGTH (4 * 1024 * 1024)
typedef struct call_data {
int max_send_size;
int max_recv_size;
// Receive closures are chained: we inject this closure as the
// recv_message_ready up-call on transport_stream_op, and remember to
// call our next_recv_message_ready member after handling it.
@ -55,8 +58,10 @@ typedef struct call_data {
} call_data;
typedef struct channel_data {
size_t max_send_size;
size_t max_recv_size;
int max_send_size;
int max_recv_size;
// Method config table.
grpc_method_config_table* method_config_table;
} channel_data;
// Callback invoked when we receive a message. Here we check the max
@ -65,13 +70,12 @@ static void recv_message_ready(grpc_exec_ctx* exec_ctx, void* user_data,
grpc_error* error) {
grpc_call_element* elem = user_data;
call_data* calld = elem->call_data;
channel_data* chand = elem->channel_data;
if (*calld->recv_message != NULL &&
(*calld->recv_message)->length > chand->max_recv_size) {
if (*calld->recv_message != NULL && calld->max_recv_size >= 0 &&
(*calld->recv_message)->length > (size_t)calld->max_recv_size) {
char* message_string;
gpr_asprintf(
&message_string, "Received message larger than max (%u vs. %lu)",
(*calld->recv_message)->length, (unsigned long)chand->max_recv_size);
&message_string, "Received message larger than max (%u vs. %d)",
(*calld->recv_message)->length, calld->max_recv_size);
gpr_slice message = gpr_slice_from_copied_string(message_string);
gpr_free(message_string);
grpc_call_element_send_close_with_message(
@ -86,13 +90,12 @@ static void start_transport_stream_op(grpc_exec_ctx* exec_ctx,
grpc_call_element* elem,
grpc_transport_stream_op* op) {
call_data* calld = elem->call_data;
channel_data* chand = elem->channel_data;
// Check max send message size.
if (op->send_message != NULL &&
op->send_message->length > chand->max_send_size) {
if (op->send_message != NULL && calld->max_send_size >= 0 &&
op->send_message->length > (size_t)calld->max_send_size) {
char* message_string;
gpr_asprintf(&message_string, "Sent message larger than max (%u vs. %lu)",
op->send_message->length, (unsigned long)chand->max_send_size);
gpr_asprintf(&message_string, "Sent message larger than max (%u vs. %d)",
op->send_message->length, calld->max_send_size);
gpr_slice message = gpr_slice_from_copied_string(message_string);
gpr_free(message_string);
grpc_call_element_send_close_with_message(
@ -112,9 +115,37 @@ static void start_transport_stream_op(grpc_exec_ctx* exec_ctx,
static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx,
grpc_call_element* elem,
grpc_call_element_args* args) {
channel_data* chand = elem->channel_data;
call_data* calld = elem->call_data;
calld->next_recv_message_ready = NULL;
grpc_closure_init(&calld->recv_message_ready, recv_message_ready, elem);
// Get max sizes from channel data, then merge in per-method config values.
// Note: Per-method config is only available on the client, so we
// apply the max request size to the send limit and the max response
// size to the receive limit.
calld->max_send_size = chand->max_send_size;
calld->max_recv_size = chand->max_recv_size;
if (chand->method_config_table != NULL) {
grpc_method_config* method_config =
grpc_method_config_table_get_method_config(chand->method_config_table,
args->path);
if (method_config != NULL) {
int32_t* max_request_message_bytes =
grpc_method_config_get_max_request_message_bytes(method_config);
if (max_request_message_bytes != NULL &&
(*max_request_message_bytes < calld->max_send_size ||
calld->max_send_size < 0)) {
calld->max_send_size = *max_request_message_bytes;
}
int32_t* max_response_message_bytes =
grpc_method_config_get_max_response_message_bytes(method_config);
if (max_response_message_bytes != NULL &&
(*max_response_message_bytes < calld->max_recv_size ||
calld->max_recv_size < 0)) {
calld->max_recv_size = *max_response_message_bytes;
}
}
}
return GRPC_ERROR_NONE;
}
@ -145,11 +176,22 @@ static void init_channel_elem(grpc_exec_ctx* exec_ctx,
&args->channel_args->args[i], options);
}
}
// Get method config table from channel args.
const grpc_arg *channel_arg = grpc_channel_args_find(
args->channel_args, GRPC_ARG_SERVICE_CONFIG);
if (channel_arg != NULL) {
GPR_ASSERT(channel_arg->type == GRPC_ARG_POINTER);
chand->method_config_table = grpc_method_config_table_ref(
(grpc_method_config_table *)channel_arg->value.pointer.p);
}
}
// Destructor for channel_data.
static void destroy_channel_elem(grpc_exec_ctx* exec_ctx,
grpc_channel_element* elem) {}
grpc_channel_element* elem) {
channel_data* chand = elem->channel_data;
grpc_method_config_table_unref(chand->method_config_table);
}
const grpc_channel_filter grpc_message_size_filter = {
start_transport_stream_op,

Loading…
Cancel
Save