stream_op cleanup: miscellany

reviewable/pr3993/r1
Craig Tiller 9 years ago
parent 93b944785c
commit 5925603101
  1. 2
      include/grpc++/alarm.h
  2. 2
      include/grpc/support/port_platform.h
  3. 5
      include/grpc/support/slice_buffer.h
  4. 6
      src/core/profiling/basic_timers.c
  5. 49
      src/core/support/slice_buffer.c
  6. 5
      src/core/support/sync_posix.c
  7. 8
      src/cpp/common/alarm.cc
  8. 2
      src/cpp/server/server.cc

@ -43,7 +43,7 @@
namespace grpc {
/// A thin wrapper around \a grpc_alarm (see / \a / src/core/surface/alarm.h).
class Alarm: public GrpcLibrary {
class Alarm : public GrpcLibrary {
public:
/// Create a completion queue alarm instance associated to \a cq.
///

@ -181,9 +181,9 @@
#ifndef _BSD_SOURCE
#define _BSD_SOURCE
#endif
#define GPR_FORBID_UNREACHABLE_CODE
#define GPR_MSG_IOVLEN_TYPE int
#if TARGET_OS_IPHONE
#define GPR_FORBID_UNREACHABLE_CODE
#define GPR_PLATFORM_STRING "ios"
#define GPR_CPU_IPHONE 1
#define GPR_PTHREAD_TLS 1

@ -89,6 +89,11 @@ void gpr_slice_buffer_move_into(gpr_slice_buffer *src, gpr_slice_buffer *dst);
/* remove n bytes from the end of a slice buffer */
void gpr_slice_buffer_trim_end(gpr_slice_buffer *src, size_t n,
gpr_slice_buffer *garbage);
/* move the first n bytes of src into dst */
void gpr_slice_buffer_move_first(gpr_slice_buffer *src, size_t n,
gpr_slice_buffer *dst);
/* take the first slice in the slice buffer */
gpr_slice gpr_slice_buffer_take_first(gpr_slice_buffer *src);
#ifdef __cplusplus
}

@ -50,12 +50,12 @@ typedef struct gpr_timer_entry {
gpr_timespec tm;
const char *tagstr;
const char *file;
int line;
short line;
char type;
gpr_uint8 important;
} gpr_timer_entry;
#define MAX_COUNT (1024 * 1024 / sizeof(gpr_timer_entry))
#define MAX_COUNT (5 * 1024 * 1024 / sizeof(gpr_timer_entry))
static __thread gpr_timer_entry g_log[MAX_COUNT];
static __thread int g_count;
@ -102,7 +102,7 @@ static void gpr_timers_log_add(const char *tagstr, marker_type type,
entry->tagstr = tagstr;
entry->type = type;
entry->file = file;
entry->line = line;
entry->line = (short)line;
entry->important = important != 0;
}

@ -31,6 +31,7 @@
*
*/
#include <grpc/support/port_platform.h>
#include <grpc/support/slice_buffer.h>
#include <string.h>
@ -208,6 +209,44 @@ void gpr_slice_buffer_move_into(gpr_slice_buffer *src, gpr_slice_buffer *dst) {
src->length = 0;
}
void gpr_slice_buffer_move_first(gpr_slice_buffer *src, size_t n,
gpr_slice_buffer *dst) {
size_t src_idx;
size_t output_len = dst->length + n;
size_t new_input_len = src->length - n;
GPR_ASSERT(src->length >= n);
if (src->length == n) {
gpr_slice_buffer_move_into(src, dst);
return;
}
src_idx = 0;
for (;;) {
gpr_slice slice = src->slices[src_idx];
size_t slice_len = GPR_SLICE_LENGTH(slice);
if (n > slice_len) {
gpr_slice_buffer_add(dst, slice);
n -= slice_len;
src_idx++;
} else if (n == slice_len) {
gpr_slice_buffer_add(dst, slice);
src_idx++;
break;
} else { /* n < slice_len */
src->slices[src_idx] = gpr_slice_split_tail(&slice, n);
GPR_ASSERT(GPR_SLICE_LENGTH(slice) == n);
GPR_ASSERT(GPR_SLICE_LENGTH(src->slices[src_idx]) == slice_len - n);
gpr_slice_buffer_add(dst, slice);
break;
}
}
GPR_ASSERT(dst->length == output_len);
memmove(src->slices, src->slices + src_idx,
sizeof(gpr_slice) * (src->count - src_idx));
src->count -= src_idx;
src->length = new_input_len;
GPR_ASSERT(src->count > 0);
}
void gpr_slice_buffer_trim_end(gpr_slice_buffer *sb, size_t n,
gpr_slice_buffer *garbage) {
GPR_ASSERT(n <= sb->length);
@ -231,3 +270,13 @@ void gpr_slice_buffer_trim_end(gpr_slice_buffer *sb, size_t n,
}
}
}
gpr_slice gpr_slice_buffer_take_first(gpr_slice_buffer *sb) {
gpr_slice slice;
GPR_ASSERT(sb->count > 0);
slice = sb->slices[0];
memmove(&sb->slices[0], &sb->slices[1], (sb->count - 1) * sizeof(gpr_slice));
sb->count--;
sb->length -= GPR_SLICE_LENGTH(slice);
return slice;
}

@ -59,8 +59,11 @@ void gpr_mu_unlock(gpr_mu* mu) {
}
int gpr_mu_trylock(gpr_mu* mu) {
int err = pthread_mutex_trylock(mu);
int err;
GPR_TIMER_BEGIN("gpr_mu_trylock", 0);
err = pthread_mutex_trylock(mu);
GPR_ASSERT(err == 0 || err == EBUSY);
GPR_TIMER_END("gpr_mu_trylock", 0);
return err == 0;
}

@ -38,12 +38,8 @@ namespace grpc {
Alarm::Alarm(CompletionQueue* cq, gpr_timespec deadline, void* tag)
: alarm_(grpc_alarm_create(cq->cq(), deadline, tag)) {}
Alarm::~Alarm() {
grpc_alarm_destroy(alarm_);
}
Alarm::~Alarm() { grpc_alarm_destroy(alarm_); }
void Alarm::Cancel() {
grpc_alarm_cancel(alarm_);
}
void Alarm::Cancel() { grpc_alarm_cancel(alarm_); }
} // namespace grpc

@ -388,6 +388,7 @@ void Server::ShutdownInternal(gpr_timespec deadline) {
shutdown_ = true;
grpc_server_shutdown_and_notify(server_, cq_.cq(), new ShutdownRequest());
cq_.Shutdown();
lock.unlock();
// Spin, eating requests until the completion queue is completely shutdown.
// If the deadline expires then cancel anything that's pending and keep
// spinning forever until the work is actually drained.
@ -403,6 +404,7 @@ void Server::ShutdownInternal(gpr_timespec deadline) {
SyncRequest::CallData call_data(this, request);
}
}
lock.lock();
// Wait for running callbacks to finish.
while (num_running_cb_ != 0) {

Loading…
Cancel
Save