Simplify ReftableCompactor
The ReftableCompactor supported a byteLimit, but this is currently unused. The FileReftableStack has a more sophisticated strategy that amortizes compaction costs. Rename min/maxUpdateIndex to reflogExpire{Min,Max}UpdateIndex to reflect their purpose more accurately. Since reflogs are generally pruned chronologically (oldest entries are expired first), one can only prune entries on full compaction, so they should not be set by default. Rephrase the function Reader#minUpdateIndex and maxUpdateIndex. These vars are documented to affect log entries, but semantically, they are about ref entries. Since ref entries have their timestamps delta-compressed, it is important for the min/maxUpdateIndex values to be coherent between different tables. The logical timestamps for log entries do not have to be coherent in different tables, as the timestamps of a log entry is part of the key. For example, a table written at update index 20 may contain a tombstone log entry at timestamp 1. Therefore, we set ReftableWriter's min/maxUpdateIndex from the merged tables we are compacting, rather than from the compaction settings (which should only control reflog expiry.) The previous behavior could drop log entries erroneously, especially in the presence of tombstone log entries. Unfortunately, testing this properly requires both an API for adding log tombstones, and a more refined API for controlling automatic compaction. Hence, no test. Change-Id: I2f4eb7866f607fddd0629809e8e61f0b9097717f Signed-off-by: Han-Wen Nienhuys <hanwen@google.com>
This commit is contained in:
parent
5fd922dfd4
commit
753fbaf11a
|
@ -52,7 +52,9 @@
|
|||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
import org.eclipse.jgit.internal.storage.io.BlockSource;
|
||||
import org.eclipse.jgit.internal.storage.reftable.ReftableWriter.Stats;
|
||||
|
@ -96,7 +98,9 @@ public void oneTable() throws IOException {
|
|||
ReftableCompactor compactor;
|
||||
try (ByteArrayOutputStream outBuf = new ByteArrayOutputStream()) {
|
||||
compactor = new ReftableCompactor(outBuf);
|
||||
compactor.tryAddFirst(read(inTab));
|
||||
List<ReftableReader> readers = new ArrayList<>();
|
||||
readers.add(read(inTab));
|
||||
compactor.addAll(readers);
|
||||
compactor.compact();
|
||||
outTab = outBuf.toByteArray();
|
||||
}
|
||||
|
|
|
@ -477,10 +477,6 @@ private File compactLocked(int first, int last) throws IOException {
|
|||
try (FileOutputStream fos = new FileOutputStream(tmpTable)) {
|
||||
ReftableCompactor c = new ReftableCompactor(fos)
|
||||
.setConfig(reftableConfig())
|
||||
.setMinUpdateIndex(
|
||||
stack.get(first).reftableReader.minUpdateIndex())
|
||||
.setMaxUpdateIndex(
|
||||
stack.get(last).reftableReader.maxUpdateIndex())
|
||||
.setIncludeDeletes(first > 0);
|
||||
|
||||
List<ReftableReader> compactMe = new ArrayList<>();
|
||||
|
|
|
@ -98,6 +98,15 @@ public long maxUpdateIndex() throws IOException {
|
|||
: 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public long minUpdateIndex() throws IOException {
|
||||
return tables.length > 0 ? tables[0].minUpdateIndex()
|
||||
: 0;
|
||||
}
|
||||
|
||||
/** {@inheritDoc} */
|
||||
@Override
|
||||
public boolean hasObjectMap() throws IOException {
|
||||
|
|
|
@ -98,19 +98,28 @@ public void setIncludeDeletes(boolean deletes) {
|
|||
includeDeletes = deletes;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Get the maximum update index for log entries that appear in this
|
||||
* Get the maximum update index for ref entries that appear in this
|
||||
* reftable.
|
||||
*
|
||||
* @return the maximum update index for log entries that appear in this
|
||||
* reftable. This should be 1 higher than the prior reftable's
|
||||
* {@code maxUpdateIndex} if this table is used in a stack.
|
||||
* @return the maximum update index for ref entries that appear in this
|
||||
* reftable.
|
||||
* @throws java.io.IOException
|
||||
* file cannot be read.
|
||||
*/
|
||||
public abstract long maxUpdateIndex() throws IOException;
|
||||
|
||||
/**
|
||||
* Get the minimum update index for ref entries that appear in this
|
||||
* reftable.
|
||||
*
|
||||
* @return the minimum update index for ref entries that appear in this
|
||||
* reftable.
|
||||
* @throws java.io.IOException
|
||||
* file cannot be read.
|
||||
*/
|
||||
public abstract long minUpdateIndex() throws IOException;
|
||||
|
||||
/**
|
||||
* Seek to the first reference, to iterate in order.
|
||||
*
|
||||
|
|
|
@ -61,22 +61,20 @@
|
|||
* to shadow any lower reftable that may have the reference present.
|
||||
* <p>
|
||||
* By default all log entries within the range defined by
|
||||
* {@link #setMinUpdateIndex(long)} and {@link #setMaxUpdateIndex(long)} are
|
||||
* {@link #setReflogExpireMinUpdateIndex(long)} and {@link #setReflogExpireMaxUpdateIndex(long)} are
|
||||
* copied, even if no references in the output file match the log records.
|
||||
* Callers may truncate the log to a more recent time horizon with
|
||||
* {@link #setOldestReflogTimeMillis(long)}, or disable the log altogether with
|
||||
* {@link #setReflogExpireOldestReflogTimeMillis(long)}, or disable the log altogether with
|
||||
* {@code setOldestReflogTimeMillis(Long.MAX_VALUE)}.
|
||||
*/
|
||||
public class ReftableCompactor {
|
||||
private final ReftableWriter writer;
|
||||
private final ArrayDeque<ReftableReader> tables = new ArrayDeque<>();
|
||||
|
||||
private long compactBytesLimit;
|
||||
private long bytesToCompact;
|
||||
private boolean includeDeletes;
|
||||
private long minUpdateIndex = -1;
|
||||
private long maxUpdateIndex;
|
||||
private long oldestReflogTimeMillis;
|
||||
private long reflogExpireMinUpdateIndex = 0;
|
||||
private long reflogExpireMaxUpdateIndex = Long.MAX_VALUE;
|
||||
private long reflogExpireOldestReflogTimeMillis;
|
||||
private Stats stats;
|
||||
|
||||
/**
|
||||
|
@ -102,18 +100,6 @@ public ReftableCompactor setConfig(ReftableConfig cfg) {
|
|||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set limit on number of bytes from source tables to compact.
|
||||
*
|
||||
* @param bytes
|
||||
* limit on number of bytes from source tables to compact.
|
||||
* @return {@code this}
|
||||
*/
|
||||
public ReftableCompactor setCompactBytesLimit(long bytes) {
|
||||
compactBytesLimit = bytes;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Whether to include deletions in the output, which may be necessary for
|
||||
* partial compaction.
|
||||
|
@ -139,8 +125,8 @@ public ReftableCompactor setIncludeDeletes(boolean deletes) {
|
|||
* in a stack.
|
||||
* @return {@code this}
|
||||
*/
|
||||
public ReftableCompactor setMinUpdateIndex(long min) {
|
||||
minUpdateIndex = min;
|
||||
public ReftableCompactor setReflogExpireMinUpdateIndex(long min) {
|
||||
reflogExpireMinUpdateIndex = min;
|
||||
return this;
|
||||
}
|
||||
|
||||
|
@ -155,8 +141,8 @@ public ReftableCompactor setMinUpdateIndex(long min) {
|
|||
* used in a stack.
|
||||
* @return {@code this}
|
||||
*/
|
||||
public ReftableCompactor setMaxUpdateIndex(long max) {
|
||||
maxUpdateIndex = max;
|
||||
public ReftableCompactor setReflogExpireMaxUpdateIndex(long max) {
|
||||
reflogExpireMaxUpdateIndex = max;
|
||||
return this;
|
||||
}
|
||||
|
||||
|
@ -170,16 +156,13 @@ public ReftableCompactor setMaxUpdateIndex(long max) {
|
|||
* Specified in Java standard milliseconds since the epoch.
|
||||
* @return {@code this}
|
||||
*/
|
||||
public ReftableCompactor setOldestReflogTimeMillis(long timeMillis) {
|
||||
oldestReflogTimeMillis = timeMillis;
|
||||
public ReftableCompactor setReflogExpireOldestReflogTimeMillis(long timeMillis) {
|
||||
reflogExpireOldestReflogTimeMillis = timeMillis;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add all of the tables, in the specified order.
|
||||
* <p>
|
||||
* Unconditionally adds all tables, ignoring the
|
||||
* {@link #setCompactBytesLimit(long)}.
|
||||
*
|
||||
* @param readers
|
||||
* tables to compact. Tables should be ordered oldest first/most
|
||||
|
@ -191,46 +174,9 @@ public ReftableCompactor setOldestReflogTimeMillis(long timeMillis) {
|
|||
public void addAll(List<ReftableReader> readers) throws IOException {
|
||||
for (ReftableReader r : readers) {
|
||||
tables.add(r);
|
||||
adjustUpdateIndexes(r);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Try to add this reader at the bottom of the stack.
|
||||
* <p>
|
||||
* A reader may be rejected by returning {@code false} if the compactor is
|
||||
* already rewriting its {@link #setCompactBytesLimit(long)}. When this
|
||||
* happens the caller should stop trying to add tables, and execute the
|
||||
* compaction.
|
||||
*
|
||||
* @param reader
|
||||
* the reader to insert at the bottom of the stack. Caller is
|
||||
* responsible for closing the reader.
|
||||
* @return {@code true} if the compactor accepted this table; {@code false}
|
||||
* if the compactor has reached its limit.
|
||||
* @throws java.io.IOException
|
||||
* if size of {@code reader}, or its update indexes cannot be read.
|
||||
*/
|
||||
public boolean tryAddFirst(ReftableReader reader) throws IOException {
|
||||
long sz = reader.size();
|
||||
if (compactBytesLimit > 0 && bytesToCompact + sz > compactBytesLimit) {
|
||||
return false;
|
||||
}
|
||||
bytesToCompact += sz;
|
||||
adjustUpdateIndexes(reader);
|
||||
tables.addFirst(reader);
|
||||
return true;
|
||||
}
|
||||
|
||||
private void adjustUpdateIndexes(ReftableReader reader) throws IOException {
|
||||
if (minUpdateIndex == -1) {
|
||||
minUpdateIndex = reader.minUpdateIndex();
|
||||
} else {
|
||||
minUpdateIndex = Math.min(minUpdateIndex, reader.minUpdateIndex());
|
||||
}
|
||||
maxUpdateIndex = Math.max(maxUpdateIndex, reader.maxUpdateIndex());
|
||||
}
|
||||
|
||||
/**
|
||||
* Write a compaction to {@code out}.
|
||||
*
|
||||
|
@ -241,8 +187,9 @@ public void compact() throws IOException {
|
|||
MergedReftable mr = new MergedReftable(new ArrayList<>(tables));
|
||||
mr.setIncludeDeletes(includeDeletes);
|
||||
|
||||
writer.setMinUpdateIndex(Math.max(minUpdateIndex, 0));
|
||||
writer.setMaxUpdateIndex(maxUpdateIndex);
|
||||
writer.setMaxUpdateIndex(mr.maxUpdateIndex());
|
||||
writer.setMinUpdateIndex(mr.minUpdateIndex());
|
||||
|
||||
writer.begin();
|
||||
mergeRefs(mr);
|
||||
mergeLogs(mr);
|
||||
|
@ -268,16 +215,14 @@ private void mergeRefs(MergedReftable mr) throws IOException {
|
|||
}
|
||||
|
||||
private void mergeLogs(MergedReftable mr) throws IOException {
|
||||
if (oldestReflogTimeMillis == Long.MAX_VALUE) {
|
||||
if (reflogExpireOldestReflogTimeMillis == Long.MAX_VALUE) {
|
||||
return;
|
||||
}
|
||||
|
||||
try (LogCursor lc = mr.allLogs()) {
|
||||
while (lc.next()) {
|
||||
long updateIndex = lc.getUpdateIndex();
|
||||
if (updateIndex < minUpdateIndex
|
||||
|| updateIndex > maxUpdateIndex) {
|
||||
// Cannot merge log records outside the header's range.
|
||||
if (updateIndex > reflogExpireMaxUpdateIndex || updateIndex < reflogExpireMinUpdateIndex) {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -291,7 +236,7 @@ private void mergeLogs(MergedReftable mr) throws IOException {
|
|||
}
|
||||
|
||||
PersonIdent who = log.getWho();
|
||||
if (who.getWhen().getTime() >= oldestReflogTimeMillis) {
|
||||
if (who.getWhen().getTime() >= reflogExpireOldestReflogTimeMillis) {
|
||||
writer.writeLog(
|
||||
refName,
|
||||
updateIndex,
|
||||
|
|
|
@ -139,15 +139,9 @@ public boolean hasObjectMap() throws IOException {
|
|||
}
|
||||
|
||||
/**
|
||||
* Get the minimum update index for log entries that appear in this
|
||||
* reftable.
|
||||
*
|
||||
* @return the minimum update index for log entries that appear in this
|
||||
* reftable. This should be 1 higher than the prior reftable's
|
||||
* {@code maxUpdateIndex} if this table is used in a stack.
|
||||
* @throws java.io.IOException
|
||||
* file cannot be read.
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public long minUpdateIndex() throws IOException {
|
||||
if (blockSize == -1) {
|
||||
readFileHeader();
|
||||
|
|
Loading…
Reference in New Issue