More changes (#31194)

pull/31202/head
Esun Kim 3 years ago committed by GitHub
parent 203bc0dc52
commit 4bd27c524a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      src/core/ext/filters/http/message_compress/message_compress_filter.cc
  2. 2
      src/core/ext/filters/message_size/message_size_filter.cc
  3. 16
      src/core/lib/channel/connected_channel.cc
  4. 8
      src/core/lib/iomgr/error.h
  5. 2
      src/core/lib/security/transport/security_handshaker.cc
  6. 2
      src/core/lib/security/transport/server_auth_filter.cc
  7. 11
      src/core/lib/surface/call.cc
  8. 14
      src/core/lib/transport/transport.cc
  9. 2
      src/cpp/ext/filters/census/client_filter.cc
  10. 3
      test/core/end2end/fixtures/h2_sockpair_with_minstack.cc
  11. 5
      test/core/http/request_fuzzer.cc
  12. 5
      test/core/http/response_fuzzer.cc
  13. 3
      test/core/transport/chttp2/hpack_parser_fuzzer_test.cc
  14. 2
      test/core/transport/chttp2/settings_timeout_test.cc
  15. 4
      test/cpp/microbenchmarks/bm_call_create.cc
  16. 4
      test/cpp/microbenchmarks/bm_exec_ctx.cc
  17. 14
      test/cpp/microbenchmarks/bm_pollset.cc

@ -104,7 +104,7 @@ class CallData {
ForwardSendMessageBatch, elem, grpc_schedule_on_exec_ctx);
}
~CallData() { GRPC_ERROR_UNREF(cancel_error_); }
~CallData() {}
void CompressStartTransportStreamOpBatch(
grpc_call_element* elem, grpc_transport_stream_op_batch* batch);

@ -185,7 +185,7 @@ struct call_data {
}
}
~call_data() { GRPC_ERROR_UNREF(error); }
~call_data() {}
grpc_core::CallCombiner* call_combiner;
grpc_core::MessageSizeParsedConfig::message_size_limits limits;

