Merge branch 'master' into posix-ee-client-flip

pull/35669/head
AJ Heller 1 year ago
commit 0d563ed3a1
  1. 12
      bazel/experiments.bzl
  2. 2
      src/core/lib/experiments/rollouts.yaml
  3. 12
      src/python/grpcio/grpc/aio/_base_call.py
  4. 59
      test/cpp/interop/xds_interop_client.cc
  5. 7
      test/cpp/interop/xds_interop_server_lib.cc

@ -70,7 +70,6 @@ EXPERIMENTS = {
"v3_compression_filter",
],
"core_end2end_test": [
"promise_based_client_call",
"promise_based_server_call",
"work_serializer_dispatch",
],
@ -89,9 +88,6 @@ EXPERIMENTS = {
"tcp_frame_size_tuning",
"tcp_rcv_lowat",
],
"lame_client_test": [
"promise_based_client_call",
],
"lb_unit_test": [
"work_serializer_dispatch",
],
@ -149,7 +145,6 @@ EXPERIMENTS = {
"v3_compression_filter",
],
"core_end2end_test": [
"promise_based_client_call",
"promise_based_server_call",
"work_serializer_dispatch",
],
@ -168,9 +163,6 @@ EXPERIMENTS = {
"tcp_frame_size_tuning",
"tcp_rcv_lowat",
],
"lame_client_test": [
"promise_based_client_call",
],
"lb_unit_test": [
"work_serializer_dispatch",
],
@ -225,7 +217,6 @@ EXPERIMENTS = {
"v3_compression_filter",
],
"core_end2end_test": [
"promise_based_client_call",
"promise_based_server_call",
"work_serializer_dispatch",
],
@ -244,9 +235,6 @@ EXPERIMENTS = {
"tcp_frame_size_tuning",
"tcp_rcv_lowat",
],
"lame_client_test": [
"promise_based_client_call",
],
"lb_unit_test": [
"work_serializer_dispatch",
],

@ -87,7 +87,7 @@
- name: pick_first_happy_eyeballs
default: true
- name: promise_based_client_call
default: false
default: broken
- name: promise_based_server_call
default: false
- name: registered_method_lookup_in_transport

