Merge pull request #14744 from ziglang/std.io.poll
introduce std.io.poll
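For context, a minimal usage sketch of the new API, following the ChildProcess.collectOutput changes in the diff below (allocator, child, and max_output_bytes are placeholders, not part of this commit):

    var poller = std.io.poll(allocator, enum { stdout, stderr }, .{
        .stdout = child.stdout.?,
        .stderr = child.stderr.?,
    });
    defer poller.deinit();

    // poll() reads whatever is ready into a per-stream fifo and returns false
    // once every stream has reached EOF (or signaled an error).
    while (try poller.poll()) {
        if (poller.fifo(.stdout).count > max_output_bytes)
            return error.StdoutStreamTooLong;
        if (poller.fifo(.stderr).count > max_output_bytes)
            return error.StderrStreamTooLong;
    }
    // The collected bytes live in poller.fifo(.stdout) and poller.fifo(.stderr).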
@@ -197,6 +197,19 @@ pub const ChildProcess = struct {
stderr: []u8,
};

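// Adopts the fifo's buffer as an ArrayList instead of copying it out: compact any
// leading gap so the data starts at index 0, reuse the buffer, length, and
// allocator for the ArrayList, then reset the fifo so ownership transfers to the
// returned list.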
fn fifoToOwnedArrayList(fifo: *std.io.PollFifo) std.ArrayList(u8) {
if (fifo.head > 0) {
std.mem.copy(u8, fifo.buf[0..fifo.count], fifo.buf[fifo.head .. fifo.head + fifo.count]);
}
const result = std.ArrayList(u8){
.items = fifo.buf[0..fifo.count],
.capacity = fifo.buf.len,
.allocator = fifo.allocator,
};
fifo.* = std.io.PollFifo.init(fifo.allocator);
return result;
}

/// Collect the output from the process's stdout and stderr. Will return once all output
/// has been collected. This does not mean that the process has ended. `wait` should still
/// be called to wait for and clean up the process.
@@ -210,195 +223,27 @@ pub const ChildProcess = struct {
) !void {
debug.assert(child.stdout_behavior == .Pipe);
debug.assert(child.stderr_behavior == .Pipe);
if (builtin.os.tag == .haiku) {
const stdout_in = child.stdout.?.reader();
const stderr_in = child.stderr.?.reader();

try stdout_in.readAllArrayList(stdout, max_output_bytes);
try stderr_in.readAllArrayList(stderr, max_output_bytes);
} else if (builtin.os.tag == .windows) {
try collectOutputWindows(child, stdout, stderr, max_output_bytes);
} else {
try collectOutputPosix(child, stdout, stderr, max_output_bytes);
}
}
// we could make this work with multiple allocators but YAGNI
if (stdout.allocator.ptr != stderr.allocator.ptr or
stdout.allocator.vtable != stderr.allocator.vtable)
@panic("ChildProcess.collectOutput only supports 1 allocator");

fn collectOutputPosix(
child: ChildProcess,
stdout: *std.ArrayList(u8),
stderr: *std.ArrayList(u8),
max_output_bytes: usize,
) !void {
var poll_fds = [_]os.pollfd{
.{ .fd = child.stdout.?.handle, .events = os.POLL.IN, .revents = undefined },
.{ .fd = child.stderr.?.handle, .events = os.POLL.IN, .revents = undefined },
};
var poller = std.io.poll(stdout.allocator, enum { stdout, stderr }, .{
.stdout = child.stdout.?,
.stderr = child.stderr.?,
});
defer poller.deinit();

var dead_fds: usize = 0;
// We ask for ensureTotalCapacity with this much extra space. This has more of an
// effect on small reads because once the reads start to get larger the amount
// of space an ArrayList will allocate grows exponentially.
const bump_amt = 512;

const err_mask = os.POLL.ERR | os.POLL.NVAL | os.POLL.HUP;

while (dead_fds < poll_fds.len) {
const events = try os.poll(&poll_fds, std.math.maxInt(i32));
if (events == 0) continue;

var remove_stdout = false;
var remove_stderr = false;
// Try reading whatever is available before checking the error
// conditions.
// It's still possible to read after a POLL.HUP is received, always
// check if there's some data waiting to be read first.
if (poll_fds[0].revents & os.POLL.IN != 0) {
// stdout is ready.
const new_capacity = std.math.min(stdout.items.len + bump_amt, max_output_bytes);
try stdout.ensureTotalCapacity(new_capacity);
const buf = stdout.unusedCapacitySlice();
if (buf.len == 0) return error.StdoutStreamTooLong;
const nread = try os.read(poll_fds[0].fd, buf);
stdout.items.len += nread;

// Remove the fd when the EOF condition is met.
remove_stdout = nread == 0;
} else {
remove_stdout = poll_fds[0].revents & err_mask != 0;
}

if (poll_fds[1].revents & os.POLL.IN != 0) {
// stderr is ready.
const new_capacity = std.math.min(stderr.items.len + bump_amt, max_output_bytes);
try stderr.ensureTotalCapacity(new_capacity);
const buf = stderr.unusedCapacitySlice();
if (buf.len == 0) return error.StderrStreamTooLong;
const nread = try os.read(poll_fds[1].fd, buf);
stderr.items.len += nread;

// Remove the fd when the EOF condition is met.
remove_stderr = nread == 0;
} else {
remove_stderr = poll_fds[1].revents & err_mask != 0;
}

// Exclude the fds that signaled an error.
if (remove_stdout) {
poll_fds[0].fd = -1;
dead_fds += 1;
}
if (remove_stderr) {
poll_fds[1].fd = -1;
dead_fds += 1;
}
}
}

const WindowsAsyncReadResult = enum {
pending,
closed,
full,
};

fn windowsAsyncRead(
handle: windows.HANDLE,
overlapped: *windows.OVERLAPPED,
buf: *std.ArrayList(u8),
bump_amt: usize,
max_output_bytes: usize,
) !WindowsAsyncReadResult {
while (true) {
const new_capacity = std.math.min(buf.items.len + bump_amt, max_output_bytes);
try buf.ensureTotalCapacity(new_capacity);
const next_buf = buf.unusedCapacitySlice();
if (next_buf.len == 0) return .full;
var read_bytes: u32 = undefined;
const read_result = windows.kernel32.ReadFile(handle, next_buf.ptr, math.cast(u32, next_buf.len) orelse maxInt(u32), &read_bytes, overlapped);
if (read_result == 0) return switch (windows.kernel32.GetLastError()) {
.IO_PENDING => .pending,
.BROKEN_PIPE => .closed,
else => |err| windows.unexpectedError(err),
};
buf.items.len += read_bytes;
}
}

fn collectOutputWindows(child: ChildProcess, stdout: *std.ArrayList(u8), stderr: *std.ArrayList(u8), max_output_bytes: usize) !void {
const bump_amt = 512;
const outs = [_]*std.ArrayList(u8){
stdout,
stderr,
};
const handles = [_]windows.HANDLE{
child.stdout.?.handle,
child.stderr.?.handle,
};

var overlapped = [_]windows.OVERLAPPED{
mem.zeroes(windows.OVERLAPPED),
mem.zeroes(windows.OVERLAPPED),
};

var wait_objects: [2]windows.HANDLE = undefined;
var wait_object_count: u2 = 0;

// we need to cancel all pending IO before returning so our OVERLAPPED values don't go out of scope
defer for (wait_objects[0..wait_object_count]) |o| {
_ = windows.kernel32.CancelIo(o);
};

// Windows Async IO requires an initial call to ReadFile before waiting on the handle
for ([_]u1{ 0, 1 }) |i| {
switch (try windowsAsyncRead(handles[i], &overlapped[i], outs[i], bump_amt, max_output_bytes)) {
.pending => {
wait_objects[wait_object_count] = handles[i];
wait_object_count += 1;
},
.closed => {}, // don't add to the wait_objects list
.full => return if (i == 0) error.StdoutStreamTooLong else error.StderrStreamTooLong,
}
while (try poller.poll()) {
if (poller.fifo(.stdout).count > max_output_bytes)
return error.StdoutStreamTooLong;
if (poller.fifo(.stderr).count > max_output_bytes)
return error.StderrStreamTooLong;
}

while (wait_object_count > 0) {
const status = windows.kernel32.WaitForMultipleObjects(wait_object_count, &wait_objects, 0, windows.INFINITE);
if (status == windows.WAIT_FAILED) {
switch (windows.kernel32.GetLastError()) {
else => |err| return windows.unexpectedError(err),
}
}
if (status < windows.WAIT_OBJECT_0 or status > windows.WAIT_OBJECT_0 + wait_object_count - 1)
unreachable;

const wait_idx = status - windows.WAIT_OBJECT_0;

// this extra `i` index is needed to map the wait handle back to the stdout or stderr
// values since the wait_idx can change which handle it corresponds with
const i: u1 = if (wait_objects[wait_idx] == handles[0]) 0 else 1;

// remove completed event from the wait list
wait_object_count -= 1;
if (wait_idx == 0)
wait_objects[0] = wait_objects[1];

var read_bytes: u32 = undefined;
if (windows.kernel32.GetOverlappedResult(handles[i], &overlapped[i], &read_bytes, 0) == 0) {
switch (windows.kernel32.GetLastError()) {
.BROKEN_PIPE => continue,
else => |err| return windows.unexpectedError(err),
}
}

outs[i].items.len += read_bytes;

switch (try windowsAsyncRead(handles[i], &overlapped[i], outs[i], bump_amt, max_output_bytes)) {
.pending => {
wait_objects[wait_object_count] = handles[i];
wait_object_count += 1;
},
.closed => {}, // don't add to the wait_objects list
.full => return if (i == 0) error.StdoutStreamTooLong else error.StderrStreamTooLong,
}
}
stdout.* = fifoToOwnedArrayList(poller.fifo(.stdout));
stderr.* = fifoToOwnedArrayList(poller.fifo(.stderr));
}

/// Spawns a child process, waits for it, collecting stdout and stderr, and then returns.

@@ -423,6 +423,7 @@ pub fn GeneralPurposeAllocator(comptime config: Config) type {
}
} else struct {};

/// Returns true if there were leaks; false otherwise.
pub fn deinit(self: *Self) bool {
const leaks = if (config.safety) self.detectLeaks() else false;
if (config.retain_metadata) {

lib/std/io.zig | 250
@@ -168,6 +168,256 @@ test "null_writer" {
null_writer.writeAll("yay" ** 10) catch |err| switch (err) {};
}

pub fn poll(
allocator: std.mem.Allocator,
comptime StreamEnum: type,
files: PollFiles(StreamEnum),
) Poller(StreamEnum) {
const enum_fields = @typeInfo(StreamEnum).Enum.fields;
var result: Poller(StreamEnum) = undefined;

if (builtin.os.tag == .windows) result.windows = .{
.first_read_done = false,
.overlapped = [1]os.windows.OVERLAPPED{
mem.zeroes(os.windows.OVERLAPPED),
} ** enum_fields.len,
.active = .{
.count = 0,
.handles_buf = undefined,
.stream_map = undefined,
},
};

inline for (0..enum_fields.len) |i| {
result.fifos[i] = .{
.allocator = allocator,
.buf = &.{},
.head = 0,
.count = 0,
};
if (builtin.os.tag == .windows) {
result.windows.active.handles_buf[i] = @field(files, enum_fields[i].name).handle;
} else {
result.poll_fds[i] = .{
.fd = @field(files, enum_fields[i].name).handle,
.events = os.POLL.IN,
.revents = undefined,
};
}
}
return result;
}

pub const PollFifo = std.fifo.LinearFifo(u8, .Dynamic);

pub fn Poller(comptime StreamEnum: type) type {
return struct {
const enum_fields = @typeInfo(StreamEnum).Enum.fields;
const PollFd = if (builtin.os.tag == .windows) void else std.os.pollfd;

fifos: [enum_fields.len]PollFifo,
poll_fds: [enum_fields.len]PollFd,
windows: if (builtin.os.tag == .windows) struct {
first_read_done: bool,
overlapped: [enum_fields.len]os.windows.OVERLAPPED,
active: struct {
count: math.IntFittingRange(0, enum_fields.len),
handles_buf: [enum_fields.len]os.windows.HANDLE,
stream_map: [enum_fields.len]StreamEnum,

pub fn removeAt(self: *@This(), index: u32) void {
std.debug.assert(index < self.count);
for (index + 1..self.count) |i| {
self.handles_buf[i - 1] = self.handles_buf[i];
self.stream_map[i - 1] = self.stream_map[i];
}
self.count -= 1;
}
},
} else void,

const Self = @This();

pub fn deinit(self: *Self) void {
if (builtin.os.tag == .windows) {
// cancel any pending IO to prevent clobbering OVERLAPPED value
for (self.windows.active.handles_buf[0..self.windows.active.count]) |h| {
_ = os.windows.kernel32.CancelIo(h);
}
}
inline for (&self.fifos) |*q| q.deinit();
self.* = undefined;
}

pub fn poll(self: *Self) !bool {
if (builtin.os.tag == .windows) {
return pollWindows(self);
} else {
return pollPosix(self);
}
}

pub inline fn fifo(self: *Self, comptime which: StreamEnum) *PollFifo {
return &self.fifos[@enumToInt(which)];
}

fn pollWindows(self: *Self) !bool {
const bump_amt = 512;

if (!self.windows.first_read_done) {
// Windows Async IO requires an initial call to ReadFile before waiting on the handle
for (0..enum_fields.len) |i| {
const handle = self.windows.active.handles_buf[i];
switch (try windowsAsyncRead(
handle,
&self.windows.overlapped[i],
&self.fifos[i],
bump_amt,
)) {
.pending => {
self.windows.active.handles_buf[self.windows.active.count] = handle;
self.windows.active.stream_map[self.windows.active.count] = @intToEnum(StreamEnum, i);
self.windows.active.count += 1;
},
.closed => {}, // don't add to the wait_objects list
}
}
self.windows.first_read_done = true;
}

while (true) {
if (self.windows.active.count == 0) return false;

const status = os.windows.kernel32.WaitForMultipleObjects(
self.windows.active.count,
&self.windows.active.handles_buf,
0,
os.windows.INFINITE,
);
if (status == os.windows.WAIT_FAILED)
return os.windows.unexpectedError(os.windows.kernel32.GetLastError());

if (status < os.windows.WAIT_OBJECT_0 or status > os.windows.WAIT_OBJECT_0 + enum_fields.len - 1)
unreachable;

const active_idx = status - os.windows.WAIT_OBJECT_0;

const handle = self.windows.active.handles_buf[active_idx];
const stream_idx = @enumToInt(self.windows.active.stream_map[active_idx]);
var read_bytes: u32 = undefined;
if (0 == os.windows.kernel32.GetOverlappedResult(
handle,
&self.windows.overlapped[stream_idx],
&read_bytes,
0,
)) switch (os.windows.kernel32.GetLastError()) {
.BROKEN_PIPE => {
self.windows.active.removeAt(active_idx);
continue;
},
else => |err| return os.windows.unexpectedError(err),
};

self.fifos[stream_idx].update(read_bytes);

switch (try windowsAsyncRead(
handle,
&self.windows.overlapped[stream_idx],
&self.fifos[stream_idx],
bump_amt,
)) {
.pending => {},
.closed => self.windows.active.removeAt(active_idx),
}
return true;
}
}

fn pollPosix(self: *Self) !bool {
// We ask for ensureUnusedCapacity with this much extra space. This
// has more of an effect on small reads because once the reads
// start to get larger the amount of space an ArrayList will
// allocate grows exponentially.
const bump_amt = 512;

const err_mask = os.POLL.ERR | os.POLL.NVAL | os.POLL.HUP;

const events_len = try os.poll(&self.poll_fds, std.math.maxInt(i32));
if (events_len == 0) {
for (self.poll_fds) |poll_fd| {
if (poll_fd.fd != -1) return true;
} else return false;
}

var keep_polling = false;
inline for (&self.poll_fds, &self.fifos) |*poll_fd, *q| {
// Try reading whatever is available before checking the error
// conditions.
// It's still possible to read after a POLL.HUP is received,
// always check if there's some data waiting to be read first.
if (poll_fd.revents & os.POLL.IN != 0) {
const buf = try q.writableWithSize(bump_amt);
const amt = try os.read(poll_fd.fd, buf);
q.update(amt);
if (amt == 0) {
// Remove the fd when the EOF condition is met.
poll_fd.fd = -1;
} else {
keep_polling = true;
}
} else if (poll_fd.revents & err_mask != 0) {
// Exclude the fds that signaled an error.
poll_fd.fd = -1;
} else if (poll_fd.fd != -1) {
keep_polling = true;
}
}
return keep_polling;
}
};
}

fn windowsAsyncRead(
handle: os.windows.HANDLE,
overlapped: *os.windows.OVERLAPPED,
fifo: *PollFifo,
bump_amt: usize,
) !enum { pending, closed } {
while (true) {
const buf = try fifo.writableWithSize(bump_amt);
var read_bytes: u32 = undefined;
const read_result = os.windows.kernel32.ReadFile(handle, buf.ptr, math.cast(u32, buf.len) orelse math.maxInt(u32), &read_bytes, overlapped);
if (read_result == 0) return switch (os.windows.kernel32.GetLastError()) {
.IO_PENDING => .pending,
.BROKEN_PIPE => .closed,
else => |err| os.windows.unexpectedError(err),
};
fifo.update(read_bytes);
}
}

/// Given an enum, returns a struct with fields of that enum, each field
/// representing an I/O stream for polling.
pub fn PollFiles(comptime StreamEnum: type) type {
const enum_fields = @typeInfo(StreamEnum).Enum.fields;
var struct_fields: [enum_fields.len]std.builtin.Type.StructField = undefined;
for (&struct_fields, enum_fields) |*struct_field, enum_field| {
struct_field.* = .{
.name = enum_field.name,
.type = fs.File,
.default_value = null,
.is_comptime = false,
.alignment = @alignOf(fs.File),
};
}
return @Type(.{ .Struct = .{
.layout = .Auto,
.fields = &struct_fields,
.decls = &.{},
.is_tuple = false,
} });
}
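
// For illustration (not part of this commit): given `enum { stdout, stderr }`,
// PollFiles produces a struct equivalent to
//
//     struct {
//         stdout: fs.File,
//         stderr: fs.File,
//     }
//
// which is why callers pass `.{ .stdout = ..., .stderr = ... }` to std.io.poll.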

test {
_ = @import("io/bit_reader.zig");
_ = @import("io/bit_writer.zig");