Merge github.com:grpc/grpc into shindig

pull/3423/head
Craig Tiller 9 years ago
commit 36c86e2a83
  1. 4
      src/core/client_config/connector.c
  2. 6
      src/core/client_config/connector.h
  3. 4
      src/core/client_config/subchannel.c
  4. 1
      src/core/client_config/subchannel.h
  5. 3
      src/core/iomgr/fd_posix.c
  6. 10
      src/core/iomgr/iomgr.c
  7. 2
      src/core/iomgr/pollset_multipoller_with_epoll.c
  8. 20
      src/core/iomgr/udp_server.c
  9. 12
      src/core/iomgr/udp_server.h
  10. 2
      src/core/support/string.c
  11. 4
      src/core/surface/channel_create.c
  12. 36
      src/core/surface/secure_channel_create.c
  13. 4
      src/core/transport/chttp2/parsing.c
  14. 10
      test/core/iomgr/udp_server_test.c
  15. 92
      test/core/util/port_posix.c
  16. 1
      tools/jenkins/run_jenkins.sh
  17. 30
      tools/run_tests/port_server.py

@ -47,3 +47,7 @@ void grpc_connector_connect(grpc_connector *connector,
grpc_iomgr_closure *notify) {
connector->vtable->connect(connector, in_args, out_args, notify);
}
void grpc_connector_shutdown(grpc_connector *connector) {
connector->vtable->shutdown(connector);
}

