commit d821446cf92b9b8974ec4a3e5d7be883d1741b4e (tree)
parent a8226cd536f50fd14c059812388b87bf99324d51
Author: Lukas Lalinsky <lukas@lalinsky.com>
Date: Thu, 19 Feb 2026 12:07:54 +0100
Implement `Condition.waitTimeout`
I'd have preferred if `vtable.futexWait` returned `error.Timeout`, since
all the OS-level APIs provide it. However, if I keep the vtable untouched,
I had to determine the timeout case by post-checking the deadline.
It's fine functionally, but one extra syscall that be avoided at
cost of changing the vtable and all the futex implementations.
Diffstat:
2 files changed, 69 insertions(+), 9 deletions(-)
diff --git a/lib/std/Io.zig b/lib/std/Io.zig
@@ -1682,19 +1682,27 @@ pub const Condition = struct {
};
pub fn wait(cond: *Condition, io: Io, mutex: *Mutex) Cancelable!void {
- try waitInner(cond, io, mutex, false);
+ waitInner(cond, io, mutex, .{ .timeout = .none }) catch |err| switch (err) {
+ error.Timeout => unreachable,
+ error.Canceled => return error.Canceled,
+ };
+ }
+
+ pub fn waitTimeout(cond: *Condition, io: Io, mutex: *Mutex, timeout: Timeout) (Cancelable || Timeout.Error)!void {
+ return waitInner(cond, io, mutex, .{ .timeout = timeout.toDeadline(io) });
}
/// Same as `wait`, except does not introduce a cancelation point.
///
/// For a description of cancelation and cancelation points, see `Future.cancel`.
pub fn waitUncancelable(cond: *Condition, io: Io, mutex: *Mutex) void {
- waitInner(cond, io, mutex, true) catch |err| switch (err) {
+ waitInner(cond, io, mutex, .uncancelable) catch |err| switch (err) {
+ error.Timeout => unreachable,
error.Canceled => unreachable,
};
}
- fn waitInner(cond: *Condition, io: Io, mutex: *Mutex, uncancelable: bool) Cancelable!void {
+ fn waitInner(cond: *Condition, io: Io, mutex: *Mutex, mode: union(enum) { uncancelable, timeout: Timeout }) (Cancelable || Timeout.Error)!void {
var epoch = cond.epoch.load(.acquire); // `.acquire` to ensure ordered before state load
{
@@ -1706,10 +1714,10 @@ pub const Condition = struct {
defer mutex.lockUncancelable(io);
while (true) {
- const result = if (uncancelable)
- io.futexWaitUncancelable(u32, &cond.epoch.raw, epoch)
- else
- io.futexWait(u32, &cond.epoch.raw, epoch);
+ const result = switch (mode) {
+ .uncancelable => io.futexWaitUncancelable(u32, &cond.epoch.raw, epoch),
+ .timeout => |t| io.futexWaitTimeout(u32, &cond.epoch.raw, epoch, t),
+ };
epoch = cond.epoch.load(.acquire); // `.acquire` to ensure ordered before `state` laod
@@ -1729,13 +1737,21 @@ pub const Condition = struct {
}
// There are no more signals available; this was a spurious wakeup or an error. If it
- // was an error, we will remove ourselves as a waiter and return that error. Otherwise,
- // we'll loop back to the futex wait.
+ // was an error, we will remove ourselves as a waiter and return that error. If a
+ // timeout was specified and the deadline has passed, we remove ourselves as a waiter
+ // and return `error.Timeout`. Otherwise, we'll loop back to the futex wait.
result catch |err| {
const prev_state = cond.state.fetchSub(.{ .waiters = 1, .signals = 0 }, .monotonic);
assert(prev_state.waiters > 0); // underflow caused by illegal state
return err;
};
+ if (mode == .timeout and mode.timeout != .none) {
+ if (mode.timeout.deadline.untilNow(io).raw.nanoseconds >= 0) {
+ const prev_state = cond.state.fetchSub(.{ .waiters = 1, .signals = 0 }, .monotonic);
+ assert(prev_state.waiters > 0); // underflow caused by illegal state
+ return error.Timeout;
+ }
+ }
}
}
diff --git a/lib/std/Io/test.zig b/lib/std/Io/test.zig
@@ -971,3 +971,47 @@ test "Select.cancel with no tasks, no deadlock" {
var select: Io.Select(U) = .init(io, &.{});
try expectEqual(null, select.cancel());
}
+
+test "Condition" {
+ if (builtin.single_threaded) return error.SkipZigTest;
+ const io = testing.io;
+
+ const TestContext = struct {
+ ready: Io.Event = .unset,
+ mutex: Io.Mutex = .init,
+ cond: Io.Condition = .init,
+ value: u32 = 0,
+
+ fn worker(ctx: *@This()) !void {
+ defer ctx.ready.set(io);
+
+ try ctx.mutex.lock(io);
+ defer ctx.mutex.unlock(io);
+
+ try expectError(error.Timeout, ctx.cond.waitTimeout(io, &ctx.mutex, .{ .duration = .{
+ .raw = .fromMilliseconds(1),
+ .clock = .awake,
+ } }));
+ try expectEqual(0, ctx.value);
+
+ ctx.ready.set(io);
+
+ while (ctx.value == 0) try ctx.cond.wait(io, &ctx.mutex);
+ try expectEqual(1, ctx.value);
+ }
+ };
+
+ var ctx: TestContext = .{};
+
+ var future = try io.concurrent(TestContext.worker, .{&ctx});
+ defer future.cancel(io) catch {};
+
+ try ctx.ready.wait(io);
+
+ try ctx.mutex.lock(io);
+ ctx.value = 1;
+ ctx.mutex.unlock(io);
+ ctx.cond.signal(io);
+
+ try future.await(io);
+}