diff --git a/src/php/ext/grpc/call.c b/src/php/ext/grpc/call.c index 252623d0c30..3b99de75382 100644 --- a/src/php/ext/grpc/call.c +++ b/src/php/ext/grpc/call.c @@ -265,6 +265,9 @@ PHP_METHOD(Call, startBatch) { HashTable *array_hash; HashPosition array_pointer; HashTable *status_hash; + HashTable *message_hash; + zval **message_value; + zval **message_flags; char *key; uint key_len; ulong index; @@ -319,13 +322,33 @@ PHP_METHOD(Call, startBatch) { metadata.metadata; break; case GRPC_OP_SEND_MESSAGE: - if (Z_TYPE_PP(value) != IS_STRING) { + if (Z_TYPE_PP(value) != IS_ARRAY) { + zend_throw_exception(spl_ce_InvalidArgumentException, + "Expected an array for send message", + 1 TSRMLS_CC); + goto cleanup; + } + message_hash = Z_ARRVAL_PP(value); + if (zend_hash_find(message_hash, "flags", sizeof("flags"), + (void **)&message_flags) == SUCCESS) { + if (Z_TYPE_PP(message_flags) != IS_LONG) { + zend_throw_exception(spl_ce_InvalidArgumentException, + "Expected an int for message flags", + 1 TSRMLS_CC); + } + ops[op_num].flags = Z_LVAL_PP(message_flags) & GRPC_WRITE_USED_MASK; + } + if (zend_hash_find(message_hash, "message", sizeof("message"), + (void **)&message_value) != SUCCESS || + Z_TYPE_PP(message_value) != IS_STRING) { zend_throw_exception(spl_ce_InvalidArgumentException, "Expected a string for send message", 1 TSRMLS_CC); + goto cleanup; } ops[op_num].data.send_message = - string_to_byte_buffer(Z_STRVAL_PP(value), Z_STRLEN_PP(value)); + string_to_byte_buffer(Z_STRVAL_PP(message_value), + Z_STRLEN_PP(message_value)); break; case GRPC_OP_SEND_CLOSE_FROM_CLIENT: break; diff --git a/src/php/lib/Grpc/AbstractCall.php b/src/php/lib/Grpc/AbstractCall.php index 35057224f8c..3fdaf2e4872 100644 --- a/src/php/lib/Grpc/AbstractCall.php +++ b/src/php/lib/Grpc/AbstractCall.php @@ -92,4 +92,20 @@ abstract class AbstractCall { } return call_user_func($this->deserialize, $value); } -} \ No newline at end of file + + /** + * Get the list of Grpc Write Flags + * @param array $options an array of options + * @return The list of Grpc Write Flags contained in the input + */ + protected static function getGrpcWriteFlags($options) { + $grpc_write_flags = []; + foreach ([WRITE_BUFFER_HINT, + WRITE_NO_COMPRESS] as $flag) { + if (in_array($flag, $options)) { + $grpc_write_flags[] = $flag; + } + } + return $grpc_write_flags; + } +} diff --git a/src/php/lib/Grpc/BaseStub.php b/src/php/lib/Grpc/BaseStub.php index 2e980c5eed6..381b1143993 100755 --- a/src/php/lib/Grpc/BaseStub.php +++ b/src/php/lib/Grpc/BaseStub.php @@ -168,7 +168,8 @@ class BaseStub { public function _simpleRequest($method, $argument, callable $deserialize, - $metadata = array()) { + $metadata = array(), + $options = array()) { list($actual_metadata, $timeout) = $this->_extract_timeout_from_metadata($metadata); $call = new UnaryCall($this->channel, $method, $deserialize, $timeout); $jwt_aud_uri = $this->_get_jwt_aud_uri($method); @@ -177,7 +178,7 @@ class BaseStub { $actual_metadata, $jwt_aud_uri); } - $call->start($argument, $actual_metadata); + $call->start($argument, $actual_metadata, $options); return $call; } @@ -193,7 +194,6 @@ class BaseStub { * @return ClientStreamingSurfaceActiveCall The active call object */ public function _clientStreamRequest($method, - $arguments, callable $deserialize, $metadata = array()) { list($actual_metadata, $timeout) = $this->_extract_timeout_from_metadata($metadata); @@ -204,7 +204,7 @@ class BaseStub { $actual_metadata, $jwt_aud_uri); } - $call->start($arguments, $actual_metadata); + $call->start($actual_metadata); return $call; } @@ -221,7 +221,8 @@ class BaseStub { public function _serverStreamRequest($method, $argument, callable $deserialize, - $metadata = array()) { + $metadata = array(), + $options = array()) { list($actual_metadata, $timeout) = $this->_extract_timeout_from_metadata($metadata); $call = new ServerStreamingCall($this->channel, $method, $deserialize, $timeout); $jwt_aud_uri = $this->_get_jwt_aud_uri($method); @@ -230,7 +231,7 @@ class BaseStub { $actual_metadata, $jwt_aud_uri); } - $call->start($argument, $actual_metadata); + $call->start($argument, $actual_metadata, $options); return $call; } diff --git a/src/php/lib/Grpc/BidiStreamingCall.php b/src/php/lib/Grpc/BidiStreamingCall.php index 76c642bef46..80b7a66a764 100644 --- a/src/php/lib/Grpc/BidiStreamingCall.php +++ b/src/php/lib/Grpc/BidiStreamingCall.php @@ -66,9 +66,14 @@ class BidiStreamingCall extends AbstractCall { * Write a single message to the server. This cannot be called after * writesDone is called. * @param ByteBuffer $data The data to write + * @param array $options an array of options */ - public function write($data) { - $this->call->startBatch([OP_SEND_MESSAGE => $data->serialize()]); + public function write($data, $options = array()) { + $message_array = ['message' => $data->serialize()]; + if ($grpc_write_flags = self::getGrpcWriteFlags($options)) { + $message_array['flags'] = $grpc_write_flags; + } + $this->call->startBatch([OP_SEND_MESSAGE => $message_array]); } /** @@ -86,7 +91,7 @@ class BidiStreamingCall extends AbstractCall { public function getStatus() { $status_event = $this->call->startBatch([ OP_RECV_STATUS_ON_CLIENT => true - ]); + ]); return $status_event->status; } } \ No newline at end of file diff --git a/src/php/lib/Grpc/ClientStreamingCall.php b/src/php/lib/Grpc/ClientStreamingCall.php index 61439d3f475..97c241a087d 100644 --- a/src/php/lib/Grpc/ClientStreamingCall.php +++ b/src/php/lib/Grpc/ClientStreamingCall.php @@ -40,15 +40,24 @@ namespace Grpc; class ClientStreamingCall extends AbstractCall { /** * Start the call. - * @param Traversable $arg_iter The iterator of arguments to send * @param array $metadata Metadata to send with the call, if applicable */ - public function start($arg_iter, $metadata = array()) { - $event = $this->call->startBatch([OP_SEND_INITIAL_METADATA => $metadata]); - foreach($arg_iter as $arg) { - $this->call->startBatch([OP_SEND_MESSAGE => $arg->serialize()]); + public function start($metadata) { + $this->call->startBatch([OP_SEND_INITIAL_METADATA => $metadata]); + } + + /** + * Write a single message to the server. This cannot be called after + * wait is called. + * @param ByteBuffer $data The data to write + * @param array $options an array of options + */ + public function write($data, $options = array()) { + $message_array = ['message' => $data->serialize()]; + if ($grpc_write_flags = self::getGrpcWriteFlags($options)) { + $message_array['flags'] = $grpc_write_flags; } - $this->call->startBatch([OP_SEND_CLOSE_FROM_CLIENT => true]); + $this->call->startBatch([OP_SEND_MESSAGE => $message_array]); } /** @@ -57,6 +66,7 @@ class ClientStreamingCall extends AbstractCall { */ public function wait() { $event = $this->call->startBatch([ + OP_SEND_CLOSE_FROM_CLIENT => true, OP_RECV_INITIAL_METADATA => true, OP_RECV_MESSAGE => true, OP_RECV_STATUS_ON_CLIENT => true]); diff --git a/src/php/lib/Grpc/ServerStreamingCall.php b/src/php/lib/Grpc/ServerStreamingCall.php index 631c863345b..159561b43ab 100644 --- a/src/php/lib/Grpc/ServerStreamingCall.php +++ b/src/php/lib/Grpc/ServerStreamingCall.php @@ -40,14 +40,19 @@ namespace Grpc; class ServerStreamingCall extends AbstractCall { /** * Start the call - * @param $arg The argument to send + * @param $data The data to send * @param array $metadata Metadata to send with the call, if applicable + * @param array $options an array of options */ - public function start($arg, $metadata = array()) { + public function start($data, $metadata = array(), $options = array()) { + $message_array = ['message' => $data->serialize()]; + if ($grpc_write_flags = self::getGrpcWriteFlags($options)) { + $message_array['flags'] = $grpc_write_flags; + } $event = $this->call->startBatch([ OP_SEND_INITIAL_METADATA => $metadata, OP_RECV_INITIAL_METADATA => true, - OP_SEND_MESSAGE => $arg->serialize(), + OP_SEND_MESSAGE => $message_array, OP_SEND_CLOSE_FROM_CLIENT => true]); $this->metadata = $event->metadata; } @@ -71,7 +76,7 @@ class ServerStreamingCall extends AbstractCall { public function getStatus() { $status_event = $this->call->startBatch([ OP_RECV_STATUS_ON_CLIENT => true - ]); + ]); return $status_event->status; } -} \ No newline at end of file +} diff --git a/src/php/lib/Grpc/UnaryCall.php b/src/php/lib/Grpc/UnaryCall.php index 97a10a40f49..5ca7d9ed652 100644 --- a/src/php/lib/Grpc/UnaryCall.php +++ b/src/php/lib/Grpc/UnaryCall.php @@ -40,14 +40,19 @@ namespace Grpc; class UnaryCall extends AbstractCall { /** * Start the call - * @param $arg The argument to send + * @param $data The data to send * @param array $metadata Metadata to send with the call, if applicable + * @param array $options an array of options */ - public function start($arg, $metadata = array()) { + public function start($data, $metadata = array(), $options = array()) { + $message_array = ['message' => $data->serialize()]; + if ($grpc_write_flags = self::getGrpcWriteFlags($options)) { + $message_array['flags'] = $grpc_write_flags; + } $event = $this->call->startBatch([ OP_SEND_INITIAL_METADATA => $metadata, OP_RECV_INITIAL_METADATA => true, - OP_SEND_MESSAGE => $arg->serialize(), + OP_SEND_MESSAGE => $message_array, OP_SEND_CLOSE_FROM_CLIENT => true]); $this->metadata = $event->metadata; } @@ -62,4 +67,4 @@ class UnaryCall extends AbstractCall { OP_RECV_STATUS_ON_CLIENT => true]); return array($this->deserializeResponse($event->message), $event->status); } -} \ No newline at end of file +} diff --git a/src/php/tests/generated_code/AbstractGeneratedCodeTest.php b/src/php/tests/generated_code/AbstractGeneratedCodeTest.php index a368dd4ee02..531e80aa61c 100644 --- a/src/php/tests/generated_code/AbstractGeneratedCodeTest.php +++ b/src/php/tests/generated_code/AbstractGeneratedCodeTest.php @@ -79,15 +79,13 @@ abstract class AbstractGeneratedCodeTest extends PHPUnit_Framework_TestCase { } public function testClientStreaming() { - $num_iter = function() { - for ($i = 0; $i < 7; $i++) { - $num = new math\Num(); - $num->setNum($i); - yield $num; - } - }; - $call = self::$client->Sum($num_iter()); + $call = self::$client->Sum(); $this->assertTrue(is_string($call->getPeer())); + for ($i = 0; $i < 7; $i++) { + $num = new math\Num(); + $num->setNum($i); + $call->write($num); + } list($response, $status) = $call->wait(); $this->assertSame(21, $response->getNum()); $this->assertSame(\Grpc\STATUS_OK, $status->code); diff --git a/src/php/tests/interop/interop_client.php b/src/php/tests/interop/interop_client.php index bd15ee43031..28782735c09 100755 --- a/src/php/tests/interop/interop_client.php +++ b/src/php/tests/interop/interop_client.php @@ -173,7 +173,11 @@ function clientStreaming($stub) { return $request; }, $request_lengths); - list($result, $status) = $stub->StreamingInputCall($requests)->wait(); + $call = $stub->StreamingInputCall(); + foreach ($requests as $request) { + $call->write($request); + } + list($result, $status) = $call->wait(); hardAssert($status->code === Grpc\STATUS_OK, 'Call did not complete successfully'); hardAssert($result->getAggregatedPayloadSize() === 74922, 'aggregated_payload_size was incorrect'); @@ -374,5 +378,6 @@ switch ($args['test_case']) { // messages are sent immediately after metadata is sent. There is // currently no way to cancel before messages are sent. default: + echo "Unsupported test case $args[test_case]\n"; exit(1); } diff --git a/src/php/tests/unit_tests/EndToEndTest.php b/src/php/tests/unit_tests/EndToEndTest.php index 4c0cf91d512..bd464f939fc 100755 --- a/src/php/tests/unit_tests/EndToEndTest.php +++ b/src/php/tests/unit_tests/EndToEndTest.php @@ -91,6 +91,51 @@ class EndToEndTest extends PHPUnit_Framework_TestCase{ unset($server_call); } + public function testMessageWriteFlags() { + $deadline = Grpc\Timeval::infFuture(); + $req_text = 'message_write_flags_test'; + $status_text = 'xyz'; + $call = new Grpc\Call($this->channel, + 'dummy_method', + $deadline); + + $event = $call->startBatch([ + Grpc\OP_SEND_INITIAL_METADATA => [], + Grpc\OP_SEND_MESSAGE => ['message' => $req_text, + 'flags' => Grpc\WRITE_NO_COMPRESS], + Grpc\OP_SEND_CLOSE_FROM_CLIENT => true + ]); + + $this->assertTrue($event->send_metadata); + $this->assertTrue($event->send_close); + + $event = $this->server->requestCall(); + $this->assertSame('dummy_method', $event->method); + $server_call = $event->call; + + $event = $server_call->startBatch([ + Grpc\OP_SEND_INITIAL_METADATA => [], + Grpc\OP_SEND_STATUS_FROM_SERVER => [ + 'metadata' => [], + 'code' => Grpc\STATUS_OK, + 'details' => $status_text + ], + ]); + + $event = $call->startBatch([ + Grpc\OP_RECV_INITIAL_METADATA => true, + Grpc\OP_RECV_STATUS_ON_CLIENT => true + ]); + + $status = $event->status; + $this->assertSame([], $status->metadata); + $this->assertSame(Grpc\STATUS_OK, $status->code); + $this->assertSame($status_text, $status->details); + + unset($call); + unset($server_call); + } + public function testClientServerFullRequestResponse() { $deadline = Grpc\Timeval::infFuture(); $req_text = 'client_server_full_request_response'; @@ -104,7 +149,7 @@ class EndToEndTest extends PHPUnit_Framework_TestCase{ $event = $call->startBatch([ Grpc\OP_SEND_INITIAL_METADATA => [], Grpc\OP_SEND_CLOSE_FROM_CLIENT => true, - Grpc\OP_SEND_MESSAGE => $req_text + Grpc\OP_SEND_MESSAGE => ['message' => $req_text], ]); $this->assertTrue($event->send_metadata); @@ -117,7 +162,7 @@ class EndToEndTest extends PHPUnit_Framework_TestCase{ $event = $server_call->startBatch([ Grpc\OP_SEND_INITIAL_METADATA => [], - Grpc\OP_SEND_MESSAGE => $reply_text, + Grpc\OP_SEND_MESSAGE => ['message' => $reply_text], Grpc\OP_SEND_STATUS_FROM_SERVER => [ 'metadata' => [], 'code' => Grpc\STATUS_OK, diff --git a/src/php/tests/unit_tests/SecureEndToEndTest.php b/src/php/tests/unit_tests/SecureEndToEndTest.php index 60341b983db..d7fca14a0d5 100755 --- a/src/php/tests/unit_tests/SecureEndToEndTest.php +++ b/src/php/tests/unit_tests/SecureEndToEndTest.php @@ -107,6 +107,53 @@ class SecureEndToEndTest extends PHPUnit_Framework_TestCase{ unset($server_call); } + public function testMessageWriteFlags() { + $deadline = Grpc\Timeval::infFuture(); + $req_text = 'message_write_flags_test'; + $status_text = 'xyz'; + $call = new Grpc\Call($this->channel, + 'dummy_method', + $deadline, + $this->host_override); + + $event = $call->startBatch([ + Grpc\OP_SEND_INITIAL_METADATA => [], + Grpc\OP_SEND_MESSAGE => ['message' => $req_text, + 'flags' => Grpc\WRITE_NO_COMPRESS], + Grpc\OP_SEND_CLOSE_FROM_CLIENT => true + ]); + + $this->assertTrue($event->send_metadata); + $this->assertTrue($event->send_close); + + $event = $this->server->requestCall(); + $this->assertSame('dummy_method', $event->method); + $server_call = $event->call; + + $event = $server_call->startBatch([ + Grpc\OP_SEND_INITIAL_METADATA => [], + Grpc\OP_SEND_STATUS_FROM_SERVER => [ + 'metadata' => [], + 'code' => Grpc\STATUS_OK, + 'details' => $status_text + ], + ]); + + $event = $call->startBatch([ + Grpc\OP_RECV_INITIAL_METADATA => true, + Grpc\OP_RECV_STATUS_ON_CLIENT => true + ]); + + $this->assertSame([], $event->metadata); + $status = $event->status; + $this->assertSame([], $status->metadata); + $this->assertSame(Grpc\STATUS_OK, $status->code); + $this->assertSame($status_text, $status->details); + + unset($call); + unset($server_call); + } + public function testClientServerFullRequestResponse() { $deadline = Grpc\Timeval::infFuture(); $req_text = 'client_server_full_request_response'; @@ -121,7 +168,7 @@ class SecureEndToEndTest extends PHPUnit_Framework_TestCase{ $event = $call->startBatch([ Grpc\OP_SEND_INITIAL_METADATA => [], Grpc\OP_SEND_CLOSE_FROM_CLIENT => true, - Grpc\OP_SEND_MESSAGE => $req_text + Grpc\OP_SEND_MESSAGE => ['message' => $req_text] ]); $this->assertTrue($event->send_metadata); @@ -134,7 +181,7 @@ class SecureEndToEndTest extends PHPUnit_Framework_TestCase{ $event = $server_call->startBatch([ Grpc\OP_SEND_INITIAL_METADATA => [], - Grpc\OP_SEND_MESSAGE => $reply_text, + Grpc\OP_SEND_MESSAGE => ['message' => $reply_text], Grpc\OP_SEND_STATUS_FROM_SERVER => [ 'metadata' => [], 'code' => Grpc\STATUS_OK,