From f8d01f3834593dc134168464a4dab25d11d9065a Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Thu, 27 Apr 2017 17:28:43 -0700 Subject: [PATCH] Intercept compression when workaround is active --- .../workaround_cronet_compression_filter.c | 56 +++++++++++-------- 1 file changed, 33 insertions(+), 23 deletions(-) diff --git a/src/core/ext/filters/workarounds/workaround_cronet_compression_filter.c b/src/core/ext/filters/workarounds/workaround_cronet_compression_filter.c index 6d00900ccc4..8dbf4c2b0a0 100644 --- a/src/core/ext/filters/workarounds/workaround_cronet_compression_filter.c +++ b/src/core/ext/filters/workarounds/workaround_cronet_compression_filter.c @@ -37,8 +37,8 @@ #include #include "src/core/ext/filters/workarounds/workaround_utils.h" -#include "src/core/lib/surface/channel_init.h" #include "src/core/lib/channel/channel_stack_builder.h" +#include "src/core/lib/surface/channel_init.h" #include "src/core/lib/transport/metadata.h" typedef struct call_data { @@ -47,7 +47,7 @@ typedef struct call_data { // call our next_recv_initial_metadata_ready member after handling it. grpc_closure recv_initial_metadata_ready; // Used by recv_initial_metadata_ready. - grpc_metadata_batch *recv_initial_metadata; + grpc_metadata_batch* recv_initial_metadata; // Original recv_initial_metadata_ready callback, invoked after our own. grpc_closure* next_recv_initial_metadata_ready; @@ -59,9 +59,9 @@ typedef struct channel_data { } channel_data; // Find the user agent metadata element in the batch -static bool get_user_agent_mdelem(const grpc_metadata_batch *batch, - grpc_mdelem *md) { - grpc_linked_mdelem *t = batch->list.head; +static bool get_user_agent_mdelem(const grpc_metadata_batch* batch, + grpc_mdelem* md) { + grpc_linked_mdelem* t = batch->list.head; while (t != NULL) { *md = t->md; if (grpc_slice_eq(GRPC_MDKEY(*md), GRPC_MDSTR_USER_AGENT)) { @@ -74,16 +74,17 @@ static bool get_user_agent_mdelem(const grpc_metadata_batch *batch, } // Callback invoked when we receive an initial metadata. -static void recv_initial_metadata_ready(grpc_exec_ctx* exec_ctx, void* user_data, - grpc_error* error) { +static void recv_initial_metadata_ready(grpc_exec_ctx* exec_ctx, + void* user_data, grpc_error* error) { grpc_call_element* elem = user_data; call_data* calld = elem->call_data; if (GRPC_ERROR_NONE == error) { grpc_mdelem md; if (get_user_agent_mdelem(calld->recv_initial_metadata, &md)) { - grpc_user_agent_md *user_agent_md = grpc_parse_user_agent(md); - if (user_agent_md->workaround_active[GRPC_WORKAROUND_ID_CRONET_COMPRESSION]) { + grpc_user_agent_md* user_agent_md = grpc_parse_user_agent(md); + if (user_agent_md + ->workaround_active[GRPC_WORKAROUND_ID_CRONET_COMPRESSION]) { calld->workaround_active = true; } // Remove with caching @@ -92,7 +93,8 @@ static void recv_initial_metadata_ready(grpc_exec_ctx* exec_ctx, void* user_data } // Invoke the next callback. - grpc_closure_run(exec_ctx, calld->next_recv_initial_metadata_ready, GRPC_ERROR_REF(error)); + grpc_closure_run(exec_ctx, calld->next_recv_initial_metadata_ready, + GRPC_ERROR_REF(error)); } // Start transport stream op. @@ -105,8 +107,17 @@ static void start_transport_stream_op_batch( if (op->recv_initial_metadata) { calld->next_recv_initial_metadata_ready = op->payload->recv_initial_metadata.recv_initial_metadata_ready; - op->payload->recv_initial_metadata.recv_initial_metadata_ready = &calld->recv_initial_metadata_ready; - calld->recv_initial_metadata = op->payload->recv_initial_metadata.recv_initial_metadata; + op->payload->recv_initial_metadata.recv_initial_metadata_ready = + &calld->recv_initial_metadata_ready; + calld->recv_initial_metadata = + op->payload->recv_initial_metadata.recv_initial_metadata; + } + + if (op->send_message) { + /* Send message happens after client's user-agent (initial metadata) is received, so workaround_active must be set already */ + if (calld->workaround_active) { + op->payload->send_message.send_message->flags |= GRPC_WRITE_NO_COMPRESS; + } } // Chain to the next filter. @@ -120,7 +131,8 @@ static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx, call_data* calld = elem->call_data; calld->next_recv_initial_metadata_ready = NULL; calld->workaround_active = false; - grpc_closure_init(&calld->recv_initial_metadata_ready, recv_initial_metadata_ready, elem, + grpc_closure_init(&calld->recv_initial_metadata_ready, + recv_initial_metadata_ready, elem, grpc_schedule_on_exec_ctx); return GRPC_ERROR_NONE; } @@ -148,12 +160,12 @@ static bool parse_user_agent(grpc_mdelem md) { const char cronet_specifier[] = "cronet_http"; const size_t cronet_specifier_len = sizeof(cronet_specifier) - 1; - char *user_agent_str = grpc_slice_to_c_string(GRPC_MDVALUE(md)); + char* user_agent_str = grpc_slice_to_c_string(GRPC_MDVALUE(md)); bool grpc_objc_specifier_seen = false; bool cronet_specifier_seen = false; char *major_version = user_agent_str, *minor_version; - char *head = strtok(user_agent_str, " "); + char* head = strtok(user_agent_str, " "); while (head != NULL) { if (!grpc_objc_specifier_seen && 0 == strncmp(head, grpc_objc_specifier, grpc_objc_specifier_len)) { @@ -173,8 +185,7 @@ static bool parse_user_agent(grpc_mdelem md) { } gpr_free(user_agent_str); - return (grpc_objc_specifier_seen && - cronet_specifier_seen && + return (grpc_objc_specifier_seen && cronet_specifier_seen && (atol(major_version) < 1 || (atol(major_version) == 1 && atol(minor_version) <= 3))); } @@ -193,9 +204,8 @@ const grpc_channel_filter grpc_workaround_cronet_compression_filter = { grpc_channel_next_get_info, "workaround_cronet_compression"}; -static bool register_workaround_cronet_compression(grpc_exec_ctx* exec_ctx, - grpc_channel_stack_builder* builder, - void* arg) { +static bool register_workaround_cronet_compression( + grpc_exec_ctx* exec_ctx, grpc_channel_stack_builder* builder, void* arg) { grpc_register_workaround(GRPC_WORKAROUND_ID_CRONET_COMPRESSION, parse_user_agent); return grpc_channel_stack_builder_prepend_filter( @@ -203,9 +213,9 @@ static bool register_workaround_cronet_compression(grpc_exec_ctx* exec_ctx, } void grpc_workaround_cronet_compression_filter_init(void) { - grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL, - GRPC_WORKAROUND_PRIORITY_HIGH, - register_workaround_cronet_compression, NULL); + grpc_channel_init_register_stage( + GRPC_SERVER_CHANNEL, GRPC_WORKAROUND_PRIORITY_HIGH, + register_workaround_cronet_compression, NULL); } void grpc_workaround_cronet_compression_filter_shutdown(void) {}