Changing the library's code to adapt with the new API.

pull/2612/head
Nicolas "Pixel" Noble 10 years ago
parent 77446adeb7
commit ebb514066e
  1. 15
      src/core/surface/call.c
  2. 9
      src/core/surface/channel.c
  3. 4
      src/core/surface/channel_create.c
  4. 10
      src/core/surface/completion_queue.c
  5. 4
      src/core/surface/server.c
  6. 3
      src/core/surface/server_create.c
  7. 10
      src/cpp/client/channel.cc
  8. 16
      src/cpp/client/client_context.cc
  9. 2
      src/cpp/client/insecure_credentials.cc
  10. 14
      src/cpp/common/completion_queue.cc
  11. 12
      src/cpp/server/server.cc
  12. 3
      src/cpp/server/server_builder.cc

@ -1182,18 +1182,22 @@ void grpc_call_destroy(grpc_call *c) {
c->cancel_alarm |= c->have_alarm; c->cancel_alarm |= c->have_alarm;
cancel = c->read_state != READ_STATE_STREAM_CLOSED; cancel = c->read_state != READ_STATE_STREAM_CLOSED;
unlock(c); unlock(c);
if (cancel) grpc_call_cancel(c); if (cancel) grpc_call_cancel(c, NULL);
GRPC_CALL_INTERNAL_UNREF(c, "destroy", 1); GRPC_CALL_INTERNAL_UNREF(c, "destroy", 1);
} }
grpc_call_error grpc_call_cancel(grpc_call *call) { grpc_call_error grpc_call_cancel(grpc_call *call, void *reserved) {
return grpc_call_cancel_with_status(call, GRPC_STATUS_CANCELLED, "Cancelled"); (void) reserved;
return grpc_call_cancel_with_status(call, GRPC_STATUS_CANCELLED, "Cancelled",
NULL);
} }
grpc_call_error grpc_call_cancel_with_status(grpc_call *c, grpc_call_error grpc_call_cancel_with_status(grpc_call *c,
grpc_status_code status, grpc_status_code status,
const char *description) { const char *description,
void *reserved) {
grpc_call_error r; grpc_call_error r;
(void) reserved;
lock(c); lock(c);
r = cancel_with_status(c, status, description); r = cancel_with_status(c, status, description);
unlock(c); unlock(c);
@ -1420,13 +1424,14 @@ static int are_write_flags_valid(gpr_uint32 flags) {
} }
grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
size_t nops, void *tag) { size_t nops, void *tag, void *reserved) {
grpc_ioreq reqs[GRPC_IOREQ_OP_COUNT]; grpc_ioreq reqs[GRPC_IOREQ_OP_COUNT];
size_t in; size_t in;
size_t out; size_t out;
const grpc_op *op; const grpc_op *op;
grpc_ioreq *req; grpc_ioreq *req;
void (*finish_func)(grpc_call *, int, void *) = finish_batch; void (*finish_func)(grpc_call *, int, void *) = finish_batch;
(void) reserved;
GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, tag); GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, tag);

