Merge pull request #12306 from kpayson64/ref_counting_slices

Give ownership of byte buffers to write ops
pull/11873/head^2
kpayson64 8 years ago committed by GitHub
commit 451b92a754
  1. 13
      include/grpc++/impl/codegen/call.h
  2. 1
      include/grpc++/impl/codegen/core_codegen.h
  3. 1
      include/grpc++/impl/codegen/core_codegen_interface.h
  4. 5
      include/grpc/impl/codegen/grpc_types.h
  5. 1
      src/core/lib/transport/byte_stream.c
  6. 4
      src/core/lib/transport/byte_stream.h
  7. 4
      src/cpp/common/core_codegen.cc
  8. 11
      test/core/end2end/tests/cancel_after_round_trip.c
  9. 11
      test/core/end2end/tests/resource_quota_server.c

@ -272,7 +272,7 @@ class CallOpSendInitialMetadata {
class CallOpSendMessage {
public:
CallOpSendMessage() : send_buf_(nullptr), own_buf_(false) {}
CallOpSendMessage() : send_buf_(nullptr) {}
/// Send \a message using \a options for the write. The \a options are cleared
/// after use.
@ -295,20 +295,25 @@ class CallOpSendMessage {
write_options_.Clear();
}
void FinishOp(bool* status) {
if (own_buf_) g_core_codegen_interface->grpc_byte_buffer_destroy(send_buf_);
g_core_codegen_interface->grpc_byte_buffer_destroy(send_buf_);
send_buf_ = nullptr;
}
private:
grpc_byte_buffer* send_buf_;
WriteOptions write_options_;
bool own_buf_;
};
template <class M>
Status CallOpSendMessage::SendMessage(const M& message, WriteOptions options) {
write_options_ = options;
return SerializationTraits<M>::Serialize(message, &send_buf_, &own_buf_);
bool own_buf;
Status result =
SerializationTraits<M>::Serialize(message, &send_buf_, &own_buf);
if (!own_buf) {
send_buf_ = g_core_codegen_interface->grpc_byte_buffer_copy(send_buf_);
}
return result;
}
template <class M>

@ -68,6 +68,7 @@ class CoreCodegen final : public CoreCodegenInterface {
void grpc_call_unref(grpc_call* call) override;
virtual void* grpc_call_arena_alloc(grpc_call* call, size_t length) override;
grpc_byte_buffer* grpc_byte_buffer_copy(grpc_byte_buffer* bb) override;
void grpc_byte_buffer_destroy(grpc_byte_buffer* bb) override;
int grpc_byte_buffer_reader_init(grpc_byte_buffer_reader* reader,

@ -74,6 +74,7 @@ class CoreCodegenInterface {
virtual void gpr_cv_signal(gpr_cv* cv) = 0;
virtual void gpr_cv_broadcast(gpr_cv* cv) = 0;
virtual grpc_byte_buffer* grpc_byte_buffer_copy(grpc_byte_buffer* bb) = 0;
virtual void grpc_byte_buffer_destroy(grpc_byte_buffer* bb) = 0;
virtual int grpc_byte_buffer_reader_init(grpc_byte_buffer_reader* reader,

@ -511,6 +511,11 @@ typedef struct grpc_op {
} maybe_stream_compression_level;
} send_initial_metadata;
struct grpc_op_send_message {
/** This op takes ownership of the slices in send_message. After
* a call completes, the contents of send_message are not guaranteed
* and likely empty. The original owner should still call
* grpc_byte_buffer_destroy() on this object however.
*/
struct grpc_byte_buffer *send_message;
} send_message;
struct grpc_op_send_status_from_server {

@ -85,6 +85,7 @@ static void slice_buffer_stream_shutdown(grpc_exec_ctx *exec_ctx,
static void slice_buffer_stream_destroy(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream) {
grpc_slice_buffer_stream *stream = (grpc_slice_buffer_stream *)byte_stream;
grpc_slice_buffer_reset_and_unref_internal(exec_ctx, stream->backing_buffer);
GRPC_ERROR_UNREF(stream->shutdown_error);
}

@ -81,7 +81,9 @@ void grpc_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
// grpc_slice_buffer_stream
//
// A grpc_byte_stream that wraps a slice buffer.
// A grpc_byte_stream that wraps a slice buffer. The stream takes
// ownership of the slices in the buffer, and on destruction will
// reset the contents of the buffer.
typedef struct grpc_slice_buffer_stream {
grpc_byte_stream base;

@ -89,6 +89,10 @@ int CoreCodegen::gpr_cv_wait(gpr_cv* cv, gpr_mu* mu,
void CoreCodegen::gpr_cv_signal(gpr_cv* cv) { ::gpr_cv_signal(cv); }
void CoreCodegen::gpr_cv_broadcast(gpr_cv* cv) { ::gpr_cv_broadcast(cv); }
grpc_byte_buffer* CoreCodegen::grpc_byte_buffer_copy(grpc_byte_buffer* bb) {
return ::grpc_byte_buffer_copy(bb);
}
void CoreCodegen::grpc_byte_buffer_destroy(grpc_byte_buffer* bb) {
::grpc_byte_buffer_destroy(bb);
}

@ -114,7 +114,9 @@ static void test_cancel_after_round_trip(grpc_end2end_test_config config,
grpc_slice_from_copied_string("hello you");
grpc_byte_buffer *request_payload =
grpc_raw_byte_buffer_create(&request_payload_slice, 1);
grpc_byte_buffer *response_payload =
grpc_byte_buffer *response_payload1 =
grpc_raw_byte_buffer_create(&response_payload_slice, 1);
grpc_byte_buffer *response_payload2 =
grpc_raw_byte_buffer_create(&response_payload_slice, 1);
int was_cancelled = 2;
@ -199,7 +201,7 @@ static void test_cancel_after_round_trip(grpc_end2end_test_config config,
op->reserved = NULL;
op++;
op->op = GRPC_OP_SEND_MESSAGE;
op->data.send_message.send_message = response_payload;
op->data.send_message.send_message = response_payload1;
op->flags = 0;
op->reserved = NULL;
op++;
@ -242,7 +244,7 @@ static void test_cancel_after_round_trip(grpc_end2end_test_config config,
op->reserved = NULL;
op++;
op->op = GRPC_OP_SEND_MESSAGE;
op->data.send_message.send_message = response_payload;
op->data.send_message.send_message = response_payload2;
op->flags = 0;
op->reserved = NULL;
op++;
@ -262,7 +264,8 @@ static void test_cancel_after_round_trip(grpc_end2end_test_config config,
grpc_call_details_destroy(&call_details);
grpc_byte_buffer_destroy(request_payload);
grpc_byte_buffer_destroy(response_payload);
grpc_byte_buffer_destroy(response_payload1);
grpc_byte_buffer_destroy(response_payload2);
grpc_byte_buffer_destroy(request_payload_recv);
grpc_byte_buffer_destroy(response_payload_recv);
grpc_slice_unref(details);

@ -143,6 +143,8 @@ void resource_quota_server(grpc_end2end_test_config config) {
malloc(sizeof(grpc_call_details) * NUM_CALLS);
grpc_status_code *status = malloc(sizeof(grpc_status_code) * NUM_CALLS);
grpc_slice *details = malloc(sizeof(grpc_slice) * NUM_CALLS);
grpc_byte_buffer **request_payload =
malloc(sizeof(grpc_byte_buffer *) * NUM_CALLS);
grpc_byte_buffer **request_payload_recv =
malloc(sizeof(grpc_byte_buffer *) * NUM_CALLS);
int *was_cancelled = malloc(sizeof(int) * NUM_CALLS);
@ -156,9 +158,6 @@ void resource_quota_server(grpc_end2end_test_config config) {
int deadline_exceeded = 0;
int unavailable = 0;
grpc_byte_buffer *request_payload =
grpc_raw_byte_buffer_create(&request_payload_slice, 1);
grpc_op ops[6];
grpc_op *op;
@ -167,6 +166,7 @@ void resource_quota_server(grpc_end2end_test_config config) {
grpc_metadata_array_init(&trailing_metadata_recv[i]);
grpc_metadata_array_init(&request_metadata_recv[i]);
grpc_call_details_init(&call_details[i]);
request_payload[i] = grpc_raw_byte_buffer_create(&request_payload_slice, 1);
request_payload_recv[i] = NULL;
was_cancelled[i] = 0;
}
@ -195,7 +195,7 @@ void resource_quota_server(grpc_end2end_test_config config) {
op->reserved = NULL;
op++;
op->op = GRPC_OP_SEND_MESSAGE;
op->data.send_message.send_message = request_payload;
op->data.send_message.send_message = request_payload[i];
op->flags = 0;
op->reserved = NULL;
op++;
@ -261,6 +261,7 @@ void resource_quota_server(grpc_end2end_test_config config) {
grpc_metadata_array_destroy(&trailing_metadata_recv[call_id]);
grpc_call_unref(client_calls[call_id]);
grpc_slice_unref(details[call_id]);
grpc_byte_buffer_destroy(request_payload[call_id]);
pending_client_calls--;
} else if (ev_tag < SERVER_RECV_BASE_TAG) {
@ -351,7 +352,6 @@ void resource_quota_server(grpc_end2end_test_config config) {
NUM_CALLS, cancelled_calls_on_server, cancelled_calls_on_client,
deadline_exceeded, unavailable);
grpc_byte_buffer_destroy(request_payload);
grpc_slice_unref(request_payload_slice);
grpc_resource_quota_unref(resource_quota);
@ -366,6 +366,7 @@ void resource_quota_server(grpc_end2end_test_config config) {
free(call_details);
free(status);
free(details);
free(request_payload);
free(request_payload_recv);
free(was_cancelled);
}

Loading…
Cancel
Save