commit 5763f7dbcc65288d7d13236f82eec9a23d97bee7 (tree)
parent d9fc7fa04db797d7b27dab2d9d6f56f63848da76
Author: Jacob Young <jacobly0@users.noreply.github.com>
Date: Sun, 15 Feb 2026 05:51:42 -0500
std.Io.Evented: clean up supporting code for `select`
Diffstat:
3 files changed, 207 insertions(+), 210 deletions(-)
diff --git a/lib/std/Io/Dispatch.zig b/lib/std/Io/Dispatch.zig
@@ -100,15 +100,11 @@ const Fiber = struct {
required_align: void align(4),
evented: *Evented,
context: Io.fiber.Context,
- await_count: i32,
link: union {
awaiter: ?*Fiber,
group: struct { prev: ?*Fiber, next: ?*Fiber },
},
- status: union(enum) {
- queue_next: ?*Fiber,
- awaiting_group: Group,
- },
+ awaiting_group: Group,
cancel_status: CancelStatus,
cancel_protection: CancelProtection,
@@ -215,7 +211,6 @@ const Fiber = struct {
}
fn destroy(fiber: *Fiber, ev: *Evented) void {
- assert(fiber.status.queue_next == null);
ev.allocator().free(fiber.allocatedSlice());
}
@@ -271,8 +266,8 @@ const Fiber = struct {
.group => {
// The awaiter received a cancelation request while awaiting a group,
// so propagate the cancelation to the group.
- if (fiber.status.awaiting_group.cancel(ev, null)) {
- fiber.status = .{ .queue_next = null };
+ if (fiber.awaiting_group.cancel(ev, null)) {
+ fiber.awaiting_group = undefined;
ev.queue.async(fiber, &Fiber.@"resume");
}
},
@@ -516,9 +511,8 @@ pub fn init(ev: *Evented, backing_allocator: Allocator, options: InitOptions) !v
.required_align = {},
.evented = ev,
.context = undefined,
- .await_count = 0,
.link = .{ .awaiter = null },
- .status = .{ .queue_next = null },
+ .awaiting_group = undefined,
.cancel_status = .unrequested,
.cancel_protection = .unblocked,
},
@@ -636,7 +630,7 @@ const SwitchMessage = struct {
const PendingTask = union(enum) {
nothing,
- await: u31,
+ await: *Fiber,
activate: c.dispatch.object_t,
@"resume": c.dispatch.object_t,
group_await: Group,
@@ -655,10 +649,10 @@ const SwitchMessage = struct {
thread.current_context = message.contexts.new;
switch (message.pending_task) {
.nothing => {},
- .await => |count| {
- const fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.old));
- if (@atomicRmw(i32, &fiber.await_count, .Sub, count, .monotonic) > 0)
- ev.queue.async(fiber, &Fiber.@"resume");
+ .await => |awaiting| {
+ const awaiter: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.old));
+ if (@atomicRmw(?*Fiber, &awaiting.link.awaiter, .Xchg, awaiter, .acq_rel) ==
+ Fiber.finished) ev.queue.async(awaiter, &Fiber.@"resume");
},
.activate => |object| object.activate(),
.@"resume" => |object| object.@"resume"(),
@@ -992,7 +986,7 @@ fn crashHandler(userdata: ?*anyopaque) void {
}
const AsyncClosure = struct {
- ev: *Evented,
+ evented: *Evented,
fiber: *Fiber,
start: *const fn (context: *const anyopaque, result: *anyopaque) void,
result_align: Alignment,
@@ -1029,13 +1023,13 @@ const AsyncClosure = struct {
closure: *AsyncClosure,
message: *const SwitchMessage,
) callconv(.withStackAlign(.c, @alignOf(AsyncClosure))) noreturn {
- message.handle(closure.ev);
+ const ev = closure.evented;
const fiber = closure.fiber;
+ message.handle(ev);
closure.start(closure.contextPointer(), fiber.resultBytes(closure.result_align));
if (@atomicRmw(?*Fiber, &fiber.link.awaiter, .Xchg, Fiber.finished, .acq_rel)) |awaiter|
- if (@atomicRmw(i32, &awaiter.await_count, .Add, 1, .monotonic) == -1)
- closure.ev.queue.async(awaiter, &Fiber.@"resume");
- closure.ev.yield(.nothing);
+ ev.queue.async(awaiter, &Fiber.@"resume");
+ ev.yield(.nothing);
unreachable; // switched to dead fiber
}
};
@@ -1090,14 +1084,13 @@ fn concurrent(
},
else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)),
},
- .await_count = 0,
.link = .{ .awaiter = null },
- .status = .{ .queue_next = null },
+ .awaiting_group = undefined,
.cancel_status = .unrequested,
.cancel_protection = .unblocked,
};
closure.* = .{
- .ev = ev,
+ .evented = ev,
.fiber = fiber,
.start = start,
.result_align = result_alignment,
@@ -1115,18 +1108,11 @@ fn await(
result_alignment: Alignment,
) void {
const ev: *Evented = @ptrCast(@alignCast(userdata));
- const fiber = Thread.current().currentFiber();
- const future_fiber: *Fiber = @ptrCast(@alignCast(future));
- if (@atomicRmw(?*Fiber, &future_fiber.link.awaiter, .Xchg, fiber, .acq_rel)) |awaiter| {
- assert(awaiter == Fiber.finished);
- } else while (true) {
- ev.yield(.{ .await = 1 });
- const awaiter = @atomicLoad(?*Fiber, &future_fiber.link.awaiter, .acquire);
- if (awaiter == Fiber.finished) break;
- assert(awaiter == fiber); // spurious wakeup
- }
- @memcpy(result, future_fiber.resultBytes(result_alignment));
- future_fiber.destroy(ev);
+ const awaiting: *Fiber = @ptrCast(@alignCast(future));
+ if (@atomicLoad(?*Fiber, &awaiting.link.awaiter, .acquire) != Fiber.finished)
+ ev.yield(.{ .await = awaiting });
+ @memcpy(result, awaiting.resultBytes(result_alignment));
+ awaiting.destroy(ev);
}
fn cancel(
@@ -1261,8 +1247,8 @@ const Group = struct {
.awaiter_delayed = false,
.fibers = .null,
}, .release);
- assert(awaiter.status.awaiting_group.ptr == group.ptr);
- awaiter.status = .{ .queue_next = null };
+ assert(awaiter.awaiting_group.ptr == group.ptr);
+ awaiter.awaiting_group = undefined;
return awaiter;
}
// Race with `Fiber.requestCancel`
@@ -1330,8 +1316,7 @@ const Group = struct {
/// Assumes the mutex is held.
fn registerAwaiter(group: Group, awaiter: *Fiber) bool {
- assert(awaiter.status.queue_next == null);
- awaiter.status = .{ .awaiting_group = group };
+ awaiter.awaiting_group = group;
assert(@atomicRmw(
Awaiter,
group.awaiterPtr(),
@@ -1343,7 +1328,7 @@ const Group = struct {
}
const AsyncClosure = struct {
- ev: *Evented,
+ evented: *Evented,
group: Group,
fiber: *Fiber,
start: *const fn (context: *const anyopaque) Io.Cancelable!void,
@@ -1382,19 +1367,15 @@ const Group = struct {
closure: *Group.AsyncClosure,
message: *const SwitchMessage,
) callconv(.withStackAlign(.c, @alignOf(Group.AsyncClosure))) noreturn {
- message.handle(closure.ev);
- assert(closure.fiber.status.queue_next == null);
- const result = closure.start(closure.contextPointer());
- const ev = closure.ev;
- const group = closure.group;
+ const ev = closure.evented;
const fiber = closure.fiber;
- const cancel_acknowledged = fiber.cancel_protection.acknowledged;
- if (result) {
- assert(!cancel_acknowledged); // group task acknowledged cancelation but did not return `error.Canceled`
+ message.handle(ev);
+ if (closure.start(closure.contextPointer())) {
+ assert(!fiber.cancel_protection.acknowledged); // group task acknowledged cancelation but did not return `error.Canceled`
} else |err| switch (err) {
- error.Canceled => assert(cancel_acknowledged), // group task returned `error.Canceled` but was never canceled
+ error.Canceled => assert(fiber.cancel_protection.acknowledged), // group task returned `error.Canceled` but was never canceled
}
- if (group.removeFiber(ev, fiber)) |awaiter| ev.queue.async(awaiter, &Fiber.@"resume");
+ if (closure.group.removeFiber(ev, fiber)) |awaiter| ev.queue.async(awaiter, &Fiber.@"resume");
ev.yield(.destroy);
unreachable; // switched to dead fiber
}
@@ -1464,14 +1445,13 @@ fn groupConcurrent(
},
else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)),
},
- .await_count = 0,
.link = .{ .group = .{ .prev = null, .next = null } },
- .status = .{ .queue_next = null },
+ .awaiting_group = undefined,
.cancel_status = .unrequested,
.cancel_protection = .unblocked,
};
closure.* = .{
- .ev = ev,
+ .evented = ev,
.group = group,
.fiber = fiber,
.start = start,
diff --git a/lib/std/Io/Uring.zig b/lib/std/Io/Uring.zig
@@ -150,7 +150,6 @@ const Thread = struct {
const Fiber = struct {
required_align: void align(4),
context: Io.fiber.Context,
- await_count: i32,
link: union {
awaiter: ?*Fiber,
group: struct { prev: ?*Fiber, next: ?*Fiber },
@@ -856,7 +855,6 @@ pub fn init(ev: *Evented, backing_allocator: Allocator, options: InitOptions) !v
main_fiber.* = .{
.required_align = {},
.context = undefined,
- .await_count = 0,
.link = .{ .awaiter = null },
.status = .{ .queue_next = null },
.cancel_status = .unrequested,
@@ -1260,7 +1258,7 @@ const SwitchMessage = struct {
const PendingTask = union(enum) {
nothing,
reschedule,
- await: u31,
+ await: *Fiber,
group_await: Group,
group_cancel: Group,
batch_await: *Io.Batch,
@@ -1284,10 +1282,11 @@ const SwitchMessage = struct {
assert(fiber.status.queue_next == null);
_ = ev.schedule(thread, .{ .head = fiber, .tail = fiber });
},
- .await => |count| {
- const fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.old));
- if (@atomicRmw(i32, &fiber.await_count, .Sub, count, .monotonic) > 0)
- _ = ev.schedule(thread, .{ .head = fiber, .tail = fiber });
+ .await => |awaiting| {
+ const awaiter: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.old));
+ assert(awaiter.status.queue_next == null);
+ if (@atomicRmw(?*Fiber, &awaiting.link.awaiter, .Xchg, awaiter, .acq_rel) ==
+ Fiber.finished) _ = ev.schedule(thread, .{ .head = awaiter, .tail = awaiter });
},
.group_await => |group| {
const fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.old));
@@ -1361,7 +1360,7 @@ fn crashHandler(userdata: ?*anyopaque) void {
}
const AsyncClosure = struct {
- ev: *Evented,
+ evented: *Evented,
fiber: *Fiber,
start: *const fn (context: *const anyopaque, result: *anyopaque) void,
result_align: Alignment,
@@ -1398,16 +1397,11 @@ const AsyncClosure = struct {
closure: *AsyncClosure,
message: *const SwitchMessage,
) callconv(.withStackAlign(.c, @alignOf(AsyncClosure))) noreturn {
- message.handle(closure.ev);
+ const ev = closure.evented;
const fiber = closure.fiber;
+ message.handle(ev);
closure.start(closure.contextPointer(), fiber.resultBytes(closure.result_align));
- closure.ev.yield(
- if (@atomicRmw(?*Fiber, &fiber.link.awaiter, .Xchg, Fiber.finished, .acq_rel)) |awaiter|
- if (@atomicRmw(i32, &awaiter.await_count, .Add, 1, .monotonic) == -1) awaiter else null
- else
- null,
- .nothing,
- );
+ ev.yield(@atomicRmw(?*Fiber, &fiber.link.awaiter, .Xchg, Fiber.finished, .acq_rel), .nothing);
unreachable; // switched to dead fiber
}
};
@@ -1461,7 +1455,6 @@ fn concurrent(
},
else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)),
},
- .await_count = 0,
.link = .{ .awaiter = null },
.status = .{ .queue_next = null },
.cancel_status = .unrequested,
@@ -1479,7 +1472,7 @@ fn concurrent(
},
};
closure.* = .{
- .ev = ev,
+ .evented = ev,
.fiber = fiber,
.start = start,
.result_align = result_alignment,
@@ -1498,18 +1491,11 @@ fn await(
result_alignment: Alignment,
) void {
const ev: *Evented = @ptrCast(@alignCast(userdata));
- const fiber = Thread.current().currentFiber();
- const future_fiber: *Fiber = @ptrCast(@alignCast(future));
- if (@atomicRmw(?*Fiber, &future_fiber.link.awaiter, .Xchg, fiber, .acq_rel)) |awaiter| {
- assert(awaiter == Fiber.finished);
- } else while (true) {
- ev.yield(null, .{ .await = 1 });
- const awaiter = @atomicLoad(?*Fiber, &future_fiber.link.awaiter, .acquire);
- if (awaiter == Fiber.finished) break;
- assert(awaiter == fiber); // spurious wakeup
- }
- @memcpy(result, future_fiber.resultBytes(result_alignment));
- future_fiber.destroy();
+ const awaiting: *Fiber = @ptrCast(@alignCast(future));
+ if (@atomicLoad(?*Fiber, &awaiting.link.awaiter, .acquire) != Fiber.finished)
+ ev.yield(null, .{ .await = awaiting });
+ @memcpy(result, awaiting.resultBytes(result_alignment));
+ awaiting.destroy();
}
fn cancel(
@@ -1726,7 +1712,7 @@ const Group = struct {
}
const AsyncClosure = struct {
- ev: *Evented,
+ evented: *Evented,
group: Group,
fiber: *Fiber,
start: *const fn (context: *const anyopaque) Io.Cancelable!void,
@@ -1765,19 +1751,16 @@ const Group = struct {
closure: *Group.AsyncClosure,
message: *const SwitchMessage,
) callconv(.withStackAlign(.c, @alignOf(Group.AsyncClosure))) noreturn {
- message.handle(closure.ev);
- assert(closure.fiber.status.queue_next == null);
- const result = closure.start(closure.contextPointer());
- const ev = closure.ev;
- const group = closure.group;
+ const ev = closure.evented;
const fiber = closure.fiber;
- const cancel_acknowledged = fiber.cancel_protection.acknowledged;
- if (result) {
- assert(!cancel_acknowledged); // group task acknowledged cancelation but did not return `error.Canceled`
+ message.handle(ev);
+ assert(fiber.status.queue_next == null);
+ if (closure.start(closure.contextPointer())) {
+ assert(!fiber.cancel_protection.acknowledged); // group task acknowledged cancelation but did not return `error.Canceled`
} else |err| switch (err) {
- error.Canceled => assert(cancel_acknowledged), // group task returned `error.Canceled` but was never canceled
+ error.Canceled => assert(fiber.cancel_protection.acknowledged), // group task returned `error.Canceled` but was never canceled
}
- ev.yield(group.removeFiber(ev, fiber), .destroy);
+ ev.yield(closure.group.removeFiber(ev, fiber), .destroy);
unreachable; // switched to dead fiber
}
};
@@ -1845,7 +1828,6 @@ fn groupConcurrent(
},
else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)),
},
- .await_count = 0,
.link = .{ .group = .{ .prev = null, .next = null } },
.status = .{ .queue_next = null },
.cancel_status = .unrequested,
@@ -1863,7 +1845,7 @@ fn groupConcurrent(
},
};
closure.* = .{
- .ev = ev,
+ .evented = ev,
.group = group,
.fiber = fiber,
.start = start,
@@ -2173,7 +2155,7 @@ fn deviceIoControl(
const rc = linux.ioctl(o.file.handle, @bitCast(o.code), @intFromPtr(o.arg));
switch (linux.errno(rc)) {
.SUCCESS => return @bitCast(@as(u32, @truncate(rc))),
- .INTR => continue,
+ .INTR => {},
else => |err| return -@as(i32, @intFromEnum(err)),
}
}
@@ -2571,7 +2553,7 @@ fn dirCreateDir(
ev.yield(null, .nothing);
switch (cancel_region.errno()) {
.SUCCESS => return,
- .INTR, .CANCELED => continue,
+ .INTR, .CANCELED => {},
.ACCES => return error.AccessDenied,
.BADF => |err| return errnoBug(err), // File descriptor used after closed.
.PERM => return error.PermissionDenied,
@@ -2654,7 +2636,7 @@ fn filePathKind(ev: *Evented, dir: Dir, sub_path: []const u8) !File.Kind {
if (!statx_buf.mask.TYPE) return error.Unexpected;
return statxKind(statx_buf.mode);
},
- .INTR, .CANCELED => continue,
+ .INTR, .CANCELED => {},
.ACCES => |err| return errnoBug(err),
.BADF => |err| return errnoBug(err), // File descriptor used after closed.
.FAULT => |err| return errnoBug(err),
@@ -2767,7 +2749,7 @@ fn dirAccess(
try sync.cancel_region.await(.nothing);
switch (linux.errno(linux.faccessat(dir.handle, sub_path_posix, mode, flags))) {
.SUCCESS => return,
- .INTR => continue,
+ .INTR => {},
.ACCES => return error.AccessDenied,
.PERM => return error.PermissionDenied,
.ROFS => return error.ReadOnlyFileSystem,
@@ -2806,7 +2788,7 @@ fn dirCreateFile(
.EXCL = flags.exclusive,
.CLOEXEC = true,
}, flags.permissions.toMode());
- errdefer ev.close(fd);
+ errdefer ev.closeAsync(fd);
switch (flags.lock) {
.none => {},
@@ -2994,7 +2976,7 @@ fn dirOpenFile(
.CLOEXEC = true,
.PATH = flags.path_only,
}, 0);
- errdefer ev.close(fd);
+ errdefer ev.closeAsync(fd);
if (!flags.allow_directory) {
const is_dir = is_dir: {
@@ -3048,7 +3030,7 @@ fn dirRead(userdata: ?*anyopaque, dr: *Dir.Reader, buffer: []Dir.Entry) Dir.Read
const rc = linux.getdents64(dr.dir.handle, dr.buffer.ptr, dr.buffer.len);
switch (linux.errno(rc)) {
.SUCCESS => break rc,
- .INTR => continue,
+ .INTR => {},
.BADF => |err| return errnoBug(err), // Dir is invalid or was opened without iteration ability.
.FAULT => |err| return errnoBug(err),
.NOTDIR => |err| return errnoBug(err),
@@ -3141,7 +3123,7 @@ fn dirRealPathFile(
error.FileLocksUnsupported => return errnoBug(.OPNOTSUPP), // Not asking for locks.
else => |e| return e,
};
- defer ev.close(fd);
+ defer ev.closeAsync(fd);
return ev.realPath(try maybe_sync.enterSync(ev), fd, out_buffer);
}
@@ -3174,7 +3156,7 @@ fn dirDeleteFile(userdata: ?*anyopaque, dir: Dir, sub_path: []const u8) Dir.Dele
ev.yield(null, .nothing);
switch (cancel_region.errno()) {
.SUCCESS => return,
- .INTR, .CANCELED => continue,
+ .INTR, .CANCELED => {},
.PERM => return error.PermissionDenied,
.ACCES => return error.AccessDenied,
.BUSY => return error.FileBusy,
@@ -3226,7 +3208,7 @@ fn dirDeleteDir(userdata: ?*anyopaque, dir: Dir, sub_path: []const u8) Dir.Delet
ev.yield(null, .nothing);
switch (cancel_region.errno()) {
.SUCCESS => return,
- .INTR, .CANCELED => continue,
+ .INTR, .CANCELED => {},
.ACCES => return error.AccessDenied,
.PERM => return error.PermissionDenied,
.BUSY => return error.FileBusy,
@@ -3342,7 +3324,7 @@ fn dirSymLink(
ev.yield(null, .nothing);
switch (cancel_region.errno()) {
.SUCCESS => return,
- .INTR, .CANCELED => continue,
+ .INTR, .CANCELED => {},
.FAULT => |err| return errnoBug(err),
.INVAL => |err| return errnoBug(err),
.ACCES => return error.AccessDenied,
@@ -3381,7 +3363,7 @@ fn dirReadLink(
const rc = linux.readlinkat(dir.handle, sub_path_posix, buffer.ptr, buffer.len);
switch (linux.errno(rc)) {
.SUCCESS => return @bitCast(rc),
- .INTR => continue,
+ .INTR => {},
.ACCES => return error.AccessDenied,
.FAULT => |err| return errnoBug(err),
.INVAL => return error.NotLink,
@@ -3571,7 +3553,7 @@ fn fileLength(userdata: ?*anyopaque, file: File) File.LengthError!u64 {
if (!statx_buf.mask.SIZE) return error.Unexpected;
return statx_buf.size;
},
- .INTR, .CANCELED => continue,
+ .INTR, .CANCELED => {},
.ACCES => |err| return errnoBug(err),
.BADF => |err| return errnoBug(err), // File descriptor used after closed.
.FAULT => |err| return errnoBug(err),
@@ -3754,7 +3736,7 @@ fn fileSync(userdata: ?*anyopaque, file: File) File.SyncError!void {
ev.yield(null, .nothing);
switch (cancel_region.errno()) {
.SUCCESS => return,
- .INTR, .CANCELED => continue,
+ .INTR, .CANCELED => {},
.BADF => |err| return errnoBug(err),
.INVAL => |err| return errnoBug(err),
.ROFS => |err| return errnoBug(err),
@@ -3776,7 +3758,7 @@ fn fileIsTty(userdata: ?*anyopaque, file: File) Io.Cancelable!bool {
const rc = linux.ioctl(file.handle, linux.T.IOCGWINSZ, @intFromPtr(&wsz));
switch (linux.errno(rc)) {
.SUCCESS => return true,
- .INTR => continue,
+ .INTR => {},
else => return false,
}
}
@@ -3812,7 +3794,7 @@ fn fileSetLength(userdata: ?*anyopaque, file: File, length: u64) File.SetLengthE
ev.yield(null, .nothing);
switch (cancel_region.errno()) {
.SUCCESS => return,
- .INTR, .CANCELED => continue,
+ .INTR, .CANCELED => {},
.FBIG => return error.FileTooBig,
.IO => return error.InputOutput,
.PERM => return error.PermissionDenied,
@@ -3994,7 +3976,7 @@ fn fileMemoryMapCreate(
const rc = linux.mmap(null, options.len, prot, flags, file.handle, casted_offset);
switch (linux.errno(rc)) {
.SUCCESS => break @as([*]align(page_align) u8, @ptrFromInt(rc))[0..options.len],
- .INTR => continue,
+ .INTR => {},
.ACCES => return error.AccessDenied,
.AGAIN => return error.LockedMemoryLimitExceeded,
.MFILE => return error.ProcessFdQuotaExceeded,
@@ -4054,7 +4036,7 @@ fn fileMemoryMapSetLength(
const rc = linux.mremap(old_memory.ptr, old_memory.len, new_len, flags, addr_hint);
switch (linux.errno(rc)) {
.SUCCESS => break @as([*]align(page_align) u8, @ptrFromInt(rc))[0..new_len],
- .INTR => continue,
+ .INTR => {},
.AGAIN => return error.LockedMemoryLimitExceeded,
.NOMEM => return error.OutOfMemory,
.INVAL => |err| return errnoBug(err),
@@ -4163,7 +4145,7 @@ fn processCurrentPath(userdata: ?*anyopaque, buffer: []u8) process.CurrentPathEr
try sync.cancel_region.await(.nothing);
switch (linux.errno(linux.getcwd(buffer.ptr, buffer.len))) {
.SUCCESS => return std.mem.findScalar(u8, buffer, 0).?,
- .INTR => continue,
+ .INTR => {},
.NOENT => return error.CurrentDirUnlinked,
.RANGE => return error.NameTooLong,
.FAULT => |err| return errnoBug(err),
@@ -4178,7 +4160,7 @@ fn processSetCurrentDir(userdata: ?*anyopaque, dir: Dir) process.SetCurrentDirEr
if (dir.handle == linux.AT.FDCWD) return;
var sync: CancelRegion.Sync = try .init(ev);
defer sync.deinit(ev);
- return ev.fchdir(&sync, dir.handle);
+ return fchdir(&sync, dir.handle);
}
fn processSetCurrentPath(userdata: ?*anyopaque, dir_path: []const u8) ChdirError!void {
@@ -4215,7 +4197,7 @@ fn processReplace(userdata: ?*anyopaque, options: process.ReplaceOptions) proces
var sync: CancelRegion.Sync = try .init(ev);
defer sync.deinit(ev);
- return ev.execv(&sync, options.expand_arg0, argv_buf.ptr[0].?, argv_buf.ptr, env_block, PATH);
+ return execv(&sync, options.expand_arg0, argv_buf.ptr[0].?, argv_buf.ptr, env_block, PATH);
}
fn processReplacePath(
@@ -4235,7 +4217,7 @@ fn processSpawn(userdata: ?*anyopaque, options: process.SpawnOptions) process.Sp
const spawned = try ev.spawn(options);
var cancel_region: CancelRegion = .initBlocked();
defer cancel_region.deinit();
- defer ev.close(spawned.err_fd);
+ defer ev.closeAsync(spawned.err_fd);
// Wait for the child to report any errors in or before `execvpe`.
var child_err: ForkBailError = undefined;
@@ -4377,8 +4359,9 @@ fn spawn(ev: *Evented, options: process.SpawnOptions) process.SpawnError!Spawned
if (pid_result == 0) {
defer comptime unreachable; // We are the child.
+ // Note that the parent uring is no longer accessible, so we must no longer reference `ev`.
var sync: CancelRegion.Sync = .{ .cancel_region = .initBlocked() };
- const err = ev.setUpChild(&sync, .{
+ const err = setUpChild(&sync, .{
.stdin_pipe = stdin_pipe[0],
.stdout_pipe = stdout_pipe[1],
.stderr_pipe = stderr_pipe[1],
@@ -4389,7 +4372,7 @@ fn spawn(ev: *Evented, options: process.SpawnOptions) process.SpawnError!Spawned
.PATH = PATH,
.spawn = options,
});
- ev.writeAll(&sync.cancel_region, err_pipe[1], @ptrCast(&err)) catch {};
+ writeAllSync(&sync, err_pipe[1], @ptrCast(&err)) catch {};
const exit = if (builtin.single_threaded) linux.exit else linux.exit_group;
exit(1);
}
@@ -4397,13 +4380,13 @@ fn spawn(ev: *Evented, options: process.SpawnOptions) process.SpawnError!Spawned
const pid: pid_t = @intCast(pid_result); // We are the parent.
errdefer comptime unreachable; // The child is forked; we must not error from now on
- ev.close(err_pipe[1]); // make sure only the child holds the write end open
+ ev.closeAsync(err_pipe[1]); // make sure only the child holds the write end open
- if (options.stdin == .pipe) ev.close(stdin_pipe[0]);
- if (options.stdout == .pipe) ev.close(stdout_pipe[1]);
- if (options.stderr == .pipe) ev.close(stderr_pipe[1]);
+ if (options.stdin == .pipe) ev.closeAsync(stdin_pipe[0]);
+ if (options.stdout == .pipe) ev.closeAsync(stdout_pipe[1]);
+ if (options.stderr == .pipe) ev.closeAsync(stderr_pipe[1]);
- if (prog_pipe[1] != -1) ev.close(prog_pipe[1]);
+ if (prog_pipe[1] != -1) ev.closeAsync(prog_pipe[1]);
options.progress_node.setIpcFile(ev, .{ .handle = prog_pipe[0], .flags = .{ .nonblocking = true } });
@@ -4440,14 +4423,14 @@ pub fn pipe2(flags: linux.O) PipeError![2]fd_t {
}
}
fn destroyPipe(ev: *Evented, pipe: [2]fd_t) void {
- if (pipe[0] != -1) ev.close(pipe[0]);
- if (pipe[0] != pipe[1]) ev.close(pipe[1]);
+ if (pipe[0] != -1) ev.closeAsync(pipe[0]);
+ if (pipe[0] != pipe[1]) ev.closeAsync(pipe[1]);
}
/// Errors that can occur between fork() and execv()
const ForkBailError = process.SetCurrentDirError || ChdirError ||
process.SpawnError || process.ReplaceError;
-fn setUpChild(ev: *Evented, sync: *CancelRegion.Sync, options: struct {
+fn setUpChild(sync: *CancelRegion.Sync, options: struct {
stdin_pipe: fd_t,
stdout_pipe: fd_t,
stderr_pipe: fd_t,
@@ -4458,21 +4441,21 @@ fn setUpChild(ev: *Evented, sync: *CancelRegion.Sync, options: struct {
PATH: []const u8,
spawn: process.SpawnOptions,
}) ForkBailError {
- try ev.setUpChildIo(
+ try setUpChildIo(
sync,
options.spawn.stdin,
options.stdin_pipe,
linux.STDIN_FILENO,
options.dev_null_fd,
);
- try ev.setUpChildIo(
+ try setUpChildIo(
sync,
options.spawn.stdout,
options.stdout_pipe,
linux.STDOUT_FILENO,
options.dev_null_fd,
);
- try ev.setUpChildIo(
+ try setUpChildIo(
sync,
options.spawn.stderr,
options.stderr_pipe,
@@ -4482,17 +4465,17 @@ fn setUpChild(ev: *Evented, sync: *CancelRegion.Sync, options: struct {
switch (options.spawn.cwd) {
.inherit => {},
- .dir => |cwd_dir| try ev.fchdir(sync, cwd_dir.handle),
+ .dir => |cwd_dir| try fchdir(sync, cwd_dir.handle),
.path => |cwd_path| {
var cwd_path_buffer: [PATH_MAX]u8 = undefined;
const cwd_path_posix = try pathToPosix(cwd_path, &cwd_path_buffer);
- try ev.chdir(sync, cwd_path_posix);
+ try chdir(sync, cwd_path_posix);
},
}
// Must happen after fchdir above, the cwd file descriptor might be
// equal to prog_fileno and be clobbered by this dup2 call.
- if (options.prog_pipe != -1) try ev.dup2(sync, options.prog_pipe, prog_fileno);
+ if (options.prog_pipe != -1) try dup2(sync, options.prog_pipe, prog_fileno);
if (options.spawn.gid) |gid| {
switch (linux.errno(linux.setregid(gid, gid))) {
@@ -4532,7 +4515,7 @@ fn setUpChild(ev: *Evented, sync: *CancelRegion.Sync, options: struct {
}
}
- return ev.execv(
+ return execv(
sync,
options.spawn.expand_arg0,
options.argv_buf.ptr[0].?,
@@ -4543,7 +4526,6 @@ fn setUpChild(ev: *Evented, sync: *CancelRegion.Sync, options: struct {
}
fn setUpChildIo(
- ev: *Evented,
sync: *CancelRegion.Sync,
stdio: process.SpawnOptions.StdIo,
pipe_fd: fd_t,
@@ -4551,13 +4533,13 @@ fn setUpChildIo(
dev_null_fd: fd_t,
) !void {
switch (stdio) {
- .pipe => try ev.dup2(sync, pipe_fd, std_fileno),
+ .pipe => try dup2(sync, pipe_fd, std_fileno),
.close => _ = linux.close(std_fileno),
.inherit => {},
- .ignore => try ev.dup2(sync, dev_null_fd, std_fileno),
+ .ignore => try dup2(sync, dev_null_fd, std_fileno),
.file => |file| {
if (file.flags.nonblocking) @panic("TODO implement setUpChildIo when nonblocking file is used");
- try ev.dup2(sync, file.handle, std_fileno);
+ try dup2(sync, file.handle, std_fileno);
},
}
}
@@ -4566,13 +4548,12 @@ pub const DupError = error{
ProcessFdQuotaExceeded,
SystemResources,
} || Io.UnexpectedError || Io.Cancelable;
-pub fn dup2(ev: *Evented, sync: *CancelRegion.Sync, old_fd: fd_t, new_fd: fd_t) DupError!void {
- _ = ev;
+pub fn dup2(sync: *CancelRegion.Sync, old_fd: fd_t, new_fd: fd_t) DupError!void {
while (true) {
try sync.cancel_region.await(.nothing);
switch (linux.errno(linux.dup2(old_fd, new_fd))) {
.SUCCESS => {},
- .BUSY, .INTR => continue,
+ .BUSY, .INTR => {},
.INVAL => |err| return errnoBug(err), // invalid parameters
.BADF => |err| return errnoBug(err), // use after free
.MFILE => return error.ProcessFdQuotaExceeded,
@@ -4583,7 +4564,6 @@ pub fn dup2(ev: *Evented, sync: *CancelRegion.Sync, old_fd: fd_t, new_fd: fd_t)
}
fn execv(
- ev: *Evented,
sync: *CancelRegion.Sync,
arg0_expand: process.ArgExpansion,
file: [*:0]const u8,
@@ -4592,7 +4572,8 @@ fn execv(
PATH: []const u8,
) process.ReplaceError {
const file_slice = std.mem.sliceTo(file, 0);
- if (std.mem.findScalar(u8, file_slice, '/') != null) return ev.execvPath(sync, file, child_argv, env_block);
+ if (std.mem.findScalar(u8, file_slice, '/') != null)
+ return execvPath(sync, file, child_argv, env_block);
// Use of PATH_MAX here is valid as the path_buf will be passed
// directly to the operating system in posixExecvPath.
@@ -4620,7 +4601,7 @@ fn execv(
.expand => child_argv[0] = full_path,
.no_expand => {},
}
- err = ev.execvPath(sync, full_path, child_argv, env_block);
+ err = execvPath(sync, full_path, child_argv, env_block);
switch (err) {
error.AccessDenied => seen_eacces = true,
error.FileNotFound, error.NotDir => {},
@@ -4632,13 +4613,11 @@ fn execv(
}
/// This function ignores PATH environment variable.
pub fn execvPath(
- ev: *Evented,
sync: *CancelRegion.Sync,
path: [*:0]const u8,
child_argv: [*:null]const ?[*:0]const u8,
env_block: process.Environ.PosixBlock,
) process.ReplaceError {
- _ = ev;
try sync.cancel_region.await(.nothing);
switch (linux.errno(linux.execve(path, child_argv, env_block.slice.ptr))) {
.FAULT => |err| return errnoBug(err), // Bad pointer parameter.
@@ -4709,7 +4688,7 @@ fn childWait(userdata: ?*anyopaque, child: *process.Child) process.Child.WaitErr
child.resource_usage_statistics.rusage = rusage;
break;
},
- .INTR, .CANCELED => continue,
+ .INTR, .CANCELED => {},
.CHILD => |err| return errnoBug(err), // Double-free.
else => |err| return unexpectedErrno(err),
}
@@ -4724,7 +4703,7 @@ fn childWait(userdata: ?*anyopaque, child: *process.Child) process.Child.WaitErr
_, .CONTINUED => .{ .unknown = status },
};
},
- .INTR, .CANCELED => continue,
+ .INTR, .CANCELED => {},
.CHILD => |err| return errnoBug(err), // Double-free.
else => |err| return unexpectedErrno(err),
}
@@ -4741,7 +4720,7 @@ fn childKill(userdata: ?*anyopaque, child: *process.Child) void {
const pid = child.id.?;
while (true) switch (linux.errno(linux.kill(pid, .TERM))) {
.SUCCESS => break,
- .INTR => continue,
+ .INTR => {},
.PERM => return,
.INVAL => |err| return errnoBug(err) catch {},
.SRCH => |err| return errnoBug(err) catch {},
@@ -4773,7 +4752,7 @@ fn childKill(userdata: ?*anyopaque, child: *process.Child) void {
ev.yield(null, .nothing);
switch (maybe_sync.cancel_region.errno()) {
.SUCCESS => return,
- .INTR, .CANCELED => continue,
+ .INTR, .CANCELED => {},
.CHILD => |err| return errnoBug(err) catch {}, // Double-free.
else => |err| return unexpectedErrno(err) catch {},
}
@@ -4782,15 +4761,15 @@ fn childKill(userdata: ?*anyopaque, child: *process.Child) void {
fn childCleanup(ev: *Evented, child: *process.Child) void {
if (child.stdin) |*stdin| {
- ev.close(stdin.handle);
+ ev.closeAsync(stdin.handle);
child.stdin = null;
}
if (child.stdout) |*stdout| {
- ev.close(stdout.handle);
+ ev.closeAsync(stdout.handle);
child.stdout = null;
}
if (child.stderr) |*stderr| {
- ev.close(stderr.handle);
+ ev.closeAsync(stderr.handle);
child.stderr = null;
}
child.id = null;
@@ -4896,14 +4875,10 @@ fn sleep(userdata: ?*anyopaque, timeout: Io.Timeout) Io.Cancelable!void {
.resv = 0,
};
ev.yield(null, .nothing);
- switch (cancel_region.errno()) {
- // Handles SUCCESS as well as clock not available and unexpected
- // errors. The user had a chance to check clock resolution before
- // getting here, which would have reported 0, making this a legal
- // amount of time to sleep.
- else => return,
- .INTR, .CANCELED => return error.Canceled,
- }
+ // Handles SUCCESS as well as clock not available and unexpected
+ // errors. The user had a chance to check clock resolution before
+ // getting here, which would have reported 0, making this a legal
+ // amount of time to sleep.
}
fn random(userdata: ?*anyopaque, buffer: []u8) void {
@@ -4980,7 +4955,7 @@ fn netBindIp(
var maybe_sync: CancelRegion.Sync.Maybe = .{ .cancel_region = .init() };
defer maybe_sync.deinit(ev);
const socket_fd = try ev.socket(&maybe_sync.cancel_region, family, options);
- errdefer ev.close(socket_fd);
+ errdefer ev.closeAsync(socket_fd);
var storage: PosixAddress = undefined;
var addr_len = addressToPosix(address, &storage);
try ev.bind(&maybe_sync.cancel_region, socket_fd, &storage.any, addr_len);
@@ -5147,23 +5122,19 @@ fn netReceive(
.data = data,
.control = if (msg.control) |ptr| @as([*]u8, @ptrCast(ptr))[0..msg.controllen] else message.control,
.flags = .{
- .eor = (msg.flags & linux.MSG.EOR) != 0,
- .trunc = (msg.flags & linux.MSG.TRUNC) != 0,
- .ctrunc = (msg.flags & linux.MSG.CTRUNC) != 0,
- .oob = (msg.flags & linux.MSG.OOB) != 0,
- .errqueue = if (@hasDecl(linux.MSG, "ERRQUEUE")) (msg.flags & linux.MSG.ERRQUEUE) != 0 else false,
+ .eor = msg.flags & linux.MSG.EOR != 0,
+ .trunc = msg.flags & linux.MSG.TRUNC != 0,
+ .ctrunc = msg.flags & linux.MSG.CTRUNC != 0,
+ .oob = msg.flags & linux.MSG.OOB != 0,
+ .errqueue = msg.flags & linux.MSG.ERRQUEUE != 0,
},
};
message_i += 1;
continue;
},
.AGAIN => unreachable,
- .INTR, .CANCELED => {
- if (deadline) |d| {
- if (now(ev, d.clock).nanoseconds >= d.raw.nanoseconds) return .{ error.Timeout, message_i };
- }
- continue;
- },
+ .INTR, .CANCELED => if (deadline) |d| if (now(ev, d.clock).nanoseconds >= d.raw.nanoseconds)
+ return .{ error.Timeout, message_i },
.BADF => |err| return .{ errnoBug(err), message_i },
.NFILE => return .{ error.SystemFdQuotaExceeded, message_i },
@@ -5266,7 +5237,7 @@ fn netShutdown(
ev.yield(null, .nothing);
switch (cancel_region.errno()) {
.SUCCESS => return,
- .INTR, .CANCELED => continue,
+ .INTR, .CANCELED => {},
.BADF, .NOTSOCK, .INVAL => |err| return errnoBug(err),
.NOTCONN => return error.SocketUnconnected,
.NOBUFS => return error.SystemResources,
@@ -5336,7 +5307,7 @@ fn bind(
ev.yield(null, .nothing);
switch (cancel_region.errno()) {
.SUCCESS => return,
- .INTR, .CANCELED => continue,
+ .INTR, .CANCELED => {},
.ADDRINUSE => return error.AddressInUse,
.BADF => |err| return errnoBug(err), // File descriptor used after closed.
.INVAL => |err| return errnoBug(err), // invalid parameters
@@ -5350,13 +5321,12 @@ fn bind(
}
}
-fn chdir(ev: *Evented, sync: *CancelRegion.Sync, path: [*:0]const u8) ChdirError!void {
- _ = ev;
+fn chdir(sync: *CancelRegion.Sync, path: [*:0]const u8) ChdirError!void {
while (true) {
try sync.cancel_region.await(.nothing);
switch (linux.errno(linux.chdir(path))) {
.SUCCESS => return,
- .INTR => continue,
+ .INTR => {},
.ACCES => return error.AccessDenied,
.IO => return error.FileSystem,
.LOOP => return error.SymLinkLoop,
@@ -5372,6 +5342,36 @@ fn chdir(ev: *Evented, sync: *CancelRegion.Sync, path: [*:0]const u8) ChdirError
}
fn close(ev: *Evented, fd: fd_t) void {
+ var cancel_region: CancelRegion = .initBlocked();
+ defer cancel_region.deinit();
+ const thread = cancel_region.awaitIoUring() catch |err| switch (err) {
+ error.Canceled => unreachable, // blocked
+ };
+ thread.enqueue().* = .{
+ .opcode = .CLOSE,
+ .flags = 0,
+ .ioprio = 0,
+ .fd = fd,
+ .off = 0,
+ .addr = 0,
+ .len = 0,
+ .rw_flags = 0,
+ .user_data = @intFromPtr(cancel_region.fiber),
+ .buf_index = 0,
+ .personality = 0,
+ .splice_fd_in = 0,
+ .addr3 = 0,
+ .resv = 0,
+ };
+ ev.yield(null, .nothing);
+ switch (cancel_region.errno()) {
+ .BADF => recoverableOsBugDetected(), // Always a race condition.
+ .INTR => {}, // This is still a success. See https://github.com/ziglang/zig/issues/2425
+ else => {},
+ }
+}
+
+fn closeAsync(ev: *Evented, fd: fd_t) void {
_ = ev;
const thread: *Thread = .current();
thread.enqueue().* = .{
@@ -5392,14 +5392,13 @@ fn close(ev: *Evented, fd: fd_t) void {
};
}
-fn fchdir(ev: *Evented, sync: *CancelRegion.Sync, dir: fd_t) process.SetCurrentDirError!void {
- _ = ev;
+fn fchdir(sync: *CancelRegion.Sync, dir: fd_t) process.SetCurrentDirError!void {
if (dir == linux.AT.FDCWD) return;
while (true) {
try sync.cancel_region.await(.nothing);
switch (linux.errno(linux.fchdir(dir))) {
.SUCCESS => return,
- .INTR => continue,
+ .INTR => {},
.ACCES => return error.AccessDenied,
.NOTDIR => return error.NotDir,
.IO => return error.FileSystem,
@@ -5422,7 +5421,7 @@ fn fchmodat(
try sync.cancel_region.await(.nothing);
switch (linux.errno(linux.fchmodat2(dir, path, mode, flags))) {
.SUCCESS => return,
- .INTR => continue,
+ .INTR => {},
.BADF => |err| return errnoBug(err),
.FAULT => |err| return errnoBug(err),
.INVAL => |err| return errnoBug(err),
@@ -5454,7 +5453,7 @@ fn fchownat(
try sync.cancel_region.await(.nothing);
switch (linux.errno(linux.fchownat(dir, path, owner, group, flags))) {
.SUCCESS => return,
- .INTR => continue,
+ .INTR => {},
.BADF => |err| return errnoBug(err), // likely fd refers to directory opened without `Dir.OpenOptions.iterate`
.FAULT => |err| return errnoBug(err),
.INVAL => |err| return errnoBug(err),
@@ -5486,7 +5485,7 @@ fn flock(
.exclusive => LOCK.EX,
})))) {
.SUCCESS => return,
- .INTR => continue,
+ .INTR => {},
.BADF => |err| return errnoBug(err),
.INVAL => |err| return errnoBug(err), // invalid parameters
.NOLCK => return error.SystemResources,
@@ -5536,7 +5535,7 @@ fn getsockname(
try sync.cancel_region.await(.nothing);
switch (linux.errno(linux.getsockname(socket_fd, addr, addr_len))) {
.SUCCESS => return,
- .INTR => continue,
+ .INTR => {},
.BADF => |err| return errnoBug(err), // File descriptor used after closed.
.FAULT => |err| return errnoBug(err),
.INVAL => |err| return errnoBug(err), // invalid parameters
@@ -5577,7 +5576,7 @@ fn linkat(
ev.yield(null, .nothing);
switch (cancel_region.errno()) {
.SUCCESS => return,
- .INTR, .CANCELED => continue,
+ .INTR, .CANCELED => {},
.ACCES => return error.AccessDenied,
.DQUOT => return error.DiskQuota,
.EXIST => return error.PathAlreadyExists,
@@ -5617,7 +5616,7 @@ fn lseek(
8 => linux.lseek(fd, @bitCast(offset), whence),
})) {
.SUCCESS => return,
- .INTR => continue,
+ .INTR => {},
.BADF => |err| return errnoBug(err), // File descriptor used after closed.
.INVAL => return error.Unseekable,
.OVERFLOW => return error.Unseekable,
@@ -5660,7 +5659,7 @@ fn openat(
const completion = cancel_region.completion();
switch (completion.errno()) {
.SUCCESS => return completion.result,
- .INTR, .CANCELED => continue,
+ .INTR, .CANCELED => {},
.FAULT => |err| return errnoBug(err),
.INVAL => return error.BadPathName,
.BADF => |err| return errnoBug(err), // File descriptor used after closed.
@@ -5722,7 +5721,7 @@ fn preadv(
const completion = cancel_region.completion();
switch (completion.errno()) {
.SUCCESS => return @as(u32, @bitCast(completion.result)),
- .INTR, .CANCELED => continue,
+ .INTR, .CANCELED => {},
.INVAL => |err| return errnoBug(err),
.FAULT => |err| return errnoBug(err),
.AGAIN => return error.WouldBlock,
@@ -5769,7 +5768,7 @@ fn pwritev(
const completion = cancel_region.completion();
switch (completion.errno()) {
.SUCCESS => return @as(u32, @bitCast(completion.result)),
- .INTR, .CANCELED => continue,
+ .INTR, .CANCELED => {},
.INVAL => |err| return errnoBug(err),
.FAULT => |err| return errnoBug(err),
.AGAIN => return error.WouldBlock,
@@ -5819,7 +5818,7 @@ fn realPath(
const rc = linux.readlink(proc_path, out_buffer.ptr, out_buffer.len);
switch (linux.errno(rc)) {
.SUCCESS => return rc,
- .INTR => continue,
+ .INTR => {},
.ACCES => return error.AccessDenied,
.FAULT => |err| return errnoBug(err),
.IO => return error.FileSystem,
@@ -5864,7 +5863,7 @@ fn renameat(
ev.yield(null, .nothing);
switch (cancel_region.errno()) {
.SUCCESS => return,
- .INTR, .CANCELED => continue,
+ .INTR, .CANCELED => {},
.ACCES => return error.AccessDenied,
.PERM => return error.PermissionDenied,
.BUSY => return error.FileBusy,
@@ -5931,7 +5930,7 @@ fn setsockopt(
ev.yield(null, .nothing);
switch (cancel_region.errno()) {
.SUCCESS => return,
- .INTR, .CANCELED => continue,
+ .INTR, .CANCELED => {},
.BADF => |err| return errnoBug(err), // File descriptor used after closed.
.NOTSOCK => |err| return errnoBug(err),
.INVAL => |err| return errnoBug(err),
@@ -5982,7 +5981,7 @@ fn socket(
const completion = cancel_region.completion();
switch (completion.errno()) {
.SUCCESS => break completion.result,
- .INTR, .CANCELED => continue,
+ .INTR, .CANCELED => {},
.AFNOSUPPORT => return error.AddressFamilyUnsupported,
.INVAL => return error.ProtocolUnsupportedBySystem,
.MFILE => return error.ProcessFdQuotaExceeded,
@@ -5994,7 +5993,7 @@ fn socket(
else => |err| return unexpectedErrno(err),
}
};
- errdefer ev.close(socket_fd);
+ errdefer ev.closeAsync(socket_fd);
if (options.ip6_only) {
if (linux.IPV6 == void) return error.OptionUnsupported;
@@ -6044,7 +6043,7 @@ fn statx(
ev.yield(null, .nothing);
switch (cancel_region.errno()) {
.SUCCESS => return statFromLinux(&statx_buf),
- .INTR, .CANCELED => continue,
+ .INTR, .CANCELED => {},
.ACCES => return error.AccessDenied,
.BADF => |err| return errnoBug(err), // File descriptor used after closed.
.FAULT => |err| return errnoBug(err),
@@ -6083,7 +6082,7 @@ fn utimensat(
try sync.cancel_region.await(.nothing);
switch (linux.errno(linux.utimensat(dir, path, times, flags))) {
.SUCCESS => return,
- .INTR => continue,
+ .INTR => {},
.BADF => |err| return errnoBug(err), // always a race condition
.FAULT => |err| return errnoBug(err),
.INVAL => |err| return errnoBug(err),
@@ -6095,19 +6094,33 @@ fn utimensat(
}
}
-fn writeAll(
- ev: *Evented,
- cancel_region: *CancelRegion,
- fd: fd_t,
- buffer: []const u8,
-) (File.Writer.Error || error{EndOfStream})!void {
+fn writeAllSync(sync: *CancelRegion.Sync, fd: fd_t, buffer: []const u8) File.Writer.Error!void {
var index: usize = 0;
- while (buffer.len - index != 0) {
- const len = try ev.pwritev(cancel_region, fd, &.{
- .{ .base = buffer[index..].ptr, .len = buffer.len - index },
- }, null);
- if (len == 0) return error.EndOfStream;
- index += len;
+ while (buffer.len - index != 0) index += try writeSync(sync, fd, buffer[index..]);
+}
+
+fn writeSync(sync: *CancelRegion.Sync, fd: fd_t, buffer: []const u8) File.Writer.Error!usize {
+ while (true) {
+ try sync.cancel_region.await(.nothing);
+ const rc = linux.write(fd, buffer.ptr, buffer.len);
+ switch (linux.errno(rc)) {
+ .SUCCESS => return @intCast(rc),
+ .INTR => {},
+ .INVAL => |err| return errnoBug(err),
+ .FAULT => |err| return errnoBug(err),
+ .AGAIN => return error.WouldBlock,
+ .BADF => return error.NotOpenForWriting, // Can be a race condition.
+ .DESTADDRREQ => |err| return errnoBug(err), // `connect` was never called.
+ .DQUOT => return error.DiskQuota,
+ .FBIG => return error.FileTooBig,
+ .IO => return error.InputOutput,
+ .NOSPC => return error.NoSpaceLeft,
+ .PERM => return error.PermissionDenied,
+ .PIPE => return error.BrokenPipe,
+ .CONNRESET => |err| return errnoBug(err), // Not a socket handle.
+ .BUSY => return error.DeviceBusy,
+ else => |err| return unexpectedErrno(err),
+ }
}
}
diff --git a/lib/std/os/linux/IoUring.zig b/lib/std/os/linux/IoUring.zig
@@ -201,6 +201,10 @@ pub fn enter(self: *IoUring, to_submit: u32, min_complete: u32, flags: u32) !u32
// The kernel believes our `self.fd` does not refer to an io_uring instance,
// or the opcode is valid but not supported by this kernel (more likely):
.OPNOTSUPP => return error.OpcodeNotSupported,
+ // The thread submitting the work is invalid. This may occur if IORING_ENTER_GETEVENTS
+ // and IORING_SETUP_DEFER_TASKRUN is set, but the submitting thread is not the thread
+ // that initially created or enabled the io_uring associated with fd.
+ .EXIST => return error.InvalidThread,
// The operation was interrupted by a delivery of a signal before it could complete.
// This can happen while waiting for events with IORING_ENTER_GETEVENTS:
.INTR => return error.SignalInterrupt,