pull/2149/head
Craig Tiller 10 years ago
parent 1911c3b2c4
commit 3208e3922a
  1. 14
      src/core/transport/chttp2/internal.h
  2. 90
      src/core/transport/chttp2/writing.c
  3. 100
      src/core/transport/chttp2_transport.c

@ -35,6 +35,16 @@
#define GRPC_INTERNAL_CORE_CHTTP2_INTERNAL_H
#include "src/core/transport/transport_impl.h"
#include "src/core/iomgr/endpoint.h"
#include "src/core/transport/chttp2/frame_data.h"
#include "src/core/transport/chttp2/frame_goaway.h"
#include "src/core/transport/chttp2/frame_ping.h"
#include "src/core/transport/chttp2/frame_rst_stream.h"
#include "src/core/transport/chttp2/frame_settings.h"
#include "src/core/transport/chttp2/frame_window_update.h"
#include "src/core/transport/chttp2/stream_map.h"
#include "src/core/transport/chttp2/hpack_parser.h"
#include "src/core/transport/chttp2/stream_encoder.h"
typedef struct grpc_chttp2_transport grpc_chttp2_transport;
typedef struct grpc_chttp2_stream grpc_chttp2_stream;
@ -336,4 +346,8 @@ struct grpc_chttp2_stream {
grpc_stream_op_buffer callback_sopb;
};
/** Someone is unlocking the transport mutex: check to see if writes
are required, and schedule them if so */
void grpc_chttp2_unlocking_check_writes(grpc_chttp2_transport_global *global, grpc_chttp2_transport_writing *writing);
#endif

@ -32,3 +32,93 @@
*/
#include "src/core/transport/chttp2/internal.h"
#include <grpc/support/log.h>
static void grpc_chttp2_unlocking_check_writes(grpc_chttp2_transport *t) {
grpc_chttp2_stream *s;
gpr_uint32 window_delta;
/* don't do anything if we are already writing */
if (t->writing.executing) {
return;
}
/* simple writes are queued to qbuf, and flushed here */
gpr_slice_buffer_swap(&t->global.qbuf, &t->writing.outbuf);
GPR_ASSERT(t->global.qbuf.count == 0);
if (t->dirtied_local_settings && !t->sent_local_settings) {
gpr_slice_buffer_add(
&t->writing.outbuf, grpc_chttp2_settings_create(
t->settings[SENT_SETTINGS], t->settings[LOCAL_SETTINGS],
t->force_send_settings, GRPC_CHTTP2_NUM_SETTINGS));
t->force_send_settings = 0;
t->dirtied_local_settings = 0;
t->sent_local_settings = 1;
}
/* for each grpc_chttp2_stream that's become writable, frame it's data (according to
available window sizes) and add to the output buffer */
while (t->outgoing_window && (s = stream_list_remove_head(t, WRITABLE)) &&
s->outgoing_window > 0) {
window_delta = grpc_chttp2_preencode(
s->outgoing_sopb->ops, &s->outgoing_sopb->nops,
GPR_MIN(t->outgoing_window, s->outgoing_window), &s->writing.sopb);
FLOWCTL_TRACE(t, t, outgoing, 0, -(gpr_int64)window_delta);
FLOWCTL_TRACE(t, s, outgoing, s->id, -(gpr_int64)window_delta);
t->outgoing_window -= window_delta;
s->outgoing_window -= window_delta;
if (s->write_state == WRITE_STATE_QUEUED_CLOSE &&
s->outgoing_sopb->nops == 0) {
if (!t->is_client && !s->read_closed) {
s->writing.send_closed = SEND_CLOSED_WITH_RST_STREAM;
} else {
s->writing.send_closed = SEND_CLOSED;
}
}
if (s->writing.sopb.nops > 0 || s->writing.send_closed) {
stream_list_join(t, s, WRITING);
}
/* we should either exhaust window or have no ops left, but not both */
if (s->outgoing_sopb->nops == 0) {
s->outgoing_sopb = NULL;
schedule_cb(t, s->global.send_done_closure, 1);
} else if (s->outgoing_window) {
stream_list_add_tail(t, s, WRITABLE);
}
}
if (!t->parsing.executing) {
/* for each grpc_chttp2_stream that wants to update its window, add that window here */
while ((s = stream_list_remove_head(t, WINDOW_UPDATE))) {
window_delta =
t->settings[LOCAL_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] -
s->incoming_window;
if (!s->read_closed && window_delta) {
gpr_slice_buffer_add(
&t->writing.outbuf, grpc_chttp2_window_update_create(s->id, window_delta));
FLOWCTL_TRACE(t, s, incoming, s->id, window_delta);
s->incoming_window += window_delta;
}
}
/* if the grpc_chttp2_transport is ready to send a window update, do so here also */
if (t->incoming_window < t->connection_window_target * 3 / 4) {
window_delta = t->connection_window_target - t->incoming_window;
gpr_slice_buffer_add(&t->writing.outbuf,
grpc_chttp2_window_update_create(0, window_delta));
FLOWCTL_TRACE(t, t, incoming, 0, window_delta);
t->incoming_window += window_delta;
}
}
if (t->writing.outbuf.length > 0 || !stream_list_empty(t, WRITING)) {
t->writing.executing = 1;
ref_transport(t);
gpr_log(GPR_DEBUG, "schedule write");
schedule_cb(t, &t->writing.action, 1);
}
}

