mirror of https://github.com/grpc/grpc.git
commit
ab0d600849
219 changed files with 7459 additions and 7776 deletions
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,50 @@ |
||||
# Copyright 2023 the gRPC authors. |
||||
# |
||||
# Licensed under the Apache License, Version 2.0 (the "License"); |
||||
# you may not use this file except in compliance with the License. |
||||
# You may obtain a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, |
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
# See the License for the specific language governing permissions and |
||||
# limitations under the License. |
||||
|
||||
licenses(["notice"]) |
||||
|
||||
cc_binary( |
||||
name = "csm_greeter_client", |
||||
srcs = ["csm_greeter_client.cc"], |
||||
defines = ["BAZEL_BUILD"], |
||||
deps = [ |
||||
"//:grpc++", |
||||
"//:grpcpp_csm_observability", |
||||
"//examples/cpp/otel:util", |
||||
"//examples/protos:helloworld_cc_grpc", |
||||
"@com_google_absl//absl/flags:flag", |
||||
"@com_google_absl//absl/flags:parse", |
||||
"@io_opentelemetry_cpp//exporters/prometheus:prometheus_exporter", |
||||
"@io_opentelemetry_cpp//sdk/src/metrics", |
||||
], |
||||
) |
||||
|
||||
cc_binary( |
||||
name = "csm_greeter_server", |
||||
srcs = ["csm_greeter_server.cc"], |
||||
defines = ["BAZEL_BUILD"], |
||||
deps = [ |
||||
"//:grpc++", |
||||
"//:grpc++_reflection", |
||||
"//:grpcpp_admin", |
||||
"//:grpcpp_csm_observability", |
||||
"//examples/cpp/otel:util", |
||||
"//examples/protos:helloworld_cc_grpc", |
||||
"@com_google_absl//absl/flags:flag", |
||||
"@com_google_absl//absl/flags:parse", |
||||
"@com_google_absl//absl/log", |
||||
"@io_opentelemetry_cpp//exporters/prometheus:prometheus_exporter", |
||||
"@io_opentelemetry_cpp//sdk/src/metrics", |
||||
], |
||||
) |
@ -0,0 +1,37 @@ |
||||
# Copyright 2023 The gRPC Authors |
||||
# |
||||
# Licensed under the Apache License, Version 2.0 (the "License"); |
||||
# you may not use this file except in compliance with the License. |
||||
# You may obtain a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, |
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
# See the License for the specific language governing permissions and |
||||
# limitations under the License. |
||||
|
||||
FROM python:3.9-slim-bookworm |
||||
|
||||
RUN apt-get update -y && apt-get upgrade -y && apt-get install -y build-essential clang curl |
||||
|
||||
WORKDIR /workdir |
||||
|
||||
RUN ln -s /usr/bin/python3 /usr/bin/python |
||||
RUN mkdir /artifacts |
||||
|
||||
COPY . . |
||||
RUN tools/bazel build //examples/cpp/csm/observability:csm_greeter_client |
||||
RUN cp -rL /workdir/bazel-bin/examples/cpp/csm/observability/csm_greeter_client /artifacts/ |
||||
|
||||
FROM python:3.9-slim-bookworm |
||||
|
||||
RUN apt-get update \ |
||||
&& apt-get -y upgrade \ |
||||
&& apt-get -y autoremove \ |
||||
&& apt-get install -y curl |
||||
|
||||
COPY --from=0 /artifacts ./ |
||||
|
||||
ENTRYPOINT ["/csm_greeter_client"] |
@ -0,0 +1,37 @@ |
||||
# Copyright 2023 The gRPC Authors |
||||
# |
||||
# Licensed under the Apache License, Version 2.0 (the "License"); |
||||
# you may not use this file except in compliance with the License. |
||||
# You may obtain a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, |
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
# See the License for the specific language governing permissions and |
||||
# limitations under the License. |
||||
|
||||
FROM python:3.9-slim-bookworm |
||||
|
||||
RUN apt-get update -y && apt-get upgrade -y && apt-get install -y build-essential clang curl |
||||
|
||||
WORKDIR /workdir |
||||
|
||||
RUN ln -s /usr/bin/python3 /usr/bin/python |
||||
RUN mkdir /artifacts |
||||
|
||||
COPY . . |
||||
RUN tools/bazel build //examples/cpp/csm/observability:csm_greeter_server |
||||
RUN cp -rL /workdir/bazel-bin/examples/cpp/csm/observability/csm_greeter_server /artifacts/ |
||||
|
||||
FROM python:3.9-slim-bookworm |
||||
|
||||
RUN apt-get update \ |
||||
&& apt-get -y upgrade \ |
||||
&& apt-get -y autoremove \ |
||||
&& apt-get install -y curl |
||||
|
||||
COPY --from=0 /artifacts ./ |
||||
|
||||
ENTRYPOINT ["/csm_greeter_server"] |
@ -0,0 +1,35 @@ |
||||
# gRPC C++ CSM Hello World Example |
||||
|
||||
This CSM example builds on the [Hello World Example](https://github.com/grpc/grpc/tree/master/examples/cpp/helloworld) and changes the gRPC client and server to test CSM observability. |
||||
|
||||
## Configuration |
||||
|
||||
The client takes the following command-line arguments - |
||||
* target - By default, the client tries to connect to the xDS "xds:///helloworld:50051" and gRPC would use xDS to resolve this target and connect to the server backend. This can be overridden to change the target. |
||||
* prometheus_endpoint - Endpoint used for prometheus. Default value is localhost:9464 |
||||
|
||||
|
||||
The server takes the following command-line arguments - |
||||
* port - Port on which the Hello World service is run. Defaults to 50051. |
||||
* prometheus_endpoint - Endpoint used for prometheus. Default value is localhost:9464 |
||||
|
||||
## Building |
||||
|
||||
From the gRPC workspace folder: |
||||
|
||||
Client: |
||||
``` |
||||
docker build -f examples/cpp/csm/observability/Dockerfile.client |
||||
``` |
||||
Server: |
||||
``` |
||||
docker build -f examples/cpp/csm/observability/Dockerfile.server |
||||
``` |
||||
|
||||
To push to a registry, add a tag to the image either by adding a `-t` flag to `docker build` command above or run: |
||||
|
||||
``` |
||||
docker image tag ${sha from build command above} ${tag} |
||||
``` |
||||
|
||||
And then push the tagged image using `docker push` |
@ -0,0 +1,79 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2023 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#include <sys/types.h> |
||||
|
||||
#include <chrono> |
||||
#include <condition_variable> |
||||
#include <iostream> |
||||
#include <memory> |
||||
#include <string> |
||||
|
||||
#include "absl/flags/flag.h" |
||||
#include "absl/flags/parse.h" |
||||
#include "absl/types/optional.h" |
||||
#include "examples/cpp/otel/util.h" |
||||
#include "opentelemetry/exporters/prometheus/exporter_factory.h" |
||||
#include "opentelemetry/exporters/prometheus/exporter_options.h" |
||||
#include "opentelemetry/sdk/metrics/meter_provider.h" |
||||
|
||||
#include <grpcpp/ext/csm_observability.h> |
||||
#include <grpcpp/grpcpp.h> |
||||
#include <grpcpp/support/string_ref.h> |
||||
|
||||
ABSL_FLAG(std::string, target, "xds:///helloworld:50051", "Target string"); |
||||
ABSL_FLAG(std::string, prometheus_endpoint, "localhost:9464", |
||||
"Prometheus exporter endpoint"); |
||||
|
||||
namespace { |
||||
|
||||
absl::StatusOr<grpc::CsmObservability> InitializeObservability() { |
||||
opentelemetry::exporter::metrics::PrometheusExporterOptions opts; |
||||
// default was "localhost:9464" which causes connection issue across GKE pods
|
||||
opts.url = "0.0.0.0:9464"; |
||||
auto prometheus_exporter = |
||||
opentelemetry::exporter::metrics::PrometheusExporterFactory::Create(opts); |
||||
auto meter_provider = |
||||
std::make_shared<opentelemetry::sdk::metrics::MeterProvider>(); |
||||
// The default histogram boundaries are not granular enough for RPCs. Override
|
||||
// the "grpc.client.attempt.duration" view as recommended by
|
||||
// https://github.com/grpc/proposal/blob/master/A66-otel-stats.md.
|
||||
AddLatencyView(meter_provider.get(), "grpc.client.attempt.duration", "s"); |
||||
meter_provider->AddMetricReader(std::move(prometheus_exporter)); |
||||
return grpc::CsmObservabilityBuilder() |
||||
.SetMeterProvider(std::move(meter_provider)) |
||||
.BuildAndRegister(); |
||||
} |
||||
|
||||
} // namespace
|
||||
|
||||
int main(int argc, char** argv) { |
||||
absl::ParseCommandLine(argc, argv); |
||||
// Setup CSM observability
|
||||
auto observability = InitializeObservability(); |
||||
if (!observability.ok()) { |
||||
std::cerr << "CsmObservability::Init() failed: " |
||||
<< observability.status().ToString() << std::endl; |
||||
return static_cast<int>(observability.status().code()); |
||||
} |
||||
|
||||
// Continuously send RPCs every second.
|
||||
RunClient(absl::GetFlag(FLAGS_target)); |
||||
|
||||
return 0; |
||||
} |
@ -0,0 +1,67 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2023 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#include <iostream> |
||||
#include <memory> |
||||
#include <string> |
||||
|
||||
#include "absl/flags/flag.h" |
||||
#include "absl/flags/parse.h" |
||||
#include "absl/log/log.h" |
||||
#include "absl/strings/str_cat.h" |
||||
#include "examples/cpp/otel/util.h" |
||||
#include "opentelemetry/exporters/prometheus/exporter_factory.h" |
||||
#include "opentelemetry/exporters/prometheus/exporter_options.h" |
||||
#include "opentelemetry/sdk/metrics/meter_provider.h" |
||||
|
||||
#include <grpcpp/ext/admin_services.h> |
||||
#include <grpcpp/ext/csm_observability.h> |
||||
#include <grpcpp/ext/proto_server_reflection_plugin.h> |
||||
#include <grpcpp/grpcpp.h> |
||||
#include <grpcpp/health_check_service_interface.h> |
||||
#include <grpcpp/xds_server_builder.h> |
||||
|
||||
ABSL_FLAG(int32_t, port, 50051, "Server port for service."); |
||||
ABSL_FLAG(std::string, prometheus_endpoint, "localhost:9464", |
||||
"Prometheus exporter endpoint"); |
||||
|
||||
int main(int argc, char** argv) { |
||||
absl::ParseCommandLine(argc, argv); |
||||
opentelemetry::exporter::metrics::PrometheusExporterOptions opts; |
||||
// default was "localhost:9464" which causes connection issue across GKE pods
|
||||
opts.url = "0.0.0.0:9464"; |
||||
auto prometheus_exporter = |
||||
opentelemetry::exporter::metrics::PrometheusExporterFactory::Create(opts); |
||||
auto meter_provider = |
||||
std::make_shared<opentelemetry::sdk::metrics::MeterProvider>(); |
||||
// The default histogram boundaries are not granular enough for RPCs. Override
|
||||
// the "grpc.server.call.duration" view as recommended by
|
||||
// https://github.com/grpc/proposal/blob/master/A66-otel-stats.md.
|
||||
AddLatencyView(meter_provider.get(), "grpc.server.call.duration", "s"); |
||||
meter_provider->AddMetricReader(std::move(prometheus_exporter)); |
||||
auto observability = grpc::CsmObservabilityBuilder() |
||||
.SetMeterProvider(std::move(meter_provider)) |
||||
.BuildAndRegister(); |
||||
if (!observability.ok()) { |
||||
std::cerr << "CsmObservability::Init() failed: " |
||||
<< observability.status().ToString() << std::endl; |
||||
return static_cast<int>(observability.status().code()); |
||||
} |
||||
RunServer(absl::GetFlag(FLAGS_port)); |
||||
return 0; |
||||
} |
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,286 @@ |
||||
// Copyright 2024 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include "src/core/lib/surface/call_utils.h" |
||||
|
||||
#include <inttypes.h> |
||||
#include <limits.h> |
||||
#include <stdlib.h> |
||||
#include <string.h> |
||||
|
||||
#include <algorithm> |
||||
#include <atomic> |
||||
#include <cstdint> |
||||
#include <memory> |
||||
#include <string> |
||||
#include <type_traits> |
||||
#include <utility> |
||||
|
||||
#include "absl/log/check.h" |
||||
#include "absl/status/status.h" |
||||
#include "absl/strings/str_cat.h" |
||||
#include "absl/strings/str_format.h" |
||||
#include "absl/strings/string_view.h" |
||||
|
||||
#include <grpc/byte_buffer.h> |
||||
#include <grpc/compression.h> |
||||
#include <grpc/event_engine/event_engine.h> |
||||
#include <grpc/grpc.h> |
||||
#include <grpc/impl/call.h> |
||||
#include <grpc/impl/propagation_bits.h> |
||||
#include <grpc/slice.h> |
||||
#include <grpc/slice_buffer.h> |
||||
#include <grpc/status.h> |
||||
#include <grpc/support/alloc.h> |
||||
#include <grpc/support/atm.h> |
||||
#include <grpc/support/log.h> |
||||
#include <grpc/support/port_platform.h> |
||||
#include <grpc/support/string_util.h> |
||||
|
||||
#include "src/core/lib/channel/status_util.h" |
||||
#include "src/core/lib/gprpp/crash.h" |
||||
#include "src/core/lib/gprpp/debug_location.h" |
||||
#include "src/core/lib/gprpp/match.h" |
||||
#include "src/core/lib/iomgr/exec_ctx.h" |
||||
#include "src/core/lib/promise/activity.h" |
||||
#include "src/core/lib/promise/context.h" |
||||
#include "src/core/lib/promise/poll.h" |
||||
#include "src/core/lib/promise/status_flag.h" |
||||
#include "src/core/lib/slice/slice_buffer.h" |
||||
#include "src/core/lib/slice/slice_internal.h" |
||||
#include "src/core/lib/surface/completion_queue.h" |
||||
#include "src/core/lib/surface/validate_metadata.h" |
||||
#include "src/core/lib/transport/metadata.h" |
||||
#include "src/core/lib/transport/metadata_batch.h" |
||||
|
||||
namespace grpc_core { |
||||
|
||||
void PublishMetadataArray(grpc_metadata_batch* md, grpc_metadata_array* array, |
||||
bool is_client) { |
||||
const auto md_count = md->count(); |
||||
if (md_count > array->capacity) { |
||||
array->capacity = |
||||
std::max(array->capacity + md->count(), array->capacity * 3 / 2); |
||||
array->metadata = static_cast<grpc_metadata*>( |
||||
gpr_realloc(array->metadata, sizeof(grpc_metadata) * array->capacity)); |
||||
} |
||||
PublishToAppEncoder encoder(array, md, is_client); |
||||
md->Encode(&encoder); |
||||
} |
||||
|
||||
void CToMetadata(grpc_metadata* metadata, size_t count, |
||||
grpc_metadata_batch* b) { |
||||
for (size_t i = 0; i < count; i++) { |
||||
grpc_metadata* md = &metadata[i]; |
||||
auto key = StringViewFromSlice(md->key); |
||||
// Filter "content-length metadata"
|
||||
if (key == "content-length") continue; |
||||
b->Append(key, Slice(CSliceRef(md->value)), |
||||
[md](absl::string_view error, const Slice& value) { |
||||
gpr_log(GPR_DEBUG, "Append error: %s", |
||||
absl::StrCat("key=", StringViewFromSlice(md->key), |
||||
" error=", error, |
||||
" value=", value.as_string_view()) |
||||
.c_str()); |
||||
}); |
||||
} |
||||
} |
||||
|
||||
const char* GrpcOpTypeName(grpc_op_type op) { |
||||
switch (op) { |
||||
case GRPC_OP_SEND_INITIAL_METADATA: |
||||
return "SendInitialMetadata"; |
||||
case GRPC_OP_SEND_MESSAGE: |
||||
return "SendMessage"; |
||||
case GRPC_OP_SEND_STATUS_FROM_SERVER: |
||||
return "SendStatusFromServer"; |
||||
case GRPC_OP_SEND_CLOSE_FROM_CLIENT: |
||||
return "SendCloseFromClient"; |
||||
case GRPC_OP_RECV_MESSAGE: |
||||
return "RecvMessage"; |
||||
case GRPC_OP_RECV_CLOSE_ON_SERVER: |
||||
return "RecvCloseOnServer"; |
||||
case GRPC_OP_RECV_INITIAL_METADATA: |
||||
return "RecvInitialMetadata"; |
||||
case GRPC_OP_RECV_STATUS_ON_CLIENT: |
||||
return "RecvStatusOnClient"; |
||||
} |
||||
Crash("Unreachable"); |
||||
} |
||||
|
||||
////////////////////////////////////////////////////////////////////////
|
||||
// WaitForCqEndOp
|
||||
|
||||
Poll<Empty> WaitForCqEndOp::operator()() { |
||||
if (grpc_trace_promise_primitives.enabled()) { |
||||
gpr_log(GPR_INFO, "%sWaitForCqEndOp[%p] %s", |
||||
Activity::current()->DebugTag().c_str(), this, |
||||
StateString(state_).c_str()); |
||||
} |
||||
if (auto* n = absl::get_if<NotStarted>(&state_)) { |
||||
if (n->is_closure) { |
||||
ExecCtx::Run(DEBUG_LOCATION, static_cast<grpc_closure*>(n->tag), |
||||
std::move(n->error)); |
||||
return Empty{}; |
||||
} else { |
||||
auto not_started = std::move(*n); |
||||
auto& started = |
||||
state_.emplace<Started>(GetContext<Activity>()->MakeOwningWaker()); |
||||
grpc_cq_end_op( |
||||
not_started.cq, not_started.tag, std::move(not_started.error), |
||||
[](void* p, grpc_cq_completion*) { |
||||
auto started = static_cast<Started*>(p); |
||||
auto wakeup = std::move(started->waker); |
||||
started->done.store(true, std::memory_order_release); |
||||
wakeup.Wakeup(); |
||||
}, |
||||
&started, &started.completion); |
||||
} |
||||
} |
||||
auto& started = absl::get<Started>(state_); |
||||
if (started.done.load(std::memory_order_acquire)) { |
||||
return Empty{}; |
||||
} else { |
||||
return Pending{}; |
||||
} |
||||
} |
||||
|
||||
std::string WaitForCqEndOp::StateString(const State& state) { |
||||
return Match( |
||||
state, |
||||
[](const NotStarted& x) { |
||||
return absl::StrFormat( |
||||
"NotStarted{is_closure=%s, tag=%p, error=%s, cq=%p}", |
||||
x.is_closure ? "true" : "false", x.tag, x.error.ToString(), x.cq); |
||||
}, |
||||
[](const Started& x) { |
||||
return absl::StrFormat( |
||||
"Started{completion=%p, done=%s}", &x.completion, |
||||
x.done.load(std::memory_order_relaxed) ? "true" : "false"); |
||||
}, |
||||
[](const Invalid&) -> std::string { return "Invalid{}"; }); |
||||
} |
||||
|
||||
////////////////////////////////////////////////////////////////////////
|
||||
// MessageReceiver
|
||||
|
||||
StatusFlag MessageReceiver::FinishRecvMessage( |
||||
ValueOrFailure<absl::optional<MessageHandle>> result) { |
||||
if (!result.ok()) { |
||||
if (grpc_call_trace.enabled()) { |
||||
gpr_log(GPR_INFO, |
||||
"%s[call] RecvMessage: outstanding_recv " |
||||
"finishes: received end-of-stream with error", |
||||
Activity::current()->DebugTag().c_str()); |
||||
} |
||||
*recv_message_ = nullptr; |
||||
recv_message_ = nullptr; |
||||
return Failure{}; |
||||
} |
||||
if (!result->has_value()) { |
||||
if (grpc_call_trace.enabled()) { |
||||
gpr_log(GPR_INFO, |
||||
"%s[call] RecvMessage: outstanding_recv " |
||||
"finishes: received end-of-stream", |
||||
Activity::current()->DebugTag().c_str()); |
||||
} |
||||
*recv_message_ = nullptr; |
||||
recv_message_ = nullptr; |
||||
return Success{}; |
||||
} |
||||
MessageHandle& message = **result; |
||||
test_only_last_message_flags_ = message->flags(); |
||||
if ((message->flags() & GRPC_WRITE_INTERNAL_COMPRESS) && |
||||
(incoming_compression_algorithm_ != GRPC_COMPRESS_NONE)) { |
||||
*recv_message_ = grpc_raw_compressed_byte_buffer_create( |
||||
nullptr, 0, incoming_compression_algorithm_); |
||||
} else { |
||||
*recv_message_ = grpc_raw_byte_buffer_create(nullptr, 0); |
||||
} |
||||
grpc_slice_buffer_move_into(message->payload()->c_slice_buffer(), |
||||
&(*recv_message_)->data.raw.slice_buffer); |
||||
if (grpc_call_trace.enabled()) { |
||||
gpr_log(GPR_INFO, |
||||
"%s[call] RecvMessage: outstanding_recv " |
||||
"finishes: received %" PRIdPTR " byte message", |
||||
Activity::current()->DebugTag().c_str(), |
||||
(*recv_message_)->data.raw.slice_buffer.length); |
||||
} |
||||
recv_message_ = nullptr; |
||||
return Success{}; |
||||
} |
||||
|
||||
////////////////////////////////////////////////////////////////////////
|
||||
// MakeErrorString
|
||||
|
||||
std::string MakeErrorString(const ServerMetadata* trailing_metadata) { |
||||
std::string out = absl::StrCat( |
||||
trailing_metadata->get(GrpcStatusFromWire()).value_or(false) |
||||
? "Error received from peer" |
||||
: "Error generated by client", |
||||
"grpc_status: ", |
||||
grpc_status_code_to_string(trailing_metadata->get(GrpcStatusMetadata()) |
||||
.value_or(GRPC_STATUS_UNKNOWN))); |
||||
if (const Slice* message = |
||||
trailing_metadata->get_pointer(GrpcMessageMetadata())) { |
||||
absl::StrAppend(&out, "\ngrpc_message: ", message->as_string_view()); |
||||
} |
||||
if (auto annotations = trailing_metadata->get_pointer(GrpcStatusContext())) { |
||||
absl::StrAppend(&out, "\nStatus Context:"); |
||||
for (const std::string& annotation : *annotations) { |
||||
absl::StrAppend(&out, "\n ", annotation); |
||||
} |
||||
} |
||||
return out; |
||||
} |
||||
|
||||
bool ValidateMetadata(size_t count, grpc_metadata* metadata) { |
||||
if (count > INT_MAX) { |
||||
return false; |
||||
} |
||||
for (size_t i = 0; i < count; i++) { |
||||
grpc_metadata* md = &metadata[i]; |
||||
if (!GRPC_LOG_IF_ERROR("validate_metadata", |
||||
grpc_validate_header_key_is_legal(md->key))) { |
||||
return false; |
||||
} else if (!grpc_is_binary_header_internal(md->key) && |
||||
!GRPC_LOG_IF_ERROR( |
||||
"validate_metadata", |
||||
grpc_validate_header_nonbin_value_is_legal(md->value))) { |
||||
return false; |
||||
} else if (GRPC_SLICE_LENGTH(md->value) >= UINT32_MAX) { |
||||
// HTTP2 hpack encoding has a maximum limit.
|
||||
return false; |
||||
} |
||||
} |
||||
return true; |
||||
} |
||||
|
||||
void EndOpImmediately(grpc_completion_queue* cq, void* notify_tag, |
||||
bool is_notify_tag_closure) { |
||||
if (!is_notify_tag_closure) { |
||||
CHECK(grpc_cq_begin_op(cq, notify_tag)); |
||||
grpc_cq_end_op( |
||||
cq, notify_tag, absl::OkStatus(), |
||||
[](void*, grpc_cq_completion* completion) { gpr_free(completion); }, |
||||
nullptr, |
||||
static_cast<grpc_cq_completion*>( |
||||
gpr_malloc(sizeof(grpc_cq_completion)))); |
||||
} else { |
||||
Closure::Run(DEBUG_LOCATION, static_cast<grpc_closure*>(notify_tag), |
||||
absl::OkStatus()); |
||||
} |
||||
} |
||||
|
||||
} // namespace grpc_core
|
@ -0,0 +1,457 @@ |
||||
// Copyright 2024 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#ifndef GRPC_SRC_CORE_LIB_SURFACE_CALL_UTILS_H |
||||
#define GRPC_SRC_CORE_LIB_SURFACE_CALL_UTILS_H |
||||
|
||||
#include <inttypes.h> |
||||
#include <limits.h> |
||||
#include <stdlib.h> |
||||
#include <string.h> |
||||
|
||||
#include <algorithm> |
||||
#include <atomic> |
||||
#include <cstdint> |
||||
#include <string> |
||||
#include <type_traits> |
||||
#include <utility> |
||||
|
||||
#include "absl/log/check.h" |
||||
#include "absl/status/status.h" |
||||
#include "absl/strings/str_cat.h" |
||||
#include "absl/strings/string_view.h" |
||||
|
||||
#include <grpc/byte_buffer.h> |
||||
#include <grpc/compression.h> |
||||
#include <grpc/event_engine/event_engine.h> |
||||
#include <grpc/grpc.h> |
||||
#include <grpc/impl/call.h> |
||||
#include <grpc/impl/propagation_bits.h> |
||||
#include <grpc/slice.h> |
||||
#include <grpc/slice_buffer.h> |
||||
#include <grpc/status.h> |
||||
#include <grpc/support/alloc.h> |
||||
#include <grpc/support/atm.h> |
||||
#include <grpc/support/log.h> |
||||
#include <grpc/support/port_platform.h> |
||||
#include <grpc/support/string_util.h> |
||||
|
||||
#include "src/core/lib/gprpp/crash.h" |
||||
#include "src/core/lib/promise/activity.h" |
||||
#include "src/core/lib/promise/cancel_callback.h" |
||||
#include "src/core/lib/promise/map.h" |
||||
#include "src/core/lib/promise/poll.h" |
||||
#include "src/core/lib/promise/seq.h" |
||||
#include "src/core/lib/promise/status_flag.h" |
||||
#include "src/core/lib/surface/call_trace.h" |
||||
#include "src/core/lib/surface/completion_queue.h" |
||||
#include "src/core/lib/transport/message.h" |
||||
#include "src/core/lib/transport/metadata.h" |
||||
#include "src/core/lib/transport/metadata_batch.h" |
||||
|
||||
namespace grpc_core { |
||||
|
||||
class PublishToAppEncoder { |
||||
public: |
||||
explicit PublishToAppEncoder(grpc_metadata_array* dest, |
||||
const grpc_metadata_batch* encoding, |
||||
bool is_client) |
||||
: dest_(dest), encoding_(encoding), is_client_(is_client) {} |
||||
|
||||
void Encode(const Slice& key, const Slice& value) { |
||||
Append(key.c_slice(), value.c_slice()); |
||||
} |
||||
|
||||
// Catch anything that is not explicitly handled, and do not publish it to the
|
||||
// application. If new metadata is added to a batch that needs to be
|
||||
// published, it should be called out here.
|
||||
template <typename Which> |
||||
void Encode(Which, const typename Which::ValueType&) {} |
||||
|
||||
void Encode(UserAgentMetadata, const Slice& slice) { |
||||
Append(UserAgentMetadata::key(), slice); |
||||
} |
||||
|
||||
void Encode(HostMetadata, const Slice& slice) { |
||||
Append(HostMetadata::key(), slice); |
||||
} |
||||
|
||||
void Encode(GrpcPreviousRpcAttemptsMetadata, uint32_t count) { |
||||
Append(GrpcPreviousRpcAttemptsMetadata::key(), count); |
||||
} |
||||
|
||||
void Encode(GrpcRetryPushbackMsMetadata, Duration count) { |
||||
Append(GrpcRetryPushbackMsMetadata::key(), count.millis()); |
||||
} |
||||
|
||||
void Encode(LbTokenMetadata, const Slice& slice) { |
||||
Append(LbTokenMetadata::key(), slice); |
||||
} |
||||
|
||||
private: |
||||
void Append(absl::string_view key, int64_t value) { |
||||
Append(StaticSlice::FromStaticString(key).c_slice(), |
||||
Slice::FromInt64(value).c_slice()); |
||||
} |
||||
|
||||
void Append(absl::string_view key, const Slice& value) { |
||||
Append(StaticSlice::FromStaticString(key).c_slice(), value.c_slice()); |
||||
} |
||||
|
||||
void Append(grpc_slice key, grpc_slice value) { |
||||
if (dest_->count == dest_->capacity) { |
||||
Crash(absl::StrCat( |
||||
"Too many metadata entries: capacity=", dest_->capacity, " on ", |
||||
is_client_ ? "client" : "server", " encoding ", encoding_->count(), |
||||
" elements: ", encoding_->DebugString().c_str())); |
||||
} |
||||
auto* mdusr = &dest_->metadata[dest_->count++]; |
||||
mdusr->key = key; |
||||
mdusr->value = value; |
||||
} |
||||
|
||||
grpc_metadata_array* const dest_; |
||||
const grpc_metadata_batch* const encoding_; |
||||
const bool is_client_; |
||||
}; |
||||
|
||||
void PublishMetadataArray(grpc_metadata_batch* md, grpc_metadata_array* array, |
||||
bool is_client); |
||||
void CToMetadata(grpc_metadata* metadata, size_t count, grpc_metadata_batch* b); |
||||
const char* GrpcOpTypeName(grpc_op_type op); |
||||
|
||||
bool ValidateMetadata(size_t count, grpc_metadata* metadata); |
||||
void EndOpImmediately(grpc_completion_queue* cq, void* notify_tag, |
||||
bool is_notify_tag_closure); |
||||
|
||||
inline bool AreWriteFlagsValid(uint32_t flags) { |
||||
// check that only bits in GRPC_WRITE_(INTERNAL?)_USED_MASK are set
|
||||
const uint32_t allowed_write_positions = |
||||
(GRPC_WRITE_USED_MASK | GRPC_WRITE_INTERNAL_USED_MASK); |
||||
const uint32_t invalid_positions = ~allowed_write_positions; |
||||
return !(flags & invalid_positions); |
||||
} |
||||
|
||||
inline bool AreInitialMetadataFlagsValid(uint32_t flags) { |
||||
// check that only bits in GRPC_WRITE_(INTERNAL?)_USED_MASK are set
|
||||
uint32_t invalid_positions = ~GRPC_INITIAL_METADATA_USED_MASK; |
||||
return !(flags & invalid_positions); |
||||
} |
||||
|
||||
// One batch operation
|
||||
// Wrapper around promise steps to perform once of the batch operations for the
|
||||
// legacy grpc surface api.
|
||||
template <typename SetupResult, grpc_op_type kOp> |
||||
class OpHandlerImpl { |
||||
public: |
||||
using PromiseFactory = promise_detail::OncePromiseFactory<void, SetupResult>; |
||||
using Promise = typename PromiseFactory::Promise; |
||||
static_assert(!std::is_same<Promise, void>::value, |
||||
"PromiseFactory must return a promise"); |
||||
|
||||
OpHandlerImpl() : state_(State::kDismissed) {} |
||||
explicit OpHandlerImpl(SetupResult result) : state_(State::kPromiseFactory) { |
||||
Construct(&promise_factory_, std::move(result)); |
||||
} |
||||
|
||||
~OpHandlerImpl() { |
||||
switch (state_) { |
||||
case State::kDismissed: |
||||
break; |
||||
case State::kPromiseFactory: |
||||
Destruct(&promise_factory_); |
||||
break; |
||||
case State::kPromise: |
||||
Destruct(&promise_); |
||||
break; |
||||
} |
||||
} |
||||
|
||||
OpHandlerImpl(const OpHandlerImpl&) = delete; |
||||
OpHandlerImpl& operator=(const OpHandlerImpl&) = delete; |
||||
OpHandlerImpl(OpHandlerImpl&& other) noexcept : state_(other.state_) { |
||||
switch (state_) { |
||||
case State::kDismissed: |
||||
break; |
||||
case State::kPromiseFactory: |
||||
Construct(&promise_factory_, std::move(other.promise_factory_)); |
||||
break; |
||||
case State::kPromise: |
||||
Construct(&promise_, std::move(other.promise_)); |
||||
break; |
||||
} |
||||
} |
||||
OpHandlerImpl& operator=(OpHandlerImpl&& other) noexcept = delete; |
||||
|
||||
Poll<StatusFlag> operator()() { |
||||
switch (state_) { |
||||
case State::kDismissed: |
||||
return Success{}; |
||||
case State::kPromiseFactory: { |
||||
auto promise = promise_factory_.Make(); |
||||
Destruct(&promise_factory_); |
||||
Construct(&promise_, std::move(promise)); |
||||
state_ = State::kPromise; |
||||
} |
||||
ABSL_FALLTHROUGH_INTENDED; |
||||
case State::kPromise: { |
||||
if (grpc_call_trace.enabled()) { |
||||
gpr_log(GPR_INFO, "%sBeginPoll %s", |
||||
Activity::current()->DebugTag().c_str(), OpName()); |
||||
} |
||||
auto r = poll_cast<StatusFlag>(promise_()); |
||||
if (grpc_call_trace.enabled()) { |
||||
gpr_log( |
||||
GPR_INFO, "%sEndPoll %s --> %s", |
||||
Activity::current()->DebugTag().c_str(), OpName(), |
||||
r.pending() ? "PENDING" : (r.value().ok() ? "OK" : "FAILURE")); |
||||
} |
||||
return r; |
||||
} |
||||
} |
||||
GPR_UNREACHABLE_CODE(return Pending{}); |
||||
} |
||||
|
||||
private: |
||||
enum class State { |
||||
kDismissed, |
||||
kPromiseFactory, |
||||
kPromise, |
||||
}; |
||||
|
||||
static const char* OpName() { return GrpcOpTypeName(kOp); } |
||||
|
||||
// gcc-12 has problems with this being a variant
|
||||
GPR_NO_UNIQUE_ADDRESS State state_; |
||||
union { |
||||
PromiseFactory promise_factory_; |
||||
Promise promise_; |
||||
}; |
||||
}; |
||||
|
||||
template <grpc_op_type op_type, typename PromiseFactory> |
||||
auto OpHandler(PromiseFactory setup) { |
||||
return OpHandlerImpl<PromiseFactory, op_type>(std::move(setup)); |
||||
} |
||||
|
||||
class BatchOpIndex { |
||||
public: |
||||
BatchOpIndex(const grpc_op* ops, size_t nops) : ops_(ops) { |
||||
for (size_t i = 0; i < nops; i++) { |
||||
idxs_[ops[i].op] = static_cast<uint8_t>(i); |
||||
} |
||||
} |
||||
|
||||
// 1. Check if op_type is in the batch
|
||||
// 2. If it is, run the setup function in the context of the API call (NOT in
|
||||
// the call party).
|
||||
// 3. This setup function returns a promise factory which we'll then run *in*
|
||||
// the party to do initial setup, and have it return the promise that we'll
|
||||
// ultimately poll on til completion.
|
||||
// Once we express our surface API in terms of core internal types this whole
|
||||
// dance will go away.
|
||||
template <grpc_op_type op_type, typename SetupFn> |
||||
auto OpHandler(SetupFn setup) { |
||||
using SetupResult = decltype(std::declval<SetupFn>()(grpc_op())); |
||||
using Impl = OpHandlerImpl<SetupResult, op_type>; |
||||
if (const grpc_op* op = this->op(op_type)) { |
||||
auto r = setup(*op); |
||||
return Impl(std::move(r)); |
||||
} else { |
||||
return Impl(); |
||||
} |
||||
} |
||||
|
||||
const grpc_op* op(grpc_op_type op_type) const { |
||||
return idxs_[op_type] == 255 ? nullptr : &ops_[idxs_[op_type]]; |
||||
} |
||||
|
||||
private: |
||||
const grpc_op* const ops_; |
||||
std::array<uint8_t, 8> idxs_{255, 255, 255, 255, 255, 255, 255, 255}; |
||||
}; |
||||
|
||||
// Defines a promise that calls grpc_cq_end_op() (on first poll) and then waits
|
||||
// for the callback supplied to grpc_cq_end_op() to be called, before resolving
|
||||
// to Empty{}
|
||||
class WaitForCqEndOp { |
||||
public: |
||||
WaitForCqEndOp(bool is_closure, void* tag, grpc_error_handle error, |
||||
grpc_completion_queue* cq) |
||||
: state_{NotStarted{is_closure, tag, std::move(error), cq}} {} |
||||
|
||||
Poll<Empty> operator()(); |
||||
|
||||
WaitForCqEndOp(const WaitForCqEndOp&) = delete; |
||||
WaitForCqEndOp& operator=(const WaitForCqEndOp&) = delete; |
||||
WaitForCqEndOp(WaitForCqEndOp&& other) noexcept |
||||
: state_(std::move(absl::get<NotStarted>(other.state_))) { |
||||
other.state_.emplace<Invalid>(); |
||||
} |
||||
WaitForCqEndOp& operator=(WaitForCqEndOp&& other) noexcept { |
||||
state_ = std::move(absl::get<NotStarted>(other.state_)); |
||||
other.state_.emplace<Invalid>(); |
||||
return *this; |
||||
} |
||||
|
||||
private: |
||||
struct NotStarted { |
||||
bool is_closure; |
||||
void* tag; |
||||
grpc_error_handle error; |
||||
grpc_completion_queue* cq; |
||||
}; |
||||
struct Started { |
||||
explicit Started(Waker waker) : waker(std::move(waker)) {} |
||||
Waker waker; |
||||
grpc_cq_completion completion; |
||||
std::atomic<bool> done{false}; |
||||
}; |
||||
struct Invalid {}; |
||||
using State = absl::variant<NotStarted, Started, Invalid>; |
||||
|
||||
static std::string StateString(const State& state); |
||||
|
||||
State state_{Invalid{}}; |
||||
}; |
||||
|
||||
template <typename FalliblePart, typename FinalPart> |
||||
auto InfallibleBatch(FalliblePart fallible_part, FinalPart final_part, |
||||
bool is_notify_tag_closure, void* notify_tag, |
||||
grpc_completion_queue* cq) { |
||||
// Perform fallible_part, then final_part, then wait for the
|
||||
// completion queue to be done.
|
||||
// If cancelled, we'll ensure the completion queue is notified.
|
||||
// There's a slight bug here in that if we cancel this promise after
|
||||
// the WaitForCqEndOp we'll double post -- but we don't currently do that.
|
||||
return OnCancelFactory( |
||||
[fallible_part = std::move(fallible_part), |
||||
final_part = std::move(final_part), is_notify_tag_closure, notify_tag, |
||||
cq]() mutable { |
||||
return LogPollBatch(notify_tag, |
||||
Seq(std::move(fallible_part), std::move(final_part), |
||||
[is_notify_tag_closure, notify_tag, cq]() { |
||||
return WaitForCqEndOp(is_notify_tag_closure, |
||||
notify_tag, |
||||
absl::OkStatus(), cq); |
||||
})); |
||||
}, |
||||
[cq, notify_tag]() { |
||||
grpc_cq_end_op( |
||||
cq, notify_tag, absl::OkStatus(), |
||||
[](void*, grpc_cq_completion* completion) { delete completion; }, |
||||
nullptr, new grpc_cq_completion); |
||||
}); |
||||
} |
||||
|
||||
template <typename FalliblePart> |
||||
auto FallibleBatch(FalliblePart fallible_part, bool is_notify_tag_closure, |
||||
void* notify_tag, grpc_completion_queue* cq) { |
||||
// Perform fallible_part, then wait for the completion queue to be done.
|
||||
// If cancelled, we'll ensure the completion queue is notified.
|
||||
// There's a slight bug here in that if we cancel this promise after
|
||||
// the WaitForCqEndOp we'll double post -- but we don't currently do that.
|
||||
return OnCancelFactory( |
||||
[fallible_part = std::move(fallible_part), is_notify_tag_closure, |
||||
notify_tag, cq]() mutable { |
||||
return LogPollBatch( |
||||
notify_tag, |
||||
Seq(std::move(fallible_part), |
||||
[is_notify_tag_closure, notify_tag, cq](StatusFlag r) { |
||||
return WaitForCqEndOp(is_notify_tag_closure, notify_tag, |
||||
StatusCast<absl::Status>(r), cq); |
||||
})); |
||||
}, |
||||
[cq]() { |
||||
grpc_cq_end_op( |
||||
cq, nullptr, absl::CancelledError(), |
||||
[](void*, grpc_cq_completion* completion) { delete completion; }, |
||||
nullptr, new grpc_cq_completion); |
||||
}); |
||||
} |
||||
|
||||
template <typename F> |
||||
class PollBatchLogger { |
||||
public: |
||||
PollBatchLogger(void* tag, F f) : tag_(tag), f_(std::move(f)) {} |
||||
|
||||
auto operator()() { |
||||
if (grpc_call_trace.enabled()) { |
||||
gpr_log(GPR_INFO, "Poll batch %p", tag_); |
||||
} |
||||
auto r = f_(); |
||||
if (grpc_call_trace.enabled()) { |
||||
gpr_log(GPR_INFO, "Poll batch %p --> %s", tag_, ResultString(r).c_str()); |
||||
} |
||||
return r; |
||||
} |
||||
|
||||
private: |
||||
template <typename T> |
||||
static std::string ResultString(Poll<T> r) { |
||||
if (r.pending()) return "PENDING"; |
||||
return ResultString(r.value()); |
||||
} |
||||
static std::string ResultString(Empty) { return "DONE"; } |
||||
|
||||
void* tag_; |
||||
F f_; |
||||
}; |
||||
|
||||
template <typename F> |
||||
PollBatchLogger<F> LogPollBatch(void* tag, F f) { |
||||
return PollBatchLogger<F>(tag, std::move(f)); |
||||
} |
||||
|
||||
class MessageReceiver { |
||||
public: |
||||
grpc_compression_algorithm incoming_compression_algorithm() const { |
||||
return incoming_compression_algorithm_; |
||||
} |
||||
|
||||
void SetIncomingCompressionAlgorithm( |
||||
grpc_compression_algorithm incoming_compression_algorithm) { |
||||
incoming_compression_algorithm_ = incoming_compression_algorithm; |
||||
} |
||||
|
||||
uint32_t last_message_flags() const { return test_only_last_message_flags_; } |
||||
|
||||
template <typename Puller> |
||||
auto MakeBatchOp(const grpc_op& op, Puller* puller) { |
||||
CHECK_EQ(recv_message_, nullptr); |
||||
recv_message_ = op.data.recv_message.recv_message; |
||||
return [this, puller]() mutable { |
||||
return Map(puller->PullMessage(), |
||||
[this](ValueOrFailure<absl::optional<MessageHandle>> msg) { |
||||
return FinishRecvMessage(std::move(msg)); |
||||
}); |
||||
}; |
||||
} |
||||
|
||||
private: |
||||
StatusFlag FinishRecvMessage( |
||||
ValueOrFailure<absl::optional<MessageHandle>> result); |
||||
|
||||
grpc_byte_buffer** recv_message_ = nullptr; |
||||
uint32_t test_only_last_message_flags_ = 0; |
||||
// Compression algorithm for incoming data
|
||||
grpc_compression_algorithm incoming_compression_algorithm_ = |
||||
GRPC_COMPRESS_NONE; |
||||
}; |
||||
|
||||
std::string MakeErrorString(const ServerMetadata* trailing_metadata); |
||||
|
||||
} // namespace grpc_core
|
||||
|
||||
#endif // GRPC_SRC_CORE_LIB_SURFACE_CALL_UTILS_H
|
@ -0,0 +1,423 @@ |
||||
// Copyright 2024 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include "src/core/lib/surface/client_call.h" |
||||
|
||||
#include <inttypes.h> |
||||
#include <limits.h> |
||||
#include <stdlib.h> |
||||
#include <string.h> |
||||
|
||||
#include <algorithm> |
||||
#include <atomic> |
||||
#include <cstdint> |
||||
#include <memory> |
||||
#include <string> |
||||
#include <utility> |
||||
|
||||
#include "absl/log/check.h" |
||||
#include "absl/status/status.h" |
||||
#include "absl/strings/string_view.h" |
||||
|
||||
#include <grpc/byte_buffer.h> |
||||
#include <grpc/compression.h> |
||||
#include <grpc/event_engine/event_engine.h> |
||||
#include <grpc/grpc.h> |
||||
#include <grpc/impl/call.h> |
||||
#include <grpc/impl/propagation_bits.h> |
||||
#include <grpc/slice.h> |
||||
#include <grpc/slice_buffer.h> |
||||
#include <grpc/status.h> |
||||
#include <grpc/support/alloc.h> |
||||
#include <grpc/support/atm.h> |
||||
#include <grpc/support/log.h> |
||||
#include <grpc/support/port_platform.h> |
||||
#include <grpc/support/string_util.h> |
||||
|
||||
#include "src/core/lib/gprpp/bitset.h" |
||||
#include "src/core/lib/gprpp/crash.h" |
||||
#include "src/core/lib/gprpp/ref_counted.h" |
||||
#include "src/core/lib/gprpp/ref_counted_ptr.h" |
||||
#include "src/core/lib/promise/all_ok.h" |
||||
#include "src/core/lib/promise/status_flag.h" |
||||
#include "src/core/lib/promise/try_seq.h" |
||||
#include "src/core/lib/resource_quota/arena.h" |
||||
#include "src/core/lib/slice/slice_buffer.h" |
||||
#include "src/core/lib/surface/call_trace.h" |
||||
#include "src/core/lib/surface/completion_queue.h" |
||||
#include "src/core/lib/transport/metadata.h" |
||||
#include "src/core/telemetry/stats.h" |
||||
#include "src/core/telemetry/stats_data.h" |
||||
|
||||
namespace grpc_core { |
||||
|
||||
namespace { |
||||
|
||||
grpc_call_error ValidateClientBatch(const grpc_op* ops, size_t nops) { |
||||
BitSet<8> got_ops; |
||||
for (size_t op_idx = 0; op_idx < nops; op_idx++) { |
||||
const grpc_op& op = ops[op_idx]; |
||||
switch (op.op) { |
||||
case GRPC_OP_SEND_INITIAL_METADATA: |
||||
if (!AreInitialMetadataFlagsValid(op.flags)) { |
||||
return GRPC_CALL_ERROR_INVALID_FLAGS; |
||||
} |
||||
if (!ValidateMetadata(op.data.send_initial_metadata.count, |
||||
op.data.send_initial_metadata.metadata)) { |
||||
return GRPC_CALL_ERROR_INVALID_METADATA; |
||||
} |
||||
break; |
||||
case GRPC_OP_SEND_MESSAGE: |
||||
if (!AreWriteFlagsValid(op.flags)) { |
||||
return GRPC_CALL_ERROR_INVALID_FLAGS; |
||||
} |
||||
break; |
||||
case GRPC_OP_SEND_CLOSE_FROM_CLIENT: |
||||
case GRPC_OP_RECV_INITIAL_METADATA: |
||||
case GRPC_OP_RECV_MESSAGE: |
||||
case GRPC_OP_RECV_STATUS_ON_CLIENT: |
||||
if (op.flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS; |
||||
break; |
||||
case GRPC_OP_RECV_CLOSE_ON_SERVER: |
||||
case GRPC_OP_SEND_STATUS_FROM_SERVER: |
||||
return GRPC_CALL_ERROR_NOT_ON_CLIENT; |
||||
} |
||||
if (got_ops.is_set(op.op)) return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; |
||||
got_ops.set(op.op); |
||||
} |
||||
return GRPC_CALL_OK; |
||||
} |
||||
|
||||
} // namespace
|
||||
|
||||
ClientCall::ClientCall( |
||||
grpc_call*, uint32_t, grpc_completion_queue* cq, Slice path, |
||||
absl::optional<Slice> authority, bool registered_method, Timestamp deadline, |
||||
grpc_compression_options compression_options, |
||||
grpc_event_engine::experimental::EventEngine* event_engine, |
||||
RefCountedPtr<Arena> arena, |
||||
RefCountedPtr<UnstartedCallDestination> destination) |
||||
: Call(false, deadline, std::move(arena), event_engine), |
||||
cq_(cq), |
||||
call_destination_(std::move(destination)), |
||||
compression_options_(compression_options) { |
||||
global_stats().IncrementClientCallsCreated(); |
||||
send_initial_metadata_->Set(HttpPathMetadata(), std::move(path)); |
||||
if (authority.has_value()) { |
||||
send_initial_metadata_->Set(HttpAuthorityMetadata(), std::move(*authority)); |
||||
} |
||||
send_initial_metadata_->Set( |
||||
GrpcRegisteredMethod(), |
||||
reinterpret_cast<void*>(static_cast<uintptr_t>(registered_method))); |
||||
if (deadline != Timestamp::InfFuture()) { |
||||
send_initial_metadata_->Set(GrpcTimeoutMetadata(), deadline); |
||||
UpdateDeadline(deadline); |
||||
} |
||||
} |
||||
|
||||
grpc_call_error ClientCall::StartBatch(const grpc_op* ops, size_t nops, |
||||
void* notify_tag, |
||||
bool is_notify_tag_closure) { |
||||
if (nops == 0) { |
||||
EndOpImmediately(cq_, notify_tag, is_notify_tag_closure); |
||||
return GRPC_CALL_OK; |
||||
} |
||||
const grpc_call_error validation_result = ValidateClientBatch(ops, nops); |
||||
if (validation_result != GRPC_CALL_OK) { |
||||
return validation_result; |
||||
} |
||||
CommitBatch(ops, nops, notify_tag, is_notify_tag_closure); |
||||
return GRPC_CALL_OK; |
||||
} |
||||
|
||||
void ClientCall::CancelWithError(grpc_error_handle error) { |
||||
cancel_status_.Set(new absl::Status(error)); |
||||
auto cur_state = call_state_.load(std::memory_order_acquire); |
||||
while (true) { |
||||
if (grpc_call_trace.enabled()) { |
||||
LOG(INFO) << DebugTag() << "CancelWithError " |
||||
<< GRPC_DUMP_ARGS(cur_state, error); |
||||
} |
||||
switch (cur_state) { |
||||
case kCancelled: |
||||
return; |
||||
case kUnstarted: |
||||
if (call_state_.compare_exchange_strong(cur_state, kCancelled, |
||||
std::memory_order_acq_rel, |
||||
std::memory_order_acquire)) { |
||||
return; |
||||
} |
||||
break; |
||||
case kStarted: |
||||
started_call_initiator_.SpawnInfallible( |
||||
"CancelWithError", [self = WeakRefAsSubclass<ClientCall>(), |
||||
error = std::move(error)]() mutable { |
||||
self->started_call_initiator_.Cancel(std::move(error)); |
||||
return Empty{}; |
||||
}); |
||||
return; |
||||
default: |
||||
if (call_state_.compare_exchange_strong(cur_state, kCancelled, |
||||
std::memory_order_acq_rel, |
||||
std::memory_order_acquire)) { |
||||
auto* unordered_start = reinterpret_cast<UnorderedStart*>(cur_state); |
||||
while (unordered_start != nullptr) { |
||||
auto next = unordered_start->next; |
||||
delete unordered_start; |
||||
unordered_start = next; |
||||
} |
||||
return; |
||||
} |
||||
} |
||||
} |
||||
} |
||||
|
||||
template <typename Batch> |
||||
void ClientCall::ScheduleCommittedBatch(Batch batch) { |
||||
auto cur_state = call_state_.load(std::memory_order_acquire); |
||||
while (true) { |
||||
switch (cur_state) { |
||||
case kUnstarted: |
||||
default: { // UnorderedStart
|
||||
auto pending = std::make_unique<UnorderedStart>(); |
||||
pending->start_pending_batch = [this, |
||||
batch = std::move(batch)]() mutable { |
||||
started_call_initiator_.SpawnInfallible("batch", std::move(batch)); |
||||
}; |
||||
while (true) { |
||||
pending->next = reinterpret_cast<UnorderedStart*>(cur_state); |
||||
if (call_state_.compare_exchange_strong( |
||||
cur_state, reinterpret_cast<uintptr_t>(pending.get()), |
||||
std::memory_order_acq_rel, std::memory_order_acquire)) { |
||||
std::ignore = pending.release(); |
||||
return; |
||||
} |
||||
if (cur_state == kStarted) { |
||||
pending->start_pending_batch(); |
||||
return; |
||||
} |
||||
if (cur_state == kCancelled) { |
||||
return; |
||||
} |
||||
} |
||||
} |
||||
case kStarted: |
||||
started_call_initiator_.SpawnInfallible("batch", std::move(batch)); |
||||
return; |
||||
case kCancelled: |
||||
return; |
||||
} |
||||
} |
||||
} |
||||
|
||||
void ClientCall::StartCall(const grpc_op& send_initial_metadata_op) { |
||||
auto cur_state = call_state_.load(std::memory_order_acquire); |
||||
CToMetadata(send_initial_metadata_op.data.send_initial_metadata.metadata, |
||||
send_initial_metadata_op.data.send_initial_metadata.count, |
||||
send_initial_metadata_.get()); |
||||
PrepareOutgoingInitialMetadata(send_initial_metadata_op, |
||||
*send_initial_metadata_); |
||||
auto call = MakeCallPair(std::move(send_initial_metadata_), event_engine(), |
||||
arena()->Ref()); |
||||
started_call_initiator_ = std::move(call.initiator); |
||||
call_destination_->StartCall(std::move(call.handler)); |
||||
while (true) { |
||||
switch (cur_state) { |
||||
case kUnstarted: |
||||
if (call_state_.compare_exchange_strong(cur_state, kStarted, |
||||
std::memory_order_acq_rel, |
||||
std::memory_order_acquire)) { |
||||
return; |
||||
} |
||||
break; |
||||
case kStarted: |
||||
Crash("StartCall called twice"); // probably we crash earlier...
|
||||
case kCancelled: |
||||
return; |
||||
default: { // UnorderedStart
|
||||
if (call_state_.compare_exchange_strong(cur_state, kStarted, |
||||
std::memory_order_acq_rel, |
||||
std::memory_order_acquire)) { |
||||
auto unordered_start = reinterpret_cast<UnorderedStart*>(cur_state); |
||||
while (unordered_start->next != nullptr) { |
||||
unordered_start->start_pending_batch(); |
||||
auto next = unordered_start->next; |
||||
delete unordered_start; |
||||
unordered_start = next; |
||||
} |
||||
return; |
||||
} |
||||
break; |
||||
} |
||||
} |
||||
} |
||||
} |
||||
|
||||
void ClientCall::CommitBatch(const grpc_op* ops, size_t nops, void* notify_tag, |
||||
bool is_notify_tag_closure) { |
||||
if (nops == 1 && ops[0].op == GRPC_OP_SEND_INITIAL_METADATA) { |
||||
StartCall(ops[0]); |
||||
EndOpImmediately(cq_, notify_tag, is_notify_tag_closure); |
||||
return; |
||||
} |
||||
if (!is_notify_tag_closure) grpc_cq_begin_op(cq_, notify_tag); |
||||
BatchOpIndex op_index(ops, nops); |
||||
auto send_message = |
||||
op_index.OpHandler<GRPC_OP_SEND_MESSAGE>([this](const grpc_op& op) { |
||||
SliceBuffer send; |
||||
grpc_slice_buffer_swap( |
||||
&op.data.send_message.send_message->data.raw.slice_buffer, |
||||
send.c_slice_buffer()); |
||||
auto msg = arena()->MakePooled<Message>(std::move(send), op.flags); |
||||
return [this, msg = std::move(msg)]() mutable { |
||||
return started_call_initiator_.PushMessage(std::move(msg)); |
||||
}; |
||||
}); |
||||
auto send_close_from_client = |
||||
op_index.OpHandler<GRPC_OP_SEND_CLOSE_FROM_CLIENT>( |
||||
[this](const grpc_op&) { |
||||
return [this]() { |
||||
started_call_initiator_.FinishSends(); |
||||
return Success{}; |
||||
}; |
||||
}); |
||||
auto recv_message = |
||||
op_index.OpHandler<GRPC_OP_RECV_MESSAGE>([this](const grpc_op& op) { |
||||
return message_receiver_.MakeBatchOp(op, &started_call_initiator_); |
||||
}); |
||||
auto recv_initial_metadata = |
||||
op_index.OpHandler<GRPC_OP_RECV_INITIAL_METADATA>([this]( |
||||
const grpc_op& op) { |
||||
return [this, |
||||
array = op.data.recv_initial_metadata.recv_initial_metadata]() { |
||||
return Map( |
||||
started_call_initiator_.PullServerInitialMetadata(), |
||||
[this, |
||||
array](ValueOrFailure<absl::optional<ServerMetadataHandle>> md) { |
||||
ServerMetadataHandle metadata; |
||||
if (!md.ok() || !md->has_value()) { |
||||
is_trailers_only_ = true; |
||||
metadata = Arena::MakePooled<ServerMetadata>(); |
||||
} else { |
||||
metadata = std::move(md->value()); |
||||
is_trailers_only_ = |
||||
metadata->get(GrpcTrailersOnly()).value_or(false); |
||||
} |
||||
ProcessIncomingInitialMetadata(*metadata); |
||||
PublishMetadataArray(metadata.get(), array, true); |
||||
received_initial_metadata_ = std::move(metadata); |
||||
return Success{}; |
||||
}); |
||||
}; |
||||
}); |
||||
auto primary_ops = AllOk<StatusFlag>( |
||||
TrySeq(std::move(send_message), std::move(send_close_from_client)), |
||||
TrySeq(std::move(recv_initial_metadata), std::move(recv_message))); |
||||
if (const grpc_op* op = op_index.op(GRPC_OP_SEND_INITIAL_METADATA)) { |
||||
StartCall(*op); |
||||
} |
||||
if (const grpc_op* op = op_index.op(GRPC_OP_RECV_STATUS_ON_CLIENT)) { |
||||
auto out_status = op->data.recv_status_on_client.status; |
||||
auto out_status_details = op->data.recv_status_on_client.status_details; |
||||
auto out_error_string = op->data.recv_status_on_client.error_string; |
||||
auto out_trailing_metadata = |
||||
op->data.recv_status_on_client.trailing_metadata; |
||||
auto make_read_trailing_metadata = [this, out_status, out_status_details, |
||||
out_error_string, |
||||
out_trailing_metadata]() { |
||||
return Map( |
||||
started_call_initiator_.PullServerTrailingMetadata(), |
||||
[this, out_status, out_status_details, out_error_string, |
||||
out_trailing_metadata]( |
||||
ServerMetadataHandle server_trailing_metadata) { |
||||
if (grpc_call_trace.enabled()) { |
||||
LOG(INFO) << DebugTag() << "RecvStatusOnClient " |
||||
<< server_trailing_metadata->DebugString(); |
||||
} |
||||
const auto status = |
||||
server_trailing_metadata->get(GrpcStatusMetadata()) |
||||
.value_or(GRPC_STATUS_UNKNOWN); |
||||
*out_status = status; |
||||
Slice message_slice; |
||||
if (Slice* message = server_trailing_metadata->get_pointer( |
||||
GrpcMessageMetadata())) { |
||||
message_slice = message->Ref(); |
||||
} |
||||
*out_status_details = message_slice.TakeCSlice(); |
||||
if (out_error_string != nullptr) { |
||||
if (status != GRPC_STATUS_OK) { |
||||
*out_error_string = gpr_strdup( |
||||
MakeErrorString(server_trailing_metadata.get()).c_str()); |
||||
} else { |
||||
*out_error_string = nullptr; |
||||
} |
||||
} |
||||
PublishMetadataArray(server_trailing_metadata.get(), |
||||
out_trailing_metadata, true); |
||||
received_trailing_metadata_ = std::move(server_trailing_metadata); |
||||
return Success{}; |
||||
}); |
||||
}; |
||||
ScheduleCommittedBatch(InfallibleBatch( |
||||
std::move(primary_ops), |
||||
OpHandler<GRPC_OP_RECV_STATUS_ON_CLIENT>(OnCancelFactory( |
||||
std::move(make_read_trailing_metadata), |
||||
[this, out_status, out_status_details, out_error_string, |
||||
out_trailing_metadata]() { |
||||
auto* status = cancel_status_.Get(); |
||||
CHECK_NE(status, nullptr); |
||||
*out_status = static_cast<grpc_status_code>(status->code()); |
||||
*out_status_details = |
||||
Slice::FromCopiedString(status->message()).TakeCSlice(); |
||||
if (out_error_string != nullptr) { |
||||
*out_error_string = nullptr; |
||||
} |
||||
out_trailing_metadata->count = 0; |
||||
})), |
||||
is_notify_tag_closure, notify_tag, cq_)); |
||||
} else { |
||||
ScheduleCommittedBatch(FallibleBatch( |
||||
std::move(primary_ops), is_notify_tag_closure, notify_tag, cq_)); |
||||
} |
||||
} |
||||
|
||||
char* ClientCall::GetPeer() { |
||||
Slice peer_slice = GetPeerString(); |
||||
if (!peer_slice.empty()) { |
||||
absl::string_view peer_string_view = peer_slice.as_string_view(); |
||||
char* peer_string = |
||||
static_cast<char*>(gpr_malloc(peer_string_view.size() + 1)); |
||||
memcpy(peer_string, peer_string_view.data(), peer_string_view.size()); |
||||
peer_string[peer_string_view.size()] = '\0'; |
||||
return peer_string; |
||||
} |
||||
return gpr_strdup("unknown"); |
||||
} |
||||
|
||||
grpc_call* MakeClientCall( |
||||
grpc_call* parent_call, uint32_t propagation_mask, |
||||
grpc_completion_queue* cq, Slice path, absl::optional<Slice> authority, |
||||
bool registered_method, Timestamp deadline, |
||||
grpc_compression_options compression_options, |
||||
grpc_event_engine::experimental::EventEngine* event_engine, |
||||
RefCountedPtr<Arena> arena, |
||||
RefCountedPtr<UnstartedCallDestination> destination) { |
||||
return arena |
||||
->New<ClientCall>(parent_call, propagation_mask, cq, std::move(path), |
||||
std::move(authority), registered_method, deadline, |
||||
compression_options, event_engine, arena, destination) |
||||
->c_ptr(); |
||||
} |
||||
|
||||
} // namespace grpc_core
|
@ -0,0 +1,180 @@ |
||||
// Copyright 2024 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#ifndef GRPC_SRC_CORE_LIB_SURFACE_CLIENT_CALL_H |
||||
#define GRPC_SRC_CORE_LIB_SURFACE_CLIENT_CALL_H |
||||
|
||||
#include <inttypes.h> |
||||
#include <limits.h> |
||||
#include <stdlib.h> |
||||
#include <string.h> |
||||
|
||||
#include <atomic> |
||||
#include <cstdint> |
||||
#include <string> |
||||
|
||||
#include "absl/status/status.h" |
||||
#include "absl/strings/str_format.h" |
||||
#include "absl/strings/string_view.h" |
||||
|
||||
#include <grpc/byte_buffer.h> |
||||
#include <grpc/compression.h> |
||||
#include <grpc/event_engine/event_engine.h> |
||||
#include <grpc/grpc.h> |
||||
#include <grpc/impl/call.h> |
||||
#include <grpc/impl/propagation_bits.h> |
||||
#include <grpc/slice.h> |
||||
#include <grpc/slice_buffer.h> |
||||
#include <grpc/status.h> |
||||
#include <grpc/support/alloc.h> |
||||
#include <grpc/support/atm.h> |
||||
#include <grpc/support/log.h> |
||||
#include <grpc/support/port_platform.h> |
||||
#include <grpc/support/string_util.h> |
||||
|
||||
#include "src/core/lib/gprpp/crash.h" |
||||
#include "src/core/lib/gprpp/ref_counted.h" |
||||
#include "src/core/lib/gprpp/ref_counted_ptr.h" |
||||
#include "src/core/lib/gprpp/single_set_ptr.h" |
||||
#include "src/core/lib/promise/status_flag.h" |
||||
#include "src/core/lib/resource_quota/arena.h" |
||||
#include "src/core/lib/surface/call.h" |
||||
#include "src/core/lib/surface/call_utils.h" |
||||
#include "src/core/lib/transport/metadata.h" |
||||
|
||||
namespace grpc_core { |
||||
|
||||
class ClientCall final |
||||
: public Call, |
||||
public DualRefCounted<ClientCall, NonPolymorphicRefCount, |
||||
UnrefCallDestroy> { |
||||
public: |
||||
ClientCall(grpc_call* parent_call, uint32_t propagation_mask, |
||||
grpc_completion_queue* cq, Slice path, |
||||
absl::optional<Slice> authority, bool registered_method, |
||||
Timestamp deadline, grpc_compression_options compression_options, |
||||
grpc_event_engine::experimental::EventEngine* event_engine, |
||||
RefCountedPtr<Arena> arena, |
||||
RefCountedPtr<UnstartedCallDestination> destination); |
||||
|
||||
void CancelWithError(grpc_error_handle error) override; |
||||
bool is_trailers_only() const override { return is_trailers_only_; } |
||||
absl::string_view GetServerAuthority() const override { |
||||
Crash("unimplemented"); |
||||
} |
||||
grpc_call_error StartBatch(const grpc_op* ops, size_t nops, void* notify_tag, |
||||
bool is_notify_tag_closure) override; |
||||
|
||||
void ExternalRef() override { Ref().release(); } |
||||
void ExternalUnref() override { Unref(); } |
||||
void InternalRef(const char*) override { WeakRef().release(); } |
||||
void InternalUnref(const char*) override { WeakUnref(); } |
||||
|
||||
void Orphaned() override { |
||||
// TODO(ctiller): only when we're not already finished
|
||||
CancelWithError(absl::CancelledError()); |
||||
} |
||||
|
||||
void SetCompletionQueue(grpc_completion_queue*) override { |
||||
Crash("unimplemented"); |
||||
} |
||||
|
||||
grpc_compression_options compression_options() override { |
||||
return compression_options_; |
||||
} |
||||
|
||||
grpc_call_stack* call_stack() override { return nullptr; } |
||||
|
||||
char* GetPeer() override; |
||||
|
||||
bool Completed() final { Crash("unimplemented"); } |
||||
bool failed_before_recv_message() const final { Crash("unimplemented"); } |
||||
|
||||
grpc_compression_algorithm incoming_compression_algorithm() override { |
||||
return message_receiver_.incoming_compression_algorithm(); |
||||
} |
||||
|
||||
void SetIncomingCompressionAlgorithm( |
||||
grpc_compression_algorithm algorithm) override { |
||||
message_receiver_.SetIncomingCompressionAlgorithm(algorithm); |
||||
} |
||||
|
||||
uint32_t test_only_message_flags() override { |
||||
return message_receiver_.last_message_flags(); |
||||
} |
||||
|
||||
void Destroy() { |
||||
auto arena = this->arena()->Ref(); |
||||
this->~ClientCall(); |
||||
} |
||||
|
||||
private: |
||||
struct UnorderedStart { |
||||
absl::AnyInvocable<void()> start_pending_batch; |
||||
UnorderedStart* next; |
||||
}; |
||||
|
||||
void CommitBatch(const grpc_op* ops, size_t nops, void* notify_tag, |
||||
bool is_notify_tag_closure); |
||||
template <typename Batch> |
||||
void ScheduleCommittedBatch(Batch batch); |
||||
void StartCall(const grpc_op& send_initial_metadata_op); |
||||
|
||||
std::string DebugTag() { return absl::StrFormat("CLIENT_CALL[%p]: ", this); } |
||||
|
||||
// call_state_ is one of:
|
||||
// 1. kUnstarted - call has not yet been started
|
||||
// 2. pointer to an UnorderedStart - call has ops started, but no send initial
|
||||
// metadata yet
|
||||
// 3. kStarted - call has been started and call_initiator_ is ready
|
||||
// 4. kCancelled - call was cancelled before starting
|
||||
// In cases (1) and (2) send_initial_metadata_ is used to store the initial
|
||||
// but unsent metadata.
|
||||
// In case (3) started_call_initiator_ is used to store the call initiator.
|
||||
// In case (4) no other state is used.
|
||||
enum CallState : uintptr_t { |
||||
kUnstarted = 0, |
||||
kStarted = 1, |
||||
kCancelled = 2, |
||||
}; |
||||
std::atomic<uintptr_t> call_state_{kUnstarted}; |
||||
ClientMetadataHandle send_initial_metadata_{ |
||||
Arena::MakePooled<ClientMetadata>()}; |
||||
CallInitiator started_call_initiator_; |
||||
// Status passed to CancelWithError;
|
||||
// if call_state_ == kCancelled then this is the authoritative status,
|
||||
// otherwise the server trailing metadata from started_call_initiator_ is
|
||||
// authoritative.
|
||||
SingleSetPtr<absl::Status> cancel_status_; |
||||
MessageReceiver message_receiver_; |
||||
grpc_completion_queue* const cq_; |
||||
const RefCountedPtr<UnstartedCallDestination> call_destination_; |
||||
const grpc_compression_options compression_options_; |
||||
ServerMetadataHandle received_initial_metadata_; |
||||
ServerMetadataHandle received_trailing_metadata_; |
||||
bool is_trailers_only_; |
||||
}; |
||||
|
||||
grpc_call* MakeClientCall( |
||||
grpc_call* parent_call, uint32_t propagation_mask, |
||||
grpc_completion_queue* cq, Slice path, absl::optional<Slice> authority, |
||||
bool registered_method, Timestamp deadline, |
||||
grpc_compression_options compression_options, |
||||
grpc_event_engine::experimental::EventEngine* event_engine, |
||||
RefCountedPtr<Arena> arena, |
||||
RefCountedPtr<UnstartedCallDestination> destination); |
||||
|
||||
} // namespace grpc_core
|
||||
|
||||
#endif // GRPC_SRC_CORE_LIB_SURFACE_CLIENT_CALL_H
|
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,372 @@ |
||||
// Copyright 2024 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#ifndef GRPC_SRC_CORE_LIB_SURFACE_FILTER_STACK_CALL_H |
||||
#define GRPC_SRC_CORE_LIB_SURFACE_FILTER_STACK_CALL_H |
||||
|
||||
#include <inttypes.h> |
||||
#include <limits.h> |
||||
#include <stdlib.h> |
||||
#include <string.h> |
||||
|
||||
#include <atomic> |
||||
#include <cstdint> |
||||
#include <string> |
||||
#include <vector> |
||||
|
||||
#include "absl/log/check.h" |
||||
#include "absl/strings/str_cat.h" |
||||
#include "absl/strings/str_join.h" |
||||
#include "absl/strings/string_view.h" |
||||
|
||||
#include <grpc/byte_buffer.h> |
||||
#include <grpc/compression.h> |
||||
#include <grpc/event_engine/event_engine.h> |
||||
#include <grpc/grpc.h> |
||||
#include <grpc/impl/call.h> |
||||
#include <grpc/impl/propagation_bits.h> |
||||
#include <grpc/slice.h> |
||||
#include <grpc/slice_buffer.h> |
||||
#include <grpc/status.h> |
||||
#include <grpc/support/alloc.h> |
||||
#include <grpc/support/atm.h> |
||||
#include <grpc/support/log.h> |
||||
#include <grpc/support/port_platform.h> |
||||
#include <grpc/support/string_util.h> |
||||
|
||||
#include "src/core/lib/channel/channel_stack.h" |
||||
#include "src/core/lib/gprpp/ref_counted.h" |
||||
#include "src/core/lib/gprpp/ref_counted_ptr.h" |
||||
#include "src/core/lib/iomgr/call_combiner.h" |
||||
#include "src/core/lib/iomgr/polling_entity.h" |
||||
#include "src/core/lib/promise/context.h" |
||||
#include "src/core/lib/resource_quota/arena.h" |
||||
#include "src/core/lib/slice/slice_buffer.h" |
||||
#include "src/core/lib/surface/call.h" |
||||
#include "src/core/lib/surface/call_trace.h" |
||||
#include "src/core/lib/surface/channel.h" |
||||
#include "src/core/lib/surface/completion_queue.h" |
||||
#include "src/core/lib/transport/metadata_batch.h" |
||||
#include "src/core/lib/transport/transport.h" |
||||
#include "src/core/server/server_interface.h" |
||||
#include "src/core/telemetry/call_tracer.h" |
||||
#include "src/core/util/alloc.h" |
||||
|
||||
namespace grpc_core { |
||||
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
// FilterStackCall
|
||||
// To be removed once promise conversion is complete
|
||||
|
||||
class FilterStackCall final : public Call { |
||||
public: |
||||
~FilterStackCall() override { |
||||
gpr_free(static_cast<void*>(const_cast<char*>(final_info_.error_string))); |
||||
} |
||||
|
||||
bool Completed() override { |
||||
return gpr_atm_acq_load(&received_final_op_atm_) != 0; |
||||
} |
||||
|
||||
// TODO(ctiller): return absl::StatusOr<SomeSmartPointer<Call>>?
|
||||
static grpc_error_handle Create(grpc_call_create_args* args, |
||||
grpc_call** out_call); |
||||
|
||||
static Call* FromTopElem(grpc_call_element* elem) { |
||||
return FromCallStack(grpc_call_stack_from_top_element(elem)); |
||||
} |
||||
|
||||
grpc_call_stack* call_stack() override { |
||||
return reinterpret_cast<grpc_call_stack*>( |
||||
reinterpret_cast<char*>(this) + |
||||
GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(*this))); |
||||
} |
||||
|
||||
grpc_call_element* call_elem(size_t idx) { |
||||
return grpc_call_stack_element(call_stack(), idx); |
||||
} |
||||
|
||||
CallCombiner* call_combiner() { return &call_combiner_; } |
||||
|
||||
void CancelWithError(grpc_error_handle error) override; |
||||
void SetCompletionQueue(grpc_completion_queue* cq) override; |
||||
grpc_call_error StartBatch(const grpc_op* ops, size_t nops, void* notify_tag, |
||||
bool is_notify_tag_closure) override; |
||||
void ExternalRef() override { ext_ref_.Ref(); } |
||||
void ExternalUnref() override; |
||||
void InternalRef(const char* reason) override { |
||||
GRPC_CALL_STACK_REF(call_stack(), reason); |
||||
} |
||||
void InternalUnref(const char* reason) override { |
||||
GRPC_CALL_STACK_UNREF(call_stack(), reason); |
||||
} |
||||
|
||||
bool is_trailers_only() const override { |
||||
bool result = is_trailers_only_; |
||||
DCHECK(!result || recv_initial_metadata_.TransportSize() == 0); |
||||
return result; |
||||
} |
||||
|
||||
bool failed_before_recv_message() const override { |
||||
return call_failed_before_recv_message_; |
||||
} |
||||
|
||||
uint32_t test_only_message_flags() override { |
||||
return test_only_last_message_flags_; |
||||
} |
||||
|
||||
absl::string_view GetServerAuthority() const override { |
||||
const Slice* authority_metadata = |
||||
recv_initial_metadata_.get_pointer(HttpAuthorityMetadata()); |
||||
if (authority_metadata == nullptr) return ""; |
||||
return authority_metadata->as_string_view(); |
||||
} |
||||
|
||||
static size_t InitialSizeEstimate() { |
||||
return sizeof(FilterStackCall) + |
||||
sizeof(BatchControl) * kMaxConcurrentBatches; |
||||
} |
||||
|
||||
char* GetPeer() final; |
||||
|
||||
grpc_compression_options compression_options() override { |
||||
return channel_->compression_options(); |
||||
} |
||||
|
||||
void DeleteThis() { |
||||
auto arena = this->arena()->Ref(); |
||||
this->~FilterStackCall(); |
||||
} |
||||
|
||||
Channel* channel() const { return channel_.get(); } |
||||
|
||||
private: |
||||
class ScopedContext : public promise_detail::Context<Arena> { |
||||
public: |
||||
explicit ScopedContext(FilterStackCall* call) |
||||
: promise_detail::Context<Arena>(call->arena()) {} |
||||
}; |
||||
|
||||
static constexpr gpr_atm kRecvNone = 0; |
||||
static constexpr gpr_atm kRecvInitialMetadataFirst = 1; |
||||
|
||||
enum class PendingOp { |
||||
kRecvMessage, |
||||
kRecvInitialMetadata, |
||||
kRecvTrailingMetadata, |
||||
kSends |
||||
}; |
||||
static intptr_t PendingOpMask(PendingOp op) { |
||||
return static_cast<intptr_t>(1) << static_cast<intptr_t>(op); |
||||
} |
||||
static std::string PendingOpString(intptr_t pending_ops) { |
||||
std::vector<absl::string_view> pending_op_strings; |
||||
if (pending_ops & PendingOpMask(PendingOp::kRecvMessage)) { |
||||
pending_op_strings.push_back("kRecvMessage"); |
||||
} |
||||
if (pending_ops & PendingOpMask(PendingOp::kRecvInitialMetadata)) { |
||||
pending_op_strings.push_back("kRecvInitialMetadata"); |
||||
} |
||||
if (pending_ops & PendingOpMask(PendingOp::kRecvTrailingMetadata)) { |
||||
pending_op_strings.push_back("kRecvTrailingMetadata"); |
||||
} |
||||
if (pending_ops & PendingOpMask(PendingOp::kSends)) { |
||||
pending_op_strings.push_back("kSends"); |
||||
} |
||||
return absl::StrCat("{", absl::StrJoin(pending_op_strings, ","), "}"); |
||||
} |
||||
struct BatchControl { |
||||
FilterStackCall* call_ = nullptr; |
||||
CallTracerAnnotationInterface* call_tracer_ = nullptr; |
||||
grpc_transport_stream_op_batch op_; |
||||
// Share memory for cq_completion and notify_tag as they are never needed
|
||||
// simultaneously. Each byte used in this data structure count as six bytes
|
||||
// per call, so any savings we can make are worthwhile,
|
||||
|
||||
// We use notify_tag to determine whether or not to send notification to the
|
||||
// completion queue. Once we've made that determination, we can reuse the
|
||||
// memory for cq_completion.
|
||||
union { |
||||
grpc_cq_completion cq_completion; |
||||
struct { |
||||
// Any given op indicates completion by either (a) calling a closure or
|
||||
// (b) sending a notification on the call's completion queue. If
|
||||
// \a is_closure is true, \a tag indicates a closure to be invoked;
|
||||
// otherwise, \a tag indicates the tag to be used in the notification to
|
||||
// be sent to the completion queue.
|
||||
void* tag; |
||||
bool is_closure; |
||||
} notify_tag; |
||||
} completion_data_; |
||||
grpc_closure start_batch_; |
||||
grpc_closure finish_batch_; |
||||
std::atomic<intptr_t> ops_pending_{0}; |
||||
AtomicError batch_error_; |
||||
void set_pending_ops(uintptr_t ops) { |
||||
ops_pending_.store(ops, std::memory_order_release); |
||||
} |
||||
bool completed_batch_step(PendingOp op) { |
||||
auto mask = PendingOpMask(op); |
||||
auto r = ops_pending_.fetch_sub(mask, std::memory_order_acq_rel); |
||||
if (grpc_call_trace.enabled()) { |
||||
gpr_log(GPR_DEBUG, "BATCH:%p COMPLETE:%s REMAINING:%s (tag:%p)", this, |
||||
PendingOpString(mask).c_str(), |
||||
PendingOpString(r & ~mask).c_str(), |
||||
completion_data_.notify_tag.tag); |
||||
} |
||||
CHECK_NE((r & mask), 0); |
||||
return r == mask; |
||||
} |
||||
|
||||
void PostCompletion(); |
||||
void FinishStep(PendingOp op); |
||||
void ProcessDataAfterMetadata(); |
||||
void ReceivingStreamReady(grpc_error_handle error); |
||||
void ReceivingInitialMetadataReady(grpc_error_handle error); |
||||
void ReceivingTrailingMetadataReady(grpc_error_handle error); |
||||
void FinishBatch(grpc_error_handle error); |
||||
}; |
||||
|
||||
FilterStackCall(RefCountedPtr<Arena> arena, |
||||
const grpc_call_create_args& args); |
||||
|
||||
static void ReleaseCall(void* call, grpc_error_handle); |
||||
static void DestroyCall(void* call, grpc_error_handle); |
||||
|
||||
static FilterStackCall* FromCallStack(grpc_call_stack* call_stack) { |
||||
return reinterpret_cast<FilterStackCall*>( |
||||
reinterpret_cast<char*>(call_stack) - |
||||
GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(FilterStackCall))); |
||||
} |
||||
|
||||
void ExecuteBatch(grpc_transport_stream_op_batch* batch, |
||||
grpc_closure* start_batch_closure); |
||||
void SetFinalStatus(grpc_error_handle error); |
||||
BatchControl* ReuseOrAllocateBatchControl(const grpc_op* ops); |
||||
bool PrepareApplicationMetadata(size_t count, grpc_metadata* metadata, |
||||
bool is_trailing); |
||||
void PublishAppMetadata(grpc_metadata_batch* b, bool is_trailing); |
||||
void RecvInitialFilter(grpc_metadata_batch* b); |
||||
void RecvTrailingFilter(grpc_metadata_batch* b, |
||||
grpc_error_handle batch_error); |
||||
|
||||
grpc_compression_algorithm incoming_compression_algorithm() override { |
||||
return incoming_compression_algorithm_; |
||||
} |
||||
void SetIncomingCompressionAlgorithm( |
||||
grpc_compression_algorithm algorithm) override { |
||||
incoming_compression_algorithm_ = algorithm; |
||||
} |
||||
|
||||
RefCountedPtr<Channel> channel_; |
||||
RefCount ext_ref_; |
||||
CallCombiner call_combiner_; |
||||
grpc_completion_queue* cq_; |
||||
grpc_polling_entity pollent_; |
||||
|
||||
/// has grpc_call_unref been called
|
||||
bool destroy_called_ = false; |
||||
// Trailers-only response status
|
||||
bool is_trailers_only_ = false; |
||||
/// which ops are in-flight
|
||||
bool sent_initial_metadata_ = false; |
||||
bool sending_message_ = false; |
||||
bool sent_final_op_ = false; |
||||
bool received_initial_metadata_ = false; |
||||
bool receiving_message_ = false; |
||||
bool requested_final_op_ = false; |
||||
gpr_atm received_final_op_atm_ = 0; |
||||
|
||||
BatchControl* active_batches_[kMaxConcurrentBatches] = {}; |
||||
grpc_transport_stream_op_batch_payload stream_op_payload_; |
||||
|
||||
// first idx: is_receiving, second idx: is_trailing
|
||||
grpc_metadata_batch send_initial_metadata_; |
||||
grpc_metadata_batch send_trailing_metadata_; |
||||
grpc_metadata_batch recv_initial_metadata_; |
||||
grpc_metadata_batch recv_trailing_metadata_; |
||||
|
||||
// Buffered read metadata waiting to be returned to the application.
|
||||
// Element 0 is initial metadata, element 1 is trailing metadata.
|
||||
grpc_metadata_array* buffered_metadata_[2] = {}; |
||||
|
||||
// Call data useful used for reporting. Only valid after the call has
|
||||
// completed
|
||||
grpc_call_final_info final_info_; |
||||
|
||||
SliceBuffer send_slice_buffer_; |
||||
absl::optional<SliceBuffer> receiving_slice_buffer_; |
||||
uint32_t receiving_stream_flags_; |
||||
uint32_t test_only_last_message_flags_ = 0; |
||||
// Compression algorithm for *incoming* data
|
||||
grpc_compression_algorithm incoming_compression_algorithm_ = |
||||
GRPC_COMPRESS_NONE; |
||||
|
||||
bool call_failed_before_recv_message_ = false; |
||||
grpc_byte_buffer** receiving_buffer_ = nullptr; |
||||
grpc_slice receiving_slice_ = grpc_empty_slice(); |
||||
grpc_closure receiving_stream_ready_; |
||||
grpc_closure receiving_initial_metadata_ready_; |
||||
grpc_closure receiving_trailing_metadata_ready_; |
||||
// Status about operation of call
|
||||
bool sent_server_trailing_metadata_ = false; |
||||
gpr_atm cancelled_with_error_ = 0; |
||||
|
||||
grpc_closure release_call_; |
||||
|
||||
union { |
||||
struct { |
||||
grpc_status_code* status; |
||||
grpc_slice* status_details; |
||||
const char** error_string; |
||||
} client; |
||||
struct { |
||||
int* cancelled; |
||||
// backpointer to owning server if this is a server side call.
|
||||
ServerInterface* core_server; |
||||
} server; |
||||
} final_op_; |
||||
AtomicError status_error_; |
||||
|
||||
// recv_state can contain one of the following values:
|
||||
// RECV_NONE : : no initial metadata and messages received
|
||||
// RECV_INITIAL_METADATA_FIRST : received initial metadata first
|
||||
// a batch_control* : received messages first
|
||||
|
||||
// +------1------RECV_NONE------3-----+
|
||||
// | |
|
||||
// | |
|
||||
// v v
|
||||
// RECV_INITIAL_METADATA_FIRST receiving_stream_ready_bctlp
|
||||
// | ^ | ^
|
||||
// | | | |
|
||||
// +-----2-----+ +-----4-----+
|
||||
|
||||
// For 1, 4: See receiving_initial_metadata_ready() function
|
||||
// For 2, 3: See receiving_stream_ready() function
|
||||
gpr_atm recv_state_ = 0; |
||||
}; |
||||
|
||||
// Create a new call based on \a args.
|
||||
// Regardless of success or failure, always returns a valid new call into *call
|
||||
//
|
||||
grpc_error_handle grpc_call_create(grpc_call_create_args* args, |
||||
grpc_call** call); |
||||
|
||||
// Given the top call_element, get the call object.
|
||||
grpc_call* grpc_call_from_top_element(grpc_call_element* surface_element); |
||||
|
||||
} // namespace grpc_core
|
||||
|
||||
#endif // GRPC_SRC_CORE_LIB_SURFACE_FILTER_STACK_CALL_H
|
@ -0,0 +1,224 @@ |
||||
// Copyright 2024 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include "src/core/lib/surface/server_call.h" |
||||
|
||||
#include <inttypes.h> |
||||
#include <limits.h> |
||||
#include <stdlib.h> |
||||
#include <string.h> |
||||
|
||||
#include <memory> |
||||
#include <string> |
||||
#include <utility> |
||||
|
||||
#include "absl/log/check.h" |
||||
#include "absl/strings/string_view.h" |
||||
|
||||
#include <grpc/byte_buffer.h> |
||||
#include <grpc/compression.h> |
||||
#include <grpc/event_engine/event_engine.h> |
||||
#include <grpc/grpc.h> |
||||
#include <grpc/impl/call.h> |
||||
#include <grpc/impl/propagation_bits.h> |
||||
#include <grpc/slice.h> |
||||
#include <grpc/slice_buffer.h> |
||||
#include <grpc/status.h> |
||||
#include <grpc/support/alloc.h> |
||||
#include <grpc/support/atm.h> |
||||
#include <grpc/support/log.h> |
||||
#include <grpc/support/port_platform.h> |
||||
#include <grpc/support/string_util.h> |
||||
|
||||
#include "src/core/lib/gprpp/bitset.h" |
||||
#include "src/core/lib/promise/all_ok.h" |
||||
#include "src/core/lib/promise/map.h" |
||||
#include "src/core/lib/promise/poll.h" |
||||
#include "src/core/lib/promise/status_flag.h" |
||||
#include "src/core/lib/promise/try_seq.h" |
||||
#include "src/core/lib/resource_quota/arena.h" |
||||
#include "src/core/lib/slice/slice_buffer.h" |
||||
#include "src/core/lib/surface/completion_queue.h" |
||||
#include "src/core/lib/transport/metadata.h" |
||||
#include "src/core/lib/transport/metadata_batch.h" |
||||
#include "src/core/server/server_interface.h" |
||||
|
||||
namespace grpc_core { |
||||
|
||||
namespace { |
||||
|
||||
grpc_call_error ValidateServerBatch(const grpc_op* ops, size_t nops) { |
||||
BitSet<8> got_ops; |
||||
for (size_t op_idx = 0; op_idx < nops; op_idx++) { |
||||
const grpc_op& op = ops[op_idx]; |
||||
switch (op.op) { |
||||
case GRPC_OP_SEND_INITIAL_METADATA: |
||||
if (!AreInitialMetadataFlagsValid(op.flags)) { |
||||
return GRPC_CALL_ERROR_INVALID_FLAGS; |
||||
} |
||||
if (!ValidateMetadata(op.data.send_initial_metadata.count, |
||||
op.data.send_initial_metadata.metadata)) { |
||||
return GRPC_CALL_ERROR_INVALID_METADATA; |
||||
} |
||||
break; |
||||
case GRPC_OP_SEND_MESSAGE: |
||||
if (!AreWriteFlagsValid(op.flags)) { |
||||
return GRPC_CALL_ERROR_INVALID_FLAGS; |
||||
} |
||||
break; |
||||
case GRPC_OP_SEND_STATUS_FROM_SERVER: |
||||
if (op.flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS; |
||||
if (!ValidateMetadata( |
||||
op.data.send_status_from_server.trailing_metadata_count, |
||||
op.data.send_status_from_server.trailing_metadata)) { |
||||
return GRPC_CALL_ERROR_INVALID_METADATA; |
||||
} |
||||
break; |
||||
case GRPC_OP_RECV_MESSAGE: |
||||
case GRPC_OP_RECV_CLOSE_ON_SERVER: |
||||
if (op.flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS; |
||||
break; |
||||
case GRPC_OP_RECV_INITIAL_METADATA: |
||||
case GRPC_OP_SEND_CLOSE_FROM_CLIENT: |
||||
case GRPC_OP_RECV_STATUS_ON_CLIENT: |
||||
return GRPC_CALL_ERROR_NOT_ON_SERVER; |
||||
} |
||||
if (got_ops.is_set(op.op)) return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; |
||||
got_ops.set(op.op); |
||||
} |
||||
return GRPC_CALL_OK; |
||||
} |
||||
|
||||
} // namespace
|
||||
|
||||
grpc_call_error ServerCall::StartBatch(const grpc_op* ops, size_t nops, |
||||
void* notify_tag, |
||||
bool is_notify_tag_closure) { |
||||
if (nops == 0) { |
||||
EndOpImmediately(cq_, notify_tag, is_notify_tag_closure); |
||||
return GRPC_CALL_OK; |
||||
} |
||||
const grpc_call_error validation_result = ValidateServerBatch(ops, nops); |
||||
if (validation_result != GRPC_CALL_OK) { |
||||
return validation_result; |
||||
} |
||||
CommitBatch(ops, nops, notify_tag, is_notify_tag_closure); |
||||
return GRPC_CALL_OK; |
||||
} |
||||
|
||||
void ServerCall::CommitBatch(const grpc_op* ops, size_t nops, void* notify_tag, |
||||
bool is_notify_tag_closure) { |
||||
BatchOpIndex op_index(ops, nops); |
||||
if (!is_notify_tag_closure) grpc_cq_begin_op(cq_, notify_tag); |
||||
auto send_initial_metadata = |
||||
op_index.OpHandler<GRPC_OP_SEND_INITIAL_METADATA>([this]( |
||||
const grpc_op& op) { |
||||
auto metadata = arena()->MakePooled<ServerMetadata>(); |
||||
PrepareOutgoingInitialMetadata(op, *metadata); |
||||
CToMetadata(op.data.send_initial_metadata.metadata, |
||||
op.data.send_initial_metadata.count, metadata.get()); |
||||
if (grpc_call_trace.enabled()) { |
||||
gpr_log(GPR_INFO, "%s[call] Send initial metadata", |
||||
DebugTag().c_str()); |
||||
} |
||||
return [this, metadata = std::move(metadata)]() mutable { |
||||
return call_handler_.PushServerInitialMetadata(std::move(metadata)); |
||||
}; |
||||
}); |
||||
auto send_message = |
||||
op_index.OpHandler<GRPC_OP_SEND_MESSAGE>([this](const grpc_op& op) { |
||||
SliceBuffer send; |
||||
grpc_slice_buffer_swap( |
||||
&op.data.send_message.send_message->data.raw.slice_buffer, |
||||
send.c_slice_buffer()); |
||||
auto msg = arena()->MakePooled<Message>(std::move(send), op.flags); |
||||
return [this, msg = std::move(msg)]() mutable { |
||||
return call_handler_.PushMessage(std::move(msg)); |
||||
}; |
||||
}); |
||||
auto send_trailing_metadata = |
||||
op_index.OpHandler<GRPC_OP_SEND_STATUS_FROM_SERVER>( |
||||
[this](const grpc_op& op) { |
||||
auto metadata = arena()->MakePooled<ServerMetadata>(); |
||||
CToMetadata(op.data.send_status_from_server.trailing_metadata, |
||||
op.data.send_status_from_server.trailing_metadata_count, |
||||
metadata.get()); |
||||
metadata->Set(GrpcStatusMetadata(), |
||||
op.data.send_status_from_server.status); |
||||
if (auto* details = |
||||
op.data.send_status_from_server.status_details) { |
||||
// TODO(ctiller): this should not be a copy, but we have
|
||||
// callers that allocate and pass in a slice created with
|
||||
// grpc_slice_from_static_string and then delete the string
|
||||
// after passing it in, which shouldn't be a supported API.
|
||||
metadata->Set(GrpcMessageMetadata(), |
||||
Slice(grpc_slice_copy(*details))); |
||||
} |
||||
CHECK(metadata != nullptr); |
||||
return [this, metadata = std::move(metadata)]() mutable { |
||||
CHECK(metadata != nullptr); |
||||
return [this, metadata = std::move( |
||||
metadata)]() mutable -> Poll<Success> { |
||||
CHECK(metadata != nullptr); |
||||
call_handler_.PushServerTrailingMetadata(std::move(metadata)); |
||||
return Success{}; |
||||
}; |
||||
}; |
||||
}); |
||||
auto recv_message = |
||||
op_index.OpHandler<GRPC_OP_RECV_MESSAGE>([this](const grpc_op& op) { |
||||
return message_receiver_.MakeBatchOp(op, &call_handler_); |
||||
}); |
||||
auto primary_ops = AllOk<StatusFlag>( |
||||
TrySeq(AllOk<StatusFlag>(std::move(send_initial_metadata), |
||||
std::move(send_message)), |
||||
std::move(send_trailing_metadata)), |
||||
std::move(recv_message)); |
||||
if (auto* op = op_index.op(GRPC_OP_RECV_CLOSE_ON_SERVER)) { |
||||
auto recv_trailing_metadata = OpHandler<GRPC_OP_RECV_CLOSE_ON_SERVER>( |
||||
[this, cancelled = op->data.recv_close_on_server.cancelled]() { |
||||
return Map(call_handler_.WasCancelled(), |
||||
[cancelled, this](bool result) -> Success { |
||||
ResetDeadline(); |
||||
*cancelled = result ? 1 : 0; |
||||
return Success{}; |
||||
}); |
||||
}); |
||||
call_handler_.SpawnInfallible( |
||||
"final-batch", InfallibleBatch(std::move(primary_ops), |
||||
std::move(recv_trailing_metadata), |
||||
is_notify_tag_closure, notify_tag, cq_)); |
||||
} else { |
||||
call_handler_.SpawnInfallible( |
||||
"batch", FallibleBatch(std::move(primary_ops), is_notify_tag_closure, |
||||
notify_tag, cq_)); |
||||
} |
||||
} |
||||
|
||||
grpc_call* MakeServerCall(CallHandler call_handler, |
||||
ClientMetadataHandle client_initial_metadata, |
||||
ServerInterface* server, grpc_completion_queue* cq, |
||||
grpc_metadata_array* publish_initial_metadata) { |
||||
PublishMetadataArray(client_initial_metadata.get(), publish_initial_metadata, |
||||
false); |
||||
// TODO(ctiller): ideally we'd put this in the arena with the CallHandler,
|
||||
// but there's an ownership problem: CallHandler owns the arena, and so would
|
||||
// get destroyed before the base class Call destructor runs, leading to
|
||||
// UB/crash. Investigate another path.
|
||||
return (new ServerCall(std::move(client_initial_metadata), |
||||
std::move(call_handler), server, cq)) |
||||
->c_ptr(); |
||||
} |
||||
|
||||
} // namespace grpc_core
|
@ -0,0 +1,167 @@ |
||||
// Copyright 2024 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#ifndef GRPC_SRC_CORE_LIB_SURFACE_SERVER_CALL_H |
||||
#define GRPC_SRC_CORE_LIB_SURFACE_SERVER_CALL_H |
||||
|
||||
#include <inttypes.h> |
||||
#include <limits.h> |
||||
#include <stdlib.h> |
||||
#include <string.h> |
||||
|
||||
#include <cstdint> |
||||
#include <memory> |
||||
#include <string> |
||||
#include <utility> |
||||
|
||||
#include "absl/log/check.h" |
||||
#include "absl/status/status.h" |
||||
#include "absl/strings/str_format.h" |
||||
#include "absl/strings/string_view.h" |
||||
|
||||
#include <grpc/byte_buffer.h> |
||||
#include <grpc/compression.h> |
||||
#include <grpc/event_engine/event_engine.h> |
||||
#include <grpc/grpc.h> |
||||
#include <grpc/impl/call.h> |
||||
#include <grpc/impl/propagation_bits.h> |
||||
#include <grpc/slice.h> |
||||
#include <grpc/slice_buffer.h> |
||||
#include <grpc/status.h> |
||||
#include <grpc/support/alloc.h> |
||||
#include <grpc/support/atm.h> |
||||
#include <grpc/support/log.h> |
||||
#include <grpc/support/port_platform.h> |
||||
#include <grpc/support/string_util.h> |
||||
|
||||
#include "src/core/lib/gprpp/crash.h" |
||||
#include "src/core/lib/gprpp/ref_counted.h" |
||||
#include "src/core/lib/gprpp/ref_counted_ptr.h" |
||||
#include "src/core/lib/promise/poll.h" |
||||
#include "src/core/lib/resource_quota/arena.h" |
||||
#include "src/core/lib/surface/call.h" |
||||
#include "src/core/lib/surface/call_utils.h" |
||||
#include "src/core/lib/transport/metadata.h" |
||||
#include "src/core/lib/transport/metadata_batch.h" |
||||
#include "src/core/server/server_interface.h" |
||||
#include "src/core/telemetry/stats.h" |
||||
#include "src/core/telemetry/stats_data.h" |
||||
|
||||
namespace grpc_core { |
||||
|
||||
class ServerCall final : public Call, public DualRefCounted<ServerCall> { |
||||
public: |
||||
ServerCall(ClientMetadataHandle client_initial_metadata, |
||||
CallHandler call_handler, ServerInterface* server, |
||||
grpc_completion_queue* cq) |
||||
: Call(false, |
||||
client_initial_metadata->get(GrpcTimeoutMetadata()) |
||||
.value_or(Timestamp::InfFuture()), |
||||
call_handler.arena()->Ref(), call_handler.event_engine()), |
||||
call_handler_(std::move(call_handler)), |
||||
client_initial_metadata_stored_(std::move(client_initial_metadata)), |
||||
cq_(cq), |
||||
server_(server) { |
||||
global_stats().IncrementServerCallsCreated(); |
||||
} |
||||
|
||||
void CancelWithError(grpc_error_handle error) override { |
||||
call_handler_.SpawnInfallible( |
||||
"CancelWithError", |
||||
[self = WeakRefAsSubclass<ServerCall>(), error = std::move(error)] { |
||||
auto status = ServerMetadataFromStatus(error); |
||||
status->Set(GrpcCallWasCancelled(), true); |
||||
self->call_handler_.PushServerTrailingMetadata(std::move(status)); |
||||
return Empty{}; |
||||
}); |
||||
} |
||||
bool is_trailers_only() const override { |
||||
Crash("is_trailers_only not implemented for server calls"); |
||||
} |
||||
absl::string_view GetServerAuthority() const override { |
||||
Crash("unimplemented"); |
||||
} |
||||
grpc_call_error StartBatch(const grpc_op* ops, size_t nops, void* notify_tag, |
||||
bool is_notify_tag_closure) override; |
||||
|
||||
void ExternalRef() override { Ref().release(); } |
||||
void ExternalUnref() override { Unref(); } |
||||
void InternalRef(const char*) override { WeakRef().release(); } |
||||
void InternalUnref(const char*) override { WeakUnref(); } |
||||
|
||||
void Orphaned() override { |
||||
// TODO(ctiller): only when we're not already finished
|
||||
CancelWithError(absl::CancelledError()); |
||||
} |
||||
|
||||
void SetCompletionQueue(grpc_completion_queue*) override { |
||||
Crash("unimplemented"); |
||||
} |
||||
|
||||
grpc_compression_options compression_options() override { |
||||
return server_->compression_options(); |
||||
} |
||||
|
||||
grpc_call_stack* call_stack() override { return nullptr; } |
||||
|
||||
char* GetPeer() override { |
||||
Slice peer_slice = GetPeerString(); |
||||
if (!peer_slice.empty()) { |
||||
absl::string_view peer_string_view = peer_slice.as_string_view(); |
||||
char* peer_string = |
||||
static_cast<char*>(gpr_malloc(peer_string_view.size() + 1)); |
||||
memcpy(peer_string, peer_string_view.data(), peer_string_view.size()); |
||||
peer_string[peer_string_view.size()] = '\0'; |
||||
return peer_string; |
||||
} |
||||
return gpr_strdup("unknown"); |
||||
} |
||||
|
||||
bool Completed() final { Crash("unimplemented"); } |
||||
bool failed_before_recv_message() const final { Crash("unimplemented"); } |
||||
|
||||
uint32_t test_only_message_flags() override { |
||||
return message_receiver_.last_message_flags(); |
||||
} |
||||
|
||||
grpc_compression_algorithm incoming_compression_algorithm() override { |
||||
return message_receiver_.incoming_compression_algorithm(); |
||||
} |
||||
|
||||
void SetIncomingCompressionAlgorithm( |
||||
grpc_compression_algorithm algorithm) override { |
||||
message_receiver_.SetIncomingCompressionAlgorithm(algorithm); |
||||
} |
||||
|
||||
private: |
||||
void CommitBatch(const grpc_op* ops, size_t nops, void* notify_tag, |
||||
bool is_notify_tag_closure); |
||||
|
||||
std::string DebugTag() { return absl::StrFormat("SERVER_CALL[%p]: ", this); } |
||||
|
||||
CallHandler call_handler_; |
||||
MessageReceiver message_receiver_; |
||||
ClientMetadataHandle client_initial_metadata_stored_; |
||||
grpc_completion_queue* const cq_; |
||||
ServerInterface* const server_; |
||||
}; |
||||
|
||||
grpc_call* MakeServerCall(CallHandler call_handler, |
||||
ClientMetadataHandle client_initial_metadata, |
||||
ServerInterface* server, grpc_completion_queue* cq, |
||||
grpc_metadata_array* publish_initial_metadata); |
||||
|
||||
} // namespace grpc_core
|
||||
|
||||
#endif // GRPC_SRC_CORE_LIB_SURFACE_SERVER_CALL_H
|
@ -1,75 +0,0 @@ |
||||
// Copyright 2024 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include "src/core/lib/surface/wait_for_cq_end_op.h" |
||||
|
||||
#include <atomic> |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/lib/gprpp/match.h" |
||||
#include "src/core/lib/promise/trace.h" |
||||
|
||||
namespace grpc_core { |
||||
Poll<Empty> WaitForCqEndOp::operator()() { |
||||
if (grpc_trace_promise_primitives.enabled()) { |
||||
gpr_log(GPR_INFO, "%sWaitForCqEndOp[%p] %s", |
||||
Activity::current()->DebugTag().c_str(), this, |
||||
StateString(state_).c_str()); |
||||
} |
||||
if (auto* n = absl::get_if<NotStarted>(&state_)) { |
||||
if (n->is_closure) { |
||||
ExecCtx::Run(DEBUG_LOCATION, static_cast<grpc_closure*>(n->tag), |
||||
std::move(n->error)); |
||||
return Empty{}; |
||||
} else { |
||||
auto not_started = std::move(*n); |
||||
auto& started = |
||||
state_.emplace<Started>(GetContext<Activity>()->MakeOwningWaker()); |
||||
grpc_cq_end_op( |
||||
not_started.cq, not_started.tag, std::move(not_started.error), |
||||
[](void* p, grpc_cq_completion*) { |
||||
auto started = static_cast<Started*>(p); |
||||
auto wakeup = std::move(started->waker); |
||||
started->done.store(true, std::memory_order_release); |
||||
wakeup.Wakeup(); |
||||
}, |
||||
&started, &started.completion); |
||||
} |
||||
} |
||||
auto& started = absl::get<Started>(state_); |
||||
if (started.done.load(std::memory_order_acquire)) { |
||||
return Empty{}; |
||||
} else { |
||||
return Pending{}; |
||||
} |
||||
} |
||||
|
||||
std::string WaitForCqEndOp::StateString(const State& state) { |
||||
return Match( |
||||
state, |
||||
[](const NotStarted& x) { |
||||
return absl::StrFormat( |
||||
"NotStarted{is_closure=%s, tag=%p, error=%s, cq=%p}", |
||||
x.is_closure ? "true" : "false", x.tag, x.error.ToString(), x.cq); |
||||
}, |
||||
[](const Started& x) { |
||||
return absl::StrFormat( |
||||
"Started{completion=%p, done=%s}", &x.completion, |
||||
x.done.load(std::memory_order_relaxed) ? "true" : "false"); |
||||
}, |
||||
[](const Invalid&) -> std::string { return "Invalid{}"; }); |
||||
} |
||||
|
||||
} // namespace grpc_core
|
@ -1,72 +0,0 @@ |
||||
// Copyright 2023 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#ifndef GRPC_SRC_CORE_LIB_SURFACE_WAIT_FOR_CQ_END_OP_H |
||||
#define GRPC_SRC_CORE_LIB_SURFACE_WAIT_FOR_CQ_END_OP_H |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/lib/iomgr/exec_ctx.h" |
||||
#include "src/core/lib/promise/activity.h" |
||||
#include "src/core/lib/surface/completion_queue.h" |
||||
|
||||
namespace grpc_core { |
||||
|
||||
// Defines a promise that calls grpc_cq_end_op() (on first poll) and then waits
|
||||
// for the callback supplied to grpc_cq_end_op() to be called, before resolving
|
||||
// to Empty{}
|
||||
class WaitForCqEndOp { |
||||
public: |
||||
WaitForCqEndOp(bool is_closure, void* tag, grpc_error_handle error, |
||||
grpc_completion_queue* cq) |
||||
: state_{NotStarted{is_closure, tag, std::move(error), cq}} {} |
||||
|
||||
Poll<Empty> operator()(); |
||||
|
||||
WaitForCqEndOp(const WaitForCqEndOp&) = delete; |
||||
WaitForCqEndOp& operator=(const WaitForCqEndOp&) = delete; |
||||
WaitForCqEndOp(WaitForCqEndOp&& other) noexcept |
||||
: state_(std::move(absl::get<NotStarted>(other.state_))) { |
||||
other.state_.emplace<Invalid>(); |
||||
} |
||||
WaitForCqEndOp& operator=(WaitForCqEndOp&& other) noexcept { |
||||
state_ = std::move(absl::get<NotStarted>(other.state_)); |
||||
other.state_.emplace<Invalid>(); |
||||
return *this; |
||||
} |
||||
|
||||
private: |
||||
struct NotStarted { |
||||
bool is_closure; |
||||
void* tag; |
||||
grpc_error_handle error; |
||||
grpc_completion_queue* cq; |
||||
}; |
||||
struct Started { |
||||
explicit Started(Waker waker) : waker(std::move(waker)) {} |
||||
Waker waker; |
||||
grpc_cq_completion completion; |
||||
std::atomic<bool> done{false}; |
||||
}; |
||||
struct Invalid {}; |
||||
using State = absl::variant<NotStarted, Started, Invalid>; |
||||
|
||||
static std::string StateString(const State& state); |
||||
|
||||
State state_{Invalid{}}; |
||||
}; |
||||
|
||||
} // namespace grpc_core
|
||||
|
||||
#endif // GRPC_SRC_CORE_LIB_SURFACE_WAIT_FOR_CQ_END_OP_H
|
@ -1,171 +0,0 @@ |
||||
// Copyright 2023 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include "src/core/lib/transport/batch_builder.h" |
||||
|
||||
#include <type_traits> |
||||
|
||||
#include "absl/log/check.h" |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/lib/promise/poll.h" |
||||
#include "src/core/lib/slice/slice.h" |
||||
#include "src/core/lib/surface/call_trace.h" |
||||
#include "src/core/lib/transport/metadata_batch.h" |
||||
#include "src/core/lib/transport/transport.h" |
||||
|
||||
namespace grpc_core { |
||||
|
||||
BatchBuilder::BatchBuilder(grpc_transport_stream_op_batch_payload* payload) |
||||
: payload_(payload) {} |
||||
|
||||
void BatchBuilder::PendingCompletion::CompletionCallback( |
||||
void* self, grpc_error_handle error) { |
||||
auto* pc = static_cast<PendingCompletion*>(self); |
||||
auto* party = pc->batch->party.get(); |
||||
if (grpc_call_trace.enabled()) { |
||||
gpr_log(GPR_DEBUG, "%sFinish batch-component %s: status=%s", |
||||
pc->batch->DebugPrefix(party).c_str(), |
||||
std::string(pc->name()).c_str(), error.ToString().c_str()); |
||||
} |
||||
party->Spawn( |
||||
"batch-completion", |
||||
[pc, error = std::move(error)]() mutable { |
||||
RefCountedPtr<Batch> batch = std::exchange(pc->batch, nullptr); |
||||
pc->done_latch.Set(std::move(error)); |
||||
return Empty{}; |
||||
}, |
||||
[](Empty) {}); |
||||
} |
||||
|
||||
BatchBuilder::PendingCompletion::PendingCompletion(RefCountedPtr<Batch> batch) |
||||
: batch(std::move(batch)) { |
||||
GRPC_CLOSURE_INIT(&on_done_closure, CompletionCallback, this, nullptr); |
||||
} |
||||
|
||||
BatchBuilder::Batch::Batch(grpc_transport_stream_op_batch_payload* payload, |
||||
grpc_stream_refcount* stream_refcount) |
||||
: party(GetContext<Party>()->Ref()), stream_refcount(stream_refcount) { |
||||
batch.payload = payload; |
||||
batch.is_traced = GetContext<CallContext>()->traced(); |
||||
#ifndef NDEBUG |
||||
grpc_stream_ref(stream_refcount, "pending-batch"); |
||||
#else |
||||
grpc_stream_ref(stream_refcount); |
||||
#endif |
||||
} |
||||
|
||||
BatchBuilder::Batch::~Batch() { |
||||
if (grpc_call_trace.enabled()) { |
||||
gpr_log(GPR_DEBUG, "%s[connected] [batch %p] Destroy", |
||||
GetContext<Activity>()->DebugTag().c_str(), this); |
||||
} |
||||
delete pending_receive_message; |
||||
delete pending_receive_initial_metadata; |
||||
delete pending_receive_trailing_metadata; |
||||
delete pending_sends; |
||||
if (batch.cancel_stream) { |
||||
delete batch.payload; |
||||
} |
||||
#ifndef NDEBUG |
||||
grpc_stream_unref(stream_refcount, "pending-batch"); |
||||
#else |
||||
grpc_stream_unref(stream_refcount); |
||||
#endif |
||||
} |
||||
|
||||
BatchBuilder::Batch* BatchBuilder::GetBatch(Target target) { |
||||
if (target_.has_value() && |
||||
(target_->stream != target.stream || |
||||
target.transport->filter_stack_transport() |
||||
->HackyDisableStreamOpBatchCoalescingInConnectedChannel())) { |
||||
FlushBatch(); |
||||
} |
||||
if (!target_.has_value()) { |
||||
target_ = target; |
||||
batch_ = GetContext<Arena>()->NewPooled<Batch>(payload_, |
||||
target_->stream_refcount); |
||||
} |
||||
CHECK_NE(batch_, nullptr); |
||||
return batch_; |
||||
} |
||||
|
||||
void BatchBuilder::FlushBatch() { |
||||
CHECK_NE(batch_, nullptr); |
||||
CHECK(target_.has_value()); |
||||
if (grpc_call_trace.enabled()) { |
||||
gpr_log( |
||||
GPR_DEBUG, "%sPerform transport stream op batch: %p %s", |
||||
batch_->DebugPrefix().c_str(), &batch_->batch, |
||||
grpc_transport_stream_op_batch_string(&batch_->batch, false).c_str()); |
||||
} |
||||
std::exchange(batch_, nullptr)->PerformWith(*target_); |
||||
target_.reset(); |
||||
} |
||||
|
||||
void BatchBuilder::Batch::PerformWith(Target target) { |
||||
target.transport->filter_stack_transport()->PerformStreamOp(target.stream, |
||||
&batch); |
||||
} |
||||
|
||||
ServerMetadataHandle BatchBuilder::CompleteSendServerTrailingMetadata( |
||||
Batch* batch, ServerMetadataHandle sent_metadata, absl::Status send_result, |
||||
bool actually_sent) { |
||||
if (!send_result.ok()) { |
||||
if (grpc_call_trace.enabled()) { |
||||
gpr_log(GPR_DEBUG, |
||||
"%sSend metadata failed with error: %s, fabricating trailing " |
||||
"metadata", |
||||
batch->DebugPrefix().c_str(), send_result.ToString().c_str()); |
||||
} |
||||
sent_metadata->Clear(); |
||||
sent_metadata->Set(GrpcStatusMetadata(), |
||||
static_cast<grpc_status_code>(send_result.code())); |
||||
sent_metadata->Set(GrpcMessageMetadata(), |
||||
Slice::FromCopiedString(send_result.message())); |
||||
sent_metadata->Set(GrpcCallWasCancelled(), true); |
||||
} |
||||
if (!sent_metadata->get(GrpcCallWasCancelled()).has_value()) { |
||||
if (grpc_call_trace.enabled()) { |
||||
gpr_log( |
||||
GPR_DEBUG, |
||||
"%sTagging trailing metadata with cancellation status from " |
||||
"transport: %s", |
||||
batch->DebugPrefix().c_str(), |
||||
actually_sent ? "sent => not-cancelled" : "not-sent => cancelled"); |
||||
} |
||||
sent_metadata->Set(GrpcCallWasCancelled(), !actually_sent); |
||||
} |
||||
return sent_metadata; |
||||
} |
||||
|
||||
BatchBuilder::Batch* BatchBuilder::MakeCancel( |
||||
grpc_stream_refcount* stream_refcount, absl::Status status) { |
||||
auto* arena = GetContext<Arena>(); |
||||
auto* payload = arena->NewPooled<grpc_transport_stream_op_batch_payload>(); |
||||
auto* batch = arena->NewPooled<Batch>(payload, stream_refcount); |
||||
batch->batch.cancel_stream = true; |
||||
payload->cancel_stream.cancel_error = std::move(status); |
||||
return batch; |
||||
} |
||||
|
||||
void BatchBuilder::Cancel(Target target, absl::Status status) { |
||||
auto* batch = MakeCancel(target.stream_refcount, std::move(status)); |
||||
batch->batch.on_complete = |
||||
NewClosure([batch](absl::Status) { delete batch; }); |
||||
batch->PerformWith(target); |
||||
} |
||||
|
||||
} // namespace grpc_core
|
@ -1,474 +0,0 @@ |
||||
// Copyright 2023 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#ifndef GRPC_SRC_CORE_LIB_TRANSPORT_BATCH_BUILDER_H |
||||
#define GRPC_SRC_CORE_LIB_TRANSPORT_BATCH_BUILDER_H |
||||
|
||||
#include <stdint.h> |
||||
|
||||
#include <memory> |
||||
#include <string> |
||||
#include <utility> |
||||
|
||||
#include "absl/status/status.h" |
||||
#include "absl/status/statusor.h" |
||||
#include "absl/strings/str_format.h" |
||||
#include "absl/strings/string_view.h" |
||||
#include "absl/types/optional.h" |
||||
|
||||
#include <grpc/status.h> |
||||
#include <grpc/support/log.h> |
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/lib/channel/channel_stack.h" |
||||
#include "src/core/lib/debug/trace.h" |
||||
#include "src/core/lib/gprpp/ref_counted_ptr.h" |
||||
#include "src/core/lib/gprpp/status_helper.h" |
||||
#include "src/core/lib/iomgr/closure.h" |
||||
#include "src/core/lib/iomgr/error.h" |
||||
#include "src/core/lib/promise/activity.h" |
||||
#include "src/core/lib/promise/context.h" |
||||
#include "src/core/lib/promise/latch.h" |
||||
#include "src/core/lib/promise/map.h" |
||||
#include "src/core/lib/promise/party.h" |
||||
#include "src/core/lib/resource_quota/arena.h" |
||||
#include "src/core/lib/slice/slice_buffer.h" |
||||
#include "src/core/lib/surface/call.h" |
||||
#include "src/core/lib/surface/call_trace.h" |
||||
#include "src/core/lib/transport/metadata_batch.h" |
||||
#include "src/core/lib/transport/transport.h" |
||||
|
||||
namespace grpc_core { |
||||
|
||||
// Build up a transport stream op batch for a stream for a promise based
|
||||
// connected channel.
|
||||
// Offered as a context from Call, so that it can collect ALL the updates during
|
||||
// a single party round, and then push them down to the transport as a single
|
||||
// transaction.
|
||||
class BatchBuilder { |
||||
public: |
||||
explicit BatchBuilder(grpc_transport_stream_op_batch_payload* payload); |
||||
~BatchBuilder() { |
||||
if (batch_ != nullptr) FlushBatch(); |
||||
} |
||||
|
||||
struct Target { |
||||
Transport* transport; |
||||
grpc_stream* stream; |
||||
grpc_stream_refcount* stream_refcount; |
||||
}; |
||||
|
||||
BatchBuilder(const BatchBuilder&) = delete; |
||||
BatchBuilder& operator=(const BatchBuilder&) = delete; |
||||
|
||||
// Returns a promise that will resolve to a Status when the send is completed.
|
||||
auto SendMessage(Target target, MessageHandle message); |
||||
|
||||
// Returns a promise that will resolve to a Status when the send is completed.
|
||||
auto SendClientInitialMetadata(Target target, ClientMetadataHandle metadata); |
||||
|
||||
// Returns a promise that will resolve to a Status when the send is completed.
|
||||
auto SendClientTrailingMetadata(Target target); |
||||
|
||||
// Returns a promise that will resolve to a Status when the send is completed.
|
||||
auto SendServerInitialMetadata(Target target, ServerMetadataHandle metadata); |
||||
|
||||
// Returns a promise that will resolve to a ServerMetadataHandle when the send
|
||||
// is completed.
|
||||
//
|
||||
// If convert_to_cancellation is true, then the status will be converted to a
|
||||
// cancellation batch instead of a trailing metadata op in a coalesced batch.
|
||||
//
|
||||
// This quirk exists as in the filter based stack upon which our transports
|
||||
// were written if a trailing metadata op were sent it always needed to be
|
||||
// paired with an initial op batch, and the transports would wait for the
|
||||
// initial metadata batch to arrive (in case of reordering up the stack).
|
||||
auto SendServerTrailingMetadata(Target target, ServerMetadataHandle metadata, |
||||
bool convert_to_cancellation); |
||||
|
||||
// Returns a promise that will resolve to a StatusOr<optional<MessageHandle>>
|
||||
// when a message is received.
|
||||
// Error => non-ok status
|
||||
// End of stream => Ok, nullopt (no message)
|
||||
// Message => Ok, message
|
||||
auto ReceiveMessage(Target target); |
||||
|
||||
// Returns a promise that will resolve to a StatusOr<ClientMetadataHandle>
|
||||
// when the receive is complete.
|
||||
auto ReceiveClientInitialMetadata(Target target); |
||||
|
||||
// Returns a promise that will resolve to a StatusOr<ClientMetadataHandle>
|
||||
// when the receive is complete.
|
||||
auto ReceiveClientTrailingMetadata(Target target); |
||||
|
||||
// Returns a promise that will resolve to a StatusOr<ServerMetadataHandle>
|
||||
// when the receive is complete.
|
||||
auto ReceiveServerInitialMetadata(Target target); |
||||
|
||||
// Returns a promise that will resolve to a StatusOr<ServerMetadataHandle>
|
||||
// when the receive is complete.
|
||||
auto ReceiveServerTrailingMetadata(Target target); |
||||
|
||||
// Send a cancellation: does not occupy the same payload, nor does it
|
||||
// coalesce with other ops.
|
||||
void Cancel(Target target, absl::Status status); |
||||
|
||||
private: |
||||
struct Batch; |
||||
|
||||
// Base pending operation
|
||||
struct PendingCompletion { |
||||
explicit PendingCompletion(RefCountedPtr<Batch> batch); |
||||
virtual absl::string_view name() const = 0; |
||||
static void CompletionCallback(void* self, grpc_error_handle error); |
||||
grpc_closure on_done_closure; |
||||
Latch<absl::Status> done_latch; |
||||
RefCountedPtr<Batch> batch; |
||||
|
||||
protected: |
||||
~PendingCompletion() = default; |
||||
}; |
||||
|
||||
// A pending receive message.
|
||||
struct PendingReceiveMessage final : public PendingCompletion { |
||||
using PendingCompletion::PendingCompletion; |
||||
|
||||
absl::string_view name() const override { return "receive_message"; } |
||||
|
||||
MessageHandle IntoMessageHandle() { |
||||
return Arena::MakePooled<Message>(std::move(*payload), flags); |
||||
} |
||||
|
||||
absl::optional<SliceBuffer> payload; |
||||
uint32_t flags; |
||||
bool call_failed_before_recv_message = false; |
||||
}; |
||||
|
||||
// A pending receive metadata.
|
||||
struct PendingReceiveMetadata : public PendingCompletion { |
||||
using PendingCompletion::PendingCompletion; |
||||
|
||||
Arena::PoolPtr<grpc_metadata_batch> metadata = |
||||
Arena::MakePooled<grpc_metadata_batch>(); |
||||
|
||||
protected: |
||||
~PendingReceiveMetadata() = default; |
||||
}; |
||||
|
||||
struct PendingReceiveInitialMetadata final : public PendingReceiveMetadata { |
||||
using PendingReceiveMetadata::PendingReceiveMetadata; |
||||
absl::string_view name() const override { |
||||
return "receive_initial_metadata"; |
||||
} |
||||
}; |
||||
|
||||
struct PendingReceiveTrailingMetadata final : public PendingReceiveMetadata { |
||||
using PendingReceiveMetadata::PendingReceiveMetadata; |
||||
absl::string_view name() const override { |
||||
return "receive_trailing_metadata"; |
||||
} |
||||
}; |
||||
|
||||
// Pending sends in a batch
|
||||
struct PendingSends final : public PendingCompletion { |
||||
using PendingCompletion::PendingCompletion; |
||||
|
||||
absl::string_view name() const override { return "sends"; } |
||||
|
||||
MessageHandle send_message; |
||||
Arena::PoolPtr<grpc_metadata_batch> send_initial_metadata; |
||||
Arena::PoolPtr<grpc_metadata_batch> send_trailing_metadata; |
||||
bool trailing_metadata_sent = false; |
||||
}; |
||||
|
||||
// One outstanding batch.
|
||||
struct Batch final { |
||||
Batch(grpc_transport_stream_op_batch_payload* payload, |
||||
grpc_stream_refcount* stream_refcount); |
||||
~Batch(); |
||||
Batch(const Batch&) = delete; |
||||
Batch& operator=(const Batch&) = delete; |
||||
std::string DebugPrefix(Activity* activity = GetContext<Activity>()) const { |
||||
return absl::StrFormat("%s[connected] [batch %p] ", activity->DebugTag(), |
||||
this); |
||||
} |
||||
|
||||
void IncrementRefCount() { ++refs; } |
||||
void Unref() { |
||||
if (--refs == 0) delete this; |
||||
} |
||||
RefCountedPtr<Batch> Ref() { |
||||
IncrementRefCount(); |
||||
return RefCountedPtr<Batch>(this); |
||||
} |
||||
// Get an initialized pending completion.
|
||||
// There are four pending completions potentially contained within a batch.
|
||||
// They can be rather large so we don't create all of them always. Instead,
|
||||
// we dynamically create them on the arena as needed.
|
||||
// This method either returns the existing completion in a batch if that
|
||||
// completion has already been initialized, or it creates a new completion
|
||||
// and returns that.
|
||||
template <typename T> |
||||
T* GetInitializedCompletion(T*(Batch::*field)) { |
||||
if (this->*field != nullptr) return this->*field; |
||||
this->*field = new T(Ref()); |
||||
if (grpc_call_trace.enabled()) { |
||||
gpr_log(GPR_DEBUG, "%sAdd batch closure for %s @ %s", |
||||
DebugPrefix().c_str(), |
||||
std::string((this->*field)->name()).c_str(), |
||||
(this->*field)->on_done_closure.DebugString().c_str()); |
||||
} |
||||
return this->*field; |
||||
} |
||||
// grpc_transport_perform_stream_op on target.stream
|
||||
void PerformWith(Target target); |
||||
// Take a promise, and return a promise that holds a ref on this batch until
|
||||
// the promise completes or is cancelled.
|
||||
template <typename P> |
||||
auto RefUntil(P promise) { |
||||
return [self = Ref(), promise = std::move(promise)]() mutable { |
||||
return promise(); |
||||
}; |
||||
} |
||||
|
||||
grpc_transport_stream_op_batch batch; |
||||
PendingReceiveMessage* pending_receive_message = nullptr; |
||||
PendingReceiveInitialMetadata* pending_receive_initial_metadata = nullptr; |
||||
PendingReceiveTrailingMetadata* pending_receive_trailing_metadata = nullptr; |
||||
PendingSends* pending_sends = nullptr; |
||||
const RefCountedPtr<Party> party; |
||||
grpc_stream_refcount* const stream_refcount; |
||||
uint8_t refs = 0; |
||||
}; |
||||
|
||||
// Get a batch for the given target.
|
||||
// Currently: if the current batch is for this target, return it - otherwise
|
||||
// flush the batch and start a new one (and return that).
|
||||
// This function may change in the future to allow multiple batches to be
|
||||
// building at once (if that turns out to be useful for hedging).
|
||||
Batch* GetBatch(Target target); |
||||
// Flush the current batch down to the transport.
|
||||
void FlushBatch(); |
||||
// Create a cancel batch with its own payload.
|
||||
Batch* MakeCancel(grpc_stream_refcount* stream_refcount, absl::Status status); |
||||
|
||||
// Note: we don't distinguish between client and server metadata here.
|
||||
// At the time of writing they're both the same thing - and it's unclear
|
||||
// whether we'll get to separate them prior to batches going away or not.
|
||||
// So for now we claim YAGNI and just do the simplest possible implementation.
|
||||
auto SendInitialMetadata(Target target, |
||||
Arena::PoolPtr<grpc_metadata_batch> md); |
||||
auto ReceiveInitialMetadata(Target target); |
||||
auto ReceiveTrailingMetadata(Target target); |
||||
|
||||
// Combine send status and server metadata into a final status to report back
|
||||
// to the containing call.
|
||||
static ServerMetadataHandle CompleteSendServerTrailingMetadata( |
||||
Batch* batch, ServerMetadataHandle sent_metadata, |
||||
absl::Status send_result, bool actually_sent); |
||||
|
||||
grpc_transport_stream_op_batch_payload* const payload_; |
||||
absl::optional<Target> target_; |
||||
Batch* batch_ = nullptr; |
||||
}; |
||||
|
||||
inline auto BatchBuilder::SendMessage(Target target, MessageHandle message) { |
||||
auto* batch = GetBatch(target); |
||||
if (grpc_call_trace.enabled()) { |
||||
gpr_log(GPR_DEBUG, "%sQueue send message: %s", batch->DebugPrefix().c_str(), |
||||
message->DebugString().c_str()); |
||||
} |
||||
auto* pc = batch->GetInitializedCompletion(&Batch::pending_sends); |
||||
batch->batch.on_complete = &pc->on_done_closure; |
||||
batch->batch.send_message = true; |
||||
payload_->send_message.send_message = message->payload(); |
||||
payload_->send_message.flags = message->flags(); |
||||
pc->send_message = std::move(message); |
||||
return batch->RefUntil(pc->done_latch.WaitAndCopy()); |
||||
} |
||||
|
||||
inline auto BatchBuilder::SendInitialMetadata( |
||||
Target target, Arena::PoolPtr<grpc_metadata_batch> md) { |
||||
auto* batch = GetBatch(target); |
||||
if (grpc_call_trace.enabled()) { |
||||
gpr_log(GPR_DEBUG, "%sQueue send initial metadata: %s", |
||||
batch->DebugPrefix().c_str(), md->DebugString().c_str()); |
||||
} |
||||
auto* pc = batch->GetInitializedCompletion(&Batch::pending_sends); |
||||
batch->batch.on_complete = &pc->on_done_closure; |
||||
batch->batch.send_initial_metadata = true; |
||||
payload_->send_initial_metadata.send_initial_metadata = md.get(); |
||||
pc->send_initial_metadata = std::move(md); |
||||
return batch->RefUntil(pc->done_latch.WaitAndCopy()); |
||||
} |
||||
|
||||
inline auto BatchBuilder::SendClientInitialMetadata( |
||||
Target target, ClientMetadataHandle metadata) { |
||||
return SendInitialMetadata(target, std::move(metadata)); |
||||
} |
||||
|
||||
inline auto BatchBuilder::SendClientTrailingMetadata(Target target) { |
||||
auto* batch = GetBatch(target); |
||||
if (grpc_call_trace.enabled()) { |
||||
gpr_log(GPR_DEBUG, "%sQueue send trailing metadata", |
||||
batch->DebugPrefix().c_str()); |
||||
} |
||||
auto* pc = batch->GetInitializedCompletion(&Batch::pending_sends); |
||||
batch->batch.on_complete = &pc->on_done_closure; |
||||
batch->batch.send_trailing_metadata = true; |
||||
auto metadata = Arena::MakePooled<grpc_metadata_batch>(); |
||||
payload_->send_trailing_metadata.send_trailing_metadata = metadata.get(); |
||||
payload_->send_trailing_metadata.sent = nullptr; |
||||
pc->send_trailing_metadata = std::move(metadata); |
||||
return batch->RefUntil(pc->done_latch.WaitAndCopy()); |
||||
} |
||||
|
||||
inline auto BatchBuilder::SendServerInitialMetadata( |
||||
Target target, ServerMetadataHandle metadata) { |
||||
return SendInitialMetadata(target, std::move(metadata)); |
||||
} |
||||
|
||||
inline auto BatchBuilder::SendServerTrailingMetadata( |
||||
Target target, ServerMetadataHandle metadata, |
||||
bool convert_to_cancellation) { |
||||
Batch* batch; |
||||
PendingSends* pc; |
||||
if (convert_to_cancellation) { |
||||
const auto status_code = |
||||
metadata->get(GrpcStatusMetadata()).value_or(GRPC_STATUS_UNKNOWN); |
||||
auto status = grpc_error_set_int( |
||||
absl::Status(static_cast<absl::StatusCode>(status_code), |
||||
metadata->GetOrCreatePointer(GrpcMessageMetadata()) |
||||
->as_string_view()), |
||||
StatusIntProperty::kRpcStatus, status_code); |
||||
batch = MakeCancel(target.stream_refcount, std::move(status)); |
||||
pc = batch->GetInitializedCompletion(&Batch::pending_sends); |
||||
} else { |
||||
batch = GetBatch(target); |
||||
pc = batch->GetInitializedCompletion(&Batch::pending_sends); |
||||
batch->batch.send_trailing_metadata = true; |
||||
payload_->send_trailing_metadata.send_trailing_metadata = metadata.get(); |
||||
payload_->send_trailing_metadata.sent = &pc->trailing_metadata_sent; |
||||
} |
||||
if (grpc_call_trace.enabled()) { |
||||
gpr_log(GPR_DEBUG, "%s%s: %s", batch->DebugPrefix().c_str(), |
||||
convert_to_cancellation ? "Send trailing metadata as cancellation" |
||||
: "Queue send trailing metadata", |
||||
metadata->DebugString().c_str()); |
||||
} |
||||
batch->batch.on_complete = &pc->on_done_closure; |
||||
pc->send_trailing_metadata = std::move(metadata); |
||||
auto promise = Map(pc->done_latch.WaitAndCopy(), |
||||
[pc, batch = batch->Ref()](absl::Status status) { |
||||
return CompleteSendServerTrailingMetadata( |
||||
batch.get(), std::move(pc->send_trailing_metadata), |
||||
std::move(status), pc->trailing_metadata_sent); |
||||
}); |
||||
if (convert_to_cancellation) { |
||||
batch->PerformWith(target); |
||||
} |
||||
return promise; |
||||
} |
||||
|
||||
inline auto BatchBuilder::ReceiveMessage(Target target) { |
||||
auto* batch = GetBatch(target); |
||||
if (grpc_call_trace.enabled()) { |
||||
gpr_log(GPR_DEBUG, "%sQueue receive message", batch->DebugPrefix().c_str()); |
||||
} |
||||
auto* pc = batch->GetInitializedCompletion(&Batch::pending_receive_message); |
||||
batch->batch.recv_message = true; |
||||
payload_->recv_message.recv_message_ready = &pc->on_done_closure; |
||||
payload_->recv_message.recv_message = &pc->payload; |
||||
payload_->recv_message.flags = &pc->flags; |
||||
payload_->recv_message.call_failed_before_recv_message = |
||||
&pc->call_failed_before_recv_message; |
||||
return batch->RefUntil( |
||||
Map(pc->done_latch.Wait(), |
||||
[pc](absl::Status status) |
||||
-> absl::StatusOr<absl::optional<MessageHandle>> { |
||||
if (!status.ok()) return status; |
||||
if (!pc->payload.has_value()) { |
||||
if (pc->call_failed_before_recv_message) { |
||||
return absl::CancelledError(); |
||||
} |
||||
return absl::nullopt; |
||||
} |
||||
return pc->IntoMessageHandle(); |
||||
})); |
||||
} |
||||
|
||||
inline auto BatchBuilder::ReceiveInitialMetadata(Target target) { |
||||
auto* batch = GetBatch(target); |
||||
if (grpc_call_trace.enabled()) { |
||||
gpr_log(GPR_DEBUG, "%sQueue receive initial metadata", |
||||
batch->DebugPrefix().c_str()); |
||||
} |
||||
auto* pc = |
||||
batch->GetInitializedCompletion(&Batch::pending_receive_initial_metadata); |
||||
batch->batch.recv_initial_metadata = true; |
||||
payload_->recv_initial_metadata.recv_initial_metadata_ready = |
||||
&pc->on_done_closure; |
||||
payload_->recv_initial_metadata.recv_initial_metadata = pc->metadata.get(); |
||||
return batch->RefUntil( |
||||
Map(pc->done_latch.Wait(), |
||||
[pc](absl::Status status) -> absl::StatusOr<ClientMetadataHandle> { |
||||
if (!status.ok()) return status; |
||||
return std::move(pc->metadata); |
||||
})); |
||||
} |
||||
|
||||
inline auto BatchBuilder::ReceiveClientInitialMetadata(Target target) { |
||||
return ReceiveInitialMetadata(target); |
||||
} |
||||
|
||||
inline auto BatchBuilder::ReceiveServerInitialMetadata(Target target) { |
||||
return ReceiveInitialMetadata(target); |
||||
} |
||||
|
||||
inline auto BatchBuilder::ReceiveTrailingMetadata(Target target) { |
||||
auto* batch = GetBatch(target); |
||||
if (grpc_call_trace.enabled()) { |
||||
gpr_log(GPR_DEBUG, "%sQueue receive trailing metadata", |
||||
batch->DebugPrefix().c_str()); |
||||
} |
||||
auto* pc = batch->GetInitializedCompletion( |
||||
&Batch::pending_receive_trailing_metadata); |
||||
batch->batch.recv_trailing_metadata = true; |
||||
payload_->recv_trailing_metadata.recv_trailing_metadata_ready = |
||||
&pc->on_done_closure; |
||||
payload_->recv_trailing_metadata.recv_trailing_metadata = pc->metadata.get(); |
||||
payload_->recv_trailing_metadata.collect_stats = |
||||
&GetContext<CallContext>()->call_stats()->transport_stream_stats; |
||||
return batch->RefUntil( |
||||
Map(pc->done_latch.Wait(), |
||||
[pc](absl::Status status) -> absl::StatusOr<ServerMetadataHandle> { |
||||
if (!status.ok()) return status; |
||||
return std::move(pc->metadata); |
||||
})); |
||||
} |
||||
|
||||
inline auto BatchBuilder::ReceiveClientTrailingMetadata(Target target) { |
||||
return ReceiveTrailingMetadata(target); |
||||
} |
||||
|
||||
inline auto BatchBuilder::ReceiveServerTrailingMetadata(Target target) { |
||||
return ReceiveTrailingMetadata(target); |
||||
} |
||||
|
||||
template <> |
||||
struct ContextType<BatchBuilder> {}; |
||||
|
||||
} // namespace grpc_core
|
||||
|
||||
#endif // GRPC_SRC_CORE_LIB_TRANSPORT_BATCH_BUILDER_H
|
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in new issue