diff --git a/test/core/end2end/fixtures/http_proxy.c b/test/core/end2end/fixtures/http_proxy.c index daf023958c9..3953687c261 100644 --- a/test/core/end2end/fixtures/http_proxy.c +++ b/test/core/end2end/fixtures/http_proxy.c @@ -72,10 +72,21 @@ struct grpc_end2end_http_proxy { // Connection handling // -typedef struct connection_data { +typedef struct proxy_connection { grpc_endpoint* client_endpoint; grpc_endpoint* server_endpoint; + gpr_refcount refcount; + bool client_shutdown; + bool server_shutdown; + bool client_write_pending; + bool server_write_pending; + +size_t client_bytes_read; +size_t client_bytes_written; +size_t server_bytes_read; +size_t server_bytes_written; + grpc_pollset_set* pollset_set; grpc_closure on_read_request_done; @@ -94,229 +105,304 @@ typedef struct connection_data { grpc_http_parser http_parser; grpc_http_request http_request; - grpc_end2end_http_proxy* proxy; - - gpr_refcount refcount; -} connection_data; + grpc_end2end_http_proxy* proxy; // Does not own. +} proxy_connection; // Helper function to destroy the proxy connection. -static void connection_data_destroy(grpc_exec_ctx* exec_ctx, - connection_data* cd) { - cd->proxy->shutdown = true; - grpc_endpoint_destroy(exec_ctx, cd->client_endpoint); - if (cd->server_endpoint != NULL) - grpc_endpoint_destroy(exec_ctx, cd->server_endpoint); - grpc_pollset_set_destroy(cd->pollset_set); - gpr_slice_buffer_destroy(&cd->client_read_buffer); - gpr_slice_buffer_destroy(&cd->client_write_buffer); - gpr_slice_buffer_destroy(&cd->server_read_buffer); - gpr_slice_buffer_destroy(&cd->server_write_buffer); - grpc_http_parser_destroy(&cd->http_parser); - grpc_http_request_destroy(&cd->http_request); - gpr_free(cd); +static void proxy_connection_destroy(grpc_exec_ctx* exec_ctx, + proxy_connection* conn) { +gpr_log(GPR_INFO, "==> %s()", __func__); +gpr_log(GPR_INFO, "client_bytes_read=%lu", conn->client_bytes_read); +gpr_log(GPR_INFO, "server_bytes_written=%lu", conn->server_bytes_written); +gpr_log(GPR_INFO, "server_bytes_read=%lu", conn->server_bytes_read); +gpr_log(GPR_INFO, "client_bytes_written=%lu", conn->client_bytes_written); + // Tell the server to shut down when this connection is closed. + conn->proxy->shutdown = true; + grpc_endpoint_destroy(exec_ctx, conn->client_endpoint); + if (conn->server_endpoint != NULL) + grpc_endpoint_destroy(exec_ctx, conn->server_endpoint); + grpc_pollset_set_destroy(conn->pollset_set); + gpr_slice_buffer_destroy(&conn->client_read_buffer); + gpr_slice_buffer_destroy(&conn->client_write_buffer); + gpr_slice_buffer_destroy(&conn->server_read_buffer); + gpr_slice_buffer_destroy(&conn->server_write_buffer); + grpc_http_parser_destroy(&conn->http_parser); + grpc_http_request_destroy(&conn->http_request); + gpr_free(conn); } // Helper function to shut down the proxy connection. // Does NOT take ownership of a reference to error. -static void connection_data_failed(grpc_exec_ctx* exec_ctx, - connection_data* cd, const char* prefix, - grpc_error* error) { +static void proxy_connection_failed(grpc_exec_ctx* exec_ctx, + proxy_connection* conn, bool is_client, + const char* prefix, grpc_error* error) { +gpr_log(GPR_INFO, "==> %s()", __func__); const char* msg = grpc_error_string(error); gpr_log(GPR_ERROR, "%s: %s", prefix, msg); grpc_error_free_string(msg); - grpc_endpoint_shutdown(exec_ctx, cd->client_endpoint); - if (cd->server_endpoint != NULL) - grpc_endpoint_shutdown(exec_ctx, cd->server_endpoint); - if (gpr_unref(&cd->refcount)) - connection_data_destroy(exec_ctx, cd); + if (is_client || !conn->client_write_pending) { + grpc_endpoint_shutdown(exec_ctx, conn->client_endpoint); + conn->client_shutdown = true; + } + if (!is_client || !conn->server_write_pending) { + grpc_endpoint_shutdown(exec_ctx, conn->server_endpoint); + conn->server_shutdown = true; + } + if (gpr_unref(&conn->refcount)) + proxy_connection_destroy(exec_ctx, conn); } +// Forward declarations. +static void do_client_read(grpc_exec_ctx* exec_ctx, proxy_connection* conn); +static void do_server_read(grpc_exec_ctx* exec_ctx, proxy_connection* conn); + // Callback for writing proxy data to the client. static void on_client_write_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { - connection_data* cd = arg; +gpr_log(GPR_INFO, "==> %s()", __func__); + proxy_connection* conn = arg; + conn->client_write_pending = false; if (error != GRPC_ERROR_NONE) { - connection_data_failed(exec_ctx, cd, "HTTP proxy client write", error); + proxy_connection_failed(exec_ctx, conn, true /* is_client */, + "HTTP proxy client write", error); return; } - // Clear write buffer and release our reference. - gpr_slice_buffer_reset_and_unref(&cd->client_write_buffer); - gpr_unref(&cd->refcount); + gpr_unref(&conn->refcount); + // Clear write buffer. +gpr_log(GPR_INFO, "wrote %lu bytes to client", conn->client_write_buffer.length); +conn->client_bytes_written += conn->client_write_buffer.length; + gpr_slice_buffer_reset_and_unref(&conn->client_write_buffer); + // If the server has been shut down, shut down the client now. + if (conn->server_shutdown) + grpc_endpoint_shutdown(exec_ctx, conn->client_endpoint); } // Callback for writing proxy data to the backend server. static void on_server_write_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { - connection_data* cd = arg; +gpr_log(GPR_INFO, "==> %s()", __func__); + proxy_connection* conn = arg; + conn->server_write_pending = false; if (error != GRPC_ERROR_NONE) { - connection_data_failed(exec_ctx, cd, "HTTP proxy server write", error); + proxy_connection_failed(exec_ctx, conn, false /* is_client */, + "HTTP proxy server write", error); return; } - // Clear write buffer and release our reference. - gpr_slice_buffer_reset_and_unref(&cd->server_write_buffer); - gpr_unref(&cd->refcount); + gpr_unref(&conn->refcount); + // Clear write buffer. +gpr_log(GPR_INFO, "wrote %lu bytes to server", conn->server_write_buffer.length); +conn->server_bytes_written += conn->server_write_buffer.length; + gpr_slice_buffer_reset_and_unref(&conn->server_write_buffer); + // If the client has been shut down, shut down the server now. + if (conn->client_shutdown) + grpc_endpoint_shutdown(exec_ctx, conn->server_endpoint); } // Callback for reading data from the client, which will be proxied to // the backend server. static void on_client_read_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { - connection_data* cd = arg; +gpr_log(GPR_INFO, "==> %s()", __func__); + proxy_connection* conn = arg; if (error != GRPC_ERROR_NONE) { - connection_data_failed(exec_ctx, cd, "HTTP proxy client read", error); + proxy_connection_failed(exec_ctx, conn, true /* is_client */, + "HTTP proxy client read", error); return; } // Move read data into write buffer and write it. - // Take a new ref for the write callback. - gpr_ref(&cd->refcount); - gpr_slice_buffer_move_into(&cd->client_read_buffer, &cd->server_write_buffer); - grpc_endpoint_write(exec_ctx, cd->server_endpoint, &cd->server_write_buffer, - &cd->on_server_write_done); + // The write operation inherits our reference to conn. +gpr_log(GPR_INFO, "read %lu bytes from client", conn->client_read_buffer.length); +conn->client_bytes_read += conn->client_read_buffer.length; + gpr_slice_buffer_move_into(&conn->client_read_buffer, + &conn->server_write_buffer); + conn->server_write_pending = true; + grpc_endpoint_write(exec_ctx, conn->server_endpoint, + &conn->server_write_buffer, &conn->on_server_write_done); // Read more data. - grpc_endpoint_read(exec_ctx, cd->client_endpoint, &cd->client_read_buffer, - &cd->on_client_read_done); + do_client_read(exec_ctx, conn); } // Callback for reading data from the backend server, which will be // proxied to the client. static void on_server_read_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { - connection_data* cd = arg; +gpr_log(GPR_INFO, "==> %s()", __func__); + proxy_connection* conn = arg; if (error != GRPC_ERROR_NONE) { - connection_data_failed(exec_ctx, cd, "HTTP proxy server read", error); + proxy_connection_failed(exec_ctx, conn, false /* is_client */, + "HTTP proxy server read", error); return; } // Move read data into write buffer and write it. - // Take a new ref for the write callback. - gpr_ref(&cd->refcount); - gpr_slice_buffer_move_into(&cd->server_read_buffer, &cd->client_write_buffer); - grpc_endpoint_write(exec_ctx, cd->client_endpoint, &cd->client_write_buffer, - &cd->on_client_write_done); + // The write operation inherits our reference to conn. +gpr_log(GPR_INFO, "read %lu bytes from server", conn->server_read_buffer.length); +conn->server_bytes_read += conn->server_read_buffer.length; + gpr_slice_buffer_move_into(&conn->server_read_buffer, + &conn->client_write_buffer); + conn->client_write_pending = true; + grpc_endpoint_write(exec_ctx, conn->client_endpoint, + &conn->client_write_buffer, &conn->on_client_write_done); // Read more data. - grpc_endpoint_read(exec_ctx, cd->server_endpoint, &cd->server_read_buffer, - &cd->on_server_read_done); + do_server_read(exec_ctx, conn); } // Callback to write the HTTP response for the CONNECT request. static void on_write_response_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { - connection_data* cd = arg; +gpr_log(GPR_INFO, "==> %s()", __func__); + proxy_connection* conn = arg; if (error != GRPC_ERROR_NONE) { - connection_data_failed(exec_ctx, cd, "HTTP proxy write response", error); + proxy_connection_failed(exec_ctx, conn, true /* is_client */, + "HTTP proxy write response", error); return; } + gpr_unref(&conn->refcount); // Clear write buffer. - gpr_slice_buffer_reset_and_unref(&cd->client_write_buffer); + gpr_slice_buffer_reset_and_unref(&conn->client_write_buffer); // Start reading from both client and server. - // We increase the refcount by one, since we already held one reference - // for ourselves, and there will now be two pending callbacks. - gpr_ref(&cd->refcount); - grpc_endpoint_read(exec_ctx, cd->client_endpoint, &cd->client_read_buffer, - &cd->on_client_read_done); - grpc_endpoint_read(exec_ctx, cd->server_endpoint, &cd->server_read_buffer, - &cd->on_server_read_done); + do_client_read(exec_ctx, conn); + do_server_read(exec_ctx, conn); +} + +// Start a read from the client. +static void do_client_read(grpc_exec_ctx* exec_ctx, proxy_connection* conn) { + gpr_ref(&conn->refcount); + grpc_endpoint_read(exec_ctx, conn->client_endpoint, &conn->client_read_buffer, + &conn->on_client_read_done); +} + +// Start a read from the server. +static void do_server_read(grpc_exec_ctx* exec_ctx, proxy_connection* conn) { + gpr_ref(&conn->refcount); + grpc_endpoint_read(exec_ctx, conn->server_endpoint, &conn->server_read_buffer, + &conn->on_server_read_done); } // Callback to connect to the backend server specified by the HTTP // CONNECT request. static void on_server_connect_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { - connection_data* cd = arg; +gpr_log(GPR_INFO, "==> %s()", __func__); + proxy_connection* conn = arg; if (error != GRPC_ERROR_NONE) { - connection_data_failed(exec_ctx, cd, "HTTP proxy server connect", error); + // TODO(roth): Technically, in this case, we should handle the error + // by returning an HTTP response to the client indicating that the + // connection failed. However, for the purposes of this test code, + // it's fine to pretend this is a client-side error, which will + // cause the client connection to be dropped. + proxy_connection_failed(exec_ctx, conn, true /* is_client */, + "HTTP proxy server connect", error); return; } // We've established a connection, so send back a 200 response code to // the client. + // The write callback inherits our reference to conn. gpr_slice slice = gpr_slice_from_copied_string("200 connected\r\n\r\n"); - gpr_slice_buffer_add(&cd->client_write_buffer, slice); - grpc_endpoint_write(exec_ctx, cd->client_endpoint, &cd->client_write_buffer, - &cd->on_write_response_done); + gpr_slice_buffer_add(&conn->client_write_buffer, slice); + grpc_endpoint_write(exec_ctx, conn->client_endpoint, + &conn->client_write_buffer, + &conn->on_write_response_done); } // Callback to read the HTTP CONNECT request. +// TODO(roth): Technically, for any of the failure modes handled by this +// function, we should handle the error by returning an HTTP response to +// the client indicating that the request failed. However, for the purposes +// of this test code, it's fine to pretend this is a client-side error, +// which will cause the client connection to be dropped. static void on_read_request_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { - connection_data* cd = arg; +gpr_log(GPR_INFO, "==> %s()", __func__); + proxy_connection* conn = arg; if (error != GRPC_ERROR_NONE) { - connection_data_failed(exec_ctx, cd, "HTTP proxy read request", error); + proxy_connection_failed(exec_ctx, conn, true /* is_client */, + "HTTP proxy read request", error); return; } // Read request and feed it to the parser. - for (size_t i = 0; i < cd->client_read_buffer.count; ++i) { - if (GPR_SLICE_LENGTH(cd->client_read_buffer.slices[i]) > 0) { + for (size_t i = 0; i < conn->client_read_buffer.count; ++i) { + if (GPR_SLICE_LENGTH(conn->client_read_buffer.slices[i]) > 0) { error = grpc_http_parser_parse( - &cd->http_parser, cd->client_read_buffer.slices[i]); + &conn->http_parser, conn->client_read_buffer.slices[i]); if (error != GRPC_ERROR_NONE) { - connection_data_failed(exec_ctx, cd, "HTTP proxy request parse", - error); + proxy_connection_failed(exec_ctx, conn, true /* is_client */, + "HTTP proxy request parse", error); + GRPC_ERROR_UNREF(error); return; } } } - gpr_slice_buffer_reset_and_unref(&cd->client_read_buffer); + gpr_slice_buffer_reset_and_unref(&conn->client_read_buffer); // If we're not done reading the request, read more data. - if (cd->http_parser.state != GRPC_HTTP_BODY) { - grpc_endpoint_read(exec_ctx, cd->client_endpoint, &cd->client_read_buffer, - &cd->on_read_request_done); + if (conn->http_parser.state != GRPC_HTTP_BODY) { + grpc_endpoint_read(exec_ctx, conn->client_endpoint, + &conn->client_read_buffer, &conn->on_read_request_done); return; } // Make sure we got a CONNECT request. - if (strcmp(cd->http_request.method, "CONNECT") != 0) { + if (strcmp(conn->http_request.method, "CONNECT") != 0) { char* msg; gpr_asprintf(&msg, "HTTP proxy got request method %s", - cd->http_request.method); + conn->http_request.method); error = GRPC_ERROR_CREATE(msg); gpr_free(msg); - connection_data_failed(exec_ctx, cd, "HTTP proxy read request", error); + proxy_connection_failed(exec_ctx, conn, true /* is_client */, + "HTTP proxy read request", error); + GRPC_ERROR_UNREF(error); return; } // Resolve address. grpc_resolved_addresses* resolved_addresses = NULL; - error = grpc_blocking_resolve_address(cd->http_request.path, "80", + error = grpc_blocking_resolve_address(conn->http_request.path, "80", &resolved_addresses); if (error != GRPC_ERROR_NONE) { - connection_data_failed(exec_ctx, cd, "HTTP proxy DNS lookup", error); + proxy_connection_failed(exec_ctx, conn, true /* is_client */, + "HTTP proxy DNS lookup", error); + GRPC_ERROR_UNREF(error); return; } GPR_ASSERT(resolved_addresses->naddrs >= 1); // Connect to requested address. + // The connection callback inherits our reference to conn. const gpr_timespec deadline = gpr_time_add( gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_seconds(10, GPR_TIMESPAN)); - grpc_tcp_client_connect(exec_ctx, &cd->on_server_connect_done, - &cd->server_endpoint, cd->pollset_set, + grpc_tcp_client_connect(exec_ctx, &conn->on_server_connect_done, + &conn->server_endpoint, conn->pollset_set, (struct sockaddr*)&resolved_addresses->addrs[0].addr, resolved_addresses->addrs[0].len, deadline); grpc_resolved_addresses_destroy(resolved_addresses); } static void on_accept(grpc_exec_ctx* exec_ctx, void* arg, - grpc_endpoint* ep, grpc_pollset* accepting_pollset, + grpc_endpoint* endpoint, grpc_pollset* accepting_pollset, grpc_tcp_server_acceptor* acceptor) { +gpr_log(GPR_INFO, "==> %s()", __func__); grpc_end2end_http_proxy* proxy = arg; - // Instantiate connection_data. - connection_data* cd = gpr_malloc(sizeof(*cd)); - memset(cd, 0, sizeof(*cd)); - cd->client_endpoint = ep; - cd->pollset_set = grpc_pollset_set_create(); - grpc_pollset_set_add_pollset(exec_ctx, cd->pollset_set, proxy->pollset); - grpc_closure_init(&cd->on_read_request_done, on_read_request_done, cd); - grpc_closure_init(&cd->on_server_connect_done, on_server_connect_done, cd); - grpc_closure_init(&cd->on_write_response_done, on_write_response_done, cd); - grpc_closure_init(&cd->on_client_read_done, on_client_read_done, cd); - grpc_closure_init(&cd->on_client_write_done, on_client_write_done, cd); - grpc_closure_init(&cd->on_server_read_done, on_server_read_done, cd); - grpc_closure_init(&cd->on_server_write_done, on_server_write_done, cd); - gpr_slice_buffer_init(&cd->client_read_buffer); - gpr_slice_buffer_init(&cd->client_write_buffer); - gpr_slice_buffer_init(&cd->server_read_buffer); - gpr_slice_buffer_init(&cd->server_write_buffer); - grpc_http_parser_init(&cd->http_parser, GRPC_HTTP_REQUEST, - &cd->http_request); - cd->proxy = proxy; - gpr_ref_init(&cd->refcount, 1); - grpc_endpoint_read(exec_ctx, cd->client_endpoint, &cd->client_read_buffer, - &cd->on_read_request_done); + // Instantiate proxy_connection. + proxy_connection* conn = gpr_malloc(sizeof(*conn)); + memset(conn, 0, sizeof(*conn)); + conn->client_endpoint = endpoint; + gpr_ref_init(&conn->refcount, 1); + conn->pollset_set = grpc_pollset_set_create(); + grpc_pollset_set_add_pollset(exec_ctx, conn->pollset_set, proxy->pollset); + grpc_closure_init(&conn->on_read_request_done, on_read_request_done, conn); + grpc_closure_init(&conn->on_server_connect_done, on_server_connect_done, + conn); + grpc_closure_init(&conn->on_write_response_done, on_write_response_done, + conn); + grpc_closure_init(&conn->on_client_read_done, on_client_read_done, conn); + grpc_closure_init(&conn->on_client_write_done, on_client_write_done, conn); + grpc_closure_init(&conn->on_server_read_done, on_server_read_done, conn); + grpc_closure_init(&conn->on_server_write_done, on_server_write_done, conn); + gpr_slice_buffer_init(&conn->client_read_buffer); + gpr_slice_buffer_init(&conn->client_write_buffer); + gpr_slice_buffer_init(&conn->server_read_buffer); + gpr_slice_buffer_init(&conn->server_write_buffer); + grpc_http_parser_init(&conn->http_parser, GRPC_HTTP_REQUEST, + &conn->http_request); + conn->proxy = proxy; + grpc_endpoint_read(exec_ctx, conn->client_endpoint, &conn->client_read_buffer, + &conn->on_read_request_done); } // @@ -329,7 +415,7 @@ static void thread_main(void* arg) { do { const gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); const gpr_timespec deadline = - gpr_time_add(now, gpr_time_from_seconds(5, GPR_TIMESPAN)); + gpr_time_add(now, gpr_time_from_seconds(1, GPR_TIMESPAN)); grpc_pollset_worker *worker = NULL; gpr_mu_lock(proxy->mu); GRPC_LOG_IF_ERROR("grpc_pollset_work",