Merge remote-tracking branch 'upstream/master' into fixit_call_c

pull/4318/head
yang-g 9 years ago
commit 55c513f738
  1. 1
      src/core/surface/call.c
  2. 3
      src/core/transport/chttp2/frame_data.h
  3. 6
      src/core/transport/chttp2/hpack_encoder.c
  4. 18
      src/core/transport/chttp2/incoming_metadata.c
  5. 2
      src/core/transport/chttp2/incoming_metadata.h
  6. 4
      src/csharp/Grpc.Core/Channel.cs
  7. 4
      src/csharp/Grpc.Core/Internal/ClientResponseStream.cs
  8. 2
      src/csharp/Grpc.Core/Internal/NativeMetadataCredentialsPlugin.cs
  9. 40
      src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
  10. 2
      src/csharp/Grpc.Core/Internal/ServerRequestStream.cs
  11. 10
      src/csharp/Grpc.Core/Server.cs
  12. 12
      src/csharp/Grpc.Core/Utils/AsyncStreamExtensions.cs
  13. 27
      test/core/security/credentials_test.c
  14. 37
      test/core/surface/byte_buffer_reader_test.c

@ -1270,6 +1270,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
}
if (call->receiving_message) {
error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
goto done_with_error;
}
call->receiving_message = 1;
bctl->recv_message = 1;

@ -94,9 +94,6 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse(
grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last);
/* create a slice with an empty data frame and is_last set */
gpr_slice grpc_chttp2_data_frame_create_empty_close(gpr_uint32 id);
void grpc_chttp2_encode_data(gpr_uint32 id, gpr_slice_buffer *inbuf,
gpr_uint32 write_bytes, int is_eof,
gpr_slice_buffer *outbuf);

@ -458,12 +458,6 @@ static void deadline_enc(grpc_chttp2_hpack_compressor *c, gpr_timespec deadline,
GRPC_MDELEM_UNREF(mdelem);
}
gpr_slice grpc_chttp2_data_frame_create_empty_close(gpr_uint32 id) {
gpr_slice slice = gpr_slice_malloc(9);
fill_header(GPR_SLICE_START_PTR(slice), GRPC_CHTTP2_FRAME_DATA, id, 0, 1);
return slice;
}
static gpr_uint32 elems_for_bytes(gpr_uint32 bytes) {
return (bytes + 31) / 32;
}

@ -56,16 +56,6 @@ void grpc_chttp2_incoming_metadata_buffer_destroy(
gpr_free(buffer->elems);
}
void grpc_chttp2_incoming_metadata_buffer_reset(
grpc_chttp2_incoming_metadata_buffer *buffer) {
size_t i;
GPR_ASSERT(!buffer->published);
for (i = 0; i < buffer->count; i++) {
GRPC_MDELEM_UNREF(buffer->elems[i].md);
}
buffer->count = 0;
}
void grpc_chttp2_incoming_metadata_buffer_add(
grpc_chttp2_incoming_metadata_buffer *buffer, grpc_mdelem *elem) {
GPR_ASSERT(!buffer->published);
@ -83,14 +73,6 @@ void grpc_chttp2_incoming_metadata_buffer_set_deadline(
buffer->deadline = deadline;
}
void grpc_chttp2_incoming_metadata_buffer_swap(
grpc_chttp2_incoming_metadata_buffer *a,
grpc_chttp2_incoming_metadata_buffer *b) {
GPR_ASSERT(!a->published);
GPR_ASSERT(!b->published);
GPR_SWAP(grpc_chttp2_incoming_metadata_buffer, *a, *b);
}
void grpc_chttp2_incoming_metadata_buffer_publish(
grpc_chttp2_incoming_metadata_buffer *buffer, grpc_metadata_batch *batch) {
GPR_ASSERT(!buffer->published);

@ -49,8 +49,6 @@ void grpc_chttp2_incoming_metadata_buffer_init(
grpc_chttp2_incoming_metadata_buffer *buffer);
void grpc_chttp2_incoming_metadata_buffer_destroy(
grpc_chttp2_incoming_metadata_buffer *buffer);
void grpc_chttp2_incoming_metadata_buffer_reset(
grpc_chttp2_incoming_metadata_buffer *buffer);
void grpc_chttp2_incoming_metadata_buffer_publish(
grpc_chttp2_incoming_metadata_buffer *buffer, grpc_metadata_batch *batch);

@ -173,7 +173,7 @@ namespace Grpc.Core
{
throw new OperationCanceledException("Channel has reached FatalFailure state.");
}
await WaitForStateChangedAsync(currentState, deadline);
await WaitForStateChangedAsync(currentState, deadline).ConfigureAwait(false);
currentState = handle.CheckConnectivityState(false);
}
}
@ -198,7 +198,7 @@ namespace Grpc.Core
handle.Dispose();
await Task.Run(() => GrpcEnvironment.Release());
await Task.Run(() => GrpcEnvironment.Release()).ConfigureAwait(false);
}
internal ChannelSafeHandle Handle

