Fix read path

Branch: pull/357/head
Craig Tiller committed 10 years ago
parent 4069b682b3 · commit 2e10357eba
1 changed file: src/core/surface/call.c (55 changed lines)

@@ -104,6 +104,7 @@ struct grpc_call {
   gpr_uint8 got_status_code;
   gpr_uint8 sending;
   gpr_uint8 num_completed_requests;
+  gpr_uint8 need_more_data;
   reqinfo requests[GRPC_IOREQ_OP_COUNT];
   completed_request completed_requests[GRPC_IOREQ_OP_COUNT];
@@ -222,8 +223,11 @@ static void unlock(grpc_call *call) {
   send_action sa = SEND_NOTHING;
   completed_request completed_requests[GRPC_IOREQ_OP_COUNT];
   int num_completed_requests = call->num_completed_requests;
+  int need_more_data = call->need_more_data;
   int i;
+  call->need_more_data = 0;
   if (num_completed_requests != 0) {
     memcpy(completed_requests, call->completed_requests,
            sizeof(completed_requests));
@@ -241,6 +245,10 @@ static void unlock(grpc_call *call) {
   gpr_mu_unlock(&call->mu);
+  if (need_more_data) {
+    request_more_data(call);
+  }
   if (sa != SEND_NOTHING) {
     enact_send_action(call, sa);
   }
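
Note: this hunk snapshots and clears call->need_more_data while still holding call->mu, and only calls request_more_data() after gpr_mu_unlock(), so the read request never runs under the call lock. A minimal standalone sketch of that "latch under the lock, act after unlock" pattern (pthread-based; call_like, fetch_more, and unlock_path are illustrative names, not grpc API):

    #include <pthread.h>
    #include <stdio.h>

    typedef struct {
      pthread_mutex_t mu;
      int need_more_data;  /* latched under mu, consumed on the unlock path */
    } call_like;

    /* Stands in for request_more_data(): it may re-enter code that takes the
     * lock, so it must only run once mu has been released. */
    static void fetch_more(call_like *c) {
      printf("requesting more data for %p\n", (void *)c);
    }

    static void unlock_path(call_like *c) {
      int need_more_data;
      pthread_mutex_lock(&c->mu);
      need_more_data = c->need_more_data;  /* snapshot the latch */
      c->need_more_data = 0;               /* and clear it under the lock */
      pthread_mutex_unlock(&c->mu);
      if (need_more_data) {
        fetch_more(c);  /* lock no longer held */
      }
    }

    int main(void) {
      call_like c = {PTHREAD_MUTEX_INITIALIZER, 1};
      unlock_path(&c);
      return 0;
    }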
@@ -487,7 +495,6 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
   reqinfo *master = NULL;
   reqinfo *requests = call->requests;
   grpc_ioreq_data data;
-  gpr_uint8 have_send_closed = 0;
   for (i = 0; i < nreqs; i++) {
     op = reqs[i].op;
@@ -511,26 +518,6 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
     have_ops |= 1 << op;
     data = reqs[i].data;
-    switch (op) {
-      default:
-        break;
-      case GRPC_IOREQ_RECV_MESSAGES:
-        data.recv_messages->count = 0;
-        if (call->buffered_messages.count > 0) {
-          SWAP(grpc_byte_buffer_array, *data.recv_messages,
-               call->buffered_messages);
-          precomplete |= 1 << op;
-          abort();
-        }
-        break;
-      case GRPC_IOREQ_SEND_MESSAGES:
-        call->write_index = 0;
-        break;
-      case GRPC_IOREQ_SEND_CLOSE:
-        have_send_closed = 1;
-        break;
-    }
     requests[op].state = REQ_READY;
     requests[op].data = data;
     requests[op].master = master;
@@ -542,14 +529,32 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
   master->on_complete = completion;
   master->user_data = user_data;
-  if (have_send_closed) {
-    if (requests[GRPC_IOREQ_SEND_MESSAGES].state == REQ_INITIAL) {
-      requests[GRPC_IOREQ_SEND_MESSAGES].state = REQ_DONE;
+  for (i = 0; i < nreqs; i++) {
+    op = reqs[i].op;
+    switch (op) {
+      default:
+        break;
+      case GRPC_IOREQ_RECV_MESSAGES:
+        data.recv_messages->count = 0;
+        if (call->buffered_messages.count > 0) {
+          SWAP(grpc_byte_buffer_array, *data.recv_messages,
+               call->buffered_messages);
+          finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGES, GRPC_OP_OK);
+        }
+        break;
+      case GRPC_IOREQ_SEND_MESSAGES:
+        call->write_index = 0;
+        break;
+      case GRPC_IOREQ_SEND_CLOSE:
+        if (requests[GRPC_IOREQ_SEND_MESSAGES].state == REQ_INITIAL) {
+          requests[GRPC_IOREQ_SEND_MESSAGES].state = REQ_DONE;
+        }
+        break;
     }
   }
   if (OP_IN_MASK(GRPC_IOREQ_RECV_MESSAGES, have_ops & ~precomplete)) {
-    request_more_data(call);
+    call->need_more_data = 1;
   }
   return GRPC_CALL_OK;
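
Note: the per-op switch now runs as a second pass, after master, on_complete, and user_data are filled in, so a receive that can be satisfied from call->buffered_messages completes through finish_ioreq_op() instead of the old precomplete/abort() path, and the read request is deferred via call->need_more_data so unlock() issues it outside the lock. A rough standalone sketch of the two-pass idea (all names below are illustrative, not grpc API):

    #include <stdio.h>

    enum { OP_RECV, OP_SEND, OP_COUNT };

    typedef struct {
      int ready;
      void (*on_complete)(int op);
    } req_slot;

    static void batch_done(int op) { printf("op %d completed\n", op); }

    /* Record every request (pass 1) before running per-op work that may
     * complete synchronously (pass 2), so an immediately satisfiable op
     * sees a fully initialized completion callback. */
    static void start_batch(req_slot slots[OP_COUNT], const int *ops, int n,
                            void (*completion)(int op), int recv_buffered) {
      int i;
      for (i = 0; i < n; i++) {  /* pass 1: record */
        slots[ops[i]].ready = 1;
        slots[ops[i]].on_complete = completion;
      }
      for (i = 0; i < n; i++) {  /* pass 2: act */
        if (ops[i] == OP_RECV && recv_buffered) {
          slots[ops[i]].on_complete(ops[i]);  /* safe: batch fully recorded */
        }
      }
    }

    int main(void) {
      req_slot slots[OP_COUNT] = {{0}};
      int ops[] = {OP_SEND, OP_RECV};
      start_batch(slots, ops, 2, batch_done, 1);
      return 0;
    }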
