commit c2ebbd8911d7eec337203bd1de1fd2f3a59273c8 (tree)
parent 8b69341271db18a03a4aa7400a4c6679c5fbf5c6
Author: Andrew Kelley <andrew@ziglang.org>
Date: Thu, 5 Mar 2026 17:46:07 -0800
std.Io.Threaded: implement net_receive for Windows
Diffstat:
2 files changed, 131 insertions(+), 25 deletions(-)
diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig
@@ -2649,7 +2649,7 @@ fn operate(userdata: ?*anyopaque, operation: Io.Operation) Io.Cancelable!Io.Oper
fn batchAwaitAsync(userdata: ?*anyopaque, b: *Io.Batch) Io.Cancelable!void {
const t: *Threaded = @ptrCast(@alignCast(userdata));
if (is_windows) {
- batchDrainSubmittedWindows(b, false) catch |err| switch (err) {
+ batchDrainSubmittedWindows(t, b, false) catch |err| switch (err) {
error.ConcurrencyUnavailable => unreachable, // passed concurrency=false
else => |e| return e,
};
@@ -2789,7 +2789,7 @@ fn batchAwaitConcurrent(userdata: ?*anyopaque, b: *Io.Batch, timeout: Io.Timeout
const t: *Threaded = @ptrCast(@alignCast(userdata));
if (is_windows) {
const deadline: ?Io.Clock.Timestamp = timeout.toTimestamp(ioBasic(t));
- try batchDrainSubmittedWindows(b, true);
+ try batchDrainSubmittedWindows(t, b, true);
while (b.pending.head != .none and b.completed.head == .none) {
var delay_interval: windows.LARGE_INTEGER = interval: {
const d = deadline orelse break :interval std.math.minInt(windows.LARGE_INTEGER);
@@ -3005,6 +3005,31 @@ fn batchCancel(userdata: ?*anyopaque, b: *Io.Batch) void {
}
}
+fn batchCompleteBlockingWindows(
+ b: *Io.Batch,
+ operation_userdata: *WindowsBatchOperationUserdata,
+ result: Io.Operation.Result,
+) void {
+ const erased_userdata = operation_userdata.toErased();
+ const pending: *Io.Operation.Storage.Pending = @fieldParentPtr("userdata", erased_userdata);
+ switch (pending.node.prev) {
+ .none => b.pending.head = pending.node.next,
+ else => |prev_index| b.storage[prev_index.toIndex()].pending.node.next = pending.node.next,
+ }
+ switch (pending.node.next) {
+ .none => b.pending.tail = pending.node.prev,
+ else => |next_index| b.storage[next_index.toIndex()].pending.node.prev = pending.node.prev,
+ }
+ const storage: *Io.Operation.Storage = @fieldParentPtr("pending", pending);
+ const index: Io.Operation.OptionalIndex = .fromIndex(storage - b.storage.ptr);
+ switch (b.completed.tail) {
+ .none => b.completed.head = index,
+ else => |tail_index| b.storage[tail_index.toIndex()].completion.node.next = index,
+ }
+ b.completed.tail = index;
+ storage.* = .{ .completion = .{ .node = .{ .next = .none }, .result = result } };
+}
+
fn batchApc(
apc_context: ?*anyopaque,
iosb: *windows.IO_STATUS_BLOCK,
@@ -3044,7 +3069,7 @@ fn batchApc(
.file_read_streaming => .{ .file_read_streaming = ntReadFileResult(iosb) },
.file_write_streaming => .{ .file_write_streaming = ntWriteFileResult(iosb) },
.device_io_control => .{ .device_io_control = iosb.* },
- .net_receive => unreachable, // TODO
+ .net_receive => unreachable,
};
storage.* = .{ .completion = .{ .node = .{ .next = .none }, .result = result } };
},
@@ -3052,7 +3077,7 @@ fn batchApc(
}
/// If `concurrency` is false, `error.ConcurrencyUnavailable` is unreachable.
-fn batchDrainSubmittedWindows(b: *Io.Batch, concurrency: bool) (Io.ConcurrentError || Io.Cancelable)!void {
+fn batchDrainSubmittedWindows(t: *Threaded, b: *Io.Batch, concurrency: bool) (Io.ConcurrentError || Io.Cancelable)!void {
var index = b.submitted.head;
errdefer b.submitted.head = index;
while (index != .none) {
@@ -3246,10 +3271,12 @@ fn batchDrainSubmittedWindows(b: *Io.Batch, concurrency: bool) (Io.ConcurrentErr
};
}
},
- .net_receive => |o| {
+ .net_receive => |*o| {
+ // TODO integrate with overlapped I/O or equivalent to avoid this error
if (concurrency) return error.ConcurrencyUnavailable;
- _ = o;
- @panic("TODO implement Batch NetReceive on Windows");
+ batchCompleteBlockingWindows(b, operation_userdata, .{
+ .net_receive = netReceiveWindows(t, o.socket_handle, o.message_buffer, o.data_buffer, o.flags),
+ });
},
}
index = submission.node.next;
@@ -13323,13 +13350,89 @@ fn netReceiveWindows(
data_buffer: []u8,
flags: net.ReceiveFlags,
) struct { ?net.Socket.ReceiveError, usize } {
- if (!have_networking) return .{ error.NetworkDown, 0 };
- _ = t;
- _ = socket_handle;
- _ = message_buffer;
- _ = data_buffer;
- _ = flags;
- @panic("TODO implement netReceiveWindows");
+ netReceiveWindowsOne(t, socket_handle, &message_buffer[0], data_buffer, flags) catch |err| return .{ err, 0 };
+ return .{ null, 1 };
+}
+
+fn netReceiveWindowsOne(
+ t: *Threaded,
+ socket_handle: net.Socket.Handle,
+ message: *net.IncomingMessage,
+ data_buffer: []u8,
+ flags: net.ReceiveFlags,
+) net.Socket.ReceiveError!void {
+ comptime assert(have_networking);
+
+ var windows_flags: u32 =
+ @as(u32, if (flags.oob) ws2_32.MSG.OOB else 0) |
+ @as(u32, if (flags.peek) ws2_32.MSG.PEEK else 0) |
+ @as(u32, if (flags.trunc) ws2_32.MSG.TRUNC else 0);
+
+ var buf: ws2_32.WSABUF = .{
+ .buf = data_buffer.ptr,
+ .len = std.math.cast(u32, data_buffer.len) orelse return error.MessageOversize,
+ };
+ var n: u32 = undefined;
+ var syscall: Syscall = try .start();
+ var from_storage: WsaAddress = undefined;
+ var from_storage_len: i32 = @sizeOf(WsaAddress);
+
+ while (true) {
+ const rc = ws2_32.WSARecvFrom(
+ socket_handle,
+ (&buf)[0..1],
+ 1,
+ &n,
+ &windows_flags,
+ &from_storage.any,
+ &from_storage_len,
+ null,
+ null,
+ );
+ if (rc != ws2_32.SOCKET_ERROR) {
+ syscall.finish();
+ message.* = .{
+ .from = addressFromWsa(&from_storage),
+ .data = data_buffer[0..n],
+ .control = &.{},
+ .flags = .{
+ .eor = false,
+ .trunc = (windows_flags & ws2_32.MSG.TRUNC) != 0,
+ .ctrunc = (windows_flags & ws2_32.MSG.CTRUNC) != 0,
+ .oob = false,
+ .errqueue = false,
+ },
+ };
+ return;
+ }
+ switch (ws2_32.WSAGetLastError()) {
+ .EINTR, .ECANCELLED, .E_CANCELLED, .OPERATION_ABORTED => {
+ try syscall.checkCancel();
+ continue;
+ },
+ .NOTINITIALISED => {
+ syscall.finish();
+ try initializeWsa(t);
+ syscall = try .start();
+ continue;
+ },
+
+ .ECONNRESET => return syscall.fail(error.ConnectionResetByPeer),
+ .ENETDOWN => return syscall.fail(error.NetworkDown),
+ .ENETRESET => return syscall.fail(error.ConnectionResetByPeer),
+ .ENOTCONN => return syscall.fail(error.SocketUnconnected),
+ .EFAULT => unreachable, // a pointer is not completely contained in user address space.
+
+ else => |err| {
+ syscall.finish();
+ switch (err) {
+ .EINVAL => return wsaErrorBug(err),
+ .EMSGSIZE => return wsaErrorBug(err),
+ else => return windows.unexpectedWSAError(err),
+ }
+ },
+ }
+ }
}
fn netReceiveUnavailable(
diff --git a/lib/std/os/windows/ws2_32.zig b/lib/std/os/windows/ws2_32.zig
@@ -661,17 +661,20 @@ pub const IOC_OUT = 1073741824;
pub const IOC_IN = 2147483648;
pub const MSG = struct {
- pub const TRUNC = 256;
- pub const CTRUNC = 512;
- pub const BCAST = 1024;
- pub const MCAST = 2048;
- pub const ERRQUEUE = 4096;
-
- pub const PEEK = 2;
- pub const WAITALL = 8;
- pub const PUSH_IMMEDIATE = 32;
- pub const PARTIAL = 32768;
- pub const INTERRUPT = 16;
+ pub const OOB = 0x1;
+ pub const PEEK = 0x2;
+ pub const DONTROUTE = 0x4;
+ pub const WAITALL = 0x8;
+ pub const INTERRUPT = 0x10;
+ pub const PUSH_IMMEDIATE = 0x20;
+
+ pub const TRUNC = 0x0100;
+ pub const CTRUNC = 0x0200;
+ pub const BCAST = 0x0400;
+ pub const MCAST = 0x0800;
+
+ pub const PARTIAL = 0x8000;
+
pub const MAXIOVLEN = 16;
};