Merge "Fix hanging fetch via SSH"
This commit is contained in:
commit
80edcac06f
|
@ -47,6 +47,7 @@
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.io.InterruptedIOException;
|
import java.io.InterruptedIOException;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
/** Thread to copy from an input stream to an output stream. */
|
/** Thread to copy from an input stream to an output stream. */
|
||||||
public class StreamCopyThread extends Thread {
|
public class StreamCopyThread extends Thread {
|
||||||
|
@ -58,6 +59,8 @@ public class StreamCopyThread extends Thread {
|
||||||
|
|
||||||
private volatile boolean done;
|
private volatile boolean done;
|
||||||
|
|
||||||
|
private final AtomicInteger flushCount = new AtomicInteger(0);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a thread to copy data from an input stream to an output stream.
|
* Create a thread to copy data from an input stream to an output stream.
|
||||||
*
|
*
|
||||||
|
@ -82,6 +85,7 @@ public StreamCopyThread(final InputStream i, final OutputStream o) {
|
||||||
* the request.
|
* the request.
|
||||||
*/
|
*/
|
||||||
public void flush() {
|
public void flush() {
|
||||||
|
flushCount.incrementAndGet();
|
||||||
interrupt();
|
interrupt();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -109,22 +113,30 @@ public void halt() throws InterruptedException {
|
||||||
public void run() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
final byte[] buf = new byte[BUFFER_SIZE];
|
final byte[] buf = new byte[BUFFER_SIZE];
|
||||||
int interruptCounter = 0;
|
int flushCountBeforeRead = 0;
|
||||||
|
boolean readInterrupted = false;
|
||||||
for (;;) {
|
for (;;) {
|
||||||
try {
|
try {
|
||||||
if (interruptCounter > 0) {
|
if (readInterrupted) {
|
||||||
dst.flush();
|
dst.flush();
|
||||||
interruptCounter--;
|
readInterrupted = false;
|
||||||
|
if (!flushCount.compareAndSet(flushCountBeforeRead, 0)) {
|
||||||
|
// There was a flush() call since last blocked read.
|
||||||
|
// Set interrupt status, so next blocked read will throw
|
||||||
|
// an InterruptedIOException and we will flush again.
|
||||||
|
interrupt();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (done)
|
if (done)
|
||||||
break;
|
break;
|
||||||
|
|
||||||
|
flushCountBeforeRead = flushCount.get();
|
||||||
final int n;
|
final int n;
|
||||||
try {
|
try {
|
||||||
n = src.read(buf);
|
n = src.read(buf);
|
||||||
} catch (InterruptedIOException wakey) {
|
} catch (InterruptedIOException wakey) {
|
||||||
interruptCounter++;
|
readInterrupted = true;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (n < 0)
|
if (n < 0)
|
||||||
|
@ -141,7 +153,7 @@ public void run() {
|
||||||
|
|
||||||
// set interrupt status, which will be checked
|
// set interrupt status, which will be checked
|
||||||
// when we block in src.read
|
// when we block in src.read
|
||||||
if (writeInterrupted)
|
if (writeInterrupted || flushCount.get() > 0)
|
||||||
interrupt();
|
interrupt();
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue