php: support per message compression disable

pull/3181/head
Stanley Cheung 9 years ago
parent dea2648a6a
commit 3ab8e79b01
  1. 27
      src/php/ext/grpc/call.c
  2. 18
      src/php/lib/Grpc/AbstractCall.php
  3. 13
      src/php/lib/Grpc/BaseStub.php
  4. 11
      src/php/lib/Grpc/BidiStreamingCall.php
  5. 22
      src/php/lib/Grpc/ClientStreamingCall.php
  6. 15
      src/php/lib/Grpc/ServerStreamingCall.php
  7. 13
      src/php/lib/Grpc/UnaryCall.php
  8. 14
      src/php/tests/generated_code/AbstractGeneratedCodeTest.php
  9. 7
      src/php/tests/interop/interop_client.php
  10. 49
      src/php/tests/unit_tests/EndToEndTest.php
  11. 51
      src/php/tests/unit_tests/SecureEndToEndTest.php

@ -265,6 +265,9 @@ PHP_METHOD(Call, startBatch) {
HashTable *array_hash;
HashPosition array_pointer;
HashTable *status_hash;
HashTable *message_hash;
zval **message_value;
zval **message_flags;
char *key;
uint key_len;
ulong index;
@ -319,13 +322,33 @@ PHP_METHOD(Call, startBatch) {
metadata.metadata;
break;
case GRPC_OP_SEND_MESSAGE:
if (Z_TYPE_PP(value) != IS_STRING) {
if (Z_TYPE_PP(value) != IS_ARRAY) {
zend_throw_exception(spl_ce_InvalidArgumentException,
"Expected an array for send message",
1 TSRMLS_CC);
goto cleanup;
}
message_hash = Z_ARRVAL_PP(value);
if (zend_hash_find(message_hash, "flags", sizeof("flags"),
(void **)&message_flags) == SUCCESS) {
if (Z_TYPE_PP(message_flags) != IS_LONG) {
zend_throw_exception(spl_ce_InvalidArgumentException,
"Expected an int for message flags",
1 TSRMLS_CC);
}
ops[op_num].flags = Z_LVAL_PP(message_flags) & GRPC_WRITE_USED_MASK;
}
if (zend_hash_find(message_hash, "message", sizeof("message"),
(void **)&message_value) != SUCCESS ||
Z_TYPE_PP(message_value) != IS_STRING) {
zend_throw_exception(spl_ce_InvalidArgumentException,
"Expected a string for send message",
1 TSRMLS_CC);
goto cleanup;
}
ops[op_num].data.send_message =
string_to_byte_buffer(Z_STRVAL_PP(value), Z_STRLEN_PP(value));
string_to_byte_buffer(Z_STRVAL_PP(message_value),
Z_STRLEN_PP(message_value));
break;
case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
break;

@ -92,4 +92,20 @@ abstract class AbstractCall {
}
return call_user_func($this->deserialize, $value);
}
}
/**
* Get the list of Grpc Write Flags
* @param array $options an array of options
* @return The list of Grpc Write Flags contained in the input
*/
protected static function getGrpcWriteFlags($options) {
$grpc_write_flags = [];
foreach ([WRITE_BUFFER_HINT,
WRITE_NO_COMPRESS] as $flag) {
if (in_array($flag, $options)) {
$grpc_write_flags[] = $flag;
}
}
return $grpc_write_flags;
}
}

