diff --git a/lib/std/reset_event.zig b/lib/std/reset_event.zig index 52235d4327..5c50d0eb55 100644 --- a/lib/std/reset_event.zig +++ b/lib/std/reset_event.zig @@ -1,8 +1,8 @@ const std = @import("std.zig"); const builtin = @import("builtin"); const testing = std.testing; +const SpinLock = std.SpinLock; const assert = std.debug.assert; -const Backoff = std.SpinLock.Backoff; const c = std.c; const os = std.os; const time = std.time; @@ -14,13 +14,17 @@ const windows = os.windows; pub const ResetEvent = struct { os_event: OsEvent, + pub const OsEvent = if (builtin.single_threaded) DebugEvent else switch (builtin.os) { + .windows => AtomicEvent, + else => if (builtin.link_libc) PosixEvent else AtomicEvent, + }; + pub fn init() ResetEvent { return ResetEvent{ .os_event = OsEvent.init() }; } pub fn deinit(self: *ResetEvent) void { self.os_event.deinit(); - self.* = undefined; } /// Returns whether or not the event is currenetly set @@ -29,308 +33,116 @@ pub const ResetEvent = struct { } /// Sets the event if not already set and - /// wakes up AT LEAST one thread waiting the event. - /// Returns whether or not a thread was woken up. - pub fn set(self: *ResetEvent, auto_reset: bool) bool { - return self.os_event.set(auto_reset); + /// wakes up at least one thread waiting the event. + pub fn set(self: *ResetEvent) void { + return self.os_event.set(); } /// Resets the event to its original, unset state. - /// Returns whether or not the event was currently set before un-setting. - pub fn reset(self: *ResetEvent) bool { + pub fn reset(self: *ResetEvent) void { return self.os_event.reset(); } - const WaitError = error{ - /// The thread blocked longer than the maximum time specified. - TimedOut, - }; + /// Wait for the event to be set by blocking the current thread. + pub fn wait(self: *ResetEvent) void { + return self.os_event.wait(null) catch unreachable; + } /// Wait for the event to be set by blocking the current thread. - /// Optionally provided timeout in nanoseconds which throws an - /// `error.TimedOut` if the thread blocked AT LEAST longer than specified. - /// Returns whether or not the thread blocked from the event being unset at the time of calling. - pub fn wait(self: *ResetEvent, timeout_ns: ?u64) WaitError!bool { + /// A timeout in nanoseconds can be provided as a hint for how + /// long the thread should block on the unset event before throwind error.TimedOut. + pub fn timedWait(self: *ResetEvent, timeout_ns: u64) !void { return self.os_event.wait(timeout_ns); } }; -const OsEvent = if (builtin.single_threaded) DebugEvent else switch (builtin.os) { - .windows => WindowsEvent, - .linux => if (builtin.link_libc) PosixEvent else LinuxEvent, - else => if (builtin.link_libc) PosixEvent else SpinEvent, -}; - const DebugEvent = struct { - is_set: @TypeOf(set_init), + is_set: bool, - const set_init = if (std.debug.runtime_safety) false else {}; - - pub fn init() DebugEvent { - return DebugEvent{ .is_set = set_init }; + fn init() DebugEvent { + return DebugEvent{ .is_set = false }; } - pub fn deinit(self: *DebugEvent) void { + fn deinit(self: *DebugEvent) void { self.* = undefined; } - pub fn isSet(self: *DebugEvent) bool { - if (!std.debug.runtime_safety) - return true; + fn isSet(self: *DebugEvent) bool { return self.is_set; } - pub fn set(self: *DebugEvent, auto_reset: bool) bool { - if (std.debug.runtime_safety) - self.is_set = !auto_reset; - return false; - } - - pub fn reset(self: *DebugEvent) bool { - if (!std.debug.runtime_safety) - return false; - const was_set = self.is_set; + fn reset(self: *DebugEvent) void { self.is_set = false; - return was_set; } - pub fn wait(self: *DebugEvent, timeout: ?u64) ResetEvent.WaitError!bool { - if (std.debug.runtime_safety and !self.is_set) - @panic("deadlock detected"); - return ResetEvent.WaitError.TimedOut; + fn set(self: *DebugEvent) void { + self.is_set = true; + } + + fn wait(self: *DebugEvent, timeout: ?u64) !void { + if (self.is_set) + return; + if (timeout != null) + return error.TimedOut; + @panic("deadlock detected"); } }; -fn AtomicEvent(comptime FutexImpl: type) type { - return struct { - state: u32, - - const IS_SET: u32 = 1 << 0; - const WAIT_MASK = ~IS_SET; - - pub const Self = @This(); - pub const Futex = FutexImpl; - - pub fn init() Self { - return Self{ .state = 0 }; - } - - pub fn deinit(self: *Self) void { - self.* = undefined; - } - - pub fn isSet(self: *const Self) bool { - const state = @atomicLoad(u32, &self.state, .Acquire); - return (state & IS_SET) != 0; - } - - pub fn reset(self: *Self) bool { - const old_state = @atomicRmw(u32, &self.state, .Xchg, 0, .Monotonic); - return (old_state & IS_SET) != 0; - } - - pub fn set(self: *Self, auto_reset: bool) bool { - const new_state = if (auto_reset) 0 else IS_SET; - const old_state = @atomicRmw(u32, &self.state, .Xchg, new_state, .Release); - if ((old_state & WAIT_MASK) == 0) { - return false; - } - - Futex.wake(&self.state); - return true; - } - - pub fn wait(self: *Self, timeout: ?u64) ResetEvent.WaitError!bool { - var dummy_value: u32 = undefined; - const wait_token = @truncate(u32, @ptrToInt(&dummy_value)); - - var state = @atomicLoad(u32, &self.state, .Monotonic); - while (true) { - if ((state & IS_SET) != 0) - return false; - state = @cmpxchgWeak(u32, &self.state, state, wait_token, .Acquire, .Monotonic) orelse break; - } - - try Futex.wait(&self.state, wait_token, timeout); - return true; - } - }; -} - -const SpinEvent = AtomicEvent(struct { - fn wake(ptr: *const u32) void {} - - fn wait(ptr: *const u32, expected: u32, timeout: ?u64) ResetEvent.WaitError!void { - // TODO: handle platforms where time.Timer.start() fails - var spin = Backoff.init(); - var timer = if (timeout == null) null else time.Timer.start() catch unreachable; - while (@atomicLoad(u32, ptr, .Acquire) == expected) { - spin.yield(); - if (timeout) |timeout_ns| { - if (timer.?.read() > timeout_ns) - return ResetEvent.WaitError.TimedOut; - } - } - } -}); - -const LinuxEvent = AtomicEvent(struct { - fn wake(ptr: *const u32) void { - const key = @ptrCast(*const i32, ptr); - const rc = linux.futex_wake(key, linux.FUTEX_WAKE | linux.FUTEX_PRIVATE_FLAG, 1); - assert(linux.getErrno(rc) == 0); - } - - fn wait(ptr: *const u32, expected: u32, timeout: ?u64) ResetEvent.WaitError!void { - var ts: linux.timespec = undefined; - var ts_ptr: ?*linux.timespec = null; - if (timeout) |timeout_ns| { - ts_ptr = &ts; - ts.tv_sec = @intCast(isize, timeout_ns / time.ns_per_s); - ts.tv_nsec = @intCast(isize, timeout_ns % time.ns_per_s); - } - - const key = @ptrCast(*const i32, ptr); - const key_expect = @bitCast(i32, expected); - while (@atomicLoad(i32, key, .Acquire) == key_expect) { - const rc = linux.futex_wait(key, linux.FUTEX_WAIT | linux.FUTEX_PRIVATE_FLAG, key_expect, ts_ptr); - switch (linux.getErrno(rc)) { - 0, linux.EAGAIN => break, - linux.EINTR => continue, - linux.ETIMEDOUT => return ResetEvent.WaitError.TimedOut, - else => unreachable, - } - } - } -}); - -const WindowsEvent = AtomicEvent(struct { - fn wake(ptr: *const u32) void { - if (getEventHandle()) |handle| { - const key = @ptrCast(*const c_void, ptr); - const rc = windows.ntdll.NtReleaseKeyedEvent(handle, key, windows.FALSE, null); - assert(rc == 0); - } - } - - fn wait(ptr: *const u32, expected: u32, timeout: ?u64) ResetEvent.WaitError!void { - // fallback to spinlock if NT Keyed Events arent available - const handle = getEventHandle() orelse { - return SpinEvent.Futex.wait(ptr, expected, timeout); - }; - - // NT uses timeouts in units of 100ns with negative value being relative - var timeout_ptr: ?*windows.LARGE_INTEGER = null; - var timeout_value: windows.LARGE_INTEGER = undefined; - if (timeout) |timeout_ns| { - timeout_ptr = &timeout_value; - timeout_value = -@intCast(windows.LARGE_INTEGER, timeout_ns / 100); - } - - // NtWaitForKeyedEvent doesnt have spurious wake-ups - if (@atomicLoad(u32, ptr, .Acquire) == expected) { - const key = @ptrCast(*const c_void, ptr); - const rc = windows.ntdll.NtWaitForKeyedEvent(handle, key, windows.FALSE, timeout_ptr); - switch (rc) { - 0 => {}, - windows.WAIT_TIMEOUT => return ResetEvent.WaitError.TimedOut, - else => unreachable, - } - } - } - - var keyed_state = State.Uninitialized; - var keyed_handle: ?windows.HANDLE = null; - - const State = enum(u8) { - Uninitialized, - Intializing, - Initialized, - }; - - fn getEventHandle() ?windows.HANDLE { - var spin = Backoff.init(); - var state = @atomicLoad(State, &keyed_state, .Monotonic); - - while (true) { - switch (state) { - .Initialized => { - return keyed_handle; - }, - .Intializing => { - spin.yield(); - state = @atomicLoad(State, &keyed_state, .Acquire); - }, - .Uninitialized => state = @cmpxchgWeak(State, &keyed_state, state, .Intializing, .Acquire, .Monotonic) orelse { - var handle: windows.HANDLE = undefined; - const access_mask = windows.GENERIC_READ | windows.GENERIC_WRITE; - if (windows.ntdll.NtCreateKeyedEvent(&handle, access_mask, null, 0) == 0) - keyed_handle = handle; - @atomicStore(State, &keyed_state, .Initialized, .Release); - return keyed_handle; - }, - } - } - } -}); - const PosixEvent = struct { - state: u32, + is_set: bool, cond: c.pthread_cond_t, mutex: c.pthread_mutex_t, - const IS_SET: u32 = 1; - - pub fn init() PosixEvent { + fn init() PosixEvent { return PosixEvent{ - .state = 0, + .is_set = false, .cond = c.PTHREAD_COND_INITIALIZER, .mutex = c.PTHREAD_MUTEX_INITIALIZER, }; } - pub fn deinit(self: *PosixEvent) void { - // On dragonfly, the destroy functions return EINVAL if they were initialized statically. + fn deinit(self: *PosixEvent) void { + // on dragonfly, *destroy() functions can return EINVAL + // for statically initialized pthread structures + const err = if (builtin.os == .dragonfly) os.EINVAL else 0; + const retm = c.pthread_mutex_destroy(&self.mutex); - assert(retm == 0 or retm == (if (builtin.os == .dragonfly) os.EINVAL else 0)); + assert(retm == 0 or retm == err); const retc = c.pthread_cond_destroy(&self.cond); - assert(retc == 0 or retc == (if (builtin.os == .dragonfly) os.EINVAL else 0)); + assert(retc == 0 or retc == err); } - pub fn isSet(self: *PosixEvent) bool { + fn isSet(self: *PosixEvent) bool { assert(c.pthread_mutex_lock(&self.mutex) == 0); defer assert(c.pthread_mutex_unlock(&self.mutex) == 0); - return self.state == IS_SET; + return self.is_set; } - pub fn reset(self: *PosixEvent) bool { + fn reset(self: *PosixEvent) void { assert(c.pthread_mutex_lock(&self.mutex) == 0); defer assert(c.pthread_mutex_unlock(&self.mutex) == 0); - const was_set = self.state == IS_SET; - self.state = 0; - return was_set; + self.is_set = false; } - pub fn set(self: *PosixEvent, auto_reset: bool) bool { + fn set(self: *PosixEvent) void { assert(c.pthread_mutex_lock(&self.mutex) == 0); defer assert(c.pthread_mutex_unlock(&self.mutex) == 0); - const had_waiter = self.state > IS_SET; - self.state = if (auto_reset) 0 else IS_SET; - if (had_waiter) { + if (!self.is_set) { + self.is_set = true; assert(c.pthread_cond_signal(&self.cond) == 0); } - return had_waiter; } - pub fn wait(self: *PosixEvent, timeout: ?u64) ResetEvent.WaitError!bool { + fn wait(self: *PosixEvent, timeout: ?u64) !void { assert(c.pthread_mutex_lock(&self.mutex) == 0); defer assert(c.pthread_mutex_unlock(&self.mutex) == 0); - if (self.state == IS_SET) - return false; + // quick guard before possibly calling time syscalls below + if (self.is_set) + return; var ts: os.timespec = undefined; if (timeout) |timeout_ns| { @@ -349,85 +161,251 @@ const PosixEvent = struct { ts.tv_nsec = @intCast(@TypeOf(ts.tv_nsec), @mod(timeout_abs, time.second)); } - var dummy_value: u32 = undefined; - var wait_token = @truncate(u32, @ptrToInt(&dummy_value)); - self.state = wait_token; - - while (self.state == wait_token) { + while (!self.is_set) { const rc = switch (timeout == null) { true => c.pthread_cond_wait(&self.cond, &self.mutex), else => c.pthread_cond_timedwait(&self.cond, &self.mutex, &ts), }; - // TODO: rc appears to be the positive error code making os.errno() always return 0 on linux - switch (std.math.max(@as(c_int, os.errno(rc)), rc)) { + switch (rc) { 0 => {}, - os.ETIMEDOUT => return ResetEvent.WaitError.TimedOut, + os.ETIMEDOUT => return error.TimedOut, os.EINVAL => unreachable, os.EPERM => unreachable, else => unreachable, } } - return true; } }; -test "std.ResetEvent" { - // TODO - if (builtin.single_threaded) - return error.SkipZigTest; +const AtomicEvent = struct { + state: State, + const State = enum(i32) { + Empty, + Waiting, + Signaled, + }; + + fn init() AtomicEvent { + return AtomicEvent{ .state = .Empty }; + } + + fn deinit(self: *AtomicEvent) void { + self.* = undefined; + } + + fn isSet(self: *AtomicEvent) bool { + return @atomicLoad(State, &self.state, .Acquire) == .Signaled; + } + + fn reset(self: *AtomicEvent) void { + @atomicStore(State, &self.state, .Empty, .Monotonic); + } + + fn set(self: *AtomicEvent) void { + if (@atomicRmw(State, &self.state, .Xchg, .Signaled, .Release) == .Waiting) + Futex.wake(@ptrCast(*i32, &self.state)); + } + + fn wait(self: *AtomicEvent, timeout: ?u64) !void { + var state = @atomicLoad(State, &self.state, .Monotonic); + while (state == .Empty) { + state = @cmpxchgWeak(State, &self.state, .Empty, .Waiting, .Acquire, .Monotonic) orelse + return Futex.wait(@ptrCast(*i32, &self.state), @enumToInt(State.Waiting), timeout); + } + } + + pub const Futex = switch (builtin.os) { + .windows => WindowsFutex, + .linux => LinuxFutex, + else => SpinFutex, + }; + + const SpinFutex = struct { + fn wake(ptr: *i32) void {} + + fn wait(ptr: *i32, expected: i32, timeout: ?u64) !void { + // TODO: handle platforms where a monotonic timer isnt available + var timer: time.Timer = undefined; + if (timeout != null) + timer = time.Timer.start() catch unreachable; + + while (@atomicLoad(i32, ptr, .Acquire) == expected) { + switch (builtin.os) { + .windows => SpinLock.yield(400), + else => os.sched_yield() catch SpinLock.yield(1), + } + if (timeout) |timeout_ns| { + if (timer.read() >= timeout_ns) + return error.TimedOut; + } + } + } + }; + + const LinuxFutex = struct { + fn wake(ptr: *i32) void { + const rc = linux.futex_wake(ptr, linux.FUTEX_WAKE | linux.FUTEX_PRIVATE_FLAG, 1); + assert(linux.getErrno(rc) == 0); + } + + fn wait(ptr: *i32, expected: i32, timeout: ?u64) !void { + var ts: linux.timespec = undefined; + var ts_ptr: ?*linux.timespec = null; + if (timeout) |timeout_ns| { + ts_ptr = &ts; + ts.tv_sec = @intCast(isize, timeout_ns / time.ns_per_s); + ts.tv_nsec = @intCast(isize, timeout_ns % time.ns_per_s); + } + + while (@atomicLoad(i32, ptr, .Acquire) == expected) { + const rc = linux.futex_wait(ptr, linux.FUTEX_WAIT | linux.FUTEX_PRIVATE_FLAG, expected, ts_ptr); + switch (linux.getErrno(rc)) { + 0 => continue, + os.ETIMEDOUT => return error.TimedOut, + os.EINTR => continue, + os.EAGAIN => return, + else => unreachable, + } + } + } + }; + + const WindowsFutex = struct { + pub fn wake(ptr: *i32) void { + const handle = getEventHandle() orelse return SpinFutex.wake(ptr); + const key = @ptrCast(*const c_void, ptr); + const rc = windows.ntdll.NtReleaseKeyedEvent(handle, key, windows.FALSE, null); + assert(rc == 0); + } + + pub fn wait(ptr: *i32, expected: i32, timeout: ?u64) !void { + const handle = getEventHandle() orelse return SpinFutex.wait(ptr, expected, timeout); + + // NT uses timeouts in units of 100ns with negative value being relative + var timeout_ptr: ?*windows.LARGE_INTEGER = null; + var timeout_value: windows.LARGE_INTEGER = undefined; + if (timeout) |timeout_ns| { + timeout_ptr = &timeout_value; + timeout_value = -@intCast(windows.LARGE_INTEGER, timeout_ns / 100); + } + + // NtWaitForKeyedEvent doesnt have spurious wake-ups + const key = @ptrCast(*const c_void, ptr); + const rc = windows.ntdll.NtWaitForKeyedEvent(handle, key, windows.FALSE, timeout_ptr); + switch (rc) { + windows.WAIT_TIMEOUT => return error.TimedOut, + windows.WAIT_OBJECT_0 => {}, + else => unreachable, + } + } + + var event_handle: usize = EMPTY; + const EMPTY = ~@as(usize, 0); + const LOADING = EMPTY - 1; + + pub fn getEventHandle() ?windows.HANDLE { + var handle = @atomicLoad(usize, &event_handle, .Monotonic); + while (true) { + switch (handle) { + EMPTY => handle = @cmpxchgWeak(usize, &event_handle, EMPTY, LOADING, .Acquire, .Monotonic) orelse { + const handle_ptr = @ptrCast(*windows.HANDLE, &handle); + const access_mask = windows.GENERIC_READ | windows.GENERIC_WRITE; + if (windows.ntdll.NtCreateKeyedEvent(handle_ptr, access_mask, null, 0) != 0) + handle = 0; + @atomicStore(usize, &event_handle, handle, .Monotonic); + return @intToPtr(?windows.HANDLE, handle); + }, + LOADING => { + SpinLock.yield(1000); + handle = @atomicLoad(usize, &event_handle, .Monotonic); + }, + else => { + return @intToPtr(?windows.HANDLE, handle); + }, + } + } + } + }; +}; + +test "std.ResetEvent" { var event = ResetEvent.init(); defer event.deinit(); // test event setting testing.expect(event.isSet() == false); - testing.expect(event.set(false) == false); + event.set(); testing.expect(event.isSet() == true); // test event resetting - testing.expect(event.reset() == true); + event.reset(); testing.expect(event.isSet() == false); - testing.expect(event.reset() == false); - // test cross thread signaling + // test event waiting (non-blocking) + event.set(); + event.wait(); + try event.timedWait(1); + + // test cross-thread signaling + if (builtin.single_threaded) + return; + const Context = struct { - event: ResetEvent, + const Self = @This(); + value: u128, + in: ResetEvent, + out: ResetEvent, - fn receiver(self: *@This()) void { - // wait for the sender to notify us with updated value - assert(self.value == 0); - assert((self.event.wait(1 * time.second) catch unreachable) == true); - assert(self.value == 1); - - // wait for sender to sleep, then notify it of new value - time.sleep(50 * time.millisecond); - self.value = 2; - assert(self.event.set(false) == true); + fn init() Self { + return Self{ + .value = 0, + .in = ResetEvent.init(), + .out = ResetEvent.init(), + }; } - fn sender(self: *@This()) !void { - // wait for the receiver() to start wait()'ing - time.sleep(50 * time.millisecond); + fn deinit(self: *Self) void { + self.in.deinit(); + self.out.deinit(); + self.* = undefined; + } - // update value to 1 and notify the receiver() - assert(self.value == 0); + fn sender(self: *Self) void { + // update value and signal input + testing.expect(self.value == 0); self.value = 1; - assert(self.event.set(true) == true); + self.in.set(); - // wait for the receiver to update the value & notify us - assert((try self.event.wait(1 * time.second)) == true); - assert(self.value == 2); + // wait for receiver to update value and signal output + self.out.wait(); + testing.expect(self.value == 2); + + // update value and signal final input + self.value = 3; + self.in.set(); + } + + fn receiver(self: *Self) void { + // wait for sender to update value and signal input + self.in.wait(); + assert(self.value == 1); + + // update value and signal output + self.in.reset(); + self.value = 2; + self.out.set(); + + // wait for sender to update value and signal final input + self.in.wait(); + assert(self.value == 3); } }; - _ = event.reset(); - var context = Context{ - .event = event, - .value = 0, - }; - - var receiver = try std.Thread.spawn(&context, Context.receiver); + var context = Context.init(); + defer context.deinit(); + const receiver = try std.Thread.spawn(&context, Context.receiver); defer receiver.wait(); - try context.sender(); + context.sender(); }