commit 9bf65f6e05467496780ede466cebc9ed8a0e17f3 (tree)
parent 0649f96da3e89c1397824e3b42702024b70ec332
Author: Matthew Lugg <mlugg@mlugg.co.uk>
Date: Sat, 20 Dec 2025 13:10:10 +0000
std.Io.Threaded: replace ResetEvent with Io.Event
Diffstat:
1 file changed, 24 insertions(+), 227 deletions(-)
diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig
@@ -783,13 +783,13 @@ const preadv_sym = if (posix.lfs64_abi) posix.system.preadv64 else posix.system.
const AsyncClosure = struct {
closure: Closure,
func: *const fn (context: *anyopaque, result: *anyopaque) void,
- reset_event: ResetEvent,
- select_condition: ?*ResetEvent,
+ event: Io.Event,
+ select_condition: ?*Io.Event,
context_alignment: Alignment,
result_offset: usize,
alloc_len: usize,
- const done_reset_event: *ResetEvent = @ptrFromInt(@alignOf(ResetEvent));
+ const done_event: *Io.Event = @ptrFromInt(@alignOf(Io.Event));
fn start(closure: *Closure, t: *Threaded) void {
const ac: *AsyncClosure = @alignCast(@fieldParentPtr("closure", closure));
@@ -798,11 +798,11 @@ const AsyncClosure = struct {
ac.func(ac.contextPointer(), ac.resultPointer());
current_thread.current_closure = null;
- if (@atomicRmw(?*ResetEvent, &ac.select_condition, .Xchg, done_reset_event, .release)) |select_reset| {
- assert(select_reset != done_reset_event);
- select_reset.set();
+ if (@atomicRmw(?*Io.Event, &ac.select_condition, .Xchg, done_event, .release)) |select_event| {
+ assert(select_event != done_event);
+ select_event.set(ioBasic(t));
}
- ac.reset_event.set();
+ ac.event.set(ioBasic(t));
}
fn resultPointer(ac: *AsyncClosure) [*]u8 {
@@ -844,7 +844,7 @@ const AsyncClosure = struct {
.context_alignment = context_alignment,
.result_offset = actual_result_offset,
.alloc_len = alloc_len,
- .reset_event = .unset,
+ .event = .unset,
.select_condition = null,
};
@memcpy(ac.contextPointer()[0..context.len], context);
@@ -852,10 +852,10 @@ const AsyncClosure = struct {
}
fn waitAndDeinit(ac: *AsyncClosure, t: *Threaded, result: []u8) void {
- ac.reset_event.wait(t) catch |err| switch (err) {
+ ac.event.wait(ioBasic(t)) catch |err| switch (err) {
error.Canceled => {
ac.closure.requestCancel(t);
- ac.reset_event.waitUncancelable();
+ ac.event.waitUncancelable(ioBasic(t));
},
};
@memcpy(result, ac.resultPointer()[0..result.len]);
@@ -977,14 +977,14 @@ const GroupClosure = struct {
const current_thread = Thread.getCurrent(t);
const group = gc.group;
const group_state: *std.atomic.Value(usize) = @ptrCast(&group.state);
- const reset_event: *ResetEvent = @ptrCast(&group.context);
+ const event: *Io.Event = @ptrCast(&group.context);
current_thread.current_closure = closure;
gc.func(group, gc.contextPointer());
current_thread.current_closure = null;
const prev_state = group_state.fetchSub(sync_one_pending, .acq_rel);
assert((prev_state / sync_one_pending) > 0);
- if (prev_state == (sync_one_pending | sync_is_waiting)) reset_event.set();
+ if (prev_state == (sync_one_pending | sync_is_waiting)) event.set(ioBasic(t));
}
fn contextPointer(gc: *GroupClosure) [*]u8 {
@@ -1145,10 +1145,10 @@ fn groupWait(userdata: ?*anyopaque, group: *Io.Group, token: *anyopaque) void {
if (builtin.single_threaded) return;
const group_state: *std.atomic.Value(usize) = @ptrCast(&group.state);
- const reset_event: *ResetEvent = @ptrCast(&group.context);
+ const event: *Io.Event = @ptrCast(&group.context);
const prev_state = group_state.fetchAdd(GroupClosure.sync_is_waiting, .acquire);
assert(prev_state & GroupClosure.sync_is_waiting == 0);
- if ((prev_state / GroupClosure.sync_one_pending) > 0) reset_event.wait(t) catch |err| switch (err) {
+ if ((prev_state / GroupClosure.sync_one_pending) > 0) event.wait(ioBasic(t)) catch |err| switch (err) {
error.Canceled => {
var node: *std.SinglyLinkedList.Node = @ptrCast(@alignCast(token));
while (true) {
@@ -1156,7 +1156,7 @@ fn groupWait(userdata: ?*anyopaque, group: *Io.Group, token: *anyopaque) void {
gc.closure.requestCancel(t);
node = node.next orelse break;
}
- reset_event.waitUncancelable();
+ event.waitUncancelable(ioBasic(t));
},
};
@@ -1185,10 +1185,10 @@ fn groupCancel(userdata: ?*anyopaque, group: *Io.Group, token: *anyopaque) void
}
const group_state: *std.atomic.Value(usize) = @ptrCast(&group.state);
- const reset_event: *ResetEvent = @ptrCast(&group.context);
+ const event: *Io.Event = @ptrCast(&group.context);
const prev_state = group_state.fetchAdd(GroupClosure.sync_is_waiting, .acquire);
assert(prev_state & GroupClosure.sync_is_waiting == 0);
- if ((prev_state / GroupClosure.sync_one_pending) > 0) reset_event.waitUncancelable();
+ if ((prev_state / GroupClosure.sync_one_pending) > 0) event.waitUncancelable(ioBasic(t));
{
var node: *std.SinglyLinkedList.Node = @ptrCast(@alignCast(token));
@@ -3684,28 +3684,28 @@ fn sleepPosix(userdata: ?*anyopaque, timeout: Io.Timeout) Io.SleepError!void {
fn select(userdata: ?*anyopaque, futures: []const *Io.AnyFuture) Io.Cancelable!usize {
const t: *Threaded = @ptrCast(@alignCast(userdata));
- var reset_event: ResetEvent = .unset;
+ var event: Io.Event = .unset;
for (futures, 0..) |future, i| {
const closure: *AsyncClosure = @ptrCast(@alignCast(future));
- if (@atomicRmw(?*ResetEvent, &closure.select_condition, .Xchg, &reset_event, .seq_cst) == AsyncClosure.done_reset_event) {
+ if (@atomicRmw(?*Io.Event, &closure.select_condition, .Xchg, &event, .seq_cst) == AsyncClosure.done_event) {
for (futures[0..i]) |cleanup_future| {
const cleanup_closure: *AsyncClosure = @ptrCast(@alignCast(cleanup_future));
- if (@atomicRmw(?*ResetEvent, &cleanup_closure.select_condition, .Xchg, null, .seq_cst) == AsyncClosure.done_reset_event) {
- cleanup_closure.reset_event.waitUncancelable(); // Ensure no reference to our stack-allocated reset_event.
+ if (@atomicRmw(?*Io.Event, &cleanup_closure.select_condition, .Xchg, null, .seq_cst) == AsyncClosure.done_event) {
+ cleanup_closure.event.waitUncancelable(ioBasic(t)); // Ensure no reference to our stack-allocated event.
}
}
return i;
}
}
- try reset_event.wait(t);
+ try event.wait(ioBasic(t));
var result: ?usize = null;
for (futures, 0..) |future, i| {
const closure: *AsyncClosure = @ptrCast(@alignCast(future));
- if (@atomicRmw(?*ResetEvent, &closure.select_condition, .Xchg, null, .seq_cst) == AsyncClosure.done_reset_event) {
- closure.reset_event.waitUncancelable(); // Ensure no reference to our stack-allocated reset_event.
+ if (@atomicRmw(?*Io.Event, &closure.select_condition, .Xchg, null, .seq_cst) == AsyncClosure.done_event) {
+ closure.event.waitUncancelable(ioBasic(t)); // Ensure no reference to our stack-allocated event.
if (result == null) result = i; // In case multiple are ready, return first.
}
}
@@ -6666,209 +6666,6 @@ fn copyCanon(canonical_name_buffer: *[HostName.max_len]u8, name: []const u8) Hos
/// ulock_wait2() uses 64-bit nano-second timeouts (with the same convention)
const darwin_supports_ulock_wait2 = builtin.os.version_range.semver.min.major >= 11;
-/// A thread-safe logical boolean value which can be `set` and `unset`.
-///
-/// It can also block threads until the value is set with cancelation via timed
-/// waits. Statically initializable; four bytes on all targets.
-pub const ResetEvent = switch (native_os) {
- .illumos, .netbsd, .openbsd => ResetEventPosix,
- else => ResetEventFutex,
-};
-
-/// A `ResetEvent` implementation based on futexes.
-const ResetEventFutex = enum(u32) {
- unset = 0,
- waiting = 1,
- is_set = 2,
-
- /// Returns whether the logical boolean is `set`.
- ///
- /// Once `reset` is called, this returns false until the next `set`.
- ///
- /// The memory accesses before the `set` can be said to happen before
- /// `isSet` returns true.
- pub fn isSet(ref: *const ResetEventFutex) bool {
- if (builtin.single_threaded) return switch (ref.*) {
- .unset => false,
- .waiting => unreachable,
- .is_set => true,
- };
- // Acquire barrier ensures memory accesses before `set` happen before
- // returning true.
- return @atomicLoad(ResetEventFutex, ref, .acquire) == .is_set;
- }
-
- /// Blocks the calling thread until `set` is called.
- ///
- /// This is effectively a more efficient version of `while (!isSet()) {}`.
- ///
- /// The memory accesses before the `set` can be said to happen before `wait` returns.
- pub fn wait(ref: *ResetEventFutex, t: *Threaded) Io.Cancelable!void {
- if (builtin.single_threaded) switch (ref.*) {
- .unset => unreachable, // Deadlock, no other threads to wake us up.
- .waiting => unreachable, // Invalid state.
- .is_set => return,
- };
- // Try to set the state from `unset` to `waiting` to indicate to the
- // `set` thread that others are blocked on the ResetEventFutex. Avoid using
- // any strict barriers until we know the ResetEventFutex is set.
- var state = @atomicLoad(ResetEventFutex, ref, .acquire);
- if (state == .is_set) {
- @branchHint(.likely);
- return;
- }
- if (state == .unset) {
- state = @cmpxchgStrong(ResetEventFutex, ref, state, .waiting, .acquire, .acquire) orelse .waiting;
- }
- const current_thread = Thread.getCurrent(t);
- while (state == .waiting) {
- try current_thread.futexWait(@ptrCast(ref), @intFromEnum(ResetEventFutex.waiting));
- state = @atomicLoad(ResetEventFutex, ref, .acquire);
- }
- assert(state == .is_set);
- }
-
- /// Same as `wait` except uninterruptible.
- pub fn waitUncancelable(ref: *ResetEventFutex) void {
- if (builtin.single_threaded) switch (ref.*) {
- .unset => unreachable, // Deadlock, no other threads to wake us up.
- .waiting => unreachable, // Invalid state.
- .is_set => return,
- };
- // Try to set the state from `unset` to `waiting` to indicate to the
- // `set` thread that others are blocked on the ResetEventFutex. Avoid using
- // any strict barriers until we know the ResetEventFutex is set.
- var state = @atomicLoad(ResetEventFutex, ref, .acquire);
- if (state == .is_set) {
- @branchHint(.likely);
- return;
- }
- if (state == .unset) {
- state = @cmpxchgStrong(ResetEventFutex, ref, state, .waiting, .acquire, .acquire) orelse .waiting;
- }
- while (state == .waiting) {
- Thread.futexWaitUncancelable(@ptrCast(ref), @intFromEnum(ResetEventFutex.waiting));
- state = @atomicLoad(ResetEventFutex, ref, .acquire);
- }
- assert(state == .is_set);
- }
-
- /// Marks the logical boolean as `set` and unblocks any threads in `wait`
- /// or `timedWait` to observe the new state.
- ///
- /// The logical boolean stays `set` until `reset` is called, making future
- /// `set` calls do nothing semantically.
- ///
- /// The memory accesses before `set` can be said to happen before `isSet`
- /// returns true or `wait`/`timedWait` return successfully.
- pub fn set(ref: *ResetEventFutex) void {
- if (builtin.single_threaded) {
- ref.* = .is_set;
- return;
- }
- if (@atomicRmw(ResetEventFutex, ref, .Xchg, .is_set, .release) == .waiting) {
- Thread.futexWake(@ptrCast(ref), std.math.maxInt(u32));
- }
- }
-
- /// Unmarks the ResetEventFutex as if `set` was never called.
- ///
- /// Assumes no threads are blocked in `wait` or `timedWait`. Concurrent
- /// calls to `set`, `isSet` and `reset` are allowed.
- pub fn reset(ref: *ResetEventFutex) void {
- if (builtin.single_threaded) {
- ref.* = .unset;
- return;
- }
- @atomicStore(ResetEventFutex, ref, .unset, .monotonic);
- }
-};
-
-/// A `ResetEvent` implementation based on pthreads API.
-const ResetEventPosix = struct {
- cond: std.c.pthread_cond_t,
- mutex: std.c.pthread_mutex_t,
- state: ResetEventFutex,
-
- pub const unset: ResetEventPosix = .{
- .cond = std.c.PTHREAD_COND_INITIALIZER,
- .mutex = std.c.PTHREAD_MUTEX_INITIALIZER,
- .state = .unset,
- };
-
- pub fn isSet(rep: *const ResetEventPosix) bool {
- if (builtin.single_threaded) return switch (rep.state) {
- .unset => false,
- .waiting => unreachable,
- .is_set => true,
- };
- return @atomicLoad(ResetEventFutex, &rep.state, .acquire) == .is_set;
- }
-
- pub fn wait(rep: *ResetEventPosix, t: *Threaded) Io.Cancelable!void {
- if (builtin.single_threaded) switch (rep.*) {
- .unset => unreachable, // Deadlock, no other threads to wake us up.
- .waiting => unreachable, // Invalid state.
- .is_set => return,
- };
- const current_thread = Thread.getCurrent(t);
- assert(std.c.pthread_mutex_lock(&rep.mutex) == .SUCCESS);
- defer assert(std.c.pthread_mutex_unlock(&rep.mutex) == .SUCCESS);
- sw: switch (rep.state) {
- .unset => {
- rep.state = .waiting;
- continue :sw .waiting;
- },
- .waiting => {
- try current_thread.beginSyscall();
- assert(std.c.pthread_cond_wait(&rep.cond, &rep.mutex) == .SUCCESS);
- current_thread.endSyscall();
- continue :sw rep.state;
- },
- .is_set => return,
- }
- }
-
- pub fn waitUncancelable(rep: *ResetEventPosix) void {
- if (builtin.single_threaded) switch (rep.*) {
- .unset => unreachable, // Deadlock, no other threads to wake us up.
- .waiting => unreachable, // Invalid state.
- .is_set => return,
- };
- assert(std.c.pthread_mutex_lock(&rep.mutex) == .SUCCESS);
- defer assert(std.c.pthread_mutex_unlock(&rep.mutex) == .SUCCESS);
- sw: switch (rep.state) {
- .unset => {
- rep.state = .waiting;
- continue :sw .waiting;
- },
- .waiting => {
- assert(std.c.pthread_cond_wait(&rep.cond, &rep.mutex) == .SUCCESS);
- continue :sw rep.state;
- },
- .is_set => return,
- }
- }
-
- pub fn set(rep: *ResetEventPosix) void {
- if (builtin.single_threaded) {
- rep.* = .is_set;
- return;
- }
- if (@atomicRmw(ResetEventFutex, &rep.state, .Xchg, .is_set, .release) == .waiting) {
- assert(std.c.pthread_cond_broadcast(&rep.cond) == .SUCCESS);
- }
- }
-
- pub fn reset(rep: *ResetEventPosix) void {
- if (builtin.single_threaded) {
- rep.* = .unset;
- return;
- }
- @atomicStore(ResetEventFutex, &rep.state, .unset, .monotonic);
- }
-};
-
fn closeSocketWindows(s: ws2_32.SOCKET) void {
const rc = ws2_32.closesocket(s);
if (is_debug) switch (rc) {