organize std lib concurrency primitives and add RwLock
* move concurrency primitives that always operate on kernel threads to the std.Thread namespace * remove std.SpinLock. Nobody should use this in a non-freestanding environment; the other primitives are always preferable. In freestanding, it will be necessary to put custom spin logic in there, so there are no use cases for a std lib version. * move some std lib files to the top level fields convention * add std.Thread.spinLoopHint * add std.Thread.Condition * add std.Thread.Semaphore * new implementation of std.Thread.Mutex for Windows and non-pthreads Linux * add std.Thread.RwLock Implementations provided by @kprotty
This commit is contained in:
228
lib/std/Thread/AutoResetEvent.zig
Normal file
228
lib/std/Thread/AutoResetEvent.zig
Normal file
@@ -0,0 +1,228 @@
|
||||
// SPDX-License-Identifier: MIT
|
||||
// Copyright (c) 2015-2021 Zig Contributors
|
||||
// This file is part of [zig](https://ziglang.org/), which is MIT licensed.
|
||||
// The MIT license requires this copyright notice to be included in all copies
|
||||
// and substantial portions of the software.
|
||||
|
||||
//! Similar to `StaticResetEvent` but on `set()` it also (atomically) does `reset()`.
|
||||
//! Unlike StaticResetEvent, `wait()` can only be called by one thread (MPSC-like).
|
||||
//!
|
||||
//! AutoResetEvent has 3 possible states:
|
||||
//! - UNSET: the AutoResetEvent is currently unset
|
||||
//! - SET: the AutoResetEvent was notified before a wait() was called
|
||||
//! - <StaticResetEvent pointer>: there is an active waiter waiting for a notification.
|
||||
//!
|
||||
//! When attempting to wait:
|
||||
//! if the event is unset, it registers a ResetEvent pointer to be notified when the event is set
|
||||
//! if the event is already set, then it consumes the notification and resets the event.
|
||||
//!
|
||||
//! When attempting to notify:
|
||||
//! if the event is unset, then we set the event
|
||||
//! if theres a waiting ResetEvent, then we unset the event and notify the ResetEvent
|
||||
//!
|
||||
//! This ensures that the event is automatically reset after a wait() has been issued
|
||||
//! and avoids the race condition when using StaticResetEvent in the following scenario:
|
||||
//! thread 1 | thread 2
|
||||
//! StaticResetEvent.wait() |
|
||||
//! | StaticResetEvent.set()
|
||||
//! | StaticResetEvent.set()
|
||||
//! StaticResetEvent.reset() |
|
||||
//! StaticResetEvent.wait() | (missed the second .set() notification above)
|
||||
|
||||
state: usize = UNSET,
|
||||
|
||||
const std = @import("../std.zig");
|
||||
const builtin = @import("builtin");
|
||||
const testing = std.testing;
|
||||
const assert = std.debug.assert;
|
||||
const StaticResetEvent = std.Thread.StaticResetEvent;
|
||||
const AutoResetEvent = @This();
|
||||
|
||||
const UNSET = 0;
|
||||
const SET = 1;
|
||||
|
||||
/// the minimum alignment for the `*StaticResetEvent` created by wait*()
|
||||
const event_align = std.math.max(@alignOf(StaticResetEvent), 2);
|
||||
|
||||
pub fn wait(self: *AutoResetEvent) void {
|
||||
self.waitFor(null) catch unreachable;
|
||||
}
|
||||
|
||||
pub fn timedWait(self: *AutoResetEvent, timeout: u64) error{TimedOut}!void {
|
||||
return self.waitFor(timeout);
|
||||
}
|
||||
|
||||
fn waitFor(self: *AutoResetEvent, timeout: ?u64) error{TimedOut}!void {
|
||||
// lazily initialized StaticResetEvent
|
||||
var reset_event: StaticResetEvent align(event_align) = undefined;
|
||||
var has_reset_event = false;
|
||||
|
||||
var state = @atomicLoad(usize, &self.state, .SeqCst);
|
||||
while (true) {
|
||||
// consume a notification if there is any
|
||||
if (state == SET) {
|
||||
@atomicStore(usize, &self.state, UNSET, .SeqCst);
|
||||
return;
|
||||
}
|
||||
|
||||
// check if theres currently a pending ResetEvent pointer already registered
|
||||
if (state != UNSET) {
|
||||
unreachable; // multiple waiting threads on the same AutoResetEvent
|
||||
}
|
||||
|
||||
// lazily initialize the ResetEvent if it hasn't been already
|
||||
if (!has_reset_event) {
|
||||
has_reset_event = true;
|
||||
reset_event = .{};
|
||||
}
|
||||
|
||||
// Since the AutoResetEvent currently isnt set,
|
||||
// try to register our ResetEvent on it to wait
|
||||
// for a set() call from another thread.
|
||||
if (@cmpxchgWeak(
|
||||
usize,
|
||||
&self.state,
|
||||
UNSET,
|
||||
@ptrToInt(&reset_event),
|
||||
.SeqCst,
|
||||
.SeqCst,
|
||||
)) |new_state| {
|
||||
state = new_state;
|
||||
continue;
|
||||
}
|
||||
|
||||
// if no timeout was specified, then just wait forever
|
||||
const timeout_ns = timeout orelse {
|
||||
reset_event.wait();
|
||||
return;
|
||||
};
|
||||
|
||||
// wait with a timeout and return if signalled via set()
|
||||
switch (reset_event.timedWait(timeout_ns)) {
|
||||
.event_set => return,
|
||||
.timed_out => {},
|
||||
}
|
||||
|
||||
// If we timed out, we need to transition the AutoResetEvent back to UNSET.
|
||||
// If we don't, then when we return, a set() thread could observe a pointer to an invalid ResetEvent.
|
||||
state = @cmpxchgStrong(
|
||||
usize,
|
||||
&self.state,
|
||||
@ptrToInt(&reset_event),
|
||||
UNSET,
|
||||
.SeqCst,
|
||||
.SeqCst,
|
||||
) orelse return error.TimedOut;
|
||||
|
||||
// We didn't manage to unregister ourselves from the state.
|
||||
if (state == SET) {
|
||||
unreachable; // AutoResetEvent notified without waking up the waiting thread
|
||||
} else if (state != UNSET) {
|
||||
unreachable; // multiple waiting threads on the same AutoResetEvent observed when timing out
|
||||
}
|
||||
|
||||
// This menas a set() thread saw our ResetEvent pointer, acquired it, and is trying to wake it up.
|
||||
// We need to wait for it to wake up our ResetEvent before we can return and invalidate it.
|
||||
// We don't return error.TimedOut here as it technically notified us while we were "timing out".
|
||||
reset_event.wait();
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
pub fn set(self: *AutoResetEvent) void {
|
||||
var state = @atomicLoad(usize, &self.state, .SeqCst);
|
||||
while (true) {
|
||||
// If the AutoResetEvent is already set, there is nothing else left to do
|
||||
if (state == SET) {
|
||||
return;
|
||||
}
|
||||
|
||||
// If the AutoResetEvent isn't set,
|
||||
// then try to leave a notification for the wait() thread that we set() it.
|
||||
if (state == UNSET) {
|
||||
state = @cmpxchgWeak(
|
||||
usize,
|
||||
&self.state,
|
||||
UNSET,
|
||||
SET,
|
||||
.SeqCst,
|
||||
.SeqCst,
|
||||
) orelse return;
|
||||
continue;
|
||||
}
|
||||
|
||||
// There is a ResetEvent pointer registered on the AutoResetEvent event thats waiting.
|
||||
// Try to acquire ownership of it so that we can wake it up.
|
||||
// This also resets the AutoResetEvent so that there is no race condition as defined above.
|
||||
if (@cmpxchgWeak(
|
||||
usize,
|
||||
&self.state,
|
||||
state,
|
||||
UNSET,
|
||||
.SeqCst,
|
||||
.SeqCst,
|
||||
)) |new_state| {
|
||||
state = new_state;
|
||||
continue;
|
||||
}
|
||||
|
||||
const reset_event = @intToPtr(*align(event_align) StaticResetEvent, state);
|
||||
reset_event.set();
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
test "basic usage" {
|
||||
// test local code paths
|
||||
{
|
||||
var event = AutoResetEvent{};
|
||||
testing.expectError(error.TimedOut, event.timedWait(1));
|
||||
event.set();
|
||||
event.wait();
|
||||
}
|
||||
|
||||
// test cross-thread signaling
|
||||
if (builtin.single_threaded)
|
||||
return;
|
||||
|
||||
const Context = struct {
|
||||
value: u128 = 0,
|
||||
in: AutoResetEvent = AutoResetEvent{},
|
||||
out: AutoResetEvent = AutoResetEvent{},
|
||||
|
||||
const Self = @This();
|
||||
|
||||
fn sender(self: *Self) void {
|
||||
testing.expect(self.value == 0);
|
||||
self.value = 1;
|
||||
self.out.set();
|
||||
|
||||
self.in.wait();
|
||||
testing.expect(self.value == 2);
|
||||
self.value = 3;
|
||||
self.out.set();
|
||||
|
||||
self.in.wait();
|
||||
testing.expect(self.value == 4);
|
||||
}
|
||||
|
||||
fn receiver(self: *Self) void {
|
||||
self.out.wait();
|
||||
testing.expect(self.value == 1);
|
||||
self.value = 2;
|
||||
self.in.set();
|
||||
|
||||
self.out.wait();
|
||||
testing.expect(self.value == 3);
|
||||
self.value = 4;
|
||||
self.in.set();
|
||||
}
|
||||
};
|
||||
|
||||
var context = Context{};
|
||||
const send_thread = try std.Thread.spawn(&context, Context.sender);
|
||||
const recv_thread = try std.Thread.spawn(&context, Context.receiver);
|
||||
|
||||
send_thread.wait();
|
||||
recv_thread.wait();
|
||||
}
|
||||
182
lib/std/Thread/Condition.zig
Normal file
182
lib/std/Thread/Condition.zig
Normal file
@@ -0,0 +1,182 @@
|
||||
// SPDX-License-Identifier: MIT
|
||||
// Copyright (c) 2015-2021 Zig Contributors
|
||||
// This file is part of [zig](https://ziglang.org/), which is MIT licensed.
|
||||
// The MIT license requires this copyright notice to be included in all copies
|
||||
// and substantial portions of the software.
|
||||
|
||||
//! A condition provides a way for a kernel thread to block until it is signaled
|
||||
//! to wake up. Spurious wakeups are possible.
|
||||
//! This API supports static initialization and does not require deinitialization.
|
||||
|
||||
impl: Impl,
|
||||
|
||||
const std = @import("../std.zig");
|
||||
const Condition = @This();
|
||||
const windows = std.os.windows;
|
||||
const linux = std.os.linux;
|
||||
const Mutex = std.Thread.Mutex;
|
||||
const assert = std.debug.assert;
|
||||
|
||||
const Impl = if (std.builtin.single_threaded)
|
||||
SingleThreadedCondition
|
||||
else if (std.Target.current.os.tag == .windows)
|
||||
WindowsCondition
|
||||
else if (std.Thread.use_pthreads)
|
||||
PthreadCondition
|
||||
else
|
||||
AtomicCondition;
|
||||
|
||||
pub const SingleThreadedCondition = struct {
|
||||
pub fn wait(cond: *SingleThreadedCondition, mutex: *Mutex) void {
|
||||
unreachable; // deadlock detected
|
||||
}
|
||||
|
||||
pub fn signal(cond: *SingleThreadedCondition) void {}
|
||||
|
||||
pub fn broadcast(cond: *SingleThreadedCondition) void {}
|
||||
};
|
||||
|
||||
pub const WindowsCondition = struct {
|
||||
cond: windows.CONDITION_VARIABLE = windows.CONDITION_VARIABLE_INIT,
|
||||
|
||||
pub fn wait(cond: *WindowsCondition, mutex: *Mutex) void {
|
||||
const rc = windows.SleepConditionVariableSRW(
|
||||
&cond.cond,
|
||||
&mutex.srwlock,
|
||||
windows.INFINITE,
|
||||
@as(windows.ULONG, 0),
|
||||
);
|
||||
assert(rc != windows.FALSE);
|
||||
}
|
||||
|
||||
pub fn signal(cond: *WindowsCondition) void {
|
||||
windows.WakeConditionVariable(&cond.cond);
|
||||
}
|
||||
|
||||
pub fn broadcast(cond: *WindowsCondition) void {
|
||||
windows.WakeAllConditionVariable(&cond.cond);
|
||||
}
|
||||
};
|
||||
|
||||
pub const PthreadCondition = struct {
|
||||
cond: std.c.pthread_cond_t = .{},
|
||||
|
||||
pub fn wait(cond: *PthreadCondition, mutex: *Mutex) void {
|
||||
const rc = std.c.pthread_cond_wait(&cond.cond, &mutex.mutex);
|
||||
assert(rc == 0);
|
||||
}
|
||||
|
||||
pub fn signal(cond: *PthreadCondition) void {
|
||||
const rc = std.c.pthread_cond_signal(&cond.cond);
|
||||
assert(rc == 0);
|
||||
}
|
||||
|
||||
pub fn broadcast(cond: *PthreadCondition) void {
|
||||
const rc = std.c.pthread_cond_broadcast(&cond.cond);
|
||||
assert(rc == 0);
|
||||
}
|
||||
};
|
||||
|
||||
pub const AtomicCondition = struct {
|
||||
pending: bool = false,
|
||||
queue_mutex: Mutex = .{},
|
||||
queue_list: QueueList = .{},
|
||||
|
||||
pub const QueueList = std.SinglyLinkedList(QueueItem);
|
||||
|
||||
pub const QueueItem = struct {
|
||||
futex: i32 = 0,
|
||||
|
||||
fn wait(cond: *@This()) void {
|
||||
while (@atomicLoad(i32, &cond.futex, .Acquire) == 0) {
|
||||
switch (std.Target.current.os.tag) {
|
||||
.linux => {
|
||||
switch (linux.getErrno(linux.futex_wait(
|
||||
&cond.futex,
|
||||
linux.FUTEX_PRIVATE_FLAG | linux.FUTEX_WAIT,
|
||||
0,
|
||||
null,
|
||||
))) {
|
||||
0 => {},
|
||||
std.os.EINTR => {},
|
||||
std.os.EAGAIN => {},
|
||||
else => unreachable,
|
||||
}
|
||||
},
|
||||
else => spinLoopHint(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn notify(cond: *@This()) void {
|
||||
@atomicStore(i32, &cond.futex, 1, .Release);
|
||||
|
||||
switch (std.Target.current.os.tag) {
|
||||
.linux => {
|
||||
switch (linux.getErrno(linux.futex_wake(
|
||||
&cond.futex,
|
||||
linux.FUTEX_PRIVATE_FLAG | linux.FUTEX_WAKE,
|
||||
1,
|
||||
))) {
|
||||
0 => {},
|
||||
std.os.EFAULT => {},
|
||||
else => unreachable,
|
||||
}
|
||||
},
|
||||
else => {},
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
pub fn wait(cond: *AtomicCondition, mutex: *Mutex) void {
|
||||
var waiter = QueueList.Node{ .data = .{} };
|
||||
|
||||
{
|
||||
const held = cond.queue_mutex.acquire();
|
||||
defer held.release();
|
||||
|
||||
cond.queue_list.prepend(&waiter);
|
||||
@atomicStore(bool, &cond.pending, true, .SeqCst);
|
||||
}
|
||||
|
||||
mutex.unlock();
|
||||
waiter.data.wait();
|
||||
mutex.lock();
|
||||
}
|
||||
|
||||
pub fn signal(cond: *AtomicCondition) void {
|
||||
if (@atomicLoad(bool, &cond.pending, .SeqCst) == false)
|
||||
return;
|
||||
|
||||
const maybe_waiter = blk: {
|
||||
const held = cond.queue_mutex.acquire();
|
||||
defer held.release();
|
||||
|
||||
const maybe_waiter = cond.queue_list.popFirst();
|
||||
@atomicStore(bool, &cond.pending, cond.queue_list.first != null, .SeqCst);
|
||||
break :blk maybe_waiter;
|
||||
};
|
||||
|
||||
if (maybe_waiter) |waiter|
|
||||
waiter.data.notify();
|
||||
}
|
||||
|
||||
pub fn broadcast(cond: *AtomicCondition) void {
|
||||
if (@atomicLoad(bool, &cond.pending, .SeqCst) == false)
|
||||
return;
|
||||
|
||||
@atomicStore(bool, &cond.pending, false, .SeqCst);
|
||||
|
||||
var waiters = blk: {
|
||||
const held = cond.queue_mutex.acquire();
|
||||
defer held.release();
|
||||
|
||||
const waiters = cond.queue_list;
|
||||
cond.queue_list = .{};
|
||||
break :blk waiters;
|
||||
};
|
||||
|
||||
while (waiters.popFirst()) |waiter|
|
||||
waiter.data.notify();
|
||||
}
|
||||
};
|
||||
303
lib/std/Thread/Mutex.zig
Normal file
303
lib/std/Thread/Mutex.zig
Normal file
@@ -0,0 +1,303 @@
|
||||
// SPDX-License-Identifier: MIT
|
||||
// Copyright (c) 2015-2021 Zig Contributors
|
||||
// This file is part of [zig](https://ziglang.org/), which is MIT licensed.
|
||||
// The MIT license requires this copyright notice to be included in all copies
|
||||
// and substantial portions of the software.
|
||||
|
||||
//! Lock may be held only once. If the same thread tries to acquire
|
||||
//! the same mutex twice, it deadlocks. This type supports static
|
||||
//! initialization and is at most `@sizeOf(usize)` in size. When an
|
||||
//! application is built in single threaded release mode, all the
|
||||
//! functions are no-ops. In single threaded debug mode, there is
|
||||
//! deadlock detection.
|
||||
//!
|
||||
//! Example usage:
|
||||
//! var m = Mutex{};
|
||||
//!
|
||||
//! const lock = m.acquire();
|
||||
//! defer lock.release();
|
||||
//! ... critical code
|
||||
//!
|
||||
//! Non-blocking:
|
||||
//! if (m.tryAcquire) |lock| {
|
||||
//! defer lock.release();
|
||||
//! // ... critical section
|
||||
//! } else {
|
||||
//! // ... lock not acquired
|
||||
//! }
|
||||
|
||||
impl: Impl = .{},
|
||||
|
||||
const Mutex = @This();
|
||||
const std = @import("../std.zig");
|
||||
const builtin = std.builtin;
|
||||
const os = std.os;
|
||||
const assert = std.debug.assert;
|
||||
const windows = os.windows;
|
||||
const linux = os.linux;
|
||||
const testing = std.testing;
|
||||
const StaticResetEvent = std.thread.StaticResetEvent;
|
||||
|
||||
pub const Held = struct {
|
||||
impl: *Impl,
|
||||
|
||||
pub fn release(held: Held) void {
|
||||
held.impl.release();
|
||||
}
|
||||
};
|
||||
|
||||
/// Try to acquire the mutex without blocking. Returns null if
|
||||
/// the mutex is unavailable. Otherwise returns Held. Call
|
||||
/// release on Held.
|
||||
pub fn tryAcquire(m: *Mutex) ?Held {
|
||||
if (m.impl.tryAcquire()) {
|
||||
return Held{ .impl = &m.impl };
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/// Acquire the mutex. Deadlocks if the mutex is already
|
||||
/// held by the calling thread.
|
||||
pub fn acquire(m: *Mutex) Held {
|
||||
m.impl.acquire();
|
||||
return .{ .impl = &m.impl };
|
||||
}
|
||||
|
||||
const Impl = if (builtin.single_threaded)
|
||||
Dummy
|
||||
else if (builtin.os.tag == .windows)
|
||||
WindowsMutex
|
||||
else if (std.Thread.use_pthreads)
|
||||
PthreadMutex
|
||||
else
|
||||
AtomicMutex;
|
||||
|
||||
pub const AtomicMutex = struct {
|
||||
state: State = .unlocked,
|
||||
|
||||
const State = enum(i32) {
|
||||
unlocked,
|
||||
locked,
|
||||
waiting,
|
||||
};
|
||||
|
||||
pub fn tryAcquire(self: *AtomicMutex) bool {
|
||||
return @cmpxchgStrong(
|
||||
State,
|
||||
&self.state,
|
||||
.unlocked,
|
||||
.locked,
|
||||
.Acquire,
|
||||
.Monotonic,
|
||||
) == null;
|
||||
}
|
||||
|
||||
pub fn acquire(self: *AtomicMutex) void {
|
||||
switch (@atomicRmw(State, &self.state, .Xchg, .locked, .Acquire)) {
|
||||
.unlocked => {},
|
||||
else => |s| self.lockSlow(s),
|
||||
}
|
||||
}
|
||||
|
||||
fn lockSlow(self: *AtomicMutex, current_state: State) void {
|
||||
@setCold(true);
|
||||
var new_state = current_state;
|
||||
|
||||
var spin: u8 = 0;
|
||||
while (spin < 100) : (spin += 1) {
|
||||
const state = @cmpxchgWeak(
|
||||
State,
|
||||
&self.state,
|
||||
.unlocked,
|
||||
new_state,
|
||||
.Acquire,
|
||||
.Monotonic,
|
||||
) orelse return;
|
||||
|
||||
switch (state) {
|
||||
.unlocked => {},
|
||||
.locked => {},
|
||||
.waiting => break,
|
||||
}
|
||||
|
||||
var iter = std.math.min(32, spin + 1);
|
||||
while (iter > 0) : (iter -= 1)
|
||||
std.Thread.spinLoopHint();
|
||||
}
|
||||
|
||||
new_state = .waiting;
|
||||
while (true) {
|
||||
switch (@atomicRmw(State, &self.state, .Xchg, new_state, .Acquire)) {
|
||||
.unlocked => return,
|
||||
else => {},
|
||||
}
|
||||
switch (std.Target.current.os.tag) {
|
||||
.linux => {
|
||||
switch (linux.getErrno(linux.futex_wait(
|
||||
@ptrCast(*const i32, &self.state),
|
||||
linux.FUTEX_PRIVATE_FLAG | linux.FUTEX_WAIT,
|
||||
@enumToInt(new_state),
|
||||
null,
|
||||
))) {
|
||||
0 => {},
|
||||
std.os.EINTR => {},
|
||||
std.os.EAGAIN => {},
|
||||
else => unreachable,
|
||||
}
|
||||
},
|
||||
else => std.Thread.spinLoopHint(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn release(self: *AtomicMutex) void {
|
||||
switch (@atomicRmw(State, &self.state, .Xchg, .unlocked, .Release)) {
|
||||
.unlocked => unreachable,
|
||||
.locked => {},
|
||||
.waiting => self.unlockSlow(),
|
||||
}
|
||||
}
|
||||
|
||||
fn unlockSlow(self: *AtomicMutex) void {
|
||||
@setCold(true);
|
||||
|
||||
switch (std.Target.current.os.tag) {
|
||||
.linux => {
|
||||
switch (linux.getErrno(linux.futex_wake(
|
||||
@ptrCast(*const i32, &self.state),
|
||||
linux.FUTEX_PRIVATE_FLAG | linux.FUTEX_WAKE,
|
||||
1,
|
||||
))) {
|
||||
0 => {},
|
||||
std.os.EFAULT => {},
|
||||
else => unreachable,
|
||||
}
|
||||
},
|
||||
else => {},
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
pub const PthreadMutex = struct {
|
||||
pthread_mutex: std.c.pthread_mutex_t = .{},
|
||||
|
||||
/// Try to acquire the mutex without blocking. Returns null if
|
||||
/// the mutex is unavailable. Otherwise returns Held. Call
|
||||
/// release on Held.
|
||||
pub fn tryAcquire(self: *PthreadMutex) bool {
|
||||
return std.c.pthread_mutex_trylock(&self.pthread_mutex) == 0;
|
||||
}
|
||||
|
||||
/// Acquire the mutex. Will deadlock if the mutex is already
|
||||
/// held by the calling thread.
|
||||
pub fn acquire(self: *PthreadMutex) void {
|
||||
switch (std.c.pthread_mutex_lock(&self.pthread_mutex)) {
|
||||
0 => return,
|
||||
std.c.EINVAL => unreachable,
|
||||
std.c.EBUSY => unreachable,
|
||||
std.c.EAGAIN => unreachable,
|
||||
std.c.EDEADLK => unreachable,
|
||||
std.c.EPERM => unreachable,
|
||||
else => unreachable,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn release(self: *PthreadMutex) void {
|
||||
switch (std.c.pthread_mutex_unlock(&self.pthread_mutex)) {
|
||||
0 => return,
|
||||
std.c.EINVAL => unreachable,
|
||||
std.c.EAGAIN => unreachable,
|
||||
std.c.EPERM => unreachable,
|
||||
else => unreachable,
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
/// This has the sematics as `Mutex`, however it does not actually do any
|
||||
/// synchronization. Operations are safety-checked no-ops.
|
||||
pub const Dummy = struct {
|
||||
lock: @TypeOf(lock_init) = lock_init,
|
||||
|
||||
const lock_init = if (std.debug.runtime_safety) false else {};
|
||||
|
||||
/// Try to acquire the mutex without blocking. Returns null if
|
||||
/// the mutex is unavailable. Otherwise returns Held. Call
|
||||
/// release on Held.
|
||||
pub fn tryAcquire(self: *Dummy) bool {
|
||||
if (std.debug.runtime_safety) {
|
||||
if (self.lock) return false;
|
||||
self.lock = true;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/// Acquire the mutex. Will deadlock if the mutex is already
|
||||
/// held by the calling thread.
|
||||
pub fn acquire(self: *Dummy) void {
|
||||
return self.tryAcquire() orelse @panic("deadlock detected");
|
||||
}
|
||||
|
||||
pub fn release(self: *Dummy) void {
|
||||
if (std.debug.runtime_safety) {
|
||||
self.mutex.lock = false;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
const WindowsMutex = struct {
|
||||
srwlock: windows.SRWLOCK = windows.SRWLOCK_INIT,
|
||||
|
||||
pub fn tryAcquire(self: *WindowsMutex) bool {
|
||||
return TryAcquireSRWLockExclusive(&self.srwlock) != system.FALSE;
|
||||
}
|
||||
|
||||
pub fn acquire(self: *WindowsMutex) void {
|
||||
AcquireSRWLockExclusive(&self.srwlock);
|
||||
}
|
||||
|
||||
pub fn release(self: *WindowsMutex) void {
|
||||
ReleaseSRWLockExclusive(&self.srwlock);
|
||||
}
|
||||
};
|
||||
|
||||
const TestContext = struct {
|
||||
mutex: *Mutex,
|
||||
data: i128,
|
||||
|
||||
const incr_count = 10000;
|
||||
};
|
||||
|
||||
test "basic usage" {
|
||||
var mutex = Mutex{};
|
||||
|
||||
var context = TestContext{
|
||||
.mutex = &mutex,
|
||||
.data = 0,
|
||||
};
|
||||
|
||||
if (builtin.single_threaded) {
|
||||
worker(&context);
|
||||
testing.expect(context.data == TestContext.incr_count);
|
||||
} else {
|
||||
const thread_count = 10;
|
||||
var threads: [thread_count]*std.Thread = undefined;
|
||||
for (threads) |*t| {
|
||||
t.* = try std.Thread.spawn(&context, worker);
|
||||
}
|
||||
for (threads) |t|
|
||||
t.wait();
|
||||
|
||||
testing.expect(context.data == thread_count * TestContext.incr_count);
|
||||
}
|
||||
}
|
||||
|
||||
fn worker(ctx: *TestContext) void {
|
||||
var i: usize = 0;
|
||||
while (i != TestContext.incr_count) : (i += 1) {
|
||||
const held = ctx.mutex.acquire();
|
||||
defer held.release();
|
||||
|
||||
ctx.data += 1;
|
||||
}
|
||||
}
|
||||
297
lib/std/Thread/ResetEvent.zig
Normal file
297
lib/std/Thread/ResetEvent.zig
Normal file
@@ -0,0 +1,297 @@
|
||||
// SPDX-License-Identifier: MIT
|
||||
// Copyright (c) 2015-2021 Zig Contributors
|
||||
// This file is part of [zig](https://ziglang.org/), which is MIT licensed.
|
||||
// The MIT license requires this copyright notice to be included in all copies
|
||||
// and substantial portions of the software.
|
||||
|
||||
//! A thread-safe resource which supports blocking until signaled.
|
||||
//! This API is for kernel threads, not evented I/O.
|
||||
//! This API requires being initialized at runtime, and initialization
|
||||
//! can fail. Once initialized, the core operations cannot fail.
|
||||
//! If you need an abstraction that cannot fail to be initialized, see
|
||||
//! `std.Thread.StaticResetEvent`. However if you can handle initialization failure,
|
||||
//! it is preferred to use `ResetEvent`.
|
||||
|
||||
const ResetEvent = @This();
|
||||
const std = @import("../std.zig");
|
||||
const builtin = std.builtin;
|
||||
const testing = std.testing;
|
||||
const assert = std.debug.assert;
|
||||
const c = std.c;
|
||||
const os = std.os;
|
||||
const time = std.time;
|
||||
|
||||
impl: Impl,
|
||||
|
||||
pub const Impl = if (builtin.single_threaded)
|
||||
std.Thread.StaticResetEvent.DebugEvent
|
||||
else if (std.Target.current.isDarwin())
|
||||
DarwinEvent
|
||||
else if (std.Thread.use_pthreads)
|
||||
PosixEvent
|
||||
else
|
||||
std.Thread.StaticResetEvent.AtomicEvent;
|
||||
|
||||
pub const InitError = error{SystemResources};
|
||||
|
||||
/// After `init`, it is legal to call any other function.
|
||||
pub fn init(ev: *ResetEvent) InitError!void {
|
||||
return ev.impl.init();
|
||||
}
|
||||
|
||||
/// This function is not thread-safe.
|
||||
/// After `deinit`, the only legal function to call is `init`.
|
||||
pub fn deinit(ev: *ResetEvent) void {
|
||||
return ev.impl.deinit();
|
||||
}
|
||||
|
||||
/// Sets the event if not already set and wakes up all the threads waiting on
|
||||
/// the event. It is safe to call `set` multiple times before calling `wait`.
|
||||
/// However it is illegal to call `set` after `wait` is called until the event
|
||||
/// is `reset`. This function is thread-safe.
|
||||
pub fn set(ev: *ResetEvent) void {
|
||||
return ev.impl.set();
|
||||
}
|
||||
|
||||
/// Resets the event to its original, unset state.
|
||||
/// This function is *not* thread-safe. It is equivalent to calling
|
||||
/// `deinit` followed by `init` but without the possibility of failure.
|
||||
pub fn reset(ev: *ResetEvent) void {
|
||||
return ev.impl.reset();
|
||||
}
|
||||
|
||||
/// Wait for the event to be set by blocking the current thread.
|
||||
/// Thread-safe. No spurious wakeups.
|
||||
/// Upon return from `wait`, the only functions available to be called
|
||||
/// in `ResetEvent` are `reset` and `deinit`.
|
||||
pub fn wait(ev: *ResetEvent) void {
|
||||
return ev.impl.wait();
|
||||
}
|
||||
|
||||
pub const TimedWaitResult = enum { event_set, timed_out };
|
||||
|
||||
/// Wait for the event to be set by blocking the current thread.
|
||||
/// A timeout in nanoseconds can be provided as a hint for how
|
||||
/// long the thread should block on the unset event before returning
|
||||
/// `TimedWaitResult.timed_out`.
|
||||
/// Thread-safe. No precision of timing is guaranteed.
|
||||
/// Upon return from `wait`, the only functions available to be called
|
||||
/// in `ResetEvent` are `reset` and `deinit`.
|
||||
pub fn timedWait(ev: *ResetEvent, timeout_ns: u64) TimedWaitResult {
|
||||
return ev.impl.timedWait(timeout_ns);
|
||||
}
|
||||
|
||||
/// Apple has decided to not support POSIX semaphores, so we go with a
|
||||
/// different approach using Grand Central Dispatch. This API is exposed
|
||||
/// by libSystem so it is guaranteed to be available on all Darwin platforms.
|
||||
pub const DarwinEvent = struct {
|
||||
sem: c.dispatch_semaphore_t = undefined,
|
||||
|
||||
pub fn init(ev: *DarwinEvent) !void {
|
||||
ev.* = .{
|
||||
.sem = c.dispatch_semaphore_create(0) orelse return error.SystemResources,
|
||||
};
|
||||
}
|
||||
|
||||
pub fn deinit(ev: *DarwinEvent) void {
|
||||
c.dispatch_release(ev.sem);
|
||||
ev.* = undefined;
|
||||
}
|
||||
|
||||
pub fn set(ev: *DarwinEvent) void {
|
||||
// Empirically this returns the numerical value of the semaphore.
|
||||
_ = c.dispatch_semaphore_signal(ev.sem);
|
||||
}
|
||||
|
||||
pub fn wait(ev: *DarwinEvent) void {
|
||||
assert(c.dispatch_semaphore_wait(ev.sem, c.DISPATCH_TIME_FOREVER) == 0);
|
||||
}
|
||||
|
||||
pub fn timedWait(ev: *DarwinEvent, timeout_ns: u64) TimedWaitResult {
|
||||
const t = c.dispatch_time(c.DISPATCH_TIME_NOW, @intCast(i64, timeout_ns));
|
||||
if (c.dispatch_semaphore_wait(ev.sem, t) != 0) {
|
||||
return .timed_out;
|
||||
} else {
|
||||
return .event_set;
|
||||
}
|
||||
}
|
||||
|
||||
pub fn reset(ev: *DarwinEvent) void {
|
||||
// Keep calling until the semaphore goes back down to 0.
|
||||
while (c.dispatch_semaphore_wait(ev.sem, c.DISPATCH_TIME_NOW) == 0) {}
|
||||
}
|
||||
};
|
||||
|
||||
/// POSIX semaphores must be initialized at runtime because they are allowed to
|
||||
/// be implemented as file descriptors, in which case initialization would require
|
||||
/// a syscall to open the fd.
|
||||
pub const PosixEvent = struct {
|
||||
sem: c.sem_t = undefined,
|
||||
|
||||
pub fn init(ev: *PosixEvent) !void {
|
||||
switch (c.getErrno(c.sem_init(&ev.sem, 0, 0))) {
|
||||
0 => return,
|
||||
else => return error.SystemResources,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn deinit(ev: *PosixEvent) void {
|
||||
assert(c.sem_destroy(&ev.sem) == 0);
|
||||
ev.* = undefined;
|
||||
}
|
||||
|
||||
pub fn set(ev: *PosixEvent) void {
|
||||
assert(c.sem_post(&ev.sem) == 0);
|
||||
}
|
||||
|
||||
pub fn wait(ev: *PosixEvent) void {
|
||||
while (true) {
|
||||
switch (c.getErrno(c.sem_wait(&ev.sem))) {
|
||||
0 => return,
|
||||
c.EINTR => continue,
|
||||
c.EINVAL => unreachable,
|
||||
else => unreachable,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn timedWait(ev: *PosixEvent, timeout_ns: u64) TimedWaitResult {
|
||||
var ts: os.timespec = undefined;
|
||||
var timeout_abs = timeout_ns;
|
||||
os.clock_gettime(os.CLOCK_REALTIME, &ts) catch return .timed_out;
|
||||
timeout_abs += @intCast(u64, ts.tv_sec) * time.ns_per_s;
|
||||
timeout_abs += @intCast(u64, ts.tv_nsec);
|
||||
ts.tv_sec = @intCast(@TypeOf(ts.tv_sec), @divFloor(timeout_abs, time.ns_per_s));
|
||||
ts.tv_nsec = @intCast(@TypeOf(ts.tv_nsec), @mod(timeout_abs, time.ns_per_s));
|
||||
while (true) {
|
||||
switch (c.getErrno(c.sem_timedwait(&ev.sem, &ts))) {
|
||||
0 => return .event_set,
|
||||
c.EINTR => continue,
|
||||
c.EINVAL => unreachable,
|
||||
c.ETIMEDOUT => return .timed_out,
|
||||
else => unreachable,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn reset(ev: *PosixEvent) void {
|
||||
while (true) {
|
||||
switch (c.getErrno(c.sem_trywait(&ev.sem))) {
|
||||
0 => continue, // Need to make it go to zero.
|
||||
c.EINTR => continue,
|
||||
c.EINVAL => unreachable,
|
||||
c.EAGAIN => return, // The semaphore currently has the value zero.
|
||||
else => unreachable,
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
test "basic usage" {
|
||||
var event: ResetEvent = undefined;
|
||||
try event.init();
|
||||
defer event.deinit();
|
||||
|
||||
// test event setting
|
||||
event.set();
|
||||
|
||||
// test event resetting
|
||||
event.reset();
|
||||
|
||||
// test event waiting (non-blocking)
|
||||
event.set();
|
||||
event.wait();
|
||||
event.reset();
|
||||
|
||||
event.set();
|
||||
testing.expectEqual(TimedWaitResult.event_set, event.timedWait(1));
|
||||
|
||||
// test cross-thread signaling
|
||||
if (builtin.single_threaded)
|
||||
return;
|
||||
|
||||
const Context = struct {
|
||||
const Self = @This();
|
||||
|
||||
value: u128,
|
||||
in: ResetEvent,
|
||||
out: ResetEvent,
|
||||
|
||||
fn init(self: *Self) !void {
|
||||
self.* = .{
|
||||
.value = 0,
|
||||
.in = undefined,
|
||||
.out = undefined,
|
||||
};
|
||||
try self.in.init();
|
||||
try self.out.init();
|
||||
}
|
||||
|
||||
fn deinit(self: *Self) void {
|
||||
self.in.deinit();
|
||||
self.out.deinit();
|
||||
self.* = undefined;
|
||||
}
|
||||
|
||||
fn sender(self: *Self) void {
|
||||
// update value and signal input
|
||||
testing.expect(self.value == 0);
|
||||
self.value = 1;
|
||||
self.in.set();
|
||||
|
||||
// wait for receiver to update value and signal output
|
||||
self.out.wait();
|
||||
testing.expect(self.value == 2);
|
||||
|
||||
// update value and signal final input
|
||||
self.value = 3;
|
||||
self.in.set();
|
||||
}
|
||||
|
||||
fn receiver(self: *Self) void {
|
||||
// wait for sender to update value and signal input
|
||||
self.in.wait();
|
||||
assert(self.value == 1);
|
||||
|
||||
// update value and signal output
|
||||
self.in.reset();
|
||||
self.value = 2;
|
||||
self.out.set();
|
||||
|
||||
// wait for sender to update value and signal final input
|
||||
self.in.wait();
|
||||
assert(self.value == 3);
|
||||
}
|
||||
|
||||
fn sleeper(self: *Self) void {
|
||||
self.in.set();
|
||||
time.sleep(time.ns_per_ms * 2);
|
||||
self.value = 5;
|
||||
self.out.set();
|
||||
}
|
||||
|
||||
fn timedWaiter(self: *Self) !void {
|
||||
self.in.wait();
|
||||
testing.expectEqual(TimedWaitResult.timed_out, self.out.timedWait(time.ns_per_us));
|
||||
try self.out.timedWait(time.ns_per_ms * 100);
|
||||
testing.expect(self.value == 5);
|
||||
}
|
||||
};
|
||||
|
||||
var context: Context = undefined;
|
||||
try context.init();
|
||||
defer context.deinit();
|
||||
const receiver = try std.Thread.spawn(&context, Context.receiver);
|
||||
defer receiver.wait();
|
||||
context.sender();
|
||||
|
||||
if (false) {
|
||||
// I have now observed this fail on macOS, Windows, and Linux.
|
||||
// https://github.com/ziglang/zig/issues/7009
|
||||
var timed = Context.init();
|
||||
defer timed.deinit();
|
||||
const sleeper = try std.Thread.spawn(&timed, Context.sleeper);
|
||||
defer sleeper.wait();
|
||||
try timed.timedWaiter();
|
||||
}
|
||||
}
|
||||
308
lib/std/Thread/RwLock.zig
Normal file
308
lib/std/Thread/RwLock.zig
Normal file
@@ -0,0 +1,308 @@
|
||||
// SPDX-License-Identifier: MIT
|
||||
// Copyright (c) 2015-2021 Zig Contributors
|
||||
// This file is part of [zig](https://ziglang.org/), which is MIT licensed.
|
||||
// The MIT license requires this copyright notice to be included in all copies
|
||||
// and substantial portions of the software.
|
||||
|
||||
//! A lock that supports one writer or many readers.
|
||||
//! This API is for kernel threads, not evented I/O.
|
||||
//! This API requires being initialized at runtime, and initialization
|
||||
//! can fail. Once initialized, the core operations cannot fail.
|
||||
|
||||
impl: Impl,
|
||||
|
||||
const RwLock = @This();
|
||||
const std = @import("../std.zig");
|
||||
const builtin = std.builtin;
|
||||
const assert = std.debug.assert;
|
||||
const Mutex = std.Thread.Mutex;
|
||||
const Semaphore = std.Semaphore;
|
||||
const CondVar = std.CondVar;
|
||||
|
||||
pub const Impl = if (builtin.single_threaded)
|
||||
SingleThreadedRwLock
|
||||
else if (std.Thread.use_pthreads)
|
||||
PthreadRwLock
|
||||
else
|
||||
DefaultRwLock;
|
||||
|
||||
pub fn init(rwl: *RwLock) void {
|
||||
return rwl.impl.init();
|
||||
}
|
||||
|
||||
pub fn deinit(rwl: *RwLock) void {
|
||||
return rwl.impl.deinit();
|
||||
}
|
||||
|
||||
/// Attempts to obtain exclusive lock ownership.
|
||||
/// Returns `true` if the lock is obtained, `false` otherwise.
|
||||
pub fn tryLock(rwl: *RwLock) bool {
|
||||
return rwl.impl.tryLock();
|
||||
}
|
||||
|
||||
/// Blocks until exclusive lock ownership is acquired.
|
||||
pub fn lock(rwl: *RwLock) void {
|
||||
return rwl.impl.lock();
|
||||
}
|
||||
|
||||
/// Releases a held exclusive lock.
|
||||
/// Asserts the lock is held exclusively.
|
||||
pub fn unlock(rwl: *RwLock) void {
|
||||
return rwl.impl.unlock();
|
||||
}
|
||||
|
||||
/// Attempts to obtain shared lock ownership.
|
||||
/// Returns `true` if the lock is obtained, `false` otherwise.
|
||||
pub fn tryLockShared(rwl: *RwLock) bool {
|
||||
return rwl.impl.tryLockShared();
|
||||
}
|
||||
|
||||
/// Blocks until shared lock ownership is acquired.
|
||||
pub fn lockShared(rwl: *RwLock) void {
|
||||
return rwl.impl.lockShared();
|
||||
}
|
||||
|
||||
/// Releases a held shared lock.
|
||||
pub fn unlockShared(rwl: *RwLock) void {
|
||||
return rwl.impl.unlockShared();
|
||||
}
|
||||
|
||||
/// Single-threaded applications use this for deadlock checks in
|
||||
/// debug mode, and no-ops in release modes.
|
||||
pub const SingleThreadedRwLock = struct {
|
||||
state: enum { unlocked, locked_exclusive, locked_shared },
|
||||
shared_count: usize,
|
||||
|
||||
pub fn init(rwl: *SingleThreadedRwLock) void {
|
||||
rwl.* = .{
|
||||
.state = .unlocked,
|
||||
.shared_count = 0,
|
||||
};
|
||||
}
|
||||
|
||||
pub fn deinit(rwl: *SingleThreadedRwLock) void {
|
||||
assert(rwl.state == .unlocked);
|
||||
assert(rwl.shared_count == 0);
|
||||
}
|
||||
|
||||
/// Attempts to obtain exclusive lock ownership.
|
||||
/// Returns `true` if the lock is obtained, `false` otherwise.
|
||||
pub fn tryLock(rwl: *SingleThreadedRwLock) bool {
|
||||
switch (rwl.state) {
|
||||
.unlocked => {
|
||||
assert(rwl.shared_count == 0);
|
||||
rwl.state = .locked_exclusive;
|
||||
return true;
|
||||
},
|
||||
.locked_exclusive, .locked_shared => return false,
|
||||
}
|
||||
}
|
||||
|
||||
/// Blocks until exclusive lock ownership is acquired.
|
||||
pub fn lock(rwl: *SingleThreadedRwLock) void {
|
||||
assert(rwl.state == .unlocked); // deadlock detected
|
||||
assert(rwl.shared_count == 0); // corrupted state detected
|
||||
rwl.state = .locked_exclusive;
|
||||
}
|
||||
|
||||
/// Releases a held exclusive lock.
|
||||
/// Asserts the lock is held exclusively.
|
||||
pub fn unlock(rwl: *SingleThreadedRwLock) void {
|
||||
assert(rwl.state == .locked_exclusive);
|
||||
assert(rwl.shared_count == 0); // corrupted state detected
|
||||
rwl.state = .unlocked;
|
||||
}
|
||||
|
||||
/// Attempts to obtain shared lock ownership.
|
||||
/// Returns `true` if the lock is obtained, `false` otherwise.
|
||||
pub fn tryLockShared(rwl: *SingleThreadedRwLock) bool {
|
||||
switch (rwl.state) {
|
||||
.unlocked => {
|
||||
rwl.state = .locked_shared;
|
||||
assert(rwl.shared_count == 0);
|
||||
rwl.shared_count = 1;
|
||||
return true;
|
||||
},
|
||||
.locked_exclusive, .locked_shared => return false,
|
||||
}
|
||||
}
|
||||
|
||||
/// Blocks until shared lock ownership is acquired.
|
||||
pub fn lockShared(rwl: *SingleThreadedRwLock) void {
|
||||
switch (rwl.state) {
|
||||
.unlocked => {
|
||||
rwl.state = .locked_shared;
|
||||
assert(rwl.shared_count == 0);
|
||||
rwl.shared_count = 1;
|
||||
},
|
||||
.locked_shared => {
|
||||
rwl.shared_count += 1;
|
||||
},
|
||||
.locked_exclusive => unreachable, // deadlock detected
|
||||
}
|
||||
}
|
||||
|
||||
/// Releases a held shared lock.
|
||||
pub fn unlockShared(rwl: *SingleThreadedRwLock) void {
|
||||
switch (rwl.state) {
|
||||
.unlocked => unreachable, // too many calls to `unlockShared`
|
||||
.locked_exclusive => unreachable, // exclusively held lock
|
||||
.locked_shared => {
|
||||
rwl.shared_count -= 1;
|
||||
if (rwl.shared_count == 0) {
|
||||
rwl.state = .unlocked;
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
pub const PthreadRwLock = struct {
|
||||
rwlock: pthread_rwlock_t,
|
||||
|
||||
pub fn init(rwl: *PthreadRwLock) void {
|
||||
rwl.* = .{ .rwlock = .{} };
|
||||
}
|
||||
|
||||
pub fn deinit(rwl: *PthreadRwLock) void {
|
||||
const safe_rc = switch (std.builtin.os.tag) {
|
||||
.dragonfly, .netbsd => std.os.EAGAIN,
|
||||
else => 0,
|
||||
};
|
||||
|
||||
const rc = std.c.pthread_rwlock_destroy(&rwl.rwlock);
|
||||
assert(rc == 0 or rc == safe_rc);
|
||||
|
||||
rwl.* = undefined;
|
||||
}
|
||||
|
||||
pub fn tryLock(rwl: *PthreadRwLock) bool {
|
||||
return pthread_rwlock_trywrlock(&rwl.rwlock) == 0;
|
||||
}
|
||||
|
||||
pub fn lock(rwl: *PthreadRwLock) void {
|
||||
const rc = pthread_rwlock_wrlock(&rwl.rwlock);
|
||||
assert(rc == 0);
|
||||
}
|
||||
|
||||
pub fn unlock(rwl: *PthreadRwLock) void {
|
||||
const rc = pthread_rwlock_unlock(&rwl.rwlock);
|
||||
assert(rc == 0);
|
||||
}
|
||||
|
||||
pub fn tryLockShared(rwl: *PthreadRwLock) bool {
|
||||
return pthread_rwlock_tryrdlock(&rwl.rwlock) == 0;
|
||||
}
|
||||
|
||||
pub fn lockShared(rwl: *PthreadRwLock) void {
|
||||
const rc = pthread_rwlock_rdlock(&rwl.rwlock);
|
||||
assert(rc == 0);
|
||||
}
|
||||
|
||||
pub fn unlockShared(rwl: *PthreadRwLock) void {
|
||||
const rc = pthread_rwlock_unlock(&rwl.rwlock);
|
||||
assert(rc == 0);
|
||||
}
|
||||
};
|
||||
|
||||
pub const DefaultRwLock = struct {
|
||||
state: usize,
|
||||
mutex: Mutex,
|
||||
semaphore: Semaphore,
|
||||
|
||||
const IS_WRITING: usize = 1;
|
||||
const WRITER: usize = 1 << 1;
|
||||
const READER: usize = 1 << (1 + std.meta.bitCount(Count));
|
||||
const WRITER_MASK: usize = std.math.maxInt(Count) << @ctz(usize, WRITER);
|
||||
const READER_MASK: usize = std.math.maxInt(Count) << @ctz(usize, READER);
|
||||
const Count = std.meta.Int(.unsigned, @divFloor(std.meta.bitCount(usize) - 1, 2));
|
||||
|
||||
pub fn init(rwl: *DefaultRwLock) void {
|
||||
rwl.* = .{
|
||||
.state = 0,
|
||||
.mutex = Mutex.init(),
|
||||
.semaphore = Semaphore.init(0),
|
||||
};
|
||||
}
|
||||
|
||||
pub fn deinit(rwl: *DefaultRwLock) void {
|
||||
rwl.semaphore.deinit();
|
||||
rwl.mutex.deinit();
|
||||
rwl.* = undefined;
|
||||
}
|
||||
|
||||
pub fn tryLock(rwl: *DefaultRwLock) bool {
|
||||
if (rwl.mutex.tryLock()) {
|
||||
const state = @atomicLoad(usize, &rwl.state, .SeqCst);
|
||||
if (state & READER_MASK == 0) {
|
||||
_ = @atomicRmw(usize, &rwl.state, .Or, IS_WRITING, .SeqCst);
|
||||
return true;
|
||||
}
|
||||
|
||||
rwl.mutex.unlock();
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
pub fn lock(rwl: *DefaultRwLock) void {
|
||||
_ = @atomicRmw(usize, &rwl.state, .Add, WRITER, .SeqCst);
|
||||
rwl.mutex.lock();
|
||||
|
||||
const state = @atomicRmw(usize, &rwl.state, .Or, IS_WRITING, .SeqCst);
|
||||
if (state & READER_MASK != 0)
|
||||
rwl.semaphore.wait();
|
||||
}
|
||||
|
||||
pub fn unlock(rwl: *DefaultRwLock) void {
|
||||
_ = @atomicRmw(usize, &rwl.state, .And, ~IS_WRITING, .SeqCst);
|
||||
rwl.mutex.unlock();
|
||||
}
|
||||
|
||||
pub fn tryLockShared(rwl: *DefaultRwLock) bool {
|
||||
const state = @atomicLoad(usize, &rwl.state, .SeqCst);
|
||||
if (state & (IS_WRITING | WRITER_MASK) == 0) {
|
||||
_ = @cmpxchgStrong(
|
||||
usize,
|
||||
&rwl.state,
|
||||
state,
|
||||
state + READER,
|
||||
.SeqCst,
|
||||
.SeqCst,
|
||||
) orelse return true;
|
||||
}
|
||||
|
||||
if (rwl.mutex.tryLock()) {
|
||||
_ = @atomicRmw(usize, &rwl.state, .Add, READER, .SeqCst);
|
||||
rwl.mutex.unlock();
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
pub fn lockShared(rwl: *DefaultRwLock) void {
|
||||
var state = @atomicLoad(usize, &rwl.state, .SeqCst);
|
||||
while (state & (IS_WRITING | WRITER_MASK) == 0) {
|
||||
state = @cmpxchgWeak(
|
||||
usize,
|
||||
&rwl.state,
|
||||
state,
|
||||
state + READER,
|
||||
.SeqCst,
|
||||
.SeqCst,
|
||||
) orelse return;
|
||||
}
|
||||
|
||||
rwl.mutex.lock();
|
||||
_ = @atomicRmw(usize, &rwl.state, .Add, READER, .SeqCst);
|
||||
rwl.mutex.unlock();
|
||||
}
|
||||
|
||||
pub fn unlockShared(rwl: *DefaultRwLock) void {
|
||||
const state = @atomicRmw(usize, &rwl.state, .Sub, READER, .SeqCst);
|
||||
|
||||
if ((state & READER_MASK == READER) and (state & IS_WRITING != 0))
|
||||
rwl.semaphore.post();
|
||||
}
|
||||
};
|
||||
39
lib/std/Thread/Semaphore.zig
Normal file
39
lib/std/Thread/Semaphore.zig
Normal file
@@ -0,0 +1,39 @@
|
||||
// SPDX-License-Identifier: MIT
|
||||
// Copyright (c) 2015-2021 Zig Contributors
|
||||
// This file is part of [zig](https://ziglang.org/), which is MIT licensed.
|
||||
// The MIT license requires this copyright notice to be included in all copies
|
||||
// and substantial portions of the software.
|
||||
|
||||
//! A semaphore is an unsigned integer that blocks the kernel thread if
|
||||
//! the number would become negative.
|
||||
//! This API supports static initialization and does not require deinitialization.
|
||||
|
||||
mutex: Mutex = .{},
|
||||
cond: Condition = .{},
|
||||
//! It is OK to initialize this field to any value.
|
||||
permits: usize = 0,
|
||||
|
||||
const RwLock = @This();
|
||||
const std = @import("../std.zig");
|
||||
const Mutex = std.Thread.Mutex;
|
||||
const Condition = std.Thread.Condition;
|
||||
|
||||
pub fn wait(sem: *Semaphore) void {
|
||||
const held = sem.mutex.acquire();
|
||||
defer held.release();
|
||||
|
||||
while (sem.permits == 0)
|
||||
sem.cond.wait(&sem.mutex);
|
||||
|
||||
sem.permits -= 1;
|
||||
if (sem.permits > 0)
|
||||
sem.cond.signal();
|
||||
}
|
||||
|
||||
pub fn post(sem: *Semaphore) void {
|
||||
const held = sem.mutex.acquire();
|
||||
defer held.release();
|
||||
|
||||
sem.permits += 1;
|
||||
sem.cond.signal();
|
||||
}
|
||||
396
lib/std/Thread/StaticResetEvent.zig
Normal file
396
lib/std/Thread/StaticResetEvent.zig
Normal file
@@ -0,0 +1,396 @@
|
||||
// SPDX-License-Identifier: MIT
|
||||
// Copyright (c) 2015-2021 Zig Contributors
|
||||
// This file is part of [zig](https://ziglang.org/), which is MIT licensed.
|
||||
// The MIT license requires this copyright notice to be included in all copies
|
||||
// and substantial portions of the software.
|
||||
|
||||
//! A thread-safe resource which supports blocking until signaled.
|
||||
//! This API is for kernel threads, not evented I/O.
|
||||
//! This API is statically initializable. It cannot fail to be initialized
|
||||
//! and it requires no deinitialization. The downside is that it may not
|
||||
//! integrate as cleanly into other synchronization APIs, or, in a worst case,
|
||||
//! may be forced to fall back on spin locking. As a rule of thumb, prefer
|
||||
//! to use `std.Thread.ResetEvent` when possible, and use `StaticResetEvent` when
|
||||
//! the logic needs stronger API guarantees.
|
||||
|
||||
const std = @import("../std.zig");
|
||||
const StaticResetEvent = @This();
|
||||
const SpinLock = std.SpinLock;
|
||||
const assert = std.debug.assert;
|
||||
const os = std.os;
|
||||
const time = std.time;
|
||||
const linux = std.os.linux;
|
||||
const windows = std.os.windows;
|
||||
const testing = std.testing;
|
||||
|
||||
impl: Impl = .{},
|
||||
|
||||
pub const Impl = if (std.builtin.single_threaded)
|
||||
DebugEvent
|
||||
else
|
||||
AtomicEvent;
|
||||
|
||||
/// Sets the event if not already set and wakes up all the threads waiting on
|
||||
/// the event. It is safe to call `set` multiple times before calling `wait`.
|
||||
/// However it is illegal to call `set` after `wait` is called until the event
|
||||
/// is `reset`. This function is thread-safe.
|
||||
pub fn set(ev: *StaticResetEvent) void {
|
||||
return ev.impl.set();
|
||||
}
|
||||
|
||||
/// Wait for the event to be set by blocking the current thread.
|
||||
/// Thread-safe. No spurious wakeups.
|
||||
/// Upon return from `wait`, the only function available to be called
|
||||
/// in `StaticResetEvent` is `reset`.
|
||||
pub fn wait(ev: *StaticResetEvent) void {
|
||||
return ev.impl.wait();
|
||||
}
|
||||
|
||||
/// Resets the event to its original, unset state.
|
||||
/// This function is *not* thread-safe. It is equivalent to calling
|
||||
/// `deinit` followed by `init` but without the possibility of failure.
|
||||
pub fn reset(ev: *StaticResetEvent) void {
|
||||
return ev.impl.reset();
|
||||
}
|
||||
|
||||
pub const TimedWaitResult = std.Thread.ResetEvent.TimedWaitResult;
|
||||
|
||||
/// Wait for the event to be set by blocking the current thread.
|
||||
/// A timeout in nanoseconds can be provided as a hint for how
|
||||
/// long the thread should block on the unset event before returning
|
||||
/// `TimedWaitResult.timed_out`.
|
||||
/// Thread-safe. No precision of timing is guaranteed.
|
||||
/// Upon return from `timedWait`, the only function available to be called
|
||||
/// in `StaticResetEvent` is `reset`.
|
||||
pub fn timedWait(ev: *StaticResetEvent, timeout_ns: u64) TimedWaitResult {
|
||||
return ev.impl.timedWait(timeout_ns);
|
||||
}
|
||||
|
||||
/// For single-threaded builds, we use this to detect deadlocks.
|
||||
/// In unsafe modes this ends up being no-ops.
|
||||
pub const DebugEvent = struct {
|
||||
state: State = State.unset,
|
||||
|
||||
const State = enum {
|
||||
unset,
|
||||
set,
|
||||
waited,
|
||||
};
|
||||
|
||||
/// This function is provided so that this type can be re-used inside
|
||||
/// `std.Thread.ResetEvent`.
|
||||
pub fn init(ev: *DebugEvent) void {
|
||||
ev.* = .{};
|
||||
}
|
||||
|
||||
/// This function is provided so that this type can be re-used inside
|
||||
/// `std.Thread.ResetEvent`.
|
||||
pub fn deinit(ev: *DebugEvent) void {
|
||||
ev.* = undefined;
|
||||
}
|
||||
|
||||
pub fn set(ev: *DebugEvent) void {
|
||||
switch (ev.state) {
|
||||
.unset => ev.state = .set,
|
||||
.set => {},
|
||||
.waited => unreachable, // Not allowed to call `set` until `reset`.
|
||||
}
|
||||
}
|
||||
|
||||
pub fn wait(ev: *DebugEvent) void {
|
||||
switch (ev.state) {
|
||||
.unset => unreachable, // Deadlock detected.
|
||||
.set => return,
|
||||
.waited => unreachable, // Not allowed to call `wait` until `reset`.
|
||||
}
|
||||
}
|
||||
|
||||
pub fn timedWait(ev: *DebugEvent, timeout: u64) TimedWaitResult {
|
||||
switch (ev.state) {
|
||||
.unset => return .timed_out,
|
||||
.set => return .event_set,
|
||||
.waited => unreachable, // Not allowed to call `wait` until `reset`.
|
||||
}
|
||||
}
|
||||
|
||||
pub fn reset(ev: *DebugEvent) void {
|
||||
ev.state = .unset;
|
||||
}
|
||||
};
|
||||
|
||||
pub const AtomicEvent = struct {
|
||||
waiters: u32 = 0,
|
||||
|
||||
const WAKE = 1 << 0;
|
||||
const WAIT = 1 << 1;
|
||||
|
||||
/// This function is provided so that this type can be re-used inside
|
||||
/// `std.Thread.ResetEvent`.
|
||||
pub fn init(ev: *AtomicEvent) void {
|
||||
ev.* = .{};
|
||||
}
|
||||
|
||||
/// This function is provided so that this type can be re-used inside
|
||||
/// `std.Thread.ResetEvent`.
|
||||
pub fn deinit(ev: *AtomicEvent) void {
|
||||
ev.* = undefined;
|
||||
}
|
||||
|
||||
pub fn set(ev: *AtomicEvent) void {
|
||||
const waiters = @atomicRmw(u32, &ev.waiters, .Xchg, WAKE, .Release);
|
||||
if (waiters >= WAIT) {
|
||||
return Futex.wake(&ev.waiters, waiters >> 1);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn wait(ev: *AtomicEvent) void {
|
||||
switch (ev.timedWait(null)) {
|
||||
.timed_out => unreachable,
|
||||
.event_set => return,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn timedWait(ev: *AtomicEvent, timeout: ?u64) TimedWaitResult {
|
||||
var waiters = @atomicLoad(u32, &ev.waiters, .Acquire);
|
||||
while (waiters != WAKE) {
|
||||
waiters = @cmpxchgWeak(u32, &ev.waiters, waiters, waiters + WAIT, .Acquire, .Acquire) orelse {
|
||||
if (Futex.wait(&ev.waiters, timeout)) |_| {
|
||||
return .event_set;
|
||||
} else |_| {
|
||||
return .timed_out;
|
||||
}
|
||||
};
|
||||
}
|
||||
return .event_set;
|
||||
}
|
||||
|
||||
pub fn reset(ev: *AtomicEvent) void {
|
||||
@atomicStore(u32, &ev.waiters, 0, .Monotonic);
|
||||
}
|
||||
|
||||
pub const Futex = switch (std.Target.current.os.tag) {
|
||||
.windows => WindowsFutex,
|
||||
.linux => LinuxFutex,
|
||||
else => SpinFutex,
|
||||
};
|
||||
|
||||
pub const SpinFutex = struct {
|
||||
fn wake(waiters: *u32, wake_count: u32) void {}
|
||||
|
||||
fn wait(waiters: *u32, timeout: ?u64) !void {
|
||||
var timer: time.Timer = undefined;
|
||||
if (timeout != null)
|
||||
timer = time.Timer.start() catch return error.TimedOut;
|
||||
|
||||
while (@atomicLoad(u32, waiters, .Acquire) != WAKE) {
|
||||
SpinLock.yield();
|
||||
if (timeout) |timeout_ns| {
|
||||
if (timer.read() >= timeout_ns)
|
||||
return error.TimedOut;
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
pub const LinuxFutex = struct {
|
||||
fn wake(waiters: *u32, wake_count: u32) void {
|
||||
const waiting = std.math.maxInt(i32); // wake_count
|
||||
const ptr = @ptrCast(*const i32, waiters);
|
||||
const rc = linux.futex_wake(ptr, linux.FUTEX_WAKE | linux.FUTEX_PRIVATE_FLAG, waiting);
|
||||
assert(linux.getErrno(rc) == 0);
|
||||
}
|
||||
|
||||
fn wait(waiters: *u32, timeout: ?u64) !void {
|
||||
var ts: linux.timespec = undefined;
|
||||
var ts_ptr: ?*linux.timespec = null;
|
||||
if (timeout) |timeout_ns| {
|
||||
ts_ptr = &ts;
|
||||
ts.tv_sec = @intCast(isize, timeout_ns / time.ns_per_s);
|
||||
ts.tv_nsec = @intCast(isize, timeout_ns % time.ns_per_s);
|
||||
}
|
||||
|
||||
while (true) {
|
||||
const waiting = @atomicLoad(u32, waiters, .Acquire);
|
||||
if (waiting == WAKE)
|
||||
return;
|
||||
const expected = @intCast(i32, waiting);
|
||||
const ptr = @ptrCast(*const i32, waiters);
|
||||
const rc = linux.futex_wait(ptr, linux.FUTEX_WAIT | linux.FUTEX_PRIVATE_FLAG, expected, ts_ptr);
|
||||
switch (linux.getErrno(rc)) {
|
||||
0 => continue,
|
||||
os.ETIMEDOUT => return error.TimedOut,
|
||||
os.EINTR => continue,
|
||||
os.EAGAIN => return,
|
||||
else => unreachable,
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
pub const WindowsFutex = struct {
|
||||
pub fn wake(waiters: *u32, wake_count: u32) void {
|
||||
const handle = getEventHandle() orelse return SpinFutex.wake(waiters, wake_count);
|
||||
const key = @ptrCast(*const c_void, waiters);
|
||||
|
||||
var waiting = wake_count;
|
||||
while (waiting != 0) : (waiting -= 1) {
|
||||
const rc = windows.ntdll.NtReleaseKeyedEvent(handle, key, windows.FALSE, null);
|
||||
assert(rc == .SUCCESS);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn wait(waiters: *u32, timeout: ?u64) !void {
|
||||
const handle = getEventHandle() orelse return SpinFutex.wait(waiters, timeout);
|
||||
const key = @ptrCast(*const c_void, waiters);
|
||||
|
||||
// NT uses timeouts in units of 100ns with negative value being relative
|
||||
var timeout_ptr: ?*windows.LARGE_INTEGER = null;
|
||||
var timeout_value: windows.LARGE_INTEGER = undefined;
|
||||
if (timeout) |timeout_ns| {
|
||||
timeout_ptr = &timeout_value;
|
||||
timeout_value = -@intCast(windows.LARGE_INTEGER, timeout_ns / 100);
|
||||
}
|
||||
|
||||
// NtWaitForKeyedEvent doesnt have spurious wake-ups
|
||||
var rc = windows.ntdll.NtWaitForKeyedEvent(handle, key, windows.FALSE, timeout_ptr);
|
||||
switch (rc) {
|
||||
.TIMEOUT => {
|
||||
// update the wait count to signal that we're not waiting anymore.
|
||||
// if the .set() thread already observed that we are, perform a
|
||||
// matching NtWaitForKeyedEvent so that the .set() thread doesn't
|
||||
// deadlock trying to run NtReleaseKeyedEvent above.
|
||||
var waiting = @atomicLoad(u32, waiters, .Monotonic);
|
||||
while (true) {
|
||||
if (waiting == WAKE) {
|
||||
rc = windows.ntdll.NtWaitForKeyedEvent(handle, key, windows.FALSE, null);
|
||||
assert(rc == .WAIT_0);
|
||||
break;
|
||||
} else {
|
||||
waiting = @cmpxchgWeak(u32, waiters, waiting, waiting - WAIT, .Acquire, .Monotonic) orelse break;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
return error.TimedOut;
|
||||
},
|
||||
.WAIT_0 => {},
|
||||
else => unreachable,
|
||||
}
|
||||
}
|
||||
|
||||
var event_handle: usize = EMPTY;
|
||||
const EMPTY = ~@as(usize, 0);
|
||||
const LOADING = EMPTY - 1;
|
||||
|
||||
pub fn getEventHandle() ?windows.HANDLE {
|
||||
var handle = @atomicLoad(usize, &event_handle, .Monotonic);
|
||||
while (true) {
|
||||
switch (handle) {
|
||||
EMPTY => handle = @cmpxchgWeak(usize, &event_handle, EMPTY, LOADING, .Acquire, .Monotonic) orelse {
|
||||
const handle_ptr = @ptrCast(*windows.HANDLE, &handle);
|
||||
const access_mask = windows.GENERIC_READ | windows.GENERIC_WRITE;
|
||||
if (windows.ntdll.NtCreateKeyedEvent(handle_ptr, access_mask, null, 0) != .SUCCESS)
|
||||
handle = 0;
|
||||
@atomicStore(usize, &event_handle, handle, .Monotonic);
|
||||
return @intToPtr(?windows.HANDLE, handle);
|
||||
},
|
||||
LOADING => {
|
||||
SpinLock.yield();
|
||||
handle = @atomicLoad(usize, &event_handle, .Monotonic);
|
||||
},
|
||||
else => {
|
||||
return @intToPtr(?windows.HANDLE, handle);
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
};
|
||||
|
||||
test "basic usage" {
|
||||
var event = StaticResetEvent{};
|
||||
|
||||
// test event setting
|
||||
event.set();
|
||||
|
||||
// test event resetting
|
||||
event.reset();
|
||||
|
||||
// test event waiting (non-blocking)
|
||||
event.set();
|
||||
event.wait();
|
||||
event.reset();
|
||||
|
||||
event.set();
|
||||
testing.expectEqual(TimedWaitResult.event_set, event.timedWait(1));
|
||||
|
||||
// test cross-thread signaling
|
||||
if (std.builtin.single_threaded)
|
||||
return;
|
||||
|
||||
const Context = struct {
|
||||
const Self = @This();
|
||||
|
||||
value: u128 = 0,
|
||||
in: StaticResetEvent = .{},
|
||||
out: StaticResetEvent = .{},
|
||||
|
||||
fn sender(self: *Self) void {
|
||||
// update value and signal input
|
||||
testing.expect(self.value == 0);
|
||||
self.value = 1;
|
||||
self.in.set();
|
||||
|
||||
// wait for receiver to update value and signal output
|
||||
self.out.wait();
|
||||
testing.expect(self.value == 2);
|
||||
|
||||
// update value and signal final input
|
||||
self.value = 3;
|
||||
self.in.set();
|
||||
}
|
||||
|
||||
fn receiver(self: *Self) void {
|
||||
// wait for sender to update value and signal input
|
||||
self.in.wait();
|
||||
assert(self.value == 1);
|
||||
|
||||
// update value and signal output
|
||||
self.in.reset();
|
||||
self.value = 2;
|
||||
self.out.set();
|
||||
|
||||
// wait for sender to update value and signal final input
|
||||
self.in.wait();
|
||||
assert(self.value == 3);
|
||||
}
|
||||
|
||||
fn sleeper(self: *Self) void {
|
||||
self.in.set();
|
||||
time.sleep(time.ns_per_ms * 2);
|
||||
self.value = 5;
|
||||
self.out.set();
|
||||
}
|
||||
|
||||
fn timedWaiter(self: *Self) !void {
|
||||
self.in.wait();
|
||||
testing.expectEqual(TimedWaitResult.timed_out, self.out.timedWait(time.ns_per_us));
|
||||
try self.out.timedWait(time.ns_per_ms * 100);
|
||||
testing.expect(self.value == 5);
|
||||
}
|
||||
};
|
||||
|
||||
var context = Context{};
|
||||
const receiver = try std.Thread.spawn(&context, Context.receiver);
|
||||
defer receiver.wait();
|
||||
context.sender();
|
||||
|
||||
if (false) {
|
||||
// I have now observed this fail on macOS, Windows, and Linux.
|
||||
// https://github.com/ziglang/zig/issues/7009
|
||||
var timed = Context.init();
|
||||
defer timed.deinit();
|
||||
const sleeper = try std.Thread.spawn(&timed, Context.sleeper);
|
||||
defer sleeper.wait();
|
||||
try timed.timedWaiter();
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user