Merge "PackBitmapIndexV1: support parallel loading of reverse index"
This commit is contained in:
commit
ab8a5a3ccb
|
@ -290,6 +290,31 @@ public void highConcurrencyParallelReads_oneRepo() throws Exception {
|
|||
assertEquals(1, cache.getMissCount()[0]);
|
||||
}
|
||||
|
||||
@SuppressWarnings("resource")
|
||||
@Test
|
||||
public void highConcurrencyParallelReads_oneRepoParallelReverseIndex()
|
||||
throws Exception {
|
||||
InMemoryRepository r1 = createRepoWithBitmap("test");
|
||||
resetCache();
|
||||
|
||||
DfsReader reader = (DfsReader) r1.newObjectReader();
|
||||
reader.getOptions().setLoadRevIndexInParallel(true);
|
||||
for (DfsPackFile pack : r1.getObjectDatabase().getPacks()) {
|
||||
// Only load non-garbage pack with bitmap.
|
||||
if (pack.isGarbage()) {
|
||||
continue;
|
||||
}
|
||||
asyncRun(() -> pack.getBitmapIndex(reader));
|
||||
asyncRun(() -> pack.getPackIndex(reader));
|
||||
asyncRun(() -> pack.getBitmapIndex(reader));
|
||||
}
|
||||
waitForExecutorPoolTermination();
|
||||
|
||||
assertEquals(1, cache.getMissCount()[PackExt.BITMAP_INDEX.ordinal()]);
|
||||
assertEquals(1, cache.getMissCount()[PackExt.INDEX.ordinal()]);
|
||||
assertEquals(1, cache.getMissCount()[0]);
|
||||
}
|
||||
|
||||
private void resetCache() {
|
||||
resetCache(32);
|
||||
}
|
||||
|
|
|
@ -1061,7 +1061,8 @@ private DfsBlockCache.Ref<PackBitmapIndex> loadBitmapIndex(DfsReader ctx,
|
|||
}
|
||||
in = new BufferedInputStream(in, bs);
|
||||
bmidx = PackBitmapIndex.read(in, () -> idx(ctx),
|
||||
() -> getReverseIdx(ctx));
|
||||
() -> getReverseIdx(ctx),
|
||||
ctx.getOptions().shouldLoadRevIndexInParallel());
|
||||
} finally {
|
||||
size = rc.position();
|
||||
ctx.stats.readBitmapIdxBytes += size;
|
||||
|
|
|
@ -34,6 +34,8 @@ public class DfsReaderOptions {
|
|||
|
||||
private int streamPackBufferSize;
|
||||
|
||||
private boolean loadRevIndexInParallel;
|
||||
|
||||
/**
|
||||
* Create a default reader configuration.
|
||||
*/
|
||||
|
@ -112,6 +114,28 @@ public DfsReaderOptions setStreamPackBufferSize(int bufsz) {
|
|||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if reverse index should be loaded in parallel.
|
||||
*
|
||||
* @return true if reverse index is loaded in parallel for bitmap index.
|
||||
*/
|
||||
public boolean shouldLoadRevIndexInParallel() {
|
||||
return loadRevIndexInParallel;
|
||||
}
|
||||
|
||||
/**
|
||||
* Enable (or disable) parallel loading of reverse index.
|
||||
*
|
||||
* @param loadRevIndexInParallel
|
||||
* whether to load reverse index in parallel.
|
||||
* @return {@code this}
|
||||
*/
|
||||
public DfsReaderOptions setLoadRevIndexInParallel(
|
||||
boolean loadRevIndexInParallel) {
|
||||
this.loadRevIndexInParallel = loadRevIndexInParallel;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Update properties by setting fields from the configuration.
|
||||
* <p>
|
||||
|
|
|
@ -95,7 +95,7 @@ public static PackBitmapIndex open(File idxFile, PackIndex packIndex,
|
|||
*/
|
||||
public static PackBitmapIndex read(InputStream fd, PackIndex packIndex,
|
||||
PackReverseIndex reverseIndex) throws IOException {
|
||||
return new PackBitmapIndexV1(fd, () -> packIndex, () -> reverseIndex);
|
||||
return new PackBitmapIndexV1(fd, packIndex, reverseIndex);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -114,6 +114,8 @@ public static PackBitmapIndex read(InputStream fd, PackIndex packIndex,
|
|||
* @param reverseIndexSupplier
|
||||
* the supplier for pack reverse index for the corresponding pack
|
||||
* file.
|
||||
* @param loadParallelRevIndex
|
||||
* whether reverse index should be loaded in parallel
|
||||
* @return a copy of the index in-memory.
|
||||
* @throws java.io.IOException
|
||||
* the stream cannot be read.
|
||||
|
@ -122,10 +124,11 @@ public static PackBitmapIndex read(InputStream fd, PackIndex packIndex,
|
|||
*/
|
||||
public static PackBitmapIndex read(InputStream fd,
|
||||
SupplierWithIOException<PackIndex> packIndexSupplier,
|
||||
SupplierWithIOException<PackReverseIndex> reverseIndexSupplier)
|
||||
SupplierWithIOException<PackReverseIndex> reverseIndexSupplier,
|
||||
boolean loadParallelRevIndex)
|
||||
throws IOException {
|
||||
return new PackBitmapIndexV1(fd, packIndexSupplier,
|
||||
reverseIndexSupplier);
|
||||
reverseIndexSupplier, loadParallelRevIndex);
|
||||
}
|
||||
|
||||
/** Footer checksum applied on the bottom of the pack file. */
|
||||
|
|
|
@ -17,6 +17,12 @@
|
|||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.eclipse.jgit.annotations.Nullable;
|
||||
import org.eclipse.jgit.internal.JGitText;
|
||||
|
@ -40,6 +46,23 @@ class PackBitmapIndexV1 extends BasePackBitmapIndex {
|
|||
|
||||
private static final int MAX_XOR_OFFSET = 126;
|
||||
|
||||
private static final ExecutorService executor = Executors
|
||||
.newCachedThreadPool(new ThreadFactory() {
|
||||
private final ThreadFactory baseFactory = Executors
|
||||
.defaultThreadFactory();
|
||||
|
||||
private final AtomicInteger threadNumber = new AtomicInteger(0);
|
||||
|
||||
@Override
|
||||
public Thread newThread(Runnable runnable) {
|
||||
Thread thread = baseFactory.newThread(runnable);
|
||||
thread.setName("JGit-PackBitmapIndexV1-" //$NON-NLS-1$
|
||||
+ threadNumber.getAndIncrement());
|
||||
thread.setDaemon(true);
|
||||
return thread;
|
||||
}
|
||||
});
|
||||
|
||||
private final PackIndex packIndex;
|
||||
private final PackReverseIndex reverseIndex;
|
||||
private final EWAHCompressedBitmap commits;
|
||||
|
@ -49,15 +72,28 @@ class PackBitmapIndexV1 extends BasePackBitmapIndex {
|
|||
|
||||
private final ObjectIdOwnerMap<StoredBitmap> bitmaps;
|
||||
|
||||
PackBitmapIndexV1(final InputStream fd, PackIndex packIndex,
|
||||
PackReverseIndex reverseIndex) throws IOException {
|
||||
this(fd, () -> packIndex, () -> reverseIndex, false);
|
||||
}
|
||||
|
||||
PackBitmapIndexV1(final InputStream fd,
|
||||
SupplierWithIOException<PackIndex> packIndexSupplier,
|
||||
SupplierWithIOException<PackReverseIndex> reverseIndexSupplier)
|
||||
SupplierWithIOException<PackReverseIndex> reverseIndexSupplier,
|
||||
boolean loadParallelRevIndex)
|
||||
throws IOException {
|
||||
// An entry is object id, xor offset, flag byte, and a length encoded
|
||||
// bitmap. The object id is an int32 of the nth position sorted by name.
|
||||
super(new ObjectIdOwnerMap<StoredBitmap>());
|
||||
this.bitmaps = getBitmaps();
|
||||
|
||||
// Optionally start loading reverse index in parallel to loading bitmap
|
||||
// from storage.
|
||||
Future<PackReverseIndex> reverseIndexFuture = null;
|
||||
if (loadParallelRevIndex) {
|
||||
reverseIndexFuture = executor.submit(reverseIndexSupplier::get);
|
||||
}
|
||||
|
||||
final byte[] scratch = new byte[32];
|
||||
IO.readFully(fd, scratch, 0, scratch.length);
|
||||
|
||||
|
@ -164,7 +200,18 @@ class PackBitmapIndexV1 extends BasePackBitmapIndex {
|
|||
bitmaps.add(sb);
|
||||
}
|
||||
|
||||
this.reverseIndex = reverseIndexSupplier.get();
|
||||
PackReverseIndex computedReverseIndex;
|
||||
if (loadParallelRevIndex && reverseIndexFuture != null) {
|
||||
try {
|
||||
computedReverseIndex = reverseIndexFuture.get();
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
// Fallback to loading reverse index through a supplier.
|
||||
computedReverseIndex = reverseIndexSupplier.get();
|
||||
}
|
||||
} else {
|
||||
computedReverseIndex = reverseIndexSupplier.get();
|
||||
}
|
||||
this.reverseIndex = computedReverseIndex;
|
||||
}
|
||||
|
||||
/** {@inheritDoc} */
|
||||
|
|
Loading…
Reference in New Issue