Merge github.com:grpc/grpc into an-update-on-c++

pull/581/head
Craig Tiller 10 years ago
commit 2d11c93ab5
  1. 38
      INSTALL
  2. 55
      Makefile
  3. 3
      build.json
  4. 144
      include/grpc++/async_unary_call.h
  5. 4
      include/grpc++/client_context.h
  6. 7
      include/grpc++/impl/client_unary_call.h
  7. 50
      include/grpc++/stream.h
  8. 19
      include/grpc/grpc.h
  9. 22
      src/compiler/cpp_generator.cc
  10. 2
      src/core/iomgr/pollset_multipoller_with_epoll.c
  11. 6
      src/core/security/credentials.c
  12. 1
      src/core/surface/call.c
  13. 2
      src/core/surface/channel.c
  14. 14
      src/core/transport/chttp2_transport.c
  15. 19
      src/core/transport/metadata.c
  16. 3
      src/core/transport/metadata.h
  17. 26
      src/cpp/client/client_unary_call.cc
  18. 6
      src/csharp/GrpcApi/MathGrpc.cs
  19. 6
      src/csharp/GrpcApi/TestServiceGrpc.cs
  20. 18
      src/csharp/GrpcApiTests/MathClientServerTests.cs
  21. 58
      src/csharp/GrpcCore/Calls.cs
  22. 6
      src/csharp/GrpcCore/GrpcCore.csproj
  23. 2
      src/csharp/GrpcCore/GrpcEnvironment.cs
  24. 588
      src/csharp/GrpcCore/Internal/AsyncCall.cs
  25. 96
      src/csharp/GrpcCore/Internal/BatchContextSafeHandleNotOwned.cs
  26. 138
      src/csharp/GrpcCore/Internal/CallSafeHandle.cs
  27. 15
      src/csharp/GrpcCore/Internal/ClientStreamingInputObserver.cs
  28. 16
      src/csharp/GrpcCore/Internal/CompletionQueueSafeHandle.cs
  29. 224
      src/csharp/GrpcCore/Internal/Event.cs
  30. 47
      src/csharp/GrpcCore/Internal/GrpcThreadPool.cs
  31. 6
      src/csharp/GrpcCore/Internal/SafeHandleZeroIsInvalid.cs
  32. 24
      src/csharp/GrpcCore/Internal/ServerSafeHandle.cs
  33. 10
      src/csharp/GrpcCore/Internal/ServerStreamingOutputObserver.cs
  34. 31
      src/csharp/GrpcCore/Server.cs
  35. 47
      src/csharp/GrpcCore/ServerCallHandler.cs
  36. 73
      src/csharp/GrpcCoreTests/ClientServerTest.cs
  37. 7
      src/csharp/README.md
  38. 512
      src/csharp/ext/grpc_csharp_ext.c
  39. 51
      templates/Makefile.template
  40. 2
      test/core/channel/channel_stack_test.c
  41. 2
      test/core/channel/metadata_buffer_test.c
  42. 14
      test/core/security/credentials_test.c
  43. 2
      test/core/transport/chttp2/hpack_parser_test.c
  44. 6
      test/core/transport/chttp2/hpack_table_test.c
  45. 2
      test/core/transport/chttp2/stream_encoder_test.c
  46. 20
      test/core/transport/metadata_test.c
  47. 2
      test/core/transport/transport_end2end_tests.c
  48. 82
      test/cpp/end2end/async_end2end_test.cc
  49. 11
      tools/run_tests/run_tests.py

@ -95,6 +95,44 @@ will need clang and its instrumented libc++:
# apt-get install clang libc++-dev
Mac-specific notes:
-------------------
For a Mac system, git is not available by default. You will first need to
install Xcode from the Mac AppStore and then run the following command from a
terminal:
$ sudo xcode-select --install
You should also install "port" following the instructions at
https://www.macports.org . This will reside in /opt/local/bin/port for
most Mac installations. Do the "git submodule" command listed above.
Then execute the following for all the needed build dependencies
$ sudo /opt/local/bin/port install autoconf automake libtool gflags cmake
$ mkdir ~/gtest
$ svn checkout http://googletest.googlecode.com/svn/trunk/ gtest-svn
$ mkdir mybuild
$ cd mybuild
$ cmake ../gtest-svn
$ make
$ make gtest.a gtest_main.a
$ sudo cp libgtest.a libgtest_main.a /opt/local/lib
$ sudo mkdir /opt/local/include/gtest
$ sudo cp -pr ../gtest-svn/include/gtest /opt/local/include/gtest
We will also need to make openssl and install it appropriately
$ cd <git directory>
$ cd third_party/openssl
$ sudo make install
$ cd ../../
If you are going to make changes and need to regenerate the projects file,
you will need to install certain modules for python.
$ sudo easy_install simplejson mako
A word on OpenSSL
-----------------

@ -177,7 +177,9 @@ LDFLAGS += -g -fPIC
INCLUDES = . include $(GENDIR)
ifeq ($(SYSTEM),Darwin)
INCLUDES += /usr/local/ssl/include /opt/local/include
LIBS = m z
LDFLAGS += -L/usr/local/ssl/lib -L/opt/local/lib
else
LIBS = rt m z pthread
LDFLAGS += -pthread
@ -890,16 +892,19 @@ $(LIBDIR)/$(CONFIG)/protobuf/libprotobuf.a: third_party/protobuf/configure
static: static_c static_cxx
static_c: $(LIBDIR)/$(CONFIG)/libgpr.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgrpc_csharp_ext.a $(LIBDIR)/$(CONFIG)/libgrpc_unsecure.a
static_c: $(LIBDIR)/$(CONFIG)/libgpr.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgrpc_unsecure.a
static_cxx: $(LIBDIR)/$(CONFIG)/libgrpc++.a
shared: shared_c shared_cxx
shared_c: $(LIBDIR)/$(CONFIG)/libgpr.$(SHARED_EXT) $(LIBDIR)/$(CONFIG)/libgrpc.$(SHARED_EXT) $(LIBDIR)/$(CONFIG)/libgrpc_csharp_ext.$(SHARED_EXT) $(LIBDIR)/$(CONFIG)/libgrpc_unsecure.$(SHARED_EXT)
shared_c: $(LIBDIR)/$(CONFIG)/libgpr.$(SHARED_EXT) $(LIBDIR)/$(CONFIG)/libgrpc.$(SHARED_EXT) $(LIBDIR)/$(CONFIG)/libgrpc_unsecure.$(SHARED_EXT)
shared_cxx: $(LIBDIR)/$(CONFIG)/libgrpc++.$(SHARED_EXT)
shared_csharp: shared_c $(LIBDIR)/$(CONFIG)/libgrpc_csharp_ext.$(SHARED_EXT)
grpc_csharp_ext: shared_csharp
privatelibs: privatelibs_c privatelibs_cxx
privatelibs_c: $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libend2end_fixture_chttp2_fake_security.a $(LIBDIR)/$(CONFIG)/libend2end_fixture_chttp2_fullstack.a $(LIBDIR)/$(CONFIG)/libend2end_fixture_chttp2_fullstack_uds.a $(LIBDIR)/$(CONFIG)/libend2end_fixture_chttp2_simple_ssl_fullstack.a $(LIBDIR)/$(CONFIG)/libend2end_fixture_chttp2_simple_ssl_with_oauth2_fullstack.a $(LIBDIR)/$(CONFIG)/libend2end_fixture_chttp2_socket_pair.a $(LIBDIR)/$(CONFIG)/libend2end_fixture_chttp2_socket_pair_one_byte_at_a_time.a $(LIBDIR)/$(CONFIG)/libend2end_test_cancel_after_accept.a $(LIBDIR)/$(CONFIG)/libend2end_test_cancel_after_accept_and_writes_closed.a $(LIBDIR)/$(CONFIG)/libend2end_test_cancel_after_invoke.a $(LIBDIR)/$(CONFIG)/libend2end_test_cancel_before_invoke.a $(LIBDIR)/$(CONFIG)/libend2end_test_cancel_in_a_vacuum.a $(LIBDIR)/$(CONFIG)/libend2end_test_census_simple_request.a $(LIBDIR)/$(CONFIG)/libend2end_test_disappearing_server.a $(LIBDIR)/$(CONFIG)/libend2end_test_early_server_shutdown_finishes_inflight_calls.a $(LIBDIR)/$(CONFIG)/libend2end_test_early_server_shutdown_finishes_tags.a $(LIBDIR)/$(CONFIG)/libend2end_test_empty_batch.a $(LIBDIR)/$(CONFIG)/libend2end_test_graceful_server_shutdown.a $(LIBDIR)/$(CONFIG)/libend2end_test_invoke_large_request.a $(LIBDIR)/$(CONFIG)/libend2end_test_max_concurrent_streams.a $(LIBDIR)/$(CONFIG)/libend2end_test_no_op.a $(LIBDIR)/$(CONFIG)/libend2end_test_ping_pong_streaming.a $(LIBDIR)/$(CONFIG)/libend2end_test_request_response_with_binary_metadata_and_payload.a $(LIBDIR)/$(CONFIG)/libend2end_test_request_response_with_metadata_and_payload.a $(LIBDIR)/$(CONFIG)/libend2end_test_request_response_with_payload.a $(LIBDIR)/$(CONFIG)/libend2end_test_request_with_large_metadata.a $(LIBDIR)/$(CONFIG)/libend2end_test_request_with_payload.a $(LIBDIR)/$(CONFIG)/libend2end_test_simple_delayed_request.a $(LIBDIR)/$(CONFIG)/libend2end_test_simple_request.a $(LIBDIR)/$(CONFIG)/libend2end_test_thread_stress.a $(LIBDIR)/$(CONFIG)/libend2end_test_writes_done_hangs_with_pending_read.a $(LIBDIR)/$(CONFIG)/libend2end_test_cancel_after_accept_legacy.a $(LIBDIR)/$(CONFIG)/libend2end_test_cancel_after_accept_and_writes_closed_legacy.a $(LIBDIR)/$(CONFIG)/libend2end_test_cancel_after_invoke_legacy.a $(LIBDIR)/$(CONFIG)/libend2end_test_cancel_before_invoke_legacy.a $(LIBDIR)/$(CONFIG)/libend2end_test_cancel_in_a_vacuum_legacy.a $(LIBDIR)/$(CONFIG)/libend2end_test_census_simple_request_legacy.a $(LIBDIR)/$(CONFIG)/libend2end_test_disappearing_server_legacy.a $(LIBDIR)/$(CONFIG)/libend2end_test_early_server_shutdown_finishes_inflight_calls_legacy.a $(LIBDIR)/$(CONFIG)/libend2end_test_early_server_shutdown_finishes_tags_legacy.a $(LIBDIR)/$(CONFIG)/libend2end_test_graceful_server_shutdown_legacy.a $(LIBDIR)/$(CONFIG)/libend2end_test_invoke_large_request_legacy.a $(LIBDIR)/$(CONFIG)/libend2end_test_max_concurrent_streams_legacy.a $(LIBDIR)/$(CONFIG)/libend2end_test_no_op_legacy.a $(LIBDIR)/$(CONFIG)/libend2end_test_ping_pong_streaming_legacy.a $(LIBDIR)/$(CONFIG)/libend2end_test_request_response_with_binary_metadata_and_payload_legacy.a $(LIBDIR)/$(CONFIG)/libend2end_test_request_response_with_metadata_and_payload_legacy.a $(LIBDIR)/$(CONFIG)/libend2end_test_request_response_with_payload_legacy.a $(LIBDIR)/$(CONFIG)/libend2end_test_request_response_with_trailing_metadata_and_payload_legacy.a $(LIBDIR)/$(CONFIG)/libend2end_test_request_with_large_metadata_legacy.a $(LIBDIR)/$(CONFIG)/libend2end_test_request_with_payload_legacy.a $(LIBDIR)/$(CONFIG)/libend2end_test_simple_delayed_request_legacy.a $(LIBDIR)/$(CONFIG)/libend2end_test_simple_request_legacy.a $(LIBDIR)/$(CONFIG)/libend2end_test_thread_stress_legacy.a $(LIBDIR)/$(CONFIG)/libend2end_test_writes_done_hangs_with_pending_read_legacy.a $(LIBDIR)/$(CONFIG)/libend2end_certs.a
@ -1763,8 +1768,6 @@ ifeq ($(CONFIG),opt)
$(Q) $(STRIP) $(LIBDIR)/$(CONFIG)/libgpr.a
$(E) "[STRIP] Stripping libgrpc.a"
$(Q) $(STRIP) $(LIBDIR)/$(CONFIG)/libgrpc.a
$(E) "[STRIP] Stripping libgrpc_csharp_ext.a"
$(Q) $(STRIP) $(LIBDIR)/$(CONFIG)/libgrpc_csharp_ext.a
$(E) "[STRIP] Stripping libgrpc_unsecure.a"
$(Q) $(STRIP) $(LIBDIR)/$(CONFIG)/libgrpc_unsecure.a
endif
@ -1781,8 +1784,6 @@ ifeq ($(CONFIG),opt)
$(Q) $(STRIP) $(LIBDIR)/$(CONFIG)/libgpr.$(SHARED_EXT)
$(E) "[STRIP] Stripping libgrpc.so"
$(Q) $(STRIP) $(LIBDIR)/$(CONFIG)/libgrpc.$(SHARED_EXT)
$(E) "[STRIP] Stripping libgrpc_csharp_ext.so"
$(Q) $(STRIP) $(LIBDIR)/$(CONFIG)/libgrpc_csharp_ext.$(SHARED_EXT)
$(E) "[STRIP] Stripping libgrpc_unsecure.so"
$(Q) $(STRIP) $(LIBDIR)/$(CONFIG)/libgrpc_unsecure.$(SHARED_EXT)
endif
@ -1793,6 +1794,12 @@ ifeq ($(CONFIG),opt)
$(Q) $(STRIP) $(LIBDIR)/$(CONFIG)/libgrpc++.$(SHARED_EXT)
endif
strip-shared_csharp: shared_csharp
ifeq ($(CONFIG),opt)
$(E) "[STRIP] Stripping libgrpc_csharp_ext.so"
$(Q) $(STRIP) $(LIBDIR)/$(CONFIG)/libgrpc_csharp_ext.$(SHARED_EXT)
endif
ifeq ($(NO_PROTOC),true)
$(GENDIR)/examples/pubsub/empty.pb.cc: protoc_dep_error
else
@ -1911,6 +1918,10 @@ install_c: install-headers_c install-static_c install-shared_c
install_cxx: install-headers_cxx install-static_cxx install-shared_cxx
install_csharp: install-shared_csharp install_c
install_grpc_csharp_ext: install_csharp
install-headers: install-headers_c install-headers_cxx
install-headers_c:
@ -1928,8 +1939,6 @@ install-static_c: static_c strip-static_c
$(Q) $(INSTALL) $(LIBDIR)/$(CONFIG)/libgpr.a $(prefix)/lib/libgpr.a
$(E) "[INSTALL] Installing libgrpc.a"
$(Q) $(INSTALL) $(LIBDIR)/$(CONFIG)/libgrpc.a $(prefix)/lib/libgrpc.a
$(E) "[INSTALL] Installing libgrpc_csharp_ext.a"
$(Q) $(INSTALL) $(LIBDIR)/$(CONFIG)/libgrpc_csharp_ext.a $(prefix)/lib/libgrpc_csharp_ext.a
$(E) "[INSTALL] Installing libgrpc_unsecure.a"
$(Q) $(INSTALL) $(LIBDIR)/$(CONFIG)/libgrpc_unsecure.a $(prefix)/lib/libgrpc_unsecure.a
@ -1960,17 +1969,6 @@ ifneq ($(SYSTEM),Darwin)
$(Q) ln -sf libgrpc.$(SHARED_EXT) $(prefix)/lib/libgrpc.so
endif
endif
ifeq ($(SYSTEM),MINGW32)
$(E) "[INSTALL] Installing grpc_csharp_ext.$(SHARED_EXT)"
$(Q) $(INSTALL) $(LIBDIR)/$(CONFIG)/grpc_csharp_ext.$(SHARED_EXT) $(prefix)/lib/grpc_csharp_ext.$(SHARED_EXT)
$(Q) $(INSTALL) $(LIBDIR)/$(CONFIG)/libgrpc_csharp_ext-imp.a $(prefix)/lib/libgrpc_csharp_ext-imp.a
else
$(E) "[INSTALL] Installing libgrpc_csharp_ext.$(SHARED_EXT)"
$(Q) $(INSTALL) $(LIBDIR)/$(CONFIG)/libgrpc_csharp_ext.$(SHARED_EXT) $(prefix)/lib/libgrpc_csharp_ext.$(SHARED_EXT)
ifneq ($(SYSTEM),Darwin)
$(Q) ln -sf libgrpc_csharp_ext.$(SHARED_EXT) $(prefix)/lib/libgrpc_csharp_ext.so
endif
endif
ifeq ($(SYSTEM),MINGW32)
$(E) "[INSTALL] Installing grpc_unsecure.$(SHARED_EXT)"
$(Q) $(INSTALL) $(LIBDIR)/$(CONFIG)/grpc_unsecure.$(SHARED_EXT) $(prefix)/lib/grpc_unsecure.$(SHARED_EXT)
@ -2006,6 +2004,24 @@ ifneq ($(SYSTEM),Darwin)
endif
endif
install-shared_csharp: shared_csharp strip-shared_csharp
ifeq ($(SYSTEM),MINGW32)
$(E) "[INSTALL] Installing grpc_csharp_ext.$(SHARED_EXT)"
$(Q) $(INSTALL) $(LIBDIR)/$(CONFIG)/grpc_csharp_ext.$(SHARED_EXT) $(prefix)/lib/grpc_csharp_ext.$(SHARED_EXT)
$(Q) $(INSTALL) $(LIBDIR)/$(CONFIG)/libgrpc_csharp_ext-imp.a $(prefix)/lib/libgrpc_csharp_ext-imp.a
else
$(E) "[INSTALL] Installing libgrpc_csharp_ext.$(SHARED_EXT)"
$(Q) $(INSTALL) $(LIBDIR)/$(CONFIG)/libgrpc_csharp_ext.$(SHARED_EXT) $(prefix)/lib/libgrpc_csharp_ext.$(SHARED_EXT)
ifneq ($(SYSTEM),Darwin)
$(Q) ln -sf libgrpc_csharp_ext.$(SHARED_EXT) $(prefix)/lib/libgrpc_csharp_ext.so
endif
endif
ifneq ($(SYSTEM),MINGW32)
ifneq ($(SYSTEM),Darwin)
$(Q) ldconfig
endif
endif
clean:
$(Q) $(RM) -rf $(OBJDIR) $(LIBDIR) $(BINDIR) $(GENDIR)
@ -3007,6 +3023,7 @@ LIBGRPC++_SRC = \
src/cpp/util/time.cc \
PUBLIC_HEADERS_CXX += \
include/grpc++/async_unary_call.h \
include/grpc++/channel_arguments.h \
include/grpc++/channel_interface.h \
include/grpc++/client_context.h \

@ -345,7 +345,7 @@
{
"name": "grpc_csharp_ext",
"build": "all",
"language": "c",
"language": "csharp",
"src": [
"src/csharp/ext/grpc_csharp_ext.c"
],
@ -398,6 +398,7 @@
"build": "all",
"language": "c++",
"public_headers": [
"include/grpc++/async_unary_call.h",
"include/grpc++/channel_arguments.h",
"include/grpc++/channel_interface.h",
"include/grpc++/client_context.h",

@ -0,0 +1,144 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef __GRPCPP_ASYNC_UNARY_CALL_H__
#define __GRPCPP_ASYNC_UNARY_CALL_H__
#include <grpc++/channel_interface.h>
#include <grpc++/client_context.h>
#include <grpc++/completion_queue.h>
#include <grpc++/server_context.h>
#include <grpc++/impl/call.h>
#include <grpc++/impl/service_type.h>
#include <grpc++/status.h>
#include <grpc/support/log.h>
namespace grpc {
template <class R>
class ClientAsyncResponseReader final {
public:
ClientAsyncResponseReader(ChannelInterface* channel, CompletionQueue* cq,
const RpcMethod& method, ClientContext* context,
const google::protobuf::Message& request, void* tag)
: context_(context),
call_(channel->CreateCall(method, context, cq)) {
init_buf_.Reset(tag);
init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_);
init_buf_.AddSendMessage(request);
init_buf_.AddClientSendClose();
call_.PerformOps(&init_buf_);
}
void ReadInitialMetadata(void* tag) {
GPR_ASSERT(!context_->initial_metadata_received_);
meta_buf_.Reset(tag);
meta_buf_.AddRecvInitialMetadata(context_);
call_.PerformOps(&meta_buf_);
}
void Finish(R* msg, Status* status, void* tag) {
finish_buf_.Reset(tag);
if (!context_->initial_metadata_received_) {
finish_buf_.AddRecvInitialMetadata(context_);
}
finish_buf_.AddRecvMessage(msg);
finish_buf_.AddClientRecvStatus(context_, status);
call_.PerformOps(&finish_buf_);
}
private:
ClientContext* context_ = nullptr;
Call call_;
CallOpBuffer init_buf_;
CallOpBuffer meta_buf_;
CallOpBuffer finish_buf_;
};
template <class W>
class ServerAsyncResponseWriter final : public ServerAsyncStreamingInterface {
public:
explicit ServerAsyncResponseWriter(ServerContext* ctx)
: call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
void SendInitialMetadata(void* tag) {
GPR_ASSERT(!ctx_->sent_initial_metadata_);
meta_buf_.Reset(tag);
meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
call_.PerformOps(&meta_buf_);
}
void Finish(const W& msg, const Status& status, void* tag) {
finish_buf_.Reset(tag);
if (!ctx_->sent_initial_metadata_) {
finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
// The response is dropped if the status is not OK.
if (status.IsOk()) {
finish_buf_.AddSendMessage(msg);
}
bool cancelled = false;
finish_buf_.AddServerRecvClose(&cancelled);
finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
call_.PerformOps(&finish_buf_);
}
void FinishWithError(const Status& status, void* tag) {
GPR_ASSERT(!status.IsOk());
finish_buf_.Reset(tag);
if (!ctx_->sent_initial_metadata_) {
finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
bool cancelled = false;
finish_buf_.AddServerRecvClose(&cancelled);
finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
call_.PerformOps(&finish_buf_);
}
private:
void BindCall(Call* call) override { call_ = *call; }
Call call_;
ServerContext* ctx_;
CallOpBuffer meta_buf_;
CallOpBuffer finish_buf_;
};
} // namespace grpc
#endif // __GRPCPP_ASYNC_UNARY_CALL_H__

@ -72,6 +72,8 @@ template <class W>
class ClientAsyncWriter;
template <class R, class W>
class ClientAsyncReaderWriter;
template <class R>
class ClientAsyncResponseReader;
class ClientContext {
public:
@ -119,6 +121,8 @@ class ClientContext {
friend class ::grpc::ClientAsyncWriter;
template <class R, class W>
friend class ::grpc::ClientAsyncReaderWriter;
template <class R>
friend class ::grpc::ClientAsyncResponseReader;
grpc_call *call() { return call_; }
void set_call(grpc_call *call) {

@ -48,13 +48,6 @@ class CompletionQueue;
class RpcMethod;
class Status;
// Wrapper that begins an asynchronous unary call
void AsyncUnaryCall(ChannelInterface *channel, const RpcMethod &method,
ClientContext *context,
const google::protobuf::Message &request,
google::protobuf::Message *result, Status *status,
CompletionQueue *cq, void *tag);
// Wrapper that performs a blocking unary call
Status BlockingUnaryCall(ChannelInterface *channel, const RpcMethod &method,
ClientContext *context,

@ -550,56 +550,6 @@ class ClientAsyncReaderWriter final : public ClientAsyncStreamingInterface,
CallOpBuffer finish_buf_;
};
// TODO(yangg) Move out of stream.h
template <class W>
class ServerAsyncResponseWriter final : public ServerAsyncStreamingInterface {
public:
explicit ServerAsyncResponseWriter(ServerContext* ctx)
: call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
void SendInitialMetadata(void* tag) {
GPR_ASSERT(!ctx_->sent_initial_metadata_);
meta_buf_.Reset(tag);
meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
call_.PerformOps(&meta_buf_);
}
void Finish(const W& msg, const Status& status, void* tag) {
finish_buf_.Reset(tag);
if (!ctx_->sent_initial_metadata_) {
finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
// The response is dropped if the status is not OK.
if (status.IsOk()) {
finish_buf_.AddSendMessage(msg);
}
finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
call_.PerformOps(&finish_buf_);
}
void FinishWithError(const Status& status, void* tag) {
GPR_ASSERT(!status.IsOk());
finish_buf_.Reset(tag);
if (!ctx_->sent_initial_metadata_) {
finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
call_.PerformOps(&finish_buf_);
}
private:
void BindCall(Call* call) override { call_ = *call; }
Call call_;
ServerContext* ctx_;
CallOpBuffer meta_buf_;
CallOpBuffer finish_buf_;
};
template <class W, class R>
class ServerAsyncReader : public ServerAsyncStreamingInterface,
public AsyncReaderInterface<R> {

@ -354,10 +354,18 @@ typedef struct grpc_op {
} data;
} grpc_op;
/* Initialize the grpc library */
/* Initialize the grpc library.
It is not safe to call any other grpc functions before calling this.
(To avoid overhead, little checking is done, and some things may work. We
do not warrant that they will continue to do so in future revisions of this
library). */
void grpc_init(void);
/* Shut down the grpc library */
/* Shut down the grpc library.
No memory is used by grpc after this call returns, nor are any instructions
executing within the grpc library.
Prior to calling, all application owned grpc objects must have been
destroyed. */
void grpc_shutdown(void);
grpc_completion_queue *grpc_completion_queue_create(void);
@ -386,7 +394,12 @@ grpc_event *grpc_completion_queue_pluck(grpc_completion_queue *cq, void *tag,
void grpc_event_finish(grpc_event *event);
/* Begin destruction of a completion queue. Once all possible events are
drained it's safe to call grpc_completion_queue_destroy. */
drained then grpc_completion_queue_next will start to produce
GRPC_QUEUE_SHUTDOWN events only. At that point it's safe to call
grpc_completion_queue_destroy.
After calling this function applications should ensure that no
NEW work is added to be published on this completion queue. */
void grpc_completion_queue_shutdown(grpc_completion_queue *cq);
/* Destroy a completion queue. The caller must ensure that the queue is

@ -126,6 +126,8 @@ std::string GetHeaderIncludes(const google::protobuf::FileDescriptor *file) {
"class RpcService;\n"
"class ServerContext;\n";
if (HasUnaryCalls(file)) {
temp.append(
"template <class OutMessage> class ClientAsyncResponseReader;\n");
temp.append(
"template <class OutMessage> class ServerAsyncResponseWriter;\n");
}
@ -160,7 +162,8 @@ std::string GetHeaderIncludes(const google::protobuf::FileDescriptor *file) {
}
std::string GetSourceIncludes() {
return "#include <grpc++/channel_interface.h>\n"
return "#include <grpc++/async_unary_call.h>\n"
"#include <grpc++/channel_interface.h>\n"
"#include <grpc++/impl/client_unary_call.h>\n"
"#include <grpc++/impl/rpc_method.h>\n"
"#include <grpc++/impl/rpc_service_method.h>\n"
@ -181,9 +184,9 @@ void PrintHeaderClientMethod(google::protobuf::io::Printer *printer,
"::grpc::Status $Method$(::grpc::ClientContext* context, "
"const $Request$& request, $Response$* response);\n");
printer->Print(*vars,
"void $Method$(::grpc::ClientContext* context, "
"const $Request$& request, $Response$* response, "
"::grpc::Status* status, "
"::grpc::ClientAsyncResponseReader< $Response$>* "
"$Method$(::grpc::ClientContext* context, "
"const $Request$& request, "
"::grpc::CompletionQueue* cq, void* tag);\n");
} else if (ClientOnlyStreaming(method)) {
printer->Print(*vars,
@ -378,14 +381,15 @@ void PrintSourceClientMethod(google::protobuf::io::Printer *printer,
"context, request, response);\n"
"}\n\n");
printer->Print(*vars,
"void $Service$::Stub::$Method$("
"::grpc::ClientContext* context, "
"const $Request$& request, $Response$* response, ::grpc::Status* status, "
"::grpc::ClientAsyncResponseReader< $Response$>* "
"$Service$::Stub::$Method$(::grpc::ClientContext* context, "
"const $Request$& request, "
"::grpc::CompletionQueue* cq, void* tag) {\n");
printer->Print(*vars,
" ::grpc::AsyncUnaryCall(channel(),"
" return new ClientAsyncResponseReader< $Response$>("
"channel(), cq, "
"::grpc::RpcMethod($Service$_method_names[$Idx$]), "
"context, request, response, status, cq, tag);\n"
"context, request, tag);\n"
"}\n\n");
} else if (ClientOnlyStreaming(method)) {
printer->Print(

@ -93,7 +93,7 @@ static int multipoll_with_epoll_pollset_maybe_work(
/* If you want to ignore epoll's ability to sanely handle parallel pollers,
* for a more apples-to-apples performance comparison with poll, add a
* if (pollset->counter == 0) { return 0 }
* if (pollset->counter != 0) { return 0; }
* here.
*/

@ -313,7 +313,7 @@ static void oauth2_token_fetcher_destroy(grpc_credentials *creds) {
grpc_mdelem_unref(c->access_token_md);
}
gpr_mu_destroy(&c->mu);
grpc_mdctx_orphan(c->md_ctx);
grpc_mdctx_unref(c->md_ctx);
gpr_free(c);
}
@ -587,7 +587,7 @@ static void fake_oauth2_destroy(grpc_credentials *creds) {
if (c->access_token_md != NULL) {
grpc_mdelem_unref(c->access_token_md);
}
grpc_mdctx_orphan(c->md_ctx);
grpc_mdctx_unref(c->md_ctx);
gpr_free(c);
}
@ -897,7 +897,7 @@ static void iam_destroy(grpc_credentials *creds) {
grpc_iam_credentials *c = (grpc_iam_credentials *)creds;
grpc_mdelem_unref(c->token_md);
grpc_mdelem_unref(c->authority_selector_md);
grpc_mdctx_orphan(c->md_ctx);
grpc_mdctx_unref(c->md_ctx);
gpr_free(c);
}

@ -313,7 +313,6 @@ static void set_status_code(grpc_call *call, status_source source,
}
if (flush && !grpc_bbq_empty(&call->incoming_queue)) {
gpr_log(GPR_ERROR, "Flushing unread messages due to error status %d", status);
grpc_bbq_flush(&call->incoming_queue);
}
}

@ -146,7 +146,7 @@ static void destroy_channel(void *p, int ok) {
grpc_mdstr_unref(channel->grpc_message_string);
grpc_mdstr_unref(channel->path_string);
grpc_mdstr_unref(channel->authority_string);
grpc_mdctx_orphan(channel->metadata_context);
grpc_mdctx_unref(channel->metadata_context);
gpr_free(channel);
}

@ -336,11 +336,9 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
* CONSTRUCTION/DESTRUCTION/REFCOUNTING
*/
static void unref_transport(transport *t) {
static void destruct_transport(transport *t) {
size_t i;
if (!gpr_unref(&t->refs)) return;
gpr_mu_lock(&t->mu);
GPR_ASSERT(t->ep == NULL);
@ -380,9 +378,16 @@ static void unref_transport(transport *t) {
grpc_sopb_destroy(&t->nuke_later_sopb);
grpc_mdctx_unref(t->metadata_context);
gpr_free(t);
}
static void unref_transport(transport *t) {
if (!gpr_unref(&t->refs)) return;
destruct_transport(t);
}
static void ref_transport(transport *t) { gpr_ref(&t->refs); }
static void init_transport(transport *t, grpc_transport_setup_callback setup,
@ -401,6 +406,7 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup,
gpr_ref_init(&t->refs, 2);
gpr_mu_init(&t->mu);
gpr_cv_init(&t->cv);
grpc_mdctx_ref(mdctx);
t->metadata_context = mdctx;
t->str_grpc_timeout =
grpc_mdstr_from_string(t->metadata_context, "grpc-timeout");
@ -1025,8 +1031,6 @@ static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id,
int had_outgoing;
char buffer[GPR_LTOA_MIN_BUFSIZE];
gpr_log(GPR_DEBUG, "cancel %d", id);
if (s) {
/* clear out any unreported input & output: nobody cares anymore */
had_outgoing = s->outgoing_sopb.nops != 0;

@ -79,7 +79,7 @@ typedef struct internal_metadata {
struct grpc_mdctx {
gpr_uint32 hash_seed;
int orphaned;
int refs;
gpr_mu mu;
@ -114,7 +114,7 @@ static void unlock(grpc_mdctx *ctx) {
mdelems on every unlock (instead of the usual 'I'm too loaded' trigger
case), since otherwise we can be stuck waiting for a garbage collection
that will never happen. */
if (ctx->orphaned) {
if (ctx->refs == 0) {
/* uncomment if you're having trouble diagnosing an mdelem leak to make
things clearer (slows down destruction a lot, however) */
/* gc_mdtab(ctx); */
@ -139,7 +139,7 @@ static void ref_md(internal_metadata *md) {
grpc_mdctx *grpc_mdctx_create_with_seed(gpr_uint32 seed) {
grpc_mdctx *ctx = gpr_malloc(sizeof(grpc_mdctx));
ctx->orphaned = 0;
ctx->refs = 1;
ctx->hash_seed = seed;
gpr_mu_init(&ctx->mu);
ctx->strtab = gpr_malloc(sizeof(internal_string *) * INITIAL_STRTAB_CAPACITY);
@ -197,10 +197,17 @@ static void metadata_context_destroy(grpc_mdctx *ctx) {
gpr_free(ctx);
}
void grpc_mdctx_orphan(grpc_mdctx *ctx) {
void grpc_mdctx_ref(grpc_mdctx *ctx) {
lock(ctx);
GPR_ASSERT(!ctx->orphaned);
ctx->orphaned = 1;
GPR_ASSERT(ctx->refs > 0);
ctx->refs++;
unlock(ctx);
}
void grpc_mdctx_unref(grpc_mdctx *ctx) {
lock(ctx);
GPR_ASSERT(ctx->refs > 0);
ctx->refs--;
unlock(ctx);
}

@ -84,7 +84,8 @@ struct grpc_mdelem {
/* Create/orphan a metadata context */
grpc_mdctx *grpc_mdctx_create(void);
grpc_mdctx *grpc_mdctx_create_with_seed(gpr_uint32 seed);
void grpc_mdctx_orphan(grpc_mdctx *mdctx);
void grpc_mdctx_ref(grpc_mdctx *mdctx);
void grpc_mdctx_unref(grpc_mdctx *mdctx);
/* Test only accessors to internal state - only for testing this code - do not
rely on it outside of metadata_test.c */

@ -61,30 +61,4 @@ Status BlockingUnaryCall(ChannelInterface *channel, const RpcMethod &method,
return status;
}
class ClientAsyncRequest final : public CallOpBuffer {
public:
bool FinalizeResult(void **tag, bool *status) override {
bool r = CallOpBuffer::FinalizeResult(tag, status);
delete this;
return r;
}
};
void AsyncUnaryCall(ChannelInterface *channel, const RpcMethod &method,
ClientContext *context,
const google::protobuf::Message &request,
google::protobuf::Message *result, Status *status,
CompletionQueue *cq, void *tag) {
ClientAsyncRequest *buf = new ClientAsyncRequest;
buf->Reset(tag);
Call call(channel->CreateCall(method, context, cq));
buf->AddSendInitialMetadata(context);
buf->AddSendMessage(request);
buf->AddRecvInitialMetadata(context);
buf->AddRecvMessage(result);
buf->AddClientSendClose();
buf->AddClientRecvStatus(context, status);
call.PerformOps(buf);
}
} // namespace grpc

@ -81,7 +81,7 @@ namespace math
Task<DivReply> DivAsync(DivArgs request, CancellationToken token = default(CancellationToken));
Task Fib(FibArgs request, IObserver<Num> responseObserver, CancellationToken token = default(CancellationToken));
void Fib(FibArgs request, IObserver<Num> responseObserver, CancellationToken token = default(CancellationToken));
ClientStreamingAsyncResult<Num, Num> Sum(CancellationToken token = default(CancellationToken));
@ -109,10 +109,10 @@ namespace math
return Calls.AsyncUnaryCall(call, request, token);
}
public Task Fib(FibArgs request, IObserver<Num> responseObserver, CancellationToken token = default(CancellationToken))
public void Fib(FibArgs request, IObserver<Num> responseObserver, CancellationToken token = default(CancellationToken))
{
var call = new Google.GRPC.Core.Call<FibArgs, Num>(fibMethod, channel);
return Calls.AsyncServerStreamingCall(call, request, responseObserver, token);
Calls.AsyncServerStreamingCall(call, request, responseObserver, token);
}
public ClientStreamingAsyncResult<Num, Num> Sum(CancellationToken token = default(CancellationToken))

@ -99,7 +99,7 @@ namespace grpc.testing
Task<SimpleResponse> UnaryCallAsync(SimpleRequest request, CancellationToken token = default(CancellationToken));
Task StreamingOutputCall(StreamingOutputCallRequest request, IObserver<StreamingOutputCallResponse> responseObserver, CancellationToken token = default(CancellationToken));
void StreamingOutputCall(StreamingOutputCallRequest request, IObserver<StreamingOutputCallResponse> responseObserver, CancellationToken token = default(CancellationToken));
ClientStreamingAsyncResult<StreamingInputCallRequest, StreamingInputCallResponse> StreamingInputCall(CancellationToken token = default(CancellationToken));
@ -141,9 +141,9 @@ namespace grpc.testing
return Calls.AsyncUnaryCall(call, request, token);
}
public Task StreamingOutputCall(StreamingOutputCallRequest request, IObserver<StreamingOutputCallResponse> responseObserver, CancellationToken token = default(CancellationToken)) {
public void StreamingOutputCall(StreamingOutputCallRequest request, IObserver<StreamingOutputCallResponse> responseObserver, CancellationToken token = default(CancellationToken)) {
var call = new Google.GRPC.Core.Call<StreamingOutputCallRequest, StreamingOutputCallResponse>(streamingOutputCallMethod, channel);
return Calls.AsyncServerStreamingCall(call, request, responseObserver, token);
Calls.AsyncServerStreamingCall(call, request, responseObserver, token);
}
public ClientStreamingAsyncResult<StreamingInputCallRequest, StreamingInputCallResponse> StreamingInputCall(CancellationToken token = default(CancellationToken))

@ -64,6 +64,15 @@ namespace math.Tests
client = MathGrpc.NewStub(channel);
}
[TestFixtureTearDown]
public void Cleanup()
{
channel.Dispose();
server.ShutdownAsync().Wait();
GrpcEnvironment.Shutdown();
}
[Test]
public void Div1()
{
@ -136,15 +145,6 @@ namespace math.Tests
CollectionAssert.AreEqual(new long[] {3, 4, 3}, result.ConvertAll((divReply) => divReply.Quotient));
CollectionAssert.AreEqual(new long[] {1, 16, 1}, result.ConvertAll((divReply) => divReply.Remainder));
}
[TestFixtureTearDown]
public void Cleanup()
{
channel.Dispose();
server.ShutdownAsync().Wait();
GrpcEnvironment.Shutdown();
}
}
}

@ -47,50 +47,42 @@ namespace Google.GRPC.Core
{
public static TResponse BlockingUnaryCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req, CancellationToken token)
{
//TODO: implement this in real synchronous style once new GRPC C core API is available.
return AsyncUnaryCall(call, req, token).Result;
//TODO: implement this in real synchronous style.
try {
return AsyncUnaryCall(call, req, token).Result;
} catch(AggregateException ae) {
foreach (var e in ae.InnerExceptions)
{
if (e is RpcException)
{
throw e;
}
}
throw;
}
}
public static async Task<TResponse> AsyncUnaryCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req, CancellationToken token)
{
var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestSerializer, call.ResponseDeserializer);
asyncCall.Initialize(call.Channel, call.MethodName);
asyncCall.Start(false, GetCompletionQueue());
await asyncCall.WriteAsync(req);
await asyncCall.WritesCompletedAsync();
TResponse response = await asyncCall.ReadAsync();
Status status = await asyncCall.Finished;
if (status.StatusCode != StatusCode.GRPC_STATUS_OK)
{
throw new RpcException(status);
}
return response;
asyncCall.Initialize(call.Channel, GetCompletionQueue(), call.MethodName);
return await asyncCall.UnaryCallAsync(req);
}
public static async Task AsyncServerStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req, IObserver<TResponse> outputs, CancellationToken token)
public static void AsyncServerStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req, IObserver<TResponse> outputs, CancellationToken token)
{
var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestSerializer, call.ResponseDeserializer);
asyncCall.Initialize(call.Channel, call.MethodName);
asyncCall.Start(false, GetCompletionQueue());
asyncCall.StartReadingToStream(outputs);
await asyncCall.WriteAsync(req);
await asyncCall.WritesCompletedAsync();
asyncCall.Initialize(call.Channel, GetCompletionQueue(), call.MethodName);
asyncCall.StartServerStreamingCall(req, outputs);
}
public static ClientStreamingAsyncResult<TRequest, TResponse> AsyncClientStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call, CancellationToken token)
{
var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestSerializer, call.ResponseDeserializer);
asyncCall.Initialize(call.Channel, call.MethodName);
asyncCall.Start(false, GetCompletionQueue());
var task = asyncCall.ReadAsync();
var inputs = new StreamingInputObserver<TRequest, TResponse>(asyncCall);
asyncCall.Initialize(call.Channel, GetCompletionQueue(), call.MethodName);
var task = asyncCall.ClientStreamingCallAsync();
var inputs = new ClientStreamingInputObserver<TRequest, TResponse>(asyncCall);
return new ClientStreamingAsyncResult<TRequest, TResponse>(task, inputs);
}
@ -102,12 +94,10 @@ namespace Google.GRPC.Core
public static IObserver<TRequest> DuplexStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call, IObserver<TResponse> outputs, CancellationToken token)
{
var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestSerializer, call.ResponseDeserializer);
asyncCall.Initialize(call.Channel, call.MethodName);
asyncCall.Start(false, GetCompletionQueue());
asyncCall.Initialize(call.Channel, GetCompletionQueue(), call.MethodName);
asyncCall.StartReadingToStream(outputs);
var inputs = new StreamingInputObserver<TRequest, TResponse>(asyncCall);
return inputs;
asyncCall.StartDuplexStreamingCall(outputs);
return new ClientStreamingInputObserver<TRequest, TResponse>(asyncCall);
}
private static CompletionQueueSafeHandle GetCompletionQueue() {

@ -47,21 +47,21 @@
<Compile Include="Internal\ChannelSafeHandle.cs" />
<Compile Include="Internal\CompletionQueueSafeHandle.cs" />
<Compile Include="Internal\Enums.cs" />
<Compile Include="Internal\Event.cs" />
<Compile Include="Internal\SafeHandleZeroIsInvalid.cs" />
<Compile Include="Internal\Timespec.cs" />
<Compile Include="Internal\GrpcThreadPool.cs" />
<Compile Include="Internal\AsyncCall.cs" />
<Compile Include="Internal\ServerSafeHandle.cs" />
<Compile Include="Internal\StreamingInputObserver.cs" />
<Compile Include="Method.cs" />
<Compile Include="ServerCalls.cs" />
<Compile Include="ServerCallHandler.cs" />
<Compile Include="Internal\ServerWritingObserver.cs" />
<Compile Include="Marshaller.cs" />
<Compile Include="ServerServiceDefinition.cs" />
<Compile Include="Utils\RecordingObserver.cs" />
<Compile Include="Utils\RecordingQueue.cs" />
<Compile Include="Internal\ClientStreamingInputObserver.cs" />
<Compile Include="Internal\ServerStreamingOutputObserver.cs" />
<Compile Include="Internal\BatchContextSafeHandleNotOwned.cs" />
</ItemGroup>
<Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
<ItemGroup>

@ -42,7 +42,7 @@ namespace Google.GRPC.Core
/// </summary>
public class GrpcEnvironment
{
const int THREAD_POOL_SIZE = 1;
const int THREAD_POOL_SIZE = 4;
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_init();

@ -2,11 +2,11 @@
// Copyright 2015, Google Inc.
// All rights reserved.
//
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
@ -16,7 +16,7 @@
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
@ -42,171 +42,177 @@ using Google.GRPC.Core.Internal;
namespace Google.GRPC.Core.Internal
{
/// <summary>
/// Listener for call events that can be delivered from a completion queue.
/// Handles native call lifecycle and provides convenience methods.
/// </summary>
internal interface ICallEventListener {
void OnClientMetadata();
void OnRead(byte[] payload);
void OnWriteAccepted(GRPCOpError error);
void OnFinishAccepted(GRPCOpError error);
// ignore the status on server
void OnFinished(Status status);
}
/// <summary>
/// Handle native call lifecycle and provides convenience methods.
/// </summary>
internal class AsyncCall<TWrite, TRead>: ICallEventListener, IDisposable
internal class AsyncCall<TWrite, TRead>
{
readonly Func<TWrite, byte[]> serializer;
readonly Func<byte[], TRead> deserializer;
// TODO: make sure the delegate doesn't get garbage collected while
// native callbacks are in the completion queue.
readonly EventCallbackDelegate callbackHandler;
readonly CompletionCallbackDelegate unaryResponseHandler;
readonly CompletionCallbackDelegate finishedHandler;
readonly CompletionCallbackDelegate writeFinishedHandler;
readonly CompletionCallbackDelegate readFinishedHandler;
readonly CompletionCallbackDelegate halfclosedHandler;
readonly CompletionCallbackDelegate finishedServersideHandler;
object myLock = new object();
bool disposed;
GCHandle gchandle;
CallSafeHandle call;
bool disposed;
bool server;
bool started;
bool errorOccured;
bool cancelRequested;
bool readingDone;
bool halfcloseRequested;
bool halfclosed;
bool doneWithReading;
Nullable<Status> finishedStatus;
bool finished;
// Completion of a pending write if not null.
TaskCompletionSource<object> writeTcs;
// Completion of a pending read if not null.
TaskCompletionSource<TRead> readTcs;
TaskCompletionSource<object> halfcloseTcs = new TaskCompletionSource<object>();
TaskCompletionSource<Status> finishedTcs = new TaskCompletionSource<Status>();
// Completion of a pending halfclose if not null.
TaskCompletionSource<object> halfcloseTcs;
// Completion of a pending unary response if not null.
TaskCompletionSource<TRead> unaryResponseTcs;
// Set after status is received on client. Only used for server streaming and duplex streaming calls.
Nullable<Status> finishedStatus;
TaskCompletionSource<object> finishedServersideTcs = new TaskCompletionSource<object>();
// For streaming, the reads will be delivered to this observer.
IObserver<TRead> readObserver;
public AsyncCall(Func<TWrite, byte[]> serializer, Func<byte[], TRead> deserializer)
{
this.serializer = serializer;
this.deserializer = deserializer;
this.callbackHandler = HandleEvent;
this.unaryResponseHandler = HandleUnaryResponse;
this.finishedHandler = HandleFinished;
this.writeFinishedHandler = HandleWriteFinished;
this.readFinishedHandler = HandleReadFinished;
this.halfclosedHandler = HandleHalfclosed;
this.finishedServersideHandler = HandleFinishedServerside;
}
public Task WriteAsync(TWrite msg)
public void Initialize(Channel channel, CompletionQueueSafeHandle cq, String methodName)
{
return StartWrite(msg, false).Task;
InitializeInternal(CallSafeHandle.Create(channel.Handle, cq, methodName, channel.Target, Timespec.InfFuture), false);
}
public Task WritesCompletedAsync()
public void InitializeServer(CallSafeHandle call)
{
WritesDone();
return halfcloseTcs.Task;
InitializeInternal(call, true);
}
public Task WriteStatusAsync(Status status)
public Task<TRead> UnaryCallAsync(TWrite msg)
{
WriteStatus(status);
return halfcloseTcs.Task;
}
lock (myLock)
{
started = true;
halfcloseRequested = true;
readingDone = true;
public Task<TRead> ReadAsync()
{
return StartRead().Task;
}
// TODO: handle serialization error...
byte[] payload = serializer(msg);
public Task Halfclosed
{
get
{
return halfcloseTcs.Task;
unaryResponseTcs = new TaskCompletionSource<TRead>();
call.StartUnary(payload, unaryResponseHandler);
return unaryResponseTcs.Task;
}
}
public Task<Status> Finished
public Task<TRead> ClientStreamingCallAsync()
{
get
lock (myLock)
{
return finishedTcs.Task;
started = true;
readingDone = true;
unaryResponseTcs = new TaskCompletionSource<TRead>();
call.StartClientStreaming(unaryResponseHandler);
return unaryResponseTcs.Task;
}
}
/// <summary>
/// Initiates reading to given observer.
/// </summary>
public void StartReadingToStream(IObserver<TRead> readObserver) {
public void StartServerStreamingCall(TWrite msg, IObserver<TRead> readObserver)
{
lock (myLock)
{
CheckStarted();
if (this.readObserver != null)
{
throw new InvalidOperationException("Already registered an observer.");
}
started = true;
halfcloseRequested = true;
this.readObserver = readObserver;
StartRead();
}
}
public void Initialize(Channel channel, String methodName) {
lock (myLock)
{
this.call = CallSafeHandle.Create(channel.Handle, methodName, channel.Target, Timespec.InfFuture);
// TODO: handle serialization error...
byte[] payload = serializer(msg);
call.StartServerStreaming(payload, finishedHandler);
ReceiveMessageAsync();
}
}
public void InitializeServer(CallSafeHandle call)
public void StartDuplexStreamingCall(IObserver<TRead> readObserver)
{
lock(myLock)
lock (myLock)
{
this.call = call;
started = true;
this.readObserver = readObserver;
call.StartDuplexStreaming(finishedHandler);
ReceiveMessageAsync();
}
}
// Client only
public void Start(bool buffered, CompletionQueueSafeHandle cq)
public Task ServerSideUnaryRequestCallAsync()
{
lock (myLock)
{
if (started)
{
throw new InvalidOperationException("Already started.");
}
call.Invoke(cq, buffered, callbackHandler, callbackHandler);
started = true;
call.StartServerSide(finishedServersideHandler);
return finishedServersideTcs.Task;
}
}
// Server only
public void Accept(CompletionQueueSafeHandle cq)
public Task ServerSideStreamingRequestCallAsync(IObserver<TRead> readObserver)
{
lock (myLock)
{
if (started)
started = true;
call.StartServerSide(finishedServersideHandler);
if (this.readObserver != null)
{
throw new InvalidOperationException("Already started.");
throw new InvalidOperationException("Already registered an observer.");
}
this.readObserver = readObserver;
ReceiveMessageAsync();
call.ServerAccept(cq, callbackHandler);
call.ServerEndInitialMetadata(0);
started = true;
return finishedServersideTcs.Task;
}
}
public TaskCompletionSource<object> StartWrite(TWrite msg, bool buffered)
public Task SendMessageAsync(TWrite msg)
{
lock (myLock)
{
CheckNotDisposed();
CheckStarted();
CheckNotFinished();
CheckNoError();
CheckCancelNotRequested();
if (halfcloseRequested || halfclosed)
if (halfcloseRequested)
{
throw new InvalidOperationException("Already halfclosed.");
}
@ -219,63 +225,62 @@ namespace Google.GRPC.Core.Internal
// TODO: wrap serialization...
byte[] payload = serializer(msg);
call.StartWrite(payload, buffered, callbackHandler);
call.StartSendMessage(payload, writeFinishedHandler);
writeTcs = new TaskCompletionSource<object>();
return writeTcs;
return writeTcs.Task;
}
}
// client only
public void WritesDone()
public Task SendCloseFromClientAsync()
{
lock (myLock)
{
CheckNotDisposed();
CheckStarted();
CheckNotFinished();
CheckNoError();
CheckCancelNotRequested();
if (halfcloseRequested || halfclosed)
if (halfcloseRequested)
{
throw new InvalidOperationException("Already halfclosed.");
}
call.WritesDone(callbackHandler);
call.StartSendCloseFromClient(halfclosedHandler);
halfcloseRequested = true;
halfcloseTcs = new TaskCompletionSource<object>();
return halfcloseTcs.Task;
}
}
// server only
public void WriteStatus(Status status)
public Task SendStatusFromServerAsync(Status status)
{
lock (myLock)
{
CheckNotDisposed();
CheckStarted();
CheckNotFinished();
CheckNoError();
CheckCancelNotRequested();
if (halfcloseRequested || halfclosed)
if (halfcloseRequested)
{
throw new InvalidOperationException("Already halfclosed.");
}
call.StartWriteStatus(status, callbackHandler);
call.StartSendStatusFromServer(status, halfclosedHandler);
halfcloseRequested = true;
halfcloseTcs = new TaskCompletionSource<object>();
return halfcloseTcs.Task;
}
}
public TaskCompletionSource<TRead> StartRead()
public Task<TRead> ReceiveMessageAsync()
{
lock (myLock)
{
CheckNotDisposed();
CheckStarted();
CheckNotFinished();
CheckNoError();
// TODO: add check for not cancelled?
if (doneWithReading)
if (readingDone)
{
throw new InvalidOperationException("Already read the last message.");
}
@ -285,10 +290,10 @@ namespace Google.GRPC.Core.Internal
throw new InvalidOperationException("Only one read can be pending at a time");
}
call.StartRead(callbackHandler);
call.StartReceiveMessage(readFinishedHandler);
readTcs = new TaskCompletionSource<TRead>();
return readTcs;
return readTcs.Task;
}
}
@ -296,9 +301,8 @@ namespace Google.GRPC.Core.Internal
{
lock (myLock)
{
CheckNotDisposed();
CheckStarted();
CheckNotFinished();
cancelRequested = true;
}
// grpc_call_cancel is threadsafe
@ -309,218 +313,304 @@ namespace Google.GRPC.Core.Internal
{
lock (myLock)
{
CheckNotDisposed();
CheckStarted();
CheckNotFinished();
cancelRequested = true;
}
// grpc_call_cancel_with_status is threadsafe
call.CancelWithStatus(status);
}
public void OnClientMetadata()
private void InitializeInternal(CallSafeHandle call, bool server)
{
// TODO: implement....
lock (myLock)
{
// Make sure this object and the delegated held by it will not be garbage collected
// before we release this handle.
gchandle = GCHandle.Alloc(this);
this.call = call;
this.server = server;
}
}
public void OnRead(byte[] payload)
private void CheckStarted()
{
TaskCompletionSource<TRead> oldTcs = null;
IObserver<TRead> observer = null;
lock (myLock)
if (!started)
{
oldTcs = readTcs;
readTcs = null;
if (payload == null)
{
doneWithReading = true;
}
observer = readObserver;
throw new InvalidOperationException("Call not started");
}
}
// TODO: wrap deserialization...
TRead msg = payload != null ? deserializer(payload) : default(TRead);
oldTcs.SetResult(msg);
// TODO: make sure we deliver reads in the right order.
private void CheckNotDisposed()
{
if (disposed)
{
throw new InvalidOperationException("Call has already been disposed.");
}
}
if (observer != null)
private void CheckNoError()
{
if (errorOccured)
{
if (payload != null)
{
// TODO: wrap to handle exceptions
observer.OnNext(msg);
throw new InvalidOperationException("Error occured when processing call.");
}
}
// start a new read
StartRead();
}
else
private bool ReleaseResourcesIfPossible()
{
if (!disposed && call != null)
{
if (halfclosed && readingDone && finished)
{
// TODO: wrap to handle exceptions;
observer.OnCompleted();
ReleaseResources();
return true;
}
}
return false;
}
private void ReleaseResources()
{
if (call != null) {
call.Dispose();
}
gchandle.Free();
disposed = true;
}
public void OnWriteAccepted(GRPCOpError error)
private void CompleteStreamObserver(Status status)
{
TaskCompletionSource<object> oldTcs = null;
lock (myLock)
if (status.StatusCode != StatusCode.GRPC_STATUS_OK)
{
UpdateErrorOccured(error);
oldTcs = writeTcs;
writeTcs = null;
// TODO: wrap to handle exceptions;
readObserver.OnError(new RpcException(status));
} else {
// TODO: wrap to handle exceptions;
readObserver.OnCompleted();
}
}
if (errorOccured)
/// <summary>
/// Handler for unary response completion.
/// </summary>
private void HandleUnaryResponse(GRPCOpError error, IntPtr batchContextPtr)
{
try
{
// TODO: use the right type of exception...
oldTcs.SetException(new Exception("Write failed"));
}
else
TaskCompletionSource<TRead> tcs;
lock(myLock)
{
finished = true;
halfclosed = true;
tcs = unaryResponseTcs;
ReleaseResourcesIfPossible();
}
var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr);
if (error != GRPCOpError.GRPC_OP_OK)
{
tcs.SetException(new RpcException(
new Status(StatusCode.GRPC_STATUS_INTERNAL, "Internal error occured.")
));
return;
}
var status = ctx.GetReceivedStatus();
if (status.StatusCode != StatusCode.GRPC_STATUS_OK)
{
tcs.SetException(new RpcException(status));
return;
}
// TODO: handle deserialize error...
var msg = deserializer(ctx.GetReceivedMessage());
tcs.SetResult(msg);
}
catch(Exception e)
{
// TODO: where does the continuation run?
oldTcs.SetResult(null);
Console.WriteLine("Caught exception in a native handler: " + e);
}
}
public void OnFinishAccepted(GRPCOpError error)
private void HandleWriteFinished(GRPCOpError error, IntPtr batchContextPtr)
{
lock (myLock)
try
{
UpdateErrorOccured(error);
halfclosed = true;
}
TaskCompletionSource<object> oldTcs = null;
lock (myLock)
{
oldTcs = writeTcs;
writeTcs = null;
}
if (errorOccured)
{
halfcloseTcs.SetException(new Exception("Halfclose failed"));
if (errorOccured)
{
// TODO: use the right type of exception...
oldTcs.SetException(new Exception("Write failed"));
}
else
{
// TODO: where does the continuation run?
oldTcs.SetResult(null);
}
}
else
catch(Exception e)
{
halfcloseTcs.SetResult(null);
Console.WriteLine("Caught exception in a native handler: " + e);
}
}
public void OnFinished(Status status)
private void HandleHalfclosed(GRPCOpError error, IntPtr batchContextPtr)
{
lock (myLock)
try
{
finishedStatus = status;
lock (myLock)
{
halfclosed = true;
DisposeResourcesIfNeeded();
}
finishedTcs.SetResult(status);
ReleaseResourcesIfPossible();
}
}
if (error != GRPCOpError.GRPC_OP_OK)
{
halfcloseTcs.SetException(new Exception("Halfclose failed"));
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
else
{
halfcloseTcs.SetResult(null);
}
}
catch(Exception e)
{
Console.WriteLine("Caught exception in a native handler: " + e);
}
}
protected virtual void Dispose(bool disposing)
private void HandleReadFinished(GRPCOpError error, IntPtr batchContextPtr)
{
if (!disposed)
try
{
if (disposing)
var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr);
var payload = ctx.GetReceivedMessage();
TaskCompletionSource<TRead> oldTcs = null;
IObserver<TRead> observer = null;
Nullable<Status> status = null;
lock (myLock)
{
if (call != null)
oldTcs = readTcs;
readTcs = null;
if (payload == null)
{
call.Dispose();
readingDone = true;
}
observer = readObserver;
status = finishedStatus;
}
disposed = true;
}
}
private void UpdateErrorOccured(GRPCOpError error)
{
if (error == GRPCOpError.GRPC_OP_ERROR)
{
errorOccured = true;
}
}
// TODO: wrap deserialization...
TRead msg = payload != null ? deserializer(payload) : default(TRead);
private void CheckStarted()
{
if (!started)
{
throw new InvalidOperationException("Call not started");
}
}
oldTcs.SetResult(msg);
private void CheckNoError()
{
if (errorOccured)
// TODO: make sure we deliver reads in the right order.
if (observer != null)
{
if (payload != null)
{
// TODO: wrap to handle exceptions
observer.OnNext(msg);
// start a new read
ReceiveMessageAsync();
}
else
{
if (!server)
{
if (status.HasValue)
{
CompleteStreamObserver(status.Value);
}
}
else
{
// TODO: wrap to handle exceptions..
observer.OnCompleted();
}
// TODO: completeStreamObserver serverside...
}
}
}
catch(Exception e)
{
throw new InvalidOperationException("Error occured when processing call.");
Console.WriteLine("Caught exception in a native handler: " + e);
}
}
private void CheckNotFinished()
private void HandleFinished(GRPCOpError error, IntPtr batchContextPtr)
{
if (finishedStatus.HasValue)
try
{
throw new InvalidOperationException("Already finished.");
}
}
var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr);
var status = ctx.GetReceivedStatus();
private void CheckCancelNotRequested()
{
if (cancelRequested)
bool wasReadingDone;
lock (myLock)
{
finished = true;
finishedStatus = status;
wasReadingDone = readingDone;
ReleaseResourcesIfPossible();
}
if (wasReadingDone) {
CompleteStreamObserver(status);
}
}
catch(Exception e)
{
throw new InvalidOperationException("Cancel has been requested.");
Console.WriteLine("Caught exception in a native handler: " + e);
}
}
private void DisposeResourcesIfNeeded()
private void HandleFinishedServerside(GRPCOpError error, IntPtr batchContextPtr)
{
if (call != null && started && finishedStatus.HasValue)
try
{
// TODO: should we also wait for all the pending events to finish?
call.Dispose();
}
}
var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr);
private void HandleEvent(IntPtr eventPtr) {
try {
var ev = new EventSafeHandleNotOwned(eventPtr);
switch (ev.GetCompletionType())
lock(myLock)
{
case GRPCCompletionType.GRPC_CLIENT_METADATA_READ:
OnClientMetadata();
break;
case GRPCCompletionType.GRPC_READ:
byte[] payload = ev.GetReadData();
OnRead(payload);
break;
finished = true;
case GRPCCompletionType.GRPC_WRITE_ACCEPTED:
OnWriteAccepted(ev.GetWriteAccepted());
break;
// TODO: because of the way server calls are implemented, we need to set
// reading done to true here. Should be fixed in the future.
readingDone = true;
case GRPCCompletionType.GRPC_FINISH_ACCEPTED:
OnFinishAccepted(ev.GetFinishAccepted());
break;
ReleaseResourcesIfPossible();
}
// TODO: handle error ...
case GRPCCompletionType.GRPC_FINISHED:
OnFinished(ev.GetFinished());
break;
finishedServersideTcs.SetResult(null);
default:
throw new ArgumentException("Unexpected completion type");
}
} catch(Exception e) {
}
catch(Exception e)
{
Console.WriteLine("Caught exception in a native handler: " + e);
}
}
}
}
}

@ -0,0 +1,96 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using System.Runtime.InteropServices;
using Google.GRPC.Core;
namespace Google.GRPC.Core.Internal
{
/// <summary>
/// Not owned version of
/// grpcsharp_batch_context
/// </summary>
internal class BatchContextSafeHandleNotOwned : SafeHandleZeroIsInvalid
{
[DllImport("grpc_csharp_ext.dll")]
static extern IntPtr grpcsharp_batch_context_recv_message_length(BatchContextSafeHandleNotOwned ctx);
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_batch_context_recv_message_to_buffer(BatchContextSafeHandleNotOwned ctx, byte[] buffer, UIntPtr bufferLen);
[DllImport("grpc_csharp_ext.dll")]
static extern StatusCode grpcsharp_batch_context_recv_status_on_client_status(BatchContextSafeHandleNotOwned ctx);
[DllImport("grpc_csharp_ext.dll")]
static extern IntPtr grpcsharp_batch_context_recv_status_on_client_details(BatchContextSafeHandleNotOwned ctx); // returns const char*
[DllImport("grpc_csharp_ext.dll")]
static extern CallSafeHandle grpcsharp_batch_context_server_rpc_new_call(BatchContextSafeHandleNotOwned ctx);
[DllImport("grpc_csharp_ext.dll")]
static extern IntPtr grpcsharp_batch_context_server_rpc_new_method(BatchContextSafeHandleNotOwned ctx); // returns const char*
public BatchContextSafeHandleNotOwned(IntPtr handle) : base(false)
{
SetHandle(handle);
}
public Status GetReceivedStatus()
{
// TODO: can the native method return string directly?
string details = Marshal.PtrToStringAnsi(grpcsharp_batch_context_recv_status_on_client_details(this));
return new Status(grpcsharp_batch_context_recv_status_on_client_status(this), details);
}
public byte[] GetReceivedMessage()
{
IntPtr len = grpcsharp_batch_context_recv_message_length(this);
if (len == new IntPtr(-1))
{
return null;
}
byte[] data = new byte[(int) len];
grpcsharp_batch_context_recv_message_to_buffer(this, data, new UIntPtr((ulong)data.Length));
return data;
}
public CallSafeHandle GetServerRpcNewCall() {
return grpcsharp_batch_context_server_rpc_new_call(this);
}
public string GetServerRpcNewMethod() {
return Marshal.PtrToStringAnsi(grpcsharp_batch_context_server_rpc_new_method(this));
}
}
}

@ -2,11 +2,11 @@
// Copyright 2015, Google Inc.
// All rights reserved.
//
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
@ -16,7 +16,7 @@
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
@ -38,8 +38,8 @@ using Google.GRPC.Core;
namespace Google.GRPC.Core.Internal
{
// TODO: we need to make sure that the delegates are not collected before invoked.
internal delegate void EventCallbackDelegate(IntPtr eventPtr);
//TODO: rename the delegate
internal delegate void CompletionCallbackDelegate(GRPCOpError error, IntPtr batchContextPtr);
/// <summary>
/// grpc_call from <grpc/grpc.h>
@ -49,142 +49,108 @@ namespace Google.GRPC.Core.Internal
const UInt32 GRPC_WRITE_BUFFER_HINT = 1;
[DllImport("grpc_csharp_ext.dll")]
static extern CallSafeHandle grpcsharp_channel_create_call_old(ChannelSafeHandle channel, string method, string host, Timespec deadline);
static extern CallSafeHandle grpcsharp_channel_create_call(ChannelSafeHandle channel, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_add_metadata(CallSafeHandle call, IntPtr metadata, UInt32 flags);
static extern GRPCCallError grpcsharp_call_cancel(CallSafeHandle call);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_invoke_old(CallSafeHandle call, CompletionQueueSafeHandle cq, IntPtr metadataReadTag, IntPtr finishedTag, UInt32 flags);
[DllImport("grpc_csharp_ext.dll", EntryPoint = "grpcsharp_call_invoke_old")]
static extern GRPCCallError grpcsharp_call_invoke_old_CALLBACK(CallSafeHandle call, CompletionQueueSafeHandle cq,
[MarshalAs(UnmanagedType.FunctionPtr)] EventCallbackDelegate metadataReadCallback,
[MarshalAs(UnmanagedType.FunctionPtr)] EventCallbackDelegate finishedCallback,
UInt32 flags);
static extern GRPCCallError grpcsharp_call_cancel_with_status(CallSafeHandle call, StatusCode status, string description);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_server_accept_old(CallSafeHandle call, CompletionQueueSafeHandle completionQueue, IntPtr finishedTag);
[DllImport("grpc_csharp_ext.dll", EntryPoint = "grpcsharp_call_server_accept_old")]
static extern GRPCCallError grpcsharp_call_server_accept_old_CALLBACK(CallSafeHandle call, CompletionQueueSafeHandle completionQueue, [MarshalAs(UnmanagedType.FunctionPtr)] EventCallbackDelegate finishedCallback);
static extern GRPCCallError grpcsharp_call_start_unary(CallSafeHandle call,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
byte[] send_buffer, UIntPtr send_buffer_len);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_server_end_initial_metadata_old(CallSafeHandle call, UInt32 flags);
static extern GRPCCallError grpcsharp_call_start_client_streaming(CallSafeHandle call,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_cancel(CallSafeHandle call);
static extern GRPCCallError grpcsharp_call_start_server_streaming(CallSafeHandle call,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
byte[] send_buffer, UIntPtr send_buffer_len);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_cancel_with_status(CallSafeHandle call, StatusCode status, string description);
static extern GRPCCallError grpcsharp_call_start_duplex_streaming(CallSafeHandle call,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_write_status_old(CallSafeHandle call, StatusCode statusCode, string statusMessage, IntPtr tag);
[DllImport("grpc_csharp_ext.dll", EntryPoint = "grpcsharp_call_start_write_status_old")]
static extern GRPCCallError grpcsharp_call_start_write_status_old_CALLBACK(CallSafeHandle call, StatusCode statusCode, string statusMessage, [MarshalAs(UnmanagedType.FunctionPtr)] EventCallbackDelegate callback);
static extern GRPCCallError grpcsharp_call_send_message(CallSafeHandle call,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
byte[] send_buffer, UIntPtr send_buffer_len);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_writes_done_old(CallSafeHandle call, IntPtr tag);
[DllImport("grpc_csharp_ext.dll", EntryPoint = "grpcsharp_call_writes_done_old")]
static extern GRPCCallError grpcsharp_call_writes_done_old_CALLBACK(CallSafeHandle call, [MarshalAs(UnmanagedType.FunctionPtr)] EventCallbackDelegate callback);
static extern GRPCCallError grpcsharp_call_send_close_from_client(CallSafeHandle call,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_read_old(CallSafeHandle call, IntPtr tag);
[DllImport("grpc_csharp_ext.dll", EntryPoint = "grpcsharp_call_start_read_old")]
static extern GRPCCallError grpcsharp_call_start_read_old_CALLBACK(CallSafeHandle call, [MarshalAs(UnmanagedType.FunctionPtr)] EventCallbackDelegate callback);
static extern GRPCCallError grpcsharp_call_send_status_from_server(CallSafeHandle call, [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback, StatusCode statusCode, string statusMessage);
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_call_start_write_from_copied_buffer(CallSafeHandle call,
byte[] buffer, UIntPtr length,
IntPtr tag, UInt32 flags);
static extern GRPCCallError grpcsharp_call_recv_message(CallSafeHandle call,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback);
[DllImport("grpc_csharp_ext.dll", EntryPoint = "grpcsharp_call_start_write_from_copied_buffer")]
static extern void grpcsharp_call_start_write_from_copied_buffer_CALLBACK(CallSafeHandle call,
byte[] buffer, UIntPtr length,
[MarshalAs(UnmanagedType.FunctionPtr)] EventCallbackDelegate callback,
UInt32 flags);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_serverside(CallSafeHandle call,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback);
[DllImport("grpc_csharp_ext.dll")]
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_call_destroy(IntPtr call);
private CallSafeHandle()
{
}
/// <summary>
/// Creates a client call.
/// </summary>
public static CallSafeHandle Create(ChannelSafeHandle channel, string method, string host, Timespec deadline)
{
return grpcsharp_channel_create_call_old(channel, method, host, deadline);
}
public void Invoke(CompletionQueueSafeHandle cq, IntPtr metadataReadTag, IntPtr finishedTag, bool buffered)
{
AssertCallOk(grpcsharp_call_invoke_old(this, cq, metadataReadTag, finishedTag, GetFlags(buffered)));
}
public void Invoke(CompletionQueueSafeHandle cq, bool buffered, EventCallbackDelegate metadataReadCallback, EventCallbackDelegate finishedCallback)
{
AssertCallOk(grpcsharp_call_invoke_old_CALLBACK(this, cq, metadataReadCallback, finishedCallback, GetFlags(buffered)));
}
public void ServerAccept(CompletionQueueSafeHandle cq, IntPtr finishedTag)
private CallSafeHandle()
{
AssertCallOk(grpcsharp_call_server_accept_old(this, cq, finishedTag));
}
public void ServerAccept(CompletionQueueSafeHandle cq, EventCallbackDelegate callback)
public static CallSafeHandle Create(ChannelSafeHandle channel, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline)
{
AssertCallOk(grpcsharp_call_server_accept_old_CALLBACK(this, cq, callback));
return grpcsharp_channel_create_call(channel, cq, method, host, deadline);
}
public void ServerEndInitialMetadata(UInt32 flags)
public void StartUnary(byte[] payload, CompletionCallbackDelegate callback)
{
AssertCallOk(grpcsharp_call_server_end_initial_metadata_old(this, flags));
AssertCallOk(grpcsharp_call_start_unary(this, callback, payload, new UIntPtr((ulong) payload.Length)));
}
public void StartWrite(byte[] payload, IntPtr tag, bool buffered)
public void StartClientStreaming(CompletionCallbackDelegate callback)
{
grpcsharp_call_start_write_from_copied_buffer(this, payload, new UIntPtr((ulong) payload.Length), tag, GetFlags(buffered));
AssertCallOk(grpcsharp_call_start_client_streaming(this, callback));
}
public void StartWrite(byte[] payload, bool buffered, EventCallbackDelegate callback)
public void StartServerStreaming(byte[] payload, CompletionCallbackDelegate callback)
{
grpcsharp_call_start_write_from_copied_buffer_CALLBACK(this, payload, new UIntPtr((ulong) payload.Length), callback, GetFlags(buffered));
AssertCallOk(grpcsharp_call_start_server_streaming(this, callback, payload, new UIntPtr((ulong) payload.Length)));
}
public void StartWriteStatus(Status status, IntPtr tag)
public void StartDuplexStreaming(CompletionCallbackDelegate callback)
{
AssertCallOk(grpcsharp_call_start_write_status_old(this, status.StatusCode, status.Detail, tag));
AssertCallOk(grpcsharp_call_start_duplex_streaming(this, callback));
}
public void StartWriteStatus(Status status, EventCallbackDelegate callback)
public void StartSendMessage(byte[] payload, CompletionCallbackDelegate callback)
{
AssertCallOk(grpcsharp_call_start_write_status_old_CALLBACK(this, status.StatusCode, status.Detail, callback));
AssertCallOk(grpcsharp_call_send_message(this, callback, payload, new UIntPtr((ulong) payload.Length)));
}
public void WritesDone(IntPtr tag)
public void StartSendCloseFromClient(CompletionCallbackDelegate callback)
{
AssertCallOk(grpcsharp_call_writes_done_old(this, tag));
AssertCallOk(grpcsharp_call_send_close_from_client(this, callback));
}
public void WritesDone(EventCallbackDelegate callback)
public void StartSendStatusFromServer(Status status, CompletionCallbackDelegate callback)
{
AssertCallOk(grpcsharp_call_writes_done_old_CALLBACK(this, callback));
AssertCallOk(grpcsharp_call_send_status_from_server(this, callback, status.StatusCode, status.Detail));
}
public void StartRead(IntPtr tag)
public void StartReceiveMessage(CompletionCallbackDelegate callback)
{
AssertCallOk(grpcsharp_call_start_read_old(this, tag));
AssertCallOk(grpcsharp_call_recv_message(this, callback));
}
public void StartRead(EventCallbackDelegate callback)
public void StartServerSide(CompletionCallbackDelegate callback)
{
AssertCallOk(grpcsharp_call_start_read_old_CALLBACK(this, callback));
AssertCallOk(grpcsharp_call_start_serverside(this, callback));
}
public void Cancel()
@ -212,4 +178,4 @@ namespace Google.GRPC.Core.Internal
return buffered ? 0 : GRPC_WRITE_BUFFER_HINT;
}
}
}
}

@ -2,11 +2,11 @@
// Copyright 2015, Google Inc.
// All rights reserved.
//
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
@ -16,7 +16,7 @@
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
@ -36,19 +36,20 @@ using Google.GRPC.Core.Internal;
namespace Google.GRPC.Core.Internal
{
internal class StreamingInputObserver<TWrite, TRead> : IObserver<TWrite>
internal class ClientStreamingInputObserver<TWrite, TRead> : IObserver<TWrite>
{
readonly AsyncCall<TWrite, TRead> call;
public StreamingInputObserver(AsyncCall<TWrite, TRead> call)
public ClientStreamingInputObserver(AsyncCall<TWrite, TRead> call)
{
this.call = call;
}
public void OnCompleted()
{
// TODO: how bad is the Wait here?
call.WritesCompletedAsync().Wait();
call.SendCloseFromClientAsync().Wait();
}
public void OnError(Exception error)
@ -59,7 +60,7 @@ namespace Google.GRPC.Core.Internal
public void OnNext(TWrite value)
{
// TODO: how bad is the Wait here?
call.WriteAsync(value).Wait();
call.SendMessageAsync(value).Wait();
}
}
}

@ -45,12 +45,6 @@ namespace Google.GRPC.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
static extern CompletionQueueSafeHandle grpcsharp_completion_queue_create();
[DllImport("grpc_csharp_ext.dll")]
static extern EventSafeHandle grpcsharp_completion_queue_pluck(CompletionQueueSafeHandle cq, IntPtr tag, Timespec deadline);
[DllImport("grpc_csharp_ext.dll")]
static extern EventSafeHandle grpcsharp_completion_queue_next(CompletionQueueSafeHandle cq, Timespec deadline);
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_completion_queue_shutdown(CompletionQueueSafeHandle cq);
@ -69,21 +63,11 @@ namespace Google.GRPC.Core.Internal
return grpcsharp_completion_queue_create();
}
public EventSafeHandle Next(Timespec deadline)
{
return grpcsharp_completion_queue_next(this, deadline);
}
public GRPCCompletionType NextWithCallback()
{
return grpcsharp_completion_queue_next_with_callback(this);
}
public EventSafeHandle Pluck(IntPtr tag, Timespec deadline)
{
return grpcsharp_completion_queue_pluck(this, tag, deadline);
}
public void Shutdown()
{
grpcsharp_completion_queue_shutdown(this);

@ -1,224 +0,0 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using System.Runtime.InteropServices;
using Google.GRPC.Core;
namespace Google.GRPC.Core.Internal
{
/// <summary>
/// grpc_event from grpc/grpc.h
/// </summary>
internal class EventSafeHandle : SafeHandleZeroIsInvalid
{
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_event_finish(IntPtr ev);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCompletionType grpcsharp_event_type(EventSafeHandle ev);
[DllImport("grpc_csharp_ext.dll")]
static extern CallSafeHandle grpcsharp_event_call(EventSafeHandle ev);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCOpError grpcsharp_event_write_accepted(EventSafeHandle ev);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCOpError grpcsharp_event_finish_accepted(EventSafeHandle ev);
[DllImport("grpc_csharp_ext.dll")]
static extern StatusCode grpcsharp_event_finished_status(EventSafeHandle ev);
[DllImport("grpc_csharp_ext.dll")]
static extern IntPtr grpcsharp_event_finished_details(EventSafeHandle ev); // returns const char*
[DllImport("grpc_csharp_ext.dll")]
static extern IntPtr grpcsharp_event_read_length(EventSafeHandle ev);
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_event_read_copy_to_buffer(EventSafeHandle ev, byte[] buffer, UIntPtr bufferLen);
[DllImport("grpc_csharp_ext.dll")]
static extern IntPtr grpcsharp_event_server_rpc_new_method(EventSafeHandle ev); // returns const char*
public GRPCCompletionType GetCompletionType()
{
return grpcsharp_event_type(this);
}
public GRPCOpError GetWriteAccepted()
{
return grpcsharp_event_write_accepted(this);
}
public GRPCOpError GetFinishAccepted()
{
return grpcsharp_event_finish_accepted(this);
}
public Status GetFinished()
{
// TODO: can the native method return string directly?
string details = Marshal.PtrToStringAnsi(grpcsharp_event_finished_details(this));
return new Status(grpcsharp_event_finished_status(this), details);
}
public byte[] GetReadData()
{
IntPtr len = grpcsharp_event_read_length(this);
if (len == new IntPtr(-1))
{
return null;
}
byte[] data = new byte[(int) len];
grpcsharp_event_read_copy_to_buffer(this, data, new UIntPtr((ulong)data.Length));
return data;
}
public CallSafeHandle GetCall() {
return grpcsharp_event_call(this);
}
public string GetServerRpcNewMethod() {
// TODO: can the native method return string directly?
return Marshal.PtrToStringAnsi(grpcsharp_event_server_rpc_new_method(this));
}
//TODO: client_metadata_read event type
protected override bool ReleaseHandle()
{
grpcsharp_event_finish(handle);
return true;
}
}
// TODO: this is basically c&p of EventSafeHandle. Unify!
/// <summary>
/// Not owned version of
/// grpc_event from grpc/grpc.h
/// </summary>
internal class EventSafeHandleNotOwned : SafeHandleZeroIsInvalid
{
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_event_finish(IntPtr ev);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCompletionType grpcsharp_event_type(EventSafeHandleNotOwned ev);
[DllImport("grpc_csharp_ext.dll")]
static extern CallSafeHandle grpcsharp_event_call(EventSafeHandleNotOwned ev);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCOpError grpcsharp_event_write_accepted(EventSafeHandleNotOwned ev);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCOpError grpcsharp_event_finish_accepted(EventSafeHandleNotOwned ev);
[DllImport("grpc_csharp_ext.dll")]
static extern StatusCode grpcsharp_event_finished_status(EventSafeHandleNotOwned ev);
[DllImport("grpc_csharp_ext.dll")]
static extern IntPtr grpcsharp_event_finished_details(EventSafeHandleNotOwned ev); // returns const char*
[DllImport("grpc_csharp_ext.dll")]
static extern IntPtr grpcsharp_event_read_length(EventSafeHandleNotOwned ev);
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_event_read_copy_to_buffer(EventSafeHandleNotOwned ev, byte[] buffer, UIntPtr bufferLen);
[DllImport("grpc_csharp_ext.dll")]
static extern IntPtr grpcsharp_event_server_rpc_new_method(EventSafeHandleNotOwned ev); // returns const char*
public EventSafeHandleNotOwned() : base(false)
{
}
public EventSafeHandleNotOwned(IntPtr handle) : base(false)
{
SetHandle(handle);
}
public GRPCCompletionType GetCompletionType()
{
return grpcsharp_event_type(this);
}
public GRPCOpError GetWriteAccepted()
{
return grpcsharp_event_write_accepted(this);
}
public GRPCOpError GetFinishAccepted()
{
return grpcsharp_event_finish_accepted(this);
}
public Status GetFinished()
{
// TODO: can the native method return string directly?
string details = Marshal.PtrToStringAnsi(grpcsharp_event_finished_details(this));
return new Status(grpcsharp_event_finished_status(this), details);
}
public byte[] GetReadData()
{
IntPtr len = grpcsharp_event_read_length(this);
if (len == new IntPtr(-1))
{
return null;
}
byte[] data = new byte[(int) len];
grpcsharp_event_read_copy_to_buffer(this, data, new UIntPtr((ulong)data.Length));
return data;
}
public CallSafeHandle GetCall() {
return grpcsharp_event_call(this);
}
public string GetServerRpcNewMethod() {
// TODO: can the native method return string directly?
return Marshal.PtrToStringAnsi(grpcsharp_event_server_rpc_new_method(this));
}
//TODO: client_metadata_read event type
protected override bool ReleaseHandle()
{
grpcsharp_event_finish(handle);
return true;
}
}
}

@ -48,7 +48,6 @@ namespace Google.GRPC.Core.Internal
readonly object myLock = new object();
readonly List<Thread> threads = new List<Thread>();
readonly int poolSize;
readonly Action<EventSafeHandle> eventHandler;
CompletionQueueSafeHandle cq;
@ -56,11 +55,6 @@ namespace Google.GRPC.Core.Internal
this.poolSize = poolSize;
}
internal GrpcThreadPool(int poolSize, Action<EventSafeHandle> eventHandler) {
this.poolSize = poolSize;
this.eventHandler = eventHandler;
}
public void Start() {
lock (myLock)
@ -104,34 +98,19 @@ namespace Google.GRPC.Core.Internal
}
}
private Thread CreateAndStartThread(int i) {
Action body;
if (eventHandler != null)
{
body = ThreadBodyWithHandler;
}
else
{
body = ThreadBodyNoHandler;
}
var thread = new Thread(new ThreadStart(body));
private Thread CreateAndStartThread(int i)
{
var thread = new Thread(new ThreadStart(RunHandlerLoop));
thread.IsBackground = false;
thread.Start();
if (eventHandler != null)
{
thread.Name = "grpc_server_newrpc " + i;
}
else
{
thread.Name = "grpc " + i;
}
thread.Name = "grpc " + i;
return thread;
}
/// <summary>
/// Body of the polling thread.
/// </summary>
private void ThreadBodyNoHandler()
private void RunHandlerLoop()
{
GRPCCompletionType completionType;
do
@ -140,22 +119,6 @@ namespace Google.GRPC.Core.Internal
} while(completionType != GRPCCompletionType.GRPC_QUEUE_SHUTDOWN);
Console.WriteLine("Completion queue has shutdown successfully, thread " + Thread.CurrentThread.Name + " exiting.");
}
/// <summary>
/// Body of the polling thread.
/// </summary>
private void ThreadBodyWithHandler()
{
GRPCCompletionType completionType;
do
{
using (EventSafeHandle ev = cq.Next(Timespec.InfFuture)) {
completionType = ev.GetCompletionType();
eventHandler(ev);
}
} while(completionType != GRPCCompletionType.GRPC_QUEUE_SHUTDOWN);
Console.WriteLine("Completion queue has shutdown successfully, thread " + Thread.CurrentThread.Name + " exiting.");
}
}
}

