C++ize BDP estimator, introduce ManualConstructor

pull/12894/head
Craig Tiller 7 years ago
parent 313f36fd06
commit 922260656a
  1. 1
      BUILD
  2. 4
      CMakeLists.txt
  3. 4
      Makefile
  4. 3
      build.yaml
  5. 2
      gRPC-Core.podspec
  6. 1
      grpc.gemspec
  7. 4
      grpc.gyp
  8. 1
      package.xml
  9. 13
      src/core/ext/transport/chttp2/transport/chttp2_transport.cc
  10. 7
      src/core/ext/transport/chttp2/transport/flow_control.cc
  11. 3
      src/core/ext/transport/chttp2/transport/internal.h
  12. 71
      src/core/lib/support/manual_constructor.h
  13. 126
      src/core/lib/transport/bdp_estimator.cc
  14. 117
      src/core/lib/transport/bdp_estimator.h
  15. 17
      test/core/util/BUILD
  16. 2
      test/core/util/debugger_macros.cc
  17. 8
      test/core/util/debugger_macros.h
  18. 1
      tools/doxygen/Doxyfile.c++.internal
  19. 1
      tools/doxygen/Doxyfile.core.internal
  20. 4
      tools/run_tests/generated/sources_and_headers.json

@ -515,6 +515,7 @@ grpc_cc_library(
"src/core/lib/support/atomic_with_std.h", "src/core/lib/support/atomic_with_std.h",
"src/core/lib/support/env.h", "src/core/lib/support/env.h",
"src/core/lib/support/memory.h", "src/core/lib/support/memory.h",
"src/core/lib/support/manual_constructor.h",
"src/core/lib/support/mpscq.h", "src/core/lib/support/mpscq.h",
"src/core/lib/support/murmur_hash.h", "src/core/lib/support/murmur_hash.h",
"src/core/lib/support/spinlock.h", "src/core/lib/support/spinlock.h",

@ -1616,7 +1616,7 @@ add_library(grpc_test_util
test/core/end2end/fixtures/http_proxy_fixture.c test/core/end2end/fixtures/http_proxy_fixture.c
test/core/end2end/fixtures/proxy.c test/core/end2end/fixtures/proxy.c
test/core/iomgr/endpoint_tests.c test/core/iomgr/endpoint_tests.c
test/core/util/debugger_macros.c test/core/util/debugger_macros.cc
test/core/util/grpc_profiler.c test/core/util/grpc_profiler.c
test/core/util/memory_counters.c test/core/util/memory_counters.c
test/core/util/mock_endpoint.c test/core/util/mock_endpoint.c
@ -1880,7 +1880,7 @@ add_library(grpc_test_util_unsecure
test/core/end2end/fixtures/http_proxy_fixture.c test/core/end2end/fixtures/http_proxy_fixture.c
test/core/end2end/fixtures/proxy.c test/core/end2end/fixtures/proxy.c
test/core/iomgr/endpoint_tests.c test/core/iomgr/endpoint_tests.c
test/core/util/debugger_macros.c test/core/util/debugger_macros.cc
test/core/util/grpc_profiler.c test/core/util/grpc_profiler.c
test/core/util/memory_counters.c test/core/util/memory_counters.c
test/core/util/mock_endpoint.c test/core/util/mock_endpoint.c

@ -3606,7 +3606,7 @@ LIBGRPC_TEST_UTIL_SRC = \
test/core/end2end/fixtures/http_proxy_fixture.c \ test/core/end2end/fixtures/http_proxy_fixture.c \
test/core/end2end/fixtures/proxy.c \ test/core/end2end/fixtures/proxy.c \
test/core/iomgr/endpoint_tests.c \ test/core/iomgr/endpoint_tests.c \
test/core/util/debugger_macros.c \ test/core/util/debugger_macros.cc \
test/core/util/grpc_profiler.c \ test/core/util/grpc_profiler.c \
test/core/util/memory_counters.c \ test/core/util/memory_counters.c \
test/core/util/mock_endpoint.c \ test/core/util/mock_endpoint.c \
@ -3861,7 +3861,7 @@ LIBGRPC_TEST_UTIL_UNSECURE_SRC = \
test/core/end2end/fixtures/http_proxy_fixture.c \ test/core/end2end/fixtures/http_proxy_fixture.c \
test/core/end2end/fixtures/proxy.c \ test/core/end2end/fixtures/proxy.c \
test/core/iomgr/endpoint_tests.c \ test/core/iomgr/endpoint_tests.c \
test/core/util/debugger_macros.c \ test/core/util/debugger_macros.cc \
test/core/util/grpc_profiler.c \ test/core/util/grpc_profiler.c \
test/core/util/memory_counters.c \ test/core/util/memory_counters.c \
test/core/util/mock_endpoint.c \ test/core/util/mock_endpoint.c \

@ -143,6 +143,7 @@ filegroups:
- src/core/lib/support/atomic_with_atm.h - src/core/lib/support/atomic_with_atm.h
- src/core/lib/support/atomic_with_std.h - src/core/lib/support/atomic_with_std.h
- src/core/lib/support/env.h - src/core/lib/support/env.h
- src/core/lib/support/manual_constructor.h
- src/core/lib/support/memory.h - src/core/lib/support/memory.h
- src/core/lib/support/mpscq.h - src/core/lib/support/mpscq.h
- src/core/lib/support/murmur_hash.h - src/core/lib/support/murmur_hash.h
@ -741,7 +742,7 @@ filegroups:
- test/core/end2end/fixtures/http_proxy_fixture.c - test/core/end2end/fixtures/http_proxy_fixture.c
- test/core/end2end/fixtures/proxy.c - test/core/end2end/fixtures/proxy.c
- test/core/iomgr/endpoint_tests.c - test/core/iomgr/endpoint_tests.c
- test/core/util/debugger_macros.c - test/core/util/debugger_macros.cc
- test/core/util/grpc_profiler.c - test/core/util/grpc_profiler.c
- test/core/util/memory_counters.c - test/core/util/memory_counters.c
- test/core/util/mock_endpoint.c - test/core/util/mock_endpoint.c

@ -191,6 +191,7 @@ Pod::Spec.new do |s|
'src/core/lib/support/atomic_with_atm.h', 'src/core/lib/support/atomic_with_atm.h',
'src/core/lib/support/atomic_with_std.h', 'src/core/lib/support/atomic_with_std.h',
'src/core/lib/support/env.h', 'src/core/lib/support/env.h',
'src/core/lib/support/manual_constructor.h',
'src/core/lib/support/memory.h', 'src/core/lib/support/memory.h',
'src/core/lib/support/mpscq.h', 'src/core/lib/support/mpscq.h',
'src/core/lib/support/murmur_hash.h', 'src/core/lib/support/murmur_hash.h',
@ -736,6 +737,7 @@ Pod::Spec.new do |s|
'src/core/lib/support/atomic_with_atm.h', 'src/core/lib/support/atomic_with_atm.h',
'src/core/lib/support/atomic_with_std.h', 'src/core/lib/support/atomic_with_std.h',
'src/core/lib/support/env.h', 'src/core/lib/support/env.h',
'src/core/lib/support/manual_constructor.h',
'src/core/lib/support/memory.h', 'src/core/lib/support/memory.h',
'src/core/lib/support/mpscq.h', 'src/core/lib/support/mpscq.h',
'src/core/lib/support/murmur_hash.h', 'src/core/lib/support/murmur_hash.h',

@ -89,6 +89,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/support/atomic_with_atm.h ) s.files += %w( src/core/lib/support/atomic_with_atm.h )
s.files += %w( src/core/lib/support/atomic_with_std.h ) s.files += %w( src/core/lib/support/atomic_with_std.h )
s.files += %w( src/core/lib/support/env.h ) s.files += %w( src/core/lib/support/env.h )
s.files += %w( src/core/lib/support/manual_constructor.h )
s.files += %w( src/core/lib/support/memory.h ) s.files += %w( src/core/lib/support/memory.h )
s.files += %w( src/core/lib/support/mpscq.h ) s.files += %w( src/core/lib/support/mpscq.h )
s.files += %w( src/core/lib/support/murmur_hash.h ) s.files += %w( src/core/lib/support/murmur_hash.h )

@ -515,7 +515,7 @@
'test/core/end2end/fixtures/http_proxy_fixture.c', 'test/core/end2end/fixtures/http_proxy_fixture.c',
'test/core/end2end/fixtures/proxy.c', 'test/core/end2end/fixtures/proxy.c',
'test/core/iomgr/endpoint_tests.c', 'test/core/iomgr/endpoint_tests.c',
'test/core/util/debugger_macros.c', 'test/core/util/debugger_macros.cc',
'test/core/util/grpc_profiler.c', 'test/core/util/grpc_profiler.c',
'test/core/util/memory_counters.c', 'test/core/util/memory_counters.c',
'test/core/util/mock_endpoint.c', 'test/core/util/mock_endpoint.c',
@ -722,7 +722,7 @@
'test/core/end2end/fixtures/http_proxy_fixture.c', 'test/core/end2end/fixtures/http_proxy_fixture.c',
'test/core/end2end/fixtures/proxy.c', 'test/core/end2end/fixtures/proxy.c',
'test/core/iomgr/endpoint_tests.c', 'test/core/iomgr/endpoint_tests.c',
'test/core/util/debugger_macros.c', 'test/core/util/debugger_macros.cc',
'test/core/util/grpc_profiler.c', 'test/core/util/grpc_profiler.c',
'test/core/util/memory_counters.c', 'test/core/util/memory_counters.c',
'test/core/util/mock_endpoint.c', 'test/core/util/mock_endpoint.c',

@ -101,6 +101,7 @@
<file baseinstalldir="/" name="src/core/lib/support/atomic_with_atm.h" role="src" /> <file baseinstalldir="/" name="src/core/lib/support/atomic_with_atm.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/support/atomic_with_std.h" role="src" /> <file baseinstalldir="/" name="src/core/lib/support/atomic_with_std.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/support/env.h" role="src" /> <file baseinstalldir="/" name="src/core/lib/support/env.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/support/manual_constructor.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/support/memory.h" role="src" /> <file baseinstalldir="/" name="src/core/lib/support/memory.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/support/mpscq.h" role="src" /> <file baseinstalldir="/" name="src/core/lib/support/mpscq.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/support/murmur_hash.h" role="src" /> <file baseinstalldir="/" name="src/core/lib/support/murmur_hash.h" role="src" />

@ -218,6 +218,8 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx,
t->write_cb_pool = next; t->write_cb_pool = next;
} }
t->flow_control.bdp_estimator.Destroy();
gpr_free(t->ping_acks); gpr_free(t->ping_acks);
gpr_free(t->peer_string); gpr_free(t->peer_string);
gpr_free(t); gpr_free(t);
@ -315,7 +317,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
keepalive_watchdog_fired_locked, t, keepalive_watchdog_fired_locked, t,
grpc_combiner_scheduler(t->combiner)); grpc_combiner_scheduler(t->combiner));
grpc_bdp_estimator_init(&t->flow_control.bdp_estimator, t->peer_string); t->flow_control.bdp_estimator.Init(t->peer_string);
grpc_chttp2_goaway_parser_init(&t->goaway_parser); grpc_chttp2_goaway_parser_init(&t->goaway_parser);
grpc_chttp2_hpack_parser_init(exec_ctx, &t->hpack_parser); grpc_chttp2_hpack_parser_init(exec_ctx, &t->hpack_parser);
@ -2434,7 +2436,7 @@ void grpc_chttp2_act_on_flowctl_action(grpc_exec_ctx *exec_ctx,
} }
if (action.need_ping) { if (action.need_ping) {
GRPC_CHTTP2_REF_TRANSPORT(t, "bdp_ping"); GRPC_CHTTP2_REF_TRANSPORT(t, "bdp_ping");
grpc_bdp_estimator_schedule_ping(&t->flow_control.bdp_estimator); t->flow_control.bdp_estimator->SchedulePing();
send_ping_locked(exec_ctx, t, &t->start_bdp_ping_locked, send_ping_locked(exec_ctx, t, &t->start_bdp_ping_locked,
&t->finish_bdp_ping_locked); &t->finish_bdp_ping_locked);
} }
@ -2493,8 +2495,7 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
grpc_error *errors[3] = {GRPC_ERROR_REF(error), GRPC_ERROR_NONE, grpc_error *errors[3] = {GRPC_ERROR_REF(error), GRPC_ERROR_NONE,
GRPC_ERROR_NONE}; GRPC_ERROR_NONE};
for (; i < t->read_buffer.count && errors[1] == GRPC_ERROR_NONE; i++) { for (; i < t->read_buffer.count && errors[1] == GRPC_ERROR_NONE; i++) {
grpc_bdp_estimator_add_incoming_bytes( t->flow_control.bdp_estimator->AddIncomingBytes(
&t->flow_control.bdp_estimator,
(int64_t)GRPC_SLICE_LENGTH(t->read_buffer.slices[i])); (int64_t)GRPC_SLICE_LENGTH(t->read_buffer.slices[i]));
errors[1] = errors[1] =
grpc_chttp2_perform_read(exec_ctx, t, t->read_buffer.slices[i]); grpc_chttp2_perform_read(exec_ctx, t, t->read_buffer.slices[i]);
@ -2569,7 +2570,7 @@ static void start_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp,
if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING) { if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING) {
grpc_timer_cancel(exec_ctx, &t->keepalive_ping_timer); grpc_timer_cancel(exec_ctx, &t->keepalive_ping_timer);
} }
grpc_bdp_estimator_start_ping(&t->flow_control.bdp_estimator); t->flow_control.bdp_estimator->StartPing();
} }
static void finish_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, static void finish_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp,
@ -2578,7 +2579,7 @@ static void finish_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp,
if (GRPC_TRACER_ON(grpc_http_trace)) { if (GRPC_TRACER_ON(grpc_http_trace)) {
gpr_log(GPR_DEBUG, "%s: Complete BDP ping", t->peer_string); gpr_log(GPR_DEBUG, "%s: Complete BDP ping", t->peer_string);
} }
grpc_bdp_estimator_complete_ping(exec_ctx, &t->flow_control.bdp_estimator); t->flow_control.bdp_estimator->CompletePing(exec_ctx);
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "bdp_ping"); GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "bdp_ping");
} }

