implemented get_local_address accessor for grpc_endpoint

pull/23489/head
Michael Wang 4 years ago
parent 70f001565b
commit c6586f087f
  1. 5
      src/core/ext/filters/client_channel/http_connect_handshaker.cc
  2. 41
      src/core/ext/transport/chttp2/transport/chttp2_transport.cc
  3. 2
      src/core/ext/transport/chttp2/transport/flow_control.cc
  4. 4
      src/core/ext/transport/chttp2/transport/frame_settings.cc
  5. 2
      src/core/ext/transport/chttp2/transport/internal.h
  6. 11
      src/core/ext/transport/chttp2/transport/writing.cc
  7. 6
      src/core/lib/iomgr/endpoint.cc
  8. 9
      src/core/lib/iomgr/endpoint.h
  9. 43
      src/core/lib/iomgr/endpoint_cfstream.cc
  10. 48
      src/core/lib/iomgr/tcp_custom.cc
  11. 44
      src/core/lib/iomgr/tcp_posix.cc
  12. 36
      src/core/lib/iomgr/tcp_windows.cc
  13. 8
      src/core/lib/security/transport/secure_endpoint.cc
  14. 9
      test/core/util/mock_endpoint.cc
  15. 14
      test/core/util/passthru_endpoint.cc
  16. 30
      test/core/util/reconnect_server.cc
  17. 2
      test/core/util/reconnect_server.h
  18. 8
      test/core/util/trickle_endpoint.cc
  19. 5
      test/cpp/end2end/port_sharing_end2end_test.cc
  20. 6
      test/cpp/microbenchmarks/bm_chttp2_transport.cc

@ -325,10 +325,9 @@ void HttpConnectHandshaker::DoHandshake(grpc_tcp_server_acceptor* /*acceptor*/,
args_ = args;
on_handshake_done_ = on_handshake_done;
// Log connection via proxy.
char* proxy_name = grpc_endpoint_get_peer(args->endpoint);
std::string proxy_name(grpc_endpoint_get_peer(args->endpoint));
gpr_log(GPR_INFO, "Connecting to server %s via HTTP proxy %s", server_name,
proxy_name);
gpr_free(proxy_name);
proxy_name.c_str());
// Construct HTTP CONNECT request.
grpc_httpcli_request request;
request.host = server_name;