@ -154,7 +154,8 @@ static grpc_call *grpc_channel_create_call_internal(
grpc_call *grpc_channel_create_call(grpc_channel *channel, grpc_call *grpc_channel_create_call(grpc_channel *channel,
grpc_completion_queue *cq, grpc_completion_queue *cq,
const char *method, const char *host, const char *method, const char *host,
gpr_timespec deadline) { gpr_timespec deadline, void *reserved) {
(void) reserved;
return grpc_channel_create_call_internal( return grpc_channel_create_call_internal(
channel, cq, channel, cq,
grpc_mdelem_from_metadata_strings( grpc_mdelem_from_metadata_strings(
@ -167,8 +168,9 @@ grpc_call *grpc_channel_create_call(grpc_channel *channel,
} }
void *grpc_channel_register_call(grpc_channel *channel, const char *method, void *grpc_channel_register_call(grpc_channel *channel, const char *method,
const char *host) { const char *host, void *reserved) {
registered_call *rc = gpr_malloc(sizeof(registered_call)); registered_call *rc = gpr_malloc(sizeof(registered_call));
(void) reserved;
rc->path = grpc_mdelem_from_metadata_strings( rc->path = grpc_mdelem_from_metadata_strings(
channel->metadata_context, GRPC_MDSTR_REF(channel->path_string), channel->metadata_context, GRPC_MDSTR_REF(channel->path_string),
grpc_mdstr_from_string(channel->metadata_context, method)); grpc_mdstr_from_string(channel->metadata_context, method));
@ -184,8 +186,9 @@ void *grpc_channel_register_call(grpc_channel *channel, const char *method,
grpc_call *grpc_channel_create_registered_call( grpc_call *grpc_channel_create_registered_call(
grpc_channel *channel, grpc_completion_queue *completion_queue, grpc_channel *channel, grpc_completion_queue *completion_queue,
void *registered_call_handle, gpr_timespec deadline) { void *registered_call_handle, gpr_timespec deadline, void *reserved) {
registered_call *rc = registered_call_handle; registered_call *rc = registered_call_handle;
(void) reserved;
return grpc_channel_create_call_internal( return grpc_channel_create_call_internal(
channel, completion_queue, GRPC_MDELEM_REF(rc->path), channel, completion_queue, GRPC_MDELEM_REF(rc->path),
GRPC_MDELEM_REF(rc->authority), deadline); GRPC_MDELEM_REF(rc->authority), deadline);

@ -152,7 +152,8 @@ static const grpc_subchannel_factory_vtable subchannel_factory_vtable = {
- connect to it (trying alternatives as presented) - connect to it (trying alternatives as presented)
- perform handshakes */ - perform handshakes */
grpc_channel *grpc_channel_create(const char *target, grpc_channel *grpc_channel_create(const char *target,
const grpc_channel_args *args) { const grpc_channel_args *args,
void *reserved) {
grpc_channel *channel = NULL; grpc_channel *channel = NULL;
#define MAX_FILTERS 3 #define MAX_FILTERS 3
const grpc_channel_filter *filters[MAX_FILTERS]; const grpc_channel_filter *filters[MAX_FILTERS];
@ -160,6 +161,7 @@ grpc_channel *grpc_channel_create(const char *target,
subchannel_factory *f; subchannel_factory *f;
grpc_mdctx *mdctx = grpc_mdctx_create(); grpc_mdctx *mdctx = grpc_mdctx_create();
int n = 0; int n = 0;
(void) reserved;
/* TODO(census) /* TODO(census)
if (grpc_channel_args_is_census_enabled(args)) { if (grpc_channel_args_is_census_enabled(args)) {
filters[n++] = &grpc_client_census_filter; filters[n++] = &grpc_client_census_filter;

@ -62,8 +62,9 @@ struct grpc_completion_queue {
int is_server_cq; int is_server_cq;
}; };
grpc_completion_queue *grpc_completion_queue_create(void) { grpc_completion_queue *grpc_completion_queue_create(void *reserved) {
grpc_completion_queue *cc = gpr_malloc(sizeof(grpc_completion_queue)); grpc_completion_queue *cc = gpr_malloc(sizeof(grpc_completion_queue));
(void) reserved;
memset(cc, 0, sizeof(*cc)); memset(cc, 0, sizeof(*cc));
/* Initial ref is dropped by grpc_completion_queue_shutdown */ /* Initial ref is dropped by grpc_completion_queue_shutdown */
gpr_ref_init(&cc->pending_events, 1); gpr_ref_init(&cc->pending_events, 1);
@ -145,8 +146,10 @@ void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, int success,
} }
grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
gpr_timespec deadline) { gpr_timespec deadline,
void *reserved) {
grpc_event ret; grpc_event ret;
(void) reserved;
deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
@ -185,10 +188,11 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
} }
grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
gpr_timespec deadline) { gpr_timespec deadline, void *reserved) {
grpc_event ret; grpc_event ret;
grpc_cq_completion *c; grpc_cq_completion *c;
grpc_cq_completion *prev; grpc_cq_completion *prev;
(void) reserved;
deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);

@ -726,8 +726,10 @@ static const grpc_channel_filter server_surface_filter = {
}; };
void grpc_server_register_completion_queue(grpc_server *server, void grpc_server_register_completion_queue(grpc_server *server,
grpc_completion_queue *cq) { grpc_completion_queue *cq,
void *reserved) {
size_t i, n; size_t i, n;
(void) reserved;
for (i = 0; i < server->cq_count; i++) { for (i = 0; i < server->cq_count; i++) {
if (server->cqs[i] == cq) return; if (server->cqs[i] == cq) return;
} }

@ -36,8 +36,9 @@
#include "src/core/surface/server.h" #include "src/core/surface/server.h"
#include "src/core/channel/compress_filter.h" #include "src/core/channel/compress_filter.h"
grpc_server *grpc_server_create(const grpc_channel_args *args) { grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) {
const grpc_channel_filter *filters[] = {&grpc_compress_filter}; const grpc_channel_filter *filters[] = {&grpc_compress_filter};
(void) reserved;
return grpc_server_create_from_filters(filters, GPR_ARRAY_SIZE(filters), return grpc_server_create_from_filters(filters, GPR_ARRAY_SIZE(filters),
args); args);
} }

@ -63,12 +63,13 @@ Call Channel::CreateCall(const RpcMethod& method, ClientContext* context,
method.channel_tag() && context->authority().empty() method.channel_tag() && context->authority().empty()
? grpc_channel_create_registered_call(c_channel_, cq->cq(), ? grpc_channel_create_registered_call(c_channel_, cq->cq(),
method.channel_tag(), method.channel_tag(),
context->raw_deadline()) context->raw_deadline(),
nullptr)
: grpc_channel_create_call(c_channel_, cq->cq(), method.name(), : grpc_channel_create_call(c_channel_, cq->cq(), method.name(),
context->authority().empty() context->authority().empty()
? target_.c_str() ? target_.c_str()
: context->authority().c_str(), : context->authority().c_str(),
context->raw_deadline()); context->raw_deadline(), nullptr);
grpc_census_call_set_context(c_call, context->get_census_context()); grpc_census_call_set_context(c_call, context->get_census_context());
GRPC_TIMER_MARK(GRPC_PTAG_CPP_CALL_CREATED, c_call); GRPC_TIMER_MARK(GRPC_PTAG_CPP_CALL_CREATED, c_call);
context->set_call(c_call, shared_from_this()); context->set_call(c_call, shared_from_this());
@ -82,12 +83,13 @@ void Channel::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) {
GRPC_TIMER_BEGIN(GRPC_PTAG_CPP_PERFORM_OPS, call->call()); GRPC_TIMER_BEGIN(GRPC_PTAG_CPP_PERFORM_OPS, call->call());
ops->FillOps(cops, &nops); ops->FillOps(cops, &nops);
GPR_ASSERT(GRPC_CALL_OK == GPR_ASSERT(GRPC_CALL_OK ==
grpc_call_start_batch(call->call(), cops, nops, ops)); grpc_call_start_batch(call->call(), cops, nops, ops, nullptr));
GRPC_TIMER_END(GRPC_PTAG_CPP_PERFORM_OPS, call->call()); GRPC_TIMER_END(GRPC_PTAG_CPP_PERFORM_OPS, call->call());
} }
void* Channel::RegisterMethod(const char* method) { void* Channel::RegisterMethod(const char* method) {
return grpc_channel_register_call(c_channel_, method, target_.c_str()); return grpc_channel_register_call(c_channel_, method, target_.c_str(),
nullptr);
} }
} // namespace grpc } // namespace grpc

@ -56,9 +56,11 @@ ClientContext::~ClientContext() {
if (cq_) { if (cq_) {
// Drain cq_. // Drain cq_.
grpc_completion_queue_shutdown(cq_); grpc_completion_queue_shutdown(cq_);
while (grpc_completion_queue_next(cq_, gpr_inf_future(GPR_CLOCK_REALTIME)) gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
.type != GRPC_QUEUE_SHUTDOWN) grpc_event event;
; do {
event = grpc_completion_queue_next(cq_, deadline, nullptr);
} while (event.type != GRPC_QUEUE_SHUTDOWN);
grpc_completion_queue_destroy(cq_); grpc_completion_queue_destroy(cq_);
} }
} }
@ -75,19 +77,19 @@ void ClientContext::set_call(grpc_call* call,
channel_ = channel; channel_ = channel;
if (creds_ && !creds_->ApplyToCall(call_)) { if (creds_ && !creds_->ApplyToCall(call_)) {
grpc_call_cancel_with_status(call, GRPC_STATUS_CANCELLED, grpc_call_cancel_with_status(call, GRPC_STATUS_CANCELLED,
"Failed to set credentials to rpc."); "Failed to set credentials to rpc.", nullptr);
} }
} }
void ClientContext::set_compression_algorithm( void ClientContext::set_compression_algorithm(
grpc_compression_algorithm algorithm) { grpc_compression_algorithm algorithm) {
char* algorithm_name = NULL; char* algorithm_name = nullptr;
if (!grpc_compression_algorithm_name(algorithm, &algorithm_name)) { if (!grpc_compression_algorithm_name(algorithm, &algorithm_name)) {
gpr_log(GPR_ERROR, "Name for compression algorithm '%d' unknown.", gpr_log(GPR_ERROR, "Name for compression algorithm '%d' unknown.",
algorithm); algorithm);
abort(); abort();
} }
GPR_ASSERT(algorithm_name != NULL); GPR_ASSERT(algorithm_name != nullptr);
AddMetadata(GRPC_COMPRESS_REQUEST_ALGORITHM_KEY, algorithm_name); AddMetadata(GRPC_COMPRESS_REQUEST_ALGORITHM_KEY, algorithm_name);
} }
@ -100,7 +102,7 @@ std::shared_ptr<const AuthContext> ClientContext::auth_context() const {
void ClientContext::TryCancel() { void ClientContext::TryCancel() {
if (call_) { if (call_) {
grpc_call_cancel(call_); grpc_call_cancel(call_, nullptr);
} }
} }

@ -49,7 +49,7 @@ class InsecureCredentialsImpl GRPC_FINAL : public Credentials {
grpc_channel_args channel_args; grpc_channel_args channel_args;
args.SetChannelArgs(&channel_args); args.SetChannelArgs(&channel_args);
return std::shared_ptr<ChannelInterface>(new Channel( return std::shared_ptr<ChannelInterface>(new Channel(
target, grpc_channel_create(target.c_str(), &channel_args))); target, grpc_channel_create(target.c_str(), &channel_args, nullptr)));
} }
// InsecureCredentials should not be applied to a call. // InsecureCredentials should not be applied to a call.

@ -40,7 +40,9 @@
namespace grpc { namespace grpc {
CompletionQueue::CompletionQueue() { cq_ = grpc_completion_queue_create(); } CompletionQueue::CompletionQueue() {
cq_ = grpc_completion_queue_create(nullptr);
}
CompletionQueue::CompletionQueue(grpc_completion_queue* take) : cq_(take) {} CompletionQueue::CompletionQueue(grpc_completion_queue* take) : cq_(take) {}
@ -51,7 +53,7 @@ void CompletionQueue::Shutdown() { grpc_completion_queue_shutdown(cq_); }
CompletionQueue::NextStatus CompletionQueue::AsyncNextInternal( CompletionQueue::NextStatus CompletionQueue::AsyncNextInternal(
void** tag, bool* ok, gpr_timespec deadline) { void** tag, bool* ok, gpr_timespec deadline) {
for (;;) { for (;;) {
auto ev = grpc_completion_queue_next(cq_, deadline); auto ev = grpc_completion_queue_next(cq_, deadline, nullptr);
switch (ev.type) { switch (ev.type) {
case GRPC_QUEUE_TIMEOUT: case GRPC_QUEUE_TIMEOUT:
return TIMEOUT; return TIMEOUT;
@ -70,8 +72,8 @@ CompletionQueue::NextStatus CompletionQueue::AsyncNextInternal(
} }
bool CompletionQueue::Pluck(CompletionQueueTag* tag) { bool CompletionQueue::Pluck(CompletionQueueTag* tag) {
auto ev = auto deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
grpc_completion_queue_pluck(cq_, tag, gpr_inf_future(GPR_CLOCK_REALTIME)); auto ev = grpc_completion_queue_pluck(cq_, tag, deadline, nullptr);
bool ok = ev.success != 0; bool ok = ev.success != 0;
void* ignored = tag; void* ignored = tag;
GPR_ASSERT(tag->FinalizeResult(&ignored, &ok)); GPR_ASSERT(tag->FinalizeResult(&ignored, &ok));
@ -81,8 +83,8 @@ bool CompletionQueue::Pluck(CompletionQueueTag* tag) {
} }
void CompletionQueue::TryPluck(CompletionQueueTag* tag) { void CompletionQueue::TryPluck(CompletionQueueTag* tag) {
auto ev = auto deadline = gpr_time_0(GPR_CLOCK_REALTIME);
grpc_completion_queue_pluck(cq_, tag, gpr_time_0(GPR_CLOCK_REALTIME)); auto ev = grpc_completion_queue_pluck(cq_, tag, deadline, nullptr);
if (ev.type == GRPC_QUEUE_TIMEOUT) return; if (ev.type == GRPC_QUEUE_TIMEOUT) return;
bool ok = ev.success != 0; bool ok = ev.success != 0;
void* ignored = tag; void* ignored = tag;

@ -84,7 +84,7 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
return mrd; return mrd;
} }
void SetupRequest() { cq_ = grpc_completion_queue_create(); } void SetupRequest() { cq_ = grpc_completion_queue_create(nullptr); }
void TeardownRequest() { void TeardownRequest() {
grpc_completion_queue_destroy(cq_); grpc_completion_queue_destroy(cq_);
@ -170,9 +170,9 @@ static grpc_server* CreateServer(int max_message_size) {
arg.key = const_cast<char*>(GRPC_ARG_MAX_MESSAGE_LENGTH); arg.key = const_cast<char*>(GRPC_ARG_MAX_MESSAGE_LENGTH);
arg.value.integer = max_message_size; arg.value.integer = max_message_size;
grpc_channel_args args = {1, &arg}; grpc_channel_args args = {1, &arg};
return grpc_server_create(&args); return grpc_server_create(&args, nullptr);
} else { } else {
return grpc_server_create(nullptr); return grpc_server_create(nullptr, nullptr);
} }
} }
@ -186,7 +186,7 @@ Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned,
server_(CreateServer(max_message_size)), server_(CreateServer(max_message_size)),
thread_pool_(thread_pool), thread_pool_(thread_pool),
thread_pool_owned_(thread_pool_owned) { thread_pool_owned_(thread_pool_owned) {
grpc_server_register_completion_queue(server_, cq_.cq()); grpc_server_register_completion_queue(server_, cq_.cq(), nullptr);
} }
Server::~Server() { Server::~Server() {
@ -297,8 +297,8 @@ void Server::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) {
size_t nops = 0; size_t nops = 0;
grpc_op cops[MAX_OPS]; grpc_op cops[MAX_OPS];
ops->FillOps(cops, &nops); ops->FillOps(cops, &nops);
GPR_ASSERT(GRPC_CALL_OK == auto result = grpc_call_start_batch(call->call(), cops, nops, ops, nullptr);
grpc_call_start_batch(call->call(), cops, nops, ops)); GPR_ASSERT(GRPC_CALL_OK == result);
} }
Server::BaseAsyncRequest::BaseAsyncRequest( Server::BaseAsyncRequest::BaseAsyncRequest(

@ -103,7 +103,8 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
std::unique_ptr<Server> server( std::unique_ptr<Server> server(
new Server(thread_pool_, thread_pool_owned, max_message_size_)); new Server(thread_pool_, thread_pool_owned, max_message_size_));
for (auto cq = cqs_.begin(); cq != cqs_.end(); ++cq) { for (auto cq = cqs_.begin(); cq != cqs_.end(); ++cq) {
grpc_server_register_completion_queue(server->server_, (*cq)->cq()); grpc_server_register_completion_queue(server->server_, (*cq)->cq(),
nullptr);
} }
for (auto service = services_.begin(); service != services_.end(); for (auto service = services_.begin(); service != services_.end();
service++) { service++) {

Loading…
Cancel
Save