zig

fork of https://codeberg.org/ziglang/zig
Log | Files | Refs | README | LICENSE

commit 88110139fefb844ff774ba71614139286cf8da99 (tree)
parent 4025af9c05f7392acb96b6085c541c6887e8b8fe
Author: Andrew Kelley <andrew@ziglang.org>
Date:   Fri, 19 Dec 2025 15:57:53 -0800

std.Io.Threaded: fix NetBSD compilation

Diffstat:
Mlib/std/Io/Threaded.zig | 467+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--------
1 file changed, 425 insertions(+), 42 deletions(-)

diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig @@ -360,7 +360,11 @@ const Thread = struct { else => unreachable, }; }, - else => @compileError("unimplemented: futexWait"), + else => if (std.Thread.use_pthreads) { + return pthreads_futex.wait(ptr, expect, timeout_ns); + } else { + @compileError("unimplemented: futexWait"); + }, } } @@ -436,7 +440,11 @@ const Thread = struct { else => unreachable, // deadlock due to operating system bug } }, - else => @compileError("unimplemented: futexWake"), + else => if (std.Thread.use_pthreads) { + return pthreads_futex.wake(ptr, max_waiters); + } else { + @compileError("unimplemented: futexWake"); + }, } } }; @@ -4025,9 +4033,7 @@ fn fileRealPathPosix(userdata: ?*anyopaque, file: File, out_buffer: []u8) File.R fn realPathPosix(current_thread: *Thread, fd: posix.fd_t, out_buffer: []u8) File.RealPathError!usize { switch (native_os) { - .driverkit, .ios, .maccatalyst, .macos, .tvos, .visionos, .watchos => { - // On macOS, we can use F.GETPATH fcntl command to query the OS for - // the path to the file descriptor. + .netbsd, .dragonfly, .driverkit, .ios, .maccatalyst, .macos, .tvos, .visionos, .watchos => { var sufficient_buffer: [posix.PATH_MAX]u8 = undefined; @memset(&sufficient_buffer, 0); try current_thread.beginSyscall(); @@ -4045,9 +4051,12 @@ fn realPathPosix(current_thread: *Thread, fd: posix.fd_t, out_buffer: []u8) File else => |e| { current_thread.endSyscall(); switch (e) { + .ACCES => return error.AccessDenied, .BADF => return error.FileNotFound, - .NOSPC => return error.NameTooLong, .NOENT => return error.FileNotFound, + .NOMEM => return error.SystemResources, + .NOSPC => return error.NameTooLong, + .RANGE => return error.NameTooLong, else => |err| return posix.unexpectedErrno(err), } }, @@ -4095,11 +4104,11 @@ fn realPathPosix(current_thread: *Thread, fd: posix.fd_t, out_buffer: []u8) File } }, .freebsd => { - var kfile: std.c.kinfo_file = undefined; - kfile.structsize = std.c.KINFO_FILE_SIZE; + var k_file: std.c.kinfo_file = undefined; + k_file.structsize = std.c.KINFO_FILE_SIZE; try current_thread.beginSyscall(); while (true) { - switch (posix.errno(std.c.fcntl(fd, std.c.F.KINFO, @intFromPtr(&kfile)))) { + switch (posix.errno(std.c.fcntl(fd, std.c.F.KINFO, @intFromPtr(&k_file)))) { .SUCCESS => { current_thread.endSyscall(); break; @@ -4118,39 +4127,10 @@ fn realPathPosix(current_thread: *Thread, fd: posix.fd_t, out_buffer: []u8) File }, } } - const len = std.mem.indexOfScalar(u8, &kfile.path, 0) orelse kfile.path.len; + const len = std.mem.indexOfScalar(u8, &k_file.path, 0) orelse k_file.path.len; if (len == 0) return error.NameTooLong; return len; }, - .netbsd, .dragonfly => { - @memset(out_buffer[0..Dir.max_path_bytes], 0); - try current_thread.beginSyscall(); - while (true) { - switch (posix.errno(std.c.fcntl(fd, posix.F.GETPATH, out_buffer))) { - .SUCCESS => { - current_thread.endSyscall(); - break; - }, - .INTR => { - try current_thread.checkCancel(); - continue; - }, - .CANCELED => return current_thread.endSyscallCanceled(), - else => |e| { - current_thread.endSyscall(); - switch (e) { - .ACCES => return error.AccessDenied, - .BADF => return error.FileNotFound, - .NOENT => return error.FileNotFound, - .NOMEM => return error.SystemResources, - .RANGE => return error.NameTooLong, - else => |err| return posix.unexpectedErrno(err), - } - }, - } - } - return std.mem.indexOfScalar(u8, &out_buffer, 0) orelse out_buffer.len; - }, else => return error.OperationUnsupported, } comptime unreachable; @@ -7049,11 +7029,32 @@ fn processExecutablePath(userdata: ?*anyopaque, out_buffer: []u8) std.process.Ex }, .netbsd => { const current_thread = Thread.getCurrent(t); - try current_thread.checkCancel(); var mib = [4]c_int{ posix.CTL.KERN, posix.KERN.PROC_ARGS, -1, posix.KERN.PROC_PATHNAME }; var out_len: usize = out_buffer.len; - try posix.sysctl(&mib, out_buffer.ptr, &out_len, null, 0); - return out_len; + try current_thread.beginSyscall(); + while (true) { + switch (posix.errno(posix.system.sysctl(&mib, mib.len, out_buffer.ptr, &out_len, null, 0))) { + .SUCCESS => { + current_thread.endSyscall(); + return out_len; + }, + .INTR => { + try current_thread.checkCancel(); + continue; + }, + .CANCELED => return current_thread.endSyscallCanceled(), + else => |e| { + current_thread.endSyscall(); + switch (e) { + .FAULT => |err| return errnoBug(err), + .PERM => return error.PermissionDenied, + .NOMEM => return error.SystemResources, + .NOENT => |err| return errnoBug(err), + else => |err| return posix.unexpectedErrno(err), + } + }, + } + } }, .openbsd, .haiku => { // OpenBSD doesn't support getting the path of a running process, so try to guess it @@ -11320,6 +11321,388 @@ fn initializeWsa(t: *Threaded) error{ NetworkDown, Canceled }!void { fn doNothingSignalHandler(_: posix.SIG) callconv(.c) void {} +const pthreads_futex = struct { + const c = std.c; + const atomic = std.atomic; + + const Event = struct { + cond: c.pthread_cond_t, + mutex: c.pthread_mutex_t, + state: enum { empty, waiting, notified }, + + fn init(self: *Event) void { + // Use static init instead of pthread_cond/mutex_init() since this is generally faster. + self.cond = .{}; + self.mutex = .{}; + self.state = .empty; + } + + fn deinit(self: *Event) void { + // Some platforms reportedly give EINVAL for statically initialized pthread types. + const rc = c.pthread_cond_destroy(&self.cond); + assert(rc == .SUCCESS or rc == .INVAL); + + const rm = c.pthread_mutex_destroy(&self.mutex); + assert(rm == .SUCCESS or rm == .INVAL); + + self.* = undefined; + } + + fn wait(self: *Event, timeout: ?u64) error{Timeout}!void { + assert(c.pthread_mutex_lock(&self.mutex) == .SUCCESS); + defer assert(c.pthread_mutex_unlock(&self.mutex) == .SUCCESS); + + // Early return if the event was already set. + if (self.state == .notified) { + return; + } + + // Compute the absolute timeout if one was specified. + // POSIX requires that REALTIME is used by default for the pthread timedwait functions. + // This can be changed with pthread_condattr_setclock, but it's an extension and may not be available everywhere. + var ts: c.timespec = undefined; + if (timeout) |timeout_ns| { + ts = std.posix.clock_gettime(c.CLOCK.REALTIME) catch unreachable; + ts.sec +|= @as(@TypeOf(ts.sec), @intCast(timeout_ns / std.time.ns_per_s)); + ts.nsec += @as(@TypeOf(ts.nsec), @intCast(timeout_ns % std.time.ns_per_s)); + + if (ts.nsec >= std.time.ns_per_s) { + ts.sec +|= 1; + ts.nsec -= std.time.ns_per_s; + } + } + + // Start waiting on the event - there can be only one thread waiting. + assert(self.state == .empty); + self.state = .waiting; + + while (true) { + // Block using either pthread_cond_wait or pthread_cond_timewait if there's an absolute timeout. + const rc = blk: { + if (timeout == null) break :blk c.pthread_cond_wait(&self.cond, &self.mutex); + break :blk c.pthread_cond_timedwait(&self.cond, &self.mutex, &ts); + }; + + // After waking up, check if the event was set. + if (self.state == .notified) { + return; + } + + assert(self.state == .waiting); + switch (rc) { + .SUCCESS => {}, + .TIMEDOUT => { + // If timed out, reset the event to avoid the set() thread doing an unnecessary signal(). + self.state = .empty; + return error.Timeout; + }, + .INVAL => unreachable, // cond, mutex, and potentially ts should all be valid + .PERM => unreachable, // mutex is locked when cond_*wait() functions are called + else => unreachable, + } + } + } + + fn set(self: *Event) void { + assert(c.pthread_mutex_lock(&self.mutex) == .SUCCESS); + defer assert(c.pthread_mutex_unlock(&self.mutex) == .SUCCESS); + + // Make sure that multiple calls to set() were not done on the same Event. + const old_state = self.state; + assert(old_state != .notified); + + // Mark the event as set and wake up the waiting thread if there was one. + // This must be done while the mutex as the wait() thread could deallocate + // the condition variable once it observes the new state, potentially causing a UAF if done unlocked. + self.state = .notified; + if (old_state == .waiting) { + assert(c.pthread_cond_signal(&self.cond) == .SUCCESS); + } + } + }; + + const Treap = std.Treap(usize, std.math.order); + const Waiter = struct { + node: Treap.Node, + prev: ?*Waiter, + next: ?*Waiter, + tail: ?*Waiter, + is_queued: bool, + event: Event, + }; + + // An unordered set of Waiters + const WaitList = struct { + top: ?*Waiter = null, + len: usize = 0, + + fn push(self: *WaitList, waiter: *Waiter) void { + waiter.next = self.top; + self.top = waiter; + self.len += 1; + } + + fn pop(self: *WaitList) ?*Waiter { + const waiter = self.top orelse return null; + self.top = waiter.next; + self.len -= 1; + return waiter; + } + }; + + const WaitQueue = struct { + fn insert(treap: *Treap, address: usize, waiter: *Waiter) void { + // prepare the waiter to be inserted. + waiter.next = null; + waiter.is_queued = true; + + // Find the wait queue entry associated with the address. + // If there isn't a wait queue on the address, this waiter creates the queue. + var entry = treap.getEntryFor(address); + const entry_node = entry.node orelse { + waiter.prev = null; + waiter.tail = waiter; + entry.set(&waiter.node); + return; + }; + + // There's a wait queue on the address; get the queue head and tail. + const head: *Waiter = @fieldParentPtr("node", entry_node); + const tail = head.tail orelse unreachable; + + // Push the waiter to the tail by replacing it and linking to the previous tail. + head.tail = waiter; + tail.next = waiter; + waiter.prev = tail; + } + + fn remove(treap: *Treap, address: usize, max_waiters: usize) WaitList { + // Find the wait queue associated with this address and get the head/tail if any. + var entry = treap.getEntryFor(address); + var queue_head: ?*Waiter = if (entry.node) |node| @fieldParentPtr("node", node) else null; + const queue_tail = if (queue_head) |head| head.tail else null; + + // Once we're done updating the head, fix it's tail pointer and update the treap's queue head as well. + defer entry.set(blk: { + const new_head = queue_head orelse break :blk null; + new_head.tail = queue_tail; + break :blk &new_head.node; + }); + + var removed = WaitList{}; + while (removed.len < max_waiters) { + // dequeue and collect waiters from their wait queue. + const waiter = queue_head orelse break; + queue_head = waiter.next; + removed.push(waiter); + + // When dequeueing, we must mark is_queued as false. + // This ensures that a waiter which calls tryRemove() returns false. + assert(waiter.is_queued); + waiter.is_queued = false; + } + + return removed; + } + + fn tryRemove(treap: *Treap, address: usize, waiter: *Waiter) bool { + if (!waiter.is_queued) { + return false; + } + + queue_remove: { + // Find the wait queue associated with the address. + var entry = blk: { + // A waiter without a previous link means it's the queue head that's in the treap so we can avoid lookup. + if (waiter.prev == null) { + assert(waiter.node.key == address); + break :blk treap.getEntryForExisting(&waiter.node); + } + break :blk treap.getEntryFor(address); + }; + + // The queue head and tail must exist if we're removing a queued waiter. + const head: *Waiter = @fieldParentPtr("node", entry.node orelse unreachable); + const tail = head.tail orelse unreachable; + + // A waiter with a previous link is never the head of the queue. + if (waiter.prev) |prev| { + assert(waiter != head); + prev.next = waiter.next; + + // A waiter with both a previous and next link is in the middle. + // We only need to update the surrounding waiter's links to remove it. + if (waiter.next) |next| { + assert(waiter != tail); + next.prev = waiter.prev; + break :queue_remove; + } + + // A waiter with a previous but no next link means it's the tail of the queue. + // In that case, we need to update the head's tail reference. + assert(waiter == tail); + head.tail = waiter.prev; + break :queue_remove; + } + + // A waiter with no previous link means it's the queue head of queue. + // We must replace (or remove) the head waiter reference in the treap. + assert(waiter == head); + entry.set(blk: { + const new_head = waiter.next orelse break :blk null; + new_head.tail = head.tail; + break :blk &new_head.node; + }); + } + + // Mark the waiter as successfully removed. + waiter.is_queued = false; + return true; + } + }; + + const Bucket = struct { + mutex: c.pthread_mutex_t align(atomic.cache_line) = .{}, + pending: atomic.Value(usize) = atomic.Value(usize).init(0), + treap: Treap = .{}, + + // Global array of buckets that addresses map to. + // Bucket array size is pretty much arbitrary here, but it must be a power of two for fibonacci hashing. + var buckets = [_]Bucket{.{}} ** @bitSizeOf(usize); + + // https://github.com/Amanieu/parking_lot/blob/1cf12744d097233316afa6c8b7d37389e4211756/core/src/parking_lot.rs#L343-L353 + fn from(address: usize) *Bucket { + // The upper `@bitSizeOf(usize)` bits of the fibonacci golden ratio. + // Hashing this via (h * k) >> (64 - b) where k=golden-ration and b=bitsize-of-array + // evenly lays out h=hash values over the bit range even when the hash has poor entropy (identity-hash for pointers). + const max_multiplier_bits = @bitSizeOf(usize); + const fibonacci_multiplier = 0x9E3779B97F4A7C15 >> (64 - max_multiplier_bits); + + const max_bucket_bits = @ctz(buckets.len); + comptime assert(std.math.isPowerOfTwo(buckets.len)); + + const index = (address *% fibonacci_multiplier) >> (max_multiplier_bits - max_bucket_bits); + return &buckets[index]; + } + }; + + const Address = struct { + fn from(ptr: *const u32) usize { + // Get the alignment of the pointer. + const alignment = @alignOf(atomic.Value(u32)); + comptime assert(std.math.isPowerOfTwo(alignment)); + + // Make sure the pointer is aligned, + // then cut off the zero bits from the alignment to get the unique address. + const addr = @intFromPtr(ptr); + assert(addr & (alignment - 1) == 0); + return addr >> @ctz(@as(usize, alignment)); + } + }; + + fn wait(ptr: *const u32, expect: u32, timeout: ?u64) error{Timeout}!void { + const address = Address.from(ptr); + const bucket = Bucket.from(address); + + // Announce that there's a waiter in the bucket before checking the ptr/expect condition. + // If the announcement is reordered after the ptr check, the waiter could deadlock: + // + // - T1: checks ptr == expect which is true + // - T2: updates ptr to != expect + // - T2: does Futex.wake(), sees no pending waiters, exits + // - T1: bumps pending waiters (was reordered after the ptr == expect check) + // - T1: goes to sleep and misses both the ptr change and T2's wake up + // + // acquire barrier to ensure the announcement happens before the ptr check below. + var pending = bucket.pending.fetchAdd(1, .acquire); + assert(pending < std.math.maxInt(usize)); + + // If the wait gets canceled, remove the pending count we previously added. + // This is done outside the mutex lock to keep the critical section short in case of contention. + var canceled = false; + defer if (canceled) { + pending = bucket.pending.fetchSub(1, .monotonic); + assert(pending > 0); + }; + + var waiter: Waiter = undefined; + { + assert(c.pthread_mutex_lock(&bucket.mutex) == .SUCCESS); + defer assert(c.pthread_mutex_unlock(&bucket.mutex) == .SUCCESS); + + canceled = @atomicLoad(u32, ptr, .monotonic) != expect; + if (canceled) { + return; + } + + waiter.event.init(); + WaitQueue.insert(&bucket.treap, address, &waiter); + } + + defer { + assert(!waiter.is_queued); + waiter.event.deinit(); + } + + waiter.event.wait(timeout) catch { + // If we fail to cancel after a timeout, it means a wake() thread dequeued us and will wake us up. + // We must wait until the event is set as that's a signal that the wake() thread won't access the waiter memory anymore. + // If we return early without waiting, the waiter on the stack would be invalidated and the wake() thread risks a UAF. + defer if (!canceled) waiter.event.wait(null) catch unreachable; + + assert(c.pthread_mutex_lock(&bucket.mutex) == .SUCCESS); + defer assert(c.pthread_mutex_unlock(&bucket.mutex) == .SUCCESS); + + canceled = WaitQueue.tryRemove(&bucket.treap, address, &waiter); + if (canceled) { + return error.Timeout; + } + }; + } + + fn wake(ptr: *const u32, max_waiters: u32) void { + const address = Address.from(ptr); + const bucket = Bucket.from(address); + + // Quick check if there's even anything to wake up. + // The change to the ptr's value must happen before we check for pending waiters. + // If not, the wake() thread could miss a sleeping waiter and have it deadlock: + // + // - T2: p = has pending waiters (reordered before the ptr update) + // - T1: bump pending waiters + // - T1: if ptr == expected: sleep() + // - T2: update ptr != expected + // - T2: p is false from earlier so doesn't wake (T1 missed ptr update and T2 missed T1 sleeping) + // + // What we really want here is a Release load, but that doesn't exist under the C11 memory model. + // We could instead do `bucket.pending.fetchAdd(0, Release) == 0` which achieves effectively the same thing, + // LLVM lowers the fetchAdd(0, .release) into an mfence+load which avoids gaining ownership of the cache-line. + if (bucket.pending.fetchAdd(0, .release) == 0) { + return; + } + + // Keep a list of all the waiters notified and wake then up outside the mutex critical section. + var notified = WaitList{}; + defer if (notified.len > 0) { + const pending = bucket.pending.fetchSub(notified.len, .monotonic); + assert(pending >= notified.len); + + while (notified.pop()) |waiter| { + assert(!waiter.is_queued); + waiter.event.set(); + } + }; + + assert(c.pthread_mutex_lock(&bucket.mutex) == .SUCCESS); + defer assert(c.pthread_mutex_unlock(&bucket.mutex) == .SUCCESS); + + // Another pending check again to avoid the WaitQueue lookup if not necessary. + if (bucket.pending.load(.monotonic) > 0) { + notified = WaitQueue.remove(&bucket.treap, address, max_waiters); + } + } +}; + test { _ = @import("Threaded/test.zig"); }