Merge pull request #34 from ctiller/api

New C layer invoke API
pull/69/head
Yang Gao 10 years ago
commit ea36ba3285
  1. 11
      include/grpc/grpc.h
  2. 11
      src/core/surface/byte_buffer.c
  3. 184
      src/core/surface/call.c
  4. 15
      src/cpp/client/channel.cc
  5. 12
      src/cpp/stream/stream_context.cc
  6. 1
      src/cpp/stream/stream_context.h
  7. 7
      test/core/echo/client.c
  8. 11
      test/core/end2end/cq_verifier.c
  9. 1
      test/core/end2end/cq_verifier.h
  10. 10
      test/core/end2end/dualstack_socket_test.c
  11. 213
      test/core/end2end/dualstack_socket_test.c.orig
  12. 4
      test/core/end2end/no_server_test.c
  13. 4
      test/core/end2end/tests/cancel_after_accept.c
  14. 4
      test/core/end2end/tests/cancel_after_accept_and_writes_closed.c
  15. 4
      test/core/end2end/tests/cancel_after_invoke.c
  16. 3
      test/core/end2end/tests/cancel_before_invoke.c
  17. 4
      test/core/end2end/tests/census_simple_request.c
  18. 7
      test/core/end2end/tests/disappearing_server.c
  19. 4
      test/core/end2end/tests/early_server_shutdown_finishes_inflight_calls.c
  20. 4
      test/core/end2end/tests/invoke_large_request.c
  21. 35
      test/core/end2end/tests/max_concurrent_streams.c
  22. 3
      test/core/end2end/tests/ping_pong_streaming.c
  23. 4
      test/core/end2end/tests/request_response_with_binary_metadata_and_payload.c
  24. 4
      test/core/end2end/tests/request_response_with_metadata_and_payload.c
  25. 4
      test/core/end2end/tests/request_response_with_payload.c
  26. 4
      test/core/end2end/tests/request_response_with_trailing_metadata_and_payload.c
  27. 4
      test/core/end2end/tests/request_with_large_metadata.c
  28. 4
      test/core/end2end/tests/request_with_payload.c
  29. 6
      test/core/end2end/tests/simple_delayed_request.c
  30. 8
      test/core/end2end/tests/simple_request.c
  31. 40
      test/core/end2end/tests/thread_stress.c
  32. 4
      test/core/end2end/tests/writes_done_hangs_with_pending_read.c
  33. 11
      test/core/fling/client.c
  34. 4
      test/core/surface/lame_client_test.c
  35. 1
      third_party/libevent
  36. 47
      tools/run_tests/run_tests.py