@ -289,7 +289,7 @@ class ClientStream : public Orphanable {
auto* stream = stream_.get();
cancel_op->on_complete = NewClosure(
[this](grpc_error_handle) { Unref("shutdown client stream"); });
batch_payload_.cancel_stream.cancel_error = GRPC_ERROR_CANCELLED;
batch_payload_.cancel_stream.cancel_error = absl::CancelledError();
grpc_transport_perform_stream_op(transport_, stream, cancel_op);
}
Unref("orphan client stream");
@ -521,7 +521,7 @@ class ClientStream : public Orphanable {
}
void RecvInitialMetadataReady(grpc_error_handle error) {
GPR_ASSERT(error == GRPC_ERROR_NONE);
GPR_ASSERT(error == absl::OkStatus());
{
MutexLock lock(&mu_);
server_initial_metadata_state_ =
@ -532,7 +532,7 @@ class ClientStream : public Orphanable {
}
void RecvTrailingMetadataReady(grpc_error_handle error) {
GPR_ASSERT(error == GRPC_ERROR_NONE);
GPR_ASSERT(error == absl::OkStatus());
{
MutexLock lock(&mu_);
queued_trailing_metadata_ = true;
@ -542,14 +542,14 @@ class ClientStream : public Orphanable {
}
void MetadataBatchDone(grpc_error_handle error) {
GPR_ASSERT(error == GRPC_ERROR_NONE);
GPR_ASSERT(error == absl::OkStatus());
Unref("metadata_batch_done");
}
void SendMessageBatchDone(grpc_error_handle error) {
{
MutexLock lock(&mu_);
if (error != GRPC_ERROR_NONE) {
if (error != absl::OkStatus()) {
// Note that we're in error here, the call will be closed by the
// transport in a moment, and we'll return from the promise with an
// error - so we don't need to do any extra work to close out pipes or
@ -567,7 +567,7 @@ class ClientStream : public Orphanable {
void RecvMessageBatchDone(grpc_error_handle error) {
{
MutexLock lock(&mu_);
if (error != GRPC_ERROR_NONE) {
if (error != absl::OkStatus()) {
if (grpc_call_trace.enabled()) {
gpr_log(GPR_INFO, "%sRecvMessageBatchDone: error=%s",
recv_message_waker_.ActivityDebugTag().c_str(),
@ -597,7 +597,7 @@ class ClientStream : public Orphanable {
grpc_transport_perform_stream_op(transport_, stream_.get(), batch);
} else {
grpc_transport_stream_op_batch_finish_with_failure_from_transport(
batch, GRPC_ERROR_CANCELLED);
batch, absl::CancelledError());
}
};
bool push_metadata;
@ -657,7 +657,7 @@ class ClientStream : public Orphanable {
void SchedulePush() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
if (std::exchange(scheduled_push_, true)) return;
IncrementRefCount("push");
ExecCtx::Run(DEBUG_LOCATION, &push_, GRPC_ERROR_NONE);
ExecCtx::Run(DEBUG_LOCATION, &push_, absl::OkStatus());
}
std::string ActiveOpsString() const ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {

@ -258,12 +258,8 @@ inline bool grpc_log_if_error(const char* what, grpc_error_handle error,
/// This could be considered as atomic<grpc_error_handle>.
class AtomicError {
public:
AtomicError() {
error_ = absl::OkStatus();
lock_ = GPR_SPINLOCK_STATIC_INITIALIZER;
}
AtomicError() = default;
explicit AtomicError(grpc_error_handle error) { error_ = error; }
~AtomicError() { GRPC_ERROR_UNREF(error_); }
AtomicError(const AtomicError&) = delete;
AtomicError& operator=(const AtomicError&) = delete;
@ -291,7 +287,7 @@ class AtomicError {
private:
grpc_error_handle error_;
gpr_spinlock lock_;
gpr_spinlock lock_ = GPR_SPINLOCK_STATIC_INITIALIZER;
};
#endif /* GRPC_CORE_LIB_IOMGR_ERROR_H */

@ -581,7 +581,7 @@ void SecurityHandshaker::DoHandshake(grpc_tcp_server_acceptor* /*acceptor*/,
class FailHandshaker : public Handshaker {
public:
const char* name() const override { return "security_fail"; }
void Shutdown(grpc_error_handle why) override { GRPC_ERROR_UNREF(why); }
void Shutdown(grpc_error_handle /*why*/) override {}
void DoHandshake(grpc_tcp_server_acceptor* /*acceptor*/,
grpc_closure* on_handshake_done,
HandshakerArgs* args) override {

@ -96,7 +96,7 @@ struct call_data {
grpc_server_security_context_destroy;
}
~call_data() { GRPC_ERROR_UNREF(recv_initial_metadata_error); }
~call_data() {}
grpc_core::CallCombiner* call_combiner;
grpc_call_stack* owning_call;

@ -324,7 +324,7 @@ void Call::PropagateCancellationToChildren() {
Call* next_child_call = child->child_->sibling_next;
if (child->cancellation_is_inherited_) {
child->InternalRef("propagate_cancel");
child->CancelWithError(GRPC_ERROR_CANCELLED);
child->CancelWithError(absl::CancelledError());
child->InternalUnref("propagate_cancel");
}
child = next_child_call;
@ -1331,14 +1331,14 @@ void EndOpImmediately(grpc_completion_queue* cq, void* notify_tag,
if (!is_notify_tag_closure) {
GPR_ASSERT(grpc_cq_begin_op(cq, notify_tag));
grpc_cq_end_op(
cq, notify_tag, GRPC_ERROR_NONE,
cq, notify_tag, absl::OkStatus(),
[](void*, grpc_cq_completion* completion) { gpr_free(completion); },
nullptr,
static_cast<grpc_cq_completion*>(
gpr_malloc(sizeof(grpc_cq_completion))));
} else {
Closure::Run(DEBUG_LOCATION, static_cast<grpc_closure*>(notify_tag),
GRPC_ERROR_NONE);
absl::OkStatus());
}
}
} // namespace
@ -2195,7 +2195,7 @@ grpc_error_handle MakePromiseBasedCall(grpc_call_create_args* args,
PromiseBasedCall* call = new (alloc.second) T(alloc.first, args);
*out_call = call->c_ptr();
GPR_DEBUG_ASSERT(Call::FromC(*out_call) == call);
return GRPC_ERROR_NONE;
return absl::OkStatus();
}
PromiseBasedCall::PromiseBasedCall(Arena* arena,
@ -2319,7 +2319,7 @@ void PromiseBasedCall::FinishOpOnCompletion(Completion* completion,
CompletionInfo::Pending& pending = completion_info_[i].pending;
GPR_ASSERT(pending.pending_op_bits & PendingOpBit(reason));
pending.pending_op_bits &= ~PendingOpBit(reason);
auto error = pending.success ? GRPC_ERROR_NONE : GRPC_ERROR_CANCELLED;
auto error = pending.success ? absl::OkStatus() : absl::CancelledError();
if (pending.pending_op_bits == 0) {
if (pending.is_closure) {
ExecCtx::Run(DEBUG_LOCATION, static_cast<grpc_closure*>(pending.tag),
@ -2486,7 +2486,6 @@ void ClientPromiseBasedCall::CancelWithError(grpc_error_handle error) {
MutexLock lock(mu());
ScopedContext context(this);
Finish(ServerMetadataHandle(grpc_error_to_absl_status(error)));
GRPC_ERROR_UNREF(error);
}
grpc_call_error ClientPromiseBasedCall::ValidateBatch(const grpc_op* ops,

@ -188,31 +188,27 @@ void grpc_transport_stream_op_batch_queue_finish_with_failure(
void grpc_transport_stream_op_batch_finish_with_failure_from_transport(
grpc_transport_stream_op_batch* batch, grpc_error_handle error) {
if (batch->cancel_stream) {
GRPC_ERROR_UNREF(batch->payload->cancel_stream.cancel_error);
}
// Construct a list of closures to execute.
if (batch->recv_initial_metadata) {
grpc_core::ExecCtx::Run(
DEBUG_LOCATION,
batch->payload->recv_initial_metadata.recv_initial_metadata_ready,
GRPC_ERROR_REF(error));
error);
}
if (batch->recv_message) {
grpc_core::ExecCtx::Run(DEBUG_LOCATION,
batch->payload->recv_message.recv_message_ready,
GRPC_ERROR_REF(error));
grpc_core::ExecCtx::Run(
DEBUG_LOCATION, batch->payload->recv_message.recv_message_ready, error);
}
if (batch->recv_trailing_metadata) {
grpc_core::ExecCtx::Run(
DEBUG_LOCATION,
batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
GRPC_ERROR_REF(error));
error);
}
if (batch->on_complete != nullptr) {
grpc_core::ExecCtx::Run(DEBUG_LOCATION, batch->on_complete,
GRPC_ERROR_REF(error));
grpc_core::ExecCtx::Run(DEBUG_LOCATION, batch->on_complete, error);
}
GRPC_ERROR_UNREF(error);
}
struct made_transport_op {

@ -74,7 +74,7 @@ grpc_error_handle CensusClientChannelData::Init(
tracing_enabled_ = grpc_core::ChannelArgs::FromC(args->channel_args)
.GetInt(GRPC_ARG_ENABLE_OBSERVABILITY)
.value_or(true);
return GRPC_ERROR_NONE;
return absl::OkStatus();
}
//

@ -63,10 +63,9 @@ static void server_setup_transport(void* ts, grpc_transport* transport) {
grpc_core::Server* core_server = grpc_core::Server::FromC(f->server);
grpc_error_handle error = core_server->SetupTransport(
transport, nullptr, core_server->channel_args(), nullptr);
if (error == GRPC_ERROR_NONE) {
if (error == absl::OkStatus()) {
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr);
} else {
GRPC_ERROR_UNREF(error);
grpc_transport_destroy(transport);
}
}

@ -23,7 +23,6 @@
#include <grpc/slice.h>
#include "src/core/lib/http/parser.h"
#include "src/core/lib/iomgr/error.h"
bool squelch = true;
bool leak_check = true;
@ -35,8 +34,8 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
memset(&request, 0, sizeof(request));
grpc_http_parser_init(&parser, GRPC_HTTP_REQUEST, &request);
grpc_slice slice = grpc_slice_from_copied_buffer((const char*)data, size);
GRPC_ERROR_UNREF(grpc_http_parser_parse(&parser, slice, nullptr));
GRPC_ERROR_UNREF(grpc_http_parser_eof(&parser));
(void)grpc_http_parser_parse(&parser, slice, nullptr);
(void)grpc_http_parser_eof(&parser);
grpc_slice_unref(slice);
grpc_http_parser_destroy(&parser);
grpc_http_request_destroy(&request);

@ -23,7 +23,6 @@
#include <grpc/slice.h>
#include "src/core/lib/http/parser.h"
#include "src/core/lib/iomgr/error.h"
bool squelch = true;
bool leak_check = true;
@ -35,8 +34,8 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
response = {};
grpc_http_parser_init(&parser, GRPC_HTTP_RESPONSE, &response);
grpc_slice slice = grpc_slice_from_copied_buffer((const char*)data, size);
GRPC_ERROR_UNREF(grpc_http_parser_parse(&parser, slice, nullptr));
GRPC_ERROR_UNREF(grpc_http_parser_eof(&parser));
(void)grpc_http_parser_parse(&parser, slice, nullptr);
(void)grpc_http_parser_eof(&parser);
grpc_slice_unref(slice);
grpc_http_parser_destroy(&parser);
grpc_http_response_destroy(&response);

@ -26,7 +26,6 @@
#include "src/core/ext/transport/chttp2/transport/hpack_parser.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/resource_quota/memory_quota.h"
@ -85,7 +84,7 @@ DEFINE_PROTO_FUZZER(const hpack_parser_fuzzer::Msg& msg) {
for (const auto& parse : frame.parse()) {
grpc_slice buffer =
grpc_slice_from_copied_buffer(parse.data(), parse.size());
GRPC_ERROR_UNREF(parser->Parse(buffer, i == msg.frames_size() - 1));
(void)parser->Parse(buffer, i == msg.frames_size() - 1);
grpc_slice_unref(buffer);
stop_buffering_ctr--;
if (0 == stop_buffering_ctr) parser->StopBufferingFrame();

@ -196,7 +196,7 @@ class Client {
grpc_schedule_on_exec_ctx);
}
~EventState() { GRPC_ERROR_UNREF(error_); }
~EventState() {}
grpc_closure* closure() { return &closure_; }

@ -561,8 +561,8 @@ static void BM_IsolatedFilter(benchmark::State& state) {
grpc_core::Arena::Create(kArenaSize, g_memory_allocator),
nullptr};
while (state.KeepRunning()) {
GRPC_ERROR_UNREF(
grpc_call_stack_init(channel_stack, 1, DoNothing, nullptr, &call_args));
(void)grpc_call_stack_init(channel_stack, 1, DoNothing, nullptr,
&call_args);
typename TestOp::Op op(&test_op_data, call_stack, call_args.arena);
grpc_call_stack_destroy(call_stack, &final_info, nullptr);
op.Finish();

@ -34,7 +34,7 @@ void BM_ExecCtx_Run(benchmark::State& state) {
grpc_core::ExecCtx exec_ctx;
for (auto _ : state) {
for (int i = 0; i < cb_count; i++) {
exec_ctx.Run(DEBUG_LOCATION, &cb, GRPC_ERROR_NONE);
exec_ctx.Run(DEBUG_LOCATION, &cb, absl::OkStatus());
exec_ctx.Flush();
}
}
@ -68,7 +68,7 @@ void BM_ExecCtx_RunCounted(benchmark::State& state) {
grpc_core::ExecCtx exec_ctx;
for (auto _ : state) {
for (int i = 0; i < cb_count; i++) {
exec_ctx.Run(DEBUG_LOCATION, &cb, GRPC_ERROR_NONE);
exec_ctx.Run(DEBUG_LOCATION, &cb, absl::OkStatus());
exec_ctx.Flush();
}
data.signal->WaitForNotification();

@ -118,8 +118,7 @@ static void BM_PollEmptyPollset(benchmark::State& state) {
grpc_core::ExecCtx exec_ctx;
gpr_mu_lock(mu);
for (auto _ : state) {
GRPC_ERROR_UNREF(
grpc_pollset_work(ps, nullptr, grpc_core::Timestamp::ProcessEpoch()));
(void)grpc_pollset_work(ps, nullptr, grpc_core::Timestamp::ProcessEpoch());
}
grpc_closure shutdown_ps_closure;
GRPC_CLOSURE_INIT(&shutdown_ps_closure, shutdown_ps, ps,
@ -220,25 +219,24 @@ static void BM_SingleThreadPollOneFd(benchmark::State& state) {
grpc_pollset_init(ps, &mu);
grpc_core::ExecCtx exec_ctx;
grpc_wakeup_fd wakeup_fd;
GRPC_ERROR_UNREF(grpc_wakeup_fd_init(&wakeup_fd));
(void)grpc_wakeup_fd_init(&wakeup_fd);
grpc_fd* wakeup = grpc_fd_create(wakeup_fd.read_fd, "wakeup_read", false);
grpc_pollset_add_fd(ps, wakeup);
bool done = false;
TestClosure* continue_closure = MakeTestClosure([&]() {
GRPC_ERROR_UNREF(grpc_wakeup_fd_consume_wakeup(&wakeup_fd));
(void)grpc_wakeup_fd_consume_wakeup(&wakeup_fd);
if (!state.KeepRunning()) {
done = true;
return;
}
GRPC_ERROR_UNREF(grpc_wakeup_fd_wakeup(&wakeup_fd));
(void)grpc_wakeup_fd_wakeup(&wakeup_fd);
grpc_fd_notify_on_read(wakeup, continue_closure);
});
GRPC_ERROR_UNREF(grpc_wakeup_fd_wakeup(&wakeup_fd));
(void)grpc_wakeup_fd_wakeup(&wakeup_fd);
grpc_fd_notify_on_read(wakeup, continue_closure);
gpr_mu_lock(mu);
while (!done) {
GRPC_ERROR_UNREF(
grpc_pollset_work(ps, nullptr, grpc_core::Timestamp::InfFuture()));
(void)grpc_pollset_work(ps, nullptr, grpc_core::Timestamp::InfFuture());
}
grpc_fd_orphan(wakeup, nullptr, nullptr, "done");
wakeup_fd.read_fd = 0;

Loading…
Cancel
Save