Removed use of call.accept in gRPC Ruby

Change on 2014/12/11 by temiola <temiola@google.com>
-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=81895023
pull/1/merge
temiola 10 years ago committed by Jan Tattermusch
parent 4a3be1c996
commit 71bb137c56
  1. 63
      src/ruby/ext/grpc/rb_call.c
  2. 6
      src/ruby/ext/grpc/rb_channel.c
  3. 3
      src/ruby/lib/grpc/generic/rpc_server.rb
  4. 22
      src/ruby/spec/call_spec.rb
  5. 15
      src/ruby/spec/client_server_spec.rb
  6. 9
      src/ruby/spec/generic/active_call_spec.rb
  7. 3
      src/ruby/spec/generic/client_stub_spec.rb

@ -383,43 +383,65 @@ static VALUE grpc_rb_call_writes_done(VALUE self, VALUE tag) {
}
/* call-seq:
call.accept(completion_queue, flags=nil)
call.server_end_initial_metadata(flag)
Accept an incoming RPC, binding a completion queue to it.
To be called after adding metadata to the call, but before sending
messages.
Only to be called on servers, before sending messages.
flags is a bit-field combination of the write flags defined above.
REQUIRES: Can be called at most once per call.
Can only be called on the server.
Produces no events. */
static VALUE grpc_rb_call_accept(int argc, VALUE *argv, VALUE self) {
VALUE cqueue = Qnil;
VALUE finished_tag = Qnil;
Can only be called on the server, must be called after
grpc_call_server_accept
Produces no events */
static VALUE grpc_rb_call_server_end_initial_metadata(int argc, VALUE *argv,
VALUE self) {
VALUE flags = Qnil;
grpc_call *call = NULL;
grpc_completion_queue *cq = NULL;
grpc_call_error err;
/* "21" == 2 mandatory args, 1 (flags) is optional */
rb_scan_args(argc, argv, "21", &cqueue, &finished_tag, &flags);
/* "01" == 1 (flags) is optional */
rb_scan_args(argc, argv, "01", &flags);
if (NIL_P(flags)) {
flags = UINT2NUM(0); /* Default to no flags */
}
cq = grpc_rb_get_wrapped_completion_queue(cqueue);
Data_Get_Struct(self, grpc_call, call);
err = grpc_call_accept(call, cq, ROBJECT(finished_tag), NUM2UINT(flags));
err = grpc_call_server_end_initial_metadata(call, NUM2UINT(flags));
if (err != GRPC_CALL_OK) {
rb_raise(rb_eCallError, "end_initial_metadata failed: %s (code=%d)",
grpc_call_error_detail_of(err), err);
}
return Qnil;
}
/* call-seq:
call.server_accept(completion_queue, finished_tag)
Accept an incoming RPC, binding a completion queue to it.
To be called before sending or receiving messages.
REQUIRES: Can be called at most once per call.
Can only be called on the server.
Produces a GRPC_FINISHED event with finished_tag when the call has been
completed (there may be other events for the call pending at this
time) */
static VALUE grpc_rb_call_server_accept(VALUE self, VALUE cqueue,
VALUE finished_tag) {
grpc_call *call = NULL;
grpc_completion_queue *cq = grpc_rb_get_wrapped_completion_queue(cqueue);
grpc_call_error err;
Data_Get_Struct(self, grpc_call, call);
err = grpc_call_server_accept(call, cq, ROBJECT(finished_tag));
if (err != GRPC_CALL_OK) {
rb_raise(rb_eCallError, "accept failed: %s (code=%d)",
rb_raise(rb_eCallError, "server_accept failed: %s (code=%d)",
grpc_call_error_detail_of(err), err);
}
/* Add the completion queue as an instance attribute, prevents it from being
* GCed until this call object is GCed */
rb_ivar_set(self, id_cq, cqueue);
return Qnil;
}
/* rb_cCall is the ruby class that proxies grpc_call. */
VALUE rb_cCall = Qnil;
@ -436,6 +458,8 @@ void Init_google_rpc_error_codes() {
UINT2NUM(GRPC_CALL_ERROR_NOT_ON_SERVER));
rb_define_const(rb_RpcErrors, "NOT_ON_CLIENT",
UINT2NUM(GRPC_CALL_ERROR_NOT_ON_CLIENT));
rb_define_const(rb_RpcErrors, "ALREADY_ACCEPTED",
UINT2NUM(GRPC_CALL_ERROR_ALREADY_ACCEPTED));
rb_define_const(rb_RpcErrors, "ALREADY_INVOKED",
UINT2NUM(GRPC_CALL_ERROR_ALREADY_INVOKED));
rb_define_const(rb_RpcErrors, "NOT_INVOKED",
@ -457,6 +481,9 @@ void Init_google_rpc_error_codes() {
rb_str_new2("not available on a server"));
rb_hash_aset(rb_error_code_details, UINT2NUM(GRPC_CALL_ERROR_NOT_ON_CLIENT),
rb_str_new2("not available on a client"));
rb_hash_aset(rb_error_code_details,
UINT2NUM(GRPC_CALL_ERROR_ALREADY_ACCEPTED),
rb_str_new2("call is already accepted"));
rb_hash_aset(rb_error_code_details, UINT2NUM(GRPC_CALL_ERROR_ALREADY_INVOKED),
rb_str_new2("call is already invoked"));
rb_hash_aset(rb_error_code_details, UINT2NUM(GRPC_CALL_ERROR_NOT_INVOKED),
@ -485,7 +512,9 @@ void Init_google_rpc_call() {
rb_define_method(rb_cCall, "initialize_copy", grpc_rb_cannot_init_copy, 1);
/* Add ruby analogues of the Call methods. */
rb_define_method(rb_cCall, "accept", grpc_rb_call_accept, -1);
rb_define_method(rb_cCall, "server_accept", grpc_rb_call_server_accept, 2);
rb_define_method(rb_cCall, "server_end_initial_metadata",
grpc_rb_call_server_end_initial_metadata, -1);
rb_define_method(rb_cCall, "add_metadata", grpc_rb_call_add_metadata,
-1);
rb_define_method(rb_cCall, "cancel", grpc_rb_call_cancel, 0);

@ -247,6 +247,12 @@ void Init_google_rpc_channel() {
id_target = rb_intern("__target");
rb_define_const(rb_cChannel, "SSL_TARGET",
ID2SYM(rb_intern(GRPC_SSL_TARGET_NAME_OVERRIDE_ARG)));
rb_define_const(rb_cChannel, "ENABLE_CENSUS",
ID2SYM(rb_intern(GRPC_ARG_ENABLE_CENSUS)));
rb_define_const(rb_cChannel, "MAX_CONCURRENT_STREAMS",
ID2SYM(rb_intern(GRPC_ARG_MAX_CONCURRENT_STREAMS)));
rb_define_const(rb_cChannel, "MAX_MESSAGE_LENGTH",
ID2SYM(rb_intern(GRPC_ARG_MAX_MESSAGE_LENGTH)));
}
/* Gets the wrapped channel from the ruby wrapper */

@ -247,7 +247,8 @@ module Google::RPC
finished_tag = Object.new
call_queue = Core::CompletionQueue.new
call.metadata = new_server_rpc.metadata # store the metadata on the call
call.accept(call_queue, finished_tag)
call.server_accept(call_queue, finished_tag)
call.server_end_initial_metadata()
# Send UNAVAILABLE if there are too many unprocessed jobs
jobs_count, max = @pool.jobs_waiting, @max_waiting_requests

@ -40,21 +40,23 @@ describe GRPC::Core::RpcErrors do
:ERROR => 1,
:NOT_ON_SERVER => 2,
:NOT_ON_CLIENT => 3,
:ALREADY_INVOKED => 4,
:NOT_INVOKED => 5,
:ALREADY_FINISHED => 6,
:TOO_MANY_OPERATIONS => 7,
:INVALID_FLAGS => 8,
:ALREADY_ACCEPTED => 4,
:ALREADY_INVOKED => 5,
:NOT_INVOKED => 6,
:ALREADY_FINISHED => 7,
:TOO_MANY_OPERATIONS => 8,
:INVALID_FLAGS => 9,
:ErrorMessages => {
0=>'ok',
1=>'unknown error',
2=>'not available on a server',
3=>'not available on a client',
4=>'call is already invoked',
5=>'call is not yet invoked',
6=>'call is already finished',
7=>'outstanding read or write present',
8=>'a bad flag was given',
4=>'call is already accepted',
5=>'call is already invoked',
6=>'call is not yet invoked',
7=>'call is already finished',
8=>'outstanding read or write present',
9=>'a bad flag was given',
}
}
end

@ -70,7 +70,8 @@ shared_context 'setup: tags' do
ev = @server_queue.pluck(@server_tag, TimeConsts::INFINITE_FUTURE)
expect(ev).not_to be_nil
expect(ev.type).to be(SERVER_RPC_NEW)
ev.call.accept(@server_queue, @server_finished_tag)
ev.call.server_accept(@server_queue, @server_finished_tag)
ev.call.server_end_initial_metadata()
ev.call.start_read(@server_tag)
ev = @server_queue.pluck(@server_tag, TimeConsts::INFINITE_FUTURE)
expect(ev.type).to be(READ)
@ -115,7 +116,8 @@ shared_examples 'basic GRPC message delivery is OK' do
# accept the call
server_call = ev.call
server_call.accept(@server_queue, @server_finished_tag)
server_call.server_accept(@server_queue, @server_finished_tag)
server_call.server_end_initial_metadata
# confirm the server can read the inbound message
server_call.start_read(@server_tag)
@ -150,7 +152,8 @@ shared_examples 'basic GRPC message delivery is OK' do
# accept the call - need to do this to sent status.
server_call = ev.call
server_call.accept(@server_queue, @server_finished_tag)
server_call.server_accept(@server_queue, @server_finished_tag)
server_call.server_end_initial_metadata()
sts = Status.new(StatusCodes::NOT_FOUND, 'not found')
server_call.start_write_status(sts, @server_tag)
@ -287,7 +290,8 @@ shared_examples 'GRPC metadata delivery works OK' do
server_call = ev.call
# ... server accepts the call without adding metadata
server_call.accept(@server_queue, @server_finished_tag)
server_call.server_accept(@server_queue, @server_finished_tag)
server_call.server_end_initial_metadata()
# ... these server sends some data, allowing the metadata read
server_call.start_write(ByteBuffer.new('reply with metadata'),
@ -312,7 +316,8 @@ shared_examples 'GRPC metadata delivery works OK' do
# ... server adds metadata and accepts the call
server_call.add_metadata(md)
server_call.accept(@server_queue, @server_finished_tag)
server_call.server_accept(@server_queue, @server_finished_tag)
server_call.server_end_initial_metadata()
# Now the client can read the metadata
ev = expect_next_event_on(@client_queue, CLIENT_METADATA_READ, @tag)

@ -109,7 +109,8 @@ describe GRPC::ActiveCall do
expect(ev.tag).to be(@server_tag)
# Accept the call, and verify that the server reads the response ok.
ev.call.accept(@client_queue, @server_tag)
ev.call.server_accept(@client_queue, @server_tag)
ev.call.server_end_initial_metadata()
server_call = ActiveCall.new(ev.call, @client_queue, @pass_through,
@pass_through, deadline)
expect(server_call.remote_read).to eq(msg)
@ -130,7 +131,8 @@ describe GRPC::ActiveCall do
# confirm that the message was marshalled
@server.request_call(@server_tag)
ev = @server_queue.next(deadline)
ev.call.accept(@client_queue, @server_tag)
ev.call.server_accept(@client_queue, @server_tag)
ev.call.server_end_initial_metadata()
server_call = ActiveCall.new(ev.call, @client_queue, @pass_through,
@pass_through, deadline)
expect(server_call.remote_read).to eq('marshalled:' + msg)
@ -368,7 +370,8 @@ describe GRPC::ActiveCall do
@server.request_call(@server_tag)
ev = @server_queue.next(deadline)
ev.call.add_metadata(kw)
ev.call.accept(@client_queue, @server_done_tag)
ev.call.server_accept(@client_queue, @server_done_tag)
ev.call.server_end_initial_metadata()
ActiveCall.new(ev.call, @client_queue, @pass_through,
@pass_through, deadline,
finished_tag: @server_done_tag)

@ -520,7 +520,8 @@ describe 'ClientStub' do
server_call = ev.call
server_call.metadata = ev.result.metadata
finished_tag = Object.new
server_call.accept(server_queue, finished_tag)
server_call.server_accept(server_queue, finished_tag)
server_call.server_end_initial_metadata()
GRPC::ActiveCall.new(server_call, server_queue, NOOP, NOOP, INFINITE_FUTURE,
finished_tag: finished_tag)
end

Loading…
Cancel
Save