Introduce the (outside-of-iomgr) pollset API.

This CL introduces the public side of this interface. There will need to be an
iomgr-private API also, but this will be a per-implementation API and so is not
covered here.

I've taken care of wiring the interface through the codebase in the manner that
I expect it will be used.
	Change on 2014/12/17 by ctiller <ctiller@google.com>
-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=82376987
pull/1/merge
ctiller 11 years ago committed by Michael Lumish
parent a4b6f5df94
commit d79b4865ce
  1. 2
      Makefile
  2. 2
      build.json
  3. 2
      src/core/channel/call_op_string.c
  4. 3
      src/core/channel/channel_stack.h
  5. 3
      src/core/channel/client_channel.c
  6. 1
      src/core/channel/connected_channel.c
  7. 4
      src/core/iomgr/endpoint.c
  8. 6
      src/core/iomgr/endpoint.h
  9. 37
      src/core/iomgr/pollset.c
  10. 51
      src/core/iomgr/pollset.h
  11. 11
      src/core/iomgr/tcp_posix.c
  12. 9
      src/core/security/secure_endpoint.c
  13. 2
      src/core/surface/call.c
  14. 8
      src/core/surface/completion_queue.c
  15. 3
      src/core/surface/completion_queue.h
  16. 2
      src/core/surface/server.c
  17. 13
      src/core/transport/chttp2_transport.c
  18. 5
      src/core/transport/transport.c
  19. 12
      src/core/transport/transport.h
  20. 3
      src/core/transport/transport_impl.h
  21. 3
      vsprojects/vs2013/grpc.vcxproj

@ -889,6 +889,7 @@ LIBGRPC_SRC = \
src/core/iomgr/endpoint_pair_posix.c \
src/core/iomgr/iomgr_libevent.c \
src/core/iomgr/iomgr_libevent_use_threads.c \
src/core/iomgr/pollset.c \
src/core/iomgr/resolve_address_posix.c \
src/core/iomgr/sockaddr_utils.c \
src/core/iomgr/socket_utils_common_posix.c \
@ -2018,6 +2019,7 @@ LIBGRPC_UNSECURE_SRC = \
src/core/iomgr/endpoint_pair_posix.c \
src/core/iomgr/iomgr_libevent.c \
src/core/iomgr/iomgr_libevent_use_threads.c \
src/core/iomgr/pollset.c \
src/core/iomgr/resolve_address_posix.c \
src/core/iomgr/sockaddr_utils.c \
src/core/iomgr/socket_utils_common_posix.c \

@ -119,6 +119,7 @@
"src/core/iomgr/endpoint_pair_posix.c",
"src/core/iomgr/iomgr_libevent.c",
"src/core/iomgr/iomgr_libevent_use_threads.c",
"src/core/iomgr/pollset.c",
"src/core/iomgr/resolve_address_posix.c",
"src/core/iomgr/sockaddr_utils.c",
"src/core/iomgr/socket_utils_common_posix.c",
@ -215,6 +216,7 @@
"src/core/iomgr/iomgr_completion_queue_interface.h",
"src/core/iomgr/iomgr.h",
"src/core/iomgr/iomgr_libevent.h",
"src/core/iomgr/pollset.h",
"src/core/iomgr/resolve_address.h",
"src/core/iomgr/sockaddr.h",
"src/core/iomgr/sockaddr_posix.h",

@ -107,7 +107,7 @@ char *grpc_call_op_string(grpc_call_op *op) {
op->data.deadline.tv_nsec);
break;
case GRPC_SEND_START:
bprintf(&b, "SEND_START");
bprintf(&b, "SEND_START pollset=%p", op->data.start.pollset);
break;
case GRPC_SEND_MESSAGE:
bprintf(&b, "SEND_MESSAGE");

