First test passes

pull/381/head
Craig Tiller 10 years ago
parent 5b9efed999
commit 166e25002c
  1. 16
      src/core/surface/call.c
  2. 2
      src/core/surface/call.h
  3. 11
      src/core/surface/completion_queue.c
  4. 4
      src/core/surface/completion_queue.h
  5. 73
      src/core/surface/server.c
  6. 2
      test/core/end2end/tests/simple_request.c

@ -201,6 +201,11 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
return call;
}
void grpc_call_set_completion_queue(grpc_call *call,
grpc_completion_queue *cq) {
call->cq = cq;
}
void grpc_call_internal_ref(grpc_call *c) { gpr_ref(&c->internal_refcount); }
static void destroy_call(void *call, int ignored_success) {
@ -284,8 +289,8 @@ static void unlock(grpc_call *call) {
send_action sa = SEND_NOTHING;
completed_request completed_requests[GRPC_IOREQ_OP_COUNT];
int num_completed_requests = call->num_completed_requests;
int need_more_data =
call->need_more_data && !is_op_live(call, GRPC_IOREQ_SEND_INITIAL_METADATA);
int need_more_data = call->need_more_data &&
!is_op_live(call, GRPC_IOREQ_SEND_INITIAL_METADATA);
int i;
if (need_more_data) {
@ -461,8 +466,7 @@ static void finish_start_step(void *pc, grpc_op_error error) {
static send_action choose_send_action(grpc_call *call) {
switch (call->write_state) {
case WRITE_STATE_INITIAL:
if (call->request_set[GRPC_IOREQ_SEND_INITIAL_METADATA] !=
REQSET_EMPTY) {
if (call->request_set[GRPC_IOREQ_SEND_INITIAL_METADATA] != REQSET_EMPTY) {
call->write_state = WRITE_STATE_STARTED;
return SEND_INITIAL_METADATA;
}
@ -887,7 +891,9 @@ static void set_cancelled_value(grpc_status_code status, void *dest) {
*(grpc_status_code *)dest = (status != GRPC_STATUS_OK);
}
static void finish_batch(grpc_call *call, grpc_op_error result, void *tag) {}
static void finish_batch(grpc_call *call, grpc_op_error result, void *tag) {
grpc_cq_end_op_complete(call->cq, tag, call, do_nothing, NULL, GRPC_OP_OK);
}
grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
size_t nops, void *tag) {

@ -88,6 +88,8 @@ typedef void (*grpc_ioreq_completion_func)(grpc_call *call,
grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
const void *server_transport_data);
void grpc_call_set_completion_queue(grpc_call *call, grpc_completion_queue *cq);
void grpc_call_internal_ref(grpc_call *call);
void grpc_call_internal_unref(grpc_call *call, int allow_immediate_deletion);

@ -185,6 +185,17 @@ void grpc_cq_end_write_accepted(grpc_completion_queue *cc, void *tag,
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
}
void grpc_cq_end_op_complete(grpc_completion_queue *cc, void *tag,
grpc_call *call, grpc_event_finish_func on_finish,
void *user_data, grpc_op_error error) {
event *ev;
gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
ev = add_locked(cc, GRPC_OP_COMPLETE, tag, call, on_finish, user_data);
ev->base.data.write_accepted = error;
end_op_locked(cc, GRPC_OP_COMPLETE);
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
}
void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, grpc_call *call,
grpc_event_finish_func on_finish, void *user_data,
grpc_op_error error) {

@ -78,6 +78,10 @@ void grpc_cq_end_finish_accepted(grpc_completion_queue *cc, void *tag,
grpc_call *call,
grpc_event_finish_func on_finish,
void *user_data, grpc_op_error error);
/* Queue a GRPC_OP_COMPLETED operation */
void grpc_cq_end_op_complete(grpc_completion_queue *cc, void *tag,
grpc_call *call, grpc_event_finish_func on_finish,
void *user_data, grpc_op_error error);
/* Queue a GRPC_CLIENT_METADATA_READ operation */
void grpc_cq_end_client_metadata_read(grpc_completion_queue *cc, void *tag,
grpc_call *call,

@ -72,12 +72,15 @@ struct channel_data {
};
typedef void (*new_call_cb)(grpc_server *server, grpc_completion_queue *cq,
grpc_call **call, grpc_call_details *details,
grpc_metadata_array *initial_metadata,
call_data *calld, void *user_data);
typedef struct {
void *user_data;
grpc_completion_queue *cq;
grpc_call **call;
grpc_call_details *details;
grpc_metadata_array *initial_metadata;
new_call_cb cb;
} requested_call;
@ -134,6 +137,7 @@ struct call_data {
grpc_mdstr *host;
legacy_data *legacy;
grpc_call_details *details;
gpr_uint8 included[CALL_LIST_COUNT];
call_link links[CALL_LIST_COUNT];
@ -242,7 +246,8 @@ static void start_new_rpc(grpc_call_element *elem) {
requested_call rc = server->requested_calls[--server->requested_call_count];
calld->state = ACTIVATED;
gpr_mu_unlock(&server->mu);
rc.cb(server, rc.cq, rc.initial_metadata, calld, rc.user_data);
rc.cb(server, rc.cq, rc.call, rc.details, rc.initial_metadata, calld,
rc.user_data);
} else {
calld->state = PENDING;
call_list_join(server, calld, PENDING_START);
@ -619,6 +624,7 @@ void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag,
/* terminate all the requested calls */
for (i = 0; i < requested_call_count; i++) {
requested_calls[i].cb(server, requested_calls[i].cq,
requested_calls[i].call, requested_calls[i].details,
requested_calls[i].initial_metadata, NULL,
requested_calls[i].user_data);
}
@ -669,6 +675,8 @@ void grpc_server_add_listener(grpc_server *server, void *arg,
static grpc_call_error queue_call_request(grpc_server *server,
grpc_completion_queue *cq,
grpc_call **call,
grpc_call_details *details,
grpc_metadata_array *initial_metadata,
new_call_cb cb, void *user_data) {
call_data *calld;
@ -676,7 +684,7 @@ static grpc_call_error queue_call_request(grpc_server *server,
gpr_mu_lock(&server->mu);
if (server->shutdown) {
gpr_mu_unlock(&server->mu);
cb(server, cq, initial_metadata, NULL, user_data);
cb(server, cq, call, details, initial_metadata, NULL, user_data);
return GRPC_CALL_OK;
}
calld = call_list_remove_head(server, PENDING_START);
@ -684,7 +692,7 @@ static grpc_call_error queue_call_request(grpc_server *server,
GPR_ASSERT(calld->state == PENDING);
calld->state = ACTIVATED;
gpr_mu_unlock(&server->mu);
cb(server, cq, initial_metadata, calld, user_data);
cb(server, cq, call, details, initial_metadata, calld, user_data);
return GRPC_CALL_OK;
} else {
if (server->requested_call_count == server->requested_call_capacity) {
@ -698,6 +706,8 @@ static grpc_call_error queue_call_request(grpc_server *server,
rc = &server->requested_calls[server->requested_call_count++];
rc->cb = cb;
rc->cq = cq;
rc->call = call;
rc->details = details;
rc->user_data = user_data;
rc->initial_metadata = initial_metadata;
gpr_mu_unlock(&server->mu);
@ -705,10 +715,55 @@ static grpc_call_error queue_call_request(grpc_server *server,
}
}
static void cpstr(char **dest, size_t *capacity, grpc_mdstr *value) {
gpr_slice slice = value->slice;
size_t len = GPR_SLICE_LENGTH(slice);
if (len + 1 > *capacity) {
*capacity = GPR_MAX(len + 1, *capacity * 2);
*dest = gpr_realloc(*dest, *capacity);
}
memcpy(*dest, grpc_mdstr_as_c_string(value), len + 1);
}
static void publish_request(grpc_call *call, grpc_op_error status, void *tag) {
grpc_call_element *elem =
grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
grpc_server *server = chand->server;
if (status == GRPC_OP_OK) {
cpstr(&calld->details->host, &calld->details->host_capacity, calld->host);
cpstr(&calld->details->method, &calld->details->method_capacity,
calld->path);
calld->details->deadline = calld->deadline;
grpc_cq_end_op_complete(server->cq, tag, call, do_nothing, NULL,
GRPC_OP_OK);
} else {
abort();
}
}
static void begin_request(grpc_server *server, grpc_completion_queue *cq,
grpc_call **call, grpc_call_details *details,
grpc_metadata_array *initial_metadata,
call_data *call_data, void *tag) {
abort();
call_data *calld, void *tag) {
grpc_ioreq req;
if (!calld) {
*call = NULL;
initial_metadata->count = 0;
grpc_cq_end_op_complete(cq, tag, NULL, do_nothing, NULL, GRPC_OP_ERROR);
return;
}
calld->details = details;
grpc_call_set_completion_queue(calld->call, cq);
*call = calld->call;
req.op = GRPC_IOREQ_RECV_INITIAL_METADATA;
req.data.recv_metadata = initial_metadata;
grpc_call_internal_ref(calld->call);
grpc_call_start_ioreq_and_call_back(calld->call, &req, 1, publish_request,
tag);
}
grpc_call_error grpc_server_request_call(grpc_server *server, grpc_call **call,
@ -716,7 +771,8 @@ grpc_call_error grpc_server_request_call(grpc_server *server, grpc_call **call,
grpc_metadata_array *initial_metadata,
grpc_completion_queue *cq, void *tag) {
grpc_cq_begin_op(cq, NULL, GRPC_OP_COMPLETE);
return queue_call_request(server, cq, initial_metadata, begin_request, tag);
return queue_call_request(server, cq, call, details, initial_metadata,
begin_request, tag);
}
static void publish_legacy_request(grpc_call *call, grpc_op_error status,
@ -739,9 +795,12 @@ static void publish_legacy_request(grpc_call *call, grpc_op_error status,
}
static void begin_legacy_request(grpc_server *server, grpc_completion_queue *cq,
grpc_call **call, grpc_call_details *details,
grpc_metadata_array *initial_metadata,
call_data *calld, void *tag) {
grpc_ioreq req;
GPR_ASSERT(call == NULL);
GPR_ASSERT(details == NULL);
if (!calld) {
gpr_free(initial_metadata);
grpc_cq_end_new_rpc(cq, tag, NULL, do_nothing, NULL, NULL, NULL,
@ -764,7 +823,7 @@ grpc_call_error grpc_server_request_call_old(grpc_server *server,
gpr_malloc(sizeof(grpc_metadata_array));
memset(client_metadata, 0, sizeof(*client_metadata));
grpc_cq_begin_op(server->cq, NULL, GRPC_SERVER_RPC_NEW);
return queue_call_request(server, server->cq, client_metadata,
return queue_call_request(server, server->cq, NULL, NULL, client_metadata,
begin_legacy_request, tag_new);
}

@ -178,7 +178,7 @@ static void simple_request_body(grpc_end2end_test_fixture f) {
GPR_ASSERT(0 == strcmp(details, "xyz"));
GPR_ASSERT(0 == strcmp(call_details.method, "/foo"));
GPR_ASSERT(0 == strcmp(call_details.host, "test.google.com"));
GPR_ASSERT(was_cancelled == 0);
GPR_ASSERT(was_cancelled == 1);
GPR_ASSERT(initial_metadata_recv.count == 0);
GPR_ASSERT(trailing_metadata_recv.count == 0);
GPR_ASSERT(request_metadata_recv.count == 0);

Loading…
Cancel
Save