Progress converting to new error system

pull/6897/head
Craig Tiller 9 years ago
parent 86df5a8521
commit 6a64bfd982
  1. 59
      src/core/lib/http/httpcli.c
  2. 35
      src/core/lib/http/parser.c
  3. 7
      src/core/lib/iomgr/tcp_posix.c
  4. 2
      test/core/http/httpcli_test.c
  5. 20
      test/core/http/parser_test.c

@ -39,12 +39,14 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/useful.h>
#include "src/core/lib/http/format_request.h"
#include "src/core/lib/http/parser.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/iomgr/tcp_client.h"
#include "src/core/lib/support/string.h"
@ -68,6 +70,7 @@ typedef struct {
grpc_closure on_read;
grpc_closure done_write;
grpc_closure connected;
grpc_error *overall_error;
} internal_request;
static grpc_httpcli_get_override g_get_override = NULL;
@ -92,7 +95,8 @@ void grpc_httpcli_context_destroy(grpc_httpcli_context *context) {
grpc_pollset_set_destroy(context->pollset_set);
}
static void next_address(grpc_exec_ctx *exec_ctx, internal_request *req);
static void next_address(grpc_exec_ctx *exec_ctx, internal_request *req,
grpc_error *due_to_error);
static void finish(grpc_exec_ctx *exec_ctx, internal_request *req,
grpc_error *error) {
@ -112,9 +116,22 @@ static void finish(grpc_exec_ctx *exec_ctx, internal_request *req,
grpc_iomgr_unregister_object(&req->iomgr_obj);
gpr_slice_buffer_destroy(&req->incoming);
gpr_slice_buffer_destroy(&req->outgoing);
GRPC_ERROR_UNREF(req->overall_error);
gpr_free(req);
}
static void append_error(internal_request *req, grpc_error *error) {
if (req->overall_error == GRPC_ERROR_NONE) {
req->overall_error = GRPC_ERROR_CREATE("Failed HTTP/1 client request");
}
grpc_resolved_address *addr = &req->addresses->addrs[req->next_address - 1];
char *addr_text = grpc_sockaddr_to_uri((struct sockaddr *)addr->addr);
req->overall_error = grpc_error_add_child(
req->overall_error,
grpc_error_set_str(error, GRPC_ERROR_STR_TARGET_ADDRESS, addr_text));
gpr_free(addr_text);
}
static void do_read(grpc_exec_ctx *exec_ctx, internal_request *req) {
grpc_endpoint_read(exec_ctx, req->ep, &req->incoming, &req->on_read);
}
@ -124,27 +141,25 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *user_data,
internal_request *req = user_data;
size_t i;
for (i = 0; i < req->incoming.count; i++) {
GRPC_ERROR_REF(error);
for (i = 0; error == GRPC_ERROR_NONE && i < req->incoming.count; i++) {
if (GPR_SLICE_LENGTH(req->incoming.slices[i])) {
req->have_read_byte = 1;
if (!grpc_http_parser_parse(&req->parser, req->incoming.slices[i])) {
finish(exec_ctx, req, 0);
return;
}
error = grpc_http_parser_parse(&req->parser, req->incoming.slices[i]);
}
}
if (error == GRPC_ERROR_NONE) {
do_read(exec_ctx, req);
} else if (!req->have_read_byte) {
next_address(exec_ctx, req);
next_address(exec_ctx, req, GRPC_ERROR_REF(error));
} else {
grpc_error *err = grpc_http_parser_eof(&req->parser);
if (err == GRPC_ERROR_NONE && (req->parser.type != GRPC_HTTP_RESPONSE)) {
err = GRPC_ERROR_CREATE("Expected http response, got http request");
}
finish(exec_ctx, req, err);
append_error(req, GRPC_ERROR_REF(error));
finish(exec_ctx, req, error);
}
GRPC_ERROR_UNREF(error);
}
static void on_written(grpc_exec_ctx *exec_ctx, internal_request *req) {
@ -156,7 +171,7 @@ static void done_write(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
if (error == GRPC_ERROR_NONE) {
on_written(exec_ctx, req);
} else {
next_address(exec_ctx, req);
next_address(exec_ctx, req, GRPC_ERROR_REF(error));
}
}
@ -171,7 +186,8 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
internal_request *req = arg;
if (!ep) {
next_address(exec_ctx, req);
next_address(exec_ctx, req,
GRPC_ERROR_CREATE("Unexplained handshake failure"));
return;
}
@ -184,7 +200,7 @@ static void on_connected(grpc_exec_ctx *exec_ctx, void *arg,
internal_request *req = arg;
if (!req->ep) {
next_address(exec_ctx, req);
next_address(exec_ctx, req, GRPC_ERROR_REF(error));
return;
}
req->handshaker->handshake(
@ -193,10 +209,16 @@ static void on_connected(grpc_exec_ctx *exec_ctx, void *arg,
on_handshake_done);
}
static void next_address(grpc_exec_ctx *exec_ctx, internal_request *req) {
static void next_address(grpc_exec_ctx *exec_ctx, internal_request *req,
grpc_error *error) {
grpc_resolved_address *addr;
if (error != GRPC_ERROR_NONE) {
append_error(req, error);
}
if (req->next_address == req->addresses->naddrs) {
finish(exec_ctx, req, 0);
finish(exec_ctx, req,
GRPC_ERROR_CREATE_REFERENCING("Failed HTTP requests to all targets",
&req->overall_error, 1));
return;
}
addr = &req->addresses->addrs[req->next_address++];
@ -213,7 +235,7 @@ static void on_resolved(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
return;
}
req->next_address = 0;
next_address(exec_ctx, req);
next_address(exec_ctx, req, GRPC_ERROR_NONE);
}
static void internal_request_begin(grpc_exec_ctx *exec_ctx,
@ -233,6 +255,7 @@ static void internal_request_begin(grpc_exec_ctx *exec_ctx,
request->handshaker ? request->handshaker : &grpc_httpcli_plaintext;
req->context = context;
req->pollset = pollset;
req->overall_error = GRPC_ERROR_NONE;
grpc_closure_init(&req->on_read, on_read, req);
grpc_closure_init(&req->done_write, done_write, req);
gpr_slice_buffer_init(&req->incoming);

@ -174,14 +174,15 @@ static grpc_error *add_header(grpc_http_parser *parser) {
GPR_ASSERT((size_t)(end - cur) >= parser->cur_line_end_length);
hdr.value = buf2str(cur, (size_t)(end - cur) - parser->cur_line_end_length);
if (parser->type == GRPC_HTTP_RESPONSE) {
hdr_count = &parser->http.response->hdr_count;
hdrs = &parser->http.response->hdrs;
} else if (parser->type == GRPC_HTTP_REQUEST) {
hdr_count = &parser->http.request->hdr_count;
hdrs = &parser->http.request->hdrs;
} else {
return 0;
switch (parser->type) {
case GRPC_HTTP_RESPONSE:
hdr_count = &parser->http.response->hdr_count;
hdrs = &parser->http.response->hdrs;
break;
case GRPC_HTTP_REQUEST:
hdr_count = &parser->http.request->hdr_count;
hdrs = &parser->http.request->hdrs;
break;
}
if (*hdr_count == parser->hdr_capacity) {
@ -212,12 +213,13 @@ static grpc_error *finish_line(grpc_http_parser *parser) {
parser->state = GRPC_HTTP_BODY;
break;
}
if (!add_header(parser)) {
return 0;
err = add_header(parser);
if (err != GRPC_ERROR_NONE) {
return err;
}
break;
case GRPC_HTTP_BODY:
GPR_UNREACHABLE_CODE(return 0);
GPR_UNREACHABLE_CODE(return GRPC_ERROR_CREATE("Should never reach here"));
}
parser->cur_line_length = 0;
@ -248,29 +250,28 @@ static grpc_error *addbyte_body(grpc_http_parser *parser, uint8_t byte) {
return GRPC_ERROR_NONE;
}
static grpc_error *check_line(grpc_http_parser *parser) {
static bool check_line(grpc_http_parser *parser) {
if (parser->cur_line_length >= 2 &&
parser->cur_line[parser->cur_line_length - 2] == '\r' &&
parser->cur_line[parser->cur_line_length - 1] == '\n') {
return GRPC_ERROR_NONE;
return true;
}
// HTTP request with \n\r line termiantors.
else if (parser->cur_line_length >= 2 &&
parser->cur_line[parser->cur_line_length - 2] == '\n' &&
parser->cur_line[parser->cur_line_length - 1] == '\r') {
return GRPC_ERROR_NONE;
return true;
}
// HTTP request with only \n line terminators.
else if (parser->cur_line_length >= 1 &&
parser->cur_line[parser->cur_line_length - 1] == '\n') {
parser->cur_line_end_length = 1;
return GRPC_ERROR_NONE;
return true;
}
return GRPC_ERROR_CREATE(
"Expected line ending (one of \\r\\n, \\n\\r, or \\n)");
return false;
}
static grpc_error *addbyte(grpc_http_parser *parser, uint8_t byte) {

@ -222,15 +222,14 @@ static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
/* We've consumed the edge, request a new one */
grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_closure);
} else {
/* TODO(klempner): Log interesting errors */
gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer);
call_read_cb(exec_ctx, tcp, 0);
call_read_cb(exec_ctx, tcp, GRPC_OS_ERROR(errno, "recvmsg"));
TCP_UNREF(exec_ctx, tcp, "read");
}
} else if (read_bytes == 0) {
/* 0 read size ==> end of stream */
gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer);
call_read_cb(exec_ctx, tcp, 0);
call_read_cb(exec_ctx, tcp, GRPC_ERROR_CREATE("EOF"));
TCP_UNREF(exec_ctx, tcp, "read");
} else {
GPR_ASSERT((size_t)read_bytes <= tcp->incoming_buffer->length);
@ -257,7 +256,7 @@ static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
if (error != GRPC_ERROR_NONE) {
gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer);
call_read_cb(exec_ctx, tcp, 0);
call_read_cb(exec_ctx, tcp, GRPC_ERROR_REF(error));
TCP_UNREF(exec_ctx, tcp, "read");
} else {
tcp_continue_read(exec_ctx, tcp);

@ -103,6 +103,7 @@ static void test_get(int port) {
}
gpr_mu_unlock(g_mu);
gpr_free(host);
grpc_http_response_destroy(&response);
}
static void test_post(int port) {
@ -139,6 +140,7 @@ static void test_post(int port) {
}
gpr_mu_unlock(g_mu);
gpr_free(host);
grpc_http_response_destroy(&response);
}
static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p,

@ -65,7 +65,7 @@ static void test_request_succeeds(grpc_slice_split_mode split_mode,
GPR_ASSERT(grpc_http_parser_parse(&parser, slices[i]) == GRPC_ERROR_NONE);
gpr_slice_unref(slices[i]);
}
GPR_ASSERT(grpc_http_parser_eof(&parser));
GPR_ASSERT(grpc_http_parser_eof(&parser) == GRPC_ERROR_NONE);
GPR_ASSERT(GRPC_HTTP_REQUEST == parser.type);
GPR_ASSERT(0 == strcmp(expect_method, request.method));
@ -96,6 +96,7 @@ static void test_request_succeeds(grpc_slice_split_mode split_mode,
va_end(args);
GPR_ASSERT(i == request.hdr_count);
grpc_http_request_destroy(&request);
grpc_http_parser_destroy(&parser);
gpr_free(slices);
}
@ -109,6 +110,7 @@ static void test_succeeds(grpc_slice_split_mode split_mode, char *response_text,
gpr_slice *slices;
va_list args;
grpc_http_response response;
memset(&response, 0, sizeof(response));
grpc_split_slices(split_mode, &input_slice, 1, &slices, &num_slices);
gpr_slice_unref(input_slice);
@ -119,7 +121,7 @@ static void test_succeeds(grpc_slice_split_mode split_mode, char *response_text,
GPR_ASSERT(grpc_http_parser_parse(&parser, slices[i]) == GRPC_ERROR_NONE);
gpr_slice_unref(slices[i]);
}
GPR_ASSERT(grpc_http_parser_eof(&parser));
GPR_ASSERT(grpc_http_parser_eof(&parser) == GRPC_ERROR_NONE);
GPR_ASSERT(GRPC_HTTP_RESPONSE == parser.type);
GPR_ASSERT(expect_status == response.status);
@ -147,6 +149,7 @@ static void test_succeeds(grpc_slice_split_mode split_mode, char *response_text,
va_end(args);
GPR_ASSERT(i == response.hdr_count);
grpc_http_response_destroy(&response);
grpc_http_parser_destroy(&parser);
gpr_free(slices);
}
@ -190,7 +193,7 @@ static void test_request_fails(grpc_slice_split_mode split_mode,
size_t num_slices;
size_t i;
gpr_slice *slices;
int done = 0;
grpc_error *error = GRPC_ERROR_NONE;
grpc_http_request request;
memset(&request, 0, sizeof(request));
@ -200,15 +203,16 @@ static void test_request_fails(grpc_slice_split_mode split_mode,
grpc_http_parser_init(&parser, GRPC_HTTP_REQUEST, &request);
for (i = 0; i < num_slices; i++) {
if (!done && !grpc_http_parser_parse(&parser, slices[i])) {
done = 1;
if (error == GRPC_ERROR_NONE) {
error = grpc_http_parser_parse(&parser, slices[i]);
}
gpr_slice_unref(slices[i]);
}
if (!done && !grpc_http_parser_eof(&parser)) {
done = 1;
if (error == GRPC_ERROR_NONE) {
error = grpc_http_parser_eof(&parser);
}
GPR_ASSERT(done);
GPR_ASSERT(error != GRPC_ERROR_NONE);
GRPC_ERROR_UNREF(error);
grpc_http_request_destroy(&request);
grpc_http_parser_destroy(&parser);

Loading…
Cancel
Save