@ -108,6 +108,9 @@ typedef struct {
/* Argument data, matching up with grpc_call_op_type names */
union {
struct {
grpc_pollset *pollset;
} start;
grpc_byte_buffer *message;
grpc_mdelem *metadata;
gpr_timespec deadline;

@ -98,6 +98,7 @@ struct call_data {
void (*on_complete)(void *user_data, grpc_op_error error);
void *on_complete_user_data;
gpr_uint32 start_flags;
grpc_pollset *pollset;
} waiting;
} s;
};
@ -186,6 +187,7 @@ static void start_rpc(grpc_call_element *elem, grpc_call_op *op) {
calld->s.waiting.on_complete = op->done_cb;
calld->s.waiting.on_complete_user_data = op->user_data;
calld->s.waiting.start_flags = op->flags;
calld->s.waiting.pollset = op->data.start.pollset;
chand->waiting_children[chand->waiting_child_count++] = calld;
gpr_mu_unlock(&chand->mu);
@ -523,6 +525,7 @@ grpc_transport_setup_result grpc_client_channel_transport_setup_complete(
call_ops[i].done_cb = waiting_children[i]->s.waiting.on_complete;
call_ops[i].user_data =
waiting_children[i]->s.waiting.on_complete_user_data;
call_ops[i].data.start.pollset = waiting_children[i]->s.waiting.pollset;
if (!prepare_activate(waiting_children[i]->elem, chand->active_child)) {
waiting_children[i] = NULL;
call_ops[i].done_cb(call_ops[i].user_data, GRPC_OP_ERROR);

@ -132,6 +132,7 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
op->user_data);
break;
case GRPC_SEND_START:
grpc_transport_add_to_pollset(chand->transport, op->data.start.pollset);
grpc_sopb_add_metadata_boundary(&calld->outgoing_sopb);
end_bufferable_op(op, chand, calld, 0);
break;

@ -44,6 +44,10 @@ grpc_endpoint_write_status grpc_endpoint_write(
return ep->vtable->write(ep, slices, nslices, cb, user_data, deadline);
}
void grpc_endpoint_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) {
ep->vtable->add_to_pollset(ep, pollset);
}
void grpc_endpoint_shutdown(grpc_endpoint *ep) { ep->vtable->shutdown(ep); }
void grpc_endpoint_destroy(grpc_endpoint *ep) { ep->vtable->destroy(ep); }

@ -34,6 +34,7 @@
#ifndef __GRPC_INTERNAL_IOMGR_ENDPOINT_H__
#define __GRPC_INTERNAL_IOMGR_ENDPOINT_H__
#include "src/core/iomgr/pollset.h"
#include <grpc/support/slice.h>
#include <grpc/support/time.h>
@ -69,6 +70,7 @@ struct grpc_endpoint_vtable {
grpc_endpoint_write_status (*write)(grpc_endpoint *ep, gpr_slice *slices,
size_t nslices, grpc_endpoint_write_cb cb,
void *user_data, gpr_timespec deadline);
void (*add_to_pollset)(grpc_endpoint *ep, grpc_pollset *pollset);
void (*shutdown)(grpc_endpoint *ep);
void (*destroy)(grpc_endpoint *ep);
};
@ -92,6 +94,10 @@ grpc_endpoint_write_status grpc_endpoint_write(
void grpc_endpoint_shutdown(grpc_endpoint *ep);
void grpc_endpoint_destroy(grpc_endpoint *ep);
/* Add an endpoint to a pollset, so that when the pollset is polled, events from
this endpoint are considered */
void grpc_endpoint_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset);
struct grpc_endpoint {
const grpc_endpoint_vtable *vtable;
};

@ -0,0 +1,37 @@
/*
*
* Copyright 2014, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include "src/core/iomgr/pollset.h"
void grpc_pollset_init(grpc_pollset *pollset) { pollset->unused = 0; }
void grpc_pollset_destroy(grpc_pollset *pollset) {}

@ -0,0 +1,51 @@
/*
*
* Copyright 2014, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef __GRPC_INTERNAL_IOMGR_POLLSET_H_
#define __GRPC_INTERNAL_IOMGR_POLLSET_H_
/* A grpc_pollset is a set of file descriptors that a higher level item is
interested in. For example:
- a server will typically keep a pollset containing all connected channels,
so that it can find new calls to service
- a completion queue might keep a pollset with an entry for each transport
that is servicing a call that it's tracking */
/* Eventually different implementations of iomgr will provide their own
grpc_pollset structs. As this is just a dummy wrapper to get the API in,
we just define a simple type here. */
typedef struct { char unused; } grpc_pollset;
void grpc_pollset_init(grpc_pollset *pollset);
void grpc_pollset_destroy(grpc_pollset *pollset);
#endif /* __GRPC_INTERNAL_IOMGR_POLLSET_H_ */

@ -538,9 +538,14 @@ static grpc_endpoint_write_status grpc_tcp_write(
return status;
}
static const grpc_endpoint_vtable vtable = {grpc_tcp_notify_on_read,
grpc_tcp_write, grpc_tcp_shutdown,
grpc_tcp_destroy};
static void grpc_tcp_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) {
/* tickle the pollset so we crash if things aren't wired correctly */
pollset->unused++;
}
static const grpc_endpoint_vtable vtable = {
grpc_tcp_notify_on_read, grpc_tcp_write, grpc_tcp_add_to_pollset,
grpc_tcp_shutdown, grpc_tcp_destroy};
grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size) {
grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp));

