Avoid storing large packs in block cache during reuse

When a large pack (> 30% of the block cache) is being reused by
copying, it pollutes the block cache with blocks that are never
referenced again.

Avoid this by streaming the file directly from its channel onto
the output stream.

Change-Id: I2e53de27f3dcfb93de68b1fad45f75ab23e79fe7
Shawn Pearce 2015-04-23 12:12:43 -07:00
parent 12e38d7275
commit c761c8bb5c
6 changed files with 239 additions and 71 deletions
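For orientation, a minimal arithmetic sketch of the new decision (illustration only, not part of the commit; the class and variable names are descriptive): with the default 32 MB block limit and 0.30 stream ratio, any pack larger than ~9.6 MB is streamed instead of copied through the cache.

public class StreamRatioMath {
	public static void main(String[] args) {
		long maxBytes = 32 * 1024 * 1024;    // default block cache limit
		double streamRatio = 0.30;           // default stream ratio
		long maxStreamThroughCache = (long) (maxBytes * streamRatio);

		long packLength = 50L * 1024 * 1024; // a 50 MiB pack being reused
		// Mirrors the copyThroughCache(length) check added below: small
		// packs copy through the cache, larger ones bypass it.
		boolean throughCache = packLength <= maxStreamThroughCache;
		System.out.println(maxStreamThroughCache); // 10066329 (~9.6 MiB)
		System.out.println(throughCache);          // false -> stream directly
	}
}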

org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/dfs/DfsBlockCache.java

@@ -135,6 +135,9 @@ public static DfsBlockCache getInstance() {
/** Maximum number of bytes the cache should hold. */
private final long maxBytes;
/** Pack files smaller than this size can be copied through the cache. */
private final long maxStreamThroughCache;
/**
* Suggested block size to read from pack files in.
* <p>
@@ -191,6 +194,7 @@ else if (eb < 4)
eb = tableSize;
maxBytes = cfg.getBlockLimit();
maxStreamThroughCache = (long) (maxBytes * cfg.getStreamRatio());
blockSize = cfg.getBlockSize();
blockSizeShift = Integer.numberOfTrailingZeros(blockSize);
@@ -206,6 +210,10 @@ else if (eb < 4)
statMiss = new AtomicLong();
}
boolean copyThroughCache(long length) {
return length <= maxStreamThroughCache;
}
/** @return total number of bytes in the cache. */
public long getCurrentSize() {
return liveBytes;

org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/dfs/DfsBlockCacheConfig.java

@@ -47,7 +47,11 @@
import static org.eclipse.jgit.lib.ConfigConstants.CONFIG_DFS_SECTION;
import static org.eclipse.jgit.lib.ConfigConstants.CONFIG_KEY_BLOCK_LIMIT;
import static org.eclipse.jgit.lib.ConfigConstants.CONFIG_KEY_BLOCK_SIZE;
import static org.eclipse.jgit.lib.ConfigConstants.CONFIG_KEY_STREAM_RATIO;
import java.text.MessageFormat;
import org.eclipse.jgit.internal.JGitText;
import org.eclipse.jgit.lib.Config;
/** Configuration parameters for {@link DfsBlockCache}. */
@@ -59,13 +63,14 @@ public class DfsBlockCacheConfig {
public static final int MB = 1024 * KB;
private long blockLimit;
private int blockSize;
private double streamRatio;
/** Create a default configuration. */
public DfsBlockCacheConfig() {
setBlockLimit(32 * MB);
setBlockSize(64 * KB);
setStreamRatio(0.30);
}
/**
@@ -105,6 +110,27 @@ public DfsBlockCacheConfig setBlockSize(final int newSize) {
return this;
}
/**
* @return highest percentage of {@link #getBlockLimit()} a single pack can
* occupy while being copied by the pack reuse strategy. <b>Default
* is 0.30, or 30%</b>.
* @since 4.0
*/
public double getStreamRatio() {
return streamRatio;
}
/**
* @param ratio
* percentage of cache to occupy with a copied pack.
* @return {@code this}
* @since 4.0
*/
public DfsBlockCacheConfig setStreamRatio(double ratio) {
streamRatio = Math.max(0, Math.min(ratio, 1.0));
return this;
}
/**
* Update properties by setting fields from the configuration.
* <p>
@@ -127,6 +153,22 @@ public DfsBlockCacheConfig fromConfig(final Config rc) {
CONFIG_DFS_SECTION,
CONFIG_KEY_BLOCK_SIZE,
getBlockSize()));
String v = rc.getString(
CONFIG_CORE_SECTION,
CONFIG_DFS_SECTION,
CONFIG_KEY_STREAM_RATIO);
if (v != null) {
try {
setStreamRatio(Double.parseDouble(v));
} catch (NumberFormatException e) {
throw new IllegalArgumentException(MessageFormat.format(
JGitText.get().enumValueNotSupported3,
CONFIG_CORE_SECTION,
CONFIG_DFS_SECTION,
CONFIG_KEY_STREAM_RATIO, v));
}
}
return this;
}
}
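A usage sketch for the new key (assuming JGit's standard Config API; the class name and the 0.25 value here are illustrative): streamRatio is read from the core.dfs section, and the setter clamps out-of-range values to [0, 1].

import org.eclipse.jgit.errors.ConfigInvalidException;
import org.eclipse.jgit.internal.storage.dfs.DfsBlockCacheConfig;
import org.eclipse.jgit.lib.Config;

public class StreamRatioConfigExample {
	public static void main(String[] args) throws ConfigInvalidException {
		Config rc = new Config();
		// Equivalent to a repository config containing:
		//   [core "dfs"]
		//       streamRatio = 0.25
		rc.fromText("[core \"dfs\"]\n\tstreamRatio = 0.25\n");
		DfsBlockCacheConfig cfg = new DfsBlockCacheConfig().fromConfig(rc);
		System.out.println(cfg.getStreamRatio()); // 0.25

		cfg.setStreamRatio(1.5);                  // clamped by the setter
		System.out.println(cfg.getStreamRatio()); // 1.0
	}
}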

org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/dfs/DfsPackFile.java

@@ -54,8 +54,11 @@
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.security.MessageDigest;
import java.text.MessageFormat;
import java.util.Arrays;
import java.util.Set;
import java.util.zip.CRC32;
import java.util.zip.DataFormatException;
@@ -80,7 +83,6 @@
import org.eclipse.jgit.lib.ObjectId;
import org.eclipse.jgit.lib.ObjectLoader;
import org.eclipse.jgit.lib.Repository;
import org.eclipse.jgit.util.IO;
import org.eclipse.jgit.util.LongList;
/**
@@ -466,9 +468,112 @@ private byte[] decompress(long position, int sz, DfsReader ctx)
void copyPackAsIs(PackOutputStream out, boolean validate, DfsReader ctx)
throws IOException {
// Pin the first window, this ensures the length is accurate.
ctx.pin(this, 0);
ctx.copyPackAsIs(this, length, validate, out);
MessageDigest md = initCopyPack(out, validate, ctx);
long p;
if (cache.copyThroughCache(length))
p = copyPackThroughCache(out, ctx, md);
else
p = copyPackBypassCache(out, ctx, md);
verifyPackChecksum(p, md, ctx);
}
private MessageDigest initCopyPack(PackOutputStream out, boolean validate,
DfsReader ctx) throws IOException {
// If the length hasn't been determined yet, pin to set it.
if (length == -1)
ctx.pin(this, 0);
if (!validate) {
ctx.unpin();
return null;
}
int hdrlen = 12;
byte[] buf = out.getCopyBuffer();
int n = ctx.copy(this, 0, buf, 0, hdrlen);
ctx.unpin();
if (n != hdrlen)
throw packfileIsTruncated();
MessageDigest md = Constants.newMessageDigest();
md.update(buf, 0, hdrlen);
return md;
}
private long copyPackThroughCache(PackOutputStream out, DfsReader ctx,
MessageDigest md) throws IOException {
long position = 12;
long remaining = length - (12 + 20);
while (0 < remaining) {
DfsBlock b = cache.getOrLoad(this, position, ctx);
int ptr = (int) (position - b.start);
int n = (int) Math.min(b.size() - ptr, remaining);
b.write(out, position, n, md);
position += n;
remaining -= n;
}
return position;
}
private long copyPackBypassCache(PackOutputStream out, DfsReader ctx,
MessageDigest md) throws IOException {
try (ReadableChannel rc = ctx.db.openFile(packDesc, PACK)) {
ByteBuffer buf = newCopyBuffer(out, rc, ctx);
long position = 12;
long remaining = length - (12 + 20);
while (0 < remaining) {
DfsBlock b = cache.get(key, alignToBlock(position));
if (b != null) {
int ptr = (int) (position - b.start);
int n = (int) Math.min(b.size() - ptr, remaining);
b.write(out, position, n, md);
position += n;
remaining -= n;
rc.position(position);
continue;
}
buf.position(0);
int n = read(rc, buf);
if (n <= 0)
throw packfileIsTruncated();
else if (n > remaining)
n = (int) remaining;
out.write(buf.array(), 0, n);
if (md != null)
md.update(buf.array(), 0, n);
position += n;
remaining -= n;
}
return position;
}
}
private ByteBuffer newCopyBuffer(PackOutputStream out, ReadableChannel rc,
DfsReader ctx) {
int bs = blockSize(rc);
int bufferSize = ctx.getOptions().getStreamPackBufferSize();
if (bufferSize > bs)
bs = (bufferSize / bs) * bs;
byte[] copyBuf = out.getCopyBuffer();
if (bs > copyBuf.length)
copyBuf = new byte[bs];
return ByteBuffer.wrap(copyBuf, 0, bs);
}
private void verifyPackChecksum(long position, MessageDigest md,
DfsReader ctx) throws IOException {
if (md != null) {
byte[] buf = new byte[20];
byte[] actHash = md.digest();
if (ctx.copy(this, position, buf, 0, 20) != 20)
throw packfileIsTruncated();
if (!Arrays.equals(actHash, buf)) {
invalid = true;
throw new IOException(MessageFormat.format(
JGitText.get().packfileCorruptionDetected,
getPackName()));
}
}
}
void copyAsIs(PackOutputStream out, DfsObjectToPack src,
@@ -668,6 +773,12 @@ void setInvalid() {
invalid = true;
}
private IOException packfileIsTruncated() {
invalid = true;
return new IOException(MessageFormat.format(
JGitText.get().packfileIsTruncated, getPackName()));
}
private void readFully(long position, byte[] dstbuf, int dstoff, int cnt,
DfsReader ctx) throws IOException {
if (ctx.copy(this, position, dstbuf, dstoff, cnt) != cnt)
@@ -692,18 +803,8 @@ DfsBlock readOneBlock(long pos, DfsReader ctx)
ReadableChannel rc = ctx.db.openFile(packDesc, PACK);
try {
// If the block alignment is not yet known, discover it. Prefer the
// larger size from either the cache or the file itself.
int size = blockSize;
if (size == 0) {
size = rc.blockSize();
if (size <= 0)
size = cache.getBlockSize();
else if (size < cache.getBlockSize())
size = (cache.getBlockSize() / size) * size;
blockSize = size;
pos = (pos / size) * size;
}
int size = blockSize(rc);
pos = (pos / size) * size;
// If the size of the file is not yet known, try to discover it.
// Channels may choose to return -1 to indicate they don't
@@ -725,7 +826,7 @@ else if (size < cache.getBlockSize())
byte[] buf = new byte[size];
rc.position(pos);
int cnt = IO.read(rc, buf, 0, size);
int cnt = read(rc, ByteBuffer.wrap(buf, 0, size));
if (cnt != size) {
if (0 <= len) {
throw new EOFException(MessageFormat.format(
@@ -754,6 +855,30 @@ else if (size < cache.getBlockSize())
}
}
private int blockSize(ReadableChannel rc) {
// If the block alignment is not yet known, discover it. Prefer the
// larger size from either the cache or the file itself.
int size = blockSize;
if (size == 0) {
size = rc.blockSize();
if (size <= 0)
size = cache.getBlockSize();
else if (size < cache.getBlockSize())
size = (cache.getBlockSize() / size) * size;
blockSize = size;
}
return size;
}
private static int read(ReadableChannel rc, ByteBuffer buf)
throws IOException {
int n;
do {
n = rc.read(buf);
} while (0 < n && buf.hasRemaining());
return buf.position();
}
ObjectLoader load(DfsReader ctx, long pos)
throws IOException {
try {
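Two details of the bypass path above are easy to miss: blocks already resident in the cache are still served from it (the channel is repositioned past them), and newCopyBuffer() rounds the configured stream buffer down to a whole number of file blocks. A standalone sketch of that alignment (the method name is illustrative, not JGit API):

// Round a configured stream buffer down to a multiple of the file's
// block size, as newCopyBuffer() does; fall back to one block when
// the configured size is smaller.
static int alignedBufferSize(int blockSize, int configuredSize) {
	if (configuredSize > blockSize)
		return (configuredSize / blockSize) * blockSize;
	return blockSize;
}
// alignedBufferSize(64 * 1024, 1_000_000) == 983040 (15 blocks of 64 KiB)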

org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/dfs/DfsReader.java

@@ -44,14 +44,10 @@
package org.eclipse.jgit.internal.storage.dfs;
import static org.eclipse.jgit.internal.storage.pack.PackExt.PACK;
import static org.eclipse.jgit.lib.Constants.OBJECT_ID_LENGTH;
import java.io.IOException;
import java.security.MessageDigest;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
@@ -65,7 +61,6 @@
import org.eclipse.jgit.errors.IncorrectObjectTypeException;
import org.eclipse.jgit.errors.MissingObjectException;
import org.eclipse.jgit.errors.StoredObjectRepresentationNotAvailableException;
import org.eclipse.jgit.internal.JGitText;
import org.eclipse.jgit.internal.storage.file.BitmapIndexImpl;
import org.eclipse.jgit.internal.storage.file.PackBitmapIndex;
import org.eclipse.jgit.internal.storage.file.PackIndex;
@@ -81,7 +76,6 @@
import org.eclipse.jgit.lib.AsyncObjectSizeQueue;
import org.eclipse.jgit.lib.BitmapIndex;
import org.eclipse.jgit.lib.BitmapIndex.BitmapBuilder;
import org.eclipse.jgit.lib.Constants;
import org.eclipse.jgit.lib.InflaterCache;
import org.eclipse.jgit.lib.ObjectId;
import org.eclipse.jgit.lib.ObjectLoader;
@@ -547,52 +541,6 @@ int copy(DfsPackFile pack, long position, byte[] dstbuf, int dstoff, int cnt)
return cnt - need;
}
void copyPackAsIs(DfsPackFile pack, long length, boolean validate,
PackOutputStream out) throws IOException {
MessageDigest md = null;
if (validate) {
md = Constants.newMessageDigest();
byte[] buf = out.getCopyBuffer();
pin(pack, 0);
if (block.copy(0, buf, 0, 12) != 12) {
pack.setInvalid();
throw new IOException(MessageFormat.format(
JGitText.get().packfileIsTruncated, pack.getPackName()));
}
md.update(buf, 0, 12);
}
long position = 12;
long remaining = length - (12 + 20);
while (0 < remaining) {
pin(pack, position);
int ptr = (int) (position - block.start);
int n = (int) Math.min(block.size() - ptr, remaining);
block.write(out, position, n, md);
position += n;
remaining -= n;
}
if (md != null) {
byte[] buf = new byte[20];
byte[] actHash = md.digest();
pin(pack, position);
if (block.copy(position, buf, 0, 20) != 20) {
pack.setInvalid();
throw new IOException(MessageFormat.format(
JGitText.get().packfileIsTruncated, pack.getPackName()));
}
if (!Arrays.equals(actHash, buf)) {
pack.setInvalid();
throw new IOException(MessageFormat.format(
JGitText.get().packfileCorruptionDetected,
pack.getPackDescription().getFileName(PACK)));
}
}
}
/**
* Inflate a region of the pack starting at {@code position}.
*
@@ -664,6 +612,10 @@ void pin(DfsPackFile pack, long position) throws IOException {
}
}
void unpin() {
block = null;
}
/** Release the current window cursor. */
@Override
public void release() {

org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/dfs/DfsReaderOptions.java

@@ -46,6 +46,7 @@
import static org.eclipse.jgit.lib.ConfigConstants.CONFIG_CORE_SECTION;
import static org.eclipse.jgit.lib.ConfigConstants.CONFIG_DFS_SECTION;
import static org.eclipse.jgit.lib.ConfigConstants.CONFIG_KEY_DELTA_BASE_CACHE_LIMIT;
import static org.eclipse.jgit.lib.ConfigConstants.CONFIG_KEY_STREAM_BUFFER;
import static org.eclipse.jgit.lib.ConfigConstants.CONFIG_KEY_STREAM_FILE_TRESHOLD;
import org.eclipse.jgit.lib.Config;
@@ -60,9 +61,10 @@ public class DfsReaderOptions {
public static final int MiB = 1024 * KiB;
private int deltaBaseCacheLimit;
private int streamFileThreshold;
private int streamPackBufferSize;
/** Create a default reader configuration. */
public DfsReaderOptions() {
setDeltaBaseCacheLimit(10 * MiB);
@@ -104,6 +106,27 @@ public DfsReaderOptions setStreamFileThreshold(final int newLimit) {
return this;
}
/**
* @return number of bytes to use for buffering when streaming a pack file
* during copying. If 0, the block size of the pack is used.
* @since 4.0
*/
public int getStreamPackBufferSize() {
return streamPackBufferSize;
}
/**
* @param bufsz
* new buffer size in bytes for buffers used when streaming pack
* files during copying.
* @return {@code this}
* @since 4.0
*/
public DfsReaderOptions setStreamPackBufferSize(int bufsz) {
streamPackBufferSize = Math.max(0, bufsz);
return this;
}
/**
* Update properties by setting fields from the configuration.
* <p>
@@ -130,6 +153,12 @@ public DfsReaderOptions fromConfig(Config rc) {
sft = Math.min(sft, maxMem / 4); // don't use more than 1/4 of the heap
sft = Math.min(sft, Integer.MAX_VALUE); // cannot exceed array length
setStreamFileThreshold((int) sft);
setStreamPackBufferSize(rc.getInt(
CONFIG_CORE_SECTION,
CONFIG_DFS_SECTION,
CONFIG_KEY_STREAM_BUFFER,
getStreamPackBufferSize()));
return this;
}
}
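And a usage sketch for the companion reader option (class name and values illustrative): a streamPackBufferSize of 0, the default, means the pack's block size is used, and negative values are clamped to 0 by the setter.

import org.eclipse.jgit.internal.storage.dfs.DfsReaderOptions;

public class StreamBufferExample {
	public static void main(String[] args) {
		DfsReaderOptions opts = new DfsReaderOptions();
		System.out.println(opts.getStreamPackBufferSize()); // 0 = block size
		opts.setStreamPackBufferSize(1024 * 1024);           // 1 MiB buffer
		opts.setStreamPackBufferSize(-8192);                 // clamped...
		System.out.println(opts.getStreamPackBufferSize()); // ...back to 0
	}
}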

org.eclipse.jgit/src/org/eclipse/jgit/lib/ConfigConstants.java

@@ -299,4 +299,16 @@ public class ConfigConstants {
* @since 3.3
*/
public static final String CONFIG_KEY_PRUNE = "prune";
/**
* The "streamBuffer" key
* @since 4.0
*/
public static final String CONFIG_KEY_STREAM_BUFFER = "streamBuffer";
/**
* The "streamRatio" key
* @since 4.0
*/
public static final String CONFIG_KEY_STREAM_RATIO = "streamRatio";
}