Fix hanging fetch via SSH

Signaling the need to flush() only via the interrupted status of a
copying thread doesn't work realiably with jsch. The write() method of
com.jcraft.jsch.Session catches the InterruptedException in several
places. As a result StreamCopyThread can easily miss the interrupt if it
was interrupted during the dst.write() or dst.flush() call. When it
happens, StreamCopyThread will not send some data to the remote side and
will not get the response back, because remote side will wait for more
data from us.

The flushCount field incremented during flush() method guarantees we
don't miss flush() even if jsch catches InterruptedException in
dst.write() or dst.flush() calls.

Checking the flushCount after dst.write() is needed because dst.write()
can clear interrupt status, in this case the next blocking src.read()
won't throw an exception and we will not call flush().

Flush is performed only after src.read() was blocked and thrown an
InterruptedIOException exception, this guarantees that we flush all the
data available in src so far (src.read() doesn't block while more is
available).

FlushCount is reset to 0 only when there were no flush() calls since
last blocked read, that means we flushed all data available in src. If
there were flush() calls, the interrupt status is restored, so next
blocked read will throw InterruptedException and we will flush()
again.

Change-Id: I692b226edaff502f06235ec05da9052b5fe6478a
Signed-off-by: Dmitry Neverov <dmitry.neverov@gmail.com>
This commit is contained in:
Dmitry Neverov 2015-08-21 17:44:49 +02:00
parent 847b3d1258
commit a86566dcf0
1 changed files with 17 additions and 5 deletions

View File

@ -47,6 +47,7 @@
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicInteger;
/** Thread to copy from an input stream to an output stream. */
public class StreamCopyThread extends Thread {
@ -58,6 +59,8 @@ public class StreamCopyThread extends Thread {
private volatile boolean done;
private final AtomicInteger flushCount = new AtomicInteger(0);
/**
* Create a thread to copy data from an input stream to an output stream.
*
@ -82,6 +85,7 @@ public StreamCopyThread(final InputStream i, final OutputStream o) {
* the request.
*/
public void flush() {
flushCount.incrementAndGet();
interrupt();
}
@ -109,22 +113,30 @@ public void halt() throws InterruptedException {
public void run() {
try {
final byte[] buf = new byte[BUFFER_SIZE];
int interruptCounter = 0;
int flushCountBeforeRead = 0;
boolean readInterrupted = false;
for (;;) {
try {
if (interruptCounter > 0) {
if (readInterrupted) {
dst.flush();
interruptCounter--;
readInterrupted = false;
if (!flushCount.compareAndSet(flushCountBeforeRead, 0)) {
// There was a flush() call since last blocked read.
// Set interrupt status, so next blocked read will throw
// an InterruptedIOException and we will flush again.
interrupt();
}
}
if (done)
break;
flushCountBeforeRead = flushCount.get();
final int n;
try {
n = src.read(buf);
} catch (InterruptedIOException wakey) {
interruptCounter++;
readInterrupted = true;
continue;
}
if (n < 0)
@ -141,7 +153,7 @@ public void run() {
// set interrupt status, which will be checked
// when we block in src.read
if (writeInterrupted)
if (writeInterrupted || flushCount.get() > 0)
interrupt();
break;
}