From 21e4aa2b9eaf392825e52ada6034cc3044c69c67 Mon Sep 17 00:00:00 2001 From: Shawn Pearce Date: Tue, 16 Apr 2013 08:19:13 -0700 Subject: [PATCH] Split delta search buckets by byte weight Instead of assuming all objects cost the same amount of time to delta compress, aggregate the byte size of objects in the list and partition threads with roughly equal total bytes. Before splitting the list, select the N largest paths and assign each one to its own thread. This allows threads to get through the worst cases in parallel before attempting smaller paths that are more likely to be splittable. By running the largest path buckets first on each thread, the likely slowest part of compression is done early, while progress is still reporting a low percentage. This gives users a better impression of how fast the phase will run. On very complex inputs, the slow part is more likely to happen first, making a user realize it's time to go grab lunch, or even run it overnight. If the worst sections are earlier, memory overruns may show up earlier, giving the user a chance to correct the configuration and try again before wasting large amounts of time. It also makes it less likely the delta compression phase reaches 92% in 30 minutes and then crawls for 10 hours through the remaining 8%. 
Change-Id: I7621c4349b99e40098825c4966b8411079992e5f --- .../jgit/internal/storage/pack/DeltaTask.java | 200 ++++++++++++++++-- .../internal/storage/pack/DeltaWindow.java | 2 +- .../internal/storage/pack/PackWriter.java | 27 +-- 3 files changed, 187 insertions(+), 42 deletions(-) diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/DeltaTask.java b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/DeltaTask.java index cc212fb81..ca2fff688 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/DeltaTask.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/DeltaTask.java @@ -43,7 +43,12 @@ package org.eclipse.jgit.internal.storage.pack; +import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.concurrent.Callable; @@ -53,7 +58,10 @@ final class DeltaTask implements Callable { static final class Block { + private static final int MIN_TOP_PATH = 50 << 20; + final List tasks; + final int threads; final PackConfig config; final ObjectReader templateReader; final DeltaCache dc; @@ -62,10 +70,13 @@ static final class Block { final int beginIndex; final int endIndex; + private long totalWeight; + Block(int threads, PackConfig config, ObjectReader reader, DeltaCache dc, ThreadSafeProgressMonitor pm, ObjectToPack[] list, int begin, int end) { this.tasks = new ArrayList(threads); + this.threads = threads; this.config = config; this.templateReader = reader; this.dc = dc; @@ -75,7 +86,7 @@ static final class Block { this.endIndex = end; } - synchronized Slice stealWork() { + synchronized DeltaWindow stealWork(DeltaTask forThread) { for (;;) { DeltaTask maxTask = null; Slice maxSlice = null; @@ -92,9 +103,122 @@ synchronized Slice stealWork() { if (maxTask == null) return null; if (maxTask.tryStealWork(maxSlice)) - return maxSlice; + return 
forThread.initWindow(maxSlice); } } + + void partitionTasks() { + ArrayList topPaths = computeTopPaths(); + Iterator topPathItr = topPaths.iterator(); + int nextTop = 0; + long weightPerThread = totalWeight / threads; + for (int i = beginIndex; i < endIndex;) { + DeltaTask task = new DeltaTask(this); + long w = 0; + + // Assign the thread one top path. + if (topPathItr.hasNext()) { + WeightedPath p = topPathItr.next(); + w += p.weight; + task.add(p.slice); + } + + // Assign the task thread ~average weight. + int s = i; + for (; w < weightPerThread && i < endIndex;) { + if (nextTop < topPaths.size() + && i == topPaths.get(nextTop).slice.beginIndex) { + if (s < i) + task.add(new Slice(s, i)); + s = i = topPaths.get(nextTop++).slice.endIndex; + } else + w += list[i++].getWeight(); + } + + // Round up the slice to the end of a path. + if (s < i) { + int h = list[i - 1].getPathHash(); + while (i < endIndex) { + if (h == list[i].getPathHash()) + i++; + else + break; + } + task.add(new Slice(s, i)); + } + if (!task.slices.isEmpty()) + tasks.add(task); + } + while (topPathItr.hasNext()) { + WeightedPath p = topPathItr.next(); + DeltaTask task = new DeltaTask(this); + task.add(p.slice); + tasks.add(task); + } + + topPaths = null; + } + + private ArrayList computeTopPaths() { + ArrayList topPaths = new ArrayList( + threads); + int cp = beginIndex; + int ch = list[cp].getPathHash(); + long cw = list[cp].getWeight(); + totalWeight = list[cp].getWeight(); + + for (int i = cp + 1; i < endIndex; i++) { + ObjectToPack o = list[i]; + if (ch != o.getPathHash()) { + if (MIN_TOP_PATH < cw) { + if (topPaths.size() < threads) { + Slice s = new Slice(cp, i); + topPaths.add(new WeightedPath(cw, s)); + if (topPaths.size() == threads) + Collections.sort(topPaths); + } else if (topPaths.get(0).weight < cw) { + Slice s = new Slice(cp, i); + WeightedPath p = new WeightedPath(cw, s); + topPaths.set(0, p); + if (p.compareTo(topPaths.get(1)) > 0) + Collections.sort(topPaths); + } + } + cp = i; + 
ch = o.getPathHash(); + cw = 0; + } + if (o.isEdge() || o.doNotAttemptDelta()) + continue; + cw += o.getWeight(); + totalWeight += o.getWeight(); + } + + // Sort by starting index to identify gaps later. + Collections.sort(topPaths, new Comparator() { + public int compare(WeightedPath a, WeightedPath b) { + return a.slice.beginIndex - b.slice.beginIndex; + } + }); + return topPaths; + } + } + + static final class WeightedPath implements Comparable { + final long weight; + final Slice slice; + + WeightedPath(long weight, Slice s) { + this.weight = weight; + this.slice = s; + } + + public int compareTo(WeightedPath o) { + int cmp = Long.signum(weight - o.weight); + if (cmp != 0) + return cmp; + return slice.beginIndex - o.slice.beginIndex; + } } static final class Slice { @@ -112,36 +236,82 @@ final int size() { } private final Block block; - private final Slice firstSlice; - private volatile DeltaWindow dw; + private final LinkedList slices; - DeltaTask(Block b, int beginIndex, int endIndex) { + private ObjectReader or; + private DeltaWindow dw; + + DeltaTask(Block b) { this.block = b; - this.firstSlice = new Slice(beginIndex, endIndex); + this.slices = new LinkedList(); + } + + void add(Slice s) { + if (!slices.isEmpty()) { + Slice last = slices.getLast(); + if (last.endIndex == s.beginIndex) { + slices.removeLast(); + slices.add(new Slice(last.beginIndex, s.endIndex)); + return; + } + } + slices.add(s); } public Object call() throws Exception { - ObjectReader or = block.templateReader.newReader(); + or = block.templateReader.newReader(); try { - for (Slice s = firstSlice; s != null; s = block.stealWork()) { - dw = new DeltaWindow(block.config, block.dc, or, block.pm, - block.list, s.beginIndex, s.endIndex); - dw.search(); - dw = null; + DeltaWindow w; + for (;;) { + synchronized (this) { + if (slices.isEmpty()) + break; + w = initWindow(slices.removeFirst()); + } + runWindow(w); } + while ((w = block.stealWork(this)) != null) + runWindow(w); } finally { - 
or.release(); block.pm.endWorker(); + or.release(); + or = null; } return null; } - Slice remaining() { + DeltaWindow initWindow(Slice s) { + DeltaWindow w = new DeltaWindow(block.config, block.dc, + or, block.pm, + block.list, s.beginIndex, s.endIndex); + synchronized (this) { + dw = w; + } + return w; + } + + private void runWindow(DeltaWindow w) throws IOException { + try { + w.search(); + } finally { + synchronized (this) { + dw = null; + } + } + } + + synchronized Slice remaining() { + if (!slices.isEmpty()) + return slices.getLast(); DeltaWindow d = dw; return d != null ? d.remaining() : null; } - boolean tryStealWork(Slice s) { + synchronized boolean tryStealWork(Slice s) { + if (!slices.isEmpty() && slices.getLast().beginIndex == s.beginIndex) { + slices.removeLast(); + return true; + } DeltaWindow d = dw; return d != null ? d.tryStealWork(s) : false; } diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/DeltaWindow.java b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/DeltaWindow.java index 66871bb14..cc7fac800 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/DeltaWindow.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/DeltaWindow.java @@ -134,7 +134,7 @@ synchronized DeltaTask.Slice remaining() { } synchronized boolean tryStealWork(DeltaTask.Slice s) { - if (s.beginIndex <= cur) + if (s.beginIndex <= cur || end <= s.beginIndex) return false; end = s.beginIndex; return true; diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/PackWriter.java b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/PackWriter.java index 54a5826c0..a3ef27c21 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/PackWriter.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/PackWriter.java @@ -1337,35 +1337,10 @@ private void searchForDeltas(final ProgressMonitor monitor, final DeltaCache dc = new ThreadSafeDeltaCache(config); final 
ThreadSafeProgressMonitor pm = new ThreadSafeProgressMonitor(monitor); - int estSize = cnt / threads; - if (estSize < config.getDeltaSearchWindowSize()) - estSize = config.getDeltaSearchWindowSize(); - DeltaTask.Block taskBlock = new DeltaTask.Block(threads, config, reader, dc, pm, list, 0, cnt); - for (int i = 0; i < cnt;) { - final int start = i; - int end; - - if (cnt - i < estSize) { - // If we don't have enough to fill the remaining block, - // schedule what is left over as a single block. - end = cnt; - } else { - // Try to split the block at the end of a path. - end = start + estSize; - int h = list[end - 1].getPathHash(); - while (end < cnt) { - if (h == list[end].getPathHash()) - end++; - else - break; - } - } - i = end; - taskBlock.tasks.add(new DeltaTask(taskBlock, start, end)); - } + taskBlock.partitionTasks(); pm.startWorkers(taskBlock.tasks.size()); final Executor executor = config.getExecutor();