Reworked credentials surface API, added test

pull/3558/head
murgatroid99 9 years ago
parent 8cde3d7c20
commit 153b09d039
  1. 2
      src/node/examples/perf_test.js
  2. 2
      src/node/examples/qps_test.js
  3. 2
      src/node/examples/stock_client.js
  4. 20
      src/node/ext/call.cc
  5. 8
      src/node/ext/call.h
  6. 70
      src/node/ext/credentials.cc
  7. 24
      src/node/ext/credentials.h
  8. 2
      src/node/index.js
  9. 150
      src/node/interop/interop_client.js
  10. 348
      src/node/src/client.js
  11. 120
      src/node/src/credentials.js
  12. 118
      src/node/test/credentials_test.js
  13. 2
      src/node/test/health_test.js
  14. 2
      src/node/test/math_client_test.js
  15. 26
      src/node/test/surface_test.js

@ -42,7 +42,7 @@ function runTest(iterations, callback) {
var testServer = interop_server.getServer(0, false);
testServer.server.start();
var client = new testProto.TestService('localhost:' + testServer.port,
grpc.Credentials.createInsecure());
grpc.credentials.createInsecure());
function runIterations(finish) {
var start = process.hrtime();

@ -62,7 +62,7 @@ function runTest(concurrent_calls, seconds, callback) {
var testServer = interop_server.getServer(0, false);
testServer.server.start();
var client = new testProto.TestService('localhost:' + testServer.port,
grpc.Credentials.createInsecure());
grpc.credentials.createInsecure());
var warmup_num = 100;

@ -39,7 +39,7 @@ var examples = grpc.load(__dirname + '/stock.proto').examples;
*
* var StockClient = require('stock_client.js');
* var stockClient = new StockClient(server_address,
* grpc.Credentials.createInsecure());
* grpc.credentials.createInsecure());
* stockClient.getLastTradePrice({symbol: 'GOOG'}, function(error, response) {
* console.log(error || response);
* });

@ -39,12 +39,14 @@
#include "grpc/support/log.h"
#include "grpc/grpc.h"
#include "grpc/grpc_security.h"
#include "grpc/support/alloc.h"
#include "grpc/support/time.h"
#include "byte_buffer.h"
#include "call.h"
#include "channel.h"
#include "completion_queue_async_worker.h"
#include "credentials.h"
#include "timeval.h"
using std::unique_ptr;
@ -501,6 +503,7 @@ void Call::Init(Local<Object> exports) {
Nan::SetPrototypeMethod(tpl, "cancel", Cancel);
Nan::SetPrototypeMethod(tpl, "cancelWithStatus", CancelWithStatus);
Nan::SetPrototypeMethod(tpl, "getPeer", GetPeer);
Nan::SetPrototypeMethod(tpl, "setCredentials", SetCredentials);
fun_tpl.Reset(tpl);
Local<Function> ctr = Nan::GetFunction(tpl).ToLocalChecked();
Nan::Set(exports, Nan::New("Call").ToLocalChecked(), ctr);
@ -724,5 +727,22 @@ NAN_METHOD(Call::GetPeer) {
info.GetReturnValue().Set(peer_value);
}
NAN_METHOD(Call::SetCredentials) {
Nan::HandleScope scope;
if (!Credentials::HasInstance(info[0])) {
return Nan::ThrowTypeError(
"setCredentials' first argument must be a credential");
}
Call *call = ObjectWrap::Unwrap<Call>(info.This());
Credentials *creds_object = ObjectWrap::Unwrap<Credentials>(
Nan::To<Object>(info[1]).ToLocalChecked());
grpc_credentials *creds = creds_object->GetWrappedCredentials();
grpc_call_error error = GRPC_CALL_ERROR;
if (creds) {
error = grpc_call_set_credentials(call->wrapped_call, creds);
}
info.GetReturnValue().Set(Nan::New<Uint32>(error));
}
} // namespace node
} // namespace grpc

