DFS block cache: report index load and evict stats

Enhance cache performance monitoring for large data such as pack and
bitmap indexes. Provide details about what is loaded and evicted from
cache like total number of cache hits, time in cache before eviction.

Add a custom consumer to report loading events and eviction events when
enabled.

Signed-off-by: Alina Djamankulova <adjama@google.com>
Change-Id: I5739325db7ff7ec370e4defd8f7e46f1c3f5d2dd
This commit is contained in:
Alina Djamankulova 2021-12-29 15:07:27 -08:00
parent f77519775d
commit b536dbdb9b
3 changed files with 279 additions and 4 deletions

View File

@ -15,16 +15,19 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.LongStream;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.eclipse.jgit.internal.storage.dfs.DfsBlockCacheConfig.IndexEventConsumer;
import org.eclipse.jgit.internal.storage.pack.PackExt;
import org.eclipse.jgit.junit.TestRepository;
import org.eclipse.jgit.junit.TestRng;
@ -152,6 +155,120 @@ public void hasCacheHotMap() throws Exception {
assertEquals(0, cache.getEvictions()[PackExt.INDEX.getPosition()]);
}
@SuppressWarnings("resource")
@Test
public void hasIndexEventConsumerOnlyLoaded() throws Exception {
AtomicInteger loaded = new AtomicInteger();
IndexEventConsumer indexEventConsumer = new IndexEventConsumer() {
@Override
public void acceptRequestedEvent(int packExtPos, boolean cacheHit,
long loadMicros, long bytes,
Duration lastEvictionDuration) {
assertEquals(PackExt.INDEX.getPosition(), packExtPos);
assertTrue(cacheHit);
assertTrue(lastEvictionDuration.isZero());
loaded.incrementAndGet();
}
};
DfsBlockCache.reconfigure(new DfsBlockCacheConfig().setBlockSize(512)
.setBlockLimit(512 * 4)
.setIndexEventConsumer(indexEventConsumer));
cache = DfsBlockCache.getInstance();
DfsRepositoryDescription repo = new DfsRepositoryDescription("test");
InMemoryRepository r1 = new InMemoryRepository(repo);
byte[] content = rng.nextBytes(424242);
ObjectId id;
try (ObjectInserter ins = r1.newObjectInserter()) {
id = ins.insert(OBJ_BLOB, content);
ins.flush();
}
try (ObjectReader rdr = r1.newObjectReader()) {
byte[] actual = rdr.open(id, OBJ_BLOB).getBytes();
assertTrue(Arrays.equals(content, actual));
}
// All cache entries are hot and cache is at capacity.
assertTrue(LongStream.of(cache.getHitCount()).sum() > 0);
assertEquals(99, cache.getFillPercentage());
InMemoryRepository r2 = new InMemoryRepository(repo);
content = rng.nextBytes(424242);
try (ObjectInserter ins = r2.newObjectInserter()) {
ins.insert(OBJ_BLOB, content);
ins.flush();
}
assertTrue(cache.getEvictions()[PackExt.PACK.getPosition()] > 0);
assertEquals(1, cache.getEvictions()[PackExt.INDEX.getPosition()]);
assertEquals(1, loaded.get());
}
@SuppressWarnings("resource")
@Test
public void hasIndexEventConsumerLoadedAndEvicted() throws Exception {
AtomicInteger loaded = new AtomicInteger();
AtomicInteger evicted = new AtomicInteger();
IndexEventConsumer indexEventConsumer = new IndexEventConsumer() {
@Override
public void acceptRequestedEvent(int packExtPos, boolean cacheHit,
long loadMicros, long bytes,
Duration lastEvictionDuration) {
assertEquals(PackExt.INDEX.getPosition(), packExtPos);
assertTrue(cacheHit);
assertTrue(lastEvictionDuration.isZero());
loaded.incrementAndGet();
}
@Override
public void acceptEvictedEvent(int packExtPos, long bytes,
int totalCacheHitCount, Duration lastEvictionDuration) {
assertEquals(PackExt.INDEX.getPosition(), packExtPos);
assertTrue(totalCacheHitCount > 0);
assertTrue(lastEvictionDuration.isZero());
evicted.incrementAndGet();
}
@Override
public boolean shouldReportEvictedEvent() {
return true;
}
};
DfsBlockCache.reconfigure(new DfsBlockCacheConfig().setBlockSize(512)
.setBlockLimit(512 * 4)
.setIndexEventConsumer(indexEventConsumer));
cache = DfsBlockCache.getInstance();
DfsRepositoryDescription repo = new DfsRepositoryDescription("test");
InMemoryRepository r1 = new InMemoryRepository(repo);
byte[] content = rng.nextBytes(424242);
ObjectId id;
try (ObjectInserter ins = r1.newObjectInserter()) {
id = ins.insert(OBJ_BLOB, content);
ins.flush();
}
try (ObjectReader rdr = r1.newObjectReader()) {
byte[] actual = rdr.open(id, OBJ_BLOB).getBytes();
assertTrue(Arrays.equals(content, actual));
}
// All cache entries are hot and cache is at capacity.
assertTrue(LongStream.of(cache.getHitCount()).sum() > 0);
assertEquals(99, cache.getFillPercentage());
InMemoryRepository r2 = new InMemoryRepository(repo);
content = rng.nextBytes(424242);
try (ObjectInserter ins = r2.newObjectInserter()) {
ins.insert(OBJ_BLOB, content);
ins.flush();
}
assertTrue(cache.getEvictions()[PackExt.PACK.getPosition()] > 0);
assertEquals(1, cache.getEvictions()[PackExt.INDEX.getPosition()]);
assertEquals(1, loaded.get());
assertEquals(1, evicted.get());
}
@SuppressWarnings("resource")
@Test
public void noConcurrencySerializedReads_oneRepo() throws Exception {

View File

@ -12,6 +12,10 @@
package org.eclipse.jgit.internal.storage.dfs;
import java.io.IOException;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceArray;
@ -166,6 +170,12 @@ public static DfsBlockCache getInstance() {
/** Limits of cache hot count per pack file extension. */
private final int[] cacheHotLimits = new int[PackExt.values().length];
/** Consumer of loading and eviction events of indexes. */
private final DfsBlockCacheConfig.IndexEventConsumer indexEventConsumer;
/** Stores timestamps of the last eviction of indexes. */
private final Map<EvictKey, Long> indexEvictionMap = new ConcurrentHashMap<>();
@SuppressWarnings("unchecked")
private DfsBlockCache(DfsBlockCacheConfig cfg) {
tableSize = tableSize(cfg);
@ -213,6 +223,7 @@ private DfsBlockCache(DfsBlockCacheConfig cfg) {
cacheHotLimits[i] = DfsBlockCacheConfig.DEFAULT_CACHE_HOT_MAX;
}
}
indexEventConsumer = cfg.getIndexEventConsumer();
}
boolean shouldCopyThroughCache(long length) {
@ -461,6 +472,7 @@ private void reserveSpace(long reserve, DfsStreamKey key) {
live -= dead.size;
getStat(liveBytes, dead.key).addAndGet(-dead.size);
getStat(statEvict, dead.key).incrementAndGet();
reportIndexEvicted(dead);
} while (maxBytes < live);
clockHand = prev;
}
@ -515,11 +527,13 @@ void put(DfsBlock v) {
<T> Ref<T> getOrLoadRef(
DfsStreamKey key, long position, RefLoader<T> loader)
throws IOException {
long start = System.nanoTime();
int slot = slot(key, position);
HashEntry e1 = table.get(slot);
Ref<T> ref = scanRef(e1, key, position);
if (ref != null) {
getStat(statHit, key).incrementAndGet();
reportIndexRequested(ref, true /* cacheHit */, start);
return ref;
}
@ -532,6 +546,8 @@ <T> Ref<T> getOrLoadRef(
ref = scanRef(e2, key, position);
if (ref != null) {
getStat(statHit, key).incrementAndGet();
reportIndexRequested(ref, true /* cacheHit */,
start);
return ref;
}
}
@ -556,6 +572,7 @@ <T> Ref<T> getOrLoadRef(
} finally {
regionLock.unlock();
}
reportIndexRequested(ref, false /* cacheHit */, start);
return ref;
}
@ -682,8 +699,9 @@ private static long[] getStatVals(AtomicReference<AtomicLong[]> stat) {
}
private static HashEntry clean(HashEntry top) {
while (top != null && top.ref.next == null)
while (top != null && top.ref.next == null) {
top = top.next;
}
if (top == null) {
return null;
}
@ -691,6 +709,44 @@ private static HashEntry clean(HashEntry top) {
return n == top.next ? top : new HashEntry(n, top.ref);
}
private void reportIndexRequested(Ref<?> ref, boolean cacheHit,
long start) {
if (indexEventConsumer == null
|| !isIndexOrBitmapExtPos(ref.key.packExtPos)) {
return;
}
EvictKey evictKey = new EvictKey(ref);
Long prevEvictedTime = indexEvictionMap.get(evictKey);
long now = System.nanoTime();
long sinceLastEvictionNanos = prevEvictedTime == null ? 0L
: now - prevEvictedTime.longValue();
indexEventConsumer.acceptRequestedEvent(ref.key.packExtPos, cacheHit,
(now - start) / 1000L /* micros */, ref.size,
Duration.ofNanos(sinceLastEvictionNanos));
}
private void reportIndexEvicted(Ref<?> dead) {
if (indexEventConsumer == null
|| !indexEventConsumer.shouldReportEvictedEvent()
|| !isIndexOrBitmapExtPos(dead.key.packExtPos)) {
return;
}
EvictKey evictKey = new EvictKey(dead);
Long prevEvictedTime = indexEvictionMap.get(evictKey);
long now = System.nanoTime();
long sinceLastEvictionNanos = prevEvictedTime == null ? 0L
: now - prevEvictedTime.longValue();
indexEvictionMap.put(evictKey, Long.valueOf(now));
indexEventConsumer.acceptEvictedEvent(dead.key.packExtPos, dead.size,
dead.totalHitCount.get(),
Duration.ofNanos(sinceLastEvictionNanos));
}
private static boolean isIndexOrBitmapExtPos(int packExtPos) {
return packExtPos == PackExt.INDEX.getPosition()
|| packExtPos == PackExt.BITMAP_INDEX.getPosition();
}
private static final class HashEntry {
/** Next entry in the hash table's chain list. */
final HashEntry next;
@ -712,6 +768,7 @@ static final class Ref<T> {
Ref next;
private volatile int hotCount;
private AtomicInteger totalHitCount = new AtomicInteger();
Ref(DfsStreamKey key, long position, long size, T v) {
this.key = key;
@ -736,6 +793,7 @@ void markHotter() {
int cap = DfsBlockCache
.getInstance().cacheHotLimits[key.packExtPos];
hotCount = Math.min(cap, hotCount + 1);
totalHitCount.incrementAndGet();
}
void markColder() {
@ -747,6 +805,34 @@ boolean isHot() {
}
}
private static final class EvictKey {
private final int keyHash;
private final int packExtPos;
private final long position;
EvictKey(Ref<?> ref) {
keyHash = ref.key.hash;
packExtPos = ref.key.packExtPos;
position = ref.position;
}
@Override
public boolean equals(Object object) {
if (object instanceof EvictKey) {
EvictKey other = (EvictKey) object;
return keyHash == other.keyHash
&& packExtPos == other.packExtPos
&& position == other.position;
}
return false;
}
@Override
public int hashCode() {
return DfsBlockCache.getInstance().hash(keyHash, position);
}
}
@FunctionalInterface
interface RefLoader<T> {
Ref<T> load() throws IOException;
@ -763,4 +849,4 @@ interface ReadableChannelSupplier {
*/
ReadableChannel get() throws IOException;
}
}
}

View File

@ -18,6 +18,7 @@
import static org.eclipse.jgit.lib.ConfigConstants.CONFIG_KEY_STREAM_RATIO;
import java.text.MessageFormat;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.function.Consumer;
@ -46,9 +47,10 @@ public class DfsBlockCacheConfig {
private int concurrencyLevel;
private Consumer<Long> refLock;
private Map<PackExt, Integer> cacheHotMap;
private IndexEventConsumer indexEventConsumer;
/**
* Create a default configuration.
*/
@ -215,6 +217,28 @@ public DfsBlockCacheConfig setCacheHotMap(
return this;
}
/**
* Get the consumer of cache index events.
*
* @return consumer of cache index events.
*/
public IndexEventConsumer getIndexEventConsumer() {
return indexEventConsumer;
}
/**
* Set the consumer of cache index events.
*
* @param indexEventConsumer
* consumer of cache index events.
* @return {@code this}
*/
public DfsBlockCacheConfig setIndexEventConsumer(
IndexEventConsumer indexEventConsumer) {
this.indexEventConsumer = indexEventConsumer;
return this;
}
/**
* Update properties by setting fields from the configuration.
* <p>
@ -272,4 +296,52 @@ public DfsBlockCacheConfig fromConfig(Config rc) {
}
return this;
}
}
/** Consumer of DfsBlockCache loading and eviction events for indexes. */
public interface IndexEventConsumer {
/**
* Accept an event of an index requested. It could be loaded from either
* cache or storage.
*
* @param packExtPos
* position in {@code PackExt} enum
* @param cacheHit
* true if an index was already in cache. Otherwise, the
* index was loaded from storage into the cache in the
* current request,
* @param loadMicros
* time to load an index from cache or storage in
* microseconds
* @param bytes
* number of bytes loaded
* @param lastEvictionDuration
* time since last eviction, 0 if was not evicted yet
*/
void acceptRequestedEvent(int packExtPos, boolean cacheHit,
long loadMicros, long bytes, Duration lastEvictionDuration);
/**
* Accept an event of an index evicted from cache.
*
* @param packExtPos
* position in {@code PackExt} enum
* @param bytes
* number of bytes evicted
* @param totalCacheHitCount
* number of times an index was accessed while in cache
* @param lastEvictionDuration
* time since last eviction, 0 if was not evicted yet
*/
default void acceptEvictedEvent(int packExtPos, long bytes,
int totalCacheHitCount, Duration lastEvictionDuration) {
// Off by default.
}
/**
* @return true if reporting evicted events is enabled.
*/
default boolean shouldReportEvictedEvent() {
return false;
}
}
}