diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 1aecd3400aa..1cd5e3d6841 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -155,7 +155,7 @@ struct grpc_call { legacy_state *legacy_state; }; -#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1)) +#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call)+1)) #define CALL_FROM_CALL_STACK(call_stack) (((grpc_call *)(call_stack)) - 1) #define CALL_ELEM_FROM_CALL(call, idx) \ grpc_call_stack_element(CALL_STACK_FROM_CALL(call), idx) @@ -201,6 +201,11 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq, return call; } +void grpc_call_set_completion_queue(grpc_call *call, + grpc_completion_queue *cq) { + call->cq = cq; +} + void grpc_call_internal_ref(grpc_call *c) { gpr_ref(&c->internal_refcount); } static void destroy_call(void *call, int ignored_success) { @@ -284,8 +289,8 @@ static void unlock(grpc_call *call) { send_action sa = SEND_NOTHING; completed_request completed_requests[GRPC_IOREQ_OP_COUNT]; int num_completed_requests = call->num_completed_requests; - int need_more_data = - call->need_more_data && !is_op_live(call, GRPC_IOREQ_SEND_INITIAL_METADATA); + int need_more_data = call->need_more_data && + !is_op_live(call, GRPC_IOREQ_SEND_INITIAL_METADATA); int i; if (need_more_data) { @@ -461,8 +466,7 @@ static void finish_start_step(void *pc, grpc_op_error error) { static send_action choose_send_action(grpc_call *call) { switch (call->write_state) { case WRITE_STATE_INITIAL: - if (call->request_set[GRPC_IOREQ_SEND_INITIAL_METADATA] != - REQSET_EMPTY) { + if (call->request_set[GRPC_IOREQ_SEND_INITIAL_METADATA] != REQSET_EMPTY) { call->write_state = WRITE_STATE_STARTED; return SEND_INITIAL_METADATA; } @@ -807,7 +811,7 @@ static gpr_uint32 decode_status(grpc_mdelem *md) { gpr_uint32 status; void *user_data = grpc_mdelem_get_user_data(md, destroy_status); if (user_data) { - status = ((gpr_uint32)(gpr_intptr)user_data) - STATUS_OFFSET; + status = ((gpr_uint32)(gpr_intptr) user_data) - STATUS_OFFSET; } else { if (!gpr_parse_bytes_to_uint32(grpc_mdstr_as_c_string(md->value), GPR_SLICE_LENGTH(md->value->slice), @@ -887,7 +891,9 @@ static void set_cancelled_value(grpc_status_code status, void *dest) { *(grpc_status_code *)dest = (status != GRPC_STATUS_OK); } -static void finish_batch(grpc_call *call, grpc_op_error result, void *tag) {} +static void finish_batch(grpc_call *call, grpc_op_error result, void *tag) { + grpc_cq_end_op_complete(call->cq, tag, call, do_nothing, NULL, GRPC_OP_OK); +} grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, size_t nops, void *tag) { diff --git a/src/core/surface/call.h b/src/core/surface/call.h index 723f132015e..05014c631c7 100644 --- a/src/core/surface/call.h +++ b/src/core/surface/call.h @@ -88,6 +88,8 @@ typedef void (*grpc_ioreq_completion_func)(grpc_call *call, grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq, const void *server_transport_data); +void grpc_call_set_completion_queue(grpc_call *call, grpc_completion_queue *cq); + void grpc_call_internal_ref(grpc_call *call); void grpc_call_internal_unref(grpc_call *call, int allow_immediate_deletion); diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c index b87117bf727..8b94aa920af 100644 --- a/src/core/surface/completion_queue.c +++ b/src/core/surface/completion_queue.c @@ -185,6 +185,17 @@ void grpc_cq_end_write_accepted(grpc_completion_queue *cc, void *tag, gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); } +void grpc_cq_end_op_complete(grpc_completion_queue *cc, void *tag, + grpc_call *call, grpc_event_finish_func on_finish, + void *user_data, grpc_op_error error) { + event *ev; + gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); + ev = add_locked(cc, GRPC_OP_COMPLETE, tag, call, on_finish, user_data); + ev->base.data.write_accepted = error; + end_op_locked(cc, GRPC_OP_COMPLETE); + gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); +} + void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, grpc_call *call, grpc_event_finish_func on_finish, void *user_data, grpc_op_error error) { diff --git a/src/core/surface/completion_queue.h b/src/core/surface/completion_queue.h index cbf1c4c7f36..205cb76cee8 100644 --- a/src/core/surface/completion_queue.h +++ b/src/core/surface/completion_queue.h @@ -78,6 +78,10 @@ void grpc_cq_end_finish_accepted(grpc_completion_queue *cc, void *tag, grpc_call *call, grpc_event_finish_func on_finish, void *user_data, grpc_op_error error); +/* Queue a GRPC_OP_COMPLETED operation */ +void grpc_cq_end_op_complete(grpc_completion_queue *cc, void *tag, + grpc_call *call, grpc_event_finish_func on_finish, + void *user_data, grpc_op_error error); /* Queue a GRPC_CLIENT_METADATA_READ operation */ void grpc_cq_end_client_metadata_read(grpc_completion_queue *cc, void *tag, grpc_call *call, diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 60eeab68b0f..d7e1dcd800a 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -72,12 +72,15 @@ struct channel_data { }; typedef void (*new_call_cb)(grpc_server *server, grpc_completion_queue *cq, + grpc_call **call, grpc_call_details *details, grpc_metadata_array *initial_metadata, call_data *calld, void *user_data); typedef struct { void *user_data; grpc_completion_queue *cq; + grpc_call **call; + grpc_call_details *details; grpc_metadata_array *initial_metadata; new_call_cb cb; } requested_call; @@ -134,6 +137,7 @@ struct call_data { grpc_mdstr *host; legacy_data *legacy; + grpc_call_details *details; gpr_uint8 included[CALL_LIST_COUNT]; call_link links[CALL_LIST_COUNT]; @@ -242,7 +246,8 @@ static void start_new_rpc(grpc_call_element *elem) { requested_call rc = server->requested_calls[--server->requested_call_count]; calld->state = ACTIVATED; gpr_mu_unlock(&server->mu); - rc.cb(server, rc.cq, rc.initial_metadata, calld, rc.user_data); + rc.cb(server, rc.cq, rc.call, rc.details, rc.initial_metadata, calld, + rc.user_data); } else { calld->state = PENDING; call_list_join(server, calld, PENDING_START); @@ -619,6 +624,7 @@ void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag, /* terminate all the requested calls */ for (i = 0; i < requested_call_count; i++) { requested_calls[i].cb(server, requested_calls[i].cq, + requested_calls[i].call, requested_calls[i].details, requested_calls[i].initial_metadata, NULL, requested_calls[i].user_data); } @@ -669,6 +675,8 @@ void grpc_server_add_listener(grpc_server *server, void *arg, static grpc_call_error queue_call_request(grpc_server *server, grpc_completion_queue *cq, + grpc_call **call, + grpc_call_details *details, grpc_metadata_array *initial_metadata, new_call_cb cb, void *user_data) { call_data *calld; @@ -676,7 +684,7 @@ static grpc_call_error queue_call_request(grpc_server *server, gpr_mu_lock(&server->mu); if (server->shutdown) { gpr_mu_unlock(&server->mu); - cb(server, cq, initial_metadata, NULL, user_data); + cb(server, cq, call, details, initial_metadata, NULL, user_data); return GRPC_CALL_OK; } calld = call_list_remove_head(server, PENDING_START); @@ -684,7 +692,7 @@ static grpc_call_error queue_call_request(grpc_server *server, GPR_ASSERT(calld->state == PENDING); calld->state = ACTIVATED; gpr_mu_unlock(&server->mu); - cb(server, cq, initial_metadata, calld, user_data); + cb(server, cq, call, details, initial_metadata, calld, user_data); return GRPC_CALL_OK; } else { if (server->requested_call_count == server->requested_call_capacity) { @@ -698,6 +706,8 @@ static grpc_call_error queue_call_request(grpc_server *server, rc = &server->requested_calls[server->requested_call_count++]; rc->cb = cb; rc->cq = cq; + rc->call = call; + rc->details = details; rc->user_data = user_data; rc->initial_metadata = initial_metadata; gpr_mu_unlock(&server->mu); @@ -705,10 +715,55 @@ static grpc_call_error queue_call_request(grpc_server *server, } } +static void cpstr(char **dest, size_t *capacity, grpc_mdstr *value) { + gpr_slice slice = value->slice; + size_t len = GPR_SLICE_LENGTH(slice); + + if (len + 1 > *capacity) { + *capacity = GPR_MAX(len + 1, *capacity * 2); + *dest = gpr_realloc(*dest, *capacity); + } + memcpy(*dest, grpc_mdstr_as_c_string(value), len + 1); +} + +static void publish_request(grpc_call *call, grpc_op_error status, void *tag) { + grpc_call_element *elem = + grpc_call_stack_element(grpc_call_get_call_stack(call), 0); + call_data *calld = elem->call_data; + channel_data *chand = elem->channel_data; + grpc_server *server = chand->server; + + if (status == GRPC_OP_OK) { + cpstr(&calld->details->host, &calld->details->host_capacity, calld->host); + cpstr(&calld->details->method, &calld->details->method_capacity, + calld->path); + calld->details->deadline = calld->deadline; + grpc_cq_end_op_complete(server->cq, tag, call, do_nothing, NULL, + GRPC_OP_OK); + } else { + abort(); + } +} + static void begin_request(grpc_server *server, grpc_completion_queue *cq, + grpc_call **call, grpc_call_details *details, grpc_metadata_array *initial_metadata, - call_data *call_data, void *tag) { - abort(); + call_data *calld, void *tag) { + grpc_ioreq req; + if (!calld) { + *call = NULL; + initial_metadata->count = 0; + grpc_cq_end_op_complete(cq, tag, NULL, do_nothing, NULL, GRPC_OP_ERROR); + return; + } + calld->details = details; + grpc_call_set_completion_queue(calld->call, cq); + *call = calld->call; + req.op = GRPC_IOREQ_RECV_INITIAL_METADATA; + req.data.recv_metadata = initial_metadata; + grpc_call_internal_ref(calld->call); + grpc_call_start_ioreq_and_call_back(calld->call, &req, 1, publish_request, + tag); } grpc_call_error grpc_server_request_call(grpc_server *server, grpc_call **call, @@ -716,7 +771,8 @@ grpc_call_error grpc_server_request_call(grpc_server *server, grpc_call **call, grpc_metadata_array *initial_metadata, grpc_completion_queue *cq, void *tag) { grpc_cq_begin_op(cq, NULL, GRPC_OP_COMPLETE); - return queue_call_request(server, cq, initial_metadata, begin_request, tag); + return queue_call_request(server, cq, call, details, initial_metadata, + begin_request, tag); } static void publish_legacy_request(grpc_call *call, grpc_op_error status, @@ -739,9 +795,12 @@ static void publish_legacy_request(grpc_call *call, grpc_op_error status, } static void begin_legacy_request(grpc_server *server, grpc_completion_queue *cq, + grpc_call **call, grpc_call_details *details, grpc_metadata_array *initial_metadata, call_data *calld, void *tag) { grpc_ioreq req; + GPR_ASSERT(call == NULL); + GPR_ASSERT(details == NULL); if (!calld) { gpr_free(initial_metadata); grpc_cq_end_new_rpc(cq, tag, NULL, do_nothing, NULL, NULL, NULL, @@ -764,7 +823,7 @@ grpc_call_error grpc_server_request_call_old(grpc_server *server, gpr_malloc(sizeof(grpc_metadata_array)); memset(client_metadata, 0, sizeof(*client_metadata)); grpc_cq_begin_op(server->cq, NULL, GRPC_SERVER_RPC_NEW); - return queue_call_request(server, server->cq, client_metadata, + return queue_call_request(server, server->cq, NULL, NULL, client_metadata, begin_legacy_request, tag_new); } diff --git a/test/core/end2end/tests/simple_request.c b/test/core/end2end/tests/simple_request.c index f51db8cc080..e367ed3d9fb 100644 --- a/test/core/end2end/tests/simple_request.c +++ b/test/core/end2end/tests/simple_request.c @@ -178,7 +178,7 @@ static void simple_request_body(grpc_end2end_test_fixture f) { GPR_ASSERT(0 == strcmp(details, "xyz")); GPR_ASSERT(0 == strcmp(call_details.method, "/foo")); GPR_ASSERT(0 == strcmp(call_details.host, "test.google.com")); - GPR_ASSERT(was_cancelled == 0); + GPR_ASSERT(was_cancelled == 1); GPR_ASSERT(initial_metadata_recv.count == 0); GPR_ASSERT(trailing_metadata_recv.count == 0); GPR_ASSERT(request_metadata_recv.count == 0);