From b24f907e3ef647700baeb03f4db55eed125e1677 Mon Sep 17 00:00:00 2001 From: "Shawn O. Pearce" Date: Fri, 27 Aug 2010 13:28:14 -0700 Subject: [PATCH] Buffer very large delta streams to reduce explosion of CPU work Large delta streams are unpacked incrementally, but because a delta can seek to a random position in the base to perform a copy we may need to inflate the base repeatedly just to complete one delta. So work around it by copying the base to a temporary file, and then we can read from that temporary file using random seeks instead. Its far more efficient because we now only need to inflate the base once. This is still really ugly because we have to dump to a temporary file, but at least the code can successfully process a large file without throwing OutOfMemoryError. If speed is an issue, the user will need to increase the JVM heap and ensure core.streamFileThreshold is set to a higher value, so we don't use this code path as often. Unfortunately we lose the "optimization" of skipping over portions of a delta base that we don't actually need in the final result. This is going to cause us to inflate and write to disk useless regions that were deleted and do not appear in the final result. We could later improve on our code by trying to flatten delta instruction streams before we touch the bottom base object, and then only store the portions of the base we really need for the final result and that appear out-of-order. Since that is some pretty complex code I'm punting on it for now and just doing this simple whole-object buffering. Because the process umask might be permitting other users to read files we create, we put the temporary buffers into $GIT_DIR/objects. We can reasonably assume that if a reader can read our temporary buffer file in that directory, they can also read the base pack file we are pulling it from and therefore its not a security breach to expose the inflated content in a file. This requires a reader to have write access to the repository, but only if the file is really big. I'd rather err on the side of caution here and refuse to read a very big file into /tmp than to possibly expose a secured content because the Java 5 JVM won't let us create a protected temporary file that only the current user can access. Change-Id: I66fb80b08cbcaf0f65f2db0462c546a495a160dd Signed-off-by: Shawn O. Pearce --- .../org/eclipse/jgit/lib/ObjectLoader.java | 2 +- .../storage/file/LargePackedDeltaObject.java | 17 ++- .../eclipse/jgit/util/TemporaryBuffer.java | 124 +++++++++++++++- .../eclipse/jgit/util/io/TeeInputStream.java | 134 ++++++++++++++++++ 4 files changed, 273 insertions(+), 4 deletions(-) create mode 100644 org.eclipse.jgit/src/org/eclipse/jgit/util/io/TeeInputStream.java diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/lib/ObjectLoader.java b/org.eclipse.jgit/src/org/eclipse/jgit/lib/ObjectLoader.java index f638cc794..b7e58ea15 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/lib/ObjectLoader.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/lib/ObjectLoader.java @@ -66,7 +66,7 @@ public abstract class ObjectLoader { * Objects larger than this size must be accessed as a stream through the * loader's {@link #openStream()} method. */ - public static final int STREAM_THRESHOLD = 15 * 1024 * 1024; + public static final int STREAM_THRESHOLD = 5 * 1024 * 1024; /** * @return Git in pack object type, see {@link Constants}. diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/storage/file/LargePackedDeltaObject.java b/org.eclipse.jgit/src/org/eclipse/jgit/storage/file/LargePackedDeltaObject.java index 53a0e617f..02e218216 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/storage/file/LargePackedDeltaObject.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/storage/file/LargePackedDeltaObject.java @@ -58,6 +58,8 @@ import org.eclipse.jgit.lib.ObjectStream; import org.eclipse.jgit.storage.pack.BinaryDelta; import org.eclipse.jgit.storage.pack.DeltaStream; +import org.eclipse.jgit.util.TemporaryBuffer; +import org.eclipse.jgit.util.io.TeeInputStream; class LargePackedDeltaObject extends ObjectLoader { private static final long SIZE_UNKNOWN = -1; @@ -191,9 +193,13 @@ private InputStream open(final WindowCursor wc) final ObjectLoader base = pack.load(wc, baseOffset); DeltaStream ds = new DeltaStream(delta) { private long baseSize = SIZE_UNKNOWN; + private TemporaryBuffer.LocalFile buffer; @Override protected InputStream openBase() throws IOException { + if (buffer != null) + return buffer.openInputStream(); + InputStream in; if (base instanceof LargePackedDeltaObject) in = ((LargePackedDeltaObject) base).open(wc); @@ -205,7 +211,9 @@ protected InputStream openBase() throws IOException { else if (in instanceof ObjectStream) baseSize = ((ObjectStream) in).getSize(); } - return in; + + buffer = new TemporaryBuffer.LocalFile(db.getDirectory()); + return new TeeInputStream(in, buffer); } @Override @@ -218,6 +226,13 @@ protected long getBaseSize() throws IOException { } return baseSize; } + + @Override + public void close() throws IOException { + super.close(); + if (buffer != null) + buffer.destroy(); + } }; if (size == SIZE_UNKNOWN) size = ds.getSize(); diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/util/TemporaryBuffer.java b/org.eclipse.jgit/src/org/eclipse/jgit/util/TemporaryBuffer.java index baa45c5c6..58ecaa800 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/util/TemporaryBuffer.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/util/TemporaryBuffer.java @@ -250,6 +250,21 @@ public void writeTo(final OutputStream os, ProgressMonitor pm) } } + /** + * Open an input stream to read from the buffered data. + *

+ * This method may only be invoked after {@link #close()} has completed + * normally, to ensure all data is completely transferred. + * + * @return a stream to read from the buffer. The caller must close the + * stream when it is no longer useful. + * @throws IOException + * an error occurred opening the temporary file. + */ + public InputStream openInputStream() throws IOException { + return new BlockInputStream(); + } + /** Reset this buffer for reuse, purging all buffered content. */ public void reset() { if (overflow != null) { @@ -334,6 +349,9 @@ public void destroy() { * only after this stream has been properly closed by {@link #close()}. */ public static class LocalFile extends TemporaryBuffer { + /** Directory to store the temporary file under. */ + private final File directory; + /** * Location of our temporary file if we are on disk; otherwise null. *

@@ -345,7 +363,7 @@ public static class LocalFile extends TemporaryBuffer { /** Create a new temporary buffer. */ public LocalFile() { - this(DEFAULT_IN_CORE_LIMIT); + this(null, DEFAULT_IN_CORE_LIMIT); } /** @@ -356,11 +374,41 @@ public LocalFile() { * this limit will use the local file. */ public LocalFile(final int inCoreLimit) { + this(null, inCoreLimit); + } + + /** + * Create a new temporary buffer, limiting memory usage. + * + * @param directory + * if the buffer has to spill over into a temporary file, the + * directory where the file should be saved. If null the + * system default temporary directory (for example /tmp) will + * be used instead. + */ + public LocalFile(final File directory) { + this(directory, DEFAULT_IN_CORE_LIMIT); + } + + /** + * Create a new temporary buffer, limiting memory usage. + * + * @param directory + * if the buffer has to spill over into a temporary file, the + * directory where the file should be saved. If null the + * system default temporary directory (for example /tmp) will + * be used instead. + * @param inCoreLimit + * maximum number of bytes to store in memory. Storage beyond + * this limit will use the local file. + */ + public LocalFile(final File directory, final int inCoreLimit) { super(inCoreLimit); + this.directory = directory; } protected OutputStream overflow() throws IOException { - onDiskFile = File.createTempFile("jgit_", ".buffer"); + onDiskFile = File.createTempFile("jgit_", ".buf", directory); return new FileOutputStream(onDiskFile); } @@ -410,6 +458,13 @@ public void writeTo(final OutputStream os, ProgressMonitor pm) } } + @Override + public InputStream openInputStream() throws IOException { + if (onDiskFile == null) + return super.openInputStream(); + return new FileInputStream(onDiskFile); + } + @Override public void destroy() { super.destroy(); @@ -469,4 +524,69 @@ boolean isFull() { return count == buffer.length; } } + + private class BlockInputStream extends InputStream { + private byte[] singleByteBuffer; + private int blockIndex; + private Block block; + private int blockPos; + + BlockInputStream() { + block = blocks.get(blockIndex); + } + + @Override + public int read() throws IOException { + if (singleByteBuffer == null) + singleByteBuffer = new byte[1]; + int n = read(singleByteBuffer); + return n == 1 ? singleByteBuffer[0] & 0xff : -1; + } + + @Override + public long skip(long cnt) throws IOException { + long skipped = 0; + while (0 < cnt) { + int n = (int) Math.min(block.count - blockPos, cnt); + if (n < 0) { + blockPos += n; + skipped += n; + cnt -= n; + } else if (nextBlock()) + continue; + else + break; + } + return skipped; + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + if (len == 0) + return 0; + int copied = 0; + while (0 < len) { + int c = Math.min(block.count - blockPos, len); + if (c < 0) { + System.arraycopy(block.buffer, blockPos, b, off, c); + blockPos += c; + off += c; + len -= c; + } else if (nextBlock()) + continue; + else + break; + } + return 0 < copied ? copied : -1; + } + + private boolean nextBlock() { + if (++blockIndex < blocks.size()) { + block = blocks.get(blockIndex); + blockPos = 0; + return true; + } + return false; + } + } } diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/util/io/TeeInputStream.java b/org.eclipse.jgit/src/org/eclipse/jgit/util/io/TeeInputStream.java new file mode 100644 index 000000000..9d036595f --- /dev/null +++ b/org.eclipse.jgit/src/org/eclipse/jgit/util/io/TeeInputStream.java @@ -0,0 +1,134 @@ +/* + * Copyright (C) 2010, Google Inc. + * and other copyright owners as documented in the project's IP log. + * + * This program and the accompanying materials are made available + * under the terms of the Eclipse Distribution License v1.0 which + * accompanies this distribution, is reproduced below, and is + * available at http://www.eclipse.org/org/documents/edl-v10.php + * + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * - Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * - Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials provided + * with the distribution. + * + * - Neither the name of the Eclipse Foundation, Inc. nor the + * names of its contributors may be used to endorse or promote + * products derived from this software without specific prior + * written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND + * CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, + * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES + * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT + * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER + * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, + * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package org.eclipse.jgit.util.io; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import org.eclipse.jgit.util.TemporaryBuffer; + +/** + * Input stream that copies data read to another output stream. + * + * This stream is primarily useful with a {@link TemporaryBuffer}, where any + * data read or skipped by the caller is also duplicated into the temporary + * buffer. Later the temporary buffer can then be used instead of the original + * source stream. + * + * During close this stream copies any remaining data from the source stream + * into the destination stream. + */ +public class TeeInputStream extends InputStream { + private byte[] skipBuffer; + + private InputStream src; + + private OutputStream dst; + + /** + * Initialize a tee input stream. + * + * @param src + * source stream to consume. + * @param dst + * destination to copy the source to as it is consumed. Typically + * this is a {@link TemporaryBuffer}. + */ + public TeeInputStream(InputStream src, OutputStream dst) { + this.src = src; + this.dst = dst; + } + + @Override + public int read() throws IOException { + byte[] b = skipBuffer(); + int n = read(b, 0, 1); + return n == 1 ? b[0] & 0xff : -1; + } + + @Override + public long skip(long cnt) throws IOException { + long skipped = 0; + byte[] b = skipBuffer(); + while (0 < cnt) { + int n = src.read(b, 0, (int) Math.min(b.length, cnt)); + if (n <= 0) + break; + dst.write(b, 0, n); + skipped += n; + cnt -= n; + } + return skipped; + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + if (len == 0) + return 0; + + int n = src.read(b, off, len); + if (0 < n) + dst.write(b, off, len); + return n; + } + + public void close() throws IOException { + byte[] b = skipBuffer(); + for (;;) { + int n = src.read(b); + if (n <= 0) + break; + dst.write(b, 0, n); + } + dst.close(); + src.close(); + } + + private byte[] skipBuffer() { + if (skipBuffer == null) + skipBuffer = new byte[2048]; + return skipBuffer; + } +}