@ -56,6 +56,12 @@ namespace Google.GRPC.Core.Internal
return handle == IntPtr.Zero;
}
}
protected override bool ReleaseHandle()
{
// handle is not owned.
return true;
}
}
}

@ -38,24 +38,22 @@ using System.Collections.Concurrent;
namespace Google.GRPC.Core.Internal
{
// TODO: we need to make sure that the delegates are not collected before invoked.
internal delegate void ServerShutdownCallbackDelegate(IntPtr eventPtr);
/// <summary>
/// grpc_server from grpc/grpc.h
/// </summary>
internal sealed class ServerSafeHandle : SafeHandleZeroIsInvalid
{
[DllImport("grpc_csharp_ext.dll", EntryPoint = "grpcsharp_server_request_call_old")]
static extern GRPCCallError grpcsharp_server_request_call_old_CALLBACK(ServerSafeHandle server, [MarshalAs(UnmanagedType.FunctionPtr)] EventCallbackDelegate callback);
[DllImport("grpc_csharp_ext.dll")]
static extern ServerSafeHandle grpcsharp_server_create(CompletionQueueSafeHandle cq, IntPtr args);
static extern GRPCCallError grpcsharp_server_request_call(ServerSafeHandle server, CompletionQueueSafeHandle cq, [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback);
// TODO: check int representation size
[DllImport("grpc_csharp_ext.dll")]
static extern int grpcsharp_server_add_http2_port(ServerSafeHandle server, string addr);
static extern ServerSafeHandle grpcsharp_server_create(CompletionQueueSafeHandle cq, IntPtr args);
// TODO: check int representation size
[DllImport("grpc_csharp_ext.dll")]
static extern int grpcsharp_server_add_secure_http2_port(ServerSafeHandle server, string addr);
static extern Int32 grpcsharp_server_add_http2_port(ServerSafeHandle server, string addr);
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_server_start(ServerSafeHandle server);
@ -63,8 +61,9 @@ namespace Google.GRPC.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_server_shutdown(ServerSafeHandle server);
// TODO: get rid of the old callback style
[DllImport("grpc_csharp_ext.dll", EntryPoint = "grpcsharp_server_shutdown_and_notify")]
static extern void grpcsharp_server_shutdown_and_notify_CALLBACK(ServerSafeHandle server, [MarshalAs(UnmanagedType.FunctionPtr)] EventCallbackDelegate callback);
static extern void grpcsharp_server_shutdown_and_notify_CALLBACK(ServerSafeHandle server, [MarshalAs(UnmanagedType.FunctionPtr)] ServerShutdownCallbackDelegate callback);
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_server_destroy(IntPtr server);
@ -81,7 +80,6 @@ namespace Google.GRPC.Core.Internal
public int AddPort(string addr)
{
// TODO: also grpc_server_add_secure_http2_port...
return grpcsharp_server_add_http2_port(this, addr);
}
@ -95,14 +93,14 @@ namespace Google.GRPC.Core.Internal
grpcsharp_server_shutdown(this);
}
public void ShutdownAndNotify(EventCallbackDelegate callback)
public void ShutdownAndNotify(ServerShutdownCallbackDelegate callback)
{
grpcsharp_server_shutdown_and_notify_CALLBACK(this, callback);
}
public GRPCCallError RequestCall(EventCallbackDelegate callback)
public GRPCCallError RequestCall(CompletionQueueSafeHandle cq, CompletionCallbackDelegate callback)
{
return grpcsharp_server_request_call_old_CALLBACK(this, callback);
return grpcsharp_server_request_call(this, cq, callback);
}
protected override bool ReleaseHandle()

@ -40,11 +40,11 @@ namespace Google.GRPC.Core.Internal
/// Observer that writes all arriving messages to a call abstraction (in blocking fashion)
/// and then halfcloses the call. Used for server-side call handling.
/// </summary>
internal class ServerWritingObserver<TWrite, TRead> : IObserver<TWrite>
internal class ServerStreamingOutputObserver<TWrite, TRead> : IObserver<TWrite>
{
readonly AsyncCall<TWrite, TRead> call;
public ServerWritingObserver(AsyncCall<TWrite, TRead> call)
public ServerStreamingOutputObserver(AsyncCall<TWrite, TRead> call)
{
this.call = call;
}
@ -52,19 +52,19 @@ namespace Google.GRPC.Core.Internal
public void OnCompleted()
{
// TODO: how bad is the Wait here?
call.WriteStatusAsync(new Status(StatusCode.GRPC_STATUS_OK, "")).Wait();
call.SendStatusFromServerAsync(new Status(StatusCode.GRPC_STATUS_OK, "")).Wait();
}
public void OnError(Exception error)
{
// TODO: handle this...
// TODO: implement this...
throw new InvalidOperationException("This should never be called.");
}
public void OnNext(TWrite value)
{
// TODO: how bad is the Wait here?
call.WriteAsync(value).Wait();
call.SendMessageAsync(value).Wait();
}
}
}

@ -49,8 +49,8 @@ namespace Google.GRPC.Core
{
// TODO: make sure the delegate doesn't get garbage collected while
// native callbacks are in the completion queue.
readonly EventCallbackDelegate newRpcHandler;
readonly EventCallbackDelegate serverShutdownHandler;
readonly ServerShutdownCallbackDelegate serverShutdownHandler;
readonly CompletionCallbackDelegate newServerRpcHandler;
readonly BlockingCollection<NewRpcInfo> newRpcQueue = new BlockingCollection<NewRpcInfo>();
readonly ServerSafeHandle handle;
@ -61,9 +61,8 @@ namespace Google.GRPC.Core
public Server()
{
// TODO: what is the tag for server shutdown?
this.handle = ServerSafeHandle.NewServer(GetCompletionQueue(), IntPtr.Zero);
this.newRpcHandler = HandleNewRpc;
this.newServerRpcHandler = HandleNewServerRpc;
this.serverShutdownHandler = HandleServerShutdown;
}
@ -99,7 +98,7 @@ namespace Google.GRPC.Core
{
var rpcInfo = newRpcQueue.Take();
Console.WriteLine("Server received RPC " + rpcInfo.Method);
//Console.WriteLine("Server received RPC " + rpcInfo.Method);
IServerCallHandler callHandler;
if (!callHandlers.TryGetValue(rpcInfo.Method, out callHandler))
@ -138,23 +137,25 @@ namespace Google.GRPC.Core
private void AllowOneRpc()
{
AssertCallOk(handle.RequestCall(newRpcHandler));
AssertCallOk(handle.RequestCall(GetCompletionQueue(), newServerRpcHandler));
}
private void HandleNewRpc(IntPtr eventPtr)
{
try
{
var ev = new EventSafeHandleNotOwned(eventPtr);
var rpcInfo = new NewRpcInfo(ev.GetCall(), ev.GetServerRpcNewMethod());
private void HandleNewServerRpc(GRPCOpError error, IntPtr batchContextPtr) {
try {
var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr);
if (error != GRPCOpError.GRPC_OP_OK) {
// TODO: handle error
}
var rpcInfo = new NewRpcInfo(ctx.GetServerRpcNewCall(), ctx.GetServerRpcNewMethod());
// after server shutdown, the callback returns with null call
if (!rpcInfo.Call.IsInvalid) {
newRpcQueue.Add(rpcInfo);
}
}
catch (Exception e)
{
} catch(Exception e) {
Console.WriteLine("Caught exception in a native handler: " + e);
}
}

@ -59,15 +59,16 @@ namespace Google.GRPC.Core
method.RequestMarshaller.Deserializer);
asyncCall.InitializeServer(call);
asyncCall.Accept(cq);
var finishedTask = asyncCall.ServerSideUnaryRequestCallAsync();
var request = asyncCall.ReadAsync().Result;
var request = asyncCall.ReceiveMessageAsync().Result;
var responseObserver = new ServerWritingObserver<TResponse, TRequest>(asyncCall);
var responseObserver = new ServerStreamingOutputObserver<TResponse, TRequest>(asyncCall);
handler(request, responseObserver);
asyncCall.Halfclosed.Wait();
asyncCall.Finished.Wait();
finishedTask.Wait();
}
}
@ -89,16 +90,11 @@ namespace Google.GRPC.Core
method.RequestMarshaller.Deserializer);
asyncCall.InitializeServer(call);
asyncCall.Accept(cq);
var responseObserver = new ServerWritingObserver<TResponse, TRequest>(asyncCall);
var responseObserver = new ServerStreamingOutputObserver<TResponse, TRequest>(asyncCall);
var requestObserver = handler(responseObserver);
// feed the requests
asyncCall.StartReadingToStream(requestObserver);
asyncCall.Halfclosed.Wait();
asyncCall.Finished.Wait();
var finishedTask = asyncCall.ServerSideStreamingRequestCallAsync(requestObserver);
finishedTask.Wait();
}
}
@ -110,12 +106,31 @@ namespace Google.GRPC.Core
AsyncCall<byte[], byte[]> asyncCall = new AsyncCall<byte[], byte[]>(
(payload) => payload, (payload) => payload);
asyncCall.InitializeServer(call);
asyncCall.Accept(cq);
asyncCall.WriteStatusAsync(new Status(StatusCode.GRPC_STATUS_UNIMPLEMENTED, "No such method.")).Wait();
asyncCall.Finished.Wait();
var finishedTask = asyncCall.ServerSideStreamingRequestCallAsync(new NullObserver<byte[]>());
asyncCall.SendStatusFromServerAsync(new Status(StatusCode.GRPC_STATUS_UNIMPLEMENTED, "No such method.")).Wait();
finishedTask.Wait();
}
}
internal class NullObserver<T> : IObserver<T>
{
public void OnCompleted()
{
}
public void OnError(Exception error)
{
}
public void OnNext(T value)
{
}
}
}

@ -36,6 +36,7 @@ using NUnit.Framework;
using Google.GRPC.Core;
using Google.GRPC.Core.Internal;
using System.Threading;
using System.Diagnostics;
using System.Threading.Tasks;
using Google.GRPC.Core.Utils;
@ -51,11 +52,21 @@ namespace Google.GRPC.Core.Tests
Marshallers.StringMarshaller,
Marshallers.StringMarshaller);
[Test]
public void EmptyCall()
[TestFixtureSetUp]
public void Init()
{
GrpcEnvironment.Initialize();
}
[TestFixtureTearDown]
public void Cleanup()
{
GrpcEnvironment.Shutdown();
}
[Test]
public void UnaryCall()
{
Server server = new Server();
server.AddServiceDefinition(
ServerServiceDefinition.CreateBuilder("someService")
@ -69,19 +80,71 @@ namespace Google.GRPC.Core.Tests
var call = new Call<string, string>(unaryEchoStringMethod, channel);
Assert.AreEqual("ABC", Calls.BlockingUnaryCall(call, "ABC", default(CancellationToken)));
Assert.AreEqual("abcdef", Calls.BlockingUnaryCall(call, "abcdef", default(CancellationToken)));
}
server.ShutdownAsync().Wait();
}
GrpcEnvironment.Shutdown();
[Test]
public void UnaryCallPerformance()
{
Server server = new Server();
server.AddServiceDefinition(
ServerServiceDefinition.CreateBuilder("someService")
.AddMethod(unaryEchoStringMethod, HandleUnaryEchoString).Build());
int port = server.AddPort(host + ":0");
server.Start();
using (Channel channel = new Channel(host + ":" + port))
{
var call = new Call<string, string>(unaryEchoStringMethod, channel);
var stopwatch = new Stopwatch();
stopwatch.Start();
for (int i = 0; i < 1000; i++)
{
Calls.BlockingUnaryCall(call, "ABC", default(CancellationToken));
}
stopwatch.Stop();
Console.WriteLine("Elapsed time: " + stopwatch.ElapsedMilliseconds + "ms");
}
server.ShutdownAsync().Wait();
}
private void HandleUnaryEchoString(string request, IObserver<string> responseObserver) {
[Test]
public void UnknownMethodHandler()
{
Server server = new Server();
server.AddServiceDefinition(
ServerServiceDefinition.CreateBuilder("someService").Build());
int port = server.AddPort(host + ":0");
server.Start();
using (Channel channel = new Channel(host + ":" + port))
{
var call = new Call<string, string>(unaryEchoStringMethod, channel);
try {
Calls.BlockingUnaryCall(call, "ABC", default(CancellationToken));
Assert.Fail();
} catch(RpcException e) {
Assert.AreEqual(StatusCode.GRPC_STATUS_UNIMPLEMENTED, e.Status.StatusCode);
}
}
server.ShutdownAsync().Wait();
}
private void HandleUnaryEchoString(string request, IObserver<string> responseObserver)
{
responseObserver.OnNext(request);
responseObserver.OnCompleted();
}
}
}

@ -25,10 +25,11 @@ INSTALLATION AND USAGE: WINDOWS
INSTALLATION AND USAGE: LINUX & MONO
------------------------------------
- Compile and install the gRPC C Core library
- Compile and install the gRPC C# extension library (that will be used via
P/Invoke from C#).
```
make shared_c
sudo make install
make grpc_csharp_ext
sudo make install_grpc_csharp_ext
```
- Prerequisites for development: Mono framework, MonoDevelop (IDE)

