Allow ObjectReuseAsIs to have more control over write ordering

The reuse system used by an object database may be able to benefit
from knowing what objects are coming next, and even improve data
throughput by delaying (or moving up) objects that are stored near
each other in the source database.

Pushing the iteration down into the reuse code makes it possible
for a smarter implementation to aggregate reuse.  But for the
standard pack file format on disk we don't bother, its quite
efficient already.

Change-Id: I64f0048ca7071a8b44950d6c2a5dfbca3be6bba6
Signed-off-by: Shawn O. Pearce <spearce@spearce.org>
This commit is contained in:
Shawn O. Pearce 2010-08-02 17:00:35 -07:00
parent fe18e52195
commit 28ba4747bc
5 changed files with 110 additions and 27 deletions

View File

@ -132,6 +132,12 @@ public void copyObjectAsIs(PackOutputStream out, ObjectToPack otp)
src.pack.copyAsIs(out, src, this);
}
public void writeObjects(PackOutputStream out, Iterable<ObjectToPack> list)
throws IOException {
for (ObjectToPack otp : list)
out.writeObject(otp);
}
/**
* Copy bytes from the window to a caller supplied buffer.
*

View File

@ -107,6 +107,42 @@ public void selectObjectRepresentation(PackWriter packer,
ProgressMonitor monitor, Iterable<ObjectToPack> objects)
throws IOException, MissingObjectException;
/**
* Write objects to the pack stream in roughly the order given.
*
* {@code PackWriter} invokes this method to write out one or more objects,
* in approximately the order specified by the iteration over the list. A
* simple implementation of this method would just iterate the list and
* output each object:
*
* <pre>
* for (ObjectToPack obj : list)
* out.writeObject(obj)
* </pre>
*
* However more sophisticated implementors may try to perform some (small)
* reordering to access objects that are stored close to each other at
* roughly the same time. Implementations may choose to write objects out of
* order, but this may increase pack file size due to using a larger header
* format to reach a delta base that is later in the stream. It may also
* reduce data locality for the reader, slowing down data access.
*
* Invoking {@link PackOutputStream#writeObject(ObjectToPack)} will cause
* {@link #copyObjectAsIs(PackOutputStream, ObjectToPack)} to be invoked
* recursively on {@code this} if the current object is scheduled for reuse.
*
* @param out
* the stream to write each object to.
* @param list
* the list of objects to write. Objects should be written in
* approximately this order.
* @throws IOException
* the stream cannot be written to, or one or more required
* objects cannot be accessed from the object database.
*/
public void writeObjects(PackOutputStream out, Iterable<ObjectToPack> list)
throws IOException;
/**
* Output a previously selected representation.
* <p>

View File

@ -133,7 +133,7 @@ public ObjectToPack(RevObject obj) {
* representation; null otherwise - if going to be packed as a
* whole object.
*/
ObjectId getDeltaBaseId() {
public ObjectId getDeltaBaseId() {
return deltaBase;
}
@ -143,7 +143,7 @@ ObjectId getDeltaBaseId() {
* pack; null otherwise - if going to be packed as a whole
* object or delta base is specified only as id.
*/
ObjectToPack getDeltaBase() {
public ObjectToPack getDeltaBase() {
if (deltaBase instanceof ObjectToPack)
return (ObjectToPack) deltaBase;
return null;
@ -188,7 +188,7 @@ void clearDeltaBase() {
* @return true if object is going to be written as delta; false
* otherwise.
*/
boolean isDeltaRepresentation() {
public boolean isDeltaRepresentation() {
return deltaBase != null;
}
@ -198,7 +198,7 @@ boolean isDeltaRepresentation() {
*
* @return true if object is already written; false otherwise.
*/
boolean isWritten() {
public boolean isWritten() {
return getOffset() != 0;
}
@ -223,7 +223,11 @@ void markWantWrite() {
flags |= WANT_WRITE;
}
boolean isReuseAsIs() {
/**
* @return true if an existing representation was selected to be reused
* as-is into the pack stream.
*/
public boolean isReuseAsIs() {
return (flags & REUSE_AS_IS) != 0;
}

View File

@ -49,17 +49,20 @@
import java.security.MessageDigest;
import java.util.zip.CRC32;
import org.eclipse.jgit.JGitText;
import org.eclipse.jgit.lib.Constants;
import org.eclipse.jgit.lib.ProgressMonitor;
import org.eclipse.jgit.util.NB;
/** Custom output stream to support {@link PackWriter}. */
public final class PackOutputStream extends OutputStream {
private final int BYTES_TO_WRITE_BEFORE_CANCEL_CHECK = 128 * 1024;
private final ProgressMonitor writeMonitor;
private final OutputStream out;
private final boolean ofsDelta;
private final PackWriter packWriter;
private final CRC32 crc = new CRC32();
@ -71,6 +74,8 @@ public final class PackOutputStream extends OutputStream {
private byte[] copyBuffer;
private long checkCancelAt;
/**
* Initialize a pack output stream.
* <p>
@ -89,7 +94,8 @@ public PackOutputStream(final ProgressMonitor writeMonitor,
final OutputStream out, final PackWriter pw) {
this.writeMonitor = writeMonitor;
this.out = out;
this.ofsDelta = pw.isDeltaBaseAsOffset();
this.packWriter = pw;
this.checkCancelAt = BYTES_TO_WRITE_BEFORE_CANCEL_CHECK;
}
@Override
@ -101,12 +107,27 @@ public void write(final int b) throws IOException {
}
@Override
public void write(final byte[] b, final int off, final int len)
public void write(final byte[] b, int off, int len)
throws IOException {
count += len;
out.write(b, off, len);
crc.update(b, off, len);
md.update(b, off, len);
while (0 < len) {
final int n = Math.min(len, BYTES_TO_WRITE_BEFORE_CANCEL_CHECK);
count += n;
if (checkCancelAt <= count) {
if (writeMonitor.isCancelled()) {
throw new IOException(
JGitText.get().packingCancelledDuringObjectsWriting);
}
checkCancelAt = count + BYTES_TO_WRITE_BEFORE_CANCEL_CHECK;
}
out.write(b, off, n);
crc.update(b, off, n);
md.update(b, off, n);
off += n;
len -= n;
}
}
@Override
@ -121,6 +142,25 @@ void writeFileHeader(int version, int objectCount) throws IOException {
write(headerBuffer, 0, 12);
}
/**
* Write one object.
*
* If the object was already written, this method does nothing and returns
* quickly. This case occurs whenever an object was written out of order in
* order to ensure the delta base occurred before the object that needs it.
*
* @param otp
* the object to write.
* @throws IOException
* the object cannot be read from the object reader, or the
* output stream is no longer accepting output. Caller must
* examine the type of exception and possibly its message to
* distinguish between these cases.
*/
public void writeObject(ObjectToPack otp) throws IOException {
packWriter.writeObject(this, otp);
}
/**
* Commits the object header onto the stream.
* <p>
@ -140,7 +180,7 @@ void writeFileHeader(int version, int objectCount) throws IOException {
public void writeHeader(ObjectToPack otp, long rawLength)
throws IOException {
if (otp.isDeltaRepresentation()) {
if (ofsDelta) {
if (packWriter.isDeltaBaseAsOffset()) {
ObjectToPack baseInPack = otp.getDeltaBase();
if (baseInPack != null && baseInPack.isWritten()) {
final long start = count;
@ -168,8 +208,7 @@ public void writeHeader(ObjectToPack otp, long rawLength)
private int encodeTypeSize(int type, long rawLength) {
long nextLength = rawLength >>> 4;
headerBuffer[0] = (byte) ((nextLength > 0 ? 0x80 : 0x00)
| (type << 4) | (rawLength & 0x0F));
headerBuffer[0] = (byte) ((nextLength > 0 ? 0x80 : 0x00) | (type << 4) | (rawLength & 0x0F));
rawLength = nextLength;
int n = 1;
while (rawLength > 0) {

View File

@ -503,7 +503,7 @@ public void writePack(ProgressMonitor compressMonitor,
writeMonitor.beginTask(JGitText.get().writingObjects, objCnt);
out.writeFileHeader(PACK_VERSION_GENERATED, objCnt);
out.flush();
writeObjects(writeMonitor, out);
writeObjects(out);
writeChecksum(out);
reader.release();
@ -813,21 +813,19 @@ private void runTasks(ExecutorService pool, List<DeltaTask> tasks,
}
}
private void writeObjects(ProgressMonitor writeMonitor, PackOutputStream out)
throws IOException {
for (List<ObjectToPack> list : objectsLists) {
for (ObjectToPack otp : list) {
if (writeMonitor.isCancelled())
throw new IOException(
JGitText.get().packingCancelledDuringObjectsWriting);
if (!otp.isWritten())
writeObject(out, otp);
private void writeObjects(PackOutputStream out) throws IOException {
if (reuseSupport != null) {
for (List<ObjectToPack> list : objectsLists)
reuseSupport.writeObjects(out, list);
} else {
for (List<ObjectToPack> list : objectsLists) {
for (ObjectToPack otp : list)
out.writeObject(otp);
}
}
}
private void writeObject(PackOutputStream out, final ObjectToPack otp)
throws IOException {
void writeObject(PackOutputStream out, ObjectToPack otp) throws IOException {
if (otp.isWritten())
return; // We shouldn't be here.