@ -168,7 +168,8 @@ class BaseStub {
public function _simpleRequest($method,
$argument,
callable $deserialize,
$metadata = array()) {
$metadata = array(),
$options = array()) {
list($actual_metadata, $timeout) = $this->_extract_timeout_from_metadata($metadata);
$call = new UnaryCall($this->channel, $method, $deserialize, $timeout);
$jwt_aud_uri = $this->_get_jwt_aud_uri($method);
@ -177,7 +178,7 @@ class BaseStub {
$actual_metadata,
$jwt_aud_uri);
}
$call->start($argument, $actual_metadata);
$call->start($argument, $actual_metadata, $options);
return $call;
}
@ -193,7 +194,6 @@ class BaseStub {
* @return ClientStreamingSurfaceActiveCall The active call object
*/
public function _clientStreamRequest($method,
$arguments,
callable $deserialize,
$metadata = array()) {
list($actual_metadata, $timeout) = $this->_extract_timeout_from_metadata($metadata);
@ -204,7 +204,7 @@ class BaseStub {
$actual_metadata,
$jwt_aud_uri);
}
$call->start($arguments, $actual_metadata);
$call->start($actual_metadata);
return $call;
}
@ -221,7 +221,8 @@ class BaseStub {
public function _serverStreamRequest($method,
$argument,
callable $deserialize,
$metadata = array()) {
$metadata = array(),
$options = array()) {
list($actual_metadata, $timeout) = $this->_extract_timeout_from_metadata($metadata);
$call = new ServerStreamingCall($this->channel, $method, $deserialize, $timeout);
$jwt_aud_uri = $this->_get_jwt_aud_uri($method);
@ -230,7 +231,7 @@ class BaseStub {
$actual_metadata,
$jwt_aud_uri);
}
$call->start($argument, $actual_metadata);
$call->start($argument, $actual_metadata, $options);
return $call;
}

