Add deadline API on server side.

Change on 2015/01/06 by yangg <yangg@google.com>
-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=83351442
pull/1/merge
yangg 10 years ago committed by Nicolas Noble
parent c87b1c533f
commit ed5e7e006b
  1. 4
      include/grpc++/server_context.h
  2. 2
      src/cpp/server/server_context_impl.cc
  3. 14
      src/cpp/server/server_context_impl.h
  4. 2
      src/cpp/server/server_rpc_handler.cc
  5. 3
      src/cpp/util/time.cc
  6. 93
      test/cpp/end2end/end2end_test.cc
  7. 13
      test/cpp/util/echo.proto
  8. 6
      test/cpp/util/time_test.cc

@ -34,12 +34,16 @@
#ifndef __GRPCPP_SERVER_CONTEXT_H_ #ifndef __GRPCPP_SERVER_CONTEXT_H_
#define __GRPCPP_SERVER_CONTEXT_H_ #define __GRPCPP_SERVER_CONTEXT_H_
#include <chrono>
namespace grpc { namespace grpc {
// Interface of server side rpc context. // Interface of server side rpc context.
class ServerContext { class ServerContext {
public: public:
virtual ~ServerContext() {} virtual ~ServerContext() {}
virtual std::chrono::system_clock::time_point absolute_deadline() const = 0;
}; };
} // namespace grpc } // namespace grpc

@ -35,6 +35,4 @@
namespace grpc { namespace grpc {
ServerContextImpl::ServerContextImpl() {}
} // namespace grpc } // namespace grpc

@ -36,12 +36,24 @@
#include <grpc++/server_context.h> #include <grpc++/server_context.h>
#include <chrono>
#include <grpc/support/time.h>
namespace grpc { namespace grpc {
class ServerContextImpl : public ServerContext { class ServerContextImpl : public ServerContext {
public: public:
ServerContextImpl(); explicit ServerContextImpl(std::chrono::system_clock::time_point deadline)
: absolute_deadline_(deadline) {}
~ServerContextImpl() {} ~ServerContextImpl() {}
std::chrono::system_clock::time_point absolute_deadline() const {
return absolute_deadline_;
}
private:
std::chrono::system_clock::time_point absolute_deadline_;
}; };
} // namespace grpc } // namespace grpc

@ -53,7 +53,7 @@ void ServerRpcHandler::StartRpc() {
return; return;
} }
ServerContextImpl user_context; ServerContextImpl user_context(async_server_context_->absolute_deadline());
if (method_->method_type() == RpcMethod::NORMAL_RPC) { if (method_->method_type() == RpcMethod::NORMAL_RPC) {
// Start the rpc on this dedicated completion queue. // Start the rpc on this dedicated completion queue.

@ -53,6 +53,9 @@ void Timepoint2Timespec(const system_clock::time_point& from,
} }
system_clock::time_point Timespec2Timepoint(gpr_timespec t) { system_clock::time_point Timespec2Timepoint(gpr_timespec t) {
if (gpr_time_cmp(t, gpr_inf_future) == 0) {
return system_clock::time_point::max();
}
system_clock::time_point tp; system_clock::time_point tp;
tp += duration_cast<system_clock::time_point::duration>(seconds(t.tv_sec)); tp += duration_cast<system_clock::time_point::duration>(seconds(t.tv_sec));
tp += tp +=

@ -31,10 +31,12 @@
* *
*/ */
#include <chrono>
#include <thread> #include <thread>
#include "src/cpp/server/rpc_service_method.h"
#include "test/cpp/util/echo.pb.h" #include "test/cpp/util/echo.pb.h"
#include "net/util/netutil.h" #include "src/cpp/server/rpc_service_method.h"
#include "src/cpp/util/time.h"
#include <grpc++/channel_arguments.h> #include <grpc++/channel_arguments.h>
#include <grpc++/channel_interface.h> #include <grpc++/channel_interface.h>
#include <grpc++/client_context.h> #include <grpc++/client_context.h>
@ -44,22 +46,43 @@
#include <grpc++/server_context.h> #include <grpc++/server_context.h>
#include <grpc++/status.h> #include <grpc++/status.h>
#include <grpc++/stream.h> #include <grpc++/stream.h>
#include "net/util/netutil.h"
#include <gtest/gtest.h> #include <gtest/gtest.h>
#include <grpc/grpc.h> #include <grpc/grpc.h>
#include <grpc/support/thd.h> #include <grpc/support/thd.h>
#include <grpc/support/time.h>
using grpc::cpp::test::util::EchoRequest; using grpc::cpp::test::util::EchoRequest;
using grpc::cpp::test::util::EchoResponse; using grpc::cpp::test::util::EchoResponse;
using grpc::cpp::test::util::TestService; using grpc::cpp::test::util::TestService;
using std::chrono::system_clock;
namespace grpc { namespace grpc {
namespace testing {
namespace {
// When echo_deadline is requested, deadline seen in the ServerContext is set in
// the response in seconds.
void MaybeEchoDeadline(ServerContext* context, const EchoRequest* request,
EchoResponse* response) {
if (request->has_param() && request->param().echo_deadline()) {
gpr_timespec deadline = gpr_inf_future;
if (context->absolute_deadline() != system_clock::time_point::max()) {
Timepoint2Timespec(context->absolute_deadline(), &deadline);
}
response->mutable_param()->set_request_deadline(deadline.tv_sec);
}
}
} // namespace
class TestServiceImpl : public TestService::Service { class TestServiceImpl : public TestService::Service {
public: public:
Status Echo(ServerContext* context, const EchoRequest* request, Status Echo(ServerContext* context, const EchoRequest* request,
EchoResponse* response) { EchoResponse* response) {
response->set_message(request->message()); response->set_message(request->message());
MaybeEchoDeadline(context, request, response);
return Status::OK; return Status::OK;
} }
@ -179,6 +202,71 @@ TEST_F(End2endTest, RpcDeadlineExpires) {
delete stub; delete stub;
} }
// Set a long but finite deadline.
TEST_F(End2endTest, RpcLongDeadline) {
std::shared_ptr<ChannelInterface> channel =
CreateChannel(server_address_.str(), ChannelArguments());
TestService::Stub* stub = TestService::NewStub(channel);
EchoRequest request;
EchoResponse response;
request.set_message("Hello");
ClientContext context;
std::chrono::system_clock::time_point deadline =
std::chrono::system_clock::now() + std::chrono::hours(1);
context.set_absolute_deadline(deadline);
Status s = stub->Echo(&context, request, &response);
EXPECT_EQ(response.message(), request.message());
EXPECT_TRUE(s.IsOk());
delete stub;
}
// Ask server to echo back the deadline it sees.
TEST_F(End2endTest, EchoDeadline) {
std::shared_ptr<ChannelInterface> channel =
CreateChannel(server_address_.str(), ChannelArguments());
TestService::Stub* stub = TestService::NewStub(channel);
EchoRequest request;
EchoResponse response;
request.set_message("Hello");
request.mutable_param()->set_echo_deadline(true);
ClientContext context;
std::chrono::system_clock::time_point deadline =
std::chrono::system_clock::now() + std::chrono::seconds(100);
context.set_absolute_deadline(deadline);
Status s = stub->Echo(&context, request, &response);
EXPECT_EQ(response.message(), request.message());
EXPECT_TRUE(s.IsOk());
gpr_timespec sent_deadline;
Timepoint2Timespec(deadline, &sent_deadline);
// Allow 1 second error.
EXPECT_LE(response.param().request_deadline() - sent_deadline.tv_sec, 1);
EXPECT_GE(response.param().request_deadline() - sent_deadline.tv_sec, -1);
delete stub;
}
// Ask server to echo back the deadline it sees. The rpc has no deadline.
TEST_F(End2endTest, EchoDeadlineForNoDeadlineRpc) {
std::shared_ptr<ChannelInterface> channel =
CreateChannel(server_address_.str(), ChannelArguments());
TestService::Stub* stub = TestService::NewStub(channel);
EchoRequest request;
EchoResponse response;
request.set_message("Hello");
request.mutable_param()->set_echo_deadline(true);
ClientContext context;
Status s = stub->Echo(&context, request, &response);
EXPECT_EQ(response.message(), request.message());
EXPECT_TRUE(s.IsOk());
EXPECT_EQ(response.param().request_deadline(), gpr_inf_future.tv_sec);
delete stub;
}
TEST_F(End2endTest, UnimplementedRpc) { TEST_F(End2endTest, UnimplementedRpc) {
std::shared_ptr<ChannelInterface> channel = std::shared_ptr<ChannelInterface> channel =
CreateChannel(server_address_.str(), ChannelArguments()); CreateChannel(server_address_.str(), ChannelArguments());
@ -300,6 +388,7 @@ TEST_F(End2endTest, BidiStream) {
delete stub; delete stub;
} }
} // namespace testing
} // namespace grpc } // namespace grpc
int main(int argc, char** argv) { int main(int argc, char** argv) {

@ -2,12 +2,22 @@ syntax = "proto2";
package grpc.cpp.test.util; package grpc.cpp.test.util;
message RequestParams {
optional bool echo_deadline = 1;
}
message EchoRequest { message EchoRequest {
optional string message = 1; optional string message = 1;
optional RequestParams param = 2;
}
message ResponseParams {
optional int64 request_deadline = 1;
} }
message EchoResponse { message EchoResponse {
optional string message = 1; optional string message = 1;
optional ResponseParams param = 2;
} }
service TestService { service TestService {
@ -15,6 +25,5 @@ service TestService {
rpc RequestStream(stream EchoRequest) returns (EchoResponse); rpc RequestStream(stream EchoRequest) returns (EchoResponse);
rpc ResponseStream(EchoRequest) returns (stream EchoResponse); rpc ResponseStream(EchoRequest) returns (stream EchoResponse);
rpc BidiStream(stream EchoRequest) returns (stream EchoResponse); rpc BidiStream(stream EchoRequest) returns (stream EchoResponse);
rpc Unimplemented(EchoRequest) returns (EchoResponse) { rpc Unimplemented(EchoRequest) returns (EchoResponse);
}
} }

@ -62,5 +62,11 @@ TEST_F(TimeTest, AbsolutePointTest) {
EXPECT_TRUE(tp == tp_converted_2); EXPECT_TRUE(tp == tp_converted_2);
} }
// gpr_inf_future is treated specially and mapped to time_point::max()
TEST_F(TimeTest, InfFuture) {
EXPECT_EQ(system_clock::time_point::max(),
Timespec2Timepoint(gpr_inf_future));
}
} // namespace } // namespace
} // namespace grpc } // namespace grpc

Loading…
Cancel
Save