From 21e4aa2b9eaf392825e52ada6034cc3044c69c67 Mon Sep 17 00:00:00 2001 From: Shawn Pearce Date: Tue, 16 Apr 2013 08:19:13 -0700 Subject: [PATCH 1/2] Split delta search buckets by byte weight Instead of assuming all objects cost the same amount of time to delta compress, aggregate the byte size of objects in the list and partition threads with roughly equal total bytes. Before splitting the list select the N largest paths and assign each one to its own thread. This allows threads to get through the worst cases in parallel before attempting smaller paths that are more likely to be splittable. By running the largest path buckets first on each thread the likely slowest part of compression is done early, while progress is still reporting a low percentage. This gives users a better impression of how fast the phase will run. On very complex inputs the slow part is more likely to happen first, making a user realize its time to go grab lunch, or even run it overnight. If the worst sections are earlier, memory overruns may show up earlier, giving the user a chance to correct the configuration and try again before wasting large amounts of time. It also makes it less likely the delta compression phase reaches 92% in 30 minutes and then crawls for 10 hours through the remaining 8%. Change-Id: I7621c4349b99e40098825c4966b8411079992e5f --- .../jgit/internal/storage/pack/DeltaTask.java | 200 ++++++++++++++++-- .../internal/storage/pack/DeltaWindow.java | 2 +- .../internal/storage/pack/PackWriter.java | 27 +-- 3 files changed, 187 insertions(+), 42 deletions(-) diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/DeltaTask.java b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/DeltaTask.java index cc212fb81..ca2fff688 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/DeltaTask.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/DeltaTask.java @@ -43,7 +43,12 @@ package org.eclipse.jgit.internal.storage.pack; +import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.concurrent.Callable; @@ -53,7 +58,10 @@ final class DeltaTask implements Callable { static final class Block { + private static final int MIN_TOP_PATH = 50 << 20; + final List tasks; + final int threads; final PackConfig config; final ObjectReader templateReader; final DeltaCache dc; @@ -62,10 +70,13 @@ static final class Block { final int beginIndex; final int endIndex; + private long totalWeight; + Block(int threads, PackConfig config, ObjectReader reader, DeltaCache dc, ThreadSafeProgressMonitor pm, ObjectToPack[] list, int begin, int end) { this.tasks = new ArrayList(threads); + this.threads = threads; this.config = config; this.templateReader = reader; this.dc = dc; @@ -75,7 +86,7 @@ static final class Block { this.endIndex = end; } - synchronized Slice stealWork() { + synchronized DeltaWindow stealWork(DeltaTask forThread) { for (;;) { DeltaTask maxTask = null; Slice maxSlice = null; @@ -92,9 +103,122 @@ synchronized Slice stealWork() { if (maxTask == null) return null; if (maxTask.tryStealWork(maxSlice)) - return maxSlice; + return forThread.initWindow(maxSlice); } } + + void partitionTasks() { + ArrayList topPaths = computeTopPaths(); + Iterator topPathItr = topPaths.iterator(); + int nextTop = 0; + long weightPerThread = totalWeight / threads; + for (int i = beginIndex; i < endIndex;) { + DeltaTask task = new DeltaTask(this); + long w = 0; + + // Assign the thread one top path. + if (topPathItr.hasNext()) { + WeightedPath p = topPathItr.next(); + w += p.weight; + task.add(p.slice); + } + + // Assign the task thread ~average weight. + int s = i; + for (; w < weightPerThread && i < endIndex;) { + if (nextTop < topPaths.size() + && i == topPaths.get(nextTop).slice.beginIndex) { + if (s < i) + task.add(new Slice(s, i)); + s = i = topPaths.get(nextTop++).slice.endIndex; + } else + w += list[i++].getWeight(); + } + + // Round up the slice to the end of a path. + if (s < i) { + int h = list[i - 1].getPathHash(); + while (i < endIndex) { + if (h == list[i].getPathHash()) + i++; + else + break; + } + task.add(new Slice(s, i)); + } + if (!task.slices.isEmpty()) + tasks.add(task); + } + while (topPathItr.hasNext()) { + WeightedPath p = topPathItr.next(); + DeltaTask task = new DeltaTask(this); + task.add(p.slice); + tasks.add(task); + } + + topPaths = null; + } + + private ArrayList computeTopPaths() { + ArrayList topPaths = new ArrayList( + threads); + int cp = beginIndex; + int ch = list[cp].getPathHash(); + long cw = list[cp].getWeight(); + totalWeight = list[cp].getWeight(); + + for (int i = cp + 1; i < endIndex; i++) { + ObjectToPack o = list[i]; + if (ch != o.getPathHash()) { + if (MIN_TOP_PATH < cw) { + if (topPaths.size() < threads) { + Slice s = new Slice(cp, i); + topPaths.add(new WeightedPath(cw, s)); + if (topPaths.size() == threads) + Collections.sort(topPaths); + } else if (topPaths.get(0).weight < cw) { + Slice s = new Slice(cp, i); + WeightedPath p = new WeightedPath(cw, s); + topPaths.set(0, p); + if (p.compareTo(topPaths.get(1)) > 0) + Collections.sort(topPaths); + } + } + cp = i; + ch = o.getPathHash(); + cw = 0; + } + if (o.isEdge() || o.doNotAttemptDelta()) + continue; + cw += o.getWeight(); + totalWeight += o.getWeight(); + } + + // Sort by starting index to identify gaps later. + Collections.sort(topPaths, new Comparator() { + public int compare(WeightedPath a, WeightedPath b) { + return a.slice.beginIndex - b.slice.beginIndex; + } + }); + return topPaths; + } + } + + static final class WeightedPath implements Comparable { + final long weight; + final Slice slice; + + WeightedPath(long weight, Slice s) { + this.weight = weight; + this.slice = s; + } + + public int compareTo(WeightedPath o) { + int cmp = Long.signum(weight - o.weight); + if (cmp != 0) + return cmp; + return slice.beginIndex - o.slice.beginIndex; + } } static final class Slice { @@ -112,36 +236,82 @@ final int size() { } private final Block block; - private final Slice firstSlice; - private volatile DeltaWindow dw; + private final LinkedList slices; - DeltaTask(Block b, int beginIndex, int endIndex) { + private ObjectReader or; + private DeltaWindow dw; + + DeltaTask(Block b) { this.block = b; - this.firstSlice = new Slice(beginIndex, endIndex); + this.slices = new LinkedList(); + } + + void add(Slice s) { + if (!slices.isEmpty()) { + Slice last = slices.getLast(); + if (last.endIndex == s.beginIndex) { + slices.removeLast(); + slices.add(new Slice(last.beginIndex, s.endIndex)); + return; + } + } + slices.add(s); } public Object call() throws Exception { - ObjectReader or = block.templateReader.newReader(); + or = block.templateReader.newReader(); try { - for (Slice s = firstSlice; s != null; s = block.stealWork()) { - dw = new DeltaWindow(block.config, block.dc, or, block.pm, - block.list, s.beginIndex, s.endIndex); - dw.search(); - dw = null; + DeltaWindow w; + for (;;) { + synchronized (this) { + if (slices.isEmpty()) + break; + w = initWindow(slices.removeFirst()); + } + runWindow(w); } + while ((w = block.stealWork(this)) != null) + runWindow(w); } finally { - or.release(); block.pm.endWorker(); + or.release(); + or = null; } return null; } - Slice remaining() { + DeltaWindow initWindow(Slice s) { + DeltaWindow w = new DeltaWindow(block.config, block.dc, + or, block.pm, + block.list, s.beginIndex, s.endIndex); + synchronized (this) { + dw = w; + } + return w; + } + + private void runWindow(DeltaWindow w) throws IOException { + try { + w.search(); + } finally { + synchronized (this) { + dw = null; + } + } + } + + synchronized Slice remaining() { + if (!slices.isEmpty()) + return slices.getLast(); DeltaWindow d = dw; return d != null ? d.remaining() : null; } - boolean tryStealWork(Slice s) { + synchronized boolean tryStealWork(Slice s) { + if (!slices.isEmpty() && slices.getLast().beginIndex == s.beginIndex) { + slices.removeLast(); + return true; + } DeltaWindow d = dw; return d != null ? d.tryStealWork(s) : false; } diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/DeltaWindow.java b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/DeltaWindow.java index 66871bb14..cc7fac800 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/DeltaWindow.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/DeltaWindow.java @@ -134,7 +134,7 @@ synchronized DeltaTask.Slice remaining() { } synchronized boolean tryStealWork(DeltaTask.Slice s) { - if (s.beginIndex <= cur) + if (s.beginIndex <= cur || end <= s.beginIndex) return false; end = s.beginIndex; return true; diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/PackWriter.java b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/PackWriter.java index 54a5826c0..a3ef27c21 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/PackWriter.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/PackWriter.java @@ -1337,35 +1337,10 @@ private void searchForDeltas(final ProgressMonitor monitor, final DeltaCache dc = new ThreadSafeDeltaCache(config); final ThreadSafeProgressMonitor pm = new ThreadSafeProgressMonitor(monitor); - int estSize = cnt / threads; - if (estSize < config.getDeltaSearchWindowSize()) - estSize = config.getDeltaSearchWindowSize(); - DeltaTask.Block taskBlock = new DeltaTask.Block(threads, config, reader, dc, pm, list, 0, cnt); - for (int i = 0; i < cnt;) { - final int start = i; - int end; - - if (cnt - i < estSize) { - // If we don't have enough to fill the remaining block, - // schedule what is left over as a single block. - end = cnt; - } else { - // Try to split the block at the end of a path. - end = start + estSize; - int h = list[end - 1].getPathHash(); - while (end < cnt) { - if (h == list[end].getPathHash()) - end++; - else - break; - } - } - i = end; - taskBlock.tasks.add(new DeltaTask(taskBlock, start, end)); - } + taskBlock.partitionTasks(); pm.startWorkers(taskBlock.tasks.size()); final Executor executor = config.getExecutor(); From 5d8a9f6f3f43ac43c6b1c48cdfad55e545171ea3 Mon Sep 17 00:00:00 2001 From: Shawn Pearce Date: Tue, 16 Apr 2013 09:25:29 -0700 Subject: [PATCH 2/2] Rescale "Compressing objects" progress meter by size Instead of counting objects processed, count number of bytes added into the window. This should rescale the progress meter so that 30% complete means 30% of the total uncompressed content size has been inflated and fed into the window. In theory the progress meter should be more accurate about its percentage complete/remaining fraction than with objects. When counting objects small objects move the progress meter more rapidly than large objects, but demand a smaller amount of work than large objects being compressed. Change-Id: Id2848c16a2148b5ca51e0ca1e29c5be97eefeb48 --- .../jgit/internal/storage/pack/DeltaTask.java | 16 ++++++- .../internal/storage/pack/DeltaWindow.java | 13 ++++-- .../internal/storage/pack/PackWriter.java | 45 ++++++++++++++----- 3 files changed, 58 insertions(+), 16 deletions(-) diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/DeltaTask.java b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/DeltaTask.java index ca2fff688..c4b01949d 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/DeltaTask.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/DeltaTask.java @@ -57,6 +57,8 @@ import org.eclipse.jgit.storage.pack.PackConfig; final class DeltaTask implements Callable { + static final long MAX_METER = 9 << 20; + static final class Block { private static final int MIN_TOP_PATH = 50 << 20; @@ -71,6 +73,7 @@ static final class Block { final int endIndex; private long totalWeight; + private long bytesPerUnit; Block(int threads, PackConfig config, ObjectReader reader, DeltaCache dc, ThreadSafeProgressMonitor pm, @@ -86,6 +89,13 @@ static final class Block { this.endIndex = end; } + int cost() { + int d = (int) (totalWeight / bytesPerUnit); + if (totalWeight % bytesPerUnit != 0) + d++; + return d; + } + synchronized DeltaWindow stealWork(DeltaTask forThread) { for (;;) { DeltaTask maxTask = null; @@ -200,6 +210,10 @@ public int compare(WeightedPath a, WeightedPath b) { return a.slice.beginIndex - b.slice.beginIndex; } }); + + bytesPerUnit = 1; + while (MAX_METER <= (totalWeight / bytesPerUnit)) + bytesPerUnit <<= 10; return topPaths; } } @@ -282,7 +296,7 @@ public Object call() throws Exception { DeltaWindow initWindow(Slice s) { DeltaWindow w = new DeltaWindow(block.config, block.dc, - or, block.pm, + or, block.pm, block.bytesPerUnit, block.list, s.beginIndex, s.endIndex); synchronized (this) { dw = w; diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/DeltaWindow.java b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/DeltaWindow.java index cc7fac800..19d06a23f 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/DeltaWindow.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/DeltaWindow.java @@ -64,6 +64,8 @@ final class DeltaWindow { private final DeltaCache deltaCache; private final ObjectReader reader; private final ProgressMonitor monitor; + private final long bytesPerUnit; + private long bytesProcessed; /** Maximum number of bytes to admit to the window at once. */ private final long maxMemory; @@ -92,12 +94,13 @@ final class DeltaWindow { private Deflater deflater; DeltaWindow(PackConfig pc, DeltaCache dc, ObjectReader or, - ProgressMonitor pm, + ProgressMonitor pm, long bpu, ObjectToPack[] in, int beginIndex, int endIndex) { config = pc; deltaCache = dc; reader = or; monitor = pm; + bytesPerUnit = bpu; toSearch = in; cur = beginIndex; end = endIndex; @@ -162,12 +165,14 @@ void search() throws IOException { // We don't actually want to make a delta for // them, just need to push them into the window // so they can be read by other objects. - // keepInWindow(); } else { // Search for a delta for the current window slot. - // - monitor.update(1); + if (bytesPerUnit <= (bytesProcessed += next.getWeight())) { + int d = (int) (bytesProcessed / bytesPerUnit); + monitor.update(d); + bytesProcessed -= d * bytesPerUnit; + } searchInWindow(); } } diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/PackWriter.java b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/PackWriter.java index a3ef27c21..a7122592f 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/PackWriter.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/PackWriter.java @@ -1286,9 +1286,7 @@ public int compare(ObjectToPack a, ObjectToPack b) { return; final long searchStart = System.currentTimeMillis(); - beginPhase(PackingPhase.COMPRESSING, monitor, nonEdgeCnt); searchForDeltas(monitor, list, cnt); - endPhase(monitor); stats.deltaSearchNonEdgeObjects = nonEdgeCnt; stats.timeCompressing = System.currentTimeMillis() - searchStart; @@ -1327,25 +1325,49 @@ private void searchForDeltas(final ProgressMonitor monitor, int threads = config.getThreads(); if (threads == 0) threads = Runtime.getRuntime().availableProcessors(); + if (threads <= 1 || cnt <= config.getDeltaSearchWindowSize()) + singleThreadDeltaSearch(monitor, list, cnt); + else + parallelDeltaSearch(monitor, list, cnt, threads); + } - if (threads <= 1 || cnt <= 2 * config.getDeltaSearchWindowSize()) { - new DeltaWindow(config, new DeltaCache(config), reader, monitor, - list, 0, cnt).search(); - return; + private void singleThreadDeltaSearch(ProgressMonitor monitor, + ObjectToPack[] list, int cnt) throws IOException { + long totalWeight = 0; + for (int i = 0; i < cnt; i++) { + ObjectToPack o = list[i]; + if (!o.isEdge() && !o.doNotAttemptDelta()) + totalWeight += o.getWeight(); } - final DeltaCache dc = new ThreadSafeDeltaCache(config); - final ThreadSafeProgressMonitor pm = new ThreadSafeProgressMonitor(monitor); + long bytesPerUnit = 1; + while (DeltaTask.MAX_METER <= (totalWeight / bytesPerUnit)) + bytesPerUnit <<= 10; + int cost = (int) (totalWeight / bytesPerUnit); + if (totalWeight % bytesPerUnit != 0) + cost++; + beginPhase(PackingPhase.COMPRESSING, monitor, cost); + new DeltaWindow(config, new DeltaCache(config), reader, + monitor, bytesPerUnit, + list, 0, cnt).search(); + endPhase(monitor); + } + + private void parallelDeltaSearch(ProgressMonitor monitor, + ObjectToPack[] list, int cnt, int threads) throws IOException { + DeltaCache dc = new ThreadSafeDeltaCache(config); + ThreadSafeProgressMonitor pm = new ThreadSafeProgressMonitor(monitor); DeltaTask.Block taskBlock = new DeltaTask.Block(threads, config, reader, dc, pm, list, 0, cnt); taskBlock.partitionTasks(); + beginPhase(PackingPhase.COMPRESSING, monitor, taskBlock.cost()); pm.startWorkers(taskBlock.tasks.size()); - final Executor executor = config.getExecutor(); - final List errors = Collections - .synchronizedList(new ArrayList()); + Executor executor = config.getExecutor(); + final List errors = + Collections.synchronizedList(new ArrayList(threads)); if (executor instanceof ExecutorService) { // Caller supplied us a service, use it directly. runTasks((ExecutorService) executor, pm, taskBlock, errors); @@ -1409,6 +1431,7 @@ public void run() { fail.initCause(err); throw fail; } + endPhase(monitor); } private static void runTasks(ExecutorService pool,