commit 642f329ac91d69d02588ba15714edafb09e709da (tree)
parent e2a266e744adaa7ee84a512fdf4cea44686ca0b6
Author: Andrew Kelley <andrew@ziglang.org>
Date: Fri, 9 Jan 2026 15:06:50 -0800
std.Io: exploring a different batch API proposal
Diffstat:
5 files changed, 312 insertions(+), 152 deletions(-)
diff --git a/lib/std/Io.zig b/lib/std/Io.zig
@@ -149,7 +149,10 @@ pub const VTable = struct {
futexWaitUncancelable: *const fn (?*anyopaque, ptr: *const u32, expected: u32) void,
futexWake: *const fn (?*anyopaque, ptr: *const u32, max_waiters: u32) void,
- operate: *const fn (?*anyopaque, []Operation) void,
+ batch: *const fn (?*anyopaque, []Operation) ConcurrentError!void,
+ batchSubmit: *const fn (?*anyopaque, *Batch) void,
+ batchWait: *const fn (?*anyopaque, *Batch, resubmissions: []const usize, Timeout) Batch.WaitError!usize,
+ batchCancel: *const fn (?*anyopaque, *Batch) void,
dirCreateDir: *const fn (?*anyopaque, Dir, []const u8, Dir.Permissions) Dir.CreateDirError!void,
dirCreateDirPath: *const fn (?*anyopaque, Dir, []const u8, Dir.Permissions) Dir.CreateDirPathError!Dir.CreatePathStatus,
@@ -253,26 +256,96 @@ pub const VTable = struct {
};
pub const Operation = union(enum) {
- noop,
+ noop: Noop,
file_read_streaming: FileReadStreaming,
+ pub const Noop = struct {
+ reserved: [2]usize,
+ status: Status(void) = .{ .result = {} },
+ };
+
+ /// Reads from `file` into `data`. The result is the number of bytes read; 0 means end of stream.
pub const FileReadStreaming = struct {
file: File,
data: []const []u8,
- /// Causes `result` to return `error.WouldBlock` instead of blocking.
- nonblocking: bool = false,
- /// Returns 0 on end of stream.
- result: File.Reader.Error!usize,
+ status: Status(File.Reader.Error!usize) = .{ .unstarted = {} },
};
+
+ pub fn Status(Result: type) type {
+ return union {
+ unstarted: void,
+ pending: usize,
+ result: Result,
+ };
+ }
};
-/// Performs all `operations` in a non-deterministic order. Returns after all
-/// `operations` have been completed. The degree to which the operations are
-/// performed concurrently is determined by the `Io` implementation.
-pub fn operate(io: Io, operations: []Operation) void {
- return io.vtable.operate(io.userdata, operations);
+/// Performs all `operations` in an unspecified order, concurrently.
+///
+/// Returns after all `operations` have been completed. If the operations could
+/// not be completed concurrently, returns `error.ConcurrencyUnavailable`.
+///
+/// With this API, it is rare for concurrency to not be available. Even a
+/// single-threaded `Io` implementation can, for example, take advantage of
+/// poll() to implement this. Note that poll() is fallible however.
+///
+/// If `operations.len` is one, `error.ConcurrencyUnavailable` is unreachable.
+///
+/// On entry, every operation must have `.status = .unstarted`, except noops,
+/// which must have `.status = .{ .result = {} }`. This allows the state
+/// transitions to be safety-checked.
+///
+/// On return, all operations have `.status = .{ .result = ... }`.
+pub fn batch(io: Io, operations: []Operation) ConcurrentError!void {
+ return io.vtable.batch(io.userdata, operations);
+}
+
+/// Performs one `Operation`.
+pub fn operate(io: Io, operation: *Operation) void {
+ return io.vtable.batch(io.userdata, (operation)[0..1]) catch unreachable;
}
+/// Submits many operations together without waiting for all of them to
+/// complete.
+///
+/// This is a low-level abstraction based on `Operation`. For a higher
+/// level API that operates on `Future`, see `Select`.
+pub const Batch = struct {
+ operations: []Operation,
+ index: usize,
+ reserved: ?*anyopaque,
+
+ pub fn init(operations: []Operation) Batch {
+ return .{ .operations = operations, .index = 0, .reserved = null };
+ }
+
+ /// Submits all non-noop `operations`.
+ pub fn submit(b: *Batch, io: Io) void {
+ return io.vtable.batchSubmit(io.userdata, b);
+ }
+
+ pub const WaitError = ConcurrentError || Cancelable || Timeout.Error;
+
+ /// Resubmits the previously completed or noop-initialized `operations` at
+ /// indexes given by `resubmissions`. This set of indexes typically will be empty
+ /// on the first call to `wait` since all operations have already been
+ /// submitted via `submit`.
+ ///
+ /// Returns the index of a completed `Operation`, or `operations.len` if
+ /// all operations are completed.
+ ///
+ /// When `error.Canceled` is returned, all operations have already completed.
+ pub fn wait(b: *Batch, io: Io, resubmissions: []const usize, timeout: Timeout) WaitError!usize {
+ return io.vtable.batchWait(io.userdata, b, resubmissions, timeout);
+ }
+
+ /// Returns after all `operations` have completed. Each operation
+ /// independently may or may not have been canceled.
+ pub fn cancel(b: *Batch, io: Io) void {
+ return io.vtable.batchCancel(io.userdata, b);
+ }
+};
+
pub const Limit = enum(usize) {
nothing = 0,
unlimited = math.maxInt(usize),
diff --git a/lib/std/Io/File.zig b/lib/std/Io/File.zig
@@ -557,10 +557,9 @@ pub fn readStreaming(file: File, io: Io, buffer: []const []u8) Reader.Error!usiz
var operation: Io.Operation = .{ .file_read_streaming = .{
.file = file,
.data = buffer,
- .result = undefined,
} };
- io.vtable.operate(io.userdata, (&operation)[0..1]);
- return operation.file_read_streaming.result;
+ io.operate(&operation);
+ return operation.file_read_streaming.status.result;
}
pub const ReadPositionalError = error{
diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig
@@ -1587,7 +1587,10 @@ pub fn io(t: *Threaded) Io {
.futexWaitUncancelable = futexWaitUncancelable,
.futexWake = futexWake,
- .operate = operate,
+ .batch = batch,
+ .batchSubmit = batchSubmit,
+ .batchWait = batchWait,
+ .batchCancel = batchCancel,
.dirCreateDir = dirCreateDir,
.dirCreateDirPath = dirCreateDirPath,
@@ -1748,7 +1751,10 @@ pub fn ioBasic(t: *Threaded) Io {
.futexWaitUncancelable = futexWaitUncancelable,
.futexWake = futexWake,
- .operate = operate,
+ .batch = batch,
+ .batchSubmit = batchSubmit,
+ .batchWait = batchWait,
+ .batchCancel = batchCancel,
.dirCreateDir = dirCreateDir,
.dirCreateDirPath = dirCreateDirPath,
@@ -2450,107 +2456,187 @@ fn futexWake(userdata: ?*anyopaque, ptr: *const u32, max_waiters: u32) void {
Thread.futexWake(ptr, max_waiters);
}
-fn operate(userdata: ?*anyopaque, operations: []Io.Operation) void {
+fn batchSubmit(userdata: ?*anyopaque, b: *Io.Batch) void {
const t: *Threaded = @ptrCast(@alignCast(userdata));
_ = t;
+ _ = b;
+ return;
+}
+
+fn operate(op: *Io.Operation) void {
+ switch (op.*) {
+ .noop => {},
+ .file_read_streaming => |*o| o.status = .{ .result = fileReadStreaming(o.file, o.data) },
+ }
+}
+fn batchWait(
+ userdata: ?*anyopaque,
+ b: *Io.Batch,
+ resubmissions: []const usize,
+ timeout: Io.Timeout,
+) Io.Batch.WaitError!usize {
+ _ = resubmissions;
+ const t: *Threaded = @ptrCast(@alignCast(userdata));
+ const operations = b.operations;
+ if (operations.len == 1) {
+ operate(&operations[0]);
+ return b.operations.len;
+ }
if (is_windows) @panic("TODO");
var poll_buffer: [poll_buffer_len]posix.pollfd = undefined;
var map_buffer: [poll_buffer_len]u8 = undefined; // poll_buffer index to operations index
- var operation_index: usize = 0;
-
- while (operation_index < operations.len) {
- var poll_i: usize = 0;
- while (operation_index < operations.len) : (operation_index += 1) {
- switch (operations[operation_index]) {
- .noop => continue,
- .file_read_streaming => |*o| {
- if (o.nonblocking) {
- o.result = error.WouldBlock;
- poll_buffer[poll_i] = .{
- .fd = o.file.handle,
- .events = posix.POLL.IN,
- .revents = 0,
- };
- if (map_buffer.len - poll_i == 0) break;
- map_buffer[poll_i] = @intCast(operation_index);
- poll_i += 1;
- } else {
- o.result = fileReadStreaming(o.file, o.data) catch |err| switch (err) {
- error.Canceled => {
- setOperationsError(operations[operation_index..], error.Canceled);
- return;
- },
- else => err,
- };
- }
- },
- }
- }
+ var poll_i: usize = 0;
+
+ for (operations, 0..) |*op, operation_index| switch (op.*) {
+ .noop => continue,
+ .file_read_streaming => |*o| {
+ if (poll_buffer.len - poll_i == 0) return error.ConcurrencyUnavailable;
+ poll_buffer[poll_i] = .{
+ .fd = o.file.handle,
+ .events = posix.POLL.IN,
+ .revents = 0,
+ };
+ map_buffer[poll_i] = @intCast(operation_index);
+ poll_i += 1;
+ },
+ };
- if (poll_i == 0) {
- @branchHint(.likely);
- return;
+ if (poll_i == 0) return operations.len;
+
+ const t_io = ioBasic(t);
+ const deadline = timeout.toDeadline(t_io) catch return error.UnsupportedClock;
+ const max_poll_ms = std.math.maxInt(i32);
+
+ while (true) {
+ const timeout_ms: i32 = if (deadline) |d| t: {
+ const duration = d.durationFromNow(t_io) catch return error.UnsupportedClock;
+ if (duration.raw.nanoseconds <= 0) return error.Timeout;
+ break :t @intCast(@min(max_poll_ms, duration.raw.toMilliseconds()));
+ } else -1;
+ const syscall = try Syscall.start();
+ const rc = posix.system.poll(&poll_buffer, poll_i, timeout_ms);
+ syscall.finish();
+ switch (posix.errno(rc)) {
+ .SUCCESS => {
+ if (rc == 0) {
+ // Although spurious timeouts are OK, when no deadline is
+ // passed we must not return `error.Timeout`.
+ if (deadline == null) continue;
+ return error.Timeout;
+ }
+ for (poll_buffer[0..poll_i], map_buffer[0..poll_i]) |*poll_fd, i| {
+ if (poll_fd.revents == 0) continue;
+ operate(&operations[i]);
+ return i;
+ }
+ },
+ .INTR => continue,
+ else => return error.ConcurrencyUnavailable,
}
+ }
+}
- while (true) {
- const syscall = Syscall.start() catch |err| switch (err) {
- error.Canceled => {
- setPollOperationsError(operations, map_buffer[0..poll_i], error.Canceled);
- setOperationsError(operations[operation_index..], error.Canceled);
- return;
- },
+fn batchCancel(userdata: ?*anyopaque, b: *Io.Batch) void {
+ const t: *Threaded = @ptrCast(@alignCast(userdata));
+ _ = t;
+ _ = b;
+ return;
+}
+
+fn batch(userdata: ?*anyopaque, operations: []Io.Operation) Io.ConcurrentError!void {
+ const t: *Threaded = @ptrCast(@alignCast(userdata));
+ _ = t;
+
+ if (operations.len == 1) {
+ @branchHint(.likely);
+ return operate(&operations[0]);
+ }
+
+ if (is_windows) @panic("TODO");
+
+ var poll_buffer: [poll_buffer_len]posix.pollfd = undefined;
+ var map_buffer: [poll_buffer_len]u8 = undefined; // poll_buffer index to operations index
+ var poll_i: usize = 0;
+
+ for (operations, 0..) |*op, operation_index| switch (op.*) {
+ .noop => continue,
+ .file_read_streaming => |*o| {
+ if (poll_buffer.len - poll_i == 0) return error.ConcurrencyUnavailable;
+ poll_buffer[poll_i] = .{
+ .fd = o.file.handle,
+ .events = posix.POLL.IN,
+ .revents = 0,
};
- const poll_rc = posix.system.poll(&poll_buffer, poll_i, -1);
- syscall.finish();
- switch (posix.errno(poll_rc)) {
- .SUCCESS => {
- if (poll_rc == 0) {
- // Spurious timeout; handle same as INTR.
- continue;
- }
- for (poll_buffer[0..poll_i], map_buffer[0..poll_i]) |*poll_fd, i| {
- if (poll_fd.revents == 0) continue;
- switch (operations[i]) {
- .noop => unreachable,
- .file_read_streaming => |*o| {
- o.result = fileReadStreaming(o.file, o.data);
- },
- }
- }
- break;
- },
- .INTR => continue,
- .NOMEM => {
- setPollOperationsError(operations, map_buffer[0..poll_i], error.SystemResources);
- break;
- },
- else => {
- setPollOperationsError(operations, map_buffer[0..poll_i], error.Unexpected);
- break;
- },
- }
+ map_buffer[poll_i] = @intCast(operation_index);
+ poll_i += 1;
+ },
+ };
+
+ const polls = poll_buffer[0..poll_i];
+ const map = map_buffer[0..poll_i];
+
+ var pending = poll_i;
+ while (pending > 1) {
+ const syscall = Syscall.start() catch |err| switch (err) {
+ error.Canceled => {
+ if (!setOperationsError(operations, polls, map, error.Canceled))
+ recancelInner();
+ return;
+ },
+ };
+ const rc = posix.system.poll(polls.ptr, polls.len, -1);
+ syscall.finish();
+ switch (posix.errno(rc)) {
+ .SUCCESS => {
+ if (rc == 0) {
+ // Spurious timeout; handle the same as INTR.
+ continue;
+ }
+ for (polls, map) |*poll_fd, i| {
+ if (poll_fd.revents == 0) continue;
+ poll_fd.fd = -1;
+ pending -= 1;
+ operate(&operations[i]);
+ }
+ },
+ .INTR => continue,
+ .NOMEM => {
+ assert(setOperationsError(operations, polls, map, error.SystemResources));
+ return;
+ },
+ else => {
+ assert(setOperationsError(operations, polls, map, error.Unexpected));
+ return;
+ },
}
}
+
+ if (pending == 1) for (poll_buffer[0..poll_i], map_buffer[0..poll_i]) |*poll_fd, i| {
+ if (poll_fd.fd == -1) continue;
+ operate(&operations[i]);
+ };
}
-fn setPollOperationsError(
+fn setOperationsError(
operations: []Io.Operation,
+ polls: []const posix.pollfd,
map: []const u8,
err: error{ Canceled, SystemResources, Unexpected },
-) void {
- for (map) |operation_index| switch (operations[operation_index]) {
- .noop => unreachable,
- inline else => |*o| o.result = err,
- };
-}
-
-fn setOperationsError(operations: []Io.Operation, err: error{ Canceled, SystemResources, Unexpected }) void {
- for (operations) |*op| switch (op.*) {
- .noop => unreachable,
- inline else => |*o| o.result = err,
- };
+) bool {
+ var marked = false;
+ for (polls, map) |*poll_fd, i| {
+ if (poll_fd.fd == -1) continue;
+ switch (operations[i]) {
+ .noop => unreachable,
+ inline else => |*o| {
+ o.status = .{ .result = err };
+ marked = true;
+ },
+ }
+ }
+ return marked;
}
const dirCreateDir = switch (native_os) {
diff --git a/lib/std/process.zig b/lib/std/process.zig
@@ -453,9 +453,7 @@ pub fn spawnPath(io: Io, dir: Io.Dir, options: SpawnOptions) SpawnError!Child {
return io.vtable.processSpawnPath(io.userdata, dir, options);
}
-pub const RunError = CurrentPathError || posix.ReadError || SpawnError || posix.PollError || error{
- StreamTooLong,
-};
+pub const RunError = SpawnError || Child.CollectOutputError;
pub const RunOptions = struct {
argv: []const []const u8,
@@ -535,13 +533,15 @@ pub fn run(gpa: Allocator, io: Io, options: RunOptions) RunError!RunResult {
const term = try child.wait(io);
- const owned_stdout = try stdout.toOwnedSlice(gpa);
- errdefer gpa.free(owned_stdout);
- const owned_stderr = try stderr.toOwnedSlice(gpa);
+ const stdout_slice = try stdout.toOwnedSlice(gpa);
+ errdefer gpa.free(stdout_slice);
+
+ const stderr_slice = try stderr.toOwnedSlice(gpa);
+ errdefer gpa.free(stderr_slice);
return .{
- .stdout = owned_stdout,
- .stderr = owned_stderr,
+ .stdout = stdout_slice,
+ .stderr = stderr_slice,
.term = term,
};
}
diff --git a/lib/std/process/Child.zig b/lib/std/process/Child.zig
@@ -125,7 +125,9 @@ pub fn wait(child: *Child, io: Io) WaitError!Term {
return io.vtable.childWait(io.userdata, child);
}
-pub const CollectOutputError = error{StreamTooLong} || Allocator.Error || Io.File.Reader.Error;
+pub const CollectOutputError = error{
+ StreamTooLong,
+} || Io.ConcurrentError || Allocator.Error || Io.File.Reader.Error || Io.Timeout.Error;
pub const CollectOutputOptions = struct {
stdout: *std.ArrayList(u8),
@@ -135,6 +137,7 @@ pub const CollectOutputOptions = struct {
allocator: ?Allocator = null,
stdout_limit: Io.Limit = .unlimited,
stderr_limit: Io.Limit = .unlimited,
+ timeout: Io.Timeout = .none,
};
/// Collect the output from the process's stdout and stderr. Will return once
@@ -144,56 +147,55 @@ pub const CollectOutputOptions = struct {
/// The process must have been started with stdout and stderr set to
/// `process.SpawnOptions.StdIo.pipe`.
pub fn collectOutput(child: *const Child, io: Io, options: CollectOutputOptions) CollectOutputError!void {
- const files: [2]Io.File = .{ child.stdout.?, child.stderr.? };
const lists: [2]*std.ArrayList(u8) = .{ options.stdout, options.stderr };
const limits: [2]Io.Limit = .{ options.stdout_limit, options.stderr_limit };
- var dones: [2]bool = .{ false, false };
- var reads: [2]Io.Operation = undefined;
+
+ // Each list needs at least one unused byte before its first read is
+ // submitted; without an allocator a full list is reported as too long.
+ if (options.allocator) |gpa| {
+ for (lists) |list| try list.ensureUnusedCapacity(gpa, 1);
+ } else {
+ for (lists) |list| {
+ if (list.unusedCapacitySlice().len == 0)
+ return error.StreamTooLong;
+ }
+ }
+
var vecs: [2][1][]u8 = undefined;
- while (true) {
- for (&reads, &lists, &files, dones, &vecs) |*read, list, file, done, *vec| {
- if (done) {
- read.* = .noop;
- continue;
- }
+ for (lists, &vecs) |list, *vec|
+ vec[0] = list.unusedCapacitySlice();
+
+ // Index 0 reads stdout and index 1 reads stderr, matching `lists` and `limits`.
+ var operations: [2]Io.Operation = .{
+ .{ .file_read_streaming = .{
+ .file = child.stdout.?,
+ .data = &vecs[0],
+ } },
+ .{ .file_read_streaming = .{
+ .file = child.stderr.?,
+ .data = &vecs[1],
+ } },
+ };
+
+ var batch: Io.Batch = .init(&operations);
+ batch.submit(io);
+ defer batch.cancel(io);
+
+ // `pending` counts the streams that have not yet reported end of stream.
+ var pending = operations.len;
+ var retry_index: ?usize = null;
+ while (pending > 0) {
+ const resubmissions: []const usize = if (retry_index) |i| &.{i} else &.{};
+ const index = try batch.wait(io, resubmissions, options.timeout);
+ const n = try operations[index].file_read_streaming.status.result;
+ if (n == 0) {
+ pending -= 1;
+ // This stream is finished and must not be read again. Any earlier
+ // `retry_index` was already resubmitted by the `wait` call above, so
+ // leaving it set would resubmit an operation that is either at end
+ // of stream or still in flight.
+ retry_index = null;
+ } else {
+ retry_index = index;
+ const list = lists[index];
+ const limit = limits[index];
+ list.items.len += n;
+ if (list.items.len >= @intFromEnum(limit)) return error.StreamTooLong;
if (options.allocator) |gpa| try list.ensureUnusedCapacity(gpa, 1);
const cap = list.unusedCapacitySlice();
if (cap.len == 0) return error.StreamTooLong;
- vec[0] = cap;
- read.* = .{ .file_read_streaming = .{
- .file = file,
- .data = vec,
- .nonblocking = true,
- .result = undefined,
- } };
- }
- var all_done = true;
- var any_canceled = false;
- var other_err: (error{StreamTooLong} || Io.File.Reader.Error)!void = {};
- io.vtable.operate(io.userdata, &reads);
- for (&reads, &lists, &limits, &dones) |*read, list, limit, *done| {
- if (done.*) continue;
- const n = read.file_read_streaming.result catch |err| switch (err) {
- error.Canceled => {
- any_canceled = true;
- continue;
- },
- error.WouldBlock => continue,
- else => |e| {
- other_err = e;
- continue;
- },
- };
- if (n == 0) {
- done.* = true;
- } else {
- all_done = false;
- }
- list.items.len += n;
- if (list.items.len > @intFromEnum(limit)) other_err = error.StreamTooLong;
+ // Point the read vector at the fresh unused capacity before the
+ // operation is resubmitted on the next `wait`.
+ vecs[index][0] = cap;
}
- if (any_canceled) return error.Canceled;
- try other_err;
- if (all_done) return;
}
}