@ -158,6 +158,7 @@ typedef struct grpc_byte_buffer grpc_byte_buffer;
/* Sample helpers to obtain byte buffers (these will certainly move place */
grpc_byte_buffer *grpc_byte_buffer_create(gpr_slice *slices, size_t nslices);
grpc_byte_buffer *grpc_byte_buffer_copy(grpc_byte_buffer *bb);
size_t grpc_byte_buffer_length(grpc_byte_buffer *bb);
void grpc_byte_buffer_destroy(grpc_byte_buffer *byte_buffer);
@ -312,18 +313,14 @@ grpc_call_error grpc_call_add_metadata(grpc_call *call, grpc_metadata *metadata,
flags is a bit-field combination of the write flags defined above.
REQUIRES: Can be called at most once per call.
Can only be called on the client.
Produces a GRPC_INVOKE_ACCEPTED event with invoke_accepted_tag when the
call has been invoked (meaning bytes can start flowing to the wire).
Produces a GRPC_CLIENT_METADATA_READ event with metadata_read_tag when
the servers initial metadata has been read.
Produces a GRPC_FINISHED event with finished_tag when the call has been
completed (there may be other events for the call pending at this
time) */
grpc_call_error grpc_call_start_invoke(grpc_call *call,
grpc_completion_queue *cq,
void *invoke_accepted_tag,
void *metadata_read_tag,
void *finished_tag, gpr_uint32 flags);
grpc_call_error grpc_call_invoke(grpc_call *call, grpc_completion_queue *cq,
void *metadata_read_tag, void *finished_tag,
gpr_uint32 flags);
/* DEPRECATED: users should use grpc_call_server_accept, and
grpc_call_server_end_initial_metadata instead now.

@ -49,6 +49,17 @@ grpc_byte_buffer *grpc_byte_buffer_create(gpr_slice *slices, size_t nslices) {
return bb;
}
grpc_byte_buffer *grpc_byte_buffer_copy(grpc_byte_buffer *bb) {
switch (bb->type) {
case GRPC_BB_SLICE_BUFFER:
return grpc_byte_buffer_create(bb->data.slice_buffer.slices,
bb->data.slice_buffer.count);
}
gpr_log(GPR_INFO, "should never get here");
abort();
return NULL;
}
void grpc_byte_buffer_destroy(grpc_byte_buffer *bb) {
switch (bb->type) {
case GRPC_BB_SLICE_BUFFER:

@ -173,11 +173,14 @@ struct grpc_call {
/* protects variables in this section */
gpr_mu read_mu;
gpr_uint8 received_start;
gpr_uint8 start_ok;
gpr_uint8 reads_done;
gpr_uint8 received_finish;
gpr_uint8 received_metadata;
gpr_uint8 have_read;
gpr_uint8 have_alarm;
gpr_uint8 pending_writes_done;
gpr_uint8 got_status_code;
/* The current outstanding read message tag (only valid if have_read == 1) */
void *read_tag;
@ -190,6 +193,8 @@ struct grpc_call {
/* The current outstanding send message/context/invoke/end tag (only valid if
have_write == 1) */
void *write_tag;
grpc_byte_buffer *pending_write;
gpr_uint32 pending_write_flags;
/* The final status of the call */
grpc_status_code status_code;
@ -227,11 +232,15 @@ grpc_call *grpc_call_create(grpc_channel *channel,
call->have_alarm = 0;
call->received_metadata = 0;
call->got_status_code = 0;
call->start_ok = 0;
call->status_code =
server_transport_data != NULL ? GRPC_STATUS_OK : GRPC_STATUS_UNKNOWN;
call->status_details = NULL;
call->received_finish = 0;
call->reads_done = 0;
call->received_start = 0;
call->pending_write = NULL;
call->pending_writes_done = 0;
grpc_metadata_buffer_init(&call->incoming_metadata);
gpr_ref_init(&call->internal_refcount, 1);
grpc_call_stack_init(channel_stack, server_transport_data,
@ -360,16 +369,6 @@ grpc_call_error grpc_call_add_metadata(grpc_call *call, grpc_metadata *metadata,
return GRPC_CALL_OK;
}
static void done_invoke(void *user_data, grpc_op_error error) {
grpc_call *call = user_data;
void *tag = call->write_tag;
GPR_ASSERT(call->have_write);
call->have_write = 0;
call->write_tag = INVALID_TAG;
grpc_cq_end_invoke_accepted(call->cq, tag, call, NULL, NULL, error);
}
static void finish_call(grpc_call *call) {
size_t count;
grpc_metadata *elements;
@ -384,11 +383,81 @@ static void finish_call(grpc_call *call) {
elements, count);
}
grpc_call_error grpc_call_start_invoke(grpc_call *call,
grpc_completion_queue *cq,
void *invoke_accepted_tag,
void *metadata_read_tag,
void *finished_tag, gpr_uint32 flags) {
static void done_write(void *user_data, grpc_op_error error) {
grpc_call *call = user_data;
void *tag = call->write_tag;
GPR_ASSERT(call->have_write);
call->have_write = 0;
call->write_tag = INVALID_TAG;
grpc_cq_end_write_accepted(call->cq, tag, call, NULL, NULL, error);
}
static void done_writes_done(void *user_data, grpc_op_error error) {
grpc_call *call = user_data;
void *tag = call->write_tag;
GPR_ASSERT(call->have_write);
call->have_write = 0;
call->write_tag = INVALID_TAG;
grpc_cq_end_finish_accepted(call->cq, tag, call, NULL, NULL, error);
}
static void call_started(void *user_data, grpc_op_error error) {
grpc_call *call = user_data;
grpc_call_element *elem;
grpc_byte_buffer *pending_write = NULL;
gpr_uint32 pending_write_flags = 0;
gpr_uint8 pending_writes_done = 0;
int ok;
grpc_call_op op;
gpr_mu_lock(&call->read_mu);
GPR_ASSERT(!call->received_start);
call->received_start = 1;
ok = call->start_ok = (error == GRPC_OP_OK);
pending_write = call->pending_write;
pending_write_flags = call->pending_write_flags;
pending_writes_done = call->pending_writes_done;
gpr_mu_unlock(&call->read_mu);
if (pending_write) {
if (ok) {
op.type = GRPC_SEND_MESSAGE;
op.dir = GRPC_CALL_DOWN;
op.flags = pending_write_flags;
op.done_cb = done_write;
op.user_data = call;
op.data.message = pending_write;
elem = CALL_ELEM_FROM_CALL(call, 0);
elem->filter->call_op(elem, NULL, &op);
} else {
done_write(call, error);
}
grpc_byte_buffer_destroy(pending_write);
}
if (pending_writes_done) {
if (ok) {
op.type = GRPC_SEND_FINISH;
op.dir = GRPC_CALL_DOWN;
op.flags = 0;
op.done_cb = done_writes_done;
op.user_data = call;
elem = CALL_ELEM_FROM_CALL(call, 0);
elem->filter->call_op(elem, NULL, &op);
} else {
done_writes_done(call, error);
}
}
grpc_call_internal_unref(call);
}
grpc_call_error grpc_call_invoke(grpc_call *call, grpc_completion_queue *cq,
void *metadata_read_tag, void *finished_tag,
gpr_uint32 flags) {
grpc_call_element *elem;
grpc_call_op op;
@ -420,7 +489,6 @@ grpc_call_error grpc_call_start_invoke(grpc_call *call,
/* inform the completion queue of an incoming operation */
grpc_cq_begin_op(cq, call, GRPC_FINISHED);
grpc_cq_begin_op(cq, call, GRPC_CLIENT_METADATA_READ);
grpc_cq_begin_op(cq, call, GRPC_INVOKE_ACCEPTED);
gpr_mu_lock(&call->read_mu);
@ -431,8 +499,6 @@ grpc_call_error grpc_call_start_invoke(grpc_call *call,
if (call->received_finish) {
/* handle early cancellation */
grpc_cq_end_invoke_accepted(call->cq, invoke_accepted_tag, call, NULL, NULL,
GRPC_OP_ERROR);
grpc_cq_end_client_metadata_read(call->cq, metadata_read_tag, call, NULL,
NULL, 0, NULL);
finish_call(call);
@ -442,20 +508,18 @@ grpc_call_error grpc_call_start_invoke(grpc_call *call,
return GRPC_CALL_OK;
}
call->write_tag = invoke_accepted_tag;
call->metadata_tag = metadata_read_tag;
call->have_write = 1;
gpr_mu_unlock(&call->read_mu);
/* call down the filter stack */
op.type = GRPC_SEND_START;
op.dir = GRPC_CALL_DOWN;
op.flags = flags;
op.done_cb = done_invoke;
op.done_cb = call_started;
op.data.start.pollset = grpc_cq_pollset(cq);
op.user_data = call;
grpc_call_internal_ref(call);
elem = CALL_ELEM_FROM_CALL(call, 0);
elem->filter->call_op(elem, NULL, &op);
@ -486,6 +550,7 @@ grpc_call_error grpc_call_server_accept(grpc_call *call,
call->state = CALL_BOUNDCQ;
call->cq = cq;
call->finished_tag = finished_tag;
call->received_start = 1;
if (prq_is_empty(&call->prq) && call->received_finish) {
finish_call(call);
@ -546,26 +611,6 @@ grpc_call_error grpc_call_accept(grpc_call *call, grpc_completion_queue *cq,
return GRPC_CALL_OK;
}
static void done_writes_done(void *user_data, grpc_op_error error) {
grpc_call *call = user_data;
void *tag = call->write_tag;
GPR_ASSERT(call->have_write);
call->have_write = 0;
call->write_tag = INVALID_TAG;
grpc_cq_end_finish_accepted(call->cq, tag, call, NULL, NULL, error);
}
static void done_write(void *user_data, grpc_op_error error) {
grpc_call *call = user_data;
void *tag = call->write_tag;
GPR_ASSERT(call->have_write);
call->have_write = 0;
call->write_tag = INVALID_TAG;
grpc_cq_end_write_accepted(call->cq, tag, call, NULL, NULL, error);
}
void grpc_call_client_initial_metadata_complete(
grpc_call_element *surface_element) {
grpc_call *call = grpc_call_from_top_element(surface_element);
@ -628,7 +673,7 @@ grpc_call_error grpc_call_start_read(grpc_call *call, void *tag) {
} else {
call->read_tag = tag;
call->have_read = 1;
request_more = 1;
request_more = call->received_start;
}
} else if (prq_is_empty(&call->prq) && call->received_finish) {
finish_call(call);
@ -665,8 +710,6 @@ grpc_call_error grpc_call_start_write(grpc_call *call,
grpc_cq_begin_op(call->cq, call, GRPC_WRITE_ACCEPTED);
/* for now we do no buffering, so a NULL byte_buffer can have no impact
on our behavior -- succeed immediately */
/* TODO(ctiller): if flags & GRPC_WRITE_BUFFER_HINT == 0, this indicates a
flush, and that flush should be propogated down from here */
if (byte_buffer == NULL) {
@ -677,15 +720,25 @@ grpc_call_error grpc_call_start_write(grpc_call *call,
call->write_tag = tag;
call->have_write = 1;
op.type = GRPC_SEND_MESSAGE;
op.dir = GRPC_CALL_DOWN;
op.flags = flags;
op.done_cb = done_write;
op.user_data = call;
op.data.message = byte_buffer;
gpr_mu_lock(&call->read_mu);
if (!call->received_start) {
call->pending_write = grpc_byte_buffer_copy(byte_buffer);
call->pending_write_flags = flags;
elem = CALL_ELEM_FROM_CALL(call, 0);
elem->filter->call_op(elem, NULL, &op);
gpr_mu_unlock(&call->read_mu);
} else {
gpr_mu_unlock(&call->read_mu);
op.type = GRPC_SEND_MESSAGE;
op.dir = GRPC_CALL_DOWN;
op.flags = flags;
op.done_cb = done_write;
op.user_data = call;
op.data.message = byte_buffer;
elem = CALL_ELEM_FROM_CALL(call, 0);
elem->filter->call_op(elem, NULL, &op);
}
return GRPC_CALL_OK;
}
@ -717,14 +770,23 @@ grpc_call_error grpc_call_writes_done(grpc_call *call, void *tag) {
call->write_tag = tag;
call->have_write = 1;
op.type = GRPC_SEND_FINISH;
op.dir = GRPC_CALL_DOWN;
op.flags = 0;
op.done_cb = done_writes_done;
op.user_data = call;
gpr_mu_lock(&call->read_mu);
if (!call->received_start) {
call->pending_writes_done = 1;
elem = CALL_ELEM_FROM_CALL(call, 0);
elem->filter->call_op(elem, NULL, &op);
gpr_mu_unlock(&call->read_mu);
} else {
gpr_mu_unlock(&call->read_mu);
op.type = GRPC_SEND_FINISH;
op.dir = GRPC_CALL_DOWN;
op.flags = 0;
op.done_cb = done_writes_done;
op.user_data = call;
elem = CALL_ELEM_FROM_CALL(call, 0);
elem->filter->call_op(elem, NULL, &op);
}
return GRPC_CALL_OK;
}
@ -829,6 +891,8 @@ void grpc_call_recv_metadata(grpc_call_element *elem, grpc_call_op *op) {
grpc_call *call = CALL_FROM_TOP_ELEM(elem);
grpc_mdelem *md = op->data.metadata;
grpc_mdstr *key = md->key;
gpr_log(GPR_DEBUG, "call %p got metadata %s %s", call,
grpc_mdstr_as_c_string(md->key), grpc_mdstr_as_c_string(md->value));
if (key == grpc_channel_get_status_string(call->channel)) {
maybe_set_status_code(call, decode_status(md));
grpc_mdelem_unref(md);

@ -104,7 +104,6 @@ Status Channel::StartBlockingRpc(const RpcMethod& method,
context->set_call(call);
grpc_event* ev;
void* finished_tag = reinterpret_cast<char*>(call);
void* invoke_tag = reinterpret_cast<char*>(call) + 1;
void* metadata_read_tag = reinterpret_cast<char*>(call) + 2;
void* write_tag = reinterpret_cast<char*>(call) + 3;
void* halfclose_tag = reinterpret_cast<char*>(call) + 4;
@ -115,19 +114,11 @@ Status Channel::StartBlockingRpc(const RpcMethod& method,
// add_metadata from context
//
// invoke
GPR_ASSERT(grpc_call_start_invoke(call, cq, invoke_tag, metadata_read_tag,
finished_tag,
GRPC_WRITE_BUFFER_HINT) == GRPC_CALL_OK);
ev = grpc_completion_queue_pluck(cq, invoke_tag, gpr_inf_future);
bool success = ev->data.invoke_accepted == GRPC_OP_OK;
grpc_event_finish(ev);
if (!success) {
GetFinalStatus(cq, finished_tag, &status);
return status;
}
GPR_ASSERT(grpc_call_invoke(call, cq, metadata_read_tag, finished_tag,
GRPC_WRITE_BUFFER_HINT) == GRPC_CALL_OK);
// write request
grpc_byte_buffer* write_buffer = nullptr;
success = SerializeProto(request, &write_buffer);
bool success = SerializeProto(request, &write_buffer);
if (!success) {
grpc_call_cancel(call);
status =

@ -80,17 +80,9 @@ void StreamContext::Start(bool buffered) {
if (is_client_) {
// TODO(yangg) handle metadata send path
int flag = buffered ? GRPC_WRITE_BUFFER_HINT : 0;
grpc_call_error error = grpc_call_start_invoke(call(), cq(), invoke_tag(),
client_metadata_read_tag(),
finished_tag(), flag);
grpc_call_error error = grpc_call_invoke(
call(), cq(), client_metadata_read_tag(), finished_tag(), flag);
GPR_ASSERT(GRPC_CALL_OK == error);
grpc_event* invoke_ev =
grpc_completion_queue_pluck(cq(), invoke_tag(), gpr_inf_future);
if (invoke_ev->data.invoke_accepted != GRPC_OP_OK) {
peer_halfclosed_ = true;
self_halfclosed_ = true;
}
grpc_event_finish(invoke_ev);
} else {
// TODO(yangg) metadata needs to be added before accept
// TODO(yangg) correctly set flag to accept

@ -76,7 +76,6 @@ class StreamContext final : public StreamContextInterface {
void* read_tag() { return reinterpret_cast<char*>(this) + 1; }
void* write_tag() { return reinterpret_cast<char*>(this) + 2; }
void* halfclose_tag() { return reinterpret_cast<char*>(this) + 3; }
void* invoke_tag() { return reinterpret_cast<char*>(this) + 4; }
void* client_metadata_read_tag() { return reinterpret_cast<char*>(this) + 5; }
grpc_call* call() { return call_; }
grpc_completion_queue* cq() { return cq_; }

@ -79,11 +79,8 @@ int main(int argc, char **argv) {
GPR_ASSERT(argc == 2);
channel = grpc_channel_create(argv[1], NULL);
call = grpc_channel_create_call(channel, "/foo", "localhost", gpr_inf_future);
GPR_ASSERT(grpc_call_start_invoke(call, cq, (void *)1, (void *)1, (void *)1,
0) == GRPC_CALL_OK);
ev = grpc_completion_queue_next(cq, gpr_inf_future);
GPR_ASSERT(ev->data.invoke_accepted == GRPC_OP_OK);
grpc_event_finish(ev);
GPR_ASSERT(grpc_call_invoke(call, cq, (void *)1, (void *)1, 0) ==
GRPC_CALL_OK);
start_write_next_slice(call, bytes_written, WRITE_SLICE_LENGTH);
bytes_written += WRITE_SLICE_LENGTH;

@ -70,7 +70,6 @@ typedef struct expectation {
union {
grpc_op_error finish_accepted;
grpc_op_error write_accepted;
grpc_op_error invoke_accepted;
struct {
const char *method;
const char *host;
@ -182,7 +181,7 @@ static void verify_matches(expectation *e, grpc_event *ev) {
GPR_ASSERT(e->data.write_accepted == ev->data.write_accepted);
break;
case GRPC_INVOKE_ACCEPTED:
GPR_ASSERT(e->data.invoke_accepted == ev->data.invoke_accepted);
abort();
break;
case GRPC_SERVER_RPC_NEW:
GPR_ASSERT(string_equivalent(e->data.server_rpc_new.method,
@ -268,8 +267,7 @@ static size_t expectation_to_string(char *out, expectation *e) {
return sprintf(out, "GRPC_WRITE_ACCEPTED result=%d",
e->data.write_accepted);
case GRPC_INVOKE_ACCEPTED:
return sprintf(out, "GRPC_INVOKE_ACCEPTED result=%d",
e->data.invoke_accepted);
return sprintf(out, "GRPC_INVOKE_ACCEPTED");
case GRPC_SERVER_RPC_NEW:
timeout = gpr_time_sub(e->data.server_rpc_new.deadline, gpr_now());
return sprintf(out, "GRPC_SERVER_RPC_NEW method=%s host=%s timeout=%fsec",
@ -414,11 +412,6 @@ static metadata *metadata_from_args(va_list args) {
}
}
void cq_expect_invoke_accepted(cq_verifier *v, void *tag,
grpc_op_error result) {
add(v, GRPC_INVOKE_ACCEPTED, tag)->data.invoke_accepted = result;
}
void cq_expect_write_accepted(cq_verifier *v, void *tag, grpc_op_error result) {
add(v, GRPC_WRITE_ACCEPTED, tag)->data.write_accepted = result;
}

@ -56,7 +56,6 @@ void cq_verify_empty(cq_verifier *v);
Any functions taking ... expect a NULL terminated list of key/value pairs
(each pair using two parameter slots) of metadata that MUST be present in
the event. */
void cq_expect_invoke_accepted(cq_verifier *v, void *tag, grpc_op_error result);
void cq_expect_write_accepted(cq_verifier *v, void *tag, grpc_op_error result);
void cq_expect_finish_accepted(cq_verifier *v, void *tag, grpc_op_error result);
void cq_expect_read(cq_verifier *v, void *tag, gpr_slice bytes);

@ -115,14 +115,10 @@ void test_connect(const char *server_host, const char *client_host, int port,
c = grpc_channel_create_call(client, "/foo", "test.google.com", deadline);
GPR_ASSERT(c);
GPR_ASSERT(GRPC_CALL_OK ==
grpc_call_start_invoke(c, client_cq, tag(1), tag(2), tag(3), 0));
GPR_ASSERT(GRPC_CALL_OK == grpc_call_invoke(c, client_cq, tag(2), tag(3), 0));
GPR_ASSERT(GRPC_CALL_OK == grpc_call_writes_done(c, tag(4)));
if (expect_ok) {
/* Check for a successful request. */
cq_expect_invoke_accepted(v_client, tag(1), GRPC_OP_OK);
cq_verify(v_client);
GPR_ASSERT(GRPC_CALL_OK == grpc_call_writes_done(c, tag(4)));
cq_expect_finish_accepted(v_client, tag(4), GRPC_OP_OK);
cq_verify(v_client);
@ -151,10 +147,10 @@ void test_connect(const char *server_host, const char *client_host, int port,
grpc_call_destroy(s);
} else {
/* Check for a failed connection. */
cq_expect_invoke_accepted(v_client, tag(1), GRPC_OP_ERROR);
cq_expect_client_metadata_read(v_client, tag(2), NULL);
cq_expect_finished_with_status(v_client, tag(3), GRPC_STATUS_CANCELLED,
NULL, NULL);
cq_expect_finish_accepted(v_client, tag(4), GRPC_OP_ERROR);
cq_verify(v_client);
grpc_call_destroy(c);

@ -0,0 +1,213 @@
/*
*
* Copyright 2014, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include "src/core/iomgr/socket_utils_posix.h"
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/host_port.h>
#include <grpc/support/log.h>
#include "test/core/end2end/cq_verifier.h"
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
/* This test exercises IPv4, IPv6, and dualstack sockets in various ways. */
static void *tag(gpr_intptr i) { return (void *)i; }
static gpr_timespec ms_from_now(int ms) {
return gpr_time_add(gpr_now(), gpr_time_from_micros(GPR_US_PER_MS * ms));
}
static void drain_cq(grpc_completion_queue *cq) {
grpc_event *ev;
grpc_completion_type type;
do {
ev = grpc_completion_queue_next(cq, ms_from_now(5000));
GPR_ASSERT(ev);
type = ev->type;
grpc_event_finish(ev);
gpr_log(GPR_INFO, "Drained event type %d", type);
} while (type != GRPC_QUEUE_SHUTDOWN);
}
void test_connect(const char *server_host, const char *client_host, int port,
int expect_ok) {
char *client_hostport;
char *server_hostport;
grpc_channel *client;
grpc_server *server;
grpc_completion_queue *client_cq;
grpc_completion_queue *server_cq;
grpc_call *c;
grpc_call *s;
cq_verifier *v_client;
cq_verifier *v_server;
gpr_timespec deadline;
gpr_join_host_port(&client_hostport, client_host, port);
gpr_join_host_port(&server_hostport, server_host, port);
gpr_log(GPR_INFO, "Testing with server=%s client=%s (expecting %s)",
server_hostport, client_hostport, expect_ok ? "success" : "failure");
/* Create server. */
server_cq = grpc_completion_queue_create();
server = grpc_server_create(server_cq, NULL);
GPR_ASSERT(grpc_server_add_http2_port(server, server_hostport));
grpc_server_start(server);
gpr_free(server_hostport);
v_server = cq_verifier_create(server_cq);
/* Create client. */
client_cq = grpc_completion_queue_create();
client = grpc_channel_create(client_hostport, NULL);
gpr_free(client_hostport);
v_client = cq_verifier_create(client_cq);
if (expect_ok) {
/* Normal deadline, shouldn't be reached. */
deadline = ms_from_now(60000);
} else {
/* Give up faster when failure is expected.
BUG: Setting this to 1000 reveals a memory leak (b/18608927). */
deadline = ms_from_now(1500);
}
/* Send a trivial request. */
c = grpc_channel_create_call(client, "/foo", "test.google.com", deadline);
GPR_ASSERT(c);
GPR_ASSERT(GRPC_CALL_OK ==
grpc_call_start_invoke(c, client_cq, tag(1), tag(2), tag(3), 0));
if (expect_ok) {
/* Check for a successful request. */
cq_expect_invoke_accepted(v_client, tag(1), GRPC_OP_OK);
cq_verify(v_client);
GPR_ASSERT(GRPC_CALL_OK == grpc_call_writes_done(c, tag(4)));
cq_expect_finish_accepted(v_client, tag(4), GRPC_OP_OK);
cq_verify(v_client);
GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(server, tag(100)));
cq_expect_server_rpc_new(v_server, &s, tag(100), "/foo", "test.google.com",
deadline, NULL);
cq_verify(v_server);
GPR_ASSERT(GRPC_CALL_OK == grpc_call_accept(s, server_cq, tag(102), 0));
cq_expect_client_metadata_read(v_client, tag(2), NULL);
cq_verify(v_client);
GPR_ASSERT(GRPC_CALL_OK ==
grpc_call_start_write_status(s, GRPC_STATUS_UNIMPLEMENTED, "xyz",
tag(5)));
cq_expect_finished_with_status(v_client, tag(3), GRPC_STATUS_UNIMPLEMENTED,
"xyz", NULL);
cq_verify(v_client);
cq_expect_finish_accepted(v_server, tag(5), GRPC_OP_OK);
cq_verify(v_server);
cq_expect_finished(v_server, tag(102), NULL);
cq_verify(v_server);
grpc_call_destroy(c);
grpc_call_destroy(s);
} else {
/* Check for a failed connection. */
cq_expect_invoke_accepted(v_client, tag(1), GRPC_OP_ERROR);
cq_expect_client_metadata_read(v_client, tag(2), NULL);
cq_expect_finished_with_status(v_client, tag(3), GRPC_STATUS_CANCELLED,
NULL, NULL);
cq_verify(v_client);
grpc_call_destroy(c);
}
cq_verifier_destroy(v_client);
cq_verifier_destroy(v_server);
/* Destroy client. */
grpc_channel_destroy(client);
grpc_completion_queue_shutdown(client_cq);
drain_cq(client_cq);
grpc_completion_queue_destroy(client_cq);
/* Destroy server. */
grpc_server_shutdown(server);
grpc_server_destroy(server);
grpc_completion_queue_shutdown(server_cq);
drain_cq(server_cq);
grpc_completion_queue_destroy(server_cq);
}
int main(int argc, char **argv) {
int do_ipv6 = 1;
int i;
int port = grpc_pick_unused_port_or_die();
grpc_test_init(argc, argv);
grpc_init();
if (!grpc_ipv6_loopback_available()) {
gpr_log(GPR_INFO, "Can't bind to ::1. Skipping IPv6 tests.");
do_ipv6 = 0;
}
for (i = 0; i <= 1; i++) {
/* For coverage, test with and without dualstack sockets. */
grpc_forbid_dualstack_sockets_for_testing = i;
/* :: and 0.0.0.0 are handled identically. */
test_connect("::", "127.0.0.1", port, 1);
test_connect("::", "::ffff:127.0.0.1", port, 1);
test_connect("::", "localhost", port, 1);
test_connect("0.0.0.0", "127.0.0.1", port, 1);
test_connect("0.0.0.0", "::ffff:127.0.0.1", port, 1);
test_connect("0.0.0.0", "localhost", port, 1);
if (do_ipv6) {
test_connect("::", "::1", port, 1);
test_connect("0.0.0.0", "::1", port, 1);
}
/* These only work when the families agree. */
test_connect("127.0.0.1", "127.0.0.1", port, 1);
if (do_ipv6) {
test_connect("::1", "::1", port, 1);
test_connect("::1", "127.0.0.1", port, 0);
test_connect("127.0.0.1", "::1", port, 0);
}
}
grpc_shutdown();
return 0;
}

@ -57,10 +57,8 @@ int main(int argc, char **argv) {
/* create a call, channel to a non existant server */
chan = grpc_channel_create("nonexistant:54321", NULL);
call = grpc_channel_create_call(chan, "/foo", "nonexistant", deadline);
GPR_ASSERT(grpc_call_start_invoke(call, cq, tag(1), tag(2), tag(3), 0) ==
GRPC_CALL_OK);
GPR_ASSERT(grpc_call_invoke(call, cq, tag(2), tag(3), 0) == GRPC_CALL_OK);
/* verify that all tags get completed */
cq_expect_invoke_accepted(cqv, tag(1), GRPC_OP_ERROR);
cq_expect_client_metadata_read(cqv, tag(2), NULL);
cq_expect_finished_with_status(cqv, tag(3), GRPC_STATUS_CANCELLED, NULL,
NULL);

@ -124,9 +124,7 @@ static void test_cancel_after_accept(grpc_end2end_test_config config,
GPR_ASSERT(c);
GPR_ASSERT(GRPC_CALL_OK ==
grpc_call_start_invoke(c, f.client_cq, tag(1), tag(2), tag(3), 0));
cq_expect_invoke_accepted(v_client, tag(1), GRPC_OP_OK);
cq_verify(v_client);
grpc_call_invoke(c, f.client_cq, tag(2), tag(3), 0));
GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, tag(100)));
cq_expect_server_rpc_new(v_server, &s, tag(100), "/foo", "test.google.com",

@ -124,9 +124,7 @@ static void test_cancel_after_accept_and_writes_closed(
GPR_ASSERT(c);
GPR_ASSERT(GRPC_CALL_OK ==
grpc_call_start_invoke(c, f.client_cq, tag(1), tag(2), tag(3), 0));
cq_expect_invoke_accepted(v_client, tag(1), GRPC_OP_OK);
cq_verify(v_client);
grpc_call_invoke(c, f.client_cq, tag(2), tag(3), 0));
GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, tag(100)));
cq_expect_server_rpc_new(v_server, &s, tag(100), "/foo", "test.google.com",

@ -122,9 +122,7 @@ static void test_cancel_after_invoke(grpc_end2end_test_config config,
GPR_ASSERT(c);
GPR_ASSERT(GRPC_CALL_OK ==
grpc_call_start_invoke(c, f.client_cq, tag(1), tag(2), tag(3), 0));
cq_expect_invoke_accepted(v_client, tag(1), GRPC_OP_OK);
cq_verify(v_client);
grpc_call_invoke(c, f.client_cq, tag(2), tag(3), 0));
GPR_ASSERT(GRPC_CALL_OK == call_cancel(c));

@ -119,8 +119,7 @@ static void test_cancel_before_invoke(grpc_end2end_test_config config) {
GPR_ASSERT(GRPC_CALL_OK == grpc_call_cancel(c));
GPR_ASSERT(GRPC_CALL_OK ==
grpc_call_start_invoke(c, f.client_cq, tag(1), tag(2), tag(3), 0));
cq_expect_invoke_accepted(v_client, tag(1), GRPC_OP_ERROR);
grpc_call_invoke(c, f.client_cq, tag(2), tag(3), 0));
cq_expect_client_metadata_read(v_client, tag(2), NULL);
cq_expect_finished_with_status(v_client, tag(3), GRPC_STATUS_CANCELLED, NULL,
NULL);

@ -109,9 +109,7 @@ static void test_body(grpc_end2end_test_fixture f) {
GPR_ASSERT(c);
tag(1);
GPR_ASSERT(GRPC_CALL_OK ==
grpc_call_start_invoke(c, f.client_cq, tag(1), tag(2), tag(3), 0));
cq_expect_invoke_accepted(v_client, tag(1), GRPC_OP_OK);
cq_verify(v_client);
grpc_call_invoke(c, f.client_cq, tag(2), tag(3), 0));
GPR_ASSERT(GRPC_CALL_OK == grpc_call_writes_done(c, tag(4)));
cq_expect_finish_accepted(v_client, tag(4), GRPC_OP_OK);

@ -100,11 +100,8 @@ static void do_request_and_shutdown_server(grpc_end2end_test_fixture *f,
c = grpc_channel_create_call(f->client, "/foo", "test.google.com", deadline);
GPR_ASSERT(c);
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_invoke(c, f->client_cq, tag(1),
tag(2), tag(3), 0));
cq_expect_invoke_accepted(v_client, tag(1), GRPC_OP_OK);
cq_verify(v_client);
GPR_ASSERT(GRPC_CALL_OK ==
grpc_call_invoke(c, f->client_cq, tag(2), tag(3), 0));
GPR_ASSERT(GRPC_CALL_OK == grpc_call_writes_done(c, tag(4)));
cq_expect_finish_accepted(v_client, tag(4), GRPC_OP_OK);

@ -115,9 +115,7 @@ static void test_early_server_shutdown_finishes_inflight_calls(
GPR_ASSERT(c);
GPR_ASSERT(GRPC_CALL_OK ==
grpc_call_start_invoke(c, f.client_cq, tag(1), tag(2), tag(3), 0));
cq_expect_invoke_accepted(v_client, tag(1), GRPC_OP_OK);
cq_verify(v_client);
grpc_call_invoke(c, f.client_cq, tag(2), tag(3), 0));
GPR_ASSERT(GRPC_CALL_OK == grpc_call_writes_done(c, tag(4)));
cq_expect_finish_accepted(v_client, tag(4), GRPC_OP_OK);

@ -126,9 +126,7 @@ static void test_invoke_large_request(grpc_end2end_test_config config) {
GPR_ASSERT(c);
GPR_ASSERT(GRPC_CALL_OK ==
grpc_call_start_invoke(c, f.client_cq, tag(1), tag(2), tag(3), 0));
cq_expect_invoke_accepted(v_client, tag(1), GRPC_OP_OK);
cq_verify(v_client);
grpc_call_invoke(c, f.client_cq, tag(2), tag(3), 0));
GPR_ASSERT(GRPC_CALL_OK ==
grpc_call_start_write(c, request_payload, tag(4), 0));

@ -113,9 +113,7 @@ static void simple_request_body(grpc_end2end_test_fixture f) {
GPR_ASSERT(c);
GPR_ASSERT(GRPC_CALL_OK ==
grpc_call_start_invoke(c, f.client_cq, tag(1), tag(2), tag(3), 0));
cq_expect_invoke_accepted(v_client, tag(1), GRPC_OP_OK);
cq_verify(v_client);
grpc_call_invoke(c, f.client_cq, tag(2), tag(3), 0));
GPR_ASSERT(GRPC_CALL_OK == grpc_call_writes_done(c, tag(4)));
cq_expect_finish_accepted(v_client, tag(4), GRPC_OP_OK);
@ -157,7 +155,6 @@ static void test_max_concurrent_streams(grpc_end2end_test_config config) {
grpc_call *s1;
grpc_call *s2;
int live_call;
grpc_call *live_call_obj;
gpr_timespec deadline;
cq_verifier *v_client;
cq_verifier *v_server;
@ -191,26 +188,24 @@ static void test_max_concurrent_streams(grpc_end2end_test_config config) {
GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, tag(100)));
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_invoke(c1, f.client_cq, tag(300),
tag(301), tag(302), 0));
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_invoke(c2, f.client_cq, tag(400),
tag(401), tag(402), 0));
GPR_ASSERT(GRPC_CALL_OK ==
grpc_call_invoke(c1, f.client_cq, tag(301), tag(302), 0));
GPR_ASSERT(GRPC_CALL_OK ==
grpc_call_invoke(c2, f.client_cq, tag(401), tag(402), 0));
GPR_ASSERT(GRPC_CALL_OK == grpc_call_writes_done(c1, tag(303)));
GPR_ASSERT(GRPC_CALL_OK == grpc_call_writes_done(c2, tag(303)));
ev = grpc_completion_queue_next(
f.client_cq, gpr_time_add(gpr_now(), gpr_time_from_seconds(10)));
GPR_ASSERT(ev);
GPR_ASSERT(ev->type == GRPC_INVOKE_ACCEPTED);
GPR_ASSERT(ev->type == GRPC_FINISH_ACCEPTED);
GPR_ASSERT(ev->data.invoke_accepted == GRPC_OP_OK);
/* The /alpha or /beta calls started above could be invoked (but NOT both);
* check this here */
live_call = (int)(gpr_intptr)ev->tag;
live_call_obj = live_call == 300 ? c1 : c2;
/* We'll get tag 303 or 403, we want 300, 400 */
live_call = ((int)(gpr_intptr)ev->tag) - 3;
grpc_event_finish(ev);
GPR_ASSERT(GRPC_CALL_OK ==
grpc_call_writes_done(live_call_obj, tag(live_call + 3)));
cq_expect_finish_accepted(v_client, tag(live_call + 3), GRPC_OP_OK);
cq_verify(v_client);
cq_expect_server_rpc_new(v_server, &s1, tag(100),
live_call == 300 ? "/alpha" : "/beta",
"test.google.com", deadline, NULL);
@ -230,14 +225,8 @@ static void test_max_concurrent_streams(grpc_end2end_test_config config) {
/* first request is finished, we should be able to start the second */
cq_expect_finished_with_status(v_client, tag(live_call + 2),
GRPC_STATUS_UNIMPLEMENTED, "xyz", NULL);
live_call = (live_call == 300) ? 400 : 300;
live_call_obj = live_call == 300 ? c1 : c2;
cq_expect_invoke_accepted(v_client, tag(live_call), GRPC_OP_OK);
cq_verify(v_client);
GPR_ASSERT(GRPC_CALL_OK ==
grpc_call_writes_done(live_call_obj, tag(live_call + 3)));
cq_expect_finish_accepted(v_client, tag(live_call + 3), GRPC_OP_OK);
live_call = (live_call == 300) ? 400 : 300;
cq_verify(v_client);
GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, tag(200)));

@ -122,8 +122,7 @@ static void test_pingpong_streaming(grpc_end2end_test_config config,
GPR_ASSERT(c);
GPR_ASSERT(GRPC_CALL_OK ==
grpc_call_start_invoke(c, f.client_cq, tag(1), tag(2), tag(3), 0));
cq_expect_invoke_accepted(v_client, tag(1), GRPC_OP_OK);
grpc_call_invoke(c, f.client_cq, tag(2), tag(3), 0));
GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, tag(100)));

@ -145,9 +145,7 @@ static void test_request_response_with_metadata_and_payload(
GPR_ASSERT(GRPC_CALL_OK == grpc_call_add_metadata(c, &meta2, 0));
GPR_ASSERT(GRPC_CALL_OK ==
grpc_call_start_invoke(c, f.client_cq, tag(1), tag(2), tag(3), 0));
cq_expect_invoke_accepted(v_client, tag(1), GRPC_OP_OK);
cq_verify(v_client);
grpc_call_invoke(c, f.client_cq, tag(2), tag(3), 0));
GPR_ASSERT(GRPC_CALL_OK ==
grpc_call_start_write(c, request_payload, tag(4), 0));

@ -136,9 +136,7 @@ static void test_request_response_with_metadata_and_payload(
GPR_ASSERT(GRPC_CALL_OK == grpc_call_add_metadata(c, &meta2, 0));
GPR_ASSERT(GRPC_CALL_OK ==
grpc_call_start_invoke(c, f.client_cq, tag(1), tag(2), tag(3), 0));
cq_expect_invoke_accepted(v_client, tag(1), GRPC_OP_OK);
cq_verify(v_client);
grpc_call_invoke(c, f.client_cq, tag(2), tag(3), 0));
GPR_ASSERT(GRPC_CALL_OK ==
grpc_call_start_write(c, request_payload, tag(4), 0));

@ -125,9 +125,7 @@ static void request_response_with_payload(grpc_end2end_test_fixture f) {
GPR_ASSERT(c);
GPR_ASSERT(GRPC_CALL_OK ==
grpc_call_start_invoke(c, f.client_cq, tag(1), tag(2), tag(3), 0));
cq_expect_invoke_accepted(v_client, tag(1), GRPC_OP_OK);
cq_verify(v_client);
grpc_call_invoke(c, f.client_cq, tag(2), tag(3), 0));
GPR_ASSERT(GRPC_CALL_OK ==
grpc_call_start_write(c, request_payload, tag(4), 0));

@ -138,9 +138,7 @@ static void test_request_response_with_metadata_and_payload(
GPR_ASSERT(GRPC_CALL_OK == grpc_call_add_metadata(c, &meta2, 0));
GPR_ASSERT(GRPC_CALL_OK ==
grpc_call_start_invoke(c, f.client_cq, tag(1), tag(2), tag(3), 0));
cq_expect_invoke_accepted(v_client, tag(1), GRPC_OP_OK);
cq_verify(v_client);
grpc_call_invoke(c, f.client_cq, tag(2), tag(3), 0));
GPR_ASSERT(GRPC_CALL_OK ==
grpc_call_start_write(c, request_payload, tag(4), 0));

@ -128,9 +128,7 @@ static void test_request_with_large_metadata(grpc_end2end_test_config config) {
GPR_ASSERT(GRPC_CALL_OK == grpc_call_add_metadata(c, &meta, 0));
GPR_ASSERT(GRPC_CALL_OK ==
grpc_call_start_invoke(c, f.client_cq, tag(1), tag(2), tag(3), 0));
cq_expect_invoke_accepted(v_client, tag(1), GRPC_OP_OK);
cq_verify(v_client);
grpc_call_invoke(c, f.client_cq, tag(2), tag(3), 0));
cq_expect_server_rpc_new(v_server, &s, tag(100), "/foo", "test.google.com",
deadline, "key", meta.value, NULL);

@ -122,9 +122,7 @@ static void test_invoke_request_with_payload(grpc_end2end_test_config config) {
GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, tag(100)));
GPR_ASSERT(GRPC_CALL_OK ==
grpc_call_start_invoke(c, f.client_cq, tag(1), tag(2), tag(3), 0));
cq_expect_invoke_accepted(v_client, tag(1), GRPC_OP_OK);
cq_verify(v_client);
grpc_call_invoke(c, f.client_cq, tag(2), tag(3), 0));
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_write(c, payload, tag(4), 0));
/* destroy byte buffer early to ensure async code keeps track of its contents

@ -106,10 +106,8 @@ static void simple_delayed_request_body(grpc_end2end_test_config config,
c = grpc_channel_create_call(f->client, "/foo", "test.google.com", deadline);
GPR_ASSERT(c);
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_invoke(c, f->client_cq, tag(1),
tag(2), tag(3), 0));
gpr_sleep_until(gpr_time_add(gpr_now(), gpr_time_from_micros(delay_us)));
cq_expect_invoke_accepted(v_client, tag(1), GRPC_OP_OK);
GPR_ASSERT(GRPC_CALL_OK ==
grpc_call_invoke(c, f->client_cq, tag(2), tag(3), 0));
config.init_server(f, server_args);

@ -113,9 +113,7 @@ static void simple_request_body(grpc_end2end_test_fixture f) {
GPR_ASSERT(c);
GPR_ASSERT(GRPC_CALL_OK ==
grpc_call_start_invoke(c, f.client_cq, tag(1), tag(2), tag(3), 0));
cq_expect_invoke_accepted(v_client, tag(1), GRPC_OP_OK);
cq_verify(v_client);
grpc_call_invoke(c, f.client_cq, tag(2), tag(3), 0));
GPR_ASSERT(GRPC_CALL_OK == grpc_call_writes_done(c, tag(4)));
cq_expect_finish_accepted(v_client, tag(4), GRPC_OP_OK);
@ -160,9 +158,7 @@ static void simple_request_body2(grpc_end2end_test_fixture f) {
GPR_ASSERT(c);
GPR_ASSERT(GRPC_CALL_OK ==
grpc_call_start_invoke(c, f.client_cq, tag(1), tag(2), tag(3), 0));
cq_expect_invoke_accepted(v_client, tag(1), GRPC_OP_OK);
cq_verify(v_client);
grpc_call_invoke(c, f.client_cq, tag(2), tag(3), 0));
GPR_ASSERT(GRPC_CALL_OK == grpc_call_writes_done(c, tag(4)));
cq_expect_finish_accepted(v_client, tag(4), GRPC_OP_OK);

@ -106,25 +106,30 @@ static void drain_cq(int client, grpc_completion_queue *cq) {
/* Kick off a new request - assumes g_mu taken */
static void start_request() {
gpr_slice slice = gpr_slice_malloc(100);
grpc_byte_buffer *buf;
grpc_call *call = grpc_channel_create_call(
g_fixture.client, "/Foo", "test.google.com", g_test_end_time);
memset(GPR_SLICE_START_PTR(slice), 1, GPR_SLICE_LENGTH(slice));
buf = grpc_byte_buffer_create(&slice, 1);
gpr_slice_unref(slice);
g_active_requests++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_invoke(call, g_fixture.client_cq,
NULL, NULL, NULL, 0));
GPR_ASSERT(GRPC_CALL_OK ==
grpc_call_invoke(call, g_fixture.client_cq, NULL, NULL, 0));
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_read(call, NULL));
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_write(call, buf, NULL, 0));
grpc_byte_buffer_destroy(buf);
}
/* Async client: handle sending requests, reading responses, and starting
new requests when old ones finish */
static void client_thread(void *p) {
int id = (gpr_intptr)p;
gpr_intptr id = (gpr_intptr)p;
grpc_event *ev;
gpr_slice slice = gpr_slice_malloc(100);
grpc_byte_buffer *buf;
char *estr;
memset(GPR_SLICE_START_PTR(slice), id, GPR_SLICE_LENGTH(slice));
buf = grpc_byte_buffer_create(&slice, 1);
gpr_slice_unref(slice);
for (;;) {
ev = grpc_completion_queue_next(g_fixture.client_cq, n_seconds_time(1));
@ -135,14 +140,6 @@ static void client_thread(void *p) {
gpr_log(GPR_ERROR, "unexpected event: %s", estr);
gpr_free(estr);
break;
case GRPC_INVOKE_ACCEPTED:
/* better not keep going if the invoke failed */
if (ev->data.invoke_accepted == GRPC_OP_OK) {
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_read(ev->call, NULL));
GPR_ASSERT(GRPC_CALL_OK ==
grpc_call_start_write(ev->call, buf, NULL, 0));
}
break;
case GRPC_READ:
break;
case GRPC_WRITE_ACCEPTED:
@ -173,7 +170,6 @@ static void client_thread(void *p) {
gpr_mu_unlock(&g_mu);
}
grpc_byte_buffer_destroy(buf);
gpr_event_set(&g_client_done[id], (void *)1);
}
@ -196,17 +192,17 @@ static void maybe_end_server_call(grpc_call *call, gpr_refcount *rc) {
static void server_thread(void *p) {
int id = (gpr_intptr)p;
grpc_event *ev;
gpr_slice slice = gpr_slice_malloc(100);
grpc_byte_buffer *buf;
grpc_event *ev;
char *estr;
memset(GPR_SLICE_START_PTR(slice), id, GPR_SLICE_LENGTH(slice));
request_server_call();
memset(GPR_SLICE_START_PTR(slice), 1, GPR_SLICE_LENGTH(slice));
buf = grpc_byte_buffer_create(&slice, 1);
gpr_slice_unref(slice);
request_server_call();
for (;;) {
ev = grpc_completion_queue_next(g_fixture.server_cq, n_seconds_time(1));
if (ev) {

@ -128,9 +128,7 @@ static void test_writes_done_hangs_with_pending_read(
GPR_ASSERT(c);
GPR_ASSERT(GRPC_CALL_OK ==
grpc_call_start_invoke(c, f.client_cq, tag(1), tag(2), tag(3), 0));
cq_expect_invoke_accepted(v_client, tag(1), GRPC_OP_OK);
cq_verify(v_client);
grpc_call_invoke(c, f.client_cq, tag(2), tag(3), 0));
GPR_ASSERT(GRPC_CALL_OK ==
grpc_call_start_write(c, request_payload, tag(4), 0));

@ -55,9 +55,8 @@ static void init_ping_pong_request() {}
static void step_ping_pong_request() {
call = grpc_channel_create_call(channel, "/Reflector/reflectUnary",
"localhost", gpr_inf_future);
GPR_ASSERT(grpc_call_start_invoke(call, cq, (void *)1, (void *)1, (void *)1,
GRPC_WRITE_BUFFER_HINT) == GRPC_CALL_OK);
grpc_event_finish(grpc_completion_queue_next(cq, gpr_inf_future));
GPR_ASSERT(grpc_call_invoke(call, cq, (void *)1, (void *)1,
GRPC_WRITE_BUFFER_HINT) == GRPC_CALL_OK);
GPR_ASSERT(grpc_call_start_write(call, the_buffer, (void *)1,
GRPC_WRITE_BUFFER_HINT) == GRPC_CALL_OK);
grpc_event_finish(grpc_completion_queue_next(cq, gpr_inf_future));
@ -66,7 +65,6 @@ static void step_ping_pong_request() {
grpc_event_finish(grpc_completion_queue_next(cq, gpr_inf_future));
grpc_event_finish(grpc_completion_queue_next(cq, gpr_inf_future));
grpc_event_finish(grpc_completion_queue_next(cq, gpr_inf_future));
grpc_event_finish(grpc_completion_queue_next(cq, gpr_inf_future));
grpc_call_destroy(call);
call = NULL;
}
@ -74,9 +72,8 @@ static void step_ping_pong_request() {
static void init_ping_pong_stream() {
call = grpc_channel_create_call(channel, "/Reflector/reflectStream",
"localhost", gpr_inf_future);
GPR_ASSERT(grpc_call_start_invoke(call, cq, (void *)1, (void *)1, (void *)1,
0) == GRPC_CALL_OK);
grpc_event_finish(grpc_completion_queue_next(cq, gpr_inf_future));
GPR_ASSERT(grpc_call_invoke(call, cq, (void *)1, (void *)1, 0) ==
GRPC_CALL_OK);
grpc_event_finish(grpc_completion_queue_next(cq, gpr_inf_future));
}

@ -62,11 +62,9 @@ int main(int argc, char **argv) {
GPR_ASSERT(GRPC_CALL_OK == grpc_call_add_metadata(call, &md, 0));
/* and invoke the call */
GPR_ASSERT(GRPC_CALL_OK ==
grpc_call_start_invoke(call, cq, tag(1), tag(2), tag(3), 0));
GPR_ASSERT(GRPC_CALL_OK == grpc_call_invoke(call, cq, tag(2), tag(3), 0));
/* the call should immediately fail */
cq_expect_invoke_accepted(cqv, tag(1), GRPC_OP_ERROR);
cq_expect_client_metadata_read(cqv, tag(2), NULL);
cq_expect_finished(cqv, tag(3), NULL);
cq_verify(cqv);

@ -0,0 +1 @@
Subproject commit f7d92c63928a1460f3d99b9bc418bd3b686a0dca

@ -15,6 +15,7 @@ import watch_dirs
class SimpleConfig(object):
def __init__(self, config):
self.build_config = config
self.maxjobs = 32 * multiprocessing.cpu_count()
def run_command(self, binary):
return [binary]
@ -22,22 +23,36 @@ class SimpleConfig(object):
# ValgrindConfig: compile with some CONFIG=config, but use valgrind to run
class ValgrindConfig(object):
def __init__(self, config):
def __init__(self, config, tool):
self.build_config = config
self.tool = tool
self.maxjobs = 4 * multiprocessing.cpu_count()
def run_command(self, binary):
return ['valgrind', binary]
return ['valgrind', binary, '--tool=%s' % self.tool]
# SanConfig: compile with CONFIG=config, filter out incompatible binaries
class SanConfig(object):
def __init__(self, config):
self.build_config = config
self.maxjobs = 16 * multiprocessing.cpu_count()
def run_command(self, binary):
if '_ssl_' in binary:
return None
return [binary]
# different configurations we can run under
_CONFIGS = {
'dbg': SimpleConfig('dbg'),
'opt': SimpleConfig('opt'),
'tsan': SimpleConfig('tsan'),
'msan': SimpleConfig('msan'),
'asan': SimpleConfig('asan'),
'tsan': SanConfig('tsan'),
'msan': SanConfig('msan'),
'asan': SanConfig('asan'),
'gcov': SimpleConfig('gcov'),
'valgrind': ValgrindConfig('dbg'),
'memcheck': ValgrindConfig('dbg', 'memcheck'),
'helgrind': ValgrindConfig('dbg', 'helgrind')
}
@ -81,14 +96,18 @@ def _build_and_run(check_cancelled):
return 1
# run all the tests
if not jobset.run((
config.run_command(x)
for config in run_configs
for filt in filters
for x in itertools.chain.from_iterable(itertools.repeat(
glob.glob('bins/%s/%s_test' % (
config.build_config, filt)),
runs_per_test))), check_cancelled):
if not jobset.run(
itertools.ifilter(
lambda x: x is not None, (
config.run_command(x)
for config in run_configs
for filt in filters
for x in itertools.chain.from_iterable(itertools.repeat(
glob.glob('bins/%s/%s_test' % (
config.build_config, filt)),
runs_per_test)))),
check_cancelled,
maxjobs=min(c.maxjobs for c in run_configs)):
return 2
return 0

Loading…
Cancel
Save