diff --git a/src/com/google/common/io/protocol/BoundInputStream.java b/src/com/google/common/io/protocol/BoundInputStream.java new file mode 100644 index 0000000000..dbccca7531 --- /dev/null +++ b/src/com/google/common/io/protocol/BoundInputStream.java @@ -0,0 +1,91 @@ +// Copyright 2008 Google Inc. All Rights Reserved. + +package com.google.common.io.protocol; + +import java.io.*; + +/** + * An input stream backed by another input stream, where reading from the + * underlying input stream is limited to a fixed number of bytes. Also does + * some buffering. + * + */ +public class BoundInputStream extends InputStream { + + /** Buffer size */ + static final int BUF_SIZE = 4096; + + /** Number of bytes that may still be read from the underlying stream */ + private int remaining; + + /** Small buffer to avoid making OS calls for each byte read. */ + private byte[] buf; + + /** Current position in the buffer */ + private int bufPos; + + /** Filled size of the buffer */ + private int bufSize; + + /** Underlying stream to read from */ + private InputStream base; + + public BoundInputStream(InputStream base, int len) { + this.base = base; + this.remaining = len; + + buf = new byte[Math.min(len, BUF_SIZE)]; + } + + /** + * Make sure there is at least one byte in the buffer. If not possible, + * return false. + */ + private boolean checkBuf() throws IOException { + if (remaining <= 0) { + return false; + } + + if (bufPos >= bufSize) { + bufSize = base.read(buf, 0, Math.min(remaining, buf.length)); + if (bufSize <= 0) { + remaining = 0; + return false; + } + bufPos = 0; + } + return true; + } + + public int available() { + return bufSize - bufPos; + } + + public int read() throws IOException { + if (!checkBuf()) { + return -1; + } + remaining--; + return buf[bufPos++] & 255; + } + + public int read(byte[] data, int start, int count) throws IOException { + if (!checkBuf()) { + return -1; + } + count = Math.min(count, bufSize - bufPos); + System.arraycopy(buf, bufPos, data, start, count); + bufPos += count; + remaining -= count; + return count; + } + + /** + * How many bytes are remaining, based on the length provided to the + * constructor. The underlying stream may terminate earlier. Provided mainly + * for testing purposes. + */ + public int getRemaining() { + return remaining; + } +} diff --git a/src/com/google/common/io/protocol/ProtoBufUtil.java b/src/com/google/common/io/protocol/ProtoBufUtil.java index 72e1bca9ed..a14bef6e98 100644 --- a/src/com/google/common/io/protocol/ProtoBufUtil.java +++ b/src/com/google/common/io/protocol/ProtoBufUtil.java @@ -40,7 +40,7 @@ public final class ProtoBufUtil { /** * Get an int with "tag" from the proto buffer. If the given field can't be * retrieved, return the provided default value. - * + * * @param proto The proto buffer. * @param tag The tag value that identifies which protocol buffer field to * retrieve. @@ -110,14 +110,40 @@ public final class ProtoBufUtil { } } + /** + * Returns an input stream for reading protocol buffer + * responses. This method reads a 32-bit signed integer from the + * stream, which determines the data size and compression. If the + * integer is negative, indicating a GZipped input stream which we + * do not support, an exception is thrown. Otherwise, just a + * BoundInputStream is returned. The input stream returned is always + * limited to the data available. + * + * @param dataInput the data input to read from + * @return an input stream, limited to the data size read from the stream + * @throws IOException if the incoming stream is gzipped. + */ + public static InputStream getInputStreamForProtoBufResponse( + DataInput dataInput) throws IOException { + + int size = dataInput.readInt(); + InputStream is = new BoundInputStream((InputStream) dataInput, + Math.abs(size)); + + if (size < 0) { + throw new IOException("Cannot read gzipped streams"); + } + return is; + } + /** * Reads a single protocol buffer from the given input stream. This method is - * provided where the client needs incremental access to the contents of a - * protocol buffer which contains a sequence of protocol buffers. + * provided where the client needs incremental access to the contents of a + * protocol buffer which contains a sequence of protocol buffers. *

- * Please use {@link #getInputStreamForProtoBufResponse} to obtain an input + * Please use {@link #getInputStreamForProtoBufResponse} to obtain an input * stream suitable for this method. - * + * * @param umbrellaType the type of the "outer" protocol buffer containing * the message to read * @param is the stream to read the protocol buffer from @@ -125,24 +151,46 @@ public final class ProtoBufUtil { * with the data read and the type will be set) * @return the tag id of the message, -1 at the end of the stream */ - public static int readNextProtoBuf(ProtoBufType umbrellaType, + public static int readNextProtoBuf(ProtoBufType umbrellaType, InputStream is, ProtoBuf result) throws IOException { long tagAndType = ProtoBuf.readVarInt(is, true /* permits EOF */); if (tagAndType == -1) { return -1; } - + if ((tagAndType & 7) != ProtoBuf.WIRETYPE_LENGTH_DELIMITED) { throw new IOException("Message expected"); } int tag = (int) (tagAndType >>> 3); - + result.setType((ProtoBufType) umbrellaType.getData(tag)); int length = (int) ProtoBuf.readVarInt(is, false); result.parse(is, length); return tag; } - + + /** + * Reads a size int and a protocol buffer from a DataInput. If the size + * is negative, this is interpreted as an indicator that the protocol buffer + * is packed with GZIP. In this case, -size bytes are read, and the data is + * unpacked with GZIP before constructing the protocol buffer. + * + * @param protoBufType the protocol buffer type to read + * @param dataInput the data input to read from + * @return a protocol buffer of the given type + * @throws IOException + */ + public static ProtoBuf readProtoBufResponse(ProtoBufType protoBufType, + DataInput dataInput) throws IOException { + ProtoBuf response = new ProtoBuf(protoBufType); + InputStream is = getInputStreamForProtoBufResponse(dataInput); + response.parse(is); + if (is.read() != -1) { + throw new IOException(); + } + return response; + } + /** * A wrapper for getProtoValueOrNegativeOne that drills into * a sub message returning the long value if it exists, returning -1 if it @@ -162,7 +210,7 @@ public final class ProtoBufUtil { } catch (IllegalArgumentException e) { return -1; } catch (ClassCastException e) { - return -1; + return -1; } } @@ -182,12 +230,12 @@ public final class ProtoBufUtil { public static int getSubProtoValueOrDefault(ProtoBuf proto, int sub, int tag, int defaultValue) { try { - return getProtoValueOrDefault(getSubProtoOrNull(proto, sub), tag, + return getProtoValueOrDefault(getSubProtoOrNull(proto, sub), tag, defaultValue); } catch (IllegalArgumentException e) { return defaultValue; } catch (ClassCastException e) { - return defaultValue; + return defaultValue; } }