Merge branch 'master' into nocopyinterception

pull/17630/head
Yash Tibrewal 6 years ago
commit dd067fd390
  1. 4
      BUILD
  2. 2
      CMakeLists.txt
  3. 4
      Makefile
  4. 4
      build.yaml
  5. 3
      doc/g_stands_for.md
  6. 4
      gRPC-C++.podspec
  7. 2
      gRPC-Core.podspec
  8. 2
      gRPC-ProtoRPC.podspec
  9. 2
      gRPC-RxLibrary.podspec
  10. 2
      gRPC.podspec
  11. 46
      include/grpcpp/impl/codegen/call_op_set.h
  12. 16
      include/grpcpp/impl/codegen/interceptor.h
  13. 45
      include/grpcpp/impl/codegen/interceptor_common.h
  14. 2
      include/grpcpp/impl/codegen/server_interface.h
  15. 4
      package.xml
  16. 2
      src/core/lib/surface/version.cc
  17. 2
      src/cpp/common/version_cc.cc
  18. 4
      src/cpp/server/server_cc.cc
  19. 2
      src/csharp/Grpc.Core/Version.csproj.include
  20. 4
      src/csharp/Grpc.Core/VersionInfo.cs
  21. 2
      src/csharp/build_packages_dotnetcli.bat
  22. 2
      src/csharp/build_unitypackage.bat
  23. 2
      src/objective-c/!ProtoCompiler-gRPCPlugin.podspec
  24. 2
      src/objective-c/GRPCClient/private/version.h
  25. 2
      src/objective-c/tests/version.h
  26. 2
      src/php/composer.json
  27. 2
      src/php/ext/grpc/version.h
  28. 2
      src/python/grpcio/grpc/_grpcio_metadata.py
  29. 2
      src/python/grpcio/grpc_version.py
  30. 2
      src/python/grpcio_channelz/grpc_version.py
  31. 2
      src/python/grpcio_health_checking/grpc_version.py
  32. 2
      src/python/grpcio_reflection/grpc_version.py
  33. 2
      src/python/grpcio_status/grpc_version.py
  34. 2
      src/python/grpcio_testing/grpc_version.py
  35. 2
      src/python/grpcio_tests/grpc_version.py
  36. 4
      src/python/grpcio_tests/tests/unit/_cython/_fork_test.py
  37. 104
      src/python/grpcio_tests/tests/unit/_logging_test.py
  38. 2
      src/ruby/lib/grpc/version.rb
  39. 2
      src/ruby/tools/version.rb
  40. 285
      test/cpp/end2end/client_interceptors_end2end_test.cc
  41. 10
      test/cpp/end2end/interceptors_util.cc
  42. 3
      test/cpp/end2end/interceptors_util.h
  43. 118
      test/cpp/qps/client.h
  44. 201
      test/cpp/qps/client_callback.cc
  45. 2
      tools/distrib/python/grpcio_tools/grpc_version.py
  46. 2
      tools/dockerfile/interoptest/grpc_interop_ruby/build_interop.sh
  47. 2
      tools/doxygen/Doxyfile.c++
  48. 2
      tools/doxygen/Doxyfile.c++.internal

