diff --git a/std/event/fs.zig b/std/event/fs.zig index 1f810c4842..e468af2e67 100644 --- a/std/event/fs.zig +++ b/std/event/fs.zig @@ -1,8 +1,10 @@ +const builtin = @import("builtin"); const std = @import("../index.zig"); const event = std.event; const assert = std.debug.assert; const os = std.os; const mem = std.mem; +const posix = os.posix; pub const RequestNode = std.atomic.Queue(Request).Node; @@ -19,8 +21,7 @@ pub const Request = struct { pub const Msg = union(enum) { PWriteV: PWriteV, PReadV: PReadV, - OpenRead: OpenRead, - OpenRW: OpenRW, + Open: Open, Close: Close, WriteFile: WriteFile, End, // special - means the fs thread should exit @@ -43,19 +44,12 @@ pub const Request = struct { pub const Error = os.File.ReadError; }; - pub const OpenRead = struct { + pub const Open = struct { /// must be null terminated. TODO https://github.com/ziglang/zig/issues/265 path: []const u8, - result: Error!os.FileHandle, - - pub const Error = os.File.OpenError; - }; - - pub const OpenRW = struct { - /// must be null terminated. TODO https://github.com/ziglang/zig/issues/265 - path: []const u8, - result: Error!os.FileHandle, + flags: u32, mode: os.File.Mode, + result: Error!os.FileHandle, pub const Error = os.File.OpenError; }; @@ -77,7 +71,7 @@ pub const Request = struct { }; /// data - just the inner references - must live until pwritev promise completes. -pub async fn pwritev(loop: *event.Loop, fd: os.FileHandle, offset: usize, data: []const []const u8) !void { +pub async fn pwritev(loop: *event.Loop, fd: os.FileHandle, data: []const []const u8, offset: usize) !void { // workaround for https://github.com/ziglang/zig/issues/1194 suspend { resume @handle(); @@ -94,8 +88,8 @@ pub async fn pwritev(loop: *event.Loop, fd: os.FileHandle, offset: usize, data: } var req_node = RequestNode{ - .prev = undefined, - .next = undefined, + .prev = null, + .next = null, .data = Request{ .msg = Request.Msg{ .PWriteV = Request.Msg.PWriteV{ @@ -107,14 +101,16 @@ pub async fn pwritev(loop: *event.Loop, fd: os.FileHandle, offset: usize, data: }, .finish = Request.Finish{ .TickNode = event.Loop.NextTickNode{ - .prev = undefined, - .next = undefined, + .prev = null, + .next = null, .data = @handle(), }, }, }, }; + errdefer loop.posixFsCancel(&req_node); + suspend { loop.posixFsRequest(&req_node); } @@ -123,7 +119,7 @@ pub async fn pwritev(loop: *event.Loop, fd: os.FileHandle, offset: usize, data: } /// data - just the inner references - must live until pwritev promise completes. -pub async fn preadv(loop: *event.Loop, fd: os.FileHandle, offset: usize, data: []const []u8) !usize { +pub async fn preadv(loop: *event.Loop, fd: os.FileHandle, data: []const []u8, offset: usize) !usize { //const data_dupe = try mem.dupe(loop.allocator, []const u8, data); //defer loop.allocator.free(data_dupe); @@ -143,8 +139,8 @@ pub async fn preadv(loop: *event.Loop, fd: os.FileHandle, offset: usize, data: [ } var req_node = RequestNode{ - .prev = undefined, - .next = undefined, + .prev = null, + .next = null, .data = Request{ .msg = Request.Msg{ .PReadV = Request.Msg.PReadV{ @@ -156,14 +152,16 @@ pub async fn preadv(loop: *event.Loop, fd: os.FileHandle, offset: usize, data: [ }, .finish = Request.Finish{ .TickNode = event.Loop.NextTickNode{ - .prev = undefined, - .next = undefined, + .prev = null, + .next = null, .data = @handle(), }, }, }, }; + errdefer loop.posixFsCancel(&req_node); + suspend { loop.posixFsRequest(&req_node); } @@ -171,47 +169,8 @@ pub async fn preadv(loop: *event.Loop, fd: os.FileHandle, offset: usize, data: [ return req_node.data.msg.PReadV.result; } -pub async fn openRead(loop: *event.Loop, path: []const u8) os.File.OpenError!os.FileHandle { - // workaround for https://github.com/ziglang/zig/issues/1194 - suspend { - resume @handle(); - } - - const path_with_null = try std.cstr.addNullByte(loop.allocator, path); - defer loop.allocator.free(path_with_null); - - var req_node = RequestNode{ - .prev = undefined, - .next = undefined, - .data = Request{ - .msg = Request.Msg{ - .OpenRead = Request.Msg.OpenRead{ - .path = path_with_null[0..path.len], - .result = undefined, - }, - }, - .finish = Request.Finish{ - .TickNode = event.Loop.NextTickNode{ - .prev = undefined, - .next = undefined, - .data = @handle(), - }, - }, - }, - }; - - suspend { - loop.posixFsRequest(&req_node); - } - - return req_node.data.msg.OpenRead.result; -} - -/// Creates if does not exist. Does not truncate. -pub async fn openReadWrite( - loop: *event.Loop, - path: []const u8, - mode: os.File.Mode, +pub async fn open( + loop: *event.Loop, path: []const u8, flags: u32, mode: os.File.Mode, ) os.File.OpenError!os.FileHandle { // workaround for https://github.com/ziglang/zig/issues/1194 suspend { @@ -222,50 +181,70 @@ pub async fn openReadWrite( defer loop.allocator.free(path_with_null); var req_node = RequestNode{ - .prev = undefined, - .next = undefined, + .prev = null, + .next = null, .data = Request{ .msg = Request.Msg{ - .OpenRW = Request.Msg.OpenRW{ + .Open = Request.Msg.Open{ .path = path_with_null[0..path.len], + .flags = flags, .mode = mode, .result = undefined, }, }, .finish = Request.Finish{ .TickNode = event.Loop.NextTickNode{ - .prev = undefined, - .next = undefined, + .prev = null, + .next = null, .data = @handle(), }, }, }, }; + errdefer loop.posixFsCancel(&req_node); + suspend { loop.posixFsRequest(&req_node); } - return req_node.data.msg.OpenRW.result; + return req_node.data.msg.Open.result; +} + +pub async fn openRead(loop: *event.Loop, path: []const u8) os.File.OpenError!os.FileHandle { + const flags = posix.O_LARGEFILE | posix.O_RDONLY | posix.O_CLOEXEC; + return await (async open(loop, path, flags, 0) catch unreachable); +} + +/// Creates if does not exist. Does not truncate. +pub async fn openReadWrite( + loop: *event.Loop, + path: []const u8, + mode: os.File.Mode, +) os.File.OpenError!os.FileHandle { + const flags = posix.O_LARGEFILE | posix.O_RDWR | posix.O_CREAT | posix.O_CLOEXEC; + return await (async open(loop, path, flags, mode) catch unreachable); } /// This abstraction helps to close file handles in defer expressions /// without the possibility of failure and without the use of suspend points. /// Start a `CloseOperation` before opening a file, so that you can defer -/// `CloseOperation.deinit`. +/// `CloseOperation.finish`. +/// If you call `setHandle` then finishing will close the fd; otherwise finishing +/// will deallocate the `CloseOperation`. pub const CloseOperation = struct { loop: *event.Loop, have_fd: bool, close_req_node: RequestNode, - pub fn create(loop: *event.Loop) (error{OutOfMemory}!*CloseOperation) { + pub fn start(loop: *event.Loop) (error{OutOfMemory}!*CloseOperation) { const self = try loop.allocator.createOne(CloseOperation); self.* = CloseOperation{ .loop = loop, .have_fd = false, .close_req_node = RequestNode{ - .prev = undefined, - .next = undefined, + .prev = null, + .next = null, .data = Request{ .msg = Request.Msg{ .Close = Request.Msg.Close{ .fd = undefined }, @@ -278,7 +257,7 @@ pub const CloseOperation = struct { } /// Defer this after creating. - pub fn deinit(self: *CloseOperation) void { + pub fn finish(self: *CloseOperation) void { if (self.have_fd) { self.loop.posixFsRequest(&self.close_req_node); } else { @@ -290,6 +269,16 @@ pub const CloseOperation = struct { self.close_req_node.data.msg.Close.fd = handle; self.have_fd = true; } + + /// Undo a `setHandle`. + pub fn clearHandle(self: *CloseOperation) void { + self.have_fd = false; + } + + pub fn getHandle(self: *CloseOperation) os.FileHandle { + assert(self.have_fd); + return self.close_req_node.data.msg.Close.fd; + } }; /// contents must remain alive until writeFile completes. @@ -308,8 +297,8 @@ pub async fn writeFileMode(loop: *event.Loop, path: []const u8, contents: []cons defer loop.allocator.free(path_with_null); var req_node = RequestNode{ - .prev = undefined, - .next = undefined, + .prev = null, + .next = null, .data = Request{ .msg = Request.Msg{ .WriteFile = Request.Msg.WriteFile{ @@ -321,14 +310,16 @@ pub async fn writeFileMode(loop: *event.Loop, path: []const u8, contents: []cons }, .finish = Request.Finish{ .TickNode = event.Loop.NextTickNode{ - .prev = undefined, - .next = undefined, + .prev = null, + .next = null, .data = @handle(), }, }, }, }; + errdefer loop.posixFsCancel(&req_node); + suspend { loop.posixFsRequest(&req_node); } @@ -340,8 +331,8 @@ pub async fn writeFileMode(loop: *event.Loop, path: []const u8, contents: []cons /// is closed. /// Caller owns returned memory. pub async fn readFile(loop: *event.Loop, file_path: []const u8, max_size: usize) ![]u8 { - var close_op = try CloseOperation.create(loop); - defer close_op.deinit(); + var close_op = try CloseOperation.start(loop); + defer close_op.finish(); const path_with_null = try std.cstr.addNullByte(loop.allocator, file_path); defer loop.allocator.free(path_with_null); @@ -356,7 +347,7 @@ pub async fn readFile(loop: *event.Loop, file_path: []const u8, max_size: usize) try list.ensureCapacity(list.len + os.page_size); const buf = list.items[list.len..]; const buf_array = [][]u8{buf}; - const amt = try await (async preadv(loop, fd, list.len, buf_array) catch unreachable); + const amt = try await (async preadv(loop, fd, buf_array, list.len) catch unreachable); list.len += amt; if (list.len > max_size) { return error.FileTooBig; @@ -370,19 +361,38 @@ pub async fn readFile(loop: *event.Loop, file_path: []const u8, max_size: usize) pub fn Watch(comptime V: type) type { return struct { channel: *event.Channel(Event), - putter: promise, - wd_table: WdTable, - table_lock: event.Lock, - inotify_fd: i32, + os_data: OsData, + + const OsData = switch (builtin.os) { + builtin.Os.macosx => struct{ + file_table: FileTable, + table_lock: event.Lock, + + const FileTable = std.AutoHashMap([]const u8, *Put); + const Put = struct { + putter: promise, + value_ptr: *V, + }; + }, + builtin.Os.linux => struct { + putter: promise, + inotify_fd: i32, + wd_table: WdTable, + table_lock: event.Lock, + + const FileTable = std.AutoHashMap([]const u8, V); + }, + else => @compileError("Unsupported OS"), + }; const WdTable = std.AutoHashMap(i32, Dir); - const FileTable = std.AutoHashMap([]const u8, V); + const FileToHandle = std.AutoHashMap([]const u8, promise); const Self = this; const Dir = struct { dirname: []const u8, - file_table: FileTable, + file_table: OsData.FileTable, }; pub const Event = union(enum) { @@ -392,26 +402,140 @@ pub fn Watch(comptime V: type) type { pub const Error = error{ UserResourceLimitReached, SystemResources, + AccessDenied, }; }; pub fn create(loop: *event.Loop, event_buf_count: usize) !*Self { - const inotify_fd = try os.linuxINotifyInit1(os.linux.IN_NONBLOCK | os.linux.IN_CLOEXEC); - errdefer os.close(inotify_fd); - const channel = try event.Channel(Self.Event).create(loop, event_buf_count); errdefer channel.destroy(); - var result: *Self = undefined; - _ = try async eventPutter(inotify_fd, channel, &result); - return result; + switch (builtin.os) { + builtin.Os.linux => { + const inotify_fd = try os.linuxINotifyInit1(os.linux.IN_NONBLOCK | os.linux.IN_CLOEXEC); + errdefer os.close(inotify_fd); + + var result: *Self = undefined; + _ = try async linuxEventPutter(inotify_fd, channel, &result); + return result; + }, + builtin.Os.macosx => { + const self = try loop.allocator.createOne(Self); + errdefer loop.allocator.destroy(self); + + self.* = Self{ + .channel = channel, + .os_data = OsData{ + .table_lock = event.Lock.init(loop), + .file_table = OsData.FileTable.init(loop.allocator), + }, + }; + return self; + }, + else => @compileError("Unsupported OS"), + } } pub fn destroy(self: *Self) void { - cancel self.putter; + switch (builtin.os) { + builtin.Os.macosx => { + self.os_data.table_lock.deinit(); + var it = self.os_data.file_table.iterator(); + while (it.next()) |entry| { + cancel entry.value.putter; + self.channel.loop.allocator.free(entry.key); + } + self.channel.destroy(); + }, + builtin.Os.linux => cancel self.os_data.putter, + else => @compileError("Unsupported OS"), + } } pub async fn addFile(self: *Self, file_path: []const u8, value: V) !?V { + switch (builtin.os) { + builtin.Os.macosx => return await (async addFileMacosx(self, file_path, value) catch unreachable), + builtin.Os.linux => return await (async addFileLinux(self, file_path, value) catch unreachable), + else => @compileError("Unsupported OS"), + } + } + + async fn addFileMacosx(self: *Self, file_path: []const u8, value: V) !?V { + const resolved_path = try os.path.resolve(self.channel.loop.allocator, file_path); + var resolved_path_consumed = false; + defer if (!resolved_path_consumed) self.channel.loop.allocator.free(resolved_path); + + var close_op = try CloseOperation.start(self.channel.loop); + var close_op_consumed = false; + defer if (!close_op_consumed) close_op.finish(); + + const flags = posix.O_SYMLINK|posix.O_EVTONLY; + const mode = 0; + const fd = try await (async open(self.channel.loop, resolved_path, flags, mode) catch unreachable); + close_op.setHandle(fd); + + var put_data: *OsData.Put = undefined; + const putter = try async self.kqPutEvents(close_op, value, &put_data); + close_op_consumed = true; + errdefer cancel putter; + + const result = blk: { + const held = await (async self.os_data.table_lock.acquire() catch unreachable); + defer held.release(); + + const gop = try self.os_data.file_table.getOrPut(resolved_path); + if (gop.found_existing) { + const prev_value = gop.kv.value.value_ptr.*; + cancel gop.kv.value.putter; + gop.kv.value = put_data; + break :blk prev_value; + } else { + resolved_path_consumed = true; + gop.kv.value = put_data; + break :blk null; + } + }; + + return result; + } + + async fn kqPutEvents(self: *Self, close_op: *CloseOperation, value: V, out_put: **OsData.Put) void { + // TODO https://github.com/ziglang/zig/issues/1194 + suspend { + resume @handle(); + } + + var value_copy = value; + var put = OsData.Put{ + .putter = @handle(), + .value_ptr = &value_copy, + }; + out_put.* = &put; + self.channel.loop.beginOneEvent(); + + defer { + close_op.finish(); + self.channel.loop.finishOneEvent(); + } + + while (true) { + (await (async self.channel.loop.bsdWaitKev( + @intCast(usize, close_op.getHandle()), posix.EVFILT_VNODE, posix.NOTE_WRITE, + ) catch unreachable)) catch |err| switch (err) { + error.EventNotFound => unreachable, + error.ProcessNotFound => unreachable, + error.AccessDenied, error.SystemResources => { + // TODO https://github.com/ziglang/zig/issues/769 + const casted_err = @errSetCast(error{AccessDenied,SystemResources}, err); + await (async self.channel.put(Self.Event{ .Err = casted_err }) catch unreachable); + }, + }; + + await (async self.channel.put(Self.Event{ .CloseWrite = value_copy }) catch unreachable); + } + } + + async fn addFileLinux(self: *Self, file_path: []const u8, value: V) !?V { const dirname = os.path.dirname(file_path) orelse "."; const dirname_with_null = try std.cstr.addNullByte(self.channel.loop.allocator, dirname); var dirname_with_null_consumed = false; @@ -423,20 +547,20 @@ pub fn Watch(comptime V: type) type { defer if (!basename_with_null_consumed) self.channel.loop.allocator.free(basename_with_null); const wd = try os.linuxINotifyAddWatchC( - self.inotify_fd, + self.os_data.inotify_fd, dirname_with_null.ptr, os.linux.IN_CLOSE_WRITE | os.linux.IN_ONLYDIR | os.linux.IN_EXCL_UNLINK, ); // wd is either a newly created watch or an existing one. - const held = await (async self.table_lock.acquire() catch unreachable); + const held = await (async self.os_data.table_lock.acquire() catch unreachable); defer held.release(); - const gop = try self.wd_table.getOrPut(wd); + const gop = try self.os_data.wd_table.getOrPut(wd); if (!gop.found_existing) { gop.kv.value = Dir{ .dirname = dirname_with_null, - .file_table = FileTable.init(self.channel.loop.allocator), + .file_table = OsData.FileTable.init(self.channel.loop.allocator), }; dirname_with_null_consumed = true; } @@ -458,7 +582,7 @@ pub fn Watch(comptime V: type) type { @panic("TODO"); } - async fn eventPutter(inotify_fd: i32, channel: *event.Channel(Event), out_watch: **Self) void { + async fn linuxEventPutter(inotify_fd: i32, channel: *event.Channel(Event), out_watch: **Self) void { // TODO https://github.com/ziglang/zig/issues/1194 suspend { resume @handle(); @@ -467,27 +591,27 @@ pub fn Watch(comptime V: type) type { const loop = channel.loop; var watch = Self{ - .putter = @handle(), .channel = channel, - .wd_table = WdTable.init(loop.allocator), - .table_lock = event.Lock.init(loop), - .inotify_fd = inotify_fd, + .os_data = OsData{ + .putter = @handle(), + .inotify_fd = inotify_fd, + .wd_table = WdTable.init(loop.allocator), + .table_lock = event.Lock.init(loop), + }, }; out_watch.* = &watch; loop.beginOneEvent(); defer { - watch.table_lock.deinit(); - { - var wd_it = watch.wd_table.iterator(); - while (wd_it.next()) |wd_entry| { - var file_it = wd_entry.value.file_table.iterator(); - while (file_it.next()) |file_entry| { - loop.allocator.free(file_entry.key); - } - loop.allocator.free(wd_entry.value.dirname); + watch.os_data.table_lock.deinit(); + var wd_it = watch.os_data.wd_table.iterator(); + while (wd_it.next()) |wd_entry| { + var file_it = wd_entry.value.file_table.iterator(); + while (file_it.next()) |file_entry| { + loop.allocator.free(file_entry.key); } + loop.allocator.free(wd_entry.value.dirname); } loop.finishOneEvent(); os.close(inotify_fd); @@ -511,10 +635,10 @@ pub fn Watch(comptime V: type) type { const basename_ptr = ptr + @sizeOf(os.linux.inotify_event); const basename_with_null = basename_ptr[0 .. std.cstr.len(basename_ptr) + 1]; const user_value = blk: { - const held = await (async watch.table_lock.acquire() catch unreachable); + const held = await (async watch.os_data.table_lock.acquire() catch unreachable); defer held.release(); - const dir = &watch.wd_table.get(ev.wd).?.value; + const dir = &watch.os_data.wd_table.get(ev.wd).?.value; if (dir.file_table.get(basename_with_null)) |entry| { break :blk entry.value; } else { @@ -572,7 +696,7 @@ test "write a file, watch it, write it again" { try loop.initMultiThreaded(allocator); defer loop.deinit(); - var result: error!void = undefined; + var result: error!void = error.ResultNeverWritten; const handle = try async testFsWatchCantFail(&loop, &result); defer cancel handle; @@ -615,7 +739,7 @@ async fn testFsWatch(loop: *event.Loop) !void { { defer os.close(fd); - try await try async pwritev(loop, fd, line2_offset, []const []const u8{"lorem ipsum"}); + try await try async pwritev(loop, fd, []const []const u8{"lorem ipsum"}, line2_offset); } ev_consumed = true; diff --git a/std/event/loop.zig b/std/event/loop.zig index 78191e60d4..278462409f 100644 --- a/std/event/loop.zig +++ b/std/event/loop.zig @@ -116,7 +116,7 @@ pub const Loop = struct { switch (builtin.os) { builtin.Os.linux => { self.os_data.fs_queue = std.atomic.Queue(fs.Request).init(); - self.os_data.fs_queue_len = 0; + self.os_data.fs_queue_item = 0; // we need another thread for the file system because Linux does not have an async // file system I/O API. self.os_data.fs_end_request = fs.RequestNode{ @@ -201,9 +201,6 @@ pub const Loop = struct { }, }; - self.os_data.kevents = try self.allocator.alloc(posix.Kevent, extra_thread_count); - errdefer self.allocator.free(self.os_data.kevents); - const empty_kevs = ([*]posix.Kevent)(undefined)[0..0]; for (self.eventfd_resume_nodes) |*eventfd_node, i| { @@ -230,15 +227,6 @@ pub const Loop = struct { _ = try os.bsdKEvent(self.os_data.kqfd, kevent_array, empty_kevs, null); eventfd_node.data.kevent.flags = posix.EV_CLEAR | posix.EV_ENABLE; eventfd_node.data.kevent.fflags = posix.NOTE_TRIGGER; - // this one is for waiting for events - self.os_data.kevents[i] = posix.Kevent{ - .ident = i, - .filter = posix.EVFILT_USER, - .flags = 0, - .fflags = 0, - .data = 0, - .udata = @ptrToInt(&eventfd_node.data.base), - }; } // Pre-add so that we cannot get error.SystemResources @@ -257,16 +245,16 @@ pub const Loop = struct { self.os_data.final_kevent.fflags = posix.NOTE_TRIGGER; self.os_data.fs_kevent_wake = posix.Kevent{ - .ident = extra_thread_count + 1, + .ident = 0, .filter = posix.EVFILT_USER, - .flags = posix.EV_ADD, + .flags = posix.EV_ADD|posix.EV_ENABLE, .fflags = posix.NOTE_TRIGGER, .data = 0, .udata = undefined, }; self.os_data.fs_kevent_wait = posix.Kevent{ - .ident = extra_thread_count + 1, + .ident = 0, .filter = posix.EVFILT_USER, .flags = posix.EV_ADD|posix.EV_CLEAR, .fflags = 0, @@ -349,7 +337,6 @@ pub const Loop = struct { self.allocator.free(self.eventfd_resume_nodes); }, builtin.Os.macosx => { - self.allocator.free(self.os_data.kevents); os.close(self.os_data.kqfd); os.close(self.os_data.fs_kqfd); }, @@ -384,12 +371,8 @@ pub const Loop = struct { } pub fn linuxRemoveFd(self: *Loop, fd: i32) void { - self.linuxRemoveFdNoCounter(fd); - self.finishOneEvent(); - } - - fn linuxRemoveFdNoCounter(self: *Loop, fd: i32) void { os.linuxEpollCtl(self.os_data.epollfd, os.linux.EPOLL_CTL_DEL, fd, undefined) catch {}; + self.finishOneEvent(); } pub async fn linuxWaitFd(self: *Loop, fd: i32, flags: u32) !void { @@ -404,6 +387,50 @@ pub const Loop = struct { } } + pub async fn bsdWaitKev(self: *Loop, ident: usize, filter: i16, fflags: u32) !void { + defer self.bsdRemoveKev(ident, filter); + suspend { + // TODO explicitly put this memory in the coroutine frame #1194 + var resume_node = ResumeNode{ + .id = ResumeNode.Id.Basic, + .handle = @handle(), + }; + try self.bsdAddKev(&resume_node, ident, filter, fflags); + } + } + + /// resume_node must live longer than the promise that it holds a reference to. + pub fn bsdAddKev(self: *Loop, resume_node: *ResumeNode, ident: usize, filter: i16, fflags: u32) !void { + self.beginOneEvent(); + errdefer self.finishOneEvent(); + var kev = posix.Kevent{ + .ident = ident, + .filter = filter, + .flags = posix.EV_ADD|posix.EV_ENABLE|posix.EV_CLEAR, + .fflags = fflags, + .data = 0, + .udata = @ptrToInt(resume_node), + }; + const kevent_array = (*[1]posix.Kevent)(&kev); + const empty_kevs = ([*]posix.Kevent)(undefined)[0..0]; + _ = try os.bsdKEvent(self.os_data.kqfd, kevent_array, empty_kevs, null); + } + + pub fn bsdRemoveKev(self: *Loop, ident: usize, filter: i16) void { + var kev = posix.Kevent{ + .ident = ident, + .filter = filter, + .flags = posix.EV_DELETE, + .fflags = 0, + .data = 0, + .udata = 0, + }; + const kevent_array = (*[1]posix.Kevent)(&kev); + const empty_kevs = ([*]posix.Kevent)(undefined)[0..0]; + _ = os.bsdKEvent(self.os_data.kqfd, kevent_array, empty_kevs, null) catch undefined; + self.finishOneEvent(); + } + fn dispatch(self: *Loop) void { while (self.available_eventfd_resume_nodes.pop()) |resume_stack_node| { const next_tick_node = self.next_tick_queue.get() orelse { @@ -598,7 +625,8 @@ pub const Loop = struct { }, builtin.Os.macosx => { var eventlist: [1]posix.Kevent = undefined; - const count = os.bsdKEvent(self.os_data.kqfd, self.os_data.kevents, eventlist[0..], null) catch unreachable; + const empty_kevs = ([*]posix.Kevent)(undefined)[0..0]; + const count = os.bsdKEvent(self.os_data.kqfd, empty_kevs, eventlist[0..], null) catch unreachable; for (eventlist[0..count]) |ev| { const resume_node = @intToPtr(*ResumeNode, ev.udata); const handle = resume_node.handle; @@ -617,7 +645,6 @@ pub const Loop = struct { self.finishOneEvent(); } } - break; }, builtin.Os.windows => { var completion_key: usize = undefined; @@ -662,8 +689,8 @@ pub const Loop = struct { _ = os.bsdKEvent(self.os_data.fs_kqfd, fs_kevs, empty_kevs, null) catch unreachable; }, builtin.Os.linux => { - _ = @atomicRmw(i32, &self.os_data.fs_queue_len, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst); // let this wrap - const rc = os.linux.futex_wake(@ptrToInt(&self.os_data.fs_queue_len), os.linux.FUTEX_WAKE, 1); + _ = @atomicRmw(u8, &self.os_data.fs_queue_item, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst); + const rc = os.linux.futex_wake(@ptrToInt(&self.os_data.fs_queue_item), os.linux.FUTEX_WAKE, 1); switch (os.linux.getErrno(rc)) { 0 => {}, posix.EINVAL => unreachable, @@ -674,11 +701,18 @@ pub const Loop = struct { } } + fn posixFsCancel(self: *Loop, request_node: *fs.RequestNode) void { + if (self.os_data.fs_queue.remove(request_node)) { + self.finishOneEvent(); + } + } + fn posixFsRun(self: *Loop) void { - var processed_count: i32 = 0; // we let this wrap while (true) { + if (builtin.os == builtin.Os.linux) { + _ = @atomicRmw(u8, &self.os_data.fs_queue_item, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst); + } while (self.os_data.fs_queue.get()) |node| { - processed_count +%= 1; switch (node.data.msg) { @TagType(fs.Request.Msg).End => return, @TagType(fs.Request.Msg).PWriteV => |*msg| { @@ -687,13 +721,8 @@ pub const Loop = struct { @TagType(fs.Request.Msg).PReadV => |*msg| { msg.result = os.posix_preadv(msg.fd, msg.iov.ptr, msg.iov.len, msg.offset); }, - @TagType(fs.Request.Msg).OpenRead => |*msg| { - const flags = posix.O_LARGEFILE | posix.O_RDONLY | posix.O_CLOEXEC; - msg.result = os.posixOpenC(msg.path.ptr, flags, 0); - }, - @TagType(fs.Request.Msg).OpenRW => |*msg| { - const flags = posix.O_LARGEFILE | posix.O_RDWR | posix.O_CREAT | posix.O_CLOEXEC; - msg.result = os.posixOpenC(msg.path.ptr, flags, msg.mode); + @TagType(fs.Request.Msg).Open => |*msg| { + msg.result = os.posixOpenC(msg.path.ptr, msg.flags, msg.mode); }, @TagType(fs.Request.Msg).Close => |*msg| os.close(msg.fd), @TagType(fs.Request.Msg).WriteFile => |*msg| blk: { @@ -718,7 +747,7 @@ pub const Loop = struct { } switch (builtin.os) { builtin.Os.linux => { - const rc = os.linux.futex_wait(@ptrToInt(&self.os_data.fs_queue_len), os.linux.FUTEX_WAIT, processed_count, null); + const rc = os.linux.futex_wait(@ptrToInt(&self.os_data.fs_queue_item), os.linux.FUTEX_WAIT, 0, null); switch (os.linux.getErrno(rc)) { 0 => continue, posix.EINTR => continue, @@ -742,7 +771,7 @@ pub const Loop = struct { final_eventfd: i32, final_eventfd_event: os.linux.epoll_event, fs_thread: *os.Thread, - fs_queue_len: i32, // we let this wrap + fs_queue_item: u8, fs_queue: std.atomic.Queue(fs.Request), fs_end_request: fs.RequestNode, }, @@ -757,7 +786,6 @@ pub const Loop = struct { const MacOsData = struct { kqfd: i32, final_kevent: posix.Kevent, - kevents: []posix.Kevent, fs_kevent_wake: posix.Kevent, fs_kevent_wait: posix.Kevent, fs_thread: *os.Thread,