diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 7fcf6e2b04b..18be81308d8 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -283,6 +283,8 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq, call->send_deadline = send_deadline; grpc_channel_internal_ref(channel); call->metadata_context = grpc_channel_get_metadata_context(channel); + grpc_sopb_init(&call->send_ops); + grpc_sopb_init(&call->recv_ops); /* one ref is dropped in response to destroy, the other in stream_closed */ gpr_ref_init(&call->internal_refcount, 2); @@ -330,6 +332,8 @@ static void destroy_call(void *call, int ignored_success) { destroy_legacy_state(c->legacy_state); } grpc_bbq_destroy(&c->incoming_queue); + grpc_sopb_destroy(&c->send_ops); + grpc_sopb_destroy(&c->recv_ops); gpr_free(c); } @@ -1091,41 +1095,9 @@ static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) { grpc_mdctx_unlock(mdctx); } -#if 0 -void grpc_call_read_closed(grpc_call_element *elem) { - set_read_state(CALL_FROM_TOP_ELEM(elem), READ_STATE_READ_CLOSED); -} - -void grpc_call_stream_closed(grpc_call_element *elem) { - grpc_call *call = CALL_FROM_TOP_ELEM(elem); - set_read_state(call, READ_STATE_STREAM_CLOSED); - grpc_call_internal_unref(call, 0); -} - -void grpc_call_recv_message(grpc_call_element *elem, - grpc_byte_buffer *byte_buffer) { - grpc_call *call = CALL_FROM_TOP_ELEM(elem); - lock(call); - grpc_bbq_push(&call->incoming_queue, byte_buffer); - finish_read_ops(call); - unlock(call); -} - -void grpc_call_recv_synthetic_status(grpc_call_element *elem, - grpc_status_code status, - const char *message) { - grpc_call *call = CALL_FROM_TOP_ELEM(elem); - lock(call); - set_status_code(call, STATUS_FROM_CORE, status); - set_status_details(call, STATUS_FROM_CORE, - grpc_mdstr_from_string(call->metadata_context, message)); - unlock(call); -} - grpc_call_stack *grpc_call_get_call_stack(grpc_call *call) { return CALL_STACK_FROM_CALL(call); } -#endif /* * BATCH API IMPLEMENTATION diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c index f1d71afaf27..de2f354c782 100644 --- a/src/core/surface/channel.c +++ b/src/core/surface/channel.c @@ -242,3 +242,7 @@ grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel) { grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel) { return channel->grpc_message_string; } + +gpr_uint32 grpc_channel_get_max_message_length(grpc_channel *channel) { + return channel->max_message_length; +} \ No newline at end of file diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index fed30887896..9c2af560c13 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -1503,8 +1503,6 @@ static int is_window_update_legal(gpr_int64 window_update, gpr_int64 window) { return window + window_update < MAX_WINDOW; } -static void free_md(void *p, grpc_op_error result) { gpr_free(p); } - static void add_metadata_batch(transport *t, stream *s) { grpc_metadata_batch b; size_t i; @@ -1522,8 +1520,7 @@ static void add_metadata_batch(transport *t, stream *s) { s->incoming_metadata[s->incoming_metadata_count - 1].next = NULL; grpc_sopb_add_metadata(&s->parser.incoming_sopb, b); - grpc_sopb_add_flow_ctl_cb(&s->parser.incoming_sopb, free_md, - s->incoming_metadata); + /* TODO(ctiller): don't leak incoming_metadata */ /* reset */ s->incoming_deadline = gpr_inf_future; diff --git a/src/core/transport/stream_op.c b/src/core/transport/stream_op.c index 882c078d512..ea22b0e1c82 100644 --- a/src/core/transport/stream_op.c +++ b/src/core/transport/stream_op.c @@ -81,9 +81,6 @@ void grpc_stream_ops_unref_owned_objects(grpc_stream_op *ops, size_t nops) { case GRPC_OP_METADATA: grpc_metadata_batch_destroy(&ops[i].data.metadata); break; - case GRPC_OP_FLOW_CTL_CB: - ops[i].data.flow_ctl_cb.cb(ops[i].data.flow_ctl_cb.arg, GRPC_OP_ERROR); - break; case GRPC_NO_OP: case GRPC_OP_BEGIN_MESSAGE: break; @@ -119,6 +116,7 @@ static grpc_stream_op *add(grpc_stream_op_buffer *sopb) { assert_contained_metadata_ok(sopb->ops, sopb->nops); + GPR_ASSERT(sopb->nops <= sopb->capacity); if (sopb->nops == sopb->capacity) { expandto(sopb, GROW(sopb->capacity)); } @@ -158,16 +156,6 @@ void grpc_sopb_add_slice(grpc_stream_op_buffer *sopb, gpr_slice slice) { assert_contained_metadata_ok(sopb->ops, sopb->nops); } -void grpc_sopb_add_flow_ctl_cb(grpc_stream_op_buffer *sopb, - void (*cb)(void *arg, grpc_op_error error), - void *arg) { - grpc_stream_op *op = add(sopb); - op->type = GRPC_OP_FLOW_CTL_CB; - op->data.flow_ctl_cb.cb = cb; - op->data.flow_ctl_cb.arg = arg; - assert_contained_metadata_ok(sopb->ops, sopb->nops); -} - void grpc_sopb_append(grpc_stream_op_buffer *sopb, grpc_stream_op *ops, size_t nops) { size_t orig_nops = sopb->nops; diff --git a/src/core/transport/stream_op.h b/src/core/transport/stream_op.h index c3901bf6088..f5de64d583a 100644 --- a/src/core/transport/stream_op.h +++ b/src/core/transport/stream_op.h @@ -154,12 +154,10 @@ void grpc_sopb_add_begin_message(grpc_stream_op_buffer *sopb, gpr_uint32 length, void grpc_sopb_add_metadata(grpc_stream_op_buffer *sopb, grpc_metadata_batch metadata); /* Append a GRPC_SLICE to a buffer - does not ref/unref the slice */ void grpc_sopb_add_slice(grpc_stream_op_buffer *sopb, gpr_slice slice); -/* Append a GRPC_OP_FLOW_CTL_CB to a buffer */ -void grpc_sopb_add_flow_ctl_cb(grpc_stream_op_buffer *sopb, - void (*cb)(void *arg, grpc_op_error error), - void *arg); /* Append a buffer to a buffer - does not ref/unref any internal objects */ void grpc_sopb_append(grpc_stream_op_buffer *sopb, grpc_stream_op *ops, size_t nops); +char *grpc_sopb_string(grpc_stream_op_buffer *sopb); + #endif /* GRPC_INTERNAL_CORE_TRANSPORT_STREAM_OP_H */ diff --git a/src/core/transport/transport.c b/src/core/transport/transport.c index ef0020dc58b..35195348e70 100644 --- a/src/core/transport/transport.c +++ b/src/core/transport/transport.c @@ -56,14 +56,9 @@ int grpc_transport_init_stream(grpc_transport *transport, grpc_stream *stream, return transport->vtable->init_stream(transport, stream, server_data); } -void grpc_transport_send_batch(grpc_transport *transport, grpc_stream *stream, - grpc_stream_op *ops, size_t nops, int is_last) { - transport->vtable->send_batch(transport, stream, ops, nops, is_last); -} - -void grpc_transport_set_allow_window_updates(grpc_transport *transport, - grpc_stream *stream, int allow) { - transport->vtable->set_allow_window_updates(transport, stream, allow); +void grpc_transport_perform_op(grpc_transport *transport, grpc_stream *stream, + grpc_transport_op *op) { + transport->vtable->perform_op(transport, stream, op); } void grpc_transport_add_to_pollset(grpc_transport *transport, @@ -76,11 +71,6 @@ void grpc_transport_destroy_stream(grpc_transport *transport, transport->vtable->destroy_stream(transport, stream); } -void grpc_transport_abort_stream(grpc_transport *transport, grpc_stream *stream, - grpc_status_code status) { - transport->vtable->abort_stream(transport, stream, status); -} - void grpc_transport_ping(grpc_transport *transport, void (*cb)(void *user_data), void *user_data) { transport->vtable->ping(transport, cb, user_data); diff --git a/src/core/transport/transport_op_string.c b/src/core/transport/transport_op_string.c index 5f7e1be2682..e886690234f 100644 --- a/src/core/transport/transport_op_string.c +++ b/src/core/transport/transport_op_string.c @@ -66,65 +66,76 @@ static void put_metadata_list(gpr_strvec *b, grpc_metadata_batch md) { } } -char *grpc_call_op_string(grpc_call_op *op) { +char *grpc_sopb_string(grpc_stream_op_buffer *sopb) { + char *out; + char *tmp; + size_t i; + gpr_strvec b; + gpr_strvec_init(&b); + + for (i = 0; i < sopb->nops; i++) { + grpc_stream_op *op = &sopb->ops[i]; + if (i) gpr_strvec_add(&b, gpr_strdup(", ")); + switch (op->type) { + case GRPC_NO_OP: + gpr_strvec_add(&b, gpr_strdup("NO_OP")); + break; + case GRPC_OP_BEGIN_MESSAGE: + gpr_asprintf(&tmp, "BEGIN_MESSAGE:%d", op->data.begin_message.length); + gpr_strvec_add(&b, tmp); + break; + case GRPC_OP_SLICE: + gpr_asprintf(&tmp, "SLICE:%d", GPR_SLICE_LENGTH(op->data.slice)); + break; + case GRPC_OP_METADATA: + put_metadata_list(&b, op->data.metadata); + break; + } + } + + out = gpr_strvec_flatten(&b, NULL); + gpr_strvec_destroy(&b); + + return out; +} + +char *grpc_transport_op_string(grpc_transport_op *op) { char *tmp; char *out; + int first = 1; gpr_strvec b; gpr_strvec_init(&b); - switch (op->dir) { - case GRPC_CALL_DOWN: - gpr_strvec_add(&b, gpr_strdup(">")); - break; - case GRPC_CALL_UP: - gpr_strvec_add(&b, gpr_strdup("<")); - break; + if (op->send_ops) { + if (!first) gpr_strvec_add(&b, gpr_strdup(" ")); + first = 0; + gpr_strvec_add(&b, gpr_strdup("SEND")); + if (op->is_last_send) { + gpr_strvec_add(&b, gpr_strdup("_LAST")); + } + gpr_strvec_add(&b, gpr_strdup("[")); + gpr_strvec_add(&b, grpc_sopb_string(op->send_ops)); + gpr_strvec_add(&b, gpr_strdup("]")); } - switch (op->type) { - case GRPC_SEND_METADATA: - gpr_strvec_add(&b, gpr_strdup("SEND_METADATA")); - put_metadata_list(&b, op->data.metadata); - break; - case GRPC_SEND_MESSAGE: - gpr_strvec_add(&b, gpr_strdup("SEND_MESSAGE")); - break; - case GRPC_SEND_PREFORMATTED_MESSAGE: - gpr_strvec_add(&b, gpr_strdup("SEND_PREFORMATTED_MESSAGE")); - break; - case GRPC_SEND_FINISH: - gpr_strvec_add(&b, gpr_strdup("SEND_FINISH")); - break; - case GRPC_REQUEST_DATA: - gpr_strvec_add(&b, gpr_strdup("REQUEST_DATA")); - break; - case GRPC_RECV_METADATA: - gpr_strvec_add(&b, gpr_strdup("RECV_METADATA")); - put_metadata_list(&b, op->data.metadata); - break; - case GRPC_RECV_MESSAGE: - gpr_strvec_add(&b, gpr_strdup("RECV_MESSAGE")); - break; - case GRPC_RECV_HALF_CLOSE: - gpr_strvec_add(&b, gpr_strdup("RECV_HALF_CLOSE")); - break; - case GRPC_RECV_FINISH: - gpr_strvec_add(&b, gpr_strdup("RECV_FINISH")); - break; - case GRPC_RECV_SYNTHETIC_STATUS: - gpr_asprintf(&tmp, "RECV_SYNTHETIC_STATUS status=%d message='%s'", - op->data.synthetic_status.status, - op->data.synthetic_status.message); - gpr_strvec_add(&b, tmp); - break; - case GRPC_CANCEL_OP: - gpr_strvec_add(&b, gpr_strdup("CANCEL_OP")); - break; + + if (op->recv_ops) { + if (!first) gpr_strvec_add(&b, gpr_strdup(" ")); + first = 0; + gpr_strvec_add(&b, gpr_strdup("RECV")); } - gpr_asprintf(&tmp, " flags=0x%08x", op->flags); - gpr_strvec_add(&b, tmp); + if (op->bind_pollset) { - gpr_strvec_add(&b, gpr_strdup("bind_pollset")); + if (!first) gpr_strvec_add(&b, gpr_strdup(" ")); + first = 0; + gpr_strvec_add(&b, gpr_strdup("BIND")); + } + + if (op->cancel_with_status != GRPC_STATUS_OK) { + if (!first) gpr_strvec_add(&b, gpr_strdup(" ")); + first = 0; + gpr_asprintf(&tmp, "CANCEL:%d", op->cancel_with_status); + gpr_strvec_add(&b, tmp); } out = gpr_strvec_flatten(&b, NULL); @@ -134,8 +145,8 @@ char *grpc_call_op_string(grpc_call_op *op) { } void grpc_call_log_op(char *file, int line, gpr_log_severity severity, - grpc_call_element *elem, grpc_call_op *op) { - char *str = grpc_call_op_string(op); + grpc_call_element *elem, grpc_transport_op *op) { + char *str = grpc_transport_op_string(op); gpr_log(file, line, severity, "OP[%s:%p]: %s", elem->filter->name, elem, str); gpr_free(str); } diff --git a/test/core/end2end/fixtures/chttp2_socket_pair.c b/test/core/end2end/fixtures/chttp2_socket_pair.c index 1225f7db0c2..acc0a69708a 100644 --- a/test/core/end2end/fixtures/chttp2_socket_pair.c +++ b/test/core/end2end/fixtures/chttp2_socket_pair.c @@ -37,7 +37,6 @@ #include "src/core/channel/client_channel.h" #include "src/core/channel/connected_channel.h" -#include "src/core/channel/http_filter.h" #include "src/core/channel/http_client_filter.h" #include "src/core/channel/http_server_filter.h" #include "src/core/iomgr/endpoint_pair.h" @@ -60,8 +59,7 @@ static grpc_transport_setup_result server_setup_transport( void *ts, grpc_transport *transport, grpc_mdctx *mdctx) { grpc_end2end_test_fixture *f = ts; - static grpc_channel_filter const *extra_filters[] = {&grpc_http_server_filter, - &grpc_http_filter}; + static grpc_channel_filter const *extra_filters[] = {&grpc_http_server_filter}; return grpc_server_setup_transport(f->server, transport, extra_filters, GPR_ARRAY_SIZE(extra_filters), mdctx); } @@ -76,7 +74,7 @@ static grpc_transport_setup_result client_setup_transport( sp_client_setup *cs = ts; const grpc_channel_filter *filters[] = { - &grpc_client_surface_filter, &grpc_http_client_filter, &grpc_http_filter, + &grpc_client_surface_filter, &grpc_http_client_filter, &grpc_connected_channel_filter}; size_t nfilters = sizeof(filters) / sizeof(*filters); grpc_channel *channel = grpc_channel_create_from_filters(