mirror of https://github.com/grpc/grpc.git
commit
5a6183f1bd
209 changed files with 4921 additions and 3676 deletions
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,110 @@ |
||||
#
# Copyright 2018 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Detect the OS we are building on (e.g. Linux, Darwin). SYSTEM may be
# overridden on the command line.
HOST_SYSTEM = $(shell uname | cut -f 1 -d_)
SYSTEM ?= $(HOST_SYSTEM)
CXX = g++
CPPFLAGS += `pkg-config --cflags protobuf grpc`
CXXFLAGS += -std=c++11
# Linker flags differ per platform: the --as-needed/--no-as-needed toggles
# used on the else branch are GNU-ld options, so Darwin links
# grpc++_reflection directly instead.
ifeq ($(SYSTEM),Darwin)
LDFLAGS += -L/usr/local/lib `pkg-config --libs protobuf grpc++ grpc`\
           -lgrpc++_reflection\
           -ldl
else
LDFLAGS += -L/usr/local/lib `pkg-config --libs protobuf grpc++ grpc`\
           -Wl,--no-as-needed -lgrpc++_reflection -Wl,--as-needed\
           -ldl
endif
PROTOC = protoc
GRPC_CPP_PLUGIN = grpc_cpp_plugin
GRPC_CPP_PLUGIN_PATH ?= `which $(GRPC_CPP_PLUGIN)`

# Location of the shared example .proto files.
PROTOS_PATH = ../../protos

# Let make find .proto prerequisites in PROTOS_PATH.
vpath %.proto $(PROTOS_PATH)

all: system-check greeter_client greeter_server

greeter_client: helloworld.pb.o helloworld.grpc.pb.o greeter_client.o
	$(CXX) $^ $(LDFLAGS) -o $@

greeter_server: helloworld.pb.o helloworld.grpc.pb.o greeter_server.o
	$(CXX) $^ $(LDFLAGS) -o $@

# Mark generated sources precious so make does not delete them as
# intermediate files.
.PRECIOUS: %.grpc.pb.cc
%.grpc.pb.cc: %.proto
	$(PROTOC) -I $(PROTOS_PATH) --grpc_out=. --plugin=protoc-gen-grpc=$(GRPC_CPP_PLUGIN_PATH) $<

.PRECIOUS: %.pb.cc
%.pb.cc: %.proto
	$(PROTOC) -I $(PROTOS_PATH) --cpp_out=. $<

clean:
	rm -f *.o *.pb.cc *.pb.h greeter_client greeter_server


# The following is to test your system and ensure a smoother experience.
# They are by no means necessary to actually compile a grpc-enabled software.

PROTOC_CMD = which $(PROTOC)
PROTOC_CHECK_CMD = $(PROTOC) --version | grep -q libprotoc.3
PLUGIN_CHECK_CMD = which $(GRPC_CPP_PLUGIN)
HAS_PROTOC = $(shell $(PROTOC_CMD) > /dev/null && echo true || echo false)
ifeq ($(HAS_PROTOC),true)
HAS_VALID_PROTOC = $(shell $(PROTOC_CHECK_CMD) 2> /dev/null && echo true || echo false)
endif
HAS_PLUGIN = $(shell $(PLUGIN_CHECK_CMD) > /dev/null && echo true || echo false)

SYSTEM_OK = false
ifeq ($(HAS_VALID_PROTOC),true)
ifeq ($(HAS_PLUGIN),true)
SYSTEM_OK = true
endif
endif

# Prints diagnostics and fails (via @false) if protoc 3.x or the gRPC C++
# plugin is missing from PATH.
system-check:
ifneq ($(HAS_VALID_PROTOC),true)
	@echo " DEPENDENCY ERROR"
	@echo
	@echo "You don't have protoc 3.0.0 installed in your path."
	@echo "Please install Google protocol buffers 3.0.0 and its compiler."
	@echo "You can find it here:"
	@echo
	@echo "   https://github.com/google/protobuf/releases/tag/v3.0.0"
	@echo
	@echo "Here is what I get when trying to evaluate your version of protoc:"
	@echo
	-$(PROTOC) --version
	@echo
	@echo
endif
ifneq ($(HAS_PLUGIN),true)
	@echo " DEPENDENCY ERROR"
	@echo
	@echo "You don't have the grpc c++ protobuf plugin installed in your path."
	@echo "Please install grpc. You can find it here:"
	@echo
	@echo "   https://github.com/grpc/grpc"
	@echo
	@echo "Here is what I get when trying to detect if you have the plugin:"
	@echo
	-which $(GRPC_CPP_PLUGIN)
	@echo
	@echo
endif
ifneq ($(SYSTEM_OK),true)
	@false
endif
@ -0,0 +1,64 @@ |
||||
# gRPC C++ Load Balancing Tutorial

### Prerequisite

Make sure you have run the [hello world example](../helloworld) or understood the basics of gRPC. We will not dive into the details that have been discussed in the hello world example.

### Get the tutorial source code

The example code for this and our other examples lives in the `examples` directory. Clone this repository to your local machine by running the following command:

```sh
$ git clone -b $(curl -L https://grpc.io/release) https://github.com/grpc/grpc
```

Change your current directory to `examples/cpp/load_balancing`:

```sh
$ cd examples/cpp/load_balancing/
```

### Generating gRPC code

To generate the client and server side interfaces:

```sh
$ make helloworld.grpc.pb.cc helloworld.pb.cc
```

which internally invokes the proto-compiler as:

```sh
$ protoc -I ../../protos/ --grpc_out=. --plugin=protoc-gen-grpc=grpc_cpp_plugin ../../protos/helloworld.proto
$ protoc -I ../../protos/ --cpp_out=. ../../protos/helloworld.proto
```

### Writing a client and a server

The client and the server can be based on the hello world example.

