|
|
|
@ -207,6 +207,7 @@ class WriteOptions { |
|
|
|
|
|
|
|
|
|
namespace internal { |
|
|
|
|
|
|
|
|
|
/// Internal methods for setting the state
|
|
|
|
|
class InternalInterceptorBatchMethods |
|
|
|
|
: public experimental::InterceptorBatchMethods { |
|
|
|
|
public: |
|
|
|
@ -247,7 +248,6 @@ class CallNoOp { |
|
|
|
|
void FinishOp(bool* status) {} |
|
|
|
|
void SetInterceptionHookPoint( |
|
|
|
|
InternalInterceptorBatchMethods* interceptor_methods) {} |
|
|
|
|
|
|
|
|
|
void SetFinishInterceptionHookPoint( |
|
|
|
|
InternalInterceptorBatchMethods* interceptor_methods) {} |
|
|
|
|
void SetHijackingState(InternalInterceptorBatchMethods* interceptor_methods) { |
|
|
|
@ -846,6 +846,8 @@ class InterceptorBatchMethodsImpl : public InternalInterceptorBatchMethods { |
|
|
|
|
// Only the client can hijack when sending down initial metadata
|
|
|
|
|
GPR_CODEGEN_ASSERT(!reverse_ && ops_ != nullptr && |
|
|
|
|
call_->client_rpc_info() != nullptr); |
|
|
|
|
// It is illegal to call Hijack twice
|
|
|
|
|
GPR_CODEGEN_ASSERT(!ran_hijacking_interceptor_); |
|
|
|
|
auto* rpc_info = call_->client_rpc_info(); |
|
|
|
|
rpc_info->hijacked_ = true; |
|
|
|
|
rpc_info->hijacked_interceptor_ = curr_iteration_; |
|
|
|
@ -960,6 +962,8 @@ class InterceptorBatchMethodsImpl : public InternalInterceptorBatchMethods { |
|
|
|
|
// This needs to be set before interceptors are run
|
|
|
|
|
void SetCall(Call* call) { call_ = call; } |
|
|
|
|
|
|
|
|
|
// This needs to be set before interceptors are run using RunInterceptors().
|
|
|
|
|
// Alternatively, RunInterceptors(std::function<void(void)> f) can be used.
|
|
|
|
|
void SetCallOpSetInterface(CallOpSetInterface* ops) { ops_ = ops; } |
|
|
|
|
|
|
|
|
|
// Returns true if no interceptors are run. This should be used only by
|
|
|
|
@ -969,6 +973,7 @@ class InterceptorBatchMethodsImpl : public InternalInterceptorBatchMethods { |
|
|
|
|
// ContinueFinalizeOpsAfterInterception will be called. Note that neither of
|
|
|
|
|
// them is invoked if there were no interceptors registered.
|
|
|
|
|
bool RunInterceptors() { |
|
|
|
|
GPR_CODEGEN_ASSERT(ops_); |
|
|
|
|
auto* client_rpc_info = call_->client_rpc_info(); |
|
|
|
|
if (client_rpc_info != nullptr) { |
|
|
|
|
if (client_rpc_info->interceptors_.size() == 0) { |
|
|
|
@ -990,9 +995,10 @@ class InterceptorBatchMethodsImpl : public InternalInterceptorBatchMethods { |
|
|
|
|
|
|
|
|
|
// Returns true if no interceptors are run. Returns false otherwise if there
|
|
|
|
|
// are interceptors registered. After the interceptors are done running \a f
|
|
|
|
|
// will
|
|
|
|
|
// be invoked. This is to be used only by BaseAsyncRequest and SyncRequest.
|
|
|
|
|
// will be invoked. This is to be used only by BaseAsyncRequest and
|
|
|
|
|
// SyncRequest.
|
|
|
|
|
bool RunInterceptors(std::function<void(void)> f) { |
|
|
|
|
// This is used only by the server for initial call request
|
|
|
|
|
GPR_CODEGEN_ASSERT(reverse_ == true); |
|
|
|
|
GPR_CODEGEN_ASSERT(call_->client_rpc_info() == nullptr); |
|
|
|
|
auto* server_rpc_info = call_->server_rpc_info(); |
|
|
|
@ -1013,8 +1019,6 @@ class InterceptorBatchMethodsImpl : public InternalInterceptorBatchMethods { |
|
|
|
|
} else { |
|
|
|
|
if (rpc_info->hijacked_) { |
|
|
|
|
curr_iteration_ = rpc_info->hijacked_interceptor_; |
|
|
|
|
// gpr_log(GPR_ERROR, "running from the hijacked %d",
|
|
|
|
|
// rpc_info->hijacked_interceptor_);
|
|
|
|
|
} else { |
|
|
|
|
curr_iteration_ = rpc_info->interceptors_.size() - 1; |
|
|
|
|
} |
|
|
|
@ -1173,14 +1177,12 @@ class CallOpSet : public CallOpSetInterface, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void FillOps(Call* call) override { |
|
|
|
|
// gpr_log(GPR_ERROR, "filling ops %p", this);
|
|
|
|
|
done_intercepting_ = false; |
|
|
|
|
g_core_codegen_interface->grpc_call_ref(call->call()); |
|
|
|
|
call_ = |
|
|
|
|
*call; // It's fine to create a copy of call since it's just pointers
|
|
|
|
|
|
|
|
|
|
if (RunInterceptors()) { |
|
|
|
|
// gpr_log(GPR_ERROR, "no interceptors on send path");
|
|
|
|
|
ContinueFillOpsAfterInterception(); |
|
|
|
|
} else { |
|
|
|
|
// After the interceptors are run, ContinueFillOpsAfterInterception will
|
|
|
|
@ -1193,7 +1195,6 @@ class CallOpSet : public CallOpSetInterface, |
|
|
|
|
// We have already finished intercepting and filling in the results. This
|
|
|
|
|
// round trip from the core needed to be made because interceptors were
|
|
|
|
|
// run
|
|
|
|
|
// gpr_log(GPR_ERROR, "done intercepting");
|
|
|
|
|
*tag = return_tag_; |
|
|
|
|
*status = saved_status_; |
|
|
|
|
g_core_codegen_interface->grpc_call_unref(call_.call()); |
|
|
|
@ -1207,14 +1208,11 @@ class CallOpSet : public CallOpSetInterface, |
|
|
|
|
this->Op5::FinishOp(status); |
|
|
|
|
this->Op6::FinishOp(status); |
|
|
|
|
saved_status_ = *status; |
|
|
|
|
// gpr_log(GPR_ERROR, "done finish ops");
|
|
|
|
|
if (RunInterceptorsPostRecv()) { |
|
|
|
|
*tag = return_tag_; |
|
|
|
|
g_core_codegen_interface->grpc_call_unref(call_.call()); |
|
|
|
|
// gpr_log(GPR_ERROR, "no interceptors");
|
|
|
|
|
return true; |
|
|
|
|
} |
|
|
|
|
// gpr_log(GPR_ERROR, "running interceptors");
|
|
|
|
|
// Interceptors are going to be run, so we can't return the tag just yet.
|
|
|
|
|
// After the interceptors are run, ContinueFinalizeResultAfterInterception
|
|
|
|
|
return false; |
|
|
|
@ -1252,8 +1250,6 @@ class CallOpSet : public CallOpSetInterface, |
|
|
|
|
this->Op4::AddOp(ops, &nops); |
|
|
|
|
this->Op5::AddOp(ops, &nops); |
|
|
|
|
this->Op6::AddOp(ops, &nops); |
|
|
|
|
// gpr_log(GPR_ERROR, "going to start call batch %p with %lu ops", this,
|
|
|
|
|
// nops);
|
|
|
|
|
GPR_CODEGEN_ASSERT(GRPC_CALL_OK == |
|
|
|
|
g_core_codegen_interface->grpc_call_start_batch( |
|
|
|
|
call_.call(), ops, nops, cq_tag(), nullptr)); |
|
|
|
@ -1263,7 +1259,6 @@ class CallOpSet : public CallOpSetInterface, |
|
|
|
|
// path
|
|
|
|
|
void ContinueFinalizeResultAfterInterception() override { |
|
|
|
|
done_intercepting_ = true; |
|
|
|
|
// gpr_log(GPR_ERROR, "going to start call batch %p for dummy tag", this);
|
|
|
|
|
GPR_CODEGEN_ASSERT(GRPC_CALL_OK == |
|
|
|
|
g_core_codegen_interface->grpc_call_start_batch( |
|
|
|
|
call_.call(), nullptr, 0, cq_tag(), nullptr)); |
|
|
|
|