Exposes event#finish as #close.

- ensures that it's a runtime error if an event if used after it's finished

- updates all calls where the completion_queue is used to ensure the event's
retrieved are explicitly finished
	Change on 2014/12/18 by temiola <temiola@google.com>
-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=82445748
pull/1/merge
temiola 10 years ago committed by Michael Lumish
parent da029e39b6
commit 21bb60cf4d
  1. 1
      src/ruby/ext/grpc/rb_call.c
  2. 6
      src/ruby/ext/grpc/rb_completion_queue.c
  3. 123
      src/ruby/ext/grpc/rb_event.c
  4. 11
      src/ruby/ext/grpc/rb_event.h
  5. 11
      src/ruby/ext/grpc/rb_grpc.c
  6. 6
      src/ruby/ext/grpc/rb_grpc.h
  7. 72
      src/ruby/lib/grpc/generic/active_call.rb
  8. 46
      src/ruby/lib/grpc/generic/bidi_call.rb
  9. 9
      src/ruby/lib/grpc/generic/rpc_server.rb

@ -38,7 +38,6 @@
#include <grpc/grpc.h>
#include "rb_byte_buffer.h"
#include "rb_completion_queue.h"
#include "rb_event.h"
#include "rb_metadata.h"
#include "rb_grpc.h"

@ -141,8 +141,7 @@ static VALUE grpc_rb_completion_queue_next(VALUE self, VALUE timeout) {
if (next_call.event == NULL) {
return Qnil;
}
return Data_Wrap_Struct(rb_cEvent, GC_NOT_MARKED, grpc_rb_event_finish,
next_call.event);
return grpc_rb_new_event(next_call.event);
}
/* Blocks until the next event for given tag is available, and returns the
@ -160,8 +159,7 @@ static VALUE grpc_rb_completion_queue_pluck(VALUE self, VALUE tag,
if (next_call.event == NULL) {
return Qnil;
}
return Data_Wrap_Struct(rb_cEvent, GC_NOT_MARKED, grpc_rb_event_finish,
next_call.event);
return grpc_rb_new_event(next_call.event);
}
/* rb_cCompletionQueue is the ruby class that proxies grpc_completion_queue. */

