Merge branch 'master' into genericstub

pull/1144/head
Yang Gao 10 years ago
commit d1b9e64b2d
  1. 5
      include/grpc++/async_generic_service.h
  2. 7
      include/grpc++/async_unary_call.h
  3. 4
      include/grpc++/byte_buffer.h
  4. 4
      include/grpc++/channel_interface.h
  5. 30
      include/grpc++/client_context.h
  6. 37
      include/grpc++/completion_queue.h
  7. 7
      include/grpc++/config.h
  8. 2
      include/grpc++/generic_stub.h
  9. 72
      include/grpc++/impl/call.h
  10. 8
      include/grpc++/impl/client_unary_call.h
  11. 2
      include/grpc/support/atm.h
  12. 2
      include/grpc/support/atm_win32.h
  13. 14
      include/grpc/support/port_platform.h
  14. 10
      src/cpp/client/channel.h
  15. 18
      src/cpp/client/channel_arguments.cc
  16. 8
      src/cpp/client/client_context.cc
  17. 8
      src/cpp/client/client_unary_call.cc
  18. 9
      src/cpp/client/create_channel.cc
  19. 22
      src/cpp/proto/proto_utils.cc
  20. 6
      src/cpp/proto/proto_utils.h
  21. 3
      src/cpp/server/async_generic_service.cc
  22. 16
      src/cpp/server/async_server_context.cc
  23. 3
      src/cpp/server/insecure_server_credentials.cc
  24. 12
      src/cpp/server/secure_server_credentials.cc
  25. 5
      src/cpp/server/server.cc
  26. 3
      src/cpp/server/server_builder.cc
  27. 4
      src/cpp/server/thread_pool.cc
  28. 2
      src/cpp/server/thread_pool.h
  29. 4
      src/cpp/util/slice.cc
  30. 4
      src/cpp/util/status.cc
  31. 4
      src/cpp/util/time.cc
  32. 4
      src/cpp/util/time.h
  33. 34
      src/node/ext/call.cc
  34. 2
      src/node/package.json
  35. 4
      src/node/test/call_test.js
  36. 18
      src/node/test/end_to_end_test.js
  37. 2
      test/core/transport/metadata_test.c
  38. 2
      test/cpp/client/credentials_test.cc
  39. 26
      test/cpp/end2end/async_end2end_test.cc
  40. 9
      test/cpp/end2end/generic_end2end_test.cc
  41. 26
      test/cpp/interop/client.cc
  42. 16
      test/cpp/interop/interop_test.cc
  43. 52
      test/cpp/qps/client_async.cc
  44. 2
      test/cpp/qps/qps_driver.cc
  45. 4
      test/cpp/qps/server.cc
  46. 44
      test/cpp/qps/server_async.cc
  47. 3
      test/cpp/util/create_test_channel.cc
  48. 2
      test/cpp/util/status_test.cc
  49. 76
      tools/dockerfile/grpc_dist_proto/Dockerfile
  50. 1
      tools/dockerfile/grpc_dist_proto/version.txt
  51. 2
      tools/gce_setup/cloud_prod_runner.sh
  52. 85
      tools/gce_setup/grpc_docker.sh

@ -41,7 +41,8 @@ struct grpc_server;
namespace grpc { namespace grpc {
typedef ServerAsyncReaderWriter<ByteBuffer, ByteBuffer> GenericServerAsyncReaderWriter; typedef ServerAsyncReaderWriter<ByteBuffer, ByteBuffer>
GenericServerAsyncReaderWriter;
class GenericServerContext GRPC_FINAL : public ServerContext { class GenericServerContext GRPC_FINAL : public ServerContext {
public: public:
@ -74,6 +75,6 @@ class AsyncGenericService GRPC_FINAL {
Server* server_; Server* server_;
}; };
} // namespace grpc } // namespace grpc
#endif // GRPCXX_ASYNC_GENERIC_SERVICE_H #endif // GRPCXX_ASYNC_GENERIC_SERVICE_H

@ -48,10 +48,9 @@ template <class R>
class ClientAsyncResponseReader GRPC_FINAL { class ClientAsyncResponseReader GRPC_FINAL {
public: public:
ClientAsyncResponseReader(ChannelInterface* channel, CompletionQueue* cq, ClientAsyncResponseReader(ChannelInterface* channel, CompletionQueue* cq,
const RpcMethod& method, ClientContext* context, const RpcMethod& method, ClientContext* context,
const grpc::protobuf::Message& request, void* tag) const grpc::protobuf::Message& request, void* tag)
: context_(context), : context_(context), call_(channel->CreateCall(method, context, cq)) {
call_(channel->CreateCall(method, context, cq)) {
init_buf_.Reset(tag); init_buf_.Reset(tag);
init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_); init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_);
init_buf_.AddSendMessage(request); init_buf_.AddSendMessage(request);

@ -72,9 +72,7 @@ class ByteBuffer GRPC_FINAL {
buffer_ = buf; buffer_ = buf;
} }
grpc_byte_buffer* buffer() const { grpc_byte_buffer* buffer() const { return buffer_; }
return buffer_;
}
grpc_byte_buffer* buffer_; grpc_byte_buffer* buffer_;
}; };

@ -51,8 +51,8 @@ class ChannelInterface : public CallHook {
public: public:
virtual ~ChannelInterface() {} virtual ~ChannelInterface() {}
virtual Call CreateCall(const RpcMethod &method, ClientContext *context, virtual Call CreateCall(const RpcMethod& method, ClientContext* context,
CompletionQueue *cq) = 0; CompletionQueue* cq) = 0;
}; };
} // namespace grpc } // namespace grpc

