DFS block cache: harden against race over ref locks.

With change https://git.eclipse.org/r/c/jgit/jgit/+/186455 a thread
loading a bitmap index could hold two ref locks at the same time (one
for the bitmap and one for either the index or the reverse index). Two
threads loading bitmaps could therefore deadlock on ref locks, e.g.
threadA holds refLock[1] (for its bitmap) and wants refLock[2] (for the
index or revIndex) while threadB holds refLock[2] (for its bitmap) and
wants refLock[1].
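
To make the lock-ordering hazard concrete, here is a minimal
standalone sketch (illustrative only, not JGit code; the class name
and the two-slot pool are made up) of two threads taking locks from a
single shared pool in opposite order:

  import java.util.concurrent.locks.ReentrantLock;

  public class SharedRefLockPoolSketch {
    // One shared pool, as before this change: keys for different pack
    // extensions can hash to the same slots.
    static final ReentrantLock[] refLock = { new ReentrantLock(true),
        new ReentrantLock(true) };

    public static void main(String[] args) {
      // threadA: bitmap hashes to slot 0, index to slot 1.
      Thread threadA = new Thread(() -> load(0, 1), "threadA");
      // threadB: bitmap hashes to slot 1, index to slot 0.
      Thread threadB = new Thread(() -> load(1, 0), "threadB");
      threadA.start();
      threadB.start();
      // If each thread grabs its first lock before either grabs its
      // second, both block forever: the ABBA deadlock described above.
    }

    static void load(int bitmapSlot, int indexSlot) {
      refLock[bitmapSlot].lock();
      try {
        refLock[indexSlot].lock(); // may wait forever on the other thread
        refLock[indexSlot].unlock();
      } finally {
        refLock[bitmapSlot].unlock();
      }
    }
  }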

This change introduces a separate pool of locks per pack extension
instead of a single shared pool. Threads loading bitmaps can still hold
two locks at once, but the locks now come from different per-extension
pools and can no longer collide, e.g. threadA holds
refLock[BITMAP_INDEX][1] and refLock[INDEX][2] while threadB holds
refLock[BITMAP_INDEX][2] and refLock[INDEX][1].
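
In shape, the fix amounts to the following condensed sketch
(illustrative names only; PerExtensionRefLocks, extensionCount and
concurrencyLevel are stand-ins, and the real code is the
DfsBlockCache change in the diff below):

  import java.util.concurrent.locks.ReentrantLock;

  class PerExtensionRefLocks {
    // One row of locks per pack extension, each row sized by the
    // configured concurrency level.
    private final ReentrantLock[][] refLocks;

    PerExtensionRefLocks(int extensionCount, int concurrencyLevel) {
      refLocks = new ReentrantLock[extensionCount][concurrencyLevel];
      for (int ext = 0; ext < extensionCount; ext++) {
        for (int slot = 0; slot < concurrencyLevel; slot++) {
          refLocks[ext][slot] = new ReentrantLock(true /* fair */);
        }
      }
    }

    // A key only ever maps into the row for its own extension, so a
    // bitmap lock and an index lock can never be the same object.
    ReentrantLock lockFor(int extOrdinal, int keyHash) {
      int slot = (keyHash >>> 1) % refLocks[extOrdinal].length;
      return refLocks[extOrdinal][slot];
    }
  }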

More unit tests were added to cover various parallel loading scenarios.

Signed-off-by: Alina Djamankulova <adjama@google.com>
Change-Id: I89704b4721c21548929608d3798ef60925280755
Author: Alina Djamankulova <adjama@google.com>
Date:   2021-11-16 09:53:20 -08:00
parent 78b7d9e4fa
commit 49d243b13c
2 changed files with 167 additions and 33 deletions

org.eclipse.jgit.test/tst/org/eclipse/jgit/internal/storage/dfs/DfsBlockCacheTest.java

@@ -10,6 +10,7 @@
package org.eclipse.jgit.internal.storage.dfs;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.eclipse.jgit.lib.Constants.OBJ_BLOB;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -20,11 +21,10 @@
import java.util.List;
import java.util.Map;
import java.util.stream.LongStream;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.eclipse.jgit.internal.storage.pack.PackExt;
import org.eclipse.jgit.junit.TestRepository;
import org.eclipse.jgit.junit.TestRng;
@@ -42,10 +42,12 @@ public class DfsBlockCacheTest {
public TestName testName = new TestName();
private TestRng rng;
private DfsBlockCache cache;
private ExecutorService pool;
@Before
public void setUp() {
rng = new TestRng(testName.getMethodName());
pool = Executors.newFixedThreadPool(10);
resetCache();
}
@@ -152,49 +154,169 @@ public void hasCacheHotMap() throws Exception {
@SuppressWarnings("resource")
@Test
public void noConcurrencySerializedReads() throws Exception {
DfsRepositoryDescription repo = new DfsRepositoryDescription("test");
InMemoryRepository r1 = new InMemoryRepository(repo);
TestRepository<InMemoryRepository> repository = new TestRepository<>(
r1);
RevCommit commit = repository.branch("/refs/ref1").commit()
.add("blob1", "blob1").create();
repository.branch("/refs/ref2").commit().add("blob2", "blob2")
.parent(commit).create();
new DfsGarbageCollector(r1).pack(null);
public void noConcurrencySerializedReads_oneRepo() throws Exception {
InMemoryRepository r1 = createRepoWithBitmap("test");
// Reset cache with concurrency Level at 1 i.e. no concurrency.
DfsBlockCache.reconfigure(new DfsBlockCacheConfig().setBlockSize(512)
.setBlockLimit(1 << 20).setConcurrencyLevel(1));
cache = DfsBlockCache.getInstance();
resetCache(1);
DfsReader reader = (DfsReader) r1.newObjectReader();
ExecutorService pool = Executors.newFixedThreadPool(10);
for (DfsPackFile pack : r1.getObjectDatabase().getPacks()) {
// Only load non-garbage pack with bitmap.
if (pack.isGarbage()) {
continue;
}
asyncRun(pool, () -> pack.getBitmapIndex(reader));
asyncRun(pool, () -> pack.getPackIndex(reader));
asyncRun(pool, () -> pack.getBitmapIndex(reader));
asyncRun(() -> pack.getBitmapIndex(reader));
asyncRun(() -> pack.getPackIndex(reader));
asyncRun(() -> pack.getBitmapIndex(reader));
}
waitForExecutorPoolTermination();
pool.shutdown();
pool.awaitTermination(500, TimeUnit.MILLISECONDS);
assertTrue("Threads did not complete, likely due to a deadlock.",
pool.isTerminated());
assertEquals(1, cache.getMissCount()[PackExt.BITMAP_INDEX.ordinal()]);
assertEquals(1, cache.getMissCount()[PackExt.INDEX.ordinal()]);
// Reverse index has no pack extension, it defaults to 0.
assertEquals(1, cache.getMissCount()[0]);
}
@SuppressWarnings("resource")
@Test
public void noConcurrencySerializedReads_twoRepos() throws Exception {
InMemoryRepository r1 = createRepoWithBitmap("test1");
InMemoryRepository r2 = createRepoWithBitmap("test2");
resetCache(1);
DfsReader reader = (DfsReader) r1.newObjectReader();
DfsPackFile[] r1Packs = r1.getObjectDatabase().getPacks();
DfsPackFile[] r2Packs = r2.getObjectDatabase().getPacks();
// Safety check that both repos have the same number of packs.
assertEquals(r1Packs.length, r2Packs.length);
for (int i = 0; i < r1.getObjectDatabase().getPacks().length; ++i) {
DfsPackFile pack1 = r1Packs[i];
DfsPackFile pack2 = r2Packs[i];
if (pack1.isGarbage() || pack2.isGarbage()) {
continue;
}
asyncRun(() -> pack1.getBitmapIndex(reader));
asyncRun(() -> pack2.getBitmapIndex(reader));
}
waitForExecutorPoolTermination();
assertEquals(2, cache.getMissCount()[PackExt.BITMAP_INDEX.ordinal()]);
assertEquals(2, cache.getMissCount()[PackExt.INDEX.ordinal()]);
assertEquals(2, cache.getMissCount()[0]);
}
@SuppressWarnings("resource")
@Test
public void lowConcurrencyParallelReads_twoRepos() throws Exception {
InMemoryRepository r1 = createRepoWithBitmap("test1");
InMemoryRepository r2 = createRepoWithBitmap("test2");
resetCache(2);
DfsReader reader = (DfsReader) r1.newObjectReader();
DfsPackFile[] r1Packs = r1.getObjectDatabase().getPacks();
DfsPackFile[] r2Packs = r2.getObjectDatabase().getPacks();
// Safety check that both repos have the same number of packs.
assertEquals(r1Packs.length, r2Packs.length);
for (int i = 0; i < r1.getObjectDatabase().getPacks().length; ++i) {
DfsPackFile pack1 = r1Packs[i];
DfsPackFile pack2 = r2Packs[i];
if (pack1.isGarbage() || pack2.isGarbage()) {
continue;
}
asyncRun(() -> pack1.getBitmapIndex(reader));
asyncRun(() -> pack2.getBitmapIndex(reader));
}
waitForExecutorPoolTermination();
assertEquals(2, cache.getMissCount()[PackExt.BITMAP_INDEX.ordinal()]);
assertEquals(2, cache.getMissCount()[PackExt.INDEX.ordinal()]);
assertEquals(2, cache.getMissCount()[0]);
}
@SuppressWarnings("resource")
@Test
public void lowConcurrencyParallelReads_twoReposAndIndex()
throws Exception {
InMemoryRepository r1 = createRepoWithBitmap("test1");
InMemoryRepository r2 = createRepoWithBitmap("test2");
resetCache(2);
DfsReader reader = (DfsReader) r1.newObjectReader();
DfsPackFile[] r1Packs = r1.getObjectDatabase().getPacks();
DfsPackFile[] r2Packs = r2.getObjectDatabase().getPacks();
// Safety check that both repos have the same number of packs.
assertEquals(r1Packs.length, r2Packs.length);
for (int i = 0; i < r1.getObjectDatabase().getPacks().length; ++i) {
DfsPackFile pack1 = r1Packs[i];
DfsPackFile pack2 = r2Packs[i];
if (pack1.isGarbage() || pack2.isGarbage()) {
continue;
}
asyncRun(() -> pack1.getBitmapIndex(reader));
asyncRun(() -> pack1.getPackIndex(reader));
asyncRun(() -> pack2.getBitmapIndex(reader));
}
waitForExecutorPoolTermination();
assertEquals(2, cache.getMissCount()[PackExt.BITMAP_INDEX.ordinal()]);
// Index is loaded once for each repo.
assertEquals(2, cache.getMissCount()[PackExt.INDEX.ordinal()]);
assertEquals(2, cache.getMissCount()[0]);
}
@SuppressWarnings("resource")
@Test
public void highConcurrencyParallelReads_oneRepo() throws Exception {
InMemoryRepository r1 = createRepoWithBitmap("test");
resetCache();
DfsReader reader = (DfsReader) r1.newObjectReader();
for (DfsPackFile pack : r1.getObjectDatabase().getPacks()) {
// Only load non-garbage pack with bitmap.
if (pack.isGarbage()) {
continue;
}
asyncRun(() -> pack.getBitmapIndex(reader));
asyncRun(() -> pack.getPackIndex(reader));
asyncRun(() -> pack.getBitmapIndex(reader));
}
waitForExecutorPoolTermination();
assertEquals(1, cache.getMissCount()[PackExt.BITMAP_INDEX.ordinal()]);
assertEquals(1, cache.getMissCount()[PackExt.INDEX.ordinal()]);
assertEquals(1, cache.getMissCount()[0]);
}
private void resetCache() {
resetCache(32);
}
private void resetCache(int concurrencyLevel) {
DfsBlockCache.reconfigure(new DfsBlockCacheConfig().setBlockSize(512)
.setBlockLimit(1 << 20));
.setConcurrencyLevel(concurrencyLevel).setBlockLimit(1 << 20));
cache = DfsBlockCache.getInstance();
}
private void asyncRun(ExecutorService pool, Callable<?> call) {
private InMemoryRepository createRepoWithBitmap(String repoName)
throws Exception {
DfsRepositoryDescription repoDesc = new DfsRepositoryDescription(
repoName);
InMemoryRepository repo = new InMemoryRepository(repoDesc);
try (TestRepository<InMemoryRepository> repository = new TestRepository<>(
repo)) {
RevCommit commit = repository.branch("/refs/ref1" + repoName)
.commit().add("blob1", "blob1" + repoName).create();
repository.branch("/refs/ref2" + repoName).commit()
.add("blob2", "blob2" + repoName).parent(commit).create();
}
new DfsGarbageCollector(repo).pack(null);
return repo;
}
private void asyncRun(Callable<?> call) {
pool.execute(() -> {
try {
call.call();
@@ -203,4 +325,11 @@ private void asyncRun(ExecutorService pool, Callable<?> call) {
}
});
}
private void waitForExecutorPoolTermination() throws Exception {
pool.shutdown();
pool.awaitTermination(500, MILLISECONDS);
assertTrue("Threads did not complete, likely due to a deadlock.",
pool.isTerminated());
}
}

org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/dfs/DfsBlockCache.java

@@ -104,9 +104,10 @@ public static DfsBlockCache getInstance() {
private final ReentrantLock[] loadLocks;
/**
* A separate pool of locks to prevent concurrent loads for same index or bitmap from PackFile.
* A separate pool of locks per pack extension to prevent concurrent loads
* for same index or bitmap from PackFile.
*/
private final ReentrantLock[] refLocks;
private final ReentrantLock[][] refLocks;
/** Maximum number of bytes the cache should hold. */
private final long maxBytes;
@@ -173,13 +174,16 @@ private DfsBlockCache(DfsBlockCacheConfig cfg) {
}
table = new AtomicReferenceArray<>(tableSize);
loadLocks = new ReentrantLock[cfg.getConcurrencyLevel()];
int concurrencyLevel = cfg.getConcurrencyLevel();
loadLocks = new ReentrantLock[concurrencyLevel];
for (int i = 0; i < loadLocks.length; i++) {
loadLocks[i] = new ReentrantLock(true /* fair */);
}
refLocks = new ReentrantLock[cfg.getConcurrencyLevel()];
for (int i = 0; i < refLocks.length; i++) {
refLocks[i] = new ReentrantLock(true /* fair */);
refLocks = new ReentrantLock[PackExt.values().length][concurrencyLevel];
for (int i = 0; i < PackExt.values().length; i++) {
for (int j = 0; j < concurrencyLevel; ++j) {
refLocks[i][j] = new ReentrantLock(true /* fair */);
}
}
maxBytes = cfg.getBlockLimit();
@@ -636,7 +640,8 @@ private ReentrantLock lockFor(DfsStreamKey key, long position) {
}
private ReentrantLock lockForRef(DfsStreamKey key) {
return refLocks[(key.hash >>> 1) % refLocks.length];
int slot = (key.hash >>> 1) % refLocks[key.packExtPos].length;
return refLocks[key.packExtPos][slot];
}
private static AtomicLong[] newCounters() {