|
|
|
@ -43,6 +43,9 @@ |
|
|
|
|
#define EXPECTED_CONTENT_TYPE "application/grpc" |
|
|
|
|
#define EXPECTED_CONTENT_TYPE_LENGTH sizeof(EXPECTED_CONTENT_TYPE) - 1 |
|
|
|
|
|
|
|
|
|
/* default maximum size of payload eligable for GET request */ |
|
|
|
|
static const size_t kMaxPayloadSizeForGet = 2048; |
|
|
|
|
|
|
|
|
|
typedef struct call_data { |
|
|
|
|
grpc_linked_mdelem method; |
|
|
|
|
grpc_linked_mdelem scheme; |
|
|
|
@ -50,20 +53,39 @@ typedef struct call_data { |
|
|
|
|
grpc_linked_mdelem te_trailers; |
|
|
|
|
grpc_linked_mdelem content_type; |
|
|
|
|
grpc_linked_mdelem user_agent; |
|
|
|
|
grpc_linked_mdelem payload_bin; |
|
|
|
|
|
|
|
|
|
grpc_metadata_batch *recv_initial_metadata; |
|
|
|
|
uint8_t *payload_bytes; |
|
|
|
|
|
|
|
|
|
/* Vars to read data off of send_message */ |
|
|
|
|
grpc_transport_stream_op send_op; |
|
|
|
|
uint32_t send_length; |
|
|
|
|
uint32_t send_flags; |
|
|
|
|
gpr_slice incoming_slice; |
|
|
|
|
grpc_slice_buffer_stream replacement_stream; |
|
|
|
|
gpr_slice_buffer slices; |
|
|
|
|
/* flag that indicates that all slices of send_messages aren't availble */ |
|
|
|
|
bool send_message_blocked; |
|
|
|
|
|
|
|
|
|
/** Closure to call when finished with the hc_on_recv hook */ |
|
|
|
|
grpc_closure *on_done_recv; |
|
|
|
|
grpc_closure *on_complete; |
|
|
|
|
grpc_closure *post_send; |
|
|
|
|
|
|
|
|
|
/** Receive closures are chained: we inject this closure as the on_done_recv
|
|
|
|
|
up-call on transport_op, and remember to call our on_done_recv member |
|
|
|
|
after handling it. */ |
|
|
|
|
grpc_closure hc_on_recv; |
|
|
|
|
grpc_closure hc_on_complete; |
|
|
|
|
grpc_closure got_slice; |
|
|
|
|
grpc_closure send_done; |
|
|
|
|
} call_data; |
|
|
|
|
|
|
|
|
|
typedef struct channel_data { |
|
|
|
|
grpc_mdelem *static_scheme; |
|
|
|
|
grpc_mdelem *user_agent; |
|
|
|
|
size_t max_payload_size_for_get; |
|
|
|
|
} channel_data; |
|
|
|
|
|
|
|
|
|
typedef struct { |
|
|
|
@ -119,6 +141,24 @@ static void hc_on_recv(grpc_exec_ctx *exec_ctx, void *user_data, |
|
|
|
|
calld->on_done_recv->cb(exec_ctx, calld->on_done_recv->cb_arg, error); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void hc_on_complete(grpc_exec_ctx *exec_ctx, void *user_data, |
|
|
|
|
grpc_error *error) { |
|
|
|
|
grpc_call_element *elem = user_data; |
|
|
|
|
call_data *calld = elem->call_data; |
|
|
|
|
if (calld->payload_bytes) { |
|
|
|
|
gpr_free(calld->payload_bytes); |
|
|
|
|
calld->payload_bytes = NULL; |
|
|
|
|
} |
|
|
|
|
calld->on_complete->cb(exec_ctx, calld->on_complete->cb_arg, error); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void send_done(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) { |
|
|
|
|
grpc_call_element *elem = elemp; |
|
|
|
|
call_data *calld = elem->call_data; |
|
|
|
|
gpr_slice_buffer_reset_and_unref(&calld->slices); |
|
|
|
|
calld->post_send->cb(exec_ctx, calld->post_send->cb_arg, error); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static grpc_mdelem *client_strip_filter(void *user_data, grpc_mdelem *md) { |
|
|
|
|
/* eat the things we'd like to set ourselves */ |
|
|
|
|
if (md->key == GRPC_MDSTR_METHOD) return NULL; |
|
|
|
@ -129,22 +169,105 @@ static grpc_mdelem *client_strip_filter(void *user_data, grpc_mdelem *md) { |
|
|
|
|
return md; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void hc_mutate_op(grpc_call_element *elem, |
|
|
|
|
static void continue_send_message(grpc_exec_ctx *exec_ctx, |
|
|
|
|
grpc_call_element *elem) { |
|
|
|
|
call_data *calld = elem->call_data; |
|
|
|
|
uint8_t *wrptr = calld->payload_bytes; |
|
|
|
|
while (grpc_byte_stream_next(exec_ctx, calld->send_op.send_message, |
|
|
|
|
&calld->incoming_slice, ~(size_t)0, |
|
|
|
|
&calld->got_slice)) { |
|
|
|
|
memcpy(wrptr, GPR_SLICE_START_PTR(calld->incoming_slice), |
|
|
|
|
GPR_SLICE_LENGTH(calld->incoming_slice)); |
|
|
|
|
wrptr += GPR_SLICE_LENGTH(calld->incoming_slice); |
|
|
|
|
gpr_slice_buffer_add(&calld->slices, calld->incoming_slice); |
|
|
|
|
if (calld->send_length == calld->slices.length) { |
|
|
|
|
calld->send_message_blocked = false; |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) { |
|
|
|
|
grpc_call_element *elem = elemp; |
|
|
|
|
call_data *calld = elem->call_data; |
|
|
|
|
calld->send_message_blocked = false; |
|
|
|
|
gpr_slice_buffer_add(&calld->slices, calld->incoming_slice); |
|
|
|
|
if (calld->send_length == calld->slices.length) { |
|
|
|
|
/* Pass down the original send_message op that was blocked.*/ |
|
|
|
|
grpc_slice_buffer_stream_init(&calld->replacement_stream, &calld->slices, |
|
|
|
|
calld->send_flags); |
|
|
|
|
calld->send_op.send_message = &calld->replacement_stream.base; |
|
|
|
|
calld->post_send = calld->send_op.on_complete; |
|
|
|
|
calld->send_op.on_complete = &calld->send_done; |
|
|
|
|
grpc_call_next_op(exec_ctx, elem, &calld->send_op); |
|
|
|
|
} else { |
|
|
|
|
continue_send_message(exec_ctx, elem); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void hc_mutate_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, |
|
|
|
|
grpc_transport_stream_op *op) { |
|
|
|
|
/* grab pointers to our data from the call element */ |
|
|
|
|
call_data *calld = elem->call_data; |
|
|
|
|
channel_data *channeld = elem->channel_data; |
|
|
|
|
|
|
|
|
|
if (op->send_initial_metadata != NULL) { |
|
|
|
|
/* Decide which HTTP VERB to use. We use GET if the request is marked
|
|
|
|
|
cacheable, and the operation contains both initial metadata and send |
|
|
|
|
message, and the payload is below the size threshold, and all the data |
|
|
|
|
for this request is immediately available. */ |
|
|
|
|
grpc_mdelem *method = GRPC_MDELEM_METHOD_POST; |
|
|
|
|
calld->send_message_blocked = false; |
|
|
|
|
if ((op->send_initial_metadata_flags & |
|
|
|
|
GRPC_INITIAL_METADATA_CACHEABLE_REQUEST) && |
|
|
|
|
op->send_message != NULL && |
|
|
|
|
op->send_message->length < channeld->max_payload_size_for_get) { |
|
|
|
|
method = GRPC_MDELEM_METHOD_GET; |
|
|
|
|
calld->send_message_blocked = true; |
|
|
|
|
} else if (op->send_initial_metadata_flags & |
|
|
|
|
GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) { |
|
|
|
|
method = GRPC_MDELEM_METHOD_PUT; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* Attempt to read the data from send_message and create a header field. */ |
|
|
|
|
if (method == GRPC_MDELEM_METHOD_GET) { |
|
|
|
|
/* allocate memory to hold the entire payload */ |
|
|
|
|
calld->payload_bytes = gpr_malloc(op->send_message->length); |
|
|
|
|
GPR_ASSERT(calld->payload_bytes); |
|
|
|
|
|
|
|
|
|
/* read slices of send_message and copy into payload_bytes */ |
|
|
|
|
calld->send_op = *op; |
|
|
|
|
calld->send_length = op->send_message->length; |
|
|
|
|
calld->send_flags = op->send_message->flags; |
|
|
|
|
continue_send_message(exec_ctx, elem); |
|
|
|
|
|
|
|
|
|
if (calld->send_message_blocked == false) { |
|
|
|
|
/* when all the send_message data is available, then create a MDELEM and
|
|
|
|
|
append to headers */ |
|
|
|
|
grpc_mdelem *payload_bin = grpc_mdelem_from_metadata_strings( |
|
|
|
|
GRPC_MDSTR_GRPC_PAYLOAD_BIN, |
|
|
|
|
grpc_mdstr_from_buffer(calld->payload_bytes, |
|
|
|
|
op->send_message->length)); |
|
|
|
|
grpc_metadata_batch_add_tail(op->send_initial_metadata, |
|
|
|
|
&calld->payload_bin, payload_bin); |
|
|
|
|
calld->on_complete = op->on_complete; |
|
|
|
|
op->on_complete = &calld->hc_on_complete; |
|
|
|
|
op->send_message = NULL; |
|
|
|
|
} else { |
|
|
|
|
/* Not all data is available. Fall back to POST. */ |
|
|
|
|
gpr_log(GPR_DEBUG, |
|
|
|
|
"Request is marked Cacheable but not all data is available.\
|
|
|
|
|
Falling back to POST"); |
|
|
|
|
method = GRPC_MDELEM_METHOD_POST; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
grpc_metadata_batch_filter(op->send_initial_metadata, client_strip_filter, |
|
|
|
|
elem); |
|
|
|
|
/* Send : prefixed headers, which have to be before any application
|
|
|
|
|
layer headers. */ |
|
|
|
|
grpc_metadata_batch_add_head( |
|
|
|
|
op->send_initial_metadata, &calld->method, |
|
|
|
|
op->send_initial_metadata_flags & |
|
|
|
|
GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST |
|
|
|
|
? GRPC_MDELEM_METHOD_PUT |
|
|
|
|
: GRPC_MDELEM_METHOD_POST); |
|
|
|
|
grpc_metadata_batch_add_head(op->send_initial_metadata, &calld->method, |
|
|
|
|
method); |
|
|
|
|
grpc_metadata_batch_add_head(op->send_initial_metadata, &calld->scheme, |
|
|
|
|
channeld->static_scheme); |
|
|
|
|
grpc_metadata_batch_add_tail(op->send_initial_metadata, &calld->te_trailers, |
|
|
|
@ -169,9 +292,16 @@ static void hc_start_transport_op(grpc_exec_ctx *exec_ctx, |
|
|
|
|
grpc_transport_stream_op *op) { |
|
|
|
|
GPR_TIMER_BEGIN("hc_start_transport_op", 0); |
|
|
|
|
GRPC_CALL_LOG_OP(GPR_INFO, elem, op); |
|
|
|
|
hc_mutate_op(elem, op); |
|
|
|
|
hc_mutate_op(exec_ctx, elem, op); |
|
|
|
|
GPR_TIMER_END("hc_start_transport_op", 0); |
|
|
|
|
grpc_call_next_op(exec_ctx, elem, op); |
|
|
|
|
call_data *calld = elem->call_data; |
|
|
|
|
if (op->send_message != NULL && calld->send_message_blocked) { |
|
|
|
|
/* Don't forward the op. send_message contains slices that aren't ready
|
|
|
|
|
yet. The call will be forwarded by the op_complete of slice read call. |
|
|
|
|
*/ |
|
|
|
|
} else { |
|
|
|
|
grpc_call_next_op(exec_ctx, elem, op); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* Constructor for call_data */ |
|
|
|
@ -180,14 +310,23 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, |
|
|
|
|
grpc_call_element_args *args) { |
|
|
|
|
call_data *calld = elem->call_data; |
|
|
|
|
calld->on_done_recv = NULL; |
|
|
|
|
calld->on_complete = NULL; |
|
|
|
|
calld->payload_bytes = NULL; |
|
|
|
|
gpr_slice_buffer_init(&calld->slices); |
|
|
|
|
grpc_closure_init(&calld->hc_on_recv, hc_on_recv, elem); |
|
|
|
|
grpc_closure_init(&calld->hc_on_complete, hc_on_complete, elem); |
|
|
|
|
grpc_closure_init(&calld->got_slice, got_slice, elem); |
|
|
|
|
grpc_closure_init(&calld->send_done, send_done, elem); |
|
|
|
|
return GRPC_ERROR_NONE; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* Destructor for call_data */ |
|
|
|
|
static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, |
|
|
|
|
const grpc_call_final_info *final_info, |
|
|
|
|
void *ignored) {} |
|
|
|
|
void *ignored) { |
|
|
|
|
call_data *calld = elem->call_data; |
|
|
|
|
gpr_slice_buffer_destroy(&calld->slices); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static grpc_mdelem *scheme_from_args(const grpc_channel_args *args) { |
|
|
|
|
unsigned i; |
|
|
|
@ -210,6 +349,22 @@ static grpc_mdelem *scheme_from_args(const grpc_channel_args *args) { |
|
|
|
|
return GRPC_MDELEM_SCHEME_HTTP; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static size_t max_payload_size_from_args(const grpc_channel_args *args) { |
|
|
|
|
if (args != NULL) { |
|
|
|
|
for (size_t i = 0; i < args->num_args; ++i) { |
|
|
|
|
if (0 == strcmp(args->args[i].key, GRPC_ARG_MAX_PAYLOAD_SIZE_FOR_GET)) { |
|
|
|
|
if (args->args[i].type != GRPC_ARG_INTEGER) { |
|
|
|
|
gpr_log(GPR_ERROR, "%s: must be an integer", |
|
|
|
|
GRPC_ARG_MAX_PAYLOAD_SIZE_FOR_GET); |
|
|
|
|
} else { |
|
|
|
|
return (size_t)args->args[i].value.integer; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
return kMaxPayloadSizeForGet; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static grpc_mdstr *user_agent_from_args(const grpc_channel_args *args, |
|
|
|
|
const char *transport_name) { |
|
|
|
|
gpr_strvec v; |
|
|
|
@ -268,6 +423,8 @@ static void init_channel_elem(grpc_exec_ctx *exec_ctx, |
|
|
|
|
GPR_ASSERT(!args->is_last); |
|
|
|
|
GPR_ASSERT(args->optional_transport != NULL); |
|
|
|
|
chand->static_scheme = scheme_from_args(args->channel_args); |
|
|
|
|
chand->max_payload_size_for_get = |
|
|
|
|
max_payload_size_from_args(args->channel_args); |
|
|
|
|
chand->user_agent = grpc_mdelem_from_metadata_strings( |
|
|
|
|
GRPC_MDSTR_USER_AGENT, |
|
|
|
|
user_agent_from_args(args->channel_args, |
|
|
|
|