commit 4cbf30c2a419df3181b41cb0783e175c54248de2 (tree)
parent c2ebbd8911d7eec337203bd1de1fd2f3a59273c8
Author: Andrew Kelley <andrew@ziglang.org>
Date: Thu, 5 Mar 2026 18:04:35 -0800
std.Io.Uring: implement net_receive operate
Batch implementation still TODO.
Diffstat:
2 files changed, 22 insertions(+), 66 deletions(-)
diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig
@@ -13435,23 +13435,6 @@ fn netReceiveWindowsOne(
}
}
-fn netReceiveUnavailable(
- userdata: ?*anyopaque,
- handle: net.Socket.Handle,
- message_buffer: []net.IncomingMessage,
- data_buffer: []u8,
- flags: net.ReceiveFlags,
- timeout: Io.Timeout,
-) struct { ?net.Socket.ReceiveTimeoutError, usize } {
- _ = userdata;
- _ = handle;
- _ = message_buffer;
- _ = data_buffer;
- _ = flags;
- _ = timeout;
- return .{ error.NetworkDown, 0 };
-}
-
fn netWritePosix(
userdata: ?*anyopaque,
fd: net.Socket.Handle,
diff --git a/lib/std/Io/Uring.zig b/lib/std/Io/Uring.zig
@@ -777,7 +777,6 @@ pub fn io(ev: *Evented) Io {
.netConnectUnix = netConnectUnixUnavailable,
.netSocketCreatePair = netSocketCreatePairUnavailable,
.netSend = netSendUnavailable,
- .netReceive = netReceive,
.netRead = netReadUnavailable,
.netWrite = netWriteUnavailable,
.netWriteFile = netWriteFileUnavailable,
@@ -2092,6 +2091,18 @@ fn operate(userdata: ?*anyopaque, operation: Io.Operation) Io.Cancelable!Io.Oper
.device_io_control => |o| .{
.device_io_control = try ev.deviceIoControl(try maybe_sync.enterSync(ev), o),
},
+ .net_receive => |o| .{
+ .net_receive = r: {
+ const opt_err, const n = ev.netReceive(&maybe_sync.cancel_region, o.socket_handle, o.message_buffer, o.data_buffer, o.flags);
+ break :r .{
+ if (opt_err) |err| switch (err) {
+ error.Canceled => |e| return e,
+ else => |e| e,
+ } else null,
+ n,
+ };
+ },
+ },
};
}
@@ -2375,6 +2386,10 @@ fn batchDrainSubmitted(
return error.ConcurrencyUnavailable
else
.{ .device_io_control = try ev.deviceIoControl(try maybe_sync.enterSync(ev), o) },
+ .net_receive => |o| {
+ _ = o;
+ @panic("TODO implement batchDrainSubmitted for net_receive");
+ },
})) |result| {
switch (batch.completed.tail) {
.none => batch.completed.head = index,
@@ -2475,6 +2490,7 @@ fn batchDrainReady(batch: *Io.Batch) Io.Timeout.Error!void {
},
},
.device_io_control => unreachable,
+ .net_receive => @panic("TODO"),
})) |result| {
switch (batch.completed.tail) {
.none => batch.completed.head = index,
@@ -5035,37 +5051,16 @@ fn netSendUnavailable(
}
fn netReceive(
- userdata: ?*anyopaque,
+ ev: *Evented,
+ cancel_region: *CancelRegion,
handle: net.Socket.Handle,
message_buffer: []net.IncomingMessage,
data_buffer: []u8,
flags: net.ReceiveFlags,
- timeout: Io.Timeout,
-) struct { ?net.Socket.ReceiveTimeoutError, usize } {
- const ev: *Evented = @ptrCast(@alignCast(userdata));
- const ev_io = ev.io();
-
+) struct { ?net.Socket.ReceiveError, usize } {
var message_i: usize = 0;
var data_i: usize = 0;
- const deadline: ?struct {
- raw: Io.Timestamp,
- timespec: linux.kernel_timespec,
- clock: Io.Clock,
- } = if (timeout.toTimestamp(ev_io)) |deadline| deadline: {
- const ns = deadline.raw.toNanoseconds();
- break :deadline .{
- .raw = deadline.raw,
- .timespec = .{
- .sec = @intCast(@divFloor(ns, std.time.ns_per_s)),
- .nsec = @intCast(@mod(ns, std.time.ns_per_s)),
- },
- .clock = deadline.clock,
- };
- } else null;
-
- var cancel_region: CancelRegion = .init();
- defer cancel_region.deinit();
while (true) {
if (message_buffer.len - message_i == 0) return .{ null, message_i };
const message = &message_buffer[message_i];
@@ -5085,7 +5080,7 @@ fn netReceive(
const thread = cancel_region.awaitIoUring() catch |err| return .{ err, message_i };
thread.enqueue().* = .{
.opcode = .RECVMSG,
- .flags = if (deadline) |_| linux.IOSQE_IO_LINK else 0,
+ .flags = 0,
.ioprio = 0,
.fd = handle,
.off = 0,
@@ -5102,26 +5097,6 @@ fn netReceive(
.addr3 = 0,
.resv = 0,
};
- if (deadline) |*deadline_ptr| thread.enqueue().* = .{
- .opcode = .LINK_TIMEOUT,
- .flags = linux.IOSQE_CQE_SKIP_SUCCESS,
- .ioprio = 0,
- .fd = 0,
- .off = 0,
- .addr = @intFromPtr(&deadline_ptr.timespec),
- .len = 1,
- .rw_flags = linux.IORING_TIMEOUT_ABS | @as(u32, switch (deadline_ptr.clock) {
- .real => linux.IORING_TIMEOUT_REALTIME,
- else => 0,
- .boot => linux.IORING_TIMEOUT_BOOTTIME,
- }),
- .user_data = @intFromEnum(Completion.Userdata.wakeup),
- .buf_index = 0,
- .personality = 0,
- .splice_fd_in = 0,
- .addr3 = 0,
- .resv = 0,
- };
ev.yield(null, .nothing);
const completion = cancel_region.completion();
switch (completion.errno()) {
@@ -5144,9 +5119,7 @@ fn netReceive(
continue;
},
.AGAIN => unreachable,
- .INTR, .CANCELED => if (deadline) |d| if (now(ev, d.clock).nanoseconds >= d.raw.nanoseconds)
- return .{ error.Timeout, message_i },
-
+ .INTR, .CANCELED => {},
.BADF => |err| return .{ errnoBug(err), message_i },
.NFILE => return .{ error.SystemFdQuotaExceeded, message_i },
.MFILE => return .{ error.ProcessFdQuotaExceeded, message_i },