Merge pull request #7569 from dgquintas/backport_cpp_compression_level

backport #7502 from dgquintas/cpp_compression_level
pull/7578/head
Nicolas Noble 9 years ago committed by GitHub
commit 623e4fc0cc
  1. 27
      include/grpc++/impl/codegen/async_stream.h
  2. 9
      include/grpc++/impl/codegen/async_unary_call.h
  3. 12
      include/grpc++/impl/codegen/call.h
  4. 15
      include/grpc++/impl/codegen/method_handler_impl.h
  5. 9
      include/grpc++/impl/codegen/server_context.h
  6. 15
      include/grpc++/impl/codegen/sync_stream.h
  7. 15
      src/cpp/server/server_context.cc

@ -330,6 +330,9 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncReaderInterface<W, R> {
meta_ops_.set_output_tag(tag);
meta_ops_.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
if (ctx_->compression_level_set()) {
meta_ops_.set_compression_level(ctx_->compression_level());
}
ctx_->sent_initial_metadata_ = true;
call_.PerformOps(&meta_ops_);
}
@ -345,6 +348,9 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncReaderInterface<W, R> {
if (!ctx_->sent_initial_metadata_) {
finish_ops_.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
if (ctx_->compression_level_set()) {
finish_ops_.set_compression_level(ctx_->compression_level());
}
ctx_->sent_initial_metadata_ = true;
}
// The response is dropped if the status is not OK.
@ -363,6 +369,9 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncReaderInterface<W, R> {
if (!ctx_->sent_initial_metadata_) {
finish_ops_.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
if (ctx_->compression_level_set()) {
finish_ops_.set_compression_level(ctx_->compression_level());
}
ctx_->sent_initial_metadata_ = true;
}
finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
@ -400,6 +409,9 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncWriterInterface<W> {
meta_ops_.set_output_tag(tag);
meta_ops_.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
if (ctx_->compression_level_set()) {
meta_ops_.set_compression_level(ctx_->compression_level());
}
ctx_->sent_initial_metadata_ = true;
call_.PerformOps(&meta_ops_);
}
@ -409,6 +421,9 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncWriterInterface<W> {
if (!ctx_->sent_initial_metadata_) {
write_ops_.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
if (ctx_->compression_level_set()) {
write_ops_.set_compression_level(ctx_->compression_level());
}
ctx_->sent_initial_metadata_ = true;
}
// TODO(ctiller): don't assert
@ -421,6 +436,9 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncWriterInterface<W> {
if (!ctx_->sent_initial_metadata_) {
finish_ops_.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
if (ctx_->compression_level_set()) {
finish_ops_.set_compression_level(ctx_->compression_level());
}
ctx_->sent_initial_metadata_ = true;
}
finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
@ -459,6 +477,9 @@ class ServerAsyncReaderWriter GRPC_FINAL
meta_ops_.set_output_tag(tag);
meta_ops_.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
if (ctx_->compression_level_set()) {
meta_ops_.set_compression_level(ctx_->compression_level());
}
ctx_->sent_initial_metadata_ = true;
call_.PerformOps(&meta_ops_);
}
@ -474,6 +495,9 @@ class ServerAsyncReaderWriter GRPC_FINAL
if (!ctx_->sent_initial_metadata_) {
write_ops_.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
if (ctx_->compression_level_set()) {
write_ops_.set_compression_level(ctx_->compression_level());
}
ctx_->sent_initial_metadata_ = true;
}
// TODO(ctiller): don't assert
@ -486,6 +510,9 @@ class ServerAsyncReaderWriter GRPC_FINAL
if (!ctx_->sent_initial_metadata_) {
finish_ops_.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
if (ctx_->compression_level_set()) {
finish_ops_.set_compression_level(ctx_->compression_level());
}
ctx_->sent_initial_metadata_ = true;
}
finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);

@ -126,6 +126,9 @@ class ServerAsyncResponseWriter GRPC_FINAL
meta_buf_.set_output_tag(tag);
meta_buf_.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
if (ctx_->compression_level_set()) {
meta_buf_.set_compression_level(ctx_->compression_level());
}
ctx_->sent_initial_metadata_ = true;
call_.PerformOps(&meta_buf_);
}
@ -135,6 +138,9 @@ class ServerAsyncResponseWriter GRPC_FINAL
if (!ctx_->sent_initial_metadata_) {
finish_buf_.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
if (ctx_->compression_level_set()) {
finish_buf_.set_compression_level(ctx_->compression_level());
}
ctx_->sent_initial_metadata_ = true;
}
// The response is dropped if the status is not OK.
@ -153,6 +159,9 @@ class ServerAsyncResponseWriter GRPC_FINAL
if (!ctx_->sent_initial_metadata_) {
finish_buf_.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
if (ctx_->compression_level_set()) {
finish_buf_.set_compression_level(ctx_->compression_level());
}
ctx_->sent_initial_metadata_ = true;
}
finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, status);

