Initial interface rework to allow knowing whether to pull payload at registration, not at request time

pull/6070/head
Craig Tiller 9 years ago
parent 307a7207a0
commit 06cb1a9d93
  1. 15
      include/grpc/grpc.h
  2. 4
      include/grpc/impl/codegen/grpc_types.h
  3. 139
      src/core/lib/surface/server.c
  4. 18
      src/cpp/server/server.cc
  5. 6
      test/core/bad_client/bad_client.c
  6. 20
      test/core/surface/server_test.c

@ -289,6 +289,14 @@ GRPCAPI grpc_call_error grpc_server_request_call(
grpc_completion_queue *cq_bound_to_call,
grpc_completion_queue *cq_for_notification, void *tag_new);
/** How to handle payloads for a registered method */
typedef enum {
/** Don't try to read the payload */
GRPC_SRM_PAYLOAD_NONE,
/** Read the initial payload as a byte buffer */
GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER
} grpc_server_register_method_payload_handling;
/** Registers a method in the server.
Methods to this (host, method) pair will not be reported by
grpc_server_request_call, but instead be reported by
@ -296,9 +304,10 @@ GRPCAPI grpc_call_error grpc_server_request_call(
registered_method (as returned by this function).
Must be called before grpc_server_start.
Returns NULL on failure. */
GRPCAPI void *grpc_server_register_method(grpc_server *server,
const char *method, const char *host,
uint32_t flags);
GRPCAPI void *grpc_server_register_method(
grpc_server *server, const char *method, const char *host,
grpc_server_register_method_payload_handling payload_handling,
uint32_t flags);
/** Request notification of a new pre-registered call. 'cq_for_notification'
must have been registered to the server via

@ -185,7 +185,9 @@ typedef enum grpc_call_error {
server */
GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE,
/** this batch of operations leads to more operations than allowed */
GRPC_CALL_ERROR_BATCH_TOO_BIG
GRPC_CALL_ERROR_BATCH_TOO_BIG,
/** payload type requested is not the type registered */
GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH
} grpc_call_error;
/* Write Flags: */

