diff --git a/org.eclipse.jgit.test/tst/org/eclipse/jgit/internal/storage/dfs/DfsBlockCacheTest.java b/org.eclipse.jgit.test/tst/org/eclipse/jgit/internal/storage/dfs/DfsBlockCacheTest.java index 549e1f469..4f1314057 100644 --- a/org.eclipse.jgit.test/tst/org/eclipse/jgit/internal/storage/dfs/DfsBlockCacheTest.java +++ b/org.eclipse.jgit.test/tst/org/eclipse/jgit/internal/storage/dfs/DfsBlockCacheTest.java @@ -10,6 +10,7 @@ package org.eclipse.jgit.internal.storage.dfs; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.eclipse.jgit.lib.Constants.OBJ_BLOB; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -20,11 +21,10 @@ import java.util.List; import java.util.Map; import java.util.stream.LongStream; - import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; + import org.eclipse.jgit.internal.storage.pack.PackExt; import org.eclipse.jgit.junit.TestRepository; import org.eclipse.jgit.junit.TestRng; @@ -42,10 +42,12 @@ public class DfsBlockCacheTest { public TestName testName = new TestName(); private TestRng rng; private DfsBlockCache cache; + private ExecutorService pool; @Before public void setUp() { rng = new TestRng(testName.getMethodName()); + pool = Executors.newFixedThreadPool(10); resetCache(); } @@ -152,49 +154,169 @@ public void hasCacheHotMap() throws Exception { @SuppressWarnings("resource") @Test - public void noConcurrencySerializedReads() throws Exception { - DfsRepositoryDescription repo = new DfsRepositoryDescription("test"); - InMemoryRepository r1 = new InMemoryRepository(repo); - TestRepository repository = new TestRepository<>( - r1); - RevCommit commit = repository.branch("/refs/ref1").commit() - .add("blob1", "blob1").create(); - repository.branch("/refs/ref2").commit().add("blob2", "blob2") - .parent(commit).create(); - - new DfsGarbageCollector(r1).pack(null); + public void noConcurrencySerializedReads_oneRepo() throws Exception { + InMemoryRepository r1 = createRepoWithBitmap("test"); // Reset cache with concurrency Level at 1 i.e. no concurrency. - DfsBlockCache.reconfigure(new DfsBlockCacheConfig().setBlockSize(512) - .setBlockLimit(1 << 20).setConcurrencyLevel(1)); - cache = DfsBlockCache.getInstance(); + resetCache(1); DfsReader reader = (DfsReader) r1.newObjectReader(); - ExecutorService pool = Executors.newFixedThreadPool(10); for (DfsPackFile pack : r1.getObjectDatabase().getPacks()) { // Only load non-garbage pack with bitmap. if (pack.isGarbage()) { continue; } - asyncRun(pool, () -> pack.getBitmapIndex(reader)); - asyncRun(pool, () -> pack.getPackIndex(reader)); - asyncRun(pool, () -> pack.getBitmapIndex(reader)); + asyncRun(() -> pack.getBitmapIndex(reader)); + asyncRun(() -> pack.getPackIndex(reader)); + asyncRun(() -> pack.getBitmapIndex(reader)); } + waitForExecutorPoolTermination(); - pool.shutdown(); - pool.awaitTermination(500, TimeUnit.MILLISECONDS); - assertTrue("Threads did not complete, likely due to a deadlock.", - pool.isTerminated()); assertEquals(1, cache.getMissCount()[PackExt.BITMAP_INDEX.ordinal()]); assertEquals(1, cache.getMissCount()[PackExt.INDEX.ordinal()]); + // Reverse index has no pack extension, it defaults to 0. + assertEquals(1, cache.getMissCount()[0]); + } + + @SuppressWarnings("resource") + @Test + public void noConcurrencySerializedReads_twoRepos() throws Exception { + InMemoryRepository r1 = createRepoWithBitmap("test1"); + InMemoryRepository r2 = createRepoWithBitmap("test2"); + resetCache(1); + + DfsReader reader = (DfsReader) r1.newObjectReader(); + DfsPackFile[] r1Packs = r1.getObjectDatabase().getPacks(); + DfsPackFile[] r2Packs = r2.getObjectDatabase().getPacks(); + // Safety check that both repos have the same number of packs. + assertEquals(r1Packs.length, r2Packs.length); + + for (int i = 0; i < r1.getObjectDatabase().getPacks().length; ++i) { + DfsPackFile pack1 = r1Packs[i]; + DfsPackFile pack2 = r2Packs[i]; + if (pack1.isGarbage() || pack2.isGarbage()) { + continue; + } + asyncRun(() -> pack1.getBitmapIndex(reader)); + asyncRun(() -> pack2.getBitmapIndex(reader)); + } + + waitForExecutorPoolTermination(); + assertEquals(2, cache.getMissCount()[PackExt.BITMAP_INDEX.ordinal()]); + assertEquals(2, cache.getMissCount()[PackExt.INDEX.ordinal()]); + assertEquals(2, cache.getMissCount()[0]); + } + + @SuppressWarnings("resource") + @Test + public void lowConcurrencyParallelReads_twoRepos() throws Exception { + InMemoryRepository r1 = createRepoWithBitmap("test1"); + InMemoryRepository r2 = createRepoWithBitmap("test2"); + resetCache(2); + + DfsReader reader = (DfsReader) r1.newObjectReader(); + DfsPackFile[] r1Packs = r1.getObjectDatabase().getPacks(); + DfsPackFile[] r2Packs = r2.getObjectDatabase().getPacks(); + // Safety check that both repos have the same number of packs. + assertEquals(r1Packs.length, r2Packs.length); + + for (int i = 0; i < r1.getObjectDatabase().getPacks().length; ++i) { + DfsPackFile pack1 = r1Packs[i]; + DfsPackFile pack2 = r2Packs[i]; + if (pack1.isGarbage() || pack2.isGarbage()) { + continue; + } + asyncRun(() -> pack1.getBitmapIndex(reader)); + asyncRun(() -> pack2.getBitmapIndex(reader)); + } + + waitForExecutorPoolTermination(); + assertEquals(2, cache.getMissCount()[PackExt.BITMAP_INDEX.ordinal()]); + assertEquals(2, cache.getMissCount()[PackExt.INDEX.ordinal()]); + assertEquals(2, cache.getMissCount()[0]); + } + + @SuppressWarnings("resource") + @Test + public void lowConcurrencyParallelReads_twoReposAndIndex() + throws Exception { + InMemoryRepository r1 = createRepoWithBitmap("test1"); + InMemoryRepository r2 = createRepoWithBitmap("test2"); + resetCache(2); + + DfsReader reader = (DfsReader) r1.newObjectReader(); + DfsPackFile[] r1Packs = r1.getObjectDatabase().getPacks(); + DfsPackFile[] r2Packs = r2.getObjectDatabase().getPacks(); + // Safety check that both repos have the same number of packs. + assertEquals(r1Packs.length, r2Packs.length); + + for (int i = 0; i < r1.getObjectDatabase().getPacks().length; ++i) { + DfsPackFile pack1 = r1Packs[i]; + DfsPackFile pack2 = r2Packs[i]; + if (pack1.isGarbage() || pack2.isGarbage()) { + continue; + } + asyncRun(() -> pack1.getBitmapIndex(reader)); + asyncRun(() -> pack1.getPackIndex(reader)); + asyncRun(() -> pack2.getBitmapIndex(reader)); + } + waitForExecutorPoolTermination(); + + assertEquals(2, cache.getMissCount()[PackExt.BITMAP_INDEX.ordinal()]); + // Index is loaded once for each repo. + assertEquals(2, cache.getMissCount()[PackExt.INDEX.ordinal()]); + assertEquals(2, cache.getMissCount()[0]); + } + + @SuppressWarnings("resource") + @Test + public void highConcurrencyParallelReads_oneRepo() throws Exception { + InMemoryRepository r1 = createRepoWithBitmap("test"); + resetCache(); + + DfsReader reader = (DfsReader) r1.newObjectReader(); + for (DfsPackFile pack : r1.getObjectDatabase().getPacks()) { + // Only load non-garbage pack with bitmap. + if (pack.isGarbage()) { + continue; + } + asyncRun(() -> pack.getBitmapIndex(reader)); + asyncRun(() -> pack.getPackIndex(reader)); + asyncRun(() -> pack.getBitmapIndex(reader)); + } + waitForExecutorPoolTermination(); + + assertEquals(1, cache.getMissCount()[PackExt.BITMAP_INDEX.ordinal()]); + assertEquals(1, cache.getMissCount()[PackExt.INDEX.ordinal()]); + assertEquals(1, cache.getMissCount()[0]); } private void resetCache() { + resetCache(32); + } + + private void resetCache(int concurrencyLevel) { DfsBlockCache.reconfigure(new DfsBlockCacheConfig().setBlockSize(512) - .setBlockLimit(1 << 20)); + .setConcurrencyLevel(concurrencyLevel).setBlockLimit(1 << 20)); cache = DfsBlockCache.getInstance(); } - private void asyncRun(ExecutorService pool, Callable call) { + private InMemoryRepository createRepoWithBitmap(String repoName) + throws Exception { + DfsRepositoryDescription repoDesc = new DfsRepositoryDescription( + repoName); + InMemoryRepository repo = new InMemoryRepository(repoDesc); + try (TestRepository repository = new TestRepository<>( + repo)) { + RevCommit commit = repository.branch("/refs/ref1" + repoName) + .commit().add("blob1", "blob1" + repoName).create(); + repository.branch("/refs/ref2" + repoName).commit() + .add("blob2", "blob2" + repoName).parent(commit).create(); + } + new DfsGarbageCollector(repo).pack(null); + return repo; + } + + private void asyncRun(Callable call) { pool.execute(() -> { try { call.call(); @@ -203,4 +325,11 @@ private void asyncRun(ExecutorService pool, Callable call) { } }); } + + private void waitForExecutorPoolTermination() throws Exception { + pool.shutdown(); + pool.awaitTermination(500, MILLISECONDS); + assertTrue("Threads did not complete, likely due to a deadlock.", + pool.isTerminated()); + } } diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/dfs/DfsBlockCache.java b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/dfs/DfsBlockCache.java index e87bfe24e..54c527c03 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/dfs/DfsBlockCache.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/dfs/DfsBlockCache.java @@ -104,9 +104,10 @@ public static DfsBlockCache getInstance() { private final ReentrantLock[] loadLocks; /** - * A separate pool of locks to prevent concurrent loads for same index or bitmap from PackFile. + * A separate pool of locks per pack extension to prevent concurrent loads + * for same index or bitmap from PackFile. */ - private final ReentrantLock[] refLocks; + private final ReentrantLock[][] refLocks; /** Maximum number of bytes the cache should hold. */ private final long maxBytes; @@ -173,13 +174,16 @@ private DfsBlockCache(DfsBlockCacheConfig cfg) { } table = new AtomicReferenceArray<>(tableSize); - loadLocks = new ReentrantLock[cfg.getConcurrencyLevel()]; + int concurrencyLevel = cfg.getConcurrencyLevel(); + loadLocks = new ReentrantLock[concurrencyLevel]; for (int i = 0; i < loadLocks.length; i++) { loadLocks[i] = new ReentrantLock(true /* fair */); } - refLocks = new ReentrantLock[cfg.getConcurrencyLevel()]; - for (int i = 0; i < refLocks.length; i++) { - refLocks[i] = new ReentrantLock(true /* fair */); + refLocks = new ReentrantLock[PackExt.values().length][concurrencyLevel]; + for (int i = 0; i < PackExt.values().length; i++) { + for (int j = 0; j < concurrencyLevel; ++j) { + refLocks[i][j] = new ReentrantLock(true /* fair */); + } } maxBytes = cfg.getBlockLimit(); @@ -636,7 +640,8 @@ private ReentrantLock lockFor(DfsStreamKey key, long position) { } private ReentrantLock lockForRef(DfsStreamKey key) { - return refLocks[(key.hash >>> 1) % refLocks.length]; + int slot = (key.hash >>> 1) % refLocks[key.packExtPos].length; + return refLocks[key.packExtPos][slot]; } private static AtomicLong[] newCounters() {