From 44a75d9ea8a549d5eb3ab155bcca530b4e0ad595 Mon Sep 17 00:00:00 2001 From: Shawn Pearce Date: Sat, 19 Aug 2017 11:28:34 -0700 Subject: [PATCH] reftable: explicitly store update_index per ref Add an update_index to every reference in a reftable, storing the exact transaction that last modified the reference. This is necessary to fix some merge race conditions. Consider updates at T1, T3 are present in two reftables. Compacting these will create a table with range [T1,T3]. If T2 arrives during or after the compaction its impossible for readers to know how to merge the [T1,T3] table with the T2 table. With an explicit update_index per reference, MergedReftable is able to individually sort each reference, merging individual entries at T3 from [T1,T3] ahead of identically named entries appearing in T2. Change-Id: Ie4065d4176a5a0207dcab9696ae05d086e042140 --- Documentation/technical/reftable.md | 5 ++ .../storage/reftable/MergedReftableTest.java | 51 +++++++++++++++++-- .../storage/reftable/ReftableTest.java | 17 ++++--- .../storage/reftable/BlockReader.java | 5 ++ .../storage/reftable/BlockWriter.java | 14 +++-- .../storage/reftable/MergedReftable.java | 37 +++++++------- .../internal/storage/reftable/RefCursor.java | 3 ++ .../storage/reftable/ReftableCompactor.java | 2 +- .../storage/reftable/ReftableReader.java | 14 +++++ .../storage/reftable/ReftableWriter.java | 25 ++++++++- 10 files changed, 134 insertions(+), 39 deletions(-) diff --git a/Documentation/technical/reftable.md b/Documentation/technical/reftable.md index 34f752950..47c61a350 100644 --- a/Documentation/technical/reftable.md +++ b/Documentation/technical/reftable.md @@ -243,6 +243,7 @@ its value(s). Records are formatted as: varint( prefix_length ) varint( (suffix_length << 3) | value_type ) suffix + varint( update_index_delta ) value? The `prefix_length` field specifies how many leading bytes of the @@ -258,6 +259,10 @@ Recovering a reference name from any `ref_record` is a simple concat: The `suffix_length` value provides the number of bytes available in `suffix` to copy from `suffix` to complete the reference name. +The `update_index` that last modified the reference can be obtained by +adding `update_index_delta` to the `min_update_index` from the file +header: `min_update_index + update_index_delta`. + The `value` follows. Its format is determined by `value_type`, one of the following: diff --git a/org.eclipse.jgit.test/tst/org/eclipse/jgit/internal/storage/reftable/MergedReftableTest.java b/org.eclipse.jgit.test/tst/org/eclipse/jgit/internal/storage/reftable/MergedReftableTest.java index f9ebaf692..adba048e6 100644 --- a/org.eclipse.jgit.test/tst/org/eclipse/jgit/internal/storage/reftable/MergedReftableTest.java +++ b/org.eclipse.jgit.test/tst/org/eclipse/jgit/internal/storage/reftable/MergedReftableTest.java @@ -261,6 +261,41 @@ public void oneTableSeek() throws IOException { } } + @Test + public void missedUpdate() throws IOException { + ByteArrayOutputStream buf = new ByteArrayOutputStream(); + ReftableWriter writer = new ReftableWriter() + .setMinUpdateIndex(1) + .setMaxUpdateIndex(3) + .begin(buf); + writer.writeRef(ref("refs/heads/a", 1), 1); + writer.writeRef(ref("refs/heads/c", 3), 3); + writer.finish(); + byte[] base = buf.toByteArray(); + + byte[] delta = write(Arrays.asList( + ref("refs/heads/b", 2), + ref("refs/heads/c", 4)), + 2); + MergedReftable mr = merge(base, delta); + try (RefCursor rc = mr.allRefs()) { + assertTrue(rc.next()); + assertEquals("refs/heads/a", rc.getRef().getName()); + assertEquals(id(1), rc.getRef().getObjectId()); + assertEquals(1, rc.getUpdateIndex()); + + assertTrue(rc.next()); + assertEquals("refs/heads/b", rc.getRef().getName()); + assertEquals(id(2), rc.getRef().getObjectId()); + assertEquals(2, rc.getUpdateIndex()); + + assertTrue(rc.next()); + assertEquals("refs/heads/c", rc.getRef().getName()); + assertEquals(id(3), rc.getRef().getObjectId()); + assertEquals(3, rc.getUpdateIndex()); + } + } + @Test public void compaction() throws IOException { List delta1 = Arrays.asList( @@ -322,12 +357,18 @@ private byte[] write(Ref... refs) throws IOException { } private byte[] write(Collection refs) throws IOException { + return write(refs, 1); + } + + private byte[] write(Collection refs, long updateIndex) + throws IOException { ByteArrayOutputStream buffer = new ByteArrayOutputStream(); - ReftableWriter writer = new ReftableWriter().begin(buffer); - for (Ref r : RefComparator.sort(refs)) { - writer.writeRef(r); - } - writer.finish(); + new ReftableWriter() + .setMinUpdateIndex(updateIndex) + .setMaxUpdateIndex(updateIndex) + .begin(buffer) + .sortAndWriteRefs(refs) + .finish(); return buffer.toByteArray(); } diff --git a/org.eclipse.jgit.test/tst/org/eclipse/jgit/internal/storage/reftable/ReftableTest.java b/org.eclipse.jgit.test/tst/org/eclipse/jgit/internal/storage/reftable/ReftableTest.java index 6809d7b2b..3ea3061e3 100644 --- a/org.eclipse.jgit.test/tst/org/eclipse/jgit/internal/storage/reftable/ReftableTest.java +++ b/org.eclipse.jgit.test/tst/org/eclipse/jgit/internal/storage/reftable/ReftableTest.java @@ -126,7 +126,7 @@ public void emptyVirtualTableFromRefs() throws IOException { @Test public void estimateCurrentBytesOneRef() throws IOException { Ref exp = ref(MASTER, 1); - int expBytes = 24 + 4 + 5 + 3 + MASTER.length() + 20 + 68; + int expBytes = 24 + 4 + 5 + 4 + MASTER.length() + 20 + 68; byte[] table; ReftableConfig cfg = new ReftableConfig(); @@ -155,7 +155,7 @@ public void estimateCurrentBytesWithIndex() throws IOException { cfg.setIndexObjects(false); cfg.setMaxIndexLevels(1); - int expBytes = 139654; + int expBytes = 147860; byte[] table; ReftableWriter writer = new ReftableWriter().setConfig(cfg); try (ByteArrayOutputStream buf = new ByteArrayOutputStream()) { @@ -174,7 +174,7 @@ public void estimateCurrentBytesWithIndex() throws IOException { public void oneIdRef() throws IOException { Ref exp = ref(MASTER, 1); byte[] table = write(exp); - assertEquals(24 + 4 + 5 + 3 + MASTER.length() + 20 + 68, table.length); + assertEquals(24 + 4 + 5 + 4 + MASTER.length() + 20 + 68, table.length); ReftableReader t = read(table); try (RefCursor rc = t.allRefs()) { @@ -203,7 +203,7 @@ public void oneIdRef() throws IOException { public void oneTagRef() throws IOException { Ref exp = tag(V1_0, 1, 2); byte[] table = write(exp); - assertEquals(24 + 4 + 5 + 2 + V1_0.length() + 40 + 68, table.length); + assertEquals(24 + 4 + 5 + 3 + V1_0.length() + 40 + 68, table.length); ReftableReader t = read(table); try (RefCursor rc = t.allRefs()) { @@ -224,7 +224,7 @@ public void oneSymbolicRef() throws IOException { Ref exp = sym(HEAD, MASTER); byte[] table = write(exp); assertEquals( - 24 + 4 + 5 + 2 + HEAD.length() + 1 + MASTER.length() + 68, + 24 + 4 + 5 + 2 + HEAD.length() + 2 + MASTER.length() + 68, table.length); ReftableReader t = read(table); @@ -281,7 +281,7 @@ public void oneDeletedRef() throws IOException { String name = "refs/heads/gone"; Ref exp = newRef(name); byte[] table = write(exp); - assertEquals(24 + 4 + 5 + 2 + name.length() + 68, table.length); + assertEquals(24 + 4 + 5 + 3 + name.length() + 68, table.length); ReftableReader t = read(table); try (RefCursor rc = t.allRefs()) { @@ -425,13 +425,14 @@ public void withReflog() throws IOException { writer.finish(); byte[] table = buffer.toByteArray(); - assertEquals(245, table.length); + assertEquals(247, table.length); ReftableReader t = read(table); try (RefCursor rc = t.allRefs()) { assertTrue(rc.next()); assertEquals(MASTER, rc.getRef().getName()); assertEquals(id(1), rc.getRef().getObjectId()); + assertEquals(1, rc.getUpdateIndex()); assertTrue(rc.next()); assertEquals(NEXT, rc.getRef().getName()); @@ -636,7 +637,7 @@ public void nameTooLongDoesNotWrite() throws IOException { writer.finish(); fail("expected BlockSizeTooSmallException"); } catch (BlockSizeTooSmallException e) { - assertEquals(84, e.getMinimumBlockSize()); + assertEquals(85, e.getMinimumBlockSize()); } } diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/reftable/BlockReader.java b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/reftable/BlockReader.java index a92bedceb..ce2ba4a2e 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/reftable/BlockReader.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/reftable/BlockReader.java @@ -166,6 +166,10 @@ long readPositionFromIndex() throws IOException { return readVarint64(); } + long readUpdateIndexDelta() { + return readVarint64(); + } + Ref readRef() throws IOException { String name = RawParseUtils.decode(UTF_8, nameBuf, 0, nameLen); switch (valueType & VALUE_TYPE_MASK) { @@ -490,6 +494,7 @@ private int scanToKey(byte[] key, int rPtr, int rIdx, int rCmp) { void skipValue() { switch (blockType) { case REF_BLOCK_TYPE: + readVarint64(); // update_index_delta switch (valueType & VALUE_TYPE_MASK) { case VALUE_NONE: return; diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/reftable/BlockWriter.java b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/reftable/BlockWriter.java index 8f3e889de..b3173e838 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/reftable/BlockWriter.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/reftable/BlockWriter.java @@ -354,10 +354,12 @@ void writeValue(ReftableOutputStream os) { static class RefEntry extends Entry { final Ref ref; + final long updateIndexDelta; - RefEntry(Ref ref) { + RefEntry(Ref ref, long updateIndexDelta) { super(nameUtf8(ref)); this.ref = ref; + this.updateIndexDelta = updateIndexDelta; } @Override @@ -380,17 +382,18 @@ int valueType() { @Override int valueSize() { + int n = computeVarintSize(updateIndexDelta); switch (valueType()) { case VALUE_NONE: - return 0; + return n; case VALUE_1ID: - return OBJECT_ID_LENGTH; + return n + OBJECT_ID_LENGTH; case VALUE_2ID: - return 2 * OBJECT_ID_LENGTH; + return n + 2 * OBJECT_ID_LENGTH; case VALUE_SYMREF: if (ref.isSymbolic()) { int nameLen = nameUtf8(ref.getTarget()).length; - return computeVarintSize(nameLen) + nameLen; + return n + computeVarintSize(nameLen) + nameLen; } } throw new IllegalStateException(); @@ -398,6 +401,7 @@ int valueSize() { @Override void writeValue(ReftableOutputStream os) throws IOException { + os.writeVarint(updateIndexDelta); switch (valueType()) { case VALUE_NONE: return; diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/reftable/MergedReftable.java b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/reftable/MergedReftable.java index 71144cd30..9fc6ae2bb 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/reftable/MergedReftable.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/reftable/MergedReftable.java @@ -100,13 +100,6 @@ public RefCursor allRefs() throws IOException { @Override public RefCursor seekRef(String name) throws IOException { - if (name.endsWith("/")) { //$NON-NLS-1$ - return seekRefPrefix(name); - } - return seekSingleRef(name); - } - - private RefCursor seekRefPrefix(String name) throws IOException { MergedRefCursor m = new MergedRefCursor(); for (int i = 0; i < tables.length; i++) { m.add(new RefQueueEntry(tables[i].seekRef(name), i)); @@ -114,17 +107,6 @@ private RefCursor seekRefPrefix(String name) throws IOException { return m; } - private RefCursor seekSingleRef(String name) throws IOException { - // Walk the tables from highest priority (end of list) to lowest. - // As soon as the reference is found (queue not empty), all lower - // priority tables are irrelevant as current table shadows them. - MergedRefCursor m = new MergedRefCursor(); - for (int i = tables.length - 1; i >= 0 && m.queue.isEmpty(); i--) { - m.add(new RefQueueEntry(tables[i].seekRef(name), i)); - } - return m; - } - @Override public RefCursor byObjectId(AnyObjectId name) throws IOException { MergedRefCursor m = new MergedRefCursor(); @@ -168,6 +150,7 @@ private class MergedRefCursor extends RefCursor { private final PriorityQueue queue; private RefQueueEntry head; private Ref ref; + private long updateIndex; MergedRefCursor() { queue = new PriorityQueue<>(queueSize(), RefQueueEntry::compare); @@ -205,6 +188,7 @@ public boolean next() throws IOException { } ref = t.rc.getRef(); + updateIndex = t.rc.getUpdateIndex(); boolean include = includeDeletes || !t.rc.wasDeleted(); skipShadowedRefs(ref.getName()); add(t); @@ -239,8 +223,17 @@ public Ref getRef() { return ref; } + @Override + public long getUpdateIndex() { + return updateIndex; + } + @Override public void close() { + if (head != null) { + head.rc.close(); + head = null; + } while (!queue.isEmpty()) { queue.remove().rc.close(); } @@ -250,6 +243,10 @@ public void close() { private static class RefQueueEntry { static int compare(RefQueueEntry a, RefQueueEntry b) { int cmp = a.name().compareTo(b.name()); + if (cmp == 0) { + // higher updateIndex shadows lower updateIndex. + cmp = Long.signum(b.updateIndex() - a.updateIndex()); + } if (cmp == 0) { // higher index shadows lower index, so higher index first. cmp = b.stackIdx - a.stackIdx; @@ -268,6 +265,10 @@ static int compare(RefQueueEntry a, RefQueueEntry b) { String name() { return rc.getRef().getName(); } + + long updateIndex() { + return rc.getUpdateIndex(); + } } private class MergedLogCursor extends LogCursor { diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/reftable/RefCursor.java b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/reftable/RefCursor.java index 786fae1a6..d8e9c609f 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/reftable/RefCursor.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/reftable/RefCursor.java @@ -61,6 +61,9 @@ public abstract class RefCursor implements AutoCloseable { /** @return reference at the current position. */ public abstract Ref getRef(); + /** @return updateIndex that last modified the current reference, */ + public abstract long getUpdateIndex(); + /** @return {@code true} if the current reference was deleted. */ public boolean wasDeleted() { Ref r = getRef(); diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/reftable/ReftableCompactor.java b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/reftable/ReftableCompactor.java index 4f9226710..e7f406ce0 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/reftable/ReftableCompactor.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/reftable/ReftableCompactor.java @@ -220,7 +220,7 @@ public Stats getStats() { private void mergeRefs(MergedReftable mr) throws IOException { try (RefCursor rc = mr.allRefs()) { while (rc.next()) { - writer.writeRef(rc.getRef()); + writer.writeRef(rc.getRef(), rc.getUpdateIndex()); } } } diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/reftable/ReftableReader.java b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/reftable/ReftableReader.java index be1eb40ed..407a77c7d 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/reftable/ReftableReader.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/reftable/ReftableReader.java @@ -455,6 +455,7 @@ private class RefCursorImpl extends RefCursor { private final boolean prefix; private Ref ref; + private long updateIndex; BlockReader block; RefCursorImpl(long scanEnd, byte[] match, boolean prefix) { @@ -483,6 +484,7 @@ public boolean next() throws IOException { return false; } + updateIndex = minUpdateIndex + block.readUpdateIndexDelta(); ref = block.readRef(); if (!includeDeletes && wasDeleted()) { continue; @@ -496,6 +498,11 @@ public Ref getRef() { return ref; } + @Override + public long getUpdateIndex() { + return updateIndex; + } + @Override public void close() { // Do nothing. @@ -574,6 +581,7 @@ private class ObjCursorImpl extends RefCursor { private final ObjectId match; private Ref ref; + private long updateIndex; private int listIdx; private LongList blockPos; @@ -647,6 +655,7 @@ public boolean next() throws IOException { } block.parseKey(); + updateIndex = minUpdateIndex + block.readUpdateIndexDelta(); ref = block.readRef(); ObjectId id = ref.getObjectId(); if (id != null && match.equals(id) @@ -661,6 +670,11 @@ public Ref getRef() { return ref; } + @Override + public long getUpdateIndex() { + return updateIndex; + } + @Override public void close() { // Do nothing. diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/reftable/ReftableWriter.java b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/reftable/ReftableWriter.java index 45b759f95..0ac2445fc 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/reftable/ReftableWriter.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/reftable/ReftableWriter.java @@ -214,7 +214,7 @@ public ReftableWriter begin(OutputStream os) throws IOException { public ReftableWriter sortAndWriteRefs(Collection refsToPack) throws IOException { Iterator itr = refsToPack.stream() - .map(RefEntry::new) + .map(r -> new RefEntry(r, maxUpdateIndex - minUpdateIndex)) .sorted(Entry::compare) .iterator(); while (itr.hasNext()) { @@ -236,7 +236,28 @@ public ReftableWriter sortAndWriteRefs(Collection refsToPack) * if reftable cannot be written. */ public void writeRef(Ref ref) throws IOException { - long blockPos = refs.write(new RefEntry(ref)); + writeRef(ref, maxUpdateIndex); + } + + /** + * Write one reference to the reftable. + *

+ * References must be passed in sorted order. + * + * @param ref + * the reference to store. + * @param updateIndex + * the updateIndex that modified this reference. Must be + * {@code >= minUpdateIndex} for this file. + * @throws IOException + * if reftable cannot be written. + */ + public void writeRef(Ref ref, long updateIndex) throws IOException { + if (updateIndex < minUpdateIndex) { + throw new IllegalArgumentException(); + } + long d = updateIndex - minUpdateIndex; + long blockPos = refs.write(new RefEntry(ref, d)); indexRef(ref, blockPos); }