PackInserter: Implement newReader()

Change-Id: Ib9e7f6439332eaed3d936f895a5271a7d514d3e9
This commit is contained in:
Dave Borowitz 2017-11-01 11:46:48 -04:00
parent 080b4770e7
commit 678c99c057
2 changed files with 351 additions and 2 deletions

View File

@ -45,6 +45,7 @@
import static java.util.Comparator.comparing;
import static java.util.stream.Collectors.toList;
import static org.eclipse.jgit.lib.Constants.OBJ_BLOB;
import static org.eclipse.jgit.lib.Constants.OBJ_COMMIT;
import static org.hamcrest.Matchers.greaterThan;
@ -73,6 +74,7 @@
import org.eclipse.jgit.dircache.DirCache;
import org.eclipse.jgit.dircache.DirCacheBuilder;
import org.eclipse.jgit.dircache.DirCacheEntry;
import org.eclipse.jgit.errors.MissingObjectException;
import org.eclipse.jgit.junit.RepositoryTestCase;
import org.eclipse.jgit.lib.CommitBuilder;
import org.eclipse.jgit.lib.Constants;
@ -80,12 +82,29 @@
import org.eclipse.jgit.lib.ObjectId;
import org.eclipse.jgit.lib.ObjectLoader;
import org.eclipse.jgit.lib.ObjectReader;
import org.eclipse.jgit.lib.ObjectStream;
import org.eclipse.jgit.storage.file.WindowCacheConfig;
import org.eclipse.jgit.treewalk.CanonicalTreeParser;
import org.eclipse.jgit.util.IO;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@SuppressWarnings("boxing")
public class PackInserterTest extends RepositoryTestCase {
private WindowCacheConfig origWindowCacheConfig;
@Before
public void setWindowCacheConfig() {
origWindowCacheConfig = new WindowCacheConfig();
origWindowCacheConfig.install();
}
@After
public void resetWindowCacheConfig() {
origWindowCacheConfig.install();
}
@Before
public void emptyAtSetUp() throws Exception {
assertEquals(0, listPacks().size());
@ -327,6 +346,100 @@ public void insertLargeInputStreamBypassesCheckExisting() throws Exception {
assertEquals(2, listPacks().size());
}
@Test
public void readBackSmallFiles() throws Exception {
ObjectId blobId1;
ObjectId blobId2;
ObjectId blobId3;
byte[] blob1 = Constants.encode("blob1");
byte[] blob2 = Constants.encode("blob2");
byte[] blob3 = Constants.encode("blob3");
try (PackInserter ins = newInserter()) {
assertThat(blob1.length, lessThan(ins.getBufferSize()));
blobId1 = ins.insert(OBJ_BLOB, blob1);
try (ObjectReader reader = ins.newReader()) {
assertBlob(reader, blobId1, blob1);
}
// Read-back should not mess up the file pointer.
blobId2 = ins.insert(OBJ_BLOB, blob2);
ins.flush();
blobId3 = ins.insert(OBJ_BLOB, blob3);
}
assertPacksOnly();
List<PackFile> packs = listPacks();
assertEquals(1, packs.size());
assertEquals(2, packs.get(0).getObjectCount());
try (ObjectReader reader = db.newObjectReader()) {
assertBlob(reader, blobId1, blob1);
assertBlob(reader, blobId2, blob2);
try {
reader.open(blobId3);
fail("Expected MissingObjectException");
} catch (MissingObjectException expected) {
// Expected.
}
}
}
@Test
public void readBackLargeFile() throws Exception {
ObjectId blobId;
byte[] blob = newLargeBlob();
WindowCacheConfig wcc = new WindowCacheConfig();
wcc.setStreamFileThreshold(1024);
wcc.install();
try (ObjectReader reader = db.newObjectReader()) {
assertThat(blob.length, greaterThan(reader.getStreamFileThreshold()));
}
try (PackInserter ins = newInserter()) {
blobId = ins.insert(OBJ_BLOB, blob);
try (ObjectReader reader = ins.newReader()) {
// Double-check threshold is propagated.
assertThat(blob.length, greaterThan(reader.getStreamFileThreshold()));
assertBlob(reader, blobId, blob);
}
}
assertPacksOnly();
// Pack was streamed out to disk and read back from the temp file, but
// ultimately rolled back and deleted.
assertEquals(0, listPacks().size());
try (ObjectReader reader = db.newObjectReader()) {
try {
reader.open(blobId);
fail("Expected MissingObjectException");
} catch (MissingObjectException expected) {
// Expected.
}
}
}
@Test
public void readBackFallsBackToRepo() throws Exception {
ObjectId blobId;
byte[] blob = Constants.encode("foo contents");
try (PackInserter ins = newInserter()) {
assertThat(blob.length, lessThan(ins.getBufferSize()));
blobId = ins.insert(OBJ_BLOB, blob);
ins.flush();
}
try (PackInserter ins = newInserter();
ObjectReader reader = ins.newReader()) {
assertBlob(reader, blobId, blob);
}
}
private List<PackFile> listPacks() throws Exception {
List<PackFile> fromOpenDb = listPacks(db);
List<PackFile> reopened;
@ -380,7 +493,12 @@ private static void assertBlob(ObjectReader reader, ObjectId id,
ObjectLoader loader = reader.open(id);
assertEquals(OBJ_BLOB, loader.getType());
assertEquals(expected.length, loader.getSize());
assertArrayEquals(expected, loader.getBytes());
try (ObjectStream s = loader.openStream()) {
int n = (int) s.getSize();
byte[] actual = new byte[n];
assertEquals(n, IO.readFully(s, actual, 0));
assertArrayEquals(expected, actual);
}
}
private void assertPacksOnly() throws Exception {

View File

@ -45,7 +45,10 @@
import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
import static org.eclipse.jgit.lib.Constants.OBJECT_ID_LENGTH;
import static org.eclipse.jgit.lib.Constants.OBJ_OFS_DELTA;
import static org.eclipse.jgit.lib.Constants.OBJ_REF_DELTA;
import java.io.BufferedInputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileOutputStream;
@ -53,19 +56,35 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.io.RandomAccessFile;
import java.nio.channels.Channels;
import java.text.MessageFormat;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.zip.CRC32;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.Inflater;
import java.util.zip.InflaterInputStream;
import org.eclipse.jgit.errors.CorruptObjectException;
import org.eclipse.jgit.errors.IncorrectObjectTypeException;
import org.eclipse.jgit.errors.LargeObjectException;
import org.eclipse.jgit.errors.MissingObjectException;
import org.eclipse.jgit.internal.JGitText;
import org.eclipse.jgit.lib.AbbreviatedObjectId;
import org.eclipse.jgit.lib.AnyObjectId;
import org.eclipse.jgit.lib.Constants;
import org.eclipse.jgit.lib.InflaterCache;
import org.eclipse.jgit.lib.ObjectId;
import org.eclipse.jgit.lib.ObjectIdOwnerMap;
import org.eclipse.jgit.lib.ObjectInserter;
import org.eclipse.jgit.lib.ObjectLoader;
import org.eclipse.jgit.lib.ObjectReader;
import org.eclipse.jgit.lib.ObjectStream;
import org.eclipse.jgit.transport.PackParser;
import org.eclipse.jgit.transport.PackedObjectInfo;
import org.eclipse.jgit.util.BlockList;
@ -93,6 +112,7 @@ class PackInserter extends ObjectInserter {
private int compression = Deflater.BEST_COMPRESSION;
private File tmpPack;
private PackStream packOut;
private Inflater cachedInflater;
PackInserter(ObjectDirectory db) {
this.db = db;
@ -217,7 +237,7 @@ public PackParser newPackParser(InputStream in) {
@Override
public ObjectReader newReader() {
throw new UnsupportedOperationException();
return new Reader();
}
@Override
@ -315,6 +335,11 @@ public void close() {
}
} finally {
clear();
try {
InflaterCache.release(cachedInflater);
} finally {
cachedInflater = null;
}
}
}
@ -325,6 +350,15 @@ private void clear() {
packOut = null;
}
private Inflater inflater() {
if (cachedInflater == null) {
cachedInflater = InflaterCache.get();
} else {
cachedInflater.reset();
}
return cachedInflater;
}
private class PackStream extends OutputStream {
final byte[] hdrBuf;
final CRC32 crc32;
@ -404,5 +438,202 @@ public void close() throws IOException {
out.close();
file.close();
}
byte[] inflate(long filePos, int len) throws IOException, DataFormatException {
byte[] dstbuf;
try {
dstbuf = new byte[len];
} catch (OutOfMemoryError noMemory) {
return null; // Caller will switch to large object streaming.
}
byte[] srcbuf = buffer();
Inflater inf = inflater();
filePos += setInput(filePos, inf, srcbuf);
for (int dstoff = 0;;) {
int n = inf.inflate(dstbuf, dstoff, dstbuf.length - dstoff);
dstoff += n;
if (inf.finished()) {
return dstbuf;
}
if (inf.needsInput()) {
filePos += setInput(filePos, inf, srcbuf);
} else if (n == 0) {
throw new DataFormatException();
}
}
}
private int setInput(long filePos, Inflater inf, byte[] buf)
throws IOException {
if (file.getFilePointer() != filePos) {
file.seek(filePos);
}
int n = file.read(buf);
if (n < 0) {
throw new EOFException(JGitText.get().unexpectedEofInPack);
}
inf.setInput(buf, 0, n);
return n;
}
}
private class Reader extends ObjectReader {
private final ObjectReader ctx;
private Reader() {
ctx = db.newReader();
setStreamFileThreshold(ctx.getStreamFileThreshold());
}
@Override
public ObjectReader newReader() {
return db.newReader();
}
@Override
public ObjectInserter getCreatedFromInserter() {
return PackInserter.this;
}
@Override
public Collection<ObjectId> resolve(AbbreviatedObjectId id)
throws IOException {
Collection<ObjectId> stored = ctx.resolve(id);
if (objectList == null) {
return stored;
}
Set<ObjectId> r = new HashSet<>(stored.size() + 2);
r.addAll(stored);
for (PackedObjectInfo obj : objectList) {
if (id.prefixCompare(obj) == 0) {
r.add(obj.copy());
}
}
return r;
}
@Override
public ObjectLoader open(AnyObjectId objectId, int typeHint)
throws MissingObjectException, IncorrectObjectTypeException,
IOException {
if (objectMap == null) {
return ctx.open(objectId, typeHint);
}
PackedObjectInfo obj = objectMap.get(objectId);
if (obj == null) {
return ctx.open(objectId, typeHint);
}
byte[] buf = buffer();
RandomAccessFile f = packOut.file;
f.seek(obj.getOffset());
int cnt = f.read(buf, 0, 20);
if (cnt <= 0) {
throw new EOFException(JGitText.get().unexpectedEofInPack);
}
int c = buf[0] & 0xff;
int type = (c >> 4) & 7;
if (type == OBJ_OFS_DELTA || type == OBJ_REF_DELTA) {
throw new IOException(MessageFormat.format(
JGitText.get().cannotReadBackDelta, Integer.toString(type)));
}
if (typeHint != OBJ_ANY && type != typeHint) {
throw new IncorrectObjectTypeException(objectId.copy(), typeHint);
}
long sz = c & 0x0f;
int ptr = 1;
int shift = 4;
while ((c & 0x80) != 0) {
if (ptr >= cnt) {
throw new EOFException(JGitText.get().unexpectedEofInPack);
}
c = buf[ptr++] & 0xff;
sz += ((long) (c & 0x7f)) << shift;
shift += 7;
}
long zpos = obj.getOffset() + ptr;
if (sz < getStreamFileThreshold()) {
byte[] data = inflate(obj, zpos, (int) sz);
if (data != null) {
return new ObjectLoader.SmallObject(type, data);
}
}
return new StreamLoader(f, type, sz, zpos);
}
private byte[] inflate(PackedObjectInfo obj, long zpos, int sz)
throws IOException, CorruptObjectException {
try {
return packOut.inflate(zpos, sz);
} catch (DataFormatException dfe) {
CorruptObjectException coe = new CorruptObjectException(
MessageFormat.format(
JGitText.get().objectAtHasBadZlibStream,
Long.valueOf(obj.getOffset()),
tmpPack.getAbsolutePath()));
coe.initCause(dfe);
throw coe;
}
}
@Override
public Set<ObjectId> getShallowCommits() throws IOException {
return ctx.getShallowCommits();
}
@Override
public void close() {
ctx.close();
}
private class StreamLoader extends ObjectLoader {
private final RandomAccessFile file;
private final int type;
private final long size;
private final long pos;
StreamLoader(RandomAccessFile file, int type, long size, long pos) {
this.file = file;
this.type = type;
this.size = size;
this.pos = pos;
}
@Override
public ObjectStream openStream()
throws MissingObjectException, IOException {
int bufsz = buffer().length;
file.seek(pos);
return new ObjectStream.Filter(
type, size,
new BufferedInputStream(
new InflaterInputStream(
Channels.newInputStream(packOut.file.getChannel()),
inflater(), bufsz),
bufsz));
}
@Override
public int getType() {
return type;
}
@Override
public long getSize() {
return size;
}
@Override
public byte[] getCachedBytes() throws LargeObjectException {
throw new LargeObjectException.ExceedsLimit(
getStreamFileThreshold(), size);
}
}
}
}