Allow users to show server messages while pushing

Allow users to provide their OutputStream (via Transport#
push(monitor, refUpdates, out)) so that server messages can be written
to it (in SideBandInputStream) while they're coming in.

CQ: 7065
Bug: 398404
Change-Id: I670782784b38702d52bca98203909aca0496d1c0
Signed-off-by: Andre Dietisheim <andre.dietisheim@gmail.com>
Signed-off-by: Chris Aniszczyk <zx@twitter.com>
Signed-off-by: Matthias Sohn <matthias.sohn@sap.com>
This commit is contained in:
André Dietisheim 2013-02-01 15:58:44 +01:00 committed by Matthias Sohn
parent 8fcde4b31b
commit a31920555f
13 changed files with 311 additions and 38 deletions

View File

@ -47,6 +47,8 @@
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@ -180,4 +182,40 @@ public void testPush_CreateBranch() throws Exception {
+ "come back next year!\n", //
result.getMessages());
}
@Test
public void testPush_HookMessagesToOutputStream() throws Exception {
final TestRepository src = createTestRepository();
final RevBlob Q_txt = src.blob("new text");
final RevCommit Q = src.commit().add("Q", Q_txt).create();
final Repository db = src.getRepository();
final String dstName = Constants.R_HEADS + "new.branch";
Transport t;
PushResult result;
t = Transport.open(db, remoteURI);
OutputStream out = new ByteArrayOutputStream();
try {
final String srcExpr = Q.name();
final boolean forceUpdate = false;
final String localName = null;
final ObjectId oldId = null;
RemoteRefUpdate update = new RemoteRefUpdate(src.getRepository(),
srcExpr, dstName, forceUpdate, localName, oldId);
result = t.push(NullProgressMonitor.INSTANCE,
Collections.singleton(update), out);
} finally {
t.close();
}
String expectedMessage = "message line 1\n" //
+ "error: no soup for you!\n" //
+ "come back next year!\n";
assertEquals(expectedMessage, //
result.getMessages());
assertEquals(expectedMessage, out.toString());
}
}

View File