@ -74,8 +74,8 @@ class ClientContext {
ClientContext(); ClientContext();
~ClientContext(); ~ClientContext();
void AddMetadata(const grpc::string &meta_key, void AddMetadata(const grpc::string& meta_key,
const grpc::string &meta_value); const grpc::string& meta_value);
const std::multimap<grpc::string, grpc::string>& GetServerInitialMetadata() { const std::multimap<grpc::string, grpc::string>& GetServerInitialMetadata() {
GPR_ASSERT(initial_metadata_received_); GPR_ASSERT(initial_metadata_received_);
@ -87,19 +87,17 @@ class ClientContext {
return trailing_metadata_; return trailing_metadata_;
} }
void set_absolute_deadline(const system_clock::time_point &deadline); void set_absolute_deadline(const system_clock::time_point& deadline);
system_clock::time_point absolute_deadline(); system_clock::time_point absolute_deadline();
void set_authority(const grpc::string& authority) { void set_authority(const grpc::string& authority) { authority_ = authority; }
authority_ = authority;
}
void TryCancel(); void TryCancel();
private: private:
// Disallow copy and assign. // Disallow copy and assign.
ClientContext(const ClientContext &); ClientContext(const ClientContext&);
ClientContext &operator=(const ClientContext &); ClientContext& operator=(const ClientContext&);
friend class CallOpBuffer; friend class CallOpBuffer;
friend class Channel; friend class Channel;
@ -118,24 +116,22 @@ class ClientContext {
template <class R> template <class R>
friend class ::grpc::ClientAsyncResponseReader; friend class ::grpc::ClientAsyncResponseReader;
grpc_call *call() { return call_; } grpc_call* call() { return call_; }
void set_call(grpc_call *call) { void set_call(grpc_call* call) {
GPR_ASSERT(call_ == nullptr); GPR_ASSERT(call_ == nullptr);
call_ = call; call_ = call;
} }
grpc_completion_queue *cq() { return cq_; } grpc_completion_queue* cq() { return cq_; }
void set_cq(grpc_completion_queue *cq) { cq_ = cq; } void set_cq(grpc_completion_queue* cq) { cq_ = cq; }
gpr_timespec RawDeadline() { return absolute_deadline_; } gpr_timespec RawDeadline() { return absolute_deadline_; }
grpc::string authority() { grpc::string authority() { return authority_; }
return authority_;
}
bool initial_metadata_received_; bool initial_metadata_received_;
grpc_call *call_; grpc_call* call_;
grpc_completion_queue *cq_; grpc_completion_queue* cq_;
gpr_timespec absolute_deadline_; gpr_timespec absolute_deadline_;
grpc::string authority_; grpc::string authority_;
std::multimap<grpc::string, grpc::string> send_initial_metadata_; std::multimap<grpc::string, grpc::string> send_initial_metadata_;

@ -66,37 +66,38 @@ class CompletionQueueTag {
// to do) // to do)
// If this function returns false, the tag is dropped and not returned // If this function returns false, the tag is dropped and not returned
// from the completion queue // from the completion queue
virtual bool FinalizeResult(void **tag, bool *status) = 0; virtual bool FinalizeResult(void** tag, bool* status) = 0;
}; };
// grpc_completion_queue wrapper class // grpc_completion_queue wrapper class
class CompletionQueue { class CompletionQueue {
public: public:
CompletionQueue(); CompletionQueue();
explicit CompletionQueue(grpc_completion_queue *take); explicit CompletionQueue(grpc_completion_queue* take);
~CompletionQueue(); ~CompletionQueue();
// Tri-state return for AsyncNext: SHUTDOWN, GOT_EVENT, TIMEOUT // Tri-state return for AsyncNext: SHUTDOWN, GOT_EVENT, TIMEOUT
enum NextStatus {SHUTDOWN, GOT_EVENT, TIMEOUT}; enum NextStatus { SHUTDOWN, GOT_EVENT, TIMEOUT };
// Nonblocking (until deadline) read from queue. // Nonblocking (until deadline) read from queue.
// Cannot rely on result of tag or ok if return is TIMEOUT // Cannot rely on result of tag or ok if return is TIMEOUT
NextStatus AsyncNext(void **tag, bool *ok, NextStatus AsyncNext(void** tag, bool* ok,
std::chrono::system_clock::time_point deadline); std::chrono::system_clock::time_point deadline);
// Blocking (until deadline) read from queue. // Blocking (until deadline) read from queue.
// Returns false if the queue is ready for destruction, true if event // Returns false if the queue is ready for destruction, true if event
bool Next(void **tag, bool *ok) {
return (AsyncNext(tag,ok, bool Next(void** tag, bool* ok) {
std::chrono::system_clock::time_point::max()) != return (
SHUTDOWN); AsyncNext(tag, ok, (std::chrono::system_clock::time_point::max)()) !=
SHUTDOWN);
} }
// Shutdown has to be called, and the CompletionQueue can only be // Shutdown has to be called, and the CompletionQueue can only be
// destructed when false is returned from Next(). // destructed when false is returned from Next().
void Shutdown(); void Shutdown();
grpc_completion_queue *cq() { return cq_; } grpc_completion_queue* cq() { return cq_; }
private: private:
// Friend synchronous wrappers so that they can access Pluck(), which is // Friend synchronous wrappers so that they can access Pluck(), which is
@ -115,20 +116,20 @@ class CompletionQueue {
friend class ::grpc::ServerReaderWriter; friend class ::grpc::ServerReaderWriter;
friend class ::grpc::Server; friend class ::grpc::Server;
friend class ::grpc::ServerContext; friend class ::grpc::ServerContext;
friend Status BlockingUnaryCall(ChannelInterface *channel, friend Status BlockingUnaryCall(ChannelInterface* channel,
const RpcMethod &method, const RpcMethod& method,
ClientContext *context, ClientContext* context,
const grpc::protobuf::Message &request, const grpc::protobuf::Message& request,
grpc::protobuf::Message *result); grpc::protobuf::Message* result);
// Wraps grpc_completion_queue_pluck. // Wraps grpc_completion_queue_pluck.
// Cannot be mixed with calls to Next(). // Cannot be mixed with calls to Next().
bool Pluck(CompletionQueueTag *tag); bool Pluck(CompletionQueueTag* tag);
// Does a single polling pluck on tag // Does a single polling pluck on tag
void TryPluck(CompletionQueueTag *tag); void TryPluck(CompletionQueueTag* tag);
grpc_completion_queue *cq_; // owned grpc_completion_queue* cq_; // owned
}; };
} // namespace grpc } // namespace grpc

@ -59,11 +59,12 @@
#ifndef GRPC_CUSTOM_ZEROCOPYOUTPUTSTREAM #ifndef GRPC_CUSTOM_ZEROCOPYOUTPUTSTREAM
#include <google/protobuf/io/zero_copy_stream.h> #include <google/protobuf/io/zero_copy_stream.h>
#define GRPC_CUSTOM_ZEROCOPYOUTPUTSTREAM ::google::protobuf::io::ZeroCopyOutputStream #define GRPC_CUSTOM_ZEROCOPYOUTPUTSTREAM \
#define GRPC_CUSTOM_ZEROCOPYINPUTSTREAM ::google::protobuf::io::ZeroCopyInputStream ::google::protobuf::io::ZeroCopyOutputStream
#define GRPC_CUSTOM_ZEROCOPYINPUTSTREAM \
::google::protobuf::io::ZeroCopyInputStream
#endif #endif
namespace grpc { namespace grpc {
typedef GRPC_CUSTOM_STRING string; typedef GRPC_CUSTOM_STRING string;

@ -59,6 +59,6 @@ class GenericStub GRPC_FINAL {
std::shared_ptr<ChannelInterface> channel_; std::shared_ptr<ChannelInterface> channel_;
}; };
} // namespace grpc } // namespace grpc
#endif // GRPCXX_GENERIC_STUB_H #endif // GRPCXX_GENERIC_STUB_H

@ -55,89 +55,89 @@ class CallOpBuffer : public CompletionQueueTag {
CallOpBuffer(); CallOpBuffer();
~CallOpBuffer(); ~CallOpBuffer();
void Reset(void *next_return_tag); void Reset(void* next_return_tag);
// Does not take ownership. // Does not take ownership.
void AddSendInitialMetadata( void AddSendInitialMetadata(
std::multimap<grpc::string, grpc::string> *metadata); std::multimap<grpc::string, grpc::string>* metadata);
void AddSendInitialMetadata(ClientContext *ctx); void AddSendInitialMetadata(ClientContext* ctx);
void AddRecvInitialMetadata(ClientContext *ctx); void AddRecvInitialMetadata(ClientContext* ctx);
void AddSendMessage(const grpc::protobuf::Message &message); void AddSendMessage(const grpc::protobuf::Message& message);
void AddSendMessage(const ByteBuffer& message); void AddSendMessage(const ByteBuffer& message);
void AddRecvMessage(grpc::protobuf::Message *message); void AddRecvMessage(grpc::protobuf::Message* message);
void AddRecvMessage(ByteBuffer *message); void AddRecvMessage(ByteBuffer* message);
void AddClientSendClose(); void AddClientSendClose();
void AddClientRecvStatus(ClientContext *ctx, Status *status); void AddClientRecvStatus(ClientContext* ctx, Status* status);
void AddServerSendStatus(std::multimap<grpc::string, grpc::string> *metadata, void AddServerSendStatus(std::multimap<grpc::string, grpc::string>* metadata,
const Status &status); const Status& status);
void AddServerRecvClose(bool *cancelled); void AddServerRecvClose(bool* cancelled);
// INTERNAL API: // INTERNAL API:
// Convert to an array of grpc_op elements // Convert to an array of grpc_op elements
void FillOps(grpc_op *ops, size_t *nops); void FillOps(grpc_op* ops, size_t* nops);
// Called by completion queue just prior to returning from Next() or Pluck() // Called by completion queue just prior to returning from Next() or Pluck()
bool FinalizeResult(void **tag, bool *status) GRPC_OVERRIDE; bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE;
bool got_message; bool got_message;
private: private:
void *return_tag_; void* return_tag_;
// Send initial metadata // Send initial metadata
bool send_initial_metadata_; bool send_initial_metadata_;
size_t initial_metadata_count_; size_t initial_metadata_count_;
grpc_metadata *initial_metadata_; grpc_metadata* initial_metadata_;
// Recv initial metadta // Recv initial metadta
std::multimap<grpc::string, grpc::string> *recv_initial_metadata_; std::multimap<grpc::string, grpc::string>* recv_initial_metadata_;
grpc_metadata_array recv_initial_metadata_arr_; grpc_metadata_array recv_initial_metadata_arr_;
// Send message // Send message
const grpc::protobuf::Message *send_message_; const grpc::protobuf::Message* send_message_;
const ByteBuffer *send_message_buffer_; const ByteBuffer* send_message_buffer_;
grpc_byte_buffer *send_buf_; grpc_byte_buffer* send_buf_;
// Recv message // Recv message
grpc::protobuf::Message *recv_message_; grpc::protobuf::Message* recv_message_;
ByteBuffer *recv_message_buffer_; ByteBuffer* recv_message_buffer_;
grpc_byte_buffer *recv_buf_; grpc_byte_buffer* recv_buf_;
// Client send close // Client send close
bool client_send_close_; bool client_send_close_;
// Client recv status // Client recv status
std::multimap<grpc::string, grpc::string> *recv_trailing_metadata_; std::multimap<grpc::string, grpc::string>* recv_trailing_metadata_;
Status *recv_status_; Status* recv_status_;
grpc_metadata_array recv_trailing_metadata_arr_; grpc_metadata_array recv_trailing_metadata_arr_;
grpc_status_code status_code_; grpc_status_code status_code_;
char *status_details_; char* status_details_;
size_t status_details_capacity_; size_t status_details_capacity_;
// Server send status // Server send status
const Status *send_status_; const Status* send_status_;
size_t trailing_metadata_count_; size_t trailing_metadata_count_;
grpc_metadata *trailing_metadata_; grpc_metadata* trailing_metadata_;
int cancelled_buf_; int cancelled_buf_;
bool *recv_closed_; bool* recv_closed_;
}; };
// Channel and Server implement this to allow them to hook performing ops // Channel and Server implement this to allow them to hook performing ops
class CallHook { class CallHook {
public: public:
virtual ~CallHook() {} virtual ~CallHook() {}
virtual void PerformOpsOnCall(CallOpBuffer *ops, Call *call) = 0; virtual void PerformOpsOnCall(CallOpBuffer* ops, Call* call) = 0;
}; };
// Straightforward wrapping of the C call object // Straightforward wrapping of the C call object
class Call GRPC_FINAL { class Call GRPC_FINAL {
public: public:
/* call is owned by the caller */ /* call is owned by the caller */
Call(grpc_call *call, CallHook *call_hook_, CompletionQueue *cq); Call(grpc_call* call, CallHook* call_hook_, CompletionQueue* cq);
void PerformOps(CallOpBuffer *buffer); void PerformOps(CallOpBuffer* buffer);
grpc_call *call() { return call_; } grpc_call* call() { return call_; }
CompletionQueue *cq() { return cq_; } CompletionQueue* cq() { return cq_; }
private: private:
CallHook *call_hook_; CallHook* call_hook_;
CompletionQueue *cq_; CompletionQueue* cq_;
grpc_call *call_; grpc_call* call_;
}; };
} // namespace grpc } // namespace grpc

@ -45,10 +45,10 @@ class RpcMethod;
class Status; class Status;
// Wrapper that performs a blocking unary call // Wrapper that performs a blocking unary call
Status BlockingUnaryCall(ChannelInterface *channel, const RpcMethod &method, Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method,
ClientContext *context, ClientContext* context,
const grpc::protobuf::Message &request, const grpc::protobuf::Message& request,
grpc::protobuf::Message *result); grpc::protobuf::Message* result);
} // namespace grpc } // namespace grpc

@ -83,7 +83,7 @@
#include <grpc/support/atm_gcc_atomic.h> #include <grpc/support/atm_gcc_atomic.h>
#elif defined(GPR_GCC_SYNC) #elif defined(GPR_GCC_SYNC)
#include <grpc/support/atm_gcc_sync.h> #include <grpc/support/atm_gcc_sync.h>
#elif defined(GPR_WIN32) #elif defined(GPR_WIN32_ATOMIC)
#include <grpc/support/atm_win32.h> #include <grpc/support/atm_win32.h>
#else #else
#error could not determine platform for atm #error could not determine platform for atm

