Reviewer comments

reviewable/pr22575/r5
Yash Tibrewal 5 years ago
parent e5a1509d94
commit ce8a1df713
  1. 73
      src/core/ext/filters/http/message_decompress/message_decompress_filter.cc
  2. 37
      src/core/ext/filters/http/message_decompress/message_decompress_filter.h
  3. 7
      src/core/lib/surface/call.cc
  4. 4
      src/core/lib/surface/call.h

@ -1,20 +1,20 @@
/*
*
* Copyright 2015 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
//
//
// Copyright 2020 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
#include <grpc/support/port_platform.h>
@ -44,7 +44,7 @@ class ChannelData {};
class CallData {
public:
CallData(const grpc_call_element_args& args)
explicit CallData(const grpc_call_element_args& args)
: call_combiner_(args.call_combiner) {
// Initialize state for recv_initial_metadata_ready callback
GRPC_CLOSURE_INIT(&on_recv_initial_metadata_ready_,
@ -64,9 +64,11 @@ class CallData {
~CallData() { grpc_slice_buffer_destroy_internal(&recv_slices_); }
static void DecompressStartTransportStreamOpBatch(
public:
void DecompressStartTransportStreamOpBatch(
grpc_call_element* elem, grpc_transport_stream_op_batch* batch);
private:
static void OnRecvInitialMetadataReady(void* arg, grpc_error* error);
// Methods for processing a receive message event
@ -82,8 +84,7 @@ class CallData {
void MaybeResumeOnRecvTrailingMetadataReady();
static void OnRecvTrailingMetadataReady(void* arg, grpc_error* error);
private:
grpc_core::CallCombiner* call_combiner_ = nullptr;
grpc_core::CallCombiner* call_combiner_;
// Overall error for the call
grpc_error* error_ = GRPC_ERROR_NONE;
// Fields for handling recv_initial_metadata_ready callback
@ -285,36 +286,40 @@ void CallData::OnRecvTrailingMetadataReady(void* arg, grpc_error* error) {
void CallData::DecompressStartTransportStreamOpBatch(
grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
GPR_TIMER_SCOPE("compress_start_transport_stream_op_batch", 0);
CallData* calld = static_cast<CallData*>(elem->call_data);
// Handle recv_initial_metadata.
if (batch->recv_initial_metadata) {
calld->recv_initial_metadata_ =
recv_initial_metadata_ =
batch->payload->recv_initial_metadata.recv_initial_metadata;
calld->original_recv_initial_metadata_ready_ =
original_recv_initial_metadata_ready_ =
batch->payload->recv_initial_metadata.recv_initial_metadata_ready;
batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
&calld->on_recv_initial_metadata_ready_;
&on_recv_initial_metadata_ready_;
}
// Handle recv_message
if (batch->recv_message) {
calld->recv_message_ = batch->payload->recv_message.recv_message;
calld->original_recv_message_ready_ =
recv_message_ = batch->payload->recv_message.recv_message;
original_recv_message_ready_ =
batch->payload->recv_message.recv_message_ready;
batch->payload->recv_message.recv_message_ready =
&calld->on_recv_message_ready_;
batch->payload->recv_message.recv_message_ready = &on_recv_message_ready_;
}
// Handle recv_trailing_metadata
if (batch->recv_trailing_metadata) {
calld->original_recv_trailing_metadata_ready_ =
original_recv_trailing_metadata_ready_ =
batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
&calld->on_recv_trailing_metadata_ready_;
&on_recv_trailing_metadata_ready_;
}
// Pass control down the stack.
grpc_call_next_op(elem, batch);
}
void DecompressStartTransportStreamOpBatch(
grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
GPR_TIMER_SCOPE("decompress_start_transport_stream_op_batch", 0);
CallData* calld = static_cast<CallData*>(elem->call_data);
calld->DecompressStartTransportStreamOpBatch(elem, batch);
}
static grpc_error* DecompressInitCallElem(grpc_call_element* elem,
const grpc_call_element_args* args) {
new (elem->call_data) CallData(*args);
@ -333,12 +338,12 @@ static grpc_error* DecompressInitChannelElem(
return GRPC_ERROR_NONE;
}
void DecompressDestroyChannelElem(grpc_channel_element* /*elem*/) { return; }
void DecompressDestroyChannelElem(grpc_channel_element* /*elem*/) {}
} // namespace
const grpc_channel_filter grpc_message_decompress_filter = {
CallData::DecompressStartTransportStreamOpBatch,
DecompressStartTransportStreamOpBatch,
grpc_channel_next_op,
sizeof(CallData),
DecompressInitCallElem,

@ -1,20 +1,20 @@
/*
*
* Copyright 2015 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
//
//
// Copyright 2020 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
#ifndef GRPC_CORE_EXT_FILTERS_HTTP_MESSAGE_DECOMPRESS_MESSAGE_DECOMPRESS_FILTER_H
#define GRPC_CORE_EXT_FILTERS_HTTP_MESSAGE_DECOMPRESS_MESSAGE_DECOMPRESS_FILTER_H
@ -25,5 +25,4 @@
extern const grpc_channel_filter grpc_message_decompress_filter;
#endif /* GRPC_CORE_EXT_FILTERS_HTTP_MESSAGE_DECOMPRESS_MESSAGE_DECOMPRESS_FILTER_H \
*/
#endif // GRPC_CORE_EXT_FILTERS_HTTP_MESSAGE_DECOMPRESS_MESSAGE_DECOMPRESS_FILTER_H

@ -200,8 +200,6 @@ struct grpc_call {
/* Stream compression algorithm for *incoming* data */
grpc_stream_compression_algorithm incoming_stream_compression_algorithm =
GRPC_STREAM_COMPRESS_NONE;
/* Maximum size for uncompressed receive message in bytes. -1 for unlimited */
int max_uncompressed_receive_message_length = -1;
/* Supported encodings (compression algorithms), a bitset.
* Always support no compression. */
uint32_t encodings_accepted_by_peer = 1 << GRPC_MESSAGE_COMPRESS_NONE;
@ -499,11 +497,6 @@ void grpc_call_set_completion_queue(grpc_call* call,
&call->pollent);
}
void grpc_call_set_max_uncompressed_receive_message_length(grpc_call* call,
int limit) {
call->max_uncompressed_receive_message_length = limit;
}
#ifndef NDEBUG
#define REF_REASON reason
#define REF_ARG , const char* reason

@ -59,10 +59,6 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args,
void grpc_call_set_completion_queue(grpc_call* call, grpc_completion_queue* cq);
/* Sets the max uncompressed receive message length for the call. */
void grpc_call_set_max_uncompressed_receive_message_length(grpc_call* call,
int limit);
#ifndef NDEBUG
void grpc_call_internal_ref(grpc_call* call, const char* reason);
void grpc_call_internal_unref(grpc_call* call, const char* reason);

Loading…
Cancel
Save