zig

fork of https://codeberg.org/ziglang/zig
Log | Files | Refs | README | LICENSE

commit c8f54a2f07fd4b1bc6ba83c618d0e6f6de265b5c (tree)
parent c6eeae8a8c2314fd35b5baddb18f33b0122d319f
Author: Andrew Kelley <andrew@ziglang.org>
Date:   Sun, 15 Feb 2026 19:29:21 +0100

Merge pull request 'std.Io: remove select function' (#31223) from remove-select into master

Reviewed-on: https://codeberg.org/ziglang/zig/pulls/31223

Diffstat:
Mlib/std/Io.zig | 38--------------------------------------
Mlib/std/Io/Dispatch.zig | 136+++++++++++++++++++------------------------------------------------------------
Mlib/std/Io/Kqueue.zig | 22----------------------
Mlib/std/Io/Threaded.zig | 70----------------------------------------------------------------------
Mlib/std/Io/Uring.zig | 388+++++++++++++++++++++++++++++++++++--------------------------------------------
Mlib/std/Io/test.zig | 34----------------------------------
Mlib/std/os/linux/IoUring.zig | 4++++
7 files changed, 209 insertions(+), 483 deletions(-)

diff --git a/lib/std/Io.zig b/lib/std/Io.zig @@ -144,10 +144,6 @@ pub const VTable = struct { swapCancelProtection: *const fn (?*anyopaque, new: CancelProtection) CancelProtection, checkCancel: *const fn (?*anyopaque) Cancelable!void, - /// Blocks until one of the futures from the list has a result ready, such - /// that awaiting it will not block. Returns that index. - select: *const fn (?*anyopaque, futures: []const *AnyFuture) Cancelable!usize, - futexWait: *const fn (?*anyopaque, ptr: *const u32, expected: u32, Timeout) Cancelable!void, futexWaitUncancelable: *const fn (?*anyopaque, ptr: *const u32, expected: u32) void, futexWake: *const fn (?*anyopaque, ptr: *const u32, max_waiters: u32) void, @@ -2120,40 +2116,6 @@ pub fn sleep(io: Io, duration: Duration, clock: Clock) Cancelable!void { } }); } -/// Given a struct with each field a `*Future`, returns a union with the same -/// fields, each field type the future's result. -pub fn SelectUnion(S: type) type { - const struct_fields = @typeInfo(S).@"struct".fields; - var names: [struct_fields.len][]const u8 = undefined; - var types: [struct_fields.len]type = undefined; - for (struct_fields, &names, &types) |struct_field, *union_field_name, *UnionFieldType| { - const FieldFuture = @typeInfo(struct_field.type).pointer.child; - union_field_name.* = struct_field.name; - UnionFieldType.* = @FieldType(FieldFuture, "result"); - } - return @Union(.auto, std.meta.FieldEnum(S), &names, &types, &@splat(.{})); -} - -/// `s` is a struct with every field a `*Future(T)`, where `T` can be any type, -/// and can be different for each field. -pub fn select(io: Io, s: anytype) Cancelable!SelectUnion(@TypeOf(s)) { - const U = SelectUnion(@TypeOf(s)); - const S = @TypeOf(s); - const fields = @typeInfo(S).@"struct".fields; - var futures: [fields.len]*AnyFuture = undefined; - inline for (fields, &futures) |field, *any_future| { - const future = @field(s, field.name); - any_future.* = future.any_future orelse return @unionInit(U, field.name, future.result); - } - switch (try io.vtable.select(io.userdata, &futures)) { - inline 0...(fields.len - 1) => |selected_index| { - const field_name = fields[selected_index].name; - return @unionInit(U, field_name, @field(s, field_name).await(io)); - }, - else => unreachable, - } -} - pub const LockedStderr = struct { file_writer: *File.Writer, terminal_mode: Terminal.Mode, diff --git a/lib/std/Io/Dispatch.zig b/lib/std/Io/Dispatch.zig @@ -100,15 +100,11 @@ const Fiber = struct { required_align: void align(4), evented: *Evented, context: Io.fiber.Context, - await_count: i32, link: union { awaiter: ?*Fiber, group: struct { prev: ?*Fiber, next: ?*Fiber }, }, - status: union(enum) { - queue_next: ?*Fiber, - awaiting_group: Group, - }, + awaiting_group: Group, cancel_status: CancelStatus, cancel_protection: CancelProtection, @@ -123,7 +119,6 @@ const Fiber = struct { const Awaiting = enum(@Int(.unsigned, @bitSizeOf(usize) - shift)) { nothing = 0, group = 1, - select = 2, _, const shift = 1; @@ -216,7 +211,6 @@ const Fiber = struct { } fn destroy(fiber: *Fiber, ev: *Evented) void { - assert(fiber.status.queue_next == null); ev.allocator().free(fiber.allocatedSlice()); } @@ -272,14 +266,11 @@ const Fiber = struct { .group => { // The awaiter received a cancelation request while awaiting a group, // so propagate the cancelation to the group. - if (fiber.status.awaiting_group.cancel(ev, null)) { - fiber.status = .{ .queue_next = null }; + if (fiber.awaiting_group.cancel(ev, null)) { + fiber.awaiting_group = undefined; ev.queue.async(fiber, &Fiber.@"resume"); } }, - .select => if (@atomicRmw(i32, &fiber.await_count, .Add, 1, .monotonic) == -1) { - ev.queue.async(fiber, &Fiber.@"resume"); - }, _ => |awaiting| awaiting.toCancelable().async(), } } @@ -370,8 +361,6 @@ pub fn io(ev: *Evented) Io { .swapCancelProtection = swapCancelProtection, .checkCancel = checkCancel, - .select = select, - .futexWait = futexWait, .futexWaitUncancelable = futexWaitUncancelable, .futexWake = futexWake, @@ -522,9 +511,8 @@ pub fn init(ev: *Evented, backing_allocator: Allocator, options: InitOptions) !v .required_align = {}, .evented = ev, .context = undefined, - .await_count = 0, .link = .{ .awaiter = null }, - .status = .{ .queue_next = null }, + .awaiting_group = undefined, .cancel_status = .unrequested, .cancel_protection = .unblocked, }, @@ -642,7 +630,7 @@ const SwitchMessage = struct { const PendingTask = union(enum) { nothing, - await: u31, + await: *Fiber, activate: c.dispatch.object_t, @"resume": c.dispatch.object_t, group_await: Group, @@ -661,10 +649,10 @@ const SwitchMessage = struct { thread.current_context = message.contexts.new; switch (message.pending_task) { .nothing => {}, - .await => |count| { - const fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.old)); - if (@atomicRmw(i32, &fiber.await_count, .Sub, count, .monotonic) > 0) - ev.queue.async(fiber, &Fiber.@"resume"); + .await => |awaiting| { + const awaiter: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.old)); + if (@atomicRmw(?*Fiber, &awaiting.link.awaiter, .Xchg, awaiter, .acq_rel) == + Fiber.finished) ev.queue.async(awaiter, &Fiber.@"resume"); }, .activate => |object| object.activate(), .@"resume" => |object| object.@"resume"(), @@ -998,7 +986,7 @@ fn crashHandler(userdata: ?*anyopaque) void { } const AsyncClosure = struct { - ev: *Evented, + evented: *Evented, fiber: *Fiber, start: *const fn (context: *const anyopaque, result: *anyopaque) void, result_align: Alignment, @@ -1035,13 +1023,13 @@ const AsyncClosure = struct { closure: *AsyncClosure, message: *const SwitchMessage, ) callconv(.withStackAlign(.c, @alignOf(AsyncClosure))) noreturn { - message.handle(closure.ev); + const ev = closure.evented; const fiber = closure.fiber; + message.handle(ev); closure.start(closure.contextPointer(), fiber.resultBytes(closure.result_align)); if (@atomicRmw(?*Fiber, &fiber.link.awaiter, .Xchg, Fiber.finished, .acq_rel)) |awaiter| - if (@atomicRmw(i32, &awaiter.await_count, .Add, 1, .monotonic) == -1) - closure.ev.queue.async(awaiter, &Fiber.@"resume"); - closure.ev.yield(.nothing); + ev.queue.async(awaiter, &Fiber.@"resume"); + ev.yield(.nothing); unreachable; // switched to dead fiber } }; @@ -1096,14 +1084,13 @@ fn concurrent( }, else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)), }, - .await_count = 0, .link = .{ .awaiter = null }, - .status = .{ .queue_next = null }, + .awaiting_group = undefined, .cancel_status = .unrequested, .cancel_protection = .unblocked, }; closure.* = .{ - .ev = ev, + .evented = ev, .fiber = fiber, .start = start, .result_align = result_alignment, @@ -1121,18 +1108,11 @@ fn await( result_alignment: Alignment, ) void { const ev: *Evented = @ptrCast(@alignCast(userdata)); - const fiber = Thread.current().currentFiber(); - const future_fiber: *Fiber = @ptrCast(@alignCast(future)); - if (@atomicRmw(?*Fiber, &future_fiber.link.awaiter, .Xchg, fiber, .acq_rel)) |awaiter| { - assert(awaiter == Fiber.finished); - } else while (true) { - ev.yield(.{ .await = 1 }); - const awaiter = @atomicLoad(?*Fiber, &future_fiber.link.awaiter, .acquire); - if (awaiter == Fiber.finished) break; - assert(awaiter == fiber); // spurious wakeup - } - @memcpy(result, future_fiber.resultBytes(result_alignment)); - future_fiber.destroy(ev); + const awaiting: *Fiber = @ptrCast(@alignCast(future)); + if (@atomicLoad(?*Fiber, &awaiting.link.awaiter, .acquire) != Fiber.finished) + ev.yield(.{ .await = awaiting }); + @memcpy(result, awaiting.resultBytes(result_alignment)); + awaiting.destroy(ev); } fn cancel( @@ -1267,8 +1247,8 @@ const Group = struct { .awaiter_delayed = false, .fibers = .null, }, .release); - assert(awaiter.status.awaiting_group.ptr == group.ptr); - awaiter.status = .{ .queue_next = null }; + assert(awaiter.awaiting_group.ptr == group.ptr); + awaiter.awaiting_group = undefined; return awaiter; } // Race with `Fiber.requestCancel` @@ -1336,8 +1316,7 @@ const Group = struct { /// Assumes the mutex is held. fn registerAwaiter(group: Group, awaiter: *Fiber) bool { - assert(awaiter.status.queue_next == null); - awaiter.status = .{ .awaiting_group = group }; + awaiter.awaiting_group = group; assert(@atomicRmw( Awaiter, group.awaiterPtr(), @@ -1349,7 +1328,7 @@ const Group = struct { } const AsyncClosure = struct { - ev: *Evented, + evented: *Evented, group: Group, fiber: *Fiber, start: *const fn (context: *const anyopaque) Io.Cancelable!void, @@ -1388,19 +1367,15 @@ const Group = struct { closure: *Group.AsyncClosure, message: *const SwitchMessage, ) callconv(.withStackAlign(.c, @alignOf(Group.AsyncClosure))) noreturn { - message.handle(closure.ev); - assert(closure.fiber.status.queue_next == null); - const result = closure.start(closure.contextPointer()); - const ev = closure.ev; - const group = closure.group; + const ev = closure.evented; const fiber = closure.fiber; - const cancel_acknowledged = fiber.cancel_protection.acknowledged; - if (result) { - assert(!cancel_acknowledged); // group task acknowledged cancelation but did not return `error.Canceled` + message.handle(ev); + if (closure.start(closure.contextPointer())) { + assert(!fiber.cancel_protection.acknowledged); // group task acknowledged cancelation but did not return `error.Canceled` } else |err| switch (err) { - error.Canceled => assert(cancel_acknowledged), // group task returned `error.Canceled` but was never canceled + error.Canceled => assert(fiber.cancel_protection.acknowledged), // group task returned `error.Canceled` but was never canceled } - if (group.removeFiber(ev, fiber)) |awaiter| ev.queue.async(awaiter, &Fiber.@"resume"); + if (closure.group.removeFiber(ev, fiber)) |awaiter| ev.queue.async(awaiter, &Fiber.@"resume"); ev.yield(.destroy); unreachable; // switched to dead fiber } @@ -1470,14 +1445,13 @@ fn groupConcurrent( }, else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)), }, - .await_count = 0, .link = .{ .group = .{ .prev = null, .next = null } }, - .status = .{ .queue_next = null }, + .awaiting_group = undefined, .cancel_status = .unrequested, .cancel_protection = .unblocked, }; closure.* = .{ - .ev = ev, + .evented = ev, .group = group, .fiber = fiber, .start = start, @@ -1689,50 +1663,6 @@ fn futexForAddress(ev: *Evented, address: usize) *Futex { return &ev.futexes[hashed >> @clz(ev.futexes.len - 1)]; } -fn select(userdata: ?*anyopaque, futures: []const *Io.AnyFuture) Io.Cancelable!usize { - const ev: *Evented = @ptrCast(@alignCast(userdata)); - const fiber = Thread.current().currentFiber(); - var await_count: u31, var result = for (futures, 0..) |future, future_index| { - const future_fiber: *Fiber = @ptrCast(@alignCast(future)); - if (@atomicRmw( - ?*Fiber, - &future_fiber.link.awaiter, - .Xchg, - fiber, - .acq_rel, - )) |awaiter| { - assert(awaiter == Fiber.finished); - break .{ @intCast(future_index), future_index }; - } - } else result: { - const await_count: u31 = @intCast(futures.len); - ev.yield(.{ .await = 1 }); - break :result .{ await_count - 1, futures.len }; - }; - for (futures[0..result], 0..) |future, future_index| { - const future_fiber: *Fiber = @ptrCast(@alignCast(future)); - const awaiter = @atomicRmw(?*Fiber, &future_fiber.link.awaiter, .Xchg, null, .monotonic); - if (awaiter == Fiber.finished) { - @atomicStore(?*Fiber, &future_fiber.link.awaiter, Fiber.finished, .monotonic); - result = @min(future_index, result); - } else { - assert(awaiter == fiber); - await_count -= 1; - } - } - // Equivalent to `ev.yield(null, .{ .await = await_count });`, - // but avoiding a context switch in the common case. - switch (std.math.order( - @atomicRmw(i32, &fiber.await_count, .Sub, await_count, .monotonic), - await_count, - )) { - .lt => ev.yield(.{ .await = 0 }), - .eq => {}, - .gt => unreachable, - } - return result; -} - fn futexWait( userdata: ?*anyopaque, ptr: *const u32, diff --git a/lib/std/Io/Kqueue.zig b/lib/std/Io/Kqueue.zig @@ -491,7 +491,6 @@ const SwitchMessage = struct { reschedule, recycle: *Fiber, register_awaiter: *?*Fiber, - register_select: []const *Io.AnyFuture, exit, }; @@ -514,19 +513,6 @@ const SwitchMessage = struct { if (@atomicRmw(?*Fiber, awaiter, .Xchg, prev_fiber, .acq_rel) == Fiber.finished) k.schedule(thread, .{ .head = prev_fiber, .tail = prev_fiber }); }, - .register_select => |futures| { - const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.old)); - assert(prev_fiber.queue_next == null); - for (futures) |any_future| { - const future_fiber: *Fiber = @ptrCast(@alignCast(any_future)); - if (@atomicRmw(?*Fiber, &future_fiber.awaiter, .Xchg, prev_fiber, .acq_rel) == Fiber.finished) { - const closure: *AsyncClosure = .fromFiber(future_fiber); - if (!@atomicRmw(bool, &closure.already_awaited, .Xchg, true, .seq_cst)) { - k.schedule(thread, .{ .head = prev_fiber, .tail = prev_fiber }); - } - } - } - }, .exit => for (k.threads.allocated[0..@atomicLoad(u32, &k.threads.active, .acquire)]) |*each_thread| { const changes = [_]posix.Kevent{ .{ @@ -628,7 +614,6 @@ pub fn io(k: *Kqueue) Io { .concurrent = concurrent, .await = await, .cancel = cancel, - .select = select, .groupAsync = groupAsync, .groupConcurrent = groupConcurrent, @@ -824,13 +809,6 @@ fn groupCancel(userdata: ?*anyopaque, group: *Io.Group, token: *anyopaque) void @panic("TODO"); } -fn select(userdata: ?*anyopaque, futures: []const *Io.AnyFuture) Io.Cancelable!usize { - const k: *Kqueue = @ptrCast(@alignCast(userdata)); - _ = k; - _ = futures; - @panic("TODO"); -} - fn dirCreateDir(userdata: ?*anyopaque, dir: Dir, sub_path: []const u8, permissions: Dir.Permissions) Dir.CreateDirError!void { const k: *Kqueue = @ptrCast(@alignCast(userdata)); _ = k; diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig @@ -1772,7 +1772,6 @@ pub fn io(t: *Threaded) Io { .concurrent = concurrent, .await = await, .cancel = cancel, - .select = select, .groupAsync = groupAsync, .groupConcurrent = groupConcurrent, @@ -1938,7 +1937,6 @@ pub fn ioBasic(t: *Threaded) Io { .concurrent = concurrent, .await = await, .cancel = cancel, - .select = select, .groupAsync = groupAsync, .groupConcurrent = groupConcurrent, @@ -11727,74 +11725,6 @@ fn sleepNanosleep(t: *Threaded, timeout: Io.Timeout) Io.Cancelable!void { } } -fn select(userdata: ?*anyopaque, futures: []const *Io.AnyFuture) Io.Cancelable!usize { - const t: *Threaded = @ptrCast(@alignCast(userdata)); - _ = t; - - var num_completed: std.atomic.Value(u32) = .init(0); - - for (futures, 0..) |any_future, i| { - const future: *Future = @ptrCast(@alignCast(any_future)); - future.awaiter = &num_completed; - const old_status = future.status.fetchOr( - .{ .tag = .pending_awaited, .thread = .null }, - .release, // release `future.awaiter` - ); - switch (old_status.tag) { - .pending => {}, - .pending_awaited => unreachable, // `await` raced with `select` - .pending_canceled => unreachable, // `cancel` raced with `select` - .done => { - future.status.store(old_status, .monotonic); - _ = finishSelect(&num_completed, futures[0..i]); - return i; - }, - } - } - - errdefer _ = finishSelect(&num_completed, futures); - - while (true) { - const n = num_completed.load(.acquire); - if (n > 0) break; - assert(n < futures.len); - try Thread.futexWait(&num_completed.raw, n, null); - } - return finishSelect(&num_completed, futures).?; -} -fn finishSelect( - num_completed: *std.atomic.Value(u32), - futures: []const *Io.AnyFuture, -) ?usize { - var completed_index: ?usize = null; - var expect_completed: u32 = 0; - for (futures, 0..) |any_future, i| { - const future: *Future = @ptrCast(@alignCast(any_future)); - // This operation will convert `.pending_awaited` to `.pending`, or leave `.done` untouched. - switch (future.status.fetchAnd( - .{ .tag = @enumFromInt(0b10), .thread = .all_ones }, - .monotonic, - ).tag) { - .pending_awaited => {}, - .pending => unreachable, - .pending_canceled => unreachable, - .done => { - expect_completed += 1; - completed_index = i; - }, - } - } - // If any future has just finished, wait for it to signal `num_completed` to avoid dangling - // references to stack memory. - while (true) { - const n = num_completed.load(.acquire); - if (n == expect_completed) break; - assert(n < expect_completed); - Thread.futexWaitUncancelable(&num_completed.raw, n, null); - } - return completed_index; -} - fn netListenIpPosix( userdata: ?*anyopaque, address: IpAddress, diff --git a/lib/std/Io/Uring.zig b/lib/std/Io/Uring.zig @@ -150,7 +150,6 @@ const Thread = struct { const Fiber = struct { required_align: void align(4), context: Io.fiber.Context, - await_count: i32, link: union { awaiter: ?*Fiber, group: struct { prev: ?*Fiber, next: ?*Fiber }, @@ -175,7 +174,6 @@ const Fiber = struct { const Awaiting = enum(u31) { nothing = std.math.maxInt(u31), group = std.math.maxInt(u31) - 1, - select = std.math.maxInt(u31) - 2, /// An io_uring fd. _, @@ -186,14 +184,14 @@ const Fiber = struct { fn fromIoUringFd(fd: fd_t) Awaiting { const awaiting: Awaiting = @enumFromInt(fd); switch (awaiting) { - .nothing, .group, .select => unreachable, + .nothing, .group => unreachable, _ => return awaiting, } } fn toIoUringFd(awaiting: Awaiting) fd_t { switch (awaiting) { - .nothing, .group, .select => unreachable, + .nothing, .group => unreachable, _ => return @intFromEnum(awaiting), } } @@ -376,9 +374,6 @@ const Fiber = struct { _ = ev.schedule(.current(), .{ .head = fiber, .tail = fiber }); } }, - .select => if (@atomicRmw(i32, &fiber.await_count, .Add, 1, .monotonic) == -1) { - _ = ev.schedule(.current(), .{ .head = fiber, .tail = fiber }); - }, _ => |awaiting| { const awaiting_io_uring_fd = awaiting.toIoUringFd(); const thread: *Thread = .current(); @@ -684,8 +679,6 @@ pub fn io(ev: *Evented) Io { .swapCancelProtection = swapCancelProtection, .checkCancel = checkCancel, - .select = select, - .futexWait = futexWait, .futexWaitUncancelable = futexWaitUncancelable, .futexWake = futexWake, @@ -862,7 +855,6 @@ pub fn init(ev: *Evented, backing_allocator: Allocator, options: InitOptions) !v main_fiber.* = .{ .required_align = {}, .context = undefined, - .await_count = 0, .link = .{ .awaiter = null }, .status = .{ .queue_next = null }, .cancel_status = .unrequested, @@ -1266,7 +1258,7 @@ const SwitchMessage = struct { const PendingTask = union(enum) { nothing, reschedule, - await: u31, + await: *Fiber, group_await: Group, group_cancel: Group, batch_await: *Io.Batch, @@ -1290,10 +1282,11 @@ const SwitchMessage = struct { assert(fiber.status.queue_next == null); _ = ev.schedule(thread, .{ .head = fiber, .tail = fiber }); }, - .await => |count| { - const fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.old)); - if (@atomicRmw(i32, &fiber.await_count, .Sub, count, .monotonic) > 0) - _ = ev.schedule(thread, .{ .head = fiber, .tail = fiber }); + .await => |awaiting| { + const awaiter: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.old)); + assert(awaiter.status.queue_next == null); + if (@atomicRmw(?*Fiber, &awaiting.link.awaiter, .Xchg, awaiter, .acq_rel) == + Fiber.finished) _ = ev.schedule(thread, .{ .head = awaiter, .tail = awaiter }); }, .group_await => |group| { const fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.old)); @@ -1367,7 +1360,7 @@ fn crashHandler(userdata: ?*anyopaque) void { } const AsyncClosure = struct { - ev: *Evented, + evented: *Evented, fiber: *Fiber, start: *const fn (context: *const anyopaque, result: *anyopaque) void, result_align: Alignment, @@ -1404,16 +1397,11 @@ const AsyncClosure = struct { closure: *AsyncClosure, message: *const SwitchMessage, ) callconv(.withStackAlign(.c, @alignOf(AsyncClosure))) noreturn { - message.handle(closure.ev); + const ev = closure.evented; const fiber = closure.fiber; + message.handle(ev); closure.start(closure.contextPointer(), fiber.resultBytes(closure.result_align)); - closure.ev.yield( - if (@atomicRmw(?*Fiber, &fiber.link.awaiter, .Xchg, Fiber.finished, .acq_rel)) |awaiter| - if (@atomicRmw(i32, &awaiter.await_count, .Add, 1, .monotonic) == -1) awaiter else null - else - null, - .nothing, - ); + ev.yield(@atomicRmw(?*Fiber, &fiber.link.awaiter, .Xchg, Fiber.finished, .acq_rel), .nothing); unreachable; // switched to dead fiber } }; @@ -1467,7 +1455,6 @@ fn concurrent( }, else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)), }, - .await_count = 0, .link = .{ .awaiter = null }, .status = .{ .queue_next = null }, .cancel_status = .unrequested, @@ -1485,7 +1472,7 @@ fn concurrent( }, }; closure.* = .{ - .ev = ev, + .evented = ev, .fiber = fiber, .start = start, .result_align = result_alignment, @@ -1504,18 +1491,11 @@ fn await( result_alignment: Alignment, ) void { const ev: *Evented = @ptrCast(@alignCast(userdata)); - const fiber = Thread.current().currentFiber(); - const future_fiber: *Fiber = @ptrCast(@alignCast(future)); - if (@atomicRmw(?*Fiber, &future_fiber.link.awaiter, .Xchg, fiber, .acq_rel)) |awaiter| { - assert(awaiter == Fiber.finished); - } else while (true) { - ev.yield(null, .{ .await = 1 }); - const awaiter = @atomicLoad(?*Fiber, &future_fiber.link.awaiter, .acquire); - if (awaiter == Fiber.finished) break; - assert(awaiter == fiber); // spurious wakeup - } - @memcpy(result, future_fiber.resultBytes(result_alignment)); - future_fiber.destroy(); + const awaiting: *Fiber = @ptrCast(@alignCast(future)); + if (@atomicLoad(?*Fiber, &awaiting.link.awaiter, .acquire) != Fiber.finished) + ev.yield(null, .{ .await = awaiting }); + @memcpy(result, awaiting.resultBytes(result_alignment)); + awaiting.destroy(); } fn cancel( @@ -1732,7 +1712,7 @@ const Group = struct { } const AsyncClosure = struct { - ev: *Evented, + evented: *Evented, group: Group, fiber: *Fiber, start: *const fn (context: *const anyopaque) Io.Cancelable!void, @@ -1771,19 +1751,16 @@ const Group = struct { closure: *Group.AsyncClosure, message: *const SwitchMessage, ) callconv(.withStackAlign(.c, @alignOf(Group.AsyncClosure))) noreturn { - message.handle(closure.ev); - assert(closure.fiber.status.queue_next == null); - const result = closure.start(closure.contextPointer()); - const ev = closure.ev; - const group = closure.group; + const ev = closure.evented; const fiber = closure.fiber; - const cancel_acknowledged = fiber.cancel_protection.acknowledged; - if (result) { - assert(!cancel_acknowledged); // group task acknowledged cancelation but did not return `error.Canceled` + message.handle(ev); + assert(fiber.status.queue_next == null); + if (closure.start(closure.contextPointer())) { + assert(!fiber.cancel_protection.acknowledged); // group task acknowledged cancelation but did not return `error.Canceled` } else |err| switch (err) { - error.Canceled => assert(cancel_acknowledged), // group task returned `error.Canceled` but was never canceled + error.Canceled => assert(fiber.cancel_protection.acknowledged), // group task returned `error.Canceled` but was never canceled } - ev.yield(group.removeFiber(ev, fiber), .destroy); + ev.yield(closure.group.removeFiber(ev, fiber), .destroy); unreachable; // switched to dead fiber } }; @@ -1851,7 +1828,6 @@ fn groupConcurrent( }, else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)), }, - .await_count = 0, .link = .{ .group = .{ .prev = null, .next = null } }, .status = .{ .queue_next = null }, .cancel_status = .unrequested, @@ -1869,7 +1845,7 @@ fn groupConcurrent( }, }; closure.* = .{ - .ev = ev, + .evented = ev, .group = group, .fiber = fiber, .start = start, @@ -1928,57 +1904,6 @@ fn checkCancel(userdata: ?*anyopaque) Io.Cancelable!void { } } -fn select(userdata: ?*anyopaque, futures: []const *Io.AnyFuture) Io.Cancelable!usize { - const ev: *Evented = @ptrCast(@alignCast(userdata)); - var cancel_region: CancelRegion = .init(); - defer cancel_region.deinit(); - var await_count: u31, var result = for (futures, 0..) |future, future_index| { - const future_fiber: *Fiber = @ptrCast(@alignCast(future)); - if (@atomicRmw( - ?*Fiber, - &future_fiber.link.awaiter, - .Xchg, - cancel_region.fiber, - .acq_rel, - )) |awaiter| { - assert(awaiter == Fiber.finished); - break .{ @intCast(future_index), future_index }; - } - } else result: { - const await_count: u31 = @intCast(futures.len); - cancel_region.await(.select) catch |err| switch (err) { - error.Canceled => |e| break :result .{ await_count + 1, e }, - }; - ev.yield(null, .{ .await = 1 }); - cancel_region.await(.nothing) catch |err| switch (err) { - error.Canceled => |e| break :result .{ await_count, e }, - }; - break :result .{ await_count - 1, futures.len }; - }; - for (futures[0 .. result catch futures.len], 0..) |future, future_index| { - const future_fiber: *Fiber = @ptrCast(@alignCast(future)); - const awaiter = @atomicRmw(?*Fiber, &future_fiber.link.awaiter, .Xchg, null, .monotonic); - if (awaiter == Fiber.finished) { - @atomicStore(?*Fiber, &future_fiber.link.awaiter, Fiber.finished, .monotonic); - result = if (result) |finished_index| @min(future_index, finished_index) else |e| e; - } else { - assert(awaiter == cancel_region.fiber); - await_count -= 1; - } - } - // Equivalent to `ev.yield(null, .{ .await = await_count });`, - // but avoiding a context switch in the common case. - switch (std.math.order( - @atomicRmw(i32, &cancel_region.fiber.await_count, .Sub, await_count, .monotonic), - await_count, - )) { - .lt => ev.yield(null, .{ .await = 0 }), - .eq => {}, - .gt => unreachable, - } - return result; -} - fn futexWait( userdata: ?*anyopaque, ptr: *const u32, @@ -2230,7 +2155,7 @@ fn deviceIoControl( const rc = linux.ioctl(o.file.handle, @bitCast(o.code), @intFromPtr(o.arg)); switch (linux.errno(rc)) { .SUCCESS => return @bitCast(@as(u32, @truncate(rc))), - .INTR => continue, + .INTR => {}, else => |err| return -@as(i32, @intFromEnum(err)), } } @@ -2628,7 +2553,7 @@ fn dirCreateDir( ev.yield(null, .nothing); switch (cancel_region.errno()) { .SUCCESS => return, - .INTR, .CANCELED => continue, + .INTR, .CANCELED => {}, .ACCES => return error.AccessDenied, .BADF => |err| return errnoBug(err), // File descriptor used after closed. .PERM => return error.PermissionDenied, @@ -2711,7 +2636,7 @@ fn filePathKind(ev: *Evented, dir: Dir, sub_path: []const u8) !File.Kind { if (!statx_buf.mask.TYPE) return error.Unexpected; return statxKind(statx_buf.mode); }, - .INTR, .CANCELED => continue, + .INTR, .CANCELED => {}, .ACCES => |err| return errnoBug(err), .BADF => |err| return errnoBug(err), // File descriptor used after closed. .FAULT => |err| return errnoBug(err), @@ -2824,7 +2749,7 @@ fn dirAccess( try sync.cancel_region.await(.nothing); switch (linux.errno(linux.faccessat(dir.handle, sub_path_posix, mode, flags))) { .SUCCESS => return, - .INTR => continue, + .INTR => {}, .ACCES => return error.AccessDenied, .PERM => return error.PermissionDenied, .ROFS => return error.ReadOnlyFileSystem, @@ -2863,7 +2788,7 @@ fn dirCreateFile( .EXCL = flags.exclusive, .CLOEXEC = true, }, flags.permissions.toMode()); - errdefer ev.close(fd); + errdefer ev.closeAsync(fd); switch (flags.lock) { .none => {}, @@ -3051,7 +2976,7 @@ fn dirOpenFile( .CLOEXEC = true, .PATH = flags.path_only, }, 0); - errdefer ev.close(fd); + errdefer ev.closeAsync(fd); if (!flags.allow_directory) { const is_dir = is_dir: { @@ -3105,7 +3030,7 @@ fn dirRead(userdata: ?*anyopaque, dr: *Dir.Reader, buffer: []Dir.Entry) Dir.Read const rc = linux.getdents64(dr.dir.handle, dr.buffer.ptr, dr.buffer.len); switch (linux.errno(rc)) { .SUCCESS => break rc, - .INTR => continue, + .INTR => {}, .BADF => |err| return errnoBug(err), // Dir is invalid or was opened without iteration ability. .FAULT => |err| return errnoBug(err), .NOTDIR => |err| return errnoBug(err), @@ -3198,7 +3123,7 @@ fn dirRealPathFile( error.FileLocksUnsupported => return errnoBug(.OPNOTSUPP), // Not asking for locks. else => |e| return e, }; - defer ev.close(fd); + defer ev.closeAsync(fd); return ev.realPath(try maybe_sync.enterSync(ev), fd, out_buffer); } @@ -3231,7 +3156,7 @@ fn dirDeleteFile(userdata: ?*anyopaque, dir: Dir, sub_path: []const u8) Dir.Dele ev.yield(null, .nothing); switch (cancel_region.errno()) { .SUCCESS => return, - .INTR, .CANCELED => continue, + .INTR, .CANCELED => {}, .PERM => return error.PermissionDenied, .ACCES => return error.AccessDenied, .BUSY => return error.FileBusy, @@ -3283,7 +3208,7 @@ fn dirDeleteDir(userdata: ?*anyopaque, dir: Dir, sub_path: []const u8) Dir.Delet ev.yield(null, .nothing); switch (cancel_region.errno()) { .SUCCESS => return, - .INTR, .CANCELED => continue, + .INTR, .CANCELED => {}, .ACCES => return error.AccessDenied, .PERM => return error.PermissionDenied, .BUSY => return error.FileBusy, @@ -3399,7 +3324,7 @@ fn dirSymLink( ev.yield(null, .nothing); switch (cancel_region.errno()) { .SUCCESS => return, - .INTR, .CANCELED => continue, + .INTR, .CANCELED => {}, .FAULT => |err| return errnoBug(err), .INVAL => |err| return errnoBug(err), .ACCES => return error.AccessDenied, @@ -3438,7 +3363,7 @@ fn dirReadLink( const rc = linux.readlinkat(dir.handle, sub_path_posix, buffer.ptr, buffer.len); switch (linux.errno(rc)) { .SUCCESS => return @bitCast(rc), - .INTR => continue, + .INTR => {}, .ACCES => return error.AccessDenied, .FAULT => |err| return errnoBug(err), .INVAL => return error.NotLink, @@ -3628,7 +3553,7 @@ fn fileLength(userdata: ?*anyopaque, file: File) File.LengthError!u64 { if (!statx_buf.mask.SIZE) return error.Unexpected; return statx_buf.size; }, - .INTR, .CANCELED => continue, + .INTR, .CANCELED => {}, .ACCES => |err| return errnoBug(err), .BADF => |err| return errnoBug(err), // File descriptor used after closed. .FAULT => |err| return errnoBug(err), @@ -3811,7 +3736,7 @@ fn fileSync(userdata: ?*anyopaque, file: File) File.SyncError!void { ev.yield(null, .nothing); switch (cancel_region.errno()) { .SUCCESS => return, - .INTR, .CANCELED => continue, + .INTR, .CANCELED => {}, .BADF => |err| return errnoBug(err), .INVAL => |err| return errnoBug(err), .ROFS => |err| return errnoBug(err), @@ -3833,7 +3758,7 @@ fn fileIsTty(userdata: ?*anyopaque, file: File) Io.Cancelable!bool { const rc = linux.ioctl(file.handle, linux.T.IOCGWINSZ, @intFromPtr(&wsz)); switch (linux.errno(rc)) { .SUCCESS => return true, - .INTR => continue, + .INTR => {}, else => return false, } } @@ -3869,7 +3794,7 @@ fn fileSetLength(userdata: ?*anyopaque, file: File, length: u64) File.SetLengthE ev.yield(null, .nothing); switch (cancel_region.errno()) { .SUCCESS => return, - .INTR, .CANCELED => continue, + .INTR, .CANCELED => {}, .FBIG => return error.FileTooBig, .IO => return error.InputOutput, .PERM => return error.PermissionDenied, @@ -4051,7 +3976,7 @@ fn fileMemoryMapCreate( const rc = linux.mmap(null, options.len, prot, flags, file.handle, casted_offset); switch (linux.errno(rc)) { .SUCCESS => break @as([*]align(page_align) u8, @ptrFromInt(rc))[0..options.len], - .INTR => continue, + .INTR => {}, .ACCES => return error.AccessDenied, .AGAIN => return error.LockedMemoryLimitExceeded, .MFILE => return error.ProcessFdQuotaExceeded, @@ -4111,7 +4036,7 @@ fn fileMemoryMapSetLength( const rc = linux.mremap(old_memory.ptr, old_memory.len, new_len, flags, addr_hint); switch (linux.errno(rc)) { .SUCCESS => break @as([*]align(page_align) u8, @ptrFromInt(rc))[0..new_len], - .INTR => continue, + .INTR => {}, .AGAIN => return error.LockedMemoryLimitExceeded, .NOMEM => return error.OutOfMemory, .INVAL => |err| return errnoBug(err), @@ -4220,7 +4145,7 @@ fn processCurrentPath(userdata: ?*anyopaque, buffer: []u8) process.CurrentPathEr try sync.cancel_region.await(.nothing); switch (linux.errno(linux.getcwd(buffer.ptr, buffer.len))) { .SUCCESS => return std.mem.findScalar(u8, buffer, 0).?, - .INTR => continue, + .INTR => {}, .NOENT => return error.CurrentDirUnlinked, .RANGE => return error.NameTooLong, .FAULT => |err| return errnoBug(err), @@ -4235,7 +4160,7 @@ fn processSetCurrentDir(userdata: ?*anyopaque, dir: Dir) process.SetCurrentDirEr if (dir.handle == linux.AT.FDCWD) return; var sync: CancelRegion.Sync = try .init(ev); defer sync.deinit(ev); - return ev.fchdir(&sync, dir.handle); + return fchdir(&sync, dir.handle); } fn processSetCurrentPath(userdata: ?*anyopaque, dir_path: []const u8) ChdirError!void { @@ -4272,7 +4197,7 @@ fn processReplace(userdata: ?*anyopaque, options: process.ReplaceOptions) proces var sync: CancelRegion.Sync = try .init(ev); defer sync.deinit(ev); - return ev.execv(&sync, options.expand_arg0, argv_buf.ptr[0].?, argv_buf.ptr, env_block, PATH); + return execv(&sync, options.expand_arg0, argv_buf.ptr[0].?, argv_buf.ptr, env_block, PATH); } fn processReplacePath( @@ -4292,7 +4217,7 @@ fn processSpawn(userdata: ?*anyopaque, options: process.SpawnOptions) process.Sp const spawned = try ev.spawn(options); var cancel_region: CancelRegion = .initBlocked(); defer cancel_region.deinit(); - defer ev.close(spawned.err_fd); + defer ev.closeAsync(spawned.err_fd); // Wait for the child to report any errors in or before `execvpe`. var child_err: ForkBailError = undefined; @@ -4434,8 +4359,9 @@ fn spawn(ev: *Evented, options: process.SpawnOptions) process.SpawnError!Spawned if (pid_result == 0) { defer comptime unreachable; // We are the child. + // Note that the parent uring is no longer accessible, so we must no longer reference `ev`. var sync: CancelRegion.Sync = .{ .cancel_region = .initBlocked() }; - const err = ev.setUpChild(&sync, .{ + const err = setUpChild(&sync, .{ .stdin_pipe = stdin_pipe[0], .stdout_pipe = stdout_pipe[1], .stderr_pipe = stderr_pipe[1], @@ -4446,7 +4372,7 @@ fn spawn(ev: *Evented, options: process.SpawnOptions) process.SpawnError!Spawned .PATH = PATH, .spawn = options, }); - ev.writeAll(&sync.cancel_region, err_pipe[1], @ptrCast(&err)) catch {}; + writeAllSync(&sync, err_pipe[1], @ptrCast(&err)) catch {}; const exit = if (builtin.single_threaded) linux.exit else linux.exit_group; exit(1); } @@ -4454,13 +4380,13 @@ fn spawn(ev: *Evented, options: process.SpawnOptions) process.SpawnError!Spawned const pid: pid_t = @intCast(pid_result); // We are the parent. errdefer comptime unreachable; // The child is forked; we must not error from now on - ev.close(err_pipe[1]); // make sure only the child holds the write end open + ev.closeAsync(err_pipe[1]); // make sure only the child holds the write end open - if (options.stdin == .pipe) ev.close(stdin_pipe[0]); - if (options.stdout == .pipe) ev.close(stdout_pipe[1]); - if (options.stderr == .pipe) ev.close(stderr_pipe[1]); + if (options.stdin == .pipe) ev.closeAsync(stdin_pipe[0]); + if (options.stdout == .pipe) ev.closeAsync(stdout_pipe[1]); + if (options.stderr == .pipe) ev.closeAsync(stderr_pipe[1]); - if (prog_pipe[1] != -1) ev.close(prog_pipe[1]); + if (prog_pipe[1] != -1) ev.closeAsync(prog_pipe[1]); options.progress_node.setIpcFile(ev, .{ .handle = prog_pipe[0], .flags = .{ .nonblocking = true } }); @@ -4497,14 +4423,14 @@ pub fn pipe2(flags: linux.O) PipeError![2]fd_t { } } fn destroyPipe(ev: *Evented, pipe: [2]fd_t) void { - if (pipe[0] != -1) ev.close(pipe[0]); - if (pipe[0] != pipe[1]) ev.close(pipe[1]); + if (pipe[0] != -1) ev.closeAsync(pipe[0]); + if (pipe[0] != pipe[1]) ev.closeAsync(pipe[1]); } /// Errors that can occur between fork() and execv() const ForkBailError = process.SetCurrentDirError || ChdirError || process.SpawnError || process.ReplaceError; -fn setUpChild(ev: *Evented, sync: *CancelRegion.Sync, options: struct { +fn setUpChild(sync: *CancelRegion.Sync, options: struct { stdin_pipe: fd_t, stdout_pipe: fd_t, stderr_pipe: fd_t, @@ -4515,21 +4441,21 @@ fn setUpChild(ev: *Evented, sync: *CancelRegion.Sync, options: struct { PATH: []const u8, spawn: process.SpawnOptions, }) ForkBailError { - try ev.setUpChildIo( + try setUpChildIo( sync, options.spawn.stdin, options.stdin_pipe, linux.STDIN_FILENO, options.dev_null_fd, ); - try ev.setUpChildIo( + try setUpChildIo( sync, options.spawn.stdout, options.stdout_pipe, linux.STDOUT_FILENO, options.dev_null_fd, ); - try ev.setUpChildIo( + try setUpChildIo( sync, options.spawn.stderr, options.stderr_pipe, @@ -4539,17 +4465,17 @@ fn setUpChild(ev: *Evented, sync: *CancelRegion.Sync, options: struct { switch (options.spawn.cwd) { .inherit => {}, - .dir => |cwd_dir| try ev.fchdir(sync, cwd_dir.handle), + .dir => |cwd_dir| try fchdir(sync, cwd_dir.handle), .path => |cwd_path| { var cwd_path_buffer: [PATH_MAX]u8 = undefined; const cwd_path_posix = try pathToPosix(cwd_path, &cwd_path_buffer); - try ev.chdir(sync, cwd_path_posix); + try chdir(sync, cwd_path_posix); }, } // Must happen after fchdir above, the cwd file descriptor might be // equal to prog_fileno and be clobbered by this dup2 call. - if (options.prog_pipe != -1) try ev.dup2(sync, options.prog_pipe, prog_fileno); + if (options.prog_pipe != -1) try dup2(sync, options.prog_pipe, prog_fileno); if (options.spawn.gid) |gid| { switch (linux.errno(linux.setregid(gid, gid))) { @@ -4589,7 +4515,7 @@ fn setUpChild(ev: *Evented, sync: *CancelRegion.Sync, options: struct { } } - return ev.execv( + return execv( sync, options.spawn.expand_arg0, options.argv_buf.ptr[0].?, @@ -4600,7 +4526,6 @@ fn setUpChild(ev: *Evented, sync: *CancelRegion.Sync, options: struct { } fn setUpChildIo( - ev: *Evented, sync: *CancelRegion.Sync, stdio: process.SpawnOptions.StdIo, pipe_fd: fd_t, @@ -4608,13 +4533,13 @@ fn setUpChildIo( dev_null_fd: fd_t, ) !void { switch (stdio) { - .pipe => try ev.dup2(sync, pipe_fd, std_fileno), + .pipe => try dup2(sync, pipe_fd, std_fileno), .close => _ = linux.close(std_fileno), .inherit => {}, - .ignore => try ev.dup2(sync, dev_null_fd, std_fileno), + .ignore => try dup2(sync, dev_null_fd, std_fileno), .file => |file| { if (file.flags.nonblocking) @panic("TODO implement setUpChildIo when nonblocking file is used"); - try ev.dup2(sync, file.handle, std_fileno); + try dup2(sync, file.handle, std_fileno); }, } } @@ -4623,13 +4548,12 @@ pub const DupError = error{ ProcessFdQuotaExceeded, SystemResources, } || Io.UnexpectedError || Io.Cancelable; -pub fn dup2(ev: *Evented, sync: *CancelRegion.Sync, old_fd: fd_t, new_fd: fd_t) DupError!void { - _ = ev; +pub fn dup2(sync: *CancelRegion.Sync, old_fd: fd_t, new_fd: fd_t) DupError!void { while (true) { try sync.cancel_region.await(.nothing); switch (linux.errno(linux.dup2(old_fd, new_fd))) { .SUCCESS => {}, - .BUSY, .INTR => continue, + .BUSY, .INTR => {}, .INVAL => |err| return errnoBug(err), // invalid parameters .BADF => |err| return errnoBug(err), // use after free .MFILE => return error.ProcessFdQuotaExceeded, @@ -4640,7 +4564,6 @@ pub fn dup2(ev: *Evented, sync: *CancelRegion.Sync, old_fd: fd_t, new_fd: fd_t) } fn execv( - ev: *Evented, sync: *CancelRegion.Sync, arg0_expand: process.ArgExpansion, file: [*:0]const u8, @@ -4649,7 +4572,8 @@ fn execv( PATH: []const u8, ) process.ReplaceError { const file_slice = std.mem.sliceTo(file, 0); - if (std.mem.findScalar(u8, file_slice, '/') != null) return ev.execvPath(sync, file, child_argv, env_block); + if (std.mem.findScalar(u8, file_slice, '/') != null) + return execvPath(sync, file, child_argv, env_block); // Use of PATH_MAX here is valid as the path_buf will be passed // directly to the operating system in posixExecvPath. @@ -4677,7 +4601,7 @@ fn execv( .expand => child_argv[0] = full_path, .no_expand => {}, } - err = ev.execvPath(sync, full_path, child_argv, env_block); + err = execvPath(sync, full_path, child_argv, env_block); switch (err) { error.AccessDenied => seen_eacces = true, error.FileNotFound, error.NotDir => {}, @@ -4689,13 +4613,11 @@ fn execv( } /// This function ignores PATH environment variable. pub fn execvPath( - ev: *Evented, sync: *CancelRegion.Sync, path: [*:0]const u8, child_argv: [*:null]const ?[*:0]const u8, env_block: process.Environ.PosixBlock, ) process.ReplaceError { - _ = ev; try sync.cancel_region.await(.nothing); switch (linux.errno(linux.execve(path, child_argv, env_block.slice.ptr))) { .FAULT => |err| return errnoBug(err), // Bad pointer parameter. @@ -4766,7 +4688,7 @@ fn childWait(userdata: ?*anyopaque, child: *process.Child) process.Child.WaitErr child.resource_usage_statistics.rusage = rusage; break; }, - .INTR, .CANCELED => continue, + .INTR, .CANCELED => {}, .CHILD => |err| return errnoBug(err), // Double-free. else => |err| return unexpectedErrno(err), } @@ -4781,7 +4703,7 @@ fn childWait(userdata: ?*anyopaque, child: *process.Child) process.Child.WaitErr _, .CONTINUED => .{ .unknown = status }, }; }, - .INTR, .CANCELED => continue, + .INTR, .CANCELED => {}, .CHILD => |err| return errnoBug(err), // Double-free. else => |err| return unexpectedErrno(err), } @@ -4798,7 +4720,7 @@ fn childKill(userdata: ?*anyopaque, child: *process.Child) void { const pid = child.id.?; while (true) switch (linux.errno(linux.kill(pid, .TERM))) { .SUCCESS => break, - .INTR => continue, + .INTR => {}, .PERM => return, .INVAL => |err| return errnoBug(err) catch {}, .SRCH => |err| return errnoBug(err) catch {}, @@ -4830,7 +4752,7 @@ fn childKill(userdata: ?*anyopaque, child: *process.Child) void { ev.yield(null, .nothing); switch (maybe_sync.cancel_region.errno()) { .SUCCESS => return, - .INTR, .CANCELED => continue, + .INTR, .CANCELED => {}, .CHILD => |err| return errnoBug(err) catch {}, // Double-free. else => |err| return unexpectedErrno(err) catch {}, } @@ -4839,15 +4761,15 @@ fn childKill(userdata: ?*anyopaque, child: *process.Child) void { fn childCleanup(ev: *Evented, child: *process.Child) void { if (child.stdin) |*stdin| { - ev.close(stdin.handle); + ev.closeAsync(stdin.handle); child.stdin = null; } if (child.stdout) |*stdout| { - ev.close(stdout.handle); + ev.closeAsync(stdout.handle); child.stdout = null; } if (child.stderr) |*stderr| { - ev.close(stderr.handle); + ev.closeAsync(stderr.handle); child.stderr = null; } child.id = null; @@ -4953,14 +4875,10 @@ fn sleep(userdata: ?*anyopaque, timeout: Io.Timeout) Io.Cancelable!void { .resv = 0, }; ev.yield(null, .nothing); - switch (cancel_region.errno()) { - // Handles SUCCESS as well as clock not available and unexpected - // errors. The user had a chance to check clock resolution before - // getting here, which would have reported 0, making this a legal - // amount of time to sleep. - else => return, - .INTR, .CANCELED => return error.Canceled, - } + // Handles SUCCESS as well as clock not available and unexpected + // errors. The user had a chance to check clock resolution before + // getting here, which would have reported 0, making this a legal + // amount of time to sleep. } fn random(userdata: ?*anyopaque, buffer: []u8) void { @@ -5037,7 +4955,7 @@ fn netBindIp( var maybe_sync: CancelRegion.Sync.Maybe = .{ .cancel_region = .init() }; defer maybe_sync.deinit(ev); const socket_fd = try ev.socket(&maybe_sync.cancel_region, family, options); - errdefer ev.close(socket_fd); + errdefer ev.closeAsync(socket_fd); var storage: PosixAddress = undefined; var addr_len = addressToPosix(address, &storage); try ev.bind(&maybe_sync.cancel_region, socket_fd, &storage.any, addr_len); @@ -5204,23 +5122,19 @@ fn netReceive( .data = data, .control = if (msg.control) |ptr| @as([*]u8, @ptrCast(ptr))[0..msg.controllen] else message.control, .flags = .{ - .eor = (msg.flags & linux.MSG.EOR) != 0, - .trunc = (msg.flags & linux.MSG.TRUNC) != 0, - .ctrunc = (msg.flags & linux.MSG.CTRUNC) != 0, - .oob = (msg.flags & linux.MSG.OOB) != 0, - .errqueue = if (@hasDecl(linux.MSG, "ERRQUEUE")) (msg.flags & linux.MSG.ERRQUEUE) != 0 else false, + .eor = msg.flags & linux.MSG.EOR != 0, + .trunc = msg.flags & linux.MSG.TRUNC != 0, + .ctrunc = msg.flags & linux.MSG.CTRUNC != 0, + .oob = msg.flags & linux.MSG.OOB != 0, + .errqueue = msg.flags & linux.MSG.ERRQUEUE != 0, }, }; message_i += 1; continue; }, .AGAIN => unreachable, - .INTR, .CANCELED => { - if (deadline) |d| { - if (now(ev, d.clock).nanoseconds >= d.raw.nanoseconds) return .{ error.Timeout, message_i }; - } - continue; - }, + .INTR, .CANCELED => if (deadline) |d| if (now(ev, d.clock).nanoseconds >= d.raw.nanoseconds) + return .{ error.Timeout, message_i }, .BADF => |err| return .{ errnoBug(err), message_i }, .NFILE => return .{ error.SystemFdQuotaExceeded, message_i }, @@ -5323,7 +5237,7 @@ fn netShutdown( ev.yield(null, .nothing); switch (cancel_region.errno()) { .SUCCESS => return, - .INTR, .CANCELED => continue, + .INTR, .CANCELED => {}, .BADF, .NOTSOCK, .INVAL => |err| return errnoBug(err), .NOTCONN => return error.SocketUnconnected, .NOBUFS => return error.SystemResources, @@ -5393,7 +5307,7 @@ fn bind( ev.yield(null, .nothing); switch (cancel_region.errno()) { .SUCCESS => return, - .INTR, .CANCELED => continue, + .INTR, .CANCELED => {}, .ADDRINUSE => return error.AddressInUse, .BADF => |err| return errnoBug(err), // File descriptor used after closed. .INVAL => |err| return errnoBug(err), // invalid parameters @@ -5407,13 +5321,12 @@ fn bind( } } -fn chdir(ev: *Evented, sync: *CancelRegion.Sync, path: [*:0]const u8) ChdirError!void { - _ = ev; +fn chdir(sync: *CancelRegion.Sync, path: [*:0]const u8) ChdirError!void { while (true) { try sync.cancel_region.await(.nothing); switch (linux.errno(linux.chdir(path))) { .SUCCESS => return, - .INTR => continue, + .INTR => {}, .ACCES => return error.AccessDenied, .IO => return error.FileSystem, .LOOP => return error.SymLinkLoop, @@ -5429,6 +5342,36 @@ fn chdir(ev: *Evented, sync: *CancelRegion.Sync, path: [*:0]const u8) ChdirError } fn close(ev: *Evented, fd: fd_t) void { + var cancel_region: CancelRegion = .initBlocked(); + defer cancel_region.deinit(); + const thread = cancel_region.awaitIoUring() catch |err| switch (err) { + error.Canceled => unreachable, // blocked + }; + thread.enqueue().* = .{ + .opcode = .CLOSE, + .flags = 0, + .ioprio = 0, + .fd = fd, + .off = 0, + .addr = 0, + .len = 0, + .rw_flags = 0, + .user_data = @intFromPtr(cancel_region.fiber), + .buf_index = 0, + .personality = 0, + .splice_fd_in = 0, + .addr3 = 0, + .resv = 0, + }; + ev.yield(null, .nothing); + switch (cancel_region.errno()) { + .BADF => recoverableOsBugDetected(), // Always a race condition. + .INTR => {}, // This is still a success. See https://github.com/ziglang/zig/issues/2425 + else => {}, + } +} + +fn closeAsync(ev: *Evented, fd: fd_t) void { _ = ev; const thread: *Thread = .current(); thread.enqueue().* = .{ @@ -5449,14 +5392,13 @@ fn close(ev: *Evented, fd: fd_t) void { }; } -fn fchdir(ev: *Evented, sync: *CancelRegion.Sync, dir: fd_t) process.SetCurrentDirError!void { - _ = ev; +fn fchdir(sync: *CancelRegion.Sync, dir: fd_t) process.SetCurrentDirError!void { if (dir == linux.AT.FDCWD) return; while (true) { try sync.cancel_region.await(.nothing); switch (linux.errno(linux.fchdir(dir))) { .SUCCESS => return, - .INTR => continue, + .INTR => {}, .ACCES => return error.AccessDenied, .NOTDIR => return error.NotDir, .IO => return error.FileSystem, @@ -5479,7 +5421,7 @@ fn fchmodat( try sync.cancel_region.await(.nothing); switch (linux.errno(linux.fchmodat2(dir, path, mode, flags))) { .SUCCESS => return, - .INTR => continue, + .INTR => {}, .BADF => |err| return errnoBug(err), .FAULT => |err| return errnoBug(err), .INVAL => |err| return errnoBug(err), @@ -5511,7 +5453,7 @@ fn fchownat( try sync.cancel_region.await(.nothing); switch (linux.errno(linux.fchownat(dir, path, owner, group, flags))) { .SUCCESS => return, - .INTR => continue, + .INTR => {}, .BADF => |err| return errnoBug(err), // likely fd refers to directory opened without `Dir.OpenOptions.iterate` .FAULT => |err| return errnoBug(err), .INVAL => |err| return errnoBug(err), @@ -5543,7 +5485,7 @@ fn flock( .exclusive => LOCK.EX, })))) { .SUCCESS => return, - .INTR => continue, + .INTR => {}, .BADF => |err| return errnoBug(err), .INVAL => |err| return errnoBug(err), // invalid parameters .NOLCK => return error.SystemResources, @@ -5593,7 +5535,7 @@ fn getsockname( try sync.cancel_region.await(.nothing); switch (linux.errno(linux.getsockname(socket_fd, addr, addr_len))) { .SUCCESS => return, - .INTR => continue, + .INTR => {}, .BADF => |err| return errnoBug(err), // File descriptor used after closed. .FAULT => |err| return errnoBug(err), .INVAL => |err| return errnoBug(err), // invalid parameters @@ -5634,7 +5576,7 @@ fn linkat( ev.yield(null, .nothing); switch (cancel_region.errno()) { .SUCCESS => return, - .INTR, .CANCELED => continue, + .INTR, .CANCELED => {}, .ACCES => return error.AccessDenied, .DQUOT => return error.DiskQuota, .EXIST => return error.PathAlreadyExists, @@ -5674,7 +5616,7 @@ fn lseek( 8 => linux.lseek(fd, @bitCast(offset), whence), })) { .SUCCESS => return, - .INTR => continue, + .INTR => {}, .BADF => |err| return errnoBug(err), // File descriptor used after closed. .INVAL => return error.Unseekable, .OVERFLOW => return error.Unseekable, @@ -5717,7 +5659,7 @@ fn openat( const completion = cancel_region.completion(); switch (completion.errno()) { .SUCCESS => return completion.result, - .INTR, .CANCELED => continue, + .INTR, .CANCELED => {}, .FAULT => |err| return errnoBug(err), .INVAL => return error.BadPathName, .BADF => |err| return errnoBug(err), // File descriptor used after closed. @@ -5779,7 +5721,7 @@ fn preadv( const completion = cancel_region.completion(); switch (completion.errno()) { .SUCCESS => return @as(u32, @bitCast(completion.result)), - .INTR, .CANCELED => continue, + .INTR, .CANCELED => {}, .INVAL => |err| return errnoBug(err), .FAULT => |err| return errnoBug(err), .AGAIN => return error.WouldBlock, @@ -5826,7 +5768,7 @@ fn pwritev( const completion = cancel_region.completion(); switch (completion.errno()) { .SUCCESS => return @as(u32, @bitCast(completion.result)), - .INTR, .CANCELED => continue, + .INTR, .CANCELED => {}, .INVAL => |err| return errnoBug(err), .FAULT => |err| return errnoBug(err), .AGAIN => return error.WouldBlock, @@ -5876,7 +5818,7 @@ fn realPath( const rc = linux.readlink(proc_path, out_buffer.ptr, out_buffer.len); switch (linux.errno(rc)) { .SUCCESS => return rc, - .INTR => continue, + .INTR => {}, .ACCES => return error.AccessDenied, .FAULT => |err| return errnoBug(err), .IO => return error.FileSystem, @@ -5921,7 +5863,7 @@ fn renameat( ev.yield(null, .nothing); switch (cancel_region.errno()) { .SUCCESS => return, - .INTR, .CANCELED => continue, + .INTR, .CANCELED => {}, .ACCES => return error.AccessDenied, .PERM => return error.PermissionDenied, .BUSY => return error.FileBusy, @@ -5988,7 +5930,7 @@ fn setsockopt( ev.yield(null, .nothing); switch (cancel_region.errno()) { .SUCCESS => return, - .INTR, .CANCELED => continue, + .INTR, .CANCELED => {}, .BADF => |err| return errnoBug(err), // File descriptor used after closed. .NOTSOCK => |err| return errnoBug(err), .INVAL => |err| return errnoBug(err), @@ -6039,7 +5981,7 @@ fn socket( const completion = cancel_region.completion(); switch (completion.errno()) { .SUCCESS => break completion.result, - .INTR, .CANCELED => continue, + .INTR, .CANCELED => {}, .AFNOSUPPORT => return error.AddressFamilyUnsupported, .INVAL => return error.ProtocolUnsupportedBySystem, .MFILE => return error.ProcessFdQuotaExceeded, @@ -6051,7 +5993,7 @@ fn socket( else => |err| return unexpectedErrno(err), } }; - errdefer ev.close(socket_fd); + errdefer ev.closeAsync(socket_fd); if (options.ip6_only) { if (linux.IPV6 == void) return error.OptionUnsupported; @@ -6101,7 +6043,7 @@ fn statx( ev.yield(null, .nothing); switch (cancel_region.errno()) { .SUCCESS => return statFromLinux(&statx_buf), - .INTR, .CANCELED => continue, + .INTR, .CANCELED => {}, .ACCES => return error.AccessDenied, .BADF => |err| return errnoBug(err), // File descriptor used after closed. .FAULT => |err| return errnoBug(err), @@ -6140,7 +6082,7 @@ fn utimensat( try sync.cancel_region.await(.nothing); switch (linux.errno(linux.utimensat(dir, path, times, flags))) { .SUCCESS => return, - .INTR => continue, + .INTR => {}, .BADF => |err| return errnoBug(err), // always a race condition .FAULT => |err| return errnoBug(err), .INVAL => |err| return errnoBug(err), @@ -6152,19 +6094,33 @@ fn utimensat( } } -fn writeAll( - ev: *Evented, - cancel_region: *CancelRegion, - fd: fd_t, - buffer: []const u8, -) (File.Writer.Error || error{EndOfStream})!void { +fn writeAllSync(sync: *CancelRegion.Sync, fd: fd_t, buffer: []const u8) File.Writer.Error!void { var index: usize = 0; - while (buffer.len - index != 0) { - const len = try ev.pwritev(cancel_region, fd, &.{ - .{ .base = buffer[index..].ptr, .len = buffer.len - index }, - }, null); - if (len == 0) return error.EndOfStream; - index += len; + while (buffer.len - index != 0) index += try writeSync(sync, fd, buffer[index..]); +} + +fn writeSync(sync: *CancelRegion.Sync, fd: fd_t, buffer: []const u8) File.Writer.Error!usize { + while (true) { + try sync.cancel_region.await(.nothing); + const rc = linux.write(fd, buffer.ptr, buffer.len); + switch (linux.errno(rc)) { + .SUCCESS => return @intCast(rc), + .INTR => {}, + .INVAL => |err| return errnoBug(err), + .FAULT => |err| return errnoBug(err), + .AGAIN => return error.WouldBlock, + .BADF => return error.NotOpenForWriting, // Can be a race condition. + .DESTADDRREQ => |err| return errnoBug(err), // `connect` was never called. + .DQUOT => return error.DiskQuota, + .FBIG => return error.FileTooBig, + .IO => return error.InputOutput, + .NOSPC => return error.NoSpaceLeft, + .PERM => return error.PermissionDenied, + .PIPE => return error.BrokenPipe, + .CONNRESET => |err| return errnoBug(err), // Not a socket handle. + .BUSY => return error.DeviceBusy, + else => |err| return unexpectedErrno(err), + } } } diff --git a/lib/std/Io/test.zig b/lib/std/Io/test.zig @@ -282,40 +282,6 @@ test "Group.concurrent" { try testing.expectEqualSlices(usize, &.{ 45, 245 }, &results); } -test "select" { - const io = testing.io; - - var queue: Io.Queue(u8) = .init(&.{}); - - var get_a = io.concurrent(Io.Queue(u8).getOne, .{ &queue, io }) catch |err| switch (err) { - error.ConcurrencyUnavailable => { - try testing.expect(builtin.single_threaded); - return; - }, - }; - defer _ = get_a.cancel(io) catch {}; - - var get_b = try io.concurrent(Io.Queue(u8).getOne, .{ &queue, io }); - defer _ = get_b.cancel(io) catch {}; - - var timeout = io.async(Io.sleep, .{ io, .fromMilliseconds(1), .awake }); - defer timeout.cancel(io) catch {}; - - switch (try io.select(.{ - .get_a = &get_a, - .get_b = &get_b, - .timeout = &timeout, - })) { - .get_a => return error.TestFailure, - .get_b => return error.TestFailure, - .timeout => { - queue.close(io); - try testing.expectError(error.Closed, get_a.await(io)); - try testing.expectError(error.Closed, get_b.await(io)); - }, - } -} - fn testQueue(comptime len: usize) !void { const io = testing.io; var buf: [len]usize = undefined; diff --git a/lib/std/os/linux/IoUring.zig b/lib/std/os/linux/IoUring.zig @@ -201,6 +201,10 @@ pub fn enter(self: *IoUring, to_submit: u32, min_complete: u32, flags: u32) !u32 // The kernel believes our `self.fd` does not refer to an io_uring instance, // or the opcode is valid but not supported by this kernel (more likely): .OPNOTSUPP => return error.OpcodeNotSupported, + // The thread submitting the work is invalid. This may occur if IORING_ENTER_GETEVENTS + // and IORING_SETUP_DEFER_TASKRUN is set, but the submitting thread is not the thread + // that initially created or enabled the io_uring associated with fd. + .EXIST => return error.InvalidThread, // The operation was interrupted by a delivery of a signal before it could complete. // This can happen while waiting for events with IORING_ENTER_GETEVENTS: .INTR => return error.SignalInterrupt,