Reviewer comments

reviewable/pr20542/r2
Yash Tibrewal 5 years ago
parent 6bbd3a1c3c
commit 6a32264cdf
  1. 5
      src/core/ext/filters/client_channel/xds/xds_client.cc
  2. 2
      src/core/ext/filters/client_channel/xds/xds_client.h
  3. 5
      src/core/lib/iomgr/combiner.h
  4. 2
      src/core/lib/transport/connectivity_state.cc
  5. 2
      src/core/lib/transport/connectivity_state.h
  6. 131
      test/core/end2end/fixtures/http_proxy_fixture.cc

@ -515,7 +515,7 @@ template <typename T>
void XdsClient::ChannelState::RetryableCall<T>::OnRetryTimer(
void* arg, grpc_error* error) {
RetryableCall* calld = static_cast<RetryableCall*>(arg);
calld->chand_->xds_client()->combiner()->Run(
calld->chand_->xds_client()->combiner_->Run(
GRPC_CLOSURE_INIT(&calld->on_retry_timer_, OnRetryTimerLocked, calld,
nullptr),
GRPC_ERROR_REF(error));
@ -628,7 +628,6 @@ XdsClient::ChannelState::AdsCallState::AdsCallState(
// This callback signals the end of the call, so it relies on the initial
// ref instead of a new ref. When it's invoked, it's the initial ref that is
// unreffed.
GRPC_CLOSURE_INIT(&on_status_received_, OnStatusReceived, this,
grpc_schedule_on_exec_ctx);
call_error = grpc_call_start_batch_and_execute(call_, ops, (size_t)(op - ops),
@ -875,7 +874,7 @@ void XdsClient::ChannelState::LrsCallState::Reporter::
void XdsClient::ChannelState::LrsCallState::Reporter::OnNextReportTimer(
void* arg, grpc_error* error) {
Reporter* self = static_cast<Reporter*>(arg);
self->xds_client()->combiner()->Run(
self->xds_client()->combiner_->Run(
GRPC_CLOSURE_INIT(&self->on_next_report_timer_, OnNextReportTimerLocked,
self, nullptr),
GRPC_ERROR_REF(error));

@ -108,8 +108,6 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
static RefCountedPtr<XdsClient> GetFromChannelArgs(
const grpc_channel_args& args);
Combiner* combiner() { return combiner_; }
private:
class ChannelState;

@ -28,14 +28,17 @@
#include "src/core/lib/iomgr/exec_ctx.h"
namespace grpc_core {
// TODO(yashkt) : Remove this class and replace it with a class that does not
// use ExecCtx
class Combiner {
public:
void Run(grpc_closure* closure, grpc_error* error);
// TODO(yashkt) : Remove this method
void FinallyRun(grpc_closure* closure, grpc_error* error);
Combiner* next_combiner_on_this_exec_ctx = nullptr;
grpc_closure_scheduler scheduler;
grpc_closure_scheduler finally_scheduler;
grpc_core::MultiProducerSingleConsumerQueue queue;
MultiProducerSingleConsumerQueue queue;
// either:
// a pointer to the initiating exec ctx if that is the only exec_ctx that has
// ever queued to this combiner, or NULL. If this is non-null, it's not

@ -58,7 +58,7 @@ const char* ConnectivityStateName(grpc_connectivity_state state) {
class AsyncConnectivityStateWatcherInterface::Notifier {
public:
Notifier(RefCountedPtr<AsyncConnectivityStateWatcherInterface> watcher,
grpc_connectivity_state state, grpc_core::Combiner* combiner)
grpc_connectivity_state state, Combiner* combiner)
: watcher_(std::move(watcher)), state_(state) {
if (combiner != nullptr) {
combiner->Run(

@ -68,6 +68,8 @@ class AsyncConnectivityStateWatcherInterface
protected:
class Notifier;
// If \a combiner is nullptr, then the notification will be scheduled on the
// ExecCtx.
explicit AsyncConnectivityStateWatcherInterface(Combiner* combiner = nullptr)
: combiner_(combiner) {}

@ -89,14 +89,6 @@ typedef struct proxy_connection {
grpc_pollset_set* pollset_set;
grpc_closure on_read_request_done_hopper;
grpc_closure on_server_connect_done_hopper;
grpc_closure on_write_response_done_hopper;
grpc_closure on_client_read_done_hopper;
grpc_closure on_client_write_done_hopper;
grpc_closure on_server_read_done_hopper;
grpc_closure on_server_write_done_hopper;
// NOTE: All the closures execute under proxy->combiner lock. Which means
// there will not be any data-races between the closures
grpc_closure on_read_request_done;
@ -202,6 +194,8 @@ static void proxy_connection_failed(proxy_connection* conn,
static void on_client_write_done_hopper(void* arg, grpc_error* error) {
proxy_connection* conn = static_cast<proxy_connection*>(arg);
GRPC_CLOSURE_INIT(&conn->on_client_write_done, on_client_write_done_locked,
conn, nullptr);
conn->proxy->combiner->Run(&conn->on_client_write_done,
GRPC_ERROR_REF(error));
}
@ -223,22 +217,26 @@ static void on_client_write_done(void* arg, grpc_error* error) {
grpc_slice_buffer_move_into(&conn->client_deferred_write_buffer,
&conn->client_write_buffer);
conn->client_is_writing = true;
GRPC_CLOSURE_INIT(&conn->on_client_write_done, on_client_write_done, conn,
grpc_schedule_on_exec_ctx);
grpc_endpoint_write(conn->client_endpoint, &conn->client_write_buffer,
&conn->on_client_write_done_hopper, nullptr);
&conn->on_client_write_done, nullptr);
} else {
// No more writes. Unref the connection.
proxy_connection_unref(conn, "write_done");
}
}
static void on_server_write_done_hopper(void* arg, grpc_error* error) {
static void on_server_write_done(void* arg, grpc_error* error) {
proxy_connection* conn = static_cast<proxy_connection*>(arg);
GRPC_CLOSURE_INIT(&conn->on_server_write_done, on_server_write_done_locked,
conn, nullptr);
conn->proxy->combiner->Run(&conn->on_server_write_done,
GRPC_ERROR_REF(error));
}
// Callback for writing proxy data to the backend server.
static void on_server_write_done(void* arg, grpc_error* error) {
static void on_server_write_done_locked(void* arg, grpc_error* error) {
proxy_connection* conn = static_cast<proxy_connection*>(arg);
conn->server_is_writing = false;
if (error != GRPC_ERROR_NONE) {
@ -254,22 +252,26 @@ static void on_server_write_done(void* arg, grpc_error* error) {
grpc_slice_buffer_move_into(&conn->server_deferred_write_buffer,
&conn->server_write_buffer);
conn->server_is_writing = true;
GRPC_CLOSURE_INIT(&conn->on_server_write_done, on_server_write_done, conn,
grpc_schedule_on_exec_ctx);
grpc_endpoint_write(conn->server_endpoint, &conn->server_write_buffer,
&conn->on_server_write_done_hopper, nullptr);
&conn->on_server_write_done, nullptr);
} else {
// No more writes. Unref the connection.
proxy_connection_unref(conn, "server_write");
}
}
static void on_client_read_done_hopper(void* arg, grpc_error* error) {
static void on_client_read_done(void* arg, grpc_error* error) {
proxy_connection* conn = static_cast<proxy_connection*>(arg);
GRPC_CLOSURE_INIT(&conn->on_client_read_done, on_client_read_done_locked,
conn, nullptr);
conn->proxy->combiner->Run(&conn->on_client_read_done, GRPC_ERROR_REF(error));
}
// Callback for reading data from the client, which will be proxied to
// the backend server.
static void on_client_read_done(void* arg, grpc_error* error) {
static void on_client_read_done_locked(void* arg, grpc_error* error) {
proxy_connection* conn = static_cast<proxy_connection*>(arg);
if (error != GRPC_ERROR_NONE) {
proxy_connection_failed(conn, CLIENT_READ_FAILED, "HTTP proxy client read",
@ -290,22 +292,28 @@ static void on_client_read_done(void* arg, grpc_error* error) {
&conn->server_write_buffer);
proxy_connection_ref(conn, "client_read");
conn->server_is_writing = true;
GRPC_CLOSURE_INIT(&conn->on_server_write_done, on_server_write_done, conn,
grpc_schedule_on_exec_ctx);
grpc_endpoint_write(conn->server_endpoint, &conn->server_write_buffer,
&conn->on_server_write_done_hopper, nullptr);
&conn->on_server_write_done, nullptr);
}
// Read more data.
GRPC_CLOSURE_INIT(&conn->on_client_read_done, on_client_read_done, conn,
grpc_schedule_on_exec_ctx);
grpc_endpoint_read(conn->client_endpoint, &conn->client_read_buffer,
&conn->on_client_read_done_hopper, /*urgent=*/false);
&conn->on_client_read_done, /*urgent=*/false);
}
static void on_server_read_done_hopper(void* arg, grpc_error* error) {
static void on_server_read_done(void* arg, grpc_error* error) {
proxy_connection* conn = static_cast<proxy_connection*>(arg);
GRPC_CLOSURE_INIT(&conn->on_server_read_done, on_server_read_done_locked,
conn, nullptr);
conn->proxy->combiner->Run(&conn->on_server_read_done, GRPC_ERROR_REF(error));
}
// Callback for reading data from the backend server, which will be
// proxied to the client.
static void on_server_read_done(void* arg, grpc_error* error) {
static void on_server_read_done_locked(void* arg, grpc_error* error) {
proxy_connection* conn = static_cast<proxy_connection*>(arg);
if (error != GRPC_ERROR_NONE) {
proxy_connection_failed(conn, SERVER_READ_FAILED, "HTTP proxy server read",
@ -326,22 +334,28 @@ static void on_server_read_done(void* arg, grpc_error* error) {
&conn->client_write_buffer);
proxy_connection_ref(conn, "server_read");
conn->client_is_writing = true;
GRPC_CLOSURE_INIT(&conn->on_client_write_done, on_client_write_done, conn,
grpc_schedule_on_exec_ctx);
grpc_endpoint_write(conn->client_endpoint, &conn->client_write_buffer,
&conn->on_client_write_done_hopper, nullptr);
&conn->on_client_write_done, nullptr);
}
// Read more data.
GRPC_CLOSURE_INIT(&conn->on_server_read_done, on_server_read_done, conn,
grpc_schedule_on_exec_ctx);
grpc_endpoint_read(conn->server_endpoint, &conn->server_read_buffer,
&conn->on_server_read_done_hopper, /*urgent=*/false);
&conn->on_server_read_done, /*urgent=*/false);
}
static void on_write_response_done_hopper(void* arg, grpc_error* error) {
static void on_write_response_done(void* arg, grpc_error* error) {
proxy_connection* conn = static_cast<proxy_connection*>(arg);
GRPC_CLOSURE_INIT(&conn->on_write_response_done,
on_write_response_done_locked, conn, nullptr);
conn->proxy->combiner->Run(&conn->on_write_response_done,
GRPC_ERROR_REF(error));
}
// Callback to write the HTTP response for the CONNECT request.
static void on_write_response_done(void* arg, grpc_error* error) {
static void on_write_response_done_locked(void* arg, grpc_error* error) {
proxy_connection* conn = static_cast<proxy_connection*>(arg);
conn->client_is_writing = false;
if (error != GRPC_ERROR_NONE) {
@ -357,21 +371,27 @@ static void on_write_response_done(void* arg, grpc_error* error) {
proxy_connection_ref(conn, "client_read");
proxy_connection_ref(conn, "server_read");
proxy_connection_unref(conn, "write_response");
GRPC_CLOSURE_INIT(&conn->on_client_read_done, on_client_read_done, conn,
grpc_schedule_on_exec_ctx);
grpc_endpoint_read(conn->client_endpoint, &conn->client_read_buffer,
&conn->on_client_read_done_hopper, /*urgent=*/false);
&conn->on_client_read_done, /*urgent=*/false);
GRPC_CLOSURE_INIT(&conn->on_server_read_done, on_server_read_done, conn,
grpc_schedule_on_exec_ctx);
grpc_endpoint_read(conn->server_endpoint, &conn->server_read_buffer,
&conn->on_server_read_done_hopper, /*urgent=*/false);
&conn->on_server_read_done, /*urgent=*/false);
}
static void on_server_connect_done_hopper(void* arg, grpc_error* error) {
static void on_server_connect_done(void* arg, grpc_error* error) {
proxy_connection* conn = static_cast<proxy_connection*>(arg);
GRPC_CLOSURE_INIT(&conn->on_server_connect_done,
on_server_connect_done_locked, conn, nullptr);
conn->proxy->combiner->Run(&conn->on_server_connect_done,
GRPC_ERROR_REF(error));
}
// Callback to connect to the backend server specified by the HTTP
// CONNECT request.
static void on_server_connect_done(void* arg, grpc_error* error) {
static void on_server_connect_done_locked(void* arg, grpc_error* error) {
proxy_connection* conn = static_cast<proxy_connection*>(arg);
if (error != GRPC_ERROR_NONE) {
// TODO(roth): Technically, in this case, we should handle the error
@ -390,8 +410,10 @@ static void on_server_connect_done(void* arg, grpc_error* error) {
grpc_slice_from_copied_string("HTTP/1.0 200 connected\r\n\r\n");
grpc_slice_buffer_add(&conn->client_write_buffer, slice);
conn->client_is_writing = true;
GRPC_CLOSURE_INIT(&conn->on_write_response_done, on_write_response_done, conn,
grpc_schedule_on_exec_ctx);
grpc_endpoint_write(conn->client_endpoint, &conn->client_write_buffer,
&conn->on_write_response_done_hopper, nullptr);
&conn->on_write_response_done, nullptr);
}
/**
@ -414,8 +436,10 @@ static bool proxy_auth_header_matches(char* proxy_auth_header_val,
return header_matches;
}
static void on_read_request_done_hopper(void* arg, grpc_error* error) {
static void on_read_request_done(void* arg, grpc_error* error) {
proxy_connection* conn = static_cast<proxy_connection*>(arg);
GRPC_CLOSURE_INIT(&conn->on_read_request_done, on_read_request_done_locked,
conn, nullptr);
conn->proxy->combiner->Run(&conn->on_read_request_done,
GRPC_ERROR_REF(error));
}
@ -425,7 +449,7 @@ static void on_read_request_done_hopper(void* arg, grpc_error* error) {
// the client indicating that the request failed. However, for the purposes
// of this test code, it's fine to pretend this is a client-side error,
// which will cause the client connection to be dropped.
static void on_read_request_done(void* arg, grpc_error* error) {
static void on_read_request_done_locked(void* arg, grpc_error* error) {
proxy_connection* conn = static_cast<proxy_connection*>(arg);
gpr_log(GPR_DEBUG, "on_read_request_done: %p %s", conn,
grpc_error_string(error));
@ -450,8 +474,10 @@ static void on_read_request_done(void* arg, grpc_error* error) {
grpc_slice_buffer_reset_and_unref(&conn->client_read_buffer);
// If we're not done reading the request, read more data.
if (conn->http_parser.state != GRPC_HTTP_BODY) {
GRPC_CLOSURE_INIT(&conn->on_read_request_done, on_read_request_done, conn,
grpc_schedule_on_exec_ctx);
grpc_endpoint_read(conn->client_endpoint, &conn->client_read_buffer,
&conn->on_read_request_done_hopper, /*urgent=*/false);
&conn->on_read_request_done, /*urgent=*/false);
return;
}
// Make sure we got a CONNECT request.
@ -503,8 +529,10 @@ static void on_read_request_done(void* arg, grpc_error* error) {
// The connection callback inherits our reference to conn.
const grpc_millis deadline =
grpc_core::ExecCtx::Get()->Now() + 10 * GPR_MS_PER_SEC;
grpc_tcp_client_connect(&conn->on_server_connect_done_hopper,
&conn->server_endpoint, conn->pollset_set, nullptr,
GRPC_CLOSURE_INIT(&conn->on_server_connect_done, on_server_connect_done, conn,
grpc_schedule_on_exec_ctx);
grpc_tcp_client_connect(&conn->on_server_connect_done, &conn->server_endpoint,
conn->pollset_set, nullptr,
&resolved_addresses->addrs[0], deadline);
grpc_resolved_addresses_destroy(resolved_addresses);
}
@ -524,41 +552,6 @@ static void on_accept(void* arg, grpc_endpoint* endpoint,
conn->pollset_set = grpc_pollset_set_create();
grpc_pollset_set_add_pollset(conn->pollset_set, proxy->pollset);
grpc_endpoint_add_to_pollset_set(endpoint, conn->pollset_set);
GRPC_CLOSURE_INIT(&conn->on_read_request_done, on_read_request_done, conn,
nullptr);
GRPC_CLOSURE_INIT(&conn->on_server_connect_done, on_server_connect_done, conn,
nullptr);
GRPC_CLOSURE_INIT(&conn->on_write_response_done, on_write_response_done, conn,
nullptr);
GRPC_CLOSURE_INIT(&conn->on_client_read_done, on_client_read_done, conn,
nullptr);
GRPC_CLOSURE_INIT(&conn->on_client_write_done, on_client_write_done, conn,
nullptr);
GRPC_CLOSURE_INIT(&conn->on_server_read_done, on_server_read_done, conn,
nullptr);
GRPC_CLOSURE_INIT(&conn->on_server_write_done, on_server_write_done, conn,
nullptr);
GRPC_CLOSURE_INIT(&conn->on_read_request_done_hopper,
on_read_request_done_hopper, conn,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&conn->on_server_connect_done_hopper,
on_server_connect_done_hopper, conn,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&conn->on_write_response_done_hopper,
on_write_response_done_hopper, conn,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&conn->on_client_read_done_hopper,
on_client_read_done_hopper, conn,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&conn->on_client_write_done_hopper,
on_client_write_done_hopper, conn,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&conn->on_server_read_done_hopper,
on_server_read_done_hopper, conn,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&conn->on_server_write_done_hopper,
on_server_write_done_hopper, conn,
grpc_schedule_on_exec_ctx);
grpc_slice_buffer_init(&conn->client_read_buffer);
grpc_slice_buffer_init(&conn->client_deferred_write_buffer);
conn->client_is_writing = false;
@ -569,8 +562,10 @@ static void on_accept(void* arg, grpc_endpoint* endpoint,
grpc_slice_buffer_init(&conn->server_write_buffer);
grpc_http_parser_init(&conn->http_parser, GRPC_HTTP_REQUEST,
&conn->http_request);
GRPC_CLOSURE_INIT(&conn->on_read_request_done, on_read_request_done, conn,
grpc_schedule_on_exec_ctx);
grpc_endpoint_read(conn->client_endpoint, &conn->client_read_buffer,
&conn->on_read_request_done_hopper, /*urgent=*/false);
&conn->on_read_request_done, /*urgent=*/false);
}
//

Loading…
Cancel
Save