Refactor TransportHttp for long-polling

Split Service into MultiRequestService (fetch, push) and
LongPollService (upcoming publish-subscribe).

Change-Id: Ice373d3dee63c395490d2707473ccf20a022e5cf
This commit is contained in:
Ian Wetherbee 2012-06-07 16:10:09 -07:00
parent fe1f1b8f8a
commit 2adc572628
1 changed files with 144 additions and 97 deletions

View File

@ -717,7 +717,7 @@ void close() {
} }
class SmartHttpFetchConnection extends BasePackFetchConnection { class SmartHttpFetchConnection extends BasePackFetchConnection {
private Service svc; private MultiRequestService svc;
SmartHttpFetchConnection(final InputStream advertisement) SmartHttpFetchConnection(final InputStream advertisement)
throws TransportException { throws TransportException {
@ -734,8 +734,8 @@ protected void doFetch(final ProgressMonitor monitor,
final Collection<Ref> want, final Set<ObjectId> have) final Collection<Ref> want, final Set<ObjectId> have)
throws TransportException { throws TransportException {
try { try {
svc = new Service(SVC_UPLOAD_PACK); svc = new MultiRequestService(SVC_UPLOAD_PACK);
init(svc.in, svc.out); init(svc.getInputStream(), svc.getOutputStream());
super.doFetch(monitor, want, have); super.doFetch(monitor, want, have);
} finally { } finally {
svc = null; svc = null;
@ -762,57 +762,36 @@ class SmartHttpPushConnection extends BasePackPushConnection {
protected void doPush(final ProgressMonitor monitor, protected void doPush(final ProgressMonitor monitor,
final Map<String, RemoteRefUpdate> refUpdates) final Map<String, RemoteRefUpdate> refUpdates)
throws TransportException { throws TransportException {
final Service svc = new Service(SVC_RECEIVE_PACK); final Service svc = new MultiRequestService(SVC_RECEIVE_PACK);
init(svc.in, svc.out); init(svc.getInputStream(), svc.getOutputStream());
super.doPush(monitor, refUpdates); super.doPush(monitor, refUpdates);
} }
} }
/** /** Basic service for sending and receiving HTTP requests. */
* State required to speak multiple HTTP requests with the remote. abstract class Service {
* <p> protected final String serviceName;
* A service wrapper provides a normal looking InputStream and OutputStream
* pair which are connected via HTTP to the named remote service. Writing to
* the OutputStream is buffered until either the buffer overflows, or
* reading from the InputStream occurs. If overflow occurs HTTP/1.1 and its
* chunked transfer encoding is used to stream the request data to the
* remote service. If the entire request fits in the memory buffer, the
* older HTTP/1.0 standard and a fixed content length is used instead.
* <p>
* It is an error to attempt to read without there being outstanding data
* ready for transmission on the OutputStream.
* <p>
* No state is preserved between write-read request pairs. The caller is
* responsible for replaying state vector information as part of the request
* data written to the OutputStream. Any session HTTP cookies may or may not
* be preserved between requests, it is left up to the JVM's implementation
* of the HTTP client.
*/
class Service {
private final String serviceName;
private final String requestType; protected final String requestType;
private final String responseType; protected final String responseType;
private final HttpExecuteStream execute; protected HttpURLConnection conn;
boolean finalRequest; protected HttpOutputStream out;
protected final HttpExecuteStream execute;
final UnionInputStream in; final UnionInputStream in;
final HttpOutputStream out; Service(String serviceName) {
HttpURLConnection conn;
Service(final String serviceName) {
this.serviceName = serviceName; this.serviceName = serviceName;
this.requestType = "application/x-" + serviceName + "-request"; //$NON-NLS-1$ //$NON-NLS-2$ this.requestType = "application/x-" + serviceName + "-request"; //$NON-NLS-1$ //$NON-NLS-2$
this.responseType = "application/x-" + serviceName + "-result"; //$NON-NLS-1$ //$NON-NLS-2$ this.responseType = "application/x-" + serviceName + "-result"; //$NON-NLS-1$ //$NON-NLS-2$
this.out = new HttpOutputStream();
this.execute = new HttpExecuteStream(); this.execute = new HttpExecuteStream();
this.in = new UnionInputStream(execute); this.in = new UnionInputStream(execute);
this.out = new HttpOutputStream();
} }
void openStream() throws IOException { void openStream() throws IOException {
@ -823,22 +802,7 @@ void openStream() throws IOException {
conn.setRequestProperty(HDR_ACCEPT, responseType); conn.setRequestProperty(HDR_ACCEPT, responseType);
} }
void execute() throws IOException { void sendRequest() throws IOException {
out.close();
if (conn == null) {
if (out.length() == 0) {
// Request output hasn't started yet, but more data is being
// requested. If there is no request data buffered and the
// final request was already sent, do nothing to ensure the
// caller is shown EOF on the InputStream; otherwise an
// programming error has occurred within this module.
if (finalRequest)
return;
throw new TransportException(uri,
JGitText.get().startingReadStageWithoutWrittenRequestDataPendingIsNotSupported);
}
// Try to compress the content, but only if that is smaller. // Try to compress the content, but only if that is smaller.
TemporaryBuffer buf = new TemporaryBuffer.Heap(http.postBuffer); TemporaryBuffer buf = new TemporaryBuffer.Heap(http.postBuffer);
try { try {
@ -865,8 +829,7 @@ void execute() throws IOException {
} }
} }
out.reset(); void openResponse() throws IOException {
final int status = HttpSupport.response(conn); final int status = HttpSupport.response(conn);
if (status != HttpURLConnection.HTTP_OK) { if (status != HttpURLConnection.HTTP_OK) {
throw new TransportException(uri, status + " " //$NON-NLS-1$ throw new TransportException(uri, status + " " //$NON-NLS-1$
@ -878,26 +841,18 @@ void execute() throws IOException {
conn.getInputStream().close(); conn.getInputStream().close();
throw wrongContentType(responseType, contentType); throw wrongContentType(responseType, contentType);
} }
in.add(openInputStream(conn));
if (!finalRequest)
in.add(execute);
conn = null;
} }
class HttpOutputStream extends TemporaryBuffer { HttpOutputStream getOutputStream() {
HttpOutputStream() { return out;
super(http.postBuffer);
} }
@Override InputStream getInputStream() {
protected OutputStream overflow() throws IOException { return in;
openStream();
conn.setChunkedStreamingMode(0);
return conn.getOutputStream();
}
} }
abstract void execute() throws IOException;
class HttpExecuteStream extends InputStream { class HttpExecuteStream extends InputStream {
public int read() throws IOException { public int read() throws IOException {
execute(); execute();
@ -914,6 +869,98 @@ public long skip(long n) throws IOException {
return 0; return 0;
} }
} }
class HttpOutputStream extends TemporaryBuffer {
HttpOutputStream() {
super(http.postBuffer);
}
@Override
protected OutputStream overflow() throws IOException {
openStream();
conn.setChunkedStreamingMode(0);
return conn.getOutputStream();
}
}
}
/**
* State required to speak multiple HTTP requests with the remote.
* <p>
* A service wrapper provides a normal looking InputStream and OutputStream
* pair which are connected via HTTP to the named remote service. Writing to
* the OutputStream is buffered until either the buffer overflows, or
* reading from the InputStream occurs. If overflow occurs HTTP/1.1 and its
* chunked transfer encoding is used to stream the request data to the
* remote service. If the entire request fits in the memory buffer, the
* older HTTP/1.0 standard and a fixed content length is used instead.
* <p>
* It is an error to attempt to read without there being outstanding data
* ready for transmission on the OutputStream.
* <p>
* No state is preserved between write-read request pairs. The caller is
* responsible for replaying state vector information as part of the request
* data written to the OutputStream. Any session HTTP cookies may or may not
* be preserved between requests, it is left up to the JVM's implementation
* of the HTTP client.
*/
class MultiRequestService extends Service {
boolean finalRequest;
MultiRequestService(final String serviceName) {
super(serviceName);
}
/** Keep opening send-receive pairs to the given URI. */
@Override
void execute() throws IOException {
out.close();
if (conn == null) {
if (out.length() == 0) {
// Request output hasn't started yet, but more data is being
// requested. If there is no request data buffered and the
// final request was already sent, do nothing to ensure the
// caller is shown EOF on the InputStream; otherwise an
// programming error has occurred within this module.
if (finalRequest)
return;
throw new TransportException(uri,
JGitText.get().startingReadStageWithoutWrittenRequestDataPendingIsNotSupported);
}
sendRequest();
}
out.reset();
openResponse();
in.add(openInputStream(conn));
if (!finalRequest)
in.add(execute);
conn = null;
}
}
/** Service for maintaining a single long-poll connection. */
class LongPollService extends Service {
/**
* @param serviceName
*/
LongPollService(String serviceName) {
super(serviceName);
}
/** Only open one send-receive request. */
@Override
void execute() throws IOException {
out.close();
if (conn == null)
sendRequest();
openResponse();
in.add(openInputStream(conn));
}
} }
private static class DummyX509TrustManager implements X509TrustManager { private static class DummyX509TrustManager implements X509TrustManager {