@ -51,7 +51,7 @@ static __inline gpr_atm gpr_atm_acq_load(const gpr_atm *p) {
static __inline gpr_atm gpr_atm_no_barrier_load(const gpr_atm *p) { static __inline gpr_atm gpr_atm_no_barrier_load(const gpr_atm *p) {
/* TODO(dklempner): Can we implement something better here? */ /* TODO(dklempner): Can we implement something better here? */
gpr_atm_acq_load(p); return gpr_atm_acq_load(p);
} }
static __inline void gpr_atm_rel_store(gpr_atm *p, gpr_atm value) { static __inline void gpr_atm_rel_store(gpr_atm *p, gpr_atm value) {

@ -43,11 +43,21 @@
#define GPR_ARCH_64 1 #define GPR_ARCH_64 1
#define GPR_GETPID_IN_PROCESS_H 1 #define GPR_GETPID_IN_PROCESS_H 1
#define GPR_WINSOCK_SOCKET 1 #define GPR_WINSOCK_SOCKET 1
#ifdef __GNUC__
#define GPR_GCC_ATOMIC 1
#else
#define GPR_WIN32_ATOMIC 1
#endif
#elif defined(_WIN32) || defined(WIN32) #elif defined(_WIN32) || defined(WIN32)
#define GPR_ARCH_32 1 #define GPR_ARCH_32 1
#define GPR_WIN32 1 #define GPR_WIN32 1
#define GPR_GETPID_IN_PROCESS_H 1 #define GPR_GETPID_IN_PROCESS_H 1
#define GPR_WINSOCK_SOCKET 1 #define GPR_WINSOCK_SOCKET 1
#ifdef __GNUC__
#define GPR_GCC_ATOMIC 1
#else
#define GPR_WIN32_ATOMIC 1
#endif
#elif defined(ANDROID) || defined(__ANDROID__) #elif defined(ANDROID) || defined(__ANDROID__)
#define GPR_ANDROID 1 #define GPR_ANDROID 1
#define GPR_ARCH_32 1 #define GPR_ARCH_32 1
@ -167,8 +177,8 @@
#endif #endif
/* Validate platform combinations */ /* Validate platform combinations */
#if defined(GPR_GCC_ATOMIC) + defined(GPR_GCC_SYNC) + defined(GPR_WIN32) != 1 #if defined(GPR_GCC_ATOMIC) + defined(GPR_GCC_SYNC) + defined(GPR_WIN32_ATOMIC) != 1
#error Must define exactly one of GPR_GCC_ATOMIC, GPR_GCC_SYNC, GPR_WIN32 #error Must define exactly one of GPR_GCC_ATOMIC, GPR_GCC_SYNC, GPR_WIN32_ATOMIC
#endif #endif
#if defined(GPR_ARCH_32) + defined(GPR_ARCH_64) != 1 #if defined(GPR_ARCH_32) + defined(GPR_ARCH_64) != 1

@ -51,16 +51,16 @@ class StreamContextInterface;
class Channel GRPC_FINAL : public ChannelInterface { class Channel GRPC_FINAL : public ChannelInterface {
public: public:
Channel(const grpc::string &target, grpc_channel *c_channel); Channel(const grpc::string& target, grpc_channel* c_channel);
~Channel() GRPC_OVERRIDE; ~Channel() GRPC_OVERRIDE;
virtual Call CreateCall(const RpcMethod &method, ClientContext *context, virtual Call CreateCall(const RpcMethod& method, ClientContext* context,
CompletionQueue *cq) GRPC_OVERRIDE; CompletionQueue* cq) GRPC_OVERRIDE;
virtual void PerformOpsOnCall(CallOpBuffer *ops, Call *call) GRPC_OVERRIDE; virtual void PerformOpsOnCall(CallOpBuffer* ops, Call* call) GRPC_OVERRIDE;
private: private:
const grpc::string target_; const grpc::string target_;
grpc_channel *const c_channel_; // owned grpc_channel* const c_channel_; // owned
}; };
} // namespace grpc } // namespace grpc

@ -37,7 +37,7 @@
namespace grpc { namespace grpc {
void ChannelArguments::SetSslTargetNameOverride(const grpc::string &name) { void ChannelArguments::SetSslTargetNameOverride(const grpc::string& name) {
SetString(GRPC_SSL_TARGET_NAME_OVERRIDE_ARG, name); SetString(GRPC_SSL_TARGET_NAME_OVERRIDE_ARG, name);
} }
@ -50,32 +50,32 @@ grpc::string ChannelArguments::GetSslTargetNameOverride() const {
return ""; return "";
} }
void ChannelArguments::SetInt(const grpc::string &key, int value) { void ChannelArguments::SetInt(const grpc::string& key, int value) {
grpc_arg arg; grpc_arg arg;
arg.type = GRPC_ARG_INTEGER; arg.type = GRPC_ARG_INTEGER;
strings_.push_back(key); strings_.push_back(key);
arg.key = const_cast<char *>(strings_.back().c_str()); arg.key = const_cast<char*>(strings_.back().c_str());
arg.value.integer = value; arg.value.integer = value;
args_.push_back(arg); args_.push_back(arg);
} }
void ChannelArguments::SetString(const grpc::string &key, void ChannelArguments::SetString(const grpc::string& key,
const grpc::string &value) { const grpc::string& value) {
grpc_arg arg; grpc_arg arg;
arg.type = GRPC_ARG_STRING; arg.type = GRPC_ARG_STRING;
strings_.push_back(key); strings_.push_back(key);
arg.key = const_cast<char *>(strings_.back().c_str()); arg.key = const_cast<char*>(strings_.back().c_str());
strings_.push_back(value); strings_.push_back(value);
arg.value.string = const_cast<char *>(strings_.back().c_str()); arg.value.string = const_cast<char*>(strings_.back().c_str());
args_.push_back(arg); args_.push_back(arg);
} }
void ChannelArguments::SetChannelArgs(grpc_channel_args *channel_args) const { void ChannelArguments::SetChannelArgs(grpc_channel_args* channel_args) const {
channel_args->num_args = args_.size(); channel_args->num_args = args_.size();
if (channel_args->num_args > 0) { if (channel_args->num_args > 0) {
channel_args->args = const_cast<grpc_arg *>(&args_[0]); channel_args->args = const_cast<grpc_arg*>(&args_[0]);
} }
} }

@ -53,7 +53,7 @@ ClientContext::~ClientContext() {
if (cq_) { if (cq_) {
grpc_completion_queue_shutdown(cq_); grpc_completion_queue_shutdown(cq_);
// Drain cq_. // Drain cq_.
grpc_event *ev; grpc_event* ev;
grpc_completion_type t; grpc_completion_type t;
do { do {
ev = grpc_completion_queue_next(cq_, gpr_inf_future); ev = grpc_completion_queue_next(cq_, gpr_inf_future);
@ -65,7 +65,7 @@ ClientContext::~ClientContext() {
} }
void ClientContext::set_absolute_deadline( void ClientContext::set_absolute_deadline(
const system_clock::time_point &deadline) { const system_clock::time_point& deadline) {
Timepoint2Timespec(deadline, &absolute_deadline_); Timepoint2Timespec(deadline, &absolute_deadline_);
} }
@ -73,8 +73,8 @@ system_clock::time_point ClientContext::absolute_deadline() {
return Timespec2Timepoint(absolute_deadline_); return Timespec2Timepoint(absolute_deadline_);
} }
void ClientContext::AddMetadata(const grpc::string &meta_key, void ClientContext::AddMetadata(const grpc::string& meta_key,
const grpc::string &meta_value) { const grpc::string& meta_value) {
send_initial_metadata_.insert(std::make_pair(meta_key, meta_value)); send_initial_metadata_.insert(std::make_pair(meta_key, meta_value));
} }

@ -42,10 +42,10 @@
namespace grpc { namespace grpc {
// Wrapper that performs a blocking unary call // Wrapper that performs a blocking unary call
Status BlockingUnaryCall(ChannelInterface *channel, const RpcMethod &method, Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method,
ClientContext *context, ClientContext* context,
const grpc::protobuf::Message &request, const grpc::protobuf::Message& request,
grpc::protobuf::Message *result) { grpc::protobuf::Message* result) {
CompletionQueue cq; CompletionQueue cq;
Call call(channel->CreateCall(method, context, &cq)); Call call(channel->CreateCall(method, context, &cq));
CallOpBuffer buf; CallOpBuffer buf;

@ -41,9 +41,10 @@ namespace grpc {
class ChannelArguments; class ChannelArguments;
std::shared_ptr<ChannelInterface> CreateChannel( std::shared_ptr<ChannelInterface> CreateChannel(
const grpc::string &target, const std::unique_ptr<Credentials> &creds, const grpc::string& target, const std::unique_ptr<Credentials>& creds,
const ChannelArguments &args) { const ChannelArguments& args) {
return creds ? creds->CreateChannel(target, args) : return creds ? creds->CreateChannel(target, args)
std::shared_ptr<ChannelInterface>(new Channel(target, grpc_lame_client_channel_create())); : std::shared_ptr<ChannelInterface>(
new Channel(target, grpc_lame_client_channel_create()));
} }
} // namespace grpc } // namespace grpc

@ -45,7 +45,7 @@ const int kMaxBufferLength = 8192;
class GrpcBufferWriter GRPC_FINAL class GrpcBufferWriter GRPC_FINAL
: public ::grpc::protobuf::io::ZeroCopyOutputStream { : public ::grpc::protobuf::io::ZeroCopyOutputStream {
public: public:
explicit GrpcBufferWriter(grpc_byte_buffer **bp, explicit GrpcBufferWriter(grpc_byte_buffer** bp,
int block_size = kMaxBufferLength) int block_size = kMaxBufferLength)
: block_size_(block_size), byte_count_(0), have_backup_(false) { : block_size_(block_size), byte_count_(0), have_backup_(false) {
*bp = grpc_byte_buffer_create(NULL, 0); *bp = grpc_byte_buffer_create(NULL, 0);
@ -58,7 +58,7 @@ class GrpcBufferWriter GRPC_FINAL
} }
} }
bool Next(void **data, int *size) GRPC_OVERRIDE { bool Next(void** data, int* size) GRPC_OVERRIDE {
if (have_backup_) { if (have_backup_) {
slice_ = backup_slice_; slice_ = backup_slice_;
have_backup_ = false; have_backup_ = false;
@ -89,7 +89,7 @@ class GrpcBufferWriter GRPC_FINAL
private: private:
const int block_size_; const int block_size_;
gpr_int64 byte_count_; gpr_int64 byte_count_;
gpr_slice_buffer *slice_buffer_; gpr_slice_buffer* slice_buffer_;
bool have_backup_; bool have_backup_;
gpr_slice backup_slice_; gpr_slice backup_slice_;
gpr_slice slice_; gpr_slice slice_;
@ -98,7 +98,7 @@ class GrpcBufferWriter GRPC_FINAL
class GrpcBufferReader GRPC_FINAL class GrpcBufferReader GRPC_FINAL
: public ::grpc::protobuf::io::ZeroCopyInputStream { : public ::grpc::protobuf::io::ZeroCopyInputStream {
public: public:
explicit GrpcBufferReader(grpc_byte_buffer *buffer) explicit GrpcBufferReader(grpc_byte_buffer* buffer)
: byte_count_(0), backup_count_(0) { : byte_count_(0), backup_count_(0) {
reader_ = grpc_byte_buffer_reader_create(buffer); reader_ = grpc_byte_buffer_reader_create(buffer);
} }
@ -106,7 +106,7 @@ class GrpcBufferReader GRPC_FINAL
grpc_byte_buffer_reader_destroy(reader_); grpc_byte_buffer_reader_destroy(reader_);
} }
bool Next(const void **data, int *size) GRPC_OVERRIDE { bool Next(const void** data, int* size) GRPC_OVERRIDE {
if (backup_count_ > 0) { if (backup_count_ > 0) {
*data = GPR_SLICE_START_PTR(slice_) + GPR_SLICE_LENGTH(slice_) - *data = GPR_SLICE_START_PTR(slice_) + GPR_SLICE_LENGTH(slice_) -
backup_count_; backup_count_;
@ -123,12 +123,10 @@ class GrpcBufferReader GRPC_FINAL
return true; return true;
} }
void BackUp(int count) GRPC_OVERRIDE { void BackUp(int count) GRPC_OVERRIDE { backup_count_ = count; }
backup_count_ = count;
}
bool Skip(int count) GRPC_OVERRIDE { bool Skip(int count) GRPC_OVERRIDE {
const void *data; const void* data;
int size; int size;
while (Next(&data, &size)) { while (Next(&data, &size)) {
if (size >= count) { if (size >= count) {
@ -149,18 +147,18 @@ class GrpcBufferReader GRPC_FINAL
private: private:
gpr_int64 byte_count_; gpr_int64 byte_count_;
gpr_int64 backup_count_; gpr_int64 backup_count_;
grpc_byte_buffer_reader *reader_; grpc_byte_buffer_reader* reader_;
gpr_slice slice_; gpr_slice slice_;
}; };
namespace grpc { namespace grpc {
bool SerializeProto(const grpc::protobuf::Message &msg, grpc_byte_buffer **bp) { bool SerializeProto(const grpc::protobuf::Message& msg, grpc_byte_buffer** bp) {
GrpcBufferWriter writer(bp); GrpcBufferWriter writer(bp);
return msg.SerializeToZeroCopyStream(&writer); return msg.SerializeToZeroCopyStream(&writer);
} }
bool DeserializeProto(grpc_byte_buffer *buffer, grpc::protobuf::Message *msg) { bool DeserializeProto(grpc_byte_buffer* buffer, grpc::protobuf::Message* msg) {
GrpcBufferReader reader(buffer); GrpcBufferReader reader(buffer);
return msg->ParseFromZeroCopyStream(&reader); return msg->ParseFromZeroCopyStream(&reader);
} }

@ -43,11 +43,11 @@ namespace grpc {
// Serialize the msg into a buffer created inside the function. The caller // Serialize the msg into a buffer created inside the function. The caller
// should destroy the returned buffer when done with it. If serialization fails, // should destroy the returned buffer when done with it. If serialization fails,
// false is returned and buffer is left unchanged. // false is returned and buffer is left unchanged.
bool SerializeProto(const grpc::protobuf::Message &msg, bool SerializeProto(const grpc::protobuf::Message& msg,
grpc_byte_buffer **buffer); grpc_byte_buffer** buffer);
// The caller keeps ownership of buffer and msg. // The caller keeps ownership of buffer and msg.
bool DeserializeProto(grpc_byte_buffer *buffer, grpc::protobuf::Message *msg); bool DeserializeProto(grpc_byte_buffer* buffer, grpc::protobuf::Message* msg);
} // namespace grpc } // namespace grpc

@ -47,5 +47,4 @@ CompletionQueue* AsyncGenericService::completion_queue() {
return &server_->cq_; return &server_->cq_;
} }
} // namespace grpc } // namespace grpc

@ -42,7 +42,7 @@
namespace grpc { namespace grpc {
AsyncServerContext::AsyncServerContext( AsyncServerContext::AsyncServerContext(
grpc_call *call, const grpc::string &method, const grpc::string &host, grpc_call* call, const grpc::string& method, const grpc::string& host,
system_clock::time_point absolute_deadline) system_clock::time_point absolute_deadline)
: method_(method), : method_(method),
host_(host), host_(host),
@ -52,22 +52,22 @@ AsyncServerContext::AsyncServerContext(
AsyncServerContext::~AsyncServerContext() { grpc_call_destroy(call_); } AsyncServerContext::~AsyncServerContext() { grpc_call_destroy(call_); }
void AsyncServerContext::Accept(grpc_completion_queue *cq) { void AsyncServerContext::Accept(grpc_completion_queue* cq) {
GPR_ASSERT(grpc_call_server_accept_old(call_, cq, this) == GRPC_CALL_OK); GPR_ASSERT(grpc_call_server_accept_old(call_, cq, this) == GRPC_CALL_OK);
GPR_ASSERT(grpc_call_server_end_initial_metadata_old( GPR_ASSERT(grpc_call_server_end_initial_metadata_old(
call_, GRPC_WRITE_BUFFER_HINT) == GRPC_CALL_OK); call_, GRPC_WRITE_BUFFER_HINT) == GRPC_CALL_OK);
} }
bool AsyncServerContext::StartRead(grpc::protobuf::Message *request) { bool AsyncServerContext::StartRead(grpc::protobuf::Message* request) {
GPR_ASSERT(request); GPR_ASSERT(request);
request_ = request; request_ = request;
grpc_call_error err = grpc_call_start_read_old(call_, this); grpc_call_error err = grpc_call_start_read_old(call_, this);
return err == GRPC_CALL_OK; return err == GRPC_CALL_OK;
} }
bool AsyncServerContext::StartWrite(const grpc::protobuf::Message &response, bool AsyncServerContext::StartWrite(const grpc::protobuf::Message& response,
int flags) { int flags) {
grpc_byte_buffer *buffer = nullptr; grpc_byte_buffer* buffer = nullptr;
if (!SerializeProto(response, &buffer)) { if (!SerializeProto(response, &buffer)) {
return false; return false;
} }
@ -76,16 +76,16 @@ bool AsyncServerContext::StartWrite(const grpc::protobuf::Message &response,
return err == GRPC_CALL_OK; return err == GRPC_CALL_OK;
} }
bool AsyncServerContext::StartWriteStatus(const Status &status) { bool AsyncServerContext::StartWriteStatus(const Status& status) {
grpc_call_error err = grpc_call_start_write_status_old( grpc_call_error err = grpc_call_start_write_status_old(
call_, static_cast<grpc_status_code>(status.code()), call_, static_cast<grpc_status_code>(status.code()),
status.details().empty() ? nullptr status.details().empty() ? nullptr
: const_cast<char *>(status.details().c_str()), : const_cast<char*>(status.details().c_str()),
this); this);
return err == GRPC_CALL_OK; return err == GRPC_CALL_OK;
} }
bool AsyncServerContext::ParseRead(grpc_byte_buffer *read_buffer) { bool AsyncServerContext::ParseRead(grpc_byte_buffer* read_buffer) {
GPR_ASSERT(request_); GPR_ASSERT(request_);
bool success = DeserializeProto(read_buffer, request_); bool success = DeserializeProto(read_buffer, request_);
request_ = nullptr; request_ = nullptr;

@ -46,7 +46,8 @@ class InsecureServerCredentialsImpl GRPC_FINAL : public ServerCredentials {
} // namespace } // namespace
std::shared_ptr<ServerCredentials> InsecureServerCredentials() { std::shared_ptr<ServerCredentials> InsecureServerCredentials() {
return std::shared_ptr<ServerCredentials>(new InsecureServerCredentialsImpl()); return std::shared_ptr<ServerCredentials>(
new InsecureServerCredentialsImpl());
} }
} // namespace grpc } // namespace grpc

@ -40,7 +40,8 @@ namespace grpc {
namespace { namespace {
class SecureServerCredentials GRPC_FINAL : public ServerCredentials { class SecureServerCredentials GRPC_FINAL : public ServerCredentials {
public: public:
explicit SecureServerCredentials(grpc_server_credentials* creds) : creds_(creds) {} explicit SecureServerCredentials(grpc_server_credentials* creds)
: creds_(creds) {}
~SecureServerCredentials() GRPC_OVERRIDE { ~SecureServerCredentials() GRPC_OVERRIDE {
grpc_server_credentials_release(creds_); grpc_server_credentials_release(creds_);
} }
@ -56,16 +57,17 @@ class SecureServerCredentials GRPC_FINAL : public ServerCredentials {
} // namespace } // namespace
std::shared_ptr<ServerCredentials> SslServerCredentials( std::shared_ptr<ServerCredentials> SslServerCredentials(
const SslServerCredentialsOptions &options) { const SslServerCredentialsOptions& options) {
std::vector<grpc_ssl_pem_key_cert_pair> pem_key_cert_pairs; std::vector<grpc_ssl_pem_key_cert_pair> pem_key_cert_pairs;
for (const auto &key_cert_pair : options.pem_key_cert_pairs) { for (const auto& key_cert_pair : options.pem_key_cert_pairs) {
pem_key_cert_pairs.push_back( pem_key_cert_pairs.push_back(
{key_cert_pair.private_key.c_str(), key_cert_pair.cert_chain.c_str()}); {key_cert_pair.private_key.c_str(), key_cert_pair.cert_chain.c_str()});
} }
grpc_server_credentials *c_creds = grpc_ssl_server_credentials_create( grpc_server_credentials* c_creds = grpc_ssl_server_credentials_create(
options.pem_root_certs.empty() ? nullptr : options.pem_root_certs.c_str(), options.pem_root_certs.empty() ? nullptr : options.pem_root_certs.c_str(),
&pem_key_cert_pairs[0], pem_key_cert_pairs.size()); &pem_key_cert_pairs[0], pem_key_cert_pairs.size());
return std::shared_ptr<ServerCredentials>(new SecureServerCredentials(c_creds)); return std::shared_ptr<ServerCredentials>(
new SecureServerCredentials(c_creds));
} }
} // namespace grpc } // namespace grpc

@ -322,11 +322,10 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag {
payload_(nullptr) { payload_(nullptr) {
memset(&array_, 0, sizeof(array_)); memset(&array_, 0, sizeof(array_));
grpc_call_details_init(&call_details_); grpc_call_details_init(&call_details_);
grpc_server_request_call( grpc_server_request_call(server->server_, &call_, &call_details_, &array_,
server->server_, &call_, &call_details_, &array_, cq->cq(), this); cq->cq(), this);
} }
~AsyncRequest() { ~AsyncRequest() {
if (payload_) { if (payload_) {
grpc_byte_buffer_destroy(payload_); grpc_byte_buffer_destroy(payload_);

@ -56,7 +56,8 @@ void ServerBuilder::RegisterAsyncGenericService(AsyncGenericService* service) {
if (generic_service_) { if (generic_service_) {
gpr_log(GPR_ERROR, gpr_log(GPR_ERROR,
"Adding multiple AsyncGenericService is unsupported for now. " "Adding multiple AsyncGenericService is unsupported for now. "
"Dropping the service %p", service); "Dropping the service %p",
service);
return; return;
} }
generic_service_ = service; generic_service_ = service;

@ -66,12 +66,12 @@ ThreadPool::~ThreadPool() {
shutdown_ = true; shutdown_ = true;
cv_.notify_all(); cv_.notify_all();
} }
for (auto &t : threads_) { for (auto& t : threads_) {
t.join(); t.join();
} }
} }
void ThreadPool::ScheduleCallback(const std::function<void()> &callback) { void ThreadPool::ScheduleCallback(const std::function<void()>& callback) {
std::lock_guard<std::mutex> lock(mu_); std::lock_guard<std::mutex> lock(mu_);
callbacks_.push(callback); callbacks_.push(callback);
cv_.notify_one(); cv_.notify_one();

@ -50,7 +50,7 @@ class ThreadPool GRPC_FINAL : public ThreadPoolInterface {
explicit ThreadPool(int num_threads); explicit ThreadPool(int num_threads);
~ThreadPool(); ~ThreadPool();
void ScheduleCallback(const std::function<void()> &callback) GRPC_OVERRIDE; void ScheduleCallback(const std::function<void()>& callback) GRPC_OVERRIDE;
private: private:
std::mutex mu_; std::mutex mu_;

@ -37,9 +37,7 @@ namespace grpc {
Slice::Slice() : slice_(gpr_empty_slice()) {} Slice::Slice() : slice_(gpr_empty_slice()) {}
Slice::~Slice() { Slice::~Slice() { gpr_slice_unref(slice_); }
gpr_slice_unref(slice_);
}
Slice::Slice(gpr_slice slice, AddRef) : slice_(gpr_slice_ref(slice)) {} Slice::Slice(gpr_slice slice, AddRef) : slice_(gpr_slice_ref(slice)) {}

@ -35,7 +35,7 @@
namespace grpc { namespace grpc {
const Status &Status::OK = Status(); const Status& Status::OK = Status();
const Status &Status::Cancelled = Status(StatusCode::CANCELLED); const Status& Status::Cancelled = Status(StatusCode::CANCELLED);
} // namespace grpc } // namespace grpc

@ -43,8 +43,8 @@ using std::chrono::system_clock;
namespace grpc { namespace grpc {
// TODO(yangg) prevent potential overflow. // TODO(yangg) prevent potential overflow.
void Timepoint2Timespec(const system_clock::time_point &from, void Timepoint2Timespec(const system_clock::time_point& from,
gpr_timespec *to) { gpr_timespec* to) {
system_clock::duration deadline = from.time_since_epoch(); system_clock::duration deadline = from.time_since_epoch();
seconds secs = duration_cast<seconds>(deadline); seconds secs = duration_cast<seconds>(deadline);
nanoseconds nsecs = duration_cast<nanoseconds>(deadline - secs); nanoseconds nsecs = duration_cast<nanoseconds>(deadline - secs);

@ -41,8 +41,8 @@
namespace grpc { namespace grpc {
// from and to should be absolute time. // from and to should be absolute time.
void Timepoint2Timespec(const std::chrono::system_clock::time_point &from, void Timepoint2Timespec(const std::chrono::system_clock::time_point& from,
gpr_timespec *to); gpr_timespec* to);
std::chrono::system_clock::time_point Timespec2Timepoint(gpr_timespec t); std::chrono::system_clock::time_point Timespec2Timepoint(gpr_timespec t);

@ -75,6 +75,9 @@ using v8::Value;
NanCallback *Call::constructor; NanCallback *Call::constructor;
Persistent<FunctionTemplate> Call::fun_tpl; Persistent<FunctionTemplate> Call::fun_tpl;
bool EndsWith(const char *str, const char *substr) {
return strcmp(str+strlen(str)-strlen(substr), substr) == 0;
}
bool CreateMetadataArray(Handle<Object> metadata, grpc_metadata_array *array, bool CreateMetadataArray(Handle<Object> metadata, grpc_metadata_array *array,
shared_ptr<Resources> resources) { shared_ptr<Resources> resources) {
@ -99,14 +102,19 @@ bool CreateMetadataArray(Handle<Object> metadata, grpc_metadata_array *array,
Handle<Value> value = values->Get(j); Handle<Value> value = values->Get(j);
grpc_metadata *current = &array->metadata[array->count]; grpc_metadata *current = &array->metadata[array->count];
current->key = **utf8_key; current->key = **utf8_key;
if (::node::Buffer::HasInstance(value)) { // Only allow binary headers for "-bin" keys
current->value = ::node::Buffer::Data(value); if (EndsWith(current->key, "-bin")) {
current->value_length = ::node::Buffer::Length(value); if (::node::Buffer::HasInstance(value)) {
Persistent<Value> *handle = new Persistent<Value>(); current->value = ::node::Buffer::Data(value);
NanAssignPersistent(*handle, value); current->value_length = ::node::Buffer::Length(value);
resources->handles.push_back(unique_ptr<PersistentHolder>( Persistent<Value> *handle = new Persistent<Value>();
new PersistentHolder(handle))); NanAssignPersistent(*handle, value);
} else if (value->IsString()) { resources->handles.push_back(unique_ptr<PersistentHolder>(
new PersistentHolder(handle)));
continue;
}
}
if (value->IsString()) {
Handle<String> string_value = value->ToString(); Handle<String> string_value = value->ToString();
NanUtf8String *utf8_value = new NanUtf8String(string_value); NanUtf8String *utf8_value = new NanUtf8String(string_value);
resources->strings.push_back(unique_ptr<NanUtf8String>(utf8_value)); resources->strings.push_back(unique_ptr<NanUtf8String>(utf8_value));
@ -146,9 +154,13 @@ Handle<Value> ParseMetadata(const grpc_metadata_array *metadata_array) {
array = NanNew<Array>(size_map[elem->key]); array = NanNew<Array>(size_map[elem->key]);
metadata_object->Set(key_string, array); metadata_object->Set(key_string, array);
} }
array->Set(index_map[elem->key], if (EndsWith(elem->key, "-bin")) {
MakeFastBuffer( array->Set(index_map[elem->key],
NanNewBufferHandle(elem->value, elem->value_length))); MakeFastBuffer(
NanNewBufferHandle(elem->value, elem->value_length)));
} else {
array->Set(index_map[elem->key], NanNew(elem->value));
}
index_map[elem->key] += 1; index_map[elem->key] += 1;
} }
return NanEscapeScope(metadata_object); return NanEscapeScope(metadata_object);

@ -1,6 +1,6 @@
{ {
"name": "grpc", "name": "grpc",
"version": "0.5.5", "version": "0.6.0",
"author": "Google Inc.", "author": "Google Inc.",
"description": "gRPC Library for Node", "description": "gRPC Library for Node",
"homepage": "http://www.grpc.io/", "homepage": "http://www.grpc.io/",

@ -142,8 +142,8 @@ describe('call', function() {
assert.doesNotThrow(function() { assert.doesNotThrow(function() {
var batch = {}; var batch = {};
batch[grpc.opType.SEND_INITIAL_METADATA] = { batch[grpc.opType.SEND_INITIAL_METADATA] = {
'key1': [new Buffer('value1')], 'key1-bin': [new Buffer('value1')],
'key2': [new Buffer('value2')] 'key2-bin': [new Buffer('value2')]
}; };
call.startBatch(batch, function(err, resp) { call.startBatch(batch, function(err, resp) {
assert.ifError(err); assert.ifError(err);

@ -138,21 +138,21 @@ describe('end-to-end', function() {
client_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true; client_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
call.startBatch(client_batch, function(err, response) { call.startBatch(client_batch, function(err, response) {
assert.ifError(err); assert.ifError(err);
assert(response['send metadata']); assert.deepEqual(response,{
assert(response['client close']); 'send metadata': true,
assert(response.hasOwnProperty('metadata')); 'client close': true,
assert.strictEqual(response.metadata.server_key[0].toString(), metadata: {server_key: ['server_value']},
'server_value'); status: {'code': grpc.status.OK,
assert.deepEqual(response.status, {'code': grpc.status.OK, 'details': status_text,
'details': status_text, 'metadata': {}}
'metadata': {}}); });
done(); done();
}); });
server.requestCall(function(err, call_details) { server.requestCall(function(err, call_details) {
var new_call = call_details['new call']; var new_call = call_details['new call'];
assert.notEqual(new_call, null); assert.notEqual(new_call, null);
assert.strictEqual(new_call.metadata.client_key[0].toString(), assert.strictEqual(new_call.metadata.client_key[0],
'client_value'); 'client_value');
var server_call = new_call.call; var server_call = new_call.call;
assert.notEqual(server_call, null); assert.notEqual(server_call, null);

@ -178,7 +178,7 @@ static void test_things_stick_around(void) {
grpc_mdctx *ctx; grpc_mdctx *ctx;
int i, j; int i, j;
char *buffer; char *buffer;
int nstrs = 10000; int nstrs = 1000;
grpc_mdstr **strs = gpr_malloc(sizeof(grpc_mdstr *) * nstrs); grpc_mdstr **strs = gpr_malloc(sizeof(grpc_mdstr *) * nstrs);
int *shuf = gpr_malloc(sizeof(int) * nstrs); int *shuf = gpr_malloc(sizeof(int) * nstrs);
grpc_mdstr *test; grpc_mdstr *test;

@ -54,7 +54,7 @@ TEST_F(CredentialsTest, InvalidServiceAccountCreds) {
} // namespace testing } // namespace testing
} // namespace grpc } // namespace grpc
int main(int argc, char **argv) { int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv); ::testing::InitGoogleTest(&argc, argv);
grpc_init(); grpc_init();
int ret = RUN_ALL_TESTS(); int ret = RUN_ALL_TESTS();

@ -66,7 +66,7 @@ namespace testing {
namespace { namespace {
void* tag(int i) { return (void*)(gpr_intptr)i; } void* tag(int i) { return (void*)(gpr_intptr) i; }
void verify_ok(CompletionQueue* cq, int i, bool expect_ok) { void verify_ok(CompletionQueue* cq, int i, bool expect_ok) {
bool ok; bool ok;
@ -76,11 +76,11 @@ void verify_ok(CompletionQueue* cq, int i, bool expect_ok) {
EXPECT_EQ(tag(i), got_tag); EXPECT_EQ(tag(i), got_tag);
} }
void verify_timed_ok(CompletionQueue* cq, int i, bool expect_ok, void verify_timed_ok(
std::chrono::system_clock::time_point deadline = CompletionQueue* cq, int i, bool expect_ok,
std::chrono::system_clock::time_point::max(), std::chrono::system_clock::time_point deadline =
CompletionQueue::NextStatus expected_outcome = std::chrono::system_clock::time_point::max(),
CompletionQueue::GOT_EVENT) { CompletionQueue::NextStatus expected_outcome = CompletionQueue::GOT_EVENT) {
bool ok; bool ok;
void* got_tag; void* got_tag;
EXPECT_EQ(cq->AsyncNext(&got_tag, &ok, deadline), expected_outcome); EXPECT_EQ(cq->AsyncNext(&got_tag, &ok, deadline), expected_outcome);
@ -195,18 +195,17 @@ TEST_F(AsyncEnd2endTest, AsyncNextRpc) {
grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx); grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
send_request.set_message("Hello"); send_request.set_message("Hello");
std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader(
response_reader(stub_->AsyncEcho(&cli_ctx, send_request, stub_->AsyncEcho(&cli_ctx, send_request, &cli_cq_, tag(1)));
&cli_cq_, tag(1)));
std::chrono::system_clock::time_point std::chrono::system_clock::time_point time_now(
time_now(std::chrono::system_clock::now()), std::chrono::system_clock::now()),
time_limit(std::chrono::system_clock::now()+std::chrono::seconds(5)); time_limit(std::chrono::system_clock::now() + std::chrono::seconds(5));
verify_timed_ok(&srv_cq_, -1, true, time_now, CompletionQueue::TIMEOUT); verify_timed_ok(&srv_cq_, -1, true, time_now, CompletionQueue::TIMEOUT);
verify_timed_ok(&cli_cq_, -1, true, time_now, CompletionQueue::TIMEOUT); verify_timed_ok(&cli_cq_, -1, true, time_now, CompletionQueue::TIMEOUT);
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, &srv_cq_, service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, &srv_cq_,
tag(2)); tag(2));
verify_timed_ok(&srv_cq_, 2, true, time_limit); verify_timed_ok(&srv_cq_, 2, true, time_limit);
EXPECT_EQ(send_request.message(), recv_request.message()); EXPECT_EQ(send_request.message(), recv_request.message());
@ -221,7 +220,6 @@ TEST_F(AsyncEnd2endTest, AsyncNextRpc) {
EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.IsOk()); EXPECT_TRUE(recv_status.IsOk());
} }
// Two pings and a final pong. // Two pings and a final pong.

@ -68,7 +68,7 @@ namespace grpc {
namespace testing { namespace testing {
namespace { namespace {
void* tag(int i) { return (void*)(gpr_intptr)i; } void* tag(int i) { return (void*)(gpr_intptr) i; }
void verify_ok(CompletionQueue* cq, int i, bool expect_ok) { void verify_ok(CompletionQueue* cq, int i, bool expect_ok) {
bool ok; bool ok;
@ -91,7 +91,7 @@ bool ParseFromByteBuffer(ByteBuffer* buffer, grpc::protobuf::Message* message) {
class GenericEnd2endTest : public ::testing::Test { class GenericEnd2endTest : public ::testing::Test {
protected: protected:
GenericEnd2endTest() : generic_service_("*") {} GenericEnd2endTest() : generic_service_("*") {}
void SetUp() GRPC_OVERRIDE { void SetUp() GRPC_OVERRIDE {
int port = grpc_pick_unused_port_or_die(); int port = grpc_pick_unused_port_or_die();
@ -116,8 +116,8 @@ class GenericEnd2endTest : public ::testing::Test {
} }
void ResetStub() { void ResetStub() {
std::shared_ptr<ChannelInterface> channel = std::shared_ptr<ChannelInterface> channel = CreateChannel(
CreateChannel(server_address_.str(), InsecureCredentials(), ChannelArguments()); server_address_.str(), InsecureCredentials(), ChannelArguments());
stub_ = std::move(grpc::cpp::test::util::TestService::NewStub(channel)); stub_ = std::move(grpc::cpp::test::util::TestService::NewStub(channel));
} }
@ -238,7 +238,6 @@ TEST_F(GenericEnd2endTest, SimpleBidiStreaming) {
client_ok(6); client_ok(6);
EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_EQ(send_response.message(), recv_response.message());
cli_stream->WritesDone(tag(7)); cli_stream->WritesDone(tag(7));
client_ok(7); client_ok(7);

@ -161,6 +161,15 @@ std::shared_ptr<ChannelInterface> CreateChannelForTestCase(
} }
} }
void AssertOkOrPrintErrorStatus(const grpc::Status& s) {
if (s.IsOk()) {
return;
}
gpr_log(GPR_INFO, "Error status code: %d, message: %s", s.code(),
s.details().c_str());
GPR_ASSERT(0);
}
void DoEmpty() { void DoEmpty() {
gpr_log(GPR_INFO, "Sending an empty rpc..."); gpr_log(GPR_INFO, "Sending an empty rpc...");
std::shared_ptr<ChannelInterface> channel = std::shared_ptr<ChannelInterface> channel =
@ -172,8 +181,8 @@ void DoEmpty() {
ClientContext context; ClientContext context;
grpc::Status s = stub->EmptyCall(&context, request, &response); grpc::Status s = stub->EmptyCall(&context, request, &response);
AssertOkOrPrintErrorStatus(s);
GPR_ASSERT(s.IsOk());
gpr_log(GPR_INFO, "Empty rpc done."); gpr_log(GPR_INFO, "Empty rpc done.");
} }
@ -190,7 +199,7 @@ void PerformLargeUnary(std::shared_ptr<ChannelInterface> channel,
grpc::Status s = stub->UnaryCall(&context, *request, response); grpc::Status s = stub->UnaryCall(&context, *request, response);
GPR_ASSERT(s.IsOk()); AssertOkOrPrintErrorStatus(s);
GPR_ASSERT(response->payload().type() == GPR_ASSERT(response->payload().type() ==
grpc::testing::PayloadType::COMPRESSABLE); grpc::testing::PayloadType::COMPRESSABLE);
GPR_ASSERT(response->payload().body() == GPR_ASSERT(response->payload().body() ==
@ -237,8 +246,7 @@ void DoServiceAccountCreds() {
} }
void DoJwtTokenCreds() { void DoJwtTokenCreds() {
gpr_log(GPR_INFO, gpr_log(GPR_INFO, "Sending a large unary rpc with JWT token credentials ...");
"Sending a large unary rpc with JWT token credentials ...");
std::shared_ptr<ChannelInterface> channel = std::shared_ptr<ChannelInterface> channel =
CreateChannelForTestCase("jwt_token_creds"); CreateChannelForTestCase("jwt_token_creds");
SimpleRequest request; SimpleRequest request;
@ -285,7 +293,7 @@ void DoRequestStreaming() {
grpc::Status s = stream->Finish(); grpc::Status s = stream->Finish();
GPR_ASSERT(response.aggregated_payload_size() == aggregated_payload_size); GPR_ASSERT(response.aggregated_payload_size() == aggregated_payload_size);
GPR_ASSERT(s.IsOk()); AssertOkOrPrintErrorStatus(s);
gpr_log(GPR_INFO, "Request streaming done."); gpr_log(GPR_INFO, "Request streaming done.");
} }
@ -314,7 +322,7 @@ void DoResponseStreaming() {
GPR_ASSERT(response_stream_sizes.size() == i); GPR_ASSERT(response_stream_sizes.size() == i);
grpc::Status s = stream->Finish(); grpc::Status s = stream->Finish();
GPR_ASSERT(s.IsOk()); AssertOkOrPrintErrorStatus(s);
gpr_log(GPR_INFO, "Response streaming done."); gpr_log(GPR_INFO, "Response streaming done.");
} }
@ -346,7 +354,7 @@ void DoResponseStreamingWithSlowConsumer() {
GPR_ASSERT(kNumResponseMessages == i); GPR_ASSERT(kNumResponseMessages == i);
grpc::Status s = stream->Finish(); grpc::Status s = stream->Finish();
GPR_ASSERT(s.IsOk()); AssertOkOrPrintErrorStatus(s);
gpr_log(GPR_INFO, "Response streaming done."); gpr_log(GPR_INFO, "Response streaming done.");
} }
@ -379,7 +387,7 @@ void DoHalfDuplex() {
} }
GPR_ASSERT(response_stream_sizes.size() == i); GPR_ASSERT(response_stream_sizes.size() == i);
grpc::Status s = stream->Finish(); grpc::Status s = stream->Finish();
GPR_ASSERT(s.IsOk()); AssertOkOrPrintErrorStatus(s);
gpr_log(GPR_INFO, "Half-duplex streaming rpc done."); gpr_log(GPR_INFO, "Half-duplex streaming rpc done.");
} }
@ -412,7 +420,7 @@ void DoPingPong() {
stream->WritesDone(); stream->WritesDone();
GPR_ASSERT(!stream->Read(&response)); GPR_ASSERT(!stream->Read(&response));
grpc::Status s = stream->Finish(); grpc::Status s = stream->Finish();
GPR_ASSERT(s.IsOk()); AssertOkOrPrintErrorStatus(s);
gpr_log(GPR_INFO, "Ping pong streaming done."); gpr_log(GPR_INFO, "Ping pong streaming done.");
} }

@ -54,13 +54,13 @@ extern "C" {
#include <grpc/support/log.h> #include <grpc/support/log.h>
#include "test/core/util/port.h" #include "test/core/util/port.h"
int test_client(const char *root, const char *host, int port) { int test_client(const char* root, const char* host, int port) {
int status; int status;
pid_t cli; pid_t cli;
cli = fork(); cli = fork();
if (cli == 0) { if (cli == 0) {
char *binary_path; char* binary_path;
char *port_arg; char* port_arg;
gpr_asprintf(&binary_path, "%s/interop_client", root); gpr_asprintf(&binary_path, "%s/interop_client", root);
gpr_asprintf(&port_arg, "--server_port=%d", port); gpr_asprintf(&port_arg, "--server_port=%d", port);
@ -78,9 +78,9 @@ int test_client(const char *root, const char *host, int port) {
return 0; return 0;
} }
int main(int argc, char **argv) { int main(int argc, char** argv) {
char *me = argv[0]; char* me = argv[0];
char *lslash = strrchr(me, '/'); char* lslash = strrchr(me, '/');
char root[1024]; char root[1024];
int port = grpc_pick_unused_port_or_die(); int port = grpc_pick_unused_port_or_die();
int status; int status;
@ -104,8 +104,8 @@ int main(int argc, char **argv) {
/* start the server */ /* start the server */
svr = fork(); svr = fork();
if (svr == 0) { if (svr == 0) {
char *binary_path; char* binary_path;
char *port_arg; char* port_arg;
gpr_asprintf(&binary_path, "%s/interop_server", root); gpr_asprintf(&binary_path, "%s/interop_server", root);
gpr_asprintf(&port_arg, "--port=%d", port); gpr_asprintf(&port_arg, "--port=%d", port);

@ -61,23 +61,23 @@ class ClientRpcContext {
virtual ~ClientRpcContext() {} virtual ~ClientRpcContext() {}
virtual bool RunNextState() = 0; // do next state, return false if steps done virtual bool RunNextState() = 0; // do next state, return false if steps done
virtual void StartNewClone() = 0; virtual void StartNewClone() = 0;
static void *tag(ClientRpcContext *c) { return reinterpret_cast<void *>(c); } static void* tag(ClientRpcContext* c) { return reinterpret_cast<void*>(c); }
static ClientRpcContext *detag(void *t) { static ClientRpcContext* detag(void* t) {
return reinterpret_cast<ClientRpcContext *>(t); return reinterpret_cast<ClientRpcContext*>(t);
} }
virtual void report_stats(Histogram *hist) = 0; virtual void report_stats(Histogram* hist) = 0;
}; };
template <class RequestType, class ResponseType> template <class RequestType, class ResponseType>
class ClientRpcContextUnaryImpl : public ClientRpcContext { class ClientRpcContextUnaryImpl : public ClientRpcContext {
public: public:
ClientRpcContextUnaryImpl( ClientRpcContextUnaryImpl(
TestService::Stub *stub, const RequestType &req, TestService::Stub* stub, const RequestType& req,
std::function< std::function<
std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>( std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>(
TestService::Stub *, grpc::ClientContext *, const RequestType &, TestService::Stub*, grpc::ClientContext*, const RequestType&,
void *)> start_req, void*)> start_req,
std::function<void(grpc::Status, ResponseType *)> on_done) std::function<void(grpc::Status, ResponseType*)> on_done)
: context_(), : context_(),
stub_(stub), stub_(stub),
req_(req), req_(req),
@ -90,7 +90,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
start_req(stub_, &context_, req_, ClientRpcContext::tag(this))) {} start_req(stub_, &context_, req_, ClientRpcContext::tag(this))) {}
~ClientRpcContextUnaryImpl() GRPC_OVERRIDE {} ~ClientRpcContextUnaryImpl() GRPC_OVERRIDE {}
bool RunNextState() GRPC_OVERRIDE { return (this->*next_state_)(); } bool RunNextState() GRPC_OVERRIDE { return (this->*next_state_)(); }
void report_stats(Histogram *hist) GRPC_OVERRIDE { void report_stats(Histogram* hist) GRPC_OVERRIDE {
hist->Add((Timer::Now() - start_) * 1e9); hist->Add((Timer::Now() - start_) * 1e9);
} }
@ -113,13 +113,13 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
return false; return false;
} }
grpc::ClientContext context_; grpc::ClientContext context_;
TestService::Stub *stub_; TestService::Stub* stub_;
RequestType req_; RequestType req_;
ResponseType response_; ResponseType response_;
bool (ClientRpcContextUnaryImpl::*next_state_)(); bool (ClientRpcContextUnaryImpl::*next_state_)();
std::function<void(grpc::Status, ResponseType *)> callback_; std::function<void(grpc::Status, ResponseType*)> callback_;
std::function<std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>( std::function<std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>(
TestService::Stub *, grpc::ClientContext *, const RequestType &, void *)> TestService::Stub*, grpc::ClientContext*, const RequestType&, void*)>
start_req_; start_req_;
grpc::Status status_; grpc::Status status_;
double start_; double start_;
@ -129,13 +129,13 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
class AsyncClient GRPC_FINAL : public Client { class AsyncClient GRPC_FINAL : public Client {
public: public:
explicit AsyncClient(const ClientConfig &config) : Client(config) { explicit AsyncClient(const ClientConfig& config) : Client(config) {
for (int i = 0; i < config.async_client_threads(); i++) { for (int i = 0; i < config.async_client_threads(); i++) {
cli_cqs_.emplace_back(new CompletionQueue); cli_cqs_.emplace_back(new CompletionQueue);
} }
auto payload_size = config.payload_size(); auto payload_size = config.payload_size();
auto check_done = [payload_size](grpc::Status s, SimpleResponse *response) { auto check_done = [payload_size](grpc::Status s, SimpleResponse* response) {
GPR_ASSERT(s.IsOk() && (response->payload().type() == GPR_ASSERT(s.IsOk() && (response->payload().type() ==
grpc::testing::PayloadType::COMPRESSABLE) && grpc::testing::PayloadType::COMPRESSABLE) &&
(response->payload().body().length() == (response->payload().body().length() ==
@ -144,16 +144,16 @@ class AsyncClient GRPC_FINAL : public Client {
int t = 0; int t = 0;
for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) { for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) {
for (auto &channel : channels_) { for (auto& channel : channels_) {
auto *cq = cli_cqs_[t].get(); auto* cq = cli_cqs_[t].get();
t = (t + 1) % cli_cqs_.size(); t = (t + 1) % cli_cqs_.size();
auto start_req = [cq](TestService::Stub *stub, grpc::ClientContext *ctx, auto start_req = [cq](TestService::Stub* stub, grpc::ClientContext* ctx,
const SimpleRequest &request, void *tag) { const SimpleRequest& request, void* tag) {
return stub->AsyncUnaryCall(ctx, request, cq, tag); return stub->AsyncUnaryCall(ctx, request, cq, tag);
}; };
TestService::Stub *stub = channel.get_stub(); TestService::Stub* stub = channel.get_stub();
const SimpleRequest &request = request_; const SimpleRequest& request = request_;
new ClientRpcContextUnaryImpl<SimpleRequest, SimpleResponse>( new ClientRpcContextUnaryImpl<SimpleRequest, SimpleResponse>(
stub, request, start_req, check_done); stub, request, start_req, check_done);
} }
@ -165,9 +165,9 @@ class AsyncClient GRPC_FINAL : public Client {
~AsyncClient() GRPC_OVERRIDE { ~AsyncClient() GRPC_OVERRIDE {
EndThreads(); EndThreads();
for (auto &cq : cli_cqs_) { for (auto& cq : cli_cqs_) {
cq->Shutdown(); cq->Shutdown();
void *got_tag; void* got_tag;
bool ok; bool ok;
while (cq->Next(&got_tag, &ok)) { while (cq->Next(&got_tag, &ok)) {
delete ClientRpcContext::detag(got_tag); delete ClientRpcContext::detag(got_tag);
@ -175,12 +175,12 @@ class AsyncClient GRPC_FINAL : public Client {
} }
} }
void ThreadFunc(Histogram *histogram, size_t thread_idx) GRPC_OVERRIDE { void ThreadFunc(Histogram* histogram, size_t thread_idx) GRPC_OVERRIDE {
void *got_tag; void* got_tag;
bool ok; bool ok;
cli_cqs_[thread_idx]->Next(&got_tag, &ok); cli_cqs_[thread_idx]->Next(&got_tag, &ok);
ClientRpcContext *ctx = ClientRpcContext::detag(got_tag); ClientRpcContext* ctx = ClientRpcContext::detag(got_tag);
if (ctx->RunNextState() == false) { if (ctx->RunNextState() == false) {
// call the callback and then delete it // call the callback and then delete it
ctx->report_stats(histogram); ctx->report_stats(histogram);
@ -193,7 +193,7 @@ class AsyncClient GRPC_FINAL : public Client {
std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_; std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_;
}; };
std::unique_ptr<Client> CreateAsyncClient(const ClientConfig &args) { std::unique_ptr<Client> CreateAsyncClient(const ClientConfig& args) {
return std::unique_ptr<Client>(new AsyncClient(args)); return std::unique_ptr<Client>(new AsyncClient(args));
} }

@ -69,7 +69,7 @@ namespace gflags {}
using namespace google; using namespace google;
using namespace gflags; using namespace gflags;
int main(int argc, char **argv) { int main(int argc, char** argv) {
grpc_init(); grpc_init();
ParseCommandLineFlags(&argc, &argv, true); ParseCommandLineFlags(&argc, &argv, true);

@ -73,8 +73,8 @@ using grpc::Status;
// In some distros, gflags is in the namespace google, and in some others, // In some distros, gflags is in the namespace google, and in some others,
// in gflags. This hack is enabling us to find both. // in gflags. This hack is enabling us to find both.
namespace google { } namespace google {}
namespace gflags { } namespace gflags {}
using namespace google; using namespace google;
using namespace gflags; using namespace gflags;

@ -62,9 +62,9 @@ namespace testing {
class AsyncQpsServerTest : public Server { class AsyncQpsServerTest : public Server {
public: public:
AsyncQpsServerTest(const ServerConfig &config, int port) AsyncQpsServerTest(const ServerConfig& config, int port)
: srv_cq_(), async_service_(&srv_cq_), server_(nullptr) { : srv_cq_(), async_service_(&srv_cq_), server_(nullptr) {
char *server_address = NULL; char* server_address = NULL;
gpr_join_host_port(&server_address, "::", port); gpr_join_host_port(&server_address, "::", port);
ServerBuilder builder; ServerBuilder builder;
@ -87,10 +87,10 @@ class AsyncQpsServerTest : public Server {
threads_.push_back(std::thread([=]() { threads_.push_back(std::thread([=]() {
// Wait until work is available or we are shutting down // Wait until work is available or we are shutting down
bool ok; bool ok;
void *got_tag; void* got_tag;
while (srv_cq_.Next(&got_tag, &ok)) { while (srv_cq_.Next(&got_tag, &ok)) {
if (ok) { if (ok) {
ServerRpcContext *ctx = detag(got_tag); ServerRpcContext* ctx = detag(got_tag);
// The tag is a pointer to an RPC context to invoke // The tag is a pointer to an RPC context to invoke
if (ctx->RunNextState() == false) { if (ctx->RunNextState() == false) {
// this RPC context is done, so refresh it // this RPC context is done, so refresh it
@ -105,7 +105,7 @@ class AsyncQpsServerTest : public Server {
~AsyncQpsServerTest() { ~AsyncQpsServerTest() {
server_->Shutdown(); server_->Shutdown();
srv_cq_.Shutdown(); srv_cq_.Shutdown();
for (auto &thr : threads_) { for (auto& thr : threads_) {
thr.join(); thr.join();
} }
while (!contexts_.empty()) { while (!contexts_.empty()) {
@ -122,21 +122,21 @@ class AsyncQpsServerTest : public Server {
virtual bool RunNextState() = 0; // do next state, return false if all done virtual bool RunNextState() = 0; // do next state, return false if all done
virtual void Reset() = 0; // start this back at a clean state virtual void Reset() = 0; // start this back at a clean state
}; };
static void *tag(ServerRpcContext *func) { static void* tag(ServerRpcContext* func) {
return reinterpret_cast<void *>(func); return reinterpret_cast<void*>(func);
} }
static ServerRpcContext *detag(void *tag) { static ServerRpcContext* detag(void* tag) {
return reinterpret_cast<ServerRpcContext *>(tag); return reinterpret_cast<ServerRpcContext*>(tag);
} }
template <class RequestType, class ResponseType> template <class RequestType, class ResponseType>
class ServerRpcContextUnaryImpl : public ServerRpcContext { class ServerRpcContextUnaryImpl : public ServerRpcContext {
public: public:
ServerRpcContextUnaryImpl( ServerRpcContextUnaryImpl(
std::function<void(ServerContext *, RequestType *, std::function<void(ServerContext*, RequestType*,
grpc::ServerAsyncResponseWriter<ResponseType> *, grpc::ServerAsyncResponseWriter<ResponseType>*,
void *)> request_method, void*)> request_method,
std::function<grpc::Status(const RequestType *, ResponseType *)> std::function<grpc::Status(const RequestType*, ResponseType*)>
invoke_method) invoke_method)
: next_state_(&ServerRpcContextUnaryImpl::invoker), : next_state_(&ServerRpcContextUnaryImpl::invoker),
request_method_(request_method), request_method_(request_method),
@ -175,16 +175,16 @@ class AsyncQpsServerTest : public Server {
ServerContext srv_ctx_; ServerContext srv_ctx_;
RequestType req_; RequestType req_;
bool (ServerRpcContextUnaryImpl::*next_state_)(); bool (ServerRpcContextUnaryImpl::*next_state_)();
std::function<void(ServerContext *, RequestType *, std::function<void(ServerContext*, RequestType*,
grpc::ServerAsyncResponseWriter<ResponseType> *, void *)> grpc::ServerAsyncResponseWriter<ResponseType>*, void*)>
request_method_; request_method_;
std::function<grpc::Status(const RequestType *, ResponseType *)> std::function<grpc::Status(const RequestType*, ResponseType*)>
invoke_method_; invoke_method_;
grpc::ServerAsyncResponseWriter<ResponseType> response_writer_; grpc::ServerAsyncResponseWriter<ResponseType> response_writer_;
}; };
static Status UnaryCall(const SimpleRequest *request, static Status UnaryCall(const SimpleRequest* request,
SimpleResponse *response) { SimpleResponse* response) {
if (request->has_response_size() && request->response_size() > 0) { if (request->has_response_size() && request->response_size() > 0) {
if (!SetPayload(request->response_type(), request->response_size(), if (!SetPayload(request->response_type(), request->response_size(),
response->mutable_payload())) { response->mutable_payload())) {
@ -197,13 +197,13 @@ class AsyncQpsServerTest : public Server {
TestService::AsyncService async_service_; TestService::AsyncService async_service_;
std::vector<std::thread> threads_; std::vector<std::thread> threads_;
std::unique_ptr<grpc::Server> server_; std::unique_ptr<grpc::Server> server_;
std::function<void(ServerContext *, SimpleRequest *, std::function<void(ServerContext*, SimpleRequest*,
grpc::ServerAsyncResponseWriter<SimpleResponse> *, void *)> grpc::ServerAsyncResponseWriter<SimpleResponse>*, void*)>
request_unary_; request_unary_;
std::forward_list<ServerRpcContext *> contexts_; std::forward_list<ServerRpcContext*> contexts_;
}; };
std::unique_ptr<Server> CreateAsyncServer(const ServerConfig &config, std::unique_ptr<Server> CreateAsyncServer(const ServerConfig& config,
int port) { int port) {
return std::unique_ptr<Server>(new AsyncQpsServerTest(config, port)); return std::unique_ptr<Server>(new AsyncQpsServerTest(config, port));
} }

@ -72,8 +72,7 @@ std::shared_ptr<ChannelInterface> CreateTestChannel(
const grpc::string& connect_to = const grpc::string& connect_to =
server.empty() ? override_hostname : server; server.empty() ? override_hostname : server;
if (creds.get()) { if (creds.get()) {
channel_creds = channel_creds = CompositeCredentials(creds, channel_creds);
CompositeCredentials(creds, channel_creds);
} }
return CreateChannel(connect_to, channel_creds, channel_args); return CreateChannel(connect_to, channel_creds, channel_args);
} else { } else {

@ -36,7 +36,7 @@
#include <grpc/support/log.h> #include <grpc/support/log.h>
// Make sure the existing grpc_status_code match with grpc::Code. // Make sure the existing grpc_status_code match with grpc::Code.
int main(int argc, char **argv) { int main(int argc, char** argv) {
GPR_ASSERT(grpc::StatusCode::OK == GPR_ASSERT(grpc::StatusCode::OK ==
static_cast<grpc::StatusCode>(GRPC_STATUS_OK)); static_cast<grpc::StatusCode>(GRPC_STATUS_OK));
GPR_ASSERT(grpc::StatusCode::CANCELLED == GPR_ASSERT(grpc::StatusCode::CANCELLED ==

@ -0,0 +1,76 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
# Dockerfile to build protoc and plugins for inclusion in a release.
FROM grpc/base
# Add the file containing the gRPC version
ADD version.txt version.txt
# Install tools needed for building protoc.
RUN apt-get update && apt-get -y install libgflags-dev libgtest-dev
# Get the protobuf source from GitHub.
RUN mkdir -p /var/local/git
RUN git clone https://github.com/google/protobuf.git /var/local/git/protobuf
# Build the protobuf library statically and install to /tmp/protoc_static.
WORKDIR /var/local/git/protobuf
RUN ./autogen.sh && \
./configure --disable-shared --prefix=/tmp/protoc_static \
LDFLAGS="-lgcc_eh -static-libgcc -static-libstdc++" && \
make -j12 && make check && make install
# Build the protobuf library dynamically and install to /usr/local.
WORKDIR /var/local/git/protobuf
RUN ./autogen.sh && \
./configure --prefix=/usr/local && \
make -j12 && make check && make install
# Build the grpc plugins.
RUN git clone https://github.com/google/grpc.git /var/local/git/grpc
WORKDIR /var/local/git/grpc
RUN LDFLAGS=-static make plugins
# Create an archive containing all the generated binaries.
RUN mkdir /tmp/proto-bins_$(cat /version.txt)_linux-$(uname -m)
RUN cp -v bins/opt/* /tmp/proto-bins_$(cat /version.txt)_linux-$(uname -m)
RUN cp -v /tmp/protoc_static/bin/protoc /tmp/proto-bins_$(cat /version.txt)_linux-$(uname -m)
RUN cd /tmp && \
tar -czf proto-bins_$(cat /version.txt)_linux-$(uname -m).tar.gz proto-bins_$(cat /version.txt)_linux-$(uname -m)
# List the tar contents: provides a way to visually confirm that the contents
# are correct.
RUN echo 'proto-bins_$(cat /version.txt)_linux-tar-$(uname -m) contents:' && \
tar -ztf /tmp/proto-bins_$(cat /version.txt)_linux-$(uname -m).tar.gz

@ -36,7 +36,7 @@ echo $result_file_name
main() { main() {
source grpc_docker.sh source grpc_docker.sh
test_cases=(large_unary empty_unary ping_pong client_streaming server_streaming cancel_after_begin cancel_after_first_response) test_cases=(large_unary empty_unary ping_pong client_streaming server_streaming cancel_after_begin cancel_after_first_response)
auth_test_cases=(service_account_creds compute_engine_creds) auth_test_cases=(service_account_creds compute_engine_creds jwt_token_creds)
clients=(cxx java go ruby node csharp_mono) clients=(cxx java go ruby node csharp_mono)
for test_case in "${test_cases[@]}" for test_case in "${test_cases[@]}"
do do

@ -560,7 +560,7 @@ grpc_sync_scripts() {
_grpc_ensure_gcloud_ssh || return 1; _grpc_ensure_gcloud_ssh || return 1;
# declare vars local so that they don't pollute the shell environment # declare vars local so that they don't pollute the shell environment
# where they this func is used. # where this func is used.
local grpc_zone grpc_project dry_run # set by _grpc_set_project_and_zone local grpc_zone grpc_project dry_run # set by _grpc_set_project_and_zone
local grpc_hosts grpc_gce_script_root local grpc_hosts grpc_gce_script_root
@ -600,7 +600,7 @@ grpc_sync_images() {
_grpc_ensure_gcloud_ssh || return 1; _grpc_ensure_gcloud_ssh || return 1;
# declare vars local so that they don't pollute the shell environment # declare vars local so that they don't pollute the shell environment
# where they this func is used. # where this func is used.
local grpc_zone grpc_project dry_run # set by _grpc_set_project_and_zone local grpc_zone grpc_project dry_run # set by _grpc_set_project_and_zone
local grpc_hosts local grpc_hosts
@ -645,7 +645,7 @@ _grpc_show_servers_args() {
# Shows the grpc servers on the GCE instance <server_name> # Shows the grpc servers on the GCE instance <server_name>
grpc_show_servers() { grpc_show_servers() {
# declare vars local so that they don't pollute the shell environment # declare vars local so that they don't pollute the shell environment
# where they this func is used. # where this func is used.
local grpc_zone grpc_project dry_run # set by _grpc_set_project_and_zone local grpc_zone grpc_project dry_run # set by _grpc_set_project_and_zone
# set by _grpc_show_servers # set by _grpc_show_servers
local host local host
@ -663,6 +663,58 @@ grpc_show_servers() {
gcloud compute $project_opt ssh $zone_opt $host --command "$cmd" gcloud compute $project_opt ssh $zone_opt $host --command "$cmd"
} }
_grpc_build_proto_bins_args() {
[[ -n $1 ]] && { # host
host=$1
shift
} || {
host='grpc-docker-builder'
}
}
# grpc_build_proto_bins
#
# - rebuilds the dist_proto docker image
# * doing this builds the protoc and the ruby, python and cpp bins statically
#
# - runs a docker command that copies the built protos to the GCE host
# - copies the built protos to the local machine
grpc_build_proto_bins() {
_grpc_ensure_gcloud_ssh || return 1;
# declare vars local so that they don't pollute the shell environment
# where this func is used.
local grpc_zone grpc_project dry_run # set by _grpc_set_project_and_zone
# set by _grpc_build_proto_bins_args
local host
# set the project zone and check that all necessary args are provided
_grpc_set_project_and_zone -f _grpc_build_proto_bins_args "$@" || return 1
gce_has_instance $grpc_project $host || return 1;
local project_opt="--project $grpc_project"
local zone_opt="--zone $grpc_zone"
# rebuild the dist_proto image
local label='dist_proto'
grpc_update_image -- -h $host $label || return 1
# run a command to copy the generated archive to the docker host
local docker_prefix='sudo docker run -v /tmp:/tmp/proto_bins_out'
local tar_name='proto-bins*.tar.gz'
local cp_cmd="/bin/bash -c 'cp -v /tmp/$tar_name /tmp/proto_bins_out'"
local cmd="$docker_prefix grpc/$label $cp_cmd"
local ssh_cmd="bash -l -c \"$cmd\""
echo "will run:"
echo " $ssh_cmd"
echo "on $host"
gcloud compute $project_opt ssh $zone_opt $host --command "$cmd" || return 1
# copy the tar.gz locally
local rmt_tar="$host:/tmp/$tar_name"
local local_copy="$(pwd)"
gcloud compute copy-files $rmt_tar $local_copy $project_opt $zone_opt || return 1
}
_grpc_launch_servers_args() { _grpc_launch_servers_args() {
[[ -n $1 ]] && { # host [[ -n $1 ]] && { # host
host=$1 host=$1
@ -690,7 +742,7 @@ _grpc_launch_servers_args() {
# If no servers are specified, it launches all known servers # If no servers are specified, it launches all known servers
grpc_launch_servers() { grpc_launch_servers() {
# declare vars local so that they don't pollute the shell environment # declare vars local so that they don't pollute the shell environment
# where they this func is used. # where this func is used.
local grpc_zone grpc_project dry_run # set by _grpc_set_project_and_zone local grpc_zone grpc_project dry_run # set by _grpc_set_project_and_zone
# set by _grpc_launch_servers_args # set by _grpc_launch_servers_args
local host servers local host servers
@ -811,7 +863,7 @@ test_runner() {
grpc_interop_test() { grpc_interop_test() {
_grpc_ensure_gcloud_ssh || return 1; _grpc_ensure_gcloud_ssh || return 1;
# declare vars local so that they don't pollute the shell environment # declare vars local so that they don't pollute the shell environment
# where they this func is used. # where this func is used.
local grpc_zone grpc_project dry_run # set by _grpc_set_project_and_zone local grpc_zone grpc_project dry_run # set by _grpc_set_project_and_zone
# grpc_interop_test_args # grpc_interop_test_args
@ -853,7 +905,7 @@ grpc_interop_test() {
grpc_cloud_prod_test() { grpc_cloud_prod_test() {
_grpc_ensure_gcloud_ssh || return 1; _grpc_ensure_gcloud_ssh || return 1;
# declare vars local so that they don't pollute the shell environment # declare vars local so that they don't pollute the shell environment
# where they this func is used. # where this func is used.
local grpc_zone grpc_project dry_run # set by _grpc_set_project_and_zone local grpc_zone grpc_project dry_run # set by _grpc_set_project_and_zone
# grpc_cloud_prod_test_args # grpc_cloud_prod_test_args
@ -892,7 +944,7 @@ grpc_cloud_prod_test() {
grpc_cloud_prod_auth_test() { grpc_cloud_prod_auth_test() {
_grpc_ensure_gcloud_ssh || return 1; _grpc_ensure_gcloud_ssh || return 1;
# declare vars local so that they don't pollute the shell environment # declare vars local so that they don't pollute the shell environment
# where they this func is used. # where this func is used.
local grpc_zone grpc_project dry_run # set by _grpc_set_project_and_zone local grpc_zone grpc_project dry_run # set by _grpc_set_project_and_zone
# grpc_cloud_prod_test_args # grpc_cloud_prod_test_args
@ -1192,6 +1244,20 @@ grpc_cloud_prod_auth_compute_engine_creds_gen_cxx_cmd() {
echo $the_cmd echo $the_cmd
} }
# constructs the full dockerized cpp jwt_token auth interop test cmd.
#
# call-seq:
# flags= .... # generic flags to include the command
# cmd=$($grpc_gen_test_cmd $flags)
grpc_cloud_prod_auth_jwt_token_creds_gen_cxx_cmd() {
local cmd_prefix="sudo docker run grpc/cxx";
local test_script="/var/local/git/grpc/bins/opt/interop_client --enable_ssl --use_prod_roots";
local gfe_flags=$(_grpc_prod_gfe_flags)
local added_gfe_flags=$(_grpc_jwt_token_test_flags)
local the_cmd="$cmd_prefix $test_script $gfe_flags $added_gfe_flags $@";
echo $the_cmd
}
# constructs the full dockerized csharp-mono interop test cmd. # constructs the full dockerized csharp-mono interop test cmd.
# #
# call-seq: # call-seq:
@ -1230,6 +1296,11 @@ _grpc_svc_acc_test_flags() {
echo " --service_account_key_file=/service_account/stubbyCloudTestingTest-7dd63462c60c.json --oauth_scope=https://www.googleapis.com/auth/xapi.zoo" echo " --service_account_key_file=/service_account/stubbyCloudTestingTest-7dd63462c60c.json --oauth_scope=https://www.googleapis.com/auth/xapi.zoo"
} }
# outputs the flags passed to the service account auth tests
_grpc_jwt_token_test_flags() {
echo " --service_account_key_file=/service_account/stubbyCloudTestingTest-7dd63462c60c.json"
}
# default credentials test flag # default credentials test flag
_grpc_default_creds_test_flags() { _grpc_default_creds_test_flags() {
echo " --oauth_scope=https://www.googleapis.com/auth/xapi.zoo" echo " --oauth_scope=https://www.googleapis.com/auth/xapi.zoo"

Loading…
Cancel
Save