@ -70,12 +70,12 @@ namespace Grpc.Core.Internal
}
var taskSource = new AsyncCompletionTaskSource<TResponse>();
call.StartReadMessage(taskSource.CompletionDelegate);
var result = await taskSource.Task;
var result = await taskSource.Task.ConfigureAwait(false);
this.current = result;
if (result == null)
{
await call.StreamingCallFinishedTask;
await call.StreamingCallFinishedTask.ConfigureAwait(false);
return false;
}
return true;

@ -96,7 +96,7 @@ namespace Grpc.Core.Internal
try
{
var metadata = new Metadata();
await interceptor(serviceUrl, metadata);
await interceptor(serviceUrl, metadata).ConfigureAwait(false);
using (var metadataArray = MetadataArraySafeHandle.Create(metadata))
{

@ -78,13 +78,13 @@ namespace Grpc.Core.Internal
var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, responseStream, asyncCall.CancellationToken);
try
{
Preconditions.CheckArgument(await requestStream.MoveNext());
Preconditions.CheckArgument(await requestStream.MoveNext().ConfigureAwait(false));
var request = requestStream.Current;
// TODO(jtattermusch): we need to read the full stream so that native callhandle gets deallocated.
Preconditions.CheckArgument(!await requestStream.MoveNext());
var result = await handler(request, context);
Preconditions.CheckArgument(!await requestStream.MoveNext().ConfigureAwait(false));
var result = await handler(request, context).ConfigureAwait(false);
status = context.Status;
await responseStream.WriteAsync(result);
await responseStream.WriteAsync(result).ConfigureAwait(false);
}
catch (Exception e)
{
@ -93,13 +93,13 @@ namespace Grpc.Core.Internal
}
try
{
await responseStream.WriteStatusAsync(status, context.ResponseTrailers);
await responseStream.WriteStatusAsync(status, context.ResponseTrailers).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
// Call has been already cancelled.
}
await finishedTask;
await finishedTask.ConfigureAwait(false);
}
}
@ -134,11 +134,11 @@ namespace Grpc.Core.Internal
var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, responseStream, asyncCall.CancellationToken);
try
{
Preconditions.CheckArgument(await requestStream.MoveNext());
Preconditions.CheckArgument(await requestStream.MoveNext().ConfigureAwait(false));
var request = requestStream.Current;
// TODO(jtattermusch): we need to read the full stream so that native callhandle gets deallocated.
Preconditions.CheckArgument(!await requestStream.MoveNext());
await handler(request, responseStream, context);
Preconditions.CheckArgument(!await requestStream.MoveNext().ConfigureAwait(false));
await handler(request, responseStream, context).ConfigureAwait(false);
status = context.Status;
}
catch (Exception e)
@ -149,13 +149,13 @@ namespace Grpc.Core.Internal
try
{
await responseStream.WriteStatusAsync(status, context.ResponseTrailers);
await responseStream.WriteStatusAsync(status, context.ResponseTrailers).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
// Call has been already cancelled.
}
await finishedTask;
await finishedTask.ConfigureAwait(false);
}
}
@ -190,11 +190,11 @@ namespace Grpc.Core.Internal
var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, responseStream, asyncCall.CancellationToken);
try
{
var result = await handler(requestStream, context);
var result = await handler(requestStream, context).ConfigureAwait(false);
status = context.Status;
try
{
await responseStream.WriteAsync(result);
await responseStream.WriteAsync(result).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
@ -209,13 +209,13 @@ namespace Grpc.Core.Internal
try
{
await responseStream.WriteStatusAsync(status, context.ResponseTrailers);
await responseStream.WriteStatusAsync(status, context.ResponseTrailers).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
// Call has been already cancelled.
}
await finishedTask;
await finishedTask.ConfigureAwait(false);
}
}
@ -250,7 +250,7 @@ namespace Grpc.Core.Internal
var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, responseStream, asyncCall.CancellationToken);
try
{
await handler(requestStream, responseStream, context);
await handler(requestStream, responseStream, context).ConfigureAwait(false);
status = context.Status;
}
catch (Exception e)
@ -260,13 +260,13 @@ namespace Grpc.Core.Internal
}
try
{
await responseStream.WriteStatusAsync(status, context.ResponseTrailers);
await responseStream.WriteStatusAsync(status, context.ResponseTrailers).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
// Call has been already cancelled.
}
await finishedTask;
await finishedTask.ConfigureAwait(false);
}
}
@ -284,8 +284,8 @@ namespace Grpc.Core.Internal
var finishedTask = asyncCall.ServerSideCallAsync();
var responseStream = new ServerResponseStream<byte[], byte[]>(asyncCall);
await responseStream.WriteStatusAsync(new Status(StatusCode.Unimplemented, "No such method."), Metadata.Empty);
await finishedTask;
await responseStream.WriteStatusAsync(new Status(StatusCode.Unimplemented, "No such method."), Metadata.Empty).ConfigureAwait(false);
await finishedTask.ConfigureAwait(false);
}
}

