commit 550da1b676d059ae39a629d60da0f9cd155a5e89 (tree)
parent 255aeb57b24bc24b604744460a61ebf7c44e42ea
Author: Andrew Kelley <andrew@ziglang.org>
Date: Sun, 1 Feb 2026 17:33:49 -0800
std: migrate remaining sync primitives to Io
- delete std.Thread.Futex
- delete std.Thread.Mutex
- delete std.Thread.Semaphore
- delete std.Thread.Condition
- delete std.Thread.RwLock
- delete std.once
std.Thread.Mutex.Recursive remains... for now. it will be replaced with
a special purpose mechanism used only by panic logic.
std.Io.Threaded exposes mutexLock and mutexUnlock for the advanced case
when you need to call them directly.
Diffstat:
24 files changed, 257 insertions(+), 2951 deletions(-)
diff --git a/CMakeLists.txt b/CMakeLists.txt
@@ -408,8 +408,6 @@ set(ZIG_STAGE2_SOURCES
lib/std/Target/wasm.zig
lib/std/Target/x86.zig
lib/std/Thread.zig
- lib/std/Thread/Futex.zig
- lib/std/Thread/Mutex.zig
lib/std/array_hash_map.zig
lib/std/array_list.zig
lib/std/ascii.zig
diff --git a/lib/compiler/build_runner.zig b/lib/compiler/build_runner.zig
@@ -30,14 +30,6 @@ pub fn main(init: process.Init.Minimal) !void {
defer _ = debug_gpa_state.deinit();
const gpa = debug_gpa_state.allocator();
- // ...but we'll back our arena by `std.heap.page_allocator` for efficiency.
- var single_threaded_arena: std.heap.ArenaAllocator = .init(std.heap.page_allocator);
- defer single_threaded_arena.deinit();
- var thread_safe_arena: std.heap.ThreadSafeAllocator = .{ .child_allocator = single_threaded_arena.allocator() };
- const arena = thread_safe_arena.allocator();
-
- const args = try init.args.toSlice(arena);
-
var threaded: std.Io.Threaded = .init(gpa, .{
.environ = init.environ,
.argv0 = .init(init.args),
@@ -45,6 +37,17 @@ pub fn main(init: process.Init.Minimal) !void {
defer threaded.deinit();
const io = threaded.io();
+ // ...but we'll back our arena by `std.heap.page_allocator` for efficiency.
+ var single_threaded_arena: std.heap.ArenaAllocator = .init(std.heap.page_allocator);
+ defer single_threaded_arena.deinit();
+ var thread_safe_arena: std.heap.ThreadSafeAllocator = .{
+ .child_allocator = single_threaded_arena.allocator(),
+ .io = io,
+ };
+ const arena = thread_safe_arena.allocator();
+
+ const args = try init.args.toSlice(arena);
+
// skip my own exe name
var arg_idx: usize = 1;
diff --git a/lib/compiler_rt/emutls.zig b/lib/compiler_rt/emutls.zig
@@ -147,7 +147,8 @@ const ObjectArray = struct {
// It provides thread-safety for on-demand storage of Thread Objects.
const current_thread_storage = struct {
var key: std.c.pthread_key_t = undefined;
- var init_once = std.once(current_thread_storage.init);
+ var init_mutex: std.c.pthread_mutex_t = std.c.PTHREAD_MUTEX_INITIALIZER;
+ var init_done: bool = false;
/// Return a per thread ObjectArray with at least the expected index.
pub fn getArray(index: usize) *ObjectArray {
@@ -183,9 +184,13 @@ const current_thread_storage = struct {
/// Initialize pthread_key_t.
fn init() void {
+ if (@atomicLoad(bool, &init_done, .monotonic)) return;
+ _ = std.c.pthread_mutex_lock(&init_mutex);
if (std.c.pthread_key_create(¤t_thread_storage.key, current_thread_storage.deinit) != .SUCCESS) {
abort();
}
+ @atomicStore(bool, &init_done, true, .release);
+ _ = std.c.pthread_mutex_unlock(&init_mutex);
}
/// Invoked by pthread specific destructor. the passed argument is the ObjectArray pointer.
@@ -283,7 +288,7 @@ const emutls_control = extern struct {
/// Get the pointer on allocated storage for emutls variable.
pub fn getPointer(self: *emutls_control) *anyopaque {
// ensure current_thread_storage initialization is done
- current_thread_storage.init_once.call();
+ current_thread_storage.init();
const index = self.getIndex();
var array = current_thread_storage.getArray(index);
diff --git a/lib/fuzzer.zig b/lib/fuzzer.zig
@@ -632,7 +632,7 @@ export fn fuzzer_main(limit_kind: abi.LimitKind, amount: u64) void {
export fn fuzzer_unslide_address(addr: usize) usize {
const si = std.debug.getSelfDebugInfo() catch @compileError("unsupported");
- const slide = si.getModuleSlide(std.debug.getDebugInfoAllocator(), addr) catch |err| {
+ const slide = si.getModuleSlide(std.debug.getDebugInfoAllocator(), io, addr) catch |err| {
std.debug.panic("failed to find virtual address slide: {t}", .{err});
};
return addr - slide;
diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig
@@ -1126,6 +1126,25 @@ const Thread = struct {
return @ptrFromInt(@as(usize, @bitCast(split)));
}
};
+
+ /// Same as `Io.Mutex.lock` but avoids the VTable.
+ fn mutexLock(m: *Io.Mutex) Io.Cancelable!void {
+ const initial_state = m.state.cmpxchgWeak(
+ .unlocked,
+ .locked_once,
+ .acquire,
+ .monotonic,
+ ) orelse {
+ @branchHint(.likely);
+ return;
+ };
+ if (initial_state == .contended) {
+ try Thread.futexWait(@ptrCast(&m.state.raw), @intFromEnum(Io.Mutex.State.contended), null);
+ }
+ while (m.state.swap(.contended, .acquire) != .unlocked) {
+ try Thread.futexWait(@ptrCast(&m.state.raw), @intFromEnum(Io.Mutex.State.contended), null);
+ }
+ }
};
const Syscall = struct {
@@ -1486,8 +1505,8 @@ var global_single_threaded_instance: Threaded = .init_single_threaded;
pub const global_single_threaded: *Threaded = &global_single_threaded_instance;
pub fn setAsyncLimit(t: *Threaded, new_limit: Io.Limit) void {
- mutexLockUncancelable(&t.mutex);
- defer mutexUnlock(&t.mutex);
+ mutexLockInternal(&t.mutex);
+ defer mutexUnlockInternal(&t.mutex);
t.async_limit = new_limit;
}
@@ -1508,8 +1527,8 @@ pub fn deinit(t: *Threaded) void {
fn join(t: *Threaded) void {
if (builtin.single_threaded) return;
{
- mutexLockUncancelable(&t.mutex);
- defer mutexUnlock(&t.mutex);
+ mutexLockInternal(&t.mutex);
+ defer mutexUnlockInternal(&t.mutex);
t.join_requested = true;
}
condBroadcast(&t.cond);
@@ -1574,16 +1593,16 @@ fn worker(t: *Threaded) void {
defer t.wait_group.finish();
- mutexLockUncancelable(&t.mutex);
- defer mutexUnlock(&t.mutex);
+ mutexLockInternal(&t.mutex);
+ defer mutexUnlockInternal(&t.mutex);
while (true) {
while (t.run_queue.popFirst()) |runnable_node| {
- mutexUnlock(&t.mutex);
+ mutexUnlockInternal(&t.mutex);
thread.cancel_protection = .unblocked;
const runnable: *Runnable = @fieldParentPtr("node", runnable_node);
runnable.startFn(runnable, &thread, t);
- mutexLockUncancelable(&t.mutex);
+ mutexLockInternal(&t.mutex);
t.busy_count -= 1;
}
if (t.join_requested) break;
@@ -2004,12 +2023,12 @@ fn async(
},
};
- mutexLockUncancelable(&t.mutex);
+ mutexLockInternal(&t.mutex);
const busy_count = t.busy_count;
if (busy_count >= @intFromEnum(t.async_limit)) {
- mutexUnlock(&t.mutex);
+ mutexUnlockInternal(&t.mutex);
future.destroy(gpa);
start(context.ptr, result.ptr);
return null;
@@ -2023,7 +2042,7 @@ fn async(
const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, worker, .{t}) catch {
t.wait_group.finish();
t.busy_count = busy_count;
- mutexUnlock(&t.mutex);
+ mutexUnlockInternal(&t.mutex);
future.destroy(gpa);
start(context.ptr, result.ptr);
return null;
@@ -2033,7 +2052,7 @@ fn async(
t.run_queue.prepend(&future.runnable.node);
- mutexUnlock(&t.mutex);
+ mutexUnlockInternal(&t.mutex);
condSignal(&t.cond);
return @ptrCast(future);
}
@@ -2056,8 +2075,8 @@ fn concurrent(
};
errdefer future.destroy(gpa);
- mutexLockUncancelable(&t.mutex);
- defer mutexUnlock(&t.mutex);
+ mutexLockInternal(&t.mutex);
+ defer mutexUnlockInternal(&t.mutex);
const busy_count = t.busy_count;
@@ -2101,12 +2120,12 @@ fn groupAsync(
error.OutOfMemory => return groupAsyncEager(start, context.ptr),
};
- mutexLockUncancelable(&t.mutex);
+ mutexLockInternal(&t.mutex);
const busy_count = t.busy_count;
if (busy_count >= @intFromEnum(t.async_limit)) {
- mutexUnlock(&t.mutex);
+ mutexUnlockInternal(&t.mutex);
task.destroy(gpa);
return groupAsyncEager(start, context.ptr);
}
@@ -2119,7 +2138,7 @@ fn groupAsync(
const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, worker, .{t}) catch {
t.wait_group.finish();
t.busy_count = busy_count;
- mutexUnlock(&t.mutex);
+ mutexUnlockInternal(&t.mutex);
task.destroy(gpa);
return groupAsyncEager(start, context.ptr);
};
@@ -2136,7 +2155,7 @@ fn groupAsync(
}, .monotonic);
t.run_queue.prepend(&task.runnable.node);
- mutexUnlock(&t.mutex);
+ mutexUnlockInternal(&t.mutex);
condSignal(&t.cond);
}
fn groupAsyncEager(
@@ -2201,8 +2220,8 @@ fn groupConcurrent(
};
errdefer task.destroy(gpa);
- mutexLockUncancelable(&t.mutex);
- defer mutexUnlock(&t.mutex);
+ mutexLockInternal(&t.mutex);
+ defer mutexUnlockInternal(&t.mutex);
const busy_count = t.busy_count;
@@ -3838,8 +3857,8 @@ fn fileStatWindows(userdata: ?*anyopaque, file: File) File.StatError!File.Stat {
fn systemBasicInformation(t: *Threaded) ?*const windows.SYSTEM_BASIC_INFORMATION {
if (!t.system_basic_information.initialized.load(.acquire)) {
- mutexLockUncancelable(&t.mutex);
- defer mutexUnlock(&t.mutex);
+ mutexLockInternal(&t.mutex);
+ defer mutexUnlockInternal(&t.mutex);
switch (windows.ntdll.NtQuerySystemInformation(
.SystemBasicInformation,
@@ -14373,8 +14392,8 @@ const WindowsEnvironStrings = struct {
};
fn scanEnviron(t: *Threaded) void {
- mutexLockUncancelable(&t.mutex);
- defer mutexUnlock(&t.mutex);
+ mutexLockInternal(&t.mutex);
+ defer mutexUnlockInternal(&t.mutex);
if (t.environ.initialized) return;
t.environ.initialized = true;
@@ -14729,8 +14748,8 @@ fn spawnPosix(t: *Threaded, options: process.SpawnOptions) process.SpawnError!Sp
fn getDevNullFd(t: *Threaded) !posix.fd_t {
{
- mutexLockUncancelable(&t.mutex);
- defer mutexUnlock(&t.mutex);
+ mutexLockInternal(&t.mutex);
+ defer mutexUnlockInternal(&t.mutex);
if (t.null_file.fd != -1) return t.null_file.fd;
}
const mode: u32 = 0;
@@ -14741,8 +14760,8 @@ fn getDevNullFd(t: *Threaded) !posix.fd_t {
.SUCCESS => {
syscall.finish();
const fresh_fd: posix.fd_t = @intCast(rc);
- mutexLockUncancelable(&t.mutex); // Another thread might have won the race.
- defer mutexUnlock(&t.mutex);
+ mutexLockInternal(&t.mutex); // Another thread might have won the race.
+ defer mutexUnlockInternal(&t.mutex);
if (t.null_file.fd != -1) {
posix.close(fresh_fd);
return t.null_file.fd;
@@ -15402,8 +15421,8 @@ fn processSpawnWindows(userdata: ?*anyopaque, options: process.SpawnOptions) pro
fn getCngHandle(t: *Threaded) Io.RandomSecureError!windows.HANDLE {
{
- mutexLockUncancelable(&t.mutex);
- defer mutexUnlock(&t.mutex);
+ mutexLockInternal(&t.mutex);
+ defer mutexUnlockInternal(&t.mutex);
if (t.random_file.handle) |handle| return handle;
}
@@ -15437,8 +15456,8 @@ fn getCngHandle(t: *Threaded) Io.RandomSecureError!windows.HANDLE {
)) {
.SUCCESS => {
syscall.finish();
- mutexLockUncancelable(&t.mutex); // Another thread might have won the race.
- defer mutexUnlock(&t.mutex);
+ mutexLockInternal(&t.mutex); // Another thread might have won the race.
+ defer mutexUnlockInternal(&t.mutex);
if (t.random_file.handle) |prev_handle| {
windows.CloseHandle(fresh_handle);
return prev_handle;
@@ -15458,8 +15477,8 @@ fn getCngHandle(t: *Threaded) Io.RandomSecureError!windows.HANDLE {
fn getNulHandle(t: *Threaded) !windows.HANDLE {
{
- mutexLockUncancelable(&t.mutex);
- defer mutexUnlock(&t.mutex);
+ mutexLockInternal(&t.mutex);
+ defer mutexUnlockInternal(&t.mutex);
if (t.null_file.handle) |handle| return handle;
}
@@ -15505,8 +15524,8 @@ fn getNulHandle(t: *Threaded) !windows.HANDLE {
)) {
.SUCCESS => {
syscall.finish();
- mutexLockUncancelable(&t.mutex); // Another thread might have won the race.
- defer mutexUnlock(&t.mutex);
+ mutexLockInternal(&t.mutex); // Another thread might have won the race.
+ defer mutexUnlockInternal(&t.mutex);
if (t.null_file.handle) |prev_handle| {
windows.CloseHandle(fresh_handle);
return prev_handle;
@@ -16551,15 +16570,15 @@ fn random(userdata: ?*anyopaque, buffer: []u8) void {
}
fn randomMainThread(t: *Threaded, buffer: []u8) void {
- mutexLockUncancelable(&t.mutex);
- defer mutexUnlock(&t.mutex);
+ mutexLockInternal(&t.mutex);
+ defer mutexUnlockInternal(&t.mutex);
if (!t.csprng.isInitialized()) {
@branchHint(.unlikely);
var seed: [Csprng.seed_len]u8 = undefined;
{
- mutexUnlock(&t.mutex);
- defer mutexLockUncancelable(&t.mutex);
+ mutexUnlockInternal(&t.mutex);
+ defer mutexLockInternal(&t.mutex);
const prev = swapCancelProtection(t, .blocked);
defer _ = swapCancelProtection(t, prev);
@@ -16744,8 +16763,8 @@ fn randomSecure(userdata: ?*anyopaque, buffer: []u8) Io.RandomSecureError!void {
fn getRandomFd(t: *Threaded) Io.RandomSecureError!posix.fd_t {
{
- mutexLockUncancelable(&t.mutex);
- defer mutexUnlock(&t.mutex);
+ mutexLockInternal(&t.mutex);
+ defer mutexUnlockInternal(&t.mutex);
if (t.random_file.fd == -2) return error.EntropyUnavailable;
if (t.random_file.fd != -1) return t.random_file.fd;
@@ -16785,8 +16804,8 @@ fn getRandomFd(t: *Threaded) Io.RandomSecureError!posix.fd_t {
.SUCCESS => {
syscall.finish();
if (!statx.mask.TYPE) return error.EntropyUnavailable;
- mutexLockUncancelable(&t.mutex); // Another thread might have won the race.
- defer mutexUnlock(&t.mutex);
+ mutexLockInternal(&t.mutex); // Another thread might have won the race.
+ defer mutexUnlockInternal(&t.mutex);
if (t.random_file.fd >= 0) {
posix.close(fd);
return t.random_file.fd;
@@ -16813,8 +16832,8 @@ fn getRandomFd(t: *Threaded) Io.RandomSecureError!posix.fd_t {
switch (posix.errno(fstat_sym(fd, &stat))) {
.SUCCESS => {
syscall.finish();
- mutexLockUncancelable(&t.mutex); // Another thread might have won the race.
- defer mutexUnlock(&t.mutex);
+ mutexLockInternal(&t.mutex); // Another thread might have won the race.
+ defer mutexUnlockInternal(&t.mutex);
if (t.random_file.fd >= 0) {
posix.close(fd);
return t.random_file.fd;
@@ -16947,8 +16966,8 @@ const parking_futex = struct {
var status_buf: std.atomic.Value(Thread.Status) = undefined;
{
- mutexLockUncancelable(&bucket.mutex);
- defer mutexUnlock(&bucket.mutex);
+ mutexLockInternal(&bucket.mutex);
+ defer mutexUnlockInternal(&bucket.mutex);
_ = bucket.num_waiters.fetchAdd(1, .acquire);
@@ -17017,8 +17036,8 @@ const parking_futex = struct {
.parked => {
// We saw a timeout and updated our own status from `.parked` to `.none`. It is
// our responsibility to remove `waiter` from `bucket`.
- mutexLockUncancelable(&bucket.mutex);
- defer mutexUnlock(&bucket.mutex);
+ mutexLockInternal(&bucket.mutex);
+ defer mutexUnlockInternal(&bucket.mutex);
bucket.waiters.remove(&waiter.node);
assert(bucket.num_waiters.fetchSub(1, .monotonic) > 0);
},
@@ -17057,8 +17076,8 @@ const parking_futex = struct {
// of the critical section. This forms a singly-linked list of waiters using `Waiter.node.next`.
var waking_head: ?*std.DoublyLinkedList.Node = null;
{
- mutexLockUncancelable(&bucket.mutex);
- defer mutexUnlock(&bucket.mutex);
+ mutexLockInternal(&bucket.mutex);
+ defer mutexUnlockInternal(&bucket.mutex);
var num_removed: u32 = 0;
var it = bucket.waiters.first;
@@ -17113,8 +17132,8 @@ const parking_futex = struct {
fn removeCanceledWaiter(waiter: *Waiter) void {
const bucket = bucketForAddress(waiter.address);
- mutexLockUncancelable(&bucket.mutex);
- defer mutexUnlock(&bucket.mutex);
+ mutexLockInternal(&bucket.mutex);
+ defer mutexUnlockInternal(&bucket.mutex);
bucket.waiters.remove(&waiter.node);
assert(bucket.num_waiters.fetchSub(1, .monotonic) > 0);
waiter.done.store(true, .release); // potentially invalidates `waiter.*`
@@ -18163,8 +18182,8 @@ fn condWait(cond: *Condition, mutex: *Mutex) void {
assert(prev_state.waiters < std.math.maxInt(u16)); // overflow caused by too many waiters
}
- mutexUnlock(mutex);
- defer mutexLockUncancelable(mutex);
+ mutexUnlockInternal(mutex);
+ defer mutexLockInternal(mutex);
while (true) {
Thread.futexWaitUncancelable(&cond.epoch.raw, epoch, null);
@@ -18189,28 +18208,13 @@ const Mutex = if (!is_windows) Io.Mutex else struct {
const init: @This() = .{ .srwlock = .{} };
};
-/// Same as `Io.Mutex.lockUncancelable` but avoids the VTable.
-fn mutexLock(m: *Io.Mutex) Io.Cancelable!void {
- const initial_state = m.state.cmpxchgWeak(
- .unlocked,
- .locked_once,
- .acquire,
- .monotonic,
- ) orelse {
- @branchHint(.likely);
- return;
- };
- if (initial_state == .contended) {
- try Thread.futexWait(@ptrCast(&m.state.raw), @intFromEnum(Io.Mutex.State.contended), null);
- }
- while (m.state.swap(.contended, .acquire) != .unlocked) {
- try Thread.futexWait(@ptrCast(&m.state.raw), @intFromEnum(Io.Mutex.State.contended), null);
- }
+fn mutexLockInternal(m: *Mutex) void {
+ if (is_windows) return windows.ntdll.RtlAcquireSRWLockExclusive(&m.srwlock);
+ return mutexLock(m);
}
/// Same as `Io.Mutex.lockUncancelable` but avoids the VTable.
-fn mutexLockUncancelable(m: *Mutex) void {
- if (is_windows) return windows.ntdll.RtlAcquireSRWLockExclusive(&m.srwlock);
+pub fn mutexLock(m: *Io.Mutex) void {
const initial_state = m.state.cmpxchgWeak(
.unlocked,
.locked_once,
@@ -18228,9 +18232,13 @@ fn mutexLockUncancelable(m: *Mutex) void {
}
}
-/// Same as `Io.Mutex.unlock` but avoids the VTable.
-fn mutexUnlock(m: *Mutex) void {
+fn mutexUnlockInternal(m: *Mutex) void {
if (is_windows) return windows.ntdll.RtlReleaseSRWLockExclusive(&m.srwlock);
+ return mutexUnlock(m);
+}
+
+/// Same as `Io.Mutex.unlock` but avoids the VTable.
+pub fn mutexUnlock(m: *Io.Mutex) void {
switch (m.state.swap(.unlocked, .release)) {
.unlocked => unreachable,
.locked_once => {},
diff --git a/lib/std/Thread.zig b/lib/std/Thread.zig
@@ -14,13 +14,9 @@ const posix = std.posix;
const windows = std.os.windows;
const testing = std.testing;
-pub const Futex = @import("Thread/Futex.zig");
-pub const Mutex = @import("Thread/Mutex.zig");
-pub const Semaphore = @import("Thread/Semaphore.zig");
-pub const Condition = @import("Thread/Condition.zig");
-pub const RwLock = @import("Thread/RwLock.zig");
-
-pub const Pool = @compileError("deprecated; consider using 'std.Io.Group' with 'std.Io.Threaded'");
+pub const Mutex = struct {
+ pub const Recursive = @import("Thread/Mutex/Recursive.zig");
+};
pub const use_pthreads = native_os != .windows and native_os != .wasi and builtin.link_libc;
@@ -1609,11 +1605,7 @@ test "setName, getName" {
}
test {
- _ = Futex;
_ = Mutex;
- _ = Semaphore;
- _ = Condition;
- _ = RwLock;
}
fn testIncrementNotify(io: Io, value: *usize, event: *Io.Event) void {
diff --git a/lib/std/Thread/Condition.zig b/lib/std/Thread/Condition.zig
@@ -1,683 +0,0 @@
-//! Condition variables are used with a Mutex to efficiently wait for an arbitrary condition to occur.
-//! It does this by atomically unlocking the mutex, blocking the thread until notified, and finally re-locking the mutex.
-//! Condition can be statically initialized and is at most `@sizeOf(u64)` large.
-//!
-//! Example:
-//! ```
-//! var m = Mutex{};
-//! var c = Condition{};
-//! var predicate = false;
-//!
-//! fn consumer() void {
-//! m.lock();
-//! defer m.unlock();
-//!
-//! while (!predicate) {
-//! c.wait(&m);
-//! }
-//! }
-//!
-//! fn producer() void {
-//! {
-//! m.lock();
-//! defer m.unlock();
-//! predicate = true;
-//! }
-//! c.signal();
-//! }
-//!
-//! const thread = try std.Thread.spawn(.{}, producer, .{});
-//! consumer();
-//! thread.join();
-//! ```
-//!
-//! Note that condition variables can only reliably unblock threads that are sequenced before them using the same Mutex.
-//! This means that the following is allowed to deadlock:
-//! ```
-//! thread-1: mutex.lock()
-//! thread-1: condition.wait(&mutex)
-//!
-//! thread-2: // mutex.lock() (without this, the following signal may not see the waiting thread-1)
-//! thread-2: // mutex.unlock() (this is optional for correctness once locked above, as signal can be called while holding the mutex)
-//! thread-2: condition.signal()
-//! ```
-
-const std = @import("../std.zig");
-const builtin = @import("builtin");
-const Condition = @This();
-const Mutex = std.Thread.Mutex;
-
-const os = std.os;
-const assert = std.debug.assert;
-const testing = std.testing;
-const Futex = std.Thread.Futex;
-
-impl: Impl = .{},
-
-/// Atomically releases the Mutex, blocks the caller thread, then re-acquires the Mutex on return.
-/// "Atomically" here refers to accesses done on the Condition after acquiring the Mutex.
-///
-/// The Mutex must be locked by the caller's thread when this function is called.
-/// A Mutex can have multiple Conditions waiting with it concurrently, but not the opposite.
-/// It is undefined behavior for multiple threads to wait ith different mutexes using the same Condition concurrently.
-/// Once threads have finished waiting with one Mutex, the Condition can be used to wait with another Mutex.
-///
-/// A blocking call to wait() is unblocked from one of the following conditions:
-/// - a spurious ("at random") wake up occurs
-/// - a future call to `signal()` or `broadcast()` which has acquired the Mutex and is sequenced after this `wait()`.
-///
-/// Given wait() can be interrupted spuriously, the blocking condition should be checked continuously
-/// irrespective of any notifications from `signal()` or `broadcast()`.
-pub fn wait(self: *Condition, mutex: *Mutex) void {
- self.impl.wait(mutex, null) catch |err| switch (err) {
- error.Timeout => unreachable, // no timeout provided so we shouldn't have timed-out
- };
-}
-
-/// Atomically releases the Mutex, blocks the caller thread, then re-acquires the Mutex on return.
-/// "Atomically" here refers to accesses done on the Condition after acquiring the Mutex.
-///
-/// The Mutex must be locked by the caller's thread when this function is called.
-/// A Mutex can have multiple Conditions waiting with it concurrently, but not the opposite.
-/// It is undefined behavior for multiple threads to wait ith different mutexes using the same Condition concurrently.
-/// Once threads have finished waiting with one Mutex, the Condition can be used to wait with another Mutex.
-///
-/// A blocking call to `timedWait()` is unblocked from one of the following conditions:
-/// - a spurious ("at random") wake occurs
-/// - the caller was blocked for around `timeout_ns` nanoseconds, in which `error.Timeout` is returned.
-/// - a future call to `signal()` or `broadcast()` which has acquired the Mutex and is sequenced after this `timedWait()`.
-///
-/// Given `timedWait()` can be interrupted spuriously, the blocking condition should be checked continuously
-/// irrespective of any notifications from `signal()` or `broadcast()`.
-pub fn timedWait(self: *Condition, mutex: *Mutex, timeout_ns: u64) error{Timeout}!void {
- return self.impl.wait(mutex, timeout_ns);
-}
-
-/// Unblocks at least one thread blocked in a call to `wait()` or `timedWait()` with a given Mutex.
-/// The blocked thread must be sequenced before this call with respect to acquiring the same Mutex in order to be observable for unblocking.
-/// `signal()` can be called with or without the relevant Mutex being acquired and have no "effect" if there's no observable blocked threads.
-pub fn signal(self: *Condition) void {
- self.impl.wake(.one);
-}
-
-/// Unblocks all threads currently blocked in a call to `wait()` or `timedWait()` with a given Mutex.
-/// The blocked threads must be sequenced before this call with respect to acquiring the same Mutex in order to be observable for unblocking.
-/// `broadcast()` can be called with or without the relevant Mutex being acquired and have no "effect" if there's no observable blocked threads.
-pub fn broadcast(self: *Condition) void {
- self.impl.wake(.all);
-}
-
-const Impl = Impl: {
- if (builtin.single_threaded) break :Impl SingleThreadedImpl;
- if (builtin.os.tag == .windows) break :Impl WindowsImpl;
-
- if (builtin.os.tag.isDarwin() or
- builtin.target.os.tag == .linux or
- builtin.target.os.tag == .freebsd or
- builtin.target.os.tag == .openbsd or
- builtin.target.os.tag == .dragonfly or
- builtin.target.cpu.arch.isWasm())
- {
- // Futex is the system's synchronization primitive; use that.
- break :Impl FutexImpl;
- }
-
- if (std.Thread.use_pthreads) {
- // This system doesn't have a futex primitive, so `std.Thread.Futex` is using `PosixImpl`,
- // which implements futex *on top of* pthread mutexes and conditions. Therefore, instead
- // of going through that long inefficient path, just use pthread condition variable directly.
- break :Impl PosixImpl;
- }
-
- break :Impl FutexImpl;
-};
-
-const Notify = enum {
- one, // wake up only one thread
- all, // wake up all threads
-};
-
-const SingleThreadedImpl = struct {
- fn wait(self: *Impl, mutex: *Mutex, timeout: ?u64) error{Timeout}!void {
- _ = self;
- _ = mutex;
- // There are no other threads to wake us up.
- // So if we wait without a timeout we would never wake up.
- assert(timeout != null); // Deadlock detected.
- return error.Timeout;
- }
-
- fn wake(self: *Impl, comptime notify: Notify) void {
- // There are no other threads to wake up.
- _ = self;
- _ = notify;
- }
-};
-
-const WindowsImpl = struct {
- condition: os.windows.CONDITION_VARIABLE = .{},
-
- fn wait(self: *Impl, mutex: *Mutex, timeout: ?u64) error{Timeout}!void {
- var timeout_overflowed = false;
- var timeout_ms: os.windows.DWORD = os.windows.INFINITE;
-
- if (timeout) |timeout_ns| {
- // Round the nanoseconds to the nearest millisecond,
- // then saturating cast it to windows DWORD for use in kernel32 call.
- const ms = (timeout_ns +| (std.time.ns_per_ms / 2)) / std.time.ns_per_ms;
- timeout_ms = std.math.cast(os.windows.DWORD, ms) orelse std.math.maxInt(os.windows.DWORD);
-
- // Track if the timeout overflowed into INFINITE and make sure not to wait forever.
- if (timeout_ms == os.windows.INFINITE) {
- timeout_overflowed = true;
- timeout_ms -= 1;
- }
- }
-
- if (builtin.mode == .Debug) {
- // The internal state of the DebugMutex needs to be handled here as well.
- mutex.impl.locking_thread.store(0, .unordered);
- }
- const rc = os.windows.kernel32.SleepConditionVariableSRW(
- &self.condition,
- if (builtin.mode == .Debug) &mutex.impl.impl.srwlock else &mutex.impl.srwlock,
- timeout_ms,
- 0, // the srwlock was assumed to acquired in exclusive mode not shared
- );
- if (builtin.mode == .Debug) {
- // The internal state of the DebugMutex needs to be handled here as well.
- mutex.impl.locking_thread.store(std.Thread.getCurrentId(), .unordered);
- }
-
- // Return error.Timeout if we know the timeout elapsed correctly.
- if (rc == os.windows.FALSE) {
- assert(os.windows.GetLastError() == .TIMEOUT);
- if (!timeout_overflowed) return error.Timeout;
- }
- }
-
- fn wake(self: *Impl, comptime notify: Notify) void {
- switch (notify) {
- .one => os.windows.ntdll.RtlWakeConditionVariable(&self.condition),
- .all => os.windows.ntdll.RtlWakeAllConditionVariable(&self.condition),
- }
- }
-};
-
-const FutexImpl = struct {
- state: std.atomic.Value(u32) = std.atomic.Value(u32).init(0),
- epoch: std.atomic.Value(u32) = std.atomic.Value(u32).init(0),
-
- const one_waiter = 1;
- const waiter_mask = 0xffff;
-
- const one_signal = 1 << 16;
- const signal_mask = 0xffff << 16;
-
- fn wait(self: *Impl, mutex: *Mutex, timeout: ?u64) error{Timeout}!void {
- // Observe the epoch, then check the state again to see if we should wake up.
- // The epoch must be observed before we check the state or we could potentially miss a wake() and deadlock:
- //
- // - T1: s = LOAD(&state)
- // - T2: UPDATE(&s, signal)
- // - T2: UPDATE(&epoch, 1) + FUTEX_WAKE(&epoch)
- // - T1: e = LOAD(&epoch) (was reordered after the state load)
- // - T1: s & signals == 0 -> FUTEX_WAIT(&epoch, e) (missed the state update + the epoch change)
- //
- // Acquire barrier to ensure the epoch load happens before the state load.
- var epoch = self.epoch.load(.acquire);
- var state = self.state.fetchAdd(one_waiter, .monotonic);
- assert(state & waiter_mask != waiter_mask);
- state += one_waiter;
-
- mutex.unlock();
- defer mutex.lock();
-
- var futex_deadline = Futex.Deadline.init(timeout);
-
- while (true) {
- futex_deadline.wait(&self.epoch, epoch) catch |err| switch (err) {
- // On timeout, we must decrement the waiter we added above.
- error.Timeout => {
- while (true) {
- // If there's a signal when we're timing out, consume it and report being woken up instead.
- // Acquire barrier ensures code before the wake() which added the signal happens before we decrement it and return.
- while (state & signal_mask != 0) {
- const new_state = state - one_waiter - one_signal;
- state = self.state.cmpxchgWeak(state, new_state, .acquire, .monotonic) orelse return;
- }
-
- // Remove the waiter we added and officially return timed out.
- const new_state = state - one_waiter;
- state = self.state.cmpxchgWeak(state, new_state, .monotonic, .monotonic) orelse return err;
- }
- },
- };
-
- epoch = self.epoch.load(.acquire);
- state = self.state.load(.monotonic);
-
- // Try to wake up by consuming a signal and decremented the waiter we added previously.
- // Acquire barrier ensures code before the wake() which added the signal happens before we decrement it and return.
- while (state & signal_mask != 0) {
- const new_state = state - one_waiter - one_signal;
- state = self.state.cmpxchgWeak(state, new_state, .acquire, .monotonic) orelse return;
- }
- }
- }
-
- fn wake(self: *Impl, comptime notify: Notify) void {
- var state = self.state.load(.monotonic);
- while (true) {
- const waiters = (state & waiter_mask) / one_waiter;
- const signals = (state & signal_mask) / one_signal;
-
- // Reserves which waiters to wake up by incrementing the signals count.
- // Therefore, the signals count is always less than or equal to the waiters count.
- // We don't need to Futex.wake if there's nothing to wake up or if other wake() threads have reserved to wake up the current waiters.
- const wakeable = waiters - signals;
- if (wakeable == 0) {
- return;
- }
-
- const to_wake = switch (notify) {
- .one => 1,
- .all => wakeable,
- };
-
- // Reserve the amount of waiters to wake by incrementing the signals count.
- // Release barrier ensures code before the wake() happens before the signal it posted and consumed by the wait() threads.
- const new_state = state + (one_signal * to_wake);
- state = self.state.cmpxchgWeak(state, new_state, .release, .monotonic) orelse {
- // Wake up the waiting threads we reserved above by changing the epoch value.
- // NOTE: a waiting thread could miss a wake up if *exactly* ((1<<32)-1) wake()s happen between it observing the epoch and sleeping on it.
- // This is very unlikely due to how many precise amount of Futex.wake() calls that would be between the waiting thread's potential preemption.
- //
- // Release barrier ensures the signal being added to the state happens before the epoch is changed.
- // If not, the waiting thread could potentially deadlock from missing both the state and epoch change:
- //
- // - T2: UPDATE(&epoch, 1) (reordered before the state change)
- // - T1: e = LOAD(&epoch)
- // - T1: s = LOAD(&state)
- // - T2: UPDATE(&state, signal) + FUTEX_WAKE(&epoch)
- // - T1: s & signals == 0 -> FUTEX_WAIT(&epoch, e) (missed both epoch change and state change)
- _ = self.epoch.fetchAdd(1, .release);
- Futex.wake(&self.epoch, to_wake);
- return;
- };
- }
- }
-};
-
-const PosixImpl = struct {
- cond: std.c.pthread_cond_t = .{},
-
- fn wait(self: *Impl, mutex: *Mutex, timeout: ?u64) error{Timeout}!void {
- if (builtin.mode == .Debug) {
- mutex.impl.locking_thread.store(0, .unordered);
- }
- defer if (builtin.mode == .Debug) {
- mutex.impl.locking_thread.store(std.Thread.getCurrentId(), .unordered);
- };
-
- const mtx = if (builtin.mode == .Debug) &mutex.impl.impl.mutex else &mutex.impl.mutex;
-
- if (timeout) |t| {
- switch (std.c.pthread_cond_timedwait(&self.cond, mtx, &.{
- .sec = @intCast(@divFloor(t, std.time.ns_per_s)),
- .nsec = @intCast(@mod(t, std.time.ns_per_s)),
- })) {
- .SUCCESS => return,
- .TIMEDOUT => return error.Timeout,
- else => unreachable,
- }
- }
-
- assert(std.c.pthread_cond_wait(&self.cond, mtx) == .SUCCESS);
- }
-
- fn wake(self: *Impl, comptime notify: Notify) void {
- assert(switch (notify) {
- .one => std.c.pthread_cond_signal(&self.cond),
- .all => std.c.pthread_cond_broadcast(&self.cond),
- } == .SUCCESS);
- }
-};
-
-test "smoke test" {
- var mutex = Mutex{};
- var cond = Condition{};
-
- // Try to wake outside the mutex
- defer cond.signal();
- defer cond.broadcast();
-
- mutex.lock();
- defer mutex.unlock();
-
- // Try to wait with a timeout (should not deadlock)
- try testing.expectError(error.Timeout, cond.timedWait(&mutex, 0));
- try testing.expectError(error.Timeout, cond.timedWait(&mutex, std.time.ns_per_ms));
-
- // Try to wake inside the mutex.
- cond.signal();
- cond.broadcast();
-}
-
-// Inspired from: https://github.com/Amanieu/parking_lot/pull/129
-test "wait and signal" {
- // This test requires spawning threads
- if (builtin.single_threaded) {
- return error.SkipZigTest;
- }
-
- const io = testing.io;
-
- const num_threads = 4;
-
- const MultiWait = struct {
- mutex: Mutex = .{},
- cond: Condition = .{},
- threads: [num_threads]std.Thread = undefined,
- spawn_count: std.math.IntFittingRange(0, num_threads) = 0,
-
- fn run(self: *@This()) void {
- self.mutex.lock();
- defer self.mutex.unlock();
- self.spawn_count += 1;
-
- self.cond.wait(&self.mutex);
- self.cond.timedWait(&self.mutex, std.time.ns_per_ms) catch {};
- self.cond.signal();
- }
- };
-
- var multi_wait = MultiWait{};
- for (&multi_wait.threads) |*t| {
- t.* = try std.Thread.spawn(.{}, MultiWait.run, .{&multi_wait});
- }
-
- while (true) {
- try std.Io.Clock.Duration.sleep(.{ .clock = .awake, .raw = .fromMilliseconds(100) }, io);
-
- multi_wait.mutex.lock();
- defer multi_wait.mutex.unlock();
- // Make sure all of the threads have finished spawning to avoid a deadlock.
- if (multi_wait.spawn_count == num_threads) break;
- }
-
- multi_wait.cond.signal();
- for (multi_wait.threads) |t| {
- t.join();
- }
-}
-
-test signal {
- // This test requires spawning threads
- if (builtin.single_threaded) {
- return error.SkipZigTest;
- }
-
- const io = testing.io;
-
- const num_threads = 4;
-
- const SignalTest = struct {
- mutex: Mutex = .{},
- cond: Condition = .{},
- notified: bool = false,
- threads: [num_threads]std.Thread = undefined,
- spawn_count: std.math.IntFittingRange(0, num_threads) = 0,
-
- fn run(self: *@This()) void {
- self.mutex.lock();
- defer self.mutex.unlock();
- self.spawn_count += 1;
-
- // Use timedWait() a few times before using wait()
- // to test multiple threads timing out frequently.
- var i: usize = 0;
- while (!self.notified) : (i +%= 1) {
- if (i < 5) {
- self.cond.timedWait(&self.mutex, 1) catch {};
- } else {
- self.cond.wait(&self.mutex);
- }
- }
-
- // Once we received the signal, notify another thread (inside the lock).
- assert(self.notified);
- self.cond.signal();
- }
- };
-
- var signal_test = SignalTest{};
- for (&signal_test.threads) |*t| {
- t.* = try std.Thread.spawn(.{}, SignalTest.run, .{&signal_test});
- }
-
- while (true) {
- try std.Io.Clock.Duration.sleep(.{ .clock = .awake, .raw = .fromMilliseconds(10) }, io);
-
- signal_test.mutex.lock();
- defer signal_test.mutex.unlock();
- // Make sure at least one thread has finished spawning to avoid testing nothing.
- if (signal_test.spawn_count > 0) break;
- }
-
- {
- // Wake up one of them (outside the lock) after setting notified=true.
- defer signal_test.cond.signal();
-
- signal_test.mutex.lock();
- defer signal_test.mutex.unlock();
-
- try testing.expect(!signal_test.notified);
- signal_test.notified = true;
- }
-
- for (signal_test.threads) |t| {
- t.join();
- }
-}
-
-test "multi signal" {
- // This test requires spawning threads
- if (builtin.single_threaded) {
- return error.SkipZigTest;
- }
-
- const num_threads = 4;
- const num_iterations = 4;
-
- const Paddle = struct {
- mutex: Mutex = .{},
- cond: Condition = .{},
- value: u32 = 0,
-
- fn hit(self: *@This()) void {
- defer self.cond.signal();
-
- self.mutex.lock();
- defer self.mutex.unlock();
-
- self.value += 1;
- }
-
- fn run(self: *@This(), hit_to: *@This()) !void {
- self.mutex.lock();
- defer self.mutex.unlock();
-
- var current: u32 = 0;
- while (current < num_iterations) : (current += 1) {
- // Wait for the value to change from hit()
- while (self.value == current) {
- self.cond.wait(&self.mutex);
- }
-
- // hit the next paddle
- try testing.expectEqual(self.value, current + 1);
- hit_to.hit();
- }
- }
- };
-
- var paddles = [_]Paddle{.{}} ** num_threads;
- var threads = [_]std.Thread{undefined} ** num_threads;
-
- // Create a circle of paddles which hit each other
- for (&threads, 0..) |*t, i| {
- const paddle = &paddles[i];
- const hit_to = &paddles[(i + 1) % paddles.len];
- t.* = try std.Thread.spawn(.{}, Paddle.run, .{ paddle, hit_to });
- }
-
- // Hit the first paddle and wait for them all to complete by hitting each other for num_iterations.
- paddles[0].hit();
- for (threads) |t| t.join();
-
- // The first paddle will be hit one last time by the last paddle.
- for (paddles, 0..) |p, i| {
- const expected = @as(u32, num_iterations) + @intFromBool(i == 0);
- try testing.expectEqual(p.value, expected);
- }
-}
-
-test broadcast {
- // This test requires spawning threads
- if (builtin.single_threaded) {
- return error.SkipZigTest;
- }
-
- const num_threads = 10;
-
- const BroadcastTest = struct {
- mutex: Mutex = .{},
- cond: Condition = .{},
- completed: Condition = .{},
- count: usize = 0,
- threads: [num_threads]std.Thread = undefined,
-
- fn run(self: *@This()) void {
- self.mutex.lock();
- defer self.mutex.unlock();
-
- // The last broadcast thread to start tells the main test thread it's completed.
- self.count += 1;
- if (self.count == num_threads) {
- self.completed.signal();
- }
-
- // Waits for the count to reach zero after the main test thread observes it at num_threads.
- // Tries to use timedWait() a bit before falling back to wait() to test multiple threads timing out.
- var i: usize = 0;
- while (self.count != 0) : (i +%= 1) {
- if (i < 10) {
- self.cond.timedWait(&self.mutex, 1) catch {};
- } else {
- self.cond.wait(&self.mutex);
- }
- }
- }
- };
-
- var broadcast_test = BroadcastTest{};
- for (&broadcast_test.threads) |*t| {
- t.* = try std.Thread.spawn(.{}, BroadcastTest.run, .{&broadcast_test});
- }
-
- {
- broadcast_test.mutex.lock();
- defer broadcast_test.mutex.unlock();
-
- // Wait for all the broadcast threads to spawn.
- // timedWait() to detect any potential deadlocks.
- while (broadcast_test.count != num_threads) {
- broadcast_test.completed.timedWait(
- &broadcast_test.mutex,
- 1 * std.time.ns_per_s,
- ) catch {};
- }
-
- // Reset the counter and wake all the threads to exit.
- broadcast_test.count = 0;
- broadcast_test.cond.broadcast();
- }
-
- for (broadcast_test.threads) |t| {
- t.join();
- }
-}
-
-test "broadcasting - wake all threads" {
- // Tests issue #12877
- // This test requires spawning threads
- if (builtin.single_threaded) {
- return error.SkipZigTest;
- }
-
- var num_runs: usize = 1;
- const num_threads = 10;
-
- while (num_runs > 0) : (num_runs -= 1) {
- const BroadcastTest = struct {
- mutex: Mutex = .{},
- cond: Condition = .{},
- completed: Condition = .{},
- count: usize = 0,
- thread_id_to_wake: usize = 0,
- threads: [num_threads]std.Thread = undefined,
- wakeups: usize = 0,
-
- fn run(self: *@This(), thread_id: usize) void {
- self.mutex.lock();
- defer self.mutex.unlock();
-
- // The last broadcast thread to start tells the main test thread it's completed.
- self.count += 1;
- if (self.count == num_threads) {
- self.completed.signal();
- }
-
- while (self.thread_id_to_wake != thread_id) {
- self.cond.timedWait(&self.mutex, 1 * std.time.ns_per_s) catch {};
- self.wakeups += 1;
- }
- if (self.thread_id_to_wake <= num_threads) {
- // Signal next thread to wake up.
- self.thread_id_to_wake += 1;
- self.cond.broadcast();
- }
- }
- };
-
- var broadcast_test = BroadcastTest{};
- var thread_id: usize = 1;
- for (&broadcast_test.threads) |*t| {
- t.* = try std.Thread.spawn(.{}, BroadcastTest.run, .{ &broadcast_test, thread_id });
- thread_id += 1;
- }
-
- {
- broadcast_test.mutex.lock();
- defer broadcast_test.mutex.unlock();
-
- // Wait for all the broadcast threads to spawn.
- // timedWait() to detect any potential deadlocks.
- while (broadcast_test.count != num_threads) {
- broadcast_test.completed.timedWait(
- &broadcast_test.mutex,
- 1 * std.time.ns_per_s,
- ) catch {};
- }
-
- // Signal thread 1 to wake up
- broadcast_test.thread_id_to_wake = 1;
- broadcast_test.cond.broadcast();
- }
-
- for (broadcast_test.threads) |t| {
- t.join();
- }
- }
-}
diff --git a/lib/std/Thread/Futex.zig b/lib/std/Thread/Futex.zig
@@ -1,1063 +0,0 @@
-//! A mechanism used to block (`wait`) and unblock (`wake`) threads using a
-//! 32bit memory address as hints.
-//!
-//! Blocking a thread is acknowledged only if the 32bit memory address is equal
-//! to a given value. This check helps avoid block/unblock deadlocks which
-//! occur if a `wake()` happens before a `wait()`.
-//!
-//! Using Futex, other Thread synchronization primitives can be built which
-//! efficiently wait for cross-thread events or signals.
-
-const std = @import("../std.zig");
-const builtin = @import("builtin");
-const Futex = @This();
-const windows = std.os.windows;
-const linux = std.os.linux;
-const c = std.c;
-
-const assert = std.debug.assert;
-const testing = std.testing;
-const atomic = std.atomic;
-
-/// Checks if `ptr` still contains the value `expect` and, if so, blocks the caller until either:
-/// - The value at `ptr` is no longer equal to `expect` and `wake()` is called on the same address.
-/// - The caller is unblocked spuriously ("at random").
-///
-/// The checking of `ptr` and `expect`, along with blocking the caller, is done atomically
-/// and totally ordered (sequentially consistent) with respect to other wait()/wake() calls on the same `ptr`.
-pub fn wait(ptr: *const atomic.Value(u32), expect: u32) void {
- @branchHint(.cold);
-
- Impl.wait(ptr, expect, null) catch |err| switch (err) {
- error.Timeout => unreachable, // null timeout meant to wait forever
- };
-}
-
-/// Checks if `ptr` still contains the value `expect` and, if so, blocks the caller until either:
-/// - The value at `ptr` is no longer equal to `expect`.
-/// - The caller is unblocked by a matching `wake()`.
-/// - The caller is unblocked spuriously ("at random").
-/// - The caller blocks for longer than the given timeout. In which case, `error.Timeout` is returned.
-///
-/// The checking of `ptr` and `expect`, along with blocking the caller, is done atomically
-/// and totally ordered (sequentially consistent) with respect to other wait()/wake() calls on the same `ptr`.
-pub fn timedWait(ptr: *const atomic.Value(u32), expect: u32, timeout_ns: u64) error{Timeout}!void {
- @branchHint(.cold);
-
- // Avoid calling into the OS for no-op timeouts.
- if (timeout_ns == 0) {
- if (ptr.load(.seq_cst) != expect) return;
- return error.Timeout;
- }
-
- return Impl.wait(ptr, expect, timeout_ns);
-}
-
-/// Unblocks at most `max_waiters` callers blocked in a `wait()` call on `ptr`.
-pub fn wake(ptr: *const atomic.Value(u32), max_waiters: u32) void {
- @branchHint(.cold);
-
- // Avoid calling into the OS if there's nothing to wake up.
- if (max_waiters == 0) {
- return;
- }
-
- Impl.wake(ptr, max_waiters);
-}
-
-const Impl = if (builtin.single_threaded)
- SingleThreadedImpl
-else if (builtin.os.tag == .windows)
- WindowsImpl
-else if (builtin.os.tag.isDarwin())
- DarwinImpl
-else if (builtin.os.tag == .linux)
- LinuxImpl
-else if (builtin.os.tag == .freebsd)
- FreebsdImpl
-else if (builtin.os.tag == .openbsd)
- OpenbsdImpl
-else if (builtin.os.tag == .dragonfly)
- DragonflyImpl
-else if (builtin.target.cpu.arch.isWasm())
- WasmImpl
-else if (std.Thread.use_pthreads)
- PosixImpl
-else
- UnsupportedImpl;
-
-/// We can't do @compileError() in the `Impl` switch statement above as its eagerly evaluated.
-/// So instead, we @compileError() on the methods themselves for platforms which don't support futex.
-const UnsupportedImpl = struct {
- fn wait(ptr: *const atomic.Value(u32), expect: u32, timeout: ?u64) error{Timeout}!void {
- return unsupported(.{ ptr, expect, timeout });
- }
-
- fn wake(ptr: *const atomic.Value(u32), max_waiters: u32) void {
- return unsupported(.{ ptr, max_waiters });
- }
-
- fn unsupported(unused: anytype) noreturn {
- _ = unused;
- @compileError("Unsupported operating system " ++ @tagName(builtin.target.os.tag));
- }
-};
-
-const SingleThreadedImpl = struct {
- fn wait(ptr: *const atomic.Value(u32), expect: u32, timeout: ?u64) error{Timeout}!void {
- if (ptr.raw != expect) {
- return;
- }
-
- // There are no threads to wake us up.
- // So if we wait without a timeout we would never wake up.
- const delay = timeout orelse {
- unreachable; // deadlock detected
- };
-
- _ = delay;
- return error.Timeout;
- }
-
- fn wake(ptr: *const atomic.Value(u32), max_waiters: u32) void {
- // There are no other threads to possibly wake up
- _ = ptr;
- _ = max_waiters;
- }
-};
-
-// We use WaitOnAddress through NtDll instead of API-MS-Win-Core-Synch-l1-2-0.dll
-// as it's generally already a linked target and is autoloaded into all processes anyway.
-const WindowsImpl = struct {
- fn wait(ptr: *const atomic.Value(u32), expect: u32, timeout: ?u64) error{Timeout}!void {
- var timeout_value: windows.LARGE_INTEGER = undefined;
- var timeout_ptr: ?*const windows.LARGE_INTEGER = null;
-
- // NTDLL functions work with time in units of 100 nanoseconds.
- // Positive values are absolute deadlines while negative values are relative durations.
- if (timeout) |delay| {
- timeout_value = @as(windows.LARGE_INTEGER, @intCast(delay / 100));
- timeout_value = -timeout_value;
- timeout_ptr = &timeout_value;
- }
-
- const rc = windows.ntdll.RtlWaitOnAddress(
- ptr,
- &expect,
- @sizeOf(@TypeOf(expect)),
- timeout_ptr,
- );
-
- switch (rc) {
- .SUCCESS => {},
- .TIMEOUT => {
- assert(timeout != null);
- return error.Timeout;
- },
- else => unreachable,
- }
- }
-
- fn wake(ptr: *const atomic.Value(u32), max_waiters: u32) void {
- const address: ?*const anyopaque = ptr;
- assert(max_waiters != 0);
-
- switch (max_waiters) {
- 1 => windows.ntdll.RtlWakeAddressSingle(address),
- else => windows.ntdll.RtlWakeAddressAll(address),
- }
- }
-};
-
-const DarwinImpl = struct {
- fn wait(ptr: *const atomic.Value(u32), expect: u32, timeout: ?u64) error{Timeout}!void {
- // Darwin XNU 7195.50.7.100.1 introduced __ulock_wait2 and migrated code paths (notably pthread_cond_t) towards it:
- // https://github.com/apple/darwin-xnu/commit/d4061fb0260b3ed486147341b72468f836ed6c8f#diff-08f993cc40af475663274687b7c326cc6c3031e0db3ac8de7b24624610616be6
- //
- // This XNU version appears to correspond to 11.0.1:
- // https://kernelshaman.blogspot.com/2021/01/building-xnu-for-macos-big-sur-1101.html
- //
- // ulock_wait() uses 32-bit micro-second timeouts where 0 = INFINITE or no-timeout
- // ulock_wait2() uses 64-bit nano-second timeouts (with the same convention)
- const supports_ulock_wait2 = builtin.target.os.version_range.semver.min.major >= 11;
-
- var timeout_ns: u64 = 0;
- if (timeout) |delay| {
- assert(delay != 0); // handled by timedWait()
- timeout_ns = delay;
- }
-
- // If we're using `__ulock_wait` and `timeout` is too big to fit inside a `u32` count of
- // micro-seconds (around 70min), we'll request a shorter timeout. This is fine (users
- // should handle spurious wakeups), but we need to remember that we did so, so that
- // we don't return `Timeout` incorrectly. If that happens, we set this variable to
- // true so that we we know to ignore the ETIMEDOUT result.
- var timeout_overflowed = false;
-
- const addr: *const anyopaque = ptr;
- const flags: c.UL = .{
- .op = .COMPARE_AND_WAIT,
- .NO_ERRNO = true,
- };
- const status = blk: {
- if (supports_ulock_wait2) {
- break :blk c.__ulock_wait2(flags, addr, expect, timeout_ns, 0);
- }
-
- const timeout_us = std.math.cast(u32, timeout_ns / std.time.ns_per_us) orelse overflow: {
- timeout_overflowed = true;
- break :overflow std.math.maxInt(u32);
- };
-
- break :blk c.__ulock_wait(flags, addr, expect, timeout_us);
- };
-
- if (status >= 0) return;
- switch (@as(c.E, @enumFromInt(-status))) {
- // Wait was interrupted by the OS or other spurious signalling.
- .INTR => {},
- // Address of the futex was paged out. This is unlikely, but possible in theory, and
- // pthread/libdispatch on darwin bother to handle it. In this case we'll return
- // without waiting, but the caller should retry anyway.
- .FAULT => {},
- // Only report Timeout if we didn't have to cap the timeout
- .TIMEDOUT => {
- assert(timeout != null);
- if (!timeout_overflowed) return error.Timeout;
- },
- else => unreachable,
- }
- }
-
- fn wake(ptr: *const atomic.Value(u32), max_waiters: u32) void {
- const flags: c.UL = .{
- .op = .COMPARE_AND_WAIT,
- .NO_ERRNO = true,
- .WAKE_ALL = max_waiters > 1,
- };
-
- while (true) {
- const addr: *const anyopaque = ptr;
- const status = c.__ulock_wake(flags, addr, 0);
-
- if (status >= 0) return;
- switch (@as(c.E, @enumFromInt(-status))) {
- .INTR => continue, // spurious wake()
- .FAULT => unreachable, // __ulock_wake doesn't generate EFAULT according to darwin pthread_cond_t
- .NOENT => return, // nothing was woken up
- .ALREADY => unreachable, // only for UL.Op.WAKE_THREAD
- else => unreachable,
- }
- }
- }
-};
-
-// https://man7.org/linux/man-pages/man2/futex.2.html
-const LinuxImpl = struct {
- fn wait(ptr: *const atomic.Value(u32), expect: u32, timeout: ?u64) error{Timeout}!void {
- var ts: linux.timespec = undefined;
- if (timeout) |timeout_ns| {
- ts.sec = @as(@TypeOf(ts.sec), @intCast(timeout_ns / std.time.ns_per_s));
- ts.nsec = @as(@TypeOf(ts.nsec), @intCast(timeout_ns % std.time.ns_per_s));
- }
-
- const rc = linux.futex_4arg(
- &ptr.raw,
- .{ .cmd = .WAIT, .private = true },
- expect,
- if (timeout != null) &ts else null,
- );
-
- switch (linux.errno(rc)) {
- .SUCCESS => {}, // notified by `wake()`
- .INTR => {}, // spurious wakeup
- .AGAIN => {}, // ptr.* != expect
- .TIMEDOUT => {
- assert(timeout != null);
- return error.Timeout;
- },
- .INVAL => {}, // possibly timeout overflow
- .FAULT => unreachable, // ptr was invalid
- else => unreachable,
- }
- }
-
- fn wake(ptr: *const atomic.Value(u32), max_waiters: u32) void {
- const rc = linux.futex_3arg(
- &ptr.raw,
- .{ .cmd = .WAKE, .private = true },
- @min(max_waiters, std.math.maxInt(i32)),
- );
-
- switch (linux.errno(rc)) {
- .SUCCESS => {}, // successful wake up
- .INVAL => {}, // invalid futex_wait() on ptr done elsewhere
- .FAULT => {}, // pointer became invalid while doing the wake
- else => unreachable,
- }
- }
-};
-
-// https://www.freebsd.org/cgi/man.cgi?query=_umtx_op&sektion=2&n=1
-const FreebsdImpl = struct {
- fn wait(ptr: *const atomic.Value(u32), expect: u32, timeout: ?u64) error{Timeout}!void {
- var tm_size: usize = 0;
- var tm: c._umtx_time = undefined;
- var tm_ptr: ?*const c._umtx_time = null;
-
- if (timeout) |timeout_ns| {
- tm_ptr = &tm;
- tm_size = @sizeOf(@TypeOf(tm));
-
- tm.flags = 0; // use relative time not UMTX_ABSTIME
- tm.clockid = .MONOTONIC;
- tm.timeout.sec = @as(@TypeOf(tm.timeout.sec), @intCast(timeout_ns / std.time.ns_per_s));
- tm.timeout.nsec = @as(@TypeOf(tm.timeout.nsec), @intCast(timeout_ns % std.time.ns_per_s));
- }
-
- const rc = c._umtx_op(
- @intFromPtr(&ptr.raw),
- @intFromEnum(c.UMTX_OP.WAIT_UINT_PRIVATE),
- @as(c_ulong, expect),
- tm_size,
- @intFromPtr(tm_ptr),
- );
-
- switch (std.posix.errno(rc)) {
- .SUCCESS => {},
- .FAULT => unreachable, // one of the args points to invalid memory
- .INVAL => unreachable, // arguments should be correct
- .TIMEDOUT => {
- assert(timeout != null);
- return error.Timeout;
- },
- .INTR => {}, // spurious wake
- else => unreachable,
- }
- }
-
- fn wake(ptr: *const atomic.Value(u32), max_waiters: u32) void {
- const rc = c._umtx_op(
- @intFromPtr(&ptr.raw),
- @intFromEnum(c.UMTX_OP.WAKE_PRIVATE),
- @as(c_ulong, max_waiters),
- 0, // there is no timeout struct
- 0, // there is no timeout struct pointer
- );
-
- switch (std.posix.errno(rc)) {
- .SUCCESS => {},
- .FAULT => {}, // it's ok if the ptr doesn't point to valid memory
- .INVAL => unreachable, // arguments should be correct
- else => unreachable,
- }
- }
-};
-
-// https://man.openbsd.org/futex.2
-const OpenbsdImpl = struct {
- fn wait(ptr: *const atomic.Value(u32), expect: u32, timeout: ?u64) error{Timeout}!void {
- var ts: c.timespec = undefined;
- if (timeout) |timeout_ns| {
- ts.sec = @as(@TypeOf(ts.sec), @intCast(timeout_ns / std.time.ns_per_s));
- ts.nsec = @as(@TypeOf(ts.nsec), @intCast(timeout_ns % std.time.ns_per_s));
- }
-
- const rc = c.futex(
- @as(*const volatile u32, @ptrCast(&ptr.raw)),
- c.FUTEX.WAIT | c.FUTEX.PRIVATE_FLAG,
- @as(c_int, @bitCast(expect)),
- if (timeout != null) &ts else null,
- null, // FUTEX.WAIT takes no requeue address
- );
-
- switch (std.posix.errno(rc)) {
- .SUCCESS => {}, // woken up by wake
- .NOSYS => unreachable, // the futex operation shouldn't be invalid
- .FAULT => unreachable, // ptr was invalid
- .AGAIN => {}, // ptr != expect
- .INVAL => unreachable, // invalid timeout
- .TIMEDOUT => {
- assert(timeout != null);
- return error.Timeout;
- },
- .INTR => {}, // spurious wake from signal
- .CANCELED => {}, // spurious wake from signal with SA_RESTART
- else => unreachable,
- }
- }
-
- fn wake(ptr: *const atomic.Value(u32), max_waiters: u32) void {
- const rc = c.futex(
- @as(*const volatile u32, @ptrCast(&ptr.raw)),
- c.FUTEX.WAKE | c.FUTEX.PRIVATE_FLAG,
- std.math.cast(c_int, max_waiters) orelse std.math.maxInt(c_int),
- null, // FUTEX.WAKE takes no timeout ptr
- null, // FUTEX.WAKE takes no requeue address
- );
-
- // returns number of threads woken up.
- assert(rc >= 0);
- }
-};
-
-// https://man.dragonflybsd.org/?command=umtx§ion=2
-const DragonflyImpl = struct {
- fn wait(ptr: *const atomic.Value(u32), expect: u32, timeout: ?u64) error{Timeout}!void {
- // Dragonfly uses a scheme where 0 timeout means wait until signaled or spurious wake.
- // It's reporting of timeout's is also unrealiable so we use an external timing source (Timer) instead.
- var timeout_us: c_int = 0;
- var timeout_overflowed = false;
- var sleep_timer: std.time.Timer = undefined;
-
- if (timeout) |delay| {
- assert(delay != 0); // handled by timedWait().
- timeout_us = std.math.cast(c_int, delay / std.time.ns_per_us) orelse blk: {
- timeout_overflowed = true;
- break :blk std.math.maxInt(c_int);
- };
-
- // Only need to record the start time if we can provide somewhat accurate error.Timeout's
- if (!timeout_overflowed) {
- sleep_timer = std.time.Timer.start() catch unreachable;
- }
- }
-
- const value = @as(c_int, @bitCast(expect));
- const addr = @as(*const volatile c_int, @ptrCast(&ptr.raw));
- const rc = c.umtx_sleep(addr, value, timeout_us);
-
- switch (std.posix.errno(rc)) {
- .SUCCESS => {},
- .BUSY => {}, // ptr != expect
- .AGAIN => { // maybe timed out, or paged out, or hit 2s kernel refresh
- if (timeout) |timeout_ns| {
- // Report error.Timeout only if we know the timeout duration has passed.
- // If not, there's not much choice other than treating it as a spurious wake.
- if (!timeout_overflowed and sleep_timer.read() >= timeout_ns) {
- return error.Timeout;
- }
- }
- },
- .INTR => {}, // spurious wake
- .INVAL => unreachable, // invalid timeout
- else => unreachable,
- }
- }
-
- fn wake(ptr: *const atomic.Value(u32), max_waiters: u32) void {
- // A count of zero means wake all waiters.
- assert(max_waiters != 0);
- const to_wake = std.math.cast(c_int, max_waiters) orelse 0;
-
- // https://man.dragonflybsd.org/?command=umtx§ion=2
- // > umtx_wakeup() will generally return 0 unless the address is bad.
- // We are fine with the address being bad (e.g. for Semaphore.post() where Semaphore.wait() frees the Semaphore)
- const addr = @as(*const volatile c_int, @ptrCast(&ptr.raw));
- _ = c.umtx_wakeup(addr, to_wake);
- }
-};
-
-const WasmImpl = struct {
- fn wait(ptr: *const atomic.Value(u32), expect: u32, timeout: ?u64) error{Timeout}!void {
- if (!comptime builtin.cpu.has(.wasm, .atomics)) @compileError("WASI target missing cpu feature 'atomics'");
-
- const to: i64 = if (timeout) |to| @intCast(to) else -1;
- const result = asm volatile (
- \\local.get %[ptr]
- \\local.get %[expected]
- \\local.get %[timeout]
- \\memory.atomic.wait32 0
- \\local.set %[ret]
- : [ret] "=r" (-> u32),
- : [ptr] "r" (&ptr.raw),
- [expected] "r" (@as(i32, @bitCast(expect))),
- [timeout] "r" (to),
- );
- switch (result) {
- 0 => {}, // ok
- 1 => {}, // expected =! loaded
- 2 => return error.Timeout,
- else => unreachable,
- }
- }
-
- fn wake(ptr: *const atomic.Value(u32), max_waiters: u32) void {
- if (!comptime builtin.cpu.has(.wasm, .atomics)) @compileError("WASI target missing cpu feature 'atomics'");
-
- assert(max_waiters != 0);
- const woken_count = asm volatile (
- \\local.get %[ptr]
- \\local.get %[waiters]
- \\memory.atomic.notify 0
- \\local.set %[ret]
- : [ret] "=r" (-> u32),
- : [ptr] "r" (&ptr.raw),
- [waiters] "r" (max_waiters),
- );
- _ = woken_count; // can be 0 when linker flag 'shared-memory' is not enabled
- }
-};
-
-/// Modified version of linux's futex and Go's sema to implement userspace wait queues with pthread:
-/// https://code.woboq.org/linux/linux/kernel/futex.c.html
-/// https://go.dev/src/runtime/sema.go
-const PosixImpl = struct {
- const Event = struct {
- cond: c.pthread_cond_t,
- mutex: c.pthread_mutex_t,
- state: enum { empty, waiting, notified },
-
- fn init(self: *Event) void {
- // Use static init instead of pthread_cond/mutex_init() since this is generally faster.
- self.cond = .{};
- self.mutex = .{};
- self.state = .empty;
- }
-
- fn deinit(self: *Event) void {
- // Some platforms reportedly give EINVAL for statically initialized pthread types.
- const rc = c.pthread_cond_destroy(&self.cond);
- assert(rc == .SUCCESS or rc == .INVAL);
-
- const rm = c.pthread_mutex_destroy(&self.mutex);
- assert(rm == .SUCCESS or rm == .INVAL);
-
- self.* = undefined;
- }
-
- fn wait(self: *Event, timeout: ?u64) error{Timeout}!void {
- assert(c.pthread_mutex_lock(&self.mutex) == .SUCCESS);
- defer assert(c.pthread_mutex_unlock(&self.mutex) == .SUCCESS);
-
- // Early return if the event was already set.
- if (self.state == .notified) {
- return;
- }
-
- // Compute the absolute timeout if one was specified.
- // POSIX requires that REALTIME is used by default for the pthread timedwait functions.
- // This can be changed with pthread_condattr_setclock, but it's an extension and may not be available everywhere.
- var ts: c.timespec = undefined;
- if (timeout) |timeout_ns| {
- ts = std.posix.clock_gettime(c.CLOCK.REALTIME) catch unreachable;
- ts.sec +|= @as(@TypeOf(ts.sec), @intCast(timeout_ns / std.time.ns_per_s));
- ts.nsec += @as(@TypeOf(ts.nsec), @intCast(timeout_ns % std.time.ns_per_s));
-
- if (ts.nsec >= std.time.ns_per_s) {
- ts.sec +|= 1;
- ts.nsec -= std.time.ns_per_s;
- }
- }
-
- // Start waiting on the event - there can be only one thread waiting.
- assert(self.state == .empty);
- self.state = .waiting;
-
- while (true) {
- // Block using either pthread_cond_wait or pthread_cond_timewait if there's an absolute timeout.
- const rc = blk: {
- if (timeout == null) break :blk c.pthread_cond_wait(&self.cond, &self.mutex);
- break :blk c.pthread_cond_timedwait(&self.cond, &self.mutex, &ts);
- };
-
- // After waking up, check if the event was set.
- if (self.state == .notified) {
- return;
- }
-
- assert(self.state == .waiting);
- switch (rc) {
- .SUCCESS => {},
- .TIMEDOUT => {
- // If timed out, reset the event to avoid the set() thread doing an unnecessary signal().
- self.state = .empty;
- return error.Timeout;
- },
- .INVAL => unreachable, // cond, mutex, and potentially ts should all be valid
- .PERM => unreachable, // mutex is locked when cond_*wait() functions are called
- else => unreachable,
- }
- }
- }
-
- fn set(self: *Event) void {
- assert(c.pthread_mutex_lock(&self.mutex) == .SUCCESS);
- defer assert(c.pthread_mutex_unlock(&self.mutex) == .SUCCESS);
-
- // Make sure that multiple calls to set() were not done on the same Event.
- const old_state = self.state;
- assert(old_state != .notified);
-
- // Mark the event as set and wake up the waiting thread if there was one.
- // This must be done while the mutex as the wait() thread could deallocate
- // the condition variable once it observes the new state, potentially causing a UAF if done unlocked.
- self.state = .notified;
- if (old_state == .waiting) {
- assert(c.pthread_cond_signal(&self.cond) == .SUCCESS);
- }
- }
- };
-
- const Treap = std.Treap(usize, std.math.order);
- const Waiter = struct {
- node: Treap.Node,
- prev: ?*Waiter,
- next: ?*Waiter,
- tail: ?*Waiter,
- is_queued: bool,
- event: Event,
- };
-
- // An unordered set of Waiters
- const WaitList = struct {
- top: ?*Waiter = null,
- len: usize = 0,
-
- fn push(self: *WaitList, waiter: *Waiter) void {
- waiter.next = self.top;
- self.top = waiter;
- self.len += 1;
- }
-
- fn pop(self: *WaitList) ?*Waiter {
- const waiter = self.top orelse return null;
- self.top = waiter.next;
- self.len -= 1;
- return waiter;
- }
- };
-
- const WaitQueue = struct {
- fn insert(treap: *Treap, address: usize, waiter: *Waiter) void {
- // prepare the waiter to be inserted.
- waiter.next = null;
- waiter.is_queued = true;
-
- // Find the wait queue entry associated with the address.
- // If there isn't a wait queue on the address, this waiter creates the queue.
- var entry = treap.getEntryFor(address);
- const entry_node = entry.node orelse {
- waiter.prev = null;
- waiter.tail = waiter;
- entry.set(&waiter.node);
- return;
- };
-
- // There's a wait queue on the address; get the queue head and tail.
- const head: *Waiter = @fieldParentPtr("node", entry_node);
- const tail = head.tail orelse unreachable;
-
- // Push the waiter to the tail by replacing it and linking to the previous tail.
- head.tail = waiter;
- tail.next = waiter;
- waiter.prev = tail;
- }
-
- fn remove(treap: *Treap, address: usize, max_waiters: usize) WaitList {
- // Find the wait queue associated with this address and get the head/tail if any.
- var entry = treap.getEntryFor(address);
- var queue_head: ?*Waiter = if (entry.node) |node| @fieldParentPtr("node", node) else null;
- const queue_tail = if (queue_head) |head| head.tail else null;
-
- // Once we're done updating the head, fix it's tail pointer and update the treap's queue head as well.
- defer entry.set(blk: {
- const new_head = queue_head orelse break :blk null;
- new_head.tail = queue_tail;
- break :blk &new_head.node;
- });
-
- var removed = WaitList{};
- while (removed.len < max_waiters) {
- // dequeue and collect waiters from their wait queue.
- const waiter = queue_head orelse break;
- queue_head = waiter.next;
- removed.push(waiter);
-
- // When dequeueing, we must mark is_queued as false.
- // This ensures that a waiter which calls tryRemove() returns false.
- assert(waiter.is_queued);
- waiter.is_queued = false;
- }
-
- return removed;
- }
-
- fn tryRemove(treap: *Treap, address: usize, waiter: *Waiter) bool {
- if (!waiter.is_queued) {
- return false;
- }
-
- queue_remove: {
- // Find the wait queue associated with the address.
- var entry = blk: {
- // A waiter without a previous link means it's the queue head that's in the treap so we can avoid lookup.
- if (waiter.prev == null) {
- assert(waiter.node.key == address);
- break :blk treap.getEntryForExisting(&waiter.node);
- }
- break :blk treap.getEntryFor(address);
- };
-
- // The queue head and tail must exist if we're removing a queued waiter.
- const head: *Waiter = @fieldParentPtr("node", entry.node orelse unreachable);
- const tail = head.tail orelse unreachable;
-
- // A waiter with a previous link is never the head of the queue.
- if (waiter.prev) |prev| {
- assert(waiter != head);
- prev.next = waiter.next;
-
- // A waiter with both a previous and next link is in the middle.
- // We only need to update the surrounding waiter's links to remove it.
- if (waiter.next) |next| {
- assert(waiter != tail);
- next.prev = waiter.prev;
- break :queue_remove;
- }
-
- // A waiter with a previous but no next link means it's the tail of the queue.
- // In that case, we need to update the head's tail reference.
- assert(waiter == tail);
- head.tail = waiter.prev;
- break :queue_remove;
- }
-
- // A waiter with no previous link means it's the queue head of queue.
- // We must replace (or remove) the head waiter reference in the treap.
- assert(waiter == head);
- entry.set(blk: {
- const new_head = waiter.next orelse break :blk null;
- new_head.tail = head.tail;
- break :blk &new_head.node;
- });
- }
-
- // Mark the waiter as successfully removed.
- waiter.is_queued = false;
- return true;
- }
- };
-
- const Bucket = struct {
- mutex: c.pthread_mutex_t align(atomic.cache_line) = .{},
- pending: atomic.Value(usize) = atomic.Value(usize).init(0),
- treap: Treap = .{},
-
- // Global array of buckets that addresses map to.
- // Bucket array size is pretty much arbitrary here, but it must be a power of two for fibonacci hashing.
- var buckets = [_]Bucket{.{}} ** @bitSizeOf(usize);
-
- // https://github.com/Amanieu/parking_lot/blob/1cf12744d097233316afa6c8b7d37389e4211756/core/src/parking_lot.rs#L343-L353
- fn from(address: usize) *Bucket {
- // The upper `@bitSizeOf(usize)` bits of the fibonacci golden ratio.
- // Hashing this via (h * k) >> (64 - b) where k=golden-ration and b=bitsize-of-array
- // evenly lays out h=hash values over the bit range even when the hash has poor entropy (identity-hash for pointers).
- const max_multiplier_bits = @bitSizeOf(usize);
- const fibonacci_multiplier = 0x9E3779B97F4A7C15 >> (64 - max_multiplier_bits);
-
- const max_bucket_bits = @ctz(buckets.len);
- comptime assert(std.math.isPowerOfTwo(buckets.len));
-
- const index = (address *% fibonacci_multiplier) >> (max_multiplier_bits - max_bucket_bits);
- return &buckets[index];
- }
- };
-
- const Address = struct {
- fn from(ptr: *const atomic.Value(u32)) usize {
- // Get the alignment of the pointer.
- const alignment = @alignOf(atomic.Value(u32));
- comptime assert(std.math.isPowerOfTwo(alignment));
-
- // Make sure the pointer is aligned,
- // then cut off the zero bits from the alignment to get the unique address.
- const addr = @intFromPtr(ptr);
- assert(addr & (alignment - 1) == 0);
- return addr >> @ctz(@as(usize, alignment));
- }
- };
-
- fn wait(ptr: *const atomic.Value(u32), expect: u32, timeout: ?u64) error{Timeout}!void {
- const address = Address.from(ptr);
- const bucket = Bucket.from(address);
-
- // Announce that there's a waiter in the bucket before checking the ptr/expect condition.
- // If the announcement is reordered after the ptr check, the waiter could deadlock:
- //
- // - T1: checks ptr == expect which is true
- // - T2: updates ptr to != expect
- // - T2: does Futex.wake(), sees no pending waiters, exits
- // - T1: bumps pending waiters (was reordered after the ptr == expect check)
- // - T1: goes to sleep and misses both the ptr change and T2's wake up
- //
- // acquire barrier to ensure the announcement happens before the ptr check below.
- var pending = bucket.pending.fetchAdd(1, .acquire);
- assert(pending < std.math.maxInt(usize));
-
- // If the wait gets canceled, remove the pending count we previously added.
- // This is done outside the mutex lock to keep the critical section short in case of contention.
- var canceled = false;
- defer if (canceled) {
- pending = bucket.pending.fetchSub(1, .monotonic);
- assert(pending > 0);
- };
-
- var waiter: Waiter = undefined;
- {
- assert(c.pthread_mutex_lock(&bucket.mutex) == .SUCCESS);
- defer assert(c.pthread_mutex_unlock(&bucket.mutex) == .SUCCESS);
-
- canceled = ptr.load(.monotonic) != expect;
- if (canceled) {
- return;
- }
-
- waiter.event.init();
- WaitQueue.insert(&bucket.treap, address, &waiter);
- }
-
- defer {
- assert(!waiter.is_queued);
- waiter.event.deinit();
- }
-
- waiter.event.wait(timeout) catch {
- // If we fail to cancel after a timeout, it means a wake() thread dequeued us and will wake us up.
- // We must wait until the event is set as that's a signal that the wake() thread won't access the waiter memory anymore.
- // If we return early without waiting, the waiter on the stack would be invalidated and the wake() thread risks a UAF.
- defer if (!canceled) waiter.event.wait(null) catch unreachable;
-
- assert(c.pthread_mutex_lock(&bucket.mutex) == .SUCCESS);
- defer assert(c.pthread_mutex_unlock(&bucket.mutex) == .SUCCESS);
-
- canceled = WaitQueue.tryRemove(&bucket.treap, address, &waiter);
- if (canceled) {
- return error.Timeout;
- }
- };
- }
-
- fn wake(ptr: *const atomic.Value(u32), max_waiters: u32) void {
- const address = Address.from(ptr);
- const bucket = Bucket.from(address);
-
- // Quick check if there's even anything to wake up.
- // The change to the ptr's value must happen before we check for pending waiters.
- // If not, the wake() thread could miss a sleeping waiter and have it deadlock:
- //
- // - T2: p = has pending waiters (reordered before the ptr update)
- // - T1: bump pending waiters
- // - T1: if ptr == expected: sleep()
- // - T2: update ptr != expected
- // - T2: p is false from earlier so doesn't wake (T1 missed ptr update and T2 missed T1 sleeping)
- //
- // What we really want here is a Release load, but that doesn't exist under the C11 memory model.
- // We could instead do `bucket.pending.fetchAdd(0, Release) == 0` which achieves effectively the same thing,
- // LLVM lowers the fetchAdd(0, .release) into an mfence+load which avoids gaining ownership of the cache-line.
- if (bucket.pending.fetchAdd(0, .release) == 0) {
- return;
- }
-
- // Keep a list of all the waiters notified and wake then up outside the mutex critical section.
- var notified = WaitList{};
- defer if (notified.len > 0) {
- const pending = bucket.pending.fetchSub(notified.len, .monotonic);
- assert(pending >= notified.len);
-
- while (notified.pop()) |waiter| {
- assert(!waiter.is_queued);
- waiter.event.set();
- }
- };
-
- assert(c.pthread_mutex_lock(&bucket.mutex) == .SUCCESS);
- defer assert(c.pthread_mutex_unlock(&bucket.mutex) == .SUCCESS);
-
- // Another pending check again to avoid the WaitQueue lookup if not necessary.
- if (bucket.pending.load(.monotonic) > 0) {
- notified = WaitQueue.remove(&bucket.treap, address, max_waiters);
- }
- }
-};
-
-test "smoke test" {
- var value = atomic.Value(u32).init(0);
-
- // Try waits with invalid values.
- Futex.wait(&value, 0xdeadbeef);
- Futex.timedWait(&value, 0xdeadbeef, 0) catch {};
-
- // Try timeout waits.
- try testing.expectError(error.Timeout, Futex.timedWait(&value, 0, 0));
- try testing.expectError(error.Timeout, Futex.timedWait(&value, 0, std.time.ns_per_ms));
-
- // Try wakes
- Futex.wake(&value, 0);
- Futex.wake(&value, 1);
- Futex.wake(&value, std.math.maxInt(u32));
-}
-
-test "signaling" {
- // This test requires spawning threads
- if (builtin.single_threaded) {
- return error.SkipZigTest;
- }
-
- const num_threads = 4;
- const num_iterations = 4;
-
- const Paddle = struct {
- value: atomic.Value(u32) = atomic.Value(u32).init(0),
- current: u32 = 0,
-
- fn hit(self: *@This()) void {
- _ = self.value.fetchAdd(1, .release);
- Futex.wake(&self.value, 1);
- }
-
- fn run(self: *@This(), hit_to: *@This()) !void {
- while (self.current < num_iterations) {
- // Wait for the value to change from hit()
- var new_value: u32 = undefined;
- while (true) {
- new_value = self.value.load(.acquire);
- if (new_value != self.current) break;
- Futex.wait(&self.value, self.current);
- }
-
- // change the internal "current" value
- try testing.expectEqual(new_value, self.current + 1);
- self.current = new_value;
-
- // hit the next paddle
- hit_to.hit();
- }
- }
- };
-
- var paddles = [_]Paddle{.{}} ** num_threads;
- var threads = [_]std.Thread{undefined} ** num_threads;
-
- // Create a circle of paddles which hit each other
- for (&threads, 0..) |*t, i| {
- const paddle = &paddles[i];
- const hit_to = &paddles[(i + 1) % paddles.len];
- t.* = try std.Thread.spawn(.{}, Paddle.run, .{ paddle, hit_to });
- }
-
- // Hit the first paddle and wait for them all to complete by hitting each other for num_iterations.
- paddles[0].hit();
- for (threads) |t| t.join();
- for (paddles) |p| try testing.expectEqual(p.current, num_iterations);
-}
-
-test "broadcasting" {
- // This test requires spawning threads
- if (builtin.single_threaded) {
- return error.SkipZigTest;
- }
-
- const num_threads = 4;
- const num_iterations = 4;
-
- const Barrier = struct {
- count: atomic.Value(u32) = atomic.Value(u32).init(num_threads),
- futex: atomic.Value(u32) = atomic.Value(u32).init(0),
-
- fn wait(self: *@This()) !void {
- // Decrement the counter.
- // Release ensures stuff before this barrier.wait() happens before the last one.
- // Acquire for the last counter ensures stuff before previous barrier.wait()s happened before it.
- const count = self.count.fetchSub(1, .acq_rel);
- try testing.expect(count <= num_threads);
- try testing.expect(count > 0);
-
- // First counter to reach zero wakes all other threads.
- // Release on futex update ensures stuff before all barrier.wait()'s happens before they all return.
- if (count - 1 == 0) {
- self.futex.store(1, .release);
- Futex.wake(&self.futex, num_threads - 1);
- return;
- }
-
- // Other threads wait until last counter wakes them up.
- // Acquire on futex synchronizes with last barrier count to ensure stuff before all barrier.wait()'s happen before us.
- while (self.futex.load(.acquire) == 0) {
- Futex.wait(&self.futex, 0);
- }
- }
- };
-
- const Broadcast = struct {
- barriers: [num_iterations]Barrier = [_]Barrier{.{}} ** num_iterations,
- threads: [num_threads]std.Thread = undefined,
-
- fn run(self: *@This()) !void {
- for (&self.barriers) |*barrier| {
- try barrier.wait();
- }
- }
- };
-
- var broadcast = Broadcast{};
- for (&broadcast.threads) |*t| t.* = try std.Thread.spawn(.{}, Broadcast.run, .{&broadcast});
- for (broadcast.threads) |t| t.join();
-}
-
-/// Deadline is used to wait efficiently for a pointer's value to change using Futex and a fixed timeout.
-///
-/// Futex's timedWait() api uses a relative duration which suffers from over-waiting
-/// when used in a loop which is often required due to the possibility of spurious wakeups.
-///
-/// Deadline instead converts the relative timeout to an absolute one so that multiple calls
-/// to Futex timedWait() can block for and report more accurate error.Timeouts.
-pub const Deadline = struct {
- timeout: ?u64,
- started: std.time.Timer,
-
- /// Create the deadline to expire after the given amount of time in nanoseconds passes.
- /// Pass in `null` to have the deadline call `Futex.wait()` and never expire.
- pub fn init(expires_in_ns: ?u64) Deadline {
- var deadline: Deadline = undefined;
- deadline.timeout = expires_in_ns;
-
- // std.time.Timer is required to be supported for somewhat accurate reportings of error.Timeout.
- if (deadline.timeout != null) {
- deadline.started = std.time.Timer.start() catch unreachable;
- }
-
- return deadline;
- }
-
- /// Wait until either:
- /// - the `ptr`'s value changes from `expect`.
- /// - `Futex.wake()` is called on the `ptr`.
- /// - A spurious wake occurs.
- /// - The deadline expires; In which case `error.Timeout` is returned.
- pub fn wait(self: *Deadline, ptr: *const atomic.Value(u32), expect: u32) error{Timeout}!void {
- @branchHint(.cold);
-
- // Check if we actually have a timeout to wait until.
- // If not just wait "forever".
- const timeout_ns = self.timeout orelse {
- return Futex.wait(ptr, expect);
- };
-
- // Get how much time has passed since we started waiting
- // then subtract that from the init() timeout to get how much longer to wait.
- // Use overflow to detect when we've been waiting longer than the init() timeout.
- const elapsed_ns = self.started.read();
- const until_timeout_ns = std.math.sub(u64, timeout_ns, elapsed_ns) catch 0;
- return Futex.timedWait(ptr, expect, until_timeout_ns);
- }
-};
-
-test "Deadline" {
- var deadline = Deadline.init(100 * std.time.ns_per_ms);
- var futex_word = atomic.Value(u32).init(0);
-
- while (true) {
- deadline.wait(&futex_word, 0) catch break;
- }
-}
diff --git a/lib/std/Thread/Mutex.zig b/lib/std/Thread/Mutex.zig
@@ -1,367 +0,0 @@
-//! Mutex is a synchronization primitive which enforces atomic access to a
-//! shared region of code known as the "critical section".
-//!
-//! It does this by blocking ensuring only one thread is in the critical
-//! section at any given point in time by blocking the others.
-//!
-//! Mutex can be statically initialized and is at most `@sizeOf(u64)` large.
-//! Use `lock()` or `tryLock()` to enter the critical section and `unlock()` to leave it.
-
-const std = @import("../std.zig");
-const builtin = @import("builtin");
-const Mutex = @This();
-
-const assert = std.debug.assert;
-const testing = std.testing;
-const Thread = std.Thread;
-const Futex = Thread.Futex;
-
-impl: Impl = .{},
-
-pub const Recursive = @import("Mutex/Recursive.zig");
-
-/// Tries to acquire the mutex without blocking the caller's thread.
-/// Returns `false` if the calling thread would have to block to acquire it.
-/// Otherwise, returns `true` and the caller should `unlock()` the Mutex to release it.
-pub fn tryLock(self: *Mutex) bool {
- return self.impl.tryLock();
-}
-
-/// Acquires the mutex, blocking the caller's thread until it can.
-/// It is undefined behavior if the mutex is already held by the caller's thread.
-/// Once acquired, call `unlock()` on the Mutex to release it.
-pub fn lock(self: *Mutex) void {
- self.impl.lock();
-}
-
-/// Releases the mutex which was previously acquired with `lock()` or `tryLock()`.
-/// It is undefined behavior if the mutex is unlocked from a different thread that it was locked from.
-pub fn unlock(self: *Mutex) void {
- self.impl.unlock();
-}
-
-const Impl = if (builtin.mode == .Debug and !builtin.single_threaded)
- DebugImpl
-else
- ReleaseImpl;
-
-const ReleaseImpl = Impl: {
- if (builtin.single_threaded) break :Impl SingleThreadedImpl;
- if (builtin.os.tag == .windows) break :Impl WindowsImpl;
- if (builtin.os.tag.isDarwin()) break :Impl DarwinImpl;
-
- if (builtin.target.os.tag == .linux or
- builtin.target.os.tag == .freebsd or
- builtin.target.os.tag == .openbsd or
- builtin.target.os.tag == .dragonfly or
- builtin.target.cpu.arch.isWasm())
- {
- // Futex is the system's synchronization primitive; use that.
- break :Impl FutexImpl;
- }
-
- if (std.Thread.use_pthreads) {
- // This system doesn't have a futex primitive, so `std.Thread.Futex` is using `PosixImpl`,
- // which implements futex *on top of* pthread mutexes and conditions. Therefore, instead
- // of going through that long inefficient path, just use pthread mutex directly.
- break :Impl PosixImpl;
- }
-
- break :Impl FutexImpl;
-};
-
-const DebugImpl = struct {
- locking_thread: std.atomic.Value(Thread.Id) = std.atomic.Value(Thread.Id).init(0), // 0 means it's not locked.
- impl: ReleaseImpl = .{},
-
- inline fn tryLock(self: *@This()) bool {
- const locking = self.impl.tryLock();
- if (locking) {
- self.locking_thread.store(Thread.getCurrentId(), .unordered);
- }
- return locking;
- }
-
- inline fn lock(self: *@This()) void {
- const current_id = Thread.getCurrentId();
- if (self.locking_thread.load(.unordered) == current_id and current_id != 0) {
- @panic("Deadlock detected");
- }
- self.impl.lock();
- self.locking_thread.store(current_id, .unordered);
- }
-
- inline fn unlock(self: *@This()) void {
- assert(self.locking_thread.load(.unordered) == Thread.getCurrentId());
- self.locking_thread.store(0, .unordered);
- self.impl.unlock();
- }
-};
-
-const SingleThreadedImpl = struct {
- is_locked: bool = false,
-
- fn tryLock(self: *@This()) bool {
- if (self.is_locked) return false;
- self.is_locked = true;
- return true;
- }
-
- fn lock(self: *@This()) void {
- if (!self.tryLock()) {
- unreachable; // deadlock detected
- }
- }
-
- fn unlock(self: *@This()) void {
- assert(self.is_locked);
- self.is_locked = false;
- }
-};
-
-/// SRWLOCK on windows is almost always faster than Futex solution.
-/// It also implements an efficient Condition with requeue support for us.
-const WindowsImpl = struct {
- srwlock: windows.SRWLOCK = .{},
-
- fn tryLock(self: *@This()) bool {
- return windows.ntdll.RtlTryAcquireSRWLockExclusive(&self.srwlock) != windows.FALSE;
- }
-
- fn lock(self: *@This()) void {
- windows.ntdll.RtlAcquireSRWLockExclusive(&self.srwlock);
- }
-
- fn unlock(self: *@This()) void {
- windows.ntdll.RtlReleaseSRWLockExclusive(&self.srwlock);
- }
-
- const windows = std.os.windows;
-};
-
-/// os_unfair_lock on darwin supports priority inheritance and is generally faster than Futex solutions.
-const DarwinImpl = struct {
- oul: c.os_unfair_lock = .{},
-
- fn tryLock(self: *@This()) bool {
- return c.os_unfair_lock_trylock(&self.oul);
- }
-
- fn lock(self: *@This()) void {
- c.os_unfair_lock_lock(&self.oul);
- }
-
- fn unlock(self: *@This()) void {
- c.os_unfair_lock_unlock(&self.oul);
- }
-
- const c = std.c;
-};
-
-const FutexImpl = struct {
- state: std.atomic.Value(u32) = std.atomic.Value(u32).init(unlocked),
-
- const unlocked: u32 = 0b00;
- const locked: u32 = 0b01;
- const contended: u32 = 0b11; // must contain the `locked` bit for x86 optimization below
-
- fn lock(self: *@This()) void {
- if (!self.tryLock())
- self.lockSlow();
- }
-
- fn tryLock(self: *@This()) bool {
- // On x86, use `lock bts` instead of `lock cmpxchg` as:
- // - they both seem to mark the cache-line as modified regardless: https://stackoverflow.com/a/63350048
- // - `lock bts` is smaller instruction-wise which makes it better for inlining
- if (builtin.target.cpu.arch.isX86()) {
- const locked_bit = @ctz(locked);
- return self.state.bitSet(locked_bit, .acquire) == 0;
- }
-
- // Acquire barrier ensures grabbing the lock happens before the critical section
- // and that the previous lock holder's critical section happens before we grab the lock.
- return self.state.cmpxchgWeak(unlocked, locked, .acquire, .monotonic) == null;
- }
-
- fn lockSlow(self: *@This()) void {
- @branchHint(.cold);
-
- // Avoid doing an atomic swap below if we already know the state is contended.
- // An atomic swap unconditionally stores which marks the cache-line as modified unnecessarily.
- if (self.state.load(.monotonic) == contended) {
- Futex.wait(&self.state, contended);
- }
-
- // Try to acquire the lock while also telling the existing lock holder that there are threads waiting.
- //
- // Once we sleep on the Futex, we must acquire the mutex using `contended` rather than `locked`.
- // If not, threads sleeping on the Futex wouldn't see the state change in unlock and potentially deadlock.
- // The downside is that the last mutex unlocker will see `contended` and do an unnecessary Futex wake
- // but this is better than having to wake all waiting threads on mutex unlock.
- //
- // Acquire barrier ensures grabbing the lock happens before the critical section
- // and that the previous lock holder's critical section happens before we grab the lock.
- while (self.state.swap(contended, .acquire) != unlocked) {
- Futex.wait(&self.state, contended);
- }
- }
-
- fn unlock(self: *@This()) void {
- // Unlock the mutex and wake up a waiting thread if any.
- //
- // A waiting thread will acquire with `contended` instead of `locked`
- // which ensures that it wakes up another thread on the next unlock().
- //
- // Release barrier ensures the critical section happens before we let go of the lock
- // and that our critical section happens before the next lock holder grabs the lock.
- const state = self.state.swap(unlocked, .release);
- assert(state != unlocked);
-
- if (state == contended) {
- Futex.wake(&self.state, 1);
- }
- }
-};
-
-const PosixImpl = struct {
- mutex: std.c.pthread_mutex_t = .{},
-
- fn tryLock(impl: *PosixImpl) bool {
- switch (std.c.pthread_mutex_trylock(&impl.mutex)) {
- .SUCCESS => return true,
- .BUSY => return false,
- .INVAL => unreachable, // mutex is initialized correctly
- else => unreachable,
- }
- }
-
- fn lock(impl: *PosixImpl) void {
- switch (std.c.pthread_mutex_lock(&impl.mutex)) {
- .SUCCESS => return,
- .INVAL => unreachable, // mutex is initialized correctly
- .DEADLK => unreachable, // not an error checking mutex
- else => unreachable,
- }
- }
-
- fn unlock(impl: *PosixImpl) void {
- switch (std.c.pthread_mutex_unlock(&impl.mutex)) {
- .SUCCESS => return,
- .INVAL => unreachable, // mutex is initialized correctly
- .PERM => unreachable, // not an error checking mutex
- else => unreachable,
- }
- }
-};
-
-test "smoke test" {
- var mutex = Mutex{};
-
- try testing.expect(mutex.tryLock());
- try testing.expect(!mutex.tryLock());
- mutex.unlock();
-
- mutex.lock();
- try testing.expect(!mutex.tryLock());
- mutex.unlock();
-}
-
-// A counter which is incremented without atomic instructions
-const NonAtomicCounter = struct {
- // direct u128 could maybe use xmm ops on x86 which are atomic
- value: [2]u64 = [_]u64{ 0, 0 },
-
- fn get(self: NonAtomicCounter) u128 {
- return @as(u128, @bitCast(self.value));
- }
-
- fn inc(self: *NonAtomicCounter) void {
- for (@as([2]u64, @bitCast(self.get() + 1)), 0..) |v, i| {
- @as(*volatile u64, @ptrCast(&self.value[i])).* = v;
- }
- }
-};
-
-test "many uncontended" {
- // This test requires spawning threads.
- if (builtin.single_threaded) {
- return error.SkipZigTest;
- }
-
- const num_threads = 4;
- const num_increments = 1000;
-
- const Runner = struct {
- mutex: Mutex = .{},
- thread: Thread = undefined,
- counter: NonAtomicCounter = .{},
-
- fn run(self: *@This()) void {
- var i: usize = num_increments;
- while (i > 0) : (i -= 1) {
- self.mutex.lock();
- defer self.mutex.unlock();
-
- self.counter.inc();
- }
- }
- };
-
- var runners = [_]Runner{.{}} ** num_threads;
- for (&runners) |*r| r.thread = try Thread.spawn(.{}, Runner.run, .{r});
- for (runners) |r| r.thread.join();
- for (runners) |r| try testing.expectEqual(r.counter.get(), num_increments);
-}
-
-test "many contended" {
- // This test requires spawning threads.
- if (builtin.single_threaded) {
- return error.SkipZigTest;
- }
-
- const num_threads = 4;
- const num_increments = 1000;
-
- const Runner = struct {
- mutex: Mutex = .{},
- counter: NonAtomicCounter = .{},
-
- fn run(self: *@This()) void {
- var i: usize = num_increments;
- while (i > 0) : (i -= 1) {
- // Occasionally hint to let another thread run.
- defer if (i % 100 == 0) Thread.yield() catch {};
-
- self.mutex.lock();
- defer self.mutex.unlock();
-
- self.counter.inc();
- }
- }
- };
-
- var runner = Runner{};
-
- var threads: [num_threads]Thread = undefined;
- for (&threads) |*t| t.* = try Thread.spawn(.{}, Runner.run, .{&runner});
- for (threads) |t| t.join();
-
- try testing.expectEqual(runner.counter.get(), num_increments * num_threads);
-}
-
-// https://github.com/ziglang/zig/issues/19295
-//test @This() {
-// var m: Mutex = .{};
-//
-// {
-// m.lock();
-// defer m.unlock();
-// // ... critical section code
-// }
-//
-// if (m.tryLock()) {
-// defer m.unlock();
-// // ... critical section code
-// }
-//}
diff --git a/lib/std/Thread/Mutex/Recursive.zig b/lib/std/Thread/Mutex/Recursive.zig
@@ -7,18 +7,18 @@
//! A recursive mutex is an abstraction layer on top of a regular mutex;
//! therefore it is recommended to use instead `std.Mutex` unless there is a
//! specific reason a recursive mutex is warranted.
+const Recursive = @This();
const std = @import("../../std.zig");
-const Recursive = @This();
-const Mutex = std.Thread.Mutex;
+const Io = std.Io;
const assert = std.debug.assert;
-mutex: Mutex,
+mutex: Io.Mutex,
thread_id: std.Thread.Id,
lock_count: usize,
pub const init: Recursive = .{
- .mutex = .{},
+ .mutex = .init,
.thread_id = invalid_thread_id,
.lock_count = 0,
};
@@ -49,7 +49,7 @@ pub fn tryLock(r: *Recursive) bool {
pub fn lock(r: *Recursive) void {
const current_thread_id = std.Thread.getCurrentId();
if (@atomicLoad(std.Thread.Id, &r.thread_id, .unordered) != current_thread_id) {
- r.mutex.lock();
+ Io.Threaded.mutexLock(&r.mutex);
assert(r.lock_count == 0);
@atomicStore(std.Thread.Id, &r.thread_id, current_thread_id, .unordered);
}
@@ -64,7 +64,7 @@ pub fn unlock(r: *Recursive) void {
r.lock_count -= 1;
if (r.lock_count == 0) {
@atomicStore(std.Thread.Id, &r.thread_id, invalid_thread_id, .unordered);
- r.mutex.unlock();
+ Io.Threaded.mutexUnlock(&r.mutex);
}
}
diff --git a/lib/std/Thread/RwLock.zig b/lib/std/Thread/RwLock.zig
@@ -1,386 +0,0 @@
-//! A lock that supports one writer or many readers.
-//! This API is for kernel threads, not evented I/O.
-//! This API requires being initialized at runtime, and initialization
-//! can fail. Once initialized, the core operations cannot fail.
-
-impl: Impl = .{},
-
-const RwLock = @This();
-const std = @import("../std.zig");
-const builtin = @import("builtin");
-const assert = std.debug.assert;
-const testing = std.testing;
-
-pub const Impl = if (builtin.single_threaded)
- SingleThreadedRwLock
-else if (std.Thread.use_pthreads)
- PthreadRwLock
-else
- DefaultRwLock;
-
-/// Attempts to obtain exclusive lock ownership.
-/// Returns `true` if the lock is obtained, `false` otherwise.
-pub fn tryLock(rwl: *RwLock) bool {
- return rwl.impl.tryLock();
-}
-
-/// Blocks until exclusive lock ownership is acquired.
-pub fn lock(rwl: *RwLock) void {
- return rwl.impl.lock();
-}
-
-/// Releases a held exclusive lock.
-/// Asserts the lock is held exclusively.
-pub fn unlock(rwl: *RwLock) void {
- return rwl.impl.unlock();
-}
-
-/// Attempts to obtain shared lock ownership.
-/// Returns `true` if the lock is obtained, `false` otherwise.
-pub fn tryLockShared(rwl: *RwLock) bool {
- return rwl.impl.tryLockShared();
-}
-
-/// Obtains shared lock ownership.
-/// Blocks if another thread has exclusive ownership.
-/// May block if another thread is attempting to get exclusive ownership.
-pub fn lockShared(rwl: *RwLock) void {
- return rwl.impl.lockShared();
-}
-
-/// Releases a held shared lock.
-pub fn unlockShared(rwl: *RwLock) void {
- return rwl.impl.unlockShared();
-}
-
-/// Single-threaded applications use this for deadlock checks in
-/// debug mode, and no-ops in release modes.
-pub const SingleThreadedRwLock = struct {
- state: enum { unlocked, locked_exclusive, locked_shared } = .unlocked,
- shared_count: usize = 0,
-
- /// Attempts to obtain exclusive lock ownership.
- /// Returns `true` if the lock is obtained, `false` otherwise.
- pub fn tryLock(rwl: *SingleThreadedRwLock) bool {
- switch (rwl.state) {
- .unlocked => {
- assert(rwl.shared_count == 0);
- rwl.state = .locked_exclusive;
- return true;
- },
- .locked_exclusive, .locked_shared => return false,
- }
- }
-
- /// Blocks until exclusive lock ownership is acquired.
- pub fn lock(rwl: *SingleThreadedRwLock) void {
- assert(rwl.state == .unlocked); // deadlock detected
- assert(rwl.shared_count == 0); // corrupted state detected
- rwl.state = .locked_exclusive;
- }
-
- /// Releases a held exclusive lock.
- /// Asserts the lock is held exclusively.
- pub fn unlock(rwl: *SingleThreadedRwLock) void {
- assert(rwl.state == .locked_exclusive);
- assert(rwl.shared_count == 0); // corrupted state detected
- rwl.state = .unlocked;
- }
-
- /// Attempts to obtain shared lock ownership.
- /// Returns `true` if the lock is obtained, `false` otherwise.
- pub fn tryLockShared(rwl: *SingleThreadedRwLock) bool {
- switch (rwl.state) {
- .unlocked => {
- rwl.state = .locked_shared;
- assert(rwl.shared_count == 0);
- rwl.shared_count = 1;
- return true;
- },
- .locked_shared => {
- rwl.shared_count += 1;
- return true;
- },
- .locked_exclusive => return false,
- }
- }
-
- /// Blocks until shared lock ownership is acquired.
- pub fn lockShared(rwl: *SingleThreadedRwLock) void {
- switch (rwl.state) {
- .unlocked => {
- rwl.state = .locked_shared;
- assert(rwl.shared_count == 0);
- rwl.shared_count = 1;
- },
- .locked_shared => {
- rwl.shared_count += 1;
- },
- .locked_exclusive => unreachable, // deadlock detected
- }
- }
-
- /// Releases a held shared lock.
- pub fn unlockShared(rwl: *SingleThreadedRwLock) void {
- switch (rwl.state) {
- .unlocked => unreachable, // too many calls to `unlockShared`
- .locked_exclusive => unreachable, // exclusively held lock
- .locked_shared => {
- rwl.shared_count -= 1;
- if (rwl.shared_count == 0) {
- rwl.state = .unlocked;
- }
- },
- }
- }
-};
-
-pub const PthreadRwLock = struct {
- rwlock: std.c.pthread_rwlock_t = .{},
-
- pub fn tryLock(rwl: *PthreadRwLock) bool {
- return std.c.pthread_rwlock_trywrlock(&rwl.rwlock) == .SUCCESS;
- }
-
- pub fn lock(rwl: *PthreadRwLock) void {
- const rc = std.c.pthread_rwlock_wrlock(&rwl.rwlock);
- assert(rc == .SUCCESS);
- }
-
- pub fn unlock(rwl: *PthreadRwLock) void {
- const rc = std.c.pthread_rwlock_unlock(&rwl.rwlock);
- assert(rc == .SUCCESS);
- }
-
- pub fn tryLockShared(rwl: *PthreadRwLock) bool {
- return std.c.pthread_rwlock_tryrdlock(&rwl.rwlock) == .SUCCESS;
- }
-
- pub fn lockShared(rwl: *PthreadRwLock) void {
- const rc = std.c.pthread_rwlock_rdlock(&rwl.rwlock);
- assert(rc == .SUCCESS);
- }
-
- pub fn unlockShared(rwl: *PthreadRwLock) void {
- const rc = std.c.pthread_rwlock_unlock(&rwl.rwlock);
- assert(rc == .SUCCESS);
- }
-};
-
-pub const DefaultRwLock = struct {
- state: usize = 0,
- mutex: std.Thread.Mutex = .{},
- semaphore: std.Thread.Semaphore = .{},
-
- const IS_WRITING: usize = 1;
- const WRITER: usize = 1 << 1;
- const READER: usize = 1 << (1 + @bitSizeOf(Count));
- const WRITER_MASK: usize = std.math.maxInt(Count) << @ctz(WRITER);
- const READER_MASK: usize = std.math.maxInt(Count) << @ctz(READER);
- const Count = std.meta.Int(.unsigned, @divFloor(@bitSizeOf(usize) - 1, 2));
-
- pub fn tryLock(rwl: *DefaultRwLock) bool {
- if (rwl.mutex.tryLock()) {
- const state = @atomicLoad(usize, &rwl.state, .seq_cst);
- if (state & READER_MASK == 0) {
- _ = @atomicRmw(usize, &rwl.state, .Or, IS_WRITING, .seq_cst);
- return true;
- }
-
- rwl.mutex.unlock();
- }
-
- return false;
- }
-
- pub fn lock(rwl: *DefaultRwLock) void {
- _ = @atomicRmw(usize, &rwl.state, .Add, WRITER, .seq_cst);
- rwl.mutex.lock();
-
- const state = @atomicRmw(usize, &rwl.state, .Add, IS_WRITING -% WRITER, .seq_cst);
- if (state & READER_MASK != 0)
- rwl.semaphore.wait();
- }
-
- pub fn unlock(rwl: *DefaultRwLock) void {
- _ = @atomicRmw(usize, &rwl.state, .And, ~IS_WRITING, .seq_cst);
- rwl.mutex.unlock();
- }
-
- pub fn tryLockShared(rwl: *DefaultRwLock) bool {
- const state = @atomicLoad(usize, &rwl.state, .seq_cst);
- if (state & (IS_WRITING | WRITER_MASK) == 0) {
- _ = @cmpxchgStrong(
- usize,
- &rwl.state,
- state,
- state + READER,
- .seq_cst,
- .seq_cst,
- ) orelse return true;
- }
-
- if (rwl.mutex.tryLock()) {
- _ = @atomicRmw(usize, &rwl.state, .Add, READER, .seq_cst);
- rwl.mutex.unlock();
- return true;
- }
-
- return false;
- }
-
- pub fn lockShared(rwl: *DefaultRwLock) void {
- var state = @atomicLoad(usize, &rwl.state, .seq_cst);
- while (state & (IS_WRITING | WRITER_MASK) == 0) {
- state = @cmpxchgWeak(
- usize,
- &rwl.state,
- state,
- state + READER,
- .seq_cst,
- .seq_cst,
- ) orelse return;
- }
-
- rwl.mutex.lock();
- _ = @atomicRmw(usize, &rwl.state, .Add, READER, .seq_cst);
- rwl.mutex.unlock();
- }
-
- pub fn unlockShared(rwl: *DefaultRwLock) void {
- const state = @atomicRmw(usize, &rwl.state, .Sub, READER, .seq_cst);
-
- if ((state & READER_MASK == READER) and (state & IS_WRITING != 0))
- rwl.semaphore.post();
- }
-};
-
-test "DefaultRwLock - internal state" {
- var rwl = DefaultRwLock{};
-
- // The following failed prior to the fix for Issue #13163,
- // where the WRITER flag was subtracted by the lock method.
-
- rwl.lock();
- rwl.unlock();
- try testing.expectEqual(rwl, DefaultRwLock{});
-}
-
-test "smoke test" {
- var rwl = RwLock{};
-
- rwl.lock();
- try testing.expect(!rwl.tryLock());
- try testing.expect(!rwl.tryLockShared());
- rwl.unlock();
-
- try testing.expect(rwl.tryLock());
- try testing.expect(!rwl.tryLock());
- try testing.expect(!rwl.tryLockShared());
- rwl.unlock();
-
- rwl.lockShared();
- try testing.expect(!rwl.tryLock());
- try testing.expect(rwl.tryLockShared());
- rwl.unlockShared();
- rwl.unlockShared();
-
- try testing.expect(rwl.tryLockShared());
- try testing.expect(!rwl.tryLock());
- try testing.expect(rwl.tryLockShared());
- rwl.unlockShared();
- rwl.unlockShared();
-
- rwl.lock();
- rwl.unlock();
-}
-
-test "concurrent access" {
- if (builtin.single_threaded)
- return;
-
- const num_writers: usize = 2;
- const num_readers: usize = 4;
- const num_writes: usize = 1000;
- const num_reads: usize = 2000;
-
- const Runner = struct {
- const Runner = @This();
-
- rwl: RwLock,
- writes: usize,
- reads: std.atomic.Value(usize),
-
- val_a: usize,
- val_b: usize,
-
- fn reader(run: *Runner, thread_idx: usize) !void {
- var prng = std.Random.DefaultPrng.init(thread_idx);
- const rnd = prng.random();
- while (true) {
- run.rwl.lockShared();
- defer run.rwl.unlockShared();
-
- try testing.expect(run.writes <= num_writes);
- if (run.reads.fetchAdd(1, .monotonic) >= num_reads) break;
-
- // We use `volatile` accesses so that we can make sure the memory is accessed either
- // side of a yield, maximising chances of a race.
- const a_ptr: *const volatile usize = &run.val_a;
- const b_ptr: *const volatile usize = &run.val_b;
-
- const old_a = a_ptr.*;
- if (rnd.boolean()) try std.Thread.yield();
- const old_b = b_ptr.*;
- try testing.expect(old_a == old_b);
- }
- }
-
- fn writer(run: *Runner, thread_idx: usize) !void {
- var prng = std.Random.DefaultPrng.init(thread_idx);
- const rnd = prng.random();
- while (true) {
- run.rwl.lock();
- defer run.rwl.unlock();
-
- try testing.expect(run.writes <= num_writes);
- if (run.writes == num_writes) break;
-
- // We use `volatile` accesses so that we can make sure the memory is accessed either
- // side of a yield, maximising chances of a race.
- const a_ptr: *volatile usize = &run.val_a;
- const b_ptr: *volatile usize = &run.val_b;
-
- const new_val = rnd.int(usize);
-
- const old_a = a_ptr.*;
- a_ptr.* = new_val;
- if (rnd.boolean()) try std.Thread.yield();
- const old_b = b_ptr.*;
- b_ptr.* = new_val;
- try testing.expect(old_a == old_b);
-
- run.writes += 1;
- }
- }
- };
-
- var run: Runner = .{
- .rwl = .{},
- .writes = 0,
- .reads = .init(0),
- .val_a = 0,
- .val_b = 0,
- };
- var write_threads: [num_writers]std.Thread = undefined;
- var read_threads: [num_readers]std.Thread = undefined;
-
- for (&write_threads, 0..) |*t, i| t.* = try .spawn(.{}, Runner.writer, .{ &run, i });
- for (&read_threads, num_writers..) |*t, i| t.* = try .spawn(.{}, Runner.reader, .{ &run, i });
-
- for (write_threads) |t| t.join();
- for (read_threads) |t| t.join();
-
- try testing.expect(run.writes == num_writes);
- try testing.expect(run.reads.raw >= num_reads);
-}
diff --git a/lib/std/Thread/Semaphore.zig b/lib/std/Thread/Semaphore.zig
@@ -1,111 +0,0 @@
-//! A semaphore is an unsigned integer that blocks the kernel thread if
-//! the number would become negative.
-//! This API supports static initialization and does not require deinitialization.
-//!
-//! Example:
-//! ```
-//! var s = Semaphore{};
-//!
-//! fn consumer() void {
-//! s.wait();
-//! }
-//!
-//! fn producer() void {
-//! s.post();
-//! }
-//!
-//! const thread = try std.Thread.spawn(.{}, producer, .{});
-//! consumer();
-//! thread.join();
-//! ```
-
-mutex: Mutex = .{},
-cond: Condition = .{},
-/// It is OK to initialize this field to any value.
-permits: usize = 0,
-
-const Semaphore = @This();
-const std = @import("../std.zig");
-const Mutex = std.Thread.Mutex;
-const Condition = std.Thread.Condition;
-const builtin = @import("builtin");
-const testing = std.testing;
-
-pub fn wait(sem: *Semaphore) void {
- sem.mutex.lock();
- defer sem.mutex.unlock();
-
- while (sem.permits == 0)
- sem.cond.wait(&sem.mutex);
-
- sem.permits -= 1;
- if (sem.permits > 0)
- sem.cond.signal();
-}
-
-pub fn timedWait(sem: *Semaphore, timeout_ns: u64) error{Timeout}!void {
- var timeout_timer = std.time.Timer.start() catch unreachable;
-
- sem.mutex.lock();
- defer sem.mutex.unlock();
-
- while (sem.permits == 0) {
- const elapsed = timeout_timer.read();
- if (elapsed > timeout_ns)
- return error.Timeout;
-
- const local_timeout_ns = timeout_ns - elapsed;
- try sem.cond.timedWait(&sem.mutex, local_timeout_ns);
- }
-
- sem.permits -= 1;
- if (sem.permits > 0)
- sem.cond.signal();
-}
-
-pub fn post(sem: *Semaphore) void {
- sem.mutex.lock();
- defer sem.mutex.unlock();
-
- sem.permits += 1;
- sem.cond.signal();
-}
-
-test Semaphore {
- if (builtin.single_threaded) {
- return error.SkipZigTest;
- }
-
- const TestContext = struct {
- sem: *Semaphore,
- n: *i32,
- fn worker(ctx: *@This()) void {
- ctx.sem.wait();
- ctx.n.* += 1;
- ctx.sem.post();
- }
- };
- const num_threads = 3;
- var sem = Semaphore{ .permits = 1 };
- var threads: [num_threads]std.Thread = undefined;
- var n: i32 = 0;
- var ctx = TestContext{ .sem = &sem, .n = &n };
-
- for (&threads) |*t| t.* = try std.Thread.spawn(.{}, TestContext.worker, .{&ctx});
- for (threads) |t| t.join();
- sem.wait();
- try testing.expect(n == num_threads);
-}
-
-test timedWait {
- var sem = Semaphore{};
- try testing.expectEqual(0, sem.permits);
-
- try testing.expectError(error.Timeout, sem.timedWait(1));
-
- sem.post();
- try testing.expectEqual(1, sem.permits);
-
- try sem.timedWait(1);
- try testing.expectEqual(0, sem.permits);
-}
diff --git a/lib/std/debug.zig b/lib/std/debug.zig
@@ -696,7 +696,7 @@ pub noinline fn writeCurrentStackTrace(options: StackUnwindOptions, t: Io.Termin
.useless, .unsafe => {},
.safe, .ideal => continue, // no need to even warn
}
- const module_name = di.getModuleName(di_gpa, unwind_error.address) catch "???";
+ const module_name = di.getModuleName(di_gpa, io, unwind_error.address) catch "???";
const caption: []const u8 = switch (unwind_error.err) {
error.MissingDebugInfo => "unwind info unavailable",
error.InvalidDebugInfo => "unwind info invalid",
@@ -1141,7 +1141,7 @@ fn printSourceAtAddress(
symbol.source_location,
address,
symbol.name orelse "???",
- symbol.compile_unit_name orelse debug_info.getModuleName(gpa, address) catch "???",
+ symbol.compile_unit_name orelse debug_info.getModuleName(gpa, io, address) catch "???",
);
}
fn printLineInfo(
@@ -1356,7 +1356,10 @@ pub fn getDebugInfoAllocator() Allocator {
// Otherwise, use a global arena backed by the page allocator
const S = struct {
var arena: std.heap.ArenaAllocator = .init(std.heap.page_allocator);
- var ts_arena: std.heap.ThreadSafeAllocator = .{ .child_allocator = arena.allocator() };
+ var ts_arena: std.heap.ThreadSafeAllocator = .{
+ .child_allocator = arena.allocator(),
+ .io = std.Options.debug_io,
+ };
};
return S.ts_arena.allocator();
}
diff --git a/lib/std/debug/Coverage.zig b/lib/std/debug/Coverage.zig
@@ -1,11 +1,12 @@
+const Coverage = @This();
+
const std = @import("../std.zig");
+const Io = std.Io;
const Allocator = std.mem.Allocator;
const Hash = std.hash.Wyhash;
const Dwarf = std.debug.Dwarf;
const assert = std.debug.assert;
-const Coverage = @This();
-
/// Provides a globally-scoped integer index for directories.
///
/// As opposed to, for example, a directory index that is compilation-unit
@@ -23,12 +24,12 @@ directories: std.ArrayHashMapUnmanaged(String, void, String.MapContext, false),
files: std.ArrayHashMapUnmanaged(File, void, File.MapContext, false),
string_bytes: std.ArrayList(u8),
/// Protects the other fields.
-mutex: std.Thread.Mutex,
+mutex: Io.Mutex,
pub const init: Coverage = .{
.directories = .{},
.files = .{},
- .mutex = .{},
+ .mutex = .init,
.string_bytes = .{},
};
@@ -140,11 +141,12 @@ pub fn stringAt(cov: *Coverage, index: String) [:0]const u8 {
return span(cov.string_bytes.items[@intFromEnum(index)..]);
}
-pub const ResolveAddressesDwarfError = Dwarf.ScanError;
+pub const ResolveAddressesDwarfError = Dwarf.ScanError || Io.Cancelable;
pub fn resolveAddressesDwarf(
cov: *Coverage,
gpa: Allocator,
+ io: Io,
endian: std.builtin.Endian,
/// Asserts the addresses are in ascending order.
sorted_pc_addrs: []const u64,
@@ -161,8 +163,8 @@ pub fn resolveAddressesDwarf(
var prev_pc: u64 = 0;
var prev_cu: ?*std.debug.Dwarf.CompileUnit = null;
// Protects directories and files tables from other threads.
- cov.mutex.lock();
- defer cov.mutex.unlock();
+ try cov.mutex.lock(io);
+ defer cov.mutex.unlock(io);
next_pc: for (sorted_pc_addrs, output) |pc, *out| {
assert(pc >= prev_pc);
prev_pc = pc;
@@ -183,8 +185,8 @@ pub fn resolveAddressesDwarf(
if (cu != prev_cu) {
prev_cu = cu;
if (cu.src_loc_cache == null) {
- cov.mutex.unlock();
- defer cov.mutex.lock();
+ cov.mutex.unlock(io);
+ defer cov.mutex.lockUncancelable(io);
d.populateSrcLocCache(gpa, endian, cu) catch |err| switch (err) {
error.MissingDebugInfo, error.InvalidDebugInfo => {
out.* = SourceLocation.invalid;
diff --git a/lib/std/debug/Info.zig b/lib/std/debug/Info.zig
@@ -93,7 +93,7 @@ pub fn resolveAddresses(
) ResolveAddressesError!void {
assert(sorted_pc_addrs.len == output.len);
switch (info.impl) {
- .elf => |*ef| return info.coverage.resolveAddressesDwarf(gpa, ef.endian, sorted_pc_addrs, output, &ef.dwarf.?),
+ .elf => |*ef| return info.coverage.resolveAddressesDwarf(gpa, io, ef.endian, sorted_pc_addrs, output, &ef.dwarf.?),
.macho => |*mf| {
// Resolving all of the addresses at once unfortunately isn't so easy in Mach-O binaries
// due to split debug information. For now, we'll just resolve the addreses one by one.
@@ -112,7 +112,7 @@ pub fn resolveAddresses(
else => |e| return e,
};
}
- try info.coverage.resolveAddressesDwarf(gpa, .little, &.{dwarf_pc_addr}, src_loc[0..1], dwarf);
+ try info.coverage.resolveAddressesDwarf(gpa, io, .little, &.{dwarf_pc_addr}, src_loc[0..1], dwarf);
}
},
}
diff --git a/lib/std/debug/SelfInfo/Elf.zig b/lib/std/debug/SelfInfo/Elf.zig
@@ -1,4 +1,4 @@
-rwlock: std.Thread.RwLock,
+mutex: Io.Mutex,
modules: std.ArrayList(Module),
ranges: std.ArrayList(Module.Range),
@@ -6,7 +6,7 @@ ranges: std.ArrayList(Module.Range),
unwind_cache: if (can_unwind) ?[]Dwarf.SelfUnwinder.CacheEntry else ?noreturn,
pub const init: SelfInfo = .{
- .rwlock = .{},
+ .mutex = .init,
.modules = .empty,
.ranges = .empty,
.unwind_cache = null,
@@ -29,8 +29,8 @@ pub fn deinit(si: *SelfInfo, gpa: Allocator) void {
}
pub fn getSymbol(si: *SelfInfo, gpa: Allocator, io: Io, address: usize) Error!std.debug.Symbol {
- const module = try si.findModule(gpa, address, .exclusive);
- defer si.rwlock.unlock();
+ const module = try si.findModule(gpa, io, address, .exclusive);
+ defer si.mutex.unlock(io);
const vaddr = address - module.load_offset;
@@ -73,15 +73,15 @@ pub fn getSymbol(si: *SelfInfo, gpa: Allocator, io: Io, address: usize) Error!st
error.OutOfMemory => |e| return e,
};
}
-pub fn getModuleName(si: *SelfInfo, gpa: Allocator, address: usize) Error![]const u8 {
- const module = try si.findModule(gpa, address, .shared);
- defer si.rwlock.unlockShared();
+pub fn getModuleName(si: *SelfInfo, gpa: Allocator, io: Io, address: usize) Error![]const u8 {
+ const module = try si.findModule(gpa, io, address, .shared);
+ defer si.mutex.unlock(io);
if (module.name.len == 0) return error.MissingDebugInfo;
return module.name;
}
-pub fn getModuleSlide(si: *SelfInfo, gpa: Allocator, address: usize) Error!usize {
- const module = try si.findModule(gpa, address, .shared);
- defer si.rwlock.unlockShared();
+pub fn getModuleSlide(si: *SelfInfo, gpa: Allocator, io: Io, address: usize) Error!usize {
+ const module = try si.findModule(gpa, io, address, .shared);
+ defer si.mutex.unlock(io);
return module.load_offset;
}
@@ -183,8 +183,8 @@ pub fn unwindFrame(si: *SelfInfo, gpa: Allocator, io: Io, context: *UnwindContex
comptime assert(can_unwind);
{
- si.rwlock.lockShared();
- defer si.rwlock.unlockShared();
+ try si.mutex.lock(io);
+ defer si.mutex.unlock(io);
if (si.unwind_cache) |cache| {
if (Dwarf.SelfUnwinder.CacheEntry.find(cache, context.pc)) |entry| {
return context.next(gpa, entry);
@@ -192,8 +192,8 @@ pub fn unwindFrame(si: *SelfInfo, gpa: Allocator, io: Io, context: *UnwindContex
}
}
- const module = try si.findModule(gpa, context.pc, .exclusive);
- defer si.rwlock.unlock();
+ const module = try si.findModule(gpa, io, context.pc, .exclusive);
+ defer si.mutex.unlock(io);
if (si.unwind_cache == null) {
si.unwind_cache = try gpa.alloc(Dwarf.SelfUnwinder.CacheEntry, 2048);
@@ -375,11 +375,11 @@ const Module = struct {
}
};
-fn findModule(si: *SelfInfo, gpa: Allocator, address: usize, lock: enum { shared, exclusive }) Error!*Module {
+fn findModule(si: *SelfInfo, gpa: Allocator, io: Io, address: usize, lock: enum { shared, exclusive }) Error!*Module {
// With the requested lock, scan the module ranges looking for `address`.
switch (lock) {
- .shared => si.rwlock.lockShared(),
- .exclusive => si.rwlock.lock(),
+ .shared => try si.mutex.lock(io),
+ .exclusive => try si.mutex.lock(io),
}
for (si.ranges.items) |*range| {
if (address >= range.start and address < range.start + range.len) {
@@ -389,15 +389,12 @@ fn findModule(si: *SelfInfo, gpa: Allocator, address: usize, lock: enum { shared
// The address wasn't in a known range. We will rebuild the module/range lists, since it's possible
// a new module was loaded. Upgrade to an exclusive lock if necessary.
switch (lock) {
- .shared => {
- si.rwlock.unlockShared();
- si.rwlock.lock();
- },
+ .shared => {},
.exclusive => {},
}
// Rebuild module list with the exclusive lock.
{
- errdefer si.rwlock.unlock();
+ errdefer si.mutex.unlock(io);
for (si.modules.items) |*mod| {
unwind: {
const u = &(mod.unwind orelse break :unwind catch break :unwind);
@@ -415,10 +412,7 @@ fn findModule(si: *SelfInfo, gpa: Allocator, address: usize, lock: enum { shared
}
// Downgrade the lock back to shared if necessary.
switch (lock) {
- .shared => {
- si.rwlock.unlock();
- si.rwlock.lockShared();
- },
+ .shared => {},
.exclusive => {},
}
// Scan the newly rebuilt module ranges.
@@ -429,8 +423,8 @@ fn findModule(si: *SelfInfo, gpa: Allocator, address: usize, lock: enum { shared
}
// Still nothing; unlock and error.
switch (lock) {
- .shared => si.rwlock.unlockShared(),
- .exclusive => si.rwlock.unlock(),
+ .shared => si.mutex.unlock(io),
+ .exclusive => si.mutex.unlock(io),
}
return error.MissingDebugInfo;
}
diff --git a/lib/std/debug/SelfInfo/MachO.zig b/lib/std/debug/SelfInfo/MachO.zig
@@ -1,9 +1,9 @@
-mutex: std.Thread.Mutex,
+mutex: Io.Mutex,
/// Accessed through `Module.Adapter`.
modules: std.ArrayHashMapUnmanaged(Module, void, Module.Context, false),
pub const init: SelfInfo = .{
- .mutex = .{},
+ .mutex = .init,
.modules = .empty,
};
pub fn deinit(si: *SelfInfo, gpa: Allocator) void {
@@ -21,8 +21,8 @@ pub fn deinit(si: *SelfInfo, gpa: Allocator) void {
}
pub fn getSymbol(si: *SelfInfo, gpa: Allocator, io: Io, address: usize) Error!std.debug.Symbol {
- const module = try si.findModule(gpa, address);
- defer si.mutex.unlock();
+ const module = try si.findModule(gpa, io, address);
+ defer si.mutex.unlock(io);
const file = try module.getFile(gpa, io);
@@ -76,9 +76,10 @@ pub fn getSymbol(si: *SelfInfo, gpa: Allocator, io: Io, address: usize) Error!st
) catch null,
};
}
-pub fn getModuleName(si: *SelfInfo, gpa: Allocator, address: usize) Error![]const u8 {
+pub fn getModuleName(si: *SelfInfo, gpa: Allocator, io: Io, address: usize) Error![]const u8 {
_ = si;
_ = gpa;
+ _ = io;
// This function is marked as deprecated; however, it is significantly more
// performant than `dladdr` (since the latter also does a very slow symbol
// lookup), so let's use it since it's still available.
@@ -86,9 +87,9 @@ pub fn getModuleName(si: *SelfInfo, gpa: Allocator, address: usize) Error![]cons
@ptrFromInt(address),
) orelse return error.MissingDebugInfo);
}
-pub fn getModuleSlide(si: *SelfInfo, gpa: Allocator, address: usize) Error!usize {
- const module = try si.findModule(gpa, address);
- defer si.mutex.unlock();
+pub fn getModuleSlide(si: *SelfInfo, gpa: Allocator, io: Io, address: usize) Error!usize {
+ const module = try si.findModule(gpa, io, address);
+ defer si.mutex.unlock(io);
const header: *std.macho.mach_header_64 = @ptrFromInt(module.text_base);
const raw_macho: [*]u8 = @ptrCast(header);
var it = macho.LoadCommandIterator.init(header, raw_macho[@sizeOf(macho.mach_header_64)..][0..header.sizeofcmds]) catch unreachable;
@@ -107,8 +108,7 @@ pub const UnwindContext = std.debug.Dwarf.SelfUnwinder;
/// If the compact encoding can't encode a way to unwind a frame, it will
/// defer unwinding to DWARF, in which case `__eh_frame` will be used if available.
pub fn unwindFrame(si: *SelfInfo, gpa: Allocator, io: Io, context: *UnwindContext) Error!usize {
- _ = io;
- return unwindFrameInner(si, gpa, context) catch |err| switch (err) {
+ return unwindFrameInner(si, gpa, io, context) catch |err| switch (err) {
error.InvalidDebugInfo,
error.MissingDebugInfo,
error.UnsupportedDebugInfo,
@@ -134,9 +134,9 @@ pub fn unwindFrame(si: *SelfInfo, gpa: Allocator, io: Io, context: *UnwindContex
=> return error.InvalidDebugInfo,
};
}
-fn unwindFrameInner(si: *SelfInfo, gpa: Allocator, context: *UnwindContext) !usize {
- const module = try si.findModule(gpa, context.pc);
- defer si.mutex.unlock();
+fn unwindFrameInner(si: *SelfInfo, gpa: Allocator, io: Io, context: *UnwindContext) !usize {
+ const module = try si.findModule(gpa, io, context.pc);
+ defer si.mutex.unlock(io);
const unwind: *Module.Unwind = try module.getUnwindInfo(gpa);
@@ -430,15 +430,15 @@ fn unwindFrameInner(si: *SelfInfo, gpa: Allocator, context: *UnwindContext) !usi
}
/// Acquires the mutex on success.
-fn findModule(si: *SelfInfo, gpa: Allocator, address: usize) Error!*Module {
+fn findModule(si: *SelfInfo, gpa: Allocator, io: Io, address: usize) Error!*Module {
// This function is marked as deprecated; however, it is significantly more
// performant than `dladdr` (since the latter also does a very slow symbol
// lookup), so let's use it since it's still available.
const text_base = std.c._dyld_get_image_header_containing_address(
@ptrFromInt(address),
) orelse return error.MissingDebugInfo;
- si.mutex.lock();
- errdefer si.mutex.unlock();
+ try si.mutex.lock(io);
+ errdefer si.mutex.unlock(io);
const gop = try si.modules.getOrPutAdapted(gpa, @intFromPtr(text_base), Module.Adapter{});
errdefer comptime unreachable;
if (!gop.found_existing) gop.key_ptr.* = .{
diff --git a/lib/std/debug/SelfInfo/Windows.zig b/lib/std/debug/SelfInfo/Windows.zig
@@ -1,9 +1,9 @@
-mutex: std.Thread.Mutex,
+mutex: Io.Mutex,
modules: std.ArrayList(Module),
module_name_arena: std.heap.ArenaAllocator.State,
pub const init: SelfInfo = .{
- .mutex = .{},
+ .mutex = .init,
.modules = .empty,
.module_name_arena = .{},
};
@@ -21,21 +21,21 @@ pub fn deinit(si: *SelfInfo, gpa: Allocator) void {
}
pub fn getSymbol(si: *SelfInfo, gpa: Allocator, io: Io, address: usize) Error!std.debug.Symbol {
- si.mutex.lock();
- defer si.mutex.unlock();
+ try si.mutex.lock(io);
+ defer si.mutex.unlock(io);
const module = try si.findModule(gpa, address);
const di = try module.getDebugInfo(gpa, io);
return di.getSymbol(gpa, address - module.base_address);
}
-pub fn getModuleName(si: *SelfInfo, gpa: Allocator, address: usize) Error![]const u8 {
- si.mutex.lock();
- defer si.mutex.unlock();
+pub fn getModuleName(si: *SelfInfo, gpa: Allocator, io: Io, address: usize) Error![]const u8 {
+ try si.mutex.lock(io);
+ defer si.mutex.unlock(io);
const module = try si.findModule(gpa, address);
return module.name;
}
-pub fn getModuleSlide(si: *SelfInfo, gpa: Allocator, address: usize) Error!usize {
- si.mutex.lock();
- defer si.mutex.unlock();
+pub fn getModuleSlide(si: *SelfInfo, gpa: Allocator, io: Io, address: usize) Error!usize {
+ try si.mutex.lock(io);
+ defer si.mutex.unlock(io);
const module = try si.findModule(gpa, address);
return module.base_address;
}
diff --git a/lib/std/heap/ThreadSafeAllocator.zig b/lib/std/heap/ThreadSafeAllocator.zig
@@ -1,7 +1,14 @@
-//! Wraps a non-thread-safe allocator and makes it thread-safe.
+//! Deprecated. Thread safety should be built into each Allocator instance
+//! directly rather than trying to do this "composable allocators" thing.
+const ThreadSafeAllocator = @This();
+
+const std = @import("../std.zig");
+const Io = std.Io;
+const Allocator = std.mem.Allocator;
child_allocator: Allocator,
-mutex: std.Thread.Mutex = .{},
+io: Io,
+mutex: Io.Mutex = .init,
pub fn allocator(self: *ThreadSafeAllocator) Allocator {
return .{
@@ -17,39 +24,39 @@ pub fn allocator(self: *ThreadSafeAllocator) Allocator {
fn alloc(ctx: *anyopaque, n: usize, alignment: std.mem.Alignment, ra: usize) ?[*]u8 {
const self: *ThreadSafeAllocator = @ptrCast(@alignCast(ctx));
- self.mutex.lock();
- defer self.mutex.unlock();
+ const io = self.io;
+ self.mutex.lockUncancelable(io);
+ defer self.mutex.unlock(io);
return self.child_allocator.rawAlloc(n, alignment, ra);
}
fn resize(ctx: *anyopaque, buf: []u8, alignment: std.mem.Alignment, new_len: usize, ret_addr: usize) bool {
const self: *ThreadSafeAllocator = @ptrCast(@alignCast(ctx));
+ const io = self.io;
- self.mutex.lock();
- defer self.mutex.unlock();
+ self.mutex.lockUncancelable(io);
+ defer self.mutex.unlock(io);
return self.child_allocator.rawResize(buf, alignment, new_len, ret_addr);
}
fn remap(context: *anyopaque, memory: []u8, alignment: std.mem.Alignment, new_len: usize, return_address: usize) ?[*]u8 {
const self: *ThreadSafeAllocator = @ptrCast(@alignCast(context));
+ const io = self.io;
- self.mutex.lock();
- defer self.mutex.unlock();
+ self.mutex.lockUncancelable(io);
+ defer self.mutex.unlock(io);
return self.child_allocator.rawRemap(memory, alignment, new_len, return_address);
}
fn free(ctx: *anyopaque, buf: []u8, alignment: std.mem.Alignment, ret_addr: usize) void {
const self: *ThreadSafeAllocator = @ptrCast(@alignCast(ctx));
+ const io = self.io;
- self.mutex.lock();
- defer self.mutex.unlock();
+ self.mutex.lockUncancelable(io);
+ defer self.mutex.unlock(io);
return self.child_allocator.rawFree(buf, alignment, ret_addr);
}
-
-const std = @import("../std.zig");
-const ThreadSafeAllocator = @This();
-const Allocator = std.mem.Allocator;
diff --git a/lib/std/heap/debug_allocator.zig b/lib/std/heap/debug_allocator.zig
@@ -126,16 +126,6 @@ pub const Config = struct {
/// Whether the allocator may be used simultaneously from multiple threads.
thread_safe: bool = !builtin.single_threaded,
- /// What type of mutex you'd like to use, for thread safety.
- /// when specified, the mutex type must have the same shape as `std.Thread.Mutex` and
- /// `DummyMutex`, and have no required fields. Specifying this field causes
- /// the `thread_safe` field to be ignored.
- ///
- /// when null (default):
- /// * the mutex type defaults to `std.Thread.Mutex` when thread_safe is enabled.
- /// * the mutex type defaults to `DummyMutex` otherwise.
- MutexType: ?type = null,
-
/// This is a temporary debugging trick you can use to turn segfaults into more helpful
/// logged error messages with stack trace details. The downside is that every allocation
/// will be leaked, unless used with retain_metadata!
@@ -204,17 +194,8 @@ pub fn DebugAllocator(comptime config: Config) type {
const total_requested_bytes_init = if (config.enable_memory_limit) @as(usize, 0) else {};
const requested_memory_limit_init = if (config.enable_memory_limit) @as(usize, math.maxInt(usize)) else {};
- const mutex_init = if (config.MutexType) |T|
- T{}
- else if (config.thread_safe)
- std.Thread.Mutex{}
- else
- DummyMutex{};
-
- const DummyMutex = struct {
- inline fn lock(_: DummyMutex) void {}
- inline fn unlock(_: DummyMutex) void {}
- };
+ const have_mutex = config.thread_safe;
+ const mutex_init = if (have_mutex) std.Io.Mutex.init else {};
const stack_n = config.stack_trace_frames;
const one_trace_size = @sizeOf(usize) * stack_n;
@@ -737,8 +718,8 @@ pub fn DebugAllocator(comptime config: Config) type {
fn alloc(context: *anyopaque, len: usize, alignment: mem.Alignment, ret_addr: usize) ?[*]u8 {
const self: *Self = @ptrCast(@alignCast(context));
- self.mutex.lock();
- defer self.mutex.unlock();
+ if (have_mutex) std.Io.Threaded.mutexLock(&self.mutex);
+ defer if (have_mutex) std.Io.Threaded.mutexUnlock(&self.mutex);
if (config.enable_memory_limit) {
const new_req_bytes = self.total_requested_bytes + len;
@@ -850,8 +831,8 @@ pub fn DebugAllocator(comptime config: Config) type {
return_address: usize,
) bool {
const self: *Self = @ptrCast(@alignCast(context));
- self.mutex.lock();
- defer self.mutex.unlock();
+ if (have_mutex) std.Io.Threaded.mutexLock(&self.mutex);
+ defer if (have_mutex) std.Io.Threaded.mutexUnlock(&self.mutex);
const size_class_index: usize = @max(@bitSizeOf(usize) - @clz(memory.len - 1), @intFromEnum(alignment));
if (size_class_index >= self.buckets.len) {
@@ -869,8 +850,8 @@ pub fn DebugAllocator(comptime config: Config) type {
return_address: usize,
) ?[*]u8 {
const self: *Self = @ptrCast(@alignCast(context));
- self.mutex.lock();
- defer self.mutex.unlock();
+ if (have_mutex) std.Io.Threaded.mutexLock(&self.mutex);
+ defer if (have_mutex) std.Io.Threaded.mutexUnlock(&self.mutex);
const size_class_index: usize = @max(@bitSizeOf(usize) - @clz(memory.len - 1), @intFromEnum(alignment));
if (size_class_index >= self.buckets.len) {
@@ -887,8 +868,8 @@ pub fn DebugAllocator(comptime config: Config) type {
return_address: usize,
) void {
const self: *Self = @ptrCast(@alignCast(context));
- self.mutex.lock();
- defer self.mutex.unlock();
+ if (have_mutex) std.Io.Threaded.mutexLock(&self.mutex);
+ defer if (have_mutex) std.Io.Threaded.mutexUnlock(&self.mutex);
const size_class_index: usize = @max(@bitSizeOf(usize) - @clz(old_memory.len - 1), @intFromEnum(alignment));
if (size_class_index >= self.buckets.len) {
@@ -1331,18 +1312,6 @@ test "realloc large object to small object" {
try std.testing.expect(slice[16] == 0x34);
}
-test "overridable mutexes" {
- var gpa = DebugAllocator(.{ .MutexType = std.Thread.Mutex }){
- .backing_allocator = std.testing.allocator,
- .mutex = std.Thread.Mutex{},
- };
- defer std.testing.expect(gpa.deinit() == .ok) catch @panic("leak");
- const allocator = gpa.allocator();
-
- const ptr = try allocator.create(i32);
- defer allocator.destroy(ptr);
-}
-
test "non-page-allocator backing allocator" {
var gpa: DebugAllocator(.{
.backing_allocator_zeroes = false,
diff --git a/lib/std/heap/sbrk_allocator.zig b/lib/std/heap/sbrk_allocator.zig
@@ -1,5 +1,7 @@
-const std = @import("../std.zig");
const builtin = @import("builtin");
+
+const std = @import("../std.zig");
+const Io = std.Io;
const math = std.math;
const Allocator = std.mem.Allocator;
const mem = std.mem;
@@ -39,12 +41,12 @@ pub fn SbrkAllocator(comptime sbrk: *const fn (n: usize) usize) type {
var big_frees = [1]usize{0} ** big_size_class_count;
// TODO don't do the naive locking strategy
- var lock: std.Thread.Mutex = .{};
+ var mutex: Io.Mutex = .{};
fn alloc(ctx: *anyopaque, len: usize, alignment: mem.Alignment, return_address: usize) ?[*]u8 {
_ = ctx;
_ = return_address;
- lock.lock();
- defer lock.unlock();
+ Io.Threaded.mutexLock(&mutex);
+ defer Io.Threaded.mutexUnlock(&mutex);
// Make room for the freelist next pointer.
const actual_len = @max(len +| @sizeOf(usize), alignment.toByteUnits());
const slot_size = math.ceilPowerOfTwo(usize, actual_len) catch return null;
@@ -88,8 +90,8 @@ pub fn SbrkAllocator(comptime sbrk: *const fn (n: usize) usize) type {
) bool {
_ = ctx;
_ = return_address;
- lock.lock();
- defer lock.unlock();
+ Io.Threaded.mutexLock(&mutex);
+ defer Io.Threaded.mutexUnlock(&mutex);
// We don't want to move anything from one size class to another, but we
// can recover bytes in between powers of two.
const buf_align = alignment.toByteUnits();
@@ -127,8 +129,8 @@ pub fn SbrkAllocator(comptime sbrk: *const fn (n: usize) usize) type {
) void {
_ = ctx;
_ = return_address;
- lock.lock();
- defer lock.unlock();
+ Io.Threaded.mutexLock(&mutex);
+ defer Io.Threaded.mutexUnlock(&mutex);
const buf_align = alignment.toByteUnits();
const actual_len = @max(buf.len + @sizeOf(usize), buf_align);
const slot_size = math.ceilPowerOfTwoAssert(usize, actual_len);
diff --git a/lib/std/http/Client.zig b/lib/std/http/Client.zig
@@ -3,22 +3,25 @@
//! Connections are opened in a thread-safe manner, but individual Requests are not.
//!
//! TLS support may be disabled via `std.options.http_disable_tls`.
+//!
+//! TODO all the lockUncancelable in this file should be changed to regular lock and
+//! `error.Canceled` added to more error sets.
+const Client = @This();
-const std = @import("../std.zig");
const builtin = @import("builtin");
+
+const std = @import("../std.zig");
+const Io = std.Io;
const testing = std.testing;
const http = std.http;
const mem = std.mem;
const Uri = std.Uri;
-const Allocator = mem.Allocator;
+const Allocator = std.mem.Allocator;
const assert = std.debug.assert;
-const Io = std.Io;
const Writer = std.Io.Writer;
const Reader = std.Io.Reader;
const HostName = std.Io.net.HostName;
-const Client = @This();
-
pub const disable_tls = std.options.http_disable_tls;
/// Used for all client allocations. Must be thread-safe.
@@ -27,7 +30,7 @@ allocator: Allocator,
io: Io,
ca_bundle: if (disable_tls) void else std.crypto.Certificate.Bundle = if (disable_tls) {} else .{},
-ca_bundle_mutex: std.Thread.Mutex = .{},
+ca_bundle_mutex: Io.Mutex = .init,
/// Used both for the reader and writer buffers.
tls_buffer_size: if (disable_tls) u0 else usize = if (disable_tls) 0 else std.crypto.tls.Client.min_buffer_len,
/// If non-null, ssl secrets are logged to a stream. Creating such a stream
@@ -62,7 +65,7 @@ https_proxy: ?*Proxy = null,
/// A Least-Recently-Used cache of open connections to be reused.
pub const ConnectionPool = struct {
- mutex: std.Thread.Mutex = .{},
+ mutex: Io.Mutex = .init,
/// Open connections that are currently in use.
used: std.DoublyLinkedList = .{},
/// Open connections that are not currently in use.
@@ -81,9 +84,9 @@ pub const ConnectionPool = struct {
/// If no connection is found, null is returned.
///
/// Threadsafe.
- pub fn findConnection(pool: *ConnectionPool, criteria: Criteria) ?*Connection {
- pool.mutex.lock();
- defer pool.mutex.unlock();
+ pub fn findConnection(pool: *ConnectionPool, io: Io, criteria: Criteria) ?*Connection {
+ pool.mutex.lockUncancelable(io);
+ defer pool.mutex.unlock(io);
var next = pool.free.last;
while (next) |node| : (next = node.prev) {
@@ -110,9 +113,9 @@ pub const ConnectionPool = struct {
}
/// Acquires an existing connection from the connection pool. This function is threadsafe.
- pub fn acquire(pool: *ConnectionPool, connection: *Connection) void {
- pool.mutex.lock();
- defer pool.mutex.unlock();
+ pub fn acquire(pool: *ConnectionPool, io: Io, connection: *Connection) void {
+ pool.mutex.lockUncancelable(io);
+ defer pool.mutex.unlock(io);
return pool.acquireUnsafe(connection);
}
@@ -122,8 +125,8 @@ pub const ConnectionPool = struct {
///
/// Threadsafe.
pub fn release(pool: *ConnectionPool, connection: *Connection, io: Io) void {
- pool.mutex.lock();
- defer pool.mutex.unlock();
+ pool.mutex.lockUncancelable(io);
+ defer pool.mutex.unlock(io);
pool.used.remove(&connection.pool_node);
@@ -147,9 +150,9 @@ pub const ConnectionPool = struct {
}
/// Adds a newly created node to the pool of used connections. This function is threadsafe.
- pub fn addUsed(pool: *ConnectionPool, connection: *Connection) void {
- pool.mutex.lock();
- defer pool.mutex.unlock();
+ pub fn addUsed(pool: *ConnectionPool, io: Io, connection: *Connection) void {
+ pool.mutex.lockUncancelable(io);
+ defer pool.mutex.unlock(io);
pool.used.append(&connection.pool_node);
}
@@ -159,9 +162,9 @@ pub const ConnectionPool = struct {
/// If the new size is smaller than the current size, then idle connections will be closed until the pool is the new size.
///
/// Threadsafe.
- pub fn resize(pool: *ConnectionPool, allocator: Allocator, new_size: usize) void {
- pool.mutex.lock();
- defer pool.mutex.unlock();
+ pub fn resize(pool: *ConnectionPool, io: Io, allocator: Allocator, new_size: usize) void {
+ pool.mutex.lockUncancelable(io);
+ defer pool.mutex.unlock(io);
const next = pool.free.first;
_ = next;
@@ -182,7 +185,7 @@ pub const ConnectionPool = struct {
///
/// Threadsafe.
pub fn deinit(pool: *ConnectionPool, io: Io) void {
- pool.mutex.lock();
+ pool.mutex.lockUncancelable(io);
var next = pool.free.first;
while (next) |node| {
@@ -1308,9 +1311,11 @@ pub fn deinit(client: *Client) void {
/// Uses `arena` for a few small allocations that must outlive the client, or
/// at least until those fields are set to different values.
pub fn initDefaultProxies(client: *Client, arena: Allocator, environ_map: *std.process.Environ.Map) !void {
+ const io = client.io;
+
// Prevent any new connections from being created.
- client.connection_pool.mutex.lock();
- defer client.connection_pool.mutex.unlock();
+ client.connection_pool.mutex.lockUncancelable(io);
+ defer client.connection_pool.mutex.unlock(io);
assert(client.connection_pool.used.first == null); // There are active requests.
@@ -1437,7 +1442,7 @@ pub fn connectTcpOptions(client: *Client, options: ConnectTcpOptions) ConnectTcp
const proxied_host = options.proxied_host orelse host;
const proxied_port = options.proxied_port orelse port;
- if (client.connection_pool.findConnection(.{
+ if (client.connection_pool.findConnection(io, .{
.host = proxied_host,
.port = proxied_port,
.protocol = protocol,
@@ -1455,12 +1460,12 @@ pub fn connectTcpOptions(client: *Client, options: ConnectTcpOptions) ConnectTcp
error.Canceled => |e| return e,
else => return error.TlsInitializationFailed,
};
- client.connection_pool.addUsed(&tc.connection);
+ client.connection_pool.addUsed(io, &tc.connection);
return &tc.connection;
},
.plain => {
const pc = try Connection.Plain.create(client, proxied_host, proxied_port, stream);
- client.connection_pool.addUsed(&pc.connection);
+ client.connection_pool.addUsed(io, &pc.connection);
return &pc.connection;
},
}
@@ -1474,7 +1479,7 @@ pub const ConnectUnixError = Allocator.Error || std.posix.SocketError || error{N
pub fn connectUnix(client: *Client, path: []const u8) ConnectUnixError!*Connection {
const io = client.io;
- if (client.connection_pool.findConnection(.{
+ if (client.connection_pool.findConnection(io, .{
.host = path,
.port = 0,
.protocol = .plain,
@@ -1516,7 +1521,7 @@ pub fn connectProxied(
const io = client.io;
if (!proxy.supports_connect) return error.TunnelNotSupported;
- if (client.connection_pool.findConnection(.{
+ if (client.connection_pool.findConnection(io, .{
.host = proxied_host,
.port = proxied_port,
.protocol = proxy.protocol,
@@ -1691,8 +1696,8 @@ pub fn request(
if (protocol == .tls) {
if (disable_tls) unreachable;
{
- client.ca_bundle_mutex.lock();
- defer client.ca_bundle_mutex.unlock();
+ client.ca_bundle_mutex.lockUncancelable(io);
+ defer client.ca_bundle_mutex.unlock(io);
if (client.now == null) {
const now = try Io.Clock.real.now(io);
diff --git a/lib/std/once.zig b/lib/std/once.zig
@@ -1,71 +0,0 @@
-const std = @import("std.zig");
-const builtin = @import("builtin");
-const testing = std.testing;
-
-pub fn once(comptime f: fn () void) Once(f) {
- return Once(f){};
-}
-
-/// An object that executes the function `f` just once.
-/// It is undefined behavior if `f` re-enters the same Once instance.
-pub fn Once(comptime f: fn () void) type {
- return struct {
- done: bool = false,
- mutex: std.Thread.Mutex = std.Thread.Mutex{},
-
- /// Call the function `f`.
- /// If `call` is invoked multiple times `f` will be executed only the
- /// first time.
- /// The invocations are thread-safe.
- pub fn call(self: *@This()) void {
- if (@atomicLoad(bool, &self.done, .acquire))
- return;
-
- return self.callSlow();
- }
-
- fn callSlow(self: *@This()) void {
- @branchHint(.cold);
-
- self.mutex.lock();
- defer self.mutex.unlock();
-
- // The first thread to acquire the mutex gets to run the initializer
- if (!self.done) {
- f();
- @atomicStore(bool, &self.done, true, .release);
- }
- }
- };
-}
-
-var global_number: i32 = 0;
-var global_once = once(incr);
-
-fn incr() void {
- global_number += 1;
-}
-
-test "Once executes its function just once" {
- if (builtin.single_threaded) {
- global_once.call();
- global_once.call();
- } else {
- var threads: [10]std.Thread = undefined;
- var thread_count: usize = 0;
- defer for (threads[0..thread_count]) |handle| handle.join();
-
- for (&threads) |*handle| {
- handle.* = try std.Thread.spawn(.{}, struct {
- fn thread_fn(x: u8) void {
- _ = x;
- global_once.call();
- if (global_number != 1) @panic("memory ordering bug");
- }
- }.thread_fn, .{0});
- thread_count += 1;
- }
- }
-
- try testing.expectEqual(@as(i32, 1), global_number);
-}
diff --git a/lib/std/std.zig b/lib/std/std.zig
@@ -86,7 +86,6 @@ pub const math = @import("math.zig");
pub const mem = @import("mem.zig");
pub const meta = @import("meta.zig");
pub const os = @import("os.zig");
-pub const once = @import("once.zig").once;
pub const pdb = @import("pdb.zig");
pub const pie = @import("pie.zig");
pub const posix = @import("posix.zig");