Merge branch 'master' of github.com:grpc/grpc into cpp_docs

pull/3074/head
David Garcia Quintas 9 years ago
commit 2ea2e31158
  1. 7
      src/compiler/csharp_generator.cc
  2. 3
      src/compiler/csharp_generator.h
  3. 23
      src/node/ext/call.cc
  4. 15
      src/node/index.js
  5. 8
      src/node/interop/interop_client.js
  6. 61
      src/node/src/client.js
  7. 181
      src/node/src/metadata.js
  8. 65
      src/node/src/server.js
  9. 193
      src/node/test/metadata_test.js
  10. 62
      src/node/test/surface_test.js
  11. 51
      src/python/grpcio/grpc/_links/service.py
  12. 6
      src/python/grpcio/grpc/framework/core/_end.py
  13. 25
      src/python/grpcio_test/grpc_test/_core_over_links_base_interface_test.py
  14. 16
      src/python/grpcio_test/grpc_test/_crust_over_core_over_links_face_interface_test.py
  15. 15
      src/python/grpcio_test/grpc_test/_links/_transmission_test.py
  16. 5
      src/python/grpcio_test/grpc_test/test_common.py

@ -33,6 +33,7 @@
#include <cctype>
#include <map>
#include <sstream>
#include <vector>
#include "src/compiler/csharp_generator.h"
@ -44,7 +45,6 @@
using google::protobuf::compiler::csharp::GetFileNamespace;
using google::protobuf::compiler::csharp::GetClassName;
using google::protobuf::compiler::csharp::GetUmbrellaClassName;
using google::protobuf::SimpleItoa;
using grpc::protobuf::FileDescriptor;
using grpc::protobuf::Descriptor;
using grpc::protobuf::ServiceDescriptor;
@ -228,11 +228,14 @@ void GenerateStaticMethodField(Printer* out, const MethodDescriptor *method) {
}
void GenerateServiceDescriptorProperty(Printer* out, const ServiceDescriptor *service) {
std::ostringstream index;
index << service->index();
out->Print("// service descriptor\n");
out->Print("public static global::Google.Protobuf.Reflection.ServiceDescriptor Descriptor\n");
out->Print("{\n");
out->Print(" get { return $umbrella$.Descriptor.Services[$index$]; }\n",
"umbrella", GetUmbrellaClassName(service->file()), "index", SimpleItoa(service->index()));
"umbrella", GetUmbrellaClassName(service->file()), "index",
index.str());
out->Print("}\n");
out->Print("\n");
}

@ -36,10 +36,7 @@
#include "src/compiler/config.h"
using namespace std;
#include <google/protobuf/compiler/csharp/csharp_names.h>
#include <google/protobuf/stubs/strutil.h>
namespace grpc_csharp_generator {

@ -111,17 +111,19 @@ bool CreateMetadataArray(Handle<Object> metadata, grpc_metadata_array *array,
NanAssignPersistent(*handle, value);
resources->handles.push_back(unique_ptr<PersistentHolder>(
new PersistentHolder(handle)));
continue;
} else {
return false;
}
}
if (value->IsString()) {
Handle<String> string_value = value->ToString();
NanUtf8String *utf8_value = new NanUtf8String(string_value);
resources->strings.push_back(unique_ptr<NanUtf8String>(utf8_value));
current->value = **utf8_value;
current->value_length = string_value->Length();
} else {
return false;
if (value->IsString()) {
Handle<String> string_value = value->ToString();
NanUtf8String *utf8_value = new NanUtf8String(string_value);
resources->strings.push_back(unique_ptr<NanUtf8String>(utf8_value));
current->value = **utf8_value;
current->value_length = string_value->Length();
} else {
return false;
}
}
array->count += 1;
}
@ -156,8 +158,7 @@ Handle<Value> ParseMetadata(const grpc_metadata_array *metadata_array) {
}
if (EndsWith(elem->key, "-bin")) {
array->Set(index_map[elem->key],
MakeFastBuffer(
NanNewBufferHandle(elem->value, elem->value_length)));
NanNewBufferHandle(elem->value, elem->value_length));
} else {
array->Set(index_map[elem->key], NanNew(elem->value));
}

