Initial fixes: we now send a request

pull/357/head
Craig Tiller 10 years ago
parent ee2d702555
commit 62ac155a68
  1. 155
      src/core/surface/call.c
  2. 17
      src/core/surface/call.h
  3. 21
      src/core/surface/channel.c

@ -50,7 +50,7 @@
typedef struct {
size_t md_out_count;
size_t md_out_capacity;
grpc_mdelem **md_out;
grpc_metadata *md_out;
grpc_byte_buffer *msg_out;
/* input buffers */
@ -203,8 +203,8 @@ static void finish_ioreq_op(grpc_call *call, grpc_ioreq_op op,
? NULL
: TOMBSTONE_MASTER;
}
master->on_complete(call, status, master->user_data);
}
master->on_complete(call, status, master->user_data);
}
}
@ -237,22 +237,42 @@ static void finish_finish_step(void *pc, grpc_op_error error) {
}
}
/* Completion callback installed as done_cb on the GRPC_SEND_START call op
   (see where op.done_cb is set to finish_start_step with op.user_data =
   call).  On success it records the send-initial-metadata ioreq as complete
   and advances the master request to its next step; there is no error
   recovery path yet, so any failure logs and aborts the process. */
static void finish_start_step(void *pc, grpc_op_error error) {
/* pc is the grpc_call* that was stashed in op.user_data. */
grpc_call *call = pc;
if (error == GRPC_OP_OK) {
finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, GRPC_OP_OK);
/* NOTE(review): start_next_step_and_unlock releases call->mu on every
   path, which implies this callback is invoked with call->mu held --
   confirm the locking contract with the transport layer. */
start_next_step_and_unlock(
call, call->requests[GRPC_IOREQ_SEND_INITIAL_METADATA].master);
} else {
/* Error handling for a failed SEND_START is intentionally unimplemented;
   fail fast rather than continue with inconsistent request state. */
gpr_log(GPR_ERROR, "not implemented");
abort();
}
}
static void start_next_step_and_unlock(grpc_call *call, reqinfo *master) {
reqinfo *requests = call->requests;
grpc_byte_buffer *send_message = NULL;
size_t i;
gpr_uint32 incomplete = master->need_mask & ~master->complete_mask;
grpc_call_op op;
gpr_uint32 incomplete;
gpr_uint8 send_initial_metadata = 0;
gpr_uint8 send_trailing_metadata = 0;
gpr_uint8 send_blocked = 0;
gpr_uint8 send_finished = 0;
gpr_uint8 completed;
if (!IS_LIVE_MASTER(master)) {
gpr_mu_unlock(&call->mu);
return;
}
incomplete = master->need_mask & ~master->complete_mask;
if (!send_blocked &&
OP_IN_MASK(GRPC_IOREQ_SEND_INITIAL_METADATA, incomplete)) {
send_initial_metadata = 1;
finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, GRPC_OP_OK);
master->complete_mask |= 1 << GRPC_IOREQ_SEND_INITIAL_METADATA;
send_blocked = 1;
}
if (!send_blocked && OP_IN_MASK(GRPC_IOREQ_SEND_MESSAGES, incomplete)) {
@ -262,29 +282,15 @@ static void start_next_step_and_unlock(grpc_call *call, reqinfo *master) {
call->write_index++;
}
if (!send_blocked && (OP_IN_MASK(GRPC_IOREQ_SEND_CLOSE, incomplete))) {
send_finished = 1;
send_blocked = 1;
}
if (!send_blocked &&
OP_IN_MASK(GRPC_IOREQ_SEND_TRAILING_METADATA, incomplete)) {
send_trailing_metadata = 1;
finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, GRPC_OP_OK);
}
completed = !send_blocked && master->complete_mask == master->need_mask;
if (completed) {
master->on_complete(call, GRPC_OP_OK, master->user_data);
for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) {
if (call->requests[i].master == master) {
call->requests[i].master =
(i == GRPC_IOREQ_SEND_MESSAGES || i == GRPC_IOREQ_RECV_MESSAGES)
? NULL
: TOMBSTONE_MASTER;
}
}
if (!send_blocked && (OP_IN_MASK(GRPC_IOREQ_SEND_CLOSE, incomplete))) {
send_finished = 1;
send_blocked = 1;
}
gpr_mu_unlock(&call->mu);
@ -299,25 +305,20 @@ static void start_next_step_and_unlock(grpc_call *call, reqinfo *master) {
(const gpr_uint8 *)md->value,
md->value_length));
}
}
if (send_message) {
grpc_call_op op;
op.type = GRPC_SEND_MESSAGE;
op.type = GRPC_SEND_START;
op.dir = GRPC_CALL_DOWN;
op.flags = 0;
op.data.message = send_message;
op.done_cb = finish_write_step;
op.done_cb = finish_start_step;
op.user_data = call;
grpc_call_execute_op(call, &op);
}
if (send_finished) {
grpc_call_op op;
op.type = GRPC_SEND_FINISH;
if (send_message) {
op.type = GRPC_SEND_MESSAGE;
op.dir = GRPC_CALL_DOWN;
op.flags = 0;
op.done_cb = finish_finish_step;
op.data.message = send_message;
op.done_cb = finish_write_step;
op.user_data = call;
grpc_call_execute_op(call, &op);
}
@ -333,6 +334,16 @@ static void start_next_step_and_unlock(grpc_call *call, reqinfo *master) {
md->value_length));
}
}
if (send_finished) {
grpc_call_op op;
op.type = GRPC_SEND_FINISH;
op.dir = GRPC_CALL_DOWN;
op.flags = 0;
op.done_cb = finish_finish_step;
op.user_data = call;
grpc_call_execute_op(call, &op);
}
}
static grpc_call_error start_ioreq_error(grpc_call *call,
@ -489,9 +500,13 @@ void grpc_call_execute_op(grpc_call *call, grpc_call_op *op) {
elem->filter->call_op(elem, NULL, op);
}
void grpc_call_add_mdelem(grpc_call *call, grpc_mdelem *mdelem,
gpr_uint32 flags) {
legacy_state *ls = get_legacy_state(call);
grpc_call_error grpc_call_add_metadata(grpc_call *call, grpc_metadata *metadata,
gpr_uint32 flags) {
legacy_state *ls;
grpc_metadata *mdout;
gpr_mu_lock(&call->mu);
ls = get_legacy_state(call);
if (ls->md_out_count == ls->md_out_capacity) {
ls->md_out_capacity =
@ -499,16 +514,14 @@ void grpc_call_add_mdelem(grpc_call *call, grpc_mdelem *mdelem,
ls->md_out =
gpr_realloc(ls->md_out, sizeof(grpc_mdelem *) * ls->md_out_capacity);
}
ls->md_out[ls->md_out_count++] = mdelem;
}
mdout = &ls->md_out[ls->md_out_count++];
mdout->key = gpr_strdup(metadata->key);
mdout->value = gpr_malloc(metadata->value_length);
mdout->value_length = metadata->value_length;
memcpy(mdout->value, metadata->value, metadata->value_length);
gpr_mu_unlock(&call->mu);
grpc_call_error grpc_call_add_metadata(grpc_call *call, grpc_metadata *metadata,
gpr_uint32 flags) {
grpc_call_add_mdelem(
call, grpc_mdelem_from_string_and_buffer(
call->metadata_context, metadata->key,
(gpr_uint8 *)metadata->value, metadata->value_length),
flags);
return GRPC_CALL_OK;
}
@ -531,7 +544,6 @@ static void finish_status(grpc_call *call, grpc_op_error status, void *tag) {
static void finish_recv_metadata(grpc_call *call, grpc_op_error status,
void *tag) {
grpc_ioreq reqs[2];
legacy_state *ls;
gpr_mu_lock(&call->mu);
@ -540,24 +552,50 @@ static void finish_recv_metadata(grpc_call *call, grpc_op_error status,
grpc_cq_end_client_metadata_read(call->cq, tag, call, do_nothing, NULL,
ls->md_in.count, ls->md_in.metadata);
} else {
grpc_cq_end_client_metadata_read(call->cq, tag, call, do_nothing, NULL, 0,
NULL);
}
gpr_mu_unlock(&call->mu);
}
static void finish_send_metadata(grpc_call *call, grpc_op_error status,
void *metadata_read_tag) {
grpc_ioreq reqs[2];
legacy_state *ls;
if (status == GRPC_OP_OK) {
/* Initially I thought about refactoring so that I could acquire this mutex
only
once, and then I remembered this API surface is deprecated and I moved
on. */
gpr_mu_lock(&call->mu);
ls = get_legacy_state(call);
reqs[0].op = GRPC_IOREQ_RECV_INITIAL_METADATA;
reqs[0].data.recv_metadata = &ls->md_in;
GPR_ASSERT(GRPC_CALL_OK == start_ioreq_and_unlock(call, reqs, 1,
finish_recv_metadata,
metadata_read_tag));
gpr_mu_lock(&call->mu);
ls = get_legacy_state(call);
reqs[0].op = GRPC_IOREQ_RECV_TRAILING_METADATA;
reqs[0].data.recv_metadata = &ls->trail_md_in;
reqs[1].op = GRPC_IOREQ_RECV_STATUS;
reqs[1].data.recv_status = &ls->status_in;
if (GRPC_CALL_OK != start_ioreq_and_unlock(call, reqs, GPR_ARRAY_SIZE(reqs),
finish_status,
ls->finished_tag)) {
grpc_cq_end_finished(call->cq, ls->finished_tag, call, do_nothing, NULL,
GRPC_STATUS_UNKNOWN,
"Failed to start reading status", NULL, 0);
}
GPR_ASSERT(GRPC_CALL_OK !=
start_ioreq_and_unlock(call, reqs, GPR_ARRAY_SIZE(reqs),
finish_status, ls->finished_tag));
} else {
gpr_mu_unlock(&call->mu);
grpc_cq_end_client_metadata_read(call->cq, tag, call, do_nothing, NULL, 0,
NULL);
gpr_mu_lock(&call->mu);
ls = get_legacy_state(call);
grpc_cq_end_client_metadata_read(call->cq, metadata_read_tag, call,
do_nothing, NULL, 0, NULL);
grpc_cq_end_finished(call->cq, ls->finished_tag, call, do_nothing, NULL,
GRPC_STATUS_UNKNOWN, "Failed to read initial metadata",
NULL, 0);
gpr_mu_unlock(&call->mu);
}
}
@ -575,9 +613,10 @@ grpc_call_error grpc_call_invoke(grpc_call *call, grpc_completion_queue *cq,
err = bind_cq(call, cq);
if (err != GRPC_CALL_OK) return err;
req.op = GRPC_IOREQ_RECV_INITIAL_METADATA;
req.data.recv_metadata = &ls->md_in;
return start_ioreq_and_unlock(call, &req, 1, finish_recv_metadata,
req.op = GRPC_IOREQ_SEND_INITIAL_METADATA;
req.data.send_metadata.count = ls->md_out_count;
req.data.send_metadata.metadata = ls->md_out;
return start_ioreq_and_unlock(call, &req, 1, finish_send_metadata,
metadata_read_tag);
}

@ -38,8 +38,9 @@
#include "src/core/channel/metadata_buffer.h"
#include <grpc/grpc.h>
typedef void (*grpc_ioreq_completion_func)(grpc_call *call, grpc_op_error status,
void *user_data);
typedef void (*grpc_ioreq_completion_func)(grpc_call *call,
grpc_op_error status,
void *user_data);
grpc_call *grpc_call_create(grpc_channel *channel,
const void *server_transport_data);
@ -51,14 +52,15 @@ void grpc_call_internal_unref(grpc_call *call);
the completion queue/surface layer */
void grpc_call_recv_metadata(grpc_call_element *surface_element,
grpc_mdelem *md);
void grpc_call_recv_message(
grpc_call_element *surface_element, grpc_byte_buffer *message);
void grpc_call_recv_message(grpc_call_element *surface_element,
grpc_byte_buffer *message);
void grpc_call_read_closed(grpc_call_element *surface_element);
void grpc_call_stream_closed(grpc_call_element *surface_element);
void grpc_call_execute_op(grpc_call *call, grpc_call_op *op);
grpc_call_error grpc_call_start_ioreq_and_call_back(grpc_call *call, const grpc_ioreq *reqs,
size_t nreqs, grpc_ioreq_completion_func on_complete, void *user_data);
grpc_call_error grpc_call_start_ioreq_and_call_back(
grpc_call *call, const grpc_ioreq *reqs, size_t nreqs,
grpc_ioreq_completion_func on_complete, void *user_data);
/* Called when it's known that the initial batch of metadata is complete on the
client side (must not be called on the server) */
@ -73,7 +75,4 @@ grpc_call_stack *grpc_call_get_call_stack(grpc_call *call);
/* Given the top call_element, get the call object. */
grpc_call *grpc_call_from_top_element(grpc_call_element *surface_element);
void grpc_call_add_mdelem(grpc_call *call, grpc_mdelem *mdelem,
gpr_uint32 flags);
#endif /* __GRPC_INTERNAL_SURFACE_CALL_H__ */

@ -51,7 +51,7 @@ struct grpc_channel {
grpc_mdstr *authority_string;
};
#define CHANNEL_STACK_FROM_CHANNEL(c) ((grpc_channel_stack *)((c) + 1))
#define CHANNEL_STACK_FROM_CHANNEL(c) ((grpc_channel_stack *)((c)+1))
grpc_channel *grpc_channel_create_from_filters(
const grpc_channel_filter **filters, size_t num_filters,
@ -74,12 +74,13 @@ grpc_channel *grpc_channel_create_from_filters(
static void do_nothing(void *ignored, grpc_op_error error) {}
grpc_call *grpc_channel_create_call_old(grpc_channel *channel, const char *method,
const char *host,
gpr_timespec absolute_deadline) {
grpc_call *grpc_channel_create_call_old(grpc_channel *channel,
const char *method, const char *host,
gpr_timespec absolute_deadline) {
grpc_call *call;
grpc_mdelem *path_mdelem;
grpc_mdelem *authority_mdelem;
grpc_call_op op;
if (!channel->is_client) {
gpr_log(GPR_ERROR, "Cannot create a call on the server.");
@ -95,16 +96,22 @@ grpc_call *grpc_channel_create_call_old(grpc_channel *channel, const char *metho
path_mdelem = grpc_mdelem_from_metadata_strings(
channel->metadata_context, channel->path_string,
grpc_mdstr_from_string(channel->metadata_context, method));
grpc_call_add_mdelem(call, path_mdelem, 0);
op.type = GRPC_SEND_METADATA;
op.dir = GRPC_CALL_DOWN;
op.flags = 0;
op.data.metadata = path_mdelem;
op.done_cb = do_nothing;
op.user_data = NULL;
grpc_call_execute_op(call, &op);
grpc_mdstr_ref(channel->authority_string);
authority_mdelem = grpc_mdelem_from_metadata_strings(
channel->metadata_context, channel->authority_string,
grpc_mdstr_from_string(channel->metadata_context, host));
grpc_call_add_mdelem(call, authority_mdelem, 0);
op.data.metadata = authority_mdelem;
grpc_call_execute_op(call, &op);
if (0 != gpr_time_cmp(absolute_deadline, gpr_inf_future)) {
grpc_call_op op;
op.type = GRPC_SEND_DEADLINE;
op.dir = GRPC_CALL_DOWN;
op.flags = 0;

Loading…
Cancel
Save