@ -41,12 +41,49 @@
#include "rb_call.h"
#include "rb_metadata.h"
/* grpc_rb_event wraps a grpc_event. It provides a peer ruby object,
* 'mark' to minimize copying when an event is created from ruby. */
typedef struct grpc_rb_event {
/* Holder of ruby objects involved in constructing the channel */
VALUE mark;
/* The actual event */
grpc_event *wrapped;
} grpc_rb_event;
/* rb_mCompletionType is a ruby module that holds the completion type values */
VALUE rb_mCompletionType = Qnil;
/* Helper function to free an event. */
void grpc_rb_event_finish(void *p) {
grpc_event_finish(p);
/* Destroys Event instances. */
static void grpc_rb_event_free(void *p) {
grpc_rb_event *ev = NULL;
if (p == NULL) {
return;
};
ev = (grpc_rb_event *)p;
/* Deletes the wrapped object if the mark object is Qnil, which indicates
* that no other object is the actual owner. */
if (ev->wrapped != NULL && ev->mark == Qnil) {
grpc_event_finish(ev->wrapped);
rb_warning("event gc: destroyed the c event");
} else {
rb_warning("event gc: did not destroy the c event");
}
xfree(p);
}
/* Protects the mark object from GC */
static void grpc_rb_event_mark(void *p) {
grpc_rb_event *event = NULL;
if (p == NULL) {
return;
}
event = (grpc_rb_event *)p;
if (event->mark != Qnil) {
rb_gc_mark(event->mark);
}
}
static VALUE grpc_rb_event_result(VALUE self);
@ -54,7 +91,14 @@ static VALUE grpc_rb_event_result(VALUE self);
/* Obtains the type of an event. */
static VALUE grpc_rb_event_type(VALUE self) {
grpc_event *event = NULL;
Data_Get_Struct(self, grpc_event, event);
grpc_rb_event *wrapper = NULL;
Data_Get_Struct(self, grpc_rb_event, wrapper);
if (wrapper->wrapped == NULL) {
rb_raise(rb_eRuntimeError, "finished!");
return Qnil;
}
event = wrapper->wrapped;
switch (event->type) {
case GRPC_QUEUE_SHUTDOWN:
return rb_const_get(rb_mCompletionType, rb_intern("QUEUE_SHUTDOWN"));
@ -94,7 +138,14 @@ static VALUE grpc_rb_event_type(VALUE self) {
/* Obtains the tag associated with an event. */
static VALUE grpc_rb_event_tag(VALUE self) {
grpc_event *event = NULL;
Data_Get_Struct(self, grpc_event, event);
grpc_rb_event *wrapper = NULL;
Data_Get_Struct(self, grpc_rb_event, wrapper);
if (wrapper->wrapped == NULL) {
rb_raise(rb_eRuntimeError, "finished!");
return Qnil;
}
event = wrapper->wrapped;
if (event->tag == NULL) {
return Qnil;
}
@ -103,10 +154,17 @@ static VALUE grpc_rb_event_tag(VALUE self) {
/* Obtains the call associated with an event. */
static VALUE grpc_rb_event_call(VALUE self) {
grpc_event *ev = NULL;
Data_Get_Struct(self, grpc_event, ev);
if (ev->call != NULL) {
return grpc_rb_wrap_call(ev->call);
grpc_event *event = NULL;
grpc_rb_event *wrapper = NULL;
Data_Get_Struct(self, grpc_rb_event, wrapper);
if (wrapper->wrapped == NULL) {
rb_raise(rb_eRuntimeError, "finished!");
return Qnil;
}
event = wrapper->wrapped;
if (event->call != NULL) {
return grpc_rb_wrap_call(event->call);
}
return Qnil;
}
@ -114,6 +172,7 @@ static VALUE grpc_rb_event_call(VALUE self) {
/* Obtains the metadata associated with an event. */
static VALUE grpc_rb_event_metadata(VALUE self) {
grpc_event *event = NULL;
grpc_rb_event *wrapper = NULL;
grpc_metadata *metadata = NULL;
VALUE key = Qnil;
VALUE new_ary = Qnil;
@ -121,9 +180,14 @@ static VALUE grpc_rb_event_metadata(VALUE self) {
VALUE value = Qnil;
size_t count = 0;
size_t i = 0;
Data_Get_Struct(self, grpc_rb_event, wrapper);
if (wrapper->wrapped == NULL) {
rb_raise(rb_eRuntimeError, "finished!");
return Qnil;
}
/* Figure out which metadata to read. */
Data_Get_Struct(self, grpc_event, event);
event = wrapper->wrapped;
switch (event->type) {
case GRPC_CLIENT_METADATA_READ:
@ -179,7 +243,13 @@ static VALUE grpc_rb_event_metadata(VALUE self) {
/* Obtains the data associated with an event. */
static VALUE grpc_rb_event_result(VALUE self) {
grpc_event *event = NULL;
Data_Get_Struct(self, grpc_event, event);
grpc_rb_event *wrapper = NULL;
Data_Get_Struct(self, grpc_rb_event, wrapper);
if (wrapper->wrapped == NULL) {
rb_raise(rb_eRuntimeError, "finished!");
return Qnil;
}
event = wrapper->wrapped;
switch (event->type) {
@ -245,11 +315,19 @@ static VALUE grpc_rb_event_result(VALUE self) {
return Qfalse;
}
/* rb_sNewServerRpc is the struct that holds new server rpc details. */
VALUE rb_sNewServerRpc = Qnil;
/* rb_sStatus is the struct that holds status details. */
VALUE rb_sStatus = Qnil;
static VALUE grpc_rb_event_finish(VALUE self) {
grpc_event *event = NULL;
grpc_rb_event *wrapper = NULL;
Data_Get_Struct(self, grpc_rb_event, wrapper);
if (wrapper->wrapped == NULL) { /* already closed */
return Qnil;
}
event = wrapper->wrapped;
grpc_event_finish(event);
wrapper->wrapped = NULL;
wrapper->mark = Qnil;
return Qnil;
}
/* rb_cEvent is the Event class whose instances proxy grpc_event */
VALUE rb_cEvent = Qnil;
@ -262,9 +340,6 @@ void Init_google_rpc_event() {
rb_eEventError = rb_define_class_under(rb_mGoogleRpcCore, "EventError",
rb_eStandardError);
rb_cEvent = rb_define_class_under(rb_mGoogleRpcCore, "Event", rb_cObject);
rb_sNewServerRpc = rb_struct_define("NewServerRpc", "method", "host",
"deadline", "metadata", NULL);
rb_sStatus = rb_struct_define("Status", "code", "details", "metadata", NULL);
/* Prevent allocation or inialization from ruby. */
rb_define_alloc_func(rb_cEvent, grpc_rb_cannot_alloc);
@ -276,6 +351,8 @@ void Init_google_rpc_event() {
rb_define_method(rb_cEvent, "result", grpc_rb_event_result, 0);
rb_define_method(rb_cEvent, "tag", grpc_rb_event_tag, 0);
rb_define_method(rb_cEvent, "type", grpc_rb_event_type, 0);
rb_define_method(rb_cEvent, "finish", grpc_rb_event_finish, 0);
rb_define_alias(rb_cEvent, "close", "finish");
/* Constants representing the completion types */
rb_mCompletionType = rb_define_module_under(rb_mGoogleRpcCore,
@ -298,3 +375,11 @@ void Init_google_rpc_event() {
rb_define_const(rb_mCompletionType, "RESERVED",
INT2NUM(GRPC_COMPLETION_DO_NOT_USE));
}
VALUE grpc_rb_new_event(grpc_event *ev) {
grpc_rb_event *wrapper = ALLOC(grpc_rb_event);
wrapper->wrapped = ev;
wrapper->mark = Qnil;
return Data_Wrap_Struct(rb_cEvent, grpc_rb_event_mark, grpc_rb_event_free,
wrapper);
}

@ -35,12 +35,7 @@
#define GRPC_RB_EVENT_H_
#include <ruby.h>
/* rb_sNewServerRpc is the struct that holds new server rpc details. */
extern VALUE rb_sNewServerRpc;
/* rb_sStruct is the struct that holds status details. */
extern VALUE rb_sStatus;
#include <grpc/grpc.h>
/* rb_cEvent is the Event class whose instances proxy grpc_event. */
extern VALUE rb_cEvent;
@ -49,8 +44,8 @@ extern VALUE rb_cEvent;
event processing. */
extern VALUE rb_eEventError;
/* Helper function to free an event. */
void grpc_rb_event_finish(void *p);
/* Used to create new ruby event objects */
VALUE grpc_rb_new_event(grpc_event *ev);
/* Initializes the Event and EventError classes. */
void Init_google_rpc_event();

@ -245,16 +245,27 @@ void grpc_rb_shutdown(void *vm) {
grpc_shutdown();
}
/* Initialize the Google RPC module structs */
/* rb_sNewServerRpc is the struct that holds new server rpc details. */
VALUE rb_sNewServerRpc = Qnil;
/* rb_sStatus is the struct that holds status details. */
VALUE rb_sStatus = Qnil;
/* Initialize the Google RPC module. */
VALUE rb_mGoogle = Qnil;
VALUE rb_mGoogleRPC = Qnil;
VALUE rb_mGoogleRpcCore = Qnil;
void Init_grpc() {
grpc_init();
ruby_vm_at_exit(grpc_rb_shutdown);
rb_mGoogle = rb_define_module("Google");
rb_mGoogleRPC = rb_define_module_under(rb_mGoogle, "RPC");
rb_mGoogleRpcCore = rb_define_module_under(rb_mGoogleRPC, "Core");
rb_sNewServerRpc = rb_struct_define("NewServerRpc", "method", "host",
"deadline", "metadata", NULL);
rb_sStatus = rb_struct_define("Status", "code", "details", "metadata", NULL);
Init_google_rpc_byte_buffer();
Init_google_rpc_event();

@ -47,6 +47,12 @@ extern VALUE rb_mGoogleRpcCore;
/* Class used to wrap timeval structs. */
extern VALUE rb_cTimeVal;
/* rb_sNewServerRpc is the struct that holds new server rpc details. */
extern VALUE rb_sNewServerRpc;
/* rb_sStruct is the struct that holds status details. */
extern VALUE rb_sStatus;
/* GC_NOT_MARKED is used in calls to Data_Wrap_Struct to indicate that the
wrapped struct does not need to participate in ruby gc. */
extern const RUBY_DATA_FUNC GC_NOT_MARKED;

@ -73,6 +73,7 @@ module Google::RPC
# wait for the invocation to be accepted
ev = q.pluck(invoke_accepted, INFINITE_FUTURE)
raise OutOfTime if ev.nil?
ev.close
[finished_tag, client_metadata_read]
end
@ -191,11 +192,17 @@ module Google::RPC
def writes_done(assert_finished=true)
@call.writes_done(self)
ev = @cq.pluck(self, INFINITE_FUTURE)
assert_event_type(ev, FINISH_ACCEPTED)
logger.debug("Writes done: waiting for finish? #{assert_finished}")
begin
assert_event_type(ev, FINISH_ACCEPTED)
logger.debug("Writes done: waiting for finish? #{assert_finished}")
ensure
ev.close
end
if assert_finished
ev = @cq.pluck(@finished_tag, INFINITE_FUTURE)
raise "unexpected event: #{ev.inspect}" if ev.nil?
ev.close
return @call.status
end
end
@ -206,22 +213,21 @@ module Google::RPC
# event.
def finished
ev = @cq.pluck(@finished_tag, INFINITE_FUTURE)
raise "unexpected event: #{ev.inspect}" unless ev.type == FINISHED
if @call.metadata.nil?
@call.metadata = ev.result.metadata
else
@call.metadata.merge!(ev.result.metadata)
end
begin
raise "unexpected event: #{ev.inspect}" unless ev.type == FINISHED
if @call.metadata.nil?
@call.metadata = ev.result.metadata
else
@call.metadata.merge!(ev.result.metadata)
end
if ev.result.code != Core::StatusCodes::OK
raise BadStatus.new(ev.result.code, ev.result.details)
if ev.result.code != Core::StatusCodes::OK
raise BadStatus.new(ev.result.code, ev.result.details)
end
res = ev.result
ensure
ev.close
end
res = ev.result
# NOTE(temiola): This is necessary to allow the C call struct wrapped
# within the active_call to be GCed; this is necessary so that other
# C-level destructors get called in the required order.
ev = nil # allow the event to be GCed
res
end
@ -246,8 +252,11 @@ module Google::RPC
# call queue#pluck, and wait for WRITE_ACCEPTED, so as not to return
# until the flow control allows another send on this call.
ev = @cq.pluck(self, INFINITE_FUTURE)
assert_event_type(ev, WRITE_ACCEPTED)
ev = nil
begin
assert_event_type(ev, WRITE_ACCEPTED)
ensure
ev.close
end
end
# send_status sends a status to the remote endpoint
@ -260,7 +269,11 @@ module Google::RPC
assert_queue_is_ready
@call.start_write_status(code, details, self)
ev = @cq.pluck(self, INFINITE_FUTURE)
assert_event_type(ev, FINISH_ACCEPTED)
begin
assert_event_type(ev, FINISH_ACCEPTED)
ensure
ev.close
end
logger.debug("Status sent: #{code}:'#{details}'")
if assert_finished
return finished
@ -283,13 +296,17 @@ module Google::RPC
@call.start_read(self)
ev = @cq.pluck(self, INFINITE_FUTURE)
assert_event_type(ev, READ)
logger.debug("received req: #{ev.result.inspect}")
if !ev.result.nil?
logger.debug("received req.to_s: #{ev.result.to_s}")
res = @unmarshal.call(ev.result.to_s)
logger.debug("received_req (unmarshalled): #{res.inspect}")
return res
begin
assert_event_type(ev, READ)
logger.debug("received req: #{ev.result.inspect}")
if !ev.result.nil?
logger.debug("received req.to_s: #{ev.result.to_s}")
res = @unmarshal.call(ev.result.to_s)
logger.debug("received_req (unmarshalled): #{res.inspect}")
return res
end
ensure
ev.close
end
logger.debug('found nil; the final response has been sent')
nil
@ -515,12 +532,15 @@ module Google::RPC
# confirms that no events are enqueued, and that the queue is not
# shutdown.
def assert_queue_is_ready
ev = nil
begin
ev = @cq.pluck(self, ZERO)
raise "unexpected event #{ev.inspect}" unless ev.nil?
rescue OutOfTime
# expected, nothing should be on the queue and the deadline was ZERO,
# except things using another tag
ensure
ev.close unless ev.nil?
end
end

@ -149,15 +149,27 @@ module Google::RPC
payload = @marshal.call(req)
@call.start_write(Core::ByteBuffer.new(payload), write_tag)
ev = @cq.pluck(write_tag, INFINITE_FUTURE)
assert_event_type(ev, WRITE_ACCEPTED)
begin
assert_event_type(ev, WRITE_ACCEPTED)
ensure
ev.close
end
end
if is_client
@call.writes_done(write_tag)
ev = @cq.pluck(write_tag, INFINITE_FUTURE)
assert_event_type(ev, FINISH_ACCEPTED)
begin
assert_event_type(ev, FINISH_ACCEPTED)
ensure
ev.close
end
logger.debug("bidi-client: sent #{count} reqs, waiting to finish")
ev = @cq.pluck(@finished_tag, INFINITE_FUTURE)
assert_event_type(ev, FINISHED)
begin
assert_event_type(ev, FINISHED)
ensure
ev.close
end
logger.debug('bidi-client: finished received')
end
rescue StandardError => e
@ -180,19 +192,23 @@ module Google::RPC
count += 1
@call.start_read(read_tag)
ev = @cq.pluck(read_tag, INFINITE_FUTURE)
assert_event_type(ev, READ)
# handle the next event.
if ev.result.nil?
@readq.push(END_OF_READS)
logger.debug('done reading!')
break
begin
assert_event_type(ev, READ)
# handle the next event.
if ev.result.nil?
@readq.push(END_OF_READS)
logger.debug('done reading!')
break
end
# push the latest read onto the queue and continue reading
logger.debug("received req.to_s: #{ev.result.to_s}")
res = @unmarshal.call(ev.result.to_s)
@readq.push(res)
ensure
ev.close
end
# push the latest read onto the queue and continue reading
logger.debug("received req.to_s: #{ev.result.to_s}")
res = @unmarshal.call(ev.result.to_s)
@readq.push(res)
end
rescue StandardError => e

@ -217,18 +217,13 @@ module Google::RPC
next if ev.nil?
if ev.type != SERVER_RPC_NEW
logger.warn("bad evt: got:#{ev.type}, want:#{SERVER_RPC_NEW}")
ev.close
next
end
c = new_active_server_call(ev.call, ev.result)
if !c.nil?
mth = ev.result.method.to_sym
# NOTE(temiola): This is necessary to allow the C call struct wrapped
# within the active_call created by the event to be GCed; this is
# necessary so that other C-level destructors get called in the
# required order.
ev = nil
ev.close
@pool.schedule(c) do |call|
rpc_descs[mth].run_server_method(call, rpc_handlers[mth])
end

Loading…
Cancel
Save