Progress on splitting things up

pull/2149/head
Craig Tiller 10 years ago
parent 4aa71a1774
commit 5dc3b30964
  1. 6
      BUILD
  2. 2
      Makefile
  3. 2
      build.json
  4. 3
      src/core/transport/chttp2/hpack_parser.c
  5. 144
      src/core/transport/chttp2/incoming_metadata.c
  6. 61
      src/core/transport/chttp2/incoming_metadata.h
  7. 55
      src/core/transport/chttp2/internal.h
  8. 127
      src/core/transport/chttp2/parsing.c
  9. 101
      src/core/transport/chttp2/stream_lists.c
  10. 4
      src/core/transport/chttp2/writing.c
  11. 321
      src/core/transport/chttp2_transport.c
  12. 2
      tools/doxygen/Doxyfile.core.internal
  13. 4
      vsprojects/grpc/grpc.vcxproj
  14. 9
      vsprojects/grpc/grpc.vcxproj.filters
  15. 4
      vsprojects/grpc_unsecure/grpc_unsecure.vcxproj
  16. 9
      vsprojects/grpc_unsecure/grpc_unsecure.vcxproj.filters

@ -203,6 +203,7 @@ cc_library(
"src/core/surface/surface_trace.h",
"src/core/transport/chttp2/alpn.h",
"src/core/transport/chttp2/bin_encoder.h",
"src/core/transport/chttp2/frame.h",
"src/core/transport/chttp2/frame_data.h",
"src/core/transport/chttp2/frame_goaway.h",
"src/core/transport/chttp2/frame_ping.h",
@ -213,6 +214,7 @@ cc_library(
"src/core/transport/chttp2/hpack_table.h",
"src/core/transport/chttp2/http2_errors.h",
"src/core/transport/chttp2/huffsyms.h",
"src/core/transport/chttp2/incoming_metadata.h",
"src/core/transport/chttp2/internal.h",
"src/core/transport/chttp2/status_conversion.h",
"src/core/transport/chttp2/stream_encoder.h",
@ -331,6 +333,7 @@ cc_library(
"src/core/transport/chttp2/hpack_parser.c",
"src/core/transport/chttp2/hpack_table.c",
"src/core/transport/chttp2/huffsyms.c",
"src/core/transport/chttp2/incoming_metadata.c",
"src/core/transport/chttp2/parsing.c",
"src/core/transport/chttp2/status_conversion.c",
"src/core/transport/chttp2/stream_encoder.c",
@ -428,6 +431,7 @@ cc_library(
"src/core/surface/surface_trace.h",
"src/core/transport/chttp2/alpn.h",
"src/core/transport/chttp2/bin_encoder.h",
"src/core/transport/chttp2/frame.h",
"src/core/transport/chttp2/frame_data.h",
"src/core/transport/chttp2/frame_goaway.h",
"src/core/transport/chttp2/frame_ping.h",
@ -438,6 +442,7 @@ cc_library(
"src/core/transport/chttp2/hpack_table.h",
"src/core/transport/chttp2/http2_errors.h",
"src/core/transport/chttp2/huffsyms.h",
"src/core/transport/chttp2/incoming_metadata.h",
"src/core/transport/chttp2/internal.h",
"src/core/transport/chttp2/status_conversion.h",
"src/core/transport/chttp2/stream_encoder.h",
@ -534,6 +539,7 @@ cc_library(
"src/core/transport/chttp2/hpack_parser.c",
"src/core/transport/chttp2/hpack_table.c",
"src/core/transport/chttp2/huffsyms.c",
"src/core/transport/chttp2/incoming_metadata.c",
"src/core/transport/chttp2/parsing.c",
"src/core/transport/chttp2/status_conversion.c",
"src/core/transport/chttp2/stream_encoder.c",

@ -3029,6 +3029,7 @@ LIBGRPC_SRC = \
src/core/transport/chttp2/hpack_parser.c \
src/core/transport/chttp2/hpack_table.c \
src/core/transport/chttp2/huffsyms.c \
src/core/transport/chttp2/incoming_metadata.c \
src/core/transport/chttp2/parsing.c \
src/core/transport/chttp2/status_conversion.c \
src/core/transport/chttp2/stream_encoder.c \
@ -3277,6 +3278,7 @@ LIBGRPC_UNSECURE_SRC = \
src/core/transport/chttp2/hpack_parser.c \
src/core/transport/chttp2/hpack_table.c \
src/core/transport/chttp2/huffsyms.c \
src/core/transport/chttp2/incoming_metadata.c \
src/core/transport/chttp2/parsing.c \
src/core/transport/chttp2/status_conversion.c \
src/core/transport/chttp2/stream_encoder.c \

@ -176,6 +176,7 @@
"src/core/transport/chttp2/hpack_table.h",
"src/core/transport/chttp2/http2_errors.h",
"src/core/transport/chttp2/huffsyms.h",
"src/core/transport/chttp2/incoming_metadata.h",
"src/core/transport/chttp2/internal.h",
"src/core/transport/chttp2/status_conversion.h",
"src/core/transport/chttp2/stream_encoder.h",
@ -272,6 +273,7 @@
"src/core/transport/chttp2/hpack_parser.c",
"src/core/transport/chttp2/hpack_table.c",
"src/core/transport/chttp2/huffsyms.c",
"src/core/transport/chttp2/incoming_metadata.c",
"src/core/transport/chttp2/parsing.c",
"src/core/transport/chttp2/status_conversion.c",
"src/core/transport/chttp2/stream_encoder.c",

@ -1392,7 +1392,8 @@ grpc_chttp2_parse_error grpc_chttp2_header_parser_parse(
return GRPC_CHTTP2_CONNECTION_ERROR;
}
if (parser->is_boundary) {
grpc_chttp2_parsing_add_metadata_batch(transport_parsing, stream_parsing);
grpc_chttp2_incoming_metadata_buffer_place_metadata_batch_into(
&stream_parsing->incoming_metadata, &stream_parsing->data_parser.incoming_sopb);
}
if (parser->is_eof) {
stream_parsing->received_close = 1;

@ -0,0 +1,144 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include "src/core/transport/chttp2/incoming_metadata.h"
#include <string.h>
#include "src/core/transport/chttp2/internal.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
void grpc_chttp2_incoming_metadata_buffer_init(grpc_chttp2_incoming_metadata_buffer *buffer) {
buffer->deadline = gpr_inf_future;
}
void grpc_chttp2_incoming_metadata_buffer_add(grpc_chttp2_incoming_metadata_buffer *buffer,
grpc_mdelem *elem) {
if (buffer->capacity == buffer->count) {
buffer->capacity =
GPR_MAX(8, 2 * buffer->capacity);
buffer->elems =
gpr_realloc(buffer->elems,
sizeof(*buffer->elems) *
buffer->capacity);
}
buffer->elems[buffer->count++]
.md = elem;
}
void grpc_chttp2_incoming_metadata_buffer_set_deadline(grpc_chttp2_incoming_metadata_buffer *buffer, gpr_timespec deadline) {
buffer->deadline = deadline;
}
#if 0
void grpc_chttp2_parsing_add_metadata_batch(
grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_parsing *stream_parsing) {
grpc_metadata_batch b;
b.list.head = NULL;
/* Store away the last element of the list, so that in patch_metadata_ops
we can reconstitute the list.
We can't do list building here as later incoming metadata may reallocate
the underlying array. */
b.list.tail = (void *)(gpr_intptr)stream_parsing->incoming_metadata_count;
b.garbage.head = b.garbage.tail = NULL;
b.deadline = stream_parsing->incoming_deadline;
stream_parsing->incoming_deadline = gpr_inf_future;
grpc_sopb_add_metadata(&stream_parsing->data_parser.incoming_sopb, b);
}
#endif
#if 0
static void patch_metadata_ops(grpc_chttp2_stream_global *stream_global,
grpc_chttp2_stream_parsing *stream_parsing) {
grpc_stream_op *ops = stream_global->incoming_sopb->ops;
size_t nops = stream_global->incoming_sopb->nops;
size_t i;
size_t j;
size_t mdidx = 0;
size_t last_mdidx;
int found_metadata = 0;
/* rework the array of metadata into a linked list, making use
of the breadcrumbs we left in metadata batches during
add_metadata_batch */
for (i = 0; i < nops; i++) {
grpc_stream_op *op = &ops[i];
if (op->type != GRPC_OP_METADATA) continue;
found_metadata = 1;
/* we left a breadcrumb indicating where the end of this list is,
and since we add sequentially, we know from the end of the last
segment where this segment begins */
last_mdidx = (size_t)(gpr_intptr)(op->data.metadata.list.tail);
GPR_ASSERT(last_mdidx > mdidx);
GPR_ASSERT(last_mdidx <= stream_parsing->incoming_metadata_count);
/* turn the array into a doubly linked list */
op->data.metadata.list.head = &stream_parsing->incoming_metadata[mdidx];
op->data.metadata.list.tail =
&stream_parsing->incoming_metadata[last_mdidx - 1];
for (j = mdidx + 1; j < last_mdidx; j++) {
stream_parsing->incoming_metadata[j].prev =
&stream_parsing->incoming_metadata[j - 1];
stream_parsing->incoming_metadata[j - 1].next =
&stream_parsing->incoming_metadata[j];
}
stream_parsing->incoming_metadata[mdidx].prev = NULL;
stream_parsing->incoming_metadata[last_mdidx - 1].next = NULL;
/* track where we're up to */
mdidx = last_mdidx;
}
if (found_metadata) {
stream_parsing->old_incoming_metadata = stream_parsing->incoming_metadata;
if (mdidx != stream_parsing->incoming_metadata_count) {
/* we have a partially read metadata batch still in incoming_metadata */
size_t new_count = stream_parsing->incoming_metadata_count - mdidx;
size_t copy_bytes =
sizeof(*stream_parsing->incoming_metadata) * new_count;
GPR_ASSERT(mdidx < stream_parsing->incoming_metadata_count);
stream_parsing->incoming_metadata = gpr_malloc(copy_bytes);
memcpy(stream_parsing->old_incoming_metadata + mdidx,
stream_parsing->incoming_metadata, copy_bytes);
stream_parsing->incoming_metadata_count =
stream_parsing->incoming_metadata_capacity = new_count;
} else {
stream_parsing->incoming_metadata = NULL;
stream_parsing->incoming_metadata_count = 0;
stream_parsing->incoming_metadata_capacity = 0;
}
}
}
#endif

@ -0,0 +1,61 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef GRPC_INTERNAL_CORE_CHTTP2_INCOMING_METADATA_H
#define GRPC_INTERNAL_CORE_CHTTP2_INCOMING_METADATA_H
#include "src/core/transport/transport.h"
typedef struct {
grpc_linked_mdelem *elems;
size_t count;
size_t capacity;
gpr_timespec deadline;
} grpc_chttp2_incoming_metadata_buffer;
/** assumes everything initially zeroed */
void grpc_chttp2_incoming_metadata_buffer_init(grpc_chttp2_incoming_metadata_buffer *buffer);
void grpc_chttp2_incoming_metadata_buffer_destroy(grpc_chttp2_incoming_metadata_buffer *buffer);
void grpc_chttp2_incoming_metadata_buffer_reset(grpc_chttp2_incoming_metadata_buffer *buffer);
void grpc_chttp2_incoming_metadata_buffer_add(grpc_chttp2_incoming_metadata_buffer *buffer, grpc_mdelem *elem);
void grpc_chttp2_incoming_metadata_buffer_set_deadline(grpc_chttp2_incoming_metadata_buffer *buffer, gpr_timespec deadline);
/** extend sopb with a metadata batch; this must be post-processed by
grpc_chttp2_incoming_metadata_buffer_postprocess_sopb before being handed
out of the transport */
void grpc_chttp2_incoming_metadata_buffer_place_metadata_batch_into(grpc_chttp2_incoming_metadata_buffer *buffer, grpc_stream_op_buffer *sopb);
void grpc_chttp2_incoming_metadata_buffer_postprocess_sopb(grpc_chttp2_incoming_metadata_buffer *buffer, grpc_stream_op_buffer *sopb);
#endif /* GRPC_INTERNAL_CORE_CHTTP2_INCOMING_METADATA_H */

@ -43,9 +43,10 @@
#include "src/core/transport/chttp2/frame_rst_stream.h"
#include "src/core/transport/chttp2/frame_settings.h"
#include "src/core/transport/chttp2/frame_window_update.h"
#include "src/core/transport/chttp2/stream_map.h"
#include "src/core/transport/chttp2/hpack_parser.h"
#include "src/core/transport/chttp2/incoming_metadata.h"
#include "src/core/transport/chttp2/stream_encoder.h"
#include "src/core/transport/chttp2/stream_map.h"
typedef struct grpc_chttp2_transport grpc_chttp2_transport;
typedef struct grpc_chttp2_stream grpc_chttp2_stream;
@ -53,6 +54,12 @@ typedef struct grpc_chttp2_stream grpc_chttp2_stream;
/* streams are kept in various linked lists depending on what things need to
happen to them... this enum labels each list */
typedef enum {
GRPC_CHTTP2_LIST_ALL_STREAMS,
GRPC_CHTTP2_LIST_WRITABLE,
/** streams that are waiting to start because there are too many concurrent
streams on the connection */
GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY,
#if 0
/* streams that have pending writes */
WRITABLE = 0,
/* streams that have been selected to be written */
@ -74,6 +81,7 @@ typedef enum {
PARSER_CHECK_WINDOW_UPDATES_AFTER_PARSE,
OTHER_CHECK_WINDOW_UPDATES_AFTER_PARSE,
NEW_OUTGOING_WINDOW,
#endif
STREAM_LIST_COUNT /* must be last */
} grpc_chttp2_stream_list_id;
@ -404,6 +412,10 @@ typedef struct {
grpc_chttp2_write_state write_state;
/** is this stream closed (boolean) */
gpr_uint8 read_closed;
/** has this stream been cancelled? (boolean) */
gpr_uint8 cancelled;
/** is this stream in the stream map? (boolean) */
gpr_uint8 in_stream_map;
/** stream state already published to the upper layer */
grpc_stream_state published_state;
@ -411,6 +423,9 @@ typedef struct {
grpc_stream_state *publish_state;
/** pointer to sop buffer to fill in with new stream ops */
grpc_stream_op_buffer *incoming_sopb;
/** incoming metadata */
grpc_chttp2_incoming_metadata_buffer incoming_metadata;
} grpc_chttp2_stream_global;
typedef struct {
@ -427,8 +442,6 @@ struct grpc_chttp2_stream_parsing {
gpr_uint32 id;
/** has this stream received a close */
gpr_uint8 received_close;
/** saw an error on this stream during parsing (it should be cancelled) */
gpr_uint8 saw_error;
/** saw a rst_stream */
gpr_uint8 saw_rst_stream;
/** incoming_window has been reduced by this much during parsing */
@ -442,12 +455,16 @@ struct grpc_chttp2_stream_parsing {
/* amount of window given */
gpr_uint64 outgoing_window_update;
/* incoming metadata */
/** incoming metadata */
grpc_chttp2_incoming_metadata_buffer incoming_metadata;
/*
grpc_linked_mdelem *incoming_metadata;
size_t incoming_metadata_count;
size_t incoming_metadata_capacity;
grpc_linked_mdelem *old_incoming_metadata;
gpr_timespec incoming_deadline;
*/
};
struct grpc_chttp2_stream {
@ -542,28 +559,44 @@ int grpc_chttp2_list_pop_parsing_seen_stream(
grpc_chttp2_stream_global **stream_global,
grpc_chttp2_stream_parsing **stream_parsing);
void grpc_chttp2_list_add_waiting_for_concurrency(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global);
int grpc_chttp2_list_pop_waiting_for_concurrency(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global **stream_global);
void grpc_chttp2_list_add_cancelled_waiting_for_parsing(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global);
int grpc_chttp2_list_pop_cancelled_waiting_for_parsing(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global **stream_global);
void grpc_chttp2_schedule_closure(
grpc_chttp2_transport_global *transport_global, grpc_iomgr_closure *closure,
int success);
void grpc_chttp2_read_write_state_changed(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global);
void grpc_chttp2_incoming_window_state_changed(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global);
grpc_chttp2_stream_parsing *grpc_chttp2_parsing_lookup_stream(
grpc_chttp2_transport_parsing *transport_parsing, gpr_uint32 id);
grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream(
grpc_chttp2_transport_parsing *transport_parsing, gpr_uint32 id);
void grpc_chttp2_parsing_add_metadata_batch(
grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_parsing *stream_parsing);
void grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport_global *transport_global, gpr_uint32 goaway_error,
gpr_slice goaway_text);
#define GRPC_CHTTP2_FLOW_CTL_TRACE(a, b, c, d, e) \
do { \
} while (0)
void grpc_chttp2_remove_from_stream_map(grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global);
void grpc_chttp2_for_all_streams(grpc_chttp2_transport_global *transport_global, void *user_data, void (*cb)(grpc_chttp2_transport_global *transport_global, void *user_data, grpc_chttp2_stream_global *stream_global));
void grpc_chttp2_flowctl_trace(grpc_chttp2_transport *t, const char *flow,
gpr_int32 window, gpr_uint32 id, gpr_int32 delta);
#define GRPC_CHTTP2_CLIENT_CONNECT_STRING "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
#define GRPC_CHTTP2_CLIENT_CONNECT_STRLEN \

@ -35,6 +35,7 @@
#include <string.h>
#include "src/core/transport/chttp2/http2_errors.h"
#include "src/core/transport/chttp2/timeout_encoding.h"
#include <grpc/support/alloc.h>
@ -155,6 +156,16 @@ void grpc_chttp2_publish_reads(
grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
}
}
/* updating closed status */
if (stream_parsing->received_close) {
stream_global->read_closed = 1;
grpc_chttp2_read_write_state_changed(transport_global, stream_global);
}
if (stream_parsing->saw_rst_stream) {
stream_global->cancelled = 1;
grpc_chttp2_read_write_state_changed(transport_global, stream_global);
}
}
}
@ -437,11 +448,6 @@ static grpc_chttp2_parse_error update_incoming_window(
return GRPC_CHTTP2_CONNECTION_ERROR;
}
GRPC_CHTTP2_FLOW_CTL_TRACE(
t, t, incoming, 0, -(gpr_int64)transport_parsing->incoming_frame_size);
GRPC_CHTTP2_FLOW_CTL_TRACE(
t, s, incoming, s->global.id,
-(gpr_int64)transport_parsing->incoming_frame_size);
transport_parsing->incoming_window -= transport_parsing->incoming_frame_size;
stream_parsing->incoming_window -= transport_parsing->incoming_frame_size;
stream_parsing->incoming_window_delta +=
@ -474,7 +480,12 @@ static int init_data_frame_parser(
return 1;
case GRPC_CHTTP2_STREAM_ERROR:
stream_parsing->received_close = 1;
stream_parsing->saw_error = 1;
stream_parsing->saw_rst_stream = 1;
stream_parsing->rst_stream_reason = GRPC_CHTTP2_PROTOCOL_ERROR;
gpr_slice_buffer_add(&transport_parsing->qbuf,
grpc_chttp2_rst_stream_create(
transport_parsing->incoming_stream_id,
GRPC_CHTTP2_PROTOCOL_ERROR));
return init_skip_frame_parser(transport_parsing, 0);
case GRPC_CHTTP2_CONNECTION_ERROR:
return 0;
@ -486,21 +497,6 @@ static int init_data_frame_parser(
static void free_timeout(void *p) { gpr_free(p); }
static void add_incoming_metadata(grpc_chttp2_stream_parsing *stream_parsing,
grpc_mdelem *elem) {
if (stream_parsing->incoming_metadata_capacity ==
stream_parsing->incoming_metadata_count) {
stream_parsing->incoming_metadata_capacity =
GPR_MAX(8, 2 * stream_parsing->incoming_metadata_capacity);
stream_parsing->incoming_metadata =
gpr_realloc(stream_parsing->incoming_metadata,
sizeof(*stream_parsing->incoming_metadata) *
stream_parsing->incoming_metadata_capacity);
}
stream_parsing->incoming_metadata[stream_parsing->incoming_metadata_count++]
.md = elem;
}
static void on_header(void *tp, grpc_mdelem *md) {
grpc_chttp2_transport_parsing *transport_parsing = tp;
grpc_chttp2_stream_parsing *stream_parsing =
@ -526,11 +522,10 @@ static void on_header(void *tp, grpc_mdelem *md) {
}
grpc_mdelem_set_user_data(md, free_timeout, cached_timeout);
}
stream_parsing->incoming_deadline =
gpr_time_add(gpr_now(), *cached_timeout);
grpc_chttp2_incoming_metadata_buffer_set_deadline(&stream_parsing->incoming_metadata, gpr_time_add(gpr_now(), *cached_timeout));
grpc_mdelem_unref(md);
} else {
add_incoming_metadata(stream_parsing, md);
grpc_chttp2_incoming_metadata_buffer_add(&stream_parsing->incoming_metadata, md);
}
grpc_chttp2_list_add_parsing_seen_stream(transport_parsing, stream_parsing);
@ -694,83 +689,6 @@ static int is_window_update_legal(gpr_int64 window_update, gpr_int64 window) {
}
*/
void grpc_chttp2_parsing_add_metadata_batch(
grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_parsing *stream_parsing) {
grpc_metadata_batch b;
b.list.head = NULL;
/* Store away the last element of the list, so that in patch_metadata_ops
we can reconstitute the list.
We can't do list building here as later incoming metadata may reallocate
the underlying array. */
b.list.tail = (void *)(gpr_intptr)stream_parsing->incoming_metadata_count;
b.garbage.head = b.garbage.tail = NULL;
b.deadline = stream_parsing->incoming_deadline;
stream_parsing->incoming_deadline = gpr_inf_future;
grpc_sopb_add_metadata(&stream_parsing->data_parser.incoming_sopb, b);
}
static void patch_metadata_ops(grpc_chttp2_stream_global *stream_global,
grpc_chttp2_stream_parsing *stream_parsing) {
grpc_stream_op *ops = stream_global->incoming_sopb->ops;
size_t nops = stream_global->incoming_sopb->nops;
size_t i;
size_t j;
size_t mdidx = 0;
size_t last_mdidx;
int found_metadata = 0;
/* rework the array of metadata into a linked list, making use
of the breadcrumbs we left in metadata batches during
add_metadata_batch */
for (i = 0; i < nops; i++) {
grpc_stream_op *op = &ops[i];
if (op->type != GRPC_OP_METADATA) continue;
found_metadata = 1;
/* we left a breadcrumb indicating where the end of this list is,
and since we add sequentially, we know from the end of the last
segment where this segment begins */
last_mdidx = (size_t)(gpr_intptr)(op->data.metadata.list.tail);
GPR_ASSERT(last_mdidx > mdidx);
GPR_ASSERT(last_mdidx <= stream_parsing->incoming_metadata_count);
/* turn the array into a doubly linked list */
op->data.metadata.list.head = &stream_parsing->incoming_metadata[mdidx];
op->data.metadata.list.tail =
&stream_parsing->incoming_metadata[last_mdidx - 1];
for (j = mdidx + 1; j < last_mdidx; j++) {
stream_parsing->incoming_metadata[j].prev =
&stream_parsing->incoming_metadata[j - 1];
stream_parsing->incoming_metadata[j - 1].next =
&stream_parsing->incoming_metadata[j];
}
stream_parsing->incoming_metadata[mdidx].prev = NULL;
stream_parsing->incoming_metadata[last_mdidx - 1].next = NULL;
/* track where we're up to */
mdidx = last_mdidx;
}
if (found_metadata) {
stream_parsing->old_incoming_metadata = stream_parsing->incoming_metadata;
if (mdidx != stream_parsing->incoming_metadata_count) {
/* we have a partially read metadata batch still in incoming_metadata */
size_t new_count = stream_parsing->incoming_metadata_count - mdidx;
size_t copy_bytes =
sizeof(*stream_parsing->incoming_metadata) * new_count;
GPR_ASSERT(mdidx < stream_parsing->incoming_metadata_count);
stream_parsing->incoming_metadata = gpr_malloc(copy_bytes);
memcpy(stream_parsing->old_incoming_metadata + mdidx,
stream_parsing->incoming_metadata, copy_bytes);
stream_parsing->incoming_metadata_count =
stream_parsing->incoming_metadata_capacity = new_count;
} else {
stream_parsing->incoming_metadata = NULL;
stream_parsing->incoming_metadata_count = 0;
stream_parsing->incoming_metadata_capacity = 0;
}
}
}
static int parse_frame_slice(grpc_chttp2_transport_parsing *transport_parsing,
gpr_slice slice, int is_last) {
grpc_chttp2_stream_parsing *stream_parsing =
@ -787,7 +705,12 @@ static int parse_frame_slice(grpc_chttp2_transport_parsing *transport_parsing,
case GRPC_CHTTP2_STREAM_ERROR:
become_skip_parser(transport_parsing);
if (stream_parsing) {
stream_parsing->saw_error = 1;
stream_parsing->saw_rst_stream = 1;
stream_parsing->rst_stream_reason = GRPC_CHTTP2_PROTOCOL_ERROR;
gpr_slice_buffer_add(&transport_parsing->qbuf,
grpc_chttp2_rst_stream_create(
transport_parsing->incoming_stream_id,
GRPC_CHTTP2_PROTOCOL_ERROR));
}
return 1;
case GRPC_CHTTP2_CONNECTION_ERROR:

@ -0,0 +1,101 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
/* core list management */
static int stream_list_empty(grpc_chttp2_transport *t,
grpc_chttp2_stream_list_id id) {
return t->lists[id].head == NULL;
}
static grpc_chttp2_stream *stream_list_remove_head(
grpc_chttp2_transport *t, grpc_chttp2_stream_list_id id) {
grpc_chttp2_stream *s = t->lists[id].head;
if (s) {
grpc_chttp2_stream *new_head = s->links[id].next;
GPR_ASSERT(s->included[id]);
if (new_head) {
t->lists[id].head = new_head;
new_head->links[id].prev = NULL;
} else {
t->lists[id].head = NULL;
t->lists[id].tail = NULL;
}
s->included[id] = 0;
}
return s;
}
static void stream_list_remove(grpc_chttp2_transport *t, grpc_chttp2_stream *s,
grpc_chttp2_stream_list_id id) {
if (!s->included[id]) return;
s->included[id] = 0;
if (s->links[id].prev) {
s->links[id].prev->links[id].next = s->links[id].next;
} else {
GPR_ASSERT(t->lists[id].head == s);
t->lists[id].head = s->links[id].next;
}
if (s->links[id].next) {
s->links[id].next->links[id].prev = s->links[id].prev;
} else {
t->lists[id].tail = s->links[id].prev;
}
}
static void stream_list_add_tail(grpc_chttp2_transport *t,
grpc_chttp2_stream *s,
grpc_chttp2_stream_list_id id) {
grpc_chttp2_stream *old_tail;
GPR_ASSERT(!s->included[id]);
old_tail = t->lists[id].tail;
s->links[id].next = NULL;
s->links[id].prev = old_tail;
if (old_tail) {
old_tail->links[id].next = s;
} else {
s->links[id].prev = NULL;
t->lists[id].head = s;
}
t->lists[id].tail = s;
s->included[id] = 1;
}
static void stream_list_join(grpc_chttp2_transport *t, grpc_chttp2_stream *s,
grpc_chttp2_stream_list_id id) {
if (s->included[id]) {
return;
}
stream_list_add_tail(t, s, id);
}

@ -77,8 +77,6 @@ int grpc_chttp2_unlocking_check_writes(
GPR_MIN(transport_global->outgoing_window,
stream_global->outgoing_window),
&stream_writing->sopb);
GRPC_CHTTP2_FLOW_CTL_TRACE(t, t, outgoing, 0, -(gpr_int64)window_delta);
GRPC_CHTTP2_FLOW_CTL_TRACE(t, s, outgoing, s->id, -(gpr_int64)window_delta);
transport_global->outgoing_window -= window_delta;
stream_global->outgoing_window -= window_delta;
@ -117,7 +115,6 @@ int grpc_chttp2_unlocking_check_writes(
gpr_slice_buffer_add(
&transport_writing->outbuf,
grpc_chttp2_window_update_create(stream_global->id, window_delta));
GRPC_CHTTP2_FLOW_CTL_TRACE(t, s, incoming, s->id, window_delta);
stream_global->incoming_window += window_delta;
}
}
@ -130,7 +127,6 @@ int grpc_chttp2_unlocking_check_writes(
transport_global->incoming_window;
gpr_slice_buffer_add(&transport_writing->outbuf,
grpc_chttp2_window_update_create(0, window_delta));
GRPC_CHTTP2_FLOW_CTL_TRACE(t, t, incoming, 0, window_delta);
transport_global->incoming_window += window_delta;
}

@ -58,21 +58,26 @@
int grpc_http_trace = 0;
int grpc_flowctl_trace = 0;
#define FLOWCTL_TRACE(t, obj, dir, id, delta) \
if (!grpc_flowctl_trace) \
; \
else \
flowctl_trace(t, #dir, obj->dir##_window, id, delta)
#define TRANSPORT_FROM_WRITING(tw) \
((grpc_chttp2_transport *)((char *)(tw)-offsetof(grpc_chttp2_transport, \
writing)))
#define TRANSPORT_FROM_GLOBAL(tg) \
((grpc_chttp2_transport *)((char *)(tg)-offsetof(grpc_chttp2_transport, \
global)))
#define STREAM_FROM_GLOBAL(sg) \
((grpc_chttp2_stream *)((char *)(sg)-offsetof(grpc_chttp2_stream, \
global)))
static const grpc_transport_vtable vtable;
static void lock(grpc_chttp2_transport *t);
static void unlock(grpc_chttp2_transport *t);
static void unlock_check_cancellations(grpc_chttp2_transport *t);
static void unlock_check_channel_callbacks(grpc_chttp2_transport *t);
/* forward declarations of various callbacks that we'll build closures around */
static void writing_action(void *t, int iomgr_success_ignored);
static void notify_closed(void *t, int iomgr_success_ignored);
@ -88,27 +93,27 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
/** Start disconnection chain */
static void drop_connection(grpc_chttp2_transport *t);
/* basic stream list management */
static grpc_chttp2_stream *stream_list_remove_head(
grpc_chttp2_transport *t, grpc_chttp2_stream_list_id id);
static void stream_list_remove(grpc_chttp2_transport *t, grpc_chttp2_stream *s,
grpc_chttp2_stream_list_id id);
static void stream_list_add_tail(grpc_chttp2_transport *t,
grpc_chttp2_stream *s,
grpc_chttp2_stream_list_id id);
static void stream_list_join(grpc_chttp2_transport *t, grpc_chttp2_stream *s,
grpc_chttp2_stream_list_id id);
/** schedule a closure to be called outside of the transport lock after the next
/** Schedule a closure to be called outside of the transport lock after the next
unlock() operation */
static void schedule_cb(grpc_chttp2_transport *t, grpc_iomgr_closure *closure,
static void schedule_cb(grpc_chttp2_transport_global *transport_global, grpc_iomgr_closure *closure,
int success);
/** Perform a transport_op */
static void perform_op_locked(grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global, grpc_transport_op *op);
/** Cancel a stream: coming from the transport API */
static void cancel_from_api(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global,
grpc_status_code status);
/** Add endpoint from this transport to pollset */
static void add_to_pollset_locked(grpc_chttp2_transport *t,
grpc_pollset *pollset);
#if 0
static void unlock_check_cancellations(grpc_chttp2_transport *t);
static void unlock_check_parser(grpc_chttp2_transport *t);
static void unlock_check_channel_callbacks(grpc_chttp2_transport *t);
static void end_all_the_calls(grpc_chttp2_transport *t);
@ -131,10 +136,6 @@ static void maybe_finish_read(grpc_chttp2_transport *t, grpc_chttp2_stream *s,
int is_parser);
static void maybe_join_window_updates(grpc_chttp2_transport *t,
grpc_chttp2_stream *s);
static void add_to_pollset_locked(grpc_chttp2_transport *t,
grpc_pollset *pollset);
static void perform_op_locked(grpc_chttp2_transport *t, grpc_chttp2_stream *s,
grpc_transport_op *op);
static void add_metadata_batch(grpc_chttp2_transport *t, grpc_chttp2_stream *s);
#endif
@ -375,7 +376,8 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs,
memset(s, 0, sizeof(*s));
s->parsing.incoming_deadline = gpr_inf_future;
grpc_chttp2_incoming_metadata_buffer_init(&s->parsing.incoming_metadata);
grpc_chttp2_incoming_metadata_buffer_init(&s->global.incoming_metadata);
grpc_sopb_init(&s->writing.sopb);
grpc_chttp2_data_parser_init(&s->parsing.data_parser);
@ -395,7 +397,7 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs,
grpc_chttp2_stream_map_add(&t->new_stream_map, s->global.id, s);
}
if (initial_op) perform_op_locked(t, s, initial_op);
if (initial_op) perform_op_locked(&t->global, &s->global, initial_op);
unlock(t);
return 0;
@ -404,102 +406,24 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs,
static void destroy_stream(grpc_transport *gt, grpc_stream *gs) {
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
size_t i;
gpr_mu_lock(&t->mu);
GPR_ASSERT(s->global.published_state == GRPC_STREAM_CLOSED ||
s->global.id == 0);
for (i = 0; i < STREAM_LIST_COUNT; i++) {
stream_list_remove(t, s, i);
}
gpr_mu_unlock(&t->mu);
GPR_ASSERT(s->global.outgoing_sopb == NULL);
GPR_ASSERT(s->global.incoming_sopb == NULL);
grpc_sopb_destroy(&s->writing.sopb);
grpc_chttp2_data_parser_destroy(&s->parsing.data_parser);
for (i = 0; i < s->parsing.incoming_metadata_count; i++) {
grpc_mdelem_unref(s->parsing.incoming_metadata[i].md);
}
gpr_free(s->parsing.incoming_metadata);
gpr_free(s->parsing.old_incoming_metadata);
grpc_chttp2_incoming_metadata_buffer_destroy(&s->parsing.incoming_metadata);
grpc_chttp2_incoming_metadata_buffer_destroy(&s->global.incoming_metadata);
unref_transport(t);
}
/*
* LIST MANAGEMENT
*/
static int stream_list_empty(grpc_chttp2_transport *t,
grpc_chttp2_stream_list_id id) {
return t->lists[id].head == NULL;
}
static grpc_chttp2_stream *stream_list_remove_head(
grpc_chttp2_transport *t, grpc_chttp2_stream_list_id id) {
grpc_chttp2_stream *s = t->lists[id].head;
if (s) {
grpc_chttp2_stream *new_head = s->links[id].next;
GPR_ASSERT(s->included[id]);
if (new_head) {
t->lists[id].head = new_head;
new_head->links[id].prev = NULL;
} else {
t->lists[id].head = NULL;
t->lists[id].tail = NULL;
}
s->included[id] = 0;
}
return s;
}
static void stream_list_remove(grpc_chttp2_transport *t, grpc_chttp2_stream *s,
grpc_chttp2_stream_list_id id) {
if (!s->included[id]) return;
s->included[id] = 0;
if (s->links[id].prev) {
s->links[id].prev->links[id].next = s->links[id].next;
} else {
GPR_ASSERT(t->lists[id].head == s);
t->lists[id].head = s->links[id].next;
}
if (s->links[id].next) {
s->links[id].next->links[id].prev = s->links[id].prev;
} else {
t->lists[id].tail = s->links[id].prev;
}
}
static void stream_list_add_tail(grpc_chttp2_transport *t,
grpc_chttp2_stream *s,
grpc_chttp2_stream_list_id id) {
grpc_chttp2_stream *old_tail;
GPR_ASSERT(!s->included[id]);
old_tail = t->lists[id].tail;
s->links[id].next = NULL;
s->links[id].prev = old_tail;
if (old_tail) {
old_tail->links[id].next = s;
} else {
s->links[id].prev = NULL;
t->lists[id].head = s;
}
t->lists[id].tail = s;
s->included[id] = 1;
}
static void stream_list_join(grpc_chttp2_transport *t, grpc_chttp2_stream *s,
grpc_chttp2_stream_list_id id) {
if (s->included[id]) {
return;
}
stream_list_add_tail(t, s, id);
}
#if 0
static void remove_from_stream_map(grpc_chttp2_transport *t, grpc_chttp2_stream *s) {
if (s->global.id == 0) return;
@ -530,11 +454,11 @@ static void unlock(grpc_chttp2_transport *t) {
grpc_chttp2_unlocking_check_writes(&t->global, &t->writing)) {
t->writing_active = 1;
ref_transport(t);
schedule_cb(t, &t->writing_action, 1);
schedule_cb(&t->global, &t->writing_action, 1);
}
/* unlock_check_cancellations(t); */
unlock_check_cancellations(t);
/* unlock_check_parser(t); */
/* unlock_check_channel_callbacks(t); */
unlock_check_channel_callbacks(t);
run_closures = t->global.pending_closures;
t->global.pending_closures = NULL;
@ -610,102 +534,96 @@ void grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport_global *transport_glo
}
}
static void maybe_start_some_streams(grpc_chttp2_transport *t) {
grpc_chttp2_stream *s;
static void maybe_start_some_streams(grpc_chttp2_transport_global *transport_global) {
grpc_chttp2_stream_global *stream_global;
/* start streams where we have free grpc_chttp2_stream ids and free
* concurrency */
while (t->global.next_stream_id <= MAX_CLIENT_STREAM_ID &&
t->global.concurrent_stream_count <
t->global.settings[PEER_SETTINGS]
while (transport_global->next_stream_id <= MAX_CLIENT_STREAM_ID &&
transport_global->concurrent_stream_count <
transport_global->settings[PEER_SETTINGS]
[GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS] &&
(s = stream_list_remove_head(t, WAITING_FOR_CONCURRENCY))) {
grpc_chttp2_list_pop_waiting_for_concurrency(transport_global, &stream_global)) {
IF_TRACING(gpr_log(
GPR_DEBUG, "HTTP:%s: Allocating new grpc_chttp2_stream %p to id %d",
t->global.is_client ? "CLI" : "SVR", s, t->global.next_stream_id));
transport_global->is_client ? "CLI" : "SVR", stream_global, transport_global->next_stream_id));
if (t->global.next_stream_id == MAX_CLIENT_STREAM_ID) {
if (transport_global->next_stream_id == MAX_CLIENT_STREAM_ID) {
grpc_chttp2_add_incoming_goaway(
&t->global, GRPC_CHTTP2_NO_ERROR,
transport_global, GRPC_CHTTP2_NO_ERROR,
gpr_slice_from_copied_string("Exceeded sequence number limit"));
}
GPR_ASSERT(s->global.id == 0);
s->global.id = t->global.next_stream_id;
t->global.next_stream_id += 2;
s->global.outgoing_window =
t->global
.settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
s->global.incoming_window =
t->global
.settings[SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
grpc_chttp2_stream_map_add(&t->new_stream_map, s->global.id, s);
t->global.concurrent_stream_count++;
stream_list_join(t, s, WRITABLE);
GPR_ASSERT(stream_global->id == 0);
stream_global->id = transport_global->next_stream_id;
transport_global->next_stream_id += 2;
stream_global->outgoing_window =
transport_global
->settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
stream_global->incoming_window =
transport_global->
settings[SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
grpc_chttp2_stream_map_add(&TRANSPORT_FROM_GLOBAL(transport_global)->new_stream_map, stream_global->id, STREAM_FROM_GLOBAL(stream_global));
transport_global->concurrent_stream_count++;
grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
}
/* cancel out streams that will never be started */
while (t->global.next_stream_id > MAX_CLIENT_STREAM_ID &&
(s = stream_list_remove_head(t, WAITING_FOR_CONCURRENCY))) {
cancel_stream(
t, s, GRPC_STATUS_UNAVAILABLE,
grpc_chttp2_grpc_status_to_http2_error(GRPC_STATUS_UNAVAILABLE), NULL,
0);
while (transport_global->next_stream_id > MAX_CLIENT_STREAM_ID &&
grpc_chttp2_list_pop_waiting_for_concurrency(transport_global, &stream_global)) {
cancel_from_api(transport_global, stream_global, GRPC_STATUS_UNAVAILABLE);
}
}
#if 0
static void perform_op_locked(grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_transport_op *op) {
static void perform_op_locked(grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global, grpc_transport_op *op) {
if (op->cancel_with_status != GRPC_STATUS_OK) {
cancel_stream(
t, s, op->cancel_with_status,
grpc_chttp2_grpc_status_to_http2_error(op->cancel_with_status),
op->cancel_message, 1);
cancel_from_api(transport_global, stream_global, op->cancel_with_status);
}
if (op->send_ops) {
GPR_ASSERT(s->global.outgoing_sopb == NULL);
s->global.send_done_closure = op->on_done_send;
if (!s->cancelled) {
s->global.outgoing_sopb = op->send_ops;
if (op->is_last_send && s->global.write_state == WRITE_STATE_OPEN) {
s->global.write_state = WRITE_STATE_QUEUED_CLOSE;
GPR_ASSERT(stream_global->outgoing_sopb == NULL);
stream_global->send_done_closure = op->on_done_send;
if (!stream_global->cancelled) {
stream_global->outgoing_sopb = op->send_ops;
if (op->is_last_send && stream_global->write_state == WRITE_STATE_OPEN) {
stream_global->write_state = WRITE_STATE_QUEUED_CLOSE;
}
if (s->global.id == 0) {
if (stream_global->id == 0) {
IF_TRACING(gpr_log(GPR_DEBUG,
"HTTP:%s: New grpc_chttp2_stream %p waiting for concurrency",
t->global.is_client ? "CLI" : "SVR", s));
stream_list_join(t, s, WAITING_FOR_CONCURRENCY);
maybe_start_some_streams(t);
} else if (s->global.outgoing_window > 0) {
stream_list_join(t, s, WRITABLE);
transport_global->is_client ? "CLI" : "SVR", stream_global));
grpc_chttp2_list_add_waiting_for_concurrency(
transport_global, stream_global
);
maybe_start_some_streams(transport_global);
} else if (stream_global->outgoing_window > 0) {
grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
}
} else {
grpc_sopb_reset(op->send_ops);
schedule_cb(t, s->global.send_done_closure, 0);
schedule_cb(transport_global, stream_global->send_done_closure, 0);
}
}
if (op->recv_ops) {
GPR_ASSERT(s->global.incoming_sopb == NULL);
GPR_ASSERT(s->global.published_state != GRPC_STREAM_CLOSED);
s->global.recv_done_closure = op->on_done_recv;
s->global.incoming_sopb = op->recv_ops;
s->global.incoming_sopb->nops = 0;
s->global.publish_state = op->recv_state;
gpr_free(s->global.old_incoming_metadata);
s->global.old_incoming_metadata = NULL;
maybe_finish_read(t, s, 0);
maybe_join_window_updates(t, s);
GPR_ASSERT(stream_global->incoming_sopb == NULL);
GPR_ASSERT(stream_global->published_state != GRPC_STREAM_CLOSED);
stream_global->recv_done_closure = op->on_done_recv;
stream_global->incoming_sopb = op->recv_ops;
stream_global->incoming_sopb->nops = 0;
stream_global->publish_state = op->recv_state;
gpr_free(stream_global->old_incoming_metadata);
stream_global->old_incoming_metadata = NULL;
grpc_chttp2_read_write_state_changed(transport_global, stream_global);
grpc_chttp2_incoming_window_state_changed(transport_global, stream_global);
}
if (op->bind_pollset) {
add_to_pollset_locked(t, op->bind_pollset);
add_to_pollset_locked(TRANSPORT_FROM_GLOBAL(transport_global), op->bind_pollset);
}
if (op->on_consumed) {
schedule_cb(t, op->on_consumed, 1);
schedule_cb(transport_global, op->on_consumed, 1);
}
}
#endif
static void perform_op(grpc_transport *gt, grpc_stream *gs,
grpc_transport_op *op) {
@ -713,7 +631,7 @@ static void perform_op(grpc_transport *gt, grpc_stream *gs,
grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
lock(t);
perform_op_locked(t, s, op);
perform_op_locked(&t->global, &s->global, op);
unlock(t);
}
@ -743,6 +661,22 @@ static void send_ping(grpc_transport *gt, grpc_iomgr_closure *on_recv) {
*/
static void unlock_check_cancellations(grpc_chttp2_transport *t) {
grpc_chttp2_transport_global *transport_global = &t->global;
grpc_chttp2_stream_global *stream_global;
/* if a stream is in the stream map, and gets cancelled, we need to ensure
we are not parsing before continuing the cancellation to keep things in
a sane state */
if (!t->parsing_active) {
while (grpc_chttp2_list_pop_cancelled_waiting_for_parsing(transport_global, &stream_global)) {
GPR_ASSERT(stream_global->in_stream_map);
grpc_chttp2_stream_map_delete(&t->parsing_stream_map, stream_global->id);
stream_global->in_stream_map = 0;
grpc_chttp2_read_write_state_changed(transport_global, stream_global);
}
}
#if 0
grpc_chttp2_stream *s;
if (t->writing_active) {
@ -754,6 +688,7 @@ static void unlock_check_cancellations(grpc_chttp2_transport *t) {
s->global.write_state = WRITE_STATE_SENT_CLOSE;
grpc_chttp2_read_write_state_changed(&t->global, &s->global);
}
#endif
}
#if 0
@ -839,16 +774,15 @@ static void cancel_stream(grpc_chttp2_transport *t, grpc_chttp2_stream *s,
cancel_stream_inner(t, s, s->global.id, local_status, error_code, optional_message,
send_rst, 0);
}
#endif
static void cancel_stream_cb(void *user_data, gpr_uint32 id, void *grpc_chttp2_stream) {
cancel_stream(user_data, grpc_chttp2_stream, GRPC_STATUS_UNAVAILABLE,
GRPC_CHTTP2_INTERNAL_ERROR, NULL, 0);
static void cancel_stream_cb(grpc_chttp2_transport_global *transport_global, void *user_data, grpc_chttp2_stream_global *stream_global) {
cancel_from_api(transport_global, stream_global, GRPC_STATUS_UNAVAILABLE);
}
static void end_all_the_calls(grpc_chttp2_transport *t) {
grpc_chttp2_stream_map_for_each(&t->stream_map, cancel_stream_cb, t);
grpc_chttp2_for_all_streams(&t->global, NULL, cancel_stream_cb);
}
#endif
static void drop_connection(grpc_chttp2_transport *t) {
if (t->global.error_state == GRPC_CHTTP2_ERROR_STATE_NONE) {
@ -892,7 +826,6 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
grpc_endpoint_cb_status error) {
grpc_chttp2_transport *t = tp;
size_t i;
int keep_reading = 0;
switch (error) {
case GRPC_ENDPOINT_CB_SHUTDOWN:
@ -908,18 +841,21 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
}
unlock(t);
unref_transport(t);
for (i = 0; i < nslices; i++) gpr_slice_unref(slices[i]);
break;
case GRPC_ENDPOINT_CB_OK:
lock(t);
i = 0;
GPR_ASSERT(!t->parsing_active);
if (t->global.error_state == GRPC_CHTTP2_ERROR_STATE_NONE) {
t->parsing_active = 1;
grpc_chttp2_prepare_to_read(&t->global, &t->parsing);
gpr_mu_unlock(&t->mu);
for (i = 0;
for (;
i < nslices && grpc_chttp2_perform_read(&t->parsing, slices[i]);
i++)
;
i++) {
gpr_slice_unref(slices[i]);
}
gpr_mu_lock(&t->mu);
if (i != nslices) {
drop_connection(t);
@ -927,13 +863,13 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
/* merge stream lists */
grpc_chttp2_stream_map_move_into(&t->new_stream_map,
&t->parsing_stream_map);
t->global.concurrent_stream_count =
grpc_chttp2_stream_map_size(&t->parsing_stream_map);
/* handle higher level things */
grpc_chttp2_publish_reads(&t->global, &t->parsing);
t->global.concurrent_stream_count =
grpc_chttp2_stream_map_size(&t->parsing_stream_map);
t->parsing_active = 0;
}
#if 0
#if 0
while ((s = stream_list_remove_head(t, MAYBE_FINISH_READ_AFTER_PARSE))) {
maybe_finish_read(t, s, 0);
}
@ -959,16 +895,13 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
t->global.outgoing_window_update = 0;
maybe_start_some_streams(t);
#endif
if (i == nslices) {
grpc_endpoint_notify_on_read(t->ep, recv_data, t);
}
unlock(t);
keep_reading = 1;
for (; i < nslices; i++) gpr_slice_unref(slices[i]);
break;
}
for (i = 0; i < nslices; i++) gpr_slice_unref(slices[i]);
if (keep_reading) {
grpc_endpoint_notify_on_read(t->ep, recv_data, t);
}
}
/*
@ -1031,7 +964,7 @@ static void unlock_check_channel_callbacks(grpc_chttp2_transport *t) {
t->channel_callback.executing = 1;
grpc_iomgr_closure_init(&a->closure, notify_goaways, a);
ref_transport(t);
schedule_cb(t, &a->closure, 1);
schedule_cb(&t->global, &a->closure, 1);
return;
} else if (t->global.goaway_state != GRPC_CHTTP2_ERROR_STATE_NOTIFIED) {
return;
@ -1041,15 +974,15 @@ static void unlock_check_channel_callbacks(grpc_chttp2_transport *t) {
t->global.error_state = GRPC_CHTTP2_ERROR_STATE_NOTIFIED;
t->channel_callback.executing = 1;
ref_transport(t);
schedule_cb(t, &t->channel_callback.notify_closed, 1);
schedule_cb(&t->global, &t->channel_callback.notify_closed, 1);
}
}
static void schedule_cb(grpc_chttp2_transport *t, grpc_iomgr_closure *closure,
static void schedule_cb(grpc_chttp2_transport_global *transport_global, grpc_iomgr_closure *closure,
int success) {
closure->success = success;
closure->next = t->global.pending_closures;
t->global.pending_closures = closure;
closure->next = transport_global->pending_closures;
transport_global->pending_closures = closure;
}
/*

File diff suppressed because one or more lines are too long

@ -231,6 +231,7 @@
<ClInclude Include="..\..\src\core\surface\surface_trace.h" />
<ClInclude Include="..\..\src\core\transport\chttp2\alpn.h" />
<ClInclude Include="..\..\src\core\transport\chttp2\bin_encoder.h" />
<ClInclude Include="..\..\src\core\transport\chttp2\frame.h" />
<ClInclude Include="..\..\src\core\transport\chttp2\frame_data.h" />
<ClInclude Include="..\..\src\core\transport\chttp2\frame_goaway.h" />
<ClInclude Include="..\..\src\core\transport\chttp2\frame_ping.h" />
@ -241,6 +242,7 @@
<ClInclude Include="..\..\src\core\transport\chttp2\hpack_table.h" />
<ClInclude Include="..\..\src\core\transport\chttp2\http2_errors.h" />
<ClInclude Include="..\..\src\core\transport\chttp2\huffsyms.h" />
<ClInclude Include="..\..\src\core\transport\chttp2\incoming_metadata.h" />
<ClInclude Include="..\..\src\core\transport\chttp2\internal.h" />
<ClInclude Include="..\..\src\core\transport\chttp2\status_conversion.h" />
<ClInclude Include="..\..\src\core\transport\chttp2\stream_encoder.h" />
@ -467,6 +469,8 @@
</ClCompile>
<ClCompile Include="..\..\src\core\transport\chttp2\huffsyms.c">
</ClCompile>
<ClCompile Include="..\..\src\core\transport\chttp2\incoming_metadata.c">
</ClCompile>
<ClCompile Include="..\..\src\core\transport\chttp2\parsing.c">
</ClCompile>
<ClCompile Include="..\..\src\core\transport\chttp2\status_conversion.c">

@ -319,6 +319,9 @@
<ClCompile Include="..\..\src\core\transport\chttp2\huffsyms.c">
<Filter>src\core\transport\chttp2</Filter>
</ClCompile>
<ClCompile Include="..\..\src\core\transport\chttp2\incoming_metadata.c">
<Filter>src\core\transport\chttp2</Filter>
</ClCompile>
<ClCompile Include="..\..\src\core\transport\chttp2\parsing.c">
<Filter>src\core\transport\chttp2</Filter>
</ClCompile>
@ -611,6 +614,9 @@
<ClInclude Include="..\..\src\core\transport\chttp2\bin_encoder.h">
<Filter>src\core\transport\chttp2</Filter>
</ClInclude>
<ClInclude Include="..\..\src\core\transport\chttp2\frame.h">
<Filter>src\core\transport\chttp2</Filter>
</ClInclude>
<ClInclude Include="..\..\src\core\transport\chttp2\frame_data.h">
<Filter>src\core\transport\chttp2</Filter>
</ClInclude>
@ -641,6 +647,9 @@
<ClInclude Include="..\..\src\core\transport\chttp2\huffsyms.h">
<Filter>src\core\transport\chttp2</Filter>
</ClInclude>
<ClInclude Include="..\..\src\core\transport\chttp2\incoming_metadata.h">
<Filter>src\core\transport\chttp2</Filter>
</ClInclude>
<ClInclude Include="..\..\src\core\transport\chttp2\internal.h">
<Filter>src\core\transport\chttp2</Filter>
</ClInclude>

@ -213,6 +213,7 @@
<ClInclude Include="..\..\src\core\surface\surface_trace.h" />
<ClInclude Include="..\..\src\core\transport\chttp2\alpn.h" />
<ClInclude Include="..\..\src\core\transport\chttp2\bin_encoder.h" />
<ClInclude Include="..\..\src\core\transport\chttp2\frame.h" />
<ClInclude Include="..\..\src\core\transport\chttp2\frame_data.h" />
<ClInclude Include="..\..\src\core\transport\chttp2\frame_goaway.h" />
<ClInclude Include="..\..\src\core\transport\chttp2\frame_ping.h" />
@ -223,6 +224,7 @@
<ClInclude Include="..\..\src\core\transport\chttp2\hpack_table.h" />
<ClInclude Include="..\..\src\core\transport\chttp2\http2_errors.h" />
<ClInclude Include="..\..\src\core\transport\chttp2\huffsyms.h" />
<ClInclude Include="..\..\src\core\transport\chttp2\incoming_metadata.h" />
<ClInclude Include="..\..\src\core\transport\chttp2\internal.h" />
<ClInclude Include="..\..\src\core\transport\chttp2\status_conversion.h" />
<ClInclude Include="..\..\src\core\transport\chttp2\stream_encoder.h" />
@ -405,6 +407,8 @@
</ClCompile>
<ClCompile Include="..\..\src\core\transport\chttp2\huffsyms.c">
</ClCompile>
<ClCompile Include="..\..\src\core\transport\chttp2\incoming_metadata.c">
</ClCompile>
<ClCompile Include="..\..\src\core\transport\chttp2\parsing.c">
</ClCompile>
<ClCompile Include="..\..\src\core\transport\chttp2\status_conversion.c">

@ -253,6 +253,9 @@
<ClCompile Include="..\..\src\core\transport\chttp2\huffsyms.c">
<Filter>src\core\transport\chttp2</Filter>
</ClCompile>
<ClCompile Include="..\..\src\core\transport\chttp2\incoming_metadata.c">
<Filter>src\core\transport\chttp2</Filter>
</ClCompile>
<ClCompile Include="..\..\src\core\transport\chttp2\parsing.c">
<Filter>src\core\transport\chttp2</Filter>
</ClCompile>
@ -494,6 +497,9 @@
<ClInclude Include="..\..\src\core\transport\chttp2\bin_encoder.h">
<Filter>src\core\transport\chttp2</Filter>
</ClInclude>
<ClInclude Include="..\..\src\core\transport\chttp2\frame.h">
<Filter>src\core\transport\chttp2</Filter>
</ClInclude>
<ClInclude Include="..\..\src\core\transport\chttp2\frame_data.h">
<Filter>src\core\transport\chttp2</Filter>
</ClInclude>
@ -524,6 +530,9 @@
<ClInclude Include="..\..\src\core\transport\chttp2\huffsyms.h">
<Filter>src\core\transport\chttp2</Filter>
</ClInclude>
<ClInclude Include="..\..\src\core\transport\chttp2\incoming_metadata.h">
<Filter>src\core\transport\chttp2</Filter>
</ClInclude>
<ClInclude Include="..\..\src\core\transport\chttp2\internal.h">
<Filter>src\core\transport\chttp2</Filter>
</ClInclude>

Loading…
Cancel
Save