commit 71156aff806856d5d48e72cd8aeb9315b9ae0b62 (tree)
parent ffc6da29e3fa53a9c81bcb8af7467cdd6345538f
Author: Jacob Young <jacobly0@users.noreply.github.com>
Date: Sat, 31 Jan 2026 20:22:53 -0500
std.Progress: implement ipc resource cleanup
Diffstat:
4 files changed, 554 insertions(+), 541 deletions(-)
diff --git a/lib/std/Build/Step.zig b/lib/std/Build/Step.zig
@@ -386,10 +386,14 @@ pub const ZigProcess = struct {
child: std.process.Child,
multi_reader_buffer: Io.File.MultiReader.Buffer(2),
multi_reader: Io.File.MultiReader,
- progress_ipc_fd: if (std.Progress.have_ipc) ?std.posix.fd_t else void,
+ progress_ipc_index: ?if (std.Progress.have_ipc) std.Progress.Ipc.Index else noreturn,
pub const StreamEnum = enum { stdout, stderr };
+ pub fn saveState(zp: *ZigProcess, prog_node: std.Progress.Node) void {
+ zp.progress_ipc_index = if (std.Progress.have_ipc) prog_node.takeIpcIndex() else null;
+ }
+
pub fn deinit(zp: *ZigProcess, io: Io) void {
zp.child.kill(io);
zp.multi_reader.deinit();
@@ -417,7 +421,14 @@ pub fn evalZigProcess(
if (s.getZigProcess()) |zp| update: {
assert(watch);
- if (std.Progress.have_ipc) if (zp.progress_ipc_fd) |fd| prog_node.setIpcFd(fd);
+ if (zp.progress_ipc_index) |ipc_index| prog_node.setIpcIndex(ipc_index);
+ zp.progress_ipc_index = null;
+ var exited = false;
+ defer if (exited) {
+ s.cast(Compile).?.zig_process = null;
+ zp.deinit(io);
+ gpa.destroy(zp);
+ } else zp.saveState(prog_node);
const result = zigProcessUpdate(s, zp, watch, web_server, gpa) catch |err| switch (err) {
error.BrokenPipe, error.EndOfStream => |reason| {
std.log.info("{s} restart required: {t}", .{ argv[0], reason });
@@ -426,7 +437,7 @@ pub fn evalZigProcess(
return s.fail("unable to wait for {s}: {t}", .{ argv[0], e });
};
_ = term;
- s.clearZigProcess(gpa);
+ exited = true;
break :update;
},
else => |e| return e,
@@ -442,7 +453,7 @@ pub fn evalZigProcess(
return s.fail("unable to wait for {s}: {t}", .{ argv[0], e });
};
s.result_peak_rss = zp.child.resource_usage_statistics.getMaxRss() orelse 0;
- s.clearZigProcess(gpa);
+ exited = true;
try handleChildProcessTerm(s, term);
return error.MakeFailed;
}
@@ -467,19 +478,16 @@ pub fn evalZigProcess(
.progress_node = prog_node,
}) catch |err| return s.fail("failed to spawn zig compiler {s}: {t}", .{ argv[0], err });
- zp.* = .{
- .child = zp.child,
- .multi_reader_buffer = undefined,
- .multi_reader = undefined,
- .progress_ipc_fd = if (std.Progress.have_ipc) prog_node.getIpcFd() else {},
- };
zp.multi_reader.init(gpa, io, zp.multi_reader_buffer.toStreams(), &.{
zp.child.stdout.?, zp.child.stderr.?,
});
- if (watch) s.setZigProcess(zp);
+ if (watch) s.cast(Compile).?.zig_process = zp;
defer if (!watch) zp.deinit(io);
- const result = try zigProcessUpdate(s, zp, watch, web_server, gpa);
+ const result = result: {
+ defer if (watch) zp.saveState(prog_node);
+ break :result try zigProcessUpdate(s, zp, watch, web_server, gpa);
+ };
if (!watch) {
// Send EOF to stdin.
@@ -670,26 +678,6 @@ pub fn getZigProcess(s: *Step) ?*ZigProcess {
};
}
-fn setZigProcess(s: *Step, zp: *ZigProcess) void {
- switch (s.id) {
- .compile => s.cast(Compile).?.zig_process = zp,
- else => unreachable,
- }
-}
-
-fn clearZigProcess(s: *Step, gpa: Allocator) void {
- switch (s.id) {
- .compile => {
- const compile = s.cast(Compile).?;
- if (compile.zig_process) |zp| {
- gpa.destroy(zp);
- compile.zig_process = null;
- }
- },
- else => unreachable,
- }
-}
-
fn sendMessage(io: Io, file: Io.File, tag: std.zig.Client.Message.Tag) !void {
const header: std.zig.Client.Message.Header = .{
.tag = tag,
diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig
@@ -19,7 +19,7 @@ const Alignment = std.mem.Alignment;
const assert = std.debug.assert;
const posix = std.posix;
const windows = std.os.windows;
-const ws2_32 = std.os.windows.ws2_32;
+const ws2_32 = windows.ws2_32;
/// Thread-safe.
///
@@ -2609,8 +2609,7 @@ fn batchAwaitAsync(userdata: ?*anyopaque, b: *Io.Batch) Io.Cancelable!void {
// opportunity to find additional ready operations.
break :t 0;
}
- const max_poll_ms = std.math.maxInt(i32);
- break :t max_poll_ms;
+ break :t std.math.maxInt(i32);
};
const syscall = try Syscall.start();
const rc = posix.system.poll(&poll_buffer, poll_len, timeout_ms);
@@ -2730,6 +2729,7 @@ fn batchAwaitConcurrent(userdata: ?*anyopaque, b: *Io.Batch, timeout: Io.Timeout
break :allocation allocation;
};
@memcpy(slice[0..poll_buffer_len], storage.slice);
+ storage.slice = slice;
}
storage.slice[len] = .{
.fd = file.handle,
@@ -2783,9 +2783,7 @@ fn batchAwaitConcurrent(userdata: ?*anyopaque, b: *Io.Batch, timeout: Io.Timeout
}
const d = deadline orelse break :t -1;
const duration = d.durationFromNow(t_io);
- if (duration.raw.nanoseconds <= 0) return error.Timeout;
- const max_poll_ms = std.math.maxInt(i32);
- break :t @intCast(@min(max_poll_ms, duration.raw.toMilliseconds()));
+ break :t @min(@max(0, duration.raw.toMilliseconds()), std.math.maxInt(i32));
};
const syscall = try Syscall.start();
const rc = posix.system.poll(&poll_buffer, poll_storage.len, timeout_ms);
@@ -14420,7 +14418,10 @@ const WindowsEnvironStrings = struct {
PATHEXT: ?[:0]const u16 = null,
fn scan() WindowsEnvironStrings {
- const ptr = windows.peb().ProcessParameters.Environment;
+ const peb = windows.peb();
+ assert(windows.ntdll.RtlEnterCriticalSection(peb.FastPebLock) == .SUCCESS);
+ defer assert(windows.ntdll.RtlLeaveCriticalSection(peb.FastPebLock) == .SUCCESS);
+ const ptr = peb.ProcessParameters.Environment;
var result: WindowsEnvironStrings = .{};
var i: usize = 0;
@@ -14446,7 +14447,7 @@ const WindowsEnvironStrings = struct {
inline for (@typeInfo(WindowsEnvironStrings).@"struct".fields) |field| {
const field_name_w = comptime std.unicode.wtf8ToWtf16LeStringLiteral(field.name);
- if (std.os.windows.eqlIgnoreCaseWtf16(key_w, field_name_w)) @field(result, field.name) = value_w;
+ if (windows.eqlIgnoreCaseWtf16(key_w, field_name_w)) @field(result, field.name) = value_w;
}
}
@@ -14465,29 +14466,46 @@ fn scanEnviron(t: *Threaded) void {
// This value expires with any call that modifies the environment,
// which is outside of this Io implementation's control, so references
// must be short-lived.
- const ptr = windows.peb().ProcessParameters.Environment;
+ const peb = windows.peb();
+ assert(windows.ntdll.RtlEnterCriticalSection(peb.FastPebLock) == .SUCCESS);
+ defer assert(windows.ntdll.RtlLeaveCriticalSection(peb.FastPebLock) == .SUCCESS);
+ const ptr = peb.ProcessParameters.Environment;
var i: usize = 0;
while (ptr[i] != 0) {
- const key_start = i;
// There are some special environment variables that start with =,
// so we need a special case to not treat = as a key/value separator
// if it's the first character.
// https://devblogs.microsoft.com/oldnewthing/20100506-00/?p=14133
- if (ptr[key_start] == '=') i += 1;
-
+ const key_start = i;
+ if (ptr[i] == '=') i += 1;
while (ptr[i] != 0 and ptr[i] != '=') : (i += 1) {}
const key_w = ptr[key_start..i];
- if (std.mem.eql(u16, key_w, &.{ 'N', 'O', '_', 'C', 'O', 'L', 'O', 'R' })) {
+
+ const value_start = i + 1;
+ while (ptr[i] != 0) : (i += 1) {} // skip over '=' and value
+ const value_w = ptr[value_start..i];
+ i += 1; // skip over null byte
+
+ if (windows.eqlIgnoreCaseWtf16(key_w, &.{ 'N', 'O', '_', 'C', 'O', 'L', 'O', 'R' })) {
t.environ.exist.NO_COLOR = true;
- } else if (std.mem.eql(u16, key_w, &.{ 'C', 'L', 'I', 'C', 'O', 'L', 'O', 'R', '_', 'F', 'O', 'R', 'C', 'E' })) {
+ } else if (windows.eqlIgnoreCaseWtf16(key_w, &.{ 'C', 'L', 'I', 'C', 'O', 'L', 'O', 'R', '_', 'F', 'O', 'R', 'C', 'E' })) {
t.environ.exist.CLICOLOR_FORCE = true;
+ } else if (windows.eqlIgnoreCaseWtf16(key_w, &.{ 'Z', 'I', 'G', '_', 'P', 'R', 'O', 'G', 'R', 'E', 'S', 'S' })) {
+ t.environ.zig_progress_file = file: {
+ var value_buf: [std.fmt.count("{d}", .{std.math.maxInt(usize)})]u8 = undefined;
+ const len = std.unicode.calcWtf8Len(value_w);
+ if (len > value_buf.len) break :file error.UnrecognizedFormat;
+ assert(std.unicode.wtf16LeToWtf8(&value_buf, value_w) == len);
+ break :file .{
+ .handle = @ptrFromInt(std.fmt.parseInt(usize, value_buf[0..len], 10) catch
+ break :file error.UnrecognizedFormat),
+ .flags = .{ .nonblocking = true },
+ };
+ };
}
comptime assert(@sizeOf(Environ.String) == 0);
-
- while (ptr[i] != 0) : (i += 1) {} // skip over '=' and value
- i += 1; // skip over null byte
}
} else if (native_os == .wasi and !builtin.link_libc) {
var environ_count: usize = undefined;
@@ -14549,20 +14567,9 @@ fn scanEnviron(t: *Threaded) void {
t.environ.exist.CLICOLOR_FORCE = true;
} else if (std.mem.eql(u8, key, "ZIG_PROGRESS")) {
t.environ.zig_progress_file = file: {
- const int = std.fmt.parseInt(switch (@typeInfo(File.Handle)) {
- .int => |int_info| @Int(
- .unsigned,
- int_info.bits - @intFromBool(int_info.signedness == .signed),
- ),
- .pointer => usize,
- else => break :file error.UnsupportedOperation,
- }, value, 10) catch break :file error.UnrecognizedFormat;
break :file .{
- .handle = switch (@typeInfo(File.Handle)) {
- .int => int,
- .pointer => @ptrFromInt(int),
- else => comptime unreachable,
- },
+ .handle = std.fmt.parseInt(u31, value, 10) catch
+ break :file error.UnrecognizedFormat,
.flags = .{ .nonblocking = true },
};
};
@@ -14668,16 +14675,17 @@ fn spawnPosix(t: *Threaded, options: process.SpawnOptions) process.SpawnError!Sp
const any_ignore = (options.stdin == .ignore or options.stdout == .ignore or options.stderr == .ignore);
const dev_null_fd = if (any_ignore) try getDevNullFd(t) else undefined;
- const prog_pipe: [2]posix.fd_t = p: {
- if (options.progress_node.index == .none) {
- break :p .{ -1, -1 };
- } else {
- // We use CLOEXEC for the same reason as in `pipe_flags`.
- break :p try pipe2(.{ .NONBLOCK = true, .CLOEXEC = true });
- }
- };
+ const prog_pipe: [2]posix.fd_t = if (options.progress_node.index != .none)
+ // We use CLOEXEC for the same reason as in `pipe_flags`.
+ try pipe2(.{ .NONBLOCK = true, .CLOEXEC = true })
+ else
+ .{ -1, -1 };
errdefer destroyPipe(prog_pipe);
+ if (native_os == .linux and prog_pipe[0] != -1) {
+ _ = posix.system.fcntl(prog_pipe[0], posix.F.SETPIPE_SZ, @as(u32, std.Progress.max_packet_len * 2));
+ }
+
var arena_allocator = std.heap.ArenaAllocator.init(t.allocator);
defer arena_allocator.deinit();
const arena = arena_allocator.allocator();
@@ -14801,7 +14809,7 @@ fn spawnPosix(t: *Threaded, options: process.SpawnOptions) process.SpawnError!Sp
if (options.stderr == .pipe) posix.close(stderr_pipe[1]);
if (prog_pipe[1] != -1) posix.close(prog_pipe[1]);
- options.progress_node.setIpcFd(prog_pipe[0]);
+ options.progress_node.setIpcFile(t, .{ .handle = prog_pipe[0], .flags = .{ .nonblocking = true } });
return .{
.pid = pid,
@@ -15259,8 +15267,9 @@ fn processSpawnWindows(userdata: ?*anyopaque, options: process.SpawnOptions) pro
const prog_pipe = if (options.progress_node.index != .none) try t.windowsCreatePipe(.{
.server = .{ .attributes = .{ .INHERIT = false }, .mode = .{ .IO = .ASYNCHRONOUS } },
- .client = .{ .attributes = .{ .INHERIT = true }, .mode = .{ .IO = .SYNCHRONOUS_NONALERT } },
+ .client = .{ .attributes = .{ .INHERIT = true }, .mode = .{ .IO = .ASYNCHRONOUS } },
.inbound = true,
+ .quota = std.Progress.max_packet_len * 2,
}) else undefined;
errdefer if (options.progress_node.index != .none) for (prog_pipe) |handle| windows.CloseHandle(handle);
@@ -15476,7 +15485,7 @@ fn processSpawnWindows(userdata: ?*anyopaque, options: process.SpawnOptions) pro
if (options.progress_node.index != .none) {
windows.CloseHandle(prog_pipe[1]);
- options.progress_node.setIpcFd(prog_pipe[0]);
+ options.progress_node.setIpcFile(t, .{ .handle = prog_pipe[0], .flags = .{ .nonblocking = true } });
}
return .{
diff --git a/lib/std/Progress.zig b/lib/std/Progress.zig
@@ -11,7 +11,7 @@ const windows = std.os.windows;
const testing = std.testing;
const assert = std.debug.assert;
const posix = std.posix;
-const Writer = std.Io.Writer;
+const Writer = Io.Writer;
/// Currently this API only supports this value being set to stderr, which
/// happens automatically inside `start`.
@@ -21,13 +21,10 @@ io: Io,
terminal_mode: TerminalMode,
-update_worker: ?Io.Future(void),
+update_worker: ?Io.Future(WorkerError!void),
/// Atomically set by SIGWINCH as well as the root done() function.
redraw_event: Io.Event,
-/// Indicates a request to shut down and reset global state.
-/// Accessed atomically.
-done: bool,
need_clear: bool,
status: Status,
@@ -43,15 +40,19 @@ draw_buffer: []u8,
/// This is in a separate array from `node_storage` but with the same length so
/// that it can be iterated over efficiently without trashing too much of the
/// CPU cache.
-node_parents: []Node.Parent,
-node_storage: []Node.Storage,
-node_freelist_next: []Node.OptionalIndex,
+node_parents: [node_storage_buffer_len]Node.Parent,
+node_storage: [node_storage_buffer_len]Node.Storage,
+node_freelist_next: [node_storage_buffer_len]Node.OptionalIndex,
node_freelist: Freelist,
/// This is the number of elements in node arrays which have been used so far. Nodes before this
/// index are either active, or on the freelist. The remaining nodes are implicitly free. This
/// value may at times temporarily exceed the node count.
node_end_index: u32,
+ipc_next: Ipc.SlotAtomic,
+ipc: [ipc_storage_buffer_len]Ipc,
+ipc_files: [ipc_storage_buffer_len]Io.File,
+
start_failure: StartFailure,
pub const Status = enum {
@@ -77,6 +78,80 @@ const Freelist = packed struct(u32) {
generation: u24,
};
+pub const Ipc = packed struct(u32) {
+ /// mutex protecting `file` use, only locked by `serializeIpc`
+ locked: bool,
+ /// when unlocked: whether `file` is defined
+ /// when locked: whether `file` does not need to be closed
+ valid: bool,
+ unused: @Int(.unsigned, 32 - 2 - @bitSizeOf(Generation)) = 0,
+ generation: Generation,
+
+ pub const Slot = std.math.IntFittingRange(0, ipc_storage_buffer_len - 1);
+ pub const Generation = @Int(.unsigned, 32 - @bitSizeOf(Slot));
+
+ const SlotAtomic = @Int(.unsigned, std.math.ceilPowerOfTwoAssert(usize, @min(@bitSizeOf(Slot), 8)));
+
+ pub const Index = packed struct(u32) {
+ slot: Slot,
+ generation: Generation,
+ };
+
+ const Data = struct {
+ state: State,
+ bytes_read: u16,
+ main_index: u8,
+ start_index: u8,
+ nodes_len: u8,
+
+ const State = enum { unused, pending, ready };
+
+ /// No operations have been started on this file.
+ const unused: Data = .{
+ .state = .unused,
+ .bytes_read = 0,
+ .main_index = 0,
+ .start_index = 0,
+ .nodes_len = 0,
+ };
+
+ fn findLastPacket(data: *const Data, buffer: *const [max_packet_len]u8) struct { u16, u16 } {
+ assert(data.state == .ready);
+ var packet_start: u16 = 0;
+ var packet_end: u16 = 0;
+ const bytes_read = data.bytes_read;
+ while (bytes_read - packet_end >= 1) {
+ const nodes_len: u16 = buffer[packet_end];
+ const packet_len = 1 + nodes_len * (@sizeOf(Node.Storage) + @sizeOf(Node.Parent));
+ if (packet_end + packet_len > bytes_read) break;
+ packet_start = packet_end;
+ packet_end += packet_len;
+ }
+ return .{ packet_start, packet_end };
+ }
+
+ fn rebase(
+ data: *Data,
+ buffer: *[max_packet_len]u8,
+ vec: *[1][]u8,
+ batch: *std.Io.Batch,
+ slot: Slot,
+ packet_end: u16,
+ ) void {
+ assert(data.state == .ready);
+ const remaining = buffer[packet_end..data.bytes_read];
+ @memmove(buffer[0..remaining.len], remaining);
+ vec.* = .{buffer[remaining.len..]};
+ batch.addAt(slot, .{ .file_read_streaming = .{
+ .file = global_progress.ipc_files[slot],
+ .data = vec,
+ } });
+ data.state = .pending;
+ data.bytes_read = @intCast(remaining.len);
+ }
+ };
+};
+
pub const TerminalMode = union(enum) {
off,
ansi_escape_codes,
@@ -116,7 +191,7 @@ pub const Node = struct {
pub const none: Node = .{ .index = .none };
- pub const max_name_len = 40;
+ pub const max_name_len = 120;
const Storage = extern struct {
/// Little endian.
@@ -127,25 +202,16 @@ pub const Node = struct {
name: [max_name_len]u8 align(@alignOf(usize)),
/// Not thread-safe.
- fn getIpcFd(s: Storage) ?Io.File.Handle {
- return if (s.estimated_total_count == std.math.maxInt(u32)) switch (@typeInfo(Io.File.Handle)) {
- .int => @bitCast(s.completed_count),
- .pointer => @ptrFromInt(s.completed_count),
- else => @compileError("unsupported fd_t of " ++ @typeName(Io.File.Handle)),
- } else null;
+ fn getIpcIndex(s: Storage) ?Ipc.Index {
+ return if (s.estimated_total_count == std.math.maxInt(u32)) @bitCast(s.completed_count) else null;
}
/// Thread-safe.
- fn setIpcFd(s: *Storage, fd: Io.File.Handle) void {
- const integer: u32 = switch (@typeInfo(Io.File.Handle)) {
- .int => @bitCast(fd),
- .pointer => @intCast(@intFromPtr(fd)),
- else => @compileError("unsupported fd_t of " ++ @typeName(Io.File.Handle)),
- };
+ fn setIpcIndex(s: *Storage, ipc_index: Ipc.Index) void {
// `estimated_total_count` max int indicates the special state that
// causes `completed_count` to be treated as a file descriptor, so
// the order here matters.
- @atomicStore(u32, &s.completed_count, integer, .monotonic);
+ @atomicStore(u32, &s.completed_count, @bitCast(ipc_index), .monotonic);
@atomicStore(u32, &s.estimated_total_count, std.math.maxInt(u32), .release); // synchronizes with acquire in `serialize`
}
@@ -155,6 +221,14 @@ pub const Node = struct {
s.estimated_total_count = @byteSwap(s.estimated_total_count);
}
+ fn copyRoot(dest: *Node.Storage, src: *align(1) const Node.Storage) void {
+ dest.* = .{
+ .completed_count = src.completed_count,
+ .estimated_total_count = src.estimated_total_count,
+ .name = if (src.name[0] == 0) dest.name else src.name,
+ };
+ }
+
comptime {
assert((@sizeOf(Storage) % 4) == 0);
}
@@ -242,7 +316,7 @@ pub const Node = struct {
}
const free_index = @atomicRmw(u32, &global_progress.node_end_index, .Add, 1, .monotonic);
- if (free_index >= global_progress.node_storage.len) {
+ if (free_index >= node_storage_buffer_len) {
// Ran out of node storage memory. Progress for this node will not be tracked.
_ = @atomicRmw(u32, &global_progress.node_end_index, .Sub, 1, .monotonic);
return Node.none;
@@ -292,15 +366,17 @@ pub const Node = struct {
const index = n.index.unwrap() orelse return;
const storage = storageByIndex(index);
// Avoid u32 max int which is used to indicate a special state.
- const saturated = @min(std.math.maxInt(u32) - 1, count);
- @atomicStore(u32, &storage.estimated_total_count, saturated, .monotonic);
+ const saturated_total_count = @min(std.math.maxInt(u32) - 1, count);
+ @atomicStore(u32, &storage.estimated_total_count, saturated_total_count, .monotonic);
}
/// Thread-safe.
pub fn increaseEstimatedTotalItems(n: Node, count: usize) void {
const index = n.index.unwrap() orelse return;
const storage = storageByIndex(index);
- _ = @atomicRmw(u32, &storage.estimated_total_count, .Add, std.math.lossyCast(u32, count), .monotonic);
+ // Avoid u32 max int which is used to indicate a special state.
+ const saturated_total_count = @min(std.math.maxInt(u32) - 1, count);
+ _ = @atomicRmw(u32, &storage.estimated_total_count, .Add, saturated_total_count, .monotonic);
}
/// Finish a started `Node`. Thread-safe.
@@ -310,11 +386,25 @@ pub const Node = struct {
return;
}
const index = n.index.unwrap() orelse return;
+ const io = global_progress.io;
const parent_ptr = parentByIndex(index);
if (@atomicLoad(Node.Parent, parent_ptr, .monotonic).unwrap()) |parent_index| {
_ = @atomicRmw(u32, &storageByIndex(parent_index).completed_count, .Add, 1, .monotonic);
@atomicStore(Node.Parent, parent_ptr, .unused, .monotonic);
+ if (storageByIndex(index).getIpcIndex()) |ipc_index| {
+ const file = global_progress.ipc_files[ipc_index.slot];
+ const ipc = @atomicRmw(
+ Ipc,
+ &global_progress.ipc[ipc_index.slot],
+ .And,
+ .{ .locked = true, .valid = false, .generation = std.math.maxInt(Ipc.Generation) },
+ .release,
+ );
+ assert(ipc.valid and ipc.generation == ipc_index.generation);
+ if (!ipc.locked) file.close(io);
+ }
+
const freelist = &global_progress.node_freelist;
var old_freelist = @atomicLoad(Freelist, freelist, .monotonic);
while (true) {
@@ -332,42 +422,52 @@ pub const Node = struct {
};
}
} else {
- @atomicStore(bool, &global_progress.done, true, .monotonic);
- const io = global_progress.io;
- global_progress.redraw_event.set(io);
- if (global_progress.update_worker) |*worker| worker.await(io);
+ if (global_progress.update_worker) |*worker| worker.cancel(io) catch {};
+ for (&global_progress.ipc, &global_progress.ipc_files) |ipc, ipc_file| {
+ assert(!ipc.locked or !ipc.valid); // missing call to end()
+ if (ipc.locked or ipc.valid) ipc_file.close(io);
+ }
}
}
- /// Posix-only. Used by `std.process.Child`. Thread-safe.
- pub fn setIpcFd(node: Node, fd: Io.File.Handle) void {
+ /// Used by `std.process.Child`. Thread-safe.
+ pub fn setIpcFile(node: Node, expected_io_userdata: ?*anyopaque, file: Io.File) void {
const index = node.index.unwrap() orelse return;
- switch (@typeInfo(Io.File.Handle)) {
- .int => {
- assert(fd >= 0);
- assert(fd != posix.STDOUT_FILENO);
- assert(fd != posix.STDIN_FILENO);
- assert(fd != posix.STDERR_FILENO);
- },
- .pointer => {
- assert(fd != windows.INVALID_HANDLE_VALUE);
- },
- else => @compileError("unsupported fd_t of " ++ @typeName(Io.File.Handle)),
- }
- storageByIndex(index).setIpcFd(fd);
+ const io = global_progress.io;
+ assert(io.userdata == expected_io_userdata);
+ for (0..ipc_storage_buffer_len) |_| {
+ const slot: Ipc.Slot = @truncate(
+ @atomicRmw(Ipc.SlotAtomic, &global_progress.ipc_next, .Add, 1, .monotonic),
+ );
+ if (slot >= ipc_storage_buffer_len) continue;
+ const ipc_ptr = &global_progress.ipc[slot];
+ const ipc = @atomicLoad(Ipc, ipc_ptr, .monotonic);
+ if (ipc.locked or ipc.valid) continue;
+ const generation = ipc.generation +% 1;
+ if (@cmpxchgWeak(
+ Ipc,
+ ipc_ptr,
+ ipc,
+ .{ .locked = false, .valid = true, .generation = generation },
+ .acquire,
+ .monotonic,
+ )) |_| continue;
+ global_progress.ipc_files[slot] = file;
+ storageByIndex(index).setIpcIndex(.{ .slot = slot, .generation = generation });
+ break;
+ } else file.close(io);
}
- /// Posix-only. Thread-safe. Assumes the node is storing an IPC file
- /// descriptor.
- pub fn getIpcFd(node: Node) ?Io.File.Handle {
- const index = node.index.unwrap() orelse return null;
- const storage = storageByIndex(index);
- const int = @atomicLoad(u32, &storage.completed_count, .monotonic);
- return switch (@typeInfo(Io.File.Handle)) {
- .int => @bitCast(int),
- .pointer => @ptrFromInt(int),
- else => @compileError("unsupported fd_t of " ++ @typeName(Io.File.Handle)),
- };
+ pub fn setIpcIndex(node: Node, ipc_index: Ipc.Index) void {
+ storageByIndex(node.index.unwrap() orelse return).setIpcIndex(ipc_index);
+ }
+
+ /// Not thread-safe.
+ pub fn takeIpcIndex(node: Node) ?Ipc.Index {
+ const storage = storageByIndex(node.index.unwrap() orelse return null);
+ assert(storage.estimated_total_count == std.math.maxInt(u32));
+ @atomicStore(u32, &storage.estimated_total_count, 0, .monotonic);
+ return @bitCast(storage.completed_count);
}
fn storageByIndex(index: Node.Index) *Node.Storage {
@@ -387,7 +487,9 @@ pub const Node = struct {
const storage = storageByIndex(free_index);
@atomicStore(u32, &storage.completed_count, 0, .monotonic);
- @atomicStore(u32, &storage.estimated_total_count, std.math.lossyCast(u32, estimated_total_items), .monotonic);
+ // Avoid u32 max int which is used to indicate a special state.
+ const saturated_total_count = @min(std.math.maxInt(u32) - 1, estimated_total_items);
+ @atomicStore(u32, &storage.estimated_total_count, saturated_total_count, .monotonic);
const name_len = @min(max_name_len, name.len);
copyAtomicStore(storage.name[0..name_len], name[0..name_len]);
if (name_len < storage.name.len)
@@ -414,16 +516,20 @@ var global_progress: Progress = .{
.rows = 0,
.cols = 0,
.draw_buffer = undefined,
- .done = false,
.need_clear = false,
.status = .working,
- .start_failure = .unstarted,
- .node_parents = &node_parents_buffer,
- .node_storage = &node_storage_buffer,
- .node_freelist_next = &node_freelist_next_buffer,
+ .node_parents = undefined,
+ .node_storage = undefined,
+ .node_freelist_next = undefined,
.node_freelist = .{ .head = .none, .generation = 0 },
.node_end_index = 0,
+
+ .ipc_next = 0,
+ .ipc = undefined,
+ .ipc_files = undefined,
+
+ .start_failure = .unstarted,
};
pub const StartFailure = union(enum) {
@@ -433,17 +539,23 @@ pub const StartFailure = union(enum) {
parent_ipc: error{ UnsupportedOperation, UnrecognizedFormat },
};
-const node_storage_buffer_len = 83;
-var node_parents_buffer: [node_storage_buffer_len]Node.Parent = undefined;
-var node_storage_buffer: [node_storage_buffer_len]Node.Storage = undefined;
-var node_freelist_next_buffer: [node_storage_buffer_len]Node.OptionalIndex = undefined;
+/// One less than a power of two ensures `max_packet_len` is already a power of two.
+const node_storage_buffer_len = ipc_storage_buffer_len - 1;
+
+/// Power of two to avoid wasted `ipc_next` increments.
+const ipc_storage_buffer_len = 128;
+
+pub const max_packet_len = std.math.ceilPowerOfTwoAssert(
+ usize,
+ 1 + node_storage_buffer_len * (@sizeOf(Node.Storage) + @sizeOf(Node.OptionalIndex)),
+);
var default_draw_buffer: [4096]u8 = undefined;
var debug_start_trace = std.debug.Trace.init;
pub const have_ipc = switch (builtin.os.tag) {
- .wasi, .freestanding, .windows => false,
+ .wasi, .freestanding => false,
else => true,
};
@@ -475,9 +587,9 @@ pub fn start(io: Io, options: Options) Node {
}
debug_start_trace.add("first initialized here");
- @memset(global_progress.node_parents, .unused);
+ @memset(&global_progress.node_parents, .unused);
+ @memset(&global_progress.ipc, .{ .locked = false, .valid = false, .generation = 0 });
const root_node = Node.init(@enumFromInt(0), .none, options.root_name, options.estimated_total_items);
- global_progress.done = false;
global_progress.node_end_index = 1;
assert(options.draw_buffer.len >= 200);
@@ -551,58 +663,55 @@ pub fn setStatus(new_status: Status) void {
}
/// Returns whether a resize is needed to learn the terminal size.
-fn wait(io: Io, timeout_ns: u64) bool {
+fn wait(io: Io, timeout_ns: u64) Io.Cancelable!bool {
const timeout: Io.Timeout = .{ .duration = .{
.clock = .awake,
.raw = .fromNanoseconds(timeout_ns),
} };
const resize_flag = if (global_progress.redraw_event.waitTimeout(io, timeout)) |_| true else |err| switch (err) {
- error.Timeout, error.Canceled => false,
+ error.Timeout => false,
+ error.Canceled => |e| return e,
};
global_progress.redraw_event.reset();
return resize_flag or (global_progress.cols == 0);
}
-fn updateTask(io: Io) void {
+const WorkerError = error{WindowTooSmall} || Io.ConcurrentError || Io.Cancelable ||
+ Io.File.Writer.Error || Io.Operation.FileReadStreaming.Error;
+
+fn updateTask(io: Io) WorkerError!void {
// Store this data in the thread so that it does not need to be part of the
// linker data of the main executable.
var serialized_buffer: Serialized.Buffer = undefined;
+ serialized_buffer.init();
+ defer serialized_buffer.batch.cancel(io);
// In this function we bypass the wrapper code inside `Io.lockStderr` /
// `Io.tryLockStderr` in order to avoid clearing the terminal twice.
// We still want to go through the `Io` instance however in case it uses a
// task-switching mutex.
- {
- const resize_flag = wait(io, global_progress.initial_delay_ns);
- if (@atomicLoad(bool, &global_progress.done, .monotonic)) return;
- maybeUpdateSize(io, resize_flag) catch return;
-
- const buffer, _ = computeRedraw(&serialized_buffer);
- if (io.vtable.tryLockStderr(io.userdata, null) catch return) |locked_stderr| {
- defer io.unlockStderr();
- global_progress.need_clear = true;
- locked_stderr.file_writer.interface.writeAll(buffer) catch return;
- }
+ try maybeUpdateSize(io, try wait(io, global_progress.initial_delay_ns));
+ errdefer {
+ const cancel_protection = io.swapCancelProtection(.blocked);
+ defer _ = io.swapCancelProtection(cancel_protection);
+ const stderr = io.vtable.lockStderr(io.userdata, null) catch |err| switch (err) {
+ error.Canceled => unreachable, // blocked
+ };
+ defer io.unlockStderr();
+ clearWrittenWithEscapeCodes(stderr.file_writer) catch {};
}
-
while (true) {
- const resize_flag = wait(io, global_progress.refresh_rate_ns);
-
- if (@atomicLoad(bool, &global_progress.done, .monotonic)) {
- const stderr = io.vtable.lockStderr(io.userdata, null) catch return;
- defer io.unlockStderr();
- return clearWrittenWithEscapeCodes(stderr.file_writer) catch {};
- }
-
- maybeUpdateSize(io, resize_flag) catch return;
-
- const buffer, _ = computeRedraw(&serialized_buffer);
- if (io.vtable.tryLockStderr(io.userdata, null) catch return) |locked_stderr| {
+ const buffer, _ = try computeRedraw(io, &serialized_buffer);
+ if (try io.vtable.tryLockStderr(io.userdata, null)) |locked_stderr| {
defer io.unlockStderr();
global_progress.need_clear = true;
- locked_stderr.file_writer.interface.writeAll(buffer) catch return;
+ locked_stderr.file_writer.interface.writeAll(buffer) catch |err| switch (err) {
+ error.WriteFailed => return locked_stderr.file_writer.err.?,
+ };
}
+
+ try maybeUpdateSize(io, try wait(io, global_progress.refresh_rate_ns));
}
}
@@ -614,79 +723,60 @@ fn windowsApiWriteMarker() void {
_ = windows.kernel32.WriteConsoleW(handle, &[_]u16{windows_api_start_marker}, 1, &num_chars_written, null);
}
-fn windowsApiUpdateTask(io: Io) void {
+fn windowsApiUpdateTask(io: Io) WorkerError!void {
+ // Store this data in the thread so that it does not need to be part of the
+ // linker data of the main executable.
var serialized_buffer: Serialized.Buffer = undefined;
+ serialized_buffer.init();
+ defer serialized_buffer.batch.cancel(io);
// In this function we bypass the wrapper code inside `Io.lockStderr` /
// `Io.tryLockStderr` in order to avoid clearing the terminal twice.
// We still want to go through the `Io` instance however in case it uses a
// task-switching mutex.
- {
- const resize_flag = wait(io, global_progress.initial_delay_ns);
- if (@atomicLoad(bool, &global_progress.done, .monotonic)) return;
- maybeUpdateSize(io, resize_flag) catch return;
-
- const buffer, const nl_n = computeRedraw(&serialized_buffer);
- if (io.vtable.tryLockStderr(io.userdata, null) catch return) |locked_stderr| {
- defer io.unlockStderr();
- windowsApiWriteMarker();
- global_progress.need_clear = true;
- locked_stderr.file_writer.interface.writeAll(buffer) catch return;
- windowsApiMoveToMarker(nl_n) catch return;
- }
+ try maybeUpdateSize(io, try wait(io, global_progress.initial_delay_ns));
+ errdefer {
+ const cancel_protection = io.swapCancelProtection(.blocked);
+ defer _ = io.swapCancelProtection(cancel_protection);
+ _ = io.vtable.lockStderr(io.userdata, null) catch |err| switch (err) {
+ error.Canceled => unreachable, // blocked
+ };
+ defer io.unlockStderr();
+ clearWrittenWindowsApi() catch {};
}
-
while (true) {
- const resize_flag = wait(io, global_progress.refresh_rate_ns);
-
- if (@atomicLoad(bool, &global_progress.done, .monotonic)) {
- _ = io.vtable.lockStderr(io.userdata, null) catch return;
- defer io.unlockStderr();
- return clearWrittenWindowsApi() catch {};
- }
-
- maybeUpdateSize(io, resize_flag) catch return;
-
- const buffer, const nl_n = computeRedraw(&serialized_buffer);
+ const buffer, const nl_n = try computeRedraw(io, &serialized_buffer);
if (io.vtable.tryLockStderr(io.userdata, null) catch return) |locked_stderr| {
defer io.unlockStderr();
- clearWrittenWindowsApi() catch return;
+ try clearWrittenWindowsApi();
windowsApiWriteMarker();
global_progress.need_clear = true;
- locked_stderr.file_writer.interface.writeAll(buffer) catch return;
+ locked_stderr.file_writer.interface.writeAll(buffer) catch |err| switch (err) {
+ error.WriteFailed => return locked_stderr.file_writer.err.?,
+ };
windowsApiMoveToMarker(nl_n) catch return;
}
+
+ try maybeUpdateSize(io, try wait(io, global_progress.refresh_rate_ns));
}
}
-fn ipcThreadRun(io: Io, file: Io.File) void {
+fn ipcThreadRun(io: Io, file: Io.File) WorkerError!void {
// Store this data in the thread so that it does not need to be part of the
// linker data of the main executable.
var serialized_buffer: Serialized.Buffer = undefined;
+ serialized_buffer.init();
+ defer serialized_buffer.batch.cancel(io);
+ var fw = file.writerStreaming(io, &.{});
- {
- _ = wait(io, global_progress.initial_delay_ns);
-
- if (@atomicLoad(bool, &global_progress.done, .monotonic))
- return;
-
- const serialized = serialize(&serialized_buffer);
- writeIpc(io, file, serialized) catch |err| switch (err) {
- error.BrokenPipe => return,
- };
- }
-
+ _ = try io.sleep(.fromNanoseconds(global_progress.initial_delay_ns), .awake);
while (true) {
- _ = wait(io, global_progress.refresh_rate_ns);
-
- if (@atomicLoad(bool, &global_progress.done, .monotonic))
- return;
-
- const serialized = serialize(&serialized_buffer);
- writeIpc(io, file, serialized) catch |err| switch (err) {
- error.BrokenPipe => return,
+ writeIpc(&fw.interface, try serialize(io, &serialized_buffer)) catch |err| switch (err) {
+ error.WriteFailed => return fw.err.?,
};
+
+ _ = try io.sleep(.fromNanoseconds(global_progress.refresh_rate_ns), .awake);
}
}
@@ -865,31 +955,49 @@ const Serialized = struct {
const Buffer = struct {
parents: [node_storage_buffer_len]Node.Parent,
storage: [node_storage_buffer_len]Node.Storage,
- map: [node_storage_buffer_len]Node.OptionalIndex,
- parents_copy: [node_storage_buffer_len]Node.Parent,
- storage_copy: [node_storage_buffer_len]Node.Storage,
- ipc_metadata_fds_copy: [node_storage_buffer_len]Fd,
- ipc_metadata_copy: [node_storage_buffer_len]SavedMetadata,
-
- ipc_metadata_fds: [node_storage_buffer_len]Fd,
- ipc_metadata: [node_storage_buffer_len]SavedMetadata,
+ ipc_start: u8,
+ ipc_end: u8,
+ ipc_data: [ipc_storage_buffer_len]Ipc.Data,
+ ipc_buffers: [ipc_storage_buffer_len][max_packet_len]u8,
+ ipc_vecs: [ipc_storage_buffer_len][1][]u8,
+ batch_storage: [ipc_storage_buffer_len]Io.Operation.Storage,
+ batch: Io.Batch,
+
+ fn init(buffer: *Buffer) void {
+ buffer.ipc_start = 0;
+ buffer.ipc_end = 0;
+ @memset(&buffer.ipc_data, .unused);
+ buffer.batch = .init(&buffer.batch_storage);
+ }
};
};
-fn serialize(serialized_buffer: *Serialized.Buffer) Serialized {
- var serialized_len: usize = 0;
- var any_ipc = false;
+fn serialize(io: Io, serialized_buffer: *Serialized.Buffer) !Serialized {
+ var prev_parents: [node_storage_buffer_len]Node.Parent = undefined;
+ var prev_storage: [node_storage_buffer_len]Node.Storage = undefined;
+ {
+ const ipc_start = serialized_buffer.ipc_start;
+ const ipc_end = serialized_buffer.ipc_end;
+ @memcpy(prev_parents[ipc_start..ipc_end], serialized_buffer.parents[ipc_start..ipc_end]);
+ @memcpy(prev_storage[ipc_start..ipc_end], serialized_buffer.storage[ipc_start..ipc_end]);
+ }
// Iterate all of the nodes and construct a serializable copy of the state that can be examined
// without atomics. The `@min` call is here because `node_end_index` might briefly exceed the
// node count sometimes.
- const end_index = @min(@atomicLoad(u32, &global_progress.node_end_index, .monotonic), global_progress.node_storage.len);
+ const end_index = @min(
+ @atomicLoad(u32, &global_progress.node_end_index, .monotonic),
+ node_storage_buffer_len,
+ );
+ var map: [node_storage_buffer_len]Node.OptionalIndex = undefined;
+ var serialized_len: u8 = 0;
+ var maybe_ipc_start: ?u8 = null;
for (
global_progress.node_parents[0..end_index],
global_progress.node_storage[0..end_index],
- serialized_buffer.map[0..end_index],
- ) |*parent_ptr, *storage_ptr, *map| {
+ map[0..end_index],
+ ) |*parent_ptr, *storage_ptr, *map_entry| {
const parent = @atomicLoad(Node.Parent, parent_ptr, .monotonic);
if (parent == .unused) {
// We might read "mixed" node data in this loop, due to weird atomic things
@@ -903,17 +1011,17 @@ fn serialize(serialized_buffer: *Serialized.Buffer) Serialized {
// parent, it will just not be printed at all. The general idea here is that performance
// is more important than 100% correct output every frame, given that this API is likely
// to be used in hot paths!
- map.* = .none;
+ map_entry.* = .none;
continue;
}
const dest_storage = &serialized_buffer.storage[serialized_len];
copyAtomicLoad(&dest_storage.name, &storage_ptr.name);
- dest_storage.estimated_total_count = @atomicLoad(u32, &storage_ptr.estimated_total_count, .acquire); // sychronizes with release in `setIpcFd`
+ dest_storage.estimated_total_count = @atomicLoad(u32, &storage_ptr.estimated_total_count, .acquire); // sychronizes with release in `setIpcIndex`
dest_storage.completed_count = @atomicLoad(u32, &storage_ptr.completed_count, .monotonic);
- any_ipc = any_ipc or (dest_storage.getIpcFd() != null);
serialized_buffer.parents[serialized_len] = parent;
- map.* = @enumFromInt(serialized_len);
+ map_entry.* = @enumFromInt(serialized_len);
+ if (maybe_ipc_start == null and dest_storage.getIpcIndex() != null) maybe_ipc_start = serialized_len;
serialized_len += 1;
}
@@ -922,266 +1030,212 @@ fn serialize(serialized_buffer: *Serialized.Buffer) Serialized {
parent.* = switch (parent.*) {
.unused => unreachable,
.none => .none,
- _ => |p| serialized_buffer.map[@intFromEnum(p)].toParent(),
+ _ => |p| map[@intFromEnum(p)].toParent(),
};
}
- // Find nodes which correspond to child processes.
- if (any_ipc)
- serialized_len = serializeIpc(serialized_len, serialized_buffer);
-
- return .{
- .parents = serialized_buffer.parents[0..serialized_len],
- .storage = serialized_buffer.storage[0..serialized_len],
+ // Fill pipe buffers.
+ const batch = &serialized_buffer.batch;
+ batch.awaitConcurrent(io, .{
+ .duration = .{ .raw = .zero, .clock = .awake },
+ }) catch |err| switch (err) {
+ error.Timeout => {},
+ else => |e| return e,
+ };
+ var ready_len: u8 = 0;
+ while (batch.next()) |operation| switch (operation.index) {
+ 0...ipc_storage_buffer_len - 1 => {
+ const ipc_data = &serialized_buffer.ipc_data[operation.index];
+ ipc_data.bytes_read += @intCast(
+ operation.result.file_read_streaming catch |err| switch (err) {
+ error.EndOfStream => {
+ const file = global_progress.ipc_files[operation.index];
+ const ipc = @atomicRmw(
+ Ipc,
+ &global_progress.ipc[operation.index],
+ .And,
+ .{
+ .locked = false,
+ .valid = true,
+ .generation = std.math.maxInt(Ipc.Generation),
+ },
+ .release,
+ );
+ assert(ipc.locked);
+ if (!ipc.valid) file.close(io);
+ ipc_data.* = .unused;
+ continue;
+ },
+ else => |e| return e,
+ },
+ );
+ assert(ipc_data.state == .pending);
+ ipc_data.state = .ready;
+ ready_len += 1;
+ },
+ else => unreachable,
};
-}
-
-const SavedMetadata = struct {
- remaining_read_trash_bytes: u16,
- main_index: u8,
- start_index: u8,
- nodes_len: u8,
-};
-
-const Fd = enum(i32) {
- _,
-
- fn init(fd: Io.File.Handle) Fd {
- return @enumFromInt(if (is_windows) @as(isize, @bitCast(@intFromPtr(fd))) else fd);
- }
-
- fn get(fd: Fd) Io.File.Handle {
- return if (is_windows)
- @ptrFromInt(@as(usize, @bitCast(@as(isize, @intFromEnum(fd)))))
- else
- @intFromEnum(fd);
- }
-};
-
-var ipc_metadata_len: u8 = 0;
-
-fn serializeIpc(start_serialized_len: usize, serialized_buffer: *Serialized.Buffer) usize {
- const io = global_progress.io;
- const ipc_metadata_fds_copy = &serialized_buffer.ipc_metadata_fds_copy;
- const ipc_metadata_copy = &serialized_buffer.ipc_metadata_copy;
- const ipc_metadata_fds = &serialized_buffer.ipc_metadata_fds;
- const ipc_metadata = &serialized_buffer.ipc_metadata;
-
- var serialized_len = start_serialized_len;
- var pipe_buf: [2 * 4096]u8 = undefined;
-
- const old_ipc_metadata_fds = ipc_metadata_fds_copy[0..ipc_metadata_len];
- const old_ipc_metadata = ipc_metadata_copy[0..ipc_metadata_len];
- ipc_metadata_len = 0;
- main_loop: for (
- serialized_buffer.parents[0..serialized_len],
- serialized_buffer.storage[0..serialized_len],
- 0..,
+ // Find nodes which correspond to child processes.
+ const ipc_start = maybe_ipc_start orelse serialized_len;
+ serialized_buffer.ipc_start = ipc_start;
+ for (
+ serialized_buffer.parents[ipc_start..serialized_len],
+ serialized_buffer.storage[ipc_start..serialized_len],
+ ipc_start..,
) |main_parent, *main_storage, main_index| {
if (main_parent == .unused) continue;
- const file: Io.File = .{
- .handle = main_storage.getIpcFd() orelse continue,
- .flags = .{ .nonblocking = true },
- };
- const opt_saved_metadata = findOld(file.handle, old_ipc_metadata_fds, old_ipc_metadata);
- var bytes_read: usize = 0;
- while (true) {
- const n = file.readStreaming(io, &.{pipe_buf[bytes_read..]}) catch |err| switch (err) {
- error.WouldBlock, error.EndOfStream => break,
- else => |e| {
- std.log.debug("failed to read child progress data: {t}", .{e});
- main_storage.completed_count = 0;
- main_storage.estimated_total_count = 0;
- continue :main_loop;
- },
- };
- if (opt_saved_metadata) |m| {
- if (m.remaining_read_trash_bytes > 0) {
- assert(bytes_read == 0);
- if (m.remaining_read_trash_bytes >= n) {
- m.remaining_read_trash_bytes = @intCast(m.remaining_read_trash_bytes - n);
- continue;
- }
- const src = pipe_buf[m.remaining_read_trash_bytes..n];
- @memmove(pipe_buf[0..src.len], src);
- m.remaining_read_trash_bytes = 0;
- bytes_read = src.len;
- continue;
- }
- }
- bytes_read += n;
- }
- // Ignore all but the last message on the pipe.
- var input: []u8 = pipe_buf[0..bytes_read];
- if (input.len == 0) {
- serialized_len = useSavedIpcData(serialized_len, serialized_buffer, main_storage, main_index, opt_saved_metadata, 0, file.handle);
- continue;
- }
-
- const storage, const parents = while (true) {
- const subtree_len: usize = input[0];
- const expected_bytes = 1 + subtree_len * (@sizeOf(Node.Storage) + @sizeOf(Node.Parent));
- if (input.len < expected_bytes) {
- // Ignore short reads. We'll handle the next full message when it comes instead.
- const remaining_read_trash_bytes: u16 = @intCast(expected_bytes - input.len);
- serialized_len = useSavedIpcData(serialized_len, serialized_buffer, main_storage, main_index, opt_saved_metadata, remaining_read_trash_bytes, file.handle);
- continue :main_loop;
- }
- if (input.len > expected_bytes) {
- input = input[expected_bytes..];
- continue;
- }
- const storage_bytes = input[1..][0 .. subtree_len * @sizeOf(Node.Storage)];
- const parents_bytes = input[1 + storage_bytes.len ..][0 .. subtree_len * @sizeOf(Node.Parent)];
- break .{
- std.mem.bytesAsSlice(Node.Storage, storage_bytes),
- std.mem.bytesAsSlice(Node.Parent, parents_bytes),
- };
- };
-
- const nodes_len: u8 = @intCast(@min(parents.len - 1, serialized_buffer.storage.len - serialized_len));
+ const ipc_index = main_storage.getIpcIndex() orelse continue;
+ const ipc = &global_progress.ipc[ipc_index.slot];
+ const ipc_data = &serialized_buffer.ipc_data[ipc_index.slot];
+ state: switch (ipc_data.state) {
+ .unused => {
+ if (@cmpxchgWeak(
+ Ipc,
+ ipc,
+ .{ .locked = false, .valid = true, .generation = ipc_index.generation },
+ .{ .locked = true, .valid = true, .generation = ipc_index.generation },
+ .acquire,
+ .monotonic,
+ )) |_| continue;
+
+ const ipc_vec = &serialized_buffer.ipc_vecs[ipc_index.slot];
+ ipc_vec.* = .{&serialized_buffer.ipc_buffers[ipc_index.slot]};
+ batch.addAt(ipc_index.slot, .{ .file_read_streaming = .{
+ .file = global_progress.ipc_files[ipc_index.slot],
+ .data = ipc_vec,
+ } });
+
+ ipc_data.* = .{
+ .state = .pending,
+ .bytes_read = 0,
+ .main_index = @intCast(main_index),
+ .start_index = serialized_len,
+ .nodes_len = 0,
+ };
+ main_storage.completed_count = 0;
+ main_storage.estimated_total_count = 0;
+ },
+ .pending => {
+ const start_index = ipc_data.start_index;
+ const nodes_len = @min(ipc_data.nodes_len, node_storage_buffer_len - serialized_len);
+
+ main_storage.copyRoot(&prev_storage[ipc_data.main_index]);
+ @memcpy(
+ serialized_buffer.storage[serialized_len..][0..nodes_len],
+ prev_storage[start_index..][0..nodes_len],
+ );
+ for (
+ serialized_buffer.parents[serialized_len..][0..nodes_len],
+ prev_parents[serialized_len..][0..nodes_len],
+ ) |*parent, prev_parent| parent.* = switch (prev_parent) {
+ .none, .unused => .none,
+ _ => if (@intFromEnum(prev_parent) == ipc_data.main_index)
+ @enumFromInt(main_index)
+ else if (@intFromEnum(prev_parent) >= start_index and
+ @intFromEnum(prev_parent) < start_index + nodes_len)
+ @enumFromInt(@intFromEnum(prev_parent) - start_index + serialized_len)
+ else
+ .none,
+ };
- // Remember in case the pipe is empty on next update.
- ipc_metadata_fds[ipc_metadata_len] = Fd.init(file.handle);
- ipc_metadata[ipc_metadata_len] = .{
- .remaining_read_trash_bytes = 0,
- .start_index = @intCast(serialized_len),
- .nodes_len = nodes_len,
- .main_index = @intCast(main_index),
- };
- ipc_metadata_len += 1;
-
- // Mount the root here.
- copyRoot(main_storage, &storage[0]);
- if (is_big_endian) main_storage.byteSwap();
-
- // Copy the rest of the tree to the end.
- const storage_dest = serialized_buffer.storage[serialized_len..][0..nodes_len];
- @memcpy(storage_dest, storage[1..][0..nodes_len]);
-
- // Always little-endian over the pipe.
- if (is_big_endian) for (storage_dest) |*s| s.byteSwap();
-
- // Patch up parent pointers taking into account how the subtree is mounted.
- for (serialized_buffer.parents[serialized_len..][0..nodes_len], parents[1..][0..nodes_len]) |*dest, p| {
- dest.* = switch (p) {
- // Fix bad data so the rest of the code does not see `unused`.
- .none, .unused => .none,
- // Root node is being mounted here.
- @as(Node.Parent, @enumFromInt(0)) => @enumFromInt(main_index),
- // Other nodes mounted at the end.
- // Don't trust child data; if the data is outside the expected range, ignore the data.
- // This also handles the case when data was truncated.
- _ => |off| if (@intFromEnum(off) > nodes_len)
- .none
- else
- @enumFromInt(serialized_len + @intFromEnum(off) - 1),
- };
+ ipc_data.main_index = @intCast(main_index);
+ ipc_data.start_index = serialized_len;
+ ipc_data.nodes_len = nodes_len;
+ serialized_len += nodes_len;
+ },
+ .ready => {
+ const ipc_buffer = &serialized_buffer.ipc_buffers[ipc_index.slot];
+ const packet_start, const packet_end = ipc_data.findLastPacket(ipc_buffer);
+ const packet_is_empty = packet_end - packet_start <= 1;
+ if (!packet_is_empty) {
+ const storage, const parents, const nodes_len = packet_contents: {
+ var packet_index: usize = packet_start;
+ const nodes_len: u16 = ipc_buffer[packet_index];
+ packet_index += 1;
+ const storage_bytes =
+ ipc_buffer[packet_index..][0 .. nodes_len * @sizeOf(Node.Storage)];
+ packet_index += storage_bytes.len;
+ const parents_bytes =
+ ipc_buffer[packet_index..][0 .. nodes_len * @sizeOf(Node.Parent)];
+ packet_index += parents_bytes.len;
+ assert(packet_index == packet_end);
+ const storage: []align(1) const Node.Storage = @ptrCast(storage_bytes);
+ const parents: []align(1) const Node.Parent = @ptrCast(parents_bytes);
+ const children_nodes_len =
+ @min(nodes_len - 1, node_storage_buffer_len - serialized_len);
+ break :packet_contents .{ storage, parents, children_nodes_len };
+ };
+
+ // Mount the root here.
+ main_storage.copyRoot(&storage[0]);
+ if (is_big_endian) main_storage.byteSwap();
+
+ // Copy the rest of the tree to the end.
+ const serialized_storage =
+ serialized_buffer.storage[serialized_len..][0..nodes_len];
+ @memcpy(serialized_storage, storage[1..][0..nodes_len]);
+ if (is_big_endian) for (serialized_storage) |*s| s.byteSwap();
+
+ // Patch up parent pointers taking into account how the subtree is mounted.
+ for (
+ serialized_buffer.parents[serialized_len..][0..nodes_len],
+ parents[1..][0..nodes_len],
+ ) |*parent, prev_parent| parent.* = switch (prev_parent) {
+ // Fix bad data so the rest of the code does not see `unused`.
+ .none, .unused => .none,
+ // Root node is being mounted here.
+ @as(Node.Parent, @enumFromInt(0)) => @enumFromInt(main_index),
+ // Other nodes mounted at the end.
+ // Don't trust child data; if the data is outside the expected range,
+ // ignore the data. This also handles the case when data was truncated.
+ _ => if (@intFromEnum(prev_parent) <= nodes_len)
+ @enumFromInt(@intFromEnum(prev_parent) - 1 + serialized_len)
+ else
+ .none,
+ };
+
+ ipc_data.main_index = @intCast(main_index);
+ ipc_data.start_index = serialized_len;
+ ipc_data.nodes_len = nodes_len;
+ serialized_len += nodes_len;
+ }
+ const ipc_vec = &serialized_buffer.ipc_vecs[ipc_index.slot];
+ ipc_data.rebase(ipc_buffer, ipc_vec, batch, ipc_index.slot, packet_end);
+ ready_len -= 1;
+ if (packet_is_empty) continue :state .pending;
+ },
}
-
- serialized_len += nodes_len;
- }
-
- // Save a copy in case any pipes are empty on the next update.
- @memcpy(serialized_buffer.parents_copy[0..serialized_len], serialized_buffer.parents[0..serialized_len]);
- @memcpy(serialized_buffer.storage_copy[0..serialized_len], serialized_buffer.storage[0..serialized_len]);
- @memcpy(ipc_metadata_fds_copy[0..ipc_metadata_len], ipc_metadata_fds[0..ipc_metadata_len]);
- @memcpy(ipc_metadata_copy[0..ipc_metadata_len], ipc_metadata[0..ipc_metadata_len]);
-
- return serialized_len;
-}
-
-fn copyRoot(dest: *Node.Storage, src: *align(1) Node.Storage) void {
- dest.* = .{
- .completed_count = src.completed_count,
- .estimated_total_count = src.estimated_total_count,
- .name = if (src.name[0] == 0) dest.name else src.name,
- };
-}
-
-fn findOld(
- ipc_fd: Io.File.Handle,
- old_metadata_fds: []Fd,
- old_metadata: []SavedMetadata,
-) ?*SavedMetadata {
- for (old_metadata_fds, old_metadata) |fd, *m| {
- if (fd.get() == ipc_fd)
- return m;
}
- return null;
-}
-
-fn useSavedIpcData(
- start_serialized_len: usize,
- serialized_buffer: *Serialized.Buffer,
- main_storage: *Node.Storage,
- main_index: usize,
- opt_saved_metadata: ?*SavedMetadata,
- remaining_read_trash_bytes: u16,
- fd: Io.File.Handle,
-) usize {
- const parents_copy = &serialized_buffer.parents_copy;
- const storage_copy = &serialized_buffer.storage_copy;
- const ipc_metadata_fds = &serialized_buffer.ipc_metadata_fds;
- const ipc_metadata = &serialized_buffer.ipc_metadata;
-
- const saved_metadata = opt_saved_metadata orelse {
- main_storage.completed_count = 0;
- main_storage.estimated_total_count = 0;
- if (remaining_read_trash_bytes > 0) {
- ipc_metadata_fds[ipc_metadata_len] = Fd.init(fd);
- ipc_metadata[ipc_metadata_len] = .{
- .remaining_read_trash_bytes = remaining_read_trash_bytes,
- .start_index = @intCast(start_serialized_len),
- .nodes_len = 0,
- .main_index = @intCast(main_index),
- };
- ipc_metadata_len += 1;
- }
- return start_serialized_len;
+ serialized_buffer.ipc_end = serialized_len;
+
+ // Ignore data from unused pipes. This ensures that if a child process exists we will
+ // eventually see `EndOfStream` and close the pipe.
+ if (ready_len > 0) for (
+ &serialized_buffer.ipc_data,
+ &serialized_buffer.ipc_buffers,
+ &serialized_buffer.ipc_vecs,
+ 0..,
+ ) |*ipc_data, *ipc_buffer, *ipc_vec, ipc_slot| switch (ipc_data.state) {
+ .unused, .pending => {},
+ .ready => {
+ _, const packet_end = ipc_data.findLastPacket(ipc_buffer);
+ ipc_data.rebase(ipc_buffer, ipc_vec, batch, @intCast(ipc_slot), packet_end);
+ ready_len -= 1;
+ },
};
+ assert(ready_len == 0);
- const start_index = saved_metadata.start_index;
- const nodes_len = @min(saved_metadata.nodes_len, serialized_buffer.storage.len - start_serialized_len);
- const old_main_index = saved_metadata.main_index;
-
- ipc_metadata_fds[ipc_metadata_len] = Fd.init(fd);
- ipc_metadata[ipc_metadata_len] = .{
- .remaining_read_trash_bytes = remaining_read_trash_bytes,
- .start_index = @intCast(start_serialized_len),
- .nodes_len = nodes_len,
- .main_index = @intCast(main_index),
+ return .{
+ .parents = serialized_buffer.parents[0..serialized_len],
+ .storage = serialized_buffer.storage[0..serialized_len],
};
- ipc_metadata_len += 1;
-
- const parents = parents_copy[start_index..][0..nodes_len];
- const storage = storage_copy[start_index..][0..nodes_len];
-
- copyRoot(main_storage, &storage_copy[old_main_index]);
-
- @memcpy(serialized_buffer.storage[start_serialized_len..][0..storage.len], storage);
-
- for (serialized_buffer.parents[start_serialized_len..][0..parents.len], parents) |*dest, p| {
- dest.* = switch (p) {
- .none, .unused => .none,
- _ => |prev| d: {
- if (@intFromEnum(prev) == old_main_index) {
- break :d @enumFromInt(main_index);
- } else if (@intFromEnum(prev) > nodes_len) {
- break :d .none;
- } else {
- break :d @enumFromInt(@intFromEnum(prev) - start_index + start_serialized_len);
- }
- },
- };
- }
-
- return start_serialized_len + storage.len;
}
-fn computeRedraw(serialized_buffer: *Serialized.Buffer) struct { []u8, usize } {
- const serialized = serialize(serialized_buffer);
+fn computeRedraw(io: Io, serialized_buffer: *Serialized.Buffer) !struct { []u8, usize } {
+ if (global_progress.rows == 0 or global_progress.cols == 0) return error.WindowTooSmall;
+
+ const serialized = try serialize(io, serialized_buffer);
// Now we can analyze our copy of the graph without atomics, reconstructing
// children lists which do not exist in the canonical data. These are
@@ -1416,9 +1470,7 @@ fn withinRowLimit(p: *Progress, nl_n: usize) bool {
return nl_n + 2 < p.rows;
}
-var remaining_write_trash_bytes: usize = 0;
-
-fn writeIpc(io: Io, file: Io.File, serialized: Serialized) error{BrokenPipe}!void {
+fn writeIpc(writer: *Io.Writer, serialized: Serialized) Io.Writer.Error!void {
// Byteswap if necessary to ensure little endian over the pipe. This is
// needed because the parent or child process might be running in qemu.
if (is_big_endian) for (serialized.storage) |*s| s.byteSwap();
@@ -1429,62 +1481,8 @@ fn writeIpc(io: Io, file: Io.File, serialized: Serialized) error{BrokenPipe}!voi
const storage = std.mem.sliceAsBytes(serialized.storage);
const parents = std.mem.sliceAsBytes(serialized.parents);
- var vecs: [3][]const u8 = .{ header, storage, parents };
-
- // Ensures the packet can fit in the pipe buffer.
- const upper_bound_msg_len = 1 + node_storage_buffer_len * @sizeOf(Node.Storage) +
- node_storage_buffer_len * @sizeOf(Node.OptionalIndex);
- comptime assert(upper_bound_msg_len <= 4096);
-
- while (remaining_write_trash_bytes > 0) {
- // We do this in a separate write call to give a better chance for the
- // writev below to be in a single packet.
- const n = @min(parents.len, remaining_write_trash_bytes);
- if (file.writeStreaming(io, &.{}, &.{parents[0..n]}, 1)) |written| {
- remaining_write_trash_bytes -= written;
- continue;
- } else |err| switch (err) {
- error.WouldBlock => return,
- error.BrokenPipe => return error.BrokenPipe,
- else => |e| {
- std.log.debug("failed to send progress to parent process: {t}", .{e});
- return error.BrokenPipe;
- },
- }
- }
-
- // If this write would block we do not want to keep trying, but we need to
- // know if a partial message was written.
- if (writevNonblock(io, file, &vecs)) |written| {
- const total = header.len + storage.len + parents.len;
- if (written < total) {
- remaining_write_trash_bytes = total - written;
- }
- } else |err| switch (err) {
- error.WouldBlock => {},
- error.BrokenPipe => return error.BrokenPipe,
- else => |e| {
- std.log.debug("failed to send progress to parent process: {t}", .{e});
- return error.BrokenPipe;
- },
- }
-}
-
-fn writevNonblock(io: Io, file: Io.File, iov: [][]const u8) Io.File.Writer.Error!usize {
- var iov_index: usize = 0;
- var written: usize = 0;
- var total_written: usize = 0;
- while (true) {
- while (if (iov_index < iov.len)
- written >= iov[iov_index].len
- else
- return total_written) : (iov_index += 1) written -= iov[iov_index].len;
- iov[iov_index].ptr += written;
- iov[iov_index].len -= written;
- written = try file.writeStreaming(io, &.{}, iov, 1);
- if (written == 0) return total_written;
- total_written += written;
- }
+ var vec = [3][]const u8{ header, storage, parents };
+ try writer.writeVecAll(&vec);
}
fn maybeUpdateSize(io: Io, resize_flag: bool) !void {
diff --git a/lib/std/os/linux.zig b/lib/std/os/linux.zig
@@ -1848,6 +1848,24 @@ pub const F = struct {
pub const RDLCK = if (is_sparc) 1 else 0;
pub const WRLCK = if (is_sparc) 2 else 1;
pub const UNLCK = if (is_sparc) 3 else 2;
+
+ pub const LINUX_SPECIFIC_BASE = 1024;
+
+ pub const SETLEASE = LINUX_SPECIFIC_BASE + 0;
+ pub const GETLEASE = LINUX_SPECIFIC_BASE + 1;
+ pub const NOTIFY = LINUX_SPECIFIC_BASE + 2;
+ pub const DUPFD_QUERY = LINUX_SPECIFIC_BASE + 3;
+ pub const CREATED_QUERY = LINUX_SPECIFIC_BASE + 4;
+ pub const CANCELLK = LINUX_SPECIFIC_BASE + 5;
+ pub const DUPFD_CLOEXEC = LINUX_SPECIFIC_BASE + 6;
+ pub const SETPIPE_SZ = LINUX_SPECIFIC_BASE + 7;
+ pub const GETPIPE_SZ = LINUX_SPECIFIC_BASE + 8;
+ pub const ADD_SEALS = LINUX_SPECIFIC_BASE + 9;
+ pub const GET_SEALS = LINUX_SPECIFIC_BASE + 10;
+ pub const GET_RW_HINT = LINUX_SPECIFIC_BASE + 11;
+ pub const SET_RW_HINT = LINUX_SPECIFIC_BASE + 12;
+ pub const GET_FILE_RW_HINT = LINUX_SPECIFIC_BASE + 13;
+ pub const SET_FILE_RW_HINT = LINUX_SPECIFIC_BASE + 14;
};
pub const F_OWNER = enum(i32) {