zig

fork of https://codeberg.org/ziglang/zig
Log | Files | Refs | README | LICENSE

commit 2046e0f4be23028d18009a61a517d0712e2ca164 (tree)
parent 4772f1a9f418d91e3e157b4f45a8edb7ed7975f0
Author: Matthew Lugg <mlugg@mlugg.co.uk>
Date:   Mon, 22 Dec 2025 12:01:37 +0000

std.Io.Threaded: fix group closure leak

More tasks could be added to the group at any time before it completes,
so it's not valid to look at the `token` passed in here.

There's also a related bug in `Threaded`, which is that tasks spawned in
a group after it is canceled will not observe that cancelation, but that
is a more complex bug which needs some deeper design changes.

Diffstat:
Mlib/std/Io/Threaded.zig | 48++++++++++++++++++++++--------------------------
1 file changed, 22 insertions(+), 26 deletions(-)

diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig @@ -1183,10 +1183,12 @@ fn groupConcurrent( t.cond.signal(); } -fn groupWait(userdata: ?*anyopaque, group: *Io.Group, token: *anyopaque) void { +fn groupWait(userdata: ?*anyopaque, group: *Io.Group, initial_token: *anyopaque) void { const t: *Threaded = @ptrCast(@alignCast(userdata)); const gpa = t.allocator; + _ = initial_token; // we need to load `token` *after* the group finishes + if (builtin.single_threaded) unreachable; // we never set `group.token` to non-`null` const group_state: *std.atomic.Value(usize) = @ptrCast(&group.state); @@ -1195,42 +1197,40 @@ fn groupWait(userdata: ?*anyopaque, group: *Io.Group, token: *anyopaque) void { assert(prev_state & GroupClosure.sync_is_waiting == 0); if ((prev_state / GroupClosure.sync_one_pending) > 0) event.wait(ioBasic(t)) catch |err| switch (err) { error.Canceled => { - var node: *std.SinglyLinkedList.Node = @ptrCast(@alignCast(token)); - while (true) { + var it: ?*std.SinglyLinkedList.Node = @ptrCast(@alignCast(group.token.load(.monotonic))); + while (it) |node| : (it = node.next) { const gc: *GroupClosure = @fieldParentPtr("node", node); gc.closure.requestCancel(t); - node = node.next orelse break; } event.waitUncancelable(ioBasic(t)); }, }; - var node: *std.SinglyLinkedList.Node = @ptrCast(@alignCast(token)); - while (true) { - const gc: *GroupClosure = @fieldParentPtr("node", node); - const node_next = node.next; - gc.deinit(gpa); - node = node_next orelse break; - } - // Since the group has now finished, it's illegal to add more tasks to it until we return. It's // also illegal for us to race with another `await` or `cancel`. Therefore, we must be the only // thread who can access `group` right now. + var it: ?*std.SinglyLinkedList.Node = @ptrCast(@alignCast(group.token.raw)); group.token.raw = null; + while (it) |node| { + it = node.next; // update `it` now, because `deinit` will invalidate `node` + const gc: *GroupClosure = @fieldParentPtr("node", node); + gc.deinit(gpa); + } } -fn groupCancel(userdata: ?*anyopaque, group: *Io.Group, token: *anyopaque) void { +fn groupCancel(userdata: ?*anyopaque, group: *Io.Group, initial_token: *anyopaque) void { const t: *Threaded = @ptrCast(@alignCast(userdata)); const gpa = t.allocator; + _ = initial_token; // we need to load `token` *after* the group finishes + if (builtin.single_threaded) unreachable; // we never set `group.token` to non-`null` { - var node: *std.SinglyLinkedList.Node = @ptrCast(@alignCast(token)); - while (true) { + var it: ?*std.SinglyLinkedList.Node = @ptrCast(@alignCast(group.token.load(.monotonic))); + while (it) |node| : (it = node.next) { const gc: *GroupClosure = @fieldParentPtr("node", node); gc.closure.requestCancel(t); - node = node.next orelse break; } } @@ -1240,20 +1240,16 @@ fn groupCancel(userdata: ?*anyopaque, group: *Io.Group, token: *anyopaque) void assert(prev_state & GroupClosure.sync_is_waiting == 0); if ((prev_state / GroupClosure.sync_one_pending) > 0) event.waitUncancelable(ioBasic(t)); - { - var node: *std.SinglyLinkedList.Node = @ptrCast(@alignCast(token)); - while (true) { - const gc: *GroupClosure = @fieldParentPtr("node", node); - const node_next = node.next; - gc.deinit(gpa); - node = node_next orelse break; - } - } - // Since the group has now finished, it's illegal to add more tasks to it until we return. It's // also illegal for us to race with another `await` or `cancel`. Therefore, we must be the only // thread who can access `group` right now. + var it: ?*std.SinglyLinkedList.Node = @ptrCast(@alignCast(group.token.raw)); group.token.raw = null; + while (it) |node| { + it = node.next; // update `it` now, because `deinit` will invalidate `node` + const gc: *GroupClosure = @fieldParentPtr("node", node); + gc.deinit(gpa); + } } fn recancel(userdata: ?*anyopaque) void {