Adds support for per message compression

pull/3013/head
Tim Emiola 9 years ago
parent e710e2bb99
commit 7840a55736
  1. 57
      src/ruby/ext/grpc/rb_call.c
  2. 5
      src/ruby/lib/grpc/generic/active_call.rb
  3. 8
      src/ruby/spec/call_spec.rb
  4. 28
      src/ruby/spec/generic/active_call_spec.rb

@ -82,6 +82,10 @@ static ID id_metadata;
* received by the call and subsequently saved on it. */
static ID id_status;
/* id_write_flag is name of the attribute used to access the write_flag
* saved on the call. */
static ID id_write_flag;
/* sym_* are the symbol for attributes of grpc_rb_sBatchResult. */
static VALUE sym_send_message;
static VALUE sym_send_metadata;
@ -240,6 +244,30 @@ static VALUE grpc_rb_call_set_metadata(VALUE self, VALUE metadata) {
return rb_ivar_set(self, id_metadata, metadata);
}
/*
call-seq:
write_flag = call.write_flag
Gets the write_flag value saved the call. */
static VALUE grpc_rb_call_get_write_flag(VALUE self) {
return rb_ivar_get(self, id_write_flag);
}
/*
call-seq:
call.write_flag = write_flag
Saves the write_flag on the call. */
static VALUE grpc_rb_call_set_write_flag(VALUE self, VALUE write_flag) {
if (!NIL_P(write_flag) && TYPE(write_flag) != T_FIXNUM) {
rb_raise(rb_eTypeError, "bad write_flag: got:<%s> want: <Fixnum>",
rb_obj_classname(write_flag));
return Qnil;
}
return rb_ivar_set(self, id_write_flag, write_flag);
}
/* grpc_rb_md_ary_fill_hash_cb is the hash iteration callback used
to fill grpc_metadata_array.
@ -437,17 +465,19 @@ typedef struct run_batch_stack {
grpc_status_code recv_status;
char *recv_status_details;
size_t recv_status_details_capacity;
uint write_flag;
} run_batch_stack;
/* grpc_run_batch_stack_init ensures the run_batch_stack is properly
* initialized */
static void grpc_run_batch_stack_init(run_batch_stack *st) {
static void grpc_run_batch_stack_init(run_batch_stack *st, uint write_flag) {
MEMZERO(st, run_batch_stack, 1);
grpc_metadata_array_init(&st->send_metadata);
grpc_metadata_array_init(&st->send_trailing_metadata);
grpc_metadata_array_init(&st->recv_metadata);
grpc_metadata_array_init(&st->recv_trailing_metadata);
st->op_num = 0;
st->write_flag = write_flag;
}
/* grpc_run_batch_stack_cleanup ensures the run_batch_stack is properly
@ -477,6 +507,7 @@ static void grpc_run_batch_stack_fill_ops(run_batch_stack *st, VALUE ops_hash) {
for (i = 0; i < (size_t)RARRAY_LEN(ops_ary); i++) {
this_op = rb_ary_entry(ops_ary, i);
this_value = rb_hash_aref(ops_hash, this_op);
st->ops[st->op_num].flags = 0;
switch (NUM2INT(this_op)) {
case GRPC_OP_SEND_INITIAL_METADATA:
/* N.B. later there is no need to explicitly delete the metadata keys
@ -490,6 +521,7 @@ static void grpc_run_batch_stack_fill_ops(run_batch_stack *st, VALUE ops_hash) {
case GRPC_OP_SEND_MESSAGE:
st->ops[st->op_num].data.send_message = grpc_rb_s_to_byte_buffer(
RSTRING_PTR(this_value), RSTRING_LEN(this_value));
st->ops[st->op_num].flags = st->write_flag;
break;
case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
break;
@ -525,7 +557,6 @@ static void grpc_run_batch_stack_fill_ops(run_batch_stack *st, VALUE ops_hash) {
NUM2INT(this_op));
};
st->ops[st->op_num].op = (grpc_op_type)NUM2INT(this_op);
st->ops[st->op_num].flags = 0;
st->ops[st->op_num].reserved = NULL;
st->op_num++;
}
@ -604,6 +635,8 @@ static VALUE grpc_rb_call_run_batch(VALUE self, VALUE cqueue, VALUE tag,
grpc_event ev;
grpc_call_error err;
VALUE result = Qnil;
VALUE rb_write_flag = rb_ivar_get(self, id_write_flag);
uint write_flag = 0;
TypedData_Get_Struct(self, grpc_call, &grpc_call_data_type, call);
/* Validate the ops args, adding them to a ruby array */
@ -611,7 +644,10 @@ static VALUE grpc_rb_call_run_batch(VALUE self, VALUE cqueue, VALUE tag,
rb_raise(rb_eTypeError, "call#run_batch: ops hash should be a hash");
return Qnil;
}
grpc_run_batch_stack_init(&st);
if (rb_write_flag != Qnil) {
write_flag = NUM2UINT(rb_write_flag);
}
grpc_run_batch_stack_init(&st, write_flag);
grpc_run_batch_stack_fill_ops(&st, ops_hash);
/* call grpc_call_start_batch, then wait for it to complete using
@ -638,6 +674,16 @@ static VALUE grpc_rb_call_run_batch(VALUE self, VALUE cqueue, VALUE tag,
return result;
}
static void Init_grpc_write_flags() {
/* Constants representing the write flags in grpc.h */
VALUE grpc_rb_mWriteFlags =
rb_define_module_under(grpc_rb_mGrpcCore, "WriteFlags");
rb_define_const(grpc_rb_mWriteFlags, "BUFFER_HINT",
UINT2NUM(GRPC_WRITE_BUFFER_HINT));
rb_define_const(grpc_rb_mWriteFlags, "NO_COMPRESS",
UINT2NUM(GRPC_WRITE_NO_COMPRESS));
}
static void Init_grpc_error_codes() {
/* Constants representing the error codes of grpc_call_error in grpc.h */
VALUE grpc_rb_mRpcErrors =
@ -735,10 +781,14 @@ void Init_grpc_call() {
rb_define_method(grpc_rb_cCall, "status=", grpc_rb_call_set_status, 1);
rb_define_method(grpc_rb_cCall, "metadata", grpc_rb_call_get_metadata, 0);
rb_define_method(grpc_rb_cCall, "metadata=", grpc_rb_call_set_metadata, 1);
rb_define_method(grpc_rb_cCall, "write_flag", grpc_rb_call_get_write_flag, 0);
rb_define_method(grpc_rb_cCall, "write_flag=", grpc_rb_call_set_write_flag,
1);
/* Ids used to support call attributes */
id_metadata = rb_intern("metadata");
id_status = rb_intern("status");
id_write_flag = rb_intern("write_flag");
/* Ids used by the c wrapping internals. */
id_cq = rb_intern("__cq");
@ -766,6 +816,7 @@ void Init_grpc_call() {
Init_grpc_error_codes();
Init_grpc_op_codes();
Init_grpc_write_flags();
}
/* Gets the call from the ruby object */

@ -59,7 +59,7 @@ module GRPC
include Core::CallOps
extend Forwardable
attr_reader(:deadline)
def_delegators :@call, :cancel, :metadata
def_delegators :@call, :cancel, :metadata, :write_flag, :write_flag=
# client_invoke begins a client invocation.
#
@ -484,6 +484,7 @@ module GRPC
# Operation limits access to an ActiveCall's methods for use as
# a Operation on the client.
Operation = view_class(:cancel, :cancelled, :deadline, :execute,
:metadata, :status, :start_call, :wait)
:metadata, :status, :start_call, :wait, :write_flag,
:write_flag=)
end
end

@ -31,6 +31,14 @@ require 'grpc'
include GRPC::Core::StatusCodes
describe GRPC::Core::WriteFlags do
it 'should define the known write flag values' do
m = GRPC::Core::WriteFlags
expect(m.const_get(:BUFFER_HINT)).to_not be_nil
expect(m.const_get(:NO_COMPRESS)).to_not be_nil
end
end
describe GRPC::Core::RpcErrors do
before(:each) do
@known_types = {

@ -35,6 +35,7 @@ describe GRPC::ActiveCall do
ActiveCall = GRPC::ActiveCall
Call = GRPC::Core::Call
CallOps = GRPC::Core::CallOps
WriteFlags = GRPC::Core::WriteFlags
before(:each) do
@pass_through = proc { |x| x }
@ -129,6 +130,31 @@ describe GRPC::ActiveCall do
@pass_through, deadline)
expect(server_call.remote_read).to eq('marshalled:' + msg)
end
TEST_WRITE_FLAGS = [WriteFlags::BUFFER_HINT, WriteFlags::NO_COMPRESS]
TEST_WRITE_FLAGS.each do |f|
it "successfully makes calls with write_flag set to #{f}" do
call = make_test_call
ActiveCall.client_invoke(call, @client_queue)
marshal = proc { |x| 'marshalled:' + x }
client_call = ActiveCall.new(call, @client_queue, marshal,
@pass_through, deadline)
msg = 'message is a string'
client_call.write_flag = f
client_call.remote_send(msg)
# confirm that the message was marshalled
recvd_rpc = @server.request_call(@server_queue, @server_tag, deadline)
recvd_call = recvd_rpc.call
server_ops = {
CallOps::SEND_INITIAL_METADATA => nil
}
recvd_call.run_batch(@server_queue, @server_tag, deadline, server_ops)
server_call = ActiveCall.new(recvd_call, @server_queue, @pass_through,
@pass_through, deadline)
expect(server_call.remote_read).to eq('marshalled:' + msg)
end
end
end
describe '#client_invoke' do
@ -261,7 +287,7 @@ describe GRPC::ActiveCall do
client_call.writes_done(false)
server_call = expect_server_to_receive(msg)
e = client_call.each_remote_read
n = 3 # arbitrary value > 1
n = 3 # arbitrary value > 1
n.times do
server_call.remote_send(reply)
expect(e.next).to eq(reply)

Loading…
Cancel
Save