From 5bf510bba1ef0b3889be22f027b2a54e6ab4f97c Mon Sep 17 00:00:00 2001 From: yang-g Date: Tue, 14 Jul 2015 10:54:29 -0700 Subject: [PATCH 01/20] add per_rpc_creds test case in interop test --- test/cpp/interop/client.cc | 7 +++++- test/cpp/interop/interop_client.cc | 35 ++++++++++++++++++++++++++++++ test/cpp/interop/interop_client.h | 3 +++ 3 files changed, 44 insertions(+), 1 deletion(-) diff --git a/test/cpp/interop/client.cc b/test/cpp/interop/client.cc index 1f1e6c13067..d0393fafb24 100644 --- a/test/cpp/interop/client.cc +++ b/test/cpp/interop/client.cc @@ -69,6 +69,7 @@ DEFINE_string(test_case, "large_unary", "compute_engine_creds: large_unary with compute engine auth; " "jwt_token_creds: large_unary with JWT token auth; " "oauth2_auth_token: raw oauth2 access token auth; " + "per_rpc_creds: raw oauth2 access token on a single rpc; " "all : all of above."); DEFINE_string(default_service_account, "", "Email of GCE default service account"); @@ -117,6 +118,9 @@ int main(int argc, char** argv) { } else if (FLAGS_test_case == "oauth2_auth_token") { grpc::string json_key = GetServiceAccountJsonKey(); client.DoOauth2AuthToken(json_key, FLAGS_oauth_scope); + } else if (FLAGS_test_case == "per_rpc_creds") { + grpc::string json_key = GetServiceAccountJsonKey(); + client.DoPerRpcCreds(json_key, FLAGS_oauth_scope); } else if (FLAGS_test_case == "all") { client.DoEmpty(); client.DoLargeUnary(); @@ -133,6 +137,7 @@ int main(int argc, char** argv) { client.DoServiceAccountCreds(json_key, FLAGS_oauth_scope); client.DoJwtTokenCreds(json_key); client.DoOauth2AuthToken(json_key, FLAGS_oauth_scope); + client.DoPerRpcCreds(json_key, FLAGS_oauth_scope); } // compute_engine_creds only runs in GCE. } else { @@ -142,7 +147,7 @@ int main(int argc, char** argv) { "large_unary|client_streaming|server_streaming|half_duplex|ping_pong|" "cancel_after_begin|cancel_after_first_response|" "timeout_on_sleeping_server|service_account_creds|compute_engine_creds|" - "jwt_token_creds|oauth2_auth_token", + "jwt_token_creds|oauth2_auth_token|per_rpc_creds", FLAGS_test_case.c_str()); ret = 1; } diff --git a/test/cpp/interop/interop_client.cc b/test/cpp/interop/interop_client.cc index 30056e26ab2..92bc872f011 100644 --- a/test/cpp/interop/interop_client.cc +++ b/test/cpp/interop/interop_client.cc @@ -41,8 +41,10 @@ #include #include #include +#include #include #include +#include "test/cpp/interop/client_helper.h" #include "test/proto/test.grpc.pb.h" #include "test/proto/empty.grpc.pb.h" #include "test/proto/messages.grpc.pb.h" @@ -160,6 +162,39 @@ void InteropClient::DoOauth2AuthToken(const grpc::string& username, gpr_log(GPR_INFO, "Large unary with oauth2 access token done."); } +void InteropClient::DoPerRpcCreds(const grpc::string& username, + const grpc::string& oauth_scope) { + gpr_log(GPR_INFO, + "Sending a large unary rpc with per-rpc raw oauth2 access token ..."); + SimpleRequest request; + SimpleResponse response; + request.set_fill_username(true); + request.set_fill_oauth_scope(true); + std::unique_ptr stub(TestService::NewStub(channel_)); + + ClientContext context; + grpc::string access_token = GetOauth2AccessToken(); + std::shared_ptr creds = AccessTokenCredentials(access_token); + context.set_credentials(creds); + request.set_response_type(PayloadType::COMPRESSABLE); + request.set_response_size(kLargeResponseSize); + grpc::string payload(kLargeRequestSize, '\0'); + request.mutable_payload()->set_body(payload.c_str(), kLargeRequestSize); + + Status s = stub->UnaryCall(&context, request, &response); + + AssertOkOrPrintErrorStatus(s); + GPR_ASSERT(response.payload().type() == PayloadType::COMPRESSABLE); + GPR_ASSERT(response.payload().body() == + grpc::string(kLargeResponseSize, '\0')); + GPR_ASSERT(!response.username().empty()); + GPR_ASSERT(!response.oauth_scope().empty()); + GPR_ASSERT(username.find(response.username()) != grpc::string::npos); + const char* oauth_scope_str = response.oauth_scope().c_str(); + GPR_ASSERT(oauth_scope.find(oauth_scope_str) != grpc::string::npos); + gpr_log(GPR_INFO, "Large unary with per-rpc oauth2 access token done."); +} + void InteropClient::DoJwtTokenCreds(const grpc::string& username) { gpr_log(GPR_INFO, "Sending a large unary rpc with JWT token credentials ..."); SimpleRequest request; diff --git a/test/cpp/interop/interop_client.h b/test/cpp/interop/interop_client.h index 67eecd9cccb..bf8188325e7 100644 --- a/test/cpp/interop/interop_client.h +++ b/test/cpp/interop/interop_client.h @@ -71,6 +71,9 @@ class InteropClient { // username is a string containing the user email void DoOauth2AuthToken(const grpc::string& username, const grpc::string& oauth_scope); + // username is a string containing the user email + void DoPerRpcCreds(const grpc::string& username, + const grpc::string& oauth_scope); private: void PerformLargeUnary(SimpleRequest* request, SimpleResponse* response); From 8c31ee2751ec973b1b1bdde4bd57ab65a515aaf9 Mon Sep 17 00:00:00 2001 From: yang-g Date: Wed, 15 Jul 2015 13:36:27 -0700 Subject: [PATCH 02/20] update according to spec change --- test/cpp/interop/interop_client.cc | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/test/cpp/interop/interop_client.cc b/test/cpp/interop/interop_client.cc index 92bc872f011..c77cbce5c41 100644 --- a/test/cpp/interop/interop_client.cc +++ b/test/cpp/interop/interop_client.cc @@ -165,7 +165,7 @@ void InteropClient::DoOauth2AuthToken(const grpc::string& username, void InteropClient::DoPerRpcCreds(const grpc::string& username, const grpc::string& oauth_scope) { gpr_log(GPR_INFO, - "Sending a large unary rpc with per-rpc raw oauth2 access token ..."); + "Sending a unary rpc with per-rpc raw oauth2 access token ..."); SimpleRequest request; SimpleResponse response; request.set_fill_username(true); @@ -176,23 +176,16 @@ void InteropClient::DoPerRpcCreds(const grpc::string& username, grpc::string access_token = GetOauth2AccessToken(); std::shared_ptr creds = AccessTokenCredentials(access_token); context.set_credentials(creds); - request.set_response_type(PayloadType::COMPRESSABLE); - request.set_response_size(kLargeResponseSize); - grpc::string payload(kLargeRequestSize, '\0'); - request.mutable_payload()->set_body(payload.c_str(), kLargeRequestSize); Status s = stub->UnaryCall(&context, request, &response); AssertOkOrPrintErrorStatus(s); - GPR_ASSERT(response.payload().type() == PayloadType::COMPRESSABLE); - GPR_ASSERT(response.payload().body() == - grpc::string(kLargeResponseSize, '\0')); GPR_ASSERT(!response.username().empty()); GPR_ASSERT(!response.oauth_scope().empty()); GPR_ASSERT(username.find(response.username()) != grpc::string::npos); const char* oauth_scope_str = response.oauth_scope().c_str(); GPR_ASSERT(oauth_scope.find(oauth_scope_str) != grpc::string::npos); - gpr_log(GPR_INFO, "Large unary with per-rpc oauth2 access token done."); + gpr_log(GPR_INFO, "Unary with per-rpc oauth2 access token done."); } void InteropClient::DoJwtTokenCreds(const grpc::string& username) { From 9f85fa004353de3ae85ee3d22505b136da200dd7 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 15 Jul 2015 12:35:25 -0700 Subject: [PATCH 03/20] Only validate metadata from the client that we know should exist - it's allowed that other metadata may be picked up when sending a request --- src/python/src/grpc/_adapter/_low_test.py | 5 ++++- src/python/src/grpc/_links/_transmission_test.py | 9 +++++++-- .../src/grpc/framework/interfaces/links/test_cases.py | 11 ++++++----- 3 files changed, 17 insertions(+), 8 deletions(-) diff --git a/src/python/src/grpc/_adapter/_low_test.py b/src/python/src/grpc/_adapter/_low_test.py index 268e5fe765b..a49cd007bfa 100644 --- a/src/python/src/grpc/_adapter/_low_test.py +++ b/src/python/src/grpc/_adapter/_low_test.py @@ -129,7 +129,10 @@ class InsecureServerInsecureClient(unittest.TestCase): self.assertIsInstance(request_event.call, _low.Call) self.assertIs(server_request_tag, request_event.tag) self.assertEquals(1, len(request_event.results)) - self.assertEquals(dict(client_initial_metadata), dict(request_event.results[0].initial_metadata)) + got_initial_metadata = dict(request_event.results[0].initial_metadata) + self.assertEquals( + dict(client_initial_metadata), + dict((x, got_initial_metadata[x]) for x in zip(*client_initial_metadata)[0])) self.assertEquals(METHOD, request_event.call_details.method) self.assertEquals(HOST, request_event.call_details.host) self.assertLess(abs(DEADLINE - request_event.call_details.deadline), DEADLINE_TOLERANCE) diff --git a/src/python/src/grpc/_links/_transmission_test.py b/src/python/src/grpc/_links/_transmission_test.py index c5ef1edb253..d551a1fd5fc 100644 --- a/src/python/src/grpc/_links/_transmission_test.py +++ b/src/python/src/grpc/_links/_transmission_test.py @@ -93,8 +93,13 @@ class TransmissionTest(test_cases.TransmissionTest, unittest.TestCase): def create_service_completion(self): return _intermediary_low.Code.OK, 'An exuberant test "details" message!' - def assertMetadataEqual(self, original_metadata, transmitted_metadata): - self.assertSequenceEqual(original_metadata, transmitted_metadata) + def assertMetadataTransmitted(self, original_metadata, transmitted_metadata): + # we need to filter out any additional metadata added in transmitted_metadata + # since implementations are allowed to add to what is sent (in any position) + keys, _ = zip(*original_metadata) + self.assertSequenceEqual( + original_metadata, + (x for x in transmitted_metadata if x[0] in keys)) class RoundTripTest(unittest.TestCase): diff --git a/src/python/src/grpc/framework/interfaces/links/test_cases.py b/src/python/src/grpc/framework/interfaces/links/test_cases.py index 3ac212ebdfb..bf1f09d99de 100644 --- a/src/python/src/grpc/framework/interfaces/links/test_cases.py +++ b/src/python/src/grpc/framework/interfaces/links/test_cases.py @@ -161,8 +161,8 @@ class TransmissionTest(object): raise NotImplementedError() @abc.abstractmethod - def assertMetadataEqual(self, original_metadata, transmitted_metadata): - """Asserts that two metadata objects are equal. + def assertMetadataTransmitted(self, original_metadata, transmitted_metadata): + """Asserts that transmitted_metadata contains original_metadata. Args: original_metadata: A metadata object used in this test. @@ -170,7 +170,8 @@ class TransmissionTest(object): through the system under test. Raises: - AssertionError: if the two metadata objects are not equal. + AssertionError: if the transmitted_metadata object does not contain + original_metadata. """ raise NotImplementedError() @@ -239,7 +240,7 @@ class TransmissionTest(object): self.assertFalse(initial_metadata_seen) self.assertFalse(seen_payloads) self.assertFalse(terminal_metadata_seen) - self.assertMetadataEqual(initial_metadata, ticket.initial_metadata) + self.assertMetadataTransmitted(initial_metadata, ticket.initial_metadata) initial_metadata_seen = True if ticket.payload is not None: @@ -248,7 +249,7 @@ class TransmissionTest(object): if ticket.terminal_metadata is not None: self.assertFalse(terminal_metadata_seen) - self.assertMetadataEqual(terminal_metadata, ticket.terminal_metadata) + self.assertMetadataTransmitted(terminal_metadata, ticket.terminal_metadata) terminal_metadata_seen = True self.assertSequenceEqual(payloads, seen_payloads) From 83ac705fa72c09544f8784bcfc839b1e4ffd422b Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 15 Jul 2015 11:53:13 -0700 Subject: [PATCH 04/20] Only check metadata that we want: its allowed to have extra elements --- .../src/grpc/_links/_transmission_test.py | 4 ++-- src/ruby/spec/generic/rpc_server_spec.rb | 18 ++++++++++++++---- 2 files changed, 16 insertions(+), 6 deletions(-) diff --git a/src/python/src/grpc/_links/_transmission_test.py b/src/python/src/grpc/_links/_transmission_test.py index d551a1fd5fc..3eeec03f467 100644 --- a/src/python/src/grpc/_links/_transmission_test.py +++ b/src/python/src/grpc/_links/_transmission_test.py @@ -98,8 +98,8 @@ class TransmissionTest(test_cases.TransmissionTest, unittest.TestCase): # since implementations are allowed to add to what is sent (in any position) keys, _ = zip(*original_metadata) self.assertSequenceEqual( - original_metadata, - (x for x in transmitted_metadata if x[0] in keys)) + original_metadata, + [x for x in transmitted_metadata if x[0] in keys]) class RoundTripTest(unittest.TestCase): diff --git a/src/ruby/spec/generic/rpc_server_spec.rb b/src/ruby/spec/generic/rpc_server_spec.rb index f2403de77c4..0326f6e894b 100644 --- a/src/ruby/spec/generic/rpc_server_spec.rb +++ b/src/ruby/spec/generic/rpc_server_spec.rb @@ -35,6 +35,14 @@ def load_test_certs files.map { |f| File.open(File.join(test_root, f)).read } end +def check_md(wanted_md, received_md) + wanted_md.zip(received_md).each do |w, r| + w.each do |key, value| + expect(r[key]).to eq(value) + end + end +end + # A test message class EchoMsg def self.marshal(_o) @@ -376,7 +384,7 @@ describe GRPC::RpcServer do stub = EchoStub.new(@host, **client_opts) expect(stub.an_rpc(req, k1: 'v1', k2: 'v2')).to be_a(EchoMsg) wanted_md = [{ 'k1' => 'v1', 'k2' => 'v2' }] - expect(service.received_md).to eq(wanted_md) + check_md(wanted_md, service.received_md) @srv.stop t.join end @@ -391,7 +399,7 @@ describe GRPC::RpcServer do deadline = service.delay + 1.0 # wait for long enough expect(stub.an_rpc(req, deadline, k1: 'v1', k2: 'v2')).to be_a(EchoMsg) wanted_md = [{ 'k1' => 'v1', 'k2' => 'v2' }] - expect(service.received_md).to eq(wanted_md) + check_md(wanted_md, service.received_md) @srv.stop t.join end @@ -443,7 +451,7 @@ describe GRPC::RpcServer do expect(stub.an_rpc(req, k1: 'v1', k2: 'v2')).to be_a(EchoMsg) wanted_md = [{ 'k1' => 'updated-v1', 'k2' => 'v2', 'jwt_aud_uri' => "https://#{@host}/EchoService" }] - expect(service.received_md).to eq(wanted_md) + check_md(wanted_md, service.received_md) @srv.stop t.join end @@ -535,7 +543,9 @@ describe GRPC::RpcServer do 'method' => '/EchoService/an_rpc', 'connect_k1' => 'connect_v1' } - expect(op.metadata).to eq(wanted_md) + wanted_md.each do |key, value| + expect(op.metadata[key]).to eq(value) + end @srv.stop t.join end From 352f38c1127f7785c9f4a1f4e1365fdcd185842e Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 15 Jul 2015 14:01:54 -0700 Subject: [PATCH 05/20] Remove metadata checks - these are testing things that arent true --- src/php/ext/grpc/server.c | 1 + src/php/tests/unit_tests/EndToEndTest.php | 2 -- src/php/tests/unit_tests/SecureEndToEndTest.php | 1 - 3 files changed, 1 insertion(+), 3 deletions(-) diff --git a/src/php/ext/grpc/server.c b/src/php/ext/grpc/server.c index c319526e420..8b8d5b2f476 100644 --- a/src/php/ext/grpc/server.c +++ b/src/php/ext/grpc/server.c @@ -64,6 +64,7 @@ void free_wrapped_grpc_server(void *object TSRMLS_DC) { wrapped_grpc_server *server = (wrapped_grpc_server *)object; if (server->wrapped != NULL) { grpc_server_shutdown_and_notify(server->wrapped, completion_queue, NULL); + grpc_server_cancel_all_calls(server->wrapped); grpc_completion_queue_pluck(completion_queue, NULL, gpr_inf_future(GPR_CLOCK_REALTIME)); grpc_server_destroy(server->wrapped); diff --git a/src/php/tests/unit_tests/EndToEndTest.php b/src/php/tests/unit_tests/EndToEndTest.php index 296873fa8f7..2980dca4a75 100755 --- a/src/php/tests/unit_tests/EndToEndTest.php +++ b/src/php/tests/unit_tests/EndToEndTest.php @@ -61,7 +61,6 @@ class EndToEndTest extends PHPUnit_Framework_TestCase{ $event = $this->server->requestCall(); $this->assertSame('dummy_method', $event->method); - $this->assertSame([], $event->metadata); $server_call = $event->call; $event = $server_call->startBatch([ @@ -83,7 +82,6 @@ class EndToEndTest extends PHPUnit_Framework_TestCase{ Grpc\OP_RECV_STATUS_ON_CLIENT => true ]); - $this->assertSame([], $event->metadata); $status = $event->status; $this->assertSame([], $status->metadata); $this->assertSame(Grpc\STATUS_OK, $status->code); diff --git a/src/php/tests/unit_tests/SecureEndToEndTest.php b/src/php/tests/unit_tests/SecureEndToEndTest.php index 0c18cd3e91a..f91c006da5e 100755 --- a/src/php/tests/unit_tests/SecureEndToEndTest.php +++ b/src/php/tests/unit_tests/SecureEndToEndTest.php @@ -73,7 +73,6 @@ class SecureEndToEndTest extends PHPUnit_Framework_TestCase{ $event = $this->server->requestCall(); $this->assertSame('dummy_method', $event->method); - $this->assertSame([], $event->metadata); $server_call = $event->call; $event = $server_call->startBatch([ From 8bf2dcab4e73a3cfb0138f1c6e2a9db7e1f7edef Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 10 Jul 2015 13:08:41 -0700 Subject: [PATCH 06/20] Make tests a little more robust --- test/cpp/end2end/async_end2end_test.cc | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc index 117d8bb9fa4..b95bdf6b9b4 100644 --- a/test/cpp/end2end/async_end2end_test.cc +++ b/test/cpp/end2end/async_end2end_test.cc @@ -415,7 +415,7 @@ TEST_F(AsyncEnd2endTest, ClientInitialMetadataRpc) { auto client_initial_metadata = srv_ctx.client_metadata(); EXPECT_EQ(meta1.second, client_initial_metadata.find(meta1.first)->second); EXPECT_EQ(meta2.second, client_initial_metadata.find(meta2.first)->second); - EXPECT_EQ(static_cast(2), client_initial_metadata.size()); + EXPECT_GE(client_initial_metadata.size(), static_cast(2)); send_response.set_message(recv_request.message()); response_writer.Finish(send_response, Status::OK, tag(3)); @@ -563,7 +563,7 @@ TEST_F(AsyncEnd2endTest, MetadataRpc) { auto client_initial_metadata = srv_ctx.client_metadata(); EXPECT_EQ(meta1.second, client_initial_metadata.find(meta1.first)->second); EXPECT_EQ(meta2.second, client_initial_metadata.find(meta2.first)->second); - EXPECT_EQ(static_cast(2), client_initial_metadata.size()); + EXPECT_GE(client_initial_metadata.size(), static_cast(2)); srv_ctx.AddInitialMetadata(meta3.first, meta3.second); srv_ctx.AddInitialMetadata(meta4.first, meta4.second); @@ -574,7 +574,7 @@ TEST_F(AsyncEnd2endTest, MetadataRpc) { auto server_initial_metadata = cli_ctx.GetServerInitialMetadata(); EXPECT_EQ(meta3.second, server_initial_metadata.find(meta3.first)->second); EXPECT_EQ(meta4.second, server_initial_metadata.find(meta4.first)->second); - EXPECT_EQ(static_cast(2), server_initial_metadata.size()); + EXPECT_GE(server_initial_metadata.size(), static_cast(2)); send_response.set_message(recv_request.message()); srv_ctx.AddTrailingMetadata(meta5.first, meta5.second); @@ -590,7 +590,7 @@ TEST_F(AsyncEnd2endTest, MetadataRpc) { auto server_trailing_metadata = cli_ctx.GetServerTrailingMetadata(); EXPECT_EQ(meta5.second, server_trailing_metadata.find(meta5.first)->second); EXPECT_EQ(meta6.second, server_trailing_metadata.find(meta6.first)->second); - EXPECT_EQ(static_cast(2), server_trailing_metadata.size()); + EXPECT_GE(server_trailing_metadata.size(), static_cast(2)); } } // namespace } // namespace testing From 0dc5e6cf163f1451dfb1677ce61966be3ab1f062 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 10 Jul 2015 10:07:53 -0700 Subject: [PATCH 07/20] User agent string support --- include/grpc++/channel_arguments.h | 12 +++-- include/grpc/grpc.h | 6 +++ include/grpc/support/port_platform.h | 12 +++++ src/core/channel/http_client_filter.c | 61 +++++++++++++++++++++++ src/core/transport/chttp2/parsing.c | 2 +- src/cpp/client/channel_arguments.cc | 38 ++++++++++++++ src/cpp/client/create_channel.cc | 9 +++- test/core/end2end/dualstack_socket_test.c | 4 ++ test/cpp/end2end/end2end_test.cc | 7 +-- test/cpp/qps/qps_test.cc | 10 ++-- 10 files changed, 147 insertions(+), 14 deletions(-) diff --git a/include/grpc++/channel_arguments.h b/include/grpc++/channel_arguments.h index 68f24cde4af..c7e8d24b65b 100644 --- a/include/grpc++/channel_arguments.h +++ b/include/grpc++/channel_arguments.h @@ -54,6 +54,14 @@ class ChannelArguments { ChannelArguments() {} ~ChannelArguments() {} + ChannelArguments(const ChannelArguments& other); + ChannelArguments& operator=(ChannelArguments other) { + Swap(other); + return *this; + } + + void Swap(ChannelArguments& other); + // grpc specific channel argument setters // Set target name override for SSL host name checking. void SetSslTargetNameOverride(const grpc::string& name); @@ -73,10 +81,6 @@ class ChannelArguments { friend class SecureCredentials; friend class testing::ChannelArgumentsTest; - // TODO(yangg) implement copy and assign - ChannelArguments(const ChannelArguments&); - ChannelArguments& operator=(const ChannelArguments&); - // Returns empty string when it is not set. grpc::string GetSslTargetNameOverride() const; diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h index 3c72c1db27a..35a2c37ca9b 100644 --- a/include/grpc/grpc.h +++ b/include/grpc/grpc.h @@ -117,6 +117,12 @@ typedef struct { /* Initial sequence number for http2 transports */ #define GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER \ "grpc.http2.initial_sequence_number" +/** Primary user agent: goes at the start of the user-agent metadata + sent on each request */ +#define GRPC_ARG_PRIMARY_USER_AGENT_STRING "grpc.primary_user_agent" +/** Secondary user agent: goes at the end of the user-agent metadata + sent on each request */ +#define GRPC_ARG_SECONDARY_USER_AGENT_STRING "grpc.secondary_user_agent" /** Connectivity state of a channel. */ typedef enum { diff --git a/include/grpc/support/port_platform.h b/include/grpc/support/port_platform.h index a5d1b627028..57fed18cf68 100644 --- a/include/grpc/support/port_platform.h +++ b/include/grpc/support/port_platform.h @@ -71,6 +71,7 @@ #if !defined(GPR_NO_AUTODETECT_PLATFORM) #if defined(_WIN64) || defined(WIN64) +#define GPR_PLATFORM_STRING "windows" #define GPR_WIN32 1 #define GPR_ARCH_64 1 #define GPR_GETPID_IN_PROCESS_H 1 @@ -84,6 +85,7 @@ #endif #define GPR_WINDOWS_CRASH_HANDLER 1 #elif defined(_WIN32) || defined(WIN32) +#define GPR_PLATFORM_STRING "windows" #define GPR_ARCH_32 1 #define GPR_WIN32 1 #define GPR_GETPID_IN_PROCESS_H 1 @@ -97,6 +99,7 @@ #endif #define GPR_WINDOWS_CRASH_HANDLER 1 #elif defined(ANDROID) || defined(__ANDROID__) +#define GPR_PLATFORM_STRING "android" #define GPR_ANDROID 1 #define GPR_ARCH_32 1 #define GPR_CPU_LINUX 1 @@ -117,6 +120,7 @@ #define GPR_GETPID_IN_UNISTD_H 1 #define GPR_HAVE_MSG_NOSIGNAL 1 #elif defined(__linux__) +#define GPR_PLATFORM_STRING "linux" #ifndef _BSD_SOURCE #define _BSD_SOURCE #endif @@ -173,9 +177,11 @@ #define _BSD_SOURCE #endif #if TARGET_OS_IPHONE +#define GPR_PLATFORM_STRING "ios" #define GPR_CPU_IPHONE 1 #define GPR_PTHREAD_TLS 1 #else /* TARGET_OS_IPHONE */ +#define GPR_PLATFORM_STRING "osx" #define GPR_CPU_POSIX 1 #define GPR_GCC_TLS 1 #endif @@ -201,6 +207,7 @@ #define GPR_ARCH_32 1 #endif /* _LP64 */ #elif defined(__FreeBSD__) +#define GPR_PLATFORM_STRING "freebsd" #ifndef _BSD_SOURCE #define _BSD_SOURCE #endif @@ -232,6 +239,11 @@ #endif #endif /* GPR_NO_AUTODETECT_PLATFORM */ +#ifndef GPR_PLATFORM_STRING +#warning "GPR_PLATFORM_STRING not auto-detected" +#define GPR_PLATFORM_STRING "unknown" +#endif + /* For a common case, assume that the platform has a C99-like stdint.h */ #include diff --git a/src/core/channel/http_client_filter.c b/src/core/channel/http_client_filter.c index 63e4912397c..bc821b16fa2 100644 --- a/src/core/channel/http_client_filter.c +++ b/src/core/channel/http_client_filter.c @@ -32,13 +32,17 @@ #include "src/core/channel/http_client_filter.h" #include +#include #include +#include +#include "src/core/support/string.h" typedef struct call_data { grpc_linked_mdelem method; grpc_linked_mdelem scheme; grpc_linked_mdelem te_trailers; grpc_linked_mdelem content_type; + grpc_linked_mdelem user_agent; int sent_initial_metadata; int got_initial_metadata; @@ -58,6 +62,8 @@ typedef struct channel_data { grpc_mdelem *scheme; grpc_mdelem *content_type; grpc_mdelem *status; + /** complete user agent mdelem */ + grpc_mdelem *user_agent; } channel_data; /* used to silence 'variable not used' warnings */ @@ -115,6 +121,8 @@ static void hc_mutate_op(grpc_call_element *elem, GRPC_MDELEM_REF(channeld->te_trailers)); grpc_metadata_batch_add_tail(&op->data.metadata, &calld->content_type, GRPC_MDELEM_REF(channeld->content_type)); + grpc_metadata_batch_add_tail(&op->data.metadata, &calld->user_agent, + GRPC_MDELEM_REF(channeld->user_agent)); break; } } @@ -169,6 +177,55 @@ static const char *scheme_from_args(const grpc_channel_args *args) { return "http"; } +static grpc_mdstr *user_agent_from_args(grpc_mdctx *mdctx, + const grpc_channel_args *args) { + gpr_strvec v; + size_t i; + int is_first = 1; + char *tmp; + grpc_mdstr *result; + + gpr_strvec_init(&v); + + for (i = 0; args && i < args->num_args; i++) { + if (0 == strcmp(args->args[i].key, GRPC_ARG_PRIMARY_USER_AGENT_STRING)) { + if (args->args[i].type != GRPC_ARG_STRING) { + gpr_log(GPR_ERROR, "Channel argument '%s' should be a string", + GRPC_ARG_PRIMARY_USER_AGENT_STRING); + } else { + if (!is_first) gpr_strvec_add(&v, gpr_strdup(" ")); + is_first = 0; + gpr_strvec_add(&v, gpr_strdup(args->args[i].value.string)); + } + } + } + + gpr_asprintf(&tmp, "%sgrpc-c/%s (%s)", is_first ? "" : " ", + grpc_version_string(), GPR_PLATFORM_STRING); + is_first = 0; + gpr_strvec_add(&v, tmp); + + for (i = 0; args && i < args->num_args; i++) { + if (0 == strcmp(args->args[i].key, GRPC_ARG_SECONDARY_USER_AGENT_STRING)) { + if (args->args[i].type != GRPC_ARG_STRING) { + gpr_log(GPR_ERROR, "Channel argument '%s' should be a string", + GRPC_ARG_SECONDARY_USER_AGENT_STRING); + } else { + if (!is_first) gpr_strvec_add(&v, gpr_strdup(" ")); + is_first = 0; + gpr_strvec_add(&v, gpr_strdup(args->args[i].value.string)); + } + } + } + + tmp = gpr_strvec_flatten(&v, NULL); + gpr_strvec_destroy(&v); + result = grpc_mdstr_from_string(mdctx, tmp); + gpr_free(tmp); + + return result; +} + /* Constructor for channel_data */ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, const grpc_channel_args *args, grpc_mdctx *mdctx, @@ -189,6 +246,9 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, channeld->content_type = grpc_mdelem_from_strings(mdctx, "content-type", "application/grpc"); channeld->status = grpc_mdelem_from_strings(mdctx, ":status", "200"); + channeld->user_agent = grpc_mdelem_from_metadata_strings( + mdctx, grpc_mdstr_from_string(mdctx, "user-agent"), + user_agent_from_args(mdctx, args)); } /* Destructor for channel data */ @@ -201,6 +261,7 @@ static void destroy_channel_elem(grpc_channel_element *elem) { GRPC_MDELEM_UNREF(channeld->scheme); GRPC_MDELEM_UNREF(channeld->content_type); GRPC_MDELEM_UNREF(channeld->status); + GRPC_MDELEM_UNREF(channeld->user_agent); } const grpc_channel_filter grpc_http_client_filter = { diff --git a/src/core/transport/chttp2/parsing.c b/src/core/transport/chttp2/parsing.c index aa32f2e44a1..904b9afce7e 100644 --- a/src/core/transport/chttp2/parsing.c +++ b/src/core/transport/chttp2/parsing.c @@ -588,7 +588,7 @@ static void on_header(void *tp, grpc_mdelem *md) { GPR_ASSERT(stream_parsing); GRPC_CHTTP2_IF_TRACING(gpr_log( - GPR_INFO, "HTTP:%d:HDR: %s: %s", stream_parsing->id, + GPR_INFO, "HTTP:%d:HDR:%s: %s: %s", stream_parsing->id, transport_parsing->is_client ? "CLI" : "SVR", grpc_mdstr_as_c_string(md->key), grpc_mdstr_as_c_string(md->value))); diff --git a/src/cpp/client/channel_arguments.cc b/src/cpp/client/channel_arguments.cc index b271650673c..22abe407a76 100644 --- a/src/cpp/client/channel_arguments.cc +++ b/src/cpp/client/channel_arguments.cc @@ -33,10 +33,48 @@ #include +#include + #include "src/core/channel/channel_args.h" namespace grpc { +ChannelArguments::ChannelArguments(const ChannelArguments& other) + : strings_(other.strings_) { + args_.reserve(other.args_.size()); + auto list_it_dst = strings_.begin(); + auto list_it_src = other.strings_.begin(); + for (auto a = other.args_.begin(); a != other.args_.end(); ++a) { + grpc_arg ap; + ap.type = a->type; + GPR_ASSERT(list_it_src->c_str() == a->key); + ap.key = const_cast(list_it_dst->c_str()); + ++list_it_src; + ++list_it_dst; + switch (a->type) { + case GRPC_ARG_INTEGER: + ap.value.integer = a->value.integer; + break; + case GRPC_ARG_STRING: + GPR_ASSERT(list_it_src->c_str() == a->value.string); + ap.value.string = const_cast(list_it_dst->c_str()); + ++list_it_src; + ++list_it_dst; + break; + case GRPC_ARG_POINTER: + ap.value.pointer = a->value.pointer; + ap.value.pointer.p = a->value.pointer.copy(ap.value.pointer.p); + break; + } + args_.push_back(ap); + } +} + +void ChannelArguments::Swap(ChannelArguments& other) { + args_.swap(other.args_); + strings_.swap(other.strings_); +} + void ChannelArguments::SetCompressionLevel(grpc_compression_level level) { SetInt(GRPC_COMPRESSION_LEVEL_ARG, level); } diff --git a/src/cpp/client/create_channel.cc b/src/cpp/client/create_channel.cc index 510af2bb00a..38eeda0dc0e 100644 --- a/src/cpp/client/create_channel.cc +++ b/src/cpp/client/create_channel.cc @@ -32,9 +32,11 @@ */ #include +#include #include "src/cpp/client/channel.h" #include +#include #include namespace grpc { @@ -43,7 +45,12 @@ class ChannelArguments; std::shared_ptr CreateChannel( const grpc::string& target, const std::shared_ptr& creds, const ChannelArguments& args) { - return creds ? creds->CreateChannel(target, args) + ChannelArguments cp_args = args; + std::ostringstream user_agent_prefix; + user_agent_prefix << "grpc-c++/" << grpc_version_string(); + cp_args.SetString(GRPC_ARG_PRIMARY_USER_AGENT_STRING, + user_agent_prefix.str()); + return creds ? creds->CreateChannel(target, cp_args) : std::shared_ptr( new Channel(target, grpc_lame_client_channel_create())); } diff --git a/test/core/end2end/dualstack_socket_test.c b/test/core/end2end/dualstack_socket_test.c index 7d3568c22e1..05ad42ca1f3 100644 --- a/test/core/end2end/dualstack_socket_test.c +++ b/test/core/end2end/dualstack_socket_test.c @@ -211,6 +211,10 @@ void test_connect(const char *server_host, const char *client_host, int port, drain_cq(cq); grpc_completion_queue_destroy(cq); + grpc_metadata_array_destroy(&initial_metadata_recv); + grpc_metadata_array_destroy(&trailing_metadata_recv); + grpc_metadata_array_destroy(&request_metadata_recv); + grpc_call_details_destroy(&call_details); gpr_free(details); } diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc index 89d9d635af3..00088cfb15b 100644 --- a/test/cpp/end2end/end2end_test.cc +++ b/test/cpp/end2end/end2end_test.cc @@ -249,9 +249,10 @@ class End2endTest : public ::testing::Test { void TearDown() GRPC_OVERRIDE { server_->Shutdown(); } void ResetStub() { - std::shared_ptr channel = - CreateChannel(server_address_.str(), FakeTransportSecurityCredentials(), - ChannelArguments()); + ChannelArguments args; + args.SetString(GRPC_ARG_SECONDARY_USER_AGENT_STRING, "end2end_test"); + std::shared_ptr channel = CreateChannel( + server_address_.str(), FakeTransportSecurityCredentials(), args); stub_ = std::move(grpc::cpp::test::util::TestService::NewStub(channel)); } diff --git a/test/cpp/qps/qps_test.cc b/test/cpp/qps/qps_test.cc index 07b4834cc00..7b93443f7ce 100644 --- a/test/cpp/qps/qps_test.cc +++ b/test/cpp/qps/qps_test.cc @@ -44,8 +44,8 @@ namespace grpc { namespace testing { -static const int WARMUP = 5; -static const int BENCHMARK = 10; +static const int WARMUP = 20; +static const int BENCHMARK = 40; static void RunQPS() { gpr_log(GPR_INFO, "Running QPS test"); @@ -53,8 +53,8 @@ static void RunQPS() { ClientConfig client_config; client_config.set_client_type(ASYNC_CLIENT); client_config.set_enable_ssl(false); - client_config.set_outstanding_rpcs_per_channel(1000); - client_config.set_client_channels(8); + client_config.set_outstanding_rpcs_per_channel(10); + client_config.set_client_channels(800); client_config.set_payload_size(1); client_config.set_async_client_threads(8); client_config.set_rpc_type(UNARY); @@ -62,7 +62,7 @@ static void RunQPS() { ServerConfig server_config; server_config.set_server_type(ASYNC_SERVER); server_config.set_enable_ssl(false); - server_config.set_threads(4); + server_config.set_threads(8); const auto result = RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2); From 7156fca1923281e48bc21c754ba51845cf84f2d3 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 15 Jul 2015 13:54:20 -0700 Subject: [PATCH 08/20] Extend tracer to server shutdown --- src/core/surface/call.h | 7 +++++++ src/core/surface/call_log_batch.c | 8 ++++++++ src/core/surface/server.c | 2 ++ 3 files changed, 17 insertions(+) diff --git a/src/core/surface/call.h b/src/core/surface/call.h index 3b6f9c942eb..265638d5196 100644 --- a/src/core/surface/call.h +++ b/src/core/surface/call.h @@ -134,6 +134,10 @@ void grpc_server_log_request_call(char *file, int line, grpc_completion_queue *cq_for_notification, void *tag); +void grpc_server_log_shutdown(char *file, int line, gpr_log_severity severity, + grpc_server *server, grpc_completion_queue *cq, + void *tag); + /* Set a context pointer. No thread safety guarantees are made wrt this value. */ void grpc_call_context_set(grpc_call *call, grpc_context_index elem, @@ -151,6 +155,9 @@ void *grpc_call_context_get(grpc_call *call, grpc_context_index elem); grpc_server_log_request_call(sev, server, call, details, initial_metadata, \ cq_bound_to_call, cq_for_notifications, tag) +#define GRPC_SERVER_LOG_SHUTDOWN(sev, server, cq, tag) \ + if (grpc_trace_batch) grpc_server_log_shutdown(sev, server, cq, tag) + gpr_uint8 grpc_call_is_client(grpc_call *call); #endif /* GRPC_INTERNAL_CORE_SURFACE_CALL_H */ diff --git a/src/core/surface/call_log_batch.c b/src/core/surface/call_log_batch.c index 997046d954e..7bf8cafc241 100644 --- a/src/core/surface/call_log_batch.c +++ b/src/core/surface/call_log_batch.c @@ -136,3 +136,11 @@ void grpc_server_log_request_call(char *file, int line, "tag=%p)", server, call, details, initial_metadata, cq_bound_to_call, cq_for_notification, tag); } + +void grpc_server_log_shutdown(char *file, int line, gpr_log_severity severity, + grpc_server *server, grpc_completion_queue *cq, + void *tag) { + gpr_log(file, line, severity, + "grpc_server_shutdown_and_notify(server=%p, cq=%p, tag=%p)", server, + cq, tag); +} diff --git a/src/core/surface/server.c b/src/core/surface/server.c index fa120088e1f..e87d80aabd5 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -980,6 +980,8 @@ void grpc_server_shutdown_and_notify(grpc_server *server, channel_broadcaster broadcaster; request_killer reqkill; + GRPC_SERVER_LOG_SHUTDOWN(GPR_INFO, server, cq, tag); + /* lock, and gather up some stuff to do */ gpr_mu_lock(&server->mu_global); grpc_cq_begin_op(cq); From 6a7626c98fbd2c013553a91f8ab7bf01afeedd3e Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Sun, 19 Jul 2015 22:21:41 -0700 Subject: [PATCH 09/20] Move alarm subsystem to monotonic time --- include/grpc/support/time.h | 3 +++ src/core/client_config/subchannel.c | 4 ++-- src/core/iomgr/alarm.c | 8 ++++++- src/core/iomgr/iomgr.c | 10 ++++---- src/core/iomgr/pollset_posix.c | 4 ++-- src/core/iomgr/tcp_client_posix.c | 4 ++-- src/core/support/sync_posix.c | 3 ++- src/core/support/time.c | 27 ++++++++++++++++++++++ src/core/support/time_posix.c | 2 +- src/core/support/time_win32.c | 2 +- src/core/surface/call.c | 6 ++--- src/core/surface/completion_queue.c | 4 ++++ src/core/transport/chttp2/stream_encoder.c | 9 ++++---- src/core/transport/transport_op_string.c | 2 +- test/core/iomgr/alarm_test.c | 9 ++++---- test/core/iomgr/endpoint_tests.c | 6 ++--- test/core/iomgr/fd_posix_test.c | 8 +++---- test/core/iomgr/tcp_client_posix_test.c | 6 ++--- test/core/iomgr/tcp_server_posix_test.c | 2 +- test/core/util/test_config.h | 4 ++-- 20 files changed, 83 insertions(+), 40 deletions(-) diff --git a/include/grpc/support/time.h b/include/grpc/support/time.h index 3f375f6ecd5..be59c37956b 100644 --- a/include/grpc/support/time.h +++ b/include/grpc/support/time.h @@ -83,6 +83,9 @@ void gpr_time_init(void); /* Return the current time measured from the given clocks epoch. */ gpr_timespec gpr_now(gpr_clock_type clock); +/* Convert a timespec from one clock to another */ +gpr_timespec gpr_convert_clock_type(gpr_timespec t, gpr_clock_type target_clock); + /* Return -ve, 0, or +ve according to whether a < b, a == b, or a > b respectively. */ int gpr_time_cmp(gpr_timespec a, gpr_timespec b); diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index 35f172683a4..487f5afb35d 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -300,7 +300,7 @@ static void continue_connect(grpc_subchannel *c) { } static void start_connect(grpc_subchannel *c) { - gpr_timespec now = gpr_now(GPR_CLOCK_REALTIME); + gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); c->next_attempt = now; c->backoff_delta = gpr_time_from_seconds(1, GPR_TIMESPAN); @@ -585,7 +585,7 @@ static void subchannel_connected(void *arg, int iomgr_success) { c->have_alarm = 1; c->next_attempt = gpr_time_add(c->next_attempt, c->backoff_delta); c->backoff_delta = gpr_time_add(c->backoff_delta, c->backoff_delta); - grpc_alarm_init(&c->alarm, c->next_attempt, on_alarm, c, gpr_now(GPR_CLOCK_REALTIME)); + grpc_alarm_init(&c->alarm, c->next_attempt, on_alarm, c, gpr_now(GPR_CLOCK_MONOTONIC)); gpr_mu_unlock(&c->mu); } } diff --git a/src/core/iomgr/alarm.c b/src/core/iomgr/alarm.c index 5b9a37e5ddc..931f746f756 100644 --- a/src/core/iomgr/alarm.c +++ b/src/core/iomgr/alarm.c @@ -36,6 +36,7 @@ #include "src/core/iomgr/alarm_heap.h" #include "src/core/iomgr/alarm_internal.h" #include "src/core/iomgr/time_averaged_stats.h" +#include #include #include @@ -67,6 +68,7 @@ typedef struct { static gpr_mu g_mu; /* Allow only one run_some_expired_alarms at once */ static gpr_mu g_checker_mu; +static gpr_clock_type g_clock_type; static shard_type g_shards[NUM_SHARDS]; /* Protected by g_mu */ static shard_type *g_shard_queue[NUM_SHARDS]; @@ -85,6 +87,7 @@ void grpc_alarm_list_init(gpr_timespec now) { gpr_mu_init(&g_mu); gpr_mu_init(&g_checker_mu); + g_clock_type = now.clock_type; for (i = 0; i < NUM_SHARDS; i++) { shard_type *shard = &g_shards[i]; @@ -102,7 +105,7 @@ void grpc_alarm_list_init(gpr_timespec now) { void grpc_alarm_list_shutdown(void) { int i; - while (run_some_expired_alarms(NULL, gpr_inf_future(GPR_CLOCK_REALTIME), NULL, + while (run_some_expired_alarms(NULL, gpr_inf_future(g_clock_type), NULL, 0)) ; for (i = 0; i < NUM_SHARDS; i++) { @@ -175,6 +178,8 @@ void grpc_alarm_init(grpc_alarm *alarm, gpr_timespec deadline, gpr_timespec now) { int is_first_alarm = 0; shard_type *shard = &g_shards[shard_idx(alarm)]; + GPR_ASSERT(deadline.clock_type == g_clock_type); + GPR_ASSERT(now.clock_type == g_clock_type); alarm->cb = alarm_cb; alarm->cb_arg = alarm_cb_arg; alarm->deadline = deadline; @@ -355,6 +360,7 @@ static int run_some_expired_alarms(gpr_mu *drop_mu, gpr_timespec now, } int grpc_alarm_check(gpr_mu *drop_mu, gpr_timespec now, gpr_timespec *next) { + GPR_ASSERT(now.clock_type == g_clock_type); return run_some_expired_alarms(drop_mu, now, next, 1); } diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c index 0244f689b1a..a18c176b305 100644 --- a/src/core/iomgr/iomgr.c +++ b/src/core/iomgr/iomgr.c @@ -57,9 +57,9 @@ static grpc_iomgr_object g_root_object; static void background_callback_executor(void *ignored) { gpr_mu_lock(&g_mu); while (!g_shutdown) { - gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_REALTIME); + gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); gpr_timespec short_deadline = gpr_time_add( - gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(100, GPR_TIMESPAN)); + gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_millis(100, GPR_TIMESPAN)); if (g_cbs_head) { grpc_iomgr_closure *closure = g_cbs_head; g_cbs_head = closure->next; @@ -67,7 +67,7 @@ static void background_callback_executor(void *ignored) { gpr_mu_unlock(&g_mu); closure->cb(closure->cb_arg, closure->success); gpr_mu_lock(&g_mu); - } else if (grpc_alarm_check(&g_mu, gpr_now(GPR_CLOCK_REALTIME), + } else if (grpc_alarm_check(&g_mu, gpr_now(GPR_CLOCK_MONOTONIC), &deadline)) { } else { gpr_mu_unlock(&g_mu); @@ -90,7 +90,7 @@ void grpc_iomgr_init(void) { gpr_thd_id id; gpr_mu_init(&g_mu); gpr_cv_init(&g_rcv); - grpc_alarm_list_init(gpr_now(GPR_CLOCK_REALTIME)); + grpc_alarm_list_init(gpr_now(GPR_CLOCK_MONOTONIC)); g_root_object.next = g_root_object.prev = &g_root_object; g_root_object.name = "root"; grpc_iomgr_platform_init(); @@ -145,7 +145,7 @@ void grpc_iomgr_shutdown(void) { } while (g_cbs_head); continue; } - if (grpc_alarm_check(&g_mu, gpr_inf_future(GPR_CLOCK_REALTIME), NULL)) { + if (grpc_alarm_check(&g_mu, gpr_inf_future(GPR_CLOCK_MONOTONIC), NULL)) { gpr_log(GPR_DEBUG, "got late alarm"); continue; } diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index efb301d81cb..c8646af615c 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -136,7 +136,7 @@ static void finish_shutdown(grpc_pollset *pollset) { int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline) { /* pollset->mu already held */ - gpr_timespec now = gpr_now(GPR_CLOCK_REALTIME); + gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); if (gpr_time_cmp(now, deadline) > 0) { return 0; } @@ -205,7 +205,7 @@ int grpc_poll_deadline_to_millis_timeout(gpr_timespec deadline, gpr_timespec now) { gpr_timespec timeout; static const int max_spin_polling_us = 10; - if (gpr_time_cmp(deadline, gpr_inf_future(GPR_CLOCK_REALTIME)) == 0) { + if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) { return -1; } if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros( diff --git a/src/core/iomgr/tcp_client_posix.c b/src/core/iomgr/tcp_client_posix.c index dc0489e64f4..1b73e5e1c9b 100644 --- a/src/core/iomgr/tcp_client_posix.c +++ b/src/core/iomgr/tcp_client_posix.c @@ -253,8 +253,8 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep), ac->write_closure.cb_arg = ac; gpr_mu_lock(&ac->mu); - grpc_alarm_init(&ac->alarm, deadline, on_alarm, ac, - gpr_now(GPR_CLOCK_REALTIME)); + grpc_alarm_init(&ac->alarm, gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), + on_alarm, ac, gpr_now(GPR_CLOCK_MONOTONIC)); grpc_fd_notify_on_write(ac->fd, &ac->write_closure); gpr_mu_unlock(&ac->mu); diff --git a/src/core/support/sync_posix.c b/src/core/support/sync_posix.c index 41af8ceb0a5..61572b9a8ea 100644 --- a/src/core/support/sync_posix.c +++ b/src/core/support/sync_posix.c @@ -63,10 +63,11 @@ void gpr_cv_destroy(gpr_cv *cv) { GPR_ASSERT(pthread_cond_destroy(cv) == 0); } int gpr_cv_wait(gpr_cv *cv, gpr_mu *mu, gpr_timespec abs_deadline) { int err = 0; - if (gpr_time_cmp(abs_deadline, gpr_inf_future(GPR_CLOCK_REALTIME)) == 0) { + if (gpr_time_cmp(abs_deadline, gpr_inf_future(abs_deadline.clock_type)) == 0) { err = pthread_cond_wait(cv, mu); } else { struct timespec abs_deadline_ts; + abs_deadline = gpr_convert_clock_type(abs_deadline, GPR_CLOCK_REALTIME); abs_deadline_ts.tv_sec = abs_deadline.tv_sec; abs_deadline_ts.tv_nsec = abs_deadline.tv_nsec; err = pthread_cond_timedwait(cv, mu, &abs_deadline_ts); diff --git a/src/core/support/time.c b/src/core/support/time.c index 570f195bd11..b523ae01cce 100644 --- a/src/core/support/time.c +++ b/src/core/support/time.c @@ -290,3 +290,30 @@ gpr_int32 gpr_time_to_millis(gpr_timespec t) { double gpr_timespec_to_micros(gpr_timespec t) { return (double)t.tv_sec * GPR_US_PER_SEC + t.tv_nsec * 1e-3; } + +gpr_timespec gpr_convert_clock_type(gpr_timespec t, gpr_clock_type clock_type) { + if (t.clock_type == clock_type) { + return t; + } + + if (t.tv_nsec == 0) { + if (t.tv_sec == TYPE_MAX(time_t)) { + t.clock_type = clock_type; + return t; + } + if (t.tv_sec == TYPE_MIN(time_t)) { + t.clock_type = clock_type; + return t; + } + } + + if (clock_type == GPR_TIMESPAN) { + return gpr_time_sub(t, gpr_now(t.clock_type)); + } + + if (t.clock_type == GPR_TIMESPAN) { + return gpr_time_add(gpr_now(clock_type), t); + } + + return gpr_time_add(gpr_now(clock_type), gpr_time_sub(t, gpr_now(t.clock_type))); +} diff --git a/src/core/support/time_posix.c b/src/core/support/time_posix.c index 258b2e640e9..841485c4b4a 100644 --- a/src/core/support/time_posix.c +++ b/src/core/support/time_posix.c @@ -120,7 +120,7 @@ void gpr_sleep_until(gpr_timespec until) { for (;;) { /* We could simplify by using clock_nanosleep instead, but it might be * slightly less portable. */ - now = gpr_now(GPR_CLOCK_REALTIME); + now = gpr_now(until.clock_type); if (gpr_time_cmp(until, now) <= 0) { return; } diff --git a/src/core/support/time_win32.c b/src/core/support/time_win32.c index 238cd07ebc7..7f64c80e276 100644 --- a/src/core/support/time_win32.c +++ b/src/core/support/time_win32.c @@ -80,7 +80,7 @@ void gpr_sleep_until(gpr_timespec until) { for (;;) { /* We could simplify by using clock_nanosleep instead, but it might be * slightly less portable. */ - now = gpr_now(GPR_CLOCK_REALTIME); + now = gpr_now(until.clock_type); if (gpr_time_cmp(until, now) <= 0) { return; } diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 6e643b591cf..e08273e451c 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -348,7 +348,7 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq, } grpc_call_stack_init(channel_stack, server_transport_data, initial_op_ptr, CALL_STACK_FROM_CALL(call)); - if (gpr_time_cmp(send_deadline, gpr_inf_future(GPR_CLOCK_REALTIME)) != 0) { + if (gpr_time_cmp(send_deadline, gpr_inf_future(send_deadline.clock_type)) != 0) { set_deadline_alarm(call, send_deadline); } return call; @@ -1278,8 +1278,8 @@ static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline) { } GRPC_CALL_INTERNAL_REF(call, "alarm"); call->have_alarm = 1; - grpc_alarm_init(&call->alarm, deadline, call_alarm, call, - gpr_now(GPR_CLOCK_REALTIME)); + grpc_alarm_init(&call->alarm, gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), call_alarm, call, + gpr_now(GPR_CLOCK_MONOTONIC)); } /* we offset status by a small amount when storing it into transport metadata diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c index 84844182477..3f60b0b0ba3 100644 --- a/src/core/surface/completion_queue.c +++ b/src/core/surface/completion_queue.c @@ -148,6 +148,8 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, gpr_timespec deadline) { grpc_event ret; + deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); + GRPC_CQ_INTERNAL_REF(cc, "next"); gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); for (;;) { @@ -188,6 +190,8 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, grpc_cq_completion *c; grpc_cq_completion *prev; + deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); + GRPC_CQ_INTERNAL_REF(cc, "pluck"); gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); for (;;) { diff --git a/src/core/transport/chttp2/stream_encoder.c b/src/core/transport/chttp2/stream_encoder.c index d30059abf8c..65b31a5afdf 100644 --- a/src/core/transport/chttp2/stream_encoder.c +++ b/src/core/transport/chttp2/stream_encoder.c @@ -438,7 +438,7 @@ static void deadline_enc(grpc_chttp2_hpack_compressor *c, gpr_timespec deadline, char timeout_str[GRPC_CHTTP2_TIMEOUT_ENCODE_MIN_BUFSIZE]; grpc_mdelem *mdelem; grpc_chttp2_encode_timeout( - gpr_time_sub(deadline, gpr_now(GPR_CLOCK_REALTIME)), timeout_str); + gpr_time_sub(deadline, gpr_now(deadline.clock_type)), timeout_str); mdelem = grpc_mdelem_from_metadata_strings( c->mdctx, GRPC_MDSTR_REF(c->timeout_key_str), grpc_mdstr_from_string(c->mdctx, timeout_str)); @@ -560,6 +560,7 @@ void grpc_chttp2_encode(grpc_stream_op *ops, size_t ops_count, int eof, grpc_mdctx *mdctx = compressor->mdctx; grpc_linked_mdelem *l; int need_unref = 0; + gpr_timespec deadline; GPR_ASSERT(stream_id != 0); @@ -589,9 +590,9 @@ void grpc_chttp2_encode(grpc_stream_op *ops, size_t ops_count, int eof, l->md = hpack_enc(compressor, l->md, &st); need_unref |= l->md != NULL; } - if (gpr_time_cmp(op->data.metadata.deadline, - gpr_inf_future(GPR_CLOCK_REALTIME)) != 0) { - deadline_enc(compressor, op->data.metadata.deadline, &st); + deadline = op->data.metadata.deadline; + if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) != 0) { + deadline_enc(compressor, deadline, &st); } curop++; break; diff --git a/src/core/transport/transport_op_string.c b/src/core/transport/transport_op_string.c index 9d127c5472d..10d796fc158 100644 --- a/src/core/transport/transport_op_string.c +++ b/src/core/transport/transport_op_string.c @@ -61,7 +61,7 @@ static void put_metadata_list(gpr_strvec *b, grpc_metadata_batch md) { if (m != md.list.head) gpr_strvec_add(b, gpr_strdup(", ")); put_metadata(b, m->md); } - if (gpr_time_cmp(md.deadline, gpr_inf_future(GPR_CLOCK_REALTIME)) != 0) { + if (gpr_time_cmp(md.deadline, gpr_inf_future(md.deadline.clock_type)) != 0) { char *tmp; gpr_asprintf(&tmp, " deadline=%d.%09d", md.deadline.tv_sec, md.deadline.tv_nsec); diff --git a/test/core/iomgr/alarm_test.c b/test/core/iomgr/alarm_test.c index 362eb5fe634..55aa517529c 100644 --- a/test/core/iomgr/alarm_test.c +++ b/test/core/iomgr/alarm_test.c @@ -41,6 +41,7 @@ #include #include +#include #include #include #include @@ -100,7 +101,7 @@ static void test_grpc_alarm(void) { alarm_arg arg2; void *fdone; - grpc_iomgr_init(); + grpc_init(); arg.counter = 0; arg.success = SUCCESS_NOT_SET; @@ -113,7 +114,7 @@ static void test_grpc_alarm(void) { gpr_event_init(&arg.fcb_arg); grpc_alarm_init(&alarm, GRPC_TIMEOUT_MILLIS_TO_DEADLINE(100), alarm_cb, &arg, - gpr_now(GPR_CLOCK_REALTIME)); + gpr_now(GPR_CLOCK_MONOTONIC)); alarm_deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1); gpr_mu_lock(&arg.mu); @@ -165,7 +166,7 @@ static void test_grpc_alarm(void) { gpr_event_init(&arg2.fcb_arg); grpc_alarm_init(&alarm_to_cancel, GRPC_TIMEOUT_MILLIS_TO_DEADLINE(100), - alarm_cb, &arg2, gpr_now(GPR_CLOCK_REALTIME)); + alarm_cb, &arg2, gpr_now(GPR_CLOCK_MONOTONIC)); grpc_alarm_cancel(&alarm_to_cancel); alarm_deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1); @@ -214,7 +215,7 @@ static void test_grpc_alarm(void) { gpr_mu_destroy(&arg2.mu); gpr_free(arg2.followup_closure); - grpc_iomgr_shutdown(); + grpc_shutdown(); } int main(int argc, char **argv) { diff --git a/test/core/iomgr/endpoint_tests.c b/test/core/iomgr/endpoint_tests.c index 0cfba5fac81..cb6adc58cff 100644 --- a/test/core/iomgr/endpoint_tests.c +++ b/test/core/iomgr/endpoint_tests.c @@ -254,7 +254,7 @@ static void read_and_write_test(grpc_endpoint_test_config config, gpr_mu_lock(GRPC_POLLSET_MU(g_pollset)); while (!state.read_done || !state.write_done) { - GPR_ASSERT(gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), deadline) < 0); + GPR_ASSERT(gpr_time_cmp(gpr_now(GPR_CLOCK_MONOTONIC), deadline) < 0); grpc_pollset_work(g_pollset, deadline); } gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset)); @@ -350,14 +350,14 @@ static void shutdown_during_write_test(grpc_endpoint_test_config config, deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10); gpr_mu_lock(GRPC_POLLSET_MU(g_pollset)); while (!write_st.done) { - GPR_ASSERT(gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), deadline) < 0); + GPR_ASSERT(gpr_time_cmp(gpr_now(deadline.clock_type), deadline) < 0); grpc_pollset_work(g_pollset, deadline); } gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset)); grpc_endpoint_destroy(write_st.ep); gpr_mu_lock(GRPC_POLLSET_MU(g_pollset)); while (!read_st.done) { - GPR_ASSERT(gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), deadline) < 0); + GPR_ASSERT(gpr_time_cmp(gpr_now(deadline.clock_type), deadline) < 0); grpc_pollset_work(g_pollset, deadline); } gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset)); diff --git a/test/core/iomgr/fd_posix_test.c b/test/core/iomgr/fd_posix_test.c index cd268661dbc..7d00c098cce 100644 --- a/test/core/iomgr/fd_posix_test.c +++ b/test/core/iomgr/fd_posix_test.c @@ -249,7 +249,7 @@ static int server_start(server *sv) { static void server_wait_and_shutdown(server *sv) { gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); while (!sv->done) { - grpc_pollset_work(&g_pollset, gpr_inf_future(GPR_CLOCK_REALTIME)); + grpc_pollset_work(&g_pollset, gpr_inf_future(GPR_CLOCK_MONOTONIC)); } gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); } @@ -356,7 +356,7 @@ static void client_start(client *cl, int port) { static void client_wait_and_shutdown(client *cl) { gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); while (!cl->done) { - grpc_pollset_work(&g_pollset, gpr_inf_future(GPR_CLOCK_REALTIME)); + grpc_pollset_work(&g_pollset, gpr_inf_future(GPR_CLOCK_MONOTONIC)); } gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); } @@ -445,7 +445,7 @@ static void test_grpc_fd_change(void) { /* And now wait for it to run. */ gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); while (a.cb_that_ran == NULL) { - grpc_pollset_work(&g_pollset, gpr_inf_future(GPR_CLOCK_REALTIME)); + grpc_pollset_work(&g_pollset, gpr_inf_future(GPR_CLOCK_MONOTONIC)); } GPR_ASSERT(a.cb_that_ran == first_read_callback); gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); @@ -463,7 +463,7 @@ static void test_grpc_fd_change(void) { gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); while (b.cb_that_ran == NULL) { - grpc_pollset_work(&g_pollset, gpr_inf_future(GPR_CLOCK_REALTIME)); + grpc_pollset_work(&g_pollset, gpr_inf_future(GPR_CLOCK_MONOTONIC)); } /* Except now we verify that second_read_callback ran instead */ GPR_ASSERT(b.cb_that_ran == second_read_callback); diff --git a/test/core/iomgr/tcp_client_posix_test.c b/test/core/iomgr/tcp_client_posix_test.c index 637886a7383..38b7b5909d4 100644 --- a/test/core/iomgr/tcp_client_posix_test.c +++ b/test/core/iomgr/tcp_client_posix_test.c @@ -196,13 +196,13 @@ void test_times_out(void) { gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); while (gpr_time_cmp(gpr_time_add(connect_deadline, gpr_time_from_seconds(2, GPR_TIMESPAN)), - gpr_now(GPR_CLOCK_REALTIME)) > 0) { + gpr_now(connect_deadline.clock_type)) > 0) { int is_after_deadline = - gpr_time_cmp(connect_deadline, gpr_now(GPR_CLOCK_REALTIME)) <= 0; + gpr_time_cmp(connect_deadline, gpr_now(GPR_CLOCK_MONOTONIC)) <= 0; if (is_after_deadline && gpr_time_cmp(gpr_time_add(connect_deadline, gpr_time_from_seconds(1, GPR_TIMESPAN)), - gpr_now(GPR_CLOCK_REALTIME)) > 0) { + gpr_now(GPR_CLOCK_MONOTONIC)) > 0) { /* allow some slack before insisting that things be done */ } else { GPR_ASSERT(g_connections_complete == diff --git a/test/core/iomgr/tcp_server_posix_test.c b/test/core/iomgr/tcp_server_posix_test.c index 83252a889bb..f8d0fe82178 100644 --- a/test/core/iomgr/tcp_server_posix_test.c +++ b/test/core/iomgr/tcp_server_posix_test.c @@ -135,7 +135,7 @@ static void test_connect(int n) { gpr_log(GPR_DEBUG, "wait"); while (g_nconnects == nconnects_before && - gpr_time_cmp(deadline, gpr_now(GPR_CLOCK_REALTIME)) > 0) { + gpr_time_cmp(deadline, gpr_now(deadline.clock_type)) > 0) { grpc_pollset_work(&g_pollset, deadline); } gpr_log(GPR_DEBUG, "wait done"); diff --git a/test/core/util/test_config.h b/test/core/util/test_config.h index 063c797ce96..7028ade7b21 100644 --- a/test/core/util/test_config.h +++ b/test/core/util/test_config.h @@ -52,12 +52,12 @@ extern "C" { (GRPC_TEST_SLOWDOWN_BUILD_FACTOR * GRPC_TEST_SLOWDOWN_MACHINE_FACTOR) #define GRPC_TIMEOUT_SECONDS_TO_DEADLINE(x) \ - gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), \ + gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), \ gpr_time_from_micros(GRPC_TEST_SLOWDOWN_FACTOR * 1e6 * (x), \ GPR_TIMESPAN)) #define GRPC_TIMEOUT_MILLIS_TO_DEADLINE(x) \ - gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), \ + gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), \ gpr_time_from_micros(GRPC_TEST_SLOWDOWN_FACTOR * 1e3 * (x), \ GPR_TIMESPAN)) From 56488eaf9cf67407ddfe191223055be9970fd4dc Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Sun, 19 Jul 2015 22:50:18 -0700 Subject: [PATCH 10/20] Fix Windows CV --- src/core/support/sync_win32.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/core/support/sync_win32.c b/src/core/support/sync_win32.c index 63196d10d3c..54f84a46ac6 100644 --- a/src/core/support/sync_win32.c +++ b/src/core/support/sync_win32.c @@ -83,10 +83,10 @@ int gpr_cv_wait(gpr_cv *cv, gpr_mu *mu, gpr_timespec abs_deadline) { int timeout = 0; DWORD timeout_max_ms; mu->locked = 0; - if (gpr_time_cmp(abs_deadline, gpr_inf_future(GPR_CLOCK_REALTIME)) == 0) { + if (gpr_time_cmp(abs_deadline, gpr_inf_future(abs_deadline.clock_type)) == 0) { SleepConditionVariableCS(cv, &mu->cs, INFINITE); } else { - gpr_timespec now = gpr_now(GPR_CLOCK_REALTIME); + gpr_timespec now = gpr_now(abs_deadline.clock_type); gpr_int64 now_ms = now.tv_sec * 1000 + now.tv_nsec / 1000000; gpr_int64 deadline_ms = abs_deadline.tv_sec * 1000 + abs_deadline.tv_nsec / 1000000; From 4a6f9e08e9592c4bb07eef0e41d70b86a5dc1abc Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 20 Jul 2015 10:16:18 -0700 Subject: [PATCH 11/20] Update Windows to use the monotonic clock for alarms also --- src/core/iomgr/pollset_windows.c | 2 +- src/core/iomgr/tcp_client_windows.c | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/core/iomgr/pollset_windows.c b/src/core/iomgr/pollset_windows.c index 24226cc9801..a9c4739c7c8 100644 --- a/src/core/iomgr/pollset_windows.c +++ b/src/core/iomgr/pollset_windows.c @@ -70,7 +70,7 @@ void grpc_pollset_destroy(grpc_pollset *pollset) { int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline) { gpr_timespec now; - now = gpr_now(GPR_CLOCK_REALTIME); + now = gpr_now(GPR_CLOCK_MONOTONIC); if (gpr_time_cmp(now, deadline) > 0) { return 0 /* GPR_FALSE */; } diff --git a/src/core/iomgr/tcp_client_windows.c b/src/core/iomgr/tcp_client_windows.c index 16741452b91..39fd43130b0 100644 --- a/src/core/iomgr/tcp_client_windows.c +++ b/src/core/iomgr/tcp_client_windows.c @@ -216,7 +216,7 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *tcp), ac->aborted = 0; grpc_alarm_init(&ac->alarm, deadline, on_alarm, ac, - gpr_now(GPR_CLOCK_REALTIME)); + gpr_now(GPR_CLOCK_MONOTONIC)); socket->write_info.outstanding = 1; grpc_socket_notify_on_write(socket, on_connect, ac); return; From 21ec6c30b130ef5278eb78dcee2524db7af816a9 Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Mon, 20 Jul 2015 17:59:25 -0700 Subject: [PATCH 12/20] Added oauth2_auth_token and per_rpc_creds Node interop tests --- src/node/interop/interop_client.js | 56 +++++++++++++++++++++++++++++- 1 file changed, 55 insertions(+), 1 deletion(-) diff --git a/src/node/interop/interop_client.js b/src/node/interop/interop_client.js index b61b0b63c02..0aadd862e58 100644 --- a/src/node/interop/interop_client.js +++ b/src/node/interop/interop_client.js @@ -318,6 +318,58 @@ function authTest(expected_user, scope, client, done) { }); } +function oauth2Test(expected_user, scope, per_rpc, client, done) { + (new GoogleAuth()).getApplicationDefault(function(err, credential) { + assert.ifError(err); + var arg = { + response_type: 'COMPRESSABLE', + response_size: 314159, + payload: { + body: zeroBuffer(271828) + }, + fill_username: true, + fill_oauth_scope: true + }; + credential = credential.createScoped(scope); + credential.getAccessToken(function(err, token) { + assert.ifError(err); + var updateMetadata = function(authURI, metadata, callback) { + metadata = _.clone(metadata); + if (metadata.Authorization) { + metadata.Authorization = _.clone(metadata.Authorization); + } else { + metadata.Authorization = []; + } + metadata.Authorization.push('Bearer ' + token); + callback(null, metadata); + }; + var makeTestCall = function(error, client_metadata) { + assert.ifError(error); + var call = client.unaryCall(arg, function(err, resp) { + assert.ifError(err); + assert.strictEqual(resp.payload.type, 'COMPRESSABLE'); + assert.strictEqual(resp.payload.body.length, 314159); + assert.strictEqual(resp.username, expected_user); + assert.strictEqual(resp.oauth_scope, AUTH_SCOPE_RESPONSE); + }); + call.on('status', function(status) { + assert.strictEqual(status.code, grpc.status.OK); + if (done) { + done(); + } + }); + }; + if (per_rpc) { + updateMetadata('', {}, makeTestCall); + } else { + client.updateMetadata = updateMetadata; + makeTestCall(null, {}); + } + + }); + }); +} + /** * Map from test case names to test functions */ @@ -333,7 +385,9 @@ var test_cases = { timeout_on_sleeping_server: timeoutOnSleepingServer, compute_engine_creds: _.partial(authTest, COMPUTE_ENGINE_USER, null), service_account_creds: _.partial(authTest, AUTH_USER, AUTH_SCOPE), - jwt_token_creds: _.partial(authTest, AUTH_USER, null) + jwt_token_creds: _.partial(authTest, AUTH_USER, null), + oauth2_auth_token: _.partial(oauth2Test, AUTH_USER, AUTH_SCOPE, false), + per_rpc_creds: _.partial(oauth2Test, AUTH_USER, AUTH_SCOPE, true) }; /** From c7b8872f5481a4d068e8e0729413b1779c13292f Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Mon, 20 Jul 2015 18:06:27 -0700 Subject: [PATCH 13/20] Removed unnecessary arguments from auth message --- src/node/interop/interop_client.js | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/node/interop/interop_client.js b/src/node/interop/interop_client.js index 0aadd862e58..34d6282fbfe 100644 --- a/src/node/interop/interop_client.js +++ b/src/node/interop/interop_client.js @@ -322,11 +322,6 @@ function oauth2Test(expected_user, scope, per_rpc, client, done) { (new GoogleAuth()).getApplicationDefault(function(err, credential) { assert.ifError(err); var arg = { - response_type: 'COMPRESSABLE', - response_size: 314159, - payload: { - body: zeroBuffer(271828) - }, fill_username: true, fill_oauth_scope: true }; From 63bb2f1de261c63df704c3a78b222b603faa0d74 Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Mon, 20 Jul 2015 18:09:49 -0700 Subject: [PATCH 14/20] Removed now-incorrect asserts in oauth test --- src/node/interop/interop_client.js | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/node/interop/interop_client.js b/src/node/interop/interop_client.js index 34d6282fbfe..e810e68e450 100644 --- a/src/node/interop/interop_client.js +++ b/src/node/interop/interop_client.js @@ -342,8 +342,6 @@ function oauth2Test(expected_user, scope, per_rpc, client, done) { assert.ifError(error); var call = client.unaryCall(arg, function(err, resp) { assert.ifError(err); - assert.strictEqual(resp.payload.type, 'COMPRESSABLE'); - assert.strictEqual(resp.payload.body.length, 314159); assert.strictEqual(resp.username, expected_user); assert.strictEqual(resp.oauth_scope, AUTH_SCOPE_RESPONSE); }); From 3d57871e9dd90d7302655cb4a6563fd1adca917b Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 20 Jul 2015 22:00:24 -0700 Subject: [PATCH 15/20] Fix TSAN reported failure --- src/core/channel/client_channel.c | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index 10e01ebbb4f..09d71eb736a 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -236,18 +236,15 @@ static void picked_target(void *arg, int iomgr_success) { } } -static void pick_target(grpc_lb_policy *lb_policy, call_data *calld) { +static void pick_target( + grpc_lb_policy *lb_policy, call_data *calld, grpc_pollset *bind_pollset) { grpc_metadata_batch *initial_metadata; grpc_transport_stream_op *op = &calld->waiting_op; - GPR_ASSERT(op->bind_pollset); - GPR_ASSERT(op->send_ops); - GPR_ASSERT(op->send_ops->nops >= 1); - GPR_ASSERT(op->send_ops->ops[0].type == GRPC_OP_METADATA); initial_metadata = &op->send_ops->ops[0].data.metadata; grpc_iomgr_closure_init(&calld->async_setup_task, picked_target, calld); - grpc_lb_policy_pick(lb_policy, op->bind_pollset, initial_metadata, + grpc_lb_policy_pick(lb_policy, bind_pollset, initial_metadata, &calld->picked_channel, &calld->async_setup_task); } @@ -358,12 +355,19 @@ static void perform_transport_stream_op(grpc_call_element *elem, gpr_mu_lock(&chand->mu_config); lb_policy = chand->lb_policy; if (lb_policy) { + grpc_pollset *bind_pollset = calld->waiting_op.bind_pollset; GRPC_LB_POLICY_REF(lb_policy, "pick"); gpr_mu_unlock(&chand->mu_config); calld->state = CALL_WAITING_FOR_PICK; + + GPR_ASSERT(calld->waiting_op.bind_pollset); + GPR_ASSERT(calld->waiting_op.send_ops); + GPR_ASSERT(calld->waiting_op.send_ops->nops >= 1); + GPR_ASSERT( + calld->waiting_op.send_ops->ops[0].type == GRPC_OP_METADATA); gpr_mu_unlock(&calld->mu_state); - pick_target(lb_policy, calld); + pick_target(lb_policy, calld, bind_pollset); GRPC_LB_POLICY_UNREF(lb_policy, "pick"); } else if (chand->resolver != NULL) { From 990f6427e8d9fabcff87fc21fcfce774d7500b2b Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 20 Jul 2015 22:07:13 -0700 Subject: [PATCH 16/20] Fix TSAN reported failure --- src/core/channel/client_channel.c | 28 ++++++++++------------------ 1 file changed, 10 insertions(+), 18 deletions(-) diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index 09d71eb736a..c1aa580b2d2 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -236,18 +236,6 @@ static void picked_target(void *arg, int iomgr_success) { } } -static void pick_target( - grpc_lb_policy *lb_policy, call_data *calld, grpc_pollset *bind_pollset) { - grpc_metadata_batch *initial_metadata; - grpc_transport_stream_op *op = &calld->waiting_op; - - initial_metadata = &op->send_ops->ops[0].data.metadata; - - grpc_iomgr_closure_init(&calld->async_setup_task, picked_target, calld); - grpc_lb_policy_pick(lb_policy, bind_pollset, initial_metadata, - &calld->picked_channel, &calld->async_setup_task); -} - static grpc_iomgr_closure *merge_into_waiting_op( grpc_call_element *elem, grpc_transport_stream_op *new_op) { call_data *calld = elem->call_data; @@ -355,19 +343,23 @@ static void perform_transport_stream_op(grpc_call_element *elem, gpr_mu_lock(&chand->mu_config); lb_policy = chand->lb_policy; if (lb_policy) { - grpc_pollset *bind_pollset = calld->waiting_op.bind_pollset; + grpc_transport_stream_op *op = &calld->waiting_op; + grpc_pollset *bind_pollset = op->bind_pollset; + grpc_metadata_batch *initial_metadata = &op->send_ops->ops[0].data.metadata; GRPC_LB_POLICY_REF(lb_policy, "pick"); gpr_mu_unlock(&chand->mu_config); calld->state = CALL_WAITING_FOR_PICK; - GPR_ASSERT(calld->waiting_op.bind_pollset); - GPR_ASSERT(calld->waiting_op.send_ops); - GPR_ASSERT(calld->waiting_op.send_ops->nops >= 1); + GPR_ASSERT(op->bind_pollset); + GPR_ASSERT(op->send_ops); + GPR_ASSERT(op->send_ops->nops >= 1); GPR_ASSERT( - calld->waiting_op.send_ops->ops[0].type == GRPC_OP_METADATA); + op->send_ops->ops[0].type == GRPC_OP_METADATA); gpr_mu_unlock(&calld->mu_state); - pick_target(lb_policy, calld, bind_pollset); + grpc_iomgr_closure_init(&calld->async_setup_task, picked_target, calld); + grpc_lb_policy_pick(lb_policy, bind_pollset, initial_metadata, + &calld->picked_channel, &calld->async_setup_task); GRPC_LB_POLICY_UNREF(lb_policy, "pick"); } else if (chand->resolver != NULL) { From 26205360fe57350d300fc60626d51727b6595f0f Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 21 Jul 2015 08:21:57 -0700 Subject: [PATCH 17/20] Fix (forever) a TSAN bug thats plagued us --- src/core/iomgr/tcp_client_posix.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/core/iomgr/tcp_client_posix.c b/src/core/iomgr/tcp_client_posix.c index dc0489e64f4..2ae0251abb0 100644 --- a/src/core/iomgr/tcp_client_posix.c +++ b/src/core/iomgr/tcp_client_posix.c @@ -114,6 +114,8 @@ static void on_writable(void *acp, int success) { void (*cb)(void *arg, grpc_endpoint *tcp) = ac->cb; void *cb_arg = ac->cb_arg; + grpc_alarm_cancel(&ac->alarm); + gpr_mu_lock(&ac->mu); if (success) { do { @@ -178,8 +180,6 @@ finish: if (done) { gpr_mu_destroy(&ac->mu); gpr_free(ac); - } else { - grpc_alarm_cancel(&ac->alarm); } cb(cb_arg, ep); } From f97b4f26d64d175ff810896b5918ce28c47981af Mon Sep 17 00:00:00 2001 From: Stanley Cheung Date: Mon, 20 Jul 2015 19:28:18 -0700 Subject: [PATCH 18/20] Add per-language homebrew testing for Mac on Jenkins --- tools/jenkins/run_distribution.sh | 95 ++++++++++++++++++++++++------- 1 file changed, 75 insertions(+), 20 deletions(-) diff --git a/tools/jenkins/run_distribution.sh b/tools/jenkins/run_distribution.sh index fd318692ac4..cb564fba1ac 100755 --- a/tools/jenkins/run_distribution.sh +++ b/tools/jenkins/run_distribution.sh @@ -39,19 +39,19 @@ if [ "$platform" == "linux" ]; then sha1=$(sha1sum tools/jenkins/grpc_linuxbrew/Dockerfile | cut -f1 -d\ ) DOCKER_IMAGE_NAME=grpc_linuxbrew_$sha1 + # build docker image, contains all pre-requisites docker build -t $DOCKER_IMAGE_NAME tools/jenkins/grpc_linuxbrew - supported="python nodejs ruby php" - if [ "$language" == "core" ]; then command="curl -fsSL https://goo.gl/getgrpc | bash -" - elif [[ "$supported" =~ "$language" ]]; then + elif [[ "python nodejs ruby php" =~ "$language" ]]; then command="curl -fsSL https://goo.gl/getgrpc | bash -s $language" else echo "unsupported language $language" exit 1 fi + # run per-language homebrew installation script docker run $DOCKER_IMAGE_NAME bash -l \ -c "nvm use 0.12; \ npm set unsafe-perm true; \ @@ -66,26 +66,81 @@ if [ "$platform" == "linux" ]; then elif [ "$platform" == "macos" ]; then if [ "$dist_channel" == "homebrew" ]; then - which brew # TODO: for debug, can be removed later + # system installed homebrew, don't interfere brew list -l - dir=/tmp/homebrew-test-$language - rm -rf $dir - mkdir -p $dir - git clone https://github.com/Homebrew/homebrew.git $dir - cd $dir - # TODO: Uncomment these when the general structure of the script is verified - # PATH=$dir/bin:$PATH brew tap homebrew/dupes - # PATH=$dir/bin:$PATH brew install zlib - # PATH=$dir/bin:$PATH brew install openssl - # PATH=$dir/bin:$PATH brew tap grpc/grpc - # PATH=$dir/bin:$PATH brew install --without-python google-protobuf - # PATH=$dir/bin:$PATH brew install grpc - PATH=$dir/bin:$PATH brew list -l + + # Set up temp directories for test installation of homebrew + brew_root=/tmp/homebrew-test-$language + rm -rf $brew_root + mkdir -p $brew_root + git clone https://github.com/Homebrew/homebrew.git $brew_root + + # Install grpc via homebrew + # + # The temp $PATH env variable makes sure we are operating at the right copy of + # temp homebrew installation, and do not interfere with the system's main brew + # installation. + # + # TODO: replace the next section with the actual homebrew installation script + # i.e. curl -fsSL https://goo.gl/getgrpc | bash -s $language + # need to resolve a bunch of environment and privilege issue on the jenkins + # mac machine itself + local OLD_PATH=$PATH + local PATH=$brew_root/bin:$PATH + cd $brew_root + brew tap homebrew/dupes + brew install zlib + brew install openssl + brew tap grpc/grpc + brew install --without-python google-protobuf + brew install grpc brew list -l + + # Install per-language modules/extensions on top of core grpc + # + # If a command below needs root access, the binary had been added to + # /etc/sudoers. This step needs to be repeated if we add more mac instances + # to our jenkins project. + # + # Examples (lines that needed to be added to /etc/sudoers): + # + Defaults env_keep += "CFLAGS CXXFLAGS LDFLAGS enable_grpc" + # + jenkinsnode1 ALL=(ALL) NOPASSWD: /usr/bin/pecl, /usr/local/bin/pip, + # + /usr/local/bin/npm + case $language in + *core*) ;; + *python*) + sudo CFLAGS=-I$brew_root/include LDFLAGS=-L$brew_root/lib pip install grpcio + pip list | grep grpcio + echo 'y' | sudo pip uninstall grpcio + ;; + *nodejs*) + sudo CXXFLAGS=-I$brew_root/include LDFLAGS=-L$brew_root/lib npm install grpc + npm list | grep grpc + sudo npm uninstall grpc + ;; + *ruby*) + gem install grpc -- --with-grpc-dir=$brew_root + gem list | grep grpc + gem uninstall grpc + ;; + *php*) + sudo enable_grpc=$brew_root CFLAGS="-Wno-parentheses-equality" pecl install grpc-alpha + pecl list | grep grpc + sudo pecl uninstall grpc + ;; + *) + echo "Unsupported language $language" + exit 1 + ;; + esac + + # clean up cd ~/ - rm -rf $dir - echo $PATH # TODO: for debug, can be removed later - brew list -l # TODO: for debug, can be removed later + rm -rf $brew_root + + # Make sure the system brew installation is still unaffected + local PATH=$OLD_PATH + brew list -l else echo "Unsupported $platform dist_channel $dist_channel" From 29e1aca8cc3882cc9e973176d275b61f13642ec8 Mon Sep 17 00:00:00 2001 From: Stanley Cheung Date: Tue, 21 Jul 2015 16:12:53 -0700 Subject: [PATCH 19/20] local can only be used in a function; --- tools/jenkins/run_distribution.sh | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tools/jenkins/run_distribution.sh b/tools/jenkins/run_distribution.sh index cb564fba1ac..2824c31e2aa 100755 --- a/tools/jenkins/run_distribution.sh +++ b/tools/jenkins/run_distribution.sh @@ -85,8 +85,8 @@ elif [ "$platform" == "macos" ]; then # i.e. curl -fsSL https://goo.gl/getgrpc | bash -s $language # need to resolve a bunch of environment and privilege issue on the jenkins # mac machine itself - local OLD_PATH=$PATH - local PATH=$brew_root/bin:$PATH + OLD_PATH=$PATH + PATH=$brew_root/bin:$PATH cd $brew_root brew tap homebrew/dupes brew install zlib @@ -139,7 +139,7 @@ elif [ "$platform" == "macos" ]; then rm -rf $brew_root # Make sure the system brew installation is still unaffected - local PATH=$OLD_PATH + PATH=$OLD_PATH brew list -l else From a83cb9160fc6619df7dc81358a1f9e550bc83169 Mon Sep 17 00:00:00 2001 From: Stanley Cheung Date: Tue, 21 Jul 2015 17:13:33 -0700 Subject: [PATCH 20/20] use export instead --- tools/jenkins/run_distribution.sh | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tools/jenkins/run_distribution.sh b/tools/jenkins/run_distribution.sh index 2824c31e2aa..eea25b62e6e 100755 --- a/tools/jenkins/run_distribution.sh +++ b/tools/jenkins/run_distribution.sh @@ -85,8 +85,8 @@ elif [ "$platform" == "macos" ]; then # i.e. curl -fsSL https://goo.gl/getgrpc | bash -s $language # need to resolve a bunch of environment and privilege issue on the jenkins # mac machine itself - OLD_PATH=$PATH - PATH=$brew_root/bin:$PATH + export OLD_PATH=$PATH + export PATH=$brew_root/bin:$PATH cd $brew_root brew tap homebrew/dupes brew install zlib @@ -139,7 +139,7 @@ elif [ "$platform" == "macos" ]; then rm -rf $brew_root # Make sure the system brew installation is still unaffected - PATH=$OLD_PATH + export PATH=$OLD_PATH brew list -l else