@ -41,6 +41,8 @@ var client = require('./src/client.js');
var server = require('./src/server.js');
var Metadata = require('./src/metadata.js');
var grpc = require('bindings')('grpc');
/**
@ -107,18 +109,12 @@ exports.getGoogleAuthDelegate = function getGoogleAuthDelegate(credential) {
* @param {function(Error, Object)} callback
*/
return function updateMetadata(authURI, metadata, callback) {
metadata = _.clone(metadata);
if (metadata.Authorization) {
metadata.Authorization = _.clone(metadata.Authorization);
} else {
metadata.Authorization = [];
}
credential.getRequestMetadata(authURI, function(err, header) {
if (err) {
callback(err);
return;
}
metadata.Authorization.push(header.Authorization);
metadata.add('authorization', header.Authorization);
callback(null, metadata);
});
};
@ -129,6 +125,11 @@ exports.getGoogleAuthDelegate = function getGoogleAuthDelegate(credential) {
*/
exports.Server = server.Server;
/**
* @see module:src/metadata
*/
exports.Metadata = Metadata;
/**
* Status name to code number mapping
*/

@ -321,13 +321,7 @@ function oauth2Test(expected_user, scope, per_rpc, client, done) {
credential.getAccessToken(function(err, token) {
assert.ifError(err);
var updateMetadata = function(authURI, metadata, callback) {
metadata = _.clone(metadata);
if (metadata.Authorization) {
metadata.Authorization = _.clone(metadata.Authorization);
} else {
metadata.Authorization = [];
}
metadata.Authorization.push('Bearer ' + token);
metadata.Add('authorization', 'Bearer ' + token);
callback(null, metadata);
};
var makeTestCall = function(error, client_metadata) {

@ -42,7 +42,9 @@ var _ = require('lodash');
var grpc = require('bindings')('grpc.node');
var common = require('./common.js');
var common = require('./common');
var Metadata = require('./metadata');
var EventEmitter = require('events').EventEmitter;
@ -254,8 +256,7 @@ function makeUnaryRequestFunction(method, serialize, deserialize) {
* serialize
* @param {function(?Error, value=)} callback The callback to for when the
* response is received
* @param {array=} metadata Array of metadata key/value pairs to add to the
* call
* @param {Metadata=} metadata Metadata to add to the call
* @param {Object=} options Options map
* @return {EventEmitter} An event emitter for stream related events
*/
@ -264,7 +265,9 @@ function makeUnaryRequestFunction(method, serialize, deserialize) {
var emitter = new EventEmitter();
var call = getCall(this.channel, method, options);
if (metadata === null || metadata === undefined) {
metadata = {};
metadata = new Metadata();
} else {
metadata = metadata.clone();
}
emitter.cancel = function cancel() {
call.cancel();
@ -283,13 +286,16 @@ function makeUnaryRequestFunction(method, serialize, deserialize) {
if (options) {
message.grpcWriteFlags = options.flags;
}
client_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata;
client_batch[grpc.opType.SEND_INITIAL_METADATA] =
metadata._getCoreRepresentation();
client_batch[grpc.opType.SEND_MESSAGE] = message;
client_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true;
client_batch[grpc.opType.RECV_INITIAL_METADATA] = true;
client_batch[grpc.opType.RECV_MESSAGE] = true;
client_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
call.startBatch(client_batch, function(err, response) {
response.status.metadata = Metadata._fromCoreRepresentation(
response.status.metadata);
emitter.emit('status', response.status);
if (response.status.code !== grpc.status.OK) {
var error = new Error(response.status.details);
@ -304,7 +310,8 @@ function makeUnaryRequestFunction(method, serialize, deserialize) {
return;
}
}
emitter.emit('metadata', response.metadata);
emitter.emit('metadata', Metadata._fromCoreRepresentation(
response.metadata));
callback(null, deserialize(response.read));
});
});
@ -328,7 +335,7 @@ function makeClientStreamRequestFunction(method, serialize, deserialize) {
* @this {Client} Client object. Must have a channel member.
* @param {function(?Error, value=)} callback The callback to for when the
* response is received
* @param {array=} metadata Array of metadata key/value pairs to add to the
* @param {Metadata=} metadata Array of metadata key/value pairs to add to the
* call
* @param {Object=} options Options map
* @return {EventEmitter} An event emitter for stream related events
@ -337,7 +344,9 @@ function makeClientStreamRequestFunction(method, serialize, deserialize) {
/* jshint validthis: true */
var call = getCall(this.channel, method, options);
if (metadata === null || metadata === undefined) {
metadata = {};
metadata = new Metadata();
} else {
metadata = metadata.clone();
}
var stream = new ClientWritableStream(call, serialize);
this.updateMetadata(this.auth_uri, metadata, function(error, metadata) {
@ -347,7 +356,8 @@ function makeClientStreamRequestFunction(method, serialize, deserialize) {
return;
}
var metadata_batch = {};
metadata_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata;
metadata_batch[grpc.opType.SEND_INITIAL_METADATA] =
metadata._getCoreRepresentation();
metadata_batch[grpc.opType.RECV_INITIAL_METADATA] = true;
call.startBatch(metadata_batch, function(err, response) {
if (err) {
@ -355,12 +365,15 @@ function makeClientStreamRequestFunction(method, serialize, deserialize) {
// in the other batch.
return;
}
stream.emit('metadata', response.metadata);
stream.emit('metadata', Metadata._fromCoreRepresentation(
response.metadata));
});
var client_batch = {};
client_batch[grpc.opType.RECV_MESSAGE] = true;
client_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
call.startBatch(client_batch, function(err, response) {
response.status.metadata = Metadata._fromCoreRepresentation(
response.status.metadata);
stream.emit('status', response.status);
if (response.status.code !== grpc.status.OK) {
var error = new Error(response.status.details);
@ -398,7 +411,7 @@ function makeServerStreamRequestFunction(method, serialize, deserialize) {
* @this {SurfaceClient} Client object. Must have a channel member.
* @param {*} argument The argument to the call. Should be serializable with
* serialize
* @param {array=} metadata Array of metadata key/value pairs to add to the
* @param {Metadata=} metadata Array of metadata key/value pairs to add to the
* call
* @param {Object} options Options map
* @return {EventEmitter} An event emitter for stream related events
@ -407,7 +420,9 @@ function makeServerStreamRequestFunction(method, serialize, deserialize) {
/* jshint validthis: true */
var call = getCall(this.channel, method, options);
if (metadata === null || metadata === undefined) {
metadata = {};
metadata = new Metadata();
} else {
metadata = metadata.clone();
}
var stream = new ClientReadableStream(call, deserialize);
this.updateMetadata(this.auth_uri, metadata, function(error, metadata) {
@ -421,7 +436,8 @@ function makeServerStreamRequestFunction(method, serialize, deserialize) {
if (options) {
message.grpcWriteFlags = options.flags;
}
start_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata;
start_batch[grpc.opType.SEND_INITIAL_METADATA] =
metadata._getCoreRepresentation();
start_batch[grpc.opType.RECV_INITIAL_METADATA] = true;
start_batch[grpc.opType.SEND_MESSAGE] = message;
start_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true;
@ -431,11 +447,14 @@ function makeServerStreamRequestFunction(method, serialize, deserialize) {
// in the other batch.
return;
}
stream.emit('metadata', response.metadata);
stream.emit('metadata', Metadata._fromCoreRepresentation(
response.metadata));
});
var status_batch = {};
status_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
call.startBatch(status_batch, function(err, response) {
response.status.metadata = Metadata._fromCoreRepresentation(
response.status.metadata);
stream.emit('status', response.status);
if (response.status.code !== grpc.status.OK) {
var error = new Error(response.status.details);
@ -470,7 +489,7 @@ function makeBidiStreamRequestFunction(method, serialize, deserialize) {
/**
* Make a bidirectional stream request with this method on the given channel.
* @this {SurfaceClient} Client object. Must have a channel member.
* @param {array=} metadata Array of metadata key/value pairs to add to the
* @param {Metadata=} metadata Array of metadata key/value pairs to add to the
* call
* @param {Options} options Options map
* @return {EventEmitter} An event emitter for stream related events
@ -479,7 +498,9 @@ function makeBidiStreamRequestFunction(method, serialize, deserialize) {
/* jshint validthis: true */
var call = getCall(this.channel, method, options);
if (metadata === null || metadata === undefined) {
metadata = {};
metadata = new Metadata();
} else {
metadata = metadata.clone();
}
var stream = new ClientDuplexStream(call, serialize, deserialize);
this.updateMetadata(this.auth_uri, metadata, function(error, metadata) {
@ -489,7 +510,8 @@ function makeBidiStreamRequestFunction(method, serialize, deserialize) {
return;
}
var start_batch = {};
start_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata;
start_batch[grpc.opType.SEND_INITIAL_METADATA] =
metadata._getCoreRepresentation();
start_batch[grpc.opType.RECV_INITIAL_METADATA] = true;
call.startBatch(start_batch, function(err, response) {
if (err) {
@ -497,11 +519,14 @@ function makeBidiStreamRequestFunction(method, serialize, deserialize) {
// in the other batch.
return;
}
stream.emit('metadata', response.metadata);
stream.emit('metadata', Metadata._fromCoreRepresentation(
response.metadata));
});
var status_batch = {};
status_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
call.startBatch(status_batch, function(err, response) {
response.status.metadata = Metadata._fromCoreRepresentation(
response.status.metadata);
stream.emit('status', response.status);
if (response.status.code !== grpc.status.OK) {
var error = new Error(response.status.details);

@ -0,0 +1,181 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
/**
* Metadata module
* @module
*/
'use strict';
var _ = require('lodash');
/**
* Class for storing metadata. Keys are normalized to lowercase ASCII.
* @constructor
*/
function Metadata() {
this._internal_repr = {};
}
function normalizeKey(key) {
if (!(/^[A-Za-z\d_-]+$/.test(key))) {
throw new Error('Metadata keys must be nonempty strings containing only ' +
'alphanumeric characters and hyphens');
}
return key.toLowerCase();
}
function validate(key, value) {
if (_.endsWith(key, '-bin')) {
if (!(value instanceof Buffer)) {
throw new Error('keys that end with \'-bin\' must have Buffer values');
}
} else {
if (!_.isString(value)) {
throw new Error(
'keys that don\'t end with \'-bin\' must have String values');
}
if (!(/^[\x20-\x7E]*$/.test(value))) {
throw new Error('Metadata string values can only contain printable ' +
'ASCII characters and space');
}
}
}
/**
* Sets the given value for the given key, replacing any other values associated
* with that key. Normalizes the key.
* @param {String} key The key to set
* @param {String|Buffer} value The value to set. Must be a buffer if and only
* if the normalized key ends with '-bin'
*/
Metadata.prototype.set = function(key, value) {
key = normalizeKey(key);
validate(key, value);
this._internal_repr[key] = [value];
};
/**
* Adds the given value for the given key. Normalizes the key.
* @param {String} key The key to add to.
* @param {String|Buffer} value The value to add. Must be a buffer if and only
* if the normalized key ends with '-bin'
*/
Metadata.prototype.add = function(key, value) {
key = normalizeKey(key);
validate(key, value);
if (!this._internal_repr[key]) {
this._internal_repr[key] = [];
}
this._internal_repr[key].push(value);
};
/**
* Remove the given key and any associated values. Normalizes the key.
* @param {String} key The key to remove
*/
Metadata.prototype.remove = function(key) {
key = normalizeKey(key);
if (Object.prototype.hasOwnProperty.call(this._internal_repr, key)) {
delete this._internal_repr[key];
}
};
/**
* Gets a list of all values associated with the key. Normalizes the key.
* @param {String} key The key to get
* @return {Array.<String|Buffer>} The values associated with that key
*/
Metadata.prototype.get = function(key) {
key = normalizeKey(key);
if (Object.prototype.hasOwnProperty.call(this._internal_repr, key)) {
return this._internal_repr[key];
} else {
return [];
}
};
/**
* Get a map of each key to a single associated value. This reflects the most
* common way that people will want to see metadata.
* @return {Object.<String,String|Buffer>} A key/value mapping of the metadata
*/
Metadata.prototype.getMap = function() {
var result = {};
_.forOwn(this._internal_repr, function(values, key) {
if(values.length > 0) {
result[key] = values[0];
}
});
return result;
};
/**
* Clone the metadata object.
* @return {Metadata} The new cloned object
*/
Metadata.prototype.clone = function() {
var copy = new Metadata();
_.forOwn(this._internal_repr, function(value, key) {
copy._internal_repr[key] = _.clone(value);
});
return copy;
};
/**
* Gets the metadata in the format used by interal code. Intended for internal
* use only. API stability is not guaranteed.
* @private
* @return {Object.<String, Array.<String|Buffer>>} The metadata
*/
Metadata.prototype._getCoreRepresentation = function() {
return this._internal_repr;
};
/**
* Creates a Metadata object from a metadata map in the internal format.
* Intended for internal use only. API stability is not guaranteed.
* @private
* @param {Object.<String, Array.<String|Buffer>>} The metadata
* @return {Metadata} The new Metadata object
*/
Metadata._fromCoreRepresentation = function(metadata) {
var newMetadata = new Metadata();
if (metadata) {
newMetadata._internal_repr = _.cloneDeep(metadata);
}
return newMetadata;
};
module.exports = Metadata;

@ -44,6 +44,8 @@ var grpc = require('bindings')('grpc.node');
var common = require('./common');
var Metadata = require('./metadata');
var stream = require('stream');
var Readable = stream.Readable;
@ -60,10 +62,10 @@ var EventEmitter = require('events').EventEmitter;
* @param {Object} error The error object
*/
function handleError(call, error) {
var statusMetadata = new Metadata();
var status = {
code: grpc.status.UNKNOWN,
details: 'Unknown Error',
metadata: {}
details: 'Unknown Error'
};
if (error.hasOwnProperty('message')) {
status.details = error.message;
@ -75,11 +77,13 @@ function handleError(call, error) {
}
}
if (error.hasOwnProperty('metadata')) {
status.metadata = error.metadata;
statusMetadata = error.metadata;
}
status.metadata = statusMetadata._getCoreRepresentation();
var error_batch = {};
if (!call.metadataSent) {
error_batch[grpc.opType.SEND_INITIAL_METADATA] = {};
error_batch[grpc.opType.SEND_INITIAL_METADATA] =
(new Metadata())._getCoreRepresentation();
}
error_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = status;
call.startBatch(error_batch, function(){});
@ -114,22 +118,24 @@ function waitForCancel(call, emitter) {
* @param {*} value The value to respond with
* @param {function(*):Buffer=} serialize Serialization function for the
* response
* @param {Object=} metadata Optional trailing metadata to send with status
* @param {Metadata=} metadata Optional trailing metadata to send with status
* @param {number=} flags Flags for modifying how the message is sent.
* Defaults to 0.
*/
function sendUnaryResponse(call, value, serialize, metadata, flags) {
var end_batch = {};
var statusMetadata = new Metadata();
var status = {
code: grpc.status.OK,
details: 'OK',
metadata: {}
details: 'OK'
};
if (metadata) {
status.metadata = metadata;
statusMetadata = metadata;
}
status.metadata = statusMetadata._getCoreRepresentation();
if (!call.metadataSent) {
end_batch[grpc.opType.SEND_INITIAL_METADATA] = {};
end_batch[grpc.opType.SEND_INITIAL_METADATA] =
(new Metadata())._getCoreRepresentation();
call.metadataSent = true;
}
var message = serialize(value);
@ -151,14 +157,19 @@ function setUpWritable(stream, serialize) {
stream.status = {
code : grpc.status.OK,
details : 'OK',
metadata : {}
metadata : new Metadata()
};
stream.serialize = common.wrapIgnoreNull(serialize);
function sendStatus() {
var batch = {};
if (!stream.call.metadataSent) {
stream.call.metadataSent = true;
batch[grpc.opType.SEND_INITIAL_METADATA] = {};
batch[grpc.opType.SEND_INITIAL_METADATA] =
(new Metadata())._getCoreRepresentation();
}
if (stream.status.metadata) {
stream.status.metadata = stream.status.metadata._getCoreRepresentation();
}
batch[grpc.opType.SEND_STATUS_FROM_SERVER] = stream.status;
stream.call.startBatch(batch, function(){});
@ -173,7 +184,7 @@ function setUpWritable(stream, serialize) {
function setStatus(err) {
var code = grpc.status.UNKNOWN;
var details = 'Unknown Error';
var metadata = {};
var metadata = new Metadata();
if (err.hasOwnProperty('message')) {
details = err.message;
}
@ -203,7 +214,7 @@ function setUpWritable(stream, serialize) {
/**
* Override of Writable#end method that allows for sending metadata with a
* success status.
* @param {Object=} metadata Metadata to send with the status
* @param {Metadata=} metadata Metadata to send with the status
*/
stream.end = function(metadata) {
if (metadata) {
@ -266,7 +277,8 @@ function _write(chunk, encoding, callback) {
/* jshint validthis: true */
var batch = {};
if (!this.call.metadataSent) {
batch[grpc.opType.SEND_INITIAL_METADATA] = {};
batch[grpc.opType.SEND_INITIAL_METADATA] =
(new Metadata())._getCoreRepresentation();
this.call.metadataSent = true;
}
var message = this.serialize(chunk);
@ -289,15 +301,15 @@ ServerWritableStream.prototype._write = _write;
/**
* Send the initial metadata for a writable stream.
* @param {Object<String, Array<(String|Buffer)>>} responseMetadata Metadata
* to send
* @param {Metadata} responseMetadata Metadata to send
*/
function sendMetadata(responseMetadata) {
/* jshint validthis: true */
if (!this.call.metadataSent) {
this.call.metadataSent = true;
var batch = [];
batch[grpc.opType.SEND_INITIAL_METADATA] = responseMetadata;
batch[grpc.opType.SEND_INITIAL_METADATA] =
responseMetadata._getCoreRepresentation();
this.call.startBatch(batch, function(err) {
if (err) {
this.emit('error', err);
@ -422,7 +434,7 @@ ServerDuplexStream.prototype.getPeer = getPeer;
* @access private
* @param {grpc.Call} call The call to handle
* @param {Object} handler Request handler object for the method that was called
* @param {Object} metadata Metadata from the client
* @param {Metadata} metadata Metadata from the client
*/
function handleUnary(call, handler, metadata) {
var emitter = new EventEmitter();
@ -430,7 +442,8 @@ function handleUnary(call, handler, metadata) {
if (!call.metadataSent) {
call.metadataSent = true;
var batch = {};
batch[grpc.opType.SEND_INITIAL_METADATA] = responseMetadata;
batch[grpc.opType.SEND_INITIAL_METADATA] =
responseMetadata._getCoreRepresentation();
call.startBatch(batch, function() {});
}
};
@ -478,7 +491,7 @@ function handleUnary(call, handler, metadata) {
* @access private
* @param {grpc.Call} call The call to handle
* @param {Object} handler Request handler object for the method that was called
* @param {Object} metadata Metadata from the client
* @param {Metadata} metadata Metadata from the client
*/
function handleServerStreaming(call, handler, metadata) {
var stream = new ServerWritableStream(call, handler.serialize);
@ -507,7 +520,7 @@ function handleServerStreaming(call, handler, metadata) {
* @access private
* @param {grpc.Call} call The call to handle
* @param {Object} handler Request handler object for the method that was called
* @param {Object} metadata Metadata from the client
* @param {Metadata} metadata Metadata from the client
*/
function handleClientStreaming(call, handler, metadata) {
var stream = new ServerReadableStream(call, handler.deserialize);
@ -515,7 +528,8 @@ function handleClientStreaming(call, handler, metadata) {
if (!call.metadataSent) {
call.metadataSent = true;
var batch = {};
batch[grpc.opType.SEND_INITIAL_METADATA] = responseMetadata;
batch[grpc.opType.SEND_INITIAL_METADATA] =
responseMetadata._getCoreRepresentation();
call.startBatch(batch, function() {});
}
};
@ -542,7 +556,7 @@ function handleClientStreaming(call, handler, metadata) {
* @access private
* @param {grpc.Call} call The call to handle
* @param {Object} handler Request handler object for the method that was called
* @param {Object} metadata Metadata from the client
* @param {Metadata} metadata Metadata from the client
*/
function handleBidiStreaming(call, handler, metadata) {
var stream = new ServerDuplexStream(call, handler.serialize,
@ -599,7 +613,7 @@ function Server(options) {
var details = event.new_call;
var call = details.call;
var method = details.method;
var metadata = details.metadata;
var metadata = Metadata._fromCoreRepresentation(details.metadata);
if (method === null) {
return;
}
@ -609,7 +623,8 @@ function Server(options) {
handler = handlers[method];
} else {
var batch = {};
batch[grpc.opType.SEND_INITIAL_METADATA] = {};
batch[grpc.opType.SEND_INITIAL_METADATA] =
(new Metadata())._getCoreRepresentation();
batch[grpc.opType.SEND_STATUS_FROM_SERVER] = {
code: grpc.status.UNIMPLEMENTED,
details: 'This method is not available on this server.',

@ -0,0 +1,193 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
'use strict';
var Metadata = require('../src/metadata.js');
var assert = require('assert');
describe('Metadata', function() {
var metadata;
beforeEach(function() {
metadata = new Metadata();
});
describe('#set', function() {
it('Only accepts string values for non "-bin" keys', function() {
assert.throws(function() {
metadata.set('key', new Buffer('value'));
});
assert.doesNotThrow(function() {
metadata.set('key', 'value');
});
});
it('Only accepts Buffer values for "-bin" keys', function() {
assert.throws(function() {
metadata.set('key-bin', 'value');
});
assert.doesNotThrow(function() {
metadata.set('key-bin', new Buffer('value'));
});
});
it('Rejects invalid keys', function() {
assert.throws(function() {
metadata.set('key$', 'value');
});
assert.throws(function() {
metadata.set('', 'value');
});
});
it('Rejects values with non-ASCII characters', function() {
assert.throws(function() {
metadata.set('key', 'résumé');
});
});
it('Saves values that can be retrieved', function() {
metadata.set('key', 'value');
assert.deepEqual(metadata.get('key'), ['value']);
});
it('Overwrites previous values', function() {
metadata.set('key', 'value1');
metadata.set('key', 'value2');
assert.deepEqual(metadata.get('key'), ['value2']);
});
it('Normalizes keys', function() {
metadata.set('Key', 'value1');
assert.deepEqual(metadata.get('key'), ['value1']);
metadata.set('KEY', 'value2');
assert.deepEqual(metadata.get('key'), ['value2']);
});
});
describe('#add', function() {
it('Only accepts string values for non "-bin" keys', function() {
assert.throws(function() {
metadata.add('key', new Buffer('value'));
});
assert.doesNotThrow(function() {
metadata.add('key', 'value');
});
});
it('Only accepts Buffer values for "-bin" keys', function() {
assert.throws(function() {
metadata.add('key-bin', 'value');
});
assert.doesNotThrow(function() {
metadata.add('key-bin', new Buffer('value'));
});
});
it('Rejects invalid keys', function() {
assert.throws(function() {
metadata.add('key$', 'value');
});
assert.throws(function() {
metadata.add('', 'value');
});
});
it('Saves values that can be retrieved', function() {
metadata.add('key', 'value');
assert.deepEqual(metadata.get('key'), ['value']);
});
it('Combines with previous values', function() {
metadata.add('key', 'value1');
metadata.add('key', 'value2');
assert.deepEqual(metadata.get('key'), ['value1', 'value2']);
});
it('Normalizes keys', function() {
metadata.add('Key', 'value1');
assert.deepEqual(metadata.get('key'), ['value1']);
metadata.add('KEY', 'value2');
assert.deepEqual(metadata.get('key'), ['value1', 'value2']);
});
});
describe('#remove', function() {
it('clears values from a key', function() {
metadata.add('key', 'value');
metadata.remove('key');
assert.deepEqual(metadata.get('key'), []);
});
it('Normalizes keys', function() {
metadata.add('key', 'value');
metadata.remove('KEY');
assert.deepEqual(metadata.get('key'), []);
});
});
describe('#get', function() {
beforeEach(function() {
metadata.add('key', 'value1');
metadata.add('key', 'value2');
metadata.add('key-bin', new Buffer('value'));
});
it('gets all values associated with a key', function() {
assert.deepEqual(metadata.get('key'), ['value1', 'value2']);
});
it('Normalizes keys', function() {
assert.deepEqual(metadata.get('KEY'), ['value1', 'value2']);
});
it('returns an empty list for non-existent keys', function() {
assert.deepEqual(metadata.get('non-existent-key'), []);
});
it('returns Buffers for "-bin" keys', function() {
assert(metadata.get('key-bin')[0] instanceof Buffer);
});
});
describe('#getMap', function() {
it('gets a map of keys to values', function() {
metadata.add('key1', 'value1');
metadata.add('Key2', 'value2');
metadata.add('KEY3', 'value3');
assert.deepEqual(metadata.getMap(),
{key1: 'value1',
key2: 'value2',
key3: 'value3'});
});
});
describe('#clone', function() {
it('retains values from the original', function() {
metadata.add('key', 'value');
var copy = metadata.clone();
assert.deepEqual(copy.get('key'), ['value']);
});
it('Does not see newly added values', function() {
metadata.add('key', 'value1');
var copy = metadata.clone();
metadata.add('key', 'value2');
assert.deepEqual(copy.get('key'), ['value1']);
});
it('Does not add new values to the original', function() {
metadata.add('key', 'value1');
var copy = metadata.clone();
copy.add('key', 'value2');
assert.deepEqual(metadata.get('key'), ['value1']);
});
});
});

@ -262,6 +262,7 @@ describe('Generic client and server', function() {
describe('Echo metadata', function() {
var client;
var server;
var metadata;
before(function() {
var test_proto = ProtoBuf.loadProtoFile(__dirname + '/test_service.proto');
var test_service = test_proto.lookup('TestService');
@ -294,6 +295,8 @@ describe('Echo metadata', function() {
var Client = surface_client.makeProtobufClientConstructor(test_service);
client = new Client('localhost:' + port, grpc.Credentials.createInsecure());
server.start();
metadata = new grpc.Metadata();
metadata.set('key', 'value');
});
after(function() {
server.forceShutdown();
@ -301,35 +304,35 @@ describe('Echo metadata', function() {
it('with unary call', function(done) {
var call = client.unary({}, function(err, data) {
assert.ifError(err);
}, {key: ['value']});
}, metadata);
call.on('metadata', function(metadata) {
assert.deepEqual(metadata.key, ['value']);
assert.deepEqual(metadata.get('key'), ['value']);
done();
});
});
it('with client stream call', function(done) {
var call = client.clientStream(function(err, data) {
assert.ifError(err);
}, {key: ['value']});
}, metadata);
call.on('metadata', function(metadata) {
assert.deepEqual(metadata.key, ['value']);
assert.deepEqual(metadata.get('key'), ['value']);
done();
});
call.end();
});
it('with server stream call', function(done) {
var call = client.serverStream({}, {key: ['value']});
var call = client.serverStream({}, metadata);
call.on('data', function() {});
call.on('metadata', function(metadata) {
assert.deepEqual(metadata.key, ['value']);
assert.deepEqual(metadata.get('key'), ['value']);
done();
});
});
it('with bidi stream call', function(done) {
var call = client.bidiStream({key: ['value']});
var call = client.bidiStream(metadata);
call.on('data', function() {});
call.on('metadata', function(metadata) {
assert.deepEqual(metadata.key, ['value']);
assert.deepEqual(metadata.get('key'), ['value']);
done();
});
call.end();
@ -337,9 +340,10 @@ describe('Echo metadata', function() {
it('shows the correct user-agent string', function(done) {
var version = require('../package.json').version;
var call = client.unary({}, function(err, data) { assert.ifError(err); },
{key: ['value']});
metadata);
call.on('metadata', function(metadata) {
assert(_.startsWith(metadata['user-agent'], 'grpc-node/' + version));
assert(_.startsWith(metadata.get('user-agent')[0],
'grpc-node/' + version));
done();
});
});
@ -354,13 +358,15 @@ describe('Other conditions', function() {
var test_proto = ProtoBuf.loadProtoFile(__dirname + '/test_service.proto');
test_service = test_proto.lookup('TestService');
server = new grpc.Server();
var trailer_metadata = new grpc.Metadata();
trailer_metadata.add('trailer-present', 'yes');
server.addProtoService(test_service, {
unary: function(call, cb) {
var req = call.request;
if (req.error) {
cb(new Error('Requested error'), null, {trailer_present: ['yes']});
cb(new Error('Requested error'), null, trailer_metadata);
} else {
cb(null, {count: 1}, {trailer_present: ['yes']});
cb(null, {count: 1}, trailer_metadata);
}
},
clientStream: function(stream, cb){
@ -369,14 +375,14 @@ describe('Other conditions', function() {
stream.on('data', function(data) {
if (data.error) {
errored = true;
cb(new Error('Requested error'), null, {trailer_present: ['yes']});
cb(new Error('Requested error'), null, trailer_metadata);
} else {
count += 1;
}
});
stream.on('end', function() {
if (!errored) {
cb(null, {count: count}, {trailer_present: ['yes']});
cb(null, {count: count}, trailer_metadata);
}
});
},
@ -384,13 +390,13 @@ describe('Other conditions', function() {
var req = stream.request;
if (req.error) {
var err = new Error('Requested error');
err.metadata = {trailer_present: ['yes']};
err.metadata = trailer_metadata;
stream.emit('error', err);
} else {
for (var i = 0; i < 5; i++) {
stream.write({count: i});
}
stream.end({trailer_present: ['yes']});
stream.end(trailer_metadata);
}
},
bidiStream: function(stream) {
@ -398,10 +404,8 @@ describe('Other conditions', function() {
stream.on('data', function(data) {
if (data.error) {
var err = new Error('Requested error');
err.metadata = {
trailer_present: ['yes'],
count: ['' + count]
};
err.metadata = trailer_metadata.clone();
err.metadata.add('count', '' + count);
stream.emit('error', err);
} else {
stream.write({count: count});
@ -409,7 +413,7 @@ describe('Other conditions', function() {
}
});
stream.on('end', function() {
stream.end({trailer_present: ['yes']});
stream.end(trailer_metadata);
});
}
});
@ -510,7 +514,7 @@ describe('Other conditions', function() {
assert.ifError(err);
});
call.on('status', function(status) {
assert.deepEqual(status.metadata.trailer_present, ['yes']);
assert.deepEqual(status.metadata.get('trailer-present'), ['yes']);
done();
});
});
@ -519,7 +523,7 @@ describe('Other conditions', function() {
assert(err);
});
call.on('status', function(status) {
assert.deepEqual(status.metadata.trailer_present, ['yes']);
assert.deepEqual(status.metadata.get('trailer-present'), ['yes']);
done();
});
});
@ -531,7 +535,7 @@ describe('Other conditions', function() {
call.write({error: false});
call.end();
call.on('status', function(status) {
assert.deepEqual(status.metadata.trailer_present, ['yes']);
assert.deepEqual(status.metadata.get('trailer-present'), ['yes']);
done();
});
});
@ -543,7 +547,7 @@ describe('Other conditions', function() {
call.write({error: true});
call.end();
call.on('status', function(status) {
assert.deepEqual(status.metadata.trailer_present, ['yes']);
assert.deepEqual(status.metadata.get('trailer-present'), ['yes']);
done();
});
});
@ -552,7 +556,7 @@ describe('Other conditions', function() {
call.on('data', function(){});
call.on('status', function(status) {
assert.strictEqual(status.code, grpc.status.OK);
assert.deepEqual(status.metadata.trailer_present, ['yes']);
assert.deepEqual(status.metadata.get('trailer-present'), ['yes']);
done();
});
});
@ -560,7 +564,7 @@ describe('Other conditions', function() {
var call = client.serverStream({error: true});
call.on('data', function(){});
call.on('error', function(error) {
assert.deepEqual(error.metadata.trailer_present, ['yes']);
assert.deepEqual(error.metadata.get('trailer-present'), ['yes']);
done();
});
});
@ -572,7 +576,7 @@ describe('Other conditions', function() {
call.on('data', function(){});
call.on('status', function(status) {
assert.strictEqual(status.code, grpc.status.OK);
assert.deepEqual(status.metadata.trailer_present, ['yes']);
assert.deepEqual(status.metadata.get('trailer-present'), ['yes']);
done();
});
});
@ -583,7 +587,7 @@ describe('Other conditions', function() {
call.end();
call.on('data', function(){});
call.on('error', function(error) {
assert.deepEqual(error.metadata.trailer_present, ['yes']);
assert.deepEqual(error.metadata.get('trailer-present'), ['yes']);
done();
});
});

@ -316,9 +316,8 @@ class _Kernel(object):
call.status(status, call)
self._rpc_states.pop(call, None)
def add_port(self, port, server_credentials):
def add_port(self, address, server_credentials):
with self._lock:
address = '[::]:%d' % port
if self._server is None:
self._completion_queue = _intermediary_low.CompletionQueue()
self._server = _intermediary_low.Server(self._completion_queue)
@ -337,10 +336,13 @@ class _Kernel(object):
self._server.start()
self._server.service(None)
def graceful_stop(self):
def begin_stop(self):
with self._lock:
self._server.stop()
self._server = None
def end_stop(self):
with self._lock:
self._completion_queue.stop()
self._completion_queue = None
pool = self._pool
@ -348,11 +350,6 @@ class _Kernel(object):
self._rpc_states = None
pool.shutdown(wait=True)
def immediate_stop(self):
# TODO(nathaniel): Implementation.
raise NotImplementedError(
'TODO(nathaniel): after merge of rewritten lower layers')
class ServiceLink(links.Link):
"""A links.Link for use on the service-side of a gRPC connection.
@ -362,17 +359,20 @@ class ServiceLink(links.Link):
"""
@abc.abstractmethod
def add_port(self, port, server_credentials):
def add_port(self, address, server_credentials):
"""Adds a port on which to service RPCs after this link has been started.
Args:
port: The port on which to service RPCs, or zero to request that a port
be automatically selected and used.
address: The address on which to service RPCs with a port number of zero
requesting that a port number be automatically selected and used.
server_credentials: An _intermediary_low.ServerCredentials object, or
None for insecure service.
Returns:
A port on which RPCs will be serviced after this link has been started.
A integer port on which RPCs will be serviced after this link has been
started. This is typically the same number as the port number contained
in the passed address, but will likely be different if the port number
contained in the passed address was zero.
"""
raise NotImplementedError()
@ -386,18 +386,20 @@ class ServiceLink(links.Link):
raise NotImplementedError()
@abc.abstractmethod
def stop_gracefully(self):
"""Stops this link.
def begin_stop(self):
"""Indicate imminent link stop and immediate rejection of new RPCs.
New RPCs will be rejected as soon as this method is called, but ongoing RPCs
will be allowed to continue until they terminate. This method blocks until
all RPCs have terminated.
will be allowed to continue until they terminate. This method does not
block.
"""
raise NotImplementedError()
@abc.abstractmethod
def stop_immediately(self):
"""Stops this link.
def end_stop(self):
"""Finishes stopping this link.
begin_stop must have been called exactly once before calling this method.
All in-progress RPCs will be terminated immediately.
"""
@ -417,19 +419,18 @@ class _ServiceLink(ServiceLink):
def join_link(self, link):
self._relay.set_behavior(link.accept_ticket)
def add_port(self, port, server_credentials):
return self._kernel.add_port(port, server_credentials)
def add_port(self, address, server_credentials):
return self._kernel.add_port(address, server_credentials)
def start(self):
self._relay.start()
return self._kernel.start()
def stop_gracefully(self):
self._kernel.graceful_stop()
self._relay.stop()
def begin_stop(self):
self._kernel.begin_stop()
def stop_immediately(self):
self._kernel.immediate_stop()
def end_stop(self):
self._kernel.end_stop()
self._relay.stop()

@ -30,7 +30,6 @@
"""Implementation of base.End."""
import abc
import enum
import threading
import uuid
@ -75,7 +74,7 @@ def _abort(operations):
def _cancel_futures(futures):
for future in futures:
futures.cancel()
future.cancel()
def _future_shutdown(lock, cycle, event):
@ -83,8 +82,6 @@ def _future_shutdown(lock, cycle, event):
with lock:
_abort(cycle.operations.values())
_cancel_futures(cycle.futures)
pool = cycle.pool
cycle.pool.shutdown(wait=True)
return in_future
@ -113,6 +110,7 @@ def _termination_action(lock, stats, operation_id, cycle):
cycle.idle_actions = []
if cycle.grace:
_cancel_futures(cycle.futures)
cycle.pool.shutdown(wait=False)
return termination_action

@ -45,11 +45,7 @@ from grpc_test.framework.common import test_constants
from grpc_test.framework.interfaces.base import test_cases
from grpc_test.framework.interfaces.base import test_interfaces
_INVOCATION_INITIAL_METADATA = ((b'0', b'abc'), (b'1', b'def'), (b'2', b'ghi'),)
_SERVICE_INITIAL_METADATA = ((b'3', b'jkl'), (b'4', b'mno'), (b'5', b'pqr'),)
_SERVICE_TERMINAL_METADATA = ((b'6', b'stu'), (b'7', b'vwx'), (b'8', b'yza'),)
_CODE = _intermediary_low.Code.OK
_MESSAGE = b'test message'
class _SerializationBehaviors(
@ -95,7 +91,7 @@ class _Implementation(test_interfaces.Implementation):
service_grpc_link = service.service_link(
serialization_behaviors.request_deserializers,
serialization_behaviors.response_serializers)
port = service_grpc_link.add_port(0, None)
port = service_grpc_link.add_port('[::]:0', None)
channel = _intermediary_low.Channel('localhost:%d' % port, None)
invocation_grpc_link = invocation.invocation_link(
channel, b'localhost',
@ -114,19 +110,22 @@ class _Implementation(test_interfaces.Implementation):
def destantiate(self, memo):
invocation_grpc_link, service_grpc_link = memo
invocation_grpc_link.stop()
service_grpc_link.stop_gracefully()
service_grpc_link.begin_stop()
service_grpc_link.end_stop()
def invocation_initial_metadata(self):
return _INVOCATION_INITIAL_METADATA
return grpc_test_common.INVOCATION_INITIAL_METADATA
def service_initial_metadata(self):
return _SERVICE_INITIAL_METADATA
return grpc_test_common.SERVICE_INITIAL_METADATA
def invocation_completion(self):
return utilities.completion(None, None, None)
def service_completion(self):
return utilities.completion(_SERVICE_TERMINAL_METADATA, _CODE, _MESSAGE)
return utilities.completion(
grpc_test_common.SERVICE_TERMINAL_METADATA, _CODE,
grpc_test_common.DETAILS)
def metadata_transmitted(self, original_metadata, transmitted_metadata):
return original_metadata is None or grpc_test_common.metadata_transmitted(
@ -146,14 +145,6 @@ class _Implementation(test_interfaces.Implementation):
return True
def setUpModule():
logging.warn('setUpModule!')
def tearDownModule():
logging.warn('tearDownModule!')
def load_tests(loader, tests, pattern):
return unittest.TestSuite(
tests=tuple(

@ -39,11 +39,10 @@ from grpc.framework.core import implementations as core_implementations
from grpc.framework.crust import implementations as crust_implementations
from grpc.framework.foundation import logging_pool
from grpc.framework.interfaces.links import utilities
from grpc_test import test_common
from grpc_test import test_common as grpc_test_common
from grpc_test.framework.common import test_constants
from grpc_test.framework.interfaces.face import test_cases
from grpc_test.framework.interfaces.face import test_interfaces
from grpc_test.framework.interfaces.links import test_utilities
class _SerializationBehaviors(
@ -85,7 +84,7 @@ class _Implementation(test_interfaces.Implementation):
service_grpc_link = service.service_link(
serialization_behaviors.request_deserializers,
serialization_behaviors.response_serializers)
port = service_grpc_link.add_port(0, None)
port = service_grpc_link.add_port('[::]:0', None)
channel = _intermediary_low.Channel('localhost:%d' % port, None)
invocation_grpc_link = invocation.invocation_link(
channel, b'localhost',
@ -121,8 +120,9 @@ class _Implementation(test_interfaces.Implementation):
service_end_link, pool) = memo
invocation_end_link.stop(0).wait()
invocation_grpc_link.stop()
service_grpc_link.stop_gracefully()
service_grpc_link.begin_stop()
service_end_link.stop(0).wait()
service_grpc_link.end_stop()
invocation_end_link.join_link(utilities.NULL_LINK)
invocation_grpc_link.join_link(utilities.NULL_LINK)
service_grpc_link.join_link(utilities.NULL_LINK)
@ -130,19 +130,19 @@ class _Implementation(test_interfaces.Implementation):
pool.shutdown(wait=True)
def invocation_metadata(self):
return test_common.INVOCATION_INITIAL_METADATA
return grpc_test_common.INVOCATION_INITIAL_METADATA
def initial_metadata(self):
return test_common.SERVICE_INITIAL_METADATA
return grpc_test_common.SERVICE_INITIAL_METADATA
def terminal_metadata(self):
return test_common.SERVICE_TERMINAL_METADATA
return grpc_test_common.SERVICE_TERMINAL_METADATA
def code(self):
return _intermediary_low.Code.OK
def details(self):
return test_common.DETAILS
return grpc_test_common.DETAILS
def metadata_transmitted(self, original_metadata, transmitted_metadata):
return original_metadata is None or grpc_test_common.metadata_transmitted(

@ -50,7 +50,7 @@ class TransmissionTest(test_cases.TransmissionTest, unittest.TestCase):
service_link = service.service_link(
{self.group_and_method(): self.deserialize_request},
{self.group_and_method(): self.serialize_response})
port = service_link.add_port(0, None)
port = service_link.add_port('[::]:0', None)
service_link.start()
channel = _intermediary_low.Channel('localhost:%d' % port, None)
invocation_link = invocation.invocation_link(
@ -62,7 +62,8 @@ class TransmissionTest(test_cases.TransmissionTest, unittest.TestCase):
def destroy_transmitting_links(self, invocation_side_link, service_side_link):
invocation_side_link.stop()
service_side_link.stop_gracefully()
service_side_link.begin_stop()
service_side_link.end_stop()
def create_invocation_initial_metadata(self):
return (
@ -116,7 +117,7 @@ class RoundTripTest(unittest.TestCase):
identity_transformation, identity_transformation)
service_mate = test_utilities.RecordingLink()
service_link.join_link(service_mate)
port = service_link.add_port(0, None)
port = service_link.add_port('[::]:0', None)
service_link.start()
channel = _intermediary_low.Channel('localhost:%d' % port, None)
invocation_link = invocation.invocation_link(
@ -140,7 +141,8 @@ class RoundTripTest(unittest.TestCase):
invocation_mate.block_until_tickets_satisfy(test_cases.terminated)
invocation_link.stop()
service_link.stop_gracefully()
service_link.begin_stop()
service_link.end_stop()
self.assertIs(
service_mate.tickets()[-1].termination,
@ -160,7 +162,7 @@ class RoundTripTest(unittest.TestCase):
{(test_group, test_method): scenario.serialize_response})
service_mate = test_utilities.RecordingLink()
service_link.join_link(service_mate)
port = service_link.add_port(0, None)
port = service_link.add_port('[::]:0', None)
service_link.start()
channel = _intermediary_low.Channel('localhost:%d' % port, None)
invocation_link = invocation.invocation_link(
@ -206,7 +208,8 @@ class RoundTripTest(unittest.TestCase):
invocation_mate.block_until_tickets_satisfy(test_cases.terminated)
invocation_link.stop()
service_link.stop_gracefully()
service_link.begin_stop()
service_link.end_stop()
observed_requests = tuple(
ticket.payload for ticket in service_mate.tickets()

@ -31,6 +31,11 @@
import collections
INVOCATION_INITIAL_METADATA = ((b'0', b'abc'), (b'1', b'def'), (b'2', b'ghi'),)
SERVICE_INITIAL_METADATA = ((b'3', b'jkl'), (b'4', b'mno'), (b'5', b'pqr'),)
SERVICE_TERMINAL_METADATA = ((b'6', b'stu'), (b'7', b'vwx'), (b'8', b'yza'),)
DETAILS = b'test details'
def metadata_transmitted(original_metadata, transmitted_metadata):
"""Judges whether or not metadata was acceptably transmitted.

Loading…
Cancel
Save