More commits

pull/16455/head
Yash Tibrewal 7 years ago
parent f71fd84e42
commit 9c5bde5e4e
  1. 28
      CMakeLists.txt
  2. 36
      Makefile
  3. 9
      build.yaml
  4. 17
      src/core/ext/transport/chttp2/transport/chttp2_transport.cc
  5. 28
      src/core/ext/transport/chttp2/transport/context_list.cc
  6. 39
      src/core/ext/transport/chttp2/transport/context_list.h
  7. 8
      src/core/ext/transport/chttp2/transport/internal.h
  8. 6
      src/core/ext/transport/chttp2/transport/writing.cc
  9. 15
      src/core/lib/iomgr/buffer_list.cc
  10. 2
      src/core/lib/iomgr/buffer_list.h
  11. 7
      src/core/lib/iomgr/endpoint.cc
  12. 3
      src/core/lib/iomgr/endpoint.h
  13. 4
      src/core/lib/iomgr/endpoint_pair_posix.cc
  14. 5
      src/core/lib/iomgr/tcp_custom.cc
  15. 68
      src/core/lib/iomgr/tcp_posix.cc
  16. 8
      src/core/lib/security/transport/secure_endpoint.cc
  17. 4
      test/core/iomgr/buffer_list_test.cc
  18. 13
      test/core/transport/chttp2/BUILD
  19. 64
      test/core/transport/chttp2/context_list_test.cc
  20. 1
      test/core/util/mock_endpoint.cc
  21. 1
      test/core/util/passthru_endpoint.cc
  22. 3
      test/core/util/trickle_endpoint.cc
  23. 15
      tools/run_tests/generated/sources_and_headers.json
  24. 24
      tools/run_tests/generated/tests.json

