Merge github.com:grpc/grpc into tis-but-thy-name

pull/2308/head
Craig Tiller 10 years ago
commit bdfd8976e1
  include/grpc/byte_buffer.h                       |  4
  src/core/iomgr/fd_posix.c                        |  5
  src/core/iomgr/pollset_multipoller_with_epoll.c  | 26
  src/core/surface/byte_buffer.c                   | 14
  src/core/transport/chttp2/parsing.c              |  3
  src/core/transport/chttp2_transport.c            |  1
  src/ruby/ext/grpc/extconf.rb                     |  2
  test/core/surface/byte_buffer_reader_test.c      | 25
  test/cpp/qps/server_async.cc                     | 35
  9 files changed

@@ -102,6 +102,10 @@ void grpc_byte_buffer_reader_destroy(grpc_byte_buffer_reader *reader);
 int grpc_byte_buffer_reader_next(grpc_byte_buffer_reader *reader,
                                  gpr_slice *slice);
 
+/** Returns a RAW byte buffer instance from the output of \a reader. */
+grpc_byte_buffer *grpc_raw_byte_buffer_from_reader(
+    grpc_byte_buffer_reader *reader);
+
 #ifdef __cplusplus
 }
 #endif
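
The new declaration pairs with the reader API above: initialize a reader on any byte buffer, then flatten its output into a fresh RAW buffer. A minimal usage sketch, assuming some_buffer is an already created grpc_byte_buffer (hypothetical name; error handling omitted):

    grpc_byte_buffer_reader reader;
    grpc_byte_buffer *raw;

    grpc_byte_buffer_reader_init(&reader, some_buffer);
    /* drains the reader, collecting its slices into a RAW buffer */
    raw = grpc_raw_byte_buffer_from_reader(&reader);
    grpc_byte_buffer_reader_destroy(&reader);
    /* ... use raw ... */
    grpc_byte_buffer_destroy(raw);

Since the reader decompresses as it reads (per the compressed-slice tests below), this also yields an uncompressed RAW copy of a compressed buffer.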

@@ -369,16 +369,17 @@ gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
     watcher->fd = NULL;
     watcher->pollset = NULL;
     gpr_mu_unlock(&fd->watcher_mu);
     GRPC_FD_UNREF(fd, "poll");
     return 0;
   }
   /* if there is nobody polling for read, but we need to, then start doing so */
-  if (!fd->read_watcher && gpr_atm_acq_load(&fd->readst) > READY) {
+  if (read_mask && !fd->read_watcher && gpr_atm_acq_load(&fd->readst) > READY) {
     fd->read_watcher = watcher;
     mask |= read_mask;
   }
   /* if there is nobody polling for write, but we need to, then start doing so
   */
-  if (!fd->write_watcher && gpr_atm_acq_load(&fd->writest) > READY) {
+  if (write_mask && !fd->write_watcher &&
+      gpr_atm_acq_load(&fd->writest) > READY) {
     fd->write_watcher = watcher;
     mask |= write_mask;
   }
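
The new read_mask/write_mask guards exist so that grpc_fd_begin_poll can be called with both masks zero: the caller takes a poll ref on the fd (preventing it from being closed underneath them) without ever becoming the read or write watcher. A sketch of that call pattern, using only names from this diff:

    grpc_fd_watcher watcher;

    /* with zero masks neither guard above fires, so no events are
       requested and the returned event mask is always 0 */
    GPR_ASSERT(grpc_fd_begin_poll(fd, pollset, 0, 0, &watcher) == 0);
    if (watcher.fd != NULL) {
      /* fd cannot be closed until the matching end_poll */
    }
    grpc_fd_end_poll(&watcher, 0, 0);

The epoll hunk below is exactly this pattern wrapped around epoll_ctl.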

@@ -54,17 +54,25 @@ static void multipoll_with_epoll_pollset_add_fd(grpc_pollset *pollset,
   pollset_hdr *h = pollset->data.ptr;
   struct epoll_event ev;
   int err;
-  ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
-  ev.data.ptr = fd;
-  err = epoll_ctl(h->epoll_fd, EPOLL_CTL_ADD, fd->fd, &ev);
-  if (err < 0) {
-    /* FDs may be added to a pollset multiple times, so EEXIST is normal. */
-    if (errno != EEXIST) {
-      gpr_log(GPR_ERROR, "epoll_ctl add for %d failed: %s", fd->fd,
-              strerror(errno));
+  grpc_fd_watcher watcher;
+
+  /* We pretend to be polling whilst adding an fd to keep the fd from being
+     closed during the add. This may result in a spurious wakeup being assigned
+     to this pollset whilst adding, but that should be benign. */
+  GPR_ASSERT(grpc_fd_begin_poll(fd, pollset, 0, 0, &watcher) == 0);
+  if (watcher.fd != NULL) {
+    ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
+    ev.data.ptr = fd;
+    err = epoll_ctl(h->epoll_fd, EPOLL_CTL_ADD, fd->fd, &ev);
+    if (err < 0) {
+      /* FDs may be added to a pollset multiple times, so EEXIST is normal. */
+      if (errno != EEXIST) {
+        gpr_log(GPR_ERROR, "epoll_ctl add for %d failed: %s", fd->fd,
+                strerror(errno));
+      }
     }
   }
+  grpc_fd_end_poll(&watcher, 0, 0);
 }

 static void multipoll_with_epoll_pollset_del_fd(grpc_pollset *pollset,

@@ -55,6 +55,20 @@ grpc_byte_buffer *grpc_raw_compressed_byte_buffer_create(
   return bb;
 }

+grpc_byte_buffer *grpc_raw_byte_buffer_from_reader(
+    grpc_byte_buffer_reader *reader) {
+  grpc_byte_buffer *bb = malloc(sizeof(grpc_byte_buffer));
+  gpr_slice slice;
+  bb->type = GRPC_BB_RAW;
+  bb->data.raw.compression = GRPC_COMPRESS_NONE;
+  gpr_slice_buffer_init(&bb->data.raw.slice_buffer);
+
+  while (grpc_byte_buffer_reader_next(reader, &slice)) {
+    gpr_slice_buffer_add(&bb->data.raw.slice_buffer, slice);
+  }
+
+  return bb;
+}
+
 grpc_byte_buffer *grpc_byte_buffer_copy(grpc_byte_buffer *bb) {
   switch (bb->type) {
     case GRPC_BB_RAW:
@@ -109,9 +109,6 @@ void grpc_chttp2_publish_reads(
         transport_parsing->incoming_stream_id;
   }

-  /* TODO(ctiller): re-implement */
-  GPR_ASSERT(transport_parsing->initial_window_update == 0);
-
   /* copy parsing qbuf to global qbuf */
   gpr_slice_buffer_move_into(&transport_parsing->qbuf, &transport_global->qbuf);

@@ -933,6 +933,7 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
     if (t->parsing.initial_window_update != 0) {
       grpc_chttp2_stream_map_for_each(&t->parsing_stream_map,
                                       update_global_window, t);
+      t->parsing.initial_window_update = 0;
     }

     /* handle higher level things */
     grpc_chttp2_publish_reads(&t->global, &t->parsing);
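
These two chttp2 hunks go together: the parser no longer asserts that a SETTINGS-initiated change to the initial window size never happens; instead, before publishing reads, recv_data walks every stream in the parsing map and then clears the pending delta. The for-each callback itself is outside this diff; a hypothetical sketch of its likely shape (field names here are illustrative, not taken from the source):

    /* hypothetical: apply the pending INITIAL_WINDOW_SIZE delta to one
       stream's outgoing flow-control window */
    static void update_global_window(void *args, gpr_uint32 id, void *stream) {
      grpc_chttp2_transport *t = args;
      grpc_chttp2_stream *s = stream;
      s->outgoing_window += t->parsing.initial_window_update; /* illustrative */
    }

Clearing initial_window_update afterwards keeps the delta from being applied twice on the next read.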

@@ -89,7 +89,7 @@ $CFLAGS << ' -Wno-return-type '
 $CFLAGS << ' -Wall '
 $CFLAGS << ' -pedantic '

-$LDFLAGS << ' -lgrpc -lgpr -ldl'
+$LDFLAGS << ' -lgrpc -lgpr -lz -ldl'

-crash('need grpc lib') unless have_library('grpc', 'grpc_channel_destroy')
+have_library('grpc', 'grpc_channel_destroy')

@@ -160,6 +160,30 @@ static void test_read_deflate_compressed_slice(void) {
   read_compressed_slice(GRPC_COMPRESS_DEFLATE, INPUT_SIZE);
 }

+static void test_byte_buffer_from_reader(void) {
+  gpr_slice slice;
+  grpc_byte_buffer *buffer, *buffer_from_reader;
+  grpc_byte_buffer_reader reader;
+
+  LOG_TEST("test_byte_buffer_from_reader");
+  slice = gpr_slice_malloc(4);
+  memcpy(GPR_SLICE_START_PTR(slice), "test", 4);
+  buffer = grpc_raw_byte_buffer_create(&slice, 1);
+  gpr_slice_unref(slice);
+
+  grpc_byte_buffer_reader_init(&reader, buffer);
+  buffer_from_reader = grpc_raw_byte_buffer_from_reader(&reader);
+
+  GPR_ASSERT(buffer->type == buffer_from_reader->type);
+  GPR_ASSERT(buffer_from_reader->data.raw.compression == GRPC_COMPRESS_NONE);
+  GPR_ASSERT(buffer_from_reader->data.raw.slice_buffer.count == 1);
+  GPR_ASSERT(memcmp(GPR_SLICE_START_PTR(
+                        buffer_from_reader->data.raw.slice_buffer.slices[0]),
+                    "test", 4) == 0);
+
+  grpc_byte_buffer_destroy(buffer);
+  grpc_byte_buffer_destroy(buffer_from_reader);
+}
+
 int main(int argc, char **argv) {
   grpc_test_init(argc, argv);
   test_read_one_slice();
@@ -167,6 +191,7 @@ int main(int argc, char **argv) {
   test_read_none_compressed_slice();
   test_read_gzip_compressed_slice();
   test_read_deflate_compressed_slice();
+  test_byte_buffer_from_reader();
   return 0;
 }

@@ -64,7 +64,7 @@ namespace testing {
 class AsyncQpsServerTest : public Server {
  public:
-  AsyncQpsServerTest(const ServerConfig &config, int port) : shutdown_(false) {
+  AsyncQpsServerTest(const ServerConfig &config, int port) {
     char *server_address = NULL;

     gpr_join_host_port(&server_address, "::", port);
@@ -96,6 +96,9 @@ class AsyncQpsServerTest : public Server {
                                         request_streaming, ProcessRPC));
       }
     }
+    for (int i = 0; i < config.threads(); i++) {
+      shutdown_state_.emplace_back(new PerThreadShutdownState());
+    }
     for (int i = 0; i < config.threads(); i++) {
       threads_.push_back(std::thread([=]() {
         // Wait until work is available or we are shutting down
@@ -105,11 +108,9 @@ class AsyncQpsServerTest : public Server {
           ServerRpcContext *ctx = detag(got_tag);
           // The tag is a pointer to an RPC context to invoke
           bool still_going = ctx->RunNextState(ok);
-          std::unique_lock<std::mutex> g(shutdown_mutex_);
-          if (!shutdown_) {
+          if (!shutdown_state_[i]->shutdown()) {
            // this RPC context is done, so refresh it
            if (!still_going) {
-              g.unlock();
              ctx->Reset();
            }
          } else {
@@ -122,9 +123,8 @@ class AsyncQpsServerTest : public Server {
   }
   ~AsyncQpsServerTest() {
     server_->Shutdown();
-    {
-      std::lock_guard<std::mutex> g(shutdown_mutex_);
-      shutdown_ = true;
-    }
+    for (auto ss = shutdown_state_.begin(); ss != shutdown_state_.end(); ++ss) {
+      (*ss)->set_shutdown();
+    }
     for (auto thr = threads_.begin(); thr != threads_.end(); thr++) {
       thr->join();
@@ -316,8 +316,25 @@ class AsyncQpsServerTest : public Server {
   TestService::AsyncService async_service_;
   std::forward_list<ServerRpcContext *> contexts_;

-  std::mutex shutdown_mutex_;
-  bool shutdown_;
+  class PerThreadShutdownState {
+   public:
+    PerThreadShutdownState() : shutdown_(false) {}
+
+    bool shutdown() const {
+      std::lock_guard<std::mutex> lock(mutex_);
+      return shutdown_;
+    }
+
+    void set_shutdown() {
+      std::lock_guard<std::mutex> lock(mutex_);
+      shutdown_ = true;
+    }
+
+   private:
+    mutable std::mutex mutex_;
+    bool shutdown_;
+  };
+
+  std::vector<std::unique_ptr<PerThreadShutdownState>> shutdown_state_;
 };

 std::unique_ptr<Server> CreateAsyncServer(const ServerConfig &config,
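
The qps server change replaces a single shutdown_ flag guarded by one shared shutdown_mutex_ with one PerThreadShutdownState per worker thread, so threads polling the completion queue check only their own lock instead of contending on a global one. The same idea in C, as a minimal sketch built on gpr_mu from grpc/support/sync.h (type and function names here are illustrative, not from the diff):

    #include <grpc/support/sync.h>

    /* one flag per worker thread: a shutdown check only touches the
       thread's own mutex, so workers never contend with each other */
    typedef struct {
      gpr_mu mu;
      int shutdown;
    } per_thread_shutdown_state;

    static void shutdown_state_init(per_thread_shutdown_state *s) {
      gpr_mu_init(&s->mu);
      s->shutdown = 0;
    }

    static int shutdown_state_check(per_thread_shutdown_state *s) {
      int v;
      gpr_mu_lock(&s->mu);
      v = s->shutdown;
      gpr_mu_unlock(&s->mu);
      return v;
    }

    static void shutdown_state_set(per_thread_shutdown_state *s) {
      gpr_mu_lock(&s->mu);
      s->shutdown = 1;
      gpr_mu_unlock(&s->mu);
    }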
