Merge pull request #18926 from soheilhy/slicebuffer-first

Introduce slice_buffer helper methods to avoid copies.
pull/19026/head
Soheil Hassas Yeganeh 6 years ago committed by GitHub
commit 801eed8387
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 59
      src/core/ext/transport/chttp2/transport/frame_data.cc
  2. 18
      src/core/lib/compression/stream_compression_gzip.cc
  3. 3
      src/core/lib/iomgr/tcp_posix.cc
  4. 10
      src/core/lib/security/transport/security_handshaker.cc
  5. 18
      src/core/lib/slice/slice_buffer.cc
  6. 16
      src/core/lib/slice/slice_internal.h
  7. 43
      test/core/slice/slice_buffer_test.cc

@ -104,23 +104,22 @@ grpc_error* grpc_deframe_unprocessed_incoming_frames(
uint8_t* end = nullptr;
uint8_t* cur = nullptr;
grpc_slice slice = grpc_slice_buffer_take_first(slices);
beg = GRPC_SLICE_START_PTR(slice);
end = GRPC_SLICE_END_PTR(slice);
grpc_slice* slice = grpc_slice_buffer_peek_first(slices);
beg = GRPC_SLICE_START_PTR(*slice);
end = GRPC_SLICE_END_PTR(*slice);
cur = beg;
uint32_t message_flags;
char* msg;
if (cur == end) {
grpc_slice_unref_internal(slice);
grpc_slice_buffer_remove_first(slices);
continue;
}
switch (p->state) {
case GRPC_CHTTP2_DATA_ERROR:
p->state = GRPC_CHTTP2_DATA_ERROR;
grpc_slice_unref_internal(slice);
grpc_slice_buffer_remove_first(slices);
return GRPC_ERROR_REF(p->error);
case GRPC_CHTTP2_DATA_FH_0:
s->stats.incoming.framing_bytes++;
@ -138,19 +137,19 @@ grpc_error* grpc_deframe_unprocessed_incoming_frames(
p->error = grpc_error_set_int(p->error, GRPC_ERROR_INT_STREAM_ID,
static_cast<intptr_t>(s->id));
gpr_free(msg);
msg = grpc_dump_slice(slice, GPR_DUMP_HEX | GPR_DUMP_ASCII);
msg = grpc_dump_slice(*slice, GPR_DUMP_HEX | GPR_DUMP_ASCII);
p->error = grpc_error_set_str(p->error, GRPC_ERROR_STR_RAW_BYTES,
grpc_slice_from_copied_string(msg));
gpr_free(msg);
p->error =
grpc_error_set_int(p->error, GRPC_ERROR_INT_OFFSET, cur - beg);
p->state = GRPC_CHTTP2_DATA_ERROR;
grpc_slice_unref_internal(slice);
grpc_slice_buffer_remove_first(slices);
return GRPC_ERROR_REF(p->error);
}
if (++cur == end) {
p->state = GRPC_CHTTP2_DATA_FH_1;
grpc_slice_unref_internal(slice);
grpc_slice_buffer_remove_first(slices);
continue;
}
/* fallthrough */
@ -159,7 +158,7 @@ grpc_error* grpc_deframe_unprocessed_incoming_frames(
p->frame_size = (static_cast<uint32_t>(*cur)) << 24;
if (++cur == end) {
p->state = GRPC_CHTTP2_DATA_FH_2;
grpc_slice_unref_internal(slice);
grpc_slice_buffer_remove_first(slices);
continue;
}
/* fallthrough */
@ -168,7 +167,7 @@ grpc_error* grpc_deframe_unprocessed_incoming_frames(
p->frame_size |= (static_cast<uint32_t>(*cur)) << 16;
if (++cur == end) {
p->state = GRPC_CHTTP2_DATA_FH_3;
grpc_slice_unref_internal(slice);
grpc_slice_buffer_remove_first(slices);
continue;
}
/* fallthrough */
@ -177,7 +176,7 @@ grpc_error* grpc_deframe_unprocessed_incoming_frames(
p->frame_size |= (static_cast<uint32_t>(*cur)) << 8;
if (++cur == end) {
p->state = GRPC_CHTTP2_DATA_FH_4;
grpc_slice_unref_internal(slice);
grpc_slice_buffer_remove_first(slices);
continue;
}
/* fallthrough */
@ -204,19 +203,18 @@ grpc_error* grpc_deframe_unprocessed_incoming_frames(
p->state = GRPC_CHTTP2_DATA_FH_0;
}
s->pending_byte_stream = true;
if (cur != end) {
grpc_slice_buffer_undo_take_first(
slices, grpc_slice_sub(slice, static_cast<size_t>(cur - beg),
static_cast<size_t>(end - beg)));
grpc_slice_buffer_sub_first(slices, static_cast<size_t>(cur - beg),
static_cast<size_t>(end - beg));
} else {
grpc_slice_buffer_remove_first(slices);
}
grpc_slice_unref_internal(slice);
return GRPC_ERROR_NONE;
case GRPC_CHTTP2_DATA_FRAME: {
GPR_ASSERT(p->parsing_frame != nullptr);
GPR_ASSERT(slice_out != nullptr);
if (cur == end) {
grpc_slice_unref_internal(slice);
grpc_slice_buffer_remove_first(slices);
continue;
}
uint32_t remaining = static_cast<uint32_t>(end - cur);
@ -224,32 +222,32 @@ grpc_error* grpc_deframe_unprocessed_incoming_frames(
s->stats.incoming.data_bytes += remaining;
if (GRPC_ERROR_NONE !=
(error = p->parsing_frame->Push(
grpc_slice_sub(slice, static_cast<size_t>(cur - beg),
grpc_slice_sub(*slice, static_cast<size_t>(cur - beg),
static_cast<size_t>(end - beg)),
slice_out))) {
grpc_slice_unref_internal(slice);
grpc_slice_buffer_remove_first(slices);
return error;
}
if (GRPC_ERROR_NONE !=
(error = p->parsing_frame->Finished(GRPC_ERROR_NONE, true))) {
grpc_slice_unref_internal(slice);
grpc_slice_buffer_remove_first(slices);
return error;
}
p->parsing_frame = nullptr;
p->state = GRPC_CHTTP2_DATA_FH_0;
grpc_slice_unref_internal(slice);
grpc_slice_buffer_remove_first(slices);
return GRPC_ERROR_NONE;
} else if (remaining < p->frame_size) {
s->stats.incoming.data_bytes += remaining;
if (GRPC_ERROR_NONE !=
(error = p->parsing_frame->Push(
grpc_slice_sub(slice, static_cast<size_t>(cur - beg),
grpc_slice_sub(*slice, static_cast<size_t>(cur - beg),
static_cast<size_t>(end - beg)),
slice_out))) {
return error;
}
p->frame_size -= remaining;
grpc_slice_unref_internal(slice);
grpc_slice_buffer_remove_first(slices);
return GRPC_ERROR_NONE;
} else {
GPR_ASSERT(remaining > p->frame_size);
@ -257,30 +255,27 @@ grpc_error* grpc_deframe_unprocessed_incoming_frames(
if (GRPC_ERROR_NONE !=
p->parsing_frame->Push(
grpc_slice_sub(
slice, static_cast<size_t>(cur - beg),
*slice, static_cast<size_t>(cur - beg),
static_cast<size_t>(cur + p->frame_size - beg)),
slice_out)) {
grpc_slice_unref_internal(slice);
grpc_slice_buffer_remove_first(slices);
return error;
}
if (GRPC_ERROR_NONE !=
(error = p->parsing_frame->Finished(GRPC_ERROR_NONE, true))) {
grpc_slice_unref_internal(slice);
grpc_slice_buffer_remove_first(slices);
return error;
}
p->parsing_frame = nullptr;
p->state = GRPC_CHTTP2_DATA_FH_0;
cur += p->frame_size;
grpc_slice_buffer_undo_take_first(
slices, grpc_slice_sub(slice, static_cast<size_t>(cur - beg),
static_cast<size_t>(end - beg)));
grpc_slice_unref_internal(slice);
grpc_slice_buffer_sub_first(slices, static_cast<size_t>(cur - beg),
static_cast<size_t>(end - beg));
return GRPC_ERROR_NONE;
}
}
}
}
return GRPC_ERROR_NONE;
}

@ -53,25 +53,25 @@ static bool gzip_flate(grpc_stream_compression_context_gzip* ctx,
ctx->zs.avail_out = static_cast<uInt>(slice_size);
ctx->zs.next_out = GRPC_SLICE_START_PTR(slice_out);
while (ctx->zs.avail_out > 0 && in->length > 0 && !eoc) {
grpc_slice slice = grpc_slice_buffer_take_first(in);
ctx->zs.avail_in = static_cast<uInt> GRPC_SLICE_LENGTH(slice);
ctx->zs.next_in = GRPC_SLICE_START_PTR(slice);
grpc_slice* slice = grpc_slice_buffer_peek_first(in);
ctx->zs.avail_in = static_cast<uInt> GRPC_SLICE_LENGTH(*slice);
ctx->zs.next_in = GRPC_SLICE_START_PTR(*slice);
r = ctx->flate(&ctx->zs, Z_NO_FLUSH);
if (r < 0 && r != Z_BUF_ERROR) {
gpr_log(GPR_ERROR, "zlib error (%d)", r);
grpc_slice_unref_internal(slice_out);
grpc_slice_unref_internal(slice);
grpc_slice_buffer_remove_first(in);
return false;
} else if (r == Z_STREAM_END && ctx->flate == inflate) {
eoc = true;
}
if (ctx->zs.avail_in > 0) {
grpc_slice_buffer_undo_take_first(
in,
grpc_slice_sub(slice, GRPC_SLICE_LENGTH(slice) - ctx->zs.avail_in,
GRPC_SLICE_LENGTH(slice)));
grpc_slice_buffer_sub_first(
in, GRPC_SLICE_LENGTH(*slice) - ctx->zs.avail_in,
GRPC_SLICE_LENGTH(*slice));
} else {
grpc_slice_buffer_remove_first(in);
}
grpc_slice_unref_internal(slice);
}
if (flush != 0 && ctx->zs.avail_out > 0 && !eoc) {
GPR_ASSERT(in->length == 0);

@ -980,8 +980,7 @@ static bool tcp_flush(grpc_tcp* tcp, grpc_error** error) {
// unref all and forget about all slices that have been written to this
// point
for (size_t idx = 0; idx < unwind_slice_idx; ++idx) {
grpc_slice_unref_internal(
grpc_slice_buffer_take_first(tcp->outgoing_buffer));
grpc_slice_buffer_remove_first(tcp->outgoing_buffer);
}
return false;
} else if (errno == EPIPE) {

@ -144,11 +144,11 @@ size_t SecurityHandshaker::MoveReadBufferIntoHandshakeBuffer() {
}
size_t offset = 0;
while (args_->read_buffer->count > 0) {
grpc_slice next_slice = grpc_slice_buffer_take_first(args_->read_buffer);
memcpy(handshake_buffer_ + offset, GRPC_SLICE_START_PTR(next_slice),
GRPC_SLICE_LENGTH(next_slice));
offset += GRPC_SLICE_LENGTH(next_slice);
grpc_slice_unref_internal(next_slice);
grpc_slice* next_slice = grpc_slice_buffer_peek_first(args_->read_buffer);
memcpy(handshake_buffer_ + offset, GRPC_SLICE_START_PTR(*next_slice),
GRPC_SLICE_LENGTH(*next_slice));
offset += GRPC_SLICE_LENGTH(*next_slice);
grpc_slice_buffer_remove_first(args_->read_buffer);
}
return bytes_in_read_buffer;
}

@ -375,6 +375,24 @@ grpc_slice grpc_slice_buffer_take_first(grpc_slice_buffer* sb) {
return slice;
}
void grpc_slice_buffer_remove_first(grpc_slice_buffer* sb) {
GPR_DEBUG_ASSERT(sb->count > 0);
sb->length -= GRPC_SLICE_LENGTH(sb->slices[0]);
grpc_slice_unref_internal(sb->slices[0]);
sb->slices++;
if (--sb->count == 0) {
sb->slices = sb->base_slices;
}
}
void grpc_slice_buffer_sub_first(grpc_slice_buffer* sb, size_t begin,
size_t end) {
// TODO(soheil): Introduce a ptr version for sub.
sb->length -= GRPC_SLICE_LENGTH(sb->slices[0]);
sb->slices[0] = grpc_slice_sub_no_ref(sb->slices[0], begin, end);
sb->length += end - begin;
}
void grpc_slice_buffer_undo_take_first(grpc_slice_buffer* sb,
grpc_slice slice) {
sb->slices--;

@ -21,6 +21,8 @@
#include <grpc/support/port_platform.h>
#include <grpc/support/log.h>
#include <grpc/slice.h>
#include <grpc/slice_buffer.h>
#include <string.h>
@ -240,6 +242,20 @@ void grpc_slice_buffer_partial_unref_internal(grpc_slice_buffer* sb,
size_t idx);
void grpc_slice_buffer_destroy_internal(grpc_slice_buffer* sb);
// Returns a pointer to the first slice in the slice buffer without giving
// ownership to or a reference count on that slice.
inline grpc_slice* grpc_slice_buffer_peek_first(grpc_slice_buffer* sb) {
GPR_DEBUG_ASSERT(sb->count > 0);
return &sb->slices[0];
}
// Removes the first slice from the slice buffer.
void grpc_slice_buffer_remove_first(grpc_slice_buffer* sb);
// Calls grpc_slice_sub with the given parameters on the first slice.
void grpc_slice_buffer_sub_first(grpc_slice_buffer* sb, size_t begin,
size_t end);
/* Check if a slice is interned */
bool grpc_slice_is_interned(const grpc_slice& slice);

@ -19,6 +19,7 @@
#include <grpc/grpc.h>
#include <grpc/slice_buffer.h>
#include <grpc/support/log.h>
#include "src/core/lib/slice/slice_internal.h"
#include "test/core/util/test_config.h"
void test_slice_buffer_add() {
@ -105,12 +106,54 @@ void test_slice_buffer_move_first() {
GPR_ASSERT(dst.length == dst_len);
}
void test_slice_buffer_first() {
grpc_slice slices[3];
slices[0] = grpc_slice_from_copied_string("aaa");
slices[1] = grpc_slice_from_copied_string("bbbb");
slices[2] = grpc_slice_from_copied_string("ccccc");
grpc_slice_buffer buf;
grpc_slice_buffer_init(&buf);
for (int idx = 0; idx < 3; ++idx) {
grpc_slice_ref(slices[idx]);
grpc_slice_buffer_add_indexed(&buf, slices[idx]);
}
grpc_slice* first = grpc_slice_buffer_peek_first(&buf);
GPR_ASSERT(GPR_SLICE_LENGTH(*first) == GPR_SLICE_LENGTH(slices[0]));
GPR_ASSERT(buf.count == 3);
GPR_ASSERT(buf.length == 12);
grpc_slice_buffer_sub_first(&buf, 1, 2);
first = grpc_slice_buffer_peek_first(&buf);
GPR_ASSERT(GPR_SLICE_LENGTH(*first) == 1);
GPR_ASSERT(buf.count == 3);
GPR_ASSERT(buf.length == 10);
grpc_slice_buffer_remove_first(&buf);
first = grpc_slice_buffer_peek_first(&buf);
GPR_ASSERT(GPR_SLICE_LENGTH(*first) == GPR_SLICE_LENGTH(slices[1]));
GPR_ASSERT(buf.count == 2);
GPR_ASSERT(buf.length == 9);
grpc_slice_buffer_remove_first(&buf);
first = grpc_slice_buffer_peek_first(&buf);
GPR_ASSERT(GPR_SLICE_LENGTH(*first) == GPR_SLICE_LENGTH(slices[2]));
GPR_ASSERT(buf.count == 1);
GPR_ASSERT(buf.length == 5);
grpc_slice_buffer_remove_first(&buf);
GPR_ASSERT(buf.count == 0);
GPR_ASSERT(buf.length == 0);
}
int main(int argc, char** argv) {
grpc::testing::TestEnvironment env(argc, argv);
grpc_init();
test_slice_buffer_add();
test_slice_buffer_move_first();
test_slice_buffer_first();
grpc_shutdown();
return 0;

Loading…
Cancel
Save