diff --git a/lib/std/os/linux/io_uring.zig b/lib/std/os/linux/io_uring.zig index 3002d10e98..8099dd289e 100644 --- a/lib/std/os/linux/io_uring.zig +++ b/lib/std/os/linux/io_uring.zig @@ -491,6 +491,7 @@ pub const IO_Uring = struct { /// Queues (but does not submit) an SQE to perform an `accept4(2)` on a socket. /// Returns a pointer to the SQE. + /// Available since 5.5 pub fn accept( self: *IO_Uring, user_data: u64, @@ -505,10 +506,14 @@ pub const IO_Uring = struct { return sqe; } - /// Queues (but does not submit) an SQE to perform an multishot `accept4(2)` on a socket. + /// Queues an multishot accept on a socket. + /// /// Multishot variant allows an application to issue a single accept request, /// which will repeatedly trigger a CQE when a connection request comes in. - /// Returns a pointer to the SQE. + /// While IORING_CQE_F_MORE flag is set in CQE flags accept will generate + /// further CQEs. + /// + /// Available since 5.19 pub fn accept_multishot( self: *IO_Uring, user_data: u64, @@ -523,6 +528,47 @@ pub const IO_Uring = struct { return sqe; } + /// Queues an accept using direct (registered) file descriptors. + /// + /// To use an accept direct variant, the application must first have registered + /// a file table (with register_files). An unused table index will be + /// dynamically chosen and returned in the CQE res field. + /// + /// After creation, they can be used by setting IOSQE_FIXED_FILE in the SQE + /// flags member, and setting the SQE fd field to the direct descriptor value + /// rather than the regular file descriptor. + /// + /// Available since 5.19 + pub fn accept_direct( + self: *IO_Uring, + user_data: u64, + fd: os.fd_t, + addr: ?*os.sockaddr, + addrlen: ?*os.socklen_t, + flags: u32, + ) !*linux.io_uring_sqe { + const sqe = try self.get_sqe(); + io_uring_prep_accept_direct(sqe, fd, addr, addrlen, flags, linux.IORING_FILE_INDEX_ALLOC); + sqe.user_data = user_data; + return sqe; + } + + /// Queues an multishot accept using direct (registered) file descriptors. + /// Available since 5.19 + pub fn accept_multishot_direct( + self: *IO_Uring, + user_data: u64, + fd: os.fd_t, + addr: ?*os.sockaddr, + addrlen: ?*os.socklen_t, + flags: u32, + ) !*linux.io_uring_sqe { + const sqe = try self.get_sqe(); + io_uring_prep_multishot_accept_direct(sqe, fd, addr, addrlen, flags); + sqe.user_data = user_data; + return sqe; + } + /// Queue (but does not submit) an SQE to perform a `connect(2)` on a socket. /// Returns a pointer to the SQE. pub fn connect( @@ -570,6 +616,7 @@ pub const IO_Uring = struct { /// Queues (but does not submit) an SQE to perform a `recv(2)`. /// Returns a pointer to the SQE. + /// Available since 5.6 pub fn recv( self: *IO_Uring, user_data: u64, @@ -593,6 +640,7 @@ pub const IO_Uring = struct { /// Queues (but does not submit) an SQE to perform a `send(2)`. /// Returns a pointer to the SQE. + /// Available since 5.6 pub fn send( self: *IO_Uring, user_data: u64, @@ -606,8 +654,56 @@ pub const IO_Uring = struct { return sqe; } + /// Queues (but does not submit) an SQE to perform an async zerocopy `send(2)`. + /// + /// This operation will most likely produce two CQEs. The flags field of the + /// first cqe may likely contain IORING_CQE_F_MORE, which means that there will + /// be a second cqe with the user_data field set to the same value. The user + /// must not modify the data buffer until the notification is posted. The first + /// cqe follows the usual rules and so its res field will contain the number of + /// bytes sent or a negative error code. The notification's res field will be + /// set to zero and the flags field will contain IORING_CQE_F_NOTIF. The two + /// step model is needed because the kernel may hold on to buffers for a long + /// time, e.g. waiting for a TCP ACK. Notifications responsible for controlling + /// the lifetime of the buffers. Even errored requests may generate a + /// notification. + /// + /// Available since 6.0 + pub fn send_zc( + self: *IO_Uring, + user_data: u64, + fd: os.fd_t, + buffer: []const u8, + send_flags: u32, + zc_flags: u16, + ) !*linux.io_uring_sqe { + const sqe = try self.get_sqe(); + io_uring_prep_send_zc(sqe, fd, buffer, send_flags, zc_flags); + sqe.user_data = user_data; + return sqe; + } + + /// Queues (but does not submit) an SQE to perform an async zerocopy `send(2)`. + /// Returns a pointer to the SQE. + /// Available since 6.0 + pub fn send_zc_fixed( + self: *IO_Uring, + user_data: u64, + fd: os.fd_t, + buffer: []const u8, + send_flags: u32, + zc_flags: u16, + buf_index: u16, + ) !*linux.io_uring_sqe { + const sqe = try self.get_sqe(); + io_uring_prep_send_zc_fixed(sqe, fd, buffer, send_flags, zc_flags, buf_index); + sqe.user_data = user_data; + return sqe; + } + /// Queues (but does not submit) an SQE to perform a `recvmsg(2)`. /// Returns a pointer to the SQE. + /// Available since 5.3 pub fn recvmsg( self: *IO_Uring, user_data: u64, @@ -623,6 +719,7 @@ pub const IO_Uring = struct { /// Queues (but does not submit) an SQE to perform a `sendmsg(2)`. /// Returns a pointer to the SQE. + /// Available since 5.3 pub fn sendmsg( self: *IO_Uring, user_data: u64, @@ -636,8 +733,25 @@ pub const IO_Uring = struct { return sqe; } + /// Queues (but does not submit) an SQE to perform an async zerocopy `sendmsg(2)`. + /// Returns a pointer to the SQE. + /// Available since 6.1 + pub fn sendmsg_zc( + self: *IO_Uring, + user_data: u64, + fd: os.fd_t, + msg: *const os.msghdr_const, + flags: u32, + ) !*linux.io_uring_sqe { + const sqe = try self.get_sqe(); + io_uring_prep_sendmsg_zc(sqe, fd, msg, flags); + sqe.user_data = user_data; + return sqe; + } + /// Queues (but does not submit) an SQE to perform an `openat(2)`. /// Returns a pointer to the SQE. + /// Available since 5.6. pub fn openat( self: *IO_Uring, user_data: u64, @@ -652,8 +766,35 @@ pub const IO_Uring = struct { return sqe; } + /// Queues an openat using direct (registered) file descriptors. + /// + /// To use an accept direct variant, the application must first have registered + /// a file table (with register_files). An unused table index will be + /// dynamically chosen and returned in the CQE res field. + /// + /// After creation, they can be used by setting IOSQE_FIXED_FILE in the SQE + /// flags member, and setting the SQE fd field to the direct descriptor value + /// rather than the regular file descriptor. + /// + /// Available since 5.15 + pub fn openat_direct( + self: *IO_Uring, + user_data: u64, + fd: os.fd_t, + path: [*:0]const u8, + flags: u32, + mode: os.mode_t, + file_index: u32, + ) !*linux.io_uring_sqe { + const sqe = try self.get_sqe(); + io_uring_prep_openat_direct(sqe, fd, path, flags, mode, file_index); + sqe.user_data = user_data; + return sqe; + } + /// Queues (but does not submit) an SQE to perform a `close(2)`. /// Returns a pointer to the SQE. + /// Available since 5.6. pub fn close(self: *IO_Uring, user_data: u64, fd: os.fd_t) !*linux.io_uring_sqe { const sqe = try self.get_sqe(); io_uring_prep_close(sqe, fd); @@ -661,6 +802,15 @@ pub const IO_Uring = struct { return sqe; } + /// Queues close of registered file descriptor. + /// Available since 5.15 + pub fn close_direct(self: *IO_Uring, user_data: u64, file_index: u32) !*linux.io_uring_sqe { + const sqe = try self.get_sqe(); + io_uring_prep_close_direct(sqe, file_index); + sqe.user_data = user_data; + return sqe; + } + /// Queues (but does not submit) an SQE to register a timeout operation. /// Returns a pointer to the SQE. /// @@ -1109,6 +1259,57 @@ pub const IO_Uring = struct { else => |errno| return os.unexpectedErrno(errno), } } + + /// Prepares a socket creation request. + /// New socket fd will be returned in completion result. + /// Available since 5.19 + pub fn socket( + self: *IO_Uring, + user_data: u64, + domain: u32, + socket_type: u32, + protocol: u32, + flags: u32, + ) !*linux.io_uring_sqe { + const sqe = try self.get_sqe(); + io_uring_prep_socket(sqe, domain, socket_type, protocol, flags); + sqe.user_data = user_data; + return sqe; + } + + /// Prepares a socket creation request for registered file at index `file_index`. + /// Available since 5.19 + pub fn socket_direct( + self: *IO_Uring, + user_data: u64, + domain: u32, + socket_type: u32, + protocol: u32, + flags: u32, + file_index: u32, + ) !*linux.io_uring_sqe { + const sqe = try self.get_sqe(); + io_uring_prep_socket_direct(sqe, domain, socket_type, protocol, flags, file_index); + sqe.user_data = user_data; + return sqe; + } + + /// Prepares a socket creation request for registered file, index chosen by kernel (file index alloc). + /// File index will be returned in CQE res field. + /// Available since 5.19 + pub fn socket_direct_alloc( + self: *IO_Uring, + user_data: u64, + domain: u32, + socket_type: u32, + protocol: u32, + flags: u32, + ) !*linux.io_uring_sqe { + const sqe = try self.get_sqe(); + io_uring_prep_socket_direct_alloc(sqe, domain, socket_type, protocol, flags); + sqe.user_data = user_data; + return sqe; + } }; pub const SubmissionQueue = struct { @@ -1343,6 +1544,41 @@ pub fn io_uring_prep_accept( sqe.rw_flags = flags; } +pub fn io_uring_prep_accept_direct( + sqe: *linux.io_uring_sqe, + fd: os.fd_t, + addr: ?*os.sockaddr, + addrlen: ?*os.socklen_t, + flags: u32, + file_index: u32, +) void { + io_uring_prep_accept(sqe, fd, addr, addrlen, flags); + __io_uring_set_target_fixed_file(sqe, file_index); +} + +pub fn io_uring_prep_multishot_accept_direct( + sqe: *linux.io_uring_sqe, + fd: os.fd_t, + addr: ?*os.sockaddr, + addrlen: ?*os.socklen_t, + flags: u32, +) void { + io_uring_prep_multishot_accept(sqe, fd, addr, addrlen, flags); + __io_uring_set_target_fixed_file(sqe, linux.IORING_FILE_INDEX_ALLOC); +} + +fn __io_uring_set_target_fixed_file(sqe: *linux.io_uring_sqe, file_index: u32) void { + const sqe_file_index: u32 = if (file_index == linux.IORING_FILE_INDEX_ALLOC) + linux.IORING_FILE_INDEX_ALLOC + else + // 0 means no fixed files, indexes should be encoded as "index + 1" + file_index + 1; + // This filed is overloaded in liburing: + // splice_fd_in: i32 + // sqe_file_index: u32 + sqe.splice_fd_in = @bitCast(sqe_file_index); +} + pub fn io_uring_prep_connect( sqe: *linux.io_uring_sqe, fd: os.fd_t, @@ -1373,6 +1609,28 @@ pub fn io_uring_prep_send(sqe: *linux.io_uring_sqe, fd: os.fd_t, buffer: []const sqe.rw_flags = flags; } +pub fn io_uring_prep_send_zc(sqe: *linux.io_uring_sqe, fd: os.fd_t, buffer: []const u8, flags: u32, zc_flags: u16) void { + io_uring_prep_rw(.SEND_ZC, sqe, fd, @intFromPtr(buffer.ptr), buffer.len, 0); + sqe.rw_flags = flags; + sqe.ioprio = zc_flags; +} + +pub fn io_uring_prep_send_zc_fixed(sqe: *linux.io_uring_sqe, fd: os.fd_t, buffer: []const u8, flags: u32, zc_flags: u16, buf_index: u16) void { + io_uring_prep_send_zc(sqe, fd, buffer, flags, zc_flags); + sqe.ioprio |= linux.IORING_RECVSEND_FIXED_BUF; + sqe.buf_index = buf_index; +} + +pub fn io_uring_prep_sendmsg_zc( + sqe: *linux.io_uring_sqe, + fd: os.fd_t, + msg: *const os.msghdr_const, + flags: u32, +) void { + io_uring_prep_sendmsg(sqe, fd, msg, flags); + sqe.opcode = .SENDMSG_ZC; +} + pub fn io_uring_prep_recvmsg( sqe: *linux.io_uring_sqe, fd: os.fd_t, @@ -1404,6 +1662,18 @@ pub fn io_uring_prep_openat( sqe.rw_flags = flags; } +pub fn io_uring_prep_openat_direct( + sqe: *linux.io_uring_sqe, + fd: os.fd_t, + path: [*:0]const u8, + flags: u32, + mode: os.mode_t, + file_index: u32, +) void { + io_uring_prep_openat(sqe, fd, path, flags, mode); + __io_uring_set_target_fixed_file(sqe, file_index); +} + pub fn io_uring_prep_close(sqe: *linux.io_uring_sqe, fd: os.fd_t) void { sqe.* = .{ .opcode = .CLOSE, @@ -1423,6 +1693,11 @@ pub fn io_uring_prep_close(sqe: *linux.io_uring_sqe, fd: os.fd_t) void { }; } +pub fn io_uring_prep_close_direct(sqe: *linux.io_uring_sqe, file_index: u32) void { + io_uring_prep_close(sqe, 0); + __io_uring_set_target_fixed_file(sqe, file_index); +} + pub fn io_uring_prep_timeout( sqe: *linux.io_uring_sqe, ts: *const os.linux.kernel_timespec, @@ -1650,6 +1925,40 @@ pub fn io_uring_prep_multishot_accept( sqe.ioprio |= linux.IORING_ACCEPT_MULTISHOT; } +pub fn io_uring_prep_socket( + sqe: *linux.io_uring_sqe, + domain: u32, + socket_type: u32, + protocol: u32, + flags: u32, +) void { + io_uring_prep_rw(.SOCKET, sqe, @intCast(domain), 0, protocol, socket_type); + sqe.rw_flags = flags; +} + +pub fn io_uring_prep_socket_direct( + sqe: *linux.io_uring_sqe, + domain: u32, + socket_type: u32, + protocol: u32, + flags: u32, + file_index: u32, +) void { + io_uring_prep_socket(sqe, domain, socket_type, protocol, flags); + __io_uring_set_target_fixed_file(sqe, file_index); +} + +pub fn io_uring_prep_socket_direct_alloc( + sqe: *linux.io_uring_sqe, + domain: u32, + socket_type: u32, + protocol: u32, + flags: u32, +) void { + io_uring_prep_socket(sqe, domain, socket_type, protocol, flags); + __io_uring_set_target_fixed_file(sqe, linux.IORING_FILE_INDEX_ALLOC); +} + test "structs/offsets/entries" { if (builtin.os.tag != .linux) return error.SkipZigTest; @@ -3492,3 +3801,363 @@ test "accept multishot" { os.closeSocket(client); } } + +test "accept/connect/send_zc/recv" { + try skipKernelLessThan(.{ .major = 6, .minor = 0, .patch = 0 }); + + var ring = IO_Uring.init(16, 0) catch |err| switch (err) { + error.SystemOutdated => return error.SkipZigTest, + error.PermissionDenied => return error.SkipZigTest, + else => return err, + }; + defer ring.deinit(); + + const socket_test_harness = try createSocketTestHarness(&ring); + defer socket_test_harness.close(); + + const buffer_send = [_]u8{ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0xa, 0xb, 0xc, 0xd, 0xe }; + var buffer_recv = [_]u8{0} ** 10; + + // zero-copy send + const send = try ring.send_zc(0xeeeeeeee, socket_test_harness.client, buffer_send[0..], 0, 0); + send.flags |= linux.IOSQE_IO_LINK; + _ = try ring.recv(0xffffffff, socket_test_harness.server, .{ .buffer = buffer_recv[0..] }, 0); + try testing.expectEqual(@as(u32, 2), try ring.submit()); + + // First completion of zero-copy send. + // IORING_CQE_F_MORE, means that there + // will be a second completion event / notification for the + // request, with the user_data field set to the same value. + // buffer_send must be keep alive until second cqe. + var cqe_send = try ring.copy_cqe(); + try testing.expectEqual(linux.io_uring_cqe{ + .user_data = 0xeeeeeeee, + .res = buffer_send.len, + .flags = linux.IORING_CQE_F_MORE, + }, cqe_send); + + const cqe_recv = try ring.copy_cqe(); + try testing.expectEqual(linux.io_uring_cqe{ + .user_data = 0xffffffff, + .res = buffer_recv.len, + .flags = cqe_recv.flags & linux.IORING_CQE_F_SOCK_NONEMPTY, + }, cqe_recv); + + try testing.expectEqualSlices(u8, buffer_send[0..buffer_recv.len], buffer_recv[0..]); + + // Second completion of zero-copy send. + // IORING_CQE_F_NOTIF in flags signals that kernel is done with send_buffer + cqe_send = try ring.copy_cqe(); + try testing.expectEqual(linux.io_uring_cqe{ + .user_data = 0xeeeeeeee, + .res = 0, + .flags = linux.IORING_CQE_F_NOTIF, + }, cqe_send); +} + +test "accept_direct" { + try skipKernelLessThan(.{ .major = 5, .minor = 19, .patch = 0 }); + + var ring = IO_Uring.init(1, 0) catch |err| switch (err) { + error.SystemOutdated => return error.SkipZigTest, + error.PermissionDenied => return error.SkipZigTest, + else => return err, + }; + defer ring.deinit(); + var address = try net.Address.parseIp4("127.0.0.1", 0); + + // register direct file descriptors + var registered_fds = [_]os.fd_t{-1} ** 2; + try ring.register_files(registered_fds[0..]); + + const listener_socket = try createListenerSocket(&address); + defer os.closeSocket(listener_socket); + + const accept_userdata: u64 = 0xaaaaaaaa; + const read_userdata: u64 = 0xbbbbbbbb; + const data = [_]u8{ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0xa, 0xb, 0xc, 0xd, 0xe }; + + for (0..2) |_| { + for (registered_fds, 0..) |_, i| { + var buffer_recv = [_]u8{0} ** 16; + const buffer_send: []const u8 = data[0 .. data.len - i]; // make it different at each loop + + // submit accept, will chose registered fd and return index in cqe + _ = try ring.accept_direct(accept_userdata, listener_socket, null, null, 0); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + + // connect + var client = try os.socket(address.any.family, os.SOCK.STREAM | os.SOCK.CLOEXEC, 0); + try os.connect(client, &address.any, address.getOsSockLen()); + defer os.closeSocket(client); + + // accept completion + const cqe_accept = try ring.copy_cqe(); + try testing.expectEqual(os.E.SUCCESS, cqe_accept.err()); + const fd_index = cqe_accept.res; + try testing.expect(fd_index < registered_fds.len); + try testing.expect(cqe_accept.user_data == accept_userdata); + + // send data + _ = try os.send(client, buffer_send, 0); + + // Example of how to use registered fd: + // Submit receive to fixed file returned by accept (fd_index). + // Fd field is set to registered file index, returned by accept. + // Flag linux.IOSQE_FIXED_FILE must be set. + const recv_sqe = try ring.recv(read_userdata, fd_index, .{ .buffer = &buffer_recv }, 0); + recv_sqe.flags |= linux.IOSQE_FIXED_FILE; + try testing.expectEqual(@as(u32, 1), try ring.submit()); + + // accept receive + const recv_cqe = try ring.copy_cqe(); + try testing.expect(recv_cqe.user_data == read_userdata); + try testing.expect(recv_cqe.res == buffer_send.len); + try testing.expectEqualSlices(u8, buffer_send, buffer_recv[0..buffer_send.len]); + } + // no more available fds, accept will get NFILE error + { + // submit accept + _ = try ring.accept_direct(accept_userdata, listener_socket, null, null, 0); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + // connect + var client = try os.socket(address.any.family, os.SOCK.STREAM | os.SOCK.CLOEXEC, 0); + try os.connect(client, &address.any, address.getOsSockLen()); + defer os.closeSocket(client); + // completion with error + const cqe_accept = try ring.copy_cqe(); + try testing.expect(cqe_accept.user_data == accept_userdata); + try testing.expectEqual(os.E.NFILE, cqe_accept.err()); + } + // return file descriptors to kernel + try ring.register_files_update(0, registered_fds[0..]); + } + try ring.unregister_files(); +} + +test "accept_multishot_direct" { + try skipKernelLessThan(.{ .major = 5, .minor = 19, .patch = 0 }); + + var ring = IO_Uring.init(1, 0) catch |err| switch (err) { + error.SystemOutdated => return error.SkipZigTest, + error.PermissionDenied => return error.SkipZigTest, + else => return err, + }; + defer ring.deinit(); + + var address = try net.Address.parseIp4("127.0.0.1", 0); + + var registered_fds = [_]os.fd_t{-1} ** 2; + try ring.register_files(registered_fds[0..]); + + const listener_socket = try createListenerSocket(&address); + defer os.closeSocket(listener_socket); + + const accept_userdata: u64 = 0xaaaaaaaa; + + for (0..2) |_| { + // submit multishot accept + // Will chose registered fd and return index of the selected registered file in cqe. + _ = try ring.accept_multishot_direct(accept_userdata, listener_socket, null, null, 0); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + + for (registered_fds) |_| { + // connect + var client = try os.socket(address.any.family, os.SOCK.STREAM | os.SOCK.CLOEXEC, 0); + try os.connect(client, &address.any, address.getOsSockLen()); + defer os.closeSocket(client); + + // accept completion + const cqe_accept = try ring.copy_cqe(); + const fd_index = cqe_accept.res; + try testing.expect(fd_index < registered_fds.len); + try testing.expect(cqe_accept.user_data == accept_userdata); + try testing.expect(cqe_accept.flags & linux.IORING_CQE_F_MORE > 0); // has more is set + } + // No more available fds, accept will get NFILE error. + // Multishot is terminated (more flag is not set). + { + // connect + var client = try os.socket(address.any.family, os.SOCK.STREAM | os.SOCK.CLOEXEC, 0); + try os.connect(client, &address.any, address.getOsSockLen()); + defer os.closeSocket(client); + // completion with error + const cqe_accept = try ring.copy_cqe(); + try testing.expect(cqe_accept.user_data == accept_userdata); + try testing.expectEqual(os.E.NFILE, cqe_accept.err()); + try testing.expect(cqe_accept.flags & linux.IORING_CQE_F_MORE == 0); // has more is not set + } + // return file descriptors to kernel + try ring.register_files_update(0, registered_fds[0..]); + } + try ring.unregister_files(); +} + +test "socket" { + try skipKernelLessThan(.{ .major = 5, .minor = 19, .patch = 0 }); + + var ring = IO_Uring.init(1, 0) catch |err| switch (err) { + error.SystemOutdated => return error.SkipZigTest, + error.PermissionDenied => return error.SkipZigTest, + else => return err, + }; + defer ring.deinit(); + + // prepare, submit socket operation + _ = try ring.socket(0, linux.AF.INET, os.SOCK.STREAM, 0, 0); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + + // test completion + var cqe = try ring.copy_cqe(); + try testing.expectEqual(os.E.SUCCESS, cqe.err()); + const fd: os.fd_t = @intCast(cqe.res); + try testing.expect(fd > 2); + + os.close(fd); +} + +test "socket_direct/socket_direct_alloc/close_direct" { + try skipKernelLessThan(.{ .major = 5, .minor = 19, .patch = 0 }); + + var ring = IO_Uring.init(2, 0) catch |err| switch (err) { + error.SystemOutdated => return error.SkipZigTest, + error.PermissionDenied => return error.SkipZigTest, + else => return err, + }; + defer ring.deinit(); + + var registered_fds = [_]os.fd_t{-1} ** 3; + try ring.register_files(registered_fds[0..]); + + // create socket in registered file descriptor at index 0 (last param) + _ = try ring.socket_direct(0, linux.AF.INET, os.SOCK.STREAM, 0, 0, 0); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + var cqe_socket = try ring.copy_cqe(); + try testing.expectEqual(os.E.SUCCESS, cqe_socket.err()); + try testing.expect(cqe_socket.res == 0); + + // create socket in registered file descriptor at index 1 (last param) + _ = try ring.socket_direct(0, linux.AF.INET, os.SOCK.STREAM, 0, 0, 1); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + cqe_socket = try ring.copy_cqe(); + try testing.expectEqual(os.E.SUCCESS, cqe_socket.err()); + try testing.expect(cqe_socket.res == 0); // res is 0 when index is specified + + // create socket in kernel chosen file descriptor index (_alloc version) + // completion res has index from registered files + _ = try ring.socket_direct_alloc(0, linux.AF.INET, os.SOCK.STREAM, 0, 0); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + cqe_socket = try ring.copy_cqe(); + try testing.expectEqual(os.E.SUCCESS, cqe_socket.err()); + try testing.expect(cqe_socket.res == 2); // returns registered file index + + // use sockets from registered_fds in connect operation + var address = try net.Address.parseIp4("127.0.0.1", 0); + const listener_socket = try createListenerSocket(&address); + defer os.closeSocket(listener_socket); + const accept_userdata: u64 = 0xaaaaaaaa; + const connect_userdata: u64 = 0xbbbbbbbb; + const close_userdata: u64 = 0xcccccccc; + for (registered_fds, 0..) |_, fd_index| { + // prepare accept + _ = try ring.accept(accept_userdata, listener_socket, null, null, 0); + // prepare connect with fixed socket + const connect_sqe = try ring.connect(connect_userdata, @intCast(fd_index), &address.any, address.getOsSockLen()); + connect_sqe.flags |= linux.IOSQE_FIXED_FILE; // fd is fixed file index + // submit both + try testing.expectEqual(@as(u32, 2), try ring.submit()); + // get completions + var cqe_connect = try ring.copy_cqe(); + var cqe_accept = try ring.copy_cqe(); + // ignore order + if (cqe_connect.user_data == accept_userdata and cqe_accept.user_data == connect_userdata) { + const a = cqe_accept; + const b = cqe_connect; + cqe_accept = b; + cqe_connect = a; + } + // test connect completion + try testing.expect(cqe_connect.user_data == connect_userdata); + try testing.expectEqual(os.E.SUCCESS, cqe_connect.err()); + // test accept completion + try testing.expect(cqe_accept.user_data == accept_userdata); + try testing.expectEqual(os.E.SUCCESS, cqe_accept.err()); + + // submit and test close_direct + _ = try ring.close_direct(close_userdata, @intCast(fd_index)); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + var cqe_close = try ring.copy_cqe(); + try testing.expect(cqe_close.user_data == close_userdata); + try testing.expectEqual(os.E.SUCCESS, cqe_close.err()); + } + + try ring.unregister_files(); +} + +test "openat_direct/close_direct" { + try skipKernelLessThan(.{ .major = 5, .minor = 19, .patch = 0 }); + + var ring = IO_Uring.init(2, 0) catch |err| switch (err) { + error.SystemOutdated => return error.SkipZigTest, + error.PermissionDenied => return error.SkipZigTest, + else => return err, + }; + defer ring.deinit(); + + var registered_fds = [_]os.fd_t{-1} ** 3; + try ring.register_files(registered_fds[0..]); + + var tmp = std.testing.tmpDir(.{}); + defer tmp.cleanup(); + const path = "test_io_uring_close_direct"; + const flags: u32 = os.O.RDWR | os.O.CREAT; + const mode: os.mode_t = 0o666; + const user_data: u64 = 0; + + // use registered file at index 0 (last param) + _ = try ring.openat_direct(user_data, tmp.dir.fd, path, flags, mode, 0); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + var cqe = try ring.copy_cqe(); + try testing.expectEqual(os.E.SUCCESS, cqe.err()); + try testing.expect(cqe.res == 0); + + // use registered file at index 1 + _ = try ring.openat_direct(user_data, tmp.dir.fd, path, flags, mode, 1); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + cqe = try ring.copy_cqe(); + try testing.expectEqual(os.E.SUCCESS, cqe.err()); + try testing.expect(cqe.res == 0); // res is 0 when we specify index + + // let kernel choose registered file index + _ = try ring.openat_direct(user_data, tmp.dir.fd, path, flags, mode, linux.IORING_FILE_INDEX_ALLOC); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + cqe = try ring.copy_cqe(); + try testing.expectEqual(os.E.SUCCESS, cqe.err()); + try testing.expect(cqe.res == 2); // chosen index is in res + + // close all open file descriptors + for (registered_fds, 0..) |_, fd_index| { + _ = try ring.close_direct(user_data, @intCast(fd_index)); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + var cqe_close = try ring.copy_cqe(); + try testing.expectEqual(os.E.SUCCESS, cqe_close.err()); + } + try ring.unregister_files(); +} + +/// For use in tests. Returns SkipZigTest is kernel version is less than required. +inline fn skipKernelLessThan(required: std.SemanticVersion) !void { + if (builtin.os.tag != .linux) return error.SkipZigTest; + + var uts: linux.utsname = undefined; + const res = linux.uname(&uts); + switch (linux.getErrno(res)) { + .SUCCESS => {}, + else => |errno| return os.unexpectedErrno(errno), + } + + const release = mem.sliceTo(&uts.release, 0); + var current = try std.SemanticVersion.parse(release); + current.pre = null; // don't check pre field + if (required.order(current) == .gt) return error.SkipZigTest; +}