Merge pull request #17981 from vjpai/app_exec_ctx

Instantiate application callback exec ctx's in a few places where needed
pull/17957/head
Vijay Pai 6 years ago committed by GitHub
commit a632af0298
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      src/core/ext/filters/client_channel/channel_connectivity.cc
  2. 8
      src/core/ext/transport/cronet/transport/cronet_transport.cc
  3. 2
      src/core/lib/iomgr/cfstream_handle.cc
  4. 1
      src/core/lib/security/credentials/jwt/jwt_credentials.cc
  5. 1
      src/core/lib/security/credentials/jwt/jwt_verifier.cc
  6. 1
      src/core/lib/security/credentials/plugin/plugin_credentials.cc
  7. 1
      src/core/lib/security/transport/server_auth_filter.cc

@ -35,6 +35,7 @@ grpc_connectivity_state grpc_channel_check_connectivity_state(
/* forward through to the underlying client channel */
grpc_channel_element* client_channel_elem =
grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel));
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
grpc_connectivity_state state;
GRPC_API_TRACE(
@ -202,6 +203,7 @@ void grpc_channel_watch_connectivity_state(
gpr_timespec deadline, grpc_completion_queue* cq, void* tag) {
grpc_channel_element* client_channel_elem =
grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel));
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
state_watcher* w = static_cast<state_watcher*>(gpr_malloc(sizeof(*w)));

@ -441,6 +441,7 @@ static void convert_cronet_array_to_metadata(
*/
static void on_failed(bidirectional_stream* stream, int net_error) {
gpr_log(GPR_ERROR, "on_failed(%p, %d)", stream, net_error);
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
stream_obj* s = static_cast<stream_obj*>(stream->annotation);
@ -467,6 +468,7 @@ static void on_failed(bidirectional_stream* stream, int net_error) {
*/
static void on_canceled(bidirectional_stream* stream) {
CRONET_LOG(GPR_DEBUG, "on_canceled(%p)", stream);
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
stream_obj* s = static_cast<stream_obj*>(stream->annotation);
@ -493,6 +495,7 @@ static void on_canceled(bidirectional_stream* stream) {
*/
static void on_succeeded(bidirectional_stream* stream) {
CRONET_LOG(GPR_DEBUG, "on_succeeded(%p)", stream);
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
stream_obj* s = static_cast<stream_obj*>(stream->annotation);
@ -511,6 +514,7 @@ static void on_succeeded(bidirectional_stream* stream) {
*/
static void on_stream_ready(bidirectional_stream* stream) {
CRONET_LOG(GPR_DEBUG, "W: on_stream_ready(%p)", stream);
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
stream_obj* s = static_cast<stream_obj*>(stream->annotation);
grpc_cronet_transport* t = s->curr_ct;
@ -541,6 +545,7 @@ static void on_response_headers_received(
bidirectional_stream* stream,
const bidirectional_stream_header_array* headers,
const char* negotiated_protocol) {
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
CRONET_LOG(GPR_DEBUG, "R: on_response_headers_received(%p, %p, %s)", stream,
headers, negotiated_protocol);
@ -580,6 +585,7 @@ static void on_response_headers_received(
Cronet callback
*/
static void on_write_completed(bidirectional_stream* stream, const char* data) {
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
stream_obj* s = static_cast<stream_obj*>(stream->annotation);
CRONET_LOG(GPR_DEBUG, "W: on_write_completed(%p, %s)", stream, data);
@ -598,6 +604,7 @@ static void on_write_completed(bidirectional_stream* stream, const char* data) {
*/
static void on_read_completed(bidirectional_stream* stream, char* data,
int count) {
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
stream_obj* s = static_cast<stream_obj*>(stream->annotation);
CRONET_LOG(GPR_DEBUG, "R: on_read_completed(%p, %p, %d)", stream, data,
@ -640,6 +647,7 @@ static void on_read_completed(bidirectional_stream* stream, char* data,
static void on_response_trailers_received(
bidirectional_stream* stream,
const bidirectional_stream_header_array* trailers) {
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
CRONET_LOG(GPR_DEBUG, "R: on_response_trailers_received(%p,%p)", stream,
trailers);

@ -52,6 +52,7 @@ CFStreamHandle* CFStreamHandle::CreateStreamHandle(
void CFStreamHandle::ReadCallback(CFReadStreamRef stream,
CFStreamEventType type,
void* client_callback_info) {
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
CFStreamHandle* handle = static_cast<CFStreamHandle*>(client_callback_info);
if (grpc_tcp_trace.enabled()) {
@ -77,6 +78,7 @@ void CFStreamHandle::ReadCallback(CFReadStreamRef stream,
void CFStreamHandle::WriteCallback(CFWriteStreamRef stream,
CFStreamEventType type,
void* clientCallBackInfo) {
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
CFStreamHandle* handle = static_cast<CFStreamHandle*>(clientCallBackInfo);
if (grpc_tcp_trace.enabled()) {

@ -174,6 +174,7 @@ grpc_call_credentials* grpc_service_account_jwt_access_credentials_create(
gpr_free(clean_json);
}
GPR_ASSERT(reserved == nullptr);
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
return grpc_service_account_jwt_access_credentials_create_from_auth_json_key(
grpc_auth_json_key_create_from_string(json_key), token_lifetime)

@ -353,6 +353,7 @@ static verifier_cb_ctx* verifier_cb_ctx_create(
grpc_jwt_claims* claims, const char* audience, grpc_slice signature,
const char* signed_jwt, size_t signed_jwt_len, void* user_data,
grpc_jwt_verification_done_cb cb) {
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
verifier_cb_ctx* ctx =
static_cast<verifier_cb_ctx*>(gpr_zalloc(sizeof(verifier_cb_ctx)));

@ -114,6 +114,7 @@ static void plugin_md_request_metadata_ready(void* request,
grpc_status_code status,
const char* error_details) {
/* called from application code */
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx(GRPC_EXEC_CTX_FLAG_IS_FINISHED |
GRPC_EXEC_CTX_FLAG_THREAD_RESOURCE_LOOP);
grpc_plugin_credentials::pending_request* r =

@ -169,6 +169,7 @@ static void on_md_processing_done(
grpc_status_code status, const char* error_details) {
grpc_call_element* elem = static_cast<grpc_call_element*>(user_data);
call_data* calld = static_cast<call_data*>(elem->call_data);
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
// If the call was not cancelled while we were in flight, process the result.
if (gpr_atm_full_cas(&calld->state, static_cast<gpr_atm>(STATE_INIT),

Loading…
Cancel
Save