|
|
|
@ -40,18 +40,68 @@ |
|
|
|
|
if (grpc_inproc_trace.enabled()) gpr_log(__VA_ARGS__); \
|
|
|
|
|
} while (0) |
|
|
|
|
|
|
|
|
|
static grpc_slice g_empty_slice; |
|
|
|
|
static grpc_slice g_fake_path_key; |
|
|
|
|
static grpc_slice g_fake_path_value; |
|
|
|
|
static grpc_slice g_fake_auth_key; |
|
|
|
|
static grpc_slice g_fake_auth_value; |
|
|
|
|
namespace { |
|
|
|
|
grpc_slice g_empty_slice; |
|
|
|
|
grpc_slice g_fake_path_key; |
|
|
|
|
grpc_slice g_fake_path_value; |
|
|
|
|
grpc_slice g_fake_auth_key; |
|
|
|
|
grpc_slice g_fake_auth_value; |
|
|
|
|
|
|
|
|
|
struct inproc_stream; |
|
|
|
|
bool cancel_stream_locked(inproc_stream* s, grpc_error* error); |
|
|
|
|
void op_state_machine(void* arg, grpc_error* error); |
|
|
|
|
void log_metadata(const grpc_metadata_batch* md_batch, bool is_client, |
|
|
|
|
bool is_initial); |
|
|
|
|
grpc_error* fill_in_metadata(inproc_stream* s, |
|
|
|
|
const grpc_metadata_batch* metadata, |
|
|
|
|
uint32_t flags, grpc_metadata_batch* out_md, |
|
|
|
|
uint32_t* outflags, bool* markfilled); |
|
|
|
|
|
|
|
|
|
struct shared_mu { |
|
|
|
|
shared_mu() { |
|
|
|
|
// Share one lock between both sides since both sides get affected
|
|
|
|
|
gpr_mu_init(&mu); |
|
|
|
|
gpr_ref_init(&refs, 2); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
typedef struct { |
|
|
|
|
gpr_mu mu; |
|
|
|
|
gpr_refcount refs; |
|
|
|
|
} shared_mu; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
struct inproc_transport { |
|
|
|
|
inproc_transport(const grpc_transport_vtable* vtable, shared_mu* mu, |
|
|
|
|
bool is_client) |
|
|
|
|
: mu(mu), is_client(is_client) { |
|
|
|
|
base.vtable = vtable; |
|
|
|
|
// Start each side of transport with 2 refs since they each have a ref
|
|
|
|
|
// to the other
|
|
|
|
|
gpr_ref_init(&refs, 2); |
|
|
|
|
grpc_connectivity_state_init(&connectivity, GRPC_CHANNEL_READY, |
|
|
|
|
is_client ? "inproc_client" : "inproc_server"); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
~inproc_transport() { |
|
|
|
|
grpc_connectivity_state_destroy(&connectivity); |
|
|
|
|
if (gpr_unref(&mu->refs)) { |
|
|
|
|
gpr_free(mu); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void ref() { |
|
|
|
|
INPROC_LOG(GPR_INFO, "ref_transport %p", this); |
|
|
|
|
gpr_ref(&refs); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void unref() { |
|
|
|
|
INPROC_LOG(GPR_INFO, "unref_transport %p", this); |
|
|
|
|
if (!gpr_unref(&refs)) { |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
INPROC_LOG(GPR_INFO, "really_destroy_transport %p", this); |
|
|
|
|
this->~inproc_transport(); |
|
|
|
|
gpr_free(this); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
typedef struct inproc_transport { |
|
|
|
|
grpc_transport base; |
|
|
|
|
shared_mu* mu; |
|
|
|
|
gpr_refcount refs; |
|
|
|
@ -60,128 +110,174 @@ typedef struct inproc_transport { |
|
|
|
|
void (*accept_stream_cb)(void* user_data, grpc_transport* transport, |
|
|
|
|
const void* server_data); |
|
|
|
|
void* accept_stream_data; |
|
|
|
|
bool is_closed; |
|
|
|
|
bool is_closed = false; |
|
|
|
|
struct inproc_transport* other_side; |
|
|
|
|
struct inproc_stream* stream_list; |
|
|
|
|
} inproc_transport; |
|
|
|
|
struct inproc_stream* stream_list = nullptr; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
struct inproc_stream { |
|
|
|
|
inproc_stream(inproc_transport* t, grpc_stream_refcount* refcount, |
|
|
|
|
const void* server_data, gpr_arena* arena) |
|
|
|
|
: t(t), refs(refcount), arena(arena) { |
|
|
|
|
// Ref this stream right now for ctor and list.
|
|
|
|
|
ref("inproc_init_stream:init"); |
|
|
|
|
ref("inproc_init_stream:list"); |
|
|
|
|
|
|
|
|
|
grpc_metadata_batch_init(&to_read_initial_md); |
|
|
|
|
grpc_metadata_batch_init(&to_read_trailing_md); |
|
|
|
|
GRPC_CLOSURE_INIT(&op_closure, op_state_machine, this, |
|
|
|
|
grpc_schedule_on_exec_ctx); |
|
|
|
|
grpc_metadata_batch_init(&write_buffer_initial_md); |
|
|
|
|
grpc_metadata_batch_init(&write_buffer_trailing_md); |
|
|
|
|
|
|
|
|
|
stream_list_prev = nullptr; |
|
|
|
|
gpr_mu_lock(&t->mu->mu); |
|
|
|
|
stream_list_next = t->stream_list; |
|
|
|
|
if (t->stream_list) { |
|
|
|
|
t->stream_list->stream_list_prev = this; |
|
|
|
|
} |
|
|
|
|
t->stream_list = this; |
|
|
|
|
gpr_mu_unlock(&t->mu->mu); |
|
|
|
|
|
|
|
|
|
if (!server_data) { |
|
|
|
|
t->ref(); |
|
|
|
|
inproc_transport* st = t->other_side; |
|
|
|
|
st->ref(); |
|
|
|
|
other_side = nullptr; // will get filled in soon
|
|
|
|
|
// Pass the client-side stream address to the server-side for a ref
|
|
|
|
|
ref("inproc_init_stream:clt"); // ref it now on behalf of server
|
|
|
|
|
// side to avoid destruction
|
|
|
|
|
INPROC_LOG(GPR_INFO, "calling accept stream cb %p %p", |
|
|
|
|
st->accept_stream_cb, st->accept_stream_data); |
|
|
|
|
(*st->accept_stream_cb)(st->accept_stream_data, &st->base, (void*)this); |
|
|
|
|
} else { |
|
|
|
|
// This is the server-side and is being called through accept_stream_cb
|
|
|
|
|
inproc_stream* cs = (inproc_stream*)server_data; |
|
|
|
|
other_side = cs; |
|
|
|
|
// Ref the server-side stream on behalf of the client now
|
|
|
|
|
ref("inproc_init_stream:srv"); |
|
|
|
|
|
|
|
|
|
// Now we are about to affect the other side, so lock the transport
|
|
|
|
|
// to make sure that it doesn't get destroyed
|
|
|
|
|
gpr_mu_lock(&t->mu->mu); |
|
|
|
|
cs->other_side = this; |
|
|
|
|
// Now transfer from the other side's write_buffer if any to the to_read
|
|
|
|
|
// buffer
|
|
|
|
|
if (cs->write_buffer_initial_md_filled) { |
|
|
|
|
fill_in_metadata(this, &cs->write_buffer_initial_md, |
|
|
|
|
cs->write_buffer_initial_md_flags, &to_read_initial_md, |
|
|
|
|
&to_read_initial_md_flags, &to_read_initial_md_filled); |
|
|
|
|
deadline = GPR_MIN(deadline, cs->write_buffer_deadline); |
|
|
|
|
grpc_metadata_batch_clear(&cs->write_buffer_initial_md); |
|
|
|
|
cs->write_buffer_initial_md_filled = false; |
|
|
|
|
} |
|
|
|
|
if (cs->write_buffer_trailing_md_filled) { |
|
|
|
|
fill_in_metadata(this, &cs->write_buffer_trailing_md, 0, |
|
|
|
|
&to_read_trailing_md, nullptr, |
|
|
|
|
&to_read_trailing_md_filled); |
|
|
|
|
grpc_metadata_batch_clear(&cs->write_buffer_trailing_md); |
|
|
|
|
cs->write_buffer_trailing_md_filled = false; |
|
|
|
|
} |
|
|
|
|
if (cs->write_buffer_cancel_error != GRPC_ERROR_NONE) { |
|
|
|
|
cancel_other_error = cs->write_buffer_cancel_error; |
|
|
|
|
cs->write_buffer_cancel_error = GRPC_ERROR_NONE; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
gpr_mu_unlock(&t->mu->mu); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
~inproc_stream() { |
|
|
|
|
GRPC_ERROR_UNREF(write_buffer_cancel_error); |
|
|
|
|
GRPC_ERROR_UNREF(cancel_self_error); |
|
|
|
|
GRPC_ERROR_UNREF(cancel_other_error); |
|
|
|
|
|
|
|
|
|
if (recv_inited) { |
|
|
|
|
grpc_slice_buffer_destroy_internal(&recv_message); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
t->unref(); |
|
|
|
|
|
|
|
|
|
if (closure_at_destroy) { |
|
|
|
|
GRPC_CLOSURE_SCHED(closure_at_destroy, GRPC_ERROR_NONE); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
#ifndef NDEBUG |
|
|
|
|
#define STREAM_REF(refs, reason) grpc_stream_ref(refs, reason) |
|
|
|
|
#define STREAM_UNREF(refs, reason) grpc_stream_unref(refs, reason) |
|
|
|
|
#else |
|
|
|
|
#define STREAM_REF(refs, reason) grpc_stream_ref(refs) |
|
|
|
|
#define STREAM_UNREF(refs, reason) grpc_stream_unref(refs) |
|
|
|
|
#endif |
|
|
|
|
void ref(const char* reason) { |
|
|
|
|
INPROC_LOG(GPR_INFO, "ref_stream %p %s", this, reason); |
|
|
|
|
STREAM_REF(refs, reason); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void unref(const char* reason) { |
|
|
|
|
INPROC_LOG(GPR_INFO, "unref_stream %p %s", this, reason); |
|
|
|
|
STREAM_UNREF(refs, reason); |
|
|
|
|
} |
|
|
|
|
#undef STREAM_REF |
|
|
|
|
#undef STREAM_UNREF |
|
|
|
|
|
|
|
|
|
typedef struct inproc_stream { |
|
|
|
|
inproc_transport* t; |
|
|
|
|
grpc_metadata_batch to_read_initial_md; |
|
|
|
|
uint32_t to_read_initial_md_flags; |
|
|
|
|
bool to_read_initial_md_filled; |
|
|
|
|
uint32_t to_read_initial_md_flags = 0; |
|
|
|
|
bool to_read_initial_md_filled = false; |
|
|
|
|
grpc_metadata_batch to_read_trailing_md; |
|
|
|
|
bool to_read_trailing_md_filled; |
|
|
|
|
bool ops_needed; |
|
|
|
|
bool op_closure_scheduled; |
|
|
|
|
bool to_read_trailing_md_filled = false; |
|
|
|
|
bool ops_needed = false; |
|
|
|
|
bool op_closure_scheduled = false; |
|
|
|
|
grpc_closure op_closure; |
|
|
|
|
// Write buffer used only during gap at init time when client-side
|
|
|
|
|
// stream is set up but server side stream is not yet set up
|
|
|
|
|
grpc_metadata_batch write_buffer_initial_md; |
|
|
|
|
bool write_buffer_initial_md_filled; |
|
|
|
|
uint32_t write_buffer_initial_md_flags; |
|
|
|
|
grpc_millis write_buffer_deadline; |
|
|
|
|
bool write_buffer_initial_md_filled = false; |
|
|
|
|
uint32_t write_buffer_initial_md_flags = 0; |
|
|
|
|
grpc_millis write_buffer_deadline = GRPC_MILLIS_INF_FUTURE; |
|
|
|
|
grpc_metadata_batch write_buffer_trailing_md; |
|
|
|
|
bool write_buffer_trailing_md_filled; |
|
|
|
|
grpc_error* write_buffer_cancel_error; |
|
|
|
|
bool write_buffer_trailing_md_filled = false; |
|
|
|
|
grpc_error* write_buffer_cancel_error = GRPC_ERROR_NONE; |
|
|
|
|
|
|
|
|
|
struct inproc_stream* other_side; |
|
|
|
|
bool other_side_closed; // won't talk anymore
|
|
|
|
|
bool write_buffer_other_side_closed; // on hold
|
|
|
|
|
bool other_side_closed = false; // won't talk anymore
|
|
|
|
|
bool write_buffer_other_side_closed = false; // on hold
|
|
|
|
|
grpc_stream_refcount* refs; |
|
|
|
|
grpc_closure* closure_at_destroy; |
|
|
|
|
grpc_closure* closure_at_destroy = nullptr; |
|
|
|
|
|
|
|
|
|
gpr_arena* arena; |
|
|
|
|
|
|
|
|
|
grpc_transport_stream_op_batch* send_message_op; |
|
|
|
|
grpc_transport_stream_op_batch* send_trailing_md_op; |
|
|
|
|
grpc_transport_stream_op_batch* recv_initial_md_op; |
|
|
|
|
grpc_transport_stream_op_batch* recv_message_op; |
|
|
|
|
grpc_transport_stream_op_batch* recv_trailing_md_op; |
|
|
|
|
grpc_transport_stream_op_batch* send_message_op = nullptr; |
|
|
|
|
grpc_transport_stream_op_batch* send_trailing_md_op = nullptr; |
|
|
|
|
grpc_transport_stream_op_batch* recv_initial_md_op = nullptr; |
|
|
|
|
grpc_transport_stream_op_batch* recv_message_op = nullptr; |
|
|
|
|
grpc_transport_stream_op_batch* recv_trailing_md_op = nullptr; |
|
|
|
|
|
|
|
|
|
grpc_slice_buffer recv_message; |
|
|
|
|
grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream> recv_stream; |
|
|
|
|
bool recv_inited; |
|
|
|
|
bool recv_inited = false; |
|
|
|
|
|
|
|
|
|
bool initial_md_sent; |
|
|
|
|
bool trailing_md_sent; |
|
|
|
|
bool initial_md_recvd; |
|
|
|
|
bool trailing_md_recvd; |
|
|
|
|
bool initial_md_sent = false; |
|
|
|
|
bool trailing_md_sent = false; |
|
|
|
|
bool initial_md_recvd = false; |
|
|
|
|
bool trailing_md_recvd = false; |
|
|
|
|
|
|
|
|
|
bool closed; |
|
|
|
|
bool closed = false; |
|
|
|
|
|
|
|
|
|
grpc_error* cancel_self_error; |
|
|
|
|
grpc_error* cancel_other_error; |
|
|
|
|
grpc_error* cancel_self_error = GRPC_ERROR_NONE; |
|
|
|
|
grpc_error* cancel_other_error = GRPC_ERROR_NONE; |
|
|
|
|
|
|
|
|
|
grpc_millis deadline; |
|
|
|
|
grpc_millis deadline = GRPC_MILLIS_INF_FUTURE; |
|
|
|
|
|
|
|
|
|
bool listed; |
|
|
|
|
bool listed = true; |
|
|
|
|
struct inproc_stream* stream_list_prev; |
|
|
|
|
struct inproc_stream* stream_list_next; |
|
|
|
|
} inproc_stream; |
|
|
|
|
|
|
|
|
|
static bool cancel_stream_locked(inproc_stream* s, grpc_error* error); |
|
|
|
|
static void op_state_machine(void* arg, grpc_error* error); |
|
|
|
|
|
|
|
|
|
static void ref_transport(inproc_transport* t) { |
|
|
|
|
INPROC_LOG(GPR_INFO, "ref_transport %p", t); |
|
|
|
|
gpr_ref(&t->refs); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Final teardown of one transport side, called once its refcount hits zero.
// Destroys the connectivity tracker, drops this side's reference on the
// shared mutex (freeing it when the other side has also released it), then
// frees the transport itself.
static void really_destroy_transport(inproc_transport* t) {
  INPROC_LOG(GPR_INFO, "really_destroy_transport %p", t);
  grpc_connectivity_state_destroy(&t->connectivity);
  // The mutex is shared between client and server sides; free it only when
  // the last reference is released here.
  if (gpr_unref(&t->mu->refs)) {
    gpr_free(t->mu);
  }
  gpr_free(t);
}
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
static void unref_transport(inproc_transport* t) { |
|
|
|
|
INPROC_LOG(GPR_INFO, "unref_transport %p", t); |
|
|
|
|
if (gpr_unref(&t->refs)) { |
|
|
|
|
really_destroy_transport(t); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
#ifndef NDEBUG |
|
|
|
|
#define STREAM_REF(refs, reason) grpc_stream_ref(refs, reason) |
|
|
|
|
#define STREAM_UNREF(refs, reason) grpc_stream_unref(refs, reason) |
|
|
|
|
#else |
|
|
|
|
#define STREAM_REF(refs, reason) grpc_stream_ref(refs) |
|
|
|
|
#define STREAM_UNREF(refs, reason) grpc_stream_unref(refs) |
|
|
|
|
#endif |
|
|
|
|
|
|
|
|
|
static void ref_stream(inproc_stream* s, const char* reason) { |
|
|
|
|
INPROC_LOG(GPR_INFO, "ref_stream %p %s", s, reason); |
|
|
|
|
STREAM_REF(s->refs, reason); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void unref_stream(inproc_stream* s, const char* reason) { |
|
|
|
|
INPROC_LOG(GPR_INFO, "unref_stream %p %s", s, reason); |
|
|
|
|
STREAM_UNREF(s->refs, reason); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Final teardown of a stream, run when its last reference is released.
// Releases held errors and the receive buffer, drops the stream's reference
// on its transport, and fires the closure (if any) that destroy_stream
// stashed so the caller learns destruction is complete.
static void really_destroy_stream(inproc_stream* s) {
  INPROC_LOG(GPR_INFO, "really_destroy_stream %p", s);

  GRPC_ERROR_UNREF(s->write_buffer_cancel_error);
  GRPC_ERROR_UNREF(s->cancel_self_error);
  GRPC_ERROR_UNREF(s->cancel_other_error);

  // recv_message is only initialized lazily; destroy it only if it was set up.
  if (s->recv_inited) {
    grpc_slice_buffer_destroy_internal(&s->recv_message);
  }

  unref_transport(s->t);

  // Notify the owner (set by destroy_stream) that teardown has finished.
  if (s->closure_at_destroy) {
    GRPC_CLOSURE_SCHED(s->closure_at_destroy, GRPC_ERROR_NONE);
  }
}
|
|
|
|
|
|
|
|
|
static void log_metadata(const grpc_metadata_batch* md_batch, bool is_client, |
|
|
|
|
bool is_initial) { |
|
|
|
|
void log_metadata(const grpc_metadata_batch* md_batch, bool is_client, |
|
|
|
|
bool is_initial) { |
|
|
|
|
for (grpc_linked_mdelem* md = md_batch->list.head; md != nullptr; |
|
|
|
|
md = md->next) { |
|
|
|
|
char* key = grpc_slice_to_c_string(GRPC_MDKEY(md->md)); |
|
|
|
@ -193,10 +289,10 @@ static void log_metadata(const grpc_metadata_batch* md_batch, bool is_client, |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static grpc_error* fill_in_metadata(inproc_stream* s, |
|
|
|
|
const grpc_metadata_batch* metadata, |
|
|
|
|
uint32_t flags, grpc_metadata_batch* out_md, |
|
|
|
|
uint32_t* outflags, bool* markfilled) { |
|
|
|
|
grpc_error* fill_in_metadata(inproc_stream* s, |
|
|
|
|
const grpc_metadata_batch* metadata, |
|
|
|
|
uint32_t flags, grpc_metadata_batch* out_md, |
|
|
|
|
uint32_t* outflags, bool* markfilled) { |
|
|
|
|
if (grpc_inproc_trace.enabled()) { |
|
|
|
|
log_metadata(metadata, s->t->is_client, outflags != nullptr); |
|
|
|
|
} |
|
|
|
@ -221,109 +317,16 @@ static grpc_error* fill_in_metadata(inproc_stream* s, |
|
|
|
|
return error; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static int init_stream(grpc_transport* gt, grpc_stream* gs, |
|
|
|
|
grpc_stream_refcount* refcount, const void* server_data, |
|
|
|
|
gpr_arena* arena) { |
|
|
|
|
int init_stream(grpc_transport* gt, grpc_stream* gs, |
|
|
|
|
grpc_stream_refcount* refcount, const void* server_data, |
|
|
|
|
gpr_arena* arena) { |
|
|
|
|
INPROC_LOG(GPR_INFO, "init_stream %p %p %p", gt, gs, server_data); |
|
|
|
|
inproc_transport* t = reinterpret_cast<inproc_transport*>(gt); |
|
|
|
|
inproc_stream* s = reinterpret_cast<inproc_stream*>(gs); |
|
|
|
|
s->arena = arena; |
|
|
|
|
|
|
|
|
|
s->refs = refcount; |
|
|
|
|
// Ref this stream right now
|
|
|
|
|
ref_stream(s, "inproc_init_stream:init"); |
|
|
|
|
|
|
|
|
|
grpc_metadata_batch_init(&s->to_read_initial_md); |
|
|
|
|
s->to_read_initial_md_flags = 0; |
|
|
|
|
s->to_read_initial_md_filled = false; |
|
|
|
|
grpc_metadata_batch_init(&s->to_read_trailing_md); |
|
|
|
|
s->to_read_trailing_md_filled = false; |
|
|
|
|
grpc_metadata_batch_init(&s->write_buffer_initial_md); |
|
|
|
|
s->write_buffer_initial_md_flags = 0; |
|
|
|
|
s->write_buffer_initial_md_filled = false; |
|
|
|
|
grpc_metadata_batch_init(&s->write_buffer_trailing_md); |
|
|
|
|
s->write_buffer_trailing_md_filled = false; |
|
|
|
|
s->ops_needed = false; |
|
|
|
|
s->op_closure_scheduled = false; |
|
|
|
|
GRPC_CLOSURE_INIT(&s->op_closure, op_state_machine, s, |
|
|
|
|
grpc_schedule_on_exec_ctx); |
|
|
|
|
s->t = t; |
|
|
|
|
s->closure_at_destroy = nullptr; |
|
|
|
|
s->other_side_closed = false; |
|
|
|
|
|
|
|
|
|
s->initial_md_sent = s->trailing_md_sent = s->initial_md_recvd = |
|
|
|
|
s->trailing_md_recvd = false; |
|
|
|
|
|
|
|
|
|
s->closed = false; |
|
|
|
|
|
|
|
|
|
s->cancel_self_error = GRPC_ERROR_NONE; |
|
|
|
|
s->cancel_other_error = GRPC_ERROR_NONE; |
|
|
|
|
s->write_buffer_cancel_error = GRPC_ERROR_NONE; |
|
|
|
|
s->deadline = GRPC_MILLIS_INF_FUTURE; |
|
|
|
|
s->write_buffer_deadline = GRPC_MILLIS_INF_FUTURE; |
|
|
|
|
|
|
|
|
|
s->stream_list_prev = nullptr; |
|
|
|
|
gpr_mu_lock(&t->mu->mu); |
|
|
|
|
s->listed = true; |
|
|
|
|
ref_stream(s, "inproc_init_stream:list"); |
|
|
|
|
s->stream_list_next = t->stream_list; |
|
|
|
|
if (t->stream_list) { |
|
|
|
|
t->stream_list->stream_list_prev = s; |
|
|
|
|
} |
|
|
|
|
t->stream_list = s; |
|
|
|
|
gpr_mu_unlock(&t->mu->mu); |
|
|
|
|
|
|
|
|
|
if (!server_data) { |
|
|
|
|
ref_transport(t); |
|
|
|
|
inproc_transport* st = t->other_side; |
|
|
|
|
ref_transport(st); |
|
|
|
|
s->other_side = nullptr; // will get filled in soon
|
|
|
|
|
// Pass the client-side stream address to the server-side for a ref
|
|
|
|
|
ref_stream(s, "inproc_init_stream:clt"); // ref it now on behalf of server
|
|
|
|
|
// side to avoid destruction
|
|
|
|
|
INPROC_LOG(GPR_INFO, "calling accept stream cb %p %p", st->accept_stream_cb, |
|
|
|
|
st->accept_stream_data); |
|
|
|
|
(*st->accept_stream_cb)(st->accept_stream_data, &st->base, (void*)s); |
|
|
|
|
} else { |
|
|
|
|
// This is the server-side and is being called through accept_stream_cb
|
|
|
|
|
inproc_stream* cs = (inproc_stream*)server_data; |
|
|
|
|
s->other_side = cs; |
|
|
|
|
// Ref the server-side stream on behalf of the client now
|
|
|
|
|
ref_stream(s, "inproc_init_stream:srv"); |
|
|
|
|
|
|
|
|
|
// Now we are about to affect the other side, so lock the transport
|
|
|
|
|
// to make sure that it doesn't get destroyed
|
|
|
|
|
gpr_mu_lock(&s->t->mu->mu); |
|
|
|
|
cs->other_side = s; |
|
|
|
|
// Now transfer from the other side's write_buffer if any to the to_read
|
|
|
|
|
// buffer
|
|
|
|
|
if (cs->write_buffer_initial_md_filled) { |
|
|
|
|
fill_in_metadata(s, &cs->write_buffer_initial_md, |
|
|
|
|
cs->write_buffer_initial_md_flags, |
|
|
|
|
&s->to_read_initial_md, &s->to_read_initial_md_flags, |
|
|
|
|
&s->to_read_initial_md_filled); |
|
|
|
|
s->deadline = GPR_MIN(s->deadline, cs->write_buffer_deadline); |
|
|
|
|
grpc_metadata_batch_clear(&cs->write_buffer_initial_md); |
|
|
|
|
cs->write_buffer_initial_md_filled = false; |
|
|
|
|
} |
|
|
|
|
if (cs->write_buffer_trailing_md_filled) { |
|
|
|
|
fill_in_metadata(s, &cs->write_buffer_trailing_md, 0, |
|
|
|
|
&s->to_read_trailing_md, nullptr, |
|
|
|
|
&s->to_read_trailing_md_filled); |
|
|
|
|
grpc_metadata_batch_clear(&cs->write_buffer_trailing_md); |
|
|
|
|
cs->write_buffer_trailing_md_filled = false; |
|
|
|
|
} |
|
|
|
|
if (cs->write_buffer_cancel_error != GRPC_ERROR_NONE) { |
|
|
|
|
s->cancel_other_error = cs->write_buffer_cancel_error; |
|
|
|
|
cs->write_buffer_cancel_error = GRPC_ERROR_NONE; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
gpr_mu_unlock(&s->t->mu->mu); |
|
|
|
|
} |
|
|
|
|
new (gs) inproc_stream(t, refcount, server_data, arena); |
|
|
|
|
return 0; // return value is not important
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void close_stream_locked(inproc_stream* s) { |
|
|
|
|
void close_stream_locked(inproc_stream* s) { |
|
|
|
|
if (!s->closed) { |
|
|
|
|
// Release the metadata that we would have written out
|
|
|
|
|
grpc_metadata_batch_destroy(&s->write_buffer_initial_md); |
|
|
|
@ -341,21 +344,21 @@ static void close_stream_locked(inproc_stream* s) { |
|
|
|
|
n->stream_list_prev = p; |
|
|
|
|
} |
|
|
|
|
s->listed = false; |
|
|
|
|
unref_stream(s, "close_stream:list"); |
|
|
|
|
s->unref("close_stream:list"); |
|
|
|
|
} |
|
|
|
|
s->closed = true; |
|
|
|
|
unref_stream(s, "close_stream:closing"); |
|
|
|
|
s->unref("close_stream:closing"); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// This function means that we are done talking/listening to the other side
|
|
|
|
|
static void close_other_side_locked(inproc_stream* s, const char* reason) { |
|
|
|
|
void close_other_side_locked(inproc_stream* s, const char* reason) { |
|
|
|
|
if (s->other_side != nullptr) { |
|
|
|
|
// First release the metadata that came from the other side's arena
|
|
|
|
|
grpc_metadata_batch_destroy(&s->to_read_initial_md); |
|
|
|
|
grpc_metadata_batch_destroy(&s->to_read_trailing_md); |
|
|
|
|
|
|
|
|
|
unref_stream(s->other_side, reason); |
|
|
|
|
s->other_side->unref(reason); |
|
|
|
|
s->other_side_closed = true; |
|
|
|
|
s->other_side = nullptr; |
|
|
|
|
} else if (!s->other_side_closed) { |
|
|
|
@ -367,9 +370,9 @@ static void close_other_side_locked(inproc_stream* s, const char* reason) { |
|
|
|
|
// this stream_op_batch is only one of the pending operations for this
|
|
|
|
|
// stream. This is called when one of the pending operations for the stream
|
|
|
|
|
// is done and about to be NULLed out
|
|
|
|
|
static void complete_if_batch_end_locked(inproc_stream* s, grpc_error* error, |
|
|
|
|
grpc_transport_stream_op_batch* op, |
|
|
|
|
const char* msg) { |
|
|
|
|
void complete_if_batch_end_locked(inproc_stream* s, grpc_error* error, |
|
|
|
|
grpc_transport_stream_op_batch* op, |
|
|
|
|
const char* msg) { |
|
|
|
|
int is_sm = static_cast<int>(op == s->send_message_op); |
|
|
|
|
int is_stm = static_cast<int>(op == s->send_trailing_md_op); |
|
|
|
|
// TODO(vjpai): We should not consider the recv ops here, since they
|
|
|
|
@ -386,8 +389,7 @@ static void complete_if_batch_end_locked(inproc_stream* s, grpc_error* error, |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void maybe_schedule_op_closure_locked(inproc_stream* s, |
|
|
|
|
grpc_error* error) { |
|
|
|
|
void maybe_schedule_op_closure_locked(inproc_stream* s, grpc_error* error) { |
|
|
|
|
if (s && s->ops_needed && !s->op_closure_scheduled) { |
|
|
|
|
GRPC_CLOSURE_SCHED(&s->op_closure, GRPC_ERROR_REF(error)); |
|
|
|
|
s->op_closure_scheduled = true; |
|
|
|
@ -395,7 +397,7 @@ static void maybe_schedule_op_closure_locked(inproc_stream* s, |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void fail_helper_locked(inproc_stream* s, grpc_error* error) { |
|
|
|
|
void fail_helper_locked(inproc_stream* s, grpc_error* error) { |
|
|
|
|
INPROC_LOG(GPR_INFO, "op_state_machine %p fail_helper", s); |
|
|
|
|
// If we're failing this side, we need to make sure that
|
|
|
|
|
// we also send or have already sent trailing metadata
|
|
|
|
@ -525,8 +527,7 @@ static void fail_helper_locked(inproc_stream* s, grpc_error* error) { |
|
|
|
|
// that the incoming byte stream's next() call will always return
|
|
|
|
|
// synchronously. That assumption is true today but may not always be
|
|
|
|
|
// true in the future.
|
|
|
|
|
static void message_transfer_locked(inproc_stream* sender, |
|
|
|
|
inproc_stream* receiver) { |
|
|
|
|
void message_transfer_locked(inproc_stream* sender, inproc_stream* receiver) { |
|
|
|
|
size_t remaining = |
|
|
|
|
sender->send_message_op->payload->send_message.send_message->length(); |
|
|
|
|
if (receiver->recv_inited) { |
|
|
|
@ -572,7 +573,7 @@ static void message_transfer_locked(inproc_stream* sender, |
|
|
|
|
sender->send_message_op = nullptr; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void op_state_machine(void* arg, grpc_error* error) { |
|
|
|
|
void op_state_machine(void* arg, grpc_error* error) { |
|
|
|
|
// This function gets called when we have contents in the unprocessed reads
|
|
|
|
|
// Get what we want based on our ops wanted
|
|
|
|
|
// Schedule our appropriate closures
|
|
|
|
@ -847,7 +848,7 @@ done: |
|
|
|
|
GRPC_ERROR_UNREF(new_err); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static bool cancel_stream_locked(inproc_stream* s, grpc_error* error) { |
|
|
|
|
bool cancel_stream_locked(inproc_stream* s, grpc_error* error) { |
|
|
|
|
bool ret = false; // was the cancel accepted
|
|
|
|
|
INPROC_LOG(GPR_INFO, "cancel_stream %p with %s", s, grpc_error_string(error)); |
|
|
|
|
if (s->cancel_self_error == GRPC_ERROR_NONE) { |
|
|
|
@ -900,10 +901,10 @@ static bool cancel_stream_locked(inproc_stream* s, grpc_error* error) { |
|
|
|
|
return ret; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Intentionally empty closure callback.
static void do_nothing(void* arg, grpc_error* error) {}
|
|
|
|
void do_nothing(void* arg, grpc_error* error) {} |
|
|
|
|
|
|
|
|
|
static void perform_stream_op(grpc_transport* gt, grpc_stream* gs, |
|
|
|
|
grpc_transport_stream_op_batch* op) { |
|
|
|
|
void perform_stream_op(grpc_transport* gt, grpc_stream* gs, |
|
|
|
|
grpc_transport_stream_op_batch* op) { |
|
|
|
|
INPROC_LOG(GPR_INFO, "perform_stream_op %p %p %p", gt, gs, op); |
|
|
|
|
inproc_stream* s = reinterpret_cast<inproc_stream*>(gs); |
|
|
|
|
gpr_mu* mu = &s->t->mu->mu; // save aside in case s gets closed
|
|
|
|
@ -1083,7 +1084,7 @@ static void perform_stream_op(grpc_transport* gt, grpc_stream* gs, |
|
|
|
|
GRPC_ERROR_UNREF(error); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void close_transport_locked(inproc_transport* t) { |
|
|
|
|
void close_transport_locked(inproc_transport* t) { |
|
|
|
|
INPROC_LOG(GPR_INFO, "close_transport %p %d", t, t->is_closed); |
|
|
|
|
grpc_connectivity_state_set( |
|
|
|
|
&t->connectivity, GRPC_CHANNEL_SHUTDOWN, |
|
|
|
@ -1103,7 +1104,7 @@ static void close_transport_locked(inproc_transport* t) { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) { |
|
|
|
|
void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) { |
|
|
|
|
inproc_transport* t = reinterpret_cast<inproc_transport*>(gt); |
|
|
|
|
INPROC_LOG(GPR_INFO, "perform_transport_op %p %p", t, op); |
|
|
|
|
gpr_mu_lock(&t->mu->mu); |
|
|
|
@ -1136,39 +1137,64 @@ static void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) { |
|
|
|
|
gpr_mu_unlock(&t->mu->mu); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void destroy_stream(grpc_transport* gt, grpc_stream* gs, |
|
|
|
|
grpc_closure* then_schedule_closure) { |
|
|
|
|
void destroy_stream(grpc_transport* gt, grpc_stream* gs, |
|
|
|
|
grpc_closure* then_schedule_closure) { |
|
|
|
|
INPROC_LOG(GPR_INFO, "destroy_stream %p %p", gs, then_schedule_closure); |
|
|
|
|
inproc_stream* s = reinterpret_cast<inproc_stream*>(gs); |
|
|
|
|
s->closure_at_destroy = then_schedule_closure; |
|
|
|
|
really_destroy_stream(s); |
|
|
|
|
s->~inproc_stream(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void destroy_transport(grpc_transport* gt) { |
|
|
|
|
void destroy_transport(grpc_transport* gt) { |
|
|
|
|
inproc_transport* t = reinterpret_cast<inproc_transport*>(gt); |
|
|
|
|
INPROC_LOG(GPR_INFO, "destroy_transport %p", t); |
|
|
|
|
gpr_mu_lock(&t->mu->mu); |
|
|
|
|
close_transport_locked(t); |
|
|
|
|
gpr_mu_unlock(&t->mu->mu); |
|
|
|
|
unref_transport(t->other_side); |
|
|
|
|
unref_transport(t); |
|
|
|
|
t->other_side->unref(); |
|
|
|
|
t->unref(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/*******************************************************************************
|
|
|
|
|
* INTEGRATION GLUE |
|
|
|
|
*/ |
|
|
|
|
|
|
|
|
|
static void set_pollset(grpc_transport* gt, grpc_stream* gs, |
|
|
|
|
grpc_pollset* pollset) { |
|
|
|
|
void set_pollset(grpc_transport* gt, grpc_stream* gs, grpc_pollset* pollset) { |
|
|
|
|
// Nothing to do here
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void set_pollset_set(grpc_transport* gt, grpc_stream* gs, |
|
|
|
|
grpc_pollset_set* pollset_set) { |
|
|
|
|
void set_pollset_set(grpc_transport* gt, grpc_stream* gs, |
|
|
|
|
grpc_pollset_set* pollset_set) { |
|
|
|
|
// Nothing to do here
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// The in-process transport has no underlying endpoint; always returns nullptr.
static grpc_endpoint* get_endpoint(grpc_transport* t) { return nullptr; }
|
|
|
|
grpc_endpoint* get_endpoint(grpc_transport* t) { return nullptr; } |
|
|
|
|
|
|
|
|
|
// Virtual-function table wiring the in-process transport callbacks into the
// generic grpc_transport interface.
const grpc_transport_vtable inproc_vtable = {
    sizeof(inproc_stream), "inproc", init_stream,
    set_pollset, set_pollset_set, perform_stream_op,
    perform_transport_op, destroy_stream, destroy_transport,
    get_endpoint};
|
|
|
|
|
|
|
|
|
/*******************************************************************************
|
|
|
|
|
* Main inproc transport functions |
|
|
|
|
*/ |
|
|
|
|
// Creates the paired server-side and client-side in-process transports.
// Both sides share one refcounted mutex (shared_mu, refcount starts at 2)
// and each holds a pointer to the other so operations can cross directly.
// NOTE(review): server_args/client_args are accepted but not read here.
void inproc_transports_create(grpc_transport** server_transport,
                              const grpc_channel_args* server_args,
                              grpc_transport** client_transport,
                              const grpc_channel_args* client_args) {
  INPROC_LOG(GPR_INFO, "inproc_transports_create");
  // Placement-new into gpr_malloc'd storage; matching teardown must call the
  // destructor explicitly and then gpr_free.
  shared_mu* mu = new (gpr_malloc(sizeof(*mu))) shared_mu();
  inproc_transport* st = new (gpr_malloc(sizeof(*st)))
      inproc_transport(&inproc_vtable, mu, /*is_client=*/false);
  inproc_transport* ct = new (gpr_malloc(sizeof(*ct)))
      inproc_transport(&inproc_vtable, mu, /*is_client=*/true);
  // Link the two sides before publishing them to the caller.
  st->other_side = ct;
  ct->other_side = st;
  *server_transport = reinterpret_cast<grpc_transport*>(st);
  *client_transport = reinterpret_cast<grpc_transport*>(ct);
}
|
|
|
|
} // namespace
|
|
|
|
|
|
|
|
|
|
/*******************************************************************************
|
|
|
|
|
* GLOBAL INIT AND DESTROY |
|
|
|
@ -1190,48 +1216,6 @@ void grpc_inproc_transport_init(void) { |
|
|
|
|
g_fake_auth_value = grpc_slice_from_static_string("inproc-fail"); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Virtual-function table wiring the in-process transport callbacks into the
// generic grpc_transport interface.
static const grpc_transport_vtable inproc_vtable = {
    sizeof(inproc_stream), "inproc", init_stream,
    set_pollset, set_pollset_set, perform_stream_op,
    perform_transport_op, destroy_stream, destroy_transport,
    get_endpoint};
|
|
|
|
|
|
|
|
|
/*******************************************************************************
|
|
|
|
|
* Main inproc transport functions |
|
|
|
|
*/ |
|
|
|
|
// Creates the paired server-side and client-side in-process transports.
// Both sides share one refcounted mutex and each holds a pointer to the
// other so operations can cross directly between them.
// NOTE(review): server_args/client_args are accepted but not read here.
static void inproc_transports_create(grpc_transport** server_transport,
                                     const grpc_channel_args* server_args,
                                     grpc_transport** client_transport,
                                     const grpc_channel_args* client_args) {
  INPROC_LOG(GPR_INFO, "inproc_transports_create");
  // gpr_zalloc gives zero-initialized storage, so fields not set below
  // (e.g. is_closed, accept_stream_cb) start as 0 / nullptr / false.
  inproc_transport* st =
      static_cast<inproc_transport*>(gpr_zalloc(sizeof(*st)));
  inproc_transport* ct =
      static_cast<inproc_transport*>(gpr_zalloc(sizeof(*ct)));
  // Share one lock between both sides since both sides get affected
  st->mu = ct->mu = static_cast<shared_mu*>(gpr_malloc(sizeof(*st->mu)));
  gpr_mu_init(&st->mu->mu);
  // Mutex refcount is 2: one reference per transport side.
  gpr_ref_init(&st->mu->refs, 2);
  st->base.vtable = &inproc_vtable;
  ct->base.vtable = &inproc_vtable;
  // Start each side of transport with 2 refs since they each have a ref
  // to the other
  gpr_ref_init(&st->refs, 2);
  gpr_ref_init(&ct->refs, 2);
  st->is_client = false;
  ct->is_client = true;
  // Both sides begin READY; there is no connection handshake in-process.
  grpc_connectivity_state_init(&st->connectivity, GRPC_CHANNEL_READY,
                               "inproc_server");
  grpc_connectivity_state_init(&ct->connectivity, GRPC_CHANNEL_READY,
                               "inproc_client");
  // Link the two sides before publishing them to the caller.
  st->other_side = ct;
  ct->other_side = st;
  st->stream_list = nullptr;
  ct->stream_list = nullptr;
  *server_transport = reinterpret_cast<grpc_transport*>(st);
  *client_transport = reinterpret_cast<grpc_transport*>(ct);
}
|
|
|
|
|
|
|
|
|
grpc_channel* grpc_inproc_channel_create(grpc_server* server, |
|
|
|
|
grpc_channel_args* args, |
|
|
|
|
void* reserved) { |
|
|
|
|