|
|
|
@ -41,7 +41,7 @@ |
|
|
|
|
#include <grpc/support/slice_buffer.h> |
|
|
|
|
#include <grpc/support/string_util.h> |
|
|
|
|
#include <grpc/support/sync.h> |
|
|
|
|
//#include <grpc/support/thd.h>
|
|
|
|
|
#include <grpc/support/thd.h> |
|
|
|
|
#include <grpc/support/useful.h> |
|
|
|
|
|
|
|
|
|
#include "src/core/lib/channel/channel_args.h" |
|
|
|
@ -58,6 +58,16 @@ |
|
|
|
|
#include "src/core/lib/iomgr/tcp_server.h" |
|
|
|
|
#include "test/core/util/port.h" |
|
|
|
|
|
|
|
|
|
struct grpc_end2end_http_proxy { |
|
|
|
|
char* proxy_name; |
|
|
|
|
gpr_thd_id thd; |
|
|
|
|
grpc_tcp_server* server; |
|
|
|
|
grpc_channel_args* channel_args; |
|
|
|
|
gpr_mu* mu; |
|
|
|
|
grpc_pollset* pollset; |
|
|
|
|
bool shutdown; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
//
|
|
|
|
|
// Connection handling
|
|
|
|
|
//
|
|
|
|
@ -83,10 +93,16 @@ typedef struct connection_data { |
|
|
|
|
|
|
|
|
|
grpc_http_parser http_parser; |
|
|
|
|
grpc_http_request http_request; |
|
|
|
|
|
|
|
|
|
grpc_end2end_http_proxy* proxy; |
|
|
|
|
|
|
|
|
|
gpr_refcount refcount; |
|
|
|
|
} connection_data; |
|
|
|
|
|
|
|
|
|
static void connection_data_destroy(grpc_exec_ctx* exec_ctx, |
|
|
|
|
connection_data* cd) { |
|
|
|
|
gpr_log(GPR_ERROR, "==> %s()", __func__); |
|
|
|
|
cd->proxy->shutdown = true; |
|
|
|
|
grpc_endpoint_destroy(exec_ctx, cd->client_endpoint); |
|
|
|
|
if (cd->server_endpoint != NULL) |
|
|
|
|
grpc_endpoint_destroy(exec_ctx, cd->server_endpoint); |
|
|
|
@ -103,62 +119,96 @@ static void connection_data_destroy(grpc_exec_ctx* exec_ctx, |
|
|
|
|
static void connection_data_failed(grpc_exec_ctx* exec_ctx, |
|
|
|
|
connection_data* cd, const char* prefix, |
|
|
|
|
grpc_error* error) { |
|
|
|
|
gpr_log(GPR_ERROR, "==> %s()", __func__); |
|
|
|
|
const char* msg = grpc_error_string(error); |
|
|
|
|
gpr_log(GPR_ERROR, "%s: %s", prefix, msg); |
|
|
|
|
grpc_error_free_string(msg); |
|
|
|
|
GRPC_ERROR_UNREF(error); |
|
|
|
|
gpr_log(GPR_ERROR, "HERE 0"); |
|
|
|
|
grpc_endpoint_shutdown(exec_ctx, cd->client_endpoint); |
|
|
|
|
gpr_log(GPR_ERROR, "HERE 1"); |
|
|
|
|
if (cd->server_endpoint != NULL) |
|
|
|
|
grpc_endpoint_shutdown(exec_ctx, cd->server_endpoint); |
|
|
|
|
connection_data_destroy(exec_ctx, cd); |
|
|
|
|
gpr_log(GPR_ERROR, "HERE 2"); |
|
|
|
|
if (gpr_unref(&cd->refcount)) { |
|
|
|
|
gpr_log(GPR_ERROR, "HERE 2.5"); |
|
|
|
|
connection_data_destroy(exec_ctx, cd); |
|
|
|
|
} |
|
|
|
|
gpr_log(GPR_ERROR, "HERE 3"); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void on_client_write_done(grpc_exec_ctx* exec_ctx, void* arg, |
|
|
|
|
grpc_error* error) { |
|
|
|
|
gpr_log(GPR_ERROR, "==> %s()", __func__); |
|
|
|
|
connection_data* cd = arg; |
|
|
|
|
if (error != GRPC_ERROR_NONE) |
|
|
|
|
if (error != GRPC_ERROR_NONE) { |
|
|
|
|
connection_data_failed(exec_ctx, cd, "HTTP proxy client write", error); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
// Clear write buffer.
|
|
|
|
|
gpr_slice_buffer_reset_and_unref(&cd->client_write_buffer); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void on_server_write_done(grpc_exec_ctx* exec_ctx, void* arg, |
|
|
|
|
grpc_error* error) { |
|
|
|
|
gpr_log(GPR_ERROR, "==> %s()", __func__); |
|
|
|
|
connection_data* cd = arg; |
|
|
|
|
if (error != GRPC_ERROR_NONE) |
|
|
|
|
if (error != GRPC_ERROR_NONE) { |
|
|
|
|
connection_data_failed(exec_ctx, cd, "HTTP proxy server write", error); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
// Clear write buffer.
|
|
|
|
|
gpr_slice_buffer_reset_and_unref(&cd->server_write_buffer); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void on_client_read_done(grpc_exec_ctx* exec_ctx, void* arg, |
|
|
|
|
grpc_error* error) { |
|
|
|
|
gpr_log(GPR_ERROR, "==> %s()", __func__); |
|
|
|
|
connection_data* cd = arg; |
|
|
|
|
if (error != GRPC_ERROR_NONE) { |
|
|
|
|
connection_data_failed(exec_ctx, cd, "HTTP proxy client read", error); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
// Move read data into write buffer and write it.
|
|
|
|
|
gpr_slice_buffer_move_into(&cd->client_read_buffer, &cd->server_write_buffer); |
|
|
|
|
grpc_endpoint_write(exec_ctx, cd->server_endpoint, &cd->server_write_buffer, |
|
|
|
|
&cd->on_server_write_done); |
|
|
|
|
// Read more data.
|
|
|
|
|
grpc_endpoint_read(exec_ctx, cd->client_endpoint, &cd->client_read_buffer, |
|
|
|
|
&cd->on_client_read_done); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void on_server_read_done(grpc_exec_ctx* exec_ctx, void* arg, |
|
|
|
|
grpc_error* error) { |
|
|
|
|
gpr_log(GPR_ERROR, "==> %s()", __func__); |
|
|
|
|
connection_data* cd = arg; |
|
|
|
|
if (error != GRPC_ERROR_NONE) { |
|
|
|
|
connection_data_failed(exec_ctx, cd, "HTTP proxy server read", error); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
// Move read data into write buffer and write it.
|
|
|
|
|
gpr_slice_buffer_move_into(&cd->server_read_buffer, &cd->client_write_buffer); |
|
|
|
|
grpc_endpoint_write(exec_ctx, cd->client_endpoint, &cd->client_write_buffer, |
|
|
|
|
&cd->on_client_write_done); |
|
|
|
|
// Read more data.
|
|
|
|
|
grpc_endpoint_read(exec_ctx, cd->server_endpoint, &cd->server_read_buffer, |
|
|
|
|
&cd->on_server_read_done); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void on_write_response_done(grpc_exec_ctx* exec_ctx, void* arg, |
|
|
|
|
grpc_error* error) { |
|
|
|
|
gpr_log(GPR_ERROR, "==> %s()", __func__); |
|
|
|
|
connection_data* cd = arg; |
|
|
|
|
if (error != GRPC_ERROR_NONE) { |
|
|
|
|
connection_data_failed(exec_ctx, cd, "HTTP proxy write response", error); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
// Set up proxying.
|
|
|
|
|
// Clear write buffer.
|
|
|
|
|
gpr_slice_buffer_reset_and_unref(&cd->client_write_buffer); |
|
|
|
|
// Start reading from both client and server.
|
|
|
|
|
// We increase the refcount by one, since we already held one reference
|
|
|
|
|
// for ourselves, and there will now be two pending callbacks.
|
|
|
|
|
gpr_ref(&cd->refcount); |
|
|
|
|
grpc_endpoint_read(exec_ctx, cd->client_endpoint, &cd->client_read_buffer, |
|
|
|
|
&cd->on_client_read_done); |
|
|
|
|
grpc_endpoint_read(exec_ctx, cd->server_endpoint, &cd->server_read_buffer, |
|
|
|
@ -167,6 +217,7 @@ static void on_write_response_done(grpc_exec_ctx* exec_ctx, void* arg, |
|
|
|
|
|
|
|
|
|
static void on_server_connect_done(grpc_exec_ctx* exec_ctx, void* arg, |
|
|
|
|
grpc_error* error) { |
|
|
|
|
gpr_log(GPR_ERROR, "==> %s()", __func__); |
|
|
|
|
connection_data* cd = arg; |
|
|
|
|
if (error != GRPC_ERROR_NONE) { |
|
|
|
|
connection_data_failed(exec_ctx, cd, "HTTP proxy server connect", error); |
|
|
|
@ -174,8 +225,7 @@ static void on_server_connect_done(grpc_exec_ctx* exec_ctx, void* arg, |
|
|
|
|
} |
|
|
|
|
// We've established a connection, so send back a 200 response code to
|
|
|
|
|
// the client.
|
|
|
|
|
gpr_slice slice = gpr_slice_from_copied_string("200 connected\r\n"); |
|
|
|
|
gpr_slice_buffer_reset_and_unref(&cd->client_write_buffer); |
|
|
|
|
gpr_slice slice = gpr_slice_from_copied_string("200 connected\r\n\r\n"); |
|
|
|
|
gpr_slice_buffer_add(&cd->client_write_buffer, slice); |
|
|
|
|
grpc_endpoint_write(exec_ctx, cd->client_endpoint, &cd->client_write_buffer, |
|
|
|
|
&cd->on_write_response_done); |
|
|
|
@ -183,6 +233,7 @@ static void on_server_connect_done(grpc_exec_ctx* exec_ctx, void* arg, |
|
|
|
|
|
|
|
|
|
static void on_read_request_done(grpc_exec_ctx* exec_ctx, void* arg, |
|
|
|
|
grpc_error* error) { |
|
|
|
|
gpr_log(GPR_ERROR, "==> %s()", __func__); |
|
|
|
|
connection_data* cd = arg; |
|
|
|
|
if (error != GRPC_ERROR_NONE) { |
|
|
|
|
connection_data_failed(exec_ctx, cd, "HTTP proxy read request", error); |
|
|
|
@ -240,12 +291,14 @@ static void on_accept(grpc_exec_ctx* exec_ctx, void* arg, |
|
|
|
|
grpc_endpoint* ep, grpc_pollset* accepting_pollset, |
|
|
|
|
grpc_tcp_server_acceptor* acceptor) { |
|
|
|
|
// FIXME: remove
|
|
|
|
|
gpr_log(GPR_ERROR, "==> on_accept()"); |
|
|
|
|
gpr_log(GPR_ERROR, "==> %s()", __func__); |
|
|
|
|
grpc_end2end_http_proxy* proxy = arg; |
|
|
|
|
// Instantiate connection_data.
|
|
|
|
|
connection_data* cd = gpr_malloc(sizeof(*cd)); |
|
|
|
|
memset(cd, 0, sizeof(*cd)); |
|
|
|
|
cd->client_endpoint = ep; |
|
|
|
|
cd->pollset_set = grpc_pollset_set_create(); |
|
|
|
|
grpc_pollset_set_add_pollset(exec_ctx, cd->pollset_set, proxy->pollset); |
|
|
|
|
grpc_closure_init(&cd->on_read_request_done, on_read_request_done, cd); |
|
|
|
|
grpc_closure_init(&cd->on_server_connect_done, on_server_connect_done, cd); |
|
|
|
|
grpc_closure_init(&cd->on_write_response_done, on_write_response_done, cd); |
|
|
|
@ -259,6 +312,8 @@ gpr_log(GPR_ERROR, "==> on_accept()"); |
|
|
|
|
gpr_slice_buffer_init(&cd->server_write_buffer); |
|
|
|
|
grpc_http_parser_init(&cd->http_parser, GRPC_HTTP_REQUEST, |
|
|
|
|
&cd->http_request); |
|
|
|
|
cd->proxy = proxy; |
|
|
|
|
gpr_ref_init(&cd->refcount, 1); |
|
|
|
|
grpc_endpoint_read(exec_ctx, cd->client_endpoint, &cd->client_read_buffer, |
|
|
|
|
&cd->on_read_request_done); |
|
|
|
|
} |
|
|
|
@ -267,25 +322,6 @@ gpr_log(GPR_ERROR, "==> on_accept()"); |
|
|
|
|
// Proxy class
|
|
|
|
|
//
|
|
|
|
|
|
|
|
|
|
struct grpc_end2end_http_proxy { |
|
|
|
|
char* proxy_name; |
|
|
|
|
// gpr_thd_id thd;
|
|
|
|
|
grpc_tcp_server* server; |
|
|
|
|
grpc_channel_args* channel_args; |
|
|
|
|
gpr_mu* mu; |
|
|
|
|
grpc_pollset* pollset; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
#if 0 |
|
|
|
|
static void thread_main(void *arg) { |
|
|
|
|
//grpc_end2end_http_proxy *proxy = arg;
|
|
|
|
|
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; |
|
|
|
|
while (true) { |
|
|
|
|
grpc_exec_ctx_flush(&exec_ctx); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
#endif |
|
|
|
|
|
|
|
|
|
grpc_end2end_http_proxy* grpc_end2end_http_proxy_create() { |
|
|
|
|
grpc_end2end_http_proxy* proxy = gpr_malloc(sizeof(*proxy)); |
|
|
|
|
memset(proxy, 0, sizeof(*proxy)); |
|
|
|
@ -315,7 +351,7 @@ gpr_log(GPR_ERROR, "Proxy address: %s", proxy->proxy_name); |
|
|
|
|
grpc_pollset_init(proxy->pollset, &proxy->mu); |
|
|
|
|
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; |
|
|
|
|
grpc_tcp_server_start(&exec_ctx, proxy->server, &proxy->pollset, 1, |
|
|
|
|
on_accept, NULL); |
|
|
|
|
on_accept, proxy); |
|
|
|
|
grpc_exec_ctx_finish(&exec_ctx); |
|
|
|
|
#if 0 |
|
|
|
|
// Start proxy thread.
|
|
|
|
@ -331,15 +367,21 @@ static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, |
|
|
|
|
grpc_pollset_destroy(p); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// FIXME: remove (including all references below)
|
|
|
|
|
//#define USE_THREAD 1
|
|
|
|
|
|
|
|
|
|
void grpc_end2end_http_proxy_destroy(grpc_end2end_http_proxy* proxy) { |
|
|
|
|
gpr_log(GPR_ERROR, "==> %s()", __func__); |
|
|
|
|
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; |
|
|
|
|
grpc_tcp_server_shutdown_listeners(&exec_ctx, proxy->server); |
|
|
|
|
grpc_tcp_server_unref(&exec_ctx, proxy->server); |
|
|
|
|
// gpr_thd_join(proxy->thd);
|
|
|
|
|
#ifdef USE_THREAD |
|
|
|
|
gpr_thd_join(proxy->thd); |
|
|
|
|
#endif |
|
|
|
|
gpr_free(proxy->proxy_name); |
|
|
|
|
grpc_channel_args_destroy(proxy->channel_args); |
|
|
|
|
grpc_closure destroyed; |
|
|
|
|
grpc_closure_init(&destroyed, destroy_pollset, &proxy->pollset); |
|
|
|
|
grpc_closure_init(&destroyed, destroy_pollset, proxy->pollset); |
|
|
|
|
grpc_pollset_shutdown(&exec_ctx, proxy->pollset, &destroyed); |
|
|
|
|
gpr_free(proxy); |
|
|
|
|
grpc_exec_ctx_finish(&exec_ctx); |
|
|
|
@ -349,3 +391,39 @@ const char *grpc_end2end_http_proxy_get_proxy_name( |
|
|
|
|
grpc_end2end_http_proxy *proxy) { |
|
|
|
|
return proxy->proxy_name; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void thread_main(void* arg) { |
|
|
|
|
gpr_log(GPR_ERROR, "==> %s()", __func__); |
|
|
|
|
grpc_end2end_http_proxy *proxy = arg; |
|
|
|
|
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; |
|
|
|
|
do { |
|
|
|
|
gpr_log(GPR_ERROR, "HERE a"); |
|
|
|
|
const gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); |
|
|
|
|
const gpr_timespec deadline = |
|
|
|
|
gpr_time_add(now, gpr_time_from_seconds(5, GPR_TIMESPAN)); |
|
|
|
|
grpc_pollset_worker *worker = NULL; |
|
|
|
|
gpr_log(GPR_ERROR, "HERE b"); |
|
|
|
|
gpr_mu_lock(proxy->mu); |
|
|
|
|
gpr_log(GPR_ERROR, "HERE c"); |
|
|
|
|
GRPC_LOG_IF_ERROR("grpc_pollset_work", |
|
|
|
|
grpc_pollset_work(&exec_ctx, proxy->pollset, &worker, |
|
|
|
|
now, deadline)); |
|
|
|
|
gpr_log(GPR_ERROR, "HERE d"); |
|
|
|
|
gpr_mu_unlock(proxy->mu); |
|
|
|
|
gpr_log(GPR_ERROR, "HERE e"); |
|
|
|
|
grpc_exec_ctx_flush(&exec_ctx); |
|
|
|
|
gpr_log(GPR_ERROR, "HERE f"); |
|
|
|
|
} while (!proxy->shutdown); |
|
|
|
|
gpr_log(GPR_ERROR, "HERE g"); |
|
|
|
|
grpc_exec_ctx_finish(&exec_ctx); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void grpc_end2end_http_proxy_start_thread(grpc_end2end_http_proxy *proxy) { |
|
|
|
|
#ifdef USE_THREAD |
|
|
|
|
gpr_thd_options opt = gpr_thd_options_default(); |
|
|
|
|
gpr_thd_options_set_joinable(&opt); |
|
|
|
|
GPR_ASSERT(gpr_thd_new(&proxy->thd, thread_main, proxy, &opt)); |
|
|
|
|
#else |
|
|
|
|
thread_main(proxy); |
|
|
|
|
#endif |
|
|
|
|
} |
|
|
|
|