commit 5360968e03525be4d312ca61a7ba2dcb7890ec42 (tree) parent 43fba5ea83849ec901bb2cd4f98bd0222f51f7f6 Author: Andrew Kelley <andrew@ziglang.org> Date: Thu, 10 Jul 2025 10:28:32 -0700 std: rename `io` to `Io` in preparation This commit is non-breaking. std.io is deprecated in favor of std.Io, in preparation for that namespace becoming an interface. Diffstat:
28 files changed, 903 insertions(+), 901 deletions(-)
diff --git a/CMakeLists.txt b/CMakeLists.txt @@ -387,6 +387,18 @@ set(ZIG_STAGE2_SOURCES lib/std/Build.zig lib/std/Build/Cache.zig lib/std/Build/Cache/DepTokenizer.zig + lib/std/Io.zig + lib/std/Io/Reader.zig + lib/std/Io/Writer.zig + lib/std/Io/buffered_atomic_file.zig + lib/std/Io/buffered_writer.zig + lib/std/Io/change_detection_stream.zig + lib/std/Io/counting_reader.zig + lib/std/Io/counting_writer.zig + lib/std/Io/find_byte_writer.zig + lib/std/Io/fixed_buffer_stream.zig + lib/std/Io/limited_reader.zig + lib/std/Io/seekable_stream.zig lib/std/Progress.zig lib/std/Random.zig lib/std/Target.zig @@ -449,18 +461,6 @@ set(ZIG_STAGE2_SOURCES lib/std/hash_map.zig lib/std/heap.zig lib/std/heap/arena_allocator.zig - lib/std/io.zig - lib/std/io/Reader.zig - lib/std/io/Writer.zig - lib/std/io/buffered_atomic_file.zig - lib/std/io/buffered_writer.zig - lib/std/io/change_detection_stream.zig - lib/std/io/counting_reader.zig - lib/std/io/counting_writer.zig - lib/std/io/find_byte_writer.zig - lib/std/io/fixed_buffer_stream.zig - lib/std/io/limited_reader.zig - lib/std/io/seekable_stream.zig lib/std/json.zig lib/std/json/stringify.zig lib/std/leb128.zig diff --git a/lib/std/Io.zig b/lib/std/Io.zig @@ -0,0 +1,884 @@ +const std = @import("std.zig"); +const builtin = @import("builtin"); +const root = @import("root"); +const c = std.c; +const is_windows = builtin.os.tag == .windows; +const windows = std.os.windows; +const posix = std.posix; +const math = std.math; +const assert = std.debug.assert; +const fs = std.fs; +const mem = std.mem; +const meta = std.meta; +const File = std.fs.File; +const Allocator = std.mem.Allocator; +const Alignment = std.mem.Alignment; + +pub const Limit = enum(usize) { + nothing = 0, + unlimited = std.math.maxInt(usize), + _, + + /// `std.math.maxInt(usize)` is interpreted to mean `.unlimited`. + pub fn limited(n: usize) Limit { + return @enumFromInt(n); + } + + /// Any value grater than `std.math.maxInt(usize)` is interpreted to mean + /// `.unlimited`. + pub fn limited64(n: u64) Limit { + return @enumFromInt(@min(n, std.math.maxInt(usize))); + } + + pub fn countVec(data: []const []const u8) Limit { + var total: usize = 0; + for (data) |d| total += d.len; + return .limited(total); + } + + pub fn min(a: Limit, b: Limit) Limit { + return @enumFromInt(@min(@intFromEnum(a), @intFromEnum(b))); + } + + pub fn minInt(l: Limit, n: usize) usize { + return @min(n, @intFromEnum(l)); + } + + pub fn minInt64(l: Limit, n: u64) usize { + return @min(n, @intFromEnum(l)); + } + + pub fn slice(l: Limit, s: []u8) []u8 { + return s[0..l.minInt(s.len)]; + } + + pub fn sliceConst(l: Limit, s: []const u8) []const u8 { + return s[0..l.minInt(s.len)]; + } + + pub fn toInt(l: Limit) ?usize { + return switch (l) { + else => @intFromEnum(l), + .unlimited => null, + }; + } + + /// Reduces a slice to account for the limit, leaving room for one extra + /// byte above the limit, allowing for the use case of differentiating + /// between end-of-stream and reaching the limit. + pub fn slice1(l: Limit, non_empty_buffer: []u8) []u8 { + assert(non_empty_buffer.len >= 1); + return non_empty_buffer[0..@min(@intFromEnum(l) +| 1, non_empty_buffer.len)]; + } + + pub fn nonzero(l: Limit) bool { + return @intFromEnum(l) > 0; + } + + /// Return a new limit reduced by `amount` or return `null` indicating + /// limit would be exceeded. + pub fn subtract(l: Limit, amount: usize) ?Limit { + if (l == .unlimited) return .unlimited; + if (amount > @intFromEnum(l)) return null; + return @enumFromInt(@intFromEnum(l) - amount); + } +}; + +pub const Reader = @import("Io/Reader.zig"); +pub const Writer = @import("Io/Writer.zig"); + +/// Deprecated in favor of `Reader`. +pub fn GenericReader( + comptime Context: type, + comptime ReadError: type, + /// Returns the number of bytes read. It may be less than buffer.len. + /// If the number of bytes read is 0, it means end of stream. + /// End of stream is not an error condition. + comptime readFn: fn (context: Context, buffer: []u8) ReadError!usize, +) type { + return struct { + context: Context, + + pub const Error = ReadError; + pub const NoEofError = ReadError || error{ + EndOfStream, + }; + + pub inline fn read(self: Self, buffer: []u8) Error!usize { + return readFn(self.context, buffer); + } + + pub inline fn readAll(self: Self, buffer: []u8) Error!usize { + return @errorCast(self.any().readAll(buffer)); + } + + pub inline fn readAtLeast(self: Self, buffer: []u8, len: usize) Error!usize { + return @errorCast(self.any().readAtLeast(buffer, len)); + } + + pub inline fn readNoEof(self: Self, buf: []u8) NoEofError!void { + return @errorCast(self.any().readNoEof(buf)); + } + + pub inline fn readAllArrayList( + self: Self, + array_list: *std.ArrayList(u8), + max_append_size: usize, + ) (error{StreamTooLong} || Allocator.Error || Error)!void { + return @errorCast(self.any().readAllArrayList(array_list, max_append_size)); + } + + pub inline fn readAllArrayListAligned( + self: Self, + comptime alignment: ?Alignment, + array_list: *std.ArrayListAligned(u8, alignment), + max_append_size: usize, + ) (error{StreamTooLong} || Allocator.Error || Error)!void { + return @errorCast(self.any().readAllArrayListAligned( + alignment, + array_list, + max_append_size, + )); + } + + pub inline fn readAllAlloc( + self: Self, + allocator: Allocator, + max_size: usize, + ) (Error || Allocator.Error || error{StreamTooLong})![]u8 { + return @errorCast(self.any().readAllAlloc(allocator, max_size)); + } + + pub inline fn readUntilDelimiterArrayList( + self: Self, + array_list: *std.ArrayList(u8), + delimiter: u8, + max_size: usize, + ) (NoEofError || Allocator.Error || error{StreamTooLong})!void { + return @errorCast(self.any().readUntilDelimiterArrayList( + array_list, + delimiter, + max_size, + )); + } + + pub inline fn readUntilDelimiterAlloc( + self: Self, + allocator: Allocator, + delimiter: u8, + max_size: usize, + ) (NoEofError || Allocator.Error || error{StreamTooLong})![]u8 { + return @errorCast(self.any().readUntilDelimiterAlloc( + allocator, + delimiter, + max_size, + )); + } + + pub inline fn readUntilDelimiter( + self: Self, + buf: []u8, + delimiter: u8, + ) (NoEofError || error{StreamTooLong})![]u8 { + return @errorCast(self.any().readUntilDelimiter(buf, delimiter)); + } + + pub inline fn readUntilDelimiterOrEofAlloc( + self: Self, + allocator: Allocator, + delimiter: u8, + max_size: usize, + ) (Error || Allocator.Error || error{StreamTooLong})!?[]u8 { + return @errorCast(self.any().readUntilDelimiterOrEofAlloc( + allocator, + delimiter, + max_size, + )); + } + + pub inline fn readUntilDelimiterOrEof( + self: Self, + buf: []u8, + delimiter: u8, + ) (Error || error{StreamTooLong})!?[]u8 { + return @errorCast(self.any().readUntilDelimiterOrEof(buf, delimiter)); + } + + pub inline fn streamUntilDelimiter( + self: Self, + writer: anytype, + delimiter: u8, + optional_max_size: ?usize, + ) (NoEofError || error{StreamTooLong} || @TypeOf(writer).Error)!void { + return @errorCast(self.any().streamUntilDelimiter( + writer, + delimiter, + optional_max_size, + )); + } + + pub inline fn skipUntilDelimiterOrEof(self: Self, delimiter: u8) Error!void { + return @errorCast(self.any().skipUntilDelimiterOrEof(delimiter)); + } + + pub inline fn readByte(self: Self) NoEofError!u8 { + return @errorCast(self.any().readByte()); + } + + pub inline fn readByteSigned(self: Self) NoEofError!i8 { + return @errorCast(self.any().readByteSigned()); + } + + pub inline fn readBytesNoEof( + self: Self, + comptime num_bytes: usize, + ) NoEofError![num_bytes]u8 { + return @errorCast(self.any().readBytesNoEof(num_bytes)); + } + + pub inline fn readIntoBoundedBytes( + self: Self, + comptime num_bytes: usize, + bounded: *std.BoundedArray(u8, num_bytes), + ) Error!void { + return @errorCast(self.any().readIntoBoundedBytes(num_bytes, bounded)); + } + + pub inline fn readBoundedBytes( + self: Self, + comptime num_bytes: usize, + ) Error!std.BoundedArray(u8, num_bytes) { + return @errorCast(self.any().readBoundedBytes(num_bytes)); + } + + pub inline fn readInt(self: Self, comptime T: type, endian: std.builtin.Endian) NoEofError!T { + return @errorCast(self.any().readInt(T, endian)); + } + + pub inline fn readVarInt( + self: Self, + comptime ReturnType: type, + endian: std.builtin.Endian, + size: usize, + ) NoEofError!ReturnType { + return @errorCast(self.any().readVarInt(ReturnType, endian, size)); + } + + pub const SkipBytesOptions = AnyReader.SkipBytesOptions; + + pub inline fn skipBytes( + self: Self, + num_bytes: u64, + comptime options: SkipBytesOptions, + ) NoEofError!void { + return @errorCast(self.any().skipBytes(num_bytes, options)); + } + + pub inline fn isBytes(self: Self, slice: []const u8) NoEofError!bool { + return @errorCast(self.any().isBytes(slice)); + } + + pub inline fn readStruct(self: Self, comptime T: type) NoEofError!T { + return @errorCast(self.any().readStruct(T)); + } + + pub inline fn readStructEndian(self: Self, comptime T: type, endian: std.builtin.Endian) NoEofError!T { + return @errorCast(self.any().readStructEndian(T, endian)); + } + + pub const ReadEnumError = NoEofError || error{ + /// An integer was read, but it did not match any of the tags in the supplied enum. + InvalidValue, + }; + + pub inline fn readEnum( + self: Self, + comptime Enum: type, + endian: std.builtin.Endian, + ) ReadEnumError!Enum { + return @errorCast(self.any().readEnum(Enum, endian)); + } + + pub inline fn any(self: *const Self) AnyReader { + return .{ + .context = @ptrCast(&self.context), + .readFn = typeErasedReadFn, + }; + } + + const Self = @This(); + + fn typeErasedReadFn(context: *const anyopaque, buffer: []u8) anyerror!usize { + const ptr: *const Context = @alignCast(@ptrCast(context)); + return readFn(ptr.*, buffer); + } + }; +} + +/// Deprecated in favor of `Writer`. +pub fn GenericWriter( + comptime Context: type, + comptime WriteError: type, + comptime writeFn: fn (context: Context, bytes: []const u8) WriteError!usize, +) type { + return struct { + context: Context, + + const Self = @This(); + pub const Error = WriteError; + + pub inline fn write(self: Self, bytes: []const u8) Error!usize { + return writeFn(self.context, bytes); + } + + pub inline fn writeAll(self: Self, bytes: []const u8) Error!void { + return @errorCast(self.any().writeAll(bytes)); + } + + pub inline fn print(self: Self, comptime format: []const u8, args: anytype) Error!void { + return @errorCast(self.any().print(format, args)); + } + + pub inline fn writeByte(self: Self, byte: u8) Error!void { + return @errorCast(self.any().writeByte(byte)); + } + + pub inline fn writeByteNTimes(self: Self, byte: u8, n: usize) Error!void { + return @errorCast(self.any().writeByteNTimes(byte, n)); + } + + pub inline fn writeBytesNTimes(self: Self, bytes: []const u8, n: usize) Error!void { + return @errorCast(self.any().writeBytesNTimes(bytes, n)); + } + + pub inline fn writeInt(self: Self, comptime T: type, value: T, endian: std.builtin.Endian) Error!void { + return @errorCast(self.any().writeInt(T, value, endian)); + } + + pub inline fn writeStruct(self: Self, value: anytype) Error!void { + return @errorCast(self.any().writeStruct(value)); + } + + pub inline fn writeStructEndian(self: Self, value: anytype, endian: std.builtin.Endian) Error!void { + return @errorCast(self.any().writeStructEndian(value, endian)); + } + + pub inline fn any(self: *const Self) AnyWriter { + return .{ + .context = @ptrCast(&self.context), + .writeFn = typeErasedWriteFn, + }; + } + + fn typeErasedWriteFn(context: *const anyopaque, bytes: []const u8) anyerror!usize { + const ptr: *const Context = @alignCast(@ptrCast(context)); + return writeFn(ptr.*, bytes); + } + + /// Helper for bridging to the new `Writer` API while upgrading. + pub fn adaptToNewApi(self: *const Self) Adapter { + return .{ + .derp_writer = self.*, + .new_interface = .{ + .buffer = &.{}, + .vtable = &.{ .drain = Adapter.drain }, + }, + }; + } + + pub const Adapter = struct { + derp_writer: Self, + new_interface: Writer, + err: ?Error = null, + + fn drain(w: *Writer, data: []const []const u8, splat: usize) Writer.Error!usize { + _ = splat; + const a: *@This() = @fieldParentPtr("new_interface", w); + return a.derp_writer.write(data[0]) catch |err| { + a.err = err; + return error.WriteFailed; + }; + } + }; + }; +} + +/// Deprecated in favor of `Reader`. +pub const AnyReader = @import("Io/DeprecatedReader.zig"); +/// Deprecated in favor of `Writer`. +pub const AnyWriter = @import("Io/DeprecatedWriter.zig"); + +pub const SeekableStream = @import("Io/seekable_stream.zig").SeekableStream; + +pub const BufferedWriter = @import("Io/buffered_writer.zig").BufferedWriter; +pub const bufferedWriter = @import("Io/buffered_writer.zig").bufferedWriter; + +pub const BufferedReader = @import("Io/buffered_reader.zig").BufferedReader; +pub const bufferedReader = @import("Io/buffered_reader.zig").bufferedReader; +pub const bufferedReaderSize = @import("Io/buffered_reader.zig").bufferedReaderSize; + +pub const FixedBufferStream = @import("Io/fixed_buffer_stream.zig").FixedBufferStream; +pub const fixedBufferStream = @import("Io/fixed_buffer_stream.zig").fixedBufferStream; + +pub const CWriter = @import("Io/c_writer.zig").CWriter; +pub const cWriter = @import("Io/c_writer.zig").cWriter; + +pub const LimitedReader = @import("Io/limited_reader.zig").LimitedReader; +pub const limitedReader = @import("Io/limited_reader.zig").limitedReader; + +pub const CountingWriter = @import("Io/counting_writer.zig").CountingWriter; +pub const countingWriter = @import("Io/counting_writer.zig").countingWriter; +pub const CountingReader = @import("Io/counting_reader.zig").CountingReader; +pub const countingReader = @import("Io/counting_reader.zig").countingReader; + +pub const MultiWriter = @import("Io/multi_writer.zig").MultiWriter; +pub const multiWriter = @import("Io/multi_writer.zig").multiWriter; + +pub const BitReader = @import("Io/bit_reader.zig").BitReader; +pub const bitReader = @import("Io/bit_reader.zig").bitReader; + +pub const BitWriter = @import("Io/bit_writer.zig").BitWriter; +pub const bitWriter = @import("Io/bit_writer.zig").bitWriter; + +pub const ChangeDetectionStream = @import("Io/change_detection_stream.zig").ChangeDetectionStream; +pub const changeDetectionStream = @import("Io/change_detection_stream.zig").changeDetectionStream; + +pub const FindByteWriter = @import("Io/find_byte_writer.zig").FindByteWriter; +pub const findByteWriter = @import("Io/find_byte_writer.zig").findByteWriter; + +pub const BufferedAtomicFile = @import("Io/buffered_atomic_file.zig").BufferedAtomicFile; + +pub const StreamSource = @import("Io/stream_source.zig").StreamSource; + +pub const tty = @import("Io/tty.zig"); + +/// A Writer that doesn't write to anything. +pub const null_writer: NullWriter = .{ .context = {} }; + +pub const NullWriter = GenericWriter(void, error{}, dummyWrite); +fn dummyWrite(context: void, data: []const u8) error{}!usize { + _ = context; + return data.len; +} + +test null_writer { + null_writer.writeAll("yay" ** 10) catch |err| switch (err) {}; +} + +pub fn poll( + allocator: Allocator, + comptime StreamEnum: type, + files: PollFiles(StreamEnum), +) Poller(StreamEnum) { + const enum_fields = @typeInfo(StreamEnum).@"enum".fields; + var result: Poller(StreamEnum) = undefined; + + if (is_windows) result.windows = .{ + .first_read_done = false, + .overlapped = [1]windows.OVERLAPPED{ + mem.zeroes(windows.OVERLAPPED), + } ** enum_fields.len, + .small_bufs = undefined, + .active = .{ + .count = 0, + .handles_buf = undefined, + .stream_map = undefined, + }, + }; + + inline for (0..enum_fields.len) |i| { + result.fifos[i] = .{ + .allocator = allocator, + .buf = &.{}, + .head = 0, + .count = 0, + }; + if (is_windows) { + result.windows.active.handles_buf[i] = @field(files, enum_fields[i].name).handle; + } else { + result.poll_fds[i] = .{ + .fd = @field(files, enum_fields[i].name).handle, + .events = posix.POLL.IN, + .revents = undefined, + }; + } + } + return result; +} + +pub const PollFifo = std.fifo.LinearFifo(u8, .Dynamic); + +pub fn Poller(comptime StreamEnum: type) type { + return struct { + const enum_fields = @typeInfo(StreamEnum).@"enum".fields; + const PollFd = if (is_windows) void else posix.pollfd; + + fifos: [enum_fields.len]PollFifo, + poll_fds: [enum_fields.len]PollFd, + windows: if (is_windows) struct { + first_read_done: bool, + overlapped: [enum_fields.len]windows.OVERLAPPED, + small_bufs: [enum_fields.len][128]u8, + active: struct { + count: math.IntFittingRange(0, enum_fields.len), + handles_buf: [enum_fields.len]windows.HANDLE, + stream_map: [enum_fields.len]StreamEnum, + + pub fn removeAt(self: *@This(), index: u32) void { + std.debug.assert(index < self.count); + for (index + 1..self.count) |i| { + self.handles_buf[i - 1] = self.handles_buf[i]; + self.stream_map[i - 1] = self.stream_map[i]; + } + self.count -= 1; + } + }, + } else void, + + const Self = @This(); + + pub fn deinit(self: *Self) void { + if (is_windows) { + // cancel any pending IO to prevent clobbering OVERLAPPED value + for (self.windows.active.handles_buf[0..self.windows.active.count]) |h| { + _ = windows.kernel32.CancelIo(h); + } + } + inline for (&self.fifos) |*q| q.deinit(); + self.* = undefined; + } + + pub fn poll(self: *Self) !bool { + if (is_windows) { + return pollWindows(self, null); + } else { + return pollPosix(self, null); + } + } + + pub fn pollTimeout(self: *Self, nanoseconds: u64) !bool { + if (is_windows) { + return pollWindows(self, nanoseconds); + } else { + return pollPosix(self, nanoseconds); + } + } + + pub inline fn fifo(self: *Self, comptime which: StreamEnum) *PollFifo { + return &self.fifos[@intFromEnum(which)]; + } + + fn pollWindows(self: *Self, nanoseconds: ?u64) !bool { + const bump_amt = 512; + + if (!self.windows.first_read_done) { + var already_read_data = false; + for (0..enum_fields.len) |i| { + const handle = self.windows.active.handles_buf[i]; + switch (try windowsAsyncReadToFifoAndQueueSmallRead( + handle, + &self.windows.overlapped[i], + &self.fifos[i], + &self.windows.small_bufs[i], + bump_amt, + )) { + .populated, .empty => |state| { + if (state == .populated) already_read_data = true; + self.windows.active.handles_buf[self.windows.active.count] = handle; + self.windows.active.stream_map[self.windows.active.count] = @as(StreamEnum, @enumFromInt(i)); + self.windows.active.count += 1; + }, + .closed => {}, // don't add to the wait_objects list + .closed_populated => { + // don't add to the wait_objects list, but we did already get data + already_read_data = true; + }, + } + } + self.windows.first_read_done = true; + if (already_read_data) return true; + } + + while (true) { + if (self.windows.active.count == 0) return false; + + const status = windows.kernel32.WaitForMultipleObjects( + self.windows.active.count, + &self.windows.active.handles_buf, + 0, + if (nanoseconds) |ns| + @min(std.math.cast(u32, ns / std.time.ns_per_ms) orelse (windows.INFINITE - 1), windows.INFINITE - 1) + else + windows.INFINITE, + ); + if (status == windows.WAIT_FAILED) + return windows.unexpectedError(windows.GetLastError()); + if (status == windows.WAIT_TIMEOUT) + return true; + + if (status < windows.WAIT_OBJECT_0 or status > windows.WAIT_OBJECT_0 + enum_fields.len - 1) + unreachable; + + const active_idx = status - windows.WAIT_OBJECT_0; + + const stream_idx = @intFromEnum(self.windows.active.stream_map[active_idx]); + const handle = self.windows.active.handles_buf[active_idx]; + + const overlapped = &self.windows.overlapped[stream_idx]; + const stream_fifo = &self.fifos[stream_idx]; + const small_buf = &self.windows.small_bufs[stream_idx]; + + const num_bytes_read = switch (try windowsGetReadResult(handle, overlapped, false)) { + .success => |n| n, + .closed => { + self.windows.active.removeAt(active_idx); + continue; + }, + .aborted => unreachable, + }; + try stream_fifo.write(small_buf[0..num_bytes_read]); + + switch (try windowsAsyncReadToFifoAndQueueSmallRead( + handle, + overlapped, + stream_fifo, + small_buf, + bump_amt, + )) { + .empty => {}, // irrelevant, we already got data from the small buffer + .populated => {}, + .closed, + .closed_populated, // identical, since we already got data from the small buffer + => self.windows.active.removeAt(active_idx), + } + return true; + } + } + + fn pollPosix(self: *Self, nanoseconds: ?u64) !bool { + // We ask for ensureUnusedCapacity with this much extra space. This + // has more of an effect on small reads because once the reads + // start to get larger the amount of space an ArrayList will + // allocate grows exponentially. + const bump_amt = 512; + + const err_mask = posix.POLL.ERR | posix.POLL.NVAL | posix.POLL.HUP; + + const events_len = try posix.poll(&self.poll_fds, if (nanoseconds) |ns| + std.math.cast(i32, ns / std.time.ns_per_ms) orelse std.math.maxInt(i32) + else + -1); + if (events_len == 0) { + for (self.poll_fds) |poll_fd| { + if (poll_fd.fd != -1) return true; + } else return false; + } + + var keep_polling = false; + inline for (&self.poll_fds, &self.fifos) |*poll_fd, *q| { + // Try reading whatever is available before checking the error + // conditions. + // It's still possible to read after a POLL.HUP is received, + // always check if there's some data waiting to be read first. + if (poll_fd.revents & posix.POLL.IN != 0) { + const buf = try q.writableWithSize(bump_amt); + const amt = posix.read(poll_fd.fd, buf) catch |err| switch (err) { + error.BrokenPipe => 0, // Handle the same as EOF. + else => |e| return e, + }; + q.update(amt); + if (amt == 0) { + // Remove the fd when the EOF condition is met. + poll_fd.fd = -1; + } else { + keep_polling = true; + } + } else if (poll_fd.revents & err_mask != 0) { + // Exclude the fds that signaled an error. + poll_fd.fd = -1; + } else if (poll_fd.fd != -1) { + keep_polling = true; + } + } + return keep_polling; + } + }; +} + +/// The `ReadFile` docuementation states that `lpNumberOfBytesRead` does not have a meaningful +/// result when using overlapped I/O, but also that it cannot be `null` on Windows 7. For +/// compatibility, we point it to this dummy variables, which we never otherwise access. +/// See: https://learn.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-readfile +var win_dummy_bytes_read: u32 = undefined; + +/// Read as much data as possible from `handle` with `overlapped`, and write it to the FIFO. Before +/// returning, queue a read into `small_buf` so that `WaitForMultipleObjects` returns when more data +/// is available. `handle` must have no pending asynchronous operation. +fn windowsAsyncReadToFifoAndQueueSmallRead( + handle: windows.HANDLE, + overlapped: *windows.OVERLAPPED, + fifo: *PollFifo, + small_buf: *[128]u8, + bump_amt: usize, +) !enum { empty, populated, closed_populated, closed } { + var read_any_data = false; + while (true) { + const fifo_read_pending = while (true) { + const buf = try fifo.writableWithSize(bump_amt); + const buf_len = math.cast(u32, buf.len) orelse math.maxInt(u32); + + if (0 == windows.kernel32.ReadFile( + handle, + buf.ptr, + buf_len, + &win_dummy_bytes_read, + overlapped, + )) switch (windows.GetLastError()) { + .IO_PENDING => break true, + .BROKEN_PIPE => return if (read_any_data) .closed_populated else .closed, + else => |err| return windows.unexpectedError(err), + }; + + const num_bytes_read = switch (try windowsGetReadResult(handle, overlapped, false)) { + .success => |n| n, + .closed => return if (read_any_data) .closed_populated else .closed, + .aborted => unreachable, + }; + + read_any_data = true; + fifo.update(num_bytes_read); + + if (num_bytes_read == buf_len) { + // We filled the buffer, so there's probably more data available. + continue; + } else { + // We didn't fill the buffer, so assume we're out of data. + // There is no pending read. + break false; + } + }; + + if (fifo_read_pending) cancel_read: { + // Cancel the pending read into the FIFO. + _ = windows.kernel32.CancelIo(handle); + + // We have to wait for the handle to be signalled, i.e. for the cancellation to complete. + switch (windows.kernel32.WaitForSingleObject(handle, windows.INFINITE)) { + windows.WAIT_OBJECT_0 => {}, + windows.WAIT_FAILED => return windows.unexpectedError(windows.GetLastError()), + else => unreachable, + } + + // If it completed before we canceled, make sure to tell the FIFO! + const num_bytes_read = switch (try windowsGetReadResult(handle, overlapped, true)) { + .success => |n| n, + .closed => return if (read_any_data) .closed_populated else .closed, + .aborted => break :cancel_read, + }; + read_any_data = true; + fifo.update(num_bytes_read); + } + + // Try to queue the 1-byte read. + if (0 == windows.kernel32.ReadFile( + handle, + small_buf, + small_buf.len, + &win_dummy_bytes_read, + overlapped, + )) switch (windows.GetLastError()) { + .IO_PENDING => { + // 1-byte read pending as intended + return if (read_any_data) .populated else .empty; + }, + .BROKEN_PIPE => return if (read_any_data) .closed_populated else .closed, + else => |err| return windows.unexpectedError(err), + }; + + // We got data back this time. Write it to the FIFO and run the main loop again. + const num_bytes_read = switch (try windowsGetReadResult(handle, overlapped, false)) { + .success => |n| n, + .closed => return if (read_any_data) .closed_populated else .closed, + .aborted => unreachable, + }; + try fifo.write(small_buf[0..num_bytes_read]); + read_any_data = true; + } +} + +/// Simple wrapper around `GetOverlappedResult` to determine the result of a `ReadFile` operation. +/// If `!allow_aborted`, then `aborted` is never returned (`OPERATION_ABORTED` is considered unexpected). +/// +/// The `ReadFile` documentation states that the number of bytes read by an overlapped `ReadFile` must be determined using `GetOverlappedResult`, even if the +/// operation immediately returns data: +/// "Use NULL for [lpNumberOfBytesRead] if this is an asynchronous operation to avoid potentially +/// erroneous results." +/// "If `hFile` was opened with `FILE_FLAG_OVERLAPPED`, the following conditions are in effect: [...] +/// The lpNumberOfBytesRead parameter should be set to NULL. Use the GetOverlappedResult function to +/// get the actual number of bytes read." +/// See: https://learn.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-readfile +fn windowsGetReadResult( + handle: windows.HANDLE, + overlapped: *windows.OVERLAPPED, + allow_aborted: bool, +) !union(enum) { + success: u32, + closed, + aborted, +} { + var num_bytes_read: u32 = undefined; + if (0 == windows.kernel32.GetOverlappedResult( + handle, + overlapped, + &num_bytes_read, + 0, + )) switch (windows.GetLastError()) { + .BROKEN_PIPE => return .closed, + .OPERATION_ABORTED => |err| if (allow_aborted) { + return .aborted; + } else { + return windows.unexpectedError(err); + }, + else => |err| return windows.unexpectedError(err), + }; + return .{ .success = num_bytes_read }; +} + +/// Given an enum, returns a struct with fields of that enum, each field +/// representing an I/O stream for polling. +pub fn PollFiles(comptime StreamEnum: type) type { + const enum_fields = @typeInfo(StreamEnum).@"enum".fields; + var struct_fields: [enum_fields.len]std.builtin.Type.StructField = undefined; + for (&struct_fields, enum_fields) |*struct_field, enum_field| { + struct_field.* = .{ + .name = enum_field.name, + .type = fs.File, + .default_value_ptr = null, + .is_comptime = false, + .alignment = @alignOf(fs.File), + }; + } + return @Type(.{ .@"struct" = .{ + .layout = .auto, + .fields = &struct_fields, + .decls = &.{}, + .is_tuple = false, + } }); +} + +test { + _ = Reader; + _ = Writer; + _ = @import("Io/bit_reader.zig"); + _ = @import("Io/bit_writer.zig"); + _ = @import("Io/buffered_atomic_file.zig"); + _ = @import("Io/buffered_reader.zig"); + _ = @import("Io/buffered_writer.zig"); + _ = @import("Io/c_writer.zig"); + _ = @import("Io/counting_writer.zig"); + _ = @import("Io/counting_reader.zig"); + _ = @import("Io/fixed_buffer_stream.zig"); + _ = @import("Io/seekable_stream.zig"); + _ = @import("Io/stream_source.zig"); + _ = @import("Io/test.zig"); +} diff --git a/lib/std/io/DeprecatedReader.zig b/lib/std/Io/DeprecatedReader.zig diff --git a/lib/std/io/DeprecatedWriter.zig b/lib/std/Io/DeprecatedWriter.zig diff --git a/lib/std/io/Reader.zig b/lib/std/Io/Reader.zig diff --git a/lib/std/io/Reader/Limited.zig b/lib/std/Io/Reader/Limited.zig diff --git a/lib/std/io/Reader/test.zig b/lib/std/Io/Reader/test.zig diff --git a/lib/std/io/Writer.zig b/lib/std/Io/Writer.zig diff --git a/lib/std/io/bit_reader.zig b/lib/std/Io/bit_reader.zig diff --git a/lib/std/io/bit_writer.zig b/lib/std/Io/bit_writer.zig diff --git a/lib/std/io/buffered_atomic_file.zig b/lib/std/Io/buffered_atomic_file.zig diff --git a/lib/std/io/buffered_reader.zig b/lib/std/Io/buffered_reader.zig diff --git a/lib/std/io/buffered_writer.zig b/lib/std/Io/buffered_writer.zig diff --git a/lib/std/io/c_writer.zig b/lib/std/Io/c_writer.zig diff --git a/lib/std/io/change_detection_stream.zig b/lib/std/Io/change_detection_stream.zig diff --git a/lib/std/io/counting_reader.zig b/lib/std/Io/counting_reader.zig diff --git a/lib/std/io/counting_writer.zig b/lib/std/Io/counting_writer.zig diff --git a/lib/std/io/find_byte_writer.zig b/lib/std/Io/find_byte_writer.zig diff --git a/lib/std/io/fixed_buffer_stream.zig b/lib/std/Io/fixed_buffer_stream.zig diff --git a/lib/std/io/limited_reader.zig b/lib/std/Io/limited_reader.zig diff --git a/lib/std/io/multi_writer.zig b/lib/std/Io/multi_writer.zig diff --git a/lib/std/io/seekable_stream.zig b/lib/std/Io/seekable_stream.zig diff --git a/lib/std/io/stream_source.zig b/lib/std/Io/stream_source.zig diff --git a/lib/std/io/test.zig b/lib/std/Io/test.zig diff --git a/lib/std/io/tty.zig b/lib/std/Io/tty.zig diff --git a/lib/std/fs/path.zig b/lib/std/fs/path.zig @@ -227,8 +227,8 @@ test join { try testJoinMaybeZWindows(&[_][]const u8{ "c:\\a\\", "b\\", "c" }, "c:\\a\\b\\c", zero); try testJoinMaybeZWindows( - &[_][]const u8{ "c:\\home\\andy\\dev\\zig\\build\\lib\\zig\\std", "io.zig" }, - "c:\\home\\andy\\dev\\zig\\build\\lib\\zig\\std\\io.zig", + &[_][]const u8{ "c:\\home\\andy\\dev\\zig\\build\\lib\\zig\\std", "ab.zig" }, + "c:\\home\\andy\\dev\\zig\\build\\lib\\zig\\std\\ab.zig", zero, ); @@ -252,8 +252,8 @@ test join { try testJoinMaybeZPosix(&[_][]const u8{ "/a/", "b/", "c" }, "/a/b/c", zero); try testJoinMaybeZPosix( - &[_][]const u8{ "/home/andy/dev/zig/build/lib/zig/std", "io.zig" }, - "/home/andy/dev/zig/build/lib/zig/std/io.zig", + &[_][]const u8{ "/home/andy/dev/zig/build/lib/zig/std", "ab.zig" }, + "/home/andy/dev/zig/build/lib/zig/std/ab.zig", zero, ); diff --git a/lib/std/io.zig b/lib/std/io.zig @@ -1,884 +0,0 @@ -const std = @import("std.zig"); -const builtin = @import("builtin"); -const root = @import("root"); -const c = std.c; -const is_windows = builtin.os.tag == .windows; -const windows = std.os.windows; -const posix = std.posix; -const math = std.math; -const assert = std.debug.assert; -const fs = std.fs; -const mem = std.mem; -const meta = std.meta; -const File = std.fs.File; -const Allocator = std.mem.Allocator; -const Alignment = std.mem.Alignment; - -pub const Limit = enum(usize) { - nothing = 0, - unlimited = std.math.maxInt(usize), - _, - - /// `std.math.maxInt(usize)` is interpreted to mean `.unlimited`. - pub fn limited(n: usize) Limit { - return @enumFromInt(n); - } - - /// Any value grater than `std.math.maxInt(usize)` is interpreted to mean - /// `.unlimited`. - pub fn limited64(n: u64) Limit { - return @enumFromInt(@min(n, std.math.maxInt(usize))); - } - - pub fn countVec(data: []const []const u8) Limit { - var total: usize = 0; - for (data) |d| total += d.len; - return .limited(total); - } - - pub fn min(a: Limit, b: Limit) Limit { - return @enumFromInt(@min(@intFromEnum(a), @intFromEnum(b))); - } - - pub fn minInt(l: Limit, n: usize) usize { - return @min(n, @intFromEnum(l)); - } - - pub fn minInt64(l: Limit, n: u64) usize { - return @min(n, @intFromEnum(l)); - } - - pub fn slice(l: Limit, s: []u8) []u8 { - return s[0..l.minInt(s.len)]; - } - - pub fn sliceConst(l: Limit, s: []const u8) []const u8 { - return s[0..l.minInt(s.len)]; - } - - pub fn toInt(l: Limit) ?usize { - return switch (l) { - else => @intFromEnum(l), - .unlimited => null, - }; - } - - /// Reduces a slice to account for the limit, leaving room for one extra - /// byte above the limit, allowing for the use case of differentiating - /// between end-of-stream and reaching the limit. - pub fn slice1(l: Limit, non_empty_buffer: []u8) []u8 { - assert(non_empty_buffer.len >= 1); - return non_empty_buffer[0..@min(@intFromEnum(l) +| 1, non_empty_buffer.len)]; - } - - pub fn nonzero(l: Limit) bool { - return @intFromEnum(l) > 0; - } - - /// Return a new limit reduced by `amount` or return `null` indicating - /// limit would be exceeded. - pub fn subtract(l: Limit, amount: usize) ?Limit { - if (l == .unlimited) return .unlimited; - if (amount > @intFromEnum(l)) return null; - return @enumFromInt(@intFromEnum(l) - amount); - } -}; - -pub const Reader = @import("io/Reader.zig"); -pub const Writer = @import("io/Writer.zig"); - -/// Deprecated in favor of `Reader`. -pub fn GenericReader( - comptime Context: type, - comptime ReadError: type, - /// Returns the number of bytes read. It may be less than buffer.len. - /// If the number of bytes read is 0, it means end of stream. - /// End of stream is not an error condition. - comptime readFn: fn (context: Context, buffer: []u8) ReadError!usize, -) type { - return struct { - context: Context, - - pub const Error = ReadError; - pub const NoEofError = ReadError || error{ - EndOfStream, - }; - - pub inline fn read(self: Self, buffer: []u8) Error!usize { - return readFn(self.context, buffer); - } - - pub inline fn readAll(self: Self, buffer: []u8) Error!usize { - return @errorCast(self.any().readAll(buffer)); - } - - pub inline fn readAtLeast(self: Self, buffer: []u8, len: usize) Error!usize { - return @errorCast(self.any().readAtLeast(buffer, len)); - } - - pub inline fn readNoEof(self: Self, buf: []u8) NoEofError!void { - return @errorCast(self.any().readNoEof(buf)); - } - - pub inline fn readAllArrayList( - self: Self, - array_list: *std.ArrayList(u8), - max_append_size: usize, - ) (error{StreamTooLong} || Allocator.Error || Error)!void { - return @errorCast(self.any().readAllArrayList(array_list, max_append_size)); - } - - pub inline fn readAllArrayListAligned( - self: Self, - comptime alignment: ?Alignment, - array_list: *std.ArrayListAligned(u8, alignment), - max_append_size: usize, - ) (error{StreamTooLong} || Allocator.Error || Error)!void { - return @errorCast(self.any().readAllArrayListAligned( - alignment, - array_list, - max_append_size, - )); - } - - pub inline fn readAllAlloc( - self: Self, - allocator: Allocator, - max_size: usize, - ) (Error || Allocator.Error || error{StreamTooLong})![]u8 { - return @errorCast(self.any().readAllAlloc(allocator, max_size)); - } - - pub inline fn readUntilDelimiterArrayList( - self: Self, - array_list: *std.ArrayList(u8), - delimiter: u8, - max_size: usize, - ) (NoEofError || Allocator.Error || error{StreamTooLong})!void { - return @errorCast(self.any().readUntilDelimiterArrayList( - array_list, - delimiter, - max_size, - )); - } - - pub inline fn readUntilDelimiterAlloc( - self: Self, - allocator: Allocator, - delimiter: u8, - max_size: usize, - ) (NoEofError || Allocator.Error || error{StreamTooLong})![]u8 { - return @errorCast(self.any().readUntilDelimiterAlloc( - allocator, - delimiter, - max_size, - )); - } - - pub inline fn readUntilDelimiter( - self: Self, - buf: []u8, - delimiter: u8, - ) (NoEofError || error{StreamTooLong})![]u8 { - return @errorCast(self.any().readUntilDelimiter(buf, delimiter)); - } - - pub inline fn readUntilDelimiterOrEofAlloc( - self: Self, - allocator: Allocator, - delimiter: u8, - max_size: usize, - ) (Error || Allocator.Error || error{StreamTooLong})!?[]u8 { - return @errorCast(self.any().readUntilDelimiterOrEofAlloc( - allocator, - delimiter, - max_size, - )); - } - - pub inline fn readUntilDelimiterOrEof( - self: Self, - buf: []u8, - delimiter: u8, - ) (Error || error{StreamTooLong})!?[]u8 { - return @errorCast(self.any().readUntilDelimiterOrEof(buf, delimiter)); - } - - pub inline fn streamUntilDelimiter( - self: Self, - writer: anytype, - delimiter: u8, - optional_max_size: ?usize, - ) (NoEofError || error{StreamTooLong} || @TypeOf(writer).Error)!void { - return @errorCast(self.any().streamUntilDelimiter( - writer, - delimiter, - optional_max_size, - )); - } - - pub inline fn skipUntilDelimiterOrEof(self: Self, delimiter: u8) Error!void { - return @errorCast(self.any().skipUntilDelimiterOrEof(delimiter)); - } - - pub inline fn readByte(self: Self) NoEofError!u8 { - return @errorCast(self.any().readByte()); - } - - pub inline fn readByteSigned(self: Self) NoEofError!i8 { - return @errorCast(self.any().readByteSigned()); - } - - pub inline fn readBytesNoEof( - self: Self, - comptime num_bytes: usize, - ) NoEofError![num_bytes]u8 { - return @errorCast(self.any().readBytesNoEof(num_bytes)); - } - - pub inline fn readIntoBoundedBytes( - self: Self, - comptime num_bytes: usize, - bounded: *std.BoundedArray(u8, num_bytes), - ) Error!void { - return @errorCast(self.any().readIntoBoundedBytes(num_bytes, bounded)); - } - - pub inline fn readBoundedBytes( - self: Self, - comptime num_bytes: usize, - ) Error!std.BoundedArray(u8, num_bytes) { - return @errorCast(self.any().readBoundedBytes(num_bytes)); - } - - pub inline fn readInt(self: Self, comptime T: type, endian: std.builtin.Endian) NoEofError!T { - return @errorCast(self.any().readInt(T, endian)); - } - - pub inline fn readVarInt( - self: Self, - comptime ReturnType: type, - endian: std.builtin.Endian, - size: usize, - ) NoEofError!ReturnType { - return @errorCast(self.any().readVarInt(ReturnType, endian, size)); - } - - pub const SkipBytesOptions = AnyReader.SkipBytesOptions; - - pub inline fn skipBytes( - self: Self, - num_bytes: u64, - comptime options: SkipBytesOptions, - ) NoEofError!void { - return @errorCast(self.any().skipBytes(num_bytes, options)); - } - - pub inline fn isBytes(self: Self, slice: []const u8) NoEofError!bool { - return @errorCast(self.any().isBytes(slice)); - } - - pub inline fn readStruct(self: Self, comptime T: type) NoEofError!T { - return @errorCast(self.any().readStruct(T)); - } - - pub inline fn readStructEndian(self: Self, comptime T: type, endian: std.builtin.Endian) NoEofError!T { - return @errorCast(self.any().readStructEndian(T, endian)); - } - - pub const ReadEnumError = NoEofError || error{ - /// An integer was read, but it did not match any of the tags in the supplied enum. - InvalidValue, - }; - - pub inline fn readEnum( - self: Self, - comptime Enum: type, - endian: std.builtin.Endian, - ) ReadEnumError!Enum { - return @errorCast(self.any().readEnum(Enum, endian)); - } - - pub inline fn any(self: *const Self) AnyReader { - return .{ - .context = @ptrCast(&self.context), - .readFn = typeErasedReadFn, - }; - } - - const Self = @This(); - - fn typeErasedReadFn(context: *const anyopaque, buffer: []u8) anyerror!usize { - const ptr: *const Context = @alignCast(@ptrCast(context)); - return readFn(ptr.*, buffer); - } - }; -} - -/// Deprecated in favor of `Writer`. -pub fn GenericWriter( - comptime Context: type, - comptime WriteError: type, - comptime writeFn: fn (context: Context, bytes: []const u8) WriteError!usize, -) type { - return struct { - context: Context, - - const Self = @This(); - pub const Error = WriteError; - - pub inline fn write(self: Self, bytes: []const u8) Error!usize { - return writeFn(self.context, bytes); - } - - pub inline fn writeAll(self: Self, bytes: []const u8) Error!void { - return @errorCast(self.any().writeAll(bytes)); - } - - pub inline fn print(self: Self, comptime format: []const u8, args: anytype) Error!void { - return @errorCast(self.any().print(format, args)); - } - - pub inline fn writeByte(self: Self, byte: u8) Error!void { - return @errorCast(self.any().writeByte(byte)); - } - - pub inline fn writeByteNTimes(self: Self, byte: u8, n: usize) Error!void { - return @errorCast(self.any().writeByteNTimes(byte, n)); - } - - pub inline fn writeBytesNTimes(self: Self, bytes: []const u8, n: usize) Error!void { - return @errorCast(self.any().writeBytesNTimes(bytes, n)); - } - - pub inline fn writeInt(self: Self, comptime T: type, value: T, endian: std.builtin.Endian) Error!void { - return @errorCast(self.any().writeInt(T, value, endian)); - } - - pub inline fn writeStruct(self: Self, value: anytype) Error!void { - return @errorCast(self.any().writeStruct(value)); - } - - pub inline fn writeStructEndian(self: Self, value: anytype, endian: std.builtin.Endian) Error!void { - return @errorCast(self.any().writeStructEndian(value, endian)); - } - - pub inline fn any(self: *const Self) AnyWriter { - return .{ - .context = @ptrCast(&self.context), - .writeFn = typeErasedWriteFn, - }; - } - - fn typeErasedWriteFn(context: *const anyopaque, bytes: []const u8) anyerror!usize { - const ptr: *const Context = @alignCast(@ptrCast(context)); - return writeFn(ptr.*, bytes); - } - - /// Helper for bridging to the new `Writer` API while upgrading. - pub fn adaptToNewApi(self: *const Self) Adapter { - return .{ - .derp_writer = self.*, - .new_interface = .{ - .buffer = &.{}, - .vtable = &.{ .drain = Adapter.drain }, - }, - }; - } - - pub const Adapter = struct { - derp_writer: Self, - new_interface: Writer, - err: ?Error = null, - - fn drain(w: *Writer, data: []const []const u8, splat: usize) Writer.Error!usize { - _ = splat; - const a: *@This() = @fieldParentPtr("new_interface", w); - return a.derp_writer.write(data[0]) catch |err| { - a.err = err; - return error.WriteFailed; - }; - } - }; - }; -} - -/// Deprecated in favor of `Reader`. -pub const AnyReader = @import("io/DeprecatedReader.zig"); -/// Deprecated in favor of `Writer`. -pub const AnyWriter = @import("io/DeprecatedWriter.zig"); - -pub const SeekableStream = @import("io/seekable_stream.zig").SeekableStream; - -pub const BufferedWriter = @import("io/buffered_writer.zig").BufferedWriter; -pub const bufferedWriter = @import("io/buffered_writer.zig").bufferedWriter; - -pub const BufferedReader = @import("io/buffered_reader.zig").BufferedReader; -pub const bufferedReader = @import("io/buffered_reader.zig").bufferedReader; -pub const bufferedReaderSize = @import("io/buffered_reader.zig").bufferedReaderSize; - -pub const FixedBufferStream = @import("io/fixed_buffer_stream.zig").FixedBufferStream; -pub const fixedBufferStream = @import("io/fixed_buffer_stream.zig").fixedBufferStream; - -pub const CWriter = @import("io/c_writer.zig").CWriter; -pub const cWriter = @import("io/c_writer.zig").cWriter; - -pub const LimitedReader = @import("io/limited_reader.zig").LimitedReader; -pub const limitedReader = @import("io/limited_reader.zig").limitedReader; - -pub const CountingWriter = @import("io/counting_writer.zig").CountingWriter; -pub const countingWriter = @import("io/counting_writer.zig").countingWriter; -pub const CountingReader = @import("io/counting_reader.zig").CountingReader; -pub const countingReader = @import("io/counting_reader.zig").countingReader; - -pub const MultiWriter = @import("io/multi_writer.zig").MultiWriter; -pub const multiWriter = @import("io/multi_writer.zig").multiWriter; - -pub const BitReader = @import("io/bit_reader.zig").BitReader; -pub const bitReader = @import("io/bit_reader.zig").bitReader; - -pub const BitWriter = @import("io/bit_writer.zig").BitWriter; -pub const bitWriter = @import("io/bit_writer.zig").bitWriter; - -pub const ChangeDetectionStream = @import("io/change_detection_stream.zig").ChangeDetectionStream; -pub const changeDetectionStream = @import("io/change_detection_stream.zig").changeDetectionStream; - -pub const FindByteWriter = @import("io/find_byte_writer.zig").FindByteWriter; -pub const findByteWriter = @import("io/find_byte_writer.zig").findByteWriter; - -pub const BufferedAtomicFile = @import("io/buffered_atomic_file.zig").BufferedAtomicFile; - -pub const StreamSource = @import("io/stream_source.zig").StreamSource; - -pub const tty = @import("io/tty.zig"); - -/// A Writer that doesn't write to anything. -pub const null_writer: NullWriter = .{ .context = {} }; - -pub const NullWriter = GenericWriter(void, error{}, dummyWrite); -fn dummyWrite(context: void, data: []const u8) error{}!usize { - _ = context; - return data.len; -} - -test null_writer { - null_writer.writeAll("yay" ** 10) catch |err| switch (err) {}; -} - -pub fn poll( - allocator: Allocator, - comptime StreamEnum: type, - files: PollFiles(StreamEnum), -) Poller(StreamEnum) { - const enum_fields = @typeInfo(StreamEnum).@"enum".fields; - var result: Poller(StreamEnum) = undefined; - - if (is_windows) result.windows = .{ - .first_read_done = false, - .overlapped = [1]windows.OVERLAPPED{ - mem.zeroes(windows.OVERLAPPED), - } ** enum_fields.len, - .small_bufs = undefined, - .active = .{ - .count = 0, - .handles_buf = undefined, - .stream_map = undefined, - }, - }; - - inline for (0..enum_fields.len) |i| { - result.fifos[i] = .{ - .allocator = allocator, - .buf = &.{}, - .head = 0, - .count = 0, - }; - if (is_windows) { - result.windows.active.handles_buf[i] = @field(files, enum_fields[i].name).handle; - } else { - result.poll_fds[i] = .{ - .fd = @field(files, enum_fields[i].name).handle, - .events = posix.POLL.IN, - .revents = undefined, - }; - } - } - return result; -} - -pub const PollFifo = std.fifo.LinearFifo(u8, .Dynamic); - -pub fn Poller(comptime StreamEnum: type) type { - return struct { - const enum_fields = @typeInfo(StreamEnum).@"enum".fields; - const PollFd = if (is_windows) void else posix.pollfd; - - fifos: [enum_fields.len]PollFifo, - poll_fds: [enum_fields.len]PollFd, - windows: if (is_windows) struct { - first_read_done: bool, - overlapped: [enum_fields.len]windows.OVERLAPPED, - small_bufs: [enum_fields.len][128]u8, - active: struct { - count: math.IntFittingRange(0, enum_fields.len), - handles_buf: [enum_fields.len]windows.HANDLE, - stream_map: [enum_fields.len]StreamEnum, - - pub fn removeAt(self: *@This(), index: u32) void { - std.debug.assert(index < self.count); - for (index + 1..self.count) |i| { - self.handles_buf[i - 1] = self.handles_buf[i]; - self.stream_map[i - 1] = self.stream_map[i]; - } - self.count -= 1; - } - }, - } else void, - - const Self = @This(); - - pub fn deinit(self: *Self) void { - if (is_windows) { - // cancel any pending IO to prevent clobbering OVERLAPPED value - for (self.windows.active.handles_buf[0..self.windows.active.count]) |h| { - _ = windows.kernel32.CancelIo(h); - } - } - inline for (&self.fifos) |*q| q.deinit(); - self.* = undefined; - } - - pub fn poll(self: *Self) !bool { - if (is_windows) { - return pollWindows(self, null); - } else { - return pollPosix(self, null); - } - } - - pub fn pollTimeout(self: *Self, nanoseconds: u64) !bool { - if (is_windows) { - return pollWindows(self, nanoseconds); - } else { - return pollPosix(self, nanoseconds); - } - } - - pub inline fn fifo(self: *Self, comptime which: StreamEnum) *PollFifo { - return &self.fifos[@intFromEnum(which)]; - } - - fn pollWindows(self: *Self, nanoseconds: ?u64) !bool { - const bump_amt = 512; - - if (!self.windows.first_read_done) { - var already_read_data = false; - for (0..enum_fields.len) |i| { - const handle = self.windows.active.handles_buf[i]; - switch (try windowsAsyncReadToFifoAndQueueSmallRead( - handle, - &self.windows.overlapped[i], - &self.fifos[i], - &self.windows.small_bufs[i], - bump_amt, - )) { - .populated, .empty => |state| { - if (state == .populated) already_read_data = true; - self.windows.active.handles_buf[self.windows.active.count] = handle; - self.windows.active.stream_map[self.windows.active.count] = @as(StreamEnum, @enumFromInt(i)); - self.windows.active.count += 1; - }, - .closed => {}, // don't add to the wait_objects list - .closed_populated => { - // don't add to the wait_objects list, but we did already get data - already_read_data = true; - }, - } - } - self.windows.first_read_done = true; - if (already_read_data) return true; - } - - while (true) { - if (self.windows.active.count == 0) return false; - - const status = windows.kernel32.WaitForMultipleObjects( - self.windows.active.count, - &self.windows.active.handles_buf, - 0, - if (nanoseconds) |ns| - @min(std.math.cast(u32, ns / std.time.ns_per_ms) orelse (windows.INFINITE - 1), windows.INFINITE - 1) - else - windows.INFINITE, - ); - if (status == windows.WAIT_FAILED) - return windows.unexpectedError(windows.GetLastError()); - if (status == windows.WAIT_TIMEOUT) - return true; - - if (status < windows.WAIT_OBJECT_0 or status > windows.WAIT_OBJECT_0 + enum_fields.len - 1) - unreachable; - - const active_idx = status - windows.WAIT_OBJECT_0; - - const stream_idx = @intFromEnum(self.windows.active.stream_map[active_idx]); - const handle = self.windows.active.handles_buf[active_idx]; - - const overlapped = &self.windows.overlapped[stream_idx]; - const stream_fifo = &self.fifos[stream_idx]; - const small_buf = &self.windows.small_bufs[stream_idx]; - - const num_bytes_read = switch (try windowsGetReadResult(handle, overlapped, false)) { - .success => |n| n, - .closed => { - self.windows.active.removeAt(active_idx); - continue; - }, - .aborted => unreachable, - }; - try stream_fifo.write(small_buf[0..num_bytes_read]); - - switch (try windowsAsyncReadToFifoAndQueueSmallRead( - handle, - overlapped, - stream_fifo, - small_buf, - bump_amt, - )) { - .empty => {}, // irrelevant, we already got data from the small buffer - .populated => {}, - .closed, - .closed_populated, // identical, since we already got data from the small buffer - => self.windows.active.removeAt(active_idx), - } - return true; - } - } - - fn pollPosix(self: *Self, nanoseconds: ?u64) !bool { - // We ask for ensureUnusedCapacity with this much extra space. This - // has more of an effect on small reads because once the reads - // start to get larger the amount of space an ArrayList will - // allocate grows exponentially. - const bump_amt = 512; - - const err_mask = posix.POLL.ERR | posix.POLL.NVAL | posix.POLL.HUP; - - const events_len = try posix.poll(&self.poll_fds, if (nanoseconds) |ns| - std.math.cast(i32, ns / std.time.ns_per_ms) orelse std.math.maxInt(i32) - else - -1); - if (events_len == 0) { - for (self.poll_fds) |poll_fd| { - if (poll_fd.fd != -1) return true; - } else return false; - } - - var keep_polling = false; - inline for (&self.poll_fds, &self.fifos) |*poll_fd, *q| { - // Try reading whatever is available before checking the error - // conditions. - // It's still possible to read after a POLL.HUP is received, - // always check if there's some data waiting to be read first. - if (poll_fd.revents & posix.POLL.IN != 0) { - const buf = try q.writableWithSize(bump_amt); - const amt = posix.read(poll_fd.fd, buf) catch |err| switch (err) { - error.BrokenPipe => 0, // Handle the same as EOF. - else => |e| return e, - }; - q.update(amt); - if (amt == 0) { - // Remove the fd when the EOF condition is met. - poll_fd.fd = -1; - } else { - keep_polling = true; - } - } else if (poll_fd.revents & err_mask != 0) { - // Exclude the fds that signaled an error. - poll_fd.fd = -1; - } else if (poll_fd.fd != -1) { - keep_polling = true; - } - } - return keep_polling; - } - }; -} - -/// The `ReadFile` docuementation states that `lpNumberOfBytesRead` does not have a meaningful -/// result when using overlapped I/O, but also that it cannot be `null` on Windows 7. For -/// compatibility, we point it to this dummy variables, which we never otherwise access. -/// See: https://learn.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-readfile -var win_dummy_bytes_read: u32 = undefined; - -/// Read as much data as possible from `handle` with `overlapped`, and write it to the FIFO. Before -/// returning, queue a read into `small_buf` so that `WaitForMultipleObjects` returns when more data -/// is available. `handle` must have no pending asynchronous operation. -fn windowsAsyncReadToFifoAndQueueSmallRead( - handle: windows.HANDLE, - overlapped: *windows.OVERLAPPED, - fifo: *PollFifo, - small_buf: *[128]u8, - bump_amt: usize, -) !enum { empty, populated, closed_populated, closed } { - var read_any_data = false; - while (true) { - const fifo_read_pending = while (true) { - const buf = try fifo.writableWithSize(bump_amt); - const buf_len = math.cast(u32, buf.len) orelse math.maxInt(u32); - - if (0 == windows.kernel32.ReadFile( - handle, - buf.ptr, - buf_len, - &win_dummy_bytes_read, - overlapped, - )) switch (windows.GetLastError()) { - .IO_PENDING => break true, - .BROKEN_PIPE => return if (read_any_data) .closed_populated else .closed, - else => |err| return windows.unexpectedError(err), - }; - - const num_bytes_read = switch (try windowsGetReadResult(handle, overlapped, false)) { - .success => |n| n, - .closed => return if (read_any_data) .closed_populated else .closed, - .aborted => unreachable, - }; - - read_any_data = true; - fifo.update(num_bytes_read); - - if (num_bytes_read == buf_len) { - // We filled the buffer, so there's probably more data available. - continue; - } else { - // We didn't fill the buffer, so assume we're out of data. - // There is no pending read. - break false; - } - }; - - if (fifo_read_pending) cancel_read: { - // Cancel the pending read into the FIFO. - _ = windows.kernel32.CancelIo(handle); - - // We have to wait for the handle to be signalled, i.e. for the cancellation to complete. - switch (windows.kernel32.WaitForSingleObject(handle, windows.INFINITE)) { - windows.WAIT_OBJECT_0 => {}, - windows.WAIT_FAILED => return windows.unexpectedError(windows.GetLastError()), - else => unreachable, - } - - // If it completed before we canceled, make sure to tell the FIFO! - const num_bytes_read = switch (try windowsGetReadResult(handle, overlapped, true)) { - .success => |n| n, - .closed => return if (read_any_data) .closed_populated else .closed, - .aborted => break :cancel_read, - }; - read_any_data = true; - fifo.update(num_bytes_read); - } - - // Try to queue the 1-byte read. - if (0 == windows.kernel32.ReadFile( - handle, - small_buf, - small_buf.len, - &win_dummy_bytes_read, - overlapped, - )) switch (windows.GetLastError()) { - .IO_PENDING => { - // 1-byte read pending as intended - return if (read_any_data) .populated else .empty; - }, - .BROKEN_PIPE => return if (read_any_data) .closed_populated else .closed, - else => |err| return windows.unexpectedError(err), - }; - - // We got data back this time. Write it to the FIFO and run the main loop again. - const num_bytes_read = switch (try windowsGetReadResult(handle, overlapped, false)) { - .success => |n| n, - .closed => return if (read_any_data) .closed_populated else .closed, - .aborted => unreachable, - }; - try fifo.write(small_buf[0..num_bytes_read]); - read_any_data = true; - } -} - -/// Simple wrapper around `GetOverlappedResult` to determine the result of a `ReadFile` operation. -/// If `!allow_aborted`, then `aborted` is never returned (`OPERATION_ABORTED` is considered unexpected). -/// -/// The `ReadFile` documentation states that the number of bytes read by an overlapped `ReadFile` must be determined using `GetOverlappedResult`, even if the -/// operation immediately returns data: -/// "Use NULL for [lpNumberOfBytesRead] if this is an asynchronous operation to avoid potentially -/// erroneous results." -/// "If `hFile` was opened with `FILE_FLAG_OVERLAPPED`, the following conditions are in effect: [...] -/// The lpNumberOfBytesRead parameter should be set to NULL. Use the GetOverlappedResult function to -/// get the actual number of bytes read." -/// See: https://learn.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-readfile -fn windowsGetReadResult( - handle: windows.HANDLE, - overlapped: *windows.OVERLAPPED, - allow_aborted: bool, -) !union(enum) { - success: u32, - closed, - aborted, -} { - var num_bytes_read: u32 = undefined; - if (0 == windows.kernel32.GetOverlappedResult( - handle, - overlapped, - &num_bytes_read, - 0, - )) switch (windows.GetLastError()) { - .BROKEN_PIPE => return .closed, - .OPERATION_ABORTED => |err| if (allow_aborted) { - return .aborted; - } else { - return windows.unexpectedError(err); - }, - else => |err| return windows.unexpectedError(err), - }; - return .{ .success = num_bytes_read }; -} - -/// Given an enum, returns a struct with fields of that enum, each field -/// representing an I/O stream for polling. -pub fn PollFiles(comptime StreamEnum: type) type { - const enum_fields = @typeInfo(StreamEnum).@"enum".fields; - var struct_fields: [enum_fields.len]std.builtin.Type.StructField = undefined; - for (&struct_fields, enum_fields) |*struct_field, enum_field| { - struct_field.* = .{ - .name = enum_field.name, - .type = fs.File, - .default_value_ptr = null, - .is_comptime = false, - .alignment = @alignOf(fs.File), - }; - } - return @Type(.{ .@"struct" = .{ - .layout = .auto, - .fields = &struct_fields, - .decls = &.{}, - .is_tuple = false, - } }); -} - -test { - _ = Reader; - _ = Writer; - _ = @import("io/bit_reader.zig"); - _ = @import("io/bit_writer.zig"); - _ = @import("io/buffered_atomic_file.zig"); - _ = @import("io/buffered_reader.zig"); - _ = @import("io/buffered_writer.zig"); - _ = @import("io/c_writer.zig"); - _ = @import("io/counting_writer.zig"); - _ = @import("io/counting_reader.zig"); - _ = @import("io/fixed_buffer_stream.zig"); - _ = @import("io/seekable_stream.zig"); - _ = @import("io/stream_source.zig"); - _ = @import("io/test.zig"); -} diff --git a/lib/std/std.zig b/lib/std/std.zig @@ -25,6 +25,7 @@ pub const EnumMap = enums.EnumMap; pub const EnumSet = enums.EnumSet; pub const HashMap = hash_map.HashMap; pub const HashMapUnmanaged = hash_map.HashMapUnmanaged; +pub const Io = @import("Io.zig"); pub const MultiArrayList = @import("multi_array_list.zig").MultiArrayList; pub const PriorityQueue = @import("priority_queue.zig").PriorityQueue; pub const PriorityDequeue = @import("priority_dequeue.zig").PriorityDequeue; @@ -67,7 +68,8 @@ pub const hash = @import("hash.zig"); pub const hash_map = @import("hash_map.zig"); pub const heap = @import("heap.zig"); pub const http = @import("http.zig"); -pub const io = @import("io.zig"); +/// Deprecated +pub const io = Io; pub const json = @import("json.zig"); pub const leb = @import("leb128.zig"); pub const log = @import("log.zig");