client side smart HTTP

During fetch over http:// clients now try to take advantage of
the info/refs?service=git-upload-pack URL to determine if the
remote side will support a standard upload-pack command stream.
If so each block of 32 have lines is sent in one POST request,
prefixed by all of the 'want' lines and any previously discovered
common bases as 'have' lines.

During push over http:// clients now try to take advantage of
the info/refs?service=git-receive-pack URL to determine if the
remote side will support a standard receive-pack command stream.
If so, commands are sent along with their pack in a single HTTP
POST request.

Bug: 291002
Change-Id: I8c69b16ac15c442e1a4c3bd60b4ea1a47882b851
Signed-off-by: Shawn O. Pearce <spearce@spearce.org>
This commit is contained in:
Shawn O. Pearce 2010-01-06 10:21:05 -08:00
parent 2e5214462e
commit 8c836c6f21
5 changed files with 606 additions and 55 deletions

View File

@ -1,6 +1,6 @@
/*
* Copyright (C) 2009, Constantine Plotnikov <constantine.plotnikov@gmail.com>
* Copyright (C) 2008-2009, Google Inc.
* Copyright (C) 2008-2010, Google Inc.
* Copyright (C) 2008, Marek Zawirski <marek.zawirski@gmail.com>
* Copyright (C) 2008, Robin Rosenberg <robin.rosenberg@dewire.com>
* Copyright (C) 2008, Shawn O. Pearce <spearce@spearce.org>
@ -110,6 +110,9 @@ abstract class BasePackConnection extends BaseConnection {
/** Send {@link PacketLineOut#end()} before closing {@link #out}? */
protected boolean outNeedsEnd;
/** True if this is a stateless RPC connection. */
protected boolean statelessRPC;
/** Capability tokens advertised by the remote side. */
private final Set<String> remoteCapablities = new HashSet<String>();
@ -117,7 +120,7 @@ abstract class BasePackConnection extends BaseConnection {
protected final Set<ObjectId> additionalHaves = new HashSet<ObjectId>();
BasePackConnection(final PackTransport packTransport) {
transport = (Transport)packTransport;
transport = (Transport) packTransport;
local = transport.local;
uri = transport.uri;
}

View File

@ -1,5 +1,5 @@
/*
* Copyright (C) 2008-2009, Google Inc.
* Copyright (C) 2008-2010, Google Inc.
* Copyright (C) 2008, Robin Rosenberg <robin.rosenberg@dewire.com>
* Copyright (C) 2008, Shawn O. Pearce <spearce@spearce.org>
* and other copyright owners as documented in the project's IP log.
@ -51,9 +51,11 @@
import java.util.Date;
import java.util.Set;
import org.eclipse.jgit.errors.PackProtocolException;
import org.eclipse.jgit.errors.TransportException;
import org.eclipse.jgit.lib.AnyObjectId;
import org.eclipse.jgit.lib.Config;
import org.eclipse.jgit.lib.Constants;
import org.eclipse.jgit.lib.MutableObjectId;
import org.eclipse.jgit.lib.ObjectId;
import org.eclipse.jgit.lib.PackLock;
@ -68,6 +70,8 @@
import org.eclipse.jgit.revwalk.RevWalk;
import org.eclipse.jgit.revwalk.filter.CommitTimeRevFilter;
import org.eclipse.jgit.revwalk.filter.RevFilter;
import org.eclipse.jgit.transport.PacketLineIn.AckNackResult;
import org.eclipse.jgit.util.TemporaryBuffer;
/**
* Fetch implementation using the native Git pack transfer service.
@ -81,6 +85,11 @@
* easily wrapped up into a local process pipe, anonymous TCP socket, or a
* command executed through an SSH tunnel.
* <p>
* If {@link BasePackConnection#statelessRPC} is {@code true}, this connection
* can be tunneled over a request-response style RPC system like HTTP. The RPC
* call boundary is determined by this class switching from writing to the
* OutputStream to reading from the InputStream.
* <p>
* Concrete implementations should just call
* {@link #init(java.io.InputStream, java.io.OutputStream)} and
* {@link #readAdvertisedRefs()} methods in constructor or before any use. They
@ -140,6 +149,9 @@ static enum MultiAck {
/** Marks a commit known to both sides of the connection. */
final RevFlag COMMON;
/** Like {@link #COMMON} but means its also in {@link #pckState}. */
private final RevFlag STATE;
/** Marks a commit listed in the advertised refs. */
final RevFlag ADVERTISED;
@ -157,6 +169,11 @@ static enum MultiAck {
private PackLock packLock;
/** RPC state, if {@link BasePackConnection#statelessRPC} is true. */
private TemporaryBuffer.Heap state;
private PacketLineOut pckState;
BasePackFetchConnection(final PackTransport packTransport) {
super(packTransport);
@ -169,6 +186,7 @@ static enum MultiAck {
reachableCommits = new RevCommitList<RevCommit>();
REACHABLE = walk.newFlag("REACHABLE");
COMMON = walk.newFlag("COMMON");
STATE = walk.newFlag("STATE");
ADVERTISED = walk.newFlag("ADVERTISED");
walk.carry(COMMON);
@ -222,11 +240,18 @@ protected void doFetch(final ProgressMonitor monitor,
markRefsAdvertised();
markReachable(have, maxTimeWanted(want));
if (statelessRPC) {
state = new TemporaryBuffer.Heap(Integer.MAX_VALUE);
pckState = new PacketLineOut(state);
}
if (sendWants(want)) {
negotiate(monitor);
walk.dispose();
reachableCommits = null;
state = null;
pckState = null;
receivePack(monitor);
}
@ -307,6 +332,7 @@ private void markReachable(final Set<ObjectId> have, final int maxTime)
}
private boolean sendWants(final Collection<Ref> want) throws IOException {
final PacketLineOut p = statelessRPC ? pckState : pckOut;
boolean first = true;
for (final Ref r : want) {
try {
@ -329,14 +355,16 @@ private boolean sendWants(final Collection<Ref> want) throws IOException {
first = false;
}
line.append('\n');
pckOut.writeString(line.toString());
p.writeString(line.toString());
}
pckOut.end();
if (first)
return false;
p.end();
outNeedsEnd = false;
return !first;
return true;
}
private String enableCapabilities() {
private String enableCapabilities() throws TransportException {
final StringBuilder line = new StringBuilder();
if (includeTags)
includeTags = wantCapability(line, OPTION_INCLUDE_TAG);
@ -356,6 +384,16 @@ else if (wantCapability(line, OPTION_MULTI_ACK))
sideband = true;
else if (wantCapability(line, OPTION_SIDE_BAND))
sideband = true;
if (statelessRPC && multiAck != MultiAck.DETAILED) {
// Our stateless RPC implementation relies upon the detailed
// ACK status to tell us common objects for reuse in future
// requests. If its not enabled, we can't talk to the peer.
//
throw new PackProtocolException(uri, "stateless RPC requires "
+ OPTION_MULTI_ACK_DETAILED + " to be enabled");
}
return line.toString();
}
@ -368,6 +406,9 @@ private void negotiate(final ProgressMonitor monitor) throws IOException,
boolean receivedContinue = false;
boolean receivedAck = false;
if (statelessRPC)
state.writeTo(out, null);
negotiateBegin();
SEND_HAVES: for (;;) {
final RevCommit c = walk.next();
@ -392,7 +433,7 @@ private void negotiate(final ProgressMonitor monitor) throws IOException,
pckOut.end();
resultsPending++; // Each end will cause a result to come back.
if (havesSent == 32) {
if (havesSent == 32 && !statelessRPC) {
// On the first block we race ahead and try to send
// more of the second block while waiting for the
// remote to respond to our first block request.
@ -402,9 +443,7 @@ private void negotiate(final ProgressMonitor monitor) throws IOException,
}
READ_RESULT: for (;;) {
final PacketLineIn.AckNackResult anr;
anr = pckIn.readACK(ackId);
final AckNackResult anr = pckIn.readACK(ackId);
switch (anr) {
case NAK:
// More have lines are necessary to compute the
@ -421,6 +460,8 @@ private void negotiate(final ProgressMonitor monitor) throws IOException,
multiAck = MultiAck.OFF;
resultsPending = 0;
receivedAck = true;
if (statelessRPC)
state.writeTo(out, null);
break SEND_HAVES;
case ACK_CONTINUE:
@ -431,7 +472,7 @@ private void negotiate(final ProgressMonitor monitor) throws IOException,
// we need to continue to talk about other parts of
// our local history.
//
markCommon(walk.parseAny(ackId));
markCommon(walk.parseAny(ackId), anr);
receivedAck = true;
receivedContinue = true;
havesSinceLastContinue = 0;
@ -442,13 +483,16 @@ private void negotiate(final ProgressMonitor monitor) throws IOException,
throw new CancelledException();
}
if (statelessRPC)
state.writeTo(out, null);
if (receivedContinue && havesSinceLastContinue > MAX_HAVES) {
// Our history must be really different from the remote's.
// We just sent a whole slew of have lines, and it did not
// recognize any of them. Avoid sending our entire history
// to them by giving up early.
//
break;
break SEND_HAVES;
}
}
@ -456,6 +500,11 @@ private void negotiate(final ProgressMonitor monitor) throws IOException,
//
if (monitor.isCancelled())
throw new CancelledException();
// When statelessRPC is true we should always leave SEND_HAVES
// loop above while in the middle of a request. This allows us
// to just write done immediately.
//
pckOut.writeString("done\n");
pckOut.flush();
@ -469,11 +518,8 @@ private void negotiate(final ProgressMonitor monitor) throws IOException,
}
READ_RESULT: while (resultsPending > 0 || multiAck != MultiAck.OFF) {
final PacketLineIn.AckNackResult anr;
anr = pckIn.readACK(ackId);
final AckNackResult anr = pckIn.readACK(ackId);
resultsPending--;
switch (anr) {
case NAK:
// A NAK is a response to an end we queued earlier
@ -543,7 +589,18 @@ private void markAdvertised(final AnyObjectId id) {
}
}
private void markCommon(final RevObject obj) {
private void markCommon(final RevObject obj, final AckNackResult anr)
throws IOException {
if (statelessRPC && anr == AckNackResult.ACK_COMMON && !obj.has(STATE)) {
StringBuilder s;
s = new StringBuilder(6 + Constants.OBJECT_ID_STRING_LENGTH);
s.append("have "); //$NON-NLS-1$
s.append(obj.name());
s.append('\n');
pckState.writeString(s.toString());
obj.add(STATE);
}
obj.add(COMMON);
if (obj instanceof RevCommit)
((RevCommit) obj).carry(COMMON);

View File

@ -1,4 +1,5 @@
/*
* Copyright (C) 2009-2010, Google Inc.
* Copyright (C) 2008, Shawn O. Pearce <spearce@spearce.org>
* and other copyright owners as documented in the project's IP log.
*
@ -43,10 +44,22 @@
package org.eclipse.jgit.transport;
import static org.eclipse.jgit.util.HttpSupport.ENCODING_GZIP;
import static org.eclipse.jgit.util.HttpSupport.HDR_ACCEPT;
import static org.eclipse.jgit.util.HttpSupport.HDR_ACCEPT_ENCODING;
import static org.eclipse.jgit.util.HttpSupport.HDR_CONTENT_ENCODING;
import static org.eclipse.jgit.util.HttpSupport.HDR_CONTENT_TYPE;
import static org.eclipse.jgit.util.HttpSupport.HDR_PRAGMA;
import static org.eclipse.jgit.util.HttpSupport.HDR_USER_AGENT;
import static org.eclipse.jgit.util.HttpSupport.METHOD_POST;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.Proxy;
@ -55,27 +68,53 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import org.eclipse.jgit.errors.NoRemoteRepositoryException;
import org.eclipse.jgit.errors.NotSupportedException;
import org.eclipse.jgit.errors.PackProtocolException;
import org.eclipse.jgit.errors.TransportException;
import org.eclipse.jgit.lib.Config;
import org.eclipse.jgit.lib.Constants;
import org.eclipse.jgit.lib.ObjectId;
import org.eclipse.jgit.lib.ProgressMonitor;
import org.eclipse.jgit.lib.Ref;
import org.eclipse.jgit.lib.Repository;
import org.eclipse.jgit.lib.Config.SectionParser;
import org.eclipse.jgit.util.HttpSupport;
import org.eclipse.jgit.util.IO;
import org.eclipse.jgit.util.RawParseUtils;
import org.eclipse.jgit.util.TemporaryBuffer;
import org.eclipse.jgit.util.io.DisabledOutputStream;
import org.eclipse.jgit.util.io.UnionInputStream;
/**
* Transport over the non-Git aware HTTP and FTP protocol.
* Transport over HTTP and FTP protocols.
* <p>
* The HTTP transport does not require any specialized Git support on the remote
* (server side) repository. Object files are retrieved directly through
* standard HTTP GET requests, making it easy to serve a Git repository through
* If the transport is using HTTP and the remote HTTP service is Git-aware
* (speaks the "smart-http protocol") this client will automatically take
* advantage of the additional Git-specific HTTP extensions. If the remote
* service does not support these extensions, the client will degrade to direct
* file fetching.
* <p>
* If the remote (server side) repository does not have the specialized Git
* support, object files are retrieved directly through standard HTTP GET (or
* binary FTP GET) requests. This make it easy to serve a Git repository through
* a standard web host provider that does not offer specific support for Git.
*
* @see WalkFetchConnection
*/
public class TransportHttp extends HttpTransport implements WalkTransport {
public class TransportHttp extends HttpTransport implements WalkTransport,
PackTransport {
private static final String SVC_UPLOAD_PACK = "git-upload-pack";
private static final String SVC_RECEIVE_PACK = "git-receive-pack";
private static final String userAgent = computeUserAgent();
static boolean canHandle(final URIish uri) {
if (!uri.isRemote())
return false;
@ -83,10 +122,37 @@ static boolean canHandle(final URIish uri) {
return "http".equals(s) || "https".equals(s) || "ftp".equals(s);
}
private static String computeUserAgent() {
String version;
final Package pkg = TransportHttp.class.getPackage();
if (pkg != null && pkg.getImplementationVersion() != null) {
version = pkg.getImplementationVersion();
} else {
version = "unknown"; //$NON-NLS-1$
}
return "JGit/" + version; //$NON-NLS-1$
}
private static final Config.SectionParser<HttpConfig> HTTP_KEY = new SectionParser<HttpConfig>() {
public HttpConfig parse(final Config cfg) {
return new HttpConfig(cfg);
}
};
private static class HttpConfig {
final int postBuffer;
HttpConfig(final Config rc) {
postBuffer = rc.getInt("http", "postbuffer", 1 * 1024 * 1024);
}
}
private final URL baseUrl;
private final URL objectsUrl;
private final HttpConfig http;
private final ProxySelector proxySelector;
TransportHttp(final Repository local, final URIish uri)
@ -101,22 +167,75 @@ static boolean canHandle(final URIish uri) {
} catch (MalformedURLException e) {
throw new NotSupportedException("Invalid URL " + uri, e);
}
http = local.getConfig().get(HTTP_KEY);
proxySelector = ProxySelector.getDefault();
}
@Override
public FetchConnection openFetch() throws TransportException {
final HttpObjectDB c = new HttpObjectDB(objectsUrl);
final WalkFetchConnection r = new WalkFetchConnection(this, c);
r.available(c.readAdvertisedRefs());
return r;
public FetchConnection openFetch() throws TransportException,
NotSupportedException {
final String service = SVC_UPLOAD_PACK;
try {
final HttpURLConnection c = connect(service);
final InputStream in = openInputStream(c);
try {
if (isSmartHttp(c, service)) {
readSmartHeaders(in, service);
return new SmartHttpFetchConnection(in);
} else {
// Assume this server doesn't support smart HTTP fetch
// and fall back on dumb object walking.
//
HttpObjectDB d = new HttpObjectDB(objectsUrl);
WalkFetchConnection wfc = new WalkFetchConnection(this, d);
BufferedReader br = new BufferedReader(
new InputStreamReader(in, Constants.CHARSET));
try {
wfc.available(d.readAdvertisedImpl(br));
} finally {
br.close();
}
return wfc;
}
} finally {
in.close();
}
} catch (NotSupportedException err) {
throw err;
} catch (TransportException err) {
throw err;
} catch (IOException err) {
throw new TransportException(uri, "error reading info/refs", err);
}
}
@Override
public PushConnection openPush() throws NotSupportedException,
TransportException {
final String s = getURI().getScheme();
throw new NotSupportedException("Push not supported over " + s + ".");
final String service = SVC_RECEIVE_PACK;
try {
final HttpURLConnection c = connect(service);
final InputStream in = openInputStream(c);
try {
if (isSmartHttp(c, service)) {
readSmartHeaders(in, service);
return new SmartHttpPushConnection(in);
} else {
final String msg = "remote does not support smart HTTP push";
throw new NotSupportedException(msg);
}
} finally {
in.close();
}
} catch (NotSupportedException err) {
throw err;
} catch (TransportException err) {
throw err;
} catch (IOException err) {
throw new TransportException(uri, "error reading info/refs", err);
}
}
@Override
@ -124,6 +243,112 @@ public void close() {
// No explicit connections are maintained.
}
private HttpURLConnection connect(final String service)
throws TransportException, NotSupportedException {
final URL u;
try {
final StringBuilder b = new StringBuilder();
b.append(baseUrl);
if (b.charAt(b.length() - 1) != '/')
b.append('/');
b.append(Constants.INFO_REFS);
b.append(b.indexOf("?") < 0 ? '?' : '&');
b.append("service=");
b.append(service);
u = new URL(b.toString());
} catch (MalformedURLException e) {
throw new NotSupportedException("Invalid URL " + uri, e);
}
try {
final HttpURLConnection conn = httpOpen(u);
String expType = "application/x-" + service + "-advertisement";
conn.setRequestProperty(HDR_ACCEPT, expType + ", */*");
final int status = HttpSupport.response(conn);
switch (status) {
case HttpURLConnection.HTTP_OK:
return conn;
case HttpURLConnection.HTTP_NOT_FOUND:
throw new NoRemoteRepositoryException(uri, u + " not found");
case HttpURLConnection.HTTP_FORBIDDEN:
throw new TransportException(uri, service + " not permitted");
default:
String err = status + " " + conn.getResponseMessage();
throw new TransportException(uri, err);
}
} catch (NotSupportedException e) {
throw e;
} catch (TransportException e) {
throw e;
} catch (IOException e) {
throw new TransportException(uri, "cannot open " + service, e);
}
}
final HttpURLConnection httpOpen(final URL u) throws IOException {
final Proxy proxy = HttpSupport.proxyFor(proxySelector, u);
HttpURLConnection conn = (HttpURLConnection) u.openConnection(proxy);
conn.setRequestProperty(HDR_ACCEPT_ENCODING, ENCODING_GZIP);
conn.setRequestProperty(HDR_PRAGMA, "no-cache");//$NON-NLS-1$
conn.setRequestProperty(HDR_USER_AGENT, userAgent);
return conn;
}
final InputStream openInputStream(HttpURLConnection conn)
throws IOException {
InputStream input = conn.getInputStream();
if (ENCODING_GZIP.equals(conn.getHeaderField(HDR_CONTENT_ENCODING)))
input = new GZIPInputStream(input);
return input;
}
IOException wrongContentType(String expType, String actType) {
final String why = "expected Content-Type " + expType
+ "; received Content-Type " + actType;
return new TransportException(uri, why);
}
private boolean isSmartHttp(final HttpURLConnection c, final String service) {
final String expType = "application/x-" + service + "-advertisement";
final String actType = c.getContentType();
return expType.equals(actType);
}
private void readSmartHeaders(final InputStream in, final String service)
throws IOException {
// A smart reply will have a '#' after the first 4 bytes, but
// a dumb reply cannot contain a '#' until after byte 41. Do a
// quick check to make sure its a smart reply before we parse
// as a pkt-line stream.
//
final byte[] magic = new byte[5];
IO.readFully(in, magic, 0, magic.length);
if (magic[4] != '#') {
throw new TransportException(uri, "expected pkt-line with"
+ " '# service=', got '" + RawParseUtils.decode(magic)
+ "'");
}
final PacketLineIn pckIn = new PacketLineIn(new UnionInputStream(
new ByteArrayInputStream(magic), in));
final String exp = "# service=" + service;
final String act = pckIn.readString();
if (!exp.equals(act)) {
throw new TransportException(uri, "expected '" + exp + "', got '"
+ act + "'");
}
while (pckIn.readString() != PacketLineIn.END) {
// for now, ignore the remaining header lines
}
}
class HttpObjectDB extends WalkRemoteObjectDatabase {
private final URL objectsUrl;
@ -186,13 +411,10 @@ Collection<String> getPackNames() throws IOException {
FileStream open(final String path) throws IOException {
final URL base = objectsUrl;
final URL u = new URL(base, path);
final Proxy proxy = HttpSupport.proxyFor(proxySelector, u);
final HttpURLConnection c;
c = (HttpURLConnection) u.openConnection(proxy);
final HttpURLConnection c = httpOpen(u);
switch (HttpSupport.response(c)) {
case HttpURLConnection.HTTP_OK:
final InputStream in = c.getInputStream();
final InputStream in = openInputStream(c);
final int len = c.getContentLength();
return new FileStream(in, len);
case HttpURLConnection.HTTP_NOT_FOUND:
@ -204,26 +426,7 @@ FileStream open(final String path) throws IOException {
}
}
Map<String, Ref> readAdvertisedRefs() throws TransportException {
try {
final BufferedReader br = openReader(INFO_REFS);
try {
return readAdvertisedImpl(br);
} finally {
br.close();
}
} catch (IOException err) {
try {
throw new TransportException(new URL(objectsUrl, INFO_REFS)
+ ": cannot read available refs", err);
} catch (MalformedURLException mue) {
throw new TransportException(objectsUrl + INFO_REFS
+ ": cannot read available refs", err);
}
}
}
private Map<String, Ref> readAdvertisedImpl(final BufferedReader br)
Map<String, Ref> readAdvertisedImpl(final BufferedReader br)
throws IOException, PackProtocolException {
final TreeMap<String, Ref> avail = new TreeMap<String, Ref>();
for (;;) {
@ -279,4 +482,220 @@ void close() {
// We do not maintain persistent connections.
}
}
class SmartHttpFetchConnection extends BasePackFetchConnection {
SmartHttpFetchConnection(final InputStream advertisement)
throws TransportException {
super(TransportHttp.this);
statelessRPC = true;
init(advertisement, DisabledOutputStream.INSTANCE);
outNeedsEnd = false;
try {
readAdvertisedRefs();
} catch (IOException err) {
close();
throw new TransportException(uri, "remote hung up", err);
}
}
@Override
protected void doFetch(final ProgressMonitor monitor,
final Collection<Ref> want, final Set<ObjectId> have)
throws TransportException {
final Service svc = new Service(SVC_UPLOAD_PACK);
init(svc.in, svc.out);
super.doFetch(monitor, want, have);
}
}
class SmartHttpPushConnection extends BasePackPushConnection {
SmartHttpPushConnection(final InputStream advertisement)
throws TransportException {
super(TransportHttp.this);
statelessRPC = true;
init(advertisement, DisabledOutputStream.INSTANCE);
outNeedsEnd = false;
try {
readAdvertisedRefs();
} catch (IOException err) {
close();
throw new TransportException(uri, "remote hung up", err);
}
}
protected void doPush(final ProgressMonitor monitor,
final Map<String, RemoteRefUpdate> refUpdates)
throws TransportException {
final Service svc = new Service(SVC_RECEIVE_PACK);
init(svc.in, svc.out);
super.doPush(monitor, refUpdates);
}
}
/**
* State required to speak multiple HTTP requests with the remote.
* <p>
* A service wrapper provides a normal looking InputStream and OutputStream
* pair which are connected via HTTP to the named remote service. Writing to
* the OutputStream is buffered until either the buffer overflows, or
* reading from the InputStream occurs. If overflow occurs HTTP/1.1 and its
* chunked transfer encoding is used to stream the request data to the
* remote service. If the entire request fits in the memory buffer, the
* older HTTP/1.0 standard and a fixed content length is used instead.
* <p>
* It is an error to attempt to read without there being outstanding data
* ready for transmission on the OutputStream.
* <p>
* No state is preserved between write-read request pairs. The caller is
* responsible for replaying state vector information as part of the request
* data written to the OutputStream. Any session HTTP cookies may or may not
* be preserved between requests, it is left up to the JVM's implementation
* of the HTTP client.
*/
class Service {
private final String serviceName;
private final String requestType;
private final String responseType;
private final UnionInputStream httpIn;
final HttpInputStream in;
final HttpOutputStream out;
HttpURLConnection conn;
Service(final String serviceName) {
this.serviceName = serviceName;
this.requestType = "application/x-" + serviceName + "-request";
this.responseType = "application/x-" + serviceName + "-result";
this.httpIn = new UnionInputStream();
this.in = new HttpInputStream(httpIn);
this.out = new HttpOutputStream();
}
void openStream() throws IOException {
conn = httpOpen(new URL(baseUrl, serviceName));
conn.setRequestMethod(METHOD_POST);
conn.setInstanceFollowRedirects(false);
conn.setDoOutput(true);
conn.setRequestProperty(HDR_CONTENT_TYPE, requestType);
conn.setRequestProperty(HDR_ACCEPT, responseType);
}
void execute() throws IOException {
out.close();
if (conn == null) {
// Output hasn't started yet, because everything fit into
// our request buffer. Send with a Content-Length header.
//
if (out.length() == 0) {
throw new TransportException(uri, "Starting read stage"
+ " without written request data pending"
+ " is not supported");
}
// Try to compress the content, but only if that is smaller.
TemporaryBuffer buf = new TemporaryBuffer.Heap(http.postBuffer);
try {
GZIPOutputStream gzip = new GZIPOutputStream(buf);
out.writeTo(gzip, null);
gzip.close();
if (out.length() < buf.length())
buf = out;
} catch (IOException err) {
// Most likely caused by overflowing the buffer, meaning
// its larger if it were compressed. Don't compress.
buf = out;
}
openStream();
if (buf != out)
conn.setRequestProperty(HDR_CONTENT_ENCODING, ENCODING_GZIP);
conn.setFixedLengthStreamingMode((int) buf.length());
final OutputStream httpOut = conn.getOutputStream();
try {
buf.writeTo(httpOut, null);
} finally {
httpOut.close();
}
}
out.reset();
final int status = HttpSupport.response(conn);
if (status != HttpURLConnection.HTTP_OK) {
throw new TransportException(uri, status + " "
+ conn.getResponseMessage());
}
final String contentType = conn.getContentType();
if (!responseType.equals(contentType)) {
conn.getInputStream().close();
throw wrongContentType(responseType, contentType);
}
httpIn.add(openInputStream(conn));
conn = null;
}
class HttpOutputStream extends TemporaryBuffer {
HttpOutputStream() {
super(http.postBuffer);
}
@Override
protected OutputStream overflow() throws IOException {
openStream();
conn.setChunkedStreamingMode(0);
return conn.getOutputStream();
}
}
class HttpInputStream extends InputStream {
private final UnionInputStream src;
HttpInputStream(UnionInputStream httpIn) {
this.src = httpIn;
}
private InputStream self() throws IOException {
if (src.isEmpty()) {
// If we have no InputStreams available it means we must
// have written data previously to the service, but have
// not yet finished the HTTP request in order to get the
// response from the service. Ensure we get it now.
//
execute();
}
return src;
}
public int available() throws IOException {
return self().available();
}
public int read() throws IOException {
return self().read();
}
public int read(byte[] b, int off, int len) throws IOException {
return self().read(b, off, len);
}
public long skip(long n) throws IOException {
return self().skip(n);
}
public void close() throws IOException {
src.close();
}
}
}
}

View File

@ -65,6 +65,9 @@ public class HttpSupport {
/** The {@code Pragma} header. */
public static final String HDR_PRAGMA = "Pragma";
/** The {@code User-Agent} header. */
public static final String HDR_USER_AGENT = "User-Agent";
/** The {@code Date} header. */
public static final String HDR_DATE = "Date";
@ -83,6 +86,9 @@ public class HttpSupport {
/** The {@code If-Modified-Since} header. */
public static final String HDR_IF_MODIFIED_SINCE = "If-Modified-Since";
/** The {@code Accept} header. */
public static final String HDR_ACCEPT = "Accept";
/** The {@code Content-Type} header. */
public static final String HDR_CONTENT_TYPE = "Content-Type";

View File

@ -0,0 +1,66 @@
/*
* Copyright (C) 2009-2010, Google Inc.
* and other copyright owners as documented in the project's IP log.
*
* This program and the accompanying materials are made available
* under the terms of the Eclipse Distribution License v1.0 which
* accompanies this distribution, is reproduced below, and is
* available at http://www.eclipse.org/org/documents/edl-v10.php
*
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or
* without modification, are permitted provided that the following
* conditions are met:
*
* - Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* - Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials provided
* with the distribution.
*
* - Neither the name of the Eclipse Foundation, Inc. nor the
* names of its contributors may be used to endorse or promote
* products derived from this software without specific prior
* written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
* CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
* INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
* NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
* STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package org.eclipse.jgit.util.io;
import java.io.IOException;
import java.io.OutputStream;
/** An OutputStream which always throws IllegalStateExeption during write. */
public final class DisabledOutputStream extends OutputStream {
/** The canonical instance which always throws IllegalStateException. */
public static final DisabledOutputStream INSTANCE = new DisabledOutputStream();
private DisabledOutputStream() {
// Do nothing, but we want to hide our constructor to prevent
// more than one instance from being created.
}
@Override
public void write(int b) throws IOException {
// We shouldn't be writing output at this stage, there
// is nobody listening to us.
//
throw new IllegalStateException("Writing not permitted");
}
}