commit 8c70fd0a57936cca236a000a17364cbbf7a09673 (tree)
parent e779ca72230b5975f48c84ae19c51f0c74f9ecbe
Author: Andrew Kelley <andrew@ziglang.org>
Date: Fri, 27 Feb 2026 13:38:48 -0800
std.Io.Select: introduce cancelDiscard
and make the return value of `cancel` return queue items.
I don't think it's possible to make `cancel` not deadlock with an empty
queue buffer without introducing a new Group primitive.
This is the best I could come up with based on existing primitives.
Let's see if applications find these APIs palatable.
Diffstat:
2 files changed, 87 insertions(+), 12 deletions(-)
diff --git a/lib/std/Io.zig b/lib/std/Io.zig
@@ -1184,8 +1184,6 @@ pub fn Select(comptime U: type) type {
return struct {
io: Io,
group: Group,
- /// The queue is never closed because there may be live resources
- /// inserted into it which would otherwise leak.
queue: Queue(U),
const S = @This();
@@ -1235,7 +1233,7 @@ pub fn Select(comptime U: type) type {
const raw_result = @call(.auto, function, context.args);
const elem = @unionInit(U, @tagName(field), raw_result);
context.select.queue.putOneUncancelable(context.select.io, elem) catch |err| switch (err) {
- error.Closed => unreachable,
+ error.Closed => {},
};
if (@typeInfo(@TypeOf(raw_result)) == .error_union)
_ = raw_result catch |err| if (err == error.Canceled) return error.Canceled;
@@ -1274,7 +1272,7 @@ pub fn Select(comptime U: type) type {
const raw_result = @call(.auto, function, context.args);
const elem = @unionInit(U, @tagName(field), raw_result);
context.select.queue.putOneUncancelable(context.select.io, elem) catch |err| switch (err) {
- error.Closed => unreachable,
+ error.Closed => {},
};
if (@typeInfo(@TypeOf(raw_result)) == .error_union)
_ = raw_result catch |err| if (err == error.Canceled) return error.Canceled;
@@ -1286,6 +1284,8 @@ pub fn Select(comptime U: type) type {
/// Blocks until another task of the select finishes.
///
+ /// It is legal to call `async` and `concurrent` after this.
+ ///
/// Threadsafe.
pub fn await(s: *S) Cancelable!U {
return s.queue.getOne(s.io) catch |err| switch (err) {
@@ -1299,6 +1299,8 @@ pub fn Select(comptime U: type) type {
///
/// Asserts that `buffer.len >= min`.
///
+ /// It is legal to call `async` and `concurrent` after this.
+ ///
/// Threadsafe.
pub fn awaitMany(s: *S, buffer: []U, min: usize) Cancelable!usize {
return s.queue.get(s.io, buffer, min) catch |err| switch (err) {
@@ -1307,16 +1309,53 @@ pub fn Select(comptime U: type) type {
};
}
- /// Equivalent to `await` but requests cancelation on all remaining
- /// tasks owned by the select.
+ /// Requests cancelation on all remaining tasks owned by the select,
+ /// then blocks until they all finish. If the select was initialized
+ /// with insufficient buffer space for all remaining tasks to finish, a
+ /// deadlock occurs.
///
- /// For a description of cancelation and cancelation points, see `Future.cancel`.
+ /// If any of the select tasks allocate resources, those tasks may have
+ /// completed, meaning that this function must be called in a loop
+ /// until `null` is returned in order to deallocate those resources. If
+ /// there is no possibility of resource leaks, `cancelDiscard` is
+ /// preferable.
///
- /// It is illegal to call `await` after this.
+ /// It is illegal to call `await` or `awaitMany` after this.
///
- /// Idempotent. Threadsafe.
- pub fn cancel(s: *S) void {
- s.group.cancel(s.io);
+ /// It is safe to call this multiple times, even after `null` is
+ /// returned.
+ ///
+ /// Threadsafe.
+ pub fn cancel(s: *S) ?U {
+ const io = s.io;
+ if (s.group.token.load(.acquire)) |token| {
+ io.vtable.groupCancel(io.userdata, &s.group, token);
+ assert(s.group.token.raw == null);
+ s.queue.close(io);
+ }
+ return s.queue.getOneUncancelable(io) catch |err| switch (err) {
+ error.Closed => return null,
+ };
+ }
+
+ /// Requests cancelation on all remaining tasks owned by the select,
+ /// then blocks until they all finish.
+ ///
+ /// All return values from outstanding tasks are discarded. This
+ /// function is therefore inappropriate to call when a task can return
+ /// an allocated resource. For that use case, see `cancel`.
+ ///
+ /// It is illegal to call `await` or `awaitMany` after this.
+ ///
+ /// It is safe to call this multiple times.
+ ///
+ /// Threadsafe.
+ pub fn cancelDiscard(s: *S) void {
+ const io = s.io;
+ const token = s.group.token.load(.acquire) orelse return;
+ s.queue.close(io);
+ io.vtable.groupCancel(io.userdata, &s.group, token);
+ assert(s.group.token.raw == null);
}
};
}
@@ -1693,6 +1732,12 @@ pub const TypeErasedQueue = struct {
};
}
+ /// After this is called, the queue enters a "closed" state. A closed
+ /// queue always returns `error.Closed` for put attempts even when
+ /// there is space in the buffer. However, existing elements of the
+ /// queue are retrieved before `error.Closed` is returned.
+ ///
+ /// Threadsafe.
pub fn close(q: *TypeErasedQueue, io: Io) void {
q.mutex.lockUncancelable(io);
defer q.mutex.unlock(io);
@@ -1967,6 +2012,12 @@ pub fn Queue(Elem: type) type {
return .{ .type_erased = .init(@ptrCast(buffer)) };
}
+ /// After this is called, the queue enters a "closed" state. A closed
+ /// queue always returns `error.Closed` for put attempts even when
+ /// there is space in the buffer. However, existing elements of the
+ /// queue are retrieved before `error.Closed` is returned.
+ ///
+ /// Threadsafe.
pub fn close(q: *@This(), io: Io) void {
q.type_erased.close(io);
}
diff --git a/lib/std/Io/test.zig b/lib/std/Io/test.zig
@@ -835,7 +835,7 @@ test "Select" {
};
var buffer: [4]U = undefined;
var select: Io.Select(U) = .init(io, &buffer);
- defer select.cancel();
+ defer _ = select.cancel();
select.async(.foo, S.foo, .{});
select.concurrent(.bar, S.bar, .{io}) catch |err| switch (err) {
@@ -864,3 +864,27 @@ test "Select" {
try testing.expectEqual(42, result);
}
+
+test "Select with empty buffer, no deadlock" {
+ const S = struct {
+ fn sleeper(io: Io, duration: Io.Duration) Io.Cancelable!void {
+ try io.sleep(duration, .awake);
+ }
+ };
+
+ const io = testing.io;
+
+ const U = union(enum) {
+ sleeper: Io.Cancelable!void,
+ };
+ var select: Io.Select(U) = .init(io, &.{});
+ defer select.cancelDiscard();
+
+ select.concurrent(.sleeper, S.sleeper, .{ io, .fromNanoseconds(1) }) catch |err| switch (err) {
+ error.ConcurrencyUnavailable => return error.SkipZigTest,
+ };
+ select.concurrent(.sleeper, S.sleeper, .{ io, .fromSeconds(600) }) catch |err| switch (err) {
+ error.ConcurrencyUnavailable => return error.SkipZigTest,
+ };
+ assert((try select.await()) == .sleeper);
+}