mirror of https://github.com/grpc/grpc.git
commit
be0b6ee33c
58 changed files with 2484 additions and 1578 deletions
@ -0,0 +1,265 @@ |
||||
# xDS Bootstrap File Format in gRPC |
||||
|
||||
This document specifies the xDS bootstrap file format supported by gRPC. |
||||
|
||||
## Background |
||||
|
||||
gRPC expects the xDS bootstrap configuration to be specified as a JSON string. |
||||
The xDS bootstrap file location may be specified using the environment variable |
||||
`GRPC_XDS_BOOTSTRAP`. Alternatively, the bootstrap file contents may be |
||||
specified using the environment variable `GRPC_XDS_BOOTSTRAP_CONFIG`. If both |
||||
are specified, the former takes precendence. |
||||
|
||||
The xDS client inside of gRPC parses the bootstrap configuration specified by |
||||
one of the above means when it is created to configure itself. |
||||
|
||||
The following sections describe the bootstrap file format, including links to |
||||
gRFCs where support for appropriate fields was added. |
||||
|
||||
## File Format |
||||
|
||||
``` |
||||
{ |
||||
// The xDS server to talk to. The value is an ordered array of server |
||||
// configurations, to support failing over to a secondary xDS server if the |
||||
// primary is down. |
||||
// |
||||
// Prior to gRFC A71, all but the first entry was ignored. |
||||
"xds_servers": [ |
||||
{ |
||||
|
||||
// A target URI string suitable for creating a gRPC channel. |
||||
"server_uri": <string containing the target URI of xds server>, |
||||
|
||||
// List of channel creds; client will stop at the first type it |
||||
// supports. This field is required and must contain at least one |
||||
// channel creds type that the client supports. |
||||
// |
||||
// See section titled "Supported Channel Credentials". |
||||
"channel_creds": [ |
||||
{ |
||||
"type": <string containing channel cred type>, |
||||
|
||||
// The "config" field is optional; it may be missing if the |
||||
// credential type does not require config parameters. |
||||
"config": <JSON object containing config for the type> |
||||
} |
||||
], |
||||
|
||||
// A list of features supported by the server. New values will |
||||
// be added over time. For forward compatibility reasons, the |
||||
// client will ignore any entry in the list that it does not |
||||
// understand, regardless of type. |
||||
// |
||||
// See section titled "Supported Server Features". |
||||
"server_features": [ ... ] |
||||
} |
||||
], |
||||
|
||||
// Identifies a specific gRPC instance. |
||||
"node": { |
||||
|
||||
// Opaque identifier for the gRPC instance. |
||||
"id": <string>, |
||||
|
||||
// Identifier for the local service cluster where the gRPC instance is |
||||
// running. |
||||
"cluster": <string>, |
||||
|
||||
// Specifies where the gRPC instance is running. |
||||
"locality": { |
||||
"region": <string>, |
||||
"zone": <string>, |
||||
"sub_zone": <string>, |
||||
}, |
||||
|
||||
// Opaque metadata extending the node identifier. |
||||
"metadata": <JSON Object>, |
||||
} |
||||
|
||||
// Map of supported certificate providers, keyed by the provider instance |
||||
// name. |
||||
// See section titled "Supported certificate providers". |
||||
"certificate_providers": { |
||||
|
||||
// Certificate provider instance name, specified by the |
||||
// control plane, to fetch certificates from. |
||||
"<instance_name>": { |
||||
|
||||
// Name of the plugin implementation. |
||||
"plugin_name": <string containing plugin type>, |
||||
|
||||
// A JSON object containing the configuration for the plugin, whose schema |
||||
// is defined by the plugin. The "config" field is optional; it may be |
||||
// missing if the credential type does not require config parameters. |
||||
"config": <JSON object containing config for the type> |
||||
} |
||||
} |
||||
|
||||
// A template for the name of the Listener resource to subscribe to for a gRPC |
||||
// server. If the token `%s` is present in the string, all instances of the |
||||
// token will be replaced with the server's listening "IP:port" (e.g., |
||||
// "0.0.0.0:8080", "[::]:8080"). |
||||
"server_listener_resource_name_template": "example/resource/%s", |
||||
|
||||
// A template for the name of the Listener resource to subscribe to for a gRPC |
||||
// client channel. Used only when the channel is created with an "xds:" URI |
||||
// with no authority. |
||||
// |
||||
// If starts with "xdstp:", will be interpreted as a new-style name, in which |
||||
// case the authority of the URI will be used to select the relevant |
||||
// configuration in the "authorities" map. |
||||
// |
||||
// The token "%s", if present in this string, will be replaced with the |
||||
// service authority (i.e., the path part of the target URI used to create the |
||||
// gRPC channel). If the template starts with "xdstp:", the replaced string |
||||
// will be percent-encoded. In that case, the replacement string must include |
||||
// only characters allowed in a URI path as per RFC-3986 section 3.3 (which |
||||
// includes '/'), and all other characters must be percent-encoded. |
||||
// |
||||
// Defaults to "%s". |
||||
"client_default_listener_resource_name_template": <string>, |
||||
|
||||
// A map of authority name to corresponding configuration. |
||||
// |
||||
// This is used in the following cases: |
||||
// - A gRPC client channel is created using an "xds:" URI that includes |
||||
// an authority. |
||||
// - A gRPC client channel is created using an "xds:" URI with no |
||||
// authority, but the "client_default_listener_resource_name_template" |
||||
// field turns it into an "xdstp:" URI. |
||||
// - A gRPC server is created and the |
||||
// "server_listener_resource_name_template" field is an "xdstp:" URI. |
||||
// |
||||
// In any of those cases, it is an error if the specified authority is |
||||
// not present in this map. |
||||
"authorities": { |
||||
// Entries are keyed by authority name. |
||||
// Note: If a new-style resource name has no authority, we will use |
||||
// the empty string here as the key. |
||||
"<authority_name>": { |
||||
|
||||
// A template for the name of the Listener resource to subscribe |
||||
// to for a gRPC client channel. Used only when the channel is |
||||
// created using an "xds:" URI with this authority name. |
||||
// |
||||
// The token "%s", if present in this string, will be replaced |
||||
// with percent-encoded service authority (i.e., the path part of the |
||||
// target URI used to create the gRPC channel). The replacement string |
||||
// must include only characters allowed in a URI path as per RFC-3986 |
||||
// section 3.3 (which includes '/'), and all other characters must be |
||||
// percent-encoded. |
||||
// |
||||
// Must start with "xdstp://<authority_name>/". If it does not, |
||||
// that is considered a bootstrap file parsing error. |
||||
// |
||||
// If not present in the bootstrap file, defaults to |
||||
// "xdstp://<authority_name>/envoy.config.listener.v3.Listener/%s". |
||||
"client_listener_resource_name_template": <string>, |
||||
|
||||
// Ordered list of xDS servers to contact for this authority. |
||||
// Format is exactly the same as the top level "xds_servers" field. |
||||
// |
||||
// If the same server is listed in multiple authorities, the |
||||
// entries will be de-duped (i.e., resources for both authorities |
||||
// will be fetched on the same ADS stream). |
||||
// |
||||
// If not specified, the top-level server list is used. |
||||
"xds_servers": [ ... ] |
||||
} |
||||
} |
||||
} |
||||
``` |
||||
|
||||
### Supported Channel Credentials |
||||
|
||||
gRPC supports the following channel credentials as part of the `channel_creds` |
||||
field of `xds_servers`. |
||||
|
||||
#### Insecure credentials |
||||
|
||||
- **Type Name**: `insecure` |
||||
- **Config**: Accepts no configuration |
||||
|
||||
#### Google Default credentials |
||||
|
||||
- **Type Name**: `google_default` |
||||
- **Config**: Accepts no configuration |
||||
|
||||
#### mTLS credentials |
||||
|
||||
- **Type Name**: `tls` |
||||
- **Config**: As described in [gRFC A65](a65): |
||||
``` |
||||
{ |
||||
// Path to CA certificate file. |
||||
// If unset, system-wide root certs are used. |
||||
"ca_certificate_file": <string>, |
||||
|
||||
// Paths to identity certificate file and private key file. |
||||
// If either of these fields are set, both must be set. |
||||
// If set, mTLS will be used; if unset, normal TLS will be used. |
||||
"certificate_file": <string>, |
||||
"private_key_file": <string>, |
||||
|
||||
// How often to re-read the certificate files. |
||||
// Value is the JSON format described for a google.protobuf.Duration |
||||
// message in https://protobuf.dev/programming-guides/proto3/#json. |
||||
// If unset, defaults to "600s". |
||||
"refresh_interval": <string> |
||||
} |
||||
``` |
||||
|
||||
### Supported Certificate Provider Instances |
||||
|
||||
gRPC supports the following Certificate Provider instances as part of the |
||||
`certificate_providers` field: |
||||
|
||||
#### PEM file watcher |
||||
|
||||
- **Plugin Name**: `file_watcher` |
||||
- **Config**: As described in [gRFC A29](a29): |
||||
``` |
||||
{ |
||||
"certificate_file": "<path to the certificate file in PEM format>", |
||||
"private_key_file": "<path to private key file in PEM format>", |
||||
"ca_certificate_file": "<path to CA certificate file in PEM format>", |
||||
"refresh_interval": "<JSON form of google.protobuf.Duration>" |
||||
} |
||||
``` |
||||
|
||||
### Supported Server Features |
||||
|
||||
gRPC supports the following server features in the `server_features` field |
||||
inside `xds_servers`: |
||||
- `xds_v3`: Added in gRFC A30. Supported in older versions of gRPC. See |
||||
[here](grpc_xds_features.md) for when gRPC added support for xDS transport |
||||
protocol v3, and when support for xDS transport protocol v2 was dropped. |
||||
- `ignore_resource_deletion`: Added in [gRFC A53](a53) |
||||
|
||||
|
||||
### When were fields added? |
||||
|
||||
| Bootstrap Field | Relevant gRFCs |
||||
------------------|--------------- |
||||
`xds_servers` | [A27](a27), [A71](a71) |
||||
`google_default` channel credentials | [A27](a27) |
||||
`insecure` channel credentials | [A27](a27) |
||||
`node` | [A27](a27) |
||||
`certificate_providers` | [A29](a29) |
||||
`file_watcher`certificate provider | [A29](a29) |
||||
`xds_servers.server_features` | [A30](a30) |
||||
`server_listener_resource_name_template` | [A36](a36), [A47](a47) |
||||
`client_default_listener_resource_name_template` | [A47](a47) |
||||
`authorities` | [A47](a47) |
||||
`tls` channel credentials | [A65](a65) |
||||
|
||||
|
||||
[a27]: https://github.com/grpc/proposal/blob/master/A27-xds-global-load-balancing.md |
||||
[a29]: https://github.com/grpc/proposal/blob/master/A29-xds-tls-security.md#file_watcher-certificate-provider |
||||
[a30]: https://github.com/grpc/proposal/blob/master/A30-xds-v3.md |
||||
[a36]: https://github.com/grpc/proposal/blob/master/A36-xds-for-servers.md |
||||
[a47]: https://github.com/grpc/proposal/blob/master/A47-xds-federation.md |
||||
[a53]: https://github.com/grpc/proposal/blob/master/A53-xds-ignore-resource-deletion.md |
||||
[a65]: https://github.com/grpc/proposal/blob/master/A65-xds-mtls-creds-in-bootstrap.md#proposal |
||||
[a71]: https://github.com/grpc/proposal/blob/master/A71-xds-fallback.md |
@ -0,0 +1,39 @@ |
||||
// Copyright 2024 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include "src/core/lib/transport/call_state.h" |
||||
|
||||
namespace grpc_core { |
||||
|
||||
std::string CallState::DebugString() const { |
||||
return absl::StrCat( |
||||
"client_to_server_pull_state:", client_to_server_pull_state_, |
||||
" client_to_server_push_state:", client_to_server_push_state_, |
||||
" server_to_client_pull_state:", server_to_client_pull_state_, |
||||
" server_to_client_message_push_state:", server_to_client_push_state_, |
||||
" server_trailing_metadata_state:", server_trailing_metadata_state_, |
||||
client_to_server_push_waiter_.DebugString(), |
||||
" server_to_client_push_waiter:", |
||||
server_to_client_push_waiter_.DebugString(), |
||||
" client_to_server_pull_waiter:", |
||||
client_to_server_pull_waiter_.DebugString(), |
||||
" server_to_client_pull_waiter:", |
||||
server_to_client_pull_waiter_.DebugString(), |
||||
" server_trailing_metadata_waiter:", |
||||
server_trailing_metadata_waiter_.DebugString()); |
||||
} |
||||
|
||||
static_assert(sizeof(CallState) <= 16, "CallState too large"); |
||||
|
||||
} // namespace grpc_core
|
@ -0,0 +1,957 @@ |
||||
// Copyright 2024 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#ifndef GRPC_SRC_CORE_LIB_TRANSPORT_CALL_STATE_H |
||||
#define GRPC_SRC_CORE_LIB_TRANSPORT_CALL_STATE_H |
||||
|
||||
#include "absl/types/optional.h" |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/lib/debug/trace.h" |
||||
#include "src/core/lib/gprpp/crash.h" |
||||
#include "src/core/lib/promise/activity.h" |
||||
#include "src/core/lib/promise/poll.h" |
||||
#include "src/core/lib/promise/status_flag.h" |
||||
|
||||
namespace grpc_core { |
||||
|
||||
class CallState { |
||||
public: |
||||
CallState(); |
||||
// Start the call: allows pulls to proceed
|
||||
void Start(); |
||||
// PUSH: client -> server
|
||||
void BeginPushClientToServerMessage(); |
||||
Poll<StatusFlag> PollPushClientToServerMessage(); |
||||
void ClientToServerHalfClose(); |
||||
// PULL: client -> server
|
||||
void BeginPullClientInitialMetadata(); |
||||
void FinishPullClientInitialMetadata(); |
||||
Poll<ValueOrFailure<bool>> PollPullClientToServerMessageAvailable(); |
||||
void FinishPullClientToServerMessage(); |
||||
// PUSH: server -> client
|
||||
StatusFlag PushServerInitialMetadata(); |
||||
void BeginPushServerToClientMessage(); |
||||
Poll<StatusFlag> PollPushServerToClientMessage(); |
||||
bool PushServerTrailingMetadata(bool cancel); |
||||
// PULL: server -> client
|
||||
Poll<bool> PollPullServerInitialMetadataAvailable(); |
||||
void FinishPullServerInitialMetadata(); |
||||
Poll<ValueOrFailure<bool>> PollPullServerToClientMessageAvailable(); |
||||
void FinishPullServerToClientMessage(); |
||||
Poll<Empty> PollServerTrailingMetadataAvailable(); |
||||
void FinishPullServerTrailingMetadata(); |
||||
Poll<bool> PollWasCancelled(); |
||||
// Debug
|
||||
std::string DebugString() const; |
||||
|
||||
friend std::ostream& operator<<(std::ostream& out, |
||||
const CallState& call_state) { |
||||
return out << call_state.DebugString(); |
||||
} |
||||
|
||||
private: |
||||
enum class ClientToServerPullState : uint16_t { |
||||
// Ready to read: client initial metadata is there, but not yet processed
|
||||
kBegin, |
||||
// Processing client initial metadata
|
||||
kProcessingClientInitialMetadata, |
||||
// Main call loop: not reading
|
||||
kIdle, |
||||
// Main call loop: reading but no message available
|
||||
kReading, |
||||
// Main call loop: processing one message
|
||||
kProcessingClientToServerMessage, |
||||
// Processing complete
|
||||
kTerminated, |
||||
}; |
||||
static const char* ClientToServerPullStateString( |
||||
ClientToServerPullState state) { |
||||
switch (state) { |
||||
case ClientToServerPullState::kBegin: |
||||
return "Begin"; |
||||
case ClientToServerPullState::kProcessingClientInitialMetadata: |
||||
return "ProcessingClientInitialMetadata"; |
||||
case ClientToServerPullState::kIdle: |
||||
return "Idle"; |
||||
case ClientToServerPullState::kReading: |
||||
return "Reading"; |
||||
case ClientToServerPullState::kProcessingClientToServerMessage: |
||||
return "ProcessingClientToServerMessage"; |
||||
case ClientToServerPullState::kTerminated: |
||||
return "Terminated"; |
||||
} |
||||
} |
||||
template <typename Sink> |
||||
friend void AbslStringify(Sink& out, ClientToServerPullState state) { |
||||
out.Append(ClientToServerPullStateString(state)); |
||||
} |
||||
friend std::ostream& operator<<(std::ostream& out, |
||||
ClientToServerPullState state) { |
||||
return out << ClientToServerPullStateString(state); |
||||
} |
||||
enum class ClientToServerPushState : uint16_t { |
||||
kIdle, |
||||
kPushedMessage, |
||||
kPushedHalfClose, |
||||
kPushedMessageAndHalfClosed, |
||||
kFinished, |
||||
}; |
||||
static const char* ClientToServerPushStateString( |
||||
ClientToServerPushState state) { |
||||
switch (state) { |
||||
case ClientToServerPushState::kIdle: |
||||
return "Idle"; |
||||
case ClientToServerPushState::kPushedMessage: |
||||
return "PushedMessage"; |
||||
case ClientToServerPushState::kPushedHalfClose: |
||||
return "PushedHalfClose"; |
||||
case ClientToServerPushState::kPushedMessageAndHalfClosed: |
||||
return "PushedMessageAndHalfClosed"; |
||||
case ClientToServerPushState::kFinished: |
||||
return "Finished"; |
||||
} |
||||
} |
||||
template <typename Sink> |
||||
friend void AbslStringify(Sink& out, ClientToServerPushState state) { |
||||
out.Append(ClientToServerPushStateString(state)); |
||||
} |
||||
friend std::ostream& operator<<(std::ostream& out, |
||||
ClientToServerPushState state) { |
||||
return out << ClientToServerPushStateString(state); |
||||
} |
||||
enum class ServerToClientPullState : uint16_t { |
||||
// Not yet started: cannot read
|
||||
kUnstarted, |
||||
kUnstartedReading, |
||||
kStarted, |
||||
kStartedReading, |
||||
// Processing server initial metadata
|
||||
kProcessingServerInitialMetadata, |
||||
kProcessingServerInitialMetadataReading, |
||||
// Main call loop: not reading
|
||||
kIdle, |
||||
// Main call loop: reading but no message available
|
||||
kReading, |
||||
// Main call loop: processing one message
|
||||
kProcessingServerToClientMessage, |
||||
// Processing server trailing metadata
|
||||
kProcessingServerTrailingMetadata, |
||||
kTerminated, |
||||
}; |
||||
static const char* ServerToClientPullStateString( |
||||
ServerToClientPullState state) { |
||||
switch (state) { |
||||
case ServerToClientPullState::kUnstarted: |
||||
return "Unstarted"; |
||||
case ServerToClientPullState::kUnstartedReading: |
||||
return "UnstartedReading"; |
||||
case ServerToClientPullState::kStarted: |
||||
return "Started"; |
||||
case ServerToClientPullState::kStartedReading: |
||||
return "StartedReading"; |
||||
case ServerToClientPullState::kProcessingServerInitialMetadata: |
||||
return "ProcessingServerInitialMetadata"; |
||||
case ServerToClientPullState::kProcessingServerInitialMetadataReading: |
||||
return "ProcessingServerInitialMetadataReading"; |
||||
case ServerToClientPullState::kIdle: |
||||
return "Idle"; |
||||
case ServerToClientPullState::kReading: |
||||
return "Reading"; |
||||
case ServerToClientPullState::kProcessingServerToClientMessage: |
||||
return "ProcessingServerToClientMessage"; |
||||
case ServerToClientPullState::kProcessingServerTrailingMetadata: |
||||
return "ProcessingServerTrailingMetadata"; |
||||
case ServerToClientPullState::kTerminated: |
||||
return "Terminated"; |
||||
} |
||||
} |
||||
template <typename Sink> |
||||
friend void AbslStringify(Sink& out, ServerToClientPullState state) { |
||||
out.Append(ServerToClientPullStateString(state)); |
||||
} |
||||
friend std::ostream& operator<<(std::ostream& out, |
||||
ServerToClientPullState state) { |
||||
return out << ServerToClientPullStateString(state); |
||||
} |
||||
enum class ServerToClientPushState : uint16_t { |
||||
kStart, |
||||
kPushedServerInitialMetadata, |
||||
kPushedServerInitialMetadataAndPushedMessage, |
||||
kTrailersOnly, |
||||
kIdle, |
||||
kPushedMessage, |
||||
kFinished, |
||||
}; |
||||
static const char* ServerToClientPushStateString( |
||||
ServerToClientPushState state) { |
||||
switch (state) { |
||||
case ServerToClientPushState::kStart: |
||||
return "Start"; |
||||
case ServerToClientPushState::kPushedServerInitialMetadata: |
||||
return "PushedServerInitialMetadata"; |
||||
case ServerToClientPushState:: |
||||
kPushedServerInitialMetadataAndPushedMessage: |
||||
return "PushedServerInitialMetadataAndPushedMessage"; |
||||
case ServerToClientPushState::kTrailersOnly: |
||||
return "TrailersOnly"; |
||||
case ServerToClientPushState::kIdle: |
||||
return "Idle"; |
||||
case ServerToClientPushState::kPushedMessage: |
||||
return "PushedMessage"; |
||||
case ServerToClientPushState::kFinished: |
||||
return "Finished"; |
||||
} |
||||
} |
||||
template <typename Sink> |
||||
friend void AbslStringify(Sink& out, ServerToClientPushState state) { |
||||
out.Append(ServerToClientPushStateString(state)); |
||||
} |
||||
friend std::ostream& operator<<(std::ostream& out, |
||||
ServerToClientPushState state) { |
||||
return out << ServerToClientPushStateString(state); |
||||
} |
||||
enum class ServerTrailingMetadataState : uint16_t { |
||||
kNotPushed, |
||||
kPushed, |
||||
kPushedCancel, |
||||
kPulled, |
||||
kPulledCancel, |
||||
}; |
||||
static const char* ServerTrailingMetadataStateString( |
||||
ServerTrailingMetadataState state) { |
||||
switch (state) { |
||||
case ServerTrailingMetadataState::kNotPushed: |
||||
return "NotPushed"; |
||||
case ServerTrailingMetadataState::kPushed: |
||||
return "Pushed"; |
||||
case ServerTrailingMetadataState::kPushedCancel: |
||||
return "PushedCancel"; |
||||
case ServerTrailingMetadataState::kPulled: |
||||
return "Pulled"; |
||||
case ServerTrailingMetadataState::kPulledCancel: |
||||
return "PulledCancel"; |
||||
} |
||||
} |
||||
template <typename Sink> |
||||
friend void AbslStringify(Sink& out, ServerTrailingMetadataState state) { |
||||
out.Append(ServerTrailingMetadataStateString(state)); |
||||
} |
||||
friend std::ostream& operator<<(std::ostream& out, |
||||
ServerTrailingMetadataState state) { |
||||
return out << ServerTrailingMetadataStateString(state); |
||||
} |
||||
ClientToServerPullState client_to_server_pull_state_ : 3; |
||||
ClientToServerPushState client_to_server_push_state_ : 3; |
||||
ServerToClientPullState server_to_client_pull_state_ : 4; |
||||
ServerToClientPushState server_to_client_push_state_ : 3; |
||||
ServerTrailingMetadataState server_trailing_metadata_state_ : 3; |
||||
IntraActivityWaiter client_to_server_pull_waiter_; |
||||
IntraActivityWaiter server_to_client_pull_waiter_; |
||||
IntraActivityWaiter client_to_server_push_waiter_; |
||||
IntraActivityWaiter server_to_client_push_waiter_; |
||||
IntraActivityWaiter server_trailing_metadata_waiter_; |
||||
}; |
||||
|
||||
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline CallState::CallState() |
||||
: client_to_server_pull_state_(ClientToServerPullState::kBegin), |
||||
client_to_server_push_state_(ClientToServerPushState::kIdle), |
||||
server_to_client_pull_state_(ServerToClientPullState::kUnstarted), |
||||
server_to_client_push_state_(ServerToClientPushState::kStart), |
||||
server_trailing_metadata_state_(ServerTrailingMetadataState::kNotPushed) { |
||||
} |
||||
|
||||
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void CallState::Start() { |
||||
GRPC_TRACE_LOG(call_state, INFO) |
||||
<< "[call_state] Start: " |
||||
<< GRPC_DUMP_ARGS(this, server_to_client_pull_state_); |
||||
switch (server_to_client_pull_state_) { |
||||
case ServerToClientPullState::kUnstarted: |
||||
server_to_client_pull_state_ = ServerToClientPullState::kStarted; |
||||
server_to_client_pull_waiter_.Wake(); |
||||
break; |
||||
case ServerToClientPullState::kUnstartedReading: |
||||
server_to_client_pull_state_ = ServerToClientPullState::kStartedReading; |
||||
server_to_client_pull_waiter_.Wake(); |
||||
break; |
||||
case ServerToClientPullState::kStarted: |
||||
case ServerToClientPullState::kStartedReading: |
||||
case ServerToClientPullState::kProcessingServerInitialMetadata: |
||||
case ServerToClientPullState::kProcessingServerInitialMetadataReading: |
||||
case ServerToClientPullState::kIdle: |
||||
case ServerToClientPullState::kReading: |
||||
case ServerToClientPullState::kProcessingServerToClientMessage: |
||||
LOG(FATAL) << "Start called twice"; |
||||
case ServerToClientPullState::kProcessingServerTrailingMetadata: |
||||
case ServerToClientPullState::kTerminated: |
||||
break; |
||||
} |
||||
} |
||||
|
||||
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void |
||||
CallState::BeginPushClientToServerMessage() { |
||||
GRPC_TRACE_LOG(call_state, INFO) |
||||
<< "[call_state] BeginPushClientToServerMessage: " |
||||
<< GRPC_DUMP_ARGS(this, client_to_server_push_state_); |
||||
switch (client_to_server_push_state_) { |
||||
case ClientToServerPushState::kIdle: |
||||
client_to_server_push_state_ = ClientToServerPushState::kPushedMessage; |
||||
client_to_server_push_waiter_.Wake(); |
||||
break; |
||||
case ClientToServerPushState::kPushedMessage: |
||||
case ClientToServerPushState::kPushedMessageAndHalfClosed: |
||||
LOG(FATAL) << "PushClientToServerMessage called twice concurrently"; |
||||
break; |
||||
case ClientToServerPushState::kPushedHalfClose: |
||||
LOG(FATAL) << "PushClientToServerMessage called after half-close"; |
||||
break; |
||||
case ClientToServerPushState::kFinished: |
||||
break; |
||||
} |
||||
} |
||||
|
||||
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline Poll<StatusFlag> |
||||
CallState::PollPushClientToServerMessage() { |
||||
GRPC_TRACE_LOG(call_state, INFO) |
||||
<< "[call_state] PollPushClientToServerMessage: " |
||||
<< GRPC_DUMP_ARGS(this, client_to_server_push_state_); |
||||
switch (client_to_server_push_state_) { |
||||
case ClientToServerPushState::kIdle: |
||||
case ClientToServerPushState::kPushedHalfClose: |
||||
return Success{}; |
||||
case ClientToServerPushState::kPushedMessage: |
||||
case ClientToServerPushState::kPushedMessageAndHalfClosed: |
||||
return client_to_server_push_waiter_.pending(); |
||||
case ClientToServerPushState::kFinished: |
||||
return Failure{}; |
||||
} |
||||
Crash("Unreachable"); |
||||
} |
||||
|
||||
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void |
||||
CallState::ClientToServerHalfClose() { |
||||
GRPC_TRACE_LOG(call_state, INFO) |
||||
<< "[call_state] ClientToServerHalfClose: " |
||||
<< GRPC_DUMP_ARGS(this, client_to_server_push_state_); |
||||
switch (client_to_server_push_state_) { |
||||
case ClientToServerPushState::kIdle: |
||||
client_to_server_push_state_ = ClientToServerPushState::kPushedHalfClose; |
||||
client_to_server_push_waiter_.Wake(); |
||||
break; |
||||
case ClientToServerPushState::kPushedMessage: |
||||
client_to_server_push_state_ = |
||||
ClientToServerPushState::kPushedMessageAndHalfClosed; |
||||
break; |
||||
case ClientToServerPushState::kPushedHalfClose: |
||||
case ClientToServerPushState::kPushedMessageAndHalfClosed: |
||||
LOG(FATAL) << "ClientToServerHalfClose called twice"; |
||||
break; |
||||
case ClientToServerPushState::kFinished: |
||||
break; |
||||
} |
||||
} |
||||
|
||||
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void |
||||
CallState::BeginPullClientInitialMetadata() { |
||||
GRPC_TRACE_LOG(call_state, INFO) |
||||
<< "[call_state] BeginPullClientInitialMetadata: " |
||||
<< GRPC_DUMP_ARGS(this, client_to_server_pull_state_); |
||||
switch (client_to_server_pull_state_) { |
||||
case ClientToServerPullState::kBegin: |
||||
client_to_server_pull_state_ = |
||||
ClientToServerPullState::kProcessingClientInitialMetadata; |
||||
break; |
||||
case ClientToServerPullState::kProcessingClientInitialMetadata: |
||||
case ClientToServerPullState::kIdle: |
||||
case ClientToServerPullState::kReading: |
||||
case ClientToServerPullState::kProcessingClientToServerMessage: |
||||
LOG(FATAL) << "BeginPullClientInitialMetadata called twice"; |
||||
break; |
||||
case ClientToServerPullState::kTerminated: |
||||
break; |
||||
} |
||||
} |
||||
|
||||
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void |
||||
CallState::FinishPullClientInitialMetadata() { |
||||
GRPC_TRACE_LOG(call_state, INFO) |
||||
<< "[call_state] FinishPullClientInitialMetadata: " |
||||
<< GRPC_DUMP_ARGS(this, client_to_server_pull_state_); |
||||
switch (client_to_server_pull_state_) { |
||||
case ClientToServerPullState::kBegin: |
||||
LOG(FATAL) << "FinishPullClientInitialMetadata called before Begin"; |
||||
break; |
||||
case ClientToServerPullState::kProcessingClientInitialMetadata: |
||||
client_to_server_pull_state_ = ClientToServerPullState::kIdle; |
||||
client_to_server_pull_waiter_.Wake(); |
||||
break; |
||||
case ClientToServerPullState::kIdle: |
||||
case ClientToServerPullState::kReading: |
||||
case ClientToServerPullState::kProcessingClientToServerMessage: |
||||
LOG(FATAL) << "Out of order FinishPullClientInitialMetadata"; |
||||
break; |
||||
case ClientToServerPullState::kTerminated: |
||||
break; |
||||
} |
||||
} |
||||
|
||||
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline Poll<ValueOrFailure<bool>> |
||||
CallState::PollPullClientToServerMessageAvailable() { |
||||
GRPC_TRACE_LOG(call_state, INFO) |
||||
<< "[call_state] PollPullClientToServerMessageAvailable: " |
||||
<< GRPC_DUMP_ARGS(this, client_to_server_pull_state_, |
||||
client_to_server_push_state_); |
||||
switch (client_to_server_pull_state_) { |
||||
case ClientToServerPullState::kBegin: |
||||
case ClientToServerPullState::kProcessingClientInitialMetadata: |
||||
return client_to_server_pull_waiter_.pending(); |
||||
case ClientToServerPullState::kIdle: |
||||
client_to_server_pull_state_ = ClientToServerPullState::kReading; |
||||
ABSL_FALLTHROUGH_INTENDED; |
||||
case ClientToServerPullState::kReading: |
||||
break; |
||||
case ClientToServerPullState::kProcessingClientToServerMessage: |
||||
LOG(FATAL) << "PollPullClientToServerMessageAvailable called while " |
||||
"processing a message"; |
||||
break; |
||||
case ClientToServerPullState::kTerminated: |
||||
return Failure{}; |
||||
} |
||||
DCHECK_EQ(client_to_server_pull_state_, ClientToServerPullState::kReading); |
||||
switch (client_to_server_push_state_) { |
||||
case ClientToServerPushState::kIdle: |
||||
return client_to_server_push_waiter_.pending(); |
||||
case ClientToServerPushState::kPushedMessage: |
||||
case ClientToServerPushState::kPushedMessageAndHalfClosed: |
||||
client_to_server_pull_state_ = |
||||
ClientToServerPullState::kProcessingClientToServerMessage; |
||||
return true; |
||||
case ClientToServerPushState::kPushedHalfClose: |
||||
return false; |
||||
case ClientToServerPushState::kFinished: |
||||
client_to_server_pull_state_ = ClientToServerPullState::kTerminated; |
||||
return Failure{}; |
||||
} |
||||
Crash("Unreachable"); |
||||
} |
||||
|
||||
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void |
||||
CallState::FinishPullClientToServerMessage() { |
||||
GRPC_TRACE_LOG(call_state, INFO) |
||||
<< "[call_state] FinishPullClientToServerMessage: " |
||||
<< GRPC_DUMP_ARGS(this, client_to_server_pull_state_, |
||||
client_to_server_push_state_); |
||||
switch (client_to_server_pull_state_) { |
||||
case ClientToServerPullState::kBegin: |
||||
case ClientToServerPullState::kProcessingClientInitialMetadata: |
||||
LOG(FATAL) << "FinishPullClientToServerMessage called before Begin"; |
||||
break; |
||||
case ClientToServerPullState::kIdle: |
||||
LOG(FATAL) << "FinishPullClientToServerMessage called twice"; |
||||
break; |
||||
case ClientToServerPullState::kReading: |
||||
LOG(FATAL) << "FinishPullClientToServerMessage called before " |
||||
"PollPullClientToServerMessageAvailable"; |
||||
break; |
||||
case ClientToServerPullState::kProcessingClientToServerMessage: |
||||
client_to_server_pull_state_ = ClientToServerPullState::kIdle; |
||||
client_to_server_pull_waiter_.Wake(); |
||||
break; |
||||
case ClientToServerPullState::kTerminated: |
||||
break; |
||||
} |
||||
switch (client_to_server_push_state_) { |
||||
case ClientToServerPushState::kPushedMessage: |
||||
client_to_server_push_state_ = ClientToServerPushState::kIdle; |
||||
client_to_server_push_waiter_.Wake(); |
||||
break; |
||||
case ClientToServerPushState::kIdle: |
||||
case ClientToServerPushState::kPushedHalfClose: |
||||
LOG(FATAL) << "FinishPullClientToServerMessage called without a message"; |
||||
break; |
||||
case ClientToServerPushState::kPushedMessageAndHalfClosed: |
||||
client_to_server_push_state_ = ClientToServerPushState::kPushedHalfClose; |
||||
client_to_server_push_waiter_.Wake(); |
||||
break; |
||||
case ClientToServerPushState::kFinished: |
||||
break; |
||||
} |
||||
} |
||||
|
||||
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline StatusFlag |
||||
CallState::PushServerInitialMetadata() { |
||||
GRPC_TRACE_LOG(call_state, INFO) |
||||
<< "[call_state] PushServerInitialMetadata: " |
||||
<< GRPC_DUMP_ARGS(this, server_to_client_push_state_, |
||||
server_trailing_metadata_state_); |
||||
if (server_trailing_metadata_state_ != |
||||
ServerTrailingMetadataState::kNotPushed) { |
||||
return Failure{}; |
||||
} |
||||
CHECK_EQ(server_to_client_push_state_, ServerToClientPushState::kStart); |
||||
server_to_client_push_state_ = |
||||
ServerToClientPushState::kPushedServerInitialMetadata; |
||||
server_to_client_push_waiter_.Wake(); |
||||
return Success{}; |
||||
} |
||||
|
||||
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void |
||||
CallState::BeginPushServerToClientMessage() { |
||||
GRPC_TRACE_LOG(call_state, INFO) |
||||
<< "[call_state] BeginPushServerToClientMessage: " |
||||
<< GRPC_DUMP_ARGS(this, server_to_client_push_state_); |
||||
switch (server_to_client_push_state_) { |
||||
case ServerToClientPushState::kStart: |
||||
LOG(FATAL) << "BeginPushServerToClientMessage called before " |
||||
"PushServerInitialMetadata"; |
||||
break; |
||||
case ServerToClientPushState::kPushedServerInitialMetadata: |
||||
server_to_client_push_state_ = |
||||
ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage; |
||||
break; |
||||
case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage: |
||||
case ServerToClientPushState::kPushedMessage: |
||||
LOG(FATAL) << "BeginPushServerToClientMessage called twice concurrently"; |
||||
break; |
||||
case ServerToClientPushState::kTrailersOnly: |
||||
// Will fail in poll.
|
||||
break; |
||||
case ServerToClientPushState::kIdle: |
||||
server_to_client_push_state_ = ServerToClientPushState::kPushedMessage; |
||||
server_to_client_push_waiter_.Wake(); |
||||
break; |
||||
case ServerToClientPushState::kFinished: |
||||
break; |
||||
} |
||||
} |
||||
|
||||
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline Poll<StatusFlag> |
||||
CallState::PollPushServerToClientMessage() { |
||||
GRPC_TRACE_LOG(call_state, INFO) |
||||
<< "[call_state] PollPushServerToClientMessage: " |
||||
<< GRPC_DUMP_ARGS(this, server_to_client_push_state_); |
||||
switch (server_to_client_push_state_) { |
||||
case ServerToClientPushState::kStart: |
||||
case ServerToClientPushState::kPushedServerInitialMetadata: |
||||
LOG(FATAL) << "PollPushServerToClientMessage called before " |
||||
<< "PushServerInitialMetadata"; |
||||
case ServerToClientPushState::kTrailersOnly: |
||||
return false; |
||||
case ServerToClientPushState::kPushedMessage: |
||||
case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage: |
||||
return server_to_client_push_waiter_.pending(); |
||||
case ServerToClientPushState::kIdle: |
||||
return Success{}; |
||||
case ServerToClientPushState::kFinished: |
||||
return Failure{}; |
||||
} |
||||
Crash("Unreachable"); |
||||
} |
||||
|
||||
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline bool |
||||
CallState::PushServerTrailingMetadata(bool cancel) { |
||||
GRPC_TRACE_LOG(call_state, INFO) |
||||
<< "[call_state] PushServerTrailingMetadata: " |
||||
<< GRPC_DUMP_ARGS(this, cancel, server_trailing_metadata_state_, |
||||
server_to_client_push_state_, |
||||
client_to_server_push_state_, |
||||
server_trailing_metadata_waiter_.DebugString()); |
||||
if (server_trailing_metadata_state_ != |
||||
ServerTrailingMetadataState::kNotPushed) { |
||||
return false; |
||||
} |
||||
server_trailing_metadata_state_ = |
||||
cancel ? ServerTrailingMetadataState::kPushedCancel |
||||
: ServerTrailingMetadataState::kPushed; |
||||
server_trailing_metadata_waiter_.Wake(); |
||||
switch (server_to_client_push_state_) { |
||||
case ServerToClientPushState::kStart: |
||||
server_to_client_push_state_ = ServerToClientPushState::kTrailersOnly; |
||||
server_to_client_push_waiter_.Wake(); |
||||
break; |
||||
case ServerToClientPushState::kPushedServerInitialMetadata: |
||||
case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage: |
||||
case ServerToClientPushState::kPushedMessage: |
||||
if (cancel) { |
||||
server_to_client_push_state_ = ServerToClientPushState::kFinished; |
||||
server_to_client_push_waiter_.Wake(); |
||||
} |
||||
break; |
||||
case ServerToClientPushState::kIdle: |
||||
if (cancel) { |
||||
server_to_client_push_state_ = ServerToClientPushState::kFinished; |
||||
server_to_client_push_waiter_.Wake(); |
||||
} |
||||
break; |
||||
case ServerToClientPushState::kFinished: |
||||
case ServerToClientPushState::kTrailersOnly: |
||||
break; |
||||
} |
||||
switch (client_to_server_push_state_) { |
||||
case ClientToServerPushState::kIdle: |
||||
client_to_server_push_state_ = ClientToServerPushState::kFinished; |
||||
client_to_server_push_waiter_.Wake(); |
||||
break; |
||||
case ClientToServerPushState::kPushedMessage: |
||||
case ClientToServerPushState::kPushedMessageAndHalfClosed: |
||||
client_to_server_push_state_ = ClientToServerPushState::kFinished; |
||||
client_to_server_push_waiter_.Wake(); |
||||
break; |
||||
case ClientToServerPushState::kPushedHalfClose: |
||||
case ClientToServerPushState::kFinished: |
||||
break; |
||||
} |
||||
return true; |
||||
} |
||||
|
||||
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline Poll<bool> |
||||
CallState::PollPullServerInitialMetadataAvailable() { |
||||
GRPC_TRACE_LOG(call_state, INFO) |
||||
<< "[call_state] PollPullServerInitialMetadataAvailable: " |
||||
<< GRPC_DUMP_ARGS(this, server_to_client_pull_state_, |
||||
server_to_client_push_state_); |
||||
bool reading; |
||||
switch (server_to_client_pull_state_) { |
||||
case ServerToClientPullState::kUnstarted: |
||||
case ServerToClientPullState::kUnstartedReading: |
||||
if (server_to_client_push_state_ == |
||||
ServerToClientPushState::kTrailersOnly) { |
||||
server_to_client_pull_state_ = ServerToClientPullState::kTerminated; |
||||
return false; |
||||
} |
||||
server_to_client_push_waiter_.pending(); |
||||
return server_to_client_pull_waiter_.pending(); |
||||
case ServerToClientPullState::kStartedReading: |
||||
reading = true; |
||||
break; |
||||
case ServerToClientPullState::kStarted: |
||||
reading = false; |
||||
break; |
||||
case ServerToClientPullState::kProcessingServerInitialMetadata: |
||||
case ServerToClientPullState::kProcessingServerInitialMetadataReading: |
||||
case ServerToClientPullState::kIdle: |
||||
case ServerToClientPullState::kReading: |
||||
case ServerToClientPullState::kProcessingServerToClientMessage: |
||||
case ServerToClientPullState::kProcessingServerTrailingMetadata: |
||||
LOG(FATAL) << "PollPullServerInitialMetadataAvailable called twice"; |
||||
case ServerToClientPullState::kTerminated: |
||||
return false; |
||||
} |
||||
DCHECK(server_to_client_pull_state_ == ServerToClientPullState::kStarted || |
||||
server_to_client_pull_state_ == |
||||
ServerToClientPullState::kStartedReading) |
||||
<< server_to_client_pull_state_; |
||||
switch (server_to_client_push_state_) { |
||||
case ServerToClientPushState::kStart: |
||||
return server_to_client_push_waiter_.pending(); |
||||
case ServerToClientPushState::kPushedServerInitialMetadata: |
||||
case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage: |
||||
server_to_client_pull_state_ = |
||||
reading |
||||
? ServerToClientPullState::kProcessingServerInitialMetadataReading |
||||
: ServerToClientPullState::kProcessingServerInitialMetadata; |
||||
server_to_client_pull_waiter_.Wake(); |
||||
return true; |
||||
case ServerToClientPushState::kIdle: |
||||
case ServerToClientPushState::kPushedMessage: |
||||
LOG(FATAL) |
||||
<< "PollPullServerInitialMetadataAvailable after metadata processed"; |
||||
case ServerToClientPushState::kFinished: |
||||
server_to_client_pull_state_ = ServerToClientPullState::kTerminated; |
||||
server_to_client_pull_waiter_.Wake(); |
||||
return false; |
||||
case ServerToClientPushState::kTrailersOnly: |
||||
return false; |
||||
} |
||||
Crash("Unreachable"); |
||||
} |
||||
|
||||
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void |
||||
CallState::FinishPullServerInitialMetadata() { |
||||
GRPC_TRACE_LOG(call_state, INFO) |
||||
<< "[call_state] FinishPullServerInitialMetadata: " |
||||
<< GRPC_DUMP_ARGS(this, server_to_client_pull_state_); |
||||
switch (server_to_client_pull_state_) { |
||||
case ServerToClientPullState::kUnstarted: |
||||
case ServerToClientPullState::kUnstartedReading: |
||||
LOG(FATAL) << "FinishPullServerInitialMetadata called before Start"; |
||||
case ServerToClientPullState::kStarted: |
||||
case ServerToClientPullState::kStartedReading: |
||||
CHECK_EQ(server_to_client_push_state_, |
||||
ServerToClientPushState::kTrailersOnly); |
||||
return; |
||||
case ServerToClientPullState::kProcessingServerInitialMetadata: |
||||
server_to_client_pull_state_ = ServerToClientPullState::kIdle; |
||||
server_to_client_pull_waiter_.Wake(); |
||||
break; |
||||
case ServerToClientPullState::kProcessingServerInitialMetadataReading: |
||||
server_to_client_pull_state_ = ServerToClientPullState::kReading; |
||||
server_to_client_pull_waiter_.Wake(); |
||||
break; |
||||
case ServerToClientPullState::kIdle: |
||||
case ServerToClientPullState::kReading: |
||||
case ServerToClientPullState::kProcessingServerToClientMessage: |
||||
case ServerToClientPullState::kProcessingServerTrailingMetadata: |
||||
LOG(FATAL) << "Out of order FinishPullServerInitialMetadata"; |
||||
case ServerToClientPullState::kTerminated: |
||||
return; |
||||
} |
||||
DCHECK(server_to_client_pull_state_ == ServerToClientPullState::kIdle || |
||||
server_to_client_pull_state_ == ServerToClientPullState::kReading) |
||||
<< server_to_client_pull_state_; |
||||
switch (server_to_client_push_state_) { |
||||
case ServerToClientPushState::kStart: |
||||
LOG(FATAL) << "FinishPullServerInitialMetadata called before initial " |
||||
"metadata consumed"; |
||||
case ServerToClientPushState::kPushedServerInitialMetadata: |
||||
server_to_client_push_state_ = ServerToClientPushState::kIdle; |
||||
server_to_client_push_waiter_.Wake(); |
||||
break; |
||||
case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage: |
||||
server_to_client_push_state_ = ServerToClientPushState::kPushedMessage; |
||||
server_to_client_push_waiter_.Wake(); |
||||
break; |
||||
case ServerToClientPushState::kIdle: |
||||
case ServerToClientPushState::kPushedMessage: |
||||
case ServerToClientPushState::kTrailersOnly: |
||||
case ServerToClientPushState::kFinished: |
||||
LOG(FATAL) << "FinishPullServerInitialMetadata called twice"; |
||||
} |
||||
} |
||||
|
||||
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline Poll<ValueOrFailure<bool>> |
||||
CallState::PollPullServerToClientMessageAvailable() { |
||||
GRPC_TRACE_LOG(call_state, INFO) |
||||
<< "[call_state] PollPullServerToClientMessageAvailable: " |
||||
<< GRPC_DUMP_ARGS(this, server_to_client_pull_state_, |
||||
server_to_client_push_state_, |
||||
server_trailing_metadata_state_); |
||||
switch (server_to_client_pull_state_) { |
||||
case ServerToClientPullState::kUnstarted: |
||||
server_to_client_pull_state_ = ServerToClientPullState::kUnstartedReading; |
||||
return server_to_client_pull_waiter_.pending(); |
||||
case ServerToClientPullState::kProcessingServerInitialMetadata: |
||||
server_to_client_pull_state_ = |
||||
ServerToClientPullState::kProcessingServerInitialMetadataReading; |
||||
return server_to_client_pull_waiter_.pending(); |
||||
case ServerToClientPullState::kUnstartedReading: |
||||
case ServerToClientPullState::kProcessingServerInitialMetadataReading: |
||||
return server_to_client_pull_waiter_.pending(); |
||||
case ServerToClientPullState::kStarted: |
||||
server_to_client_pull_state_ = ServerToClientPullState::kStartedReading; |
||||
ABSL_FALLTHROUGH_INTENDED; |
||||
case ServerToClientPullState::kStartedReading: |
||||
if (server_to_client_push_state_ == |
||||
ServerToClientPushState::kTrailersOnly) { |
||||
return false; |
||||
} |
||||
return server_to_client_pull_waiter_.pending(); |
||||
case ServerToClientPullState::kIdle: |
||||
server_to_client_pull_state_ = ServerToClientPullState::kReading; |
||||
ABSL_FALLTHROUGH_INTENDED; |
||||
case ServerToClientPullState::kReading: |
||||
break; |
||||
case ServerToClientPullState::kProcessingServerToClientMessage: |
||||
LOG(FATAL) << "PollPullServerToClientMessageAvailable called while " |
||||
"processing a message"; |
||||
case ServerToClientPullState::kProcessingServerTrailingMetadata: |
||||
LOG(FATAL) << "PollPullServerToClientMessageAvailable called while " |
||||
"processing trailing metadata"; |
||||
case ServerToClientPullState::kTerminated: |
||||
return Failure{}; |
||||
} |
||||
DCHECK_EQ(server_to_client_pull_state_, ServerToClientPullState::kReading); |
||||
switch (server_to_client_push_state_) { |
||||
case ServerToClientPushState::kStart: |
||||
case ServerToClientPushState::kPushedServerInitialMetadata: |
||||
case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage: |
||||
return server_to_client_push_waiter_.pending(); |
||||
case ServerToClientPushState::kIdle: |
||||
if (server_trailing_metadata_state_ != |
||||
ServerTrailingMetadataState::kNotPushed) { |
||||
return false; |
||||
} |
||||
server_trailing_metadata_waiter_.pending(); |
||||
return server_to_client_push_waiter_.pending(); |
||||
case ServerToClientPushState::kTrailersOnly: |
||||
DCHECK_NE(server_trailing_metadata_state_, |
||||
ServerTrailingMetadataState::kNotPushed); |
||||
return false; |
||||
case ServerToClientPushState::kPushedMessage: |
||||
server_to_client_pull_state_ = |
||||
ServerToClientPullState::kProcessingServerToClientMessage; |
||||
server_to_client_pull_waiter_.Wake(); |
||||
return true; |
||||
case ServerToClientPushState::kFinished: |
||||
server_to_client_pull_state_ = ServerToClientPullState::kTerminated; |
||||
server_to_client_pull_waiter_.Wake(); |
||||
return Failure{}; |
||||
} |
||||
Crash("Unreachable"); |
||||
} |
||||
|
||||
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void |
||||
CallState::FinishPullServerToClientMessage() { |
||||
GRPC_TRACE_LOG(call_state, INFO) |
||||
<< "[call_state] FinishPullServerToClientMessage: " |
||||
<< GRPC_DUMP_ARGS(this, server_to_client_pull_state_, |
||||
server_to_client_push_state_); |
||||
switch (server_to_client_pull_state_) { |
||||
case ServerToClientPullState::kUnstarted: |
||||
case ServerToClientPullState::kUnstartedReading: |
||||
case ServerToClientPullState::kStarted: |
||||
case ServerToClientPullState::kStartedReading: |
||||
case ServerToClientPullState::kProcessingServerInitialMetadata: |
||||
case ServerToClientPullState::kProcessingServerInitialMetadataReading: |
||||
LOG(FATAL) |
||||
<< "FinishPullServerToClientMessage called before metadata available"; |
||||
case ServerToClientPullState::kIdle: |
||||
LOG(FATAL) << "FinishPullServerToClientMessage called twice"; |
||||
case ServerToClientPullState::kReading: |
||||
LOG(FATAL) << "FinishPullServerToClientMessage called before " |
||||
<< "PollPullServerToClientMessageAvailable"; |
||||
case ServerToClientPullState::kProcessingServerToClientMessage: |
||||
server_to_client_pull_state_ = ServerToClientPullState::kIdle; |
||||
server_to_client_pull_waiter_.Wake(); |
||||
break; |
||||
case ServerToClientPullState::kProcessingServerTrailingMetadata: |
||||
LOG(FATAL) << "FinishPullServerToClientMessage called while processing " |
||||
"trailing metadata"; |
||||
case ServerToClientPullState::kTerminated: |
||||
break; |
||||
} |
||||
switch (server_to_client_push_state_) { |
||||
case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage: |
||||
case ServerToClientPushState::kPushedServerInitialMetadata: |
||||
case ServerToClientPushState::kStart: |
||||
LOG(FATAL) << "FinishPullServerToClientMessage called before initial " |
||||
"metadata consumed"; |
||||
case ServerToClientPushState::kTrailersOnly: |
||||
LOG(FATAL) << "FinishPullServerToClientMessage called after " |
||||
"PushServerTrailingMetadata"; |
||||
case ServerToClientPushState::kPushedMessage: |
||||
server_to_client_push_state_ = ServerToClientPushState::kIdle; |
||||
server_to_client_push_waiter_.Wake(); |
||||
break; |
||||
case ServerToClientPushState::kIdle: |
||||
LOG(FATAL) << "FinishPullServerToClientMessage called without a message"; |
||||
case ServerToClientPushState::kFinished: |
||||
break; |
||||
} |
||||
} |
||||
|
||||
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline Poll<Empty> |
||||
CallState::PollServerTrailingMetadataAvailable() { |
||||
GRPC_TRACE_LOG(call_state, INFO) |
||||
<< "[call_state] PollServerTrailingMetadataAvailable: " |
||||
<< GRPC_DUMP_ARGS(this, server_to_client_pull_state_, |
||||
server_to_client_push_state_, |
||||
server_trailing_metadata_state_, |
||||
server_trailing_metadata_waiter_.DebugString()); |
||||
switch (server_to_client_pull_state_) { |
||||
case ServerToClientPullState::kProcessingServerInitialMetadata: |
||||
case ServerToClientPullState::kProcessingServerToClientMessage: |
||||
case ServerToClientPullState::kProcessingServerInitialMetadataReading: |
||||
case ServerToClientPullState::kUnstartedReading: |
||||
return server_to_client_pull_waiter_.pending(); |
||||
case ServerToClientPullState::kStartedReading: |
||||
case ServerToClientPullState::kReading: |
||||
switch (server_to_client_push_state_) { |
||||
case ServerToClientPushState::kTrailersOnly: |
||||
case ServerToClientPushState::kIdle: |
||||
case ServerToClientPushState::kStart: |
||||
case ServerToClientPushState::kFinished: |
||||
if (server_trailing_metadata_state_ != |
||||
ServerTrailingMetadataState::kNotPushed) { |
||||
server_to_client_pull_state_ = |
||||
ServerToClientPullState::kProcessingServerTrailingMetadata; |
||||
server_to_client_pull_waiter_.Wake(); |
||||
return Empty{}; |
||||
} |
||||
ABSL_FALLTHROUGH_INTENDED; |
||||
case ServerToClientPushState::kPushedServerInitialMetadata: |
||||
case ServerToClientPushState:: |
||||
kPushedServerInitialMetadataAndPushedMessage: |
||||
case ServerToClientPushState::kPushedMessage: |
||||
server_to_client_push_waiter_.pending(); |
||||
return server_to_client_pull_waiter_.pending(); |
||||
} |
||||
break; |
||||
case ServerToClientPullState::kStarted: |
||||
case ServerToClientPullState::kUnstarted: |
||||
case ServerToClientPullState::kIdle: |
||||
if (server_trailing_metadata_state_ != |
||||
ServerTrailingMetadataState::kNotPushed) { |
||||
server_to_client_pull_state_ = |
||||
ServerToClientPullState::kProcessingServerTrailingMetadata; |
||||
server_to_client_pull_waiter_.Wake(); |
||||
return Empty{}; |
||||
} |
||||
return server_trailing_metadata_waiter_.pending(); |
||||
case ServerToClientPullState::kProcessingServerTrailingMetadata: |
||||
LOG(FATAL) << "PollServerTrailingMetadataAvailable called twice"; |
||||
case ServerToClientPullState::kTerminated: |
||||
return Empty{}; |
||||
} |
||||
Crash("Unreachable"); |
||||
} |
||||
|
||||
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void |
||||
CallState::FinishPullServerTrailingMetadata() { |
||||
GRPC_TRACE_LOG(call_state, INFO) |
||||
<< "[call_state] FinishPullServerTrailingMetadata: " |
||||
<< GRPC_DUMP_ARGS(this, server_trailing_metadata_state_, |
||||
server_trailing_metadata_waiter_.DebugString()); |
||||
switch (server_trailing_metadata_state_) { |
||||
case ServerTrailingMetadataState::kNotPushed: |
||||
LOG(FATAL) << "FinishPullServerTrailingMetadata called before " |
||||
"PollServerTrailingMetadataAvailable"; |
||||
case ServerTrailingMetadataState::kPushed: |
||||
server_trailing_metadata_state_ = ServerTrailingMetadataState::kPulled; |
||||
server_trailing_metadata_waiter_.Wake(); |
||||
break; |
||||
case ServerTrailingMetadataState::kPushedCancel: |
||||
server_trailing_metadata_state_ = |
||||
ServerTrailingMetadataState::kPulledCancel; |
||||
server_trailing_metadata_waiter_.Wake(); |
||||
break; |
||||
case ServerTrailingMetadataState::kPulled: |
||||
case ServerTrailingMetadataState::kPulledCancel: |
||||
LOG(FATAL) << "FinishPullServerTrailingMetadata called twice"; |
||||
} |
||||
} |
||||
|
||||
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline Poll<bool> |
||||
CallState::PollWasCancelled() { |
||||
GRPC_TRACE_LOG(call_state, INFO) |
||||
<< "[call_state] PollWasCancelled: " |
||||
<< GRPC_DUMP_ARGS(this, server_trailing_metadata_state_); |
||||
switch (server_trailing_metadata_state_) { |
||||
case ServerTrailingMetadataState::kNotPushed: |
||||
case ServerTrailingMetadataState::kPushed: |
||||
case ServerTrailingMetadataState::kPushedCancel: { |
||||
return server_trailing_metadata_waiter_.pending(); |
||||
} |
||||
case ServerTrailingMetadataState::kPulled: |
||||
return false; |
||||
case ServerTrailingMetadataState::kPulledCancel: |
||||
return true; |
||||
} |
||||
Crash("Unreachable"); |
||||
} |
||||
|
||||
} // namespace grpc_core
|
||||
|
||||
#endif // GRPC_SRC_CORE_LIB_TRANSPORT_CALL_STATE_H
|
@ -0,0 +1,310 @@ |
||||
// Copyright 2024 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include "src/core/lib/transport/call_state.h" |
||||
|
||||
#include <vector> |
||||
|
||||
#include "gmock/gmock.h" |
||||
#include "gtest/gtest.h" |
||||
|
||||
#include "test/core/promise/poll_matcher.h" |
||||
|
||||
using testing::Mock; |
||||
using testing::StrictMock; |
||||
|
||||
namespace grpc_core { |
||||
|
||||
namespace { |
||||
|
||||
// A mock activity that can be activated and deactivated.
|
||||
class MockActivity : public Activity, public Wakeable { |
||||
public: |
||||
MOCK_METHOD(void, WakeupRequested, ()); |
||||
|
||||
void ForceImmediateRepoll(WakeupMask /*mask*/) override { WakeupRequested(); } |
||||
void Orphan() override {} |
||||
Waker MakeOwningWaker() override { return Waker(this, 0); } |
||||
Waker MakeNonOwningWaker() override { return Waker(this, 0); } |
||||
void Wakeup(WakeupMask /*mask*/) override { WakeupRequested(); } |
||||
void WakeupAsync(WakeupMask /*mask*/) override { WakeupRequested(); } |
||||
void Drop(WakeupMask /*mask*/) override {} |
||||
std::string DebugTag() const override { return "MockActivity"; } |
||||
std::string ActivityDebugTag(WakeupMask /*mask*/) const override { |
||||
return DebugTag(); |
||||
} |
||||
|
||||
void Activate() { |
||||
if (scoped_activity_ == nullptr) { |
||||
scoped_activity_ = std::make_unique<ScopedActivity>(this); |
||||
} |
||||
} |
||||
|
||||
void Deactivate() { scoped_activity_.reset(); } |
||||
|
||||
private: |
||||
std::unique_ptr<ScopedActivity> scoped_activity_; |
||||
}; |
||||
|
||||
#define EXPECT_WAKEUP(activity, statement) \ |
||||
EXPECT_CALL((activity), WakeupRequested()).Times(::testing::AtLeast(1)); \
|
||||
statement; \
|
||||
Mock::VerifyAndClearExpectations(&(activity)); |
||||
|
||||
} // namespace
|
||||
|
||||
TEST(CallStateTest, NoOp) { CallState state; } |
||||
|
||||
TEST(CallStateTest, StartTwiceCrashes) { |
||||
CallState state; |
||||
state.Start(); |
||||
EXPECT_DEATH(state.Start(), ""); |
||||
} |
||||
|
||||
TEST(CallStateTest, PullServerInitialMetadataBlocksUntilStart) { |
||||
StrictMock<MockActivity> activity; |
||||
activity.Activate(); |
||||
CallState state; |
||||
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsPending()); |
||||
EXPECT_WAKEUP(activity, state.PushServerInitialMetadata()); |
||||
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsPending()); |
||||
EXPECT_WAKEUP(activity, state.Start()); |
||||
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsReady()); |
||||
} |
||||
|
||||
TEST(CallStateTest, PullClientInitialMetadata) { |
||||
StrictMock<MockActivity> activity; |
||||
activity.Activate(); |
||||
CallState state; |
||||
EXPECT_DEATH(state.FinishPullClientInitialMetadata(), ""); |
||||
state.BeginPullClientInitialMetadata(); |
||||
state.FinishPullClientInitialMetadata(); |
||||
} |
||||
|
||||
TEST(CallStateTest, ClientToServerMessagesWaitForInitialMetadata) { |
||||
StrictMock<MockActivity> activity; |
||||
activity.Activate(); |
||||
CallState state; |
||||
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsPending()); |
||||
state.BeginPushClientToServerMessage(); |
||||
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending()); |
||||
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsPending()); |
||||
state.BeginPullClientInitialMetadata(); |
||||
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending()); |
||||
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsPending()); |
||||
EXPECT_WAKEUP(activity, state.FinishPullClientInitialMetadata()); |
||||
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending()); |
||||
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsReady(true)); |
||||
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending()); |
||||
EXPECT_WAKEUP(activity, state.FinishPullClientToServerMessage()); |
||||
EXPECT_THAT(state.PollPushClientToServerMessage(), IsReady(Success{})); |
||||
} |
||||
|
||||
TEST(CallStateTest, RepeatedClientToServerMessagesWithHalfClose) { |
||||
StrictMock<MockActivity> activity; |
||||
activity.Activate(); |
||||
CallState state; |
||||
state.BeginPullClientInitialMetadata(); |
||||
state.FinishPullClientInitialMetadata(); |
||||
|
||||
// Message 0
|
||||
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsPending()); |
||||
EXPECT_WAKEUP(activity, state.BeginPushClientToServerMessage()); |
||||
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending()); |
||||
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsReady(true)); |
||||
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending()); |
||||
EXPECT_WAKEUP(activity, state.FinishPullClientToServerMessage()); |
||||
EXPECT_THAT(state.PollPushClientToServerMessage(), IsReady(Success{})); |
||||
|
||||
// Message 1
|
||||
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsPending()); |
||||
EXPECT_WAKEUP(activity, state.BeginPushClientToServerMessage()); |
||||
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending()); |
||||
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsReady(true)); |
||||
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending()); |
||||
EXPECT_WAKEUP(activity, state.FinishPullClientToServerMessage()); |
||||
EXPECT_THAT(state.PollPushClientToServerMessage(), IsReady(Success{})); |
||||
|
||||
// Message 2: push before polling
|
||||
state.BeginPushClientToServerMessage(); |
||||
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending()); |
||||
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsReady(true)); |
||||
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending()); |
||||
EXPECT_WAKEUP(activity, state.FinishPullClientToServerMessage()); |
||||
EXPECT_THAT(state.PollPushClientToServerMessage(), IsReady(Success{})); |
||||
|
||||
// Message 3: push before polling and half close
|
||||
state.BeginPushClientToServerMessage(); |
||||
state.ClientToServerHalfClose(); |
||||
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending()); |
||||
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsReady(true)); |
||||
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending()); |
||||
EXPECT_WAKEUP(activity, state.FinishPullClientToServerMessage()); |
||||
EXPECT_THAT(state.PollPushClientToServerMessage(), IsReady(Success{})); |
||||
|
||||
// ... and now we should see the half close
|
||||
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsReady(false)); |
||||
} |
||||
|
||||
TEST(CallStateTest, ImmediateClientToServerHalfClose) { |
||||
StrictMock<MockActivity> activity; |
||||
activity.Activate(); |
||||
CallState state; |
||||
state.BeginPullClientInitialMetadata(); |
||||
state.FinishPullClientInitialMetadata(); |
||||
state.ClientToServerHalfClose(); |
||||
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsReady(false)); |
||||
} |
||||
|
||||
TEST(CallStateTest, ServerToClientMessagesWaitForInitialMetadata) { |
||||
StrictMock<MockActivity> activity; |
||||
activity.Activate(); |
||||
CallState state; |
||||
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsPending()); |
||||
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsPending()); |
||||
EXPECT_WAKEUP(activity, state.Start()); |
||||
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsPending()); |
||||
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsPending()); |
||||
EXPECT_WAKEUP(activity, state.PushServerInitialMetadata()); |
||||
state.BeginPushServerToClientMessage(); |
||||
EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending()); |
||||
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsPending()); |
||||
EXPECT_WAKEUP(activity, |
||||
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), |
||||
IsReady(true))); |
||||
EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending()); |
||||
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsPending()); |
||||
EXPECT_WAKEUP(activity, state.FinishPullServerInitialMetadata()); |
||||
EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending()); |
||||
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsReady(true)); |
||||
EXPECT_WAKEUP(activity, state.FinishPullServerToClientMessage()); |
||||
EXPECT_THAT(state.PollPushServerToClientMessage(), IsReady(Success{})); |
||||
} |
||||
|
||||
TEST(CallStateTest, RepeatedServerToClientMessages) { |
||||
StrictMock<MockActivity> activity; |
||||
activity.Activate(); |
||||
CallState state; |
||||
state.PushServerInitialMetadata(); |
||||
state.Start(); |
||||
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsReady(true)); |
||||
state.FinishPullServerInitialMetadata(); |
||||
|
||||
// Message 0
|
||||
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsPending()); |
||||
EXPECT_WAKEUP(activity, state.BeginPushServerToClientMessage()); |
||||
EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending()); |
||||
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsReady(true)); |
||||
EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending()); |
||||
EXPECT_WAKEUP(activity, state.FinishPullServerToClientMessage()); |
||||
EXPECT_THAT(state.PollPushServerToClientMessage(), IsReady(Success{})); |
||||
|
||||
// Message 1
|
||||
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsPending()); |
||||
EXPECT_WAKEUP(activity, state.BeginPushServerToClientMessage()); |
||||
EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending()); |
||||
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsReady(true)); |
||||
EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending()); |
||||
EXPECT_WAKEUP(activity, state.FinishPullServerToClientMessage()); |
||||
EXPECT_THAT(state.PollPushServerToClientMessage(), IsReady(Success{})); |
||||
|
||||
// Message 2: push before polling
|
||||
state.BeginPushServerToClientMessage(); |
||||
EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending()); |
||||
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsReady(true)); |
||||
EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending()); |
||||
EXPECT_WAKEUP(activity, state.FinishPullServerToClientMessage()); |
||||
EXPECT_THAT(state.PollPushServerToClientMessage(), IsReady(Success{})); |
||||
|
||||
// Message 3: push before polling
|
||||
state.BeginPushServerToClientMessage(); |
||||
EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending()); |
||||
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsReady(true)); |
||||
EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending()); |
||||
EXPECT_WAKEUP(activity, state.FinishPullServerToClientMessage()); |
||||
EXPECT_THAT(state.PollPushServerToClientMessage(), IsReady(Success{})); |
||||
} |
||||
|
||||
TEST(CallStateTest, ReceiveTrailersOnly) { |
||||
StrictMock<MockActivity> activity; |
||||
activity.Activate(); |
||||
CallState state; |
||||
state.Start(); |
||||
state.PushServerTrailingMetadata(false); |
||||
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsReady(false)); |
||||
state.FinishPullServerInitialMetadata(); |
||||
EXPECT_THAT(state.PollServerTrailingMetadataAvailable(), IsReady()); |
||||
state.FinishPullServerTrailingMetadata(); |
||||
} |
||||
|
||||
TEST(CallStateTest, ReceiveTrailersOnlySkipsInitialMetadataOnUnstartedCalls) { |
||||
StrictMock<MockActivity> activity; |
||||
activity.Activate(); |
||||
CallState state; |
||||
state.PushServerTrailingMetadata(false); |
||||
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsReady(false)); |
||||
state.FinishPullServerInitialMetadata(); |
||||
EXPECT_THAT(state.PollServerTrailingMetadataAvailable(), IsReady()); |
||||
state.FinishPullServerTrailingMetadata(); |
||||
} |
||||
|
||||
TEST(CallStateTest, RecallNoCancellation) { |
||||
StrictMock<MockActivity> activity; |
||||
activity.Activate(); |
||||
CallState state; |
||||
state.Start(); |
||||
state.PushServerTrailingMetadata(false); |
||||
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsReady(false)); |
||||
state.FinishPullServerInitialMetadata(); |
||||
EXPECT_THAT(state.PollServerTrailingMetadataAvailable(), IsReady()); |
||||
EXPECT_THAT(state.PollWasCancelled(), IsPending()); |
||||
EXPECT_WAKEUP(activity, state.FinishPullServerTrailingMetadata()); |
||||
EXPECT_THAT(state.PollWasCancelled(), IsReady(false)); |
||||
} |
||||
|
||||
TEST(CallStateTest, RecallCancellation) { |
||||
StrictMock<MockActivity> activity; |
||||
activity.Activate(); |
||||
CallState state; |
||||
state.Start(); |
||||
state.PushServerTrailingMetadata(true); |
||||
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsReady(false)); |
||||
state.FinishPullServerInitialMetadata(); |
||||
EXPECT_THAT(state.PollServerTrailingMetadataAvailable(), IsReady()); |
||||
EXPECT_THAT(state.PollWasCancelled(), IsPending()); |
||||
EXPECT_WAKEUP(activity, state.FinishPullServerTrailingMetadata()); |
||||
EXPECT_THAT(state.PollWasCancelled(), IsReady(true)); |
||||
} |
||||
|
||||
TEST(CallStateTest, ReceiveTrailingMetadataAfterMessageRead) { |
||||
StrictMock<MockActivity> activity; |
||||
activity.Activate(); |
||||
CallState state; |
||||
state.Start(); |
||||
state.PushServerInitialMetadata(); |
||||
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsReady(true)); |
||||
state.FinishPullServerInitialMetadata(); |
||||
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsPending()); |
||||
EXPECT_WAKEUP(activity, state.PushServerTrailingMetadata(false)); |
||||
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsReady(false)); |
||||
EXPECT_THAT(state.PollServerTrailingMetadataAvailable(), IsReady()); |
||||
} |
||||
|
||||
} // namespace grpc_core
|
||||
|
||||
int main(int argc, char** argv) { |
||||
::testing::InitGoogleTest(&argc, argv); |
||||
grpc_tracer_init(); |
||||
return RUN_ALL_TESTS(); |
||||
} |
Loading…
Reference in new issue