PackBitmapIndexV1: support parallel loading of reverse index

Speed up bitmap creation by loading reverse index in parallel
to reading bitmap from storage. Latency changes from
(time_to_read_bitmap + time_to_load_reverse_index) to
max(time_to_read_bitmap, time_to_load_reverse_index).

Add new option to DfsReaderOptions to control parallel reverse index
loading. Static cached thread pool is added to PackBitmapIndexV1 for
reverse index loading, and when not in use consumes minimal resources.

Signed-off-by: Alina Djamankulova <adjama@google.com>
Change-Id: Ia37a1d739631d053e8bddb925ac8b0b81d22379e
This commit is contained in:
Alina Djamankulova 2021-11-19 16:25:47 -08:00
parent 3cb02ccfdf
commit 4e915f9568
5 changed files with 106 additions and 6 deletions

View File

@ -290,6 +290,31 @@ public void highConcurrencyParallelReads_oneRepo() throws Exception {
assertEquals(1, cache.getMissCount()[0]);
}
@SuppressWarnings("resource")
@Test
public void highConcurrencyParallelReads_oneRepoParallelReverseIndex()
throws Exception {
InMemoryRepository r1 = createRepoWithBitmap("test");
resetCache();
DfsReader reader = (DfsReader) r1.newObjectReader();
reader.getOptions().setLoadRevIndexInParallel(true);
for (DfsPackFile pack : r1.getObjectDatabase().getPacks()) {
// Only load non-garbage pack with bitmap.
if (pack.isGarbage()) {
continue;
}
asyncRun(() -> pack.getBitmapIndex(reader));
asyncRun(() -> pack.getPackIndex(reader));
asyncRun(() -> pack.getBitmapIndex(reader));
}
waitForExecutorPoolTermination();
assertEquals(1, cache.getMissCount()[PackExt.BITMAP_INDEX.ordinal()]);
assertEquals(1, cache.getMissCount()[PackExt.INDEX.ordinal()]);
assertEquals(1, cache.getMissCount()[0]);
}
private void resetCache() {
resetCache(32);
}

View File

@ -1061,7 +1061,8 @@ private DfsBlockCache.Ref<PackBitmapIndex> loadBitmapIndex(DfsReader ctx,
}
in = new BufferedInputStream(in, bs);
bmidx = PackBitmapIndex.read(in, () -> idx(ctx),
() -> getReverseIdx(ctx));
() -> getReverseIdx(ctx),
ctx.getOptions().shouldLoadRevIndexInParallel());
} finally {
size = rc.position();
ctx.stats.readBitmapIdxBytes += size;

View File

@ -34,6 +34,8 @@ public class DfsReaderOptions {
private int streamPackBufferSize;
private boolean loadRevIndexInParallel;
/**
* Create a default reader configuration.
*/
@ -112,6 +114,28 @@ public DfsReaderOptions setStreamPackBufferSize(int bufsz) {
return this;
}
/**
* Check if reverse index should be loaded in parallel.
*
* @return true if reverse index is loaded in parallel for bitmap index.
*/
public boolean shouldLoadRevIndexInParallel() {
return loadRevIndexInParallel;
}
/**
* Enable (or disable) parallel loading of reverse index.
*
* @param loadRevIndexInParallel
* whether to load reverse index in parallel.
* @return {@code this}
*/
public DfsReaderOptions setLoadRevIndexInParallel(
boolean loadRevIndexInParallel) {
this.loadRevIndexInParallel = loadRevIndexInParallel;
return this;
}
/**
* Update properties by setting fields from the configuration.
* <p>

View File

@ -95,7 +95,7 @@ public static PackBitmapIndex open(File idxFile, PackIndex packIndex,
*/
public static PackBitmapIndex read(InputStream fd, PackIndex packIndex,
PackReverseIndex reverseIndex) throws IOException {
return new PackBitmapIndexV1(fd, () -> packIndex, () -> reverseIndex);
return new PackBitmapIndexV1(fd, packIndex, reverseIndex);
}
/**
@ -114,6 +114,8 @@ public static PackBitmapIndex read(InputStream fd, PackIndex packIndex,
* @param reverseIndexSupplier
* the supplier for pack reverse index for the corresponding pack
* file.
* @param loadParallelRevIndex
* whether reverse index should be loaded in parallel
* @return a copy of the index in-memory.
* @throws java.io.IOException
* the stream cannot be read.
@ -122,10 +124,11 @@ public static PackBitmapIndex read(InputStream fd, PackIndex packIndex,
*/
public static PackBitmapIndex read(InputStream fd,
SupplierWithIOException<PackIndex> packIndexSupplier,
SupplierWithIOException<PackReverseIndex> reverseIndexSupplier)
SupplierWithIOException<PackReverseIndex> reverseIndexSupplier,
boolean loadParallelRevIndex)
throws IOException {
return new PackBitmapIndexV1(fd, packIndexSupplier,
reverseIndexSupplier);
reverseIndexSupplier, loadParallelRevIndex);
}
/** Footer checksum applied on the bottom of the pack file. */

