diff --git a/INSTALL b/INSTALL index 7a3d02f186e..b7c1d46884e 100644 --- a/INSTALL +++ b/INSTALL @@ -100,16 +100,16 @@ Then, you can build and install protobuf 3.0.0: A word on OpenSSL ----------------- -Secure HTTP2 requires to have the TLS extension ALPN (see rfc 7301 and +Secure HTTP2 requires the TLS extension ALPN (see rfc 7301 and http://http2.github.io/http2-spec/ section 3.3). Our HTTP2 implementation relies on OpenSSL's implementation. OpenSSL 1.0.2 is the first released version of OpenSSL that has ALPN support, and this explains our dependency on it. Note that the Makefile supports compiling only the unsecure elements of grpc, and if you do not have OpenSSL and do not want it, you can still proceed -with installing only the elements you require. However, it is recommended -to encrypt your network traffic, therefore we urge you to not use the unsecure -version of grpc if possible. +with installing only the elements you require. However, we strongly recommend +the use of encryption for all network traffic, and discourage the use of grpc +without TLS. Compiling diff --git a/Makefile b/Makefile index f2884fd8e5e..622181b15b6 100644 --- a/Makefile +++ b/Makefile @@ -1432,10 +1432,6 @@ test_cxx: buildtests_cxx $(Q) ./bins/$(CONFIG)/credentials_test || ( echo test credentials_test failed ; exit 1 ) $(E) "[RUN] Testing end2end_test" $(Q) ./bins/$(CONFIG)/end2end_test || ( echo test end2end_test failed ; exit 1 ) - $(E) "[RUN] Testing qps_client" - $(Q) ./bins/$(CONFIG)/qps_client || ( echo test qps_client failed ; exit 1 ) - $(E) "[RUN] Testing qps_server" - $(Q) ./bins/$(CONFIG)/qps_server || ( echo test qps_server failed ; exit 1 ) $(E) "[RUN] Testing status_test" $(Q) ./bins/$(CONFIG)/status_test || ( echo test status_test failed ; exit 1 ) $(E) "[RUN] Testing thread_pool_test" @@ -1724,6 +1720,7 @@ PUBLIC_HEADERS_C += \ include/grpc/support/histogram.h \ include/grpc/support/host_port.h \ include/grpc/support/log.h \ + include/grpc/support/log_win32.h \ include/grpc/support/port_platform.h \ include/grpc/support/slice.h \ include/grpc/support/slice_buffer.h \ @@ -1886,8 +1883,10 @@ LIBGRPC_SRC = \ src/core/iomgr/endpoint.c \ src/core/iomgr/endpoint_pair_posix.c \ src/core/iomgr/fd_posix.c \ + src/core/iomgr/iocp_windows.c \ src/core/iomgr/iomgr.c \ src/core/iomgr/iomgr_posix.c \ + src/core/iomgr/iomgr_windows.c \ src/core/iomgr/pollset_kick.c \ src/core/iomgr/pollset_multipoller_with_poll_posix.c \ src/core/iomgr/pollset_posix.c \ @@ -1897,9 +1896,13 @@ LIBGRPC_SRC = \ src/core/iomgr/socket_utils_common_posix.c \ src/core/iomgr/socket_utils_linux.c \ src/core/iomgr/socket_utils_posix.c \ + src/core/iomgr/socket_windows.c \ src/core/iomgr/tcp_client_posix.c \ + src/core/iomgr/tcp_client_windows.c \ src/core/iomgr/tcp_posix.c \ src/core/iomgr/tcp_server_posix.c \ + src/core/iomgr/tcp_server_windows.c \ + src/core/iomgr/tcp_windows.c \ src/core/iomgr/time_averaged_stats.c \ src/core/iomgr/wakeup_fd_eventfd.c \ src/core/iomgr/wakeup_fd_nospecial.c \ @@ -2015,8 +2018,10 @@ src/core/iomgr/alarm_heap.c: $(OPENSSL_DEP) src/core/iomgr/endpoint.c: $(OPENSSL_DEP) src/core/iomgr/endpoint_pair_posix.c: $(OPENSSL_DEP) src/core/iomgr/fd_posix.c: $(OPENSSL_DEP) +src/core/iomgr/iocp_windows.c: $(OPENSSL_DEP) src/core/iomgr/iomgr.c: $(OPENSSL_DEP) src/core/iomgr/iomgr_posix.c: $(OPENSSL_DEP) +src/core/iomgr/iomgr_windows.c: $(OPENSSL_DEP) src/core/iomgr/pollset_kick.c: $(OPENSSL_DEP) src/core/iomgr/pollset_multipoller_with_poll_posix.c: $(OPENSSL_DEP) src/core/iomgr/pollset_posix.c: $(OPENSSL_DEP) @@ -2026,9 +2031,13 @@ src/core/iomgr/sockaddr_utils.c: $(OPENSSL_DEP) src/core/iomgr/socket_utils_common_posix.c: $(OPENSSL_DEP) src/core/iomgr/socket_utils_linux.c: $(OPENSSL_DEP) src/core/iomgr/socket_utils_posix.c: $(OPENSSL_DEP) +src/core/iomgr/socket_windows.c: $(OPENSSL_DEP) src/core/iomgr/tcp_client_posix.c: $(OPENSSL_DEP) +src/core/iomgr/tcp_client_windows.c: $(OPENSSL_DEP) src/core/iomgr/tcp_posix.c: $(OPENSSL_DEP) src/core/iomgr/tcp_server_posix.c: $(OPENSSL_DEP) +src/core/iomgr/tcp_server_windows.c: $(OPENSSL_DEP) +src/core/iomgr/tcp_windows.c: $(OPENSSL_DEP) src/core/iomgr/time_averaged_stats.c: $(OPENSSL_DEP) src/core/iomgr/wakeup_fd_eventfd.c: $(OPENSSL_DEP) src/core/iomgr/wakeup_fd_nospecial.c: $(OPENSSL_DEP) @@ -2166,8 +2175,10 @@ objs/$(CONFIG)/src/core/iomgr/alarm_heap.o: objs/$(CONFIG)/src/core/iomgr/endpoint.o: objs/$(CONFIG)/src/core/iomgr/endpoint_pair_posix.o: objs/$(CONFIG)/src/core/iomgr/fd_posix.o: +objs/$(CONFIG)/src/core/iomgr/iocp_windows.o: objs/$(CONFIG)/src/core/iomgr/iomgr.o: objs/$(CONFIG)/src/core/iomgr/iomgr_posix.o: +objs/$(CONFIG)/src/core/iomgr/iomgr_windows.o: objs/$(CONFIG)/src/core/iomgr/pollset_kick.o: objs/$(CONFIG)/src/core/iomgr/pollset_multipoller_with_poll_posix.o: objs/$(CONFIG)/src/core/iomgr/pollset_posix.o: @@ -2177,9 +2188,13 @@ objs/$(CONFIG)/src/core/iomgr/sockaddr_utils.o: objs/$(CONFIG)/src/core/iomgr/socket_utils_common_posix.o: objs/$(CONFIG)/src/core/iomgr/socket_utils_linux.o: objs/$(CONFIG)/src/core/iomgr/socket_utils_posix.o: +objs/$(CONFIG)/src/core/iomgr/socket_windows.o: objs/$(CONFIG)/src/core/iomgr/tcp_client_posix.o: +objs/$(CONFIG)/src/core/iomgr/tcp_client_windows.o: objs/$(CONFIG)/src/core/iomgr/tcp_posix.o: objs/$(CONFIG)/src/core/iomgr/tcp_server_posix.o: +objs/$(CONFIG)/src/core/iomgr/tcp_server_windows.o: +objs/$(CONFIG)/src/core/iomgr/tcp_windows.o: objs/$(CONFIG)/src/core/iomgr/time_averaged_stats.o: objs/$(CONFIG)/src/core/iomgr/wakeup_fd_eventfd.o: objs/$(CONFIG)/src/core/iomgr/wakeup_fd_nospecial.o: @@ -2401,8 +2416,10 @@ LIBGRPC_UNSECURE_SRC = \ src/core/iomgr/endpoint.c \ src/core/iomgr/endpoint_pair_posix.c \ src/core/iomgr/fd_posix.c \ + src/core/iomgr/iocp_windows.c \ src/core/iomgr/iomgr.c \ src/core/iomgr/iomgr_posix.c \ + src/core/iomgr/iomgr_windows.c \ src/core/iomgr/pollset_kick.c \ src/core/iomgr/pollset_multipoller_with_poll_posix.c \ src/core/iomgr/pollset_posix.c \ @@ -2412,9 +2429,13 @@ LIBGRPC_UNSECURE_SRC = \ src/core/iomgr/socket_utils_common_posix.c \ src/core/iomgr/socket_utils_linux.c \ src/core/iomgr/socket_utils_posix.c \ + src/core/iomgr/socket_windows.c \ src/core/iomgr/tcp_client_posix.c \ + src/core/iomgr/tcp_client_windows.c \ src/core/iomgr/tcp_posix.c \ src/core/iomgr/tcp_server_posix.c \ + src/core/iomgr/tcp_server_windows.c \ + src/core/iomgr/tcp_windows.c \ src/core/iomgr/time_averaged_stats.c \ src/core/iomgr/wakeup_fd_eventfd.c \ src/core/iomgr/wakeup_fd_nospecial.c \ @@ -2535,8 +2556,10 @@ objs/$(CONFIG)/src/core/iomgr/alarm_heap.o: objs/$(CONFIG)/src/core/iomgr/endpoint.o: objs/$(CONFIG)/src/core/iomgr/endpoint_pair_posix.o: objs/$(CONFIG)/src/core/iomgr/fd_posix.o: +objs/$(CONFIG)/src/core/iomgr/iocp_windows.o: objs/$(CONFIG)/src/core/iomgr/iomgr.o: objs/$(CONFIG)/src/core/iomgr/iomgr_posix.o: +objs/$(CONFIG)/src/core/iomgr/iomgr_windows.o: objs/$(CONFIG)/src/core/iomgr/pollset_kick.o: objs/$(CONFIG)/src/core/iomgr/pollset_multipoller_with_poll_posix.o: objs/$(CONFIG)/src/core/iomgr/pollset_posix.o: @@ -2546,9 +2569,13 @@ objs/$(CONFIG)/src/core/iomgr/sockaddr_utils.o: objs/$(CONFIG)/src/core/iomgr/socket_utils_common_posix.o: objs/$(CONFIG)/src/core/iomgr/socket_utils_linux.o: objs/$(CONFIG)/src/core/iomgr/socket_utils_posix.o: +objs/$(CONFIG)/src/core/iomgr/socket_windows.o: objs/$(CONFIG)/src/core/iomgr/tcp_client_posix.o: +objs/$(CONFIG)/src/core/iomgr/tcp_client_windows.o: objs/$(CONFIG)/src/core/iomgr/tcp_posix.o: objs/$(CONFIG)/src/core/iomgr/tcp_server_posix.o: +objs/$(CONFIG)/src/core/iomgr/tcp_server_windows.o: +objs/$(CONFIG)/src/core/iomgr/tcp_windows.o: objs/$(CONFIG)/src/core/iomgr/time_averaged_stats.o: objs/$(CONFIG)/src/core/iomgr/wakeup_fd_eventfd.o: objs/$(CONFIG)/src/core/iomgr/wakeup_fd_nospecial.o: diff --git a/build.json b/build.json index 602d7752a1c..e6993acd6e8 100644 --- a/build.json +++ b/build.json @@ -42,6 +42,7 @@ "src/core/iomgr/endpoint.h", "src/core/iomgr/endpoint_pair.h", "src/core/iomgr/fd_posix.h", + "src/core/iomgr/iocp_windows.h", "src/core/iomgr/iomgr.h", "src/core/iomgr/iomgr_internal.h", "src/core/iomgr/iomgr_posix.h", @@ -57,9 +58,11 @@ "src/core/iomgr/sockaddr_utils.h", "src/core/iomgr/sockaddr_win32.h", "src/core/iomgr/socket_utils_posix.h", + "src/core/iomgr/socket_windows.h", "src/core/iomgr/tcp_client.h", "src/core/iomgr/tcp_posix.h", "src/core/iomgr/tcp_server.h", + "src/core/iomgr/tcp_windows.h", "src/core/iomgr/time_averaged_stats.h", "src/core/iomgr/wakeup_fd_pipe.h", "src/core/iomgr/wakeup_fd_posix.h", @@ -130,8 +133,10 @@ "src/core/iomgr/endpoint.c", "src/core/iomgr/endpoint_pair_posix.c", "src/core/iomgr/fd_posix.c", + "src/core/iomgr/iocp_windows.c", "src/core/iomgr/iomgr.c", "src/core/iomgr/iomgr_posix.c", + "src/core/iomgr/iomgr_windows.c", "src/core/iomgr/pollset_kick.c", "src/core/iomgr/pollset_multipoller_with_poll_posix.c", "src/core/iomgr/pollset_posix.c", @@ -141,9 +146,13 @@ "src/core/iomgr/socket_utils_common_posix.c", "src/core/iomgr/socket_utils_linux.c", "src/core/iomgr/socket_utils_posix.c", + "src/core/iomgr/socket_windows.c", "src/core/iomgr/tcp_client_posix.c", + "src/core/iomgr/tcp_client_windows.c", "src/core/iomgr/tcp_posix.c", "src/core/iomgr/tcp_server_posix.c", + "src/core/iomgr/tcp_server_windows.c", + "src/core/iomgr/tcp_windows.c", "src/core/iomgr/time_averaged_stats.c", "src/core/iomgr/wakeup_fd_eventfd.c", "src/core/iomgr/wakeup_fd_nospecial.c", @@ -216,6 +225,7 @@ "include/grpc/support/histogram.h", "include/grpc/support/host_port.h", "include/grpc/support/log.h", + "include/grpc/support/log_win32.h", "include/grpc/support/port_platform.h", "include/grpc/support/slice.h", "include/grpc/support/slice_buffer.h", @@ -1611,6 +1621,7 @@ { "name": "qps_client", "build": "test", + "run": false, "language": "c++", "src": [ "test/cpp/qps/qpstest.proto", @@ -1628,6 +1639,7 @@ { "name": "qps_server", "build": "test", + "run": false, "language": "c++", "src": [ "test/cpp/qps/qpstest.proto", diff --git a/include/grpc++/impl/call.h b/include/grpc++/impl/call.h index de789febe6e..edc6555b0c4 100644 --- a/include/grpc++/impl/call.h +++ b/include/grpc++/impl/call.h @@ -34,11 +34,12 @@ #ifndef __GRPCPP_CALL_H__ #define __GRPCPP_CALL_H__ +#include #include #include #include -#include +#include namespace google { namespace protobuf { @@ -59,7 +60,9 @@ class CallOpBuffer final : public CompletionQueueTag { void Reset(void *next_return_tag); - void AddSendInitialMetadata(std::vector > *metadata); + // Does not take ownership. + void AddSendInitialMetadata( + std::multimap *metadata); void AddSendMessage(const google::protobuf::Message &message); void AddRecvMessage(google::protobuf::Message *message); void AddClientSendClose(); @@ -70,11 +73,25 @@ class CallOpBuffer final : public CompletionQueueTag { // Convert to an array of grpc_op elements void FillOps(grpc_op *ops, size_t *nops); + // Release send buffers. + void ReleaseSendBuffer(); + // Called by completion queue just prior to returning from Next() or Pluck() void FinalizeResult(void *tag, bool *status) override; private: - void *return_tag_; + void *return_tag_ = nullptr; + size_t initial_metadata_count_ = 0; + grpc_metadata* initial_metadata_ = nullptr; + const google::protobuf::Message* send_message_ = nullptr; + grpc_byte_buffer* write_buffer_ = nullptr; + google::protobuf::Message* recv_message_ = nullptr; + grpc_byte_buffer* recv_message_buf_ = nullptr; + bool client_send_close_ = false; + Status* recv_status_ = nullptr; + grpc_status_code status_code_ = GRPC_STATUS_OK; + char* status_details_ = nullptr; + size_t status_details_capacity_ = 0; }; class CCallDeleter { diff --git a/include/grpc++/server.h b/include/grpc++/server.h index 670ffa78154..eefd4457f95 100644 --- a/include/grpc++/server.h +++ b/include/grpc++/server.h @@ -35,7 +35,7 @@ #define __GRPCPP_SERVER_H__ #include -#include +#include #include #include @@ -69,6 +69,25 @@ class Server { private: friend class ServerBuilder; + class MethodRequestData { + public: + MethodRequestData(RpcServiceMethod* method, void* tag) : method_(method), tag_(tag) {} + static MethodRequestData *Wait(CompletionQueue *cq); + + void Request(CompletionQueue* cq); + + class CallData { + public: + explicit CallData(MethodRequestData *mrd); + + void Run(); + }; + + private: + RpcServiceMethod *const method_; + void *const tag_; + }; + // ServerBuilder use only Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned, ServerCredentials* creds); Server(); @@ -85,7 +104,8 @@ class Server { void ScheduleCallback(); // Completion queue. - CompletionQueue cq_; + std::unique_ptr cq_sync_; + std::unique_ptr cq_async_; // Sever status std::mutex mu_; @@ -95,12 +115,11 @@ class Server { int num_running_cb_; std::condition_variable callback_cv_; + std::list methods_; + // Pointer to the c grpc server. grpc_server* server_; - // A map for all method information. - std::map method_map_; - ThreadPoolInterface* thread_pool_; // Whether the thread pool is created and owned by the server. bool thread_pool_owned_; diff --git a/include/grpc/grpc b/include/grpc/grpc new file mode 120000 index 00000000000..fc80ad1c867 --- /dev/null +++ b/include/grpc/grpc @@ -0,0 +1 @@ +/home/craig/grpc-ct/include/grpc \ No newline at end of file diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h index 4ccb5a4dd59..7733f8bb2ae 100644 --- a/include/grpc/grpc.h +++ b/include/grpc/grpc.h @@ -553,7 +553,9 @@ grpc_call_error grpc_server_request_call_old(grpc_server *server, grpc_call_error grpc_server_request_call( grpc_server *server, grpc_call **call, grpc_call_details *details, grpc_metadata_array *request_metadata, - grpc_completion_queue *completion_queue, void *tag_new); + grpc_completion_queue *cq_when_rpc_available, + grpc_completion_queue *cq_bound_to_call, + void *tag_new); void *grpc_server_register_method(grpc_server *server, const char *method, const char *host); @@ -562,7 +564,8 @@ grpc_call_error grpc_server_request_registered_call( grpc_server *server, void *registered_method, grpc_call **call, gpr_timespec *deadline, grpc_metadata_array *request_metadata, grpc_byte_buffer **optional_payload, - grpc_completion_queue *completion_queue, void *tag_new); + grpc_completion_queue *cq_when_rpc_available, + grpc_completion_queue *cq_bound_to_call, void *tag_new); /* Create a server */ grpc_server *grpc_server_create(grpc_completion_queue *cq, diff --git a/include/grpc/support/log_win32.h b/include/grpc/support/log_win32.h new file mode 100644 index 00000000000..0350056d26e --- /dev/null +++ b/include/grpc/support/log_win32.h @@ -0,0 +1,53 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPC_SUPPORT_LOG_WIN32_H__ +#define __GRPC_SUPPORT_LOG_WIN32_H__ + +#include + +#ifdef __cplusplus +extern "C" { +#endif + +/* Returns a string allocated with gpr_malloc that contains a UTF-8 + * formatted error message, corresponding to the error messageid. + * Use in conjunction with GetLastError() et al. + */ +char *gpr_format_message(DWORD messageid); + +#ifdef __cplusplus +} +#endif + +#endif /* __GRPC_SUPPORT_LOG_H__ */ diff --git a/include/grpc/support/port_platform.h b/include/grpc/support/port_platform.h index e99099c651c..b0b528d2828 100644 --- a/include/grpc/support/port_platform.h +++ b/include/grpc/support/port_platform.h @@ -46,10 +46,12 @@ #define GPR_WIN32 1 #define GPR_ARCH_64 1 #define GPR_GETPID_IN_PROCESS_H 1 +#define GPR_WINSOCK_SOCKET 1 #elif defined(_WIN32) || defined(WIN32) #define GPR_ARCH_32 1 #define GPR_WIN32 1 #define GPR_GETPID_IN_PROCESS_H 1 +#define GPR_WINSOCK_SOCKET 1 #elif defined(ANDROID) || defined(__ANDROID__) #define GPR_ANDROID 1 #define GPR_ARCH_32 1 diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c index b67c6cde709..737ee016aab 100644 --- a/src/core/iomgr/fd_posix.c +++ b/src/core/iomgr/fd_posix.c @@ -68,7 +68,6 @@ static grpc_fd *fd_freelist = NULL; static gpr_mu fd_freelist_mu; static void freelist_fd(grpc_fd *fd) { - gpr_free(fd->watchers); gpr_mu_lock(&fd_freelist_mu); fd->freelist_next = fd_freelist; fd_freelist = fd; @@ -93,9 +92,7 @@ static grpc_fd *alloc_fd(int fd) { gpr_atm_rel_store(&r->writest.state, NOT_READY); gpr_atm_rel_store(&r->shutdown, 0); r->fd = fd; - r->watchers = NULL; - r->watcher_count = 0; - r->watcher_capacity = 0; + r->watcher_root.next = r->watcher_root.prev = &r->watcher_root; r->freelist_next = NULL; return r; } @@ -118,9 +115,7 @@ static void unref_by(grpc_fd *fd, int n) { } } -void grpc_fd_global_init(void) { - gpr_mu_init(&fd_freelist_mu); -} +void grpc_fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); } void grpc_fd_global_shutdown(void) { while (fd_freelist != NULL) { @@ -145,11 +140,11 @@ int grpc_fd_is_orphaned(grpc_fd *fd) { } static void wake_watchers(grpc_fd *fd) { - size_t i, n; + grpc_fd_watcher *watcher; gpr_mu_lock(&fd->watcher_mu); - n = fd->watcher_count; - for (i = 0; i < n; i++) { - grpc_pollset_force_kick(fd->watchers[i]); + for (watcher = fd->watcher_root.next; watcher != &fd->watcher_root; + watcher = watcher->next) { + grpc_pollset_force_kick(watcher->pollset); } gpr_mu_unlock(&fd->watcher_mu); } @@ -293,36 +288,27 @@ void grpc_fd_notify_on_write(grpc_fd *fd, grpc_iomgr_cb_func write_cb, } gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset, - gpr_uint32 read_mask, gpr_uint32 write_mask) { + gpr_uint32 read_mask, gpr_uint32 write_mask, + grpc_fd_watcher *watcher) { /* keep track of pollers that have requested our events, in case they change */ gpr_mu_lock(&fd->watcher_mu); - if (fd->watcher_capacity == fd->watcher_count) { - fd->watcher_capacity = - GPR_MAX(fd->watcher_capacity + 8, fd->watcher_capacity * 3 / 2); - fd->watchers = gpr_realloc(fd->watchers, - fd->watcher_capacity * sizeof(grpc_pollset *)); - } - fd->watchers[fd->watcher_count++] = pollset; + watcher->next = &fd->watcher_root; + watcher->prev = watcher->next->prev; + watcher->next->prev = watcher->prev->next = watcher; + watcher->pollset = pollset; + watcher->fd = fd; gpr_mu_unlock(&fd->watcher_mu); return (gpr_atm_acq_load(&fd->readst.state) != READY ? read_mask : 0) | (gpr_atm_acq_load(&fd->writest.state) != READY ? write_mask : 0); } -void grpc_fd_end_poll(grpc_fd *fd, grpc_pollset *pollset) { - size_t r, w, n; - - gpr_mu_lock(&fd->watcher_mu); - n = fd->watcher_count; - for (r = 0, w = 0; r < n; r++) { - if (fd->watchers[r] == pollset) { - fd->watcher_count--; - continue; - } - fd->watchers[w++] = fd->watchers[r]; - } - gpr_mu_unlock(&fd->watcher_mu); +void grpc_fd_end_poll(grpc_fd_watcher *watcher) { + gpr_mu_lock(&watcher->fd->watcher_mu); + watcher->next->prev = watcher->prev; + watcher->prev->next = watcher->next; + gpr_mu_unlock(&watcher->fd->watcher_mu); } void grpc_fd_become_readable(grpc_fd *fd, int allow_synchronous_callback) { diff --git a/src/core/iomgr/fd_posix.h b/src/core/iomgr/fd_posix.h index f42ae195790..9a675087e59 100644 --- a/src/core/iomgr/fd_posix.h +++ b/src/core/iomgr/fd_posix.h @@ -47,7 +47,16 @@ typedef struct { gpr_atm state; } grpc_fd_state; -typedef struct grpc_fd { +typedef struct grpc_fd grpc_fd; + +typedef struct grpc_fd_watcher { + struct grpc_fd_watcher *next; + struct grpc_fd_watcher *prev; + grpc_pollset *pollset; + grpc_fd *fd; +} grpc_fd_watcher; + +struct grpc_fd { int fd; /* refst format: bit0: 1=active/0=orphaned @@ -60,9 +69,7 @@ typedef struct grpc_fd { gpr_atm shutdown; gpr_mu watcher_mu; - grpc_pollset **watchers; - size_t watcher_count; - size_t watcher_capacity; + grpc_fd_watcher watcher_root; grpc_fd_state readst; grpc_fd_state writest; @@ -70,7 +77,7 @@ typedef struct grpc_fd { grpc_iomgr_cb_func on_done; void *on_done_user_data; struct grpc_fd *freelist_next; -} grpc_fd; +}; /* Create a wrapped file descriptor. Requires fd is a non-blocking file descriptor. @@ -95,9 +102,10 @@ void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_cb_func on_done, void *user_data); Polling strategies that do not need to alter their behavior depending on the fd's current interest (such as epoll) do not need to call this function. */ gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset, - gpr_uint32 read_mask, gpr_uint32 write_mask); + gpr_uint32 read_mask, gpr_uint32 write_mask, + grpc_fd_watcher *rec); /* Complete polling previously started with grpc_fd_begin_poll */ -void grpc_fd_end_poll(grpc_fd *fd, grpc_pollset *pollset); +void grpc_fd_end_poll(grpc_fd_watcher *rec); /* Return 1 if this fd is orphaned, 0 otherwise */ int grpc_fd_is_orphaned(grpc_fd *fd); diff --git a/src/core/iomgr/iocp_windows.c b/src/core/iomgr/iocp_windows.c new file mode 100644 index 00000000000..729b11b78dc --- /dev/null +++ b/src/core/iomgr/iocp_windows.c @@ -0,0 +1,200 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include + +#ifdef GPR_WINSOCK_SOCKET + +#include + +#include +#include +#include +#include + +#include "src/core/iomgr/alarm_internal.h" +#include "src/core/iomgr/iocp_windows.h" +#include "src/core/iomgr/iomgr_internal.h" +#include "src/core/iomgr/socket_windows.h" + +static ULONG g_iocp_kick_token; +static OVERLAPPED g_iocp_custom_overlap; + +static gpr_event g_shutdown_iocp; +static gpr_event g_iocp_done; + +static HANDLE g_iocp; + +static int do_iocp_work() { + BOOL success; + DWORD bytes = 0; + DWORD flags = 0; + ULONG_PTR completion_key; + LPOVERLAPPED overlapped; + gpr_timespec wait_time = gpr_inf_future; + grpc_winsocket *socket; + grpc_winsocket_callback_info *info; + void(*f)(void *, int) = NULL; + void *opaque = NULL; + success = GetQueuedCompletionStatus(g_iocp, &bytes, + &completion_key, &overlapped, + gpr_time_to_millis(wait_time)); + if (!success && !overlapped) { + /* The deadline got attained. */ + return 0; + } + GPR_ASSERT(completion_key && overlapped); + if (overlapped == &g_iocp_custom_overlap) { + if (completion_key == (ULONG_PTR) &g_iocp_kick_token) { + /* We were awoken from a kick. */ + gpr_log(GPR_DEBUG, "do_iocp_work - got a kick"); + return 1; + } + gpr_log(GPR_ERROR, "Unknown custom completion key."); + abort(); + } + + socket = (grpc_winsocket*) completion_key; + if (overlapped == &socket->write_info.overlapped) { + gpr_log(GPR_DEBUG, "do_iocp_work - got write packet"); + info = &socket->write_info; + } else if (overlapped == &socket->read_info.overlapped) { + gpr_log(GPR_DEBUG, "do_iocp_work - got read packet"); + info = &socket->read_info; + } else { + gpr_log(GPR_ERROR, "Unknown IOCP operation"); + abort(); + } + success = WSAGetOverlappedResult(socket->socket, &info->overlapped, &bytes, + FALSE, &flags); + gpr_log(GPR_DEBUG, "bytes: %u, flags: %u - op %s", bytes, flags, + success ? "succeeded" : "failed"); + info->bytes_transfered = bytes; + info->wsa_error = success ? 0 : WSAGetLastError(); + GPR_ASSERT(overlapped == &info->overlapped); + gpr_mu_lock(&socket->state_mu); + GPR_ASSERT(!info->has_pending_iocp); + if (info->cb) { + f = info->cb; + opaque = info->opaque; + info->cb = NULL; + } else { + info->has_pending_iocp = 1; + } + gpr_mu_unlock(&socket->state_mu); + if (f) f(opaque, 1); + + return 1; +} + +static void iocp_loop(void *p) { + while (!gpr_event_get(&g_shutdown_iocp)) { + grpc_maybe_call_delayed_callbacks(NULL, 1); + do_iocp_work(); + } + + gpr_event_set(&g_iocp_done, (void *)1); +} + +void grpc_iocp_init(void) { + gpr_thd_id id; + + g_iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, + (ULONG_PTR)NULL, 0); + GPR_ASSERT(g_iocp); + + gpr_event_init(&g_iocp_done); + gpr_event_init(&g_shutdown_iocp); + gpr_thd_new(&id, iocp_loop, NULL, NULL); +} + +void grpc_iocp_shutdown(void) { + BOOL success; + gpr_event_set(&g_shutdown_iocp, (void *)1); + success = PostQueuedCompletionStatus(g_iocp, 0, + (ULONG_PTR) &g_iocp_kick_token, + &g_iocp_custom_overlap); + GPR_ASSERT(success); + gpr_event_wait(&g_iocp_done, gpr_inf_future); + success = CloseHandle(g_iocp); + GPR_ASSERT(success); +} + +void grpc_iocp_add_socket(grpc_winsocket *socket) { + HANDLE ret; + if (socket->added_to_iocp) return; + ret = CreateIoCompletionPort((HANDLE)socket->socket, + g_iocp, (gpr_uintptr) socket, 0); + if (!ret) { + char *utf8_message = gpr_format_message(WSAGetLastError()); + gpr_log(GPR_ERROR, "Unable to add socket to iocp: %s", utf8_message); + gpr_free(utf8_message); + __debugbreak(); + abort(); + } + socket->added_to_iocp = 1; + GPR_ASSERT(ret == g_iocp); +} + +static void socket_notify_on_iocp(grpc_winsocket *socket, + void(*cb)(void *, int), void *opaque, + grpc_winsocket_callback_info *info) { + int run_now = 0; + GPR_ASSERT(!info->cb); + gpr_mu_lock(&socket->state_mu); + if (info->has_pending_iocp) { + run_now = 1; + info->has_pending_iocp = 0; + gpr_log(GPR_DEBUG, "socket_notify_on_iocp - runs now"); + } else { + info->cb = cb; + info->opaque = opaque; + gpr_log(GPR_DEBUG, "socket_notify_on_iocp - queued"); + } + gpr_mu_unlock(&socket->state_mu); + if (run_now) cb(opaque, 1); +} + +void grpc_socket_notify_on_write(grpc_winsocket *socket, + void(*cb)(void *, int), void *opaque) { + gpr_log(GPR_DEBUG, "grpc_socket_notify_on_write"); + socket_notify_on_iocp(socket, cb, opaque, &socket->write_info); +} + +void grpc_socket_notify_on_read(grpc_winsocket *socket, + void(*cb)(void *, int), void *opaque) { + gpr_log(GPR_DEBUG, "grpc_socket_notify_on_read"); + socket_notify_on_iocp(socket, cb, opaque, &socket->read_info); +} + +#endif /* GPR_WINSOCK_SOCKET */ diff --git a/src/core/iomgr/iocp_windows.h b/src/core/iomgr/iocp_windows.h new file mode 100644 index 00000000000..bf5b90978ef --- /dev/null +++ b/src/core/iomgr/iocp_windows.h @@ -0,0 +1,52 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPC_INTERNAL_IOMGR_IOCP_WINDOWS_H_ +#define __GRPC_INTERNAL_IOMGR_IOCP_WINDOWS_H_ + +#include +#include + +#include "src/core/iomgr/socket_windows.h" + +void grpc_iocp_init(void); +void grpc_iocp_shutdown(void); +void grpc_iocp_add_socket(grpc_winsocket *); + +void grpc_socket_notify_on_write(grpc_winsocket *, void(*cb)(void *, int success), + void *opaque); + +void grpc_socket_notify_on_read(grpc_winsocket *, void(*cb)(void *, int success), + void *opaque); + +#endif /* __GRPC_INTERNAL_IOMGR_IOCP_WINDOWS_H_ */ diff --git a/src/core/iomgr/iomgr_posix.c b/src/core/iomgr/iomgr_posix.c index 9297f08e99a..bbf8cfc4190 100644 --- a/src/core/iomgr/iomgr_posix.c +++ b/src/core/iomgr/iomgr_posix.c @@ -31,6 +31,10 @@ * */ +#include + +#ifdef GPR_POSIX_SOCKET + #include "src/core/iomgr/iomgr_posix.h" #include "src/core/iomgr/fd_posix.h" @@ -43,3 +47,5 @@ void grpc_iomgr_platform_shutdown(void) { grpc_pollset_global_shutdown(); grpc_fd_global_shutdown(); } + +#endif /* GRPC_POSIX_SOCKET */ diff --git a/src/core/iomgr/iomgr_windows.c b/src/core/iomgr/iomgr_windows.c new file mode 100644 index 00000000000..a3a255eaed2 --- /dev/null +++ b/src/core/iomgr/iomgr_windows.c @@ -0,0 +1,67 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include + +#ifdef GPR_WINSOCK_SOCKET + +#include "src/core/iomgr/sockaddr_win32.h" + +#include + +#include "src/core/iomgr/socket_windows.h" +#include "src/core/iomgr/iocp_windows.h" +#include "src/core/iomgr/iomgr.h" + +static void winsock_init(void) { + WSADATA wsaData; + int status = WSAStartup(MAKEWORD(2, 0), &wsaData); + GPR_ASSERT(status == 0); +} + +static void winsock_shutdown(void) { + int status = WSACleanup(); + GPR_ASSERT(status == 0); +} + +void grpc_iomgr_platform_init(void) { + winsock_init(); + grpc_iocp_init(); +} + +void grpc_iomgr_platform_shutdown(void) { + grpc_iocp_shutdown(); + winsock_shutdown(); +} + +#endif /* GRPC_WINSOCK_SOCKET */ diff --git a/src/core/iomgr/pollset_multipoller_with_poll_posix.c b/src/core/iomgr/pollset_multipoller_with_poll_posix.c index e88296979df..3244ae08db5 100644 --- a/src/core/iomgr/pollset_multipoller_with_poll_posix.c +++ b/src/core/iomgr/pollset_multipoller_with_poll_posix.c @@ -53,11 +53,11 @@ typedef struct { size_t fd_count; size_t fd_capacity; grpc_fd **fds; - /* fds being polled by the current poller: parallel arrays of pollfd and the - * grpc_fd* that the pollfd was constructed from */ + /* fds being polled by the current poller: parallel arrays of pollfd, and + a grpc_fd_watcher */ size_t pfd_count; size_t pfd_capacity; - grpc_fd **selfds; + grpc_fd_watcher *watchers; struct pollfd *pfds; /* fds that have been removed from the pollset explicitly */ size_t del_count; @@ -98,7 +98,7 @@ static void end_polling(grpc_pollset *pollset) { pollset_hdr *h; h = pollset->data.ptr; for (i = 1; i < h->pfd_count; i++) { - grpc_fd_end_poll(h->selfds[i], pollset); + grpc_fd_end_poll(&h->watchers[i]); } } @@ -125,9 +125,9 @@ static int multipoll_with_poll_pollset_maybe_work( if (h->pfd_capacity < h->fd_count + 1) { h->pfd_capacity = GPR_MAX(h->pfd_capacity * 3 / 2, h->fd_count + 1); gpr_free(h->pfds); - gpr_free(h->selfds); + gpr_free(h->watchers); h->pfds = gpr_malloc(sizeof(struct pollfd) * h->pfd_capacity); - h->selfds = gpr_malloc(sizeof(grpc_fd *) * h->pfd_capacity); + h->watchers = gpr_malloc(sizeof(grpc_fd_watcher) * h->pfd_capacity); } nf = 0; np = 1; @@ -147,7 +147,7 @@ static int multipoll_with_poll_pollset_maybe_work( grpc_fd_unref(h->fds[i]); } else { h->fds[nf++] = h->fds[i]; - h->selfds[np] = h->fds[i]; + h->watchers[np].fd = h->fds[i]; h->pfds[np].fd = h->fds[i]->fd; h->pfds[np].revents = 0; np++; @@ -167,8 +167,8 @@ static int multipoll_with_poll_pollset_maybe_work( gpr_mu_unlock(&pollset->mu); for (i = 1; i < np; i++) { - h->pfds[i].events = - grpc_fd_begin_poll(h->selfds[i], pollset, POLLIN, POLLOUT); + h->pfds[i].events = grpc_fd_begin_poll(h->watchers[i].fd, pollset, POLLIN, + POLLOUT, &h->watchers[i]); } r = poll(h->pfds, h->pfd_count, timeout); @@ -184,10 +184,10 @@ static int multipoll_with_poll_pollset_maybe_work( } for (i = 1; i < np; i++) { if (h->pfds[i].revents & POLLIN) { - grpc_fd_become_readable(h->selfds[i], allow_synchronous_callback); + grpc_fd_become_readable(h->watchers[i].fd, allow_synchronous_callback); } if (h->pfds[i].revents & POLLOUT) { - grpc_fd_become_writable(h->selfds[i], allow_synchronous_callback); + grpc_fd_become_writable(h->watchers[i].fd, allow_synchronous_callback); } } } @@ -211,7 +211,7 @@ static void multipoll_with_poll_pollset_destroy(grpc_pollset *pollset) { grpc_fd_unref(h->dels[i]); } gpr_free(h->pfds); - gpr_free(h->selfds); + gpr_free(h->watchers); gpr_free(h->fds); gpr_free(h->dels); gpr_free(h); @@ -234,7 +234,7 @@ void grpc_platform_become_multipoller(grpc_pollset *pollset, grpc_fd **fds, h->pfd_count = 0; h->pfd_capacity = 0; h->pfds = NULL; - h->selfds = NULL; + h->watchers = NULL; h->del_count = 0; h->del_capacity = 0; h->dels = NULL; diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index b1c2c64a18c..2837a0dff3f 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -80,7 +80,9 @@ void grpc_pollset_kick(grpc_pollset *p) { } } -void grpc_pollset_force_kick(grpc_pollset *p) { grpc_pollset_kick_kick(&p->kick_state); } +void grpc_pollset_force_kick(grpc_pollset *p) { + grpc_pollset_kick_kick(&p->kick_state); +} /* global state management */ @@ -200,8 +202,15 @@ static void unary_poll_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) { if (fd == pollset->data.ptr) return; fds[0] = pollset->data.ptr; fds[1] = fd; - grpc_platform_become_multipoller(pollset, fds, GPR_ARRAY_SIZE(fds)); - grpc_fd_unref(fds[0]); + if (!grpc_fd_is_orphaned(fds[0])) { + grpc_platform_become_multipoller(pollset, fds, GPR_ARRAY_SIZE(fds)); + grpc_fd_unref(fds[0]); + } else { + /* old fd is orphaned and we haven't cleaned it up until now, so remain a + * unary poller */ + grpc_fd_unref(fds[0]); + pollset->data.ptr = fd; + } } static void unary_poll_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd) { @@ -217,6 +226,7 @@ static int unary_poll_pollset_maybe_work(grpc_pollset *pollset, int allow_synchronous_callback) { struct pollfd pfd[2]; grpc_fd *fd; + grpc_fd_watcher fd_watcher; int timeout; int r; @@ -249,7 +259,7 @@ static int unary_poll_pollset_maybe_work(grpc_pollset *pollset, pollset->counter = 1; gpr_mu_unlock(&pollset->mu); - pfd[1].events = grpc_fd_begin_poll(fd, pollset, POLLIN, POLLOUT); + pfd[1].events = grpc_fd_begin_poll(fd, pollset, POLLIN, POLLOUT, &fd_watcher); r = poll(pfd, GPR_ARRAY_SIZE(pfd), timeout); if (r < 0) { @@ -271,7 +281,7 @@ static int unary_poll_pollset_maybe_work(grpc_pollset *pollset, } grpc_pollset_kick_post_poll(&pollset->kick_state); - grpc_fd_end_poll(fd, pollset); + grpc_fd_end_poll(&fd_watcher); gpr_mu_lock(&pollset->mu); pollset->counter = 0; diff --git a/src/core/iomgr/pollset_windows.c b/src/core/iomgr/pollset_windows.c index 3fb39918b35..b81d23e57c2 100644 --- a/src/core/iomgr/pollset_windows.c +++ b/src/core/iomgr/pollset_windows.c @@ -33,6 +33,39 @@ #include -#ifdef GPR_WIN32 +#ifdef GPR_WINSOCK_SOCKET -#endif /* GPR_WIN32 */ +#include + +#include "src/core/iomgr/alarm_internal.h" +#include "src/core/iomgr/iomgr_internal.h" +#include "src/core/iomgr/pollset_windows.h" + +void grpc_pollset_init(grpc_pollset *pollset) { + gpr_mu_init(&pollset->mu); + gpr_cv_init(&pollset->cv); +} + +void grpc_pollset_destroy(grpc_pollset *pollset) { + gpr_mu_destroy(&pollset->mu); + gpr_cv_destroy(&pollset->cv); +} + +int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline) { + gpr_timespec now; + now = gpr_now(); + if (gpr_time_cmp(now, deadline) > 0) { + return 0; + } + if (grpc_maybe_call_delayed_callbacks(NULL, 1)) { + return 1; + } + if (grpc_alarm_check(NULL, now, &deadline)) { + return 1; + } + return 0; +} + +void grpc_pollset_kick(grpc_pollset *p) { } + +#endif /* GPR_WINSOCK_SOCKET */ diff --git a/src/core/iomgr/pollset_windows.h b/src/core/iomgr/pollset_windows.h index 9214b04b170..1a5e31f627b 100644 --- a/src/core/iomgr/pollset_windows.h +++ b/src/core/iomgr/pollset_windows.h @@ -34,9 +34,11 @@ #ifndef __GRPC_INTERNAL_IOMGR_POLLSET_WINDOWS_H_ #define __GRPC_INTERNAL_IOMGR_POLLSET_WINDOWS_H_ +#include #include #include "src/core/iomgr/pollset_kick.h" +#include "src/core/iomgr/socket_windows.h" /* forward declare only in this file to avoid leaking impl details via pollset.h; real users of grpc_fd should always include 'fd_posix.h' and not diff --git a/src/core/iomgr/sockaddr_utils.c b/src/core/iomgr/sockaddr_utils.c index 07bf7b3a35c..8dcfca74c68 100644 --- a/src/core/iomgr/sockaddr_utils.c +++ b/src/core/iomgr/sockaddr_utils.c @@ -111,13 +111,20 @@ int grpc_sockaddr_is_wildcard(const struct sockaddr *addr, int *port_out) { void grpc_sockaddr_make_wildcards(int port, struct sockaddr_in *wild4_out, struct sockaddr_in6 *wild6_out) { - memset(wild4_out, 0, sizeof(*wild4_out)); - wild4_out->sin_family = AF_INET; - wild4_out->sin_port = htons(port); + grpc_sockaddr_make_wildcard4(port, wild4_out); + grpc_sockaddr_make_wildcard6(port, wild6_out); +} + +void grpc_sockaddr_make_wildcard4(int port, struct sockaddr_in *wild_out) { + memset(wild_out, 0, sizeof(*wild_out)); + wild_out->sin_family = AF_INET; + wild_out->sin_port = htons(port); +} - memset(wild6_out, 0, sizeof(*wild6_out)); - wild6_out->sin6_family = AF_INET6; - wild6_out->sin6_port = htons(port); +void grpc_sockaddr_make_wildcard6(int port, struct sockaddr_in6 *wild_out) { + memset(wild_out, 0, sizeof(*wild_out)); + wild_out->sin6_family = AF_INET6; + wild_out->sin6_port = htons(port); } int grpc_sockaddr_to_string(char **out, const struct sockaddr *addr, diff --git a/src/core/iomgr/sockaddr_utils.h b/src/core/iomgr/sockaddr_utils.h index 3f5b770e865..b49cc50491c 100644 --- a/src/core/iomgr/sockaddr_utils.h +++ b/src/core/iomgr/sockaddr_utils.h @@ -57,6 +57,12 @@ int grpc_sockaddr_is_wildcard(const struct sockaddr *addr, int *port_out); void grpc_sockaddr_make_wildcards(int port, struct sockaddr_in *wild4_out, struct sockaddr_in6 *wild6_out); +/* Writes 0.0.0.0:port. */ +void grpc_sockaddr_make_wildcard4(int port, struct sockaddr_in *wild_out); + +/* Writes [::]:port. */ +void grpc_sockaddr_make_wildcard6(int port, struct sockaddr_in6 *wild_out); + /* Return the IP port number of a sockaddr */ int grpc_sockaddr_get_port(const struct sockaddr *addr); diff --git a/src/core/iomgr/sockaddr_win32.h b/src/core/iomgr/sockaddr_win32.h index cdea33fec07..08be0e54f8e 100644 --- a/src/core/iomgr/sockaddr_win32.h +++ b/src/core/iomgr/sockaddr_win32.h @@ -35,5 +35,7 @@ #define __GRPC_INTERNAL_IOMGR_SOCKADDR_WIN32_H_ #include +#include +#include -#endif // __GRPC_INTERNAL_IOMGR_SOCKADDR_WIN32_H_ +#endif /* __GRPC_INTERNAL_IOMGR_SOCKADDR_WIN32_H_ */ diff --git a/src/core/iomgr/socket_windows.c b/src/core/iomgr/socket_windows.c new file mode 100644 index 00000000000..3639798dbcd --- /dev/null +++ b/src/core/iomgr/socket_windows.c @@ -0,0 +1,77 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include +#include +#include + +#ifdef GPR_WINSOCK_SOCKET + +#include "src/core/iomgr/iocp_windows.h" +#include "src/core/iomgr/iomgr.h" +#include "src/core/iomgr/iomgr_internal.h" +#include "src/core/iomgr/socket_windows.h" +#include "src/core/iomgr/pollset.h" +#include "src/core/iomgr/pollset_windows.h" + +grpc_winsocket *grpc_winsocket_create(SOCKET socket) { + grpc_winsocket *r = gpr_malloc(sizeof(grpc_winsocket)); + gpr_log(GPR_DEBUG, "grpc_winsocket_create"); + memset(r, 0, sizeof(grpc_winsocket)); + r->socket = socket; + gpr_mu_init(&r->state_mu); + grpc_iomgr_ref(); + grpc_iocp_add_socket(r); + return r; +} + +void shutdown_op(grpc_winsocket_callback_info *info) { + if (!info->cb) return; + grpc_iomgr_add_delayed_callback(info->cb, info->opaque, 0); +} + +void grpc_winsocket_shutdown(grpc_winsocket *socket) { + gpr_log(GPR_DEBUG, "grpc_winsocket_shutdown"); + shutdown_op(&socket->read_info); + shutdown_op(&socket->write_info); +} + +void grpc_winsocket_orphan(grpc_winsocket *socket) { + gpr_log(GPR_DEBUG, "grpc_winsocket_orphan"); + grpc_iomgr_unref(); + closesocket(socket->socket); + gpr_mu_destroy(&socket->state_mu); + gpr_free(socket); +} + +#endif /* GPR_WINSOCK_SOCKET */ diff --git a/src/core/iomgr/socket_windows.h b/src/core/iomgr/socket_windows.h new file mode 100644 index 00000000000..990b520c6da --- /dev/null +++ b/src/core/iomgr/socket_windows.h @@ -0,0 +1,75 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPC_INTERNAL_IOMGR_HANDLE_WINDOWS_H__ +#define __GRPC_INTERNAL_IOMGR_HANDLE_WINDOWS_H__ + +#include + +#include +#include + +typedef struct grpc_winsocket_callback_info { + /* This is supposed to be a WSAOVERLAPPED, but in order to get that + * definition, we need to include ws2tcpip.h, which needs to be included + * from the top, otherwise it'll clash with a previous inclusion of + * windows.h that in turns includes winsock.h. If anyone knows a way + * to do it properly, feel free to send a patch. + */ + OVERLAPPED overlapped; + void(*cb)(void *opaque, int success); + void *opaque; + int has_pending_iocp; + DWORD bytes_transfered; + int wsa_error; +} grpc_winsocket_callback_info; + +typedef struct grpc_winsocket { + SOCKET socket; + + int added_to_iocp; + + grpc_winsocket_callback_info write_info; + grpc_winsocket_callback_info read_info; + + gpr_mu state_mu; +} grpc_winsocket; + +/* Create a wrapped windows handle. +This takes ownership of closing it. */ +grpc_winsocket *grpc_winsocket_create(SOCKET socket); + +void grpc_winsocket_shutdown(grpc_winsocket *socket); +void grpc_winsocket_orphan(grpc_winsocket *socket); + +#endif /* __GRPC_INTERNAL_IOMGR_HANDLE_WINDOWS_H__ */ diff --git a/src/core/iomgr/tcp_client_windows.c b/src/core/iomgr/tcp_client_windows.c new file mode 100644 index 00000000000..2ed5f39b390 --- /dev/null +++ b/src/core/iomgr/tcp_client_windows.c @@ -0,0 +1,215 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include + +#ifdef GPR_WINSOCK_SOCKET + +#include "src/core/iomgr/sockaddr_win32.h" + +#include +#include +#include +#include +#include + +#include "src/core/iomgr/tcp_client.h" +#include "src/core/iomgr/tcp_windows.h" +#include "src/core/iomgr/socket_windows.h" +#include "src/core/iomgr/alarm.h" +#include "src/core/iomgr/sockaddr.h" +#include "src/core/iomgr/sockaddr_utils.h" + +typedef struct { + void(*cb)(void *arg, grpc_endpoint *tcp); + void *cb_arg; + gpr_mu mu; + grpc_winsocket *socket; + gpr_timespec deadline; + grpc_alarm alarm; + int refs; +} async_connect; + +static void async_connect_cleanup(async_connect *ac) { + int done = (--ac->refs == 0); + gpr_mu_unlock(&ac->mu); + if (done) { + gpr_mu_destroy(&ac->mu); + gpr_free(ac); + } +} + +static void on_alarm(void *acp, int success) { + async_connect *ac = acp; + gpr_mu_lock(&ac->mu); + if (ac->socket != NULL && success) { + grpc_winsocket_shutdown(ac->socket); + } + async_connect_cleanup(ac); +} + +static void on_connect(void *acp, int success) { + async_connect *ac = acp; + SOCKET sock = ac->socket->socket; + grpc_endpoint *ep = NULL; + grpc_winsocket_callback_info *info = &ac->socket->write_info; + void(*cb)(void *arg, grpc_endpoint *tcp) = ac->cb; + void *cb_arg = ac->cb_arg; + + grpc_alarm_cancel(&ac->alarm); + + if (success) { + DWORD transfered_bytes = 0; + DWORD flags; + BOOL wsa_success = WSAGetOverlappedResult(sock, &info->overlapped, + &transfered_bytes, FALSE, + &flags); + GPR_ASSERT(transfered_bytes == 0); + if (!wsa_success) { + char *utf8_message = gpr_format_message(WSAGetLastError()); + gpr_log(GPR_ERROR, "on_connect error: %s", utf8_message); + gpr_free(utf8_message); + goto finish; + } else { + gpr_log(GPR_DEBUG, "on_connect: connection established"); + ep = grpc_tcp_create(ac->socket); + goto finish; + } + } else { + gpr_log(GPR_ERROR, "on_connect is shutting down"); + goto finish; + } + + abort(); + +finish: + gpr_mu_lock(&ac->mu); + if (!ep) { + grpc_winsocket_orphan(ac->socket); + } + async_connect_cleanup(ac); + cb(cb_arg, ep); +} + +void grpc_tcp_client_connect(void(*cb)(void *arg, grpc_endpoint *tcp), + void *arg, const struct sockaddr *addr, + int addr_len, gpr_timespec deadline) { + SOCKET sock = INVALID_SOCKET; + BOOL success; + int status; + struct sockaddr_in6 addr6_v4mapped; + struct sockaddr_in6 local_address; + async_connect *ac; + grpc_winsocket *socket = NULL; + LPFN_CONNECTEX ConnectEx; + GUID guid = WSAID_CONNECTEX; + DWORD ioctl_num_bytes; + const char *message = NULL; + char *utf8_message; + grpc_winsocket_callback_info *info; + + /* Use dualstack sockets where available. */ + if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) { + addr = (const struct sockaddr *)&addr6_v4mapped; + addr_len = sizeof(addr6_v4mapped); + } + + sock = WSASocket(AF_INET6, SOCK_STREAM, IPPROTO_TCP, NULL, 0, + WSA_FLAG_OVERLAPPED); + if (sock == INVALID_SOCKET) { + message = "Unable to create socket: %s"; + goto failure; + } + + if (!grpc_tcp_prepare_socket(sock)) { + message = "Unable to set socket options: %s"; + goto failure; + } + + status = WSAIoctl(sock, SIO_GET_EXTENSION_FUNCTION_POINTER, + &guid, sizeof(guid), &ConnectEx, sizeof(ConnectEx), + &ioctl_num_bytes, NULL, NULL); + + if (status != 0) { + message = "Unable to retreive ConnectEx pointer: %s"; + goto failure; + } + + grpc_sockaddr_make_wildcard6(0, &local_address); + + status = bind(sock, (struct sockaddr *) &local_address, + sizeof(local_address)); + if (status != 0) { + message = "Unable to bind socket: %s"; + goto failure; + } + + socket = grpc_winsocket_create(sock); + info = &socket->write_info; + success = ConnectEx(sock, addr, addr_len, NULL, 0, NULL, &info->overlapped); + + if (success) { + gpr_log(GPR_DEBUG, "connected immediately - but we still go to sleep"); + } else { + int error = WSAGetLastError(); + if (error != ERROR_IO_PENDING) { + message = "ConnectEx failed: %s"; + goto failure; + } + } + + gpr_log(GPR_DEBUG, "grpc_tcp_client_connect: connection pending"); + ac = gpr_malloc(sizeof(async_connect)); + ac->cb = cb; + ac->cb_arg = arg; + ac->socket = socket; + gpr_mu_init(&ac->mu); + ac->refs = 2; + + grpc_alarm_init(&ac->alarm, deadline, on_alarm, ac, gpr_now()); + grpc_socket_notify_on_write(socket, on_connect, ac); + return; + +failure: + utf8_message = gpr_format_message(WSAGetLastError()); + gpr_log(GPR_ERROR, message, utf8_message); + gpr_free(utf8_message); + if (socket) { + grpc_winsocket_orphan(socket); + } else if (sock != INVALID_SOCKET) { + closesocket(sock); + } + cb(arg, NULL); +} + +#endif /* GPR_WINSOCK_SOCKET */ diff --git a/src/core/iomgr/tcp_server_windows.c b/src/core/iomgr/tcp_server_windows.c new file mode 100644 index 00000000000..e6161eb1e86 --- /dev/null +++ b/src/core/iomgr/tcp_server_windows.c @@ -0,0 +1,374 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include + +#ifdef GPR_WINSOCK_SOCKET + +#define _GNU_SOURCE +#include "src/core/iomgr/sockaddr_utils.h" + +#include +#include +#include +#include +#include + +#include "src/core/iomgr/iocp_windows.h" +#include "src/core/iomgr/pollset_windows.h" +#include "src/core/iomgr/socket_windows.h" +#include "src/core/iomgr/tcp_server.h" +#include "src/core/iomgr/tcp_windows.h" + +#define INIT_PORT_CAP 2 +#define MIN_SAFE_ACCEPT_QUEUE_SIZE 100 + +static gpr_once s_init_max_accept_queue_size; +static int s_max_accept_queue_size; + +/* one listening port */ +typedef struct server_port { + gpr_uint8 addresses[sizeof(struct sockaddr_in6) * 2 + 32]; + SOCKET new_socket; + grpc_winsocket *socket; + grpc_tcp_server *server; + LPFN_ACCEPTEX AcceptEx; +} server_port; + +/* the overall server */ +struct grpc_tcp_server { + grpc_tcp_server_cb cb; + void *cb_arg; + + gpr_mu mu; + gpr_cv cv; + + /* active port count: how many ports are actually still listening */ + int active_ports; + + /* all listening ports */ + server_port *ports; + size_t nports; + size_t port_capacity; +}; + +grpc_tcp_server *grpc_tcp_server_create(void) { + grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server)); + gpr_mu_init(&s->mu); + gpr_cv_init(&s->cv); + s->active_ports = 0; + s->cb = NULL; + s->cb_arg = NULL; + s->ports = gpr_malloc(sizeof(server_port) * INIT_PORT_CAP); + s->nports = 0; + s->port_capacity = INIT_PORT_CAP; + return s; +} + +void grpc_tcp_server_destroy(grpc_tcp_server *s) { + size_t i; + gpr_mu_lock(&s->mu); + /* shutdown all fd's */ + for (i = 0; i < s->nports; i++) { + grpc_winsocket_shutdown(s->ports[i].socket); + } + /* wait while that happens */ + while (s->active_ports) { + gpr_cv_wait(&s->cv, &s->mu, gpr_inf_future); + } + gpr_mu_unlock(&s->mu); + + /* delete ALL the things */ + for (i = 0; i < s->nports; i++) { + server_port *sp = &s->ports[i]; + grpc_winsocket_orphan(sp->socket); + } + gpr_free(s->ports); + gpr_free(s); +} + +/* Prepare a recently-created socket for listening. */ +static int prepare_socket(SOCKET sock, + const struct sockaddr *addr, int addr_len) { + struct sockaddr_storage sockname_temp; + socklen_t sockname_len; + + if (sock == INVALID_SOCKET) goto error; + + if (!grpc_tcp_prepare_socket(sock)) { + char *utf8_message = gpr_format_message(WSAGetLastError()); + gpr_log(GPR_ERROR, "Unable to prepare socket: %s", utf8_message); + gpr_free(utf8_message); + goto error; + } + + if (bind(sock, addr, addr_len) == SOCKET_ERROR) { + char *addr_str; + char *utf8_message = gpr_format_message(WSAGetLastError()); + grpc_sockaddr_to_string(&addr_str, addr, 0); + gpr_log(GPR_ERROR, "bind addr=%s: %s", addr_str, utf8_message); + gpr_free(utf8_message); + gpr_free(addr_str); + goto error; + } + + if (listen(sock, SOMAXCONN) == SOCKET_ERROR) { + char *utf8_message = gpr_format_message(WSAGetLastError()); + gpr_log(GPR_ERROR, "listen: %s", utf8_message); + gpr_free(utf8_message); + goto error; + } + + sockname_len = sizeof(sockname_temp); + if (getsockname(sock, (struct sockaddr *) &sockname_temp, &sockname_len) + == SOCKET_ERROR) { + char *utf8_message = gpr_format_message(WSAGetLastError()); + gpr_log(GPR_ERROR, "getsockname: %s", utf8_message); + gpr_free(utf8_message); + goto error; + } + + return grpc_sockaddr_get_port((struct sockaddr *) &sockname_temp); + +error: + if (sock != INVALID_SOCKET) closesocket(sock); + return -1; +} + +static void on_accept(void *arg, int success); + +static void start_accept(server_port *port) { + SOCKET sock = INVALID_SOCKET; + char *message; + char *utf8_message; + BOOL success; + DWORD addrlen = sizeof(struct sockaddr_in6) + 16; + DWORD bytes_received = 0; + + sock = WSASocket(AF_INET6, SOCK_STREAM, IPPROTO_TCP, NULL, 0, + WSA_FLAG_OVERLAPPED); + + if (sock == INVALID_SOCKET) { + message = "Unable to create socket: %s"; + goto failure; + } + + if (!grpc_tcp_prepare_socket(sock)) { + message = "Unable to prepare socket: %s"; + goto failure; + } + + success = port->AcceptEx(port->socket->socket, sock, port->addresses, 0, + addrlen, addrlen, &bytes_received, + &port->socket->read_info.overlapped); + + if (success) { + gpr_log(GPR_DEBUG, "accepted immediately - but we still go to sleep"); + } else { + int error = WSAGetLastError(); + if (error != ERROR_IO_PENDING) { + message = "AcceptEx failed: %s"; + goto failure; + } + } + + port->new_socket = sock; + grpc_socket_notify_on_read(port->socket, on_accept, port); + return; + +failure: + utf8_message = gpr_format_message(WSAGetLastError()); + gpr_log(GPR_ERROR, message, utf8_message); + gpr_free(utf8_message); + if (sock != INVALID_SOCKET) closesocket(sock); +} + +/* event manager callback when reads are ready */ +static void on_accept(void *arg, int success) { + server_port *sp = arg; + SOCKET sock = sp->new_socket; + grpc_winsocket_callback_info *info = &sp->socket->read_info; + grpc_endpoint *ep = NULL; + + if (success) { + DWORD transfered_bytes = 0; + DWORD flags; + BOOL wsa_success = WSAGetOverlappedResult(sock, &info->overlapped, + &transfered_bytes, FALSE, + &flags); + if (!wsa_success) { + char *utf8_message = gpr_format_message(WSAGetLastError()); + gpr_log(GPR_ERROR, "on_accept error: %s", utf8_message); + gpr_free(utf8_message); + closesocket(sock); + } else { + gpr_log(GPR_DEBUG, "on_accept: accepted connection"); + ep = grpc_tcp_create(grpc_winsocket_create(sock)); + } + } else { + gpr_log(GPR_DEBUG, "on_accept: shutting down"); + closesocket(sock); + gpr_mu_lock(&sp->server->mu); + if (0 == --sp->server->active_ports) { + gpr_cv_broadcast(&sp->server->cv); + } + gpr_mu_unlock(&sp->server->mu); + } + + if (ep) sp->server->cb(sp->server->cb_arg, ep); + start_accept(sp); +} + +static int add_socket_to_server(grpc_tcp_server *s, SOCKET sock, + const struct sockaddr *addr, int addr_len) { + server_port *sp; + int port; + int status; + GUID guid = WSAID_ACCEPTEX; + DWORD ioctl_num_bytes; + LPFN_ACCEPTEX AcceptEx; + + if (sock == INVALID_SOCKET) return -1; + + status = WSAIoctl(sock, SIO_GET_EXTENSION_FUNCTION_POINTER, + &guid, sizeof(guid), &AcceptEx, sizeof(AcceptEx), + &ioctl_num_bytes, NULL, NULL); + + if (status != 0) { + char *utf8_message = gpr_format_message(WSAGetLastError()); + gpr_log(GPR_ERROR, "on_connect error: %s", utf8_message); + gpr_free(utf8_message); + closesocket(sock); + return -1; + } + + port = prepare_socket(sock, addr, addr_len); + if (port >= 0) { + gpr_mu_lock(&s->mu); + GPR_ASSERT(!s->cb && "must add ports before starting server"); + /* append it to the list under a lock */ + if (s->nports == s->port_capacity) { + s->port_capacity *= 2; + s->ports = gpr_realloc(s->ports, sizeof(server_port) * s->port_capacity); + } + sp = &s->ports[s->nports++]; + sp->server = s; + sp->socket = grpc_winsocket_create(sock); + sp->AcceptEx = AcceptEx; + GPR_ASSERT(sp->socket); + gpr_mu_unlock(&s->mu); + } + + return port; +} + +int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr, + int addr_len) { + int allocated_port = -1; + unsigned i; + SOCKET sock; + struct sockaddr_in6 addr6_v4mapped; + struct sockaddr_in6 wildcard; + struct sockaddr *allocated_addr = NULL; + struct sockaddr_storage sockname_temp; + socklen_t sockname_len; + int port; + + /* Check if this is a wildcard port, and if so, try to keep the port the same + as some previously created listener. */ + if (grpc_sockaddr_get_port(addr) == 0) { + for (i = 0; i < s->nports; i++) { + sockname_len = sizeof(sockname_temp); + if (0 == getsockname(s->ports[i].socket->socket, + (struct sockaddr *) &sockname_temp, + &sockname_len)) { + port = grpc_sockaddr_get_port((struct sockaddr *) &sockname_temp); + if (port > 0) { + allocated_addr = malloc(addr_len); + memcpy(allocated_addr, addr, addr_len); + grpc_sockaddr_set_port(allocated_addr, port); + addr = allocated_addr; + break; + } + } + } + } + + if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) { + addr = (const struct sockaddr *)&addr6_v4mapped; + addr_len = sizeof(addr6_v4mapped); + } + + /* Treat :: or 0.0.0.0 as a family-agnostic wildcard. */ + if (grpc_sockaddr_is_wildcard(addr, &port)) { + grpc_sockaddr_make_wildcard6(port, &wildcard); + + addr = (struct sockaddr *) &wildcard; + addr_len = sizeof(wildcard); + } + + sock = WSASocket(AF_INET6, SOCK_STREAM, IPPROTO_TCP, NULL, 0, + WSA_FLAG_OVERLAPPED); + if (sock == INVALID_SOCKET) { + char *utf8_message = gpr_format_message(WSAGetLastError()); + gpr_log(GPR_ERROR, "unable to create socket: %s", utf8_message); + gpr_free(utf8_message); + } + + allocated_port = add_socket_to_server(s, sock, addr, addr_len); + gpr_free(allocated_addr); + + return allocated_port; +} + +SOCKET grpc_tcp_server_get_socket(grpc_tcp_server *s, unsigned index) { + return (index < s->nports) ? s->ports[index].socket->socket : INVALID_SOCKET; +} + +void grpc_tcp_server_start(grpc_tcp_server *s, grpc_pollset *pollset, + grpc_tcp_server_cb cb, void *cb_arg) { + size_t i; + GPR_ASSERT(cb); + gpr_mu_lock(&s->mu); + GPR_ASSERT(!s->cb); + GPR_ASSERT(s->active_ports == 0); + s->cb = cb; + s->cb_arg = cb_arg; + for (i = 0; i < s->nports; i++) { + start_accept(s->ports + i); + s->active_ports++; + } + gpr_mu_unlock(&s->mu); +} + +#endif /* GPR_WINSOCK_SOCKET */ diff --git a/src/core/iomgr/tcp_windows.c b/src/core/iomgr/tcp_windows.c new file mode 100644 index 00000000000..94d84f92b59 --- /dev/null +++ b/src/core/iomgr/tcp_windows.c @@ -0,0 +1,373 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include + +#ifdef GPR_WINSOCK_SOCKET + +#include "src/core/iomgr/sockaddr_win32.h" + +#include +#include +#include +#include +#include + +#include "src/core/iomgr/alarm.h" +#include "src/core/iomgr/iocp_windows.h" +#include "src/core/iomgr/sockaddr.h" +#include "src/core/iomgr/sockaddr_utils.h" +#include "src/core/iomgr/socket_windows.h" +#include "src/core/iomgr/tcp_client.h" + +static int set_non_block(SOCKET sock) { + int status; + unsigned long param = 1; + DWORD ret; + status = WSAIoctl(sock, FIONBIO, ¶m, sizeof(param), NULL, 0, &ret, + NULL, NULL); + return status == 0; +} + +static int set_dualstack(SOCKET sock) { + int status; + unsigned long param = 0; + status = setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, + (const char *) ¶m, sizeof(param)); + return status == 0; +} + +int grpc_tcp_prepare_socket(SOCKET sock) { + if (!set_non_block(sock)) + return 0; + if (!set_dualstack(sock)) + return 0; + return 1; +} + +typedef struct grpc_tcp { + grpc_endpoint base; + grpc_winsocket *socket; + gpr_refcount refcount; + + grpc_endpoint_read_cb read_cb; + void *read_user_data; + gpr_slice read_slice; + int outstanding_read; + + grpc_endpoint_write_cb write_cb; + void *write_user_data; + gpr_slice_buffer write_slices; + int outstanding_write; + +} grpc_tcp; + +static void tcp_ref(grpc_tcp *tcp) { + gpr_log(GPR_DEBUG, "tcp_ref"); + gpr_ref(&tcp->refcount); +} + +static void tcp_unref(grpc_tcp *tcp) { + gpr_log(GPR_DEBUG, "tcp_unref"); + if (gpr_unref(&tcp->refcount)) { + gpr_log(GPR_DEBUG, "tcp_unref: destroying"); + gpr_slice_buffer_destroy(&tcp->write_slices); + grpc_winsocket_orphan(tcp->socket); + gpr_free(tcp); + } +} + +static void on_read(void *tcpp, int success) { + grpc_tcp *tcp = (grpc_tcp *) tcpp; + grpc_winsocket *socket = tcp->socket; + gpr_slice sub; + gpr_slice *slice = NULL; + size_t nslices = 0; + grpc_endpoint_cb_status status; + grpc_endpoint_read_cb cb = tcp->read_cb; + grpc_winsocket_callback_info *info = &socket->read_info; + void *opaque = tcp->read_user_data; + + GPR_ASSERT(tcp->outstanding_read); + + if (!success) { + tcp_unref(tcp); + cb(opaque, NULL, 0, GRPC_ENDPOINT_CB_SHUTDOWN); + return; + } + + gpr_log(GPR_DEBUG, "on_read"); + tcp->outstanding_read = 0; + + if (socket->read_info.wsa_error != 0) { + char *utf8_message = gpr_format_message(info->wsa_error); + __debugbreak(); + gpr_log(GPR_ERROR, "ReadFile overlapped error: %s", utf8_message); + gpr_free(utf8_message); + status = GRPC_ENDPOINT_CB_ERROR; + } else { + if (info->bytes_transfered != 0) { + sub = gpr_slice_sub(tcp->read_slice, 0, info->bytes_transfered); + gpr_log(GPR_DEBUG, "on_read: calling callback"); + status = GRPC_ENDPOINT_CB_OK; + slice = ⊂ + nslices = 1; + } else { + gpr_log(GPR_DEBUG, "on_read: closed socket"); + gpr_slice_unref(tcp->read_slice); + status = GRPC_ENDPOINT_CB_EOF; + } + } + tcp_unref(tcp); + cb(opaque, slice, nslices, status); +} + +static void win_notify_on_read(grpc_endpoint *ep, + grpc_endpoint_read_cb cb, void *arg) { + grpc_tcp *tcp = (grpc_tcp *) ep; + grpc_winsocket *handle = tcp->socket; + grpc_winsocket_callback_info *info = &handle->read_info; + int status; + DWORD bytes_read = 0; + DWORD flags = 0; + int error; + WSABUF buffer; + + GPR_ASSERT(!tcp->outstanding_read); + tcp_ref(tcp); + tcp->outstanding_read = 1; + tcp->read_cb = cb; + tcp->read_user_data = arg; + + tcp->read_slice = gpr_slice_malloc(8192); + + buffer.len = GPR_SLICE_LENGTH(tcp->read_slice); + buffer.buf = GPR_SLICE_START_PTR(tcp->read_slice); + + gpr_log(GPR_DEBUG, "win_notify_on_read: calling WSARecv without overlap"); + status = WSARecv(tcp->socket->socket, &buffer, 1, &bytes_read, &flags, + NULL, NULL); + info->wsa_error = status == 0 ? 0 : WSAGetLastError(); + + if (info->wsa_error != WSAEWOULDBLOCK) { + gpr_log(GPR_DEBUG, "got response immediately, calling on_read"); + info->bytes_transfered = bytes_read; + /* This might heavily recurse. */ + on_read(tcp, 1); + return; + } + + gpr_log(GPR_DEBUG, "got WSAEWOULDBLOCK - calling WSARecv with overlap"); + + memset(&tcp->socket->read_info.overlapped, 0, sizeof(OVERLAPPED)); + status = WSARecv(tcp->socket->socket, &buffer, 1, &bytes_read, &flags, + &info->overlapped, NULL); + + if (status == 0) { + gpr_log(GPR_DEBUG, "got response immediately, but we're going to sleep"); + grpc_socket_notify_on_read(tcp->socket, on_read, tcp); + return; + } + + error = WSAGetLastError(); + + if (error != WSA_IO_PENDING) { + char *utf8_message = gpr_format_message(WSAGetLastError()); + __debugbreak(); + gpr_log(GPR_ERROR, "WSARecv error: %s", utf8_message); + gpr_free(utf8_message); + /* would the IO completion port be called anyway... ? Let's assume not. */ + tcp->outstanding_read = 0; + tcp_unref(tcp); + cb(arg, NULL, 0, GRPC_ENDPOINT_CB_ERROR); + return; + } + + gpr_log(GPR_DEBUG, "waiting on the IO completion port now"); + grpc_socket_notify_on_read(tcp->socket, on_read, tcp); +} + +static void on_write(void *tcpp, int success) { + grpc_tcp *tcp = (grpc_tcp *) tcpp; + grpc_winsocket *handle = tcp->socket; + grpc_winsocket_callback_info *info = &handle->write_info; + grpc_endpoint_cb_status status = GRPC_ENDPOINT_CB_OK; + grpc_endpoint_write_cb cb = tcp->write_cb; + void *opaque = tcp->write_user_data; + + GPR_ASSERT(tcp->outstanding_write); + + gpr_log(GPR_DEBUG, "on_write"); + + if (!success) { + tcp_unref(tcp); + cb(opaque, GRPC_ENDPOINT_CB_SHUTDOWN); + return; + } + + if (info->wsa_error != 0) { + char *utf8_message = gpr_format_message(info->wsa_error); + gpr_log(GPR_ERROR, "WSASend overlapped error: %s", utf8_message); + gpr_free(utf8_message); + status = GRPC_ENDPOINT_CB_ERROR; + } else { + GPR_ASSERT(info->bytes_transfered == tcp->write_slices.length); + } + + gpr_slice_buffer_reset_and_unref(&tcp->write_slices); + tcp->outstanding_write = 0; + + tcp_unref(tcp); + cb(opaque, status); +} + +static grpc_endpoint_write_status win_write(grpc_endpoint *ep, + gpr_slice *slices, size_t nslices, + grpc_endpoint_write_cb cb, + void *arg) { + grpc_tcp *tcp = (grpc_tcp *) ep; + grpc_winsocket *socket = tcp->socket; + grpc_winsocket_callback_info *info = &socket->write_info; + unsigned i; + DWORD bytes_sent; + int status; + WSABUF local_buffers[16]; + WSABUF *allocated = NULL; + WSABUF *buffers = local_buffers; + + GPR_ASSERT(nslices != 0); + GPR_ASSERT(GPR_SLICE_LENGTH(slices[0]) != 0); + GPR_ASSERT(!tcp->outstanding_write); + tcp_ref(tcp); + + gpr_log(GPR_DEBUG, "win_write"); + + tcp->outstanding_write = 1; + tcp->write_cb = cb; + tcp->write_user_data = arg; + gpr_slice_buffer_addn(&tcp->write_slices, slices, nslices); + + if (tcp->write_slices.count > GPR_ARRAY_SIZE(local_buffers)) { + buffers = (WSABUF *) gpr_malloc(sizeof(WSABUF) * tcp->write_slices.count); + allocated = buffers; + } + + for (i = 0; i < tcp->write_slices.count; i++) { + buffers[i].len = GPR_SLICE_LENGTH(tcp->write_slices.slices[i]); + buffers[i].buf = GPR_SLICE_START_PTR(tcp->write_slices.slices[i]); + } + + gpr_log(GPR_DEBUG, "win_write: calling WSASend without overlap"); + status = WSASend(socket->socket, buffers, tcp->write_slices.count, + &bytes_sent, 0, NULL, NULL); + info->wsa_error = status == 0 ? 0 : WSAGetLastError(); + + if (info->wsa_error != WSAEWOULDBLOCK) { + grpc_endpoint_write_status ret = GRPC_ENDPOINT_WRITE_ERROR; + gpr_log(GPR_DEBUG, "got response immediately, cleaning up and leaving"); + if (status == 0) { + ret = GRPC_ENDPOINT_WRITE_DONE; + GPR_ASSERT(bytes_sent == tcp->write_slices.length); + } else { + char *utf8_message = gpr_format_message(info->wsa_error); + gpr_log(GPR_ERROR, "WSASend error: %s", utf8_message); + gpr_free(utf8_message); + } + if (allocated) gpr_free(allocated); + gpr_slice_buffer_reset_and_unref(&tcp->write_slices); + tcp->outstanding_write = 0; + tcp_unref(tcp); + return ret; + } + + gpr_log(GPR_DEBUG, "got WSAEWOULDBLOCK - calling WSASend with overlap"); + + memset(&socket->write_info, 0, sizeof(OVERLAPPED)); + status = WSASend(socket->socket, buffers, tcp->write_slices.count, + &bytes_sent, 0, &socket->write_info.overlapped, NULL); + if (allocated) gpr_free(allocated); + + if (status != 0) { + int error = WSAGetLastError(); + if (error != WSA_IO_PENDING) { + char *utf8_message = gpr_format_message(WSAGetLastError()); + __debugbreak(); + gpr_log(GPR_ERROR, "WSASend error: %s", utf8_message); + gpr_free(utf8_message); + /* would the IO completion port be called anyway ? Let's assume not. */ + tcp->outstanding_write = 0; + tcp_unref(tcp); + return GRPC_ENDPOINT_WRITE_ERROR; + } + gpr_log(GPR_DEBUG, "win_write: got pending op"); + } else { + gpr_log(GPR_DEBUG, "wrote data immediately - but we're going to sleep"); + } + + grpc_socket_notify_on_write(socket, on_write, tcp); + return GRPC_ENDPOINT_WRITE_PENDING; +} + +static void win_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) { + grpc_tcp *tcp = (grpc_tcp *) ep; + gpr_log(GPR_DEBUG, "win_add_to_pollset"); + grpc_iocp_add_socket(tcp->socket); +} + +static void win_shutdown(grpc_endpoint *ep) { + grpc_tcp *tcp = (grpc_tcp *) ep; + gpr_log(GPR_DEBUG, "win_shutdown"); + grpc_winsocket_shutdown(tcp->socket); +} + +static void win_destroy(grpc_endpoint *ep) { + grpc_tcp *tcp = (grpc_tcp *) ep; + gpr_log(GPR_DEBUG, "win_destroy"); + tcp_unref(tcp); +} + +static grpc_endpoint_vtable vtable = { + win_notify_on_read, win_write, win_add_to_pollset, win_shutdown, win_destroy +}; + +grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket) { + grpc_tcp *tcp = (grpc_tcp *) gpr_malloc(sizeof(grpc_tcp)); + memset(tcp, 0, sizeof(grpc_tcp)); + tcp->base.vtable = &vtable; + tcp->socket = socket; + gpr_slice_buffer_init(&tcp->write_slices); + gpr_ref_init(&tcp->refcount, 1); + return &tcp->base; +} + +#endif /* GPR_WINSOCK_SOCKET */ diff --git a/src/core/iomgr/tcp_windows.h b/src/core/iomgr/tcp_windows.h new file mode 100644 index 00000000000..cbe60801b49 --- /dev/null +++ b/src/core/iomgr/tcp_windows.h @@ -0,0 +1,57 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPC_INTERNAL_IOMGR_TCP_WINDOWS_H__ +#define __GRPC_INTERNAL_IOMGR_TCP_WINDOWS_H__ +/* + Low level TCP "bottom half" implementation, for use by transports built on + top of a TCP connection. + + Note that this file does not (yet) include APIs for creating the socket in + the first place. + + All calls passing slice transfer ownership of a slice refcount unless + otherwise specified. +*/ + +#include "src/core/iomgr/endpoint.h" +#include "src/core/iomgr/socket_windows.h" + +/* Create a tcp endpoint given a winsock handle. + * Takes ownership of the handle. + */ +grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket); + +int grpc_tcp_prepare_socket(SOCKET sock); + +#endif /* __GRPC_INTERNAL_IOMGR_TCP_WINDOWS_H__ */ diff --git a/src/core/support/log_win32.c b/src/core/support/log_win32.c index dc8c1d0785a..840f24f68aa 100644 --- a/src/core/support/log_win32.c +++ b/src/core/support/log_win32.c @@ -35,11 +35,16 @@ #ifdef GPR_WIN32 -#include -#include #include #include +#include +#include +#include +#include + +#include "src/core/support/string_win32.h" + void gpr_log(const char *file, int line, gpr_log_severity severity, const char *format, ...) { char *message = NULL; @@ -74,8 +79,35 @@ void gpr_log(const char *file, int line, gpr_log_severity severity, /* Simple starter implementation */ void gpr_default_log(gpr_log_func_args *args) { - fprintf(stderr, "%s %s:%d: %s\n", gpr_log_severity_string(args->severity), + char time_buffer[64]; + gpr_timespec now = gpr_now(); + struct tm tm; + + if (localtime_s(&tm, &now.tv_sec)) { + strcpy(time_buffer, "error:localtime"); + } else if (0 == + strftime(time_buffer, sizeof(time_buffer), "%m%d %H:%M:%S", &tm)) { + strcpy(time_buffer, "error:strftime"); + } + + fprintf(stderr, "%s%s.%09u %5u %s:%d: %s\n", + gpr_log_severity_string(args->severity), time_buffer, + (int)(now.tv_nsec), GetCurrentThreadId(), args->file, args->line, args->message); } -#endif +char *gpr_format_message(DWORD messageid) { + LPTSTR tmessage; + char *message; + DWORD status = FormatMessage(FORMAT_MESSAGE_ALLOCATE_BUFFER | + FORMAT_MESSAGE_FROM_SYSTEM | + FORMAT_MESSAGE_IGNORE_INSERTS, + NULL, messageid, + MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), + (LPTSTR)(&tmessage), 0, NULL); + message = gpr_tchar_to_char(tmessage); + LocalFree(tmessage); + return message; +} + +#endif /* GPR_WIN32 */ diff --git a/src/core/surface/call.c b/src/core/surface/call.c index cc7094a0ce2..b31f4f1931d 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -1264,7 +1264,10 @@ grpc_call_error grpc_call_server_accept_old(grpc_call *call, ls = get_legacy_state(call); err = bind_cq(call, cq); - if (err != GRPC_CALL_OK) return err; + if (err != GRPC_CALL_OK) { + unlock(call); + return err; + } ls->finished_tag = finished_tag; diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 8455ccf5c98..b28a52bcbdd 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -74,13 +74,15 @@ typedef struct { void *tag; union { struct { - grpc_completion_queue *cq; + grpc_completion_queue *cq_new; + grpc_completion_queue *cq_bind; grpc_call **call; grpc_call_details *details; grpc_metadata_array *initial_metadata; } batch; struct { - grpc_completion_queue *cq; + grpc_completion_queue *cq_new; + grpc_completion_queue *cq_bind; grpc_call **call; registered_method *registered_method; gpr_timespec *deadline; @@ -99,7 +101,7 @@ typedef struct { struct registered_method { char *method; char *host; - call_link pending; + call_data *pending; requested_call_array requested; registered_method *next; }; @@ -118,6 +120,9 @@ struct channel_data { /* linked list of all channels on a server */ channel_data *next; channel_data *prev; + channel_registered_method *registered_methods; + gpr_uint32 registered_method_slots; + gpr_uint32 registered_method_max_probes; }; struct grpc_server { @@ -167,8 +172,10 @@ struct call_data { legacy_data *legacy; - gpr_uint8 included[CALL_LIST_COUNT]; + call_data **root[CALL_LIST_COUNT]; call_link links[CALL_LIST_COUNT]; + + grpc_completion_queue *cq_new; }; #define SERVER_FROM_CALL_ELEM(elem) \ @@ -180,30 +187,30 @@ static void begin_call(grpc_server *server, call_data *calld, requested_call *rc); static void fail_call(grpc_server *server, requested_call *rc); -static int call_list_join(grpc_server *server, call_data *call, +static int call_list_join(call_data **root, call_data *call, call_list list) { - if (call->included[list]) return 0; - call->included[list] = 1; - if (!server->lists[list]) { - server->lists[list] = call; + GPR_ASSERT(!call->root[list]); + call->root[list] = root; + if (!*root) { + *root = call; call->links[list].next = call->links[list].prev = call; } else { - call->links[list].next = server->lists[list]; - call->links[list].prev = server->lists[list]->links[list].prev; + call->links[list].next = *root; + call->links[list].prev = (*root)->links[list].prev; call->links[list].next->links[list].prev = call->links[list].prev->links[list].next = call; } return 1; } -static call_data *call_list_remove_head(grpc_server *server, call_list list) { - call_data *out = server->lists[list]; +static call_data *call_list_remove_head(call_data **root, call_list list) { + call_data *out = *root; if (out) { - out->included[list] = 0; + out->root[list] = NULL; if (out->links[list].next == out) { - server->lists[list] = NULL; + *root = NULL; } else { - server->lists[list] = out->links[list].next; + *root = out->links[list].next; out->links[list].next->links[list].prev = out->links[list].prev; out->links[list].prev->links[list].next = out->links[list].next; } @@ -211,18 +218,18 @@ static call_data *call_list_remove_head(grpc_server *server, call_list list) { return out; } -static int call_list_remove(grpc_server *server, call_data *call, - call_list list) { - if (!call->included[list]) return 0; - call->included[list] = 0; - if (server->lists[list] == call) { - server->lists[list] = call->links[list].next; - if (server->lists[list] == call) { - server->lists[list] = NULL; +static int call_list_remove(call_data *call, call_list list) { + call_data **root = call->root[list]; + if (root == NULL) return 0; + call->root[list] = NULL; + if (*root == call) { + *root = call->links[list].next; + if (*root == call) { + *root = NULL; return 1; } } - GPR_ASSERT(server->lists[list] != call); + GPR_ASSERT(*root != call); call->links[list].next->links[list].prev = call->links[list].prev; call->links[list].prev->links[list].next = call->links[list].next; return 1; @@ -283,23 +290,53 @@ static void destroy_channel(channel_data *chand) { grpc_iomgr_add_callback(finish_destroy_channel, chand); } +static void finish_start_new_rpc_and_unlock(grpc_server *server, grpc_call_element *elem, call_data **pending_root, requested_call_array *array) { + requested_call rc; + call_data *calld = elem->call_data; + if (array->count == 0) { + calld->state = PENDING; + call_list_join(pending_root, calld, PENDING_START); + gpr_mu_unlock(&server->mu); + } else { + rc = server->requested_calls.calls[--server->requested_calls.count]; + calld->state = ACTIVATED; + gpr_mu_unlock(&server->mu); + begin_call(server, calld, &rc); + } +} + static void start_new_rpc(grpc_call_element *elem) { channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; grpc_server *server = chand->server; + gpr_uint32 i; + gpr_uint32 hash; + channel_registered_method *rm; gpr_mu_lock(&server->mu); - if (server->requested_calls.count > 0) { - requested_call rc = - server->requested_calls.calls[--server->requested_calls.count]; - calld->state = ACTIVATED; - gpr_mu_unlock(&server->mu); - begin_call(server, calld, &rc); - } else { - calld->state = PENDING; - call_list_join(server, calld, PENDING_START); - gpr_mu_unlock(&server->mu); + if (chand->registered_methods && calld->path && calld->host) { + /* check for an exact match with host */ + hash = GRPC_MDSTR_KV_HASH(calld->host->hash, calld->path->hash); + for (i = 0; i < chand->registered_method_max_probes; i++) { + rm = &chand->registered_methods[(hash + i) % chand->registered_method_slots]; + if (!rm) break; + if (rm->host != calld->host) continue; + if (rm->method != calld->path) continue; + finish_start_new_rpc_and_unlock(server, elem, &rm->server_registered_method->pending, &rm->server_registered_method->requested); + return; + } + /* check for a wildcard method definition (no host set) */ + hash = GRPC_MDSTR_KV_HASH(0, calld->path->hash); + for (i = 0; i < chand->registered_method_max_probes; i++) { + rm = &chand->registered_methods[(hash + i) % chand->registered_method_slots]; + if (!rm) break; + if (rm->host != NULL) continue; + if (rm->method != calld->path) continue; + finish_start_new_rpc_and_unlock(server, elem, &rm->server_registered_method->pending, &rm->server_registered_method->requested); + return; + } } + finish_start_new_rpc_and_unlock(server, elem, &server->lists[PENDING_START], &server->requested_calls); } static void kill_zombie(void *elem, int success) { @@ -314,7 +351,7 @@ static void stream_closed(grpc_call_element *elem) { case ACTIVATED: break; case PENDING: - call_list_remove(chand->server, calld, PENDING_START); + call_list_remove(calld, PENDING_START); /* fallthrough intended */ case NOT_STARTED: calld->state = ZOMBIED; @@ -445,7 +482,7 @@ static void init_call_elem(grpc_call_element *elem, calld->call = grpc_call_from_top_element(elem); gpr_mu_lock(&chand->server->mu); - call_list_join(chand->server, calld, ALL_CALLS); + call_list_join(&chand->server->lists[ALL_CALLS], calld, ALL_CALLS); gpr_mu_unlock(&chand->server->mu); server_ref(chand->server); @@ -458,7 +495,7 @@ static void destroy_call_elem(grpc_call_element *elem) { gpr_mu_lock(&chand->server->mu); for (i = 0; i < CALL_LIST_COUNT; i++) { - call_list_remove(chand->server, elem->call_data, i); + call_list_remove(elem->call_data, i); } if (chand->server->shutdown && chand->server->have_shutdown_tag && chand->server->lists[ALL_CALLS] == NULL) { @@ -493,6 +530,7 @@ static void init_channel_elem(grpc_channel_element *elem, chand->path_key = grpc_mdstr_from_string(metadata_context, ":path"); chand->authority_key = grpc_mdstr_from_string(metadata_context, ":authority"); chand->next = chand->prev = chand; + chand->registered_methods = NULL; } static void destroy_channel_elem(grpc_channel_element *elem) { @@ -600,8 +638,18 @@ grpc_transport_setup_result grpc_server_setup_transport( grpc_channel_filter const **filters = gpr_malloc(sizeof(grpc_channel_filter *) * num_filters); size_t i; + size_t num_registered_methods; + size_t alloc; + registered_method *rm; + channel_registered_method *crm; grpc_channel *channel; channel_data *chand; + grpc_mdstr *host; + grpc_mdstr *method; + gpr_uint32 hash; + gpr_uint32 slots; + gpr_uint32 probes; + gpr_uint32 max_probes = 0; for (i = 0; i < s->channel_filter_count; i++) { filters[i] = s->channel_filters[i]; @@ -621,6 +669,32 @@ grpc_transport_setup_result grpc_server_setup_transport( server_ref(s); chand->channel = channel; + num_registered_methods = 0; + for (rm = s->registered_methods; rm; rm = rm->next) { + num_registered_methods++; + } + /* build a lookup table phrased in terms of mdstr's in this channels context + to quickly find registered methods */ + if (num_registered_methods > 0) { + slots = 2 * num_registered_methods; + alloc = sizeof(channel_registered_method) * slots; + chand->registered_methods = gpr_malloc(alloc); + memset(chand->registered_methods, 0, alloc); + for (rm = s->registered_methods; rm; rm = rm->next) { + host = rm->host ? grpc_mdstr_from_string(mdctx, rm->host) : NULL; + method = grpc_mdstr_from_string(mdctx, rm->host); + hash = GRPC_MDSTR_KV_HASH(host ? host->hash : 0, method->hash); + for (probes = 0; chand->registered_methods[(hash + probes) % slots].server_registered_method != NULL; probes++); + if (probes > max_probes) max_probes = probes; + crm = &chand->registered_methods[(hash + probes) % slots]; + crm->server_registered_method = rm; + crm->host = host; + crm->method = method; + } + chand->registered_method_slots = slots; + chand->registered_method_max_probes = max_probes; + } + gpr_mu_lock(&s->mu); chand->next = &s->root_channel_data; chand->prev = chand->next->prev; @@ -752,7 +826,15 @@ static grpc_call_error queue_call_request(grpc_server *server, fail_call(server, rc); return GRPC_CALL_OK; } - calld = call_list_remove_head(server, PENDING_START); + switch (rc->type) { + case LEGACY_CALL: + case BATCH_CALL: + calld = call_list_remove_head(&server->lists[PENDING_START], PENDING_START); + break; + case REGISTERED_CALL: + calld = call_list_remove_head(&rc->data.registered.registered_method->pending, PENDING_START); + break; + } if (calld) { GPR_ASSERT(calld->state == PENDING); calld->state = ACTIVATED; @@ -769,12 +851,14 @@ static grpc_call_error queue_call_request(grpc_server *server, grpc_call_error grpc_server_request_call(grpc_server *server, grpc_call **call, grpc_call_details *details, grpc_metadata_array *initial_metadata, - grpc_completion_queue *cq, void *tag) { + grpc_completion_queue *cq_new, + grpc_completion_queue *cq_bind, void *tag) { requested_call rc; - grpc_cq_begin_op(server->cq, NULL, GRPC_OP_COMPLETE); + grpc_cq_begin_op(cq_new, NULL, GRPC_OP_COMPLETE); rc.type = BATCH_CALL; rc.tag = tag; - rc.data.batch.cq = cq; + rc.data.batch.cq_new = cq_new; + rc.data.batch.cq_bind = cq_bind; rc.data.batch.call = call; rc.data.batch.details = details; rc.data.batch.initial_metadata = initial_metadata; @@ -784,12 +868,14 @@ grpc_call_error grpc_server_request_call(grpc_server *server, grpc_call **call, grpc_call_error grpc_server_request_registered_call( grpc_server *server, void *registered_method, grpc_call **call, gpr_timespec *deadline, grpc_metadata_array *initial_metadata, - grpc_byte_buffer **optional_payload, grpc_completion_queue *cq, void *tag) { + grpc_byte_buffer **optional_payload, grpc_completion_queue *cq_new, grpc_completion_queue *cq_bind, + void *tag) { requested_call rc; - grpc_cq_begin_op(server->cq, NULL, GRPC_OP_COMPLETE); + grpc_cq_begin_op(cq_new, NULL, GRPC_OP_COMPLETE); rc.type = REGISTERED_CALL; rc.tag = tag; - rc.data.registered.cq = cq; + rc.data.registered.cq_new = cq_new; + rc.data.registered.cq_bind = cq_bind; rc.data.registered.call = call; rc.data.registered.registered_method = registered_method; rc.data.registered.deadline = deadline; @@ -848,16 +934,17 @@ static void begin_call(grpc_server *server, call_data *calld, &rc->data.batch.details->host_capacity, calld->host); cpstr(&rc->data.batch.details->method, &rc->data.batch.details->method_capacity, calld->path); - grpc_call_set_completion_queue(calld->call, rc->data.batch.cq); + grpc_call_set_completion_queue(calld->call, rc->data.batch.cq_bind); *rc->data.batch.call = calld->call; r->op = GRPC_IOREQ_RECV_INITIAL_METADATA; r->data.recv_metadata = rc->data.batch.initial_metadata; r++; + calld->cq_new = rc->data.batch.cq_new; publish = publish_registered_or_batch; break; case REGISTERED_CALL: *rc->data.registered.deadline = calld->deadline; - grpc_call_set_completion_queue(calld->call, rc->data.registered.cq); + grpc_call_set_completion_queue(calld->call, rc->data.registered.cq_bind); *rc->data.registered.call = calld->call; r->op = GRPC_IOREQ_RECV_INITIAL_METADATA; r->data.recv_metadata = rc->data.registered.initial_metadata; @@ -867,6 +954,7 @@ static void begin_call(grpc_server *server, call_data *calld, r->data.recv_message = rc->data.registered.optional_payload; r++; } + calld->cq_new = rc->data.registered.cq_new; publish = publish_registered_or_batch; break; } @@ -885,13 +973,13 @@ static void fail_call(grpc_server *server, requested_call *rc) { case BATCH_CALL: *rc->data.batch.call = NULL; rc->data.batch.initial_metadata->count = 0; - grpc_cq_end_op_complete(rc->data.batch.cq, rc->tag, NULL, do_nothing, + grpc_cq_end_op_complete(rc->data.batch.cq_new, rc->tag, NULL, do_nothing, NULL, GRPC_OP_ERROR); break; case REGISTERED_CALL: *rc->data.registered.call = NULL; rc->data.registered.initial_metadata->count = 0; - grpc_cq_end_op_complete(rc->data.registered.cq, rc->tag, NULL, do_nothing, + grpc_cq_end_op_complete(rc->data.registered.cq_new, rc->tag, NULL, do_nothing, NULL, GRPC_OP_ERROR); break; } @@ -920,9 +1008,9 @@ static void publish_registered_or_batch(grpc_call *call, grpc_op_error status, void *tag) { grpc_call_element *elem = grpc_call_stack_element(grpc_call_get_call_stack(call), 0); - channel_data *chand = elem->channel_data; - grpc_server *server = chand->server; - grpc_cq_end_op_complete(server->cq, tag, call, do_nothing, NULL, status); + call_data *calld = elem->call_data; + grpc_cq_end_op_complete(calld->cq_new, tag, call, + do_nothing, NULL, status); } const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server) { diff --git a/src/cpp/common/call.cc b/src/cpp/common/call.cc index 0315ffb6bdd..1aa79d46150 100644 --- a/src/cpp/common/call.cc +++ b/src/cpp/common/call.cc @@ -31,13 +31,140 @@ * */ +#include #include #include +#include "src/cpp/proto/proto_utils.h" + namespace grpc { +void CallOpBuffer::Reset(void* next_return_tag) { + return_tag_ = next_return_tag; + initial_metadata_count_ = 0; + if (initial_metadata_) { + gpr_free(initial_metadata_); + } + send_message_ = nullptr; + if (write_buffer_) { + grpc_byte_buffer_destroy(write_buffer_); + write_buffer_ = nullptr; + } + recv_message_ = nullptr; + if (recv_message_buf_) { + grpc_byte_buffer_destroy(recv_message_buf_); + recv_message_buf_ = nullptr; + } + client_send_close_ = false; + recv_status_ = nullptr; + status_code_ = GRPC_STATUS_OK; + if (status_details_) { + gpr_free(status_details_); + status_details_ = nullptr; + } + status_details_capacity_ = 0; +} + +namespace { +// TODO(yangg) if the map is changed before we send, the pointers will be a +// mess. Make sure it does not happen. +grpc_metadata* FillMetadata( + std::multimap* metadata) { + if (metadata->empty()) { return nullptr; } + grpc_metadata* metadata_array = (grpc_metadata*)gpr_malloc( + metadata->size()* sizeof(grpc_metadata)); + size_t i = 0; + for (auto iter = metadata->cbegin(); + iter != metadata->cend(); + ++iter, ++i) { + metadata_array[i].key = iter->first.c_str(); + metadata_array[i].value = iter->second.c_str(); + metadata_array[i].value_length = iter->second.size(); + } + return metadata_array; +} +} // namespace + +void CallOpBuffer::AddSendInitialMetadata( + std::multimap* metadata) { + initial_metadata_count_ = metadata->size(); + initial_metadata_ = FillMetadata(metadata); +} + +void CallOpBuffer::AddSendMessage(const google::protobuf::Message& message) { + send_message_ = &message; +} + +void CallOpBuffer::AddRecvMessage(google::protobuf::Message *message) { + recv_message_ = message; +} + +void CallOpBuffer::AddClientSendClose() { + client_send_close_ = true; +} + +void CallOpBuffer::AddClientRecvStatus(Status *status) { + recv_status_ = status; +} + + +void CallOpBuffer::FillOps(grpc_op *ops, size_t *nops) { + *nops = 0; + if (initial_metadata_count_) { + ops[*nops].op = GRPC_OP_SEND_INITIAL_METADATA; + ops[*nops].data.send_initial_metadata.count = initial_metadata_count_; + ops[*nops].data.send_initial_metadata.metadata = initial_metadata_; + (*nops)++; + } + if (send_message_) { + bool success = SerializeProto(*send_message_, &write_buffer_); + if (!success) { + // TODO handle parse failure + } + ops[*nops].op = GRPC_OP_SEND_MESSAGE; + ops[*nops].data.send_message = write_buffer_; + (*nops)++; + } + if (recv_message_) { + ops[*nops].op = GRPC_OP_RECV_MESSAGE; + ops[*nops].data.recv_message = &recv_message_buf_; + (*nops)++; + } + if (client_send_close_) { + ops[*nops].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + (*nops)++; + } + if (recv_status_) { + ops[*nops].op = GRPC_OP_RECV_STATUS_ON_CLIENT; + // ops[*nops].data.recv_status_on_client.trailing_metadata = + ops[*nops].data.recv_status_on_client.status = &status_code_; + ops[*nops].data.recv_status_on_client.status_details = &status_details_; + ops[*nops].data.recv_status_on_client.status_details_capacity = &status_details_capacity_; + (*nops)++; + } +} + +void CallOpBuffer::ReleaseSendBuffer() { + if (write_buffer_) { + grpc_byte_buffer_destroy(write_buffer_); + write_buffer_ = nullptr; + } +} + +void CallOpBuffer::FinalizeResult(void *tag, bool *status) { + +} + +void CCallDeleter::operator()(grpc_call* c) { + grpc_call_destroy(c); +} + +Call::Call(grpc_call* call, ChannelInterface* channel, CompletionQueue* cq) + : channel_(channel), cq_(cq), call_(call) {} + void Call::PerformOps(CallOpBuffer* buffer) { channel_->PerformOpsOnCall(buffer, this); + buffer->ReleaseSendBuffer(); } } // namespace grpc diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc index 5d44ab2ba42..f5bbfdc6f73 100644 --- a/src/cpp/server/server.cc +++ b/src/cpp/server/server.cc @@ -54,9 +54,9 @@ Server::Server(ThreadPoolInterface *thread_pool, bool thread_pool_owned, ServerC secure_(creds != nullptr) { if (creds) { server_ = - grpc_secure_server_create(creds->GetRawCreds(), cq_.cq(), nullptr); + grpc_secure_server_create(creds->GetRawCreds(), nullptr, nullptr); } else { - server_ = grpc_server_create(cq_.cq(), nullptr); + server_ = grpc_server_create(nullptr, nullptr); } } @@ -80,13 +80,17 @@ Server::~Server() { } bool Server::RegisterService(RpcService *service) { + if (!cq_sync_) { + cq_sync_.reset(new CompletionQueue); + } for (int i = 0; i < service->GetMethodCount(); ++i) { RpcServiceMethod *method = service->GetMethod(i); - if (method_map_.find(method->name()) != method_map_.end()) { + void *tag = grpc_server_register_method(server_, method->name(), nullptr); + if (!tag) { gpr_log(GPR_DEBUG, "Attempt to register %s multiple times", method->name()); return false; } - method_map_.insert(std::make_pair(method->name(), method)); + methods_.emplace_back(method, tag); } return true; } @@ -106,7 +110,11 @@ bool Server::Start() { grpc_server_start(server_); // Start processing rpcs. - if (thread_pool_) { + if (cq_sync_) { + for (auto& m : methods_) { + m.Request(cq_sync_.get()); + } + ScheduleCallback(); } @@ -126,12 +134,6 @@ void Server::Shutdown() { } } } - - // Shutdown the completion queue. - cq_.Shutdown(); - void *tag = nullptr; - bool ok = false; - GPR_ASSERT(false == cq_.Next(&tag, &ok)); } void Server::ScheduleCallback() { @@ -144,35 +146,15 @@ void Server::ScheduleCallback() { void Server::RunRpc() { // Wait for one more incoming rpc. - void *tag = nullptr; - GPR_ASSERT(started_); - grpc_call *c_call = NULL; - grpc_call_details call_details; - grpc_call_details_init(&call_details); - grpc_metadata_array initial_metadata; - grpc_metadata_array_init(&initial_metadata); - CompletionQueue cq; - grpc_call_error err = grpc_server_request_call(server_, &c_call, &call_details, &initial_metadata, cq.cq(), nullptr); - GPR_ASSERT(err == GRPC_CALL_OK); - bool ok = false; - GPR_ASSERT(cq_.Next(&tag, &ok)); - if (ok) { - ServerContext context; - Call call(c_call, nullptr, &cq); + auto* mrd = MethodRequestData::Wait(cq_sync_.get()); + if (mrd) { + MethodRequestData::CallData cd(mrd); + + mrd->Request(cq_sync_.get()); ScheduleCallback(); - RpcServiceMethod *method = nullptr; - auto iter = method_map_.find(call_details.method); - if (iter != method_map_.end()) { - method = iter->second; - } - // TODO(ctiller): allocate only if necessary - std::unique_ptr request(method->AllocateRequestProto()); - std::unique_ptr response(method->AllocateResponseProto()); - method->handler()->RunHandler(MethodHandler::HandlerParameter( - &call, &context, request.get(), response.get())); + + cd.Run(); } - grpc_call_details_destroy(&call_details); - grpc_metadata_array_destroy(&initial_metadata); { std::unique_lock lock(mu_); diff --git a/src/csharp/.gitignore b/src/csharp/.gitignore index dbf38f34b73..d35ff63f6ef 100644 --- a/src/csharp/.gitignore +++ b/src/csharp/.gitignore @@ -1,2 +1,4 @@ *.userprefs test-results +packages +Grpc.v12.suo diff --git a/src/csharp/GrpcApi/.gitignore b/src/csharp/GrpcApi/.gitignore index 2cc8cca52d0..4795a95b94e 100644 --- a/src/csharp/GrpcApi/.gitignore +++ b/src/csharp/GrpcApi/.gitignore @@ -1,2 +1,3 @@ test-results bin +obj diff --git a/src/csharp/GrpcApi/GrpcApi.csproj b/src/csharp/GrpcApi/GrpcApi.csproj index f0f11de2167..5a4ae67bd55 100644 --- a/src/csharp/GrpcApi/GrpcApi.csproj +++ b/src/csharp/GrpcApi/GrpcApi.csproj @@ -1,4 +1,4 @@ - + Debug @@ -30,19 +30,23 @@ false + + False + ..\packages\Google.ProtocolBuffers.2.4.1.521\lib\net40\Google.ProtocolBuffers.dll + - - False + + ..\packages\Rx-Core.2.2.5\lib\net45\System.Reactive.Core.dll - - - False + + ..\packages\Rx-Interfaces.2.2.5\lib\net45\System.Reactive.Interfaces.dll - - False + + + ..\packages\Rx-Linq.2.2.5\lib\net45\System.Reactive.Linq.dll - - ..\lib\Google.ProtocolBuffers.dll + + ..\packages\Rx-PlatformServices.2.2.5\lib\net45\System.Reactive.PlatformServices.dll @@ -63,12 +67,10 @@ + - - - \ No newline at end of file diff --git a/src/csharp/GrpcApi/packages.config b/src/csharp/GrpcApi/packages.config new file mode 100644 index 00000000000..a6a949b3b3a --- /dev/null +++ b/src/csharp/GrpcApi/packages.config @@ -0,0 +1,11 @@ + + + + + + + + + + + \ No newline at end of file diff --git a/src/csharp/GrpcApiTests/.gitignore b/src/csharp/GrpcApiTests/.gitignore index 2cc8cca52d0..4795a95b94e 100644 --- a/src/csharp/GrpcApiTests/.gitignore +++ b/src/csharp/GrpcApiTests/.gitignore @@ -1,2 +1,3 @@ test-results bin +obj diff --git a/src/csharp/GrpcApiTests/GrpcApiTests.csproj b/src/csharp/GrpcApiTests/GrpcApiTests.csproj index d0aac2b7533..cb955cff41b 100644 --- a/src/csharp/GrpcApiTests/GrpcApiTests.csproj +++ b/src/csharp/GrpcApiTests/GrpcApiTests.csproj @@ -1,4 +1,4 @@ - + Debug @@ -30,13 +30,14 @@ false - - - False + + False + ..\packages\Google.ProtocolBuffers.2.4.1.521\lib\net40\Google.ProtocolBuffers.dll - - ..\lib\Google.ProtocolBuffers.dll + + ..\packages\NUnit.2.6.4\lib\nunit.framework.dll + @@ -53,4 +54,10 @@ GrpcCore + + + + + + \ No newline at end of file diff --git a/src/csharp/GrpcApiTests/packages.config b/src/csharp/GrpcApiTests/packages.config new file mode 100644 index 00000000000..51c17bcd5e7 --- /dev/null +++ b/src/csharp/GrpcApiTests/packages.config @@ -0,0 +1,5 @@ + + + + + \ No newline at end of file diff --git a/src/csharp/GrpcCore/.gitignore b/src/csharp/GrpcCore/.gitignore index ba077a4031a..8d4a6c08a83 100644 --- a/src/csharp/GrpcCore/.gitignore +++ b/src/csharp/GrpcCore/.gitignore @@ -1 +1,2 @@ bin +obj \ No newline at end of file diff --git a/src/csharp/GrpcCoreTests/.gitignore b/src/csharp/GrpcCoreTests/.gitignore index 2cc8cca52d0..775a9440a2c 100644 --- a/src/csharp/GrpcCoreTests/.gitignore +++ b/src/csharp/GrpcCoreTests/.gitignore @@ -1,2 +1,3 @@ test-results bin +obj \ No newline at end of file diff --git a/src/csharp/GrpcCoreTests/GrpcCoreTests.csproj b/src/csharp/GrpcCoreTests/GrpcCoreTests.csproj index 111f0883db0..ca52cd8f40d 100644 --- a/src/csharp/GrpcCoreTests/GrpcCoreTests.csproj +++ b/src/csharp/GrpcCoreTests/GrpcCoreTests.csproj @@ -1,4 +1,4 @@ - + Debug @@ -30,10 +30,10 @@ false - - - False + + ..\packages\NUnit.2.6.4\lib\nunit.framework.dll + @@ -49,4 +49,10 @@ GrpcCore + + + + + + \ No newline at end of file diff --git a/src/csharp/GrpcCoreTests/packages.config b/src/csharp/GrpcCoreTests/packages.config new file mode 100644 index 00000000000..c714ef3a23e --- /dev/null +++ b/src/csharp/GrpcCoreTests/packages.config @@ -0,0 +1,4 @@ + + + + \ No newline at end of file diff --git a/src/csharp/InteropClient/.gitignore b/src/csharp/InteropClient/.gitignore index ba077a4031a..8d4a6c08a83 100644 --- a/src/csharp/InteropClient/.gitignore +++ b/src/csharp/InteropClient/.gitignore @@ -1 +1,2 @@ bin +obj \ No newline at end of file diff --git a/src/csharp/InteropClient/InteropClient.csproj b/src/csharp/InteropClient/InteropClient.csproj index b8e099d7852..a450f3a2feb 100644 --- a/src/csharp/InteropClient/InteropClient.csproj +++ b/src/csharp/InteropClient/InteropClient.csproj @@ -1,4 +1,4 @@ - + Debug @@ -33,13 +33,14 @@ x86 - - - False + + False + ..\packages\Google.ProtocolBuffers.2.4.1.521\lib\net40\Google.ProtocolBuffers.dll - - ..\lib\Google.ProtocolBuffers.dll + + ..\packages\NUnit.2.6.4\lib\nunit.framework.dll + @@ -56,4 +57,7 @@ GrpcApi + + + \ No newline at end of file diff --git a/src/csharp/InteropClient/packages.config b/src/csharp/InteropClient/packages.config new file mode 100644 index 00000000000..51c17bcd5e7 --- /dev/null +++ b/src/csharp/InteropClient/packages.config @@ -0,0 +1,5 @@ + + + + + \ No newline at end of file diff --git a/src/csharp/MathClient/.gitignore b/src/csharp/MathClient/.gitignore index ba077a4031a..1746e3269ed 100644 --- a/src/csharp/MathClient/.gitignore +++ b/src/csharp/MathClient/.gitignore @@ -1 +1,2 @@ bin +obj diff --git a/src/csharp/README.md b/src/csharp/README.md index 0df6925b395..a16f1e719e1 100755 --- a/src/csharp/README.md +++ b/src/csharp/README.md @@ -15,8 +15,15 @@ EXPERIMENTAL ONLY completely rewritten. -INSTALLATION AND USAGE ----------------------- +INSTALLATION AND USAGE: WINDOWS +------------------------------- + +- Open Grpc.sln using Visual Studio 2013. NuGet dependencies will be restored + upon build. + + +INSTALLATION AND USAGE: LINUX & MONO +------------------------------------ - Compile and install the gRPC C Core library ``` @@ -31,6 +38,18 @@ sudo apt-get install monodevelop monodevelop-nunit sudo apt-get install nunit nunit-console ``` +- NuGet is used to manage project's dependencies. Prior opening Grpc.sln, + download dependencies using NuGet restore command: + +``` +# Import needed certicates into Mono certificate store: +mozroots --import --sync + +# Download NuGet.exe http://nuget.codeplex.com/releases/ +# Restore the nuget packages with Grpc C# dependencies +mono ~/Downloads/NuGet.exe restore Grpc.sln +``` + - Use MonoDevelop to open the solution Grpc.sln (you can also run unit tests from there). diff --git a/src/csharp/lib/Google.ProtocolBuffers.dll b/src/csharp/lib/Google.ProtocolBuffers.dll deleted file mode 100755 index ce2f466b243..00000000000 Binary files a/src/csharp/lib/Google.ProtocolBuffers.dll and /dev/null differ diff --git a/src/python/setup.py b/src/python/setup.py index 58dc3b17dfa..5e566bad4fd 100644 --- a/src/python/setup.py +++ b/src/python/setup.py @@ -38,6 +38,7 @@ _EXTENSION_SOURCES = ( 'src/_adapter/_completion_queue.c', 'src/_adapter/_error.c', 'src/_adapter/_server.c', + 'src/_adapter/_server_credentials.c', ) _EXTENSION_INCLUDE_DIRECTORIES = ( diff --git a/src/python/src/_adapter/_c.c b/src/python/src/_adapter/_c.c index d1f7fbb0d54..6fb7fa29faf 100644 --- a/src/python/src/_adapter/_c.c +++ b/src/python/src/_adapter/_c.c @@ -38,6 +38,7 @@ #include "_adapter/_channel.h" #include "_adapter/_call.h" #include "_adapter/_server.h" +#include "_adapter/_server_credentials.h" static PyObject *init(PyObject *self, PyObject *args) { grpc_init(); @@ -74,4 +75,7 @@ PyMODINIT_FUNC init_c(void) { if (pygrpc_add_server(module) == -1) { return; } + if (pygrpc_add_server_credentials(module) == -1) { + return; + } } diff --git a/src/python/src/_adapter/_c_test.py b/src/python/src/_adapter/_c_test.py index bc0a622cc4f..19c91ffe018 100644 --- a/src/python/src/_adapter/_c_test.py +++ b/src/python/src/_adapter/_c_test.py @@ -136,6 +136,32 @@ class _CTest(unittest.TestCase): _c.shut_down() + def test_server_credentials(self): + root_certificates = b'Trust starts here. Really.' + first_private_key = b'This is a really bad private key, yo.' + first_certificate_chain = b'Trust me! Do I not look trustworty?' + second_private_key = b'This is another bad private key, yo.' + second_certificate_chain = b'Look into my eyes; you can totes trust me.' + + _c.init() + + server_credentials = _c.ServerCredentials( + None, ((first_private_key, first_certificate_chain),)) + del server_credentials + server_credentials = _c.ServerCredentials( + root_certificates, ((first_private_key, first_certificate_chain),)) + del server_credentials + server_credentials = _c.ServerCredentials( + root_certificates, + ((first_private_key, first_certificate_chain), + (second_private_key, second_certificate_chain),)) + del server_credentials + with self.assertRaises(TypeError): + _c.ServerCredentials( + root_certificates, first_private_key, second_certificate_chain) + + _c.shut_down() + if __name__ == '__main__': unittest.main() diff --git a/src/python/src/_adapter/_server_credentials.c b/src/python/src/_adapter/_server_credentials.c new file mode 100644 index 00000000000..390266ae897 --- /dev/null +++ b/src/python/src/_adapter/_server_credentials.c @@ -0,0 +1,157 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "_adapter/_server_credentials.h" + +#include +#include +#include + +static int pygrpc_server_credentials_init(ServerCredentials *self, + PyObject *args, PyObject *kwds) { + char *root_certificates; + PyObject *pair_sequence; + Py_ssize_t pair_count; + grpc_ssl_pem_key_cert_pair *pairs; + int error; + PyObject *iterator; + int i; + PyObject *pair; + + if (!(PyArg_ParseTuple(args, "zO", &root_certificates, &pair_sequence))) { + self->c_server_credentials = NULL; + return -1; + } + + pair_count = PySequence_Length(pair_sequence); + if (pair_count == -1) { + self->c_server_credentials = NULL; + return -1; + } + + iterator = PyObject_GetIter(pair_sequence); + if (iterator == NULL) { + self->c_server_credentials = NULL; + return -1; + } + pairs = gpr_malloc(pair_count * sizeof(grpc_ssl_pem_key_cert_pair)); + error = 0; + for (i = 0; i < pair_count; i++) { + pair = PyIter_Next(iterator); + if (pair == NULL) { + error = 1; + break; + } + if (!(PyArg_ParseTuple(pair, "ss", &pairs[i].private_key, + &pairs[i].cert_chain))) { + error = 1; + Py_DECREF(pair); + break; + } + Py_DECREF(pair); + } + Py_DECREF(iterator); + + if (error) { + self->c_server_credentials = NULL; + gpr_free(pairs); + return -1; + } else { + self->c_server_credentials = grpc_ssl_server_credentials_create( + root_certificates, pairs, pair_count); + gpr_free(pairs); + return 0; + } +} + +static void pygrpc_server_credentials_dealloc(ServerCredentials *self) { + if (self->c_server_credentials != NULL) { + grpc_server_credentials_release(self->c_server_credentials); + } + self->ob_type->tp_free((PyObject *)self); +} + +PyTypeObject pygrpc_ServerCredentialsType = { + PyObject_HEAD_INIT(NULL)0, /*ob_size*/ + "_grpc.ServerCredencials", /*tp_name*/ + sizeof(ServerCredentials), /*tp_basicsize*/ + 0, /*tp_itemsize*/ + (destructor)pygrpc_server_credentials_dealloc, /*tp_dealloc*/ + 0, /*tp_print*/ + 0, /*tp_getattr*/ + 0, /*tp_setattr*/ + 0, /*tp_compare*/ + 0, /*tp_repr*/ + 0, /*tp_as_number*/ + 0, /*tp_as_sequence*/ + 0, /*tp_as_mapping*/ + 0, /*tp_hash */ + 0, /*tp_call*/ + 0, /*tp_str*/ + 0, /*tp_getattro*/ + 0, /*tp_setattro*/ + 0, /*tp_as_buffer*/ + Py_TPFLAGS_DEFAULT, /*tp_flags*/ + "Wrapping of grpc_server_credentials.", /* tp_doc */ + 0, /* tp_traverse */ + 0, /* tp_clear */ + 0, /* tp_richcompare */ + 0, /* tp_weaklistoffset */ + 0, /* tp_iter */ + 0, /* tp_iternext */ + 0, /* tp_methods */ + 0, /* tp_members */ + 0, /* tp_getset */ + 0, /* tp_base */ + 0, /* tp_dict */ + 0, /* tp_descr_get */ + 0, /* tp_descr_set */ + 0, /* tp_dictoffset */ + (initproc)pygrpc_server_credentials_init, /* tp_init */ +}; + +int pygrpc_add_server_credentials(PyObject *module) { + pygrpc_ServerCredentialsType.tp_new = PyType_GenericNew; + if (PyType_Ready(&pygrpc_ServerCredentialsType) < 0) { + PyErr_SetString(PyExc_RuntimeError, + "Error defining pygrpc_ServerCredentialsType!"); + return -1; + } + if (PyModule_AddObject(module, "ServerCredentials", + (PyObject *)&pygrpc_ServerCredentialsType) == -1) { + PyErr_SetString(PyExc_ImportError, + "Couldn't add ServerCredentials type to module!"); + return -1; + } + return 0; +} diff --git a/src/python/src/_adapter/_server_credentials.h b/src/python/src/_adapter/_server_credentials.h new file mode 100644 index 00000000000..2e56efdcd9b --- /dev/null +++ b/src/python/src/_adapter/_server_credentials.h @@ -0,0 +1,48 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef _ADAPTER__SERVER_CREDENTIALS_H_ +#define _ADAPTER__SERVER_CREDENTIALS_H_ + +#include +#include + +typedef struct { + PyObject_HEAD grpc_server_credentials *c_server_credentials; +} ServerCredentials; + +PyTypeObject pygrpc_ServerCredentialsType; + +int pygrpc_add_server_credentials(PyObject *module); + +#endif /* _ADAPTER__SERVER_CREDENTIALS_H_ */ diff --git a/test/core/end2end/cq_verifier.c b/test/core/end2end/cq_verifier.c index de65ef317d7..15dc4270d6f 100644 --- a/test/core/end2end/cq_verifier.c +++ b/test/core/end2end/cq_verifier.c @@ -31,13 +31,6 @@ * */ -/* Disable sprintf warnings on Windows (it's fine to do that for test code). - Also, cases where sprintf is called are crash sites anyway. - TODO(jtattermusch): b/18636890 */ -#ifdef _MSC_VER -#define _CRT_SECURE_NO_WARNINGS -#endif - #include "test/core/end2end/cq_verifier.h" #include diff --git a/test/core/end2end/tests/cancel_after_accept.c b/test/core/end2end/tests/cancel_after_accept.c index eb26ff14f00..ab7c683e452 100644 --- a/test/core/end2end/tests/cancel_after_accept.c +++ b/test/core/end2end/tests/cancel_after_accept.c @@ -166,7 +166,8 @@ static void test_cancel_after_accept(grpc_end2end_test_config config, GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s, &call_details, &request_metadata_recv, - f.server_cq, tag(2))); + f.server_cq, f.server_cq, + tag(2))); cq_expect_completion(v_server, tag(2), GRPC_OP_OK); cq_verify(v_server); diff --git a/test/core/end2end/tests/request_response_with_binary_metadata_and_payload.c b/test/core/end2end/tests/request_response_with_binary_metadata_and_payload.c index fa5df5f5260..cb477144d3f 100644 --- a/test/core/end2end/tests/request_response_with_binary_metadata_and_payload.c +++ b/test/core/end2end/tests/request_response_with_binary_metadata_and_payload.c @@ -175,7 +175,8 @@ static void test_request_response_with_metadata_and_payload( GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s, &call_details, &request_metadata_recv, - f.server_cq, tag(101))); + f.server_cq, f.server_cq, + tag(101))); cq_expect_completion(v_server, tag(101), GRPC_OP_OK); cq_verify(v_server); diff --git a/test/core/end2end/tests/request_response_with_metadata_and_payload.c b/test/core/end2end/tests/request_response_with_metadata_and_payload.c index ad01fe70813..0d4822ec91b 100644 --- a/test/core/end2end/tests/request_response_with_metadata_and_payload.c +++ b/test/core/end2end/tests/request_response_with_metadata_and_payload.c @@ -168,7 +168,8 @@ static void test_request_response_with_metadata_and_payload( GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s, &call_details, &request_metadata_recv, - f.server_cq, tag(101))); + f.server_cq, f.server_cq, + tag(101))); cq_expect_completion(v_server, tag(101), GRPC_OP_OK); cq_verify(v_server); diff --git a/test/core/end2end/tests/request_response_with_payload.c b/test/core/end2end/tests/request_response_with_payload.c index 6b60c4da651..fe3f05fa954 100644 --- a/test/core/end2end/tests/request_response_with_payload.c +++ b/test/core/end2end/tests/request_response_with_payload.c @@ -162,7 +162,8 @@ static void request_response_with_payload(grpc_end2end_test_fixture f) { GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s, &call_details, &request_metadata_recv, - f.server_cq, tag(101))); + f.server_cq, f.server_cq, + tag(101))); cq_expect_completion(v_server, tag(101), GRPC_OP_OK); cq_verify(v_server); diff --git a/test/core/end2end/tests/request_response_with_trailing_metadata_and_payload.c b/test/core/end2end/tests/request_response_with_trailing_metadata_and_payload.c index 5878058c982..86ee405964b 100644 --- a/test/core/end2end/tests/request_response_with_trailing_metadata_and_payload.c +++ b/test/core/end2end/tests/request_response_with_trailing_metadata_and_payload.c @@ -169,7 +169,8 @@ static void test_request_response_with_metadata_and_payload( GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s, &call_details, &request_metadata_recv, - f.server_cq, tag(101))); + f.server_cq, f.server_cq, + tag(101))); cq_expect_completion(v_server, tag(101), GRPC_OP_OK); cq_verify(v_server); diff --git a/test/core/end2end/tests/request_with_large_metadata.c b/test/core/end2end/tests/request_with_large_metadata.c index 7e7bec0160c..8e5b1014f54 100644 --- a/test/core/end2end/tests/request_with_large_metadata.c +++ b/test/core/end2end/tests/request_with_large_metadata.c @@ -166,7 +166,8 @@ static void test_request_with_large_metadata(grpc_end2end_test_config config) { GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s, &call_details, &request_metadata_recv, - f.server_cq, tag(101))); + f.server_cq, f.server_cq, + tag(101))); cq_expect_completion(v_server, tag(101), GRPC_OP_OK); cq_verify(v_server); diff --git a/test/core/end2end/tests/request_with_payload.c b/test/core/end2end/tests/request_with_payload.c index 2c23f37e0c3..67b15770142 100644 --- a/test/core/end2end/tests/request_with_payload.c +++ b/test/core/end2end/tests/request_with_payload.c @@ -157,7 +157,8 @@ static void test_invoke_request_with_payload(grpc_end2end_test_config config) { GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s, &call_details, &request_metadata_recv, - f.server_cq, tag(101))); + f.server_cq, f.server_cq, + tag(101))); cq_expect_completion(v_server, tag(101), GRPC_OP_OK); cq_verify(v_server); diff --git a/test/core/end2end/tests/simple_delayed_request.c b/test/core/end2end/tests/simple_delayed_request.c index 99d1a263864..5c9109f9629 100644 --- a/test/core/end2end/tests/simple_delayed_request.c +++ b/test/core/end2end/tests/simple_delayed_request.c @@ -144,7 +144,8 @@ static void simple_delayed_request_body(grpc_end2end_test_config config, GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f->server, &s, &call_details, &request_metadata_recv, - f->server_cq, tag(101))); + f->server_cq, f->server_cq, + tag(101))); cq_expect_completion(v_server, tag(101), GRPC_OP_OK); cq_verify(v_server); diff --git a/test/core/end2end/tests/simple_request.c b/test/core/end2end/tests/simple_request.c index 0f046ae2d23..280bf98c167 100644 --- a/test/core/end2end/tests/simple_request.c +++ b/test/core/end2end/tests/simple_request.c @@ -150,7 +150,8 @@ static void simple_request_body(grpc_end2end_test_fixture f) { GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s, &call_details, &request_metadata_recv, - f.server_cq, tag(101))); + f.server_cq, f.server_cq, + tag(101))); cq_expect_completion(v_server, tag(101), GRPC_OP_OK); cq_verify(v_server); diff --git a/test/cpp/qps/server.cc b/test/cpp/qps/server.cc index 4e1d2cab0e6..718046170f0 100644 --- a/test/cpp/qps/server.cc +++ b/test/cpp/qps/server.cc @@ -44,6 +44,7 @@ #include #include #include +#include "src/cpp/server/thread_pool.h" #include "test/core/util/grpc_profiler.h" #include "test/cpp/qps/qpstest.pb.h" @@ -52,10 +53,12 @@ DEFINE_bool(enable_ssl, false, "Whether to use ssl/tls."); DEFINE_int32(port, 0, "Server port."); +DEFINE_int32(server_threads, 4, "Number of server threads."); using grpc::Server; using grpc::ServerBuilder; using grpc::ServerContext; +using grpc::ThreadPool; using grpc::testing::Payload; using grpc::testing::PayloadType; using grpc::testing::ServerStats; @@ -126,6 +129,10 @@ static void RunServer() { ServerBuilder builder; builder.AddPort(server_address); builder.RegisterService(&service); + + std::unique_ptr pool(new ThreadPool(FLAGS_server_threads)); + builder.SetThreadPool(pool.get()); + std::unique_ptr server(builder.BuildAndStart()); gpr_log(GPR_INFO, "Server listening on %s\n", server_address); diff --git a/tools/run_tests/build_python.sh b/tools/run_tests/build_python.sh index 4abb412c95a..b45b9d6106e 100755 --- a/tools/run_tests/build_python.sh +++ b/tools/run_tests/build_python.sh @@ -5,7 +5,11 @@ set -ex # change to grpc repo root cd $(dirname $0)/../.. +make -j6 + root=`pwd` virtualenv python2.7_virtual_environment -python2.7_virtual_environment/bin/pip install enum34==1.0.4 futures==2.2.0 protobuf==2.6.1 -python2.7_virtual_environment/bin/pip install src/python +ln -sf $root/include/grpc python2.7_virtual_environment/include/grpc +source python2.7_virtual_environment/bin/activate +pip install enum34==1.0.4 futures==2.2.0 protobuf==2.6.1 +CFLAGS=-I$root/include LDFLAGS=-L$root/libs/opt pip install src/python diff --git a/tools/run_tests/run_python.sh b/tools/run_tests/run_python.sh index 6e9405afb6f..7d3ee73a0e4 100755 --- a/tools/run_tests/run_python.sh +++ b/tools/run_tests/run_python.sh @@ -6,19 +6,21 @@ set -ex cd $(dirname $0)/../.. root=`pwd` +export LD_LIBRARY_PATH=$root/libs/opt +source python2.7_virtual_environment/bin/activate # TODO(issue 215): Properly itemize these in run_tests.py so that they can be parallelized. -python2.7_virtual_environment/bin/python2.7 -B -m _adapter._blocking_invocation_inline_service_test -python2.7_virtual_environment/bin/python2.7 -B -m _adapter._c_test -python2.7_virtual_environment/bin/python2.7 -B -m _adapter._event_invocation_synchronous_event_service_test -python2.7_virtual_environment/bin/python2.7 -B -m _adapter._future_invocation_asynchronous_event_service_test -python2.7_virtual_environment/bin/python2.7 -B -m _adapter._links_test -python2.7_virtual_environment/bin/python2.7 -B -m _adapter._lonely_rear_link_test -python2.7_virtual_environment/bin/python2.7 -B -m _adapter._low_test -python2.7_virtual_environment/bin/python2.7 -B -m _framework.base.packets.implementations_test -python2.7_virtual_environment/bin/python2.7 -B -m _framework.face.blocking_invocation_inline_service_test -python2.7_virtual_environment/bin/python2.7 -B -m _framework.face.event_invocation_synchronous_event_service_test -python2.7_virtual_environment/bin/python2.7 -B -m _framework.face.future_invocation_asynchronous_event_service_test -python2.7_virtual_environment/bin/python2.7 -B -m _framework.foundation._later_test -python2.7_virtual_environment/bin/python2.7 -B -m _framework.foundation._logging_pool_test +python2.7 -B -m _adapter._blocking_invocation_inline_service_test +python2.7 -B -m _adapter._c_test +python2.7 -B -m _adapter._event_invocation_synchronous_event_service_test +python2.7 -B -m _adapter._future_invocation_asynchronous_event_service_test +python2.7 -B -m _adapter._links_test +python2.7 -B -m _adapter._lonely_rear_link_test +python2.7 -B -m _adapter._low_test +python2.7 -B -m _framework.base.packets.implementations_test +python2.7 -B -m _framework.face.blocking_invocation_inline_service_test +python2.7 -B -m _framework.face.event_invocation_synchronous_event_service_test +python2.7 -B -m _framework.face.future_invocation_asynchronous_event_service_test +python2.7 -B -m _framework.foundation._later_test +python2.7 -B -m _framework.foundation._logging_pool_test # TODO(nathaniel): Get tests working under 3.4 (requires 3.X-friendly protobuf) # python3.4 -B -m unittest discover -s src/python -p '*.py' diff --git a/vsprojects/vs2013/global.props b/vsprojects/vs2013/global.props index 6a9050e3d2b..27efc13b247 100644 --- a/vsprojects/vs2013/global.props +++ b/vsprojects/vs2013/global.props @@ -1,12 +1,14 @@ - - - - - - - - $(SolutionDir)\..\..;$(SolutionDir)\..\..\include;$(SolutionDir)\..\..\third_party\zlib;$(SolutionDir)\..\third_party;$(SolutionDir)\..\..\third_party\openssl\inc32 - - - + + + + + + + + $(SolutionDir)\..\..;$(SolutionDir)\..\..\include;$(SolutionDir)\..\..\third_party\zlib;$(SolutionDir)\..\third_party;$(SolutionDir)\..\..\third_party\openssl\inc32 + _CRT_SECURE_NO_WARNINGS;_UNICODE;UNICODE;%(PreprocessorDefinitions) + EnableAllWarnings + + + \ No newline at end of file diff --git a/vsprojects/vs2013/gpr.vcxproj b/vsprojects/vs2013/gpr.vcxproj index 8276082cfda..e54029390da 100644 --- a/vsprojects/vs2013/gpr.vcxproj +++ b/vsprojects/vs2013/gpr.vcxproj @@ -83,6 +83,7 @@ + diff --git a/vsprojects/vs2013/gpr.vcxproj.filters b/vsprojects/vs2013/gpr.vcxproj.filters index 2329acc9dae..20e4e07c49d 100644 --- a/vsprojects/vs2013/gpr.vcxproj.filters +++ b/vsprojects/vs2013/gpr.vcxproj.filters @@ -129,6 +129,9 @@ include\grpc\support + + include\grpc\support + include\grpc\support diff --git a/vsprojects/vs2013/grpc.vcxproj b/vsprojects/vs2013/grpc.vcxproj index c6f2846e317..12428bd4b16 100644 --- a/vsprojects/vs2013/grpc.vcxproj +++ b/vsprojects/vs2013/grpc.vcxproj @@ -115,6 +115,7 @@ + @@ -130,9 +131,11 @@ + + @@ -253,10 +256,14 @@ + + + + @@ -275,12 +282,20 @@ + + + + + + + + diff --git a/vsprojects/vs2013/grpc.vcxproj.filters b/vsprojects/vs2013/grpc.vcxproj.filters index ce76dd8d2b9..fed8fb10bfb 100644 --- a/vsprojects/vs2013/grpc.vcxproj.filters +++ b/vsprojects/vs2013/grpc.vcxproj.filters @@ -112,12 +112,18 @@ src\core\iomgr + + src\core\iomgr + src\core\iomgr src\core\iomgr + + src\core\iomgr + src\core\iomgr @@ -145,15 +151,27 @@ src\core\iomgr + + src\core\iomgr + src\core\iomgr + + src\core\iomgr + src\core\iomgr src\core\iomgr + + src\core\iomgr + + + src\core\iomgr + src\core\iomgr @@ -437,6 +455,9 @@ src\core\iomgr + + src\core\iomgr + src\core\iomgr @@ -482,6 +503,9 @@ src\core\iomgr + + src\core\iomgr + src\core\iomgr @@ -491,6 +515,9 @@ src\core\iomgr + + src\core\iomgr + src\core\iomgr diff --git a/vsprojects/vs2013/grpc_unsecure.vcxproj b/vsprojects/vs2013/grpc_unsecure.vcxproj index c6f2846e317..12428bd4b16 100644 --- a/vsprojects/vs2013/grpc_unsecure.vcxproj +++ b/vsprojects/vs2013/grpc_unsecure.vcxproj @@ -115,6 +115,7 @@ + @@ -130,9 +131,11 @@ + + @@ -253,10 +256,14 @@ + + + + @@ -275,12 +282,20 @@ + + + + + + + + diff --git a/vsprojects/vs2013/grpc_unsecure.vcxproj.filters b/vsprojects/vs2013/grpc_unsecure.vcxproj.filters index 0e942219d07..b0964b61b33 100644 --- a/vsprojects/vs2013/grpc_unsecure.vcxproj.filters +++ b/vsprojects/vs2013/grpc_unsecure.vcxproj.filters @@ -73,12 +73,18 @@ src\core\iomgr + + src\core\iomgr + src\core\iomgr src\core\iomgr + + src\core\iomgr + src\core\iomgr @@ -106,15 +112,27 @@ src\core\iomgr + + src\core\iomgr + src\core\iomgr + + src\core\iomgr + src\core\iomgr src\core\iomgr + + src\core\iomgr + + + src\core\iomgr + src\core\iomgr @@ -362,6 +380,9 @@ src\core\iomgr + + src\core\iomgr + src\core\iomgr @@ -407,6 +428,9 @@ src\core\iomgr + + src\core\iomgr + src\core\iomgr @@ -416,6 +440,9 @@ src\core\iomgr + + src\core\iomgr + src\core\iomgr