Punch through flags for incoming calls

pull/5691/head
Craig Tiller 9 years ago
parent 76ee82c353
commit 5987c70bd1
  1. 35
      include/grpc/grpc.h
  2. 1
      include/grpc/impl/codegen/grpc_types.h
  3. 22
      src/core/surface/server.c
  4. 3
      src/cpp/server/server.cc

@ -36,13 +36,13 @@
#include <grpc/status.h>
#include <stddef.h>
#include <grpc/byte_buffer.h>
#include <grpc/support/slice.h>
#include <grpc/support/time.h>
#include <grpc/impl/codegen/connectivity_state.h>
#include <grpc/impl/codegen/propagation_bits.h>
#include <grpc/impl/codegen/grpc_types.h>
#include <grpc/impl/codegen/propagation_bits.h>
#include <grpc/support/slice.h>
#include <grpc/support/time.h>
#include <stddef.h>
#ifdef __cplusplus
extern "C" {
@ -154,9 +154,8 @@ GRPCAPI void grpc_alarm_cancel(grpc_alarm *alarm);
GRPCAPI void grpc_alarm_destroy(grpc_alarm *alarm);
/** Check the connectivity state of a channel. */
GRPCAPI grpc_connectivity_state
grpc_channel_check_connectivity_state(grpc_channel *channel,
int try_to_connect);
GRPCAPI grpc_connectivity_state grpc_channel_check_connectivity_state(
grpc_channel *channel, int try_to_connect);
/** Watch for a change in connectivity state.
Once the channel connectivity state is different from last_observed_state,
@ -267,9 +266,10 @@ GRPCAPI grpc_call_error grpc_call_cancel(grpc_call *call, void *reserved);
and description passed in.
Importantly, this function does not send status nor description to the
remote endpoint. */
GRPCAPI grpc_call_error
grpc_call_cancel_with_status(grpc_call *call, grpc_status_code status,
const char *description, void *reserved);
GRPCAPI grpc_call_error grpc_call_cancel_with_status(grpc_call *call,
grpc_status_code status,
const char *description,
void *reserved);
/** Destroy a call.
THREAD SAFETY: grpc_call_destroy is thread-compatible */
@ -283,13 +283,11 @@ GRPCAPI void grpc_call_destroy(grpc_call *call);
to \a cq_bound_to_call.
Note that \a cq_for_notification must have been registered to the server via
\a grpc_server_register_completion_queue. */
GRPCAPI grpc_call_error
grpc_server_request_call(grpc_server *server, grpc_call **call,
grpc_call_details *details,
grpc_metadata_array *request_metadata,
grpc_completion_queue *cq_bound_to_call,
grpc_completion_queue *cq_for_notification,
void *tag_new);
GRPCAPI grpc_call_error grpc_server_request_call(
grpc_server *server, grpc_call **call, grpc_call_details *details,
grpc_metadata_array *request_metadata,
grpc_completion_queue *cq_bound_to_call,
grpc_completion_queue *cq_for_notification, void *tag_new);
/** Registers a method in the server.
Methods to this (host, method) pair will not be reported by
@ -299,7 +297,8 @@ grpc_server_request_call(grpc_server *server, grpc_call **call,
Must be called before grpc_server_start.
Returns NULL on failure. */
GRPCAPI void *grpc_server_register_method(grpc_server *server,
const char *method, const char *host);
const char *method, const char *host,
uint32_t flags);
/** Request notification of a new pre-registered call. 'cq_for_notification'
must have been registered to the server via

@ -250,6 +250,7 @@ typedef struct {
char *host;
size_t host_capacity;
gpr_timespec deadline;
uint32_t flags;
void *reserved;
} grpc_call_details;

@ -758,10 +758,17 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
}
static const grpc_channel_filter server_surface_filter = {
server_start_transport_stream_op, grpc_channel_next_op, sizeof(call_data),
init_call_elem, grpc_call_stack_ignore_set_pollset, destroy_call_elem,
sizeof(channel_data), init_channel_elem, destroy_channel_elem,
grpc_call_next_get_peer, "server",
server_start_transport_stream_op,
grpc_channel_next_op,
sizeof(call_data),
init_call_elem,
grpc_call_stack_ignore_set_pollset,
destroy_call_elem,
sizeof(channel_data),
init_channel_elem,
destroy_channel_elem,
grpc_call_next_get_peer,
"server",
};
void grpc_server_register_completion_queue(grpc_server *server,
@ -845,7 +852,7 @@ static int streq(const char *a, const char *b) {
}
void *grpc_server_register_method(grpc_server *server, const char *method,
const char *host) {
const char *host, uint32_t flags) {
registered_method *m;
GRPC_API_TRACE("grpc_server_register_method(server=%p, method=%s, host=%s)",
3, (server, method, host));
@ -930,7 +937,8 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s,
channel = grpc_channel_create_from_filters(exec_ctx, NULL, filters,
num_filters, args, 0);
chand = (channel_data *)grpc_channel_stack_element(
grpc_channel_get_channel_stack(channel), 0)->channel_data;
grpc_channel_get_channel_stack(channel), 0)
->channel_data;
chand->server = s;
server_ref(s);
chand->channel = channel;
@ -951,7 +959,7 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s,
method = grpc_mdstr_from_string(rm->method);
hash = GRPC_MDSTR_KV_HASH(host ? host->hash : 0, method->hash);
for (probes = 0; chand->registered_methods[(hash + probes) % slots]
.server_registered_method != NULL;
.server_registered_method != NULL;
probes++)
;
if (probes > max_probes) max_probes = probes;

@ -264,6 +264,7 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
void* const tag_;
bool in_flight_;
const bool has_request_payload_;
uint32_t incoming_flags_;
grpc_call* call_;
grpc_call_details* call_details_;
gpr_timespec deadline_;
@ -334,7 +335,7 @@ bool Server::RegisterService(const grpc::string* host, Service* service) {
}
RpcServiceMethod* method = it->get();
void* tag = grpc_server_register_method(server_, method->name(),
host ? host->c_str() : nullptr);
host ? host->c_str() : nullptr, 0);
if (tag == nullptr) {
gpr_log(GPR_DEBUG, "Attempt to register %s multiple times",
method->name());

Loading…
Cancel
Save