View File

@ -17,6 +17,12 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jgit.annotations.Nullable;
import org.eclipse.jgit.internal.JGitText;
@ -40,6 +46,23 @@ class PackBitmapIndexV1 extends BasePackBitmapIndex {
private static final int MAX_XOR_OFFSET = 126;
private static final ExecutorService executor = Executors
.newCachedThreadPool(new ThreadFactory() {
private final ThreadFactory baseFactory = Executors
.defaultThreadFactory();
private final AtomicInteger threadNumber = new AtomicInteger(0);
@Override
public Thread newThread(Runnable runnable) {
Thread thread = baseFactory.newThread(runnable);
thread.setName("JGit-PackBitmapIndexV1-" //$NON-NLS-1$
+ threadNumber.getAndIncrement());
thread.setDaemon(true);
return thread;
}
});
private final PackIndex packIndex;
private final PackReverseIndex reverseIndex;
private final EWAHCompressedBitmap commits;
@ -49,15 +72,28 @@ class PackBitmapIndexV1 extends BasePackBitmapIndex {
private final ObjectIdOwnerMap<StoredBitmap> bitmaps;
PackBitmapIndexV1(final InputStream fd, PackIndex packIndex,
PackReverseIndex reverseIndex) throws IOException {
this(fd, () -> packIndex, () -> reverseIndex, false);
}
PackBitmapIndexV1(final InputStream fd,
SupplierWithIOException<PackIndex> packIndexSupplier,
SupplierWithIOException<PackReverseIndex> reverseIndexSupplier)
SupplierWithIOException<PackReverseIndex> reverseIndexSupplier,
boolean loadParallelRevIndex)
throws IOException {
// An entry is object id, xor offset, flag byte, and a length encoded
// bitmap. The object id is an int32 of the nth position sorted by name.
super(new ObjectIdOwnerMap<StoredBitmap>());
this.bitmaps = getBitmaps();
// Optionally start loading reverse index in parallel to loading bitmap
// from storage.
Future<PackReverseIndex> reverseIndexFuture = null;
if (loadParallelRevIndex) {
reverseIndexFuture = executor.submit(reverseIndexSupplier::get);
}
final byte[] scratch = new byte[32];
IO.readFully(fd, scratch, 0, scratch.length);
@ -164,7 +200,18 @@ class PackBitmapIndexV1 extends BasePackBitmapIndex {
bitmaps.add(sb);
}
this.reverseIndex = reverseIndexSupplier.get();
PackReverseIndex computedReverseIndex;
if (loadParallelRevIndex && reverseIndexFuture != null) {
try {
computedReverseIndex = reverseIndexFuture.get();
} catch (InterruptedException | ExecutionException e) {
// Fallback to loading reverse index through a supplier.
computedReverseIndex = reverseIndexSupplier.get();
}
} else {
computedReverseIndex = reverseIndexSupplier.get();
}
this.reverseIndex = computedReverseIndex;
}
/** {@inheritDoc} */