zig

fork of https://codeberg.org/ziglang/zig
Log | Files | Refs | README | LICENSE

commit 5c42193b177e4b56ca91acb256cad36dd7cd4dec (tree)
parent 2774436a831194b5a97fc2774ad5570762ed0965
Author: Andrew Kelley <andrew@ziglang.org>
Date:   Fri, 23 Jan 2026 12:59:36 -0800

std.Io.Threaded: rework cancelation

Now it can handle sync cancelation and alertable cancelation on Windows.

Also fix the API of NtCancelIoFileEx

Diffstat:
Mlib/std/Io/Threaded.zig | 243+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++------------------
Mlib/std/os/windows/ntdll.zig | 9++++++---
2 files changed, 193 insertions(+), 59 deletions(-)

diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig @@ -339,7 +339,8 @@ const Group = struct { .canceled => true, .parked => unreachable, .blocked => unreachable, - .blocked_windows_dns => unreachable, + .blocked_alertable => unreachable, + .blocked_alertable_canceling => unreachable, .blocked_canceling => unreachable, }; if (result) { @@ -542,7 +543,8 @@ const Future = struct { .canceled => true, .parked => unreachable, .blocked => unreachable, - .blocked_windows_dns => unreachable, + .blocked_alertable => unreachable, + .blocked_alertable_canceling => unreachable, .blocked_canceling => unreachable, }; thread.status.store(.{ .cancelation = .none, .awaitable = .null }, .monotonic); @@ -650,9 +652,11 @@ const Thread = struct { /// To request cancelation, set the status to `.blocked_canceling` and repeatedly interrupt the system call until the status changes. blocked = 0b011, - /// Windows-only: the thread is blocked in a call to `GetAddrInfoExW`. - /// To request cancelation, set the status to `.canceling` and call `GetAddrInfoExCancel`. - blocked_windows_dns = 0b010, + /// Windows-only: the thread is blocked in an alertable wait via + /// `NtDelayExecution`. To request cancelation, set the status to + /// `blocked_alertable_canceling` and repeatedly alert the thread + /// until the status changes. + blocked_alertable = 0b010, /// The thread has an outstanding cancelation request but is not in a cancelable operation. /// When it acknowledges the cancelation, it will set the status to `.canceled`. @@ -663,9 +667,16 @@ const Thread = struct { /// will not change for the remainder of this task's execution. canceled = 0b111, - /// The thread is blocked in a cancelable system call, and is being canceled. The thread which triggered the cancelation will send signals to this thread - /// until its status changes. + /// The thread is blocked in a cancelable system call, and is being + /// canceled. The thread which triggered the cancelation will send + /// signals to this thread until its status changes. blocked_canceling = 0b101, + + /// Windows-only: the thread is blocked in an alertable wait via + /// `NtDelayExecution`, and is being canceled. The thread which + /// triggered the cancelation will send signals to this thread + /// until its status changes. + blocked_alertable_canceling = 0b100, }, /// We cannot turn this value back into a pointer. Instead, it exists so that a task can be @@ -694,7 +705,8 @@ const Thread = struct { switch (status.cancelation) { .parked => unreachable, .blocked => unreachable, - .blocked_windows_dns => unreachable, + .blocked_alertable => unreachable, + .blocked_alertable_canceling => unreachable, .blocked_canceling => unreachable, .none, .canceled => {}, .canceling => { @@ -1007,19 +1019,14 @@ const Thread = struct { .monotonic, ) orelse return true, - .blocked_windows_dns => thread.status.cmpxchgWeak( - .{ .cancelation = .blocked_windows_dns, .awaitable = awaitable }, - .{ .cancelation = .canceling, .awaitable = awaitable }, + .blocked_alertable => thread.status.cmpxchgWeak( + .{ .cancelation = .blocked_alertable, .awaitable = awaitable }, + .{ .cancelation = .blocked_alertable_canceling, .awaitable = awaitable }, .monotonic, .monotonic, ) orelse { - if (builtin.target.os.tag != .windows) unreachable; - if (true) { - // TODO: cancel Windows DNS queries. This code path is currently impossible - // as `netLookupFallible` doesn't actually use `.blocked_windows_dns` yet. - unreachable; - } - return false; + if (!is_windows) unreachable; + return true; }, .canceling, .canceled => { @@ -1029,7 +1036,8 @@ const Thread = struct { return false; }, - .blocked_canceling => unreachable, + .blocked_canceling => unreachable, // `awaitable` has not been canceled before now + .blocked_alertable_canceling => unreachable, // `awaitable` has not been canceled before now }; } } @@ -1044,40 +1052,59 @@ const Thread = struct { /// `Group.waitForCancelWithSignaling`: they use exponential backoff starting at a 1us delay and /// doubling each call. In practice, it is rare to send more than one signal. fn signalCanceledSyscall(thread: *Thread, t: *Threaded, awaitable: AwaitableId) bool { - const bad_status: Status = .{ .cancelation = .blocked_canceling, .awaitable = awaitable }; - if (thread.status.load(.monotonic) != bad_status) return false; + const status = thread.status.load(.monotonic); + if (status.awaitable != awaitable) { + // The thread has moved on and is working on something totally different. + return false; + } // The thread ID and/or handle can be read non-atomically because they never change and were // released by the store that made `thread` available to us. - if (std.Thread.use_pthreads) { - return switch (std.c.pthread_kill(thread.handle, .IO)) { - 0 => true, - else => false, - }; - } else switch (builtin.target.os.tag) { - .linux => { - const pid: posix.pid_t = pid: { - const cached_pid = @atomicLoad(Pid, &t.pid, .monotonic); - if (cached_pid != .unknown) break :pid @intFromEnum(cached_pid); - const pid = std.os.linux.getpid(); - @atomicStore(Pid, &t.pid, @enumFromInt(pid), .monotonic); - break :pid pid; - }; - return switch (std.os.linux.tgkill(pid, @bitCast(thread.id), .IO)) { + switch (status.cancelation) { + .blocked_canceling => if (std.Thread.use_pthreads) { + return switch (std.c.pthread_kill(thread.handle, .IO)) { 0 => true, else => false, }; + } else switch (native_os) { + .linux => { + const pid: posix.pid_t = pid: { + const cached_pid = @atomicLoad(Pid, &t.pid, .monotonic); + if (cached_pid != .unknown) break :pid @intFromEnum(cached_pid); + const pid = std.os.linux.getpid(); + @atomicStore(Pid, &t.pid, @enumFromInt(pid), .monotonic); + break :pid pid; + }; + return switch (std.os.linux.tgkill(pid, @bitCast(thread.id), .IO)) { + 0 => true, + else => false, + }; + }, + .windows => { + var iosb: windows.IO_STATUS_BLOCK = undefined; + return switch (windows.ntdll.NtCancelSynchronousIoFile(thread.handle, null, &iosb)) { + .NOT_FOUND => true, // this might mean the operation hasn't started yet + .SUCCESS => false, // the OS confirmed that our cancelation worked + else => false, + }; + }, + else => return false, }, - .windows => { - var iosb: windows.IO_STATUS_BLOCK = undefined; - return switch (windows.ntdll.NtCancelSynchronousIoFile(thread.handle, null, &iosb)) { - .NOT_FOUND => true, // this might mean the operation hasn't started yet - .SUCCESS => false, // the OS confirmed that our cancelation worked + + .blocked_alertable_canceling => { + if (!is_windows) unreachable; + return switch (windows.ntdll.NtAlertThread(thread.handle)) { + .SUCCESS => true, else => false, }; }, - else => return false, + + else => { + // The thread is working on `awaitable`, but no longer needs signaling (they already + // woke up and saw the cancelation). + return false; + }, } } @@ -1118,7 +1145,8 @@ const Syscall = struct { }, .monotonic).cancelation) { .parked => unreachable, .blocked => unreachable, - .blocked_windows_dns => unreachable, + .blocked_alertable => unreachable, + .blocked_alertable_canceling => unreachable, .blocked_canceling => unreachable, .none => return .{ .thread = thread }, // new status is `.blocked` .canceling => return error.Canceled, // new status is `.canceled` @@ -1137,7 +1165,8 @@ const Syscall = struct { }, .monotonic).cancelation) { .none => unreachable, .parked => unreachable, - .blocked_windows_dns => unreachable, + .blocked_alertable => unreachable, + .blocked_alertable_canceling => unreachable, .canceling => unreachable, .canceled => unreachable, .blocked => {}, // new status is `.blocked` (unchanged) @@ -1153,13 +1182,41 @@ const Syscall = struct { }, .monotonic).cancelation) { .none => unreachable, .parked => unreachable, - .blocked_windows_dns => unreachable, + .blocked_alertable => unreachable, + .blocked_alertable_canceling => unreachable, .canceling => unreachable, .canceled => unreachable, .blocked => {}, // new status is `.none` .blocked_canceling => {}, // new status is `.canceling` } } + /// Indicates instead of `NtCancelSynchronousIoFile` we need to use + /// `NtAlertThread` to interrupt the wait. + /// + /// Windows only, called from blocked state only. + fn toAlertable(s: Syscall) Io.Cancelable!AlertableSyscall { + comptime assert(is_windows); + const thread = s.thread orelse return .{ .thread = null }; + var prev = thread.status.load(.monotonic); + while (true) prev = switch (prev.cancelation) { + .none => unreachable, + .parked => unreachable, + .blocked_alertable => unreachable, + .blocked_alertable_canceling => unreachable, + .canceling => unreachable, + .canceled => unreachable, + + .blocked => thread.status.cmpxchgWeak(prev, .{ + .cancelation = .blocked_alertable, + .awaitable = prev.awaitable, + }, .monotonic, .monotonic) orelse return .{ .thread = thread }, + + .blocked_canceling => thread.status.cmpxchgWeak(prev, .{ + .cancelation = .canceled, + .awaitable = prev.awaitable, + }, .monotonic, .monotonic) orelse return error.Canceled, + }; + } /// Convenience wrapper which calls `finish`, then returns `err`. fn fail(s: Syscall, err: anytype) @TypeOf(err) { s.finish(); @@ -1191,6 +1248,72 @@ const Syscall = struct { } }; +const AlertableSyscall = struct { + thread: ?*Thread, + + comptime { + assert(is_windows); + } + + fn checkCancel(s: AlertableSyscall) Io.Cancelable!void { + comptime assert(is_windows); + const thread = s.thread orelse return; + const old_status = thread.status.fetchOr(.{ + .cancelation = @enumFromInt(0b010), + .awaitable = .null, + }, .monotonic); + switch (old_status.cancelation) { + .none => unreachable, + .parked => unreachable, + .blocked => unreachable, + .blocked_canceling => unreachable, + .canceling => unreachable, + .canceled => unreachable, + .blocked_alertable => {}, // new status is `.blocked_alertable` (unchanged) + .blocked_alertable_canceling => { + // New status is `.canceling`---change to `.canceled` before return. + thread.status.store(.{ .cancelation = .canceled, .awaitable = old_status.awaitable }, .monotonic); + return error.Canceled; + }, + } + } + + fn finish(s: AlertableSyscall) void { + comptime assert(is_windows); + const thread = s.thread orelse return; + switch (thread.status.fetchXor(.{ + .cancelation = @enumFromInt(0b010), + .awaitable = .null, + }, .monotonic).cancelation) { + .none => unreachable, + .parked => unreachable, + .blocked => unreachable, + .blocked_canceling => unreachable, + .canceling => unreachable, + .canceled => unreachable, + .blocked_alertable => {}, // new status is `.none` + .blocked_alertable_canceling => {}, // new status is `.canceling` + } + } + + fn fail(s: AlertableSyscall, err: anytype) @TypeOf(err) { + s.finish(); + return err; + } + + fn ntstatusBug(s: AlertableSyscall, status: windows.NTSTATUS) Io.UnexpectedError { + @branchHint(.cold); + s.finish(); + return windows.statusBug(status); + } + + fn unexpectedNtstatus(s: AlertableSyscall, status: windows.NTSTATUS) Io.UnexpectedError { + @branchHint(.cold); + s.finish(); + return windows.unexpectedStatus(status); + } +}; + const max_iovecs_len = 8; const splat_buffer_size = 64; const default_PATH = "/usr/local/bin:/bin/:/usr/bin"; @@ -1977,7 +2100,8 @@ fn groupAsyncEager( .canceled => true, .parked => unreachable, .blocked => unreachable, - .blocked_windows_dns => unreachable, + .blocked_alertable => unreachable, + .blocked_alertable_canceling => unreachable, .blocked_canceling => unreachable, }; } else false; @@ -1988,7 +2112,8 @@ fn groupAsyncEager( .canceled => true, .parked => unreachable, .blocked => unreachable, - .blocked_windows_dns => unreachable, + .blocked_alertable => unreachable, + .blocked_alertable_canceling => unreachable, .blocked_canceling => unreachable, }; } else false; @@ -2167,7 +2292,8 @@ fn recancelInner() void { .canceling => unreachable, // called `recancel` but cancelation was already pending .parked => unreachable, .blocked => unreachable, - .blocked_windows_dns => unreachable, + .blocked_alertable => unreachable, + .blocked_alertable_canceling => unreachable, .blocked_canceling => unreachable, } } @@ -12096,8 +12222,7 @@ fn netLookupFallible( var res: *ws2_32.ADDRINFOEXW = undefined; const timeout: ?*ws2_32.timeval = null; while (true) { - // TODO: hook this up to cancelation with `Thread.Status.cancelation.blocked_windows_dns`. - // See matching TODO in `Thread.cancelAwaitable`. + // TODO: hook this up to cancelation with `NtDelayExecution` and APC callbacks. try Thread.checkCancel(); // TODO make this append to the queue eagerly rather than blocking until the whole thing finishes const rc: ws2_32.WinsockError = @enumFromInt(ws2_32.GetAddrInfoExW(name_w, port_w, .DNS, null, &hints, &res, timeout, null, null, null)); @@ -15780,7 +15905,8 @@ const parking_futex = struct { .canceled => break :cancelable, // status is still `.canceled` .parked => unreachable, .blocked => unreachable, - .blocked_windows_dns => unreachable, + .blocked_alertable => unreachable, + .blocked_alertable_canceling => unreachable, .blocked_canceling => unreachable, } // We could now be unparked for a cancelation at any time! @@ -15831,7 +15957,8 @@ const parking_futex = struct { }, .canceled => unreachable, .blocked => unreachable, - .blocked_windows_dns => unreachable, + .blocked_alertable => unreachable, + .blocked_alertable_canceling => unreachable, .blocked_canceling => unreachable, }, } @@ -15872,7 +15999,8 @@ const parking_futex = struct { .canceling => continue, // race with a canceler who hasn't called `removeCanceledWaiter` yet .canceled => unreachable, .blocked => unreachable, - .blocked_windows_dns => unreachable, + .blocked_alertable => unreachable, + .blocked_alertable_canceling => unreachable, .blocked_canceling => unreachable, } // We're waking this waiter. Remove them from the bucket and add them to our local list. @@ -15938,7 +16066,8 @@ const parking_sleep = struct { .canceled => break :cancelable, // status is still `.canceled` .parked => unreachable, .blocked => unreachable, - .blocked_windows_dns => unreachable, + .blocked_alertable => unreachable, + .blocked_alertable_canceling => unreachable, .blocked_canceling => unreachable, } while (park(deadline, null)) { @@ -15956,7 +16085,8 @@ const parking_sleep = struct { .none => unreachable, .canceled => unreachable, .blocked => unreachable, - .blocked_windows_dns => unreachable, + .blocked_alertable => unreachable, + .blocked_alertable_canceling => unreachable, .blocked_canceling => unreachable, } } else |err| switch (err) { @@ -15975,7 +16105,8 @@ const parking_sleep = struct { .none => unreachable, .canceled => unreachable, .blocked => unreachable, - .blocked_windows_dns => unreachable, + .blocked_alertable => unreachable, + .blocked_alertable_canceling => unreachable, .blocked_canceling => unreachable, }, } diff --git a/lib/std/os/windows/ntdll.zig b/lib/std/os/windows/ntdll.zig @@ -596,8 +596,11 @@ pub extern "ntdll" fn NtDelayExecution( pub extern "ntdll" fn NtCancelIoFileEx( FileHandle: HANDLE, - /// Documentation has this as IO_STATUS_BLOCK but it's actually the APC - /// context parameter. - IoRequestToCancel: ?*anyopaque, + IoRequestToCancel: *const IO_STATUS_BLOCK, IoStatusBlock: *IO_STATUS_BLOCK, ) callconv(.winapi) NTSTATUS; + +pub extern "ntdll" fn NtCancelIoFile( + handle: HANDLE, + iosbToCancel: *const IO_STATUS_BLOCK, +) callconv(.winapi) NTSTATUS;