@ -173,6 +173,7 @@ struct request_matcher {
struct registered_method {
char *method;
char *host;
grpc_server_register_method_payload_handling payload_handling;
uint32_t flags;
request_matcher request_matcher;
registered_method *next;
@ -417,6 +418,69 @@ static void destroy_channel(grpc_exec_ctx *exec_ctx, channel_data *chand) {
&op);
}
static void publish_registered_or_batch(grpc_exec_ctx *exec_ctx,
void *user_data, bool success);
static void cpstr(char **dest, size_t *capacity, grpc_mdstr *value) {
gpr_slice slice = value->slice;
size_t len = GPR_SLICE_LENGTH(slice);
if (len + 1 > *capacity) {
*capacity = GPR_MAX(len + 1, *capacity * 2);
*dest = gpr_realloc(*dest, *capacity);
}
memcpy(*dest, grpc_mdstr_as_c_string(value), len + 1);
}
static void begin_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
call_data *calld, requested_call *rc) {
grpc_op ops[1];
grpc_op *op = ops;
memset(ops, 0, sizeof(ops));
/* called once initial metadata has been read by the call, but BEFORE
the ioreq to fetch it out of the call has been executed.
This means metadata related fields can be relied on in calld, but to
fill in the metadata array passed by the client, we need to perform
an ioreq op, that should complete immediately. */
grpc_call_set_completion_queue(exec_ctx, calld->call, rc->cq_bound_to_call);
grpc_closure_init(&rc->publish, publish_registered_or_batch, rc);
*rc->call = calld->call;
calld->cq_new = rc->cq_for_notification;
GPR_SWAP(grpc_metadata_array, *rc->initial_metadata, calld->initial_metadata);
switch (rc->type) {
case BATCH_CALL:
GPR_ASSERT(calld->host != NULL);
GPR_ASSERT(calld->path != NULL);
cpstr(&rc->data.batch.details->host,
&rc->data.batch.details->host_capacity, calld->host);
cpstr(&rc->data.batch.details->method,
&rc->data.batch.details->method_capacity, calld->path);
rc->data.batch.details->deadline = calld->deadline;
rc->data.batch.details->flags =
0 | (calld->recv_idempotent_request
? GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST
: 0);
break;
case REGISTERED_CALL:
*rc->data.registered.deadline = calld->deadline;
if (rc->data.registered.optional_payload) {
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message = rc->data.registered.optional_payload;
op++;
}
break;
default:
GPR_UNREACHABLE_CODE(return );
}
GRPC_CALL_INTERNAL_REF(calld->call, "server");
grpc_call_start_batch_and_execute(exec_ctx, calld->call, ops,
(size_t)(op - ops), &rc->publish);
}
static void finish_start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_server *server,
grpc_call_element *elem, request_matcher *rm) {
call_data *calld = elem->call_data;
@ -840,8 +904,10 @@ static int streq(const char *a, const char *b) {
return 0 == strcmp(a, b);
}
void *grpc_server_register_method(grpc_server *server, const char *method,
const char *host, uint32_t flags) {
void *grpc_server_register_method(
grpc_server *server, const char *method, const char *host,
grpc_server_register_method_payload_handling payload_handling,
uint32_t flags) {
registered_method *m;
GRPC_API_TRACE(
"grpc_server_register_method(server=%p, method=%s, host=%s, "
@ -1209,6 +1275,12 @@ grpc_call_error grpc_server_request_registered_call(
error = GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
goto done;
}
if ((optional_payload == NULL) !=
(rm->payload_handling == GRPC_SRM_PAYLOAD_NONE)) {
gpr_free(rc);
error = GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH;
goto done;
}
grpc_cq_begin_op(cq_for_notification, tag);
rc->type = REGISTERED_CALL;
rc->server = server;
@ -1226,69 +1298,6 @@ done:
return error;
}
static void publish_registered_or_batch(grpc_exec_ctx *exec_ctx,
void *user_data, bool success);
static void cpstr(char **dest, size_t *capacity, grpc_mdstr *value) {
gpr_slice slice = value->slice;
size_t len = GPR_SLICE_LENGTH(slice);
if (len + 1 > *capacity) {
*capacity = GPR_MAX(len + 1, *capacity * 2);
*dest = gpr_realloc(*dest, *capacity);
}
memcpy(*dest, grpc_mdstr_as_c_string(value), len + 1);
}
static void begin_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
call_data *calld, requested_call *rc) {
grpc_op ops[1];
grpc_op *op = ops;
memset(ops, 0, sizeof(ops));
/* called once initial metadata has been read by the call, but BEFORE
the ioreq to fetch it out of the call has been executed.
This means metadata related fields can be relied on in calld, but to
fill in the metadata array passed by the client, we need to perform
an ioreq op, that should complete immediately. */
grpc_call_set_completion_queue(exec_ctx, calld->call, rc->cq_bound_to_call);
grpc_closure_init(&rc->publish, publish_registered_or_batch, rc);
*rc->call = calld->call;
calld->cq_new = rc->cq_for_notification;
GPR_SWAP(grpc_metadata_array, *rc->initial_metadata, calld->initial_metadata);
switch (rc->type) {
case BATCH_CALL:
GPR_ASSERT(calld->host != NULL);
GPR_ASSERT(calld->path != NULL);
cpstr(&rc->data.batch.details->host,
&rc->data.batch.details->host_capacity, calld->host);
cpstr(&rc->data.batch.details->method,
&rc->data.batch.details->method_capacity, calld->path);
rc->data.batch.details->deadline = calld->deadline;
rc->data.batch.details->flags =
0 | (calld->recv_idempotent_request
? GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST
: 0);
break;
case REGISTERED_CALL:
*rc->data.registered.deadline = calld->deadline;
if (rc->data.registered.optional_payload) {
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message = rc->data.registered.optional_payload;
op++;
}
break;
default:
GPR_UNREACHABLE_CODE(return );
}
GRPC_CALL_INTERNAL_REF(calld->call, "server");
grpc_call_start_batch_and_execute(exec_ctx, calld->call, ops,
(size_t)(op - ops), &rc->publish);
}
static void done_request_event(grpc_exec_ctx *exec_ctx, void *req,
grpc_cq_completion *c) {
requested_call *rc = req;

@ -321,6 +321,19 @@ void Server::SetGlobalCallbacks(GlobalCallbacks* callbacks) {
g_callbacks.reset(callbacks);
}
static grpc_server_register_method_payload_handling PayloadHandlingForMethod(
RpcServiceMethod* method) {
switch (method->method_type()) {
case RpcMethod::NORMAL_RPC:
case RpcMethod::SERVER_STREAMING:
return GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER;
case RpcMethod::CLIENT_STREAMING:
case RpcMethod::BIDI_STREAMING:
return GRPC_SRM_PAYLOAD_NONE;
}
GPR_UNREACHABLE_CODE(return GRPC_SRM_PAYLOAD_NONE;);
}
bool Server::RegisterService(const grpc::string* host, Service* service) {
bool has_async_methods = service->has_async_methods();
if (has_async_methods) {
@ -334,8 +347,9 @@ bool Server::RegisterService(const grpc::string* host, Service* service) {
continue;
}
RpcServiceMethod* method = it->get();
void* tag = grpc_server_register_method(server_, method->name(),
host ? host->c_str() : nullptr, 0);
void* tag = grpc_server_register_method(
server_, method->name(), host ? host->c_str() : nullptr,
PayloadHandlingForMethod(method), 0);
if (tag == nullptr) {
gpr_log(GPR_DEBUG, "Attempt to register %s multiple times",
method->name());

@ -107,9 +107,9 @@ void grpc_run_bad_client_test(grpc_bad_client_server_side_validator validator,
gpr_event_init(&a.done_write);
a.validator = validator;
grpc_server_register_completion_queue(a.server, a.cq, NULL);
a.registered_method =
grpc_server_register_method(a.server, GRPC_BAD_CLIENT_REGISTERED_METHOD,
GRPC_BAD_CLIENT_REGISTERED_HOST, 0);
a.registered_method = grpc_server_register_method(
a.server, GRPC_BAD_CLIENT_REGISTERED_METHOD,
GRPC_BAD_CLIENT_REGISTERED_HOST, GRPC_SRM_PAYLOAD_NONE, 0);
grpc_server_start(a.server);
transport = grpc_create_chttp2_transport(&exec_ctx, NULL, sfd.server, 0);
server_setup_transport(&a, transport);

@ -42,19 +42,25 @@ void test_register_method_fail(void) {
grpc_server *server = grpc_server_create(NULL, NULL);
void *method;
void *method_old;
method = grpc_server_register_method(server, NULL, NULL, 0);
method =
grpc_server_register_method(server, NULL, NULL, GRPC_SRM_PAYLOAD_NONE, 0);
GPR_ASSERT(method == NULL);
method_old = grpc_server_register_method(server, "m", "h", 0);
method_old =
grpc_server_register_method(server, "m", "h", GRPC_SRM_PAYLOAD_NONE, 0);
GPR_ASSERT(method_old != NULL);
method = grpc_server_register_method(server, "m", "h", 0);
method = grpc_server_register_method(
server, "m", "h", GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER, 0);
GPR_ASSERT(method == NULL);
method_old = grpc_server_register_method(
server, "m2", "h2", GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST);
method_old =
grpc_server_register_method(server, "m2", "h2", GRPC_SRM_PAYLOAD_NONE,
GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST);
GPR_ASSERT(method_old != NULL);
method = grpc_server_register_method(server, "m2", "h2", 0);
method =
grpc_server_register_method(server, "m2", "h2", GRPC_SRM_PAYLOAD_NONE, 0);
GPR_ASSERT(method == NULL);
method = grpc_server_register_method(
server, "m2", "h2", GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST);
server, "m2", "h2", GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER,
GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST);
GPR_ASSERT(method == NULL);
grpc_server_destroy(server);
}

Loading…
Cancel
Save