@ -64,11 +64,11 @@ config_setting(
)
# This should be updated along with build.yaml
g_stands_for = "goose"
g_stands_for = "gold"
core_version = "7.0.0-dev"
version = "1.18.0-dev"
version = "1.19.0-dev"
GPR_PUBLIC_HDRS = [
"include/grpc/support/alloc.h",

@ -24,7 +24,7 @@
cmake_minimum_required(VERSION 2.8)
set(PACKAGE_NAME "grpc")
set(PACKAGE_VERSION "1.18.0-dev")
set(PACKAGE_VERSION "1.19.0-dev")
set(PACKAGE_STRING "${PACKAGE_NAME} ${PACKAGE_VERSION}")
set(PACKAGE_TARNAME "${PACKAGE_NAME}-${PACKAGE_VERSION}")
set(PACKAGE_BUGREPORT "https://github.com/grpc/grpc/issues/")

@ -438,8 +438,8 @@ Q = @
endif
CORE_VERSION = 7.0.0-dev
CPP_VERSION = 1.18.0-dev
CSHARP_VERSION = 1.18.0-dev
CPP_VERSION = 1.19.0-dev
CSHARP_VERSION = 1.19.0-dev
CPPFLAGS_NO_ARCH += $(addprefix -I, $(INCLUDES)) $(addprefix -D, $(DEFINES))
CPPFLAGS += $(CPPFLAGS_NO_ARCH) $(ARCH_FLAGS)

@ -13,8 +13,8 @@ settings:
'#09': Per-language overrides are possible with (eg) ruby_version tag here
'#10': See the expand_version.py for all the quirks here
core_version: 7.0.0-dev
g_stands_for: goose
version: 1.18.0-dev
g_stands_for: gold
version: 1.19.0-dev
filegroups:
- name: alts_proto
headers:

@ -17,4 +17,5 @@
- 1.15 'g' stands for ['glider'](https://github.com/grpc/grpc/tree/v1.15.x)
- 1.16 'g' stands for ['gao'](https://github.com/grpc/grpc/tree/v1.16.x)
- 1.17 'g' stands for ['gizmo'](https://github.com/grpc/grpc/tree/v1.17.x)
- 1.18 'g' stands for ['goose'](https://github.com/grpc/grpc/tree/master)
- 1.18 'g' stands for ['goose'](https://github.com/grpc/grpc/tree/v1.18.x)
- 1.19 'g' stands for ['gold'](https://github.com/grpc/grpc/tree/master)

@ -23,7 +23,7 @@
Pod::Spec.new do |s|
s.name = 'gRPC-C++'
# TODO (mxyan): use version that match gRPC version when pod is stabilized
# version = '1.18.0-dev'
# version = '1.19.0-dev'
version = '0.0.6-dev'
s.version = version
s.summary = 'gRPC C++ library'
@ -31,7 +31,7 @@ Pod::Spec.new do |s|
s.license = 'Apache License, Version 2.0'
s.authors = { 'The gRPC contributors' => 'grpc-packages@google.com' }
grpc_version = '1.18.0-dev'
grpc_version = '1.19.0-dev'
s.source = {
:git => 'https://github.com/grpc/grpc.git',

@ -22,7 +22,7 @@
Pod::Spec.new do |s|
s.name = 'gRPC-Core'
version = '1.18.0-dev'
version = '1.19.0-dev'
s.version = version
s.summary = 'Core cross-platform gRPC library, written in C'
s.homepage = 'https://grpc.io'

@ -21,7 +21,7 @@
Pod::Spec.new do |s|
s.name = 'gRPC-ProtoRPC'
version = '1.18.0-dev'
version = '1.19.0-dev'
s.version = version
s.summary = 'RPC library for Protocol Buffers, based on gRPC'
s.homepage = 'https://grpc.io'

@ -21,7 +21,7 @@
Pod::Spec.new do |s|
s.name = 'gRPC-RxLibrary'
version = '1.18.0-dev'
version = '1.19.0-dev'
s.version = version
s.summary = 'Reactive Extensions library for iOS/OSX.'
s.homepage = 'https://grpc.io'

@ -20,7 +20,7 @@
Pod::Spec.new do |s|
s.name = 'gRPC'
version = '1.18.0-dev'
version = '1.19.0-dev'
s.version = version
s.summary = 'gRPC client library for iOS/OSX'
s.homepage = 'https://grpc.io'

@ -317,11 +317,15 @@ class CallOpSendMessage {
protected:
void AddOp(grpc_op* ops, size_t* nops) {
if ((msg_ == nullptr && !send_buf_.Valid()) || hijacked_) return;
if (msg_ == nullptr && !send_buf_.Valid()) return;
if (hijacked_) {
serializer_ = nullptr;
return;
}
if (msg_ != nullptr) {
GPR_CODEGEN_ASSERT(serializer_(msg_).ok());
serializer_ = nullptr;
}
serializer_ = nullptr;
grpc_op* op = &ops[(*nops)++];
op->op = GRPC_OP_SEND_MESSAGE;
op->flags = write_options_.flags();
@ -330,21 +334,38 @@ class CallOpSendMessage {
// Flags are per-message: clear them after use.
write_options_.Clear();
}
void FinishOp(bool* status) { send_buf_.Clear(); }
void FinishOp(bool* status) {
if (msg_ == nullptr && !send_buf_.Valid()) return;
if (hijacked_ && failed_send_) {
// Hijacking interceptor failed this Op
*status = false;
} else if (!*status) {
// This Op was passed down to core and the Op failed
failed_send_ = true;
}
}
void SetInterceptionHookPoint(
InterceptorBatchMethodsImpl* interceptor_methods) {
if (msg_ == nullptr && !send_buf_.Valid()) return;
interceptor_methods->AddInterceptionHookPoint(
experimental::InterceptionHookPoints::PRE_SEND_MESSAGE);
interceptor_methods->SetSendMessage(&send_buf_, &msg_, serializer_);
interceptor_methods->SetSendMessage(&send_buf_, &msg_, &failed_send_,
serializer_);
}
void SetFinishInterceptionHookPoint(
InterceptorBatchMethodsImpl* interceptor_methods) {
if (msg_ != nullptr || send_buf_.Valid()) {
interceptor_methods->AddInterceptionHookPoint(
experimental::InterceptionHookPoints::POST_SEND_MESSAGE);
}
send_buf_.Clear();
msg_ = nullptr;
// The contents of the SendMessage value that was previously set
// has had its references stolen by core's operations
interceptor_methods->SetSendMessage(nullptr, nullptr, nullptr);
interceptor_methods->SetSendMessage(nullptr, nullptr, &failed_send_,
nullptr);
}
void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
@ -354,6 +375,7 @@ class CallOpSendMessage {
private:
const void* msg_ = nullptr; // The original non-serialized message
bool hijacked_ = false;
bool failed_send_ = false;
ByteBuffer send_buf_;
WriteOptions write_options_;
std::function<Status(const void*)> serializer_;
@ -379,6 +401,7 @@ Status CallOpSendMessage::SendMessage(const M& message, WriteOptions options) {
// Serialize immediately only if we do not have access to the message pointer
if (msg_ == nullptr) {
return serializer_(&message);
serializer_ = nullptr;
}
return Status();
}
@ -449,14 +472,16 @@ class CallOpRecvMessage {
void SetInterceptionHookPoint(
InterceptorBatchMethodsImpl* interceptor_methods) {
interceptor_methods->SetRecvMessage(message_);
if (message_ == nullptr) return;
interceptor_methods->SetRecvMessage(message_, &got_message);
}
void SetFinishInterceptionHookPoint(
InterceptorBatchMethodsImpl* interceptor_methods) {
if (!got_message) return;
if (message_ == nullptr) return;
interceptor_methods->AddInterceptionHookPoint(
experimental::InterceptionHookPoints::POST_RECV_MESSAGE);
if (!got_message) interceptor_methods->SetRecvMessage(nullptr, nullptr);
}
void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
hijacked_ = true;
@ -544,20 +569,23 @@ class CallOpGenericRecvMessage {
void SetInterceptionHookPoint(
InterceptorBatchMethodsImpl* interceptor_methods) {
interceptor_methods->SetRecvMessage(message_);
if (!deserialize_) return;
interceptor_methods->SetRecvMessage(message_, &got_message);
}
void SetFinishInterceptionHookPoint(
InterceptorBatchMethodsImpl* interceptor_methods) {
if (!got_message) return;
if (!deserialize_) return;
interceptor_methods->AddInterceptionHookPoint(
experimental::InterceptionHookPoints::POST_RECV_MESSAGE);
if (!got_message) interceptor_methods->SetRecvMessage(nullptr, nullptr);
}
void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
hijacked_ = true;
if (!deserialize_) return;
interceptor_methods->AddInterceptionHookPoint(
experimental::InterceptionHookPoints::PRE_RECV_MESSAGE);
got_message = true;
}
private:

@ -46,9 +46,10 @@ namespace experimental {
/// operation has been requested and it is available. POST_RECV means that a
/// result is available but has not yet been passed back to the application.
enum class InterceptionHookPoints {
/// The first two in this list are for clients and servers
/// The first three in this list are for clients and servers
PRE_SEND_INITIAL_METADATA,
PRE_SEND_MESSAGE,
POST_SEND_MESSAGE,
PRE_SEND_STATUS, // server only
PRE_SEND_CLOSE, // client only: WritesDone for stream; after write in unary
/// The following three are for hijacked clients only and can only be
@ -119,6 +120,10 @@ class InterceptorBatchMethods {
virtual void ModifySendMessage(const void* message) = 0;
/// Checks whether the SEND MESSAGE op succeeded. Valid for POST_SEND_MESSAGE
/// interceptions.
virtual bool GetSendMessageStatus() = 0;
/// Returns a modifiable multimap of the initial metadata to be sent. Valid
/// for PRE_SEND_INITIAL_METADATA interceptions. A value of nullptr indicates
/// that this field is not valid.
@ -164,6 +169,15 @@ class InterceptorBatchMethods {
/// started from interceptors without infinite regress through the interceptor
/// list.
virtual std::unique_ptr<ChannelInterface> GetInterceptedChannel() = 0;
/// On a hijacked RPC, an interceptor can decide to fail a PRE_RECV_MESSAGE
/// op. This would be a signal to the reader that there will be no more
/// messages, or the stream has failed or been cancelled.
virtual void FailHijackedRecvMessage() = 0;
/// On a hijacked RPC/ to-be hijacked RPC, this can be called to fail a SEND
/// MESSAGE op
virtual void FailHijackedSendMessage() = 0;
};
/// Interface for an interceptor. Interceptor authors must create a class

@ -98,6 +98,10 @@ class InterceptorBatchMethodsImpl
*orig_send_message_ = message;
}
bool GetSendMessageStatus() override {
return !*fail_send_message_;
}
std::multimap<grpc::string, grpc::string>* GetSendInitialMetadata() override {
return send_initial_metadata_;
}
@ -127,15 +131,23 @@ class InterceptorBatchMethodsImpl
Status* GetRecvStatus() override { return recv_status_; }
void FailHijackedSendMessage() override {
GPR_CODEGEN_ASSERT(hooks_[static_cast<size_t>(
experimental::InterceptionHookPoints::PRE_SEND_MESSAGE)]);
*fail_send_message_ = true;
}
std::multimap<grpc::string_ref, grpc::string_ref>* GetRecvTrailingMetadata()
override {
return recv_trailing_metadata_->map();
}
void SetSendMessage(ByteBuffer* buf, const void** msg,
bool* fail_send_message,
std::function<Status(const void*)> serializer) {
send_message_ = buf;
orig_send_message_ = msg;
fail_send_message_ = fail_send_message;
serializer_ = serializer;
}
@ -156,7 +168,10 @@ class InterceptorBatchMethodsImpl
send_trailing_metadata_ = metadata;
}
void SetRecvMessage(void* message) { recv_message_ = message; }
void SetRecvMessage(void* message, bool* got_message) {
recv_message_ = message;
got_message_ = got_message;
}
void SetRecvInitialMetadata(MetadataMap* map) {
recv_initial_metadata_ = map;
@ -179,6 +194,12 @@ class InterceptorBatchMethodsImpl
info->channel(), current_interceptor_index_ + 1));
}
void FailHijackedRecvMessage() override {
GPR_CODEGEN_ASSERT(hooks_[static_cast<size_t>(
experimental::InterceptionHookPoints::PRE_RECV_MESSAGE)]);
*got_message_ = false;
}
// Clears all state
void ClearState() {
reverse_ = false;
@ -356,6 +377,7 @@ class InterceptorBatchMethodsImpl
std::function<void(void)> callback_;
ByteBuffer* send_message_ = nullptr;
bool* fail_send_message_ = nullptr;
const void** orig_send_message_ = nullptr;
std::function<Status(const void*)> serializer_;
@ -369,6 +391,7 @@ class InterceptorBatchMethodsImpl
std::multimap<grpc::string, grpc::string>* send_trailing_metadata_ = nullptr;
void* recv_message_ = nullptr;
bool* got_message_ = nullptr;
MetadataMap* recv_initial_metadata_ = nullptr;
@ -410,6 +433,14 @@ class CancelInterceptorBatchMethods
return nullptr;
}
bool GetSendMessageStatus() override {
GPR_CODEGEN_ASSERT(
false &&
"It is illegal to call GetSendMessageStatus on a method which "
"has a Cancel notification");
return false;
}
const void* GetSendMessage() override {
GPR_CODEGEN_ASSERT(
false &&
@ -490,6 +521,18 @@ class CancelInterceptorBatchMethods
"method which has a Cancel notification");
return std::unique_ptr<ChannelInterface>(nullptr);
}
void FailHijackedRecvMessage() override {
GPR_CODEGEN_ASSERT(false &&
"It is illegal to call FailHijackedRecvMessage on a "
"method which has a Cancel notification");
}
void FailHijackedSendMessage() override {
GPR_CODEGEN_ASSERT(false &&
"It is illegal to call FailHijackedSendMessage on a "
"method which has a Cancel notification");
}
};
} // namespace internal
} // namespace grpc

@ -272,7 +272,7 @@ class ServerInterface : public internal::CallHook {
/* Set interception point for recv message */
interceptor_methods_.AddInterceptionHookPoint(
experimental::InterceptionHookPoints::POST_RECV_MESSAGE);
interceptor_methods_.SetRecvMessage(request_);
interceptor_methods_.SetRecvMessage(request_, nullptr);
return RegisteredAsyncRequest::FinalizeResult(tag, status);
}

@ -13,8 +13,8 @@
<date>2018-01-19</date>
<time>16:06:07</time>
<version>
<release>1.18.0dev</release>
<api>1.18.0dev</api>
<release>1.19.0dev</release>
<api>1.19.0dev</api>
</version>
<stability>
<release>beta</release>

@ -25,4 +25,4 @@
const char* grpc_version_string(void) { return "7.0.0-dev"; }
const char* grpc_g_stands_for(void) { return "goose"; }
const char* grpc_g_stands_for(void) { return "gold"; }

@ -22,5 +22,5 @@
#include <grpcpp/grpcpp.h>
namespace grpc {
grpc::string Version() { return "1.18.0-dev"; }
grpc::string Version() { return "1.19.0-dev"; }
} // namespace grpc

@ -278,7 +278,7 @@ class Server::SyncRequest final : public internal::CompletionQueueTag {
request_payload_ = nullptr;
interceptor_methods_.AddInterceptionHookPoint(
experimental::InterceptionHookPoints::POST_RECV_MESSAGE);
interceptor_methods_.SetRecvMessage(request_);
interceptor_methods_.SetRecvMessage(request_, nullptr);
}
if (interceptor_methods_.RunInterceptors(
@ -446,7 +446,7 @@ class Server::CallbackRequest final : public internal::CompletionQueueTag {
req_->request_payload_ = nullptr;
req_->interceptor_methods_.AddInterceptionHookPoint(
experimental::InterceptionHookPoints::POST_RECV_MESSAGE);
req_->interceptor_methods_.SetRecvMessage(req_->request_);
req_->interceptor_methods_.SetRecvMessage(req_->request_, nullptr);
}
if (req_->interceptor_methods_.RunInterceptors(

@ -1,7 +1,7 @@
<!-- This file is generated -->
<Project>
<PropertyGroup>
<GrpcCsharpVersion>1.18.0-dev</GrpcCsharpVersion>
<GrpcCsharpVersion>1.19.0-dev</GrpcCsharpVersion>
<GoogleProtobufVersion>3.6.1</GoogleProtobufVersion>
</PropertyGroup>
</Project>

@ -33,11 +33,11 @@ namespace Grpc.Core
/// <summary>
/// Current <c>AssemblyFileVersion</c> of gRPC C# assemblies
/// </summary>
public const string CurrentAssemblyFileVersion = "1.18.0.0";
public const string CurrentAssemblyFileVersion = "1.19.0.0";
/// <summary>
/// Current version of gRPC C#
/// </summary>
public const string CurrentVersion = "1.18.0-dev";
public const string CurrentVersion = "1.19.0-dev";
}
}

@ -13,7 +13,7 @@
@rem limitations under the License.
@rem Current package versions
set VERSION=1.18.0-dev
set VERSION=1.19.0-dev
@rem Adjust the location of nuget.exe
set NUGET=C:\nuget\nuget.exe

@ -13,7 +13,7 @@
@rem limitations under the License.
@rem Current package versions
set VERSION=1.18.0-dev
set VERSION=1.19.0-dev
@rem Adjust the location of nuget.exe
set NUGET=C:\nuget\nuget.exe

@ -42,7 +42,7 @@ Pod::Spec.new do |s|
# exclamation mark ensures that other "regular" pods will be able to find it as it'll be installed
# before them.
s.name = '!ProtoCompiler-gRPCPlugin'
v = '1.18.0-dev'
v = '1.19.0-dev'
s.version = v
s.summary = 'The gRPC ProtoC plugin generates Objective-C files from .proto services.'
s.description = <<-DESC

@ -22,4 +22,4 @@
// instead. This file can be regenerated from the template by running
// `tools/buildgen/generate_projects.sh`.
#define GRPC_OBJC_VERSION_STRING @"1.18.0-dev"
#define GRPC_OBJC_VERSION_STRING @"1.19.0-dev"

@ -22,5 +22,5 @@
// instead. This file can be regenerated from the template by running
// `tools/buildgen/generate_projects.sh`.
#define GRPC_OBJC_VERSION_STRING @"1.18.0-dev"
#define GRPC_OBJC_VERSION_STRING @"1.19.0-dev"
#define GRPC_C_VERSION_STRING @"7.0.0-dev"

@ -2,7 +2,7 @@
"name": "grpc/grpc-dev",
"description": "gRPC library for PHP - for Developement use only",
"license": "Apache-2.0",
"version": "1.18.0",
"version": "1.19.0",
"require": {
"php": ">=5.5.0",
"google/protobuf": "^v3.3.0"

@ -20,6 +20,6 @@
#ifndef VERSION_H
#define VERSION_H
#define PHP_GRPC_VERSION "1.18.0dev"
#define PHP_GRPC_VERSION "1.19.0dev"
#endif /* VERSION_H */

@ -14,4 +14,4 @@
# AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio/grpc/_grpcio_metadata.py.template`!!!
__version__ = """1.18.0.dev0"""
__version__ = """1.19.0.dev0"""

@ -14,4 +14,4 @@
# AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio/grpc_version.py.template`!!!
VERSION = '1.18.0.dev0'
VERSION = '1.19.0.dev0'

@ -14,4 +14,4 @@
# AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio_channelz/grpc_version.py.template`!!!
VERSION = '1.18.0.dev0'
VERSION = '1.19.0.dev0'

@ -14,4 +14,4 @@
# AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio_health_checking/grpc_version.py.template`!!!
VERSION = '1.18.0.dev0'
VERSION = '1.19.0.dev0'

@ -14,4 +14,4 @@
# AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio_reflection/grpc_version.py.template`!!!
VERSION = '1.18.0.dev0'
VERSION = '1.19.0.dev0'

@ -14,4 +14,4 @@
# AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio_status/grpc_version.py.template`!!!
VERSION = '1.18.0.dev0'
VERSION = '1.19.0.dev0'

@ -14,4 +14,4 @@
# AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio_testing/grpc_version.py.template`!!!
VERSION = '1.18.0.dev0'
VERSION = '1.19.0.dev0'

@ -14,4 +14,4 @@
# AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio_tests/grpc_version.py.template`!!!
VERSION = '1.18.0.dev0'
VERSION = '1.19.0.dev0'

@ -27,6 +27,7 @@ def _get_number_active_threads():
class ForkPosixTester(unittest.TestCase):
def setUp(self):
self._saved_fork_support_flag = cygrpc._GRPC_ENABLE_FORK_SUPPORT
cygrpc._GRPC_ENABLE_FORK_SUPPORT = True
def testForkManagedThread(self):
@ -50,6 +51,9 @@ class ForkPosixTester(unittest.TestCase):
thread.join()
self.assertEqual(0, _get_number_active_threads())
def tearDown(self):
cygrpc._GRPC_ENABLE_FORK_SUPPORT = self._saved_fork_support_flag
@unittest.skipUnless(os.name == 'nt', 'Windows-specific tests')
class ForkWindowsTester(unittest.TestCase):

@ -14,66 +14,86 @@
"""Test of gRPC Python's interaction with the python logging module"""
import unittest
import six
from six.moves import reload_module
import logging
import grpc
import functools
import subprocess
import sys
INTERPRETER = sys.executable
def patch_stderr(f):
@functools.wraps(f)
def _impl(*args, **kwargs):
old_stderr = sys.stderr
sys.stderr = six.StringIO()
try:
f(*args, **kwargs)
finally:
sys.stderr = old_stderr
class LoggingTest(unittest.TestCase):
return _impl
def test_logger_not_occupied(self):
script = """if True:
import logging
import grpc
def isolated_logging(f):
if len(logging.getLogger().handlers) != 0:
raise Exception('expected 0 logging handlers')
@functools.wraps(f)
def _impl(*args, **kwargs):
reload_module(logging)
reload_module(grpc)
try:
f(*args, **kwargs)
finally:
reload_module(logging)
"""
self._verifyScriptSucceeds(script)
return _impl
def test_handler_found(self):
script = """if True:
import logging
import grpc
"""
out, err = self._verifyScriptSucceeds(script)
self.assertEqual(0, len(err), 'unexpected output to stderr')
class LoggingTest(unittest.TestCase):
def test_can_configure_logger(self):
script = """if True:
import logging
import six
@isolated_logging
def test_logger_not_occupied(self):
self.assertEqual(0, len(logging.getLogger().handlers))
import grpc
@patch_stderr
@isolated_logging
def test_handler_found(self):
self.assertEqual(0, len(sys.stderr.getvalue()))
@isolated_logging
def test_can_configure_logger(self):
intended_stream = six.StringIO()
logging.basicConfig(stream=intended_stream)
self.assertEqual(1, len(logging.getLogger().handlers))
self.assertIs(logging.getLogger().handlers[0].stream, intended_stream)
intended_stream = six.StringIO()
logging.basicConfig(stream=intended_stream)
if len(logging.getLogger().handlers) != 1:
raise Exception('expected 1 logging handler')
if logging.getLogger().handlers[0].stream is not intended_stream:
raise Exception('wrong handler stream')
"""
self._verifyScriptSucceeds(script)
@isolated_logging
def test_grpc_logger(self):
self.assertIn("grpc", logging.Logger.manager.loggerDict)
root_logger = logging.getLogger("grpc")
self.assertEqual(1, len(root_logger.handlers))
self.assertIsInstance(root_logger.handlers[0], logging.NullHandler)
script = """if True:
import logging
import grpc
if "grpc" not in logging.Logger.manager.loggerDict:
raise Exception('grpc logger not found')
root_logger = logging.getLogger("grpc")
if len(root_logger.handlers) != 1:
raise Exception('expected 1 root logger handler')
if not isinstance(root_logger.handlers[0], logging.NullHandler):
raise Exception('expected logging.NullHandler')
"""
self._verifyScriptSucceeds(script)
def _verifyScriptSucceeds(self, script):
process = subprocess.Popen(
[INTERPRETER, '-c', script],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
out, err = process.communicate()
self.assertEqual(
0, process.returncode,
'process failed with exit code %d (stdout: %s, stderr: %s)' %
(process.returncode, out, err))
return out, err
if __name__ == '__main__':

@ -14,5 +14,5 @@
# GRPC contains the General RPC module.
module GRPC
VERSION = '1.18.0.dev'
VERSION = '1.19.0.dev'
end

@ -14,6 +14,6 @@
module GRPC
module Tools
VERSION = '1.18.0.dev'
VERSION = '1.19.0.dev'
end
end

@ -270,6 +270,235 @@ class HijackingInterceptorMakesAnotherCallFactory
}
};
class BidiStreamingRpcHijackingInterceptor : public experimental::Interceptor {
public:
BidiStreamingRpcHijackingInterceptor(experimental::ClientRpcInfo* info) {
info_ = info;
}
virtual void Intercept(experimental::InterceptorBatchMethods* methods) {
bool hijack = false;
if (methods->QueryInterceptionHookPoint(
experimental::InterceptionHookPoints::PRE_SEND_INITIAL_METADATA)) {
CheckMetadata(*methods->GetSendInitialMetadata(), "testkey", "testvalue");
hijack = true;
}
if (methods->QueryInterceptionHookPoint(
experimental::InterceptionHookPoints::PRE_SEND_MESSAGE)) {
EchoRequest req;
auto* buffer = methods->GetSerializedSendMessage();
auto copied_buffer = *buffer;
EXPECT_TRUE(
SerializationTraits<EchoRequest>::Deserialize(&copied_buffer, &req)
.ok());
EXPECT_EQ(req.message().find("Hello"), 0u);
msg = req.message();
}
if (methods->QueryInterceptionHookPoint(
experimental::InterceptionHookPoints::PRE_SEND_CLOSE)) {
// Got nothing to do here for now
}
if (methods->QueryInterceptionHookPoint(
experimental::InterceptionHookPoints::POST_RECV_STATUS)) {
CheckMetadata(*methods->GetRecvTrailingMetadata(), "testkey",
"testvalue");
auto* status = methods->GetRecvStatus();
EXPECT_EQ(status->ok(), true);
}
if (methods->QueryInterceptionHookPoint(
experimental::InterceptionHookPoints::PRE_RECV_MESSAGE)) {
EchoResponse* resp =
static_cast<EchoResponse*>(methods->GetRecvMessage());
resp->set_message(msg);
}
if (methods->QueryInterceptionHookPoint(
experimental::InterceptionHookPoints::POST_RECV_MESSAGE)) {
EXPECT_EQ(static_cast<EchoResponse*>(methods->GetRecvMessage())
->message()
.find("Hello"),
0u);
}
if (methods->QueryInterceptionHookPoint(
experimental::InterceptionHookPoints::PRE_RECV_STATUS)) {
auto* map = methods->GetRecvTrailingMetadata();
// insert the metadata that we want
EXPECT_EQ(map->size(), static_cast<unsigned>(0));
map->insert(std::make_pair("testkey", "testvalue"));
auto* status = methods->GetRecvStatus();
*status = Status(StatusCode::OK, "");
}
if (hijack) {
methods->Hijack();
} else {
methods->Proceed();
}
}
private:
experimental::ClientRpcInfo* info_;
grpc::string msg;
};
class ClientStreamingRpcHijackingInterceptor
: public experimental::Interceptor {
public:
ClientStreamingRpcHijackingInterceptor(experimental::ClientRpcInfo* info) {
info_ = info;
}
virtual void Intercept(experimental::InterceptorBatchMethods* methods) {
bool hijack = false;
if (methods->QueryInterceptionHookPoint(
experimental::InterceptionHookPoints::PRE_SEND_INITIAL_METADATA)) {
hijack = true;
}
if (methods->QueryInterceptionHookPoint(
experimental::InterceptionHookPoints::PRE_SEND_MESSAGE)) {
if (++count_ > 10) {
methods->FailHijackedSendMessage();
}
}
if (methods->QueryInterceptionHookPoint(
experimental::InterceptionHookPoints::POST_SEND_MESSAGE)) {
EXPECT_FALSE(got_failed_send_);
got_failed_send_ = !methods->GetSendMessageStatus();
}
if (methods->QueryInterceptionHookPoint(
experimental::InterceptionHookPoints::PRE_RECV_STATUS)) {
auto* status = methods->GetRecvStatus();
*status = Status(StatusCode::UNAVAILABLE, "Done sending 10 messages");
}
if (hijack) {
methods->Hijack();
} else {
methods->Proceed();
}
}
static bool GotFailedSend() { return got_failed_send_; }
private:
experimental::ClientRpcInfo* info_;
int count_ = 0;
static bool got_failed_send_;
};
bool ClientStreamingRpcHijackingInterceptor::got_failed_send_ = false;
class ClientStreamingRpcHijackingInterceptorFactory
: public experimental::ClientInterceptorFactoryInterface {
public:
virtual experimental::Interceptor* CreateClientInterceptor(
experimental::ClientRpcInfo* info) override {
return new ClientStreamingRpcHijackingInterceptor(info);
}
};
class ServerStreamingRpcHijackingInterceptor
: public experimental::Interceptor {
public:
ServerStreamingRpcHijackingInterceptor(experimental::ClientRpcInfo* info) {
info_ = info;
}
virtual void Intercept(experimental::InterceptorBatchMethods* methods) {
bool hijack = false;
if (methods->QueryInterceptionHookPoint(
experimental::InterceptionHookPoints::PRE_SEND_INITIAL_METADATA)) {
auto* map = methods->GetSendInitialMetadata();
// Check that we can see the test metadata
ASSERT_EQ(map->size(), static_cast<unsigned>(1));
auto iterator = map->begin();
EXPECT_EQ("testkey", iterator->first);
EXPECT_EQ("testvalue", iterator->second);
hijack = true;
}
if (methods->QueryInterceptionHookPoint(
experimental::InterceptionHookPoints::PRE_SEND_MESSAGE)) {
EchoRequest req;
auto* buffer = methods->GetSerializedSendMessage();
auto copied_buffer = *buffer;
EXPECT_TRUE(
SerializationTraits<EchoRequest>::Deserialize(&copied_buffer, &req)
.ok());
EXPECT_EQ(req.message(), "Hello");
}
if (methods->QueryInterceptionHookPoint(
experimental::InterceptionHookPoints::PRE_SEND_CLOSE)) {
// Got nothing to do here for now
}
if (methods->QueryInterceptionHookPoint(
experimental::InterceptionHookPoints::POST_RECV_STATUS)) {
auto* map = methods->GetRecvTrailingMetadata();
bool found = false;
// Check that we received the metadata as an echo
for (const auto& pair : *map) {
found = pair.first.starts_with("testkey") &&
pair.second.starts_with("testvalue");
if (found) break;
}
EXPECT_EQ(found, true);
auto* status = methods->GetRecvStatus();
EXPECT_EQ(status->ok(), true);
}
if (methods->QueryInterceptionHookPoint(
experimental::InterceptionHookPoints::PRE_RECV_MESSAGE)) {
if (++count_ > 10) {
methods->FailHijackedRecvMessage();
}
EchoResponse* resp =
static_cast<EchoResponse*>(methods->GetRecvMessage());
resp->set_message("Hello");
}
if (methods->QueryInterceptionHookPoint(
experimental::InterceptionHookPoints::POST_RECV_MESSAGE)) {
// Only the last message will be a failure
EXPECT_FALSE(got_failed_message_);
got_failed_message_ = methods->GetRecvMessage() == nullptr;
}
if (methods->QueryInterceptionHookPoint(
experimental::InterceptionHookPoints::PRE_RECV_STATUS)) {
auto* map = methods->GetRecvTrailingMetadata();
// insert the metadata that we want
EXPECT_EQ(map->size(), static_cast<unsigned>(0));
map->insert(std::make_pair("testkey", "testvalue"));
auto* status = methods->GetRecvStatus();
*status = Status(StatusCode::OK, "");
}
if (hijack) {
methods->Hijack();
} else {
methods->Proceed();
}
}
static bool GotFailedMessage() { return got_failed_message_; }
private:
experimental::ClientRpcInfo* info_;
static bool got_failed_message_;
int count_ = 0;
};
bool ServerStreamingRpcHijackingInterceptor::got_failed_message_ = false;
class ServerStreamingRpcHijackingInterceptorFactory
: public experimental::ClientInterceptorFactoryInterface {
public:
virtual experimental::Interceptor* CreateClientInterceptor(
experimental::ClientRpcInfo* info) override {
return new ServerStreamingRpcHijackingInterceptor(info);
}
};
class BidiStreamingRpcHijackingInterceptorFactory
: public experimental::ClientInterceptorFactoryInterface {
public:
virtual experimental::Interceptor* CreateClientInterceptor(
experimental::ClientRpcInfo* info) override {
return new BidiStreamingRpcHijackingInterceptor(info);
}
};
class LoggingInterceptor : public experimental::Interceptor {
public:
LoggingInterceptor(experimental::ClientRpcInfo* info) { info_ = info; }
@ -550,6 +779,62 @@ TEST_F(ClientInterceptorsStreamingEnd2endTest, ServerStreamingTest) {
EXPECT_EQ(DummyInterceptor::GetNumTimesRun(), 20);
}
TEST_F(ClientInterceptorsStreamingEnd2endTest, ClientStreamingHijackingTest) {
ChannelArguments args;
std::vector<std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>
creators;
creators.push_back(
std::unique_ptr<ClientStreamingRpcHijackingInterceptorFactory>(
new ClientStreamingRpcHijackingInterceptorFactory()));
auto channel = experimental::CreateCustomChannelWithInterceptors(
server_address_, InsecureChannelCredentials(), args, std::move(creators));
auto stub = grpc::testing::EchoTestService::NewStub(channel);
ClientContext ctx;
EchoRequest req;
EchoResponse resp;
req.mutable_param()->set_echo_metadata(true);
req.set_message("Hello");
string expected_resp = "";
auto writer = stub->RequestStream(&ctx, &resp);
for (int i = 0; i < 10; i++) {
EXPECT_TRUE(writer->Write(req));
expected_resp += "Hello";
}
// The interceptor will reject the 11th message
writer->Write(req);
Status s = writer->Finish();
EXPECT_EQ(s.ok(), false);
EXPECT_TRUE(ClientStreamingRpcHijackingInterceptor::GotFailedSend());
}
TEST_F(ClientInterceptorsStreamingEnd2endTest, ServerStreamingHijackingTest) {
ChannelArguments args;
DummyInterceptor::Reset();
std::vector<std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>
creators;
creators.push_back(
std::unique_ptr<ServerStreamingRpcHijackingInterceptorFactory>(
new ServerStreamingRpcHijackingInterceptorFactory()));
auto channel = experimental::CreateCustomChannelWithInterceptors(
server_address_, InsecureChannelCredentials(), args, std::move(creators));
MakeServerStreamingCall(channel);
EXPECT_TRUE(ServerStreamingRpcHijackingInterceptor::GotFailedMessage());
}
TEST_F(ClientInterceptorsStreamingEnd2endTest, BidiStreamingHijackingTest) {
ChannelArguments args;
DummyInterceptor::Reset();
std::vector<std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>
creators;
creators.push_back(
std::unique_ptr<BidiStreamingRpcHijackingInterceptorFactory>(
new BidiStreamingRpcHijackingInterceptorFactory()));
auto channel = experimental::CreateCustomChannelWithInterceptors(
server_address_, InsecureChannelCredentials(), args, std::move(creators));
MakeBidiStreamingCall(channel);
}
TEST_F(ClientInterceptorsStreamingEnd2endTest, BidiStreamingTest) {
ChannelArguments args;
DummyInterceptor::Reset();

@ -132,6 +132,16 @@ bool CheckMetadata(const std::multimap<grpc::string_ref, grpc::string_ref>& map,
return false;
}
bool CheckMetadata(const std::multimap<grpc::string, grpc::string>& map,
const string& key, const string& value) {
for (const auto& pair : map) {
if (pair.first == key && pair.second == value) {
return true;
}
}
return false;
}
std::vector<std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>
CreateDummyClientInterceptors() {
std::vector<std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>

@ -165,6 +165,9 @@ void MakeCallbackCall(const std::shared_ptr<Channel>& channel);
bool CheckMetadata(const std::multimap<grpc::string_ref, grpc::string_ref>& map,
const string& key, const string& value);
bool CheckMetadata(const std::multimap<grpc::string, grpc::string>& map,
const string& key, const string& value);
std::vector<std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>
CreateDummyClientInterceptors();

@ -236,58 +236,7 @@ class Client {
return 0;
}
protected:
bool closed_loop_;
gpr_atm thread_pool_done_;
double median_latency_collection_interval_seconds_; // In seconds
void StartThreads(size_t num_threads) {
gpr_atm_rel_store(&thread_pool_done_, static_cast<gpr_atm>(false));
threads_remaining_ = num_threads;
for (size_t i = 0; i < num_threads; i++) {
threads_.emplace_back(new Thread(this, i));
}
}
void EndThreads() {
MaybeStartRequests();
threads_.clear();
}
virtual void DestroyMultithreading() = 0;
void SetupLoadTest(const ClientConfig& config, size_t num_threads) {
// Set up the load distribution based on the number of threads
const auto& load = config.load_params();
std::unique_ptr<RandomDistInterface> random_dist;
switch (load.load_case()) {
case LoadParams::kClosedLoop:
// Closed-loop doesn't use random dist at all
break;
case LoadParams::kPoisson:
random_dist.reset(
new ExpDist(load.poisson().offered_load() / num_threads));
break;
default:
GPR_ASSERT(false);
}
// Set closed_loop_ based on whether or not random_dist is set
if (!random_dist) {
closed_loop_ = true;
} else {
closed_loop_ = false;
// set up interarrival timer according to random dist
interarrival_timer_.init(*random_dist, num_threads);
const auto now = gpr_now(GPR_CLOCK_MONOTONIC);
for (size_t i = 0; i < num_threads; i++) {
next_time_.push_back(gpr_time_add(
now,
gpr_time_from_nanos(interarrival_timer_.next(i), GPR_TIMESPAN)));
}
}
}
bool IsClosedLoop() { return closed_loop_; }
gpr_timespec NextIssueTime(int thread_idx) {
const gpr_timespec result = next_time_[thread_idx];
@ -297,9 +246,9 @@ class Client {
GPR_TIMESPAN));
return result;
}
std::function<gpr_timespec()> NextIssuer(int thread_idx) {
return closed_loop_ ? std::function<gpr_timespec()>()
: std::bind(&Client::NextIssueTime, this, thread_idx);
bool ThreadCompleted() {
return static_cast<bool>(gpr_atm_acq_load(&thread_pool_done_));
}
class Thread {
@ -380,8 +329,62 @@ class Client {
double interval_start_time_;
};
bool ThreadCompleted() {
return static_cast<bool>(gpr_atm_acq_load(&thread_pool_done_));
protected:
bool closed_loop_;
gpr_atm thread_pool_done_;
double median_latency_collection_interval_seconds_; // In seconds
void StartThreads(size_t num_threads) {
gpr_atm_rel_store(&thread_pool_done_, static_cast<gpr_atm>(false));
threads_remaining_ = num_threads;
for (size_t i = 0; i < num_threads; i++) {
threads_.emplace_back(new Thread(this, i));
}
}
void EndThreads() {
MaybeStartRequests();
threads_.clear();
}
virtual void DestroyMultithreading() = 0;
void SetupLoadTest(const ClientConfig& config, size_t num_threads) {
// Set up the load distribution based on the number of threads
const auto& load = config.load_params();
std::unique_ptr<RandomDistInterface> random_dist;
switch (load.load_case()) {
case LoadParams::kClosedLoop:
// Closed-loop doesn't use random dist at all
break;
case LoadParams::kPoisson:
random_dist.reset(
new ExpDist(load.poisson().offered_load() / num_threads));
break;
default:
GPR_ASSERT(false);
}
// Set closed_loop_ based on whether or not random_dist is set
if (!random_dist) {
closed_loop_ = true;
} else {
closed_loop_ = false;
// set up interarrival timer according to random dist
interarrival_timer_.init(*random_dist, num_threads);
const auto now = gpr_now(GPR_CLOCK_MONOTONIC);
for (size_t i = 0; i < num_threads; i++) {
next_time_.push_back(gpr_time_add(
now,
gpr_time_from_nanos(interarrival_timer_.next(i), GPR_TIMESPAN)));
}
}
}
std::function<gpr_timespec()> NextIssuer(int thread_idx) {
return closed_loop_ ? std::function<gpr_timespec()>()
: std::bind(&Client::NextIssueTime, this, thread_idx);
}
virtual void ThreadFunc(size_t thread_idx, Client::Thread* t) = 0;
@ -436,6 +439,7 @@ class ClientImpl : public Client {
config.payload_config());
}
virtual ~ClientImpl() {}
const RequestType* request() { return &request_; }
void WaitForChannelsToConnect() {
int connect_deadline_seconds = 10;

@ -66,13 +66,35 @@ class CallbackClient
config, BenchmarkStubCreator) {
num_threads_ = NumThreads(config);
rpcs_done_ = 0;
SetupLoadTest(config, num_threads_);
// Don't divide the fixed load among threads as the user threads
// only bootstrap the RPCs
SetupLoadTest(config, 1);
total_outstanding_rpcs_ =
config.client_channels() * config.outstanding_rpcs_per_channel();
}
virtual ~CallbackClient() {}
/**
* The main thread of the benchmark will be waiting on DestroyMultithreading.
* Increment the rpcs_done_ variable to signify that the Callback RPC
* after thread completion is done. When the last outstanding rpc increments
* the counter it should also signal the main thread's conditional variable.
*/
void NotifyMainThreadOfThreadCompletion() {
std::lock_guard<std::mutex> l(shutdown_mu_);
rpcs_done_++;
if (rpcs_done_ == total_outstanding_rpcs_) {
shutdown_cv_.notify_one();
}
}
gpr_timespec NextRPCIssueTime() {
std::lock_guard<std::mutex> l(next_issue_time_mu_);
return Client::NextIssueTime(0);
}
protected:
size_t num_threads_;
size_t total_outstanding_rpcs_;
@ -93,24 +115,9 @@ class CallbackClient
ThreadFuncImpl(t, thread_idx);
}
virtual void ScheduleRpc(Thread* t, size_t thread_idx,
size_t ctx_vector_idx) = 0;
/**
* The main thread of the benchmark will be waiting on DestroyMultithreading.
* Increment the rpcs_done_ variable to signify that the Callback RPC
* after thread completion is done. When the last outstanding rpc increments
* the counter it should also signal the main thread's conditional variable.
*/
void NotifyMainThreadOfThreadCompletion() {
std::lock_guard<std::mutex> l(shutdown_mu_);
rpcs_done_++;
if (rpcs_done_ == total_outstanding_rpcs_) {
shutdown_cv_.notify_one();
}
}
private:
std::mutex next_issue_time_mu_; // Used by next issue time
int NumThreads(const ClientConfig& config) {
int num_threads = config.async_client_threads();
if (num_threads <= 0) { // Use dynamic sizing
@ -149,7 +156,7 @@ class CallbackUnaryClient final : public CallbackClient {
bool ThreadFuncImpl(Thread* t, size_t thread_idx) override {
for (size_t vector_idx = thread_idx; vector_idx < total_outstanding_rpcs_;
vector_idx += num_threads_) {
ScheduleRpc(t, thread_idx, vector_idx);
ScheduleRpc(t, vector_idx);
}
return true;
}
@ -157,26 +164,26 @@ class CallbackUnaryClient final : public CallbackClient {
void InitThreadFuncImpl(size_t thread_idx) override { return; }
private:
void ScheduleRpc(Thread* t, size_t thread_idx, size_t vector_idx) override {
void ScheduleRpc(Thread* t, size_t vector_idx) {
if (!closed_loop_) {
gpr_timespec next_issue_time = NextIssueTime(thread_idx);
gpr_timespec next_issue_time = NextRPCIssueTime();
// Start an alarm callback to run the internal callback after
// next_issue_time
ctx_[vector_idx]->alarm_.experimental().Set(
next_issue_time, [this, t, thread_idx, vector_idx](bool ok) {
IssueUnaryCallbackRpc(t, thread_idx, vector_idx);
next_issue_time, [this, t, vector_idx](bool ok) {
IssueUnaryCallbackRpc(t, vector_idx);
});
} else {
IssueUnaryCallbackRpc(t, thread_idx, vector_idx);
IssueUnaryCallbackRpc(t, vector_idx);
}
}
void IssueUnaryCallbackRpc(Thread* t, size_t thread_idx, size_t vector_idx) {
void IssueUnaryCallbackRpc(Thread* t, size_t vector_idx) {
GPR_TIMER_SCOPE("CallbackUnaryClient::ThreadFunc", 0);
double start = UsageTimer::Now();
ctx_[vector_idx]->stub_->experimental_async()->UnaryCall(
(&ctx_[vector_idx]->context_), &request_, &ctx_[vector_idx]->response_,
[this, t, thread_idx, start, vector_idx](grpc::Status s) {
[this, t, start, vector_idx](grpc::Status s) {
// Update Histogram with data from the callback run
HistogramEntry entry;
if (s.ok()) {
@ -193,17 +200,157 @@ class CallbackUnaryClient final : public CallbackClient {
ctx_[vector_idx].reset(
new CallbackClientRpcContext(ctx_[vector_idx]->stub_));
// Schedule a new RPC
ScheduleRpc(t, thread_idx, vector_idx);
ScheduleRpc(t, vector_idx);
}
});
}
};
class CallbackStreamingClient : public CallbackClient {
public:
CallbackStreamingClient(const ClientConfig& config)
: CallbackClient(config),
messages_per_stream_(config.messages_per_stream()) {
for (int ch = 0; ch < config.client_channels(); ch++) {
for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) {
ctx_.emplace_back(
new CallbackClientRpcContext(channels_[ch].get_stub()));
}
}
StartThreads(num_threads_);
}
~CallbackStreamingClient() {}
void AddHistogramEntry(double start_, bool ok, Thread* thread_ptr) {
// Update Histogram with data from the callback run
HistogramEntry entry;
if (ok) {
entry.set_value((UsageTimer::Now() - start_) * 1e9);
}
thread_ptr->UpdateHistogram(&entry);
}
int messages_per_stream() { return messages_per_stream_; }
protected:
const int messages_per_stream_;
};
class CallbackStreamingPingPongClient : public CallbackStreamingClient {
public:
CallbackStreamingPingPongClient(const ClientConfig& config)
: CallbackStreamingClient(config) {}
~CallbackStreamingPingPongClient() {}
};
class CallbackStreamingPingPongReactor final
: public grpc::experimental::ClientBidiReactor<SimpleRequest,
SimpleResponse> {
public:
CallbackStreamingPingPongReactor(
CallbackStreamingPingPongClient* client,
std::unique_ptr<CallbackClientRpcContext> ctx)
: client_(client), ctx_(std::move(ctx)), messages_issued_(0) {}
void StartNewRpc() {
if (client_->ThreadCompleted()) return;
start_ = UsageTimer::Now();
ctx_->stub_->experimental_async()->StreamingCall(&(ctx_->context_), this);
StartWrite(client_->request());
StartCall();
}
void OnWriteDone(bool ok) override {
if (!ok || client_->ThreadCompleted()) {
if (!ok) gpr_log(GPR_ERROR, "Error writing RPC");
StartWritesDone();
return;
}
StartRead(&ctx_->response_);
}
void OnReadDone(bool ok) override {
client_->AddHistogramEntry(start_, ok, thread_ptr_);
if (client_->ThreadCompleted() || !ok ||
(client_->messages_per_stream() != 0 &&
++messages_issued_ >= client_->messages_per_stream())) {
if (!ok) {
gpr_log(GPR_ERROR, "Error reading RPC");
}
StartWritesDone();
return;
}
StartWrite(client_->request());
}
void OnDone(const Status& s) override {
if (client_->ThreadCompleted() || !s.ok()) {
client_->NotifyMainThreadOfThreadCompletion();
return;
}
ctx_.reset(new CallbackClientRpcContext(ctx_->stub_));
ScheduleRpc();
}
void ScheduleRpc() {
if (client_->ThreadCompleted()) return;
if (!client_->IsClosedLoop()) {
gpr_timespec next_issue_time = client_->NextRPCIssueTime();
// Start an alarm callback to run the internal callback after
// next_issue_time
ctx_->alarm_.experimental().Set(next_issue_time,
[this](bool ok) { StartNewRpc(); });
} else {
StartNewRpc();
}
}
void set_thread_ptr(Client::Thread* ptr) { thread_ptr_ = ptr; }
CallbackStreamingPingPongClient* client_;
std::unique_ptr<CallbackClientRpcContext> ctx_;
Client::Thread* thread_ptr_; // Needed to update histogram entries
double start_; // Track message start time
int messages_issued_; // Messages issued by this stream
};
class CallbackStreamingPingPongClientImpl final
: public CallbackStreamingPingPongClient {
public:
CallbackStreamingPingPongClientImpl(const ClientConfig& config)
: CallbackStreamingPingPongClient(config) {
for (size_t i = 0; i < total_outstanding_rpcs_; i++)
reactor_.emplace_back(
new CallbackStreamingPingPongReactor(this, std::move(ctx_[i])));
}
~CallbackStreamingPingPongClientImpl() {}
bool ThreadFuncImpl(Client::Thread* t, size_t thread_idx) override {
for (size_t vector_idx = thread_idx; vector_idx < total_outstanding_rpcs_;
vector_idx += num_threads_) {
reactor_[vector_idx]->set_thread_ptr(t);
reactor_[vector_idx]->ScheduleRpc();
}
return true;
}
void InitThreadFuncImpl(size_t thread_idx) override {}
private:
std::vector<std::unique_ptr<CallbackStreamingPingPongReactor>> reactor_;
};
// TODO(mhaidry) : Implement Streaming from client, server and both ways
std::unique_ptr<Client> CreateCallbackClient(const ClientConfig& config) {
switch (config.rpc_type()) {
case UNARY:
return std::unique_ptr<Client>(new CallbackUnaryClient(config));
case STREAMING:
return std::unique_ptr<Client>(
new CallbackStreamingPingPongClientImpl(config));
case STREAMING_FROM_CLIENT:
case STREAMING_FROM_SERVER:
case STREAMING_BOTH_WAYS:

@ -14,4 +14,4 @@
# AUTO-GENERATED FROM `$REPO_ROOT/templates/tools/distrib/python/grpcio_tools/grpc_version.py.template`!!!
VERSION = '1.18.0.dev0'
VERSION = '1.19.0.dev0'

@ -30,4 +30,4 @@ cd /var/local/git/grpc
rvm --default use ruby-2.5
# build Ruby interop client and server
(cd src/ruby && gem update bundler && bundle && rake compile)
(cd src/ruby && gem install bundler -v 1.17.3 && bundle && rake compile)

@ -40,7 +40,7 @@ PROJECT_NAME = "GRPC C++"
# could be handy for archiving the generated documentation or if some version
# control system is used.
PROJECT_NUMBER = 1.18.0-dev
PROJECT_NUMBER = 1.19.0-dev
# Using the PROJECT_BRIEF tag one can provide an optional one line description
# for a project that appears at the top of each page and should give viewer a

@ -40,7 +40,7 @@ PROJECT_NAME = "GRPC C++"
# could be handy for archiving the generated documentation or if some version
# control system is used.
PROJECT_NUMBER = 1.18.0-dev
PROJECT_NUMBER = 1.19.0-dev
# Using the PROJECT_BRIEF tag one can provide an optional one line description
# for a project that appears at the top of each page and should give viewer a

Loading…
Cancel
Save