Split legacy code to end of file

pull/357/head
Craig Tiller 10 years ago
parent a4716500d2
commit 566316f3a5
  1. 387
      src/core/surface/call.c

@ -48,26 +48,8 @@
#define OP_IN_MASK(op, mask) (((1 << (op)) & (mask)) != 0)
typedef struct {
gpr_uint8 md_out_buffer;
size_t md_out_count[2];
size_t md_out_capacity[2];
grpc_metadata *md_out[2];
grpc_byte_buffer *msg_out;
/* input buffers */
grpc_metadata_array initial_md_in;
grpc_metadata_array trailing_md_in;
size_t details_capacity;
char *details;
grpc_status_code status;
size_t msg_in_read_idx;
grpc_byte_buffer *msg_in;
void *finished_tag;
} legacy_state;
typedef struct legacy_state legacy_state;
static void destroy_legacy_state(legacy_state *ls);
typedef enum { REQ_INITIAL = 0, REQ_READY, REQ_DONE } req_state;
@ -200,18 +182,10 @@ grpc_call *grpc_call_create(grpc_channel *channel,
return call;
}
legacy_state *get_legacy_state(grpc_call *call) {
if (call->legacy_state == NULL) {
call->legacy_state = gpr_malloc(sizeof(legacy_state));
memset(call->legacy_state, 0, sizeof(legacy_state));
}
return call->legacy_state;
}
void grpc_call_internal_ref(grpc_call *c) { gpr_ref(&c->internal_refcount); }
static void destroy_call(void *call, int ignored_success) {
size_t i, j;
size_t i;
grpc_call *c = call;
grpc_call_stack_destroy(CALL_STACK_FROM_CALL(c));
grpc_channel_internal_unref(c->channel);
@ -228,16 +202,7 @@ static void destroy_call(void *call, int ignored_success) {
gpr_free(c->buffered_initial_metadata.metadata);
gpr_free(c->buffered_trailing_metadata.metadata);
if (c->legacy_state) {
for (i = 0; i < 2; i++) {
for (j = 0; j < c->legacy_state->md_out_count[i]; j++) {
gpr_free(c->legacy_state->md_out[i][j].key);
gpr_free(c->legacy_state->md_out[i][j].value);
}
gpr_free(c->legacy_state->md_out[i]);
}
gpr_free(c->legacy_state->initial_md_in.metadata);
gpr_free(c->legacy_state->trailing_md_in.metadata);
gpr_free(c->legacy_state);
destroy_legacy_state(c->legacy_state);
}
gpr_free(c);
}
@ -749,6 +714,202 @@ void grpc_call_execute_op(grpc_call *call, grpc_call_op *op) {
elem->filter->call_op(elem, NULL, op);
}
grpc_call *grpc_call_from_top_element(grpc_call_element *elem) {
return CALL_FROM_TOP_ELEM(elem);
}
static void call_alarm(void *arg, int success) {
grpc_call *call = arg;
if (success) {
if (call->is_client) {
grpc_call_cancel_with_status(call, GRPC_STATUS_DEADLINE_EXCEEDED,
"Deadline Exceeded");
} else {
grpc_call_cancel(call);
}
}
grpc_call_internal_unref(call, 1);
}
void grpc_call_set_deadline(grpc_call_element *elem, gpr_timespec deadline) {
grpc_call *call = CALL_FROM_TOP_ELEM(elem);
if (call->have_alarm) {
gpr_log(GPR_ERROR, "Attempt to set deadline alarm twice");
}
grpc_call_internal_ref(call);
call->have_alarm = 1;
grpc_alarm_init(&call->alarm, deadline, call_alarm, call, gpr_now());
}
static void mark_read_closed(grpc_call *call) {
call->read_closed = 1;
finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_OK);
finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, GRPC_OP_OK);
finish_ioreq_op(call, GRPC_IOREQ_RECV_TRAILING_METADATA, GRPC_OP_OK);
finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS, GRPC_OP_OK);
}
void grpc_call_read_closed(grpc_call_element *elem) {
grpc_call *call = CALL_FROM_TOP_ELEM(elem);
lock(call);
GPR_ASSERT(!call->read_closed);
mark_read_closed(call);
unlock(call);
}
void grpc_call_stream_closed(grpc_call_element *elem) {
grpc_call *call = CALL_FROM_TOP_ELEM(elem);
lock(call);
GPR_ASSERT(!call->stream_closed);
if (!call->read_closed) {
mark_read_closed(call);
}
call->stream_closed = 1;
if (grpc_bbq_empty(&call->incoming_queue)) {
finish_ioreq_op(call, GRPC_IOREQ_RECV_CLOSE, GRPC_OP_OK);
}
unlock(call);
grpc_call_internal_unref(call, 0);
}
/* we offset status by a small amount when storing it into transport metadata
as metadata cannot store a 0 value (which is used as OK for grpc_status_codes
*/
#define STATUS_OFFSET 1
static void destroy_status(void *ignored) {}
static gpr_uint32 decode_status(grpc_mdelem *md) {
gpr_uint32 status;
void *user_data = grpc_mdelem_get_user_data(md, destroy_status);
if (user_data) {
status = ((gpr_uint32)(gpr_intptr) user_data) - STATUS_OFFSET;
} else {
if (!gpr_parse_bytes_to_uint32(grpc_mdstr_as_c_string(md->value),
GPR_SLICE_LENGTH(md->value->slice),
&status)) {
status = GRPC_STATUS_UNKNOWN; /* could not parse status code */
}
grpc_mdelem_set_user_data(md, destroy_status,
(void *)(gpr_intptr)(status + STATUS_OFFSET));
}
return status;
}
void grpc_call_recv_message(grpc_call_element *elem,
grpc_byte_buffer *byte_buffer) {
grpc_call *call = CALL_FROM_TOP_ELEM(elem);
lock(call);
if (call->requests[GRPC_IOREQ_RECV_MESSAGE].set < GRPC_IOREQ_OP_COUNT) {
/* there's an outstanding read */
*call->requests[GRPC_IOREQ_RECV_MESSAGE].data.recv_message = byte_buffer;
finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_OK);
} else {
grpc_bbq_push(&call->incoming_queue, byte_buffer);
}
unlock(call);
}
void grpc_call_recv_metadata(grpc_call_element *elem, grpc_mdelem *md) {
grpc_call *call = CALL_FROM_TOP_ELEM(elem);
grpc_mdstr *key = md->key;
grpc_metadata_array *dest;
grpc_metadata *mdusr;
lock(call);
if (key == grpc_channel_get_status_string(call->channel)) {
set_status_code(call, STATUS_FROM_WIRE, decode_status(md));
grpc_mdelem_unref(md);
} else if (key == grpc_channel_get_message_string(call->channel)) {
set_status_details(call, STATUS_FROM_WIRE, grpc_mdstr_ref(md->value));
grpc_mdelem_unref(md);
} else {
if (!call->got_initial_metadata) {
dest = call->requests[GRPC_IOREQ_RECV_INITIAL_METADATA].set <
GRPC_IOREQ_OP_COUNT
? call->requests[GRPC_IOREQ_RECV_INITIAL_METADATA]
.data.recv_metadata
: &call->buffered_initial_metadata;
} else {
dest = call->requests[GRPC_IOREQ_RECV_TRAILING_METADATA].set <
GRPC_IOREQ_OP_COUNT
? call->requests[GRPC_IOREQ_RECV_TRAILING_METADATA]
.data.recv_metadata
: &call->buffered_trailing_metadata;
}
if (dest->count == dest->capacity) {
dest->capacity = GPR_MAX(dest->capacity + 8, dest->capacity * 2);
dest->metadata =
gpr_realloc(dest->metadata, sizeof(grpc_metadata) * dest->capacity);
}
mdusr = &dest->metadata[dest->count++];
mdusr->key = (char *)grpc_mdstr_as_c_string(md->key);
mdusr->value = (char *)grpc_mdstr_as_c_string(md->value);
mdusr->value_length = GPR_SLICE_LENGTH(md->value->slice);
if (call->owned_metadata_count == call->owned_metadata_capacity) {
call->owned_metadata_capacity = GPR_MAX(
call->owned_metadata_capacity + 8, call->owned_metadata_capacity * 2);
call->owned_metadata =
gpr_realloc(call->owned_metadata,
sizeof(grpc_mdelem *) * call->owned_metadata_capacity);
}
call->owned_metadata[call->owned_metadata_count++] = md;
}
unlock(call);
}
grpc_call_stack *grpc_call_get_call_stack(grpc_call *call) {
return CALL_STACK_FROM_CALL(call);
}
/*
* LEGACY API IMPLEMENTATION
* All this code will disappear as soon as wrappings are updated
*/
struct legacy_state {
gpr_uint8 md_out_buffer;
size_t md_out_count[2];
size_t md_out_capacity[2];
grpc_metadata *md_out[2];
grpc_byte_buffer *msg_out;
/* input buffers */
grpc_metadata_array initial_md_in;
grpc_metadata_array trailing_md_in;
size_t details_capacity;
char *details;
grpc_status_code status;
size_t msg_in_read_idx;
grpc_byte_buffer *msg_in;
void *finished_tag;
};
static legacy_state *get_legacy_state(grpc_call *call) {
if (call->legacy_state == NULL) {
call->legacy_state = gpr_malloc(sizeof(legacy_state));
memset(call->legacy_state, 0, sizeof(legacy_state));
}
return call->legacy_state;
}
static void destroy_legacy_state(legacy_state *ls) {
size_t i, j;
for (i = 0; i < 2; i++) {
for (j = 0; j < ls->md_out_count[i]; j++) {
gpr_free(ls->md_out[i][j].key);
gpr_free(ls->md_out[i][j].value);
}
gpr_free(ls->md_out[i]);
}
gpr_free(ls->initial_md_in.metadata);
gpr_free(ls->trailing_md_in.metadata);
gpr_free(ls);
}
grpc_call_error grpc_call_add_metadata(grpc_call *call, grpc_metadata *metadata,
gpr_uint32 flags) {
legacy_state *ls;
@ -1008,151 +1169,3 @@ grpc_call_error grpc_call_start_write_status(grpc_call *call,
return err;
}
grpc_call *grpc_call_from_top_element(grpc_call_element *elem) {
return CALL_FROM_TOP_ELEM(elem);
}
static void call_alarm(void *arg, int success) {
grpc_call *call = arg;
if (success) {
if (call->is_client) {
grpc_call_cancel_with_status(call, GRPC_STATUS_DEADLINE_EXCEEDED,
"Deadline Exceeded");
} else {
grpc_call_cancel(call);
}
}
grpc_call_internal_unref(call, 1);
}
void grpc_call_set_deadline(grpc_call_element *elem, gpr_timespec deadline) {
grpc_call *call = CALL_FROM_TOP_ELEM(elem);
if (call->have_alarm) {
gpr_log(GPR_ERROR, "Attempt to set deadline alarm twice");
}
grpc_call_internal_ref(call);
call->have_alarm = 1;
grpc_alarm_init(&call->alarm, deadline, call_alarm, call, gpr_now());
}
static void mark_read_closed(grpc_call *call) {
call->read_closed = 1;
finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_OK);
finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, GRPC_OP_OK);
finish_ioreq_op(call, GRPC_IOREQ_RECV_TRAILING_METADATA, GRPC_OP_OK);
finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS, GRPC_OP_OK);
}
void grpc_call_read_closed(grpc_call_element *elem) {
grpc_call *call = CALL_FROM_TOP_ELEM(elem);
lock(call);
GPR_ASSERT(!call->read_closed);
mark_read_closed(call);
unlock(call);
}
void grpc_call_stream_closed(grpc_call_element *elem) {
grpc_call *call = CALL_FROM_TOP_ELEM(elem);
lock(call);
GPR_ASSERT(!call->stream_closed);
if (!call->read_closed) {
mark_read_closed(call);
}
call->stream_closed = 1;
if (grpc_bbq_empty(&call->incoming_queue)) {
finish_ioreq_op(call, GRPC_IOREQ_RECV_CLOSE, GRPC_OP_OK);
}
unlock(call);
grpc_call_internal_unref(call, 0);
}
/* we offset status by a small amount when storing it into transport metadata
as metadata cannot store a 0 value (which is used as OK for grpc_status_codes
*/
#define STATUS_OFFSET 1
static void destroy_status(void *ignored) {}
static gpr_uint32 decode_status(grpc_mdelem *md) {
gpr_uint32 status;
void *user_data = grpc_mdelem_get_user_data(md, destroy_status);
if (user_data) {
status = ((gpr_uint32)(gpr_intptr) user_data) - STATUS_OFFSET;
} else {
if (!gpr_parse_bytes_to_uint32(grpc_mdstr_as_c_string(md->value),
GPR_SLICE_LENGTH(md->value->slice),
&status)) {
status = GRPC_STATUS_UNKNOWN; /* could not parse status code */
}
grpc_mdelem_set_user_data(md, destroy_status,
(void *)(gpr_intptr)(status + STATUS_OFFSET));
}
return status;
}
void grpc_call_recv_message(grpc_call_element *elem,
grpc_byte_buffer *byte_buffer) {
grpc_call *call = CALL_FROM_TOP_ELEM(elem);
lock(call);
if (call->requests[GRPC_IOREQ_RECV_MESSAGE].set < GRPC_IOREQ_OP_COUNT) {
/* there's an outstanding read */
*call->requests[GRPC_IOREQ_RECV_MESSAGE].data.recv_message = byte_buffer;
finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_OK);
} else {
grpc_bbq_push(&call->incoming_queue, byte_buffer);
}
unlock(call);
}
void grpc_call_recv_metadata(grpc_call_element *elem, grpc_mdelem *md) {
grpc_call *call = CALL_FROM_TOP_ELEM(elem);
grpc_mdstr *key = md->key;
grpc_metadata_array *dest;
grpc_metadata *mdusr;
lock(call);
if (key == grpc_channel_get_status_string(call->channel)) {
set_status_code(call, STATUS_FROM_WIRE, decode_status(md));
grpc_mdelem_unref(md);
} else if (key == grpc_channel_get_message_string(call->channel)) {
set_status_details(call, STATUS_FROM_WIRE, grpc_mdstr_ref(md->value));
grpc_mdelem_unref(md);
} else {
if (!call->got_initial_metadata) {
dest = call->requests[GRPC_IOREQ_RECV_INITIAL_METADATA].set <
GRPC_IOREQ_OP_COUNT
? call->requests[GRPC_IOREQ_RECV_INITIAL_METADATA]
.data.recv_metadata
: &call->buffered_initial_metadata;
} else {
dest = call->requests[GRPC_IOREQ_RECV_TRAILING_METADATA].set <
GRPC_IOREQ_OP_COUNT
? call->requests[GRPC_IOREQ_RECV_TRAILING_METADATA]
.data.recv_metadata
: &call->buffered_trailing_metadata;
}
if (dest->count == dest->capacity) {
dest->capacity = GPR_MAX(dest->capacity + 8, dest->capacity * 2);
dest->metadata =
gpr_realloc(dest->metadata, sizeof(grpc_metadata) * dest->capacity);
}
mdusr = &dest->metadata[dest->count++];
mdusr->key = (char *)grpc_mdstr_as_c_string(md->key);
mdusr->value = (char *)grpc_mdstr_as_c_string(md->value);
mdusr->value_length = GPR_SLICE_LENGTH(md->value->slice);
if (call->owned_metadata_count == call->owned_metadata_capacity) {
call->owned_metadata_capacity = GPR_MAX(
call->owned_metadata_capacity + 8, call->owned_metadata_capacity * 2);
call->owned_metadata =
gpr_realloc(call->owned_metadata,
sizeof(grpc_mdelem *) * call->owned_metadata_capacity);
}
call->owned_metadata[call->owned_metadata_count++] = md;
}
unlock(call);
}
grpc_call_stack *grpc_call_get_call_stack(grpc_call *call) {
return CALL_STACK_FROM_CALL(call);
}

Loading…
Cancel
Save