Merge pull request #10 from markdroth/http_proxy_patch

Provide a cleaner API for proxy_connection_failed().
reviewable/pr13579/r5
Sree Kuchibhotla 7 years ago committed by GitHub
commit 45abfe104c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 138
      test/core/end2end/fixtures/http_proxy_fixture.cc

@ -68,15 +68,6 @@ struct grpc_end2end_http_proxy {
// Connection handling // Connection handling
// //
#define SERVER_EP_READ_FAIL 1
#define SERVER_EP_WRITE_FAIL 2
#define CLIENT_EP_READ_FAIL 4
#define CLIENT_EP_WRITE_FAIL 8
#define SERVER_EP_FAIL (SERVER_EP_READ_FAIL | SERVER_EP_WRITE_FAIL)
#define CLIENT_EP_FAIL (CLIENT_EP_READ_FAIL | CLIENT_EP_WRITE_FAIL)
#define EP_FAIL (SERVER_EP_FAIL | CLIENT_EP_FAIL)
// proxy_connection structure is only accessed in the closures which are all // proxy_connection structure is only accessed in the closures which are all
// scheduled under the same combiner lock. So there is is no need for a mutex to // scheduled under the same combiner lock. So there is is no need for a mutex to
// protect this structure. // protect this structure.
@ -88,13 +79,6 @@ typedef struct proxy_connection {
gpr_refcount refcount; gpr_refcount refcount;
// The lower four bits store the endpoint failure information
// bit-0 Server endpoint read failure
// bit-1 Server endpoint write failure
// bit-2 Client endpoint read failure
// bit-3 Client endpoint write failure
int ep_state;
grpc_pollset_set* pollset_set; grpc_pollset_set* pollset_set;
// NOTE: All the closures execute under proxy->combiner lock. Which means // NOTE: All the closures execute under proxy->combiner lock. Which means
@ -107,6 +91,13 @@ typedef struct proxy_connection {
grpc_closure on_server_read_done; grpc_closure on_server_read_done;
grpc_closure on_server_write_done; grpc_closure on_server_write_done;
bool client_read_failed : 1;
bool client_write_failed : 1;
bool client_shutdown : 1;
bool server_read_failed : 1;
bool server_write_failed : 1;
bool server_shutdown : 1;
grpc_slice_buffer client_read_buffer; grpc_slice_buffer client_read_buffer;
grpc_slice_buffer client_deferred_write_buffer; grpc_slice_buffer client_deferred_write_buffer;
bool client_is_writing; bool client_is_writing;
@ -150,36 +141,52 @@ static void proxy_connection_unref(grpc_exec_ctx* exec_ctx,
} }
} }
enum failure_type {
SETUP_FAILED, // To be used before we start proxying.
CLIENT_READ_FAILED,
CLIENT_WRITE_FAILED,
SERVER_READ_FAILED,
SERVER_WRITE_FAILED,
};
// Helper function to shut down the proxy connection. // Helper function to shut down the proxy connection.
// Does NOT take ownership of a reference to error.
static void proxy_connection_failed(grpc_exec_ctx* exec_ctx, static void proxy_connection_failed(grpc_exec_ctx* exec_ctx,
proxy_connection* conn, int failure_type, proxy_connection* conn,
const char* prefix, grpc_error* error) { failure_type failure, const char* prefix,
const char* msg = grpc_error_string(error); grpc_error* error) {
gpr_log(GPR_INFO, "%s: %s", prefix, msg); gpr_log(GPR_INFO, "%s: %s", prefix, grpc_error_string(error));
int ep_state = conn->ep_state; // Decide whether we should shut down the client and server.
int new_ep_state = ep_state | failure_type; bool shutdown_client = false;
if (ep_state != new_ep_state) { bool shutdown_server = false;
conn->ep_state = new_ep_state; if (failure == SETUP_FAILED) {
// Shutdown the endpoint (client and/or server) if both read and write shutdown_client = true;
// failures are observed after setting the failure_type. shutdown_server = true;
// To prevent calling endpoint shutdown multiple times, It is important to } else {
// ensure that ep_state i.e the old state, did not already have both if ((failure == CLIENT_READ_FAILED && conn->client_write_failed) ||
// failures set. (failure == CLIENT_WRITE_FAILED && conn->client_read_failed) ||
if (((ep_state & SERVER_EP_FAIL) != SERVER_EP_FAIL) && (failure == SERVER_READ_FAILED && !conn->client_is_writing)) {
((new_ep_state & SERVER_EP_FAIL) == SERVER_EP_FAIL)) { shutdown_client = true;
if (conn->server_endpoint != nullptr) {
grpc_endpoint_shutdown(exec_ctx, conn->server_endpoint,
GRPC_ERROR_REF(error));
}
} }
if (((ep_state & CLIENT_EP_FAIL) != CLIENT_EP_FAIL) && if ((failure == SERVER_READ_FAILED && conn->server_write_failed) ||
((new_ep_state & CLIENT_EP_FAIL) == CLIENT_EP_FAIL)) { (failure == SERVER_WRITE_FAILED && conn->server_read_failed) ||
grpc_endpoint_shutdown(exec_ctx, conn->client_endpoint, (failure == CLIENT_READ_FAILED && !conn->server_is_writing)) {
GRPC_ERROR_REF(error)); shutdown_server = true;
} }
} }
// If we decided to shut down either one and have not yet done so, do so.
if (shutdown_client && !conn->client_shutdown) {
grpc_endpoint_shutdown(exec_ctx, conn->client_endpoint,
GRPC_ERROR_REF(error));
conn->client_shutdown = true;
}
if (shutdown_server && !conn->server_shutdown) {
grpc_endpoint_shutdown(exec_ctx, conn->server_endpoint,
GRPC_ERROR_REF(error));
conn->server_shutdown = true;
}
// Unref the connection.
proxy_connection_unref(exec_ctx, conn, "conn_failed"); proxy_connection_unref(exec_ctx, conn, "conn_failed");
GRPC_ERROR_UNREF(error);
} }
// Callback for writing proxy data to the client. // Callback for writing proxy data to the client.
@ -188,8 +195,8 @@ static void on_client_write_done(grpc_exec_ctx* exec_ctx, void* arg,
proxy_connection* conn = (proxy_connection*)arg; proxy_connection* conn = (proxy_connection*)arg;
conn->client_is_writing = false; conn->client_is_writing = false;
if (error != GRPC_ERROR_NONE) { if (error != GRPC_ERROR_NONE) {
proxy_connection_failed(exec_ctx, conn, CLIENT_EP_WRITE_FAIL, proxy_connection_failed(exec_ctx, conn, CLIENT_WRITE_FAILED,
"HTTP proxy client write", error); "HTTP proxy client write", GRPC_ERROR_REF(error));
return; return;
} }
// Clear write buffer (the data we just wrote). // Clear write buffer (the data we just wrote).
@ -215,8 +222,8 @@ static void on_server_write_done(grpc_exec_ctx* exec_ctx, void* arg,
proxy_connection* conn = (proxy_connection*)arg; proxy_connection* conn = (proxy_connection*)arg;
conn->server_is_writing = false; conn->server_is_writing = false;
if (error != GRPC_ERROR_NONE) { if (error != GRPC_ERROR_NONE) {
proxy_connection_failed(exec_ctx, conn, SERVER_EP_WRITE_FAIL, proxy_connection_failed(exec_ctx, conn, SERVER_WRITE_FAILED,
"HTTP proxy server write", error); "HTTP proxy server write", GRPC_ERROR_REF(error));
return; return;
} }
// Clear write buffer (the data we just wrote). // Clear write buffer (the data we just wrote).
@ -244,11 +251,8 @@ static void on_client_read_done(grpc_exec_ctx* exec_ctx, void* arg,
if (error != GRPC_ERROR_NONE) { if (error != GRPC_ERROR_NONE) {
// Report a read failure on the client endpoint. If there is no pending // Report a read failure on the client endpoint. If there is no pending
// server write, then shutdown the server endpoint as well. // server write, then shutdown the server endpoint as well.
proxy_connection_failed( proxy_connection_failed(exec_ctx, conn, CLIENT_READ_FAILED,
exec_ctx, conn, "HTTP proxy client read", GRPC_ERROR_REF(error));
(conn->server_is_writing ? CLIENT_EP_READ_FAIL
: (CLIENT_EP_READ_FAIL | SERVER_EP_FAIL)),
"HTTP proxy client read", error);
return; return;
} }
// If there is already a pending write (i.e., server_write_buffer is // If there is already a pending write (i.e., server_write_buffer is
@ -282,11 +286,8 @@ static void on_server_read_done(grpc_exec_ctx* exec_ctx, void* arg,
if (error != GRPC_ERROR_NONE) { if (error != GRPC_ERROR_NONE) {
// Report a read failure on the server end point. If there is no pending // Report a read failure on the server end point. If there is no pending
// write to the client, then shutdown the client endpoint as well. // write to the client, then shutdown the client endpoint as well.
proxy_connection_failed( proxy_connection_failed(exec_ctx, conn, SERVER_READ_FAILED,
exec_ctx, conn, "HTTP proxy server read", GRPC_ERROR_REF(error));
(conn->client_is_writing ? SERVER_EP_READ_FAIL
: (SERVER_EP_READ_FAIL | CLIENT_EP_FAIL)),
"HTTP proxy server read", error);
return; return;
} }
// If there is already a pending write (i.e., client_write_buffer is // If there is already a pending write (i.e., client_write_buffer is
@ -318,8 +319,8 @@ static void on_write_response_done(grpc_exec_ctx* exec_ctx, void* arg,
proxy_connection* conn = (proxy_connection*)arg; proxy_connection* conn = (proxy_connection*)arg;
conn->client_is_writing = false; conn->client_is_writing = false;
if (error != GRPC_ERROR_NONE) { if (error != GRPC_ERROR_NONE) {
proxy_connection_failed(exec_ctx, conn, EP_FAIL, proxy_connection_failed(exec_ctx, conn, SETUP_FAILED,
"HTTP proxy write response", error); "HTTP proxy write response", GRPC_ERROR_REF(error));
return; return;
} }
// Clear write buffer. // Clear write buffer.
@ -347,8 +348,8 @@ static void on_server_connect_done(grpc_exec_ctx* exec_ctx, void* arg,
// connection failed. However, for the purposes of this test code, // connection failed. However, for the purposes of this test code,
// it's fine to pretend this is a client-side error, which will // it's fine to pretend this is a client-side error, which will
// cause the client connection to be dropped. // cause the client connection to be dropped.
proxy_connection_failed(exec_ctx, conn, EP_FAIL, proxy_connection_failed(exec_ctx, conn, SETUP_FAILED,
"HTTP proxy server connect", error); "HTTP proxy server connect", GRPC_ERROR_REF(error));
return; return;
} }
// We've established a connection, so send back a 200 response code to // We've established a connection, so send back a 200 response code to
@ -397,8 +398,8 @@ static void on_read_request_done(grpc_exec_ctx* exec_ctx, void* arg,
gpr_log(GPR_DEBUG, "on_read_request_done: %p %s", conn, gpr_log(GPR_DEBUG, "on_read_request_done: %p %s", conn,
grpc_error_string(error)); grpc_error_string(error));
if (error != GRPC_ERROR_NONE) { if (error != GRPC_ERROR_NONE) {
proxy_connection_failed(exec_ctx, conn, EP_FAIL, "HTTP proxy read request", proxy_connection_failed(exec_ctx, conn, SETUP_FAILED,
error); "HTTP proxy read request", GRPC_ERROR_REF(error));
return; return;
} }
// Read request and feed it to the parser. // Read request and feed it to the parser.
@ -407,8 +408,9 @@ static void on_read_request_done(grpc_exec_ctx* exec_ctx, void* arg,
error = grpc_http_parser_parse( error = grpc_http_parser_parse(
&conn->http_parser, conn->client_read_buffer.slices[i], nullptr); &conn->http_parser, conn->client_read_buffer.slices[i], nullptr);
if (error != GRPC_ERROR_NONE) { if (error != GRPC_ERROR_NONE) {
proxy_connection_failed(exec_ctx, conn, EP_FAIL, proxy_connection_failed(exec_ctx, conn, SETUP_FAILED,
"HTTP proxy request parse", error); "HTTP proxy request parse",
GRPC_ERROR_REF(error));
GRPC_ERROR_UNREF(error); GRPC_ERROR_UNREF(error);
return; return;
} }
@ -428,8 +430,8 @@ static void on_read_request_done(grpc_exec_ctx* exec_ctx, void* arg,
conn->http_request.method); conn->http_request.method);
error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg); error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg);
gpr_free(msg); gpr_free(msg);
proxy_connection_failed(exec_ctx, conn, EP_FAIL, "HTTP proxy read request", proxy_connection_failed(exec_ctx, conn, SETUP_FAILED,
error); "HTTP proxy read request", GRPC_ERROR_REF(error));
GRPC_ERROR_UNREF(error); GRPC_ERROR_UNREF(error);
return; return;
} }
@ -449,8 +451,8 @@ static void on_read_request_done(grpc_exec_ctx* exec_ctx, void* arg,
if (!client_authenticated) { if (!client_authenticated) {
const char* msg = "HTTP Connect could not verify authentication"; const char* msg = "HTTP Connect could not verify authentication";
error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(msg); error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(msg);
proxy_connection_failed(exec_ctx, conn, EP_FAIL, proxy_connection_failed(exec_ctx, conn, SETUP_FAILED,
"HTTP proxy read request", error); "HTTP proxy read request", GRPC_ERROR_REF(error));
GRPC_ERROR_UNREF(error); GRPC_ERROR_UNREF(error);
return; return;
} }
@ -460,8 +462,8 @@ static void on_read_request_done(grpc_exec_ctx* exec_ctx, void* arg,
error = grpc_blocking_resolve_address(conn->http_request.path, "80", error = grpc_blocking_resolve_address(conn->http_request.path, "80",
&resolved_addresses); &resolved_addresses);
if (error != GRPC_ERROR_NONE) { if (error != GRPC_ERROR_NONE) {
proxy_connection_failed(exec_ctx, conn, EP_FAIL, "HTTP proxy DNS lookup", proxy_connection_failed(exec_ctx, conn, SETUP_FAILED,
error); "HTTP proxy DNS lookup", GRPC_ERROR_REF(error));
GRPC_ERROR_UNREF(error); GRPC_ERROR_UNREF(error);
return; return;
} }

Loading…
Cancel
Save