pull/357/head
Craig Tiller 10 years ago
parent 11e3c929bc
commit fa8f4016ee
  1. 105
      src/core/surface/call.c

@ -59,8 +59,6 @@ typedef struct {
grpc_recv_status status_in;
size_t msg_in_read_idx;
grpc_byte_buffer_array msg_in;
void *finished_tag;
} legacy_state;
typedef enum { REQ_INITIAL = 0, REQ_READY, REQ_DONE } req_state;
@ -558,6 +556,7 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
for (i = 0; i < nreqs; i++) {
op = reqs[i].op;
data = reqs[i].data;
switch (op) {
default:
break;
@ -688,9 +687,9 @@ grpc_call_error grpc_call_add_metadata(grpc_call *call, grpc_metadata *metadata,
if (ls->md_out_count == ls->md_out_capacity) {
ls->md_out_capacity =
GPR_MAX(ls->md_out_count * 3 / 2, ls->md_out_count + 8);
GPR_MAX(ls->md_out_capacity * 3 / 2, ls->md_out_capacity + 8);
ls->md_out =
gpr_realloc(ls->md_out, sizeof(grpc_mdelem *) * ls->md_out_capacity);
gpr_realloc(ls->md_out, sizeof(grpc_metadata) * ls->md_out_capacity);
}
mdout = &ls->md_out[ls->md_out_count++];
mdout->key = gpr_strdup(metadata->key);
@ -703,18 +702,6 @@ grpc_call_error grpc_call_add_metadata(grpc_call *call, grpc_metadata *metadata,
return GRPC_CALL_OK;
}
static void publish_failed_finished(grpc_call *call, grpc_status_code status,
const char *desc) {
grpc_status_code status_code;
const char *details;
set_status_code(call, STATUS_FROM_FAILED_OP, status);
set_status_details(call, STATUS_FROM_FAILED_OP,
grpc_mdstr_from_string(call->metadata_context, desc));
get_final_status(call, &status_code, &details);
grpc_cq_end_finished(call->cq, get_legacy_state(call)->finished_tag, call,
do_nothing, NULL, status_code, details, NULL, 0);
}
static void finish_status(grpc_call *call, grpc_op_error status, void *tag) {
legacy_state *ls;
@ -722,13 +709,9 @@ static void finish_status(grpc_call *call, grpc_op_error status, void *tag) {
ls = get_legacy_state(call);
unlock(call);
if (status == GRPC_OP_OK) {
grpc_cq_end_finished(call->cq, tag, call, do_nothing, NULL,
ls->status_in.status, ls->status_in.details,
ls->trail_md_in.metadata, ls->trail_md_in.count);
} else {
publish_failed_finished(call, GRPC_STATUS_UNKNOWN, "Read status failed");
}
grpc_cq_end_finished(call->cq, tag, call, do_nothing, NULL,
ls->status_in.status, ls->status_in.details,
ls->trail_md_in.metadata, ls->trail_md_in.count);
}
static void finish_recv_metadata(grpc_call *call, grpc_op_error status,
@ -748,56 +731,44 @@ static void finish_recv_metadata(grpc_call *call, grpc_op_error status,
unlock(call);
}
static void finish_send_metadata(grpc_call *call, grpc_op_error status,
void *metadata_read_tag) {
grpc_ioreq reqs[2];
legacy_state *ls;
lock(call);
if (status == GRPC_OP_OK) {
ls = get_legacy_state(call);
reqs[0].op = GRPC_IOREQ_RECV_INITIAL_METADATA;
reqs[0].data.recv_metadata = &ls->md_in;
GPR_ASSERT(GRPC_CALL_OK == start_ioreq(call, reqs, 1, finish_recv_metadata,
metadata_read_tag));
ls = get_legacy_state(call);
reqs[0].op = GRPC_IOREQ_RECV_TRAILING_METADATA;
reqs[0].data.recv_metadata = &ls->trail_md_in;
reqs[1].op = GRPC_IOREQ_RECV_STATUS;
reqs[1].data.recv_status = &ls->status_in;
GPR_ASSERT(GRPC_CALL_OK ==
start_ioreq(call, reqs, 2, finish_status, ls->finished_tag));
} else {
ls = get_legacy_state(call);
grpc_cq_end_client_metadata_read(call->cq, metadata_read_tag, call,
do_nothing, NULL, 0, NULL);
publish_failed_finished(call, GRPC_STATUS_UNKNOWN,
"Failed to read initial metadata");
}
unlock(call);
}
static void finish_send_metadata(grpc_call *call, grpc_op_error status, void *tag) {}
grpc_call_error grpc_call_invoke(grpc_call *call, grpc_completion_queue *cq,
void *metadata_read_tag, void *finished_tag,
gpr_uint32 flags) {
grpc_ioreq req;
legacy_state *ls = get_legacy_state(call);
grpc_ioreq reqs[2];
legacy_state *ls;
grpc_call_error err;
grpc_cq_begin_op(cq, call, GRPC_CLIENT_METADATA_READ);
grpc_cq_begin_op(cq, call, GRPC_FINISHED);
lock(call);
ls = get_legacy_state(call);
err = bind_cq(call, cq);
if (err != GRPC_CALL_OK) return err;
get_legacy_state(call)->finished_tag = finished_tag;
req.op = GRPC_IOREQ_SEND_INITIAL_METADATA;
req.data.send_metadata.count = ls->md_out_count;
req.data.send_metadata.metadata = ls->md_out;
err = start_ioreq(call, &req, 1, finish_send_metadata, metadata_read_tag);
if (err != GRPC_CALL_OK) goto done;
reqs[0].op = GRPC_IOREQ_SEND_INITIAL_METADATA;
reqs[0].data.send_metadata.count = ls->md_out_count;
reqs[0].data.send_metadata.metadata = ls->md_out;
ls->md_out_count = 0;
err = start_ioreq(call, reqs, 1, finish_send_metadata, NULL);
if (err != GRPC_CALL_OK) goto done;
reqs[0].op = GRPC_IOREQ_RECV_INITIAL_METADATA;
reqs[0].data.recv_metadata = &ls->md_in;
err = start_ioreq(call, reqs, 1, finish_recv_metadata,
metadata_read_tag);
if (err != GRPC_CALL_OK) goto done;
reqs[0].op = GRPC_IOREQ_RECV_TRAILING_METADATA;
reqs[0].data.recv_metadata = &ls->trail_md_in;
reqs[1].op = GRPC_IOREQ_RECV_STATUS;
reqs[1].data.recv_status = &ls->status_in;
err = start_ioreq(call, reqs, 2, finish_status, finished_tag);
if (err != GRPC_CALL_OK) goto done;
done:
unlock(call);
return err;
}
@ -938,12 +909,14 @@ grpc_call_error grpc_call_start_write_status(grpc_call *call,
const char *details, void *tag) {
grpc_ioreq reqs[2];
grpc_call_error err;
legacy_state *ls;
grpc_cq_begin_op(call->cq, call, GRPC_FINISH_ACCEPTED);
lock(call);
ls = get_legacy_state(call);
reqs[0].op = GRPC_IOREQ_SEND_TRAILING_METADATA;
reqs[0].data.send_metadata.count = call->legacy_state->md_out_count;
reqs[0].data.send_metadata.metadata = call->legacy_state->md_out;
reqs[0].data.send_metadata.count = ls->md_out_count;
reqs[0].data.send_metadata.metadata = ls->md_out;
reqs[1].op = GRPC_IOREQ_SEND_CLOSE;
reqs[1].data.send_close.status = status;
reqs[1].data.send_close.details = details;
@ -1071,8 +1044,8 @@ void grpc_call_recv_metadata(grpc_call_element *elem, grpc_mdelem *md) {
.data.recv_metadata
: &call->buffered_initial_metadata;
} else {
dest = call->requests[GRPC_IOREQ_RECV_INITIAL_METADATA].state == REQ_READY
? call->requests[GRPC_IOREQ_RECV_INITIAL_METADATA]
dest = call->requests[GRPC_IOREQ_RECV_TRAILING_METADATA].state == REQ_READY
? call->requests[GRPC_IOREQ_RECV_TRAILING_METADATA]
.data.recv_metadata
: &call->buffered_trailing_metadata;
}

Loading…
Cancel
Save