@ -180,17 +180,23 @@ class CallNoOp {
class CallOpSendInitialMetadata {
public:
CallOpSendInitialMetadata() : send_(false) {}
CallOpSendInitialMetadata() : send_(false) {
maybe_compression_level_.is_set = false;
}
void SendInitialMetadata(
const std::multimap<grpc::string, grpc::string>& metadata,
uint32_t flags) {
maybe_compression_level_.is_set = false;
send_ = true;
flags_ = flags;
initial_metadata_count_ = metadata.size();
initial_metadata_ = FillMetadataArray(metadata);
// TODO(dgq): expose compression level in API so it can be properly set.
maybe_compression_level_.is_set = false;
}
void set_compression_level(grpc_compression_level level) {
maybe_compression_level_.is_set = true;
maybe_compression_level_.level = level;
}
protected:

@ -65,6 +65,9 @@ class RpcMethodHandler : public MethodHandler {
ops;
ops.SendInitialMetadata(param.server_context->initial_metadata_,
param.server_context->initial_metadata_flags());
if (param.server_context->compression_level_set()) {
ops.set_compression_level(param.server_context->compression_level());
}
if (status.ok()) {
status = ops.SendMessage(rsp);
}
@ -104,6 +107,9 @@ class ClientStreamingHandler : public MethodHandler {
ops;
ops.SendInitialMetadata(param.server_context->initial_metadata_,
param.server_context->initial_metadata_flags());
if (param.server_context->compression_level_set()) {
ops.set_compression_level(param.server_context->compression_level());
}
if (status.ok()) {
status = ops.SendMessage(rsp);
}
@ -144,6 +150,9 @@ class ServerStreamingHandler : public MethodHandler {
if (!param.server_context->sent_initial_metadata_) {
ops.SendInitialMetadata(param.server_context->initial_metadata_,
param.server_context->initial_metadata_flags());
if (param.server_context->compression_level_set()) {
ops.set_compression_level(param.server_context->compression_level());
}
}
ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
param.call->PerformOps(&ops);
@ -177,6 +186,9 @@ class BidiStreamingHandler : public MethodHandler {
if (!param.server_context->sent_initial_metadata_) {
ops.SendInitialMetadata(param.server_context->initial_metadata_,
param.server_context->initial_metadata_flags());
if (param.server_context->compression_level_set()) {
ops.set_compression_level(param.server_context->compression_level());
}
}
ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
param.call->PerformOps(&ops);
@ -199,6 +211,9 @@ class UnknownMethodHandler : public MethodHandler {
if (!context->sent_initial_metadata_) {
ops->SendInitialMetadata(context->initial_metadata_,
context->initial_metadata_flags());
if (context->compression_level_set()) {
ops->set_compression_level(context->compression_level());
}
context->sent_initial_metadata_ = true;
}
ops->ServerSendStatus(context->trailing_metadata_, status);

@ -130,7 +130,13 @@ class ServerContext {
grpc_compression_level compression_level() const {
return compression_level_;
}
void set_compression_level(grpc_compression_level level);
void set_compression_level(grpc_compression_level level) {
compression_level_set_ = true;
compression_level_ = level;
}
bool compression_level_set() const { return compression_level_set_; }
grpc_compression_algorithm compression_algorithm() const {
return compression_algorithm_;
@ -217,6 +223,7 @@ class ServerContext {
std::multimap<grpc::string, grpc::string> initial_metadata_;
std::multimap<grpc::string, grpc::string> trailing_metadata_;
bool compression_level_set_;
grpc_compression_level compression_level_;
grpc_compression_algorithm compression_algorithm_;
};

@ -347,6 +347,9 @@ class ServerReader GRPC_FINAL : public ReaderInterface<R> {
CallOpSet<CallOpSendInitialMetadata> ops;
ops.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
if (ctx_->compression_level_set()) {
ops.set_compression_level(ctx_->compression_level());
}
ctx_->sent_initial_metadata_ = true;
call_->PerformOps(&ops);
call_->cq()->Pluck(&ops);
@ -375,6 +378,9 @@ class ServerWriter GRPC_FINAL : public WriterInterface<W> {
CallOpSet<CallOpSendInitialMetadata> ops;
ops.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
if (ctx_->compression_level_set()) {
ops.set_compression_level(ctx_->compression_level());
}
ctx_->sent_initial_metadata_ = true;
call_->PerformOps(&ops);
call_->cq()->Pluck(&ops);
@ -389,6 +395,9 @@ class ServerWriter GRPC_FINAL : public WriterInterface<W> {
if (!ctx_->sent_initial_metadata_) {
ops.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
if (ctx_->compression_level_set()) {
ops.set_compression_level(ctx_->compression_level());
}
ctx_->sent_initial_metadata_ = true;
}
call_->PerformOps(&ops);
@ -413,6 +422,9 @@ class ServerReaderWriter GRPC_FINAL : public WriterInterface<W>,
CallOpSet<CallOpSendInitialMetadata> ops;
ops.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
if (ctx_->compression_level_set()) {
ops.set_compression_level(ctx_->compression_level());
}
ctx_->sent_initial_metadata_ = true;
call_->PerformOps(&ops);
call_->cq()->Pluck(&ops);
@ -434,6 +446,9 @@ class ServerReaderWriter GRPC_FINAL : public WriterInterface<W>,
if (!ctx_->sent_initial_metadata_) {
ops.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
if (ctx_->compression_level_set()) {
ops.set_compression_level(ctx_->compression_level());
}
ctx_->sent_initial_metadata_ = true;
}
call_->PerformOps(&ops);

@ -129,7 +129,8 @@ ServerContext::ServerContext()
deadline_(gpr_inf_future(GPR_CLOCK_REALTIME)),
call_(nullptr),
cq_(nullptr),
sent_initial_metadata_(false) {}
sent_initial_metadata_(false),
compression_level_set_(false) {}
ServerContext::ServerContext(gpr_timespec deadline, grpc_metadata* metadata,
size_t metadata_count)
@ -139,7 +140,8 @@ ServerContext::ServerContext(gpr_timespec deadline, grpc_metadata* metadata,
deadline_(deadline),
call_(nullptr),
cq_(nullptr),
sent_initial_metadata_(false) {
sent_initial_metadata_(false),
compression_level_set_(false) {
for (size_t i = 0; i < metadata_count; i++) {
client_metadata_.insert(std::pair<grpc::string_ref, grpc::string_ref>(
metadata[i].key,
@ -194,15 +196,6 @@ bool ServerContext::IsCancelled() const {
}
}
void ServerContext::set_compression_level(grpc_compression_level level) {
// TODO(dgq): get rid of grpc_call_compression_for_level and propagate the
// compression level by adding a new argument to
// CallOpSendInitialMetadata::SendInitialMetadata.
const grpc_compression_algorithm algorithm_for_level =
grpc_call_compression_for_level(call_, level);
set_compression_algorithm(algorithm_for_level);
}
void ServerContext::set_compression_algorithm(
grpc_compression_algorithm algorithm) {
char* algorithm_name = NULL;

Loading…
Cancel
Save