@ -20,6 +20,7 @@
# include <string.h>
# include <cstddef>
# include <string>
# include <utility>
@ -82,6 +83,10 @@ typedef struct {
grpc_metadata_array c2p_initial_metadata ;
grpc_metadata_array p2s_initial_metadata ;
grpc_core : : Mutex * initial_metadata_mu ;
bool p2s_initial_metadata_received ABSL_GUARDED_BY ( initial_metadata_mu ) ;
grpc_op * deferred_trailing_metadata_op ABSL_GUARDED_BY ( initial_metadata_mu ) ;
grpc_byte_buffer * c2p_msg ;
grpc_byte_buffer * p2s_msg ;
@ -166,6 +171,13 @@ static void unrefpc(proxy_call* pc, const char* /*reason*/) {
grpc_metadata_array_destroy ( & pc - > p2s_initial_metadata ) ;
grpc_metadata_array_destroy ( & pc - > p2s_trailing_metadata ) ;
grpc_slice_unref ( pc - > p2s_status_details ) ;
{
grpc_core : : MutexLock lock ( pc - > initial_metadata_mu ) ;
if ( pc - > deferred_trailing_metadata_op ! = nullptr ) {
gpr_free ( pc - > deferred_trailing_metadata_op ) ;
}
}
delete pc - > initial_metadata_mu ;
gpr_free ( pc ) ;
}
}
@ -179,25 +191,45 @@ static void on_c2p_sent_initial_metadata(void* arg, int /*success*/) {
unrefpc ( pc , " on_c2p_sent_initial_metadata " ) ;
}
static void on_c2p_sent_status ( void * arg , int /*success*/ ) {
proxy_call * pc = static_cast < proxy_call * > ( arg ) ;
unrefpc ( pc , " on_c2p_sent_status " ) ;
}
static void on_p2s_recv_initial_metadata ( void * arg , int /*success*/ ) {
proxy_call * pc = static_cast < proxy_call * > ( arg ) ;
grpc_op op ;
grpc_call_error err ;
memset ( & op , 0 , sizeof ( op ) ) ;
if ( ! pc - > proxy - > shutdown & & ! grpc_call_is_trailers_only ( pc - > p2s ) ) {
if ( ! pc - > proxy - > shutdown ) {
if ( ! grpc_call_is_trailers_only ( pc - > p2s ) ) {
op . op = GRPC_OP_SEND_INITIAL_METADATA ;
op . flags = 0 ;
op . reserved = nullptr ;
op . data . send_initial_metadata . count = pc - > p2s_initial_metadata . count ;
op . data . send_initial_metadata . metadata = pc - > p2s_initial_metadata . metadata ;
op . data . send_initial_metadata . metadata =
pc - > p2s_initial_metadata . metadata ;
refpc ( pc , " on_c2p_sent_initial_metadata " ) ;
err = grpc_call_start_batch ( pc - > c2p , & op , 1 ,
new_closure ( on_c2p_sent_initial_metadata , pc ) ,
nullptr ) ;
CHECK_EQ ( err , GRPC_CALL_OK ) ;
}
grpc_op * deferred_trailing_metadata_op = nullptr ;
{
grpc_core : : MutexLock lock ( pc - > initial_metadata_mu ) ;
// Start the batch without the mutex held, just in case.
// This will be nullptr if the trailing metadata has not yet been seen.
deferred_trailing_metadata_op = pc - > deferred_trailing_metadata_op ;
pc - > p2s_initial_metadata_received = true ;
}
if ( deferred_trailing_metadata_op ! = nullptr ) {
refpc ( pc , " on_c2p_sent_status " ) ;
err = grpc_call_start_batch ( pc - > c2p , deferred_trailing_metadata_op , 1 ,
new_closure ( on_c2p_sent_status , pc ) , nullptr ) ;
CHECK_EQ ( err , GRPC_CALL_OK ) ;
}
}
unrefpc ( pc , " on_p2s_recv_initial_metadata " ) ;
}
@ -308,11 +340,6 @@ static void on_p2s_recv_msg(void* arg, int success) {
unrefpc ( pc , " on_p2s_recv_msg " ) ;
}
static void on_c2p_sent_status ( void * arg , int /*success*/ ) {
proxy_call * pc = static_cast < proxy_call * > ( arg ) ;
unrefpc ( pc , " on_c2p_sent_status " ) ;
}
static void on_p2s_status ( void * arg , int success ) {
proxy_call * pc = static_cast < proxy_call * > ( arg ) ;
grpc_op op [ 2 ] ; // Possibly send empty initial metadata also if trailers-only
@ -340,11 +367,30 @@ static void on_p2s_status(void* arg, int success) {
op [ op_count ] . data . send_status_from_server . status_details =
& pc - > p2s_status_details ;
op_count + + ;
// TODO(ctiller): The current core implementation requires initial
// metadata batches to be started *after* initial metadata batches have
// been completed. The C++ Callback API does this accounting too, for
// example.
//
// This entire fixture will need a redesign when the batch API goes away.
bool op_deferred = false ;
{
grpc_core : : MutexLock lock ( pc - > initial_metadata_mu ) ;
if ( ! pc - > p2s_initial_metadata_received ) {
op_deferred = true ;
pc - > deferred_trailing_metadata_op =
static_cast < grpc_op * > ( gpr_malloc ( sizeof ( op ) ) ) ;
memcpy ( pc - > deferred_trailing_metadata_op , & op , sizeof ( op ) ) ;
}
}
if ( ! op_deferred ) {
refpc ( pc , " on_c2p_sent_status " ) ;
err = grpc_call_start_batch ( pc - > c2p , op , op_count ,
new_closure ( on_c2p_sent_status , pc ) , nullptr ) ;
CHECK_EQ ( err , GRPC_CALL_OK ) ;
}
}
unrefpc ( pc , " on_p2s_status " ) ;
}
@ -365,6 +411,12 @@ static void on_new_call(void* arg, int success) {
memset ( pc , 0 , sizeof ( * pc ) ) ;
pc - > proxy = proxy ;
std : : swap ( pc - > c2p_initial_metadata , proxy - > new_call_metadata ) ;
pc - > initial_metadata_mu = new grpc_core : : Mutex ( ) ;
{
grpc_core : : MutexLock lock ( pc - > initial_metadata_mu ) ;
pc - > p2s_initial_metadata_received = false ;
pc - > deferred_trailing_metadata_op = nullptr ;
}
pc - > c2p = proxy - > new_call ;
pc - > p2s = grpc_channel_create_call (
proxy - > client , pc - > c2p , GRPC_PROPAGATE_DEFAULTS , proxy - > cq ,
@ -374,6 +426,7 @@ static void on_new_call(void* arg, int success) {
op . reserved = nullptr ;
// Proxy: receive initial metadata from the server
op . op = GRPC_OP_RECV_INITIAL_METADATA ;
op . flags = 0 ;
op . data . recv_initial_metadata . recv_initial_metadata =
@ -384,6 +437,7 @@ static void on_new_call(void* arg, int success) {
nullptr ) ;
CHECK_EQ ( err , GRPC_CALL_OK ) ;
// Proxy: send initial metadata to the server
op . op = GRPC_OP_SEND_INITIAL_METADATA ;
op . flags = 0 ;
op . data . send_initial_metadata . count = pc - > c2p_initial_metadata . count ;
@ -394,6 +448,7 @@ static void on_new_call(void* arg, int success) {
nullptr ) ;
CHECK_EQ ( err , GRPC_CALL_OK ) ;
// Client: receive message from the proxy
op . op = GRPC_OP_RECV_MESSAGE ;
op . flags = 0 ;
op . data . recv_message . recv_message = & pc - > c2p_msg ;
@ -402,6 +457,7 @@ static void on_new_call(void* arg, int success) {
new_closure ( on_c2p_recv_msg , pc ) , nullptr ) ;
CHECK_EQ ( err , GRPC_CALL_OK ) ;
// Proxy: receive message from the server
op . op = GRPC_OP_RECV_MESSAGE ;
op . flags = 0 ;
op . data . recv_message . recv_message = & pc - > p2s_msg ;
@ -410,6 +466,7 @@ static void on_new_call(void* arg, int success) {
new_closure ( on_p2s_recv_msg , pc ) , nullptr ) ;
CHECK_EQ ( err , GRPC_CALL_OK ) ;
// Proxy: receive status from the server
op . op = GRPC_OP_RECV_STATUS_ON_CLIENT ;
op . flags = 0 ;
op . data . recv_status_on_client . trailing_metadata =
@ -421,6 +478,7 @@ static void on_new_call(void* arg, int success) {
nullptr ) ;
CHECK_EQ ( err , GRPC_CALL_OK ) ;
// Client: receive close-ack from the proxy
op . op = GRPC_OP_RECV_CLOSE_ON_SERVER ;
op . flags = 0 ;
op . data . recv_close_on_server . cancelled = & pc - > c2p_server_cancelled ;