Additionally, we can configure the load balancing policy. (To see what load balancing policies are available, check out [this folder](https://github.com/grpc/grpc/tree/master/src/core/ext/filters/client_channel/lb_policy).)

In the client, set the load balancing policy of the channel via a channel argument (for example, to Round Robin):

```cpp
ChannelArguments args;
// Set the load balancing policy for the channel.
args.SetLoadBalancingPolicyName("round_robin");
GreeterClient greeter(grpc::CreateCustomChannel(
    "localhost:50051", grpc::InsecureChannelCredentials(), args));
```

For a working example, refer to [greeter_client.cc](greeter_client.cc) and [greeter_server.cc](greeter_server.cc).

Build and run the client and the server with the following commands.

```sh
make
./greeter_server
```

```sh
./greeter_client
```

(Note that the case in this example is trivial because there is only one server resolved from the name.)
@ -0,0 +1,90 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2015 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#include <iostream> |
||||
#include <memory> |
||||
#include <string> |
||||
|
||||
#include <grpcpp/grpcpp.h> |
||||
|
||||
#ifdef BAZEL_BUILD |
||||
#include "examples/protos/helloworld.grpc.pb.h" |
||||
#else |
||||
#include "helloworld.grpc.pb.h" |
||||
#endif |
||||
|
||||
using grpc::Channel; |
||||
using grpc::ChannelArguments; |
||||
using grpc::ClientContext; |
||||
using grpc::Status; |
||||
using helloworld::HelloRequest; |
||||
using helloworld::HelloReply; |
||||
using helloworld::Greeter; |
||||
|
||||
class GreeterClient { |
||||
public: |
||||
GreeterClient(std::shared_ptr<Channel> channel) |
||||
: stub_(Greeter::NewStub(channel)) {} |
||||
|
||||
// Assembles the client's payload, sends it and presents the response back
|
||||
// from the server.
|
||||
std::string SayHello(const std::string& user) { |
||||
// Data we are sending to the server.
|
||||
HelloRequest request; |
||||
request.set_name(user); |
||||
|
||||
// Container for the data we expect from the server.
|
||||
HelloReply reply; |
||||
|
||||
// Context for the client. It could be used to convey extra information to
|
||||
// the server and/or tweak certain RPC behaviors.
|
||||
ClientContext context; |
||||
|
||||
// The actual RPC.
|
||||
Status status = stub_->SayHello(&context, request, &reply); |
||||
|
||||
// Act upon its status.
|
||||
if (status.ok()) { |
||||
return reply.message(); |
||||
} else { |
||||
std::cout << status.error_code() << ": " << status.error_message() |
||||
<< std::endl; |
||||
return "RPC failed"; |
||||
} |
||||
} |
||||
|
||||
private: |
||||
std::unique_ptr<Greeter::Stub> stub_; |
||||
}; |
||||
|
||||
int main(int argc, char** argv) { |
||||
// Instantiate the client. It requires a channel, out of which the actual RPCs
|
||||
// are created. This channel models a connection to an endpoint (in this case,
|
||||
// localhost at port 50051). We indicate that the channel isn't authenticated
|
||||
// (use of InsecureChannelCredentials()).
|
||||
ChannelArguments args; |
||||
// Set the load balancing policy for the channel.
|
||||
args.SetLoadBalancingPolicyName("round_robin"); |
||||
GreeterClient greeter(grpc::CreateCustomChannel( |
||||
"localhost:50051", grpc::InsecureChannelCredentials(), args)); |
||||
std::string user("world"); |
||||
std::string reply = greeter.SayHello(user); |
||||
std::cout << "Greeter received: " << reply << std::endl; |
||||
|
||||
return 0; |
||||
} |
@ -0,0 +1,72 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2015 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#include <iostream> |
||||
#include <memory> |
||||
#include <string> |
||||
|
||||
#include <grpcpp/grpcpp.h> |
||||
|
||||
#ifdef BAZEL_BUILD |
||||
#include "examples/protos/helloworld.grpc.pb.h" |
||||
#else |
||||
#include "helloworld.grpc.pb.h" |
||||
#endif |
||||
|
||||
using grpc::Server; |
||||
using grpc::ServerBuilder; |
||||
using grpc::ServerContext; |
||||
using grpc::Status; |
||||
using helloworld::HelloRequest; |
||||
using helloworld::HelloReply; |
||||
using helloworld::Greeter; |
||||
|
||||
// Logic and data behind the server's behavior.
|
||||
class GreeterServiceImpl final : public Greeter::Service { |
||||
Status SayHello(ServerContext* context, const HelloRequest* request, |
||||
HelloReply* reply) override { |
||||
std::string prefix("Hello "); |
||||
reply->set_message(prefix + request->name()); |
||||
return Status::OK; |
||||
} |
||||
}; |
||||
|
||||
void RunServer() { |
||||
std::string server_address("0.0.0.0:50051"); |
||||
GreeterServiceImpl service; |
||||
|
||||
ServerBuilder builder; |
||||
// Listen on the given address without any authentication mechanism.
|
||||
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials()); |
||||
// Register "service" as the instance through which we'll communicate with
|
||||
// clients. In this case it corresponds to an *synchronous* service.
|
||||
builder.RegisterService(&service); |
||||
// Finally assemble the server.
|
||||
std::unique_ptr<Server> server(builder.BuildAndStart()); |
||||
std::cout << "Server listening on " << server_address << std::endl; |
||||
|
||||
// Wait for the server to shutdown. Note that some other thread must be
|
||||
// responsible for shutting down the server for this call to ever return.
|
||||
server->Wait(); |
||||
} |
||||
|
||||
// Entry point: runs the blocking server loop.
int main(int argc, char** argv) {
  RunServer();

  return 0;
}
@ -0,0 +1,24 @@ |
||||
/*
 *
 * Copyright 2015 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

// Public convenience header: forwards to the client interceptor API in the
// codegen layer.
#ifndef GRPCPP_SUPPORT_CLIENT_INTERCEPTOR_H
#define GRPCPP_SUPPORT_CLIENT_INTERCEPTOR_H

#include <grpcpp/impl/codegen/client_interceptor.h>

#endif  // GRPCPP_SUPPORT_CLIENT_INTERCEPTOR_H
|
@ -0,0 +1,24 @@ |
||||
/*
 *
 * Copyright 2015 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

// Public convenience header: forwards to the common interceptor API in the
// codegen layer.
#ifndef GRPCPP_SUPPORT_INTERCEPTOR_H
#define GRPCPP_SUPPORT_INTERCEPTOR_H

#include <grpcpp/impl/codegen/interceptor.h>

#endif  // GRPCPP_SUPPORT_INTERCEPTOR_H
|
@ -0,0 +1,24 @@ |
||||
/*
 *
 * Copyright 2015 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

// Public convenience header: forwards to the server interceptor API in the
// codegen layer.
#ifndef GRPCPP_SUPPORT_SERVER_INTERCEPTOR_H
#define GRPCPP_SUPPORT_SERVER_INTERCEPTOR_H

#include <grpcpp/impl/codegen/server_interceptor.h>

#endif  // GRPCPP_SUPPORT_SERVER_INTERCEPTOR_H
|
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,936 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2015 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/ext/filters/client_channel/request_routing.h" |
||||
|
||||
#include <inttypes.h> |
||||
#include <limits.h> |
||||
#include <stdbool.h> |
||||
#include <stdio.h> |
||||
#include <string.h> |
||||
|
||||
#include <grpc/support/alloc.h> |
||||
#include <grpc/support/log.h> |
||||
#include <grpc/support/string_util.h> |
||||
#include <grpc/support/sync.h> |
||||
|
||||
#include "src/core/ext/filters/client_channel/backup_poller.h" |
||||
#include "src/core/ext/filters/client_channel/http_connect_handshaker.h" |
||||
#include "src/core/ext/filters/client_channel/lb_policy_registry.h" |
||||
#include "src/core/ext/filters/client_channel/proxy_mapper_registry.h" |
||||
#include "src/core/ext/filters/client_channel/resolver_registry.h" |
||||
#include "src/core/ext/filters/client_channel/retry_throttle.h" |
||||
#include "src/core/ext/filters/client_channel/server_address.h" |
||||
#include "src/core/ext/filters/client_channel/subchannel.h" |
||||
#include "src/core/ext/filters/deadline/deadline_filter.h" |
||||
#include "src/core/lib/backoff/backoff.h" |
||||
#include "src/core/lib/channel/channel_args.h" |
||||
#include "src/core/lib/channel/connected_channel.h" |
||||
#include "src/core/lib/channel/status_util.h" |
||||
#include "src/core/lib/gpr/string.h" |
||||
#include "src/core/lib/gprpp/inlined_vector.h" |
||||
#include "src/core/lib/gprpp/manual_constructor.h" |
||||
#include "src/core/lib/iomgr/combiner.h" |
||||
#include "src/core/lib/iomgr/iomgr.h" |
||||
#include "src/core/lib/iomgr/polling_entity.h" |
||||
#include "src/core/lib/profiling/timers.h" |
||||
#include "src/core/lib/slice/slice_internal.h" |
||||
#include "src/core/lib/slice/slice_string_helpers.h" |
||||
#include "src/core/lib/surface/channel.h" |
||||
#include "src/core/lib/transport/connectivity_state.h" |
||||
#include "src/core/lib/transport/error_utils.h" |
||||
#include "src/core/lib/transport/metadata.h" |
||||
#include "src/core/lib/transport/metadata_batch.h" |
||||
#include "src/core/lib/transport/service_config.h" |
||||
#include "src/core/lib/transport/static_metadata.h" |
||||
#include "src/core/lib/transport/status_metadata.h" |
||||
|
||||
namespace grpc_core { |
||||
|
||||
//
|
||||
// RequestRouter::Request::ResolverResultWaiter
|
||||
//
|
||||
|
||||
// Handles waiting for a resolver result.
// Used only for the first call on an idle channel.
// Lifetime: self-deleting. The object stays alive until both DoneLocked()
// and CancelLocked() have observed finished_ == true; whichever closure
// runs second calls Delete(this).
class RequestRouter::Request::ResolverResultWaiter {
 public:
  explicit ResolverResultWaiter(Request* request)
      : request_router_(request->request_router_),
        request_(request),
        tracer_enabled_(request_router_->tracer_->enabled()) {
    if (tracer_enabled_) {
      gpr_log(GPR_INFO,
              "request_router=%p request=%p: deferring pick pending resolver "
              "result",
              request_router_, request);
    }
    // Add closure to be run when a resolver result is available.
    GRPC_CLOSURE_INIT(&done_closure_, &DoneLocked, this,
                      grpc_combiner_scheduler(request_router_->combiner_));
    AddToWaitingList();
    // Set cancellation closure, so that we abort if the call is cancelled.
    GRPC_CLOSURE_INIT(&cancel_closure_, &CancelLocked, this,
                      grpc_combiner_scheduler(request_router_->combiner_));
    grpc_call_combiner_set_notify_on_cancel(request->call_combiner_,
                                            &cancel_closure_);
  }

 private:
  // Adds done_closure_ to
  // request_router_->waiting_for_resolver_result_closures_.
  void AddToWaitingList() {
    grpc_closure_list_append(
        &request_router_->waiting_for_resolver_result_closures_, &done_closure_,
        GRPC_ERROR_NONE);
  }

  // Invoked when a resolver result is available.
  // Dispatches on router state: resolver error -> fail the call; resolver
  // shut down -> fail with "Disconnected"; no LB policy yet -> retry (if
  // wait_for_ready) or fail UNAVAILABLE; otherwise proceed to the LB pick.
  static void DoneLocked(void* arg, grpc_error* error) {
    ResolverResultWaiter* self = static_cast<ResolverResultWaiter*>(arg);
    RequestRouter* request_router = self->request_router_;
    // If CancelLocked() has already run, delete ourselves without doing
    // anything. Note that the call stack may have already been destroyed,
    // so it's not safe to access anything in state_.
    if (GPR_UNLIKELY(self->finished_)) {
      if (self->tracer_enabled_) {
        gpr_log(GPR_INFO,
                "request_router=%p: call cancelled before resolver result",
                request_router);
      }
      Delete(self);
      return;
    }
    // Otherwise, process the resolver result.
    Request* request = self->request_;
    if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
      if (self->tracer_enabled_) {
        gpr_log(GPR_INFO,
                "request_router=%p request=%p: resolver failed to return data",
                request_router, request);
      }
      GRPC_CLOSURE_RUN(request->on_route_done_, GRPC_ERROR_REF(error));
    } else if (GPR_UNLIKELY(request_router->resolver_ == nullptr)) {
      // Shutting down.
      if (self->tracer_enabled_) {
        gpr_log(GPR_INFO, "request_router=%p request=%p: resolver disconnected",
                request_router, request);
      }
      GRPC_CLOSURE_RUN(request->on_route_done_,
                       GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected"));
    } else if (GPR_UNLIKELY(request_router->lb_policy_ == nullptr)) {
      // Transient resolver failure.
      // If call has wait_for_ready=true, try again; otherwise, fail.
      if (*request->pick_.initial_metadata_flags &
          GRPC_INITIAL_METADATA_WAIT_FOR_READY) {
        if (self->tracer_enabled_) {
          gpr_log(GPR_INFO,
                  "request_router=%p request=%p: resolver returned but no LB "
                  "policy; wait_for_ready=true; trying again",
                  request_router, request);
        }
        // Re-add ourselves to the waiting list.
        self->AddToWaitingList();
        // Return early so that we don't set finished_ to true below.
        return;
      } else {
        if (self->tracer_enabled_) {
          gpr_log(GPR_INFO,
                  "request_router=%p request=%p: resolver returned but no LB "
                  "policy; wait_for_ready=false; failing",
                  request_router, request);
        }
        GRPC_CLOSURE_RUN(
            request->on_route_done_,
            grpc_error_set_int(
                GRPC_ERROR_CREATE_FROM_STATIC_STRING("Name resolution failure"),
                GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
      }
    } else {
      if (self->tracer_enabled_) {
        gpr_log(GPR_INFO,
                "request_router=%p request=%p: resolver returned, doing LB "
                "pick",
                request_router, request);
      }
      request->ProcessServiceConfigAndStartLbPickLocked();
    }
    self->finished_ = true;
  }

  // Invoked when the call is cancelled.
  // Note: This runs under the client_channel combiner, but will NOT be
  // holding the call combiner.
  static void CancelLocked(void* arg, grpc_error* error) {
    ResolverResultWaiter* self = static_cast<ResolverResultWaiter*>(arg);
    RequestRouter* request_router = self->request_router_;
    // If DoneLocked() has already run, delete ourselves without doing anything.
    if (self->finished_) {
      Delete(self);
      return;
    }
    Request* request = self->request_;
    // If we are being cancelled, immediately invoke on_route_done_
    // to propagate the error back to the caller.
    if (error != GRPC_ERROR_NONE) {
      if (self->tracer_enabled_) {
        gpr_log(GPR_INFO,
                "request_router=%p request=%p: cancelling call waiting for "
                "name resolution",
                request_router, request);
      }
      // Note: Although we are not in the call combiner here, we are
      // basically stealing the call combiner from the pending pick, so
      // it's safe to run on_route_done_ here -- we are essentially
      // calling it here instead of calling it in DoneLocked().
      GRPC_CLOSURE_RUN(request->on_route_done_,
                       GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                           "Pick cancelled", &error, 1));
    }
    self->finished_ = true;
  }

  RequestRouter* request_router_;
  Request* request_;
  const bool tracer_enabled_;
  grpc_closure done_closure_;    // runs DoneLocked under the combiner
  grpc_closure cancel_closure_;  // runs CancelLocked under the combiner
  // Set by whichever of DoneLocked/CancelLocked runs first; tells the
  // second one to just delete this object.
  bool finished_ = false;
};
||||
|
||||
//
|
||||
// RequestRouter::Request::AsyncPickCanceller
|
||||
//
|
||||
|
||||
// Handles the call combiner cancellation callback for an async LB pick.
// Holds a "pick_callback_cancel" ref on the owning call stack for as long
// as the cancellation closure is pending; the ref is dropped either in
// MarkFinishedLocked() (pick completed) or in CancelLocked().
class RequestRouter::Request::AsyncPickCanceller {
 public:
  explicit AsyncPickCanceller(Request* request)
      : request_router_(request->request_router_),
        request_(request),
        tracer_enabled_(request_router_->tracer_->enabled()) {
    GRPC_CALL_STACK_REF(request->owning_call_, "pick_callback_cancel");
    // Set cancellation closure, so that we abort if the call is cancelled.
    GRPC_CLOSURE_INIT(&cancel_closure_, &CancelLocked, this,
                      grpc_combiner_scheduler(request_router_->combiner_));
    grpc_call_combiner_set_notify_on_cancel(request->call_combiner_,
                                            &cancel_closure_);
  }

  // Called when the pick completes normally; prevents CancelLocked from
  // cancelling the (already finished) pick and releases the call stack ref.
  void MarkFinishedLocked() {
    finished_ = true;
    GRPC_CALL_STACK_UNREF(request_->owning_call_, "pick_callback_cancel");
  }

 private:
  // Invoked when the call is cancelled.
  // Note: This runs under the client_channel combiner, but will NOT be
  // holding the call combiner.
  static void CancelLocked(void* arg, grpc_error* error) {
    AsyncPickCanceller* self = static_cast<AsyncPickCanceller*>(arg);
    Request* request = self->request_;
    RequestRouter* request_router = self->request_router_;
    if (!self->finished_) {
      // Note: request_router->lb_policy_ may have changed since we started our
      // pick, in which case we will be cancelling the pick on a policy other
      // than the one we started it on. However, this will just be a no-op.
      if (error != GRPC_ERROR_NONE && request_router->lb_policy_ != nullptr) {
        if (self->tracer_enabled_) {
          gpr_log(GPR_INFO,
                  "request_router=%p request=%p: cancelling pick from LB "
                  "policy %p",
                  request_router, request, request_router->lb_policy_.get());
        }
        request_router->lb_policy_->CancelPickLocked(&request->pick_,
                                                     GRPC_ERROR_REF(error));
      }
      request->pick_canceller_ = nullptr;
      GRPC_CALL_STACK_UNREF(request->owning_call_, "pick_callback_cancel");
    }
    // This closure runs exactly once, so the object is always deleted here.
    Delete(self);
  }

  RequestRouter* request_router_;
  Request* request_;
  const bool tracer_enabled_;
  grpc_closure cancel_closure_;  // runs CancelLocked under the combiner
  bool finished_ = false;        // set via MarkFinishedLocked()
};
||||
|
||||
//
|
||||
// RequestRouter::Request
|
||||
//
|
||||
|
||||
// Constructs a routing request for one call. Pointer arguments are
// borrowed, not owned. apply_service_config (with its user_data) is
// invoked once resolver results are available; on_route_done is run when
// routing completes or fails. The initial metadata and its flags are
// stashed in pick_ for the LB policy to inspect.
RequestRouter::Request::Request(grpc_call_stack* owning_call,
                                grpc_call_combiner* call_combiner,
                                grpc_polling_entity* pollent,
                                grpc_metadata_batch* send_initial_metadata,
                                uint32_t* send_initial_metadata_flags,
                                ApplyServiceConfigCallback apply_service_config,
                                void* apply_service_config_user_data,
                                grpc_closure* on_route_done)
    : owning_call_(owning_call),
      call_combiner_(call_combiner),
      pollent_(pollent),
      apply_service_config_(apply_service_config),
      apply_service_config_user_data_(apply_service_config_user_data),
      on_route_done_(on_route_done) {
  pick_.initial_metadata = send_initial_metadata;
  pick_.initial_metadata_flags = send_initial_metadata_flags;
}
||||
|
||||
// Releases pick state: drops the connected subchannel ref (if a pick
// completed) and runs any per-context destroy callbacks installed during
// the pick.
RequestRouter::Request::~Request() {
  if (pick_.connected_subchannel != nullptr) {
    pick_.connected_subchannel.reset();
  }
  for (size_t i = 0; i < GRPC_CONTEXT_COUNT; ++i) {
    if (pick_.subchannel_call_context[i].destroy != nullptr) {
      pick_.subchannel_call_context[i].destroy(
          pick_.subchannel_call_context[i].value);
    }
  }
}
||||
|
||||
// Invoked once resolver results are available.
// Applies the service config via the caller-supplied callback; if that
// callback returns false the routing attempt is abandoned here (the
// callback is responsible for failing the call in that case — presumably;
// TODO(review): confirm against the callback's contract).
void RequestRouter::Request::ProcessServiceConfigAndStartLbPickLocked() {
  // Get service config data if needed.
  if (!apply_service_config_(apply_service_config_user_data_)) return;
  // Start LB pick.
  StartLbPickLocked();
}
||||
|
||||
// Idempotently adds the call's polling entity to the router's
// interested_parties pollset set so LB policy I/O can be driven by this
// call's poller.
void RequestRouter::Request::MaybeAddCallToInterestedPartiesLocked() {
  if (!pollent_added_to_interested_parties_) {
    pollent_added_to_interested_parties_ = true;
    grpc_polling_entity_add_to_pollset_set(
        pollent_, request_router_->interested_parties_);
  }
}
||||
|
||||
// Idempotent inverse of MaybeAddCallToInterestedPartiesLocked(): removes
// the call's polling entity from the router's interested_parties.
void RequestRouter::Request::MaybeRemoveCallFromInterestedPartiesLocked() {
  if (pollent_added_to_interested_parties_) {
    pollent_added_to_interested_parties_ = false;
    grpc_polling_entity_del_from_pollset_set(
        pollent_, request_router_->interested_parties_);
  }
}
||||
|
||||
// Starts a pick on the LB policy.
// Takes a "pick_callback" ref on the owning call stack for the duration of
// the pick. If the pick completes synchronously, the result is delivered
// (and the ref dropped) before returning; otherwise LbPickDoneLocked will
// run later and an AsyncPickCanceller is installed to handle cancellation.
void RequestRouter::Request::StartLbPickLocked() {
  if (request_router_->tracer_->enabled()) {
    gpr_log(GPR_INFO,
            "request_router=%p request=%p: starting pick on lb_policy=%p",
            request_router_, this, request_router_->lb_policy_.get());
  }
  GRPC_CLOSURE_INIT(&on_pick_done_, &LbPickDoneLocked, this,
                    grpc_combiner_scheduler(request_router_->combiner_));
  pick_.on_complete = &on_pick_done_;
  GRPC_CALL_STACK_REF(owning_call_, "pick_callback");
  grpc_error* error = GRPC_ERROR_NONE;
  const bool pick_done =
      request_router_->lb_policy_->PickLocked(&pick_, &error);
  if (pick_done) {
    // Pick completed synchronously.
    if (request_router_->tracer_->enabled()) {
      gpr_log(GPR_INFO,
              "request_router=%p request=%p: pick completed synchronously",
              request_router_, this);
    }
    GRPC_CLOSURE_RUN(on_route_done_, error);
    GRPC_CALL_STACK_UNREF(owning_call_, "pick_callback");
  } else {
    // Pick will be returned asynchronously.
    // Add the request's polling entity to the request_router's
    // interested_parties, so that the I/O of the LB policy can be done
    // under it. It will be removed in LbPickDoneLocked().
    MaybeAddCallToInterestedPartiesLocked();
    // Request notification on call cancellation.
    // We allocate a separate object to track cancellation, since the
    // cancellation closure might still be pending when we need to reuse
    // the memory in which this Request object is stored for a subsequent
    // retry attempt.
    pick_canceller_ = New<AsyncPickCanceller>(this);
  }
}
||||
|
||||
// Callback invoked by LoadBalancingPolicy::PickLocked() for async picks.
// Unrefs the LB policy and invokes on_route_done_.
// Also removes the call's pollent from interested_parties (added in
// StartLbPickLocked) and tells the AsyncPickCanceller, if any, that the
// pick finished so it won't try to cancel it.
void RequestRouter::Request::LbPickDoneLocked(void* arg, grpc_error* error) {
  Request* self = static_cast<Request*>(arg);
  RequestRouter* request_router = self->request_router_;
  if (request_router->tracer_->enabled()) {
    gpr_log(GPR_INFO,
            "request_router=%p request=%p: pick completed asynchronously",
            request_router, self);
  }
  self->MaybeRemoveCallFromInterestedPartiesLocked();
  if (self->pick_canceller_ != nullptr) {
    self->pick_canceller_->MarkFinishedLocked();
  }
  GRPC_CLOSURE_RUN(self->on_route_done_, GRPC_ERROR_REF(error));
  GRPC_CALL_STACK_UNREF(self->owning_call_, "pick_callback");
}
||||
|
||||
//
// RequestRouter::LbConnectivityWatcher
//

// Watches the connectivity state of one LB policy and propagates each
// change into the RequestRouter's connectivity state tracker. An instance
// deletes itself when it observes that its policy is no longer the
// router's current policy, or when the policy reports SHUTDOWN; it is
// therefore created with New<>() and never deleted by the router.
class RequestRouter::LbConnectivityWatcher {
 public:
  LbConnectivityWatcher(RequestRouter* request_router,
                        grpc_connectivity_state state,
                        LoadBalancingPolicy* lb_policy,
                        grpc_channel_stack* owning_stack,
                        grpc_combiner* combiner)
      : request_router_(request_router),
        state_(state),
        lb_policy_(lb_policy),
        owning_stack_(owning_stack) {
    // Hold the channel stack alive for as long as the watch is pending.
    GRPC_CHANNEL_STACK_REF(owning_stack_, "LbConnectivityWatcher");
    GRPC_CLOSURE_INIT(&on_changed_, &OnLbPolicyStateChangedLocked, this,
                      grpc_combiner_scheduler(combiner));
    lb_policy_->NotifyOnStateChangeLocked(&state_, &on_changed_);
  }

  ~LbConnectivityWatcher() {
    GRPC_CHANNEL_STACK_UNREF(owning_stack_, "LbConnectivityWatcher");
  }

 private:
  // Combiner-scheduled callback run whenever the watched policy's
  // connectivity state changes.
  static void OnLbPolicyStateChangedLocked(void* arg, grpc_error* error) {
    LbConnectivityWatcher* self = static_cast<LbConnectivityWatcher*>(arg);
    // If the notification is not for the current policy, we're stale,
    // so delete ourselves.
    if (self->lb_policy_ != self->request_router_->lb_policy_.get()) {
      Delete(self);
      return;
    }
    // Otherwise, process notification.
    if (self->request_router_->tracer_->enabled()) {
      gpr_log(GPR_INFO, "request_router=%p: lb_policy=%p state changed to %s",
              self->request_router_, self->lb_policy_,
              grpc_connectivity_state_name(self->state_));
    }
    self->request_router_->SetConnectivityStateLocked(
        self->state_, GRPC_ERROR_REF(error), "lb_changed");
    // If shutting down, terminate watch.
    if (self->state_ == GRPC_CHANNEL_SHUTDOWN) {
      Delete(self);
      return;
    }
    // Renew watch.
    self->lb_policy_->NotifyOnStateChangeLocked(&self->state_,
                                                &self->on_changed_);
  }

  RequestRouter* request_router_;
  // Last state reported by the LB policy; also serves as the in/out
  // argument for NotifyOnStateChangeLocked().
  grpc_connectivity_state state_;
  // LB policy address. No ref held, so not safe to dereference unless
  // it happens to match request_router->lb_policy_.
  LoadBalancingPolicy* lb_policy_;
  grpc_channel_stack* owning_stack_;
  grpc_closure on_changed_;
};
||||
|
||||
//
// RequestRouter::ReresolutionRequestHandler
//
|
||||
|
||||
// Forwards re-resolution requests from the LB policy to the resolver and
// re-arms the request closure after each one. An instance deletes itself
// when its LB policy is no longer current, the callback reports an error,
// or the resolver is gone; it is created with New<>() and never deleted
// by the router.
class RequestRouter::ReresolutionRequestHandler {
 public:
  ReresolutionRequestHandler(RequestRouter* request_router,
                             LoadBalancingPolicy* lb_policy,
                             grpc_channel_stack* owning_stack,
                             grpc_combiner* combiner)
      : request_router_(request_router),
        lb_policy_(lb_policy),
        owning_stack_(owning_stack) {
    // Hold the channel stack alive while the closure is registered with
    // the LB policy.
    GRPC_CHANNEL_STACK_REF(owning_stack_, "ReresolutionRequestHandler");
    GRPC_CLOSURE_INIT(&closure_, &OnRequestReresolutionLocked, this,
                      grpc_combiner_scheduler(combiner));
    lb_policy_->SetReresolutionClosureLocked(&closure_);
  }

 private:
  // Combiner-scheduled callback run when the LB policy requests
  // re-resolution.
  static void OnRequestReresolutionLocked(void* arg, grpc_error* error) {
    ReresolutionRequestHandler* self =
        static_cast<ReresolutionRequestHandler*>(arg);
    RequestRouter* request_router = self->request_router_;
    // If this invocation is for a stale LB policy, treat it as an LB shutdown
    // signal.
    if (self->lb_policy_ != request_router->lb_policy_.get() ||
        error != GRPC_ERROR_NONE || request_router->resolver_ == nullptr) {
      // Release the channel-stack ref taken in the constructor.
      GRPC_CHANNEL_STACK_UNREF(request_router->owning_stack_,
                               "ReresolutionRequestHandler");
      Delete(self);
      return;
    }
    if (request_router->tracer_->enabled()) {
      gpr_log(GPR_INFO, "request_router=%p: started name re-resolving",
              request_router);
    }
    request_router->resolver_->RequestReresolutionLocked();
    // Give back the closure to the LB policy.
    self->lb_policy_->SetReresolutionClosureLocked(&self->closure_);
  }

  RequestRouter* request_router_;
  // LB policy address. No ref held, so not safe to dereference unless
  // it happens to match request_router->lb_policy_.
  LoadBalancingPolicy* lb_policy_;
  grpc_channel_stack* owning_stack_;
  grpc_closure closure_;
};
||||
|
||||
//
|
||||
// RequestRouter
|
||||
//
|
||||
|
||||
// Constructs the router: creates the resolver for |target_uri| immediately
// (resolution itself starts lazily via StartResolvingLocked()) and
// initializes the connectivity state tracker to IDLE. On resolver-creation
// failure, *error is set.
RequestRouter::RequestRouter(
    grpc_channel_stack* owning_stack, grpc_combiner* combiner,
    grpc_client_channel_factory* client_channel_factory,
    grpc_pollset_set* interested_parties, TraceFlag* tracer,
    ProcessResolverResultCallback process_resolver_result,
    void* process_resolver_result_user_data, const char* target_uri,
    const grpc_channel_args* args, grpc_error** error)
    : owning_stack_(owning_stack),
      combiner_(combiner),
      client_channel_factory_(client_channel_factory),
      interested_parties_(interested_parties),
      tracer_(tracer),
      process_resolver_result_(process_resolver_result),
      process_resolver_result_user_data_(process_resolver_result_user_data) {
  GRPC_CLOSURE_INIT(&on_resolver_result_changed_,
                    &RequestRouter::OnResolverResultChangedLocked, this,
                    grpc_combiner_scheduler(combiner));
  grpc_connectivity_state_init(&state_tracker_, GRPC_CHANNEL_IDLE,
                               "request_router");
  grpc_channel_args* new_args = nullptr;
  // With no callback to consume resolver results, service config
  // resolution is explicitly disabled via a channel arg.
  if (process_resolver_result == nullptr) {
    grpc_arg arg = grpc_channel_arg_integer_create(
        const_cast<char*>(GRPC_ARG_SERVICE_CONFIG_DISABLE_RESOLUTION), 0);
    new_args = grpc_channel_args_copy_and_add(args, &arg, 1);
  }
  resolver_ = ResolverRegistry::CreateResolver(
      target_uri, (new_args == nullptr ? args : new_args), interested_parties_,
      combiner_);
  // new_args was copied by the resolver if needed; safe to destroy here
  // (destroying nullptr is a no-op).
  grpc_channel_args_destroy(new_args);
  if (resolver_ == nullptr) {
    *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("resolver creation failed");
  }
}
||||
|
||||
// Destructor: tears down resolver, LB policy, client channel factory ref,
// and the connectivity state tracker, in that order.
RequestRouter::~RequestRouter() {
  if (resolver_ != nullptr) {
    // The only way we can get here is if we never started resolving,
    // because we take a ref to the channel stack when we start
    // resolving and do not release it until the resolver callback is
    // invoked after the resolver shuts down.
    resolver_.reset();
  }
  if (lb_policy_ != nullptr) {
    // Detach the LB policy's pollset_set before dropping the policy.
    grpc_pollset_set_del_pollset_set(lb_policy_->interested_parties(),
                                     interested_parties_);
    lb_policy_.reset();
  }
  if (client_channel_factory_ != nullptr) {
    grpc_client_channel_factory_unref(client_channel_factory_);
  }
  grpc_connectivity_state_destroy(&state_tracker_);
}
||||
|
||||
namespace {

// Maps a connectivity state to a human-readable channelz trace message.
// Returns a string literal (static storage duration); the caller wraps the
// result in grpc_slice_from_static_string(), so the returned pointer must
// never point at heap or stack memory.
const char* GetChannelConnectivityStateChangeString(
    grpc_connectivity_state state) {
  switch (state) {
    case GRPC_CHANNEL_IDLE:
      return "Channel state change to IDLE";
    case GRPC_CHANNEL_CONNECTING:
      return "Channel state change to CONNECTING";
    case GRPC_CHANNEL_READY:
      return "Channel state change to READY";
    case GRPC_CHANNEL_TRANSIENT_FAILURE:
      return "Channel state change to TRANSIENT_FAILURE";
    case GRPC_CHANNEL_SHUTDOWN:
      return "Channel state change to SHUTDOWN";
  }
  GPR_UNREACHABLE_CODE(return "UNKNOWN");
}

}  // namespace
||||
|
||||
// Updates the channel's connectivity state tracker and, on failure states,
// cancels the corresponding pending picks on the LB policy:
// TRANSIENT_FAILURE cancels only picks with wait_for_ready=false, while
// SHUTDOWN cancels all picks. Also records a channelz trace event when
// channelz is enabled. Ownership of |error| passes to the state tracker.
void RequestRouter::SetConnectivityStateLocked(grpc_connectivity_state state,
                                               grpc_error* error,
                                               const char* reason) {
  if (lb_policy_ != nullptr) {
    if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
      // Cancel picks with wait_for_ready=false.
      lb_policy_->CancelMatchingPicksLocked(
          /* mask= */ GRPC_INITIAL_METADATA_WAIT_FOR_READY,
          /* check= */ 0, GRPC_ERROR_REF(error));
    } else if (state == GRPC_CHANNEL_SHUTDOWN) {
      // Cancel all picks.
      lb_policy_->CancelMatchingPicksLocked(/* mask= */ 0, /* check= */ 0,
                                            GRPC_ERROR_REF(error));
    }
  }
  if (tracer_->enabled()) {
    gpr_log(GPR_INFO, "request_router=%p: setting connectivity state to %s",
            this, grpc_connectivity_state_name(state));
  }
  if (channelz_node_ != nullptr) {
    channelz_node_->AddTraceEvent(
        channelz::ChannelTrace::Severity::Info,
        grpc_slice_from_static_string(
            GetChannelConnectivityStateChangeString(state)));
  }
  grpc_connectivity_state_set(&state_tracker_, state, error, reason);
}
||||
|
||||
// Kicks off name resolution. Must be called at most once (asserted via
// started_resolving_); takes a "resolver" channel-stack ref that is
// released in OnResolverShutdownLocked().
void RequestRouter::StartResolvingLocked() {
  if (tracer_->enabled()) {
    gpr_log(GPR_INFO, "request_router=%p: starting name resolution", this);
  }
  GPR_ASSERT(!started_resolving_);
  started_resolving_ = true;
  GRPC_CHANNEL_STACK_REF(owning_stack_, "resolver");
  resolver_->NextLocked(&resolver_result_, &on_resolver_result_changed_);
}
||||
|
||||
// Invoked from the resolver NextLocked() callback when the resolver
// is shutting down.
// Takes ownership of |error|; drops the LB policy, fails any closures
// still waiting for a resolver result, and releases the "resolver"
// channel-stack ref taken in StartResolvingLocked().
void RequestRouter::OnResolverShutdownLocked(grpc_error* error) {
  if (tracer_->enabled()) {
    gpr_log(GPR_INFO, "request_router=%p: shutting down", this);
  }
  if (lb_policy_ != nullptr) {
    if (tracer_->enabled()) {
      gpr_log(GPR_INFO, "request_router=%p: shutting down lb_policy=%p", this,
              lb_policy_.get());
    }
    grpc_pollset_set_del_pollset_set(lb_policy_->interested_parties(),
                                     interested_parties_);
    lb_policy_.reset();
  }
  if (resolver_ != nullptr) {
    // This should never happen; it can only be triggered by a resolver
    // implementation spontaneously deciding to report shutdown without
    // being orphaned.  This code is included just to be defensive.
    if (tracer_->enabled()) {
      gpr_log(GPR_INFO,
              "request_router=%p: spontaneous shutdown from resolver %p", this,
              resolver_.get());
    }
    resolver_.reset();
    SetConnectivityStateLocked(GRPC_CHANNEL_SHUTDOWN,
                               GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                                   "Resolver spontaneous shutdown", &error, 1),
                               "resolver_spontaneous_shutdown");
  }
  grpc_closure_list_fail_all(&waiting_for_resolver_result_closures_,
                             GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                                 "Channel disconnected", &error, 1));
  GRPC_CLOSURE_LIST_SCHED(&waiting_for_resolver_result_closures_);
  GRPC_CHANNEL_STACK_UNREF(owning_stack_, "resolver");
  // Destroying a null resolver_result_ is a no-op.
  grpc_channel_args_destroy(resolver_result_);
  resolver_result_ = nullptr;
  GRPC_ERROR_UNREF(error);
}
||||
|
||||
// Creates a new LB policy, replacing any previous one.
// If the new policy is created successfully, sets *connectivity_state and
// *connectivity_error to its initial connectivity state; otherwise,
// leaves them unchanged.
// Pending picks on a replaced policy are handed off to the new policy, and
// channelz trace strings describing the change are appended to
// *trace_strings (consumed later by ConcatenateAndAddChannelTraceLocked()).
void RequestRouter::CreateNewLbPolicyLocked(
    const char* lb_policy_name, grpc_json* lb_config,
    grpc_connectivity_state* connectivity_state,
    grpc_error** connectivity_error, TraceStringVector* trace_strings) {
  LoadBalancingPolicy::Args lb_policy_args;
  lb_policy_args.combiner = combiner_;
  lb_policy_args.client_channel_factory = client_channel_factory_;
  lb_policy_args.args = resolver_result_;
  lb_policy_args.lb_config = lb_config;
  OrphanablePtr<LoadBalancingPolicy> new_lb_policy =
      LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(lb_policy_name,
                                                             lb_policy_args);
  if (GPR_UNLIKELY(new_lb_policy == nullptr)) {
    gpr_log(GPR_ERROR, "could not create LB policy \"%s\"", lb_policy_name);
    if (channelz_node_ != nullptr) {
      char* str;
      gpr_asprintf(&str, "Could not create LB policy \'%s\'", lb_policy_name);
      trace_strings->push_back(str);
    }
  } else {
    if (tracer_->enabled()) {
      gpr_log(GPR_INFO, "request_router=%p: created new LB policy \"%s\" (%p)",
              this, lb_policy_name, new_lb_policy.get());
    }
    if (channelz_node_ != nullptr) {
      char* str;
      gpr_asprintf(&str, "Created new LB policy \'%s\'", lb_policy_name);
      trace_strings->push_back(str);
    }
    // Swap out the LB policy and update the fds in interested_parties_.
    if (lb_policy_ != nullptr) {
      if (tracer_->enabled()) {
        gpr_log(GPR_INFO, "request_router=%p: shutting down lb_policy=%p", this,
                lb_policy_.get());
      }
      grpc_pollset_set_del_pollset_set(lb_policy_->interested_parties(),
                                       interested_parties_);
      // Migrate picks still pending on the old policy before it is
      // destroyed by the assignment below.
      lb_policy_->HandOffPendingPicksLocked(new_lb_policy.get());
    }
    lb_policy_ = std::move(new_lb_policy);
    grpc_pollset_set_add_pollset_set(lb_policy_->interested_parties(),
                                     interested_parties_);
    // Create re-resolution request handler for the new LB policy.  It
    // will delete itself when no longer needed.
    New<ReresolutionRequestHandler>(this, lb_policy_.get(), owning_stack_,
                                    combiner_);
    // Get the new LB policy's initial connectivity state and start a
    // connectivity watch.
    GRPC_ERROR_UNREF(*connectivity_error);
    *connectivity_state =
        lb_policy_->CheckConnectivityLocked(connectivity_error);
    if (exit_idle_when_lb_policy_arrives_) {
      // Honor an ExitIdleLocked() call that arrived before any policy
      // existed.
      lb_policy_->ExitIdleLocked();
      exit_idle_when_lb_policy_arrives_ = false;
    }
    // Create new watcher.  It will delete itself when done.
    New<LbConnectivityWatcher>(this, *connectivity_state, lb_policy_.get(),
                               owning_stack_, combiner_);
  }
}
||||
|
||||
// Appends a channelz trace message when the resolver result transitions
// between an empty and a non-empty address list (in either direction).
// Updates previous_resolution_contained_addresses_ for the next result.
void RequestRouter::MaybeAddTraceMessagesForAddressChangesLocked(
    TraceStringVector* trace_strings) {
  const ServerAddressList* addresses =
      FindServerAddressListChannelArg(resolver_result_);
  const bool has_addresses = addresses != nullptr && addresses->size() > 0;
  // Only a transition (empty <-> non-empty) is worth tracing.
  if (has_addresses != previous_resolution_contained_addresses_) {
    trace_strings->push_back(gpr_strdup(has_addresses
                                            ? "Address list became non-empty"
                                            : "Address list became empty"));
  }
  previous_resolution_contained_addresses_ = has_addresses;
}
||||
|
||||
// Concatenates the accumulated trace strings into a single channelz trace
// event ("Resolution event: a, b, c"). Ownership of each string in
// *trace_strings transfers to the strvec, which frees them on destroy.
// No-op when there is nothing to trace.
// NOTE(review): assumes channelz_node_ != nullptr when trace_strings is
// non-empty, as at the existing call site — confirm for new callers.
void RequestRouter::ConcatenateAndAddChannelTraceLocked(
    TraceStringVector* trace_strings) const {
  if (!trace_strings->empty()) {
    gpr_strvec v;
    gpr_strvec_init(&v);
    gpr_strvec_add(&v, gpr_strdup("Resolution event: "));
    for (size_t i = 0; i < trace_strings->size(); ++i) {
      // Separate entries with ", " (no separator before the first entry).
      // Using the loop index avoids the redundant bool flag the original
      // initialized with the int literal 1.
      if (i > 0) gpr_strvec_add(&v, gpr_strdup(", "));
      gpr_strvec_add(&v, (*trace_strings)[i]);
    }
    char* flat;
    size_t flat_len = 0;
    flat = gpr_strvec_flatten(&v, &flat_len);
    // grpc_slice_new() takes ownership of |flat| and frees it via gpr_free.
    // Bare channelz:: qualification matches the rest of this file (we are
    // already inside namespace grpc_core).
    channelz_node_->AddTraceEvent(
        channelz::ChannelTrace::Severity::Info,
        grpc_slice_new(flat, flat_len, gpr_free));
    gpr_strvec_destroy(&v);
  }
}
||||
|
||||
// Callback invoked when a resolver result is available.
// Handles three cases: resolver shutdown, transient resolution failure
// (resolver_result_ == nullptr), and a new result — which either updates
// the existing LB policy in place or creates a new one. Except on
// shutdown, always renews the resolver watch at the end.
void RequestRouter::OnResolverResultChangedLocked(void* arg,
                                                  grpc_error* error) {
  RequestRouter* self = static_cast<RequestRouter*>(arg);
  if (self->tracer_->enabled()) {
    const char* disposition =
        self->resolver_result_ != nullptr
            ? ""
            : (error == GRPC_ERROR_NONE ? " (transient error)"
                                        : " (resolver shutdown)");
    gpr_log(GPR_INFO,
            "request_router=%p: got resolver result: resolver_result=%p "
            "error=%s%s",
            self, self->resolver_result_, grpc_error_string(error),
            disposition);
  }
  // Handle shutdown.
  if (error != GRPC_ERROR_NONE || self->resolver_ == nullptr) {
    self->OnResolverShutdownLocked(GRPC_ERROR_REF(error));
    return;
  }
  // Data used to set the channel's connectivity state.
  bool set_connectivity_state = true;
  // We only want to trace the address resolution in the following cases:
  // (a) Address resolution resulted in service config change.
  // (b) Address resolution that causes number of backends to go from
  //     zero to non-zero.
  // (c) Address resolution that causes number of backends to go from
  //     non-zero to zero.
  // (d) Address resolution that causes a new LB policy to be created.
  //
  // We track a list of strings to eventually be concatenated and traced.
  TraceStringVector trace_strings;
  grpc_connectivity_state connectivity_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
  grpc_error* connectivity_error =
      GRPC_ERROR_CREATE_FROM_STATIC_STRING("No load balancing policy");
  // resolver_result_ will be null in the case of a transient
  // resolution error.  In that case, we don't have any new result to
  // process, which means that we keep using the previous result (if any).
  if (self->resolver_result_ == nullptr) {
    if (self->tracer_->enabled()) {
      gpr_log(GPR_INFO, "request_router=%p: resolver transient failure", self);
    }
    // Don't override connectivity state if we already have an LB policy.
    if (self->lb_policy_ != nullptr) set_connectivity_state = false;
  } else {
    // Parse the resolver result.
    const char* lb_policy_name = nullptr;
    grpc_json* lb_policy_config = nullptr;
    const bool service_config_changed = self->process_resolver_result_(
        self->process_resolver_result_user_data_, *self->resolver_result_,
        &lb_policy_name, &lb_policy_config);
    GPR_ASSERT(lb_policy_name != nullptr);
    // Check to see if we're already using the right LB policy.
    const bool lb_policy_name_changed =
        self->lb_policy_ == nullptr ||
        strcmp(self->lb_policy_->name(), lb_policy_name) != 0;
    if (self->lb_policy_ != nullptr && !lb_policy_name_changed) {
      // Continue using the same LB policy.  Update with new addresses.
      if (self->tracer_->enabled()) {
        gpr_log(GPR_INFO,
                "request_router=%p: updating existing LB policy \"%s\" (%p)",
                self, lb_policy_name, self->lb_policy_.get());
      }
      self->lb_policy_->UpdateLocked(*self->resolver_result_, lb_policy_config);
      // No need to set the channel's connectivity state; the existing
      // watch on the LB policy will take care of that.
      set_connectivity_state = false;
    } else {
      // Instantiate new LB policy.
      self->CreateNewLbPolicyLocked(lb_policy_name, lb_policy_config,
                                    &connectivity_state, &connectivity_error,
                                    &trace_strings);
    }
    // Add channel trace event.
    if (self->channelz_node_ != nullptr) {
      if (service_config_changed) {
        // TODO(ncteisen): might be worth somehow including a snippet of the
        // config in the trace, at the risk of bloating the trace logs.
        trace_strings.push_back(gpr_strdup("Service config changed"));
      }
      self->MaybeAddTraceMessagesForAddressChangesLocked(&trace_strings);
      self->ConcatenateAndAddChannelTraceLocked(&trace_strings);
    }
    // Clean up.
    grpc_channel_args_destroy(self->resolver_result_);
    self->resolver_result_ = nullptr;
  }
  // Set the channel's connectivity state if needed.
  if (set_connectivity_state) {
    self->SetConnectivityStateLocked(connectivity_state, connectivity_error,
                                     "resolver_result");
  } else {
    GRPC_ERROR_UNREF(connectivity_error);
  }
  // Invoke closures that were waiting for results and renew the watch.
  GRPC_CLOSURE_LIST_SCHED(&self->waiting_for_resolver_result_closures_);
  self->resolver_->NextLocked(&self->resolver_result_,
                              &self->on_resolver_result_changed_);
}
||||
|
||||
// Routes |request|: if an LB policy already exists, starts an LB pick
// immediately; if the channel is disconnected (no resolver), fails the
// request; otherwise waits for the first resolver result, starting
// resolution if it hasn't started yet.
void RequestRouter::RouteCallLocked(Request* request) {
  GPR_ASSERT(request->pick_.connected_subchannel == nullptr);
  request->request_router_ = this;
  if (lb_policy_ != nullptr) {
    // We already have resolver results, so process the service config
    // and start an LB pick.
    request->ProcessServiceConfigAndStartLbPickLocked();
  } else if (resolver_ == nullptr) {
    GRPC_CLOSURE_RUN(request->on_route_done_,
                     GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected"));
  } else {
    // We do not yet have an LB policy, so wait for a resolver result.
    if (!started_resolving_) {
      StartResolvingLocked();
    }
    // Create a new waiter, which will delete itself when done.
    New<Request::ResolverResultWaiter>(request);
    // Add the request's polling entity to the request_router's
    // interested_parties, so that the I/O of the resolver can be done
    // under it.  It will be removed in LbPickDoneLocked().
    request->MaybeAddCallToInterestedPartiesLocked();
  }
}
||||
|
||||
// Shuts down the router: reports SHUTDOWN connectivity, orphans the
// resolver, fails closures still waiting for a first resolver result, and
// drops the LB policy. Takes ownership of |error|. No-op (other than
// freeing |error|) if the resolver is already gone.
void RequestRouter::ShutdownLocked(grpc_error* error) {
  if (resolver_ != nullptr) {
    SetConnectivityStateLocked(GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error),
                               "disconnect");
    resolver_.reset();
    if (!started_resolving_) {
      // No resolver callback is pending, so waiting closures must be
      // failed here rather than in OnResolverShutdownLocked().
      grpc_closure_list_fail_all(&waiting_for_resolver_result_closures_,
                                 GRPC_ERROR_REF(error));
      GRPC_CLOSURE_LIST_SCHED(&waiting_for_resolver_result_closures_);
    }
    if (lb_policy_ != nullptr) {
      grpc_pollset_set_del_pollset_set(lb_policy_->interested_parties(),
                                       interested_parties_);
      lb_policy_.reset();
    }
  }
  GRPC_ERROR_UNREF(error);
}
||||
|
||||
// Returns the channel's current connectivity state as recorded by the
// state tracker.
grpc_connectivity_state RequestRouter::GetConnectivityState() {
  return grpc_connectivity_state_check(&state_tracker_);
}
||||
|
||||
// Registers |closure| to be scheduled when the connectivity state differs
// from *state (delegates to the connectivity state tracker).
void RequestRouter::NotifyOnConnectivityStateChange(
    grpc_connectivity_state* state, grpc_closure* closure) {
  grpc_connectivity_state_notify_on_state_change(&state_tracker_, state,
                                                 closure);
}
||||
|
||||
void RequestRouter::ExitIdleLocked() { |
||||
if (lb_policy_ != nullptr) { |
||||
lb_policy_->ExitIdleLocked(); |
||||
} else { |
||||
exit_idle_when_lb_policy_arrives_ = true; |
||||
if (!started_resolving_ && resolver_ != nullptr) { |
||||
StartResolvingLocked(); |
||||
} |
||||
} |
||||
} |
||||
|
||||
// Resets backoff timers in both the resolver and the LB policy, and asks
// the resolver to re-resolve immediately.
void RequestRouter::ResetConnectionBackoffLocked() {
  if (resolver_ != nullptr) {
    resolver_->ResetBackoffLocked();
    resolver_->RequestReresolutionLocked();
  }
  if (lb_policy_ != nullptr) {
    lb_policy_->ResetBackoffLocked();
  }
}
||||
|
||||
} // namespace grpc_core
|
@ -0,0 +1,177 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2018 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_REQUEST_ROUTING_H |
||||
#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_REQUEST_ROUTING_H |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/ext/filters/client_channel/client_channel_channelz.h" |
||||
#include "src/core/ext/filters/client_channel/client_channel_factory.h" |
||||
#include "src/core/ext/filters/client_channel/lb_policy.h" |
||||
#include "src/core/ext/filters/client_channel/resolver.h" |
||||
#include "src/core/lib/channel/channel_args.h" |
||||
#include "src/core/lib/channel/channel_stack.h" |
||||
#include "src/core/lib/debug/trace.h" |
||||
#include "src/core/lib/gprpp/inlined_vector.h" |
||||
#include "src/core/lib/gprpp/orphanable.h" |
||||
#include "src/core/lib/iomgr/call_combiner.h" |
||||
#include "src/core/lib/iomgr/closure.h" |
||||
#include "src/core/lib/iomgr/polling_entity.h" |
||||
#include "src/core/lib/iomgr/pollset_set.h" |
||||
#include "src/core/lib/transport/connectivity_state.h" |
||||
#include "src/core/lib/transport/metadata_batch.h" |
||||
|
||||
namespace grpc_core { |
||||
|
||||
// Handles name resolution and LB policy management for a client channel:
// owns the resolver and the current LB policy, tracks connectivity state,
// and routes individual calls (Requests) to subchannels via LB picks.
// All *Locked() methods must be invoked from within the combiner passed
// to the constructor.
class RequestRouter {
 public:
  // One routing operation for a single call. The caller constructs a
  // Request, passes it to RouteCallLocked(), and is notified via
  // on_route_done when routing completes.
  class Request {
   public:
    // Synchronous callback that applies the service config to a call.
    // Returns false if the call should be failed.
    typedef bool (*ApplyServiceConfigCallback)(void* user_data);

