extract std.posix from std.os

closes #5019
This commit is contained in:
Andrew Kelley
2024-03-18 22:39:59 -07:00
parent 7057bffc14
commit cd62005f19
94 changed files with 10240 additions and 10305 deletions

View File

@@ -1,13 +1,20 @@
//! Futex is a mechanism used to block (`wait`) and unblock (`wake`) threads using a 32bit memory address as hints.
//! Blocking a thread is acknowledged only if the 32bit memory address is equal to a given value.
//! This check helps avoid block/unblock deadlocks which occur if a `wake()` happens before a `wait()`.
//! Using Futex, other Thread synchronization primitives can be built which efficiently wait for cross-thread events or signals.
//! A mechanism used to block (`wait`) and unblock (`wake`) threads using a
//! 32bit memory address as hints.
//!
//! Blocking a thread is acknowledged only if the 32bit memory address is equal
//! to a given value. This check helps avoid block/unblock deadlocks which
//! occur if a `wake()` happens before a `wait()`.
//!
//! Using Futex, other Thread synchronization primitives can be built which
//! efficiently wait for cross-thread events or signals.
const std = @import("../std.zig");
const builtin = @import("builtin");
const Futex = @This();
const windows = std.os.windows;
const linux = std.os.linux;
const c = std.c;
const os = std.os;
const assert = std.debug.assert;
const testing = std.testing;
const atomic = std.atomic;
@@ -124,18 +131,18 @@ const SingleThreadedImpl = struct {
// as it's generally already a linked target and is autoloaded into all processes anyway.
const WindowsImpl = struct {
fn wait(ptr: *const atomic.Value(u32), expect: u32, timeout: ?u64) error{Timeout}!void {
var timeout_value: os.windows.LARGE_INTEGER = undefined;
var timeout_ptr: ?*const os.windows.LARGE_INTEGER = null;
var timeout_value: windows.LARGE_INTEGER = undefined;
var timeout_ptr: ?*const windows.LARGE_INTEGER = null;
// NTDLL functions work with time in units of 100 nanoseconds.
// Positive values are absolute deadlines while negative values are relative durations.
if (timeout) |delay| {
timeout_value = @as(os.windows.LARGE_INTEGER, @intCast(delay / 100));
timeout_value = @as(windows.LARGE_INTEGER, @intCast(delay / 100));
timeout_value = -timeout_value;
timeout_ptr = &timeout_value;
}
const rc = os.windows.ntdll.RtlWaitOnAddress(
const rc = windows.ntdll.RtlWaitOnAddress(
ptr,
&expect,
@sizeOf(@TypeOf(expect)),
@@ -157,8 +164,8 @@ const WindowsImpl = struct {
assert(max_waiters != 0);
switch (max_waiters) {
1 => os.windows.ntdll.RtlWakeAddressSingle(address),
else => os.windows.ntdll.RtlWakeAddressAll(address),
1 => windows.ntdll.RtlWakeAddressSingle(address),
else => windows.ntdll.RtlWakeAddressAll(address),
}
}
};
@@ -189,10 +196,10 @@ const DarwinImpl = struct {
var timeout_overflowed = false;
const addr: *const anyopaque = ptr;
const flags = os.darwin.UL_COMPARE_AND_WAIT | os.darwin.ULF_NO_ERRNO;
const flags = c.UL_COMPARE_AND_WAIT | c.ULF_NO_ERRNO;
const status = blk: {
if (supports_ulock_wait2) {
break :blk os.darwin.__ulock_wait2(flags, addr, expect, timeout_ns, 0);
break :blk c.__ulock_wait2(flags, addr, expect, timeout_ns, 0);
}
const timeout_us = std.math.cast(u32, timeout_ns / std.time.ns_per_us) orelse overflow: {
@@ -200,11 +207,11 @@ const DarwinImpl = struct {
break :overflow std.math.maxInt(u32);
};
break :blk os.darwin.__ulock_wait(flags, addr, expect, timeout_us);
break :blk c.__ulock_wait(flags, addr, expect, timeout_us);
};
if (status >= 0) return;
switch (@as(std.os.E, @enumFromInt(-status))) {
switch (@as(c.E, @enumFromInt(-status))) {
// Wait was interrupted by the OS or other spurious signalling.
.INTR => {},
// Address of the futex was paged out. This is unlikely, but possible in theory, and
@@ -221,17 +228,17 @@ const DarwinImpl = struct {
}
fn wake(ptr: *const atomic.Value(u32), max_waiters: u32) void {
var flags: u32 = os.darwin.UL_COMPARE_AND_WAIT | os.darwin.ULF_NO_ERRNO;
var flags: u32 = c.UL_COMPARE_AND_WAIT | c.ULF_NO_ERRNO;
if (max_waiters > 1) {
flags |= os.darwin.ULF_WAKE_ALL;
flags |= c.ULF_WAKE_ALL;
}
while (true) {
const addr: *const anyopaque = ptr;
const status = os.darwin.__ulock_wake(flags, addr, 0);
const status = c.__ulock_wake(flags, addr, 0);
if (status >= 0) return;
switch (@as(std.os.E, @enumFromInt(-status))) {
switch (@as(c.E, @enumFromInt(-status))) {
.INTR => continue, // spurious wake()
.FAULT => unreachable, // __ulock_wake doesn't generate EFAULT according to darwin pthread_cond_t
.NOENT => return, // nothing was woken up
@@ -245,20 +252,20 @@ const DarwinImpl = struct {
// https://man7.org/linux/man-pages/man2/futex.2.html
const LinuxImpl = struct {
fn wait(ptr: *const atomic.Value(u32), expect: u32, timeout: ?u64) error{Timeout}!void {
var ts: os.timespec = undefined;
var ts: linux.timespec = undefined;
if (timeout) |timeout_ns| {
ts.tv_sec = @as(@TypeOf(ts.tv_sec), @intCast(timeout_ns / std.time.ns_per_s));
ts.tv_nsec = @as(@TypeOf(ts.tv_nsec), @intCast(timeout_ns % std.time.ns_per_s));
}
const rc = os.linux.futex_wait(
const rc = linux.futex_wait(
@as(*const i32, @ptrCast(&ptr.raw)),
os.linux.FUTEX.PRIVATE_FLAG | os.linux.FUTEX.WAIT,
linux.FUTEX.PRIVATE_FLAG | linux.FUTEX.WAIT,
@as(i32, @bitCast(expect)),
if (timeout != null) &ts else null,
);
switch (os.linux.getErrno(rc)) {
switch (linux.E.init(rc)) {
.SUCCESS => {}, // notified by `wake()`
.INTR => {}, // spurious wakeup
.AGAIN => {}, // ptr.* != expect
@@ -273,13 +280,13 @@ const LinuxImpl = struct {
}
fn wake(ptr: *const atomic.Value(u32), max_waiters: u32) void {
const rc = os.linux.futex_wake(
const rc = linux.futex_wake(
@as(*const i32, @ptrCast(&ptr.raw)),
os.linux.FUTEX.PRIVATE_FLAG | os.linux.FUTEX.WAKE,
linux.FUTEX.PRIVATE_FLAG | linux.FUTEX.WAKE,
std.math.cast(i32, max_waiters) orelse std.math.maxInt(i32),
);
switch (os.linux.getErrno(rc)) {
switch (linux.E.init(rc)) {
.SUCCESS => {}, // successful wake up
.INVAL => {}, // invalid futex_wait() on ptr done elsewhere
.FAULT => {}, // pointer became invalid while doing the wake
@@ -292,28 +299,28 @@ const LinuxImpl = struct {
const FreebsdImpl = struct {
fn wait(ptr: *const atomic.Value(u32), expect: u32, timeout: ?u64) error{Timeout}!void {
var tm_size: usize = 0;
var tm: os.freebsd._umtx_time = undefined;
var tm_ptr: ?*const os.freebsd._umtx_time = null;
var tm: c._umtx_time = undefined;
var tm_ptr: ?*const c._umtx_time = null;
if (timeout) |timeout_ns| {
tm_ptr = &tm;
tm_size = @sizeOf(@TypeOf(tm));
tm._flags = 0; // use relative time not UMTX_ABSTIME
tm._clockid = os.CLOCK.MONOTONIC;
tm._clockid = c.CLOCK.MONOTONIC;
tm._timeout.tv_sec = @as(@TypeOf(tm._timeout.tv_sec), @intCast(timeout_ns / std.time.ns_per_s));
tm._timeout.tv_nsec = @as(@TypeOf(tm._timeout.tv_nsec), @intCast(timeout_ns % std.time.ns_per_s));
}
const rc = os.freebsd._umtx_op(
const rc = c._umtx_op(
@intFromPtr(&ptr.raw),
@intFromEnum(os.freebsd.UMTX_OP.WAIT_UINT_PRIVATE),
@intFromEnum(c.UMTX_OP.WAIT_UINT_PRIVATE),
@as(c_ulong, expect),
tm_size,
@intFromPtr(tm_ptr),
);
switch (os.errno(rc)) {
switch (std.posix.errno(rc)) {
.SUCCESS => {},
.FAULT => unreachable, // one of the args points to invalid memory
.INVAL => unreachable, // arguments should be correct
@@ -327,15 +334,15 @@ const FreebsdImpl = struct {
}
fn wake(ptr: *const atomic.Value(u32), max_waiters: u32) void {
const rc = os.freebsd._umtx_op(
const rc = c._umtx_op(
@intFromPtr(&ptr.raw),
@intFromEnum(os.freebsd.UMTX_OP.WAKE_PRIVATE),
@intFromEnum(c.UMTX_OP.WAKE_PRIVATE),
@as(c_ulong, max_waiters),
0, // there is no timeout struct
0, // there is no timeout struct pointer
);
switch (os.errno(rc)) {
switch (std.posix.errno(rc)) {
.SUCCESS => {},
.FAULT => {}, // it's ok if the ptr doesn't point to valid memory
.INVAL => unreachable, // arguments should be correct
@@ -347,21 +354,21 @@ const FreebsdImpl = struct {
// https://man.openbsd.org/futex.2
const OpenbsdImpl = struct {
fn wait(ptr: *const atomic.Value(u32), expect: u32, timeout: ?u64) error{Timeout}!void {
var ts: os.timespec = undefined;
var ts: c.timespec = undefined;
if (timeout) |timeout_ns| {
ts.tv_sec = @as(@TypeOf(ts.tv_sec), @intCast(timeout_ns / std.time.ns_per_s));
ts.tv_nsec = @as(@TypeOf(ts.tv_nsec), @intCast(timeout_ns % std.time.ns_per_s));
}
const rc = os.openbsd.futex(
const rc = c.futex(
@as(*const volatile u32, @ptrCast(&ptr.raw)),
os.openbsd.FUTEX_WAIT | os.openbsd.FUTEX_PRIVATE_FLAG,
c.FUTEX_WAIT | c.FUTEX_PRIVATE_FLAG,
@as(c_int, @bitCast(expect)),
if (timeout != null) &ts else null,
null, // FUTEX_WAIT takes no requeue address
);
switch (os.errno(rc)) {
switch (std.posix.errno(rc)) {
.SUCCESS => {}, // woken up by wake
.NOSYS => unreachable, // the futex operation shouldn't be invalid
.FAULT => unreachable, // ptr was invalid
@@ -378,9 +385,9 @@ const OpenbsdImpl = struct {
}
fn wake(ptr: *const atomic.Value(u32), max_waiters: u32) void {
const rc = os.openbsd.futex(
const rc = c.futex(
@as(*const volatile u32, @ptrCast(&ptr.raw)),
os.openbsd.FUTEX_WAKE | os.openbsd.FUTEX_PRIVATE_FLAG,
c.FUTEX_WAKE | c.FUTEX_PRIVATE_FLAG,
std.math.cast(c_int, max_waiters) orelse std.math.maxInt(c_int),
null, // FUTEX_WAKE takes no timeout ptr
null, // FUTEX_WAKE takes no requeue address
@@ -415,9 +422,9 @@ const DragonflyImpl = struct {
const value = @as(c_int, @bitCast(expect));
const addr = @as(*const volatile c_int, @ptrCast(&ptr.raw));
const rc = os.dragonfly.umtx_sleep(addr, value, timeout_us);
const rc = c.umtx_sleep(addr, value, timeout_us);
switch (os.errno(rc)) {
switch (std.posix.errno(rc)) {
.SUCCESS => {},
.BUSY => {}, // ptr != expect
.AGAIN => { // maybe timed out, or paged out, or hit 2s kernel refresh
@@ -444,7 +451,7 @@ const DragonflyImpl = struct {
// > umtx_wakeup() will generally return 0 unless the address is bad.
// We are fine with the address being bad (e.g. for Semaphore.post() where Semaphore.wait() frees the Semaphore)
const addr = @as(*const volatile c_int, @ptrCast(&ptr.raw));
_ = os.dragonfly.umtx_wakeup(addr, to_wake);
_ = c.umtx_wakeup(addr, to_wake);
}
};
@@ -496,8 +503,8 @@ const WasmImpl = struct {
/// https://go.dev/src/runtime/sema.go
const PosixImpl = struct {
const Event = struct {
cond: std.c.pthread_cond_t,
mutex: std.c.pthread_mutex_t,
cond: c.pthread_cond_t,
mutex: c.pthread_mutex_t,
state: enum { empty, waiting, notified },
fn init(self: *Event) void {
@@ -509,18 +516,18 @@ const PosixImpl = struct {
fn deinit(self: *Event) void {
// Some platforms reportedly give EINVAL for statically initialized pthread types.
const rc = std.c.pthread_cond_destroy(&self.cond);
const rc = c.pthread_cond_destroy(&self.cond);
assert(rc == .SUCCESS or rc == .INVAL);
const rm = std.c.pthread_mutex_destroy(&self.mutex);
const rm = c.pthread_mutex_destroy(&self.mutex);
assert(rm == .SUCCESS or rm == .INVAL);
self.* = undefined;
}
fn wait(self: *Event, timeout: ?u64) error{Timeout}!void {
assert(std.c.pthread_mutex_lock(&self.mutex) == .SUCCESS);
defer assert(std.c.pthread_mutex_unlock(&self.mutex) == .SUCCESS);
assert(c.pthread_mutex_lock(&self.mutex) == .SUCCESS);
defer assert(c.pthread_mutex_unlock(&self.mutex) == .SUCCESS);
// Early return if the event was already set.
if (self.state == .notified) {
@@ -530,9 +537,9 @@ const PosixImpl = struct {
// Compute the absolute timeout if one was specified.
// POSIX requires that REALTIME is used by default for the pthread timedwait functions.
// This can be changed with pthread_condattr_setclock, but it's an extension and may not be available everywhere.
var ts: os.timespec = undefined;
var ts: c.timespec = undefined;
if (timeout) |timeout_ns| {
os.clock_gettime(os.CLOCK.REALTIME, &ts) catch unreachable;
c.clock_gettime(c.CLOCK.REALTIME, &ts) catch unreachable;
ts.tv_sec +|= @as(@TypeOf(ts.tv_sec), @intCast(timeout_ns / std.time.ns_per_s));
ts.tv_nsec += @as(@TypeOf(ts.tv_nsec), @intCast(timeout_ns % std.time.ns_per_s));
@@ -549,8 +556,8 @@ const PosixImpl = struct {
while (true) {
// Block using either pthread_cond_wait or pthread_cond_timewait if there's an absolute timeout.
const rc = blk: {
if (timeout == null) break :blk std.c.pthread_cond_wait(&self.cond, &self.mutex);
break :blk std.c.pthread_cond_timedwait(&self.cond, &self.mutex, &ts);
if (timeout == null) break :blk c.pthread_cond_wait(&self.cond, &self.mutex);
break :blk c.pthread_cond_timedwait(&self.cond, &self.mutex, &ts);
};
// After waking up, check if the event was set.
@@ -574,8 +581,8 @@ const PosixImpl = struct {
}
fn set(self: *Event) void {
assert(std.c.pthread_mutex_lock(&self.mutex) == .SUCCESS);
defer assert(std.c.pthread_mutex_unlock(&self.mutex) == .SUCCESS);
assert(c.pthread_mutex_lock(&self.mutex) == .SUCCESS);
defer assert(c.pthread_mutex_unlock(&self.mutex) == .SUCCESS);
// Make sure that multiple calls to set() were not done on the same Event.
const old_state = self.state;
@@ -586,7 +593,7 @@ const PosixImpl = struct {
// the condition variable once it observes the new state, potentially causing a UAF if done unlocked.
self.state = .notified;
if (old_state == .waiting) {
assert(std.c.pthread_cond_signal(&self.cond) == .SUCCESS);
assert(c.pthread_cond_signal(&self.cond) == .SUCCESS);
}
}
};
@@ -732,7 +739,7 @@ const PosixImpl = struct {
};
const Bucket = struct {
mutex: std.c.pthread_mutex_t align(atomic.cache_line) = .{},
mutex: c.pthread_mutex_t align(atomic.cache_line) = .{},
pending: atomic.Value(usize) = atomic.Value(usize).init(0),
treap: Treap = .{},
@@ -798,8 +805,8 @@ const PosixImpl = struct {
var waiter: Waiter = undefined;
{
assert(std.c.pthread_mutex_lock(&bucket.mutex) == .SUCCESS);
defer assert(std.c.pthread_mutex_unlock(&bucket.mutex) == .SUCCESS);
assert(c.pthread_mutex_lock(&bucket.mutex) == .SUCCESS);
defer assert(c.pthread_mutex_unlock(&bucket.mutex) == .SUCCESS);
cancelled = ptr.load(.monotonic) != expect;
if (cancelled) {
@@ -821,8 +828,8 @@ const PosixImpl = struct {
// If we return early without waiting, the waiter on the stack would be invalidated and the wake() thread risks a UAF.
defer if (!cancelled) waiter.event.wait(null) catch unreachable;
assert(std.c.pthread_mutex_lock(&bucket.mutex) == .SUCCESS);
defer assert(std.c.pthread_mutex_unlock(&bucket.mutex) == .SUCCESS);
assert(c.pthread_mutex_lock(&bucket.mutex) == .SUCCESS);
defer assert(c.pthread_mutex_unlock(&bucket.mutex) == .SUCCESS);
cancelled = WaitQueue.tryRemove(&bucket.treap, address, &waiter);
if (cancelled) {
@@ -871,8 +878,8 @@ const PosixImpl = struct {
}
};
assert(std.c.pthread_mutex_lock(&bucket.mutex) == .SUCCESS);
defer assert(std.c.pthread_mutex_unlock(&bucket.mutex) == .SUCCESS);
assert(c.pthread_mutex_lock(&bucket.mutex) == .SUCCESS);
defer assert(c.pthread_mutex_unlock(&bucket.mutex) == .SUCCESS);
// Another pending check again to avoid the WaitQueue lookup if not necessary.
if (bucket.pending.load(.monotonic) > 0) {

View File

@@ -23,7 +23,6 @@ const std = @import("../std.zig");
const builtin = @import("builtin");
const Mutex = @This();
const os = std.os;
const assert = std.debug.assert;
const testing = std.testing;
const Thread = std.Thread;
@@ -117,36 +116,40 @@ const SingleThreadedImpl = struct {
// SRWLOCK on windows is almost always faster than Futex solution.
// It also implements an efficient Condition with requeue support for us.
const WindowsImpl = struct {
srwlock: os.windows.SRWLOCK = .{},
srwlock: windows.SRWLOCK = .{},
fn tryLock(self: *@This()) bool {
return os.windows.kernel32.TryAcquireSRWLockExclusive(&self.srwlock) != os.windows.FALSE;
return windows.kernel32.TryAcquireSRWLockExclusive(&self.srwlock) != windows.FALSE;
}
fn lock(self: *@This()) void {
os.windows.kernel32.AcquireSRWLockExclusive(&self.srwlock);
windows.kernel32.AcquireSRWLockExclusive(&self.srwlock);
}
fn unlock(self: *@This()) void {
os.windows.kernel32.ReleaseSRWLockExclusive(&self.srwlock);
windows.kernel32.ReleaseSRWLockExclusive(&self.srwlock);
}
const windows = std.os.windows;
};
// os_unfair_lock on darwin supports priority inheritance and is generally faster than Futex solutions.
const DarwinImpl = struct {
oul: os.darwin.os_unfair_lock = .{},
oul: c.os_unfair_lock = .{},
fn tryLock(self: *@This()) bool {
return os.darwin.os_unfair_lock_trylock(&self.oul);
return c.os_unfair_lock_trylock(&self.oul);
}
fn lock(self: *@This()) void {
os.darwin.os_unfair_lock_lock(&self.oul);
c.os_unfair_lock_lock(&self.oul);
}
fn unlock(self: *@This()) void {
os.darwin.os_unfair_lock_unlock(&self.oul);
c.os_unfair_lock_unlock(&self.oul);
}
const c = std.c;
};
const FutexImpl = struct {