mirror of https://github.com/grpc/grpc.git
commit
aeaca27a6d
82 changed files with 3329 additions and 2755 deletions
@ -0,0 +1,38 @@ |
||||
# Copyright 2024 the gRPC authors. |
||||
# |
||||
# Licensed under the Apache License, Version 2.0 (the "License"); |
||||
# you may not use this file except in compliance with the License. |
||||
# You may obtain a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, |
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
# See the License for the specific language governing permissions and |
||||
# limitations under the License. |
||||
|
||||
licenses(["notice"]) |
||||
|
||||
cc_binary( |
||||
name = "client", |
||||
srcs = ["client.cc"], |
||||
defines = ["BAZEL_BUILD"], |
||||
deps = [ |
||||
"//:grpc++", |
||||
"//examples/protos:helloworld_cc_grpc", |
||||
"@com_google_absl//absl/strings:string_view", |
||||
], |
||||
) |
||||
|
||||
cc_binary( |
||||
name = "server", |
||||
srcs = ["server.cc"], |
||||
defines = ["BAZEL_BUILD"], |
||||
deps = [ |
||||
"//:grpc++", |
||||
"//:grpc++_reflection", |
||||
"//examples/protos:helloworld_cc_grpc", |
||||
"@com_google_absl//absl/strings:str_format", |
||||
], |
||||
) |
@ -0,0 +1,73 @@ |
||||
# Copyright 2024 gRPC authors. |
||||
# |
||||
# Licensed under the Apache License, Version 2.0 (the "License"); |
||||
# you may not use this file except in compliance with the License. |
||||
# You may obtain a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, |
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
# See the License for the specific language governing permissions and |
||||
# limitations under the License. |
||||
# |
||||
# cmake build file for C++ retry example. |
||||
# Assumes protobuf and gRPC have been installed using cmake. |
||||
# See cmake_externalproject/CMakeLists.txt for all-in-one cmake build |
||||
# that automatically builds all the dependencies before building retry. |
||||
|
||||
cmake_minimum_required(VERSION 3.8) |
||||
|
||||
project(Retry C CXX) |
||||
|
||||
include(../cmake/common.cmake) |
||||
|
||||
# Proto file |
||||
get_filename_component(hw_proto "../../protos/helloworld.proto" ABSOLUTE) |
||||
get_filename_component(hw_proto_path "${hw_proto}" PATH) |
||||
|
||||
# Generated sources |
||||
set(hw_proto_srcs "${CMAKE_CURRENT_BINARY_DIR}/helloworld.pb.cc") |
||||
set(hw_proto_hdrs "${CMAKE_CURRENT_BINARY_DIR}/helloworld.pb.h") |
||||
set(hw_grpc_srcs "${CMAKE_CURRENT_BINARY_DIR}/helloworld.grpc.pb.cc") |
||||
set(hw_grpc_hdrs "${CMAKE_CURRENT_BINARY_DIR}/helloworld.grpc.pb.h") |
||||
add_custom_command( |
||||
OUTPUT "${hw_proto_srcs}" "${hw_proto_hdrs}" "${hw_grpc_srcs}" "${hw_grpc_hdrs}" |
||||
COMMAND ${_PROTOBUF_PROTOC} |
||||
ARGS --grpc_out "${CMAKE_CURRENT_BINARY_DIR}" |
||||
--cpp_out "${CMAKE_CURRENT_BINARY_DIR}" |
||||
-I "${hw_proto_path}" |
||||
--plugin=protoc-gen-grpc="${_GRPC_CPP_PLUGIN_EXECUTABLE}" |
||||
"${hw_proto}" |
||||
DEPENDS "${hw_proto}") |
||||
|
||||
# Include generated *.pb.h files |
||||
include_directories("${CMAKE_CURRENT_BINARY_DIR}") |
||||
|
||||
# hw_grpc_proto |
||||
add_library(hw_grpc_proto |
||||
${hw_grpc_srcs} |
||||
${hw_grpc_hdrs} |
||||
${hw_proto_srcs} |
||||
${hw_proto_hdrs}) |
||||
target_link_libraries(hw_grpc_proto |
||||
absl::check |
||||
${_REFLECTION} |
||||
${_GRPC_GRPCPP} |
||||
${_PROTOBUF_LIBPROTOBUF}) |
||||
|
||||
# Targets (client|server) |
||||
foreach(_target |
||||
client server) |
||||
add_executable(${_target} "${_target}.cc") |
||||
target_link_libraries(${_target} |
||||
hw_grpc_proto |
||||
absl::check |
||||
absl::flags |
||||
absl::flags_parse |
||||
absl::log |
||||
${_REFLECTION} |
||||
${_GRPC_GRPCPP} |
||||
${_PROTOBUF_LIBPROTOBUF}) |
||||
endforeach() |
@ -0,0 +1,69 @@ |
||||
# Retry |
||||
|
||||
This example shows how to enable and configure retry on gRPC clients. |
||||
|
||||
## Documentation |
||||
|
||||
[gRFC for client-side retry support](https://github.com/grpc/proposal/blob/master/A6-client-retries.md) |
||||
|
||||
## Try it |
||||
|
||||
This example includes a service implementation that fails requests three times with status |
||||
code `Unavailable`, then passes the fourth. The client is configured to make four retry attempts |
||||
when receiving an `Unavailable` status code. |
||||
|
||||
First start the server: |
||||
|
||||
```bash |
||||
$ ./server |
||||
``` |
||||
|
||||
Then run the client: |
||||
|
||||
```bash |
||||
$ ./client |
||||
``` |
||||
|
||||
Expected server output: |
||||
|
||||
``` |
||||
Server listening on 0.0.0.0:50052 |
||||
return UNAVAILABLE |
||||
return UNAVAILABLE |
||||
return UNAVAILABLE |
||||
return OK |
||||
``` |
||||
|
||||
Expected client output: |
||||
|
||||
``` |
||||
Greeter received: Hello world |
||||
``` |
||||
|
||||
## Usage |
||||
|
||||
### Define your retry policy |
||||
|
||||
Retry is enabled via the service config, which can be provided by the name resolver or |
||||
a [GRPC_ARG_SERVICE_CONFIG](https://github.com/grpc/grpc/blob/master/include/grpc/impl/channel_arg_names.h#L207-L209) channel argument. In the below config, we set retry policy for the "helloworld.Greeter" service. |
||||
|
||||
`maxAttempts`: how many times to attempt the RPC before failing. |
||||
|
||||
`initialBackoff`, `maxBackoff`, `backoffMultiplier`: configures delay between attempts. |
||||
|
||||
`retryableStatusCodes`: Retry only when receiving these status codes. |
||||
|
||||
```c++ |
||||
constexpr absl::string_view kRetryPolicy = |
||||
"{\"methodConfig\" : [{" |
||||
" \"name\" : [{\"service\": \"helloworld.Greeter\"}]," |
||||
" \"waitForReady\": true," |
||||
" \"retryPolicy\": {" |
||||
" \"maxAttempts\": 4," |
||||
" \"initialBackoff\": \"1s\"," |
||||
" \"maxBackoff\": \"120s\"," |
||||
" \"backoffMultiplier\": 1.0," |
||||
" \"retryableStatusCodes\": [\"UNAVAILABLE\"]" |
||||
" }" |
||||
"}]}"; |
||||
``` |
@ -0,0 +1,98 @@ |
||||
/*
|
||||
* Copyright 2024 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
#include <iostream> |
||||
#include <memory> |
||||
#include <string> |
||||
|
||||
#include "absl/strings/string_view.h" |
||||
|
||||
#include <grpcpp/grpcpp.h> |
||||
#include <grpcpp/support/status.h> |
||||
|
||||
#ifdef BAZEL_BUILD |
||||
#include "examples/protos/helloworld.grpc.pb.h" |
||||
#else |
||||
#include "helloworld.grpc.pb.h" |
||||
#endif |
||||
|
||||
using grpc::Channel; |
||||
using grpc::ClientContext; |
||||
using grpc::Status; |
||||
using helloworld::Greeter; |
||||
using helloworld::HelloReply; |
||||
using helloworld::HelloRequest; |
||||
|
||||
constexpr absl::string_view kTargetAddress = "localhost:50052"; |
||||
|
||||
// clang-format off
|
||||
constexpr absl::string_view kRetryPolicy = |
||||
"{\"methodConfig\" : [{" |
||||
" \"name\" : [{\"service\": \"helloworld.Greeter\"}]," |
||||
" \"waitForReady\": true," |
||||
" \"retryPolicy\": {" |
||||
" \"maxAttempts\": 4," |
||||
" \"initialBackoff\": \"1s\"," |
||||
" \"maxBackoff\": \"120s\"," |
||||
" \"backoffMultiplier\": 1.0," |
||||
" \"retryableStatusCodes\": [\"UNAVAILABLE\"]" |
||||
" }" |
||||
"}]}"; |
||||
// clang-format on
|
||||
|
||||
class GreeterClient { |
||||
public: |
||||
GreeterClient(std::shared_ptr<Channel> channel) |
||||
: stub_(Greeter::NewStub(channel)) {} |
||||
|
||||
// Assembles the client's payload, sends it and presents the response back
|
||||
// from the server.
|
||||
std::string SayHello(const std::string& user) { |
||||
// Data we are sending to the server.
|
||||
HelloRequest request; |
||||
request.set_name(user); |
||||
// Container for the data we expect from the server.
|
||||
HelloReply reply; |
||||
// Context for the client. It could be used to convey extra information to
|
||||
// the server and/or tweak certain RPC behaviors.
|
||||
ClientContext context; |
||||
// The actual RPC.
|
||||
Status status = stub_->SayHello(&context, request, &reply); |
||||
// Act upon its status.
|
||||
if (status.ok()) { |
||||
return reply.message(); |
||||
} else { |
||||
std::cout << status.error_code() << ": " << status.error_message() |
||||
<< std::endl; |
||||
return "RPC failed"; |
||||
} |
||||
} |
||||
|
||||
private: |
||||
std::unique_ptr<Greeter::Stub> stub_; |
||||
}; |
||||
|
||||
int main() { |
||||
auto channel_args = grpc::ChannelArguments(); |
||||
channel_args.SetServiceConfigJSON(std::string(kRetryPolicy)); |
||||
GreeterClient greeter(grpc::CreateCustomChannel( |
||||
std::string(kTargetAddress), grpc::InsecureChannelCredentials(), |
||||
channel_args)); |
||||
std::string user("world"); |
||||
std::string reply = greeter.SayHello(user); |
||||
std::cout << "Greeter received: " << reply << std::endl; |
||||
return 0; |
||||
} |
@ -0,0 +1,86 @@ |
||||
/*
|
||||
* Copyright 2024 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
#include <iostream> |
||||
#include <memory> |
||||
#include <string> |
||||
|
||||
#include <grpcpp/ext/proto_server_reflection_plugin.h> |
||||
#include <grpcpp/grpcpp.h> |
||||
#include <grpcpp/health_check_service_interface.h> |
||||
|
||||
#ifdef BAZEL_BUILD |
||||
#include "examples/protos/helloworld.grpc.pb.h" |
||||
#else |
||||
#include "helloworld.grpc.pb.h" |
||||
#endif |
||||
|
||||
using grpc::Server; |
||||
using grpc::ServerBuilder; |
||||
using grpc::ServerContext; |
||||
using grpc::Status; |
||||
using grpc::StatusCode; |
||||
using helloworld::Greeter; |
||||
using helloworld::HelloReply; |
||||
using helloworld::HelloRequest; |
||||
|
||||
// Logic and data behind the server's behavior.
|
||||
class GreeterServiceImpl final : public Greeter::Service { |
||||
public: |
||||
Status SayHello(ServerContext* context, const HelloRequest* request, |
||||
HelloReply* reply) override { |
||||
if (++request_counter_ % request_modulo_ != 0) { |
||||
// Return an OK status for every request_modulo_ number of requests,
|
||||
// return UNAVAILABLE otherwise.
|
||||
std::cout << "return UNAVAILABLE" << std::endl; |
||||
return Status(StatusCode::UNAVAILABLE, ""); |
||||
} |
||||
std::string prefix("Hello "); |
||||
reply->set_message(prefix + request->name()); |
||||
std::cout << "return OK" << std::endl; |
||||
return Status::OK; |
||||
} |
||||
|
||||
private: |
||||
static constexpr int request_modulo_ = 4; |
||||
int request_counter_ = 0; |
||||
}; |
||||
|
||||
void RunServer(uint16_t port) { |
||||
std::string server_address = absl::StrFormat("0.0.0.0:%d", port); |
||||
GreeterServiceImpl service; |
||||
|
||||
grpc::EnableDefaultHealthCheckService(true); |
||||
grpc::reflection::InitProtoReflectionServerBuilderPlugin(); |
||||
ServerBuilder builder; |
||||
// Listen on the given address without any authentication mechanism.
|
||||
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials()); |
||||
// Register "service" as the instance through which we'll communicate with
|
||||
// clients. In this case it corresponds to an *synchronous* service.
|
||||
builder.RegisterService(&service); |
||||
// Finally assemble the server.
|
||||
std::unique_ptr<Server> server(builder.BuildAndStart()); |
||||
std::cout << "Server listening on " << server_address << std::endl; |
||||
|
||||
// Wait for the server to shutdown. Note that some other thread must be
|
||||
// responsible for shutting down the server for this call to ever return.
|
||||
server->Wait(); |
||||
} |
||||
|
||||
int main(int argc, char** argv) { |
||||
RunServer(/*port=*/50052); |
||||
return 0; |
||||
} |
@ -0,0 +1,39 @@ |
||||
// Copyright 2024 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include "src/core/lib/transport/call_state.h" |
||||
|
||||
namespace grpc_core { |
||||
|
||||
std::string CallState::DebugString() const { |
||||
return absl::StrCat( |
||||
"client_to_server_pull_state:", client_to_server_pull_state_, |
||||
" client_to_server_push_state:", client_to_server_push_state_, |
||||
" server_to_client_pull_state:", server_to_client_pull_state_, |
||||
" server_to_client_message_push_state:", server_to_client_push_state_, |
||||
" server_trailing_metadata_state:", server_trailing_metadata_state_, |
||||
client_to_server_push_waiter_.DebugString(), |
||||
" server_to_client_push_waiter:", |
||||
server_to_client_push_waiter_.DebugString(), |
||||
" client_to_server_pull_waiter:", |
||||
client_to_server_pull_waiter_.DebugString(), |
||||
" server_to_client_pull_waiter:", |
||||
server_to_client_pull_waiter_.DebugString(), |
||||
" server_trailing_metadata_waiter:", |
||||
server_trailing_metadata_waiter_.DebugString()); |
||||
} |
||||
|
||||
static_assert(sizeof(CallState) <= 16, "CallState too large"); |
||||
|
||||
} // namespace grpc_core
|
@ -0,0 +1,957 @@ |
||||
// Copyright 2024 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#ifndef GRPC_SRC_CORE_LIB_TRANSPORT_CALL_STATE_H |
||||
#define GRPC_SRC_CORE_LIB_TRANSPORT_CALL_STATE_H |
||||
|
||||
#include "absl/types/optional.h" |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/lib/debug/trace.h" |
||||
#include "src/core/lib/gprpp/crash.h" |
||||
#include "src/core/lib/promise/activity.h" |
||||
#include "src/core/lib/promise/poll.h" |
||||
#include "src/core/lib/promise/status_flag.h" |
||||
|
||||
namespace grpc_core { |
||||
|
||||
class CallState { |
||||
public: |
||||
CallState(); |
||||
// Start the call: allows pulls to proceed
|
||||
void Start(); |
||||
// PUSH: client -> server
|
||||
void BeginPushClientToServerMessage(); |
||||
Poll<StatusFlag> PollPushClientToServerMessage(); |
||||
void ClientToServerHalfClose(); |
||||
// PULL: client -> server
|
||||
void BeginPullClientInitialMetadata(); |
||||
void FinishPullClientInitialMetadata(); |
||||
Poll<ValueOrFailure<bool>> PollPullClientToServerMessageAvailable(); |
||||
void FinishPullClientToServerMessage(); |
||||
// PUSH: server -> client
|
||||
StatusFlag PushServerInitialMetadata(); |
||||
void BeginPushServerToClientMessage(); |
||||
Poll<StatusFlag> PollPushServerToClientMessage(); |
||||
bool PushServerTrailingMetadata(bool cancel); |
||||
// PULL: server -> client
|
||||
Poll<bool> PollPullServerInitialMetadataAvailable(); |
||||
void FinishPullServerInitialMetadata(); |
||||
Poll<ValueOrFailure<bool>> PollPullServerToClientMessageAvailable(); |
||||
void FinishPullServerToClientMessage(); |
||||
Poll<Empty> PollServerTrailingMetadataAvailable(); |
||||
void FinishPullServerTrailingMetadata(); |
||||
Poll<bool> PollWasCancelled(); |
||||
// Debug
|
||||
std::string DebugString() const; |
||||
|
||||
friend std::ostream& operator<<(std::ostream& out, |
||||
const CallState& call_state) { |
||||
return out << call_state.DebugString(); |
||||
} |
||||
|
||||
private: |
||||
enum class ClientToServerPullState : uint16_t { |
||||
// Ready to read: client initial metadata is there, but not yet processed
|
||||
kBegin, |
||||
// Processing client initial metadata
|
||||
kProcessingClientInitialMetadata, |
||||
// Main call loop: not reading
|
||||
kIdle, |
||||
// Main call loop: reading but no message available
|
||||
kReading, |
||||
// Main call loop: processing one message
|
||||
kProcessingClientToServerMessage, |
||||
// Processing complete
|
||||
kTerminated, |
||||
}; |
||||
static const char* ClientToServerPullStateString( |
||||
ClientToServerPullState state) { |
||||
switch (state) { |
||||
case ClientToServerPullState::kBegin: |
||||
return "Begin"; |
||||
case ClientToServerPullState::kProcessingClientInitialMetadata: |
||||
return "ProcessingClientInitialMetadata"; |
||||
case ClientToServerPullState::kIdle: |
||||
return "Idle"; |
||||
case ClientToServerPullState::kReading: |
||||
return "Reading"; |
||||
case ClientToServerPullState::kProcessingClientToServerMessage: |
||||
return "ProcessingClientToServerMessage"; |
||||
case ClientToServerPullState::kTerminated: |
||||
return "Terminated"; |
||||
} |
||||
} |
||||
template <typename Sink> |
||||
friend void AbslStringify(Sink& out, ClientToServerPullState state) { |
||||
out.Append(ClientToServerPullStateString(state)); |
||||
} |
||||
friend std::ostream& operator<<(std::ostream& out, |
||||
ClientToServerPullState state) { |
||||
return out << ClientToServerPullStateString(state); |
||||
} |
||||
enum class ClientToServerPushState : uint16_t { |
||||
kIdle, |
||||
kPushedMessage, |
||||
kPushedHalfClose, |
||||
kPushedMessageAndHalfClosed, |
||||
kFinished, |
||||
}; |
||||
static const char* ClientToServerPushStateString( |
||||
ClientToServerPushState state) { |
||||
switch (state) { |
||||
case ClientToServerPushState::kIdle: |
||||
return "Idle"; |
||||
case ClientToServerPushState::kPushedMessage: |
||||
return "PushedMessage"; |
||||
case ClientToServerPushState::kPushedHalfClose: |
||||
return "PushedHalfClose"; |
||||
case ClientToServerPushState::kPushedMessageAndHalfClosed: |
||||
return "PushedMessageAndHalfClosed"; |
||||
case ClientToServerPushState::kFinished: |
||||
return "Finished"; |
||||
} |
||||
} |
||||
template <typename Sink> |
||||
friend void AbslStringify(Sink& out, ClientToServerPushState state) { |
||||
out.Append(ClientToServerPushStateString(state)); |
||||
} |
||||
friend std::ostream& operator<<(std::ostream& out, |
||||
ClientToServerPushState state) { |
||||
return out << ClientToServerPushStateString(state); |
||||
} |
||||
enum class ServerToClientPullState : uint16_t { |
||||
// Not yet started: cannot read
|
||||
kUnstarted, |
||||
kUnstartedReading, |
||||
kStarted, |
||||
kStartedReading, |
||||
// Processing server initial metadata
|
||||
kProcessingServerInitialMetadata, |
||||
kProcessingServerInitialMetadataReading, |
||||
// Main call loop: not reading
|
||||
kIdle, |
||||
// Main call loop: reading but no message available
|
||||
kReading, |
||||
// Main call loop: processing one message
|
||||
kProcessingServerToClientMessage, |
||||
// Processing server trailing metadata
|
||||
kProcessingServerTrailingMetadata, |
||||
kTerminated, |
||||
}; |
||||
static const char* ServerToClientPullStateString( |
||||
ServerToClientPullState state) { |
||||
switch (state) { |
||||
case ServerToClientPullState::kUnstarted: |
||||
return "Unstarted"; |
||||
case ServerToClientPullState::kUnstartedReading: |
||||
return "UnstartedReading"; |
||||
case ServerToClientPullState::kStarted: |
||||
return "Started"; |
||||
case ServerToClientPullState::kStartedReading: |
||||
return "StartedReading"; |
||||
case ServerToClientPullState::kProcessingServerInitialMetadata: |
||||
return "ProcessingServerInitialMetadata"; |
||||
case ServerToClientPullState::kProcessingServerInitialMetadataReading: |
||||
return "ProcessingServerInitialMetadataReading"; |
||||
case ServerToClientPullState::kIdle: |
||||
return "Idle"; |
||||
case ServerToClientPullState::kReading: |
||||
return "Reading"; |
||||
case ServerToClientPullState::kProcessingServerToClientMessage: |
||||
return "ProcessingServerToClientMessage"; |
||||
case ServerToClientPullState::kProcessingServerTrailingMetadata: |
||||
return "ProcessingServerTrailingMetadata"; |
||||
case ServerToClientPullState::kTerminated: |
||||
return "Terminated"; |
||||
} |
||||
} |
||||
template <typename Sink> |
||||
friend void AbslStringify(Sink& out, ServerToClientPullState state) { |
||||
out.Append(ServerToClientPullStateString(state)); |
||||
} |
||||
friend std::ostream& operator<<(std::ostream& out, |
||||
ServerToClientPullState state) { |
||||
return out << ServerToClientPullStateString(state); |
||||
} |
||||
enum class ServerToClientPushState : uint16_t { |
||||
kStart, |
||||
kPushedServerInitialMetadata, |
||||
kPushedServerInitialMetadataAndPushedMessage, |
||||
kTrailersOnly, |
||||
kIdle, |
||||
kPushedMessage, |
||||
kFinished, |
||||
}; |
||||
static const char* ServerToClientPushStateString( |
||||
ServerToClientPushState state) { |
||||
switch (state) { |
||||
case ServerToClientPushState::kStart: |
||||
return "Start"; |
||||
case ServerToClientPushState::kPushedServerInitialMetadata: |
||||
return "PushedServerInitialMetadata"; |
||||
case ServerToClientPushState:: |
||||
kPushedServerInitialMetadataAndPushedMessage: |
||||
return "PushedServerInitialMetadataAndPushedMessage"; |
||||
case ServerToClientPushState::kTrailersOnly: |
||||
return "TrailersOnly"; |
||||
case ServerToClientPushState::kIdle: |
||||
return "Idle"; |
||||
case ServerToClientPushState::kPushedMessage: |
||||
return "PushedMessage"; |
||||
case ServerToClientPushState::kFinished: |
||||
return "Finished"; |
||||
} |
||||
} |
||||
template <typename Sink> |
||||
friend void AbslStringify(Sink& out, ServerToClientPushState state) { |
||||
out.Append(ServerToClientPushStateString(state)); |
||||
} |
||||
friend std::ostream& operator<<(std::ostream& out, |
||||
ServerToClientPushState state) { |
||||
return out << ServerToClientPushStateString(state); |
||||
} |
||||
enum class ServerTrailingMetadataState : uint16_t { |
||||
kNotPushed, |
||||
kPushed, |
||||
kPushedCancel, |
||||
kPulled, |
||||
kPulledCancel, |
||||
}; |
||||
static const char* ServerTrailingMetadataStateString( |
||||
ServerTrailingMetadataState state) { |
||||
switch (state) { |
||||
case ServerTrailingMetadataState::kNotPushed: |
||||
return "NotPushed"; |
||||
case ServerTrailingMetadataState::kPushed: |
||||
return "Pushed"; |
||||
case ServerTrailingMetadataState::kPushedCancel: |
||||
return "PushedCancel"; |
||||
case ServerTrailingMetadataState::kPulled: |
||||
return "Pulled"; |
||||
case ServerTrailingMetadataState::kPulledCancel: |
||||
return "PulledCancel"; |
||||
} |
||||
} |
||||
template <typename Sink> |
||||
friend void AbslStringify(Sink& out, ServerTrailingMetadataState state) { |
||||
out.Append(ServerTrailingMetadataStateString(state)); |
||||
} |
||||
friend std::ostream& operator<<(std::ostream& out, |
||||
ServerTrailingMetadataState state) { |
||||
return out << ServerTrailingMetadataStateString(state); |
||||
} |
||||
ClientToServerPullState client_to_server_pull_state_ : 3; |
||||
ClientToServerPushState client_to_server_push_state_ : 3; |
||||
ServerToClientPullState server_to_client_pull_state_ : 4; |
||||
ServerToClientPushState server_to_client_push_state_ : 3; |
||||
ServerTrailingMetadataState server_trailing_metadata_state_ : 3; |
||||
IntraActivityWaiter client_to_server_pull_waiter_; |
||||
IntraActivityWaiter server_to_client_pull_waiter_; |
||||
IntraActivityWaiter client_to_server_push_waiter_; |
||||
IntraActivityWaiter server_to_client_push_waiter_; |
||||
IntraActivityWaiter server_trailing_metadata_waiter_; |
||||
}; |
||||
|
||||
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline CallState::CallState() |
||||
: client_to_server_pull_state_(ClientToServerPullState::kBegin), |
||||
client_to_server_push_state_(ClientToServerPushState::kIdle), |
||||
server_to_client_pull_state_(ServerToClientPullState::kUnstarted), |
||||
server_to_client_push_state_(ServerToClientPushState::kStart), |
||||
server_trailing_metadata_state_(ServerTrailingMetadataState::kNotPushed) { |
||||
} |
||||
|
||||
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void CallState::Start() { |
||||
GRPC_TRACE_LOG(call_state, INFO) |
||||
<< "[call_state] Start: " |
||||
<< GRPC_DUMP_ARGS(this, server_to_client_pull_state_); |
||||
switch (server_to_client_pull_state_) { |
||||
case ServerToClientPullState::kUnstarted: |
||||
server_to_client_pull_state_ = ServerToClientPullState::kStarted; |
||||
server_to_client_pull_waiter_.Wake(); |
||||
break; |
||||
case ServerToClientPullState::kUnstartedReading: |
||||
server_to_client_pull_state_ = ServerToClientPullState::kStartedReading; |
||||
server_to_client_pull_waiter_.Wake(); |
||||
break; |
||||
case ServerToClientPullState::kStarted: |
||||
case ServerToClientPullState::kStartedReading: |
||||
case ServerToClientPullState::kProcessingServerInitialMetadata: |
||||
case ServerToClientPullState::kProcessingServerInitialMetadataReading: |
||||
case ServerToClientPullState::kIdle: |
||||
case ServerToClientPullState::kReading: |
||||
case ServerToClientPullState::kProcessingServerToClientMessage: |
||||
LOG(FATAL) << "Start called twice"; |
||||
case ServerToClientPullState::kProcessingServerTrailingMetadata: |
||||
case ServerToClientPullState::kTerminated: |
||||
break; |
||||
} |
||||
} |
||||
|
||||
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void |
||||
CallState::BeginPushClientToServerMessage() { |
||||
GRPC_TRACE_LOG(call_state, INFO) |
||||
<< "[call_state] BeginPushClientToServerMessage: " |
||||
<< GRPC_DUMP_ARGS(this, client_to_server_push_state_); |
||||
switch (client_to_server_push_state_) { |
||||
case ClientToServerPushState::kIdle: |
||||
client_to_server_push_state_ = ClientToServerPushState::kPushedMessage; |
||||
client_to_server_push_waiter_.Wake(); |
||||
break; |
||||
case ClientToServerPushState::kPushedMessage: |
||||
case ClientToServerPushState::kPushedMessageAndHalfClosed: |
||||
LOG(FATAL) << "PushClientToServerMessage called twice concurrently"; |
||||
break; |
||||
case ClientToServerPushState::kPushedHalfClose: |
||||
LOG(FATAL) << "PushClientToServerMessage called after half-close"; |
||||
break; |
||||
case ClientToServerPushState::kFinished: |
||||
break; |
||||
} |
||||
} |
||||
|
||||
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline Poll<StatusFlag> |
||||
CallState::PollPushClientToServerMessage() { |
||||
GRPC_TRACE_LOG(call_state, INFO) |
||||
<< "[call_state] PollPushClientToServerMessage: " |
||||
<< GRPC_DUMP_ARGS(this, client_to_server_push_state_); |
||||
switch (client_to_server_push_state_) { |
||||
case ClientToServerPushState::kIdle: |
||||
case ClientToServerPushState::kPushedHalfClose: |
||||
return Success{}; |
||||
case ClientToServerPushState::kPushedMessage: |
||||
case ClientToServerPushState::kPushedMessageAndHalfClosed: |
||||
return client_to_server_push_waiter_.pending(); |
||||
case ClientToServerPushState::kFinished: |
||||
return Failure{}; |
||||
} |
||||
Crash("Unreachable"); |
||||
} |
||||
|
||||
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void |
||||
CallState::ClientToServerHalfClose() { |
||||
GRPC_TRACE_LOG(call_state, INFO) |
||||
<< "[call_state] ClientToServerHalfClose: " |
||||
<< GRPC_DUMP_ARGS(this, client_to_server_push_state_); |
||||
switch (client_to_server_push_state_) { |
||||
case ClientToServerPushState::kIdle: |
||||
client_to_server_push_state_ = ClientToServerPushState::kPushedHalfClose; |
||||
client_to_server_push_waiter_.Wake(); |
||||
break; |
||||
case ClientToServerPushState::kPushedMessage: |
||||
client_to_server_push_state_ = |
||||
ClientToServerPushState::kPushedMessageAndHalfClosed; |
||||
break; |
||||
case ClientToServerPushState::kPushedHalfClose: |
||||
case ClientToServerPushState::kPushedMessageAndHalfClosed: |
||||
LOG(FATAL) << "ClientToServerHalfClose called twice"; |
||||
break; |
||||
case ClientToServerPushState::kFinished: |
||||
break; |
||||
} |
||||
} |
||||
|
||||
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void |
||||
CallState::BeginPullClientInitialMetadata() { |
||||
GRPC_TRACE_LOG(call_state, INFO) |
||||
<< "[call_state] BeginPullClientInitialMetadata: " |
||||
<< GRPC_DUMP_ARGS(this, client_to_server_pull_state_); |
||||
switch (client_to_server_pull_state_) { |
||||
case ClientToServerPullState::kBegin: |
||||
client_to_server_pull_state_ = |
||||
ClientToServerPullState::kProcessingClientInitialMetadata; |
||||
break; |
||||
case ClientToServerPullState::kProcessingClientInitialMetadata: |
||||
case ClientToServerPullState::kIdle: |
||||
case ClientToServerPullState::kReading: |
||||
case ClientToServerPullState::kProcessingClientToServerMessage: |
||||
LOG(FATAL) << "BeginPullClientInitialMetadata called twice"; |
||||
break; |
||||
case ClientToServerPullState::kTerminated: |
||||
break; |
||||
} |
||||
} |
||||
|
||||
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void |
||||
CallState::FinishPullClientInitialMetadata() { |
||||
GRPC_TRACE_LOG(call_state, INFO) |
||||
<< "[call_state] FinishPullClientInitialMetadata: " |
||||
<< GRPC_DUMP_ARGS(this, client_to_server_pull_state_); |
||||
switch (client_to_server_pull_state_) { |
||||
case ClientToServerPullState::kBegin: |
||||
LOG(FATAL) << "FinishPullClientInitialMetadata called before Begin"; |
||||
break; |
||||
case ClientToServerPullState::kProcessingClientInitialMetadata: |
||||
client_to_server_pull_state_ = ClientToServerPullState::kIdle; |
||||
client_to_server_pull_waiter_.Wake(); |
||||
break; |
||||
case ClientToServerPullState::kIdle: |
||||
case ClientToServerPullState::kReading: |
||||
case ClientToServerPullState::kProcessingClientToServerMessage: |
||||
LOG(FATAL) << "Out of order FinishPullClientInitialMetadata"; |
||||
break; |
||||
case ClientToServerPullState::kTerminated: |
||||
break; |
||||
} |
||||
} |
||||
|
||||
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline Poll<ValueOrFailure<bool>> |
||||
CallState::PollPullClientToServerMessageAvailable() { |
||||
GRPC_TRACE_LOG(call_state, INFO) |
||||
<< "[call_state] PollPullClientToServerMessageAvailable: " |
||||
<< GRPC_DUMP_ARGS(this, client_to_server_pull_state_, |
||||
client_to_server_push_state_); |
||||
switch (client_to_server_pull_state_) { |
||||
case ClientToServerPullState::kBegin: |
||||
case ClientToServerPullState::kProcessingClientInitialMetadata: |
||||
return client_to_server_pull_waiter_.pending(); |
||||
case ClientToServerPullState::kIdle: |
||||
client_to_server_pull_state_ = ClientToServerPullState::kReading; |
||||
ABSL_FALLTHROUGH_INTENDED; |
||||
case ClientToServerPullState::kReading: |
||||
break; |
||||
case ClientToServerPullState::kProcessingClientToServerMessage: |
||||
LOG(FATAL) << "PollPullClientToServerMessageAvailable called while " |
||||
"processing a message"; |
||||
break; |
||||
case ClientToServerPullState::kTerminated: |
||||
return Failure{}; |
||||
} |
||||
DCHECK_EQ(client_to_server_pull_state_, ClientToServerPullState::kReading); |
||||
switch (client_to_server_push_state_) { |
||||
case ClientToServerPushState::kIdle: |
||||
return client_to_server_push_waiter_.pending(); |
||||
case ClientToServerPushState::kPushedMessage: |
||||
case ClientToServerPushState::kPushedMessageAndHalfClosed: |
||||
client_to_server_pull_state_ = |
||||
ClientToServerPullState::kProcessingClientToServerMessage; |
||||
return true; |
||||
case ClientToServerPushState::kPushedHalfClose: |
||||
return false; |
||||
case ClientToServerPushState::kFinished: |
||||
client_to_server_pull_state_ = ClientToServerPullState::kTerminated; |
||||
return Failure{}; |
||||
} |
||||
Crash("Unreachable"); |
||||
} |
||||
|
||||
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void |
||||
CallState::FinishPullClientToServerMessage() { |
||||
GRPC_TRACE_LOG(call_state, INFO) |
||||
<< "[call_state] FinishPullClientToServerMessage: " |
||||
<< GRPC_DUMP_ARGS(this, client_to_server_pull_state_, |
||||
client_to_server_push_state_); |
||||
switch (client_to_server_pull_state_) { |
||||
case ClientToServerPullState::kBegin: |
||||
case ClientToServerPullState::kProcessingClientInitialMetadata: |
||||
LOG(FATAL) << "FinishPullClientToServerMessage called before Begin"; |
||||
break; |
||||
case ClientToServerPullState::kIdle: |
||||
LOG(FATAL) << "FinishPullClientToServerMessage called twice"; |
||||
break; |
||||
case ClientToServerPullState::kReading: |
||||
LOG(FATAL) << "FinishPullClientToServerMessage called before " |
||||
"PollPullClientToServerMessageAvailable"; |
||||
break; |
||||
case ClientToServerPullState::kProcessingClientToServerMessage: |
||||
client_to_server_pull_state_ = ClientToServerPullState::kIdle; |
||||
client_to_server_pull_waiter_.Wake(); |
||||
break; |
||||
case ClientToServerPullState::kTerminated: |
||||
break; |
||||
} |
||||
switch (client_to_server_push_state_) { |
||||
case ClientToServerPushState::kPushedMessage: |
||||
client_to_server_push_state_ = ClientToServerPushState::kIdle; |
||||
client_to_server_push_waiter_.Wake(); |
||||
break; |
||||
case ClientToServerPushState::kIdle: |
||||
case ClientToServerPushState::kPushedHalfClose: |
||||
LOG(FATAL) << "FinishPullClientToServerMessage called without a message"; |
||||
break; |
||||
case ClientToServerPushState::kPushedMessageAndHalfClosed: |
||||
client_to_server_push_state_ = ClientToServerPushState::kPushedHalfClose; |
||||
client_to_server_push_waiter_.Wake(); |
||||
break; |
||||
case ClientToServerPushState::kFinished: |
||||
break; |
||||
} |
||||
} |
||||
|
||||
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline StatusFlag |
||||
CallState::PushServerInitialMetadata() { |
||||
GRPC_TRACE_LOG(call_state, INFO) |
||||
<< "[call_state] PushServerInitialMetadata: " |
||||
<< GRPC_DUMP_ARGS(this, server_to_client_push_state_, |
||||
server_trailing_metadata_state_); |
||||
if (server_trailing_metadata_state_ != |
||||
ServerTrailingMetadataState::kNotPushed) { |
||||
return Failure{}; |
||||
} |
||||
CHECK_EQ(server_to_client_push_state_, ServerToClientPushState::kStart); |
||||
server_to_client_push_state_ = |
||||
ServerToClientPushState::kPushedServerInitialMetadata; |
||||
server_to_client_push_waiter_.Wake(); |
||||
return Success{}; |
||||
} |
||||
|
||||
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void |
||||
CallState::BeginPushServerToClientMessage() { |
||||
GRPC_TRACE_LOG(call_state, INFO) |
||||
<< "[call_state] BeginPushServerToClientMessage: " |
||||
<< GRPC_DUMP_ARGS(this, server_to_client_push_state_); |
||||
switch (server_to_client_push_state_) { |
||||
case ServerToClientPushState::kStart: |
||||
LOG(FATAL) << "BeginPushServerToClientMessage called before " |
||||
"PushServerInitialMetadata"; |
||||
break; |
||||
case ServerToClientPushState::kPushedServerInitialMetadata: |
||||
server_to_client_push_state_ = |
||||
ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage; |
||||
break; |
||||
case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage: |
||||
case ServerToClientPushState::kPushedMessage: |
||||
LOG(FATAL) << "BeginPushServerToClientMessage called twice concurrently"; |
||||
break; |
||||
case ServerToClientPushState::kTrailersOnly: |
||||
// Will fail in poll.
|
||||
break; |
||||
case ServerToClientPushState::kIdle: |
||||
server_to_client_push_state_ = ServerToClientPushState::kPushedMessage; |
||||
server_to_client_push_waiter_.Wake(); |
||||
break; |
||||
case ServerToClientPushState::kFinished: |
||||
break; |
||||
} |
||||
} |
||||
|
||||
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline Poll<StatusFlag> |
||||
CallState::PollPushServerToClientMessage() { |
||||
GRPC_TRACE_LOG(call_state, INFO) |
||||
<< "[call_state] PollPushServerToClientMessage: " |
||||
<< GRPC_DUMP_ARGS(this, server_to_client_push_state_); |
||||
switch (server_to_client_push_state_) { |
||||
case ServerToClientPushState::kStart: |
||||
case ServerToClientPushState::kPushedServerInitialMetadata: |
||||
LOG(FATAL) << "PollPushServerToClientMessage called before " |
||||
<< "PushServerInitialMetadata"; |
||||
case ServerToClientPushState::kTrailersOnly: |
||||
return false; |
||||
case ServerToClientPushState::kPushedMessage: |
||||
case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage: |
||||
return server_to_client_push_waiter_.pending(); |
||||
case ServerToClientPushState::kIdle: |
||||
return Success{}; |
||||
case ServerToClientPushState::kFinished: |
||||
return Failure{}; |
||||
} |
||||
Crash("Unreachable"); |
||||
} |
||||
|
||||
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline bool |
||||
CallState::PushServerTrailingMetadata(bool cancel) { |
||||
GRPC_TRACE_LOG(call_state, INFO) |
||||
<< "[call_state] PushServerTrailingMetadata: " |
||||
<< GRPC_DUMP_ARGS(this, cancel, server_trailing_metadata_state_, |
||||
server_to_client_push_state_, |
||||
client_to_server_push_state_, |
||||
server_trailing_metadata_waiter_.DebugString()); |
||||
if (server_trailing_metadata_state_ != |
||||
ServerTrailingMetadataState::kNotPushed) { |
||||
return false; |
||||
} |
||||
server_trailing_metadata_state_ = |
||||
cancel ? ServerTrailingMetadataState::kPushedCancel |
||||
: ServerTrailingMetadataState::kPushed; |
||||
server_trailing_metadata_waiter_.Wake(); |
||||
switch (server_to_client_push_state_) { |
||||
case ServerToClientPushState::kStart: |
||||
server_to_client_push_state_ = ServerToClientPushState::kTrailersOnly; |
||||
server_to_client_push_waiter_.Wake(); |
||||
break; |
||||
case ServerToClientPushState::kPushedServerInitialMetadata: |
||||
case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage: |
||||
case ServerToClientPushState::kPushedMessage: |
||||
if (cancel) { |
||||
server_to_client_push_state_ = ServerToClientPushState::kFinished; |
||||
server_to_client_push_waiter_.Wake(); |
||||
} |
||||
break; |
||||
case ServerToClientPushState::kIdle: |
||||
if (cancel) { |
||||
server_to_client_push_state_ = ServerToClientPushState::kFinished; |
||||
server_to_client_push_waiter_.Wake(); |
||||
} |
||||
break; |
||||
case ServerToClientPushState::kFinished: |
||||
case ServerToClientPushState::kTrailersOnly: |
||||
break; |
||||
} |
||||
switch (client_to_server_push_state_) { |
||||
case ClientToServerPushState::kIdle: |
||||
client_to_server_push_state_ = ClientToServerPushState::kFinished; |
||||
client_to_server_push_waiter_.Wake(); |
||||
break; |
||||
case ClientToServerPushState::kPushedMessage: |
||||
case ClientToServerPushState::kPushedMessageAndHalfClosed: |
||||
client_to_server_push_state_ = ClientToServerPushState::kFinished; |
||||
client_to_server_push_waiter_.Wake(); |
||||
break; |
||||
case ClientToServerPushState::kPushedHalfClose: |
||||
case ClientToServerPushState::kFinished: |
||||
break; |
||||
} |
||||
return true; |
||||
} |
||||
|
||||
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline Poll<bool> |
||||
CallState::PollPullServerInitialMetadataAvailable() { |
||||
GRPC_TRACE_LOG(call_state, INFO) |
||||
<< "[call_state] PollPullServerInitialMetadataAvailable: " |
||||
<< GRPC_DUMP_ARGS(this, server_to_client_pull_state_, |
||||
server_to_client_push_state_); |
||||
bool reading; |
||||
switch (server_to_client_pull_state_) { |
||||
case ServerToClientPullState::kUnstarted: |
||||
case ServerToClientPullState::kUnstartedReading: |
||||
if (server_to_client_push_state_ == |
||||
ServerToClientPushState::kTrailersOnly) { |
||||
server_to_client_pull_state_ = ServerToClientPullState::kTerminated; |
||||
return false; |
||||
} |
||||
server_to_client_push_waiter_.pending(); |
||||
return server_to_client_pull_waiter_.pending(); |
||||
case ServerToClientPullState::kStartedReading: |
||||
reading = true; |
||||
break; |
||||
case ServerToClientPullState::kStarted: |
||||
reading = false; |
||||
break; |
||||
case ServerToClientPullState::kProcessingServerInitialMetadata: |
||||
case ServerToClientPullState::kProcessingServerInitialMetadataReading: |
||||
case ServerToClientPullState::kIdle: |
||||
case ServerToClientPullState::kReading: |
||||
case ServerToClientPullState::kProcessingServerToClientMessage: |
||||
case ServerToClientPullState::kProcessingServerTrailingMetadata: |
||||
LOG(FATAL) << "PollPullServerInitialMetadataAvailable called twice"; |
||||
case ServerToClientPullState::kTerminated: |
||||
return false; |
||||
} |
||||
DCHECK(server_to_client_pull_state_ == ServerToClientPullState::kStarted || |
||||
server_to_client_pull_state_ == |
||||
ServerToClientPullState::kStartedReading) |
||||
<< server_to_client_pull_state_; |
||||
switch (server_to_client_push_state_) { |
||||
case ServerToClientPushState::kStart: |
||||
return server_to_client_push_waiter_.pending(); |
||||
case ServerToClientPushState::kPushedServerInitialMetadata: |
||||
case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage: |
||||
server_to_client_pull_state_ = |
||||
reading |
||||
? ServerToClientPullState::kProcessingServerInitialMetadataReading |
||||
: ServerToClientPullState::kProcessingServerInitialMetadata; |
||||
server_to_client_pull_waiter_.Wake(); |
||||
return true; |
||||
case ServerToClientPushState::kIdle: |
||||
case ServerToClientPushState::kPushedMessage: |
||||
LOG(FATAL) |
||||
<< "PollPullServerInitialMetadataAvailable after metadata processed"; |
||||
case ServerToClientPushState::kFinished: |
||||
server_to_client_pull_state_ = ServerToClientPullState::kTerminated; |
||||
server_to_client_pull_waiter_.Wake(); |
||||
return false; |
||||
case ServerToClientPushState::kTrailersOnly: |
||||
return false; |
||||
} |
||||
Crash("Unreachable"); |
||||
} |
||||
|
||||
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void |
||||
CallState::FinishPullServerInitialMetadata() { |
||||
GRPC_TRACE_LOG(call_state, INFO) |
||||
<< "[call_state] FinishPullServerInitialMetadata: " |
||||
<< GRPC_DUMP_ARGS(this, server_to_client_pull_state_); |
||||
switch (server_to_client_pull_state_) { |
||||
case ServerToClientPullState::kUnstarted: |
||||
case ServerToClientPullState::kUnstartedReading: |
||||
LOG(FATAL) << "FinishPullServerInitialMetadata called before Start"; |
||||
case ServerToClientPullState::kStarted: |
||||
case ServerToClientPullState::kStartedReading: |
||||
CHECK_EQ(server_to_client_push_state_, |
||||
ServerToClientPushState::kTrailersOnly); |
||||
return; |
||||
case ServerToClientPullState::kProcessingServerInitialMetadata: |
||||
server_to_client_pull_state_ = ServerToClientPullState::kIdle; |
||||
server_to_client_pull_waiter_.Wake(); |
||||
break; |
||||
case ServerToClientPullState::kProcessingServerInitialMetadataReading: |
||||
server_to_client_pull_state_ = ServerToClientPullState::kReading; |
||||
server_to_client_pull_waiter_.Wake(); |
||||
break; |
||||
case ServerToClientPullState::kIdle: |
||||
case ServerToClientPullState::kReading: |
||||
case ServerToClientPullState::kProcessingServerToClientMessage: |
||||
case ServerToClientPullState::kProcessingServerTrailingMetadata: |
||||
LOG(FATAL) << "Out of order FinishPullServerInitialMetadata"; |
||||
case ServerToClientPullState::kTerminated: |
||||
return; |
||||
} |
||||
DCHECK(server_to_client_pull_state_ == ServerToClientPullState::kIdle || |
||||
server_to_client_pull_state_ == ServerToClientPullState::kReading) |
||||
<< server_to_client_pull_state_; |
||||
switch (server_to_client_push_state_) { |
||||
case ServerToClientPushState::kStart: |
||||
LOG(FATAL) << "FinishPullServerInitialMetadata called before initial " |
||||
"metadata consumed"; |
||||
case ServerToClientPushState::kPushedServerInitialMetadata: |
||||
server_to_client_push_state_ = ServerToClientPushState::kIdle; |
||||
server_to_client_push_waiter_.Wake(); |
||||
break; |
||||
case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage: |
||||
server_to_client_push_state_ = ServerToClientPushState::kPushedMessage; |
||||
server_to_client_push_waiter_.Wake(); |
||||
break; |
||||
case ServerToClientPushState::kIdle: |
||||
case ServerToClientPushState::kPushedMessage: |
||||
case ServerToClientPushState::kTrailersOnly: |
||||
case ServerToClientPushState::kFinished: |
||||
LOG(FATAL) << "FinishPullServerInitialMetadata called twice"; |
||||
} |
||||
} |
||||
|
||||
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline Poll<ValueOrFailure<bool>> |
||||
CallState::PollPullServerToClientMessageAvailable() { |
||||
GRPC_TRACE_LOG(call_state, INFO) |
||||
<< "[call_state] PollPullServerToClientMessageAvailable: " |
||||
<< GRPC_DUMP_ARGS(this, server_to_client_pull_state_, |
||||
server_to_client_push_state_, |
||||
server_trailing_metadata_state_); |
||||
switch (server_to_client_pull_state_) { |
||||
case ServerToClientPullState::kUnstarted: |
||||
server_to_client_pull_state_ = ServerToClientPullState::kUnstartedReading; |
||||
return server_to_client_pull_waiter_.pending(); |
||||
case ServerToClientPullState::kProcessingServerInitialMetadata: |
||||
server_to_client_pull_state_ = |
||||
ServerToClientPullState::kProcessingServerInitialMetadataReading; |
||||
return server_to_client_pull_waiter_.pending(); |
||||
case ServerToClientPullState::kUnstartedReading: |
||||
case ServerToClientPullState::kProcessingServerInitialMetadataReading: |
||||
return server_to_client_pull_waiter_.pending(); |
||||
case ServerToClientPullState::kStarted: |
||||
server_to_client_pull_state_ = ServerToClientPullState::kStartedReading; |
||||
ABSL_FALLTHROUGH_INTENDED; |
||||
case ServerToClientPullState::kStartedReading: |
||||
if (server_to_client_push_state_ == |
||||
ServerToClientPushState::kTrailersOnly) { |
||||
return false; |
||||
} |
||||
return server_to_client_pull_waiter_.pending(); |
||||
case ServerToClientPullState::kIdle: |
||||
server_to_client_pull_state_ = ServerToClientPullState::kReading; |
||||
ABSL_FALLTHROUGH_INTENDED; |
||||
case ServerToClientPullState::kReading: |
||||
break; |
||||
case ServerToClientPullState::kProcessingServerToClientMessage: |
||||
LOG(FATAL) << "PollPullServerToClientMessageAvailable called while " |
||||
"processing a message"; |
||||
case ServerToClientPullState::kProcessingServerTrailingMetadata: |
||||
LOG(FATAL) << "PollPullServerToClientMessageAvailable called while " |
||||
"processing trailing metadata"; |
||||
case ServerToClientPullState::kTerminated: |
||||
return Failure{}; |
||||
} |
||||
DCHECK_EQ(server_to_client_pull_state_, ServerToClientPullState::kReading); |
||||
switch (server_to_client_push_state_) { |
||||
case ServerToClientPushState::kStart: |
||||
case ServerToClientPushState::kPushedServerInitialMetadata: |
||||
case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage: |
||||
return server_to_client_push_waiter_.pending(); |
||||
case ServerToClientPushState::kIdle: |
||||
if (server_trailing_metadata_state_ != |
||||
ServerTrailingMetadataState::kNotPushed) { |
||||
return false; |
||||
} |
||||
server_trailing_metadata_waiter_.pending(); |
||||
return server_to_client_push_waiter_.pending(); |
||||
case ServerToClientPushState::kTrailersOnly: |
||||
DCHECK_NE(server_trailing_metadata_state_, |
||||
ServerTrailingMetadataState::kNotPushed); |
||||
return false; |
||||
case ServerToClientPushState::kPushedMessage: |
||||
server_to_client_pull_state_ = |
||||
ServerToClientPullState::kProcessingServerToClientMessage; |
||||
server_to_client_pull_waiter_.Wake(); |
||||
return true; |
||||
case ServerToClientPushState::kFinished: |
||||
server_to_client_pull_state_ = ServerToClientPullState::kTerminated; |
||||
server_to_client_pull_waiter_.Wake(); |
||||
return Failure{}; |
||||
} |
||||
Crash("Unreachable"); |
||||
} |
||||
|
||||
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void |
||||
CallState::FinishPullServerToClientMessage() { |
||||
GRPC_TRACE_LOG(call_state, INFO) |
||||
<< "[call_state] FinishPullServerToClientMessage: " |
||||
<< GRPC_DUMP_ARGS(this, server_to_client_pull_state_, |
||||
server_to_client_push_state_); |
||||
switch (server_to_client_pull_state_) { |
||||
case ServerToClientPullState::kUnstarted: |
||||
case ServerToClientPullState::kUnstartedReading: |
||||
case ServerToClientPullState::kStarted: |
||||
case ServerToClientPullState::kStartedReading: |
||||
case ServerToClientPullState::kProcessingServerInitialMetadata: |
||||
case ServerToClientPullState::kProcessingServerInitialMetadataReading: |
||||
LOG(FATAL) |
||||
<< "FinishPullServerToClientMessage called before metadata available"; |
||||
case ServerToClientPullState::kIdle: |
||||
LOG(FATAL) << "FinishPullServerToClientMessage called twice"; |
||||
case ServerToClientPullState::kReading: |
||||
LOG(FATAL) << "FinishPullServerToClientMessage called before " |
||||
<< "PollPullServerToClientMessageAvailable"; |
||||
case ServerToClientPullState::kProcessingServerToClientMessage: |
||||
server_to_client_pull_state_ = ServerToClientPullState::kIdle; |
||||
server_to_client_pull_waiter_.Wake(); |
||||
break; |
||||
case ServerToClientPullState::kProcessingServerTrailingMetadata: |
||||
LOG(FATAL) << "FinishPullServerToClientMessage called while processing " |
||||
"trailing metadata"; |
||||
case ServerToClientPullState::kTerminated: |
||||
break; |
||||
} |
||||
switch (server_to_client_push_state_) { |
||||
case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage: |
||||
case ServerToClientPushState::kPushedServerInitialMetadata: |
||||
case ServerToClientPushState::kStart: |
||||
LOG(FATAL) << "FinishPullServerToClientMessage called before initial " |
||||
"metadata consumed"; |
||||
case ServerToClientPushState::kTrailersOnly: |
||||
LOG(FATAL) << "FinishPullServerToClientMessage called after " |
||||
"PushServerTrailingMetadata"; |
||||
case ServerToClientPushState::kPushedMessage: |
||||
server_to_client_push_state_ = ServerToClientPushState::kIdle; |
||||
server_to_client_push_waiter_.Wake(); |
||||
break; |
||||
case ServerToClientPushState::kIdle: |
||||
LOG(FATAL) << "FinishPullServerToClientMessage called without a message"; |
||||
case ServerToClientPushState::kFinished: |
||||
break; |
||||
} |
||||
} |
||||
|
||||
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline Poll<Empty> |
||||
CallState::PollServerTrailingMetadataAvailable() { |
||||
GRPC_TRACE_LOG(call_state, INFO) |
||||
<< "[call_state] PollServerTrailingMetadataAvailable: " |
||||
<< GRPC_DUMP_ARGS(this, server_to_client_pull_state_, |
||||
server_to_client_push_state_, |
||||
server_trailing_metadata_state_, |
||||
server_trailing_metadata_waiter_.DebugString()); |
||||
switch (server_to_client_pull_state_) { |
||||
case ServerToClientPullState::kProcessingServerInitialMetadata: |
||||
case ServerToClientPullState::kProcessingServerToClientMessage: |
||||
case ServerToClientPullState::kProcessingServerInitialMetadataReading: |
||||
case ServerToClientPullState::kUnstartedReading: |
||||
return server_to_client_pull_waiter_.pending(); |
||||
case ServerToClientPullState::kStartedReading: |
||||
case ServerToClientPullState::kReading: |
||||
switch (server_to_client_push_state_) { |
||||
case ServerToClientPushState::kTrailersOnly: |
||||
case ServerToClientPushState::kIdle: |
||||
case ServerToClientPushState::kStart: |
||||
case ServerToClientPushState::kFinished: |
||||
if (server_trailing_metadata_state_ != |
||||
ServerTrailingMetadataState::kNotPushed) { |
||||
server_to_client_pull_state_ = |
||||
ServerToClientPullState::kProcessingServerTrailingMetadata; |
||||
server_to_client_pull_waiter_.Wake(); |
||||
return Empty{}; |
||||
} |
||||
ABSL_FALLTHROUGH_INTENDED; |
||||
case ServerToClientPushState::kPushedServerInitialMetadata: |
||||
case ServerToClientPushState:: |
||||
kPushedServerInitialMetadataAndPushedMessage: |
||||
case ServerToClientPushState::kPushedMessage: |
||||
server_to_client_push_waiter_.pending(); |
||||
return server_to_client_pull_waiter_.pending(); |
||||
} |
||||
break; |
||||
case ServerToClientPullState::kStarted: |
||||
case ServerToClientPullState::kUnstarted: |
||||
case ServerToClientPullState::kIdle: |
||||
if (server_trailing_metadata_state_ != |
||||
ServerTrailingMetadataState::kNotPushed) { |
||||
server_to_client_pull_state_ = |
||||
ServerToClientPullState::kProcessingServerTrailingMetadata; |
||||
server_to_client_pull_waiter_.Wake(); |
||||
return Empty{}; |
||||
} |
||||
return server_trailing_metadata_waiter_.pending(); |
||||
case ServerToClientPullState::kProcessingServerTrailingMetadata: |
||||
LOG(FATAL) << "PollServerTrailingMetadataAvailable called twice"; |
||||
case ServerToClientPullState::kTerminated: |
||||
return Empty{}; |
||||
} |
||||
Crash("Unreachable"); |
||||
} |
||||
|
||||
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void |
||||
CallState::FinishPullServerTrailingMetadata() { |
||||
GRPC_TRACE_LOG(call_state, INFO) |
||||
<< "[call_state] FinishPullServerTrailingMetadata: " |
||||
<< GRPC_DUMP_ARGS(this, server_trailing_metadata_state_, |
||||
server_trailing_metadata_waiter_.DebugString()); |
||||
switch (server_trailing_metadata_state_) { |
||||
case ServerTrailingMetadataState::kNotPushed: |
||||
LOG(FATAL) << "FinishPullServerTrailingMetadata called before " |
||||
"PollServerTrailingMetadataAvailable"; |
||||
case ServerTrailingMetadataState::kPushed: |
||||
server_trailing_metadata_state_ = ServerTrailingMetadataState::kPulled; |
||||
server_trailing_metadata_waiter_.Wake(); |
||||
break; |
||||
case ServerTrailingMetadataState::kPushedCancel: |
||||
server_trailing_metadata_state_ = |
||||
ServerTrailingMetadataState::kPulledCancel; |
||||
server_trailing_metadata_waiter_.Wake(); |
||||
break; |
||||
case ServerTrailingMetadataState::kPulled: |
||||
case ServerTrailingMetadataState::kPulledCancel: |
||||
LOG(FATAL) << "FinishPullServerTrailingMetadata called twice"; |
||||
} |
||||
} |
||||
|
||||
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline Poll<bool> |
||||
CallState::PollWasCancelled() { |
||||
GRPC_TRACE_LOG(call_state, INFO) |
||||
<< "[call_state] PollWasCancelled: " |
||||
<< GRPC_DUMP_ARGS(this, server_trailing_metadata_state_); |
||||
switch (server_trailing_metadata_state_) { |
||||
case ServerTrailingMetadataState::kNotPushed: |
||||
case ServerTrailingMetadataState::kPushed: |
||||
case ServerTrailingMetadataState::kPushedCancel: { |
||||
return server_trailing_metadata_waiter_.pending(); |
||||
} |
||||
case ServerTrailingMetadataState::kPulled: |
||||
return false; |
||||
case ServerTrailingMetadataState::kPulledCancel: |
||||
return true; |
||||
} |
||||
Crash("Unreachable"); |
||||
} |
||||
|
||||
} // namespace grpc_core
|
||||
|
||||
#endif // GRPC_SRC_CORE_LIB_TRANSPORT_CALL_STATE_H
|
@ -0,0 +1,310 @@ |
||||
// Copyright 2024 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include "src/core/lib/transport/call_state.h" |
||||
|
||||
#include <vector> |
||||
|
||||
#include "gmock/gmock.h" |
||||
#include "gtest/gtest.h" |
||||
|
||||
#include "test/core/promise/poll_matcher.h" |
||||
|
||||
using testing::Mock; |
||||
using testing::StrictMock; |
||||
|
||||
namespace grpc_core { |
||||
|
||||
namespace { |
||||
|
||||
// A mock activity that can be activated and deactivated.
|
||||
class MockActivity : public Activity, public Wakeable { |
||||
public: |
||||
MOCK_METHOD(void, WakeupRequested, ()); |
||||
|
||||
void ForceImmediateRepoll(WakeupMask /*mask*/) override { WakeupRequested(); } |
||||
void Orphan() override {} |
||||
Waker MakeOwningWaker() override { return Waker(this, 0); } |
||||
Waker MakeNonOwningWaker() override { return Waker(this, 0); } |
||||
void Wakeup(WakeupMask /*mask*/) override { WakeupRequested(); } |
||||
void WakeupAsync(WakeupMask /*mask*/) override { WakeupRequested(); } |
||||
void Drop(WakeupMask /*mask*/) override {} |
||||
std::string DebugTag() const override { return "MockActivity"; } |
||||
std::string ActivityDebugTag(WakeupMask /*mask*/) const override { |
||||
return DebugTag(); |
||||
} |
||||
|
||||
void Activate() { |
||||
if (scoped_activity_ == nullptr) { |
||||
scoped_activity_ = std::make_unique<ScopedActivity>(this); |
||||
} |
||||
} |
||||
|
||||
void Deactivate() { scoped_activity_.reset(); } |
||||
|
||||
private: |
||||
std::unique_ptr<ScopedActivity> scoped_activity_; |
||||
}; |
||||
|
||||
#define EXPECT_WAKEUP(activity, statement) \ |
||||
EXPECT_CALL((activity), WakeupRequested()).Times(::testing::AtLeast(1)); \
|
||||
statement; \
|
||||
Mock::VerifyAndClearExpectations(&(activity)); |
||||
|
||||
} // namespace
|
||||
|
||||
TEST(CallStateTest, NoOp) { CallState state; } |
||||
|
||||
TEST(CallStateTest, StartTwiceCrashes) { |
||||
CallState state; |
||||
state.Start(); |
||||
EXPECT_DEATH(state.Start(), ""); |
||||
} |
||||
|
||||
TEST(CallStateTest, PullServerInitialMetadataBlocksUntilStart) { |
||||
StrictMock<MockActivity> activity; |
||||
activity.Activate(); |
||||
CallState state; |
||||
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsPending()); |
||||
EXPECT_WAKEUP(activity, state.PushServerInitialMetadata()); |
||||
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsPending()); |
||||
EXPECT_WAKEUP(activity, state.Start()); |
||||
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsReady()); |
||||
} |
||||
|
||||
TEST(CallStateTest, PullClientInitialMetadata) { |
||||
StrictMock<MockActivity> activity; |
||||
activity.Activate(); |
||||
CallState state; |
||||
EXPECT_DEATH(state.FinishPullClientInitialMetadata(), ""); |
||||
state.BeginPullClientInitialMetadata(); |
||||
state.FinishPullClientInitialMetadata(); |
||||
} |
||||
|
||||
TEST(CallStateTest, ClientToServerMessagesWaitForInitialMetadata) { |
||||
StrictMock<MockActivity> activity; |
||||
activity.Activate(); |
||||
CallState state; |
||||
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsPending()); |
||||
state.BeginPushClientToServerMessage(); |
||||
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending()); |
||||
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsPending()); |
||||
state.BeginPullClientInitialMetadata(); |
||||
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending()); |
||||
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsPending()); |
||||
EXPECT_WAKEUP(activity, state.FinishPullClientInitialMetadata()); |
||||
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending()); |
||||
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsReady(true)); |
||||
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending()); |
||||
EXPECT_WAKEUP(activity, state.FinishPullClientToServerMessage()); |
||||
EXPECT_THAT(state.PollPushClientToServerMessage(), IsReady(Success{})); |
||||
} |
||||
|
||||
TEST(CallStateTest, RepeatedClientToServerMessagesWithHalfClose) { |
||||
StrictMock<MockActivity> activity; |
||||
activity.Activate(); |
||||
CallState state; |
||||
state.BeginPullClientInitialMetadata(); |
||||
state.FinishPullClientInitialMetadata(); |
||||
|
||||
// Message 0
|
||||
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsPending()); |
||||
EXPECT_WAKEUP(activity, state.BeginPushClientToServerMessage()); |
||||
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending()); |
||||
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsReady(true)); |
||||
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending()); |
||||
EXPECT_WAKEUP(activity, state.FinishPullClientToServerMessage()); |
||||
EXPECT_THAT(state.PollPushClientToServerMessage(), IsReady(Success{})); |
||||
|
||||
// Message 1
|
||||
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsPending()); |
||||
EXPECT_WAKEUP(activity, state.BeginPushClientToServerMessage()); |
||||
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending()); |
||||
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsReady(true)); |
||||
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending()); |
||||
EXPECT_WAKEUP(activity, state.FinishPullClientToServerMessage()); |
||||
EXPECT_THAT(state.PollPushClientToServerMessage(), IsReady(Success{})); |
||||
|
||||
// Message 2: push before polling
|
||||
state.BeginPushClientToServerMessage(); |
||||
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending()); |
||||
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsReady(true)); |
||||
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending()); |
||||
EXPECT_WAKEUP(activity, state.FinishPullClientToServerMessage()); |
||||
EXPECT_THAT(state.PollPushClientToServerMessage(), IsReady(Success{})); |
||||
|
||||
// Message 3: push before polling and half close
|
||||
state.BeginPushClientToServerMessage(); |
||||
state.ClientToServerHalfClose(); |
||||
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending()); |
||||
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsReady(true)); |
||||
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending()); |
||||
EXPECT_WAKEUP(activity, state.FinishPullClientToServerMessage()); |
||||
EXPECT_THAT(state.PollPushClientToServerMessage(), IsReady(Success{})); |
||||
|
||||
// ... and now we should see the half close
|
||||
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsReady(false)); |
||||
} |
||||
|
||||
TEST(CallStateTest, ImmediateClientToServerHalfClose) { |
||||
StrictMock<MockActivity> activity; |
||||
activity.Activate(); |
||||
CallState state; |
||||
state.BeginPullClientInitialMetadata(); |
||||
state.FinishPullClientInitialMetadata(); |
||||
state.ClientToServerHalfClose(); |
||||
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsReady(false)); |
||||
} |
||||
|
||||
TEST(CallStateTest, ServerToClientMessagesWaitForInitialMetadata) { |
||||
StrictMock<MockActivity> activity; |
||||
activity.Activate(); |
||||
CallState state; |
||||
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsPending()); |
||||
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsPending()); |
||||
EXPECT_WAKEUP(activity, state.Start()); |
||||
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsPending()); |
||||
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsPending()); |
||||
EXPECT_WAKEUP(activity, state.PushServerInitialMetadata()); |
||||
state.BeginPushServerToClientMessage(); |
||||
EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending()); |
||||
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsPending()); |
||||
EXPECT_WAKEUP(activity, |
||||
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), |
||||
IsReady(true))); |
||||
EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending()); |
||||
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsPending()); |
||||
EXPECT_WAKEUP(activity, state.FinishPullServerInitialMetadata()); |
||||
EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending()); |
||||
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsReady(true)); |
||||
EXPECT_WAKEUP(activity, state.FinishPullServerToClientMessage()); |
||||
EXPECT_THAT(state.PollPushServerToClientMessage(), IsReady(Success{})); |
||||
} |
||||
|
||||
TEST(CallStateTest, RepeatedServerToClientMessages) { |
||||
StrictMock<MockActivity> activity; |
||||
activity.Activate(); |
||||
CallState state; |
||||
state.PushServerInitialMetadata(); |
||||
state.Start(); |
||||
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsReady(true)); |
||||
state.FinishPullServerInitialMetadata(); |
||||
|
||||
// Message 0
|
||||
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsPending()); |
||||
EXPECT_WAKEUP(activity, state.BeginPushServerToClientMessage()); |
||||
EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending()); |
||||
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsReady(true)); |
||||
EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending()); |
||||
EXPECT_WAKEUP(activity, state.FinishPullServerToClientMessage()); |
||||
EXPECT_THAT(state.PollPushServerToClientMessage(), IsReady(Success{})); |
||||
|
||||
// Message 1
|
||||
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsPending()); |
||||
EXPECT_WAKEUP(activity, state.BeginPushServerToClientMessage()); |
||||
EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending()); |
||||
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsReady(true)); |
||||
EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending()); |
||||
EXPECT_WAKEUP(activity, state.FinishPullServerToClientMessage()); |
||||
EXPECT_THAT(state.PollPushServerToClientMessage(), IsReady(Success{})); |
||||
|
||||
// Message 2: push before polling
|
||||
state.BeginPushServerToClientMessage(); |
||||
EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending()); |
||||
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsReady(true)); |
||||
EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending()); |
||||
EXPECT_WAKEUP(activity, state.FinishPullServerToClientMessage()); |
||||
EXPECT_THAT(state.PollPushServerToClientMessage(), IsReady(Success{})); |
||||
|
||||
// Message 3: push before polling
|
||||
state.BeginPushServerToClientMessage(); |
||||
EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending()); |
||||
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsReady(true)); |
||||
EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending()); |
||||
EXPECT_WAKEUP(activity, state.FinishPullServerToClientMessage()); |
||||
EXPECT_THAT(state.PollPushServerToClientMessage(), IsReady(Success{})); |
||||
} |
||||
|
||||
TEST(CallStateTest, ReceiveTrailersOnly) { |
||||
StrictMock<MockActivity> activity; |
||||
activity.Activate(); |
||||
CallState state; |
||||
state.Start(); |
||||
state.PushServerTrailingMetadata(false); |
||||
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsReady(false)); |
||||
state.FinishPullServerInitialMetadata(); |
||||
EXPECT_THAT(state.PollServerTrailingMetadataAvailable(), IsReady()); |
||||
state.FinishPullServerTrailingMetadata(); |
||||
} |
||||
|
||||
TEST(CallStateTest, ReceiveTrailersOnlySkipsInitialMetadataOnUnstartedCalls) { |
||||
StrictMock<MockActivity> activity; |
||||
activity.Activate(); |
||||
CallState state; |
||||
state.PushServerTrailingMetadata(false); |
||||
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsReady(false)); |
||||
state.FinishPullServerInitialMetadata(); |
||||
EXPECT_THAT(state.PollServerTrailingMetadataAvailable(), IsReady()); |
||||
state.FinishPullServerTrailingMetadata(); |
||||
} |
||||
|
||||
TEST(CallStateTest, RecallNoCancellation) { |
||||
StrictMock<MockActivity> activity; |
||||
activity.Activate(); |
||||
CallState state; |
||||
state.Start(); |
||||
state.PushServerTrailingMetadata(false); |
||||
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsReady(false)); |
||||
state.FinishPullServerInitialMetadata(); |
||||
EXPECT_THAT(state.PollServerTrailingMetadataAvailable(), IsReady()); |
||||
EXPECT_THAT(state.PollWasCancelled(), IsPending()); |
||||
EXPECT_WAKEUP(activity, state.FinishPullServerTrailingMetadata()); |
||||
EXPECT_THAT(state.PollWasCancelled(), IsReady(false)); |
||||
} |
||||
|
||||
TEST(CallStateTest, RecallCancellation) { |
||||
StrictMock<MockActivity> activity; |
||||
activity.Activate(); |
||||
CallState state; |
||||
state.Start(); |
||||
state.PushServerTrailingMetadata(true); |
||||
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsReady(false)); |
||||
state.FinishPullServerInitialMetadata(); |
||||
EXPECT_THAT(state.PollServerTrailingMetadataAvailable(), IsReady()); |
||||
EXPECT_THAT(state.PollWasCancelled(), IsPending()); |
||||
EXPECT_WAKEUP(activity, state.FinishPullServerTrailingMetadata()); |
||||
EXPECT_THAT(state.PollWasCancelled(), IsReady(true)); |
||||
} |
||||
|
||||
TEST(CallStateTest, ReceiveTrailingMetadataAfterMessageRead) { |
||||
StrictMock<MockActivity> activity; |
||||
activity.Activate(); |
||||
CallState state; |
||||
state.Start(); |
||||
state.PushServerInitialMetadata(); |
||||
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsReady(true)); |
||||
state.FinishPullServerInitialMetadata(); |
||||
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsPending()); |
||||
EXPECT_WAKEUP(activity, state.PushServerTrailingMetadata(false)); |
||||
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsReady(false)); |
||||
EXPECT_THAT(state.PollServerTrailingMetadataAvailable(), IsReady()); |
||||
} |
||||
|
||||
} // namespace grpc_core
|
||||
|
||||
int main(int argc, char** argv) { |
||||
::testing::InitGoogleTest(&argc, argv); |
||||
grpc_tracer_init(); |
||||
return RUN_ALL_TESTS(); |
||||
} |
Loading…
Reference in new issue