Fix race between Read and ServerContext::IsCancelled in Sync API (#27056)

* Fix OOM issues in qps tests

* Add more verbose logging.

* Fix clang error

* Fix race between IsCancelled and Read

* Fix build errors from using bool in C code
pull/27357/head
Alisha Nanda 3 years ago committed by GitHub
parent 96a7b357dc
commit 0a502d8f2e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      grpc.def
  2. 4
      include/grpc/grpc.h
  3. 1
      include/grpcpp/impl/codegen/core_codegen.h
  4. 1
      include/grpcpp/impl/codegen/core_codegen_interface.h
  5. 9
      include/grpcpp/impl/codegen/server_context.h
  6. 12
      include/grpcpp/impl/codegen/sync_stream.h
  7. 2
      src/core/lib/surface/call.cc
  8. 6
      src/core/lib/surface/call.h
  9. 4
      src/cpp/common/core_codegen.cc
  10. 9
      src/cpp/server/server_context.cc
  11. 2
      src/ruby/ext/grpc/rb_grpc_imports.generated.c
  12. 3
      src/ruby/ext/grpc/rb_grpc_imports.generated.h
  13. 1
      test/core/surface/public_headers_must_be_c89.c
  14. 7
      test/cpp/end2end/test_service_impl.h

1
grpc.def generated

@ -50,6 +50,7 @@ EXPORTS
grpc_channel_destroy
grpc_call_cancel
grpc_call_cancel_with_status
grpc_call_failed_before_recv_message
grpc_call_ref
grpc_call_unref
grpc_server_request_call

@ -340,6 +340,10 @@ GRPCAPI grpc_call_error grpc_call_cancel_with_status(grpc_call* call,
const char* description,
void* reserved);
/* Returns whether or not the call's receive message operation failed because of
* an error (as opposed to a graceful end-of-stream) */
GRPCAPI int grpc_call_failed_before_recv_message(const grpc_call* c);
/** Ref a call.
THREAD SAFETY: grpc_call_ref is thread-compatible */
GRPCAPI void grpc_call_ref(grpc_call* call);

@ -72,6 +72,7 @@ class CoreCodegen final : public CoreCodegenInterface {
grpc_status_code status,
const char* description,
void* reserved) override;
int grpc_call_failed_before_recv_message(const grpc_call* c) override;
void grpc_call_ref(grpc_call* call) override;
void grpc_call_unref(grpc_call* call) override;
void* grpc_call_arena_alloc(grpc_call* call, size_t length) override;

@ -113,6 +113,7 @@ class CoreCodegenInterface {
grpc_status_code status,
const char* description,
void* reserved) = 0;
virtual int grpc_call_failed_before_recv_message(const grpc_call* c) = 0;
virtual void grpc_call_ref(grpc_call* call) = 0;
virtual void grpc_call_unref(grpc_call* call) = 0;
virtual void* grpc_call_arena_alloc(grpc_call* call, size_t length) = 0;

@ -65,6 +65,8 @@ class ServerReader;
template <class W>
class ServerWriter;
extern CoreCodegenInterface* g_core_codegen_interface;
namespace internal {
template <class ServiceType, class RequestType, class ResponseType>
class BidiStreamingHandler;
@ -420,7 +422,12 @@ class ServerContextBase {
message_allocator_state_ = allocator_state;
}
void MaybeMarkCancelledOnRead();
void MaybeMarkCancelledOnRead() {
if (g_core_codegen_interface->grpc_call_failed_before_recv_message(
call_.call)) {
marked_cancelled_.store(true, std::memory_order_release);
}
}
struct CallWrapper {
~CallWrapper();

@ -609,7 +609,11 @@ class ServerReader final : public ServerReaderInterface<R> {
::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvMessage<R>> ops;
ops.RecvMessage(msg);
call_->PerformOps(&ops);
return call_->cq()->Pluck(&ops) && ops.got_message;
bool ok = call_->cq()->Pluck(&ops) && ops.got_message;
if (!ok) {
ctx_->MaybeMarkCancelledOnRead();
}
return ok;
}
private:
@ -736,7 +740,11 @@ class ServerReaderWriterBody final {
::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvMessage<R>> ops;
ops.RecvMessage(msg);
call_->PerformOps(&ops);
return call_->cq()->Pluck(&ops) && ops.got_message;
bool ok = call_->cq()->Pluck(&ops) && ops.got_message;
if (!ok) {
ctx_->MaybeMarkCancelledOnRead();
}
return ok;
}
bool Write(const W& msg, ::grpc::WriteOptions options) {

@ -2036,7 +2036,7 @@ bool grpc_call_is_trailers_only(const grpc_call* call) {
return result;
}
bool grpc_call_failed_before_recv_message(const grpc_call* c) {
int grpc_call_failed_before_recv_message(const grpc_call* c) {
return c->call_failed_before_recv_message;
}

@ -125,12 +125,6 @@ grpc_compression_algorithm grpc_call_compression_for_level(
Move to surface API if requested by other languages. */
bool grpc_call_is_trailers_only(const grpc_call* call);
/* Returns whether or not the call's receive message operation failed because of
* an error (as opposed to a graceful end-of-stream) */
/* TODO(markdroth): This is currently available only to the C++ API.
Move to surface API if requested by other languages. */
bool grpc_call_failed_before_recv_message(const grpc_call* c);
extern grpc_core::TraceFlag grpc_call_error_trace;
extern grpc_core::TraceFlag grpc_compression_trace;

@ -118,6 +118,10 @@ grpc_call_error CoreCodegen::grpc_call_cancel_with_status(
void* reserved) {
return ::grpc_call_cancel_with_status(call, status, description, reserved);
}
int CoreCodegen::grpc_call_failed_before_recv_message(const grpc_call* c) {
return ::grpc_call_failed_before_recv_message(c);
}
void CoreCodegen::grpc_call_ref(grpc_call* call) { ::grpc_call_ref(call); }
void CoreCodegen::grpc_call_unref(grpc_call* call) { ::grpc_call_unref(call); }
void* CoreCodegen::grpc_call_arena_alloc(grpc_call* call, size_t length) {

@ -324,12 +324,6 @@ void ServerContextBase::TryCancel() const {
}
}
void ServerContextBase::MaybeMarkCancelledOnRead() {
if (grpc_call_failed_before_recv_message(call_.call)) {
marked_cancelled_.store(true, std::memory_order_release);
}
}
bool ServerContextBase::IsCancelled() const {
if (completion_tag_) {
// When using callback API, this result is always valid.
@ -341,7 +335,8 @@ bool ServerContextBase::IsCancelled() const {
return completion_op_ && completion_op_->CheckCancelledAsync();
} else {
// when using sync API, the result is always valid
return completion_op_ && completion_op_->CheckCancelled(cq_);
return marked_cancelled_.load(std::memory_order_acquire) ||
(completion_op_ && completion_op_->CheckCancelled(cq_));
}
}

@ -73,6 +73,7 @@ grpc_lame_client_channel_create_type grpc_lame_client_channel_create_import;
grpc_channel_destroy_type grpc_channel_destroy_import;
grpc_call_cancel_type grpc_call_cancel_import;
grpc_call_cancel_with_status_type grpc_call_cancel_with_status_import;
grpc_call_failed_before_recv_message_type grpc_call_failed_before_recv_message_import;
grpc_call_ref_type grpc_call_ref_import;
grpc_call_unref_type grpc_call_unref_import;
grpc_server_request_call_type grpc_server_request_call_import;
@ -361,6 +362,7 @@ void grpc_rb_load_imports(HMODULE library) {
grpc_channel_destroy_import = (grpc_channel_destroy_type) GetProcAddress(library, "grpc_channel_destroy");
grpc_call_cancel_import = (grpc_call_cancel_type) GetProcAddress(library, "grpc_call_cancel");
grpc_call_cancel_with_status_import = (grpc_call_cancel_with_status_type) GetProcAddress(library, "grpc_call_cancel_with_status");
grpc_call_failed_before_recv_message_import = (grpc_call_failed_before_recv_message_type) GetProcAddress(library, "grpc_call_failed_before_recv_message");
grpc_call_ref_import = (grpc_call_ref_type) GetProcAddress(library, "grpc_call_ref");
grpc_call_unref_import = (grpc_call_unref_type) GetProcAddress(library, "grpc_call_unref");
grpc_server_request_call_import = (grpc_server_request_call_type) GetProcAddress(library, "grpc_server_request_call");

@ -194,6 +194,9 @@ extern grpc_call_cancel_type grpc_call_cancel_import;
typedef grpc_call_error(*grpc_call_cancel_with_status_type)(grpc_call* call, grpc_status_code status, const char* description, void* reserved);
extern grpc_call_cancel_with_status_type grpc_call_cancel_with_status_import;
#define grpc_call_cancel_with_status grpc_call_cancel_with_status_import
typedef int(*grpc_call_failed_before_recv_message_type)(const grpc_call* c);
extern grpc_call_failed_before_recv_message_type grpc_call_failed_before_recv_message_import;
#define grpc_call_failed_before_recv_message grpc_call_failed_before_recv_message_import
typedef void(*grpc_call_ref_type)(grpc_call* call);
extern grpc_call_ref_type grpc_call_ref_import;
#define grpc_call_ref grpc_call_ref_import

@ -119,6 +119,7 @@ int main(int argc, char **argv) {
printf("%lx", (unsigned long) grpc_channel_destroy);
printf("%lx", (unsigned long) grpc_call_cancel);
printf("%lx", (unsigned long) grpc_call_cancel_with_status);
printf("%lx", (unsigned long) grpc_call_failed_before_recv_message);
printf("%lx", (unsigned long) grpc_call_ref);
printf("%lx", (unsigned long) grpc_call_unref);
printf("%lx", (unsigned long) grpc_server_request_call);

@ -377,6 +377,9 @@ class TestMultipleServiceImpl : public RpcService {
int server_try_cancel = internal::GetIntValueFromMetadata(
kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
int client_try_cancel = static_cast<bool>(internal::GetIntValueFromMetadata(
kClientTryCancelRequest, context->client_metadata(), 0));
EchoRequest request;
EchoResponse response;
@ -409,6 +412,10 @@ class TestMultipleServiceImpl : public RpcService {
}
}
if (client_try_cancel) {
EXPECT_TRUE(context->IsCancelled());
}
if (server_try_cancel_thd != nullptr) {
server_try_cancel_thd->join();
delete server_try_cancel_thd;

Loading…
Cancel
Save