diff --git a/org.eclipse.jgit.lfs.server.test/tst/org/eclipse/jgit/lfs/server/fs/UploadTest.java b/org.eclipse.jgit.lfs.server.test/tst/org/eclipse/jgit/lfs/server/fs/UploadTest.java index 35bf09b0c..1fb91bd29 100644 --- a/org.eclipse.jgit.lfs.server.test/tst/org/eclipse/jgit/lfs/server/fs/UploadTest.java +++ b/org.eclipse.jgit.lfs.server.test/tst/org/eclipse/jgit/lfs/server/fs/UploadTest.java @@ -51,6 +51,13 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import org.eclipse.jgit.lfs.lib.AnyLongObjectId; import org.eclipse.jgit.lfs.lib.LongObjectId; @@ -98,4 +105,36 @@ public void testLargeFileUpload() throws Exception { assertEquals("expected object length " + Files.size(f), Files.size(f), repository.getSize(id)); } + + @Test + public void testParallelUploads() throws Exception { + int count = 10; + List paths = new ArrayList<>(count); + + for (int i = 0; i < count; i++) { + Path f = Paths.get(getTempDirectory().toString(), + "largeRandomFile_" + i); + createPseudoRandomContentFile(f, 1 * MiB); + paths.add(f); + } + + final CyclicBarrier barrier = new CyclicBarrier(count); + + ExecutorService e = Executors.newFixedThreadPool(count); + try { + for (final Path p : paths) { + e.submit(new Callable() { + @Override + public Void call() throws Exception { + barrier.await(); + putContent(p); + return null; + } + }); + } + } finally { + e.shutdown(); + e.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); + } + } } diff --git a/org.eclipse.jgit.lfs.server/src/org/eclipse/jgit/lfs/server/fs/FileLfsRepository.java b/org.eclipse.jgit.lfs.server/src/org/eclipse/jgit/lfs/server/fs/FileLfsRepository.java index 12da271b3..2e71c0407 100644 --- a/org.eclipse.jgit.lfs.server/src/org/eclipse/jgit/lfs/server/fs/FileLfsRepository.java +++ b/org.eclipse.jgit.lfs.server/src/org/eclipse/jgit/lfs/server/fs/FileLfsRepository.java @@ -45,10 +45,8 @@ import static org.eclipse.jgit.util.HttpSupport.HDR_AUTHORIZATION; import java.io.IOException; -import java.nio.channels.Channels; import java.nio.channels.FileChannel; import java.nio.channels.ReadableByteChannel; -import java.nio.channels.WritableByteChannel; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardOpenOption; @@ -70,7 +68,6 @@ public class FileLfsRepository implements LargeFileRepository { private final String url; private final Path dir; - private AtomicObjectOutputStream out; /** * @param url @@ -147,21 +144,11 @@ ReadableByteChannel getReadChannel(AnyLongObjectId id) return FileChannel.open(getPath(id), StandardOpenOption.READ); } - WritableByteChannel getWriteChannel(AnyLongObjectId id) + AtomicObjectOutputStream getOutputStream(AnyLongObjectId id) throws IOException { Path path = getPath(id); Files.createDirectories(path.getParent()); - out = new AtomicObjectOutputStream(path, id); - return Channels.newChannel(out); - } - - /** - * Abort the output stream - */ - void abortWrite() { - if (out != null) { - out.abort(); - } + return new AtomicObjectOutputStream(path, id); } private static char[] toHexCharArray(int b) { diff --git a/org.eclipse.jgit.lfs.server/src/org/eclipse/jgit/lfs/server/fs/ObjectUploadListener.java b/org.eclipse.jgit.lfs.server/src/org/eclipse/jgit/lfs/server/fs/ObjectUploadListener.java index 05970ad1f..e524ac643 100644 --- a/org.eclipse.jgit.lfs.server/src/org/eclipse/jgit/lfs/server/fs/ObjectUploadListener.java +++ b/org.eclipse.jgit.lfs.server/src/org/eclipse/jgit/lfs/server/fs/ObjectUploadListener.java @@ -74,13 +74,13 @@ class ObjectUploadListener implements ReadListener { private final HttpServletResponse response; - private FileLfsRepository repository; - private final ServletInputStream in; private final ReadableByteChannel inChannel; - private WritableByteChannel out; + private final AtomicObjectOutputStream out; + + private WritableByteChannel channel; private final ByteBuffer buffer = ByteBuffer.allocateDirect(8192); @@ -98,12 +98,12 @@ public ObjectUploadListener(FileLfsRepository repository, AsyncContext context, HttpServletRequest request, HttpServletResponse response, AnyLongObjectId id) throws FileNotFoundException, IOException { - this.repository = repository; this.context = context; this.response = response; this.in = request.getInputStream(); this.inChannel = Channels.newChannel(in); - this.out = repository.getWriteChannel(id); + this.out = repository.getOutputStream(id); + this.channel = Channels.newChannel(out); response.setContentType(Constants.CONTENT_TYPE_GIT_LFS_JSON); } @@ -117,12 +117,12 @@ public void onDataAvailable() throws IOException { while (in.isReady()) { if (inChannel.read(buffer) > 0) { buffer.flip(); - out.write(buffer); + channel.write(buffer); buffer.compact(); } else { buffer.flip(); while (buffer.hasRemaining()) { - out.write(buffer); + channel.write(buffer); } close(); return; @@ -141,7 +141,7 @@ public void onAllDataRead() throws IOException { protected void close() throws IOException { try { inChannel.close(); - out.close(); + channel.close(); // TODO check if status 200 is ok for PUT request, HTTP foresees 204 // for successful PUT without response body response.setStatus(HttpServletResponse.SC_OK); @@ -157,9 +157,9 @@ protected void close() throws IOException { @Override public void onError(Throwable e) { try { - repository.abortWrite(); + out.abort(); inChannel.close(); - out.close(); + channel.close(); int status; if (e instanceof CorruptLongObjectException) { status = HttpStatus.SC_BAD_REQUEST;