|
|
|
@ -554,18 +554,10 @@ static grpc_call_error start_ioreq_error(grpc_call *call, |
|
|
|
|
return ret; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void dump_req_state(const char *debug, const char *stage, grpc_call *call) { |
|
|
|
|
size_t i; |
|
|
|
|
for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) { |
|
|
|
|
gpr_log(GPR_DEBUG, "%p:%s:%s:%d:%d", call, debug, stage, i, call->requests[i].set); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs, |
|
|
|
|
size_t nreqs, |
|
|
|
|
grpc_ioreq_completion_func completion, |
|
|
|
|
void *user_data, |
|
|
|
|
const char *debug) { |
|
|
|
|
void *user_data) { |
|
|
|
|
size_t i; |
|
|
|
|
gpr_uint32 have_ops = 0; |
|
|
|
|
grpc_ioreq_op op; |
|
|
|
@ -574,8 +566,6 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs, |
|
|
|
|
grpc_ioreq_data data; |
|
|
|
|
gpr_uint8 set; |
|
|
|
|
|
|
|
|
|
dump_req_state(debug, "before", call); |
|
|
|
|
|
|
|
|
|
if (nreqs == 0) { |
|
|
|
|
return GRPC_CALL_OK; |
|
|
|
|
} |
|
|
|
@ -615,11 +605,21 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs, |
|
|
|
|
*data.recv_message = grpc_bbq_pop(&call->incoming_queue); |
|
|
|
|
if (*data.recv_message) { |
|
|
|
|
finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_OK); |
|
|
|
|
} else if (call->stream_closed) { |
|
|
|
|
if (call->stream_closed && grpc_bbq_empty(&call->incoming_queue)) { |
|
|
|
|
finish_ioreq_op(call, GRPC_IOREQ_RECV_CLOSE, GRPC_OP_OK); |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
/* no message: either end of stream or we need more bytes */ |
|
|
|
|
if (call->read_closed) { |
|
|
|
|
finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_OK); |
|
|
|
|
if (call->stream_closed) { |
|
|
|
|
/* stream closed AND we've drained all messages: signal to the application */ |
|
|
|
|
finish_ioreq_op(call, GRPC_IOREQ_RECV_CLOSE, GRPC_OP_OK); |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
call->need_more_data = 1; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
break; |
|
|
|
|
case GRPC_IOREQ_RECV_STATUS: |
|
|
|
|
case GRPC_IOREQ_RECV_CLOSE: |
|
|
|
@ -627,11 +627,6 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs, |
|
|
|
|
finish_ioreq_op(call, op, GRPC_OP_OK); |
|
|
|
|
} |
|
|
|
|
break; |
|
|
|
|
case GRPC_IOREQ_SEND_MESSAGE: |
|
|
|
|
if (call->stream_closed) { |
|
|
|
|
finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGE, GRPC_OP_ERROR); |
|
|
|
|
} |
|
|
|
|
break; |
|
|
|
|
case GRPC_IOREQ_SEND_CLOSE: |
|
|
|
|
if (requests[GRPC_IOREQ_SEND_MESSAGE].set == REQSET_EMPTY) { |
|
|
|
|
requests[GRPC_IOREQ_SEND_MESSAGE].set = REQSET_DONE; |
|
|
|
@ -640,6 +635,7 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs, |
|
|
|
|
finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, GRPC_OP_ERROR); |
|
|
|
|
} |
|
|
|
|
break; |
|
|
|
|
case GRPC_IOREQ_SEND_MESSAGE: |
|
|
|
|
case GRPC_IOREQ_SEND_INITIAL_METADATA: |
|
|
|
|
case GRPC_IOREQ_SEND_TRAILING_METADATA: |
|
|
|
|
case GRPC_IOREQ_SEND_STATUS: |
|
|
|
@ -673,8 +669,6 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs, |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
dump_req_state(debug, "after", call); |
|
|
|
|
|
|
|
|
|
return GRPC_CALL_OK; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -687,7 +681,7 @@ grpc_call_error grpc_call_start_ioreq(grpc_call *call, const grpc_ioreq *reqs, |
|
|
|
|
size_t nreqs, void *tag) { |
|
|
|
|
grpc_call_error err; |
|
|
|
|
lock(call); |
|
|
|
|
err = start_ioreq(call, reqs, nreqs, call_start_ioreq_done, tag, __FUNCTION__); |
|
|
|
|
err = start_ioreq(call, reqs, nreqs, call_start_ioreq_done, tag); |
|
|
|
|
unlock(call); |
|
|
|
|
return err; |
|
|
|
|
} |
|
|
|
@ -697,7 +691,7 @@ grpc_call_error grpc_call_start_ioreq_and_call_back( |
|
|
|
|
grpc_ioreq_completion_func on_complete, void *user_data) { |
|
|
|
|
grpc_call_error err; |
|
|
|
|
lock(call); |
|
|
|
|
err = start_ioreq(call, reqs, nreqs, on_complete, user_data, __FUNCTION__); |
|
|
|
|
err = start_ioreq(call, reqs, nreqs, on_complete, user_data); |
|
|
|
|
unlock(call); |
|
|
|
|
return err; |
|
|
|
|
} |
|
|
|
@ -833,12 +827,12 @@ grpc_call_error grpc_call_invoke(grpc_call *call, grpc_completion_queue *cq, |
|
|
|
|
reqs[0].data.send_metadata.count = ls->md_out_count[ls->md_out_buffer]; |
|
|
|
|
reqs[0].data.send_metadata.metadata = ls->md_out[ls->md_out_buffer]; |
|
|
|
|
ls->md_out_buffer++; |
|
|
|
|
err = start_ioreq(call, reqs, 1, finish_send_metadata, NULL, __FUNCTION__); |
|
|
|
|
err = start_ioreq(call, reqs, 1, finish_send_metadata, NULL); |
|
|
|
|
if (err != GRPC_CALL_OK) goto done; |
|
|
|
|
|
|
|
|
|
reqs[0].op = GRPC_IOREQ_RECV_INITIAL_METADATA; |
|
|
|
|
reqs[0].data.recv_metadata = &ls->initial_md_in; |
|
|
|
|
err = start_ioreq(call, reqs, 1, finish_recv_metadata, metadata_read_tag, __FUNCTION__); |
|
|
|
|
err = start_ioreq(call, reqs, 1, finish_recv_metadata, metadata_read_tag); |
|
|
|
|
if (err != GRPC_CALL_OK) goto done; |
|
|
|
|
|
|
|
|
|
reqs[0].op = GRPC_IOREQ_RECV_TRAILING_METADATA; |
|
|
|
@ -848,7 +842,7 @@ grpc_call_error grpc_call_invoke(grpc_call *call, grpc_completion_queue *cq, |
|
|
|
|
reqs[1].data.recv_status.details_capacity = &ls->details_capacity; |
|
|
|
|
reqs[1].data.recv_status.code = &ls->status; |
|
|
|
|
reqs[2].op = GRPC_IOREQ_RECV_CLOSE; |
|
|
|
|
err = start_ioreq(call, reqs, 3, finish_status, NULL, __FUNCTION__); |
|
|
|
|
err = start_ioreq(call, reqs, 3, finish_status, NULL); |
|
|
|
|
if (err != GRPC_CALL_OK) goto done; |
|
|
|
|
|
|
|
|
|
done: |
|
|
|
@ -880,7 +874,7 @@ grpc_call_error grpc_call_server_accept(grpc_call *call, |
|
|
|
|
reqs[0].data.recv_status.details_capacity = 0; |
|
|
|
|
reqs[0].data.recv_status.code = &ls->status; |
|
|
|
|
reqs[1].op = GRPC_IOREQ_RECV_CLOSE; |
|
|
|
|
err = start_ioreq(call, reqs, 2, finish_status, NULL, __FUNCTION__); |
|
|
|
|
err = start_ioreq(call, reqs, 2, finish_status, NULL); |
|
|
|
|
unlock(call); |
|
|
|
|
return err; |
|
|
|
|
} |
|
|
|
@ -899,7 +893,7 @@ grpc_call_error grpc_call_server_end_initial_metadata(grpc_call *call, |
|
|
|
|
req.op = GRPC_IOREQ_SEND_INITIAL_METADATA; |
|
|
|
|
req.data.send_metadata.count = ls->md_out_count[ls->md_out_buffer]; |
|
|
|
|
req.data.send_metadata.metadata = ls->md_out[ls->md_out_buffer]; |
|
|
|
|
err = start_ioreq(call, &req, 1, finish_send_initial_metadata, NULL, __FUNCTION__); |
|
|
|
|
err = start_ioreq(call, &req, 1, finish_send_initial_metadata, NULL); |
|
|
|
|
unlock(call); |
|
|
|
|
|
|
|
|
|
return err; |
|
|
|
@ -938,7 +932,7 @@ grpc_call_error grpc_call_start_read(grpc_call *call, void *tag) { |
|
|
|
|
ls = get_legacy_state(call); |
|
|
|
|
req.op = GRPC_IOREQ_RECV_MESSAGE; |
|
|
|
|
req.data.recv_message = &ls->msg_in; |
|
|
|
|
err = start_ioreq(call, &req, 1, finish_read, tag, __FUNCTION__); |
|
|
|
|
err = start_ioreq(call, &req, 1, finish_read, tag); |
|
|
|
|
unlock(call); |
|
|
|
|
return err; |
|
|
|
|
} |
|
|
|
@ -964,7 +958,7 @@ grpc_call_error grpc_call_start_write(grpc_call *call, |
|
|
|
|
ls->msg_out = grpc_byte_buffer_copy(byte_buffer); |
|
|
|
|
req.op = GRPC_IOREQ_SEND_MESSAGE; |
|
|
|
|
req.data.send_message = ls->msg_out; |
|
|
|
|
err = start_ioreq(call, &req, 1, finish_write, tag, __FUNCTION__); |
|
|
|
|
err = start_ioreq(call, &req, 1, finish_write, tag); |
|
|
|
|
unlock(call); |
|
|
|
|
|
|
|
|
|
return err; |
|
|
|
@ -981,7 +975,7 @@ grpc_call_error grpc_call_writes_done(grpc_call *call, void *tag) { |
|
|
|
|
|
|
|
|
|
lock(call); |
|
|
|
|
req.op = GRPC_IOREQ_SEND_CLOSE; |
|
|
|
|
err = start_ioreq(call, &req, 1, finish_finish, tag, __FUNCTION__); |
|
|
|
|
err = start_ioreq(call, &req, 1, finish_finish, tag); |
|
|
|
|
unlock(call); |
|
|
|
|
|
|
|
|
|
return err; |
|
|
|
@ -1005,7 +999,7 @@ grpc_call_error grpc_call_start_write_status(grpc_call *call, |
|
|
|
|
/* MEMLEAK */ |
|
|
|
|
reqs[1].data.send_status.details = gpr_strdup(details); |
|
|
|
|
reqs[2].op = GRPC_IOREQ_SEND_CLOSE; |
|
|
|
|
err = start_ioreq(call, reqs, 3, finish_finish, tag, __FUNCTION__); |
|
|
|
|
err = start_ioreq(call, reqs, 3, finish_finish, tag); |
|
|
|
|
unlock(call); |
|
|
|
|
|
|
|
|
|
return err; |
|
|
|
|