@ -48,6 +48,7 @@
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@ -431,6 +432,12 @@ public void close() {
// nothing here
}
public void push(ProgressMonitor monitor,
Map<String, RemoteRefUpdate> refsToUpdate, OutputStream out)
throws TransportException {
push(monitor, refsToUpdate);
}
public void push(ProgressMonitor monitor,
Map<String, RemoteRefUpdate> refsToUpdate)
throws TransportException {

View File

@ -43,6 +43,7 @@
package org.eclipse.jgit.api;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URISyntaxException;
import java.text.MessageFormat;
import java.util.ArrayList;
@ -92,6 +93,8 @@ public class PushCommand extends
private boolean thin = Transport.DEFAULT_PUSH_THIN;
private OutputStream out;
/**
* @param repo
*/
@ -150,7 +153,7 @@ public Iterable<PushResult> call() throws GitAPIException,
.findRemoteRefUpdatesFor(refSpecs);
try {
PushResult result = transport.push(monitor, toPush);
PushResult result = transport.push(monitor, toPush, out);
pushResults.add(result);
} catch (TransportException e) {
@ -404,4 +407,16 @@ public PushCommand setForce(boolean force) {
this.force = force;
return this;
}
/**
* Sets the output stream to write sideband messages to
*
* @param out
* @return {@code this}
* @since 3.0
*/
public PushCommand setOutputStream(OutputStream out) {
this.out = out;
return this;
}
}

View File

@ -45,6 +45,7 @@
package org.eclipse.jgit.transport;
import java.io.OutputStream;
import java.util.Collection;
import java.util.Set;
@ -66,6 +67,12 @@ abstract class BaseFetchConnection extends BaseConnection implements
public final void fetch(final ProgressMonitor monitor,
final Collection<Ref> want, final Set<ObjectId> have)
throws TransportException {
fetch(monitor, want, have, null);
}
public final void fetch(final ProgressMonitor monitor,
final Collection<Ref> want, final Set<ObjectId> have,
OutputStream out) throws TransportException {
markStartedOperation();
doFetch(monitor, want, have);
}

View File

@ -47,6 +47,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.text.MessageFormat;
import java.util.Collection;
import java.util.Collections;
@ -59,13 +60,13 @@
import org.eclipse.jgit.internal.storage.file.PackLock;
import org.eclipse.jgit.lib.AnyObjectId;
import org.eclipse.jgit.lib.Config;
import org.eclipse.jgit.lib.Config.SectionParser;
import org.eclipse.jgit.lib.Constants;
import org.eclipse.jgit.lib.MutableObjectId;
import org.eclipse.jgit.lib.ObjectId;
import org.eclipse.jgit.lib.ObjectInserter;
import org.eclipse.jgit.lib.ProgressMonitor;
import org.eclipse.jgit.lib.Ref;
import org.eclipse.jgit.lib.Config.SectionParser;
import org.eclipse.jgit.revwalk.RevCommit;
import org.eclipse.jgit.revwalk.RevCommitList;
import org.eclipse.jgit.revwalk.RevFlag;
@ -265,8 +266,17 @@ public FetchConfig parse(final Config cfg) {
public final void fetch(final ProgressMonitor monitor,
final Collection<Ref> want, final Set<ObjectId> have)
throws TransportException {
fetch(monitor, want, have, null);
}
/**
* @since 3.0
*/
public final void fetch(final ProgressMonitor monitor,
final Collection<Ref> want, final Set<ObjectId> have,
OutputStream outputStream) throws TransportException {
markStartedOperation();
doFetch(monitor, want, have);
doFetch(monitor, want, have, outputStream);
}
public boolean didFetchIncludeTags() {
@ -298,12 +308,15 @@ public Collection<PackLock> getPackLocks() {
* additional objects to assume that already exist locally. This
* will be added to the set of objects reachable from the
* destination repository's references.
* @param outputStream
* ouputStream to write sideband messages to
* @throws TransportException
* if any exception occurs.
* @since 3.0
*/
protected void doFetch(final ProgressMonitor monitor,
final Collection<Ref> want, final Set<ObjectId> have)
throws TransportException {
final Collection<Ref> want, final Set<ObjectId> have,
OutputStream outputStream) throws TransportException {
try {
markRefsAdvertised();
markReachable(have, maxTimeWanted(want));
@ -321,7 +334,7 @@ protected void doFetch(final ProgressMonitor monitor,
state = null;
pckState = null;
receivePack(monitor);
receivePack(monitor, outputStream);
}
} catch (CancelledException ce) {
close();
@ -702,11 +715,13 @@ private void markCommon(final RevObject obj, final AckNackResult anr)
((RevCommit) obj).carry(COMMON);
}
private void receivePack(final ProgressMonitor monitor) throws IOException {
private void receivePack(final ProgressMonitor monitor,
OutputStream outputStream) throws IOException {
onReceivePack();
InputStream input = in;
if (sideband)
input = new SideBandInputStream(input, monitor, getMessageWriter());
input = new SideBandInputStream(input, monitor, getMessageWriter(),
outputStream);
ObjectInserter ins = local.newObjectInserter();
try {

View File

@ -45,6 +45,7 @@
package org.eclipse.jgit.transport;
import java.io.IOException;
import java.io.OutputStream;
import java.text.MessageFormat;
import java.util.Collection;
import java.util.HashSet;
@ -138,8 +139,17 @@ public BasePackPushConnection(final PackTransport packTransport) {
public void push(final ProgressMonitor monitor,
final Map<String, RemoteRefUpdate> refUpdates)
throws TransportException {
push(monitor, refUpdates, null);
}
/**
* @since 3.0
*/
public void push(final ProgressMonitor monitor,
final Map<String, RemoteRefUpdate> refUpdates, OutputStream outputStream)
throws TransportException {
markStartedOperation();
doPush(monitor, refUpdates);
doPush(monitor, refUpdates, outputStream);
}
@Override
@ -172,14 +182,17 @@ protected TransportException noRepository() {
* progress monitor to receive status updates.
* @param refUpdates
* update commands to be applied to the remote repository.
* @param outputStream
* output stream to write sideband messages to
* @throws TransportException
* if any exception occurs.
* @since 3.0
*/
protected void doPush(final ProgressMonitor monitor,
final Map<String, RemoteRefUpdate> refUpdates)
throws TransportException {
final Map<String, RemoteRefUpdate> refUpdates,
OutputStream outputStream) throws TransportException {
try {
writeCommands(refUpdates.values(), monitor);
writeCommands(refUpdates.values(), monitor, outputStream);
if (writePack)
writePack(refUpdates, monitor);
if (sentCommand) {
@ -208,8 +221,8 @@ protected void doPush(final ProgressMonitor monitor,
}
private void writeCommands(final Collection<RemoteRefUpdate> refUpdates,
final ProgressMonitor monitor) throws IOException {
final String capabilities = enableCapabilities(monitor);
final ProgressMonitor monitor, OutputStream outputStream) throws IOException {
final String capabilities = enableCapabilities(monitor, outputStream);
for (final RemoteRefUpdate rru : refUpdates) {
if (!capableDeleteRefs && rru.isDelete()) {
rru.setStatus(Status.REJECTED_NODELETE);
@ -242,7 +255,8 @@ private void writeCommands(final Collection<RemoteRefUpdate> refUpdates,
outNeedsEnd = false;
}
private String enableCapabilities(final ProgressMonitor monitor) {
private String enableCapabilities(final ProgressMonitor monitor,
OutputStream outputStream) {
final StringBuilder line = new StringBuilder();
capableReport = wantCapability(line, CAPABILITY_REPORT_STATUS);
capableDeleteRefs = wantCapability(line, CAPABILITY_DELETE_REFS);
@ -250,7 +264,8 @@ private String enableCapabilities(final ProgressMonitor monitor) {
capableSideBand = wantCapability(line, CAPABILITY_SIDE_BAND_64K);
if (capableSideBand) {
in = new SideBandInputStream(in, monitor, getMessageWriter());
in = new SideBandInputStream(in, monitor, getMessageWriter(),
outputStream);
pckIn = new PacketLineIn(in);
}

View File

@ -46,6 +46,7 @@
package org.eclipse.jgit.transport;
import java.io.OutputStream;
import java.util.Collection;
import java.util.Set;
@ -111,6 +112,47 @@ public void fetch(final ProgressMonitor monitor,
final Collection<Ref> want, final Set<ObjectId> have)
throws TransportException;
/**
* Fetch objects we don't have but that are reachable from advertised refs.
* <p>
* Only one call per connection is allowed. Subsequent calls will result in
* {@link TransportException}.
* </p>
* <p>
* Implementations are free to use network connections as necessary to
* efficiently (for both client and server) transfer objects from the remote
* repository into this repository. When possible implementations should
* avoid replacing/overwriting/duplicating an object already available in
* the local destination repository. Locally available objects and packs
* should always be preferred over remotely available objects and packs.
* {@link Transport#isFetchThin()} should be honored if applicable.
* </p>
*
* @param monitor
* progress monitor to inform the end-user about the amount of
* work completed, or to indicate cancellation. Implementations
* should poll the monitor at regular intervals to look for
* cancellation requests from the user.
* @param want
* one or more refs advertised by this connection that the caller
* wants to store locally.
* @param have
* additional objects known to exist in the destination
* repository, especially if they aren't yet reachable by the ref
* database. Connections should take this set as an addition to
* what is reachable through all Refs, not in replace of it.
* @param out
* OutputStream to write sideband messages to
* @throws TransportException
* objects could not be copied due to a network failure,
* protocol error, or error on remote side, or connection was
* already used for fetch.
* @since 3.0
*/
public void fetch(final ProgressMonitor monitor,
final Collection<Ref> want, final Set<ObjectId> have,
OutputStream out) throws TransportException;
/**
* Did the last {@link #fetch(ProgressMonitor, Collection, Set)} get tags?
* <p>

View File

@ -44,6 +44,7 @@
package org.eclipse.jgit.transport;
import java.io.OutputStream;
import java.util.Map;
import org.eclipse.jgit.errors.TransportException;
@ -111,4 +112,50 @@ public interface PushConnection extends Connection {
public void push(final ProgressMonitor monitor,
final Map<String, RemoteRefUpdate> refUpdates)
throws TransportException;
/**
* Pushes to the remote repository basing on provided specification. This
* possibly result in update/creation/deletion of refs on remote repository
* and sending objects that remote repository need to have a consistent
* objects graph from new refs.
* <p>
* <p>
* Only one call per connection is allowed. Subsequent calls will result in
* {@link TransportException}.
* </p>
* <p>
* Implementation may use local repository to send a minimum set of objects
* needed by remote repository in efficient way.
* {@link Transport#isPushThin()} should be honored if applicable.
* refUpdates should be filled with information about status of each update.
* </p>
*
* @param monitor
* progress monitor to update the end-user about the amount of
* work completed, or to indicate cancellation. Implementors
* should poll the monitor at regular intervals to look for
* cancellation requests from the user.
* @param refUpdates
* map of remote refnames to remote refs update
* specifications/statuses. Can't be empty. This indicate what
* refs caller want to update on remote side. Only refs updates
* with {@link Status#NOT_ATTEMPTED} should passed.
* Implementation must ensure that and appropriate status with
* optional message should be set during call. No refUpdate with
* {@link Status#AWAITING_REPORT} or {@link Status#NOT_ATTEMPTED}
* can be leaved by implementation after return from this call.
* @param out
* output stream to write sideband messages to
* @throws TransportException
* objects could not be copied due to a network failure,
* critical protocol error, or error on remote side, or
* connection was already used for push - new connection must be
* created. Non-critical errors concerning only isolated refs
* should be placed in refUpdates.
* @since 3.0
*/
public void push(final ProgressMonitor monitor,
final Map<String, RemoteRefUpdate> refUpdates, OutputStream out)
throws TransportException;
}

View File

@ -44,6 +44,7 @@
package org.eclipse.jgit.transport;
import java.io.IOException;
import java.io.OutputStream;
import java.text.MessageFormat;
import java.util.Collection;
import java.util.HashMap;
@ -64,7 +65,7 @@
/**
* Class performing push operation on remote repository.
*
* @see Transport#push(ProgressMonitor, Collection)
* @see Transport#push(ProgressMonitor, Collection, OutputStream)
*/
class PushProcess {
/** Task name for {@link ProgressMonitor} used during opening connection. */
@ -82,6 +83,9 @@ class PushProcess {
/** Revision walker for checking some updates properties. */
private final RevWalk walker;
/** an outputstream to write messages to */
private final OutputStream out;
/**
* Create process for specified transport and refs updates specification.
*
@ -90,13 +94,33 @@ class PushProcess {
* connection.
* @param toPush
* specification of refs updates (and local tracking branches).
*
* @throws TransportException
*/
PushProcess(final Transport transport,
final Collection<RemoteRefUpdate> toPush) throws TransportException {
this(transport, toPush, null);
}
/**
* Create process for specified transport and refs updates specification.
*
* @param transport
* transport between remote and local repository, used to create
* connection.
* @param toPush
* specification of refs updates (and local tracking branches).
* @param out
* OutputStream to write messages to
* @throws TransportException
*/
PushProcess(final Transport transport,
final Collection<RemoteRefUpdate> toPush, OutputStream out)
throws TransportException {
this.walker = new RevWalk(transport.local);
this.transport = transport;
this.toPush = new HashMap<String, RemoteRefUpdate>();
this.out = out;
for (final RemoteRefUpdate rru : toPush) {
if (this.toPush.put(rru.getRemoteName(), rru) != null)
throw new TransportException(MessageFormat.format(
@ -138,7 +162,7 @@ PushResult execute(final ProgressMonitor monitor)
if (transport.isDryRun())
modifyUpdatesForDryRun();
else if (!preprocessed.isEmpty())
connection.push(monitor, preprocessed);
connection.push(monitor, preprocessed, out);
} finally {
connection.close();
res.addMessages(connection.getMessages());

View File

@ -48,6 +48,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Writer;
import java.text.MessageFormat;
import java.util.regex.Matcher;
@ -99,6 +100,8 @@ class SideBandInputStream extends InputStream {
private final Writer messages;
private final OutputStream out;
private String progressBuffer = ""; //$NON-NLS-1$
private String currentTask;
@ -112,12 +115,13 @@ class SideBandInputStream extends InputStream {
private int available;
SideBandInputStream(final InputStream in, final ProgressMonitor progress,
final Writer messageStream) {
final Writer messageStream, OutputStream outputStream) {
rawIn = in;
pckIn = new PacketLineIn(rawIn);
monitor = progress;
messages = messageStream;
currentTask = ""; //$NON-NLS-1$
out = outputStream;
}
@Override
@ -232,6 +236,8 @@ private void doProgressLine(final String msg) throws IOException {
}
messages.write(msg);
if (out != null)
out.write(msg.getBytes());
}
private void beginTask(final int totalWorkUnits) {

View File

@ -50,6 +50,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.lang.ref.WeakReference;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
@ -1134,6 +1135,68 @@ public FetchResult fetch(final ProgressMonitor monitor,
* converted by {@link #findRemoteRefUpdatesFor(Collection)}. No
* more than 1 RemoteRefUpdate with the same remoteName is
* allowed. These objects are modified during this call.
* @param out
* output stream to write messages to
* @return information about results of remote refs updates, tracking refs
* updates and refs advertised by remote repository.
* @throws NotSupportedException
* this transport implementation does not support pushing
* objects.
* @throws TransportException
* the remote connection could not be established or object
* copying (if necessary) failed at I/O or protocol level or
* update specification was incorrect.
* @since 3.0
*/
public PushResult push(final ProgressMonitor monitor,
Collection<RemoteRefUpdate> toPush, OutputStream out)
throws NotSupportedException,
TransportException {
if (toPush == null || toPush.isEmpty()) {
// If the caller did not ask for anything use the defaults.
try {
toPush = findRemoteRefUpdatesFor(push);
} catch (final IOException e) {
throw new TransportException(MessageFormat.format(
JGitText.get().problemWithResolvingPushRefSpecsLocally, e.getMessage()), e);
}
if (toPush.isEmpty())
throw new TransportException(JGitText.get().nothingToPush);
}
final PushProcess pushProcess = new PushProcess(this, toPush, out);
return pushProcess.execute(monitor);
}
/**
* Push objects and refs from the local repository to the remote one.
* <p>
* This is a utility function providing standard push behavior. It updates
* remote refs and sends necessary objects according to remote ref update
* specification. After successful remote ref update, associated locally
* stored tracking branch is updated if set up accordingly. Detailed
* operation result is provided after execution.
* <p>
* For setting up remote ref update specification from ref spec, see helper
* method {@link #findRemoteRefUpdatesFor(Collection)}, predefined refspecs
* ({@link #REFSPEC_TAGS}, {@link #REFSPEC_PUSH_ALL}) or consider using
* directly {@link RemoteRefUpdate} for more possibilities.
* <p>
* When {@link #isDryRun()} is true, result of this operation is just
* estimation of real operation result, no real action is performed.
*
* @see RemoteRefUpdate
*
* @param monitor
* progress monitor to inform the user about our processing
* activity. Must not be null. Use {@link NullProgressMonitor} if
* progress updates are not interesting or necessary.
* @param toPush
* specification of refs to push. May be null or the empty
* collection to use the specifications from the RemoteConfig
* converted by {@link #findRemoteRefUpdatesFor(Collection)}. No
* more than 1 RemoteRefUpdate with the same remoteName is
* allowed. These objects are modified during this call.
*
* @return information about results of remote refs updates, tracking refs
* updates and refs advertised by remote repository.
* @throws NotSupportedException
@ -1147,19 +1210,7 @@ public FetchResult fetch(final ProgressMonitor monitor,
public PushResult push(final ProgressMonitor monitor,
Collection<RemoteRefUpdate> toPush) throws NotSupportedException,
TransportException {
if (toPush == null || toPush.isEmpty()) {
// If the caller did not ask for anything use the defaults.
try {
toPush = findRemoteRefUpdatesFor(push);
} catch (final IOException e) {
throw new TransportException(MessageFormat.format(
JGitText.get().problemWithResolvingPushRefSpecsLocally, e.getMessage()), e);
}
if (toPush.isEmpty())
throw new TransportException(JGitText.get().nothingToPush);
}
final PushProcess pushProcess = new PushProcess(this, toPush);
return pushProcess.execute(monitor);
return push(monitor, toPush, null);
}
/**

View File

@ -739,12 +739,12 @@ class SmartHttpFetchConnection extends BasePackFetchConnection {
@Override
protected void doFetch(final ProgressMonitor monitor,
final Collection<Ref> want, final Set<ObjectId> have)
throws TransportException {
final Collection<Ref> want, final Set<ObjectId> have,
final OutputStream outputStream) throws TransportException {
try {
svc = new MultiRequestService(SVC_UPLOAD_PACK);
init(svc.getInputStream(), svc.getOutputStream());
super.doFetch(monitor, want, have);
super.doFetch(monitor, want, have, outputStream);
} finally {
svc = null;
}
@ -768,11 +768,11 @@ class SmartHttpPushConnection extends BasePackPushConnection {
}
protected void doPush(final ProgressMonitor monitor,
final Map<String, RemoteRefUpdate> refUpdates)
throws TransportException {
final Map<String, RemoteRefUpdate> refUpdates,
OutputStream outputStream) throws TransportException {
final Service svc = new MultiRequestService(SVC_RECEIVE_PACK);
init(svc.getInputStream(), svc.getOutputStream());
super.doPush(monitor, refUpdates);
super.doPush(monitor, refUpdates, outputStream);
}
}

View File

@ -137,6 +137,12 @@ class WalkPushConnection extends BaseConnection implements PushConnection {
public void push(final ProgressMonitor monitor,
final Map<String, RemoteRefUpdate> refUpdates)
throws TransportException {
push(monitor, refUpdates, null);
}
public void push(final ProgressMonitor monitor,
final Map<String, RemoteRefUpdate> refUpdates, OutputStream out)
throws TransportException {
markStartedOperation();
packNames = null;
newRefs = new TreeMap<String, Ref>(getRefsMap());