@ -32,9 +32,11 @@
*/
#include <grpc/support/port_platform.h>
#include <grpc/support/alloc.h>
#include <grpc/grpc.h>
#include <grpc/support/log.h>
#include <grpc/support/slice.h>
#include <grpc/support/string.h>
#include <string.h>
@ -58,6 +60,139 @@ grpc_byte_buffer *string_to_byte_buffer(const char *buffer, size_t len) {
return bb;
}
typedef void(GPR_CALLTYPE *callback_funcptr)(grpc_op_error op_error,
void *batch_context);
/*
* Helper to maintain lifetime of batch op inputs and store batch op outputs.
*/
typedef struct gprcsharp_batch_context {
grpc_metadata_array send_initial_metadata;
grpc_byte_buffer *send_message;
struct {
grpc_metadata_array trailing_metadata;
char *status_details;
} send_status_from_server;
grpc_metadata_array recv_initial_metadata;
grpc_byte_buffer *recv_message;
struct {
grpc_metadata_array trailing_metadata;
grpc_status_code status;
char *status_details;
size_t status_details_capacity;
} recv_status_on_client;
int recv_close_on_server_cancelled;
struct {
grpc_call *call;
grpc_call_details call_details;
grpc_metadata_array request_metadata;
} server_rpc_new;
/* callback will be called upon completion */
callback_funcptr callback;
} grpcsharp_batch_context;
grpcsharp_batch_context *grpcsharp_batch_context_create() {
grpcsharp_batch_context *ctx = gpr_malloc(sizeof(grpcsharp_batch_context));
memset(ctx, 0, sizeof(grpcsharp_batch_context));
return ctx;
}
/**
* Destroys metadata array including keys and values.
*/
void grpcsharp_metadata_array_destroy_recursive(grpc_metadata_array *array) {
if (!array->metadata) {
return;
}
/* TODO: destroy also keys and values */
grpc_metadata_array_destroy(array);
}
void grpcsharp_batch_context_destroy(grpcsharp_batch_context *ctx) {
if (!ctx) {
return;
}
grpcsharp_metadata_array_destroy_recursive(&(ctx->send_initial_metadata));
grpc_byte_buffer_destroy(ctx->send_message);
grpcsharp_metadata_array_destroy_recursive(
&(ctx->send_status_from_server.trailing_metadata));
gpr_free(ctx->send_status_from_server.status_details);
grpc_metadata_array_destroy(&(ctx->recv_initial_metadata));
grpc_byte_buffer_destroy(ctx->recv_message);
grpc_metadata_array_destroy(&(ctx->recv_status_on_client.trailing_metadata));
gpr_free((void *)ctx->recv_status_on_client.status_details);
/* NOTE: ctx->server_rpc_new.call is not destroyed because callback handler is
supposed
to take its ownership. */
grpc_call_details_destroy(&(ctx->server_rpc_new.call_details));
grpc_metadata_array_destroy(&(ctx->server_rpc_new.request_metadata));
gpr_free(ctx);
}
GPR_EXPORT gpr_intptr GPR_CALLTYPE grpcsharp_batch_context_recv_message_length(
const grpcsharp_batch_context *ctx) {
if (!ctx->recv_message) {
return -1;
}
return grpc_byte_buffer_length(ctx->recv_message);
}
/*
* Copies data from recv_message to a buffer. Fatal error occurs if
* buffer is too small.
*/
GPR_EXPORT void GPR_CALLTYPE grpcsharp_batch_context_recv_message_to_buffer(
const grpcsharp_batch_context *ctx, char *buffer, size_t buffer_len) {
grpc_byte_buffer_reader *reader;
gpr_slice slice;
size_t offset = 0;
reader = grpc_byte_buffer_reader_create(ctx->recv_message);
while (grpc_byte_buffer_reader_next(reader, &slice)) {
size_t len = GPR_SLICE_LENGTH(slice);
GPR_ASSERT(offset + len <= buffer_len);
memcpy(buffer + offset, GPR_SLICE_START_PTR(slice),
GPR_SLICE_LENGTH(slice));
offset += len;
gpr_slice_unref(slice);
}
grpc_byte_buffer_reader_destroy(reader);
}
GPR_EXPORT grpc_status_code GPR_CALLTYPE
grpcsharp_batch_context_recv_status_on_client_status(
const grpcsharp_batch_context *ctx) {
return ctx->recv_status_on_client.status;
}
GPR_EXPORT const char *GPR_CALLTYPE
grpcsharp_batch_context_recv_status_on_client_details(
const grpcsharp_batch_context *ctx) {
return ctx->recv_status_on_client.status_details;
}
GPR_EXPORT grpc_call *GPR_CALLTYPE grpcsharp_batch_context_server_rpc_new_call(
const grpcsharp_batch_context *ctx) {
return ctx->server_rpc_new.call;
}
GPR_EXPORT const char *GPR_CALLTYPE
grpcsharp_batch_context_server_rpc_new_method(
const grpcsharp_batch_context *ctx) {
return ctx->server_rpc_new.call_details.method;
}
/* Init & shutdown */
GPR_EXPORT void GPR_CALLTYPE grpcsharp_init(void) { grpc_init(); }
@ -71,18 +206,6 @@ grpcsharp_completion_queue_create(void) {
return grpc_completion_queue_create();
}
GPR_EXPORT grpc_event *GPR_CALLTYPE
grpcsharp_completion_queue_next(grpc_completion_queue *cq,
gpr_timespec deadline) {
return grpc_completion_queue_next(cq, deadline);
}
GPR_EXPORT grpc_event *GPR_CALLTYPE
grpcsharp_completion_queue_pluck(grpc_completion_queue *cq, void *tag,
gpr_timespec deadline) {
return grpc_completion_queue_pluck(cq, tag, deadline);
}
GPR_EXPORT void GPR_CALLTYPE
grpcsharp_completion_queue_shutdown(grpc_completion_queue *cq) {
grpc_completion_queue_shutdown(cq);
@ -96,12 +219,18 @@ grpcsharp_completion_queue_destroy(grpc_completion_queue *cq) {
GPR_EXPORT grpc_completion_type GPR_CALLTYPE
grpcsharp_completion_queue_next_with_callback(grpc_completion_queue *cq) {
grpc_event *ev;
grpcsharp_batch_context *batch_context;
grpc_completion_type t;
void(GPR_CALLTYPE * callback)(grpc_event *);
ev = grpc_completion_queue_next(cq, gpr_inf_future);
t = ev->type;
if (ev->tag) {
if (t == GRPC_OP_COMPLETE && ev->tag) {
/* NEW API handler */
batch_context = (grpcsharp_batch_context *)ev->tag;
batch_context->callback(ev->data.op_complete, batch_context);
grpcsharp_batch_context_destroy(batch_context);
} else if (ev->tag) {
/* call the callback in ev->tag */
/* C forbids to cast object pointers to function pointers, so
* we cast to intptr first.
@ -129,204 +258,286 @@ GPR_EXPORT void GPR_CALLTYPE grpcsharp_channel_destroy(grpc_channel *channel) {
}
GPR_EXPORT grpc_call *GPR_CALLTYPE
grpcsharp_channel_create_call_old(grpc_channel *channel, const char *method,
const char *host, gpr_timespec deadline) {
return grpc_channel_create_call_old(channel, method, host, deadline);
grpcsharp_channel_create_call(grpc_channel *channel, grpc_completion_queue *cq,
const char *method, const char *host,
gpr_timespec deadline) {
return grpc_channel_create_call(channel, cq, method, host, deadline);
}
/* Event */
/* Timespec */
GPR_EXPORT void GPR_CALLTYPE grpcsharp_event_finish(grpc_event *event) {
grpc_event_finish(event);
}
GPR_EXPORT gpr_timespec GPR_CALLTYPE gprsharp_now(void) { return gpr_now(); }
GPR_EXPORT grpc_completion_type GPR_CALLTYPE
grpcsharp_event_type(const grpc_event *event) {
return event->type;
GPR_EXPORT gpr_timespec GPR_CALLTYPE gprsharp_inf_future(void) {
return gpr_inf_future;
}
GPR_EXPORT grpc_op_error GPR_CALLTYPE
grpcsharp_event_write_accepted(const grpc_event *event) {
GPR_ASSERT(event->type == GRPC_WRITE_ACCEPTED);
return event->data.invoke_accepted;
GPR_EXPORT gpr_int32 GPR_CALLTYPE gprsharp_sizeof_timespec(void) {
return sizeof(gpr_timespec);
}
GPR_EXPORT grpc_op_error GPR_CALLTYPE
grpcsharp_event_finish_accepted(const grpc_event *event) {
GPR_ASSERT(event->type == GRPC_FINISH_ACCEPTED);
return event->data.finish_accepted;
}
/* Call */
GPR_EXPORT grpc_status_code GPR_CALLTYPE
grpcsharp_event_finished_status(const grpc_event *event) {
GPR_ASSERT(event->type == GRPC_FINISHED);
return event->data.finished.status;
GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_cancel(grpc_call *call) {
return grpc_call_cancel(call);
}
GPR_EXPORT const char *GPR_CALLTYPE
grpcsharp_event_finished_details(const grpc_event *event) {
GPR_ASSERT(event->type == GRPC_FINISHED);
return event->data.finished.details;
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_cancel_with_status(grpc_call *call, grpc_status_code status,
const char *description) {
return grpc_call_cancel_with_status(call, status, description);
}
GPR_EXPORT gpr_intptr GPR_CALLTYPE
grpcsharp_event_read_length(const grpc_event *event) {
GPR_ASSERT(event->type == GRPC_READ);
if (!event->data.read) {
return -1;
}
return grpc_byte_buffer_length(event->data.read);
GPR_EXPORT void GPR_CALLTYPE grpcsharp_call_destroy(grpc_call *call) {
grpc_call_destroy(call);
}
/*
* Copies data from read event to a buffer. Fatal error occurs if
* buffer is too small.
*/
GPR_EXPORT void GPR_CALLTYPE
grpcsharp_event_read_copy_to_buffer(const grpc_event *event, char *buffer,
size_t buffer_len) {
grpc_byte_buffer_reader *reader;
gpr_slice slice;
size_t offset = 0;
grpcsharp_call_start_write_from_copied_buffer(grpc_call *call,
const char *buffer, size_t len,
void *tag, gpr_uint32 flags) {
grpc_byte_buffer *byte_buffer = string_to_byte_buffer(buffer, len);
GPR_ASSERT(grpc_call_start_write_old(call, byte_buffer, tag, flags) ==
GRPC_CALL_OK);
grpc_byte_buffer_destroy(byte_buffer);
}
GPR_ASSERT(event->type == GRPC_READ);
reader = grpc_byte_buffer_reader_create(event->data.read);
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_start_unary(grpc_call *call, callback_funcptr callback,
const char *send_buffer, size_t send_buffer_len) {
/* TODO: don't use magic number */
grpc_op ops[6];
grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
ctx->callback = callback;
GPR_ASSERT(event->data.read);
while (grpc_byte_buffer_reader_next(reader, &slice)) {
size_t len = GPR_SLICE_LENGTH(slice);
GPR_ASSERT(offset + len <= buffer_len);
memcpy(buffer + offset, GPR_SLICE_START_PTR(slice),
GPR_SLICE_LENGTH(slice));
offset += len;
gpr_slice_unref(slice);
}
grpc_byte_buffer_reader_destroy(reader);
}
/* TODO: implement sending the metadata... */
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
/* ctx->send_initial_metadata is already zeroed out. */
ops[0].data.send_initial_metadata.count = 0;
ops[0].data.send_initial_metadata.metadata = NULL;
GPR_EXPORT grpc_call *GPR_CALLTYPE
grpcsharp_event_call(const grpc_event *event) {
/* we only allow this for newly incoming server calls. */
GPR_ASSERT(event->type == GRPC_SERVER_RPC_NEW);
return event->call;
}
ops[1].op = GRPC_OP_SEND_MESSAGE;
ctx->send_message = string_to_byte_buffer(send_buffer, send_buffer_len);
ops[1].data.send_message = ctx->send_message;
GPR_EXPORT const char *GPR_CALLTYPE
grpcsharp_event_server_rpc_new_method(const grpc_event *event) {
GPR_ASSERT(event->type == GRPC_SERVER_RPC_NEW);
return event->data.server_rpc_new.method;
}
ops[2].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
/* Timespec */
ops[3].op = GRPC_OP_RECV_INITIAL_METADATA;
ops[3].data.recv_initial_metadata = &(ctx->recv_initial_metadata);
GPR_EXPORT gpr_timespec GPR_CALLTYPE gprsharp_now(void) { return gpr_now(); }
ops[4].op = GRPC_OP_RECV_MESSAGE;
ops[4].data.recv_message = &(ctx->recv_message);
GPR_EXPORT gpr_timespec GPR_CALLTYPE gprsharp_inf_future(void) {
return gpr_inf_future;
}
ops[5].op = GRPC_OP_RECV_STATUS_ON_CLIENT;
ops[5].data.recv_status_on_client.trailing_metadata =
&(ctx->recv_status_on_client.trailing_metadata);
ops[5].data.recv_status_on_client.status =
&(ctx->recv_status_on_client.status);
/* not using preallocation for status_details */
ops[5].data.recv_status_on_client.status_details =
&(ctx->recv_status_on_client.status_details);
ops[5].data.recv_status_on_client.status_details_capacity =
&(ctx->recv_status_on_client.status_details_capacity);
GPR_EXPORT gpr_int32 GPR_CALLTYPE gprsharp_sizeof_timespec(void) {
return sizeof(gpr_timespec);
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
}
/* Call */
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_add_metadata_old(grpc_call *call, grpc_metadata *metadata,
gpr_uint32 flags) {
return grpc_call_add_metadata_old(call, metadata, flags);
grpcsharp_call_start_client_streaming(grpc_call *call,
callback_funcptr callback) {
/* TODO: don't use magic number */
grpc_op ops[4];
grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
ctx->callback = callback;
/* TODO: implement sending the metadata... */
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
/* ctx->send_initial_metadata is already zeroed out. */
ops[0].data.send_initial_metadata.count = 0;
ops[0].data.send_initial_metadata.metadata = NULL;
ops[1].op = GRPC_OP_RECV_INITIAL_METADATA;
ops[1].data.recv_initial_metadata = &(ctx->recv_initial_metadata);
ops[2].op = GRPC_OP_RECV_MESSAGE;
ops[2].data.recv_message = &(ctx->recv_message);
ops[3].op = GRPC_OP_RECV_STATUS_ON_CLIENT;
ops[3].data.recv_status_on_client.trailing_metadata =
&(ctx->recv_status_on_client.trailing_metadata);
ops[3].data.recv_status_on_client.status =
&(ctx->recv_status_on_client.status);
/* not using preallocation for status_details */
ops[3].data.recv_status_on_client.status_details =
&(ctx->recv_status_on_client.status_details);
ops[3].data.recv_status_on_client.status_details_capacity =
&(ctx->recv_status_on_client.status_details_capacity);
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_invoke_old(grpc_call *call, grpc_completion_queue *cq,
void *metadata_read_tag, void *finished_tag,
gpr_uint32 flags) {
return grpc_call_invoke_old(call, cq, metadata_read_tag, finished_tag, flags);
grpcsharp_call_start_server_streaming(grpc_call *call,
callback_funcptr callback,
const char *send_buffer,
size_t send_buffer_len) {
/* TODO: don't use magic number */
grpc_op ops[5];
grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
ctx->callback = callback;
/* TODO: implement sending the metadata... */
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
/* ctx->send_initial_metadata is already zeroed out. */
ops[0].data.send_initial_metadata.count = 0;
ops[0].data.send_initial_metadata.metadata = NULL;
ops[1].op = GRPC_OP_SEND_MESSAGE;
ctx->send_message = string_to_byte_buffer(send_buffer, send_buffer_len);
ops[1].data.send_message = ctx->send_message;
ops[2].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
ops[3].op = GRPC_OP_RECV_INITIAL_METADATA;
ops[3].data.recv_initial_metadata = &(ctx->recv_initial_metadata);
ops[4].op = GRPC_OP_RECV_STATUS_ON_CLIENT;
ops[4].data.recv_status_on_client.trailing_metadata =
&(ctx->recv_status_on_client.trailing_metadata);
ops[4].data.recv_status_on_client.status =
&(ctx->recv_status_on_client.status);
/* not using preallocation for status_details */
ops[4].data.recv_status_on_client.status_details =
&(ctx->recv_status_on_client.status_details);
ops[4].data.recv_status_on_client.status_details_capacity =
&(ctx->recv_status_on_client.status_details_capacity);
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_server_accept_old(grpc_call *call, grpc_completion_queue *cq,
void *finished_tag) {
return grpc_call_server_accept_old(call, cq, finished_tag);
grpcsharp_call_start_duplex_streaming(grpc_call *call,
callback_funcptr callback) {
/* TODO: don't use magic number */
grpc_op ops[3];
grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
ctx->callback = callback;
/* TODO: implement sending the metadata... */
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
/* ctx->send_initial_metadata is already zeroed out. */
ops[0].data.send_initial_metadata.count = 0;
ops[0].data.send_initial_metadata.metadata = NULL;
ops[1].op = GRPC_OP_RECV_INITIAL_METADATA;
ops[1].data.recv_initial_metadata = &(ctx->recv_initial_metadata);
ops[2].op = GRPC_OP_RECV_STATUS_ON_CLIENT;
ops[2].data.recv_status_on_client.trailing_metadata =
&(ctx->recv_status_on_client.trailing_metadata);
ops[2].data.recv_status_on_client.status =
&(ctx->recv_status_on_client.status);
/* not using preallocation for status_details */
ops[2].data.recv_status_on_client.status_details =
&(ctx->recv_status_on_client.status_details);
ops[2].data.recv_status_on_client.status_details_capacity =
&(ctx->recv_status_on_client.status_details_capacity);
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_server_end_initial_metadata_old(grpc_call *call,
gpr_uint32 flags) {
return grpc_call_server_end_initial_metadata_old(call, flags);
}
grpcsharp_call_send_message(grpc_call *call, callback_funcptr callback,
const char *send_buffer, size_t send_buffer_len) {
/* TODO: don't use magic number */
grpc_op ops[1];
grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
ctx->callback = callback;
GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_cancel(grpc_call *call) {
return grpc_call_cancel(call);
}
ops[0].op = GRPC_OP_SEND_MESSAGE;
ctx->send_message = string_to_byte_buffer(send_buffer, send_buffer_len);
ops[0].data.send_message = ctx->send_message;
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_cancel_with_status(grpc_call *call, grpc_status_code status,
const char *description) {
return grpc_call_cancel_with_status(call, status, description);
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_start_write_old(grpc_call *call, grpc_byte_buffer *byte_buffer,
void *tag, gpr_uint32 flags) {
return grpc_call_start_write_old(call, byte_buffer, tag, flags);
grpcsharp_call_send_close_from_client(grpc_call *call,
callback_funcptr callback) {
/* TODO: don't use magic number */
grpc_op ops[1];
grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
ctx->callback = callback;
ops[0].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_start_write_status_old(grpc_call *call,
grpc_status_code status_code,
const char *status_message, void *tag) {
return grpc_call_start_write_status_old(call, status_code, status_message,
tag);
grpcsharp_call_send_status_from_server(grpc_call *call,
callback_funcptr callback,
grpc_status_code status_code,
const char *status_details) {
/* TODO: don't use magic number */
grpc_op ops[1];
grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
ctx->callback = callback;
ops[0].op = GRPC_OP_SEND_STATUS_FROM_SERVER;
ops[0].data.send_status_from_server.status = status_code;
ops[0].data.send_status_from_server.status_details =
gpr_strdup(status_details);
ops[0].data.send_status_from_server.trailing_metadata = NULL;
ops[0].data.send_status_from_server.trailing_metadata_count = 0;
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_writes_done_old(grpc_call *call, void *tag) {
return grpc_call_writes_done_old(call, tag);
grpcsharp_call_recv_message(grpc_call *call, callback_funcptr callback) {
/* TODO: don't use magic number */
grpc_op ops[1];
grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
ctx->callback = callback;
ops[0].op = GRPC_OP_RECV_MESSAGE;
ops[0].data.recv_message = &(ctx->recv_message);
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_start_read_old(grpc_call *call, void *tag) {
return grpc_call_start_read_old(call, tag);
}
grpcsharp_call_start_serverside(grpc_call *call, callback_funcptr callback) {
/* TODO: don't use magic number */
grpc_op ops[2];
GPR_EXPORT void GPR_CALLTYPE grpcsharp_call_destroy(grpc_call *call) {
grpc_call_destroy(call);
}
grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
ctx->callback = callback;
GPR_EXPORT void GPR_CALLTYPE
grpcsharp_call_start_write_from_copied_buffer(grpc_call *call,
const char *buffer, size_t len,
void *tag, gpr_uint32 flags) {
grpc_byte_buffer *byte_buffer = string_to_byte_buffer(buffer, len);
GPR_ASSERT(grpc_call_start_write_old(call, byte_buffer, tag, flags) ==
GRPC_CALL_OK);
grpc_byte_buffer_destroy(byte_buffer);
}
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
ops[0].data.send_initial_metadata.count = 0;
ops[0].data.send_initial_metadata.metadata = NULL;
/* Server */
ops[1].op = GRPC_OP_RECV_CLOSE_ON_SERVER;
ops[1].data.recv_close_on_server.cancelled =
(&ctx->recv_close_on_server_cancelled);
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_server_request_call_old(grpc_server *server, void *tag_new) {
return grpc_server_request_call_old(server, tag_new);
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
}
/* Server */
GPR_EXPORT grpc_server *GPR_CALLTYPE
grpcsharp_server_create(grpc_completion_queue *cq,
const grpc_channel_args *args) {
return grpc_server_create(cq, args);
}
GPR_EXPORT int GPR_CALLTYPE
GPR_EXPORT gpr_int32 GPR_CALLTYPE
grpcsharp_server_add_http2_port(grpc_server *server, const char *addr) {
return grpc_server_add_http2_port(server, addr);
}
GPR_EXPORT int GPR_CALLTYPE
grpcsharp_server_add_secure_http2_port(grpc_server *server, const char *addr) {
return grpc_server_add_secure_http2_port(server, addr);
}
GPR_EXPORT void GPR_CALLTYPE grpcsharp_server_start(grpc_server *server) {
grpc_server_start(server);
}
@ -343,3 +554,14 @@ grpcsharp_server_shutdown_and_notify(grpc_server *server, void *tag) {
GPR_EXPORT void GPR_CALLTYPE grpcsharp_server_destroy(grpc_server *server) {
grpc_server_destroy(server);
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_server_request_call(grpc_server *server, grpc_completion_queue *cq,
callback_funcptr callback) {
grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
ctx->callback = callback;
return grpc_server_request_call(
server, &(ctx->server_rpc_new.call), &(ctx->server_rpc_new.call_details),
&(ctx->server_rpc_new.request_metadata), cq, ctx);
}

@ -194,7 +194,9 @@ LDFLAGS += -g -fPIC
INCLUDES = . include $(GENDIR)
ifeq ($(SYSTEM),Darwin)
INCLUDES += /usr/local/ssl/include /opt/local/include
LIBS = m z
LDFLAGS += -L/usr/local/ssl/lib -L/opt/local/lib
else
LIBS = rt m z pthread
LDFLAGS += -pthread
@ -527,6 +529,15 @@ shared_cxx: \
% endfor
shared_csharp: shared_c \
% for lib in libs:
% if lib.build == 'all' and lib.language == 'csharp':
$(LIBDIR)/$(CONFIG)/lib${lib.name}.$(SHARED_EXT)\
% endif
% endfor
grpc_csharp_ext: shared_csharp
privatelibs: privatelibs_c privatelibs_cxx
privatelibs_c: \
@ -660,6 +671,18 @@ ifeq ($(CONFIG),opt)
% endfor
endif
strip-shared_csharp: shared_csharp
ifeq ($(CONFIG),opt)
% for lib in libs:
% if lib.language == "csharp":
% if lib.build == "all":
$(E) "[STRIP] Stripping lib${lib.name}.so"
$(Q) $(STRIP) $(LIBDIR)/$(CONFIG)/lib${lib.name}.$(SHARED_EXT)
% endif
% endif
% endfor
endif
% for p in protos:
ifeq ($(NO_PROTOC),true)
$(GENDIR)/${p}.pb.cc: protoc_dep_error
@ -699,6 +722,10 @@ install_c: install-headers_c install-static_c install-shared_c
install_cxx: install-headers_cxx install-static_cxx install-shared_cxx
install_csharp: install-shared_csharp install_c
install_grpc_csharp_ext: install_csharp
install-headers: install-headers_c install-headers_cxx
install-headers_c:
@ -779,6 +806,30 @@ ifneq ($(SYSTEM),Darwin)
endif
endif
install-shared_csharp: shared_csharp strip-shared_csharp
% for lib in libs:
% if lib.language == "csharp":
% if lib.build == "all":
ifeq ($(SYSTEM),MINGW32)
$(E) "[INSTALL] Installing ${lib.name}.$(SHARED_EXT)"
$(Q) $(INSTALL) $(LIBDIR)/$(CONFIG)/${lib.name}.$(SHARED_EXT) $(prefix)/lib/${lib.name}.$(SHARED_EXT)
$(Q) $(INSTALL) $(LIBDIR)/$(CONFIG)/lib${lib.name}-imp.a $(prefix)/lib/lib${lib.name}-imp.a
else
$(E) "[INSTALL] Installing lib${lib.name}.$(SHARED_EXT)"
$(Q) $(INSTALL) $(LIBDIR)/$(CONFIG)/lib${lib.name}.$(SHARED_EXT) $(prefix)/lib/lib${lib.name}.$(SHARED_EXT)
ifneq ($(SYSTEM),Darwin)
$(Q) ln -sf lib${lib.name}.$(SHARED_EXT) $(prefix)/lib/lib${lib.name}.so
endif
endif
% endif
% endif
% endfor
ifneq ($(SYSTEM),MINGW32)
ifneq ($(SYSTEM),Darwin)
$(Q) ldconfig
endif
endif
clean:
$(Q) $(RM) -rf $(OBJDIR) $(LIBDIR) $(BINDIR) $(GENDIR)

@ -128,7 +128,7 @@ static void test_create_channel_stack(void) {
grpc_channel_stack_destroy(channel_stack);
gpr_free(channel_stack);
grpc_mdctx_orphan(metadata_context);
grpc_mdctx_unref(metadata_context);
}
int main(int argc, char **argv) {

@ -182,7 +182,7 @@ static void test_case(size_t key_prefix_len, size_t value_prefix_len,
gpr_free(stk);
grpc_metadata_buffer_destroy(&buffer, GRPC_OP_OK);
grpc_mdctx_orphan(mdctx);
grpc_mdctx_unref(mdctx);
}
int main(int argc, char **argv) {

@ -138,7 +138,7 @@ static void test_oauth2_token_fetcher_creds_parsing_ok(void) {
GPR_ASSERT(!strcmp(grpc_mdstr_as_c_string(token_elem->value),
"Bearer ya29.AHES6ZRN3-HlhAPya30GnW_bHSb_"));
grpc_mdelem_unref(token_elem);
grpc_mdctx_orphan(ctx);
grpc_mdctx_unref(ctx);
}
static void test_oauth2_token_fetcher_creds_parsing_bad_http_status(void) {
@ -150,7 +150,7 @@ static void test_oauth2_token_fetcher_creds_parsing_bad_http_status(void) {
GPR_ASSERT(grpc_oauth2_token_fetcher_credentials_parse_server_response(
&response, ctx, &token_elem, &token_lifetime) ==
GRPC_CREDENTIALS_ERROR);
grpc_mdctx_orphan(ctx);
grpc_mdctx_unref(ctx);
}
static void test_oauth2_token_fetcher_creds_parsing_empty_http_body(void) {
@ -161,7 +161,7 @@ static void test_oauth2_token_fetcher_creds_parsing_empty_http_body(void) {
GPR_ASSERT(grpc_oauth2_token_fetcher_credentials_parse_server_response(
&response, ctx, &token_elem, &token_lifetime) ==
GRPC_CREDENTIALS_ERROR);
grpc_mdctx_orphan(ctx);
grpc_mdctx_unref(ctx);
}
static void test_oauth2_token_fetcher_creds_parsing_invalid_json(void) {
@ -176,7 +176,7 @@ static void test_oauth2_token_fetcher_creds_parsing_invalid_json(void) {
GPR_ASSERT(grpc_oauth2_token_fetcher_credentials_parse_server_response(
&response, ctx, &token_elem, &token_lifetime) ==
GRPC_CREDENTIALS_ERROR);
grpc_mdctx_orphan(ctx);
grpc_mdctx_unref(ctx);
}
static void test_oauth2_token_fetcher_creds_parsing_missing_token(void) {
@ -190,7 +190,7 @@ static void test_oauth2_token_fetcher_creds_parsing_missing_token(void) {
GPR_ASSERT(grpc_oauth2_token_fetcher_credentials_parse_server_response(
&response, ctx, &token_elem, &token_lifetime) ==
GRPC_CREDENTIALS_ERROR);
grpc_mdctx_orphan(ctx);
grpc_mdctx_unref(ctx);
}
static void test_oauth2_token_fetcher_creds_parsing_missing_token_type(void) {
@ -205,7 +205,7 @@ static void test_oauth2_token_fetcher_creds_parsing_missing_token_type(void) {
GPR_ASSERT(grpc_oauth2_token_fetcher_credentials_parse_server_response(
&response, ctx, &token_elem, &token_lifetime) ==
GRPC_CREDENTIALS_ERROR);
grpc_mdctx_orphan(ctx);
grpc_mdctx_unref(ctx);
}
static void test_oauth2_token_fetcher_creds_parsing_missing_token_lifetime(
@ -220,7 +220,7 @@ static void test_oauth2_token_fetcher_creds_parsing_missing_token_lifetime(
GPR_ASSERT(grpc_oauth2_token_fetcher_credentials_parse_server_response(
&response, ctx, &token_elem, &token_lifetime) ==
GRPC_CREDENTIALS_ERROR);
grpc_mdctx_orphan(ctx);
grpc_mdctx_unref(ctx);
}
static void check_metadata(expected_md *expected, grpc_mdelem **md_elems,

@ -214,7 +214,7 @@ static void test_vectors(grpc_slice_split_mode mode) {
"set-cookie",
"foo=ASDJKHQKBZXOQWEOPIUAXQWEOIU; max-age=3600; version=1", NULL);
grpc_chttp2_hpack_parser_destroy(&parser);
grpc_mdctx_orphan(mdctx);
grpc_mdctx_unref(mdctx);
}
int main(int argc, char **argv) {

@ -126,7 +126,7 @@ static void test_static_lookup(void) {
assert_index(&tbl, 61, "www-authenticate", "");
grpc_chttp2_hptbl_destroy(&tbl);
grpc_mdctx_orphan(mdctx);
grpc_mdctx_unref(mdctx);
}
static void test_many_additions(void) {
@ -158,7 +158,7 @@ static void test_many_additions(void) {
}
grpc_chttp2_hptbl_destroy(&tbl);
grpc_mdctx_orphan(mdctx);
grpc_mdctx_unref(mdctx);
}
static grpc_chttp2_hptbl_find_result find_simple(grpc_chttp2_hptbl *tbl,
@ -262,7 +262,7 @@ static void test_find(void) {
GPR_ASSERT(r.has_value == 0);
grpc_chttp2_hptbl_destroy(&tbl);
grpc_mdctx_orphan(mdctx);
grpc_mdctx_unref(mdctx);
}
int main(int argc, char **argv) {

@ -309,7 +309,7 @@ static void run_test(void (*test)(), const char *name) {
grpc_sopb_init(&g_sopb);
test();
grpc_chttp2_hpack_compressor_destroy(&g_compressor);
grpc_mdctx_orphan(g_mdctx);
grpc_mdctx_unref(g_mdctx);
grpc_sopb_destroy(&g_sopb);
}

@ -44,7 +44,7 @@
#define LOG_TEST() gpr_log(GPR_INFO, "%s", __FUNCTION__)
/* a large number */
#define MANY 100000
#define MANY 10000
static void test_no_op(void) {
grpc_mdctx *ctx;
@ -52,7 +52,7 @@ static void test_no_op(void) {
LOG_TEST();
ctx = grpc_mdctx_create();
grpc_mdctx_orphan(ctx);
grpc_mdctx_unref(ctx);
}
static void test_create_string(void) {
@ -71,7 +71,7 @@ static void test_create_string(void) {
GPR_ASSERT(gpr_slice_str_cmp(s3->slice, "very much not hello") == 0);
grpc_mdstr_unref(s1);
grpc_mdstr_unref(s2);
grpc_mdctx_orphan(ctx);
grpc_mdctx_unref(ctx);
grpc_mdstr_unref(s3);
}
@ -95,7 +95,7 @@ static void test_create_metadata(void) {
grpc_mdelem_unref(m1);
grpc_mdelem_unref(m2);
grpc_mdelem_unref(m3);
grpc_mdctx_orphan(ctx);
grpc_mdctx_unref(ctx);
}
static void test_create_many_ephemeral_metadata(void) {
@ -116,7 +116,7 @@ static void test_create_many_ephemeral_metadata(void) {
/* capacity should not grow */
GPR_ASSERT(mdtab_capacity_before ==
grpc_mdctx_get_mdtab_capacity_test_only(ctx));
grpc_mdctx_orphan(ctx);
grpc_mdctx_unref(ctx);
}
static void test_create_many_persistant_metadata(void) {
@ -145,7 +145,7 @@ static void test_create_many_persistant_metadata(void) {
for (i = 0; i < MANY; i++) {
grpc_mdelem_unref(created[i]);
}
grpc_mdctx_orphan(ctx);
grpc_mdctx_unref(ctx);
gpr_free(created);
}
@ -171,7 +171,7 @@ static void test_spin_creating_the_same_thing(void) {
GPR_ASSERT(grpc_mdctx_get_mdtab_count_test_only(ctx) == 1);
GPR_ASSERT(grpc_mdctx_get_mdtab_free_test_only(ctx) == 1);
grpc_mdctx_orphan(ctx);
grpc_mdctx_unref(ctx);
}
static void test_things_stick_around(void) {
@ -218,7 +218,7 @@ static void test_things_stick_around(void) {
}
}
grpc_mdctx_orphan(ctx);
grpc_mdctx_unref(ctx);
gpr_free(strs);
gpr_free(shuf);
}
@ -245,7 +245,7 @@ static void test_slices_work(void) {
gpr_slice_unref(slice);
grpc_mdstr_unref(str);
grpc_mdctx_orphan(ctx);
grpc_mdctx_unref(ctx);
}
static void test_base64_and_huffman_works(void) {
@ -264,7 +264,7 @@ static void test_base64_and_huffman_works(void) {
gpr_slice_unref(slice2);
grpc_mdstr_unref(str);
grpc_mdctx_orphan(ctx);
grpc_mdctx_unref(ctx);
}
int main(int argc, char **argv) {

@ -927,7 +927,7 @@ void grpc_transport_end2end_tests(grpc_transport_test_config *config) {
test_request_with_flow_ctl_cb(config, interesting_message_lengths[i]);
}
grpc_mdctx_orphan(g_metadata_context);
grpc_mdctx_unref(g_metadata_context);
gpr_log(GPR_INFO, "tests completed ok");
}

@ -38,6 +38,7 @@
#include "test/cpp/util/echo_duplicate.pb.h"
#include "test/cpp/util/echo.pb.h"
#include "src/cpp/util/time.h"
#include <grpc++/async_unary_call.h>
#include <grpc++/channel_arguments.h>
#include <grpc++/channel_interface.h>
#include <grpc++/client_context.h>
@ -134,21 +135,23 @@ class AsyncEnd2endTest : public ::testing::Test {
grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
send_request.set_message("Hello");
stub_->Echo(
&cli_ctx, send_request, &recv_response, &recv_status, &cli_cq_, tag(1));
std::unique_ptr<ClientAsyncResponseReader<EchoResponse> >
response_reader(stub_->Echo(
&cli_ctx, send_request, &cli_cq_, tag(1)));
service_.RequestEcho(
&srv_ctx, &recv_request, &response_writer, &srv_cq_, tag(2));
server_ok(2);
EXPECT_EQ(send_request.message(), recv_request.message());
client_ok(1);
send_response.set_message(recv_request.message());
response_writer.Finish(send_response, Status::OK, tag(3));
server_ok(3);
client_ok(1);
response_reader->Finish(&recv_response, &recv_status, tag(4));
client_ok(4);
EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.IsOk());
@ -351,8 +354,8 @@ TEST_F(AsyncEnd2endTest, ClientInitialMetadataRpc) {
cli_ctx.AddMetadata(meta1.first, meta1.second);
cli_ctx.AddMetadata(meta2.first, meta2.second);
stub_->Echo(
&cli_ctx, send_request, &recv_response, &recv_status, &cli_cq_, tag(1));
std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader(
stub_->Echo(&cli_ctx, send_request, &cli_cq_, tag(1)));
service_.RequestEcho(
&srv_ctx, &recv_request, &response_writer, &srv_cq_, tag(2));
@ -362,13 +365,15 @@ TEST_F(AsyncEnd2endTest, ClientInitialMetadataRpc) {
EXPECT_EQ(meta1.second, client_initial_metadata.find(meta1.first)->second);
EXPECT_EQ(meta2.second, client_initial_metadata.find(meta2.first)->second);
EXPECT_EQ(2, client_initial_metadata.size());
client_ok(1);
send_response.set_message(recv_request.message());
response_writer.Finish(send_response, Status::OK, tag(3));
server_ok(3);
client_ok(1);
response_reader->Finish(&recv_response, &recv_status, tag(4));
client_ok(4);
EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.IsOk());
@ -391,8 +396,8 @@ TEST_F(AsyncEnd2endTest, ServerInitialMetadataRpc) {
std::pair<grpc::string, grpc::string> meta1("key1", "val1");
std::pair<grpc::string, grpc::string> meta2("key2", "val2");
stub_->Echo(
&cli_ctx, send_request, &recv_response, &recv_status, &cli_cq_, tag(1));
std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader(
stub_->Echo(&cli_ctx, send_request, &cli_cq_, tag(1)));
service_.RequestEcho(
&srv_ctx, &recv_request, &response_writer, &srv_cq_, tag(2));
@ -400,22 +405,26 @@ TEST_F(AsyncEnd2endTest, ServerInitialMetadataRpc) {
EXPECT_EQ(send_request.message(), recv_request.message());
srv_ctx.AddInitialMetadata(meta1.first, meta1.second);
srv_ctx.AddInitialMetadata(meta2.first, meta2.second);
client_ok(1);
response_writer.SendInitialMetadata(tag(3));
server_ok(3);
send_response.set_message(recv_request.message());
response_writer.Finish(send_response, Status::OK, tag(4));
response_reader->ReadInitialMetadata(tag(4));
client_ok(4);
auto server_initial_metadata = cli_ctx.GetServerInitialMetadata();
EXPECT_EQ(meta1.second, server_initial_metadata.find(meta1.first)->second);
EXPECT_EQ(meta2.second, server_initial_metadata.find(meta2.first)->second);
EXPECT_EQ(2, server_initial_metadata.size());
server_ok(4);
send_response.set_message(recv_request.message());
response_writer.Finish(send_response, Status::OK, tag(5));
server_ok(5);
client_ok(1);
response_reader->Finish(&recv_response, &recv_status, tag(6));
client_ok(6);
EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.IsOk());
auto server_initial_metadata = cli_ctx.GetServerInitialMetadata();
EXPECT_EQ(meta1.second, server_initial_metadata.find(meta1.first)->second);
EXPECT_EQ(meta2.second, server_initial_metadata.find(meta2.first)->second);
EXPECT_EQ(2, server_initial_metadata.size());
}
TEST_F(AsyncEnd2endTest, ServerTrailingMetadataRpc) {
@ -435,8 +444,8 @@ TEST_F(AsyncEnd2endTest, ServerTrailingMetadataRpc) {
std::pair<grpc::string, grpc::string> meta1("key1", "val1");
std::pair<grpc::string, grpc::string> meta2("key2", "val2");
stub_->Echo(
&cli_ctx, send_request, &recv_response, &recv_status, &cli_cq_, tag(1));
std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader(
stub_->Echo(&cli_ctx, send_request, &cli_cq_, tag(1)));
service_.RequestEcho(
&srv_ctx, &recv_request, &response_writer, &srv_cq_, tag(2));
@ -444,6 +453,7 @@ TEST_F(AsyncEnd2endTest, ServerTrailingMetadataRpc) {
EXPECT_EQ(send_request.message(), recv_request.message());
response_writer.SendInitialMetadata(tag(3));
server_ok(3);
client_ok(1);
send_response.set_message(recv_request.message());
srv_ctx.AddTrailingMetadata(meta1.first, meta1.second);
@ -452,8 +462,9 @@ TEST_F(AsyncEnd2endTest, ServerTrailingMetadataRpc) {
server_ok(4);
client_ok(1);
response_reader->Finish(&recv_response, &recv_status, tag(5));
client_ok(5);
EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.IsOk());
auto server_trailing_metadata = cli_ctx.GetServerTrailingMetadata();
@ -477,17 +488,20 @@ TEST_F(AsyncEnd2endTest, MetadataRpc) {
send_request.set_message("Hello");
std::pair<grpc::string, grpc::string> meta1("key1", "val1");
std::pair<grpc::string, grpc::string> meta2("key2-bin", {"\xc0\xc1\xc2\xc3\xc4\xc5\xc6\xc7\xc8\xc9\xca\xcb\xcc", 13});
std::pair<grpc::string, grpc::string> meta2(
"key2-bin", {"\xc0\xc1\xc2\xc3\xc4\xc5\xc6\xc7\xc8\xc9\xca\xcb\xcc", 13});
std::pair<grpc::string, grpc::string> meta3("key3", "val3");
std::pair<grpc::string, grpc::string> meta6("key4-bin", {"\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d", 14});
std::pair<grpc::string, grpc::string> meta6("key4-bin",
{"\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d", 14});
std::pair<grpc::string, grpc::string> meta5("key5", "val5");
std::pair<grpc::string, grpc::string> meta4("key6-bin", {"\xe0\xe1\xe2\xe3\xe4\xe5\xe6\xe7\xe8\xe9\xea\xeb\xec\xed\xee", 15});
std::pair<grpc::string, grpc::string> meta4("key6-bin",
{"\xe0\xe1\xe2\xe3\xe4\xe5\xe6\xe7\xe8\xe9\xea\xeb\xec\xed\xee", 15});
cli_ctx.AddMetadata(meta1.first, meta1.second);
cli_ctx.AddMetadata(meta2.first, meta2.second);
stub_->Echo(
&cli_ctx, send_request, &recv_response, &recv_status, &cli_cq_, tag(1));
std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader(
stub_->Echo(&cli_ctx, send_request, &cli_cq_, tag(1)));
service_.RequestEcho(
&srv_ctx, &recv_request, &response_writer, &srv_cq_, tag(2));
@ -497,27 +511,31 @@ TEST_F(AsyncEnd2endTest, MetadataRpc) {
EXPECT_EQ(meta1.second, client_initial_metadata.find(meta1.first)->second);
EXPECT_EQ(meta2.second, client_initial_metadata.find(meta2.first)->second);
EXPECT_EQ(2, client_initial_metadata.size());
client_ok(1);
srv_ctx.AddInitialMetadata(meta3.first, meta3.second);
srv_ctx.AddInitialMetadata(meta4.first, meta4.second);
response_writer.SendInitialMetadata(tag(3));
server_ok(3);
response_reader->ReadInitialMetadata(tag(4));
client_ok(4);
auto server_initial_metadata = cli_ctx.GetServerInitialMetadata();
EXPECT_EQ(meta3.second, server_initial_metadata.find(meta3.first)->second);
EXPECT_EQ(meta4.second, server_initial_metadata.find(meta4.first)->second);
EXPECT_EQ(2, server_initial_metadata.size());
send_response.set_message(recv_request.message());
srv_ctx.AddTrailingMetadata(meta5.first, meta5.second);
srv_ctx.AddTrailingMetadata(meta6.first, meta6.second);
response_writer.Finish(send_response, Status::OK, tag(4));
response_writer.Finish(send_response, Status::OK, tag(5));
server_ok(4);
server_ok(5);
client_ok(1);
response_reader->Finish(&recv_response, &recv_status, tag(6));
client_ok(6);
EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.IsOk());
auto server_initial_metadata = cli_ctx.GetServerInitialMetadata();
EXPECT_EQ(meta3.second, server_initial_metadata.find(meta3.first)->second);
EXPECT_EQ(meta4.second, server_initial_metadata.find(meta4.first)->second);
EXPECT_EQ(2, server_initial_metadata.size());
auto server_trailing_metadata = cli_ctx.GetServerTrailingMetadata();
EXPECT_EQ(meta5.second, server_trailing_metadata.find(meta5.first)->second);
EXPECT_EQ(meta6.second, server_trailing_metadata.find(meta6.first)->second);

@ -62,15 +62,18 @@ class SimpleConfig(object):
# ValgrindConfig: compile with some CONFIG=config, but use valgrind to run
class ValgrindConfig(object):
def __init__(self, config, tool):
def __init__(self, config, tool, args=[]):
self.build_config = config
self.tool = tool
self.args = args
self.maxjobs = 2 * multiprocessing.cpu_count()
self.allow_hashing = False
def job_spec(self, binary, hash_targets):
return jobset.JobSpec(cmdline=['valgrind', '--tool=%s' % self.tool, binary],
hash_targets=None)
return jobset.JobSpec(cmdline=['valgrind', '--tool=%s' % self.tool] +
self.args + [binary],
shortname='valgrind %s' % binary,
hash_targets=None)
class CLanguage(object):
@ -144,7 +147,7 @@ _CONFIGS = {
'asan': SimpleConfig('asan', environ={
'ASAN_OPTIONS': 'detect_leaks=1:color=always:suppressions=tools/tsan_suppressions.txt'}),
'gcov': SimpleConfig('gcov'),
'memcheck': ValgrindConfig('valgrind', 'memcheck'),
'memcheck': ValgrindConfig('valgrind', 'memcheck', ['--leak-check=full']),
'helgrind': ValgrindConfig('dbg', 'helgrind')
}

Loading…
Cancel
Save