From a847a8f8bc3f2fdc542d11671a4d34997f2b0538 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 11 Feb 2015 21:20:25 -0800 Subject: [PATCH] Finish streaming, lame client --- include/grpc++/stream.h | 8 +++++++- src/core/surface/lame_client.c | 6 ++++++ 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/include/grpc++/stream.h b/include/grpc++/stream.h index 4a804c7c902..a59ccd8c051 100644 --- a/include/grpc++/stream.h +++ b/include/grpc++/stream.h @@ -91,6 +91,7 @@ class ClientReader final : public ClientStreamingInterface, const google::protobuf::Message &request) : context_(context), call_(channel->CreateCall(method, context, &cq_)) { CallOpBuffer buf; + buf.AddSendInitialMetadata(&context->send_initial_metadata_); buf.AddSendMessage(request); buf.AddClientSendClose(); call_.PerformOps(&buf); @@ -178,7 +179,12 @@ class ClientReaderWriter final : public ClientStreamingInterface, // Blocking create a stream. ClientReaderWriter(ChannelInterface *channel, const RpcMethod &method, ClientContext *context) - : context_(context), call_(channel->CreateCall(method, context, &cq_)) {} + : context_(context), call_(channel->CreateCall(method, context, &cq_)) { + CallOpBuffer buf; + buf.AddSendInitialMetadata(&context->send_initial_metadata_); + call_.PerformOps(&buf); + GPR_ASSERT(cq_.Pluck(&buf)); + } virtual bool Read(R *msg) override { CallOpBuffer buf; diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c index 411dbabfd32..a8fdeed87f9 100644 --- a/src/core/surface/lame_client.c +++ b/src/core/surface/lame_client.c @@ -47,6 +47,7 @@ typedef struct { } call_data; typedef struct { + grpc_mdelem *status; grpc_mdelem *message; } channel_data; @@ -57,6 +58,7 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, switch (op->type) { case GRPC_SEND_START: + grpc_call_recv_metadata(elem, grpc_mdelem_ref(channeld->status)); grpc_call_recv_metadata(elem, grpc_mdelem_ref(channeld->message)); grpc_call_stream_closed(elem); break; @@ -93,18 +95,22 @@ static void init_channel_elem(grpc_channel_element *elem, const grpc_channel_args *args, grpc_mdctx *mdctx, int is_first, int is_last) { channel_data *channeld = elem->channel_data; + char status[12]; GPR_ASSERT(is_first); GPR_ASSERT(is_last); channeld->message = grpc_mdelem_from_strings(mdctx, "grpc-message", "Rpc sent on a lame channel."); + gpr_ltoa(GRPC_STATUS_UNKNOWN, status); + channeld->status = grpc_mdelem_from_strings(mdctx, "grpc-status", status); } static void destroy_channel_elem(grpc_channel_element *elem) { channel_data *channeld = elem->channel_data; grpc_mdelem_unref(channeld->message); + grpc_mdelem_unref(channeld->status); } static const grpc_channel_filter lame_filter = {