commit e5454ff780ae4571cfa71a2edb6f4287eb8cf4de (tree)
parent 3abc96a601d2349cc1743774f0cebb2eb0ea0c61
Author: Andrew Kelley <andrew@ziglang.org>
Date: Sun, 1 Feb 2026 07:56:09 +0100
Merge pull request 'std.Io: move fileWriteStreaming to Operation' (#31065) from more-poll into master
Reviewed-on: https://codeberg.org/ziglang/zig/pulls/31065
Diffstat:
5 files changed, 219 insertions(+), 73 deletions(-)
diff --git a/lib/std/Io.zig b/lib/std/Io.zig
@@ -184,7 +184,6 @@ pub const VTable = struct {
fileStat: *const fn (?*anyopaque, File) File.StatError!File.Stat,
fileLength: *const fn (?*anyopaque, File) File.LengthError!u64,
fileClose: *const fn (?*anyopaque, []const File) void,
- fileWriteStreaming: *const fn (?*anyopaque, File, header: []const u8, data: []const []const u8, splat: usize) File.Writer.Error!usize,
fileWritePositional: *const fn (?*anyopaque, File, header: []const u8, data: []const []const u8, splat: usize, offset: u64) File.WritePositionalError!usize,
fileWriteFileStreaming: *const fn (?*anyopaque, File, header: []const u8, *Io.File.Reader, Io.Limit) File.Writer.WriteFileError!usize,
fileWriteFilePositional: *const fn (?*anyopaque, File, header: []const u8, *Io.File.Reader, Io.Limit, offset: u64) File.WriteFilePositionalError!usize,
@@ -257,6 +256,7 @@ pub const VTable = struct {
pub const Operation = union(enum) {
file_read_streaming: FileReadStreaming,
+ file_write_streaming: FileWriteStreaming,
pub const Tag = @typeInfo(Operation).@"union".tag_type.?;
@@ -287,7 +287,41 @@ pub const Operation = union(enum) {
LockViolation,
} || Io.UnexpectedError;
- pub const Result = usize;
+ pub const Result = Error!usize;
+ };
+
+ pub const FileWriteStreaming = struct {
+ file: File,
+ header: []const u8 = &.{},
+ data: []const []const u8,
+ splat: usize = 1,
+
+ pub const Error = error{
+ DiskQuota,
+ FileTooBig,
+ InputOutput,
+ NoSpaceLeft,
+ DeviceBusy,
+ /// File descriptor does not hold the required rights to write to it.
+ AccessDenied,
+ PermissionDenied,
+ /// File is an unconnected socket, or closed its read end.
+ BrokenPipe,
+ /// Insufficient kernel memory to read from in_fd.
+ SystemResources,
+ NotOpenForWriting,
+ /// The process cannot access the file because another process has locked
+ /// a portion of the file. Windows-only.
+ LockViolation,
+ /// Non-blocking has been enabled and this operation would block.
+ WouldBlock,
+ /// This error occurs when a device gets disconnected before or mid-flush
+ /// while it's being written to - errno(6): No such device or address.
+ NoDevice,
+ FileBusy,
+ } || Io.UnexpectedError;
+
+ pub const Result = Error!usize;
};
pub const Result = Result: {
@@ -296,7 +330,7 @@ pub const Operation = union(enum) {
var field_types: [operation_fields.len]type = undefined;
for (operation_fields, &field_names, &field_types) |field, *field_name, *field_type| {
field_name.* = field.name;
- field_type.* = field.type.Error!field.type.Result;
+ field_type.* = field.type.Result;
}
break :Result @Union(.auto, Tag, &field_names, &field_types, &@splat(.{}));
};
diff --git a/lib/std/Io/File.zig b/lib/std/Io/File.zig
@@ -572,16 +572,16 @@ pub fn setTimestampsNow(file: File, io: Io) SetTimestampsError!void {
pub const ReadStreamingError = error{EndOfStream} || Reader.Error;
-/// Returns 0 on stream end or if `buffer` has no space available for data.
+/// May return fewer bytes than buffer space available, including 0.
+/// End-of-stream is indicated by `error.EndOfStream`.
///
/// See also:
/// * `reader`
pub fn readStreaming(file: File, io: Io, buffer: []const []u8) ReadStreamingError!usize {
- const result = try io.operate(.{ .file_read_streaming = .{
+ return (try io.operate(.{ .file_read_streaming = .{
.file = file,
.data = buffer,
- } });
- return result.file_read_streaming;
+ } })).file_read_streaming;
}
pub const ReadPositionalError = error{
@@ -714,11 +714,22 @@ pub fn writerStreaming(file: File, io: Io, buffer: []u8) Writer {
return .initStreaming(file, io, buffer);
}
+/// This is a low-level API that calls the `Io` interface function directly.
+/// For a higher level API, see `writerStreaming`.
+pub fn writeStreaming(file: File, io: Io, header: []const u8, data: []const []const u8, splat: usize) Writer.Error!usize {
+ return (try io.operate(.{ .file_write_streaming = .{
+ .file = file,
+ .header = header,
+ .data = data,
+ .splat = splat,
+ } })).file_write_streaming;
+}
+
/// Equivalent to creating a streaming writer, writing `bytes`, and then flushing.
pub fn writeStreamingAll(file: File, io: Io, bytes: []const u8) Writer.Error!void {
var index: usize = 0;
while (index < bytes.len) {
- index += try io.vtable.fileWriteStreaming(io.userdata, file, &.{}, &.{bytes[index..]}, 1);
+ index += try writeStreaming(file, io, &.{}, &.{bytes[index..]}, 1);
}
}
diff --git a/lib/std/Io/File/Writer.zig b/lib/std/Io/File/Writer.zig
@@ -20,30 +20,7 @@ interface: Io.Writer,
pub const Mode = File.Reader.Mode;
-pub const Error = error{
- DiskQuota,
- FileTooBig,
- InputOutput,
- NoSpaceLeft,
- DeviceBusy,
- /// File descriptor does not hold the required rights to write to it.
- AccessDenied,
- PermissionDenied,
- /// File is an unconnected socket, or closed its read end.
- BrokenPipe,
- /// Insufficient kernel memory to read from in_fd.
- SystemResources,
- NotOpenForWriting,
- /// The process cannot access the file because another process has locked
- /// a portion of the file. Windows-only.
- LockViolation,
- /// Non-blocking has been enabled and this operation would block.
- WouldBlock,
- /// This error occurs when a device gets disconnected before or mid-flush
- /// while it's being written to - errno(6): No such device or address.
- NoDevice,
- FileBusy,
-} || Io.Cancelable || Io.UnexpectedError;
+pub const Error = Io.Operation.FileWriteStreaming.Error || Io.Cancelable;
pub const WriteFileError = Error || error{
/// Descriptor is not valid or locked, or an mmap(2)-like operation is not available for in_fd.
@@ -146,7 +123,7 @@ fn drainPositional(w: *Writer, data: []const []const u8, splat: usize) Io.Writer
fn drainStreaming(w: *Writer, data: []const []const u8, splat: usize) Io.Writer.Error!usize {
const io = w.io;
const header = w.interface.buffered();
- const n = io.vtable.fileWriteStreaming(io.userdata, w.file, header, data, splat) catch |err| {
+ const n = w.file.writeStreaming(io, header, data, splat) catch |err| {
w.err = err;
return error.WriteFailed;
};
diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig
@@ -1649,7 +1649,6 @@ pub fn io(t: *Threaded) Io {
.fileStat = fileStat,
.fileLength = fileLength,
.fileClose = fileClose,
- .fileWriteStreaming = fileWriteStreaming,
.fileWritePositional = fileWritePositional,
.fileWriteFileStreaming = fileWriteFileStreaming,
.fileWriteFilePositional = fileWriteFilePositional,
@@ -1813,7 +1812,6 @@ pub fn ioBasic(t: *Threaded) Io {
.fileStat = fileStat,
.fileLength = fileLength,
.fileClose = fileClose,
- .fileWriteStreaming = fileWriteStreaming,
.fileWritePositional = fileWritePositional,
.fileWriteFileStreaming = fileWriteFileStreaming,
.fileWriteFilePositional = fileWriteFilePositional,
@@ -2496,6 +2494,12 @@ fn operate(userdata: ?*anyopaque, operation: Io.Operation) Io.Cancelable!Io.Oper
else => |e| e,
},
},
+ .file_write_streaming => |o| return .{
+ .file_write_streaming = fileWriteStreaming(t, o.file, o.header, o.data, o.splat) catch |err| switch (err) {
+ error.Canceled => |e| return e,
+ else => |e| e,
+ },
+ },
}
}
@@ -2523,6 +2527,10 @@ fn batchAwaitAsync(userdata: ?*anyopaque, b: *Io.Batch) Io.Cancelable!void {
poll_buffer[poll_len] = .{ .fd = o.file.handle, .events = posix.POLL.IN, .revents = 0 };
poll_len += 1;
},
+ .file_write_streaming => |o| {
+ poll_buffer[poll_len] = .{ .fd = o.file.handle, .events = posix.POLL.OUT, .revents = 0 };
+ poll_len += 1;
+ },
}
index = submission.node.next;
}
@@ -2687,6 +2695,7 @@ fn batchAwaitConcurrent(userdata: ?*anyopaque, b: *Io.Batch, timeout: Io.Timeout
const submission = &b.storage[index.toIndex()].submission;
switch (submission.operation) {
.file_read_streaming => |o| try poll_storage.add(o.file, posix.POLL.IN),
+ .file_write_streaming => |o| try poll_storage.add(o.file, posix.POLL.OUT),
}
index = submission.node.next;
}
@@ -2864,6 +2873,7 @@ fn batchApc(apc_context: ?*anyopaque, iosb: *windows.IO_STATUS_BLOCK, _: windows
b.completions.tail = .fromIndex(index);
const result: Io.Operation.Result = switch (pending.tag) {
.file_read_streaming => .{ .file_read_streaming = ntReadFileResult(iosb) },
+ .file_write_streaming => .{ .file_write_streaming = ntWriteFileResult(iosb) },
};
storage.* = .{ .completion = .{ .node = .{ .next = .none }, .result = result } };
},
@@ -2957,12 +2967,87 @@ fn batchAwaitWindows(b: *Io.Batch, concurrency: bool) error{ Canceled, Concurren
};
}
},
+ .file_write_streaming => |o| o: {
+ const buffer = windowsWriteBuffer(o.header, o.data, o.splat);
+ if (buffer.len == 0) {
+ context.iosb = .{
+ .u = .{ .Status = .SUCCESS },
+ .Information = 0,
+ };
+ batchApc(b, &context.iosb, 0);
+ break :o;
+ }
+ if (o.file.flags.nonblocking) {
+ context.file = o.file.handle;
+ switch (windows.ntdll.NtWriteFile(
+ o.file.handle,
+ null, // event
+ &batchApc,
+ b,
+ &context.iosb,
+ buffer.ptr,
+ @intCast(buffer.len),
+ null, // byte offset
+ null, // key
+ )) {
+ .PENDING, .SUCCESS => {},
+ .CANCELLED => unreachable,
+ else => |status| {
+ context.iosb.u.Status = status;
+ batchApc(b, &context.iosb, 0);
+ },
+ }
+ } else {
+ if (concurrency) return error.ConcurrencyUnavailable;
+
+ const syscall: Syscall = try .start();
+ while (true) switch (windows.ntdll.NtWriteFile(
+ o.file.handle,
+ null, // event
+ null, // APC routine
+ null, // APC context
+ &context.iosb,
+ buffer.ptr,
+ @intCast(buffer.len),
+ null, // byte offset
+ null, // key
+ )) {
+ .PENDING => unreachable, // unrecoverable: wrong File nonblocking flag
+ .CANCELLED => {
+ try syscall.checkCancel();
+ continue;
+ },
+ else => |status| {
+ syscall.finish();
+
+ context.iosb.u.Status = status;
+ batchApc(b, &context.iosb, 0);
+ break;
+ },
+ };
+ }
+ },
}
index = submission.node.next;
}
b.submissions = .{ .head = .none, .tail = .none };
}
+/// Since Windows only supports writing one contiguous buffer, returns the
+/// first one, while also limiting it to a length representable by 32-bit
+/// unsigned integer.
+fn windowsWriteBuffer(header: []const u8, data: []const []const u8, splat: usize) []const u8 {
+ const buffer = b: {
+ if (header.len != 0) break :b header;
+ for (data[0 .. data.len - 1]) |buffer| {
+ if (buffer.len != 0) break :b buffer;
+ }
+ if (splat == 0) return &.{};
+ break :b data[data.len - 1];
+ };
+ return buffer[0..@min(buffer.len, std.math.maxInt(u32))];
+}
+
fn submitComplete(ring: []u32, complete_tail: *Io.Batch.RingIndex, op: u32) void {
const ct = complete_tail.*;
const len: u31 = @intCast(ring.len);
@@ -9005,6 +9090,24 @@ fn ntReadFileResult(io_status_block: *const windows.IO_STATUS_BLOCK) !usize {
}
}
+fn ntWriteFileResult(io_status_block: *const windows.IO_STATUS_BLOCK) !usize {
+ switch (io_status_block.u.Status) {
+ .PENDING => unreachable,
+ .CANCELLED => unreachable,
+ .SUCCESS => return io_status_block.Information,
+ .INVALID_USER_BUFFER => return error.SystemResources,
+ .NO_MEMORY => return error.SystemResources,
+ .QUOTA_EXCEEDED => return error.SystemResources,
+ .PIPE_BROKEN => return error.BrokenPipe,
+ .INVALID_HANDLE => return error.NotOpenForWriting,
+ .LOCK_NOT_GRANTED => return error.LockViolation,
+ .ACCESS_DENIED => return error.AccessDenied,
+ .WORKING_SET_QUOTA => return error.SystemResources,
+ .DISK_FULL => return error.NoSpaceLeft,
+ else => |status| return windows.unexpectedStatus(status),
+ }
+}
+
fn fileReadPositionalPosix(file: File, data: []const []u8, offset: u64) File.ReadPositionalError!usize {
if (!have_preadv) @compileError("TODO implement fileReadPositionalPosix for cursed operating systems that don't support preadv (it's only Haiku)");
@@ -9837,16 +9940,9 @@ fn fileWriteStreaming(
_ = t;
if (is_windows) {
- if (header.len != 0) {
- return writeFileStreamingWindows(file.handle, header);
- }
- for (data[0 .. data.len - 1]) |buf| {
- if (buf.len == 0) continue;
- return writeFileStreamingWindows(file.handle, buf);
- }
- const pattern = data[data.len - 1];
- if (pattern.len == 0 or splat == 0) return 0;
- return writeFileStreamingWindows(file.handle, pattern);
+ const buffer = windowsWriteBuffer(header, data, splat);
+ if (buffer.len == 0) return 0;
+ return fileWriteStreamingWindows(file, buffer);
}
var iovecs: [max_iovecs_len]posix.iovec_const = undefined;
@@ -9953,38 +10049,66 @@ fn fileWriteStreaming(
}
}
-fn writeFileStreamingWindows(
- handle: windows.HANDLE,
- bytes: []const u8,
-) File.Writer.Error!usize {
- assert(bytes.len != 0);
- var bytes_written: windows.DWORD = undefined;
- const adjusted_len = std.math.lossyCast(u32, bytes.len);
- const syscall: Syscall = try .start();
- while (true) {
- if (windows.kernel32.WriteFile(handle, bytes.ptr, adjusted_len, &bytes_written, null) != 0) {
- syscall.finish();
- return bytes_written;
+fn fileWriteStreamingWindows(file: File, buffer: []const u8) File.Writer.Error!usize {
+ assert(buffer.len != 0);
+
+ var iosb: windows.IO_STATUS_BLOCK = undefined;
+
+ if (file.flags.nonblocking) {
+ var done: bool = false;
+ switch (windows.ntdll.NtWriteFile(
+ file.handle,
+ null, // event
+ flagApc,
+ &done, // APC context
+ &iosb,
+ buffer.ptr,
+ @intCast(buffer.len),
+ null, // byte offset
+ null, // key
+ )) {
+ // We must wait for the APC routine.
+ .PENDING, .SUCCESS => while (!done) {
+ // Once we get here we must not return from the function until the
+ // operation completes, thereby releasing reference to io_status_block.
+ const alertable_syscall = AlertableSyscall.start() catch |err| switch (err) {
+ error.Canceled => |e| {
+ var cancel_iosb: windows.IO_STATUS_BLOCK = undefined;
+ _ = windows.ntdll.NtCancelIoFileEx(file.handle, &iosb, &cancel_iosb);
+ while (!done) waitForApcOrAlert();
+ return e;
+ },
+ };
+ waitForApcOrAlert();
+ alertable_syscall.finish();
+ },
+ else => |status| iosb.u.Status = status,
}
- switch (windows.GetLastError()) {
- .OPERATION_ABORTED => {
+ return ntWriteFileResult(&iosb);
+ } else {
+ const syscall: Syscall = try .start();
+ while (true) switch (windows.ntdll.NtWriteFile(
+ file.handle,
+ null, // event
+ null, // APC routine
+ null, // APC context
+ &iosb,
+ buffer.ptr,
+ @intCast(buffer.len),
+ null, // byte offset
+ null, // key
+ )) {
+ .PENDING => unreachable, // unrecoverable: wrong File nonblocking flag
+ .CANCELLED => {
try syscall.checkCancel();
continue;
},
- .INVALID_USER_BUFFER => return syscall.fail(error.SystemResources),
- .NOT_ENOUGH_MEMORY => return syscall.fail(error.SystemResources),
- .NOT_ENOUGH_QUOTA => return syscall.fail(error.SystemResources),
- .NO_DATA => return syscall.fail(error.BrokenPipe),
- .INVALID_HANDLE => return syscall.fail(error.NotOpenForWriting),
- .LOCK_VIOLATION => return syscall.fail(error.LockViolation),
- .ACCESS_DENIED => return syscall.fail(error.AccessDenied),
- .WORKING_SET_QUOTA => return syscall.fail(error.SystemResources),
- .DISK_FULL => return syscall.fail(error.NoSpaceLeft),
- else => |err| {
+ else => |status| {
syscall.finish();
- return windows.unexpectedError(err);
+ iosb.u.Status = status;
+ return ntWriteFileResult(&iosb);
},
- }
+ };
}
}
diff --git a/lib/std/Progress.zig b/lib/std/Progress.zig
@@ -1437,7 +1437,7 @@ fn writeIpc(io: Io, file: Io.File, serialized: Serialized) error{BrokenPipe}!voi
// We do this in a separate write call to give a better chance for the
// writev below to be in a single packet.
const n = @min(parents.len, remaining_write_trash_bytes);
- if (io.vtable.fileWriteStreaming(io.userdata, file, &.{}, &.{parents[0..n]}, 1)) |written| {
+ if (file.writeStreaming(io, &.{}, &.{parents[0..n]}, 1)) |written| {
remaining_write_trash_bytes -= written;
continue;
} else |err| switch (err) {
@@ -1478,7 +1478,7 @@ fn writevNonblock(io: Io, file: Io.File, iov: [][]const u8) Io.File.Writer.Error
return total_written) : (iov_index += 1) written -= iov[iov_index].len;
iov[iov_index].ptr += written;
iov[iov_index].len -= written;
- written = try io.vtable.fileWriteStreaming(io.userdata, file, &.{}, iov, 1);
+ written = try file.writeStreaming(io, &.{}, iov, 1);
if (written == 0) return total_written;
total_written += written;
}