@ -70,7 +70,7 @@ namespace Grpc.Core.Internal
}
var taskSource = new AsyncCompletionTaskSource<TRequest>();
call.StartReadMessage(taskSource.CompletionDelegate);
var result = await taskSource.Task;
var result = await taskSource.Task.ConfigureAwait(false);
this.current = result;
return result != null;
}

@ -148,10 +148,10 @@ namespace Grpc.Core
}
handle.ShutdownAndNotify(HandleServerShutdown, environment);
await shutdownTcs.Task;
await shutdownTcs.Task.ConfigureAwait(false);
DisposeHandle();
await Task.Run(() => GrpcEnvironment.Release());
await Task.Run(() => GrpcEnvironment.Release()).ConfigureAwait(false);
}
/// <summary>
@ -169,7 +169,7 @@ namespace Grpc.Core
handle.ShutdownAndNotify(HandleServerShutdown, environment);
handle.CancelAllCalls();
await shutdownTcs.Task;
await shutdownTcs.Task.ConfigureAwait(false);
DisposeHandle();
}
@ -268,7 +268,7 @@ namespace Grpc.Core
{
callHandler = NoSuchMethodCallHandler.Instance;
}
await callHandler.HandleCall(newRpc, environment);
await callHandler.HandleCall(newRpc, environment).ConfigureAwait(false);
}
catch (Exception e)
{
@ -288,7 +288,7 @@ namespace Grpc.Core
// after server shutdown, the callback returns with null call
if (!newRpc.Call.IsInvalid)
{
Task.Run(async () => await HandleCallAsync(newRpc));
Task.Run(async () => await HandleCallAsync(newRpc)).ConfigureAwait(false);
}
}

@ -48,9 +48,9 @@ namespace Grpc.Core.Utils
public static async Task ForEachAsync<T>(this IAsyncStreamReader<T> streamReader, Func<T, Task> asyncAction)
where T : class
{
while (await streamReader.MoveNext())
while (await streamReader.MoveNext().ConfigureAwait(false))
{
await asyncAction(streamReader.Current);
await asyncAction(streamReader.Current).ConfigureAwait(false);
}
}
@ -61,7 +61,7 @@ namespace Grpc.Core.Utils
where T : class
{
var result = new List<T>();
while (await streamReader.MoveNext())
while (await streamReader.MoveNext().ConfigureAwait(false))
{
result.Add(streamReader.Current);
}
@ -77,11 +77,11 @@ namespace Grpc.Core.Utils
{
foreach (var element in elements)
{
await streamWriter.WriteAsync(element);
await streamWriter.WriteAsync(element).ConfigureAwait(false);
}
if (complete)
{
await streamWriter.CompleteAsync();
await streamWriter.CompleteAsync().ConfigureAwait(false);
}
}
@ -93,7 +93,7 @@ namespace Grpc.Core.Utils
{
foreach (var element in elements)
{
await streamWriter.WriteAsync(element);
await streamWriter.WriteAsync(element).ConfigureAwait(false);
}
}
}

