zig

fork of https://codeberg.org/ziglang/zig
Log | Files | Refs | README | LICENSE

commit b4ee54b5804b8afb0387a36c913a2cd8cf595409 (tree)
parent 6ed5b620507aa4589d6c6fe3a5e48e166d48b3d2
Author: Matthew Lugg <mlugg@mlugg.co.uk>
Date:   Mon, 15 Dec 2025 17:07:50 +0000

std.Io: reimplement Mutex and Condition atop futex

This work was partially cherry-picked from Andrew's WIP std.fs branch.
However, I also analyzed and simplified the Mutex and Condition
implementations, and brought them in line with modern Zig style.

Co-authored-by: Andrew Kelley <andrew@ziglang.org>

Diffstat:
Mlib/std/Build/Fuzz.zig | 2+-
Mlib/std/Build/WebServer.zig | 4++--
Mlib/std/Io.zig | 208++++++++++++++++++++++++++++++++++++++++++++++++++++++-------------------------
Mlib/std/Io/Threaded.zig | 200-------------------------------------------------------------------------------
4 files changed, 145 insertions(+), 269 deletions(-)

diff --git a/lib/std/Build/Fuzz.zig b/lib/std/Build/Fuzz.zig @@ -124,7 +124,7 @@ pub fn init( .coverage_files = .empty, .coverage_mutex = .init, .queue_mutex = .init, - .queue_cond = .{}, + .queue_cond = .init, .msg_queue = .empty, }; } diff --git a/lib/std/Build/WebServer.zig b/lib/std/Build/WebServer.zig @@ -122,8 +122,8 @@ pub fn init(opts: Options) WebServer { .update_id = .init(0), .runner_request_mutex = .init, - .runner_request_ready_cond = .{}, - .runner_request_empty_cond = .{}, + .runner_request_ready_cond = .init, + .runner_request_empty_cond = .init, .runner_request = null, }; } diff --git a/lib/std/Io.zig b/lib/std/Io.zig @@ -658,14 +658,6 @@ pub const VTable = struct { futexWaitUncancelable: *const fn (?*anyopaque, ptr: *const u32, expected: u32) void, futexWake: *const fn (?*anyopaque, ptr: *const u32, max_waiters: u32) void, - mutexLock: *const fn (?*anyopaque, prev_state: Mutex.State, mutex: *Mutex) Cancelable!void, - mutexLockUncancelable: *const fn (?*anyopaque, prev_state: Mutex.State, mutex: *Mutex) void, - mutexUnlock: *const fn (?*anyopaque, prev_state: Mutex.State, mutex: *Mutex) void, - - conditionWait: *const fn (?*anyopaque, cond: *Condition, mutex: *Mutex) Cancelable!void, - conditionWaitUncancelable: *const fn (?*anyopaque, cond: *Condition, mutex: *Mutex) void, - conditionWake: *const fn (?*anyopaque, cond: *Condition, wake: Condition.Wake) void, - dirMake: *const fn (?*anyopaque, Dir, sub_path: []const u8, Dir.Mode) Dir.MakeError!void, dirMakePath: *const fn (?*anyopaque, Dir, sub_path: []const u8, Dir.Mode) Dir.MakeError!void, dirMakeOpenPath: *const fn (?*anyopaque, Dir, sub_path: []const u8, Dir.OpenOptions) Dir.MakeOpenPathError!Dir, @@ -1215,99 +1207,183 @@ pub fn futexWake(io: Io, comptime T: type, ptr: *align(@alignOf(u32)) const T, m } pub const Mutex = struct { - state: State, + state: std.atomic.Value(State), - pub const State = enum(usize) { - locked_once = 0b00, - unlocked = 0b01, - contended = 0b10, - /// contended - _, + pub const init: Mutex = .{ .state = .init(.unlocked) }; - pub fn isUnlocked(state: State) bool { - return @intFromEnum(state) & @intFromEnum(State.unlocked) == @intFromEnum(State.unlocked); - } + const State = enum(u32) { + unlocked, + locked_once, + contended, }; - pub const init: Mutex = .{ .state = .unlocked }; - - pub fn tryLock(mutex: *Mutex) bool { - const prev_state: State = @enumFromInt(@atomicRmw( - usize, - @as(*usize, @ptrCast(&mutex.state)), - .And, - ~@intFromEnum(State.unlocked), + pub fn tryLock(m: *Mutex) bool { + switch (m.state.cmpxchgWeak( + .unlocked, + .locked_once, .acquire, - )); - return prev_state.isUnlocked(); + .monotonic, + ) orelse return true) { + .unlocked => unreachable, + .locked_once, .contended => return false, + } } - pub fn lock(mutex: *Mutex, io: std.Io) Cancelable!void { - const prev_state: State = @enumFromInt(@atomicRmw( - usize, - @as(*usize, @ptrCast(&mutex.state)), - .And, - ~@intFromEnum(State.unlocked), + pub fn lock(m: *Mutex, io: Io) Cancelable!void { + const initial_state = m.state.cmpxchgWeak( + .unlocked, + .locked_once, .acquire, - )); - if (prev_state.isUnlocked()) { + .monotonic, + ) orelse { @branchHint(.likely); return; + }; + if (initial_state == .contended) { + try io.futexWait(State, &m.state.raw, .contended); + } + while (m.state.swap(.contended, .acquire) != .unlocked) { + try io.futexWait(State, &m.state.raw, .contended); } - return io.vtable.mutexLock(io.userdata, prev_state, mutex); } - /// Same as `lock` but cannot be canceled. - pub fn lockUncancelable(mutex: *Mutex, io: std.Io) void { - const prev_state: State = @enumFromInt(@atomicRmw( - usize, - @as(*usize, @ptrCast(&mutex.state)), - .And, - ~@intFromEnum(State.unlocked), + pub fn lockUncancelable(m: *Mutex, io: Io) void { + const initial_state = m.state.cmpxchgWeak( + .unlocked, + .locked_once, .acquire, - )); - if (prev_state.isUnlocked()) { + .monotonic, + ) orelse { @branchHint(.likely); return; + }; + if (initial_state == .contended) { + io.futexWaitUncancelable(State, &m.state.raw, .contended); + } + while (m.state.swap(.contended, .acquire) != .unlocked) { + io.futexWaitUncancelable(State, &m.state.raw, .contended); } - return io.vtable.mutexLockUncancelable(io.userdata, prev_state, mutex); } - pub fn unlock(mutex: *Mutex, io: std.Io) void { - const prev_state = @cmpxchgWeak(State, &mutex.state, .locked_once, .unlocked, .release, .acquire) orelse { - @branchHint(.likely); - return; - }; - assert(prev_state != .unlocked); // mutex not locked - return io.vtable.mutexUnlock(io.userdata, prev_state, mutex); + pub fn unlock(m: *Mutex, io: Io) void { + switch (m.state.swap(.unlocked, .release)) { + .unlocked => unreachable, + .locked_once => {}, + .contended => { + @branchHint(.unlikely); + io.futexWake(State, &m.state.raw, 1); + }, + } } }; pub const Condition = struct { - state: u64 = 0, + state: std.atomic.Value(State), + /// Incremented whenever the condition is signaled + epoch: std.atomic.Value(u32), + + const State = packed struct(u32) { + waiters: u16, + signals: u16, + }; + + pub const init: Condition = .{ + .state = .init(.{ .waiters = 0, .signals = 0 }), + .epoch = .init(0), + }; pub fn wait(cond: *Condition, io: Io, mutex: *Mutex) Cancelable!void { - return io.vtable.conditionWait(io.userdata, cond, mutex); + try waitInner(cond, io, mutex, false); } pub fn waitUncancelable(cond: *Condition, io: Io, mutex: *Mutex) void { - return io.vtable.conditionWaitUncancelable(io.userdata, cond, mutex); + waitInner(cond, io, mutex, true) catch |err| switch (err) { + error.Canceled => unreachable, + }; + } + + fn waitInner(cond: *Condition, io: Io, mutex: *Mutex, uncancelable: bool) Cancelable!void { + var epoch = cond.epoch.load(.acquire); // `.acquire` to ensure ordered before state load + + { + const prev_state = cond.state.fetchAdd(.{ .waiters = 1, .signals = 0 }, .monotonic); + assert(prev_state.waiters < math.maxInt(u16)); // overflow caused by too many waiters + } + + mutex.unlock(io); + defer mutex.lockUncancelable(io); + + while (true) { + const result = if (uncancelable) + io.futexWaitUncancelable(u32, &cond.epoch.raw, epoch) + else + io.futexWait(u32, &cond.epoch.raw, epoch); + + epoch = cond.epoch.load(.acquire); // `.acquire` to ensure ordered before `state` laod + + // Even on error, try to consume a pending signal first. Otherwise a race might + // cause a signal to get stuck in the state with no corresponding waiter. + { + var prev_state = cond.state.load(.monotonic); + while (prev_state.signals > 0) { + prev_state = cond.state.cmpxchgWeak(prev_state, .{ + .waiters = prev_state.waiters - 1, + .signals = prev_state.signals - 1, + }, .acquire, .monotonic) orelse { + // We successfully consumed a signal. + return; + }; + } + } + + // There are no more signals available; this was a spurious wakeup or an error. If it + // was an error, we will remove ourselves as a waiter and return that error. Otherwise, + // we'll loop back to the futex wait. + result catch |err| { + const prev_state = cond.state.fetchSub(.{ .waiters = 1, .signals = 0 }, .monotonic); + assert(prev_state.waiters > 0); // underflow caused by illegal state + return err; + }; + } } pub fn signal(cond: *Condition, io: Io) void { - io.vtable.conditionWake(io.userdata, cond, .one); + var prev_state = cond.state.load(.monotonic); + while (prev_state.waiters > prev_state.signals) { + @branchHint(.unlikely); + prev_state = cond.state.cmpxchgWeak(prev_state, .{ + .waiters = prev_state.waiters, + .signals = prev_state.signals + 1, + }, .release, .monotonic) orelse { + // Update the epoch to tell the waiting threads that there are new signals for them. + // Note that a waiting thread could miss a take if *exactly* (1<<32)-1 wakes happen + // between it observing the epoch and sleeping on it, but this is extraordinarily + // unlikely due to the precise number of calls required. + _ = cond.epoch.fetchAdd(1, .release); // `.release` to ensure ordered after `state` update + io.futexWake(u32, &cond.epoch.raw, 1); + return; + }; + } } pub fn broadcast(cond: *Condition, io: Io) void { - io.vtable.conditionWake(io.userdata, cond, .all); + var prev_state = cond.state.load(.monotonic); + while (prev_state.waiters > prev_state.signals) { + @branchHint(.unlikely); + prev_state = cond.state.cmpxchgWeak(prev_state, .{ + .waiters = prev_state.waiters, + .signals = prev_state.waiters, + }, .release, .monotonic) orelse { + // Update the epoch to tell the waiting threads that there are new signals for them. + // Note that a waiting thread could miss a take if *exactly* (1<<32)-1 wakes happen + // between it observing the epoch and sleeping on it, but this is extraordinarily + // unlikely due to the precise number of calls required. + _ = cond.epoch.fetchAdd(1, .release); // `.release` to ensure ordered after `state` update + io.futexWake(u32, &cond.epoch.raw, prev_state.waiters - prev_state.signals); + return; + }; + } } - - pub const Wake = enum { - /// Wake up only one thread. - one, - /// Wake up all threads. - all, - }; }; pub const TypeErasedQueue = struct { diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig @@ -621,14 +621,6 @@ pub fn io(t: *Threaded) Io { .futexWaitUncancelable = futexWaitUncancelable, .futexWake = futexWake, - .mutexLock = mutexLock, - .mutexLockUncancelable = mutexLockUncancelable, - .mutexUnlock = mutexUnlock, - - .conditionWait = conditionWait, - .conditionWaitUncancelable = conditionWaitUncancelable, - .conditionWake = conditionWake, - .dirMake = dirMake, .dirMakePath = dirMakePath, .dirMakeOpenPath = dirMakeOpenPath, @@ -721,14 +713,6 @@ pub fn ioBasic(t: *Threaded) Io { .futexWaitUncancelable = futexWaitUncancelable, .futexWake = futexWake, - .mutexLock = mutexLock, - .mutexLockUncancelable = mutexLockUncancelable, - .mutexUnlock = mutexUnlock, - - .conditionWait = conditionWait, - .conditionWaitUncancelable = conditionWaitUncancelable, - .conditionWake = conditionWake, - .dirMake = dirMake, .dirMakePath = dirMakePath, .dirMakeOpenPath = dirMakeOpenPath, @@ -1274,190 +1258,6 @@ fn futexWake(userdata: ?*anyopaque, ptr: *const u32, max_waiters: u32) void { } } -fn mutexLock(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mutex) Io.Cancelable!void { - if (builtin.single_threaded) unreachable; // Interface should have prevented this. - if (native_os == .netbsd) @panic("TODO"); - if (native_os == .openbsd) @panic("TODO"); - const t: *Threaded = @ptrCast(@alignCast(userdata)); - const current_thread = Thread.getCurrent(t); - if (prev_state == .contended) { - try current_thread.futexWait(@ptrCast(&mutex.state), @intFromEnum(Io.Mutex.State.contended)); - } - while (@atomicRmw(Io.Mutex.State, &mutex.state, .Xchg, .contended, .acquire) != .unlocked) { - try current_thread.futexWait(@ptrCast(&mutex.state), @intFromEnum(Io.Mutex.State.contended)); - } -} - -fn mutexLockUncancelable(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mutex) void { - if (builtin.single_threaded) unreachable; // Interface should have prevented this. - if (native_os == .netbsd) @panic("TODO"); - if (native_os == .openbsd) @panic("TODO"); - _ = userdata; - if (prev_state == .contended) { - Thread.futexWaitUncancelable(@ptrCast(&mutex.state), @intFromEnum(Io.Mutex.State.contended)); - } - while (@atomicRmw(Io.Mutex.State, &mutex.state, .Xchg, .contended, .acquire) != .unlocked) { - Thread.futexWaitUncancelable(@ptrCast(&mutex.state), @intFromEnum(Io.Mutex.State.contended)); - } -} - -fn mutexUnlock(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mutex) void { - if (builtin.single_threaded) unreachable; // Interface should have prevented this. - if (native_os == .netbsd) @panic("TODO"); - if (native_os == .openbsd) @panic("TODO"); - _ = userdata; - _ = prev_state; - if (@atomicRmw(Io.Mutex.State, &mutex.state, .Xchg, .unlocked, .release) == .contended) { - Thread.futexWake(@ptrCast(&mutex.state), 1); - } -} - -fn conditionWaitUncancelable(userdata: ?*anyopaque, cond: *Io.Condition, mutex: *Io.Mutex) void { - if (builtin.single_threaded) unreachable; // Deadlock. - if (native_os == .netbsd) @panic("TODO"); - if (native_os == .openbsd) @panic("TODO"); - const t: *Threaded = @ptrCast(@alignCast(userdata)); - const t_io = ioBasic(t); - comptime assert(@TypeOf(cond.state) == u64); - const ints: *[2]std.atomic.Value(u32) = @ptrCast(&cond.state); - const cond_state = &ints[0]; - const cond_epoch = &ints[1]; - const one_waiter = 1; - const waiter_mask = 0xffff; - const one_signal = 1 << 16; - const signal_mask = 0xffff << 16; - var epoch = cond_epoch.load(.acquire); - var state = cond_state.fetchAdd(one_waiter, .monotonic); - assert(state & waiter_mask != waiter_mask); - state += one_waiter; - - mutex.unlock(t_io); - defer mutex.lockUncancelable(t_io); - - while (true) { - Thread.futexWaitUncancelable(@ptrCast(cond_epoch), epoch); - epoch = cond_epoch.load(.acquire); - state = cond_state.load(.monotonic); - while (state & signal_mask != 0) { - const new_state = state - one_waiter - one_signal; - state = cond_state.cmpxchgWeak(state, new_state, .acquire, .monotonic) orelse return; - } - } -} - -fn conditionWait(userdata: ?*anyopaque, cond: *Io.Condition, mutex: *Io.Mutex) Io.Cancelable!void { - if (builtin.single_threaded) unreachable; // Deadlock. - if (native_os == .netbsd) @panic("TODO"); - if (native_os == .openbsd) @panic("TODO"); - const t: *Threaded = @ptrCast(@alignCast(userdata)); - const current_thread = Thread.getCurrent(t); - const t_io = ioBasic(t); - comptime assert(@TypeOf(cond.state) == u64); - const ints: *[2]std.atomic.Value(u32) = @ptrCast(&cond.state); - const cond_state = &ints[0]; - const cond_epoch = &ints[1]; - const one_waiter = 1; - const waiter_mask = 0xffff; - const one_signal = 1 << 16; - const signal_mask = 0xffff << 16; - // Observe the epoch, then check the state again to see if we should wake up. - // The epoch must be observed before we check the state or we could potentially miss a wake() and deadlock: - // - // - T1: s = LOAD(&state) - // - T2: UPDATE(&s, signal) - // - T2: UPDATE(&epoch, 1) + FUTEX_WAKE(&epoch) - // - T1: e = LOAD(&epoch) (was reordered after the state load) - // - T1: s & signals == 0 -> FUTEX_WAIT(&epoch, e) (missed the state update + the epoch change) - // - // Acquire barrier to ensure the epoch load happens before the state load. - var epoch = cond_epoch.load(.acquire); - var state = cond_state.fetchAdd(one_waiter, .monotonic); - assert(state & waiter_mask != waiter_mask); - state += one_waiter; - - mutex.unlock(t_io); - defer mutex.lockUncancelable(t_io); - - while (true) { - try current_thread.futexWait(@ptrCast(cond_epoch), epoch); - - epoch = cond_epoch.load(.acquire); - state = cond_state.load(.monotonic); - - // Try to wake up by consuming a signal and decremented the waiter we - // added previously. Acquire barrier ensures code before the wake() - // which added the signal happens before we decrement it and return. - while (state & signal_mask != 0) { - const new_state = state - one_waiter - one_signal; - state = cond_state.cmpxchgWeak(state, new_state, .acquire, .monotonic) orelse return; - } - } -} - -fn conditionWake(userdata: ?*anyopaque, cond: *Io.Condition, wake: Io.Condition.Wake) void { - if (builtin.single_threaded) unreachable; // Nothing to wake up. - const t: *Threaded = @ptrCast(@alignCast(userdata)); - _ = t; - comptime assert(@TypeOf(cond.state) == u64); - const ints: *[2]std.atomic.Value(u32) = @ptrCast(&cond.state); - const cond_state = &ints[0]; - const cond_epoch = &ints[1]; - const one_waiter = 1; - const waiter_mask = 0xffff; - const one_signal = 1 << 16; - const signal_mask = 0xffff << 16; - var state = cond_state.load(.monotonic); - while (true) { - const waiters = (state & waiter_mask) / one_waiter; - const signals = (state & signal_mask) / one_signal; - - // Reserves which waiters to wake up by incrementing the signals count. - // Therefore, the signals count is always less than or equal to the - // waiters count. We don't need to Futex.wake if there's nothing to - // wake up or if other wake() threads have reserved to wake up the - // current waiters. - const wakeable = waiters - signals; - if (wakeable == 0) { - return; - } - - const to_wake = switch (wake) { - .one => 1, - .all => wakeable, - }; - - // Reserve the amount of waiters to wake by incrementing the signals - // count. Release barrier ensures code before the wake() happens before - // the signal it posted and consumed by the wait() threads. - const new_state = state + (one_signal * to_wake); - state = cond_state.cmpxchgWeak(state, new_state, .release, .monotonic) orelse { - // Wake up the waiting threads we reserved above by changing the epoch value. - // - // A waiting thread could miss a wake up if *exactly* ((1<<32)-1) - // wake()s happen between it observing the epoch and sleeping on - // it. This is very unlikely due to how many precise amount of - // Futex.wake() calls that would be between the waiting thread's - // potential preemption. - // - // Release barrier ensures the signal being added to the state - // happens before the epoch is changed. If not, the waiting thread - // could potentially deadlock from missing both the state and epoch - // change: - // - // - T2: UPDATE(&epoch, 1) (reordered before the state change) - // - T1: e = LOAD(&epoch) - // - T1: s = LOAD(&state) - // - T2: UPDATE(&state, signal) + FUTEX_WAKE(&epoch) - // - T1: s & signals == 0 -> FUTEX_WAIT(&epoch, e) (missed both epoch change and state change) - _ = cond_epoch.fetchAdd(1, .release); - if (native_os == .netbsd) @panic("TODO"); - if (native_os == .openbsd) @panic("TODO"); - Thread.futexWake(@ptrCast(cond_epoch), to_wake); - return; - }; - } -} - const dirMake = switch (native_os) { .windows => dirMakeWindows, .wasi => dirMakeWasi,