zig

fork of https://codeberg.org/ziglang/zig
Log | Files | Refs | README | LICENSE

commit 177377b6e356b34bbed40cadca596658d158af6b (tree)
parent 5377b7fb97311448daa3c29a8c8f100656d871ba
Author: Andrew Kelley <andrew@ziglang.org>
Date:   Wed, 23 Dec 2020 16:57:18 -0800

rework std.ResetEvent, improve std lib Darwin integration

 * split std.ResetEvent into:
   - ResetEvent - requires init() at runtime and it can fail. Also
     requires deinit().
   - StaticResetEvent - can be statically initialized and requires no
     deinitialization. Initialization cannot fail.
 * the POSIX sem_t implementation can in fact fail on initialization
   because it is allowed to be implemented as a file descriptor.
 * Completely define, clarify, and explain in detail the semantics of
   these APIs. Remove the `isSet` function.
 * `ResetEvent.timedWait` returns an enum instead of a possible error.
 * `ResetEvent.init` takes a pointer to the ResetEvent instead of
   returning a copy.
 * On Darwin, `ResetEvent` is implemented using Grand Central Dispatch,
   which is exposed by libSystem.

stage2 changes:
 * ThreadPool: use a single, pre-initialized `ResetEvent` per worker.
 * WaitGroup: now requires init() and deinit() and init() can fail.
   - Add a `reset` function.
   - Compilation initializes one for the work queue in creation and
     re-uses it for every update.
   - Rename `stop` to `finish`.
   - Simplify the implementation based on the usage pattern.

Diffstat:
MCMakeLists.txt | 5+++--
Alib/std/ResetEvent.zig | 297+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Alib/std/StaticResetEvent.zig | 396+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mlib/std/c/darwin.zig | 12++++++++++++
Mlib/std/debug.zig | 3+--
Mlib/std/fs/test.zig | 2--
Mlib/std/mutex.zig | 4++--
Dlib/std/reset_event.zig | 501-------------------------------------------------------------------------------
Mlib/std/std.zig | 3++-
Msrc/Compilation.zig | 18+++++++++++++-----
Msrc/ThreadPool.zig | 96+++++++++++++++++++++++++++++++++++++++++++++++--------------------------------
Msrc/WaitGroup.zig | 35++++++++++++++++++++++-------------
12 files changed, 805 insertions(+), 567 deletions(-)

