zig

fork of https://codeberg.org/ziglang/zig
Log | Files | Refs | README | LICENSE

commit 0b5bcd2f56a84e66d5c700744ec1838381893667 (tree)
parent 704cd977bdcdfa8cff4e70aaad93857d9b622fc7
Author: Andrew Kelley <andrew@ziglang.org>
Date:   Thu,  6 Feb 2020 17:56:40 -0500

more std lib async I/O integration

 * `zig test` gainst `--test-evented-io` parameter and gains the ability
   to seamlessly run async tests.
 * `std.ChildProcess` opens its child process pipe with O_NONBLOCK when
   using evented I/O
 * `std.io.getStdErr()` gives a File that is blocking even in evented
   I/O mode.
 * Delete `std.event.fs`. The functionality is now merged into `std.fs`
   and async file system access (using a dedicated thread) is
   automatically handled.
 * `std.fs.File` can be configured to specify whether its handle is
   expected to block, and whether that is OK to block even when in
   async I/O mode. This makes async I/O work correctly for e.g. the
   file system as well as network.
 * `std.fs.File` has some deprecated functions removed.
 * Missing readv,writev,pread,pwrite,preadv,pwritev functions are added
   to `std.os` and `std.fs.File`. They are all integrated with async
   I/O.
 * `std.fs.Watch` is still bit rotted and needs to be audited in light
   of the new async/await syntax.
 * `std.io.OutStream` integrates with async I/O
 * linked list nodes in the std lib have default `null` values for
   `prev` and `next`.
 * Windows async I/O integration is enabled for reading/writing file
   handles.
 * Added `std.os.mode_t`. Integer sizes need to be audited.
 * Fixed #4403 which was causing compiler to crash.

This is working towards:

./zig test ../test/stage1/behavior.zig --test-evented-io

Which does not successfully build yet. I'd like to enable behavioral
tests and std lib tests with --test-evented-io in the test matrix in the
future, to prevent regressions.

Diffstat:
Mlib/std/builtin.zig | 1+
Mlib/std/child_process.zig | 59++++++++++++++++++++++++++++++++++++++++++++---------------
Mlib/std/debug.zig | 48++++++++++++++++++++++++------------------------
Mlib/std/event.zig | 2--
Dlib/std/event/fs.zig | 1418-------------------------------------------------------------------------------
Mlib/std/event/loop.zig | 367++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-----------
Mlib/std/fs.zig | 37+++++++++++++++++++++++++++++++------
Mlib/std/fs/file.zig | 151++++++++++++++++++++++++++++++++++++++++---------------------------------------
Alib/std/fs/watch.zig | 673+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mlib/std/io.zig | 16+++++++++++++---
Mlib/std/io/out_stream.zig | 24++++++++++--------------
Mlib/std/linked_list.zig | 9+++------
Mlib/std/os.zig | 205+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++----------
Mlib/std/os/bits/darwin.zig | 1+
Mlib/std/os/bits/dragonfly.zig | 1+
Mlib/std/os/bits/freebsd.zig | 1+
Mlib/std/os/bits/linux/x86_64.zig | 2++
Mlib/std/os/bits/netbsd.zig | 1+
Mlib/std/os/bits/wasi.zig | 1+
Mlib/std/os/bits/windows.zig | 1+
Mlib/std/os/windows.zig | 157++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---------------
Mlib/std/special/test_runner.zig | 26++++++++++++++++++++++++--
Msrc/analyze.cpp | 20++++++++++++++++----
Msrc/codegen.cpp | 42++++++++++++++++++++++++------------------
24 files changed, 1572 insertions(+), 1691 deletions(-)