@ -169,6 +169,9 @@ class UnaryStreamCall(
Read operations must be serialized when called from multiple
coroutines.
Note that the iterator and read/write APIs may not be mixed on
a single RPC.
Returns:
A response message, or an `grpc.aio.EOF` to indicate the end of the
stream.
@ -182,6 +185,9 @@ class StreamUnaryCall(
async def write(self, request: RequestType) -> None:
"""Writes one message to the stream.
Note that the iterator and read/write APIs may not be mixed on
a single RPC.
Raises:
An RpcError exception if the write failed.
"""
@ -223,6 +229,9 @@ class StreamStreamCall(
Read operations must be serialized when called from multiple
coroutines.
Note that the iterator and read/write APIs may not be mixed on
a single RPC.
Returns:
A response message, or an `grpc.aio.EOF` to indicate the end of the
stream.
@ -232,6 +241,9 @@ class StreamStreamCall(
async def write(self, request: RequestType) -> None:
"""Writes one message to the stream.
Note that the iterator and read/write APIs may not be mixed on
a single RPC.
Raises:
An RpcError exception if the write failed.
"""

@ -67,6 +67,12 @@ ABSL_FLAG(int32_t, stats_port, 50052,
"Port to expose peer distribution stats service.");
ABSL_FLAG(std::string, rpc, "UnaryCall",
"a comma separated list of rpc methods.");
ABSL_FLAG(int32_t, request_payload_size, 0,
"Set the SimpleRequest.payload.body to a string of repeated 0 (zero) "
"ASCII characters of the given size in bytes.");
ABSL_FLAG(int32_t, response_payload_size, 0,
"Ask the server to respond with SimpleResponse.payload.body of the "
"given length (may not be implemented on the server).");
ABSL_FLAG(std::string, metadata, "", "metadata to send with the RPC.");
ABSL_FLAG(std::string, expect_status, "OK",
"RPC status for the test RPC to be considered successful");
@ -117,6 +123,9 @@ struct RpcConfig {
ClientConfigureRequest::RpcType type;
std::vector<std::pair<std::string, std::string>> metadata;
int timeout_sec = 0;
std::string request_payload;
int request_payload_size = 0;
int response_payload_size = 0;
};
struct RpcConfigurationsQueue {
// A queue of RPC configurations detailing how RPCs should be sent.
@ -154,11 +163,17 @@ class TestClient {
std::chrono::system_clock::now() + std::chrono::seconds(INT_MAX);
}
}
SimpleRequest request;
request.set_response_size(config.response_payload_size);
if (config.request_payload_size > 0) {
request.mutable_payload()->set_body(config.request_payload.c_str(),
config.request_payload_size);
}
call->context.set_deadline(deadline);
call->result.saved_request_id = saved_request_id;
call->result.rpc_type = ClientConfigureRequest::UNARY_CALL;
call->simple_response_reader = stub_->PrepareAsyncUnaryCall(
&call->context, SimpleRequest::default_instance(), &cq_);
call->simple_response_reader =
stub_->PrepareAsyncUnaryCall(&call->context, request, &cq_);
call->simple_response_reader->StartCall();
call->simple_response_reader->Finish(&call->result.simple_response,
&call->result.status, call);
@ -324,6 +339,10 @@ class XdsUpdateClientConfigureServiceImpl
metadata_map[data.type()].push_back({data.key(), data.value()});
}
std::vector<RpcConfig> configs;
int request_payload_size = absl::GetFlag(FLAGS_request_payload_size);
int response_payload_size = absl::GetFlag(FLAGS_response_payload_size);
GPR_ASSERT(request_payload_size >= 0);
GPR_ASSERT(response_payload_size >= 0);
for (const auto& rpc : request->types()) {
RpcConfig config;
config.timeout_sec = request->timeout_sec();
@ -332,6 +351,22 @@ class XdsUpdateClientConfigureServiceImpl
if (metadata_iter != metadata_map.end()) {
config.metadata = metadata_iter->second;
}
if (request_payload_size > 0 &&
config.type == ClientConfigureRequest::EMPTY_CALL) {
gpr_log(GPR_ERROR,
"request_payload_size should not be set "
"for EMPTY_CALL");
}
if (response_payload_size > 0 &&
config.type == ClientConfigureRequest::EMPTY_CALL) {
gpr_log(GPR_ERROR,
"response_payload_size should not be set "
"for EMPTY_CALL");
}
config.request_payload_size = request_payload_size;
std::string payload(config.request_payload_size, '0');
config.request_payload = payload;
config.response_payload_size = response_payload_size;
configs.push_back(std::move(config));
}
{
@ -459,6 +494,10 @@ void BuildRpcConfigsFromFlags(RpcConfigurationsQueue* rpc_configs_queue) {
std::vector<RpcConfig> configs;
std::vector<std::string> rpc_methods =
absl::StrSplit(absl::GetFlag(FLAGS_rpc), ',', absl::SkipEmpty());
int request_payload_size = absl::GetFlag(FLAGS_request_payload_size);
int response_payload_size = absl::GetFlag(FLAGS_response_payload_size);
GPR_ASSERT(request_payload_size >= 0);
GPR_ASSERT(response_payload_size >= 0);
for (const std::string& rpc_method : rpc_methods) {
RpcConfig config;
if (rpc_method == "EmptyCall") {
@ -472,6 +511,22 @@ void BuildRpcConfigsFromFlags(RpcConfigurationsQueue* rpc_configs_queue) {
if (metadata_iter != metadata_map.end()) {
config.metadata = metadata_iter->second;
}
if (request_payload_size > 0 &&
config.type == ClientConfigureRequest::EMPTY_CALL) {
gpr_log(GPR_ERROR,
"request_payload_size should not be set "
"for EMPTY_CALL");
}
if (response_payload_size > 0 &&
config.type == ClientConfigureRequest::EMPTY_CALL) {
gpr_log(GPR_ERROR,
"response_payload_size should not be set "
"for EMPTY_CALL");
}
config.request_payload_size = request_payload_size;
std::string payload(config.request_payload_size, '0');
config.request_payload = payload;
config.response_payload_size = response_payload_size;
configs.push_back(std::move(config));
}
{

@ -81,7 +81,7 @@ class TestServiceImpl : public TestService::Service {
absl::string_view server_id)
: hostname_(hostname), server_id_(server_id) {}
Status UnaryCall(ServerContext* context, const SimpleRequest* /*request*/,
Status UnaryCall(ServerContext* context, const SimpleRequest* request,
SimpleResponse* response) override {
response->set_server_id(server_id_);
for (const auto& rpc_behavior : GetRpcBehaviorMetadata(context)) {
@ -91,6 +91,11 @@ class TestServiceImpl : public TestService::Service {
return *maybe_status;
}
}
if (request->response_size() > 0) {
std::string payload(request->response_size(), '0');
response->mutable_payload()->set_body(payload.c_str(),
request->response_size());
}
response->set_hostname(hostname_);
context->AddInitialMetadata("hostname", hostname_);
return Status::OK;

Loading…
Cancel
Save