mirror of https://github.com/grpc/grpc.git
commit
3b1847c528
62 changed files with 3513 additions and 2316 deletions
@ -0,0 +1,177 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2015, Google Inc. |
||||
* All rights reserved. |
||||
* |
||||
* Redistribution and use in source and binary forms, with or without |
||||
* modification, are permitted provided that the following conditions are |
||||
* met: |
||||
* |
||||
* * Redistributions of source code must retain the above copyright |
||||
* notice, this list of conditions and the following disclaimer. |
||||
* * Redistributions in binary form must reproduce the above |
||||
* copyright notice, this list of conditions and the following disclaimer |
||||
* in the documentation and/or other materials provided with the |
||||
* distribution. |
||||
* * Neither the name of Google Inc. nor the names of its |
||||
* contributors may be used to endorse or promote products derived from |
||||
* this software without specific prior written permission. |
||||
* |
||||
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
||||
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
||||
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
||||
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
||||
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
||||
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
||||
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
||||
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
||||
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
||||
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
||||
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
||||
* |
||||
*/ |
||||
|
||||
#include "src/core/transport/chttp2/incoming_metadata.h" |
||||
|
||||
#include <string.h> |
||||
|
||||
#include "src/core/transport/chttp2/internal.h" |
||||
|
||||
#include <grpc/support/alloc.h> |
||||
#include <grpc/support/log.h> |
||||
|
||||
void grpc_chttp2_incoming_metadata_buffer_init( |
||||
grpc_chttp2_incoming_metadata_buffer *buffer) { |
||||
buffer->deadline = gpr_inf_future; |
||||
} |
||||
|
||||
void grpc_chttp2_incoming_metadata_buffer_destroy( |
||||
grpc_chttp2_incoming_metadata_buffer *buffer) { |
||||
gpr_free(buffer->elems); |
||||
} |
||||
|
||||
void grpc_chttp2_incoming_metadata_buffer_add( |
||||
grpc_chttp2_incoming_metadata_buffer *buffer, grpc_mdelem *elem) { |
||||
if (buffer->capacity == buffer->count) { |
||||
buffer->capacity = GPR_MAX(8, 2 * buffer->capacity); |
||||
buffer->elems = |
||||
gpr_realloc(buffer->elems, sizeof(*buffer->elems) * buffer->capacity); |
||||
} |
||||
buffer->elems[buffer->count++].md = elem; |
||||
} |
||||
|
||||
void grpc_chttp2_incoming_metadata_buffer_set_deadline( |
||||
grpc_chttp2_incoming_metadata_buffer *buffer, gpr_timespec deadline) { |
||||
buffer->deadline = deadline; |
||||
} |
||||
|
||||
void grpc_chttp2_incoming_metadata_live_op_buffer_end( |
||||
grpc_chttp2_incoming_metadata_live_op_buffer *buffer) { |
||||
gpr_free(buffer->elems); |
||||
buffer->elems = NULL; |
||||
} |
||||
|
||||
void grpc_chttp2_incoming_metadata_buffer_place_metadata_batch_into( |
||||
grpc_chttp2_incoming_metadata_buffer *buffer, grpc_stream_op_buffer *sopb) { |
||||
grpc_metadata_batch b; |
||||
|
||||
b.list.head = NULL; |
||||
/* Store away the last element of the list, so that in patch_metadata_ops
|
||||
we can reconstitute the list. |
||||
We can't do list building here as later incoming metadata may reallocate |
||||
the underlying array. */ |
||||
b.list.tail = (void *)(gpr_intptr)buffer->count; |
||||
b.garbage.head = b.garbage.tail = NULL; |
||||
b.deadline = buffer->deadline; |
||||
buffer->deadline = gpr_inf_future; |
||||
|
||||
grpc_sopb_add_metadata(sopb, b); |
||||
} |
||||
|
||||
void grpc_chttp2_incoming_metadata_buffer_swap( |
||||
grpc_chttp2_incoming_metadata_buffer *a, |
||||
grpc_chttp2_incoming_metadata_buffer *b) { |
||||
GPR_SWAP(grpc_chttp2_incoming_metadata_buffer, *a, *b); |
||||
} |
||||
|
||||
void grpc_incoming_metadata_buffer_move_to_referencing_sopb( |
||||
grpc_chttp2_incoming_metadata_buffer *src, |
||||
grpc_chttp2_incoming_metadata_buffer *dst, grpc_stream_op_buffer *sopb) { |
||||
size_t delta; |
||||
size_t i; |
||||
dst->deadline = gpr_time_min(src->deadline, dst->deadline); |
||||
|
||||
if (src->count == 0) { |
||||
return; |
||||
} |
||||
if (dst->count == 0) { |
||||
grpc_chttp2_incoming_metadata_buffer_swap(src, dst); |
||||
return; |
||||
} |
||||
delta = dst->count; |
||||
if (dst->capacity < src->count + dst->count) { |
||||
dst->capacity = GPR_MAX(dst->capacity * 2, src->count + dst->count); |
||||
dst->elems = gpr_realloc(dst->elems, dst->capacity * sizeof(*dst->elems)); |
||||
} |
||||
memcpy(dst->elems + dst->count, src->elems, src->count * sizeof(*src->elems)); |
||||
dst->count += src->count; |
||||
for (i = 0; i < sopb->nops; i++) { |
||||
if (sopb->ops[i].type != GRPC_OP_METADATA) continue; |
||||
sopb->ops[i].data.metadata.list.tail = |
||||
(void *)(delta + (gpr_intptr)sopb->ops[i].data.metadata.list.tail); |
||||
} |
||||
} |
||||
|
||||
void grpc_chttp2_incoming_metadata_buffer_postprocess_sopb_and_begin_live_op( |
||||
grpc_chttp2_incoming_metadata_buffer *buffer, grpc_stream_op_buffer *sopb, |
||||
grpc_chttp2_incoming_metadata_live_op_buffer *live_op_buffer) { |
||||
grpc_stream_op *ops = sopb->ops; |
||||
size_t nops = sopb->nops; |
||||
size_t i; |
||||
size_t j; |
||||
size_t mdidx = 0; |
||||
size_t last_mdidx; |
||||
int found_metadata = 0; |
||||
|
||||
/* rework the array of metadata into a linked list, making use
|
||||
of the breadcrumbs we left in metadata batches during |
||||
add_metadata_batch */ |
||||
for (i = 0; i < nops; i++) { |
||||
grpc_stream_op *op = &ops[i]; |
||||
if (op->type != GRPC_OP_METADATA) continue; |
||||
found_metadata = 1; |
||||
/* we left a breadcrumb indicating where the end of this list is,
|
||||
and since we add sequentially, we know from the end of the last |
||||
segment where this segment begins */ |
||||
last_mdidx = (size_t)(gpr_intptr)(op->data.metadata.list.tail); |
||||
GPR_ASSERT(last_mdidx > mdidx); |
||||
GPR_ASSERT(last_mdidx <= buffer->count); |
||||
/* turn the array into a doubly linked list */ |
||||
op->data.metadata.list.head = &buffer->elems[mdidx]; |
||||
op->data.metadata.list.tail = &buffer->elems[last_mdidx - 1]; |
||||
for (j = mdidx + 1; j < last_mdidx; j++) { |
||||
buffer->elems[j].prev = &buffer->elems[j - 1]; |
||||
buffer->elems[j - 1].next = &buffer->elems[j]; |
||||
} |
||||
buffer->elems[mdidx].prev = NULL; |
||||
buffer->elems[last_mdidx - 1].next = NULL; |
||||
/* track where we're up to */ |
||||
mdidx = last_mdidx; |
||||
} |
||||
if (found_metadata) { |
||||
live_op_buffer->elems = buffer->elems; |
||||
if (mdidx != buffer->count) { |
||||
/* we have a partially read metadata batch still in incoming_metadata */ |
||||
size_t new_count = buffer->count - mdidx; |
||||
size_t copy_bytes = sizeof(*buffer->elems) * new_count; |
||||
GPR_ASSERT(mdidx < buffer->count); |
||||
buffer->elems = gpr_malloc(copy_bytes); |
||||
memcpy(live_op_buffer->elems + mdidx, buffer->elems, copy_bytes); |
||||
buffer->count = buffer->capacity = new_count; |
||||
} else { |
||||
buffer->elems = NULL; |
||||
buffer->count = 0; |
||||
buffer->capacity = 0; |
||||
} |
||||
} |
||||
} |
@ -0,0 +1,80 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2015, Google Inc. |
||||
* All rights reserved. |
||||
* |
||||
* Redistribution and use in source and binary forms, with or without |
||||
* modification, are permitted provided that the following conditions are |
||||
* met: |
||||
* |
||||
* * Redistributions of source code must retain the above copyright |
||||
* notice, this list of conditions and the following disclaimer. |
||||
* * Redistributions in binary form must reproduce the above |
||||
* copyright notice, this list of conditions and the following disclaimer |
||||
* in the documentation and/or other materials provided with the |
||||
* distribution. |
||||
* * Neither the name of Google Inc. nor the names of its |
||||
* contributors may be used to endorse or promote products derived from |
||||
* this software without specific prior written permission. |
||||
* |
||||
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
||||
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
||||
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
||||
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
||||
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
||||
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
||||
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
||||
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
||||
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
||||
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
||||
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
||||
* |
||||
*/ |
||||
|
||||
#ifndef GRPC_INTERNAL_CORE_CHTTP2_INCOMING_METADATA_H |
||||
#define GRPC_INTERNAL_CORE_CHTTP2_INCOMING_METADATA_H |
||||
|
||||
#include "src/core/transport/transport.h" |
||||
|
||||
typedef struct { |
||||
grpc_linked_mdelem *elems; |
||||
size_t count; |
||||
size_t capacity; |
||||
gpr_timespec deadline; |
||||
} grpc_chttp2_incoming_metadata_buffer; |
||||
|
||||
typedef struct { |
||||
grpc_linked_mdelem *elems; |
||||
} grpc_chttp2_incoming_metadata_live_op_buffer; |
||||
|
||||
/** assumes everything initially zeroed */ |
||||
void grpc_chttp2_incoming_metadata_buffer_init( |
||||
grpc_chttp2_incoming_metadata_buffer *buffer); |
||||
void grpc_chttp2_incoming_metadata_buffer_destroy( |
||||
grpc_chttp2_incoming_metadata_buffer *buffer); |
||||
void grpc_chttp2_incoming_metadata_buffer_reset( |
||||
grpc_chttp2_incoming_metadata_buffer *buffer); |
||||
|
||||
void grpc_chttp2_incoming_metadata_buffer_add( |
||||
grpc_chttp2_incoming_metadata_buffer *buffer, grpc_mdelem *elem); |
||||
void grpc_chttp2_incoming_metadata_buffer_set_deadline( |
||||
grpc_chttp2_incoming_metadata_buffer *buffer, gpr_timespec deadline); |
||||
|
||||
/** extend sopb with a metadata batch; this must be post-processed by
|
||||
grpc_chttp2_incoming_metadata_buffer_postprocess_sopb before being handed |
||||
out of the transport */ |
||||
void grpc_chttp2_incoming_metadata_buffer_place_metadata_batch_into( |
||||
grpc_chttp2_incoming_metadata_buffer *buffer, grpc_stream_op_buffer *sopb); |
||||
|
||||
void grpc_incoming_metadata_buffer_move_to_referencing_sopb( |
||||
grpc_chttp2_incoming_metadata_buffer *src, |
||||
grpc_chttp2_incoming_metadata_buffer *dst, grpc_stream_op_buffer *sopb); |
||||
|
||||
void grpc_chttp2_incoming_metadata_buffer_postprocess_sopb_and_begin_live_op( |
||||
grpc_chttp2_incoming_metadata_buffer *buffer, grpc_stream_op_buffer *sopb, |
||||
grpc_chttp2_incoming_metadata_live_op_buffer *live_op_buffer); |
||||
|
||||
void grpc_chttp2_incoming_metadata_live_op_buffer_end( |
||||
grpc_chttp2_incoming_metadata_live_op_buffer *live_op_buffer); |
||||
|
||||
#endif /* GRPC_INTERNAL_CORE_CHTTP2_INCOMING_METADATA_H */ |
@ -0,0 +1,652 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2015, Google Inc. |
||||
* All rights reserved. |
||||
* |
||||
* Redistribution and use in source and binary forms, with or without |
||||
* modification, are permitted provided that the following conditions are |
||||
* met: |
||||
* |
||||
* * Redistributions of source code must retain the above copyright |
||||
* notice, this list of conditions and the following disclaimer. |
||||
* * Redistributions in binary form must reproduce the above |
||||
* copyright notice, this list of conditions and the following disclaimer |
||||
* in the documentation and/or other materials provided with the |
||||
* distribution. |
||||
* * Neither the name of Google Inc. nor the names of its |
||||
* contributors may be used to endorse or promote products derived from |
||||
* this software without specific prior written permission. |
||||
* |
||||
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
||||
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
||||
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
||||
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
||||
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
||||
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
||||
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
||||
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
||||
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
||||
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
||||
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
||||
* |
||||
*/ |
||||
|
||||
#ifndef GRPC_INTERNAL_CORE_CHTTP2_INTERNAL_H |
||||
#define GRPC_INTERNAL_CORE_CHTTP2_INTERNAL_H |
||||
|
||||
#include "src/core/transport/transport_impl.h" |
||||
#include "src/core/iomgr/endpoint.h" |
||||
#include "src/core/transport/chttp2/frame.h" |
||||
#include "src/core/transport/chttp2/frame_data.h" |
||||
#include "src/core/transport/chttp2/frame_goaway.h" |
||||
#include "src/core/transport/chttp2/frame_ping.h" |
||||
#include "src/core/transport/chttp2/frame_rst_stream.h" |
||||
#include "src/core/transport/chttp2/frame_settings.h" |
||||
#include "src/core/transport/chttp2/frame_window_update.h" |
||||
#include "src/core/transport/chttp2/hpack_parser.h" |
||||
#include "src/core/transport/chttp2/incoming_metadata.h" |
||||
#include "src/core/transport/chttp2/stream_encoder.h" |
||||
#include "src/core/transport/chttp2/stream_map.h" |
||||
|
||||
typedef struct grpc_chttp2_transport grpc_chttp2_transport; |
||||
typedef struct grpc_chttp2_stream grpc_chttp2_stream; |
||||
|
||||
/* streams are kept in various linked lists depending on what things need to
|
||||
happen to them... this enum labels each list */ |
||||
typedef enum { |
||||
GRPC_CHTTP2_LIST_ALL_STREAMS, |
||||
GRPC_CHTTP2_LIST_READ_WRITE_STATE_CHANGED, |
||||
GRPC_CHTTP2_LIST_WRITABLE, |
||||
GRPC_CHTTP2_LIST_WRITING, |
||||
GRPC_CHTTP2_LIST_WRITTEN, |
||||
GRPC_CHTTP2_LIST_WRITABLE_WINDOW_UPDATE, |
||||
GRPC_CHTTP2_LIST_PARSING_SEEN, |
||||
GRPC_CHTTP2_LIST_CLOSED_WAITING_FOR_PARSING, |
||||
GRPC_CHTTP2_LIST_INCOMING_WINDOW_UPDATED, |
||||
/** streams that are waiting to start because there are too many concurrent
|
||||
streams on the connection */ |
||||
GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY, |
||||
STREAM_LIST_COUNT /* must be last */ |
||||
} grpc_chttp2_stream_list_id; |
||||
|
||||
/* deframer state for the overall http2 stream of bytes */ |
||||
typedef enum { |
||||
/* prefix: one entry per http2 connection prefix byte */ |
||||
DTS_CLIENT_PREFIX_0 = 0, |
||||
DTS_CLIENT_PREFIX_1, |
||||
DTS_CLIENT_PREFIX_2, |
||||
DTS_CLIENT_PREFIX_3, |
||||
DTS_CLIENT_PREFIX_4, |
||||
DTS_CLIENT_PREFIX_5, |
||||
DTS_CLIENT_PREFIX_6, |
||||
DTS_CLIENT_PREFIX_7, |
||||
DTS_CLIENT_PREFIX_8, |
||||
DTS_CLIENT_PREFIX_9, |
||||
DTS_CLIENT_PREFIX_10, |
||||
DTS_CLIENT_PREFIX_11, |
||||
DTS_CLIENT_PREFIX_12, |
||||
DTS_CLIENT_PREFIX_13, |
||||
DTS_CLIENT_PREFIX_14, |
||||
DTS_CLIENT_PREFIX_15, |
||||
DTS_CLIENT_PREFIX_16, |
||||
DTS_CLIENT_PREFIX_17, |
||||
DTS_CLIENT_PREFIX_18, |
||||
DTS_CLIENT_PREFIX_19, |
||||
DTS_CLIENT_PREFIX_20, |
||||
DTS_CLIENT_PREFIX_21, |
||||
DTS_CLIENT_PREFIX_22, |
||||
DTS_CLIENT_PREFIX_23, |
||||
/* frame header byte 0... */ |
||||
/* must follow from the prefix states */ |
||||
DTS_FH_0, |
||||
DTS_FH_1, |
||||
DTS_FH_2, |
||||
DTS_FH_3, |
||||
DTS_FH_4, |
||||
DTS_FH_5, |
||||
DTS_FH_6, |
||||
DTS_FH_7, |
||||
/* ... frame header byte 8 */ |
||||
DTS_FH_8, |
||||
/* inside a http2 frame */ |
||||
DTS_FRAME |
||||
} grpc_chttp2_deframe_transport_state; |
||||
|
||||
typedef enum { |
||||
WRITE_STATE_OPEN, |
||||
WRITE_STATE_QUEUED_CLOSE, |
||||
WRITE_STATE_SENT_CLOSE |
||||
} grpc_chttp2_write_state; |
||||
|
||||
typedef enum { |
||||
DONT_SEND_CLOSED = 0, |
||||
SEND_CLOSED, |
||||
SEND_CLOSED_WITH_RST_STREAM |
||||
} grpc_chttp2_send_closed; |
||||
|
||||
typedef struct { |
||||
grpc_chttp2_stream *head; |
||||
grpc_chttp2_stream *tail; |
||||
} grpc_chttp2_stream_list; |
||||
|
||||
typedef struct { |
||||
grpc_chttp2_stream *next; |
||||
grpc_chttp2_stream *prev; |
||||
} grpc_chttp2_stream_link; |
||||
|
||||
typedef enum { |
||||
GRPC_CHTTP2_ERROR_STATE_NONE, |
||||
GRPC_CHTTP2_ERROR_STATE_SEEN, |
||||
GRPC_CHTTP2_ERROR_STATE_NOTIFIED |
||||
} grpc_chttp2_error_state; |
||||
|
||||
/* We keep several sets of connection wide parameters */ |
||||
typedef enum { |
||||
/* The settings our peer has asked for (and we have acked) */ |
||||
PEER_SETTINGS = 0, |
||||
/* The settings we'd like to have */ |
||||
LOCAL_SETTINGS, |
||||
/* The settings we've published to our peer */ |
||||
SENT_SETTINGS, |
||||
/* The settings the peer has acked */ |
||||
ACKED_SETTINGS, |
||||
NUM_SETTING_SETS |
||||
} grpc_chttp2_setting_set; |
||||
|
||||
/* Outstanding ping request data */ |
||||
typedef struct grpc_chttp2_outstanding_ping { |
||||
gpr_uint8 id[8]; |
||||
grpc_iomgr_closure *on_recv; |
||||
struct grpc_chttp2_outstanding_ping *next; |
||||
struct grpc_chttp2_outstanding_ping *prev; |
||||
} grpc_chttp2_outstanding_ping; |
||||
|
||||
typedef struct { |
||||
/** data to write next write */ |
||||
gpr_slice_buffer qbuf; |
||||
/** queued callbacks */ |
||||
grpc_iomgr_closure *pending_closures; |
||||
|
||||
/** window available for us to send to peer */ |
||||
gpr_uint32 outgoing_window; |
||||
/** window available for peer to send to us - updated after parse */ |
||||
gpr_uint32 incoming_window; |
||||
/** how much window would we like to have for incoming_window */ |
||||
gpr_uint32 connection_window_target; |
||||
|
||||
/** is this transport a client? */ |
||||
gpr_uint8 is_client; |
||||
/** are the local settings dirty and need to be sent? */ |
||||
gpr_uint8 dirtied_local_settings; |
||||
/** have local settings been sent? */ |
||||
gpr_uint8 sent_local_settings; |
||||
/** bitmask of setting indexes to send out */ |
||||
gpr_uint32 force_send_settings; |
||||
/** settings values */ |
||||
gpr_uint32 settings[NUM_SETTING_SETS][GRPC_CHTTP2_NUM_SETTINGS]; |
||||
|
||||
/** has there been a connection level error, and have we notified
|
||||
anyone about it? */ |
||||
grpc_chttp2_error_state error_state; |
||||
|
||||
/** what is the next stream id to be allocated by this peer?
|
||||
copied to next_stream_id in parsing when parsing commences */ |
||||
gpr_uint32 next_stream_id; |
||||
|
||||
/** last received stream id */ |
||||
gpr_uint32 last_incoming_stream_id; |
||||
|
||||
/** pings awaiting responses */ |
||||
grpc_chttp2_outstanding_ping pings; |
||||
/** next payload for an outgoing ping */ |
||||
gpr_uint64 ping_counter; |
||||
|
||||
/** concurrent stream count: updated when not parsing,
|
||||
so this is a strict over-estimation on the client */ |
||||
gpr_uint32 concurrent_stream_count; |
||||
|
||||
/** is there a goaway available? (boolean) */ |
||||
grpc_chttp2_error_state goaway_state; |
||||
/** what is the debug text of the goaway? */ |
||||
gpr_slice goaway_text; |
||||
/** what is the status code of the goaway? */ |
||||
grpc_status_code goaway_error; |
||||
} grpc_chttp2_transport_global; |
||||
|
||||
typedef struct { |
||||
/** data to write now */ |
||||
gpr_slice_buffer outbuf; |
||||
/** hpack encoding */ |
||||
grpc_chttp2_hpack_compressor hpack_compressor; |
||||
/** is this a client? */ |
||||
gpr_uint8 is_client; |
||||
} grpc_chttp2_transport_writing; |
||||
|
||||
struct grpc_chttp2_transport_parsing { |
||||
/** is this transport a client? (boolean) */ |
||||
gpr_uint8 is_client; |
||||
|
||||
/** were settings updated? */ |
||||
gpr_uint8 settings_updated; |
||||
/** was a settings ack received? */ |
||||
gpr_uint8 settings_ack_received; |
||||
/** was a goaway frame received? */ |
||||
gpr_uint8 goaway_received; |
||||
|
||||
/** initial window change */ |
||||
gpr_int64 initial_window_update; |
||||
|
||||
/** data to write later - after parsing */ |
||||
gpr_slice_buffer qbuf; |
||||
/* metadata object cache */ |
||||
grpc_mdstr *str_grpc_timeout; |
||||
/** parser for headers */ |
||||
grpc_chttp2_hpack_parser hpack_parser; |
||||
/** simple one shot parsers */ |
||||
union { |
||||
grpc_chttp2_window_update_parser window_update; |
||||
grpc_chttp2_settings_parser settings; |
||||
grpc_chttp2_ping_parser ping; |
||||
grpc_chttp2_rst_stream_parser rst_stream; |
||||
} simple; |
||||
/** parser for goaway frames */ |
||||
grpc_chttp2_goaway_parser goaway_parser; |
||||
|
||||
/** window available for peer to send to us */ |
||||
gpr_uint32 incoming_window; |
||||
gpr_uint32 incoming_window_delta; |
||||
|
||||
/** next stream id available at the time of beginning parsing */ |
||||
gpr_uint32 next_stream_id; |
||||
gpr_uint32 last_incoming_stream_id; |
||||
|
||||
/* deframing */ |
||||
grpc_chttp2_deframe_transport_state deframe_state; |
||||
gpr_uint8 incoming_frame_type; |
||||
gpr_uint8 incoming_frame_flags; |
||||
gpr_uint8 header_eof; |
||||
gpr_uint32 expect_continuation_stream_id; |
||||
gpr_uint32 incoming_frame_size; |
||||
gpr_uint32 incoming_stream_id; |
||||
|
||||
/* active parser */ |
||||
void *parser_data; |
||||
grpc_chttp2_stream_parsing *incoming_stream; |
||||
grpc_chttp2_parse_error (*parser)( |
||||
void *parser_user_data, grpc_chttp2_transport_parsing *transport_parsing, |
||||
grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last); |
||||
|
||||
/* received settings */ |
||||
gpr_uint32 settings[GRPC_CHTTP2_NUM_SETTINGS]; |
||||
|
||||
/* goaway data */ |
||||
grpc_status_code goaway_error; |
||||
gpr_uint32 goaway_last_stream_index; |
||||
gpr_slice goaway_text; |
||||
|
||||
gpr_uint64 outgoing_window_update; |
||||
|
||||
/** pings awaiting responses */ |
||||
grpc_chttp2_outstanding_ping pings; |
||||
}; |
||||
|
||||
struct grpc_chttp2_transport { |
||||
grpc_transport base; /* must be first */ |
||||
grpc_endpoint *ep; |
||||
grpc_mdctx *metadata_context; |
||||
gpr_refcount refs; |
||||
|
||||
gpr_mu mu; |
||||
|
||||
/** is the transport destroying itself? */ |
||||
gpr_uint8 destroying; |
||||
/** has the upper layer closed the transport? */ |
||||
gpr_uint8 closed; |
||||
|
||||
/** is a thread currently writing */ |
||||
gpr_uint8 writing_active; |
||||
/** is a thread currently parsing */ |
||||
gpr_uint8 parsing_active; |
||||
|
||||
/** is there a read request to the endpoint outstanding? */ |
||||
gpr_uint8 endpoint_reading; |
||||
|
||||
/** various lists of streams */ |
||||
grpc_chttp2_stream_list lists[STREAM_LIST_COUNT]; |
||||
|
||||
/** global state for reading/writing */ |
||||
grpc_chttp2_transport_global global; |
||||
/** state only accessible by the chain of execution that
|
||||
set writing_active=1 */ |
||||
grpc_chttp2_transport_writing writing; |
||||
/** state only accessible by the chain of execution that
|
||||
set parsing_active=1 */ |
||||
grpc_chttp2_transport_parsing parsing; |
||||
|
||||
/** maps stream id to grpc_chttp2_stream objects;
|
||||
owned by the parsing thread when parsing */ |
||||
grpc_chttp2_stream_map parsing_stream_map; |
||||
|
||||
/** streams created by the client (possibly during parsing);
|
||||
merged with parsing_stream_map during unlock when no |
||||
parsing is occurring */ |
||||
grpc_chttp2_stream_map new_stream_map; |
||||
|
||||
/** closure to execute writing */ |
||||
grpc_iomgr_closure writing_action; |
||||
/** closure to start reading from the endpoint */ |
||||
grpc_iomgr_closure reading_action; |
||||
|
||||
/** address to place a newly accepted stream - set and unset by
|
||||
grpc_chttp2_parsing_accept_stream; used by init_stream to |
||||
publish the accepted server stream */ |
||||
grpc_chttp2_stream **accepting_stream; |
||||
|
||||
struct { |
||||
/** is a thread currently performing channel callbacks */ |
||||
gpr_uint8 executing; |
||||
/** transport channel-level callback */ |
||||
const grpc_transport_callbacks *cb; |
||||
/** user data for cb calls */ |
||||
void *cb_user_data; |
||||
/** closure for notifying transport closure */ |
||||
grpc_iomgr_closure notify_closed; |
||||
} channel_callback; |
||||
|
||||
#if 0 |
||||
/* basic state management - what are we doing at the moment? */ |
||||
gpr_uint8 reading; |
||||
/** are we calling back any grpc_transport_stream_op completion events */ |
||||
gpr_uint8 calling_back_ops; |
||||
gpr_uint8 destroying; |
||||
gpr_uint8 closed; |
||||
|
||||
/* stream indexing */ |
||||
gpr_uint32 next_stream_id; |
||||
|
||||
/* window management */ |
||||
gpr_uint32 outgoing_window_update; |
||||
|
||||
/* state for a stream that's not yet been created */ |
||||
grpc_stream_op_buffer new_stream_sopb; |
||||
|
||||
/* stream ops that need to be destroyed, but outside of the lock */ |
||||
grpc_stream_op_buffer nuke_later_sopb; |
||||
|
||||
/* pings */ |
||||
gpr_int64 ping_counter; |
||||
|
||||
|
||||
grpc_chttp2_stream **accepting_stream; |
||||
|
||||
#endif |
||||
}; |
||||
|
||||
typedef struct { |
||||
/** HTTP2 stream id for this stream, or zero if one has not been assigned */ |
||||
gpr_uint32 id; |
||||
|
||||
grpc_iomgr_closure *send_done_closure; |
||||
grpc_iomgr_closure *recv_done_closure; |
||||
|
||||
/** window available for us to send to peer */ |
||||
gpr_int64 outgoing_window; |
||||
/** window available for peer to send to us - updated after parse */ |
||||
gpr_uint32 incoming_window; |
||||
/** stream ops the transport user would like to send */ |
||||
grpc_stream_op_buffer *outgoing_sopb; |
||||
/** when the application requests writes be closed, the write_closed is
|
||||
'queued'; when the close is flow controlled into the send path, we are |
||||
'sending' it; when the write has been performed it is 'sent' */ |
||||
grpc_chttp2_write_state write_state; |
||||
/** is this stream closed (boolean) */ |
||||
gpr_uint8 read_closed; |
||||
/** has this stream been cancelled? (boolean) */ |
||||
gpr_uint8 cancelled; |
||||
grpc_status_code cancelled_status; |
||||
/** have we told the upper layer that this stream is cancelled? */ |
||||
gpr_uint8 published_cancelled; |
||||
/** is this stream in the stream map? (boolean) */ |
||||
gpr_uint8 in_stream_map; |
||||
|
||||
/** stream state already published to the upper layer */ |
||||
grpc_stream_state published_state; |
||||
/** address to publish next stream state to */ |
||||
grpc_stream_state *publish_state; |
||||
/** pointer to sop buffer to fill in with new stream ops */ |
||||
grpc_stream_op_buffer *publish_sopb; |
||||
grpc_stream_op_buffer incoming_sopb; |
||||
|
||||
/** incoming metadata */ |
||||
grpc_chttp2_incoming_metadata_buffer incoming_metadata; |
||||
grpc_chttp2_incoming_metadata_live_op_buffer outstanding_metadata; |
||||
} grpc_chttp2_stream_global; |
||||
|
||||
typedef struct { |
||||
/** HTTP2 stream id for this stream, or zero if one has not been assigned */ |
||||
gpr_uint32 id; |
||||
/** sops that have passed flow control to be written */ |
||||
grpc_stream_op_buffer sopb; |
||||
/** how strongly should we indicate closure with the next write */ |
||||
grpc_chttp2_send_closed send_closed; |
||||
} grpc_chttp2_stream_writing; |
||||
|
||||
struct grpc_chttp2_stream_parsing { |
||||
/** HTTP2 stream id for this stream, or zero if one has not been assigned */ |
||||
gpr_uint32 id; |
||||
/** has this stream received a close */ |
||||
gpr_uint8 received_close; |
||||
/** saw a rst_stream */ |
||||
gpr_uint8 saw_rst_stream; |
||||
/** incoming_window has been reduced by this much during parsing */ |
||||
gpr_uint32 incoming_window_delta; |
||||
/** window available for peer to send to us */ |
||||
gpr_uint32 incoming_window; |
||||
/** parsing state for data frames */ |
||||
grpc_chttp2_data_parser data_parser; |
||||
/** reason give to rst_stream */ |
||||
gpr_uint32 rst_stream_reason; |
||||
/* amount of window given */ |
||||
gpr_uint64 outgoing_window_update; |
||||
|
||||
/** incoming metadata */ |
||||
grpc_chttp2_incoming_metadata_buffer incoming_metadata; |
||||
|
||||
/*
|
||||
grpc_linked_mdelem *incoming_metadata; |
||||
size_t incoming_metadata_count; |
||||
size_t incoming_metadata_capacity; |
||||
grpc_linked_mdelem *old_incoming_metadata; |
||||
gpr_timespec incoming_deadline; |
||||
*/ |
||||
}; |
||||
|
||||
struct grpc_chttp2_stream { |
||||
grpc_chttp2_stream_global global; |
||||
grpc_chttp2_stream_writing writing; |
||||
grpc_chttp2_stream_parsing parsing; |
||||
|
||||
grpc_chttp2_stream_link links[STREAM_LIST_COUNT]; |
||||
gpr_uint8 included[STREAM_LIST_COUNT]; |
||||
|
||||
#if 0 |
||||
gpr_uint32 outgoing_window_update; |
||||
gpr_uint8 cancelled; |
||||
|
||||
grpc_stream_state callback_state; |
||||
grpc_stream_op_buffer callback_sopb; |
||||
#endif |
||||
}; |
||||
|
||||
/** Transport writing call flow:
|
||||
chttp2_transport.c calls grpc_chttp2_unlocking_check_writes to see if writes |
||||
are required; |
||||
if they are, chttp2_transport.c calls grpc_chttp2_perform_writes to do the |
||||
writes. |
||||
Once writes have been completed (meaning another write could potentially be |
||||
started), |
||||
grpc_chttp2_terminate_writing is called. This will call |
||||
grpc_chttp2_cleanup_writing, at which |
||||
point the write phase is complete. */ |
||||
|
||||
/** Someone is unlocking the transport mutex: check to see if writes
|
||||
are required, and schedule them if so */ |
||||
int grpc_chttp2_unlocking_check_writes(grpc_chttp2_transport_global *global, |
||||
grpc_chttp2_transport_writing *writing); |
||||
void grpc_chttp2_perform_writes( |
||||
grpc_chttp2_transport_writing *transport_writing, grpc_endpoint *endpoint); |
||||
void grpc_chttp2_terminate_writing( |
||||
grpc_chttp2_transport_writing *transport_writing, int success); |
||||
void grpc_chttp2_cleanup_writing(grpc_chttp2_transport_global *global, |
||||
grpc_chttp2_transport_writing *writing); |
||||
|
||||
void grpc_chttp2_prepare_to_read(grpc_chttp2_transport_global *global, |
||||
grpc_chttp2_transport_parsing *parsing); |
||||
/** Process one slice of incoming data */ |
||||
int grpc_chttp2_perform_read(grpc_chttp2_transport_parsing *transport_parsing, |
||||
gpr_slice slice); |
||||
void grpc_chttp2_publish_reads(grpc_chttp2_transport_global *global, |
||||
grpc_chttp2_transport_parsing *parsing); |
||||
|
||||
/** Get a writable stream
|
||||
\return non-zero if there was a stream available */ |
||||
void grpc_chttp2_list_add_writable_stream( |
||||
grpc_chttp2_transport_global *transport_global, |
||||
grpc_chttp2_stream_global *stream_global); |
||||
int grpc_chttp2_list_pop_writable_stream( |
||||
grpc_chttp2_transport_global *transport_global, |
||||
grpc_chttp2_transport_writing *transport_writing, |
||||
grpc_chttp2_stream_global **stream_global, |
||||
grpc_chttp2_stream_writing **stream_writing); |
||||
|
||||
void grpc_chttp2_list_add_incoming_window_updated( |
||||
grpc_chttp2_transport_global *transport_global, |
||||
grpc_chttp2_stream_global *stream_global); |
||||
int grpc_chttp2_list_pop_incoming_window_updated( |
||||
grpc_chttp2_transport_global *transport_global, |
||||
grpc_chttp2_transport_parsing *transport_parsing, |
||||
grpc_chttp2_stream_global **stream_global, |
||||
grpc_chttp2_stream_parsing **stream_parsing); |
||||
void grpc_chttp2_list_remove_incoming_window_updated( |
||||
grpc_chttp2_transport_global *transport_global, |
||||
grpc_chttp2_stream_global *stream_global); |
||||
|
||||
void grpc_chttp2_list_add_writing_stream( |
||||
grpc_chttp2_transport_writing *transport_writing, |
||||
grpc_chttp2_stream_writing *stream_writing); |
||||
int grpc_chttp2_list_have_writing_streams( |
||||
grpc_chttp2_transport_writing *transport_writing); |
||||
int grpc_chttp2_list_pop_writing_stream( |
||||
grpc_chttp2_transport_writing *transport_writing, |
||||
grpc_chttp2_stream_writing **stream_writing); |
||||
|
||||
void grpc_chttp2_list_add_written_stream( |
||||
grpc_chttp2_transport_writing *transport_writing, |
||||
grpc_chttp2_stream_writing *stream_writing); |
||||
int grpc_chttp2_list_pop_written_stream( |
||||
grpc_chttp2_transport_global *transport_global, |
||||
grpc_chttp2_transport_writing *transport_writing, |
||||
grpc_chttp2_stream_global **stream_global, |
||||
grpc_chttp2_stream_writing **stream_writing); |
||||
|
||||
void grpc_chttp2_list_add_writable_window_update_stream( |
||||
grpc_chttp2_transport_global *transport_global, |
||||
grpc_chttp2_stream_global *stream_global); |
||||
int grpc_chttp2_list_pop_writable_window_update_stream( |
||||
grpc_chttp2_transport_global *transport_global, |
||||
grpc_chttp2_stream_global **stream_global); |
||||
void grpc_chttp2_list_remove_writable_window_update_stream( |
||||
grpc_chttp2_transport_global *transport_global, |
||||
grpc_chttp2_stream_global *stream_global); |
||||
|
||||
void grpc_chttp2_list_add_parsing_seen_stream( |
||||
grpc_chttp2_transport_parsing *transport_parsing, |
||||
grpc_chttp2_stream_parsing *stream_parsing); |
||||
int grpc_chttp2_list_pop_parsing_seen_stream( |
||||
grpc_chttp2_transport_global *transport_global, |
||||
grpc_chttp2_transport_parsing *transport_parsing, |
||||
grpc_chttp2_stream_global **stream_global, |
||||
grpc_chttp2_stream_parsing **stream_parsing); |
||||
|
||||
void grpc_chttp2_list_add_waiting_for_concurrency( |
||||
grpc_chttp2_transport_global *transport_global, |
||||
grpc_chttp2_stream_global *stream_global); |
||||
int grpc_chttp2_list_pop_waiting_for_concurrency( |
||||
grpc_chttp2_transport_global *transport_global, |
||||
grpc_chttp2_stream_global **stream_global); |
||||
|
||||
void grpc_chttp2_list_add_closed_waiting_for_parsing( |
||||
grpc_chttp2_transport_global *transport_global, |
||||
grpc_chttp2_stream_global *stream_global); |
||||
int grpc_chttp2_list_pop_closed_waiting_for_parsing( |
||||
grpc_chttp2_transport_global *transport_global, |
||||
grpc_chttp2_stream_global **stream_global); |
||||
|
||||
void grpc_chttp2_list_add_read_write_state_changed( |
||||
grpc_chttp2_transport_global *transport_global, |
||||
grpc_chttp2_stream_global *stream_global); |
||||
int grpc_chttp2_list_pop_read_write_state_changed( |
||||
grpc_chttp2_transport_global *transport_global, |
||||
grpc_chttp2_stream_global **stream_global); |
||||
|
||||
/** schedule a closure to run without the transport lock taken */ |
||||
void grpc_chttp2_schedule_closure( |
||||
grpc_chttp2_transport_global *transport_global, grpc_iomgr_closure *closure, |
||||
int success); |
||||
|
||||
grpc_chttp2_stream_parsing *grpc_chttp2_parsing_lookup_stream( |
||||
grpc_chttp2_transport_parsing *transport_parsing, gpr_uint32 id); |
||||
grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream( |
||||
grpc_chttp2_transport_parsing *transport_parsing, gpr_uint32 id); |
||||
|
||||
void grpc_chttp2_add_incoming_goaway( |
||||
grpc_chttp2_transport_global *transport_global, gpr_uint32 goaway_error, |
||||
gpr_slice goaway_text); |
||||
|
||||
void grpc_chttp2_register_stream(grpc_chttp2_transport *t, |
||||
grpc_chttp2_stream *s); |
||||
void grpc_chttp2_unregister_stream(grpc_chttp2_transport *t, |
||||
grpc_chttp2_stream *s); |
||||
void grpc_chttp2_for_all_streams( |
||||
grpc_chttp2_transport_global *transport_global, void *user_data, |
||||
void (*cb)(grpc_chttp2_transport_global *transport_global, void *user_data, |
||||
grpc_chttp2_stream_global *stream_global)); |
||||
|
||||
void grpc_chttp2_parsing_become_skip_parser( |
||||
grpc_chttp2_transport_parsing *transport_parsing); |
||||
|
||||
#define GRPC_CHTTP2_CLIENT_CONNECT_STRING "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" |
||||
#define GRPC_CHTTP2_CLIENT_CONNECT_STRLEN \ |
||||
(sizeof(GRPC_CHTTP2_CLIENT_CONNECT_STRING) - 1) |
||||
|
||||
extern int grpc_http_trace; |
||||
extern int grpc_flowctl_trace; |
||||
|
||||
#define IF_TRACING(stmt) \ |
||||
if (!(grpc_http_trace)) \
|
||||
; \
|
||||
else \
|
||||
stmt |
||||
|
||||
#define GRPC_CHTTP2_FLOWCTL_TRACE_STREAM(reason, transport, context, var, \ |
||||
delta) \
|
||||
if (!(grpc_flowctl_trace)) { \
|
||||
} else { \
|
||||
grpc_chttp2_flowctl_trace(__FILE__, __LINE__, reason, #context, #var, \
|
||||
transport->is_client, context->id, context->var, \
|
||||
delta); \
|
||||
} |
||||
|
||||
#define GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT(reason, context, var, delta) \ |
||||
if (!(grpc_flowctl_trace)) { \
|
||||
} else { \
|
||||
grpc_chttp2_flowctl_trace(__FILE__, __LINE__, reason, #context, #var, \
|
||||
context->is_client, 0, context->var, delta); \
|
||||
} |
||||
|
||||
void grpc_chttp2_flowctl_trace(const char *file, int line, const char *reason, |
||||
const char *context, const char *var, |
||||
int is_client, gpr_uint32 stream_id, |
||||
gpr_int64 current_value, gpr_int64 delta); |
||||
|
||||
#endif |
@ -0,0 +1,813 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2015, Google Inc. |
||||
* All rights reserved. |
||||
* |
||||
* Redistribution and use in source and binary forms, with or without |
||||
* modification, are permitted provided that the following conditions are |
||||
* met: |
||||
* |
||||
* * Redistributions of source code must retain the above copyright |
||||
* notice, this list of conditions and the following disclaimer. |
||||
* * Redistributions in binary form must reproduce the above |
||||
* copyright notice, this list of conditions and the following disclaimer |
||||
* in the documentation and/or other materials provided with the |
||||
* distribution. |
||||
* * Neither the name of Google Inc. nor the names of its |
||||
* contributors may be used to endorse or promote products derived from |
||||
* this software without specific prior written permission. |
||||
* |
||||
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
||||
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
||||
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
||||
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
||||
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
||||
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
||||
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
||||
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
||||
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
||||
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
||||
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
||||
* |
||||
*/ |
||||
|
||||
#include "src/core/transport/chttp2/internal.h" |
||||
|
||||
#include <string.h> |
||||
|
||||
#include "src/core/transport/chttp2/http2_errors.h" |
||||
#include "src/core/transport/chttp2/status_conversion.h" |
||||
#include "src/core/transport/chttp2/timeout_encoding.h" |
||||
|
||||
#include <grpc/support/alloc.h> |
||||
#include <grpc/support/log.h> |
||||
|
||||
static int init_frame_parser(grpc_chttp2_transport_parsing *transport_parsing); |
||||
static int init_header_frame_parser( |
||||
grpc_chttp2_transport_parsing *transport_parsing, int is_continuation); |
||||
static int init_data_frame_parser( |
||||
grpc_chttp2_transport_parsing *transport_parsing); |
||||
static int init_rst_stream_parser( |
||||
grpc_chttp2_transport_parsing *transport_parsing); |
||||
static int init_settings_frame_parser( |
||||
grpc_chttp2_transport_parsing *transport_parsing); |
||||
static int init_window_update_frame_parser( |
||||
grpc_chttp2_transport_parsing *transport_parsing); |
||||
static int init_ping_parser(grpc_chttp2_transport_parsing *transport_parsing); |
||||
static int init_goaway_parser(grpc_chttp2_transport_parsing *transport_parsing); |
||||
static int init_skip_frame_parser( |
||||
grpc_chttp2_transport_parsing *transport_parsing, int is_header); |
||||
|
||||
static int parse_frame_slice(grpc_chttp2_transport_parsing *transport_parsing, |
||||
gpr_slice slice, int is_last); |
||||
|
||||
void grpc_chttp2_prepare_to_read( |
||||
grpc_chttp2_transport_global *transport_global, |
||||
grpc_chttp2_transport_parsing *transport_parsing) { |
||||
grpc_chttp2_stream_global *stream_global; |
||||
grpc_chttp2_stream_parsing *stream_parsing; |
||||
|
||||
transport_parsing->next_stream_id = transport_global->next_stream_id; |
||||
|
||||
/* update the parsing view of incoming window */ |
||||
if (transport_parsing->incoming_window != transport_global->incoming_window) { |
||||
GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT( |
||||
"parse", transport_parsing, incoming_window, |
||||
(gpr_int64)transport_global->incoming_window - |
||||
(gpr_int64)transport_parsing->incoming_window); |
||||
transport_parsing->incoming_window = transport_global->incoming_window; |
||||
} |
||||
while (grpc_chttp2_list_pop_incoming_window_updated( |
||||
transport_global, transport_parsing, &stream_global, &stream_parsing)) { |
||||
stream_parsing->id = stream_global->id; |
||||
if (stream_parsing->incoming_window != stream_global->incoming_window) { |
||||
GRPC_CHTTP2_FLOWCTL_TRACE_STREAM( |
||||
"parse", transport_parsing, stream_parsing, incoming_window, |
||||
(gpr_int64)stream_global->incoming_window - |
||||
(gpr_int64)stream_parsing->incoming_window); |
||||
stream_parsing->incoming_window = stream_global->incoming_window; |
||||
} |
||||
} |
||||
} |
||||
|
||||
void grpc_chttp2_publish_reads( |
||||
grpc_chttp2_transport_global *transport_global, |
||||
grpc_chttp2_transport_parsing *transport_parsing) { |
||||
grpc_chttp2_stream_global *stream_global; |
||||
grpc_chttp2_stream_parsing *stream_parsing; |
||||
|
||||
/* transport_parsing->last_incoming_stream_id is used as
|
||||
last-grpc_chttp2_stream-id when |
||||
sending GOAWAY frame. |
||||
https://tools.ietf.org/html/draft-ietf-httpbis-http2-17#section-6.8
|
||||
says that last-grpc_chttp2_stream-id is peer-initiated grpc_chttp2_stream |
||||
ID. So, |
||||
since we don't have server pushed streams, client should send |
||||
GOAWAY last-grpc_chttp2_stream-id=0 in this case. */ |
||||
if (!transport_parsing->is_client) { |
||||
transport_global->last_incoming_stream_id = |
||||
transport_parsing->incoming_stream_id; |
||||
} |
||||
|
||||
/* TODO(ctiller): re-implement */ |
||||
GPR_ASSERT(transport_parsing->initial_window_update == 0); |
||||
|
||||
/* copy parsing qbuf to global qbuf */ |
||||
gpr_slice_buffer_move_into(&transport_parsing->qbuf, &transport_global->qbuf); |
||||
|
||||
/* update global settings */ |
||||
if (transport_parsing->settings_updated) { |
||||
memcpy(transport_global->settings[PEER_SETTINGS], |
||||
transport_parsing->settings, sizeof(transport_parsing->settings)); |
||||
transport_parsing->settings_updated = 0; |
||||
} |
||||
|
||||
/* update settings based on ack if received */ |
||||
if (transport_parsing->settings_ack_received) { |
||||
memcpy(transport_global->settings[ACKED_SETTINGS], |
||||
transport_global->settings[SENT_SETTINGS], |
||||
GRPC_CHTTP2_NUM_SETTINGS * sizeof(gpr_uint32)); |
||||
transport_parsing->settings_ack_received = 0; |
||||
} |
||||
|
||||
/* move goaway to the global state if we received one (it will be
|
||||
published later */ |
||||
if (transport_parsing->goaway_received) { |
||||
grpc_chttp2_add_incoming_goaway(transport_global, |
||||
transport_parsing->goaway_error, |
||||
transport_parsing->goaway_text); |
||||
transport_parsing->goaway_text = gpr_empty_slice(); |
||||
transport_parsing->goaway_received = 0; |
||||
} |
||||
|
||||
/* propagate flow control tokens to global state */ |
||||
if (transport_parsing->outgoing_window_update) { |
||||
GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT( |
||||
"parsed", transport_global, outgoing_window, |
||||
transport_parsing->outgoing_window_update); |
||||
GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT( |
||||
"parsed", transport_parsing, outgoing_window_update, |
||||
-(gpr_int64)transport_parsing->outgoing_window_update); |
||||
transport_global->outgoing_window += |
||||
transport_parsing->outgoing_window_update; |
||||
transport_parsing->outgoing_window_update = 0; |
||||
} |
||||
|
||||
if (transport_parsing->incoming_window_delta) { |
||||
GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT( |
||||
"parsed", transport_global, incoming_window, |
||||
-(gpr_int64)transport_parsing->incoming_window_delta); |
||||
GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT( |
||||
"parsed", transport_parsing, incoming_window_delta, |
||||
-(gpr_int64)transport_parsing->incoming_window_delta); |
||||
transport_global->incoming_window -= |
||||
transport_parsing->incoming_window_delta; |
||||
transport_parsing->incoming_window_delta = 0; |
||||
} |
||||
|
||||
/* for each stream that saw an update, fixup global state */ |
||||
while (grpc_chttp2_list_pop_parsing_seen_stream( |
||||
transport_global, transport_parsing, &stream_global, &stream_parsing)) { |
||||
/* update incoming flow control window */ |
||||
if (stream_parsing->incoming_window_delta) { |
||||
GRPC_CHTTP2_FLOWCTL_TRACE_STREAM( |
||||
"parsed", transport_parsing, stream_global, incoming_window, |
||||
-(gpr_int64)stream_parsing->incoming_window_delta); |
||||
GRPC_CHTTP2_FLOWCTL_TRACE_STREAM( |
||||
"parsed", transport_parsing, stream_parsing, incoming_window_delta, |
||||
-(gpr_int64)stream_parsing->incoming_window_delta); |
||||
stream_global->incoming_window -= stream_parsing->incoming_window_delta; |
||||
stream_parsing->incoming_window_delta = 0; |
||||
grpc_chttp2_list_add_writable_window_update_stream(transport_global, |
||||
stream_global); |
||||
} |
||||
|
||||
/* update outgoing flow control window */ |
||||
if (stream_parsing->outgoing_window_update) { |
||||
int was_zero = stream_global->outgoing_window <= 0; |
||||
int is_zero; |
||||
GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("parsed", transport_parsing, |
||||
stream_global, outgoing_window, |
||||
stream_parsing->outgoing_window_update); |
||||
GRPC_CHTTP2_FLOWCTL_TRACE_STREAM( |
||||
"parsed", transport_parsing, stream_parsing, outgoing_window_update, |
||||
-(gpr_int64)stream_parsing->outgoing_window_update); |
||||
stream_global->outgoing_window += stream_parsing->outgoing_window_update; |
||||
stream_parsing->outgoing_window_update = 0; |
||||
is_zero = stream_global->outgoing_window <= 0; |
||||
if (was_zero && !is_zero) { |
||||
grpc_chttp2_list_add_writable_stream(transport_global, stream_global); |
||||
} |
||||
} |
||||
|
||||
/* updating closed status */ |
||||
if (stream_parsing->received_close) { |
||||
stream_global->read_closed = 1; |
||||
grpc_chttp2_list_add_read_write_state_changed(transport_global, |
||||
stream_global); |
||||
} |
||||
if (stream_parsing->saw_rst_stream) { |
||||
stream_global->cancelled = 1; |
||||
stream_global->cancelled_status = grpc_chttp2_http2_error_to_grpc_status(stream_parsing->rst_stream_reason); |
||||
if (stream_parsing->rst_stream_reason == GRPC_CHTTP2_NO_ERROR) { |
||||
stream_global->published_cancelled = 1; |
||||
} |
||||
grpc_chttp2_list_add_read_write_state_changed(transport_global, |
||||
stream_global); |
||||
} |
||||
|
||||
/* publish incoming stream ops */ |
||||
if (stream_parsing->data_parser.incoming_sopb.nops > 0) { |
||||
grpc_incoming_metadata_buffer_move_to_referencing_sopb( |
||||
&stream_parsing->incoming_metadata, &stream_global->incoming_metadata, |
||||
&stream_parsing->data_parser.incoming_sopb); |
||||
grpc_sopb_move_to(&stream_parsing->data_parser.incoming_sopb, |
||||
&stream_global->incoming_sopb); |
||||
grpc_chttp2_list_add_read_write_state_changed(transport_global, |
||||
stream_global); |
||||
} |
||||
} |
||||
} |
||||
|
||||
int grpc_chttp2_perform_read(grpc_chttp2_transport_parsing *transport_parsing, |
||||
gpr_slice slice) { |
||||
gpr_uint8 *beg = GPR_SLICE_START_PTR(slice); |
||||
gpr_uint8 *end = GPR_SLICE_END_PTR(slice); |
||||
gpr_uint8 *cur = beg; |
||||
|
||||
if (cur == end) return 1; |
||||
|
||||
switch (transport_parsing->deframe_state) { |
||||
case DTS_CLIENT_PREFIX_0: |
||||
case DTS_CLIENT_PREFIX_1: |
||||
case DTS_CLIENT_PREFIX_2: |
||||
case DTS_CLIENT_PREFIX_3: |
||||
case DTS_CLIENT_PREFIX_4: |
||||
case DTS_CLIENT_PREFIX_5: |
||||
case DTS_CLIENT_PREFIX_6: |
||||
case DTS_CLIENT_PREFIX_7: |
||||
case DTS_CLIENT_PREFIX_8: |
||||
case DTS_CLIENT_PREFIX_9: |
||||
case DTS_CLIENT_PREFIX_10: |
||||
case DTS_CLIENT_PREFIX_11: |
||||
case DTS_CLIENT_PREFIX_12: |
||||
case DTS_CLIENT_PREFIX_13: |
||||
case DTS_CLIENT_PREFIX_14: |
||||
case DTS_CLIENT_PREFIX_15: |
||||
case DTS_CLIENT_PREFIX_16: |
||||
case DTS_CLIENT_PREFIX_17: |
||||
case DTS_CLIENT_PREFIX_18: |
||||
case DTS_CLIENT_PREFIX_19: |
||||
case DTS_CLIENT_PREFIX_20: |
||||
case DTS_CLIENT_PREFIX_21: |
||||
case DTS_CLIENT_PREFIX_22: |
||||
case DTS_CLIENT_PREFIX_23: |
||||
while (cur != end && transport_parsing->deframe_state != DTS_FH_0) { |
||||
if (*cur != GRPC_CHTTP2_CLIENT_CONNECT_STRING[transport_parsing |
||||
->deframe_state]) { |
||||
gpr_log(GPR_ERROR, |
||||
"Connect string mismatch: expected '%c' (%d) got '%c' (%d) " |
||||
"at byte %d", |
||||
GRPC_CHTTP2_CLIENT_CONNECT_STRING[transport_parsing |
||||
->deframe_state], |
||||
(int)(gpr_uint8)GRPC_CHTTP2_CLIENT_CONNECT_STRING |
||||
[transport_parsing->deframe_state], |
||||
*cur, (int)*cur, transport_parsing->deframe_state); |
||||
return 0; |
||||
} |
||||
++cur; |
||||
++transport_parsing->deframe_state; |
||||
} |
||||
if (cur == end) { |
||||
return 1; |
||||
} |
||||
/* fallthrough */ |
||||
dts_fh_0: |
||||
case DTS_FH_0: |
||||
GPR_ASSERT(cur < end); |
||||
transport_parsing->incoming_frame_size = ((gpr_uint32)*cur) << 16; |
||||
if (++cur == end) { |
||||
transport_parsing->deframe_state = DTS_FH_1; |
||||
return 1; |
||||
} |
||||
/* fallthrough */ |
||||
case DTS_FH_1: |
||||
GPR_ASSERT(cur < end); |
||||
transport_parsing->incoming_frame_size |= ((gpr_uint32)*cur) << 8; |
||||
if (++cur == end) { |
||||
transport_parsing->deframe_state = DTS_FH_2; |
||||
return 1; |
||||
} |
||||
/* fallthrough */ |
||||
case DTS_FH_2: |
||||
GPR_ASSERT(cur < end); |
||||
transport_parsing->incoming_frame_size |= *cur; |
||||
if (++cur == end) { |
||||
transport_parsing->deframe_state = DTS_FH_3; |
||||
return 1; |
||||
} |
||||
/* fallthrough */ |
||||
case DTS_FH_3: |
||||
GPR_ASSERT(cur < end); |
||||
transport_parsing->incoming_frame_type = *cur; |
||||
if (++cur == end) { |
||||
transport_parsing->deframe_state = DTS_FH_4; |
||||
return 1; |
||||
} |
||||
/* fallthrough */ |
||||
case DTS_FH_4: |
||||
GPR_ASSERT(cur < end); |
||||
transport_parsing->incoming_frame_flags = *cur; |
||||
if (++cur == end) { |
||||
transport_parsing->deframe_state = DTS_FH_5; |
||||
return 1; |
||||
} |
||||
/* fallthrough */ |
||||
case DTS_FH_5: |
||||
GPR_ASSERT(cur < end); |
||||
transport_parsing->incoming_stream_id = (((gpr_uint32)*cur) & 0x7f) << 24; |
||||
if (++cur == end) { |
||||
transport_parsing->deframe_state = DTS_FH_6; |
||||
return 1; |
||||
} |
||||
/* fallthrough */ |
||||
case DTS_FH_6: |
||||
GPR_ASSERT(cur < end); |
||||
transport_parsing->incoming_stream_id |= ((gpr_uint32)*cur) << 16; |
||||
if (++cur == end) { |
||||
transport_parsing->deframe_state = DTS_FH_7; |
||||
return 1; |
||||
} |
||||
/* fallthrough */ |
||||
case DTS_FH_7: |
||||
GPR_ASSERT(cur < end); |
||||
transport_parsing->incoming_stream_id |= ((gpr_uint32)*cur) << 8; |
||||
if (++cur == end) { |
||||
transport_parsing->deframe_state = DTS_FH_8; |
||||
return 1; |
||||
} |
||||
/* fallthrough */ |
||||
case DTS_FH_8: |
||||
GPR_ASSERT(cur < end); |
||||
transport_parsing->incoming_stream_id |= ((gpr_uint32)*cur); |
||||
transport_parsing->deframe_state = DTS_FRAME; |
||||
if (!init_frame_parser(transport_parsing)) { |
||||
return 0; |
||||
} |
||||
if (transport_parsing->incoming_stream_id) { |
||||
transport_parsing->last_incoming_stream_id = |
||||
transport_parsing->incoming_stream_id; |
||||
} |
||||
if (transport_parsing->incoming_frame_size == 0) { |
||||
if (!parse_frame_slice(transport_parsing, gpr_empty_slice(), 1)) { |
||||
return 0; |
||||
} |
||||
transport_parsing->incoming_stream = NULL; |
||||
if (++cur == end) { |
||||
transport_parsing->deframe_state = DTS_FH_0; |
||||
return 1; |
||||
} |
||||
goto dts_fh_0; /* loop */ |
||||
} |
||||
if (++cur == end) { |
||||
return 1; |
||||
} |
||||
/* fallthrough */ |
||||
case DTS_FRAME: |
||||
GPR_ASSERT(cur < end); |
||||
if ((gpr_uint32)(end - cur) == transport_parsing->incoming_frame_size) { |
||||
if (!parse_frame_slice( |
||||
transport_parsing, |
||||
gpr_slice_sub_no_ref(slice, cur - beg, end - beg), 1)) { |
||||
return 0; |
||||
} |
||||
transport_parsing->deframe_state = DTS_FH_0; |
||||
transport_parsing->incoming_stream = NULL; |
||||
return 1; |
||||
} else if ((gpr_uint32)(end - cur) > |
||||
transport_parsing->incoming_frame_size) { |
||||
if (!parse_frame_slice( |
||||
transport_parsing, |
||||
gpr_slice_sub_no_ref( |
||||
slice, cur - beg, |
||||
cur + transport_parsing->incoming_frame_size - beg), |
||||
1)) { |
||||
return 0; |
||||
} |
||||
cur += transport_parsing->incoming_frame_size; |
||||
transport_parsing->incoming_stream = NULL; |
||||
goto dts_fh_0; /* loop */ |
||||
} else { |
||||
if (!parse_frame_slice( |
||||
transport_parsing, |
||||
gpr_slice_sub_no_ref(slice, cur - beg, end - beg), 0)) { |
||||
return 0; |
||||
} |
||||
transport_parsing->incoming_frame_size -= (end - cur); |
||||
return 1; |
||||
} |
||||
gpr_log(GPR_ERROR, "should never reach here"); |
||||
abort(); |
||||
} |
||||
|
||||
gpr_log(GPR_ERROR, "should never reach here"); |
||||
abort(); |
||||
|
||||
return 0; |
||||
} |
||||
|
||||
static int init_frame_parser(grpc_chttp2_transport_parsing *transport_parsing) { |
||||
if (transport_parsing->expect_continuation_stream_id != 0) { |
||||
if (transport_parsing->incoming_frame_type != |
||||
GRPC_CHTTP2_FRAME_CONTINUATION) { |
||||
gpr_log(GPR_ERROR, "Expected CONTINUATION frame, got frame type %02x", |
||||
transport_parsing->incoming_frame_type); |
||||
return 0; |
||||
} |
||||
if (transport_parsing->expect_continuation_stream_id != |
||||
transport_parsing->incoming_stream_id) { |
||||
gpr_log(GPR_ERROR, |
||||
"Expected CONTINUATION frame for grpc_chttp2_stream %08x, got " |
||||
"grpc_chttp2_stream %08x", |
||||
transport_parsing->expect_continuation_stream_id, |
||||
transport_parsing->incoming_stream_id); |
||||
return 0; |
||||
} |
||||
return init_header_frame_parser(transport_parsing, 1); |
||||
} |
||||
switch (transport_parsing->incoming_frame_type) { |
||||
case GRPC_CHTTP2_FRAME_DATA: |
||||
return init_data_frame_parser(transport_parsing); |
||||
case GRPC_CHTTP2_FRAME_HEADER: |
||||
return init_header_frame_parser(transport_parsing, 0); |
||||
case GRPC_CHTTP2_FRAME_CONTINUATION: |
||||
gpr_log(GPR_ERROR, "Unexpected CONTINUATION frame"); |
||||
return 0; |
||||
case GRPC_CHTTP2_FRAME_RST_STREAM: |
||||
return init_rst_stream_parser(transport_parsing); |
||||
case GRPC_CHTTP2_FRAME_SETTINGS: |
||||
return init_settings_frame_parser(transport_parsing); |
||||
case GRPC_CHTTP2_FRAME_WINDOW_UPDATE: |
||||
return init_window_update_frame_parser(transport_parsing); |
||||
case GRPC_CHTTP2_FRAME_PING: |
||||
return init_ping_parser(transport_parsing); |
||||
case GRPC_CHTTP2_FRAME_GOAWAY: |
||||
return init_goaway_parser(transport_parsing); |
||||
default: |
||||
gpr_log(GPR_ERROR, "Unknown frame type %02x", |
||||
transport_parsing->incoming_frame_type); |
||||
return init_skip_frame_parser(transport_parsing, 0); |
||||
} |
||||
} |
||||
|
||||
static grpc_chttp2_parse_error skip_parser( |
||||
void *parser, grpc_chttp2_transport_parsing *transport_parsing, |
||||
grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last) { |
||||
return GRPC_CHTTP2_PARSE_OK; |
||||
} |
||||
|
||||
static void skip_header(void *tp, grpc_mdelem *md) { grpc_mdelem_unref(md); } |
||||
|
||||
static int init_skip_frame_parser( |
||||
grpc_chttp2_transport_parsing *transport_parsing, int is_header) { |
||||
if (is_header) { |
||||
int is_eoh = transport_parsing->expect_continuation_stream_id != 0; |
||||
transport_parsing->parser = grpc_chttp2_header_parser_parse; |
||||
transport_parsing->parser_data = &transport_parsing->hpack_parser; |
||||
transport_parsing->hpack_parser.on_header = skip_header; |
||||
transport_parsing->hpack_parser.on_header_user_data = NULL; |
||||
transport_parsing->hpack_parser.is_boundary = is_eoh; |
||||
transport_parsing->hpack_parser.is_eof = |
||||
is_eoh ? transport_parsing->header_eof : 0; |
||||
} else { |
||||
transport_parsing->parser = skip_parser; |
||||
} |
||||
return 1; |
||||
} |
||||
|
||||
void grpc_chttp2_parsing_become_skip_parser( |
||||
grpc_chttp2_transport_parsing *transport_parsing) { |
||||
init_skip_frame_parser( |
||||
transport_parsing, |
||||
transport_parsing->parser == grpc_chttp2_header_parser_parse); |
||||
} |
||||
|
||||
static grpc_chttp2_parse_error update_incoming_window( |
||||
grpc_chttp2_transport_parsing *transport_parsing, |
||||
grpc_chttp2_stream_parsing *stream_parsing) { |
||||
if (transport_parsing->incoming_frame_size > |
||||
transport_parsing->incoming_window) { |
||||
gpr_log(GPR_ERROR, "frame of size %d overflows incoming window of %d", |
||||
transport_parsing->incoming_frame_size, |
||||
transport_parsing->incoming_window); |
||||
return GRPC_CHTTP2_CONNECTION_ERROR; |
||||
} |
||||
|
||||
if (transport_parsing->incoming_frame_size > |
||||
stream_parsing->incoming_window) { |
||||
gpr_log(GPR_ERROR, "frame of size %d overflows incoming window of %d", |
||||
transport_parsing->incoming_frame_size, |
||||
stream_parsing->incoming_window); |
||||
return GRPC_CHTTP2_CONNECTION_ERROR; |
||||
} |
||||
|
||||
GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT( |
||||
"data", transport_parsing, incoming_window, |
||||
-(gpr_int64)transport_parsing->incoming_frame_size); |
||||
GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT("data", transport_parsing, |
||||
incoming_window_delta, |
||||
transport_parsing->incoming_frame_size); |
||||
GRPC_CHTTP2_FLOWCTL_TRACE_STREAM( |
||||
"data", transport_parsing, stream_parsing, incoming_window, |
||||
-(gpr_int64)transport_parsing->incoming_frame_size); |
||||
GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("data", transport_parsing, stream_parsing, |
||||
incoming_window_delta, |
||||
transport_parsing->incoming_frame_size); |
||||
|
||||
transport_parsing->incoming_window -= transport_parsing->incoming_frame_size; |
||||
transport_parsing->incoming_window_delta += |
||||
transport_parsing->incoming_frame_size; |
||||
stream_parsing->incoming_window -= transport_parsing->incoming_frame_size; |
||||
stream_parsing->incoming_window_delta += |
||||
transport_parsing->incoming_frame_size; |
||||
grpc_chttp2_list_add_parsing_seen_stream(transport_parsing, stream_parsing); |
||||
|
||||
return GRPC_CHTTP2_PARSE_OK; |
||||
} |
||||
|
||||
static int init_data_frame_parser( |
||||
grpc_chttp2_transport_parsing *transport_parsing) { |
||||
grpc_chttp2_stream_parsing *stream_parsing = |
||||
grpc_chttp2_parsing_lookup_stream(transport_parsing, |
||||
transport_parsing->incoming_stream_id); |
||||
grpc_chttp2_parse_error err = GRPC_CHTTP2_PARSE_OK; |
||||
if (!stream_parsing || stream_parsing->received_close) |
||||
return init_skip_frame_parser(transport_parsing, 0); |
||||
if (err == GRPC_CHTTP2_PARSE_OK) { |
||||
err = update_incoming_window(transport_parsing, stream_parsing); |
||||
} |
||||
if (err == GRPC_CHTTP2_PARSE_OK) { |
||||
err = grpc_chttp2_data_parser_begin_frame( |
||||
&stream_parsing->data_parser, transport_parsing->incoming_frame_flags); |
||||
} |
||||
switch (err) { |
||||
case GRPC_CHTTP2_PARSE_OK: |
||||
transport_parsing->incoming_stream = stream_parsing; |
||||
transport_parsing->parser = grpc_chttp2_data_parser_parse; |
||||
transport_parsing->parser_data = &stream_parsing->data_parser; |
||||
return 1; |
||||
case GRPC_CHTTP2_STREAM_ERROR: |
||||
stream_parsing->received_close = 1; |
||||
stream_parsing->saw_rst_stream = 1; |
||||
stream_parsing->rst_stream_reason = GRPC_CHTTP2_PROTOCOL_ERROR; |
||||
gpr_slice_buffer_add( |
||||
&transport_parsing->qbuf, |
||||
grpc_chttp2_rst_stream_create(transport_parsing->incoming_stream_id, |
||||
GRPC_CHTTP2_PROTOCOL_ERROR)); |
||||
return init_skip_frame_parser(transport_parsing, 0); |
||||
case GRPC_CHTTP2_CONNECTION_ERROR: |
||||
return 0; |
||||
} |
||||
gpr_log(GPR_ERROR, "should never reach here"); |
||||
abort(); |
||||
return 0; |
||||
} |
||||
|
||||
static void free_timeout(void *p) { gpr_free(p); } |
||||
|
||||
static void on_header(void *tp, grpc_mdelem *md) { |
||||
grpc_chttp2_transport_parsing *transport_parsing = tp; |
||||
grpc_chttp2_stream_parsing *stream_parsing = |
||||
transport_parsing->incoming_stream; |
||||
|
||||
GPR_ASSERT(stream_parsing); |
||||
|
||||
IF_TRACING(gpr_log(GPR_INFO, "HTTP:%d:HDR: %s: %s", stream_parsing->id, |
||||
transport_parsing->is_client ? "CLI" : "SVR", |
||||
grpc_mdstr_as_c_string(md->key), |
||||
grpc_mdstr_as_c_string(md->value))); |
||||
|
||||
if (md->key == transport_parsing->str_grpc_timeout) { |
||||
gpr_timespec *cached_timeout = grpc_mdelem_get_user_data(md, free_timeout); |
||||
if (!cached_timeout) { |
||||
/* not already parsed: parse it now, and store the result away */ |
||||
cached_timeout = gpr_malloc(sizeof(gpr_timespec)); |
||||
if (!grpc_chttp2_decode_timeout(grpc_mdstr_as_c_string(md->value), |
||||
cached_timeout)) { |
||||
gpr_log(GPR_ERROR, "Ignoring bad timeout value '%s'", |
||||
grpc_mdstr_as_c_string(md->value)); |
||||
*cached_timeout = gpr_inf_future; |
||||
} |
||||
grpc_mdelem_set_user_data(md, free_timeout, cached_timeout); |
||||
} |
||||
grpc_chttp2_incoming_metadata_buffer_set_deadline( |
||||
&stream_parsing->incoming_metadata, |
||||
gpr_time_add(gpr_now(), *cached_timeout)); |
||||
grpc_mdelem_unref(md); |
||||
} else { |
||||
grpc_chttp2_incoming_metadata_buffer_add(&stream_parsing->incoming_metadata, |
||||
md); |
||||
} |
||||
|
||||
grpc_chttp2_list_add_parsing_seen_stream(transport_parsing, stream_parsing); |
||||
} |
||||
|
||||
static int init_header_frame_parser( |
||||
grpc_chttp2_transport_parsing *transport_parsing, int is_continuation) { |
||||
int is_eoh = (transport_parsing->incoming_frame_flags & |
||||
GRPC_CHTTP2_DATA_FLAG_END_HEADERS) != 0; |
||||
int via_accept = 0; |
||||
grpc_chttp2_stream_parsing *stream_parsing; |
||||
|
||||
if (is_eoh) { |
||||
transport_parsing->expect_continuation_stream_id = 0; |
||||
} else { |
||||
transport_parsing->expect_continuation_stream_id = |
||||
transport_parsing->incoming_stream_id; |
||||
} |
||||
|
||||
if (!is_continuation) { |
||||
transport_parsing->header_eof = (transport_parsing->incoming_frame_flags & |
||||
GRPC_CHTTP2_DATA_FLAG_END_STREAM) != 0; |
||||
} |
||||
|
||||
/* could be a new grpc_chttp2_stream or an existing grpc_chttp2_stream */ |
||||
stream_parsing = grpc_chttp2_parsing_lookup_stream( |
||||
transport_parsing, transport_parsing->incoming_stream_id); |
||||
if (stream_parsing == NULL) { |
||||
if (is_continuation) { |
||||
gpr_log(GPR_ERROR, |
||||
"grpc_chttp2_stream disbanded before CONTINUATION received"); |
||||
return init_skip_frame_parser(transport_parsing, 1); |
||||
} |
||||
if (transport_parsing->is_client) { |
||||
if ((transport_parsing->incoming_stream_id & 1) && |
||||
transport_parsing->incoming_stream_id < |
||||
transport_parsing->next_stream_id) { |
||||
/* this is an old (probably cancelled) grpc_chttp2_stream */ |
||||
} else { |
||||
gpr_log(GPR_ERROR, |
||||
"ignoring new grpc_chttp2_stream creation on client"); |
||||
} |
||||
return init_skip_frame_parser(transport_parsing, 1); |
||||
} else if (transport_parsing->last_incoming_stream_id > |
||||
transport_parsing->incoming_stream_id) { |
||||
gpr_log(GPR_ERROR, |
||||
"ignoring out of order new grpc_chttp2_stream request on server; " |
||||
"last grpc_chttp2_stream " |
||||
"id=%d, new grpc_chttp2_stream id=%d", |
||||
transport_parsing->last_incoming_stream_id, |
||||
transport_parsing->incoming_stream_id); |
||||
return init_skip_frame_parser(transport_parsing, 1); |
||||
} else if ((transport_parsing->incoming_stream_id & 1) == 0) { |
||||
gpr_log(GPR_ERROR, |
||||
"ignoring grpc_chttp2_stream with non-client generated index %d", |
||||
transport_parsing->incoming_stream_id); |
||||
return init_skip_frame_parser(transport_parsing, 1); |
||||
} |
||||
stream_parsing = transport_parsing->incoming_stream = |
||||
grpc_chttp2_parsing_accept_stream( |
||||
transport_parsing, transport_parsing->incoming_stream_id); |
||||
if (stream_parsing == NULL) { |
||||
gpr_log(GPR_ERROR, "grpc_chttp2_stream not accepted"); |
||||
return init_skip_frame_parser(transport_parsing, 1); |
||||
} |
||||
via_accept = 1; |
||||
} else { |
||||
transport_parsing->incoming_stream = stream_parsing; |
||||
} |
||||
GPR_ASSERT(stream_parsing != NULL && (via_accept == 0 || via_accept == 1)); |
||||
if (stream_parsing->received_close) { |
||||
gpr_log(GPR_ERROR, "skipping already closed grpc_chttp2_stream header"); |
||||
transport_parsing->incoming_stream = NULL; |
||||
return init_skip_frame_parser(transport_parsing, 1); |
||||
} |
||||
transport_parsing->parser = grpc_chttp2_header_parser_parse; |
||||
transport_parsing->parser_data = &transport_parsing->hpack_parser; |
||||
transport_parsing->hpack_parser.on_header = on_header; |
||||
transport_parsing->hpack_parser.on_header_user_data = transport_parsing; |
||||
transport_parsing->hpack_parser.is_boundary = is_eoh; |
||||
transport_parsing->hpack_parser.is_eof = |
||||
is_eoh ? transport_parsing->header_eof : 0; |
||||
if (!is_continuation && (transport_parsing->incoming_frame_flags & |
||||
GRPC_CHTTP2_FLAG_HAS_PRIORITY)) { |
||||
grpc_chttp2_hpack_parser_set_has_priority(&transport_parsing->hpack_parser); |
||||
} |
||||
return 1; |
||||
} |
||||
|
||||
static int init_window_update_frame_parser( |
||||
grpc_chttp2_transport_parsing *transport_parsing) { |
||||
int ok = GRPC_CHTTP2_PARSE_OK == grpc_chttp2_window_update_parser_begin_frame( |
||||
&transport_parsing->simple.window_update, |
||||
transport_parsing->incoming_frame_size, |
||||
transport_parsing->incoming_frame_flags); |
||||
if (transport_parsing->incoming_stream_id) { |
||||
transport_parsing->incoming_stream = grpc_chttp2_parsing_lookup_stream( |
||||
transport_parsing, transport_parsing->incoming_stream_id); |
||||
} |
||||
transport_parsing->parser = grpc_chttp2_window_update_parser_parse; |
||||
transport_parsing->parser_data = &transport_parsing->simple.window_update; |
||||
return ok; |
||||
} |
||||
|
||||
static int init_ping_parser(grpc_chttp2_transport_parsing *transport_parsing) { |
||||
int ok = GRPC_CHTTP2_PARSE_OK == grpc_chttp2_ping_parser_begin_frame( |
||||
&transport_parsing->simple.ping, |
||||
transport_parsing->incoming_frame_size, |
||||
transport_parsing->incoming_frame_flags); |
||||
transport_parsing->parser = grpc_chttp2_ping_parser_parse; |
||||
transport_parsing->parser_data = &transport_parsing->simple.ping; |
||||
return ok; |
||||
} |
||||
|
||||
static int init_rst_stream_parser( |
||||
grpc_chttp2_transport_parsing *transport_parsing) { |
||||
int ok = GRPC_CHTTP2_PARSE_OK == grpc_chttp2_rst_stream_parser_begin_frame( |
||||
&transport_parsing->simple.rst_stream, |
||||
transport_parsing->incoming_frame_size, |
||||
transport_parsing->incoming_frame_flags); |
||||
transport_parsing->incoming_stream = grpc_chttp2_parsing_lookup_stream( |
||||
transport_parsing, transport_parsing->incoming_stream_id); |
||||
if (!transport_parsing->incoming_stream) { |
||||
return init_skip_frame_parser(transport_parsing, 0); |
||||
} |
||||
transport_parsing->parser = grpc_chttp2_rst_stream_parser_parse; |
||||
transport_parsing->parser_data = &transport_parsing->simple.rst_stream; |
||||
return ok; |
||||
} |
||||
|
||||
static int init_goaway_parser( |
||||
grpc_chttp2_transport_parsing *transport_parsing) { |
||||
int ok = GRPC_CHTTP2_PARSE_OK == grpc_chttp2_goaway_parser_begin_frame( |
||||
&transport_parsing->goaway_parser, |
||||
transport_parsing->incoming_frame_size, |
||||
transport_parsing->incoming_frame_flags); |
||||
transport_parsing->parser = grpc_chttp2_goaway_parser_parse; |
||||
transport_parsing->parser_data = &transport_parsing->goaway_parser; |
||||
return ok; |
||||
} |
||||
|
||||
static int init_settings_frame_parser( |
||||
grpc_chttp2_transport_parsing *transport_parsing) { |
||||
int ok; |
||||
|
||||
if (transport_parsing->incoming_stream_id != 0) { |
||||
gpr_log(GPR_ERROR, "settings frame received for grpc_chttp2_stream %d", |
||||
transport_parsing->incoming_stream_id); |
||||
return 0; |
||||
} |
||||
|
||||
ok = GRPC_CHTTP2_PARSE_OK == grpc_chttp2_settings_parser_begin_frame( |
||||
&transport_parsing->simple.settings, |
||||
transport_parsing->incoming_frame_size, |
||||
transport_parsing->incoming_frame_flags, |
||||
transport_parsing->settings); |
||||
if (!ok) { |
||||
return 0; |
||||
} |
||||
if (transport_parsing->incoming_frame_flags & GRPC_CHTTP2_FLAG_ACK) { |
||||
transport_parsing->settings_ack_received = 1; |
||||
} |
||||
transport_parsing->parser = grpc_chttp2_settings_parser_parse; |
||||
transport_parsing->parser_data = &transport_parsing->simple.settings; |
||||
return ok; |
||||
} |
||||
|
||||
/*
|
||||
static int is_window_update_legal(gpr_int64 window_update, gpr_int64 window) { |
||||
return window + window_update < MAX_WINDOW; |
||||
} |
||||
*/ |
||||
|
||||
static int parse_frame_slice(grpc_chttp2_transport_parsing *transport_parsing, |
||||
gpr_slice slice, int is_last) { |
||||
grpc_chttp2_stream_parsing *stream_parsing = |
||||
transport_parsing->incoming_stream; |
||||
switch (transport_parsing->parser(transport_parsing->parser_data, |
||||
transport_parsing, stream_parsing, slice, |
||||
is_last)) { |
||||
case GRPC_CHTTP2_PARSE_OK: |
||||
if (stream_parsing) { |
||||
grpc_chttp2_list_add_parsing_seen_stream(transport_parsing, |
||||
stream_parsing); |
||||
} |
||||
return 1; |
||||
case GRPC_CHTTP2_STREAM_ERROR: |
||||
grpc_chttp2_parsing_become_skip_parser(transport_parsing); |
||||
if (stream_parsing) { |
||||
stream_parsing->saw_rst_stream = 1; |
||||
stream_parsing->rst_stream_reason = GRPC_CHTTP2_PROTOCOL_ERROR; |
||||
gpr_slice_buffer_add( |
||||
&transport_parsing->qbuf, |
||||
grpc_chttp2_rst_stream_create(transport_parsing->incoming_stream_id, |
||||
GRPC_CHTTP2_PROTOCOL_ERROR)); |
||||
} |
||||
return 1; |
||||
case GRPC_CHTTP2_CONNECTION_ERROR: |
||||
return 0; |
||||
} |
||||
gpr_log(GPR_ERROR, "should never reach here"); |
||||
abort(); |
||||
return 0; |
||||
} |
@ -0,0 +1,353 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2015, Google Inc. |
||||
* All rights reserved. |
||||
* |
||||
* Redistribution and use in source and binary forms, with or without |
||||
* modification, are permitted provided that the following conditions are |
||||
* met: |
||||
* |
||||
* * Redistributions of source code must retain the above copyright |
||||
* notice, this list of conditions and the following disclaimer. |
||||
* * Redistributions in binary form must reproduce the above |
||||
* copyright notice, this list of conditions and the following disclaimer |
||||
* in the documentation and/or other materials provided with the |
||||
* distribution. |
||||
* * Neither the name of Google Inc. nor the names of its |
||||
* contributors may be used to endorse or promote products derived from |
||||
* this software without specific prior written permission. |
||||
* |
||||
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
||||
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
||||
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
||||
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
||||
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
||||
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
||||
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
||||
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
||||
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
||||
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
||||
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
||||
* |
||||
*/ |
||||
|
||||
#include "src/core/transport/chttp2/internal.h" |
||||
|
||||
#include <grpc/support/log.h> |
||||
|
||||
#define TRANSPORT_FROM_GLOBAL(tg) \ |
||||
((grpc_chttp2_transport *)((char *)(tg)-offsetof(grpc_chttp2_transport, \
|
||||
global))) |
||||
|
||||
#define STREAM_FROM_GLOBAL(sg) \ |
||||
((grpc_chttp2_stream *)((char *)(sg)-offsetof(grpc_chttp2_stream, global))) |
||||
|
||||
#define TRANSPORT_FROM_WRITING(tw) \ |
||||
((grpc_chttp2_transport *)((char *)(tw)-offsetof(grpc_chttp2_transport, \
|
||||
writing))) |
||||
|
||||
#define STREAM_FROM_WRITING(sw) \ |
||||
((grpc_chttp2_stream *)((char *)(sw)-offsetof(grpc_chttp2_stream, writing))) |
||||
|
||||
#define TRANSPORT_FROM_PARSING(tp) \ |
||||
((grpc_chttp2_transport *)((char *)(tp)-offsetof(grpc_chttp2_transport, \
|
||||
parsing))) |
||||
|
||||
#define STREAM_FROM_PARSING(sp) \ |
||||
((grpc_chttp2_stream *)((char *)(sp)-offsetof(grpc_chttp2_stream, parsing))) |
||||
|
||||
/* core list management */ |
||||
|
||||
static int stream_list_empty(grpc_chttp2_transport *t, |
||||
grpc_chttp2_stream_list_id id) { |
||||
return t->lists[id].head == NULL; |
||||
} |
||||
|
||||
static int stream_list_pop(grpc_chttp2_transport *t, |
||||
grpc_chttp2_stream **stream, |
||||
grpc_chttp2_stream_list_id id) { |
||||
grpc_chttp2_stream *s = t->lists[id].head; |
||||
if (s) { |
||||
grpc_chttp2_stream *new_head = s->links[id].next; |
||||
GPR_ASSERT(s->included[id]); |
||||
if (new_head) { |
||||
t->lists[id].head = new_head; |
||||
new_head->links[id].prev = NULL; |
||||
} else { |
||||
t->lists[id].head = NULL; |
||||
t->lists[id].tail = NULL; |
||||
} |
||||
s->included[id] = 0; |
||||
} |
||||
*stream = s; |
||||
return s != 0; |
||||
} |
||||
|
||||
static void stream_list_remove(grpc_chttp2_transport *t, grpc_chttp2_stream *s, |
||||
grpc_chttp2_stream_list_id id) { |
||||
GPR_ASSERT(s->included[id]); |
||||
s->included[id] = 0; |
||||
if (s->links[id].prev) { |
||||
s->links[id].prev->links[id].next = s->links[id].next; |
||||
} else { |
||||
GPR_ASSERT(t->lists[id].head == s); |
||||
t->lists[id].head = s->links[id].next; |
||||
} |
||||
if (s->links[id].next) { |
||||
s->links[id].next->links[id].prev = s->links[id].prev; |
||||
} else { |
||||
t->lists[id].tail = s->links[id].prev; |
||||
} |
||||
} |
||||
|
||||
static void stream_list_maybe_remove(grpc_chttp2_transport *t, |
||||
grpc_chttp2_stream *s, |
||||
grpc_chttp2_stream_list_id id) { |
||||
if (s->included[id]) { |
||||
stream_list_remove(t, s, id); |
||||
} |
||||
} |
||||
|
||||
static void stream_list_add_tail(grpc_chttp2_transport *t, |
||||
grpc_chttp2_stream *s, |
||||
grpc_chttp2_stream_list_id id) { |
||||
grpc_chttp2_stream *old_tail; |
||||
GPR_ASSERT(!s->included[id]); |
||||
old_tail = t->lists[id].tail; |
||||
s->links[id].next = NULL; |
||||
s->links[id].prev = old_tail; |
||||
if (old_tail) { |
||||
old_tail->links[id].next = s; |
||||
} else { |
||||
s->links[id].prev = NULL; |
||||
t->lists[id].head = s; |
||||
} |
||||
t->lists[id].tail = s; |
||||
s->included[id] = 1; |
||||
} |
||||
|
||||
static void stream_list_add(grpc_chttp2_transport *t, grpc_chttp2_stream *s, |
||||
grpc_chttp2_stream_list_id id) { |
||||
if (s->included[id]) { |
||||
return; |
||||
} |
||||
stream_list_add_tail(t, s, id); |
||||
} |
||||
|
||||
/* wrappers for specializations */ |
||||
|
||||
void grpc_chttp2_list_add_writable_stream( |
||||
grpc_chttp2_transport_global *transport_global, |
||||
grpc_chttp2_stream_global *stream_global) { |
||||
stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global), |
||||
STREAM_FROM_GLOBAL(stream_global), GRPC_CHTTP2_LIST_WRITABLE); |
||||
} |
||||
|
||||
int grpc_chttp2_list_pop_writable_stream( |
||||
grpc_chttp2_transport_global *transport_global, |
||||
grpc_chttp2_transport_writing *transport_writing, |
||||
grpc_chttp2_stream_global **stream_global, |
||||
grpc_chttp2_stream_writing **stream_writing) { |
||||
grpc_chttp2_stream *stream; |
||||
int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream, |
||||
GRPC_CHTTP2_LIST_WRITABLE); |
||||
*stream_global = &stream->global; |
||||
*stream_writing = &stream->writing; |
||||
return r; |
||||
} |
||||
|
||||
void grpc_chttp2_list_add_writing_stream( |
||||
grpc_chttp2_transport_writing *transport_writing, |
||||
grpc_chttp2_stream_writing *stream_writing) { |
||||
stream_list_add(TRANSPORT_FROM_WRITING(transport_writing), |
||||
STREAM_FROM_WRITING(stream_writing), |
||||
GRPC_CHTTP2_LIST_WRITING); |
||||
} |
||||
|
||||
int grpc_chttp2_list_have_writing_streams( |
||||
grpc_chttp2_transport_writing *transport_writing) { |
||||
return !stream_list_empty(TRANSPORT_FROM_WRITING(transport_writing), |
||||
GRPC_CHTTP2_LIST_WRITING); |
||||
} |
||||
|
||||
int grpc_chttp2_list_pop_writing_stream( |
||||
grpc_chttp2_transport_writing *transport_writing, |
||||
grpc_chttp2_stream_writing **stream_writing) { |
||||
grpc_chttp2_stream *stream; |
||||
int r = stream_list_pop(TRANSPORT_FROM_WRITING(transport_writing), &stream, |
||||
GRPC_CHTTP2_LIST_WRITING); |
||||
*stream_writing = &stream->writing; |
||||
return r; |
||||
} |
||||
|
||||
void grpc_chttp2_list_add_written_stream( |
||||
grpc_chttp2_transport_writing *transport_writing, |
||||
grpc_chttp2_stream_writing *stream_writing) { |
||||
stream_list_add(TRANSPORT_FROM_WRITING(transport_writing), |
||||
STREAM_FROM_WRITING(stream_writing), |
||||
GRPC_CHTTP2_LIST_WRITTEN); |
||||
} |
||||
|
||||
int grpc_chttp2_list_pop_written_stream( |
||||
grpc_chttp2_transport_global *transport_global, |
||||
grpc_chttp2_transport_writing *transport_writing, |
||||
grpc_chttp2_stream_global **stream_global, |
||||
grpc_chttp2_stream_writing **stream_writing) { |
||||
grpc_chttp2_stream *stream; |
||||
int r = stream_list_pop(TRANSPORT_FROM_WRITING(transport_writing), &stream, |
||||
GRPC_CHTTP2_LIST_WRITTEN); |
||||
*stream_global = &stream->global; |
||||
*stream_writing = &stream->writing; |
||||
return r; |
||||
} |
||||
|
||||
void grpc_chttp2_list_add_writable_window_update_stream( |
||||
grpc_chttp2_transport_global *transport_global, |
||||
grpc_chttp2_stream_global *stream_global) { |
||||
stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global), |
||||
STREAM_FROM_GLOBAL(stream_global), |
||||
GRPC_CHTTP2_LIST_WRITABLE_WINDOW_UPDATE); |
||||
} |
||||
|
||||
int grpc_chttp2_list_pop_writable_window_update_stream( |
||||
grpc_chttp2_transport_global *transport_global, |
||||
grpc_chttp2_stream_global **stream_global) { |
||||
grpc_chttp2_stream *stream; |
||||
int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream, |
||||
GRPC_CHTTP2_LIST_WRITABLE_WINDOW_UPDATE); |
||||
*stream_global = &stream->global; |
||||
return r; |
||||
} |
||||
|
||||
void grpc_chttp2_list_remove_writable_window_update_stream( |
||||
grpc_chttp2_transport_global *transport_global, |
||||
grpc_chttp2_stream_global *stream_global) { |
||||
stream_list_maybe_remove(TRANSPORT_FROM_GLOBAL(transport_global), STREAM_FROM_GLOBAL(stream_global), GRPC_CHTTP2_LIST_WRITABLE_WINDOW_UPDATE); |
||||
} |
||||
|
||||
void grpc_chttp2_list_add_parsing_seen_stream( |
||||
grpc_chttp2_transport_parsing *transport_parsing, |
||||
grpc_chttp2_stream_parsing *stream_parsing) { |
||||
stream_list_add(TRANSPORT_FROM_PARSING(transport_parsing), |
||||
STREAM_FROM_PARSING(stream_parsing), |
||||
GRPC_CHTTP2_LIST_PARSING_SEEN); |
||||
} |
||||
|
||||
int grpc_chttp2_list_pop_parsing_seen_stream( |
||||
grpc_chttp2_transport_global *transport_global, |
||||
grpc_chttp2_transport_parsing *transport_parsing, |
||||
grpc_chttp2_stream_global **stream_global, |
||||
grpc_chttp2_stream_parsing **stream_parsing) { |
||||
grpc_chttp2_stream *stream; |
||||
int r = stream_list_pop(TRANSPORT_FROM_PARSING(transport_parsing), &stream, |
||||
GRPC_CHTTP2_LIST_PARSING_SEEN); |
||||
*stream_global = &stream->global; |
||||
*stream_parsing = &stream->parsing; |
||||
return r; |
||||
} |
||||
|
||||
void grpc_chttp2_list_add_waiting_for_concurrency( |
||||
grpc_chttp2_transport_global *transport_global, |
||||
grpc_chttp2_stream_global *stream_global) { |
||||
stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global), |
||||
STREAM_FROM_GLOBAL(stream_global), |
||||
GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY); |
||||
} |
||||
|
||||
int grpc_chttp2_list_pop_waiting_for_concurrency( |
||||
grpc_chttp2_transport_global *transport_global, |
||||
grpc_chttp2_stream_global **stream_global) { |
||||
grpc_chttp2_stream *stream; |
||||
int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream, |
||||
GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY); |
||||
*stream_global = &stream->global; |
||||
return r; |
||||
} |
||||
|
||||
void grpc_chttp2_list_add_closed_waiting_for_parsing( |
||||
grpc_chttp2_transport_global *transport_global, |
||||
grpc_chttp2_stream_global *stream_global) { |
||||
stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global), |
||||
STREAM_FROM_GLOBAL(stream_global), |
||||
GRPC_CHTTP2_LIST_CLOSED_WAITING_FOR_PARSING); |
||||
} |
||||
|
||||
int grpc_chttp2_list_pop_closed_waiting_for_parsing( |
||||
grpc_chttp2_transport_global *transport_global, |
||||
grpc_chttp2_stream_global **stream_global) { |
||||
grpc_chttp2_stream *stream; |
||||
int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream, |
||||
GRPC_CHTTP2_LIST_CLOSED_WAITING_FOR_PARSING); |
||||
*stream_global = &stream->global; |
||||
return r; |
||||
} |
||||
|
||||
void grpc_chttp2_list_add_incoming_window_updated( |
||||
grpc_chttp2_transport_global *transport_global, |
||||
grpc_chttp2_stream_global *stream_global) { |
||||
stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global), |
||||
STREAM_FROM_GLOBAL(stream_global), |
||||
GRPC_CHTTP2_LIST_INCOMING_WINDOW_UPDATED); |
||||
} |
||||
|
||||
int grpc_chttp2_list_pop_incoming_window_updated( |
||||
grpc_chttp2_transport_global *transport_global, |
||||
grpc_chttp2_transport_parsing *transport_parsing, |
||||
grpc_chttp2_stream_global **stream_global, |
||||
grpc_chttp2_stream_parsing **stream_parsing) { |
||||
grpc_chttp2_stream *stream; |
||||
int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream, |
||||
GRPC_CHTTP2_LIST_INCOMING_WINDOW_UPDATED); |
||||
*stream_global = &stream->global; |
||||
*stream_parsing = &stream->parsing; |
||||
return r; |
||||
} |
||||
|
||||
void grpc_chttp2_list_remove_incoming_window_updated( |
||||
grpc_chttp2_transport_global *transport_global, |
||||
grpc_chttp2_stream_global *stream_global) { |
||||
stream_list_maybe_remove(TRANSPORT_FROM_GLOBAL(transport_global), |
||||
STREAM_FROM_GLOBAL(stream_global), |
||||
GRPC_CHTTP2_LIST_INCOMING_WINDOW_UPDATED); |
||||
} |
||||
|
||||
void grpc_chttp2_list_add_read_write_state_changed( |
||||
grpc_chttp2_transport_global *transport_global, |
||||
grpc_chttp2_stream_global *stream_global) { |
||||
stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global), |
||||
STREAM_FROM_GLOBAL(stream_global), |
||||
GRPC_CHTTP2_LIST_READ_WRITE_STATE_CHANGED); |
||||
} |
||||
|
||||
int grpc_chttp2_list_pop_read_write_state_changed( |
||||
grpc_chttp2_transport_global *transport_global, |
||||
grpc_chttp2_stream_global **stream_global) { |
||||
grpc_chttp2_stream *stream; |
||||
int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream, |
||||
GRPC_CHTTP2_LIST_READ_WRITE_STATE_CHANGED); |
||||
*stream_global = &stream->global; |
||||
return r; |
||||
} |
||||
|
||||
void grpc_chttp2_register_stream(grpc_chttp2_transport *t, |
||||
grpc_chttp2_stream *s) { |
||||
stream_list_add_tail(t, s, GRPC_CHTTP2_LIST_ALL_STREAMS); |
||||
} |
||||
|
||||
void grpc_chttp2_unregister_stream(grpc_chttp2_transport *t, |
||||
grpc_chttp2_stream *s) { |
||||
stream_list_remove(t, s, GRPC_CHTTP2_LIST_ALL_STREAMS); |
||||
} |
||||
|
||||
void grpc_chttp2_for_all_streams( |
||||
grpc_chttp2_transport_global *transport_global, void *user_data, |
||||
void (*cb)(grpc_chttp2_transport_global *transport_global, void *user_data, |
||||
grpc_chttp2_stream_global *stream_global)) { |
||||
grpc_chttp2_stream *s; |
||||
for (s = TRANSPORT_FROM_GLOBAL(transport_global) |
||||
->lists[GRPC_CHTTP2_LIST_ALL_STREAMS] |
||||
.head; |
||||
s; s = s->links[GRPC_CHTTP2_LIST_ALL_STREAMS].next) { |
||||
cb(transport_global, user_data, &s->global); |
||||
} |
||||
} |
@ -0,0 +1,215 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2015, Google Inc. |
||||
* All rights reserved. |
||||
* |
||||
* Redistribution and use in source and binary forms, with or without |
||||
* modification, are permitted provided that the following conditions are |
||||
* met: |
||||
* |
||||
* * Redistributions of source code must retain the above copyright |
||||
* notice, this list of conditions and the following disclaimer. |
||||
* * Redistributions in binary form must reproduce the above |
||||
* copyright notice, this list of conditions and the following disclaimer |
||||
* in the documentation and/or other materials provided with the |
||||
* distribution. |
||||
* * Neither the name of Google Inc. nor the names of its |
||||
* contributors may be used to endorse or promote products derived from |
||||
* this software without specific prior written permission. |
||||
* |
||||
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
||||
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
||||
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
||||
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
||||
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
||||
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
||||
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
||||
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
||||
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
||||
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
||||
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
||||
* |
||||
*/ |
||||
|
||||
#include "src/core/transport/chttp2/internal.h" |
||||
#include "src/core/transport/chttp2/http2_errors.h" |
||||
|
||||
#include <grpc/support/log.h> |
||||
|
||||
static void finalize_outbuf(grpc_chttp2_transport_writing *transport_writing); |
||||
static void finish_write_cb(void *tw, grpc_endpoint_cb_status write_status); |
||||
|
||||
int grpc_chttp2_unlocking_check_writes( |
||||
grpc_chttp2_transport_global *transport_global, |
||||
grpc_chttp2_transport_writing *transport_writing) { |
||||
grpc_chttp2_stream_global *stream_global; |
||||
grpc_chttp2_stream_writing *stream_writing; |
||||
gpr_uint32 window_delta; |
||||
|
||||
/* simple writes are queued to qbuf, and flushed here */ |
||||
gpr_slice_buffer_swap(&transport_global->qbuf, &transport_writing->outbuf); |
||||
GPR_ASSERT(transport_global->qbuf.count == 0); |
||||
|
||||
if (transport_global->dirtied_local_settings && |
||||
!transport_global->sent_local_settings) { |
||||
gpr_slice_buffer_add( |
||||
&transport_writing->outbuf, |
||||
grpc_chttp2_settings_create(transport_global->settings[SENT_SETTINGS], |
||||
transport_global->settings[LOCAL_SETTINGS], |
||||
transport_global->force_send_settings, |
||||
GRPC_CHTTP2_NUM_SETTINGS)); |
||||
transport_global->force_send_settings = 0; |
||||
transport_global->dirtied_local_settings = 0; |
||||
transport_global->sent_local_settings = 1; |
||||
} |
||||
|
||||
/* for each grpc_chttp2_stream that's become writable, frame it's data
|
||||
(according to |
||||
available window sizes) and add to the output buffer */ |
||||
while (transport_global->outgoing_window && |
||||
grpc_chttp2_list_pop_writable_stream(transport_global, |
||||
transport_writing, &stream_global, |
||||
&stream_writing) && |
||||
stream_global->outgoing_window > 0) { |
||||
stream_writing->id = stream_global->id; |
||||
window_delta = grpc_chttp2_preencode( |
||||
stream_global->outgoing_sopb->ops, &stream_global->outgoing_sopb->nops, |
||||
GPR_MIN(transport_global->outgoing_window, |
||||
stream_global->outgoing_window), |
||||
&stream_writing->sopb); |
||||
GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT( |
||||
"write", transport_global, outgoing_window, -(gpr_int64)window_delta); |
||||
GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("write", transport_global, stream_global, |
||||
outgoing_window, -(gpr_int64)window_delta); |
||||
transport_global->outgoing_window -= window_delta; |
||||
stream_global->outgoing_window -= window_delta; |
||||
|
||||
if (stream_global->write_state == WRITE_STATE_QUEUED_CLOSE && |
||||
stream_global->outgoing_sopb->nops == 0) { |
||||
if (!transport_global->is_client && !stream_global->read_closed) { |
||||
stream_writing->send_closed = SEND_CLOSED_WITH_RST_STREAM; |
||||
} else { |
||||
stream_writing->send_closed = SEND_CLOSED; |
||||
} |
||||
} |
||||
if (stream_writing->sopb.nops > 0 || |
||||
stream_writing->send_closed != DONT_SEND_CLOSED) { |
||||
grpc_chttp2_list_add_writing_stream(transport_writing, stream_writing); |
||||
} |
||||
|
||||
/* we should either exhaust window or have no ops left, but not both */ |
||||
if (stream_global->outgoing_sopb->nops == 0) { |
||||
stream_global->outgoing_sopb = NULL; |
||||
grpc_chttp2_schedule_closure(transport_global, |
||||
stream_global->send_done_closure, 1); |
||||
} else if (stream_global->outgoing_window > 0) { |
||||
grpc_chttp2_list_add_writable_stream(transport_global, stream_global); |
||||
} |
||||
} |
||||
|
||||
/* for each grpc_chttp2_stream that wants to update its window, add that
|
||||
* window here */ |
||||
while (grpc_chttp2_list_pop_writable_window_update_stream(transport_global, |
||||
&stream_global)) { |
||||
window_delta = |
||||
transport_global->settings[LOCAL_SETTINGS] |
||||
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] - |
||||
stream_global->incoming_window; |
||||
if (!stream_global->read_closed && window_delta > 0) { |
||||
gpr_slice_buffer_add( |
||||
&transport_writing->outbuf, |
||||
grpc_chttp2_window_update_create(stream_global->id, window_delta)); |
||||
GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("write", transport_global, stream_global, |
||||
incoming_window, window_delta); |
||||
stream_global->incoming_window += window_delta; |
||||
grpc_chttp2_list_add_incoming_window_updated(transport_global, |
||||
stream_global); |
||||
} |
||||
} |
||||
|
||||
/* if the grpc_chttp2_transport is ready to send a window update, do so here
|
||||
* also */ |
||||
if (transport_global->incoming_window < |
||||
transport_global->connection_window_target * 3 / 4) { |
||||
window_delta = transport_global->connection_window_target - |
||||
transport_global->incoming_window; |
||||
gpr_slice_buffer_add(&transport_writing->outbuf, |
||||
grpc_chttp2_window_update_create(0, window_delta)); |
||||
GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT("write", transport_global, |
||||
incoming_window, window_delta); |
||||
transport_global->incoming_window += window_delta; |
||||
} |
||||
|
||||
return transport_writing->outbuf.count > 0 || |
||||
grpc_chttp2_list_have_writing_streams(transport_writing); |
||||
} |
||||
|
||||
void grpc_chttp2_perform_writes( |
||||
grpc_chttp2_transport_writing *transport_writing, grpc_endpoint *endpoint) { |
||||
GPR_ASSERT(transport_writing->outbuf.count > 0 || |
||||
grpc_chttp2_list_have_writing_streams(transport_writing)); |
||||
|
||||
finalize_outbuf(transport_writing); |
||||
|
||||
GPR_ASSERT(transport_writing->outbuf.count > 0); |
||||
GPR_ASSERT(endpoint); |
||||
|
||||
switch (grpc_endpoint_write(endpoint, transport_writing->outbuf.slices, |
||||
transport_writing->outbuf.count, finish_write_cb, |
||||
transport_writing)) { |
||||
case GRPC_ENDPOINT_WRITE_DONE: |
||||
grpc_chttp2_terminate_writing(transport_writing, 1); |
||||
break; |
||||
case GRPC_ENDPOINT_WRITE_ERROR: |
||||
grpc_chttp2_terminate_writing(transport_writing, 0); |
||||
break; |
||||
case GRPC_ENDPOINT_WRITE_PENDING: |
||||
break; |
||||
} |
||||
} |
||||
|
||||
static void finalize_outbuf(grpc_chttp2_transport_writing *transport_writing) { |
||||
grpc_chttp2_stream_writing *stream_writing; |
||||
|
||||
while ( |
||||
grpc_chttp2_list_pop_writing_stream(transport_writing, &stream_writing)) { |
||||
grpc_chttp2_encode(stream_writing->sopb.ops, stream_writing->sopb.nops, |
||||
stream_writing->send_closed != DONT_SEND_CLOSED, |
||||
stream_writing->id, &transport_writing->hpack_compressor, |
||||
&transport_writing->outbuf); |
||||
stream_writing->sopb.nops = 0; |
||||
if (stream_writing->send_closed == SEND_CLOSED_WITH_RST_STREAM) { |
||||
gpr_slice_buffer_add(&transport_writing->outbuf, |
||||
grpc_chttp2_rst_stream_create(stream_writing->id, |
||||
GRPC_CHTTP2_NO_ERROR)); |
||||
} |
||||
grpc_chttp2_list_add_written_stream(transport_writing, stream_writing); |
||||
} |
||||
} |
||||
|
||||
static void finish_write_cb(void *tw, grpc_endpoint_cb_status write_status) { |
||||
grpc_chttp2_transport_writing *transport_writing = tw; |
||||
grpc_chttp2_terminate_writing(transport_writing, |
||||
write_status == GRPC_ENDPOINT_CB_OK); |
||||
} |
||||
|
||||
void grpc_chttp2_cleanup_writing( |
||||
grpc_chttp2_transport_global *transport_global, |
||||
grpc_chttp2_transport_writing *transport_writing) { |
||||
grpc_chttp2_stream_writing *stream_writing; |
||||
grpc_chttp2_stream_global *stream_global; |
||||
|
||||
while (grpc_chttp2_list_pop_written_stream( |
||||
transport_global, transport_writing, &stream_global, &stream_writing)) { |
||||
if (stream_writing->send_closed != DONT_SEND_CLOSED) { |
||||
stream_global->write_state = WRITE_STATE_SENT_CLOSE; |
||||
if (!transport_global->is_client) { |
||||
stream_global->read_closed = 1; |
||||
} |
||||
grpc_chttp2_list_add_read_write_state_changed(transport_global, |
||||
stream_global); |
||||
} |
||||
} |
||||
transport_writing->outbuf.count = 0; |
||||
transport_writing->outbuf.length = 0; |
||||
} |
File diff suppressed because it is too large
Load Diff
Loading…
Reference in new issue