diff --git a/.travis.yml b/.travis.yml
index 4cdad37c6c5..94bf382b25e 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -8,33 +8,37 @@ env:
- TEST=objc
- JOBS=1
matrix:
- - SCHEME="RxLibraryUnitTests" WORKSPACE="Tests.xcworkspace"
- TEST_PATH="src/objective-c/tests" BUILD_ONLY="false"
+ - SCHEME="RxLibraryUnitTests"
+ WORKSPACE="Tests.xcworkspace" TEST_PATH="src/objective-c/tests" BUILD_ONLY="false"
INTEROP_SERVER="false"
- - SCHEME="InteropTestsLocalSSL" WORKSPACE="Tests.xcworkspace"
- TEST_PATH="src/objective-c/tests" BUILD_ONLY="false" INTEROP_SERVER="true"
- - SCHEME="InteropTestsLocalCleartext" WORKSPACE="Tests.xcworkspace"
- TEST_PATH="src/objective-c/tests" BUILD_ONLY="false"
+ - SCHEME="InteropTestsLocalSSL"
+ WORKSPACE="Tests.xcworkspace" TEST_PATH="src/objective-c/tests" BUILD_ONLY="false"
+ INTEROP_SERVER="true"
+ - SCHEME="InteropTestsLocalCleartext"
+ WORKSPACE="Tests.xcworkspace" TEST_PATH="src/objective-c/tests" BUILD_ONLY="false"
INTEROP_SERVER="true"
# TODO(jcanizales): Make tests an app project (instead of library), so the following will work.
- # - SCHEME="InteropTestsRemote" WORKSPACE="Tests.xcworkspace"
- # TEST_PATH="src/objective-c/tests" BUILD_ONLY="false"
+ # - SCHEME="InteropTestsRemote"
+ # WORKSPACE="Tests.xcworkspace" TEST_PATH="src/objective-c/tests" BUILD_ONLY="false"
# INTEROP_SERVER="true"
- - SCHEME="HelloWorld" WORKSPACE="HelloWorld.xcworkspace"
- TEST_PATH="examples/objective-c/helloworld" BUILD_ONLY="true"
- INTEROP_SERVER="false"
- - SCHEME="RouteGuideClient" WORKSPACE="RouteGuideClient.xcworkspace"
- TEST_PATH="examples/objective-c/route_guide" BUILD_ONLY="true"
- INTEROP_SERVER="false"
- - SCHEME="AuthSample" WORKSPACE="AuthSample.xcworkspace"
- TEST_PATH="examples/objective-c/auth_sample" BUILD_ONLY="true"
- INTEROP_SERVER="false"
- - SCHEME="Sample" WORKSPACE="Sample.xcworkspace"
- TEST_PATH="src/objective-c/examples/Sample" BUILD_ONLY="true"
- INTEROP_SERVER="false"
- - SCHEME="SwiftSample" WORKSPACE="SwiftSample.xcworkspace"
- TEST_PATH="src/objective-c/examples/SwiftSample" BUILD_ONLY="true"
+ - SCHEME="HelloWorld"
+ WORKSPACE="HelloWorld.xcworkspace" TEST_PATH="examples/objective-c/helloworld"
+ BUILD_ONLY="true" INTEROP_SERVER="false"
+ - SCHEME="RouteGuideClient"
+ WORKSPACE="RouteGuideClient.xcworkspace" TEST_PATH="examples/objective-c/route_guide"
+ BUILD_ONLY="true" INTEROP_SERVER="false"
+ - SCHEME="AuthSample"
+ WORKSPACE="AuthSample.xcworkspace" TEST_PATH="examples/objective-c/auth_sample"
+ BUILD_ONLY="true" INTEROP_SERVER="false"
+ - SCHEME="Sample"
+ WORKSPACE="Sample.xcworkspace" TEST_PATH="src/objective-c/examples/Sample" BUILD_ONLY="true"
INTEROP_SERVER="false"
+ - SCHEME="Sample"
+ WORKSPACE="Sample.xcworkspace" TEST_PATH="src/objective-c/examples/Sample" BUILD_ONLY="true"
+ INTEROP_SERVER="false" FRAMEWORKS="YES"
+ - SCHEME="SwiftSample"
+ WORKSPACE="SwiftSample.xcworkspace" TEST_PATH="src/objective-c/examples/SwiftSample"
+ BUILD_ONLY="true" INTEROP_SERVER="false"
before_install:
# Until Travis upgrades from Cocoapods 0.39, we need to do it here.
- pod --version
diff --git a/Makefile b/Makefile
index 992eee102b8..ed362f9752b 100644
--- a/Makefile
+++ b/Makefile
@@ -991,7 +991,6 @@ transport_security_test: $(BINDIR)/$(CONFIG)/transport_security_test
udp_server_test: $(BINDIR)/$(CONFIG)/udp_server_test
uri_fuzzer_test: $(BINDIR)/$(CONFIG)/uri_fuzzer_test
uri_parser_test: $(BINDIR)/$(CONFIG)/uri_parser_test
-workqueue_test: $(BINDIR)/$(CONFIG)/workqueue_test
alarm_cpp_test: $(BINDIR)/$(CONFIG)/alarm_cpp_test
async_end2end_test: $(BINDIR)/$(CONFIG)/async_end2end_test
auth_property_iterator_test: $(BINDIR)/$(CONFIG)/auth_property_iterator_test
@@ -1295,7 +1294,6 @@ buildtests_c: privatelibs_c \
$(BINDIR)/$(CONFIG)/transport_security_test \
$(BINDIR)/$(CONFIG)/udp_server_test \
$(BINDIR)/$(CONFIG)/uri_parser_test \
- $(BINDIR)/$(CONFIG)/workqueue_test \
$(BINDIR)/$(CONFIG)/public_headers_must_be_c89 \
$(BINDIR)/$(CONFIG)/badreq_bad_client_test \
$(BINDIR)/$(CONFIG)/connection_prefix_bad_client_test \
@@ -1674,8 +1672,6 @@ test_c: buildtests_c
$(Q) $(BINDIR)/$(CONFIG)/udp_server_test || ( echo test udp_server_test failed ; exit 1 )
$(E) "[RUN] Testing uri_parser_test"
$(Q) $(BINDIR)/$(CONFIG)/uri_parser_test || ( echo test uri_parser_test failed ; exit 1 )
- $(E) "[RUN] Testing workqueue_test"
- $(Q) $(BINDIR)/$(CONFIG)/workqueue_test || ( echo test workqueue_test failed ; exit 1 )
$(E) "[RUN] Testing public_headers_must_be_c89"
$(Q) $(BINDIR)/$(CONFIG)/public_headers_must_be_c89 || ( echo test public_headers_must_be_c89 failed ; exit 1 )
$(E) "[RUN] Testing badreq_bad_client_test"
@@ -10175,38 +10171,6 @@ endif
endif
-WORKQUEUE_TEST_SRC = \
- test/core/iomgr/workqueue_test.c \
-
-WORKQUEUE_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(WORKQUEUE_TEST_SRC))))
-ifeq ($(NO_SECURE),true)
-
-# You can't build secure targets if you don't have OpenSSL.
-
-$(BINDIR)/$(CONFIG)/workqueue_test: openssl_dep_error
-
-else
-
-
-
-$(BINDIR)/$(CONFIG)/workqueue_test: $(WORKQUEUE_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
- $(E) "[LD] Linking $@"
- $(Q) mkdir -p `dirname $@`
- $(Q) $(LD) $(LDFLAGS) $(WORKQUEUE_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBS) $(LDLIBS_SECURE) -o $(BINDIR)/$(CONFIG)/workqueue_test
-
-endif
-
-$(OBJDIR)/$(CONFIG)/test/core/iomgr/workqueue_test.o: $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
-
-deps_workqueue_test: $(WORKQUEUE_TEST_OBJS:.o=.dep)
-
-ifneq ($(NO_SECURE),true)
-ifneq ($(NO_DEPS),true)
--include $(WORKQUEUE_TEST_OBJS:.o=.dep)
-endif
-endif
-
-
ALARM_CPP_TEST_SRC = \
test/cpp/common/alarm_cpp_test.cc \
diff --git a/build.yaml b/build.yaml
index c5d92c1e63a..42ea3cb810b 100644
--- a/build.yaml
+++ b/build.yaml
@@ -2430,20 +2430,6 @@ targets:
- grpc
- gpr_test_util
- gpr
-- name: workqueue_test
- build: test
- language: c
- src:
- - test/core/iomgr/workqueue_test.c
- deps:
- - grpc_test_util
- - grpc
- - gpr_test_util
- - gpr
- platforms:
- - mac
- - linux
- - posix
- name: alarm_cpp_test
gtest: true
build: test
@@ -3367,6 +3353,7 @@ php_config_m4:
- src/php/ext/grpc/channel.h
- src/php/ext/grpc/channel_credentials.h
- src/php/ext/grpc/completion_queue.h
+ - src/php/ext/grpc/php7_wrapper.h
- src/php/ext/grpc/php_grpc.h
- src/php/ext/grpc/server.h
- src/php/ext/grpc/server_credentials.h
diff --git a/composer.json b/composer.json
index 0ebe0a11084..78366208edb 100644
--- a/composer.json
+++ b/composer.json
@@ -7,7 +7,7 @@
"license": "BSD-3-Clause",
"require": {
"php": ">=5.5.0",
- "stanley-cheung/protobuf-php": "dev-master"
+ "stanley-cheung/protobuf-php": "v0.6"
},
"require-dev": {
"google/auth": "v0.9"
diff --git a/doc/.gitignore b/doc/.gitignore
new file mode 100644
index 00000000000..95464d3e664
--- /dev/null
+++ b/doc/.gitignore
@@ -0,0 +1,2 @@
+build/
+src/
diff --git a/examples/cpp/helloworld/Makefile b/examples/cpp/helloworld/Makefile
index 67a16f2585b..bfb78e0d417 100644
--- a/examples/cpp/helloworld/Makefile
+++ b/examples/cpp/helloworld/Makefile
@@ -32,7 +32,9 @@
CXX = g++
CPPFLAGS += -I/usr/local/include -pthread
CXXFLAGS += -std=c++11
-LDFLAGS += -L/usr/local/lib `pkg-config --libs grpc++ grpc` -lprotobuf -lpthread -ldl
+LDFLAGS += -L/usr/local/lib `pkg-config --libs grpc++ grpc` \
+ -Wl,--no-as-needed -lgrpc++_reflection -Wl,--as-needed \
+ -lprotobuf -lpthread -ldl
PROTOC = protoc
GRPC_CPP_PLUGIN = grpc_cpp_plugin
GRPC_CPP_PLUGIN_PATH ?= `which $(GRPC_CPP_PLUGIN)`
diff --git a/examples/cpp/route_guide/Makefile b/examples/cpp/route_guide/Makefile
index df40b04f32d..3ce089ce607 100644
--- a/examples/cpp/route_guide/Makefile
+++ b/examples/cpp/route_guide/Makefile
@@ -32,7 +32,9 @@
CXX = g++
CPPFLAGS += -I/usr/local/include -pthread
CXXFLAGS += -std=c++11
-LDFLAGS += -L/usr/local/lib `pkg-config --libs grpc++` -lprotobuf -lpthread -ldl
+LDFLAGS += -L/usr/local/lib `pkg-config --libs grpc++` \
+ -Wl,--no-as-needed -lgrpc++_reflection -Wl,--as-needed \
+ -lprotobuf -lpthread -ldl
PROTOC = protoc
GRPC_CPP_PLUGIN = grpc_cpp_plugin
GRPC_CPP_PLUGIN_PATH ?= `which $(GRPC_CPP_PLUGIN)`
diff --git a/examples/objective-c/auth_sample/Misc/GoogleService-Info.plist b/examples/objective-c/auth_sample/Misc/GoogleService-Info.plist
index 86909d84a31..ff225074238 100644
--- a/examples/objective-c/auth_sample/Misc/GoogleService-Info.plist
+++ b/examples/objective-c/auth_sample/Misc/GoogleService-Info.plist
@@ -2,9 +2,39 @@
+ AD_UNIT_ID_FOR_BANNER_TEST
+ redacted
+ AD_UNIT_ID_FOR_INTERSTITIAL_TEST
+ redacted
CLIENT_ID
15087385131-lh9bpkiai9nls53uadju0if6k7un3uih.apps.googleusercontent.com
REVERSED_CLIENT_ID
com.googleusercontent.apps.15087385131-lh9bpkiai9nls53uadju0if6k7un3uih
+ API_KEY
+ redacted
+ GCM_SENDER_ID
+ redacted
+ PLIST_VERSION
+ 1
+ BUNDLE_ID
+ io.grpc.AuthSample
+ PROJECT_ID
+ grpc-authsample
+ STORAGE_BUCKET
+ grpc-authsample.appspot.com
+ IS_ADS_ENABLED
+
+ IS_ANALYTICS_ENABLED
+
+ IS_APPINVITE_ENABLED
+
+ IS_GCM_ENABLED
+
+ IS_SIGNIN_ENABLED
+
+ GOOGLE_APP_ID
+ 1:15087385131:ios:d547168abe3c362f
+ DATABASE_URL
+ https://grpc-authsample.firebaseio.com
-
\ No newline at end of file
+
diff --git a/examples/php/composer.json b/examples/php/composer.json
index a8b790b1de1..d40b5db059e 100644
--- a/examples/php/composer.json
+++ b/examples/php/composer.json
@@ -3,6 +3,6 @@
"description": "gRPC example for PHP",
"minimum-stability": "dev",
"require": {
- "grpc/grpc": "v0.15.0"
+ "grpc/grpc": "v0.15.2"
}
}
diff --git a/include/grpc++/impl/codegen/async_stream.h b/include/grpc++/impl/codegen/async_stream.h
index e96d224ddbe..70533aa4d9f 100644
--- a/include/grpc++/impl/codegen/async_stream.h
+++ b/include/grpc++/impl/codegen/async_stream.h
@@ -330,6 +330,9 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncReaderInterface {
meta_ops_.set_output_tag(tag);
meta_ops_.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
+ if (ctx_->compression_level_set()) {
+ meta_ops_.set_compression_level(ctx_->compression_level());
+ }
ctx_->sent_initial_metadata_ = true;
call_.PerformOps(&meta_ops_);
}
@@ -345,6 +348,9 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncReaderInterface {
if (!ctx_->sent_initial_metadata_) {
finish_ops_.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
+ if (ctx_->compression_level_set()) {
+ finish_ops_.set_compression_level(ctx_->compression_level());
+ }
ctx_->sent_initial_metadata_ = true;
}
// The response is dropped if the status is not OK.
@@ -363,6 +369,9 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncReaderInterface {
if (!ctx_->sent_initial_metadata_) {
finish_ops_.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
+ if (ctx_->compression_level_set()) {
+ finish_ops_.set_compression_level(ctx_->compression_level());
+ }
ctx_->sent_initial_metadata_ = true;
}
finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
@@ -400,6 +409,9 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncWriterInterface {
meta_ops_.set_output_tag(tag);
meta_ops_.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
+ if (ctx_->compression_level_set()) {
+ meta_ops_.set_compression_level(ctx_->compression_level());
+ }
ctx_->sent_initial_metadata_ = true;
call_.PerformOps(&meta_ops_);
}
@@ -409,6 +421,9 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncWriterInterface {
if (!ctx_->sent_initial_metadata_) {
write_ops_.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
+ if (ctx_->compression_level_set()) {
+ write_ops_.set_compression_level(ctx_->compression_level());
+ }
ctx_->sent_initial_metadata_ = true;
}
// TODO(ctiller): don't assert
@@ -421,6 +436,9 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncWriterInterface {
if (!ctx_->sent_initial_metadata_) {
finish_ops_.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
+ if (ctx_->compression_level_set()) {
+ finish_ops_.set_compression_level(ctx_->compression_level());
+ }
ctx_->sent_initial_metadata_ = true;
}
finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
@@ -459,6 +477,9 @@ class ServerAsyncReaderWriter GRPC_FINAL
meta_ops_.set_output_tag(tag);
meta_ops_.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
+ if (ctx_->compression_level_set()) {
+ meta_ops_.set_compression_level(ctx_->compression_level());
+ }
ctx_->sent_initial_metadata_ = true;
call_.PerformOps(&meta_ops_);
}
@@ -474,6 +495,9 @@ class ServerAsyncReaderWriter GRPC_FINAL
if (!ctx_->sent_initial_metadata_) {
write_ops_.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
+ if (ctx_->compression_level_set()) {
+ write_ops_.set_compression_level(ctx_->compression_level());
+ }
ctx_->sent_initial_metadata_ = true;
}
// TODO(ctiller): don't assert
@@ -486,6 +510,9 @@ class ServerAsyncReaderWriter GRPC_FINAL
if (!ctx_->sent_initial_metadata_) {
finish_ops_.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
+ if (ctx_->compression_level_set()) {
+ finish_ops_.set_compression_level(ctx_->compression_level());
+ }
ctx_->sent_initial_metadata_ = true;
}
finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
diff --git a/include/grpc++/impl/codegen/async_unary_call.h b/include/grpc++/impl/codegen/async_unary_call.h
index 47ac5bee925..544dace32b9 100644
--- a/include/grpc++/impl/codegen/async_unary_call.h
+++ b/include/grpc++/impl/codegen/async_unary_call.h
@@ -126,6 +126,9 @@ class ServerAsyncResponseWriter GRPC_FINAL
meta_buf_.set_output_tag(tag);
meta_buf_.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
+ if (ctx_->compression_level_set()) {
+ meta_buf_.set_compression_level(ctx_->compression_level());
+ }
ctx_->sent_initial_metadata_ = true;
call_.PerformOps(&meta_buf_);
}
@@ -135,6 +138,9 @@ class ServerAsyncResponseWriter GRPC_FINAL
if (!ctx_->sent_initial_metadata_) {
finish_buf_.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
+ if (ctx_->compression_level_set()) {
+ finish_buf_.set_compression_level(ctx_->compression_level());
+ }
ctx_->sent_initial_metadata_ = true;
}
// The response is dropped if the status is not OK.
@@ -153,6 +159,9 @@ class ServerAsyncResponseWriter GRPC_FINAL
if (!ctx_->sent_initial_metadata_) {
finish_buf_.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
+ if (ctx_->compression_level_set()) {
+ finish_buf_.set_compression_level(ctx_->compression_level());
+ }
ctx_->sent_initial_metadata_ = true;
}
finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, status);
diff --git a/include/grpc++/impl/codegen/call.h b/include/grpc++/impl/codegen/call.h
index fab85d15176..dfac177970a 100644
--- a/include/grpc++/impl/codegen/call.h
+++ b/include/grpc++/impl/codegen/call.h
@@ -180,17 +180,23 @@ class CallNoOp {
class CallOpSendInitialMetadata {
public:
- CallOpSendInitialMetadata() : send_(false) {}
+ CallOpSendInitialMetadata() : send_(false) {
+ maybe_compression_level_.is_set = false;
+ }
void SendInitialMetadata(
const std::multimap& metadata,
uint32_t flags) {
+ maybe_compression_level_.is_set = false;
send_ = true;
flags_ = flags;
initial_metadata_count_ = metadata.size();
initial_metadata_ = FillMetadataArray(metadata);
- // TODO(dgq): expose compression level in API so it can be properly set.
- maybe_compression_level_.is_set = false;
+ }
+
+ void set_compression_level(grpc_compression_level level) {
+ maybe_compression_level_.is_set = true;
+ maybe_compression_level_.level = level;
}
protected:
diff --git a/include/grpc++/impl/codegen/method_handler_impl.h b/include/grpc++/impl/codegen/method_handler_impl.h
index 21ac6c4fb55..2f4be644bae 100644
--- a/include/grpc++/impl/codegen/method_handler_impl.h
+++ b/include/grpc++/impl/codegen/method_handler_impl.h
@@ -65,6 +65,9 @@ class RpcMethodHandler : public MethodHandler {
ops;
ops.SendInitialMetadata(param.server_context->initial_metadata_,
param.server_context->initial_metadata_flags());
+ if (param.server_context->compression_level_set()) {
+ ops.set_compression_level(param.server_context->compression_level());
+ }
if (status.ok()) {
status = ops.SendMessage(rsp);
}
@@ -104,6 +107,9 @@ class ClientStreamingHandler : public MethodHandler {
ops;
ops.SendInitialMetadata(param.server_context->initial_metadata_,
param.server_context->initial_metadata_flags());
+ if (param.server_context->compression_level_set()) {
+ ops.set_compression_level(param.server_context->compression_level());
+ }
if (status.ok()) {
status = ops.SendMessage(rsp);
}
@@ -144,6 +150,9 @@ class ServerStreamingHandler : public MethodHandler {
if (!param.server_context->sent_initial_metadata_) {
ops.SendInitialMetadata(param.server_context->initial_metadata_,
param.server_context->initial_metadata_flags());
+ if (param.server_context->compression_level_set()) {
+ ops.set_compression_level(param.server_context->compression_level());
+ }
}
ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
param.call->PerformOps(&ops);
@@ -177,6 +186,9 @@ class BidiStreamingHandler : public MethodHandler {
if (!param.server_context->sent_initial_metadata_) {
ops.SendInitialMetadata(param.server_context->initial_metadata_,
param.server_context->initial_metadata_flags());
+ if (param.server_context->compression_level_set()) {
+ ops.set_compression_level(param.server_context->compression_level());
+ }
}
ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
param.call->PerformOps(&ops);
@@ -199,6 +211,9 @@ class UnknownMethodHandler : public MethodHandler {
if (!context->sent_initial_metadata_) {
ops->SendInitialMetadata(context->initial_metadata_,
context->initial_metadata_flags());
+ if (context->compression_level_set()) {
+ ops->set_compression_level(context->compression_level());
+ }
context->sent_initial_metadata_ = true;
}
ops->ServerSendStatus(context->trailing_metadata_, status);
diff --git a/include/grpc++/impl/codegen/server_context.h b/include/grpc++/impl/codegen/server_context.h
index cea13a513f6..08212af861b 100644
--- a/include/grpc++/impl/codegen/server_context.h
+++ b/include/grpc++/impl/codegen/server_context.h
@@ -130,7 +130,13 @@ class ServerContext {
grpc_compression_level compression_level() const {
return compression_level_;
}
- void set_compression_level(grpc_compression_level level);
+
+ void set_compression_level(grpc_compression_level level) {
+ compression_level_set_ = true;
+ compression_level_ = level;
+ }
+
+ bool compression_level_set() const { return compression_level_set_; }
grpc_compression_algorithm compression_algorithm() const {
return compression_algorithm_;
@@ -217,6 +223,7 @@ class ServerContext {
std::multimap initial_metadata_;
std::multimap trailing_metadata_;
+ bool compression_level_set_;
grpc_compression_level compression_level_;
grpc_compression_algorithm compression_algorithm_;
};
diff --git a/include/grpc++/impl/codegen/sync_stream.h b/include/grpc++/impl/codegen/sync_stream.h
index cbfa4106995..b2b972760db 100644
--- a/include/grpc++/impl/codegen/sync_stream.h
+++ b/include/grpc++/impl/codegen/sync_stream.h
@@ -347,6 +347,9 @@ class ServerReader GRPC_FINAL : public ReaderInterface {
CallOpSet ops;
ops.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
+ if (ctx_->compression_level_set()) {
+ ops.set_compression_level(ctx_->compression_level());
+ }
ctx_->sent_initial_metadata_ = true;
call_->PerformOps(&ops);
call_->cq()->Pluck(&ops);
@@ -375,6 +378,9 @@ class ServerWriter GRPC_FINAL : public WriterInterface {
CallOpSet ops;
ops.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
+ if (ctx_->compression_level_set()) {
+ ops.set_compression_level(ctx_->compression_level());
+ }
ctx_->sent_initial_metadata_ = true;
call_->PerformOps(&ops);
call_->cq()->Pluck(&ops);
@@ -389,6 +395,9 @@ class ServerWriter GRPC_FINAL : public WriterInterface {
if (!ctx_->sent_initial_metadata_) {
ops.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
+ if (ctx_->compression_level_set()) {
+ ops.set_compression_level(ctx_->compression_level());
+ }
ctx_->sent_initial_metadata_ = true;
}
call_->PerformOps(&ops);
@@ -413,6 +422,9 @@ class ServerReaderWriter GRPC_FINAL : public WriterInterface,
CallOpSet ops;
ops.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
+ if (ctx_->compression_level_set()) {
+ ops.set_compression_level(ctx_->compression_level());
+ }
ctx_->sent_initial_metadata_ = true;
call_->PerformOps(&ops);
call_->cq()->Pluck(&ops);
@@ -434,6 +446,9 @@ class ServerReaderWriter GRPC_FINAL : public WriterInterface,
if (!ctx_->sent_initial_metadata_) {
ops.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
+ if (ctx_->compression_level_set()) {
+ ops.set_compression_level(ctx_->compression_level());
+ }
ctx_->sent_initial_metadata_ = true;
}
call_->PerformOps(&ops);
diff --git a/include/grpc++/server.h b/include/grpc++/server.h
index 7a8858ef194..6876961e210 100644
--- a/include/grpc++/server.h
+++ b/include/grpc++/server.h
@@ -179,10 +179,13 @@ class Server GRPC_FINAL : public ServerInterface, private GrpcLibraryCodegen {
grpc::mutex mu_;
bool started_;
bool shutdown_;
+ bool shutdown_notified_;
// The number of threads which are running callbacks.
int num_running_cb_;
grpc::condition_variable callback_cv_;
+ grpc::condition_variable shutdown_cv_;
+
std::shared_ptr global_callbacks_;
std::list* sync_methods_;
diff --git a/include/grpc/impl/codegen/compression_types.h b/include/grpc/impl/codegen/compression_types.h
index 9065d1edd02..3034182d4c5 100644
--- a/include/grpc/impl/codegen/compression_types.h
+++ b/include/grpc/impl/codegen/compression_types.h
@@ -46,12 +46,27 @@ extern "C" {
#define GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY \
"grpc-internal-encoding-request"
-/** To be used in channel arguments */
+/** To be used in channel arguments.
+ *
+ * \addtogroup grpc_arg_keys
+ * \{ */
+/** Default compression algorithm for the channel.
+ * Its value is an int from the \a grpc_compression_algorithm enum. */
#define GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM \
"grpc.default_compression_algorithm"
+/** Default compression level for the channel.
+ * Its value is an int from the \a grpc_compression_level enum. */
#define GRPC_COMPRESSION_CHANNEL_DEFAULT_LEVEL "grpc.default_compression_level"
+/** Compression algorithms supported by the channel.
+ * Its value is a bitset (an int). Bits correspond to algorithms in \a
+ * grpc_compression_algorithm. For example, its LSB corresponds to
+ * GRPC_COMPRESS_NONE, the next bit to GRPC_COMPRESS_DEFLATE, etc.
+ * Unset bits disable support for the algorithm. By default all algorithms are
+ * supported. It's not possible to disable GRPC_COMPRESS_NONE (the attempt will
+ * be ignored). */
#define GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET \
"grpc.compression_enabled_algorithms_bitset"
+/** \} */
/* The various compression algorithms supported by gRPC */
typedef enum {
diff --git a/include/grpc/impl/codegen/grpc_types.h b/include/grpc/impl/codegen/grpc_types.h
index c0ed1395066..e5a82883be5 100644
--- a/include/grpc/impl/codegen/grpc_types.h
+++ b/include/grpc/impl/codegen/grpc_types.h
@@ -106,58 +106,66 @@ typedef struct {
by grpc_arg; keys are strings to allow easy backwards-compatible extension
by arbitrary parties.
All evaluation is performed at channel creation time (i.e. the values in
- this structure need only live through the creation invocation). */
+ this structure need only live through the creation invocation).
+
+ See the description of the \ref grpc_arg_keys "available args" for more
+ details. */
typedef struct {
size_t num_args;
grpc_arg *args;
} grpc_channel_args;
-/* Channel argument keys: */
-/** Enable census for tracing and stats collection */
+/** \defgroup grpc_arg_keys
+ * Channel argument keys.
+ * \{
+ */
+/** If non-zero, enable census for tracing and stats collection. */
#define GRPC_ARG_ENABLE_CENSUS "grpc.census"
-/** Enable load reporting */
+/** If non-zero, enable load reporting. */
#define GRPC_ARG_ENABLE_LOAD_REPORTING "grpc.loadreporting"
/** Maximum number of concurrent incoming streams to allow on a http2
- connection */
+ connection. Int valued. */
#define GRPC_ARG_MAX_CONCURRENT_STREAMS "grpc.max_concurrent_streams"
-/** Maximum message length that the channel can receive */
+/** Maximum message length that the channel can receive. Int valued, bytes. */
#define GRPC_ARG_MAX_MESSAGE_LENGTH "grpc.max_message_length"
-/** Initial sequence number for http2 transports */
+/** Initial sequence number for http2 transports. Int valued. */
#define GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER \
"grpc.http2.initial_sequence_number"
/** Amount to read ahead on individual streams. Defaults to 64kb, larger
values can help throughput on high-latency connections.
NOTE: at some point we'd like to auto-tune this, and this parameter
- will become a no-op. */
+ will become a no-op. Int valued, bytes. */
#define GRPC_ARG_HTTP2_STREAM_LOOKAHEAD_BYTES "grpc.http2.lookahead_bytes"
-/** How much memory to use for hpack decoding */
+/** How much memory to use for hpack decoding. Int valued, bytes. */
#define GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_DECODER \
"grpc.http2.hpack_table_size.decoder"
-/** How much memory to use for hpack encoding */
+/** How much memory to use for hpack encoding. Int valued, bytes. */
#define GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_ENCODER \
"grpc.http2.hpack_table_size.encoder"
-/** Default authority to pass if none specified on call construction */
+/** Default authority to pass if none specified on call construction. A string.
+ * */
#define GRPC_ARG_DEFAULT_AUTHORITY "grpc.default_authority"
/** Primary user agent: goes at the start of the user-agent metadata
- sent on each request */
+ sent on each request. A string. */
#define GRPC_ARG_PRIMARY_USER_AGENT_STRING "grpc.primary_user_agent"
/** Secondary user agent: goes at the end of the user-agent metadata
- sent on each request */
+ sent on each request. A string. */
#define GRPC_ARG_SECONDARY_USER_AGENT_STRING "grpc.secondary_user_agent"
/** The maximum time between subsequent connection attempts, in ms */
#define GRPC_ARG_MAX_RECONNECT_BACKOFF_MS "grpc.max_reconnect_backoff_ms"
/* The caller of the secure_channel_create functions may override the target
name used for SSL host name checking using this channel argument which is of
- type GRPC_ARG_STRING. This *should* be used for testing only.
+ type \a GRPC_ARG_STRING. This *should* be used for testing only.
If this argument is not specified, the name used for SSL host name checking
will be the target parameter (assuming that the secure channel is an SSL
channel). If this parameter is specified and the underlying is not an SSL
channel, it will just be ignored. */
#define GRPC_SSL_TARGET_NAME_OVERRIDE_ARG "grpc.ssl_target_name_override"
-/* Maximum metadata size */
+/* Maximum metadata size, in bytes. */
#define GRPC_ARG_MAX_METADATA_SIZE "grpc.max_metadata_size"
/** If non-zero, allow the use of SO_REUSEPORT if it's available (default 1) */
#define GRPC_ARG_ALLOW_REUSEPORT "grpc.so_reuseport"
+/** \} */
/** Result of a grpc call. If the caller satisfies the prerequisites of a
particular operation, the grpc_call_error returned will be GRPC_CALL_OK.
diff --git a/include/grpc/impl/codegen/port_platform.h b/include/grpc/impl/codegen/port_platform.h
index 3ad665a7a29..7c67bad5ae1 100644
--- a/include/grpc/impl/codegen/port_platform.h
+++ b/include/grpc/impl/codegen/port_platform.h
@@ -119,7 +119,7 @@
// libraries; it should be integrated with the `__linux__` definitions below.
#define GPR_PLATFORM_STRING "manylinux"
#define GPR_POSIX_CRASH_HANDLER 1
-#define GPR_CPU_LINUX 1
+#define GPR_CPU_POSIX 1
#define GPR_GCC_ATOMIC 1
#define GPR_GCC_TLS 1
#define GPR_LINUX 1
diff --git a/include/grpc/module.modulemap b/include/grpc/module.modulemap
index ae11a78b74a..02fb2f39b20 100644
--- a/include/grpc/module.modulemap
+++ b/include/grpc/module.modulemap
@@ -1,5 +1,15 @@
framework module grpc {
umbrella header "grpc.h"
+
+ header "byte_buffer_reader.h"
+ header "grpc_security.h"
+ header "grpc_security_constants.h"
+ header "impl/codegen/alloc.h"
+ header "impl/codegen/byte_buffer_reader.h"
+ header "support/alloc.h"
+ header "support/port_platform.h"
+ header "support/string_util.h"
+
export *
module * { export * }
}
diff --git a/package.xml b/package.xml
index c5006dade57..65764e88889 100644
--- a/package.xml
+++ b/package.xml
@@ -10,7 +10,7 @@
grpc-packages@google.com
yes
- 2016-07-13
+ 2016-07-28
1.0.0
@@ -22,8 +22,7 @@
BSD
-- GA release
-- Fix shutdown hang problem #4017
+- PHP7 Support continued, reduce code duplication #7543
@@ -47,6 +46,7 @@
+
@@ -1087,8 +1087,8 @@ Update to wrap gRPC C Core version 0.10.0
- 1.0.0
- 1.0.0
+ 1.0.0RC1
+ 1.0.0RC1
stable
@@ -1101,5 +1101,35 @@ Update to wrap gRPC C Core version 0.10.0
- Fix shutdown hang problem #4017
+
+
+ 1.0.0RC2
+ 1.0.0RC2
+
+
+ stable
+ stable
+
+ 2016-07-21
+ BSD
+
+- PHP7 Support #7464
+
+
+
+
+ 1.0.0RC3
+ 1.0.0RC3
+
+
+ stable
+ stable
+
+ 2016-07-28
+ BSD
+
+- PHP7 Support continued, reduce code duplication #7543
+
+
diff --git a/setup.cfg b/setup.cfg
index 7194716f61b..dd9161ca8b9 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -9,5 +9,5 @@ build_base=python_build
[build_ext]
inplace=1
-[build_proto_modules]
+[build_package_protos]
exclude=.*protoc_plugin/protoc_plugin_test\.proto$
diff --git a/setup.py b/setup.py
index 24414457cd5..b43ec9ae3d4 100644
--- a/setup.py
+++ b/setup.py
@@ -30,10 +30,12 @@
"""A setup module for the GRPC Python package."""
from distutils import extension as _extension
+from distutils import util
import os
import os.path
import pkg_resources
import platform
+import re
import shlex
import shutil
import sys
@@ -133,6 +135,10 @@ if 'darwin' in sys.platform and PY3:
if mac_target and (pkg_resources.parse_version(mac_target) <
pkg_resources.parse_version('10.7.0')):
os.environ['MACOSX_DEPLOYMENT_TARGET'] = '10.7'
+ os.environ['_PYTHON_HOST_PLATFORM'] = re.sub(
+ r'macosx-[0-9]+\.[0-9]+-(.+)',
+ r'macosx-10.7-\1',
+ util.get_platform())
def cython_extensions(module_names, extra_sources, include_dirs,
diff --git a/src/compiler/ruby_generator_helpers-inl.h b/src/compiler/ruby_generator_helpers-inl.h
index ff6939ed9fe..aa3c22fea6d 100644
--- a/src/compiler/ruby_generator_helpers-inl.h
+++ b/src/compiler/ruby_generator_helpers-inl.h
@@ -48,7 +48,7 @@ inline bool ServicesFilename(const grpc::protobuf::FileDescriptor *file,
file->name().find_last_of(".proto") == file->name().size() - 1) {
*file_name_or_error =
file->name().substr(0, file->name().size() - proto_suffix_length) +
- "_services.rb";
+ "_services_pb.rb";
return true;
} else {
*file_name_or_error = "Invalid proto file name: must end with .proto";
@@ -58,7 +58,7 @@ inline bool ServicesFilename(const grpc::protobuf::FileDescriptor *file,
inline grpc::string MessagesRequireName(
const grpc::protobuf::FileDescriptor *file) {
- return Replace(file->name(), ".proto", "");
+ return Replace(file->name(), ".proto", "_pb");
}
// Get leading or trailing comments in a string. Comment lines start with "# ".
diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
index 721ba82d8f7..9acacbd92d4 100644
--- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
+++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
@@ -91,11 +91,13 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
connector *c = arg;
grpc_closure *notify;
gpr_mu_lock(&c->mu);
+ grpc_error *error = GRPC_ERROR_NONE;
if (c->connecting_endpoint == NULL) {
memset(c->result, 0, sizeof(*c->result));
gpr_mu_unlock(&c->mu);
} else if (status != GRPC_SECURITY_OK) {
- gpr_log(GPR_ERROR, "Secure handshake failed with error %d.", status);
+ error = grpc_error_set_int(GRPC_ERROR_CREATE("Secure handshake failed"),
+ GRPC_ERROR_INT_SECURITY_STATUS, status);
memset(c->result, 0, sizeof(*c->result));
c->connecting_endpoint = NULL;
gpr_mu_unlock(&c->mu);
@@ -113,7 +115,7 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
}
notify = c->notify;
c->notify = NULL;
- grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_NONE, NULL);
+ grpc_exec_ctx_sched(exec_ctx, notify, error, NULL);
}
static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg,
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_plugin.c b/src/core/ext/transport/chttp2/transport/chttp2_plugin.c
index bd87253ed32..7d5279b9da4 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_plugin.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_plugin.c
@@ -36,11 +36,14 @@
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/transport/metadata.h"
+extern int grpc_http_write_state_trace;
+
void grpc_chttp2_plugin_init(void) {
grpc_chttp2_base64_encode_and_huffman_compress =
grpc_chttp2_base64_encode_and_huffman_compress_impl;
grpc_register_tracer("http", &grpc_http_trace);
grpc_register_tracer("flowctl", &grpc_flowctl_trace);
+ grpc_register_tracer("http_write_state", &grpc_http_write_state_trace);
}
void grpc_chttp2_plugin_shutdown(void) {}
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index e2dd463a773..d050467a020 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -48,6 +48,7 @@
#include "src/core/ext/transport/chttp2/transport/status_conversion.h"
#include "src/core/ext/transport/chttp2/transport/timeout_encoding.h"
#include "src/core/lib/http/parser.h"
+#include "src/core/lib/iomgr/workqueue.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/string.h"
#include "src/core/lib/transport/static_metadata.h"
@@ -60,9 +61,9 @@
#define DEFAULT_MAX_HEADER_LIST_SIZE (16 * 1024)
#define MAX_CLIENT_STREAM_ID 0x7fffffffu
-
int grpc_http_trace = 0;
int grpc_flowctl_trace = 0;
+int grpc_http_write_state_trace = 0;
#define TRANSPORT_FROM_WRITING(tw) \
((grpc_chttp2_transport *)((char *)(tw)-offsetof(grpc_chttp2_transport, \
@@ -88,10 +89,16 @@ static const grpc_transport_vtable vtable;
static void writing_action(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error);
static void reading_action(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error);
static void parsing_action(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error);
+static void initiate_writing(grpc_exec_ctx *exec_ctx, void *t,
+ grpc_error *error);
+
+static void start_writing(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t);
+static void end_waiting_for_write(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t, grpc_error *error);
/** Set a transport level setting, and push it to our peer */
-static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id,
- uint32_t value);
+static void push_setting(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
+ grpc_chttp2_setting_id id, uint32_t value);
/** Start disconnection chain */
static void drop_connection(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
@@ -137,7 +144,7 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_global *transport_global);
static void incoming_byte_stream_update_flow_control(
- grpc_chttp2_transport_global *transport_global,
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global, size_t max_size_hint,
size_t have_already);
static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx,
@@ -201,6 +208,7 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx,
gpr_free(t);
}
+/*#define REFCOUNTING_DEBUG 1*/
#ifdef REFCOUNTING_DEBUG
#define REF_TRANSPORT(t, r) ref_transport(t, r, __FILE__, __LINE__)
#define UNREF_TRANSPORT(cl, t, r) unref_transport(cl, t, r, __FILE__, __LINE__)
@@ -231,7 +239,7 @@ static void ref_transport(grpc_chttp2_transport *t) { gpr_ref(&t->refs); }
static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
const grpc_channel_args *channel_args,
- grpc_endpoint *ep, uint8_t is_client) {
+ grpc_endpoint *ep, bool is_client) {
size_t i;
int j;
@@ -273,6 +281,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_closure_init(&t->writing_action, writing_action, t);
grpc_closure_init(&t->reading_action, reading_action, t);
grpc_closure_init(&t->parsing_action, parsing_action, t);
+ grpc_closure_init(&t->initiate_writing, initiate_writing, t);
gpr_slice_buffer_init(&t->parsing.qbuf);
grpc_chttp2_goaway_parser_init(&t->parsing.goaway_parser);
@@ -286,6 +295,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
gpr_slice_buffer_add(
&t->global.qbuf,
gpr_slice_from_copied_string(GRPC_CHTTP2_CLIENT_CONNECT_STRING));
+ grpc_chttp2_initiate_write(exec_ctx, &t->global, false, "initial_write");
}
/* 8 is a random stab in the dark as to a good initial size: it's small enough
that it shouldn't waste memory for infrequently used connections, yet
@@ -311,11 +321,12 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
/* configure http2 the way we like it */
if (is_client) {
- push_setting(t, GRPC_CHTTP2_SETTINGS_ENABLE_PUSH, 0);
- push_setting(t, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 0);
+ push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_ENABLE_PUSH, 0);
+ push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 0);
}
- push_setting(t, GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, DEFAULT_WINDOW);
- push_setting(t, GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE,
+ push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE,
+ DEFAULT_WINDOW);
+ push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE,
DEFAULT_MAX_HEADER_LIST_SIZE);
if (channel_args) {
@@ -329,7 +340,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
gpr_log(GPR_ERROR, "%s: must be an integer",
GRPC_ARG_MAX_CONCURRENT_STREAMS);
} else {
- push_setting(t, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS,
+ push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS,
(uint32_t)channel_args->args[i].value.integer);
}
} else if (0 == strcmp(channel_args->args[i].key,
@@ -368,7 +379,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
gpr_log(GPR_ERROR, "%s: must be non-negative",
GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_DECODER);
} else {
- push_setting(t, GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE,
+ push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE,
(uint32_t)channel_args->args[i].value.integer);
}
} else if (0 == strcmp(channel_args->args[i].key,
@@ -393,7 +404,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
gpr_log(GPR_ERROR, "%s: must be non-negative",
GRPC_ARG_MAX_METADATA_SIZE);
} else {
- push_setting(t, GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE,
+ push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE,
(uint32_t)channel_args->args[i].value.integer);
}
}
@@ -444,6 +455,9 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_error *error) {
if (!t->closed) {
+ if (grpc_http_write_state_trace) {
+ gpr_log(GPR_DEBUG, "W:%p close transport", t);
+ }
t->closed = 1;
connectivity_state_set(exec_ctx, &t->global, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_REF(error), "close_transport");
@@ -590,7 +604,8 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_incoming_metadata_buffer_destroy(
&s->global.received_trailing_metadata);
gpr_slice_buffer_destroy(&s->writing.flow_controlled_buffer);
- GRPC_ERROR_UNREF(s->global.removal_error);
+ GRPC_ERROR_UNREF(s->global.read_closed_error);
+ GRPC_ERROR_UNREF(s->global.write_closed_error);
UNREF_TRANSPORT(exec_ctx, t, "stream");
@@ -634,6 +649,36 @@ grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream(
* LOCK MANAGEMENT
*/
+static const char *write_state_name(grpc_chttp2_write_state state) {
+ switch (state) {
+ case GRPC_CHTTP2_WRITING_INACTIVE:
+ return "INACTIVE";
+ case GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER:
+ return "REQUESTED[p=0]";
+ case GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER:
+ return "REQUESTED[p=1]";
+ case GRPC_CHTTP2_WRITE_SCHEDULED:
+ return "SCHEDULED";
+ case GRPC_CHTTP2_WRITING:
+ return "WRITING";
+ case GRPC_CHTTP2_WRITING_STALE_WITH_POLLER:
+ return "WRITING[p=1]";
+ case GRPC_CHTTP2_WRITING_STALE_NO_POLLER:
+ return "WRITING[p=0]";
+ }
+ GPR_UNREACHABLE_CODE(return "UNKNOWN");
+}
+
+static void set_write_state(grpc_chttp2_transport *t,
+ grpc_chttp2_write_state state, const char *reason) {
+ if (grpc_http_write_state_trace) {
+ gpr_log(GPR_DEBUG, "W:%p %s -> %s because %s", t,
+ write_state_name(t->executor.write_state), write_state_name(state),
+ reason);
+ }
+ t->executor.write_state = state;
+}
+
static void finish_global_actions(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t) {
grpc_chttp2_executor_action_header *hdr;
@@ -642,13 +687,6 @@ static void finish_global_actions(grpc_exec_ctx *exec_ctx,
GPR_TIMER_BEGIN("finish_global_actions", 0);
for (;;) {
- if (!t->executor.writing_active && !t->closed &&
- grpc_chttp2_unlocking_check_writes(exec_ctx, &t->global, &t->writing)) {
- t->executor.writing_active = 1;
- REF_TRANSPORT(t, "writing");
- prevent_endpoint_shutdown(t);
- grpc_exec_ctx_sched(exec_ctx, &t->writing_action, GRPC_ERROR_NONE, NULL);
- }
check_read_ops(exec_ctx, &t->global);
gpr_mu_lock(&t->executor.mu);
@@ -669,8 +707,28 @@ static void finish_global_actions(grpc_exec_ctx *exec_ctx,
continue;
} else {
t->executor.global_active = false;
+ switch (t->executor.write_state) {
+ case GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER:
+ set_write_state(t, GRPC_CHTTP2_WRITE_SCHEDULED, "unlocking");
+ REF_TRANSPORT(t, "initiate_writing");
+ gpr_mu_unlock(&t->executor.mu);
+ grpc_exec_ctx_sched(
+ exec_ctx, &t->initiate_writing, GRPC_ERROR_NONE,
+ t->ep != NULL ? grpc_endpoint_get_workqueue(t->ep) : NULL);
+ break;
+ case GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER:
+ start_writing(exec_ctx, t);
+ gpr_mu_unlock(&t->executor.mu);
+ break;
+ case GRPC_CHTTP2_WRITING_INACTIVE:
+ case GRPC_CHTTP2_WRITING:
+ case GRPC_CHTTP2_WRITING_STALE_WITH_POLLER:
+ case GRPC_CHTTP2_WRITING_STALE_NO_POLLER:
+ case GRPC_CHTTP2_WRITE_SCHEDULED:
+ gpr_mu_unlock(&t->executor.mu);
+ break;
+ }
}
- gpr_mu_unlock(&t->executor.mu);
break;
}
@@ -741,16 +799,118 @@ void grpc_chttp2_run_with_global_lock(grpc_exec_ctx *exec_ctx,
* OUTPUT PROCESSING
*/
-void grpc_chttp2_become_writable(grpc_chttp2_transport_global *transport_global,
- grpc_chttp2_stream_global *stream_global) {
+void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport_global *transport_global,
+ bool covered_by_poller, const char *reason) {
+ /* Perform state checks, and transition to a scheduled state if appropriate.
+ Each time we finish the global lock execution, we check if we need to
+ write. If we do:
+ - (if there is a poller surrounding the write) schedule
+ initiate_writing, which locks and calls initiate_writing_locked to...
+ - call start_writing, which verifies (under the global lock) that there
+ are things that need to be written by calling
+ grpc_chttp2_unlocking_check_writes, and if so schedules writing_action
+ against the current exec_ctx, to be executed OUTSIDE of the global lock
+ - eventually writing_action results in grpc_chttp2_terminate_writing being
+ called, which re-takes the global lock, updates state, checks if we need
+ to do *another* write immediately, and if so loops back to
+ start_writing.
+
+ Current problems:
+ - too much lock entry/exiting
+ - the writing thread can become stuck indefinitely (punt through the
+ workqueue periodically to fix) */
+
+ grpc_chttp2_transport *t = TRANSPORT_FROM_GLOBAL(transport_global);
+ switch (t->executor.write_state) {
+ case GRPC_CHTTP2_WRITING_INACTIVE:
+ set_write_state(t, covered_by_poller
+ ? GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER
+ : GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER,
+ reason);
+ break;
+ case GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER:
+ /* nothing to do: write already requested */
+ break;
+ case GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER:
+ if (covered_by_poller) {
+ /* upgrade to note poller is available to cover the write */
+ set_write_state(t, GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER, reason);
+ }
+ break;
+ case GRPC_CHTTP2_WRITE_SCHEDULED:
+ /* nothing to do: write already scheduled */
+ break;
+ case GRPC_CHTTP2_WRITING:
+ set_write_state(t,
+ covered_by_poller ? GRPC_CHTTP2_WRITING_STALE_WITH_POLLER
+ : GRPC_CHTTP2_WRITING_STALE_NO_POLLER,
+ reason);
+ break;
+ case GRPC_CHTTP2_WRITING_STALE_WITH_POLLER:
+ /* nothing to do: write already requested */
+ break;
+ case GRPC_CHTTP2_WRITING_STALE_NO_POLLER:
+ if (covered_by_poller) {
+ /* upgrade to note poller is available to cover the write */
+ set_write_state(t, GRPC_CHTTP2_WRITING_STALE_WITH_POLLER, reason);
+ }
+ break;
+ }
+}
+
+static void start_writing(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) {
+ GPR_ASSERT(t->executor.write_state == GRPC_CHTTP2_WRITE_SCHEDULED ||
+ t->executor.write_state == GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER);
+ if (!t->closed &&
+ grpc_chttp2_unlocking_check_writes(exec_ctx, &t->global, &t->writing)) {
+ set_write_state(t, GRPC_CHTTP2_WRITING, "start_writing");
+ REF_TRANSPORT(t, "writing");
+ prevent_endpoint_shutdown(t);
+ grpc_exec_ctx_sched(exec_ctx, &t->writing_action, GRPC_ERROR_NONE, NULL);
+ } else {
+ if (t->closed) {
+ set_write_state(t, GRPC_CHTTP2_WRITING_INACTIVE,
+ "start_writing:transport_closed");
+ } else {
+ set_write_state(t, GRPC_CHTTP2_WRITING_INACTIVE,
+ "start_writing:nothing_to_write");
+ }
+ end_waiting_for_write(exec_ctx, t, GRPC_ERROR_CREATE("Nothing to write"));
+ if (t->ep && !t->endpoint_reading) {
+ destroy_endpoint(exec_ctx, t);
+ }
+ }
+}
+
+static void initiate_writing_locked(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s_unused,
+ void *arg_ignored) {
+ start_writing(exec_ctx, t);
+ UNREF_TRANSPORT(exec_ctx, t, "initiate_writing");
+}
+
+static void initiate_writing(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ grpc_chttp2_run_with_global_lock(exec_ctx, arg, NULL, initiate_writing_locked,
+ NULL, 0);
+}
+
+void grpc_chttp2_become_writable(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport_global *transport_global,
+ grpc_chttp2_stream_global *stream_global,
+ bool covered_by_poller, const char *reason) {
if (!TRANSPORT_FROM_GLOBAL(transport_global)->closed &&
grpc_chttp2_list_add_writable_stream(transport_global, stream_global)) {
GRPC_CHTTP2_STREAM_REF(stream_global, "chttp2_writing");
+ grpc_chttp2_initiate_write(exec_ctx, transport_global, covered_by_poller,
+ reason);
}
}
-static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id,
- uint32_t value) {
+static void push_setting(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
+ grpc_chttp2_setting_id id, uint32_t value) {
const grpc_chttp2_setting_parameters *sp =
&grpc_chttp2_settings_parameters[id];
uint32_t use_value = GPR_CLAMP(value, sp->min_value, sp->max_value);
@@ -761,9 +921,22 @@ static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id,
if (use_value != t->global.settings[GRPC_LOCAL_SETTINGS][id]) {
t->global.settings[GRPC_LOCAL_SETTINGS][id] = use_value;
t->global.dirtied_local_settings = 1;
+ grpc_chttp2_initiate_write(exec_ctx, &t->global, false, "push_setting");
}
}
+static void end_waiting_for_write(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t, grpc_error *error) {
+ grpc_chttp2_stream_global *stream_global;
+ while (grpc_chttp2_list_pop_closed_waiting_for_writing(&t->global,
+ &stream_global)) {
+ fail_pending_writes(exec_ctx, &t->global, stream_global,
+ GRPC_ERROR_REF(error));
+ GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "finish_writes");
+ }
+ GRPC_ERROR_UNREF(error);
+}
+
static void terminate_writing_with_lock(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s_ignored,
@@ -778,24 +951,32 @@ static void terminate_writing_with_lock(grpc_exec_ctx *exec_ctx,
grpc_chttp2_cleanup_writing(exec_ctx, &t->global, &t->writing);
- grpc_chttp2_stream_global *stream_global;
- while (grpc_chttp2_list_pop_closed_waiting_for_writing(&t->global,
- &stream_global)) {
- fail_pending_writes(exec_ctx, &t->global, stream_global,
- GRPC_ERROR_REF(error));
- GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "finish_writes");
+ end_waiting_for_write(exec_ctx, t, error);
+
+ switch (t->executor.write_state) {
+ case GRPC_CHTTP2_WRITING_INACTIVE:
+ case GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER:
+ case GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER:
+ case GRPC_CHTTP2_WRITE_SCHEDULED:
+ GPR_UNREACHABLE_CODE(break);
+ case GRPC_CHTTP2_WRITING:
+ set_write_state(t, GRPC_CHTTP2_WRITING_INACTIVE, "terminate_writing");
+ break;
+ case GRPC_CHTTP2_WRITING_STALE_WITH_POLLER:
+ set_write_state(t, GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER,
+ "terminate_writing");
+ break;
+ case GRPC_CHTTP2_WRITING_STALE_NO_POLLER:
+ set_write_state(t, GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER,
+ "terminate_writing");
+ break;
}
- /* leave the writing flag up on shutdown to prevent further writes in
- unlock()
- from starting */
- t->executor.writing_active = 0;
if (t->ep && !t->endpoint_reading) {
destroy_endpoint(exec_ctx, t);
}
UNREF_TRANSPORT(exec_ctx, t, "writing");
- GRPC_ERROR_UNREF(error);
}
void grpc_chttp2_terminate_writing(grpc_exec_ctx *exec_ctx,
@@ -878,7 +1059,8 @@ static void maybe_start_some_streams(
stream_global->id, STREAM_FROM_GLOBAL(stream_global));
stream_global->in_stream_map = true;
transport_global->concurrent_stream_count++;
- grpc_chttp2_become_writable(transport_global, stream_global);
+ grpc_chttp2_become_writable(exec_ctx, transport_global, stream_global, true,
+ "new_stream");
}
/* cancel out streams that will never be started */
while (transport_global->next_stream_id >= MAX_CLIENT_STREAM_ID &&
@@ -1018,9 +1200,11 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx,
maybe_start_some_streams(exec_ctx, transport_global);
} else {
GPR_ASSERT(stream_global->id != 0);
- grpc_chttp2_become_writable(transport_global, stream_global);
+ grpc_chttp2_become_writable(exec_ctx, transport_global, stream_global,
+ true, "op.send_initial_metadata");
}
} else {
+ stream_global->send_trailing_metadata = NULL;
grpc_chttp2_complete_closure_step(
exec_ctx, transport_global, stream_global,
&stream_global->send_initial_metadata_finished,
@@ -1042,7 +1226,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx,
} else {
stream_global->send_message = op->send_message;
if (stream_global->id != 0) {
- grpc_chttp2_become_writable(transport_global, stream_global);
+ grpc_chttp2_become_writable(exec_ctx, transport_global, stream_global,
+ true, "op.send_message");
}
}
}
@@ -1075,6 +1260,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
}
if (stream_global->write_closed) {
+ stream_global->send_trailing_metadata = NULL;
grpc_chttp2_complete_closure_step(
exec_ctx, transport_global, stream_global,
&stream_global->send_trailing_metadata_finished,
@@ -1085,7 +1271,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx,
} else if (stream_global->id != 0) {
/* TODO(ctiller): check if there's flow control for any outstanding
bytes before going writable */
- grpc_chttp2_become_writable(transport_global, stream_global);
+ grpc_chttp2_become_writable(exec_ctx, transport_global, stream_global,
+ true, "op.send_trailing_metadata");
}
}
}
@@ -1106,8 +1293,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx,
(stream_global->incoming_frames.head == NULL ||
stream_global->incoming_frames.head->is_tail)) {
incoming_byte_stream_update_flow_control(
- transport_global, stream_global, transport_global->stream_lookahead,
- 0);
+ exec_ctx, transport_global, stream_global,
+ transport_global->stream_lookahead, 0);
}
grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
}
@@ -1135,7 +1322,8 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
sizeof(*op));
}
-static void send_ping_locked(grpc_chttp2_transport *t, grpc_closure *on_recv) {
+static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
+ grpc_closure *on_recv) {
grpc_chttp2_outstanding_ping *p = gpr_malloc(sizeof(*p));
p->next = &t->global.pings;
p->prev = p->next->prev;
@@ -1150,6 +1338,7 @@ static void send_ping_locked(grpc_chttp2_transport *t, grpc_closure *on_recv) {
p->id[7] = (uint8_t)(t->global.ping_counter & 0xff);
p->on_recv = on_recv;
gpr_slice_buffer_add(&t->global.qbuf, grpc_chttp2_ping_create(0, p->id));
+ grpc_chttp2_initiate_write(exec_ctx, &t->global, true, "send_ping");
}
static void ack_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
@@ -1209,6 +1398,7 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
close_transport = grpc_chttp2_has_streams(t)
? GRPC_ERROR_NONE
: GRPC_ERROR_CREATE("GOAWAY sent");
+ grpc_chttp2_initiate_write(exec_ctx, &t->global, false, "goaway_sent");
}
if (op->set_accept_stream) {
@@ -1226,7 +1416,7 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
}
if (op->send_ping) {
- send_ping_locked(t, op->send_ping);
+ send_ping_locked(exec_ctx, t, op->send_ping);
}
if (close_transport != GRPC_ERROR_NONE) {
@@ -1414,6 +1604,8 @@ static void cancel_from_api(grpc_exec_ctx *exec_ctx,
&transport_global->qbuf,
grpc_chttp2_rst_stream_create(stream_global->id, (uint32_t)http_error,
&stream_global->stats.outgoing));
+ grpc_chttp2_initiate_write(exec_ctx, transport_global, false,
+ "rst_stream");
}
const char *msg =
@@ -1473,10 +1665,39 @@ void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx,
}
}
+static void add_error(grpc_error *error, grpc_error **refs, size_t *nrefs) {
+ if (error == GRPC_ERROR_NONE) return;
+ for (size_t i = 0; i < *nrefs; i++) {
+ if (error == refs[i]) {
+ return;
+ }
+ }
+ refs[*nrefs] = error;
+ ++*nrefs;
+}
+
+static grpc_error *removal_error(grpc_error *extra_error,
+ grpc_chttp2_stream_global *stream_global) {
+ grpc_error *refs[3];
+ size_t nrefs = 0;
+ add_error(stream_global->read_closed_error, refs, &nrefs);
+ add_error(stream_global->write_closed_error, refs, &nrefs);
+ add_error(extra_error, refs, &nrefs);
+ grpc_error *error = GRPC_ERROR_NONE;
+ if (nrefs > 0) {
+ error = GRPC_ERROR_CREATE_REFERENCING("Failed due to stream removal", refs,
+ nrefs);
+ }
+ GRPC_ERROR_UNREF(extra_error);
+ return error;
+}
+
static void fail_pending_writes(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global,
grpc_error *error) {
+ error = removal_error(error, stream_global);
+ stream_global->send_message = NULL;
grpc_chttp2_complete_closure_step(
exec_ctx, transport_global, stream_global,
&stream_global->send_initial_metadata_finished, GRPC_ERROR_REF(error));
@@ -1499,14 +1720,17 @@ void grpc_chttp2_mark_stream_closed(
}
grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
if (close_reads && !stream_global->read_closed) {
+ stream_global->read_closed_error = GRPC_ERROR_REF(error);
stream_global->read_closed = true;
stream_global->published_initial_metadata = true;
stream_global->published_trailing_metadata = true;
decrement_active_streams_locked(exec_ctx, transport_global, stream_global);
}
if (close_writes && !stream_global->write_closed) {
+ stream_global->write_closed_error = GRPC_ERROR_REF(error);
stream_global->write_closed = true;
- if (TRANSPORT_FROM_GLOBAL(transport_global)->executor.writing_active) {
+ if (TRANSPORT_FROM_GLOBAL(transport_global)->executor.write_state !=
+ GRPC_CHTTP2_WRITING_INACTIVE) {
GRPC_CHTTP2_STREAM_REF(stream_global, "finish_writes");
grpc_chttp2_list_add_closed_waiting_for_writing(transport_global,
stream_global);
@@ -1516,7 +1740,6 @@ void grpc_chttp2_mark_stream_closed(
}
}
if (stream_global->read_closed && stream_global->write_closed) {
- stream_global->removal_error = GRPC_ERROR_REF(error);
if (stream_global->id != 0 &&
TRANSPORT_FROM_GLOBAL(transport_global)->executor.parsing_active) {
grpc_chttp2_list_add_closed_waiting_for_parsing(transport_global,
@@ -1524,7 +1747,8 @@ void grpc_chttp2_mark_stream_closed(
} else {
if (stream_global->id != 0) {
remove_stream(exec_ctx, TRANSPORT_FROM_GLOBAL(transport_global),
- stream_global->id, GRPC_ERROR_REF(error));
+ stream_global->id,
+ removal_error(GRPC_ERROR_REF(error), stream_global));
}
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2");
}
@@ -1649,6 +1873,8 @@ static void close_from_api(grpc_exec_ctx *exec_ctx,
grpc_chttp2_mark_stream_closed(exec_ctx, transport_global, stream_global, 1,
1, error);
+ grpc_chttp2_initiate_write(exec_ctx, transport_global, false,
+ "close_from_api");
}
typedef struct {
@@ -1678,8 +1904,14 @@ static void drop_connection(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
}
/** update window from a settings change */
+typedef struct {
+ grpc_chttp2_transport *t;
+ grpc_exec_ctx *exec_ctx;
+} update_global_window_args;
+
static void update_global_window(void *args, uint32_t id, void *stream) {
- grpc_chttp2_transport *t = args;
+ update_global_window_args *a = args;
+ grpc_chttp2_transport *t = a->t;
grpc_chttp2_stream *s = stream;
grpc_chttp2_transport_global *transport_global = &t->global;
grpc_chttp2_stream_global *stream_global = &s->global;
@@ -1693,7 +1925,8 @@ static void update_global_window(void *args, uint32_t id, void *stream) {
is_zero = stream_global->outgoing_window <= 0;
if (was_zero && !is_zero) {
- grpc_chttp2_become_writable(transport_global, stream_global);
+ grpc_chttp2_become_writable(a->exec_ctx, transport_global, stream_global,
+ true, "update_global_window");
}
}
@@ -1801,14 +2034,19 @@ static void post_parse_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_chttp2_transport_global *transport_global = &t->global;
grpc_chttp2_transport_parsing *transport_parsing = &t->parsing;
/* copy parsing qbuf to global qbuf */
- gpr_slice_buffer_move_into(&t->parsing.qbuf, &t->global.qbuf);
+ if (t->parsing.qbuf.count > 0) {
+ gpr_slice_buffer_move_into(&t->parsing.qbuf, &t->global.qbuf);
+ grpc_chttp2_initiate_write(exec_ctx, transport_global, false,
+ "parsing_qbuf");
+ }
/* merge stream lists */
grpc_chttp2_stream_map_move_into(&t->new_stream_map, &t->parsing_stream_map);
transport_global->concurrent_stream_count =
(uint32_t)grpc_chttp2_stream_map_size(&t->parsing_stream_map);
if (transport_parsing->initial_window_update != 0) {
+ update_global_window_args args = {t, exec_ctx};
grpc_chttp2_stream_map_for_each(&t->parsing_stream_map,
- update_global_window, t);
+ update_global_window, &args);
transport_parsing->initial_window_update = 0;
}
/* handle higher level things */
@@ -1831,7 +2069,7 @@ static void post_parse_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
GPR_ASSERT(stream_global->write_closed);
GPR_ASSERT(stream_global->read_closed);
remove_stream(exec_ctx, t, stream_global->id,
- GRPC_ERROR_REF(stream_global->removal_error));
+ removal_error(GRPC_ERROR_NONE, stream_global));
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2");
}
@@ -1854,11 +2092,12 @@ static void post_reading_action_locked(grpc_exec_ctx *exec_ctx,
}
drop_connection(exec_ctx, t, GRPC_ERROR_REF(error));
t->endpoint_reading = 0;
- if (!t->executor.writing_active && t->ep) {
- grpc_endpoint_destroy(exec_ctx, t->ep);
- t->ep = NULL;
- /* safe as we still have a ref for read */
- UNREF_TRANSPORT(exec_ctx, t, "disconnect");
+ if (grpc_http_write_state_trace) {
+ gpr_log(GPR_DEBUG, "R:%p -> 0 ws=%s", t,
+ write_state_name(t->executor.write_state));
+ }
+ if (t->executor.write_state == GRPC_CHTTP2_WRITING_INACTIVE && t->ep) {
+ destroy_endpoint(exec_ctx, t);
}
} else if (!t->closed) {
keep_reading = true;
@@ -1942,7 +2181,7 @@ static void incoming_byte_stream_unref(grpc_exec_ctx *exec_ctx,
}
static void incoming_byte_stream_update_flow_control(
- grpc_chttp2_transport_global *transport_global,
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global, size_t max_size_hint,
size_t have_already) {
uint32_t max_recv_bytes;
@@ -1977,7 +2216,8 @@ static void incoming_byte_stream_update_flow_control(
add_max_recv_bytes);
grpc_chttp2_list_add_unannounced_incoming_window_available(transport_global,
stream_global);
- grpc_chttp2_become_writable(transport_global, stream_global);
+ grpc_chttp2_become_writable(exec_ctx, transport_global, stream_global,
+ false, "read_incoming_stream");
}
}
@@ -1999,8 +2239,9 @@ static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_stream_global *stream_global = &bs->stream->global;
if (bs->is_tail) {
- incoming_byte_stream_update_flow_control(
- transport_global, stream_global, arg->max_size_hint, bs->slices.length);
+ incoming_byte_stream_update_flow_control(exec_ctx, transport_global,
+ stream_global, arg->max_size_hint,
+ bs->slices.length);
}
if (bs->slices.count > 0) {
*arg->slice = gpr_slice_buffer_take_first(&bs->slices);
@@ -2184,7 +2425,7 @@ static char *format_flowctl_context_var(const char *context, const char *var,
if (context == NULL) {
*scope = NULL;
gpr_asprintf(&buf, "%s(%" PRId64 ")", var, val);
- result = gpr_leftpad(buf, ' ', 40);
+ result = gpr_leftpad(buf, ' ', 60);
gpr_free(buf);
return result;
}
@@ -2197,7 +2438,7 @@ static char *format_flowctl_context_var(const char *context, const char *var,
gpr_free(tmp);
}
gpr_asprintf(&buf, "%s.%s(%" PRId64 ")", underscore_pos + 1, var, val);
- result = gpr_leftpad(buf, ' ', 40);
+ result = gpr_leftpad(buf, ' ', 60);
gpr_free(buf);
return result;
}
@@ -2230,7 +2471,7 @@ void grpc_chttp2_flowctl_trace(const char *file, int line, const char *phase,
tmp_phase = gpr_leftpad(phase, ' ', 8);
tmp_scope1 = gpr_leftpad(scope1, ' ', 11);
- gpr_asprintf(&prefix, "FLOW %s: %s %s ", phase, clisvr, scope1);
+ gpr_asprintf(&prefix, "FLOW %s: %s %s ", tmp_phase, clisvr, scope1);
gpr_free(tmp_phase);
gpr_free(tmp_scope1);
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index 8d79e93ceb3..e1dcf5262a1 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -305,6 +305,22 @@ typedef struct grpc_chttp2_executor_action_header {
void *arg;
} grpc_chttp2_executor_action_header;
+typedef enum {
+ /** no writing activity */
+ GRPC_CHTTP2_WRITING_INACTIVE,
+ /** write has been requested, but not scheduled yet */
+ GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER,
+ GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER,
+ /** write has been requested and scheduled against the workqueue */
+ GRPC_CHTTP2_WRITE_SCHEDULED,
+ /** write has been initiated after being reaped from the workqueue */
+ GRPC_CHTTP2_WRITING,
+ /** write has been initiated, AND another write needs to be started once it's
+ done */
+ GRPC_CHTTP2_WRITING_STALE_WITH_POLLER,
+ GRPC_CHTTP2_WRITING_STALE_NO_POLLER,
+} grpc_chttp2_write_state;
+
struct grpc_chttp2_transport {
grpc_transport base; /* must be first */
gpr_refcount refs;
@@ -319,10 +335,10 @@ struct grpc_chttp2_transport {
/** is a thread currently in the global lock */
bool global_active;
- /** is a thread currently writing */
- bool writing_active;
/** is a thread currently parsing */
bool parsing_active;
+ /** write execution state of the transport */
+ grpc_chttp2_write_state write_state;
grpc_chttp2_executor_action_header *pending_actions_head;
grpc_chttp2_executor_action_header *pending_actions_tail;
@@ -342,7 +358,8 @@ struct grpc_chttp2_transport {
/** global state for reading/writing */
grpc_chttp2_transport_global global;
/** state only accessible by the chain of execution that
- set writing_active=1 */
+ set writing_state >= GRPC_WRITING, and only by the writing closure
+ chain. */
grpc_chttp2_transport_writing writing;
/** state only accessible by the chain of execution that
set parsing_active=1 */
@@ -363,6 +380,8 @@ struct grpc_chttp2_transport {
grpc_closure reading_action;
/** closure to actually do parsing */
grpc_closure parsing_action;
+ /** closure to initiate writing */
+ grpc_closure initiate_writing;
/** incoming read bytes */
gpr_slice_buffer read_buffer;
@@ -436,8 +455,10 @@ typedef struct {
bool seen_error;
bool exceeded_metadata_size;
- /** the error that resulted in this stream being removed */
- grpc_error *removal_error;
+ /** the error that resulted in this stream being read-closed */
+ grpc_error *read_closed_error;
+ /** the error that resulted in this stream being write-closed */
+ grpc_error *write_closed_error;
bool published_initial_metadata;
bool published_trailing_metadata;
@@ -514,15 +535,20 @@ struct grpc_chttp2_stream {
};
/** Transport writing call flow:
- chttp2_transport.c calls grpc_chttp2_unlocking_check_writes to see if writes
- are required;
- if they are, chttp2_transport.c calls grpc_chttp2_perform_writes to do the
- writes.
- Once writes have been completed (meaning another write could potentially be
- started),
- grpc_chttp2_terminate_writing is called. This will call
- grpc_chttp2_cleanup_writing, at which
- point the write phase is complete. */
+ grpc_chttp2_initiate_write() is called anywhere that we know bytes need to
+ go out on the wire.
+ If no other write has been started, a task is enqueued onto our workqueue.
+ When that task executes, it obtains the global lock, and gathers the data
+ to write.
+ The global lock is dropped and we do the syscall to write.
+ After writing, a follow-up check is made to see if another round of writing
+ should be performed.
+
+ The actual call chain is documented in the implementation of this function.
+ */
+void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport_global *transport_global,
+ bool covered_by_poller, const char *reason);
/** Someone is unlocking the transport mutex: check to see if writes
are required, and schedule them if so */
@@ -610,9 +636,8 @@ int grpc_chttp2_list_pop_check_read_ops(
void grpc_chttp2_list_add_writing_stalled_by_transport(
grpc_chttp2_transport_writing *transport_writing,
grpc_chttp2_stream_writing *stream_writing);
-void grpc_chttp2_list_flush_writing_stalled_by_transport(
- grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_writing *transport_writing,
- bool is_window_available);
+bool grpc_chttp2_list_flush_writing_stalled_by_transport(
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_writing *transport_writing);
void grpc_chttp2_list_add_stalled_by_transport(
grpc_chttp2_transport_writing *transport_writing,
@@ -822,7 +847,9 @@ void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx,
/** add a ref to the stream and add it to the writable list;
ref will be dropped in writing.c */
-void grpc_chttp2_become_writable(grpc_chttp2_transport_global *transport_global,
- grpc_chttp2_stream_global *stream_global);
+void grpc_chttp2_become_writable(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport_global *transport_global,
+ grpc_chttp2_stream_global *stream_global,
+ bool covered_by_poller, const char *reason);
#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_INTERNAL_H */
diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c
index 84eb5752f16..e1fc0ddee20 100644
--- a/src/core/ext/transport/chttp2/transport/parsing.c
+++ b/src/core/ext/transport/chttp2/transport/parsing.c
@@ -154,10 +154,8 @@ void grpc_chttp2_publish_reads(
transport_parsing, outgoing_window);
is_zero = transport_global->outgoing_window <= 0;
if (was_zero && !is_zero) {
- while (grpc_chttp2_list_pop_stalled_by_transport(transport_global,
- &stream_global)) {
- grpc_chttp2_become_writable(transport_global, stream_global);
- }
+ grpc_chttp2_initiate_write(exec_ctx, transport_global, false,
+ "new_global_flow_control");
}
if (transport_parsing->incoming_window <
@@ -168,6 +166,8 @@ void grpc_chttp2_publish_reads(
announce_incoming_window, announce_bytes);
GRPC_CHTTP2_FLOW_CREDIT_TRANSPORT("parsed", transport_parsing,
incoming_window, announce_bytes);
+ grpc_chttp2_initiate_write(exec_ctx, transport_global, false,
+ "global incoming window");
}
/* for each stream that saw an update, fixup global state */
@@ -190,7 +190,8 @@ void grpc_chttp2_publish_reads(
outgoing_window);
is_zero = stream_global->outgoing_window <= 0;
if (was_zero && !is_zero) {
- grpc_chttp2_become_writable(transport_global, stream_global);
+ grpc_chttp2_become_writable(exec_ctx, transport_global, stream_global,
+ false, "stream.read_flow_control");
}
stream_global->max_recv_bytes -= (uint32_t)GPR_MIN(
diff --git a/src/core/ext/transport/chttp2/transport/stream_lists.c b/src/core/ext/transport/chttp2/transport/stream_lists.c
index 8f3ab00e6df..2eb5f5f632e 100644
--- a/src/core/ext/transport/chttp2/transport/stream_lists.c
+++ b/src/core/ext/transport/chttp2/transport/stream_lists.c
@@ -329,6 +329,7 @@ void grpc_chttp2_list_add_writing_stalled_by_transport(
grpc_chttp2_transport_writing *transport_writing,
grpc_chttp2_stream_writing *stream_writing) {
grpc_chttp2_stream *stream = STREAM_FROM_WRITING(stream_writing);
+ gpr_log(GPR_DEBUG, "writing stalled %d", stream->global.id);
if (!stream->included[GRPC_CHTTP2_LIST_WRITING_STALLED_BY_TRANSPORT]) {
GRPC_CHTTP2_STREAM_REF(&stream->global, "chttp2_writing_stalled");
}
@@ -336,27 +337,28 @@ void grpc_chttp2_list_add_writing_stalled_by_transport(
GRPC_CHTTP2_LIST_WRITING_STALLED_BY_TRANSPORT);
}
-void grpc_chttp2_list_flush_writing_stalled_by_transport(
- grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_writing *transport_writing,
- bool is_window_available) {
+bool grpc_chttp2_list_flush_writing_stalled_by_transport(
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_writing *transport_writing) {
grpc_chttp2_stream *stream;
+ bool out = false;
grpc_chttp2_transport *transport = TRANSPORT_FROM_WRITING(transport_writing);
while (stream_list_pop(transport, &stream,
GRPC_CHTTP2_LIST_WRITING_STALLED_BY_TRANSPORT)) {
- if (is_window_available) {
- grpc_chttp2_become_writable(&transport->global, &stream->global);
- } else {
- grpc_chttp2_list_add_stalled_by_transport(transport_writing,
- &stream->writing);
- }
+ gpr_log(GPR_DEBUG, "move %d from writing stalled to just stalled",
+ stream->global.id);
+ grpc_chttp2_list_add_stalled_by_transport(transport_writing,
+ &stream->writing);
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, &stream->global,
"chttp2_writing_stalled");
+ out = true;
}
+ return out;
}
void grpc_chttp2_list_add_stalled_by_transport(
grpc_chttp2_transport_writing *transport_writing,
grpc_chttp2_stream_writing *stream_writing) {
+ gpr_log(GPR_DEBUG, "stalled %d", stream_writing->id);
stream_list_add(TRANSPORT_FROM_WRITING(transport_writing),
STREAM_FROM_WRITING(stream_writing),
GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT);
diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c
index b19f5f068df..e0d87725e9d 100644
--- a/src/core/ext/transport/chttp2/transport/writing.c
+++ b/src/core/ext/transport/chttp2/transport/writing.c
@@ -75,9 +75,13 @@ int grpc_chttp2_unlocking_check_writes(
GRPC_CHTTP2_FLOW_MOVE_TRANSPORT("write", transport_writing, outgoing_window,
transport_global, outgoing_window);
- bool is_window_available = transport_writing->outgoing_window > 0;
- grpc_chttp2_list_flush_writing_stalled_by_transport(
- exec_ctx, transport_writing, is_window_available);
+ if (transport_writing->outgoing_window > 0) {
+ while (grpc_chttp2_list_pop_stalled_by_transport(transport_global,
+ &stream_global)) {
+ grpc_chttp2_become_writable(exec_ctx, transport_global, stream_global,
+ false, "transport.read_flow_control");
+ }
+ }
/* for each grpc_chttp2_stream that's become writable, frame it's data
(according to available window sizes) and add to the output buffer */
@@ -331,6 +335,12 @@ void grpc_chttp2_cleanup_writing(
grpc_chttp2_stream_writing *stream_writing;
grpc_chttp2_stream_global *stream_global;
+ if (grpc_chttp2_list_flush_writing_stalled_by_transport(exec_ctx,
+ transport_writing)) {
+ grpc_chttp2_initiate_write(exec_ctx, transport_global, false,
+ "resume_stalled_stream");
+ }
+
while (grpc_chttp2_list_pop_written_stream(
transport_global, transport_writing, &stream_global, &stream_writing)) {
if (stream_writing->sent_initial_metadata) {
diff --git a/src/core/lib/iomgr/endpoint.c b/src/core/lib/iomgr/endpoint.c
index 1ab3733d381..f901fcf9622 100644
--- a/src/core/lib/iomgr/endpoint.c
+++ b/src/core/lib/iomgr/endpoint.c
@@ -65,3 +65,7 @@ void grpc_endpoint_destroy(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep) {
char* grpc_endpoint_get_peer(grpc_endpoint* ep) {
return ep->vtable->get_peer(ep);
}
+
+grpc_workqueue* grpc_endpoint_get_workqueue(grpc_endpoint* ep) {
+ return ep->vtable->get_workqueue(ep);
+}
diff --git a/src/core/lib/iomgr/endpoint.h b/src/core/lib/iomgr/endpoint.h
index f9808bbda18..894efc0b238 100644
--- a/src/core/lib/iomgr/endpoint.h
+++ b/src/core/lib/iomgr/endpoint.h
@@ -51,6 +51,7 @@ struct grpc_endpoint_vtable {
gpr_slice_buffer *slices, grpc_closure *cb);
void (*write)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
gpr_slice_buffer *slices, grpc_closure *cb);
+ grpc_workqueue *(*get_workqueue)(grpc_endpoint *ep);
void (*add_to_pollset)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
grpc_pollset *pollset);
void (*add_to_pollset_set)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
@@ -69,6 +70,9 @@ void grpc_endpoint_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
char *grpc_endpoint_get_peer(grpc_endpoint *ep);
+/* Retrieve a reference to the workqueue associated with this endpoint */
+grpc_workqueue *grpc_endpoint_get_workqueue(grpc_endpoint *ep);
+
/* Write slices out to the socket.
If the connection is ready for more data after the end of the call, it
diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c
index cf0fe736a0b..6a63c4d1d18 100644
--- a/src/core/lib/iomgr/ev_epoll_linux.c
+++ b/src/core/lib/iomgr/ev_epoll_linux.c
@@ -57,6 +57,7 @@
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
+#include "src/core/lib/iomgr/workqueue.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/block_annotate.h"
@@ -113,9 +114,7 @@ struct grpc_fd {
grpc_closure *read_closure;
grpc_closure *write_closure;
- /* The polling island to which this fd belongs to and the mutex protecting the
- the field */
- gpr_mu pi_mu;
+ /* The polling island to which this fd belongs to (protected by mu) */
struct polling_island *polling_island;
struct grpc_fd *freelist_next;
@@ -152,16 +151,17 @@ static void fd_global_shutdown(void);
* Polling island Declarations
*/
-// #define GRPC_PI_REF_COUNT_DEBUG
+//#define GRPC_PI_REF_COUNT_DEBUG
#ifdef GRPC_PI_REF_COUNT_DEBUG
#define PI_ADD_REF(p, r) pi_add_ref_dbg((p), (r), __FILE__, __LINE__)
-#define PI_UNREF(p, r) pi_unref_dbg((p), (r), __FILE__, __LINE__)
+#define PI_UNREF(exec_ctx, p, r) \
+ pi_unref_dbg((exec_ctx), (p), (r), __FILE__, __LINE__)
#else /* defined(GRPC_PI_REF_COUNT_DEBUG) */
#define PI_ADD_REF(p, r) pi_add_ref((p))
-#define PI_UNREF(p, r) pi_unref((p))
+#define PI_UNREF(exec_ctx, p, r) pi_unref((exec_ctx), (p))
#endif /* !defined(GPRC_PI_REF_COUNT_DEBUG) */
@@ -172,7 +172,7 @@ typedef struct polling_island {
Once the ref count becomes zero, this structure is destroyed which means
we should ensure that there is never a scenario where a PI_ADD_REF() is
racing with a PI_UNREF() that just made the ref_count zero. */
- gpr_refcount ref_count;
+ gpr_atm ref_count;
/* Pointer to the polling_island this merged into.
* merged_to value is only set once in polling_island's lifetime (and that too
@@ -184,6 +184,9 @@ typedef struct polling_island {
* (except mu and ref_count) are invalid and must be ignored. */
gpr_atm merged_to;
+ /* The workqueue associated with this polling island */
+ grpc_workqueue *workqueue;
+
/* The fd of the underlying epoll set */
int epoll_fd;
@@ -191,11 +194,6 @@ typedef struct polling_island {
size_t fd_cnt;
size_t fd_capacity;
grpc_fd **fds;
-
- /* Polling islands that are no longer needed are kept in a freelist so that
- they can be reused. This field points to the next polling island in the
- free list */
- struct polling_island *next_free;
} polling_island;
/*******************************************************************************
@@ -253,13 +251,14 @@ struct grpc_pollset_set {
* Common helpers
*/
-static void append_error(grpc_error **composite, grpc_error *error,
+static bool append_error(grpc_error **composite, grpc_error *error,
const char *desc) {
- if (error == GRPC_ERROR_NONE) return;
+ if (error == GRPC_ERROR_NONE) return true;
if (*composite == GRPC_ERROR_NONE) {
*composite = GRPC_ERROR_CREATE(desc);
}
*composite = grpc_error_add_child(*composite, error);
+ return false;
}
/*******************************************************************************
@@ -275,11 +274,8 @@ static void append_error(grpc_error **composite, grpc_error *error,
threads that woke up MUST NOT call grpc_wakeup_fd_consume_wakeup() */
static grpc_wakeup_fd polling_island_wakeup_fd;
-/* Polling island freelist */
-static gpr_mu g_pi_freelist_mu;
-static polling_island *g_pi_freelist = NULL;
-
-static void polling_island_delete(); /* Forward declaration */
+/* Forward declaration */
+static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi);
#ifdef GRPC_TSAN
/* Currently TSAN may incorrectly flag data races between epoll_ctl and
@@ -293,28 +289,35 @@ gpr_atm g_epoll_sync;
#endif /* defined(GRPC_TSAN) */
#ifdef GRPC_PI_REF_COUNT_DEBUG
-void pi_add_ref(polling_island *pi);
-void pi_unref(polling_island *pi);
+static void pi_add_ref(polling_island *pi);
+static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi);
-void pi_add_ref_dbg(polling_island *pi, char *reason, char *file, int line) {
- long old_cnt = gpr_atm_acq_load(&(pi->ref_count.count));
+static void pi_add_ref_dbg(polling_island *pi, char *reason, char *file,
+ int line) {
+ long old_cnt = gpr_atm_acq_load(&pi->ref_count);
pi_add_ref(pi);
gpr_log(GPR_DEBUG, "Add ref pi: %p, old: %ld -> new:%ld (%s) - (%s, %d)",
(void *)pi, old_cnt, old_cnt + 1, reason, file, line);
}
-void pi_unref_dbg(polling_island *pi, char *reason, char *file, int line) {
- long old_cnt = gpr_atm_acq_load(&(pi->ref_count.count));
- pi_unref(pi);
+static void pi_unref_dbg(grpc_exec_ctx *exec_ctx, polling_island *pi,
+ char *reason, char *file, int line) {
+ long old_cnt = gpr_atm_acq_load(&pi->ref_count);
+ pi_unref(exec_ctx, pi);
gpr_log(GPR_DEBUG, "Unref pi: %p, old:%ld -> new:%ld (%s) - (%s, %d)",
(void *)pi, old_cnt, (old_cnt - 1), reason, file, line);
}
#endif
-void pi_add_ref(polling_island *pi) { gpr_ref(&pi->ref_count); }
+static void pi_add_ref(polling_island *pi) {
+ gpr_atm_no_barrier_fetch_add(&pi->ref_count, 1);
+}
-void pi_unref(polling_island *pi) {
- /* If ref count went to zero, delete the polling island.
+static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi) {
+ /* If ref count went to one, we're back to just the workqueue owning a ref.
+ Unref the workqueue to break the loop.
+
+ If ref count went to zero, delete the polling island.
Note that this deletion not be done under a lock. Once the ref count goes
to zero, we are guaranteed that no one else holds a reference to the
polling island (and that there is no racing pi_add_ref() call either).
@@ -322,12 +325,20 @@ void pi_unref(polling_island *pi) {
Also, if we are deleting the polling island and the merged_to field is
non-empty, we should remove a ref to the merged_to polling island
*/
- if (gpr_unref(&pi->ref_count)) {
- polling_island *next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
- polling_island_delete(pi);
- if (next != NULL) {
- PI_UNREF(next, "pi_delete"); /* Recursive call */
+ switch (gpr_atm_full_fetch_add(&pi->ref_count, -1)) {
+ case 2: /* last external ref: the only one now owned is by the workqueue */
+ GRPC_WORKQUEUE_UNREF(exec_ctx, pi->workqueue, "polling_island");
+ break;
+ case 1: {
+ polling_island *next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
+ polling_island_delete(exec_ctx, pi);
+ if (next != NULL) {
+ PI_UNREF(exec_ctx, next, "pi_delete"); /* Recursive call */
+ }
+ break;
}
+ case 0:
+ GPR_UNREACHABLE_CODE(return );
}
}
@@ -462,69 +473,68 @@ static void polling_island_remove_fd_locked(polling_island *pi, grpc_fd *fd,
}
/* Might return NULL in case of an error */
-static polling_island *polling_island_create(grpc_fd *initial_fd,
+static polling_island *polling_island_create(grpc_exec_ctx *exec_ctx,
+ grpc_fd *initial_fd,
grpc_error **error) {
polling_island *pi = NULL;
- char *err_msg;
const char *err_desc = "polling_island_create";
- /* Try to get one from the polling island freelist */
- gpr_mu_lock(&g_pi_freelist_mu);
- if (g_pi_freelist != NULL) {
- pi = g_pi_freelist;
- g_pi_freelist = g_pi_freelist->next_free;
- pi->next_free = NULL;
- }
- gpr_mu_unlock(&g_pi_freelist_mu);
+ *error = GRPC_ERROR_NONE;
- /* Create new polling island if we could not get one from the free list */
- if (pi == NULL) {
- pi = gpr_malloc(sizeof(*pi));
- gpr_mu_init(&pi->mu);
- pi->fd_cnt = 0;
- pi->fd_capacity = 0;
- pi->fds = NULL;
- }
+ pi = gpr_malloc(sizeof(*pi));
+ gpr_mu_init(&pi->mu);
+ pi->fd_cnt = 0;
+ pi->fd_capacity = 0;
+ pi->fds = NULL;
+ pi->epoll_fd = -1;
+ pi->workqueue = NULL;
- gpr_ref_init(&pi->ref_count, 0);
+ gpr_atm_rel_store(&pi->ref_count, 0);
gpr_atm_rel_store(&pi->merged_to, (gpr_atm)NULL);
pi->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
if (pi->epoll_fd < 0) {
- gpr_asprintf(&err_msg, "epoll_create1 failed with error %d (%s)", errno,
- strerror(errno));
- append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
- gpr_free(err_msg);
- } else {
- polling_island_add_wakeup_fd_locked(pi, &grpc_global_wakeup_fd, error);
- pi->next_free = NULL;
+ append_error(error, GRPC_OS_ERROR(errno, "epoll_create1"), err_desc);
+ goto done;
+ }
- if (initial_fd != NULL) {
- /* Lock the polling island here just in case we got this structure from
- the freelist and the polling island lock was not released yet (by the
- code that adds the polling island to the freelist) */
- gpr_mu_lock(&pi->mu);
- polling_island_add_fds_locked(pi, &initial_fd, 1, true, error);
- gpr_mu_unlock(&pi->mu);
- }
+ polling_island_add_wakeup_fd_locked(pi, &grpc_global_wakeup_fd, error);
+
+ if (initial_fd != NULL) {
+ polling_island_add_fds_locked(pi, &initial_fd, 1, true, error);
+ }
+
+ if (append_error(error, grpc_workqueue_create(exec_ctx, &pi->workqueue),
+ err_desc) &&
+ *error == GRPC_ERROR_NONE) {
+ polling_island_add_fds_locked(pi, &pi->workqueue->wakeup_read_fd, 1, true,
+ error);
+ GPR_ASSERT(pi->workqueue->wakeup_read_fd->polling_island == NULL);
+ pi->workqueue->wakeup_read_fd->polling_island = pi;
+ PI_ADD_REF(pi, "fd");
}
+done:
+ if (*error != GRPC_ERROR_NONE) {
+ if (pi->workqueue != NULL) {
+ GRPC_WORKQUEUE_UNREF(exec_ctx, pi->workqueue, "polling_island");
+ }
+ polling_island_delete(exec_ctx, pi);
+ pi = NULL;
+ }
return pi;
}
-static void polling_island_delete(polling_island *pi) {
+static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi) {
GPR_ASSERT(pi->fd_cnt == 0);
- gpr_atm_rel_store(&pi->merged_to, (gpr_atm)NULL);
-
- close(pi->epoll_fd);
- pi->epoll_fd = -1;
-
- gpr_mu_lock(&g_pi_freelist_mu);
- pi->next_free = g_pi_freelist;
- g_pi_freelist = pi;
- gpr_mu_unlock(&g_pi_freelist_mu);
+ if (pi->epoll_fd >= 0) {
+ close(pi->epoll_fd);
+ }
+ gpr_mu_destroy(&pi->mu);
+ gpr_free(pi->fds);
+ gpr_free(pi);
}
/* Attempts to gets the last polling island in the linked list (liked by the
@@ -704,9 +714,6 @@ static polling_island *polling_island_merge(polling_island *p,
static grpc_error *polling_island_global_init() {
grpc_error *error = GRPC_ERROR_NONE;
- gpr_mu_init(&g_pi_freelist_mu);
- g_pi_freelist = NULL;
-
error = grpc_wakeup_fd_init(&polling_island_wakeup_fd);
if (error == GRPC_ERROR_NONE) {
error = grpc_wakeup_fd_wakeup(&polling_island_wakeup_fd);
@@ -716,18 +723,6 @@ static grpc_error *polling_island_global_init() {
}
static void polling_island_global_shutdown() {
- polling_island *next;
- gpr_mu_lock(&g_pi_freelist_mu);
- gpr_mu_unlock(&g_pi_freelist_mu);
- while (g_pi_freelist != NULL) {
- next = g_pi_freelist->next_free;
- gpr_mu_destroy(&g_pi_freelist->mu);
- gpr_free(g_pi_freelist->fds);
- gpr_free(g_pi_freelist);
- g_pi_freelist = next;
- }
- gpr_mu_destroy(&g_pi_freelist_mu);
-
grpc_wakeup_fd_destroy(&polling_island_wakeup_fd);
}
@@ -845,7 +840,6 @@ static grpc_fd *fd_create(int fd, const char *name) {
if (new_fd == NULL) {
new_fd = gpr_malloc(sizeof(grpc_fd));
gpr_mu_init(&new_fd->mu);
- gpr_mu_init(&new_fd->pi_mu);
}
/* Note: It is not really needed to get the new_fd->mu lock here. If this is a
@@ -896,6 +890,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
const char *reason) {
bool is_fd_closed = false;
grpc_error *error = GRPC_ERROR_NONE;
+ polling_island *unref_pi = NULL;
gpr_mu_lock(&fd->mu);
fd->on_done_closure = on_done;
@@ -923,21 +918,26 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
- Unlock the latest polling island
- Set fd->polling_island to NULL (but remove the ref on the polling island
before doing this.) */
- gpr_mu_lock(&fd->pi_mu);
if (fd->polling_island != NULL) {
polling_island *pi_latest = polling_island_lock(fd->polling_island);
polling_island_remove_fd_locked(pi_latest, fd, is_fd_closed, &error);
gpr_mu_unlock(&pi_latest->mu);
- PI_UNREF(fd->polling_island, "fd_orphan");
+ unref_pi = fd->polling_island;
fd->polling_island = NULL;
}
- gpr_mu_unlock(&fd->pi_mu);
grpc_exec_ctx_sched(exec_ctx, fd->on_done_closure, error, NULL);
gpr_mu_unlock(&fd->mu);
UNREF_BY(fd, 2, reason); /* Drop the reference */
+ if (unref_pi != NULL) {
+ /* Unref stale polling island here, outside the fd lock above.
+ The polling island owns a workqueue which owns an fd, and unreffing
+ inside the lock can cause an eventual lock loop that makes TSAN very
+ unhappy. */
+ PI_UNREF(exec_ctx, unref_pi, "fd_orphan");
+ }
GRPC_LOG_IF_ERROR("fd_orphan", GRPC_ERROR_REF(error));
}
@@ -1037,6 +1037,17 @@ static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
gpr_mu_unlock(&fd->mu);
}
+static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) {
+ gpr_mu_lock(&fd->mu);
+ grpc_workqueue *workqueue = NULL;
+ if (fd->polling_island != NULL) {
+ workqueue =
+ GRPC_WORKQUEUE_REF(fd->polling_island->workqueue, "get_workqueue");
+ }
+ gpr_mu_unlock(&fd->mu);
+ return workqueue;
+}
+
/*******************************************************************************
* Pollset Definitions
*/
@@ -1227,9 +1238,10 @@ static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
gpr_mu_unlock(&fd->mu);
}
-static void pollset_release_polling_island(grpc_pollset *ps, char *reason) {
+static void pollset_release_polling_island(grpc_exec_ctx *exec_ctx,
+ grpc_pollset *ps, char *reason) {
if (ps->polling_island != NULL) {
- PI_UNREF(ps->polling_island, reason);
+ PI_UNREF(exec_ctx, ps->polling_island, reason);
}
ps->polling_island = NULL;
}
@@ -1242,7 +1254,7 @@ static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx,
pollset->finish_shutdown_called = true;
/* Release the ref and set pollset->polling_island to NULL */
- pollset_release_polling_island(pollset, "ps_shutdown");
+ pollset_release_polling_island(exec_ctx, pollset, "ps_shutdown");
grpc_exec_ctx_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE, NULL);
}
@@ -1281,7 +1293,7 @@ static void pollset_reset(grpc_pollset *pollset) {
pollset->finish_shutdown_called = false;
pollset->kicked_without_pollers = false;
pollset->shutdown_done = NULL;
- pollset_release_polling_island(pollset, "ps_reset");
+ GPR_ASSERT(pollset->polling_island == NULL);
}
#define GRPC_EPOLL_MAX_EVENTS 1000
@@ -1309,7 +1321,7 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
this function (i.e pollset_work_and_unlock()) is called */
if (pollset->polling_island == NULL) {
- pollset->polling_island = polling_island_create(NULL, error);
+ pollset->polling_island = polling_island_create(exec_ctx, NULL, error);
if (pollset->polling_island == NULL) {
GPR_TIMER_END("pollset_work_and_unlock", 0);
return; /* Fatal error. We cannot continue */
@@ -1329,7 +1341,7 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
/* Always do PI_ADD_REF before PI_UNREF because PI_UNREF may cause the
polling island to be deleted */
PI_ADD_REF(pi, "ps");
- PI_UNREF(pollset->polling_island, "ps");
+ PI_UNREF(exec_ctx, pollset->polling_island, "ps");
pollset->polling_island = pi;
}
@@ -1400,7 +1412,7 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
that we got before releasing the polling island lock). This is because
pollset->polling_island pointer might get udpated in other parts of the
code when there is an island merge while we are doing epoll_wait() above */
- PI_UNREF(pi, "ps_work");
+ PI_UNREF(exec_ctx, pi, "ps_work");
GPR_TIMER_END("pollset_work_and_unlock", 0);
}
@@ -1517,10 +1529,11 @@ static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_error *error = GRPC_ERROR_NONE;
gpr_mu_lock(&pollset->mu);
- gpr_mu_lock(&fd->pi_mu);
+ gpr_mu_lock(&fd->mu);
polling_island *pi_new = NULL;
+retry:
/* 1) If fd->polling_island and pollset->polling_island are both non-NULL and
* equal, do nothing.
* 2) If fd->polling_island and pollset->polling_island are both NULL, create
@@ -1535,15 +1548,44 @@ static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
* polling_island fields in both fd and pollset to point to the merged
* polling island.
*/
+
+ if (fd->orphaned) {
+ gpr_mu_unlock(&fd->mu);
+ gpr_mu_unlock(&pollset->mu);
+ /* early out */
+ return;
+ }
+
if (fd->polling_island == pollset->polling_island) {
pi_new = fd->polling_island;
if (pi_new == NULL) {
- pi_new = polling_island_create(fd, &error);
-
- GRPC_POLLING_TRACE(
- "pollset_add_fd: Created new polling island. pi_new: %p (fd: %d, "
- "pollset: %p)",
- (void *)pi_new, fd->fd, (void *)pollset);
+ /* Unlock before creating a new polling island: the polling island will
+ create a workqueue which creates a file descriptor, and holding an fd
+ lock here can eventually cause a loop to appear to TSAN (making it
+ unhappy). We don't think it's a real loop (there's an epoch point where
+ that loop possibility disappears), but the advantages of keeping TSAN
+ happy outweigh any performance advantage we might have by keeping the
+ lock held. */
+ gpr_mu_unlock(&fd->mu);
+ pi_new = polling_island_create(exec_ctx, fd, &error);
+ gpr_mu_lock(&fd->mu);
+ /* Need to reverify any assumptions made between the initial lock and
+ getting to this branch: if they've changed, we need to throw away our
+ work and figure things out again. */
+ if (fd->polling_island != NULL) {
+ GRPC_POLLING_TRACE(
+ "pollset_add_fd: Raced creating new polling island. pi_new: %p "
+ "(fd: %d, pollset: %p)",
+ (void *)pi_new, fd->fd, (void *)pollset);
+ PI_ADD_REF(pi_new, "dance_of_destruction");
+ PI_UNREF(exec_ctx, pi_new, "dance_of_destruction");
+ goto retry;
+ } else {
+ GRPC_POLLING_TRACE(
+ "pollset_add_fd: Created new polling island. pi_new: %p (fd: %d, "
+ "pollset: %p)",
+ (void *)pi_new, fd->fd, (void *)pollset);
+ }
}
} else if (fd->polling_island == NULL) {
pi_new = polling_island_lock(pollset->polling_island);
@@ -1579,7 +1621,7 @@ static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
if (fd->polling_island != pi_new) {
PI_ADD_REF(pi_new, "fd");
if (fd->polling_island != NULL) {
- PI_UNREF(fd->polling_island, "fd");
+ PI_UNREF(exec_ctx, fd->polling_island, "fd");
}
fd->polling_island = pi_new;
}
@@ -1587,13 +1629,15 @@ static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
if (pollset->polling_island != pi_new) {
PI_ADD_REF(pi_new, "ps");
if (pollset->polling_island != NULL) {
- PI_UNREF(pollset->polling_island, "ps");
+ PI_UNREF(exec_ctx, pollset->polling_island, "ps");
}
pollset->polling_island = pi_new;
}
- gpr_mu_unlock(&fd->pi_mu);
+ gpr_mu_unlock(&fd->mu);
gpr_mu_unlock(&pollset->mu);
+
+ GRPC_LOG_IF_ERROR("pollset_add_fd", error);
}
/*******************************************************************************
@@ -1744,9 +1788,9 @@ static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
void *grpc_fd_get_polling_island(grpc_fd *fd) {
polling_island *pi;
- gpr_mu_lock(&fd->pi_mu);
+ gpr_mu_lock(&fd->mu);
pi = fd->polling_island;
- gpr_mu_unlock(&fd->pi_mu);
+ gpr_mu_unlock(&fd->mu);
return pi;
}
@@ -1794,6 +1838,7 @@ static const grpc_event_engine_vtable vtable = {
.fd_notify_on_read = fd_notify_on_read,
.fd_notify_on_write = fd_notify_on_write,
.fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
+ .fd_get_workqueue = fd_get_workqueue,
.pollset_init = pollset_init,
.pollset_shutdown = pollset_shutdown,
diff --git a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c
index 9e306af5fac..c2107e5e393 100644
--- a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c
+++ b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c
@@ -725,6 +725,8 @@ static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher,
GRPC_FD_UNREF(fd, "poll");
}
+static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) { return NULL; }
+
/*******************************************************************************
* pollset_posix.c
*/
@@ -2006,6 +2008,7 @@ static const grpc_event_engine_vtable vtable = {
.fd_notify_on_read = fd_notify_on_read,
.fd_notify_on_write = fd_notify_on_write,
.fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
+ .fd_get_workqueue = fd_get_workqueue,
.pollset_init = pollset_init,
.pollset_shutdown = pollset_shutdown,
diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c
index 45c0a5e9546..4b593f4b2c2 100644
--- a/src/core/lib/iomgr/ev_poll_posix.c
+++ b/src/core/lib/iomgr/ev_poll_posix.c
@@ -617,6 +617,8 @@ static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher,
GRPC_FD_UNREF(fd, "poll");
}
+static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) { return NULL; }
+
/*******************************************************************************
* pollset_posix.c
*/
@@ -1234,6 +1236,7 @@ static const grpc_event_engine_vtable vtable = {
.fd_notify_on_read = fd_notify_on_read,
.fd_notify_on_write = fd_notify_on_write,
.fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
+ .fd_get_workqueue = fd_get_workqueue,
.pollset_init = pollset_init,
.pollset_shutdown = pollset_shutdown,
diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c
index a3c1e9db9a0..65366726859 100644
--- a/src/core/lib/iomgr/ev_posix.c
+++ b/src/core/lib/iomgr/ev_posix.c
@@ -148,6 +148,10 @@ grpc_fd *grpc_fd_create(int fd, const char *name) {
return g_event_engine->fd_create(fd, name);
}
+grpc_workqueue *grpc_fd_get_workqueue(grpc_fd *fd) {
+ return g_event_engine->fd_get_workqueue(fd);
+}
+
int grpc_fd_wrapped_fd(grpc_fd *fd) {
return g_event_engine->fd_wrapped_fd(fd);
}
diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h
index 579c84ef707..c2aa1756ea2 100644
--- a/src/core/lib/iomgr/ev_posix.h
+++ b/src/core/lib/iomgr/ev_posix.h
@@ -56,6 +56,7 @@ typedef struct grpc_event_engine_vtable {
void (*fd_notify_on_write)(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure);
bool (*fd_is_shutdown)(grpc_fd *fd);
+ grpc_workqueue *(*fd_get_workqueue)(grpc_fd *fd);
grpc_pollset *(*fd_get_read_notifier_pollset)(grpc_exec_ctx *exec_ctx,
grpc_fd *fd);
@@ -107,6 +108,9 @@ const char *grpc_get_poll_strategy_name();
This takes ownership of closing fd. */
grpc_fd *grpc_fd_create(int fd, const char *name);
+/* Get a workqueue that's associated with this fd */
+grpc_workqueue *grpc_fd_get_workqueue(grpc_fd *fd);
+
/* Return the wrapped fd, or -1 if it has been released or closed. */
int grpc_fd_wrapped_fd(grpc_fd *fd);
diff --git a/src/core/lib/iomgr/exec_ctx.c b/src/core/lib/iomgr/exec_ctx.c
index c44aafcddf0..ac7785ec135 100644
--- a/src/core/lib/iomgr/exec_ctx.c
+++ b/src/core/lib/iomgr/exec_ctx.c
@@ -37,6 +37,7 @@
#include
#include
+#include "src/core/lib/iomgr/workqueue.h"
#include "src/core/lib/profiling/timers.h"
bool grpc_exec_ctx_ready_to_finish(grpc_exec_ctx *exec_ctx) {
@@ -85,14 +86,17 @@ void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx) {
void grpc_exec_ctx_sched(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
grpc_error *error,
grpc_workqueue *offload_target_or_null) {
- GPR_ASSERT(offload_target_or_null == NULL);
- grpc_closure_list_append(&exec_ctx->closure_list, closure, error);
+ if (offload_target_or_null == NULL) {
+ grpc_closure_list_append(&exec_ctx->closure_list, closure, error);
+ } else {
+ grpc_workqueue_enqueue(exec_ctx, offload_target_or_null, closure, error);
+ GRPC_WORKQUEUE_UNREF(exec_ctx, offload_target_or_null, "exec_ctx_sched");
+ }
}
void grpc_exec_ctx_enqueue_list(grpc_exec_ctx *exec_ctx,
grpc_closure_list *list,
grpc_workqueue *offload_target_or_null) {
- GPR_ASSERT(offload_target_or_null == NULL);
grpc_closure_list_move(list, &exec_ctx->closure_list);
}
diff --git a/src/core/lib/iomgr/exec_ctx.h b/src/core/lib/iomgr/exec_ctx.h
index 38f27d9b136..917f332f03d 100644
--- a/src/core/lib/iomgr/exec_ctx.h
+++ b/src/core/lib/iomgr/exec_ctx.h
@@ -93,7 +93,11 @@ bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx);
/** Finish any pending work for a grpc_exec_ctx. Must be called before
* the instance is destroyed, or work may be lost. */
void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx);
-/** Add a closure to be executed at the next flush/finish point */
+/** Add a closure to be executed in the future.
+ If \a offload_target_or_null is NULL, the closure will be executed at the
+ next exec_ctx.{finish,flush} point.
+ If \a offload_target_or_null is non-NULL, the closure will be scheduled
+ against the workqueue, and a reference to the workqueue will be consumed. */
void grpc_exec_ctx_sched(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
grpc_error *error,
grpc_workqueue *offload_target_or_null);
diff --git a/src/core/lib/iomgr/iomgr.c b/src/core/lib/iomgr/iomgr.c
index 89292a153ed..d67d388b8c9 100644
--- a/src/core/lib/iomgr/iomgr.c
+++ b/src/core/lib/iomgr/iomgr.c
@@ -45,6 +45,7 @@
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
+#include "src/core/lib/iomgr/network_status_tracker.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/support/env.h"
#include "src/core/lib/support/string.h"
@@ -62,6 +63,7 @@ void grpc_iomgr_init(void) {
grpc_timer_list_init(gpr_now(GPR_CLOCK_MONOTONIC));
g_root_object.next = g_root_object.prev = &g_root_object;
g_root_object.name = "root";
+ grpc_network_status_init();
grpc_iomgr_platform_init();
}
@@ -140,6 +142,7 @@ void grpc_iomgr_shutdown(void) {
grpc_iomgr_platform_shutdown();
grpc_exec_ctx_global_shutdown();
+ grpc_network_status_shutdown();
gpr_mu_destroy(&g_mu);
gpr_cv_destroy(&g_rcv);
}
diff --git a/src/core/lib/iomgr/network_status_tracker.c b/src/core/lib/iomgr/network_status_tracker.c
index 38a1c9b7d41..b4bb7e3cf7b 100644
--- a/src/core/lib/iomgr/network_status_tracker.c
+++ b/src/core/lib/iomgr/network_status_tracker.c
@@ -42,10 +42,16 @@ typedef struct endpoint_ll_node {
static endpoint_ll_node *head = NULL;
static gpr_mu g_endpoint_mutex;
-static bool g_init_done = false;
-void grpc_initialize_network_status_monitor() {
- g_init_done = true;
+void grpc_network_status_shutdown(void) {
+ if (head != NULL) {
+ gpr_log(GPR_ERROR,
+ "Memory leaked as all network endpoints were not shut down");
+ }
+ gpr_mu_destroy(&g_endpoint_mutex);
+}
+
+void grpc_network_status_init(void) {
gpr_mu_init(&g_endpoint_mutex);
// TODO(makarandd): Install callback with OS to monitor network status.
}
@@ -60,9 +66,6 @@ void grpc_destroy_network_status_monitor() {
}
void grpc_network_status_register_endpoint(grpc_endpoint *ep) {
- if (!g_init_done) {
- grpc_initialize_network_status_monitor();
- }
gpr_mu_lock(&g_endpoint_mutex);
if (head == NULL) {
head = (endpoint_ll_node *)gpr_malloc(sizeof(endpoint_ll_node));
diff --git a/src/core/lib/iomgr/network_status_tracker.h b/src/core/lib/iomgr/network_status_tracker.h
index 74a1aa8135f..67cb645f445 100644
--- a/src/core/lib/iomgr/network_status_tracker.h
+++ b/src/core/lib/iomgr/network_status_tracker.h
@@ -35,7 +35,11 @@
#define GRPC_CORE_LIB_IOMGR_NETWORK_STATUS_TRACKER_H
#include "src/core/lib/iomgr/endpoint.h"
+void grpc_network_status_init(void);
+void grpc_network_status_shutdown(void);
+
void grpc_network_status_register_endpoint(grpc_endpoint *ep);
void grpc_network_status_unregister_endpoint(grpc_endpoint *ep);
void grpc_network_status_shutdown_all_endpoints();
+
#endif /* GRPC_CORE_LIB_IOMGR_NETWORK_STATUS_TRACKER_H */
diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c
index 2ab45e33ce3..ec21e039448 100644
--- a/src/core/lib/iomgr/tcp_posix.c
+++ b/src/core/lib/iomgr/tcp_posix.c
@@ -284,7 +284,7 @@ static void tcp_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
}
/* returns true if done, false if pending; if returning true, *error is set */
-#define MAX_WRITE_IOVEC 16
+#define MAX_WRITE_IOVEC 1024
static bool tcp_flush(grpc_tcp *tcp, grpc_error **error) {
struct msghdr msg;
struct iovec iov[MAX_WRITE_IOVEC];
@@ -450,9 +450,19 @@ static char *tcp_get_peer(grpc_endpoint *ep) {
return gpr_strdup(tcp->peer_string);
}
-static const grpc_endpoint_vtable vtable = {
- tcp_read, tcp_write, tcp_add_to_pollset, tcp_add_to_pollset_set,
- tcp_shutdown, tcp_destroy, tcp_get_peer};
+static grpc_workqueue *tcp_get_workqueue(grpc_endpoint *ep) {
+ grpc_tcp *tcp = (grpc_tcp *)ep;
+ return grpc_fd_get_workqueue(tcp->em_fd);
+}
+
+static const grpc_endpoint_vtable vtable = {tcp_read,
+ tcp_write,
+ tcp_get_workqueue,
+ tcp_add_to_pollset,
+ tcp_add_to_pollset_set,
+ tcp_shutdown,
+ tcp_destroy,
+ tcp_get_peer};
grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size,
const char *peer_string) {
diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c
index d3803c3bd0e..cb2ff782d60 100644
--- a/src/core/lib/iomgr/tcp_server_posix.c
+++ b/src/core/lib/iomgr/tcp_server_posix.c
@@ -491,7 +491,8 @@ static grpc_error *clone_port(grpc_tcp_listener *listener, unsigned count) {
}
for (unsigned i = 0; i < count; i++) {
- int fd, port;
+ int fd = -1;
+ int port = -1;
grpc_dualstack_mode dsmode;
err = grpc_create_dualstack_socket(&listener->addr.sockaddr, SOCK_STREAM, 0,
&dsmode, &fd);
diff --git a/src/core/lib/iomgr/tcp_windows.c b/src/core/lib/iomgr/tcp_windows.c
index 37ab59021e3..35054c42b55 100644
--- a/src/core/lib/iomgr/tcp_windows.c
+++ b/src/core/lib/iomgr/tcp_windows.c
@@ -389,9 +389,16 @@ static char *win_get_peer(grpc_endpoint *ep) {
return gpr_strdup(tcp->peer_string);
}
-static grpc_endpoint_vtable vtable = {
- win_read, win_write, win_add_to_pollset, win_add_to_pollset_set,
- win_shutdown, win_destroy, win_get_peer};
+static grpc_workqueue *win_get_workqueue(grpc_endpoint *ep) { return NULL; }
+
+static grpc_endpoint_vtable vtable = {win_read,
+ win_write,
+ win_get_workqueue,
+ win_add_to_pollset,
+ win_add_to_pollset_set,
+ win_shutdown,
+ win_destroy,
+ win_get_peer};
grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket, char *peer_string) {
grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp));
diff --git a/src/core/lib/iomgr/workqueue.h b/src/core/lib/iomgr/workqueue.h
index 5cc40eea505..7156e490d73 100644
--- a/src/core/lib/iomgr/workqueue.h
+++ b/src/core/lib/iomgr/workqueue.h
@@ -38,6 +38,7 @@
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/iomgr/pollset.h"
+#include "src/core/lib/iomgr/pollset_set.h"
#ifdef GPR_POSIX_SOCKET
#include "src/core/lib/iomgr/workqueue_posix.h"
@@ -49,35 +50,45 @@
/* grpc_workqueue is forward declared in exec_ctx.h */
-/** Create a work queue */
-grpc_error *grpc_workqueue_create(grpc_exec_ctx *exec_ctx,
- grpc_workqueue **workqueue);
-
+/* Deprecated: do not use.
+ This has *already* been removed in a future commit. */
void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue);
-#define GRPC_WORKQUEUE_REFCOUNT_DEBUG
+/* Reference counting functions. Use the macro's always
+ (GRPC_WORKQUEUE_{REF,UNREF}).
+
+ Pass in a descriptive reason string for reffing/unreffing as the last
+ argument to each macro. When GRPC_WORKQUEUE_REFCOUNT_DEBUG is defined, that
+ string will be printed alongside the refcount. When it is not defined, the
+ string will be discarded at compilation time. */
+
+//#define GRPC_WORKQUEUE_REFCOUNT_DEBUG
#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
#define GRPC_WORKQUEUE_REF(p, r) \
- grpc_workqueue_ref((p), __FILE__, __LINE__, (r))
-#define GRPC_WORKQUEUE_UNREF(cl, p, r) \
- grpc_workqueue_unref((cl), (p), __FILE__, __LINE__, (r))
+ (grpc_workqueue_ref((p), __FILE__, __LINE__, (r)), (p))
+#define GRPC_WORKQUEUE_UNREF(exec_ctx, p, r) \
+ grpc_workqueue_unref((exec_ctx), (p), __FILE__, __LINE__, (r))
void grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file, int line,
const char *reason);
void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
const char *file, int line, const char *reason);
#else
-#define GRPC_WORKQUEUE_REF(p, r) grpc_workqueue_ref((p))
+#define GRPC_WORKQUEUE_REF(p, r) (grpc_workqueue_ref((p)), (p))
#define GRPC_WORKQUEUE_UNREF(cl, p, r) grpc_workqueue_unref((cl), (p))
void grpc_workqueue_ref(grpc_workqueue *workqueue);
void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue);
#endif
-/** Bind this workqueue to a pollset */
-void grpc_workqueue_add_to_pollset(grpc_exec_ctx *exec_ctx,
- grpc_workqueue *workqueue,
- grpc_pollset *pollset);
+/** Add a work item to a workqueue. Items added to a work queue will be started
+ in approximately the order they were enqueued, on some thread that may or
+ may not be the current thread. Successive closures enqueued onto a workqueue
+ MAY be executed concurrently.
+
+ It is generally more expensive to add a closure to a workqueue than to the
+ execution context, both in terms of CPU work and in execution latency.
-/** Add a work item to a workqueue */
+ Use work queues when it's important that other threads be given a chance to
+ tackle some workload. */
void grpc_workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
grpc_closure *closure, grpc_error *error);
diff --git a/src/core/lib/iomgr/workqueue_posix.c b/src/core/lib/iomgr/workqueue_posix.c
index 45e0f6063b4..e0d6dac2308 100644
--- a/src/core/lib/iomgr/workqueue_posix.c
+++ b/src/core/lib/iomgr/workqueue_posix.c
@@ -70,7 +70,7 @@ grpc_error *grpc_workqueue_create(grpc_exec_ctx *exec_ctx,
static void workqueue_destroy(grpc_exec_ctx *exec_ctx,
grpc_workqueue *workqueue) {
- GPR_ASSERT(grpc_closure_list_empty(workqueue->closure_list));
+ grpc_exec_ctx_enqueue_list(exec_ctx, &workqueue->closure_list, NULL);
grpc_fd_shutdown(exec_ctx, workqueue->wakeup_read_fd);
}
@@ -100,12 +100,6 @@ void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {
}
}
-void grpc_workqueue_add_to_pollset(grpc_exec_ctx *exec_ctx,
- grpc_workqueue *workqueue,
- grpc_pollset *pollset) {
- grpc_pollset_add_fd(exec_ctx, pollset, workqueue->wakeup_read_fd);
-}
-
void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {
gpr_mu_lock(&workqueue->mu);
grpc_exec_ctx_enqueue_list(exec_ctx, &workqueue->closure_list, NULL);
diff --git a/src/core/lib/iomgr/workqueue_posix.h b/src/core/lib/iomgr/workqueue_posix.h
index dcb47e7b59d..0f26ba58e27 100644
--- a/src/core/lib/iomgr/workqueue_posix.h
+++ b/src/core/lib/iomgr/workqueue_posix.h
@@ -50,4 +50,9 @@ struct grpc_workqueue {
grpc_closure read_closure;
};
+/** Create a work queue. Returns an error if creation fails. If creation
+ succeeds, sets *workqueue to point to it. */
+grpc_error *grpc_workqueue_create(grpc_exec_ctx *exec_ctx,
+ grpc_workqueue **workqueue);
+
#endif /* GRPC_CORE_LIB_IOMGR_WORKQUEUE_POSIX_H */
diff --git a/src/core/lib/iomgr/workqueue_windows.c b/src/core/lib/iomgr/workqueue_windows.c
index 275f040b1cc..23e2dea1859 100644
--- a/src/core/lib/iomgr/workqueue_windows.c
+++ b/src/core/lib/iomgr/workqueue_windows.c
@@ -37,4 +37,26 @@
#include "src/core/lib/iomgr/workqueue.h"
+// Minimal implementation of grpc_workqueue for Windows
+// Works by directly enqueuing workqueue items onto the current execution
+// context, which is at least correct, if not performant or in the spirit of
+// workqueues.
+
+void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {}
+
+#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
+void grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file, int line,
+ const char *reason) {}
+void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
+ const char *file, int line, const char *reason) {}
+#else
+void grpc_workqueue_ref(grpc_workqueue *workqueue) {}
+void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {}
+#endif
+
+void grpc_workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
+ grpc_closure *closure, grpc_error *error) {
+ grpc_exec_ctx_sched(exec_ctx, closure, error, NULL);
+}
+
#endif /* GPR_WINDOWS */
diff --git a/src/core/lib/security/transport/secure_endpoint.c b/src/core/lib/security/transport/secure_endpoint.c
index 7650d68e892..bc50f9d1b00 100644
--- a/src/core/lib/security/transport/secure_endpoint.c
+++ b/src/core/lib/security/transport/secure_endpoint.c
@@ -360,11 +360,19 @@ static char *endpoint_get_peer(grpc_endpoint *secure_ep) {
return grpc_endpoint_get_peer(ep->wrapped_ep);
}
-static const grpc_endpoint_vtable vtable = {
- endpoint_read, endpoint_write,
- endpoint_add_to_pollset, endpoint_add_to_pollset_set,
- endpoint_shutdown, endpoint_destroy,
- endpoint_get_peer};
+static grpc_workqueue *endpoint_get_workqueue(grpc_endpoint *secure_ep) {
+ secure_endpoint *ep = (secure_endpoint *)secure_ep;
+ return grpc_endpoint_get_workqueue(ep->wrapped_ep);
+}
+
+static const grpc_endpoint_vtable vtable = {endpoint_read,
+ endpoint_write,
+ endpoint_get_workqueue,
+ endpoint_add_to_pollset,
+ endpoint_add_to_pollset_set,
+ endpoint_shutdown,
+ endpoint_destroy,
+ endpoint_get_peer};
grpc_endpoint *grpc_secure_endpoint_create(
struct tsi_frame_protector *protector, grpc_endpoint *transport,
diff --git a/src/core/lib/support/log.c b/src/core/lib/support/log.c
index bae0957df72..899f1218b60 100644
--- a/src/core/lib/support/log.c
+++ b/src/core/lib/support/log.c
@@ -79,17 +79,18 @@ void gpr_set_log_verbosity(gpr_log_severity min_severity_to_print) {
void gpr_log_verbosity_init() {
char *verbosity = gpr_getenv("GRPC_VERBOSITY");
- if (verbosity == NULL) return;
- gpr_atm min_severity_to_print = GPR_LOG_VERBOSITY_UNSET;
- if (strcmp(verbosity, "DEBUG") == 0) {
- min_severity_to_print = (gpr_atm)GPR_LOG_SEVERITY_DEBUG;
- } else if (strcmp(verbosity, "INFO") == 0) {
- min_severity_to_print = (gpr_atm)GPR_LOG_SEVERITY_INFO;
- } else if (strcmp(verbosity, "ERROR") == 0) {
- min_severity_to_print = (gpr_atm)GPR_LOG_SEVERITY_ERROR;
+ gpr_atm min_severity_to_print = GPR_LOG_SEVERITY_ERROR;
+ if (verbosity != NULL) {
+ if (strcmp(verbosity, "DEBUG") == 0) {
+ min_severity_to_print = (gpr_atm)GPR_LOG_SEVERITY_DEBUG;
+ } else if (strcmp(verbosity, "INFO") == 0) {
+ min_severity_to_print = (gpr_atm)GPR_LOG_SEVERITY_INFO;
+ } else if (strcmp(verbosity, "ERROR") == 0) {
+ min_severity_to_print = (gpr_atm)GPR_LOG_SEVERITY_ERROR;
+ }
+ gpr_free(verbosity);
}
- gpr_free(verbosity);
if ((gpr_atm_no_barrier_load(&g_min_severity_to_print)) ==
GPR_LOG_VERBOSITY_UNSET) {
gpr_atm_no_barrier_store(&g_min_severity_to_print, min_severity_to_print);
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index def6e5068b0..2f108af48a1 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -73,6 +73,7 @@ typedef enum { BATCH_CALL, REGISTERED_CALL } requested_call_type;
typedef struct requested_call {
requested_call_type type;
+ size_t cq_idx;
void *tag;
grpc_server *server;
grpc_completion_queue *cq_bound_to_call;
@@ -206,11 +207,11 @@ struct grpc_server {
registered_method *registered_methods;
/** one request matcher for unregistered methods */
request_matcher unregistered_request_matcher;
- /** free list of available requested_calls indices */
- gpr_stack_lockfree *request_freelist;
+ /** free list of available requested_calls_per_cq indices */
+ gpr_stack_lockfree **request_freelist_per_cq;
/** requested call backing data */
- requested_call *requested_calls;
- size_t max_requested_calls;
+ requested_call **requested_calls_per_cq;
+ int max_requested_calls_per_cq;
gpr_atm shutdown_flag;
uint8_t shutdown_published;
@@ -357,7 +358,8 @@ static void request_matcher_kill_requests(grpc_exec_ctx *exec_ctx,
for (size_t i = 0; i < server->cq_count; i++) {
while ((request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[i])) !=
-1) {
- fail_call(exec_ctx, server, i, &server->requested_calls[request_id],
+ fail_call(exec_ctx, server, i,
+ &server->requested_calls_per_cq[i][request_id],
GRPC_ERROR_REF(error));
}
}
@@ -392,12 +394,16 @@ static void server_delete(grpc_exec_ctx *exec_ctx, grpc_server *server) {
}
for (i = 0; i < server->cq_count; i++) {
GRPC_CQ_INTERNAL_UNREF(server->cqs[i], "server");
+ if (server->started) {
+ gpr_stack_lockfree_destroy(server->request_freelist_per_cq[i]);
+ gpr_free(server->requested_calls_per_cq[i]);
+ }
}
- gpr_stack_lockfree_destroy(server->request_freelist);
+ gpr_free(server->request_freelist_per_cq);
+ gpr_free(server->requested_calls_per_cq);
gpr_free(server->cqs);
gpr_free(server->pollsets);
gpr_free(server->shutdown_tags);
- gpr_free(server->requested_calls);
gpr_free(server);
}
@@ -460,11 +466,13 @@ static void done_request_event(grpc_exec_ctx *exec_ctx, void *req,
requested_call *rc = req;
grpc_server *server = rc->server;
- if (rc >= server->requested_calls &&
- rc < server->requested_calls + server->max_requested_calls) {
- GPR_ASSERT(rc - server->requested_calls <= INT_MAX);
- gpr_stack_lockfree_push(server->request_freelist,
- (int)(rc - server->requested_calls));
+ if (rc >= server->requested_calls_per_cq[rc->cq_idx] &&
+ rc < server->requested_calls_per_cq[rc->cq_idx] +
+ server->max_requested_calls_per_cq) {
+ GPR_ASSERT(rc - server->requested_calls_per_cq[rc->cq_idx] <= INT_MAX);
+ gpr_stack_lockfree_push(
+ server->request_freelist_per_cq[rc->cq_idx],
+ (int)(rc - server->requested_calls_per_cq[rc->cq_idx]));
} else {
gpr_free(req);
}
@@ -540,7 +548,7 @@ static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg,
calld->state = ACTIVATED;
gpr_mu_unlock(&calld->mu_state);
publish_call(exec_ctx, server, calld, cq_idx,
- &server->requested_calls[request_id]);
+ &server->requested_calls_per_cq[cq_idx][request_id]);
return; /* early out */
}
}
@@ -979,8 +987,6 @@ void grpc_server_register_non_listening_completion_queue(
}
grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) {
- size_t i;
-
GRPC_API_TRACE("grpc_server_create(%p, %p)", 2, (args, reserved));
grpc_server *server = gpr_malloc(sizeof(grpc_server));
@@ -998,15 +1004,7 @@ grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) {
&server->root_channel_data;
/* TODO(ctiller): expose a channel_arg for this */
- server->max_requested_calls = 32768;
- server->request_freelist =
- gpr_stack_lockfree_create(server->max_requested_calls);
- for (i = 0; i < (size_t)server->max_requested_calls; i++) {
- gpr_stack_lockfree_push(server->request_freelist, (int)i);
- }
- server->requested_calls = gpr_malloc(server->max_requested_calls *
- sizeof(*server->requested_calls));
-
+ server->max_requested_calls_per_cq = 32768;
server->channel_args = grpc_channel_args_copy(args);
return server;
@@ -1066,16 +1064,28 @@ void grpc_server_start(grpc_server *server) {
server->started = true;
size_t pollset_count = 0;
server->pollsets = gpr_malloc(sizeof(grpc_pollset *) * server->cq_count);
+ server->request_freelist_per_cq =
+ gpr_malloc(sizeof(*server->request_freelist_per_cq) * server->cq_count);
+ server->requested_calls_per_cq =
+ gpr_malloc(sizeof(*server->requested_calls_per_cq) * server->cq_count);
for (i = 0; i < server->cq_count; i++) {
if (!grpc_cq_is_non_listening_server_cq(server->cqs[i])) {
server->pollsets[pollset_count++] = grpc_cq_pollset(server->cqs[i]);
}
+ server->request_freelist_per_cq[i] =
+ gpr_stack_lockfree_create((size_t)server->max_requested_calls_per_cq);
+ for (int j = 0; j < server->max_requested_calls_per_cq; j++) {
+ gpr_stack_lockfree_push(server->request_freelist_per_cq[i], j);
+ }
+ server->requested_calls_per_cq[i] =
+ gpr_malloc((size_t)server->max_requested_calls_per_cq *
+ sizeof(*server->requested_calls_per_cq[i]));
}
request_matcher_init(&server->unregistered_request_matcher,
- server->max_requested_calls, server);
+ (size_t)server->max_requested_calls_per_cq, server);
for (registered_method *rm = server->registered_methods; rm; rm = rm->next) {
- request_matcher_init(&rm->request_matcher, server->max_requested_calls,
- server);
+ request_matcher_init(&rm->request_matcher,
+ (size_t)server->max_requested_calls_per_cq, server);
}
for (l = server->listeners; l; l = l->next) {
@@ -1307,11 +1317,13 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx,
GRPC_ERROR_CREATE("Server Shutdown"));
return GRPC_CALL_OK;
}
- request_id = gpr_stack_lockfree_pop(server->request_freelist);
+ request_id = gpr_stack_lockfree_pop(server->request_freelist_per_cq[cq_idx]);
if (request_id == -1) {
/* out of request ids: just fail this one */
fail_call(exec_ctx, server, cq_idx, rc,
- GRPC_ERROR_CREATE("Server Shutdown"));
+ grpc_error_set_int(GRPC_ERROR_CREATE("Out of request ids"),
+ GRPC_ERROR_INT_LIMIT,
+ server->max_requested_calls_per_cq));
return GRPC_CALL_OK;
}
switch (rc->type) {
@@ -1322,7 +1334,7 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx,
rm = &rc->data.registered.registered_method->request_matcher;
break;
}
- server->requested_calls[request_id] = *rc;
+ server->requested_calls_per_cq[cq_idx][request_id] = *rc;
gpr_free(rc);
if (gpr_stack_lockfree_push(rm->requests_per_cq[cq_idx], request_id)) {
/* this was the first queued request: we need to lock and start
@@ -1346,7 +1358,7 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx,
calld->state = ACTIVATED;
gpr_mu_unlock(&calld->mu_state);
publish_call(exec_ctx, server, calld, cq_idx,
- &server->requested_calls[request_id]);
+ &server->requested_calls_per_cq[cq_idx][request_id]);
}
gpr_mu_lock(&server->mu_call);
}
@@ -1382,6 +1394,7 @@ grpc_call_error grpc_server_request_call(
}
grpc_cq_begin_op(cq_for_notification, tag);
details->reserved = NULL;
+ rc->cq_idx = cq_idx;
rc->type = BATCH_CALL;
rc->server = server;
rc->tag = tag;
@@ -1430,6 +1443,7 @@ grpc_call_error grpc_server_request_registered_call(
goto done;
}
grpc_cq_begin_op(cq_for_notification, tag);
+ rc->cq_idx = cq_idx;
rc->type = REGISTERED_CALL;
rc->server = server;
rc->tag = tag;
diff --git a/src/core/lib/transport/connectivity_state.c b/src/core/lib/transport/connectivity_state.c
index 054f112127b..68d05e3a858 100644
--- a/src/core/lib/transport/connectivity_state.c
+++ b/src/core/lib/transport/connectivity_state.c
@@ -179,6 +179,9 @@ void grpc_connectivity_state_set(grpc_exec_ctx *exec_ctx,
while ((w = tracker->watchers) != NULL) {
*w->current = tracker->current_state;
tracker->watchers = w->next;
+ if (grpc_connectivity_state_trace) {
+ gpr_log(GPR_DEBUG, "NOTIFY: %p", w->notify);
+ }
grpc_exec_ctx_sched(exec_ctx, w->notify,
GRPC_ERROR_REF(tracker->current_error), NULL);
gpr_free(w);
diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc
index fb4c68ebe49..af04fd4ca64 100644
--- a/src/cpp/server/server.cc
+++ b/src/cpp/server/server.cc
@@ -281,6 +281,7 @@ Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned,
: max_message_size_(max_message_size),
started_(false),
shutdown_(false),
+ shutdown_notified_(false),
num_running_cb_(0),
sync_methods_(new std::list),
has_generic_service_(false),
@@ -462,13 +463,16 @@ void Server::ShutdownInternal(gpr_timespec deadline) {
while (num_running_cb_ != 0) {
callback_cv_.wait(lock);
}
+
+ shutdown_notified_ = true;
+ shutdown_cv_.notify_all();
}
}
void Server::Wait() {
grpc::unique_lock lock(mu_);
- while (num_running_cb_ != 0) {
- callback_cv_.wait(lock);
+ while (started_ && !shutdown_notified_) {
+ shutdown_cv_.wait(lock);
}
}
diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc
index 43117fd1e95..1ca6a2b906e 100644
--- a/src/cpp/server/server_context.cc
+++ b/src/cpp/server/server_context.cc
@@ -129,7 +129,8 @@ ServerContext::ServerContext()
deadline_(gpr_inf_future(GPR_CLOCK_REALTIME)),
call_(nullptr),
cq_(nullptr),
- sent_initial_metadata_(false) {}
+ sent_initial_metadata_(false),
+ compression_level_set_(false) {}
ServerContext::ServerContext(gpr_timespec deadline, grpc_metadata* metadata,
size_t metadata_count)
@@ -139,7 +140,8 @@ ServerContext::ServerContext(gpr_timespec deadline, grpc_metadata* metadata,
deadline_(deadline),
call_(nullptr),
cq_(nullptr),
- sent_initial_metadata_(false) {
+ sent_initial_metadata_(false),
+ compression_level_set_(false) {
for (size_t i = 0; i < metadata_count; i++) {
client_metadata_.insert(std::pair(
metadata[i].key,
@@ -194,15 +196,6 @@ bool ServerContext::IsCancelled() const {
}
}
-void ServerContext::set_compression_level(grpc_compression_level level) {
- // TODO(dgq): get rid of grpc_call_compression_for_level and propagate the
- // compression level by adding a new argument to
- // CallOpSendInitialMetadata::SendInitialMetadata.
- const grpc_compression_algorithm algorithm_for_level =
- grpc_call_compression_for_level(call_, level);
- set_compression_algorithm(algorithm_for_level);
-}
-
void ServerContext::set_compression_algorithm(
grpc_compression_algorithm algorithm) {
char* algorithm_name = NULL;
diff --git a/src/csharp/.nuget/packages.config b/src/csharp/.nuget/packages.config
deleted file mode 100644
index 6154b3561f4..00000000000
--- a/src/csharp/.nuget/packages.config
+++ /dev/null
@@ -1,6 +0,0 @@
-
-
-
-
-
-
\ No newline at end of file
diff --git a/src/csharp/Grpc.Core.Tests/packages.config b/src/csharp/Grpc.Core.Tests/packages.config
index aa7d951fdc7..6a930c17ee6 100644
--- a/src/csharp/Grpc.Core.Tests/packages.config
+++ b/src/csharp/Grpc.Core.Tests/packages.config
@@ -4,4 +4,7 @@
-
\ No newline at end of file
+
+
+
+
diff --git a/src/csharp/Grpc.Core.Tests/project.json b/src/csharp/Grpc.Core.Tests/project.json
index d4c9a2ef316..4a682d927e4 100644
--- a/src/csharp/Grpc.Core.Tests/project.json
+++ b/src/csharp/Grpc.Core.Tests/project.json
@@ -54,7 +54,10 @@
},
"Newtonsoft.Json": "8.0.3",
"NUnit": "3.2.0",
- "NUnitLite": "3.2.0-*"
+ "NUnitLite": "3.2.0-*",
+ "NUnit.ConsoleRunner": "3.2.0",
+ "OpenCover": "4.6.519",
+ "ReportGenerator": "2.4.4.0"
},
"frameworks": {
"net45": { },
diff --git a/src/csharp/Grpc.Examples.MathClient/packages.config b/src/csharp/Grpc.Examples.MathClient/packages.config
new file mode 100644
index 00000000000..79ece06bef6
--- /dev/null
+++ b/src/csharp/Grpc.Examples.MathClient/packages.config
@@ -0,0 +1,3 @@
+
+
+
diff --git a/src/csharp/Grpc.Examples.MathServer/packages.config b/src/csharp/Grpc.Examples.MathServer/packages.config
new file mode 100644
index 00000000000..79ece06bef6
--- /dev/null
+++ b/src/csharp/Grpc.Examples.MathServer/packages.config
@@ -0,0 +1,3 @@
+
+
+
diff --git a/src/csharp/Grpc.IntegrationTesting.QpsWorker/packages.config b/src/csharp/Grpc.IntegrationTesting.QpsWorker/packages.config
new file mode 100644
index 00000000000..79ece06bef6
--- /dev/null
+++ b/src/csharp/Grpc.IntegrationTesting.QpsWorker/packages.config
@@ -0,0 +1,3 @@
+
+
+
diff --git a/src/csharp/Grpc.IntegrationTesting.StressClient/packages.config b/src/csharp/Grpc.IntegrationTesting.StressClient/packages.config
new file mode 100644
index 00000000000..79ece06bef6
--- /dev/null
+++ b/src/csharp/Grpc.IntegrationTesting.StressClient/packages.config
@@ -0,0 +1,3 @@
+
+
+
diff --git a/src/node/ext/call_credentials.cc b/src/node/ext/call_credentials.cc
index 3c8f0c56da4..81fc552fd10 100644
--- a/src/node/ext/call_credentials.cc
+++ b/src/node/ext/call_credentials.cc
@@ -68,6 +68,8 @@ using v8::Value;
Nan::Callback *CallCredentials::constructor;
Persistent CallCredentials::fun_tpl;
+static Callback *plugin_callback;
+
CallCredentials::CallCredentials(grpc_call_credentials *credentials)
: wrapped_credentials(credentials) {}
@@ -88,6 +90,11 @@ void CallCredentials::Init(Local