Intercept compression when workaround is active

pull/10849/head
Muxi Yan 8 years ago
parent f63afec89d
commit f8d01f3834
  1. 56
      src/core/ext/filters/workarounds/workaround_cronet_compression_filter.c

@ -37,8 +37,8 @@
#include <grpc/support/alloc.h> #include <grpc/support/alloc.h>
#include "src/core/ext/filters/workarounds/workaround_utils.h" #include "src/core/ext/filters/workarounds/workaround_utils.h"
#include "src/core/lib/surface/channel_init.h"
#include "src/core/lib/channel/channel_stack_builder.h" #include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/surface/channel_init.h"
#include "src/core/lib/transport/metadata.h" #include "src/core/lib/transport/metadata.h"
typedef struct call_data { typedef struct call_data {
@ -47,7 +47,7 @@ typedef struct call_data {
// call our next_recv_initial_metadata_ready member after handling it. // call our next_recv_initial_metadata_ready member after handling it.
grpc_closure recv_initial_metadata_ready; grpc_closure recv_initial_metadata_ready;
// Used by recv_initial_metadata_ready. // Used by recv_initial_metadata_ready.
grpc_metadata_batch *recv_initial_metadata; grpc_metadata_batch* recv_initial_metadata;
// Original recv_initial_metadata_ready callback, invoked after our own. // Original recv_initial_metadata_ready callback, invoked after our own.
grpc_closure* next_recv_initial_metadata_ready; grpc_closure* next_recv_initial_metadata_ready;
@ -59,9 +59,9 @@ typedef struct channel_data {
} channel_data; } channel_data;
// Find the user agent metadata element in the batch // Find the user agent metadata element in the batch
static bool get_user_agent_mdelem(const grpc_metadata_batch *batch, static bool get_user_agent_mdelem(const grpc_metadata_batch* batch,
grpc_mdelem *md) { grpc_mdelem* md) {
grpc_linked_mdelem *t = batch->list.head; grpc_linked_mdelem* t = batch->list.head;
while (t != NULL) { while (t != NULL) {
*md = t->md; *md = t->md;
if (grpc_slice_eq(GRPC_MDKEY(*md), GRPC_MDSTR_USER_AGENT)) { if (grpc_slice_eq(GRPC_MDKEY(*md), GRPC_MDSTR_USER_AGENT)) {
@ -74,16 +74,17 @@ static bool get_user_agent_mdelem(const grpc_metadata_batch *batch,
} }
// Callback invoked when we receive an initial metadata. // Callback invoked when we receive an initial metadata.
static void recv_initial_metadata_ready(grpc_exec_ctx* exec_ctx, void* user_data, static void recv_initial_metadata_ready(grpc_exec_ctx* exec_ctx,
grpc_error* error) { void* user_data, grpc_error* error) {
grpc_call_element* elem = user_data; grpc_call_element* elem = user_data;
call_data* calld = elem->call_data; call_data* calld = elem->call_data;
if (GRPC_ERROR_NONE == error) { if (GRPC_ERROR_NONE == error) {
grpc_mdelem md; grpc_mdelem md;
if (get_user_agent_mdelem(calld->recv_initial_metadata, &md)) { if (get_user_agent_mdelem(calld->recv_initial_metadata, &md)) {
grpc_user_agent_md *user_agent_md = grpc_parse_user_agent(md); grpc_user_agent_md* user_agent_md = grpc_parse_user_agent(md);
if (user_agent_md->workaround_active[GRPC_WORKAROUND_ID_CRONET_COMPRESSION]) { if (user_agent_md
->workaround_active[GRPC_WORKAROUND_ID_CRONET_COMPRESSION]) {
calld->workaround_active = true; calld->workaround_active = true;
} }
// Remove with caching // Remove with caching
@ -92,7 +93,8 @@ static void recv_initial_metadata_ready(grpc_exec_ctx* exec_ctx, void* user_data
} }
// Invoke the next callback. // Invoke the next callback.
grpc_closure_run(exec_ctx, calld->next_recv_initial_metadata_ready, GRPC_ERROR_REF(error)); grpc_closure_run(exec_ctx, calld->next_recv_initial_metadata_ready,
GRPC_ERROR_REF(error));
} }
// Start transport stream op. // Start transport stream op.
@ -105,8 +107,17 @@ static void start_transport_stream_op_batch(
if (op->recv_initial_metadata) { if (op->recv_initial_metadata) {
calld->next_recv_initial_metadata_ready = calld->next_recv_initial_metadata_ready =
op->payload->recv_initial_metadata.recv_initial_metadata_ready; op->payload->recv_initial_metadata.recv_initial_metadata_ready;
op->payload->recv_initial_metadata.recv_initial_metadata_ready = &calld->recv_initial_metadata_ready; op->payload->recv_initial_metadata.recv_initial_metadata_ready =
calld->recv_initial_metadata = op->payload->recv_initial_metadata.recv_initial_metadata; &calld->recv_initial_metadata_ready;
calld->recv_initial_metadata =
op->payload->recv_initial_metadata.recv_initial_metadata;
}
if (op->send_message) {
/* Send message happens after client's user-agent (initial metadata) is received, so workaround_active must be set already */
if (calld->workaround_active) {
op->payload->send_message.send_message->flags |= GRPC_WRITE_NO_COMPRESS;
}
} }
// Chain to the next filter. // Chain to the next filter.
@ -120,7 +131,8 @@ static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx,
call_data* calld = elem->call_data; call_data* calld = elem->call_data;
calld->next_recv_initial_metadata_ready = NULL; calld->next_recv_initial_metadata_ready = NULL;
calld->workaround_active = false; calld->workaround_active = false;
grpc_closure_init(&calld->recv_initial_metadata_ready, recv_initial_metadata_ready, elem, grpc_closure_init(&calld->recv_initial_metadata_ready,
recv_initial_metadata_ready, elem,
grpc_schedule_on_exec_ctx); grpc_schedule_on_exec_ctx);
return GRPC_ERROR_NONE; return GRPC_ERROR_NONE;
} }
@ -148,12 +160,12 @@ static bool parse_user_agent(grpc_mdelem md) {
const char cronet_specifier[] = "cronet_http"; const char cronet_specifier[] = "cronet_http";
const size_t cronet_specifier_len = sizeof(cronet_specifier) - 1; const size_t cronet_specifier_len = sizeof(cronet_specifier) - 1;
char *user_agent_str = grpc_slice_to_c_string(GRPC_MDVALUE(md)); char* user_agent_str = grpc_slice_to_c_string(GRPC_MDVALUE(md));
bool grpc_objc_specifier_seen = false; bool grpc_objc_specifier_seen = false;
bool cronet_specifier_seen = false; bool cronet_specifier_seen = false;
char *major_version = user_agent_str, *minor_version; char *major_version = user_agent_str, *minor_version;
char *head = strtok(user_agent_str, " "); char* head = strtok(user_agent_str, " ");
while (head != NULL) { while (head != NULL) {
if (!grpc_objc_specifier_seen && if (!grpc_objc_specifier_seen &&
0 == strncmp(head, grpc_objc_specifier, grpc_objc_specifier_len)) { 0 == strncmp(head, grpc_objc_specifier, grpc_objc_specifier_len)) {
@ -173,8 +185,7 @@ static bool parse_user_agent(grpc_mdelem md) {
} }
gpr_free(user_agent_str); gpr_free(user_agent_str);
return (grpc_objc_specifier_seen && return (grpc_objc_specifier_seen && cronet_specifier_seen &&
cronet_specifier_seen &&
(atol(major_version) < 1 || (atol(major_version) < 1 ||
(atol(major_version) == 1 && atol(minor_version) <= 3))); (atol(major_version) == 1 && atol(minor_version) <= 3)));
} }
@ -193,9 +204,8 @@ const grpc_channel_filter grpc_workaround_cronet_compression_filter = {
grpc_channel_next_get_info, grpc_channel_next_get_info,
"workaround_cronet_compression"}; "workaround_cronet_compression"};
static bool register_workaround_cronet_compression(grpc_exec_ctx* exec_ctx, static bool register_workaround_cronet_compression(
grpc_channel_stack_builder* builder, grpc_exec_ctx* exec_ctx, grpc_channel_stack_builder* builder, void* arg) {
void* arg) {
grpc_register_workaround(GRPC_WORKAROUND_ID_CRONET_COMPRESSION, grpc_register_workaround(GRPC_WORKAROUND_ID_CRONET_COMPRESSION,
parse_user_agent); parse_user_agent);
return grpc_channel_stack_builder_prepend_filter( return grpc_channel_stack_builder_prepend_filter(
@ -203,9 +213,9 @@ static bool register_workaround_cronet_compression(grpc_exec_ctx* exec_ctx,
} }
void grpc_workaround_cronet_compression_filter_init(void) { void grpc_workaround_cronet_compression_filter_init(void) {
grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL, grpc_channel_init_register_stage(
GRPC_WORKAROUND_PRIORITY_HIGH, GRPC_SERVER_CHANNEL, GRPC_WORKAROUND_PRIORITY_HIGH,
register_workaround_cronet_compression, NULL); register_workaround_cronet_compression, NULL);
} }
void grpc_workaround_cronet_compression_filter_shutdown(void) {} void grpc_workaround_cronet_compression_filter_shutdown(void) {}

Loading…
Cancel
Save