Merge changes I1217f5f2,Iba037e0e,I61e6c93c,I6304d1cb
* changes: dfs: Take size as long instead of int dfs: Read at the aligned position dfs: Add a position argument dfs: Move the deeply nested code to its own method
This commit is contained in:
commit
2f751c34e1
|
@ -412,8 +412,7 @@ DfsBlock getOrLoad(BlockBasedFile file, long position, DfsReader ctx,
|
|||
getStat(statMiss, key).incrementAndGet();
|
||||
boolean credit = true;
|
||||
try {
|
||||
v = file.readOneBlock(requestedPosition, ctx,
|
||||
fileChannel.get());
|
||||
v = file.readOneBlock(position, ctx, fileChannel.get());
|
||||
credit = false;
|
||||
} finally {
|
||||
if (credit) {
|
||||
|
@ -450,7 +449,7 @@ DfsBlock getOrLoad(BlockBasedFile file, long position, DfsReader ctx,
|
|||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private void reserveSpace(int reserve, DfsStreamKey key) {
|
||||
private void reserveSpace(long reserve, DfsStreamKey key) {
|
||||
clockLock.lock();
|
||||
try {
|
||||
long live = LongStream.of(getCurrentSize()).sum() + reserve;
|
||||
|
@ -487,7 +486,7 @@ private void reserveSpace(int reserve, DfsStreamKey key) {
|
|||
}
|
||||
}
|
||||
|
||||
private void creditSpace(int credit, DfsStreamKey key) {
|
||||
private void creditSpace(long credit, DfsStreamKey key) {
|
||||
clockLock.lock();
|
||||
try {
|
||||
getStat(liveBytes, key).addAndGet(-credit);
|
||||
|
@ -497,7 +496,7 @@ private void creditSpace(int credit, DfsStreamKey key) {
|
|||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private void addToClock(Ref ref, int credit) {
|
||||
private void addToClock(Ref ref, long credit) {
|
||||
clockLock.lock();
|
||||
try {
|
||||
if (credit != 0) {
|
||||
|
@ -521,17 +520,20 @@ void put(DfsBlock v) {
|
|||
*
|
||||
* @param key
|
||||
* the stream key of the pack.
|
||||
* @param position
|
||||
* the position in the key. The default should be 0.
|
||||
* @param loader
|
||||
* the function to load the reference.
|
||||
* @return the object reference.
|
||||
* @throws IOException
|
||||
* the reference was not in the cache and could not be loaded.
|
||||
*/
|
||||
<T> Ref<T> getOrLoadRef(DfsStreamKey key, RefLoader<T> loader)
|
||||
<T> Ref<T> getOrLoadRef(
|
||||
DfsStreamKey key, long position, RefLoader<T> loader)
|
||||
throws IOException {
|
||||
int slot = slot(key, 0);
|
||||
int slot = slot(key, position);
|
||||
HashEntry e1 = table.get(slot);
|
||||
Ref<T> ref = scanRef(e1, key, 0);
|
||||
Ref<T> ref = scanRef(e1, key, position);
|
||||
if (ref != null) {
|
||||
getStat(statHit, key).incrementAndGet();
|
||||
return ref;
|
||||
|
@ -543,7 +545,7 @@ <T> Ref<T> getOrLoadRef(DfsStreamKey key, RefLoader<T> loader)
|
|||
try {
|
||||
HashEntry e2 = table.get(slot);
|
||||
if (e2 != e1) {
|
||||
ref = scanRef(e2, key, 0);
|
||||
ref = scanRef(e2, key, position);
|
||||
if (ref != null) {
|
||||
getStat(statHit, key).incrementAndGet();
|
||||
return ref;
|
||||
|
@ -574,10 +576,10 @@ <T> Ref<T> getOrLoadRef(DfsStreamKey key, RefLoader<T> loader)
|
|||
}
|
||||
|
||||
<T> Ref<T> putRef(DfsStreamKey key, long size, T v) {
|
||||
return put(key, 0, (int) Math.min(size, Integer.MAX_VALUE), v);
|
||||
return put(key, 0, size, v);
|
||||
}
|
||||
|
||||
<T> Ref<T> put(DfsStreamKey key, long pos, int size, T v) {
|
||||
<T> Ref<T> put(DfsStreamKey key, long pos, long size, T v) {
|
||||
int slot = slot(key, pos);
|
||||
HashEntry e1 = table.get(slot);
|
||||
Ref<T> ref = scanRef(e1, key, pos);
|
||||
|
@ -720,12 +722,12 @@ private static final class HashEntry {
|
|||
static final class Ref<T> {
|
||||
final DfsStreamKey key;
|
||||
final long position;
|
||||
final int size;
|
||||
final long size;
|
||||
volatile T value;
|
||||
Ref next;
|
||||
volatile boolean hot;
|
||||
|
||||
Ref(DfsStreamKey key, long position, int size, T v) {
|
||||
Ref(DfsStreamKey key, long position, long size, T v) {
|
||||
this.key = key;
|
||||
this.position = position;
|
||||
this.size = size;
|
||||
|
|
|
@ -89,6 +89,7 @@
|
|||
*/
|
||||
public final class DfsPackFile extends BlockBasedFile {
|
||||
private static final int REC_SIZE = Constants.OBJECT_ID_LENGTH + 8;
|
||||
private static final long REF_POSITION = 0;
|
||||
|
||||
/**
|
||||
* Lock for initialization of {@link #index} and {@link #corruptObjects}.
|
||||
|
@ -194,45 +195,10 @@ private PackIndex idx(DfsReader ctx) throws IOException {
|
|||
|
||||
try {
|
||||
DfsStreamKey idxKey = desc.getStreamKey(INDEX);
|
||||
DfsBlockCache.Ref<PackIndex> idxref = cache.getOrLoadRef(idxKey,
|
||||
() -> {
|
||||
try {
|
||||
ctx.stats.readIdx++;
|
||||
long start = System.nanoTime();
|
||||
try (ReadableChannel rc = ctx.db.openFile(desc,
|
||||
INDEX)) {
|
||||
InputStream in = Channels
|
||||
.newInputStream(rc);
|
||||
int wantSize = 8192;
|
||||
int bs = rc.blockSize();
|
||||
if (0 < bs && bs < wantSize) {
|
||||
bs = (wantSize / bs) * bs;
|
||||
} else if (bs <= 0) {
|
||||
bs = wantSize;
|
||||
}
|
||||
PackIndex idx = PackIndex.read(
|
||||
new BufferedInputStream(in, bs));
|
||||
int sz = (int) Math.min(
|
||||
idx.getObjectCount() * REC_SIZE,
|
||||
Integer.MAX_VALUE);
|
||||
ctx.stats.readIdxBytes += rc.position();
|
||||
index = idx;
|
||||
return new DfsBlockCache.Ref<>(idxKey, 0,
|
||||
sz, idx);
|
||||
} finally {
|
||||
ctx.stats.readIdxMicros += elapsedMicros(
|
||||
start);
|
||||
}
|
||||
} catch (EOFException e) {
|
||||
throw new IOException(MessageFormat.format(
|
||||
DfsText.get().shortReadOfIndex,
|
||||
desc.getFileName(INDEX)), e);
|
||||
} catch (IOException e) {
|
||||
throw new IOException(MessageFormat.format(
|
||||
DfsText.get().cannotReadIndex,
|
||||
desc.getFileName(INDEX)), e);
|
||||
}
|
||||
});
|
||||
DfsBlockCache.Ref<PackIndex> idxref = cache.getOrLoadRef(
|
||||
idxKey,
|
||||
REF_POSITION,
|
||||
() -> loadPackIndex(ctx, idxKey));
|
||||
PackIndex idx = idxref.get();
|
||||
if (index == null && idx != null) {
|
||||
index = idx;
|
||||
|
@ -267,44 +233,10 @@ PackBitmapIndex getBitmapIndex(DfsReader ctx) throws IOException {
|
|||
PackIndex idx = idx(ctx);
|
||||
PackReverseIndex revidx = getReverseIdx(ctx);
|
||||
DfsStreamKey bitmapKey = desc.getStreamKey(BITMAP_INDEX);
|
||||
DfsBlockCache.Ref<PackBitmapIndex> idxref = cache
|
||||
.getOrLoadRef(bitmapKey, () -> {
|
||||
ctx.stats.readBitmap++;
|
||||
long start = System.nanoTime();
|
||||
try (ReadableChannel rc = ctx.db.openFile(desc,
|
||||
BITMAP_INDEX)) {
|
||||
long size;
|
||||
PackBitmapIndex bmidx;
|
||||
try {
|
||||
InputStream in = Channels.newInputStream(rc);
|
||||
int wantSize = 8192;
|
||||
int bs = rc.blockSize();
|
||||
if (0 < bs && bs < wantSize) {
|
||||
bs = (wantSize / bs) * bs;
|
||||
} else if (bs <= 0) {
|
||||
bs = wantSize;
|
||||
}
|
||||
in = new BufferedInputStream(in, bs);
|
||||
bmidx = PackBitmapIndex.read(in, idx, revidx);
|
||||
} finally {
|
||||
size = rc.position();
|
||||
ctx.stats.readIdxBytes += size;
|
||||
ctx.stats.readIdxMicros += elapsedMicros(start);
|
||||
}
|
||||
int sz = (int) Math.min(size, Integer.MAX_VALUE);
|
||||
bitmapIndex = bmidx;
|
||||
return new DfsBlockCache.Ref<>(bitmapKey, 0, sz,
|
||||
bmidx);
|
||||
} catch (EOFException e) {
|
||||
throw new IOException(MessageFormat.format(
|
||||
DfsText.get().shortReadOfIndex,
|
||||
desc.getFileName(BITMAP_INDEX)), e);
|
||||
} catch (IOException e) {
|
||||
throw new IOException(MessageFormat.format(
|
||||
DfsText.get().cannotReadIndex,
|
||||
desc.getFileName(BITMAP_INDEX)), e);
|
||||
}
|
||||
});
|
||||
DfsBlockCache.Ref<PackBitmapIndex> idxref = cache.getOrLoadRef(
|
||||
bitmapKey,
|
||||
REF_POSITION,
|
||||
() -> loadBitmapIndex(ctx, bitmapKey, idx, revidx));
|
||||
PackBitmapIndex bmidx = idxref.get();
|
||||
if (bitmapIndex == null && bmidx != null) {
|
||||
bitmapIndex = bmidx;
|
||||
|
@ -326,14 +258,10 @@ PackReverseIndex getReverseIdx(DfsReader ctx) throws IOException {
|
|||
PackIndex idx = idx(ctx);
|
||||
DfsStreamKey revKey = new DfsStreamKey.ForReverseIndex(
|
||||
desc.getStreamKey(INDEX));
|
||||
DfsBlockCache.Ref<PackReverseIndex> revref = cache
|
||||
.getOrLoadRef(revKey, () -> {
|
||||
PackReverseIndex revidx = new PackReverseIndex(idx);
|
||||
int sz = (int) Math.min(idx.getObjectCount() * 8,
|
||||
Integer.MAX_VALUE);
|
||||
reverseIndex = revidx;
|
||||
return new DfsBlockCache.Ref<>(revKey, 0, sz, revidx);
|
||||
});
|
||||
DfsBlockCache.Ref<PackReverseIndex> revref = cache.getOrLoadRef(
|
||||
revKey,
|
||||
REF_POSITION,
|
||||
() -> loadReverseIdx(ctx, revKey, idx));
|
||||
PackReverseIndex revidx = revref.get();
|
||||
if (reverseIndex == null && revidx != null) {
|
||||
reverseIndex = revidx;
|
||||
|
@ -1091,4 +1019,91 @@ private void setCorrupt(long offset) {
|
|||
list.add(offset);
|
||||
}
|
||||
}
|
||||
|
||||
private DfsBlockCache.Ref<PackIndex> loadPackIndex(
|
||||
DfsReader ctx, DfsStreamKey idxKey) throws IOException {
|
||||
try {
|
||||
ctx.stats.readIdx++;
|
||||
long start = System.nanoTime();
|
||||
try (ReadableChannel rc = ctx.db.openFile(desc, INDEX)) {
|
||||
InputStream in = Channels.newInputStream(rc);
|
||||
int wantSize = 8192;
|
||||
int bs = rc.blockSize();
|
||||
if (0 < bs && bs < wantSize) {
|
||||
bs = (wantSize / bs) * bs;
|
||||
} else if (bs <= 0) {
|
||||
bs = wantSize;
|
||||
}
|
||||
PackIndex idx = PackIndex.read(new BufferedInputStream(in, bs));
|
||||
ctx.stats.readIdxBytes += rc.position();
|
||||
index = idx;
|
||||
return new DfsBlockCache.Ref<>(
|
||||
idxKey,
|
||||
REF_POSITION,
|
||||
idx.getObjectCount() * REC_SIZE,
|
||||
idx);
|
||||
} finally {
|
||||
ctx.stats.readIdxMicros += elapsedMicros(start);
|
||||
}
|
||||
} catch (EOFException e) {
|
||||
throw new IOException(MessageFormat.format(
|
||||
DfsText.get().shortReadOfIndex,
|
||||
desc.getFileName(INDEX)), e);
|
||||
} catch (IOException e) {
|
||||
throw new IOException(MessageFormat.format(
|
||||
DfsText.get().cannotReadIndex,
|
||||
desc.getFileName(INDEX)), e);
|
||||
}
|
||||
}
|
||||
|
||||
private DfsBlockCache.Ref<PackReverseIndex> loadReverseIdx(
|
||||
DfsReader ctx, DfsStreamKey revKey, PackIndex idx) {
|
||||
PackReverseIndex revidx = new PackReverseIndex(idx);
|
||||
reverseIndex = revidx;
|
||||
return new DfsBlockCache.Ref<>(
|
||||
revKey,
|
||||
REF_POSITION,
|
||||
idx.getObjectCount() * 8,
|
||||
revidx);
|
||||
}
|
||||
|
||||
private DfsBlockCache.Ref<PackBitmapIndex> loadBitmapIndex(
|
||||
DfsReader ctx,
|
||||
DfsStreamKey bitmapKey,
|
||||
PackIndex idx,
|
||||
PackReverseIndex revidx) throws IOException {
|
||||
ctx.stats.readBitmap++;
|
||||
long start = System.nanoTime();
|
||||
try (ReadableChannel rc = ctx.db.openFile(desc, BITMAP_INDEX)) {
|
||||
long size;
|
||||
PackBitmapIndex bmidx;
|
||||
try {
|
||||
InputStream in = Channels.newInputStream(rc);
|
||||
int wantSize = 8192;
|
||||
int bs = rc.blockSize();
|
||||
if (0 < bs && bs < wantSize) {
|
||||
bs = (wantSize / bs) * bs;
|
||||
} else if (bs <= 0) {
|
||||
bs = wantSize;
|
||||
}
|
||||
in = new BufferedInputStream(in, bs);
|
||||
bmidx = PackBitmapIndex.read(in, idx, revidx);
|
||||
} finally {
|
||||
size = rc.position();
|
||||
ctx.stats.readIdxBytes += size;
|
||||
ctx.stats.readIdxMicros += elapsedMicros(start);
|
||||
}
|
||||
bitmapIndex = bmidx;
|
||||
return new DfsBlockCache.Ref<>(
|
||||
bitmapKey, REF_POSITION, size, bmidx);
|
||||
} catch (EOFException e) {
|
||||
throw new IOException(MessageFormat.format(
|
||||
DfsText.get().shortReadOfIndex,
|
||||
desc.getFileName(BITMAP_INDEX)), e);
|
||||
} catch (IOException e) {
|
||||
throw new IOException(MessageFormat.format(
|
||||
DfsText.get().cannotReadIndex,
|
||||
desc.getFileName(BITMAP_INDEX)), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue