Merge "PackOutputStream: Extract cancellation and digest to superclass"

This commit is contained in:
Ivan Frade 2022-01-27 11:42:54 -05:00 committed by Gerrit Code Review @ Eclipse.org
commit 27e554e465
3 changed files with 229 additions and 72 deletions

View File

@ -0,0 +1,100 @@
/*
* Copyright (C) 2022, Tencent.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Distribution License v. 1.0 which is available at
* https://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: BSD-3-Clause
*/
package org.eclipse.jgit.internal.storage.io;
import static org.eclipse.jgit.internal.storage.io.CancellableDigestOutputStream.BYTES_TO_WRITE_BEFORE_CANCEL_CHECK;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import java.io.InterruptedIOException;
import org.eclipse.jgit.lib.ProgressMonitor;
import org.eclipse.jgit.util.io.NullOutputStream;
import org.junit.Test;
public class CancellableDigestOutputStreamTest {
private static class CancelledTestMonitor implements ProgressMonitor {
private boolean cancelled = false;
public void setCancelled(boolean cancelled) {
this.cancelled = cancelled;
}
@Override
public void start(int totalTasks) {
}
@Override
public void beginTask(String title, int totalWork) {
}
@Override
public void update(int completed) {
}
@Override
public void endTask() {
}
@Override
public boolean isCancelled() {
return cancelled;
}
}
@Test
public void testCancelInProcess() throws Exception {
CancelledTestMonitor m = new CancelledTestMonitor();
CancellableDigestOutputStream out = new CancellableDigestOutputStream(m,
NullOutputStream.INSTANCE);
byte[] KB = new byte[1024];
int triggerCancelWriteCnt = BYTES_TO_WRITE_BEFORE_CANCEL_CHECK
/ KB.length;
for (int i = 0; i < triggerCancelWriteCnt + 1; i++) {
out.write(KB);
}
assertTrue(out.length() > BYTES_TO_WRITE_BEFORE_CANCEL_CHECK);
m.setCancelled(true);
for (int i = 0; i < triggerCancelWriteCnt - 1; i++) {
out.write(KB);
}
long lastLength = out.length();
assertThrows(InterruptedIOException.class, () -> {
out.write(1);
});
assertEquals(lastLength, out.length());
assertThrows(InterruptedIOException.class, () -> {
out.write(new byte[1]);
});
assertEquals(lastLength, out.length());
}
@Test
public void testTriggerCheckAfterSingleBytes() throws Exception {
CancelledTestMonitor m = new CancelledTestMonitor();
CancellableDigestOutputStream out = new CancellableDigestOutputStream(m,
NullOutputStream.INSTANCE);
byte[] bytes = new byte[BYTES_TO_WRITE_BEFORE_CANCEL_CHECK + 1];
m.setCancelled(true);
assertThrows(InterruptedIOException.class, () -> {
out.write(bytes);
});
assertEquals(BYTES_TO_WRITE_BEFORE_CANCEL_CHECK, out.length());
}
}

View File

@ -0,0 +1,124 @@
/*
* Copyright (C) 2022, Tencent.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Distribution License v. 1.0 which is available at
* https://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: BSD-3-Clause
*/
package org.eclipse.jgit.internal.storage.io;
import org.eclipse.jgit.lib.Constants;
import org.eclipse.jgit.lib.ProgressMonitor;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.security.MessageDigest;
/**
* An OutputStream that keeps a digest and checks every N bytes for
* cancellation.
*/
public class CancellableDigestOutputStream extends OutputStream {
/** The OutputStream checks every this value for cancellation **/
public static final int BYTES_TO_WRITE_BEFORE_CANCEL_CHECK = 128 * 1024;
private final ProgressMonitor writeMonitor;
private final OutputStream out;
private final MessageDigest md = Constants.newMessageDigest();
private long count;
private long checkCancelAt;
/**
* Initialize a CancellableDigestOutputStream.
*
* @param writeMonitor
* monitor to update on output progress and check cancel.
* @param out
* target stream to receive all contents.
*/
public CancellableDigestOutputStream(ProgressMonitor writeMonitor,
OutputStream out) {
this.writeMonitor = writeMonitor;
this.out = out;
this.checkCancelAt = BYTES_TO_WRITE_BEFORE_CANCEL_CHECK;
}
/**
* Get the monitor which is used to update on output progress and check
* cancel.
*
* @return the monitor
*/
public final ProgressMonitor getWriteMonitor() {
return writeMonitor;
}
/**
* Obtain the current SHA-1 digest.
*
* @return SHA-1 digest
*/
public final byte[] getDigest() {
return md.digest();
}
/**
* Get total number of bytes written since stream start.
*
* @return total number of bytes written since stream start.
*/
public final long length() {
return count;
}
/** {@inheritDoc} */
@Override
public final void write(int b) throws IOException {
if (checkCancelAt <= count) {
if (writeMonitor.isCancelled()) {
throw new InterruptedIOException();
}
checkCancelAt = count + BYTES_TO_WRITE_BEFORE_CANCEL_CHECK;
}
out.write(b);
md.update((byte) b);
count++;
}
/** {@inheritDoc} */
@Override
public final void write(byte[] b, int off, int len) throws IOException {
while (0 < len) {
if (checkCancelAt <= count) {
if (writeMonitor.isCancelled()) {
throw new InterruptedIOException();
}
checkCancelAt = count + BYTES_TO_WRITE_BEFORE_CANCEL_CHECK;
}
int n = Math.min(len, BYTES_TO_WRITE_BEFORE_CANCEL_CHECK);
out.write(b, off, n);
md.update(b, off, n);
count += n;
off += n;
len -= n;
}
}
/** {@inheritDoc} */
@Override
public void flush() throws IOException {
out.flush();
}
}

View File

@ -17,10 +17,8 @@
import java.io.IOException;
import java.io.OutputStream;
import java.security.MessageDigest;
import org.eclipse.jgit.internal.JGitText;
import org.eclipse.jgit.lib.Constants;
import org.eclipse.jgit.internal.storage.io.CancellableDigestOutputStream;
import org.eclipse.jgit.lib.ProgressMonitor;
import org.eclipse.jgit.util.NB;
@ -28,25 +26,14 @@
* Custom output stream to support
* {@link org.eclipse.jgit.internal.storage.pack.PackWriter}.
*/
public final class PackOutputStream extends OutputStream {
private static final int BYTES_TO_WRITE_BEFORE_CANCEL_CHECK = 128 * 1024;
private final ProgressMonitor writeMonitor;
private final OutputStream out;
public final class PackOutputStream extends CancellableDigestOutputStream {
private final PackWriter packWriter;
private final MessageDigest md = Constants.newMessageDigest();
private long count;
private final byte[] headerBuffer = new byte[32];
private final byte[] copyBuffer = new byte[64 << 10];
private long checkCancelAt;
private boolean ofsDelta;
/**
@ -66,48 +53,8 @@ public final class PackOutputStream extends OutputStream {
*/
public PackOutputStream(final ProgressMonitor writeMonitor,
final OutputStream out, final PackWriter pw) {
this.writeMonitor = writeMonitor;
this.out = out;
super(writeMonitor, out);
this.packWriter = pw;
this.checkCancelAt = BYTES_TO_WRITE_BEFORE_CANCEL_CHECK;
}
/** {@inheritDoc} */
@Override
public final void write(int b) throws IOException {
count++;
out.write(b);
md.update((byte) b);
}
/** {@inheritDoc} */
@Override
public final void write(byte[] b, int off, int len)
throws IOException {
while (0 < len) {
final int n = Math.min(len, BYTES_TO_WRITE_BEFORE_CANCEL_CHECK);
count += n;
if (checkCancelAt <= count) {
if (writeMonitor.isCancelled()) {
throw new IOException(
JGitText.get().packingCancelledDuringObjectsWriting);
}
checkCancelAt = count + BYTES_TO_WRITE_BEFORE_CANCEL_CHECK;
}
out.write(b, off, n);
md.update(b, off, n);
off += n;
len -= n;
}
}
/** {@inheritDoc} */
@Override
public void flush() throws IOException {
out.flush();
}
final void writeFileHeader(int version, long objectCount)
@ -160,7 +107,7 @@ public final void writeHeader(ObjectToPack otp, long rawLength)
ObjectToPack b = otp.getDeltaBase();
if (b != null && (b.isWritten() & ofsDelta)) { // Non-short-circuit logic is intentional
int n = objectHeader(rawLength, OBJ_OFS_DELTA, headerBuffer);
n = ofsDelta(count - b.getOffset(), headerBuffer, n);
n = ofsDelta(length() - b.getOffset(), headerBuffer, n);
write(headerBuffer, 0, n);
} else if (otp.isDeltaRepresentation()) {
int n = objectHeader(rawLength, OBJ_REF_DELTA, headerBuffer);
@ -209,20 +156,6 @@ public final byte[] getCopyBuffer() {
}
void endObject() {
writeMonitor.update(1);
}
/**
* Get total number of bytes written since stream start.
*
* @return total number of bytes written since stream start.
*/
public final long length() {
return count;
}
/** @return obtain the current SHA-1 digest. */
final byte[] getDigest() {
return md.digest();
getWriteMonitor().update(1);
}
}