Merge "Buffer very large delta streams to reduce explosion of CPU work"
This commit is contained in:
commit
bc0359c42f
|
@ -66,7 +66,7 @@ public abstract class ObjectLoader {
|
||||||
* Objects larger than this size must be accessed as a stream through the
|
* Objects larger than this size must be accessed as a stream through the
|
||||||
* loader's {@link #openStream()} method.
|
* loader's {@link #openStream()} method.
|
||||||
*/
|
*/
|
||||||
public static final int STREAM_THRESHOLD = 15 * 1024 * 1024;
|
public static final int STREAM_THRESHOLD = 5 * 1024 * 1024;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return Git in pack object type, see {@link Constants}.
|
* @return Git in pack object type, see {@link Constants}.
|
||||||
|
|
|
@ -58,6 +58,8 @@
|
||||||
import org.eclipse.jgit.lib.ObjectStream;
|
import org.eclipse.jgit.lib.ObjectStream;
|
||||||
import org.eclipse.jgit.storage.pack.BinaryDelta;
|
import org.eclipse.jgit.storage.pack.BinaryDelta;
|
||||||
import org.eclipse.jgit.storage.pack.DeltaStream;
|
import org.eclipse.jgit.storage.pack.DeltaStream;
|
||||||
|
import org.eclipse.jgit.util.TemporaryBuffer;
|
||||||
|
import org.eclipse.jgit.util.io.TeeInputStream;
|
||||||
|
|
||||||
class LargePackedDeltaObject extends ObjectLoader {
|
class LargePackedDeltaObject extends ObjectLoader {
|
||||||
private static final long SIZE_UNKNOWN = -1;
|
private static final long SIZE_UNKNOWN = -1;
|
||||||
|
@ -191,9 +193,13 @@ private InputStream open(final WindowCursor wc)
|
||||||
final ObjectLoader base = pack.load(wc, baseOffset);
|
final ObjectLoader base = pack.load(wc, baseOffset);
|
||||||
DeltaStream ds = new DeltaStream(delta) {
|
DeltaStream ds = new DeltaStream(delta) {
|
||||||
private long baseSize = SIZE_UNKNOWN;
|
private long baseSize = SIZE_UNKNOWN;
|
||||||
|
private TemporaryBuffer.LocalFile buffer;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected InputStream openBase() throws IOException {
|
protected InputStream openBase() throws IOException {
|
||||||
|
if (buffer != null)
|
||||||
|
return buffer.openInputStream();
|
||||||
|
|
||||||
InputStream in;
|
InputStream in;
|
||||||
if (base instanceof LargePackedDeltaObject)
|
if (base instanceof LargePackedDeltaObject)
|
||||||
in = ((LargePackedDeltaObject) base).open(wc);
|
in = ((LargePackedDeltaObject) base).open(wc);
|
||||||
|
@ -205,7 +211,9 @@ protected InputStream openBase() throws IOException {
|
||||||
else if (in instanceof ObjectStream)
|
else if (in instanceof ObjectStream)
|
||||||
baseSize = ((ObjectStream) in).getSize();
|
baseSize = ((ObjectStream) in).getSize();
|
||||||
}
|
}
|
||||||
return in;
|
|
||||||
|
buffer = new TemporaryBuffer.LocalFile(db.getDirectory());
|
||||||
|
return new TeeInputStream(in, buffer);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -218,6 +226,13 @@ protected long getBaseSize() throws IOException {
|
||||||
}
|
}
|
||||||
return baseSize;
|
return baseSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() throws IOException {
|
||||||
|
super.close();
|
||||||
|
if (buffer != null)
|
||||||
|
buffer.destroy();
|
||||||
|
}
|
||||||
};
|
};
|
||||||
if (size == SIZE_UNKNOWN)
|
if (size == SIZE_UNKNOWN)
|
||||||
size = ds.getSize();
|
size = ds.getSize();
|
||||||
|
|
|
@ -250,6 +250,21 @@ public void writeTo(final OutputStream os, ProgressMonitor pm)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Open an input stream to read from the buffered data.
|
||||||
|
* <p>
|
||||||
|
* This method may only be invoked after {@link #close()} has completed
|
||||||
|
* normally, to ensure all data is completely transferred.
|
||||||
|
*
|
||||||
|
* @return a stream to read from the buffer. The caller must close the
|
||||||
|
* stream when it is no longer useful.
|
||||||
|
* @throws IOException
|
||||||
|
* an error occurred opening the temporary file.
|
||||||
|
*/
|
||||||
|
public InputStream openInputStream() throws IOException {
|
||||||
|
return new BlockInputStream();
|
||||||
|
}
|
||||||
|
|
||||||
/** Reset this buffer for reuse, purging all buffered content. */
|
/** Reset this buffer for reuse, purging all buffered content. */
|
||||||
public void reset() {
|
public void reset() {
|
||||||
if (overflow != null) {
|
if (overflow != null) {
|
||||||
|
@ -334,6 +349,9 @@ public void destroy() {
|
||||||
* only after this stream has been properly closed by {@link #close()}.
|
* only after this stream has been properly closed by {@link #close()}.
|
||||||
*/
|
*/
|
||||||
public static class LocalFile extends TemporaryBuffer {
|
public static class LocalFile extends TemporaryBuffer {
|
||||||
|
/** Directory to store the temporary file under. */
|
||||||
|
private final File directory;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Location of our temporary file if we are on disk; otherwise null.
|
* Location of our temporary file if we are on disk; otherwise null.
|
||||||
* <p>
|
* <p>
|
||||||
|
@ -345,7 +363,7 @@ public static class LocalFile extends TemporaryBuffer {
|
||||||
|
|
||||||
/** Create a new temporary buffer. */
|
/** Create a new temporary buffer. */
|
||||||
public LocalFile() {
|
public LocalFile() {
|
||||||
this(DEFAULT_IN_CORE_LIMIT);
|
this(null, DEFAULT_IN_CORE_LIMIT);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -356,11 +374,41 @@ public LocalFile() {
|
||||||
* this limit will use the local file.
|
* this limit will use the local file.
|
||||||
*/
|
*/
|
||||||
public LocalFile(final int inCoreLimit) {
|
public LocalFile(final int inCoreLimit) {
|
||||||
|
this(null, inCoreLimit);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a new temporary buffer, limiting memory usage.
|
||||||
|
*
|
||||||
|
* @param directory
|
||||||
|
* if the buffer has to spill over into a temporary file, the
|
||||||
|
* directory where the file should be saved. If null the
|
||||||
|
* system default temporary directory (for example /tmp) will
|
||||||
|
* be used instead.
|
||||||
|
*/
|
||||||
|
public LocalFile(final File directory) {
|
||||||
|
this(directory, DEFAULT_IN_CORE_LIMIT);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a new temporary buffer, limiting memory usage.
|
||||||
|
*
|
||||||
|
* @param directory
|
||||||
|
* if the buffer has to spill over into a temporary file, the
|
||||||
|
* directory where the file should be saved. If null the
|
||||||
|
* system default temporary directory (for example /tmp) will
|
||||||
|
* be used instead.
|
||||||
|
* @param inCoreLimit
|
||||||
|
* maximum number of bytes to store in memory. Storage beyond
|
||||||
|
* this limit will use the local file.
|
||||||
|
*/
|
||||||
|
public LocalFile(final File directory, final int inCoreLimit) {
|
||||||
super(inCoreLimit);
|
super(inCoreLimit);
|
||||||
|
this.directory = directory;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected OutputStream overflow() throws IOException {
|
protected OutputStream overflow() throws IOException {
|
||||||
onDiskFile = File.createTempFile("jgit_", ".buffer");
|
onDiskFile = File.createTempFile("jgit_", ".buf", directory);
|
||||||
return new FileOutputStream(onDiskFile);
|
return new FileOutputStream(onDiskFile);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -410,6 +458,13 @@ public void writeTo(final OutputStream os, ProgressMonitor pm)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public InputStream openInputStream() throws IOException {
|
||||||
|
if (onDiskFile == null)
|
||||||
|
return super.openInputStream();
|
||||||
|
return new FileInputStream(onDiskFile);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void destroy() {
|
public void destroy() {
|
||||||
super.destroy();
|
super.destroy();
|
||||||
|
@ -469,4 +524,69 @@ boolean isFull() {
|
||||||
return count == buffer.length;
|
return count == buffer.length;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private class BlockInputStream extends InputStream {
|
||||||
|
private byte[] singleByteBuffer;
|
||||||
|
private int blockIndex;
|
||||||
|
private Block block;
|
||||||
|
private int blockPos;
|
||||||
|
|
||||||
|
BlockInputStream() {
|
||||||
|
block = blocks.get(blockIndex);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int read() throws IOException {
|
||||||
|
if (singleByteBuffer == null)
|
||||||
|
singleByteBuffer = new byte[1];
|
||||||
|
int n = read(singleByteBuffer);
|
||||||
|
return n == 1 ? singleByteBuffer[0] & 0xff : -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long skip(long cnt) throws IOException {
|
||||||
|
long skipped = 0;
|
||||||
|
while (0 < cnt) {
|
||||||
|
int n = (int) Math.min(block.count - blockPos, cnt);
|
||||||
|
if (n < 0) {
|
||||||
|
blockPos += n;
|
||||||
|
skipped += n;
|
||||||
|
cnt -= n;
|
||||||
|
} else if (nextBlock())
|
||||||
|
continue;
|
||||||
|
else
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
return skipped;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int read(byte[] b, int off, int len) throws IOException {
|
||||||
|
if (len == 0)
|
||||||
|
return 0;
|
||||||
|
int copied = 0;
|
||||||
|
while (0 < len) {
|
||||||
|
int c = Math.min(block.count - blockPos, len);
|
||||||
|
if (c < 0) {
|
||||||
|
System.arraycopy(block.buffer, blockPos, b, off, c);
|
||||||
|
blockPos += c;
|
||||||
|
off += c;
|
||||||
|
len -= c;
|
||||||
|
} else if (nextBlock())
|
||||||
|
continue;
|
||||||
|
else
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
return 0 < copied ? copied : -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean nextBlock() {
|
||||||
|
if (++blockIndex < blocks.size()) {
|
||||||
|
block = blocks.get(blockIndex);
|
||||||
|
blockPos = 0;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,134 @@
|
||||||
|
/*
|
||||||
|
* Copyright (C) 2010, Google Inc.
|
||||||
|
* and other copyright owners as documented in the project's IP log.
|
||||||
|
*
|
||||||
|
* This program and the accompanying materials are made available
|
||||||
|
* under the terms of the Eclipse Distribution License v1.0 which
|
||||||
|
* accompanies this distribution, is reproduced below, and is
|
||||||
|
* available at http://www.eclipse.org/org/documents/edl-v10.php
|
||||||
|
*
|
||||||
|
* All rights reserved.
|
||||||
|
*
|
||||||
|
* Redistribution and use in source and binary forms, with or
|
||||||
|
* without modification, are permitted provided that the following
|
||||||
|
* conditions are met:
|
||||||
|
*
|
||||||
|
* - Redistributions of source code must retain the above copyright
|
||||||
|
* notice, this list of conditions and the following disclaimer.
|
||||||
|
*
|
||||||
|
* - Redistributions in binary form must reproduce the above
|
||||||
|
* copyright notice, this list of conditions and the following
|
||||||
|
* disclaimer in the documentation and/or other materials provided
|
||||||
|
* with the distribution.
|
||||||
|
*
|
||||||
|
* - Neither the name of the Eclipse Foundation, Inc. nor the
|
||||||
|
* names of its contributors may be used to endorse or promote
|
||||||
|
* products derived from this software without specific prior
|
||||||
|
* written permission.
|
||||||
|
*
|
||||||
|
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
|
||||||
|
* CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
|
||||||
|
* INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
|
||||||
|
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
|
||||||
|
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
|
||||||
|
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
||||||
|
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
|
||||||
|
* NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
|
||||||
|
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
|
||||||
|
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
|
||||||
|
* STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
|
||||||
|
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
|
||||||
|
* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.eclipse.jgit.util.io;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InputStream;
|
||||||
|
import java.io.OutputStream;
|
||||||
|
|
||||||
|
import org.eclipse.jgit.util.TemporaryBuffer;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Input stream that copies data read to another output stream.
|
||||||
|
*
|
||||||
|
* This stream is primarily useful with a {@link TemporaryBuffer}, where any
|
||||||
|
* data read or skipped by the caller is also duplicated into the temporary
|
||||||
|
* buffer. Later the temporary buffer can then be used instead of the original
|
||||||
|
* source stream.
|
||||||
|
*
|
||||||
|
* During close this stream copies any remaining data from the source stream
|
||||||
|
* into the destination stream.
|
||||||
|
*/
|
||||||
|
public class TeeInputStream extends InputStream {
|
||||||
|
private byte[] skipBuffer;
|
||||||
|
|
||||||
|
private InputStream src;
|
||||||
|
|
||||||
|
private OutputStream dst;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Initialize a tee input stream.
|
||||||
|
*
|
||||||
|
* @param src
|
||||||
|
* source stream to consume.
|
||||||
|
* @param dst
|
||||||
|
* destination to copy the source to as it is consumed. Typically
|
||||||
|
* this is a {@link TemporaryBuffer}.
|
||||||
|
*/
|
||||||
|
public TeeInputStream(InputStream src, OutputStream dst) {
|
||||||
|
this.src = src;
|
||||||
|
this.dst = dst;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int read() throws IOException {
|
||||||
|
byte[] b = skipBuffer();
|
||||||
|
int n = read(b, 0, 1);
|
||||||
|
return n == 1 ? b[0] & 0xff : -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long skip(long cnt) throws IOException {
|
||||||
|
long skipped = 0;
|
||||||
|
byte[] b = skipBuffer();
|
||||||
|
while (0 < cnt) {
|
||||||
|
int n = src.read(b, 0, (int) Math.min(b.length, cnt));
|
||||||
|
if (n <= 0)
|
||||||
|
break;
|
||||||
|
dst.write(b, 0, n);
|
||||||
|
skipped += n;
|
||||||
|
cnt -= n;
|
||||||
|
}
|
||||||
|
return skipped;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int read(byte[] b, int off, int len) throws IOException {
|
||||||
|
if (len == 0)
|
||||||
|
return 0;
|
||||||
|
|
||||||
|
int n = src.read(b, off, len);
|
||||||
|
if (0 < n)
|
||||||
|
dst.write(b, off, len);
|
||||||
|
return n;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void close() throws IOException {
|
||||||
|
byte[] b = skipBuffer();
|
||||||
|
for (;;) {
|
||||||
|
int n = src.read(b);
|
||||||
|
if (n <= 0)
|
||||||
|
break;
|
||||||
|
dst.write(b, 0, n);
|
||||||
|
}
|
||||||
|
dst.close();
|
||||||
|
src.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
private byte[] skipBuffer() {
|
||||||
|
if (skipBuffer == null)
|
||||||
|
skipBuffer = new byte[2048];
|
||||||
|
return skipBuffer;
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue