Move parameters for all grpc_op types into their own sub-structs.

pull/9489/head
Mark D. Roth 8 years ago committed by Nicolas "Pixel" Noble
parent 196fdc422c
commit e6dd773dff
  1. 9
      include/grpc++/impl/codegen/call.h
  2. 12
      include/grpc/impl/codegen/grpc_types.h
  3. 9
      src/core/ext/lb_policy/grpclb/grpclb.c
  4. 13
      src/core/lib/surface/call.c
  5. 8
      src/core/lib/surface/call_log_batch.c
  6. 5
      src/core/lib/surface/server.c
  7. 23
      src/csharp/ext/grpc_csharp_ext.c
  8. 6
      src/node/ext/call.cc
  9. 8
      src/objective-c/GRPCClient/private/GRPCWrappedCall.m
  10. 6
      src/objective-c/tests/CronetUnitTests/CronetUnitTests.m
  11. 9
      src/php/ext/grpc/call.c
  12. 7
      src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi
  13. 12
      src/ruby/ext/grpc/rb_call.c

@ -249,7 +249,7 @@ class CallOpSendMessage {
op->op = GRPC_OP_SEND_MESSAGE;
op->flags = write_options_.flags();
op->reserved = NULL;
op->data.send_message = send_buf_;
op->data.send_message.send_message = send_buf_;
// Flags are per-message: clear them after use.
write_options_.Clear();
}
@ -298,7 +298,7 @@ class CallOpRecvMessage {
op->op = GRPC_OP_RECV_MESSAGE;
op->flags = 0;
op->reserved = NULL;
op->data.recv_message = &recv_buf_;
op->data.recv_message.recv_message = &recv_buf_;
}
void FinishOp(bool* status, int max_receive_message_size) {
@ -379,7 +379,7 @@ class CallOpGenericRecvMessage {
op->op = GRPC_OP_RECV_MESSAGE;
op->flags = 0;
op->reserved = NULL;
op->data.recv_message = &recv_buf_;
op->data.recv_message.recv_message = &recv_buf_;
}
void FinishOp(bool* status, int max_receive_message_size) {
@ -486,7 +486,8 @@ class CallOpRecvInitialMetadata {
memset(&recv_initial_metadata_arr_, 0, sizeof(recv_initial_metadata_arr_));
grpc_op* op = &ops[(*nops)++];
op->op = GRPC_OP_RECV_INITIAL_METADATA;
op->data.recv_initial_metadata = &recv_initial_metadata_arr_;
op->data.recv_initial_metadata.recv_initial_metadata =
&recv_initial_metadata_arr_;
op->flags = 0;
op->reserved = NULL;
}

@ -418,7 +418,9 @@ typedef struct grpc_op {
grpc_compression_level level;
} maybe_compression_level;
} send_initial_metadata;
struct grpc_byte_buffer *send_message;
struct {
struct grpc_byte_buffer *send_message;
} send_message;
struct {
size_t trailing_metadata_count;
grpc_metadata *trailing_metadata;
@ -430,11 +432,15 @@ typedef struct grpc_op {
object, recv_initial_metadata->array is owned by the caller).
After the operation completes, call grpc_metadata_array_destroy on this
value, or reuse it in a future op. */
grpc_metadata_array *recv_initial_metadata;
struct {
grpc_metadata_array *recv_initial_metadata;
} recv_initial_metadata;
/** ownership of the byte buffer is moved to the caller; the caller must
call grpc_byte_buffer_destroy on this value, or reuse it in a future op.
*/
struct grpc_byte_buffer **recv_message;
struct {
struct grpc_byte_buffer **recv_message;
} recv_message;
struct {
/** ownership of the array is with the caller, but ownership of the
elements stays with the call object (ie key, value members are owned

@ -1178,14 +1178,15 @@ static void query_for_backends_locked(grpc_exec_ctx *exec_ctx,
op++;
op->op = GRPC_OP_RECV_INITIAL_METADATA;
op->data.recv_initial_metadata = &glb_policy->lb_initial_metadata_recv;
op->data.recv_initial_metadata.recv_initial_metadata =
&glb_policy->lb_initial_metadata_recv;
op->flags = 0;
op->reserved = NULL;
op++;
GPR_ASSERT(glb_policy->lb_request_payload != NULL);
op->op = GRPC_OP_SEND_MESSAGE;
op->data.send_message = glb_policy->lb_request_payload;
op->data.send_message.send_message = glb_policy->lb_request_payload;
op->flags = 0;
op->reserved = NULL;
op++;
@ -1211,7 +1212,7 @@ static void query_for_backends_locked(grpc_exec_ctx *exec_ctx,
op = ops;
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message = &glb_policy->lb_response_payload;
op->data.recv_message.recv_message = &glb_policy->lb_response_payload;
op->flags = 0;
op->reserved = NULL;
op++;
@ -1293,7 +1294,7 @@ static void lb_on_response_received(grpc_exec_ctx *exec_ctx, void *arg,
if (!glb_policy->shutting_down) {
/* keep listening for serverlist updates */
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message = &glb_policy->lb_response_payload;
op->data.recv_message.recv_message = &glb_policy->lb_response_payload;
op->flags = 0;
op->reserved = NULL;
op++;

@ -1461,7 +1461,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
error = GRPC_CALL_ERROR_INVALID_FLAGS;
goto done_with_error;
}
if (op->data.send_message == NULL) {
if (op->data.send_message.send_message == NULL) {
error = GRPC_CALL_ERROR_INVALID_MESSAGE;
goto done_with_error;
}
@ -1473,11 +1473,13 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
call->sending_message = 1;
grpc_slice_buffer_stream_init(
&call->sending_stream,
&op->data.send_message->data.raw.slice_buffer, op->flags);
&op->data.send_message.send_message->data.raw.slice_buffer,
op->flags);
/* If the outgoing buffer is already compressed, mark it as so in the
flags. These will be picked up by the compression filter and further
(wasteful) attempts at compression skipped. */
if (op->data.send_message->data.raw.compression > GRPC_COMPRESS_NONE) {
if (op->data.send_message.send_message->data.raw.compression
> GRPC_COMPRESS_NONE) {
call->sending_stream.base.flags |= GRPC_WRITE_INTERNAL_COMPRESS;
}
stream_op->send_message = &call->sending_stream.base;
@ -1565,7 +1567,8 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
that case we're not necessarily covered by a poller. */
stream_op->covered_by_poller = call->is_client;
call->received_initial_metadata = 1;
call->buffered_metadata[0] = op->data.recv_initial_metadata;
call->buffered_metadata[0] =
op->data.recv_initial_metadata.recv_initial_metadata;
grpc_closure_init(&call->receiving_initial_metadata_ready,
receiving_initial_metadata_ready, bctl,
grpc_schedule_on_exec_ctx);
@ -1588,7 +1591,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
}
call->receiving_message = 1;
bctl->recv_message = 1;
call->receiving_buffer = op->data.recv_message;
call->receiving_buffer = op->data.recv_message.recv_message;
stream_op->recv_message = &call->receiving_stream;
grpc_closure_init(&call->receiving_stream_ready, receiving_stream_ready,
bctl, grpc_schedule_on_exec_ctx);

@ -63,7 +63,8 @@ char *grpc_op_string(const grpc_op *op) {
op->data.send_initial_metadata.count);
break;
case GRPC_OP_SEND_MESSAGE:
gpr_asprintf(&tmp, "SEND_MESSAGE ptr=%p", op->data.send_message);
gpr_asprintf(&tmp, "SEND_MESSAGE ptr=%p",
op->data.send_message.send_message);
gpr_strvec_add(&b, tmp);
break;
case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
@ -79,11 +80,12 @@ char *grpc_op_string(const grpc_op *op) {
break;
case GRPC_OP_RECV_INITIAL_METADATA:
gpr_asprintf(&tmp, "RECV_INITIAL_METADATA ptr=%p",
op->data.recv_initial_metadata);
op->data.recv_initial_metadata.recv_initial_metadata);
gpr_strvec_add(&b, tmp);
break;
case GRPC_OP_RECV_MESSAGE:
gpr_asprintf(&tmp, "RECV_MESSAGE ptr=%p", op->data.recv_message);
gpr_asprintf(&tmp, "RECV_MESSAGE ptr=%p",
op->data.recv_message.recv_message);
gpr_strvec_add(&b, tmp);
break;
case GRPC_OP_RECV_STATUS_ON_CLIENT:

@ -609,7 +609,7 @@ static void finish_start_new_rpc(
grpc_op op;
memset(&op, 0, sizeof(op));
op.op = GRPC_OP_RECV_MESSAGE;
op.data.recv_message = &calld->payload;
op.data.recv_message.recv_message = &calld->payload;
grpc_closure_init(&calld->publish, publish_new_rpc, elem,
grpc_schedule_on_exec_ctx);
grpc_call_start_batch_and_execute(exec_ctx, calld->call, &op, 1,
@ -857,7 +857,8 @@ static void accept_stream(grpc_exec_ctx *exec_ctx, void *cd,
grpc_op op;
memset(&op, 0, sizeof(op));
op.op = GRPC_OP_RECV_INITIAL_METADATA;
op.data.recv_initial_metadata = &calld->initial_metadata;
op.data.recv_initial_metadata.recv_initial_metadata =
&calld->initial_metadata;
grpc_closure_init(&calld->got_initial_metadata, got_initial_metadata, elem,
grpc_schedule_on_exec_ctx);
grpc_call_start_batch_and_execute(exec_ctx, call, &op, 1,

@ -537,7 +537,7 @@ grpcsharp_call_start_unary(grpc_call *call, grpcsharp_batch_context *ctx,
ops[1].op = GRPC_OP_SEND_MESSAGE;
ctx->send_message = string_to_byte_buffer(send_buffer, send_buffer_len);
ops[1].data.send_message = ctx->send_message;
ops[1].data.send_message.send_message = ctx->send_message;
ops[1].flags = write_flags;
ops[1].reserved = NULL;
@ -546,12 +546,13 @@ grpcsharp_call_start_unary(grpc_call *call, grpcsharp_batch_context *ctx,
ops[2].reserved = NULL;
ops[3].op = GRPC_OP_RECV_INITIAL_METADATA;
ops[3].data.recv_initial_metadata = &(ctx->recv_initial_metadata);
ops[3].data.recv_initial_metadata.recv_initial_metadata =
&(ctx->recv_initial_metadata);
ops[3].flags = 0;
ops[3].reserved = NULL;
ops[4].op = GRPC_OP_RECV_MESSAGE;
ops[4].data.recv_message = &(ctx->recv_message);
ops[4].data.recv_message.recv_message = &(ctx->recv_message);
ops[4].flags = 0;
ops[4].reserved = NULL;
@ -590,12 +591,13 @@ grpcsharp_call_start_client_streaming(grpc_call *call,
ops[0].reserved = NULL;
ops[1].op = GRPC_OP_RECV_INITIAL_METADATA;
ops[1].data.recv_initial_metadata = &(ctx->recv_initial_metadata);
ops[1].data.recv_initial_metadata.recv_initial_metadata =
&(ctx->recv_initial_metadata);
ops[1].flags = 0;
ops[1].reserved = NULL;
ops[2].op = GRPC_OP_RECV_MESSAGE;
ops[2].data.recv_message = &(ctx->recv_message);
ops[2].data.recv_message.recv_message = &(ctx->recv_message);
ops[2].flags = 0;
ops[2].reserved = NULL;
@ -634,7 +636,7 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_server_streaming(
ops[1].op = GRPC_OP_SEND_MESSAGE;
ctx->send_message = string_to_byte_buffer(send_buffer, send_buffer_len);
ops[1].data.send_message = ctx->send_message;
ops[1].data.send_message.send_message = ctx->send_message;
ops[1].flags = write_flags;
ops[1].reserved = NULL;
@ -698,7 +700,8 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_recv_initial_metadata(
/* TODO: don't use magic number */
grpc_op ops[1];
ops[0].op = GRPC_OP_RECV_INITIAL_METADATA;
ops[0].data.recv_initial_metadata = &(ctx->recv_initial_metadata);
ops[0].data.recv_initial_metadata.recv_initial_metadata =
&(ctx->recv_initial_metadata);
ops[0].flags = 0;
ops[0].reserved = NULL;
@ -717,7 +720,7 @@ grpcsharp_call_send_message(grpc_call *call, grpcsharp_batch_context *ctx,
size_t nops = send_empty_initial_metadata ? 2 : 1;
ops[0].op = GRPC_OP_SEND_MESSAGE;
ctx->send_message = string_to_byte_buffer(send_buffer, send_buffer_len);
ops[0].data.send_message = ctx->send_message;
ops[0].data.send_message.send_message = ctx->send_message;
ops[0].flags = write_flags;
ops[0].reserved = NULL;
ops[1].op = GRPC_OP_SEND_INITIAL_METADATA;
@ -765,7 +768,7 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_status_from_server(
ops[nops].op = GRPC_OP_SEND_MESSAGE;
ctx->send_message = string_to_byte_buffer(optional_send_buffer,
optional_send_buffer_len);
ops[nops].data.send_message = ctx->send_message;
ops[nops].data.send_message.send_message = ctx->send_message;
ops[nops].flags = write_flags;
ops[nops].reserved = NULL;
nops ++;
@ -784,7 +787,7 @@ grpcsharp_call_recv_message(grpc_call *call, grpcsharp_batch_context *ctx) {
/* TODO: don't use magic number */
grpc_op ops[1];
ops[0].op = GRPC_OP_RECV_MESSAGE;
ops[0].data.recv_message = &(ctx->recv_message);
ops[0].data.recv_message.recv_message = &(ctx->recv_message);
ops[0].flags = 0;
ops[0].reserved = NULL;
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx,

@ -262,7 +262,7 @@ class SendMessageOp : public Op {
}
}
send_message = BufferToByteBuffer(value);
out->data.send_message = send_message;
out->data.send_message.send_message = send_message;
PersistentValue *handle = new PersistentValue(value);
resources->handles.push_back(unique_ptr<PersistentValue>(handle));
return true;
@ -377,7 +377,7 @@ class GetMetadataOp : public Op {
bool ParseOp(Local<Value> value, grpc_op *out,
shared_ptr<Resources> resources) {
out->data.recv_initial_metadata = &recv_metadata;
out->data.recv_initial_metadata.recv_initial_metadata = &recv_metadata;
return true;
}
bool IsFinalOp() {
@ -410,7 +410,7 @@ class ReadMessageOp : public Op {
bool ParseOp(Local<Value> value, grpc_op *out,
shared_ptr<Resources> resources) {
out->data.recv_message = &recv_message;
out->data.recv_message.recv_message = &recv_message;
return true;
}
bool IsFinalOp() {

@ -105,14 +105,14 @@
}
if (self = [super init]) {
_op.op = GRPC_OP_SEND_MESSAGE;
_op.data.send_message = message.grpc_byteBuffer;
_op.data.send_message.send_message = message.grpc_byteBuffer;
_handler = handler;
}
return self;
}
- (void)dealloc {
grpc_byte_buffer_destroy(_op.data.send_message);
grpc_byte_buffer_destroy(_op.data.send_message.send_message);
}
@end
@ -145,7 +145,7 @@
if (self = [super init]) {
_op.op = GRPC_OP_RECV_INITIAL_METADATA;
grpc_metadata_array_init(&_headers);
_op.data.recv_initial_metadata = &_headers;
_op.data.recv_initial_metadata.recv_initial_metadata = &_headers;
if (handler) {
// Prevent reference cycle with _handler
__weak typeof(self) weakSelf = self;
@ -177,7 +177,7 @@
- (instancetype)initWithHandler:(void (^)(grpc_byte_buffer *))handler {
if (self = [super init]) {
_op.op = GRPC_OP_RECV_MESSAGE;
_op.data.recv_message = &_receivedMessage;
_op.data.recv_message.recv_message = &_receivedMessage;
if (handler) {
// Prevent reference cycle with _handler
__weak typeof(self) weakSelf = self;

@ -142,7 +142,7 @@ static void drain_cq(grpc_completion_queue *cq) {
op->reserved = NULL;
op++;
op->op = GRPC_OP_SEND_MESSAGE;
op->data.send_message = request_payload;
op->data.send_message.send_message = request_payload;
op->flags = 0;
op->reserved = NULL;
op++;
@ -151,12 +151,12 @@ static void drain_cq(grpc_completion_queue *cq) {
op->reserved = NULL;
op++;
op->op = GRPC_OP_RECV_INITIAL_METADATA;
op->data.recv_initial_metadata = &initial_metadata_recv;
op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
op->flags = 0;
op->reserved = NULL;
op++;
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message = &response_payload_recv;
op->data.recv_message.recv_message = &response_payload_recv;
op->flags = 0;
op->reserved = NULL;
op++;

@ -335,7 +335,7 @@ PHP_METHOD(Call, startBatch) {
1 TSRMLS_CC);
goto cleanup;
}
ops[op_num].data.send_message =
ops[op_num].data.send_message.send_message =
string_to_byte_buffer(Z_STRVAL_P(message_value),
Z_STRLEN_P(message_value));
break;
@ -390,10 +390,11 @@ PHP_METHOD(Call, startBatch) {
}
break;
case GRPC_OP_RECV_INITIAL_METADATA:
ops[op_num].data.recv_initial_metadata = &recv_metadata;
ops[op_num].data.recv_initial_metadata.recv_initial_metadata =
&recv_metadata;
break;
case GRPC_OP_RECV_MESSAGE:
ops[op_num].data.recv_message = &message;
ops[op_num].data.recv_message.recv_message = &message;
break;
case GRPC_OP_RECV_STATUS_ON_CLIENT:
ops[op_num].data.recv_status_on_client.trailing_metadata =
@ -498,7 +499,7 @@ cleanup:
}
for (int i = 0; i < op_num; i++) {
if (ops[i].op == GRPC_OP_SEND_MESSAGE) {
grpc_byte_buffer_destroy(ops[i].data.send_message);
grpc_byte_buffer_destroy(ops[i].data.send_message.send_message);
}
if (ops[i].op == GRPC_OP_RECV_MESSAGE) {
grpc_byte_buffer_destroy(message);

@ -606,7 +606,7 @@ def operation_send_message(data, int flags):
op.c_op.type = GRPC_OP_SEND_MESSAGE
op.c_op.flags = flags
byte_buffer = ByteBuffer(data)
op.c_op.data.send_message = byte_buffer.c_byte_buffer
op.c_op.data.send_message.send_message = byte_buffer.c_byte_buffer
op.references.append(byte_buffer)
op.is_valid = True
return op
@ -639,7 +639,7 @@ def operation_receive_initial_metadata(int flags):
op.c_op.type = GRPC_OP_RECV_INITIAL_METADATA
op.c_op.flags = flags
op._received_metadata = Metadata([])
op.c_op.data.receive_initial_metadata = (
op.c_op.data.recv_initial_metadata.receive_initial_metadata = (
&op._received_metadata.c_metadata_array)
op.is_valid = True
return op
@ -652,7 +652,8 @@ def operation_receive_message(int flags):
# n.b. the c_op.data.receive_message field needs to be deleted by us,
# anyway, so we just let that be handled by the ByteBuffer() we allocated
# the line before.
op.c_op.data.receive_message = &op._received_message.c_byte_buffer
op.c_op.data.recv_message.receive_message =
&op._received_message.c_byte_buffer
op.is_valid = True
return op

@ -641,7 +641,7 @@ static void grpc_run_batch_stack_cleanup(run_batch_stack *st) {
for (i = 0; i < st->op_num; i++) {
if (st->ops[i].op == GRPC_OP_SEND_MESSAGE) {
grpc_byte_buffer_destroy(st->ops[i].data.send_message);
grpc_byte_buffer_destroy(st->ops[i].data.send_message.send_message);
}
}
}
@ -673,8 +673,9 @@ static void grpc_run_batch_stack_fill_ops(run_batch_stack *st, VALUE ops_hash) {
st->send_metadata.metadata;
break;
case GRPC_OP_SEND_MESSAGE:
st->ops[st->op_num].data.send_message = grpc_rb_s_to_byte_buffer(
RSTRING_PTR(this_value), RSTRING_LEN(this_value));
st->ops[st->op_num].data.send_message.send_message =
grpc_rb_s_to_byte_buffer(RSTRING_PTR(this_value),
RSTRING_LEN(this_value));
st->ops[st->op_num].flags = st->write_flag;
break;
case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
@ -686,10 +687,11 @@ static void grpc_run_batch_stack_fill_ops(run_batch_stack *st, VALUE ops_hash) {
&st->ops[st->op_num], &st->send_trailing_metadata, this_value);
break;
case GRPC_OP_RECV_INITIAL_METADATA:
st->ops[st->op_num].data.recv_initial_metadata = &st->recv_metadata;
st->ops[st->op_num].data.recv_initial_metadata.recv_initial_metadata =
&st->recv_metadata;
break;
case GRPC_OP_RECV_MESSAGE:
st->ops[st->op_num].data.recv_message = &st->recv_message;
st->ops[st->op_num].data.recv_message.recv_message = &st->recv_message;
break;
case GRPC_OP_RECV_STATUS_ON_CLIENT:
st->ops[st->op_num].data.recv_status_on_client.trailing_metadata =

Loading…
Cancel
Save