    Request(grpc_call_stack* owning_call, grpc_call_combiner* call_combiner,
            grpc_polling_entity* pollent,
            grpc_metadata_batch* send_initial_metadata,
            uint32_t* send_initial_metadata_flags,
            ApplyServiceConfigCallback apply_service_config,
            void* apply_service_config_user_data, grpc_closure* on_route_done);

    ~Request();

    // TODO(roth): It seems a bit ugly to expose this member in a
    // non-const way.  Find a better API to avoid this.
    LoadBalancingPolicy::PickState* pick() { return &pick_; }

   private:
    friend class RequestRouter;

    class ResolverResultWaiter;
    class AsyncPickCanceller;

    void ProcessServiceConfigAndStartLbPickLocked();
    void StartLbPickLocked();
    // Closure callback for async LB picks.
    static void LbPickDoneLocked(void* arg, grpc_error* error);

    void MaybeAddCallToInterestedPartiesLocked();
    void MaybeRemoveCallFromInterestedPartiesLocked();

    // Populated by caller.
    grpc_call_stack* owning_call_;
    grpc_call_combiner* call_combiner_;
    grpc_polling_entity* pollent_;
    ApplyServiceConfigCallback apply_service_config_;
    void* apply_service_config_user_data_;
    grpc_closure* on_route_done_;
    LoadBalancingPolicy::PickState pick_;

