From 567e0f1eb62f361f845d5aba7f7b502e98d5b9d6 Mon Sep 17 00:00:00 2001 From: Ken Payson Date: Tue, 29 Aug 2017 09:21:22 -0700 Subject: [PATCH 1/4] Have write ops take ownership over slices --- include/grpc++/impl/codegen/call.h | 12 ++++++++---- include/grpc++/impl/codegen/core_codegen.h | 1 + include/grpc++/impl/codegen/core_codegen_interface.h | 1 + src/core/lib/transport/byte_stream.c | 1 + src/cpp/common/core_codegen.cc | 4 ++++ test/core/end2end/tests/resource_quota_server.c | 11 ++++++----- 6 files changed, 21 insertions(+), 9 deletions(-) diff --git a/include/grpc++/impl/codegen/call.h b/include/grpc++/impl/codegen/call.h index 0cb11b4ccad..3c30ccdb1e2 100644 --- a/include/grpc++/impl/codegen/call.h +++ b/include/grpc++/impl/codegen/call.h @@ -272,7 +272,7 @@ class CallOpSendInitialMetadata { class CallOpSendMessage { public: - CallOpSendMessage() : send_buf_(nullptr), own_buf_(false) {} + CallOpSendMessage() : send_buf_(nullptr) {} /// Send \a message using \a options for the write. The \a options are cleared /// after use. @@ -295,20 +295,24 @@ class CallOpSendMessage { write_options_.Clear(); } void FinishOp(bool* status) { - if (own_buf_) g_core_codegen_interface->grpc_byte_buffer_destroy(send_buf_); + g_core_codegen_interface->grpc_byte_buffer_destroy(send_buf_); send_buf_ = nullptr; } private: grpc_byte_buffer* send_buf_; WriteOptions write_options_; - bool own_buf_; }; template Status CallOpSendMessage::SendMessage(const M& message, WriteOptions options) { write_options_ = options; - return SerializationTraits::Serialize(message, &send_buf_, &own_buf_); + bool own_buf; + Status result = SerializationTraits::Serialize(message, &send_buf_, &own_buf); + if (!own_buf) { + send_buf_ = g_core_codegen_interface->grpc_byte_buffer_copy(send_buf_); + } + return result; } template diff --git a/include/grpc++/impl/codegen/core_codegen.h b/include/grpc++/impl/codegen/core_codegen.h index 2b15a018455..5f96c8345b0 100644 --- a/include/grpc++/impl/codegen/core_codegen.h +++ b/include/grpc++/impl/codegen/core_codegen.h @@ -68,6 +68,7 @@ class CoreCodegen final : public CoreCodegenInterface { void grpc_call_unref(grpc_call* call) override; virtual void* grpc_call_arena_alloc(grpc_call* call, size_t length) override; + grpc_byte_buffer *grpc_byte_buffer_copy(grpc_byte_buffer *bb) override; void grpc_byte_buffer_destroy(grpc_byte_buffer* bb) override; int grpc_byte_buffer_reader_init(grpc_byte_buffer_reader* reader, diff --git a/include/grpc++/impl/codegen/core_codegen_interface.h b/include/grpc++/impl/codegen/core_codegen_interface.h index b4c771ac93a..7556016f27d 100644 --- a/include/grpc++/impl/codegen/core_codegen_interface.h +++ b/include/grpc++/impl/codegen/core_codegen_interface.h @@ -74,6 +74,7 @@ class CoreCodegenInterface { virtual void gpr_cv_signal(gpr_cv* cv) = 0; virtual void gpr_cv_broadcast(gpr_cv* cv) = 0; + virtual grpc_byte_buffer *grpc_byte_buffer_copy(grpc_byte_buffer *bb) = 0; virtual void grpc_byte_buffer_destroy(grpc_byte_buffer* bb) = 0; virtual int grpc_byte_buffer_reader_init(grpc_byte_buffer_reader* reader, diff --git a/src/core/lib/transport/byte_stream.c b/src/core/lib/transport/byte_stream.c index fb03a10315b..08f61629a92 100644 --- a/src/core/lib/transport/byte_stream.c +++ b/src/core/lib/transport/byte_stream.c @@ -85,6 +85,7 @@ static void slice_buffer_stream_shutdown(grpc_exec_ctx *exec_ctx, static void slice_buffer_stream_destroy(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream) { grpc_slice_buffer_stream *stream = (grpc_slice_buffer_stream *)byte_stream; + grpc_slice_buffer_reset_and_unref_internal(exec_ctx, stream->backing_buffer); GRPC_ERROR_UNREF(stream->shutdown_error); } diff --git a/src/cpp/common/core_codegen.cc b/src/cpp/common/core_codegen.cc index c7c6b6b13b4..e81509904f1 100644 --- a/src/cpp/common/core_codegen.cc +++ b/src/cpp/common/core_codegen.cc @@ -89,6 +89,10 @@ int CoreCodegen::gpr_cv_wait(gpr_cv* cv, gpr_mu* mu, void CoreCodegen::gpr_cv_signal(gpr_cv* cv) { ::gpr_cv_signal(cv); } void CoreCodegen::gpr_cv_broadcast(gpr_cv* cv) { ::gpr_cv_broadcast(cv); } +grpc_byte_buffer* CoreCodegen::grpc_byte_buffer_copy(grpc_byte_buffer *bb) { + return ::grpc_byte_buffer_copy(bb); +} + void CoreCodegen::grpc_byte_buffer_destroy(grpc_byte_buffer* bb) { ::grpc_byte_buffer_destroy(bb); } diff --git a/test/core/end2end/tests/resource_quota_server.c b/test/core/end2end/tests/resource_quota_server.c index 010e20c4c24..34a6a80a314 100644 --- a/test/core/end2end/tests/resource_quota_server.c +++ b/test/core/end2end/tests/resource_quota_server.c @@ -143,6 +143,8 @@ void resource_quota_server(grpc_end2end_test_config config) { malloc(sizeof(grpc_call_details) * NUM_CALLS); grpc_status_code *status = malloc(sizeof(grpc_status_code) * NUM_CALLS); grpc_slice *details = malloc(sizeof(grpc_slice) * NUM_CALLS); + grpc_byte_buffer **request_payload = + malloc(sizeof(grpc_byte_buffer *) * NUM_CALLS); grpc_byte_buffer **request_payload_recv = malloc(sizeof(grpc_byte_buffer *) * NUM_CALLS); int *was_cancelled = malloc(sizeof(int) * NUM_CALLS); @@ -156,9 +158,6 @@ void resource_quota_server(grpc_end2end_test_config config) { int deadline_exceeded = 0; int unavailable = 0; - grpc_byte_buffer *request_payload = - grpc_raw_byte_buffer_create(&request_payload_slice, 1); - grpc_op ops[6]; grpc_op *op; @@ -167,6 +166,7 @@ void resource_quota_server(grpc_end2end_test_config config) { grpc_metadata_array_init(&trailing_metadata_recv[i]); grpc_metadata_array_init(&request_metadata_recv[i]); grpc_call_details_init(&call_details[i]); + request_payload[i] = grpc_raw_byte_buffer_create(&request_payload_slice, 1); request_payload_recv[i] = NULL; was_cancelled[i] = 0; } @@ -195,7 +195,7 @@ void resource_quota_server(grpc_end2end_test_config config) { op->reserved = NULL; op++; op->op = GRPC_OP_SEND_MESSAGE; - op->data.send_message.send_message = request_payload; + op->data.send_message.send_message = request_payload[i]; op->flags = 0; op->reserved = NULL; op++; @@ -261,6 +261,7 @@ void resource_quota_server(grpc_end2end_test_config config) { grpc_metadata_array_destroy(&trailing_metadata_recv[call_id]); grpc_call_unref(client_calls[call_id]); grpc_slice_unref(details[call_id]); + grpc_byte_buffer_destroy(request_payload[call_id]); pending_client_calls--; } else if (ev_tag < SERVER_RECV_BASE_TAG) { @@ -351,7 +352,6 @@ void resource_quota_server(grpc_end2end_test_config config) { NUM_CALLS, cancelled_calls_on_server, cancelled_calls_on_client, deadline_exceeded, unavailable); - grpc_byte_buffer_destroy(request_payload); grpc_slice_unref(request_payload_slice); grpc_resource_quota_unref(resource_quota); @@ -366,6 +366,7 @@ void resource_quota_server(grpc_end2end_test_config config) { free(call_details); free(status); free(details); + free(request_payload); free(request_payload_recv); free(was_cancelled); } From 951971881165fad65f99f7b0ca821eea0ce25d23 Mon Sep 17 00:00:00 2001 From: Ken Payson Date: Tue, 29 Aug 2017 11:31:00 -0700 Subject: [PATCH 2/4] Documentation change + e2e test fix --- src/core/lib/transport/byte_stream.h | 4 +++- test/core/end2end/tests/cancel_after_round_trip.c | 11 +++++++---- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/src/core/lib/transport/byte_stream.h b/src/core/lib/transport/byte_stream.h index 1e1e8310b83..be2a35213e3 100644 --- a/src/core/lib/transport/byte_stream.h +++ b/src/core/lib/transport/byte_stream.h @@ -81,7 +81,9 @@ void grpc_byte_stream_destroy(grpc_exec_ctx *exec_ctx, // grpc_slice_buffer_stream // -// A grpc_byte_stream that wraps a slice buffer. +// A grpc_byte_stream that wraps a slice buffer. The stream takes +// ownership of the slices in the buffer, and on destruction will +// reset the contents of the buffer. typedef struct grpc_slice_buffer_stream { grpc_byte_stream base; diff --git a/test/core/end2end/tests/cancel_after_round_trip.c b/test/core/end2end/tests/cancel_after_round_trip.c index 0fc8b95ef72..ad24b4e5387 100644 --- a/test/core/end2end/tests/cancel_after_round_trip.c +++ b/test/core/end2end/tests/cancel_after_round_trip.c @@ -114,7 +114,9 @@ static void test_cancel_after_round_trip(grpc_end2end_test_config config, grpc_slice_from_copied_string("hello you"); grpc_byte_buffer *request_payload = grpc_raw_byte_buffer_create(&request_payload_slice, 1); - grpc_byte_buffer *response_payload = + grpc_byte_buffer *response_payload1 = + grpc_raw_byte_buffer_create(&response_payload_slice, 1); + grpc_byte_buffer *response_payload2 = grpc_raw_byte_buffer_create(&response_payload_slice, 1); int was_cancelled = 2; @@ -199,7 +201,7 @@ static void test_cancel_after_round_trip(grpc_end2end_test_config config, op->reserved = NULL; op++; op->op = GRPC_OP_SEND_MESSAGE; - op->data.send_message.send_message = response_payload; + op->data.send_message.send_message = response_payload1; op->flags = 0; op->reserved = NULL; op++; @@ -242,7 +244,7 @@ static void test_cancel_after_round_trip(grpc_end2end_test_config config, op->reserved = NULL; op++; op->op = GRPC_OP_SEND_MESSAGE; - op->data.send_message.send_message = response_payload; + op->data.send_message.send_message = response_payload2; op->flags = 0; op->reserved = NULL; op++; @@ -262,7 +264,8 @@ static void test_cancel_after_round_trip(grpc_end2end_test_config config, grpc_call_details_destroy(&call_details); grpc_byte_buffer_destroy(request_payload); - grpc_byte_buffer_destroy(response_payload); + grpc_byte_buffer_destroy(response_payload1); + grpc_byte_buffer_destroy(response_payload2); grpc_byte_buffer_destroy(request_payload_recv); grpc_byte_buffer_destroy(response_payload_recv); grpc_slice_unref(details); From 4754398f811a2108eb3327e6819efedb16866312 Mon Sep 17 00:00:00 2001 From: Ken Payson Date: Wed, 30 Aug 2017 08:31:38 -0700 Subject: [PATCH 3/4] Clang_format --- include/grpc++/impl/codegen/call.h | 3 ++- include/grpc++/impl/codegen/core_codegen.h | 2 +- include/grpc++/impl/codegen/core_codegen_interface.h | 2 +- src/cpp/common/core_codegen.cc | 2 +- 4 files changed, 5 insertions(+), 4 deletions(-) diff --git a/include/grpc++/impl/codegen/call.h b/include/grpc++/impl/codegen/call.h index 3c30ccdb1e2..8e70225f86a 100644 --- a/include/grpc++/impl/codegen/call.h +++ b/include/grpc++/impl/codegen/call.h @@ -308,7 +308,8 @@ template Status CallOpSendMessage::SendMessage(const M& message, WriteOptions options) { write_options_ = options; bool own_buf; - Status result = SerializationTraits::Serialize(message, &send_buf_, &own_buf); + Status result = + SerializationTraits::Serialize(message, &send_buf_, &own_buf); if (!own_buf) { send_buf_ = g_core_codegen_interface->grpc_byte_buffer_copy(send_buf_); } diff --git a/include/grpc++/impl/codegen/core_codegen.h b/include/grpc++/impl/codegen/core_codegen.h index 5f96c8345b0..c751c1e7347 100644 --- a/include/grpc++/impl/codegen/core_codegen.h +++ b/include/grpc++/impl/codegen/core_codegen.h @@ -68,7 +68,7 @@ class CoreCodegen final : public CoreCodegenInterface { void grpc_call_unref(grpc_call* call) override; virtual void* grpc_call_arena_alloc(grpc_call* call, size_t length) override; - grpc_byte_buffer *grpc_byte_buffer_copy(grpc_byte_buffer *bb) override; + grpc_byte_buffer* grpc_byte_buffer_copy(grpc_byte_buffer* bb) override; void grpc_byte_buffer_destroy(grpc_byte_buffer* bb) override; int grpc_byte_buffer_reader_init(grpc_byte_buffer_reader* reader, diff --git a/include/grpc++/impl/codegen/core_codegen_interface.h b/include/grpc++/impl/codegen/core_codegen_interface.h index 7556016f27d..a4c50dab873 100644 --- a/include/grpc++/impl/codegen/core_codegen_interface.h +++ b/include/grpc++/impl/codegen/core_codegen_interface.h @@ -74,7 +74,7 @@ class CoreCodegenInterface { virtual void gpr_cv_signal(gpr_cv* cv) = 0; virtual void gpr_cv_broadcast(gpr_cv* cv) = 0; - virtual grpc_byte_buffer *grpc_byte_buffer_copy(grpc_byte_buffer *bb) = 0; + virtual grpc_byte_buffer* grpc_byte_buffer_copy(grpc_byte_buffer* bb) = 0; virtual void grpc_byte_buffer_destroy(grpc_byte_buffer* bb) = 0; virtual int grpc_byte_buffer_reader_init(grpc_byte_buffer_reader* reader, diff --git a/src/cpp/common/core_codegen.cc b/src/cpp/common/core_codegen.cc index e81509904f1..6ea5f1d3c76 100644 --- a/src/cpp/common/core_codegen.cc +++ b/src/cpp/common/core_codegen.cc @@ -89,7 +89,7 @@ int CoreCodegen::gpr_cv_wait(gpr_cv* cv, gpr_mu* mu, void CoreCodegen::gpr_cv_signal(gpr_cv* cv) { ::gpr_cv_signal(cv); } void CoreCodegen::gpr_cv_broadcast(gpr_cv* cv) { ::gpr_cv_broadcast(cv); } -grpc_byte_buffer* CoreCodegen::grpc_byte_buffer_copy(grpc_byte_buffer *bb) { +grpc_byte_buffer* CoreCodegen::grpc_byte_buffer_copy(grpc_byte_buffer* bb) { return ::grpc_byte_buffer_copy(bb); } From 08b79c1611a1d8ae2f19937676aa0ad9df8c230b Mon Sep 17 00:00:00 2001 From: Ken Payson Date: Wed, 30 Aug 2017 08:36:59 -0700 Subject: [PATCH 4/4] Doc UPdate --- include/grpc/impl/codegen/grpc_types.h | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/include/grpc/impl/codegen/grpc_types.h b/include/grpc/impl/codegen/grpc_types.h index 9079506463c..59b90af03a3 100644 --- a/include/grpc/impl/codegen/grpc_types.h +++ b/include/grpc/impl/codegen/grpc_types.h @@ -511,6 +511,11 @@ typedef struct grpc_op { } maybe_stream_compression_level; } send_initial_metadata; struct grpc_op_send_message { + /** This op takes ownership of the slices in send_message. After + * a call completes, the contents of send_message are not guaranteed + * and likely empty. The original owner should still call + * grpc_byte_buffer_destroy() on this object however. + */ struct grpc_byte_buffer *send_message; } send_message; struct grpc_op_send_status_from_server {