DHT: Fix thread-safety issue in AbstractWriteBuffer

There is a data corruption issue with the 'running' list if a
background thread schedules something onto the buffer while the
application thread is also using it.

Change-Id: I5ba78b98b6632965d677a9c8f209f0cf8320cc3d
Signed-off-by: Shawn O. Pearce <spearce@spearce.org>
This commit is contained in:
Shawn O. Pearce 2011-05-26 17:19:30 -07:00
parent 0a39fb2ab6
commit 042a66fe8c
1 changed files with 18 additions and 7 deletions

View File

@ -82,6 +82,8 @@ public abstract class AbstractWriteBuffer implements WriteBuffer {
private final List<Future<?>> running; private final List<Future<?>> running;
private final Object runningLock;
private final Semaphore spaceAvailable; private final Semaphore spaceAvailable;
private int queuedCount; private int queuedCount;
@ -102,6 +104,7 @@ protected AbstractWriteBuffer(ExecutorService executor, int bufferSize) {
this.executor = executor; this.executor = executor;
this.bufferSize = bufferSize; this.bufferSize = bufferSize;
this.running = new LinkedList<Future<?>>(); this.running = new LinkedList<Future<?>>();
this.runningLock = new Object();
this.spaceAvailable = new Semaphore(bufferSize); this.spaceAvailable = new Semaphore(bufferSize);
} }
@ -189,15 +192,19 @@ public void flush() throws DhtException {
} }
} }
synchronized (runningLock) {
checkRunningTasks(true); checkRunningTasks(true);
}
} finally { } finally {
flushing = false; flushing = false;
} }
} }
public void abort() throws DhtException { public void abort() throws DhtException {
synchronized (runningLock) {
checkRunningTasks(true); checkRunningTasks(true);
} }
}
private void acquireSpace(int sz) throws DhtException { private void acquireSpace(int sz) throws DhtException {
try { try {
@ -259,10 +266,12 @@ public T call() throws Exception {
return; return;
} }
synchronized (runningLock) {
if (!flushing) if (!flushing)
checkRunningTasks(false); checkRunningTasks(false);
running.add(executor.submit(op)); running.add(executor.submit(op));
} }
}
/** /**
* Wrap a callback to update the buffer. * Wrap a callback to update the buffer.
@ -284,8 +293,10 @@ protected <T> AsyncCallback<T> wrap(final AsyncCallback<T> callback,
int size) throws DhtException { int size) throws DhtException {
int permits = permitsForSize(size); int permits = permitsForSize(size);
WrappedCallback<T> op = new WrappedCallback<T>(callback, permits); WrappedCallback<T> op = new WrappedCallback<T>(callback, permits);
synchronized (runningLock) {
checkRunningTasks(false); checkRunningTasks(false);
running.add(op); running.add(op);
}
return op; return op;
} }