Merge pull request #15749 from ZhouyihaiDing/call_invoker

PHP: add call invoker
pull/15895/head
Zhouyihai Ding 7 years ago committed by GitHub
commit 2f924c6c5b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 41
      src/php/lib/Grpc/BaseStub.php
  2. 33
      src/php/lib/Grpc/CallInvoker.php
  3. 47
      src/php/lib/Grpc/DefaultCallInvoker.php
  4. 227
      src/php/tests/unit_tests/CallInvokerTest.php
  5. 1
      src/php/tests/unit_tests/InterceptorTest.php

@ -28,6 +28,7 @@ class BaseStub
private $hostname;
private $hostname_override;
private $channel;
private $call_invoker;
// a callback function
private $update_metadata;
@ -58,6 +59,15 @@ class BaseStub
if (!empty($opts['grpc.ssl_target_name_override'])) {
$this->hostname_override = $opts['grpc.ssl_target_name_override'];
}
if (isset($opts['grpc_call_invoker'])) {
$this->call_invoker = $opts['grpc_call_invoker'];
unset($opts['grpc_call_invoker']);
$channel_opts = $this->updateOpts($opts);
// If the grpc_call_invoker is defined, use the channel created by the call invoker.
$this->channel = $this->call_invoker->createChannelFactory($hostname, $channel_opts);
return;
}
$this->call_invoker = new DefaultCallInvoker();
if ($channel) {
if (!is_a($channel, 'Grpc\Channel') &&
!is_a($channel, 'Grpc\Internal\InterceptorChannel')) {
@ -72,15 +82,7 @@ class BaseStub
$this->channel = static::getDefaultChannel($hostname, $opts);
}
/**
* Creates and returns the default Channel
*
* @param array $opts Channel constructor options
*
* @return Channel The channel
*/
public static function getDefaultChannel($hostname, array $opts)
{
private static function updateOpts($opts) {
$package_config = json_decode(
file_get_contents(dirname(__FILE__).'/../../composer.json'),
true
@ -97,6 +99,19 @@ class BaseStub
'required. Please see one of the '.
'ChannelCredentials::create methods');
}
return $opts;
}
/**
* Creates and returns the default Channel
*
* @param array $opts Channel constructor options
*
* @return Channel The channel
*/
public static function getDefaultChannel($hostname, array $opts)
{
$channel_opts = self::updateOpts($opts);
return new Channel($hostname, $opts);
}
@ -239,7 +254,7 @@ class BaseStub
$deserialize,
array $metadata = [],
array $options = []) use ($channel) {
$call = new UnaryCall(
$call = $this->call_invoker->UnaryCall(
$channel,
$method,
$deserialize,
@ -275,7 +290,7 @@ class BaseStub
$deserialize,
array $metadata = [],
array $options = []) use ($channel) {
$call = new ClientStreamingCall(
$call = $this->call_invoker->ClientStreamingCall(
$channel,
$method,
$deserialize,
@ -312,7 +327,7 @@ class BaseStub
$deserialize,
array $metadata = [],
array $options = []) use ($channel) {
$call = new ServerStreamingCall(
$call = $this->call_invoker->ServerStreamingCall(
$channel,
$method,
$deserialize,
@ -348,7 +363,7 @@ class BaseStub
$deserialize,
array $metadata = [],
array $options = []) use ($channel) {
$call = new BidiStreamingCall(
$call = $this->call_invoker->BidiStreamingCall(
$channel,
$method,
$deserialize,

@ -0,0 +1,33 @@
<?php
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
namespace Grpc;
/**
* CallInvoker is used to pass the self defined channel into the stub,
* while intercept each RPC with the channel accessible.
* THIS IS AN EXPERIMENTAL API.
*/
interface CallInvoker
{
public function createChannelFactory($hostname, $opts);
public function UnaryCall($channel, $method, $deserialize, $options);
public function ClientStreamingCall($channel, $method, $deserialize, $options);
public function ServerStreamingCall($channel, $method, $deserialize, $options);
public function BidiStreamingCall($channel, $method, $deserialize, $options);
}

@ -0,0 +1,47 @@
<?php
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
namespace Grpc;
/**
* Default call invoker in the gRPC stub.
* THIS IS AN EXPERIMENTAL API.
*/
class DefaultCallInvoker implements CallInvoker
{
public function createChannelFactory($hostname, $opts) {
return new Channel($hostname, $opts);
}
public function UnaryCall($channel, $method, $deserialize, $options) {
return new UnaryCall($channel, $method, $deserialize, $options);
}
public function ClientStreamingCall($channel, $method, $deserialize, $options) {
return new ClientStreamingCall($channel, $method, $deserialize, $options);
}
public function ServerStreamingCall($channel, $method, $deserialize, $options) {
return new ServerStreamingCall($channel, $method, $deserialize, $options);
}
public function BidiStreamingCall($channel, $method, $deserialize, $options) {
return new BidiStreamingCall($channel, $method, $deserialize, $options);
}
}

@ -0,0 +1,227 @@
<?php
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
/**
* Interface exported by the server.
*/
require_once(dirname(__FILE__).'/../../lib/Grpc/BaseStub.php');
require_once(dirname(__FILE__).'/../../lib/Grpc/AbstractCall.php');
require_once(dirname(__FILE__).'/../../lib/Grpc/UnaryCall.php');
require_once(dirname(__FILE__).'/../../lib/Grpc/ClientStreamingCall.php');
require_once(dirname(__FILE__).'/../../lib/Grpc/Interceptor.php');
require_once(dirname(__FILE__).'/../../lib/Grpc/CallInvoker.php');
require_once(dirname(__FILE__).'/../../lib/Grpc/DefaultCallInvoker.php');
require_once(dirname(__FILE__).'/../../lib/Grpc/Internal/InterceptorChannel.php');
class CallInvokerSimpleRequest
{
private $data;
public function __construct($data)
{
$this->data = $data;
}
public function setData($data)
{
$this->data = $data;
}
public function serializeToString()
{
return $this->data;
}
}
class CallInvokerClient extends Grpc\BaseStub
{
/**
* @param string $hostname hostname
* @param array $opts channel options
* @param Channel|InterceptorChannel $channel (optional) re-use channel object
*/
public function __construct($hostname, $opts, $channel = null)
{
parent::__construct($hostname, $opts, $channel);
}
/**
* A simple RPC.
* @param SimpleRequest $argument input argument
* @param array $metadata metadata
* @param array $options call options
*/
public function UnaryCall(
CallInvokerSimpleRequest $argument,
$metadata = [],
$options = []
) {
return $this->_simpleRequest(
'/dummy_method',
$argument,
[],
$metadata,
$options
);
}
}
class CallInvokerUpdateChannel implements \Grpc\CallInvoker
{
private $channel;
public function getChannel() {
return $this->channel;
}
public function createChannelFactory($hostname, $opts) {
$this->channel = new \Grpc\Channel('localhost:50050', $opts);
return $this->channel;
}
public function UnaryCall($channel, $method, $deserialize, $options) {
return new UnaryCall($channel, $method, $deserialize, $options);
}
public function ClientStreamingCall($channel, $method, $deserialize, $options) {
return new ClientStreamingCall($channel, $method, $deserialize, $options);
}
public function ServerStreamingCall($channel, $method, $deserialize, $options) {
return new ServerStreamingCall($channel, $method, $deserialize, $options);
}
public function BidiStreamingCall($channel, $method, $deserialize, $options) {
return new BidiStreamingCall($channel, $method, $deserialize, $options);
}
}
class CallInvokerChangeRequest implements \Grpc\CallInvoker
{
private $channel;
public function getChannel() {
return $this->channel;
}
public function createChannelFactory($hostname, $opts) {
$this->channel = new \Grpc\Channel($hostname, $opts);
return $this->channel;
}
public function UnaryCall($channel, $method, $deserialize, $options) {
return new CallInvokerChangeRequestCall($channel, $method, $deserialize, $options);
}
public function ClientStreamingCall($channel, $method, $deserialize, $options) {
return new ClientStreamingCall($channel, $method, $deserialize, $options);
}
public function ServerStreamingCall($channel, $method, $deserialize, $options) {
return new ServerStreamingCall($channel, $method, $deserialize, $options);
}
public function BidiStreamingCall($channel, $method, $deserialize, $options) {
return new BidiStreamingCall($channel, $method, $deserialize, $options);
}
}
class CallInvokerChangeRequestCall
{
private $call;
public function __construct($channel, $method, $deserialize, $options)
{
$this->call = new \Grpc\UnaryCall($channel, $method, $deserialize, $options);
}
public function start($argument, $metadata, $options) {
$argument->setData('intercepted_unary_request');
$this->call->start($argument, $metadata, $options);
}
public function wait()
{
return $this->call->wait();
}
}
class CallInvokerTest extends PHPUnit_Framework_TestCase
{
public function setUp()
{
$this->server = new Grpc\Server([]);
$this->port = $this->server->addHttp2Port('0.0.0.0:0');
$this->server->start();
}
public function tearDown()
{
unset($this->server);
}
public function testCreateDefaultCallInvoker()
{
$call_invoker = new \Grpc\DefaultCallInvoker();
}
public function testCreateCallInvoker()
{
$call_invoker = new CallInvokerUpdateChannel();
}
public function testCallInvokerAccessChannel()
{
$call_invoker = new CallInvokerUpdateChannel();
$stub = new \Grpc\BaseStub('localhost:50051',
['credentials' => \Grpc\ChannelCredentials::createInsecure(),
'grpc_call_invoker' => $call_invoker]);
$this->assertEquals($call_invoker->getChannel()->getTarget(), 'localhost:50050');
$call_invoker->getChannel()->close();
}
public function testClientChangeRequestCallInvoker()
{
$req_text = 'client_request';
$call_invoker = new CallInvokerChangeRequest();
$client = new CallInvokerClient('localhost:'.$this->port, [
'force_new' => true,
'credentials' => Grpc\ChannelCredentials::createInsecure(),
'grpc_call_invoker' => $call_invoker,
]);
$req = new CallInvokerSimpleRequest($req_text);
$unary_call = $client->UnaryCall($req);
$event = $this->server->requestCall();
$this->assertSame('/dummy_method', $event->method);
$server_call = $event->call;
$event = $server_call->startBatch([
Grpc\OP_SEND_INITIAL_METADATA => [],
Grpc\OP_SEND_STATUS_FROM_SERVER => [
'metadata' => [],
'code' => Grpc\STATUS_OK,
'details' => '',
],
Grpc\OP_RECV_MESSAGE => true,
Grpc\OP_RECV_CLOSE_ON_SERVER => true,
]);
$this->assertSame('intercepted_unary_request', $event->message);
$call_invoker->getChannel()->close();
unset($unary_call);
unset($server_call);
}
}

@ -24,6 +24,7 @@ require_once(dirname(__FILE__).'/../../lib/Grpc/AbstractCall.php');
require_once(dirname(__FILE__).'/../../lib/Grpc/UnaryCall.php');
require_once(dirname(__FILE__).'/../../lib/Grpc/ClientStreamingCall.php');
require_once(dirname(__FILE__).'/../../lib/Grpc/Interceptor.php');
require_once(dirname(__FILE__).'/../../lib/Grpc/CallInvoker.php');
require_once(dirname(__FILE__).'/../../lib/Grpc/Internal/InterceptorChannel.php');
class SimpleRequest

Loading…
Cancel
Save