Merge changes Id2848c16,I7621c434

* changes:
  Rescale "Compressing objects" progress meter by size
  Split delta search buckets by byte weight
This commit is contained in:
Shawn Pearce 2013-04-17 14:53:13 -04:00 committed by Gerrit Code Review @ Eclipse.org
commit fa1bc6abb7
3 changed files with 243 additions and 56 deletions

View File

@ -43,7 +43,12 @@
package org.eclipse.jgit.internal.storage.pack;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
@ -52,8 +57,13 @@
import org.eclipse.jgit.storage.pack.PackConfig;
final class DeltaTask implements Callable<Object> {
static final long MAX_METER = 9 << 20;
static final class Block {
private static final int MIN_TOP_PATH = 50 << 20;
final List<DeltaTask> tasks;
final int threads;
final PackConfig config;
final ObjectReader templateReader;
final DeltaCache dc;
@ -62,10 +72,14 @@ static final class Block {
final int beginIndex;
final int endIndex;
private long totalWeight;
private long bytesPerUnit;
Block(int threads, PackConfig config, ObjectReader reader,
DeltaCache dc, ThreadSafeProgressMonitor pm,
ObjectToPack[] list, int begin, int end) {
this.tasks = new ArrayList<DeltaTask>(threads);
this.threads = threads;
this.config = config;
this.templateReader = reader;
this.dc = dc;
@ -75,7 +89,14 @@ static final class Block {
this.endIndex = end;
}
synchronized Slice stealWork() {
/**
 * Total amount of work in this block, expressed in progress-meter units.
 * <p>
 * This is {@code totalWeight / bytesPerUnit} rounded up. Both fields are
 * assigned by {@code computeTopPaths()}, which runs as part of
 * {@code partitionTasks()}.
 *
 * @return number of progress units covering {@code totalWeight} bytes.
 */
int cost() {
	final long whole = totalWeight / bytesPerUnit;
	final boolean partial = totalWeight % bytesPerUnit != 0;
	// A trailing partial unit still counts as one full unit.
	return partial ? (int) whole + 1 : (int) whole;
}
synchronized DeltaWindow stealWork(DeltaTask forThread) {
for (;;) {
DeltaTask maxTask = null;
Slice maxSlice = null;
@ -92,9 +113,126 @@ synchronized Slice stealWork() {
if (maxTask == null)
return null;
if (maxTask.tryStealWork(maxSlice))
return maxSlice;
return forThread.initWindow(maxSlice);
}
}
/**
 * Split the objects in {@code [beginIndex, endIndex)} into tasks of
 * roughly equal byte weight and append them to {@code tasks}.
 * <p>
 * Each task is first handed at most one "top path" slice (as selected by
 * {@code computeTopPaths()}), then filled with ordinary objects until its
 * accumulated weight reaches the per-thread target. Ranges belonging to
 * top paths are skipped while filling, and the final ordinary slice of a
 * task is extended so it ends where the path hash changes. Any top paths
 * still unassigned afterwards each get a task of their own.
 */
void partitionTasks() {
	ArrayList<WeightedPath> topPaths = computeTopPaths();
	Iterator<WeightedPath> topPathItr = topPaths.iterator();
	int nextTop = 0;

	// The target must be at least 1. With a target of 0 the fill loop
	// below would never execute once the top paths are exhausted, leaving
	// i unchanged and the outer loop spinning forever.
	long weightPerThread = Math.max(totalWeight / threads, 1);

	for (int i = beginIndex; i < endIndex;) {
		DeltaTask task = new DeltaTask(this);
		long w = 0;

		// Assign the thread one top path.
		if (topPathItr.hasNext()) {
			WeightedPath p = topPathItr.next();
			w += p.weight;
			task.add(p.slice);
		}

		// Assign the task thread ~average weight.
		int s = i;
		for (; w < weightPerThread && i < endIndex;) {
			if (nextTop < topPaths.size()
					&& i == topPaths.get(nextTop).slice.beginIndex) {
				// Reached a top path: close the ordinary run collected
				// so far and jump past the top path's range.
				if (s < i)
					task.add(new Slice(s, i));
				s = i = topPaths.get(nextTop++).slice.endIndex;
			} else
				w += list[i++].getWeight();
		}

		// Round up the slice to the end of a path.
		if (s < i) {
			int h = list[i - 1].getPathHash();
			while (i < endIndex) {
				if (h == list[i].getPathHash())
					i++;
				else
					break;
			}
			task.add(new Slice(s, i));
		}
		if (!task.slices.isEmpty())
			tasks.add(task);
	}

	// Top paths that were not paired with a task above run on their own.
	while (topPathItr.hasNext()) {
		WeightedPath p = topPathItr.next();
		DeltaTask task = new DeltaTask(this);
		task.add(p.slice);
		tasks.add(task);
	}
}
/**
 * Scan {@code [beginIndex, endIndex)} once, computing {@code totalWeight}
 * and {@code bytesPerUnit} and selecting the heaviest paths.
 * <p>
 * A path is a run of consecutive objects sharing one path hash. A run is
 * a candidate when its weight exceeds {@code MIN_TOP_PATH}; at most
 * {@code threads} candidates are retained, the lightest being replaced
 * when a heavier one is found. A run is only evaluated when the path hash
 * changes, so the run that reaches {@code endIndex} is never a candidate.
 * <p>
 * Objects that are edges or marked do-not-attempt-delta contribute no
 * weight, including the first object of the range.
 *
 * @return the retained paths ordered by ascending {@code beginIndex}.
 */
private ArrayList<WeightedPath> computeTopPaths() {
	ArrayList<WeightedPath> topPaths = new ArrayList<WeightedPath>(
			threads);
	int cp = beginIndex;
	int ch = list[cp].getPathHash();
	// Apply the same filter to the first object as to all later ones.
	long cw = searchWeight(list[cp]);
	totalWeight = cw;

	for (int i = cp + 1; i < endIndex; i++) {
		ObjectToPack o = list[i];
		if (ch != o.getPathHash()) {
			// The run [cp, i) is complete; consider it as a top path.
			if (MIN_TOP_PATH < cw) {
				if (topPaths.size() < threads) {
					Slice s = new Slice(cp, i);
					topPaths.add(new WeightedPath(cw, s));
					// Once full, keep the lightest entry at index 0.
					if (topPaths.size() == threads)
						Collections.sort(topPaths);
				} else if (topPaths.get(0).weight < cw) {
					Slice s = new Slice(cp, i);
					WeightedPath p = new WeightedPath(cw, s);
					topPaths.set(0, p);
					// Re-sort only if the replacement is no longer
					// the lightest entry.
					if (p.compareTo(topPaths.get(1)) > 0)
						Collections.sort(topPaths);
				}
			}
			cp = i;
			ch = o.getPathHash();
			cw = 0;
		}
		long ow = searchWeight(o);
		cw += ow;
		totalWeight += ow;
	}

	// Sort by starting index to identify gaps later.
	Collections.sort(topPaths, new Comparator<WeightedPath>() {
		public int compare(WeightedPath a, WeightedPath b) {
			return a.slice.beginIndex - b.slice.beginIndex;
		}
	});

	// Grow the unit by factors of 1024 until the total number of units
	// drops below MAX_METER.
	bytesPerUnit = 1;
	while (MAX_METER <= (totalWeight / bytesPerUnit))
		bytesPerUnit <<= 10;
	return topPaths;
}

/** Weight an object adds to the totals; 0 when it is excluded. */
private static long searchWeight(ObjectToPack o) {
	if (o.isEdge() || o.doNotAttemptDelta())
		return 0;
	return o.getWeight();
}
}
/**
 * A slice of the object list paired with its accumulated weight.
 * <p>
 * Natural ordering is ascending by {@code weight}; ties are broken by
 * ascending {@code slice.beginIndex}.
 */
static final class WeightedPath implements Comparable<WeightedPath> {
	final long weight;

	final Slice slice;

	WeightedPath(long weight, Slice s) {
		this.weight = weight;
		this.slice = s;
	}

	/**
	 * Compare by weight, then by starting index.
	 * <p>
	 * Uses explicit comparisons rather than subtraction so the result
	 * cannot be corrupted by arithmetic overflow.
	 *
	 * @param o
	 *            the path to compare against.
	 * @return negative, zero or positive as this path sorts before, equal
	 *         to, or after {@code o}.
	 */
	public int compareTo(WeightedPath o) {
		if (weight != o.weight)
			return weight < o.weight ? -1 : 1;
		if (slice.beginIndex != o.slice.beginIndex)
			return slice.beginIndex < o.slice.beginIndex ? -1 : 1;
		return 0;
	}
}
static final class Slice {
@ -112,36 +250,82 @@ final int size() {
}
private final Block block;
private final Slice firstSlice;
private volatile DeltaWindow dw;
private final LinkedList<Slice> slices;
DeltaTask(Block b, int beginIndex, int endIndex) {
private ObjectReader or;
private DeltaWindow dw;
DeltaTask(Block b) {
this.block = b;
this.firstSlice = new Slice(beginIndex, endIndex);
this.slices = new LinkedList<Slice>();
}
/**
 * Append a slice to this task's queue.
 * <p>
 * When the slice starts exactly where the last queued slice ends, the
 * two are merged into a single slice instead of being queued separately.
 *
 * @param s
 *            the slice to queue.
 */
void add(Slice s) {
	final boolean adjacent = !slices.isEmpty()
			&& slices.getLast().endIndex == s.beginIndex;
	if (adjacent) {
		// Replace the tail with one slice spanning both ranges.
		s = new Slice(slices.removeLast().beginIndex, s.endIndex);
	}
	slices.add(s);
}
public Object call() throws Exception {
ObjectReader or = block.templateReader.newReader();
or = block.templateReader.newReader();
try {
for (Slice s = firstSlice; s != null; s = block.stealWork()) {
dw = new DeltaWindow(block.config, block.dc, or, block.pm,
block.list, s.beginIndex, s.endIndex);
dw.search();
dw = null;
DeltaWindow w;
for (;;) {
synchronized (this) {
if (slices.isEmpty())
break;
w = initWindow(slices.removeFirst());
}
runWindow(w);
}
while ((w = block.stealWork(this)) != null)
runWindow(w);
} finally {
or.release();
block.pm.endWorker();
or.release();
or = null;
}
return null;
}
Slice remaining() {
/**
 * Create a delta window over the given slice and record it as this
 * task's current window.
 * <p>
 * The window is built with this task's reader and the owning block's
 * configuration, cache, monitor, unit size and object list.
 *
 * @param s
 *            range of the block's object list the window will search.
 * @return the newly created window.
 */
DeltaWindow initWindow(Slice s) {
	final DeltaWindow created = new DeltaWindow(
			block.config, block.dc, or, block.pm,
			block.bytesPerUnit, block.list,
			s.beginIndex, s.endIndex);
	// Publish under the task lock, which remaining() and tryStealWork()
	// also hold when they read dw.
	synchronized (this) {
		this.dw = created;
	}
	return created;
}
/**
 * Run the search of the given window, then clear this task's current
 * window reference whether or not the search completed normally.
 *
 * @param w
 *            the window to search.
 * @throws IOException
 *             propagated from {@code w.search()}.
 */
private void runWindow(DeltaWindow w) throws IOException {
	try {
		w.search();
	} finally {
		// Cleared under the task lock so readers of dw see either the
		// window or null.
		synchronized (this) {
			this.dw = null;
		}
	}
}
/**
 * Report work this task still holds.
 *
 * @return the last queued slice if any slices are queued; otherwise
 *         whatever the current window reports as remaining, or
 *         {@code null} when there is no current window.
 */
synchronized Slice remaining() {
	if (slices.isEmpty()) {
		final DeltaWindow active = dw;
		return active == null ? null : active.remaining();
	}
	return slices.getLast();
}
boolean tryStealWork(Slice s) {
/**
 * Attempt to give up the given slice so another task can run it.
 * <p>
 * If the last queued slice begins at {@code s.beginIndex} it is removed
 * from the queue. Otherwise the request is forwarded to the current
 * window, if there is one.
 *
 * @param s
 *            the slice the caller wants to take over.
 * @return {@code true} if the slice was released; {@code false} otherwise.
 */
synchronized boolean tryStealWork(Slice s) {
	final boolean queuedTail = !slices.isEmpty()
			&& slices.getLast().beginIndex == s.beginIndex;
	if (queuedTail) {
		slices.removeLast();
		return true;
	}
	final DeltaWindow active = dw;
	return active != null && active.tryStealWork(s);
}

View File

@ -64,6 +64,8 @@ final class DeltaWindow {
private final DeltaCache deltaCache;
private final ObjectReader reader;
private final ProgressMonitor monitor;
private final long bytesPerUnit;
private long bytesProcessed;
/** Maximum number of bytes to admit to the window at once. */
private final long maxMemory;
@ -92,12 +94,13 @@ final class DeltaWindow {
private Deflater deflater;
DeltaWindow(PackConfig pc, DeltaCache dc, ObjectReader or,
ProgressMonitor pm,
ProgressMonitor pm, long bpu,
ObjectToPack[] in, int beginIndex, int endIndex) {
config = pc;
deltaCache = dc;
reader = or;
monitor = pm;
bytesPerUnit = bpu;
toSearch = in;
cur = beginIndex;
end = endIndex;
@ -134,7 +137,7 @@ synchronized DeltaTask.Slice remaining() {
}
synchronized boolean tryStealWork(DeltaTask.Slice s) {
if (s.beginIndex <= cur)
if (s.beginIndex <= cur || end <= s.beginIndex)
return false;
end = s.beginIndex;
return true;
@ -162,12 +165,14 @@ void search() throws IOException {
// We don't actually want to make a delta for
// them, just need to push them into the window
// so they can be read by other objects.
//
keepInWindow();
} else {
// Search for a delta for the current window slot.
//
monitor.update(1);
if (bytesPerUnit <= (bytesProcessed += next.getWeight())) {
int d = (int) (bytesProcessed / bytesPerUnit);
monitor.update(d);
bytesProcessed -= d * bytesPerUnit;
}
searchInWindow();
}
}

View File

@ -1286,9 +1286,7 @@ public int compare(ObjectToPack a, ObjectToPack b) {
return;
final long searchStart = System.currentTimeMillis();
beginPhase(PackingPhase.COMPRESSING, monitor, nonEdgeCnt);
searchForDeltas(monitor, list, cnt);
endPhase(monitor);
stats.deltaSearchNonEdgeObjects = nonEdgeCnt;
stats.timeCompressing = System.currentTimeMillis() - searchStart;
@ -1327,50 +1325,49 @@ private void searchForDeltas(final ProgressMonitor monitor,
int threads = config.getThreads();
if (threads == 0)
threads = Runtime.getRuntime().availableProcessors();
if (threads <= 1 || cnt <= config.getDeltaSearchWindowSize())
singleThreadDeltaSearch(monitor, list, cnt);
else
parallelDeltaSearch(monitor, list, cnt, threads);
}
if (threads <= 1 || cnt <= 2 * config.getDeltaSearchWindowSize()) {
new DeltaWindow(config, new DeltaCache(config), reader, monitor,
list, 0, cnt).search();
return;
private void singleThreadDeltaSearch(ProgressMonitor monitor,
ObjectToPack[] list, int cnt) throws IOException {
long totalWeight = 0;
for (int i = 0; i < cnt; i++) {
ObjectToPack o = list[i];
if (!o.isEdge() && !o.doNotAttemptDelta())
totalWeight += o.getWeight();
}
final DeltaCache dc = new ThreadSafeDeltaCache(config);
final ThreadSafeProgressMonitor pm = new ThreadSafeProgressMonitor(monitor);
long bytesPerUnit = 1;
while (DeltaTask.MAX_METER <= (totalWeight / bytesPerUnit))
bytesPerUnit <<= 10;
int cost = (int) (totalWeight / bytesPerUnit);
if (totalWeight % bytesPerUnit != 0)
cost++;
int estSize = cnt / threads;
if (estSize < config.getDeltaSearchWindowSize())
estSize = config.getDeltaSearchWindowSize();
beginPhase(PackingPhase.COMPRESSING, monitor, cost);
new DeltaWindow(config, new DeltaCache(config), reader,
monitor, bytesPerUnit,
list, 0, cnt).search();
endPhase(monitor);
}
private void parallelDeltaSearch(ProgressMonitor monitor,
ObjectToPack[] list, int cnt, int threads) throws IOException {
DeltaCache dc = new ThreadSafeDeltaCache(config);
ThreadSafeProgressMonitor pm = new ThreadSafeProgressMonitor(monitor);
DeltaTask.Block taskBlock = new DeltaTask.Block(threads, config,
reader, dc, pm,
list, 0, cnt);
for (int i = 0; i < cnt;) {
final int start = i;
int end;
if (cnt - i < estSize) {
// If we don't have enough to fill the remaining block,
// schedule what is left over as a single block.
end = cnt;
} else {
// Try to split the block at the end of a path.
end = start + estSize;
int h = list[end - 1].getPathHash();
while (end < cnt) {
if (h == list[end].getPathHash())
end++;
else
break;
}
}
i = end;
taskBlock.tasks.add(new DeltaTask(taskBlock, start, end));
}
taskBlock.partitionTasks();
beginPhase(PackingPhase.COMPRESSING, monitor, taskBlock.cost());
pm.startWorkers(taskBlock.tasks.size());
final Executor executor = config.getExecutor();
final List<Throwable> errors = Collections
.synchronizedList(new ArrayList<Throwable>());
Executor executor = config.getExecutor();
final List<Throwable> errors =
Collections.synchronizedList(new ArrayList<Throwable>(threads));
if (executor instanceof ExecutorService) {
// Caller supplied us a service, use it directly.
runTasks((ExecutorService) executor, pm, taskBlock, errors);
@ -1434,6 +1431,7 @@ public void run() {
fail.initCause(err);
throw fail;
}
endPhase(monitor);
}
private static void runTasks(ExecutorService pool,