commit 0fd68f49e2eabb866ea1d21c4657c2a1d3c8ce53 (tree)
parent 577b57784ae70a85f5f4fef37e749687d0e9b6b0
Author: Andrew Kelley <andrew@ziglang.org>
Date: Thu, 24 Dec 2020 00:15:33 -0800
Merge pull request #7519 from ziglang/more-pthreads-integration
More pthreads integration
Diffstat:
18 files changed, 937 insertions(+), 635 deletions(-)
diff --git a/CMakeLists.txt b/CMakeLists.txt
@@ -410,11 +410,12 @@ set(ZIG_STAGE2_SOURCES
"${CMAKE_SOURCE_DIR}/lib/std/os/windows/bits.zig"
"${CMAKE_SOURCE_DIR}/lib/std/os/windows/ntstatus.zig"
"${CMAKE_SOURCE_DIR}/lib/std/os/windows/win32error.zig"
+ "${CMAKE_SOURCE_DIR}/lib/std/Progress.zig"
+ "${CMAKE_SOURCE_DIR}/lib/std/ResetEvent.zig"
+ "${CMAKE_SOURCE_DIR}/lib/std/StaticResetEvent.zig"
"${CMAKE_SOURCE_DIR}/lib/std/pdb.zig"
"${CMAKE_SOURCE_DIR}/lib/std/process.zig"
- "${CMAKE_SOURCE_DIR}/lib/std/Progress.zig"
"${CMAKE_SOURCE_DIR}/lib/std/rand.zig"
- "${CMAKE_SOURCE_DIR}/lib/std/reset_event.zig"
"${CMAKE_SOURCE_DIR}/lib/std/sort.zig"
"${CMAKE_SOURCE_DIR}/lib/std/special/compiler_rt.zig"
"${CMAKE_SOURCE_DIR}/lib/std/special/compiler_rt/addXf3.zig"
@@ -512,7 +513,6 @@ set(ZIG_STAGE2_SOURCES
"${CMAKE_SOURCE_DIR}/src/Cache.zig"
"${CMAKE_SOURCE_DIR}/src/Compilation.zig"
"${CMAKE_SOURCE_DIR}/src/DepTokenizer.zig"
- "${CMAKE_SOURCE_DIR}/src/Event.zig"
"${CMAKE_SOURCE_DIR}/src/Module.zig"
"${CMAKE_SOURCE_DIR}/src/Package.zig"
"${CMAKE_SOURCE_DIR}/src/RangeSet.zig"
diff --git a/lib/std/Progress.zig b/lib/std/Progress.zig
@@ -160,6 +160,9 @@ pub fn maybeRefresh(self: *Progress) void {
if (now < self.initial_delay_ns) return;
const held = self.update_lock.tryAcquire() orelse return;
defer held.release();
+ // TODO I have observed this to happen sometimes. I think we need to follow Rust's
+ // lead and guarantee monotonically increasing times in the std lib itself.
+ if (now < self.prev_refresh_timestamp) return;
if (now - self.prev_refresh_timestamp < self.refresh_rate_ns) return;
return self.refreshWithHeldLock();
}
diff --git a/lib/std/ResetEvent.zig b/lib/std/ResetEvent.zig
@@ -0,0 +1,297 @@
+// SPDX-License-Identifier: MIT
+// Copyright (c) 2015-2020 Zig Contributors
+// This file is part of [zig](https://ziglang.org/), which is MIT licensed.
+// The MIT license requires this copyright notice to be included in all copies
+// and substantial portions of the software.
+
+//! A thread-safe resource which supports blocking until signaled.
+//! This API is for kernel threads, not evented I/O.
+//! This API requires being initialized at runtime, and initialization
+//! can fail. Once initialized, the core operations cannot fail.
+//! If you need an abstraction that cannot fail to be initialized, see
+//! `std.StaticResetEvent`. However if you can handle initialization failure,
+//! it is preferred to use `ResetEvent`.
+
+const ResetEvent = @This();
+const std = @import("std.zig");
+const builtin = std.builtin;
+const testing = std.testing;
+const assert = std.debug.assert;
+const c = std.c;
+const os = std.os;
+const time = std.time;
+
+impl: Impl,
+
+pub const Impl = if (builtin.single_threaded)
+ std.StaticResetEvent.DebugEvent
+else if (std.Target.current.isDarwin())
+ DarwinEvent
+else if (std.Thread.use_pthreads)
+ PosixEvent
+else
+ std.StaticResetEvent.AtomicEvent;
+
+pub const InitError = error{SystemResources};
+
+/// After `init`, it is legal to call any other function.
+pub fn init(ev: *ResetEvent) InitError!void {
+ return ev.impl.init();
+}
+
+/// This function is not thread-safe.
+/// After `deinit`, the only legal function to call is `init`.
+pub fn deinit(ev: *ResetEvent) void {
+ return ev.impl.deinit();
+}
+
+/// Sets the event if not already set and wakes up all the threads waiting on
+/// the event. It is safe to call `set` multiple times before calling `wait`.
+/// However it is illegal to call `set` after `wait` is called until the event
+/// is `reset`. This function is thread-safe.
+pub fn set(ev: *ResetEvent) void {
+ return ev.impl.set();
+}
+
+/// Resets the event to its original, unset state.
+/// This function is *not* thread-safe. It is equivalent to calling
+/// `deinit` followed by `init` but without the possibility of failure.
+pub fn reset(ev: *ResetEvent) void {
+ return ev.impl.reset();
+}
+
+/// Wait for the event to be set by blocking the current thread.
+/// Thread-safe. No spurious wakeups.
+/// Upon return from `wait`, the only functions available to be called
+/// in `ResetEvent` are `reset` and `deinit`.
+pub fn wait(ev: *ResetEvent) void {
+ return ev.impl.wait();
+}
+
+pub const TimedWaitResult = enum { event_set, timed_out };
+
+/// Wait for the event to be set by blocking the current thread.
+/// A timeout in nanoseconds can be provided as a hint for how
+/// long the thread should block on the unset event before returning
+/// `TimedWaitResult.timed_out`.
+/// Thread-safe. No precision of timing is guaranteed.
+/// Upon return from `wait`, the only functions available to be called
+/// in `ResetEvent` are `reset` and `deinit`.
+pub fn timedWait(ev: *ResetEvent, timeout_ns: u64) TimedWaitResult {
+ return ev.impl.timedWait(timeout_ns);
+}
+
+/// Apple has decided to not support POSIX semaphores, so we go with a
+/// different approach using Grand Central Dispatch. This API is exposed
+/// by libSystem so it is guaranteed to be available on all Darwin platforms.
+pub const DarwinEvent = struct {
+ sem: c.dispatch_semaphore_t = undefined,
+
+ pub fn init(ev: *DarwinEvent) !void {
+ ev.* = .{
+ .sem = c.dispatch_semaphore_create(0) orelse return error.SystemResources,
+ };
+ }
+
+ pub fn deinit(ev: *DarwinEvent) void {
+ c.dispatch_release(ev.sem);
+ ev.* = undefined;
+ }
+
+ pub fn set(ev: *DarwinEvent) void {
+ // Empirically this returns the numerical value of the semaphore.
+ _ = c.dispatch_semaphore_signal(ev.sem);
+ }
+
+ pub fn wait(ev: *DarwinEvent) void {
+ assert(c.dispatch_semaphore_wait(ev.sem, c.DISPATCH_TIME_FOREVER) == 0);
+ }
+
+ pub fn timedWait(ev: *DarwinEvent, timeout_ns: u64) TimedWaitResult {
+ const t = c.dispatch_time(c.DISPATCH_TIME_NOW, @intCast(i64, timeout_ns));
+ if (c.dispatch_semaphore_wait(ev.sem, t) != 0) {
+ return .timed_out;
+ } else {
+ return .event_set;
+ }
+ }
+
+ pub fn reset(ev: *DarwinEvent) void {
+ // Keep calling until the semaphore goes back down to 0.
+ while (c.dispatch_semaphore_wait(ev.sem, c.DISPATCH_TIME_NOW) == 0) {}
+ }
+};
+
+/// POSIX semaphores must be initialized at runtime because they are allowed to
+/// be implemented as file descriptors, in which case initialization would require
+/// a syscall to open the fd.
+pub const PosixEvent = struct {
+ sem: c.sem_t = undefined,
+
+ pub fn init(ev: *PosixEvent) !void {
+ switch (c.getErrno(c.sem_init(&ev.sem, 0, 0))) {
+ 0 => return,
+ else => return error.SystemResources,
+ }
+ }
+
+ pub fn deinit(ev: *PosixEvent) void {
+ assert(c.sem_destroy(&ev.sem) == 0);
+ ev.* = undefined;
+ }
+
+ pub fn set(ev: *PosixEvent) void {
+ assert(c.sem_post(&ev.sem) == 0);
+ }
+
+ pub fn wait(ev: *PosixEvent) void {
+ while (true) {
+ switch (c.getErrno(c.sem_wait(&ev.sem))) {
+ 0 => return,
+ c.EINTR => continue,
+ c.EINVAL => unreachable,
+ else => unreachable,
+ }
+ }
+ }
+
+ pub fn timedWait(ev: *PosixEvent, timeout_ns: u64) TimedWaitResult {
+ var ts: os.timespec = undefined;
+ var timeout_abs = timeout_ns;
+ os.clock_gettime(os.CLOCK_REALTIME, &ts) catch return .timed_out;
+ timeout_abs += @intCast(u64, ts.tv_sec) * time.ns_per_s;
+ timeout_abs += @intCast(u64, ts.tv_nsec);
+ ts.tv_sec = @intCast(@TypeOf(ts.tv_sec), @divFloor(timeout_abs, time.ns_per_s));
+ ts.tv_nsec = @intCast(@TypeOf(ts.tv_nsec), @mod(timeout_abs, time.ns_per_s));
+ while (true) {
+ switch (c.getErrno(c.sem_timedwait(&ev.sem, &ts))) {
+ 0 => return .event_set,
+ c.EINTR => continue,
+ c.EINVAL => unreachable,
+ c.ETIMEDOUT => return .timed_out,
+ else => unreachable,
+ }
+ }
+ }
+
+ pub fn reset(ev: *PosixEvent) void {
+ while (true) {
+ switch (c.getErrno(c.sem_trywait(&ev.sem))) {
+ 0 => continue, // Need to make it go to zero.
+ c.EINTR => continue,
+ c.EINVAL => unreachable,
+ c.EAGAIN => return, // The semaphore currently has the value zero.
+ else => unreachable,
+ }
+ }
+ }
+};
+
+test "basic usage" {
+ var event: ResetEvent = undefined;
+ try event.init();
+ defer event.deinit();
+
+ // test event setting
+ event.set();
+
+ // test event resetting
+ event.reset();
+
+ // test event waiting (non-blocking)
+ event.set();
+ event.wait();
+ event.reset();
+
+ event.set();
+ testing.expectEqual(TimedWaitResult.event_set, event.timedWait(1));
+
+ // test cross-thread signaling
+ if (builtin.single_threaded)
+ return;
+
+ const Context = struct {
+ const Self = @This();
+
+ value: u128,
+ in: ResetEvent,
+ out: ResetEvent,
+
+ fn init(self: *Self) !void {
+ self.* = .{
+ .value = 0,
+ .in = undefined,
+ .out = undefined,
+ };
+ try self.in.init();
+ try self.out.init();
+ }
+
+ fn deinit(self: *Self) void {
+ self.in.deinit();
+ self.out.deinit();
+ self.* = undefined;
+ }
+
+ fn sender(self: *Self) void {
+ // update value and signal input
+ testing.expect(self.value == 0);
+ self.value = 1;
+ self.in.set();
+
+ // wait for receiver to update value and signal output
+ self.out.wait();
+ testing.expect(self.value == 2);
+
+ // update value and signal final input
+ self.value = 3;
+ self.in.set();
+ }
+
+ fn receiver(self: *Self) void {
+ // wait for sender to update value and signal input
+ self.in.wait();
+ assert(self.value == 1);
+
+ // update value and signal output
+ self.in.reset();
+ self.value = 2;
+ self.out.set();
+
+ // wait for sender to update value and signal final input
+ self.in.wait();
+ assert(self.value == 3);
+ }
+
+ fn sleeper(self: *Self) void {
+ self.in.set();
+ time.sleep(time.ns_per_ms * 2);
+ self.value = 5;
+ self.out.set();
+ }
+
+ fn timedWaiter(self: *Self) !void {
+ self.in.wait();
+ testing.expectEqual(TimedWaitResult.timed_out, self.out.timedWait(time.ns_per_us));
+ try self.out.timedWait(time.ns_per_ms * 100);
+ testing.expect(self.value == 5);
+ }
+ };
+
+ var context: Context = undefined;
+ try context.init();
+ defer context.deinit();
+ const receiver = try std.Thread.spawn(&context, Context.receiver);
+ defer receiver.wait();
+ context.sender();
+
+ if (false) {
+ // I have now observed this fail on macOS, Windows, and Linux.
+ // https://github.com/ziglang/zig/issues/7009
+ var timed = Context.init();
+ defer timed.deinit();
+ const sleeper = try std.Thread.spawn(&timed, Context.sleeper);
+ defer sleeper.wait();
+ try timed.timedWaiter();
+ }
+}
diff --git a/lib/std/StaticResetEvent.zig b/lib/std/StaticResetEvent.zig
@@ -0,0 +1,396 @@
+// SPDX-License-Identifier: MIT
+// Copyright (c) 2015-2020 Zig Contributors
+// This file is part of [zig](https://ziglang.org/), which is MIT licensed.
+// The MIT license requires this copyright notice to be included in all copies
+// and substantial portions of the software.
+
+//! A thread-safe resource which supports blocking until signaled.
+//! This API is for kernel threads, not evented I/O.
+//! This API is statically initializable. It cannot fail to be initialized
+//! and it requires no deinitialization. The downside is that it may not
+//! integrate as cleanly into other synchronization APIs, or, in a worst case,
+//! may be forced to fall back on spin locking. As a rule of thumb, prefer
+//! to use `std.ResetEvent` when possible, and use `StaticResetEvent` when
+//! the logic needs stronger API guarantees.
+
+const std = @import("std.zig");
+const StaticResetEvent = @This();
+const SpinLock = std.SpinLock;
+const assert = std.debug.assert;
+const os = std.os;
+const time = std.time;
+const linux = std.os.linux;
+const windows = std.os.windows;
+const testing = std.testing;
+
+impl: Impl = .{},
+
+pub const Impl = if (std.builtin.single_threaded)
+ DebugEvent
+else
+ AtomicEvent;
+
+/// Sets the event if not already set and wakes up all the threads waiting on
+/// the event. It is safe to call `set` multiple times before calling `wait`.
+/// However it is illegal to call `set` after `wait` is called until the event
+/// is `reset`. This function is thread-safe.
+pub fn set(ev: *StaticResetEvent) void {
+ return ev.impl.set();
+}
+
+/// Wait for the event to be set by blocking the current thread.
+/// Thread-safe. No spurious wakeups.
+/// Upon return from `wait`, the only function available to be called
+/// in `StaticResetEvent` is `reset`.
+pub fn wait(ev: *StaticResetEvent) void {
+ return ev.impl.wait();
+}
+
+/// Resets the event to its original, unset state.
+/// This function is *not* thread-safe. It is equivalent to calling
+/// `deinit` followed by `init` but without the possibility of failure.
+pub fn reset(ev: *StaticResetEvent) void {
+ return ev.impl.reset();
+}
+
+pub const TimedWaitResult = std.ResetEvent.TimedWaitResult;
+
+/// Wait for the event to be set by blocking the current thread.
+/// A timeout in nanoseconds can be provided as a hint for how
+/// long the thread should block on the unset event before returning
+/// `TimedWaitResult.timed_out`.
+/// Thread-safe. No precision of timing is guaranteed.
+/// Upon return from `timedWait`, the only function available to be called
+/// in `StaticResetEvent` is `reset`.
+pub fn timedWait(ev: *StaticResetEvent, timeout_ns: u64) TimedWaitResult {
+ return ev.impl.timedWait(timeout_ns);
+}
+
+/// For single-threaded builds, we use this to detect deadlocks.
+/// In unsafe modes this ends up being no-ops.
+pub const DebugEvent = struct {
+ state: State = State.unset,
+
+ const State = enum {
+ unset,
+ set,
+ waited,
+ };
+
+ /// This function is provided so that this type can be re-used inside
+ /// `std.ResetEvent`.
+ pub fn init(ev: *DebugEvent) void {
+ ev.* = .{};
+ }
+
+ /// This function is provided so that this type can be re-used inside
+ /// `std.ResetEvent`.
+ pub fn deinit(ev: *DebugEvent) void {
+ ev.* = undefined;
+ }
+
+ pub fn set(ev: *DebugEvent) void {
+ switch (ev.state) {
+ .unset => ev.state = .set,
+ .set => {},
+ .waited => unreachable, // Not allowed to call `set` until `reset`.
+ }
+ }
+
+ pub fn wait(ev: *DebugEvent) void {
+ switch (ev.state) {
+ .unset => unreachable, // Deadlock detected.
+ .set => return,
+ .waited => unreachable, // Not allowed to call `wait` until `reset`.
+ }
+ }
+
+ pub fn timedWait(ev: *DebugEvent, timeout: u64) TimedWaitResult {
+ switch (ev.state) {
+ .unset => return .timed_out,
+ .set => return .event_set,
+ .waited => unreachable, // Not allowed to call `wait` until `reset`.
+ }
+ }
+
+ pub fn reset(ev: *DebugEvent) void {
+ ev.state = .unset;
+ }
+};
+
+pub const AtomicEvent = struct {
+ waiters: u32 = 0,
+
+ const WAKE = 1 << 0;
+ const WAIT = 1 << 1;
+
+ /// This function is provided so that this type can be re-used inside
+ /// `std.ResetEvent`.
+ pub fn init(ev: *AtomicEvent) void {
+ ev.* = .{};
+ }
+
+ /// This function is provided so that this type can be re-used inside
+ /// `std.ResetEvent`.
+ pub fn deinit(ev: *AtomicEvent) void {
+ ev.* = undefined;
+ }
+
+ pub fn set(ev: *AtomicEvent) void {
+ const waiters = @atomicRmw(u32, &ev.waiters, .Xchg, WAKE, .Release);
+ if (waiters >= WAIT) {
+ return Futex.wake(&ev.waiters, waiters >> 1);
+ }
+ }
+
+ pub fn wait(ev: *AtomicEvent) void {
+ switch (ev.timedWait(null)) {
+ .timed_out => unreachable,
+ .event_set => return,
+ }
+ }
+
+ pub fn timedWait(ev: *AtomicEvent, timeout: ?u64) TimedWaitResult {
+ var waiters = @atomicLoad(u32, &ev.waiters, .Acquire);
+ while (waiters != WAKE) {
+ waiters = @cmpxchgWeak(u32, &ev.waiters, waiters, waiters + WAIT, .Acquire, .Acquire) orelse {
+ if (Futex.wait(&ev.waiters, timeout)) |_| {
+ return .event_set;
+ } else |_| {
+ return .timed_out;
+ }
+ };
+ }
+ return .event_set;
+ }
+
+ pub fn reset(ev: *AtomicEvent) void {
+ @atomicStore(u32, &ev.waiters, 0, .Monotonic);
+ }
+
+ pub const Futex = switch (std.Target.current.os.tag) {
+ .windows => WindowsFutex,
+ .linux => LinuxFutex,
+ else => SpinFutex,
+ };
+
+ pub const SpinFutex = struct {
+ fn wake(waiters: *u32, wake_count: u32) void {}
+
+ fn wait(waiters: *u32, timeout: ?u64) !void {
+ var timer: time.Timer = undefined;
+ if (timeout != null)
+ timer = time.Timer.start() catch return error.TimedOut;
+
+ while (@atomicLoad(u32, waiters, .Acquire) != WAKE) {
+ SpinLock.yield();
+ if (timeout) |timeout_ns| {
+ if (timer.read() >= timeout_ns)
+ return error.TimedOut;
+ }
+ }
+ }
+ };
+
+ pub const LinuxFutex = struct {
+ fn wake(waiters: *u32, wake_count: u32) void {
+ const waiting = std.math.maxInt(i32); // wake_count
+ const ptr = @ptrCast(*const i32, waiters);
+ const rc = linux.futex_wake(ptr, linux.FUTEX_WAKE | linux.FUTEX_PRIVATE_FLAG, waiting);
+ assert(linux.getErrno(rc) == 0);
+ }
+
+ fn wait(waiters: *u32, timeout: ?u64) !void {
+ var ts: linux.timespec = undefined;
+ var ts_ptr: ?*linux.timespec = null;
+ if (timeout) |timeout_ns| {
+ ts_ptr = &ts;
+ ts.tv_sec = @intCast(isize, timeout_ns / time.ns_per_s);
+ ts.tv_nsec = @intCast(isize, timeout_ns % time.ns_per_s);
+ }
+
+ while (true) {
+ const waiting = @atomicLoad(u32, waiters, .Acquire);
+ if (waiting == WAKE)
+ return;
+ const expected = @intCast(i32, waiting);
+ const ptr = @ptrCast(*const i32, waiters);
+ const rc = linux.futex_wait(ptr, linux.FUTEX_WAIT | linux.FUTEX_PRIVATE_FLAG, expected, ts_ptr);
+ switch (linux.getErrno(rc)) {
+ 0 => continue,
+ os.ETIMEDOUT => return error.TimedOut,
+ os.EINTR => continue,
+ os.EAGAIN => return,
+ else => unreachable,
+ }
+ }
+ }
+ };
+
+ pub const WindowsFutex = struct {
+ pub fn wake(waiters: *u32, wake_count: u32) void {
+ const handle = getEventHandle() orelse return SpinFutex.wake(waiters, wake_count);
+ const key = @ptrCast(*const c_void, waiters);
+
+ var waiting = wake_count;
+ while (waiting != 0) : (waiting -= 1) {
+ const rc = windows.ntdll.NtReleaseKeyedEvent(handle, key, windows.FALSE, null);
+ assert(rc == .SUCCESS);
+ }
+ }
+
+ pub fn wait(waiters: *u32, timeout: ?u64) !void {
+ const handle = getEventHandle() orelse return SpinFutex.wait(waiters, timeout);
+ const key = @ptrCast(*const c_void, waiters);
+
+ // NT uses timeouts in units of 100ns with negative value being relative
+ var timeout_ptr: ?*windows.LARGE_INTEGER = null;
+ var timeout_value: windows.LARGE_INTEGER = undefined;
+ if (timeout) |timeout_ns| {
+ timeout_ptr = &timeout_value;
+ timeout_value = -@intCast(windows.LARGE_INTEGER, timeout_ns / 100);
+ }
+
+ // NtWaitForKeyedEvent doesnt have spurious wake-ups
+ var rc = windows.ntdll.NtWaitForKeyedEvent(handle, key, windows.FALSE, timeout_ptr);
+ switch (rc) {
+ .TIMEOUT => {
+ // update the wait count to signal that we're not waiting anymore.
+ // if the .set() thread already observed that we are, perform a
+ // matching NtWaitForKeyedEvent so that the .set() thread doesn't
+ // deadlock trying to run NtReleaseKeyedEvent above.
+ var waiting = @atomicLoad(u32, waiters, .Monotonic);
+ while (true) {
+ if (waiting == WAKE) {
+ rc = windows.ntdll.NtWaitForKeyedEvent(handle, key, windows.FALSE, null);
+ assert(rc == .WAIT_0);
+ break;
+ } else {
+ waiting = @cmpxchgWeak(u32, waiters, waiting, waiting - WAIT, .Acquire, .Monotonic) orelse break;
+ continue;
+ }
+ }
+ return error.TimedOut;
+ },
+ .WAIT_0 => {},
+ else => unreachable,
+ }
+ }
+
+ var event_handle: usize = EMPTY;
+ const EMPTY = ~@as(usize, 0);
+ const LOADING = EMPTY - 1;
+
+ pub fn getEventHandle() ?windows.HANDLE {
+ var handle = @atomicLoad(usize, &event_handle, .Monotonic);
+ while (true) {
+ switch (handle) {
+ EMPTY => handle = @cmpxchgWeak(usize, &event_handle, EMPTY, LOADING, .Acquire, .Monotonic) orelse {
+ const handle_ptr = @ptrCast(*windows.HANDLE, &handle);
+ const access_mask = windows.GENERIC_READ | windows.GENERIC_WRITE;
+ if (windows.ntdll.NtCreateKeyedEvent(handle_ptr, access_mask, null, 0) != .SUCCESS)
+ handle = 0;
+ @atomicStore(usize, &event_handle, handle, .Monotonic);
+ return @intToPtr(?windows.HANDLE, handle);
+ },
+ LOADING => {
+ SpinLock.yield();
+ handle = @atomicLoad(usize, &event_handle, .Monotonic);
+ },
+ else => {
+ return @intToPtr(?windows.HANDLE, handle);
+ },
+ }
+ }
+ }
+ };
+};
+
+test "basic usage" {
+ var event = StaticResetEvent{};
+
+ // test event setting
+ event.set();
+
+ // test event resetting
+ event.reset();
+
+ // test event waiting (non-blocking)
+ event.set();
+ event.wait();
+ event.reset();
+
+ event.set();
+ testing.expectEqual(TimedWaitResult.event_set, event.timedWait(1));
+
+ // test cross-thread signaling
+ if (std.builtin.single_threaded)
+ return;
+
+ const Context = struct {
+ const Self = @This();
+
+ value: u128 = 0,
+ in: StaticResetEvent = .{},
+ out: StaticResetEvent = .{},
+
+ fn sender(self: *Self) void {
+ // update value and signal input
+ testing.expect(self.value == 0);
+ self.value = 1;
+ self.in.set();
+
+ // wait for receiver to update value and signal output
+ self.out.wait();
+ testing.expect(self.value == 2);
+
+ // update value and signal final input
+ self.value = 3;
+ self.in.set();
+ }
+
+ fn receiver(self: *Self) void {
+ // wait for sender to update value and signal input
+ self.in.wait();
+ assert(self.value == 1);
+
+ // update value and signal output
+ self.in.reset();
+ self.value = 2;
+ self.out.set();
+
+ // wait for sender to update value and signal final input
+ self.in.wait();
+ assert(self.value == 3);
+ }
+
+ fn sleeper(self: *Self) void {
+ self.in.set();
+ time.sleep(time.ns_per_ms * 2);
+ self.value = 5;
+ self.out.set();
+ }
+
+ fn timedWaiter(self: *Self) !void {
+ self.in.wait();
+ testing.expectEqual(TimedWaitResult.timed_out, self.out.timedWait(time.ns_per_us));
+ try self.out.timedWait(time.ns_per_ms * 100);
+ testing.expect(self.value == 5);
+ }
+ };
+
+ var context = Context{};
+ const receiver = try std.Thread.spawn(&context, Context.receiver);
+ defer receiver.wait();
+ context.sender();
+
+ if (false) {
+ // I have now observed this fail on macOS, Windows, and Linux.
+ // https://github.com/ziglang/zig/issues/7009
+ var timed = Context.init();
+ defer timed.deinit();
+ const sleeper = try std.Thread.spawn(&timed, Context.sleeper);
+ defer sleeper.wait();
+ try timed.timedWaiter();
+ }
+}
diff --git a/lib/std/auto_reset_event.zig b/lib/std/auto_reset_event.zig
@@ -7,14 +7,15 @@ const std = @import("std.zig");
const builtin = @import("builtin");
const testing = std.testing;
const assert = std.debug.assert;
+const StaticResetEvent = std.StaticResetEvent;
-/// Similar to std.ResetEvent but on `set()` it also (atomically) does `reset()`.
-/// Unlike std.ResetEvent, `wait()` can only be called by one thread (MPSC-like).
+/// Similar to `StaticResetEvent` but on `set()` it also (atomically) does `reset()`.
+/// Unlike StaticResetEvent, `wait()` can only be called by one thread (MPSC-like).
pub const AutoResetEvent = struct {
/// AutoResetEvent has 3 possible states:
/// - UNSET: the AutoResetEvent is currently unset
/// - SET: the AutoResetEvent was notified before a wait() was called
- /// - <std.ResetEvent pointer>: there is an active waiter waiting for a notification.
+ /// - <StaticResetEvent pointer>: there is an active waiter waiting for a notification.
///
/// When attempting to wait:
/// if the event is unset, it registers a ResetEvent pointer to be notified when the event is set
@@ -25,20 +26,20 @@ pub const AutoResetEvent = struct {
/// if theres a waiting ResetEvent, then we unset the event and notify the ResetEvent
///
/// This ensures that the event is automatically reset after a wait() has been issued
- /// and avoids the race condition when using std.ResetEvent in the following scenario:
- /// thread 1 | thread 2
- /// std.ResetEvent.wait() |
- /// | std.ResetEvent.set()
- /// | std.ResetEvent.set()
- /// std.ResetEvent.reset() |
- /// std.ResetEvent.wait() | (missed the second .set() notification above)
+ /// and avoids the race condition when using StaticResetEvent in the following scenario:
+ /// thread 1 | thread 2
+ /// StaticResetEvent.wait() |
+ /// | StaticResetEvent.set()
+ /// | StaticResetEvent.set()
+ /// StaticResetEvent.reset() |
+ /// StaticResetEvent.wait() | (missed the second .set() notification above)
state: usize = UNSET,
const UNSET = 0;
const SET = 1;
- /// the minimum alignment for the `*std.ResetEvent` created by wait*()
- const event_align = std.math.max(@alignOf(std.ResetEvent), 2);
+ /// the minimum alignment for the `*StaticResetEvent` created by wait*()
+ const event_align = std.math.max(@alignOf(StaticResetEvent), 2);
pub fn wait(self: *AutoResetEvent) void {
self.waitFor(null) catch unreachable;
@@ -49,12 +50,9 @@ pub const AutoResetEvent = struct {
}
fn waitFor(self: *AutoResetEvent, timeout: ?u64) error{TimedOut}!void {
- // lazily initialized std.ResetEvent
- var reset_event: std.ResetEvent align(event_align) = undefined;
+ // lazily initialized StaticResetEvent
+ var reset_event: StaticResetEvent align(event_align) = undefined;
var has_reset_event = false;
- defer if (has_reset_event) {
- reset_event.deinit();
- };
var state = @atomicLoad(usize, &self.state, .SeqCst);
while (true) {
@@ -72,7 +70,7 @@ pub const AutoResetEvent = struct {
// lazily initialize the ResetEvent if it hasn't been already
if (!has_reset_event) {
has_reset_event = true;
- reset_event = std.ResetEvent.init();
+ reset_event = .{};
}
// Since the AutoResetEvent currently isnt set,
@@ -97,9 +95,10 @@ pub const AutoResetEvent = struct {
};
// wait with a timeout and return if signalled via set()
- if (reset_event.timedWait(timeout_ns)) |_| {
- return;
- } else |timed_out| {}
+ switch (reset_event.timedWait(timeout_ns)) {
+ .event_set => return,
+ .timed_out => {},
+ }
// If we timed out, we need to transition the AutoResetEvent back to UNSET.
// If we don't, then when we return, a set() thread could observe a pointer to an invalid ResetEvent.
@@ -164,7 +163,7 @@ pub const AutoResetEvent = struct {
continue;
}
- const reset_event = @intToPtr(*align(event_align) std.ResetEvent, state);
+ const reset_event = @intToPtr(*align(event_align) StaticResetEvent, state);
reset_event.set();
return;
}
diff --git a/lib/std/c.zig b/lib/std/c.zig
@@ -270,6 +270,13 @@ pub extern "c" fn pthread_atfork(
parent: ?fn () callconv(.C) void,
child: ?fn () callconv(.C) void,
) c_int;
+pub extern "c" fn sem_init(sem: *sem_t, pshared: c_int, value: c_uint) c_int;
+pub extern "c" fn sem_destroy(sem: *sem_t) c_int;
+pub extern "c" fn sem_post(sem: *sem_t) c_int;
+pub extern "c" fn sem_wait(sem: *sem_t) c_int;
+pub extern "c" fn sem_trywait(sem: *sem_t) c_int;
+pub extern "c" fn sem_timedwait(sem: *sem_t, abs_timeout: *const timespec) c_int;
+pub extern "c" fn sem_getvalue(sem: *sem_t, sval: *c_int) c_int;
pub extern "c" fn kqueue() c_int;
pub extern "c" fn kevent(
@@ -316,6 +323,7 @@ pub extern "c" fn dn_expand(
pub const PTHREAD_MUTEX_INITIALIZER = pthread_mutex_t{};
pub extern "c" fn pthread_mutex_lock(mutex: *pthread_mutex_t) c_int;
pub extern "c" fn pthread_mutex_unlock(mutex: *pthread_mutex_t) c_int;
+pub extern "c" fn pthread_mutex_trylock(mutex: *pthread_mutex_t) c_int;
pub extern "c" fn pthread_mutex_destroy(mutex: *pthread_mutex_t) c_int;
pub const PTHREAD_COND_INITIALIZER = pthread_cond_t{};
diff --git a/lib/std/c/darwin.zig b/lib/std/c/darwin.zig
@@ -177,6 +177,7 @@ pub const pthread_cond_t = extern struct {
__sig: c_long = 0x3CB0B1BB,
__opaque: [__PTHREAD_COND_SIZE__]u8 = [_]u8{0} ** __PTHREAD_COND_SIZE__,
};
+pub const sem_t = c_int;
const __PTHREAD_MUTEX_SIZE__ = if (@sizeOf(usize) == 8) 56 else 40;
const __PTHREAD_COND_SIZE__ = if (@sizeOf(usize) == 8) 40 else 24;
@@ -186,3 +187,15 @@ pub const pthread_attr_t = extern struct {
};
pub extern "c" fn arc4random_buf(buf: [*]u8, len: usize) void;
+
+// Grand Central Dispatch is exposed by libSystem.
+pub const dispatch_semaphore_t = *opaque{};
+pub const dispatch_time_t = u64;
+pub const DISPATCH_TIME_NOW = @as(dispatch_time_t, 0);
+pub const DISPATCH_TIME_FOREVER = ~@as(dispatch_time_t, 0);
+pub extern "c" fn dispatch_semaphore_create(value: isize) ?dispatch_semaphore_t;
+pub extern "c" fn dispatch_semaphore_wait(dsema: dispatch_semaphore_t, timeout: dispatch_time_t) isize;
+pub extern "c" fn dispatch_semaphore_signal(dsema: dispatch_semaphore_t) isize;
+
+pub extern "c" fn dispatch_release(object: *c_void) void;
+pub extern "c" fn dispatch_time(when: dispatch_time_t, delta: i64) dispatch_time_t;
diff --git a/lib/std/c/freebsd.zig b/lib/std/c/freebsd.zig
@@ -47,6 +47,15 @@ pub const pthread_attr_t = extern struct {
__align: c_long,
};
+pub const sem_t = extern struct {
+ _magic: u32,
+ _kern: extern struct {
+ _count: u32,
+ _flags: u32,
+ },
+ _padding: u32,
+};
+
pub const EAI = extern enum(c_int) {
/// address family for hostname not supported
ADDRFAMILY = 1,
diff --git a/lib/std/c/linux.zig b/lib/std/c/linux.zig
@@ -123,6 +123,10 @@ pub const pthread_mutex_t = extern struct {
pub const pthread_cond_t = extern struct {
size: [__SIZEOF_PTHREAD_COND_T]u8 align(@alignOf(usize)) = [_]u8{0} ** __SIZEOF_PTHREAD_COND_T,
};
+pub const sem_t = extern struct {
+ __size: [__SIZEOF_SEM_T]u8 align(@alignOf(usize)),
+};
+
const __SIZEOF_PTHREAD_COND_T = 48;
const __SIZEOF_PTHREAD_MUTEX_T = if (builtin.os.tag == .fuchsia) 40 else switch (builtin.abi) {
.musl, .musleabi, .musleabihf => if (@sizeOf(usize) == 8) 40 else 24,
@@ -134,6 +138,7 @@ const __SIZEOF_PTHREAD_MUTEX_T = if (builtin.os.tag == .fuchsia) 40 else switch
},
else => unreachable,
};
+const __SIZEOF_SEM_T = 4 * @sizeOf(usize);
pub const RTLD_LAZY = 1;
pub const RTLD_NOW = 2;
diff --git a/lib/std/debug.zig b/lib/std/debug.zig
@@ -274,9 +274,8 @@ pub fn panicExtra(trace: ?*const builtin.StackTrace, first_trace_addr: ?usize, c
// and call abort()
// Sleep forever without hammering the CPU
- var event = std.ResetEvent.init();
+ var event: std.StaticResetEvent = .{};
event.wait();
-
unreachable;
}
},
diff --git a/lib/std/fs/test.zig b/lib/std/fs/test.zig
@@ -758,7 +758,8 @@ test "open file with exclusive lock twice, make sure it waits" {
}
};
- var evt = std.ResetEvent.init();
+ var evt: std.ResetEvent = undefined;
+ try evt.init();
defer evt.deinit();
const t = try std.Thread.spawn(S.C{ .dir = &tmp.dir, .evt = &evt }, S.checkFn);
@@ -771,8 +772,6 @@ test "open file with exclusive lock twice, make sure it waits" {
std.time.sleep(SLEEP_TIMEOUT_NS);
if (timer.read() >= SLEEP_TIMEOUT_NS) break;
}
- // Check that createFile is still waiting for the lock to be released.
- testing.expect(!evt.isSet());
file.close();
// No timeout to avoid failures on heavily loaded systems.
evt.wait();
diff --git a/lib/std/mutex.zig b/lib/std/mutex.zig
@@ -10,7 +10,7 @@ const assert = std.debug.assert;
const windows = os.windows;
const testing = std.testing;
const SpinLock = std.SpinLock;
-const ResetEvent = std.ResetEvent;
+const StaticResetEvent = std.StaticResetEvent;
/// Lock may be held only once. If the same thread tries to acquire
/// the same mutex twice, it deadlocks. This type supports static
@@ -37,6 +37,8 @@ pub const Mutex = if (builtin.single_threaded)
Dummy
else if (builtin.os.tag == .windows)
WindowsMutex
+else if (std.Thread.use_pthreads)
+ PthreadMutex
else if (builtin.link_libc or builtin.os.tag == .linux)
// stack-based version of https://github.com/Amanieu/parking_lot/blob/master/core/src/word_lock.rs
struct {
@@ -52,7 +54,7 @@ else if (builtin.link_libc or builtin.os.tag == .linux)
const Node = struct {
next: ?*Node,
- event: ResetEvent,
+ event: StaticResetEvent,
};
pub fn tryAcquire(self: *Mutex) ?Held {
@@ -88,11 +90,12 @@ else if (builtin.link_libc or builtin.os.tag == .linux)
state = @atomicLoad(usize, &self.state, .Monotonic);
}
- // create the ResetEvent node on the stack
+ // create the StaticResetEvent node on the stack
// (faster than threadlocal on platforms like OSX)
- var node: Node = undefined;
- node.event = ResetEvent.init();
- defer node.event.deinit();
+ var node: Node = .{
+ .next = undefined,
+ .event = .{},
+ };
// we've spun too long, try and add our node to the LIFO queue.
// if the mutex becomes available in the process, try and grab it instead.
@@ -166,6 +169,52 @@ else if (builtin.link_libc or builtin.os.tag == .linux)
else
SpinLock;
+pub const PthreadMutex = struct {
+ pthread_mutex: std.c.pthread_mutex_t = init,
+
+ pub const Held = struct {
+ mutex: *PthreadMutex,
+
+ pub fn release(self: Held) void {
+ switch (std.c.pthread_mutex_unlock(&self.mutex.pthread_mutex)) {
+ 0 => return,
+ std.c.EINVAL => unreachable,
+ std.c.EAGAIN => unreachable,
+ std.c.EPERM => unreachable,
+ else => unreachable,
+ }
+ }
+ };
+
+ /// Create a new mutex in unlocked state.
+ pub const init = std.c.PTHREAD_MUTEX_INITIALIZER;
+
+ /// Try to acquire the mutex without blocking. Returns null if
+ /// the mutex is unavailable. Otherwise returns Held. Call
+ /// release on Held.
+ pub fn tryAcquire(self: *PthreadMutex) ?Held {
+ if (std.c.pthread_mutex_trylock(&self.pthread_mutex) == 0) {
+ return Held{ .mutex = self };
+ } else {
+ return null;
+ }
+ }
+
+ /// Acquire the mutex. Will deadlock if the mutex is already
+ /// held by the calling thread.
+ pub fn acquire(self: *PthreadMutex) Held {
+ switch (std.c.pthread_mutex_lock(&self.pthread_mutex)) {
+ 0 => return Held{ .mutex = self },
+ std.c.EINVAL => unreachable,
+ std.c.EBUSY => unreachable,
+ std.c.EAGAIN => unreachable,
+ std.c.EDEADLK => unreachable,
+ std.c.EPERM => unreachable,
+ else => unreachable,
+ }
+ }
+};
+
/// This has the sematics as `Mutex`, however it does not actually do any
/// synchronization. Operations are safety-checked no-ops.
pub const Dummy = struct {
@@ -236,7 +285,7 @@ const WindowsMutex = struct {
fn acquireSlow(self: *WindowsMutex) Held {
// try to use NT keyed events for blocking, falling back to spinlock if unavailable
@setCold(true);
- const handle = ResetEvent.OsEvent.Futex.getEventHandle() orelse return self.acquireSpinning();
+ const handle = StaticResetEvent.Impl.Futex.getEventHandle() orelse return self.acquireSpinning();
const key = @ptrCast(*const c_void, &self.state.waiters);
while (true) : (SpinLock.loopHint(1)) {
@@ -264,7 +313,7 @@ const WindowsMutex = struct {
pub fn release(self: Held) void {
// unlock without a rmw/cmpxchg instruction
@atomicStore(u8, @ptrCast(*u8, &self.mutex.state.locked), 0, .Release);
- const handle = ResetEvent.OsEvent.Futex.getEventHandle() orelse return;
+ const handle = StaticResetEvent.Impl.Futex.getEventHandle() orelse return;
const key = @ptrCast(*const c_void, &self.mutex.state.waiters);
while (true) : (SpinLock.loopHint(1)) {
diff --git a/lib/std/reset_event.zig b/lib/std/reset_event.zig
@@ -1,468 +0,0 @@
-// SPDX-License-Identifier: MIT
-// Copyright (c) 2015-2020 Zig Contributors
-// This file is part of [zig](https://ziglang.org/), which is MIT licensed.
-// The MIT license requires this copyright notice to be included in all copies
-// and substantial portions of the software.
-const std = @import("std.zig");
-const builtin = @import("builtin");
-const testing = std.testing;
-const SpinLock = std.SpinLock;
-const assert = std.debug.assert;
-const c = std.c;
-const os = std.os;
-const time = std.time;
-const linux = os.linux;
-const windows = os.windows;
-
-/// A resource object which supports blocking until signaled.
-/// Once finished, the `deinit()` method should be called for correctness.
-pub const ResetEvent = struct {
- os_event: OsEvent,
-
- pub const OsEvent = if (builtin.single_threaded)
- DebugEvent
- else if (builtin.link_libc and builtin.os.tag != .windows and builtin.os.tag != .linux)
- PosixEvent
- else
- AtomicEvent;
-
- pub fn init() ResetEvent {
- return ResetEvent{ .os_event = OsEvent.init() };
- }
-
- pub fn deinit(self: *ResetEvent) void {
- self.os_event.deinit();
- }
-
- /// Returns whether or not the event is currenetly set
- pub fn isSet(self: *ResetEvent) bool {
- return self.os_event.isSet();
- }
-
- /// Sets the event if not already set and
- /// wakes up all the threads waiting on the event.
- pub fn set(self: *ResetEvent) void {
- return self.os_event.set();
- }
-
- /// Resets the event to its original, unset state.
- pub fn reset(self: *ResetEvent) void {
- return self.os_event.reset();
- }
-
- /// Wait for the event to be set by blocking the current thread.
- pub fn wait(self: *ResetEvent) void {
- return self.os_event.wait(null) catch unreachable;
- }
-
- /// Wait for the event to be set by blocking the current thread.
- /// A timeout in nanoseconds can be provided as a hint for how
- /// long the thread should block on the unset event before throwing error.TimedOut.
- pub fn timedWait(self: *ResetEvent, timeout_ns: u64) !void {
- return self.os_event.wait(timeout_ns);
- }
-};
-
-const DebugEvent = struct {
- is_set: bool,
-
- fn init() DebugEvent {
- return DebugEvent{ .is_set = false };
- }
-
- fn deinit(self: *DebugEvent) void {
- self.* = undefined;
- }
-
- fn isSet(self: *DebugEvent) bool {
- return self.is_set;
- }
-
- fn reset(self: *DebugEvent) void {
- self.is_set = false;
- }
-
- fn set(self: *DebugEvent) void {
- self.is_set = true;
- }
-
- fn wait(self: *DebugEvent, timeout: ?u64) !void {
- if (self.is_set)
- return;
- if (timeout != null)
- return error.TimedOut;
- @panic("deadlock detected");
- }
-};
-
-const PosixEvent = struct {
- is_set: bool,
- cond: c.pthread_cond_t,
- mutex: c.pthread_mutex_t,
-
- fn init() PosixEvent {
- return PosixEvent{
- .is_set = false,
- .cond = c.PTHREAD_COND_INITIALIZER,
- .mutex = c.PTHREAD_MUTEX_INITIALIZER,
- };
- }
-
- fn deinit(self: *PosixEvent) void {
- // on dragonfly or openbsd, *destroy() functions can return EINVAL
- // for statically initialized pthread structures
- const err = if (builtin.os.tag == .dragonfly or builtin.os.tag == .openbsd)
- os.EINVAL
- else
- 0;
-
- const retm = c.pthread_mutex_destroy(&self.mutex);
- assert(retm == 0 or retm == err);
- const retc = c.pthread_cond_destroy(&self.cond);
- assert(retc == 0 or retc == err);
- }
-
- fn isSet(self: *PosixEvent) bool {
- assert(c.pthread_mutex_lock(&self.mutex) == 0);
- defer assert(c.pthread_mutex_unlock(&self.mutex) == 0);
-
- return self.is_set;
- }
-
- fn reset(self: *PosixEvent) void {
- assert(c.pthread_mutex_lock(&self.mutex) == 0);
- defer assert(c.pthread_mutex_unlock(&self.mutex) == 0);
-
- self.is_set = false;
- }
-
- fn set(self: *PosixEvent) void {
- assert(c.pthread_mutex_lock(&self.mutex) == 0);
- defer assert(c.pthread_mutex_unlock(&self.mutex) == 0);
-
- if (!self.is_set) {
- self.is_set = true;
- assert(c.pthread_cond_broadcast(&self.cond) == 0);
- }
- }
-
- fn wait(self: *PosixEvent, timeout: ?u64) !void {
- assert(c.pthread_mutex_lock(&self.mutex) == 0);
- defer assert(c.pthread_mutex_unlock(&self.mutex) == 0);
-
- // quick guard before possibly calling time syscalls below
- if (self.is_set)
- return;
-
- var ts: os.timespec = undefined;
- if (timeout) |timeout_ns| {
- var timeout_abs = timeout_ns;
- if (comptime std.Target.current.isDarwin()) {
- var tv: os.darwin.timeval = undefined;
- assert(os.darwin.gettimeofday(&tv, null) == 0);
- timeout_abs += @intCast(u64, tv.tv_sec) * time.ns_per_s;
- timeout_abs += @intCast(u64, tv.tv_usec) * time.ns_per_us;
- } else {
- os.clock_gettime(os.CLOCK_REALTIME, &ts) catch unreachable;
- timeout_abs += @intCast(u64, ts.tv_sec) * time.ns_per_s;
- timeout_abs += @intCast(u64, ts.tv_nsec);
- }
- ts.tv_sec = @intCast(@TypeOf(ts.tv_sec), @divFloor(timeout_abs, time.ns_per_s));
- ts.tv_nsec = @intCast(@TypeOf(ts.tv_nsec), @mod(timeout_abs, time.ns_per_s));
- }
-
- while (!self.is_set) {
- const rc = switch (timeout == null) {
- true => c.pthread_cond_wait(&self.cond, &self.mutex),
- else => c.pthread_cond_timedwait(&self.cond, &self.mutex, &ts),
- };
- switch (rc) {
- 0 => {},
- os.ETIMEDOUT => return error.TimedOut,
- os.EINVAL => unreachable,
- os.EPERM => unreachable,
- else => unreachable,
- }
- }
- }
-};
-
-const AtomicEvent = struct {
- waiters: u32,
-
- const WAKE = 1 << 0;
- const WAIT = 1 << 1;
-
- fn init() AtomicEvent {
- return AtomicEvent{ .waiters = 0 };
- }
-
- fn deinit(self: *AtomicEvent) void {
- self.* = undefined;
- }
-
- fn isSet(self: *const AtomicEvent) bool {
- return @atomicLoad(u32, &self.waiters, .Acquire) == WAKE;
- }
-
- fn reset(self: *AtomicEvent) void {
- @atomicStore(u32, &self.waiters, 0, .Monotonic);
- }
-
- fn set(self: *AtomicEvent) void {
- const waiters = @atomicRmw(u32, &self.waiters, .Xchg, WAKE, .Release);
- if (waiters >= WAIT) {
- return Futex.wake(&self.waiters, waiters >> 1);
- }
- }
-
- fn wait(self: *AtomicEvent, timeout: ?u64) !void {
- var waiters = @atomicLoad(u32, &self.waiters, .Acquire);
- while (waiters != WAKE) {
- waiters = @cmpxchgWeak(u32, &self.waiters, waiters, waiters + WAIT, .Acquire, .Acquire) orelse return Futex.wait(&self.waiters, timeout);
- }
- }
-
- pub const Futex = switch (builtin.os.tag) {
- .windows => WindowsFutex,
- .linux => LinuxFutex,
- else => SpinFutex,
- };
-
- const SpinFutex = struct {
- fn wake(waiters: *u32, wake_count: u32) void {}
-
- fn wait(waiters: *u32, timeout: ?u64) !void {
- // TODO: handle platforms where a monotonic timer isnt available
- var timer: time.Timer = undefined;
- if (timeout != null)
- timer = time.Timer.start() catch unreachable;
-
- while (@atomicLoad(u32, waiters, .Acquire) != WAKE) {
- SpinLock.yield();
- if (timeout) |timeout_ns| {
- if (timer.read() >= timeout_ns)
- return error.TimedOut;
- }
- }
- }
- };
-
- const LinuxFutex = struct {
- fn wake(waiters: *u32, wake_count: u32) void {
- const waiting = std.math.maxInt(i32); // wake_count
- const ptr = @ptrCast(*const i32, waiters);
- const rc = linux.futex_wake(ptr, linux.FUTEX_WAKE | linux.FUTEX_PRIVATE_FLAG, waiting);
- assert(linux.getErrno(rc) == 0);
- }
-
- fn wait(waiters: *u32, timeout: ?u64) !void {
- var ts: linux.timespec = undefined;
- var ts_ptr: ?*linux.timespec = null;
- if (timeout) |timeout_ns| {
- ts_ptr = &ts;
- ts.tv_sec = @intCast(isize, timeout_ns / time.ns_per_s);
- ts.tv_nsec = @intCast(isize, timeout_ns % time.ns_per_s);
- }
-
- while (true) {
- const waiting = @atomicLoad(u32, waiters, .Acquire);
- if (waiting == WAKE)
- return;
- const expected = @intCast(i32, waiting);
- const ptr = @ptrCast(*const i32, waiters);
- const rc = linux.futex_wait(ptr, linux.FUTEX_WAIT | linux.FUTEX_PRIVATE_FLAG, expected, ts_ptr);
- switch (linux.getErrno(rc)) {
- 0 => continue,
- os.ETIMEDOUT => return error.TimedOut,
- os.EINTR => continue,
- os.EAGAIN => return,
- else => unreachable,
- }
- }
- }
- };
-
- const WindowsFutex = struct {
- pub fn wake(waiters: *u32, wake_count: u32) void {
- const handle = getEventHandle() orelse return SpinFutex.wake(waiters, wake_count);
- const key = @ptrCast(*const c_void, waiters);
-
- var waiting = wake_count;
- while (waiting != 0) : (waiting -= 1) {
- const rc = windows.ntdll.NtReleaseKeyedEvent(handle, key, windows.FALSE, null);
- assert(rc == .SUCCESS);
- }
- }
-
- pub fn wait(waiters: *u32, timeout: ?u64) !void {
- const handle = getEventHandle() orelse return SpinFutex.wait(waiters, timeout);
- const key = @ptrCast(*const c_void, waiters);
-
- // NT uses timeouts in units of 100ns with negative value being relative
- var timeout_ptr: ?*windows.LARGE_INTEGER = null;
- var timeout_value: windows.LARGE_INTEGER = undefined;
- if (timeout) |timeout_ns| {
- timeout_ptr = &timeout_value;
- timeout_value = -@intCast(windows.LARGE_INTEGER, timeout_ns / 100);
- }
-
- // NtWaitForKeyedEvent doesnt have spurious wake-ups
- var rc = windows.ntdll.NtWaitForKeyedEvent(handle, key, windows.FALSE, timeout_ptr);
- switch (rc) {
- .TIMEOUT => {
- // update the wait count to signal that we're not waiting anymore.
- // if the .set() thread already observed that we are, perform a
- // matching NtWaitForKeyedEvent so that the .set() thread doesn't
- // deadlock trying to run NtReleaseKeyedEvent above.
- var waiting = @atomicLoad(u32, waiters, .Monotonic);
- while (true) {
- if (waiting == WAKE) {
- rc = windows.ntdll.NtWaitForKeyedEvent(handle, key, windows.FALSE, null);
- assert(rc == .WAIT_0);
- break;
- } else {
- waiting = @cmpxchgWeak(u32, waiters, waiting, waiting - WAIT, .Acquire, .Monotonic) orelse break;
- continue;
- }
- }
- return error.TimedOut;
- },
- .WAIT_0 => {},
- else => unreachable,
- }
- }
-
- var event_handle: usize = EMPTY;
- const EMPTY = ~@as(usize, 0);
- const LOADING = EMPTY - 1;
-
- pub fn getEventHandle() ?windows.HANDLE {
- var handle = @atomicLoad(usize, &event_handle, .Monotonic);
- while (true) {
- switch (handle) {
- EMPTY => handle = @cmpxchgWeak(usize, &event_handle, EMPTY, LOADING, .Acquire, .Monotonic) orelse {
- const handle_ptr = @ptrCast(*windows.HANDLE, &handle);
- const access_mask = windows.GENERIC_READ | windows.GENERIC_WRITE;
- if (windows.ntdll.NtCreateKeyedEvent(handle_ptr, access_mask, null, 0) != .SUCCESS)
- handle = 0;
- @atomicStore(usize, &event_handle, handle, .Monotonic);
- return @intToPtr(?windows.HANDLE, handle);
- },
- LOADING => {
- SpinLock.yield();
- handle = @atomicLoad(usize, &event_handle, .Monotonic);
- },
- else => {
- return @intToPtr(?windows.HANDLE, handle);
- },
- }
- }
- }
- };
-};
-
-test "ResetEvent" {
- var event = ResetEvent.init();
- defer event.deinit();
-
- // test event setting
- testing.expect(event.isSet() == false);
- event.set();
- testing.expect(event.isSet() == true);
-
- // test event resetting
- event.reset();
- testing.expect(event.isSet() == false);
-
- // test event waiting (non-blocking)
- event.set();
- event.wait();
- try event.timedWait(1);
-
- // test cross-thread signaling
- if (builtin.single_threaded)
- return;
-
- const Context = struct {
- const Self = @This();
-
- value: u128,
- in: ResetEvent,
- out: ResetEvent,
-
- fn init() Self {
- return Self{
- .value = 0,
- .in = ResetEvent.init(),
- .out = ResetEvent.init(),
- };
- }
-
- fn deinit(self: *Self) void {
- self.in.deinit();
- self.out.deinit();
- self.* = undefined;
- }
-
- fn sender(self: *Self) void {
- // update value and signal input
- testing.expect(self.value == 0);
- self.value = 1;
- self.in.set();
-
- // wait for receiver to update value and signal output
- self.out.wait();
- testing.expect(self.value == 2);
-
- // update value and signal final input
- self.value = 3;
- self.in.set();
- }
-
- fn receiver(self: *Self) void {
- // wait for sender to update value and signal input
- self.in.wait();
- assert(self.value == 1);
-
- // update value and signal output
- self.in.reset();
- self.value = 2;
- self.out.set();
-
- // wait for sender to update value and signal final input
- self.in.wait();
- assert(self.value == 3);
- }
-
- fn sleeper(self: *Self) void {
- self.in.set();
- time.sleep(time.ns_per_ms * 2);
- self.value = 5;
- self.out.set();
- }
-
- fn timedWaiter(self: *Self) !void {
- self.in.wait();
- testing.expectError(error.TimedOut, self.out.timedWait(time.ns_per_us));
- try self.out.timedWait(time.ns_per_ms * 100);
- testing.expect(self.value == 5);
- }
- };
-
- var context = Context.init();
- defer context.deinit();
- const receiver = try std.Thread.spawn(&context, Context.receiver);
- defer receiver.wait();
- context.sender();
-
- if (false) {
- // I have now observed this fail on macOS, Windows, and Linux.
- // https://github.com/ziglang/zig/issues/7009
- var timed = Context.init();
- defer timed.deinit();
- const sleeper = try std.Thread.spawn(&timed, Context.sleeper);
- defer sleeper.wait();
- try timed.timedWaiter();
- }
-}
diff --git a/lib/std/std.zig b/lib/std/std.zig
@@ -30,10 +30,11 @@ pub const PackedIntSlice = @import("packed_int_array.zig").PackedIntSlice;
pub const PackedIntSliceEndian = @import("packed_int_array.zig").PackedIntSliceEndian;
pub const PriorityQueue = @import("priority_queue.zig").PriorityQueue;
pub const Progress = @import("Progress.zig");
-pub const ResetEvent = @import("reset_event.zig").ResetEvent;
+pub const ResetEvent = @import("ResetEvent.zig");
pub const SemanticVersion = @import("SemanticVersion.zig");
pub const SinglyLinkedList = @import("linked_list.zig").SinglyLinkedList;
pub const SpinLock = @import("spinlock.zig").SpinLock;
+pub const StaticResetEvent = @import("StaticResetEvent.zig");
pub const StringHashMap = hash_map.StringHashMap;
pub const StringHashMapUnmanaged = hash_map.StringHashMapUnmanaged;
pub const StringArrayHashMap = array_hash_map.StringArrayHashMap;
diff --git a/src/Compilation.zig b/src/Compilation.zig
@@ -135,6 +135,8 @@ emit_docs: ?EmitLoc,
c_header: ?c_link.Header,
+work_queue_wait_group: WaitGroup,
+
pub const InnerError = Module.InnerError;
pub const CRTFile = struct {
@@ -1006,11 +1008,15 @@ pub fn create(gpa: *Allocator, options: InitOptions) !*Compilation {
.test_filter = options.test_filter,
.test_name_prefix = options.test_name_prefix,
.test_evented_io = options.test_evented_io,
+ .work_queue_wait_group = undefined,
};
break :comp comp;
};
errdefer comp.destroy();
+ try comp.work_queue_wait_group.init();
+ errdefer comp.work_queue_wait_group.deinit();
+
if (comp.bin_file.options.module) |mod| {
try comp.work_queue.writeItem(.{ .generate_builtin_zig = {} });
}
@@ -1191,6 +1197,8 @@ pub fn destroy(self: *Compilation) void {
self.cache_parent.manifest_dir.close();
if (self.owned_link_dir) |*dir| dir.close();
+ self.work_queue_wait_group.deinit();
+
// This destroys `self`.
self.arena_state.promote(gpa).deinit();
}
@@ -1405,13 +1413,13 @@ pub fn performAllTheWork(self: *Compilation) error{ TimerUnsupported, OutOfMemor
var arena = std.heap.ArenaAllocator.init(self.gpa);
defer arena.deinit();
- var wg = WaitGroup{};
- defer wg.wait();
+ self.work_queue_wait_group.reset();
+ defer self.work_queue_wait_group.wait();
while (self.c_object_work_queue.readItem()) |c_object| {
- wg.start();
+ self.work_queue_wait_group.start();
try self.thread_pool.spawn(workerUpdateCObject, .{
- self, c_object, &c_comp_progress_node, &wg,
+ self, c_object, &c_comp_progress_node, &self.work_queue_wait_group,
});
}
@@ -1764,7 +1772,7 @@ fn workerUpdateCObject(
progress_node: *std.Progress.Node,
wg: *WaitGroup,
) void {
- defer wg.stop();
+ defer wg.finish();
comp.updateCObject(c_object, progress_node) catch |err| switch (err) {
error.AnalysisFail => return,
diff --git a/src/Event.zig b/src/Event.zig
@@ -1,43 +0,0 @@
-// SPDX-License-Identifier: MIT
-// Copyright (c) 2015-2020 Zig Contributors
-// This file is part of [zig](https://ziglang.org/), which is MIT licensed.
-// The MIT license requires this copyright notice to be included in all copies
-// and substantial portions of the software.
-const std = @import("std");
-const Event = @This();
-
-lock: std.Mutex = .{},
-event: std.ResetEvent = undefined,
-state: enum { empty, waiting, notified } = .empty,
-
-pub fn wait(self: *Event) void {
- const held = self.lock.acquire();
-
- switch (self.state) {
- .empty => {
- self.state = .waiting;
- self.event = @TypeOf(self.event).init();
- held.release();
- self.event.wait();
- self.event.deinit();
- },
- .waiting => unreachable,
- .notified => held.release(),
- }
-}
-
-pub fn set(self: *Event) void {
- const held = self.lock.acquire();
-
- switch (self.state) {
- .empty => {
- self.state = .notified;
- held.release();
- },
- .waiting => {
- held.release();
- self.event.set();
- },
- .notified => unreachable,
- }
-}
diff --git a/src/ThreadPool.zig b/src/ThreadPool.zig
@@ -9,70 +9,102 @@ const ThreadPool = @This();
lock: std.Mutex = .{},
is_running: bool = true,
allocator: *std.mem.Allocator,
-running: usize = 0,
-threads: []*std.Thread,
+workers: []Worker,
run_queue: RunQueue = .{},
idle_queue: IdleQueue = .{},
-const IdleQueue = std.SinglyLinkedList(std.AutoResetEvent);
+const IdleQueue = std.SinglyLinkedList(std.ResetEvent);
const RunQueue = std.SinglyLinkedList(Runnable);
const Runnable = struct {
runFn: fn (*Runnable) void,
};
+const Worker = struct {
+ pool: *ThreadPool,
+ thread: *std.Thread,
+ /// The node is for this worker only and must have an already initialized event
+ /// when the thread is spawned.
+ idle_node: IdleQueue.Node,
+
+ fn run(worker: *Worker) void {
+ while (true) {
+ const held = worker.pool.lock.acquire();
+
+ if (worker.pool.run_queue.popFirst()) |run_node| {
+ held.release();
+ (run_node.data.runFn)(&run_node.data);
+ continue;
+ }
+
+ if (worker.pool.is_running) {
+ worker.idle_node.data.reset();
+
+ worker.pool.idle_queue.prepend(&worker.idle_node);
+ held.release();
+
+ worker.idle_node.data.wait();
+ continue;
+ }
+
+ held.release();
+ return;
+ }
+ }
+};
+
pub fn init(self: *ThreadPool, allocator: *std.mem.Allocator) !void {
self.* = .{
.allocator = allocator,
- .threads = &[_]*std.Thread{},
+ .workers = &[_]Worker{},
};
if (std.builtin.single_threaded)
return;
- errdefer self.deinit();
+ const worker_count = std.math.max(1, std.Thread.cpuCount() catch 1);
+ self.workers = try allocator.alloc(Worker, worker_count);
+ errdefer allocator.free(self.workers);
+
+ var worker_index: usize = 0;
+ errdefer self.destroyWorkers(worker_index);
+ while (worker_index < worker_count) : (worker_index += 1) {
+ const worker = &self.workers[worker_index];
+ worker.pool = self;
- var num_threads = std.Thread.cpuCount() catch 1;
- if (num_threads > 0)
- self.threads = try allocator.alloc(*std.Thread, num_threads);
+ // Each worker requires its ResetEvent to be pre-initialized.
+ try worker.idle_node.data.init();
+ errdefer worker.idle_node.data.deinit();
- while (num_threads > 0) : (num_threads -= 1) {
- const thread = try std.Thread.spawn(self, runWorker);
- self.threads[self.running] = thread;
- self.running += 1;
+ worker.thread = try std.Thread.spawn(worker, Worker.run);
}
}
-pub fn deinit(self: *ThreadPool) void {
- self.shutdown();
-
- std.debug.assert(!self.is_running);
- for (self.threads[0..self.running]) |thread|
- thread.wait();
-
- defer self.threads = &[_]*std.Thread{};
- if (self.running > 0)
- self.allocator.free(self.threads);
+fn destroyWorkers(self: *ThreadPool, spawned: usize) void {
+ for (self.workers[0..spawned]) |*worker| {
+ worker.thread.wait();
+ worker.idle_node.data.deinit();
+ }
}
-pub fn shutdown(self: *ThreadPool) void {
- const held = self.lock.acquire();
-
- if (!self.is_running)
- return held.release();
+pub fn deinit(self: *ThreadPool) void {
+ {
+ const held = self.lock.acquire();
+ defer held.release();
- var idle_queue = self.idle_queue;
- self.idle_queue = .{};
- self.is_running = false;
- held.release();
+ self.is_running = false;
+ while (self.idle_queue.popFirst()) |idle_node|
+ idle_node.data.set();
+ }
- while (idle_queue.popFirst()) |idle_node|
- idle_node.data.set();
+ self.destroyWorkers(self.workers.len);
+ self.allocator.free(self.workers);
}
pub fn spawn(self: *ThreadPool, comptime func: anytype, args: anytype) !void {
if (std.builtin.single_threaded) {
- @call(.{}, func, args);
+ const result = @call(.{}, func, args);
return;
}
+
const Args = @TypeOf(args);
const Closure = struct {
arguments: Args,
@@ -83,44 +115,24 @@ pub fn spawn(self: *ThreadPool, comptime func: anytype, args: anytype) !void {
const run_node = @fieldParentPtr(RunQueue.Node, "data", runnable);
const closure = @fieldParentPtr(@This(), "run_node", run_node);
const result = @call(.{}, func, closure.arguments);
+
+ const held = closure.pool.lock.acquire();
+ defer held.release();
closure.pool.allocator.destroy(closure);
}
};
+ const held = self.lock.acquire();
+ defer held.release();
+
const closure = try self.allocator.create(Closure);
closure.* = .{
.arguments = args,
.pool = self,
};
- const held = self.lock.acquire();
self.run_queue.prepend(&closure.run_node);
- const idle_node = self.idle_queue.popFirst();
- held.release();
-
- if (idle_node) |node|
- node.data.set();
-}
-
-fn runWorker(self: *ThreadPool) void {
- while (true) {
- const held = self.lock.acquire();
-
- if (self.run_queue.popFirst()) |run_node| {
- held.release();
- (run_node.data.runFn)(&run_node.data);
- continue;
- }
-
- if (!self.is_running) {
- held.release();
- return;
- }
-
- var idle_node = IdleQueue.Node{ .data = .{} };
- self.idle_queue.prepend(&idle_node);
- held.release();
- idle_node.data.wait();
- }
+ if (self.idle_queue.popFirst()) |idle_node|
+ idle_node.data.set();
}
diff --git a/src/WaitGroup.zig b/src/WaitGroup.zig
@@ -5,11 +5,24 @@
// and substantial portions of the software.
const std = @import("std");
const WaitGroup = @This();
-const Event = @import("Event.zig");
lock: std.Mutex = .{},
counter: usize = 0,
-event: ?*Event = null,
+event: std.ResetEvent,
+
+pub fn init(self: *WaitGroup) !void {
+ self.* = .{
+ .lock = .{},
+ .counter = 0,
+ .event = undefined,
+ };
+ try self.event.init();
+}
+
+pub fn deinit(self: *WaitGroup) void {
+ self.event.deinit();
+ self.* = undefined;
+}
pub fn start(self: *WaitGroup) void {
const held = self.lock.acquire();
@@ -18,29 +31,31 @@ pub fn start(self: *WaitGroup) void {
self.counter += 1;
}
-pub fn stop(self: *WaitGroup) void {
- var event: ?*Event = null;
- defer if (event) |waiter|
- waiter.set();
-
+pub fn finish(self: *WaitGroup) void {
const held = self.lock.acquire();
defer held.release();
self.counter -= 1;
- if (self.counter == 0)
- std.mem.swap(?*Event, &self.event, &event);
+
+ if (self.counter == 0) {
+ self.event.set();
+ }
}
pub fn wait(self: *WaitGroup) void {
- var event = Event{};
- var has_event = false;
- defer if (has_event)
- event.wait();
+ while (true) {
+ const held = self.lock.acquire();
- const held = self.lock.acquire();
- defer held.release();
+ if (self.counter == 0) {
+ held.release();
+ return;
+ }
+
+ held.release();
+ self.event.wait();
+ }
+}
- has_event = self.counter != 0;
- if (has_event)
- self.event = &event;
+pub fn reset(self: *WaitGroup) void {
+ self.event.reset();
}