    // Internal state.
    RequestRouter* request_router_ = nullptr;
    // Whether pollent_ is currently added to the router's
    // interested_parties_, so it can be removed exactly once.
    bool pollent_added_to_interested_parties_ = false;
    grpc_closure on_pick_done_;
    // Non-null while an async pick's cancellation tracker is alive.
    AsyncPickCanceller* pick_canceller_ = nullptr;
  };

  // Synchronous callback that takes the service config JSON string and
  // LB policy name.
  // Returns true if the service config has changed since the last result.
  typedef bool (*ProcessResolverResultCallback)(void* user_data,
                                                const grpc_channel_args& args,
                                                const char** lb_policy_name,
                                                grpc_json** lb_policy_config);

  // On resolver-creation failure, *error is set.
  RequestRouter(grpc_channel_stack* owning_stack, grpc_combiner* combiner,
                grpc_client_channel_factory* client_channel_factory,
                grpc_pollset_set* interested_parties, TraceFlag* tracer,
                ProcessResolverResultCallback process_resolver_result,
                void* process_resolver_result_user_data, const char* target_uri,
                const grpc_channel_args* args, grpc_error** error);

  ~RequestRouter();

  void set_channelz_node(channelz::ClientChannelNode* channelz_node) {
    channelz_node_ = channelz_node;
  }

