From 2156aa894cefbabd322fc405138c306bb4e939cd Mon Sep 17 00:00:00 2001 From: "Shawn O. Pearce" Date: Thu, 11 Feb 2010 18:10:45 -0800 Subject: [PATCH] Reduce multi-level buffered streams in transport code Some transports actually provide stream buffering on their own, without needing to be wrapped up inside of a BufferedInputStream in order to smooth out system calls to read or write. A great example of this is the JSch SSH client, or the Apache MINA SSHD server. Both use custom buffering to packetize the streams into the encrypted SSH channel, and wrapping them up inside of a BufferedInputStream or BufferedOutputStream is relatively pointless. Our SideBandOutputStream implementation also provides some fairly large buffering, equal to one complete side-band packet on the main data channel. Wrapping that inside of a BufferedOutputStream just to smooth out small writes from PackWriter causes extra data copies, and provides no advantage. We can save some memory and some CPU cycles by letting PackWriter dump directly into the SideBandOutputStream's internal buffer array. Instead we push the buffering streams down to be as close to the network socket (or operating system pipe) as possible. This allows us to smooth out the smaller reads/writes from pkt-line messages during advertisement and negotation, but avoid copying altogether when the stream switches to larger writes over a side band channel. Change-Id: I2f6f16caee64783c77d3dd1b2a41b3cc0c64c159 Signed-off-by: Shawn O. Pearce --- .../eclipse/jgit/junit/TestRepository.java | 8 ++++--- .../jgit/lib/ConcurrentRepackTest.java | 10 ++++---- .../src/org/eclipse/jgit/lib/PackWriter.java | 12 +++------- .../jgit/transport/BasePackConnection.java | 24 ++++++++++++------- .../transport/BasePackPushConnection.java | 1 + .../eclipse/jgit/transport/BundleWriter.java | 12 ++++------ .../jgit/transport/TransportGitAnon.java | 20 ++++++++++++++-- .../jgit/transport/TransportLocal.java | 18 ++++++++++---- .../eclipse/jgit/transport/UploadPack.java | 7 ++---- .../jgit/transport/WalkPushConnection.java | 3 +++ 10 files changed, 71 insertions(+), 44 deletions(-) diff --git a/org.eclipse.jgit.junit/src/org/eclipse/jgit/junit/TestRepository.java b/org.eclipse.jgit.junit/src/org/eclipse/jgit/junit/TestRepository.java index e738276bd..59504aa78 100644 --- a/org.eclipse.jgit.junit/src/org/eclipse/jgit/junit/TestRepository.java +++ b/org.eclipse.jgit.junit/src/org/eclipse/jgit/junit/TestRepository.java @@ -43,9 +43,11 @@ package org.eclipse.jgit.junit; +import java.io.BufferedOutputStream; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.io.OutputStream; import java.security.MessageDigest; import java.util.ArrayList; import java.util.Collections; @@ -570,10 +572,10 @@ public void packAndPrune() throws Exception { pw.preparePack(all, Collections. emptySet()); final ObjectId name = pw.computeName(); - FileOutputStream out; + OutputStream out; final File pack = nameFor(odb, name, ".pack"); - out = new FileOutputStream(pack); + out = new BufferedOutputStream(new FileOutputStream(pack)); try { pw.writePack(out); } finally { @@ -582,7 +584,7 @@ public void packAndPrune() throws Exception { pack.setReadOnly(); final File idx = nameFor(odb, name, ".idx"); - out = new FileOutputStream(idx); + out = new BufferedOutputStream(new FileOutputStream(idx)); try { pw.writeIndex(out); } finally { diff --git a/org.eclipse.jgit.test/tst/org/eclipse/jgit/lib/ConcurrentRepackTest.java b/org.eclipse.jgit.test/tst/org/eclipse/jgit/lib/ConcurrentRepackTest.java index 9e83aa0e1..69430ed33 100644 --- a/org.eclipse.jgit.test/tst/org/eclipse/jgit/lib/ConcurrentRepackTest.java +++ b/org.eclipse.jgit.test/tst/org/eclipse/jgit/lib/ConcurrentRepackTest.java @@ -1,5 +1,5 @@ /* - * Copyright (C) 2009, Google Inc. + * Copyright (C) 2009-2010, Google Inc. * Copyright (C) 2009, Robin Rosenberg * and other copyright owners as documented in the project's IP log. * @@ -44,9 +44,11 @@ package org.eclipse.jgit.lib; +import java.io.BufferedOutputStream; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.io.OutputStream; import java.util.Arrays; import org.eclipse.jgit.errors.IncorrectObjectTypeException; @@ -203,16 +205,16 @@ private File[] pack(final Repository src, final RevObject... list) private static void write(final File[] files, final PackWriter pw) throws IOException { final long begin = files[0].getParentFile().lastModified(); - FileOutputStream out; + OutputStream out; - out = new FileOutputStream(files[0]); + out = new BufferedOutputStream(new FileOutputStream(files[0])); try { pw.writePack(out); } finally { out.close(); } - out = new FileOutputStream(files[1]); + out = new BufferedOutputStream(new FileOutputStream(files[1])); try { pw.writeIndex(out); } finally { diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/lib/PackWriter.java b/org.eclipse.jgit/src/org/eclipse/jgit/lib/PackWriter.java index 6162deab7..b30e5f7c2 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/lib/PackWriter.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/lib/PackWriter.java @@ -1,5 +1,5 @@ /* - * Copyright (C) 2008-2009, Google Inc. + * Copyright (C) 2008-2010, Google Inc. * Copyright (C) 2008, Marek Zawirski * and other copyright owners as documented in the project's IP log. * @@ -44,7 +44,6 @@ package org.eclipse.jgit.lib; -import java.io.BufferedOutputStream; import java.io.IOException; import java.io.OutputStream; import java.security.MessageDigest; @@ -97,7 +96,6 @@ * undefined behavior. *

*/ - public class PackWriter { /** * Title of {@link ProgressMonitor} task used during counting objects to @@ -578,9 +576,8 @@ private List sortByName() { *

* * @param packStream - * output stream of pack data. If the stream is not buffered it - * will be buffered by the writer. Caller is responsible for - * closing the stream. + * output stream of pack data. The stream should be buffered by + * the caller. The caller is responsible for closing the stream. * @throws IOException * an error occurred reading a local object's data to include in * the pack, or writing compressed object data to the output @@ -590,8 +587,6 @@ public void writePack(OutputStream packStream) throws IOException { if (reuseDeltas || reuseObjects) searchForReuse(); - if (!(packStream instanceof BufferedOutputStream)) - packStream = new BufferedOutputStream(packStream); out = new PackOutputStream(packStream); writeMonitor.beginTask(WRITING_OBJECTS_PROGRESS, getObjectsNumber()); @@ -599,7 +594,6 @@ public void writePack(OutputStream packStream) throws IOException { writeObjects(); writeChecksum(); - out.flush(); windowCursor.release(); writeMonitor.endTask(); } diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/transport/BasePackConnection.java b/org.eclipse.jgit/src/org/eclipse/jgit/transport/BasePackConnection.java index 0411c61fa..a2c572c60 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/transport/BasePackConnection.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/transport/BasePackConnection.java @@ -1,5 +1,4 @@ /* - * Copyright (C) 2009, Constantine Plotnikov * Copyright (C) 2008-2010, Google Inc. * Copyright (C) 2008, Marek Zawirski * Copyright (C) 2008, Robin Rosenberg @@ -47,8 +46,6 @@ package org.eclipse.jgit.transport; -import java.io.BufferedInputStream; -import java.io.BufferedOutputStream; import java.io.EOFException; import java.io.IOException; import java.io.InputStream; @@ -97,10 +94,10 @@ abstract class BasePackConnection extends BaseConnection { /** Timer to manage {@link #timeoutIn} and {@link #timeoutOut}. */ private InterruptTimer myTimer; - /** Buffered input stream reading from the remote. */ + /** Input stream reading from the remote. */ protected InputStream in; - /** Buffered output stream sending to the remote. */ + /** Output stream sending to the remote. */ protected OutputStream out; /** Packet line decoder around {@link #in}. */ @@ -127,6 +124,17 @@ abstract class BasePackConnection extends BaseConnection { uri = transport.uri; } + /** + * Configure this connection with the directional pipes. + * + * @param myIn + * input stream to receive data from the peer. Caller must ensure + * the input is buffered, otherwise read performance may suffer. + * @param myOut + * output stream to transmit data to the peer. Caller must ensure + * the output is buffered, otherwise write performance may + * suffer. + */ protected final void init(InputStream myIn, OutputStream myOut) { final int timeout = transport.getTimeout(); if (timeout > 0) { @@ -140,10 +148,8 @@ protected final void init(InputStream myIn, OutputStream myOut) { myOut = timeoutOut; } - in = myIn instanceof BufferedInputStream ? myIn - : new BufferedInputStream(myIn, IndexPack.BUFFER_SIZE); - out = myOut instanceof BufferedOutputStream ? myOut - : new BufferedOutputStream(myOut); + in = myIn; + out = myOut; pckIn = new PacketLineIn(in); pckOut = new PacketLineOut(out); diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/transport/BasePackPushConnection.java b/org.eclipse.jgit/src/org/eclipse/jgit/transport/BasePackPushConnection.java index ba1170747..e10cefd3a 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/transport/BasePackPushConnection.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/transport/BasePackPushConnection.java @@ -244,6 +244,7 @@ private void writePack(final Map refUpdates, writer.preparePack(newObjects, remoteObjects); final long start = System.currentTimeMillis(); writer.writePack(out); + out.flush(); packTransferTime = System.currentTimeMillis() - start; } diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/transport/BundleWriter.java b/org.eclipse.jgit/src/org/eclipse/jgit/transport/BundleWriter.java index db1312ca3..7b0a5eec4 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/transport/BundleWriter.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/transport/BundleWriter.java @@ -1,5 +1,5 @@ /* - * Copyright (C) 2008-2009, Google Inc. + * Copyright (C) 2008-2010, Google Inc. * and other copyright owners as documented in the project's IP log. * * This program and the accompanying materials are made available @@ -43,7 +43,6 @@ package org.eclipse.jgit.transport; -import java.io.BufferedOutputStream; import java.io.IOException; import java.io.OutputStream; import java.io.OutputStreamWriter; @@ -155,18 +154,15 @@ public void assume(final RevCommit c) { * This method can only be called once per BundleWriter instance. * * @param os - * the stream the bundle is written to. If the stream is not - * buffered it will be buffered by the writer. Caller is - * responsible for closing the stream. + * the stream the bundle is written to. The stream should be + * buffered by the caller. The caller is responsible for closing + * the stream. * @throws IOException * an error occurred reading a local object's data to include in * the bundle, or writing compressed object data to the output * stream. */ public void writeBundle(OutputStream os) throws IOException { - if (!(os instanceof BufferedOutputStream)) - os = new BufferedOutputStream(os); - final HashSet inc = new HashSet(); final HashSet exc = new HashSet(); inc.addAll(include.values()); diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/transport/TransportGitAnon.java b/org.eclipse.jgit/src/org/eclipse/jgit/transport/TransportGitAnon.java index a127ff50a..8a0b4357c 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/transport/TransportGitAnon.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/transport/TransportGitAnon.java @@ -45,7 +45,11 @@ package org.eclipse.jgit.transport; +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.net.ConnectException; import java.net.InetAddress; import java.net.InetSocketAddress; @@ -136,7 +140,13 @@ class TcpFetchConnection extends BasePackFetchConnection { super(TransportGitAnon.this); sock = openConnection(); try { - init(sock.getInputStream(), sock.getOutputStream()); + InputStream sIn = sock.getInputStream(); + OutputStream sOut = sock.getOutputStream(); + + sIn = new BufferedInputStream(sIn); + sOut = new BufferedOutputStream(sOut); + + init(sIn, sOut); service("git-upload-pack", pckOut); } catch (IOException err) { close(); @@ -169,7 +179,13 @@ class TcpPushConnection extends BasePackPushConnection { super(TransportGitAnon.this); sock = openConnection(); try { - init(sock.getInputStream(), sock.getOutputStream()); + InputStream sIn = sock.getInputStream(); + OutputStream sOut = sock.getOutputStream(); + + sIn = new BufferedInputStream(sIn); + sOut = new BufferedOutputStream(sOut); + + init(sIn, sOut); service("git-receive-pack", pckOut); } catch (IOException err) { close(); diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/transport/TransportLocal.java b/org.eclipse.jgit/src/org/eclipse/jgit/transport/TransportLocal.java index a9bdcd809..b9b9dbd00 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/transport/TransportLocal.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/transport/TransportLocal.java @@ -47,6 +47,8 @@ package org.eclipse.jgit.transport; +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; import java.io.File; import java.io.IOException; import java.io.InputStream; @@ -259,8 +261,12 @@ class ForkLocalFetchConnection extends BasePackFetchConnection { errorReaderThread = new StreamCopyThread(upErr, msg.getRawStream()); errorReaderThread.start(); - final InputStream upIn = uploadPack.getInputStream(); - final OutputStream upOut = uploadPack.getOutputStream(); + InputStream upIn = uploadPack.getInputStream(); + OutputStream upOut = uploadPack.getOutputStream(); + + upIn = new BufferedInputStream(upIn); + upOut = new BufferedOutputStream(upOut); + init(upIn, upOut); readAdvertisedRefs(); } @@ -385,8 +391,12 @@ class ForkLocalPushConnection extends BasePackPushConnection { errorReaderThread = new StreamCopyThread(rpErr, msg.getRawStream()); errorReaderThread.start(); - final InputStream rpIn = receivePack.getInputStream(); - final OutputStream rpOut = receivePack.getOutputStream(); + InputStream rpIn = receivePack.getInputStream(); + OutputStream rpOut = receivePack.getOutputStream(); + + rpIn = new BufferedInputStream(rpIn); + rpOut = new BufferedOutputStream(rpOut); + init(rpIn, rpOut); readAdvertisedRefs(); } diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/transport/UploadPack.java b/org.eclipse.jgit/src/org/eclipse/jgit/transport/UploadPack.java index 39c4243ba..3d5abd34b 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/transport/UploadPack.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/transport/UploadPack.java @@ -583,12 +583,9 @@ private void sendPack() throws IOException { } } pw.writePack(packOut); + packOut.flush(); - if (sideband) { - packOut.flush(); + if (sideband) pckOut.end(); - } else { - rawOut.flush(); - } } } diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/transport/WalkPushConnection.java b/org.eclipse.jgit/src/org/eclipse/jgit/transport/WalkPushConnection.java index 88b7ca438..f977915bb 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/transport/WalkPushConnection.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/transport/WalkPushConnection.java @@ -45,6 +45,7 @@ import static org.eclipse.jgit.transport.WalkRemoteObjectDatabase.ROOT_DIR; +import java.io.BufferedOutputStream; import java.io.IOException; import java.io.OutputStream; import java.util.ArrayList; @@ -251,6 +252,7 @@ private void sendpack(final List updates, final String wt = "Put " + base.substring(0, 12); OutputStream os = dest.writeFile(pathPack, monitor, wt + "..pack"); try { + os = new BufferedOutputStream(os); pw.writePack(os); } finally { os.close(); @@ -258,6 +260,7 @@ private void sendpack(final List updates, os = dest.writeFile(pathIdx, monitor, wt + "..idx"); try { + os = new BufferedOutputStream(os); pw.writeIndex(os); } finally { os.close();