Merge "DFS block cache: report index load and evict stats"
commit 076ecf8ded
DfsBlockCacheTest.java
@@ -15,16 +15,19 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.LongStream;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.eclipse.jgit.internal.storage.dfs.DfsBlockCacheConfig.IndexEventConsumer;
import org.eclipse.jgit.internal.storage.pack.PackExt;
import org.eclipse.jgit.junit.TestRepository;
import org.eclipse.jgit.junit.TestRng;
@@ -152,6 +155,120 @@ public void hasCacheHotMap() throws Exception {
		assertEquals(0, cache.getEvictions()[PackExt.INDEX.getPosition()]);
	}

	@SuppressWarnings("resource")
	@Test
	public void hasIndexEventConsumerOnlyLoaded() throws Exception {
		AtomicInteger loaded = new AtomicInteger();
		IndexEventConsumer indexEventConsumer = new IndexEventConsumer() {
			@Override
			public void acceptRequestedEvent(int packExtPos, boolean cacheHit,
					long loadMicros, long bytes,
					Duration lastEvictionDuration) {
				assertEquals(PackExt.INDEX.getPosition(), packExtPos);
				assertTrue(cacheHit);
				assertTrue(lastEvictionDuration.isZero());
				loaded.incrementAndGet();
			}
		};

		DfsBlockCache.reconfigure(new DfsBlockCacheConfig().setBlockSize(512)
				.setBlockLimit(512 * 4)
				.setIndexEventConsumer(indexEventConsumer));
		cache = DfsBlockCache.getInstance();

		DfsRepositoryDescription repo = new DfsRepositoryDescription("test");
		InMemoryRepository r1 = new InMemoryRepository(repo);
		byte[] content = rng.nextBytes(424242);
		ObjectId id;
		try (ObjectInserter ins = r1.newObjectInserter()) {
			id = ins.insert(OBJ_BLOB, content);
			ins.flush();
		}

		try (ObjectReader rdr = r1.newObjectReader()) {
			byte[] actual = rdr.open(id, OBJ_BLOB).getBytes();
			assertTrue(Arrays.equals(content, actual));
		}
		// All cache entries are hot and cache is at capacity.
		assertTrue(LongStream.of(cache.getHitCount()).sum() > 0);
		assertEquals(99, cache.getFillPercentage());

		InMemoryRepository r2 = new InMemoryRepository(repo);
		content = rng.nextBytes(424242);
		try (ObjectInserter ins = r2.newObjectInserter()) {
			ins.insert(OBJ_BLOB, content);
			ins.flush();
		}
		assertTrue(cache.getEvictions()[PackExt.PACK.getPosition()] > 0);
		assertEquals(1, cache.getEvictions()[PackExt.INDEX.getPosition()]);
		assertEquals(1, loaded.get());
	}

	@SuppressWarnings("resource")
	@Test
	public void hasIndexEventConsumerLoadedAndEvicted() throws Exception {
		AtomicInteger loaded = new AtomicInteger();
		AtomicInteger evicted = new AtomicInteger();
		IndexEventConsumer indexEventConsumer = new IndexEventConsumer() {
			@Override
			public void acceptRequestedEvent(int packExtPos, boolean cacheHit,
					long loadMicros, long bytes,
					Duration lastEvictionDuration) {
				assertEquals(PackExt.INDEX.getPosition(), packExtPos);
				assertTrue(cacheHit);
				assertTrue(lastEvictionDuration.isZero());
				loaded.incrementAndGet();
			}

			@Override
			public void acceptEvictedEvent(int packExtPos, long bytes,
					int totalCacheHitCount, Duration lastEvictionDuration) {
				assertEquals(PackExt.INDEX.getPosition(), packExtPos);
				assertTrue(totalCacheHitCount > 0);
				assertTrue(lastEvictionDuration.isZero());
				evicted.incrementAndGet();
			}

			@Override
			public boolean shouldReportEvictedEvent() {
				return true;
			}
		};

		DfsBlockCache.reconfigure(new DfsBlockCacheConfig().setBlockSize(512)
				.setBlockLimit(512 * 4)
				.setIndexEventConsumer(indexEventConsumer));
		cache = DfsBlockCache.getInstance();

		DfsRepositoryDescription repo = new DfsRepositoryDescription("test");
		InMemoryRepository r1 = new InMemoryRepository(repo);
		byte[] content = rng.nextBytes(424242);
		ObjectId id;
		try (ObjectInserter ins = r1.newObjectInserter()) {
			id = ins.insert(OBJ_BLOB, content);
			ins.flush();
		}

		try (ObjectReader rdr = r1.newObjectReader()) {
			byte[] actual = rdr.open(id, OBJ_BLOB).getBytes();
			assertTrue(Arrays.equals(content, actual));
		}
		// All cache entries are hot and cache is at capacity.
		assertTrue(LongStream.of(cache.getHitCount()).sum() > 0);
		assertEquals(99, cache.getFillPercentage());

		InMemoryRepository r2 = new InMemoryRepository(repo);
		content = rng.nextBytes(424242);
		try (ObjectInserter ins = r2.newObjectInserter()) {
			ins.insert(OBJ_BLOB, content);
			ins.flush();
		}
		assertTrue(cache.getEvictions()[PackExt.PACK.getPosition()] > 0);
		assertEquals(1, cache.getEvictions()[PackExt.INDEX.getPosition()]);
		assertEquals(1, loaded.get());
		assertEquals(1, evicted.get());
	}

	@SuppressWarnings("resource")
	@Test
	public void noConcurrencySerializedReads_oneRepo() throws Exception {
DfsBlockCache.java
@@ -12,6 +12,10 @@
package org.eclipse.jgit.internal.storage.dfs;

import java.io.IOException;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceArray;
@@ -166,6 +170,12 @@ public static DfsBlockCache getInstance() {
	/** Limits of cache hot count per pack file extension. */
	private final int[] cacheHotLimits = new int[PackExt.values().length];

	/** Consumer of loading and eviction events of indexes. */
	private final DfsBlockCacheConfig.IndexEventConsumer indexEventConsumer;

	/** Stores timestamps of the last eviction of indexes. */
	private final Map<EvictKey, Long> indexEvictionMap = new ConcurrentHashMap<>();

	@SuppressWarnings("unchecked")
	private DfsBlockCache(DfsBlockCacheConfig cfg) {
		tableSize = tableSize(cfg);
@@ -213,6 +223,7 @@ private DfsBlockCache(DfsBlockCacheConfig cfg) {
				cacheHotLimits[i] = DfsBlockCacheConfig.DEFAULT_CACHE_HOT_MAX;
			}
		}
		indexEventConsumer = cfg.getIndexEventConsumer();
	}

	boolean shouldCopyThroughCache(long length) {
@@ -461,6 +472,7 @@ private void reserveSpace(long reserve, DfsStreamKey key) {
				live -= dead.size;
				getStat(liveBytes, dead.key).addAndGet(-dead.size);
				getStat(statEvict, dead.key).incrementAndGet();
				reportIndexEvicted(dead);
			} while (maxBytes < live);
			clockHand = prev;
		}
@@ -515,11 +527,13 @@ void put(DfsBlock v) {
	<T> Ref<T> getOrLoadRef(
			DfsStreamKey key, long position, RefLoader<T> loader)
			throws IOException {
		long start = System.nanoTime();
		int slot = slot(key, position);
		HashEntry e1 = table.get(slot);
		Ref<T> ref = scanRef(e1, key, position);
		if (ref != null) {
			getStat(statHit, key).incrementAndGet();
			reportIndexRequested(ref, true /* cacheHit */, start);
			return ref;
		}
@@ -532,6 +546,8 @@ <T> Ref<T> getOrLoadRef(
			ref = scanRef(e2, key, position);
			if (ref != null) {
				getStat(statHit, key).incrementAndGet();
				reportIndexRequested(ref, true /* cacheHit */,
						start);
				return ref;
			}
		}
@@ -556,6 +572,7 @@ <T> Ref<T> getOrLoadRef(
		} finally {
			regionLock.unlock();
		}
		reportIndexRequested(ref, false /* cacheHit */, start);
		return ref;
	}
@@ -682,8 +699,9 @@ private static long[] getStatVals(AtomicReference<AtomicLong[]> stat) {
	}

	private static HashEntry clean(HashEntry top) {
		while (top != null && top.ref.next == null) {
			top = top.next;
		}
		if (top == null) {
			return null;
		}
@@ -691,6 +709,44 @@ private static HashEntry clean(HashEntry top) {
		return n == top.next ? top : new HashEntry(n, top.ref);
	}

	private void reportIndexRequested(Ref<?> ref, boolean cacheHit,
			long start) {
		if (indexEventConsumer == null
				|| !isIndexOrBitmapExtPos(ref.key.packExtPos)) {
			return;
		}
		EvictKey evictKey = new EvictKey(ref);
		Long prevEvictedTime = indexEvictionMap.get(evictKey);
		long now = System.nanoTime();
		long sinceLastEvictionNanos = prevEvictedTime == null ? 0L
				: now - prevEvictedTime.longValue();
		indexEventConsumer.acceptRequestedEvent(ref.key.packExtPos, cacheHit,
				(now - start) / 1000L /* micros */, ref.size,
				Duration.ofNanos(sinceLastEvictionNanos));
	}

	private void reportIndexEvicted(Ref<?> dead) {
		if (indexEventConsumer == null
				|| !indexEventConsumer.shouldReportEvictedEvent()
				|| !isIndexOrBitmapExtPos(dead.key.packExtPos)) {
			return;
		}
		EvictKey evictKey = new EvictKey(dead);
		Long prevEvictedTime = indexEvictionMap.get(evictKey);
		long now = System.nanoTime();
		long sinceLastEvictionNanos = prevEvictedTime == null ? 0L
				: now - prevEvictedTime.longValue();
		indexEvictionMap.put(evictKey, Long.valueOf(now));
		indexEventConsumer.acceptEvictedEvent(dead.key.packExtPos, dead.size,
				dead.totalHitCount.get(),
				Duration.ofNanos(sinceLastEvictionNanos));
	}

	private static boolean isIndexOrBitmapExtPos(int packExtPos) {
		return packExtPos == PackExt.INDEX.getPosition()
				|| packExtPos == PackExt.BITMAP_INDEX.getPosition();
	}

	private static final class HashEntry {
		/** Next entry in the hash table's chain list. */
		final HashEntry next;
@@ -712,6 +768,7 @@ static final class Ref<T> {
		Ref next;

		private volatile int hotCount;
		private AtomicInteger totalHitCount = new AtomicInteger();

		Ref(DfsStreamKey key, long position, long size, T v) {
			this.key = key;
@@ -736,6 +793,7 @@ void markHotter() {
			int cap = DfsBlockCache
					.getInstance().cacheHotLimits[key.packExtPos];
			hotCount = Math.min(cap, hotCount + 1);
			totalHitCount.incrementAndGet();
		}

		void markColder() {
@@ -747,6 +805,34 @@ boolean isHot() {
		}
	}

	private static final class EvictKey {
		private final int keyHash;
		private final int packExtPos;
		private final long position;

		EvictKey(Ref<?> ref) {
			keyHash = ref.key.hash;
			packExtPos = ref.key.packExtPos;
			position = ref.position;
		}

		@Override
		public boolean equals(Object object) {
			if (object instanceof EvictKey) {
				EvictKey other = (EvictKey) object;
				return keyHash == other.keyHash
						&& packExtPos == other.packExtPos
						&& position == other.position;
			}
			return false;
		}

		@Override
		public int hashCode() {
			return DfsBlockCache.getInstance().hash(keyHash, position);
		}
	}

	@FunctionalInterface
	interface RefLoader<T> {
		Ref<T> load() throws IOException;
@@ -763,4 +849,4 @@ interface ReadableChannelSupplier {
		 */
		ReadableChannel get() throws IOException;
	}
}
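
The lastEvictionDuration values reported above come from indexEvictionMap: a ConcurrentHashMap keyed by EvictKey (value equality over the stream key hash, pack extension position, and block position) whose value is the nanoTime of the most recent eviction. Load events read the timestamp; eviction events overwrite it. A standalone sketch of that bookkeeping pattern, using hypothetical names (EvictionClock, String keys) rather than the classes in this diff:

	import java.time.Duration;
	import java.util.Map;
	import java.util.concurrent.ConcurrentHashMap;

	class EvictionClock {
		// Timestamp (System.nanoTime) of the last eviction per key.
		private final Map<String, Long> lastEvicted = new ConcurrentHashMap<>();

		// Read on load: how long ago was this key last evicted?
		// Duration.ZERO means it has never been evicted.
		Duration sinceLastEviction(String key) {
			Long prev = lastEvicted.get(key);
			return prev == null ? Duration.ZERO
					: Duration.ofNanos(System.nanoTime() - prev.longValue());
		}

		// Written on eviction: remember when this key was thrown out.
		void recordEviction(String key) {
			lastEvicted.put(key, Long.valueOf(System.nanoTime()));
		}
	}

This is why EvictKey overrides equals and hashCode: the map entry must survive across distinct Ref instances for the same index block, so the key is value-based rather than identity-based.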
DfsBlockCacheConfig.java
@@ -18,6 +18,7 @@
import static org.eclipse.jgit.lib.ConfigConstants.CONFIG_KEY_STREAM_RATIO;

import java.text.MessageFormat;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.function.Consumer;
@@ -46,9 +47,10 @@ public class DfsBlockCacheConfig {
	private int concurrencyLevel;

	private Consumer<Long> refLock;

	private Map<PackExt, Integer> cacheHotMap;

	private IndexEventConsumer indexEventConsumer;

	/**
	 * Create a default configuration.
	 */
@@ -215,6 +217,28 @@ public DfsBlockCacheConfig setCacheHotMap(
		return this;
	}

	/**
	 * Get the consumer of cache index events.
	 *
	 * @return consumer of cache index events.
	 */
	public IndexEventConsumer getIndexEventConsumer() {
		return indexEventConsumer;
	}

	/**
	 * Set the consumer of cache index events.
	 *
	 * @param indexEventConsumer
	 *            consumer of cache index events.
	 * @return {@code this}
	 */
	public DfsBlockCacheConfig setIndexEventConsumer(
			IndexEventConsumer indexEventConsumer) {
		this.indexEventConsumer = indexEventConsumer;
		return this;
	}

	/**
	 * Update properties by setting fields from the configuration.
	 * <p>
@@ -272,4 +296,52 @@ public DfsBlockCacheConfig fromConfig(Config rc) {
		}
		return this;
	}

	/** Consumer of DfsBlockCache loading and eviction events for indexes. */
	public interface IndexEventConsumer {
		/**
		 * Accept an event of an index requested. It could be loaded from either
		 * cache or storage.
		 *
		 * @param packExtPos
		 *            position in {@code PackExt} enum
		 * @param cacheHit
		 *            true if an index was already in cache. Otherwise, the
		 *            index was loaded from storage into the cache in the
		 *            current request.
		 * @param loadMicros
		 *            time to load an index from cache or storage in
		 *            microseconds
		 * @param bytes
		 *            number of bytes loaded
		 * @param lastEvictionDuration
		 *            time since last eviction, 0 if was not evicted yet
		 */
		void acceptRequestedEvent(int packExtPos, boolean cacheHit,
				long loadMicros, long bytes, Duration lastEvictionDuration);

		/**
		 * Accept an event of an index evicted from cache.
		 *
		 * @param packExtPos
		 *            position in {@code PackExt} enum
		 * @param bytes
		 *            number of bytes evicted
		 * @param totalCacheHitCount
		 *            number of times an index was accessed while in cache
		 * @param lastEvictionDuration
		 *            time since last eviction, 0 if was not evicted yet
		 */
		default void acceptEvictedEvent(int packExtPos, long bytes,
				int totalCacheHitCount, Duration lastEvictionDuration) {
			// Off by default.
		}

		/**
		 * @return true if reporting evicted events is enabled.
		 */
		default boolean shouldReportEvictedEvent() {
			return false;
		}
	}
}
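
A minimal usage sketch, not part of the commit: registering an IndexEventConsumer through DfsBlockCacheConfig so that index load and eviction stats reach application logs. The interface, setter, and reconfigure call are those shown in this diff; the IndexStatsLogger class name and the printf bodies are illustrative assumptions.

	import java.time.Duration;

	import org.eclipse.jgit.internal.storage.dfs.DfsBlockCache;
	import org.eclipse.jgit.internal.storage.dfs.DfsBlockCacheConfig;
	import org.eclipse.jgit.internal.storage.dfs.DfsBlockCacheConfig.IndexEventConsumer;

	class IndexStatsLogger {
		static void install() {
			IndexEventConsumer consumer = new IndexEventConsumer() {
				@Override
				public void acceptRequestedEvent(int packExtPos, boolean cacheHit,
						long loadMicros, long bytes,
						Duration lastEvictionDuration) {
					// Fired on every index request; cacheHit says whether it
					// was served from cache or loaded from storage.
					System.err.printf("index ext=%d hit=%b load=%dus bytes=%d%n",
							packExtPos, cacheHit, loadMicros, bytes);
				}

				@Override
				public void acceptEvictedEvent(int packExtPos, long bytes,
						int totalCacheHitCount, Duration lastEvictionDuration) {
					// Only delivered when shouldReportEvictedEvent() is true.
					System.err.printf("index evicted ext=%d bytes=%d hits=%d%n",
							packExtPos, bytes, totalCacheHitCount);
				}

				@Override
				public boolean shouldReportEvictedEvent() {
					return true; // opt in to eviction reporting
				}
			};
			// Replaces the process-wide cache instance, as in the tests above.
			DfsBlockCache.reconfigure(new DfsBlockCacheConfig()
					.setIndexEventConsumer(consumer));
		}
	}

Note that eviction reporting is opt-in by design: acceptEvictedEvent is a no-op default method, and the cache skips the EvictKey map write entirely unless shouldReportEvictedEvent() returns true.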