From 9e0338b82e45f672975bf7daa1ade5f4b2de4c01 Mon Sep 17 00:00:00 2001 From: Jonathan Marler Date: Thu, 17 Jun 2021 17:36:42 -0600 Subject: [PATCH] finish ChildProcess collectOutputWindows This finishes LemonBoy's Draft PR ziglang#6750. It updates ChildProcess to collect the output from stdout/stderr asynchronously using Overlapped IO and named pipes. --- lib/std/child_process.zig | 228 +++++++++++++++++--------------- lib/std/os/windows/kernel32.zig | 47 ++----- 2 files changed, 136 insertions(+), 139 deletions(-) diff --git a/lib/std/child_process.zig b/lib/std/child_process.zig index 644a17c1dc..b63153e904 100644 --- a/lib/std/child_process.zig +++ b/lib/std/child_process.zig @@ -13,6 +13,7 @@ const process = std.process; const File = std.fs.File; const windows = os.windows; const mem = std.mem; +const math = std.math; const debug = std.debug; const BufMap = std.BufMap; const builtin = std.builtin; @@ -257,58 +258,76 @@ pub const ChildProcess = struct { } } - fn collectOutputWindows(child: *const ChildProcess, stdout: *std.ArrayList(u8), stderr: *std.ArrayList(u8), max_output_bytes: usize) !void { - var wait_objects = [_]windows.kernel32.HANDLE{ - child.stdout.?.handle, child.stderr.?.handle, + fn collectOutputWindows(child: *const ChildProcess, outs: [2]*std.ArrayList(u8), max_output_bytes: usize) !void { + const bump_amt = 512; + const handles = [_]windows.HANDLE{ + child.stdout.?.handle, + child.stderr.?.handle, }; - var waiting_objects: u32 = wait_objects.len; - // XXX: Calling zeroes([2]windows.OVERLAPPED) causes the stage1 compiler - // to crash and burn. var overlapped = [_]windows.OVERLAPPED{ mem.zeroes(windows.OVERLAPPED), mem.zeroes(windows.OVERLAPPED), }; - var temp_buf: [2][4096]u8 = undefined; - // Kickstart the loop by issuing two async reads. - // ReadFile returns false and GetLastError returns ERROR_IO_PENDING if - // everything is ok. - _ = windows.kernel32.ReadFile(wait_objects[0], &temp_buf[0], temp_buf[0].len, null, &overlapped[0]); - _ = windows.kernel32.ReadFile(wait_objects[1], &temp_buf[1], temp_buf[1].len, null, &overlapped[1]); + var wait_objects: [2]windows.HANDLE = undefined; + var wait_object_count: u2 = 0; - poll: while (waiting_objects > 0) { - const status = windows.kernel32.WaitForMultipleObjects(waiting_objects, &wait_objects, 0, windows.INFINITE); - switch (status) { - windows.WAIT_OBJECT_0 + 0...windows.WAIT_OBJECT_0 + 1 => { - // stdout (or stderr) is ready. - const object = status - windows.WAIT_OBJECT_0; + // we need to cancel all pending IO before returning so our OVERLAPPED values don't go out of scope + defer for (wait_objects[0..wait_object_count]) |o| { + _ = windows.kernel32.CancelIo(o); + }; - var read_bytes: u32 = undefined; - if (windows.kernel32.GetOverlappedResult(wait_objects[object], &overlapped[object], &read_bytes, 0) == 0) { - switch (windows.kernel32.GetLastError()) { - .BROKEN_PIPE => { - // Move it to the end to remove it. - if (object != waiting_objects - 1) - mem.swap(windows.kernel32.HANDLE, &wait_objects[object], &wait_objects[waiting_objects - 1]); - waiting_objects -= 1; - continue :poll; - }, - else => |err| return windows.unexpectedError(err), - } - } - try stdout.appendSlice(temp_buf[object][0..read_bytes]); - _ = windows.kernel32.ReadFile(wait_objects[object], &temp_buf[object], temp_buf[object].len, null, &overlapped[object]); - }, - windows.WAIT_FAILED => { - switch (windows.kernel32.GetLastError()) { - else => |err| return windows.unexpectedError(err), - } - }, - // We're waiting with an infinite timeout - windows.WAIT_TIMEOUT => unreachable, - else => unreachable, + // Windows Async IO requires an initial call to ReadFile before waiting on the handle + for ([_]u1{ 0, 1 }) |i| { + try outs[i].ensureCapacity(bump_amt); + const buf = outs[i].unusedCapacitySlice(); + _ = windows.kernel32.ReadFile(handles[i], buf.ptr, math.cast(u32, buf.len) catch maxInt(u32), null, &overlapped[i]); + wait_objects[wait_object_count] = handles[i]; + wait_object_count += 1; + } + + while (true) { + const status = windows.kernel32.WaitForMultipleObjects(wait_object_count, &wait_objects, 0, windows.INFINITE); + if (status == windows.WAIT_FAILED) { + switch (windows.kernel32.GetLastError()) { + else => |err| return windows.unexpectedError(err), + } } + if (status < windows.WAIT_OBJECT_0 or status > windows.WAIT_OBJECT_0 + wait_object_count - 1) + unreachable; + + const wait_idx = status - windows.WAIT_OBJECT_0; + + // this extra `i` index is needed to map the wait handle back to the stdout or stderr + // values since the wait_idx can change which handle it corresponds with + const i: u1 = if (wait_objects[wait_idx] == handles[0]) 0 else 1; + + // remove completed event from the wait list + wait_object_count -= 1; + if (wait_idx == 0) + wait_objects[0] = wait_objects[1]; + + var read_bytes: u32 = undefined; + if (windows.kernel32.GetOverlappedResult(handles[i], &overlapped[i], &read_bytes, 0) == 0) { + switch (windows.kernel32.GetLastError()) { + .BROKEN_PIPE => { + if (wait_object_count == 0) + break; + continue; + }, + else => |err| return windows.unexpectedError(err), + } + } + + outs[i].items.len += read_bytes; + const new_capacity = std.math.min(outs[i].items.len + bump_amt, max_output_bytes); + try outs[i].ensureCapacity(new_capacity); + const buf = outs[i].unusedCapacitySlice(); + if (buf.len == 0) return if (i == 0) error.StdoutStreamTooLong else error.StderrStreamTooLong; + _ = windows.kernel32.ReadFile(handles[i], buf.ptr, math.cast(u32, buf.len) catch maxInt(u32), null, &overlapped[i]); + wait_objects[wait_object_count] = handles[i]; + wait_object_count += 1; } } @@ -361,12 +380,8 @@ pub const ChildProcess = struct { stderr.deinit(); } - try collectOutputPosix(child, &stdout, &stderr, args.max_output_bytes); - - // XXX: Respect max_output_bytes - // XXX: Smarter reading logic, read directly into the ArrayList if (builtin.os.tag == .windows) { - try collectOutputWindows(child, &stdout, &stderr, args.max_output_bytes); + try collectOutputWindows(child, [_]*std.ArrayList(u8){ &stdout, &stderr }, args.max_output_bytes); } else { try collectOutputPosix(child, &stdout, &stderr, args.max_output_bytes); } @@ -707,7 +722,7 @@ pub const ChildProcess = struct { var g_hChildStd_OUT_Wr: ?windows.HANDLE = null; switch (self.stdout_behavior) { StdIo.Pipe => { - try windowsMakePipe(&g_hChildStd_OUT_Rd, &g_hChildStd_OUT_Wr, &saAttr); + try windowsMakeAsyncPipe(&g_hChildStd_OUT_Rd, &g_hChildStd_OUT_Wr, &saAttr); }, StdIo.Ignore => { g_hChildStd_OUT_Wr = nul_handle; @@ -727,7 +742,7 @@ pub const ChildProcess = struct { var g_hChildStd_ERR_Wr: ?windows.HANDLE = null; switch (self.stderr_behavior) { StdIo.Pipe => { - try windowsMakePipe(&g_hChildStd_ERR_Rd, &g_hChildStd_ERR_Wr, &saAttr); + try windowsMakeAsyncPipe(&g_hChildStd_ERR_Rd, &g_hChildStd_ERR_Wr, &saAttr); }, StdIo.Ignore => { g_hChildStd_ERR_Wr = nul_handle; @@ -960,55 +975,6 @@ fn windowsDestroyPipe(rd: ?windows.HANDLE, wr: ?windows.HANDLE) void { if (wr) |h| os.close(h); } -var pipe_name_counter = std.atomic.Int(u32).init(1); - -fn windowsMakePipe(rd: *?windows.HANDLE, wr: *?windows.HANDLE, sattr: *const windows.SECURITY_ATTRIBUTES) !void { - var tmp_buf: [128]u8 = undefined; - // Forge a random path for the pipe. - const pipe_path = std.fmt.bufPrintZ( - &tmp_buf, - "\\\\.\\pipe\\zig-childprocess-{d}-{d}", - .{ windows.kernel32.GetCurrentProcessId(), pipe_name_counter.fetchAdd(1) }, - ) catch unreachable; - - // Create the read handle that can be used with overlapped IO ops. - const read_handle = windows.kernel32.CreateNamedPipeA( - pipe_path, - windows.PIPE_ACCESS_INBOUND | windows.FILE_FLAG_OVERLAPPED, - windows.PIPE_TYPE_BYTE, - 1, - 0x1000, - 0x1000, - 0, - sattr, - ); - if (read_handle == windows.INVALID_HANDLE_VALUE) { - switch (windows.kernel32.GetLastError()) { - else => |err| return windows.unexpectedError(err), - } - } - - const write_handle = windows.kernel32.CreateFileA( - pipe_path, - windows.GENERIC_WRITE, - 0, - sattr, - windows.OPEN_EXISTING, - windows.FILE_ATTRIBUTE_NORMAL, - null, - ); - if (write_handle == windows.INVALID_HANDLE_VALUE) { - switch (windows.kernel32.GetLastError()) { - else => |err| return windows.unexpectedError(err), - } - } - - try windows.SetHandleInformation(read_handle, windows.HANDLE_FLAG_INHERIT, 0); - - rd.* = read_handle; - wr.* = write_handle; -} - fn windowsMakePipeIn(rd: *?windows.HANDLE, wr: *?windows.HANDLE, sattr: *const windows.SECURITY_ATTRIBUTES) !void { var rd_h: windows.HANDLE = undefined; var wr_h: windows.HANDLE = undefined; @@ -1019,14 +985,64 @@ fn windowsMakePipeIn(rd: *?windows.HANDLE, wr: *?windows.HANDLE, sattr: *const w wr.* = wr_h; } -fn windowsMakePipeOut(rd: *?windows.HANDLE, wr: *?windows.HANDLE, sattr: *const windows.SECURITY_ATTRIBUTES) !void { - var rd_h: windows.HANDLE = undefined; - var wr_h: windows.HANDLE = undefined; - try windows.CreatePipe(&rd_h, &wr_h, sattr); - errdefer windowsDestroyPipe(rd_h, wr_h); - try windows.SetHandleInformation(rd_h, windows.HANDLE_FLAG_INHERIT, 0); - rd.* = rd_h; - wr.* = wr_h; +var pipe_name_counter = std.atomic.Atomic(u32).init(1); + +fn windowsMakeAsyncPipe(rd: *?windows.HANDLE, wr: *?windows.HANDLE, sattr: *const windows.SECURITY_ATTRIBUTES) !void { + var tmp_bufw: [128]u16 = undefined; + + // We must make a named pipe on windows because anonymous pipes do not support async IO + const pipe_path = blk: { + var tmp_buf: [128]u8 = undefined; + // Forge a random path for the pipe. + const pipe_path = std.fmt.bufPrintZ( + &tmp_buf, + "\\\\.\\pipe\\zig-childprocess-{d}-{d}", + .{ windows.kernel32.GetCurrentProcessId(), pipe_name_counter.fetchAdd(1, .Monotonic) }, + ) catch unreachable; + const len = std.unicode.utf8ToUtf16Le(&tmp_bufw, pipe_path) catch unreachable; + tmp_bufw[len] = 0; + break :blk tmp_bufw[0..len :0]; + }; + + // Create the read handle that can be used with overlapped IO ops. + const read_handle = windows.kernel32.CreateNamedPipeW( + pipe_path.ptr, + windows.PIPE_ACCESS_INBOUND | windows.FILE_FLAG_OVERLAPPED, + windows.PIPE_TYPE_BYTE, + 1, + 4096, + 4096, + 0, + sattr, + ); + if (read_handle == windows.INVALID_HANDLE_VALUE) { + switch (windows.kernel32.GetLastError()) { + else => |err| return windows.unexpectedError(err), + } + } + errdefer os.close(read_handle); + + var sattr_copy = sattr.*; + const write_handle = windows.kernel32.CreateFileW( + pipe_path.ptr, + windows.GENERIC_WRITE, + 0, + &sattr_copy, + windows.OPEN_EXISTING, + windows.FILE_ATTRIBUTE_NORMAL, + null, + ); + if (write_handle == windows.INVALID_HANDLE_VALUE) { + switch (windows.kernel32.GetLastError()) { + else => |err| return windows.unexpectedError(err), + } + } + errdefer os.close(write_handle); + + try windows.SetHandleInformation(read_handle, windows.HANDLE_FLAG_INHERIT, 0); + + rd.* = read_handle; + wr.* = write_handle; } fn destroyPipe(pipe: [2]os.fd_t) void { diff --git a/lib/std/os/windows/kernel32.zig b/lib/std/os/windows/kernel32.zig index f847a34162..971273ef3a 100644 --- a/lib/std/os/windows/kernel32.zig +++ b/lib/std/os/windows/kernel32.zig @@ -8,6 +8,7 @@ usingnamespace @import("bits.zig"); pub extern "kernel32" fn AddVectoredExceptionHandler(First: c_ulong, Handler: ?VECTORED_EXCEPTION_HANDLER) callconv(WINAPI) ?*c_void; pub extern "kernel32" fn RemoveVectoredExceptionHandler(Handle: HANDLE) callconv(WINAPI) c_ulong; +pub extern "kernel32" fn CancelIo(hFile: HANDLE) callconv(WINAPI) BOOL; pub extern "kernel32" fn CancelIoEx(hFile: HANDLE, lpOverlapped: ?LPOVERLAPPED) callconv(WINAPI) BOOL; pub extern "kernel32" fn CloseHandle(hObject: HANDLE) callconv(WINAPI) BOOL; @@ -15,29 +16,6 @@ pub extern "kernel32" fn CloseHandle(hObject: HANDLE) callconv(WINAPI) BOOL; pub extern "kernel32" fn CreateDirectoryW(lpPathName: [*:0]const u16, lpSecurityAttributes: ?*SECURITY_ATTRIBUTES) callconv(WINAPI) BOOL; pub extern "kernel32" fn SetEndOfFile(hFile: HANDLE) callconv(WINAPI) BOOL; -pub extern "kernel32" fn GetCurrentProcessId() callconv(WINAPI) DWORD; - -pub extern "kernel32" fn CreateNamedPipeA( - lpName: [*:0]const u8, - dwOpenMode: DWORD, - dwPipeMode: DWORD, - nMaxInstances: DWORD, - nOutBufferSize: DWORD, - nInBufferSize: DWORD, - nDefaultTimeOut: DWORD, - lpSecurityAttributes: ?*const SECURITY_ATTRIBUTES, -) callconv(WINAPI) HANDLE; -pub extern "kernel32" fn CreateNamedPipeW( - lpName: LPCWSTR, - dwOpenMode: DWORD, - dwPipeMode: DWORD, - nMaxInstances: DWORD, - nOutBufferSize: DWORD, - nInBufferSize: DWORD, - nDefaultTimeOut: DWORD, - lpSecurityAttributes: ?*const SECURITY_ATTRIBUTES, -) callconv(WINAPI) HANDLE; - pub extern "kernel32" fn CreateEventExW( lpEventAttributes: ?*SECURITY_ATTRIBUTES, lpName: [*:0]const u16, @@ -55,16 +33,6 @@ pub extern "kernel32" fn CreateFileW( hTemplateFile: ?HANDLE, ) callconv(WINAPI) HANDLE; -pub extern "kernel32" fn CreateFileA( - lpFileName: [*:0]const u8, - dwDesiredAccess: DWORD, - dwShareMode: DWORD, - lpSecurityAttributes: ?*const SECURITY_ATTRIBUTES, - dwCreationDisposition: DWORD, - dwFlagsAndAttributes: DWORD, - hTemplateFile: ?HANDLE, -) callconv(WINAPI) HANDLE; - pub extern "kernel32" fn CreatePipe( hReadPipe: *HANDLE, hWritePipe: *HANDLE, @@ -72,6 +40,17 @@ pub extern "kernel32" fn CreatePipe( nSize: DWORD, ) callconv(WINAPI) BOOL; +pub extern "kernel32" fn CreateNamedPipeW( + lpName: LPCWSTR, + dwOpenMode: DWORD, + dwPipeMode: DWORD, + nMaxInstances: DWORD, + nOutBufferSize: DWORD, + nInBufferSize: DWORD, + nDefaultTimeOut: DWORD, + lpSecurityAttributes: ?*const SECURITY_ATTRIBUTES, +) callconv(WINAPI) HANDLE; + pub extern "kernel32" fn CreateProcessW( lpApplicationName: ?LPWSTR, lpCommandLine: LPWSTR, @@ -132,6 +111,8 @@ pub extern "kernel32" fn GetCurrentDirectoryW(nBufferLength: DWORD, lpBuffer: ?[ pub extern "kernel32" fn GetCurrentThread() callconv(WINAPI) HANDLE; pub extern "kernel32" fn GetCurrentThreadId() callconv(WINAPI) DWORD; +pub extern "kernel32" fn GetCurrentProcessId() callconv(WINAPI) DWORD; + pub extern "kernel32" fn GetCurrentProcess() callconv(WINAPI) HANDLE; pub extern "kernel32" fn GetEnvironmentStringsW() callconv(WINAPI) ?[*:0]u16;