Merge pull request #15850 from grpc/v1.13.x

Upmerge 1.13.x into master
pull/15852/head
Mehrdad Afshari 7 years ago committed by GitHub
commit 6cec581345
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 11
      src/core/lib/iomgr/exec_ctx.h
  2. 2
      src/core/lib/iomgr/executor.cc
  3. 2
      src/core/lib/iomgr/timer_manager.cc
  4. 76
      src/php/lib/Grpc/BaseStub.php
  5. 14
      src/php/lib/Grpc/Interceptor.php
  6. 20
      src/php/tests/unit_tests/InterceptorTest.php

@ -45,6 +45,9 @@ typedef struct grpc_combiner grpc_combiner;
/* The exec_ctx's thread is (potentially) owned by a call or channel: care
should be given to not delete said call/channel from this exec_ctx */
#define GRPC_EXEC_CTX_FLAG_THREAD_RESOURCE_LOOP 2
/* This exec ctx was initialized by an internal thread, and should not
be counted by fork handlers */
#define GRPC_EXEC_CTX_FLAG_IS_INTERNAL_THREAD 4
extern grpc_closure_scheduler* grpc_schedule_on_exec_ctx;
@ -93,7 +96,9 @@ class ExecCtx {
/** Parameterised Constructor */
ExecCtx(uintptr_t fl) : flags_(fl) {
grpc_core::Fork::IncExecCtxCount();
if (!(GRPC_EXEC_CTX_FLAG_IS_INTERNAL_THREAD & flags_)) {
grpc_core::Fork::IncExecCtxCount();
}
Set(this);
}
@ -102,7 +107,9 @@ class ExecCtx {
flags_ |= GRPC_EXEC_CTX_FLAG_IS_FINISHED;
Flush();
Set(last_exec_ctx_);
grpc_core::Fork::DecExecCtxCount();
if (!(GRPC_EXEC_CTX_FLAG_IS_INTERNAL_THREAD & flags_)) {
grpc_core::Fork::DecExecCtxCount();
}
}
/** Disallow copy and assignment operators */

@ -145,7 +145,7 @@ static void executor_thread(void* arg) {
thread_state* ts = static_cast<thread_state*>(arg);
gpr_tls_set(&g_this_thread_state, (intptr_t)ts);
grpc_core::ExecCtx exec_ctx(0);
grpc_core::ExecCtx exec_ctx(GRPC_EXEC_CTX_FLAG_IS_INTERNAL_THREAD);
size_t subtract_depth = 0;
for (;;) {

@ -265,7 +265,7 @@ static void timer_thread_cleanup(completed_thread* ct) {
static void timer_thread(void* completed_thread_ptr) {
// this threads exec_ctx: we try to run things through to completion here
// since it's easy to spin up new threads
grpc_core::ExecCtx exec_ctx(0);
grpc_core::ExecCtx exec_ctx(GRPC_EXEC_CTX_FLAG_IS_INTERNAL_THREAD);
timer_main_loop();
timer_thread_cleanup(static_cast<completed_thread*>(completed_thread_ptr));

@ -232,12 +232,13 @@ class BaseStub
*
* @return \Closure
*/
private function _GrpcUnaryUnary($channel, $deserialize)
private function _GrpcUnaryUnary($channel)
{
return function ($method,
$argument,
$deserialize,
array $metadata = [],
array $options = []) use ($channel, $deserialize) {
array $options = []) use ($channel) {
$call = new UnaryCall(
$channel,
$method,
@ -268,11 +269,12 @@ class BaseStub
*
* @return \Closure
*/
private function _GrpcStreamUnary($channel, $deserialize)
private function _GrpcStreamUnary($channel)
{
return function ($method,
$deserialize,
array $metadata = [],
array $options = []) use ($channel, $deserialize) {
array $options = []) use ($channel) {
$call = new ClientStreamingCall(
$channel,
$method,
@ -303,12 +305,13 @@ class BaseStub
*
* @return \Closure
*/
private function _GrpcUnaryStream($channel, $deserialize)
private function _GrpcUnaryStream($channel)
{
return function ($method,
$argument,
$deserialize,
array $metadata = [],
array $options = []) use ($channel, $deserialize) {
array $options = []) use ($channel) {
$call = new ServerStreamingCall(
$channel,
$method,
@ -339,11 +342,12 @@ class BaseStub
*
* @return \Closure
*/
private function _GrpcStreamStream($channel, $deserialize)
private function _GrpcStreamStream($channel)
{
return function ($method,
$deserialize,
array $metadata = [],
array $options = []) use ($channel ,$deserialize) {
array $options = []) use ($channel) {
$call = new BidiStreamingCall(
$channel,
$method,
@ -375,23 +379,25 @@ class BaseStub
*
* @return \Closure
*/
private function _UnaryUnaryCallFactory($channel, $deserialize)
private function _UnaryUnaryCallFactory($channel)
{
if (is_a($channel, 'Grpc\Internal\InterceptorChannel')) {
return function ($method,
$argument,
$deserialize,
array $metadata = [],
array $options = []) use ($channel, $deserialize) {
array $options = []) use ($channel) {
return $channel->getInterceptor()->interceptUnaryUnary(
$method,
$argument,
$deserialize,
$metadata,
$options,
$this->_UnaryUnaryCallFactory($channel->getNext(), $deserialize)
$this->_UnaryUnaryCallFactory($channel->getNext())
);
};
}
return $this->_GrpcUnaryUnary($channel, $deserialize);
return $this->_GrpcUnaryUnary($channel);
}
/**
@ -402,23 +408,25 @@ class BaseStub
*
* @return \Closure
*/
private function _UnaryStreamCallFactory($channel, $deserialize)
private function _UnaryStreamCallFactory($channel)
{
if (is_a($channel, 'Grpc\Internal\InterceptorChannel')) {
return function ($method,
$argument,
$deserialize,
array $metadata = [],
array $options = []) use ($channel, $deserialize) {
array $options = []) use ($channel) {
return $channel->getInterceptor()->interceptUnaryStream(
$method,
$argument,
$deserialize,
$metadata,
$options,
$this->_UnaryStreamCallFactory($channel->getNext(), $deserialize)
$this->_UnaryStreamCallFactory($channel->getNext())
);
};
}
return $this->_GrpcUnaryStream($channel, $deserialize);
return $this->_GrpcUnaryStream($channel);
}
/**
@ -429,21 +437,23 @@ class BaseStub
*
* @return \Closure
*/
private function _StreamUnaryCallFactory($channel, $deserialize)
private function _StreamUnaryCallFactory($channel)
{
if (is_a($channel, 'Grpc\Internal\InterceptorChannel')) {
return function ($method,
$deserialize,
array $metadata = [],
array $options = []) use ($channel, $deserialize) {
array $options = []) use ($channel) {
return $channel->getInterceptor()->interceptStreamUnary(
$method,
$deserialize,
$metadata,
$options,
$this->_StreamUnaryCallFactory($channel->getNext(), $deserialize)
$this->_StreamUnaryCallFactory($channel->getNext())
);
};
}
return $this->_GrpcStreamUnary($channel, $deserialize);
return $this->_GrpcStreamUnary($channel);
}
/**
@ -454,21 +464,23 @@ class BaseStub
*
* @return \Closure
*/
private function _StreamStreamCallFactory($channel, $deserialize)
private function _StreamStreamCallFactory($channel)
{
if (is_a($channel, 'Grpc\Internal\InterceptorChannel')) {
return function ($method,
$deserialize,
array $metadata = [],
array $options = []) use ($channel, $deserialize) {
array $options = []) use ($channel) {
return $channel->getInterceptor()->interceptStreamStream(
$method,
$deserialize,
$metadata,
$options,
$this->_StreamStreamCallFactory($channel->getNext(), $deserialize)
$this->_StreamStreamCallFactory($channel->getNext())
);
};
}
return $this->_GrpcStreamStream($channel, $deserialize);
return $this->_GrpcStreamStream($channel);
}
/* This class is intended to be subclassed by generated code, so
@ -493,8 +505,8 @@ class BaseStub
array $metadata = [],
array $options = []
) {
$call_factory = $this->_UnaryUnaryCallFactory($this->channel, $deserialize);
$call = $call_factory($method, $argument, $metadata, $options);
$call_factory = $this->_UnaryUnaryCallFactory($this->channel);
$call = $call_factory($method, $argument, $deserialize, $metadata, $options);
return $call;
}
@ -516,8 +528,8 @@ class BaseStub
array $metadata = [],
array $options = []
) {
$call_factory = $this->_StreamUnaryCallFactory($this->channel, $deserialize);
$call = $call_factory($method, $metadata, $options);
$call_factory = $this->_StreamUnaryCallFactory($this->channel);
$call = $call_factory($method, $deserialize, $metadata, $options);
return $call;
}
@ -541,8 +553,8 @@ class BaseStub
array $metadata = [],
array $options = []
) {
$call_factory = $this->_UnaryStreamCallFactory($this->channel, $deserialize);
$call = $call_factory($method, $argument, $metadata, $options);
$call_factory = $this->_UnaryStreamCallFactory($this->channel);
$call = $call_factory($method, $argument, $deserialize, $metadata, $options);
return $call;
}
@ -563,8 +575,8 @@ class BaseStub
array $metadata = [],
array $options = []
) {
$call_factory = $this->_StreamStreamCallFactory($this->channel, $deserialize);
$call = $call_factory($method, $metadata, $options);
$call_factory = $this->_StreamStreamCallFactory($this->channel);
$call = $call_factory($method, $deserialize, $metadata, $options);
return $call;
}
}

@ -21,6 +21,8 @@ namespace Grpc;
/**
* Represents an interceptor that intercept RPC invocations before call starts.
* There is one proposal related to the argument $deserialize under the review.
* The proposal link is https://github.com/grpc/proposal/pull/86.
* This is an EXPERIMENTAL API.
*/
class Interceptor
@ -28,39 +30,43 @@ class Interceptor
public function interceptUnaryUnary(
$method,
$argument,
$deserialize,
array $metadata = [],
array $options = [],
$continuation
) {
return $continuation($method, $argument, $metadata, $options);
return $continuation($method, $argument, $deserialize, $metadata, $options);
}
public function interceptStreamUnary(
$method,
$deserialize,
array $metadata = [],
array $options = [],
$continuation
) {
return $continuation($method, $metadata, $options);
return $continuation($method, $deserialize, $metadata, $options);
}
public function interceptUnaryStream(
$method,
$argument,
$deserialize,
array $metadata = [],
array $options = [],
$continuation
) {
return $continuation($method, $argument, $metadata, $options);
return $continuation($method, $argument, $deserialize, $metadata, $options);
}
public function interceptStreamStream(
$method,
$deserialize,
array $metadata = [],
array $options = [],
$continuation
) {
return $continuation($method, $metadata, $options);
return $continuation($method, $deserialize, $metadata, $options);
}
/**

@ -94,17 +94,18 @@ class ChangeMetadataInterceptor extends Grpc\Interceptor
{
public function interceptUnaryUnary($method,
$argument,
$deserialize,
array $metadata = [],
array $options = [],
$continuation)
{
$metadata["foo"] = array('interceptor_from_unary_request');
return $continuation($method, $argument, $metadata, $options);
return $continuation($method, $argument, $deserialize, $metadata, $options);
}
public function interceptStreamUnary($method, array $metadata = [], array $options = [], $continuation)
public function interceptStreamUnary($method, $deserialize, array $metadata = [], array $options = [], $continuation)
{
$metadata["foo"] = array('interceptor_from_stream_request');
return $continuation($method, $metadata, $options);
return $continuation($method, $deserialize, $metadata, $options);
}
}
@ -112,6 +113,7 @@ class ChangeMetadataInterceptor2 extends Grpc\Interceptor
{
public function interceptUnaryUnary($method,
$argument,
$deserialize,
array $metadata = [],
array $options = [],
$continuation)
@ -121,9 +123,10 @@ class ChangeMetadataInterceptor2 extends Grpc\Interceptor
} else {
$metadata["bar"] = array('interceptor_from_unary_request');
}
return $continuation($method, $argument, $metadata, $options);
return $continuation($method, $argument, $deserialize, $metadata, $options);
}
public function interceptStreamUnary($method,
$deserialize,
array $metadata = [],
array $options = [],
$continuation)
@ -133,7 +136,7 @@ class ChangeMetadataInterceptor2 extends Grpc\Interceptor
} else {
$metadata["bar"] = array('interceptor_from_stream_request');
}
return $continuation($method, $metadata, $options);
return $continuation($method, $deserialize, $metadata, $options);
}
}
@ -166,17 +169,18 @@ class ChangeRequestInterceptor extends Grpc\Interceptor
{
public function interceptUnaryUnary($method,
$argument,
$deserialize,
array $metadata = [],
array $options = [],
$continuation)
{
$argument->setData('intercepted_unary_request');
return $continuation($method, $argument, $metadata, $options);
return $continuation($method, $argument, $deserialize, $metadata, $options);
}
public function interceptStreamUnary($method, array $metadata = [], array $options = [], $continuation)
public function interceptStreamUnary($method, $deserialize, array $metadata = [], array $options = [], $continuation)
{
return new ChangeRequestCall(
$continuation($method, $metadata, $options)
$continuation($method, $deserialize, $metadata, $options)
);
}
}

Loading…
Cancel
Save