@ -39,17 +39,8 @@
#include "src/core/profiling/timers.h"
#include "src/core/support/string.h"
#include "src/core/transport/chttp2/frame_data.h"
#include "src/core/transport/chttp2/frame_goaway.h"
#include "src/core/transport/chttp2/frame_ping.h"
#include "src/core/transport/chttp2/frame_rst_stream.h"
#include "src/core/transport/chttp2/frame_settings.h"
#include "src/core/transport/chttp2/frame_window_update.h"
#include "src/core/transport/chttp2/hpack_parser.h"
#include "src/core/transport/chttp2/http2_errors.h"
#include "src/core/transport/chttp2/status_conversion.h"
#include "src/core/transport/chttp2/stream_encoder.h"
#include "src/core/transport/chttp2/stream_map.h"
#include "src/core/transport/chttp2/timeout_encoding.h"
#include "src/core/transport/chttp2/internal.h"
#include "src/core/transport/transport_impl.h"
@ -90,7 +81,6 @@ static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id,
static void lock(grpc_chttp2_transport *t);
static void unlock(grpc_chttp2_transport *t);
static void unlock_check_writes(grpc_chttp2_transport* t);
static void unlock_check_cancellations(grpc_chttp2_transport* t);
static void unlock_check_parser(grpc_chttp2_transport* t);
static void unlock_check_channel_callbacks(grpc_chttp2_transport* t);
@ -535,7 +525,7 @@ static void lock(grpc_chttp2_transport *t) { gpr_mu_lock(&t->mu); }
static void unlock(grpc_chttp2_transport *t) {
grpc_iomgr_closure *run_closures;
unlock_check_writes(t);
grpc_chttp2_unlocking_check_writes(t);
unlock_check_cancellations(t);
unlock_check_parser(t);
unlock_check_channel_callbacks(t);
@ -571,94 +561,6 @@ static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id,
}
}
static void unlock_check_writes(grpc_chttp2_transport *t) {
grpc_chttp2_stream *s;
gpr_uint32 window_delta;
/* don't do anything if we are already writing */
if (t->writing.executing) {
return;
}
/* simple writes are queued to qbuf, and flushed here */
gpr_slice_buffer_swap(&t->global.qbuf, &t->writing.outbuf);
GPR_ASSERT(t->global.qbuf.count == 0);
if (t->dirtied_local_settings && !t->sent_local_settings) {
gpr_slice_buffer_add(
&t->writing.outbuf, grpc_chttp2_settings_create(
t->settings[SENT_SETTINGS], t->settings[LOCAL_SETTINGS],
t->force_send_settings, GRPC_CHTTP2_NUM_SETTINGS));
t->force_send_settings = 0;
t->dirtied_local_settings = 0;
t->sent_local_settings = 1;
}
/* for each grpc_chttp2_stream that's become writable, frame it's data (according to
available window sizes) and add to the output buffer */
while (t->outgoing_window && (s = stream_list_remove_head(t, WRITABLE)) &&
s->outgoing_window > 0) {
window_delta = grpc_chttp2_preencode(
s->outgoing_sopb->ops, &s->outgoing_sopb->nops,
GPR_MIN(t->outgoing_window, s->outgoing_window), &s->writing.sopb);
FLOWCTL_TRACE(t, t, outgoing, 0, -(gpr_int64)window_delta);
FLOWCTL_TRACE(t, s, outgoing, s->id, -(gpr_int64)window_delta);
t->outgoing_window -= window_delta;
s->outgoing_window -= window_delta;
if (s->write_state == WRITE_STATE_QUEUED_CLOSE &&
s->outgoing_sopb->nops == 0) {
if (!t->is_client && !s->read_closed) {
s->writing.send_closed = SEND_CLOSED_WITH_RST_STREAM;
} else {
s->writing.send_closed = SEND_CLOSED;
}
}
if (s->writing.sopb.nops > 0 || s->writing.send_closed) {
stream_list_join(t, s, WRITING);
}
/* we should either exhaust window or have no ops left, but not both */
if (s->outgoing_sopb->nops == 0) {
s->outgoing_sopb = NULL;
schedule_cb(t, s->global.send_done_closure, 1);
} else if (s->outgoing_window) {
stream_list_add_tail(t, s, WRITABLE);
}
}
if (!t->parsing.executing) {
/* for each grpc_chttp2_stream that wants to update its window, add that window here */
while ((s = stream_list_remove_head(t, WINDOW_UPDATE))) {
window_delta =
t->settings[LOCAL_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] -
s->incoming_window;
if (!s->read_closed && window_delta) {
gpr_slice_buffer_add(
&t->writing.outbuf, grpc_chttp2_window_update_create(s->id, window_delta));
FLOWCTL_TRACE(t, s, incoming, s->id, window_delta);
s->incoming_window += window_delta;
}
}
/* if the grpc_chttp2_transport is ready to send a window update, do so here also */
if (t->incoming_window < t->connection_window_target * 3 / 4) {
window_delta = t->connection_window_target - t->incoming_window;
gpr_slice_buffer_add(&t->writing.outbuf,
grpc_chttp2_window_update_create(0, window_delta));
FLOWCTL_TRACE(t, t, incoming, 0, window_delta);
t->incoming_window += window_delta;
}
}
if (t->writing.outbuf.length > 0 || !stream_list_empty(t, WRITING)) {
t->writing.executing = 1;
ref_transport(t);
gpr_log(GPR_DEBUG, "schedule write");
schedule_cb(t, &t->writing.action, 1);
}
}
static void writing_finalize_outbuf(grpc_chttp2_transport *t) {
grpc_chttp2_stream *s;

Loading…
Cancel
Save