@ -31,8 +31,10 @@
*
*/
#include <grpc/support/port_platform.h>
#include "src/core/security/credentials.h"
#include <stdlib.h>
#include <string.h>
#include "src/core/httpcli/httpcli.h"
@ -1013,6 +1015,30 @@ static void test_metadata_plugin_failure(void) {
grpc_exec_ctx_finish(&exec_ctx);
}
static void test_get_well_known_google_credentials_file_path(void) {
#ifdef GPR_POSIX_FILE
char *path;
char *old_home = gpr_getenv("HOME");
gpr_setenv("HOME", "/tmp");
path = grpc_get_well_known_google_credentials_file_path();
GPR_ASSERT(path != NULL);
GPR_ASSERT(0 == strcmp("/tmp/.config/" GRPC_GOOGLE_CLOUD_SDK_CONFIG_DIRECTORY
"/" GRPC_GOOGLE_WELL_KNOWN_CREDENTIALS_FILE,
path));
gpr_free(path);
#if defined(GPR_POSIX_ENV) || defined(GPR_LINUX_ENV)
unsetenv("HOME");
path = grpc_get_well_known_google_credentials_file_path();
GPR_ASSERT(path == NULL);
#endif /* GPR_POSIX_ENV || GPR_LINUX_ENV */
gpr_setenv("HOME", old_home);
#else /* GPR_POSIX_FILE */
char *path = grpc_get_well_known_google_credentials_file_path();
GPR_ASSERT(path != NULL);
gpr_free(path);
#endif
}
int main(int argc, char **argv) {
grpc_test_init(argc, argv);
test_empty_md_store();
@ -1043,5 +1069,6 @@ int main(int argc, char **argv) {
test_google_default_creds_access_token();
test_metadata_plugin_success();
test_metadata_plugin_failure();
test_get_well_known_google_credentials_file_path();
return 0;
}

@ -217,6 +217,42 @@ static void test_readall(void) {
grpc_byte_buffer_destroy(buffer);
}
static void test_byte_buffer_copy(void) {
char *lotsa_as[512];
char *lotsa_bs[1024];
gpr_slice slices[2];
grpc_byte_buffer *buffer;
grpc_byte_buffer *copied_buffer;
grpc_byte_buffer_reader reader;
gpr_slice slice_out;
LOG_TEST("test_byte_buffer_copy");
memset(lotsa_as, 'a', 512);
memset(lotsa_bs, 'b', 1024);
/* use slices large enough to overflow inlining */
slices[0] = gpr_slice_malloc(512);
memcpy(GPR_SLICE_START_PTR(slices[0]), lotsa_as, 512);
slices[1] = gpr_slice_malloc(1024);
memcpy(GPR_SLICE_START_PTR(slices[1]), lotsa_bs, 1024);
buffer = grpc_raw_byte_buffer_create(slices, 2);
gpr_slice_unref(slices[0]);
gpr_slice_unref(slices[1]);
copied_buffer = grpc_byte_buffer_copy(buffer);
grpc_byte_buffer_reader_init(&reader, copied_buffer);
slice_out = grpc_byte_buffer_reader_readall(&reader);
GPR_ASSERT(GPR_SLICE_LENGTH(slice_out) == 512 + 1024);
GPR_ASSERT(memcmp(GPR_SLICE_START_PTR(slice_out), lotsa_as, 512) == 0);
GPR_ASSERT(memcmp(&(GPR_SLICE_START_PTR(slice_out)[512]), lotsa_bs, 1024) ==
0);
gpr_slice_unref(slice_out);
grpc_byte_buffer_destroy(buffer);
grpc_byte_buffer_destroy(copied_buffer);
}
int main(int argc, char **argv) {
grpc_test_init(argc, argv);
test_read_one_slice();
@ -225,6 +261,7 @@ int main(int argc, char **argv) {
test_read_gzip_compressed_slice();
test_read_deflate_compressed_slice();
test_byte_buffer_from_reader();
test_byte_buffer_copy();
test_readall();
return 0;
}

Loading…
Cancel
Save