  // Starts routing |request|; its on_route_done closure runs when routing
  // completes.
  void RouteCallLocked(Request* request);

  // TODO(roth): Add methods to cancel picks.

  // Takes ownership of |error|.
  void ShutdownLocked(grpc_error* error);

  void ExitIdleLocked();
  void ResetConnectionBackoffLocked();

  grpc_connectivity_state GetConnectivityState();
  void NotifyOnConnectivityStateChange(grpc_connectivity_state* state,
                                       grpc_closure* closure);

  LoadBalancingPolicy* lb_policy() const { return lb_policy_.get(); }

 private:
  // Accumulates heap-allocated channelz trace fragments.
  using TraceStringVector = grpc_core::InlinedVector<char*, 3>;

  class ReresolutionRequestHandler;
  class LbConnectivityWatcher;

  void StartResolvingLocked();
  void OnResolverShutdownLocked(grpc_error* error);
  void CreateNewLbPolicyLocked(const char* lb_policy_name, grpc_json* lb_config,
                               grpc_connectivity_state* connectivity_state,
                               grpc_error** connectivity_error,
                               TraceStringVector* trace_strings);
  void MaybeAddTraceMessagesForAddressChangesLocked(
      TraceStringVector* trace_strings);
  void ConcatenateAndAddChannelTraceLocked(
      TraceStringVector* trace_strings) const;
  // Closure callback for resolver results.
  static void OnResolverResultChangedLocked(void* arg, grpc_error* error);

