Fixed std.fs.Watch implementation on Linux
Added .Deleted event to std.fs.Watch on Linux
This commit is contained in:
@@ -25,9 +25,7 @@ const WatchEventId = enum {
|
||||
};
|
||||
|
||||
fn eqlString(a: []const u16, b: []const u16) bool {
|
||||
if (a.len != b.len) return false;
|
||||
if (a.ptr == b.ptr) return true;
|
||||
return mem.compare(u16, a, b) == .Equal;
|
||||
return mem.eql(u16, a, b);
|
||||
}
|
||||
|
||||
fn hashString(s: []const u16) u32 {
|
||||
@@ -43,7 +41,7 @@ const WatchEventError = error{
|
||||
|
||||
pub fn Watch(comptime V: type) type {
|
||||
return struct {
|
||||
channel: *event.Channel(Event.Error!Event),
|
||||
channel: event.Channel(Event.Error!Event),
|
||||
os_data: OsData,
|
||||
allocator: *Allocator,
|
||||
|
||||
@@ -110,19 +108,14 @@ pub fn Watch(comptime V: type) type {
|
||||
pub const Event = struct {
|
||||
id: Id,
|
||||
data: V,
|
||||
dirname: []const u8,
|
||||
basename: []const u8,
|
||||
|
||||
pub const Id = WatchEventId;
|
||||
pub const Error = WatchEventError;
|
||||
};
|
||||
|
||||
pub fn init(allocator: *Allocator, event_buf_count: usize) !*Self {
|
||||
const channel = try allocator.create(event.Channel(Event.Error!Event));
|
||||
errdefer allocator.destroy(channel);
|
||||
var buf = try allocator.alloc(Event.Error!Event, event_buf_count);
|
||||
errdefer allocator.free(buf);
|
||||
channel.init(buf);
|
||||
errdefer channel.deinit();
|
||||
|
||||
const self = try allocator.create(Self);
|
||||
errdefer allocator.destroy(self);
|
||||
|
||||
@@ -133,15 +126,17 @@ pub fn Watch(comptime V: type) type {
|
||||
|
||||
self.* = Self{
|
||||
.allocator = allocator,
|
||||
.channel = channel,
|
||||
.channel = undefined,
|
||||
.os_data = OsData{
|
||||
.putter_frame = undefined,
|
||||
.inotify_fd = inotify_fd,
|
||||
.wd_table = OsData.WdTable.init(allocator),
|
||||
.table_lock = event.Lock.init(),
|
||||
.table_lock = event.Lock{},
|
||||
},
|
||||
};
|
||||
|
||||
var buf = try allocator.alloc(Event.Error!Event, event_buf_count);
|
||||
self.channel.init(buf);
|
||||
self.os_data.putter_frame = async self.linuxEventPutter();
|
||||
return self;
|
||||
},
|
||||
@@ -149,14 +144,16 @@ pub fn Watch(comptime V: type) type {
|
||||
.windows => {
|
||||
self.* = Self{
|
||||
.allocator = allocator,
|
||||
.channel = channel,
|
||||
.channel = undefined,
|
||||
.os_data = OsData{
|
||||
.table_lock = event.Lock.init(),
|
||||
.table_lock = event.Lock{},
|
||||
.dir_table = OsData.DirTable.init(allocator),
|
||||
.ref_count = std.atomic.Int(usize).init(1),
|
||||
.all_putters = std.atomic.Queue(anyframe).init(),
|
||||
.all_putters = std.atomic.Queue(WindowsOsData.Put).init(),
|
||||
},
|
||||
};
|
||||
var buf = try allocator.alloc(Event.Error!Event, event_buf_count);
|
||||
self.channel.init(buf);
|
||||
return self;
|
||||
},
|
||||
|
||||
@@ -194,6 +191,17 @@ pub fn Watch(comptime V: type) type {
|
||||
},
|
||||
.linux => {
|
||||
self.os_data.cancelled = true;
|
||||
{
|
||||
// Remove all directory watches linuxEventPutter will take care of
|
||||
// cleaning up the memory and closing the inotify fd.
|
||||
var dir_it = self.os_data.wd_table.iterator();
|
||||
while (dir_it.next()) |wd_entry| {
|
||||
const rc = os.linux.inotify_rm_watch(self.os_data.inotify_fd, wd_entry.key);
|
||||
// Errno can only be EBADF, EINVAL if either the inotify fs or the wd are invalid
|
||||
std.debug.assert(rc == 0);
|
||||
}
|
||||
}
|
||||
|
||||
await self.os_data.putter_frame;
|
||||
self.allocator.destroy(self);
|
||||
},
|
||||
@@ -322,19 +330,12 @@ pub fn Watch(comptime V: type) type {
|
||||
|
||||
fn addFileLinux(self: *Self, file_path: []const u8, value: V) !?V {
|
||||
const dirname = std.fs.path.dirname(file_path) orelse ".";
|
||||
const dirname_with_null = try std.cstr.addNullByte(self.allocator, dirname);
|
||||
var dirname_with_null_consumed = false;
|
||||
defer if (!dirname_with_null_consumed) self.channel.free(dirname_with_null);
|
||||
|
||||
const basename = std.fs.path.basename(file_path);
|
||||
const basename_with_null = try std.cstr.addNullByte(self.allocator, basename);
|
||||
var basename_with_null_consumed = false;
|
||||
defer if (!basename_with_null_consumed) self.allocator.free(basename_with_null);
|
||||
|
||||
const wd = try os.inotify_add_watchZ(
|
||||
const wd = try os.inotify_add_watch(
|
||||
self.os_data.inotify_fd,
|
||||
dirname_with_null.ptr,
|
||||
os.linux.IN_CLOSE_WRITE | os.linux.IN_ONLYDIR | os.linux.IN_EXCL_UNLINK,
|
||||
dirname,
|
||||
os.linux.IN_CLOSE_WRITE | os.linux.IN_ONLYDIR | os.linux.IN_DELETE | os.linux.IN_EXCL_UNLINK,
|
||||
);
|
||||
// wd is either a newly created watch or an existing one.
|
||||
|
||||
@@ -343,22 +344,21 @@ pub fn Watch(comptime V: type) type {
|
||||
|
||||
const gop = try self.os_data.wd_table.getOrPut(wd);
|
||||
if (!gop.found_existing) {
|
||||
gop.kv.value = OsData.Dir{
|
||||
.dirname = dirname_with_null,
|
||||
gop.entry.value = OsData.Dir{
|
||||
.dirname = try self.allocator.dupe(u8, dirname),
|
||||
.file_table = OsData.FileTable.init(self.allocator),
|
||||
};
|
||||
dirname_with_null_consumed = true;
|
||||
}
|
||||
const dir = &gop.kv.value;
|
||||
|
||||
const file_table_gop = try dir.file_table.getOrPut(basename_with_null);
|
||||
const dir = &gop.entry.value;
|
||||
const file_table_gop = try dir.file_table.getOrPut(basename);
|
||||
if (file_table_gop.found_existing) {
|
||||
const prev_value = file_table_gop.kv.value;
|
||||
file_table_gop.kv.value = value;
|
||||
const prev_value = file_table_gop.entry.value;
|
||||
file_table_gop.entry.value = value;
|
||||
return prev_value;
|
||||
} else {
|
||||
file_table_gop.kv.value = value;
|
||||
basename_with_null_consumed = true;
|
||||
file_table_gop.entry.key = try self.allocator.dupe(u8, basename);
|
||||
file_table_gop.entry.value = value;
|
||||
return null;
|
||||
}
|
||||
}
|
||||
@@ -539,76 +539,96 @@ pub fn Watch(comptime V: type) type {
|
||||
}
|
||||
|
||||
pub fn removeFile(self: *Self, file_path: []const u8) ?V {
|
||||
@panic("TODO");
|
||||
switch (builtin.os.tag) {
|
||||
.linux => {
|
||||
const dirname = std.fs.path.dirname(file_path) orelse ".";
|
||||
const basename = std.fs.path.basename(file_path);
|
||||
|
||||
const held = self.os_data.table_lock.acquire();
|
||||
defer held.release();
|
||||
|
||||
const dir = self.os_data.wd_table.get(dirname) orelse return null;
|
||||
if (dir.file_table.remove(basename)) |file_entry| {
|
||||
self.allocator.free(file_entry.key);
|
||||
return file_entry.value;
|
||||
}
|
||||
return null;
|
||||
},
|
||||
.macos, .freebsd, .netbsd, .dragonfly, .openbsd => @panic("TODO"),
|
||||
.windows => return @panic("TODO"),
|
||||
else => @compileError("Unsupported OS"),
|
||||
}
|
||||
}
|
||||
|
||||
fn linuxEventPutter(self: *Self) void {
|
||||
global_event_loop.beginOneEvent();
|
||||
|
||||
defer {
|
||||
self.os_data.table_lock.deinit();
|
||||
var wd_it = self.os_data.wd_table.iterator();
|
||||
while (wd_it.next()) |wd_entry| {
|
||||
var file_it = wd_entry.value.file_table.iterator();
|
||||
while (file_it.next()) |file_entry| {
|
||||
self.allocator.free(file_entry.key);
|
||||
}
|
||||
self.allocator.free(wd_entry.value.dirname);
|
||||
wd_entry.value.file_table.deinit();
|
||||
}
|
||||
std.debug.assert(self.os_data.wd_table.count() == 0);
|
||||
self.os_data.wd_table.deinit();
|
||||
global_event_loop.finishOneEvent();
|
||||
os.close(self.os_data.inotify_fd);
|
||||
self.channel.deinit();
|
||||
self.allocator.free(self.channel.buffer_nodes);
|
||||
self.channel.deinit();
|
||||
global_event_loop.finishOneEvent();
|
||||
}
|
||||
|
||||
var event_buf: [4096]u8 align(@alignOf(os.linux.inotify_event)) = undefined;
|
||||
|
||||
while (!self.os_data.cancelled) {
|
||||
const rc = os.linux.read(self.os_data.inotify_fd, &event_buf, event_buf.len);
|
||||
const errno = os.linux.getErrno(rc);
|
||||
switch (errno) {
|
||||
0 => {
|
||||
// can't use @bytesToSlice because of the special variable length name field
|
||||
var ptr = event_buf[0..].ptr;
|
||||
const end_ptr = ptr + event_buf.len;
|
||||
var ev: *os.linux.inotify_event = undefined;
|
||||
while (@ptrToInt(ptr) < @ptrToInt(end_ptr)) {
|
||||
ev = @ptrCast(*os.linux.inotify_event, ptr);
|
||||
if (ev.mask & os.linux.IN_CLOSE_WRITE == os.linux.IN_CLOSE_WRITE) {
|
||||
const basename_ptr = ptr + @sizeOf(os.linux.inotify_event);
|
||||
// `ev.len` counts all bytes in `ev.name` including terminating null byte.
|
||||
const basename_with_null = basename_ptr[0..ev.len];
|
||||
const user_value = blk: {
|
||||
const held = self.os_data.table_lock.acquire();
|
||||
defer held.release();
|
||||
const bytes_read = global_event_loop.read(self.os_data.inotify_fd, &event_buf, false) catch unreachable;
|
||||
|
||||
const dir = &self.os_data.wd_table.get(ev.wd).?.value;
|
||||
if (dir.file_table.get(basename_with_null)) |entry| {
|
||||
break :blk entry.value;
|
||||
} else {
|
||||
break :blk null;
|
||||
}
|
||||
};
|
||||
if (user_value) |v| {
|
||||
self.channel.put(Event{
|
||||
.id = WatchEventId.CloseWrite,
|
||||
.data = v,
|
||||
});
|
||||
}
|
||||
}
|
||||
var ptr: [*]u8 = &event_buf;
|
||||
const end_ptr = ptr + bytes_read;
|
||||
while (@ptrToInt(ptr) < @ptrToInt(end_ptr)) {
|
||||
const ev = @ptrCast(*const os.linux.inotify_event, ptr);
|
||||
if (ev.mask & os.linux.IN_CLOSE_WRITE == os.linux.IN_CLOSE_WRITE) {
|
||||
const basename_ptr = ptr + @sizeOf(os.linux.inotify_event);
|
||||
const basename = std.mem.span(@ptrCast([*:0]u8, basename_ptr));
|
||||
|
||||
ptr = @alignCast(@alignOf(os.linux.inotify_event), ptr + @sizeOf(os.linux.inotify_event) + ev.len);
|
||||
const held = self.os_data.table_lock.acquire();
|
||||
defer held.release();
|
||||
|
||||
const dir = &self.os_data.wd_table.get(ev.wd).?;
|
||||
if (dir.file_table.getEntry(basename)) |file_value| {
|
||||
self.channel.put(Event{
|
||||
.id = .CloseWrite,
|
||||
.data = file_value.value,
|
||||
.dirname = dir.dirname,
|
||||
.basename = file_value.key,
|
||||
});
|
||||
}
|
||||
},
|
||||
os.linux.EINTR => continue,
|
||||
os.linux.EINVAL => unreachable,
|
||||
os.linux.EFAULT => unreachable,
|
||||
os.linux.EAGAIN => {
|
||||
global_event_loop.linuxWaitFd(self.os_data.inotify_fd, os.linux.EPOLLET | os.linux.EPOLLIN | os.EPOLLONESHOT);
|
||||
},
|
||||
else => unreachable,
|
||||
} else if (ev.mask & os.linux.IN_IGNORED == os.linux.IN_IGNORED) {
|
||||
// Directory watch was removed
|
||||
const held = self.os_data.table_lock.acquire();
|
||||
defer held.release();
|
||||
if (self.os_data.wd_table.remove(ev.wd)) |*wd_entry| {
|
||||
var file_it = wd_entry.value.file_table.iterator();
|
||||
while (file_it.next()) |file_entry| {
|
||||
self.allocator.free(file_entry.key);
|
||||
}
|
||||
self.allocator.free(wd_entry.value.dirname);
|
||||
wd_entry.value.file_table.deinit();
|
||||
}
|
||||
} else if (ev.mask & os.linux.IN_DELETE == os.linux.IN_DELETE) {
|
||||
// File or directory was removed or deleted
|
||||
const basename_ptr = ptr + @sizeOf(os.linux.inotify_event);
|
||||
const basename = std.mem.span(@ptrCast([*:0]u8, basename_ptr));
|
||||
|
||||
const held = self.os_data.table_lock.acquire();
|
||||
defer held.release();
|
||||
const dir = &self.os_data.wd_table.get(ev.wd).?;
|
||||
|
||||
if (dir.file_table.getEntry(basename)) |file_value| {
|
||||
self.channel.put(Event{
|
||||
.id = .Delete,
|
||||
.data = file_value.value,
|
||||
.dirname = dir.dirname,
|
||||
.basename = file_value.key,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
ptr = @alignCast(@alignOf(os.linux.inotify_event), ptr + @sizeOf(os.linux.inotify_event) + ev.len);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -617,19 +637,19 @@ pub fn Watch(comptime V: type) type {
|
||||
|
||||
const test_tmp_dir = "std_event_fs_test";
|
||||
|
||||
test "write a file, watch it, write it again" {
|
||||
// TODO re-enable this test
|
||||
if (true) return error.SkipZigTest;
|
||||
test "write a file, watch it, write it again, delete it" {
|
||||
if (!std.io.is_async) return error.SkipZigTest;
|
||||
// TODO https://github.com/ziglang/zig/issues/1908
|
||||
if (builtin.single_threaded) return error.SkipZigTest;
|
||||
|
||||
try fs.cwd().makePath(test_tmp_dir);
|
||||
defer fs.cwd().deleteTree(test_tmp_dir) catch {};
|
||||
try std.fs.cwd().makePath(test_tmp_dir);
|
||||
defer std.fs.cwd().deleteTree(test_tmp_dir) catch {};
|
||||
|
||||
const allocator = std.heap.page_allocator;
|
||||
return testFsWatch(&allocator);
|
||||
return testWriteWatchWriteDelete(std.testing.allocator);
|
||||
}
|
||||
|
||||
fn testFsWatch(allocator: *Allocator) !void {
|
||||
const file_path = try std.fs.path.join(allocator, [_][]const u8{ test_tmp_dir, "file.txt" });
|
||||
fn testWriteWatchWriteDelete(allocator: *Allocator) !void {
|
||||
const file_path = try std.fs.path.join(allocator, &[_][]const u8{ test_tmp_dir, "file.txt" });
|
||||
defer allocator.free(file_path);
|
||||
|
||||
const contents =
|
||||
@@ -639,9 +659,10 @@ fn testFsWatch(allocator: *Allocator) !void {
|
||||
const line2_offset = 7;
|
||||
|
||||
// first just write then read the file
|
||||
try writeFile(allocator, file_path, contents);
|
||||
try std.fs.cwd().writeFile(file_path, contents);
|
||||
|
||||
const read_contents = try readFile(allocator, file_path, 1024 * 1024);
|
||||
const read_contents = try std.fs.cwd().readFileAlloc(allocator, file_path, 1024 * 1024);
|
||||
defer allocator.free(read_contents);
|
||||
testing.expectEqualSlices(u8, contents, read_contents);
|
||||
|
||||
// now watch the file
|
||||
@@ -650,28 +671,47 @@ fn testFsWatch(allocator: *Allocator) !void {
|
||||
|
||||
testing.expect((try watch.addFile(file_path, {})) == null);
|
||||
|
||||
const ev = watch.channel.get();
|
||||
var ev = async watch.channel.get();
|
||||
var ev_consumed = false;
|
||||
defer if (!ev_consumed) await ev;
|
||||
defer if (!ev_consumed) {
|
||||
_ = await ev;
|
||||
};
|
||||
|
||||
// overwrite line 2
|
||||
const fd = try await openReadWrite(file_path, File.default_mode);
|
||||
const file = try std.fs.cwd().openFile(file_path, .{ .read = true, .write = true });
|
||||
{
|
||||
defer os.close(fd);
|
||||
|
||||
try pwritev(allocator, fd, []const []const u8{"lorem ipsum"}, line2_offset);
|
||||
defer file.close();
|
||||
const write_contents = "lorem ipsum";
|
||||
var iovec = [_]os.iovec_const{.{
|
||||
.iov_base = write_contents,
|
||||
.iov_len = write_contents.len,
|
||||
}};
|
||||
_ = try file.pwritevAll(&iovec, line2_offset);
|
||||
}
|
||||
|
||||
ev_consumed = true;
|
||||
switch ((try await ev).id) {
|
||||
WatchEventId.CloseWrite => {},
|
||||
WatchEventId.Delete => @panic("wrong event"),
|
||||
.CloseWrite => {
|
||||
ev_consumed = true;
|
||||
},
|
||||
.Delete => @panic("wrong event"),
|
||||
}
|
||||
const contents_updated = try readFile(allocator, file_path, 1024 * 1024);
|
||||
|
||||
const contents_updated = try std.fs.cwd().readFileAlloc(allocator, file_path, 1024 * 1024);
|
||||
defer allocator.free(contents_updated);
|
||||
|
||||
testing.expectEqualSlices(u8,
|
||||
\\line 1
|
||||
\\lorem ipsum
|
||||
, contents_updated);
|
||||
|
||||
// TODO test deleting the file and then re-adding it. we should get events for both
|
||||
ev = async watch.channel.get();
|
||||
ev_consumed = false;
|
||||
|
||||
try std.fs.cwd().deleteFile(file_path);
|
||||
switch ((try await ev).id) {
|
||||
.Delete => {
|
||||
ev_consumed = true;
|
||||
},
|
||||
.CloseWrite => @panic("wrong event"),
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user