From 996e95b4ad4e401dc8b04ae9cc092bd167307e10 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Tue, 27 Sep 2016 10:55:37 -0700 Subject: [PATCH] Use per-method max message size limits. --- src/core/lib/channel/message_size_filter.c | 68 +++++++++++++++++----- 1 file changed, 55 insertions(+), 13 deletions(-) diff --git a/src/core/lib/channel/message_size_filter.c b/src/core/lib/channel/message_size_filter.c index a254fe3997f..e34e365d2cc 100644 --- a/src/core/lib/channel/message_size_filter.c +++ b/src/core/lib/channel/message_size_filter.c @@ -39,11 +39,14 @@ #include #include "src/core/lib/channel/channel_args.h" +#include "src/core/ext/client_config/method_config.h" // The protobuf library will (by default) start warning at 100 megs. #define DEFAULT_MAX_MESSAGE_LENGTH (4 * 1024 * 1024) typedef struct call_data { + int max_send_size; + int max_recv_size; // Receive closures are chained: we inject this closure as the // recv_message_ready up-call on transport_stream_op, and remember to // call our next_recv_message_ready member after handling it. @@ -55,8 +58,10 @@ typedef struct call_data { } call_data; typedef struct channel_data { - size_t max_send_size; - size_t max_recv_size; + int max_send_size; + int max_recv_size; + // Method config table. + grpc_method_config_table* method_config_table; } channel_data; // Callback invoked when we receive a message. Here we check the max @@ -65,13 +70,12 @@ static void recv_message_ready(grpc_exec_ctx* exec_ctx, void* user_data, grpc_error* error) { grpc_call_element* elem = user_data; call_data* calld = elem->call_data; - channel_data* chand = elem->channel_data; - if (*calld->recv_message != NULL && - (*calld->recv_message)->length > chand->max_recv_size) { + if (*calld->recv_message != NULL && calld->max_recv_size >= 0 && + (*calld->recv_message)->length > (size_t)calld->max_recv_size) { char* message_string; gpr_asprintf( - &message_string, "Received message larger than max (%u vs. %lu)", - (*calld->recv_message)->length, (unsigned long)chand->max_recv_size); + &message_string, "Received message larger than max (%u vs. %d)", + (*calld->recv_message)->length, calld->max_recv_size); gpr_slice message = gpr_slice_from_copied_string(message_string); gpr_free(message_string); grpc_call_element_send_close_with_message( @@ -86,13 +90,12 @@ static void start_transport_stream_op(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, grpc_transport_stream_op* op) { call_data* calld = elem->call_data; - channel_data* chand = elem->channel_data; // Check max send message size. - if (op->send_message != NULL && - op->send_message->length > chand->max_send_size) { + if (op->send_message != NULL && calld->max_send_size >= 0 && + op->send_message->length > (size_t)calld->max_send_size) { char* message_string; - gpr_asprintf(&message_string, "Sent message larger than max (%u vs. %lu)", - op->send_message->length, (unsigned long)chand->max_send_size); + gpr_asprintf(&message_string, "Sent message larger than max (%u vs. %d)", + op->send_message->length, calld->max_send_size); gpr_slice message = gpr_slice_from_copied_string(message_string); gpr_free(message_string); grpc_call_element_send_close_with_message( @@ -112,9 +115,37 @@ static void start_transport_stream_op(grpc_exec_ctx* exec_ctx, static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, grpc_call_element_args* args) { + channel_data* chand = elem->channel_data; call_data* calld = elem->call_data; calld->next_recv_message_ready = NULL; grpc_closure_init(&calld->recv_message_ready, recv_message_ready, elem); + // Get max sizes from channel data, then merge in per-method config values. + // Note: Per-method config is only available on the client, so we + // apply the max request size to the send limit and the max response + // size to the receive limit. + calld->max_send_size = chand->max_send_size; + calld->max_recv_size = chand->max_recv_size; + if (chand->method_config_table != NULL) { + grpc_method_config* method_config = + grpc_method_config_table_get_method_config(chand->method_config_table, + args->path); + if (method_config != NULL) { + int32_t* max_request_message_bytes = + grpc_method_config_get_max_request_message_bytes(method_config); + if (max_request_message_bytes != NULL && + (*max_request_message_bytes < calld->max_send_size || + calld->max_send_size < 0)) { + calld->max_send_size = *max_request_message_bytes; + } + int32_t* max_response_message_bytes = + grpc_method_config_get_max_response_message_bytes(method_config); + if (max_response_message_bytes != NULL && + (*max_response_message_bytes < calld->max_recv_size || + calld->max_recv_size < 0)) { + calld->max_recv_size = *max_response_message_bytes; + } + } + } return GRPC_ERROR_NONE; } @@ -145,11 +176,22 @@ static void init_channel_elem(grpc_exec_ctx* exec_ctx, &args->channel_args->args[i], options); } } + // Get method config table from channel args. + const grpc_arg *channel_arg = grpc_channel_args_find( + args->channel_args, GRPC_ARG_SERVICE_CONFIG); + if (channel_arg != NULL) { + GPR_ASSERT(channel_arg->type == GRPC_ARG_POINTER); + chand->method_config_table = grpc_method_config_table_ref( + (grpc_method_config_table *)channel_arg->value.pointer.p); + } } // Destructor for channel_data. static void destroy_channel_elem(grpc_exec_ctx* exec_ctx, - grpc_channel_element* elem) {} + grpc_channel_element* elem) { + channel_data* chand = elem->channel_data; + grpc_method_config_table_unref(chand->method_config_table); +} const grpc_channel_filter grpc_message_size_filter = { start_transport_stream_op,