  void SetConnectivityStateLocked(grpc_connectivity_state state,
                                  grpc_error* error, const char* reason);

  // Passed in from caller at construction time.
  grpc_channel_stack* owning_stack_;
  grpc_combiner* combiner_;
  grpc_client_channel_factory* client_channel_factory_;
  grpc_pollset_set* interested_parties_;
  TraceFlag* tracer_;

  channelz::ClientChannelNode* channelz_node_ = nullptr;

  // Resolver and associated state.
  OrphanablePtr<Resolver> resolver_;
  ProcessResolverResultCallback process_resolver_result_;
  void* process_resolver_result_user_data_;
  bool started_resolving_ = false;
  // Most recent result returned by the resolver; owned until consumed.
  grpc_channel_args* resolver_result_ = nullptr;
  bool previous_resolution_contained_addresses_ = false;
  // Closures waiting for the first resolver result.
  grpc_closure_list waiting_for_resolver_result_closures_;
  grpc_closure on_resolver_result_changed_;

  // LB policy and associated state.
  OrphanablePtr<LoadBalancingPolicy> lb_policy_;
  // Set when ExitIdleLocked() is called before an LB policy exists.
  bool exit_idle_when_lb_policy_arrives_ = false;

  grpc_connectivity_state_tracker state_tracker_;
};
||||
|
||||
} // namespace grpc_core
|
||||
|
||||
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_REQUEST_ROUTING_H */ |
@ -0,0 +1,97 @@ |
||||
# Copyright 2018 gRPC authors. |
||||
# |
||||
# Licensed under the Apache License, Version 2.0 (the "License"); |
||||
# you may not use this file except in compliance with the License. |
||||
# You may obtain a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, |
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
# See the License for the specific language governing permissions and |
||||
# limitations under the License. |
||||
"""Defines a number of module-scope gRPC scenarios to test server shutdown.""" |
||||
|
||||
import argparse |
||||
import os |
||||
import threading |
||||
import time |
||||
import logging |
||||
|
||||
import grpc |
||||
from tests.unit import test_common |
||||
|
||||
from concurrent import futures |
||||
from six.moves import queue |
||||
|
||||
WAIT_TIME = 1000 |
||||
|
||||
REQUEST = b'request' |
||||
RESPONSE = b'response' |
||||
|
||||
SERVER_RAISES_EXCEPTION = 'server_raises_exception' |
||||
SERVER_DEALLOCATED = 'server_deallocated' |
||||
SERVER_FORK_CAN_EXIT = 'server_fork_can_exit' |
||||
|
||||
FORK_EXIT = '/test/ForkExit' |
||||
|
||||
|
||||
def fork_and_exit(request, servicer_context):
    """RPC handler that forks; the child exits immediately, the parent replies."""
    child_pid = os.fork()
    if child_pid == 0:
        # In the child: terminate without running cleanup handlers.
        os._exit(0)
    return RESPONSE
