Move deframe_unprocessed_incoming_frames to frame_data.c

pull/9626/head
Muxi Yan 8 years ago
parent c702a7a85f
commit e899e32710
  1. 181
      src/core/ext/transport/chttp2/transport/chttp2_transport.c
  2. 178
      src/core/ext/transport/chttp2/transport/frame_data.c
  3. 7
      src/core/ext/transport/chttp2/transport/frame_data.h

@ -44,6 +44,7 @@
#include <grpc/support/string_util.h>
#include <grpc/support/useful.h>
#include "src/core/ext/transport/chttp2/transport/frame_data.h"
#include "src/core/ext/transport/chttp2/transport/internal.h"
#include "src/core/ext/transport/chttp2/transport/varint.h"
#include "src/core/lib/channel/channel_args.h"
@ -179,10 +180,6 @@ static void finish_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg,
static void keepalive_watchdog_fired_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error);
static grpc_error *deframe_unprocessed_incoming_frames(
grpc_exec_ctx *exec_ctx, grpc_chttp2_data_parser *p, grpc_chttp2_stream *s,
grpc_slice_buffer *slices, grpc_slice *slice_out,
grpc_byte_stream **stream_out);
static void reset_byte_stream(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error);
@ -2441,182 +2438,6 @@ static void reset_byte_stream(grpc_exec_ctx *exec_ctx, void *arg,
}
}
static grpc_error *deframe_unprocessed_incoming_frames(
grpc_exec_ctx *exec_ctx, grpc_chttp2_data_parser *p, grpc_chttp2_stream *s,
grpc_slice_buffer *slices, grpc_slice *slice_out,
grpc_byte_stream **stream_out) {
grpc_error *error = GRPC_ERROR_NONE;
grpc_chttp2_transport *t = s->t;
while (slices->count > 0) {
uint8_t *beg = NULL;
uint8_t *end = NULL;
uint8_t *cur = NULL;
grpc_slice slice = grpc_slice_buffer_take_first(slices);
beg = GRPC_SLICE_START_PTR(slice);
end = GRPC_SLICE_END_PTR(slice);
cur = beg;
uint32_t message_flags;
char *msg;
if (cur == end) {
grpc_slice_unref_internal(exec_ctx, slice);
continue;
}
switch (p->state) {
case GRPC_CHTTP2_DATA_ERROR:
p->state = GRPC_CHTTP2_DATA_ERROR;
grpc_slice_unref_internal(exec_ctx, slice);
return GRPC_ERROR_REF(p->error);
case GRPC_CHTTP2_DATA_FH_0:
p->frame_type = *cur;
switch (p->frame_type) {
case 0:
p->is_frame_compressed = 0; /* GPR_FALSE */
break;
case 1:
p->is_frame_compressed = 1; /* GPR_TRUE */
break;
default:
gpr_asprintf(&msg, "Bad GRPC frame type 0x%02x", p->frame_type);
p->error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg);
p->error = grpc_error_set_int(p->error, GRPC_ERROR_INT_STREAM_ID,
(intptr_t)s->id);
gpr_free(msg);
msg = grpc_dump_slice(slice, GPR_DUMP_HEX | GPR_DUMP_ASCII);
p->error = grpc_error_set_str(p->error, GRPC_ERROR_STR_RAW_BYTES,
grpc_slice_from_copied_string(msg));
gpr_free(msg);
p->error =
grpc_error_set_int(p->error, GRPC_ERROR_INT_OFFSET, cur - beg);
p->state = GRPC_CHTTP2_DATA_ERROR;
grpc_slice_unref_internal(exec_ctx, slice);
return GRPC_ERROR_REF(p->error);
}
if (++cur == end) {
p->state = GRPC_CHTTP2_DATA_FH_1;
grpc_slice_unref_internal(exec_ctx, slice);
continue;
}
/* fallthrough */
case GRPC_CHTTP2_DATA_FH_1:
p->frame_size = ((uint32_t)*cur) << 24;
if (++cur == end) {
p->state = GRPC_CHTTP2_DATA_FH_2;
grpc_slice_unref_internal(exec_ctx, slice);
continue;
}
/* fallthrough */
case GRPC_CHTTP2_DATA_FH_2:
p->frame_size |= ((uint32_t)*cur) << 16;
if (++cur == end) {
p->state = GRPC_CHTTP2_DATA_FH_3;
grpc_slice_unref_internal(exec_ctx, slice);
continue;
}
/* fallthrough */
case GRPC_CHTTP2_DATA_FH_3:
p->frame_size |= ((uint32_t)*cur) << 8;
if (++cur == end) {
p->state = GRPC_CHTTP2_DATA_FH_4;
grpc_slice_unref_internal(exec_ctx, slice);
continue;
}
/* fallthrough */
case GRPC_CHTTP2_DATA_FH_4:
GPR_ASSERT(stream_out != NULL);
GPR_ASSERT(p->parsing_frame == NULL);
p->frame_size |= ((uint32_t)*cur);
p->state = GRPC_CHTTP2_DATA_FRAME;
++cur;
message_flags = 0;
if (p->is_frame_compressed) {
message_flags |= GRPC_WRITE_INTERNAL_COMPRESS;
}
p->parsing_frame = grpc_chttp2_incoming_byte_stream_create(
exec_ctx, t, s, p->frame_size, message_flags);
*stream_out = &p->parsing_frame->base;
if (p->parsing_frame->remaining_bytes == 0) {
grpc_chttp2_incoming_byte_stream_finished(exec_ctx, p->parsing_frame,
GRPC_ERROR_NONE, 1);
p->parsing_frame = NULL;
p->state = GRPC_CHTTP2_DATA_FH_0;
}
s->pending_byte_stream = true;
if (cur != end) {
grpc_slice_buffer_undo_take_first(
&s->unprocessed_incoming_frames_buffer,
grpc_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg)));
}
grpc_slice_unref_internal(exec_ctx, slice);
return GRPC_ERROR_NONE;
case GRPC_CHTTP2_DATA_FRAME: {
GPR_ASSERT(p->parsing_frame != NULL);
GPR_ASSERT(slice_out != NULL);
if (cur == end) {
grpc_slice_unref_internal(exec_ctx, slice);
continue;
}
uint32_t remaining = (uint32_t)(end - cur);
if (remaining == p->frame_size) {
if (GRPC_ERROR_NONE != (error = grpc_chttp2_incoming_byte_stream_push(
exec_ctx, p->parsing_frame,
grpc_slice_sub(slice, (size_t)(cur - beg),
(size_t)(end - beg)),
slice_out))) {
grpc_slice_unref_internal(exec_ctx, slice);
return error;
}
grpc_chttp2_incoming_byte_stream_finished(exec_ctx, p->parsing_frame,
GRPC_ERROR_NONE, 1);
p->parsing_frame = NULL;
p->state = GRPC_CHTTP2_DATA_FH_0;
grpc_slice_unref_internal(exec_ctx, slice);
return GRPC_ERROR_NONE;
} else if (remaining < p->frame_size) {
if (GRPC_ERROR_NONE != (error = grpc_chttp2_incoming_byte_stream_push(
exec_ctx, p->parsing_frame,
grpc_slice_sub(slice, (size_t)(cur - beg),
(size_t)(end - beg)),
slice_out))) {
return error;
}
p->frame_size -= remaining;
grpc_slice_unref_internal(exec_ctx, slice);
return GRPC_ERROR_NONE;
} else {
GPR_ASSERT(remaining > p->frame_size);
if (GRPC_ERROR_NONE !=
(grpc_chttp2_incoming_byte_stream_push(
exec_ctx, p->parsing_frame,
grpc_slice_sub(slice, (size_t)(cur - beg),
(size_t)(cur + p->frame_size - beg)),
slice_out))) {
grpc_slice_unref_internal(exec_ctx, slice);
return error;
}
grpc_chttp2_incoming_byte_stream_finished(exec_ctx, p->parsing_frame,
GRPC_ERROR_NONE, 1);
p->parsing_frame = NULL;
p->state = GRPC_CHTTP2_DATA_FH_0;
cur += p->frame_size;
grpc_slice_buffer_undo_take_first(
&s->unprocessed_incoming_frames_buffer,
grpc_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg)));
grpc_slice_unref_internal(exec_ctx, slice);
return GRPC_ERROR_NONE;
}
}
}
}
return GRPC_ERROR_NONE;
}
static void incoming_byte_stream_unref(grpc_exec_ctx *exec_ctx,
grpc_chttp2_incoming_byte_stream *bs) {
if (gpr_unref(&bs->refs)) {

@ -112,6 +112,184 @@ void grpc_chttp2_encode_data(uint32_t id, grpc_slice_buffer *inbuf,
stats->data_bytes += write_bytes;
}
grpc_error *deframe_unprocessed_incoming_frames(grpc_exec_ctx *exec_ctx,
grpc_chttp2_data_parser *p,
grpc_chttp2_stream *s,
grpc_slice_buffer *slices,
grpc_slice *slice_out,
grpc_byte_stream **stream_out) {
grpc_error *error = GRPC_ERROR_NONE;
grpc_chttp2_transport *t = s->t;
while (slices->count > 0) {
uint8_t *beg = NULL;
uint8_t *end = NULL;
uint8_t *cur = NULL;
grpc_slice slice = grpc_slice_buffer_take_first(slices);
beg = GRPC_SLICE_START_PTR(slice);
end = GRPC_SLICE_END_PTR(slice);
cur = beg;
uint32_t message_flags;
char *msg;
if (cur == end) {
grpc_slice_unref_internal(exec_ctx, slice);
continue;
}
switch (p->state) {
case GRPC_CHTTP2_DATA_ERROR:
p->state = GRPC_CHTTP2_DATA_ERROR;
grpc_slice_unref_internal(exec_ctx, slice);
return GRPC_ERROR_REF(p->error);
case GRPC_CHTTP2_DATA_FH_0:
p->frame_type = *cur;
switch (p->frame_type) {
case 0:
p->is_frame_compressed = 0; /* GPR_FALSE */
break;
case 1:
p->is_frame_compressed = 1; /* GPR_TRUE */
break;
default:
gpr_asprintf(&msg, "Bad GRPC frame type 0x%02x", p->frame_type);
p->error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg);
p->error = grpc_error_set_int(p->error, GRPC_ERROR_INT_STREAM_ID,
(intptr_t)s->id);
gpr_free(msg);
msg = grpc_dump_slice(slice, GPR_DUMP_HEX | GPR_DUMP_ASCII);
p->error = grpc_error_set_str(p->error, GRPC_ERROR_STR_RAW_BYTES,
grpc_slice_from_copied_string(msg));
gpr_free(msg);
p->error =
grpc_error_set_int(p->error, GRPC_ERROR_INT_OFFSET, cur - beg);
p->state = GRPC_CHTTP2_DATA_ERROR;
grpc_slice_unref_internal(exec_ctx, slice);
return GRPC_ERROR_REF(p->error);
}
if (++cur == end) {
p->state = GRPC_CHTTP2_DATA_FH_1;
grpc_slice_unref_internal(exec_ctx, slice);
continue;
}
/* fallthrough */
case GRPC_CHTTP2_DATA_FH_1:
p->frame_size = ((uint32_t)*cur) << 24;
if (++cur == end) {
p->state = GRPC_CHTTP2_DATA_FH_2;
grpc_slice_unref_internal(exec_ctx, slice);
continue;
}
/* fallthrough */
case GRPC_CHTTP2_DATA_FH_2:
p->frame_size |= ((uint32_t)*cur) << 16;
if (++cur == end) {
p->state = GRPC_CHTTP2_DATA_FH_3;
grpc_slice_unref_internal(exec_ctx, slice);
continue;
}
/* fallthrough */
case GRPC_CHTTP2_DATA_FH_3:
p->frame_size |= ((uint32_t)*cur) << 8;
if (++cur == end) {
p->state = GRPC_CHTTP2_DATA_FH_4;
grpc_slice_unref_internal(exec_ctx, slice);
continue;
}
/* fallthrough */
case GRPC_CHTTP2_DATA_FH_4:
GPR_ASSERT(stream_out != NULL);
GPR_ASSERT(p->parsing_frame == NULL);
p->frame_size |= ((uint32_t)*cur);
p->state = GRPC_CHTTP2_DATA_FRAME;
++cur;
message_flags = 0;
if (p->is_frame_compressed) {
message_flags |= GRPC_WRITE_INTERNAL_COMPRESS;
}
p->parsing_frame = grpc_chttp2_incoming_byte_stream_create(
exec_ctx, t, s, p->frame_size, message_flags);
*stream_out = &p->parsing_frame->base;
if (p->parsing_frame->remaining_bytes == 0) {
grpc_chttp2_incoming_byte_stream_finished(exec_ctx, p->parsing_frame,
GRPC_ERROR_NONE, 1);
p->parsing_frame = NULL;
p->state = GRPC_CHTTP2_DATA_FH_0;
}
s->pending_byte_stream = true;
if (cur != end) {
grpc_slice_buffer_undo_take_first(
&s->unprocessed_incoming_frames_buffer,
grpc_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg)));
}
grpc_slice_unref_internal(exec_ctx, slice);
return GRPC_ERROR_NONE;
case GRPC_CHTTP2_DATA_FRAME: {
GPR_ASSERT(p->parsing_frame != NULL);
GPR_ASSERT(slice_out != NULL);
if (cur == end) {
grpc_slice_unref_internal(exec_ctx, slice);
continue;
}
uint32_t remaining = (uint32_t)(end - cur);
if (remaining == p->frame_size) {
if (GRPC_ERROR_NONE != (error = grpc_chttp2_incoming_byte_stream_push(
exec_ctx, p->parsing_frame,
grpc_slice_sub(slice, (size_t)(cur - beg),
(size_t)(end - beg)),
slice_out))) {
grpc_slice_unref_internal(exec_ctx, slice);
return error;
}
grpc_chttp2_incoming_byte_stream_finished(exec_ctx, p->parsing_frame,
GRPC_ERROR_NONE, 1);
p->parsing_frame = NULL;
p->state = GRPC_CHTTP2_DATA_FH_0;
grpc_slice_unref_internal(exec_ctx, slice);
return GRPC_ERROR_NONE;
} else if (remaining < p->frame_size) {
if (GRPC_ERROR_NONE != (error = grpc_chttp2_incoming_byte_stream_push(
exec_ctx, p->parsing_frame,
grpc_slice_sub(slice, (size_t)(cur - beg),
(size_t)(end - beg)),
slice_out))) {
return error;
}
p->frame_size -= remaining;
grpc_slice_unref_internal(exec_ctx, slice);
return GRPC_ERROR_NONE;
} else {
GPR_ASSERT(remaining > p->frame_size);
if (GRPC_ERROR_NONE !=
(grpc_chttp2_incoming_byte_stream_push(
exec_ctx, p->parsing_frame,
grpc_slice_sub(slice, (size_t)(cur - beg),
(size_t)(cur + p->frame_size - beg)),
slice_out))) {
grpc_slice_unref_internal(exec_ctx, slice);
return error;
}
grpc_chttp2_incoming_byte_stream_finished(exec_ctx, p->parsing_frame,
GRPC_ERROR_NONE, 1);
p->parsing_frame = NULL;
p->state = GRPC_CHTTP2_DATA_FH_0;
cur += p->frame_size;
grpc_slice_buffer_undo_take_first(
&s->unprocessed_incoming_frames_buffer,
grpc_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg)));
grpc_slice_unref_internal(exec_ctx, slice);
return GRPC_ERROR_NONE;
}
}
}
}
return GRPC_ERROR_NONE;
}
grpc_error *grpc_chttp2_data_parser_parse(grpc_exec_ctx *exec_ctx, void *parser,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s,

@ -101,4 +101,11 @@ void grpc_chttp2_encode_data(uint32_t id, grpc_slice_buffer *inbuf,
grpc_transport_one_way_stats *stats,
grpc_slice_buffer *outbuf);
grpc_error *deframe_unprocessed_incoming_frames(grpc_exec_ctx *exec_ctx,
grpc_chttp2_data_parser *p,
grpc_chttp2_stream *s,
grpc_slice_buffer *slices,
grpc_slice *slice_out,
grpc_byte_stream **stream_out);
#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FRAME_DATA_H */

Loading…
Cancel
Save