diff --git a/lib/std/builtin.zig b/lib/std/builtin.zig @@ -458,6 +458,7 @@ pub const ExportOptions = struct { pub const TestFn = struct { name: []const u8, func: fn () anyerror!void, + async_frame_size: ?usize, }; /// This function type is used by the Zig language code generation and diff --git a/lib/std/child_process.zig b/lib/std/child_process.zig @@ -329,17 +329,18 @@ pub const ChildProcess = struct { } fn spawnPosix(self: *ChildProcess) SpawnError!void { - const stdin_pipe = if (self.stdin_behavior == StdIo.Pipe) try os.pipe() else undefined; + const pipe_flags = if (io.is_async) os.O_NONBLOCK else 0; + const stdin_pipe = if (self.stdin_behavior == StdIo.Pipe) try os.pipe2(pipe_flags) else undefined; errdefer if (self.stdin_behavior == StdIo.Pipe) { destroyPipe(stdin_pipe); }; - const stdout_pipe = if (self.stdout_behavior == StdIo.Pipe) try os.pipe() else undefined; + const stdout_pipe = if (self.stdout_behavior == StdIo.Pipe) try os.pipe2(pipe_flags) else undefined; errdefer if (self.stdout_behavior == StdIo.Pipe) { destroyPipe(stdout_pipe); }; - const stderr_pipe = if (self.stderr_behavior == StdIo.Pipe) try os.pipe() else undefined; + const stderr_pipe = if (self.stderr_behavior == StdIo.Pipe) try os.pipe2(pipe_flags) else undefined; errdefer if (self.stderr_behavior == StdIo.Pipe) { destroyPipe(stderr_pipe); }; @@ -426,17 +427,26 @@ pub const ChildProcess = struct { // we are the parent const pid = @intCast(i32, pid_result); if (self.stdin_behavior == StdIo.Pipe) { - self.stdin = File.openHandle(stdin_pipe[1]); + self.stdin = File{ + .handle = stdin_pipe[1], + .io_mode = std.io.mode, + }; } else { self.stdin = null; } if (self.stdout_behavior == StdIo.Pipe) { - self.stdout = File.openHandle(stdout_pipe[0]); + self.stdout = File{ + .handle = stdout_pipe[0], + .io_mode = std.io.mode, + }; } else { self.stdout = null; } if (self.stderr_behavior == StdIo.Pipe) { - self.stderr = File.openHandle(stderr_pipe[0]); + self.stderr = File{ + .handle = stderr_pipe[0], + .io_mode = std.io.mode, + }; } else { self.stderr = null; } @@ -661,17 +671,26 @@ pub const ChildProcess = struct { }; if (g_hChildStd_IN_Wr) |h| { - self.stdin = File.openHandle(h); + self.stdin = File{ + .handle = h, + .io_mode = io.mode, + }; } else { self.stdin = null; } if (g_hChildStd_OUT_Rd) |h| { - self.stdout = File.openHandle(h); + self.stdout = File{ + .handle = h, + .io_mode = io.mode, + }; } else { self.stdout = null; } if (g_hChildStd_ERR_Rd) |h| { - self.stderr = File.openHandle(h); + self.stderr = File{ + .handle = h, + .io_mode = io.mode, + }; } else { self.stderr = null; } @@ -693,10 +712,10 @@ pub const ChildProcess = struct { fn setUpChildIo(stdio: StdIo, pipe_fd: i32, std_fileno: i32, dev_null_fd: i32) !void { switch (stdio) { - StdIo.Pipe => try os.dup2(pipe_fd, std_fileno), - StdIo.Close => os.close(std_fileno), - StdIo.Inherit => {}, - StdIo.Ignore => try os.dup2(dev_null_fd, std_fileno), + .Pipe => try os.dup2(pipe_fd, std_fileno), + .Close => os.close(std_fileno), + .Inherit => {}, + .Ignore => try os.dup2(dev_null_fd, std_fileno), } } }; @@ -811,12 +830,22 @@ fn forkChildErrReport(fd: i32, err: ChildProcess.SpawnError) noreturn { const ErrInt = @IntType(false, @sizeOf(anyerror) * 8); fn writeIntFd(fd: i32, value: ErrInt) !void { - const stream = &File.openHandle(fd).outStream().stream; + const file = File{ + .handle = fd, + .io_mode = .blocking, + .async_block_allowed = File.async_block_allowed_yes, + }; + const stream = &file.outStream().stream; stream.writeIntNative(u64, @intCast(u64, value)) catch return error.SystemResources; } fn readIntFd(fd: i32) !ErrInt { - const stream = &File.openHandle(fd).inStream().stream; + const file = File{ + .handle = fd, + .io_mode = .blocking, + .async_block_allowed = File.async_block_allowed_yes, + }; + const stream = &file.inStream().stream; return @intCast(ErrInt, stream.readIntNative(u64) catch return error.SystemResources); } diff --git a/lib/std/debug.zig b/lib/std/debug.zig @@ -50,7 +50,7 @@ pub fn warn(comptime fmt: []const u8, args: var) void { const held = stderr_mutex.acquire(); defer held.release(); const stderr = getStderrStream(); - stderr.print(fmt, args) catch return; + noasync stderr.print(fmt, args) catch return; } pub fn getStderrStream() *io.OutStream(File.WriteError) { @@ -102,15 +102,15 @@ pub fn detectTTYConfig() TTY.Config { pub fn dumpCurrentStackTrace(start_addr: ?usize) void { const stderr = getStderrStream(); if (builtin.strip_debug_info) { - stderr.print("Unable to dump stack trace: debug info stripped\n", .{}) catch return; + noasync stderr.print("Unable to dump stack trace: debug info stripped\n", .{}) catch return; return; } const debug_info = getSelfDebugInfo() catch |err| { - stderr.print("Unable to dump stack trace: Unable to open debug info: {}\n", .{@errorName(err)}) catch return; + noasync stderr.print("Unable to dump stack trace: Unable to open debug info: {}\n", .{@errorName(err)}) catch return; return; }; writeCurrentStackTrace(stderr, debug_info, detectTTYConfig(), start_addr) catch |err| { - stderr.print("Unable to dump stack trace: {}\n", .{@errorName(err)}) catch return; + noasync stderr.print("Unable to dump stack trace: {}\n", .{@errorName(err)}) catch return; return; }; } @@ -121,11 +121,11 @@ pub fn dumpCurrentStackTrace(start_addr: ?usize) void { pub fn dumpStackTraceFromBase(bp: usize, ip: usize) void { const stderr = getStderrStream(); if (builtin.strip_debug_info) { - stderr.print("Unable to dump stack trace: debug info stripped\n", .{}) catch return; + noasync stderr.print("Unable to dump stack trace: debug info stripped\n", .{}) catch return; return; } const debug_info = getSelfDebugInfo() catch |err| { - stderr.print("Unable to dump stack trace: Unable to open debug info: {}\n", .{@errorName(err)}) catch return; + noasync stderr.print("Unable to dump stack trace: Unable to open debug info: {}\n", .{@errorName(err)}) catch return; return; }; const tty_config = detectTTYConfig(); @@ -195,15 +195,15 @@ pub fn captureStackTrace(first_address: ?usize, stack_trace: *builtin.StackTrace pub fn dumpStackTrace(stack_trace: builtin.StackTrace) void { const stderr = getStderrStream(); if (builtin.strip_debug_info) { - stderr.print("Unable to dump stack trace: debug info stripped\n", .{}) catch return; + noasync stderr.print("Unable to dump stack trace: debug info stripped\n", .{}) catch return; return; } const debug_info = getSelfDebugInfo() catch |err| { - stderr.print("Unable to dump stack trace: Unable to open debug info: {}\n", .{@errorName(err)}) catch return; + noasync stderr.print("Unable to dump stack trace: Unable to open debug info: {}\n", .{@errorName(err)}) catch return; return; }; writeStackTrace(stack_trace, stderr, getDebugInfoAllocator(), debug_info, detectTTYConfig()) catch |err| { - stderr.print("Unable to dump stack trace: {}\n", .{@errorName(err)}) catch return; + noasync stderr.print("Unable to dump stack trace: {}\n", .{@errorName(err)}) catch return; return; }; } @@ -244,7 +244,7 @@ pub fn panicExtra(trace: ?*const builtin.StackTrace, first_trace_addr: ?usize, c switch (@atomicRmw(u8, &panicking, .Add, 1, .SeqCst)) { 0 => { const stderr = getStderrStream(); - stderr.print(format ++ "\n", args) catch os.abort(); + noasync stderr.print(format ++ "\n", args) catch os.abort(); if (trace) |t| { dumpStackTrace(t.*); } @@ -556,12 +556,12 @@ pub const TTY = struct { switch (conf) { .no_color => return, .escape_codes => switch (color) { - .Red => out_stream.write(RED) catch return, - .Green => out_stream.write(GREEN) catch return, - .Cyan => out_stream.write(CYAN) catch return, - .White, .Bold => out_stream.write(WHITE) catch return, - .Dim => out_stream.write(DIM) catch return, - .Reset => out_stream.write(RESET) catch return, + .Red => noasync out_stream.write(RED) catch return, + .Green => noasync out_stream.write(GREEN) catch return, + .Cyan => noasync out_stream.write(CYAN) catch return, + .White, .Bold => noasync out_stream.write(WHITE) catch return, + .Dim => noasync out_stream.write(DIM) catch return, + .Reset => noasync out_stream.write(RESET) catch return, }, .windows_api => if (builtin.os == .windows) { const S = struct { @@ -717,17 +717,17 @@ fn printLineInfo( tty_config.setColor(out_stream, .White); if (line_info) |*li| { - try out_stream.print("{}:{}:{}", .{ li.file_name, li.line, li.column }); + try noasync out_stream.print("{}:{}:{}", .{ li.file_name, li.line, li.column }); } else { - try out_stream.print("???:?:?", .{}); + try noasync out_stream.write("???:?:?"); } tty_config.setColor(out_stream, .Reset); - try out_stream.write(": "); + try noasync out_stream.write(": "); tty_config.setColor(out_stream, .Dim); - try out_stream.print("0x{x} in {} ({})", .{ address, symbol_name, compile_unit_name }); + try noasync out_stream.print("0x{x} in {} ({})", .{ address, symbol_name, compile_unit_name }); tty_config.setColor(out_stream, .Reset); - try out_stream.write("\n"); + try noasync out_stream.write("\n"); // Show the matching source code line if possible if (line_info) |li| { @@ -736,12 +736,12 @@ fn printLineInfo( // The caret already takes one char const space_needed = @intCast(usize, li.column - 1); - try out_stream.writeByteNTimes(' ', space_needed); + try noasync out_stream.writeByteNTimes(' ', space_needed); tty_config.setColor(out_stream, .Green); - try out_stream.write("^"); + try noasync out_stream.write("^"); tty_config.setColor(out_stream, .Reset); } - try out_stream.write("\n"); + try noasync out_stream.write("\n"); } else |err| switch (err) { error.EndOfFile, error.FileNotFound => {}, error.BadPathName => {}, diff --git a/lib/std/event.zig b/lib/std/event.zig @@ -6,11 +6,9 @@ pub const Locked = @import("event/locked.zig").Locked; pub const RwLock = @import("event/rwlock.zig").RwLock; pub const RwLocked = @import("event/rwlocked.zig").RwLocked; pub const Loop = @import("event/loop.zig").Loop; -pub const fs = @import("event/fs.zig"); test "import event tests" { _ = @import("event/channel.zig"); - _ = @import("event/fs.zig"); _ = @import("event/future.zig"); _ = @import("event/group.zig"); _ = @import("event/lock.zig"); diff --git a/lib/std/event/fs.zig b/lib/std/event/fs.zig @@ -1,1418 +0,0 @@ -const builtin = @import("builtin"); -const std = @import("../std.zig"); -const event = std.event; -const assert = std.debug.assert; -const testing = std.testing; -const os = std.os; -const mem = std.mem; -const windows = os.windows; -const Loop = event.Loop; -const fd_t = os.fd_t; -const File = std.fs.File; -const Allocator = mem.Allocator; - -//! TODO mege this with `std.fs` - -const global_event_loop = Loop.instance orelse - @compileError("std.event.fs currently only works with event-based I/O"); - -pub const RequestNode = std.atomic.Queue(Request).Node; - -pub const Request = struct { - msg: Msg, - finish: Finish, - - pub const Finish = union(enum) { - TickNode: Loop.NextTickNode, - DeallocCloseOperation: *CloseOperation, - NoAction, - }; - - pub const Msg = union(enum) { - WriteV: WriteV, - PWriteV: PWriteV, - PReadV: PReadV, - Open: Open, - Close: Close, - WriteFile: WriteFile, - End, // special - means the fs thread should exit - - pub const WriteV = struct { - fd: fd_t, - iov: []const os.iovec_const, - result: Error!void, - - pub const Error = os.WriteError; - }; - - pub const PWriteV = struct { - fd: fd_t, - iov: []const os.iovec_const, - offset: usize, - result: Error!void, - - pub const Error = os.WriteError; - }; - - pub const PReadV = struct { - fd: fd_t, - iov: []const os.iovec, - offset: usize, - result: Error!usize, - - pub const Error = os.ReadError; - }; - - pub const Open = struct { - path: [:0]const u8, - flags: u32, - mode: File.Mode, - result: Error!fd_t, - - pub const Error = File.OpenError; - }; - - pub const WriteFile = struct { - path: [:0]const u8, - contents: []const u8, - mode: File.Mode, - result: Error!void, - - pub const Error = File.OpenError || File.WriteError; - }; - - pub const Close = struct { - fd: fd_t, - }; - }; -}; - -pub const PWriteVError = error{OutOfMemory} || File.WriteError; - -/// data - just the inner references - must live until pwritev frame completes. -pub fn pwritev(allocator: *Allocator, fd: fd_t, data: []const []const u8, offset: usize) PWriteVError!void { - switch (builtin.os) { - .macosx, - .linux, - .freebsd, - .netbsd, - .dragonfly, - => { - const iovecs = try allocator.alloc(os.iovec_const, data.len); - defer allocator.free(iovecs); - - for (data) |buf, i| { - iovecs[i] = os.iovec_const{ - .iov_base = buf.ptr, - .iov_len = buf.len, - }; - } - - return pwritevPosix(fd, iovecs, offset); - }, - .windows => { - const data_copy = try std.mem.dupe(allocator, []const u8, data); - defer allocator.free(data_copy); - return pwritevWindows(fd, data, offset); - }, - else => @compileError("Unsupported OS"), - } -} - -/// data must outlive the returned frame -pub fn pwritevWindows(fd: fd_t, data: []const []const u8, offset: usize) os.WindowsWriteError!void { - if (data.len == 0) return; - if (data.len == 1) return pwriteWindows(fd, data[0], offset); - - // TODO do these in parallel - var off = offset; - for (data) |buf| { - try pwriteWindows(fd, buf, off); - off += buf.len; - } -} - -pub fn pwriteWindows(fd: fd_t, data: []const u8, offset: u64) os.WindowsWriteError!void { - var resume_node = Loop.ResumeNode.Basic{ - .base = Loop.ResumeNode{ - .id = Loop.ResumeNode.Id.Basic, - .handle = @frame(), - .overlapped = windows.OVERLAPPED{ - .Internal = 0, - .InternalHigh = 0, - .Offset = @truncate(u32, offset), - .OffsetHigh = @truncate(u32, offset >> 32), - .hEvent = null, - }, - }, - }; - // TODO only call create io completion port once per fd - _ = windows.CreateIoCompletionPort(fd, global_event_loop.os_data.io_port, undefined, undefined); - global_event_loop.beginOneEvent(); - errdefer global_event_loop.finishOneEvent(); - - errdefer { - _ = windows.kernel32.CancelIoEx(fd, &resume_node.base.overlapped); - } - suspend { - _ = windows.kernel32.WriteFile(fd, data.ptr, @intCast(windows.DWORD, data.len), null, &resume_node.base.overlapped); - } - var bytes_transferred: windows.DWORD = undefined; - if (windows.kernel32.GetOverlappedResult(fd, &resume_node.base.overlapped, &bytes_transferred, windows.FALSE) == 0) { - switch (windows.kernel32.GetLastError()) { - .IO_PENDING => unreachable, - .INVALID_USER_BUFFER => return error.SystemResources, - .NOT_ENOUGH_MEMORY => return error.SystemResources, - .OPERATION_ABORTED => return error.OperationAborted, - .NOT_ENOUGH_QUOTA => return error.SystemResources, - .BROKEN_PIPE => return error.BrokenPipe, - else => |err| return windows.unexpectedError(err), - } - } -} - -/// iovecs must live until pwritev frame completes. -pub fn pwritevPosix(fd: fd_t, iovecs: []const os.iovec_const, offset: usize) os.WriteError!void { - var req_node = RequestNode{ - .prev = null, - .next = null, - .data = Request{ - .msg = Request.Msg{ - .PWriteV = Request.Msg.PWriteV{ - .fd = fd, - .iov = iovecs, - .offset = offset, - .result = undefined, - }, - }, - .finish = Request.Finish{ - .TickNode = Loop.NextTickNode{ - .prev = null, - .next = null, - .data = @frame(), - }, - }, - }, - }; - - errdefer global_event_loop.posixFsCancel(&req_node); - - suspend { - global_event_loop.posixFsRequest(&req_node); - } - - return req_node.data.msg.PWriteV.result; -} - -/// iovecs must live until pwritev frame completes. -pub fn writevPosix(fd: fd_t, iovecs: []const os.iovec_const) os.WriteError!void { - var req_node = RequestNode{ - .prev = null, - .next = null, - .data = Request{ - .msg = Request.Msg{ - .WriteV = Request.Msg.WriteV{ - .fd = fd, - .iov = iovecs, - .result = undefined, - }, - }, - .finish = Request.Finish{ - .TickNode = Loop.NextTickNode{ - .prev = null, - .next = null, - .data = @frame(), - }, - }, - }, - }; - - suspend { - global_event_loop.posixFsRequest(&req_node); - } - - return req_node.data.msg.WriteV.result; -} - -pub const PReadVError = error{OutOfMemory} || File.ReadError; - -/// data - just the inner references - must live until preadv frame completes. -pub fn preadv(allocator: *Allocator, fd: fd_t, data: []const []u8, offset: usize) PReadVError!usize { - assert(data.len != 0); - switch (builtin.os) { - .macosx, - .linux, - .freebsd, - .netbsd, - .dragonfly, - => { - const iovecs = try allocator.alloc(os.iovec, data.len); - defer allocator.free(iovecs); - - for (data) |buf, i| { - iovecs[i] = os.iovec{ - .iov_base = buf.ptr, - .iov_len = buf.len, - }; - } - - return preadvPosix(fd, iovecs, offset); - }, - .windows => { - const data_copy = try std.mem.dupe(allocator, []u8, data); - defer allocator.free(data_copy); - return preadvWindows(fd, data_copy, offset); - }, - else => @compileError("Unsupported OS"), - } -} - -/// data must outlive the returned frame -pub fn preadvWindows(fd: fd_t, data: []const []u8, offset: u64) !usize { - assert(data.len != 0); - if (data.len == 1) return preadWindows(fd, data[0], offset); - - // TODO do these in parallel? - var off: usize = 0; - var iov_i: usize = 0; - var inner_off: usize = 0; - while (true) { - const v = data[iov_i]; - const amt_read = try preadWindows(fd, v[inner_off .. v.len - inner_off], offset + off); - off += amt_read; - inner_off += amt_read; - if (inner_off == v.len) { - iov_i += 1; - inner_off = 0; - if (iov_i == data.len) { - return off; - } - } - if (amt_read == 0) return off; // EOF - } -} - -pub fn preadWindows(fd: fd_t, data: []u8, offset: u64) !usize { - var resume_node = Loop.ResumeNode.Basic{ - .base = Loop.ResumeNode{ - .id = Loop.ResumeNode.Id.Basic, - .handle = @frame(), - .overlapped = windows.OVERLAPPED{ - .Internal = 0, - .InternalHigh = 0, - .Offset = @truncate(u32, offset), - .OffsetHigh = @truncate(u32, offset >> 32), - .hEvent = null, - }, - }, - }; - // TODO only call create io completion port once per fd - _ = windows.CreateIoCompletionPort(fd, global_event_loop.os_data.io_port, undefined, undefined) catch undefined; - global_event_loop.beginOneEvent(); - errdefer global_event_loop.finishOneEvent(); - - errdefer { - _ = windows.kernel32.CancelIoEx(fd, &resume_node.base.overlapped); - } - suspend { - _ = windows.kernel32.ReadFile(fd, data.ptr, @intCast(windows.DWORD, data.len), null, &resume_node.base.overlapped); - } - var bytes_transferred: windows.DWORD = undefined; - if (windows.kernel32.GetOverlappedResult(fd, &resume_node.base.overlapped, &bytes_transferred, windows.FALSE) == 0) { - switch (windows.kernel32.GetLastError()) { - .IO_PENDING => unreachable, - .OPERATION_ABORTED => return error.OperationAborted, - .BROKEN_PIPE => return error.BrokenPipe, - .HANDLE_EOF => return @as(usize, bytes_transferred), - else => |err| return windows.unexpectedError(err), - } - } - return @as(usize, bytes_transferred); -} - -/// iovecs must live until preadv frame completes -pub fn preadvPosix(fd: fd_t, iovecs: []const os.iovec, offset: usize) os.ReadError!usize { - var req_node = RequestNode{ - .prev = null, - .next = null, - .data = Request{ - .msg = Request.Msg{ - .PReadV = Request.Msg.PReadV{ - .fd = fd, - .iov = iovecs, - .offset = offset, - .result = undefined, - }, - }, - .finish = Request.Finish{ - .TickNode = Loop.NextTickNode{ - .prev = null, - .next = null, - .data = @frame(), - }, - }, - }, - }; - - errdefer global_event_loop.posixFsCancel(&req_node); - - suspend { - global_event_loop.posixFsRequest(&req_node); - } - - return req_node.data.msg.PReadV.result; -} - -pub fn openPosix(path: []const u8, flags: u32, mode: File.Mode) File.OpenError!fd_t { - const path_c = try std.os.toPosixPath(path); - - var req_node = RequestNode{ - .prev = null, - .next = null, - .data = Request{ - .msg = Request.Msg{ - .Open = Request.Msg.Open{ - .path = path_c[0..path.len], - .flags = flags, - .mode = mode, - .result = undefined, - }, - }, - .finish = Request.Finish{ - .TickNode = Loop.NextTickNode{ - .prev = null, - .next = null, - .data = @frame(), - }, - }, - }, - }; - - errdefer global_event_loop.posixFsCancel(&req_node); - - suspend { - global_event_loop.posixFsRequest(&req_node); - } - - return req_node.data.msg.Open.result; -} - -pub fn openRead(path: []const u8) File.OpenError!fd_t { - switch (builtin.os) { - .macosx, .linux, .freebsd, .netbsd, .dragonfly => { - const O_LARGEFILE = if (@hasDecl(os, "O_LARGEFILE")) os.O_LARGEFILE else 0; - const flags = O_LARGEFILE | os.O_RDONLY | os.O_CLOEXEC; - return openPosix(path, flags, File.default_mode); - }, - - .windows => return windows.CreateFile( - path, - windows.GENERIC_READ, - windows.FILE_SHARE_READ, - null, - windows.OPEN_EXISTING, - windows.FILE_ATTRIBUTE_NORMAL | windows.FILE_FLAG_OVERLAPPED, - null, - ), - - else => @compileError("Unsupported OS"), - } -} - -/// Creates if does not exist. Truncates the file if it exists. -/// Uses the default mode. -pub fn openWrite(path: []const u8) File.OpenError!fd_t { - return openWriteMode(path, File.default_mode); -} - -/// Creates if does not exist. Truncates the file if it exists. -pub fn openWriteMode(path: []const u8, mode: File.Mode) File.OpenError!fd_t { - switch (builtin.os) { - .macosx, - .linux, - .freebsd, - .netbsd, - .dragonfly, - => { - const O_LARGEFILE = if (@hasDecl(os, "O_LARGEFILE")) os.O_LARGEFILE else 0; - const flags = O_LARGEFILE | os.O_WRONLY | os.O_CREAT | os.O_CLOEXEC | os.O_TRUNC; - return openPosix(path, flags, File.default_mode); - }, - .windows => return windows.CreateFile( - path, - windows.GENERIC_WRITE, - windows.FILE_SHARE_WRITE | windows.FILE_SHARE_READ | windows.FILE_SHARE_DELETE, - null, - windows.CREATE_ALWAYS, - windows.FILE_ATTRIBUTE_NORMAL | windows.FILE_FLAG_OVERLAPPED, - null, - ), - else => @compileError("Unsupported OS"), - } -} - -/// Creates if does not exist. Does not truncate. -pub fn openReadWrite(path: []const u8, mode: File.Mode) File.OpenError!fd_t { - switch (builtin.os) { - .macosx, .linux, .freebsd, .netbsd, .dragonfly => { - const O_LARGEFILE = if (@hasDecl(os, "O_LARGEFILE")) os.O_LARGEFILE else 0; - const flags = O_LARGEFILE | os.O_RDWR | os.O_CREAT | os.O_CLOEXEC; - return openPosix(path, flags, mode); - }, - - .windows => return windows.CreateFile( - path, - windows.GENERIC_WRITE | windows.GENERIC_READ, - windows.FILE_SHARE_WRITE | windows.FILE_SHARE_READ | windows.FILE_SHARE_DELETE, - null, - windows.OPEN_ALWAYS, - windows.FILE_ATTRIBUTE_NORMAL | windows.FILE_FLAG_OVERLAPPED, - null, - ), - - else => @compileError("Unsupported OS"), - } -} - -/// This abstraction helps to close file handles in defer expressions -/// without the possibility of failure and without the use of suspend points. -/// Start a `CloseOperation` before opening a file, so that you can defer -/// `CloseOperation.finish`. -/// If you call `setHandle` then finishing will close the fd; otherwise finishing -/// will deallocate the `CloseOperation`. -pub const CloseOperation = struct { - allocator: *Allocator, - os_data: OsData, - - const OsData = switch (builtin.os) { - .linux, .macosx, .freebsd, .netbsd, .dragonfly => OsDataPosix, - - .windows => struct { - handle: ?fd_t, - }, - - else => @compileError("Unsupported OS"), - }; - - const OsDataPosix = struct { - have_fd: bool, - close_req_node: RequestNode, - }; - - pub fn start(allocator: *Allocator) (error{OutOfMemory}!*CloseOperation) { - const self = try allocator.create(CloseOperation); - self.* = CloseOperation{ - .allocator = allocator, - .os_data = switch (builtin.os) { - .linux, .macosx, .freebsd, .netbsd, .dragonfly => initOsDataPosix(self), - .windows => OsData{ .handle = null }, - else => @compileError("Unsupported OS"), - }, - }; - return self; - } - - fn initOsDataPosix(self: *CloseOperation) OsData { - return OsData{ - .have_fd = false, - .close_req_node = RequestNode{ - .prev = null, - .next = null, - .data = Request{ - .msg = Request.Msg{ - .Close = Request.Msg.Close{ .fd = undefined }, - }, - .finish = Request.Finish{ .DeallocCloseOperation = self }, - }, - }, - }; - } - - /// Defer this after creating. - pub fn finish(self: *CloseOperation) void { - switch (builtin.os) { - .linux, - .macosx, - .freebsd, - .netbsd, - .dragonfly, - => { - if (self.os_data.have_fd) { - global_event_loop.posixFsRequest(&self.os_data.close_req_node); - } else { - self.allocator.destroy(self); - } - }, - .windows => { - if (self.os_data.handle) |handle| { - os.close(handle); - } - self.allocator.destroy(self); - }, - else => @compileError("Unsupported OS"), - } - } - - pub fn setHandle(self: *CloseOperation, handle: fd_t) void { - switch (builtin.os) { - .linux, - .macosx, - .freebsd, - .netbsd, - .dragonfly, - => { - self.os_data.close_req_node.data.msg.Close.fd = handle; - self.os_data.have_fd = true; - }, - .windows => { - self.os_data.handle = handle; - }, - else => @compileError("Unsupported OS"), - } - } - - /// Undo a `setHandle`. - pub fn clearHandle(self: *CloseOperation) void { - switch (builtin.os) { - .linux, - .macosx, - .freebsd, - .netbsd, - .dragonfly, - => { - self.os_data.have_fd = false; - }, - .windows => { - self.os_data.handle = null; - }, - else => @compileError("Unsupported OS"), - } - } - - pub fn getHandle(self: *CloseOperation) fd_t { - switch (builtin.os) { - .linux, - .macosx, - .freebsd, - .netbsd, - .dragonfly, - => { - assert(self.os_data.have_fd); - return self.os_data.close_req_node.data.msg.Close.fd; - }, - .windows => { - return self.os_data.handle.?; - }, - else => @compileError("Unsupported OS"), - } - } -}; - -/// contents must remain alive until writeFile completes. -/// TODO make this atomic or provide writeFileAtomic and rename this one to writeFileTruncate -pub fn writeFile(allocator: *Allocator, path: []const u8, contents: []const u8) !void { - return writeFileMode(allocator, path, contents, File.default_mode); -} - -/// contents must remain alive until writeFile completes. -pub fn writeFileMode(allocator: *Allocator, path: []const u8, contents: []const u8, mode: File.Mode) !void { - switch (builtin.os) { - .linux, - .macosx, - .freebsd, - .netbsd, - .dragonfly, - => return writeFileModeThread(allocator, path, contents, mode), - .windows => return writeFileWindows(path, contents), - else => @compileError("Unsupported OS"), - } -} - -fn writeFileWindows(path: []const u8, contents: []const u8) !void { - const handle = try windows.CreateFile( - path, - windows.GENERIC_WRITE, - windows.FILE_SHARE_WRITE | windows.FILE_SHARE_READ | windows.FILE_SHARE_DELETE, - null, - windows.CREATE_ALWAYS, - windows.FILE_ATTRIBUTE_NORMAL | windows.FILE_FLAG_OVERLAPPED, - null, - ); - defer os.close(handle); - - try pwriteWindows(handle, contents, 0); -} - -fn writeFileModeThread(allocator: *Allocator, path: []const u8, contents: []const u8, mode: File.Mode) !void { - const path_with_null = try std.cstr.addNullByte(allocator, path); - defer allocator.free(path_with_null); - - var req_node = RequestNode{ - .prev = null, - .next = null, - .data = Request{ - .msg = Request.Msg{ - .WriteFile = Request.Msg.WriteFile{ - .path = path_with_null[0..path.len], - .contents = contents, - .mode = mode, - .result = undefined, - }, - }, - .finish = Request.Finish{ - .TickNode = Loop.NextTickNode{ - .prev = null, - .next = null, - .data = @frame(), - }, - }, - }, - }; - - errdefer global_event_loop.posixFsCancel(&req_node); - - suspend { - global_event_loop.posixFsRequest(&req_node); - } - - return req_node.data.msg.WriteFile.result; -} - -/// The frame resumes when the last data has been confirmed written, but before the file handle -/// is closed. -/// Caller owns returned memory. -pub fn readFile(allocator: *Allocator, file_path: []const u8, max_size: usize) ![]u8 { - var close_op = try CloseOperation.start(allocator); - defer close_op.finish(); - - const fd = try openRead(file_path); - close_op.setHandle(fd); - - var list = std.ArrayList(u8).init(allocator); - defer list.deinit(); - - while (true) { - try list.ensureCapacity(list.len + mem.page_size); - const buf = list.items[list.len..]; - const buf_array = [_][]u8{buf}; - const amt = try preadv(allocator, fd, &buf_array, list.len); - list.len += amt; - if (list.len > max_size) { - return error.FileTooBig; - } - if (amt < buf.len) { - return list.toOwnedSlice(); - } - } -} - -pub const WatchEventId = enum { - CloseWrite, - Delete, -}; - -fn eqlString(a: []const u16, b: []const u16) bool { - if (a.len != b.len) return false; - if (a.ptr == b.ptr) return true; - return mem.compare(u16, a, b) == .Equal; -} - -fn hashString(s: []const u16) u32 { - return @truncate(u32, std.hash.Wyhash.hash(0, @sliceToBytes(s))); -} - -pub const WatchEventError = error{ - UserResourceLimitReached, - SystemResources, - AccessDenied, - Unexpected, // TODO remove this possibility -}; - -pub fn Watch(comptime V: type) type { - return struct { - channel: *event.Channel(Event.Error!Event), - os_data: OsData, - allocator: *Allocator, - - const OsData = switch (builtin.os) { - // TODO https://github.com/ziglang/zig/issues/3778 - .macosx, .freebsd, .netbsd, .dragonfly => KqOsData, - .linux => LinuxOsData, - .windows => WindowsOsData, - - else => @compileError("Unsupported OS"), - }; - - const KqOsData = struct { - file_table: FileTable, - table_lock: event.Lock, - - const FileTable = std.StringHashMap(*Put); - const Put = struct { - putter_frame: @Frame(kqPutEvents), - cancelled: bool = false, - value: V, - }; - }; - - const WindowsOsData = struct { - table_lock: event.Lock, - dir_table: DirTable, - all_putters: std.atomic.Queue(Put), - ref_count: std.atomic.Int(usize), - - const Put = struct { - putter: anyframe, - cancelled: bool = false, - }; - - const DirTable = std.StringHashMap(*Dir); - const FileTable = std.HashMap([]const u16, V, hashString, eqlString); - - const Dir = struct { - putter_frame: @Frame(windowsDirReader), - file_table: FileTable, - table_lock: event.Lock, - }; - }; - - const LinuxOsData = struct { - putter_frame: @Frame(linuxEventPutter), - inotify_fd: i32, - wd_table: WdTable, - table_lock: event.Lock, - cancelled: bool = false, - - const WdTable = std.AutoHashMap(i32, Dir); - const FileTable = std.StringHashMap(V); - - const Dir = struct { - dirname: []const u8, - file_table: FileTable, - }; - }; - - const Self = @This(); - - pub const Event = struct { - id: Id, - data: V, - - pub const Id = WatchEventId; - pub const Error = WatchEventError; - }; - - pub fn init(allocator: *Allocator, event_buf_count: usize) !*Self { - const channel = try allocator.create(event.Channel(Event.Error!Event)); - errdefer allocator.destroy(channel); - var buf = try allocator.alloc(Event.Error!Event, event_buf_count); - errdefer allocator.free(buf); - channel.init(buf); - errdefer channel.deinit(); - - const self = try allocator.create(Self); - errdefer allocator.destroy(self); - - switch (builtin.os) { - .linux => { - const inotify_fd = try os.inotify_init1(os.linux.IN_NONBLOCK | os.linux.IN_CLOEXEC); - errdefer os.close(inotify_fd); - - self.* = Self{ - .allocator = allocator, - .channel = channel, - .os_data = OsData{ - .putter_frame = undefined, - .inotify_fd = inotify_fd, - .wd_table = OsData.WdTable.init(allocator), - .table_lock = event.Lock.init(), - }, - }; - - self.os_data.putter_frame = async self.linuxEventPutter(); - return self; - }, - - .windows => { - self.* = Self{ - .allocator = allocator, - .channel = channel, - .os_data = OsData{ - .table_lock = event.Lock.init(), - .dir_table = OsData.DirTable.init(allocator), - .ref_count = std.atomic.Int(usize).init(1), - .all_putters = std.atomic.Queue(anyframe).init(), - }, - }; - return self; - }, - - .macosx, .freebsd, .netbsd, .dragonfly => { - self.* = Self{ - .allocator = allocator, - .channel = channel, - .os_data = OsData{ - .table_lock = event.Lock.init(), - .file_table = OsData.FileTable.init(allocator), - }, - }; - return self; - }, - else => @compileError("Unsupported OS"), - } - } - - /// All addFile calls and removeFile calls must have completed. - pub fn deinit(self: *Self) void { - switch (builtin.os) { - .macosx, .freebsd, .netbsd, .dragonfly => { - // TODO we need to cancel the frames before destroying the lock - self.os_data.table_lock.deinit(); - var it = self.os_data.file_table.iterator(); - while (it.next()) |entry| { - entry.cancelled = true; - await entry.value.putter; - self.allocator.free(entry.key); - self.allocator.free(entry.value); - } - self.channel.deinit(); - self.allocator.destroy(self.channel.buffer_nodes); - self.allocator.destroy(self); - }, - .linux => { - self.os_data.cancelled = true; - await self.os_data.putter_frame; - self.allocator.destroy(self); - }, - .windows => { - while (self.os_data.all_putters.get()) |putter_node| { - putter_node.cancelled = true; - await putter_node.frame; - } - self.deref(); - }, - else => @compileError("Unsupported OS"), - } - } - - fn ref(self: *Self) void { - _ = self.os_data.ref_count.incr(); - } - - fn deref(self: *Self) void { - if (self.os_data.ref_count.decr() == 1) { - self.os_data.table_lock.deinit(); - var it = self.os_data.dir_table.iterator(); - while (it.next()) |entry| { - self.allocator.free(entry.key); - self.allocator.destroy(entry.value); - } - self.os_data.dir_table.deinit(); - self.channel.deinit(); - self.allocator.destroy(self.channel.buffer_nodes); - self.allocator.destroy(self); - } - } - - pub fn addFile(self: *Self, file_path: []const u8, value: V) !?V { - switch (builtin.os) { - .macosx, .freebsd, .netbsd, .dragonfly => return addFileKEvent(self, file_path, value), - .linux => return addFileLinux(self, file_path, value), - .windows => return addFileWindows(self, file_path, value), - else => @compileError("Unsupported OS"), - } - } - - fn addFileKEvent(self: *Self, file_path: []const u8, value: V) !?V { - const resolved_path = try std.fs.path.resolve(self.allocator, [_][]const u8{file_path}); - var resolved_path_consumed = false; - defer if (!resolved_path_consumed) self.allocator.free(resolved_path); - - var close_op = try CloseOperation.start(self.allocator); - var close_op_consumed = false; - defer if (!close_op_consumed) close_op.finish(); - - const flags = if (comptime std.Target.current.isDarwin()) os.O_SYMLINK | os.O_EVTONLY else 0; - const mode = 0; - const fd = try openPosix(self.allocator, resolved_path, flags, mode); - close_op.setHandle(fd); - - var put = try self.allocator.create(OsData.Put); - errdefer self.allocator.destroy(put); - put.* = OsData.Put{ - .value = value, - .putter_frame = undefined, - }; - put.putter_frame = async self.kqPutEvents(close_op, put); - close_op_consumed = true; - errdefer { - put.cancelled = true; - await put.putter_frame; - } - - const result = blk: { - const held = self.os_data.table_lock.acquire(); - defer held.release(); - - const gop = try self.os_data.file_table.getOrPut(resolved_path); - if (gop.found_existing) { - const prev_value = gop.kv.value.value; - await gop.kv.value.putter_frame; - gop.kv.value = put; - break :blk prev_value; - } else { - resolved_path_consumed = true; - gop.kv.value = put; - break :blk null; - } - }; - - return result; - } - - fn kqPutEvents(self: *Self, close_op: *CloseOperation, put: *OsData.Put) void { - global_event_loop.beginOneEvent(); - - defer { - close_op.finish(); - global_event_loop.finishOneEvent(); - } - - while (!put.cancelled) { - if (global_event_loop.bsdWaitKev( - @intCast(usize, close_op.getHandle()), - os.EVFILT_VNODE, - os.NOTE_WRITE | os.NOTE_DELETE, - )) |kev| { - // TODO handle EV_ERROR - if (kev.fflags & os.NOTE_DELETE != 0) { - self.channel.put(Self.Event{ - .id = Event.Id.Delete, - .data = put.value, - }); - } else if (kev.fflags & os.NOTE_WRITE != 0) { - self.channel.put(Self.Event{ - .id = Event.Id.CloseWrite, - .data = put.value, - }); - } - } else |err| switch (err) { - error.EventNotFound => unreachable, - error.ProcessNotFound => unreachable, - error.Overflow => unreachable, - error.AccessDenied, error.SystemResources => |casted_err| { - self.channel.put(casted_err); - }, - } - } - } - - fn addFileLinux(self: *Self, file_path: []const u8, value: V) !?V { - const dirname = std.fs.path.dirname(file_path) orelse "."; - const dirname_with_null = try std.cstr.addNullByte(self.allocator, dirname); - var dirname_with_null_consumed = false; - defer if (!dirname_with_null_consumed) self.channel.free(dirname_with_null); - - const basename = std.fs.path.basename(file_path); - const basename_with_null = try std.cstr.addNullByte(self.allocator, basename); - var basename_with_null_consumed = false; - defer if (!basename_with_null_consumed) self.allocator.free(basename_with_null); - - const wd = try os.inotify_add_watchC( - self.os_data.inotify_fd, - dirname_with_null.ptr, - os.linux.IN_CLOSE_WRITE | os.linux.IN_ONLYDIR | os.linux.IN_EXCL_UNLINK, - ); - // wd is either a newly created watch or an existing one. - - const held = self.os_data.table_lock.acquire(); - defer held.release(); - - const gop = try self.os_data.wd_table.getOrPut(wd); - if (!gop.found_existing) { - gop.kv.value = OsData.Dir{ - .dirname = dirname_with_null, - .file_table = OsData.FileTable.init(self.allocator), - }; - dirname_with_null_consumed = true; - } - const dir = &gop.kv.value; - - const file_table_gop = try dir.file_table.getOrPut(basename_with_null); - if (file_table_gop.found_existing) { - const prev_value = file_table_gop.kv.value; - file_table_gop.kv.value = value; - return prev_value; - } else { - file_table_gop.kv.value = value; - basename_with_null_consumed = true; - return null; - } - } - - fn addFileWindows(self: *Self, file_path: []const u8, value: V) !?V { - // TODO we might need to convert dirname and basename to canonical file paths ("short"?) - const dirname = try std.mem.dupe(self.allocator, u8, std.fs.path.dirname(file_path) orelse "."); - var dirname_consumed = false; - defer if (!dirname_consumed) self.allocator.free(dirname); - - const dirname_utf16le = try std.unicode.utf8ToUtf16LeWithNull(self.allocator, dirname); - defer self.allocator.free(dirname_utf16le); - - // TODO https://github.com/ziglang/zig/issues/265 - const basename = std.fs.path.basename(file_path); - const basename_utf16le_null = try std.unicode.utf8ToUtf16LeWithNull(self.allocator, basename); - var basename_utf16le_null_consumed = false; - defer if (!basename_utf16le_null_consumed) self.allocator.free(basename_utf16le_null); - const basename_utf16le_no_null = basename_utf16le_null[0 .. basename_utf16le_null.len - 1]; - - const dir_handle = try windows.CreateFileW( - dirname_utf16le.ptr, - windows.FILE_LIST_DIRECTORY, - windows.FILE_SHARE_READ | windows.FILE_SHARE_DELETE | windows.FILE_SHARE_WRITE, - null, - windows.OPEN_EXISTING, - windows.FILE_FLAG_BACKUP_SEMANTICS | windows.FILE_FLAG_OVERLAPPED, - null, - ); - var dir_handle_consumed = false; - defer if (!dir_handle_consumed) windows.CloseHandle(dir_handle); - - const held = self.os_data.table_lock.acquire(); - defer held.release(); - - const gop = try self.os_data.dir_table.getOrPut(dirname); - if (gop.found_existing) { - const dir = gop.kv.value; - const held_dir_lock = dir.table_lock.acquire(); - defer held_dir_lock.release(); - - const file_gop = try dir.file_table.getOrPut(basename_utf16le_no_null); - if (file_gop.found_existing) { - const prev_value = file_gop.kv.value; - file_gop.kv.value = value; - return prev_value; - } else { - file_gop.kv.value = value; - basename_utf16le_null_consumed = true; - return null; - } - } else { - errdefer _ = self.os_data.dir_table.remove(dirname); - const dir = try self.allocator.create(OsData.Dir); - errdefer self.allocator.destroy(dir); - - dir.* = OsData.Dir{ - .file_table = OsData.FileTable.init(self.allocator), - .table_lock = event.Lock.init(), - .putter_frame = undefined, - }; - gop.kv.value = dir; - assert((try dir.file_table.put(basename_utf16le_no_null, value)) == null); - basename_utf16le_null_consumed = true; - - dir.putter_frame = async self.windowsDirReader(dir_handle, dir); - dir_handle_consumed = true; - - dirname_consumed = true; - - return null; - } - } - - fn windowsDirReader(self: *Self, dir_handle: windows.HANDLE, dir: *OsData.Dir) void { - self.ref(); - defer self.deref(); - - defer os.close(dir_handle); - - var putter_node = std.atomic.Queue(anyframe).Node{ - .data = .{ .putter = @frame() }, - .prev = null, - .next = null, - }; - self.os_data.all_putters.put(&putter_node); - defer _ = self.os_data.all_putters.remove(&putter_node); - - var resume_node = Loop.ResumeNode.Basic{ - .base = Loop.ResumeNode{ - .id = Loop.ResumeNode.Id.Basic, - .handle = @frame(), - .overlapped = windows.OVERLAPPED{ - .Internal = 0, - .InternalHigh = 0, - .Offset = 0, - .OffsetHigh = 0, - .hEvent = null, - }, - }, - }; - var event_buf: [4096]u8 align(@alignOf(windows.FILE_NOTIFY_INFORMATION)) = undefined; - - // TODO handle this error not in the channel but in the setup - _ = windows.CreateIoCompletionPort( - dir_handle, - global_event_loop.os_data.io_port, - undefined, - undefined, - ) catch |err| { - self.channel.put(err); - return; - }; - - while (!putter_node.data.cancelled) { - { - // TODO only 1 beginOneEvent for the whole function - global_event_loop.beginOneEvent(); - errdefer global_event_loop.finishOneEvent(); - errdefer { - _ = windows.kernel32.CancelIoEx(dir_handle, &resume_node.base.overlapped); - } - suspend { - _ = windows.kernel32.ReadDirectoryChangesW( - dir_handle, - &event_buf, - @intCast(windows.DWORD, event_buf.len), - windows.FALSE, // watch subtree - windows.FILE_NOTIFY_CHANGE_FILE_NAME | windows.FILE_NOTIFY_CHANGE_DIR_NAME | - windows.FILE_NOTIFY_CHANGE_ATTRIBUTES | windows.FILE_NOTIFY_CHANGE_SIZE | - windows.FILE_NOTIFY_CHANGE_LAST_WRITE | windows.FILE_NOTIFY_CHANGE_LAST_ACCESS | - windows.FILE_NOTIFY_CHANGE_CREATION | windows.FILE_NOTIFY_CHANGE_SECURITY, - null, // number of bytes transferred (unused for async) - &resume_node.base.overlapped, - null, // completion routine - unused because we use IOCP - ); - } - } - var bytes_transferred: windows.DWORD = undefined; - if (windows.kernel32.GetOverlappedResult(dir_handle, &resume_node.base.overlapped, &bytes_transferred, windows.FALSE) == 0) { - const err = switch (windows.kernel32.GetLastError()) { - else => |err| windows.unexpectedError(err), - }; - self.channel.put(err); - } else { - // can't use @bytesToSlice because of the special variable length name field - var ptr = event_buf[0..].ptr; - const end_ptr = ptr + bytes_transferred; - var ev: *windows.FILE_NOTIFY_INFORMATION = undefined; - while (@ptrToInt(ptr) < @ptrToInt(end_ptr)) : (ptr += ev.NextEntryOffset) { - ev = @ptrCast(*windows.FILE_NOTIFY_INFORMATION, ptr); - const emit = switch (ev.Action) { - windows.FILE_ACTION_REMOVED => WatchEventId.Delete, - windows.FILE_ACTION_MODIFIED => WatchEventId.CloseWrite, - else => null, - }; - if (emit) |id| { - const basename_utf16le = ([*]u16)(&ev.FileName)[0 .. ev.FileNameLength / 2]; - const user_value = blk: { - const held = dir.table_lock.acquire(); - defer held.release(); - - if (dir.file_table.get(basename_utf16le)) |entry| { - break :blk entry.value; - } else { - break :blk null; - } - }; - if (user_value) |v| { - self.channel.put(Event{ - .id = id, - .data = v, - }); - } - } - if (ev.NextEntryOffset == 0) break; - } - } - } - } - - pub fn removeFile(self: *Self, file_path: []const u8) ?V { - @panic("TODO"); - } - - fn linuxEventPutter(self: *Self) void { - global_event_loop.beginOneEvent(); - - defer { - self.os_data.table_lock.deinit(); - var wd_it = self.os_data.wd_table.iterator(); - while (wd_it.next()) |wd_entry| { - var file_it = wd_entry.value.file_table.iterator(); - while (file_it.next()) |file_entry| { - self.allocator.free(file_entry.key); - } - self.allocator.free(wd_entry.value.dirname); - wd_entry.value.file_table.deinit(); - } - self.os_data.wd_table.deinit(); - global_event_loop.finishOneEvent(); - os.close(self.os_data.inotify_fd); - self.channel.deinit(); - self.allocator.free(self.channel.buffer_nodes); - } - - var event_buf: [4096]u8 align(@alignOf(os.linux.inotify_event)) = undefined; - - while (!self.os_data.cancelled) { - const rc = os.linux.read(self.os_data.inotify_fd, &event_buf, event_buf.len); - const errno = os.linux.getErrno(rc); - switch (errno) { - 0 => { - // can't use @bytesToSlice because of the special variable length name field - var ptr = event_buf[0..].ptr; - const end_ptr = ptr + event_buf.len; - var ev: *os.linux.inotify_event = undefined; - while (@ptrToInt(ptr) < @ptrToInt(end_ptr)) { - ev = @ptrCast(*os.linux.inotify_event, ptr); - if (ev.mask & os.linux.IN_CLOSE_WRITE == os.linux.IN_CLOSE_WRITE) { - const basename_ptr = ptr + @sizeOf(os.linux.inotify_event); - // `ev.len` counts all bytes in `ev.name` including terminating null byte. - const basename_with_null = basename_ptr[0..ev.len]; - const user_value = blk: { - const held = self.os_data.table_lock.acquire(); - defer held.release(); - - const dir = &self.os_data.wd_table.get(ev.wd).?.value; - if (dir.file_table.get(basename_with_null)) |entry| { - break :blk entry.value; - } else { - break :blk null; - } - }; - if (user_value) |v| { - self.channel.put(Event{ - .id = WatchEventId.CloseWrite, - .data = v, - }); - } - } - - ptr = @alignCast(@alignOf(os.linux.inotify_event), ptr + @sizeOf(os.linux.inotify_event) + ev.len); - } - }, - os.linux.EINTR => continue, - os.linux.EINVAL => unreachable, - os.linux.EFAULT => unreachable, - os.linux.EAGAIN => { - global_event_loop.linuxWaitFd(self.os_data.inotify_fd, os.linux.EPOLLET | os.linux.EPOLLIN | os.EPOLLONESHOT); - }, - else => unreachable, - } - } - } - }; -} - -const test_tmp_dir = "std_event_fs_test"; - -test "write a file, watch it, write it again" { - // TODO provide a way to run tests in evented I/O mode - if (!std.io.is_async) return error.SkipZigTest; - - const allocator = std.heap.page_allocator; - - // TODO move this into event loop too - try os.makePath(allocator, test_tmp_dir); - defer os.deleteTree(test_tmp_dir) catch {}; - - return testFsWatch(&allocator); -} - -fn testFsWatch(allocator: *Allocator) !void { - const file_path = try std.fs.path.join(allocator, [_][]const u8{ test_tmp_dir, "file.txt" }); - defer allocator.free(file_path); - - const contents = - \\line 1 - \\line 2 - ; - const line2_offset = 7; - - // first just write then read the file - try writeFile(allocator, file_path, contents); - - const read_contents = try readFile(allocator, file_path, 1024 * 1024); - testing.expectEqualSlices(u8, contents, read_contents); - - // now watch the file - var watch = try Watch(void).init(allocator, 0); - defer watch.deinit(); - - testing.expect((try watch.addFile(file_path, {})) == null); - - const ev = watch.channel.get(); - var ev_consumed = false; - defer if (!ev_consumed) await ev; - - // overwrite line 2 - const fd = try await openReadWrite(file_path, File.default_mode); - { - defer os.close(fd); - - try pwritev(allocator, fd, []const []const u8{"lorem ipsum"}, line2_offset); - } - - ev_consumed = true; - switch ((try await ev).id) { - WatchEventId.CloseWrite => {}, - WatchEventId.Delete => @panic("wrong event"), - } - const contents_updated = try readFile(allocator, file_path, 1024 * 1024); - testing.expectEqualSlices(u8, - \\line 1 - \\lorem ipsum - , contents_updated); - - // TODO test deleting the file and then re-adding it. we should get events for both -} - -pub const OutStream = struct { - fd: fd_t, - stream: Stream, - allocator: *Allocator, - offset: usize, - - pub const Error = File.WriteError; - pub const Stream = event.io.OutStream(Error); - - pub fn init(allocator: *Allocator, fd: fd_t, offset: usize) OutStream { - return OutStream{ - .fd = fd, - .offset = offset, - .stream = Stream{ .writeFn = writeFn }, - }; - } - - fn writeFn(out_stream: *Stream, bytes: []const u8) Error!void { - const self = @fieldParentPtr(OutStream, "stream", out_stream); - const offset = self.offset; - self.offset += bytes.len; - return pwritev(self.allocator, self.fd, [_][]const u8{bytes}, offset); - } -}; - -pub const InStream = struct { - fd: fd_t, - stream: Stream, - allocator: *Allocator, - offset: usize, - - pub const Error = PReadVError; // TODO make this not have OutOfMemory - pub const Stream = event.io.InStream(Error); - - pub fn init(allocator: *Allocator, fd: fd_t, offset: usize) InStream { - return InStream{ - .fd = fd, - .offset = offset, - .stream = Stream{ .readFn = readFn }, - }; - } - - fn readFn(in_stream: *Stream, bytes: []u8) Error!usize { - const self = @fieldParentPtr(InStream, "stream", in_stream); - const amt = try preadv(self.allocator, self.fd, [_][]u8{bytes}, self.offset); - self.offset += amt; - return amt; - } -}; diff --git a/lib/std/event/loop.zig b/lib/std/event/loop.zig @@ -6,7 +6,6 @@ const testing = std.testing; const mem = std.mem; const AtomicRmwOp = builtin.AtomicRmwOp; const AtomicOrder = builtin.AtomicOrder; -const fs = std.event.fs; const os = std.os; const windows = os.windows; const maxInt = std.math.maxInt; @@ -174,21 +173,19 @@ pub const Loop = struct { fn initOsData(self: *Loop, extra_thread_count: usize) InitOsDataError!void { switch (builtin.os) { .linux => { - self.os_data.fs_queue = std.atomic.Queue(fs.Request).init(); + self.os_data.fs_queue = std.atomic.Queue(Request).init(); self.os_data.fs_queue_item = 0; // we need another thread for the file system because Linux does not have an async // file system I/O API. - self.os_data.fs_end_request = fs.RequestNode{ - .prev = undefined, - .next = undefined, - .data = fs.Request{ - .msg = fs.Request.Msg.End, - .finish = fs.Request.Finish.NoAction, + self.os_data.fs_end_request = Request.Node{ + .data = Request{ + .msg = .end, + .finish = .NoAction, }, }; errdefer { - while (self.available_eventfd_resume_nodes.pop()) |node| os.close(node.data.eventfd); + while (self.available_eventfd_resume_nodes.pop()) |node| noasync os.close(node.data.eventfd); } for (self.eventfd_resume_nodes) |*eventfd_node| { eventfd_node.* = std.atomic.Stack(ResumeNode.EventFd).Node{ @@ -207,10 +204,10 @@ pub const Loop = struct { } self.os_data.epollfd = try os.epoll_create1(os.EPOLL_CLOEXEC); - errdefer os.close(self.os_data.epollfd); + errdefer noasync os.close(self.os_data.epollfd); self.os_data.final_eventfd = try os.eventfd(0, os.EFD_CLOEXEC | os.EFD_NONBLOCK); - errdefer os.close(self.os_data.final_eventfd); + errdefer noasync os.close(self.os_data.final_eventfd); self.os_data.final_eventfd_event = os.epoll_event{ .events = os.EPOLLIN, @@ -237,7 +234,7 @@ pub const Loop = struct { var extra_thread_index: usize = 0; errdefer { // writing 8 bytes to an eventfd cannot fail - os.write(self.os_data.final_eventfd, &wakeup_bytes) catch unreachable; + noasync os.write(self.os_data.final_eventfd, &wakeup_bytes) catch unreachable; while (extra_thread_index != 0) { extra_thread_index -= 1; self.extra_threads[extra_thread_index].wait(); @@ -249,20 +246,20 @@ pub const Loop = struct { }, .macosx, .freebsd, .netbsd, .dragonfly => { self.os_data.kqfd = try os.kqueue(); - errdefer os.close(self.os_data.kqfd); + errdefer noasync os.close(self.os_data.kqfd); self.os_data.fs_kqfd = try os.kqueue(); - errdefer os.close(self.os_data.fs_kqfd); + errdefer noasync os.close(self.os_data.fs_kqfd); - self.os_data.fs_queue = std.atomic.Queue(fs.Request).init(); + self.os_data.fs_queue = std.atomic.Queue(Request).init(); // we need another thread for the file system because Darwin does not have an async // file system I/O API. - self.os_data.fs_end_request = fs.RequestNode{ + self.os_data.fs_end_request = Request.Node{ .prev = undefined, .next = undefined, - .data = fs.Request{ - .msg = fs.Request.Msg.End, - .finish = fs.Request.Finish.NoAction, + .data = Request{ + .msg = .end, + .finish = .NoAction, }, }; @@ -407,14 +404,14 @@ pub const Loop = struct { fn deinitOsData(self: *Loop) void { switch (builtin.os) { .linux => { - os.close(self.os_data.final_eventfd); - while (self.available_eventfd_resume_nodes.pop()) |node| os.close(node.data.eventfd); - os.close(self.os_data.epollfd); + noasync os.close(self.os_data.final_eventfd); + while (self.available_eventfd_resume_nodes.pop()) |node| noasync os.close(node.data.eventfd); + noasync os.close(self.os_data.epollfd); self.allocator.free(self.eventfd_resume_nodes); }, .macosx, .freebsd, .netbsd, .dragonfly => { - os.close(self.os_data.kqfd); - os.close(self.os_data.fs_kqfd); + noasync os.close(self.os_data.kqfd); + noasync os.close(self.os_data.fs_kqfd); }, .windows => { windows.CloseHandle(self.os_data.io_port); @@ -711,6 +708,190 @@ pub const Loop = struct { } } + /// Performs an async `os.open` using a separate thread. + pub fn openZ(self: *Loop, file_path: [*:0]const u8, flags: u32, mode: usize) os.OpenError!os.fd_t { + var req_node = Request.Node{ + .data = .{ + .msg = .{ + .open = .{ + .path = file_path, + .flags = flags, + .mode = mode, + .result = undefined, + }, + }, + .finish = .{ .TickNode = .{ .data = @frame() } }, + }, + }; + suspend { + self.posixFsRequest(&req_node); + } + return req_node.data.msg.open.result; + } + + /// Performs an async `os.opent` using a separate thread. + pub fn openatZ(self: *Loop, fd: os.fd_t, file_path: [*:0]const u8, flags: u32, mode: usize) os.OpenError!os.fd_t { + var req_node = Request.Node{ + .data = .{ + .msg = .{ + .openat = .{ + .fd = fd, + .path = file_path, + .flags = flags, + .mode = mode, + .result = undefined, + }, + }, + .finish = .{ .TickNode = .{ .data = @frame() } }, + }, + }; + suspend { + self.posixFsRequest(&req_node); + } + return req_node.data.msg.openat.result; + } + + /// Performs an async `os.close` using a separate thread. + pub fn close(self: *Loop, fd: os.fd_t) void { + var req_node = Request.Node{ + .data = .{ + .msg = .{ .close = .{ .fd = fd } }, + .finish = .{ .TickNode = .{ .data = @frame() } }, + }, + }; + suspend { + self.posixFsRequest(&req_node); + } + } + + /// Performs an async `os.read` using a separate thread. + /// `fd` must block and not return EAGAIN. + pub fn read(self: *Loop, fd: os.fd_t, buf: []u8) os.ReadError!usize { + var req_node = Request.Node{ + .data = .{ + .msg = .{ + .read = .{ + .fd = fd, + .buf = buf, + .result = undefined, + }, + }, + .finish = .{ .TickNode = .{ .data = @frame() } }, + }, + }; + suspend { + self.posixFsRequest(&req_node); + } + return req_node.data.msg.read.result; + } + + /// Performs an async `os.readv` using a separate thread. + /// `fd` must block and not return EAGAIN. + pub fn readv(self: *Loop, fd: os.fd_t, iov: []const os.iovec) os.ReadError!usize { + var req_node = Request.Node{ + .data = .{ + .msg = .{ + .readv = .{ + .fd = fd, + .iov = iov, + .result = undefined, + }, + }, + .finish = .{ .TickNode = .{ .data = @frame() } }, + }, + }; + suspend { + self.posixFsRequest(&req_node); + } + return req_node.data.msg.readv.result; + } + + /// Performs an async `os.preadv` using a separate thread. + /// `fd` must block and not return EAGAIN. + pub fn preadv(self: *Loop, fd: os.fd_t, iov: []const os.iovec, offset: u64) os.ReadError!usize { + var req_node = Request.Node{ + .data = .{ + .msg = .{ + .preadv = .{ + .fd = fd, + .iov = iov, + .offset = offset, + .result = undefined, + }, + }, + .finish = .{ .TickNode = .{ .data = @frame() } }, + }, + }; + suspend { + self.posixFsRequest(&req_node); + } + return req_node.data.msg.preadv.result; + } + + /// Performs an async `os.write` using a separate thread. + /// `fd` must block and not return EAGAIN. + pub fn write(self: *Loop, fd: os.fd_t, bytes: []const u8) os.WriteError!void { + var req_node = Request.Node{ + .data = .{ + .msg = .{ + .write = .{ + .fd = fd, + .bytes = bytes, + .result = undefined, + }, + }, + .finish = .{ .TickNode = .{ .data = @frame() } }, + }, + }; + suspend { + self.posixFsRequest(&req_node); + } + return req_node.data.msg.write.result; + } + + /// Performs an async `os.writev` using a separate thread. + /// `fd` must block and not return EAGAIN. + pub fn writev(self: *Loop, fd: os.fd_t, iov: []const os.iovec_const) WriteError!void { + var req_node = Request.Node{ + .data = .{ + .msg = .{ + .writev = .{ + .fd = fd, + .iov = iov, + .result = undefined, + }, + }, + .finish = .{ .TickNode = .{ .data = @frame() } }, + }, + }; + suspend { + self.posixFsRequest(&req_node); + } + return req_node.data.msg.writev.result; + } + + /// Performs an async `os.pwritev` using a separate thread. + /// `fd` must block and not return EAGAIN. + pub fn pwritev(self: *Loop, fd: os.fd_t, iov: []const os.iovec_const, offset: u64) WriteError!void { + var req_node = Request.Node{ + .data = .{ + .msg = .{ + .pwritev = .{ + .fd = fd, + .iov = iov, + .offset = offset, + .result = undefined, + }, + }, + .finish = .{ .TickNode = .{ .data = @frame() } }, + }, + }; + suspend { + self.posixFsRequest(&req_node); + } + return req_node.data.msg.pwritev.result; + } + fn workerRun(self: *Loop) void { while (true) { while (true) { @@ -804,7 +985,7 @@ pub const Loop = struct { } } - fn posixFsRequest(self: *Loop, request_node: *fs.RequestNode) void { + fn posixFsRequest(self: *Loop, request_node: *Request.Node) void { self.beginOneEvent(); // finished in posixFsRun after processing the msg self.os_data.fs_queue.put(request_node); switch (builtin.os) { @@ -826,7 +1007,7 @@ pub const Loop = struct { } } - fn posixFsCancel(self: *Loop, request_node: *fs.RequestNode) void { + fn posixFsCancel(self: *Loop, request_node: *Request.Node) void { if (self.os_data.fs_queue.remove(request_node)) { self.finishOneEvent(); } @@ -841,37 +1022,32 @@ pub const Loop = struct { } while (self.os_data.fs_queue.get()) |node| { switch (node.data.msg) { - .End => return, - .WriteV => |*msg| { + .end => return, + .read => |*msg| { + msg.result = noasync os.read(msg.fd, msg.buf); + }, + .write => |*msg| { + msg.result = noasync os.write(msg.fd, msg.bytes); + }, + .writev => |*msg| { msg.result = noasync os.writev(msg.fd, msg.iov); }, - .PWriteV => |*msg| { + .pwritev => |*msg| { msg.result = noasync os.pwritev(msg.fd, msg.iov, msg.offset); }, - .PReadV => |*msg| { + .preadv => |*msg| { msg.result = noasync os.preadv(msg.fd, msg.iov, msg.offset); }, - .Open => |*msg| { - msg.result = noasync os.openC(msg.path.ptr, msg.flags, msg.mode); + .open => |*msg| { + msg.result = noasync os.openC(msg.path, msg.flags, msg.mode); }, - .Close => |*msg| noasync os.close(msg.fd), - .WriteFile => |*msg| blk: { - const O_LARGEFILE = if (@hasDecl(os, "O_LARGEFILE")) os.O_LARGEFILE else 0; - const flags = O_LARGEFILE | os.O_WRONLY | os.O_CREAT | - os.O_CLOEXEC | os.O_TRUNC; - const fd = noasync os.openC(msg.path.ptr, flags, msg.mode) catch |err| { - msg.result = err; - break :blk; - }; - defer noasync os.close(fd); - msg.result = noasync os.write(fd, msg.contents); + .openat => |*msg| { + msg.result = noasync os.openatC(msg.fd, msg.path, msg.flags, msg.mode); }, + .close => |*msg| noasync os.close(msg.fd), } switch (node.data.finish) { .TickNode => |*tick_node| self.onNextTick(tick_node), - .DeallocCloseOperation => |close_op| { - self.allocator.destroy(close_op); - }, .NoAction => {}, } self.finishOneEvent(); @@ -911,8 +1087,8 @@ pub const Loop = struct { fs_kevent_wait: os.Kevent, fs_thread: *Thread, fs_kqfd: i32, - fs_queue: std.atomic.Queue(fs.Request), - fs_end_request: fs.RequestNode, + fs_queue: std.atomic.Queue(Request), + fs_end_request: Request.Node, }; const LinuxOsData = struct { @@ -921,8 +1097,99 @@ pub const Loop = struct { final_eventfd_event: os.linux.epoll_event, fs_thread: *Thread, fs_queue_item: i32, - fs_queue: std.atomic.Queue(fs.Request), - fs_end_request: fs.RequestNode, + fs_queue: std.atomic.Queue(Request), + fs_end_request: Request.Node, + }; + + pub const Request = struct { + msg: Msg, + finish: Finish, + + pub const Node = std.atomic.Queue(Request).Node; + + pub const Finish = union(enum) { + TickNode: Loop.NextTickNode, + NoAction, + }; + + pub const Msg = union(enum) { + read: Read, + write: Write, + writev: WriteV, + pwritev: PWriteV, + preadv: PReadV, + open: Open, + openat: OpenAt, + close: Close, + + /// special - means the fs thread should exit + end, + + pub const Read = struct { + fd: os.fd_t, + buf: []u8, + result: Error!usize, + + pub const Error = os.ReadError; + }; + + pub const Write = struct { + fd: os.fd_t, + bytes: []const u8, + result: Error!void, + + pub const Error = os.WriteError; + }; + + pub const WriteV = struct { + fd: os.fd_t, + iov: []const os.iovec_const, + result: Error!void, + + pub const Error = os.WriteError; + }; + + pub const PWriteV = struct { + fd: os.fd_t, + iov: []const os.iovec_const, + offset: usize, + result: Error!void, + + pub const Error = os.WriteError; + }; + + pub const PReadV = struct { + fd: os.fd_t, + iov: []const os.iovec, + offset: usize, + result: Error!usize, + + pub const Error = os.ReadError; + }; + + pub const Open = struct { + path: [*:0]const u8, + flags: u32, + mode: os.mode_t, + result: Error!os.fd_t, + + pub const Error = os.OpenError; + }; + + pub const OpenAt = struct { + fd: os.fd_t, + path: [*:0]const u8, + flags: u32, + mode: os.mode_t, + result: Error!os.fd_t, + + pub const Error = os.OpenError; + }; + + pub const Close = struct { + fd: os.fd_t, + }; + }; }; }; diff --git a/lib/std/fs.zig b/lib/std/fs.zig @@ -23,6 +23,8 @@ pub const realpathW = os.realpathW; pub const getAppDataDir = @import("fs/get_app_data_dir.zig").getAppDataDir; pub const GetAppDataDirError = @import("fs/get_app_data_dir.zig").GetAppDataDirError; +pub const Watch = @import("fs/watch.zig").Watch; + /// This represents the maximum size of a UTF-8 encoded file path. /// All file system operations which return a path are guaranteed to /// fit into a UTF-8 encoded array of this length. @@ -43,6 +45,13 @@ pub const base64_encoder = base64.Base64Encoder.init( base64.standard_pad_char, ); +/// Whether or not async file system syscalls need a dedicated thread because the operating +/// system does not support non-blocking I/O on the file system. +pub const need_async_thread = std.io.is_async and switch (builtin.os) { + .windows, .other => false, + else => true, +}; + /// TODO remove the allocator requirement from this API pub fn atomicSymLink(allocator: *Allocator, existing_path: []const u8, new_path: []const u8) !void { if (symLink(existing_path, new_path)) { @@ -688,7 +697,11 @@ pub const Dir = struct { } pub fn close(self: *Dir) void { - os.close(self.fd); + if (need_async_thread) { + std.event.Loop.instance.?.close(self.fd); + } else { + os.close(self.fd); + } self.* = undefined; } @@ -718,8 +731,11 @@ pub const Dir = struct { @as(u32, os.O_WRONLY) else @as(u32, os.O_RDONLY); - const fd = try os.openatC(self.fd, sub_path, os_flags, 0); - return File{ .handle = fd }; + const fd = if (need_async_thread) + try std.event.Loop.instance.?.openatZ(self.fd, sub_path, os_flags, 0) + else + try os.openatC(self.fd, sub_path, os_flags, 0); + return File{ .handle = fd, .io_mode = .blocking }; } /// Same as `openFile` but Windows-only and the path parameter is @@ -756,8 +772,11 @@ pub const Dir = struct { (if (flags.truncate) @as(u32, os.O_TRUNC) else 0) | (if (flags.read) @as(u32, os.O_RDWR) else os.O_WRONLY) | (if (flags.exclusive) @as(u32, os.O_EXCL) else 0); - const fd = try os.openatC(self.fd, sub_path_c, os_flags, flags.mode); - return File{ .handle = fd }; + const fd = if (need_async_thread) + try std.event.Loop.instance.?.openatZ(self.fd, sub_path_c, os_flags, flags.mode) + else + try os.openatC(self.fd, sub_path_c, os_flags, flags.mode); + return File{ .handle = fd, .io_mode = .blocking }; } /// Same as `createFile` but Windows-only and the path parameter is @@ -919,7 +938,12 @@ pub const Dir = struct { } fn openDirFlagsC(self: Dir, sub_path_c: [*:0]const u8, flags: u32) OpenError!Dir { - const fd = os.openatC(self.fd, sub_path_c, flags | os.O_DIRECTORY, 0) catch |err| switch (err) { + const os_flags = flags | os.O_DIRECTORY; + const result = if (need_async_thread) + std.event.Loop.instance.?.openatZ(self.fd, sub_path_c, os_flags, 0) + else + os.openatC(self.fd, sub_path_c, os_flags, 0); + const fd = result catch |err| switch (err) { error.FileTooBig => unreachable, // can't happen for directories error.IsDir => unreachable, // we're providing O_DIRECTORY error.NoSpaceLeft => unreachable, // not providing O_CREAT @@ -1588,4 +1612,5 @@ test "" { _ = @import("fs/path.zig"); _ = @import("fs/file.zig"); _ = @import("fs/get_app_data_dir.zig"); + _ = @import("fs/watch.zig"); } diff --git a/lib/std/fs/file.zig b/lib/std/fs/file.zig @@ -8,18 +8,29 @@ const assert = std.debug.assert; const windows = os.windows; const Os = builtin.Os; const maxInt = std.math.maxInt; +const need_async_thread = std.fs.need_async_thread; pub const File = struct { /// The OS-specific file descriptor or file handle. handle: os.fd_t, - pub const Mode = switch (builtin.os) { - Os.windows => void, - else => u32, - }; + /// On some systems, such as Linux, file system file descriptors are incapable of non-blocking I/O. + /// This forces us to perform asynchronous I/O on a dedicated thread, to achieve non-blocking + /// file-system I/O. To do this, `File` must be aware of whether it is a file system file descriptor, + /// or, more specifically, whether the I/O is blocking. + io_mode: io.Mode, + + /// Even when std.io.mode is async, it is still sometimes desirable to perform blocking I/O, although + /// not by default. For example, when printing a stack trace to stderr. + async_block_allowed: @TypeOf(async_block_allowed_no) = async_block_allowed_no, + + pub const async_block_allowed_yes = if (io.is_async) true else {}; + pub const async_block_allowed_no = if (io.is_async) false else {}; + + pub const Mode = os.mode_t; pub const default_mode = switch (builtin.os) { - Os.windows => {}, + .windows => 0, else => 0o666, }; @@ -49,87 +60,27 @@ pub const File = struct { mode: Mode = default_mode, }; - /// Deprecated; call `std.fs.Dir.openFile` directly. - pub fn openRead(path: []const u8) OpenError!File { - return std.fs.cwd().openFile(path, .{}); - } - - /// Deprecated; call `std.fs.Dir.openFileC` directly. - pub fn openReadC(path_c: [*:0]const u8) OpenError!File { - return std.fs.cwd().openFileC(path_c, .{}); - } - - /// Deprecated; call `std.fs.Dir.openFileW` directly. - pub fn openReadW(path_w: [*:0]const u16) OpenError!File { - return std.fs.cwd().openFileW(path_w, .{}); - } - - /// Deprecated; call `std.fs.Dir.createFile` directly. - pub fn openWrite(path: []const u8) OpenError!File { - return std.fs.cwd().createFile(path, .{}); - } - - /// Deprecated; call `std.fs.Dir.createFile` directly. - pub fn openWriteMode(path: []const u8, file_mode: Mode) OpenError!File { - return std.fs.cwd().createFile(path, .{ .mode = file_mode }); - } - - /// Deprecated; call `std.fs.Dir.createFileC` directly. - pub fn openWriteModeC(path_c: [*:0]const u8, file_mode: Mode) OpenError!File { - return std.fs.cwd().createFileC(path_c, .{ .mode = file_mode }); - } - - /// Deprecated; call `std.fs.Dir.createFileW` directly. - pub fn openWriteModeW(path_w: [*:0]const u16, file_mode: Mode) OpenError!File { - return std.fs.cwd().createFileW(path_w, .{ .mode = file_mode }); - } - - /// Deprecated; call `std.fs.Dir.createFile` directly. - pub fn openWriteNoClobber(path: []const u8, file_mode: Mode) OpenError!File { - return std.fs.cwd().createFile(path, .{ - .mode = file_mode, - .exclusive = true, - }); - } - - /// Deprecated; call `std.fs.Dir.createFileC` directly. - pub fn openWriteNoClobberC(path_c: [*:0]const u8, file_mode: Mode) OpenError!File { - return std.fs.cwd().createFileC(path_c, .{ - .mode = file_mode, - .exclusive = true, - }); - } - - /// Deprecated; call `std.fs.Dir.createFileW` directly. - pub fn openWriteNoClobberW(path_w: [*:0]const u16, file_mode: Mode) OpenError!File { - return std.fs.cwd().createFileW(path_w, .{ - .mode = file_mode, - .exclusive = true, - }); - } - - pub fn openHandle(handle: os.fd_t) File { - return File{ .handle = handle }; - } - /// Test for the existence of `path`. /// `path` is UTF8-encoded. /// In general it is recommended to avoid this function. For example, /// instead of testing if a file exists and then opening it, just /// open it and handle the error for file not found. /// TODO: deprecate this and move it to `std.fs.Dir`. + /// TODO: integrate with async I/O pub fn access(path: []const u8) !void { return os.access(path, os.F_OK); } /// Same as `access` except the parameter is null-terminated. /// TODO: deprecate this and move it to `std.fs.Dir`. + /// TODO: integrate with async I/O pub fn accessC(path: [*:0]const u8) !void { return os.accessC(path, os.F_OK); } /// Same as `access` except the parameter is null-terminated UTF16LE-encoded. /// TODO: deprecate this and move it to `std.fs.Dir`. + /// TODO: integrate with async I/O pub fn accessW(path: [*:0]const u16) !void { return os.accessW(path, os.F_OK); } @@ -137,7 +88,11 @@ pub const File = struct { /// Upon success, the stream is in an uninitialized state. To continue using it, /// you must use the open() function. pub fn close(self: File) void { - return os.close(self.handle); + if (need_async_thread and self.io_mode == .blocking and !self.async_block_allowed) { + std.event.Loop.instance.?.close(self.handle); + } else { + return os.close(self.handle); + } } /// Test whether the file refers to a terminal. @@ -167,26 +122,31 @@ pub const File = struct { pub const SeekError = os.SeekError; /// Repositions read/write file offset relative to the current offset. + /// TODO: integrate with async I/O pub fn seekBy(self: File, offset: i64) SeekError!void { return os.lseek_CUR(self.handle, offset); } /// Repositions read/write file offset relative to the end. + /// TODO: integrate with async I/O pub fn seekFromEnd(self: File, offset: i64) SeekError!void { return os.lseek_END(self.handle, offset); } /// Repositions read/write file offset relative to the beginning. + /// TODO: integrate with async I/O pub fn seekTo(self: File, offset: u64) SeekError!void { return os.lseek_SET(self.handle, offset); } pub const GetPosError = os.SeekError || os.FStatError; + /// TODO: integrate with async I/O pub fn getPos(self: File) GetPosError!u64 { return os.lseek_CUR_get(self.handle); } + /// TODO: integrate with async I/O pub fn getEndPos(self: File) GetPosError!u64 { if (builtin.os == .windows) { return windows.GetFileSizeEx(self.handle); @@ -196,6 +156,7 @@ pub const File = struct { pub const ModeError = os.FStatError; + /// TODO: integrate with async I/O pub fn mode(self: File) ModeError!Mode { if (builtin.os == .windows) { return {}; @@ -219,6 +180,7 @@ pub const File = struct { pub const StatError = os.FStatError; + /// TODO: integrate with async I/O pub fn stat(self: File) StatError!Stat { if (builtin.os == .windows) { var io_status_block: windows.IO_STATUS_BLOCK = undefined; @@ -259,6 +221,7 @@ pub const File = struct { /// and therefore this function cannot guarantee any precision will be stored. /// Further, the maximum value is limited by the system ABI. When a value is provided /// that exceeds this range, the value is clamped to the maximum. + /// TODO: integrate with async I/O pub fn updateTimes( self: File, /// access timestamp in nanoseconds @@ -287,21 +250,61 @@ pub const File = struct { pub const ReadError = os.ReadError; pub fn read(self: File, buffer: []u8) ReadError!usize { + if (need_async_thread and self.io_mode == .blocking and !self.async_block_allowed) { + return std.event.Loop.instance.?.read(self.handle, buffer); + } return os.read(self.handle, buffer); } + pub fn pread(self: File, buffer: []u8, offset: u64) ReadError!usize { + if (need_async_thread and self.io_mode == .blocking and !self.async_block_allowed) { + return std.event.Loop.instance.?.pread(self.handle, buffer); + } + return os.pread(self.handle, buffer, offset); + } + + pub fn readv(self: File, iovecs: []const os.iovec) ReadError!usize { + if (need_async_thread and self.io_mode == .blocking and !self.async_block_allowed) { + return std.event.Loop.instance.?.readv(self.handle, iovecs); + } + return os.readv(self.handle, iovecs); + } + + pub fn preadv(self: File, iovecs: []const os.iovec, offset: u64) ReadError!usize { + if (need_async_thread and self.io_mode == .blocking and !self.async_block_allowed) { + return std.event.Loop.instance.?.preadv(self.handle, iovecs, offset); + } + return os.preadv(self.handle, iovecs, offset); + } + pub const WriteError = os.WriteError; pub fn write(self: File, bytes: []const u8) WriteError!void { + if (need_async_thread and self.io_mode == .blocking and !self.async_block_allowed) { + return std.event.Loop.instance.?.write(self.handle, bytes); + } return os.write(self.handle, bytes); } - pub fn writev_iovec(self: File, iovecs: []const os.iovec_const) WriteError!void { - if (std.event.Loop.instance) |loop| { - return std.event.fs.writevPosix(loop, self.handle, iovecs); - } else { - return os.writev(self.handle, iovecs); + pub fn pwrite(self: File, bytes: []const u8, offset: u64) WriteError!void { + if (need_async_thread and self.io_mode == .blocking and !self.async_block_allowed) { + return std.event.Loop.instance.?.pwrite(self.handle, bytes, offset); + } + return os.pwrite(self.handle, bytes, offset); + } + + pub fn writev(self: File, iovecs: []const os.iovec_const) WriteError!void { + if (need_async_thread and self.io_mode == .blocking and !self.async_block_allowed) { + return std.event.Loop.instance.?.writev(self.handle, iovecs); + } + return os.writev(self.handle, iovecs); + } + + pub fn pwritev(self: File, iovecs: []const os.iovec_const, offset: usize) WriteError!void { + if (need_async_thread and self.io_mode == .blocking and !self.async_block_allowed) { + return std.event.Loop.instance.?.pwritev(self.handle, iovecs); } + return os.pwritev(self.handle, iovecs); } pub fn inStream(file: File) InStream { diff --git a/lib/std/fs/watch.zig b/lib/std/fs/watch.zig @@ -0,0 +1,673 @@ +const std = @import("../std.zig"); +const builtin = @import("builtin"); +const event = std.event; +const assert = std.debug.assert; +const testing = std.testing; +const os = std.os; +const mem = std.mem; +const windows = os.windows; +const Loop = event.Loop; +const fd_t = os.fd_t; +const File = std.fs.File; +const Allocator = mem.Allocator; + +const WatchEventId = enum { + CloseWrite, + Delete, +}; + +fn eqlString(a: []const u16, b: []const u16) bool { + if (a.len != b.len) return false; + if (a.ptr == b.ptr) return true; + return mem.compare(u16, a, b) == .Equal; +} + +fn hashString(s: []const u16) u32 { + return @truncate(u32, std.hash.Wyhash.hash(0, @sliceToBytes(s))); +} + +const WatchEventError = error{ + UserResourceLimitReached, + SystemResources, + AccessDenied, + Unexpected, // TODO remove this possibility +}; + +pub fn Watch(comptime V: type) type { + return struct { + channel: *event.Channel(Event.Error!Event), + os_data: OsData, + allocator: *Allocator, + + const OsData = switch (builtin.os) { + // TODO https://github.com/ziglang/zig/issues/3778 + .macosx, .freebsd, .netbsd, .dragonfly => KqOsData, + .linux => LinuxOsData, + .windows => WindowsOsData, + + else => @compileError("Unsupported OS"), + }; + + const KqOsData = struct { + file_table: FileTable, + table_lock: event.Lock, + + const FileTable = std.StringHashMap(*Put); + const Put = struct { + putter_frame: @Frame(kqPutEvents), + cancelled: bool = false, + value: V, + }; + }; + + const WindowsOsData = struct { + table_lock: event.Lock, + dir_table: DirTable, + all_putters: std.atomic.Queue(Put), + ref_count: std.atomic.Int(usize), + + const Put = struct { + putter: anyframe, + cancelled: bool = false, + }; + + const DirTable = std.StringHashMap(*Dir); + const FileTable = std.HashMap([]const u16, V, hashString, eqlString); + + const Dir = struct { + putter_frame: @Frame(windowsDirReader), + file_table: FileTable, + table_lock: event.Lock, + }; + }; + + const LinuxOsData = struct { + putter_frame: @Frame(linuxEventPutter), + inotify_fd: i32, + wd_table: WdTable, + table_lock: event.Lock, + cancelled: bool = false, + + const WdTable = std.AutoHashMap(i32, Dir); + const FileTable = std.StringHashMap(V); + + const Dir = struct { + dirname: []const u8, + file_table: FileTable, + }; + }; + + const Self = @This(); + + pub const Event = struct { + id: Id, + data: V, + + pub const Id = WatchEventId; + pub const Error = WatchEventError; + }; + + pub fn init(allocator: *Allocator, event_buf_count: usize) !*Self { + const channel = try allocator.create(event.Channel(Event.Error!Event)); + errdefer allocator.destroy(channel); + var buf = try allocator.alloc(Event.Error!Event, event_buf_count); + errdefer allocator.free(buf); + channel.init(buf); + errdefer channel.deinit(); + + const self = try allocator.create(Self); + errdefer allocator.destroy(self); + + switch (builtin.os) { + .linux => { + const inotify_fd = try os.inotify_init1(os.linux.IN_NONBLOCK | os.linux.IN_CLOEXEC); + errdefer os.close(inotify_fd); + + self.* = Self{ + .allocator = allocator, + .channel = channel, + .os_data = OsData{ + .putter_frame = undefined, + .inotify_fd = inotify_fd, + .wd_table = OsData.WdTable.init(allocator), + .table_lock = event.Lock.init(), + }, + }; + + self.os_data.putter_frame = async self.linuxEventPutter(); + return self; + }, + + .windows => { + self.* = Self{ + .allocator = allocator, + .channel = channel, + .os_data = OsData{ + .table_lock = event.Lock.init(), + .dir_table = OsData.DirTable.init(allocator), + .ref_count = std.atomic.Int(usize).init(1), + .all_putters = std.atomic.Queue(anyframe).init(), + }, + }; + return self; + }, + + .macosx, .freebsd, .netbsd, .dragonfly => { + self.* = Self{ + .allocator = allocator, + .channel = channel, + .os_data = OsData{ + .table_lock = event.Lock.init(), + .file_table = OsData.FileTable.init(allocator), + }, + }; + return self; + }, + else => @compileError("Unsupported OS"), + } + } + + /// All addFile calls and removeFile calls must have completed. + pub fn deinit(self: *Self) void { + switch (builtin.os) { + .macosx, .freebsd, .netbsd, .dragonfly => { + // TODO we need to cancel the frames before destroying the lock + self.os_data.table_lock.deinit(); + var it = self.os_data.file_table.iterator(); + while (it.next()) |entry| { + entry.cancelled = true; + await entry.value.putter; + self.allocator.free(entry.key); + self.allocator.free(entry.value); + } + self.channel.deinit(); + self.allocator.destroy(self.channel.buffer_nodes); + self.allocator.destroy(self); + }, + .linux => { + self.os_data.cancelled = true; + await self.os_data.putter_frame; + self.allocator.destroy(self); + }, + .windows => { + while (self.os_data.all_putters.get()) |putter_node| { + putter_node.cancelled = true; + await putter_node.frame; + } + self.deref(); + }, + else => @compileError("Unsupported OS"), + } + } + + fn ref(self: *Self) void { + _ = self.os_data.ref_count.incr(); + } + + fn deref(self: *Self) void { + if (self.os_data.ref_count.decr() == 1) { + self.os_data.table_lock.deinit(); + var it = self.os_data.dir_table.iterator(); + while (it.next()) |entry| { + self.allocator.free(entry.key); + self.allocator.destroy(entry.value); + } + self.os_data.dir_table.deinit(); + self.channel.deinit(); + self.allocator.destroy(self.channel.buffer_nodes); + self.allocator.destroy(self); + } + } + + pub fn addFile(self: *Self, file_path: []const u8, value: V) !?V { + switch (builtin.os) { + .macosx, .freebsd, .netbsd, .dragonfly => return addFileKEvent(self, file_path, value), + .linux => return addFileLinux(self, file_path, value), + .windows => return addFileWindows(self, file_path, value), + else => @compileError("Unsupported OS"), + } + } + + fn addFileKEvent(self: *Self, file_path: []const u8, value: V) !?V { + const resolved_path = try std.fs.path.resolve(self.allocator, [_][]const u8{file_path}); + var resolved_path_consumed = false; + defer if (!resolved_path_consumed) self.allocator.free(resolved_path); + + var close_op = try CloseOperation.start(self.allocator); + var close_op_consumed = false; + defer if (!close_op_consumed) close_op.finish(); + + const flags = if (comptime std.Target.current.isDarwin()) os.O_SYMLINK | os.O_EVTONLY else 0; + const mode = 0; + const fd = try openPosix(self.allocator, resolved_path, flags, mode); + close_op.setHandle(fd); + + var put = try self.allocator.create(OsData.Put); + errdefer self.allocator.destroy(put); + put.* = OsData.Put{ + .value = value, + .putter_frame = undefined, + }; + put.putter_frame = async self.kqPutEvents(close_op, put); + close_op_consumed = true; + errdefer { + put.cancelled = true; + await put.putter_frame; + } + + const result = blk: { + const held = self.os_data.table_lock.acquire(); + defer held.release(); + + const gop = try self.os_data.file_table.getOrPut(resolved_path); + if (gop.found_existing) { + const prev_value = gop.kv.value.value; + await gop.kv.value.putter_frame; + gop.kv.value = put; + break :blk prev_value; + } else { + resolved_path_consumed = true; + gop.kv.value = put; + break :blk null; + } + }; + + return result; + } + + fn kqPutEvents(self: *Self, close_op: *CloseOperation, put: *OsData.Put) void { + global_event_loop.beginOneEvent(); + + defer { + close_op.finish(); + global_event_loop.finishOneEvent(); + } + + while (!put.cancelled) { + if (global_event_loop.bsdWaitKev( + @intCast(usize, close_op.getHandle()), + os.EVFILT_VNODE, + os.NOTE_WRITE | os.NOTE_DELETE, + )) |kev| { + // TODO handle EV_ERROR + if (kev.fflags & os.NOTE_DELETE != 0) { + self.channel.put(Self.Event{ + .id = Event.Id.Delete, + .data = put.value, + }); + } else if (kev.fflags & os.NOTE_WRITE != 0) { + self.channel.put(Self.Event{ + .id = Event.Id.CloseWrite, + .data = put.value, + }); + } + } else |err| switch (err) { + error.EventNotFound => unreachable, + error.ProcessNotFound => unreachable, + error.Overflow => unreachable, + error.AccessDenied, error.SystemResources => |casted_err| { + self.channel.put(casted_err); + }, + } + } + } + + fn addFileLinux(self: *Self, file_path: []const u8, value: V) !?V { + const dirname = std.fs.path.dirname(file_path) orelse "."; + const dirname_with_null = try std.cstr.addNullByte(self.allocator, dirname); + var dirname_with_null_consumed = false; + defer if (!dirname_with_null_consumed) self.channel.free(dirname_with_null); + + const basename = std.fs.path.basename(file_path); + const basename_with_null = try std.cstr.addNullByte(self.allocator, basename); + var basename_with_null_consumed = false; + defer if (!basename_with_null_consumed) self.allocator.free(basename_with_null); + + const wd = try os.inotify_add_watchC( + self.os_data.inotify_fd, + dirname_with_null.ptr, + os.linux.IN_CLOSE_WRITE | os.linux.IN_ONLYDIR | os.linux.IN_EXCL_UNLINK, + ); + // wd is either a newly created watch or an existing one. + + const held = self.os_data.table_lock.acquire(); + defer held.release(); + + const gop = try self.os_data.wd_table.getOrPut(wd); + if (!gop.found_existing) { + gop.kv.value = OsData.Dir{ + .dirname = dirname_with_null, + .file_table = OsData.FileTable.init(self.allocator), + }; + dirname_with_null_consumed = true; + } + const dir = &gop.kv.value; + + const file_table_gop = try dir.file_table.getOrPut(basename_with_null); + if (file_table_gop.found_existing) { + const prev_value = file_table_gop.kv.value; + file_table_gop.kv.value = value; + return prev_value; + } else { + file_table_gop.kv.value = value; + basename_with_null_consumed = true; + return null; + } + } + + fn addFileWindows(self: *Self, file_path: []const u8, value: V) !?V { + // TODO we might need to convert dirname and basename to canonical file paths ("short"?) + const dirname = try std.mem.dupe(self.allocator, u8, std.fs.path.dirname(file_path) orelse "."); + var dirname_consumed = false; + defer if (!dirname_consumed) self.allocator.free(dirname); + + const dirname_utf16le = try std.unicode.utf8ToUtf16LeWithNull(self.allocator, dirname); + defer self.allocator.free(dirname_utf16le); + + // TODO https://github.com/ziglang/zig/issues/265 + const basename = std.fs.path.basename(file_path); + const basename_utf16le_null = try std.unicode.utf8ToUtf16LeWithNull(self.allocator, basename); + var basename_utf16le_null_consumed = false; + defer if (!basename_utf16le_null_consumed) self.allocator.free(basename_utf16le_null); + const basename_utf16le_no_null = basename_utf16le_null[0 .. basename_utf16le_null.len - 1]; + + const dir_handle = try windows.CreateFileW( + dirname_utf16le.ptr, + windows.FILE_LIST_DIRECTORY, + windows.FILE_SHARE_READ | windows.FILE_SHARE_DELETE | windows.FILE_SHARE_WRITE, + null, + windows.OPEN_EXISTING, + windows.FILE_FLAG_BACKUP_SEMANTICS | windows.FILE_FLAG_OVERLAPPED, + null, + ); + var dir_handle_consumed = false; + defer if (!dir_handle_consumed) windows.CloseHandle(dir_handle); + + const held = self.os_data.table_lock.acquire(); + defer held.release(); + + const gop = try self.os_data.dir_table.getOrPut(dirname); + if (gop.found_existing) { + const dir = gop.kv.value; + const held_dir_lock = dir.table_lock.acquire(); + defer held_dir_lock.release(); + + const file_gop = try dir.file_table.getOrPut(basename_utf16le_no_null); + if (file_gop.found_existing) { + const prev_value = file_gop.kv.value; + file_gop.kv.value = value; + return prev_value; + } else { + file_gop.kv.value = value; + basename_utf16le_null_consumed = true; + return null; + } + } else { + errdefer _ = self.os_data.dir_table.remove(dirname); + const dir = try self.allocator.create(OsData.Dir); + errdefer self.allocator.destroy(dir); + + dir.* = OsData.Dir{ + .file_table = OsData.FileTable.init(self.allocator), + .table_lock = event.Lock.init(), + .putter_frame = undefined, + }; + gop.kv.value = dir; + assert((try dir.file_table.put(basename_utf16le_no_null, value)) == null); + basename_utf16le_null_consumed = true; + + dir.putter_frame = async self.windowsDirReader(dir_handle, dir); + dir_handle_consumed = true; + + dirname_consumed = true; + + return null; + } + } + + fn windowsDirReader(self: *Self, dir_handle: windows.HANDLE, dir: *OsData.Dir) void { + self.ref(); + defer self.deref(); + + defer os.close(dir_handle); + + var putter_node = std.atomic.Queue(anyframe).Node{ + .data = .{ .putter = @frame() }, + .prev = null, + .next = null, + }; + self.os_data.all_putters.put(&putter_node); + defer _ = self.os_data.all_putters.remove(&putter_node); + + var resume_node = Loop.ResumeNode.Basic{ + .base = Loop.ResumeNode{ + .id = Loop.ResumeNode.Id.Basic, + .handle = @frame(), + .overlapped = windows.OVERLAPPED{ + .Internal = 0, + .InternalHigh = 0, + .Offset = 0, + .OffsetHigh = 0, + .hEvent = null, + }, + }, + }; + var event_buf: [4096]u8 align(@alignOf(windows.FILE_NOTIFY_INFORMATION)) = undefined; + + // TODO handle this error not in the channel but in the setup + _ = windows.CreateIoCompletionPort( + dir_handle, + global_event_loop.os_data.io_port, + undefined, + undefined, + ) catch |err| { + self.channel.put(err); + return; + }; + + while (!putter_node.data.cancelled) { + { + // TODO only 1 beginOneEvent for the whole function + global_event_loop.beginOneEvent(); + errdefer global_event_loop.finishOneEvent(); + errdefer { + _ = windows.kernel32.CancelIoEx(dir_handle, &resume_node.base.overlapped); + } + suspend { + _ = windows.kernel32.ReadDirectoryChangesW( + dir_handle, + &event_buf, + @intCast(windows.DWORD, event_buf.len), + windows.FALSE, // watch subtree + windows.FILE_NOTIFY_CHANGE_FILE_NAME | windows.FILE_NOTIFY_CHANGE_DIR_NAME | + windows.FILE_NOTIFY_CHANGE_ATTRIBUTES | windows.FILE_NOTIFY_CHANGE_SIZE | + windows.FILE_NOTIFY_CHANGE_LAST_WRITE | windows.FILE_NOTIFY_CHANGE_LAST_ACCESS | + windows.FILE_NOTIFY_CHANGE_CREATION | windows.FILE_NOTIFY_CHANGE_SECURITY, + null, // number of bytes transferred (unused for async) + &resume_node.base.overlapped, + null, // completion routine - unused because we use IOCP + ); + } + } + var bytes_transferred: windows.DWORD = undefined; + if (windows.kernel32.GetOverlappedResult(dir_handle, &resume_node.base.overlapped, &bytes_transferred, windows.FALSE) == 0) { + const err = switch (windows.kernel32.GetLastError()) { + else => |err| windows.unexpectedError(err), + }; + self.channel.put(err); + } else { + // can't use @bytesToSlice because of the special variable length name field + var ptr = event_buf[0..].ptr; + const end_ptr = ptr + bytes_transferred; + var ev: *windows.FILE_NOTIFY_INFORMATION = undefined; + while (@ptrToInt(ptr) < @ptrToInt(end_ptr)) : (ptr += ev.NextEntryOffset) { + ev = @ptrCast(*windows.FILE_NOTIFY_INFORMATION, ptr); + const emit = switch (ev.Action) { + windows.FILE_ACTION_REMOVED => WatchEventId.Delete, + windows.FILE_ACTION_MODIFIED => WatchEventId.CloseWrite, + else => null, + }; + if (emit) |id| { + const basename_utf16le = ([*]u16)(&ev.FileName)[0 .. ev.FileNameLength / 2]; + const user_value = blk: { + const held = dir.table_lock.acquire(); + defer held.release(); + + if (dir.file_table.get(basename_utf16le)) |entry| { + break :blk entry.value; + } else { + break :blk null; + } + }; + if (user_value) |v| { + self.channel.put(Event{ + .id = id, + .data = v, + }); + } + } + if (ev.NextEntryOffset == 0) break; + } + } + } + } + + pub fn removeFile(self: *Self, file_path: []const u8) ?V { + @panic("TODO"); + } + + fn linuxEventPutter(self: *Self) void { + global_event_loop.beginOneEvent(); + + defer { + self.os_data.table_lock.deinit(); + var wd_it = self.os_data.wd_table.iterator(); + while (wd_it.next()) |wd_entry| { + var file_it = wd_entry.value.file_table.iterator(); + while (file_it.next()) |file_entry| { + self.allocator.free(file_entry.key); + } + self.allocator.free(wd_entry.value.dirname); + wd_entry.value.file_table.deinit(); + } + self.os_data.wd_table.deinit(); + global_event_loop.finishOneEvent(); + os.close(self.os_data.inotify_fd); + self.channel.deinit(); + self.allocator.free(self.channel.buffer_nodes); + } + + var event_buf: [4096]u8 align(@alignOf(os.linux.inotify_event)) = undefined; + + while (!self.os_data.cancelled) { + const rc = os.linux.read(self.os_data.inotify_fd, &event_buf, event_buf.len); + const errno = os.linux.getErrno(rc); + switch (errno) { + 0 => { + // can't use @bytesToSlice because of the special variable length name field + var ptr = event_buf[0..].ptr; + const end_ptr = ptr + event_buf.len; + var ev: *os.linux.inotify_event = undefined; + while (@ptrToInt(ptr) < @ptrToInt(end_ptr)) { + ev = @ptrCast(*os.linux.inotify_event, ptr); + if (ev.mask & os.linux.IN_CLOSE_WRITE == os.linux.IN_CLOSE_WRITE) { + const basename_ptr = ptr + @sizeOf(os.linux.inotify_event); + // `ev.len` counts all bytes in `ev.name` including terminating null byte. + const basename_with_null = basename_ptr[0..ev.len]; + const user_value = blk: { + const held = self.os_data.table_lock.acquire(); + defer held.release(); + + const dir = &self.os_data.wd_table.get(ev.wd).?.value; + if (dir.file_table.get(basename_with_null)) |entry| { + break :blk entry.value; + } else { + break :blk null; + } + }; + if (user_value) |v| { + self.channel.put(Event{ + .id = WatchEventId.CloseWrite, + .data = v, + }); + } + } + + ptr = @alignCast(@alignOf(os.linux.inotify_event), ptr + @sizeOf(os.linux.inotify_event) + ev.len); + } + }, + os.linux.EINTR => continue, + os.linux.EINVAL => unreachable, + os.linux.EFAULT => unreachable, + os.linux.EAGAIN => { + global_event_loop.linuxWaitFd(self.os_data.inotify_fd, os.linux.EPOLLET | os.linux.EPOLLIN | os.EPOLLONESHOT); + }, + else => unreachable, + } + } + } + }; +} + +const test_tmp_dir = "std_event_fs_test"; + +test "write a file, watch it, write it again" { + // TODO provide a way to run tests in evented I/O mode + if (!std.io.is_async) return error.SkipZigTest; + + const allocator = std.heap.page_allocator; + + // TODO move this into event loop too + try os.makePath(allocator, test_tmp_dir); + defer os.deleteTree(test_tmp_dir) catch {}; + + return testFsWatch(&allocator); +} + +fn testFsWatch(allocator: *Allocator) !void { + const file_path = try std.fs.path.join(allocator, [_][]const u8{ test_tmp_dir, "file.txt" }); + defer allocator.free(file_path); + + const contents = + \\line 1 + \\line 2 + ; + const line2_offset = 7; + + // first just write then read the file + try writeFile(allocator, file_path, contents); + + const read_contents = try readFile(allocator, file_path, 1024 * 1024); + testing.expectEqualSlices(u8, contents, read_contents); + + // now watch the file + var watch = try Watch(void).init(allocator, 0); + defer watch.deinit(); + + testing.expect((try watch.addFile(file_path, {})) == null); + + const ev = watch.channel.get(); + var ev_consumed = false; + defer if (!ev_consumed) await ev; + + // overwrite line 2 + const fd = try await openReadWrite(file_path, File.default_mode); + { + defer os.close(fd); + + try pwritev(allocator, fd, []const []const u8{"lorem ipsum"}, line2_offset); + } + + ev_consumed = true; + switch ((try await ev).id) { + WatchEventId.CloseWrite => {}, + WatchEventId.Delete => @panic("wrong event"), + } + const contents_updated = try readFile(allocator, file_path, 1024 * 1024); + testing.expectEqualSlices(u8, + \\line 1 + \\lorem ipsum + , contents_updated); + + // TODO test deleting the file and then re-adding it. we should get events for both +} diff --git a/lib/std/io.zig b/lib/std/io.zig @@ -47,7 +47,10 @@ fn getStdOutHandle() os.fd_t { } pub fn getStdOut() File { - return File.openHandle(getStdOutHandle()); + return File{ + .handle = getStdOutHandle(), + .io_mode = .blocking, + }; } fn getStdErrHandle() os.fd_t { @@ -63,7 +66,11 @@ fn getStdErrHandle() os.fd_t { } pub fn getStdErr() File { - return File.openHandle(getStdErrHandle()); + return File{ + .handle = getStdErrHandle(), + .io_mode = .blocking, + .async_block_allowed = File.async_block_allowed_yes, + }; } fn getStdInHandle() os.fd_t { @@ -79,7 +86,10 @@ fn getStdInHandle() os.fd_t { } pub fn getStdIn() File { - return File.openHandle(getStdInHandle()); + return File{ + .handle = getStdInHandle(), + .io_mode = .blocking, + }; } pub const SeekableStream = @import("io/seekable_stream.zig").SeekableStream; diff --git a/lib/std/io/out_stream.zig b/lib/std/io/out_stream.zig @@ -9,14 +9,11 @@ pub const stack_size: usize = if (@hasDecl(root, "stack_size_std_io_OutStream")) else default_stack_size; -/// TODO this is not integrated with evented I/O yet. -/// https://github.com/ziglang/zig/issues/3557 pub fn OutStream(comptime WriteError: type) type { return struct { const Self = @This(); pub const Error = WriteError; - // TODO https://github.com/ziglang/zig/issues/3557 - pub const WriteFn = if (std.io.is_async and false) + pub const WriteFn = if (std.io.is_async) async fn (self: *Self, bytes: []const u8) Error!void else fn (self: *Self, bytes: []const u8) Error!void; @@ -24,8 +21,7 @@ pub fn OutStream(comptime WriteError: type) type { writeFn: WriteFn, pub fn write(self: *Self, bytes: []const u8) Error!void { - // TODO https://github.com/ziglang/zig/issues/3557 - if (std.io.is_async and false) { + if (std.io.is_async) { // Let's not be writing 0xaa in safe modes for upwards of 4 MiB for every stream write. @setRuntimeSafety(false); var stack_frame: [stack_size]u8 align(std.Target.stack_align) = undefined; @@ -40,8 +36,8 @@ pub fn OutStream(comptime WriteError: type) type { } pub fn writeByte(self: *Self, byte: u8) Error!void { - const slice = @as(*const [1]u8, &byte)[0..]; - return self.writeFn(self, slice); + const array = [1]u8{byte}; + return self.write(&array); } pub fn writeByteNTimes(self: *Self, byte: u8, n: usize) Error!void { @@ -51,7 +47,7 @@ pub fn OutStream(comptime WriteError: type) type { var remaining: usize = n; while (remaining > 0) { const to_write = std.math.min(remaining, bytes.len); - try self.writeFn(self, bytes[0..to_write]); + try self.write(bytes[0..to_write]); remaining -= to_write; } } @@ -60,32 +56,32 @@ pub fn OutStream(comptime WriteError: type) type { pub fn writeIntNative(self: *Self, comptime T: type, value: T) Error!void { var bytes: [(T.bit_count + 7) / 8]u8 = undefined; mem.writeIntNative(T, &bytes, value); - return self.writeFn(self, &bytes); + return self.write(&bytes); } /// Write a foreign-endian integer. pub fn writeIntForeign(self: *Self, comptime T: type, value: T) Error!void { var bytes: [(T.bit_count + 7) / 8]u8 = undefined; mem.writeIntForeign(T, &bytes, value); - return self.writeFn(self, &bytes); + return self.write(&bytes); } pub fn writeIntLittle(self: *Self, comptime T: type, value: T) Error!void { var bytes: [(T.bit_count + 7) / 8]u8 = undefined; mem.writeIntLittle(T, &bytes, value); - return self.writeFn(self, &bytes); + return self.write(&bytes); } pub fn writeIntBig(self: *Self, comptime T: type, value: T) Error!void { var bytes: [(T.bit_count + 7) / 8]u8 = undefined; mem.writeIntBig(T, &bytes, value); - return self.writeFn(self, &bytes); + return self.write(&bytes); } pub fn writeInt(self: *Self, comptime T: type, value: T, endian: builtin.Endian) Error!void { var bytes: [(T.bit_count + 7) / 8]u8 = undefined; mem.writeInt(T, &bytes, value, endian); - return self.writeFn(self, &bytes); + return self.write(&bytes); } }; } diff --git a/lib/std/linked_list.zig b/lib/std/linked_list.zig @@ -18,12 +18,11 @@ pub fn SinglyLinkedList(comptime T: type) type { /// Node inside the linked list wrapping the actual data. pub const Node = struct { - next: ?*Node, + next: ?*Node = null, data: T, pub fn init(data: T) Node { return Node{ - .next = null, .data = data, }; } @@ -196,14 +195,12 @@ pub fn TailQueue(comptime T: type) type { /// Node inside the linked list wrapping the actual data. pub const Node = struct { - prev: ?*Node, - next: ?*Node, + prev: ?*Node = null, + next: ?*Node = null, data: T, pub fn init(data: T) Node { return Node{ - .prev = null, - .next = null, .data = data, }; } diff --git a/lib/std/os.zig b/lib/std/os.zig @@ -169,7 +169,12 @@ fn getRandomBytesDevURandom(buf: []u8) !void { return error.NoDevice; } - const stream = &std.fs.File.openHandle(fd).inStream().stream; + const file = std.fs.File{ + .handle = fd, + .io_mode = .blocking, + .async_block_allowed = std.fs.File.async_block_allowed_yes, + }; + const stream = &file.inStream().stream; stream.readNoEof(buf) catch return error.Unexpected; } @@ -293,7 +298,7 @@ pub const ReadError = error{ /// via the event loop. Otherwise EAGAIN results in error.WouldBlock. pub fn read(fd: fd_t, buf: []u8) ReadError!usize { if (builtin.os == .windows) { - return windows.ReadFile(fd, buf); + return windows.ReadFile(fd, buf, null); } if (builtin.os == .wasi and !builtin.link_libc) { @@ -335,9 +340,37 @@ pub fn read(fd: fd_t, buf: []u8) ReadError!usize { } /// Number of bytes read is returned. Upon reading end-of-file, zero is returned. -/// If the application has a global event loop enabled, EAGAIN is handled -/// via the event loop. Otherwise EAGAIN results in error.WouldBlock. +/// +/// For POSIX systems, if the application has a global event loop enabled, EAGAIN is handled +/// via the event loop. Otherwise EAGAIN results in `error.WouldBlock`. +/// On Windows, if the application has a global event loop enabled, I/O Completion Ports are +/// used to perform the I/O. `error.WouldBlock` is not possible on Windows. +/// +/// This operation is non-atomic on the following systems: +/// * Windows +/// On these systems, the read races with concurrent writes to the same file descriptor. pub fn readv(fd: fd_t, iov: []const iovec) ReadError!usize { + if (builtin.os == .windows) { + // TODO batch these into parallel requests + var off: usize = 0; + var iov_i: usize = 0; + var inner_off: usize = 0; + while (true) { + const v = iov[iov_i]; + const amt_read = try read(fd, v.iov_base[inner_off .. v.iov_len - inner_off]); + off += amt_read; + inner_off += amt_read; + if (inner_off == v.len) { + iov_i += 1; + inner_off = 0; + if (iov_i == iov.len) { + return off; + } + } + if (amt_read == 0) return off; // EOF + } else unreachable; // TODO https://github.com/ziglang/zig/issues/707 + } + while (true) { // TODO handle the case when iov_len is too large and get rid of this @intCast const rc = system.readv(fd, iov.ptr, @intCast(u32, iov.len)); @@ -363,8 +396,56 @@ pub fn readv(fd: fd_t, iov: []const iovec) ReadError!usize { } /// Number of bytes read is returned. Upon reading end-of-file, zero is returned. -/// If the application has a global event loop enabled, EAGAIN is handled -/// via the event loop. Otherwise EAGAIN results in error.WouldBlock. +/// +/// Retries when interrupted by a signal. +/// +/// For POSIX systems, if the application has a global event loop enabled, EAGAIN is handled +/// via the event loop. Otherwise EAGAIN results in `error.WouldBlock`. +/// On Windows, if the application has a global event loop enabled, I/O Completion Ports are +/// used to perform the I/O. `error.WouldBlock` is not possible on Windows. +pub fn pread(fd: fd_t, buf: []u8, offset: u64) ReadError!usize { + if (builtin.os == .windows) { + return windows.ReadFile(fd, buf, offset); + } + + while (true) { + const rc = system.pread(fd, buf.ptr, buf.len, offset); + switch (errno(rc)) { + 0 => return @intCast(usize, rc), + EINTR => continue, + EINVAL => unreachable, + EFAULT => unreachable, + EAGAIN => if (std.event.Loop.instance) |loop| { + loop.waitUntilFdReadable(fd); + continue; + } else { + return error.WouldBlock; + }, + EBADF => unreachable, // Always a race condition. + EIO => return error.InputOutput, + EISDIR => return error.IsDir, + ENOBUFS => return error.SystemResources, + ENOMEM => return error.SystemResources, + ECONNRESET => return error.ConnectionResetByPeer, + else => |err| return unexpectedErrno(err), + } + } + return index; +} + +/// Number of bytes read is returned. Upon reading end-of-file, zero is returned. +/// +/// Retries when interrupted by a signal. +/// +/// For POSIX systems, if the application has a global event loop enabled, EAGAIN is handled +/// via the event loop. Otherwise EAGAIN results in `error.WouldBlock`. +/// On Windows, if the application has a global event loop enabled, I/O Completion Ports are +/// used to perform the I/O. `error.WouldBlock` is not possible on Windows. +/// +/// This operation is non-atomic on the following systems: +/// * Darwin +/// * Windows +/// On these systems, the read races with concurrent writes to the same file descriptor. pub fn preadv(fd: fd_t, iov: []const iovec, offset: u64) ReadError!usize { if (comptime std.Target.current.isDarwin()) { // Darwin does not have preadv but it does have pread. @@ -409,6 +490,28 @@ pub fn preadv(fd: fd_t, iov: []const iovec, offset: u64) ReadError!usize { } } } + + if (builtin.os == .windows) { + // TODO batch these into parallel requests + var off: usize = 0; + var iov_i: usize = 0; + var inner_off: usize = 0; + while (true) { + const v = iov[iov_i]; + const amt_read = try pread(fd, v.iov_base[inner_off .. v.iov_len - inner_off], offset + off); + off += amt_read; + inner_off += amt_read; + if (inner_off == v.len) { + iov_i += 1; + inner_off = 0; + if (iov_i == iov.len) { + return off; + } + } + if (amt_read == 0) return off; // EOF + } else unreachable; // TODO https://github.com/ziglang/zig/issues/707 + } + while (true) { // TODO handle the case when iov_len is too large and get rid of this @intCast const rc = system.preadv(fd, iov.ptr, @intCast(u32, iov.len), offset); @@ -451,11 +554,9 @@ pub const WriteError = error{ /// Write to a file descriptor. Keeps trying if it gets interrupted. /// If the application has a global event loop enabled, EAGAIN is handled /// via the event loop. Otherwise EAGAIN results in error.WouldBlock. -/// TODO evented I/O integration is disabled until -/// https://github.com/ziglang/zig/issues/3557 is solved. pub fn write(fd: fd_t, bytes: []const u8) WriteError!void { if (builtin.os == .windows) { - return windows.WriteFile(fd, bytes); + return windows.WriteFile(fd, bytes, null); } if (builtin.os == .wasi and !builtin.link_libc) { @@ -488,14 +589,12 @@ pub fn write(fd: fd_t, bytes: []const u8) WriteError!void { EINTR => continue, EINVAL => unreachable, EFAULT => unreachable, - // TODO https://github.com/ziglang/zig/issues/3557 - EAGAIN => return error.WouldBlock, - //EAGAIN => if (std.event.Loop.instance) |loop| { - // loop.waitUntilFdWritable(fd); - // continue; - //} else { - // return error.WouldBlock; - //}, + EAGAIN => if (std.event.Loop.instance) |loop| { + loop.waitUntilFdWritable(fd); + continue; + } else { + return error.WouldBlock; + }, EBADF => unreachable, // Always a race condition. EDESTADDRREQ => unreachable, // `connect` was never called. EDQUOT => return error.DiskQuota, @@ -540,8 +639,57 @@ pub fn writev(fd: fd_t, iov: []const iovec_const) WriteError!void { } } +/// Write to a file descriptor, with a position offset. +/// +/// Retries when interrupted by a signal. +/// +/// For POSIX systems, if the application has a global event loop enabled, EAGAIN is handled +/// via the event loop. Otherwise EAGAIN results in `error.WouldBlock`. +/// On Windows, if the application has a global event loop enabled, I/O Completion Ports are +/// used to perform the I/O. `error.WouldBlock` is not possible on Windows. +pub fn pwrite(fd: fd_t, bytes: []const u8, offset: u64) WriteError!void { + if (comptime std.Target.current.isWindows()) { + return windows.WriteFile(fd, bytes, offset); + } + + while (true) { + const rc = system.pwrite(fd, bytes.ptr, bytes.len, offset); + switch (errno(rc)) { + 0 => return, + EINTR => continue, + EINVAL => unreachable, + EFAULT => unreachable, + EAGAIN => if (std.event.Loop.instance) |loop| { + loop.waitUntilFdWritable(fd); + continue; + } else { + return error.WouldBlock; + }, + EBADF => unreachable, // Always a race condition. + EDESTADDRREQ => unreachable, // `connect` was never called. + EDQUOT => return error.DiskQuota, + EFBIG => return error.FileTooBig, + EIO => return error.InputOutput, + ENOSPC => return error.NoSpaceLeft, + EPERM => return error.AccessDenied, + EPIPE => return error.BrokenPipe, + else => |err| return unexpectedErrno(err), + } + } +} + /// Write multiple buffers to a file descriptor, with a position offset. -/// Keeps trying if it gets interrupted. +/// +/// Retries when interrupted by a signal. +/// +/// If the application has a global event loop enabled, EAGAIN is handled +/// via the event loop. Otherwise EAGAIN results in `error.WouldBlock`. +/// +/// This operation is non-atomic on the following systems: +/// * Darwin +/// * Windows +/// On these systems, the write races with concurrent writes to the same file descriptor, and +/// the file can be in a partially written state when an error occurs. pub fn pwritev(fd: fd_t, iov: []const iovec_const, offset: u64) WriteError!void { if (comptime std.Target.current.isDarwin()) { // Darwin does not have pwritev but it does have pwrite. @@ -589,6 +737,15 @@ pub fn pwritev(fd: fd_t, iov: []const iovec_const, offset: u64) WriteError!void } } + if (comptime std.Target.current.isWindows()) { + var off = offset; + for (iov) |item| { + try pwrite(fd, item.iov_base[0..item.iov_len], off); + off += buf.len; + } + return; + } + while (true) { // TODO handle the case when iov_len is too large and get rid of this @intCast const rc = system.pwritev(fd, iov.ptr, @intCast(u32, iov.len), offset); @@ -694,7 +851,7 @@ pub fn openC(file_path: [*:0]const u8, flags: u32, perm: usize) OpenError!fd_t { /// Open and possibly create a file. Keeps trying if it gets interrupted. /// `file_path` is relative to the open directory handle `dir_fd`. /// See also `openatC`. -pub fn openat(dir_fd: fd_t, file_path: []const u8, flags: u32, mode: usize) OpenError!fd_t { +pub fn openat(dir_fd: fd_t, file_path: []const u8, flags: u32, mode: mode_t) OpenError!fd_t { const file_path_c = try toPosixPath(file_path); return openatC(dir_fd, &file_path_c, flags, mode); } @@ -702,7 +859,7 @@ pub fn openat(dir_fd: fd_t, file_path: []const u8, flags: u32, mode: usize) Open /// Open and possibly create a file. Keeps trying if it gets interrupted. /// `file_path` is relative to the open directory handle `dir_fd`. /// See also `openat`. -pub fn openatC(dir_fd: fd_t, file_path: [*:0]const u8, flags: u32, mode: usize) OpenError!fd_t { +pub fn openatC(dir_fd: fd_t, file_path: [*:0]const u8, flags: u32, mode: mode_t) OpenError!fd_t { while (true) { const rc = system.openat(dir_fd, file_path, flags, mode); switch (errno(rc)) { @@ -3328,9 +3485,7 @@ pub fn getrusage(who: i32) rusage { } } -pub const TermiosGetError = error{ - NotATerminal, -} || UnexpectedError; +pub const TermiosGetError = error{NotATerminal} || UnexpectedError; pub fn tcgetattr(handle: fd_t) TermiosGetError!termios { var term: termios = undefined; @@ -3342,9 +3497,7 @@ pub fn tcgetattr(handle: fd_t) TermiosGetError!termios { } } -pub const TermiosSetError = TermiosGetError || error{ - ProcessOrphaned, -}; +pub const TermiosSetError = TermiosGetError || error{ProcessOrphaned}; pub fn tcsetattr(handle: fd_t, optional_action: TCSA, termios_p: termios) TermiosSetError!void { while (true) { diff --git a/lib/std/os/bits/darwin.zig b/lib/std/os/bits/darwin.zig @@ -4,6 +4,7 @@ const maxInt = std.math.maxInt; pub const fd_t = c_int; pub const pid_t = c_int; +pub const mode_t = c_uint; pub const in_port_t = u16; pub const sa_family_t = u8; diff --git a/lib/std/os/bits/dragonfly.zig b/lib/std/os/bits/dragonfly.zig @@ -7,6 +7,7 @@ pub fn S_ISCHR(m: u32) bool { pub const fd_t = c_int; pub const pid_t = c_int; pub const off_t = c_long; +pub const mode_t = c_uint; pub const ENOTSUP = EOPNOTSUPP; pub const EWOULDBLOCK = EAGAIN; diff --git a/lib/std/os/bits/freebsd.zig b/lib/std/os/bits/freebsd.zig @@ -3,6 +3,7 @@ const maxInt = std.math.maxInt; pub const fd_t = c_int; pub const pid_t = c_int; +pub const mode_t = c_uint; pub const socklen_t = u32; diff --git a/lib/std/os/bits/linux/x86_64.zig b/lib/std/os/bits/linux/x86_64.zig @@ -12,6 +12,8 @@ const socklen_t = linux.socklen_t; const iovec = linux.iovec; const iovec_const = linux.iovec_const; +pub const mode_t = usize; + pub const SYS_read = 0; pub const SYS_write = 1; pub const SYS_open = 2; diff --git a/lib/std/os/bits/netbsd.zig b/lib/std/os/bits/netbsd.zig @@ -3,6 +3,7 @@ const maxInt = std.math.maxInt; pub const fd_t = c_int; pub const pid_t = c_int; +pub const mode_t = c_uint; /// Renamed from `kevent` to `Kevent` to avoid conflict with function name. pub const Kevent = extern struct { diff --git a/lib/std/os/bits/wasi.zig b/lib/std/os/bits/wasi.zig @@ -130,6 +130,7 @@ pub const EVENTTYPE_FD_WRITE: eventtype_t = 2; pub const exitcode_t = u32; pub const fd_t = u32; +pub const mode_t = u32; pub const fdflags_t = u16; pub const FDFLAG_APPEND: fdflags_t = 0x0001; diff --git a/lib/std/os/bits/windows.zig b/lib/std/os/bits/windows.zig @@ -5,6 +5,7 @@ const ws2_32 = @import("../windows/ws2_32.zig"); pub const fd_t = HANDLE; pub const pid_t = HANDLE; +pub const mode_t = u0; pub const PATH_MAX = 260; diff --git a/lib/std/os/windows.zig b/lib/std/os/windows.zig @@ -344,24 +344,77 @@ pub fn FindClose(hFindFile: HANDLE) void { assert(kernel32.FindClose(hFindFile) != 0); } -pub const ReadFileError = error{Unexpected}; - -pub fn ReadFile(in_hFile: HANDLE, buffer: []u8) ReadFileError!usize { - var index: usize = 0; - while (index < buffer.len) { - const want_read_count = @intCast(DWORD, math.min(@as(DWORD, maxInt(DWORD)), buffer.len - index)); - var amt_read: DWORD = undefined; - if (kernel32.ReadFile(in_hFile, buffer.ptr + index, want_read_count, &amt_read, null) == 0) { - switch (kernel32.GetLastError()) { - .OPERATION_ABORTED => continue, - .BROKEN_PIPE => return index, - else => |err| return unexpectedError(err), +pub const ReadFileError = error{ + OperationAborted, + BrokenPipe, + Unexpected, +}; + +/// If buffer's length exceeds what a Windows DWORD integer can hold, it will be broken into +/// multiple non-atomic reads. +pub fn ReadFile(in_hFile: HANDLE, buffer: []u8, offset: ?u64) ReadFileError!usize { + if (std.event.Loop.instance) |loop| { + // TODO support async ReadFile with no offset + const off = offset.?; + var resume_node = std.event.Loop.ResumeNode.Basic{ + .base = .{ + .id = .Basic, + .handle = @frame(), + .overlapped = OVERLAPPED{ + .Internal = 0, + .InternalHigh = 0, + .Offset = @truncate(u32, off), + .OffsetHigh = @truncate(u32, off >> 32), + .hEvent = null, + }, + }, + }; + // TODO only call create io completion port once per fd + _ = windows.CreateIoCompletionPort(fd, loop.os_data.io_port, undefined, undefined) catch undefined; + loop.beginOneEvent(); + suspend { + // TODO handle buffer bigger than DWORD can hold + _ = windows.kernel32.ReadFile(fd, buffer.ptr, @intCast(windows.DWORD, buffer.len), null, &resume_node.base.overlapped); + } + var bytes_transferred: windows.DWORD = undefined; + if (windows.kernel32.GetOverlappedResult(fd, &resume_node.base.overlapped, &bytes_transferred, windows.FALSE) == 0) { + switch (windows.kernel32.GetLastError()) { + .IO_PENDING => unreachable, + .OPERATION_ABORTED => return error.OperationAborted, + .BROKEN_PIPE => return error.BrokenPipe, + .HANDLE_EOF => return @as(usize, bytes_transferred), + else => |err| return windows.unexpectedError(err), + } + } + return @as(usize, bytes_transferred); + } else { + var index: usize = 0; + while (index < buffer.len) { + const want_read_count = @intCast(DWORD, math.min(@as(DWORD, maxInt(DWORD)), buffer.len - index)); + var amt_read: DWORD = undefined; + var overlapped_data: OVERLAPPED = undefined; + const overlapped: ?*OVERLAPPED = if (offset) |off| blk: { + overlapped_data = .{ + .Internal = 0, + .InternalHigh = 0, + .Offset = @truncate(u32, off + index), + .OffsetHigh = @truncate(u32, (off + index) >> 32), + .hEvent = null, + }; + break :blk &overlapped_data; + } else null; + if (kernel32.ReadFile(in_hFile, buffer.ptr + index, want_read_count, &amt_read, overlapped) == 0) { + switch (kernel32.GetLastError()) { + .OPERATION_ABORTED => continue, + .BROKEN_PIPE => return index, + else => |err| return unexpectedError(err), + } } + if (amt_read == 0) return index; + index += amt_read; } - if (amt_read == 0) return index; - index += amt_read; + return index; } - return index; } pub const WriteFileError = error{ @@ -371,20 +424,66 @@ pub const WriteFileError = error{ Unexpected, }; -/// This function is for blocking file descriptors only. For non-blocking, see -/// `WriteFileAsync`. -pub fn WriteFile(handle: HANDLE, bytes: []const u8) WriteFileError!void { - var bytes_written: DWORD = undefined; - // TODO replace this @intCast with a loop that writes all the bytes - if (kernel32.WriteFile(handle, bytes.ptr, @intCast(u32, bytes.len), &bytes_written, null) == 0) { - switch (kernel32.GetLastError()) { - .INVALID_USER_BUFFER => return error.SystemResources, - .NOT_ENOUGH_MEMORY => return error.SystemResources, - .OPERATION_ABORTED => return error.OperationAborted, - .NOT_ENOUGH_QUOTA => return error.SystemResources, - .IO_PENDING => unreachable, // this function is for blocking files only - .BROKEN_PIPE => return error.BrokenPipe, - else => |err| return unexpectedError(err), +pub fn WriteFile(handle: HANDLE, bytes: []const u8, offset: ?u64) WriteFileError!void { + if (std.event.Loop.instance) |loop| { + // TODO support async WriteFile with no offset + const off = offset.?; + var resume_node = std.event.Loop.ResumeNode.Basic{ + .base = .{ + .id = .Basic, + .handle = @frame(), + .overlapped = OVERLAPPED{ + .Internal = 0, + .InternalHigh = 0, + .Offset = @truncate(u32, off), + .OffsetHigh = @truncate(u32, off >> 32), + .hEvent = null, + }, + }, + }; + // TODO only call create io completion port once per fd + _ = CreateIoCompletionPort(fd, loop.os_data.io_port, undefined, undefined); + loop.beginOneEvent(); + suspend { + // TODO replace this @intCast with a loop that writes all the bytes + _ = kernel32.WriteFile(fd, bytes.ptr, @intCast(windows.DWORD, bytes.len), null, &resume_node.base.overlapped); + } + var bytes_transferred: windows.DWORD = undefined; + if (kernel32.GetOverlappedResult(fd, &resume_node.base.overlapped, &bytes_transferred, FALSE) == 0) { + switch (kernel32.GetLastError()) { + .IO_PENDING => unreachable, + .INVALID_USER_BUFFER => return error.SystemResources, + .NOT_ENOUGH_MEMORY => return error.SystemResources, + .OPERATION_ABORTED => return error.OperationAborted, + .NOT_ENOUGH_QUOTA => return error.SystemResources, + .BROKEN_PIPE => return error.BrokenPipe, + else => |err| return windows.unexpectedError(err), + } + } + } else { + var bytes_written: DWORD = undefined; + var overlapped_data: OVERLAPPED = undefined; + const overlapped: ?*OVERLAPPED = if (offset) |off| blk: { + overlapped_data = .{ + .Internal = 0, + .InternalHigh = 0, + .Offset = @truncate(u32, off), + .OffsetHigh = @truncate(u32, off >> 32), + .hEvent = null, + }; + break :blk &overlapped_data; + } else null; + // TODO replace this @intCast with a loop that writes all the bytes + if (kernel32.WriteFile(handle, bytes.ptr, @intCast(u32, bytes.len), &bytes_written, overlapped) == 0) { + switch (kernel32.GetLastError()) { + .INVALID_USER_BUFFER => return error.SystemResources, + .NOT_ENOUGH_MEMORY => return error.SystemResources, + .OPERATION_ABORTED => return error.OperationAborted, + .NOT_ENOUGH_QUOTA => return error.SystemResources, + .IO_PENDING => unreachable, // this function is for blocking files only + .BROKEN_PIPE => return error.BrokenPipe, + else => |err| return unexpectedError(err), + } } } } diff --git a/lib/std/special/test_runner.zig b/lib/std/special/test_runner.zig @@ -2,7 +2,7 @@ const std = @import("std"); const io = std.io; const builtin = @import("builtin"); -pub const io_mode = builtin.test_io_mode; +pub const io_mode: io.Mode = builtin.test_io_mode; pub fn main() anyerror!void { const test_fn_list = builtin.test_functions; @@ -14,6 +14,11 @@ pub fn main() anyerror!void { error.TimerUnsupported => @panic("timer unsupported"), }; + var async_frame_buffer: []align(std.Target.stack_align) u8 = undefined; + // TODO this is on the next line (using `undefined` above) because otherwise zig incorrectly + // ignores the alignment of the slice. + async_frame_buffer = &[_]u8{}; + for (test_fn_list) |test_fn, i| { std.testing.base_allocator_instance.reset(); @@ -23,7 +28,24 @@ pub fn main() anyerror!void { if (progress.terminal == null) { std.debug.warn("{}/{} {}...", .{ i + 1, test_fn_list.len, test_fn.name }); } - if (test_fn.func()) |_| { + const result = if (test_fn.async_frame_size) |size| switch (io_mode) { + .evented => blk: { + if (async_frame_buffer.len < size) { + std.heap.page_allocator.free(async_frame_buffer); + async_frame_buffer = try std.heap.page_allocator.alignedAlloc(u8, std.Target.stack_align, size); + } + const casted_fn = @ptrCast(async fn () anyerror!void, test_fn.func); + break :blk await @asyncCall(async_frame_buffer, {}, casted_fn); + }, + .blocking => { + skip_count += 1; + test_node.end(); + progress.log("{}...SKIP (async test)\n", .{test_fn.name}); + if (progress.terminal == null) std.debug.warn("SKIP (async test)\n", .{}); + continue; + }, + } else test_fn.func(); + if (result) |_| { ok_count += 1; test_node.end(); std.testing.allocator_instance.validate() catch |err| switch (err) { diff --git a/src/analyze.cpp b/src/analyze.cpp @@ -6144,6 +6144,15 @@ static bool scope_needs_spill(Scope *scope) { zig_unreachable(); } +static ZigType *resolve_type_isf(ZigType *ty) { + if (ty->id != ZigTypeIdPointer) return ty; + InferredStructField *isf = ty->data.pointer.inferred_struct_field; + if (isf == nullptr) return ty; + TypeStructField *field = find_struct_type_field(isf->inferred_struct_type, isf->field_name); + assert(field != nullptr); + return field->type_entry; +} + static Error resolve_async_frame(CodeGen *g, ZigType *frame_type) { Error err; @@ -6245,6 +6254,9 @@ static Error resolve_async_frame(CodeGen *g, ZigType *frame_type) { } ZigFn *callee = call->fn_entry; if (callee == nullptr) { + if (call->fn_ref->value->type->data.fn.fn_type_id.cc != CallingConventionAsync) { + continue; + } add_node_error(g, call->base.base.source_node, buf_sprintf("function is not comptime-known; @asyncCall required")); return ErrorSemanticAnalyzeFail; @@ -6402,7 +6414,7 @@ static Error resolve_async_frame(CodeGen *g, ZigType *frame_type) { } else { param_name = buf_sprintf("@arg%" ZIG_PRI_usize, arg_i); } - ZigType *param_type = param_info->type; + ZigType *param_type = resolve_type_isf(param_info->type); if ((err = type_resolve(g, param_type, ResolveStatusSizeKnown))) { return err; } @@ -6421,7 +6433,7 @@ static Error resolve_async_frame(CodeGen *g, ZigType *frame_type) { instruction->field_index = SIZE_MAX; ZigType *ptr_type = instruction->base.value->type; assert(ptr_type->id == ZigTypeIdPointer); - ZigType *child_type = ptr_type->data.pointer.child_type; + ZigType *child_type = resolve_type_isf(ptr_type->data.pointer.child_type); if (!type_has_bits(child_type)) continue; if (instruction->base.base.ref_count == 0) @@ -6448,8 +6460,6 @@ static Error resolve_async_frame(CodeGen *g, ZigType *frame_type) { } instruction->field_index = fields.length; - src_assert(child_type->id != ZigTypeIdPointer || child_type->data.pointer.inferred_struct_field == nullptr, - instruction->base.base.source_node); fields.append({name, child_type, instruction->align}); } @@ -8251,6 +8261,8 @@ static void resolve_llvm_types_struct(CodeGen *g, ZigType *struct_type, ResolveS size_t debug_field_index = 0; for (size_t i = 0; i < field_count; i += 1) { TypeStructField *field = struct_type->data.structure.fields[i]; + //fprintf(stderr, "%s at gen index %zu\n", buf_ptr(field->name), field->gen_index); + size_t gen_field_index = field->gen_index; if (gen_field_index == SIZE_MAX) { continue; diff --git a/src/codegen.cpp b/src/codegen.cpp @@ -362,14 +362,16 @@ static uint32_t frame_index_arg(CodeGen *g, ZigType *return_type) { } // label (grep this): [fn_frame_struct_layout] -static uint32_t frame_index_trace_stack(CodeGen *g, FnTypeId *fn_type_id) { - uint32_t result = frame_index_arg(g, fn_type_id->return_type); - for (size_t i = 0; i < fn_type_id->param_count; i += 1) { - if (type_has_bits(fn_type_id->param_info->type)) { - result += 1; - } +static uint32_t frame_index_trace_stack(CodeGen *g, ZigFn *fn) { + size_t field_index = 6; + bool have_stack_trace = codegen_fn_has_err_ret_tracing_arg(g, fn->type_entry->data.fn.fn_type_id.return_type); + if (have_stack_trace) { + field_index += 2; } - return result; + field_index += fn->type_entry->data.fn.fn_type_id.param_count; + ZigType *locals_struct = fn->frame_type->data.frame.locals_struct; + TypeStructField *field = locals_struct->data.structure.fields[field_index]; + return field->gen_index; } @@ -7742,7 +7744,7 @@ static void do_code_gen(CodeGen *g) { } uint32_t trace_field_index_stack = UINT32_MAX; if (codegen_fn_has_err_ret_tracing_stack(g, fn_table_entry, true)) { - trace_field_index_stack = frame_index_trace_stack(g, fn_type_id); + trace_field_index_stack = frame_index_trace_stack(g, fn_table_entry); g->cur_err_ret_trace_val_stack = LLVMBuildStructGEP(g->builder, g->cur_frame_ptr, trace_field_index_stack, ""); } @@ -9396,22 +9398,13 @@ static void update_test_functions_builtin_decl(CodeGen *g) { for (size_t i = 0; i < g->test_fns.length; i += 1) { ZigFn *test_fn_entry = g->test_fns.at(i); - if (fn_is_async(test_fn_entry)) { - ErrorMsg *msg = add_node_error(g, test_fn_entry->proto_node, - buf_create_from_str("test functions cannot be async")); - add_error_note(g, msg, test_fn_entry->proto_node, - buf_sprintf("this restriction may be lifted in the future. See https://github.com/ziglang/zig/issues/3117 for more details")); - add_async_error_notes(g, msg, test_fn_entry); - continue; - } - ZigValue *this_val = &test_fn_array->data.x_array.data.s_none.elements[i]; this_val->special = ConstValSpecialStatic; this_val->type = struct_type; this_val->parent.id = ConstParentIdArray; this_val->parent.data.p_array.array_val = test_fn_array; this_val->parent.data.p_array.elem_index = i; - this_val->data.x_struct.fields = alloc_const_vals_ptrs(2); + this_val->data.x_struct.fields = alloc_const_vals_ptrs(3); ZigValue *name_field = this_val->data.x_struct.fields[0]; ZigValue *name_array_val = create_const_str_lit(g, &test_fn_entry->symbol_name)->data.x_ptr.data.ref.pointee; @@ -9423,6 +9416,19 @@ static void update_test_functions_builtin_decl(CodeGen *g) { fn_field->data.x_ptr.special = ConstPtrSpecialFunction; fn_field->data.x_ptr.mut = ConstPtrMutComptimeConst; fn_field->data.x_ptr.data.fn.fn_entry = test_fn_entry; + + ZigValue *frame_size_field = this_val->data.x_struct.fields[2]; + frame_size_field->type = get_optional_type(g, g->builtin_types.entry_usize); + frame_size_field->special = ConstValSpecialStatic; + frame_size_field->data.x_optional = nullptr; + + if (fn_is_async(test_fn_entry)) { + frame_size_field->data.x_optional = create_const_vals(1); + frame_size_field->data.x_optional->special = ConstValSpecialStatic; + frame_size_field->data.x_optional->type = g->builtin_types.entry_usize; + bigint_init_unsigned(&frame_size_field->data.x_optional->data.x_bigint, + test_fn_entry->frame_type->abi_size); + } } report_errors_and_maybe_exit(g);