diff --git a/src/core/ext/transport/chttp2/transport/frame_data.cc b/src/core/ext/transport/chttp2/transport/frame_data.cc index 6080a4bd1c4..3734c0150b7 100644 --- a/src/core/ext/transport/chttp2/transport/frame_data.cc +++ b/src/core/ext/transport/chttp2/transport/frame_data.cc @@ -104,23 +104,22 @@ grpc_error* grpc_deframe_unprocessed_incoming_frames( uint8_t* end = nullptr; uint8_t* cur = nullptr; - grpc_slice slice = grpc_slice_buffer_take_first(slices); - - beg = GRPC_SLICE_START_PTR(slice); - end = GRPC_SLICE_END_PTR(slice); + grpc_slice* slice = grpc_slice_buffer_peek_first(slices); + beg = GRPC_SLICE_START_PTR(*slice); + end = GRPC_SLICE_END_PTR(*slice); cur = beg; uint32_t message_flags; char* msg; if (cur == end) { - grpc_slice_unref_internal(slice); + grpc_slice_buffer_remove_first(slices); continue; } switch (p->state) { case GRPC_CHTTP2_DATA_ERROR: p->state = GRPC_CHTTP2_DATA_ERROR; - grpc_slice_unref_internal(slice); + grpc_slice_buffer_remove_first(slices); return GRPC_ERROR_REF(p->error); case GRPC_CHTTP2_DATA_FH_0: s->stats.incoming.framing_bytes++; @@ -138,19 +137,19 @@ grpc_error* grpc_deframe_unprocessed_incoming_frames( p->error = grpc_error_set_int(p->error, GRPC_ERROR_INT_STREAM_ID, static_cast(s->id)); gpr_free(msg); - msg = grpc_dump_slice(slice, GPR_DUMP_HEX | GPR_DUMP_ASCII); + msg = grpc_dump_slice(*slice, GPR_DUMP_HEX | GPR_DUMP_ASCII); p->error = grpc_error_set_str(p->error, GRPC_ERROR_STR_RAW_BYTES, grpc_slice_from_copied_string(msg)); gpr_free(msg); p->error = grpc_error_set_int(p->error, GRPC_ERROR_INT_OFFSET, cur - beg); p->state = GRPC_CHTTP2_DATA_ERROR; - grpc_slice_unref_internal(slice); + grpc_slice_buffer_remove_first(slices); return GRPC_ERROR_REF(p->error); } if (++cur == end) { p->state = GRPC_CHTTP2_DATA_FH_1; - grpc_slice_unref_internal(slice); + grpc_slice_buffer_remove_first(slices); continue; } /* fallthrough */ @@ -159,7 +158,7 @@ grpc_error* grpc_deframe_unprocessed_incoming_frames( p->frame_size = (static_cast(*cur)) << 24; if (++cur == end) { p->state = GRPC_CHTTP2_DATA_FH_2; - grpc_slice_unref_internal(slice); + grpc_slice_buffer_remove_first(slices); continue; } /* fallthrough */ @@ -168,7 +167,7 @@ grpc_error* grpc_deframe_unprocessed_incoming_frames( p->frame_size |= (static_cast(*cur)) << 16; if (++cur == end) { p->state = GRPC_CHTTP2_DATA_FH_3; - grpc_slice_unref_internal(slice); + grpc_slice_buffer_remove_first(slices); continue; } /* fallthrough */ @@ -177,7 +176,7 @@ grpc_error* grpc_deframe_unprocessed_incoming_frames( p->frame_size |= (static_cast(*cur)) << 8; if (++cur == end) { p->state = GRPC_CHTTP2_DATA_FH_4; - grpc_slice_unref_internal(slice); + grpc_slice_buffer_remove_first(slices); continue; } /* fallthrough */ @@ -204,19 +203,18 @@ grpc_error* grpc_deframe_unprocessed_incoming_frames( p->state = GRPC_CHTTP2_DATA_FH_0; } s->pending_byte_stream = true; - if (cur != end) { - grpc_slice_buffer_undo_take_first( - slices, grpc_slice_sub(slice, static_cast(cur - beg), - static_cast(end - beg))); + grpc_slice_buffer_sub_first(slices, static_cast(cur - beg), + static_cast(end - beg)); + } else { + grpc_slice_buffer_remove_first(slices); } - grpc_slice_unref_internal(slice); return GRPC_ERROR_NONE; case GRPC_CHTTP2_DATA_FRAME: { GPR_ASSERT(p->parsing_frame != nullptr); GPR_ASSERT(slice_out != nullptr); if (cur == end) { - grpc_slice_unref_internal(slice); + grpc_slice_buffer_remove_first(slices); continue; } uint32_t remaining = static_cast(end - cur); @@ -224,32 +222,32 @@ grpc_error* grpc_deframe_unprocessed_incoming_frames( s->stats.incoming.data_bytes += remaining; if (GRPC_ERROR_NONE != (error = p->parsing_frame->Push( - grpc_slice_sub(slice, static_cast(cur - beg), + grpc_slice_sub(*slice, static_cast(cur - beg), static_cast(end - beg)), slice_out))) { - grpc_slice_unref_internal(slice); + grpc_slice_buffer_remove_first(slices); return error; } if (GRPC_ERROR_NONE != (error = p->parsing_frame->Finished(GRPC_ERROR_NONE, true))) { - grpc_slice_unref_internal(slice); + grpc_slice_buffer_remove_first(slices); return error; } p->parsing_frame = nullptr; p->state = GRPC_CHTTP2_DATA_FH_0; - grpc_slice_unref_internal(slice); + grpc_slice_buffer_remove_first(slices); return GRPC_ERROR_NONE; } else if (remaining < p->frame_size) { s->stats.incoming.data_bytes += remaining; if (GRPC_ERROR_NONE != (error = p->parsing_frame->Push( - grpc_slice_sub(slice, static_cast(cur - beg), + grpc_slice_sub(*slice, static_cast(cur - beg), static_cast(end - beg)), slice_out))) { return error; } p->frame_size -= remaining; - grpc_slice_unref_internal(slice); + grpc_slice_buffer_remove_first(slices); return GRPC_ERROR_NONE; } else { GPR_ASSERT(remaining > p->frame_size); @@ -257,30 +255,27 @@ grpc_error* grpc_deframe_unprocessed_incoming_frames( if (GRPC_ERROR_NONE != p->parsing_frame->Push( grpc_slice_sub( - slice, static_cast(cur - beg), + *slice, static_cast(cur - beg), static_cast(cur + p->frame_size - beg)), slice_out)) { - grpc_slice_unref_internal(slice); + grpc_slice_buffer_remove_first(slices); return error; } if (GRPC_ERROR_NONE != (error = p->parsing_frame->Finished(GRPC_ERROR_NONE, true))) { - grpc_slice_unref_internal(slice); + grpc_slice_buffer_remove_first(slices); return error; } p->parsing_frame = nullptr; p->state = GRPC_CHTTP2_DATA_FH_0; cur += p->frame_size; - grpc_slice_buffer_undo_take_first( - slices, grpc_slice_sub(slice, static_cast(cur - beg), - static_cast(end - beg))); - grpc_slice_unref_internal(slice); + grpc_slice_buffer_sub_first(slices, static_cast(cur - beg), + static_cast(end - beg)); return GRPC_ERROR_NONE; } } } } - return GRPC_ERROR_NONE; } diff --git a/src/core/lib/compression/stream_compression_gzip.cc b/src/core/lib/compression/stream_compression_gzip.cc index bffdb1fd17d..452b22b7628 100644 --- a/src/core/lib/compression/stream_compression_gzip.cc +++ b/src/core/lib/compression/stream_compression_gzip.cc @@ -53,25 +53,25 @@ static bool gzip_flate(grpc_stream_compression_context_gzip* ctx, ctx->zs.avail_out = static_cast(slice_size); ctx->zs.next_out = GRPC_SLICE_START_PTR(slice_out); while (ctx->zs.avail_out > 0 && in->length > 0 && !eoc) { - grpc_slice slice = grpc_slice_buffer_take_first(in); - ctx->zs.avail_in = static_cast GRPC_SLICE_LENGTH(slice); - ctx->zs.next_in = GRPC_SLICE_START_PTR(slice); + grpc_slice* slice = grpc_slice_buffer_peek_first(in); + ctx->zs.avail_in = static_cast GRPC_SLICE_LENGTH(*slice); + ctx->zs.next_in = GRPC_SLICE_START_PTR(*slice); r = ctx->flate(&ctx->zs, Z_NO_FLUSH); if (r < 0 && r != Z_BUF_ERROR) { gpr_log(GPR_ERROR, "zlib error (%d)", r); grpc_slice_unref_internal(slice_out); - grpc_slice_unref_internal(slice); + grpc_slice_buffer_remove_first(in); return false; } else if (r == Z_STREAM_END && ctx->flate == inflate) { eoc = true; } if (ctx->zs.avail_in > 0) { - grpc_slice_buffer_undo_take_first( - in, - grpc_slice_sub(slice, GRPC_SLICE_LENGTH(slice) - ctx->zs.avail_in, - GRPC_SLICE_LENGTH(slice))); + grpc_slice_buffer_sub_first( + in, GRPC_SLICE_LENGTH(*slice) - ctx->zs.avail_in, + GRPC_SLICE_LENGTH(*slice)); + } else { + grpc_slice_buffer_remove_first(in); } - grpc_slice_unref_internal(slice); } if (flush != 0 && ctx->zs.avail_out > 0 && !eoc) { GPR_ASSERT(in->length == 0); diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc index b9376b3ed0a..cda7bba0d3a 100644 --- a/src/core/lib/iomgr/tcp_posix.cc +++ b/src/core/lib/iomgr/tcp_posix.cc @@ -980,8 +980,7 @@ static bool tcp_flush(grpc_tcp* tcp, grpc_error** error) { // unref all and forget about all slices that have been written to this // point for (size_t idx = 0; idx < unwind_slice_idx; ++idx) { - grpc_slice_unref_internal( - grpc_slice_buffer_take_first(tcp->outgoing_buffer)); + grpc_slice_buffer_remove_first(tcp->outgoing_buffer); } return false; } else if (errno == EPIPE) { diff --git a/src/core/lib/security/transport/security_handshaker.cc b/src/core/lib/security/transport/security_handshaker.cc index 3605bbe5974..fdc64727b96 100644 --- a/src/core/lib/security/transport/security_handshaker.cc +++ b/src/core/lib/security/transport/security_handshaker.cc @@ -144,11 +144,11 @@ size_t SecurityHandshaker::MoveReadBufferIntoHandshakeBuffer() { } size_t offset = 0; while (args_->read_buffer->count > 0) { - grpc_slice next_slice = grpc_slice_buffer_take_first(args_->read_buffer); - memcpy(handshake_buffer_ + offset, GRPC_SLICE_START_PTR(next_slice), - GRPC_SLICE_LENGTH(next_slice)); - offset += GRPC_SLICE_LENGTH(next_slice); - grpc_slice_unref_internal(next_slice); + grpc_slice* next_slice = grpc_slice_buffer_peek_first(args_->read_buffer); + memcpy(handshake_buffer_ + offset, GRPC_SLICE_START_PTR(*next_slice), + GRPC_SLICE_LENGTH(*next_slice)); + offset += GRPC_SLICE_LENGTH(*next_slice); + grpc_slice_buffer_remove_first(args_->read_buffer); } return bytes_in_read_buffer; } diff --git a/src/core/lib/slice/slice_buffer.cc b/src/core/lib/slice/slice_buffer.cc index 111d3c9578e..e1929250d4a 100644 --- a/src/core/lib/slice/slice_buffer.cc +++ b/src/core/lib/slice/slice_buffer.cc @@ -375,6 +375,24 @@ grpc_slice grpc_slice_buffer_take_first(grpc_slice_buffer* sb) { return slice; } +void grpc_slice_buffer_remove_first(grpc_slice_buffer* sb) { + GPR_DEBUG_ASSERT(sb->count > 0); + sb->length -= GRPC_SLICE_LENGTH(sb->slices[0]); + grpc_slice_unref_internal(sb->slices[0]); + sb->slices++; + if (--sb->count == 0) { + sb->slices = sb->base_slices; + } +} + +void grpc_slice_buffer_sub_first(grpc_slice_buffer* sb, size_t begin, + size_t end) { + // TODO(soheil): Introduce a ptr version for sub. + sb->length -= GRPC_SLICE_LENGTH(sb->slices[0]); + sb->slices[0] = grpc_slice_sub_no_ref(sb->slices[0], begin, end); + sb->length += end - begin; +} + void grpc_slice_buffer_undo_take_first(grpc_slice_buffer* sb, grpc_slice slice) { sb->slices--; diff --git a/src/core/lib/slice/slice_internal.h b/src/core/lib/slice/slice_internal.h index a9f6087e11f..e4aba015315 100644 --- a/src/core/lib/slice/slice_internal.h +++ b/src/core/lib/slice/slice_internal.h @@ -21,6 +21,8 @@ #include +#include + #include #include #include @@ -240,6 +242,20 @@ void grpc_slice_buffer_partial_unref_internal(grpc_slice_buffer* sb, size_t idx); void grpc_slice_buffer_destroy_internal(grpc_slice_buffer* sb); +// Returns a pointer to the first slice in the slice buffer without giving +// ownership to or a reference count on that slice. +inline grpc_slice* grpc_slice_buffer_peek_first(grpc_slice_buffer* sb) { + GPR_DEBUG_ASSERT(sb->count > 0); + return &sb->slices[0]; +} + +// Removes the first slice from the slice buffer. +void grpc_slice_buffer_remove_first(grpc_slice_buffer* sb); + +// Calls grpc_slice_sub with the given parameters on the first slice. +void grpc_slice_buffer_sub_first(grpc_slice_buffer* sb, size_t begin, + size_t end); + /* Check if a slice is interned */ bool grpc_slice_is_interned(const grpc_slice& slice); diff --git a/test/core/slice/slice_buffer_test.cc b/test/core/slice/slice_buffer_test.cc index b53e3312df1..7d4acfed003 100644 --- a/test/core/slice/slice_buffer_test.cc +++ b/test/core/slice/slice_buffer_test.cc @@ -19,6 +19,7 @@ #include #include #include +#include "src/core/lib/slice/slice_internal.h" #include "test/core/util/test_config.h" void test_slice_buffer_add() { @@ -105,12 +106,54 @@ void test_slice_buffer_move_first() { GPR_ASSERT(dst.length == dst_len); } +void test_slice_buffer_first() { + grpc_slice slices[3]; + slices[0] = grpc_slice_from_copied_string("aaa"); + slices[1] = grpc_slice_from_copied_string("bbbb"); + slices[2] = grpc_slice_from_copied_string("ccccc"); + + grpc_slice_buffer buf; + grpc_slice_buffer_init(&buf); + for (int idx = 0; idx < 3; ++idx) { + grpc_slice_ref(slices[idx]); + grpc_slice_buffer_add_indexed(&buf, slices[idx]); + } + + grpc_slice* first = grpc_slice_buffer_peek_first(&buf); + GPR_ASSERT(GPR_SLICE_LENGTH(*first) == GPR_SLICE_LENGTH(slices[0])); + GPR_ASSERT(buf.count == 3); + GPR_ASSERT(buf.length == 12); + + grpc_slice_buffer_sub_first(&buf, 1, 2); + first = grpc_slice_buffer_peek_first(&buf); + GPR_ASSERT(GPR_SLICE_LENGTH(*first) == 1); + GPR_ASSERT(buf.count == 3); + GPR_ASSERT(buf.length == 10); + + grpc_slice_buffer_remove_first(&buf); + first = grpc_slice_buffer_peek_first(&buf); + GPR_ASSERT(GPR_SLICE_LENGTH(*first) == GPR_SLICE_LENGTH(slices[1])); + GPR_ASSERT(buf.count == 2); + GPR_ASSERT(buf.length == 9); + + grpc_slice_buffer_remove_first(&buf); + first = grpc_slice_buffer_peek_first(&buf); + GPR_ASSERT(GPR_SLICE_LENGTH(*first) == GPR_SLICE_LENGTH(slices[2])); + GPR_ASSERT(buf.count == 1); + GPR_ASSERT(buf.length == 5); + + grpc_slice_buffer_remove_first(&buf); + GPR_ASSERT(buf.count == 0); + GPR_ASSERT(buf.length == 0); +} + int main(int argc, char** argv) { grpc::testing::TestEnvironment env(argc, argv); grpc_init(); test_slice_buffer_add(); test_slice_buffer_move_first(); + test_slice_buffer_first(); grpc_shutdown(); return 0;