@ -459,12 +459,11 @@ grpc_chttp2_flowctl_action grpc_chttp2_flowctl_get_action(
} }
} }
if (tfc->enable_bdp_probe) { if (tfc->enable_bdp_probe) {
action.need_ping = action.need_ping = tfc->bdp_estimator->NeedPing(exec_ctx);
grpc_bdp_estimator_need_ping(exec_ctx, &tfc->bdp_estimator);
// get bdp estimate and update initial_window accordingly. // get bdp estimate and update initial_window accordingly.
int64_t estimate = -1; int64_t estimate = -1;
if (grpc_bdp_estimator_get_estimate(&tfc->bdp_estimator, &estimate)) { if (tfc->bdp_estimator->EstimateBdp(&estimate)) {
double target = 1 + log2((double)estimate); double target = 1 + log2((double)estimate);
// target might change based on how much memory pressure we are under // target might change based on how much memory pressure we are under
@ -491,7 +490,7 @@ grpc_chttp2_flowctl_action grpc_chttp2_flowctl_get_action(
// get bandwidth estimate and update max_frame accordingly. // get bandwidth estimate and update max_frame accordingly.
double bw_dbl = -1; double bw_dbl = -1;
if (grpc_bdp_estimator_get_bw(&tfc->bdp_estimator, &bw_dbl)) { if (tfc->bdp_estimator->EstimateBandwidth(&bw_dbl)) {
// we target the max of BDP or bandwidth in microseconds. // we target the max of BDP or bandwidth in microseconds.
int32_t frame_size = (int32_t)GPR_CLAMP( int32_t frame_size = (int32_t)GPR_CLAMP(
GPR_MAX((int32_t)GPR_CLAMP(bw_dbl, 0, INT_MAX) / 1000, GPR_MAX((int32_t)GPR_CLAMP(bw_dbl, 0, INT_MAX) / 1000,

@ -37,6 +37,7 @@
#include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/timer.h" #include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/support/manual_constructor.h"
#include "src/core/lib/transport/bdp_estimator.h" #include "src/core/lib/transport/bdp_estimator.h"
#include "src/core/lib/transport/connectivity_state.h" #include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/pid_controller.h" #include "src/core/lib/transport/pid_controller.h"
@ -268,7 +269,7 @@ typedef struct {
bool enable_bdp_probe; bool enable_bdp_probe;
/* bdp estimation */ /* bdp estimation */
grpc_bdp_estimator bdp_estimator; grpc_core::ManualConstructor<grpc_core::BdpEstimator> bdp_estimator;
/* pid controller */ /* pid controller */
bool pid_controller_initialized; bool pid_controller_initialized;

@ -0,0 +1,71 @@
/*
*
* Copyright 2016 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// manually construct a region of memory with some type
#include <stddef.h>
#include <new>
#include <type_traits>
#include <utility>
namespace grpc_core {
template <typename Type>
class ManualConstructor {
public:
// No constructor or destructor because one of the most useful uses of
// this class is as part of a union, and members of a union could not have
// constructors or destructors till C++11. And, anyway, the whole point of
// this class is to bypass constructor and destructor.
Type* get() { return reinterpret_cast<Type*>(&space_); }
const Type* get() const { return reinterpret_cast<const Type*>(&space_); }
Type* operator->() { return get(); }
const Type* operator->() const { return get(); }
Type& operator*() { return *get(); }
const Type& operator*() const { return *get(); }
void Init() { new (&space_) Type; }
// Init() constructs the Type instance using the given arguments
// (which are forwarded to Type's constructor).
//
// Note that Init() with no arguments performs default-initialization,
// not zero-initialization (i.e it behaves the same as "new Type;", not
// "new Type();"), so it will leave non-class types uninitialized.
template <typename... Ts>
void Init(Ts&&... args) {
new (&space_) Type(std::forward<Ts>(args)...);
}
// Init() that is equivalent to copy and move construction.
// Enables usage like this:
// ManualConstructor<std::vector<int>> v;
// v.Init({1, 2, 3});
void Init(const Type& x) { new (&space_) Type(x); }
void Init(Type&& x) { new (&space_) Type(std::move(x)); }
void Destroy() { get()->~Type(); }
private:
typename std::aligned_storage<sizeof(Type), alignof(Type)>::type space_;
};
} // namespace grpc_core

@ -21,117 +21,65 @@
#include <inttypes.h> #include <inttypes.h>
#include <stdlib.h> #include <stdlib.h>
#include <grpc/support/log.h>
#include <grpc/support/useful.h> #include <grpc/support/useful.h>
grpc_tracer_flag grpc_bdp_estimator_trace = grpc_tracer_flag grpc_bdp_estimator_trace =
GRPC_TRACER_INITIALIZER(false, "bdp_estimator"); GRPC_TRACER_INITIALIZER(false, "bdp_estimator");
void grpc_bdp_estimator_init(grpc_bdp_estimator *estimator, const char *name) { namespace grpc_core {
estimator->estimate = 65536;
estimator->ping_state = GRPC_BDP_PING_UNSCHEDULED;
estimator->ping_start_time = gpr_time_0(GPR_CLOCK_MONOTONIC);
estimator->next_ping_scheduled = 0;
estimator->name = name;
estimator->bw_est = 0;
estimator->inter_ping_delay = 100.0; // start at 100ms
estimator->stable_estimate_count = 0;
}
bool grpc_bdp_estimator_get_estimate(const grpc_bdp_estimator *estimator,
int64_t *estimate) {
*estimate = estimator->estimate;
return true;
}
bool grpc_bdp_estimator_get_bw(const grpc_bdp_estimator *estimator,
double *bw) {
*bw = estimator->bw_est;
return true;
}
void grpc_bdp_estimator_add_incoming_bytes(grpc_bdp_estimator *estimator,
int64_t num_bytes) {
estimator->accumulator += num_bytes;
}
bool grpc_bdp_estimator_need_ping(grpc_exec_ctx *exec_ctx,
const grpc_bdp_estimator *estimator) {
switch (estimator->ping_state) {
case GRPC_BDP_PING_UNSCHEDULED:
return grpc_exec_ctx_now(exec_ctx) >= estimator->next_ping_scheduled;
case GRPC_BDP_PING_SCHEDULED:
return false;
case GRPC_BDP_PING_STARTED:
return false;
}
GPR_UNREACHABLE_CODE(return false);
}
void grpc_bdp_estimator_schedule_ping(grpc_bdp_estimator *estimator) { BdpEstimator::BdpEstimator(const char *name)
if (GRPC_TRACER_ON(grpc_bdp_estimator_trace)) { : ping_state_(PingState::UNSCHEDULED),
gpr_log(GPR_DEBUG, "bdp[%s]:sched acc=%" PRId64 " est=%" PRId64, accumulator_(0),
estimator->name, estimator->accumulator, estimator->estimate); estimate_(65536),
} ping_start_time_(gpr_time_0(GPR_CLOCK_MONOTONIC)),
GPR_ASSERT(estimator->ping_state == GRPC_BDP_PING_UNSCHEDULED); next_ping_scheduled_(0),
estimator->ping_state = GRPC_BDP_PING_SCHEDULED; inter_ping_delay_(100.0), // start at 100ms
estimator->accumulator = 0; stable_estimate_count_(0),
} bw_est_(0),
name_(name) {}
void grpc_bdp_estimator_start_ping(grpc_bdp_estimator *estimator) { void BdpEstimator::CompletePing(grpc_exec_ctx *exec_ctx) {
if (GRPC_TRACER_ON(grpc_bdp_estimator_trace)) {
gpr_log(GPR_DEBUG, "bdp[%s]:start acc=%" PRId64 " est=%" PRId64,
estimator->name, estimator->accumulator, estimator->estimate);
}
GPR_ASSERT(estimator->ping_state == GRPC_BDP_PING_SCHEDULED);
estimator->ping_state = GRPC_BDP_PING_STARTED;
estimator->accumulator = 0;
estimator->ping_start_time = gpr_now(GPR_CLOCK_MONOTONIC);
}
void grpc_bdp_estimator_complete_ping(grpc_exec_ctx *exec_ctx,
grpc_bdp_estimator *estimator) {
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
gpr_timespec dt_ts = gpr_time_sub(now, estimator->ping_start_time); gpr_timespec dt_ts = gpr_time_sub(now, ping_start_time_);
double dt = (double)dt_ts.tv_sec + 1e-9 * (double)dt_ts.tv_nsec; double dt = (double)dt_ts.tv_sec + 1e-9 * (double)dt_ts.tv_nsec;
double bw = dt > 0 ? ((double)estimator->accumulator / dt) : 0; double bw = dt > 0 ? ((double)accumulator_ / dt) : 0;
int start_inter_ping_delay = estimator->inter_ping_delay; int start_inter_ping_delay = inter_ping_delay_;
if (GRPC_TRACER_ON(grpc_bdp_estimator_trace)) { if (GRPC_TRACER_ON(grpc_bdp_estimator_trace)) {
gpr_log(GPR_DEBUG, "bdp[%s]:complete acc=%" PRId64 " est=%" PRId64 gpr_log(GPR_DEBUG, "bdp[%s]:complete acc=%" PRId64 " est=%" PRId64
" dt=%lf bw=%lfMbs bw_est=%lfMbs", " dt=%lf bw=%lfMbs bw_est=%lfMbs",
estimator->name, estimator->accumulator, estimator->estimate, dt, name_, accumulator_, estimate_, dt, bw / 125000.0,
bw / 125000.0, estimator->bw_est / 125000.0); bw_est_ / 125000.0);
} }
GPR_ASSERT(estimator->ping_state == GRPC_BDP_PING_STARTED); GPR_ASSERT(ping_state_ == PingState::STARTED);
if (estimator->accumulator > 2 * estimator->estimate / 3 && if (accumulator_ > 2 * estimate_ / 3 && bw > bw_est_) {
bw > estimator->bw_est) { estimate_ = GPR_MAX(accumulator_, estimate_ * 2);
estimator->estimate = bw_est_ = bw;
GPR_MAX(estimator->accumulator, estimator->estimate * 2);
estimator->bw_est = bw;
if (GRPC_TRACER_ON(grpc_bdp_estimator_trace)) { if (GRPC_TRACER_ON(grpc_bdp_estimator_trace)) {
gpr_log(GPR_DEBUG, "bdp[%s]: estimate increased to %" PRId64, gpr_log(GPR_DEBUG, "bdp[%s]: estimate increased to %" PRId64, name_,
estimator->name, estimator->estimate); estimate_);
} }
estimator->inter_ping_delay /= 2; // if the ping estimate changes, inter_ping_delay_ /= 2; // if the ping estimate changes,
// exponentially get faster at probing // exponentially get faster at probing
} else if (estimator->inter_ping_delay < 10000) { } else if (inter_ping_delay_ < 10000) {
estimator->stable_estimate_count++; stable_estimate_count_++;
if (estimator->stable_estimate_count >= 2) { if (stable_estimate_count_ >= 2) {
estimator->inter_ping_delay += inter_ping_delay_ +=
100 + 100 +
(int)(rand() * 100.0 / RAND_MAX); // if the ping estimate is steady, (int)(rand() * 100.0 / RAND_MAX); // if the ping estimate is steady,
// slowly ramp down the probe time // slowly ramp down the probe time
} }
} }
if (start_inter_ping_delay != estimator->inter_ping_delay) { if (start_inter_ping_delay != inter_ping_delay_) {
estimator->stable_estimate_count = 0; stable_estimate_count_ = 0;
if (GRPC_TRACER_ON(grpc_bdp_estimator_trace)) { if (GRPC_TRACER_ON(grpc_bdp_estimator_trace)) {
gpr_log(GPR_DEBUG, "bdp[%s]:update_inter_time to %dms", estimator->name, gpr_log(GPR_DEBUG, "bdp[%s]:update_inter_time to %dms", name_,
estimator->inter_ping_delay); inter_ping_delay_);
} }
} }
estimator->ping_state = GRPC_BDP_PING_UNSCHEDULED; ping_state_ = PingState::UNSCHEDULED;
estimator->accumulator = 0; accumulator_ = 0;
estimator->next_ping_scheduled = next_ping_scheduled_ = grpc_exec_ctx_now(exec_ctx) + inter_ping_delay_;
grpc_exec_ctx_now(exec_ctx) + estimator->inter_ping_delay;
} }
} // namespace grpc_core

@ -19,67 +19,94 @@
#ifndef GRPC_CORE_LIB_TRANSPORT_BDP_ESTIMATOR_H #ifndef GRPC_CORE_LIB_TRANSPORT_BDP_ESTIMATOR_H
#define GRPC_CORE_LIB_TRANSPORT_BDP_ESTIMATOR_H #define GRPC_CORE_LIB_TRANSPORT_BDP_ESTIMATOR_H
#include <grpc/support/time.h>
#include <stdbool.h> #include <stdbool.h>
#include <stdint.h> #include <stdint.h>
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#define GRPC_BDP_SAMPLES 16 #include <grpc/support/log.h>
#define GRPC_BDP_MIN_SAMPLES_FOR_ESTIMATE 3 #include <grpc/support/time.h>
#ifdef __cplusplus #include "src/core/lib/debug/trace.h"
extern "C" { #include "src/core/lib/iomgr/exec_ctx.h"
#endif
extern grpc_tracer_flag grpc_bdp_estimator_trace; extern grpc_tracer_flag grpc_bdp_estimator_trace;
typedef enum { namespace grpc_core {
GRPC_BDP_PING_UNSCHEDULED,
GRPC_BDP_PING_SCHEDULED,
GRPC_BDP_PING_STARTED
} grpc_bdp_estimator_ping_state;
typedef struct grpc_bdp_estimator {
grpc_bdp_estimator_ping_state ping_state;
int64_t accumulator;
int64_t estimate;
// when was the current ping started?
gpr_timespec ping_start_time;
// when should the next ping start?
grpc_millis next_ping_scheduled;
int inter_ping_delay;
int stable_estimate_count;
double bw_est;
const char *name;
} grpc_bdp_estimator;
void grpc_bdp_estimator_init(grpc_bdp_estimator *estimator, const char *name); class BdpEstimator {
public:
explicit BdpEstimator(const char *name);
~BdpEstimator();
// Returns true if a reasonable estimate could be obtained // Returns true if a reasonable estimate could be obtained
bool grpc_bdp_estimator_get_estimate(const grpc_bdp_estimator *estimator, bool EstimateBdp(int64_t *estimate_out) {
int64_t *estimate); *estimate_out = estimate_;
// Tracks new bytes read. return true;
bool grpc_bdp_estimator_get_bw(const grpc_bdp_estimator *estimator, double *bw); }
// Returns true if the user should schedule a ping bool EstimateBandwidth(double *bw_out) {
void grpc_bdp_estimator_add_incoming_bytes(grpc_bdp_estimator *estimator, *bw_out = bw_est_;
int64_t num_bytes); return true;
}
void AddIncomingBytes(int64_t num_bytes) { accumulator_ += num_bytes; }
// Returns true if the user should schedule a ping // Returns true if the user should schedule a ping
bool grpc_bdp_estimator_need_ping(grpc_exec_ctx *exec_ctx, bool NeedPing(grpc_exec_ctx *exec_ctx) {
const grpc_bdp_estimator *estimator); switch (ping_state_) {
case PingState::UNSCHEDULED:
return grpc_exec_ctx_now(exec_ctx) >= next_ping_scheduled_;
case PingState::SCHEDULED:
case PingState::STARTED:
return false;
}
GPR_UNREACHABLE_CODE(return false);
}
// Schedule a ping: call in response to receiving a true from // Schedule a ping: call in response to receiving a true from
// grpc_bdp_estimator_add_incoming_bytes once a ping has been scheduled by a // grpc_bdp_estimator_add_incoming_bytes once a ping has been scheduled by a
// transport (but not necessarily started) // transport (but not necessarily started)
void grpc_bdp_estimator_schedule_ping(grpc_bdp_estimator *estimator); void SchedulePing() {
// Start a ping: call after calling grpc_bdp_estimator_schedule_ping and once if (GRPC_TRACER_ON(grpc_bdp_estimator_trace)) {
gpr_log(GPR_DEBUG, "bdp[%s]:sched acc=%" PRId64 " est=%" PRId64, name_,
accumulator_, estimate_);
}
GPR_ASSERT(ping_state_ == PingState::UNSCHEDULED);
ping_state_ = PingState::SCHEDULED;
accumulator_ = 0;
}
// Start a ping: call after calling grpc_bdp_estimator_schedule_ping and
// once
// the ping is on the wire // the ping is on the wire
void grpc_bdp_estimator_start_ping(grpc_bdp_estimator *estimator); void StartPing() {
if (GRPC_TRACER_ON(grpc_bdp_estimator_trace)) {
gpr_log(GPR_DEBUG, "bdp[%s]:start acc=%" PRId64 " est=%" PRId64, name_,
accumulator_, estimate_);
}
GPR_ASSERT(ping_state_ == PingState::SCHEDULED);
ping_state_ = PingState::STARTED;
accumulator_ = 0;
ping_start_time_ = gpr_now(GPR_CLOCK_MONOTONIC);
}
// Completes a previously started ping // Completes a previously started ping
void grpc_bdp_estimator_complete_ping(grpc_exec_ctx *exec_ctx, void CompletePing(grpc_exec_ctx *exec_ctx);
grpc_bdp_estimator *estimator);
#ifdef __cplusplus private:
} enum class PingState { UNSCHEDULED, SCHEDULED, STARTED };
#endif
PingState ping_state_;
int64_t accumulator_;
int64_t estimate_;
// when was the current ping started?
gpr_timespec ping_start_time_;
// when should the next ping start?
grpc_millis next_ping_scheduled_;
int inter_ping_delay_;
int stable_estimate_count_;
double bw_est_;
const char *name_;
};
} // namespace grpc_core
#endif /* GRPC_CORE_LIB_TRANSPORT_BDP_ESTIMATOR_H */ #endif /* GRPC_CORE_LIB_TRANSPORT_BDP_ESTIMATOR_H */

@ -32,9 +32,22 @@ grpc_cc_library(
) )
grpc_cc_library( grpc_cc_library(
name = "grpc_test_util_base", name = "grpc_debugger_macros",
srcs = [ srcs = [
"debugger_macros.c", "debugger_macros.c",
],
hdrs = [
"debugger_macros.h",
],
deps = [
":gpr_test_util",
"//:grpc_common",
],
)
grpc_cc_library(
name = "grpc_test_util_base",
srcs = [
"grpc_profiler.c", "grpc_profiler.c",
"mock_endpoint.c", "mock_endpoint.c",
"parse_hexstring.c", "parse_hexstring.c",
@ -47,7 +60,6 @@ grpc_cc_library(
"trickle_endpoint.c", "trickle_endpoint.c",
], ],
hdrs = [ hdrs = [
"debugger_macros.h",
"grpc_profiler.h", "grpc_profiler.h",
"mock_endpoint.h", "mock_endpoint.h",
"parse_hexstring.h", "parse_hexstring.h",
@ -63,6 +75,7 @@ grpc_cc_library(
deps = [ deps = [
":gpr_test_util", ":gpr_test_util",
"//:grpc_common", "//:grpc_common",
":grpc_debugger_macros"
], ],
) )

@ -29,7 +29,7 @@
#include "src/core/lib/channel/connected_channel.h" #include "src/core/lib/channel/connected_channel.h"
#include "src/core/lib/surface/call.h" #include "src/core/lib/surface/call.h"
void grpc_summon_debugger_macros() {} extern "C" void grpc_summon_debugger_macros() {}
grpc_stream *grpc_transport_stream_from_call(grpc_call *call) { grpc_stream *grpc_transport_stream_from_call(grpc_call *call) {
grpc_call_stack *cs = grpc_call_get_call_stack(call); grpc_call_stack *cs = grpc_call_get_call_stack(call);

@ -19,6 +19,14 @@
#ifndef GRPC_TEST_CORE_UTIL_DEBUGGER_MACROS_H #ifndef GRPC_TEST_CORE_UTIL_DEBUGGER_MACROS_H
#define GRPC_TEST_CORE_UTIL_DEBUGGER_MACROS_H #define GRPC_TEST_CORE_UTIL_DEBUGGER_MACROS_H
#ifdef __cplusplus
extern "C" {
#endif /* __cplusplus */
void grpc_summon_debugger_macros(); void grpc_summon_debugger_macros();
#ifdef __cplusplus
}
#endif /* __cplusplus */
#endif /* GRPC_TEST_CORE_UTIL_DEBUGGER_MACROS_H */ #endif /* GRPC_TEST_CORE_UTIL_DEBUGGER_MACROS_H */

@ -1031,6 +1031,7 @@ src/core/lib/support/atomic.h \
src/core/lib/support/atomic_with_atm.h \ src/core/lib/support/atomic_with_atm.h \
src/core/lib/support/atomic_with_std.h \ src/core/lib/support/atomic_with_std.h \
src/core/lib/support/env.h \ src/core/lib/support/env.h \
src/core/lib/support/manual_constructor.h \
src/core/lib/support/memory.h \ src/core/lib/support/memory.h \
src/core/lib/support/mpscq.h \ src/core/lib/support/mpscq.h \
src/core/lib/support/murmur_hash.h \ src/core/lib/support/murmur_hash.h \

@ -1320,6 +1320,7 @@ src/core/lib/support/log_android.cc \
src/core/lib/support/log_linux.cc \ src/core/lib/support/log_linux.cc \
src/core/lib/support/log_posix.cc \ src/core/lib/support/log_posix.cc \
src/core/lib/support/log_windows.cc \ src/core/lib/support/log_windows.cc \
src/core/lib/support/manual_constructor.h \
src/core/lib/support/memory.h \ src/core/lib/support/memory.h \
src/core/lib/support/mpscq.cc \ src/core/lib/support/mpscq.cc \
src/core/lib/support/mpscq.h \ src/core/lib/support/mpscq.h \

@ -7856,6 +7856,7 @@
"src/core/lib/support/atomic_with_atm.h", "src/core/lib/support/atomic_with_atm.h",
"src/core/lib/support/atomic_with_std.h", "src/core/lib/support/atomic_with_std.h",
"src/core/lib/support/env.h", "src/core/lib/support/env.h",
"src/core/lib/support/manual_constructor.h",
"src/core/lib/support/memory.h", "src/core/lib/support/memory.h",
"src/core/lib/support/mpscq.h", "src/core/lib/support/mpscq.h",
"src/core/lib/support/murmur_hash.h", "src/core/lib/support/murmur_hash.h",
@ -7903,6 +7904,7 @@
"src/core/lib/support/atomic_with_atm.h", "src/core/lib/support/atomic_with_atm.h",
"src/core/lib/support/atomic_with_std.h", "src/core/lib/support/atomic_with_std.h",
"src/core/lib/support/env.h", "src/core/lib/support/env.h",
"src/core/lib/support/manual_constructor.h",
"src/core/lib/support/memory.h", "src/core/lib/support/memory.h",
"src/core/lib/support/mpscq.h", "src/core/lib/support/mpscq.h",
"src/core/lib/support/murmur_hash.h", "src/core/lib/support/murmur_hash.h",
@ -8928,7 +8930,7 @@
"test/core/end2end/fixtures/proxy.h", "test/core/end2end/fixtures/proxy.h",
"test/core/iomgr/endpoint_tests.c", "test/core/iomgr/endpoint_tests.c",
"test/core/iomgr/endpoint_tests.h", "test/core/iomgr/endpoint_tests.h",
"test/core/util/debugger_macros.c", "test/core/util/debugger_macros.cc",
"test/core/util/debugger_macros.h", "test/core/util/debugger_macros.h",
"test/core/util/grpc_profiler.c", "test/core/util/grpc_profiler.c",
"test/core/util/grpc_profiler.h", "test/core/util/grpc_profiler.h",

Loading…
Cancel
Save