Fix proxy, finalize API

pull/2800/head^2
Craig Tiller 10 years ago
parent 18541a1cee
commit 402acf6c44
  1. 12
      include/grpc/grpc.h
  2. 9
      src/core/surface/call.c
  3. 5
      src/core/surface/completion_queue.c
  4. 67
      test/core/end2end/fixtures/proxy.c
  5. 3
      test/core/end2end/tests/cancel_after_accept.c
  6. 3
      test/core/end2end/tests/cancel_after_accept_and_writes_closed.c
  7. 3
      test/core/end2end/tests/cancel_after_invoke.c

@ -358,17 +358,19 @@ typedef struct grpc_op {
/** Propagate deadline */ /** Propagate deadline */
#define GRPC_PROPAGATE_DEADLINE ((gpr_uint32)1) #define GRPC_PROPAGATE_DEADLINE ((gpr_uint32)1)
/** Propagate census context */ /** Propagate census context */
#define GRPC_PROPAGATE_CENSUS_CONTEXT ((gpr_uint32)2) #define GRPC_PROPAGATE_STATS_CONTEXT ((gpr_uint32)2)
#define GRPC_PROPAGATE_CANCELLATION ((gpr_uint32)4) #define GRPC_PROPAGATE_TRACING_CONTEXT ((gpr_uint32)4)
#define GRPC_PROPAGATE_AUTH ((gpr_uint32)8) #define GRPC_PROPAGATE_CANCELLATION ((gpr_uint32)8)
/* Default propagation mask: clients of the core API are encouraged to encode /* Default propagation mask: clients of the core API are encouraged to encode
deltas from this in their implementations... ie write: deltas from this in their implementations... ie write:
GRPC_PROPAGATE_DEFAULTS & ~GRPC_PROPAGATE_DEADLINE to disable deadline GRPC_PROPAGATE_DEFAULTS & ~GRPC_PROPAGATE_DEADLINE to disable deadline
propagation. Doing so gives flexibility in the future to define new propagation. Doing so gives flexibility in the future to define new
propagation types that are default inherited or not. */ propagation types that are default inherited or not. */
#define GRPC_PROPAGATE_DEFAULTS \ #define GRPC_PROPAGATE_DEFAULTS \
((gpr_uint32)((0xffff | GRPC_PROPAGATE_DEADLINE | GRPC_PROPAGATE_CENSUS_CONTEXT))) ((gpr_uint32)((0xffff | GRPC_PROPAGATE_DEADLINE | \
GRPC_PROPAGATE_STATS_CONTEXT | \
GRPC_PROPAGATE_TRACING_CONTEXT)))
/** Initialize the grpc library. /** Initialize the grpc library.

@ -317,7 +317,7 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call,
gpr_mu_init(&call->completion_mu); gpr_mu_init(&call->completion_mu);
call->channel = channel; call->channel = channel;
call->cq = cq; call->cq = cq;
if (cq) { if (cq != NULL) {
GRPC_CQ_INTERNAL_REF(cq, "bind"); GRPC_CQ_INTERNAL_REF(cq, "bind");
} }
call->parent = parent_call; call->parent = parent_call;
@ -372,10 +372,15 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call,
parent_call->send_deadline.clock_type), parent_call->send_deadline.clock_type),
parent_call->send_deadline); parent_call->send_deadline);
} }
if (propagation_mask & GRPC_PROPAGATE_CENSUS_CONTEXT) { /* for now GRPC_PROPAGATE_TRACING_CONTEXT *MUST* be passed with
* GRPC_PROPAGATE_STATS_CONTEXT */
if (propagation_mask & GRPC_PROPAGATE_TRACING_CONTEXT) {
GPR_ASSERT(propagation_mask & GRPC_PROPAGATE_STATS_CONTEXT);
grpc_call_context_set(call, GRPC_CONTEXT_TRACING, grpc_call_context_set(call, GRPC_CONTEXT_TRACING,
parent_call->context[GRPC_CONTEXT_TRACING].value, parent_call->context[GRPC_CONTEXT_TRACING].value,
NULL); NULL);
} else {
GPR_ASSERT(propagation_mask & GRPC_PROPAGATE_STATS_CONTEXT);
} }
if (propagation_mask & GRPC_PROPAGATE_CANCELLATION) { if (propagation_mask & GRPC_PROPAGATE_CANCELLATION) {
call->cancellation_is_inherited = 1; call->cancellation_is_inherited = 1;

@ -107,6 +107,11 @@ void grpc_cq_internal_unref(grpc_completion_queue *cc) {
} }
void grpc_cq_begin_op(grpc_completion_queue *cc) { void grpc_cq_begin_op(grpc_completion_queue *cc) {
#ifndef NDEBUG
gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
GPR_ASSERT(!cc->shutdown_called);
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
#endif
gpr_ref(&cc->pending_events); gpr_ref(&cc->pending_events);
} }

@ -52,6 +52,8 @@ struct grpc_end2end_proxy {
grpc_server *server; grpc_server *server;
grpc_channel *client; grpc_channel *client;
int shutdown;
/* requested call */ /* requested call */
grpc_call *new_call; grpc_call *new_call;
grpc_call_details new_call_details; grpc_call_details new_call_details;
@ -65,6 +67,7 @@ typedef struct {
typedef struct { typedef struct {
gpr_refcount refs; gpr_refcount refs;
grpc_end2end_proxy *proxy;
grpc_call *c2p; grpc_call *c2p;
grpc_call *p2s; grpc_call *p2s;
@ -119,12 +122,15 @@ static closure *new_closure(void (*func)(void *arg, int success), void *arg) {
return cl; return cl;
} }
static void shutdown_complete(void *arg, int success) {} static void shutdown_complete(void *arg, int success) {
grpc_end2end_proxy *proxy = arg;
proxy->shutdown = 1;
grpc_completion_queue_shutdown(proxy->cq);
}
void grpc_end2end_proxy_destroy(grpc_end2end_proxy *proxy) { void grpc_end2end_proxy_destroy(grpc_end2end_proxy *proxy) {
grpc_server_shutdown_and_notify(proxy->server, proxy->cq, grpc_server_shutdown_and_notify(proxy->server, proxy->cq,
new_closure(shutdown_complete, NULL)); new_closure(shutdown_complete, proxy));
grpc_completion_queue_shutdown(proxy->cq);
gpr_thd_join(proxy->thd); gpr_thd_join(proxy->thd);
gpr_free(proxy->proxy_port); gpr_free(proxy->proxy_port);
gpr_free(proxy->server_port); gpr_free(proxy->server_port);
@ -165,14 +171,16 @@ static void on_p2s_recv_initial_metadata(void *arg, int success) {
grpc_op op; grpc_op op;
grpc_call_error err; grpc_call_error err;
op.op = GRPC_OP_SEND_INITIAL_METADATA; if (!pc->proxy->shutdown) {
op.flags = 0; op.op = GRPC_OP_SEND_INITIAL_METADATA;
op.data.send_initial_metadata.count = pc->p2s_initial_metadata.count; op.flags = 0;
op.data.send_initial_metadata.metadata = pc->p2s_initial_metadata.metadata; op.data.send_initial_metadata.count = pc->p2s_initial_metadata.count;
refpc(pc, "on_c2p_sent_initial_metadata"); op.data.send_initial_metadata.metadata = pc->p2s_initial_metadata.metadata;
err = grpc_call_start_batch(pc->c2p, &op, 1, refpc(pc, "on_c2p_sent_initial_metadata");
new_closure(on_c2p_sent_initial_metadata, pc)); err = grpc_call_start_batch(pc->c2p, &op, 1,
GPR_ASSERT(err == GRPC_CALL_OK); new_closure(on_c2p_sent_initial_metadata, pc));
GPR_ASSERT(err == GRPC_CALL_OK);
}
unrefpc(pc, "on_p2s_recv_initial_metadata"); unrefpc(pc, "on_p2s_recv_initial_metadata");
} }
@ -190,7 +198,7 @@ static void on_p2s_sent_message(void *arg, int success) {
grpc_call_error err; grpc_call_error err;
grpc_byte_buffer_destroy(pc->c2p_msg); grpc_byte_buffer_destroy(pc->c2p_msg);
if (success) { if (!pc->proxy->shutdown && success) {
op.op = GRPC_OP_RECV_MESSAGE; op.op = GRPC_OP_RECV_MESSAGE;
op.flags = 0; op.flags = 0;
op.data.recv_message = &pc->c2p_msg; op.data.recv_message = &pc->c2p_msg;
@ -213,7 +221,7 @@ static void on_c2p_recv_msg(void *arg, int success) {
grpc_op op; grpc_op op;
grpc_call_error err; grpc_call_error err;
if (success) { if (!pc->proxy->shutdown && success) {
if (pc->c2p_msg != NULL) { if (pc->c2p_msg != NULL) {
op.op = GRPC_OP_SEND_MESSAGE; op.op = GRPC_OP_SEND_MESSAGE;
op.flags = 0; op.flags = 0;
@ -243,7 +251,7 @@ static void on_c2p_sent_message(void *arg, int success) {
grpc_call_error err; grpc_call_error err;
grpc_byte_buffer_destroy(pc->p2s_msg); grpc_byte_buffer_destroy(pc->p2s_msg);
if (success) { if (!pc->proxy->shutdown && success) {
op.op = GRPC_OP_RECV_MESSAGE; op.op = GRPC_OP_RECV_MESSAGE;
op.flags = 0; op.flags = 0;
op.data.recv_message = &pc->p2s_msg; op.data.recv_message = &pc->p2s_msg;
@ -261,7 +269,7 @@ static void on_p2s_recv_msg(void *arg, int success) {
grpc_op op; grpc_op op;
grpc_call_error err; grpc_call_error err;
if (success && pc->p2s_msg) { if (!pc->proxy->shutdown && success && pc->p2s_msg) {
op.op = GRPC_OP_SEND_MESSAGE; op.op = GRPC_OP_SEND_MESSAGE;
op.flags = 0; op.flags = 0;
op.data.send_message = pc->p2s_msg; op.data.send_message = pc->p2s_msg;
@ -283,19 +291,21 @@ static void on_p2s_status(void *arg, int success) {
grpc_op op; grpc_op op;
grpc_call_error err; grpc_call_error err;
GPR_ASSERT(success); if (!pc->proxy->shutdown) {
op.op = GRPC_OP_SEND_STATUS_FROM_SERVER; GPR_ASSERT(success);
op.flags = 0; op.op = GRPC_OP_SEND_STATUS_FROM_SERVER;
op.data.send_status_from_server.trailing_metadata_count = op.flags = 0;
pc->p2s_trailing_metadata.count; op.data.send_status_from_server.trailing_metadata_count =
op.data.send_status_from_server.trailing_metadata = pc->p2s_trailing_metadata.count;
pc->p2s_trailing_metadata.metadata; op.data.send_status_from_server.trailing_metadata =
op.data.send_status_from_server.status = pc->p2s_status; pc->p2s_trailing_metadata.metadata;
op.data.send_status_from_server.status_details = pc->p2s_status_details; op.data.send_status_from_server.status = pc->p2s_status;
refpc(pc, "on_c2p_sent_status"); op.data.send_status_from_server.status_details = pc->p2s_status_details;
err = grpc_call_start_batch(pc->c2p, &op, 1, refpc(pc, "on_c2p_sent_status");
new_closure(on_c2p_sent_status, pc)); err = grpc_call_start_batch(pc->c2p, &op, 1,
GPR_ASSERT(err == GRPC_CALL_OK); new_closure(on_c2p_sent_status, pc));
GPR_ASSERT(err == GRPC_CALL_OK);
}
unrefpc(pc, "on_p2s_status"); unrefpc(pc, "on_p2s_status");
} }
@ -313,6 +323,7 @@ static void on_new_call(void *arg, int success) {
grpc_op op; grpc_op op;
proxy_call *pc = gpr_malloc(sizeof(*pc)); proxy_call *pc = gpr_malloc(sizeof(*pc));
memset(pc, 0, sizeof(*pc)); memset(pc, 0, sizeof(*pc));
pc->proxy = proxy;
GPR_SWAP(grpc_metadata_array, pc->c2p_initial_metadata, GPR_SWAP(grpc_metadata_array, pc->c2p_initial_metadata,
proxy->new_call_metadata); proxy->new_call_metadata);
pc->c2p = proxy->new_call; pc->c2p = proxy->new_call;

@ -192,8 +192,7 @@ static void test_cancel_after_accept(grpc_end2end_test_config config,
cq_expect_completion(cqv, tag(1), 1); cq_expect_completion(cqv, tag(1), 1);
cq_verify(cqv); cq_verify(cqv);
GPR_ASSERT(status == mode.expect_status); GPR_ASSERT(status == mode.expect_status || status == GRPC_STATUS_INTERNAL);
GPR_ASSERT(0 == strcmp(details, mode.expect_details));
GPR_ASSERT(was_cancelled == 1); GPR_ASSERT(was_cancelled == 1);
grpc_metadata_array_destroy(&initial_metadata_recv); grpc_metadata_array_destroy(&initial_metadata_recv);

@ -195,8 +195,7 @@ static void test_cancel_after_accept_and_writes_closed(
cq_expect_completion(cqv, tag(1), 1); cq_expect_completion(cqv, tag(1), 1);
cq_verify(cqv); cq_verify(cqv);
GPR_ASSERT(status == mode.expect_status); GPR_ASSERT(status == mode.expect_status || status == GRPC_STATUS_INTERNAL);
GPR_ASSERT(0 == strcmp(details, mode.expect_details));
GPR_ASSERT(was_cancelled == 1); GPR_ASSERT(was_cancelled == 1);
grpc_metadata_array_destroy(&initial_metadata_recv); grpc_metadata_array_destroy(&initial_metadata_recv);

@ -164,8 +164,7 @@ static void test_cancel_after_invoke(grpc_end2end_test_config config,
cq_expect_completion(cqv, tag(1), 1); cq_expect_completion(cqv, tag(1), 1);
cq_verify(cqv); cq_verify(cqv);
GPR_ASSERT(status == mode.expect_status); GPR_ASSERT(status == mode.expect_status || status == GRPC_STATUS_INTERNAL);
GPR_ASSERT(0 == strcmp(details, mode.expect_details));
grpc_metadata_array_destroy(&initial_metadata_recv); grpc_metadata_array_destroy(&initial_metadata_recv);
grpc_metadata_array_destroy(&trailing_metadata_recv); grpc_metadata_array_destroy(&trailing_metadata_recv);

Loading…
Cancel
Save