commit 523aa213c9f7466bdff5c7030de7d221f1547621 (tree)
parent 37316a3cf61a0b193003a19b44116da346c6cd3e
Author: Andrew Kelley <andrew@ziglang.org>
Date: Mon, 26 Jan 2026 19:07:01 -0800
std.Io.Threaded: batchWait and batchCancel for Windows
Diffstat:
3 files changed, 217 insertions(+), 124 deletions(-)
diff --git a/lib/std/Build/Step/Run.zig b/lib/std/Build/Step/Run.zig
@@ -1721,7 +1721,6 @@ fn evalZigTest(
// a crash of some kind. Either way, the child will terminate by itself -- wait for it.
const stderr_reader = multi_reader.reader(1);
const stderr_owned = try arena.dupe(u8, stderr_reader.buffered());
- stderr_reader.tossBuffered();
// Clean up everything and wait for the child to exit.
child.stdin.?.close(io);
diff --git a/lib/std/Io.zig b/lib/std/Io.zig
@@ -350,6 +350,8 @@ pub const Batch = struct {
}
};
+ /// After calling this, it is safe to unconditionally defer a call to
+ /// `cancel`.
pub fn init(operations: []Operation, ring: []u32) Batch {
const len: u31 = @intCast(operations.len);
assert(ring.len == len);
@@ -408,12 +410,18 @@ pub const Batch = struct {
/// Starts work on any submitted operations and returns when at least one has completeed.
///
/// Returns `error.Timeout` if `timeout` expires first.
+ ///
+ /// Depending on the `Io` implementation, may allocate resources that are
+ /// freed with `cancel`, even if an error is returned.
pub fn wait(b: *Batch, io: Io, timeout: Timeout) WaitError!void {
return io.vtable.batchWait(io.userdata, b, timeout);
}
/// Returns after all `operations` have completed. Operations which have not completed
/// after this function returns were successfully dropped and had no side effects.
+ ///
+ /// This function is idempotent with respect to itself and `wait`. It is
+ /// safe to unconditionally `defer` a call to this function after `init`.
pub fn cancel(b: *Batch, io: Io) void {
return io.vtable.batchCancel(io.userdata, b);
}
diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig
@@ -1255,6 +1255,32 @@ const AlertableSyscall = struct {
assert(is_windows);
}
+ fn start() Io.Cancelable!AlertableSyscall {
+ const thread = Thread.current orelse return .{ .thread = null };
+ switch (thread.cancel_protection) {
+ .blocked => return .{ .thread = null },
+ .unblocked => {},
+ }
+ const old_status = thread.status.fetchOr(.{
+ .cancelation = @enumFromInt(0b010),
+ .awaitable = .null,
+ }, .monotonic);
+ switch (old_status.cancelation) {
+ .parked => unreachable,
+ .blocked => unreachable,
+ .blocked_alertable => unreachable,
+ .blocked_canceling => unreachable,
+ .blocked_alertable_canceling => unreachable,
+ .none => return .{ .thread = thread }, // new status is `.blocked_alertable`
+ .canceling => {
+ // Status is unchanged (still `.canceling`)---change to `.canceled` before return.
+ thread.status.store(.{ .cancelation = .canceled, .awaitable = old_status.awaitable }, .monotonic);
+ return error.Canceled;
+ },
+ .canceled => return .{ .thread = null }, // new status is `.canceled` (unchanged)
+ }
+ }
+
fn checkCancel(s: AlertableSyscall) Io.Cancelable!void {
comptime assert(is_windows);
const thread = s.thread orelse return;
@@ -2501,10 +2527,10 @@ fn batchWait(userdata: ?*anyopaque, b: *Io.Batch, timeout: Io.Timeout) Io.Batch.
const op = ring[submit_head.index(len)];
const operation = &operations[op];
switch (operation.*) {
- .noop => {
- try operate(t, operation);
- ring[complete_tail.index(len)] = op;
- complete_tail = complete_tail.next(len);
+ .noop => |*o| {
+ _ = o.status.unstarted;
+ o.status = .{ .result = {} };
+ submitComplete(ring, &complete_tail, op);
},
.file_read_streaming => |*o| {
_ = o.status.unstarted;
@@ -2524,8 +2550,7 @@ fn batchWait(userdata: ?*anyopaque, b: *Io.Batch, timeout: Io.Timeout) Io.Batch.
1 => if (timeout == .none) {
const op = map_buffer[0];
try operate(t, &operations[op]);
- ring[complete_tail.index(len)] = op;
- complete_tail = complete_tail.next(len);
+ submitComplete(ring, &complete_tail, op);
poll_i = 0;
return;
},
@@ -2560,8 +2585,7 @@ fn batchWait(userdata: ?*anyopaque, b: *Io.Batch, timeout: Io.Timeout) Io.Batch.
ring[submit_head.index(len)] = op;
} else {
try operate(t, &operations[op]);
- ring[complete_tail.index(len)] = op;
- complete_tail = complete_tail.next(len);
+ submitComplete(ring, &complete_tail, op);
}
}
return;
@@ -2584,19 +2608,49 @@ fn batchCancel(userdata: ?*anyopaque, b: *Io.Batch) void {
while (submit_head != submit_tail) : (submit_head = submit_head.next(len)) {
const op = ring[submit_head.index(len)];
switch (operations[op]) {
- .noop => {
- operate(t, &operations[op]) catch unreachable;
- ring[complete_tail.index(len)] = op;
- complete_tail = complete_tail.next(len);
+ .noop => |*o| {
+ _ = o.status.unstarted;
+ o.status = .{ .result = {} };
+ submitComplete(ring, &complete_tail, op);
},
.file_read_streaming => |*o| _ = o.status.unstarted,
}
}
+ if (is_windows) {
+ // Iterate over pending and issue cancelations, then free the allocation for IO_STATUS_BLOCK
+ if (b.impl.reserved) |reserved| {
+ const gpa = t.allocator;
+ const metadatas_ptr: [*]WinOpMetadata = @ptrCast(@alignCast(reserved));
+ const metadatas = metadatas_ptr[0..b.operations.len];
+ for (metadatas, 0..) |*metadata, op| {
+ const done = @atomicLoad(windows.NTSTATUS, &metadata.iosb.u.Status, .acquire) != .PENDING;
+ if (done) continue;
+ switch (operations[op]) {
+ .noop => unreachable,
+ .file_read_streaming => |*o| {
+ _ = windows.ntdll.NtCancelIoFile(o.file.handle, &metadata.iosb);
+ },
+ }
+ }
+ for (metadatas) |*metadata| {
+ while (@atomicLoad(windows.NTSTATUS, &metadata.iosb.u.Status, .acquire) == .PENDING) {
+ waitForApcOrAlert();
+ }
+ }
+ gpa.free(metadatas);
+ b.impl.reserved = null;
+ }
+ }
b.impl.submit_head = submit_tail;
b.impl.complete_tail = complete_tail;
b.user.complete_tail = complete_tail;
}
+const WinOpMetadata = struct {
+ iosb: windows.IO_STATUS_BLOCK,
+ pending: bool,
+};
+
fn batchWaitWindows(t: *Threaded, b: *Io.Batch, timeout: Io.Timeout) Io.Batch.WaitError!void {
const operations = b.operations;
const len: u31 = @intCast(operations.len);
@@ -2606,16 +2660,16 @@ fn batchWaitWindows(t: *Threaded, b: *Io.Batch, timeout: Io.Timeout) Io.Batch.Wa
b.impl.submit_tail = submit_tail;
var complete_tail = b.impl.complete_tail;
- var overlapped_buffer: [poll_buffer_len]windows.OVERLAPPED = undefined;
- var handles_buffer: [poll_buffer_len]windows.HANDLE = undefined;
- var map_buffer: [poll_buffer_len]u32 = undefined; // handles_buffer index to operations index
- var buffer_i: usize = 0;
+ const metadatas_ptr: [*]WinOpMetadata = if (b.impl.reserved) |reserved| @ptrCast(@alignCast(reserved)) else a: {
+ const gpa = t.allocator;
+ const metadatas = gpa.alloc(WinOpMetadata, operations.len) catch return error.ConcurrencyUnavailable;
+ b.impl.reserved = metadatas.ptr;
+ @memset(metadatas, .{ .iosb = undefined, .pending = false });
+ break :a metadatas.ptr;
+ };
+ const metadatas = metadatas_ptr[0..operations.len];
defer {
- for (map_buffer[0..buffer_i]) |op| {
- submit_head = submit_head.prev(len);
- ring[submit_head.index(len)] = op;
- }
b.impl.submit_head = submit_head;
b.impl.complete_tail = complete_tail;
b.user.complete_tail = complete_tail;
@@ -2624,74 +2678,76 @@ fn batchWaitWindows(t: *Threaded, b: *Io.Batch, timeout: Io.Timeout) Io.Batch.Wa
while (submit_head != submit_tail) : (submit_head = submit_head.next(len)) {
const op = ring[submit_head.index(len)];
const operation = &operations[op];
+ const metadata = &metadatas[op];
+ metadata.* = .{ .iosb = undefined, .pending = false };
switch (operation.*) {
- .noop => {
- try operate(t, operation);
- ring[complete_tail.index(len)] = op;
- complete_tail = complete_tail.next(len);
+ .noop => |*o| {
+ _ = o.status.unstarted;
+ o.status = .{ .result = {} };
+ submitComplete(ring, &complete_tail, op);
},
.file_read_streaming => |*o| {
_ = o.status.unstarted;
- if (handles_buffer.len - buffer_i == 0) return error.ConcurrencyUnavailable;
- const overlapped = &overlapped_buffer[buffer_i];
- overlapped.* = .{
- .Internal = 0,
- .InternalHigh = 0,
- .DUMMYUNIONNAME = .{ .Pointer = null },
- .hEvent = null,
- };
- var n: windows.DWORD = undefined;
- const buf = o.data[0];
- const buf_len = std.math.lossyCast(windows.DWORD, buf.len);
- if (windows.kernel32.ReadFile(o.file.handle, buf.ptr, buf_len, &n, overlapped) == 0) {
- @panic("TODO");
+ switch (try ntReadFile(o.file.handle, o.data, &metadata.iosb)) {
+ .status => {
+ o.status = .{ .result = ntReadFileResult(&metadata.iosb) };
+ submitComplete(ring, &complete_tail, op);
+ },
+ .pending => {
+ o.status = .{ .pending = b };
+ metadata.pending = true;
+ },
}
- handles_buffer[buffer_i] = o.file.handle;
- map_buffer[buffer_i] = op;
- buffer_i += 1;
},
}
}
- switch (buffer_i) {
- 0 => return,
- 1 => if (timeout == .none) {
- const op = map_buffer[0];
- try operate(t, &operations[op]);
- ring[complete_tail.index(len)] = op;
- complete_tail = complete_tail.next(len);
- buffer_i = 0;
- return;
- },
- else => {},
- }
-
- const handles = handles_buffer[0..buffer_i];
- const map = map_buffer[0..buffer_i];
+ var delay_interval: windows.LARGE_INTEGER = timeoutToWindowsInterval(timeout);
- const syscall: Syscall = try .start();
- const index_result = windows.WaitForMultipleObjectsEx(handles, false, windows.INFINITE, true);
- syscall.finish();
- const index = index_result catch |err| switch (err) {
- error.Unexpected => @panic("TODO"),
- error.WaitAbandoned => @panic("TODO"),
- error.WaitTimeOut => @panic("TODO"),
- };
- var n: windows.DWORD = undefined;
- if (0 == windows.kernel32.GetOverlappedResult(handles[index], &overlapped_buffer[index], &n, 0)) {
- switch (windows.GetLastError()) {
- .BROKEN_PIPE => @panic("TODO"),
- .OPERATION_ABORTED => @panic("TODO"),
- else => @panic("TODO"),
+ while (true) {
+ const alertable_syscall = try AlertableSyscall.start();
+ const delay_rc = windows.ntdll.NtDelayExecution(windows.TRUE, &delay_interval);
+ alertable_syscall.finish();
+ switch (delay_rc) {
+ .SUCCESS => {
+ // The thread woke due to the timeout. Although spurious
+ // timeouts are OK, when no deadline is passed we must not
+ // return `error.Timeout`.
+ if (timeout != .none) return error.Timeout;
+ },
+ else => {},
}
- } else switch (operations[map[index]]) {
- .noop => unreachable,
- .file_read_streaming => |*o| {
- o.status = .{ .result = n };
- },
+ var any_done = false;
+ var any_pending = false;
+ for (metadatas, 0..) |*metadata, op_usize| {
+ if (!metadata.pending) continue;
+ any_pending = true;
+ const op: u31 = @intCast(op_usize);
+ const done = @atomicLoad(windows.NTSTATUS, &metadata.iosb.u.Status, .acquire) != .PENDING;
+ switch (operations[op]) {
+ .noop => unreachable,
+ .file_read_streaming => |*o| {
+ assert(o.status.pending == b);
+ if (!done) continue;
+ o.status = .{ .result = ntReadFileResult(&metadata.iosb) };
+ },
+ }
+ any_done = true;
+ metadata.pending = false;
+ submitComplete(ring, &complete_tail, op);
+ }
+ if (any_done) return;
+ if (!any_pending) return;
}
}
+fn submitComplete(ring: []u32, complete_tail: *Io.Batch.RingIndex, op: u32) void {
+ const ct = complete_tail.*;
+ const len: u31 = @intCast(ring.len);
+ ring[ct.index(len)] = op;
+ complete_tail.* = ct.next(len);
+}
+
const dirCreateDir = switch (native_os) {
.windows => dirCreateDirWindows,
.wasi => dirCreateDirWasi,
@@ -5529,7 +5585,7 @@ fn dirRealPathFileWindows(userdata: ?*anyopaque, dir: Dir, sub_path: []const u8,
fn realPathWindows(h_file: windows.HANDLE, out_buffer: []u8) File.RealPathError!usize {
var wide_buf: [windows.PATH_MAX_WIDE]u16 = undefined;
- // TODO move GetFinalPathNameByHandle logic into std.Io.Threaded and add cancel checks
+ // TODO move GetFinalPathNameByHandle logic into Io.Threaded and add cancel checks
try Thread.checkCancel();
const wide_slice = try windows.GetFinalPathNameByHandle(h_file, .{}, &wide_buf);
@@ -8617,75 +8673,88 @@ fn fileReadStreamingPosix(file: File, data: []const []u8) File.Reader.Error!usiz
}
fn fileReadStreamingWindows(file: File, data: []const []u8) File.Reader.Error!usize {
+ var io_status_block: windows.IO_STATUS_BLOCK = undefined;
+ if (ntReadFile(file.handle, data, &io_status_block)) |result| switch (result) {
+ .status => return ntReadFileResult(&io_status_block),
+ .pending => {
+ // Once we get here we received PENDING so we must not return from the
+ // function until the operation completes.
+ defer while (@atomicLoad(windows.NTSTATUS, &io_status_block.u.Status, .acquire) == .PENDING) {
+ waitForApcOrAlert();
+ };
+
+ const alertable_syscall = AlertableSyscall.start() catch |err| switch (err) {
+ error.Canceled => |e| {
+ _ = windows.ntdll.NtCancelIoFile(file.handle, &io_status_block);
+ return e;
+ },
+ };
+ defer alertable_syscall.finish();
+ waitForApcOrAlert();
+ while (@atomicLoad(windows.NTSTATUS, &io_status_block.u.Status, .acquire) == .PENDING) {
+ alertable_syscall.checkCancel() catch |err| switch (err) {
+ error.Canceled => |e| {
+ _ = windows.ntdll.NtCancelIoFile(file.handle, &io_status_block);
+ return e;
+ },
+ };
+ waitForApcOrAlert();
+ }
+ },
+ } else |err| return err;
+ return ntReadFileResult(&io_status_block);
+}
+
+fn ntReadFileResult(io_status_block: *windows.IO_STATUS_BLOCK) !usize {
+ switch (io_status_block.u.Status) {
+ .SUCCESS, .END_OF_FILE, .PIPE_BROKEN => return io_status_block.Information,
+ .PENDING => unreachable,
+ .INVALID_DEVICE_REQUEST => return error.IsDir,
+ .LOCK_NOT_GRANTED => return error.LockViolation,
+ .ACCESS_DENIED => return error.AccessDenied,
+ else => |status| return windows.unexpectedStatus(status),
+ }
+}
+
+fn ntReadFile(handle: windows.HANDLE, data: []const []u8, iosb: *windows.IO_STATUS_BLOCK) Io.Cancelable!enum { status, pending } {
var index: usize = 0;
while (index < data.len and data[index].len == 0) index += 1;
- if (index == data.len) return 0;
+ if (index == data.len) {
+ iosb.u.Status = .SUCCESS;
+ iosb.Information = 0;
+ return .status;
+ }
const buffer = data[index];
- var io_status_block: windows.IO_STATUS_BLOCK = undefined;
const syscall: Syscall = try .start();
while (true) {
- io_status_block.u.Status = .PENDING;
+ iosb.u.Status = .PENDING;
switch (windows.ntdll.NtReadFile(
- file.handle,
+ handle,
null, // event
noopApc, // apc callback
null, // apc context
- &io_status_block,
+ iosb,
buffer.ptr,
@min(std.math.maxInt(u32), buffer.len),
null, // byte offset
null, // key
)) {
- .SUCCESS, .END_OF_FILE, .PIPE_BROKEN => {
+ .PENDING => {
syscall.finish();
- return io_status_block.Information;
+ return .pending;
},
- .PENDING => break,
.CANCELLED => {
try syscall.checkCancel();
continue;
},
- .INVALID_DEVICE_REQUEST => return syscall.fail(error.IsDir),
- .LOCK_NOT_GRANTED => return syscall.fail(error.LockViolation),
- .ACCESS_DENIED => return syscall.fail(error.AccessDenied),
- .INVALID_PARAMETER => |err| return syscall.ntstatusBug(err), // streaming read of async mode file
- else => |status| return syscall.unexpectedNtstatus(status),
- }
- }
- {
- // Once we get here we received PENDING so we must not return from the
- // function until the operation completes.
- defer while (@atomicLoad(windows.NTSTATUS, &io_status_block.u.Status, .acquire) == .PENDING) {
- waitForApcOrAlert();
- };
-
- const alertable_syscall = syscall.toAlertable() catch |err| switch (err) {
- error.Canceled => |e| {
- _ = windows.ntdll.NtCancelIoFile(file.handle, &io_status_block);
- return e;
+ else => |status| {
+ syscall.finish();
+ iosb.u.Status = status;
+ return .status;
},
- };
- defer alertable_syscall.finish();
- waitForApcOrAlert();
- while (@atomicLoad(windows.NTSTATUS, &io_status_block.u.Status, .acquire) == .PENDING) {
- alertable_syscall.checkCancel() catch |err| switch (err) {
- error.Canceled => |e| {
- _ = windows.ntdll.NtCancelIoFile(file.handle, &io_status_block);
- return e;
- },
- };
- waitForApcOrAlert();
}
}
- switch (io_status_block.u.Status) {
- .SUCCESS, .END_OF_FILE, .PIPE_BROKEN => return io_status_block.Information,
- .PENDING => unreachable, // cannot return until the operation completes
- .INVALID_DEVICE_REQUEST => return error.IsDir,
- .LOCK_NOT_GRANTED => return error.LockViolation,
- .ACCESS_DENIED => return error.AccessDenied,
- else => |status| return windows.unexpectedStatus(status),
- }
}
fn fileReadPositionalPosix(file: File, data: []const []u8, offset: u64) File.ReadPositionalError!usize {
@@ -9318,7 +9387,7 @@ fn processExecutablePath(userdata: ?*anyopaque, out_buffer: []u8) process.Execut
};
defer w.CloseHandle(h_file);
- // TODO move GetFinalPathNameByHandle logic into std.Io.Threaded and add cancel checks
+ // TODO move GetFinalPathNameByHandle logic into Io.Threaded and add cancel checks
try Thread.checkCancel();
const wide_slice = try w.GetFinalPathNameByHandle(h_file, .{}, &path_name_w_buf.data);
@@ -12989,7 +13058,7 @@ fn processSetCurrentDir(userdata: ?*anyopaque, dir: Dir) process.SetCurrentDirEr
if (is_windows) {
var dir_path_buffer: [windows.PATH_MAX_WIDE]u16 = undefined;
- // TODO move GetFinalPathNameByHandle logic into std.Io.Threaded and add cancel checks
+ // TODO move GetFinalPathNameByHandle logic into Io.Threaded and add cancel checks
try Thread.checkCancel();
const dir_path = try windows.GetFinalPathNameByHandle(dir.handle, .{}, &dir_path_buffer);
const path_len_bytes = std.math.cast(u16, dir_path.len * 2) orelse return error.NameTooLong;
@@ -16657,7 +16726,7 @@ const parking_sleep = struct {
/// Spurious wakeups are possible.
///
/// `addr_hint` has no semantic effect, but may allow the OS to optimize this operation.
-fn park(opt_deadline: ?std.Io.Clock.Timestamp, addr_hint: ?*const anyopaque) error{Timeout}!void {
+fn park(opt_deadline: ?Io.Clock.Timestamp, addr_hint: ?*const anyopaque) error{Timeout}!void {
comptime assert(use_parking_futex or use_parking_sleep);
switch (native_os) {
.windows => {
@@ -16713,6 +16782,23 @@ fn park(opt_deadline: ?std.Io.Clock.Timestamp, addr_hint: ?*const anyopaque) err
}
}
+fn timeoutToWindowsInterval(timeout: Io.Timeout) windows.LARGE_INTEGER {
+ switch (timeout) {
+ .none => {
+ return std.math.minInt(windows.LARGE_INTEGER); // infinite timeout
+ },
+ .deadline => |deadline| {
+ const nanoseconds = deadline.raw.nanoseconds;
+ return @intCast(@divTrunc(nanoseconds, 100));
+ },
+ .duration => |duration| {
+ const now_timestamp = nowWindows(duration.clock) catch unreachable;
+ const deadline_ns = now_timestamp.nanoseconds + duration.raw.nanoseconds;
+ return @intCast(@divTrunc(deadline_ns, 100));
+ },
+ }
+}
+
const UnparkTid = switch (native_os) {
// `NtAlertMultipleThreadByThreadId` is weird and wants 64-bit thread handles?
.windows => usize,