@ -242,6 +242,7 @@ add_dependencies(buildtests_c combiner_test)
add_dependencies(buildtests_c compression_test)
add_dependencies(buildtests_c concurrent_connectivity_test)
add_dependencies(buildtests_c connection_refused_test)
add_dependencies(buildtests_c context_list_test)
add_dependencies(buildtests_c dns_resolver_connectivity_test)
add_dependencies(buildtests_c dns_resolver_cooldown_test)
add_dependencies(buildtests_c dns_resolver_test)
@ -6175,6 +6176,33 @@ target_link_libraries(connection_refused_test
endif (gRPC_BUILD_TESTS)
if (gRPC_BUILD_TESTS)
add_executable(context_list_test
test/core/transport/chttp2/context_list_test.cc
)
target_include_directories(context_list_test
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include
PRIVATE ${_gRPC_SSL_INCLUDE_DIR}
PRIVATE ${_gRPC_PROTOBUF_INCLUDE_DIR}
PRIVATE ${_gRPC_ZLIB_INCLUDE_DIR}
PRIVATE ${_gRPC_BENCHMARK_INCLUDE_DIR}
PRIVATE ${_gRPC_CARES_INCLUDE_DIR}
PRIVATE ${_gRPC_GFLAGS_INCLUDE_DIR}
PRIVATE ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
PRIVATE ${_gRPC_NANOPB_INCLUDE_DIR}
)
target_link_libraries(context_list_test
${_gRPC_ALLTARGETS_LIBRARIES}
grpc_test_util
grpc
)
endif (gRPC_BUILD_TESTS)
if (gRPC_BUILD_TESTS)
add_executable(dns_resolver_connectivity_test
test/core/client_channel/resolvers/dns_resolver_connectivity_test.cc
)

@ -990,6 +990,7 @@ combiner_test: $(BINDIR)/$(CONFIG)/combiner_test
compression_test: $(BINDIR)/$(CONFIG)/compression_test
concurrent_connectivity_test: $(BINDIR)/$(CONFIG)/concurrent_connectivity_test
connection_refused_test: $(BINDIR)/$(CONFIG)/connection_refused_test
context_list_test: $(BINDIR)/$(CONFIG)/context_list_test
dns_resolver_connectivity_test: $(BINDIR)/$(CONFIG)/dns_resolver_connectivity_test
dns_resolver_cooldown_test: $(BINDIR)/$(CONFIG)/dns_resolver_cooldown_test
dns_resolver_test: $(BINDIR)/$(CONFIG)/dns_resolver_test
@ -1445,6 +1446,7 @@ buildtests_c: privatelibs_c \
$(BINDIR)/$(CONFIG)/compression_test \
$(BINDIR)/$(CONFIG)/concurrent_connectivity_test \
$(BINDIR)/$(CONFIG)/connection_refused_test \
$(BINDIR)/$(CONFIG)/context_list_test \
$(BINDIR)/$(CONFIG)/dns_resolver_connectivity_test \
$(BINDIR)/$(CONFIG)/dns_resolver_cooldown_test \
$(BINDIR)/$(CONFIG)/dns_resolver_test \
@ -1972,6 +1974,8 @@ test_c: buildtests_c
$(Q) $(BINDIR)/$(CONFIG)/concurrent_connectivity_test || ( echo test concurrent_connectivity_test failed ; exit 1 )
$(E) "[RUN] Testing connection_refused_test"
$(Q) $(BINDIR)/$(CONFIG)/connection_refused_test || ( echo test connection_refused_test failed ; exit 1 )
$(E) "[RUN] Testing context_list_test"
$(Q) $(BINDIR)/$(CONFIG)/context_list_test || ( echo test context_list_test failed ; exit 1 )
$(E) "[RUN] Testing dns_resolver_connectivity_test"
$(Q) $(BINDIR)/$(CONFIG)/dns_resolver_connectivity_test || ( echo test dns_resolver_connectivity_test failed ; exit 1 )
$(E) "[RUN] Testing dns_resolver_cooldown_test"
@ -11107,6 +11111,38 @@ endif
endif
CONTEXT_LIST_TEST_SRC = \
test/core/transport/chttp2/context_list_test.cc \
CONTEXT_LIST_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(CONTEXT_LIST_TEST_SRC))))
ifeq ($(NO_SECURE),true)
# You can't build secure targets if you don't have OpenSSL.
$(BINDIR)/$(CONFIG)/context_list_test: openssl_dep_error
else
$(BINDIR)/$(CONFIG)/context_list_test: $(CONTEXT_LIST_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a
$(E) "[LD] Linking $@"
$(Q) mkdir -p `dirname $@`
$(Q) $(LD) $(LDFLAGS) $(CONTEXT_LIST_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LDLIBS) $(LDLIBS_SECURE) -o $(BINDIR)/$(CONFIG)/context_list_test
endif
$(OBJDIR)/$(CONFIG)/test/core/transport/chttp2/context_list_test.o: $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a
deps_context_list_test: $(CONTEXT_LIST_TEST_OBJS:.o=.dep)
ifneq ($(NO_SECURE),true)
ifneq ($(NO_DEPS),true)
-include $(CONTEXT_LIST_TEST_OBJS:.o=.dep)
endif
endif
DNS_RESOLVER_CONNECTIVITY_TEST_SRC = \
test/core/client_channel/resolvers/dns_resolver_connectivity_test.cc \

@ -2280,6 +2280,15 @@ targets:
- grpc
- gpr_test_util
- gpr
- name: context_list_test
build: test
language: c
src:
- test/core/transport/chttp2/context_list_test.cc
deps:
- grpc_test_util
- grpc
uses_polling: false
- name: dns_resolver_connectivity_test
cpu_cost: 0.1
build: test

@ -31,6 +31,7 @@
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include "src/core/ext/transport/chttp2/transport/context_list.h"
#include "src/core/ext/transport/chttp2/transport/frame_data.h"
#include "src/core/ext/transport/chttp2/transport/internal.h"
#include "src/core/ext/transport/chttp2/transport/varint.h"
@ -155,6 +156,7 @@ bool g_flow_control_enabled = true;
*/
static void destruct_transport(grpc_chttp2_transport* t) {
gpr_log(GPR_INFO, "destruct transport %p", t);
size_t i;
grpc_endpoint_destroy(t->ep);
@ -164,6 +166,8 @@ static void destruct_transport(grpc_chttp2_transport* t) {
grpc_slice_buffer_destroy_internal(&t->outbuf);
grpc_chttp2_hpack_compressor_destroy(&t->hpack_compressor);
grpc_core::ContextList::Execute(t->cl, nullptr, GRPC_ERROR_NONE);
grpc_slice_buffer_destroy_internal(&t->read_buffer);
grpc_chttp2_hpack_parser_destroy(&t->hpack_parser);
grpc_chttp2_goaway_parser_destroy(&t->goaway_parser);
@ -236,7 +240,7 @@ static void init_transport(grpc_chttp2_transport* t,
size_t i;
int j;
grpc_tcp_set_write_timestamps_callback(ContextList::Execute);
grpc_tcp_set_write_timestamps_callback(grpc_core::ContextList::Execute);
GPR_ASSERT(strlen(GRPC_CHTTP2_CLIENT_CONNECT_STRING) ==
GRPC_CHTTP2_CLIENT_CONNECT_STRLEN);
@ -1027,13 +1031,16 @@ static void write_action_begin_locked(void* gt, grpc_error* error_ignored) {
static void write_action(void* gt, grpc_error* error) {
GPR_TIMER_SCOPE("write_action", 0);
grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(gt);
void *cl = t->cl;
void* cl = t->cl;
t->cl = nullptr;
if (cl) {
gpr_log(GPR_INFO, "cleared for write");
}
grpc_endpoint_write(
t->ep, &t->outbuf,
GRPC_CLOSURE_INIT(&t->write_action_end_locked, write_action_end_locked, t,
grpc_combiner_scheduler(t->combiner)), cl
);
grpc_combiner_scheduler(t->combiner)),
cl);
}
/* Callback from the grpc_endpoint after bytes have been written by calling
@ -1357,7 +1364,7 @@ static void perform_stream_op_locked(void* stream_op,
GRPC_STATS_INC_HTTP2_OP_BATCHES();
s->context = op->context;
s->context = op->payload->context;
if (grpc_http_trace.enabled()) {
char* str = grpc_transport_stream_op_batch_string(op);
gpr_log(GPR_INFO, "perform_stream_op_locked: %s; on_complete = %p", str,

@ -21,22 +21,32 @@
#include "src/core/ext/transport/chttp2/transport/context_list.h"
namespace {
void (*cb)(void *, grpc_core::Timestamps*);
void (*cb)(void*, grpc_core::Timestamps*) = nullptr;
}
void ContextList::Execute(ContextList *head, grpc_core::Timestamps *ts, grpc_error* error) {
ContextList *ptr;
while(head != nullptr) {
if(error == GRPC_ERROR_NONE) {
cb(head->context, ts);
namespace grpc_core {
void ContextList::Execute(void* arg, grpc_core::Timestamps* ts,
grpc_error* error) {
gpr_log(GPR_INFO, "execute");
ContextList* head = static_cast<ContextList*>(arg);
ContextList* ptr;
while (head != nullptr) {
if (error == GRPC_ERROR_NONE && ts != nullptr) {
if (cb) {
cb(head->s->context, ts);
}
}
gpr_log(GPR_INFO, "one iteration %p %p", head, arg);
// GRPC_CHTTP2_STREAM_UNREF(static_cast<grpc_chttp2_stream *>(head->s),
// "timestamp exec");
ptr = head;
head_ = head->next;
head = head->next;
gpr_free(ptr);
}
}
grpc_http2_set_write_timestamps_callback(void (*fn)(void *, grpc_core::Timestamps *)) {
void grpc_http2_set_write_timestamps_callback(
void (*fn)(void*, grpc_core::Timestamps*)) {
cb = fn;
}
} /* namespace grpc_core */

@ -21,44 +21,55 @@
#include <grpc/support/port_platform.h>
#include "src/core/lib/iomgr/buffer_list.h"
#include "src/core/ext/transport/chttp2/transport/internal.h"
namespace grpc_core {
/** A list of RPC Contexts */
class ContextList {\
class ContextList {
public:
/* Creates a new element with \a context as the value and appends it to the
* list. */
void Append(ContextList **head, void *context) {
static void Append(ContextList** head, grpc_chttp2_stream* s) {
/* Make sure context is not already present */
ContextList *ptr = *head;
while(ptr != nullptr) {
if(ptr->context == context) {
ContextList* ptr = *head;
// GRPC_CHTTP2_STREAM_REF(s, "timestamp");
while (ptr != nullptr) {
if (ptr->s == s) {
GPR_ASSERT(false);
}
ptr = ptr->next;
}
ContextList *elem = static_cast<ContextListElement *>(gpr_malloc(sizeof(ContextList)));
elem->context = context;
ContextList* elem =
static_cast<ContextList*>(gpr_malloc(sizeof(ContextList)));
elem->s = s;
elem->next = nullptr;
if(*head_ == nullptr) {
if (*head == nullptr) {
*head = elem;
gpr_log(GPR_INFO, "new head");
gpr_log(GPR_INFO, "append %p %p", elem, *head);
return;
}
gpr_log(GPR_INFO, "append %p %p", elem, *head);
ptr = *head;
while(ptr->next != nullptr) {
while (ptr->next != nullptr) {
ptr = ptr->next;
}
ptr->next = elem;
}
/* Executes a function \a fn with each context in the list and \a arg. It also
/* Executes a function \a fn with each context in the list and \a ts. It also
* frees up the entire list after this operation. */
void Execute(ContextList *head, grpc_core::Timestamps *ts, grpc_error* error);
static void Execute(void* arg, grpc_core::Timestamps* ts, grpc_error* error);
private:
void *context;
ContextListElement *next;
grpc_chttp2_stream* s;
ContextList* next;
};
grpc_http2_set_write_timestamps_callback(void (*fn)(void *, grpc_core::Timestamps*));
void grpc_http2_set_write_timestamps_callback(
void (*fn)(void*, grpc_core::Timestamps*));
} /* namespace grpc_core */
#endif /* GRPC_CORE_EXT_TRANSPORT_CONTEXT_LIST_H */

@ -44,6 +44,10 @@
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/transport_impl.h"
namespace grpc_core {
class ContextList;
}
/* streams are kept in various linked lists depending on what things need to
happen to them... this enum labels each list */
typedef enum {
@ -469,7 +473,7 @@ struct grpc_chttp2_transport {
bool keepalive_permit_without_calls;
/** keep-alive state machine state */
grpc_chttp2_keepalive_state keepalive_state;
ContextList *cl;
grpc_core::ContextList* cl;
};
typedef enum {
@ -480,7 +484,7 @@ typedef enum {
} grpc_published_metadata_method;
struct grpc_chttp2_stream {
void *context;
void* context;
grpc_chttp2_transport* t;
grpc_stream_refcount* refcount;

@ -18,6 +18,7 @@
#include <grpc/support/port_platform.h>
#include "src/core/ext/transport/chttp2/transport/context_list.h"
#include "src/core/ext/transport/chttp2/transport/internal.h"
#include <limits.h>
@ -487,7 +488,10 @@ class StreamWriteContext {
return; // early out: nothing to do
}
ContextList::Append(&t_->cl, s_->context);
if (/* traced && */ grpc_endpoint_can_track_err(t_->ep)) {
gpr_log(GPR_INFO, "for transport %p", t_);
grpc_core::ContextList::Append(&t_->cl, s_);
}
while ((s_->flow_controlled_buffer.length > 0 ||
s_->compressed_data_buffer.length > 0) &&
data_send_context.max_outgoing() > 0) {

@ -31,6 +31,7 @@
namespace grpc_core {
void TracedBuffer::AddNewEntry(TracedBuffer** head, uint32_t seq_no,
void* arg) {
gpr_log(GPR_INFO, "new entry %u", seq_no);
GPR_DEBUG_ASSERT(head != nullptr);
TracedBuffer* new_elem = New<TracedBuffer>(seq_no, arg);
/* Store the current time as the sendmsg time. */
@ -64,6 +65,7 @@ void (*timestamps_callback)(void*, grpc_core::Timestamps*,
void TracedBuffer::ProcessTimestamp(TracedBuffer** head,
struct sock_extended_err* serr,
struct scm_timestamping* tss) {
gpr_log(GPR_INFO, "process timestamp %u", serr->ee_data);
GPR_DEBUG_ASSERT(head != nullptr);
TracedBuffer* elem = *head;
TracedBuffer* next = nullptr;
@ -85,6 +87,7 @@ void TracedBuffer::ProcessTimestamp(TracedBuffer** head,
/* Got all timestamps. Do the callback and free this TracedBuffer.
* The thing below can be passed by value if we don't want the
* restriction on the lifetime. */
gpr_log(GPR_INFO, "calling");
timestamps_callback(elem->arg_, &(elem->ts_), GRPC_ERROR_NONE);
next = elem->next_;
Delete<TracedBuffer>(elem);
@ -99,18 +102,22 @@ void TracedBuffer::ProcessTimestamp(TracedBuffer** head,
}
}
void TracedBuffer::Shutdown(TracedBuffer** head, grpc_error* shutdown_err) {
void TracedBuffer::Shutdown(TracedBuffer** head, void* remaining,
grpc_error* shutdown_err) {
GPR_DEBUG_ASSERT(head != nullptr);
TracedBuffer* elem = *head;
gpr_log(GPR_INFO, "shutdown");
while (elem != nullptr) {
if (timestamps_callback) {
timestamps_callback(elem->arg_, &(elem->ts_), shutdown_err);
}
timestamps_callback(elem->arg_, &(elem->ts_), shutdown_err);
gpr_log(GPR_INFO, "iter");
auto* next = elem->next_;
Delete<TracedBuffer>(elem);
elem = next;
}
*head = nullptr;
if (remaining != nullptr) {
timestamps_callback(remaining, nullptr, shutdown_err);
}
GRPC_ERROR_UNREF(shutdown_err);
}

@ -67,7 +67,7 @@ class TracedBuffer {
/** Cleans the list by calling the callback for each traced buffer in the list
* with timestamps that it has. */
static void Shutdown(grpc_core::TracedBuffer** head,
static void Shutdown(grpc_core::TracedBuffer** head, void* remaining,
grpc_error* shutdown_err);
private:

@ -61,3 +61,10 @@ int grpc_endpoint_get_fd(grpc_endpoint* ep) { return ep->vtable->get_fd(ep); }
grpc_resource_user* grpc_endpoint_get_resource_user(grpc_endpoint* ep) {
return ep->vtable->get_resource_user(ep);
}
bool grpc_endpoint_can_track_err(grpc_endpoint* ep) {
if (ep->vtable->can_track_err != nullptr) {
return ep->vtable->can_track_err(ep);
}
return false;
}

@ -47,6 +47,7 @@ struct grpc_endpoint_vtable {
grpc_resource_user* (*get_resource_user)(grpc_endpoint* ep);
char* (*get_peer)(grpc_endpoint* ep);
int (*get_fd)(grpc_endpoint* ep);
bool (*can_track_err)(grpc_endpoint* ep);
};
/* When data is available on the connection, calls the callback with slices.
@ -95,6 +96,8 @@ void grpc_endpoint_delete_from_pollset_set(grpc_endpoint* ep,
grpc_resource_user* grpc_endpoint_get_resource_user(grpc_endpoint* endpoint);
bool grpc_endpoint_can_track_err(grpc_endpoint* ep);
struct grpc_endpoint {
const grpc_endpoint_vtable* vtable;
};

@ -59,11 +59,11 @@ grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char* name,
grpc_core::ExecCtx exec_ctx;
gpr_asprintf(&final_name, "%s:client", name);
p.client = grpc_tcp_create(grpc_fd_create(sv[1], final_name, true), args,
p.client = grpc_tcp_create(grpc_fd_create(sv[1], final_name, false), args,
"socketpair-server");
gpr_free(final_name);
gpr_asprintf(&final_name, "%s:server", name);
p.server = grpc_tcp_create(grpc_fd_create(sv[0], final_name, true), args,
p.server = grpc_tcp_create(grpc_fd_create(sv[0], final_name, false), args,
"socketpair-client");
gpr_free(final_name);

@ -326,6 +326,8 @@ static grpc_resource_user* endpoint_get_resource_user(grpc_endpoint* ep) {
static int endpoint_get_fd(grpc_endpoint* ep) { return -1; }
static bool endpoint_can_track_err(grpc_endpoint* ep) { return false; }
static grpc_endpoint_vtable vtable = {endpoint_read,
endpoint_write,
endpoint_add_to_pollset,
@ -335,7 +337,8 @@ static grpc_endpoint_vtable vtable = {endpoint_read,
endpoint_destroy,
endpoint_get_resource_user,
endpoint_get_peer,
endpoint_get_fd};
endpoint_get_fd,
endpoint_can_track_err};
grpc_endpoint* custom_tcp_endpoint_create(grpc_custom_socket* socket,
grpc_resource_quota* resource_quota,

@ -375,8 +375,15 @@ static void tcp_ref(grpc_tcp* tcp) { gpr_ref(&tcp->refcount); }
static void tcp_destroy(grpc_endpoint* ep) {
grpc_network_status_unregister_endpoint(ep);
grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep);
gpr_log(GPR_INFO, "tcp destroy %p %p", ep, tcp->outgoing_buffer_arg);
grpc_slice_buffer_reset_and_unref_internal(&tcp->last_read_buffer);
if (grpc_event_engine_can_track_errors()) {
gpr_mu_lock(&tcp->tb_mu);
grpc_core::TracedBuffer::Shutdown(
&tcp->tb_head, tcp->outgoing_buffer_arg,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("endpoint destroyed"));
gpr_mu_unlock(&tcp->tb_mu);
tcp->outgoing_buffer_arg = nullptr;
gpr_atm_no_barrier_store(&tcp->stop_error_notification, true);
grpc_fd_set_error(tcp->em_fd);
}
@ -578,6 +585,7 @@ static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg,
ssize_t* sent_length,
grpc_error** error) {
if (!tcp->socket_ts_enabled) {
gpr_log(GPR_INFO, "set timestamps");
uint32_t opt = grpc_core::kTimestampingSocketOptions;
if (setsockopt(tcp->fd, SOL_SOCKET, SO_TIMESTAMPING,
static_cast<void*>(&opt), sizeof(opt)) != 0) {
@ -610,6 +618,7 @@ static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg,
*sent_length = length;
/* Only save timestamps if all the bytes were taken by sendmsg. */
if (sending_length == static_cast<size_t>(length)) {
gpr_log(GPR_INFO, "tcp new entry %p %p", tcp, tcp->outgoing_buffer_arg);
gpr_mu_lock(&tcp->tb_mu);
grpc_core::TracedBuffer::AddNewEntry(
&tcp->tb_head, static_cast<int>(tcp->bytes_counter + length),
@ -668,6 +677,7 @@ struct cmsghdr* process_timestamp(grpc_tcp* tcp, msghdr* msg,
* non-linux platforms, error processing is not used/enabled currently.
*/
static bool process_errors(grpc_tcp* tcp) {
gpr_log(GPR_INFO, "process errors");
while (true) {
struct iovec iov;
iov.iov_base = nullptr;
@ -730,6 +740,8 @@ static bool process_errors(grpc_tcp* tcp) {
}
static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error* error) {
gpr_log(GPR_INFO, "handle error %p", arg);
GRPC_LOG_IF_ERROR("handle error", GRPC_ERROR_REF(error));
grpc_tcp* tcp = static_cast<grpc_tcp*>(arg);
if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_INFO, "TCP:%p got_error: %s", tcp, grpc_error_string(error));
@ -739,7 +751,8 @@ static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error* error) {
static_cast<bool>(gpr_atm_acq_load(&tcp->stop_error_notification))) {
/* We aren't going to register to hear on error anymore, so it is safe to
* unref. */
grpc_core::TracedBuffer::Shutdown(&tcp->tb_head, GRPC_ERROR_REF(error));
gpr_log(GPR_INFO, "%p %d early return", error,
static_cast<bool>(gpr_atm_acq_load(&tcp->stop_error_notification)));
TCP_UNREF(tcp, "error-tracking");
return;
}
@ -774,6 +787,17 @@ static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error* error) {
}
#endif /* GRPC_LINUX_ERRQUEUE */
void tcp_shutdown_buffer_list(grpc_tcp* tcp) {
if (tcp->outgoing_buffer_arg) {
gpr_mu_lock(&tcp->tb_mu);
grpc_core::TracedBuffer::Shutdown(
&tcp->tb_head, tcp->outgoing_buffer_arg,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("endpoint destroyed"));
gpr_mu_unlock(&tcp->tb_mu);
tcp->outgoing_buffer_arg = nullptr;
}
}
/* returns true if done, false if pending; if returning true, *error is set */
#if defined(IOV_MAX) && IOV_MAX < 1000
#define MAX_WRITE_IOVEC IOV_MAX
@ -821,8 +845,11 @@ static bool tcp_flush(grpc_tcp* tcp, grpc_error** error) {
msg.msg_flags = 0;
if (tcp->outgoing_buffer_arg != nullptr) {
if (!tcp_write_with_timestamps(tcp, &msg, sending_length, &sent_length,
error))
error)) {
gpr_log(GPR_INFO, "something went wrong");
tcp_shutdown_buffer_list(tcp);
return true; /* something went wrong with timestamps */
}
} else {
msg.msg_control = nullptr;
msg.msg_controllen = 0;
@ -844,12 +871,16 @@ static bool tcp_flush(grpc_tcp* tcp, grpc_error** error) {
}
return false;
} else if (errno == EPIPE) {
gpr_log(GPR_INFO, "something went wrong");
*error = tcp_annotate_error(GRPC_OS_ERROR(errno, "sendmsg"), tcp);
grpc_slice_buffer_reset_and_unref_internal(tcp->outgoing_buffer);
tcp_shutdown_buffer_list(tcp);
return true;
} else {
gpr_log(GPR_INFO, "something went wrong");
*error = tcp_annotate_error(GRPC_OS_ERROR(errno, "sendmsg"), tcp);
grpc_slice_buffer_reset_and_unref_internal(tcp->outgoing_buffer);
tcp_shutdown_buffer_list(tcp);
return true;
}
}
@ -910,6 +941,7 @@ static void tcp_handle_write(void* arg /* grpc_tcp */, grpc_error* error) {
static void tcp_write(grpc_endpoint* ep, grpc_slice_buffer* buf,
grpc_closure* cb, void* arg) {
GPR_TIMER_SCOPE("tcp_write", 0);
gpr_log(GPR_INFO, "tcp_write %p %p", ep, arg);
grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep);
grpc_error* error = GRPC_ERROR_NONE;
@ -926,17 +958,18 @@ static void tcp_write(grpc_endpoint* ep, grpc_slice_buffer* buf,
GPR_ASSERT(tcp->write_cb == nullptr);
tcp->outgoing_buffer_arg = arg;
if (buf->length == 0) {
GRPC_CLOSURE_SCHED(
cb, grpc_fd_is_shutdown(tcp->em_fd)
? tcp_annotate_error(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("EOF"), tcp)
: GRPC_ERROR_NONE);
tcp_shutdown_buffer_list(tcp);
return;
}
tcp->outgoing_buffer = buf;
tcp->outgoing_byte_idx = 0;
tcp->outgoing_buffer_arg = arg;
if (arg) {
GPR_ASSERT(grpc_event_engine_can_track_errors());
}
@ -949,6 +982,7 @@ static void tcp_write(grpc_endpoint* ep, grpc_slice_buffer* buf,
}
notify_on_write(tcp);
} else {
gpr_log(GPR_INFO, "imm sched");
if (grpc_tcp_trace.enabled()) {
const char* str = grpc_error_string(error);
gpr_log(GPR_INFO, "write: %s", str);
@ -989,6 +1023,23 @@ static grpc_resource_user* tcp_get_resource_user(grpc_endpoint* ep) {
return tcp->resource_user;
}
static bool tcp_can_track_err(grpc_endpoint* ep) {
grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep);
if (!grpc_event_engine_can_track_errors()) {
return false;
}
struct sockaddr addr;
socklen_t len = sizeof(addr);
if (getsockname(tcp->fd, &addr, &len) < 0) {
gpr_log(GPR_ERROR, "getsockname");
return false;
}
if (addr.sa_family == AF_INET || addr.sa_family == AF_INET6) {
return true;
}
return false;
}
static const grpc_endpoint_vtable vtable = {tcp_read,
tcp_write,
tcp_add_to_pollset,
@ -998,7 +1049,8 @@ static const grpc_endpoint_vtable vtable = {tcp_read,
tcp_destroy,
tcp_get_resource_user,
tcp_get_peer,
tcp_get_fd};
tcp_get_fd,
tcp_can_track_err};
#define MAX_CHUNK_SIZE 32 * 1024 * 1024
@ -1059,6 +1111,7 @@ grpc_endpoint* grpc_tcp_create(grpc_fd* em_fd,
tcp->is_first_read = true;
tcp->bytes_counter = -1;
tcp->socket_ts_enabled = false;
tcp->outgoing_buffer_arg = nullptr;
/* paired with unref in grpc_tcp_destroy */
gpr_ref_init(&tcp->refcount, 1);
gpr_atm_no_barrier_store(&tcp->shutdown_count, 0);
@ -1097,12 +1150,19 @@ void grpc_tcp_destroy_and_release_fd(grpc_endpoint* ep, int* fd,
grpc_closure* done) {
grpc_network_status_unregister_endpoint(ep);
grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep);
gpr_log(GPR_INFO, "destroy and release %p %p", ep, tcp->outgoing_buffer_arg);
GPR_ASSERT(ep->vtable == &vtable);
tcp->release_fd = fd;
tcp->release_fd_cb = done;
grpc_slice_buffer_reset_and_unref_internal(&tcp->last_read_buffer);
if (grpc_event_engine_can_track_errors()) {
/* Stop errors notification. */
gpr_mu_lock(&tcp->tb_mu);
grpc_core::TracedBuffer::Shutdown(
&tcp->tb_head, tcp->outgoing_buffer_arg,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("endpoint destroyed"));
gpr_mu_unlock(&tcp->tb_mu);
tcp->outgoing_buffer_arg = nullptr;
gpr_atm_no_barrier_store(&tcp->stop_error_notification, true);
grpc_fd_set_error(tcp->em_fd);
}

@ -389,6 +389,11 @@ static grpc_resource_user* endpoint_get_resource_user(
return grpc_endpoint_get_resource_user(ep->wrapped_ep);
}
static bool endpoint_can_track_err(grpc_endpoint* secure_ep) {
secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
return grpc_endpoint_can_track_err(ep->wrapped_ep);
}
static const grpc_endpoint_vtable vtable = {endpoint_read,
endpoint_write,
endpoint_add_to_pollset,
@ -398,7 +403,8 @@ static const grpc_endpoint_vtable vtable = {endpoint_read,
endpoint_destroy,
endpoint_get_resource_user,
endpoint_get_peer,
endpoint_get_fd};
endpoint_get_fd,
endpoint_can_track_err};
grpc_endpoint* grpc_secure_endpoint_create(
struct tsi_frame_protector* protector,

@ -50,7 +50,7 @@ static void TestShutdownFlushesList() {
grpc_core::TracedBuffer::AddNewEntry(
&list, i, static_cast<void*>(&verifier_called[i]));
}
grpc_core::TracedBuffer::Shutdown(&list, GRPC_ERROR_NONE);
grpc_core::TracedBuffer::Shutdown(&list, nullptr, GRPC_ERROR_NONE);
GPR_ASSERT(list == nullptr);
for (auto i = 0; i < NUM_ELEM; i++) {
GPR_ASSERT(gpr_atm_acq_load(&verifier_called[i]) ==
@ -88,7 +88,7 @@ static void TestVerifierCalledOnAck() {
grpc_core::TracedBuffer::ProcessTimestamp(&list, &serr, &tss);
GPR_ASSERT(gpr_atm_acq_load(&verifier_called) == static_cast<gpr_atm>(1));
GPR_ASSERT(list == nullptr);
grpc_core::TracedBuffer::Shutdown(&list, GRPC_ERROR_NONE);
grpc_core::TracedBuffer::Shutdown(&list, nullptr, GRPC_ERROR_NONE);
}
static void TestTcpBufferList() {

@ -66,6 +66,19 @@ grpc_cc_test(
],
)
grpc_cc_test(
name = "context_list_test",
srcs = ["context_list_test.cc"],
language = "C++",
deps = [
"//:gpr",
"//:grpc",
"//test/core/util:gpr_test_util",
"//test/core/util:grpc_test_util",
],
)
grpc_cc_test(
name = "hpack_encoder_test",
srcs = ["hpack_encoder_test.cc"],

@ -0,0 +1,64 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include "src/core/lib/iomgr/port.h"
#include "src/core/ext/transport/chttp2/transport/context_list.h"
#include <grpc/grpc.h>
#include "test/core/util/test_config.h"
static void TestExecuteFlushesListVerifier(void* arg,
grpc_core::Timestamps* ts) {
GPR_ASSERT(arg != nullptr);
gpr_atm* done = reinterpret_cast<gpr_atm*>(arg);
gpr_atm_rel_store(done, static_cast<gpr_atm>(1));
}
/** Tests that all ContextList elements in the list are flushed out on
* execute.
* Also tests that arg is passed correctly.
*/
static void TestExecuteFlushesList() {
grpc_core::ContextList* list = nullptr;
grpc_http2_set_write_timestamps_callback(TestExecuteFlushesListVerifier);
#define NUM_ELEM 5
grpc_chttp2_stream s[NUM_ELEM];
gpr_atm verifier_called[NUM_ELEM];
for (auto i = 0; i < NUM_ELEM; i++) {
s[i].context = &verifier_called[i];
gpr_atm_rel_store(&verifier_called[i], static_cast<gpr_atm>(0));
grpc_core::ContextList::Append(&list, &s[i]);
}
grpc_core::Timestamps ts;
grpc_core::ContextList::Execute(list, &ts, GRPC_ERROR_NONE);
for (auto i = 0; i < NUM_ELEM; i++) {
GPR_ASSERT(gpr_atm_acq_load(&verifier_called[i]) ==
static_cast<gpr_atm>(1));
}
}
static void TestContextList() { TestExecuteFlushesList(); }
int main(int argc, char** argv) {
grpc_init();
TestContextList();
grpc_shutdown();
return 0;
}

@ -114,6 +114,7 @@ static const grpc_endpoint_vtable vtable = {
me_get_resource_user,
me_get_peer,
me_get_fd,
nullptr,
};
grpc_endpoint* grpc_mock_endpoint_create(void (*on_write)(grpc_slice slice),

@ -171,6 +171,7 @@ static const grpc_endpoint_vtable vtable = {
me_get_resource_user,
me_get_peer,
me_get_fd,
nullptr,
};
static void half_init(half* m, passthru_endpoint* parent,

@ -148,7 +148,8 @@ static const grpc_endpoint_vtable vtable = {te_read,
te_destroy,
te_get_resource_user,
te_get_peer,
te_get_fd};
te_get_fd,
nullptr};
grpc_endpoint* grpc_trickle_endpoint_create(grpc_endpoint* wrap,
double bytes_per_second) {

@ -364,6 +364,21 @@
"third_party": false,
"type": "target"
},
{
"deps": [
"grpc",
"grpc_test_util"
],
"headers": [],
"is_filegroup": false,
"language": "c",
"name": "context_list_test",
"src": [
"test/core/transport/chttp2/context_list_test.cc"
],
"third_party": false,
"type": "target"
},
{
"deps": [
"gpr",

@ -433,6 +433,30 @@
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": false,
"language": "c",
"name": "context_list_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,

Loading…
Cancel
Save