|
||||
|
||||
class GenericHandler(grpc.GenericRpcHandler):
    """Routes only the fork-exit test method; all other methods are unhandled."""

    def service(self, handler_call_details):
        if handler_call_details.method != FORK_EXIT:
            return None
        return grpc.unary_unary_rpc_method_handler(fork_and_exit)
||||
|
||||
|
||||
def run_server(port_queue):
    """Starts a test server on an ephemeral port, then blocks this thread.

    The chosen port is published through port_queue so the main thread can
    connect to it.
    """
    server = test_common.test_server()
    port = server.add_insecure_port('[::]:0')
    port_queue.put(port)
    server.add_generic_rpc_handlers((GenericHandler(),))
    server.start()
    # threading.Event.wait() does not exhibit the bug identified in
    # https://github.com/grpc/grpc/issues/17093, sleep instead
    time.sleep(WAIT_TIME)
||||
|
||||
|
||||
def run_test(args):
    """Executes one server-shutdown scenario selected by args.scenario.

    Each scenario runs in its own subprocess (spawned by the companion test
    module); a scenario passes if this process terminates instead of hanging.

    Raises:
        ValueError: if args.scenario is not a known scenario name.
    """
    if args.scenario == SERVER_RAISES_EXCEPTION:
        server = test_common.test_server()
        server.start()
        # An unhandled exception must not leave the interpreter hanging on
        # non-daemon server threads.
        raise Exception()
    elif args.scenario == SERVER_DEALLOCATED:
        server = test_common.test_server()
        server.start()
        # Invoke __del__ directly to simulate the server object being
        # garbage collected while still serving.
        server.__del__()
        # Busy-wait until shutdown completes.
        # NOTE(review): pokes grpc._server private state — confirm these
        # names still exist when upgrading grpc.
        while server._state.stage != grpc._server._ServerStage.STOPPED:
            pass
    elif args.scenario == SERVER_FORK_CAN_EXIT:
        port_queue = queue.Queue()
        thread = threading.Thread(target=run_server, args=(port_queue,))
        thread.daemon = True
        thread.start()
        # Blocks until run_server publishes the bound port.
        port = port_queue.get()
        channel = grpc.insecure_channel('localhost:%d' % port)
        multi_callable = channel.unary_unary(FORK_EXIT)
        # The handler forks; the child calls os._exit(0) and must be
        # reapable here without the parent deadlocking.
        result, call = multi_callable.with_call(REQUEST, wait_for_ready=True)
        os.wait()
    else:
        raise ValueError('unknown test scenario')
