diff --git a/lib/std/os/linux.zig b/lib/std/os/linux.zig
index 9e05cced8a..3892e4326a 100644
--- a/lib/std/os/linux.zig
+++ b/lib/std/os/linux.zig
@@ -4387,6 +4387,16 @@ pub const io_uring_cqe = extern struct {
         }
         return .SUCCESS;
     }
+
+    // On successful completion of a provided-buffers IO request, the CQE flags field
+    // will have IORING_CQE_F_BUFFER set and the selected buffer ID will be indicated by
+    // the upper 16 bits of the flags field.
+    pub fn buffer_id(self: io_uring_cqe) !u16 {
+        if (self.flags & IORING_CQE_F_BUFFER != IORING_CQE_F_BUFFER) {
+            return error.NoBufferSelected;
+        }
+        return @as(u16, @intCast(self.flags >> IORING_CQE_BUFFER_SHIFT));
+    }
 };
 
 // io_uring_cqe.flags
@@ -4667,8 +4677,12 @@ pub const io_uring_buf = extern struct {
     resv: u16,
 };
 
-// io_uring_buf_ring struct omitted
-// it's a io_uring_buf array with the resv of the first item used as a "tail" field.
+/// Overlays the first `io_uring_buf` of a ring mapped buffer ring; the `resv`
+/// field of that first entry is repurposed as the ring "tail".
+pub const io_uring_buf_ring = extern struct {
+    resv1: u64,
+    resv2: u32,
+    resv3: u16,
+    tail: u16,
+};
 
 /// argument for IORING_(UN)REGISTER_PBUF_RING
 pub const io_uring_buf_reg = extern struct {
diff --git a/lib/std/os/linux/IoUring.zig b/lib/std/os/linux/IoUring.zig
index ecdc9ae138..9e9957c697 100644
--- a/lib/std/os/linux/IoUring.zig
+++ b/lib/std/os/linux/IoUring.zig
@@ -1,5 +1,5 @@
 const IoUring = @This();
-const std = @import("../../std.zig");
+const std = @import("std");
 const builtin = @import("builtin");
 const assert = std.debug.assert;
 const mem = std.mem;
@@ -1440,6 +1440,229 @@ pub const CompletionQueue = struct {
     }
 };
 
+/// Group of application-provided buffers. Uses the newer type, called ring
+/// mapped buffers, supported since kernel 5.19. Buffers are identified by a
+/// buffer group ID, and within that group, a buffer ID. An IoUring can have
+/// multiple buffer groups, each with a unique group ID.
+///
+/// In `init` the application provides a contiguous block of memory `buffers`
+/// for `buffers_count` buffers of size `buffer_size`. The application can then
+/// submit a `recv` operation without providing a buffer upfront. Once the
+/// operation is ready to receive data, a buffer is picked automatically and the
+/// resulting CQE will contain the buffer ID in `cqe.buffer_id()`. Use the `get`
+/// method to obtain the buffer identified by a CQE. Once the application has
+/// processed the buffer, it may hand ownership back to the kernel by calling
+/// `put`, allowing the cycle to repeat.
+///
+/// Depending on the rate of arrival of data, it is possible that a given buffer
+/// group will run out of buffers before those in CQEs can be put back to the
+/// kernel. If this happens, `cqe.err()` will have ENOBUFS as the error value.
+///
+/// An illustrative usage sketch follows this struct definition.
+pub const BufferGroup = struct {
+    /// Parent ring for which this group is registered.
+    ring: *IoUring,
+    /// Pointer to the memory shared by the kernel.
+    /// `buffers_count` of `io_uring_buf` structures are shared by the kernel.
+    /// The first `io_uring_buf` is overlaid by the `io_uring_buf_ring` struct.
+    br: *align(mem.page_size) linux.io_uring_buf_ring,
+    /// Contiguous block of memory of size (buffers_count * buffer_size).
+    buffers: []u8,
+    /// Size of each buffer in `buffers`.
+    buffer_size: u32,
+    /// Number of buffers in `buffers`, number of `io_uring_buf` structures in `br`.
+    buffers_count: u16,
+    /// ID of this group, must be unique in the ring.
+    group_id: u16,
+
+    pub fn init(
+        ring: *IoUring,
+        group_id: u16,
+        buffers: []u8,
+        buffer_size: u32,
+        buffers_count: u16,
+    ) !BufferGroup {
+        assert(buffers.len == buffers_count * buffer_size);
+
+        const br = try setup_buf_ring(ring.fd, buffers_count, group_id);
+        buf_ring_init(br);
+
+        const mask = buf_ring_mask(buffers_count);
+        var i: u16 = 0;
+        while (i < buffers_count) : (i += 1) {
+            const start = buffer_size * i;
+            const buf = buffers[start .. start + buffer_size];
+            buf_ring_add(br, buf, i, mask, i);
+        }
+        buf_ring_advance(br, buffers_count);
+
+        return BufferGroup{
+            .ring = ring,
+            .group_id = group_id,
+            .br = br,
+            .buffers = buffers,
+            .buffer_size = buffer_size,
+            .buffers_count = buffers_count,
+        };
+    }
+
+    // Prepare a recv operation which will select a buffer from this group.
+    pub fn recv(self: *BufferGroup, user_data: u64, fd: os.fd_t, flags: u32) !*linux.io_uring_sqe {
+        var sqe = try self.ring.get_sqe();
+        sqe.prep_rw(.RECV, fd, 0, 0, 0);
+        sqe.rw_flags = flags;
+        sqe.flags |= linux.IOSQE_BUFFER_SELECT;
+        sqe.buf_index = self.group_id;
+        sqe.user_data = user_data;
+        return sqe;
+    }
+
+    // Prepare a multishot recv operation which will select a buffer from this group.
+    pub fn recv_multishot(self: *BufferGroup, user_data: u64, fd: os.fd_t, flags: u32) !*linux.io_uring_sqe {
+        var sqe = try self.recv(user_data, fd, flags);
+        sqe.ioprio |= linux.IORING_RECV_MULTISHOT;
+        return sqe;
+    }
+
+    // Get buffer by id.
+    pub fn get(self: *BufferGroup, buffer_id: u16) []u8 {
+        const head = self.buffer_size * buffer_id;
+        return self.buffers[head .. head + self.buffer_size];
+    }
+
+    // Get buffer by CQE.
+    pub fn get_cqe(self: *BufferGroup, cqe: linux.io_uring_cqe) ![]u8 {
+        const buffer_id = try cqe.buffer_id();
+        const used_len = @as(usize, @intCast(cqe.res));
+        return self.get(buffer_id)[0..used_len];
+    }
+
+    // Release buffer to the kernel.
+    pub fn put(self: *BufferGroup, buffer_id: u16) void {
+        const mask = buf_ring_mask(self.buffers_count);
+        const buffer = self.get(buffer_id);
+        buf_ring_add(self.br, buffer, buffer_id, mask, 0);
+        buf_ring_advance(self.br, 1);
+    }
+
+    // Release buffer from CQE to the kernel.
+    pub fn put_cqe(self: *BufferGroup, cqe: linux.io_uring_cqe) !void {
+        self.put(try cqe.buffer_id());
+    }
+
+    pub fn deinit(self: *BufferGroup) void {
+        free_buf_ring(self.ring.fd, self.br, self.buffers_count, self.group_id);
+    }
+};
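+
+// Illustrative only: a minimal sketch of the intended `BufferGroup` flow
+// (init -> recv -> copy_cqe -> get_cqe -> put_cqe). The identifiers `ring`,
+// `allocator`, `socket_fd` and `user_data`, and the literal sizes, are
+// assumptions made for this sketch; see `test BufferGroup` below for a
+// complete, runnable version.
+//
+//     const buffers = try allocator.alloc(u8, 16 * 4096);
+//     defer allocator.free(buffers);
+//     var group = try BufferGroup.init(&ring, 1, buffers, 4096, 16);
+//     defer group.deinit();
+//
+//     _ = try group.recv(user_data, socket_fd, 0);
+//     _ = try ring.submit();
+//     const cqe = try ring.copy_cqe();
+//     if (cqe.err() == .NOBUFS) {
+//         // All buffers are in flight; `put` some back before retrying.
+//     } else {
+//         const data = try group.get_cqe(cqe); // received bytes in the picked buffer
+//         // ... process data ...
+//         try group.put_cqe(cqe); // hand the buffer back to the kernel
+//     }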
+
+/// Registers a shared buffer ring to be used with provided buffers.
+/// `entries` `io_uring_buf` structures are memory mapped and shared with the kernel.
+/// `fd` is the IoUring fd for which the provided buffer ring is being registered.
+/// `entries` is the number of entries requested in the buffer ring; must be a power of 2.
+/// `group_id` is the chosen buffer group ID, unique within the ring.
+pub fn setup_buf_ring(fd: os.fd_t, entries: u16, group_id: u16) !*align(mem.page_size) linux.io_uring_buf_ring {
+    if (entries == 0 or entries > 1 << 15) return error.EntriesNotInRange;
+    if (!std.math.isPowerOfTwo(entries)) return error.EntriesNotPowerOfTwo;
+
+    const mmap_size = entries * @sizeOf(linux.io_uring_buf);
+    const mmap = try os.mmap(
+        null,
+        mmap_size,
+        os.PROT.READ | os.PROT.WRITE,
+        .{ .TYPE = .PRIVATE, .ANONYMOUS = true },
+        -1,
+        0,
+    );
+    errdefer os.munmap(mmap);
+    assert(mmap.len == mmap_size);
+
+    const br: *align(mem.page_size) linux.io_uring_buf_ring = @ptrCast(mmap.ptr);
+    try register_buf_ring(fd, @intFromPtr(br), entries, group_id);
+    return br;
+}
+
+fn register_buf_ring(fd: os.fd_t, addr: u64, entries: u32, group_id: u16) !void {
+    var reg = mem.zeroInit(linux.io_uring_buf_reg, .{
+        .ring_addr = addr,
+        .ring_entries = entries,
+        .bgid = group_id,
+    });
+    const res = linux.io_uring_register(
+        fd,
+        .REGISTER_PBUF_RING,
+        @as(*const anyopaque, @ptrCast(&reg)),
+        1,
+    );
+    try handle_register_buf_ring_result(res);
+}
+
+fn unregister_buf_ring(fd: os.fd_t, group_id: u16) !void {
+    var reg = mem.zeroInit(linux.io_uring_buf_reg, .{
+        .bgid = group_id,
+    });
+    const res = linux.io_uring_register(
+        fd,
+        .UNREGISTER_PBUF_RING,
+        @as(*const anyopaque, @ptrCast(&reg)),
+        1,
+    );
+    try handle_register_buf_ring_result(res);
+}
+
+fn handle_register_buf_ring_result(res: usize) !void {
+    switch (linux.getErrno(res)) {
+        .SUCCESS => {},
+        .INVAL => return error.ArgumentsInvalid,
+        else => |errno| return os.unexpectedErrno(errno),
+    }
+}
+
+// Unregisters and unmaps a shared buffer ring previously returned from `setup_buf_ring`.
+pub fn free_buf_ring(fd: os.fd_t, br: *align(mem.page_size) linux.io_uring_buf_ring, entries: u32, group_id: u16) void {
+    unregister_buf_ring(fd, group_id) catch {};
+    var mmap: []align(mem.page_size) u8 = undefined;
+    mmap.ptr = @ptrCast(br);
+    mmap.len = entries * @sizeOf(linux.io_uring_buf);
+    os.munmap(mmap);
+}
+
+/// Initializes `br` so that it is ready to be used.
+pub fn buf_ring_init(br: *linux.io_uring_buf_ring) void {
+    br.tail = 0;
+}
+
+/// Calculates the appropriate size mask for a buffer ring.
+/// `entries` is the number of ring entries, as passed to `setup_buf_ring`.
+pub fn buf_ring_mask(entries: u16) u16 {
+    return entries - 1;
+}
+
+/// Assigns `buffer` to a slot in the `br` buffer ring.
+/// `buffer_id` is the identifier which will be returned in the CQE.
+/// `buffer_offset` is the offset to insert at from the current tail.
+/// If just one buffer is provided before the ring tail is committed with
+/// `buf_ring_advance`, then the offset should be 0.
+/// If buffers are provided in a loop before being committed, the offset must be
+/// incremented by one for each buffer added.
+pub fn buf_ring_add(
+    br: *linux.io_uring_buf_ring,
+    buffer: []u8,
+    buffer_id: u16,
+    mask: u16,
+    buffer_offset: u16,
+) void {
+    const bufs: [*]linux.io_uring_buf = @ptrCast(br);
+    const buf: *linux.io_uring_buf = &bufs[(br.tail +% buffer_offset) & mask];
+
+    buf.addr = @intFromPtr(buffer.ptr);
+    buf.len = @intCast(buffer.len);
+    buf.bid = buffer_id;
+}
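+
+// Illustrative only: buffers are written into the ring at increasing offsets and
+// become visible to the kernel only after a single `buf_ring_advance` commits the
+// new tail. The names `br`, `buffers`, `buffer_size` and `buffers_count` are
+// assumptions for this sketch; `BufferGroup.init` above uses the same pattern.
+//
+//     const mask = buf_ring_mask(buffers_count);
+//     var i: u16 = 0;
+//     while (i < buffers_count) : (i += 1) {
+//         const start = buffer_size * i;
+//         buf_ring_add(br, buffers[start .. start + buffer_size], i, mask, i);
+//     }
+//     buf_ring_advance(br, buffers_count); // commit all provided buffers at once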
+
+/// Make `count` new buffers visible to the kernel. Called after
+/// `buf_ring_add` has been called `count` times to fill in new buffers.
+pub fn buf_ring_advance(br: *linux.io_uring_buf_ring, count: u16) void {
+    const tail: u16 = br.tail +% count;
+    @atomicStore(u16, &br.tail, tail, .release);
+}
+
 test "structs/offsets/entries" {
     if (builtin.os.tag != .linux) return error.SkipZigTest;
 
@@ -3652,7 +3875,7 @@ test "waitid" {
     try testing.expectEqual(7, siginfo.fields.common.second.sigchld.status);
 }
 
-/// For use in tests. Returns SkipZigTest is kernel version is less than required.
+/// For use in tests. Returns SkipZigTest if kernel version is less than required.
 inline fn skipKernelLessThan(required: std.SemanticVersion) !void {
     if (builtin.os.tag != .linux) return error.SkipZigTest;
 
@@ -3668,3 +3891,342 @@ inline fn skipKernelLessThan(required: std.SemanticVersion) !void {
     current.pre = null; // don't check pre field
     if (required.order(current) == .gt) return error.SkipZigTest;
 }
+
+test BufferGroup {
+    if (builtin.os.tag != .linux) return error.SkipZigTest;
+
+    // Init IoUring
+    var ring = IoUring.init(16, 0) catch |err| switch (err) {
+        error.SystemOutdated => return error.SkipZigTest,
+        error.PermissionDenied => return error.SkipZigTest,
+        else => return err,
+    };
+    defer ring.deinit();
+
+    // Init buffer group for ring
+    const group_id: u16 = 1; // buffers group id
+    const buffers_count: u16 = 1; // number of buffers in buffer group
+    const buffer_size: usize = 128; // size of each buffer in group
+    const buffers = try testing.allocator.alloc(u8, buffers_count * buffer_size);
+    defer testing.allocator.free(buffers);
+    var buf_grp = BufferGroup.init(
+        &ring,
+        group_id,
+        buffers,
+        buffer_size,
+        buffers_count,
+    ) catch |err| switch (err) {
+        // kernel older than 5.19
+        error.ArgumentsInvalid => return error.SkipZigTest,
+        else => return err,
+    };
+    defer buf_grp.deinit();
+
+    // Create client/server fds
+    const fds = try createSocketTestHarness(&ring);
+    defer fds.close();
+    const data = [_]u8{ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0xa, 0xb, 0xc, 0xd, 0xe };
+
+    // Client sends data
+    {
+        _ = try ring.send(1, fds.client, data[0..], 0);
+        const submitted = try ring.submit();
+        try testing.expectEqual(1, submitted);
+        const cqe_send = try ring.copy_cqe();
+        if (cqe_send.err() == .INVAL) return error.SkipZigTest;
+        try testing.expectEqual(linux.io_uring_cqe{ .user_data = 1, .res = data.len, .flags = 0 }, cqe_send);
+    }
+
+    // Server uses buffer group receive
+    {
+        // Submit recv operation, buffer will be chosen from the buffer group
+        _ = try buf_grp.recv(2, fds.server, 0);
+        const submitted = try ring.submit();
+        try testing.expectEqual(1, submitted);
+
+        // Wait for the completion of the recv operation
+        const cqe = try ring.copy_cqe();
+        try testing.expectEqual(2, cqe.user_data); // matches submitted user_data
+        try testing.expect(cqe.res >= 0); // success
+        try testing.expectEqual(os.E.SUCCESS, cqe.err());
+        try testing.expectEqual(data.len, @as(usize, @intCast(cqe.res))); // cqe.res holds received data len
+
+        // Read buffer_id and used buffer len from cqe
+        const buffer_id = try cqe.buffer_id();
+        const len: usize = @intCast(cqe.res);
+        // Get buffer from pool
+        const buf = buf_grp.get(buffer_id)[0..len];
+        try testing.expectEqualSlices(u8, &data, buf);
+        // Release buffer to the kernel when the application is done with it
+        buf_grp.put(buffer_id);
+    }
+}
+
+test "ring mapped buffers recv" {
+    if (builtin.os.tag != .linux) return error.SkipZigTest;
+
+    var ring = IoUring.init(16, 0) catch |err| switch (err) {
+        error.SystemOutdated => return error.SkipZigTest,
+        error.PermissionDenied => return error.SkipZigTest,
+        else => return err,
+    };
+    defer ring.deinit();
+
+    // init buffer group
+    const group_id: u16 = 1; // buffers group id
+    const buffers_count: u16 = 2; // number of buffers in buffer group
+    const buffer_size: usize = 4; // size of each buffer in group
+    const buffers = try testing.allocator.alloc(u8, buffers_count * buffer_size);
+    defer testing.allocator.free(buffers);
+    var buf_grp = BufferGroup.init(
+        &ring,
+        group_id,
+        buffers,
+        buffer_size,
+        buffers_count,
+    ) catch |err| switch (err) {
+        // kernel older than 5.19
+        error.ArgumentsInvalid => return error.SkipZigTest,
+        else => return err,
+    };
+    defer buf_grp.deinit();
+
+    // create client/server fds
+    const fds = try createSocketTestHarness(&ring);
+    defer fds.close();
+
+    // for random user_data in sqe/cqe
+    var Rnd = std.rand.DefaultPrng.init(0);
+    var rnd = Rnd.random();
+
+    var round: usize = 4; // repeat send/recv cycle round times
+    while (round > 0) : (round -= 1) {
+        // client sends data
+        const data = [_]u8{ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0xa, 0xb, 0xc, 0xd, 0xe };
+        {
+            const user_data = rnd.int(u64);
+            _ = try ring.send(user_data, fds.client, data[0..], 0);
+            try testing.expectEqual(@as(u32, 1), try ring.submit());
+            const cqe_send = try ring.copy_cqe();
+            if (cqe_send.err() == .INVAL) return error.SkipZigTest;
+            try testing.expectEqual(linux.io_uring_cqe{ .user_data = user_data, .res = data.len, .flags = 0 }, cqe_send);
+        }
+
+        // server reads data into provided buffers
+        // there are 2 buffers of size 4, so each read gets only a chunk of the data
+        // we read four chunks of 4, 4, 4, 3 bytes each
+        var chunk: []const u8 = data[0..buffer_size]; // first chunk
+        const id1 = try expect_buf_grp_recv(&ring, &buf_grp, fds.server, rnd.int(u64), chunk);
+        chunk = data[buffer_size .. buffer_size * 2]; // second chunk
+        const id2 = try expect_buf_grp_recv(&ring, &buf_grp, fds.server, rnd.int(u64), chunk);
+
+        // both buffers provided to the kernel are used, so we get a
+        // 'no more buffers' error until we put buffers back to the kernel
+        {
+            const user_data = rnd.int(u64);
+            _ = try buf_grp.recv(user_data, fds.server, 0);
+            try testing.expectEqual(@as(u32, 1), try ring.submit());
+            const cqe = try ring.copy_cqe();
+            try testing.expectEqual(user_data, cqe.user_data);
+            try testing.expect(cqe.res < 0); // fail
+            try testing.expectEqual(os.E.NOBUFS, cqe.err());
+            try testing.expect(cqe.flags & linux.IORING_CQE_F_BUFFER == 0); // IORING_CQE_F_BUFFER flag is set on success only
+            try testing.expectError(error.NoBufferSelected, cqe.buffer_id());
+        }
+
+        // put buffers back to the kernel
+        buf_grp.put(id1);
+        buf_grp.put(id2);
+
+        chunk = data[buffer_size * 2 .. buffer_size * 3]; // third chunk
+        const id3 = try expect_buf_grp_recv(&ring, &buf_grp, fds.server, rnd.int(u64), chunk);
+        buf_grp.put(id3);
+
+        chunk = data[buffer_size * 3 ..]; // last chunk
+        const id4 = try expect_buf_grp_recv(&ring, &buf_grp, fds.server, rnd.int(u64), chunk);
+        buf_grp.put(id4);
+    }
+}
+
+test "ring mapped buffers multishot recv" {
+    if (builtin.os.tag != .linux) return error.SkipZigTest;
+
+    var ring = IoUring.init(16, 0) catch |err| switch (err) {
+        error.SystemOutdated => return error.SkipZigTest,
+        error.PermissionDenied => return error.SkipZigTest,
+        else => return err,
+    };
+    defer ring.deinit();
+
+    // init buffer group
+    const group_id: u16 = 1; // buffers group id
+    const buffers_count: u16 = 2; // number of buffers in buffer group
+    const buffer_size: usize = 4; // size of each buffer in group
+    const buffers = try testing.allocator.alloc(u8, buffers_count * buffer_size);
+    defer testing.allocator.free(buffers);
+    var buf_grp = BufferGroup.init(
+        &ring,
+        group_id,
+        buffers,
+        buffer_size,
+        buffers_count,
+    ) catch |err| switch (err) {
+        // kernel older than 5.19
+        error.ArgumentsInvalid => return error.SkipZigTest,
+        else => return err,
+    };
+    defer buf_grp.deinit();
+
+    // create client/server fds
+    const fds = try createSocketTestHarness(&ring);
+    defer fds.close();
+
+    // for random user_data in sqe/cqe
+    var Rnd = std.rand.DefaultPrng.init(0);
+    var rnd = Rnd.random();
+
+    var round: usize = 4; // repeat send/recv cycle round times
+    while (round > 0) : (round -= 1) {
+        // client sends data
+        const data = [_]u8{ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0xa, 0xb, 0xc, 0xd, 0xe };
+        {
+            const user_data = rnd.int(u64);
+            _ = try ring.send(user_data, fds.client, data[0..], 0);
+            try testing.expectEqual(@as(u32, 1), try ring.submit());
+            const cqe_send = try ring.copy_cqe();
+            if (cqe_send.err() == .INVAL) return error.SkipZigTest;
+            try testing.expectEqual(linux.io_uring_cqe{ .user_data = user_data, .res = data.len, .flags = 0 }, cqe_send);
+        }
+
+        // start multishot recv
+        var recv_user_data = rnd.int(u64);
+        _ = try buf_grp.recv_multishot(recv_user_data, fds.server, 0);
+        try testing.expectEqual(@as(u32, 1), try ring.submit()); // submit
+
+        // server reads data into provided buffers
+        // there are 2 buffers of size 4, so each read gets only a chunk of the data
+        // we read four chunks of 4, 4, 4, 3 bytes each
+        var chunk: []const u8 = data[0..buffer_size]; // first chunk
+        const cqe1 = try expect_buf_grp_cqe(&ring, &buf_grp, recv_user_data, chunk);
+        try testing.expect(cqe1.flags & linux.IORING_CQE_F_MORE > 0);
+
+        chunk = data[buffer_size .. buffer_size * 2]; // second chunk
+        const cqe2 = try expect_buf_grp_cqe(&ring, &buf_grp, recv_user_data, chunk);
+        try testing.expect(cqe2.flags & linux.IORING_CQE_F_MORE > 0);
+
+        // both buffers provided to the kernel are used, so we get a
+        // 'no more buffers' error until we put buffers back to the kernel
+        {
+            const cqe = try ring.copy_cqe();
+            try testing.expectEqual(recv_user_data, cqe.user_data);
+            try testing.expect(cqe.res < 0); // fail
+            try testing.expectEqual(os.E.NOBUFS, cqe.err());
+            try testing.expect(cqe.flags & linux.IORING_CQE_F_BUFFER == 0); // IORING_CQE_F_BUFFER flag is set on success only
+            // IORING_CQE_F_MORE is not set, indicating that the multishot recv is finished
+            try testing.expect(cqe.flags & linux.IORING_CQE_F_MORE == 0);
+            try testing.expectError(error.NoBufferSelected, cqe.buffer_id());
+        }
+
+        // put buffers back to the kernel
+        buf_grp.put(try cqe1.buffer_id());
+        buf_grp.put(try cqe2.buffer_id());
+
+        // restart multishot
+        recv_user_data = rnd.int(u64);
+        _ = try buf_grp.recv_multishot(recv_user_data, fds.server, 0);
+        try testing.expectEqual(@as(u32, 1), try ring.submit()); // submit
+
+        chunk = data[buffer_size * 2 .. buffer_size * 3]; // third chunk
+        const cqe3 = try expect_buf_grp_cqe(&ring, &buf_grp, recv_user_data, chunk);
+        try testing.expect(cqe3.flags & linux.IORING_CQE_F_MORE > 0);
+        buf_grp.put(try cqe3.buffer_id());
+
+        chunk = data[buffer_size * 3 ..]; // last chunk
+        const cqe4 = try expect_buf_grp_cqe(&ring, &buf_grp, recv_user_data, chunk);
+        try testing.expect(cqe4.flags & linux.IORING_CQE_F_MORE > 0);
+        buf_grp.put(try cqe4.buffer_id());
+
+        // cancel the pending multishot recv operation
+        {
+            const cancel_user_data = rnd.int(u64);
+            _ = try ring.cancel(cancel_user_data, recv_user_data, 0);
+            try testing.expectEqual(@as(u32, 1), try ring.submit());
+
+            // expect completion of the cancel operation and completion of the recv operation
+            var cqe_cancel = try ring.copy_cqe();
+            if (cqe_cancel.err() == .INVAL) return error.SkipZigTest;
+            var cqe_recv = try ring.copy_cqe();
+            if (cqe_recv.err() == .INVAL) return error.SkipZigTest;
+
+            // don't depend on the order of completions
+            if (cqe_cancel.user_data == recv_user_data and cqe_recv.user_data == cancel_user_data) {
+                const a = cqe_cancel;
+                const b = cqe_recv;
+                cqe_cancel = b;
+                cqe_recv = a;
+            }
+
+            // Note on different kernel results:
+            // on older kernels (tested with v6.0.16, v6.1.57, v6.2.12, v6.4.16)
+            //   cqe_cancel.err() == .NOENT
+            //   cqe_recv.err() == .NOBUFS
+            // on newer kernels (tested with v6.5.0, v6.5.7)
+            //   cqe_cancel.err() == .SUCCESS
+            //   cqe_recv.err() == .CANCELED
+            // Upstream reference: https://github.com/axboe/liburing/issues/984
+
+            // cancel operation succeeds (or returns NOENT on older kernels)
+            try testing.expectEqual(cancel_user_data, cqe_cancel.user_data);
+            try testing.expect(cqe_cancel.err() == .NOENT or cqe_cancel.err() == .SUCCESS);
+
+            // recv operation fails with CANCELED (or NOBUFS on older kernels)
+            try testing.expectEqual(recv_user_data, cqe_recv.user_data);
+            try testing.expect(cqe_recv.res < 0);
+            try testing.expect(cqe_recv.err() == .NOBUFS or cqe_recv.err() == .CANCELED);
+            try testing.expect(cqe_recv.flags & linux.IORING_CQE_F_MORE == 0);
+        }
+    }
+}
+
+// Prepare and submit a recv operation using the buffer group.
+// Verify that the buffer from the group, referenced by the CQE, matches the expected data.
+fn expect_buf_grp_recv(
+    ring: *IoUring,
+    buf_grp: *BufferGroup,
+    fd: os.fd_t,
+    user_data: u64,
+    expected: []const u8,
+) !u16 {
+    // prepare and submit recv
+    const sqe = try buf_grp.recv(user_data, fd, 0);
+    try testing.expect(sqe.flags & linux.IOSQE_BUFFER_SELECT == linux.IOSQE_BUFFER_SELECT);
+    try testing.expect(sqe.buf_index == buf_grp.group_id);
+    try testing.expectEqual(@as(u32, 1), try ring.submit()); // submit
+
+    const cqe = try expect_buf_grp_cqe(ring, buf_grp, user_data, expected);
+    return try cqe.buffer_id();
+}
+
+fn expect_buf_grp_cqe(
+    ring: *IoUring,
+    buf_grp: *BufferGroup,
+    user_data: u64,
+    expected: []const u8,
+) !linux.io_uring_cqe {
+    // get cqe
+    const cqe = try ring.copy_cqe();
+    try testing.expectEqual(user_data, cqe.user_data);
+    try testing.expect(cqe.res >= 0); // success
+    try testing.expect(cqe.flags & linux.IORING_CQE_F_BUFFER == linux.IORING_CQE_F_BUFFER); // IORING_CQE_F_BUFFER flag is set
+    try testing.expectEqual(expected.len, @as(usize, @intCast(cqe.res)));
+    try testing.expectEqual(os.E.SUCCESS, cqe.err());
+
+    // get buffer from pool
+    const buffer_id = try cqe.buffer_id();
+    const len = @as(usize, @intCast(cqe.res));
+    const buf = buf_grp.get(buffer_id)[0..len];
+    try testing.expectEqualSlices(u8, expected, buf);
+
+    return cqe;
+}
diff --git a/lib/std/os/linux/io_uring_sqe.zig b/lib/std/os/linux/io_uring_sqe.zig
index 5946626974..73ee195687 100644
--- a/lib/std/os/linux/io_uring_sqe.zig
+++ b/lib/std/os/linux/io_uring_sqe.zig
@@ -200,6 +200,36 @@ pub const io_uring_sqe = extern struct {
         sqe.rw_flags = flags;
     }
 
+    pub fn prep_recv_multishot(
+        sqe: *linux.io_uring_sqe,
+        fd: os.fd_t,
+        buffer: []u8,
+        flags: u32,
+    ) void {
+        sqe.prep_recv(fd, buffer, flags);
+        sqe.ioprio |= linux.IORING_RECV_MULTISHOT;
+    }
+
+    pub fn prep_recvmsg(
+        sqe: *linux.io_uring_sqe,
+        fd: os.fd_t,
+        msg: *os.msghdr,
+        flags: u32,
+    ) void {
+        sqe.prep_rw(.RECVMSG, fd, @intFromPtr(msg), 1, 0);
+        sqe.rw_flags = flags;
+    }
+
+    pub fn prep_recvmsg_multishot(
+        sqe: *linux.io_uring_sqe,
+        fd: os.fd_t,
+        msg: *os.msghdr,
+        flags: u32,
+    ) void {
+        sqe.prep_recvmsg(fd, msg, flags);
+        sqe.ioprio |= linux.IORING_RECV_MULTISHOT;
+    }
+
     pub fn prep_send(sqe: *linux.io_uring_sqe, fd: os.fd_t, buffer: []const u8, flags: u32) void {
         sqe.prep_rw(.SEND, fd, @intFromPtr(buffer.ptr), buffer.len, 0);
         sqe.rw_flags = flags;
@@ -227,16 +257,6 @@
         sqe.opcode = .SENDMSG_ZC;
     }
 
-    pub fn prep_recvmsg(
-        sqe: *linux.io_uring_sqe,
-        fd: os.fd_t,
-        msg: *os.msghdr,
-        flags: u32,
-    ) void {
-        sqe.prep_rw(.RECVMSG, fd, @intFromPtr(msg), 1, 0);
-        sqe.rw_flags = flags;
-    }
-
     pub fn prep_sendmsg(
         sqe: *linux.io_uring_sqe,
         fd: os.fd_t,