@ -66,9 +66,14 @@ class BidiStreamingCall extends AbstractCall {
* Write a single message to the server. This cannot be called after
* writesDone is called.
* @param ByteBuffer $data The data to write
* @param array $options an array of options
*/
public function write($data) {
$this->call->startBatch([OP_SEND_MESSAGE => $data->serialize()]);
public function write($data, $options = array()) {
$message_array = ['message' => $data->serialize()];
if ($grpc_write_flags = self::getGrpcWriteFlags($options)) {
$message_array['flags'] = $grpc_write_flags;
}
$this->call->startBatch([OP_SEND_MESSAGE => $message_array]);
}
/**
@ -86,7 +91,7 @@ class BidiStreamingCall extends AbstractCall {
public function getStatus() {
$status_event = $this->call->startBatch([
OP_RECV_STATUS_ON_CLIENT => true
]);
]);
return $status_event->status;
}
}

@ -40,15 +40,24 @@ namespace Grpc;
class ClientStreamingCall extends AbstractCall {
/**
* Start the call.
* @param Traversable $arg_iter The iterator of arguments to send
* @param array $metadata Metadata to send with the call, if applicable
*/
public function start($arg_iter, $metadata = array()) {
$event = $this->call->startBatch([OP_SEND_INITIAL_METADATA => $metadata]);
foreach($arg_iter as $arg) {
$this->call->startBatch([OP_SEND_MESSAGE => $arg->serialize()]);
public function start($metadata) {
$this->call->startBatch([OP_SEND_INITIAL_METADATA => $metadata]);
}
/**
* Write a single message to the server. This cannot be called after
* wait is called.
* @param ByteBuffer $data The data to write
* @param array $options an array of options
*/
public function write($data, $options = array()) {
$message_array = ['message' => $data->serialize()];
if ($grpc_write_flags = self::getGrpcWriteFlags($options)) {
$message_array['flags'] = $grpc_write_flags;
}
$this->call->startBatch([OP_SEND_CLOSE_FROM_CLIENT => true]);
$this->call->startBatch([OP_SEND_MESSAGE => $message_array]);
}
/**
@ -57,6 +66,7 @@ class ClientStreamingCall extends AbstractCall {
*/
public function wait() {
$event = $this->call->startBatch([
OP_SEND_CLOSE_FROM_CLIENT => true,
OP_RECV_INITIAL_METADATA => true,
OP_RECV_MESSAGE => true,
OP_RECV_STATUS_ON_CLIENT => true]);

@ -40,14 +40,19 @@ namespace Grpc;
class ServerStreamingCall extends AbstractCall {
/**
* Start the call
* @param $arg The argument to send
* @param $data The data to send
* @param array $metadata Metadata to send with the call, if applicable
* @param array $options an array of options
*/
public function start($arg, $metadata = array()) {
public function start($data, $metadata = array(), $options = array()) {
$message_array = ['message' => $data->serialize()];
if ($grpc_write_flags = self::getGrpcWriteFlags($options)) {
$message_array['flags'] = $grpc_write_flags;
}
$event = $this->call->startBatch([
OP_SEND_INITIAL_METADATA => $metadata,
OP_RECV_INITIAL_METADATA => true,
OP_SEND_MESSAGE => $arg->serialize(),
OP_SEND_MESSAGE => $message_array,
OP_SEND_CLOSE_FROM_CLIENT => true]);
$this->metadata = $event->metadata;
}
@ -71,7 +76,7 @@ class ServerStreamingCall extends AbstractCall {
public function getStatus() {
$status_event = $this->call->startBatch([
OP_RECV_STATUS_ON_CLIENT => true
]);
]);
return $status_event->status;
}
}
}

@ -40,14 +40,19 @@ namespace Grpc;
class UnaryCall extends AbstractCall {
/**
* Start the call
* @param $arg The argument to send
* @param $data The data to send
* @param array $metadata Metadata to send with the call, if applicable
* @param array $options an array of options
*/
public function start($arg, $metadata = array()) {
public function start($data, $metadata = array(), $options = array()) {
$message_array = ['message' => $data->serialize()];
if ($grpc_write_flags = self::getGrpcWriteFlags($options)) {
$message_array['flags'] = $grpc_write_flags;
}
$event = $this->call->startBatch([
OP_SEND_INITIAL_METADATA => $metadata,
OP_RECV_INITIAL_METADATA => true,
OP_SEND_MESSAGE => $arg->serialize(),
OP_SEND_MESSAGE => $message_array,
OP_SEND_CLOSE_FROM_CLIENT => true]);
$this->metadata = $event->metadata;
}
@ -62,4 +67,4 @@ class UnaryCall extends AbstractCall {
OP_RECV_STATUS_ON_CLIENT => true]);
return array($this->deserializeResponse($event->message), $event->status);
}
}
}

@ -79,15 +79,13 @@ abstract class AbstractGeneratedCodeTest extends PHPUnit_Framework_TestCase {
}
public function testClientStreaming() {
$num_iter = function() {
for ($i = 0; $i < 7; $i++) {
$num = new math\Num();
$num->setNum($i);
yield $num;
}
};
$call = self::$client->Sum($num_iter());
$call = self::$client->Sum();
$this->assertTrue(is_string($call->getPeer()));
for ($i = 0; $i < 7; $i++) {
$num = new math\Num();
$num->setNum($i);
$call->write($num);
}
list($response, $status) = $call->wait();
$this->assertSame(21, $response->getNum());
$this->assertSame(\Grpc\STATUS_OK, $status->code);

@ -173,7 +173,11 @@ function clientStreaming($stub) {
return $request;
}, $request_lengths);
list($result, $status) = $stub->StreamingInputCall($requests)->wait();
$call = $stub->StreamingInputCall();
foreach ($requests as $request) {
$call->write($request);
}
list($result, $status) = $call->wait();
hardAssert($status->code === Grpc\STATUS_OK, 'Call did not complete successfully');
hardAssert($result->getAggregatedPayloadSize() === 74922,
'aggregated_payload_size was incorrect');
@ -374,5 +378,6 @@ switch ($args['test_case']) {
// messages are sent immediately after metadata is sent. There is
// currently no way to cancel before messages are sent.
default:
echo "Unsupported test case $args[test_case]\n";
exit(1);
}

@ -91,6 +91,51 @@ class EndToEndTest extends PHPUnit_Framework_TestCase{
unset($server_call);
}
public function testMessageWriteFlags() {
$deadline = Grpc\Timeval::infFuture();
$req_text = 'message_write_flags_test';
$status_text = 'xyz';
$call = new Grpc\Call($this->channel,
'dummy_method',
$deadline);
$event = $call->startBatch([
Grpc\OP_SEND_INITIAL_METADATA => [],
Grpc\OP_SEND_MESSAGE => ['message' => $req_text,
'flags' => Grpc\WRITE_NO_COMPRESS],
Grpc\OP_SEND_CLOSE_FROM_CLIENT => true
]);
$this->assertTrue($event->send_metadata);
$this->assertTrue($event->send_close);
$event = $this->server->requestCall();
$this->assertSame('dummy_method', $event->method);
$server_call = $event->call;
$event = $server_call->startBatch([
Grpc\OP_SEND_INITIAL_METADATA => [],
Grpc\OP_SEND_STATUS_FROM_SERVER => [
'metadata' => [],
'code' => Grpc\STATUS_OK,
'details' => $status_text
],
]);
$event = $call->startBatch([
Grpc\OP_RECV_INITIAL_METADATA => true,
Grpc\OP_RECV_STATUS_ON_CLIENT => true
]);
$status = $event->status;
$this->assertSame([], $status->metadata);
$this->assertSame(Grpc\STATUS_OK, $status->code);
$this->assertSame($status_text, $status->details);
unset($call);
unset($server_call);
}
public function testClientServerFullRequestResponse() {
$deadline = Grpc\Timeval::infFuture();
$req_text = 'client_server_full_request_response';
@ -104,7 +149,7 @@ class EndToEndTest extends PHPUnit_Framework_TestCase{
$event = $call->startBatch([
Grpc\OP_SEND_INITIAL_METADATA => [],
Grpc\OP_SEND_CLOSE_FROM_CLIENT => true,
Grpc\OP_SEND_MESSAGE => $req_text
Grpc\OP_SEND_MESSAGE => ['message' => $req_text],
]);
$this->assertTrue($event->send_metadata);
@ -117,7 +162,7 @@ class EndToEndTest extends PHPUnit_Framework_TestCase{
$event = $server_call->startBatch([
Grpc\OP_SEND_INITIAL_METADATA => [],
Grpc\OP_SEND_MESSAGE => $reply_text,
Grpc\OP_SEND_MESSAGE => ['message' => $reply_text],
Grpc\OP_SEND_STATUS_FROM_SERVER => [
'metadata' => [],
'code' => Grpc\STATUS_OK,

@ -107,6 +107,53 @@ class SecureEndToEndTest extends PHPUnit_Framework_TestCase{
unset($server_call);
}
public function testMessageWriteFlags() {
$deadline = Grpc\Timeval::infFuture();
$req_text = 'message_write_flags_test';
$status_text = 'xyz';
$call = new Grpc\Call($this->channel,
'dummy_method',
$deadline,
$this->host_override);
$event = $call->startBatch([
Grpc\OP_SEND_INITIAL_METADATA => [],
Grpc\OP_SEND_MESSAGE => ['message' => $req_text,
'flags' => Grpc\WRITE_NO_COMPRESS],
Grpc\OP_SEND_CLOSE_FROM_CLIENT => true
]);
$this->assertTrue($event->send_metadata);
$this->assertTrue($event->send_close);
$event = $this->server->requestCall();
$this->assertSame('dummy_method', $event->method);
$server_call = $event->call;
$event = $server_call->startBatch([
Grpc\OP_SEND_INITIAL_METADATA => [],
Grpc\OP_SEND_STATUS_FROM_SERVER => [
'metadata' => [],
'code' => Grpc\STATUS_OK,
'details' => $status_text
],
]);
$event = $call->startBatch([
Grpc\OP_RECV_INITIAL_METADATA => true,
Grpc\OP_RECV_STATUS_ON_CLIENT => true
]);
$this->assertSame([], $event->metadata);
$status = $event->status;
$this->assertSame([], $status->metadata);
$this->assertSame(Grpc\STATUS_OK, $status->code);
$this->assertSame($status_text, $status->details);
unset($call);
unset($server_call);
}
public function testClientServerFullRequestResponse() {
$deadline = Grpc\Timeval::infFuture();
$req_text = 'client_server_full_request_response';
@ -121,7 +168,7 @@ class SecureEndToEndTest extends PHPUnit_Framework_TestCase{
$event = $call->startBatch([
Grpc\OP_SEND_INITIAL_METADATA => [],
Grpc\OP_SEND_CLOSE_FROM_CLIENT => true,
Grpc\OP_SEND_MESSAGE => $req_text
Grpc\OP_SEND_MESSAGE => ['message' => $req_text]
]);
$this->assertTrue($event->send_metadata);
@ -134,7 +181,7 @@ class SecureEndToEndTest extends PHPUnit_Framework_TestCase{
$event = $server_call->startBatch([
Grpc\OP_SEND_INITIAL_METADATA => [],
Grpc\OP_SEND_MESSAGE => $reply_text,
Grpc\OP_SEND_MESSAGE => ['message' => $reply_text],
Grpc\OP_SEND_STATUS_FROM_SERVER => [
'metadata' => [],
'code' => Grpc\STATUS_OK,

Loading…
Cancel
Save