zig

fork of https://codeberg.org/ziglang/zig
Log | Files | Refs | README | LICENSE

commit 8c5f731afbf2572f98b2840f3307e2c8a607d7a8 (tree)
parent b27bdd5af0976d0b94819d6dda557d0dbb8f0d98
Author: mlugg <mlugg@noreply.codeberg.org>
Date:   Sun, 21 Dec 2025 14:50:58 +0100

Merge pull request 'Some std.Io goodies' (#30235) from std.Io-misc into master

Reviewed-on: https://codeberg.org/ziglang/zig/pulls/30235
Reviewed-by: Andrew Kelley <andrewrk@noreply.codeberg.org>

Diffstat:
M.mailmap | 3+++
Mlib/std/Build/Fuzz.zig | 2+-
Mlib/std/Build/WebServer.zig | 4++--
Mlib/std/Io.zig | 758+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++----------------
Mlib/std/Io/Threaded.zig | 1026+++++++++++++++++++++++++++----------------------------------------------------
Mlib/std/Io/net/HostName.zig | 107++++++++++++++++++++++++++++++++++++++++++++++---------------------------------
Mlib/std/Io/net/test.zig | 30++++++++++++++----------------
Mlib/std/Io/test.zig | 172+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++----
Msrc/Sema.zig | 27---------------------------
Msrc/codegen/x86_64/CodeGen.zig | 14++++++++------
10 files changed, 1210 insertions(+), 933 deletions(-)

diff --git a/.mailmap b/.mailmap @@ -2,6 +2,7 @@ Adam Goertz <adambgoertz@gmail.com> Ali Chraghi <alichraghi@proton.me> <alichraghi@pm.me> Andrea Orru <andrea@orru.io> <andreaorru1991@gmail.com> Andrew Kelley <andrew@ziglang.org> <superjoe30@gmail.com> +Andrew Kelley <andrew@ziglang.org> <andrewrk@noreply.codeberg.org> Bogdan Romanyuk <wrongnull@gmail.com> <65823030+wrongnull@users.noreply.github.com> Casey Banner <kcbanner@gmail.com> Dacheng Gao <successgdc@gmail.com> @@ -22,6 +23,7 @@ Felix "xq" Queißner <xq@random-projects.net> <git@masterq32.de> Felix "xq" Queißner <xq@random-projects.net> <git@mq32.de> Felix "xq" Queißner <xq@random-projects.net> <git@random-projects.net> Frank Denis <124872+jedisct1@users.noreply.github.com> <github@pureftpd.org> +Frank Denis <124872+jedisct1@users.noreply.github.com> <jedisct1@noreply.codeberg.org> Garrett Beck <garrettlennoxbeck@gmail.com> <138411610+garrettlennoxbeck@users.noreply.github.com> Gaëtan S <blaxoujunior@gmail.com> GalaxyShard <dominic.adragna@byteroach.com> @@ -55,6 +57,7 @@ Marc Tiehuis <marc@tiehu.is> <marctiehuis@gmail.com> Mason Remaley <mason@anthropicstudios.com> Mason Remaley <mason@anthropicstudios.com> <MasonRemaley@users.noreply.github.com> Matthew Lugg <mlugg@mlugg.co.uk> +Matthew Lugg <mlugg@mlugg.co.uk> <mlugg@noreply.codeberg.org> Meghan Denny <hello@nektro.net> Meghan Denny <hello@nektro.net> <meghan@bun.sh> Michael Bartnett <michael.bartnett@gmail.com> <michaelbartnett@users.noreply.github.com> diff --git a/lib/std/Build/Fuzz.zig b/lib/std/Build/Fuzz.zig @@ -124,7 +124,7 @@ pub fn init( .coverage_files = .empty, .coverage_mutex = .init, .queue_mutex = .init, - .queue_cond = .{}, + .queue_cond = .init, .msg_queue = .empty, }; } diff --git a/lib/std/Build/WebServer.zig b/lib/std/Build/WebServer.zig @@ -122,8 +122,8 @@ pub fn init(opts: Options) WebServer { .update_id = .init(0), .runner_request_mutex = .init, - .runner_request_ready_cond = .{}, - .runner_request_empty_cond = .{}, + .runner_request_ready_cond = .init, + .runner_request_empty_cond = .init, .runner_request = null, }; } diff --git a/lib/std/Io.zig b/lib/std/Io.zig @@ -650,17 +650,17 @@ pub const VTable = struct { groupWait: *const fn (?*anyopaque, *Group, token: *anyopaque) void, groupCancel: *const fn (?*anyopaque, *Group, token: *anyopaque) void, + recancel: *const fn (?*anyopaque) void, + swapCancelProtection: *const fn (?*anyopaque, new: CancelProtection) CancelProtection, + checkCancel: *const fn (?*anyopaque) Cancelable!void, + /// Blocks until one of the futures from the list has a result ready, such /// that awaiting it will not block. Returns that index. select: *const fn (?*anyopaque, futures: []const *AnyFuture) Cancelable!usize, - mutexLock: *const fn (?*anyopaque, prev_state: Mutex.State, mutex: *Mutex) Cancelable!void, - mutexLockUncancelable: *const fn (?*anyopaque, prev_state: Mutex.State, mutex: *Mutex) void, - mutexUnlock: *const fn (?*anyopaque, prev_state: Mutex.State, mutex: *Mutex) void, - - conditionWait: *const fn (?*anyopaque, cond: *Condition, mutex: *Mutex) Cancelable!void, - conditionWaitUncancelable: *const fn (?*anyopaque, cond: *Condition, mutex: *Mutex) void, - conditionWake: *const fn (?*anyopaque, cond: *Condition, wake: Condition.Wake) void, + futexWait: *const fn (?*anyopaque, ptr: *const u32, expected: u32, Timeout) Cancelable!void, + futexWaitUncancelable: *const fn (?*anyopaque, ptr: *const u32, expected: u32) void, + futexWake: *const fn (?*anyopaque, ptr: *const u32, max_waiters: u32) void, dirMake: *const fn (?*anyopaque, Dir, sub_path: []const u8, Dir.Mode) Dir.MakeError!void, dirMakePath: *const fn (?*anyopaque, Dir, sub_path: []const u8, Dir.Mode) Dir.MakeError!void, @@ -701,7 +701,7 @@ pub const VTable = struct { netClose: *const fn (?*anyopaque, handle: net.Socket.Handle) void, netInterfaceNameResolve: *const fn (?*anyopaque, *const net.Interface.Name) net.Interface.Name.ResolveError!net.Interface, netInterfaceName: *const fn (?*anyopaque, net.Interface) net.Interface.NameError!net.Interface.Name, - netLookup: *const fn (?*anyopaque, net.HostName, *Queue(net.HostName.LookupResult), net.HostName.LookupOptions) void, + netLookup: *const fn (?*anyopaque, net.HostName, *Queue(net.HostName.LookupResult), net.HostName.LookupOptions) net.HostName.LookupError!void, }; pub const Cancelable = error{ @@ -986,7 +986,14 @@ pub fn Future(Result: type) type { any_future: ?*AnyFuture, result: Result, - /// Equivalent to `await` but places a cancellation request. + /// Equivalent to `await` but places a cancellation request. This causes the task to receive + /// `error.Canceled` from its next "cancelation point" (if any). A cancelation point is a + /// call to a function in `Io` which can return `error.Canceled`. + /// + /// After cancelation of a task is requested, only the next cancelation point in that task + /// will return `error.Canceled`: future points will not re-signal the cancelation. As such, + /// it is usually a bug to ignore `error.Canceled`. However, to defer handling cancelation + /// requests, see also `recancel` and `CancelProtection`. /// /// Idempotent. Not threadsafe. pub fn cancel(f: *@This(), io: Io) Result { @@ -1083,6 +1090,8 @@ pub const Group = struct { /// Equivalent to `wait` but immediately requests cancellation on all /// members of the group. /// + /// For a description of cancelation and cancelation points, see `Future.cancel`. + /// /// Idempotent. Not threadsafe. pub fn cancel(g: *Group, io: Io) void { const token = g.token orelse return; @@ -1091,6 +1100,61 @@ pub const Group = struct { } }; +/// Asserts that `error.Canceled` was returned from a prior cancelation point, and "re-arms" the +/// cancelation request, so that `error.Canceled` will be returned again from the next cancelation +/// point. +/// +/// For a description of cancelation and cancelation points, see `Future.cancel`. +pub fn recancel(io: Io) void { + io.vtable.recancel(io.userdata); +} + +/// In rare cases, it is desirable to completely block cancelation notification, so that a region +/// of code can run uninterrupted before `error.Canceled` is potentially observed. Therefore, every +/// task has a "cancel protection" state which indicates whether or not `Io` functions can introduce +/// cancelation points. +/// +/// To modify a task's cancel protection state, see `swapCancelProtection`. +/// +/// For a description of cancelation and cancelation points, see `Future.cancel`. +pub const CancelProtection = enum { + /// Any call to an `Io` function with `error.Canceled` in its error set is a cancelation point. + /// + /// This is the default state, which all tasks are created in. + unblocked, + /// No `Io` function introduces a cancelation point (`error.Canceled` will never be returned). + blocked, +}; +/// Updates the current task's cancel protection state (see `CancelProtection`). +/// +/// The typical usage for this function is to protect a block of code from cancelation: +/// ``` +/// const old_cancel_protect = io.swapCancelProtection(.blocked); +/// defer _ = io.swapCancelProtection(old_cancel_protect); +/// doSomeWork() catch |err| switch (err) { +/// error.Canceled => unreachable, +/// }; +/// ``` +/// +/// For a description of cancelation and cancelation points, see `Future.cancel`. +pub fn swapCancelProtection(io: Io, new: CancelProtection) CancelProtection { + return io.vtable.swapCancelProtection(io.userdata, new); +} + +/// This function acts as a pure cancelation point (subject to protection; see `CancelProtection`) +/// and does nothing else. In other words, it returns `error.Canceled` if there is an outstanding +/// non-blocked cancelation request, but otherwise is a no-op. +/// +/// It is rarely necessary to call this function. The primary use case is in long-running CPU-bound +/// tasks which may need to respond to cancelation before completing. Short tasks, or those which +/// perform other `Io` operations (and hence have other cancelation points), will typically already +/// respond quickly to cancelation requests. +/// +/// For a description of cancelation and cancelation points, see `Future.cancel`. +pub fn checkCancel(io: Io) Cancelable!void { + return io.vtable.checkCancel(io.userdata); +} + pub fn Select(comptime U: type) type { return struct { io: Io, @@ -1144,7 +1208,9 @@ pub fn Select(comptime U: type) type { const args_casted: *const Args = @ptrCast(@alignCast(context)); const unerased_select: *S = @fieldParentPtr("group", group); const elem = @unionInit(U, @tagName(field), @call(.auto, function, args_casted.*)); - unerased_select.queue.putOneUncancelable(unerased_select.io, elem); + unerased_select.queue.putOneUncancelable(unerased_select.io, elem) catch |err| switch (err) { + error.Closed => unreachable, + }; } }; _ = @atomicRmw(usize, &s.outstanding, .Add, 1, .monotonic); @@ -1158,12 +1224,17 @@ pub fn Select(comptime U: type) type { /// Not threadsafe. pub fn wait(s: *S) Cancelable!U { s.outstanding -= 1; - return s.queue.getOne(s.io); + return s.queue.getOne(s.io) catch |err| switch (err) { + error.Canceled => |e| return e, + error.Closed => unreachable, + }; } /// Equivalent to `wait` but requests cancellation on all remaining /// tasks owned by the select. /// + /// For a description of cancelation and cancelation points, see `Future.cancel`. + /// /// It is illegal to call `wait` after this. /// /// Idempotent. Not threadsafe. @@ -1174,104 +1245,340 @@ pub fn Select(comptime U: type) type { }; } +/// Atomically checks if the value at `ptr` equals `expected`, and if so, blocks until either: +/// +/// * a matching (same `ptr` argument) `futexWake` call occurs, or +/// * a spurious ("random") wakeup occurs. +/// +/// Typically, `futexWake` should be called immediately after updating the value at `ptr.*`, to +/// unblock tasks using `futexWait` to wait for the value to change from what it previously was. +/// +/// The caller is responsible for identifying spurious wakeups if necessary, typically by checking +/// the value at `ptr.*`. +/// +/// Asserts that `T` is 4 bytes in length and has a well-defined layout with no padding bits. +pub fn futexWait(io: Io, comptime T: type, ptr: *align(@alignOf(u32)) const T, expected: T) Cancelable!void { + return futexWaitTimeout(io, T, ptr, expected, .none); +} +/// Same as `futexWait`, except also unblocks if `timeout` expires. As with `futexWait`, spurious +/// wakeups are possible. It remains the caller's responsibility to differentiate between these +/// three possible wake-up reasons if necessary. +pub fn futexWaitTimeout(io: Io, comptime T: type, ptr: *align(@alignOf(u32)) const T, expected: T, timeout: Timeout) Cancelable!void { + comptime assert(@sizeOf(T) == 4); + const expected_raw: *align(1) const u32 = @ptrCast(&expected); + return io.vtable.futexWait(io.userdata, @ptrCast(ptr), expected_raw.*, timeout); +} +/// Same as `futexWait`, except does not introduce a cancelation point. +/// +/// For a description of cancelation and cancelation points, see `Future.cancel`. +pub fn futexWaitUncancelable(io: Io, comptime T: type, ptr: *align(@alignOf(u32)) const T, expected: T) void { + comptime assert(@sizeOf(T) == @sizeOf(u32)); + const expected_raw: *align(1) const u32 = @ptrCast(&expected); + io.vtable.futexWaitUncancelable(io.userdata, @ptrCast(ptr), expected_raw.*); +} +/// Unblocks pending futex waits on `ptr`, up to a limit of `max_waiters` calls. +pub fn futexWake(io: Io, comptime T: type, ptr: *align(@alignOf(u32)) const T, max_waiters: u32) void { + comptime assert(@sizeOf(T) == @sizeOf(u32)); + if (max_waiters == 0) return; + return io.vtable.futexWake(io.userdata, @ptrCast(ptr), max_waiters); +} + pub const Mutex = struct { - state: State, + state: std.atomic.Value(State), - pub const State = enum(usize) { - locked_once = 0b00, - unlocked = 0b01, - contended = 0b10, - /// contended - _, + pub const init: Mutex = .{ .state = .init(.unlocked) }; - pub fn isUnlocked(state: State) bool { - return @intFromEnum(state) & @intFromEnum(State.unlocked) == @intFromEnum(State.unlocked); - } + const State = enum(u32) { + unlocked, + locked_once, + contended, }; - pub const init: Mutex = .{ .state = .unlocked }; - - pub fn tryLock(mutex: *Mutex) bool { - const prev_state: State = @enumFromInt(@atomicRmw( - usize, - @as(*usize, @ptrCast(&mutex.state)), - .And, - ~@intFromEnum(State.unlocked), + pub fn tryLock(m: *Mutex) bool { + switch (m.state.cmpxchgWeak( + .unlocked, + .locked_once, .acquire, - )); - return prev_state.isUnlocked(); + .monotonic, + ) orelse return true) { + .unlocked => unreachable, + .locked_once, .contended => return false, + } } - pub fn lock(mutex: *Mutex, io: std.Io) Cancelable!void { - const prev_state: State = @enumFromInt(@atomicRmw( - usize, - @as(*usize, @ptrCast(&mutex.state)), - .And, - ~@intFromEnum(State.unlocked), + pub fn lock(m: *Mutex, io: Io) Cancelable!void { + const initial_state = m.state.cmpxchgWeak( + .unlocked, + .locked_once, .acquire, - )); - if (prev_state.isUnlocked()) { + .monotonic, + ) orelse { @branchHint(.likely); return; + }; + if (initial_state == .contended) { + try io.futexWait(State, &m.state.raw, .contended); + } + while (m.state.swap(.contended, .acquire) != .unlocked) { + try io.futexWait(State, &m.state.raw, .contended); } - return io.vtable.mutexLock(io.userdata, prev_state, mutex); } - /// Same as `lock` but cannot be canceled. - pub fn lockUncancelable(mutex: *Mutex, io: std.Io) void { - const prev_state: State = @enumFromInt(@atomicRmw( - usize, - @as(*usize, @ptrCast(&mutex.state)), - .And, - ~@intFromEnum(State.unlocked), + /// Same as `lock`, except does not introduce a cancelation point. + /// + /// For a description of cancelation and cancelation points, see `Future.cancel`. + pub fn lockUncancelable(m: *Mutex, io: Io) void { + const initial_state = m.state.cmpxchgWeak( + .unlocked, + .locked_once, .acquire, - )); - if (prev_state.isUnlocked()) { + .monotonic, + ) orelse { @branchHint(.likely); return; + }; + if (initial_state == .contended) { + io.futexWaitUncancelable(State, &m.state.raw, .contended); + } + while (m.state.swap(.contended, .acquire) != .unlocked) { + io.futexWaitUncancelable(State, &m.state.raw, .contended); } - return io.vtable.mutexLockUncancelable(io.userdata, prev_state, mutex); } - pub fn unlock(mutex: *Mutex, io: std.Io) void { - const prev_state = @cmpxchgWeak(State, &mutex.state, .locked_once, .unlocked, .release, .acquire) orelse { - @branchHint(.likely); - return; - }; - assert(prev_state != .unlocked); // mutex not locked - return io.vtable.mutexUnlock(io.userdata, prev_state, mutex); + pub fn unlock(m: *Mutex, io: Io) void { + switch (m.state.swap(.unlocked, .release)) { + .unlocked => unreachable, + .locked_once => {}, + .contended => { + @branchHint(.unlikely); + io.futexWake(State, &m.state.raw, 1); + }, + } } }; pub const Condition = struct { - state: u64 = 0, + state: std.atomic.Value(State), + /// Incremented whenever the condition is signaled + epoch: std.atomic.Value(u32), + + const State = packed struct(u32) { + waiters: u16, + signals: u16, + }; + + pub const init: Condition = .{ + .state = .init(.{ .waiters = 0, .signals = 0 }), + .epoch = .init(0), + }; pub fn wait(cond: *Condition, io: Io, mutex: *Mutex) Cancelable!void { - return io.vtable.conditionWait(io.userdata, cond, mutex); + try waitInner(cond, io, mutex, false); } + /// Same as `wait`, except does not introduce a cancelation point. + /// + /// For a description of cancelation and cancelation points, see `Future.cancel`. pub fn waitUncancelable(cond: *Condition, io: Io, mutex: *Mutex) void { - return io.vtable.conditionWaitUncancelable(io.userdata, cond, mutex); + waitInner(cond, io, mutex, true) catch |err| switch (err) { + error.Canceled => unreachable, + }; + } + + fn waitInner(cond: *Condition, io: Io, mutex: *Mutex, uncancelable: bool) Cancelable!void { + var epoch = cond.epoch.load(.acquire); // `.acquire` to ensure ordered before state load + + { + const prev_state = cond.state.fetchAdd(.{ .waiters = 1, .signals = 0 }, .monotonic); + assert(prev_state.waiters < math.maxInt(u16)); // overflow caused by too many waiters + } + + mutex.unlock(io); + defer mutex.lockUncancelable(io); + + while (true) { + const result = if (uncancelable) + io.futexWaitUncancelable(u32, &cond.epoch.raw, epoch) + else + io.futexWait(u32, &cond.epoch.raw, epoch); + + epoch = cond.epoch.load(.acquire); // `.acquire` to ensure ordered before `state` laod + + // Even on error, try to consume a pending signal first. Otherwise a race might + // cause a signal to get stuck in the state with no corresponding waiter. + { + var prev_state = cond.state.load(.monotonic); + while (prev_state.signals > 0) { + prev_state = cond.state.cmpxchgWeak(prev_state, .{ + .waiters = prev_state.waiters - 1, + .signals = prev_state.signals - 1, + }, .acquire, .monotonic) orelse { + // We successfully consumed a signal. + return; + }; + } + } + + // There are no more signals available; this was a spurious wakeup or an error. If it + // was an error, we will remove ourselves as a waiter and return that error. Otherwise, + // we'll loop back to the futex wait. + result catch |err| { + const prev_state = cond.state.fetchSub(.{ .waiters = 1, .signals = 0 }, .monotonic); + assert(prev_state.waiters > 0); // underflow caused by illegal state + return err; + }; + } } pub fn signal(cond: *Condition, io: Io) void { - io.vtable.conditionWake(io.userdata, cond, .one); + var prev_state = cond.state.load(.monotonic); + while (prev_state.waiters > prev_state.signals) { + @branchHint(.unlikely); + prev_state = cond.state.cmpxchgWeak(prev_state, .{ + .waiters = prev_state.waiters, + .signals = prev_state.signals + 1, + }, .release, .monotonic) orelse { + // Update the epoch to tell the waiting threads that there are new signals for them. + // Note that a waiting thread could miss a take if *exactly* (1<<32)-1 wakes happen + // between it observing the epoch and sleeping on it, but this is extraordinarily + // unlikely due to the precise number of calls required. + _ = cond.epoch.fetchAdd(1, .release); // `.release` to ensure ordered after `state` update + io.futexWake(u32, &cond.epoch.raw, 1); + return; + }; + } } pub fn broadcast(cond: *Condition, io: Io) void { - io.vtable.conditionWake(io.userdata, cond, .all); + var prev_state = cond.state.load(.monotonic); + while (prev_state.waiters > prev_state.signals) { + @branchHint(.unlikely); + prev_state = cond.state.cmpxchgWeak(prev_state, .{ + .waiters = prev_state.waiters, + .signals = prev_state.waiters, + }, .release, .monotonic) orelse { + // Update the epoch to tell the waiting threads that there are new signals for them. + // Note that a waiting thread could miss a take if *exactly* (1<<32)-1 wakes happen + // between it observing the epoch and sleeping on it, but this is extraordinarily + // unlikely due to the precise number of calls required. + _ = cond.epoch.fetchAdd(1, .release); // `.release` to ensure ordered after `state` update + io.futexWake(u32, &cond.epoch.raw, prev_state.waiters - prev_state.signals); + return; + }; + } } +}; - pub const Wake = enum { - /// Wake up only one thread. - one, - /// Wake up all threads. - all, - }; +/// Logical boolean flag which can be set and unset and supports a "wait until set" operation. +pub const Event = enum(u32) { + unset, + waiting, + is_set, + + /// Returns whether the logical boolean is `true`. + pub fn isSet(event: *const Event) bool { + return switch (@atomicLoad(Event, event, .acquire)) { + .unset, .waiting => false, + .is_set => true, + }; + } + + /// Blocks until the logical boolean is `true`. + pub fn wait(event: *Event, io: Io) Io.Cancelable!void { + if (@cmpxchgStrong(Event, event, .unset, .waiting, .acquire, .acquire)) |prev| switch (prev) { + .unset => unreachable, + .waiting => {}, + .is_set => return, + }; + errdefer { + // Ideally we would restore the event back to `.unset` instead of `.waiting`, but there + // might be other threads waiting on the event. In theory we could track the *number* of + // waiting threads in the unused bits of the `Event`, but that has its own problem: the + // waiters would wake up when a *new waiter* was added. So it's easiest to just leave + // the state at `.waiting`---at worst it causes one redundant call to `futexWake`. + } + while (true) { + try io.futexWait(Event, event, .waiting); + switch (@atomicLoad(Event, event, .acquire)) { + .unset => unreachable, // `reset` called before pending `wait` returned + .waiting => continue, + .is_set => return, + } + } + } + + /// Same as `wait`, except does not introduce a cancelation point. + /// + /// For a description of cancelation and cancelation points, see `Future.cancel`. + pub fn waitUncancelable(event: *Event, io: Io) void { + if (@cmpxchgStrong(Event, event, .unset, .waiting, .acquire, .acquire)) |prev| switch (prev) { + .unset => unreachable, + .waiting => {}, + .is_set => return, + }; + while (true) { + io.futexWaitUncancelable(Event, event, .waiting); + switch (@atomicLoad(Event, event, .acquire)) { + .unset => unreachable, // `reset` called before pending `wait` returned + .waiting => continue, + .is_set => return, + } + } + } + + /// Blocks the calling thread until either the logical boolean is set, the timeout expires, or a + /// spurious wakeup occurs. If the timeout expires or a spurious wakeup occurs, `error.Timeout` + /// is returned. + pub fn waitTimeout(event: *Event, io: Io, timeout: Timeout) (error{Timeout} || Cancelable)!void { + if (@cmpxchgStrong(Event, event, .unset, .waiting, .acquire, .acquire)) |prev| switch (prev) { + .unset => unreachable, + .waiting => assert(!builtin.single_threaded), // invalid state + .is_set => return, + }; + errdefer { + // Ideally we would restore the event back to `.unset` instead of `.waiting`, but there + // might be other threads waiting on the event. In theory we could track the *number* of + // waiting threads in the unused bits of the `Event`, but that has its own problem: the + // waiters would wake up when a *new waiter* was added. So it's easiest to just leave + // the state at `.waiting`---at worst it causes one redundant call to `futexWake`. + } + io.futexWaitTimeout(Event, event, .waiting, timeout); + switch (@atomicLoad(Event, event, .acquire)) { + .unset => unreachable, // `reset` called before pending `wait` returned + .waiting => return error.Timeout, + .is_set => return, + } + } + + /// Sets the logical boolean to true, and hence unblocks any pending calls to `wait`. The + /// logical boolean remains true until `reset` is called, so future calls to `set` have no + /// semantic effect. + /// + /// Any memory accesses prior to a `set` call are "released", so that if this `set` call causes + /// `isSet` to return `true` or a wait to finish, those tasks will be able to observe those + /// memory accesses. + pub fn set(e: *Event, io: Io) void { + switch (@atomicRmw(Event, e, .Xchg, .is_set, .release)) { + .unset, .is_set => {}, + .waiting => io.futexWake(Event, e, std.math.maxInt(u32)), + } + } + + /// Sets the logical boolean to false. + /// + /// Assumes that there is no pending call to `wait` or `waitUncancelable`. + /// + /// However, concurrent calls to `isSet`, `set`, and `reset` are allowed. + pub fn reset(e: *Event) void { + @atomicStore(Event, e, .unset, .monotonic); + } }; +pub const QueueClosedError = error{Closed}; + pub const TypeErasedQueue = struct { mutex: Mutex, + closed: bool, /// Ring buffer. This data is logically *after* queued getters. buffer: []u8, @@ -1283,12 +1590,14 @@ pub const TypeErasedQueue = struct { const Put = struct { remaining: []const u8, + needed: usize, condition: Condition, node: std.DoublyLinkedList.Node, }; const Get = struct { remaining: []u8, + needed: usize, condition: Condition, node: std.DoublyLinkedList.Node, }; @@ -1296,6 +1605,7 @@ pub const TypeErasedQueue = struct { pub fn init(buffer: []u8) TypeErasedQueue { return .{ .mutex = .init, + .closed = false, .buffer = buffer, .start = 0, .len = 0, @@ -1304,7 +1614,27 @@ pub const TypeErasedQueue = struct { }; } - pub fn put(q: *TypeErasedQueue, io: Io, elements: []const u8, min: usize) Cancelable!usize { + pub fn close(q: *TypeErasedQueue, io: Io) void { + q.mutex.lockUncancelable(io); + defer q.mutex.unlock(io); + q.closed = true; + { + var it = q.getters.first; + while (it) |node| : (it = node.next) { + const getter: *Get = @alignCast(@fieldParentPtr("node", node)); + getter.condition.signal(io); + } + } + { + var it = q.putters.first; + while (it) |node| : (it = node.next) { + const putter: *Put = @alignCast(@fieldParentPtr("node", node)); + putter.condition.signal(io); + } + } + } + + pub fn put(q: *TypeErasedQueue, io: Io, elements: []const u8, min: usize) (QueueClosedError || Cancelable)!usize { assert(elements.len >= min); if (elements.len == 0) return 0; try q.mutex.lock(io); @@ -1312,14 +1642,17 @@ pub const TypeErasedQueue = struct { return q.putLocked(io, elements, min, false); } - /// Same as `put` but cannot be canceled. - pub fn putUncancelable(q: *TypeErasedQueue, io: Io, elements: []const u8, min: usize) usize { + /// Same as `put`, except does not introduce a cancelation point. + /// + /// For a description of cancelation and cancelation points, see `Future.cancel`. + pub fn putUncancelable(q: *TypeErasedQueue, io: Io, elements: []const u8, min: usize) QueueClosedError!usize { assert(elements.len >= min); if (elements.len == 0) return 0; q.mutex.lockUncancelable(io); defer q.mutex.unlock(io); return q.putLocked(io, elements, min, true) catch |err| switch (err) { error.Canceled => unreachable, + error.Closed => |e| return e, }; } @@ -1333,49 +1666,79 @@ pub const TypeErasedQueue = struct { return if (slice.len > 0) slice else null; } - fn putLocked(q: *TypeErasedQueue, io: Io, elements: []const u8, min: usize, uncancelable: bool) Cancelable!usize { + fn putLocked(q: *TypeErasedQueue, io: Io, elements: []const u8, target: usize, uncancelable: bool) (QueueClosedError || Cancelable)!usize { + // A closed queue cannot be added to, even if there is space in the buffer. + if (q.closed) return error.Closed; + // Getters have first priority on the data, and only when the getters // queue is empty do we start populating the buffer. - var remaining = elements; + // The number of elements we add immediately, before possibly blocking. + var n: usize = 0; + while (q.getters.popFirst()) |getter_node| { const getter: *Get = @alignCast(@fieldParentPtr("node", getter_node)); - const copy_len = @min(getter.remaining.len, remaining.len); + const copy_len = @min(getter.remaining.len, elements.len - n); assert(copy_len > 0); - @memcpy(getter.remaining[0..copy_len], remaining[0..copy_len]); - remaining = remaining[copy_len..]; + @memcpy(getter.remaining[0..copy_len], elements[n..][0..copy_len]); getter.remaining = getter.remaining[copy_len..]; - if (getter.remaining.len == 0) { + getter.needed -|= copy_len; + n += copy_len; + if (getter.needed == 0) { getter.condition.signal(io); - if (remaining.len > 0) continue; - } else q.getters.prepend(getter_node); - assert(remaining.len == 0); - return elements.len; + } else { + assert(n == elements.len); // we didn't have enough elements for the getter + q.getters.prepend(getter_node); + } + if (n == elements.len) return elements.len; } while (q.puttableSlice()) |slice| { - const copy_len = @min(slice.len, remaining.len); + const copy_len = @min(slice.len, elements.len - n); assert(copy_len > 0); - @memcpy(slice[0..copy_len], remaining[0..copy_len]); + @memcpy(slice[0..copy_len], elements[n..][0..copy_len]); q.len += copy_len; - remaining = remaining[copy_len..]; - if (remaining.len == 0) return elements.len; + n += copy_len; + if (n == elements.len) return elements.len; } - const total_filled = elements.len - remaining.len; - if (total_filled >= min) return total_filled; + // Don't block if we hit the target. + if (n >= target) return n; - var pending: Put = .{ .remaining = remaining, .condition = .{}, .node = .{} }; + var pending: Put = .{ + .remaining = elements[n..], + .needed = target - n, + .condition = .init, + .node = .{}, + }; q.putters.append(&pending.node); - defer if (pending.remaining.len > 0) q.putters.remove(&pending.node); - while (pending.remaining.len > 0) if (uncancelable) - pending.condition.waitUncancelable(io, &q.mutex) - else - try pending.condition.wait(io, &q.mutex); - return elements.len; + defer if (pending.needed > 0) q.putters.remove(&pending.node); + + while (pending.needed > 0 and !q.closed) { + if (uncancelable) { + pending.condition.waitUncancelable(io, &q.mutex); + continue; + } + pending.condition.wait(io, &q.mutex) catch |err| switch (err) { + error.Canceled => if (pending.remaining.len == elements.len) { + // Canceled while waiting, and appended no elements. + return error.Canceled; + } else { + // Canceled while waiting, but appended some elements, so report those first. + io.recancel(); + return elements.len - pending.remaining.len; + }, + }; + } + if (pending.remaining.len == elements.len) { + // The queue was closed while we were waiting. We appended no elements. + assert(q.closed); + return error.Closed; + } + return elements.len - pending.remaining.len; } - pub fn get(q: *@This(), io: Io, buffer: []u8, min: usize) Cancelable!usize { + pub fn get(q: *TypeErasedQueue, io: Io, buffer: []u8, min: usize) (QueueClosedError || Cancelable)!usize { assert(buffer.len >= min); if (buffer.len == 0) return 0; try q.mutex.lock(io); @@ -1383,13 +1746,17 @@ pub const TypeErasedQueue = struct { return q.getLocked(io, buffer, min, false); } - pub fn getUncancelable(q: *@This(), io: Io, buffer: []u8, min: usize) usize { + /// Same as `get`, except does not introduce a cancelation point. + /// + /// For a description of cancelation and cancelation points, see `Future.cancel`. + pub fn getUncancelable(q: *TypeErasedQueue, io: Io, buffer: []u8, min: usize) QueueClosedError!usize { assert(buffer.len >= min); if (buffer.len == 0) return 0; q.mutex.lockUncancelable(io); defer q.mutex.unlock(io); return q.getLocked(io, buffer, min, true) catch |err| switch (err) { error.Canceled => unreachable, + error.Closed => |e| return e, }; } @@ -1399,21 +1766,23 @@ pub const TypeErasedQueue = struct { return if (slice.len > 0) slice else null; } - fn getLocked(q: *@This(), io: Io, buffer: []u8, min: usize, uncancelable: bool) Cancelable!usize { + fn getLocked(q: *TypeErasedQueue, io: Io, buffer: []u8, target: usize, uncancelable: bool) (QueueClosedError || Cancelable)!usize { // The ring buffer gets first priority, then data should come from any // queued putters, then finally the ring buffer should be filled with // data from putters so they can be resumed. - var remaining = buffer; + // The number of elements we received immediately, before possibly blocking. + var n: usize = 0; + while (q.gettableSlice()) |slice| { - const copy_len = @min(slice.len, remaining.len); + const copy_len = @min(slice.len, buffer.len - n); assert(copy_len > 0); - @memcpy(remaining[0..copy_len], slice[0..copy_len]); + @memcpy(buffer[n..][0..copy_len], slice[0..copy_len]); q.start += copy_len; if (q.buffer.len - q.start == 0) q.start = 0; q.len -= copy_len; - remaining = remaining[copy_len..]; - if (remaining.len == 0) { + n += copy_len; + if (n == buffer.len) { q.fillRingBufferFromPutters(io); return buffer.len; } @@ -1422,33 +1791,64 @@ pub const TypeErasedQueue = struct { // Copy directly from putters into buffer. while (q.putters.popFirst()) |putter_node| { const putter: *Put = @alignCast(@fieldParentPtr("node", putter_node)); - const copy_len = @min(putter.remaining.len, remaining.len); + const copy_len = @min(putter.remaining.len, buffer.len - n); assert(copy_len > 0); - @memcpy(remaining[0..copy_len], putter.remaining[0..copy_len]); + @memcpy(buffer[n..][0..copy_len], putter.remaining[0..copy_len]); putter.remaining = putter.remaining[copy_len..]; - remaining = remaining[copy_len..]; - if (putter.remaining.len == 0) { + putter.needed -|= copy_len; + n += copy_len; + if (putter.needed == 0) { putter.condition.signal(io); - if (remaining.len > 0) continue; - } else q.putters.prepend(putter_node); - assert(remaining.len == 0); - q.fillRingBufferFromPutters(io); - return buffer.len; + } else { + assert(n == buffer.len); // we didn't have enough space for the putter + q.putters.prepend(putter_node); + } + if (n == buffer.len) { + q.fillRingBufferFromPutters(io); + return buffer.len; + } } - // Both ring buffer and putters queue is empty. - const total_filled = buffer.len - remaining.len; - if (total_filled >= min) return total_filled; + // No need to call `fillRingBufferFromPutters` from this point onwards, + // because we emptied the ring buffer *and* the putter queue! + + // Don't block if we hit the target or if the queue is closed. Return how + // many elements we could get immediately, unless the queue was closed and + // empty, in which case report `error.Closed`. + if (n == 0 and q.closed) return error.Closed; + if (n >= target or q.closed) return n; - var pending: Get = .{ .remaining = remaining, .condition = .{}, .node = .{} }; + var pending: Get = .{ + .remaining = buffer[n..], + .needed = target - n, + .condition = .init, + .node = .{}, + }; q.getters.append(&pending.node); - defer if (pending.remaining.len > 0) q.getters.remove(&pending.node); - while (pending.remaining.len > 0) if (uncancelable) - pending.condition.waitUncancelable(io, &q.mutex) - else - try pending.condition.wait(io, &q.mutex); - q.fillRingBufferFromPutters(io); - return buffer.len; + defer if (pending.needed > 0) q.getters.remove(&pending.node); + + while (pending.needed > 0 and !q.closed) { + if (uncancelable) { + pending.condition.waitUncancelable(io, &q.mutex); + continue; + } + pending.condition.wait(io, &q.mutex) catch |err| switch (err) { + error.Canceled => if (pending.remaining.len == buffer.len) { + // Canceled while waiting, and received no elements. + return error.Canceled; + } else { + // Canceled while waiting, but received some elements, so report those first. + io.recancel(); + return buffer.len - pending.remaining.len; + }, + }; + } + if (pending.remaining.len == buffer.len) { + // The queue was closed while we were waiting. We received no elements. + assert(q.closed); + return error.Closed; + } + return buffer.len - pending.remaining.len; } /// Called when there is nonzero space available in the ring buffer and @@ -1464,7 +1864,8 @@ pub const TypeErasedQueue = struct { @memcpy(slice[0..copy_len], putter.remaining[0..copy_len]); q.len += copy_len; putter.remaining = putter.remaining[copy_len..]; - if (putter.remaining.len == 0) { + putter.needed -|= copy_len; + if (putter.needed == 0) { putter.condition.signal(io); break; } @@ -1487,59 +1888,112 @@ pub fn Queue(Elem: type) type { return .{ .type_erased = .init(@ptrCast(buffer)) }; } - /// Appends elements to the end of the queue. The function returns when - /// at least `min` elements have been added to the buffer or sent - /// directly to a consumer. + pub fn close(q: *@This(), io: Io) void { + q.type_erased.close(io); + } + + /// Appends elements to the end of the queue, potentially blocking if + /// there is insufficient capacity. Returns when any one of the + /// following conditions is satisfied: /// - /// Returns how many elements have been added to the queue. + /// * At least `target` elements have been added to the queue + /// * The queue is closed + /// * The current task is canceled /// - /// Asserts that `elements.len >= min`. - pub fn put(q: *@This(), io: Io, elements: []const Elem, min: usize) Cancelable!usize { - return @divExact(try q.type_erased.put(io, @ptrCast(elements), min * @sizeOf(Elem)), @sizeOf(Elem)); + /// Returns how many of `elements` have been added to the queue, if any. + /// If an error is returned, no elements have been added. + /// + /// If the queue is closed or the task is canceled, but some items were + /// already added before the closure or cancelation, then `put` may + /// return a number lower than `target`, in which case future calls are + /// guaranteed to return `error.Canceled` or `error.Closed`. + /// + /// A return value of 0 is only possible if `target` is 0, in which case + /// the call is guaranteed to queue as many of `elements` as is possible + /// *without* blocking. + /// + /// Asserts that `elements.len >= target`. + pub fn put(q: *@This(), io: Io, elements: []const Elem, target: usize) (QueueClosedError || Cancelable)!usize { + return @divExact(try q.type_erased.put(io, @ptrCast(elements), target * @sizeOf(Elem)), @sizeOf(Elem)); } /// Same as `put` but blocks until all elements have been added to the queue. - pub fn putAll(q: *@This(), io: Io, elements: []const Elem) Cancelable!void { - assert(try q.put(io, elements, elements.len) == elements.len); + /// + /// If the queue is closed or canceled, `error.Closed` or `error.Canceled` + /// is returned, and it is unspecified how many, if any, of `elements` were + /// added to the queue prior to cancelation or closure. + pub fn putAll(q: *@This(), io: Io, elements: []const Elem) (QueueClosedError || Cancelable)!void { + const n = try q.put(io, elements, elements.len); + if (n != elements.len) { + _ = try q.put(io, elements[n..], elements.len - n); + unreachable; // partial `put` implies queue was closed or we were canceled + } } - /// Same as `put` but cannot be interrupted. - pub fn putUncancelable(q: *@This(), io: Io, elements: []const Elem, min: usize) usize { - return @divExact(q.type_erased.putUncancelable(io, @ptrCast(elements), min * @sizeOf(Elem)), @sizeOf(Elem)); + /// Same as `put`, except does not introduce a cancelation point. + /// + /// For a description of cancelation and cancelation points, see `Future.cancel`. + pub fn putUncancelable(q: *@This(), io: Io, elements: []const Elem, min: usize) QueueClosedError!usize { + return @divExact(try q.type_erased.putUncancelable(io, @ptrCast(elements), min * @sizeOf(Elem)), @sizeOf(Elem)); } - pub fn putOne(q: *@This(), io: Io, item: Elem) Cancelable!void { + /// Appends `item` to the end of the queue, blocking if the queue is full. + pub fn putOne(q: *@This(), io: Io, item: Elem) (QueueClosedError || Cancelable)!void { assert(try q.put(io, &.{item}, 1) == 1); } - pub fn putOneUncancelable(q: *@This(), io: Io, item: Elem) void { - assert(q.putUncancelable(io, &.{item}, 1) == 1); + /// Same as `putOne`, except does not introduce a cancelation point. + /// + /// For a description of cancelation and cancelation points, see `Future.cancel`. + pub fn putOneUncancelable(q: *@This(), io: Io, item: Elem) QueueClosedError!void { + assert(try q.putUncancelable(io, &.{item}, 1) == 1); } - /// Receives elements from the beginning of the queue. The function - /// returns when at least `min` elements have been populated inside - /// `buffer`. + /// Receives elements from the beginning of the queue, potentially blocking + /// if there are insufficient elements currently in the queue. Returns when + /// any one of the following conditions is satisfied: + /// + /// * At least `target` elements have been received from the queue + /// * The queue is closed and contains no buffered elements + /// * The current task is canceled + /// + /// Returns how many elements of `buffer` have been populated, if any. + /// If an error is returned, no elements have been populated. + /// + /// If the queue is closed or the task is canceled, but some items were + /// already received before the closure or cancelation, then `get` may + /// return a number lower than `target`, in which case future calls are + /// guaranteed to return `error.Canceled` or `error.Closed`. /// - /// Returns how many elements of `buffer` have been populated. + /// A return value of 0 is only possible if `target` is 0, in which case + /// the call is guaranteed to fill as much of `buffer` as is possible + /// *without* blocking. /// - /// Asserts that `buffer.len >= min`. - pub fn get(q: *@This(), io: Io, buffer: []Elem, min: usize) Cancelable!usize { - return @divExact(try q.type_erased.get(io, @ptrCast(buffer), min * @sizeOf(Elem)), @sizeOf(Elem)); + /// Asserts that `buffer.len >= target`. + pub fn get(q: *@This(), io: Io, buffer: []Elem, target: usize) (QueueClosedError || Cancelable)!usize { + return @divExact(try q.type_erased.get(io, @ptrCast(buffer), target * @sizeOf(Elem)), @sizeOf(Elem)); } - pub fn getUncancelable(q: *@This(), io: Io, buffer: []Elem, min: usize) usize { - return @divExact(q.type_erased.getUncancelable(io, @ptrCast(buffer), min * @sizeOf(Elem)), @sizeOf(Elem)); + /// Same as `get`, except does not introduce a cancelation point. + /// + /// For a description of cancelation and cancelation points, see `Future.cancel`. + pub fn getUncancelable(q: *@This(), io: Io, buffer: []Elem, min: usize) QueueClosedError!usize { + return @divExact(try q.type_erased.getUncancelable(io, @ptrCast(buffer), min * @sizeOf(Elem)), @sizeOf(Elem)); } - pub fn getOne(q: *@This(), io: Io) Cancelable!Elem { + /// Receives one element from the beginning of the queue, blocking if the queue is empty. + pub fn getOne(q: *@This(), io: Io) (QueueClosedError || Cancelable)!Elem { var buf: [1]Elem = undefined; assert(try q.get(io, &buf, 1) == 1); return buf[0]; } - pub fn getOneUncancelable(q: *@This(), io: Io) Elem { + /// Same as `getOne`, except does not introduce a cancelation point. + /// + /// For a description of cancelation and cancelation points, see `Future.cancel`. + pub fn getOneUncancelable(q: *@This(), io: Io) QueueClosedError!Elem { var buf: [1]Elem = undefined; - assert(q.getUncancelable(io, &buf, 1) == 1); + assert(try q.getUncancelable(io, &buf, 1) == 1); return buf[0]; } @@ -1627,10 +2081,6 @@ pub fn concurrent( return future; } -pub fn cancelRequested(io: Io) bool { - return io.vtable.cancelRequested(io.userdata); -} - pub const SleepError = error{UnsupportedClock} || UnexpectedError || Cancelable; pub fn sleep(io: Io, duration: Duration, clock: Clock) SleepError!void { diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig @@ -86,7 +86,9 @@ const Thread = struct { /// The value that needs to be passed to pthread_kill or tgkill in order to /// send a signal. signal_id: SignaleeId, - current_closure: ?*Closure = null, + current_closure: ?*Closure, + /// Only populated if `current_closure != null`. Indicates the current cancel protection mode. + cancel_protection: Io.CancelProtection, const SignaleeId = if (std.Thread.use_pthreads) std.c.pthread_t else std.Thread.Id; @@ -98,6 +100,12 @@ const Thread = struct { fn checkCancel(thread: *Thread) error{Canceled}!void { const closure = thread.current_closure orelse return; + + switch (thread.cancel_protection) { + .unblocked => {}, + .blocked => return, + } + switch (@cmpxchgStrong( CancelStatus, &closure.cancel_status, @@ -115,6 +123,11 @@ const Thread = struct { fn beginSyscall(thread: *Thread) error{Canceled}!void { const closure = thread.current_closure orelse return; + switch (thread.cancel_protection) { + .unblocked => {}, + .blocked => return, + } + switch (@cmpxchgStrong( CancelStatus, &closure.cancel_status, @@ -135,6 +148,12 @@ const Thread = struct { fn endSyscall(thread: *Thread) void { const closure = thread.current_closure orelse return; + + switch (thread.cancel_protection) { + .unblocked => {}, + .blocked => return, + } + _ = @cmpxchgStrong( CancelStatus, &closure.cancel_status, @@ -155,6 +174,220 @@ const Thread = struct { fn currentSignalId() SignaleeId { return if (std.Thread.use_pthreads) std.c.pthread_self() else std.Thread.getCurrentId(); } + + fn futexWaitUncancelable(ptr: *const u32, expect: u32) void { + return Thread.futexWaitTimed(null, ptr, expect, null) catch unreachable; + } + + fn futexWait(thread: *Thread, ptr: *const u32, expect: u32) Io.Cancelable!void { + return Thread.futexWaitTimed(thread, ptr, expect, null) catch |err| switch (err) { + error.Canceled => return error.Canceled, + error.Timeout => unreachable, + }; + } + + fn futexWaitTimed(thread: ?*Thread, ptr: *const u32, expect: u32, timeout_ns: ?u64) Io.Cancelable!void { + @branchHint(.cold); + + if (builtin.single_threaded) unreachable; // nobody would ever wake us + + if (builtin.cpu.arch.isWasm()) { + comptime assert(builtin.cpu.has(.wasm, .atomics)); + if (thread) |t| try t.checkCancel(); + const to: i64 = if (timeout_ns) |ns| ns else -1; + const signed_expect: i32 = @bitCast(expect); + const result = asm volatile ( + \\local.get %[ptr] + \\local.get %[expected] + \\local.get %[timeout] + \\memory.atomic.wait32 0 + \\local.set %[ret] + : [ret] "=r" (-> u32), + : [ptr] "r" (ptr), + [expected] "r" (signed_expect), + [timeout] "r" (to), + ); + switch (result) { + 0 => {}, // ok + 1 => {}, // expected != loaded + 2 => {}, // timeout + else => assert(!is_debug), + } + } else switch (native_os) { + .linux => { + const linux = std.os.linux; + var ts_buffer: linux.timespec = undefined; + const ts: ?*linux.timespec = if (timeout_ns) |ns| ts: { + ts_buffer = timestampToPosix(ns); + break :ts &ts_buffer; + } else null; + if (thread) |t| try t.beginSyscall(); + const rc = linux.futex_4arg(ptr, .{ .cmd = .WAIT, .private = true }, expect, ts); + if (thread) |t| t.endSyscall(); + switch (linux.errno(rc)) { + .SUCCESS => {}, // notified by `wake()` + .INTR => {}, // caller's responsibility to retry + .AGAIN => {}, // ptr.* != expect + .INVAL => {}, // possibly timeout overflow + .TIMEDOUT => {}, // timeout + .FAULT => recoverableOsBugDetected(), // ptr was invalid + else => recoverableOsBugDetected(), + } + }, + .driverkit, .ios, .maccatalyst, .macos, .tvos, .visionos, .watchos => { + const c = std.c; + const flags: c.UL = .{ + .op = .COMPARE_AND_WAIT, + .NO_ERRNO = true, + }; + if (thread) |t| try t.beginSyscall(); + const status = switch (darwin_supports_ulock_wait2) { + true => c.__ulock_wait2(flags, ptr, expect, ns: { + const ns = timeout_ns orelse break :ns 0; + if (ns == 0) break :ns 1; + break :ns ns; + }, 0), + false => c.__ulock_wait(flags, ptr, expect, us: { + const ns = timeout_ns orelse break :us 0; + const us = std.math.lossyCast(u32, ns / std.time.ns_per_us); + if (us == 0) break :us 1; + break :us us; + }), + }; + if (thread) |t| t.endSyscall(); + if (status >= 0) return; + switch (@as(c.E, @enumFromInt(-status))) { + .INTR => {}, // spurious wake + // Address of the futex was paged out. This is unlikely, but possible in theory, and + // pthread/libdispatch on darwin bother to handle it. In this case we'll return + // without waiting, but the caller should retry anyway. + .FAULT => {}, + .TIMEDOUT => {}, // timeout + else => recoverableOsBugDetected(), + } + }, + .windows => { + var timeout_value: windows.LARGE_INTEGER = undefined; + var timeout_ptr: ?*const windows.LARGE_INTEGER = null; + // NTDLL functions work with time in units of 100 nanoseconds. + // Positive values are absolute deadlines while negative values are relative durations. + if (timeout_ns) |delay| { + timeout_value = @as(windows.LARGE_INTEGER, @intCast(delay / 100)); + timeout_value = -timeout_value; + timeout_ptr = &timeout_value; + } + if (thread) |t| try t.checkCancel(); + switch (windows.ntdll.RtlWaitOnAddress(ptr, &expect, @sizeOf(@TypeOf(expect)), timeout_ptr)) { + .SUCCESS => {}, + .CANCELLED => {}, + .TIMEOUT => {}, // timeout + else => recoverableOsBugDetected(), + } + }, + .freebsd => { + const flags = @intFromEnum(std.c.UMTX_OP.WAIT_UINT_PRIVATE); + var tm_size: usize = 0; + var tm: std.c._umtx_time = undefined; + var tm_ptr: ?*const std.c._umtx_time = null; + if (timeout_ns) |ns| { + tm_ptr = &tm; + tm_size = @sizeOf(@TypeOf(tm)); + tm.flags = 0; // use relative time not UMTX_ABSTIME + tm.clockid = .MONOTONIC; + tm.timeout = timestampToPosix(ns); + } + if (thread) |t| try t.beginSyscall(); + const rc = std.c._umtx_op(@intFromPtr(ptr), flags, @as(c_ulong, expect), tm_size, @intFromPtr(tm_ptr)); + if (thread) |t| t.endSyscall(); + if (is_debug) switch (posix.errno(rc)) { + .SUCCESS => {}, + .FAULT => unreachable, // one of the args points to invalid memory + .INVAL => unreachable, // arguments should be correct + .TIMEDOUT => {}, // timeout + .INTR => {}, // spurious wake + else => unreachable, + }; + }, + else => @compileError("unimplemented: futexWait"), + } + } + + fn futexWake(ptr: *const u32, max_waiters: u32) void { + @branchHint(.cold); + + if (builtin.single_threaded) return; // nothing to wake up + + if (builtin.cpu.arch.isWasm()) { + comptime assert(builtin.cpu.has(.wasm, .atomics)); + assert(max_waiters != 0); + const woken_count = asm volatile ( + \\local.get %[ptr] + \\local.get %[waiters] + \\memory.atomic.notify 0 + \\local.set %[ret] + : [ret] "=r" (-> u32), + : [ptr] "r" (ptr), + [waiters] "r" (max_waiters), + ); + _ = woken_count; // can be 0 when linker flag 'shared-memory' is not enabled + } else switch (native_os) { + .linux => { + const linux = std.os.linux; + switch (linux.errno(linux.futex_3arg( + ptr, + .{ .cmd = .WAKE, .private = true }, + @min(max_waiters, std.math.maxInt(i32)), + ))) { + .SUCCESS => return, // successful wake up + .INVAL => return, // invalid futex_wait() on ptr done elsewhere + .FAULT => return, // pointer became invalid while doing the wake + else => return recoverableOsBugDetected(), // deadlock due to operating system bug + } + }, + .driverkit, .ios, .maccatalyst, .macos, .tvos, .visionos, .watchos => { + const c = std.c; + const flags: c.UL = .{ + .op = .COMPARE_AND_WAIT, + .NO_ERRNO = true, + .WAKE_ALL = max_waiters > 1, + }; + while (true) { + const status = c.__ulock_wake(flags, ptr, 0); + if (status >= 0) return; + switch (@as(c.E, @enumFromInt(-status))) { + .INTR, .CANCELED => continue, // spurious wake() + .FAULT => unreachable, // __ulock_wake doesn't generate EFAULT according to darwin pthread_cond_t + .NOENT => return, // nothing was woken up + .ALREADY => unreachable, // only for UL.Op.WAKE_THREAD + else => unreachable, // deadlock due to operating system bug + } + } + }, + .windows => { + assert(max_waiters != 0); + switch (max_waiters) { + 1 => windows.ntdll.RtlWakeAddressSingle(ptr), + else => windows.ntdll.RtlWakeAddressAll(ptr), + } + }, + .freebsd => { + const rc = std.c._umtx_op( + @intFromPtr(ptr), + @intFromEnum(std.c.UMTX_OP.WAKE_PRIVATE), + @as(c_ulong, max_waiters), + 0, // there is no timeout struct + 0, // there is no timeout struct pointer + ); + switch (posix.errno(rc)) { + .SUCCESS => {}, + .FAULT => {}, // it's ok if the ptr doesn't point to valid memory + .INVAL => unreachable, // arguments should be correct + else => unreachable, // deadlock due to operating system bug + } + }, + else => @compileError("unimplemented: futexWake"), + } + } }; const max_iovecs_len = 8; @@ -298,6 +531,8 @@ pub fn init( .have_signal_handler = false, .main_thread = .{ .signal_id = Thread.currentSignalId(), + .current_closure = null, + .cancel_protection = undefined, }, }; @@ -332,7 +567,11 @@ pub const init_single_threaded: Threaded = .{ .old_sig_io = undefined, .old_sig_pipe = undefined, .have_signal_handler = false, - .main_thread = .{ .signal_id = undefined }, + .main_thread = .{ + .signal_id = undefined, + .current_closure = null, + .cancel_protection = undefined, + }, }; pub fn setAsyncLimit(t: *Threaded, new_limit: Io.Limit) void { @@ -367,6 +606,8 @@ fn join(t: *Threaded) void { fn worker(t: *Threaded) void { var thread: Thread = .{ .signal_id = Thread.currentSignalId(), + .current_closure = null, + .cancel_protection = undefined, }; Thread.current = &thread; @@ -403,13 +644,13 @@ pub fn io(t: *Threaded) Io { .groupWait = groupWait, .groupCancel = groupCancel, - .mutexLock = mutexLock, - .mutexLockUncancelable = mutexLockUncancelable, - .mutexUnlock = mutexUnlock, + .recancel = recancel, + .swapCancelProtection = swapCancelProtection, + .checkCancel = checkCancel, - .conditionWait = conditionWait, - .conditionWaitUncancelable = conditionWaitUncancelable, - .conditionWake = conditionWake, + .futexWait = futexWait, + .futexWaitUncancelable = futexWaitUncancelable, + .futexWake = futexWake, .dirMake = dirMake, .dirMakePath = dirMakePath, @@ -499,13 +740,13 @@ pub fn ioBasic(t: *Threaded) Io { .groupWait = groupWait, .groupCancel = groupCancel, - .mutexLock = mutexLock, - .mutexLockUncancelable = mutexLockUncancelable, - .mutexUnlock = mutexUnlock, + .recancel = recancel, + .swapCancelProtection = swapCancelProtection, + .checkCancel = checkCancel, - .conditionWait = conditionWait, - .conditionWaitUncancelable = conditionWaitUncancelable, - .conditionWake = conditionWake, + .futexWait = futexWait, + .futexWaitUncancelable = futexWaitUncancelable, + .futexWake = futexWake, .dirMake = dirMake, .dirMakePath = dirMakePath, @@ -577,26 +818,31 @@ const preadv_sym = if (posix.lfs64_abi) posix.system.preadv64 else posix.system. const AsyncClosure = struct { closure: Closure, func: *const fn (context: *anyopaque, result: *anyopaque) void, - reset_event: ResetEvent, - select_condition: ?*ResetEvent, + event: Io.Event, + select_condition: ?*Io.Event, context_alignment: Alignment, result_offset: usize, alloc_len: usize, - const done_reset_event: *ResetEvent = @ptrFromInt(@alignOf(ResetEvent)); + const done_event: *Io.Event = @ptrFromInt(@alignOf(Io.Event)); fn start(closure: *Closure, t: *Threaded) void { const ac: *AsyncClosure = @alignCast(@fieldParentPtr("closure", closure)); const current_thread = Thread.getCurrent(t); + current_thread.current_closure = closure; + current_thread.cancel_protection = .unblocked; + ac.func(ac.contextPointer(), ac.resultPointer()); + current_thread.current_closure = null; + current_thread.cancel_protection = undefined; - if (@atomicRmw(?*ResetEvent, &ac.select_condition, .Xchg, done_reset_event, .release)) |select_reset| { - assert(select_reset != done_reset_event); - select_reset.set(); + if (@atomicRmw(?*Io.Event, &ac.select_condition, .Xchg, done_event, .release)) |select_event| { + assert(select_event != done_event); + select_event.set(ioBasic(t)); } - ac.reset_event.set(); + ac.event.set(ioBasic(t)); } fn resultPointer(ac: *AsyncClosure) [*]u8 { @@ -638,7 +884,7 @@ const AsyncClosure = struct { .context_alignment = context_alignment, .result_offset = actual_result_offset, .alloc_len = alloc_len, - .reset_event = .unset, + .event = .unset, .select_condition = null, }; @memcpy(ac.contextPointer()[0..context.len], context); @@ -646,10 +892,10 @@ const AsyncClosure = struct { } fn waitAndDeinit(ac: *AsyncClosure, t: *Threaded, result: []u8) void { - ac.reset_event.wait(t) catch |err| switch (err) { + ac.event.wait(ioBasic(t)) catch |err| switch (err) { error.Canceled => { ac.closure.requestCancel(t); - ac.reset_event.waitUncancelable(); + ac.event.waitUncancelable(ioBasic(t)); }, }; @memcpy(result, ac.resultPointer()[0..result.len]); @@ -771,14 +1017,19 @@ const GroupClosure = struct { const current_thread = Thread.getCurrent(t); const group = gc.group; const group_state: *std.atomic.Value(usize) = @ptrCast(&group.state); - const reset_event: *ResetEvent = @ptrCast(&group.context); + const event: *Io.Event = @ptrCast(&group.context); + current_thread.current_closure = closure; + current_thread.cancel_protection = .unblocked; + gc.func(group, gc.contextPointer()); + current_thread.current_closure = null; + current_thread.cancel_protection = undefined; const prev_state = group_state.fetchSub(sync_one_pending, .acq_rel); assert((prev_state / sync_one_pending) > 0); - if (prev_state == (sync_one_pending | sync_is_waiting)) reset_event.set(); + if (prev_state == (sync_one_pending | sync_is_waiting)) event.set(ioBasic(t)); } fn contextPointer(gc: *GroupClosure) [*]u8 { @@ -939,10 +1190,10 @@ fn groupWait(userdata: ?*anyopaque, group: *Io.Group, token: *anyopaque) void { if (builtin.single_threaded) return; const group_state: *std.atomic.Value(usize) = @ptrCast(&group.state); - const reset_event: *ResetEvent = @ptrCast(&group.context); + const event: *Io.Event = @ptrCast(&group.context); const prev_state = group_state.fetchAdd(GroupClosure.sync_is_waiting, .acquire); assert(prev_state & GroupClosure.sync_is_waiting == 0); - if ((prev_state / GroupClosure.sync_one_pending) > 0) reset_event.wait(t) catch |err| switch (err) { + if ((prev_state / GroupClosure.sync_one_pending) > 0) event.wait(ioBasic(t)) catch |err| switch (err) { error.Canceled => { var node: *std.SinglyLinkedList.Node = @ptrCast(@alignCast(token)); while (true) { @@ -950,7 +1201,7 @@ fn groupWait(userdata: ?*anyopaque, group: *Io.Group, token: *anyopaque) void { gc.closure.requestCancel(t); node = node.next orelse break; } - reset_event.waitUncancelable(); + event.waitUncancelable(ioBasic(t)); }, }; @@ -979,10 +1230,10 @@ fn groupCancel(userdata: ?*anyopaque, group: *Io.Group, token: *anyopaque) void } const group_state: *std.atomic.Value(usize) = @ptrCast(&group.state); - const reset_event: *ResetEvent = @ptrCast(&group.context); + const event: *Io.Event = @ptrCast(&group.context); const prev_state = group_state.fetchAdd(GroupClosure.sync_is_waiting, .acquire); assert(prev_state & GroupClosure.sync_is_waiting == 0); - if ((prev_state / GroupClosure.sync_one_pending) > 0) reset_event.waitUncancelable(); + if ((prev_state / GroupClosure.sync_one_pending) > 0) event.waitUncancelable(ioBasic(t)); { var node: *std.SinglyLinkedList.Node = @ptrCast(@alignCast(token)); @@ -995,6 +1246,32 @@ fn groupCancel(userdata: ?*anyopaque, group: *Io.Group, token: *anyopaque) void } } +fn recancel(userdata: ?*anyopaque) void { + const t: *Threaded = @ptrCast(@alignCast(userdata)); + const current_thread: *Thread = .getCurrent(t); + const cancel_status = &current_thread.current_closure.?.cancel_status; + switch (@atomicLoad(CancelStatus, cancel_status, .monotonic)) { + .none => unreachable, // called `recancel` when not canceled + .requested => unreachable, // called `recancel` when cancelation was already outstanding + .acknowledged => {}, + _ => unreachable, // invalid state: not in a syscall + } + @atomicStore(CancelStatus, cancel_status, .requested, .monotonic); +} + +fn swapCancelProtection(userdata: ?*anyopaque, new: Io.CancelProtection) Io.CancelProtection { + const t: *Threaded = @ptrCast(@alignCast(userdata)); + const current_thread: *Thread = .getCurrent(t); + const old = current_thread.cancel_protection; + current_thread.cancel_protection = new; + return old; +} + +fn checkCancel(userdata: ?*anyopaque) Io.Cancelable!void { + const t: *Threaded = @ptrCast(@alignCast(userdata)); + return Thread.getCurrent(t).checkCancel(); +} + fn await( userdata: ?*anyopaque, any_future: *Io.AnyFuture, @@ -1020,187 +1297,35 @@ fn cancel( ac.waitAndDeinit(t, result); } -fn mutexLock(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mutex) Io.Cancelable!void { - if (builtin.single_threaded) unreachable; // Interface should have prevented this. - if (native_os == .netbsd) @panic("TODO"); - if (native_os == .openbsd) @panic("TODO"); +fn futexWait(userdata: ?*anyopaque, ptr: *const u32, expected: u32, timeout: Io.Timeout) Io.Cancelable!void { const t: *Threaded = @ptrCast(@alignCast(userdata)); const current_thread = Thread.getCurrent(t); - if (prev_state == .contended) { - try futexWait(current_thread, @ptrCast(&mutex.state), @intFromEnum(Io.Mutex.State.contended)); - } - while (@atomicRmw(Io.Mutex.State, &mutex.state, .Xchg, .contended, .acquire) != .unlocked) { - try futexWait(current_thread, @ptrCast(&mutex.state), @intFromEnum(Io.Mutex.State.contended)); - } -} - -fn mutexLockUncancelable(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mutex) void { - if (builtin.single_threaded) unreachable; // Interface should have prevented this. - if (native_os == .netbsd) @panic("TODO"); - if (native_os == .openbsd) @panic("TODO"); - _ = userdata; - if (prev_state == .contended) { - futexWaitUncancelable(@ptrCast(&mutex.state), @intFromEnum(Io.Mutex.State.contended)); - } - while (@atomicRmw(Io.Mutex.State, &mutex.state, .Xchg, .contended, .acquire) != .unlocked) { - futexWaitUncancelable(@ptrCast(&mutex.state), @intFromEnum(Io.Mutex.State.contended)); - } -} - -fn mutexUnlock(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mutex) void { - if (builtin.single_threaded) unreachable; // Interface should have prevented this. - if (native_os == .netbsd) @panic("TODO"); - if (native_os == .openbsd) @panic("TODO"); - _ = userdata; - _ = prev_state; - if (@atomicRmw(Io.Mutex.State, &mutex.state, .Xchg, .unlocked, .release) == .contended) { - futexWake(@ptrCast(&mutex.state), 1); - } -} - -fn conditionWaitUncancelable(userdata: ?*anyopaque, cond: *Io.Condition, mutex: *Io.Mutex) void { - if (builtin.single_threaded) unreachable; // Deadlock. - if (native_os == .netbsd) @panic("TODO"); - if (native_os == .openbsd) @panic("TODO"); - const t: *Threaded = @ptrCast(@alignCast(userdata)); const t_io = ioBasic(t); - comptime assert(@TypeOf(cond.state) == u64); - const ints: *[2]std.atomic.Value(u32) = @ptrCast(&cond.state); - const cond_state = &ints[0]; - const cond_epoch = &ints[1]; - const one_waiter = 1; - const waiter_mask = 0xffff; - const one_signal = 1 << 16; - const signal_mask = 0xffff << 16; - var epoch = cond_epoch.load(.acquire); - var state = cond_state.fetchAdd(one_waiter, .monotonic); - assert(state & waiter_mask != waiter_mask); - state += one_waiter; - - mutex.unlock(t_io); - defer mutex.lockUncancelable(t_io); - - while (true) { - futexWaitUncancelable(cond_epoch, epoch); - epoch = cond_epoch.load(.acquire); - state = cond_state.load(.monotonic); - while (state & signal_mask != 0) { - const new_state = state - one_waiter - one_signal; - state = cond_state.cmpxchgWeak(state, new_state, .acquire, .monotonic) orelse return; - } + const timeout_ns: ?u64 = ns: { + const d = (timeout.toDurationFromNow(t_io) catch break :ns 10) orelse break :ns null; + break :ns std.math.lossyCast(u64, d.raw.toNanoseconds()); + }; + switch (native_os) { + .illumos, .netbsd, .openbsd => @panic("TODO"), + else => try current_thread.futexWaitTimed(ptr, expected, timeout_ns), } } -fn conditionWait(userdata: ?*anyopaque, cond: *Io.Condition, mutex: *Io.Mutex) Io.Cancelable!void { - if (builtin.single_threaded) unreachable; // Deadlock. - if (native_os == .netbsd) @panic("TODO"); - if (native_os == .openbsd) @panic("TODO"); +fn futexWaitUncancelable(userdata: ?*anyopaque, ptr: *const u32, expected: u32) void { const t: *Threaded = @ptrCast(@alignCast(userdata)); - const current_thread = Thread.getCurrent(t); - const t_io = ioBasic(t); - comptime assert(@TypeOf(cond.state) == u64); - const ints: *[2]std.atomic.Value(u32) = @ptrCast(&cond.state); - const cond_state = &ints[0]; - const cond_epoch = &ints[1]; - const one_waiter = 1; - const waiter_mask = 0xffff; - const one_signal = 1 << 16; - const signal_mask = 0xffff << 16; - // Observe the epoch, then check the state again to see if we should wake up. - // The epoch must be observed before we check the state or we could potentially miss a wake() and deadlock: - // - // - T1: s = LOAD(&state) - // - T2: UPDATE(&s, signal) - // - T2: UPDATE(&epoch, 1) + FUTEX_WAKE(&epoch) - // - T1: e = LOAD(&epoch) (was reordered after the state load) - // - T1: s & signals == 0 -> FUTEX_WAIT(&epoch, e) (missed the state update + the epoch change) - // - // Acquire barrier to ensure the epoch load happens before the state load. - var epoch = cond_epoch.load(.acquire); - var state = cond_state.fetchAdd(one_waiter, .monotonic); - assert(state & waiter_mask != waiter_mask); - state += one_waiter; - - mutex.unlock(t_io); - defer mutex.lockUncancelable(t_io); - - while (true) { - try futexWait(current_thread, cond_epoch, epoch); - - epoch = cond_epoch.load(.acquire); - state = cond_state.load(.monotonic); - - // Try to wake up by consuming a signal and decremented the waiter we - // added previously. Acquire barrier ensures code before the wake() - // which added the signal happens before we decrement it and return. - while (state & signal_mask != 0) { - const new_state = state - one_waiter - one_signal; - state = cond_state.cmpxchgWeak(state, new_state, .acquire, .monotonic) orelse return; - } + _ = t; + switch (native_os) { + .illumos, .netbsd, .openbsd => @panic("TODO"), + else => Thread.futexWaitUncancelable(ptr, expected), } } -fn conditionWake(userdata: ?*anyopaque, cond: *Io.Condition, wake: Io.Condition.Wake) void { - if (builtin.single_threaded) unreachable; // Nothing to wake up. +fn futexWake(userdata: ?*anyopaque, ptr: *const u32, max_waiters: u32) void { const t: *Threaded = @ptrCast(@alignCast(userdata)); _ = t; - comptime assert(@TypeOf(cond.state) == u64); - const ints: *[2]std.atomic.Value(u32) = @ptrCast(&cond.state); - const cond_state = &ints[0]; - const cond_epoch = &ints[1]; - const one_waiter = 1; - const waiter_mask = 0xffff; - const one_signal = 1 << 16; - const signal_mask = 0xffff << 16; - var state = cond_state.load(.monotonic); - while (true) { - const waiters = (state & waiter_mask) / one_waiter; - const signals = (state & signal_mask) / one_signal; - - // Reserves which waiters to wake up by incrementing the signals count. - // Therefore, the signals count is always less than or equal to the - // waiters count. We don't need to Futex.wake if there's nothing to - // wake up or if other wake() threads have reserved to wake up the - // current waiters. - const wakeable = waiters - signals; - if (wakeable == 0) { - return; - } - - const to_wake = switch (wake) { - .one => 1, - .all => wakeable, - }; - - // Reserve the amount of waiters to wake by incrementing the signals - // count. Release barrier ensures code before the wake() happens before - // the signal it posted and consumed by the wait() threads. - const new_state = state + (one_signal * to_wake); - state = cond_state.cmpxchgWeak(state, new_state, .release, .monotonic) orelse { - // Wake up the waiting threads we reserved above by changing the epoch value. - // - // A waiting thread could miss a wake up if *exactly* ((1<<32)-1) - // wake()s happen between it observing the epoch and sleeping on - // it. This is very unlikely due to how many precise amount of - // Futex.wake() calls that would be between the waiting thread's - // potential preemption. - // - // Release barrier ensures the signal being added to the state - // happens before the epoch is changed. If not, the waiting thread - // could potentially deadlock from missing both the state and epoch - // change: - // - // - T2: UPDATE(&epoch, 1) (reordered before the state change) - // - T1: e = LOAD(&epoch) - // - T1: s = LOAD(&state) - // - T2: UPDATE(&state, signal) + FUTEX_WAKE(&epoch) - // - T1: s & signals == 0 -> FUTEX_WAIT(&epoch, e) (missed both epoch change and state change) - _ = cond_epoch.fetchAdd(1, .release); - if (native_os == .netbsd) @panic("TODO"); - if (native_os == .openbsd) @panic("TODO"); - futexWake(cond_epoch, to_wake); - return; - }; + switch (native_os) { + .illumos, .netbsd, .openbsd => @panic("TODO"), + else => Thread.futexWake(ptr, max_waiters), } } @@ -3630,28 +3755,28 @@ fn sleepPosix(userdata: ?*anyopaque, timeout: Io.Timeout) Io.SleepError!void { fn select(userdata: ?*anyopaque, futures: []const *Io.AnyFuture) Io.Cancelable!usize { const t: *Threaded = @ptrCast(@alignCast(userdata)); - var reset_event: ResetEvent = .unset; + var event: Io.Event = .unset; for (futures, 0..) |future, i| { const closure: *AsyncClosure = @ptrCast(@alignCast(future)); - if (@atomicRmw(?*ResetEvent, &closure.select_condition, .Xchg, &reset_event, .seq_cst) == AsyncClosure.done_reset_event) { + if (@atomicRmw(?*Io.Event, &closure.select_condition, .Xchg, &event, .seq_cst) == AsyncClosure.done_event) { for (futures[0..i]) |cleanup_future| { const cleanup_closure: *AsyncClosure = @ptrCast(@alignCast(cleanup_future)); - if (@atomicRmw(?*ResetEvent, &cleanup_closure.select_condition, .Xchg, null, .seq_cst) == AsyncClosure.done_reset_event) { - cleanup_closure.reset_event.waitUncancelable(); // Ensure no reference to our stack-allocated reset_event. + if (@atomicRmw(?*Io.Event, &cleanup_closure.select_condition, .Xchg, null, .seq_cst) == AsyncClosure.done_event) { + cleanup_closure.event.waitUncancelable(ioBasic(t)); // Ensure no reference to our stack-allocated event. } } return i; } } - try reset_event.wait(t); + try event.wait(ioBasic(t)); var result: ?usize = null; for (futures, 0..) |future, i| { const closure: *AsyncClosure = @ptrCast(@alignCast(future)); - if (@atomicRmw(?*ResetEvent, &closure.select_condition, .Xchg, null, .seq_cst) == AsyncClosure.done_reset_event) { - closure.reset_event.waitUncancelable(); // Ensure no reference to our stack-allocated reset_event. + if (@atomicRmw(?*Io.Event, &closure.select_condition, .Xchg, null, .seq_cst) == AsyncClosure.done_event) { + closure.event.waitUncancelable(ioBasic(t)); // Ensure no reference to our stack-allocated event. if (result == null) result = i; // In case multiple are ready, return first. } } @@ -5670,11 +5795,13 @@ fn netLookup( host_name: HostName, resolved: *Io.Queue(HostName.LookupResult), options: HostName.LookupOptions, -) void { +) net.HostName.LookupError!void { const t: *Threaded = @ptrCast(@alignCast(userdata)); - const current_thread = Thread.getCurrent(t); - const t_io = io(t); - resolved.putOneUncancelable(t_io, .{ .end = netLookupFallible(t, current_thread, host_name, resolved, options) }); + defer resolved.close(io(t)); + netLookupFallible(t, host_name, resolved, options) catch |err| switch (err) { + error.Closed => unreachable, // `resolved` must not be closed until `netLookup` returns + else => |e| return e, + }; } fn netLookupUnavailable( @@ -5682,22 +5809,23 @@ fn netLookupUnavailable( host_name: HostName, resolved: *Io.Queue(HostName.LookupResult), options: HostName.LookupOptions, -) void { +) net.HostName.LookupError!void { _ = host_name; _ = options; const t: *Threaded = @ptrCast(@alignCast(userdata)); - const t_io = ioBasic(t); - resolved.putOneUncancelable(t_io, .{ .end = error.NetworkDown }); + resolved.close(ioBasic(t)); + return error.NetworkDown; } fn netLookupFallible( t: *Threaded, - current_thread: *Thread, host_name: HostName, resolved: *Io.Queue(HostName.LookupResult), options: HostName.LookupOptions, -) !void { +) (net.HostName.LookupError || Io.QueueClosedError)!void { if (!have_networking) return error.NetworkDown; + + const current_thread: *Thread = .getCurrent(t); const t_io = io(t); const name = host_name.bytes; assert(name.len <= HostName.max_len); @@ -6238,7 +6366,7 @@ fn lookupDnsSearch( host_name: HostName, resolved: *Io.Queue(HostName.LookupResult), options: HostName.LookupOptions, -) HostName.LookupError!void { +) (HostName.LookupError || Io.QueueClosedError)!void { const t_io = io(t); const rc = HostName.ResolvConf.init(t_io) catch return error.ResolvConfParseFailed; @@ -6282,7 +6410,7 @@ fn lookupDns( rc: *const HostName.ResolvConf, resolved: *Io.Queue(HostName.LookupResult), options: HostName.LookupOptions, -) HostName.LookupError!void { +) (HostName.LookupError || Io.QueueClosedError)!void { const t_io = io(t); const family_records: [2]struct { af: IpAddress.Family, rr: HostName.DnsRecord } = .{ .{ .af = .ip6, .rr = .A }, @@ -6496,8 +6624,10 @@ fn lookupHosts( return error.DetectingNetworkConfigurationFailed; }, }, - error.Canceled => |e| return e, - error.UnknownHostName => |e| return e, + error.Canceled, + error.Closed, + error.UnknownHostName, + => |e| return e, }; } @@ -6507,7 +6637,7 @@ fn lookupHostsReader( resolved: *Io.Queue(HostName.LookupResult), options: HostName.LookupOptions, reader: *Io.Reader, -) error{ ReadFailed, Canceled, UnknownHostName }!void { +) error{ ReadFailed, Canceled, UnknownHostName, Closed }!void { const t_io = io(t); var addresses_len: usize = 0; var canonical_name: ?HostName = null; @@ -6612,460 +6742,6 @@ fn copyCanon(canonical_name_buffer: *[HostName.max_len]u8, name: []const u8) Hos /// ulock_wait2() uses 64-bit nano-second timeouts (with the same convention) const darwin_supports_ulock_wait2 = builtin.os.version_range.semver.min.major >= 11; -fn futexWait(current_thread: *Thread, ptr: *const std.atomic.Value(u32), expect: u32) Io.Cancelable!void { - @branchHint(.cold); - - if (builtin.cpu.arch.isWasm()) { - comptime assert(builtin.cpu.has(.wasm, .atomics)); - try current_thread.checkCancel(); - const timeout: i64 = -1; - const signed_expect: i32 = @bitCast(expect); - const result = asm volatile ( - \\local.get %[ptr] - \\local.get %[expected] - \\local.get %[timeout] - \\memory.atomic.wait32 0 - \\local.set %[ret] - : [ret] "=r" (-> u32), - : [ptr] "r" (&ptr.raw), - [expected] "r" (signed_expect), - [timeout] "r" (timeout), - ); - switch (result) { - 0 => {}, // ok - 1 => {}, // expected != loaded - 2 => assert(!is_debug), // timeout - else => assert(!is_debug), - } - } else switch (native_os) { - .linux => { - const linux = std.os.linux; - try current_thread.beginSyscall(); - const rc = linux.futex_4arg(ptr, .{ .cmd = .WAIT, .private = true }, expect, null); - current_thread.endSyscall(); - switch (linux.errno(rc)) { - .SUCCESS => {}, // notified by `wake()` - .INTR => {}, // caller's responsibility to retry - .AGAIN => {}, // ptr.* != expect - .INVAL => {}, // possibly timeout overflow - .TIMEDOUT => recoverableOsBugDetected(), - .FAULT => recoverableOsBugDetected(), // ptr was invalid - else => recoverableOsBugDetected(), - } - }, - .driverkit, .ios, .maccatalyst, .macos, .tvos, .visionos, .watchos => { - const c = std.c; - const flags: c.UL = .{ - .op = .COMPARE_AND_WAIT, - .NO_ERRNO = true, - }; - try current_thread.beginSyscall(); - const status = if (darwin_supports_ulock_wait2) - c.__ulock_wait2(flags, ptr, expect, 0, 0) - else - c.__ulock_wait(flags, ptr, expect, 0); - current_thread.endSyscall(); - - if (status >= 0) return; - - if (is_debug) switch (@as(c.E, @enumFromInt(-status))) { - .INTR => {}, // spurious wake - // Address of the futex was paged out. This is unlikely, but possible in theory, and - // pthread/libdispatch on darwin bother to handle it. In this case we'll return - // without waiting, but the caller should retry anyway. - .FAULT => {}, - .TIMEDOUT => unreachable, - else => unreachable, - }; - }, - .windows => { - try current_thread.checkCancel(); - switch (windows.ntdll.RtlWaitOnAddress(ptr, &expect, @sizeOf(@TypeOf(expect)), null)) { - .SUCCESS => {}, - .CANCELLED => return error.Canceled, - else => recoverableOsBugDetected(), - } - }, - .freebsd => { - const flags = @intFromEnum(std.c.UMTX_OP.WAIT_UINT_PRIVATE); - try current_thread.beginSyscall(); - const rc = std.c._umtx_op(@intFromPtr(&ptr.raw), flags, @as(c_ulong, expect), 0, 0); - current_thread.endSyscall(); - if (is_debug) switch (posix.errno(rc)) { - .SUCCESS => {}, - .FAULT => unreachable, // one of the args points to invalid memory - .INVAL => unreachable, // arguments should be correct - .TIMEDOUT => unreachable, // no timeout provided - .INTR => {}, // spurious wake - else => unreachable, - }; - }, - else => @compileError("unimplemented: futexWait"), - } -} - -pub fn futexWaitUncancelable(ptr: *const std.atomic.Value(u32), expect: u32) void { - @branchHint(.cold); - - if (builtin.cpu.arch.isWasm()) { - comptime assert(builtin.cpu.has(.wasm, .atomics)); - const timeout: i64 = -1; - const signed_expect: i32 = @bitCast(expect); - const result = asm volatile ( - \\local.get %[ptr] - \\local.get %[expected] - \\local.get %[timeout] - \\memory.atomic.wait32 0 - \\local.set %[ret] - : [ret] "=r" (-> u32), - : [ptr] "r" (&ptr.raw), - [expected] "r" (signed_expect), - [timeout] "r" (timeout), - ); - switch (result) { - 0 => {}, // ok - 1 => {}, // expected != loaded - 2 => recoverableOsBugDetected(), // timeout - else => recoverableOsBugDetected(), - } - } else switch (native_os) { - .linux => { - const linux = std.os.linux; - const rc = linux.futex_4arg(ptr, .{ .cmd = .WAIT, .private = true }, expect, null); - switch (linux.errno(rc)) { - .SUCCESS => {}, // notified by `wake()` - .INTR => {}, // caller's responsibility to repeat - .AGAIN => {}, // ptr.* != expect - .INVAL => {}, // possibly timeout overflow - .TIMEDOUT => recoverableOsBugDetected(), - .FAULT => recoverableOsBugDetected(), // ptr was invalid - else => recoverableOsBugDetected(), - } - }, - .driverkit, .ios, .maccatalyst, .macos, .tvos, .visionos, .watchos => { - const c = std.c; - const flags: c.UL = .{ - .op = .COMPARE_AND_WAIT, - .NO_ERRNO = true, - }; - const status = if (darwin_supports_ulock_wait2) - c.__ulock_wait2(flags, ptr, expect, 0, 0) - else - c.__ulock_wait(flags, ptr, expect, 0); - - if (status >= 0) return; - - switch (@as(c.E, @enumFromInt(-status))) { - // Wait was interrupted by the OS or other spurious signalling. - .INTR => {}, - // Address of the futex was paged out. This is unlikely, but possible in theory, and - // pthread/libdispatch on darwin bother to handle it. In this case we'll return - // without waiting, but the caller should retry anyway. - .FAULT => {}, - .TIMEDOUT => recoverableOsBugDetected(), - else => recoverableOsBugDetected(), - } - }, - .windows => { - switch (windows.ntdll.RtlWaitOnAddress(ptr, &expect, @sizeOf(@TypeOf(expect)), null)) { - .SUCCESS, .CANCELLED => {}, - else => recoverableOsBugDetected(), - } - }, - .freebsd => { - const flags = @intFromEnum(std.c.UMTX_OP.WAIT_UINT_PRIVATE); - const rc = std.c._umtx_op(@intFromPtr(&ptr.raw), flags, @as(c_ulong, expect), 0, 0); - switch (posix.errno(rc)) { - .SUCCESS => {}, - .INTR => {}, // spurious wake - .FAULT => recoverableOsBugDetected(), // one of the args points to invalid memory - .INVAL => recoverableOsBugDetected(), // arguments should be correct - .TIMEDOUT => recoverableOsBugDetected(), // no timeout provided - else => recoverableOsBugDetected(), - } - }, - else => @compileError("unimplemented: futexWaitUncancelable"), - } -} - -pub fn futexWake(ptr: *const std.atomic.Value(u32), max_waiters: u32) void { - @branchHint(.cold); - - if (builtin.cpu.arch.isWasm()) { - comptime assert(builtin.cpu.has(.wasm, .atomics)); - assert(max_waiters != 0); - const woken_count = asm volatile ( - \\local.get %[ptr] - \\local.get %[waiters] - \\memory.atomic.notify 0 - \\local.set %[ret] - : [ret] "=r" (-> u32), - : [ptr] "r" (&ptr.raw), - [waiters] "r" (max_waiters), - ); - _ = woken_count; // can be 0 when linker flag 'shared-memory' is not enabled - } else switch (native_os) { - .linux => { - const linux = std.os.linux; - switch (linux.errno(linux.futex_3arg( - &ptr.raw, - .{ .cmd = .WAKE, .private = true }, - @min(max_waiters, std.math.maxInt(i32)), - ))) { - .SUCCESS => return, // successful wake up - .INVAL => return, // invalid futex_wait() on ptr done elsewhere - .FAULT => return, // pointer became invalid while doing the wake - else => return recoverableOsBugDetected(), // deadlock due to operating system bug - } - }, - .driverkit, .ios, .maccatalyst, .macos, .tvos, .visionos, .watchos => { - const c = std.c; - const flags: c.UL = .{ - .op = .COMPARE_AND_WAIT, - .NO_ERRNO = true, - .WAKE_ALL = max_waiters > 1, - }; - while (true) { - const status = c.__ulock_wake(flags, ptr, 0); - if (status >= 0) return; - switch (@as(c.E, @enumFromInt(-status))) { - .INTR, .CANCELED => continue, // spurious wake() - .FAULT => unreachable, // __ulock_wake doesn't generate EFAULT according to darwin pthread_cond_t - .NOENT => return, // nothing was woken up - .ALREADY => unreachable, // only for UL.Op.WAKE_THREAD - else => unreachable, // deadlock due to operating system bug - } - } - }, - .windows => { - assert(max_waiters != 0); - switch (max_waiters) { - 1 => windows.ntdll.RtlWakeAddressSingle(ptr), - else => windows.ntdll.RtlWakeAddressAll(ptr), - } - }, - .freebsd => { - const rc = std.c._umtx_op( - @intFromPtr(&ptr.raw), - @intFromEnum(std.c.UMTX_OP.WAKE_PRIVATE), - @as(c_ulong, max_waiters), - 0, // there is no timeout struct - 0, // there is no timeout struct pointer - ); - switch (posix.errno(rc)) { - .SUCCESS => {}, - .FAULT => {}, // it's ok if the ptr doesn't point to valid memory - .INVAL => unreachable, // arguments should be correct - else => unreachable, // deadlock due to operating system bug - } - }, - else => @compileError("unimplemented: futexWake"), - } -} - -/// A thread-safe logical boolean value which can be `set` and `unset`. -/// -/// It can also block threads until the value is set with cancelation via timed -/// waits. Statically initializable; four bytes on all targets. -pub const ResetEvent = switch (native_os) { - .illumos, .netbsd, .openbsd => ResetEventPosix, - else => ResetEventFutex, -}; - -/// A `ResetEvent` implementation based on futexes. -const ResetEventFutex = enum(u32) { - unset = 0, - waiting = 1, - is_set = 2, - - /// Returns whether the logical boolean is `set`. - /// - /// Once `reset` is called, this returns false until the next `set`. - /// - /// The memory accesses before the `set` can be said to happen before - /// `isSet` returns true. - pub fn isSet(ref: *const ResetEventFutex) bool { - if (builtin.single_threaded) return switch (ref.*) { - .unset => false, - .waiting => unreachable, - .is_set => true, - }; - // Acquire barrier ensures memory accesses before `set` happen before - // returning true. - return @atomicLoad(ResetEventFutex, ref, .acquire) == .is_set; - } - - /// Blocks the calling thread until `set` is called. - /// - /// This is effectively a more efficient version of `while (!isSet()) {}`. - /// - /// The memory accesses before the `set` can be said to happen before `wait` returns. - pub fn wait(ref: *ResetEventFutex, t: *Threaded) Io.Cancelable!void { - if (builtin.single_threaded) switch (ref.*) { - .unset => unreachable, // Deadlock, no other threads to wake us up. - .waiting => unreachable, // Invalid state. - .is_set => return, - }; - // Try to set the state from `unset` to `waiting` to indicate to the - // `set` thread that others are blocked on the ResetEventFutex. Avoid using - // any strict barriers until we know the ResetEventFutex is set. - var state = @atomicLoad(ResetEventFutex, ref, .acquire); - if (state == .is_set) { - @branchHint(.likely); - return; - } - if (state == .unset) { - state = @cmpxchgStrong(ResetEventFutex, ref, state, .waiting, .acquire, .acquire) orelse .waiting; - } - const current_thread = Thread.getCurrent(t); - while (state == .waiting) { - try futexWait(current_thread, @ptrCast(ref), @intFromEnum(ResetEventFutex.waiting)); - state = @atomicLoad(ResetEventFutex, ref, .acquire); - } - assert(state == .is_set); - } - - /// Same as `wait` except uninterruptible. - pub fn waitUncancelable(ref: *ResetEventFutex) void { - if (builtin.single_threaded) switch (ref.*) { - .unset => unreachable, // Deadlock, no other threads to wake us up. - .waiting => unreachable, // Invalid state. - .is_set => return, - }; - // Try to set the state from `unset` to `waiting` to indicate to the - // `set` thread that others are blocked on the ResetEventFutex. Avoid using - // any strict barriers until we know the ResetEventFutex is set. - var state = @atomicLoad(ResetEventFutex, ref, .acquire); - if (state == .is_set) { - @branchHint(.likely); - return; - } - if (state == .unset) { - state = @cmpxchgStrong(ResetEventFutex, ref, state, .waiting, .acquire, .acquire) orelse .waiting; - } - while (state == .waiting) { - futexWaitUncancelable(@ptrCast(ref), @intFromEnum(ResetEventFutex.waiting)); - state = @atomicLoad(ResetEventFutex, ref, .acquire); - } - assert(state == .is_set); - } - - /// Marks the logical boolean as `set` and unblocks any threads in `wait` - /// or `timedWait` to observe the new state. - /// - /// The logical boolean stays `set` until `reset` is called, making future - /// `set` calls do nothing semantically. - /// - /// The memory accesses before `set` can be said to happen before `isSet` - /// returns true or `wait`/`timedWait` return successfully. - pub fn set(ref: *ResetEventFutex) void { - if (builtin.single_threaded) { - ref.* = .is_set; - return; - } - if (@atomicRmw(ResetEventFutex, ref, .Xchg, .is_set, .release) == .waiting) { - futexWake(@ptrCast(ref), std.math.maxInt(u32)); - } - } - - /// Unmarks the ResetEventFutex as if `set` was never called. - /// - /// Assumes no threads are blocked in `wait` or `timedWait`. Concurrent - /// calls to `set`, `isSet` and `reset` are allowed. - pub fn reset(ref: *ResetEventFutex) void { - if (builtin.single_threaded) { - ref.* = .unset; - return; - } - @atomicStore(ResetEventFutex, ref, .unset, .monotonic); - } -}; - -/// A `ResetEvent` implementation based on pthreads API. -const ResetEventPosix = struct { - cond: std.c.pthread_cond_t, - mutex: std.c.pthread_mutex_t, - state: ResetEventFutex, - - pub const unset: ResetEventPosix = .{ - .cond = std.c.PTHREAD_COND_INITIALIZER, - .mutex = std.c.PTHREAD_MUTEX_INITIALIZER, - .state = .unset, - }; - - pub fn isSet(rep: *const ResetEventPosix) bool { - if (builtin.single_threaded) return switch (rep.state) { - .unset => false, - .waiting => unreachable, - .is_set => true, - }; - return @atomicLoad(ResetEventFutex, &rep.state, .acquire) == .is_set; - } - - pub fn wait(rep: *ResetEventPosix, t: *Threaded) Io.Cancelable!void { - if (builtin.single_threaded) switch (rep.*) { - .unset => unreachable, // Deadlock, no other threads to wake us up. - .waiting => unreachable, // Invalid state. - .is_set => return, - }; - const current_thread = Thread.getCurrent(t); - assert(std.c.pthread_mutex_lock(&rep.mutex) == .SUCCESS); - defer assert(std.c.pthread_mutex_unlock(&rep.mutex) == .SUCCESS); - sw: switch (rep.state) { - .unset => { - rep.state = .waiting; - continue :sw .waiting; - }, - .waiting => { - try current_thread.beginSyscall(); - assert(std.c.pthread_cond_wait(&rep.cond, &rep.mutex) == .SUCCESS); - current_thread.endSyscall(); - continue :sw rep.state; - }, - .is_set => return, - } - } - - pub fn waitUncancelable(rep: *ResetEventPosix) void { - if (builtin.single_threaded) switch (rep.*) { - .unset => unreachable, // Deadlock, no other threads to wake us up. - .waiting => unreachable, // Invalid state. - .is_set => return, - }; - assert(std.c.pthread_mutex_lock(&rep.mutex) == .SUCCESS); - defer assert(std.c.pthread_mutex_unlock(&rep.mutex) == .SUCCESS); - sw: switch (rep.state) { - .unset => { - rep.state = .waiting; - continue :sw .waiting; - }, - .waiting => { - assert(std.c.pthread_cond_wait(&rep.cond, &rep.mutex) == .SUCCESS); - continue :sw rep.state; - }, - .is_set => return, - } - } - - pub fn set(rep: *ResetEventPosix) void { - if (builtin.single_threaded) { - rep.* = .is_set; - return; - } - if (@atomicRmw(ResetEventFutex, &rep.state, .Xchg, .is_set, .release) == .waiting) { - assert(std.c.pthread_cond_broadcast(&rep.cond) == .SUCCESS); - } - } - - pub fn reset(rep: *ResetEventPosix) void { - if (builtin.single_threaded) { - rep.* = .unset; - return; - } - @atomicStore(ResetEventFutex, &rep.state, .unset, .monotonic); - } -}; - fn closeSocketWindows(s: ws2_32.SOCKET) void { const rc = ws2_32.closesocket(s); if (is_debug) switch (rc) { diff --git a/lib/std/Io/net/HostName.zig b/lib/std/Io/net/HostName.zig @@ -82,19 +82,22 @@ pub const LookupError = error{ pub const LookupResult = union(enum) { address: IpAddress, canonical_name: HostName, - end: LookupError!void, }; -/// Adds any number of `IpAddress` into resolved, exactly one canonical_name, -/// and then always finishes by adding one `LookupResult.end` entry. +/// Adds any number of `LookupResult.address` into `resolved`, and exactly one +/// `LookupResult.canonical_name`. /// /// Guaranteed not to block if provided queue has capacity at least 16. +/// +/// Closes `resolved` before return, even on error. +/// +/// Asserts `resolved` is not closed until this call returns. pub fn lookup( host_name: HostName, io: Io, resolved: *Io.Queue(LookupResult), options: LookupOptions, -) void { +) LookupError!void { return io.vtable.netLookup(io.userdata, host_name, resolved, options); } @@ -211,23 +214,25 @@ pub fn connect( port: u16, options: IpAddress.ConnectOptions, ) ConnectError!Stream { - var connect_many_buffer: [32]ConnectManyResult = undefined; - var connect_many_queue: Io.Queue(ConnectManyResult) = .init(&connect_many_buffer); + var connect_many_buffer: [32]IpAddress.ConnectError!Stream = undefined; + var connect_many_queue: Io.Queue(IpAddress.ConnectError!Stream) = .init(&connect_many_buffer); var connect_many = io.async(connectMany, .{ host_name, io, port, &connect_many_queue, options }); - var saw_end = false; defer { - connect_many.cancel(io); - if (!saw_end) while (true) switch (connect_many_queue.getOneUncancelable(io)) { - .connection => |loser| if (loser) |s| s.close(io) else |_| continue, - .end => break, - }; + connect_many.cancel(io) catch {}; + while (connect_many_queue.getOneUncancelable(io)) |loser| { + if (loser) |s| s.close(io) else |_| {} + } else |err| switch (err) { + error.Closed => {}, + } } - var aggregate_error: ConnectError = error.UnknownHostName; + var ip_connect_error: ?IpAddress.ConnectError = null; - while (connect_many_queue.getOne(io)) |result| switch (result) { - .connection => |connection| if (connection) |stream| return stream else |err| switch (err) { + while (connect_many_queue.getOne(io)) |result| { + if (result) |stream| { + return stream; + } else |err| switch (err) { error.SystemResources, error.OptionUnsupported, error.ProcessFdQuotaExceeded, @@ -237,66 +242,80 @@ pub fn connect( error.WouldBlock => return error.Unexpected, - else => |e| aggregate_error = e, - }, - .end => |end| { - saw_end = true; - try end; - return aggregate_error; - }, + else => |e| ip_connect_error = e, + } } else |err| switch (err) { error.Canceled => |e| return e, + error.Closed => { + // There was no successful connection attempt. If there was a lookup error, return that. + try connect_many.await(io); + // Otherwise, return the error from a failed IP connection attempt. + return ip_connect_error orelse + return error.UnknownHostName; + }, } } -pub const ConnectManyResult = union(enum) { - connection: IpAddress.ConnectError!Stream, - end: ConnectError!void, -}; - /// Asynchronously establishes a connection to all IP addresses associated with /// a host name, adding them to a results queue upon completion. +/// +/// Closes `results` before return, even on error. +/// +/// Asserts `results` is not closed until this call returns. pub fn connectMany( host_name: HostName, io: Io, port: u16, - results: *Io.Queue(ConnectManyResult), + results: *Io.Queue(IpAddress.ConnectError!Stream), options: IpAddress.ConnectOptions, -) void { +) LookupError!void { + defer results.close(io); + var canonical_name_buffer: [max_len]u8 = undefined; var lookup_buffer: [32]HostName.LookupResult = undefined; var lookup_queue: Io.Queue(LookupResult) = .init(&lookup_buffer); - var group: Io.Group = .init; - defer group.cancel(io); - - group.async(io, lookup, .{ host_name, io, &lookup_queue, .{ + var lookup_future = io.async(lookup, .{ host_name, io, &lookup_queue, .{ .port = port, .canonical_name_buffer = &canonical_name_buffer, } }); + defer lookup_future.cancel(io) catch {}; + + var group: Io.Group = .init; + defer group.cancel(io); while (lookup_queue.getOne(io)) |dns_result| switch (dns_result) { .address => |address| group.async(io, enqueueConnection, .{ address, io, results, options }), .canonical_name => continue, - .end => |lookup_result| { - group.wait(io); - results.putOneUncancelable(io, .{ .end = lookup_result }); - return; - }, } else |err| switch (err) { - error.Canceled => |e| { - group.cancel(io); - results.putOneUncancelable(io, .{ .end = e }); + error.Canceled => |e| return e, + error.Closed => { + group.wait(io); + return lookup_future.await(io); }, } } - fn enqueueConnection( address: IpAddress, io: Io, - queue: *Io.Queue(ConnectManyResult), + queue: *Io.Queue(IpAddress.ConnectError!Stream), options: IpAddress.ConnectOptions, ) void { - queue.putOneUncancelable(io, .{ .connection = address.connect(io, options) }); + enqueueConnectionFallible(address, io, queue, options) catch |err| switch (err) { + error.Canceled => {}, + }; +} +fn enqueueConnectionFallible( + address: IpAddress, + io: Io, + queue: *Io.Queue(IpAddress.ConnectError!Stream), + options: IpAddress.ConnectOptions, +) Io.Cancelable!void { + const result = address.connect(io, options); + errdefer if (result) |s| s.close(io) else |_| {}; + queue.putOne(io, result) catch |err| switch (err) { + error.Closed => unreachable, // `queue` must not be closed + error.Canceled => |e| return e, + }; } pub const ResolvConf = struct { diff --git a/lib/std/Io/net/test.zig b/lib/std/Io/net/test.zig @@ -129,7 +129,7 @@ test "resolve DNS" { var results_buffer: [32]net.HostName.LookupResult = undefined; var results: Io.Queue(net.HostName.LookupResult) = .init(&results_buffer); - net.HostName.lookup(try .init("localhost"), io, &results, .{ + try net.HostName.lookup(try .init("localhost"), io, &results, .{ .port = 80, .canonical_name_buffer = &canonical_name_buffer, }); @@ -142,11 +142,10 @@ test "resolve DNS" { addresses_found += 1; }, .canonical_name => |canonical_name| try testing.expectEqualStrings("localhost", canonical_name.bytes), - .end => |end| { - try end; - break; - }, - } else |err| return err; + } else |err| switch (err) { + error.Closed => {}, + error.Canceled => |e| return e, + } try testing.expect(addresses_found != 0); } @@ -161,20 +160,19 @@ test "resolve DNS" { net.HostName.lookup(try .init("example.com"), io, &results, .{ .port = 80, .canonical_name_buffer = &canonical_name_buffer, - }); + }) catch |err| switch (err) { + error.UnknownHostName => return error.SkipZigTest, + error.NameServerFailure => return error.SkipZigTest, + else => |e| return e, + }; while (results.getOne(io)) |result| switch (result) { .address => {}, .canonical_name => {}, - .end => |end| { - end catch |err| switch (err) { - error.UnknownHostName => return error.SkipZigTest, - error.NameServerFailure => return error.SkipZigTest, - else => return err, - }; - break; - }, - } else |err| return err; + } else |err| switch (err) { + error.Closed => {}, + error.Canceled => |e| return e, + } } } diff --git a/lib/std/Io/test.zig b/lib/std/Io/test.zig @@ -209,10 +209,10 @@ test "select" { return; }, }; - defer if (get_a.cancel(io)) |_| {} else |_| @panic("fail"); + defer _ = get_a.cancel(io) catch {}; var get_b = try io.concurrent(Io.Queue(u8).getOne, .{ &queue, io }); - defer if (get_b.cancel(io)) |_| {} else |_| @panic("fail"); + defer _ = get_b.cancel(io) catch {}; var timeout = io.async(Io.sleep, .{ io, .fromMilliseconds(1), .awake }); defer timeout.cancel(io) catch {}; @@ -225,12 +225,9 @@ test "select" { .get_a => return error.TestFailure, .get_b => return error.TestFailure, .timeout => { - // Unblock the queues to avoid making this unit test depend on - // cancellation. - queue.putOneUncancelable(io, 1); - queue.putOneUncancelable(io, 1); - try testing.expectEqual(1, try get_a.await(io)); - try testing.expectEqual(1, try get_b.await(io)); + queue.close(io); + try testing.expectError(error.Closed, get_a.await(io)); + try testing.expectError(error.Closed, get_b.await(io)); }, } } @@ -255,3 +252,162 @@ test "Queue" { try testQueue(4); try testQueue(5); } + +test "Queue.close single-threaded" { + const io = std.testing.io; + + var buf: [10]u8 = undefined; + var queue: Io.Queue(u8) = .init(&buf); + + try queue.putAll(io, &.{ 0, 1, 2, 3, 4, 5, 6 }); + try expectEqual(3, try queue.put(io, &.{ 7, 8, 9, 10 }, 0)); // there is capacity for 3 more items + + var get_buf: [4]u8 = undefined; + + // Receive some elements before closing + try expectEqual(4, try queue.get(io, &get_buf, 0)); + try expectEqual(0, get_buf[0]); + try expectEqual(1, get_buf[1]); + try expectEqual(2, get_buf[2]); + try expectEqual(3, get_buf[3]); + try expectEqual(4, try queue.getOne(io)); + + // ...and add a couple more now there's space + try queue.putAll(io, &.{ 20, 21 }); + + queue.close(io); + + // Receive more elements *after* closing + try expectEqual(4, try queue.get(io, &get_buf, 0)); + try expectEqual(5, get_buf[0]); + try expectEqual(6, get_buf[1]); + try expectEqual(7, get_buf[2]); + try expectEqual(8, get_buf[3]); + try expectEqual(9, try queue.getOne(io)); + + // Cannot put anything while closed, even if the buffer has space + try expectError(error.Closed, queue.putOne(io, 100)); + try expectError(error.Closed, queue.putAll(io, &.{ 101, 102 })); + try expectError(error.Closed, queue.putUncancelable(io, &.{ 103, 104 }, 0)); + + // Even if we ask for 3 items, the queue is closed, so we only get the last 2 + try expectEqual(2, try queue.get(io, &get_buf, 4)); + try expectEqual(20, get_buf[0]); + try expectEqual(21, get_buf[1]); + + // The queue is now empty, so `get` should return `error.Closed` too + try expectError(error.Closed, queue.getOne(io)); + try expectError(error.Closed, queue.get(io, &get_buf, 0)); + try expectError(error.Closed, queue.putUncancelable(io, &get_buf, 2)); +} + +test "Event" { + const global = struct { + fn waitAndRead(io: Io, event: *Io.Event, ptr: *const u32) Io.Cancelable!u32 { + try event.wait(io); + return ptr.*; + } + }; + + const io = std.testing.io; + + var event: Io.Event = .unset; + var buffer: u32 = undefined; + + { + var future = io.concurrent(global.waitAndRead, .{ io, &event, &buffer }) catch |err| switch (err) { + error.ConcurrencyUnavailable => return error.SkipZigTest, + }; + + buffer = 123; + event.set(io); + + const result = try future.await(io); + + try std.testing.expectEqual(123, result); + } + + event.reset(); + + { + var future = io.concurrent(global.waitAndRead, .{ io, &event, &buffer }) catch |err| switch (err) { + error.ConcurrencyUnavailable => return error.SkipZigTest, + }; + try std.testing.expectError(error.Canceled, future.cancel(io)); + } +} + +test "recancel" { + const global = struct { + fn worker(io: Io) Io.Cancelable!void { + var dummy_event: Io.Event = .unset; + + if (dummy_event.wait(io)) { + return; + } else |err| switch (err) { + error.Canceled => io.recancel(), + } + + // Now we expect to see `error.Canceled` again. + return dummy_event.wait(io); + } + }; + + const io = std.testing.io; + var future = io.concurrent(global.worker, .{io}) catch |err| switch (err) { + error.ConcurrencyUnavailable => return error.SkipZigTest, + }; + if (future.cancel(io)) { + return error.UnexpectedSuccess; // both `wait` calls should have returned `error.Canceled` + } else |err| switch (err) { + error.Canceled => {}, + } +} + +test "swapCancelProtection" { + const global = struct { + fn waitTwice( + io: Io, + event: *Io.Event, + ) error{ Canceled, CanceledWhileProtected }!void { + // Wait for `event` while protected from cancelation. + { + const old_prot = io.swapCancelProtection(.blocked); + defer _ = io.swapCancelProtection(old_prot); + event.wait(io) catch |err| switch (err) { + error.Canceled => return error.CanceledWhileProtected, + }; + } + // Reset the event (it will never be set again), and this time wait for it without protection. + event.reset(); + _ = try event.wait(io); + } + fn sleepThenSet(io: Io, event: *Io.Event) !void { + // Give `waitTwice` a chance to get canceled. + try io.sleep(.fromMilliseconds(200), .awake); + event.set(io); + } + }; + + const io = std.testing.io; + + var event: Io.Event = .unset; + + var wait_future = io.concurrent(global.waitTwice, .{ io, &event }) catch |err| switch (err) { + error.ConcurrencyUnavailable => return error.SkipZigTest, + }; + defer wait_future.cancel(io) catch {}; + + var set_future = try io.concurrent(global.sleepThenSet, .{ io, &event }); + defer set_future.cancel(io) catch {}; + + if (wait_future.cancel(io)) { + return error.UnexpectedSuccess; // there was no `set` call to unblock the second `wait` + } else |err| switch (err) { + error.Canceled => {}, + error.CanceledWhileProtected => |e| return e, + } + + // Because it reached the `set`, it should be too late for `sleepThenSet` to see `error.Canceled`. + try set_future.cancel(io); +} diff --git a/src/Sema.zig b/src/Sema.zig @@ -29023,33 +29023,6 @@ fn coerceExtra( else => {}, }, .error_union => switch (inst_ty.zigTypeTag(zcu)) { - .error_union => eu: { - if (maybe_inst_val) |inst_val| { - switch (inst_val.toIntern()) { - .undef => return pt.undefRef(dest_ty), - else => switch (zcu.intern_pool.indexToKey(inst_val.toIntern())) { - .error_union => |error_union| switch (error_union.val) { - .err_name => |err_name| { - const error_set_ty = inst_ty.errorUnionSet(zcu); - const error_set_val = Air.internedToRef((try pt.intern(.{ .err = .{ - .ty = error_set_ty.toIntern(), - .name = err_name, - } }))); - return sema.wrapErrorUnionSet(block, dest_ty, error_set_val, inst_src); - }, - .payload => |payload| { - const payload_val = Air.internedToRef(payload); - return sema.wrapErrorUnionPayload(block, dest_ty, payload_val, inst_src) catch |err| switch (err) { - error.NotCoercible => break :eu, - else => |e| return e, - }; - }, - }, - else => unreachable, - }, - } - } - }, .error_set => { // E to E!T return sema.wrapErrorUnionSet(block, dest_ty, inst, inst_src); diff --git a/src/codegen/x86_64/CodeGen.zig b/src/codegen/x86_64/CodeGen.zig @@ -171444,12 +171444,14 @@ fn genBody(cg: *CodeGen, body: []const Air.Inst.Index) InnerError!void { const elem_dies = bt.feed(); if (tuple_type.values.get(ip)[field_index] != .none) continue; const field_type = Type.fromInterned(tuple_type.types.get(ip)[field_index]); - elem_disp = @intCast(field_type.abiAlignment(zcu).forward(elem_disp)); - var elem = try cg.tempFromOperand(elem_ref, elem_dies); - try res.write(&elem, .{ .disp = elem_disp }, cg); - try elem.die(cg); - try cg.resetTemps(reset_index); - elem_disp += @intCast(field_type.abiSize(zcu)); + if (!hack_around_sema_opv_bugs or field_type.hasRuntimeBitsIgnoreComptime(zcu)) { + elem_disp = @intCast(field_type.abiAlignment(zcu).forward(elem_disp)); + var elem = try cg.tempFromOperand(elem_ref, elem_dies); + try res.write(&elem, .{ .disp = elem_disp }, cg); + try elem.die(cg); + try cg.resetTemps(reset_index); + elem_disp += @intCast(field_type.abiSize(zcu)); + } } }, else => return cg.fail("failed to select {s} {f}", .{