@ -325,8 +325,15 @@ static void endpoint_unref(grpc_endpoint *secure_ep) {
secure_endpoint_unref(ep);
}
static void endpoint_add_to_pollset(grpc_endpoint *secure_ep,
grpc_pollset *pollset) {
secure_endpoint *ep = (secure_endpoint *)secure_ep;
grpc_endpoint_add_to_pollset(ep->wrapped_ep, pollset);
}
static const grpc_endpoint_vtable vtable = {
endpoint_notify_on_read, endpoint_write, endpoint_shutdown, endpoint_unref};
endpoint_notify_on_read, endpoint_write, endpoint_add_to_pollset,
endpoint_shutdown, endpoint_unref};
grpc_endpoint *grpc_secure_endpoint_create(
struct tsi_frame_protector *protector, grpc_endpoint *transport,

@ -409,6 +409,7 @@ grpc_call_error grpc_call_start_invoke(grpc_call *call,
op.dir = GRPC_CALL_DOWN;
op.flags = flags;
op.done_cb = done_invoke;
op.data.start.pollset = grpc_cq_pollset(cq);
op.user_data = call;
elem = CALL_ELEM_FROM_CALL(call, 0);
@ -480,6 +481,7 @@ grpc_call_error grpc_call_server_end_initial_metadata(grpc_call *call,
op.dir = GRPC_CALL_DOWN;
op.flags = flags;
op.done_cb = do_nothing;
op.data.start.pollset = grpc_cq_pollset(call->cq);
op.user_data = NULL;
elem = CALL_ELEM_FROM_CALL(call, 0);

@ -66,6 +66,8 @@ struct grpc_completion_queue {
/* When refs drops to zero, we are in shutdown mode, and will be destroyable
once all queued events are drained */
gpr_refcount refs;
/* the set of low level i/o things that concern this cq */
grpc_pollset pollset;
/* 0 initially, 1 once we've begun shutting down */
int shutdown;
/* Head of a linked list of queued events (prev points to the last element) */
@ -87,6 +89,7 @@ grpc_completion_queue *grpc_completion_queue_create() {
memset(cc, 0, sizeof(*cc));
/* Initial ref is dropped by grpc_completion_queue_shutdown */
gpr_ref_init(&cc->refs, 1);
grpc_pollset_init(&cc->pollset);
cc->allow_polling = 1;
return cc;
}
@ -367,6 +370,7 @@ void grpc_completion_queue_shutdown(grpc_completion_queue *cc) {
void grpc_completion_queue_destroy(grpc_completion_queue *cc) {
GPR_ASSERT(cc->queue == NULL);
grpc_pollset_destroy(&cc->pollset);
gpr_free(cc);
}
@ -392,3 +396,7 @@ void grpc_cq_dump_pending_ops(grpc_completion_queue *cc) {
gpr_log(GPR_INFO, "pending ops:%s", tmp);
#endif
}
grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc) {
return &cc->pollset;
}

@ -36,6 +36,7 @@
/* Internal API for completion channels */
#include "src/core/iomgr/pollset.h"
#include <grpc/grpc.h>
/* A finish func is executed whenever the event consumer calls
@ -101,4 +102,6 @@ void grpc_completion_queue_dont_poll_test_only(grpc_completion_queue *cc);
void grpc_cq_dump_pending_ops(grpc_completion_queue *cc);
grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc);
#endif /* __GRPC_INTERNAL_SURFACE_COMPLETION_QUEUE_H__ */

@ -491,6 +491,8 @@ grpc_transport_setup_result grpc_server_setup_transport(
}
filters[i] = &grpc_connected_channel_filter;
grpc_transport_add_to_pollset(transport, grpc_cq_pollset(s->cq));
channel = grpc_channel_create_from_filters(filters, num_filters,
s->channel_args, mdctx, 0);
chand = (channel_data *)grpc_channel_stack_element(

@ -1696,14 +1696,23 @@ static void run_callbacks(transport *t) {
}
}
static void add_to_pollset(grpc_transport *gt, grpc_pollset *pollset) {
transport *t = (transport *)gt;
lock(t);
if (t->ep) {
grpc_endpoint_add_to_pollset(t->ep, pollset);
}
unlock(t);
}
/*
* INTEGRATION GLUE
*/
static const grpc_transport_vtable vtable = {
sizeof(stream), init_stream, send_batch, set_allow_window_updates,
destroy_stream, abort_stream, goaway, close_transport, send_ping,
destroy_transport};
add_to_pollset, destroy_stream, abort_stream, goaway, close_transport,
send_ping, destroy_transport};
void grpc_create_chttp2_transport(grpc_transport_setup_callback setup,
void *arg,

@ -66,6 +66,11 @@ void grpc_transport_set_allow_window_updates(grpc_transport *transport,
transport->vtable->set_allow_window_updates(transport, stream, allow);
}
void grpc_transport_add_to_pollset(grpc_transport *transport,
grpc_pollset *pollset) {
transport->vtable->add_to_pollset(transport, pollset);
}
void grpc_transport_destroy_stream(grpc_transport *transport,
grpc_stream *stream) {
transport->vtable->destroy_stream(transport, stream);

@ -36,6 +36,7 @@
#include <stddef.h>
#include "src/core/iomgr/pollset.h"
#include "src/core/transport/stream_op.h"
/* forward declarations */
@ -202,15 +203,18 @@ void grpc_transport_ping(grpc_transport *transport, void (*cb)(void *user_data),
void grpc_transport_abort_stream(grpc_transport *transport, grpc_stream *stream,
grpc_status_code status);
void grpc_transport_add_to_pollset(grpc_transport *transport,
grpc_pollset *pollset);
/* Advise peer of pending connection termination. */
void grpc_transport_goaway(struct grpc_transport *transport,
grpc_status_code status, gpr_slice debug_data);
void grpc_transport_goaway(grpc_transport *transport, grpc_status_code status,
gpr_slice debug_data);
/* Close a transport. Aborts all open streams. */
void grpc_transport_close(struct grpc_transport *transport);
void grpc_transport_close(grpc_transport *transport);
/* Destroy the transport */
void grpc_transport_destroy(struct grpc_transport *transport);
void grpc_transport_destroy(grpc_transport *transport);
/* Return type for grpc_transport_setup_callback */
typedef struct grpc_transport_setup_result {

@ -53,6 +53,9 @@ typedef struct grpc_transport_vtable {
void (*set_allow_window_updates)(grpc_transport *self, grpc_stream *stream,
int allow);
/* implementation of grpc_transport_add_to_pollset */
void (*add_to_pollset)(grpc_transport *self, grpc_pollset *pollset);
/* implementation of grpc_transport_destroy_stream */
void (*destroy_stream)(grpc_transport *self, grpc_stream *stream);

@ -104,6 +104,7 @@
<ClInclude Include="..\..\src\core\iomgr\iomgr_completion_queue_interface.h" />
<ClInclude Include="..\..\src\core\iomgr\iomgr.h" />
<ClInclude Include="..\..\src\core\iomgr\iomgr_libevent.h" />
<ClInclude Include="..\..\src\core\iomgr\pollset.h" />
<ClInclude Include="..\..\src\core\iomgr\resolve_address.h" />
<ClInclude Include="..\..\src\core\iomgr\sockaddr.h" />
<ClInclude Include="..\..\src\core\iomgr\sockaddr_posix.h" />
@ -209,6 +210,8 @@
</ClCompile>
<ClCompile Include="..\..\src\core\iomgr\iomgr_libevent_use_threads.c">
</ClCompile>
<ClCompile Include="..\..\src\core\iomgr\pollset.c">
</ClCompile>
<ClCompile Include="..\..\src\core\iomgr\resolve_address_posix.c">
</ClCompile>
<ClCompile Include="..\..\src\core\iomgr\sockaddr_utils.c">

Loading…
Cancel
Save