One test compiles

pull/1369/head
Craig Tiller 10 years ago
parent 6ad62a7204
commit fbf5be26a3
  1. 36
      src/core/surface/call.c
  2. 4
      src/core/surface/channel.c
  3. 5
      src/core/transport/chttp2_transport.c
  4. 14
      src/core/transport/stream_op.c
  5. 6
      src/core/transport/stream_op.h
  6. 16
      src/core/transport/transport.c
  7. 109
      src/core/transport/transport_op_string.c
  8. 6
      test/core/end2end/fixtures/chttp2_socket_pair.c

@ -283,6 +283,8 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
call->send_deadline = send_deadline;
grpc_channel_internal_ref(channel);
call->metadata_context = grpc_channel_get_metadata_context(channel);
grpc_sopb_init(&call->send_ops);
grpc_sopb_init(&call->recv_ops);
/* one ref is dropped in response to destroy, the other in
stream_closed */
gpr_ref_init(&call->internal_refcount, 2);
@ -330,6 +332,8 @@ static void destroy_call(void *call, int ignored_success) {
destroy_legacy_state(c->legacy_state);
}
grpc_bbq_destroy(&c->incoming_queue);
grpc_sopb_destroy(&c->send_ops);
grpc_sopb_destroy(&c->recv_ops);
gpr_free(c);
}
@ -1091,41 +1095,9 @@ static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) {
grpc_mdctx_unlock(mdctx);
}
#if 0
void grpc_call_read_closed(grpc_call_element *elem) {
set_read_state(CALL_FROM_TOP_ELEM(elem), READ_STATE_READ_CLOSED);
}
void grpc_call_stream_closed(grpc_call_element *elem) {
grpc_call *call = CALL_FROM_TOP_ELEM(elem);
set_read_state(call, READ_STATE_STREAM_CLOSED);
grpc_call_internal_unref(call, 0);
}
void grpc_call_recv_message(grpc_call_element *elem,
grpc_byte_buffer *byte_buffer) {
grpc_call *call = CALL_FROM_TOP_ELEM(elem);
lock(call);
grpc_bbq_push(&call->incoming_queue, byte_buffer);
finish_read_ops(call);
unlock(call);
}
void grpc_call_recv_synthetic_status(grpc_call_element *elem,
grpc_status_code status,
const char *message) {
grpc_call *call = CALL_FROM_TOP_ELEM(elem);
lock(call);
set_status_code(call, STATUS_FROM_CORE, status);
set_status_details(call, STATUS_FROM_CORE,
grpc_mdstr_from_string(call->metadata_context, message));
unlock(call);
}
grpc_call_stack *grpc_call_get_call_stack(grpc_call *call) {
return CALL_STACK_FROM_CALL(call);
}
#endif
/*
* BATCH API IMPLEMENTATION

@ -242,3 +242,7 @@ grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel) {
grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel) {
return channel->grpc_message_string;
}
gpr_uint32 grpc_channel_get_max_message_length(grpc_channel *channel) {
return channel->max_message_length;
}

@ -1503,8 +1503,6 @@ static int is_window_update_legal(gpr_int64 window_update, gpr_int64 window) {
return window + window_update < MAX_WINDOW;
}
static void free_md(void *p, grpc_op_error result) { gpr_free(p); }
static void add_metadata_batch(transport *t, stream *s) {
grpc_metadata_batch b;
size_t i;
@ -1522,8 +1520,7 @@ static void add_metadata_batch(transport *t, stream *s) {
s->incoming_metadata[s->incoming_metadata_count - 1].next = NULL;
grpc_sopb_add_metadata(&s->parser.incoming_sopb, b);
grpc_sopb_add_flow_ctl_cb(&s->parser.incoming_sopb, free_md,
s->incoming_metadata);
/* TODO(ctiller): don't leak incoming_metadata */
/* reset */
s->incoming_deadline = gpr_inf_future;

@ -81,9 +81,6 @@ void grpc_stream_ops_unref_owned_objects(grpc_stream_op *ops, size_t nops) {
case GRPC_OP_METADATA:
grpc_metadata_batch_destroy(&ops[i].data.metadata);
break;
case GRPC_OP_FLOW_CTL_CB:
ops[i].data.flow_ctl_cb.cb(ops[i].data.flow_ctl_cb.arg, GRPC_OP_ERROR);
break;
case GRPC_NO_OP:
case GRPC_OP_BEGIN_MESSAGE:
break;
@ -119,6 +116,7 @@ static grpc_stream_op *add(grpc_stream_op_buffer *sopb) {
assert_contained_metadata_ok(sopb->ops, sopb->nops);
GPR_ASSERT(sopb->nops <= sopb->capacity);
if (sopb->nops == sopb->capacity) {
expandto(sopb, GROW(sopb->capacity));
}
@ -158,16 +156,6 @@ void grpc_sopb_add_slice(grpc_stream_op_buffer *sopb, gpr_slice slice) {
assert_contained_metadata_ok(sopb->ops, sopb->nops);
}
void grpc_sopb_add_flow_ctl_cb(grpc_stream_op_buffer *sopb,
void (*cb)(void *arg, grpc_op_error error),
void *arg) {
grpc_stream_op *op = add(sopb);
op->type = GRPC_OP_FLOW_CTL_CB;
op->data.flow_ctl_cb.cb = cb;
op->data.flow_ctl_cb.arg = arg;
assert_contained_metadata_ok(sopb->ops, sopb->nops);
}
void grpc_sopb_append(grpc_stream_op_buffer *sopb, grpc_stream_op *ops,
size_t nops) {
size_t orig_nops = sopb->nops;

@ -154,12 +154,10 @@ void grpc_sopb_add_begin_message(grpc_stream_op_buffer *sopb, gpr_uint32 length,
void grpc_sopb_add_metadata(grpc_stream_op_buffer *sopb, grpc_metadata_batch metadata);
/* Append a GRPC_SLICE to a buffer - does not ref/unref the slice */
void grpc_sopb_add_slice(grpc_stream_op_buffer *sopb, gpr_slice slice);
/* Append a GRPC_OP_FLOW_CTL_CB to a buffer */
void grpc_sopb_add_flow_ctl_cb(grpc_stream_op_buffer *sopb,
void (*cb)(void *arg, grpc_op_error error),
void *arg);
/* Append a buffer to a buffer - does not ref/unref any internal objects */
void grpc_sopb_append(grpc_stream_op_buffer *sopb, grpc_stream_op *ops,
size_t nops);
char *grpc_sopb_string(grpc_stream_op_buffer *sopb);
#endif /* GRPC_INTERNAL_CORE_TRANSPORT_STREAM_OP_H */

@ -56,14 +56,9 @@ int grpc_transport_init_stream(grpc_transport *transport, grpc_stream *stream,
return transport->vtable->init_stream(transport, stream, server_data);
}
void grpc_transport_send_batch(grpc_transport *transport, grpc_stream *stream,
grpc_stream_op *ops, size_t nops, int is_last) {
transport->vtable->send_batch(transport, stream, ops, nops, is_last);
}
void grpc_transport_set_allow_window_updates(grpc_transport *transport,
grpc_stream *stream, int allow) {
transport->vtable->set_allow_window_updates(transport, stream, allow);
void grpc_transport_perform_op(grpc_transport *transport, grpc_stream *stream,
grpc_transport_op *op) {
transport->vtable->perform_op(transport, stream, op);
}
void grpc_transport_add_to_pollset(grpc_transport *transport,
@ -76,11 +71,6 @@ void grpc_transport_destroy_stream(grpc_transport *transport,
transport->vtable->destroy_stream(transport, stream);
}
void grpc_transport_abort_stream(grpc_transport *transport, grpc_stream *stream,
grpc_status_code status) {
transport->vtable->abort_stream(transport, stream, status);
}
void grpc_transport_ping(grpc_transport *transport, void (*cb)(void *user_data),
void *user_data) {
transport->vtable->ping(transport, cb, user_data);

@ -66,65 +66,76 @@ static void put_metadata_list(gpr_strvec *b, grpc_metadata_batch md) {
}
}
char *grpc_call_op_string(grpc_call_op *op) {
char *tmp;
char *grpc_sopb_string(grpc_stream_op_buffer *sopb) {
char *out;
char *tmp;
size_t i;
gpr_strvec b;
gpr_strvec_init(&b);
switch (op->dir) {
case GRPC_CALL_DOWN:
gpr_strvec_add(&b, gpr_strdup(">"));
break;
case GRPC_CALL_UP:
gpr_strvec_add(&b, gpr_strdup("<"));
break;
}
for (i = 0; i < sopb->nops; i++) {
grpc_stream_op *op = &sopb->ops[i];
if (i) gpr_strvec_add(&b, gpr_strdup(", "));
switch (op->type) {
case GRPC_SEND_METADATA:
gpr_strvec_add(&b, gpr_strdup("SEND_METADATA"));
put_metadata_list(&b, op->data.metadata);
break;
case GRPC_SEND_MESSAGE:
gpr_strvec_add(&b, gpr_strdup("SEND_MESSAGE"));
case GRPC_NO_OP:
gpr_strvec_add(&b, gpr_strdup("NO_OP"));
break;
case GRPC_SEND_PREFORMATTED_MESSAGE:
gpr_strvec_add(&b, gpr_strdup("SEND_PREFORMATTED_MESSAGE"));
break;
case GRPC_SEND_FINISH:
gpr_strvec_add(&b, gpr_strdup("SEND_FINISH"));
case GRPC_OP_BEGIN_MESSAGE:
gpr_asprintf(&tmp, "BEGIN_MESSAGE:%d", op->data.begin_message.length);
gpr_strvec_add(&b, tmp);
break;
case GRPC_REQUEST_DATA:
gpr_strvec_add(&b, gpr_strdup("REQUEST_DATA"));
case GRPC_OP_SLICE:
gpr_asprintf(&tmp, "SLICE:%d", GPR_SLICE_LENGTH(op->data.slice));
break;
case GRPC_RECV_METADATA:
gpr_strvec_add(&b, gpr_strdup("RECV_METADATA"));
case GRPC_OP_METADATA:
put_metadata_list(&b, op->data.metadata);
break;
case GRPC_RECV_MESSAGE:
gpr_strvec_add(&b, gpr_strdup("RECV_MESSAGE"));
break;
case GRPC_RECV_HALF_CLOSE:
gpr_strvec_add(&b, gpr_strdup("RECV_HALF_CLOSE"));
break;
case GRPC_RECV_FINISH:
gpr_strvec_add(&b, gpr_strdup("RECV_FINISH"));
break;
case GRPC_RECV_SYNTHETIC_STATUS:
gpr_asprintf(&tmp, "RECV_SYNTHETIC_STATUS status=%d message='%s'",
op->data.synthetic_status.status,
op->data.synthetic_status.message);
gpr_strvec_add(&b, tmp);
break;
case GRPC_CANCEL_OP:
gpr_strvec_add(&b, gpr_strdup("CANCEL_OP"));
break;
}
gpr_asprintf(&tmp, " flags=0x%08x", op->flags);
gpr_strvec_add(&b, tmp);
}
out = gpr_strvec_flatten(&b, NULL);
gpr_strvec_destroy(&b);
return out;
}
char *grpc_transport_op_string(grpc_transport_op *op) {
char *tmp;
char *out;
int first = 1;
gpr_strvec b;
gpr_strvec_init(&b);
if (op->send_ops) {
if (!first) gpr_strvec_add(&b, gpr_strdup(" "));
first = 0;
gpr_strvec_add(&b, gpr_strdup("SEND"));
if (op->is_last_send) {
gpr_strvec_add(&b, gpr_strdup("_LAST"));
}
gpr_strvec_add(&b, gpr_strdup("["));
gpr_strvec_add(&b, grpc_sopb_string(op->send_ops));
gpr_strvec_add(&b, gpr_strdup("]"));
}
if (op->recv_ops) {
if (!first) gpr_strvec_add(&b, gpr_strdup(" "));
first = 0;
gpr_strvec_add(&b, gpr_strdup("RECV"));
}
if (op->bind_pollset) {
gpr_strvec_add(&b, gpr_strdup("bind_pollset"));
if (!first) gpr_strvec_add(&b, gpr_strdup(" "));
first = 0;
gpr_strvec_add(&b, gpr_strdup("BIND"));
}
if (op->cancel_with_status != GRPC_STATUS_OK) {
if (!first) gpr_strvec_add(&b, gpr_strdup(" "));
first = 0;
gpr_asprintf(&tmp, "CANCEL:%d", op->cancel_with_status);
gpr_strvec_add(&b, tmp);
}
out = gpr_strvec_flatten(&b, NULL);
@ -134,8 +145,8 @@ char *grpc_call_op_string(grpc_call_op *op) {
}
void grpc_call_log_op(char *file, int line, gpr_log_severity severity,
grpc_call_element *elem, grpc_call_op *op) {
char *str = grpc_call_op_string(op);
grpc_call_element *elem, grpc_transport_op *op) {
char *str = grpc_transport_op_string(op);
gpr_log(file, line, severity, "OP[%s:%p]: %s", elem->filter->name, elem, str);
gpr_free(str);
}

@ -37,7 +37,6 @@
#include "src/core/channel/client_channel.h"
#include "src/core/channel/connected_channel.h"
#include "src/core/channel/http_filter.h"
#include "src/core/channel/http_client_filter.h"
#include "src/core/channel/http_server_filter.h"
#include "src/core/iomgr/endpoint_pair.h"
@ -60,8 +59,7 @@
static grpc_transport_setup_result server_setup_transport(
void *ts, grpc_transport *transport, grpc_mdctx *mdctx) {
grpc_end2end_test_fixture *f = ts;
static grpc_channel_filter const *extra_filters[] = {&grpc_http_server_filter,
&grpc_http_filter};
static grpc_channel_filter const *extra_filters[] = {&grpc_http_server_filter};
return grpc_server_setup_transport(f->server, transport, extra_filters,
GPR_ARRAY_SIZE(extra_filters), mdctx);
}
@ -76,7 +74,7 @@ static grpc_transport_setup_result client_setup_transport(
sp_client_setup *cs = ts;
const grpc_channel_filter *filters[] = {
&grpc_client_surface_filter, &grpc_http_client_filter, &grpc_http_filter,
&grpc_client_surface_filter, &grpc_http_client_filter,
&grpc_connected_channel_filter};
size_t nfilters = sizeof(filters) / sizeof(*filters);
grpc_channel *channel = grpc_channel_create_from_filters(

Loading…
Cancel
Save