pull/36509/head
Craig Tiller 10 months ago
parent 5d114c60c1
commit 2a75dc947d
  1. 9
      src/core/ext/transport/chaotic_good/server_transport.cc
  2. 5
      src/core/ext/transport/chaotic_good/server_transport.h
  3. 2
      src/core/lib/promise/status_flag.h
  4. 121
      src/core/lib/surface/call.cc
  5. 1
      src/core/lib/surface/server.cc
  6. 6
      src/core/lib/surface/server.h
  7. 2
      src/core/lib/surface/server_interface.h
  8. 6
      src/core/lib/transport/call_spine.h

@ -380,10 +380,11 @@ ChaoticGoodServerTransport::ChaoticGoodServerTransport(
OnTransportActivityDone("reader"));
}
void ChaoticGoodServerTransport::SetAcceptor(Acceptor* acceptor) {
GPR_ASSERT(acceptor_ == nullptr);
GPR_ASSERT(acceptor != nullptr);
acceptor_ = acceptor;
void ChaoticGoodServerTransport::SetCallDestination(
RefCountedPtr<UnstartedCallDestination> call_destination) {
GPR_ASSERT(call_destination_ == nullptr);
GPR_ASSERT(call_destination != nullptr);
call_destination_ = call_destination;
got_acceptor_.Set();
}

@ -98,7 +98,8 @@ class ChaoticGoodServerTransport final : public ServerTransport {
grpc_endpoint* GetEndpoint() override { return nullptr; }
void Orphan() override { Unref(); }
void SetAcceptor(Acceptor* acceptor) override;
void SetCallDestination(
RefCountedPtr<UnstartedCallDestination> call_destination) override;
void AbortWithError();
private:
@ -137,7 +138,7 @@ class ChaoticGoodServerTransport final : public ServerTransport {
auto PushFragmentIntoCall(CallInitiator call_initiator,
ClientFragmentFrame frame, uint32_t stream_id);
Acceptor* acceptor_ = nullptr;
RefCountedPtr<UnstartedCallDestination> call_destination_;
InterActivityLatch<void> got_acceptor_;
MpscReceiver<ServerFrame> outgoing_frames_;
// Assigned aligned bytes from setting frame.

@ -170,6 +170,8 @@ class ValueOrFailure {
T& value() { return value_.value(); }
const T& operator*() const { return *value_; }
T& operator*() { return *value_; }
const T* operator->() const { return &*value_; }
T* operator->() { return &*value_; }
bool operator==(const ValueOrFailure& other) const {
return value_ == other.value_;

@ -3152,7 +3152,19 @@ grpc_call_error ValidateServerBatch(const grpc_op* ops, size_t nops) {
class ServerCall final : public Call {
public:
ServerCall(ClientMetadataHandle client_initial_metadata,
CallHandler call_handler);
CallHandler call_handler, ServerInterface* server,
grpc_completion_queue* cq, ServerTransport* transport)
: Call(false,
client_initial_metadata->get(GrpcTimeoutMetadata())
.value_or(Timestamp::InfFuture()),
call_handler.event_engine()),
call_handler_(std::move(call_handler)),
client_initial_metadata_stored_(std::move(client_initial_metadata)),
cq_(cq),
server_(server),
transport_(transport) {
global_stats().IncrementServerCallsCreated();
}
void CancelWithError(grpc_error_handle error) override {
call_handler_.SpawnInfallible(
@ -3172,13 +3184,53 @@ class ServerCall final : public Call {
grpc_call_error StartBatch(const grpc_op* ops, size_t nops, void* notify_tag,
bool is_notify_tag_closure) override;
Arena* arena() override { return call_handler_.arena(); }
grpc_event_engine::experimental::EventEngine* event_engine() const override {
return call_handler_.event_engine();
}
void ContextSet(grpc_context_index elem, void* value,
void (*destroy)(void*)) override {
call_handler_.legacy_context()[elem] =
grpc_call_context_element{value, destroy};
}
void* ContextGet(grpc_context_index elem) const override {
return call_handler_.legacy_context()[elem].value;
}
void SetCompletionQueue(grpc_completion_queue* cq) override {
Crash("unimplemented");
}
grpc_compression_options compression_options() override {
return server_->compression_options();
}
grpc_call_stack* call_stack() override { return nullptr; }
char* GetPeer() override {
Slice peer_slice = GetPeerString();
if (!peer_slice.empty()) {
absl::string_view peer_string_view = peer_slice.as_string_view();
char* peer_string =
static_cast<char*>(gpr_malloc(peer_string_view.size() + 1));
memcpy(peer_string, peer_string_view.data(), peer_string_view.size());
peer_string[peer_string_view.size()] = '\0';
return peer_string;
}
return gpr_strdup("unknown");
}
bool Completed() final { Crash("unimplemented"); }
bool failed_before_recv_message() const final { Crash("unimplemented"); }
private:
void CommitBatch(const grpc_op* ops, size_t nops, void* notify_tag,
bool is_notify_tag_closure);
StatusFlag FinishRecvMessage(NextResult<MessageHandle> result);
StatusFlag FinishRecvMessage(
ValueOrFailure<absl::optional<MessageHandle>> result);
std::string DebugTag();
@ -3186,15 +3238,10 @@ class ServerCall final : public Call {
grpc_byte_buffer** recv_message_ = nullptr;
ClientMetadataHandle client_initial_metadata_stored_;
grpc_completion_queue* const cq_;
ServerInterface* const server_;
ServerTransport* const transport_;
};
ServerCall::ServerCall(ClientMetadataHandle client_initial_metadata,
CallHandler call_handler)
: call_handler_(std::move(call_handler)),
client_initial_metadata_stored_(std::move(client_initial_metadata)) {
global_stats().IncrementServerCallsCreated();
}
grpc_call_error ServerCall::StartBatch(const grpc_op* ops, size_t nops,
void* notify_tag,
bool is_notify_tag_closure) {
@ -3346,47 +3393,47 @@ PollBatchLogger<F> LogPollBatch(void* tag, F f) {
}
} // namespace
StatusFlag ServerCall::FinishRecvMessage(NextResult<MessageHandle> result) {
if (result.has_value()) {
MessageHandle& message = *result;
NoteLastMessageFlags(message->flags());
if ((message->flags() & GRPC_WRITE_INTERNAL_COMPRESS) &&
(incoming_compression_algorithm() != GRPC_COMPRESS_NONE)) {
*recv_message_ = grpc_raw_compressed_byte_buffer_create(
nullptr, 0, incoming_compression_algorithm());
} else {
*recv_message_ = grpc_raw_byte_buffer_create(nullptr, 0);
}
grpc_slice_buffer_move_into(message->payload()->c_slice_buffer(),
&(*recv_message_)->data.raw.slice_buffer);
StatusFlag ServerCall::FinishRecvMessage(
ValueOrFailure<absl::optional<MessageHandle>> result) {
if (!result.ok()) {
if (grpc_call_trace.enabled()) {
gpr_log(GPR_INFO,
"%s[call] RecvMessage: outstanding_recv "
"finishes: received %" PRIdPTR " byte message",
DebugTag().c_str(),
(*recv_message_)->data.raw.slice_buffer.length);
"finishes: received end-of-stream with error",
DebugTag().c_str());
}
*recv_message_ = nullptr;
recv_message_ = nullptr;
return Success{};
return Failure{};
}
if (result.cancelled()) {
if (!result->has_value()) {
if (grpc_call_trace.enabled()) {
gpr_log(GPR_INFO,
"%s[call] RecvMessage: outstanding_recv "
"finishes: received end-of-stream with error",
"finishes: received end-of-stream",
DebugTag().c_str());
}
*recv_message_ = nullptr;
recv_message_ = nullptr;
return Failure{};
return Success{};
}
MessageHandle& message = **result;
NoteLastMessageFlags(message->flags());
if ((message->flags() & GRPC_WRITE_INTERNAL_COMPRESS) &&
(incoming_compression_algorithm() != GRPC_COMPRESS_NONE)) {
*recv_message_ = grpc_raw_compressed_byte_buffer_create(
nullptr, 0, incoming_compression_algorithm());
} else {
*recv_message_ = grpc_raw_byte_buffer_create(nullptr, 0);
}
grpc_slice_buffer_move_into(message->payload()->c_slice_buffer(),
&(*recv_message_)->data.raw.slice_buffer);
if (grpc_call_trace.enabled()) {
gpr_log(GPR_INFO,
"%s[call] RecvMessage: outstanding_recv "
"finishes: received end-of-stream",
DebugTag().c_str());
"finishes: received %" PRIdPTR " byte message",
DebugTag().c_str(), (*recv_message_)->data.raw.slice_buffer.length);
}
*recv_message_ = nullptr;
recv_message_ = nullptr;
return Success{};
}
@ -3456,8 +3503,8 @@ void ServerCall::CommitBatch(const grpc_op* ops, size_t nops, void* notify_tag,
GPR_ASSERT(recv_message_ == nullptr);
recv_message_ = op.data.recv_message.recv_message;
return [this]() mutable {
return Map(client_to_server_messages_.receiver.Next(),
[this](NextResult<MessageHandle> msg) {
return Map(call_handler_.PullMessage(),
[this](ValueOrFailure<absl::optional<MessageHandle>> msg) {
return FinishRecvMessage(std::move(msg));
});
};
@ -3512,8 +3559,10 @@ grpc_call* MakeServerCall(CallHandler call_handler,
grpc_metadata_array* publish_initial_metadata) {
PublishMetadataArray(client_initial_metadata.get(), publish_initial_metadata,
false);
return call_handler.arena()->New<ServerCall>(
std::move(client_initial_metadata), std::move(call_handler));
return call_handler.arena()
->New<ServerCall>(std::move(client_initial_metadata),
std::move(call_handler))
->c_ptr();
}
} // namespace grpc_core

@ -806,6 +806,7 @@ Server::Server(const ChannelArgs& args)
: channel_args_(args),
channelz_node_(CreateChannelzNode(args)),
server_call_tracer_factory_(ServerCallTracerFactory::Get(args)),
compression_options_(CompressionOptionsFromChannelArgs(args)),
max_time_in_pending_queue_(Duration::Seconds(
channel_args_
.GetInt(GRPC_ARG_SERVER_MAX_UNREQUESTED_TIME_IN_SERVER_SECONDS)

@ -38,6 +38,7 @@
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include <grpc/compression.h>
#include <grpc/grpc.h>
#include <grpc/slice.h>
#include <grpc/support/port_platform.h>
@ -211,6 +212,10 @@ class Server : public ServerInterface,
void SendGoaways() ABSL_LOCKS_EXCLUDED(mu_global_, mu_call_);
grpc_compression_options compression_options() const override {
return compression_options_;
}
private:
struct RequestedCall;
@ -451,6 +456,7 @@ class Server : public ServerInterface,
std::vector<grpc_completion_queue*> cqs_;
std::vector<grpc_pollset*> pollsets_;
bool started_ = false;
const grpc_compression_options compression_options_;
// The two following mutexes control access to server-state.
// mu_global_ controls access to non-call-related state (e.g., channel state).

@ -17,6 +17,7 @@
#ifndef GRPC_SRC_CORE_LIB_SURFACE_SERVER_INTERFACE_H
#define GRPC_SRC_CORE_LIB_SURFACE_SERVER_INTERFACE_H
#include <grpc/compression.h>
#include <grpc/support/port_platform.h>
#include "src/core/channelz/channelz.h"
@ -36,6 +37,7 @@ class ServerInterface {
virtual const ChannelArgs& channel_args() const = 0;
virtual channelz::ServerNode* channelz_node() const = 0;
virtual ServerCallTracerFactory* server_call_tracer_factory() const = 0;
virtual grpc_compression_options compression_options() const = 0;
};
} // namespace grpc_core

@ -536,11 +536,15 @@ class CallHandler {
Arena* arena() { return spine_->arena(); }
grpc_event_engine::experimental::EventEngine* event_engine() {
grpc_event_engine::experimental::EventEngine* event_engine() const {
return DownCast<CallSpine*>(spine_.get())->event_engine();
}
// TODO(ctiller): re-evaluate this API
const grpc_call_context_element* legacy_context() const {
return DownCast<CallSpine*>(spine_.get())->legacy_context();
}
grpc_call_context_element* legacy_context() {
return DownCast<CallSpine*>(spine_.get())->legacy_context();
}

Loading…
Cancel
Save