Merge branch 'master' of github.com:grpc/grpc into alarm_cpp

pull/5293/head
David Garcia Quintas 9 years ago
commit 45add8a445
  1. 2
      BUILD
  2. 2
      Makefile
  3. 1
      build.yaml
  4. 7
      include/grpc++/alarm.h
  5. 6
      include/grpc++/server.h
  6. 9
      src/core/transport/chttp2/internal.h
  7. 22
      src/core/transport/chttp2/stream_lists.c
  8. 10
      src/core/transport/chttp2/writing.c
  9. 7
      src/core/transport/chttp2_transport.c
  10. 44
      src/cpp/common/alarm.cc
  11. 14
      src/cpp/server/server.cc
  12. 2
      src/cpp/server/server_builder.cc
  13. 24
      summerofcode/ideas.md
  14. 31
      test/cpp/common/alarm_cpp_test.cc
  15. 4
      test/cpp/end2end/async_end2end_test.cc
  16. 1
      tools/doxygen/Doxyfile.c++.internal
  17. 2
      tools/run_tests/sources_and_headers.json
  18. 2
      vsprojects/vcxproj/grpc++/grpc++.vcxproj
  19. 3
      vsprojects/vcxproj/grpc++/grpc++.vcxproj.filters
  20. 2
      vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj
  21. 3
      vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj.filters

