std.Io.poll: update to new I/O API

This commit is contained in:
Andrew Kelley
2025-07-22 21:21:27 -07:00
parent bc8e1a74c5
commit b8955a2e0a
9 changed files with 391 additions and 384 deletions

View File

@@ -3,7 +3,6 @@ const builtin = @import("builtin");
const io = std.io;
const fs = std.fs;
const process = std.process;
const ChildProcess = std.process.Child;
const Progress = std.Progress;
const print = std.debug.print;
const mem = std.mem;

View File

@@ -186,7 +186,7 @@ pub fn main() !void {
try child.spawn();
var poller = std.io.poll(arena, Eval.StreamEnum, .{
var poller = std.Io.poll(arena, Eval.StreamEnum, .{
.stdout = child.stdout.?,
.stderr = child.stderr.?,
});
@@ -247,19 +247,15 @@ const Eval = struct {
fn check(eval: *Eval, poller: *Poller, update: Case.Update, prog_node: std.Progress.Node) !void {
const arena = eval.arena;
const Header = std.zig.Server.Message.Header;
const stdout = poller.fifo(.stdout);
const stderr = poller.fifo(.stderr);
const stdout = poller.reader(.stdout);
const stderr = poller.reader(.stderr);
poll: while (true) {
while (stdout.readableLength() < @sizeOf(Header)) {
if (!(try poller.poll())) break :poll;
}
const header = stdout.reader().readStruct(Header) catch unreachable;
while (stdout.readableLength() < header.bytes_len) {
if (!(try poller.poll())) break :poll;
}
const body = stdout.readableSliceOfLen(header.bytes_len);
const Header = std.zig.Server.Message.Header;
while (stdout.buffered().len < @sizeOf(Header)) if (!try poller.poll()) break :poll;
const header = stdout.takeStruct(Header, .little) catch unreachable;
while (stdout.buffered().len < header.bytes_len) if (!try poller.poll()) break :poll;
const body = stdout.take(header.bytes_len) catch unreachable;
switch (header.tag) {
.error_bundle => {
@@ -277,8 +273,8 @@ const Eval = struct {
.string_bytes = try arena.dupe(u8, string_bytes),
.extra = extra_array,
};
if (stderr.readableLength() > 0) {
const stderr_data = try stderr.toOwnedSlice();
if (stderr.bufferedLen() > 0) {
const stderr_data = try poller.toOwnedSlice(.stderr);
if (eval.allow_stderr) {
std.log.info("error_bundle included stderr:\n{s}", .{stderr_data});
} else {
@@ -289,15 +285,14 @@ const Eval = struct {
try eval.checkErrorOutcome(update, result_error_bundle);
}
// This message indicates the end of the update.
stdout.discard(body.len);
return;
},
.emit_digest => {
const EbpHdr = std.zig.Server.Message.EmitDigest;
const ebp_hdr = @as(*align(1) const EbpHdr, @ptrCast(body));
_ = ebp_hdr;
if (stderr.readableLength() > 0) {
const stderr_data = try stderr.toOwnedSlice();
if (stderr.bufferedLen() > 0) {
const stderr_data = try poller.toOwnedSlice(.stderr);
if (eval.allow_stderr) {
std.log.info("emit_digest included stderr:\n{s}", .{stderr_data});
} else {
@@ -308,7 +303,6 @@ const Eval = struct {
if (eval.target.backend == .sema) {
try eval.checkSuccessOutcome(update, null, prog_node);
// This message indicates the end of the update.
stdout.discard(body.len);
}
const digest = body[@sizeOf(EbpHdr)..][0..Cache.bin_digest_len];
@@ -323,21 +317,18 @@ const Eval = struct {
try eval.checkSuccessOutcome(update, bin_path, prog_node);
// This message indicates the end of the update.
stdout.discard(body.len);
},
else => {
// Ignore other messages.
stdout.discard(body.len);
},
}
}
if (stderr.readableLength() > 0) {
const stderr_data = try stderr.toOwnedSlice();
if (stderr.bufferedLen() > 0) {
if (eval.allow_stderr) {
std.log.info("update '{s}' included stderr:\n{s}", .{ update.name, stderr_data });
std.log.info("update '{s}' included stderr:\n{s}", .{ update.name, stderr.buffered() });
} else {
eval.fatal("update '{s}' failed:\n{s}", .{ update.name, stderr_data });
eval.fatal("update '{s}' failed:\n{s}", .{ update.name, stderr.buffered() });
}
}
@@ -537,25 +528,19 @@ const Eval = struct {
fn end(eval: *Eval, poller: *Poller) !void {
requestExit(eval.child, eval);
const Header = std.zig.Server.Message.Header;
const stdout = poller.fifo(.stdout);
const stderr = poller.fifo(.stderr);
const stdout = poller.reader(.stdout);
const stderr = poller.reader(.stderr);
poll: while (true) {
while (stdout.readableLength() < @sizeOf(Header)) {
if (!(try poller.poll())) break :poll;
}
const header = stdout.reader().readStruct(Header) catch unreachable;
while (stdout.readableLength() < header.bytes_len) {
if (!(try poller.poll())) break :poll;
}
const body = stdout.readableSliceOfLen(header.bytes_len);
stdout.discard(body.len);
const Header = std.zig.Server.Message.Header;
while (stdout.buffered().len < @sizeOf(Header)) if (!try poller.poll()) break :poll;
const header = stdout.takeStruct(Header, .little) catch unreachable;
while (stdout.buffered().len < header.bytes_len) if (!try poller.poll()) break :poll;
stdout.toss(header.bytes_len);
}
if (stderr.readableLength() > 0) {
const stderr_data = try stderr.toOwnedSlice();
eval.fatal("unexpected stderr:\n{s}", .{stderr_data});
if (stderr.bufferedLen() > 0) {
eval.fatal("unexpected stderr:\n{s}", .{stderr.buffered()});
}
}