@ -66,9 +66,6 @@ inline v8::Local<v8::Value> nanErrorWithCode(const char *msg,
return scope.Escape(err);
}
bool CreateMetadataArray(Local<Object> metadata, grpc_metadata_array *array,
shared_ptr<Resources> resources);
v8::Local<v8::Value> ParseMetadata(const grpc_metadata_array *metadata_array);
struct Resources {
@ -76,6 +73,10 @@ struct Resources {
std::vector<unique_ptr<PersistentValue> > handles;
};
bool CreateMetadataArray(v8::Local<v8::Object> metadata,
grpc_metadata_array *array,
shared_ptr<Resources> resources);
class Op {
public:
virtual v8::Local<v8::Value> GetNodeValue() const = 0;
@ -125,6 +126,7 @@ class Call : public Nan::ObjectWrap {
static NAN_METHOD(Cancel);
static NAN_METHOD(CancelWithStatus);
static NAN_METHOD(GetPeer);
static NAN_METHOD(SetCredentials);
static Nan::Callback *constructor;
// Used for typechecking instances of this javascript class
static Nan::Persistent<v8::FunctionTemplate> fun_tpl;

@ -96,6 +96,9 @@ void Credentials::Init(Local<Object> exports) {
Nan::Set(ctr, Nan::New("createInsecure").ToLocalChecked(),
Nan::GetFunction(
Nan::New<FunctionTemplate>(CreateInsecure)).ToLocalChecked());
Nan::Set(ctr, Nan::New("createFromPlugin").ToLocalChecked(),
Nan::GetFunction(
Nan::New<FunctionTemplate>(CreateFromPlugin)).ToLocalChecked());
Nan::Set(exports, Nan::New("Credentials").ToLocalChecked(), ctr);
constructor = new Nan::Callback(ctr);
}
@ -198,12 +201,20 @@ NAN_METHOD(Credentials::CreateComposite) {
return Nan::ThrowTypeError(
"createComposite's second argument must be a Credentials object");
}
Credentials *creds1 = ObjectWrap::Unwrap<Credentials>(
Credentials *creds0 = ObjectWrap::Unwrap<Credentials>(
Nan::To<Object>(info[0]).ToLocalChecked());
Credentials *creds2 = ObjectWrap::Unwrap<Credentials>(
Credentials *creds1 = ObjectWrap::Unwrap<Credentials>(
Nan::To<Object>(info[1]).ToLocalChecked());
if (creds0->wrapped_credentials == NULL) {
info.GetReturnValue().Set(info[1]);
return;
}
if (creds1->wrapped_credentials == NULL) {
info.GetReturnValue().Set(info[0]);
return;
}
grpc_credentials *creds = grpc_composite_credentials_create(
creds1->wrapped_credentials, creds2->wrapped_credentials, NULL);
creds0->wrapped_credentials, creds1->wrapped_credentials, NULL);
if (creds == NULL) {
info.GetReturnValue().SetNull();
} else {
@ -259,7 +270,7 @@ NAN_METHOD(Credentials::CreateFromPlugin) {
if (creds == NULL) {
info.GetReturnValue().SetNull();
} else {
info.GetReturnValue().Set(WrapStruct(creds()));
info.GetReturnValue().Set(WrapStruct(creds));
}
}
@ -287,22 +298,59 @@ NAN_METHOD(PluginCallback) {
}
grpc_credentials_plugin_metadata_cb cb =
reinterpret_cast<grpc_credentials_plugin_metadata_cb>(
Nan::To<External>(
Nan::Get(info.Callee, "cb").ToLocalChecked()
).ToLocalChecked()->Value());
void *user_data = Nan::To<External>(
Nan::Get(info.Callee, "user_data").ToLocalChecked()
).ToLocalChecked()->Value();
Nan::Get(info.Callee(),
Nan::New("cb").ToLocalChecked()
).ToLocalChecked().As<External>()->Value());
void *user_data =
Nan::Get(info.Callee(),
Nan::New("user_data").ToLocalChecked()
).ToLocalChecked().As<External>()->Value();
gpr_log(GPR_DEBUG, "Calling plugin metadata callback");
cb(user_data, array.metadata, array.count, code, details);
}
NAUV_WORK_CB(SendPluginCallback) {
Nan::HandleScope scope;
plugin_callback_data *data = reinterpret_cast<plugin_callback_data*>(
async->data);
// Attach cb and user_data to plugin_callback so that it can access them later
v8::Local<v8::Function> plugin_callback = Nan::GetFunction(
Nan::New<v8::FunctionTemplate>(PluginCallback)).ToLocalChecked();
Nan::Set(plugin_callback, Nan::New("cb").ToLocalChecked(),
Nan::New<v8::External>(reinterpret_cast<void*>(data->cb)));
Nan::Set(plugin_callback, Nan::New("user_data").ToLocalChecked(),
Nan::New<v8::External>(data->user_data));
const int argc = 2;
v8::Local<v8::Value> argv[argc] = {
Nan::New(data->service_url).ToLocalChecked(),
plugin_callback
};
Nan::Callback *callback = data->state->callback;
callback->Call(argc, argv);
delete data;
uv_unref((uv_handle_t *)async);
delete async;
}
void plugin_get_metadata(void *state, const char *service_url,
grpc_credentials_plugin_metadata_cb cb,
void *user_data) {
uv_async_t *async = new uv_async_t;
uv_async_init(uv_default_loop(),
async,
PluginCallback);
SendPluginCallback);
plugin_callback_data *data = new plugin_callback_data;
data->state = reinterpret_cast<plugin_state*>(state);
data->service_url = service_url;
data->cb = cb;
data->user_data = user_data;
async->data = data;
uv_async_send(async);
}
void plugin_destroy_state(void *ptr) {
plugin_state *state = reinterpret_cast<plugin_state *>(ptr);
delete state->callback;
}
} // namespace node

@ -96,27 +96,9 @@ void plugin_get_metadata(void *state, const char *service_url,
void plugin_destroy_state(void *state);
static NAN_METHOD(PluginCallback);
NAN_INLINE NAUV_WORK_CB(SendPluginCallback) {
Nan::HandleScope scope;
plugin_callback_data *data = reinterpret_cast<plugin_callback_data>(
async->data);
// Attach cb and user_data to plugin_callback so that it can access them later
v8::Local<v8::Function> plugin_callback = Nan::GetFunction(
Nan::New<v8::FunctionTemplate>(PluginCallback).ToLocalChecked());
Nan::Set(plugin_callback, Nan::New("cb").ToLocalChecked(),
Nan::New<v8::External>(reinterpret_cast<void*>(data->cb)));
Nan::Set(plugin_callback, Nan::New("user_data").ToLocalChecked(),
Nan::New<v8::External>(data->user_data));
const int argc = 2;
v8::Local<v8::Value> argv = {Nan::New(data->service_url).ToLocalChecked(),
plugin_callback};
NanCallback *callback = static_cast<NanCallback*>(async->data);
callback->Call(argc, argv);
uv_unref((uv_handle_t *)async);
delete async;
}
NAN_METHOD(PluginCallback);
NAUV_WORK_CB(SendPluginCallback);
} // namespace node
} // namespace grpc

