zig

fork of https://codeberg.org/ziglang/zig
Log | Files | Refs | README | LICENSE

commit e57c557ad41188f85984cb8e6b79c8432cb95ec2 (tree)
parent a1d4120fd9a3b34bde79d34dd14725d751021fc6
Author: Matthew Lugg <mlugg@mlugg.co.uk>
Date:   Tue, 30 Dec 2025 12:59:32 +0000

std.Io.Threaded: hugely improve Windows and NetBSD support

The most interesting thing here is the replacement of the pthread futex
implementation with an implementation based on thread park/unpark APIs.
Thread parking tends to be the primitive provided by systems which do
not have a futex primitive, such as NetBSD, so this implementation is
far more efficient than the pthread one. It is also useful on Windows,
where `RtlWaitOnAddress` is itself a userland implementation based on
thread park/unpark; we can implement it ourselves including support for
features which Windows' implementation lacks, such as cancelation and
waking a number of waiters with 1<n<infinity.

Compared to the pthread implementation, this thread-parking-based one
also supports full robust cancelation. Thread parking also turns out to
be useful for implementing `sleep`, so is now used for that on Windows
and NetBSD.

This commit also introduces proper cancelation support for most Windows
operations. The most notable omission right now is DNS lookups through
`GetAddrInfoEx`, just because they're a little more work due to having
a unique cancelation mechanism---but the machinery is all there, so I'll
finish gluing it together soon.

As of this commit, there are very few parts of `Io.Threaded` which do
not support full robust cancelation. The only ones which actually really
matter (because they could block for a prolonged period of time) are DNS
lookups on Windows (as discussed above) and futex waits on WASM.

Diffstat:
Mlib/std/Io/Threaded.zig | 2821+++++++++++++++++++++++++++++++++++++++++++++----------------------------------
Mlib/std/c.zig | 3+++
Mlib/std/c/netbsd.zig | 20+++++++++++++++++++-
Mlib/std/debug/SelfInfo/Windows.zig | 3+--
Mlib/std/os/windows.zig | 72++++++------------------------------------------------------------------
Mlib/std/os/windows/ntdll.zig | 27+++++++++++++++++++++++++++
Mlib/std/posix.zig | 1+
Mlib/std/process/Child.zig | 4++--
8 files changed, 1681 insertions(+), 1270 deletions(-)

diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig @@ -105,12 +105,7 @@ pub const Environ = struct { }; }; -pub const RobustCancel = if (std.Thread.use_pthreads or native_os == .linux) enum { - enabled, - disabled, -} else enum { - disabled, -}; +pub const RobustCancel = enum { enabled, disabled }; pub const Pid = if (native_os == .linux) enum(posix.pid_t) { unknown = 0, @@ -514,13 +509,21 @@ const AwaitableId = enum(@Int(.unsigned, @bitSizeOf(usize) - 3)) { const Thread = struct { next: ?*Thread, - /// The value that needs to be passed to pthread_kill or tgkill in order to - /// send a signal. - signalee_id: SignaleeId, + + id: std.Thread.Id, + handle: Handle, status: std.atomic.Value(Status), cancel_protection: Io.CancelProtection, + /// Always released when `Status.cancelation` is set to `.parked`. + futex_waiter: if (use_parking_futex) ?*parking_futex.Waiter else ?noreturn, + + const Handle = Handle: { + if (std.Thread.use_pthreads) break :Handle std.c.pthread_t; + if (builtin.target.os.tag == .windows) break :Handle windows.HANDLE; + break :Handle void; + }; const Status = packed struct(usize) { /// The specific values of these enum fields are chosen to simplify the implementation of @@ -531,7 +534,7 @@ const Thread = struct { none = 0b000, /// The thread is parked in a cancelable futex wait or sleep. - /// Only applicable on Windows, NetBSD, and Illumos. + /// Only applicable if `use_parking_futex` or `use_parking_sleep`. /// To request cancelation, set the status to `.canceling` and unpark the thread. /// To unpark for another reason (futex wake), set the status to `.none` and unpark the thread. parked = 0b001, @@ -540,8 +543,8 @@ const Thread = struct { /// To request cancelation, set the status to `.blocked_canceling` and repeatedly interrupt the system call until the status changes. blocked = 0b011, - /// Windows-only: the thread is blocked on a DNS query. - /// To request cancelation, set the status to `.canceling` and call `DnsCancelQuery`. + /// Windows-only: the thread is blocked in a call to `GetAddrInfoExW`. + /// To request cancelation, set the status to `.canceling` and call `GetAddrInfoExCancel`. blocked_windows_dns = 0b010, /// The thread has an outstanding cancelation request but is not in a cancelable operation. @@ -597,10 +600,6 @@ const Thread = struct { } } - fn currentSignaleeId() SignaleeId { - return if (std.Thread.use_pthreads) std.c.pthread_self() else std.Thread.getCurrentId(); - } - fn futexWaitUncancelable(ptr: *const u32, expect: u32, timeout_ns: ?u64) void { return Thread.futexWaitInner(ptr, expect, true, timeout_ns) catch unreachable; } @@ -614,8 +613,19 @@ const Thread = struct { if (builtin.single_threaded) unreachable; // nobody would ever wake us - if (builtin.cpu.arch.isWasm()) { + if (use_parking_futex) { + return parking_futex.wait( + ptr, + expect, + uncancelable, + if (timeout_ns) |ns| .{ .duration = .{ + .raw = .fromNanoseconds(ns), + .clock = .boot, + } } else .none, + ); + } else if (builtin.cpu.arch.isWasm()) { comptime assert(builtin.cpu.has(.wasm, .atomics)); + // TODO implement cancelation for WASM futex waits by signaling the futex if (!uncancelable) try Thread.checkCancel(); const to: i64 = if (timeout_ns) |ns| ns else -1; const signed_expect: i32 = @bitCast(expect); @@ -689,24 +699,6 @@ const Thread = struct { else => recoverableOsBugDetected(), } }, - .windows => { - var timeout_value: windows.LARGE_INTEGER = undefined; - var timeout_ptr: ?*const windows.LARGE_INTEGER = null; - // NTDLL functions work with time in units of 100 nanoseconds. - // Positive values are absolute deadlines while negative values are relative durations. - if (timeout_ns) |delay| { - timeout_value = @as(windows.LARGE_INTEGER, @intCast(delay / 100)); - timeout_value = -timeout_value; - timeout_ptr = &timeout_value; - } - if (!uncancelable) try Thread.checkCancel(); - switch (windows.ntdll.RtlWaitOnAddress(ptr, &expect, @sizeOf(@TypeOf(expect)), timeout_ptr)) { - .SUCCESS => {}, - .CANCELLED => {}, - .TIMEOUT => {}, // timeout - else => recoverableOsBugDetected(), - } - }, .freebsd => { const flags = @intFromEnum(std.c.UMTX_OP.WAIT_UINT_PRIVATE); var tm_size: usize = 0; @@ -738,7 +730,7 @@ const Thread = struct { tm_ptr = &tm; tm = timestampToPosix(ns); } - if (thread) |t| try t.beginSyscall(); + const syscall: Syscall = if (uncancelable) .{ .thread = null } else try .start(); const rc = std.c.futex( ptr, std.c.FUTEX.WAIT | std.c.FUTEX.PRIVATE_FLAG, @@ -746,7 +738,7 @@ const Thread = struct { tm_ptr, null, // uaddr2 is ignored ); - if (thread) |t| t.endSyscall(); + syscall.finish(); if (is_debug) switch (posix.errno(rc)) { .SUCCESS => {}, .NOSYS => unreachable, // constant op known good value @@ -765,9 +757,9 @@ const Thread = struct { } else { timeout_us = 0; } - if (thread) |t| try t.beginSyscall(); + const syscall: Syscall = if (uncancelable) .{ .thread = null } else try .start(); const rc = std.c.umtx_sleep(@ptrCast(ptr), @bitCast(expect), timeout_us); - if (thread) |t| t.endSyscall(); + syscall.finish(); if (is_debug) switch (std.posix.errno(rc)) { .SUCCESS => {}, .BUSY => {}, // ptr != expect @@ -777,14 +769,7 @@ const Thread = struct { else => unreachable, }; }, - else => if (std.Thread.use_pthreads) { - // TODO integrate the following function being called with robust cancelation. - return pthreads_futex.wait(ptr, expect, timeout_ns) catch |err| switch (err) { - error.Timeout => {}, - }; - } else { - @compileError("unimplemented: futexWait"); - }, + else => @compileError("unimplemented: futexWait"), } } @@ -794,7 +779,9 @@ const Thread = struct { if (builtin.single_threaded) return; // nothing to wake up - if (builtin.cpu.arch.isWasm()) { + if (use_parking_futex) { + return parking_futex.wake(ptr, max_waiters); + } else if (builtin.cpu.arch.isWasm()) { comptime assert(builtin.cpu.has(.wasm, .atomics)); const woken_count = asm volatile ( \\local.get %[ptr] @@ -839,12 +826,6 @@ const Thread = struct { } } }, - .windows => { - switch (max_waiters) { - 1 => windows.ntdll.RtlWakeAddressSingle(ptr), - else => windows.ntdll.RtlWakeAddressAll(ptr), - } - }, .freebsd => { const rc = std.c._umtx_op( @intFromPtr(ptr), @@ -877,11 +858,7 @@ const Thread = struct { @min(max_waiters, std.math.maxInt(c_int)), ); }, - else => if (std.Thread.use_pthreads) { - return pthreads_futex.wake(ptr, max_waiters); - } else { - @compileError("unimplemented: futexWake"); - }, + else => @compileError("unimplemented: futexWake"), } } @@ -905,10 +882,14 @@ const Thread = struct { .parked => thread.status.cmpxchgWeak( .{ .cancelation = .parked, .awaitable = awaitable }, .{ .cancelation = .canceling, .awaitable = awaitable }, - .monotonic, + .acquire, // acquire `thread.futex_waiter` .monotonic, ) orelse { - if (true) @panic("MLUGG TODO: unpark thread"); + if (!use_parking_futex and !use_parking_sleep) unreachable; + if (thread.futex_waiter) |futex_waiter| { + parking_futex.removeCanceledWaiter(futex_waiter); + } + unpark(&.{thread.id}, null); return false; }, @@ -924,7 +905,15 @@ const Thread = struct { .{ .cancelation = .canceling, .awaitable = awaitable }, .monotonic, .monotonic, - ) orelse return false, + ) orelse { + if (builtin.target.os.tag != .windows) unreachable; + if (true) { + // TODO: cancel Windows DNS queries. This code path is currently impossible + // as `netLookupFallible` doesn't actually use `.blocked_windows_dns` yet. + unreachable; + } + return false; + }, .canceling, .canceled => { // This can happen when the task start raced with the cancelation, so the thread @@ -951,26 +940,38 @@ const Thread = struct { const bad_status: Status = .{ .cancelation = .blocked_canceling, .awaitable = awaitable }; if (thread.status.load(.monotonic) != bad_status) return false; - // The thread ID can be read non-atomically because it never changes and was released by the - // store that made `thread` available to us. - const signalee_id = thread.signalee_id; + // The thread ID and/or handle can be read non-atomically because they never change and were + // released by the store that made `thread` available to us. if (std.Thread.use_pthreads) { - if (std.c.pthread_kill(signalee_id, .IO) != 0) return false; - } else if (native_os == .linux) { - const pid: posix.pid_t = pid: { - const cached_pid = @atomicLoad(Pid, &t.pid, .monotonic); - if (cached_pid != .unknown) break :pid @intFromEnum(cached_pid); - const pid = std.os.linux.getpid(); - @atomicStore(Pid, &t.pid, @enumFromInt(pid), .monotonic); - break :pid pid; + return switch (std.c.pthread_kill(thread.handle, .IO)) { + 0 => true, + else => false, }; - if (std.os.linux.tgkill(pid, @bitCast(signalee_id), .IO) != 0) return false; - } else { - @compileError("MLUGG TODO"); + } else switch (builtin.target.os.tag) { + .linux => { + const pid: posix.pid_t = pid: { + const cached_pid = @atomicLoad(Pid, &t.pid, .monotonic); + if (cached_pid != .unknown) break :pid @intFromEnum(cached_pid); + const pid = std.os.linux.getpid(); + @atomicStore(Pid, &t.pid, @enumFromInt(pid), .monotonic); + break :pid pid; + }; + return switch (std.os.linux.tgkill(pid, @bitCast(thread.id), .IO)) { + 0 => true, + else => false, + }; + }, + .windows => { + var iosb: windows.IO_STATUS_BLOCK = undefined; + return switch (windows.ntdll.NtCancelSynchronousIoFile(thread.handle, null, &iosb)) { + .NOT_FOUND => true, // this might mean the operation hasn't started yet + .SUCCESS => false, // the OS confirmed that our cancelation worked + else => false, + }; + }, + else => return false, } - - return true; } /// Like a `*Thread`, but 2 bits smaller than a pointer (because the LSBs are always 0 due to @@ -1069,6 +1070,18 @@ const Syscall = struct { s.finish(); return posix.unexpectedErrno(err); } + /// Convenience wrapper which calls `finish`, then calls `windows.statusBug`. + fn ntstatusBug(s: Syscall, status: windows.NTSTATUS) Io.UnexpectedError { + @branchHint(.cold); + s.finish(); + return windows.statusBug(status); + } + /// Convenience wrapper which calls `finish`, then calls `windows.unexpectedStatus`. + fn unexpectedNtstatus(s: Syscall, status: windows.NTSTATUS) Io.UnexpectedError { + @branchHint(.cold); + s.finish(); + return windows.unexpectedStatus(status); + } }; const max_iovecs_len = 8; @@ -1233,15 +1246,45 @@ fn join(t: *Threaded) void { fn worker(t: *Threaded) void { var thread: Thread = .{ .next = undefined, - .signalee_id = Thread.currentSignaleeId(), + .id = std.Thread.getCurrentId(), + .handle = handle: { + if (std.Thread.use_pthreads) break :handle std.c.pthread_self(); + if (builtin.target.os.tag == .windows) break :handle undefined; // populated below + }, .status = .init(.{ .cancelation = .none, .awaitable = .null, }), .cancel_protection = .unblocked, + .futex_waiter = undefined, }; Thread.current = &thread; + if (builtin.target.os.tag == .windows) { + assert(windows.ntdll.NtOpenThread( + &thread.handle, + .{ + .SPECIFIC = .{ + .THREAD = .{ + .TERMINATE = true, // for `NtCancelSynchronousIoFile` + }, + }, + }, + &.{ + .Length = @sizeOf(windows.OBJECT_ATTRIBUTES), + .RootDirectory = null, + .ObjectName = null, + .Attributes = .{}, + .SecurityDescriptor = null, + .SecurityQualityOfService = null, + }, + &windows.teb().ClientId, + ) == .SUCCESS); + } + defer if (builtin.target.os.tag == .windows) { + windows.CloseHandle(thread.handle); + }; + { var head = t.worker_threads.load(.monotonic); while (true) { @@ -2127,26 +2170,34 @@ fn dirCreateDirWasi(userdata: ?*anyopaque, dir: Dir, sub_path: []const u8, permi fn dirCreateDirWindows(userdata: ?*anyopaque, dir: Dir, sub_path: []const u8, permissions: Dir.Permissions) Dir.CreateDirError!void { const t: *Threaded = @ptrCast(@alignCast(userdata)); _ = t; - try Thread.checkCancel(); const sub_path_w = try windows.sliceToPrefixedFileW(dir.handle, sub_path); _ = permissions; // TODO use this value - const sub_dir_handle = windows.OpenFile(sub_path_w.span(), .{ - .dir = dir.handle, - .access_mask = .{ - .GENERIC = .{ .READ = true }, - .STANDARD = .{ .SYNCHRONIZE = true }, - }, - .creation = .CREATE, - .filter = .dir_only, - }) catch |err| switch (err) { - error.IsDir => return error.Unexpected, - error.PipeBusy => return error.Unexpected, - error.NoDevice => return error.Unexpected, - error.WouldBlock => return error.Unexpected, - error.AntivirusInterference => return error.Unexpected, - else => |e| return e, + + const syscall: Syscall = try .start(); + const sub_dir_handle = while (true) { + break windows.OpenFile(sub_path_w.span(), .{ + .dir = dir.handle, + .access_mask = .{ + .GENERIC = .{ .READ = true }, + .STANDARD = .{ .SYNCHRONIZE = true }, + }, + .creation = .CREATE, + .filter = .dir_only, + }) catch |err| switch (err) { + error.IsDir => return syscall.fail(error.Unexpected), + error.PipeBusy => return syscall.fail(error.Unexpected), + error.NoDevice => return syscall.fail(error.Unexpected), + error.WouldBlock => return syscall.fail(error.Unexpected), + error.AntivirusInterference => return syscall.fail(error.Unexpected), + error.OperationCanceled => { + try syscall.checkCancel(); + continue; + }, + else => |e| return syscall.fail(e), + }; }; + syscall.finish(); windows.CloseHandle(sub_dir_handle); } @@ -2225,9 +2276,7 @@ fn dirCreateDirPathOpenWindows( .path = sub_path, }; - while (true) { - try Thread.checkCancel(); - + components: while (true) { const sub_path_w_array = try w.sliceToPrefixedFileW(dir.handle, component.path); const sub_path_w = sub_path_w_array.span(); const is_last = it.peekNext() == null; @@ -2242,7 +2291,9 @@ fn dirCreateDirPathOpenWindows( .Buffer = @constCast(sub_path_w.ptr), }; var io_status_block: w.IO_STATUS_BLOCK = undefined; - const rc = w.ntdll.NtCreateFile( + + const syscall: Syscall = try .start(); + while (true) switch (w.ntdll.NtCreateFile( &result.handle, .{ .SPECIFIC = .{ .FILE_DIRECTORY = .{ @@ -2277,16 +2328,20 @@ fn dirCreateDirPathOpenWindows( }, null, 0, - ); - - switch (rc) { + )) { .SUCCESS => { + syscall.finish(); component = it.next() orelse return result; w.CloseHandle(result.handle); + continue :components; + }, + .CANCELLED => { + try syscall.checkCancel(); continue; }, - .OBJECT_NAME_INVALID => return error.BadPathName, + .OBJECT_NAME_INVALID => return syscall.fail(error.BadPathName), .OBJECT_NAME_COLLISION => { + syscall.finish(); assert(!is_last); // stat the file and return an error if it's not a directory // this is important because otherwise a dangling symlink @@ -2297,23 +2352,24 @@ fn dirCreateDirPathOpenWindows( if (fstat.kind != .directory) return error.NotDir; component = it.next().?; - continue; + continue :components; }, .OBJECT_NAME_NOT_FOUND, .OBJECT_PATH_NOT_FOUND, => { + syscall.finish(); component = it.previous() orelse return error.FileNotFound; - continue; + continue :components; }, - .NOT_A_DIRECTORY => return error.NotDir, + .NOT_A_DIRECTORY => return syscall.fail(error.NotDir), // This can happen if the directory has 'List folder contents' permission set to 'Deny' // and the directory is trying to be opened for iteration. - .ACCESS_DENIED => return error.AccessDenied, - .INVALID_PARAMETER => |err| return w.statusBug(err), - else => return w.unexpectedStatus(rc), - } + .ACCESS_DENIED => return syscall.fail(error.AccessDenied), + .INVALID_PARAMETER => |s| return syscall.ntstatusBug(s), + else => |s| return syscall.unexpectedNtstatus(s), + }; } } @@ -2637,20 +2693,31 @@ fn fileStatLinux(userdata: ?*anyopaque, file: File) File.StatError!File.Stat { fn fileStatWindows(userdata: ?*anyopaque, file: File) File.StatError!File.Stat { const t: *Threaded = @ptrCast(@alignCast(userdata)); _ = t; - try Thread.checkCancel(); var io_status_block: windows.IO_STATUS_BLOCK = undefined; var info: windows.FILE.ALL_INFORMATION = undefined; - const rc = windows.ntdll.NtQueryInformationFile(file.handle, &io_status_block, &info, @sizeOf(windows.FILE.ALL_INFORMATION), .All); - switch (rc) { - .SUCCESS => {}, - // Buffer overflow here indicates that there is more information available than was able to be stored in the buffer - // size provided. This is treated as success because the type of variable-length information that this would be relevant for - // (name, volume name, etc) we don't care about. - .BUFFER_OVERFLOW => {}, - .INVALID_PARAMETER => |err| return windows.statusBug(err), - .ACCESS_DENIED => return error.AccessDenied, - else => return windows.unexpectedStatus(rc), + { + const syscall: Syscall = try .start(); + while (true) switch (windows.ntdll.NtQueryInformationFile( + file.handle, + &io_status_block, + &info, + @sizeOf(windows.FILE.ALL_INFORMATION), + .All, + )) { + .SUCCESS => break syscall.finish(), + // Buffer overflow here indicates that there is more information available than was able to be stored in the buffer + // size provided. This is treated as success because the type of variable-length information that this would be relevant for + // (name, volume name, etc) we don't care about. + .BUFFER_OVERFLOW => break syscall.finish(), + .INVALID_PARAMETER => |err| return syscall.ntstatusBug(err), + .ACCESS_DENIED => return syscall.fail(error.AccessDenied), + .CANCELLED => { + try syscall.checkCancel(); + continue; + }, + else => |s| return syscall.unexpectedNtstatus(s), + }; } return .{ .inode = info.InternalInformation.IndexNumber, @@ -2658,15 +2725,25 @@ fn fileStatWindows(userdata: ?*anyopaque, file: File) File.StatError!File.Stat { .permissions = .default_file, .kind = if (info.BasicInformation.FileAttributes.REPARSE_POINT) reparse_point: { var tag_info: windows.FILE.ATTRIBUTE_TAG_INFO = undefined; - const tag_rc = windows.ntdll.NtQueryInformationFile(file.handle, &io_status_block, &tag_info, @sizeOf(windows.FILE.ATTRIBUTE_TAG_INFO), .AttributeTag); - switch (tag_rc) { - .SUCCESS => {}, + const syscall: Syscall = try .start(); + while (true) switch (windows.ntdll.NtQueryInformationFile( + file.handle, + &io_status_block, + &tag_info, + @sizeOf(windows.FILE.ATTRIBUTE_TAG_INFO), + .AttributeTag, + )) { + .SUCCESS => break syscall.finish(), // INFO_LENGTH_MISMATCH and ACCESS_DENIED are the only documented possible errors // https://learn.microsoft.com/en-us/openspecs/windows_protocols/ms-fscc/d295752f-ce89-4b98-8553-266d37c84f0e - .INFO_LENGTH_MISMATCH => |err| return windows.statusBug(err), - .ACCESS_DENIED => return error.AccessDenied, - else => return windows.unexpectedStatus(rc), - } + .INFO_LENGTH_MISMATCH => |err| return syscall.ntstatusBug(err), + .ACCESS_DENIED => return syscall.fail(error.AccessDenied), + .CANCELLED => { + try syscall.checkCancel(); + continue; + }, + else => |s| return syscall.unexpectedNtstatus(s), + }; if (tag_info.ReparseTag.IsSurrogate) break :reparse_point .sym_link; // Unknown reparse point break :reparse_point .unknown; @@ -2853,7 +2930,6 @@ fn dirAccessWindows( ) Dir.AccessError!void { const t: *Threaded = @ptrCast(@alignCast(userdata)); _ = t; - try Thread.checkCancel(); _ = options; // TODO @@ -2879,16 +2955,21 @@ fn dirAccessWindows( .SecurityQualityOfService = null, }; var basic_info: windows.FILE.BASIC_INFORMATION = undefined; - switch (windows.ntdll.NtQueryAttributesFile(&attr, &basic_info)) { - .SUCCESS => return, - .OBJECT_NAME_NOT_FOUND => return error.FileNotFound, - .OBJECT_PATH_NOT_FOUND => return error.FileNotFound, - .OBJECT_NAME_INVALID => |err| return windows.statusBug(err), - .INVALID_PARAMETER => |err| return windows.statusBug(err), - .ACCESS_DENIED => return error.AccessDenied, - .OBJECT_PATH_SYNTAX_BAD => |err| return windows.statusBug(err), - else => |rc| return windows.unexpectedStatus(rc), - } + const syscall: Syscall = try .start(); + while (true) switch (windows.ntdll.NtQueryAttributesFile(&attr, &basic_info)) { + .SUCCESS => return syscall.finish(), + .CANCELLED => { + try syscall.checkCancel(); + continue; + }, + .OBJECT_NAME_NOT_FOUND => return syscall.fail(error.FileNotFound), + .OBJECT_PATH_NOT_FOUND => return syscall.fail(error.FileNotFound), + .OBJECT_NAME_INVALID => |err| return syscall.ntstatusBug(err), + .INVALID_PARAMETER => |err| return syscall.ntstatusBug(err), + .ACCESS_DENIED => return syscall.fail(error.AccessDenied), + .OBJECT_PATH_SYNTAX_BAD => |err| return syscall.ntstatusBug(err), + else => |rc| return syscall.unexpectedNtstatus(rc), + }; } const dirCreateFile = switch (native_os) { @@ -3071,27 +3152,40 @@ fn dirCreateFileWindows( const w = windows; const t: *Threaded = @ptrCast(@alignCast(userdata)); _ = t; - try Thread.checkCancel(); const sub_path_w_array = try w.sliceToPrefixedFileW(dir.handle, sub_path); const sub_path_w = sub_path_w_array.span(); - const handle = try w.OpenFile(sub_path_w, .{ - .dir = dir.handle, - .access_mask = .{ - .STANDARD = .{ .SYNCHRONIZE = true }, - .GENERIC = .{ - .WRITE = true, - .READ = flags.read, - }, - }, - .creation = if (flags.exclusive) - .CREATE - else if (flags.truncate) - .OVERWRITE_IF - else - .OPEN_IF, - }); + const handle = handle: { + const syscall: Syscall = try .start(); + while (true) { + if (w.OpenFile(sub_path_w, .{ + .dir = dir.handle, + .access_mask = .{ + .STANDARD = .{ .SYNCHRONIZE = true }, + .GENERIC = .{ + .WRITE = true, + .READ = flags.read, + }, + }, + .creation = if (flags.exclusive) + .CREATE + else if (flags.truncate) + .OVERWRITE_IF + else + .OPEN_IF, + })) |handle| { + syscall.finish(); + break :handle handle; + } else |err| switch (err) { + error.OperationCanceled => { + try syscall.checkCancel(); + continue; + }, + else => |e| return syscall.fail(e), + } + } + }; errdefer w.CloseHandle(handle); var io_status_block: w.IO_STATUS_BLOCK = undefined; @@ -3100,7 +3194,8 @@ fn dirCreateFileWindows( .shared => false, .exclusive => true, }; - const status = w.ntdll.NtLockFile( + const syscall: Syscall = try .start(); + while (true) switch (w.ntdll.NtLockFile( handle, null, null, @@ -3111,16 +3206,16 @@ fn dirCreateFileWindows( null, @intFromBool(flags.lock_nonblocking), @intFromBool(exclusive), - ); - switch (status) { - .SUCCESS => {}, - .INSUFFICIENT_RESOURCES => return error.SystemResources, - .LOCK_NOT_GRANTED => return error.WouldBlock, - .ACCESS_VIOLATION => |err| return windows.statusBug(err), // bad io_status_block pointer - else => return windows.unexpectedStatus(status), - } - - return .{ .handle = handle }; + )) { + .SUCCESS => { + syscall.finish(); + return .{ .handle = handle }; + }, + .INSUFFICIENT_RESOURCES => return syscall.fail(error.SystemResources), + .LOCK_NOT_GRANTED => return syscall.fail(error.WouldBlock), + .ACCESS_VIOLATION => |err| return syscall.ntstatusBug(err), // bad io_status_block pointer + else => |status| return syscall.unexpectedNtstatus(status), + }; } fn dirCreateFileWasi( @@ -3399,14 +3494,14 @@ fn dirOpenFileWindows( flags: File.OpenFlags, ) File.OpenError!File { const t: *Threaded = @ptrCast(@alignCast(userdata)); + _ = t; const sub_path_w_array = try windows.sliceToPrefixedFileW(dir.handle, sub_path); const sub_path_w = sub_path_w_array.span(); const dir_handle = if (std.fs.path.isAbsoluteWindowsWtf16(sub_path_w)) null else dir.handle; - return dirOpenFileWtf16(t, dir_handle, sub_path_w, flags); + return dirOpenFileWtf16(dir_handle, sub_path_w, flags); } pub fn dirOpenFileWtf16( - t: *Threaded, dir_handle: ?windows.HANDLE, sub_path_w: [:0]const u16, flags: File.OpenFlags, @@ -3415,7 +3510,6 @@ pub fn dirOpenFileWtf16( if (!allow_directory and std.mem.eql(u16, sub_path_w, &.{'.'})) return error.IsDir; if (!allow_directory and std.mem.eql(u16, sub_path_w, &.{ '.', '.' })) return error.IsDir; const path_len_bytes = std.math.cast(u16, sub_path_w.len * 2) orelse return error.NameTooLong; - _ = t; const w = windows; var nt_name: w.UNICODE_STRING = .{ @@ -3437,11 +3531,10 @@ pub fn dirOpenFileWtf16( const max_attempts = 13; var attempt: u5 = 0; + var syscall: Syscall = try .start(); const handle = while (true) { - try Thread.checkCancel(); - var result: w.HANDLE = undefined; - const rc = w.ntdll.NtCreateFile( + switch (w.ntdll.NtCreateFile( &result, .{ .STANDARD = .{ .SYNCHRONIZE = true }, @@ -3463,49 +3556,59 @@ pub fn dirOpenFileWtf16( }, null, 0, - ); - switch (rc) { - .SUCCESS => break result, - .OBJECT_NAME_INVALID => return error.BadPathName, - .OBJECT_NAME_NOT_FOUND => return error.FileNotFound, - .OBJECT_PATH_NOT_FOUND => return error.FileNotFound, - .BAD_NETWORK_PATH => return error.NetworkNotFound, // \\server was not found - .BAD_NETWORK_NAME => return error.NetworkNotFound, // \\server was found but \\server\share wasn't - .NO_MEDIA_IN_DEVICE => return error.NoDevice, - .INVALID_PARAMETER => |err| return w.statusBug(err), + )) { + .SUCCESS => { + syscall.finish(); + break result; + }, + .OBJECT_NAME_INVALID => return syscall.fail(error.BadPathName), + .OBJECT_NAME_NOT_FOUND => return syscall.fail(error.FileNotFound), + .OBJECT_PATH_NOT_FOUND => return syscall.fail(error.FileNotFound), + .BAD_NETWORK_PATH => return syscall.fail(error.NetworkNotFound), // \\server was not found + .BAD_NETWORK_NAME => return syscall.fail(error.NetworkNotFound), // \\server was found but \\server\share wasn't + .NO_MEDIA_IN_DEVICE => return syscall.fail(error.NoDevice), + .INVALID_PARAMETER => |err| return syscall.ntstatusBug(err), + .CANCELLED => { + try syscall.checkCancel(); + continue; + }, .SHARING_VIOLATION => { // This occurs if the file attempting to be opened is a running // executable. However, there's a kernel bug: the error may be // incorrectly returned for an indeterminate amount of time // after an executable file is closed. Here we work around the // kernel bug with retry attempts. + syscall.finish(); if (max_attempts - attempt == 0) return error.SharingViolation; _ = w.kernel32.SleepEx((@as(u32, 1) << attempt) >> 1, w.TRUE); attempt += 1; + syscall = try .start(); continue; }, - .ACCESS_DENIED => return error.AccessDenied, - .PIPE_BUSY => return error.PipeBusy, - .PIPE_NOT_AVAILABLE => return error.NoDevice, - .OBJECT_PATH_SYNTAX_BAD => |err| return w.statusBug(err), - .OBJECT_NAME_COLLISION => return error.PathAlreadyExists, - .FILE_IS_A_DIRECTORY => return error.IsDir, - .NOT_A_DIRECTORY => return error.NotDir, - .USER_MAPPED_FILE => return error.AccessDenied, - .INVALID_HANDLE => |err| return w.statusBug(err), + .ACCESS_DENIED => return syscall.fail(error.AccessDenied), + .PIPE_BUSY => return syscall.fail(error.PipeBusy), + .PIPE_NOT_AVAILABLE => return syscall.fail(error.NoDevice), + .OBJECT_PATH_SYNTAX_BAD => |err| return syscall.ntstatusBug(err), + .OBJECT_NAME_COLLISION => return syscall.fail(error.PathAlreadyExists), + .FILE_IS_A_DIRECTORY => return syscall.fail(error.IsDir), + .NOT_A_DIRECTORY => return syscall.fail(error.NotDir), + .USER_MAPPED_FILE => return syscall.fail(error.AccessDenied), + .INVALID_HANDLE => |err| return syscall.ntstatusBug(err), .DELETE_PENDING => { // This error means that there *was* a file in this location on // the file system, but it was deleted. However, the OS is not // finished with the deletion operation, and so this CreateFile // call has failed. Here, we simulate the kernel bug being // fixed by sleeping and retrying until the error goes away. + syscall.finish(); if (max_attempts - attempt == 0) return error.SharingViolation; _ = w.kernel32.SleepEx((@as(u32, 1) << attempt) >> 1, w.TRUE); attempt += 1; + syscall = try .start(); continue; }, - .VIRUS_INFECTED, .VIRUS_DELETED => return error.AntivirusInterference, - else => return w.unexpectedStatus(rc), + .VIRUS_INFECTED, .VIRUS_DELETED => return syscall.fail(error.AntivirusInterference), + else => |rc| return syscall.unexpectedNtstatus(rc), } }; errdefer w.CloseHandle(handle); @@ -3515,7 +3618,8 @@ pub fn dirOpenFileWtf16( .shared => false, .exclusive => true, }; - const status = w.ntdll.NtLockFile( + syscall = try .start(); + while (true) switch (w.ntdll.NtLockFile( handle, null, null, @@ -3526,14 +3630,13 @@ pub fn dirOpenFileWtf16( null, @intFromBool(flags.lock_nonblocking), @intFromBool(exclusive), - ); - switch (status) { - .SUCCESS => {}, - .INSUFFICIENT_RESOURCES => return error.SystemResources, - .LOCK_NOT_GRANTED => return error.WouldBlock, - .ACCESS_VIOLATION => |err| return windows.statusBug(err), // bad io_status_block pointer - else => return windows.unexpectedStatus(status), - } + )) { + .SUCCESS => break syscall.finish(), + .INSUFFICIENT_RESOURCES => return syscall.fail(error.SystemResources), + .LOCK_NOT_GRANTED => return syscall.fail(error.WouldBlock), + .ACCESS_VIOLATION => |err| return syscall.ntstatusBug(err), // bad io_status_block pointer + else => |status| return syscall.unexpectedNtstatus(status), + }; return .{ .handle = handle }; } @@ -3773,8 +3876,9 @@ pub fn dirOpenDirWindows( }; var io_status_block: w.IO_STATUS_BLOCK = undefined; var result: Dir = .{ .handle = undefined }; - try Thread.checkCancel(); - const rc = w.ntdll.NtCreateFile( + + const syscall: Syscall = try .start(); + while (true) switch (w.ntdll.NtCreateFile( &result.handle, // TODO remove some of these flags if options.access_sub_paths is false .{ @@ -3810,21 +3914,26 @@ pub fn dirOpenDirWindows( }, null, 0, - ); - - switch (rc) { - .SUCCESS => return result, - .OBJECT_NAME_INVALID => return error.BadPathName, - .OBJECT_NAME_NOT_FOUND => return error.FileNotFound, + )) { + .SUCCESS => { + syscall.finish(); + return result; + }, + .CANCELLED => { + try syscall.checkCancel(); + continue; + }, + .OBJECT_NAME_INVALID => return syscall.fail(error.BadPathName), + .OBJECT_NAME_NOT_FOUND => return syscall.fail(error.FileNotFound), .OBJECT_NAME_COLLISION => |err| return w.statusBug(err), - .OBJECT_PATH_NOT_FOUND => return error.FileNotFound, - .NOT_A_DIRECTORY => return error.NotDir, + .OBJECT_PATH_NOT_FOUND => return syscall.fail(error.FileNotFound), + .NOT_A_DIRECTORY => return syscall.fail(error.NotDir), // This can happen if the directory has 'List folder contents' permission set to 'Deny' // and the directory is trying to be opened for iteration. - .ACCESS_DENIED => return error.AccessDenied, - .INVALID_PARAMETER => |err| return w.statusBug(err), - else => return w.unexpectedStatus(rc), - } + .ACCESS_DENIED => return syscall.fail(error.AccessDenied), + .INVALID_PARAMETER => |err| return syscall.ntstatusBug(err), + else => |rc| return syscall.unexpectedNtstatus(rc), + }; } fn dirClose(userdata: ?*anyopaque, dirs: []const Dir) void { @@ -4264,9 +4373,9 @@ fn dirReadWindows(userdata: ?*anyopaque, dr: *Dir.Reader, buffer: []Dir.Entry) D // buffered data. if (buffer_index != 0) break; - try Thread.checkCancel(); var io_status_block: w.IO_STATUS_BLOCK = undefined; - const rc = w.ntdll.NtQueryDirectoryFile( + const syscall: Syscall = try .start(); + const rc = while (true) switch (w.ntdll.NtQueryDirectoryFile( dr.dir.handle, null, null, @@ -4278,7 +4387,16 @@ fn dirReadWindows(userdata: ?*anyopaque, dr: *Dir.Reader, buffer: []Dir.Entry) D w.FALSE, null, @intFromBool(dr.state == .reset), - ); + )) { + .CANCELLED => { + try syscall.checkCancel(); + continue; + }, + else => |rc| { + syscall.finish(); + break rc; + }, + }; dr.state = .reading; if (io_status_block.Information == 0) { dr.state = .finished; @@ -4466,32 +4584,40 @@ fn dirRealPathFileWindows(userdata: ?*anyopaque, dir: Dir, sub_path: []const u8, const t: *Threaded = @ptrCast(@alignCast(userdata)); _ = t; - try Thread.checkCancel(); - var path_name_w = try windows.sliceToPrefixedFileW(dir.handle, sub_path); - const h_file = blk: { - const res = windows.OpenFile(path_name_w.span(), .{ - .dir = dir.handle, - .access_mask = .{ - .GENERIC = .{ .READ = true }, - .STANDARD = .{ .SYNCHRONIZE = true }, - }, - .creation = .OPEN, - .filter = .any, - }) catch |err| switch (err) { - error.WouldBlock => unreachable, - else => |e| return e, - }; - break :blk res; + const h_file = handle: { + const syscall: Syscall = try .start(); + while (true) { + if (windows.OpenFile(path_name_w.span(), .{ + .dir = dir.handle, + .access_mask = .{ + .GENERIC = .{ .READ = true }, + .STANDARD = .{ .SYNCHRONIZE = true }, + }, + .creation = .OPEN, + .filter = .any, + })) |handle| { + syscall.finish(); + break :handle handle; + } else |err| switch (err) { + error.WouldBlock => unreachable, + error.OperationCanceled => { + try syscall.checkCancel(); + continue; + }, + else => |e| return syscall.fail(e), + } + } }; defer windows.CloseHandle(h_file); return realPathWindows(h_file, out_buffer); } fn realPathWindows(h_file: windows.HANDLE, out_buffer: []u8) File.RealPathError!usize { - // TODO move GetFinalPathNameByHandle logic into std.Io.Threaded and add cancel checks var wide_buf: [windows.PATH_MAX_WIDE]u16 = undefined; + // TODO move GetFinalPathNameByHandle logic into std.Io.Threaded and add cancel checks + try Thread.checkCancel(); const wide_slice = try windows.GetFinalPathNameByHandle(h_file, .{}, &wide_buf); const len = std.unicode.calcWtf8Len(wide_slice); @@ -4885,8 +5011,6 @@ fn dirDeleteWindows(userdata: ?*anyopaque, dir: Dir, sub_path: []const u8, remov _ = t; const w = windows; - try Thread.checkCancel(); - const sub_path_w_buf = try w.sliceToPrefixedFileW(dir.handle, sub_path); const sub_path_w = sub_path_w_buf.span(); @@ -4909,47 +5033,49 @@ fn dirDeleteWindows(userdata: ?*anyopaque, dir: Dir, sub_path: []const u8, remov var io_status_block: w.IO_STATUS_BLOCK = undefined; var tmp_handle: w.HANDLE = undefined; - var rc = w.ntdll.NtCreateFile( - &tmp_handle, - .{ .STANDARD = .{ - .RIGHTS = .{ .DELETE = true }, - .SYNCHRONIZE = true, - } }, - &.{ - .Length = @sizeOf(w.OBJECT_ATTRIBUTES), - .RootDirectory = if (std.fs.path.isAbsoluteWindowsWtf16(sub_path_w)) null else dir.handle, - .Attributes = .{}, - .ObjectName = &nt_name, - .SecurityDescriptor = null, - .SecurityQualityOfService = null, - }, - &io_status_block, - null, - .{}, - .VALID_FLAGS, - .OPEN, - .{ - .DIRECTORY_FILE = remove_dir, - .NON_DIRECTORY_FILE = !remove_dir, - .OPEN_REPARSE_POINT = true, // would we ever want to delete the target instead? - }, - null, - 0, - ); - switch (rc) { - .SUCCESS => {}, - .OBJECT_NAME_INVALID => |err| return w.statusBug(err), - .OBJECT_NAME_NOT_FOUND => return error.FileNotFound, - .OBJECT_PATH_NOT_FOUND => return error.FileNotFound, - .BAD_NETWORK_PATH => return error.NetworkNotFound, // \\server was not found - .BAD_NETWORK_NAME => return error.NetworkNotFound, // \\server was found but \\server\share wasn't - .INVALID_PARAMETER => |err| return w.statusBug(err), - .FILE_IS_A_DIRECTORY => return error.IsDir, - .NOT_A_DIRECTORY => return error.NotDir, - .SHARING_VIOLATION => return error.FileBusy, - .ACCESS_DENIED => return error.AccessDenied, - .DELETE_PENDING => return, - else => return w.unexpectedStatus(rc), + { + const syscall: Syscall = try .start(); + while (true) switch (w.ntdll.NtCreateFile( + &tmp_handle, + .{ .STANDARD = .{ + .RIGHTS = .{ .DELETE = true }, + .SYNCHRONIZE = true, + } }, + &.{ + .Length = @sizeOf(w.OBJECT_ATTRIBUTES), + .RootDirectory = if (std.fs.path.isAbsoluteWindowsWtf16(sub_path_w)) null else dir.handle, + .Attributes = .{}, + .ObjectName = &nt_name, + .SecurityDescriptor = null, + .SecurityQualityOfService = null, + }, + &io_status_block, + null, + .{}, + .VALID_FLAGS, + .OPEN, + .{ + .DIRECTORY_FILE = remove_dir, + .NON_DIRECTORY_FILE = !remove_dir, + .OPEN_REPARSE_POINT = true, // would we ever want to delete the target instead? + }, + null, + 0, + )) { + .SUCCESS => break syscall.finish(), + .OBJECT_NAME_INVALID => |err| return syscall.ntstatusBug(err), + .OBJECT_NAME_NOT_FOUND => return syscall.fail(error.FileNotFound), + .OBJECT_PATH_NOT_FOUND => return syscall.fail(error.FileNotFound), + .BAD_NETWORK_PATH => return syscall.fail(error.NetworkNotFound), // \\server was not found + .BAD_NETWORK_NAME => return syscall.fail(error.NetworkNotFound), // \\server was found but \\server\share wasn't + .INVALID_PARAMETER => |err| return syscall.ntstatusBug(err), + .FILE_IS_A_DIRECTORY => return syscall.fail(error.IsDir), + .NOT_A_DIRECTORY => return syscall.fail(error.NotDir), + .SHARING_VIOLATION => return syscall.fail(error.FileBusy), + .ACCESS_DENIED => return syscall.fail(error.AccessDenied), + .DELETE_PENDING => return syscall.finish(), + else => |rc| return syscall.unexpectedNtstatus(rc), + }; } defer w.CloseHandle(tmp_handle); @@ -4964,9 +5090,7 @@ fn dirDeleteWindows(userdata: ?*anyopaque, dir: Dir, sub_path: []const u8, remov // // The strategy here is just to try using FileDispositionInformationEx and fall back to // FileDispositionInformation if the return value lets us know that some aspect of it is not supported. - const need_fallback = need_fallback: { - try Thread.checkCancel(); - + const rc = rc: { // Deletion with posix semantics if the filesystem supports it. const info: w.FILE.DISPOSITION.INFORMATION.EX = .{ .Flags = .{ .DELETE = true, @@ -4974,29 +5098,32 @@ fn dirDeleteWindows(userdata: ?*anyopaque, dir: Dir, sub_path: []const u8, remov .IGNORE_READONLY_ATTRIBUTE = true, } }; - rc = w.ntdll.NtSetInformationFile( + const syscall: Syscall = try .start(); + while (true) switch (w.ntdll.NtSetInformationFile( tmp_handle, &io_status_block, &info, @sizeOf(w.FILE.DISPOSITION.INFORMATION.EX), .DispositionEx, - ); - switch (rc) { - .SUCCESS => return, + )) { + .CANCELLED => { + try syscall.checkCancel(); + continue; + }, // The filesystem does not support FileDispositionInformationEx .INVALID_PARAMETER, // The operating system does not support FileDispositionInformationEx .INVALID_INFO_CLASS, // The operating system does not support one of the flags .NOT_SUPPORTED, - => break :need_fallback true, - // For all other statuses, fall down to the switch below to handle them. - else => break :need_fallback false, - } - }; + => break, // use fallback path below; `syscall` still active - if (need_fallback) { - try Thread.checkCancel(); + // For all other statuses, fall down to the switch below to handle them. + else => |rc| { + syscall.finish(); + break :rc rc; + }, + }; // Deletion with file pending semantics, which requires waiting or moving // files to get them removed (from here). @@ -5004,14 +5131,23 @@ fn dirDeleteWindows(userdata: ?*anyopaque, dir: Dir, sub_path: []const u8, remov .DeleteFile = w.TRUE, }; - rc = w.ntdll.NtSetInformationFile( + while (true) switch (w.ntdll.NtSetInformationFile( tmp_handle, &io_status_block, &file_dispo, @sizeOf(w.FILE.DISPOSITION.INFORMATION), .Disposition, - ); - } + )) { + .CANCELLED => { + try syscall.checkCancel(); + continue; + }, + else => |rc| { + syscall.finish(); + break :rc rc; + }, + }; + }; switch (rc) { .SUCCESS => {}, .DIRECTORY_NOT_EMPTY => return error.DirNotEmpty, @@ -5135,23 +5271,33 @@ fn dirRenameWindows( const new_path_w = new_path_w_buf.span(); const replace_if_exists = true; - try Thread.checkCancel(); - - const src_fd = w.OpenFile(old_path_w, .{ - .dir = old_dir.handle, - .access_mask = .{ - .GENERIC = .{ .WRITE = true }, - .STANDARD = .{ - .RIGHTS = .{ .DELETE = true }, - .SYNCHRONIZE = true, - }, - }, - .creation = .OPEN, - .filter = .any, // This function is supposed to rename both files and directories. - .follow_symlinks = false, - }) catch |err| switch (err) { - error.WouldBlock => unreachable, // Not possible without `.share_access_nonblocking = true`. - else => |e| return e, + const src_fd = src_fd: { + const syscall: Syscall = try .start(); + while (true) { + if (w.OpenFile(old_path_w, .{ + .dir = old_dir.handle, + .access_mask = .{ + .GENERIC = .{ .WRITE = true }, + .STANDARD = .{ + .RIGHTS = .{ .DELETE = true }, + .SYNCHRONIZE = true, + }, + }, + .creation = .OPEN, + .filter = .any, // This function is supposed to rename both files and directories. + .follow_symlinks = false, + })) |handle| { + syscall.finish(); + break :src_fd handle; + } else |err| switch (err) { + error.WouldBlock => unreachable, // Not possible without `.share_access_nonblocking = true`. + error.OperationCanceled => { + try syscall.checkCancel(); + continue; + }, + else => |e| return e, + } + } }; defer w.CloseHandle(src_fd); @@ -5354,8 +5500,6 @@ fn dirSymLinkWindows( _ = t; const w = windows; - try Thread.checkCancel(); - // Target path does not use sliceToPrefixedFileW because certain paths // are handled differently when creating a symlink than they would be // when converting to an NT namespaced path. CreateSymbolicLink in @@ -5385,22 +5529,34 @@ fn dirSymLinkWindows( Flags: w.ULONG, }; - const symlink_handle = w.OpenFile(sym_link_path_w.span(), .{ - .access_mask = .{ - .GENERIC = .{ .READ = true, .WRITE = true }, - .STANDARD = .{ .SYNCHRONIZE = true }, - }, - .dir = dir.handle, - .creation = .CREATE, - .filter = if (flags.is_directory) .dir_only else .non_directory_only, - }) catch |err| switch (err) { - error.IsDir => return error.PathAlreadyExists, - error.NotDir => return error.Unexpected, - error.WouldBlock => return error.Unexpected, - error.PipeBusy => return error.Unexpected, - error.NoDevice => return error.Unexpected, - error.AntivirusInterference => return error.Unexpected, - else => |e| return e, + const symlink_handle = handle: { + const syscall: Syscall = try .start(); + while (true) { + if (w.OpenFile(sym_link_path_w.span(), .{ + .access_mask = .{ + .GENERIC = .{ .READ = true, .WRITE = true }, + .STANDARD = .{ .SYNCHRONIZE = true }, + }, + .dir = dir.handle, + .creation = .CREATE, + .filter = if (flags.is_directory) .dir_only else .non_directory_only, + })) |handle| { + syscall.finish(); + break :handle handle; + } else |err| switch (err) { + error.IsDir => return syscall.fail(error.PathAlreadyExists), + error.NotDir => return syscall.fail(error.Unexpected), + error.WouldBlock => return syscall.fail(error.Unexpected), + error.PipeBusy => return syscall.fail(error.Unexpected), + error.NoDevice => return syscall.fail(error.Unexpected), + error.AntivirusInterference => return syscall.fail(error.Unexpected), + error.OperationCanceled => { + try syscall.checkCancel(); + continue; + }, + else => |e| return e, + } + } }; defer w.CloseHandle(symlink_handle); @@ -5576,11 +5732,21 @@ fn dirReadLinkWindows(userdata: ?*anyopaque, dir: Dir, sub_path: []const u8, buf _ = t; const w = windows; - try Thread.checkCancel(); - var sub_path_w_buf = try windows.sliceToPrefixedFileW(dir.handle, sub_path); - const result_w = try w.ReadLink(dir.handle, sub_path_w_buf.span(), &sub_path_w_buf.data); + const syscall: Syscall = try .start(); + const result_w = while (true) { + if (w.ReadLink(dir.handle, sub_path_w_buf.span(), &sub_path_w_buf.data)) |res| { + syscall.finish(); + break res; + } else |err| switch (err) { + error.OperationCanceled => { + try syscall.checkCancel(); + continue; + }, + else => |e| return syscall.fail(e), + } + }; const len = std.unicode.calcWtf8Len(result_w); if (len > buffer.len) return error.NameTooLong; @@ -5997,17 +6163,25 @@ fn fileSyncWindows(userdata: ?*anyopaque, file: File) File.SyncError!void { const t: *Threaded = @ptrCast(@alignCast(userdata)); _ = t; - try Thread.checkCancel(); - - if (windows.kernel32.FlushFileBuffers(file.handle) != 0) - return; - - switch (windows.GetLastError()) { - .SUCCESS => return, - .INVALID_HANDLE => unreachable, - .ACCESS_DENIED => return error.AccessDenied, // a sync was performed but the system couldn't update the access time - .UNEXP_NET_ERR => return error.InputOutput, - else => |err| return windows.unexpectedError(err), + const syscall: Syscall = try .start(); + while (true) { + if (windows.kernel32.FlushFileBuffers(file.handle) != 0) { + return syscall.finish(); + } + switch (windows.GetLastError()) { + .SUCCESS => unreachable, // `FlushFileBuffers` returned nonzero + .INVALID_HANDLE => unreachable, + .ACCESS_DENIED => return syscall.fail(error.AccessDenied), // a sync was performed but the system couldn't update the access time + .UNEXP_NET_ERR => return syscall.fail(error.InputOutput), + .OPERATION_ABORTED => { + try syscall.checkCancel(); + continue; + }, + else => |err| { + syscall.finish(); + return windows.unexpectedError(err); + }, + } } } @@ -6074,9 +6248,22 @@ fn fileIsTty(userdata: ?*anyopaque, file: File) Io.Cancelable!bool { fn isTty(file: File) Io.Cancelable!bool { if (is_windows) { if (try isCygwinPty(file)) return true; - try Thread.checkCancel(); var out: windows.DWORD = undefined; - return windows.kernel32.GetConsoleMode(file.handle, &out) != 0; + const syscall: Syscall = try .start(); + while (windows.kernel32.GetConsoleMode(file.handle, &out) == 0) { + switch (windows.GetLastError()) { + .OPERATION_ABORTED => { + try syscall.checkCancel(); + continue; + }, + else => { + syscall.finish(); + return false; + }, + } + } + syscall.finish(); + return true; } if (builtin.link_libc) { @@ -6146,35 +6333,65 @@ fn fileEnableAnsiEscapeCodes(userdata: ?*anyopaque, file: File) File.EnableAnsiE const t: *Threaded = @ptrCast(@alignCast(userdata)); _ = t; - if (is_windows) { - try Thread.checkCancel(); + if (!is_windows) { + if (try supportsAnsiEscapeCodes(file)) return; + return error.NotTerminalDevice; + } - // For Windows Terminal, VT Sequences processing is enabled by default. - var original_console_mode: windows.DWORD = 0; - if (windows.kernel32.GetConsoleMode(file.handle, &original_console_mode) != 0) { - if (original_console_mode & windows.ENABLE_VIRTUAL_TERMINAL_PROCESSING != 0) return; + // For Windows Terminal, VT Sequences processing is enabled by default. + var original_console_mode: windows.DWORD = 0; - // For Windows Console, VT Sequences processing support was added in Windows 10 build 14361, but disabled by default. - // https://devblogs.microsoft.com/commandline/tmux-support-arrives-for-bash-on-ubuntu-on-windows/ - // - // Note: In Microsoft's example for enabling virtual terminal processing, it - // shows attempting to enable `DISABLE_NEWLINE_AUTO_RETURN` as well: - // https://learn.microsoft.com/en-us/windows/console/console-virtual-terminal-sequences#example-of-enabling-virtual-terminal-processing - // This is avoided because in the old Windows Console, that flag causes \n (as opposed to \r\n) - // to behave unexpectedly (the cursor moves down 1 row but remains on the same column). - // Additionally, the default console mode in Windows Terminal does not have - // `DISABLE_NEWLINE_AUTO_RETURN` set, so by only enabling `ENABLE_VIRTUAL_TERMINAL_PROCESSING` - // we end up matching the mode of Windows Terminal. - const requested_console_modes = windows.ENABLE_VIRTUAL_TERMINAL_PROCESSING; - const console_mode = original_console_mode | requested_console_modes; - try Thread.checkCancel(); - if (windows.kernel32.SetConsoleMode(file.handle, console_mode) != 0) return; + { + const syscall: Syscall = try .start(); + while (windows.kernel32.GetConsoleMode(file.handle, &original_console_mode) == 0) { + switch (windows.GetLastError()) { + .OPERATION_ABORTED => { + try syscall.checkCancel(); + continue; + }, + else => { + syscall.finish(); + if (try isCygwinPty(file)) return; + return error.NotTerminalDevice; + }, + } } - if (try isCygwinPty(file)) return; - } else { - if (try supportsAnsiEscapeCodes(file)) return; + syscall.finish(); + } + + if (original_console_mode & windows.ENABLE_VIRTUAL_TERMINAL_PROCESSING != 0) return; + + // For Windows Console, VT Sequences processing support was added in Windows 10 build 14361, but disabled by default. + // https://devblogs.microsoft.com/commandline/tmux-support-arrives-for-bash-on-ubuntu-on-windows/ + // + // Note: In Microsoft's example for enabling virtual terminal processing, it + // shows attempting to enable `DISABLE_NEWLINE_AUTO_RETURN` as well: + // https://learn.microsoft.com/en-us/windows/console/console-virtual-terminal-sequences#example-of-enabling-virtual-terminal-processing + // This is avoided because in the old Windows Console, that flag causes \n (as opposed to \r\n) + // to behave unexpectedly (the cursor moves down 1 row but remains on the same column). + // Additionally, the default console mode in Windows Terminal does not have + // `DISABLE_NEWLINE_AUTO_RETURN` set, so by only enabling `ENABLE_VIRTUAL_TERMINAL_PROCESSING` + // we end up matching the mode of Windows Terminal. + const requested_console_modes = windows.ENABLE_VIRTUAL_TERMINAL_PROCESSING; + const console_mode = original_console_mode | requested_console_modes; + + { + const syscall: Syscall = try .start(); + while (windows.kernel32.SetConsoleMode(file.handle, console_mode) == 0) { + switch (windows.GetLastError()) { + .OPERATION_ABORTED => { + try syscall.checkCancel(); + continue; + }, + else => { + syscall.finish(); + if (try isCygwinPty(file)) return; + return error.NotTerminalDevice; + }, + } + } + syscall.finish(); } - return error.NotTerminalDevice; } fn fileSupportsAnsiEscapeCodes(userdata: ?*anyopaque, file: File) Io.Cancelable!bool { @@ -6185,11 +6402,27 @@ fn fileSupportsAnsiEscapeCodes(userdata: ?*anyopaque, file: File) Io.Cancelable! fn supportsAnsiEscapeCodes(file: File) Io.Cancelable!bool { if (is_windows) { - try Thread.checkCancel(); var console_mode: windows.DWORD = 0; - if (windows.kernel32.GetConsoleMode(file.handle, &console_mode) != 0) { - if (console_mode & windows.ENABLE_VIRTUAL_TERMINAL_PROCESSING != 0) return true; + + const syscall: Syscall = try .start(); + while (windows.kernel32.GetConsoleMode(file.handle, &console_mode) == 0) { + switch (windows.GetLastError()) { + .OPERATION_ABORTED => { + try syscall.checkCancel(); + continue; + }, + else => { + syscall.finish(); + break; + }, + } + } else { + syscall.finish(); + if (console_mode & windows.ENABLE_VIRTUAL_TERMINAL_PROCESSING != 0) { + return true; + } } + return isCygwinPty(file); } @@ -6220,20 +6453,26 @@ fn isCygwinPty(file: File) Io.Cancelable!bool { // This allows us to avoid the more costly NtQueryInformationFile call // for handles that aren't named pipes. { - try Thread.checkCancel(); var io_status: windows.IO_STATUS_BLOCK = undefined; var device_info: windows.FILE.FS_DEVICE_INFORMATION = undefined; - const rc = windows.ntdll.NtQueryVolumeInformationFile( + const syscall: Syscall = try .start(); + while (true) switch (windows.ntdll.NtQueryVolumeInformationFile( handle, &io_status, &device_info, @sizeOf(windows.FILE.FS_DEVICE_INFORMATION), .Device, - ); - switch (rc) { - .SUCCESS => {}, - else => return false, - } + )) { + .SUCCESS => break syscall.finish(), + .CANCELLED => { + try syscall.checkCancel(); + continue; + }, + else => { + syscall.finish(); + return false; + }, + }; if (device_info.DeviceType.FileDevice != .NAMED_PIPE) return false; } @@ -6248,19 +6487,25 @@ fn isCygwinPty(file: File) Io.Cancelable!bool { var name_info_bytes align(@alignOf(windows.FILE.NAME_INFORMATION)) = [_]u8{0} ** (name_bytes_offset + num_name_bytes); var io_status_block: windows.IO_STATUS_BLOCK = undefined; - try Thread.checkCancel(); - const rc = windows.ntdll.NtQueryInformationFile( + const syscall: Syscall = try .start(); + while (true) switch (windows.ntdll.NtQueryInformationFile( handle, &io_status_block, &name_info_bytes, @intCast(name_info_bytes.len), .Name, - ); - switch (rc) { - .SUCCESS => {}, + )) { + .SUCCESS => break syscall.finish(), + .CANCELLED => { + try syscall.checkCancel(); + continue; + }, .INVALID_PARAMETER => unreachable, - else => return false, - } + else => { + syscall.finish(); + return false; + }, + }; const name_info: *const windows.FILE_NAME_INFO = @ptrCast(&name_info_bytes); const name_bytes = name_info_bytes[name_bytes_offset .. name_bytes_offset + name_info.FileNameLength]; @@ -6279,28 +6524,30 @@ fn fileSetLength(userdata: ?*anyopaque, file: File, length: u64) File.SetLengthE if (signed_len < 0) return error.FileTooBig; // Avoid ambiguous EINVAL errors. if (is_windows) { - try Thread.checkCancel(); - var io_status_block: windows.IO_STATUS_BLOCK = undefined; const eof_info: windows.FILE.END_OF_FILE_INFORMATION = .{ .EndOfFile = signed_len, }; - const status = windows.ntdll.NtSetInformationFile( + const syscall: Syscall = try .start(); + while (true) switch (windows.ntdll.NtSetInformationFile( file.handle, &io_status_block, &eof_info, @sizeOf(windows.FILE.END_OF_FILE_INFORMATION), .EndOfFile, - ); - switch (status) { - .SUCCESS => return, - .INVALID_HANDLE => |err| return windows.statusBug(err), // Handle not open for writing. - .ACCESS_DENIED => return error.AccessDenied, - .USER_MAPPED_FILE => return error.AccessDenied, - .INVALID_PARAMETER => return error.FileTooBig, - else => return windows.unexpectedStatus(status), - } + )) { + .SUCCESS => return syscall.finish(), + .CANCELLED => { + try syscall.checkCancel(); + continue; + }, + .INVALID_HANDLE => |err| return syscall.ntstatusBug(err), // Handle not open for writing. + .ACCESS_DENIED => return syscall.fail(error.AccessDenied), + .USER_MAPPED_FILE => return syscall.fail(error.AccessDenied), + .INVALID_PARAMETER => return syscall.fail(error.FileTooBig), + else => |status| return syscall.unexpectedNtstatus(status), + }; } if (native_os == .wasi and !builtin.link_libc) { @@ -6368,7 +6615,6 @@ fn fileSetPermissions(userdata: ?*anyopaque, file: File, permissions: File.Permi _ = t; switch (native_os) { .windows => { - try Thread.checkCancel(); var io_status_block: windows.IO_STATUS_BLOCK = undefined; const info: windows.FILE.BASIC_INFORMATION = .{ .CreationTime = 0, @@ -6377,19 +6623,23 @@ fn fileSetPermissions(userdata: ?*anyopaque, file: File, permissions: File.Permi .ChangeTime = 0, .FileAttributes = permissions.toAttributes(), }; - const status = windows.ntdll.NtSetInformationFile( + const syscall: Syscall = try .start(); + while (true) switch (windows.ntdll.NtSetInformationFile( file.handle, &io_status_block, &info, @sizeOf(windows.FILE.BASIC_INFORMATION), .Basic, - ); - switch (status) { - .SUCCESS => return, - .INVALID_HANDLE => |err| return windows.statusBug(err), - .ACCESS_DENIED => return error.AccessDenied, - else => return windows.unexpectedStatus(status), - } + )) { + .SUCCESS => return syscall.finish(), + .CANCELLED => { + try syscall.checkCancel(); + continue; + }, + .INVALID_HANDLE => |err| return syscall.ntstatusBug(err), + .ACCESS_DENIED => return syscall.fail(error.AccessDenied), + else => |status| return syscall.unexpectedNtstatus(status), + }; }, .wasi => return error.Unexpected, // Unsupported OS. else => return setPermissionsPosix(file.handle, permissions.toMode()), @@ -6484,8 +6734,6 @@ fn fileSetTimestamps( _ = t; if (is_windows) { - try Thread.checkCancel(); - var access_time_buffer: windows.FILETIME = undefined; var modify_time_buffer: windows.FILETIME = undefined; var system_time_buffer: windows.LARGE_INTEGER = undefined; @@ -6513,13 +6761,22 @@ fn fileSetTimestamps( }; // https://github.com/ziglang/zig/issues/1840 - const rc = windows.kernel32.SetFileTime(file.handle, null, access_ptr, modify_ptr); - if (rc == 0) { - switch (windows.GetLastError()) { - else => |err| return windows.unexpectedError(err), + const syscall: Syscall = try .start(); + while (true) { + switch (windows.kernel32.SetFileTime(file.handle, null, access_ptr, modify_ptr)) { + 0 => switch (windows.GetLastError()) { + .OPERATION_ABORTED => { + try syscall.checkCancel(); + continue; + }, + else => |err| { + syscall.finish(); + return windows.unexpectedError(err); + }, + }, + else => return syscall.finish(), } } - return; } if (native_os == .wasi and !builtin.link_libc) { @@ -6601,27 +6858,26 @@ fn fileLock(userdata: ?*anyopaque, file: File, lock: File.Lock) File.LockError!v .none => { // To match the non-Windows behavior, unlock var io_status_block: windows.IO_STATUS_BLOCK = undefined; - const status = windows.ntdll.NtUnlockFile( + while (true) switch (windows.ntdll.NtUnlockFile( file.handle, &io_status_block, &windows_lock_range_off, &windows_lock_range_len, 0, - ); - switch (status) { - .SUCCESS => {}, - .RANGE_NOT_LOCKED => {}, + )) { + .SUCCESS => return, + .CANCELLED => continue, + .RANGE_NOT_LOCKED => return, .ACCESS_VIOLATION => |err| return windows.statusBug(err), // bad io_status_block pointer - else => return windows.unexpectedStatus(status), - } - return; + else => |status| return windows.unexpectedStatus(status), + }; }, .shared => false, .exclusive => true, }; - try Thread.checkCancel(); var io_status_block: windows.IO_STATUS_BLOCK = undefined; - const status = windows.ntdll.NtLockFile( + const syscall: Syscall = try .start(); + while (true) switch (windows.ntdll.NtLockFile( file.handle, null, null, @@ -6632,14 +6888,17 @@ fn fileLock(userdata: ?*anyopaque, file: File, lock: File.Lock) File.LockError!v null, windows.FALSE, @intFromBool(exclusive), - ); - switch (status) { - .SUCCESS => return, - .INSUFFICIENT_RESOURCES => return error.SystemResources, - .LOCK_NOT_GRANTED => |err| return windows.statusBug(err), // passed FailImmediately=false - .ACCESS_VIOLATION => |err| return windows.statusBug(err), // bad io_status_block pointer - else => return windows.unexpectedStatus(status), - } + )) { + .SUCCESS => return syscall.finish(), + .CANCELLED => { + try syscall.checkCancel(); + continue; + }, + .INSUFFICIENT_RESOURCES => return syscall.fail(error.SystemResources), + .LOCK_NOT_GRANTED => |err| return syscall.ntstatusBug(err), // passed FailImmediately=false + .ACCESS_VIOLATION => |err| return syscall.ntstatusBug(err), // bad io_status_block pointer + else => |status| return syscall.unexpectedNtstatus(status), + }; } const operation: i32 = switch (lock) { @@ -6680,26 +6939,26 @@ fn fileTryLock(userdata: ?*anyopaque, file: File, lock: File.Lock) File.LockErro .none => { // To match the non-Windows behavior, unlock var io_status_block: windows.IO_STATUS_BLOCK = undefined; - const status = windows.ntdll.NtUnlockFile( + while (true) switch (windows.ntdll.NtUnlockFile( file.handle, &io_status_block, &windows_lock_range_off, &windows_lock_range_len, 0, - ); - switch (status) { + )) { .SUCCESS => return true, + .CANCELLED => continue, .RANGE_NOT_LOCKED => return false, .ACCESS_VIOLATION => |err| return windows.statusBug(err), // bad io_status_block pointer - else => return windows.unexpectedStatus(status), - } + else => |status| return windows.unexpectedStatus(status), + }; }, .shared => false, .exclusive => true, }; - try Thread.checkCancel(); var io_status_block: windows.IO_STATUS_BLOCK = undefined; - const status = windows.ntdll.NtLockFile( + const syscall: Syscall = try .start(); + while (true) switch (windows.ntdll.NtLockFile( file.handle, null, null, @@ -6710,14 +6969,23 @@ fn fileTryLock(userdata: ?*anyopaque, file: File, lock: File.Lock) File.LockErro null, windows.TRUE, @intFromBool(exclusive), - ); - switch (status) { - .SUCCESS => return true, - .INSUFFICIENT_RESOURCES => return error.SystemResources, - .LOCK_NOT_GRANTED => return false, - .ACCESS_VIOLATION => |err| return windows.statusBug(err), // bad io_status_block pointer - else => return windows.unexpectedStatus(status), - } + )) { + .SUCCESS => { + syscall.finish(); + return true; + }, + .LOCK_NOT_GRANTED => { + syscall.finish(); + return false; + }, + .CANCELLED => { + try syscall.checkCancel(); + continue; + }, + .INSUFFICIENT_RESOURCES => return syscall.fail(error.SystemResources), + .ACCESS_VIOLATION => |err| return syscall.ntstatusBug(err), // bad io_status_block pointer + else => |status| return syscall.unexpectedNtstatus(status), + }; } const operation: i32 = switch (lock) { @@ -6761,20 +7029,19 @@ fn fileUnlock(userdata: ?*anyopaque, file: File) void { if (is_windows) { var io_status_block: windows.IO_STATUS_BLOCK = undefined; - const status = windows.ntdll.NtUnlockFile( + while (true) switch (windows.ntdll.NtUnlockFile( file.handle, &io_status_block, &windows_lock_range_off, &windows_lock_range_len, 0, - ); - if (is_debug) switch (status) { - .SUCCESS => {}, - .RANGE_NOT_LOCKED => unreachable, // Function asserts unlocked. - .ACCESS_VIOLATION => unreachable, // bad io_status_block pointer - else => unreachable, // Resource deallocation must succeed. + )) { + .SUCCESS => return, + .CANCELLED => continue, + .RANGE_NOT_LOCKED => if (is_debug) unreachable else return, // Function asserts unlocked. + .ACCESS_VIOLATION => if (is_debug) unreachable else return, // bad io_status_block pointer + else => if (is_debug) unreachable else return, // Resource deallocation must succeed. }; - return; } while (true) { @@ -6797,14 +7064,14 @@ fn fileDowngradeLock(userdata: ?*anyopaque, file: File) File.DowngradeLockError! _ = t; if (is_windows) { - try Thread.checkCancel(); // On Windows it works like a semaphore + exclusivity flag. To // implement this function, we first obtain another lock in shared // mode. This changes the exclusivity flag, but increments the // semaphore to 2. So we follow up with an NtUnlockFile which // decrements the semaphore but does not modify the exclusivity flag. var io_status_block: windows.IO_STATUS_BLOCK = undefined; - switch (windows.ntdll.NtLockFile( + const syscall: Syscall = try .start(); + while (true) switch (windows.ntdll.NtLockFile( file.handle, null, null, @@ -6816,26 +7083,29 @@ fn fileDowngradeLock(userdata: ?*anyopaque, file: File) File.DowngradeLockError! windows.TRUE, windows.FALSE, )) { - .SUCCESS => {}, - .INSUFFICIENT_RESOURCES => |err| return windows.statusBug(err), - .LOCK_NOT_GRANTED => |err| return windows.statusBug(err), // File was not locked in exclusive mode. - .ACCESS_VIOLATION => |err| return windows.statusBug(err), // bad io_status_block pointer - else => |status| return windows.unexpectedStatus(status), - } - const status = windows.ntdll.NtUnlockFile( + .SUCCESS => break syscall.finish(), + .CANCELLED => { + try syscall.checkCancel(); + continue; + }, + .INSUFFICIENT_RESOURCES => |err| return syscall.ntstatusBug(err), + .LOCK_NOT_GRANTED => |err| return syscall.ntstatusBug(err), // File was not locked in exclusive mode. + .ACCESS_VIOLATION => |err| return syscall.ntstatusBug(err), // bad io_status_block pointer + else => |status| return syscall.unexpectedNtstatus(status), + }; + while (true) switch (windows.ntdll.NtUnlockFile( file.handle, &io_status_block, &windows_lock_range_off, &windows_lock_range_len, 0, - ); - if (is_debug) switch (status) { - .SUCCESS => {}, - .RANGE_NOT_LOCKED => unreachable, // File was not locked. - .ACCESS_VIOLATION => unreachable, // bad io_status_block pointer - else => unreachable, // Resource deallocation must succeed. + )) { + .SUCCESS => return, + .CANCELLED => continue, + .RANGE_NOT_LOCKED => if (is_debug) unreachable else return, // File was not locked. + .ACCESS_VIOLATION => if (is_debug) unreachable else return, // bad io_status_block pointer + else => if (is_debug) unreachable else return, // Resource deallocation must succeed. }; - return; } const operation = posix.LOCK.SH | posix.LOCK.NB; @@ -7158,21 +7428,34 @@ fn fileReadStreamingWindows(userdata: ?*anyopaque, file: File, data: []const []u const buffer = data[index]; const want_read_count: DWORD = @min(std.math.maxInt(DWORD), buffer.len); + const syscall: Syscall = try .start(); while (true) { - try Thread.checkCancel(); var n: DWORD = undefined; - if (windows.kernel32.ReadFile(file.handle, buffer.ptr, want_read_count, &n, null) != 0) + if (windows.kernel32.ReadFile(file.handle, buffer.ptr, want_read_count, &n, null) != 0) { + syscall.finish(); return n; + } switch (windows.GetLastError()) { - .IO_PENDING => |err| return windows.errorBug(err), - .OPERATION_ABORTED => continue, - .BROKEN_PIPE => return 0, - .HANDLE_EOF => return 0, - .NETNAME_DELETED => return error.ConnectionResetByPeer, - .LOCK_VIOLATION => return error.LockViolation, - .ACCESS_DENIED => return error.AccessDenied, - .INVALID_HANDLE => return error.NotOpenForReading, - else => |err| return windows.unexpectedError(err), + .IO_PENDING => |err| { + syscall.finish(); + return windows.errorBug(err); + }, + .OPERATION_ABORTED => { + try syscall.checkCancel(); + continue; + }, + .BROKEN_PIPE, .HANDLE_EOF => { + syscall.finish(); + return 0; + }, + .NETNAME_DELETED => return syscall.fail(error.ConnectionResetByPeer), + .LOCK_VIOLATION => return syscall.fail(error.LockViolation), + .ACCESS_DENIED => return syscall.fail(error.AccessDenied), + .INVALID_HANDLE => return syscall.fail(error.NotOpenForReading), + else => |err| { + syscall.finish(); + return windows.unexpectedError(err); + }, } } } @@ -7302,21 +7585,34 @@ fn fileReadPositionalWindows(userdata: ?*anyopaque, file: File, data: []const [] .hEvent = null, }; + const syscall: Syscall = try .start(); while (true) { - try Thread.checkCancel(); var n: DWORD = undefined; - if (windows.kernel32.ReadFile(file.handle, buffer.ptr, want_read_count, &n, &overlapped) != 0) + if (windows.kernel32.ReadFile(file.handle, buffer.ptr, want_read_count, &n, &overlapped) != 0) { + syscall.finish(); return n; + } switch (windows.GetLastError()) { - .IO_PENDING => |err| return windows.errorBug(err), - .OPERATION_ABORTED => continue, - .BROKEN_PIPE => return 0, - .HANDLE_EOF => return 0, - .NETNAME_DELETED => return error.ConnectionResetByPeer, - .LOCK_VIOLATION => return error.LockViolation, - .ACCESS_DENIED => return error.AccessDenied, - .INVALID_HANDLE => return error.NotOpenForReading, - else => |err| return windows.unexpectedError(err), + .IO_PENDING => |err| { + syscall.finish(); + return windows.errorBug(err); + }, + .OPERATION_ABORTED => { + try syscall.checkCancel(); + continue; + }, + .BROKEN_PIPE, .HANDLE_EOF => { + syscall.finish(); + return 0; + }, + .NETNAME_DELETED => return syscall.fail(error.ConnectionResetByPeer), + .LOCK_VIOLATION => return syscall.fail(error.LockViolation), + .ACCESS_DENIED => return syscall.fail(error.AccessDenied), + .INVALID_HANDLE => return syscall.fail(error.NotOpenForReading), + else => |err| { + syscall.finish(); + return windows.unexpectedError(err); + }, } } } @@ -7355,8 +7651,26 @@ fn fileSeekBy(userdata: ?*anyopaque, file: File, offset: i64) File.SeekError!voi } if (native_os == .windows) { - try Thread.checkCancel(); - return windows.SetFilePointerEx_CURRENT(fd, offset); + const syscall: Syscall = try .start(); + while (true) { + if (windows.kernel32.SetFilePointerEx(fd, offset, null, windows.FILE_CURRENT) != 0) { + return syscall.finish(); + } + switch (windows.GetLastError()) { + .OPERATION_ABORTED => { + try syscall.checkCancel(); + continue; + }, + .INVALID_FUNCTION => return syscall.fail(error.Unseekable), + .NEGATIVE_SEEK => return syscall.fail(error.Unseekable), + .INVALID_PARAMETER => unreachable, + .INVALID_HANDLE => unreachable, + else => |err| { + syscall.finish(); + return windows.unexpectedError(err); + }, + } + } } if (native_os == .wasi and !builtin.link_libc) { @@ -7422,8 +7736,31 @@ fn fileSeekTo(userdata: ?*anyopaque, file: File, offset: u64) File.SeekError!voi const fd = file.handle; if (native_os == .windows) { - try Thread.checkCancel(); - return windows.SetFilePointerEx_BEGIN(fd, offset); + // "The starting point is zero or the beginning of the file. If [FILE_BEGIN] + // is specified, then the liDistanceToMove parameter is interpreted as an unsigned value." + // https://docs.microsoft.com/en-us/windows/desktop/api/fileapi/nf-fileapi-setfilepointerex + const ipos: windows.LARGE_INTEGER = @bitCast(offset); + + const syscall: Syscall = try .start(); + while (true) { + if (windows.kernel32.SetFilePointerEx(fd, ipos, null, windows.FILE_BEGIN) != 0) { + return syscall.finish(); + } + switch (windows.GetLastError()) { + .OPERATION_ABORTED => { + try syscall.checkCancel(); + continue; + }, + .INVALID_FUNCTION => return syscall.fail(error.Unseekable), + .NEGATIVE_SEEK => return syscall.fail(error.Unseekable), + .INVALID_PARAMETER => unreachable, + .INVALID_HANDLE => unreachable, + else => |err| { + syscall.finish(); + return windows.unexpectedError(err); + }, + } + } } if (native_os == .wasi and !builtin.link_libc) { @@ -7527,7 +7864,7 @@ fn processExecutableOpen(userdata: ?*anyopaque, flags: File.OpenFlags) std.proce const image_path_unicode_string = &windows.peb().ProcessParameters.ImagePathName; const image_path_name = image_path_unicode_string.Buffer.?[0 .. image_path_unicode_string.Length / 2 :0]; const prefixed_path_w = try windows.wToPrefixedFileW(null, image_path_name); - return dirOpenFileWtf16(t, null, prefixed_path_w.span(), flags); + return dirOpenFileWtf16(null, prefixed_path_w.span(), flags); }, .driverkit, .ios, @@ -7736,7 +8073,6 @@ fn processExecutablePath(userdata: ?*anyopaque, out_buffer: []u8) std.process.Ex return error.FileNotFound; }, .windows => { - try Thread.checkCancel(); const w = windows; const image_path_unicode_string = &w.peb().ProcessParameters.ImagePathName; const image_path_name = image_path_unicode_string.Buffer.?[0 .. image_path_unicode_string.Length / 2 :0]; @@ -7746,24 +8082,34 @@ fn processExecutablePath(userdata: ?*anyopaque, out_buffer: []u8) std.process.Ex // that the symlink points to, though, so we need to get the realpath. var path_name_w_buf = try w.wToPrefixedFileW(null, image_path_name); - const h_file = blk: { - const res = w.OpenFile(path_name_w_buf.span(), .{ - .dir = null, - .access_mask = .{ - .GENERIC = .{ .READ = true }, - .STANDARD = .{ .SYNCHRONIZE = true }, - }, - .creation = .OPEN, - .filter = .any, - }) catch |err| switch (err) { - error.WouldBlock => unreachable, - else => |e| return e, - }; - break :blk res; + const h_file = handle: { + const syscall: Syscall = try .start(); + while (true) { + if (w.OpenFile(path_name_w_buf.span(), .{ + .dir = null, + .access_mask = .{ + .GENERIC = .{ .READ = true }, + .STANDARD = .{ .SYNCHRONIZE = true }, + }, + .creation = .OPEN, + .filter = .any, + })) |handle| { + syscall.finish(); + break :handle handle; + } else |err| switch (err) { + error.WouldBlock => unreachable, + error.OperationCanceled => { + try syscall.checkCancel(); + continue; + }, + else => |e| return e, + } + } }; defer w.CloseHandle(h_file); // TODO move GetFinalPathNameByHandle logic into std.Io.Threaded and add cancel checks + try Thread.checkCancel(); const wide_slice = try w.GetFinalPathNameByHandle(h_file, .{}, &path_name_w_buf.data); const len = std.unicode.calcWtf8Len(wide_slice); @@ -7916,8 +8262,6 @@ fn writeFilePositionalWindows( bytes: []const u8, offset: u64, ) File.WritePositionalError!usize { - try Thread.checkCancel(); - var bytes_written: windows.DWORD = undefined; var overlapped: windows.OVERLAPPED = .{ .Internal = 0, @@ -7931,21 +8275,31 @@ fn writeFilePositionalWindows( .hEvent = null, }; const adjusted_len = std.math.lossyCast(u32, bytes.len); - if (windows.kernel32.WriteFile(handle, bytes.ptr, adjusted_len, &bytes_written, &overlapped) == 0) { + const syscall: Syscall = try .start(); + while (true) { + if (windows.kernel32.WriteFile(handle, bytes.ptr, adjusted_len, &bytes_written, &overlapped) != 0) { + syscall.finish(); + return bytes_written; + } switch (windows.GetLastError()) { - .INVALID_USER_BUFFER => return error.SystemResources, - .NOT_ENOUGH_MEMORY => return error.SystemResources, - .OPERATION_ABORTED => return error.Canceled, - .NOT_ENOUGH_QUOTA => return error.SystemResources, - .NO_DATA => return error.BrokenPipe, - .INVALID_HANDLE => return error.NotOpenForWriting, - .LOCK_VIOLATION => return error.LockViolation, - .ACCESS_DENIED => return error.AccessDenied, - .WORKING_SET_QUOTA => return error.SystemResources, - else => |err| return windows.unexpectedError(err), + .OPERATION_ABORTED => { + try syscall.checkCancel(); + continue; + }, + .INVALID_USER_BUFFER => return syscall.fail(error.SystemResources), + .NOT_ENOUGH_MEMORY => return syscall.fail(error.SystemResources), + .NOT_ENOUGH_QUOTA => return syscall.fail(error.SystemResources), + .NO_DATA => return syscall.fail(error.BrokenPipe), + .INVALID_HANDLE => return syscall.fail(error.NotOpenForWriting), + .LOCK_VIOLATION => return syscall.fail(error.LockViolation), + .ACCESS_DENIED => return syscall.fail(error.AccessDenied), + .WORKING_SET_QUOTA => return syscall.fail(error.SystemResources), + else => |err| { + syscall.finish(); + return windows.unexpectedError(err); + }, } } - return bytes_written; } fn fileWriteStreaming( @@ -8078,25 +8432,33 @@ fn writeFileStreamingWindows( handle: windows.HANDLE, bytes: []const u8, ) File.Writer.Error!usize { - try Thread.checkCancel(); - var bytes_written: windows.DWORD = undefined; const adjusted_len = std.math.lossyCast(u32, bytes.len); - if (windows.kernel32.WriteFile(handle, bytes.ptr, adjusted_len, &bytes_written, null) == 0) { + const syscall: Syscall = try .start(); + while (true) { + if (windows.kernel32.WriteFile(handle, bytes.ptr, adjusted_len, &bytes_written, null) != 0) { + syscall.finish(); + return bytes_written; + } switch (windows.GetLastError()) { - .INVALID_USER_BUFFER => return error.SystemResources, - .NOT_ENOUGH_MEMORY => return error.SystemResources, - .OPERATION_ABORTED => return error.Canceled, - .NOT_ENOUGH_QUOTA => return error.SystemResources, - .NO_DATA => return error.BrokenPipe, - .INVALID_HANDLE => return error.NotOpenForWriting, - .LOCK_VIOLATION => return error.LockViolation, - .ACCESS_DENIED => return error.AccessDenied, - .WORKING_SET_QUOTA => return error.SystemResources, - else => |err| return windows.unexpectedError(err), + .OPERATION_ABORTED => { + try syscall.checkCancel(); + continue; + }, + .INVALID_USER_BUFFER => return syscall.fail(error.SystemResources), + .NOT_ENOUGH_MEMORY => return syscall.fail(error.SystemResources), + .NOT_ENOUGH_QUOTA => return syscall.fail(error.SystemResources), + .NO_DATA => return syscall.fail(error.BrokenPipe), + .INVALID_HANDLE => return syscall.fail(error.NotOpenForWriting), + .LOCK_VIOLATION => return syscall.fail(error.LockViolation), + .ACCESS_DENIED => return syscall.fail(error.AccessDenied), + .WORKING_SET_QUOTA => return syscall.fail(error.SystemResources), + else => |err| { + syscall.finish(); + return windows.unexpectedError(err); + }, } } - return bytes_written; } fn fileWriteFileStreaming( @@ -8716,9 +9078,7 @@ fn fileWriteFilePositional( return error.Unimplemented; } -fn nowPosix(userdata: ?*anyopaque, clock: Io.Clock) Io.Clock.Error!Io.Timestamp { - const t: *Threaded = @ptrCast(@alignCast(userdata)); - _ = t; +fn nowPosix(clock: Io.Clock) Io.Clock.Error!Io.Timestamp { const clock_id: posix.clockid_t = clockToPosix(clock); var tp: posix.timespec = undefined; switch (posix.errno(posix.system.clock_gettime(clock_id, &tp))) { @@ -8728,15 +9088,17 @@ fn nowPosix(userdata: ?*anyopaque, clock: Io.Clock) Io.Clock.Error!Io.Timestamp } } -const now = switch (native_os) { - .windows => nowWindows, - .wasi => nowWasi, - else => nowPosix, -}; - -fn nowWindows(userdata: ?*anyopaque, clock: Io.Clock) Io.Clock.Error!Io.Timestamp { +fn now(userdata: ?*anyopaque, clock: Io.Clock) Io.Clock.Error!Io.Timestamp { const t: *Threaded = @ptrCast(@alignCast(userdata)); _ = t; + return switch (native_os) { + .windows => nowWindows(clock), + .wasi => nowWasi(clock), + else => nowPosix(clock), + }; +} + +fn nowWindows(clock: Io.Clock) Io.Clock.Error!Io.Timestamp { switch (clock) { .real => { // RtlGetSystemTimePrecise() has a granularity of 100 nanoseconds @@ -8769,25 +9131,24 @@ fn nowWindows(userdata: ?*anyopaque, clock: Io.Clock) Io.Clock.Error!Io.Timestam } } -fn nowWasi(userdata: ?*anyopaque, clock: Io.Clock) Io.Clock.Error!Io.Timestamp { - const t: *Threaded = @ptrCast(@alignCast(userdata)); - _ = t; +fn nowWasi(clock: Io.Clock) Io.Clock.Error!Io.Timestamp { var ns: std.os.wasi.timestamp_t = undefined; const err = std.os.wasi.clock_time_get(clockToWasi(clock), 1, &ns); if (err != .SUCCESS) return error.Unexpected; return .fromNanoseconds(ns); } -const sleep = switch (native_os) { - .windows => sleepWindows, - .wasi => sleepWasi, - .linux => sleepLinux, - else => sleepPosix, -}; - -fn sleepLinux(userdata: ?*anyopaque, timeout: Io.Timeout) Io.SleepError!void { +fn sleep(userdata: ?*anyopaque, timeout: Io.Timeout) Io.SleepError!void { const t: *Threaded = @ptrCast(@alignCast(userdata)); - _ = t; + if (use_parking_sleep) return parking_sleep.sleep(timeout); + switch (native_os) { + .wasi => return sleepWasi(t, timeout), + .linux => return sleepLinux(timeout), + else => return sleepPosix(t, timeout), + } +} + +fn sleepLinux(timeout: Io.Timeout) Io.SleepError!void { const clock_id: posix.clockid_t = clockToPosix(switch (timeout) { .none => .awake, .duration => |d| d.clock, @@ -8824,21 +9185,7 @@ fn sleepLinux(userdata: ?*anyopaque, timeout: Io.Timeout) Io.SleepError!void { } } -fn sleepWindows(userdata: ?*anyopaque, timeout: Io.Timeout) Io.SleepError!void { - const t: *Threaded = @ptrCast(@alignCast(userdata)); - const t_io = ioBasic(t); - try Thread.checkCancel(); - const ms = ms: { - const d = (try timeout.toDurationFromNow(t_io)) orelse - break :ms std.math.maxInt(windows.DWORD); - break :ms std.math.lossyCast(windows.DWORD, d.raw.toMilliseconds()); - }; - // TODO: alertable true with checkCancel in a loop plus deadline - _ = windows.kernel32.SleepEx(ms, windows.FALSE); -} - -fn sleepWasi(userdata: ?*anyopaque, timeout: Io.Timeout) Io.SleepError!void { - const t: *Threaded = @ptrCast(@alignCast(userdata)); +fn sleepWasi(t: *Threaded, timeout: Io.Timeout) Io.SleepError!void { const t_io = ioBasic(t); const w = std.os.wasi; @@ -8867,8 +9214,7 @@ fn sleepWasi(userdata: ?*anyopaque, timeout: Io.Timeout) Io.SleepError!void { syscall.finish(); } -fn sleepPosix(userdata: ?*anyopaque, timeout: Io.Timeout) Io.SleepError!void { - const t: *Threaded = @ptrCast(@alignCast(userdata)); +fn sleepPosix(t: *Threaded, timeout: Io.Timeout) Io.SleepError!void { const t_io = ioBasic(t); const sec_type = @typeInfo(posix.timespec).@"struct".fields[0].type; const nsec_type = @typeInfo(posix.timespec).@"struct".fields[1].type; @@ -9037,93 +9383,90 @@ fn netListenIpWindows( var storage: WsaAddress = undefined; var addr_len = addressToWsa(&address, &storage); - { - const syscall: Syscall = try .start(); - while (true) { - const rc = ws2_32.bind(socket_handle, &storage.any, addr_len); - if (rc != ws2_32.SOCKET_ERROR) { - syscall.finish(); - break; - } - switch (ws2_32.WSAGetLastError()) { - .EINTR => { - try syscall.checkCancel(); - continue; - }, - .NOTINITIALISED => { - try initializeWsa(t); - try syscall.checkCancel(); - continue; - }, - else => |e| { - syscall.finish(); - switch (e) { - .ECANCELLED, .E_CANCELLED, .OPERATION_ABORTED => return error.Canceled, - .EADDRINUSE => return error.AddressInUse, - .EADDRNOTAVAIL => return error.AddressUnavailable, - .ENOTSOCK => |err| return wsaErrorBug(err), - .EFAULT => |err| return wsaErrorBug(err), - .EINVAL => |err| return wsaErrorBug(err), - .ENOBUFS => return error.SystemResources, - .ENETDOWN => return error.NetworkDown, - else => |err| return windows.unexpectedWSAError(err), - } - }, - } + var syscall: Syscall = try .start(); + while (true) { + const rc = ws2_32.bind(socket_handle, &storage.any, addr_len); + if (rc != ws2_32.SOCKET_ERROR) { + syscall.finish(); + break; } - } - { - const syscall: Syscall = try .start(); - while (true) { - const rc = ws2_32.listen(socket_handle, options.kernel_backlog); - if (rc != ws2_32.SOCKET_ERROR) { + switch (ws2_32.WSAGetLastError()) { + .NOTINITIALISED => { syscall.finish(); - break; - } - switch (ws2_32.WSAGetLastError()) { - .EINTR => { - try syscall.checkCancel(); - continue; - }, - .NOTINITIALISED => { - try initializeWsa(t); - try syscall.checkCancel(); - continue; - }, - else => |e| { - syscall.finish(); - switch (e) { - .ECANCELLED, .E_CANCELLED, .OPERATION_ABORTED => return error.Canceled, - .ENETDOWN => return error.NetworkDown, - .EADDRINUSE => return error.AddressInUse, - .EISCONN => |err| return wsaErrorBug(err), - .EINVAL => |err| return wsaErrorBug(err), - .EMFILE, .ENOBUFS => return error.SystemResources, - .ENOTSOCK => |err| return wsaErrorBug(err), - .EOPNOTSUPP => |err| return wsaErrorBug(err), - .EINPROGRESS => |err| return wsaErrorBug(err), - else => |err| return windows.unexpectedWSAError(err), - } - }, - } + try initializeWsa(t); + syscall = try .start(); + continue; + }, + .EINTR, .ECANCELLED, .E_CANCELLED, .OPERATION_ABORTED => { + try syscall.checkCancel(); + continue; + }, + else => |e| { + syscall.finish(); + switch (e) { + .EADDRINUSE => return error.AddressInUse, + .EADDRNOTAVAIL => return error.AddressUnavailable, + .ENOTSOCK => |err| return wsaErrorBug(err), + .EFAULT => |err| return wsaErrorBug(err), + .EINVAL => |err| return wsaErrorBug(err), + .ENOBUFS => return error.SystemResources, + .ENETDOWN => return error.NetworkDown, + else => |err| return windows.unexpectedWSAError(err), + } + }, } } - try wsaGetSockName(t, socket_handle, &storage.any, &addr_len); - - return .{ - .socket = .{ - .handle = socket_handle, - .address = addressFromWsa(&storage), - }, - }; -} - -fn netListenIpUnavailable( - userdata: ?*anyopaque, - address: IpAddress, - options: IpAddress.ListenOptions, -) IpAddress.ListenError!net.Server { + syscall = try .start(); + while (true) { + const rc = ws2_32.listen(socket_handle, options.kernel_backlog); + if (rc != ws2_32.SOCKET_ERROR) { + syscall.finish(); + break; + } + switch (ws2_32.WSAGetLastError()) { + .NOTINITIALISED => { + syscall.finish(); + try initializeWsa(t); + syscall = try .start(); + continue; + }, + .EINTR, .ECANCELLED, .E_CANCELLED, .OPERATION_ABORTED => { + try syscall.checkCancel(); + continue; + }, + else => |e| { + syscall.finish(); + switch (e) { + .ENETDOWN => return error.NetworkDown, + .EADDRINUSE => return error.AddressInUse, + .EISCONN => |err| return wsaErrorBug(err), + .EINVAL => |err| return wsaErrorBug(err), + .EMFILE, .ENOBUFS => return error.SystemResources, + .ENOTSOCK => |err| return wsaErrorBug(err), + .EOPNOTSUPP => |err| return wsaErrorBug(err), + .EINPROGRESS => |err| return wsaErrorBug(err), + else => |err| return windows.unexpectedWSAError(err), + } + }, + } + } + + try wsaGetSockName(t, socket_handle, &storage.any, &addr_len); + + return .{ + .socket = .{ + .handle = socket_handle, + .address = addressFromWsa(&storage), + }, + }; +} + +fn netListenIpUnavailable( + userdata: ?*anyopaque, + address: IpAddress, + options: IpAddress.ListenOptions, +) IpAddress.ListenError!net.Server { _ = userdata; _ = address; _ = options; @@ -9193,24 +9536,24 @@ fn netListenUnixWindows( var storage: WsaAddress = undefined; const addr_len = addressUnixToWsa(address, &storage); - const syscall: Syscall = try .start(); + var syscall: Syscall = try .start(); while (true) { const rc = ws2_32.bind(socket_handle, &storage.any, addr_len); if (rc != ws2_32.SOCKET_ERROR) break; switch (ws2_32.WSAGetLastError()) { - .EINTR => { - try syscall.checkCancel(); - continue; - }, .NOTINITIALISED => { + syscall.finish(); try initializeWsa(t); + syscall = try .start(); + continue; + }, + .EINTR, .ECANCELLED, .E_CANCELLED, .OPERATION_ABORTED => { try syscall.checkCancel(); continue; }, else => |e| { syscall.finish(); switch (e) { - .ECANCELLED, .E_CANCELLED, .OPERATION_ABORTED => return error.Canceled, .EADDRINUSE => return error.AddressInUse, .EADDRNOTAVAIL => return error.AddressUnavailable, .ENOTSOCK => |err| return wsaErrorBug(err), @@ -9232,15 +9575,16 @@ fn netListenUnixWindows( return socket_handle; } switch (ws2_32.WSAGetLastError()) { - .EINTR => continue, + .EINTR, .ECANCELLED, .E_CANCELLED, .OPERATION_ABORTED => continue, .NOTINITIALISED => { + syscall.finish(); try initializeWsa(t); + syscall = try .start(); continue; }, else => |e| { syscall.finish(); switch (e) { - .ECANCELLED, .E_CANCELLED, .OPERATION_ABORTED => return error.Canceled, .ENETDOWN => return error.NetworkDown, .EADDRINUSE => return error.AddressInUse, .EISCONN => |err| return wsaErrorBug(err), @@ -9469,7 +9813,7 @@ fn wsaGetSockName( addr: *ws2_32.sockaddr, addr_len: *i32, ) !void { - const syscall: Syscall = try .start(); + var syscall: Syscall = try .start(); while (true) { const rc = ws2_32.getsockname(handle, addr, addr_len); if (rc != ws2_32.SOCKET_ERROR) { @@ -9477,19 +9821,19 @@ fn wsaGetSockName( return; } switch (ws2_32.WSAGetLastError()) { - .EINTR => { + .EINTR, .ECANCELLED, .E_CANCELLED, .OPERATION_ABORTED => { try syscall.checkCancel(); continue; }, .NOTINITIALISED => { + syscall.finish(); try initializeWsa(t); - try syscall.checkCancel(); + syscall = try .start(); continue; }, else => |e| { syscall.finish(); switch (e) { - .ECANCELLED, .E_CANCELLED, .OPERATION_ABORTED => return error.Canceled, .ENETDOWN => return error.NetworkDown, .EFAULT => |err| return wsaErrorBug(err), .ENOTSOCK => |err| return wsaErrorBug(err), @@ -9530,21 +9874,30 @@ fn setSocketOption(fd: posix.fd_t, level: i32, opt_name: u32, option: u32) !void fn setSocketOptionWsa(t: *Threaded, socket: Io.net.Socket.Handle, level: i32, opt_name: u32, option: u32) !void { const o: []const u8 = @ptrCast(&option); + var syscall: Syscall = try .start(); const rc = ws2_32.setsockopt(socket, level, @bitCast(opt_name), o.ptr, @intCast(o.len)); while (true) { - if (rc != ws2_32.SOCKET_ERROR) return; + if (rc != ws2_32.SOCKET_ERROR) return syscall.finish(); switch (ws2_32.WSAGetLastError()) { - .EINTR => continue, - .ECANCELLED, .E_CANCELLED, .OPERATION_ABORTED => return error.Canceled, + .EINTR, .ECANCELLED, .E_CANCELLED, .OPERATION_ABORTED => { + try syscall.checkCancel(); + continue; + }, .NOTINITIALISED => { + syscall.finish(); try initializeWsa(t); + syscall = try .start(); continue; }, - .ENETDOWN => return error.NetworkDown, - .EFAULT => |err| return wsaErrorBug(err), - .ENOTSOCK => |err| return wsaErrorBug(err), - .EINVAL => |err| return wsaErrorBug(err), - else => |err| return windows.unexpectedWSAError(err), + .ENETDOWN => return syscall.fail(error.NetworkDown), + .EFAULT, .ENOTSOCK, .EINVAL => |err| { + syscall.finish(); + return wsaErrorBug(err); + }, + else => |err| { + syscall.finish(); + return windows.unexpectedWSAError(err); + }, } } } @@ -9592,7 +9945,7 @@ fn netConnectIpWindows( var storage: WsaAddress = undefined; var addr_len = addressToWsa(address, &storage); - const syscall: Syscall = try .start(); + var syscall: Syscall = try .start(); while (true) { const rc = ws2_32.connect(socket_handle, &storage.any, addr_len); if (rc != ws2_32.SOCKET_ERROR) { @@ -9600,19 +9953,19 @@ fn netConnectIpWindows( break; } switch (ws2_32.WSAGetLastError()) { - .EINTR => { + .EINTR, .ECANCELLED, .E_CANCELLED, .OPERATION_ABORTED => { try syscall.checkCancel(); continue; }, .NOTINITIALISED => { + syscall.finish(); try initializeWsa(t); - try syscall.checkCancel(); + syscall = try .start(); continue; }, else => |e| { syscall.finish(); switch (e) { - .ECANCELLED, .E_CANCELLED, .OPERATION_ABORTED => return error.Canceled, .EADDRNOTAVAIL => return error.AddressUnavailable, .ECONNREFUSED => return error.ConnectionRefused, .ECONNRESET => return error.ConnectionResetByPeer, @@ -9682,27 +10035,36 @@ fn netConnectUnixWindows( var storage: WsaAddress = undefined; const addr_len = addressUnixToWsa(address, &storage); + var syscall: Syscall = try .start(); while (true) { const rc = ws2_32.connect(socket_handle, &storage.any, addr_len); if (rc != ws2_32.SOCKET_ERROR) break; switch (ws2_32.WSAGetLastError()) { - .EINTR => continue, - .ECANCELLED, .E_CANCELLED, .OPERATION_ABORTED => return error.Canceled, + .EINTR, .ECANCELLED, .E_CANCELLED, .OPERATION_ABORTED => { + try syscall.checkCancel(); + continue; + }, .NOTINITIALISED => { + syscall.finish(); try initializeWsa(t); + syscall = try .start(); continue; }, - - .ECONNREFUSED => return error.FileNotFound, - .EFAULT => |err| return wsaErrorBug(err), - .EINVAL => |err| return wsaErrorBug(err), - .EISCONN => |err| return wsaErrorBug(err), - .ENOTSOCK => |err| return wsaErrorBug(err), - .EWOULDBLOCK => return error.WouldBlock, - .EACCES => return error.AccessDenied, - .ENOBUFS => return error.SystemResources, - .EAFNOSUPPORT => return error.AddressFamilyUnsupported, - else => |err| return windows.unexpectedWSAError(err), + else => |e| { + syscall.finish(); + switch (e) { + .ECONNREFUSED => return error.FileNotFound, + .EFAULT => |err| return wsaErrorBug(err), + .EINVAL => |err| return wsaErrorBug(err), + .EISCONN => |err| return wsaErrorBug(err), + .ENOTSOCK => |err| return wsaErrorBug(err), + .EWOULDBLOCK => return error.WouldBlock, + .EACCES => return error.AccessDenied, + .ENOBUFS => return error.SystemResources, + .EAFNOSUPPORT => return error.AddressFamilyUnsupported, + else => |err| return windows.unexpectedWSAError(err), + } + }, } } @@ -9756,7 +10118,7 @@ fn netBindIpWindows( var storage: WsaAddress = undefined; var addr_len = addressToWsa(address, &storage); - const syscall: Syscall = try .start(); + var syscall: Syscall = try .start(); while (true) { const rc = ws2_32.bind(socket_handle, &storage.any, addr_len); if (rc != ws2_32.SOCKET_ERROR) { @@ -9764,19 +10126,19 @@ fn netBindIpWindows( break; } switch (ws2_32.WSAGetLastError()) { - .EINTR => { + .EINTR, .ECANCELLED, .E_CANCELLED, .OPERATION_ABORTED => { try syscall.checkCancel(); continue; }, .NOTINITIALISED => { + syscall.finish(); try initializeWsa(t); - try syscall.checkCancel(); + syscall = try .start(); continue; }, else => |e| { syscall.finish(); switch (e) { - .ECANCELLED, .E_CANCELLED, .OPERATION_ABORTED => return error.Canceled, .EADDRINUSE => return error.AddressInUse, .EADDRNOTAVAIL => return error.AddressUnavailable, .ENOTSOCK => |err| return wsaErrorBug(err), @@ -9886,7 +10248,7 @@ fn openSocketWsa( const mode = posixSocketMode(options.mode); const protocol = posixProtocol(options.protocol); const flags: u32 = ws2_32.WSA_FLAG_OVERLAPPED | ws2_32.WSA_FLAG_NO_HANDLE_INHERIT; - const syscall: Syscall = try .start(); + var syscall: Syscall = try .start(); while (true) { const rc = ws2_32.WSASocketW(family, @bitCast(mode), @bitCast(protocol), null, 0, flags); if (rc != ws2_32.INVALID_SOCKET) { @@ -9894,19 +10256,19 @@ fn openSocketWsa( return rc; } switch (ws2_32.WSAGetLastError()) { - .EINTR => { + .EINTR, .ECANCELLED, .E_CANCELLED, .OPERATION_ABORTED => { try syscall.checkCancel(); continue; }, .NOTINITIALISED => { + syscall.finish(); try initializeWsa(t); - try syscall.checkCancel(); + syscall = try .start(); continue; }, else => |e| { syscall.finish(); switch (e) { - .ECANCELLED, .E_CANCELLED, .OPERATION_ABORTED => return error.Canceled, .EAFNOSUPPORT => return error.AddressFamilyUnsupported, .EMFILE => return error.ProcessFdQuotaExceeded, .ENOBUFS => return error.SystemResources, @@ -9984,7 +10346,7 @@ fn netAcceptWindows(userdata: ?*anyopaque, listen_handle: net.Socket.Handle) net const t: *Threaded = @ptrCast(@alignCast(userdata)); var storage: WsaAddress = undefined; var addr_len: i32 = @sizeOf(WsaAddress); - const syscall: Syscall = try .start(); + var syscall: Syscall = try .start(); while (true) { const rc = ws2_32.accept(listen_handle, &storage.any, &addr_len); if (rc != ws2_32.INVALID_SOCKET) { @@ -9995,19 +10357,19 @@ fn netAcceptWindows(userdata: ?*anyopaque, listen_handle: net.Socket.Handle) net } }; } switch (ws2_32.WSAGetLastError()) { - .EINTR => { + .EINTR, .ECANCELLED, .E_CANCELLED, .OPERATION_ABORTED => { try syscall.checkCancel(); continue; }, .NOTINITIALISED => { + syscall.finish(); try initializeWsa(t); - try syscall.checkCancel(); + syscall = try .start(); continue; }, else => |e| { syscall.finish(); switch (e) { - .ECANCELLED, .E_CANCELLED, .OPERATION_ABORTED => return error.Canceled, .ECONNRESET => return error.ConnectionAborted, .EFAULT => |err| return wsaErrorBug(err), .ENOTSOCK => |err| return wsaErrorBug(err), @@ -10141,48 +10503,41 @@ fn netReadWindows(userdata: ?*anyopaque, handle: net.Socket.Handle, data: [][]u8 break :b bufs; }; + var syscall: Syscall = try .start(); while (true) { - try Thread.checkCancel(); - var flags: u32 = 0; - var overlapped: windows.OVERLAPPED = std.mem.zeroes(windows.OVERLAPPED); var n: u32 = undefined; - const rc = ws2_32.WSARecv(handle, bufs.ptr, @intCast(bufs.len), &n, &flags, &overlapped, null); - if (rc != ws2_32.SOCKET_ERROR) return n; - const wsa_error: ws2_32.WinsockError = switch (ws2_32.WSAGetLastError()) { - .IO_PENDING => e: { - var result_flags: u32 = undefined; - const overlapped_rc = ws2_32.WSAGetOverlappedResult( - handle, - &overlapped, - &n, - windows.TRUE, - &result_flags, - ); - if (overlapped_rc == windows.FALSE) { - break :e ws2_32.WSAGetLastError(); - } else { - return n; - } + const rc = ws2_32.WSARecv(handle, bufs.ptr, @intCast(bufs.len), &n, &flags, null, null); + if (rc != ws2_32.SOCKET_ERROR) { + syscall.finish(); + return n; + } + switch (ws2_32.WSAGetLastError()) { + .EINTR, .ECANCELLED, .E_CANCELLED, .OPERATION_ABORTED => { + try syscall.checkCancel(); + continue; }, - else => |err| err, - }; - switch (wsa_error) { - .EINTR => continue, - .ECANCELLED, .E_CANCELLED, .OPERATION_ABORTED => return error.Canceled, .NOTINITIALISED => { + syscall.finish(); try initializeWsa(t); + syscall = try .start(); continue; }, - .ECONNRESET => return error.ConnectionResetByPeer, + .ECONNRESET => return syscall.fail(error.ConnectionResetByPeer), + .ENETDOWN => return syscall.fail(error.NetworkDown), + .ENETRESET => return syscall.fail(error.ConnectionResetByPeer), + .ENOTCONN => return syscall.fail(error.SocketUnconnected), .EFAULT => unreachable, // a pointer is not completely contained in user address space. - .EINVAL => |err| return wsaErrorBug(err), - .EMSGSIZE => |err| return wsaErrorBug(err), - .ENETDOWN => return error.NetworkDown, - .ENETRESET => return error.ConnectionResetByPeer, - .ENOTCONN => return error.SocketUnconnected, - else => |err| return windows.unexpectedWSAError(err), + + else => |err| { + syscall.finish(); + switch (err) { + .EINVAL => return wsaErrorBug(err), + .EMSGSIZE => return wsaErrorBug(err), + else => return windows.unexpectedWSAError(err), + } + }, } } } @@ -10269,7 +10624,7 @@ fn netSendOne( .controllen = @intCast(message.control.len), .flags = 0, }; - const syscall: Syscall = try .start(); + var syscall: Syscall = try .start(); while (true) { const rc = posix.system.sendmsg(handle, &msg, flags); if (is_windows) { @@ -10279,19 +10634,19 @@ fn netSendOne( return; } switch (ws2_32.WSAGetLastError()) { - .EINTR => { + .EINTR, .ECANCELLED, .E_CANCELLED, .OPERATION_ABORTED => { try syscall.checkCancel(); continue; }, .NOTINITIALISED => { + syscall.finish(); try initializeWsa(t); - try syscall.checkCancel(); + syscall = try .start(); continue; }, else => |e| { syscall.finish(); switch (e) { - .ECANCELLED, .E_CANCELLED, .OPERATION_ABORTED => return error.Canceled, .EACCES => return error.AccessDenied, .EADDRNOTAVAIL => return error.AddressUnavailable, .ECONNRESET => return error.ConnectionResetByPeer, @@ -10729,49 +11084,44 @@ fn netWriteWindows( }, }; + var syscall: Syscall = try .start(); while (true) { - try Thread.checkCancel(); - var n: u32 = undefined; - var overlapped: windows.OVERLAPPED = std.mem.zeroes(windows.OVERLAPPED); - const rc = ws2_32.WSASend(handle, &iovecs, len, &n, 0, &overlapped, null); - if (rc != ws2_32.SOCKET_ERROR) return n; - const wsa_error: ws2_32.WinsockError = switch (ws2_32.WSAGetLastError()) { - .IO_PENDING => e: { - var result_flags: u32 = undefined; - const overlapped_rc = ws2_32.WSAGetOverlappedResult( - handle, - &overlapped, - &n, - windows.TRUE, - &result_flags, - ); - if (overlapped_rc == windows.FALSE) { - break :e ws2_32.WSAGetLastError(); - } else { - return n; - } + const rc = ws2_32.WSASend(handle, &iovecs, len, &n, 0, null, null); + if (rc != ws2_32.SOCKET_ERROR) { + syscall.finish(); + return n; + } + switch (ws2_32.WSAGetLastError()) { + .IO_PENDING => unreachable, // not overlapped + .EINTR, .ECANCELLED, .E_CANCELLED, .OPERATION_ABORTED => { + try syscall.checkCancel(); + continue; }, - else => |err| err, - }; - switch (wsa_error) { - .EINTR, .ECANCELLED, .E_CANCELLED, .OPERATION_ABORTED => continue, .NOTINITIALISED => { + syscall.finish(); try initializeWsa(t); + syscall = try .start(); continue; }, - .ECONNABORTED => return error.ConnectionResetByPeer, - .ECONNRESET => return error.ConnectionResetByPeer, - .EINVAL => return error.SocketUnconnected, - .ENETDOWN => return error.NetworkDown, - .ENETRESET => return error.ConnectionResetByPeer, - .ENOBUFS => return error.SystemResources, - .ENOTCONN => return error.SocketUnconnected, - .ENOTSOCK => |err| return wsaErrorBug(err), - .EOPNOTSUPP => |err| return wsaErrorBug(err), - .ESHUTDOWN => |err| return wsaErrorBug(err), - else => |err| return windows.unexpectedWSAError(err), + .ECONNABORTED => return syscall.fail(error.ConnectionResetByPeer), + .ECONNRESET => return syscall.fail(error.ConnectionResetByPeer), + .EINVAL => return syscall.fail(error.SocketUnconnected), + .ENETDOWN => return syscall.fail(error.NetworkDown), + .ENETRESET => return syscall.fail(error.ConnectionResetByPeer), + .ENOBUFS => return syscall.fail(error.SystemResources), + .ENOTCONN => return syscall.fail(error.SocketUnconnected), + + else => |err| { + syscall.finish(); + switch (err) { + .ENOTSOCK => return wsaErrorBug(err), + .EOPNOTSUPP => return wsaErrorBug(err), + .ESHUTDOWN => return wsaErrorBug(err), + else => return windows.unexpectedWSAError(err), + } + }, } } } @@ -10872,7 +11222,6 @@ fn netShutdownPosix(userdata: ?*anyopaque, handle: net.Socket.Handle, how: net.S fn netShutdownWindows(userdata: ?*anyopaque, handle: net.Socket.Handle, how: net.ShutdownHow) net.ShutdownError!void { if (!have_networking) return error.NetworkDown; const t: *Threaded = @ptrCast(@alignCast(userdata)); - const current_thread = Thread.getCurrent(t); const wsa_how: i32 = switch (how) { .recv => ws2_32.SD_RECEIVE, @@ -10880,27 +11229,27 @@ fn netShutdownWindows(userdata: ?*anyopaque, handle: net.Socket.Handle, how: net .both => ws2_32.SD_BOTH, }; - try current_thread.beginSyscall(); + var syscall: Syscall = try .start(); while (true) { const rc = ws2_32.shutdown(handle, wsa_how); if (rc != ws2_32.SOCKET_ERROR) { - current_thread.endSyscall(); + syscall.finish(); return; } switch (ws2_32.WSAGetLastError()) { - .EINTR => { - try current_thread.checkCancel(); + .EINTR, .ECANCELLED, .E_CANCELLED, .OPERATION_ABORTED => { + try syscall.checkCancel(); continue; }, .NOTINITIALISED => { + syscall.finish(); try initializeWsa(t); - try current_thread.checkCancel(); + syscall = try .start(); continue; }, else => |e| { - current_thread.endSyscall(); + syscall.finish(); switch (e) { - .ECANCELLED, .E_CANCELLED, .OPERATION_ABORTED => return error.Canceled, .ECONNABORTED => return error.ConnectionAborted, .ECONNRESET => return error.ConnectionResetByPeer, .ENETDOWN => return error.NetworkDown, @@ -11093,18 +11442,17 @@ fn netLookupFallible( .provider = null, .next = null, }; - const cancel_handle: ?*windows.HANDLE = null; var res: *ws2_32.ADDRINFOEXW = undefined; const timeout: ?*ws2_32.timeval = null; while (true) { + // TODO: hook this up to cancelation with `Thread.Status.cancelation.blocked_windows_dns`. + // See matching TODO in `Thread.cancelAwaitable`. try Thread.checkCancel(); - // TODO make this append to the queue eagerly rather than blocking until - // the whole thing finishes - const rc: ws2_32.WinsockError = @enumFromInt(ws2_32.GetAddrInfoExW(name_w, port_w, .DNS, null, &hints, &res, timeout, null, null, cancel_handle)); + // TODO make this append to the queue eagerly rather than blocking until the whole thing finishes + const rc: ws2_32.WinsockError = @enumFromInt(ws2_32.GetAddrInfoExW(name_w, port_w, .DNS, null, &hints, &res, timeout, null, null, null)); switch (rc) { @as(ws2_32.WinsockError, @enumFromInt(0)) => break, - .EINTR => continue, - .ECANCELLED, .E_CANCELLED, .OPERATION_ABORTED => return error.Canceled, + .EINTR, .ECANCELLED, .E_CANCELLED, .OPERATION_ABORTED => continue, .NOTINITIALISED => { try initializeWsa(t); continue; @@ -11352,29 +11700,33 @@ fn processSetCurrentDir(userdata: ?*anyopaque, dir: Dir) std.process.SetCurrentD _ = t; if (is_windows) { - try Thread.checkCancel(); var dir_path_buffer: [windows.PATH_MAX_WIDE]u16 = undefined; // TODO move GetFinalPathNameByHandle logic into std.Io.Threaded and add cancel checks + try Thread.checkCancel(); const dir_path = try windows.GetFinalPathNameByHandle(dir.handle, .{}, &dir_path_buffer); const path_len_bytes = std.math.cast(u16, dir_path.len * 2) orelse return error.NameTooLong; - try Thread.checkCancel(); var nt_name: windows.UNICODE_STRING = .{ .Length = path_len_bytes, .MaximumLength = path_len_bytes, .Buffer = @constCast(dir_path.ptr), }; - switch (windows.ntdll.RtlSetCurrentDirectory_U(&nt_name)) { - .SUCCESS => return, - .OBJECT_NAME_INVALID => return error.BadPathName, - .OBJECT_NAME_NOT_FOUND => return error.FileNotFound, - .OBJECT_PATH_NOT_FOUND => return error.FileNotFound, - .NO_MEDIA_IN_DEVICE => return error.NoDevice, - .INVALID_PARAMETER => |err| return windows.statusBug(err), - .ACCESS_DENIED => return error.AccessDenied, - .OBJECT_PATH_SYNTAX_BAD => |err| return windows.statusBug(err), - .NOT_A_DIRECTORY => return error.NotDir, - else => |status| return windows.unexpectedStatus(status), - } + const syscall: Syscall = try .start(); + while (true) switch (windows.ntdll.RtlSetCurrentDirectory_U(&nt_name)) { + .SUCCESS => return syscall.finish(), + .OBJECT_NAME_INVALID => return syscall.fail(error.BadPathName), + .OBJECT_NAME_NOT_FOUND => return syscall.fail(error.FileNotFound), + .OBJECT_PATH_NOT_FOUND => return syscall.fail(error.FileNotFound), + .NO_MEDIA_IN_DEVICE => return syscall.fail(error.NoDevice), + .INVALID_PARAMETER => |err| return syscall.ntstatusBug(err), + .ACCESS_DENIED => return syscall.fail(error.AccessDenied), + .OBJECT_PATH_SYNTAX_BAD => |err| return syscall.ntstatusBug(err), + .NOT_A_DIRECTORY => return syscall.fail(error.NotDir), + .CANCELLED => { + try syscall.checkCancel(); + continue; + }, + else => |status| return syscall.unexpectedNtstatus(status), + }; } if (dir.handle == posix.AT.FDCWD) return; @@ -12185,391 +12537,6 @@ fn initializeWsa(t: *Threaded) error{ NetworkDown, Canceled }!void { fn doNothingSignalHandler(_: posix.SIG) callconv(.c) void {} -const pthreads_futex = struct { - const c = std.c; - const atomic = std.atomic; - - const Event = struct { - cond: c.pthread_cond_t, - mutex: c.pthread_mutex_t, - state: enum { empty, waiting, notified }, - - fn init(self: *Event) void { - // Use static init instead of pthread_cond/mutex_init() since this is generally faster. - self.cond = .{}; - self.mutex = .{}; - self.state = .empty; - } - - fn deinit(self: *Event) void { - // Some platforms reportedly give EINVAL for statically initialized pthread types. - const rc = c.pthread_cond_destroy(&self.cond); - assert(rc == .SUCCESS or rc == .INVAL); - - const rm = c.pthread_mutex_destroy(&self.mutex); - assert(rm == .SUCCESS or rm == .INVAL); - - self.* = undefined; - } - - fn wait(self: *Event, timeout: ?u64) error{Timeout}!void { - assert(c.pthread_mutex_lock(&self.mutex) == .SUCCESS); - defer assert(c.pthread_mutex_unlock(&self.mutex) == .SUCCESS); - - // Early return if the event was already set. - if (self.state == .notified) { - return; - } - - // Compute the absolute timeout if one was specified. - // POSIX requires that REALTIME is used by default for the pthread timedwait functions. - // This can be changed with pthread_condattr_setclock, but it's an extension and may not be available everywhere. - var ts: c.timespec = undefined; - if (timeout) |timeout_ns| { - ts = std.posix.clock_gettime(c.CLOCK.REALTIME) catch return error.Timeout; - ts.sec +|= @as(@TypeOf(ts.sec), @intCast(timeout_ns / std.time.ns_per_s)); - ts.nsec += @as(@TypeOf(ts.nsec), @intCast(timeout_ns % std.time.ns_per_s)); - - if (ts.nsec >= std.time.ns_per_s) { - ts.sec +|= 1; - ts.nsec -= std.time.ns_per_s; - } - } - - // Start waiting on the event - there can be only one thread waiting. - assert(self.state == .empty); - self.state = .waiting; - - while (true) { - // Block using either pthread_cond_wait or pthread_cond_timewait if there's an absolute timeout. - const rc = blk: { - if (timeout == null) break :blk c.pthread_cond_wait(&self.cond, &self.mutex); - break :blk c.pthread_cond_timedwait(&self.cond, &self.mutex, &ts); - }; - - // After waking up, check if the event was set. - if (self.state == .notified) { - return; - } - - assert(self.state == .waiting); - switch (rc) { - .SUCCESS => {}, - .TIMEDOUT => { - // If timed out, reset the event to avoid the set() thread doing an unnecessary signal(). - self.state = .empty; - return error.Timeout; - }, - .INVAL => recoverableOsBugDetected(), // cond, mutex, and potentially ts should all be valid - .PERM => recoverableOsBugDetected(), // mutex is locked when cond_*wait() functions are called - else => recoverableOsBugDetected(), - } - } - } - - fn set(self: *Event) void { - assert(c.pthread_mutex_lock(&self.mutex) == .SUCCESS); - defer assert(c.pthread_mutex_unlock(&self.mutex) == .SUCCESS); - - // Make sure that multiple calls to set() were not done on the same Event. - const old_state = self.state; - assert(old_state != .notified); - - // Mark the event as set and wake up the waiting thread if there was one. - // This must be done while the mutex as the wait() thread could deallocate - // the condition variable once it observes the new state, potentially causing a UAF if done unlocked. - self.state = .notified; - if (old_state == .waiting) { - assert(c.pthread_cond_signal(&self.cond) == .SUCCESS); - } - } - }; - - const Treap = std.Treap(usize, std.math.order); - const Waiter = struct { - node: Treap.Node, - prev: ?*Waiter, - next: ?*Waiter, - tail: ?*Waiter, - is_queued: bool, - event: Event, - }; - - // An unordered set of Waiters - const WaitList = struct { - top: ?*Waiter = null, - len: usize = 0, - - fn push(self: *WaitList, waiter: *Waiter) void { - waiter.next = self.top; - self.top = waiter; - self.len += 1; - } - - fn pop(self: *WaitList) ?*Waiter { - const waiter = self.top orelse return null; - self.top = waiter.next; - self.len -= 1; - return waiter; - } - }; - - const WaitQueue = struct { - fn insert(treap: *Treap, address: usize, waiter: *Waiter) void { - // prepare the waiter to be inserted. - waiter.next = null; - waiter.is_queued = true; - - // Find the wait queue entry associated with the address. - // If there isn't a wait queue on the address, this waiter creates the queue. - var entry = treap.getEntryFor(address); - const entry_node = entry.node orelse { - waiter.prev = null; - waiter.tail = waiter; - entry.set(&waiter.node); - return; - }; - - // There's a wait queue on the address; get the queue head and tail. - const head: *Waiter = @fieldParentPtr("node", entry_node); - const tail = head.tail orelse unreachable; - - // Push the waiter to the tail by replacing it and linking to the previous tail. - head.tail = waiter; - tail.next = waiter; - waiter.prev = tail; - } - - fn remove(treap: *Treap, address: usize, max_waiters: usize) WaitList { - // Find the wait queue associated with this address and get the head/tail if any. - var entry = treap.getEntryFor(address); - var queue_head: ?*Waiter = if (entry.node) |node| @fieldParentPtr("node", node) else null; - const queue_tail = if (queue_head) |head| head.tail else null; - - // Once we're done updating the head, fix it's tail pointer and update the treap's queue head as well. - defer entry.set(blk: { - const new_head = queue_head orelse break :blk null; - new_head.tail = queue_tail; - break :blk &new_head.node; - }); - - var removed = WaitList{}; - while (removed.len < max_waiters) { - // dequeue and collect waiters from their wait queue. - const waiter = queue_head orelse break; - queue_head = waiter.next; - removed.push(waiter); - - // When dequeueing, we must mark is_queued as false. - // This ensures that a waiter which calls tryRemove() returns false. - assert(waiter.is_queued); - waiter.is_queued = false; - } - - return removed; - } - - fn tryRemove(treap: *Treap, address: usize, waiter: *Waiter) bool { - if (!waiter.is_queued) { - return false; - } - - queue_remove: { - // Find the wait queue associated with the address. - var entry = blk: { - // A waiter without a previous link means it's the queue head that's in the treap so we can avoid lookup. - if (waiter.prev == null) { - assert(waiter.node.key == address); - break :blk treap.getEntryForExisting(&waiter.node); - } - break :blk treap.getEntryFor(address); - }; - - // The queue head and tail must exist if we're removing a queued waiter. - const head: *Waiter = @fieldParentPtr("node", entry.node orelse unreachable); - const tail = head.tail orelse unreachable; - - // A waiter with a previous link is never the head of the queue. - if (waiter.prev) |prev| { - assert(waiter != head); - prev.next = waiter.next; - - // A waiter with both a previous and next link is in the middle. - // We only need to update the surrounding waiter's links to remove it. - if (waiter.next) |next| { - assert(waiter != tail); - next.prev = waiter.prev; - break :queue_remove; - } - - // A waiter with a previous but no next link means it's the tail of the queue. - // In that case, we need to update the head's tail reference. - assert(waiter == tail); - head.tail = waiter.prev; - break :queue_remove; - } - - // A waiter with no previous link means it's the queue head of queue. - // We must replace (or remove) the head waiter reference in the treap. - assert(waiter == head); - entry.set(blk: { - const new_head = waiter.next orelse break :blk null; - new_head.tail = head.tail; - break :blk &new_head.node; - }); - } - - // Mark the waiter as successfully removed. - waiter.is_queued = false; - return true; - } - }; - - const Bucket = struct { - mutex: c.pthread_mutex_t align(atomic.cache_line) = .{}, - pending: atomic.Value(usize) = atomic.Value(usize).init(0), - treap: Treap = .{}, - - // Global array of buckets that addresses map to. - // Bucket array size is pretty much arbitrary here, but it must be a power of two for fibonacci hashing. - var buckets = [_]Bucket{.{}} ** @bitSizeOf(usize); - - // https://github.com/Amanieu/parking_lot/blob/1cf12744d097233316afa6c8b7d37389e4211756/core/src/parking_lot.rs#L343-L353 - fn from(address: usize) *Bucket { - // The upper `@bitSizeOf(usize)` bits of the fibonacci golden ratio. - // Hashing this via (h * k) >> (64 - b) where k=golden-ration and b=bitsize-of-array - // evenly lays out h=hash values over the bit range even when the hash has poor entropy (identity-hash for pointers). - const max_multiplier_bits = @bitSizeOf(usize); - const fibonacci_multiplier = 0x9E3779B97F4A7C15 >> (64 - max_multiplier_bits); - - const max_bucket_bits = @ctz(buckets.len); - comptime assert(std.math.isPowerOfTwo(buckets.len)); - - const index = (address *% fibonacci_multiplier) >> (max_multiplier_bits - max_bucket_bits); - return &buckets[index]; - } - }; - - const Address = struct { - fn from(ptr: *const u32) usize { - // Get the alignment of the pointer. - const alignment = @alignOf(atomic.Value(u32)); - comptime assert(std.math.isPowerOfTwo(alignment)); - - // Make sure the pointer is aligned, - // then cut off the zero bits from the alignment to get the unique address. - const addr = @intFromPtr(ptr); - assert(addr & (alignment - 1) == 0); - return addr >> @ctz(@as(usize, alignment)); - } - }; - - fn wait(ptr: *const u32, expect: u32, timeout: ?u64) error{Timeout}!void { - const address = Address.from(ptr); - const bucket = Bucket.from(address); - - // Announce that there's a waiter in the bucket before checking the ptr/expect condition. - // If the announcement is reordered after the ptr check, the waiter could deadlock: - // - // - T1: checks ptr == expect which is true - // - T2: updates ptr to != expect - // - T2: does Futex.wake(), sees no pending waiters, exits - // - T1: bumps pending waiters (was reordered after the ptr == expect check) - // - T1: goes to sleep and misses both the ptr change and T2's wake up - // - // acquire barrier to ensure the announcement happens before the ptr check below. - var pending = bucket.pending.fetchAdd(1, .acquire); - assert(pending < std.math.maxInt(usize)); - - // If the wait gets canceled, remove the pending count we previously added. - // This is done outside the mutex lock to keep the critical section short in case of contention. - var canceled = false; - defer if (canceled) { - pending = bucket.pending.fetchSub(1, .monotonic); - assert(pending > 0); - }; - - var waiter: Waiter = undefined; - { - assert(c.pthread_mutex_lock(&bucket.mutex) == .SUCCESS); - defer assert(c.pthread_mutex_unlock(&bucket.mutex) == .SUCCESS); - - canceled = @atomicLoad(u32, ptr, .monotonic) != expect; - if (canceled) { - return; - } - - waiter.event.init(); - WaitQueue.insert(&bucket.treap, address, &waiter); - } - - defer { - assert(!waiter.is_queued); - waiter.event.deinit(); - } - - waiter.event.wait(timeout) catch { - // If we fail to cancel after a timeout, it means a wake() thread - // dequeued us and will wake us up. We must wait until the event is - // set as that's a signal that the wake() thread won't access the - // waiter memory anymore. If we return early without waiting, the - // waiter on the stack would be invalidated and the wake() thread - // risks a UAF. - defer if (!canceled) waiter.event.wait(null) catch unreachable; - - assert(c.pthread_mutex_lock(&bucket.mutex) == .SUCCESS); - defer assert(c.pthread_mutex_unlock(&bucket.mutex) == .SUCCESS); - - canceled = WaitQueue.tryRemove(&bucket.treap, address, &waiter); - if (canceled) { - return error.Timeout; - } - }; - } - - fn wake(ptr: *const u32, max_waiters: u32) void { - const address = Address.from(ptr); - const bucket = Bucket.from(address); - - // Quick check if there's even anything to wake up. - // The change to the ptr's value must happen before we check for pending waiters. - // If not, the wake() thread could miss a sleeping waiter and have it deadlock: - // - // - T2: p = has pending waiters (reordered before the ptr update) - // - T1: bump pending waiters - // - T1: if ptr == expected: sleep() - // - T2: update ptr != expected - // - T2: p is false from earlier so doesn't wake (T1 missed ptr update and T2 missed T1 sleeping) - // - // What we really want here is a Release load, but that doesn't exist under the C11 memory model. - // We could instead do `bucket.pending.fetchAdd(0, Release) == 0` which achieves effectively the same thing, - // LLVM lowers the fetchAdd(0, .release) into an mfence+load which avoids gaining ownership of the cache-line. - if (bucket.pending.fetchAdd(0, .release) == 0) { - return; - } - - // Keep a list of all the waiters notified and wake then up outside the mutex critical section. - var notified = WaitList{}; - defer if (notified.len > 0) { - const pending = bucket.pending.fetchSub(notified.len, .monotonic); - assert(pending >= notified.len); - - while (notified.pop()) |waiter| { - assert(!waiter.is_queued); - waiter.event.set(); - } - }; - - assert(c.pthread_mutex_lock(&bucket.mutex) == .SUCCESS); - defer assert(c.pthread_mutex_unlock(&bucket.mutex) == .SUCCESS); - - // Another pending check again to avoid the WaitQueue lookup if not necessary. - if (bucket.pending.load(.monotonic) > 0) { - notified = WaitQueue.remove(&bucket.treap, address, max_waiters); - } - } -}; - fn scanEnviron(t: *Threaded) void { t.mutex.lock(); defer t.mutex.unlock(); @@ -12688,3 +12655,459 @@ fn scanEnviron(t: *Threaded) void { test { _ = @import("Threaded/test.zig"); } + +const use_parking_futex = switch (builtin.target.os.tag) { + .windows => true, // RtlWaitOnAddress is a userland implementation anyway + .netbsd => true, // NetBSD has `futex(2)`, but it's historically been quite buggy. TODO: evaluate whether it's okay to use now. + .illumos => true, // Illumos has no futex mechanism + else => false, +}; +const use_parking_sleep = switch (builtin.target.os.tag) { + // On Windows, we can implement sleep either with `NtDelayExecution` (which is how `SleepEx` in + // kernel32 works) or `NtWaitForAlertByThreadId` (thread parking). We're already using the + // latter for futex, so we may as well use it for sleeping too, to maximise code reuse. I'm + // also more confident that it will always correctly handle the cancelation race (so "unpark" + // before "park" causes "park" to return immediately): it *seems* like alertable sleeps paired + // with `NtAlertThread` do actually do this too, but there could be some caveat (e.g. it might + // fail under some specific condition), whereas `NtWaitForAlertByThreadId` must reliably trigger + // this behavior because `RtlWaitOnAddress` relies on it. + .windows => true, + + // These targets have `_lwp_park`, which is superior to POSIX nanosleep because it has a better + // cancelation mechanism. + .netbsd, + .illumos, + => true, + + else => false, +}; + +const parking_futex = struct { + comptime { + assert(use_parking_futex); + } + + const Bucket = struct { + /// Used as a fast check for `wake` to avoid having to acquire `mutex` to discover there are no + /// waiters. It is important for `wait` to increment this *before* checking the futex value to + /// avoid a race. + num_waiters: std.atomic.Value(u32), + /// Protects `waiters`. + mutex: std.Thread.Mutex, + waiters: std.DoublyLinkedList, + + /// Prevent false sharing between buckets. + _: void align(std.atomic.cache_line) = {}, + + const init: Bucket = .{ .num_waiters = .init(0), .mutex = .{}, .waiters = .{} }; + }; + + const Waiter = struct { + node: std.DoublyLinkedList.Node, + address: usize, + tid: std.Thread.Id, + /// `thread_status.cancelation` is `.parked` while the thread is waiting. The single thread + /// which atomically updates it (to `.none` or `.canceling`) is responsible for: + /// + /// * Removing the `Waiter` from `Bucket.waiters` + /// * Decrementing `Bucket.num_waiters` + /// * Unparking the thread (*after* the above, so that the `Waiter` does not go out of scope + /// while it is still in the `Bucket`). + thread_status: *std.atomic.Value(Thread.Status), + }; + + fn bucketForAddress(address: usize) *Bucket { + const global = struct { + /// Length must be a power of two. The longer this array, the less likely contention is + /// between different futexes. This length seems like it'll provide a reasonable balance + /// between contention and memory usage: assuming a 128-byte `Bucket` (due to cache line + /// alignment), this uses 32 KiB of memory. + var buckets: [256]Bucket = @splat(.init); + }; + + // Here we use Fibonacci hashing: the golden ratio can be used to evenly redistribute input + // values across a range, giving a poor, but extremely quick to compute, hash. + + // This literal is the rounded value of '2^64 / phi' (where 'phi' is the golden ratio). The + // shift then converts it to '2^b / phi', where 'b' is the pointer bit width. + const fibonacci_multiplier = 0x9E3779B97F4A7C15 >> (64 - @bitSizeOf(usize)); + const hashed = address *% fibonacci_multiplier; + + comptime assert(std.math.isPowerOfTwo(global.buckets.len)); + // The high bits of `hashed` have better entropy than the low bits. + const index = hashed >> (@bitSizeOf(usize) - @ctz(global.buckets.len)); + + return &global.buckets[index]; + } + + fn wait(ptr: *const u32, expect: u32, uncancelable: bool, timeout: Io.Timeout) Io.Cancelable!void { + const bucket = bucketForAddress(@intFromPtr(ptr)); + + // Put the threadlocal access outside of the critical section. + const opt_thread = Thread.current; + const self_tid = if (opt_thread) |thread| thread.id else std.Thread.getCurrentId(); + + var waiter: Waiter = .{ + .node = undefined, // populated by list append + .address = @intFromPtr(ptr), + .tid = self_tid, + .thread_status = undefined, // populated in critical section + }; + + var status_buf: std.atomic.Value(Thread.Status) = undefined; + + { + bucket.mutex.lock(); + defer bucket.mutex.unlock(); + + _ = bucket.num_waiters.fetchAdd(1, .acquire); + + if (@atomicLoad(u32, ptr, .monotonic) != expect) { + assert(bucket.num_waiters.fetchSub(1, .monotonic) > 0); + return; + } + + // This is in the critical section to avoid marking the thread as parked until we're + // certain that we're actually going to park. + waiter.thread_status = status: { + cancelable: { + if (uncancelable) break :cancelable; + const thread = opt_thread orelse break :cancelable; + switch (thread.cancel_protection) { + .blocked => break :cancelable, + .unblocked => {}, + } + thread.futex_waiter = &waiter; + const old_status = thread.status.fetchOr( + .{ .cancelation = @enumFromInt(0b001), .awaitable = .null }, + .release, // release `thread.futex_waiter` + ); + switch (old_status.cancelation) { + .none => {}, // status is now `.parked` + .canceling => { + // status is now `.canceled` + assert(bucket.num_waiters.fetchSub(1, .monotonic) > 0); + return error.Canceled; + }, + .canceled => break :cancelable, // status is still `.canceled` + .parked => unreachable, + .blocked => unreachable, + .blocked_windows_dns => unreachable, + .blocked_canceling => unreachable, + } + // We could now be unparked for a cancelation at any time! + break :status &thread.status; + } + // This is an uncancelable wait, so just use `status_buf`. Note that the value of + // `status_buf.awaitable` is irrelevant because this is only visible to futex code, + // while only cancelation cares about `awaitable`. + status_buf.raw = .{ .cancelation = .parked, .awaitable = .null }; + break :status &status_buf; + }; + + bucket.waiters.append(&waiter.node); + } + + if (park(timeout, ptr)) { + // We were unparked by either `wake` or cancelation, so our current status is either + // `.none` or `.canceling`. In either case, they've already removed `waiter` from + // `bucket`, so we have nothing more to do! + } else |err| switch (err) { + error.Timeout => { + // We're not out of the woods yet: an unpark could race with the timeout. + const old_status = waiter.thread_status.fetchAnd( + .{ .cancelation = @enumFromInt(0b110), .awaitable = .all_ones }, + .monotonic, + ); + switch (old_status.cancelation) { + .parked => { + // No race. It is our responsibility to remove `waiter` from `bucket`. + // New status is `.none`. + bucket.mutex.lock(); + defer bucket.mutex.unlock(); + bucket.waiters.remove(&waiter.node); + assert(bucket.num_waiters.fetchSub(1, .monotonic) > 0); + }, + .none, .canceling => { + // Race condition: the timeout was reached, then `wake` or a canceler tried + // to unpark us. Whoever did that will remove us from `bucket`. Wait for + // that (and drop the unpark request in doing so). + // New status is `.none` or `.canceling` respectively. + park(.none, ptr) catch |e| switch (e) { + error.Timeout => unreachable, + }; + }, + .canceled => unreachable, + .blocked => unreachable, + .blocked_windows_dns => unreachable, + .blocked_canceling => unreachable, + } + }, + } + } + + fn wake(ptr: *const u32, max_waiters: u32) void { + if (max_waiters == 0) return; + + const bucket = bucketForAddress(@intFromPtr(ptr)); + + // To ensure the store to `ptr` is ordered before this check, we effectively want a `.release` + // load, but that doesn't exist in the C11 memory model, so emulate it with a non-mutating rmw. + if (bucket.num_waiters.fetchAdd(0, .release) == 0) { + @branchHint(.likely); + return; // no waiters + } + + // Waiters removed from the linked list under the mutex so we can unpark their threads outside + // of the critical section. This forms a singly-linked list of waiters using `Waiter.node.next`. + var waking_head: ?*std.DoublyLinkedList.Node = null; + { + bucket.mutex.lock(); + defer bucket.mutex.unlock(); + + var num_removed: u32 = 0; + var it = bucket.waiters.first; + while (num_removed < max_waiters) { + const waiter: *Waiter = @fieldParentPtr("node", it orelse break); + it = waiter.node.next; + if (waiter.address != @intFromPtr(ptr)) continue; + const old_status = waiter.thread_status.fetchAnd( + .{ .cancelation = @enumFromInt(0b110), .awaitable = .all_ones }, + .monotonic, + ); + switch (old_status.cancelation) { + .parked => {}, // state updated to `.none` + .none => unreachable, // if another `wake` call is unparking this thread, it should have removed it from the list + .canceling => continue, // race with a canceler who hasn't called `removeCanceledWaiter` yet + .canceled => unreachable, + .blocked => unreachable, + .blocked_windows_dns => unreachable, + .blocked_canceling => unreachable, + } + // We're waking this waiter. Remove them from the bucket and add them to our local list. + bucket.waiters.remove(&waiter.node); + waiter.node.next = waking_head; + waking_head = &waiter.node; + num_removed += 1; + // Signal to `waiter` that they're about to be unparked, in case we're racing with their + // timeout. See corresponding logic in `wake`. + waiter.address = 0; + } + + _ = bucket.num_waiters.fetchSub(num_removed, .monotonic); + } + + var unpark_buf: [128]UnparkTid = undefined; + var unpark_len: usize = 0; + + // Finally, unpark the threads. + while (waking_head) |node| { + waking_head = node.next; + const waiter: *Waiter = @fieldParentPtr("node", node); + unpark_buf[unpark_len] = waiter.tid; + unpark_len += 1; + if (unpark_len == unpark_buf.len) { + unpark(&unpark_buf, ptr); + unpark_len = 0; + } + } + if (unpark_len > 0) { + unpark(unpark_buf[0..unpark_len], ptr); + } + } + + fn removeCanceledWaiter(waiter: *Waiter) void { + const bucket = bucketForAddress(waiter.address); + bucket.mutex.lock(); + defer bucket.mutex.unlock(); + bucket.waiters.remove(&waiter.node); + assert(bucket.num_waiters.fetchSub(1, .monotonic) > 0); + } +}; +const parking_sleep = struct { + comptime { + assert(use_parking_sleep); + } + fn sleep(timeout: Io.Timeout) Io.Cancelable!void { + const opt_thread = Thread.current; + cancelable: { + const thread = opt_thread orelse break :cancelable; + switch (thread.cancel_protection) { + .blocked => break :cancelable, + .unblocked => {}, + } + thread.futex_waiter = null; + { + const old_status = thread.status.fetchOr( + .{ .cancelation = @enumFromInt(0b001), .awaitable = .null }, + .release, // release `thread.futex_waiter` + ); + switch (old_status.cancelation) { + .none => {}, // status is now `.parked` + .canceling => return error.Canceled, // status is now `.canceled` + .canceled => break :cancelable, // status is still `.canceled` + .parked => unreachable, + .blocked => unreachable, + .blocked_windows_dns => unreachable, + .blocked_canceling => unreachable, + } + } + if (park(timeout, null)) { + // The only reason this could possibly happen is cancelation. + const old_status = thread.status.load(.monotonic); + assert(old_status.cancelation == .canceling); + thread.status.store( + .{ .cancelation = .canceled, .awaitable = old_status.awaitable }, + .monotonic, + ); + return error.Canceled; + } else |err| switch (err) { + error.Timeout => { + // We're not out of the woods yet: an unpark could race with the timeout. + const old_status = thread.status.fetchAnd( + .{ .cancelation = @enumFromInt(0b110), .awaitable = .all_ones }, + .monotonic, + ); + switch (old_status.cancelation) { + .parked => return, // No race; new status is `.none` + .canceling => { + // Race condition: the timeout was reached, then someone tried to unpark + // us for a cancelation. Whoever did that will have called `unpark`, so + // drop that unpark request by waiting for it. + // Status is still `.canceling`. + park(.none, null) catch |e| switch (e) { + error.Timeout => unreachable, + }; + return; + }, + .none => unreachable, + .canceled => unreachable, + .blocked => unreachable, + .blocked_windows_dns => unreachable, + .blocked_canceling => unreachable, + } + }, + } + } + // Uncancelable sleep; we expect not to be manually unparked. + if (park(timeout, null)) { + unreachable; // unexpected unpark + } else |err| switch (err) { + error.Timeout => return, + } + } +}; + +/// `addr_hint` has no semantic effect, but may allow the OS to optimize this operation. +fn park(timeout: Io.Timeout, addr_hint: ?*const anyopaque) error{Timeout}!void { + comptime assert(use_parking_futex or use_parking_sleep); + switch (builtin.target.os.tag) { + .windows => { + var timeout_buf: windows.LARGE_INTEGER = undefined; + const raw_timeout: ?*windows.LARGE_INTEGER = timeout: switch (timeout) { + .none => null, + .deadline => |timestamp| continue :timeout .{ .duration = .{ + .clock = timestamp.clock, + .raw = (nowWindows(timestamp.clock) catch unreachable).durationTo(timestamp.raw), + } }, + .duration => |duration| { + _ = duration.clock; // Windows only supports monotonic + timeout_buf = @intCast(@divTrunc(-duration.raw.nanoseconds, 100)); + break :timeout &timeout_buf; + }, + }; + // `RtlWaitOnAddress` passes the futex address in as the first argument to this call, + // but it's unclear what that actually does, especially since `NtAlertThreadByThreadId` + // does *not* accept the address so the kernel can't really be using it as a hint. An + // old Microsoft blog post discusses a more traditional futex-like mechanism in the + // kernel which definitely isn't how `RtlWaitOnAddress` works today: + // + // https://devblogs.microsoft.com/oldnewthing/20160826-00/?p=94185 + // + // ...so it's possible this argument is simply a remnant which no longer does anything + // (perhaps the implementation changed during development but someone forgot to remove + // this parameter). However, to err on the side of caution, let's match the behavior of + // `RtlWaitOnAddress` and pass the pointer, in case the kernel ever does something + // stupid such as trying to dereference it. + switch (windows.ntdll.NtWaitForAlertByThreadId(addr_hint, raw_timeout)) { + .ALERTED => return, + .TIMEOUT => return error.Timeout, + else => unreachable, + } + }, + .netbsd => { + var ts_buf: posix.timespec = undefined; + const ts: ?*posix.timespec, const abstime: bool, const clock_real: bool = switch (timeout) { + .none => .{ null, false, false }, + .deadline => |timestamp| timeout: { + ts_buf = timestampToPosix(timestamp.raw.nanoseconds); + break :timeout .{ &ts_buf, true, timestamp.clock == .real }; + }, + .duration => |duration| timeout: { + ts_buf = timestampToPosix(duration.raw.nanoseconds); + break :timeout .{ &ts_buf, false, duration.clock == .real }; + }, + }; + switch (posix.errno(std.c._lwp_park( + if (clock_real) .REALTIME else .MONOTONIC, + .{ .ABSTIME = abstime }, + ts, + 0, + addr_hint, + null, + ))) { + .SUCCESS, .ALREADY, .INTR => return, + .TIMEDOUT => return error.Timeout, + .INVAL => unreachable, + .SRCH => unreachable, + else => unreachable, + } + }, + .illumos => @panic("TODO: illumos lwp_park"), + else => comptime unreachable, + } +} + +const UnparkTid = switch (builtin.target.os.tag) { + // `NtAlertMultipleThreadByThreadId` is weird and wants 64-bit thread handles? + .windows => usize, + else => std.Thread.Id, +}; +/// `addr_hint` has no semantic effect, but may allow the OS to optimize this operation. +fn unpark(tids: []const UnparkTid, addr_hint: ?*const anyopaque) void { + comptime assert(use_parking_futex or use_parking_sleep); + switch (builtin.target.os.tag) { + .windows => { + // TODO: this condition is currently disabled because mingw-w64 does not contain this + // symbol. Once it's added, enable this check to use the new bulk API where possible. + if (false and (builtin.os.version_range.windows.isAtLeast(.win11_dt) orelse false)) { + _ = windows.ntdll.NtAlertMultipleThreadByThreadId(tids.ptr, @intCast(tids.len), null, null); + } else { + for (tids) |tid| { + _ = windows.ntdll.NtAlertThreadByThreadId(@intCast(tid)); + } + } + }, + .netbsd => { + switch (posix.errno(std.c._lwp_unpark_all(@ptrCast(tids.ptr), tids.len, addr_hint))) { + .SUCCESS => return, + // For errors, fall through to a loop over `tids`, though this is only expected to + // be possible for ENOMEM (and even that is questionable). + .SRCH => recoverableOsBugDetected(), + .FAULT => recoverableOsBugDetected(), + .INVAL => recoverableOsBugDetected(), + .NOMEM => {}, + else => recoverableOsBugDetected(), + } + for (tids) |tid| { + switch (posix.errno(std.c._lwp_unpark(@bitCast(tid), addr_hint))) { + .SUCCESS => {}, + .SRCH => recoverableOsBugDetected(), + else => recoverableOsBugDetected(), + } + } + }, + .illumos => @panic("TODO: illumos lwp_unpark"), + else => comptime unreachable, + } +} diff --git a/lib/std/c.zig b/lib/std/c.zig @@ -11399,6 +11399,9 @@ pub const vm_region_flavor_t = darwin.vm_region_flavor_t; pub const _ksiginfo = netbsd._ksiginfo; pub const _lwp_self = netbsd._lwp_self; +pub const _lwp_park = netbsd._lwp_park; +pub const _lwp_unpark = netbsd._lwp_unpark; +pub const _lwp_unpark_all = netbsd._lwp_unpark_all; pub const lwpid_t = netbsd.lwpid_t; pub const lwp_gettid = dragonfly.lwp_gettid; diff --git a/lib/std/c/netbsd.zig b/lib/std/c/netbsd.zig @@ -1,17 +1,35 @@ const std = @import("../std.zig"); const clock_t = std.c.clock_t; +const clockid_t = std.c.clockid_t; const pid_t = std.c.pid_t; const pthread_t = std.c.pthread_t; const sigval_t = std.c.sigval_t; const uid_t = std.c.uid_t; +const timespec = std.c.timespec; pub extern "c" fn ptrace(request: c_int, pid: pid_t, addr: ?*anyopaque, data: c_int) c_int; pub const lwpid_t = i32; -pub extern "c" fn _lwp_self() lwpid_t; pub extern "c" fn pthread_setname_np(thread: pthread_t, name: [*:0]const u8, arg: ?*anyopaque) c_int; +pub extern "c" fn _lwp_self() lwpid_t; + +pub extern "c" fn _lwp_park( + clock_id: clockid_t, + flags: packed struct(u32) { + ABSTIME: bool = false, + unused: u31 = 0, + }, + ts: ?*timespec, + unpark: lwpid_t, + hint: ?*const anyopaque, + unpark_hint: ?*const anyopaque, +) c_int; + +pub extern "c" fn _lwp_unpark(lwp: lwpid_t, hint: ?*const anyopaque) c_int; +pub extern "c" fn _lwp_unpark_all(targets: [*]const lwpid_t, ntargets: usize, hint: ?*const anyopaque) c_int; + pub const TCIFLUSH = 1; pub const TCOFLUSH = 2; pub const TCIOFLUSH = 3; diff --git a/lib/std/debug/SelfInfo/Windows.zig b/lib/std/debug/SelfInfo/Windows.zig @@ -315,8 +315,7 @@ const Module = struct { ); if (len == 0) return error.MissingDebugInfo; const name_w = name_buffer[0 .. len + 4 :0]; - // TODO eliminate the reference to Io.Threaded.global_single_threaded here - const coff_file = Io.Threaded.global_single_threaded.dirOpenFileWtf16(null, name_w, .{}) catch |err| switch (err) { + const coff_file = Io.Threaded.dirOpenFileWtf16(null, name_w, .{}) catch |err| switch (err) { error.Canceled => |e| return e, error.Unexpected => |e| return e, error.FileNotFound => return error.MissingDebugInfo, diff --git a/lib/std/os/windows.zig b/lib/std/os/windows.zig @@ -2253,7 +2253,7 @@ pub fn GetProcessHeap() ?*HEAP { pub const OBJECT_ATTRIBUTES = extern struct { Length: ULONG, RootDirectory: ?HANDLE, - ObjectName: *UNICODE_STRING, + ObjectName: ?*UNICODE_STRING, Attributes: ATTRIBUTES, SecurityDescriptor: ?*anyopaque, SecurityQualityOfService: ?*anyopaque, @@ -2306,6 +2306,7 @@ pub const OpenError = error{ NetworkNotFound, AntivirusInterference, BadPathName, + OperationCanceled, }; pub const OpenFileOptions = struct { @@ -2405,6 +2406,7 @@ pub fn OpenFile(sub_path_w: []const u16, options: OpenFileOptions) OpenError!HAN continue; }, .VIRUS_INFECTED, .VIRUS_DELETED => return error.AntivirusInterference, + .CANCELLED => return error.OperationCanceled, else => return unexpectedStatus(rc), } } @@ -2985,6 +2987,7 @@ pub const ReadLinkError = error{ AntivirusInterference, UnsupportedReparsePointType, NotLink, + OperationCanceled, }; /// `sub_path_w` will never be accessed after `out_buffer` has been written to, so it @@ -3015,6 +3018,7 @@ pub fn ReadLink(dir: ?HANDLE, sub_path_w: []const u16, out_buffer: []u16) ReadLi const rc = DeviceIoControl(result_handle, FSCTL.GET_REPARSE_POINT, .{ .out = reparse_buf[0..] }); switch (rc) { .SUCCESS => {}, + .CANCELLED => return error.OperationCanceled, .NOT_A_REPARSE_POINT => return error.NotLink, else => return unexpectedStatus(rc), } @@ -3339,71 +3343,6 @@ pub fn GetStdHandle(handle_id: DWORD) GetStdHandleError!HANDLE { return handle; } -pub const SetFilePointerError = error{ - Unseekable, - Unexpected, -}; - -/// The SetFilePointerEx function with the `dwMoveMethod` parameter set to `FILE_BEGIN`. -pub fn SetFilePointerEx_BEGIN(handle: HANDLE, offset: u64) SetFilePointerError!void { - // "The starting point is zero or the beginning of the file. If [FILE_BEGIN] - // is specified, then the liDistanceToMove parameter is interpreted as an unsigned value." - // https://docs.microsoft.com/en-us/windows/desktop/api/fileapi/nf-fileapi-setfilepointerex - const ipos = @as(LARGE_INTEGER, @bitCast(offset)); - if (kernel32.SetFilePointerEx(handle, ipos, null, FILE_BEGIN) == 0) { - switch (GetLastError()) { - .INVALID_FUNCTION => return error.Unseekable, - .NEGATIVE_SEEK => return error.Unseekable, - .INVALID_PARAMETER => unreachable, - .INVALID_HANDLE => unreachable, - else => |err| return unexpectedError(err), - } - } -} - -/// The SetFilePointerEx function with the `dwMoveMethod` parameter set to `FILE_CURRENT`. -pub fn SetFilePointerEx_CURRENT(handle: HANDLE, offset: i64) SetFilePointerError!void { - if (kernel32.SetFilePointerEx(handle, offset, null, FILE_CURRENT) == 0) { - switch (GetLastError()) { - .INVALID_FUNCTION => return error.Unseekable, - .NEGATIVE_SEEK => return error.Unseekable, - .INVALID_PARAMETER => unreachable, - .INVALID_HANDLE => unreachable, - else => |err| return unexpectedError(err), - } - } -} - -/// The SetFilePointerEx function with the `dwMoveMethod` parameter set to `FILE_END`. -pub fn SetFilePointerEx_END(handle: HANDLE, offset: i64) SetFilePointerError!void { - if (kernel32.SetFilePointerEx(handle, offset, null, FILE_END) == 0) { - switch (GetLastError()) { - .INVALID_FUNCTION => return error.Unseekable, - .NEGATIVE_SEEK => return error.Unseekable, - .INVALID_PARAMETER => unreachable, - .INVALID_HANDLE => unreachable, - else => |err| return unexpectedError(err), - } - } -} - -/// The SetFilePointerEx function with parameters to get the current offset. -pub fn SetFilePointerEx_CURRENT_get(handle: HANDLE) SetFilePointerError!u64 { - var result: LARGE_INTEGER = undefined; - if (kernel32.SetFilePointerEx(handle, 0, &result, FILE_CURRENT) == 0) { - switch (GetLastError()) { - .INVALID_FUNCTION => return error.Unseekable, - .NEGATIVE_SEEK => return error.Unseekable, - .INVALID_PARAMETER => unreachable, - .INVALID_HANDLE => unreachable, - else => |err| return unexpectedError(err), - } - } - // Based on the docs for FILE_BEGIN, it seems that the returned signed integer - // should be interpreted as an unsigned integer. - return @as(u64, @bitCast(result)); -} - pub const QueryObjectNameError = error{ AccessDenied, InvalidHandle, @@ -3562,6 +3501,7 @@ pub fn GetFinalPathNameByHandle( error.NetworkNotFound => return error.Unexpected, error.AntivirusInterference => return error.Unexpected, error.BadPathName => return error.Unexpected, + error.OperationCanceled => @panic("TODO: better integrate cancelation"), else => |e| return e, }; defer CloseHandle(mgmt_handle); diff --git a/lib/std/os/windows/ntdll.zig b/lib/std/os/windows/ntdll.zig @@ -554,3 +554,30 @@ pub extern "ntdll" fn RtlWakeConditionVariable( pub extern "ntdll" fn RtlWakeAllConditionVariable( ConditionVariable: *CONDITION_VARIABLE, ) callconv(.winapi) void; + +pub extern "ntdll" fn NtWaitForAlertByThreadId( + Address: ?*const anyopaque, + Timeout: ?*const LARGE_INTEGER, +) callconv(.winapi) NTSTATUS; +pub extern "ntdll" fn NtAlertThreadByThreadId( + ThreadId: DWORD, +) callconv(.winapi) NTSTATUS; +pub extern "ntdll" fn NtAlertMultipleThreadByThreadId( + ThreadIds: [*]const ULONG_PTR, + ThreadCount: ULONG, + Unknown1: ?*const anyopaque, + Unknown2: ?*const anyopaque, +) callconv(.winapi) NTSTATUS; + +pub extern "ntdll" fn NtOpenThread( + ThreadHandle: *HANDLE, + DesiredAccess: ACCESS_MASK, + ObjectAttributes: *const OBJECT_ATTRIBUTES, + ClientId: *const windows.CLIENT_ID, +) callconv(.winapi) NTSTATUS; + +pub extern "ntdll" fn NtCancelSynchronousIoFile( + ThreadHandle: HANDLE, + RequestToCancel: ?*IO_STATUS_BLOCK, + IoStatusBlock: *IO_STATUS_BLOCK, +) callconv(.winapi) NTSTATUS; diff --git a/lib/std/posix.zig b/lib/std/posix.zig @@ -1124,6 +1124,7 @@ pub fn mkdirW(dir_path_w: []const u16, mode: mode_t) MakeDirError!void { error.NoDevice => return error.Unexpected, error.WouldBlock => return error.Unexpected, error.AntivirusInterference => return error.Unexpected, + error.OperationCanceled => return error.Unexpected, else => |e| return e, }; windows.CloseHandle(sub_dir_handle); diff --git a/lib/std/process/Child.zig b/lib/std/process/Child.zig @@ -778,6 +778,7 @@ fn spawnWindows(self: *Child, io: Io) SpawnError!void { error.WouldBlock => return error.Unexpected, // not possible for "NUL" error.NetworkNotFound => return error.Unexpected, // not possible for "NUL" error.AntivirusInterference => return error.Unexpected, // not possible for "NUL" + error.OperationCanceled => return error.Unexpected, // we're not canceling the operation else => |e| return e, } else @@ -1129,8 +1130,7 @@ fn windowsCreateProcessPathExt( defer dir_buf.shrinkRetainingCapacity(dir_path_len); const dir_path_z = dir_buf.items[0 .. dir_buf.items.len - 1 :0]; const prefixed_path = try windows.wToPrefixedFileW(null, dir_path_z); - // TODO eliminate this reference - break :dir Io.Threaded.global_single_threaded.dirOpenDirWindows(.cwd(), prefixed_path.span(), .{ + break :dir Io.Threaded.dirOpenDirWindows(.cwd(), prefixed_path.span(), .{ .iterate = true, }) catch return error.FileNotFound; };