@ -795,7 +795,6 @@ cc_library(
"src/cpp/client/credentials.cc",
"src/cpp/client/generic_stub.cc",
"src/cpp/client/insecure_credentials.cc",
"src/cpp/common/alarm.cc",
"src/cpp/common/call.cc",
"src/cpp/common/channel_arguments.cc",
"src/cpp/common/completion_queue.cc",
@ -919,7 +918,6 @@ cc_library(
"src/cpp/client/credentials.cc",
"src/cpp/client/generic_stub.cc",
"src/cpp/client/insecure_credentials.cc",
"src/cpp/common/alarm.cc",
"src/cpp/common/call.cc",
"src/cpp/common/channel_arguments.cc",
"src/cpp/common/completion_queue.cc",

@ -2944,7 +2944,6 @@ LIBGRPC++_SRC = \
src/cpp/client/credentials.cc \
src/cpp/client/generic_stub.cc \
src/cpp/client/insecure_credentials.cc \
src/cpp/common/alarm.cc \
src/cpp/common/call.cc \
src/cpp/common/channel_arguments.cc \
src/cpp/common/completion_queue.cc \
@ -3225,7 +3224,6 @@ LIBGRPC++_UNSECURE_SRC = \
src/cpp/client/credentials.cc \
src/cpp/client/generic_stub.cc \
src/cpp/client/insecure_credentials.cc \
src/cpp/common/alarm.cc \
src/cpp/common/call.cc \
src/cpp/common/channel_arguments.cc \
src/cpp/common/completion_queue.cc \

@ -183,7 +183,6 @@ filegroups:
- src/cpp/client/credentials.cc
- src/cpp/client/generic_stub.cc
- src/cpp/client/insecure_credentials.cc
- src/cpp/common/alarm.cc
- src/cpp/common/call.cc
- src/cpp/common/channel_arguments.cc
- src/cpp/common/completion_queue.cc

@ -36,10 +36,11 @@
#ifndef GRPCXX_ALARM_H
#define GRPCXX_ALARM_H
#include <grpc++/impl/codegen/completion_queue.h>
#include <grpc++/impl/codegen/completion_queue_tag.h>
#include <grpc++/impl/codegen/grpc_library.h>
#include <grpc++/impl/codegen/time.h>
#include <grpc++/impl/codegen/completion_queue.h>
#include <grpc++/impl/grpc_library.h>
#include <grpc/grpc.h>
struct grpc_alarm;
@ -67,11 +68,11 @@ class Alarm : private GrpcLibrary {
static_cast<void*>(&tag_))) {}
/// Destroy the given completion queue alarm, cancelling it in the process.
~Alarm();
~Alarm() { grpc_alarm_destroy(alarm_); }
/// Cancel a completion queue alarm. Calling this function over an alarm that
/// has already fired has no effect.
void Cancel();
void Cancel() { grpc_alarm_cancel(alarm_); }
private:
class AlarmEntry : public CompletionQueueTag {

@ -79,6 +79,8 @@ class Server GRPC_FINAL : public ServerInterface, private GrpcLibrary {
class GlobalCallbacks {
public:
virtual ~GlobalCallbacks() {}
/// Called before server is created.
virtual void UpdateArguments(ChannelArguments* args) {}
/// Called before application callback for each synchronous server request
virtual void PreSynchronousRequest(ServerContext* context) = 0;
/// Called after application callback for each synchronous server request
@ -108,7 +110,7 @@ class Server GRPC_FINAL : public ServerInterface, private GrpcLibrary {
/// \param max_message_size Maximum message length that the channel can
/// receive.
Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned,
int max_message_size, const ChannelArguments& args);
int max_message_size, ChannelArguments* args);
/// Register a service. This call does not take ownership of the service.
/// The service must exist for the lifetime of the Server instance.
@ -177,7 +179,7 @@ class Server GRPC_FINAL : public ServerInterface, private GrpcLibrary {
bool has_generic_service_;
// Pointer to the c grpc server.
grpc_server* const server_;
grpc_server* server_;
ThreadPoolInterface* thread_pool_;
// Whether the thread pool is created and owned by the server.

@ -485,7 +485,8 @@ struct grpc_chttp2_stream {
/** Someone is unlocking the transport mutex: check to see if writes
are required, and schedule them if so */
int grpc_chttp2_unlocking_check_writes(grpc_chttp2_transport_global *global,
int grpc_chttp2_unlocking_check_writes(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_global *global,
grpc_chttp2_transport_writing *writing,
int is_parsing);
void grpc_chttp2_perform_writes(
@ -568,8 +569,12 @@ void grpc_chttp2_list_add_writing_stalled_by_transport(
grpc_chttp2_transport_writing *transport_writing,
grpc_chttp2_stream_writing *stream_writing);
void grpc_chttp2_list_flush_writing_stalled_by_transport(
grpc_chttp2_transport_writing *transport_writing, bool is_window_available);
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_writing *transport_writing,
bool is_window_available);
void grpc_chttp2_list_add_stalled_by_transport(
grpc_chttp2_transport_writing *transport_writing,
grpc_chttp2_stream_writing *stream_writing);
int grpc_chttp2_list_pop_stalled_by_transport(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global **stream_global);

@ -316,13 +316,16 @@ int grpc_chttp2_list_pop_check_read_ops(
void grpc_chttp2_list_add_writing_stalled_by_transport(
grpc_chttp2_transport_writing *transport_writing,
grpc_chttp2_stream_writing *stream_writing) {
stream_list_add(TRANSPORT_FROM_WRITING(transport_writing),
STREAM_FROM_WRITING(stream_writing),
grpc_chttp2_stream *stream = STREAM_FROM_WRITING(stream_writing);
if (!stream->included[GRPC_CHTTP2_LIST_WRITING_STALLED_BY_TRANSPORT]) {
GRPC_CHTTP2_STREAM_REF(&stream->global, "chttp2_writing_stalled");
}
stream_list_add(TRANSPORT_FROM_WRITING(transport_writing), stream,
GRPC_CHTTP2_LIST_WRITING_STALLED_BY_TRANSPORT);
}
void grpc_chttp2_list_flush_writing_stalled_by_transport(
grpc_chttp2_transport_writing *transport_writing,
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_writing *transport_writing,
bool is_window_available) {
grpc_chttp2_stream *stream;
grpc_chttp2_transport *transport = TRANSPORT_FROM_WRITING(transport_writing);
@ -331,11 +334,22 @@ void grpc_chttp2_list_flush_writing_stalled_by_transport(
if (is_window_available) {
grpc_chttp2_list_add_writable_stream(&transport->global, &stream->global);
} else {
stream_list_add(transport, stream, GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT);
grpc_chttp2_list_add_stalled_by_transport(transport_writing,
&stream->writing);
}
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, &stream->global,
"chttp2_writing_stalled");
}
}
void grpc_chttp2_list_add_stalled_by_transport(
grpc_chttp2_transport_writing *transport_writing,
grpc_chttp2_stream_writing *stream_writing) {
stream_list_add(TRANSPORT_FROM_WRITING(transport_writing),
STREAM_FROM_WRITING(stream_writing),
GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT);
}
int grpc_chttp2_list_pop_stalled_by_transport(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global **stream_global) {

@ -44,7 +44,7 @@ static void finalize_outbuf(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_writing *transport_writing);
int grpc_chttp2_unlocking_check_writes(
grpc_chttp2_transport_global *transport_global,
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
grpc_chttp2_transport_writing *transport_writing, int is_parsing) {
grpc_chttp2_stream_global *stream_global;
grpc_chttp2_stream_writing *stream_writing;
@ -76,8 +76,8 @@ int grpc_chttp2_unlocking_check_writes(
GRPC_CHTTP2_FLOW_MOVE_TRANSPORT("write", transport_writing, outgoing_window,
transport_global, outgoing_window);
bool is_window_available = transport_writing->outgoing_window > 0;
grpc_chttp2_list_flush_writing_stalled_by_transport(transport_writing,
is_window_available);
grpc_chttp2_list_flush_writing_stalled_by_transport(
exec_ctx, transport_writing, is_window_available);
/* for each grpc_chttp2_stream that's become writable, frame it's data
(according to available window sizes) and add to the output buffer */
@ -133,8 +133,8 @@ int grpc_chttp2_unlocking_check_writes(
GRPC_CHTTP2_STREAM_REF(stream_global, "chttp2_writing");
}
} else {
grpc_chttp2_list_add_writing_stalled_by_transport(transport_writing,
stream_writing);
grpc_chttp2_list_add_stalled_by_transport(transport_writing,
stream_writing);
}
}
if (stream_global->send_trailing_metadata) {

@ -598,7 +598,7 @@ static void lock(grpc_chttp2_transport *t) { gpr_mu_lock(&t->mu); }
static void unlock(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) {
GPR_TIMER_BEGIN("unlock", 0);
if (!t->writing_active && !t->closed &&
grpc_chttp2_unlocking_check_writes(&t->global, &t->writing,
grpc_chttp2_unlocking_check_writes(exec_ctx, &t->global, &t->writing,
t->parsing_active)) {
t->writing_active = 1;
REF_TRANSPORT(t, "writing");
@ -1019,6 +1019,11 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx,
stream_global->recv_initial_metadata_ready = NULL;
}
if (stream_global->recv_message_ready != NULL) {
while (stream_global->seen_error &&
(bs = grpc_chttp2_incoming_frame_queue_pop(
&stream_global->incoming_frames)) != NULL) {
grpc_byte_stream_destroy(exec_ctx, bs);
}
if (stream_global->incoming_frames.head != NULL) {
*stream_global->recv_message = grpc_chttp2_incoming_frame_queue_pop(
&stream_global->incoming_frames);

@ -1,44 +0,0 @@
/*
* Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include <grpc++/alarm.h>
#include <grpc++/impl/grpc_library.h>
namespace grpc {
Alarm::~Alarm() {
grpc_alarm_destroy(alarm_);
}
void Alarm::Cancel() { grpc_alarm_cancel(alarm_); }
} // namespace grpc

@ -272,27 +272,25 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
grpc_completion_queue* cq_;
};
static grpc_server* CreateServer(const ChannelArguments& args) {
grpc_channel_args channel_args;
args.SetChannelArgs(&channel_args);
return grpc_server_create(&channel_args, nullptr);
}
static internal::GrpcLibraryInitializer g_gli_initializer;
Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned,
int max_message_size, const ChannelArguments& args)
int max_message_size, ChannelArguments* args)
: max_message_size_(max_message_size),
started_(false),
shutdown_(false),
num_running_cb_(0),
sync_methods_(new std::list<SyncRequest>),
has_generic_service_(false),
server_(CreateServer(args)),
server_(nullptr),
thread_pool_(thread_pool),
thread_pool_owned_(thread_pool_owned) {
g_gli_initializer.summon();
gpr_once_init(&g_once_init_callbacks, InitGlobalCallbacks);
global_callbacks_ = g_callbacks;
global_callbacks_->UpdateArguments(args);
grpc_channel_args channel_args;
args->SetChannelArgs(&channel_args);
server_ = grpc_server_create(&channel_args, nullptr);
grpc_server_register_completion_queue(server_, cq_.cq(), nullptr);
}

@ -103,7 +103,7 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
args.SetInt(GRPC_COMPRESSION_ALGORITHM_STATE_ARG,
compression_options_.enabled_algorithms_bitset);
std::unique_ptr<Server> server(
new Server(thread_pool.release(), true, max_message_size_, args));
new Server(thread_pool.release(), true, max_message_size_, &args));
for (auto cq = cqs_.begin(); cq != cqs_.end(); ++cq) {
grpc_server_register_completion_queue(server->server_, (*cq)->cq(),
nullptr);

@ -1,4 +1,22 @@
Google Summer of Code 2016 gRPC Ideas
=====================================
# gRPC Summer of Code Project Ideas
(Skeleton for now.)
C Core:
1. Port gRPC to one of (Free, Net, Open) BSD platforms and create packages for them. Add kqueue support in the process.
2. Fix gRPC C-core's URI parser. The current parser does not qualify as a standard parser according to [RFC3986]( https://tools.ietf.org/html/rfc3986). Write test suites to verify this and make changes necessary to make the URI parser compliant.
3. HPACK compression efficiency evaluation - Figure out how to benchmark gRPC's compression efficiency (both in terms of bytes on the wire and cpu cycles). Implement benchmarks. Potentially extend this to other standalone implementations -- Java and Go.
gRPC Python:
1. Evaluate the port of gRPC's Python implementation to PyPy. Investigate the state of [Cython support](http://docs.cython.org/src/userguide/pypy.html) to do this or potentially explore cffi
2. Develop and test Python 3.5 Support for gRPC. Make necessary changes to port gRPC and package it for supported platforms.
gRPC Ruby/Java:
1. jRuby support for gRPC. Develop a jRuby wrapper for gRPC based on grpc-java and ensure that it is API compatible with the existing Ruby implementation and passes all tests.
Other:
1. Develop a Wireshark plugin for the gRPC protocol. Provide documentation and tutorials for this plugin. Bonus: consider set-up and use with the mobile clients.

@ -72,6 +72,37 @@ TEST(AlarmTest, RegularExpiryChrono) {
EXPECT_EQ(junk, output_tag);
}
TEST(AlarmTest, ZeroExpiry) {
CompletionQueue cq;
void* junk = reinterpret_cast<void*>(1618033);
Alarm alarm(&cq, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(0), junk);
void* output_tag;
bool ok;
const CompletionQueue::NextStatus status = cq.AsyncNext(
(void**)&output_tag, &ok, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(0));
EXPECT_EQ(status, CompletionQueue::GOT_EVENT);
EXPECT_TRUE(ok);
EXPECT_EQ(junk, output_tag);
}
TEST(AlarmTest, NegativeExpiry) {
CompletionQueue cq;
void* junk = reinterpret_cast<void*>(1618033);
Alarm alarm(&cq, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(-1), junk);
void* output_tag;
bool ok;
const CompletionQueue::NextStatus status = cq.AsyncNext(
(void**)&output_tag, &ok, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(0));
EXPECT_EQ(status, CompletionQueue::GOT_EVENT);
EXPECT_TRUE(ok);
EXPECT_EQ(junk, output_tag);
}
TEST(AlarmTest, Cancellation) {
CompletionQueue cq;
void* junk = reinterpret_cast<void*>(1618033);

@ -989,6 +989,10 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
ServerTryCancel(&srv_ctx);
// Client reads may fail bacause it is notified that the stream is
// cancelled.
ignore_cq_result = true;
}
// Client attemts to read the three messages from the server

@ -854,7 +854,6 @@ src/cpp/client/create_channel_internal.cc \
src/cpp/client/credentials.cc \
src/cpp/client/generic_stub.cc \
src/cpp/client/insecure_credentials.cc \
src/cpp/common/alarm.cc \
src/cpp/common/call.cc \
src/cpp/common/channel_arguments.cc \
src/cpp/common/completion_queue.cc \

@ -4132,7 +4132,6 @@
"src/cpp/client/secure_credentials.cc",
"src/cpp/client/secure_credentials.h",
"src/cpp/codegen/grpc_library.cc",
"src/cpp/common/alarm.cc",
"src/cpp/common/auth_property_iterator.cc",
"src/cpp/common/call.cc",
"src/cpp/common/channel_arguments.cc",
@ -4385,7 +4384,6 @@
"src/cpp/client/generic_stub.cc",
"src/cpp/client/insecure_credentials.cc",
"src/cpp/codegen/grpc_library.cc",
"src/cpp/common/alarm.cc",
"src/cpp/common/call.cc",
"src/cpp/common/channel_arguments.cc",
"src/cpp/common/completion_queue.cc",

@ -369,8 +369,6 @@
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\cpp\client\insecure_credentials.cc">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\cpp\common\alarm.cc">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\cpp\common\call.cc">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\cpp\common\channel_arguments.cc">

@ -40,9 +40,6 @@
<ClCompile Include="$(SolutionDir)\..\src\cpp\client\insecure_credentials.cc">
<Filter>src\cpp\client</Filter>
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\cpp\common\alarm.cc">
<Filter>src\cpp\common</Filter>
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\cpp\common\call.cc">
<Filter>src\cpp\common</Filter>
</ClCompile>

@ -356,8 +356,6 @@
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\cpp\client\insecure_credentials.cc">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\cpp\common\alarm.cc">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\cpp\common\call.cc">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\cpp\common\channel_arguments.cc">

@ -25,9 +25,6 @@
<ClCompile Include="$(SolutionDir)\..\src\cpp\client\insecure_credentials.cc">
<Filter>src\cpp\client</Filter>
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\cpp\common\alarm.cc">
<Filter>src\cpp\common</Filter>
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\cpp\common\call.cc">
<Filter>src\cpp\common</Filter>
</ClCompile>

Loading…
Cancel
Save