Merge "Make the FileLfsRepository thread safe"
This commit is contained in:
commit
09810d013c
|
@ -51,6 +51,13 @@
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
import java.nio.file.Paths;
|
import java.nio.file.Paths;
|
||||||
import java.text.MessageFormat;
|
import java.text.MessageFormat;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.Callable;
|
||||||
|
import java.util.concurrent.CyclicBarrier;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import org.eclipse.jgit.lfs.lib.AnyLongObjectId;
|
import org.eclipse.jgit.lfs.lib.AnyLongObjectId;
|
||||||
import org.eclipse.jgit.lfs.lib.LongObjectId;
|
import org.eclipse.jgit.lfs.lib.LongObjectId;
|
||||||
|
@ -98,4 +105,36 @@ public void testLargeFileUpload() throws Exception {
|
||||||
assertEquals("expected object length " + Files.size(f), Files.size(f),
|
assertEquals("expected object length " + Files.size(f), Files.size(f),
|
||||||
repository.getSize(id));
|
repository.getSize(id));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testParallelUploads() throws Exception {
|
||||||
|
int count = 10;
|
||||||
|
List<Path> paths = new ArrayList<>(count);
|
||||||
|
|
||||||
|
for (int i = 0; i < count; i++) {
|
||||||
|
Path f = Paths.get(getTempDirectory().toString(),
|
||||||
|
"largeRandomFile_" + i);
|
||||||
|
createPseudoRandomContentFile(f, 1 * MiB);
|
||||||
|
paths.add(f);
|
||||||
|
}
|
||||||
|
|
||||||
|
final CyclicBarrier barrier = new CyclicBarrier(count);
|
||||||
|
|
||||||
|
ExecutorService e = Executors.newFixedThreadPool(count);
|
||||||
|
try {
|
||||||
|
for (final Path p : paths) {
|
||||||
|
e.submit(new Callable<Void>() {
|
||||||
|
@Override
|
||||||
|
public Void call() throws Exception {
|
||||||
|
barrier.await();
|
||||||
|
putContent(p);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
e.shutdown();
|
||||||
|
e.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -45,10 +45,8 @@
|
||||||
import static org.eclipse.jgit.util.HttpSupport.HDR_AUTHORIZATION;
|
import static org.eclipse.jgit.util.HttpSupport.HDR_AUTHORIZATION;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.channels.Channels;
|
|
||||||
import java.nio.channels.FileChannel;
|
import java.nio.channels.FileChannel;
|
||||||
import java.nio.channels.ReadableByteChannel;
|
import java.nio.channels.ReadableByteChannel;
|
||||||
import java.nio.channels.WritableByteChannel;
|
|
||||||
import java.nio.file.Files;
|
import java.nio.file.Files;
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
import java.nio.file.StandardOpenOption;
|
import java.nio.file.StandardOpenOption;
|
||||||
|
@ -70,7 +68,6 @@ public class FileLfsRepository implements LargeFileRepository {
|
||||||
|
|
||||||
private final String url;
|
private final String url;
|
||||||
private final Path dir;
|
private final Path dir;
|
||||||
private AtomicObjectOutputStream out;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param url
|
* @param url
|
||||||
|
@ -147,21 +144,11 @@ ReadableByteChannel getReadChannel(AnyLongObjectId id)
|
||||||
return FileChannel.open(getPath(id), StandardOpenOption.READ);
|
return FileChannel.open(getPath(id), StandardOpenOption.READ);
|
||||||
}
|
}
|
||||||
|
|
||||||
WritableByteChannel getWriteChannel(AnyLongObjectId id)
|
AtomicObjectOutputStream getOutputStream(AnyLongObjectId id)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
Path path = getPath(id);
|
Path path = getPath(id);
|
||||||
Files.createDirectories(path.getParent());
|
Files.createDirectories(path.getParent());
|
||||||
out = new AtomicObjectOutputStream(path, id);
|
return new AtomicObjectOutputStream(path, id);
|
||||||
return Channels.newChannel(out);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Abort the output stream
|
|
||||||
*/
|
|
||||||
void abortWrite() {
|
|
||||||
if (out != null) {
|
|
||||||
out.abort();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static char[] toHexCharArray(int b) {
|
private static char[] toHexCharArray(int b) {
|
||||||
|
|
|
@ -74,13 +74,13 @@ class ObjectUploadListener implements ReadListener {
|
||||||
|
|
||||||
private final HttpServletResponse response;
|
private final HttpServletResponse response;
|
||||||
|
|
||||||
private FileLfsRepository repository;
|
|
||||||
|
|
||||||
private final ServletInputStream in;
|
private final ServletInputStream in;
|
||||||
|
|
||||||
private final ReadableByteChannel inChannel;
|
private final ReadableByteChannel inChannel;
|
||||||
|
|
||||||
private WritableByteChannel out;
|
private final AtomicObjectOutputStream out;
|
||||||
|
|
||||||
|
private WritableByteChannel channel;
|
||||||
|
|
||||||
private final ByteBuffer buffer = ByteBuffer.allocateDirect(8192);
|
private final ByteBuffer buffer = ByteBuffer.allocateDirect(8192);
|
||||||
|
|
||||||
|
@ -98,12 +98,12 @@ public ObjectUploadListener(FileLfsRepository repository,
|
||||||
AsyncContext context, HttpServletRequest request,
|
AsyncContext context, HttpServletRequest request,
|
||||||
HttpServletResponse response, AnyLongObjectId id)
|
HttpServletResponse response, AnyLongObjectId id)
|
||||||
throws FileNotFoundException, IOException {
|
throws FileNotFoundException, IOException {
|
||||||
this.repository = repository;
|
|
||||||
this.context = context;
|
this.context = context;
|
||||||
this.response = response;
|
this.response = response;
|
||||||
this.in = request.getInputStream();
|
this.in = request.getInputStream();
|
||||||
this.inChannel = Channels.newChannel(in);
|
this.inChannel = Channels.newChannel(in);
|
||||||
this.out = repository.getWriteChannel(id);
|
this.out = repository.getOutputStream(id);
|
||||||
|
this.channel = Channels.newChannel(out);
|
||||||
response.setContentType(Constants.CONTENT_TYPE_GIT_LFS_JSON);
|
response.setContentType(Constants.CONTENT_TYPE_GIT_LFS_JSON);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -117,12 +117,12 @@ public void onDataAvailable() throws IOException {
|
||||||
while (in.isReady()) {
|
while (in.isReady()) {
|
||||||
if (inChannel.read(buffer) > 0) {
|
if (inChannel.read(buffer) > 0) {
|
||||||
buffer.flip();
|
buffer.flip();
|
||||||
out.write(buffer);
|
channel.write(buffer);
|
||||||
buffer.compact();
|
buffer.compact();
|
||||||
} else {
|
} else {
|
||||||
buffer.flip();
|
buffer.flip();
|
||||||
while (buffer.hasRemaining()) {
|
while (buffer.hasRemaining()) {
|
||||||
out.write(buffer);
|
channel.write(buffer);
|
||||||
}
|
}
|
||||||
close();
|
close();
|
||||||
return;
|
return;
|
||||||
|
@ -141,7 +141,7 @@ public void onAllDataRead() throws IOException {
|
||||||
protected void close() throws IOException {
|
protected void close() throws IOException {
|
||||||
try {
|
try {
|
||||||
inChannel.close();
|
inChannel.close();
|
||||||
out.close();
|
channel.close();
|
||||||
// TODO check if status 200 is ok for PUT request, HTTP foresees 204
|
// TODO check if status 200 is ok for PUT request, HTTP foresees 204
|
||||||
// for successful PUT without response body
|
// for successful PUT without response body
|
||||||
response.setStatus(HttpServletResponse.SC_OK);
|
response.setStatus(HttpServletResponse.SC_OK);
|
||||||
|
@ -157,9 +157,9 @@ protected void close() throws IOException {
|
||||||
@Override
|
@Override
|
||||||
public void onError(Throwable e) {
|
public void onError(Throwable e) {
|
||||||
try {
|
try {
|
||||||
repository.abortWrite();
|
out.abort();
|
||||||
inChannel.close();
|
inChannel.close();
|
||||||
out.close();
|
channel.close();
|
||||||
int status;
|
int status;
|
||||||
if (e instanceof CorruptLongObjectException) {
|
if (e instanceof CorruptLongObjectException) {
|
||||||
status = HttpStatus.SC_BAD_REQUEST;
|
status = HttpStatus.SC_BAD_REQUEST;
|
||||||
|
|
Loading…
Reference in New Issue