From 1a86c1044dfb8300241728571a865971aa9f1a66 Mon Sep 17 00:00:00 2001 From: kylezhao Date: Sat, 22 Jan 2022 16:54:20 +0800 Subject: [PATCH] PackOutputStream: Extract cancellation and digest to superclass Checking the cancelled status and keeping a digest of the written data is useful for other output streams. e.g. to write commit-graphs. Pull up that functionality to a superclass, so it can be reused. Change-Id: I177b50be09c4ea631e7a144cc6127085ec2ca411 Signed-off-by: kylezhao --- .../io/CancellableDigestOutputStreamTest.java | 100 ++++++++++++++ .../io/CancellableDigestOutputStream.java | 124 ++++++++++++++++++ .../storage/pack/PackOutputStream.java | 77 +---------- 3 files changed, 229 insertions(+), 72 deletions(-) create mode 100644 org.eclipse.jgit.test/tst/org/eclipse/jgit/internal/storage/io/CancellableDigestOutputStreamTest.java create mode 100644 org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/io/CancellableDigestOutputStream.java diff --git a/org.eclipse.jgit.test/tst/org/eclipse/jgit/internal/storage/io/CancellableDigestOutputStreamTest.java b/org.eclipse.jgit.test/tst/org/eclipse/jgit/internal/storage/io/CancellableDigestOutputStreamTest.java new file mode 100644 index 000000000..a05ab0c44 --- /dev/null +++ b/org.eclipse.jgit.test/tst/org/eclipse/jgit/internal/storage/io/CancellableDigestOutputStreamTest.java @@ -0,0 +1,100 @@ +/* + * Copyright (C) 2022, Tencent. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Distribution License v. 1.0 which is available at + * https://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +package org.eclipse.jgit.internal.storage.io; + +import static org.eclipse.jgit.internal.storage.io.CancellableDigestOutputStream.BYTES_TO_WRITE_BEFORE_CANCEL_CHECK; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; + +import java.io.InterruptedIOException; + +import org.eclipse.jgit.lib.ProgressMonitor; +import org.eclipse.jgit.util.io.NullOutputStream; +import org.junit.Test; + +public class CancellableDigestOutputStreamTest { + private static class CancelledTestMonitor implements ProgressMonitor { + + private boolean cancelled = false; + + public void setCancelled(boolean cancelled) { + this.cancelled = cancelled; + } + + @Override + public void start(int totalTasks) { + } + + @Override + public void beginTask(String title, int totalWork) { + } + + @Override + public void update(int completed) { + } + + @Override + public void endTask() { + } + + @Override + public boolean isCancelled() { + return cancelled; + } + } + + @Test + public void testCancelInProcess() throws Exception { + CancelledTestMonitor m = new CancelledTestMonitor(); + CancellableDigestOutputStream out = new CancellableDigestOutputStream(m, + NullOutputStream.INSTANCE); + + byte[] KB = new byte[1024]; + int triggerCancelWriteCnt = BYTES_TO_WRITE_BEFORE_CANCEL_CHECK + / KB.length; + for (int i = 0; i < triggerCancelWriteCnt + 1; i++) { + out.write(KB); + } + assertTrue(out.length() > BYTES_TO_WRITE_BEFORE_CANCEL_CHECK); + m.setCancelled(true); + + for (int i = 0; i < triggerCancelWriteCnt - 1; i++) { + out.write(KB); + } + + long lastLength = out.length(); + assertThrows(InterruptedIOException.class, () -> { + out.write(1); + }); + assertEquals(lastLength, out.length()); + + assertThrows(InterruptedIOException.class, () -> { + out.write(new byte[1]); + }); + assertEquals(lastLength, out.length()); + } + + @Test + public void testTriggerCheckAfterSingleBytes() throws Exception { + CancelledTestMonitor m = new CancelledTestMonitor(); + CancellableDigestOutputStream out = new CancellableDigestOutputStream(m, + NullOutputStream.INSTANCE); + + byte[] bytes = new byte[BYTES_TO_WRITE_BEFORE_CANCEL_CHECK + 1]; + m.setCancelled(true); + + assertThrows(InterruptedIOException.class, () -> { + out.write(bytes); + }); + assertEquals(BYTES_TO_WRITE_BEFORE_CANCEL_CHECK, out.length()); + } +} diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/io/CancellableDigestOutputStream.java b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/io/CancellableDigestOutputStream.java new file mode 100644 index 000000000..ca2095fee --- /dev/null +++ b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/io/CancellableDigestOutputStream.java @@ -0,0 +1,124 @@ +/* + * Copyright (C) 2022, Tencent. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Distribution License v. 1.0 which is available at + * https://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +package org.eclipse.jgit.internal.storage.io; + +import org.eclipse.jgit.lib.Constants; +import org.eclipse.jgit.lib.ProgressMonitor; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.io.OutputStream; +import java.security.MessageDigest; + +/** + * An OutputStream that keeps a digest and checks every N bytes for + * cancellation. + */ +public class CancellableDigestOutputStream extends OutputStream { + + /** The OutputStream checks every this value for cancellation **/ + public static final int BYTES_TO_WRITE_BEFORE_CANCEL_CHECK = 128 * 1024; + + private final ProgressMonitor writeMonitor; + + private final OutputStream out; + + private final MessageDigest md = Constants.newMessageDigest(); + + private long count; + + private long checkCancelAt; + + /** + * Initialize a CancellableDigestOutputStream. + * + * @param writeMonitor + * monitor to update on output progress and check cancel. + * @param out + * target stream to receive all contents. + */ + public CancellableDigestOutputStream(ProgressMonitor writeMonitor, + OutputStream out) { + this.writeMonitor = writeMonitor; + this.out = out; + this.checkCancelAt = BYTES_TO_WRITE_BEFORE_CANCEL_CHECK; + } + + /** + * Get the monitor which is used to update on output progress and check + * cancel. + * + * @return the monitor + */ + public final ProgressMonitor getWriteMonitor() { + return writeMonitor; + } + + /** + * Obtain the current SHA-1 digest. + * + * @return SHA-1 digest + */ + public final byte[] getDigest() { + return md.digest(); + } + + /** + * Get total number of bytes written since stream start. + * + * @return total number of bytes written since stream start. + */ + public final long length() { + return count; + } + + /** {@inheritDoc} */ + @Override + public final void write(int b) throws IOException { + if (checkCancelAt <= count) { + if (writeMonitor.isCancelled()) { + throw new InterruptedIOException(); + } + checkCancelAt = count + BYTES_TO_WRITE_BEFORE_CANCEL_CHECK; + } + + out.write(b); + md.update((byte) b); + count++; + } + + /** {@inheritDoc} */ + @Override + public final void write(byte[] b, int off, int len) throws IOException { + while (0 < len) { + if (checkCancelAt <= count) { + if (writeMonitor.isCancelled()) { + throw new InterruptedIOException(); + } + checkCancelAt = count + BYTES_TO_WRITE_BEFORE_CANCEL_CHECK; + } + + int n = Math.min(len, BYTES_TO_WRITE_BEFORE_CANCEL_CHECK); + out.write(b, off, n); + md.update(b, off, n); + count += n; + + off += n; + len -= n; + } + } + + /** {@inheritDoc} */ + @Override + public void flush() throws IOException { + out.flush(); + } +} diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/PackOutputStream.java b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/PackOutputStream.java index 7104b9453..2d0fe28da 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/PackOutputStream.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/PackOutputStream.java @@ -17,10 +17,8 @@ import java.io.IOException; import java.io.OutputStream; -import java.security.MessageDigest; -import org.eclipse.jgit.internal.JGitText; -import org.eclipse.jgit.lib.Constants; +import org.eclipse.jgit.internal.storage.io.CancellableDigestOutputStream; import org.eclipse.jgit.lib.ProgressMonitor; import org.eclipse.jgit.util.NB; @@ -28,25 +26,14 @@ * Custom output stream to support * {@link org.eclipse.jgit.internal.storage.pack.PackWriter}. */ -public final class PackOutputStream extends OutputStream { - private static final int BYTES_TO_WRITE_BEFORE_CANCEL_CHECK = 128 * 1024; - - private final ProgressMonitor writeMonitor; - - private final OutputStream out; +public final class PackOutputStream extends CancellableDigestOutputStream { private final PackWriter packWriter; - private final MessageDigest md = Constants.newMessageDigest(); - - private long count; - private final byte[] headerBuffer = new byte[32]; private final byte[] copyBuffer = new byte[64 << 10]; - private long checkCancelAt; - private boolean ofsDelta; /** @@ -66,48 +53,8 @@ public final class PackOutputStream extends OutputStream { */ public PackOutputStream(final ProgressMonitor writeMonitor, final OutputStream out, final PackWriter pw) { - this.writeMonitor = writeMonitor; - this.out = out; + super(writeMonitor, out); this.packWriter = pw; - this.checkCancelAt = BYTES_TO_WRITE_BEFORE_CANCEL_CHECK; - } - - /** {@inheritDoc} */ - @Override - public final void write(int b) throws IOException { - count++; - out.write(b); - md.update((byte) b); - } - - /** {@inheritDoc} */ - @Override - public final void write(byte[] b, int off, int len) - throws IOException { - while (0 < len) { - final int n = Math.min(len, BYTES_TO_WRITE_BEFORE_CANCEL_CHECK); - count += n; - - if (checkCancelAt <= count) { - if (writeMonitor.isCancelled()) { - throw new IOException( - JGitText.get().packingCancelledDuringObjectsWriting); - } - checkCancelAt = count + BYTES_TO_WRITE_BEFORE_CANCEL_CHECK; - } - - out.write(b, off, n); - md.update(b, off, n); - - off += n; - len -= n; - } - } - - /** {@inheritDoc} */ - @Override - public void flush() throws IOException { - out.flush(); } final void writeFileHeader(int version, long objectCount) @@ -160,7 +107,7 @@ public final void writeHeader(ObjectToPack otp, long rawLength) ObjectToPack b = otp.getDeltaBase(); if (b != null && (b.isWritten() & ofsDelta)) { // Non-short-circuit logic is intentional int n = objectHeader(rawLength, OBJ_OFS_DELTA, headerBuffer); - n = ofsDelta(count - b.getOffset(), headerBuffer, n); + n = ofsDelta(length() - b.getOffset(), headerBuffer, n); write(headerBuffer, 0, n); } else if (otp.isDeltaRepresentation()) { int n = objectHeader(rawLength, OBJ_REF_DELTA, headerBuffer); @@ -209,20 +156,6 @@ public final byte[] getCopyBuffer() { } void endObject() { - writeMonitor.update(1); - } - - /** - * Get total number of bytes written since stream start. - * - * @return total number of bytes written since stream start. - */ - public final long length() { - return count; - } - - /** @return obtain the current SHA-1 digest. */ - final byte[] getDigest() { - return md.digest(); + getWriteMonitor().update(1); } }