gRPC PHP Client Interceptor implementation and tests

move InterceptorChannel to Internal subdir; change year
pull/13342/head
ZhouyihaiDing 7 years ago
parent aef957950a
commit 52b95cf1fb
  1. 389
      src/php/lib/Grpc/BaseStub.php
  2. 86
      src/php/lib/Grpc/Interceptor.php
  3. 76
      src/php/lib/Grpc/Internal/InterceptorChannel.php
  4. 427
      src/php/tests/unit_tests/InterceptorTest.php

@ -38,12 +38,13 @@ class BaseStub
* - 'update_metadata': (optional) a callback function which takes in a
* metadata array, and returns an updated metadata array
* - 'grpc.primary_user_agent': (optional) a user-agent string
* @param Channel $channel An already created Channel object (optional)
* @param Channel|InterceptorChannel $channel An already created Channel or InterceptorChannel object (optional)
*/
public function __construct($hostname, $opts, Channel $channel = null)
public function __construct($hostname, $opts, $channel = null)
{
$ssl_roots = file_get_contents(
dirname(__FILE__).'/../../../../etc/roots.pem');
dirname(__FILE__).'/../../../../etc/roots.pem'
);
ChannelCredentials::setDefaultRootsPem($ssl_roots);
$this->hostname = $hostname;
@ -58,16 +59,20 @@ class BaseStub
$this->hostname_override = $opts['grpc.ssl_target_name_override'];
}
if ($channel) {
if (!is_a($channel, 'Grpc\Channel')) {
throw new \Exception('The channel argument is not a'.
'Channel object');
if (!is_a($channel, 'Grpc\Channel') &&
!is_a($channel, 'Grpc\InterceptorChannel')) {
throw new \Exception('The channel argument is not a Channel object '.
'or an InterceptorChannel object created by '.
'Interceptor::intercept($channel, Interceptor|Interceptor[] $interceptors)');
}
$this->channel = $channel;
return;
}
$package_config = json_decode(
file_get_contents(dirname(__FILE__).'/../../composer.json'), true);
file_get_contents(dirname(__FILE__).'/../../composer.json'),
true
);
if (!empty($opts['grpc.primary_user_agent'])) {
$opts['grpc.primary_user_agent'] .= ' ';
} else {
@ -77,8 +82,8 @@ class BaseStub
'grpc-php/'.$package_config['version'];
if (!array_key_exists('credentials', $opts)) {
throw new \Exception("The opts['credentials'] key is now ".
'required. Please see one of the '.
'ChannelCredentials::create methods');
'required. Please see one of the '.
'ChannelCredentials::create methods');
}
$this->channel = new Channel($hostname, $opts);
}
@ -169,7 +174,8 @@ class BaseStub
$last_slash_idx = strrpos($method, '/');
if ($last_slash_idx === false) {
throw new \InvalidArgumentException(
'service name must have a slash');
'service name must have a slash'
);
}
$service_name = substr($method, 0, $last_slash_idx);
@ -197,7 +203,8 @@ class BaseStub
if (!preg_match('/^[A-Za-z\d_-]+$/', $key)) {
throw new \InvalidArgumentException(
'Metadata keys must be nonempty strings containing only '.
'alphanumeric characters, hyphens and underscores');
'alphanumeric characters, hyphens and underscores'
);
}
$metadata_copy[strtolower($key)] = $value;
}
@ -205,9 +212,255 @@ class BaseStub
return $metadata_copy;
}
/**
* Create a function which can be used to create UnaryCall
*
* @param Channel|InterceptorChannel $channel
* @param callable $deserialize A function that deserializes the response
*
* @return \Closure
*/
private function _GrpcUnaryUnary($channel, $deserialize)
{
return function ($method,
$argument,
array $metadata = [],
array $options = []) use ($channel, $deserialize) {
$call = new UnaryCall(
$channel,
$method,
$deserialize,
$options
);
$jwt_aud_uri = $this->_get_jwt_aud_uri($method);
if (is_callable($this->update_metadata)) {
$metadata = call_user_func(
$this->update_metadata,
$metadata,
$jwt_aud_uri
);
}
$metadata = $this->_validate_and_normalize_metadata(
$metadata
);
$call->start($argument, $metadata, $options);
return $call;
};
}
/**
* Create a function which can be used to create ServerStreamingCall
*
* @param Channel|InterceptorChannel $channel
* @param callable $deserialize A function that deserializes the response
*
* @return \Closure
*/
private function _GrpcStreamUnary($channel, $deserialize)
{
return function ($method,
array $metadata = [],
array $options = []) use ($channel, $deserialize) {
$call = new ClientStreamingCall(
$channel,
$method,
$deserialize,
$options
);
$jwt_aud_uri = $this->_get_jwt_aud_uri($method);
if (is_callable($this->update_metadata)) {
$metadata = call_user_func(
$this->update_metadata,
$metadata,
$jwt_aud_uri
);
}
$metadata = $this->_validate_and_normalize_metadata(
$metadata
);
$call->start($metadata);
return $call;
};
}
/**
* Create a function which can be used to create ClientStreamingCall
*
* @param Channel|InterceptorChannel $channel
* @param callable $deserialize A function that deserializes the response
*
* @return \Closure
*/
private function _GrpcUnaryStream($channel, $deserialize)
{
return function ($method,
$argument,
array $metadata = [],
array $options = []) use ($channel, $deserialize) {
$call = new ServerStreamingCall(
$channel,
$method,
$deserialize,
$options
);
$jwt_aud_uri = $this->_get_jwt_aud_uri($method);
if (is_callable($this->update_metadata)) {
$metadata = call_user_func(
$this->update_metadata,
$metadata,
$jwt_aud_uri
);
}
$metadata = $this->_validate_and_normalize_metadata(
$metadata
);
$call->start($argument, $metadata, $options);
return $call;
};
}
/**
* Create a function which can be used to create BidiStreamingCall
*
* @param Channel|InterceptorChannel $channel
* @param callable $deserialize A function that deserializes the response
*
* @return \Closure
*/
private function _GrpcStreamStream($channel, $deserialize)
{
return function ($method,
array $metadata = [],
array $options = []) use ($channel ,$deserialize) {
$call = new BidiStreamingCall(
$channel,
$method,
$deserialize,
$options
);
$jwt_aud_uri = $this->_get_jwt_aud_uri($method);
if (is_callable($this->update_metadata)) {
$metadata = call_user_func(
$this->update_metadata,
$metadata,
$jwt_aud_uri
);
}
$metadata = $this->_validate_and_normalize_metadata(
$metadata
);
$call->start($metadata);
return $call;
};
}
/**
* Create a function which can be used to create UnaryCall
*
* @param Channel|InterceptorChannel $channel
* @param callable $deserialize A function that deserializes the response
*
* @return \Closure
*/
private function _UnaryUnaryCallFactory($channel, $deserialize)
{
if (is_a($channel, 'Grpc\InterceptorChannel')) {
return function ($method,
$argument,
array $metadata = [],
array $options = []) use ($channel, $deserialize) {
return $channel->getInterceptor()->interceptUnaryUnary(
$method,
$argument,
$metadata,
$options,
$this->_UnaryUnaryCallFactory($channel->getNext(), $deserialize)
);
};
}
return $this->_GrpcUnaryUnary($channel, $deserialize);
}
/**
* Create a function which can be used to create ServerStreamingCall
*
* @param Channel|InterceptorChannel $channel
* @param callable $deserialize A function that deserializes the response
*
* @return \Closure
*/
private function _UnaryStreamCallFactory($channel, $deserialize)
{
if (is_a($channel, 'Grpc\InterceptorChannel')) {
return function ($method,
$argument,
array $metadata = [],
array $options = []) use ($channel, $deserialize) {
return $channel->getInterceptor()->interceptUnaryStream(
$method,
$argument,
$metadata,
$options,
$this->_UnaryStreamCallFactory($channel->getNext(), $deserialize)
);
};
}
return $this->_GrpcUnaryStream($channel, $deserialize);
}
/**
* Create a function which can be used to create ClientStreamingCall
*
* @param Channel|InterceptorChannel $channel
* @param callable $deserialize A function that deserializes the response
*
* @return \Closure
*/
private function _StreamUnaryCallFactory($channel, $deserialize)
{
if (is_a($channel, 'Grpc\InterceptorChannel')) {
return function ($method,
array $metadata = [],
array $options = []) use ($channel, $deserialize) {
return $channel->getInterceptor()->interceptStreamUnary(
$method,
$metadata,
$options,
$this->_StreamUnaryCallFactory($channel->getNext(), $deserialize)
);
};
}
return $this->_GrpcStreamUnary($channel, $deserialize);
}
/**
* Create a function which can be used to create BidiStreamingCall
*
* @param Channel|InterceptorChannel $channel
* @param callable $deserialize A function that deserializes the response
*
* @return \Closure
*/
private function _StreamStreamCallFactory($channel, $deserialize)
{
if (is_a($channel, 'Grpc\InterceptorChannel')) {
return function ($method,
array $metadata = [],
array $options = []) use ($channel, $deserialize) {
return $channel->getInterceptor()->interceptStreamStream(
$method,
$metadata,
$options,
$this->_StreamStreamCallFactory($channel->getNext(), $deserialize)
);
};
}
return $this->_GrpcStreamStream($channel, $deserialize);
}
/* This class is intended to be subclassed by generated code, so
* all functions begin with "_" to avoid name collisions. */
/**
* Call a remote method that takes a single argument and has a
* single output.
@ -221,26 +474,15 @@ class BaseStub
*
* @return UnaryCall The active call object
*/
protected function _simpleRequest($method,
$argument,
$deserialize,
array $metadata = [],
array $options = [])
{
$call = new UnaryCall($this->channel,
$method,
$deserialize,
$options);
$jwt_aud_uri = $this->_get_jwt_aud_uri($method);
if (is_callable($this->update_metadata)) {
$metadata = call_user_func($this->update_metadata,
$metadata,
$jwt_aud_uri);
}
$metadata = $this->_validate_and_normalize_metadata(
$metadata);
$call->start($argument, $metadata, $options);
protected function _simpleRequest(
$method,
$argument,
$deserialize,
array $metadata = [],
array $options = []
) {
$call_factory = $this->_UnaryUnaryCallFactory($this->channel, $deserialize);
$call = $call_factory($method, $argument, $metadata, $options);
return $call;
}
@ -256,25 +498,14 @@ class BaseStub
*
* @return ClientStreamingCall The active call object
*/
protected function _clientStreamRequest($method,
$deserialize,
array $metadata = [],
array $options = [])
{
$call = new ClientStreamingCall($this->channel,
$method,
$deserialize,
$options);
$jwt_aud_uri = $this->_get_jwt_aud_uri($method);
if (is_callable($this->update_metadata)) {
$metadata = call_user_func($this->update_metadata,
$metadata,
$jwt_aud_uri);
}
$metadata = $this->_validate_and_normalize_metadata(
$metadata);
$call->start($metadata);
protected function _clientStreamRequest(
$method,
$deserialize,
array $metadata = [],
array $options = []
) {
$call_factory = $this->_StreamUnaryCallFactory($this->channel, $deserialize);
$call = $call_factory($method, $metadata, $options);
return $call;
}
@ -291,26 +522,15 @@ class BaseStub
*
* @return ServerStreamingCall The active call object
*/
protected function _serverStreamRequest($method,
$argument,
$deserialize,
array $metadata = [],
array $options = [])
{
$call = new ServerStreamingCall($this->channel,
$method,
$deserialize,
$options);
$jwt_aud_uri = $this->_get_jwt_aud_uri($method);
if (is_callable($this->update_metadata)) {
$metadata = call_user_func($this->update_metadata,
$metadata,
$jwt_aud_uri);
}
$metadata = $this->_validate_and_normalize_metadata(
$metadata);
$call->start($argument, $metadata, $options);
protected function _serverStreamRequest(
$method,
$argument,
$deserialize,
array $metadata = [],
array $options = []
) {
$call_factory = $this->_UnaryStreamCallFactory($this->channel, $deserialize);
$call = $call_factory($method, $argument, $metadata, $options);
return $call;
}
@ -325,25 +545,14 @@ class BaseStub
*
* @return BidiStreamingCall The active call object
*/
protected function _bidiRequest($method,
$deserialize,
array $metadata = [],
array $options = [])
{
$call = new BidiStreamingCall($this->channel,
$method,
$deserialize,
$options);
$jwt_aud_uri = $this->_get_jwt_aud_uri($method);
if (is_callable($this->update_metadata)) {
$metadata = call_user_func($this->update_metadata,
$metadata,
$jwt_aud_uri);
}
$metadata = $this->_validate_and_normalize_metadata(
$metadata);
$call->start($metadata);
protected function _bidiRequest(
$method,
$deserialize,
array $metadata = [],
array $options = []
) {
$call_factory = $this->_StreamStreamCallFactory($this->channel, $deserialize);
$call = $call_factory($method, $metadata, $options);
return $call;
}
}

@ -0,0 +1,86 @@
<?php
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
namespace Grpc;
/**
* Represents an interceptor that intercept RPC invocations before call starts.
* This is an EXPERIMENTAL API.
*/
class Interceptor
{
public function interceptUnaryUnary(
$method,
$argument,
array $metadata = [],
array $options = [],
$continuation
) {
return $continuation($method, $argument, $metadata, $options);
}
public function interceptStreamUnary(
$method,
array $metadata = [],
array $options = [],
$continuation
) {
return $continuation($method, $metadata, $options);
}
public function interceptUnaryStream(
$method,
$argument,
array $metadata = [],
array $options = [],
$continuation
) {
return $continuation($method, $argument, $metadata, $options);
}
public function interceptStreamStream(
$method,
array $metadata = [],
array $options = [],
$continuation
) {
return $continuation($method, $metadata, $options);
}
/**
* Intercept the methods with Channel
*
* @param Channel|InterceptorChannel $channel An already created Channel or InterceptorChannel object (optional)
* @param Interceptor|Interceptor[] $interceptors interceptors to be added
*
* @return InterceptorChannel
*/
public static function intercept($channel, $interceptors)
{
if (is_array($interceptors)) {
for ($i = count($interceptors) - 1; $i >= 0; $i--) {
$channel = new InterceptorChannel($channel, $interceptors[$i]);
}
} else {
$channel = new InterceptorChannel($channel, $interceptors);
}
return $channel;
}
}

@ -0,0 +1,76 @@
<?php
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
namespace Grpc;
/**
* This is a PRIVATE API and can change without notice.
*/
class InterceptorChannel
{
private $next = null;
private $interceptor;
/**
* @param Channel|InterceptorChannel $channel An already created Channel
* or InterceptorChannel object (optional)
* @param Interceptor $interceptor
*/
public function __construct($channel, $interceptor)
{
if (!is_a($channel, 'Grpc\Channel') &&
!is_a($channel, 'Grpc\InterceptorChannel')) {
throw new \Exception('The channel argument is not a Channel object '.
'or an InterceptorChannel object created by '.
'Interceptor::intercept($channel, Interceptor|Interceptor[] $interceptors)');
}
$this->interceptor = $interceptor;
$this->next = $channel;
}
public function getNext()
{
return $this->next;
}
public function getInterceptor()
{
return $this->interceptor;
}
public function getTarget()
{
return $this->getNext()->getTarget();
}
public function watchConnectivityState($new_state, $deadline)
{
return $this->getNext()->watchConnectivityState($new_state, $deadline);
}
public function getConnectivityState($try_to_connect = false)
{
return $this->getNext()->getConnectivityState($try_to_connect);
}
public function close()
{
return $this->getNext()->close();
}
}

@ -0,0 +1,427 @@
<?php
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
/**
* Interface exported by the server.
*/
require_once(dirname(__FILE__).'/../../lib/Grpc/BaseStub.php');
require_once(dirname(__FILE__).'/../../lib/Grpc/AbstractCall.php');
require_once(dirname(__FILE__).'/../../lib/Grpc/UnaryCall.php');
require_once(dirname(__FILE__).'/../../lib/Grpc/ClientStreamingCall.php');
require_once(dirname(__FILE__).'/../../lib/Grpc/Interceptor.php');
require_once(dirname(__FILE__).'/../../lib/Grpc/Internal/InterceptorChannel.php');
class SimpleRequest
{
private $data;
public function __construct($data)
{
$this->data = $data;
}
public function setData($data)
{
$this->data = $data;
}
public function serializeToString()
{
return $this->data;
}
}
class InterceptorClient extends Grpc\BaseStub
{
/**
* @param string $hostname hostname
* @param array $opts channel options
* @param Channel|InterceptorChannel $channel (optional) re-use channel object
*/
public function __construct($hostname, $opts, $channel = null)
{
parent::__construct($hostname, $opts, $channel);
}
/**
* A simple RPC.
* @param \Routeguide\Point $argument input argument
* @param array $metadata metadata
* @param array $options call options
*/
public function UnaryCall(
SimpleRequest $argument,
$metadata = [],
$options = []
) {
return $this->_simpleRequest(
'/dummy_method',
$argument,
[],
$metadata,
$options
);
}
/**
* A client-to-server streaming RPC.
* @param array $metadata metadata
* @param array $options call options
*/
public function StreamCall(
$metadata = [],
$options = []
) {
return $this->_clientStreamRequest('/dummy_method', [], $metadata, $options);
}
}
class ChangeMetadataInterceptor extends Grpc\Interceptor
{
public function interceptUnaryUnary($method,
$argument,
array $metadata = [],
array $options = [],
$continuation)
{
$metadata["foo"] = array('interceptor_from_unary_request');
return $continuation($method, $argument, $metadata, $options);
}
public function interceptStreamUnary($method, array $metadata = [], array $options = [], $continuation)
{
$metadata["foo"] = array('interceptor_from_stream_request');
return $continuation($method, $metadata, $options);
}
}
class ChangeMetadataInterceptor2 extends Grpc\Interceptor
{
public function interceptUnaryUnary($method,
$argument,
array $metadata = [],
array $options = [],
$continuation)
{
if (array_key_exists('foo', $metadata)) {
$metadata['bar'] = array('ChangeMetadataInterceptor should be executed first');
} else {
$metadata["bar"] = array('interceptor_from_unary_request');
}
return $continuation($method, $argument, $metadata, $options);
}
public function interceptStreamUnary($method,
array $metadata = [],
array $options = [],
$continuation)
{
if (array_key_exists('foo', $metadata)) {
$metadata['bar'] = array('ChangeMetadataInterceptor should be executed first');
} else {
$metadata["bar"] = array('interceptor_from_stream_request');
}
return $continuation($method, $metadata, $options);
}
}
class ChangeRequestCall
{
private $call;
public function __construct($call)
{
$this->call = $call;
}
public function getCall()
{
return $this->call;
}
public function write($request)
{
$request->setData('intercepted_stream_request');
$this->getCall()->write($request);
}
public function wait()
{
return $this->getCall()->wait();
}
}
class ChangeRequestInterceptor extends Grpc\Interceptor
{
public function interceptUnaryUnary($method,
$argument,
array $metadata = [],
array $options = [],
$continuation)
{
$argument->setData('intercepted_unary_request');
return $continuation($method, $argument, $metadata, $options);
}
public function interceptStreamUnary($method, array $metadata = [], array $options = [], $continuation)
{
return new ChangeRequestCall(
$continuation($method, $metadata, $options)
);
}
}
class StopCallInterceptor extends Grpc\Interceptor
{
public function interceptUnaryUnary($method,
$argument,
array $metadata = [],
array $options = [],
$continuation)
{
$metadata["foo"] = array('interceptor_from_request_response');
}
public function interceptStreamUnary($method,
array $metadata = [],
array $options = [],
$continuation)
{
$metadata["foo"] = array('interceptor_from_request_response');
}
}
class InterceptorTest extends PHPUnit_Framework_TestCase
{
public function setUp()
{
$this->server = new Grpc\Server([]);
$this->port = $this->server->addHttp2Port('0.0.0.0:0');
$this->channel = new Grpc\Channel('localhost:'.$this->port, ['credentials' => Grpc\ChannelCredentials::createInsecure()]);
$this->server->start();
}
public function tearDown()
{
$this->channel->close();
}
public function testClientChangeMetadataOneInterceptor()
{
$req_text = 'client_request';
$channel_matadata_interceptor = new ChangeMetadataInterceptor();
$intercept_channel = Grpc\Interceptor::intercept($this->channel, $channel_matadata_interceptor);
echo "create Client\n";
$client = new InterceptorClient('localhost:'.$this->port, [
'credentials' => Grpc\ChannelCredentials::createInsecure(),
], $intercept_channel);
echo "create Call\n";
$req = new SimpleRequest($req_text);
echo "Call created\n";
$unary_call = $client->UnaryCall($req);
echo "start call\n";
$event = $this->server->requestCall();
$this->assertSame('/dummy_method', $event->method);
$this->assertSame(['interceptor_from_unary_request'], $event->metadata['foo']);
$stream_call = $client->StreamCall();
$stream_call->write($req);
$event = $this->server->requestCall();
$this->assertSame('/dummy_method', $event->method);
$this->assertSame(['interceptor_from_stream_request'], $event->metadata['foo']);
unset($unary_call);
unset($stream_call);
unset($server_call);
}
public function testClientChangeMetadataTwoInterceptor()
{
$req_text = 'client_request';
$channel_matadata_interceptor = new ChangeMetadataInterceptor();
$channel_matadata_intercepto2 = new ChangeMetadataInterceptor2();
// test intercept separately.
$intercept_channel1 = Grpc\Interceptor::intercept($this->channel, $channel_matadata_interceptor);
$intercept_channel2 = Grpc\Interceptor::intercept($intercept_channel1, $channel_matadata_intercepto2);
$client = new InterceptorClient('localhost:'.$this->port, [
'credentials' => Grpc\ChannelCredentials::createInsecure(),
], $intercept_channel2);
$req = new SimpleRequest($req_text);
$unary_call = $client->UnaryCall($req);
$event = $this->server->requestCall();
$this->assertSame('/dummy_method', $event->method);
$this->assertSame(['interceptor_from_unary_request'], $event->metadata['foo']);
$this->assertSame(['interceptor_from_unary_request'], $event->metadata['bar']);
$stream_call = $client->StreamCall();
$stream_call->write($req);
$event = $this->server->requestCall();
$this->assertSame('/dummy_method', $event->method);
$this->assertSame(['interceptor_from_stream_request'], $event->metadata['foo']);
$this->assertSame(['interceptor_from_stream_request'], $event->metadata['bar']);
unset($unary_call);
unset($stream_call);
unset($server_call);
// test intercept by array.
$intercept_channel3 = Grpc\Interceptor::intercept($this->channel,
[$channel_matadata_intercepto2, $channel_matadata_interceptor]);
$client = new InterceptorClient('localhost:'.$this->port, [
'credentials' => Grpc\ChannelCredentials::createInsecure(),
], $intercept_channel3);
$req = new SimpleRequest($req_text);
$unary_call = $client->UnaryCall($req);
$event = $this->server->requestCall();
$this->assertSame('/dummy_method', $event->method);
$this->assertSame(['interceptor_from_unary_request'], $event->metadata['foo']);
$this->assertSame(['interceptor_from_unary_request'], $event->metadata['bar']);
$stream_call = $client->StreamCall();
$stream_call->write($req);
$event = $this->server->requestCall();
$this->assertSame('/dummy_method', $event->method);
$this->assertSame(['interceptor_from_stream_request'], $event->metadata['foo']);
$this->assertSame(['interceptor_from_stream_request'], $event->metadata['bar']);
unset($unary_call);
unset($stream_call);
unset($server_call);
}
public function testClientChangeRequestInterceptor()
{
$req_text = 'client_request';
$change_request_interceptor = new ChangeRequestInterceptor();
$intercept_channel = Grpc\Interceptor::intercept($this->channel,
$change_request_interceptor);
$client = new InterceptorClient('localhost:'.$this->port, [
'credentials' => Grpc\ChannelCredentials::createInsecure(),
], $intercept_channel);
$req = new SimpleRequest($req_text);
$unary_call = $client->UnaryCall($req);
$event = $this->server->requestCall();
$this->assertSame('/dummy_method', $event->method);
$server_call = $event->call;
$event = $server_call->startBatch([
Grpc\OP_SEND_INITIAL_METADATA => [],
Grpc\OP_SEND_STATUS_FROM_SERVER => [
'metadata' => [],
'code' => Grpc\STATUS_OK,
'details' => '',
],
Grpc\OP_RECV_MESSAGE => true,
Grpc\OP_RECV_CLOSE_ON_SERVER => true,
]);
$this->assertSame('intercepted_unary_request', $event->message);
$stream_call = $client->StreamCall();
$stream_call->write($req);
$event = $this->server->requestCall();
$this->assertSame('/dummy_method', $event->method);
$server_call = $event->call;
$event = $server_call->startBatch([
Grpc\OP_SEND_INITIAL_METADATA => [],
Grpc\OP_SEND_STATUS_FROM_SERVER => [
'metadata' => [],
'code' => Grpc\STATUS_OK,
'details' => '',
],
Grpc\OP_RECV_MESSAGE => true,
Grpc\OP_RECV_CLOSE_ON_SERVER => true,
]);
$this->assertSame('intercepted_stream_request', $event->message);
unset($unary_call);
unset($stream_call);
unset($server_call);
}
public function testClientChangeStopCallInterceptor()
{
$req_text = 'client_request';
$channel_request_interceptor = new StopCallInterceptor();
$intercept_channel = Grpc\Interceptor::intercept($this->channel,
$channel_request_interceptor);
$client = new InterceptorClient('localhost:'.$this->port, [
'credentials' => Grpc\ChannelCredentials::createInsecure(),
], $intercept_channel);
$req = new SimpleRequest($req_text);
$unary_call = $client->UnaryCall($req);
$this->assertNull($unary_call);
$stream_call = $client->StreamCall();
$this->assertNull($stream_call);
unset($unary_call);
unset($stream_call);
unset($server_call);
}
public function testGetInterceptorChannelConnectivityState()
{
$channel = new Grpc\Channel(
'localhost:0',
['credentials' => Grpc\ChannelCredentials::createInsecure()]
);
$interceptor_channel = Grpc\Interceptor::intercept($channel, new Grpc\Interceptor());
$state = $interceptor_channel->getConnectivityState();
$this->assertEquals(0, $state);
$channel->close();
}
public function testInterceptorChannelWatchConnectivityState()
{
$channel = new Grpc\Channel(
'localhost:0',
['credentials' => Grpc\ChannelCredentials::createInsecure()]
);
$interceptor_channel = Grpc\Interceptor::intercept($channel, new Grpc\Interceptor());
$now = Grpc\Timeval::now();
$deadline = $now->add(new Grpc\Timeval(100*1000));
$state = $interceptor_channel->watchConnectivityState(1, $deadline);
$this->assertTrue($state);
unset($time);
unset($deadline);
$channel->close();
}
public function testInterceptorChannelClose()
{
$channel = new Grpc\Channel(
'localhost:0',
['credentials' => Grpc\ChannelCredentials::createInsecure()]
);
$interceptor_channel = Grpc\Interceptor::intercept($channel, new Grpc\Interceptor());
$this->assertNotNull($interceptor_channel);
$channel->close();
}
public function testInterceptorChannelGetTarget()
{
$channel = new Grpc\Channel(
'localhost:8888',
['credentials' => Grpc\ChannelCredentials::createInsecure()]
);
$interceptor_channel = Grpc\Interceptor::intercept($channel, new Grpc\Interceptor());
$target = $interceptor_channel->getTarget();
$this->assertTrue(is_string($target));
$channel->close();
}
}
Loading…
Cancel
Save