@ -72,6 +72,9 @@ typedef struct {
struct grpc_connector_vtable {
void (*ref)(grpc_connector *connector);
void (*unref)(grpc_connector *connector);
/** Implementation of grpc_connector_shutdown */
void (*shutdown)(grpc_connector *connector);
/** Implementation of grpc_connector_connect */
void (*connect)(grpc_connector *connector,
const grpc_connect_in_args *in_args,
grpc_connect_out_args *out_args, grpc_iomgr_closure *notify);
@ -79,9 +82,12 @@ struct grpc_connector_vtable {
void grpc_connector_ref(grpc_connector *connector);
void grpc_connector_unref(grpc_connector *connector);
/** Connect using the connector: max one outstanding call at a time */
void grpc_connector_connect(grpc_connector *connector,
const grpc_connect_in_args *in_args,
grpc_connect_out_args *out_args,
grpc_iomgr_closure *notify);
/** Cancel any pending connection */
void grpc_connector_shutdown(grpc_connector *connector);
#endif

@ -443,6 +443,10 @@ void grpc_subchannel_process_transport_op(grpc_subchannel *c,
if (cancel_alarm) {
grpc_alarm_cancel(&c->alarm);
}
if (op->disconnect) {
grpc_connector_shutdown(c->connector);
}
}
static void on_state_changed(void *p, int iomgr_success) {

@ -43,6 +43,7 @@ typedef struct grpc_subchannel grpc_subchannel;
typedef struct grpc_subchannel_call grpc_subchannel_call;
typedef struct grpc_subchannel_args grpc_subchannel_args;
#define GRPC_SUBCHANNEL_REFCOUNT_DEBUG
#ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG
#define GRPC_SUBCHANNEL_REF(p, r) \
grpc_subchannel_ref((p), __FILE__, __LINE__, (r))

@ -222,10 +222,9 @@ void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_closure *on_done,
const char *reason) {
fd->on_done_closure = on_done;
shutdown(fd->fd, SHUT_RDWR);
REF_BY(fd, 1, reason); /* remove active status, but keep referenced */
gpr_mu_lock(&fd->watcher_mu);
REF_BY(fd, 1, reason); /* remove active status, but keep referenced */
if (!has_watchers(fd)) {
GPR_ASSERT(!fd->closed);
fd->closed = 1;
close(fd->fd);
if (fd->on_done_closure) {

@ -34,16 +34,18 @@
#include "src/core/iomgr/iomgr.h"
#include <stdlib.h>
#include <string.h>
#include "src/core/iomgr/iomgr_internal.h"
#include "src/core/iomgr/alarm_internal.h"
#include "src/core/support/string.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/sync.h>
#include <grpc/support/thd.h>
#include "src/core/iomgr/iomgr_internal.h"
#include "src/core/iomgr/alarm_internal.h"
#include "src/core/support/string.h"
static gpr_mu g_mu;
static gpr_cv g_rcv;
static int g_shutdown;
@ -124,6 +126,8 @@ void grpc_iomgr_shutdown(void) {
}
gpr_mu_unlock(&g_mu);
memset(&g_root_object, 0, sizeof(g_root_object));
grpc_alarm_list_shutdown();
grpc_iomgr_platform_shutdown();

@ -72,7 +72,7 @@ static void finally_add_fd(grpc_pollset *pollset, grpc_fd *fd) {
to this pollset whilst adding, but that should be benign. */
GPR_ASSERT(grpc_fd_begin_poll(fd, pollset, 0, 0, &watcher) == 0);
if (watcher.fd != NULL) {
ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
ev.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET);
ev.data.ptr = fd;
err = epoll_ctl(h->epoll_fd, EPOLL_CTL_ADD, fd->fd, &ev);
if (err < 0) {

@ -94,9 +94,6 @@ static void unlink_if_unix_domain_socket(const struct sockaddr_un *un) {
/* the overall server */
struct grpc_udp_server {
grpc_udp_server_cb cb;
void *cb_arg;
gpr_mu mu;
gpr_cv cv;
@ -132,8 +129,6 @@ grpc_udp_server *grpc_udp_server_create(void) {
s->active_ports = 0;
s->destroyed_ports = 0;
s->shutdown = 0;
s->cb = NULL;
s->cb_arg = NULL;
s->ports = gpr_malloc(sizeof(server_port) * INIT_PORT_CAP);
s->nports = 0;
s->port_capacity = INIT_PORT_CAP;
@ -236,6 +231,11 @@ static int prepare_socket(int fd, const struct sockaddr *addr,
goto error;
}
if (!grpc_set_socket_nonblocking(fd, 1) || !grpc_set_socket_cloexec(fd, 1)) {
gpr_log(GPR_ERROR, "Unable to configure socket %d: %s", fd,
strerror(errno));
}
get_local_ip = 1;
rc = setsockopt(fd, IPPROTO_IP, IP_PKTINFO, &get_local_ip,
sizeof(get_local_ip));
@ -286,7 +286,7 @@ static void on_read(void *arg, int success) {
/* Tell the registered callback that data is available to read. */
GPR_ASSERT(sp->read_cb);
sp->read_cb(sp->fd, sp->server->cb, sp->server->cb_arg);
sp->read_cb(sp->fd);
/* Re-arm the notification event so we get another chance to read. */
grpc_fd_notify_on_read(sp->emfd, &sp->read_closure);
@ -305,7 +305,6 @@ static int add_socket_to_server(grpc_udp_server *s, int fd,
grpc_sockaddr_to_string(&addr_str, (struct sockaddr *)&addr, 1);
gpr_asprintf(&name, "udp-server-listener:%s", addr_str);
gpr_mu_lock(&s->mu);
GPR_ASSERT(!s->cb && "must add ports before starting server");
/* append it to the list under a lock */
if (s->nports == s->port_capacity) {
s->port_capacity *= 2;
@ -411,15 +410,10 @@ int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned index) {
}
void grpc_udp_server_start(grpc_udp_server *s, grpc_pollset **pollsets,
size_t pollset_count,
grpc_udp_server_cb new_transport_cb, void *cb_arg) {
size_t pollset_count) {
size_t i, j;
GPR_ASSERT(new_transport_cb);
gpr_mu_lock(&s->mu);
GPR_ASSERT(!s->cb);
GPR_ASSERT(s->active_ports == 0);
s->cb = new_transport_cb;
s->cb_arg = cb_arg;
s->pollsets = pollsets;
for (i = 0; i < s->nports; i++) {
for (j = 0; j < pollset_count; j++) {

@ -39,21 +39,15 @@
/* Forward decl of grpc_udp_server */
typedef struct grpc_udp_server grpc_udp_server;
/* New server callback: ep is the newly connected connection */
typedef void (*grpc_udp_server_cb)(void *arg, grpc_endpoint *ep);
/* Called when data is available to read from the socket. */
typedef void (*grpc_udp_server_read_cb)(int fd,
grpc_udp_server_cb new_transport_cb,
void *cb_arg);
typedef void (*grpc_udp_server_read_cb)(int fd);
/* Create a server, initially not bound to any ports */
grpc_udp_server *grpc_udp_server_create(void);
/* Start listening to bound ports */
void grpc_udp_server_start(grpc_udp_server *server, grpc_pollset **pollsets,
size_t pollset_count, grpc_udp_server_cb cb,
void *cb_arg);
void grpc_udp_server_start(grpc_udp_server *udp_server, grpc_pollset **pollsets,
size_t pollset_count);
int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned index);

@ -101,7 +101,7 @@ static void asciidump(dump_out *out, const char *buf, size_t len) {
dump_out_append(out, '\'');
}
for (cur = beg; cur != end; ++cur) {
dump_out_append(out, isprint(*cur) ? *(char *)cur : '.');
dump_out_append(out, (char)(isprint(*cur) ? *(char *)cur : '.'));
}
if (!out_was_empty) {
dump_out_append(out, '\'');

@ -89,6 +89,8 @@ static void connected(void *arg, grpc_endpoint *tcp) {
notify->cb(notify->cb_arg, 1);
}
static void connector_shutdown(grpc_connector *con) {}
static void connector_connect(grpc_connector *con,
const grpc_connect_in_args *args,
grpc_connect_out_args *result,
@ -105,7 +107,7 @@ static void connector_connect(grpc_connector *con,
}
static const grpc_connector_vtable connector_vtable = {
connector_ref, connector_unref, connector_connect};
connector_ref, connector_unref, connector_shutdown, connector_connect};
typedef struct {
grpc_subchannel_factory base;

@ -61,6 +61,9 @@ typedef struct {
grpc_iomgr_closure *notify;
grpc_connect_in_args args;
grpc_connect_out_args *result;
gpr_mu mu;
grpc_endpoint *connecting_endpoint;
} connector;
static void connector_ref(grpc_connector *con) {
@ -81,10 +84,20 @@ static void on_secure_transport_setup_done(void *arg,
grpc_endpoint *secure_endpoint) {
connector *c = arg;
grpc_iomgr_closure *notify;
if (status != GRPC_SECURITY_OK) {
gpr_mu_lock(&c->mu);
if (c->connecting_endpoint == NULL) {
memset(c->result, 0, sizeof(*c->result));
gpr_mu_unlock(&c->mu);
} else if (status != GRPC_SECURITY_OK) {
GPR_ASSERT(c->connecting_endpoint == wrapped_endpoint);
gpr_log(GPR_ERROR, "Secure transport setup failed with error %d.", status);
memset(c->result, 0, sizeof(*c->result));
c->connecting_endpoint = NULL;
gpr_mu_unlock(&c->mu);
} else {
GPR_ASSERT(c->connecting_endpoint == wrapped_endpoint);
c->connecting_endpoint = NULL;
gpr_mu_unlock(&c->mu);
c->result->transport = grpc_create_chttp2_transport(
c->args.channel_args, secure_endpoint, c->args.metadata_context,
c->args.workqueue, 1);
@ -103,6 +116,10 @@ static void connected(void *arg, grpc_endpoint *tcp) {
connector *c = arg;
grpc_iomgr_closure *notify;
if (tcp != NULL) {
gpr_mu_lock(&c->mu);
GPR_ASSERT(c->connecting_endpoint == NULL);
c->connecting_endpoint = tcp;
gpr_mu_unlock(&c->mu);
grpc_setup_secure_transport(&c->security_connector->base, tcp,
on_secure_transport_setup_done, c);
} else {
@ -113,6 +130,18 @@ static void connected(void *arg, grpc_endpoint *tcp) {
}
}
static void connector_shutdown(grpc_connector *con) {
connector *c = (connector *)con;
grpc_endpoint *ep;
gpr_mu_lock(&c->mu);
ep = c->connecting_endpoint;
c->connecting_endpoint = NULL;
gpr_mu_unlock(&c->mu);
if (ep) {
grpc_endpoint_shutdown(ep);
}
}
static void connector_connect(grpc_connector *con,
const grpc_connect_in_args *args,
grpc_connect_out_args *result,
@ -123,13 +152,16 @@ static void connector_connect(grpc_connector *con,
c->notify = notify;
c->args = *args;
c->result = result;
gpr_mu_lock(&c->mu);
GPR_ASSERT(c->connecting_endpoint == NULL);
gpr_mu_unlock(&c->mu);
grpc_tcp_client_connect(connected, c, args->interested_parties,
args->workqueue, args->addr, args->addr_len,
args->deadline);
}
static const grpc_connector_vtable connector_vtable = {
connector_ref, connector_unref, connector_connect};
connector_ref, connector_unref, connector_shutdown, connector_connect};
typedef struct {
grpc_subchannel_factory base;

@ -486,7 +486,7 @@ static int init_skip_frame_parser(
transport_parsing->hpack_parser.on_header_user_data = NULL;
transport_parsing->hpack_parser.is_boundary = is_eoh;
transport_parsing->hpack_parser.is_eof =
is_eoh ? transport_parsing->header_eof : 0;
(gpr_uint8)(is_eoh ? transport_parsing->header_eof : 0);
} else {
transport_parsing->parser = skip_parser;
}
@ -696,7 +696,7 @@ static int init_header_frame_parser(
transport_parsing->hpack_parser.on_header_user_data = transport_parsing;
transport_parsing->hpack_parser.is_boundary = is_eoh;
transport_parsing->hpack_parser.is_eof =
is_eoh ? transport_parsing->header_eof : 0;
(gpr_uint8)(is_eoh ? transport_parsing->header_eof : 0);
if (!is_continuation && (transport_parsing->incoming_frame_flags &
GRPC_CHTTP2_FLAG_HAS_PRIORITY)) {
grpc_chttp2_hpack_parser_set_has_priority(&transport_parsing->hpack_parser);

@ -49,9 +49,7 @@ static grpc_pollset g_pollset;
static int g_number_of_reads = 0;
static int g_number_of_bytes_read = 0;
static void on_connect(void *arg, grpc_endpoint *udp) {}
static void on_read(int fd, grpc_udp_server_cb new_transport_cb, void *cb_arg) {
static void on_read(int fd) {
char read_buffer[512];
ssize_t byte_count;
@ -73,7 +71,7 @@ static void test_no_op(void) {
static void test_no_op_with_start(void) {
grpc_udp_server *s = grpc_udp_server_create();
LOG_TEST("test_no_op_with_start");
grpc_udp_server_start(s, NULL, 0, on_connect, NULL);
grpc_udp_server_start(s, NULL, 0);
grpc_udp_server_destroy(s, NULL, NULL);
}
@ -100,7 +98,7 @@ static void test_no_op_with_port_and_start(void) {
GPR_ASSERT(grpc_udp_server_add_port(s, (struct sockaddr *)&addr, sizeof(addr),
on_read));
grpc_udp_server_start(s, NULL, 0, on_connect, NULL);
grpc_udp_server_start(s, NULL, 0);
grpc_udp_server_destroy(s, NULL, NULL);
}
@ -130,7 +128,7 @@ static void test_receive(int number_of_clients) {
GPR_ASSERT(addr_len <= sizeof(addr));
pollsets[0] = &g_pollset;
grpc_udp_server_start(s, pollsets, 1, on_connect, NULL);
grpc_udp_server_start(s, pollsets, 1);
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));

@ -47,6 +47,7 @@
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include "src/core/httpcli/httpcli.h"
#include "src/core/support/env.h"
@ -66,7 +67,70 @@ static int has_port_been_chosen(int port) {
return 0;
}
static void free_chosen_ports() { gpr_free(chosen_ports); }
typedef struct freereq {
grpc_pollset pollset;
int done;
} freereq;
static void destroy_pollset_and_shutdown(void *p) {
grpc_pollset_destroy(p);
grpc_shutdown();
}
static void freed_port_from_server(void *arg,
const grpc_httpcli_response *response) {
freereq *pr = arg;
gpr_mu_lock(GRPC_POLLSET_MU(&pr->pollset));
pr->done = 1;
grpc_pollset_kick(&pr->pollset, NULL);
gpr_mu_unlock(GRPC_POLLSET_MU(&pr->pollset));
}
static void free_port_using_server(char *server, int port) {
grpc_httpcli_context context;
grpc_httpcli_request req;
freereq pr;
char *path;
grpc_init();
memset(&pr, 0, sizeof(pr));
memset(&req, 0, sizeof(req));
grpc_pollset_init(&pr.pollset);
req.host = server;
gpr_asprintf(&path, "/drop/%d", port);
req.path = path;
grpc_httpcli_context_init(&context);
grpc_httpcli_get(&context, &pr.pollset, &req,
GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10), freed_port_from_server,
&pr);
gpr_mu_lock(GRPC_POLLSET_MU(&pr.pollset));
while (!pr.done) {
grpc_pollset_worker worker;
grpc_pollset_work(&pr.pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1));
}
gpr_mu_unlock(GRPC_POLLSET_MU(&pr.pollset));
grpc_httpcli_context_destroy(&context);
grpc_pollset_shutdown(&pr.pollset, destroy_pollset_and_shutdown, &pr.pollset);
gpr_free(path);
}
static void free_chosen_ports() {
char *env = gpr_getenv("GRPC_TEST_PORT_SERVER");
if (env != NULL) {
size_t i;
for (i = 0; i < num_chosen_ports; i++) {
free_port_using_server(env, chosen_ports[i]);
}
gpr_free(env);
}
gpr_free(chosen_ports);
}
static void chose_port(int port) {
if (chosen_ports == NULL) {
@ -131,6 +195,9 @@ static int is_port_available(int *port, int is_tcp) {
typedef struct portreq {
grpc_pollset pollset;
int port;
int retries;
char *server;
grpc_httpcli_context *ctx;
} portreq;
static void got_port_from_server(void *arg,
@ -138,6 +205,19 @@ static void got_port_from_server(void *arg,
size_t i;
int port = 0;
portreq *pr = arg;
if (!response || response->status != 200) {
grpc_httpcli_request req;
memset(&req, 0, sizeof(req));
GPR_ASSERT(pr->retries < 10);
pr->retries++;
req.host = pr->server;
req.path = "/get";
gpr_log(GPR_DEBUG, "failed port pick from server: retrying");
sleep(1);
grpc_httpcli_get(pr->ctx, &pr->pollset, &req, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10),
got_port_from_server, pr);
return;
}
GPR_ASSERT(response);
GPR_ASSERT(response->status == 200);
for (i = 0; i < response->body_length; i++) {
@ -151,11 +231,6 @@ static void got_port_from_server(void *arg,
gpr_mu_unlock(GRPC_POLLSET_MU(&pr->pollset));
}
static void destroy_pollset_and_shutdown(void *p) {
grpc_pollset_destroy(p);
grpc_shutdown();
}
static int pick_port_using_server(char *server) {
grpc_httpcli_context context;
grpc_httpcli_request req;
@ -167,6 +242,8 @@ static int pick_port_using_server(char *server) {
memset(&req, 0, sizeof(req));
grpc_pollset_init(&pr.pollset);
pr.port = -1;
pr.server = server;
pr.ctx = &context;
req.host = server;
req.path = "/get";
@ -211,8 +288,9 @@ int grpc_pick_unused_port(void) {
int port = pick_port_using_server(env);
gpr_free(env);
if (port != 0) {
return port;
chose_port(port);
}
return port;
}
for (;;) {

@ -79,7 +79,6 @@ then
-e "config=$config" \
-e "language=$language" \
-e "arch=$arch" \
-e "GRPC_ZOOKEEPER_SERVER_TEST=grpc-jenkins-master:2181" \
-e CCACHE_DIR=/tmp/ccache \
-i \
-v "$git_root:/var/local/jenkins/grpc" \

@ -37,6 +37,7 @@ import os
import socket
import sys
import time
import yaml
argp = argparse.ArgumentParser(description='Server for httpcli_test')
argp.add_argument('-p', '--port', default=12345, type=int)
@ -51,16 +52,17 @@ with open(__file__) as f:
_MY_VERSION = hashlib.sha1(f.read()).hexdigest()
def refill_pool():
def refill_pool(max_timeout):
"""Scan for ports not marked for being in use"""
for i in range(10000, 65000):
for i in range(1025, 32767):
if len(pool) > 100: break
if i in in_use:
age = time.time() - in_use[i]
if age < 600:
if age < max_timeout:
continue
del in_use[i]
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
s.bind(('localhost', i))
pool.append(i)
@ -73,8 +75,12 @@ def refill_pool():
def allocate_port():
global pool
global in_use
if not pool:
refill_pool()
max_timeout = 600
while not pool:
refill_pool(max_timeout)
if not pool:
time.sleep(1)
max_timeout /= 2
port = pool[0]
pool = pool[1:]
in_use[port] = time.time()
@ -97,12 +103,26 @@ class Handler(BaseHTTPServer.BaseHTTPRequestHandler):
p = allocate_port()
self.log_message('allocated port %d' % p)
self.wfile.write('%d' % p)
elif self.path[0:6] == '/drop/':
self.send_response(200)
self.send_header('Content-Type', 'text/plain')
self.end_headers()
p = int(self.path[6:])
del in_use[p]
pool.append(p)
self.log_message('drop port %d' % p)
elif self.path == '/version':
# fetch a version string and the current process pid
self.send_response(200)
self.send_header('Content-Type', 'text/plain')
self.end_headers()
self.wfile.write(_MY_VERSION)
elif self.path == '/dump':
self.send_response(200)
self.send_header('Content-Type', 'text/plain')
self.end_headers()
now = time.time()
self.wfile.write(yaml.dump({'pool': pool, 'in_use': dict((k, now - v) for k, v in in_use.iteritems())}))
elif self.path == '/quit':
self.send_response(200)
self.end_headers()

Loading…
Cancel
Save