diff --git a/src/php/lib/Grpc/BaseStub.php b/src/php/lib/Grpc/BaseStub.php index 5ae6931b992..ecb419ac8f8 100644 --- a/src/php/lib/Grpc/BaseStub.php +++ b/src/php/lib/Grpc/BaseStub.php @@ -28,6 +28,7 @@ class BaseStub private $hostname; private $hostname_override; private $channel; + private $call_invoker; // a callback function private $update_metadata; @@ -58,6 +59,15 @@ class BaseStub if (!empty($opts['grpc.ssl_target_name_override'])) { $this->hostname_override = $opts['grpc.ssl_target_name_override']; } + if (isset($opts['grpc_call_invoker'])) { + $this->call_invoker = $opts['grpc_call_invoker']; + unset($opts['grpc_call_invoker']); + $channel_opts = $this->updateOpts($opts); + // If the grpc_call_invoker is defined, use the channel created by the call invoker. + $this->channel = $this->call_invoker->createChannelFactory($hostname, $channel_opts); + return; + } + $this->call_invoker = new DefaultCallInvoker(); if ($channel) { if (!is_a($channel, 'Grpc\Channel') && !is_a($channel, 'Grpc\Internal\InterceptorChannel')) { @@ -72,15 +82,7 @@ class BaseStub $this->channel = static::getDefaultChannel($hostname, $opts); } - /** - * Creates and returns the default Channel - * - * @param array $opts Channel constructor options - * - * @return Channel The channel - */ - public static function getDefaultChannel($hostname, array $opts) - { + private static function updateOpts($opts) { $package_config = json_decode( file_get_contents(dirname(__FILE__).'/../../composer.json'), true @@ -97,6 +99,19 @@ class BaseStub 'required. Please see one of the '. 'ChannelCredentials::create methods'); } + return $opts; + } + + /** + * Creates and returns the default Channel + * + * @param array $opts Channel constructor options + * + * @return Channel The channel + */ + public static function getDefaultChannel($hostname, array $opts) + { + $channel_opts = self::updateOpts($opts); return new Channel($hostname, $opts); } @@ -239,7 +254,7 @@ class BaseStub $deserialize, array $metadata = [], array $options = []) use ($channel) { - $call = new UnaryCall( + $call = $this->call_invoker->UnaryCall( $channel, $method, $deserialize, @@ -275,7 +290,7 @@ class BaseStub $deserialize, array $metadata = [], array $options = []) use ($channel) { - $call = new ClientStreamingCall( + $call = $this->call_invoker->ClientStreamingCall( $channel, $method, $deserialize, @@ -312,7 +327,7 @@ class BaseStub $deserialize, array $metadata = [], array $options = []) use ($channel) { - $call = new ServerStreamingCall( + $call = $this->call_invoker->ServerStreamingCall( $channel, $method, $deserialize, @@ -348,7 +363,7 @@ class BaseStub $deserialize, array $metadata = [], array $options = []) use ($channel) { - $call = new BidiStreamingCall( + $call = $this->call_invoker->BidiStreamingCall( $channel, $method, $deserialize, diff --git a/src/php/lib/Grpc/CallInvoker.php b/src/php/lib/Grpc/CallInvoker.php new file mode 100644 index 00000000000..a1b45532120 --- /dev/null +++ b/src/php/lib/Grpc/CallInvoker.php @@ -0,0 +1,33 @@ +data = $data; + } + public function setData($data) + { + $this->data = $data; + } + public function serializeToString() + { + return $this->data; + } +} + +class CallInvokerClient extends Grpc\BaseStub +{ + + /** + * @param string $hostname hostname + * @param array $opts channel options + * @param Channel|InterceptorChannel $channel (optional) re-use channel object + */ + public function __construct($hostname, $opts, $channel = null) + { + parent::__construct($hostname, $opts, $channel); + } + + /** + * A simple RPC. + * @param SimpleRequest $argument input argument + * @param array $metadata metadata + * @param array $options call options + */ + public function UnaryCall( + CallInvokerSimpleRequest $argument, + $metadata = [], + $options = [] + ) { + return $this->_simpleRequest( + '/dummy_method', + $argument, + [], + $metadata, + $options + ); + } +} + +class CallInvokerUpdateChannel implements \Grpc\CallInvoker +{ + private $channel; + + public function getChannel() { + return $this->channel; + } + + public function createChannelFactory($hostname, $opts) { + $this->channel = new \Grpc\Channel('localhost:50050', $opts); + return $this->channel; + } + + public function UnaryCall($channel, $method, $deserialize, $options) { + return new UnaryCall($channel, $method, $deserialize, $options); + } + + public function ClientStreamingCall($channel, $method, $deserialize, $options) { + return new ClientStreamingCall($channel, $method, $deserialize, $options); + } + + public function ServerStreamingCall($channel, $method, $deserialize, $options) { + return new ServerStreamingCall($channel, $method, $deserialize, $options); + } + + public function BidiStreamingCall($channel, $method, $deserialize, $options) { + return new BidiStreamingCall($channel, $method, $deserialize, $options); + } +} + + +class CallInvokerChangeRequest implements \Grpc\CallInvoker +{ + private $channel; + + public function getChannel() { + return $this->channel; + } + public function createChannelFactory($hostname, $opts) { + $this->channel = new \Grpc\Channel($hostname, $opts); + return $this->channel; + } + + public function UnaryCall($channel, $method, $deserialize, $options) { + return new CallInvokerChangeRequestCall($channel, $method, $deserialize, $options); + } + + public function ClientStreamingCall($channel, $method, $deserialize, $options) { + return new ClientStreamingCall($channel, $method, $deserialize, $options); + } + + public function ServerStreamingCall($channel, $method, $deserialize, $options) { + return new ServerStreamingCall($channel, $method, $deserialize, $options); + } + + public function BidiStreamingCall($channel, $method, $deserialize, $options) { + return new BidiStreamingCall($channel, $method, $deserialize, $options); + } +} + +class CallInvokerChangeRequestCall +{ + private $call; + + public function __construct($channel, $method, $deserialize, $options) + { + $this->call = new \Grpc\UnaryCall($channel, $method, $deserialize, $options); + } + + public function start($argument, $metadata, $options) { + $argument->setData('intercepted_unary_request'); + $this->call->start($argument, $metadata, $options); + } + + public function wait() + { + return $this->call->wait(); + } +} + +class CallInvokerTest extends PHPUnit_Framework_TestCase +{ + public function setUp() + { + $this->server = new Grpc\Server([]); + $this->port = $this->server->addHttp2Port('0.0.0.0:0'); + $this->server->start(); + } + + public function tearDown() + { + unset($this->server); + } + + public function testCreateDefaultCallInvoker() + { + $call_invoker = new \Grpc\DefaultCallInvoker(); + } + + public function testCreateCallInvoker() + { + $call_invoker = new CallInvokerUpdateChannel(); + } + + public function testCallInvokerAccessChannel() + { + $call_invoker = new CallInvokerUpdateChannel(); + $stub = new \Grpc\BaseStub('localhost:50051', + ['credentials' => \Grpc\ChannelCredentials::createInsecure(), + 'grpc_call_invoker' => $call_invoker]); + $this->assertEquals($call_invoker->getChannel()->getTarget(), 'localhost:50050'); + $call_invoker->getChannel()->close(); + } + + public function testClientChangeRequestCallInvoker() + { + $req_text = 'client_request'; + $call_invoker = new CallInvokerChangeRequest(); + $client = new CallInvokerClient('localhost:'.$this->port, [ + 'force_new' => true, + 'credentials' => Grpc\ChannelCredentials::createInsecure(), + 'grpc_call_invoker' => $call_invoker, + ]); + + $req = new CallInvokerSimpleRequest($req_text); + $unary_call = $client->UnaryCall($req); + + $event = $this->server->requestCall(); + $this->assertSame('/dummy_method', $event->method); + $server_call = $event->call; + $event = $server_call->startBatch([ + Grpc\OP_SEND_INITIAL_METADATA => [], + Grpc\OP_SEND_STATUS_FROM_SERVER => [ + 'metadata' => [], + 'code' => Grpc\STATUS_OK, + 'details' => '', + ], + Grpc\OP_RECV_MESSAGE => true, + Grpc\OP_RECV_CLOSE_ON_SERVER => true, + ]); + $this->assertSame('intercepted_unary_request', $event->message); + $call_invoker->getChannel()->close(); + unset($unary_call); + unset($server_call); + } +} diff --git a/src/php/tests/unit_tests/InterceptorTest.php b/src/php/tests/unit_tests/InterceptorTest.php index d759ceba6a8..0ad49fc2bd1 100644 --- a/src/php/tests/unit_tests/InterceptorTest.php +++ b/src/php/tests/unit_tests/InterceptorTest.php @@ -24,6 +24,7 @@ require_once(dirname(__FILE__).'/../../lib/Grpc/AbstractCall.php'); require_once(dirname(__FILE__).'/../../lib/Grpc/UnaryCall.php'); require_once(dirname(__FILE__).'/../../lib/Grpc/ClientStreamingCall.php'); require_once(dirname(__FILE__).'/../../lib/Grpc/Interceptor.php'); +require_once(dirname(__FILE__).'/../../lib/Grpc/CallInvoker.php'); require_once(dirname(__FILE__).'/../../lib/Grpc/Internal/InterceptorChannel.php'); class SimpleRequest