@ -153,7 +153,7 @@ exports.writeFlags = grpc.writeFlags;
/**
* Credentials factories
*/
exports.Credentials = grpc.Credentials;
exports.credentials = require('./src/credentials.js');
/**
* ServerCredentials factories

@ -280,33 +280,26 @@ function timeoutOnSleepingServer(client, done) {
* primarily for use with mocha
*/
function authTest(expected_user, scope, client, done) {
(new GoogleAuth()).getApplicationDefault(function(err, credential) {
var arg = {
response_type: 'COMPRESSABLE',
response_size: 314159,
payload: {
body: zeroBuffer(271828)
},
fill_username: true,
fill_oauth_scope: true
};
client.unaryCall(arg, function(err, resp) {
assert.ifError(err);
if (credential.createScopedRequired() && scope) {
credential = credential.createScoped(scope);
assert.strictEqual(resp.payload.type, 'COMPRESSABLE');
assert.strictEqual(resp.payload.body.length, 314159);
assert.strictEqual(resp.username, expected_user);
if (scope) {
assert.strictEqual(resp.oauth_scope, AUTH_SCOPE_RESPONSE);
}
if (done) {
done();
}
client.$updateMetadata = grpc.getGoogleAuthDelegate(credential);
var arg = {
response_type: 'COMPRESSABLE',
response_size: 314159,
payload: {
body: zeroBuffer(271828)
},
fill_username: true,
fill_oauth_scope: true
};
client.unaryCall(arg, function(err, resp) {
assert.ifError(err);
assert.strictEqual(resp.payload.type, 'COMPRESSABLE');
assert.strictEqual(resp.payload.body.length, 314159);
assert.strictEqual(resp.username, expected_user);
if (scope) {
assert.strictEqual(resp.oauth_scope, AUTH_SCOPE_RESPONSE);
}
if (done) {
done();
}
});
});
}
@ -345,24 +338,83 @@ function oauth2Test(expected_user, scope, per_rpc, client, done) {
});
}
function perRpcAuthTest(expected_user, scope, per_rpc, client, done) {
(new GoogleAuth()).getApplicationDefault(function(err, credential) {
assert.ifError(err);
var arg = {
fill_username: true,
fill_oauth_scope: true
};
credential = credential.createScoped(scope);
var creds = grpc.credentials.createFromGoogleCredential(credential);
client.unaryCall(arg, function(err, resp) {
assert.ifError(err);
assert.strictEqual(resp.username, expected_user);
assert.strictEqual(resp.oauth_scope, AUTH_SCOPE_RESPONSE);
if (done) {
done();
}
}, null, {credentials: creds});
});
}
function getApplicationCreds(scope, callback) {
(new GoogleAuth()).getApplicationDefault(function(err, credential) {
if (err) {
callback(err);
return;
}
if (credential.createScopedRequired() && scope) {
credential = credential.createScoped(scope);
}
callback(null, grpc.credentials.createFromGoogleCredential(credential));
});
}
function getOauth2Creds(scope, callback) {
(new GoogleAuth()).getApplicationDefault(function(err, credential) {
if (err) {
callback(err);
return;
}
credential = credential.createScoped(scope);
credential.getAccessToken(function(err, token) {
if (err) {
callback(err);
return;
}
var updateMd = function(service_url, callback) {
var metadata = new grpc.Metadata();
metadata.add('authorization', 'Bearer ' + token);
callback(null, metadata);
};
callback(null, grpc.credentials.createFromMetadataGenerator(updateMd));
});
});
}
/**
* Map from test case names to test functions
*/
var test_cases = {
empty_unary: emptyUnary,
large_unary: largeUnary,
client_streaming: clientStreaming,
server_streaming: serverStreaming,
ping_pong: pingPong,
empty_stream: emptyStream,
cancel_after_begin: cancelAfterBegin,
cancel_after_first_response: cancelAfterFirstResponse,
timeout_on_sleeping_server: timeoutOnSleepingServer,
compute_engine_creds: _.partial(authTest, COMPUTE_ENGINE_USER, null),
service_account_creds: _.partial(authTest, AUTH_USER, AUTH_SCOPE),
jwt_token_creds: _.partial(authTest, AUTH_USER, null),
oauth2_auth_token: _.partial(oauth2Test, AUTH_USER, AUTH_SCOPE, false),
per_rpc_creds: _.partial(oauth2Test, AUTH_USER, AUTH_SCOPE, true)
empty_unary: {run: emptyUnary},
large_unary: {run: largeUnary},
client_streaming: {run: clientStreaming},
server_streaming: {run: serverStreaming},
ping_pong: {run: pingPong},
empty_stream: {run: emptyStream},
cancel_after_begin: {run: cancelAfterBegin},
cancel_after_first_response: {run: cancelAfterFirstResponse},
timeout_on_sleeping_server: {run: timeoutOnSleepingServer},
compute_engine_creds: {run: _.partial(authTest, COMPUTE_ENGINE_USER, null),
getCreds: _.partial(getApplicationCreds, null)},
service_account_creds: {run: _.partial(authTest, AUTH_USER, AUTH_SCOPE),
getCreds: _.partial(getApplicationCreds, AUTH_SCOPE)},
jwt_token_creds: {run: _.partial(authTest, AUTH_USER, null),
getCreds: _.partial(getApplicationCreds, null)},
oauth2_auth_token: {run: _.partial(oauth2Test, AUTH_USER, AUTH_SCOPE, false),
getCreds: _.partial(getOauth2Creds, AUTH_SCOPE)},
per_rpc_creds: {run: _.partial(perRpcAuthTest, AUTH_USER, AUTH_SCOPE, true)}
};
/**
@ -388,17 +440,29 @@ function runTest(address, host_override, test_case, tls, test_ca, done) {
ca_path = process.env.SSL_CERT_FILE;
}
var ca_data = fs.readFileSync(ca_path);
creds = grpc.Credentials.createSsl(ca_data);
creds = grpc.credentials.createSsl(ca_data);
if (host_override) {
options['grpc.ssl_target_name_override'] = host_override;
options['grpc.default_authority'] = host_override;
}
} else {
creds = grpc.Credentials.createInsecure();
creds = grpc.credentials.createInsecure();
}
var client = new testProto.TestService(address, creds, options);
var test = test_cases[test_case];
var execute = function(err, creds) {
assert.ifError(err);
var client = new testProto.TestService(address, creds, options);
test.run(client, done);
};
test_cases[test_case](client, done);
if (test.getCreds) {
test.getCreds(function(err, new_creds) {
execute(err, grpc.credentials.combineCredentials(creds, new_creds));
});
} else {
execute(null, creds);
}
}
if (require.main === module) {

@ -233,17 +233,23 @@ function getCall(channel, method, options) {
var host;
var parent;
var propagate_flags;
var credentials;
if (options) {
deadline = options.deadline;
host = options.host;
parent = _.get(options, 'parent.call');
propagate_flags = options.propagate_flags;
credentials = options.credentials;
}
if (deadline === undefined) {
deadline = Infinity;
}
return new grpc.Call(channel, method, deadline, host,
parent, propagate_flags);
var call = new grpc.Call(channel, method, deadline, host,
parent, propagate_flags);
if (credentials) {
call.setCredentials(credentials);
}
return call;
}
/**
@ -282,60 +288,53 @@ function makeUnaryRequestFunction(method, serialize, deserialize) {
emitter.getPeer = function getPeer() {
return call.getPeer();
};
this.$updateMetadata(this.$auth_uri, metadata, function(error, metadata) {
if (error) {
call.cancel();
callback(error);
return;
}
var client_batch = {};
var message = serialize(argument);
if (options) {
message.grpcWriteFlags = options.flags;
}
client_batch[grpc.opType.SEND_INITIAL_METADATA] =
metadata._getCoreRepresentation();
client_batch[grpc.opType.SEND_MESSAGE] = message;
client_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true;
client_batch[grpc.opType.RECV_INITIAL_METADATA] = true;
client_batch[grpc.opType.RECV_MESSAGE] = true;
client_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
call.startBatch(client_batch, function(err, response) {
response.status.metadata = Metadata._fromCoreRepresentation(
response.status.metadata);
var status = response.status;
var error;
var deserialized;
if (status.code === grpc.status.OK) {
if (err) {
// Got a batch error, but OK status. Something went wrong
callback(err);
return;
} else {
try {
deserialized = deserialize(response.read);
} catch (e) {
/* Change status to indicate bad server response. This will result
* in passing an error to the callback */
status = {
code: grpc.status.INTERNAL,
details: 'Failed to parse server response'
};
}
}
}
if (status.code !== grpc.status.OK) {
error = new Error(response.status.details);
error.code = status.code;
error.metadata = status.metadata;
callback(error);
var client_batch = {};
var message = serialize(argument);
if (options) {
message.grpcWriteFlags = options.flags;
}
client_batch[grpc.opType.SEND_INITIAL_METADATA] =
metadata._getCoreRepresentation();
client_batch[grpc.opType.SEND_MESSAGE] = message;
client_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true;
client_batch[grpc.opType.RECV_INITIAL_METADATA] = true;
client_batch[grpc.opType.RECV_MESSAGE] = true;
client_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
call.startBatch(client_batch, function(err, response) {
response.status.metadata = Metadata._fromCoreRepresentation(
response.status.metadata);
var status = response.status;
var error;
var deserialized;
if (status.code === grpc.status.OK) {
if (err) {
// Got a batch error, but OK status. Something went wrong
callback(err);
return;
} else {
callback(null, deserialized);
try {
deserialized = deserialize(response.read);
} catch (e) {
/* Change status to indicate bad server response. This will result
* in passing an error to the callback */
status = {
code: grpc.status.INTERNAL,
details: 'Failed to parse server response'
};
}
}
emitter.emit('status', status);
emitter.emit('metadata', Metadata._fromCoreRepresentation(
response.metadata));
});
}
if (status.code !== grpc.status.OK) {
error = new Error(response.status.details);
error.code = status.code;
error.metadata = status.metadata;
callback(error);
} else {
callback(null, deserialized);
}
emitter.emit('status', status);
emitter.emit('metadata', Metadata._fromCoreRepresentation(
response.metadata));
});
return emitter;
}
@ -371,62 +370,55 @@ function makeClientStreamRequestFunction(method, serialize, deserialize) {
metadata = metadata.clone();
}
var stream = new ClientWritableStream(call, serialize);
this.$updateMetadata(this.$auth_uri, metadata, function(error, metadata) {
if (error) {
call.cancel();
callback(error);
var metadata_batch = {};
metadata_batch[grpc.opType.SEND_INITIAL_METADATA] =
metadata._getCoreRepresentation();
metadata_batch[grpc.opType.RECV_INITIAL_METADATA] = true;
call.startBatch(metadata_batch, function(err, response) {
if (err) {
// The call has stopped for some reason. A non-OK status will arrive
// in the other batch.
return;
}
var metadata_batch = {};
metadata_batch[grpc.opType.SEND_INITIAL_METADATA] =
metadata._getCoreRepresentation();
metadata_batch[grpc.opType.RECV_INITIAL_METADATA] = true;
call.startBatch(metadata_batch, function(err, response) {
stream.emit('metadata', Metadata._fromCoreRepresentation(
response.metadata));
});
var client_batch = {};
client_batch[grpc.opType.RECV_MESSAGE] = true;
client_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
call.startBatch(client_batch, function(err, response) {
response.status.metadata = Metadata._fromCoreRepresentation(
response.status.metadata);
var status = response.status;
var error;
var deserialized;
if (status.code === grpc.status.OK) {
if (err) {
// The call has stopped for some reason. A non-OK status will arrive
// in the other batch.
// Got a batch error, but OK status. Something went wrong
callback(err);
return;
}
stream.emit('metadata', Metadata._fromCoreRepresentation(
response.metadata));
});
var client_batch = {};
client_batch[grpc.opType.RECV_MESSAGE] = true;
client_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
call.startBatch(client_batch, function(err, response) {
response.status.metadata = Metadata._fromCoreRepresentation(
response.status.metadata);
var status = response.status;
var error;
var deserialized;
if (status.code === grpc.status.OK) {
if (err) {
// Got a batch error, but OK status. Something went wrong
callback(err);
return;
} else {
try {
deserialized = deserialize(response.read);
} catch (e) {
/* Change status to indicate bad server response. This will result
* in passing an error to the callback */
status = {
code: grpc.status.INTERNAL,
details: 'Failed to parse server response'
};
}
}
}
if (status.code !== grpc.status.OK) {
error = new Error(response.status.details);
error.code = status.code;
error.metadata = status.metadata;
callback(error);
} else {
callback(null, deserialized);
try {
deserialized = deserialize(response.read);
} catch (e) {
/* Change status to indicate bad server response. This will result
* in passing an error to the callback */
status = {
code: grpc.status.INTERNAL,
details: 'Failed to parse server response'
};
}
}
stream.emit('status', status);
});
}
if (status.code !== grpc.status.OK) {
error = new Error(response.status.details);
error.code = status.code;
error.metadata = status.metadata;
callback(error);
} else {
callback(null, deserialized);
}
stream.emit('status', status);
});
return stream;
}
@ -462,51 +454,44 @@ function makeServerStreamRequestFunction(method, serialize, deserialize) {
metadata = metadata.clone();
}
var stream = new ClientReadableStream(call, deserialize);
this.$updateMetadata(this.$auth_uri, metadata, function(error, metadata) {
if (error) {
call.cancel();
stream.emit('error', error);
var start_batch = {};
var message = serialize(argument);
if (options) {
message.grpcWriteFlags = options.flags;
}
start_batch[grpc.opType.SEND_INITIAL_METADATA] =
metadata._getCoreRepresentation();
start_batch[grpc.opType.RECV_INITIAL_METADATA] = true;
start_batch[grpc.opType.SEND_MESSAGE] = message;
start_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true;
call.startBatch(start_batch, function(err, response) {
if (err) {
// The call has stopped for some reason. A non-OK status will arrive
// in the other batch.
return;
}
var start_batch = {};
var message = serialize(argument);
if (options) {
message.grpcWriteFlags = options.flags;
}
start_batch[grpc.opType.SEND_INITIAL_METADATA] =
metadata._getCoreRepresentation();
start_batch[grpc.opType.RECV_INITIAL_METADATA] = true;
start_batch[grpc.opType.SEND_MESSAGE] = message;
start_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true;
call.startBatch(start_batch, function(err, response) {
stream.emit('metadata', Metadata._fromCoreRepresentation(
response.metadata));
});
var status_batch = {};
status_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
call.startBatch(status_batch, function(err, response) {
response.status.metadata = Metadata._fromCoreRepresentation(
response.status.metadata);
stream.emit('status', response.status);
if (response.status.code !== grpc.status.OK) {
var error = new Error(response.status.details);
error.code = response.status.code;
error.metadata = response.status.metadata;
stream.emit('error', error);
return;
} else {
if (err) {
// The call has stopped for some reason. A non-OK status will arrive
// in the other batch.
return;
}
stream.emit('metadata', Metadata._fromCoreRepresentation(
response.metadata));
});
var status_batch = {};
status_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
call.startBatch(status_batch, function(err, response) {
response.status.metadata = Metadata._fromCoreRepresentation(
response.status.metadata);
stream.emit('status', response.status);
if (response.status.code !== grpc.status.OK) {
var error = new Error(response.status.details);
error.code = response.status.code;
error.metadata = response.status.metadata;
stream.emit('error', error);
// Got a batch error, but OK status. Something went wrong
stream.emit('error', err);
return;
} else {
if (err) {
// Got a batch error, but OK status. Something went wrong
stream.emit('error', err);
return;
}
}
});
}
});
return stream;
}
@ -540,45 +525,38 @@ function makeBidiStreamRequestFunction(method, serialize, deserialize) {
metadata = metadata.clone();
}
var stream = new ClientDuplexStream(call, serialize, deserialize);
this.$updateMetadata(this.$auth_uri, metadata, function(error, metadata) {
if (error) {
call.cancel();
stream.emit('error', error);
var start_batch = {};
start_batch[grpc.opType.SEND_INITIAL_METADATA] =
metadata._getCoreRepresentation();
start_batch[grpc.opType.RECV_INITIAL_METADATA] = true;
call.startBatch(start_batch, function(err, response) {
if (err) {
// The call has stopped for some reason. A non-OK status will arrive
// in the other batch.
return;
}
var start_batch = {};
start_batch[grpc.opType.SEND_INITIAL_METADATA] =
metadata._getCoreRepresentation();
start_batch[grpc.opType.RECV_INITIAL_METADATA] = true;
call.startBatch(start_batch, function(err, response) {
stream.emit('metadata', Metadata._fromCoreRepresentation(
response.metadata));
});
var status_batch = {};
status_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
call.startBatch(status_batch, function(err, response) {
response.status.metadata = Metadata._fromCoreRepresentation(
response.status.metadata);
stream.emit('status', response.status);
if (response.status.code !== grpc.status.OK) {
var error = new Error(response.status.details);
error.code = response.status.code;
error.metadata = response.status.metadata;
stream.emit('error', error);
return;
} else {
if (err) {
// The call has stopped for some reason. A non-OK status will arrive
// in the other batch.
// Got a batch error, but OK status. Something went wrong
stream.emit('error', err);
return;
}
stream.emit('metadata', Metadata._fromCoreRepresentation(
response.metadata));
});
var status_batch = {};
status_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
call.startBatch(status_batch, function(err, response) {
response.status.metadata = Metadata._fromCoreRepresentation(
response.status.metadata);
stream.emit('status', response.status);
if (response.status.code !== grpc.status.OK) {
var error = new Error(response.status.details);
error.code = response.status.code;
error.metadata = response.status.metadata;
stream.emit('error', error);
return;
} else {
if (err) {
// Got a batch error, but OK status. Something went wrong
stream.emit('error', err);
return;
}
}
});
}
});
return stream;
}
@ -618,15 +596,8 @@ exports.makeClientConstructor = function(methods, serviceName) {
* @param {grpc.Credentials} credentials Credentials to use to connect
* to the server
* @param {Object} options Options to pass to the underlying channel
* @param {function(string, Object, function)=} updateMetadata function to
* update the metadata for each request
*/
function Client(address, credentials, options, updateMetadata) {
if (!updateMetadata) {
updateMetadata = function(uri, metadata, callback) {
callback(null, metadata);
};
}
function Client(address, credentials, options) {
if (!options) {
options = {};
}
@ -634,11 +605,6 @@ exports.makeClientConstructor = function(methods, serviceName) {
/* Private fields use $ as a prefix instead of _ because it is an invalid
* prefix of a method name */
this.$channel = new grpc.Channel(address, credentials, options);
// Remove the optional DNS scheme, trailing port, and trailing backslash
address = address.replace(/^(dns:\/{3})?([^:\/]+)(:\d+)?\/?$/, '$2');
this.$server_address = address;
this.$auth_uri = 'https://' + this.$server_address + '/' + serviceName;
this.$updateMetadata = updateMetadata;
}
_.each(methods, function(attrs, name) {

@ -0,0 +1,120 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
/**
* Credentials module
* @module
*/
'use strict';
var grpc = require('bindings')('grpc.node');
var Credentials = grpc.Credentials;
var Metadata = require('./metadata.js');
/**
* Create an SSL Credentials object. If using a client-side certificate, both
* the second and third arguments must be passed.
* @param {Buffer} root_certs The root certificate data
* @param {Buffer=} private_key The client certificate private key, if
* applicable
* @param {Buffer=} cert_chain The client certificate cert chain, if applicable
*/
exports.createSsl = Credentials.createSsl;
/**
* Create a gRPC credentials object from a metadata generation function. This
* function gets the service URL and a callback as parameters. The error
* passed to the callback can optionally have a 'code' value attached to it,
* which corresponds to a status code that this library uses.
* @param {function(String, function(Error, Metadata))} metadata_generator The
* function that generates metadata
* @return {Credentials} The credentials object
*/
exports.createFromMetadataGenerator = function(metadata_generator) {
return Credentials.createFromPlugin(function(service_url, callback) {
metadata_generator(service_url, function(error, metadata) {
var code = grpc.status.OK;
var message = '';
if (error) {
message = error.message;
if (error.hasOwnProperty('code')) {
code = error.code;
}
}
callback(code, message, metadata._getCoreRepresentation());
});
});
};
/**
* Create a gRPC credential from a Google credential object.
* @param {Object} google_credential The Google credential object to use
* @return {Credentials} The resulting credentials object
*/
exports.createFromGoogleCredential = function(google_credential) {
return exports.createFromMetadataGenerator(function(service_url, callback) {
google_credential.getRequestMetadata(service_url, function(err, header) {
if (err) {
callback(err);
return;
}
var metadata = new Metadata();
metadata.add('authorization', header.Authorization);
callback(null, metadata);
});
});
};
/**
* Combine any number of Credentials into a single credentials object
* @param(...Credentials) credentials The Credentials to combine
* @return Credentials A credentials object that combines all of the input
* credentials
*/
exports.combineCredentials = function() {
var current = arguments[0];
for (var i = 1; i < arguments.length; i++) {
current = Credentials.createComposite(current, arguments[i]);
}
return current;
};
/**
* Create an insecure credentials object. This is used to create a channel that
* does not use SSL.
* @return Credentials The insecure credentials object
*/
exports.createInsecure = Credentials.createInsecure;

@ -0,0 +1,118 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
'use strict';
var assert = require('assert');
var fs = require('fs');
var path = require('path');
var grpc = require('..');
describe('client credentials', function() {
var Client;
var server;
var port;
var client_ssl_creds;
var client_options = {};
before(function() {
var proto = grpc.load(__dirname + '/test_service.proto');
server = new grpc.Server();
server.addProtoService(proto.TestService.service, {
unary: function(call, cb) {
call.sendMetadata(call.metadata);
cb(null, {});
},
clientStream: function(stream, cb){
stream.on('data', function(data) {});
stream.on('end', function() {
stream.sendMetadata(stream.metadata);
cb(null, {});
});
},
serverStream: function(stream) {
stream.sendMetadata(stream.metadata);
stream.end();
},
bidiStream: function(stream) {
stream.on('data', function(data) {});
stream.on('end', function() {
stream.sendMetadata(stream.metadata);
stream.end();
});
}
});
var key_path = path.join(__dirname, './data/server1.key');
var pem_path = path.join(__dirname, './data/server1.pem');
var key_data = fs.readFileSync(key_path);
var pem_data = fs.readFileSync(pem_path);
var creds = grpc.ServerCredentials.createSsl(null,
[{private_key: key_data,
cert_chain: pem_data}]);
//creds = grpc.ServerCredentials.createInsecure();
port = server.bind('localhost:0', creds);
server.start();
Client = proto.TestService;
var ca_path = path.join(__dirname, '../test/data/ca.pem');
var ca_data = fs.readFileSync(ca_path);
client_ssl_creds = grpc.credentials.createSsl(ca_data);
var host_override = 'foo.test.google.fr';
client_options['grpc.ssl_target_name_override'] = host_override;
client_options['grpc.default_authority'] = host_override;
});
after(function() {
server.forceShutdown();
});
it.only('Should update metadata with SSL creds', function(done) {
var metadataUpdater = function(service_url, callback) {
var metadata = new grpc.Metadata();
metadata.set('plugin_key', 'plugin_value');
callback(null, metadata);
};
var creds = grpc.credentials.createFromMetadataGenerator(metadataUpdater);
var combined_creds = grpc.credentials.combineCredentials(client_ssl_creds,
creds);
//combined_creds = grpc.credentials.createInsecure();
var client = new Client('localhost:' + port, combined_creds,
client_options);
var call = client.unary({}, function(err, data) {
assert.ifError(err);
console.log('Received response');
});
call.on('metadata', function(metadata) {
assert.deepEqual(metadata.get('plugin_key'), ['plugin_value']);
done();
});
});
});

@ -54,7 +54,7 @@ describe('Health Checking', function() {
grpc.ServerCredentials.createInsecure());
healthServer.start();
healthClient = new health.Client('localhost:' + port_num,
grpc.Credentials.createInsecure());
grpc.credentials.createInsecure());
});
after(function() {
healthServer.forceShutdown();

@ -55,7 +55,7 @@ describe('Math client', function() {
grpc.ServerCredentials.createInsecure());
server.start();
math_client = new math.Math('localhost:' + port_num,
grpc.Credentials.createInsecure());
grpc.credentials.createInsecure());
done();
});
after(function() {

@ -163,7 +163,7 @@ describe('waitForClientReady', function() {
Client = surface_client.makeProtobufClientConstructor(mathService);
});
beforeEach(function() {
client = new Client('localhost:' + port, grpc.Credentials.createInsecure());
client = new Client('localhost:' + port, grpc.credentials.createInsecure());
});
after(function() {
server.forceShutdown();
@ -217,7 +217,7 @@ describe('Echo service', function() {
});
var port = server.bind('localhost:0', server_insecure_creds);
var Client = surface_client.makeProtobufClientConstructor(echo_service);
client = new Client('localhost:' + port, grpc.Credentials.createInsecure());
client = new Client('localhost:' + port, grpc.credentials.createInsecure());
server.start();
});
after(function() {
@ -263,7 +263,7 @@ describe('Generic client and server', function() {
server.start();
var Client = grpc.makeGenericClientConstructor(string_service_attrs);
client = new Client('localhost:' + port,
grpc.Credentials.createInsecure());
grpc.credentials.createInsecure());
});
after(function() {
server.forceShutdown();
@ -311,7 +311,7 @@ describe('Echo metadata', function() {
});
var port = server.bind('localhost:0', server_insecure_creds);
var Client = surface_client.makeProtobufClientConstructor(test_service);
client = new Client('localhost:' + port, grpc.Credentials.createInsecure());
client = new Client('localhost:' + port, grpc.credentials.createInsecure());
server.start();
metadata = new grpc.Metadata();
metadata.set('key', 'value');
@ -437,7 +437,7 @@ describe('Other conditions', function() {
});
port = server.bind('localhost:0', server_insecure_creds);
Client = surface_client.makeProtobufClientConstructor(test_service);
client = new Client('localhost:' + port, grpc.Credentials.createInsecure());
client = new Client('localhost:' + port, grpc.credentials.createInsecure());
server.start();
});
after(function() {
@ -484,7 +484,7 @@ describe('Other conditions', function() {
var Client = surface_client.makeClientConstructor(test_service_attrs,
'TestService');
misbehavingClient = new Client('localhost:' + port,
grpc.Credentials.createInsecure());
grpc.credentials.createInsecure());
});
it('should respond correctly to a unary call', function(done) {
misbehavingClient.unary(badArg, function(err, data) {
@ -725,7 +725,7 @@ describe('Other conditions', function() {
var proxy_port = proxy.bind('localhost:0', server_insecure_creds);
proxy.start();
var proxy_client = new Client('localhost:' + proxy_port,
grpc.Credentials.createInsecure());
grpc.credentials.createInsecure());
var call = proxy_client.unary({}, function(err, value) {
done();
});
@ -748,7 +748,7 @@ describe('Other conditions', function() {
var proxy_port = proxy.bind('localhost:0', server_insecure_creds);
proxy.start();
var proxy_client = new Client('localhost:' + proxy_port,
grpc.Credentials.createInsecure());
grpc.credentials.createInsecure());
var call = proxy_client.clientStream(function(err, value) {
done();
});
@ -769,7 +769,7 @@ describe('Other conditions', function() {
var proxy_port = proxy.bind('localhost:0', server_insecure_creds);
proxy.start();
var proxy_client = new Client('localhost:' + proxy_port,
grpc.Credentials.createInsecure());
grpc.credentials.createInsecure());
var call = proxy_client.serverStream({});
call.on('error', function(err) {
done();
@ -790,7 +790,7 @@ describe('Other conditions', function() {
var proxy_port = proxy.bind('localhost:0', server_insecure_creds);
proxy.start();
var proxy_client = new Client('localhost:' + proxy_port,
grpc.Credentials.createInsecure());
grpc.credentials.createInsecure());
var call = proxy_client.bidiStream();
call.on('error', function(err) {
done();
@ -819,7 +819,7 @@ describe('Other conditions', function() {
var proxy_port = proxy.bind('localhost:0', server_insecure_creds);
proxy.start();
var proxy_client = new Client('localhost:' + proxy_port,
grpc.Credentials.createInsecure());
grpc.credentials.createInsecure());
var deadline = new Date();
deadline.setSeconds(deadline.getSeconds() + 1);
proxy_client.clientStream(function(err, value) {
@ -842,7 +842,7 @@ describe('Other conditions', function() {
var proxy_port = proxy.bind('localhost:0', server_insecure_creds);
proxy.start();
var proxy_client = new Client('localhost:' + proxy_port,
grpc.Credentials.createInsecure());
grpc.credentials.createInsecure());
var deadline = new Date();
deadline.setSeconds(deadline.getSeconds() + 1);
var call = proxy_client.bidiStream(null, {deadline: deadline});
@ -866,7 +866,7 @@ describe('Cancelling surface client', function() {
});
var port = server.bind('localhost:0', server_insecure_creds);
var Client = surface_client.makeProtobufClientConstructor(mathService);
client = new Client('localhost:' + port, grpc.Credentials.createInsecure());
client = new Client('localhost:' + port, grpc.credentials.createInsecure());
server.start();
});
after(function() {

Loading…
Cancel
Save