diff --git a/CMakeLists.txt b/CMakeLists.txt @@ -410,11 +410,12 @@ set(ZIG_STAGE2_SOURCES "${CMAKE_SOURCE_DIR}/lib/std/os/windows/bits.zig" "${CMAKE_SOURCE_DIR}/lib/std/os/windows/ntstatus.zig" "${CMAKE_SOURCE_DIR}/lib/std/os/windows/win32error.zig" + "${CMAKE_SOURCE_DIR}/lib/std/Progress.zig" + "${CMAKE_SOURCE_DIR}/lib/std/ResetEvent.zig" + "${CMAKE_SOURCE_DIR}/lib/std/StaticResetEvent.zig" "${CMAKE_SOURCE_DIR}/lib/std/pdb.zig" "${CMAKE_SOURCE_DIR}/lib/std/process.zig" - "${CMAKE_SOURCE_DIR}/lib/std/Progress.zig" "${CMAKE_SOURCE_DIR}/lib/std/rand.zig" - "${CMAKE_SOURCE_DIR}/lib/std/reset_event.zig" "${CMAKE_SOURCE_DIR}/lib/std/sort.zig" "${CMAKE_SOURCE_DIR}/lib/std/special/compiler_rt.zig" "${CMAKE_SOURCE_DIR}/lib/std/special/compiler_rt/addXf3.zig" diff --git a/lib/std/ResetEvent.zig b/lib/std/ResetEvent.zig @@ -0,0 +1,297 @@ +// SPDX-License-Identifier: MIT +// Copyright (c) 2015-2020 Zig Contributors +// This file is part of [zig](https://ziglang.org/), which is MIT licensed. +// The MIT license requires this copyright notice to be included in all copies +// and substantial portions of the software. + +//! A thread-safe resource which supports blocking until signaled. +//! This API is for kernel threads, not evented I/O. +//! This API requires being initialized at runtime, and initialization +//! can fail. Once initialized, the core operations cannot fail. +//! If you need an abstraction that cannot fail to be initialized, see +//! `std.StaticResetEvent`. However if you can handle initialization failure, +//! it is preferred to use `ResetEvent`. + +const ResetEvent = @This(); +const std = @import("std.zig"); +const builtin = std.builtin; +const testing = std.testing; +const assert = std.debug.assert; +const c = std.c; +const os = std.os; +const time = std.time; + +impl: Impl, + +pub const Impl = if (builtin.single_threaded) + std.StaticResetEvent.DebugEvent +else if (std.Target.current.isDarwin()) + DarwinEvent +else if (std.Thread.use_pthreads) + PosixEvent +else + std.StaticResetEvent.AtomicEvent; + +pub const InitError = error{SystemResources}; + +/// After `init`, it is legal to call any other function. +pub fn init(ev: *ResetEvent) InitError!void { + return ev.impl.init(); +} + +/// This function is not thread-safe. +/// After `deinit`, the only legal function to call is `init`. +pub fn deinit(ev: *ResetEvent) void { + return ev.impl.deinit(); +} + +/// Sets the event if not already set and wakes up all the threads waiting on +/// the event. It is safe to call `set` multiple times before calling `wait`. +/// However it is illegal to call `set` after `wait` is called until the event +/// is `reset`. This function is thread-safe. +pub fn set(ev: *ResetEvent) void { + return ev.impl.set(); +} + +/// Resets the event to its original, unset state. +/// This function is *not* thread-safe. It is equivalent to calling +/// `deinit` followed by `init` but without the possibility of failure. +pub fn reset(ev: *ResetEvent) void { + return ev.impl.reset(); +} + +/// Wait for the event to be set by blocking the current thread. +/// Thread-safe. No spurious wakeups. +/// Upon return from `wait`, the only functions available to be called +/// in `ResetEvent` are `reset` and `deinit`. +pub fn wait(ev: *ResetEvent) void { + return ev.impl.wait(); +} + +pub const TimedWaitResult = enum { event_set, timed_out }; + +/// Wait for the event to be set by blocking the current thread. +/// A timeout in nanoseconds can be provided as a hint for how +/// long the thread should block on the unset event before returning +/// `TimedWaitResult.timed_out`. +/// Thread-safe. No precision of timing is guaranteed. +/// Upon return from `wait`, the only functions available to be called +/// in `ResetEvent` are `reset` and `deinit`. +pub fn timedWait(ev: *ResetEvent, timeout_ns: u64) TimedWaitResult { + return ev.impl.timedWait(timeout_ns); +} + +/// Apple has decided to not support POSIX semaphores, so we go with a +/// different approach using Grand Central Dispatch. This API is exposed +/// by libSystem so it is guaranteed to be available on all Darwin platforms. +pub const DarwinEvent = struct { + sem: c.dispatch_semaphore_t = undefined, + + pub fn init(ev: *DarwinEvent) !void { + ev.* = .{ + .sem = c.dispatch_semaphore_create(0) orelse return error.SystemResources, + }; + } + + pub fn deinit(ev: *DarwinEvent) void { + c.dispatch_release(ev.sem); + ev.* = undefined; + } + + pub fn set(ev: *DarwinEvent) void { + // Empirically this returns the numerical value of the semaphore. + _ = c.dispatch_semaphore_signal(ev.sem); + } + + pub fn wait(ev: *DarwinEvent) void { + assert(c.dispatch_semaphore_wait(ev.sem, c.DISPATCH_TIME_FOREVER) == 0); + } + + pub fn timedWait(ev: *DarwinEvent, timeout_ns: u64) TimedWaitResult { + const t = c.dispatch_time(c.DISPATCH_TIME_NOW, @intCast(i64, timeout_ns)); + if (c.dispatch_semaphore_wait(ev.sem, t) != 0) { + return .timed_out; + } else { + return .event_set; + } + } + + pub fn reset(ev: *DarwinEvent) void { + // Keep calling until the semaphore goes back down to 0. + while (c.dispatch_semaphore_wait(ev.sem, c.DISPATCH_TIME_NOW) == 0) {} + } +}; + +/// POSIX semaphores must be initialized at runtime because they are allowed to +/// be implemented as file descriptors, in which case initialization would require +/// a syscall to open the fd. +pub const PosixEvent = struct { + sem: c.sem_t = undefined, + + pub fn init(ev: *PosixEvent) !void { + switch (c.getErrno(c.sem_init(&ev.sem, 0, 0))) { + 0 => return, + else => return error.SystemResources, + } + } + + pub fn deinit(ev: *PosixEvent) void { + assert(c.sem_destroy(&ev.sem) == 0); + ev.* = undefined; + } + + pub fn set(ev: *PosixEvent) void { + assert(c.sem_post(&ev.sem) == 0); + } + + pub fn wait(ev: *PosixEvent) void { + while (true) { + switch (c.getErrno(c.sem_wait(&ev.sem))) { + 0 => return, + c.EINTR => continue, + c.EINVAL => unreachable, + else => unreachable, + } + } + } + + pub fn timedWait(ev: *PosixEvent, timeout_ns: u64) TimedWaitResult { + var ts: os.timespec = undefined; + var timeout_abs = timeout_ns; + os.clock_gettime(os.CLOCK_REALTIME, &ts) catch return .timed_out; + timeout_abs += @intCast(u64, ts.tv_sec) * time.ns_per_s; + timeout_abs += @intCast(u64, ts.tv_nsec); + ts.tv_sec = @intCast(@TypeOf(ts.tv_sec), @divFloor(timeout_abs, time.ns_per_s)); + ts.tv_nsec = @intCast(@TypeOf(ts.tv_nsec), @mod(timeout_abs, time.ns_per_s)); + while (true) { + switch (c.getErrno(c.sem_timedwait(&ev.sem, &ts))) { + 0 => return .event_set, + c.EINTR => continue, + c.EINVAL => unreachable, + c.ETIMEDOUT => return .timed_out, + else => unreachable, + } + } + } + + pub fn reset(ev: *PosixEvent) void { + while (true) { + switch (c.getErrno(c.sem_trywait(&ev.sem))) { + 0 => continue, // Need to make it go to zero. + c.EINTR => continue, + c.EINVAL => unreachable, + c.EAGAIN => return, // The semaphore currently has the value zero. + else => unreachable, + } + } + } +}; + +test "basic usage" { + var event: ResetEvent = undefined; + try event.init(); + defer event.deinit(); + + // test event setting + event.set(); + + // test event resetting + event.reset(); + + // test event waiting (non-blocking) + event.set(); + event.wait(); + event.reset(); + + event.set(); + testing.expectEqual(TimedWaitResult.event_set, event.timedWait(1)); + + // test cross-thread signaling + if (builtin.single_threaded) + return; + + const Context = struct { + const Self = @This(); + + value: u128, + in: ResetEvent, + out: ResetEvent, + + fn init(self: *Self) !void { + self.* = .{ + .value = 0, + .in = undefined, + .out = undefined, + }; + try self.in.init(); + try self.out.init(); + } + + fn deinit(self: *Self) void { + self.in.deinit(); + self.out.deinit(); + self.* = undefined; + } + + fn sender(self: *Self) void { + // update value and signal input + testing.expect(self.value == 0); + self.value = 1; + self.in.set(); + + // wait for receiver to update value and signal output + self.out.wait(); + testing.expect(self.value == 2); + + // update value and signal final input + self.value = 3; + self.in.set(); + } + + fn receiver(self: *Self) void { + // wait for sender to update value and signal input + self.in.wait(); + assert(self.value == 1); + + // update value and signal output + self.in.reset(); + self.value = 2; + self.out.set(); + + // wait for sender to update value and signal final input + self.in.wait(); + assert(self.value == 3); + } + + fn sleeper(self: *Self) void { + self.in.set(); + time.sleep(time.ns_per_ms * 2); + self.value = 5; + self.out.set(); + } + + fn timedWaiter(self: *Self) !void { + self.in.wait(); + testing.expectEqual(TimedWaitResult.timed_out, self.out.timedWait(time.ns_per_us)); + try self.out.timedWait(time.ns_per_ms * 100); + testing.expect(self.value == 5); + } + }; + + var context: Context = undefined; + try context.init(); + defer context.deinit(); + const receiver = try std.Thread.spawn(&context, Context.receiver); + defer receiver.wait(); + context.sender(); + + if (false) { + // I have now observed this fail on macOS, Windows, and Linux. + // https://github.com/ziglang/zig/issues/7009 + var timed = Context.init(); + defer timed.deinit(); + const sleeper = try std.Thread.spawn(&timed, Context.sleeper); + defer sleeper.wait(); + try timed.timedWaiter(); + } +} diff --git a/lib/std/StaticResetEvent.zig b/lib/std/StaticResetEvent.zig @@ -0,0 +1,396 @@ +// SPDX-License-Identifier: MIT +// Copyright (c) 2015-2020 Zig Contributors +// This file is part of [zig](https://ziglang.org/), which is MIT licensed. +// The MIT license requires this copyright notice to be included in all copies +// and substantial portions of the software. + +//! A thread-safe resource which supports blocking until signaled. +//! This API is for kernel threads, not evented I/O. +//! This API is statically initializable. It cannot fail to be initialized +//! and it requires no deinitialization. The downside is that it may not +//! integrate as cleanly into other synchronization APIs, or, in a worst case, +//! may be forced to fall back on spin locking. As a rule of thumb, prefer +//! to use `std.ResetEvent` when possible, and use `StaticResetEvent` when +//! the logic needs stronger API guarantees. + +const std = @import("std.zig"); +const StaticResetEvent = @This(); +const SpinLock = std.SpinLock; +const assert = std.debug.assert; +const os = std.os; +const time = std.time; +const linux = std.os.linux; +const windows = std.os.windows; +const testing = std.testing; + +impl: Impl = .{}, + +pub const Impl = if (std.builtin.single_threaded) + DebugEvent +else + AtomicEvent; + +/// Sets the event if not already set and wakes up all the threads waiting on +/// the event. It is safe to call `set` multiple times before calling `wait`. +/// However it is illegal to call `set` after `wait` is called until the event +/// is `reset`. This function is thread-safe. +pub fn set(ev: *StaticResetEvent) void { + return ev.impl.set(); +} + +/// Wait for the event to be set by blocking the current thread. +/// Thread-safe. No spurious wakeups. +/// Upon return from `wait`, the only function available to be called +/// in `StaticResetEvent` is `reset`. +pub fn wait(ev: *StaticResetEvent) void { + return ev.impl.wait(); +} + +/// Resets the event to its original, unset state. +/// This function is *not* thread-safe. It is equivalent to calling +/// `deinit` followed by `init` but without the possibility of failure. +pub fn reset(ev: *StaticResetEvent) void { + return ev.impl.reset(); +} + +pub const TimedWaitResult = std.ResetEvent.TimedWaitResult; + +/// Wait for the event to be set by blocking the current thread. +/// A timeout in nanoseconds can be provided as a hint for how +/// long the thread should block on the unset event before returning +/// `TimedWaitResult.timed_out`. +/// Thread-safe. No precision of timing is guaranteed. +/// Upon return from `timedWait`, the only function available to be called +/// in `StaticResetEvent` is `reset`. +pub fn timedWait(ev: *StaticResetEvent, timeout_ns: u64) TimedWaitResult { + return ev.impl.timedWait(timeout_ns); +} + +/// For single-threaded builds, we use this to detect deadlocks. +/// In unsafe modes this ends up being no-ops. +pub const DebugEvent = struct { + state: State = State.unset, + + const State = enum { + unset, + set, + waited, + }; + + /// This function is provided so that this type can be re-used inside + /// `std.ResetEvent`. + pub fn init(ev: *DebugEvent) void { + ev.* = .{}; + } + + /// This function is provided so that this type can be re-used inside + /// `std.ResetEvent`. + pub fn deinit(ev: *DebugEvent) void { + ev.* = undefined; + } + + pub fn set(ev: *DebugEvent) void { + switch (ev.state) { + .unset => ev.state = .set, + .set => {}, + .waited => unreachable, // Not allowed to call `set` until `reset`. + } + } + + pub fn wait(ev: *DebugEvent) void { + switch (ev.state) { + .unset => unreachable, // Deadlock detected. + .set => return, + .waited => unreachable, // Not allowed to call `wait` until `reset`. + } + } + + fn timedWait(ev: *DebugEvent, timeout: u64) TimedWaitResult { + switch (ev.state) { + .unset => return .timed_out, + .set => return .event_set, + .waited => unreachable, // Not allowed to call `wait` until `reset`. + } + } + + pub fn reset(ev: *DebugEvent) void { + ev.state = .unset; + } +}; + +pub const AtomicEvent = struct { + waiters: u32 = 0, + + const WAKE = 1 << 0; + const WAIT = 1 << 1; + + /// This function is provided so that this type can be re-used inside + /// `std.ResetEvent`. + pub fn init(ev: *AtomicEvent) void { + ev.* = .{}; + } + + /// This function is provided so that this type can be re-used inside + /// `std.ResetEvent`. + pub fn deinit(ev: *AtomicEvent) void { + ev.* = undefined; + } + + pub fn set(ev: *AtomicEvent) void { + const waiters = @atomicRmw(u32, &ev.waiters, .Xchg, WAKE, .Release); + if (waiters >= WAIT) { + return Futex.wake(&ev.waiters, waiters >> 1); + } + } + + pub fn wait(ev: *AtomicEvent) void { + switch (ev.timedWait(null)) { + .timed_out => unreachable, + .event_set => return, + } + } + + pub fn timedWait(ev: *AtomicEvent, timeout: ?u64) TimedWaitResult { + var waiters = @atomicLoad(u32, &ev.waiters, .Acquire); + while (waiters != WAKE) { + waiters = @cmpxchgWeak(u32, &ev.waiters, waiters, waiters + WAIT, .Acquire, .Acquire) orelse { + if (Futex.wait(&ev.waiters, timeout)) |_| { + return .event_set; + } else |_| { + return .timed_out; + } + }; + } + return .event_set; + } + + pub fn reset(ev: *AtomicEvent) void { + @atomicStore(u32, &ev.waiters, 0, .Monotonic); + } + + pub const Futex = switch (std.Target.current.os.tag) { + .windows => WindowsFutex, + .linux => LinuxFutex, + else => SpinFutex, + }; + + pub const SpinFutex = struct { + fn wake(waiters: *u32, wake_count: u32) void {} + + fn wait(waiters: *u32, timeout: ?u64) !void { + var timer: time.Timer = undefined; + if (timeout != null) + timer = time.Timer.start() catch return error.TimedOut; + + while (@atomicLoad(u32, waiters, .Acquire) != WAKE) { + SpinLock.yield(); + if (timeout) |timeout_ns| { + if (timer.read() >= timeout_ns) + return error.TimedOut; + } + } + } + }; + + pub const LinuxFutex = struct { + fn wake(waiters: *u32, wake_count: u32) void { + const waiting = std.math.maxInt(i32); // wake_count + const ptr = @ptrCast(*const i32, waiters); + const rc = linux.futex_wake(ptr, linux.FUTEX_WAKE | linux.FUTEX_PRIVATE_FLAG, waiting); + assert(linux.getErrno(rc) == 0); + } + + fn wait(waiters: *u32, timeout: ?u64) !void { + var ts: linux.timespec = undefined; + var ts_ptr: ?*linux.timespec = null; + if (timeout) |timeout_ns| { + ts_ptr = &ts; + ts.tv_sec = @intCast(isize, timeout_ns / time.ns_per_s); + ts.tv_nsec = @intCast(isize, timeout_ns % time.ns_per_s); + } + + while (true) { + const waiting = @atomicLoad(u32, waiters, .Acquire); + if (waiting == WAKE) + return; + const expected = @intCast(i32, waiting); + const ptr = @ptrCast(*const i32, waiters); + const rc = linux.futex_wait(ptr, linux.FUTEX_WAIT | linux.FUTEX_PRIVATE_FLAG, expected, ts_ptr); + switch (linux.getErrno(rc)) { + 0 => continue, + os.ETIMEDOUT => return error.TimedOut, + os.EINTR => continue, + os.EAGAIN => return, + else => unreachable, + } + } + } + }; + + pub const WindowsFutex = struct { + pub fn wake(waiters: *u32, wake_count: u32) void { + const handle = getEventHandle() orelse return SpinFutex.wake(waiters, wake_count); + const key = @ptrCast(*const c_void, waiters); + + var waiting = wake_count; + while (waiting != 0) : (waiting -= 1) { + const rc = windows.ntdll.NtReleaseKeyedEvent(handle, key, windows.FALSE, null); + assert(rc == .SUCCESS); + } + } + + pub fn wait(waiters: *u32, timeout: ?u64) !void { + const handle = getEventHandle() orelse return SpinFutex.wait(waiters, timeout); + const key = @ptrCast(*const c_void, waiters); + + // NT uses timeouts in units of 100ns with negative value being relative + var timeout_ptr: ?*windows.LARGE_INTEGER = null; + var timeout_value: windows.LARGE_INTEGER = undefined; + if (timeout) |timeout_ns| { + timeout_ptr = &timeout_value; + timeout_value = -@intCast(windows.LARGE_INTEGER, timeout_ns / 100); + } + + // NtWaitForKeyedEvent doesnt have spurious wake-ups + var rc = windows.ntdll.NtWaitForKeyedEvent(handle, key, windows.FALSE, timeout_ptr); + switch (rc) { + .TIMEOUT => { + // update the wait count to signal that we're not waiting anymore. + // if the .set() thread already observed that we are, perform a + // matching NtWaitForKeyedEvent so that the .set() thread doesn't + // deadlock trying to run NtReleaseKeyedEvent above. + var waiting = @atomicLoad(u32, waiters, .Monotonic); + while (true) { + if (waiting == WAKE) { + rc = windows.ntdll.NtWaitForKeyedEvent(handle, key, windows.FALSE, null); + assert(rc == .WAIT_0); + break; + } else { + waiting = @cmpxchgWeak(u32, waiters, waiting, waiting - WAIT, .Acquire, .Monotonic) orelse break; + continue; + } + } + return error.TimedOut; + }, + .WAIT_0 => {}, + else => unreachable, + } + } + + var event_handle: usize = EMPTY; + const EMPTY = ~@as(usize, 0); + const LOADING = EMPTY - 1; + + pub fn getEventHandle() ?windows.HANDLE { + var handle = @atomicLoad(usize, &event_handle, .Monotonic); + while (true) { + switch (handle) { + EMPTY => handle = @cmpxchgWeak(usize, &event_handle, EMPTY, LOADING, .Acquire, .Monotonic) orelse { + const handle_ptr = @ptrCast(*windows.HANDLE, &handle); + const access_mask = windows.GENERIC_READ | windows.GENERIC_WRITE; + if (windows.ntdll.NtCreateKeyedEvent(handle_ptr, access_mask, null, 0) != .SUCCESS) + handle = 0; + @atomicStore(usize, &event_handle, handle, .Monotonic); + return @intToPtr(?windows.HANDLE, handle); + }, + LOADING => { + SpinLock.yield(); + handle = @atomicLoad(usize, &event_handle, .Monotonic); + }, + else => { + return @intToPtr(?windows.HANDLE, handle); + }, + } + } + } + }; +}; + +test "basic usage" { + var event = StaticResetEvent{}; + + // test event setting + event.set(); + + // test event resetting + event.reset(); + + // test event waiting (non-blocking) + event.set(); + event.wait(); + event.reset(); + + event.set(); + testing.expectEqual(TimedWaitResult.event_set, event.timedWait(1)); + + // test cross-thread signaling + if (std.builtin.single_threaded) + return; + + const Context = struct { + const Self = @This(); + + value: u128 = 0, + in: StaticResetEvent = .{}, + out: StaticResetEvent = .{}, + + fn sender(self: *Self) void { + // update value and signal input + testing.expect(self.value == 0); + self.value = 1; + self.in.set(); + + // wait for receiver to update value and signal output + self.out.wait(); + testing.expect(self.value == 2); + + // update value and signal final input + self.value = 3; + self.in.set(); + } + + fn receiver(self: *Self) void { + // wait for sender to update value and signal input + self.in.wait(); + assert(self.value == 1); + + // update value and signal output + self.in.reset(); + self.value = 2; + self.out.set(); + + // wait for sender to update value and signal final input + self.in.wait(); + assert(self.value == 3); + } + + fn sleeper(self: *Self) void { + self.in.set(); + time.sleep(time.ns_per_ms * 2); + self.value = 5; + self.out.set(); + } + + fn timedWaiter(self: *Self) !void { + self.in.wait(); + testing.expectEqual(TimedWaitResult.timed_out, self.out.timedWait(time.ns_per_us)); + try self.out.timedWait(time.ns_per_ms * 100); + testing.expect(self.value == 5); + } + }; + + var context = Context{}; + const receiver = try std.Thread.spawn(&context, Context.receiver); + defer receiver.wait(); + context.sender(); + + if (false) { + // I have now observed this fail on macOS, Windows, and Linux. + // https://github.com/ziglang/zig/issues/7009 + var timed = Context.init(); + defer timed.deinit(); + const sleeper = try std.Thread.spawn(&timed, Context.sleeper); + defer sleeper.wait(); + try timed.timedWaiter(); + } +} diff --git a/lib/std/c/darwin.zig b/lib/std/c/darwin.zig @@ -187,3 +187,15 @@ pub const pthread_attr_t = extern struct { }; pub extern "c" fn arc4random_buf(buf: [*]u8, len: usize) void; + +// Grand Central Dispatch is exposed by libSystem. +pub const dispatch_semaphore_t = *opaque{}; +pub const dispatch_time_t = u64; +pub const DISPATCH_TIME_NOW = @as(dispatch_time_t, 0); +pub const DISPATCH_TIME_FOREVER = ~@as(dispatch_time_t, 0); +pub extern "c" fn dispatch_semaphore_create(value: isize) ?dispatch_semaphore_t; +pub extern "c" fn dispatch_semaphore_wait(dsema: dispatch_semaphore_t, timeout: dispatch_time_t) isize; +pub extern "c" fn dispatch_semaphore_signal(dsema: dispatch_semaphore_t) isize; + +pub extern "c" fn dispatch_release(object: *c_void) void; +pub extern "c" fn dispatch_time(when: dispatch_time_t, delta: i64) dispatch_time_t; diff --git a/lib/std/debug.zig b/lib/std/debug.zig @@ -274,9 +274,8 @@ pub fn panicExtra(trace: ?*const builtin.StackTrace, first_trace_addr: ?usize, c // and call abort() // Sleep forever without hammering the CPU - var event = std.ResetEvent.init(); + var event: std.StaticResetEvent = .{}; event.wait(); - unreachable; } }, diff --git a/lib/std/fs/test.zig b/lib/std/fs/test.zig @@ -771,8 +771,6 @@ test "open file with exclusive lock twice, make sure it waits" { std.time.sleep(SLEEP_TIMEOUT_NS); if (timer.read() >= SLEEP_TIMEOUT_NS) break; } - // Check that createFile is still waiting for the lock to be released. - testing.expect(!evt.isSet()); file.close(); // No timeout to avoid failures on heavily loaded systems. evt.wait(); diff --git a/lib/std/mutex.zig b/lib/std/mutex.zig @@ -284,7 +284,7 @@ const WindowsMutex = struct { fn acquireSlow(self: *WindowsMutex) Held { // try to use NT keyed events for blocking, falling back to spinlock if unavailable @setCold(true); - const handle = ResetEvent.OsEvent.Futex.getEventHandle() orelse return self.acquireSpinning(); + const handle = ResetEvent.Impl.Futex.getEventHandle() orelse return self.acquireSpinning(); const key = @ptrCast(*const c_void, &self.state.waiters); while (true) : (SpinLock.loopHint(1)) { @@ -312,7 +312,7 @@ const WindowsMutex = struct { pub fn release(self: Held) void { // unlock without a rmw/cmpxchg instruction @atomicStore(u8, @ptrCast(*u8, &self.mutex.state.locked), 0, .Release); - const handle = ResetEvent.OsEvent.Futex.getEventHandle() orelse return; + const handle = ResetEvent.Impl.Futex.getEventHandle() orelse return; const key = @ptrCast(*const c_void, &self.mutex.state.waiters); while (true) : (SpinLock.loopHint(1)) { diff --git a/lib/std/reset_event.zig b/lib/std/reset_event.zig @@ -1,501 +0,0 @@ -// SPDX-License-Identifier: MIT -// Copyright (c) 2015-2020 Zig Contributors -// This file is part of [zig](https://ziglang.org/), which is MIT licensed. -// The MIT license requires this copyright notice to be included in all copies -// and substantial portions of the software. -const std = @import("std.zig"); -const builtin = @import("builtin"); -const testing = std.testing; -const SpinLock = std.SpinLock; -const assert = std.debug.assert; -const c = std.c; -const os = std.os; -const time = std.time; -const linux = os.linux; -const windows = os.windows; - -/// A resource object which supports blocking until signaled. -/// Once finished, the `deinit()` method should be called for correctness. -pub const ResetEvent = struct { - os_event: OsEvent, - - pub const OsEvent = if (builtin.single_threaded) - DebugEvent - else if (std.Thread.use_pthreads) - PosixEvent - else - AtomicEvent; - - pub fn init() ResetEvent { - return ResetEvent{ .os_event = OsEvent.init() }; - } - - pub fn deinit(self: *ResetEvent) void { - self.os_event.deinit(); - } - - /// When `wait` would return without blocking, this returns `true`. - /// Note that the value may be immediately invalid upon this function's - /// return, because another thread may call `wait` in between, changing - /// the event's set/cleared status. - pub fn isSet(self: *ResetEvent) bool { - return self.os_event.isSet(); - } - - /// Sets the event if not already set and - /// wakes up all the threads waiting on the event. - pub fn set(self: *ResetEvent) void { - return self.os_event.set(); - } - - /// Resets the event to its original, unset state. - /// TODO improve these docs: - /// * under what circumstances does it make sense to call this function? - pub fn reset(self: *ResetEvent) void { - return self.os_event.reset(); - } - - /// Wait for the event to be set by blocking the current thread. - /// TODO improve these docs: - /// * is the function thread-safe? - /// * does it have suprious wakeups? - pub fn wait(self: *ResetEvent) void { - return self.os_event.wait(); - } - - /// Wait for the event to be set by blocking the current thread. - /// A timeout in nanoseconds can be provided as a hint for how - /// long the thread should block on the unset event before throwing error.TimedOut. - /// TODO improve these docs: - /// * is the function thread-safe? - /// * does it have suprious wakeups? - pub fn timedWait(self: *ResetEvent, timeout_ns: u64) !void { - return self.os_event.timedWait(timeout_ns); - } -}; - -const DebugEvent = struct { - is_set: bool, - - fn init() DebugEvent { - return DebugEvent{ .is_set = false }; - } - - fn deinit(self: *DebugEvent) void { - self.* = undefined; - } - - fn isSet(self: *DebugEvent) bool { - return self.is_set; - } - - fn reset(self: *DebugEvent) void { - self.is_set = false; - } - - fn set(self: *DebugEvent) void { - self.is_set = true; - } - - fn wait(self: *DebugEvent) void { - if (self.is_set) - return; - - @panic("deadlock detected"); - } - - fn timedWait(self: *DebugEvent, timeout: u64) !void { - if (self.is_set) - return; - - return error.TimedOut; - } -}; - -const PosixEvent = struct { - sem: c.sem_t = undefined, - /// Sadly this is needed because pthreads semaphore API does not - /// support static initialization. - init_mutex: std.mutex.PthreadMutex = .{}, - state: enum { uninit, init } = .uninit, - - fn init() PosixEvent { - return .{}; - } - - /// Not thread-safe. - fn deinit(self: *PosixEvent) void { - switch (self.state) { - .uninit => {}, - .init => { - assert(c.sem_destroy(&self.sem) == 0); - }, - } - self.* = undefined; - } - - fn isSet(self: *PosixEvent) bool { - const sem = self.getInitializedSem(); - var val: c_int = undefined; - assert(c.sem_getvalue(sem, &val) == 0); - return val > 0; - } - - fn reset(self: *PosixEvent) void { - const sem = self.getInitializedSem(); - while (true) { - switch (c.getErrno(c.sem_trywait(sem))) { - 0 => continue, // Need to make it go to zero. - c.EINTR => continue, - c.EINVAL => unreachable, - c.EAGAIN => return, // The semaphore currently has the value zero. - else => unreachable, - } - } - } - - fn set(self: *PosixEvent) void { - const sem = self.getInitializedSem(); - assert(c.sem_post(sem) == 0); - } - - fn wait(self: *PosixEvent) void { - const sem = self.getInitializedSem(); - while (true) { - switch (c.getErrno(c.sem_wait(sem))) { - 0 => return, - c.EINTR => continue, - c.EINVAL => unreachable, - else => unreachable, - } - } - } - - fn timedWait(self: *PosixEvent, timeout_ns: u64) !void { - var ts: os.timespec = undefined; - var timeout_abs = timeout_ns; - if (comptime std.Target.current.isDarwin()) { - var tv: os.darwin.timeval = undefined; - assert(os.darwin.gettimeofday(&tv, null) == 0); - timeout_abs += @intCast(u64, tv.tv_sec) * time.ns_per_s; - timeout_abs += @intCast(u64, tv.tv_usec) * time.ns_per_us; - } else { - os.clock_gettime(os.CLOCK_REALTIME, &ts) catch return error.TimedOut; - timeout_abs += @intCast(u64, ts.tv_sec) * time.ns_per_s; - timeout_abs += @intCast(u64, ts.tv_nsec); - } - ts.tv_sec = @intCast(@TypeOf(ts.tv_sec), @divFloor(timeout_abs, time.ns_per_s)); - ts.tv_nsec = @intCast(@TypeOf(ts.tv_nsec), @mod(timeout_abs, time.ns_per_s)); - const sem = self.getInitializedSem(); - while (true) { - switch (c.getErrno(c.sem_timedwait(&self.sem, &ts))) { - 0 => return, - c.EINTR => continue, - c.EINVAL => unreachable, - c.ETIMEDOUT => return error.TimedOut, - else => unreachable, - } - } - } - - fn getInitializedSem(self: *PosixEvent) *c.sem_t { - const held = self.init_mutex.acquire(); - defer held.release(); - - switch (self.state) { - .init => return &self.sem, - .uninit => { - self.state = .init; - assert(c.sem_init(&self.sem, 0, 0) == 0); - return &self.sem; - }, - } - } -}; - -const AtomicEvent = struct { - waiters: u32, - - const WAKE = 1 << 0; - const WAIT = 1 << 1; - - fn init() AtomicEvent { - return AtomicEvent{ .waiters = 0 }; - } - - fn deinit(self: *AtomicEvent) void { - self.* = undefined; - } - - fn isSet(self: *const AtomicEvent) bool { - return @atomicLoad(u32, &self.waiters, .Acquire) == WAKE; - } - - fn reset(self: *AtomicEvent) void { - @atomicStore(u32, &self.waiters, 0, .Monotonic); - } - - fn set(self: *AtomicEvent) void { - const waiters = @atomicRmw(u32, &self.waiters, .Xchg, WAKE, .Release); - if (waiters >= WAIT) { - return Futex.wake(&self.waiters, waiters >> 1); - } - } - - fn wait(self: *AtomicEvent) void { - return self.timedWait(null) catch unreachable; - } - - fn timedWait(self: *AtomicEvent, timeout: ?u64) !void { - var waiters = @atomicLoad(u32, &self.waiters, .Acquire); - while (waiters != WAKE) { - waiters = @cmpxchgWeak(u32, &self.waiters, waiters, waiters + WAIT, .Acquire, .Acquire) orelse return Futex.wait(&self.waiters, timeout); - } - } - - pub const Futex = switch (builtin.os.tag) { - .windows => WindowsFutex, - .linux => LinuxFutex, - else => SpinFutex, - }; - - const SpinFutex = struct { - fn wake(waiters: *u32, wake_count: u32) void {} - - fn wait(waiters: *u32, timeout: ?u64) !void { - // TODO: handle platforms where a monotonic timer isnt available - var timer: time.Timer = undefined; - if (timeout != null) - timer = time.Timer.start() catch unreachable; - - while (@atomicLoad(u32, waiters, .Acquire) != WAKE) { - SpinLock.yield(); - if (timeout) |timeout_ns| { - if (timer.read() >= timeout_ns) - return error.TimedOut; - } - } - } - }; - - const LinuxFutex = struct { - fn wake(waiters: *u32, wake_count: u32) void { - const waiting = std.math.maxInt(i32); // wake_count - const ptr = @ptrCast(*const i32, waiters); - const rc = linux.futex_wake(ptr, linux.FUTEX_WAKE | linux.FUTEX_PRIVATE_FLAG, waiting); - assert(linux.getErrno(rc) == 0); - } - - fn wait(waiters: *u32, timeout: ?u64) !void { - var ts: linux.timespec = undefined; - var ts_ptr: ?*linux.timespec = null; - if (timeout) |timeout_ns| { - ts_ptr = &ts; - ts.tv_sec = @intCast(isize, timeout_ns / time.ns_per_s); - ts.tv_nsec = @intCast(isize, timeout_ns % time.ns_per_s); - } - - while (true) { - const waiting = @atomicLoad(u32, waiters, .Acquire); - if (waiting == WAKE) - return; - const expected = @intCast(i32, waiting); - const ptr = @ptrCast(*const i32, waiters); - const rc = linux.futex_wait(ptr, linux.FUTEX_WAIT | linux.FUTEX_PRIVATE_FLAG, expected, ts_ptr); - switch (linux.getErrno(rc)) { - 0 => continue, - os.ETIMEDOUT => return error.TimedOut, - os.EINTR => continue, - os.EAGAIN => return, - else => unreachable, - } - } - } - }; - - const WindowsFutex = struct { - pub fn wake(waiters: *u32, wake_count: u32) void { - const handle = getEventHandle() orelse return SpinFutex.wake(waiters, wake_count); - const key = @ptrCast(*const c_void, waiters); - - var waiting = wake_count; - while (waiting != 0) : (waiting -= 1) { - const rc = windows.ntdll.NtReleaseKeyedEvent(handle, key, windows.FALSE, null); - assert(rc == .SUCCESS); - } - } - - pub fn wait(waiters: *u32, timeout: ?u64) !void { - const handle = getEventHandle() orelse return SpinFutex.wait(waiters, timeout); - const key = @ptrCast(*const c_void, waiters); - - // NT uses timeouts in units of 100ns with negative value being relative - var timeout_ptr: ?*windows.LARGE_INTEGER = null; - var timeout_value: windows.LARGE_INTEGER = undefined; - if (timeout) |timeout_ns| { - timeout_ptr = &timeout_value; - timeout_value = -@intCast(windows.LARGE_INTEGER, timeout_ns / 100); - } - - // NtWaitForKeyedEvent doesnt have spurious wake-ups - var rc = windows.ntdll.NtWaitForKeyedEvent(handle, key, windows.FALSE, timeout_ptr); - switch (rc) { - .TIMEOUT => { - // update the wait count to signal that we're not waiting anymore. - // if the .set() thread already observed that we are, perform a - // matching NtWaitForKeyedEvent so that the .set() thread doesn't - // deadlock trying to run NtReleaseKeyedEvent above. - var waiting = @atomicLoad(u32, waiters, .Monotonic); - while (true) { - if (waiting == WAKE) { - rc = windows.ntdll.NtWaitForKeyedEvent(handle, key, windows.FALSE, null); - assert(rc == .WAIT_0); - break; - } else { - waiting = @cmpxchgWeak(u32, waiters, waiting, waiting - WAIT, .Acquire, .Monotonic) orelse break; - continue; - } - } - return error.TimedOut; - }, - .WAIT_0 => {}, - else => unreachable, - } - } - - var event_handle: usize = EMPTY; - const EMPTY = ~@as(usize, 0); - const LOADING = EMPTY - 1; - - pub fn getEventHandle() ?windows.HANDLE { - var handle = @atomicLoad(usize, &event_handle, .Monotonic); - while (true) { - switch (handle) { - EMPTY => handle = @cmpxchgWeak(usize, &event_handle, EMPTY, LOADING, .Acquire, .Monotonic) orelse { - const handle_ptr = @ptrCast(*windows.HANDLE, &handle); - const access_mask = windows.GENERIC_READ | windows.GENERIC_WRITE; - if (windows.ntdll.NtCreateKeyedEvent(handle_ptr, access_mask, null, 0) != .SUCCESS) - handle = 0; - @atomicStore(usize, &event_handle, handle, .Monotonic); - return @intToPtr(?windows.HANDLE, handle); - }, - LOADING => { - SpinLock.yield(); - handle = @atomicLoad(usize, &event_handle, .Monotonic); - }, - else => { - return @intToPtr(?windows.HANDLE, handle); - }, - } - } - } - }; -}; - -test "ResetEvent" { - var event = ResetEvent.init(); - defer event.deinit(); - - // test event setting - testing.expect(!event.isSet()); - event.set(); - testing.expect(event.isSet()); - - // test event resetting - event.reset(); - testing.expect(!event.isSet()); - - // test event waiting (non-blocking) - event.set(); - event.wait(); - event.reset(); - - event.set(); - try event.timedWait(1); - - // test cross-thread signaling - if (builtin.single_threaded) - return; - - const Context = struct { - const Self = @This(); - - value: u128, - in: ResetEvent, - out: ResetEvent, - - fn init() Self { - return Self{ - .value = 0, - .in = ResetEvent.init(), - .out = ResetEvent.init(), - }; - } - - fn deinit(self: *Self) void { - self.in.deinit(); - self.out.deinit(); - self.* = undefined; - } - - fn sender(self: *Self) void { - // update value and signal input - testing.expect(self.value == 0); - self.value = 1; - self.in.set(); - - // wait for receiver to update value and signal output - self.out.wait(); - testing.expect(self.value == 2); - - // update value and signal final input - self.value = 3; - self.in.set(); - } - - fn receiver(self: *Self) void { - // wait for sender to update value and signal input - self.in.wait(); - assert(self.value == 1); - - // update value and signal output - self.in.reset(); - self.value = 2; - self.out.set(); - - // wait for sender to update value and signal final input - self.in.wait(); - assert(self.value == 3); - } - - fn sleeper(self: *Self) void { - self.in.set(); - time.sleep(time.ns_per_ms * 2); - self.value = 5; - self.out.set(); - } - - fn timedWaiter(self: *Self) !void { - self.in.wait(); - testing.expectError(error.TimedOut, self.out.timedWait(time.ns_per_us)); - try self.out.timedWait(time.ns_per_ms * 100); - testing.expect(self.value == 5); - } - }; - - var context = Context.init(); - defer context.deinit(); - const receiver = try std.Thread.spawn(&context, Context.receiver); - defer receiver.wait(); - context.sender(); - - if (false) { - // I have now observed this fail on macOS, Windows, and Linux. - // https://github.com/ziglang/zig/issues/7009 - var timed = Context.init(); - defer timed.deinit(); - const sleeper = try std.Thread.spawn(&timed, Context.sleeper); - defer sleeper.wait(); - try timed.timedWaiter(); - } -} diff --git a/lib/std/std.zig b/lib/std/std.zig @@ -30,10 +30,11 @@ pub const PackedIntSlice = @import("packed_int_array.zig").PackedIntSlice; pub const PackedIntSliceEndian = @import("packed_int_array.zig").PackedIntSliceEndian; pub const PriorityQueue = @import("priority_queue.zig").PriorityQueue; pub const Progress = @import("Progress.zig"); -pub const ResetEvent = @import("reset_event.zig").ResetEvent; +pub const ResetEvent = @import("ResetEvent.zig"); pub const SemanticVersion = @import("SemanticVersion.zig"); pub const SinglyLinkedList = @import("linked_list.zig").SinglyLinkedList; pub const SpinLock = @import("spinlock.zig").SpinLock; +pub const StaticResetEvent = @import("StaticResetEvent.zig"); pub const StringHashMap = hash_map.StringHashMap; pub const StringHashMapUnmanaged = hash_map.StringHashMapUnmanaged; pub const StringArrayHashMap = array_hash_map.StringArrayHashMap; diff --git a/src/Compilation.zig b/src/Compilation.zig @@ -135,6 +135,8 @@ emit_docs: ?EmitLoc, c_header: ?c_link.Header, +work_queue_wait_group: WaitGroup, + pub const InnerError = Module.InnerError; pub const CRTFile = struct { @@ -1006,11 +1008,15 @@ pub fn create(gpa: *Allocator, options: InitOptions) !*Compilation { .test_filter = options.test_filter, .test_name_prefix = options.test_name_prefix, .test_evented_io = options.test_evented_io, + .work_queue_wait_group = undefined, }; break :comp comp; }; errdefer comp.destroy(); + try comp.work_queue_wait_group.init(); + errdefer comp.work_queue_wait_group.deinit(); + if (comp.bin_file.options.module) |mod| { try comp.work_queue.writeItem(.{ .generate_builtin_zig = {} }); } @@ -1191,6 +1197,8 @@ pub fn destroy(self: *Compilation) void { self.cache_parent.manifest_dir.close(); if (self.owned_link_dir) |*dir| dir.close(); + self.work_queue_wait_group.deinit(); + // This destroys `self`. self.arena_state.promote(gpa).deinit(); } @@ -1405,13 +1413,13 @@ pub fn performAllTheWork(self: *Compilation) error{ TimerUnsupported, OutOfMemor var arena = std.heap.ArenaAllocator.init(self.gpa); defer arena.deinit(); - var wg = WaitGroup{}; - defer wg.wait(); + self.work_queue_wait_group.reset(); + defer self.work_queue_wait_group.wait(); while (self.c_object_work_queue.readItem()) |c_object| { - wg.start(); + self.work_queue_wait_group.start(); try self.thread_pool.spawn(workerUpdateCObject, .{ - self, c_object, &c_comp_progress_node, &wg, + self, c_object, &c_comp_progress_node, &self.work_queue_wait_group, }); } @@ -1764,7 +1772,7 @@ fn workerUpdateCObject( progress_node: *std.Progress.Node, wg: *WaitGroup, ) void { - defer wg.stop(); + defer wg.finish(); comp.updateCObject(c_object, progress_node) catch |err| switch (err) { error.AnalysisFail => return, diff --git a/src/ThreadPool.zig b/src/ThreadPool.zig @@ -9,8 +9,7 @@ const ThreadPool = @This(); lock: std.Mutex = .{}, is_running: bool = true, allocator: *std.mem.Allocator, -spawned: usize = 0, -threads: []*std.Thread, +workers: []Worker, run_queue: RunQueue = .{}, idle_queue: IdleQueue = .{}, @@ -20,23 +19,69 @@ const Runnable = struct { runFn: fn (*Runnable) void, }; +const Worker = struct { + pool: *ThreadPool, + thread: *std.Thread, + /// The node is for this worker only and must have an already initialized event + /// when the thread is spawned. + idle_node: IdleQueue.Node, + + fn run(worker: *Worker) void { + while (true) { + const held = worker.pool.lock.acquire(); + + if (worker.pool.run_queue.popFirst()) |run_node| { + held.release(); + (run_node.data.runFn)(&run_node.data); + continue; + } + + if (worker.pool.is_running) { + worker.idle_node.data.reset(); + + worker.pool.idle_queue.prepend(&worker.idle_node); + held.release(); + + worker.idle_node.data.wait(); + continue; + } + + held.release(); + return; + } + } +}; + pub fn init(self: *ThreadPool, allocator: *std.mem.Allocator) !void { self.* = .{ .allocator = allocator, - .threads = &[_]*std.Thread{}, + .workers = &[_]Worker{}, }; if (std.builtin.single_threaded) return; - errdefer self.deinit(); + const worker_count = std.math.max(1, std.Thread.cpuCount() catch 1); + self.workers = try allocator.alloc(Worker, worker_count); + errdefer allocator.free(self.workers); + + var worker_index: usize = 0; + errdefer self.destroyWorkers(worker_index); + while (worker_index < worker_count) : (worker_index += 1) { + const worker = &self.workers[worker_index]; + worker.pool = self; - var num_threads = std.math.max(1, std.Thread.cpuCount() catch 1); - self.threads = try allocator.alloc(*std.Thread, num_threads); + // Each worker requires its ResetEvent to be pre-initialized. + try worker.idle_node.data.init(); + errdefer worker.idle_node.data.deinit(); + + worker.thread = try std.Thread.spawn(worker, Worker.run); + } +} - while (num_threads > 0) : (num_threads -= 1) { - const thread = try std.Thread.spawn(self, runWorker); - self.threads[self.spawned] = thread; - self.spawned += 1; +fn destroyWorkers(self: *ThreadPool, spawned: usize) void { + for (self.workers[0..spawned]) |*worker| { + worker.thread.wait(); + worker.idle_node.data.deinit(); } } @@ -50,9 +95,8 @@ pub fn deinit(self: *ThreadPool) void { idle_node.data.set(); } - defer self.allocator.free(self.threads); - for (self.threads[0..self.spawned]) |thread| - thread.wait(); + self.destroyWorkers(self.workers.len); + self.allocator.free(self.workers); } pub fn spawn(self: *ThreadPool, comptime func: anytype, args: anytype) !void { @@ -92,29 +136,3 @@ pub fn spawn(self: *ThreadPool, comptime func: anytype, args: anytype) !void { if (self.idle_queue.popFirst()) |idle_node| idle_node.data.set(); } - -fn runWorker(self: *ThreadPool) void { - while (true) { - const held = self.lock.acquire(); - - if (self.run_queue.popFirst()) |run_node| { - held.release(); - (run_node.data.runFn)(&run_node.data); - continue; - } - - if (self.is_running) { - var idle_node = IdleQueue.Node{ .data = std.ResetEvent.init() }; - - self.idle_queue.prepend(&idle_node); - held.release(); - - idle_node.data.wait(); - idle_node.data.deinit(); - continue; - } - - held.release(); - return; - } -} diff --git a/src/WaitGroup.zig b/src/WaitGroup.zig @@ -8,7 +8,21 @@ const WaitGroup = @This(); lock: std.Mutex = .{}, counter: usize = 0, -event: ?*std.ResetEvent = null, +event: std.ResetEvent, + +pub fn init(self: *WaitGroup) !void { + self.* = .{ + .lock = .{}, + .counter = 0, + .event = undefined, + }; + try self.event.init(); +} + +pub fn deinit(self: *WaitGroup) void { + self.event.deinit(); + self.* = undefined; +} pub fn start(self: *WaitGroup) void { const held = self.lock.acquire(); @@ -17,17 +31,14 @@ pub fn start(self: *WaitGroup) void { self.counter += 1; } -pub fn stop(self: *WaitGroup) void { +pub fn finish(self: *WaitGroup) void { const held = self.lock.acquire(); defer held.release(); self.counter -= 1; if (self.counter == 0) { - if (self.event) |event| { - self.event = null; - event.set(); - } + self.event.set(); } } @@ -40,13 +51,11 @@ pub fn wait(self: *WaitGroup) void { return; } - var event = std.ResetEvent.init(); - defer event.deinit(); - - std.debug.assert(self.event == null); - self.event = &event; - held.release(); - event.wait(); + self.event.wait(); } } + +pub fn reset(self: *WaitGroup) void { + self.event.reset(); +}