Merge pull request #18025 from ianic/io_uring_send_zc
io_uring: add send_zc and *_direct operations
This commit is contained in:
@@ -491,6 +491,7 @@ pub const IO_Uring = struct {
|
||||
|
||||
/// Queues (but does not submit) an SQE to perform an `accept4(2)` on a socket.
|
||||
/// Returns a pointer to the SQE.
|
||||
/// Available since 5.5
|
||||
pub fn accept(
|
||||
self: *IO_Uring,
|
||||
user_data: u64,
|
||||
@@ -505,10 +506,14 @@ pub const IO_Uring = struct {
|
||||
return sqe;
|
||||
}
|
||||
|
||||
/// Queues (but does not submit) an SQE to perform an multishot `accept4(2)` on a socket.
|
||||
/// Queues a multishot accept on a socket.
|
||||
///
|
||||
/// Multishot variant allows an application to issue a single accept request,
|
||||
/// which will repeatedly trigger a CQE when a connection request comes in.
|
||||
/// Returns a pointer to the SQE.
|
||||
/// While IORING_CQE_F_MORE flag is set in CQE flags accept will generate
|
||||
/// further CQEs.
|
||||
///
|
||||
/// Available since 5.19
|
||||
pub fn accept_multishot(
|
||||
self: *IO_Uring,
|
||||
user_data: u64,
|
||||
@@ -523,6 +528,47 @@ pub const IO_Uring = struct {
|
||||
return sqe;
|
||||
}
|
||||
|
||||
/// Queues (but does not submit) an SQE to accept a connection into a direct
/// (registered) file descriptor.
///
/// A file table must have been registered first (with register_files). The
/// SQE is prepared with `linux.IORING_FILE_INDEX_ALLOC`, so an unused table
/// index is chosen dynamically and returned in the CQE res field.
///
/// The resulting direct descriptor is used by setting IOSQE_FIXED_FILE in the
/// SQE flags member and putting the direct descriptor value (instead of a
/// regular file descriptor) into the SQE fd field.
///
/// Returns a pointer to the SQE.
/// Available since 5.19
pub fn accept_direct(
    self: *IO_Uring,
    user_data: u64,
    fd: os.fd_t,
    addr: ?*os.sockaddr,
    addrlen: ?*os.socklen_t,
    flags: u32,
) !*linux.io_uring_sqe {
    const entry = try self.get_sqe();
    // The table slot is always left for the kernel to allocate.
    io_uring_prep_accept_direct(entry, fd, addr, addrlen, flags, linux.IORING_FILE_INDEX_ALLOC);
    // Set after the prep call, which initializes the entry.
    entry.user_data = user_data;
    return entry;
}
|
||||
|
||||
/// Queues (but does not submit) an SQE for a multishot accept that uses
/// direct (registered) file descriptors.
/// Returns a pointer to the SQE.
/// Available since 5.19
pub fn accept_multishot_direct(
    self: *IO_Uring,
    user_data: u64,
    fd: os.fd_t,
    addr: ?*os.sockaddr,
    addrlen: ?*os.socklen_t,
    flags: u32,
) !*linux.io_uring_sqe {
    const entry = try self.get_sqe();
    io_uring_prep_multishot_accept_direct(entry, fd, addr, addrlen, flags);
    // Set after the prep call, which initializes the entry.
    entry.user_data = user_data;
    return entry;
}
|
||||
|
||||
/// Queue (but does not submit) an SQE to perform a `connect(2)` on a socket.
|
||||
/// Returns a pointer to the SQE.
|
||||
pub fn connect(
|
||||
@@ -570,6 +616,7 @@ pub const IO_Uring = struct {
|
||||
|
||||
/// Queues (but does not submit) an SQE to perform a `recv(2)`.
|
||||
/// Returns a pointer to the SQE.
|
||||
/// Available since 5.6
|
||||
pub fn recv(
|
||||
self: *IO_Uring,
|
||||
user_data: u64,
|
||||
@@ -593,6 +640,7 @@ pub const IO_Uring = struct {
|
||||
|
||||
/// Queues (but does not submit) an SQE to perform a `send(2)`.
|
||||
/// Returns a pointer to the SQE.
|
||||
/// Available since 5.6
|
||||
pub fn send(
|
||||
self: *IO_Uring,
|
||||
user_data: u64,
|
||||
@@ -606,8 +654,56 @@ pub const IO_Uring = struct {
|
||||
return sqe;
|
||||
}
|
||||
|
||||
/// Queues (but does not submit) an SQE to perform an async zerocopy `send(2)`.
///
/// This operation will most likely produce two CQEs. The flags field of the
/// first CQE will usually contain IORING_CQE_F_MORE, which means a second CQE
/// (the notification) with the same user_data value will follow. The caller
/// must not modify the data buffer until that notification is posted.
///
/// The first CQE follows the usual rules: its res field contains the number
/// of bytes sent or a negative error code. The notification's res field is
/// set to zero and its flags field contains IORING_CQE_F_NOTIF.
///
/// The two step model is needed because the kernel may hold on to buffers for
/// a long time, e.g. while waiting for a TCP ACK. Notifications are
/// responsible for controlling the lifetime of the buffers. Even errored
/// requests may generate a notification.
///
/// Returns a pointer to the SQE.
/// Available since 6.0
pub fn send_zc(
    self: *IO_Uring,
    user_data: u64,
    fd: os.fd_t,
    buffer: []const u8,
    send_flags: u32,
    zc_flags: u16,
) !*linux.io_uring_sqe {
    const entry = try self.get_sqe();
    io_uring_prep_send_zc(entry, fd, buffer, send_flags, zc_flags);
    // Set after the prep call, which initializes the entry.
    entry.user_data = user_data;
    return entry;
}
|
||||
|
||||
/// Queues (but does not submit) an SQE to perform an async zerocopy `send(2)`
/// with the fixed-buffer flag (IORING_RECVSEND_FIXED_BUF) set and `buf_index`
/// stored in the SQE buf_index field.
/// Returns a pointer to the SQE.
/// Available since 6.0
pub fn send_zc_fixed(
    self: *IO_Uring,
    user_data: u64,
    fd: os.fd_t,
    buffer: []const u8,
    send_flags: u32,
    zc_flags: u16,
    buf_index: u16,
) !*linux.io_uring_sqe {
    const entry = try self.get_sqe();
    io_uring_prep_send_zc_fixed(entry, fd, buffer, send_flags, zc_flags, buf_index);
    // Set after the prep call, which initializes the entry.
    entry.user_data = user_data;
    return entry;
}
|
||||
|
||||
/// Queues (but does not submit) an SQE to perform a `recvmsg(2)`.
|
||||
/// Returns a pointer to the SQE.
|
||||
/// Available since 5.3
|
||||
pub fn recvmsg(
|
||||
self: *IO_Uring,
|
||||
user_data: u64,
|
||||
@@ -623,6 +719,7 @@ pub const IO_Uring = struct {
|
||||
|
||||
/// Queues (but does not submit) an SQE to perform a `sendmsg(2)`.
|
||||
/// Returns a pointer to the SQE.
|
||||
/// Available since 5.3
|
||||
pub fn sendmsg(
|
||||
self: *IO_Uring,
|
||||
user_data: u64,
|
||||
@@ -636,8 +733,25 @@ pub const IO_Uring = struct {
|
||||
return sqe;
|
||||
}
|
||||
|
||||
/// Queues (but does not submit) an SQE to perform an async zerocopy
/// `sendmsg(2)`.
/// Returns a pointer to the SQE.
/// Available since 6.1
pub fn sendmsg_zc(
    self: *IO_Uring,
    user_data: u64,
    fd: os.fd_t,
    msg: *const os.msghdr_const,
    flags: u32,
) !*linux.io_uring_sqe {
    const entry = try self.get_sqe();
    io_uring_prep_sendmsg_zc(entry, fd, msg, flags);
    // Set after the prep call, which initializes the entry.
    entry.user_data = user_data;
    return entry;
}
|
||||
|
||||
/// Queues (but does not submit) an SQE to perform an `openat(2)`.
|
||||
/// Returns a pointer to the SQE.
|
||||
/// Available since 5.6.
|
||||
pub fn openat(
|
||||
self: *IO_Uring,
|
||||
user_data: u64,
|
||||
@@ -652,8 +766,35 @@ pub const IO_Uring = struct {
|
||||
return sqe;
|
||||
}
|
||||
|
||||
/// Queues (but does not submit) an SQE to perform an `openat(2)` whose result
/// is stored in a direct (registered) file descriptor.
///
/// To use the openat direct variant, the application must first have
/// registered a file table (with register_files). `file_index` selects the
/// table slot to use. Passing `linux.IORING_FILE_INDEX_ALLOC` requests that
/// an unused table index be chosen dynamically; that index is then returned
/// in the CQE res field.
///
/// After creation, the direct descriptor can be used by setting
/// IOSQE_FIXED_FILE in the SQE flags member, and setting the SQE fd field to
/// the direct descriptor value rather than the regular file descriptor.
///
/// Returns a pointer to the SQE.
/// Available since 5.15
pub fn openat_direct(
    self: *IO_Uring,
    user_data: u64,
    fd: os.fd_t,
    path: [*:0]const u8,
    flags: u32,
    mode: os.mode_t,
    file_index: u32,
) !*linux.io_uring_sqe {
    const entry = try self.get_sqe();
    io_uring_prep_openat_direct(entry, fd, path, flags, mode, file_index);
    // Set after the prep call, which initializes the entry.
    entry.user_data = user_data;
    return entry;
}
|
||||
|
||||
/// Queues (but does not submit) an SQE to perform a `close(2)`.
|
||||
/// Returns a pointer to the SQE.
|
||||
/// Available since 5.6.
|
||||
pub fn close(self: *IO_Uring, user_data: u64, fd: os.fd_t) !*linux.io_uring_sqe {
|
||||
const sqe = try self.get_sqe();
|
||||
io_uring_prep_close(sqe, fd);
|
||||
@@ -661,6 +802,15 @@ pub const IO_Uring = struct {
|
||||
return sqe;
|
||||
}
|
||||
|
||||
/// Queues (but does not submit) an SQE to close the registered file
/// descriptor at table index `file_index`.
/// Returns a pointer to the SQE.
/// Available since 5.15
pub fn close_direct(self: *IO_Uring, user_data: u64, file_index: u32) !*linux.io_uring_sqe {
    const entry = try self.get_sqe();
    io_uring_prep_close_direct(entry, file_index);
    // Set after the prep call, which initializes the entry.
    entry.user_data = user_data;
    return entry;
}
|
||||
|
||||
/// Queues (but does not submit) an SQE to register a timeout operation.
|
||||
/// Returns a pointer to the SQE.
|
||||
///
|
||||
@@ -1109,6 +1259,57 @@ pub const IO_Uring = struct {
|
||||
else => |errno| return os.unexpectedErrno(errno),
|
||||
}
|
||||
}
|
||||
|
||||
/// Queues (but does not submit) an SQE for a socket creation request.
/// The new socket fd will be returned in the completion result.
/// Returns a pointer to the SQE.
/// Available since 5.19
pub fn socket(
    self: *IO_Uring,
    user_data: u64,
    domain: u32,
    socket_type: u32,
    protocol: u32,
    flags: u32,
) !*linux.io_uring_sqe {
    const entry = try self.get_sqe();
    io_uring_prep_socket(entry, domain, socket_type, protocol, flags);
    // Set after the prep call, which initializes the entry.
    entry.user_data = user_data;
    return entry;
}
|
||||
|
||||
/// Queues (but does not submit) an SQE for a socket creation request that
/// targets the registered file at index `file_index`.
/// Returns a pointer to the SQE.
/// Available since 5.19
pub fn socket_direct(
    self: *IO_Uring,
    user_data: u64,
    domain: u32,
    socket_type: u32,
    protocol: u32,
    flags: u32,
    file_index: u32,
) !*linux.io_uring_sqe {
    const entry = try self.get_sqe();
    io_uring_prep_socket_direct(entry, domain, socket_type, protocol, flags, file_index);
    // Set after the prep call, which initializes the entry.
    entry.user_data = user_data;
    return entry;
}
|
||||
|
||||
/// Queues (but does not submit) an SQE for a socket creation request that
/// targets a registered file whose index is chosen by the kernel (file index
/// alloc). The chosen file index will be returned in the CQE res field.
/// Returns a pointer to the SQE.
/// Available since 5.19
pub fn socket_direct_alloc(
    self: *IO_Uring,
    user_data: u64,
    domain: u32,
    socket_type: u32,
    protocol: u32,
    flags: u32,
) !*linux.io_uring_sqe {
    const entry = try self.get_sqe();
    io_uring_prep_socket_direct_alloc(entry, domain, socket_type, protocol, flags);
    // Set after the prep call, which initializes the entry.
    entry.user_data = user_data;
    return entry;
}
|
||||
};
|
||||
|
||||
pub const SubmissionQueue = struct {
|
||||
@@ -1343,6 +1544,41 @@ pub fn io_uring_prep_accept(
|
||||
sqe.rw_flags = flags;
|
||||
}
|
||||
|
||||
/// Prepares `sqe` as an accept (see `io_uring_prep_accept`) and then records
/// `file_index` as the target slot in the registered file table.
/// `linux.IORING_FILE_INDEX_ALLOC` is stored unchanged; any other index is
/// stored in its "index + 1" encoding.
pub fn io_uring_prep_accept_direct(
    sqe: *linux.io_uring_sqe,
    fd: os.fd_t,
    addr: ?*os.sockaddr,
    addrlen: ?*os.socklen_t,
    flags: u32,
    file_index: u32,
) void {
    // Base accept first, then the fixed-file target on top of it.
    io_uring_prep_accept(sqe, fd, addr, addrlen, flags);
    __io_uring_set_target_fixed_file(sqe, file_index);
}
|
||||
|
||||
/// Prepares `sqe` as a multishot accept (see `io_uring_prep_multishot_accept`)
/// and sets the fixed-file target to `linux.IORING_FILE_INDEX_ALLOC`.
pub fn io_uring_prep_multishot_accept_direct(
    sqe: *linux.io_uring_sqe,
    fd: os.fd_t,
    addr: ?*os.sockaddr,
    addrlen: ?*os.socklen_t,
    flags: u32,
) void {
    io_uring_prep_multishot_accept(sqe, fd, addr, addrlen, flags);
    // No caller-chosen slot here: the allocation sentinel is always used.
    __io_uring_set_target_fixed_file(sqe, linux.IORING_FILE_INDEX_ALLOC);
}
|
||||
|
||||
/// Stores the target fixed-file index for a `*_direct` operation in `sqe`.
///
/// `linux.IORING_FILE_INDEX_ALLOC` is written through unchanged. Every other
/// index is written as "index + 1", because 0 means "no fixed files".
fn __io_uring_set_target_fixed_file(sqe: *linux.io_uring_sqe, file_index: u32) void {
    const encoded: u32 = switch (file_index) {
        linux.IORING_FILE_INDEX_ALLOC => file_index,
        // 0 means no fixed files, indexes should be encoded as "index + 1"
        else => file_index + 1,
    };
    // This field is overloaded in liburing:
    //   splice_fd_in: i32
    //   sqe_file_index: u32
    sqe.splice_fd_in = @bitCast(encoded);
}
|
||||
|
||||
pub fn io_uring_prep_connect(
|
||||
sqe: *linux.io_uring_sqe,
|
||||
fd: os.fd_t,
|
||||
@@ -1373,6 +1609,28 @@ pub fn io_uring_prep_send(sqe: *linux.io_uring_sqe, fd: os.fd_t, buffer: []const
|
||||
sqe.rw_flags = flags;
|
||||
}
|
||||
|
||||
/// Prepares `sqe` as a SEND_ZC operation on `fd` for the bytes in `buffer`.
/// `flags` is stored in `sqe.rw_flags` and `zc_flags` in `sqe.ioprio`.
pub fn io_uring_prep_send_zc(sqe: *linux.io_uring_sqe, fd: os.fd_t, buffer: []const u8, flags: u32, zc_flags: u16) void {
    const address = @intFromPtr(buffer.ptr);
    io_uring_prep_rw(.SEND_ZC, sqe, fd, address, buffer.len, 0);
    // These two stores are independent of each other.
    sqe.ioprio = zc_flags;
    sqe.rw_flags = flags;
}
|
||||
|
||||
/// Prepares `sqe` like `io_uring_prep_send_zc`, then additionally sets
/// `linux.IORING_RECVSEND_FIXED_BUF` in `sqe.ioprio` and stores `buf_index`
/// in `sqe.buf_index`.
pub fn io_uring_prep_send_zc_fixed(sqe: *linux.io_uring_sqe, fd: os.fd_t, buffer: []const u8, flags: u32, zc_flags: u16, buf_index: u16) void {
    io_uring_prep_send_zc(sqe, fd, buffer, flags, zc_flags);
    sqe.buf_index = buf_index;
    // OR into ioprio so the zc_flags written by the call above are kept.
    sqe.ioprio |= linux.IORING_RECVSEND_FIXED_BUF;
}
|
||||
|
||||
/// Prepares `sqe` exactly like `io_uring_prep_sendmsg` and then switches the
/// opcode to SENDMSG_ZC.
pub fn io_uring_prep_sendmsg_zc(
    sqe: *linux.io_uring_sqe,
    fd: os.fd_t,
    msg: *const os.msghdr_const,
    flags: u32,
) void {
    io_uring_prep_sendmsg(sqe, fd, msg, flags);
    // Only the opcode differs from a regular sendmsg preparation.
    sqe.opcode = .SENDMSG_ZC;
}
|
||||
|
||||
pub fn io_uring_prep_recvmsg(
|
||||
sqe: *linux.io_uring_sqe,
|
||||
fd: os.fd_t,
|
||||
@@ -1404,6 +1662,18 @@ pub fn io_uring_prep_openat(
|
||||
sqe.rw_flags = flags;
|
||||
}
|
||||
|
||||
/// Prepares `sqe` as an openat (see `io_uring_prep_openat`) and then records
/// `file_index` as the target slot in the registered file table.
/// `linux.IORING_FILE_INDEX_ALLOC` is stored unchanged; any other index is
/// stored in its "index + 1" encoding.
pub fn io_uring_prep_openat_direct(
    sqe: *linux.io_uring_sqe,
    fd: os.fd_t,
    path: [*:0]const u8,
    flags: u32,
    mode: os.mode_t,
    file_index: u32,
) void {
    // Base openat first, then the fixed-file target on top of it.
    io_uring_prep_openat(sqe, fd, path, flags, mode);
    __io_uring_set_target_fixed_file(sqe, file_index);
}
|
||||
|
||||
pub fn io_uring_prep_close(sqe: *linux.io_uring_sqe, fd: os.fd_t) void {
|
||||
sqe.* = .{
|
||||
.opcode = .CLOSE,
|
||||
@@ -1423,6 +1693,11 @@ pub fn io_uring_prep_close(sqe: *linux.io_uring_sqe, fd: os.fd_t) void {
|
||||
};
|
||||
}
|
||||
|
||||
/// Prepares `sqe` as a close with fd 0 (see `io_uring_prep_close`) and then
/// records `file_index` as the fixed-file target of the operation.
pub fn io_uring_prep_close_direct(sqe: *linux.io_uring_sqe, file_index: u32) void {
    // The fd argument is always 0 here; the registered file is selected by
    // the encoded file index written below.
    io_uring_prep_close(sqe, 0);
    __io_uring_set_target_fixed_file(sqe, file_index);
}
|
||||
|
||||
pub fn io_uring_prep_timeout(
|
||||
sqe: *linux.io_uring_sqe,
|
||||
ts: *const os.linux.kernel_timespec,
|
||||
@@ -1650,6 +1925,40 @@ pub fn io_uring_prep_multishot_accept(
|
||||
sqe.ioprio |= linux.IORING_ACCEPT_MULTISHOT;
|
||||
}
|
||||
|
||||
/// Prepares `sqe` as a SOCKET operation.
/// `domain`, `protocol` and `socket_type` are forwarded to `io_uring_prep_rw`
/// (with `domain` cast to the fd argument type); `flags` is stored in
/// `sqe.rw_flags`.
pub fn io_uring_prep_socket(
    sqe: *linux.io_uring_sqe,
    domain: u32,
    socket_type: u32,
    protocol: u32,
    flags: u32,
) void {
    // Argument order: op, sqe, domain, 0, protocol, socket_type.
    io_uring_prep_rw(.SOCKET, sqe, @intCast(domain), 0, protocol, socket_type);
    sqe.rw_flags = flags;
}
|
||||
|
||||
/// Prepares `sqe` as a SOCKET operation (see `io_uring_prep_socket`) and then
/// records `file_index` as the target slot in the registered file table.
/// `linux.IORING_FILE_INDEX_ALLOC` is stored unchanged; any other index is
/// stored in its "index + 1" encoding.
pub fn io_uring_prep_socket_direct(
    sqe: *linux.io_uring_sqe,
    domain: u32,
    socket_type: u32,
    protocol: u32,
    flags: u32,
    file_index: u32,
) void {
    // Base socket request first, then the fixed-file target on top of it.
    io_uring_prep_socket(sqe, domain, socket_type, protocol, flags);
    __io_uring_set_target_fixed_file(sqe, file_index);
}
|
||||
|
||||
/// Prepares `sqe` as a SOCKET operation (see `io_uring_prep_socket`) and sets
/// the fixed-file target to `linux.IORING_FILE_INDEX_ALLOC`.
pub fn io_uring_prep_socket_direct_alloc(
    sqe: *linux.io_uring_sqe,
    domain: u32,
    socket_type: u32,
    protocol: u32,
    flags: u32,
) void {
    io_uring_prep_socket(sqe, domain, socket_type, protocol, flags);
    // No caller-chosen slot here: the allocation sentinel is always used.
    __io_uring_set_target_fixed_file(sqe, linux.IORING_FILE_INDEX_ALLOC);
}
|
||||
|
||||
test "structs/offsets/entries" {
|
||||
if (builtin.os.tag != .linux) return error.SkipZigTest;
|
||||
|
||||
@@ -3492,3 +3801,363 @@ test "accept multishot" {
|
||||
os.closeSocket(client);
|
||||
}
|
||||
}
|
||||
|
||||
test "accept/connect/send_zc/recv" {
    try skipKernelLessThan(.{ .major = 6, .minor = 0, .patch = 0 });

    var ring = IO_Uring.init(16, 0) catch |err| switch (err) {
        error.SystemOutdated, error.PermissionDenied => return error.SkipZigTest,
        else => return err,
    };
    defer ring.deinit();

    const harness = try createSocketTestHarness(&ring);
    defer harness.close();

    const buffer_send = [_]u8{ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0xa, 0xb, 0xc, 0xd, 0xe };
    var buffer_recv = [_]u8{0} ** 10;

    // Queue a zero-copy send linked to a recv and submit both together.
    const send_sqe = try ring.send_zc(0xeeeeeeee, harness.client, buffer_send[0..], 0, 0);
    send_sqe.flags |= linux.IOSQE_IO_LINK;
    _ = try ring.recv(0xffffffff, harness.server, .{ .buffer = buffer_recv[0..] }, 0);
    try testing.expectEqual(@as(u32, 2), try ring.submit());

    // First completion of the zero-copy send.
    // IORING_CQE_F_MORE means a second completion (the notification) with the
    // same user_data will follow; buffer_send must be kept alive until then.
    const cqe_send = try ring.copy_cqe();
    try testing.expectEqual(linux.io_uring_cqe{
        .user_data = 0xeeeeeeee,
        .res = buffer_send.len,
        .flags = linux.IORING_CQE_F_MORE,
    }, cqe_send);

    // Completion of the recv; it fills all of buffer_recv.
    const cqe_recv = try ring.copy_cqe();
    try testing.expectEqual(linux.io_uring_cqe{
        .user_data = 0xffffffff,
        .res = buffer_recv.len,
        .flags = cqe_recv.flags & linux.IORING_CQE_F_SOCK_NONEMPTY,
    }, cqe_recv);

    try testing.expectEqualSlices(u8, buffer_send[0..buffer_recv.len], buffer_recv[0..]);

    // Second completion of the zero-copy send.
    // IORING_CQE_F_NOTIF signals that the kernel is done with buffer_send.
    const cqe_notif = try ring.copy_cqe();
    try testing.expectEqual(linux.io_uring_cqe{
        .user_data = 0xeeeeeeee,
        .res = 0,
        .flags = linux.IORING_CQE_F_NOTIF,
    }, cqe_notif);
}
|
||||
|
||||
test "accept_direct" {
    try skipKernelLessThan(.{ .major = 5, .minor = 19, .patch = 0 });

    var ring = IO_Uring.init(1, 0) catch |err| switch (err) {
        error.SystemOutdated => return error.SkipZigTest,
        error.PermissionDenied => return error.SkipZigTest,
        else => return err,
    };
    defer ring.deinit();
    var address = try net.Address.parseIp4("127.0.0.1", 0);

    // register direct file descriptors
    var registered_fds = [_]os.fd_t{-1} ** 2;
    try ring.register_files(registered_fds[0..]);

    const listener_socket = try createListenerSocket(&address);
    defer os.closeSocket(listener_socket);

    const accept_userdata: u64 = 0xaaaaaaaa;
    const read_userdata: u64 = 0xbbbbbbbb;
    const data = [_]u8{ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0xa, 0xb, 0xc, 0xd, 0xe };

    // Two rounds: the second round checks that the slots can be reused after
    // they were handed back with register_files_update.
    for (0..2) |_| {
        for (registered_fds, 0..) |_, i| {
            var buffer_recv = [_]u8{0} ** 16;
            const buffer_send: []const u8 = data[0 .. data.len - i]; // make it different at each loop

            // submit accept, will choose registered fd and return index in cqe
            _ = try ring.accept_direct(accept_userdata, listener_socket, null, null, 0);
            try testing.expectEqual(@as(u32, 1), try ring.submit());

            // connect
            const client = try os.socket(address.any.family, os.SOCK.STREAM | os.SOCK.CLOEXEC, 0);
            // Register the cleanup before connect so a failed connect does
            // not leak the client socket.
            defer os.closeSocket(client);
            try os.connect(client, &address.any, address.getOsSockLen());

            // accept completion
            const cqe_accept = try ring.copy_cqe();
            try testing.expectEqual(os.E.SUCCESS, cqe_accept.err());
            const fd_index = cqe_accept.res;
            try testing.expect(fd_index < registered_fds.len);
            try testing.expect(cqe_accept.user_data == accept_userdata);

            // send data
            _ = try os.send(client, buffer_send, 0);

            // Example of how to use registered fd:
            // Submit receive to fixed file returned by accept (fd_index).
            // Fd field is set to registered file index, returned by accept.
            // Flag linux.IOSQE_FIXED_FILE must be set.
            const recv_sqe = try ring.recv(read_userdata, fd_index, .{ .buffer = &buffer_recv }, 0);
            recv_sqe.flags |= linux.IOSQE_FIXED_FILE;
            try testing.expectEqual(@as(u32, 1), try ring.submit());

            // accept receive
            const recv_cqe = try ring.copy_cqe();
            try testing.expect(recv_cqe.user_data == read_userdata);
            try testing.expect(recv_cqe.res == buffer_send.len);
            try testing.expectEqualSlices(u8, buffer_send, buffer_recv[0..buffer_send.len]);
        }
        // no more available fds, accept will get NFILE error
        {
            // submit accept
            _ = try ring.accept_direct(accept_userdata, listener_socket, null, null, 0);
            try testing.expectEqual(@as(u32, 1), try ring.submit());
            // connect
            const client = try os.socket(address.any.family, os.SOCK.STREAM | os.SOCK.CLOEXEC, 0);
            defer os.closeSocket(client);
            try os.connect(client, &address.any, address.getOsSockLen());
            // completion with error
            const cqe_accept = try ring.copy_cqe();
            try testing.expect(cqe_accept.user_data == accept_userdata);
            try testing.expectEqual(os.E.NFILE, cqe_accept.err());
        }
        // return file descriptors to kernel
        try ring.register_files_update(0, registered_fds[0..]);
    }
    try ring.unregister_files();
}
|
||||
|
||||
test "accept_multishot_direct" {
    try skipKernelLessThan(.{ .major = 5, .minor = 19, .patch = 0 });

    var ring = IO_Uring.init(1, 0) catch |err| switch (err) {
        error.SystemOutdated => return error.SkipZigTest,
        error.PermissionDenied => return error.SkipZigTest,
        else => return err,
    };
    defer ring.deinit();

    var address = try net.Address.parseIp4("127.0.0.1", 0);

    var registered_fds = [_]os.fd_t{-1} ** 2;
    try ring.register_files(registered_fds[0..]);

    const listener_socket = try createListenerSocket(&address);
    defer os.closeSocket(listener_socket);

    const accept_userdata: u64 = 0xaaaaaaaa;

    // Two rounds: the second round checks that the slots can be reused after
    // they were handed back with register_files_update.
    for (0..2) |_| {
        // submit multishot accept
        // Will choose registered fd and return index of the selected registered file in cqe.
        _ = try ring.accept_multishot_direct(accept_userdata, listener_socket, null, null, 0);
        try testing.expectEqual(@as(u32, 1), try ring.submit());

        for (registered_fds) |_| {
            // connect
            const client = try os.socket(address.any.family, os.SOCK.STREAM | os.SOCK.CLOEXEC, 0);
            // Register the cleanup before connect so a failed connect does
            // not leak the client socket.
            defer os.closeSocket(client);
            try os.connect(client, &address.any, address.getOsSockLen());

            // accept completion
            const cqe_accept = try ring.copy_cqe();
            // A negative res would trivially satisfy the `< len` check below,
            // so make sure the accept actually succeeded first.
            try testing.expectEqual(os.E.SUCCESS, cqe_accept.err());
            const fd_index = cqe_accept.res;
            try testing.expect(fd_index < registered_fds.len);
            try testing.expect(cqe_accept.user_data == accept_userdata);
            try testing.expect(cqe_accept.flags & linux.IORING_CQE_F_MORE > 0); // has more is set
        }
        // No more available fds, accept will get NFILE error.
        // Multishot is terminated (more flag is not set).
        {
            // connect
            const client = try os.socket(address.any.family, os.SOCK.STREAM | os.SOCK.CLOEXEC, 0);
            defer os.closeSocket(client);
            try os.connect(client, &address.any, address.getOsSockLen());
            // completion with error
            const cqe_accept = try ring.copy_cqe();
            try testing.expect(cqe_accept.user_data == accept_userdata);
            try testing.expectEqual(os.E.NFILE, cqe_accept.err());
            try testing.expect(cqe_accept.flags & linux.IORING_CQE_F_MORE == 0); // has more is not set
        }
        // return file descriptors to kernel
        try ring.register_files_update(0, registered_fds[0..]);
    }
    try ring.unregister_files();
}
|
||||
|
||||
test "socket" {
    try skipKernelLessThan(.{ .major = 5, .minor = 19, .patch = 0 });

    var ring = IO_Uring.init(1, 0) catch |err| switch (err) {
        error.SystemOutdated => return error.SkipZigTest,
        error.PermissionDenied => return error.SkipZigTest,
        else => return err,
    };
    defer ring.deinit();

    // prepare, submit socket operation
    _ = try ring.socket(0, linux.AF.INET, os.SOCK.STREAM, 0, 0);
    try testing.expectEqual(@as(u32, 1), try ring.submit());

    // test completion
    const cqe = try ring.copy_cqe();
    try testing.expectEqual(os.E.SUCCESS, cqe.err());
    const fd: os.fd_t = @intCast(cqe.res);
    // Deferred so the descriptor is released even if the expectation below fails.
    defer os.close(fd);
    try testing.expect(fd > 2);
}
|
||||
|
||||
test "socket_direct/socket_direct_alloc/close_direct" {
    try skipKernelLessThan(.{ .major = 5, .minor = 19, .patch = 0 });

    var ring = IO_Uring.init(2, 0) catch |err| switch (err) {
        error.SystemOutdated => return error.SkipZigTest,
        error.PermissionDenied => return error.SkipZigTest,
        else => return err,
    };
    defer ring.deinit();

    var registered_fds = [_]os.fd_t{-1} ** 3;
    try ring.register_files(registered_fds[0..]);

    // create socket in registered file descriptor at index 0 (last param)
    _ = try ring.socket_direct(0, linux.AF.INET, os.SOCK.STREAM, 0, 0, 0);
    try testing.expectEqual(@as(u32, 1), try ring.submit());
    var cqe_socket = try ring.copy_cqe();
    try testing.expectEqual(os.E.SUCCESS, cqe_socket.err());
    try testing.expect(cqe_socket.res == 0);

    // create socket in registered file descriptor at index 1 (last param)
    _ = try ring.socket_direct(0, linux.AF.INET, os.SOCK.STREAM, 0, 0, 1);
    try testing.expectEqual(@as(u32, 1), try ring.submit());
    cqe_socket = try ring.copy_cqe();
    try testing.expectEqual(os.E.SUCCESS, cqe_socket.err());
    try testing.expect(cqe_socket.res == 0); // res is 0 when index is specified

    // create socket in kernel chosen file descriptor index (_alloc version)
    // completion res has index from registered files
    _ = try ring.socket_direct_alloc(0, linux.AF.INET, os.SOCK.STREAM, 0, 0);
    try testing.expectEqual(@as(u32, 1), try ring.submit());
    cqe_socket = try ring.copy_cqe();
    try testing.expectEqual(os.E.SUCCESS, cqe_socket.err());
    try testing.expect(cqe_socket.res == 2); // returns registered file index

    // use sockets from registered_fds in connect operation
    var address = try net.Address.parseIp4("127.0.0.1", 0);
    const listener_socket = try createListenerSocket(&address);
    defer os.closeSocket(listener_socket);
    const accept_userdata: u64 = 0xaaaaaaaa;
    const connect_userdata: u64 = 0xbbbbbbbb;
    const close_userdata: u64 = 0xcccccccc;
    for (registered_fds, 0..) |_, fd_index| {
        // prepare accept
        _ = try ring.accept(accept_userdata, listener_socket, null, null, 0);
        // prepare connect with fixed socket
        const connect_sqe = try ring.connect(connect_userdata, @intCast(fd_index), &address.any, address.getOsSockLen());
        connect_sqe.flags |= linux.IOSQE_FIXED_FILE; // fd is fixed file index
        // submit both
        try testing.expectEqual(@as(u32, 2), try ring.submit());
        // get completions
        var cqe_connect = try ring.copy_cqe();
        var cqe_accept = try ring.copy_cqe();
        // ignore order: the two completions may arrive swapped
        if (cqe_connect.user_data == accept_userdata and cqe_accept.user_data == connect_userdata) {
            const a = cqe_accept;
            const b = cqe_connect;
            cqe_accept = b;
            cqe_connect = a;
        }
        // test connect completion
        try testing.expect(cqe_connect.user_data == connect_userdata);
        try testing.expectEqual(os.E.SUCCESS, cqe_connect.err());
        // test accept completion
        try testing.expect(cqe_accept.user_data == accept_userdata);
        try testing.expectEqual(os.E.SUCCESS, cqe_accept.err());

        // submit and test close_direct
        _ = try ring.close_direct(close_userdata, @intCast(fd_index));
        try testing.expectEqual(@as(u32, 1), try ring.submit());
        // Never reassigned, so this is a const.
        const cqe_close = try ring.copy_cqe();
        try testing.expect(cqe_close.user_data == close_userdata);
        try testing.expectEqual(os.E.SUCCESS, cqe_close.err());
    }

    try ring.unregister_files();
}
|
||||
|
||||
test "openat_direct/close_direct" {
    try skipKernelLessThan(.{ .major = 5, .minor = 19, .patch = 0 });

    var ring = IO_Uring.init(2, 0) catch |err| switch (err) {
        error.SystemOutdated => return error.SkipZigTest,
        error.PermissionDenied => return error.SkipZigTest,
        else => return err,
    };
    defer ring.deinit();

    var registered_fds = [_]os.fd_t{-1} ** 3;
    try ring.register_files(registered_fds[0..]);

    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();
    const path = "test_io_uring_close_direct";
    const flags: u32 = os.O.RDWR | os.O.CREAT;
    const mode: os.mode_t = 0o666;
    const user_data: u64 = 0;

    // use registered file at index 0 (last param)
    _ = try ring.openat_direct(user_data, tmp.dir.fd, path, flags, mode, 0);
    try testing.expectEqual(@as(u32, 1), try ring.submit());
    var cqe = try ring.copy_cqe();
    try testing.expectEqual(os.E.SUCCESS, cqe.err());
    try testing.expect(cqe.res == 0);

    // use registered file at index 1
    _ = try ring.openat_direct(user_data, tmp.dir.fd, path, flags, mode, 1);
    try testing.expectEqual(@as(u32, 1), try ring.submit());
    cqe = try ring.copy_cqe();
    try testing.expectEqual(os.E.SUCCESS, cqe.err());
    try testing.expect(cqe.res == 0); // res is 0 when we specify index

    // let kernel choose registered file index
    _ = try ring.openat_direct(user_data, tmp.dir.fd, path, flags, mode, linux.IORING_FILE_INDEX_ALLOC);
    try testing.expectEqual(@as(u32, 1), try ring.submit());
    cqe = try ring.copy_cqe();
    try testing.expectEqual(os.E.SUCCESS, cqe.err());
    try testing.expect(cqe.res == 2); // chosen index is in res

    // close all open file descriptors
    for (registered_fds, 0..) |_, fd_index| {
        _ = try ring.close_direct(user_data, @intCast(fd_index));
        try testing.expectEqual(@as(u32, 1), try ring.submit());
        // Never reassigned, so this is a const.
        const cqe_close = try ring.copy_cqe();
        try testing.expectEqual(os.E.SUCCESS, cqe_close.err());
    }
    try ring.unregister_files();
}
|
||||
|
||||
/// For use in tests. Returns `error.SkipZigTest` if the running kernel version
/// is less than `required` (or if the target OS is not Linux).
///
/// Only the leading part of the `uname` release string is compared.
/// Everything from the first '-' or '+' onwards is ignored, so the pre-release
/// and build fields of the current version never take part in the comparison.
inline fn skipKernelLessThan(required: std.SemanticVersion) !void {
    if (builtin.os.tag != .linux) return error.SkipZigTest;

    var uts: linux.utsname = undefined;
    const res = linux.uname(&uts);
    switch (linux.getErrno(res)) {
        .SUCCESS => {},
        else => |errno| return os.unexpectedErrno(errno),
    }

    const release = mem.sliceTo(&uts.release, 0);
    // Kernel release strings are not guaranteed to be semver compliant
    // (for example "6.8.9-300.fc40.x86_64"), so strip any suffix before
    // parsing instead of failing the test with a parse error.
    const suffix_start = mem.indexOfAny(u8, release, "-+") orelse release.len;
    const current = try std.SemanticVersion.parse(release[0..suffix_start]);
    if (required.order(current) == .gt) return error.SkipZigTest;
}
|
||||
|
||||
Reference in New Issue
Block a user