||||
|
||||
|
||||
if __name__ == '__main__':
    # Entry point: a single positional argument selects the scenario to run.
    logging.basicConfig()
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument('scenario', type=str)
    run_test(arg_parser.parse_args())
@ -0,0 +1,90 @@ |
||||
# Copyright 2018 gRPC authors. |
||||
# |
||||
# Licensed under the Apache License, Version 2.0 (the "License"); |
||||
# you may not use this file except in compliance with the License. |
||||
# You may obtain a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, |
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
# See the License for the specific language governing permissions and |
||||
# limitations under the License. |
||||
"""Tests clean shutdown of server on various interpreter exit conditions. |
||||
|
||||
The tests in this module spawn a subprocess for each test case, the |
||||
test is considered successful if it doesn't hang/timeout. |
||||
""" |
||||
|
||||
import atexit |
||||
import os |
||||
import subprocess |
||||
import sys |
||||
import threading |
||||
import unittest |
||||
import logging |
||||
|
||||
from tests.unit import _server_shutdown_scenarios |
||||
|
||||
# Absolute path to the scenario script executed in a subprocess per test.
SCENARIO_FILE = os.path.abspath(
    os.path.join(
        os.path.dirname(os.path.realpath(__file__)),
        '_server_shutdown_scenarios.py'))
# Reuse the interpreter running this test to launch the subprocess.
INTERPRETER = sys.executable
BASE_COMMAND = [INTERPRETER, SCENARIO_FILE]

# All spawned subprocesses; guarded by process_lock so the atexit cleanup
# handler and test code do not race on the list.
processes = []
process_lock = threading.Lock()
||||
|
||||
|
||||
# Best-effort kill of any subprocesses still running when the interpreter
# exits, so failed tests do not leave orphans behind.
def cleanup_processes():
    with process_lock:
        for leftover in processes:
            try:
                leftover.kill()
            except Exception:  # pylint: disable=broad-except
                # Process may already have exited; ignore kill failures.
                pass
||||
|
||||
|
||||
atexit.register(cleanup_processes) |
||||
|
||||
|
||||
def wait(process):
    # Register the subprocess for atexit cleanup, then block until it
    # finishes. A hung subprocess hangs the test, which is exactly the
    # failure mode these tests are designed to detect.
    with process_lock:
        processes.append(process)
    process.wait()
||||
|
||||
|
||||
class ServerShutdown(unittest.TestCase):
    """Each test launches one scenario subprocess; success == it terminates."""

    def _run_scenario(self, scenario):
        # Spawn the scenario script, inheriting our stdout/stderr, and block
        # until it exits (registered for atexit cleanup via wait()).
        child = subprocess.Popen(
            BASE_COMMAND + [scenario],
            stdout=sys.stdout,
            stderr=sys.stderr)
        wait(child)

    # Currently we shut down a server (if possible) after the Python server
    # instance is garbage collected. This behavior may change in the future.
    def test_deallocated_server_stops(self):
        self._run_scenario(_server_shutdown_scenarios.SERVER_DEALLOCATED)

    def test_server_exception_exits(self):
        self._run_scenario(_server_shutdown_scenarios.SERVER_RAISES_EXCEPTION)

    @unittest.skipIf(os.name == 'nt', 'fork not supported on windows')
    def test_server_fork_can_exit(self):
        self._run_scenario(_server_shutdown_scenarios.SERVER_FORK_CAN_EXIT)
||||
|
||||
|
||||
if __name__ == '__main__':
    # verbosity=2 prints each test name as it runs, which makes a hanging
    # scenario identifiable from CI logs.
    logging.basicConfig()
    unittest.main(verbosity=2)
@ -0,0 +1,61 @@ |
||||
#!/usr/bin/env ruby |
||||
|
||||
# Copyright 2015 gRPC authors. |
||||
# |
||||
# Licensed under the Apache License, Version 2.0 (the "License"); |
||||
# you may not use this file except in compliance with the License. |
||||
# You may obtain a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, |
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
# See the License for the specific language governing permissions and |
||||
# limitations under the License. |
||||
|
||||
require_relative './end2end_common' |
||||
|
||||
# Control service for the test client: forwards echo RPCs through the
# wrapped stub while the hosting process also handles termination signals.
class SigHandlingClientController < ClientControl::ClientController::Service
  def initialize(stub)
    @stub = stub
  end

  # Echoes req.request through the stub and verifies the round trip.
  def do_echo_rpc(req, _)
    reply = @stub.echo(Echo::EchoRequest.new(request: req.request))
    raise 'bad response' if reply.response != req.request
    ClientControl::Void.new
  end
end
||||
|
||||
# Parses the two port flags, stands up the control server, connects the echo
# stub back to the parent process, then serves until interrupted by SIGINT.
def main
  client_control_port = ''
  server_port = ''
  OptionParser.new do |opts|
    opts.on('--client_control_port=P', String) do |p|
      client_control_port = p
    end
    opts.on('--server_port=P', String) do |p|
      server_port = p
    end
  end.parse!

  # Control server on which the parent drives this client.
  srv = new_rpc_server_for_testing
  srv.add_http2_port("0.0.0.0:#{client_control_port}",
                     :this_port_is_insecure)
  stub = Echo::EchoServer::Stub.new("localhost:#{server_port}",
                                    :this_channel_is_insecure)
  control_service = SigHandlingClientController.new(stub)
  srv.handle(control_service)
  server_thread = Thread.new do
    # Serve until the process receives SIGINT ('int').
    srv.run_till_terminated_or_interrupted(['int'])
  end
  srv.wait_till_running
  # send a first RPC to notify the parent process that we've started
  stub.echo(Echo::EchoRequest.new(request: 'client/child started'))
  server_thread.join
end

main
@ -0,0 +1,83 @@ |
||||
#!/usr/bin/env ruby |
||||
|
||||
# Copyright 2016 gRPC authors. |
||||
# |
||||
# Licensed under the Apache License, Version 2.0 (the "License"); |
||||
# you may not use this file except in compliance with the License. |
||||
# You may obtain a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, |
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
# See the License for the specific language governing permissions and |
||||
# limitations under the License. |
||||
|
||||
# smoke test for a grpc-using app that receives and |
||||
# handles process-ending signals |
||||
|
||||
require_relative './end2end_common' |
||||
|
||||
# Echo service that invokes its received_rpc_callback exactly once, on the
# first RPC it receives. The driver uses this to wait until the child
# process has started.
class ClientStartedService < Echo::EchoServer::Service
  def initialize(received_rpc_callback)
    @received_rpc_callback = received_rpc_callback
  end

  def echo(echo_req, _)
    # Fire the callback only on the first RPC, then drop it.
    @received_rpc_callback&.call
    @received_rpc_callback = nil
    Echo::EchoReply.new(response: echo_req.request)
  end
end
||||
|
||||
# Drives the graceful-shutdown test: starts a server, spawns the client
# subprocess, waits for its first RPC, exchanges an echo via the control
# stub, then sends SIGINT and verifies the client exits with status 0.
def main
  STDERR.puts 'start server'
  started = false
  started_mu = Mutex.new
  started_cv = ConditionVariable.new
  notify_started = proc do
    started_mu.synchronize do
      started = true
      started_cv.signal
    end
  end

  service = ClientStartedService.new(notify_started)
  server_runner = ServerRunner.new(service)
  server_port = server_runner.run
  STDERR.puts 'start client'
  control_stub, client_pid = start_client('graceful_sig_handling_client.rb',
                                          server_port)

  # Block until the child has announced itself with its first RPC.
  started_mu.synchronize do
    started_cv.wait(started_mu) until started
  end

  control_stub.do_echo_rpc(
    ClientControl::DoEchoRpcRequest.new(request: 'hello'))

  STDERR.puts 'killing client'
  Process.kill('SIGINT', client_pid)
  Process.wait(client_pid)
  status = $CHILD_STATUS

  # Graceful shutdown means a normal exit with status 0; anything else
  # (non-zero status, or death by signal) fails the test.
  unless status.exited? && status.exitstatus.zero?
    STDERR.puts 'Client did not close gracefully'
    exit(1)
  end

  STDERR.puts 'Client ended gracefully'

  # no need to call cleanup, client should already be dead
  server_runner.stop
end

main
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in new issue