@ -226,7 +226,6 @@ grpc_chttp2_transport::~grpc_chttp2_transport() {
GRPC_ERROR_UNREF(closed_with_error);
gpr_free(ping_acks);
gpr_free(peer_string);
}
static const grpc_transport_vtable* get_vtable(void);
@ -378,11 +377,9 @@ static bool read_channel_args(grpc_chttp2_transport* t,
}
}
if (channelz_enabled) {
// TODO(ncteisen): add an API to endpoint to query for local addr, and pass
// it in here, so SocketNode knows its own address.
t->channelz_socket =
grpc_core::MakeRefCounted<grpc_core::channelz::SocketNode>(
"", t->peer_string,
std::string(grpc_endpoint_get_local_address(t->ep)), t->peer_string,
absl::StrFormat("%s %s", get_vtable()->name, t->peer_string));
}
return enable_bdp;
@ -795,7 +792,7 @@ static void set_write_state(grpc_chttp2_transport* t,
grpc_chttp2_write_state st, const char* reason) {
GRPC_CHTTP2_IF_TRACING(
gpr_log(GPR_INFO, "W:%p %s [%s] state %s -> %s [%s]", t,
t->is_client ? "CLIENT" : "SERVER", t->peer_string,
t->is_client ? "CLIENT" : "SERVER", t->peer_string.c_str(),
write_state_name(t->write_state), write_state_name(st), reason));
t->write_state = st;
// If the state is being reset back to idle, it means a write was just
@ -1084,7 +1081,7 @@ void grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport* t,
// We want to log this irrespective of whether http tracing is enabled if we
// received a GOAWAY with a non NO_ERROR code.
if (goaway_error != GRPC_HTTP2_NO_ERROR) {
gpr_log(GPR_INFO, "%s: Got goaway [%d] err=%s", t->peer_string,
gpr_log(GPR_INFO, "%s: Got goaway [%d] err=%s", t->peer_string.c_str(),
goaway_error, grpc_error_string(t->goaway_error));
}
// When a client receives a GOAWAY with error code ENHANCE_YOUR_CALM and debug
@ -1216,7 +1213,7 @@ void grpc_chttp2_complete_closure_step(grpc_chttp2_transport* t,
"Error in HTTP transport completing operation");
closure->error_data.error = grpc_error_set_str(
closure->error_data.error, GRPC_ERROR_STR_TARGET_ADDRESS,
grpc_slice_from_copied_string(t->peer_string));
grpc_slice_from_copied_string(t->peer_string.c_str()));
}
closure->error_data.error =
grpc_error_add_child(closure->error_data.error, error);
@ -1474,7 +1471,7 @@ static void perform_stream_op_locked(void* stream_op,
}
if (op_payload->send_initial_metadata.peer_string != nullptr) {
gpr_atm_rel_store(op_payload->send_initial_metadata.peer_string,
(gpr_atm)t->peer_string);
(gpr_atm)t->peer_string.c_str());
}
}
@ -1587,7 +1584,7 @@ static void perform_stream_op_locked(void* stream_op,
op_payload->recv_initial_metadata.trailing_metadata_available;
if (op_payload->recv_initial_metadata.peer_string != nullptr) {
gpr_atm_rel_store(op_payload->recv_initial_metadata.peer_string,
(gpr_atm)t->peer_string);
(gpr_atm)t->peer_string.c_str());
}
grpc_chttp2_maybe_complete_recv_initial_metadata(t, s);
}
@ -1755,9 +1752,8 @@ static void retry_initiate_ping_locked(void* tp, grpc_error* error) {
void grpc_chttp2_ack_ping(grpc_chttp2_transport* t, uint64_t id) {
grpc_chttp2_ping_queue* pq = &t->ping_queue;
if (pq->inflight_id != id) {
char* from = grpc_endpoint_get_peer(t->ep);
gpr_log(GPR_DEBUG, "Unknown ping response from %s: %" PRIx64, from, id);
gpr_free(from);
gpr_log(GPR_DEBUG, "Unknown ping response from %s: %" PRIx64,
t->peer_string.c_str(), id);
return;
}
grpc_core::ExecCtx::RunList(DEBUG_LOCATION,
@ -1769,7 +1765,7 @@ void grpc_chttp2_ack_ping(grpc_chttp2_transport* t, uint64_t id) {
static void send_goaway(grpc_chttp2_transport* t, grpc_error* error) {
// We want to log this irrespective of whether http tracing is enabled
gpr_log(GPR_INFO, "%s: Sending goaway err=%s", t->peer_string,
gpr_log(GPR_INFO, "%s: Sending goaway err=%s", t->peer_string.c_str(),
grpc_error_string(error));
t->sent_goaway_state = GRPC_CHTTP2_GOAWAY_SEND_SCHEDULED;
grpc_http2_error_code http_error;
@ -2641,7 +2637,7 @@ static void start_bdp_ping(void* tp, grpc_error* error) {
static void start_bdp_ping_locked(void* tp, grpc_error* error) {
grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
gpr_log(GPR_INFO, "%s: Start BDP ping err=%s", t->peer_string,
gpr_log(GPR_INFO, "%s: Start BDP ping err=%s", t->peer_string.c_str(),
grpc_error_string(error));
}
if (error != GRPC_ERROR_NONE || t->closed_with_error != GRPC_ERROR_NONE) {
@ -2665,7 +2661,7 @@ static void finish_bdp_ping(void* tp, grpc_error* error) {
static void finish_bdp_ping_locked(void* tp, grpc_error* error) {
grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
gpr_log(GPR_INFO, "%s: Complete BDP ping err=%s", t->peer_string,
gpr_log(GPR_INFO, "%s: Complete BDP ping err=%s", t->peer_string.c_str(),
grpc_error_string(error));
}
if (error != GRPC_ERROR_NONE || t->closed_with_error != GRPC_ERROR_NONE) {
@ -2835,7 +2831,7 @@ static void start_keepalive_ping_locked(void* arg, grpc_error* error) {
}
if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace) ||
GRPC_TRACE_FLAG_ENABLED(grpc_keepalive_trace)) {
gpr_log(GPR_INFO, "%s: Start keepalive ping", t->peer_string);
gpr_log(GPR_INFO, "%s: Start keepalive ping", t->peer_string.c_str());
}
GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive watchdog");
GRPC_CLOSURE_INIT(&t->keepalive_watchdog_fired_locked,
@ -2859,7 +2855,7 @@ static void finish_keepalive_ping_locked(void* arg, grpc_error* error) {
if (error == GRPC_ERROR_NONE) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace) ||
GRPC_TRACE_FLAG_ENABLED(grpc_keepalive_trace)) {
gpr_log(GPR_INFO, "%s: Finish keepalive ping", t->peer_string);
gpr_log(GPR_INFO, "%s: Finish keepalive ping", t->peer_string.c_str());
}
if (!t->keepalive_ping_started) {
// start_keepalive_ping_locked has not run yet. Reschedule
@ -2897,7 +2893,7 @@ static void keepalive_watchdog_fired_locked(void* arg, grpc_error* error) {
if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) {
if (error == GRPC_ERROR_NONE) {
gpr_log(GPR_INFO, "%s: Keepalive watchdog fired. Closing transport.",
t->peer_string);
t->peer_string.c_str());
t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING;
close_transport_locked(
t, grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
@ -3205,7 +3201,7 @@ static void benign_reclaimer_locked(void* arg, grpc_error* error) {
// disconnect cleanly
if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
gpr_log(GPR_INFO, "HTTP2: %s - send goaway to free memory",
t->peer_string);
t->peer_string.c_str());
}
send_goaway(t,
grpc_error_set_int(
@ -3216,7 +3212,8 @@ static void benign_reclaimer_locked(void* arg, grpc_error* error) {
gpr_log(GPR_INFO,
"HTTP2: %s - skip benign reclamation, there are still %" PRIdPTR
" streams",
t->peer_string, grpc_chttp2_stream_map_size(&t->stream_map));
t->peer_string.c_str(),
grpc_chttp2_stream_map_size(&t->stream_map));
}
t->benign_reclaimer_registered = false;
if (error != GRPC_ERROR_CANCELLED) {
@ -3241,8 +3238,8 @@ static void destructive_reclaimer_locked(void* arg, grpc_error* error) {
grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(
grpc_chttp2_stream_map_rand(&t->stream_map));
if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
gpr_log(GPR_INFO, "HTTP2: %s - abandon stream id %d", t->peer_string,
s->id);
gpr_log(GPR_INFO, "HTTP2: %s - abandon stream id %d",
t->peer_string.c_str(), s->id);
}
grpc_chttp2_cancel_stream(
t, s,

@ -175,7 +175,7 @@ TransportFlowControl::TransportFlowControl(const grpc_chttp2_transport* t,
bool enable_bdp_probe)
: t_(t),
enable_bdp_probe_(enable_bdp_probe),
bdp_estimator_(t->peer_string),
bdp_estimator_(t->peer_string.c_str()),
pid_controller_(grpc_core::PidController::Args()
.set_gain_p(4)
.set_gain_i(8)

@ -228,8 +228,8 @@ grpc_error* grpc_chttp2_settings_parser_parse(void* p, grpc_chttp2_transport* t,
parser->incoming_settings[id] = parser->value;
if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
gpr_log(GPR_INFO, "CHTTP2:%s:%s: got setting %s = %d",
t->is_client ? "CLI" : "SVR", t->peer_string, sp->name,
parser->value);
t->is_client ? "CLI" : "SVR", t->peer_string.c_str(),
sp->name, parser->value);
}
} else if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
gpr_log(GPR_ERROR, "CHTTP2: Ignoring unknown setting %d (value %d)",

@ -290,7 +290,7 @@ struct grpc_chttp2_transport {
grpc_transport base; /* must be first */
grpc_core::RefCount refs;
grpc_endpoint* ep;
char* peer_string;
std::string peer_string;
grpc_resource_user* resource_user;

@ -58,7 +58,7 @@ static void maybe_initiate_ping(grpc_chttp2_transport* t) {
GRPC_TRACE_FLAG_ENABLED(grpc_bdp_estimator_trace) ||
GRPC_TRACE_FLAG_ENABLED(grpc_keepalive_trace)) {
gpr_log(GPR_INFO, "%s: Ping delayed [%s]: already pinging",
t->is_client ? "CLIENT" : "SERVER", t->peer_string);
t->is_client ? "CLIENT" : "SERVER", t->peer_string.c_str());
}
return;
}
@ -69,7 +69,7 @@ static void maybe_initiate_ping(grpc_chttp2_transport* t) {
GRPC_TRACE_FLAG_ENABLED(grpc_bdp_estimator_trace) ||
GRPC_TRACE_FLAG_ENABLED(grpc_keepalive_trace)) {
gpr_log(GPR_INFO, "%s: Ping delayed [%s]: too many recent pings: %d/%d",
t->is_client ? "CLIENT" : "SERVER", t->peer_string,
t->is_client ? "CLIENT" : "SERVER", t->peer_string.c_str(),
t->ping_state.pings_before_data_required,
t->ping_policy.max_pings_without_data);
}
@ -95,7 +95,7 @@ static void maybe_initiate_ping(grpc_chttp2_transport* t) {
gpr_log(GPR_INFO,
"%s: Ping delayed [%s]: not enough time elapsed since last ping. "
" Last ping %f: Next ping %f: Now %f",
t->is_client ? "CLIENT" : "SERVER", t->peer_string,
t->is_client ? "CLIENT" : "SERVER", t->peer_string.c_str(),
static_cast<double>(t->ping_state.last_ping_sent_time),
static_cast<double>(next_allowed_ping), static_cast<double>(now));
}
@ -125,7 +125,7 @@ static void maybe_initiate_ping(grpc_chttp2_transport* t) {
GRPC_TRACE_FLAG_ENABLED(grpc_bdp_estimator_trace) ||
GRPC_TRACE_FLAG_ENABLED(grpc_keepalive_trace)) {
gpr_log(GPR_INFO, "%s: Ping sent [%s]: %d/%d",
t->is_client ? "CLIENT" : "SERVER", t->peer_string,
t->is_client ? "CLIENT" : "SERVER", t->peer_string.c_str(),
t->ping_state.pings_before_data_required,
t->ping_policy.max_pings_without_data);
}
@ -165,7 +165,8 @@ static void report_stall(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
"helpful data: [fc:pending=%" PRIdPTR ":pending-compressed=%" PRIdPTR
":flowed=%" PRId64 ":peer_initwin=%d:t_win=%" PRId64
":s_win=%d:s_delta=%" PRId64 "]",
t->peer_string, t, s->id, staller, s->flow_controlled_buffer.length,
t->peer_string.c_str(), t, s->id, staller,
s->flow_controlled_buffer.length,
s->stream_compression_method ==
GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS
? 0

@ -52,10 +52,14 @@ void grpc_endpoint_shutdown(grpc_endpoint* ep, grpc_error* why) {
void grpc_endpoint_destroy(grpc_endpoint* ep) { ep->vtable->destroy(ep); }
char* grpc_endpoint_get_peer(grpc_endpoint* ep) {
absl::string_view grpc_endpoint_get_peer(grpc_endpoint* ep) {
return ep->vtable->get_peer(ep);
}
absl::string_view grpc_endpoint_get_local_address(grpc_endpoint* ep) {
return ep->vtable->get_local_address(ep);
}
int grpc_endpoint_get_fd(grpc_endpoint* ep) { return ep->vtable->get_fd(ep); }
grpc_resource_user* grpc_endpoint_get_resource_user(grpc_endpoint* ep) {

@ -21,6 +21,8 @@
#include <grpc/support/port_platform.h>
#include "absl/strings/string_view.h"
#include <grpc/slice.h>
#include <grpc/slice_buffer.h>
#include <grpc/support/time.h>
@ -46,7 +48,8 @@ struct grpc_endpoint_vtable {
void (*shutdown)(grpc_endpoint* ep, grpc_error* why);
void (*destroy)(grpc_endpoint* ep);
grpc_resource_user* (*get_resource_user)(grpc_endpoint* ep);
char* (*get_peer)(grpc_endpoint* ep);
absl::string_view (*get_peer)(grpc_endpoint* ep);
absl::string_view (*get_local_address)(grpc_endpoint* ep);
int (*get_fd)(grpc_endpoint* ep);
bool (*can_track_err)(grpc_endpoint* ep);
};
@ -59,7 +62,9 @@ struct grpc_endpoint_vtable {
void grpc_endpoint_read(grpc_endpoint* ep, grpc_slice_buffer* slices,
grpc_closure* cb, bool urgent);
char* grpc_endpoint_get_peer(grpc_endpoint* ep);
absl::string_view grpc_endpoint_get_peer(grpc_endpoint* ep);
absl::string_view grpc_endpoint_get_local_address(grpc_endpoint* ep);
/* Get the file descriptor used by \a ep. Return -1 if \a ep is not using an fd.
*/

@ -34,6 +34,8 @@
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/error_cfstream.h"
#include "src/core/lib/iomgr/sockaddr.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
@ -55,7 +57,8 @@ struct CFStreamEndpoint {
grpc_closure read_action;
grpc_closure write_action;
char* peer_string;
std::string peer_string;
std::string local_address;
grpc_resource_user* resource_user;
grpc_resource_user_slice_allocator slice_allocator;
};
@ -64,8 +67,7 @@ static void CFStreamFree(CFStreamEndpoint* ep) {
CFRelease(ep->read_stream);
CFRelease(ep->write_stream);
CFSTREAM_HANDLE_UNREF(ep->stream_sync, "free");
gpr_free(ep->peer_string);
gpr_free(ep);
delete ep;
}
#ifndef NDEBUG
@ -110,7 +112,7 @@ static grpc_error* CFStreamAnnotateError(grpc_error* src_error,
grpc_error_set_int(src_error, GRPC_ERROR_INT_GRPC_STATUS,
GRPC_STATUS_UNAVAILABLE),
GRPC_ERROR_STR_TARGET_ADDRESS,
grpc_slice_from_copied_string(ep->peer_string));
grpc_slice_from_copied_string(ep->peer_string.c_str()));
}
static void CallReadCb(CFStreamEndpoint* ep, grpc_error* error) {
@ -124,7 +126,8 @@ static void CallReadCb(CFStreamEndpoint* ep, grpc_error* error) {
for (i = 0; i < ep->read_slices->count; i++) {
char* dump = grpc_dump_slice(ep->read_slices->slices[i],
GPR_DUMP_HEX | GPR_DUMP_ASCII);
gpr_log(GPR_DEBUG, "READ %p (peer=%s): %s", ep, ep->peer_string, dump);
gpr_log(GPR_DEBUG, "READ %p (peer=%s): %s", ep, ep->peer_string.c_str(),
dump);
gpr_free(dump);
}
}
@ -230,7 +233,8 @@ static void WriteAction(void* arg, grpc_error* error) {
if (grpc_tcp_trace.enabled()) {
grpc_slice trace_slice = grpc_slice_sub(slice, 0, write_size);
char* dump = grpc_dump_slice(trace_slice, GPR_DUMP_HEX | GPR_DUMP_ASCII);
gpr_log(GPR_DEBUG, "WRITE %p (peer=%s): %s", ep, ep->peer_string, dump);
gpr_log(GPR_DEBUG, "WRITE %p (peer=%s): %s", ep, ep->peer_string.c_str(),
dump);
gpr_free(dump);
grpc_slice_unref_internal(trace_slice);
}
@ -309,9 +313,14 @@ grpc_resource_user* CFStreamGetResourceUser(grpc_endpoint* ep) {
return ep_impl->resource_user;
}
char* CFStreamGetPeer(grpc_endpoint* ep) {
absl::string_view CFStreamGetPeer(grpc_endpoint* ep) {
CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep);
return gpr_strdup(ep_impl->peer_string);
return ep_impl->peer_string;
}
absl::string_view CFStreamGetLocalAddress(grpc_endpoint* ep) {
CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep);
return ep_impl->local_address;
}
int CFStreamGetFD(grpc_endpoint* ep) { return 0; }
@ -332,6 +341,7 @@ static const grpc_endpoint_vtable vtable = {CFStreamRead,
CFStreamDestroy,
CFStreamGetResourceUser,
CFStreamGetPeer,
CFStreamGetLocalAddress,
CFStreamGetFD,
CFStreamCanTrackErr};
@ -339,8 +349,7 @@ grpc_endpoint* grpc_cfstream_endpoint_create(
CFReadStreamRef read_stream, CFWriteStreamRef write_stream,
const char* peer_string, grpc_resource_quota* resource_quota,
CFStreamHandle* stream_sync) {
CFStreamEndpoint* ep_impl =
static_cast<CFStreamEndpoint*>(gpr_malloc(sizeof(CFStreamEndpoint)));
CFStreamEndpoint* ep_impl = new CFStreamEndpoint;
if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_DEBUG,
"CFStream endpoint:%p create readStream:%p writeStream: %p",
@ -355,7 +364,19 @@ grpc_endpoint* grpc_cfstream_endpoint_create(
ep_impl->stream_sync = stream_sync;
CFSTREAM_HANDLE_REF(ep_impl->stream_sync, "endpoint create");
ep_impl->peer_string = gpr_strdup(peer_string);
ep_impl->peer_string = peer_string;
const int* native_handle =
reinterpret_cast<const int*>(CFReadStreamCopyProperty(
ep_impl->read_stream, kCFStreamPropertySocketNativeHandle));
grpc_resolved_address resolved_local_addr;
resolved_local_addr.len = sizeof(resolved_local_addr.addr);
if (getsockname(*native_handle,
reinterpret_cast<sockaddr*>(resolved_local_addr.addr),
&resolved_local_addr.len) < 0) {
ep_impl->local_address = "";
} else {
ep_impl->local_address = grpc_sockaddr_to_uri(&resolved_local_addr);
}
ep_impl->read_cb = nil;
ep_impl->write_cb = nil;
ep_impl->read_slices = nil;

@ -32,6 +32,7 @@
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/iomgr_custom.h"
#include "src/core/lib/iomgr/resource_quota.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/iomgr/tcp_client.h"
#include "src/core/lib/iomgr/tcp_custom.h"
#include "src/core/lib/iomgr/tcp_server.h"
@ -57,24 +58,24 @@ struct custom_tcp_endpoint {
gpr_refcount refcount;
grpc_custom_socket* socket;
grpc_closure* read_cb;
grpc_closure* write_cb;
grpc_closure* read_cb = nullptr;
grpc_closure* write_cb = nullptr;
grpc_slice_buffer* read_slices;
grpc_slice_buffer* write_slices;
grpc_slice_buffer* read_slices = nullptr;
grpc_slice_buffer* write_slices = nullptr;
grpc_resource_user* resource_user;
grpc_resource_user_slice_allocator slice_allocator;
bool shutting_down;
char* peer_string;
std::string peer_string;
std::string local_address;
};
static void tcp_free(grpc_custom_socket* s) {
custom_tcp_endpoint* tcp = (custom_tcp_endpoint*)s->endpoint;
grpc_resource_user_unref(tcp->resource_user);
gpr_free(tcp->peer_string);
gpr_free(tcp);
delete tcp;
s->refs--;
if (s->refs == 0) {
grpc_custom_socket_vtable->destroy(s);
@ -132,7 +133,8 @@ static void call_read_cb(custom_tcp_endpoint* tcp, grpc_error* error) {
for (i = 0; i < tcp->read_slices->count; i++) {
char* dump = grpc_dump_slice(tcp->read_slices->slices[i],
GPR_DUMP_HEX | GPR_DUMP_ASCII);
gpr_log(GPR_INFO, "READ %p (peer=%s): %s", tcp, tcp->peer_string, dump);
gpr_log(GPR_INFO, "READ %p (peer=%s): %s", tcp, tcp->peer_string.c_str(),
dump);
gpr_free(dump);
}
}
@ -233,8 +235,8 @@ static void endpoint_write(grpc_endpoint* ep, grpc_slice_buffer* write_slices,
for (j = 0; j < write_slices->count; j++) {
char* data = grpc_dump_slice(write_slices->slices[j],
GPR_DUMP_HEX | GPR_DUMP_ASCII);
gpr_log(GPR_INFO, "WRITE %p (peer=%s): %s", tcp->socket, tcp->peer_string,
data);
gpr_log(GPR_INFO, "WRITE %p (peer=%s): %s", tcp->socket,
tcp->peer_string.c_str(), data);
gpr_free(data);
}
}
@ -317,9 +319,14 @@ static void endpoint_destroy(grpc_endpoint* ep) {
grpc_custom_socket_vtable->close(tcp->socket, custom_close_callback);
}
static char* endpoint_get_peer(grpc_endpoint* ep) {
static absl::string_view endpoint_get_peer(grpc_endpoint* ep) {
custom_tcp_endpoint* tcp = (custom_tcp_endpoint*)ep;
return gpr_strdup(tcp->peer_string);
return tcp->peer_string;
}
static absl::string_view endpoint_get_local_address(grpc_endpoint* ep) {
custom_tcp_endpoint* tcp = (custom_tcp_endpoint*)ep;
return tcp->local_address;
}
static grpc_resource_user* endpoint_get_resource_user(grpc_endpoint* ep) {
@ -340,27 +347,36 @@ static grpc_endpoint_vtable vtable = {endpoint_read,
endpoint_destroy,
endpoint_get_resource_user,
endpoint_get_peer,
endpoint_get_local_address,
endpoint_get_fd,
endpoint_can_track_err};
grpc_endpoint* custom_tcp_endpoint_create(grpc_custom_socket* socket,
grpc_resource_quota* resource_quota,
const char* peer_string) {
custom_tcp_endpoint* tcp =
(custom_tcp_endpoint*)gpr_malloc(sizeof(custom_tcp_endpoint));
custom_tcp_endpoint* tcp = new custom_tcp_endpoint;
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
gpr_log(GPR_INFO, "Creating TCP endpoint %p", socket);
}
memset(tcp, 0, sizeof(custom_tcp_endpoint));
socket->refs++;
socket->endpoint = (grpc_endpoint*)tcp;
tcp->socket = socket;
tcp->base.vtable = &vtable;
gpr_ref_init(&tcp->refcount, 1);
tcp->peer_string = gpr_strdup(peer_string);
tcp->peer_string = peer_string;
grpc_resolved_address resolved_local_addr;
resolved_local_addr.len = sizeof(resolved_local_addr.addr);
if (grpc_custom_socket_vtable->getsockname(
socket, reinterpret_cast<sockaddr*>(resolved_local_addr.addr),
reinterpret_cast<int*>(&resolved_local_addr.len)) !=
GRPC_ERROR_NONE) {
tcp->local_address = "";
} else {
tcp->local_address = grpc_sockaddr_to_uri(&resolved_local_addr);
}
tcp->shutting_down = false;
tcp->resource_user = grpc_resource_user_create(resource_quota, peer_string);
grpc_resource_user_slice_allocator_init(

@ -54,6 +54,7 @@
#include "src/core/lib/iomgr/buffer_list.h"
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/iomgr/socket_utils_posix.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/slice_internal.h"
@ -351,6 +352,8 @@ using grpc_core::TcpZerocopySendRecord;
namespace {
struct grpc_tcp {
grpc_tcp(int max_sends, size_t send_bytes_threshold)
: tcp_zerocopy_send_ctx(max_sends, send_bytes_threshold) {}
grpc_endpoint base;
grpc_fd* em_fd;
int fd;
@ -385,7 +388,8 @@ struct grpc_tcp {
grpc_closure write_done_closure;
grpc_closure error_closure;
char* peer_string;
std::string peer_string;
std::string local_address;
grpc_resource_user* resource_user;
grpc_resource_user_slice_allocator slice_allocator;
@ -605,7 +609,7 @@ static grpc_error* tcp_annotate_error(grpc_error* src_error, grpc_tcp* tcp) {
* choose to retry. */
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE),
GRPC_ERROR_STR_TARGET_ADDRESS,
grpc_slice_from_copied_string(tcp->peer_string));
grpc_slice_from_copied_string(tcp->peer_string.c_str()));
}
static void tcp_handle_read(void* arg /* grpc_tcp */, grpc_error* error);
@ -623,7 +627,6 @@ static void tcp_free(grpc_tcp* tcp) {
"tcp_unref_orphan");
grpc_slice_buffer_destroy_internal(&tcp->last_read_buffer);
grpc_resource_user_unref(tcp->resource_user);
gpr_free(tcp->peer_string);
/* The lock is not really necessary here, since all refs have been released */
gpr_mu_lock(&tcp->tb_mu);
grpc_core::TracedBuffer::Shutdown(
@ -632,8 +635,7 @@ static void tcp_free(grpc_tcp* tcp) {
gpr_mu_unlock(&tcp->tb_mu);
tcp->outgoing_buffer_arg = nullptr;
gpr_mu_destroy(&tcp->tb_mu);
tcp->tcp_zerocopy_send_ctx.~TcpZerocopySendCtx();
gpr_free(tcp);
delete tcp;
}
#ifndef NDEBUG
@ -680,7 +682,8 @@ static void call_read_cb(grpc_tcp* tcp, grpc_error* error) {
gpr_log(GPR_INFO, "TCP:%p call_cb %p %p:%p", tcp, cb, cb->cb, cb->cb_arg);
size_t i;
const char* str = grpc_error_string(error);
gpr_log(GPR_INFO, "READ %p (peer=%s) error=%s", tcp, tcp->peer_string, str);
gpr_log(GPR_INFO, "READ %p (peer=%s) error=%s", tcp,
tcp->peer_string.c_str(), str);
if (gpr_should_log(GPR_LOG_SEVERITY_DEBUG)) {
for (i = 0; i < tcp->incoming_buffer->count; i++) {
@ -1563,7 +1566,7 @@ static void tcp_write(grpc_endpoint* ep, grpc_slice_buffer* buf,
size_t i;
for (i = 0; i < buf->count; i++) {
gpr_log(GPR_INFO, "WRITE %p (peer=%s)", tcp, tcp->peer_string);
gpr_log(GPR_INFO, "WRITE %p (peer=%s)", tcp, tcp->peer_string.c_str());
if (gpr_should_log(GPR_LOG_SEVERITY_DEBUG)) {
char* data =
grpc_dump_slice(buf->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
@ -1637,9 +1640,14 @@ static void tcp_delete_from_pollset_set(grpc_endpoint* ep,
grpc_pollset_set_del_fd(pollset_set, tcp->em_fd);
}
static char* tcp_get_peer(grpc_endpoint* ep) {
static absl::string_view tcp_get_peer(grpc_endpoint* ep) {
grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep);
return gpr_strdup(tcp->peer_string);
return tcp->peer_string;
}
static absl::string_view tcp_get_local_address(grpc_endpoint* ep) {
grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep);
return tcp->local_address;
}
static int tcp_get_fd(grpc_endpoint* ep) {
@ -1677,6 +1685,7 @@ static const grpc_endpoint_vtable vtable = {tcp_read,
tcp_destroy,
tcp_get_resource_user,
tcp_get_peer,
tcp_get_local_address,
tcp_get_fd,
tcp_can_track_err};
@ -1745,10 +1754,21 @@ grpc_endpoint* grpc_tcp_create(grpc_fd* em_fd,
tcp_read_chunk_size = GPR_CLAMP(tcp_read_chunk_size, tcp_min_read_chunk_size,
tcp_max_read_chunk_size);
grpc_tcp* tcp = static_cast<grpc_tcp*>(gpr_malloc(sizeof(grpc_tcp)));
grpc_tcp* tcp = new grpc_tcp(tcp_tx_zerocopy_max_simult_sends,
tcp_tx_zerocopy_send_bytes_thresh);
tcp->base.vtable = &vtable;
tcp->peer_string = gpr_strdup(peer_string);
tcp->peer_string = peer_string;
tcp->fd = grpc_fd_wrapped_fd(em_fd);
grpc_resolved_address resolved_local_addr;
memset(&resolved_local_addr, 0, sizeof(resolved_local_addr));
resolved_local_addr.len = sizeof(resolved_local_addr.addr);
if (getsockname(tcp->fd,
reinterpret_cast<sockaddr*>(resolved_local_addr.addr),
&resolved_local_addr.len) < 0) {
tcp->local_address = "";
} else {
tcp->local_address = grpc_sockaddr_to_uri(&resolved_local_addr);
}
tcp->read_cb = nullptr;
tcp->write_cb = nullptr;
tcp->current_zerocopy_send = nullptr;
@ -1765,8 +1785,6 @@ grpc_endpoint* grpc_tcp_create(grpc_fd* em_fd,
tcp->socket_ts_enabled = false;
tcp->ts_capable = true;
tcp->outgoing_buffer_arg = nullptr;
new (&tcp->tcp_zerocopy_send_ctx) TcpZerocopySendCtx(
tcp_tx_zerocopy_max_simult_sends, tcp_tx_zerocopy_send_bytes_thresh);
if (tcp_tx_zerocopy_enabled && !tcp->tcp_zerocopy_send_ctx.memory_limited()) {
#ifdef GRPC_LINUX_ERRQUEUE
const int enable = 1;

@ -125,17 +125,17 @@ typedef struct grpc_tcp {
int shutting_down;
grpc_error* shutdown_error;
char* peer_string;
std::string peer_string;
std::string local_address;
} grpc_tcp;
static void tcp_free(grpc_tcp* tcp) {
grpc_winsocket_destroy(tcp->socket);
gpr_mu_destroy(&tcp->mu);
gpr_free(tcp->peer_string);
grpc_slice_buffer_destroy_internal(&tcp->last_read_buffer);
grpc_resource_user_unref(tcp->resource_user);
if (tcp->shutting_down) GRPC_ERROR_UNREF(tcp->shutdown_error);
gpr_free(tcp);
delete tcp;
}
#ifndef NDEBUG
@ -213,8 +213,8 @@ static void on_read(void* tcpp, grpc_error* error) {
for (i = 0; i < tcp->read_slices->count; i++) {
char* dump = grpc_dump_slice(tcp->read_slices->slices[i],
GPR_DUMP_HEX | GPR_DUMP_ASCII);
gpr_log(GPR_INFO, "READ %p (peer=%s): %s", tcp, tcp->peer_string,
dump);
gpr_log(GPR_INFO, "READ %p (peer=%s): %s", tcp,
tcp->peer_string.c_str(), dump);
gpr_free(dump);
}
}
@ -361,7 +361,8 @@ static void win_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
for (i = 0; i < slices->count; i++) {
char* data =
grpc_dump_slice(slices->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
gpr_log(GPR_INFO, "WRITE %p (peer=%s): %s", tcp, tcp->peer_string, data);
gpr_log(GPR_INFO, "WRITE %p (peer=%s): %s", tcp, tcp->peer_string.c_str(),
data);
gpr_free(data);
}
}
@ -475,9 +476,14 @@ static void win_destroy(grpc_endpoint* ep) {
TCP_UNREF(tcp, "destroy");
}
static char* win_get_peer(grpc_endpoint* ep) {
static absl::string_view win_get_peer(grpc_endpoint* ep) {
grpc_tcp* tcp = (grpc_tcp*)ep;
return gpr_strdup(tcp->peer_string);
return tcp->peer_string;
}
static absl::string_view win_get_local_address(grpc_endpoint* ep) {
grpc_tcp* tcp = (grpc_tcp*)ep;
return tcp->local_address;
}
static grpc_resource_user* win_get_resource_user(grpc_endpoint* ep) {
@ -498,6 +504,7 @@ static grpc_endpoint_vtable vtable = {win_read,
win_destroy,
win_get_resource_user,
win_get_peer,
win_get_local_address,
win_get_fd,
win_can_track_err};
@ -514,7 +521,7 @@ grpc_endpoint* grpc_tcp_create(grpc_winsocket* socket,
}
}
}
grpc_tcp* tcp = (grpc_tcp*)gpr_malloc(sizeof(grpc_tcp));
grpc_tcp* tcp = new grpc_tcp;
memset(tcp, 0, sizeof(grpc_tcp));
tcp->base.vtable = &vtable;
tcp->socket = socket;
@ -522,7 +529,16 @@ grpc_endpoint* grpc_tcp_create(grpc_winsocket* socket,
gpr_ref_init(&tcp->refcount, 1);
GRPC_CLOSURE_INIT(&tcp->on_read, on_read, tcp, grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&tcp->on_write, on_write, tcp, grpc_schedule_on_exec_ctx);
tcp->peer_string = gpr_strdup(peer_string);
grpc_resolved_address resolved_local_addr;
resolved_local_addr.len = sizeof(resolved_local_addr.addr);
if (getsockname(tcp->socket->socket,
reinterpret_cast<sockaddr*>(resolved_local_addr.addr),
&resolved_local_addr.len) < 0) {
tcp->local_address = "";
} else {
tcp->local_address = grpc_sockaddr_to_uri(&resolved_local_addr);
}
tcp->peer_string = peer_string;
grpc_slice_buffer_init(&tcp->last_read_buffer);
tcp->resource_user = grpc_resource_user_create(resource_quota, peer_string);
grpc_resource_quota_unref_internal(resource_quota);

@ -401,11 +401,16 @@ static void endpoint_delete_from_pollset_set(grpc_endpoint* secure_ep,
grpc_endpoint_delete_from_pollset_set(ep->wrapped_ep, pollset_set);
}
static char* endpoint_get_peer(grpc_endpoint* secure_ep) {
static absl::string_view endpoint_get_peer(grpc_endpoint* secure_ep) {
secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
return grpc_endpoint_get_peer(ep->wrapped_ep);
}
static absl::string_view endpoint_get_local_address(grpc_endpoint* secure_ep) {
secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
return grpc_endpoint_get_local_address(ep->wrapped_ep);
}
static int endpoint_get_fd(grpc_endpoint* secure_ep) {
secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
return grpc_endpoint_get_fd(ep->wrapped_ep);
@ -431,6 +436,7 @@ static const grpc_endpoint_vtable vtable = {endpoint_read,
endpoint_destroy,
endpoint_get_resource_user,
endpoint_get_peer,
endpoint_get_local_address,
endpoint_get_fd,
endpoint_can_track_err};

@ -98,8 +98,12 @@ static void me_destroy(grpc_endpoint* ep) {
gpr_free(m);
}
static char* me_get_peer(grpc_endpoint* /*ep*/) {
return gpr_strdup("fake:mock_endpoint");
static absl::string_view me_get_peer(grpc_endpoint* /*ep*/) {
return "fake:mock_endpoint";
}
static absl::string_view me_get_local_address(grpc_endpoint* /*ep*/) {
return "fake:mock_endpoint";
}
static grpc_resource_user* me_get_resource_user(grpc_endpoint* ep) {
@ -120,6 +124,7 @@ static const grpc_endpoint_vtable vtable = {me_read,
me_destroy,
me_get_resource_user,
me_get_peer,
me_get_local_address,
me_get_fd,
me_can_track_err};

@ -152,11 +152,18 @@ static void me_destroy(grpc_endpoint* ep) {
}
}
static char* me_get_peer(grpc_endpoint* ep) {
static absl::string_view me_get_peer(grpc_endpoint* ep) {
passthru_endpoint* p = (reinterpret_cast<half*>(ep))->parent;
return (reinterpret_cast<half*>(ep)) == &p->client
? gpr_strdup("fake:mock_client_endpoint")
: gpr_strdup("fake:mock_server_endpoint");
? "fake:mock_client_endpoint"
: "fake:mock_server_endpoint";
}
static absl::string_view me_get_local_address(grpc_endpoint* ep) {
passthru_endpoint* p = (reinterpret_cast<half*>(ep))->parent;
return (reinterpret_cast<half*>(ep)) == &p->client
? "fake:mock_client_endpoint"
: "fake:mock_server_endpoint";
}
static int me_get_fd(grpc_endpoint* /*ep*/) { return -1; }
@ -178,6 +185,7 @@ static const grpc_endpoint_vtable vtable = {
me_destroy,
me_get_resource_user,
me_get_peer,
me_get_local_address,
me_get_fd,
me_can_track_err,
};

@ -21,10 +21,12 @@
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/sync.h>
#include <grpc/support/time.h>
#include <string.h>
#include "absl/strings/string_view.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/sockaddr.h"
#include "src/core/lib/iomgr/tcp_server.h"
@ -59,8 +61,8 @@ static void on_connect(void* arg, grpc_endpoint* tcp,
grpc_pollset* /*accepting_pollset*/,
grpc_tcp_server_acceptor* acceptor) {
gpr_free(acceptor);
char* peer;
char* last_colon;
absl::string_view peer;
int last_colon;
reconnect_server* server = static_cast<reconnect_server*>(arg);
gpr_timespec now = gpr_now(GPR_CLOCK_REALTIME);
timestamp_list* new_tail;
@ -68,18 +70,16 @@ static void on_connect(void* arg, grpc_endpoint* tcp,
grpc_endpoint_shutdown(tcp,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Connected"));
grpc_endpoint_destroy(tcp);
if (peer) {
last_colon = strrchr(peer, ':');
if (server->peer == nullptr) {
server->peer = peer;
} else {
if (last_colon == nullptr) {
gpr_log(GPR_ERROR, "peer does not contain a ':'");
} else if (strncmp(server->peer, peer,
static_cast<size_t>(last_colon - peer)) != 0) {
gpr_log(GPR_ERROR, "mismatched peer! %s vs %s", server->peer, peer);
}
gpr_free(peer);
last_colon = peer.rfind(':');
if (server->peer == nullptr) {
server->peer = new std::string(peer);
} else {
if (last_colon == std::string::npos) {
gpr_log(GPR_ERROR, "peer does not contain a ':'");
} else if (peer.compare(0, static_cast<size_t>(last_colon),
*server->peer) != 0) {
gpr_log(GPR_ERROR, "mismatched peer! %s vs %s", server->peer->c_str(),
std::string(peer).c_str());
}
}
new_tail = static_cast<timestamp_list*>(gpr_malloc(sizeof(timestamp_list)));
@ -119,7 +119,7 @@ void reconnect_server_clear_timestamps(reconnect_server* server) {
server->head = new_head;
}
server->tail = nullptr;
gpr_free(server->peer);
delete server->peer;
server->peer = nullptr;
}

@ -32,7 +32,7 @@ typedef struct reconnect_server {
test_tcp_server tcp_server;
timestamp_list* head;
timestamp_list* tail;
char* peer;
std::string* peer;
int max_reconnect_backoff_ms;
} reconnect_server;

@ -122,11 +122,16 @@ static grpc_resource_user* te_get_resource_user(grpc_endpoint* ep) {
return grpc_endpoint_get_resource_user(te->wrapped);
}
static char* te_get_peer(grpc_endpoint* ep) {
static absl::string_view te_get_peer(grpc_endpoint* ep) {
trickle_endpoint* te = reinterpret_cast<trickle_endpoint*>(ep);
return grpc_endpoint_get_peer(te->wrapped);
}
static absl::string_view te_get_local_address(grpc_endpoint* ep) {
trickle_endpoint* te = reinterpret_cast<trickle_endpoint*>(ep);
return grpc_endpoint_get_local_address(te->wrapped);
}
static int te_get_fd(grpc_endpoint* ep) {
trickle_endpoint* te = reinterpret_cast<trickle_endpoint*>(ep);
return grpc_endpoint_get_fd(te->wrapped);
@ -151,6 +156,7 @@ static const grpc_endpoint_vtable vtable = {te_read,
te_destroy,
te_get_resource_user,
te_get_peer,
te_get_local_address,
te_get_fd,
te_can_track_err};

@ -156,9 +156,8 @@ class TestTcpServer {
private:
void OnConnect(grpc_endpoint* tcp, grpc_pollset* /*accepting_pollset*/,
grpc_tcp_server_acceptor* acceptor) {
char* peer = grpc_endpoint_get_peer(tcp);
gpr_log(GPR_INFO, "Got incoming connection! from %s", peer);
gpr_free(peer);
std::string peer(grpc_endpoint_get_peer(tcp));
gpr_log(GPR_INFO, "Got incoming connection! from %s", peer.c_str());
EXPECT_FALSE(acceptor->external_connection);
listener_fd_ = grpc_tcp_server_port_fd(
acceptor->from_server, acceptor->port_index, acceptor->fd_index);

@ -53,6 +53,7 @@ class DummyEndpoint : public grpc_endpoint {
destroy,
get_resource_user,
get_peer,
get_local_address,
get_fd,
can_track_err};
grpc_endpoint::vtable = &my_vtable;
@ -124,7 +125,10 @@ class DummyEndpoint : public grpc_endpoint {
static grpc_resource_user* get_resource_user(grpc_endpoint* ep) {
return static_cast<DummyEndpoint*>(ep)->ru_;
}
static char* get_peer(grpc_endpoint* /*ep*/) { return gpr_strdup("test"); }
static absl::string_view get_peer(grpc_endpoint* /*ep*/) { return "test"; }
static absl::string_view get_local_address(grpc_endpoint* /*ep*/) {
return "test";
}
static int get_fd(grpc_endpoint* /*ep*/) { return 0; }
static bool can_track_err(grpc_endpoint* /*ep*/) { return false; }
};

Loading…
Cancel
Save