diff --git a/lib/std/os/linux/io_uring.zig b/lib/std/os/linux/io_uring.zig index bb48c63719..a62362c963 100644 --- a/lib/std/os/linux/io_uring.zig +++ b/lib/std/os/linux/io_uring.zig @@ -2830,6 +2830,390 @@ test "linkat" { try testing.expectEqualStrings("hello", second_file_data[0..read]); } +test "provide_buffers: read" { + if (builtin.os.tag != .linux) return error.SkipZigTest; + + var ring = IO_Uring.init(1, 0) catch |err| switch (err) { + error.SystemOutdated => return error.SkipZigTest, + error.PermissionDenied => return error.SkipZigTest, + else => return err, + }; + defer ring.deinit(); + + const fd = try os.openZ("/dev/zero", os.O.RDONLY | os.O.CLOEXEC, 0); + defer os.close(fd); + + const group_id = 1337; + const buffer_id = 0; + + const buffer_len = 128; + + var buffers: [4][buffer_len]u8 = undefined; + + // Provide 4 buffers + + { + const sqe = try ring.provide_buffers(0xcccccccc, @ptrCast([*]u8, &buffers), buffers.len, buffer_len, group_id, buffer_id); + try testing.expectEqual(linux.IORING_OP.PROVIDE_BUFFERS, sqe.opcode); + try testing.expectEqual(@as(i32, buffers.len), sqe.fd); + try testing.expectEqual(@as(u32, buffers[0].len), sqe.len); + try testing.expectEqual(@as(u16, group_id), sqe.buf_index); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + + const cqe = try ring.copy_cqe(); + switch (cqe.err()) { + // Happens when the kernel is < 5.7 + .INVAL => return error.SkipZigTest, + .SUCCESS => {}, + else => |errno| std.debug.panic("unhandled errno: {}", .{errno}), + } + try testing.expectEqual(@as(u64, 0xcccccccc), cqe.user_data); + } + + // Do 4 reads which should consume all buffers + + var i: usize = 0; + while (i < buffers.len) : (i += 1) { + var sqe = try ring.read(0xdededede, fd, .{ .buffer_selection = .{ .group_id = group_id, .len = buffer_len } }, 0); + try testing.expectEqual(linux.IORING_OP.READ, sqe.opcode); + try testing.expectEqual(@as(i32, fd), sqe.fd); + try testing.expectEqual(@as(u64, 0), sqe.addr); + try testing.expectEqual(@as(u32, buffer_len), sqe.len); + try testing.expectEqual(@as(u16, group_id), sqe.buf_index); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + + const cqe = try ring.copy_cqe(); + switch (cqe.err()) { + .SUCCESS => {}, + else => |errno| std.debug.panic("unhandled errno: {}", .{errno}), + } + + try testing.expect(cqe.flags & linux.IORING_CQE_F_BUFFER == linux.IORING_CQE_F_BUFFER); + const used_buffer_id = cqe.flags >> 16; + try testing.expect(used_buffer_id >= 0 and used_buffer_id <= 3); + try testing.expectEqual(@as(i32, buffer_len), cqe.res); + + try testing.expectEqual(@as(u64, 0xdededede), cqe.user_data); + try testing.expectEqualSlices(u8, &([_]u8{0} ** buffer_len), buffers[used_buffer_id][0..@intCast(usize, cqe.res)]); + } + + // This read should fail + + { + var sqe = try ring.read(0xdfdfdfdf, fd, .{ .buffer_selection = .{ .group_id = group_id, .len = buffer_len } }, 0); + try testing.expectEqual(linux.IORING_OP.READ, sqe.opcode); + try testing.expectEqual(@as(i32, fd), sqe.fd); + try testing.expectEqual(@as(u64, 0), sqe.addr); + try testing.expectEqual(@as(u32, buffer_len), sqe.len); + try testing.expectEqual(@as(u16, group_id), sqe.buf_index); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + + const cqe = try ring.copy_cqe(); + switch (cqe.err()) { + // Expected + .NOBUFS => {}, + .SUCCESS => std.debug.panic("unexpected success", .{}), + else => |errno| std.debug.panic("unhandled errno: {}", .{errno}), + } + try testing.expectEqual(@as(u64, 0xdfdfdfdf), cqe.user_data); + } + + // Provide 1 buffer again + + // Deliberately put something we don't expect in the buffers + mem.set(u8, mem.sliceAsBytes(&buffers), 42); + + const reprovided_buffer_id = 2; + + { + _ = try ring.provide_buffers(0xabababab, @ptrCast([*]u8, &buffers[reprovided_buffer_id]), 1, buffer_len, group_id, reprovided_buffer_id); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + + const cqe = try ring.copy_cqe(); + switch (cqe.err()) { + .SUCCESS => {}, + else => |errno| std.debug.panic("unhandled errno: {}", .{errno}), + } + } + + // Final read which should work + + { + var sqe = try ring.read(0xdfdfdfdf, fd, .{ .buffer_selection = .{ .group_id = group_id, .len = buffer_len } }, 0); + try testing.expectEqual(linux.IORING_OP.READ, sqe.opcode); + try testing.expectEqual(@as(i32, fd), sqe.fd); + try testing.expectEqual(@as(u64, 0), sqe.addr); + try testing.expectEqual(@as(u32, buffer_len), sqe.len); + try testing.expectEqual(@as(u16, group_id), sqe.buf_index); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + + const cqe = try ring.copy_cqe(); + switch (cqe.err()) { + .SUCCESS => {}, + else => |errno| std.debug.panic("unhandled errno: {}", .{errno}), + } + + try testing.expect(cqe.flags & linux.IORING_CQE_F_BUFFER == linux.IORING_CQE_F_BUFFER); + const used_buffer_id = cqe.flags >> 16; + try testing.expectEqual(used_buffer_id, reprovided_buffer_id); + try testing.expectEqual(@as(i32, buffer_len), cqe.res); + try testing.expectEqual(@as(u64, 0xdfdfdfdf), cqe.user_data); + try testing.expectEqualSlices(u8, &([_]u8{0} ** buffer_len), buffers[used_buffer_id][0..@intCast(usize, cqe.res)]); + } +} + +test "remove_buffers" { + if (builtin.os.tag != .linux) return error.SkipZigTest; + + var ring = IO_Uring.init(1, 0) catch |err| switch (err) { + error.SystemOutdated => return error.SkipZigTest, + error.PermissionDenied => return error.SkipZigTest, + else => return err, + }; + defer ring.deinit(); + + const fd = try os.openZ("/dev/zero", os.O.RDONLY | os.O.CLOEXEC, 0); + defer os.close(fd); + + const group_id = 1337; + const buffer_id = 0; + + const buffer_len = 128; + + var buffers: [4][buffer_len]u8 = undefined; + + // Provide 4 buffers + + { + _ = try ring.provide_buffers(0xcccccccc, @ptrCast([*]u8, &buffers), buffers.len, buffer_len, group_id, buffer_id); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + + const cqe = try ring.copy_cqe(); + switch (cqe.err()) { + .SUCCESS => {}, + else => |errno| std.debug.panic("unhandled errno: {}", .{errno}), + } + try testing.expectEqual(@as(u64, 0xcccccccc), cqe.user_data); + } + + // Remove the first 3 buffers + + { + var sqe = try ring.remove_buffers(0xbababababa, 3, group_id); + try testing.expectEqual(linux.IORING_OP.REMOVE_BUFFERS, sqe.opcode); + try testing.expectEqual(@as(i32, 3), sqe.fd); + try testing.expectEqual(@as(u64, 0), sqe.addr); + try testing.expectEqual(@as(u16, group_id), sqe.buf_index); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + + const cqe = try ring.copy_cqe(); + switch (cqe.err()) { + .SUCCESS => {}, + else => |errno| std.debug.panic("unhandled errno: {}", .{errno}), + } + try testing.expectEqual(@as(u64, 0xbababababa), cqe.user_data); + } + + // This read should work + + { + _ = try ring.read(0xdfdfdfdf, fd, .{ .buffer_selection = .{ .group_id = group_id, .len = buffer_len } }, 0); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + + const cqe = try ring.copy_cqe(); + switch (cqe.err()) { + .SUCCESS => {}, + else => |errno| std.debug.panic("unhandled errno: {}", .{errno}), + } + + try testing.expect(cqe.flags & linux.IORING_CQE_F_BUFFER == linux.IORING_CQE_F_BUFFER); + const used_buffer_id = cqe.flags >> 16; + try testing.expectEqual(used_buffer_id, 0); + try testing.expectEqual(@as(i32, buffer_len), cqe.res); + try testing.expectEqual(@as(u64, 0xdfdfdfdf), cqe.user_data); + try testing.expectEqualSlices(u8, &([_]u8{0} ** buffer_len), buffers[used_buffer_id][0..@intCast(usize, cqe.res)]); + } + + // Final read should _not_ work + + { + _ = try ring.read(0xdfdfdfdf, fd, .{ .buffer_selection = .{ .group_id = group_id, .len = buffer_len } }, 0); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + + const cqe = try ring.copy_cqe(); + switch (cqe.err()) { + // Expected + .NOBUFS => {}, + .SUCCESS => std.debug.panic("unexpected success", .{}), + else => |errno| std.debug.panic("unhandled errno: {}", .{errno}), + } + } +} + +test "provide_buffers: accept/connect/send/recv" { + if (builtin.os.tag != .linux) return error.SkipZigTest; + + var ring = IO_Uring.init(16, 0) catch |err| switch (err) { + error.SystemOutdated => return error.SkipZigTest, + error.PermissionDenied => return error.SkipZigTest, + else => return err, + }; + defer ring.deinit(); + + const group_id = 1337; + const buffer_id = 0; + + const buffer_len = 128; + var buffers: [4][buffer_len]u8 = undefined; + + // Provide 4 buffers + + { + const sqe = try ring.provide_buffers(0xcccccccc, @ptrCast([*]u8, &buffers), buffers.len, buffer_len, group_id, buffer_id); + try testing.expectEqual(linux.IORING_OP.PROVIDE_BUFFERS, sqe.opcode); + try testing.expectEqual(@as(i32, buffers.len), sqe.fd); + try testing.expectEqual(@as(u32, buffer_len), sqe.len); + try testing.expectEqual(@as(u16, group_id), sqe.buf_index); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + + const cqe = try ring.copy_cqe(); + switch (cqe.err()) { + // Happens when the kernel is < 5.7 + .INVAL => return error.SkipZigTest, + .SUCCESS => {}, + else => |errno| std.debug.panic("unhandled errno: {}", .{errno}), + } + try testing.expectEqual(@as(u64, 0xcccccccc), cqe.user_data); + } + + const socket_test_harness = try createSocketTestHarness(&ring); + defer socket_test_harness.close(); + + // Do 4 send on the socket + + { + var i: usize = 0; + while (i < buffers.len) : (i += 1) { + _ = try ring.send(0xdeaddead, socket_test_harness.server, &([_]u8{'z'} ** buffer_len), 0); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + } + + var cqes: [4]linux.io_uring_cqe = undefined; + try testing.expectEqual(@as(u32, 4), try ring.copy_cqes(&cqes, 4)); + } + + // Do 4 recv which should consume all buffers + + // Deliberately put something we don't expect in the buffers + mem.set(u8, mem.sliceAsBytes(&buffers), 1); + + var i: usize = 0; + while (i < buffers.len) : (i += 1) { + var sqe = try ring.recv(0xdededede, socket_test_harness.client, .{ .buffer_selection = .{ .group_id = group_id, .len = buffer_len } }, 0); + try testing.expectEqual(linux.IORING_OP.RECV, sqe.opcode); + try testing.expectEqual(@as(i32, socket_test_harness.client), sqe.fd); + try testing.expectEqual(@as(u64, 0), sqe.addr); + try testing.expectEqual(@as(u32, buffer_len), sqe.len); + try testing.expectEqual(@as(u16, group_id), sqe.buf_index); + try testing.expectEqual(@as(u32, 0), sqe.rw_flags); + try testing.expectEqual(@as(u32, linux.IOSQE_BUFFER_SELECT), sqe.flags); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + + const cqe = try ring.copy_cqe(); + switch (cqe.err()) { + .SUCCESS => {}, + else => |errno| std.debug.panic("unhandled errno: {}", .{errno}), + } + + try testing.expect(cqe.flags & linux.IORING_CQE_F_BUFFER == linux.IORING_CQE_F_BUFFER); + const used_buffer_id = cqe.flags >> 16; + try testing.expect(used_buffer_id >= 0 and used_buffer_id <= 3); + try testing.expectEqual(@as(i32, buffer_len), cqe.res); + + try testing.expectEqual(@as(u64, 0xdededede), cqe.user_data); + const buffer = buffers[used_buffer_id][0..@intCast(usize, cqe.res)]; + try testing.expectEqualSlices(u8, &([_]u8{'z'} ** buffer_len), buffer); + } + + // This recv should fail + + { + var sqe = try ring.recv(0xdfdfdfdf, socket_test_harness.client, .{ .buffer_selection = .{ .group_id = group_id, .len = buffer_len } }, 0); + try testing.expectEqual(linux.IORING_OP.RECV, sqe.opcode); + try testing.expectEqual(@as(i32, socket_test_harness.client), sqe.fd); + try testing.expectEqual(@as(u64, 0), sqe.addr); + try testing.expectEqual(@as(u32, buffer_len), sqe.len); + try testing.expectEqual(@as(u16, group_id), sqe.buf_index); + try testing.expectEqual(@as(u32, 0), sqe.rw_flags); + try testing.expectEqual(@as(u32, linux.IOSQE_BUFFER_SELECT), sqe.flags); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + + const cqe = try ring.copy_cqe(); + switch (cqe.err()) { + // Expected + .NOBUFS => {}, + .SUCCESS => std.debug.panic("unexpected success", .{}), + else => |errno| std.debug.panic("unhandled errno: {}", .{errno}), + } + try testing.expectEqual(@as(u64, 0xdfdfdfdf), cqe.user_data); + } + + // Provide 1 buffer again + + const reprovided_buffer_id = 2; + + { + _ = try ring.provide_buffers(0xabababab, @ptrCast([*]u8, &buffers[reprovided_buffer_id]), 1, buffer_len, group_id, reprovided_buffer_id); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + + const cqe = try ring.copy_cqe(); + switch (cqe.err()) { + .SUCCESS => {}, + else => |errno| std.debug.panic("unhandled errno: {}", .{errno}), + } + } + + // Redo 1 send on the server socket + + { + _ = try ring.send(0xdeaddead, socket_test_harness.server, &([_]u8{'w'} ** buffer_len), 0); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + + _ = try ring.copy_cqe(); + } + + // Final recv which should work + + // Deliberately put something we don't expect in the buffers + mem.set(u8, mem.sliceAsBytes(&buffers), 1); + + { + var sqe = try ring.recv(0xdfdfdfdf, socket_test_harness.client, .{ .buffer_selection = .{ .group_id = group_id, .len = buffer_len } }, 0); + try testing.expectEqual(linux.IORING_OP.RECV, sqe.opcode); + try testing.expectEqual(@as(i32, socket_test_harness.client), sqe.fd); + try testing.expectEqual(@as(u64, 0), sqe.addr); + try testing.expectEqual(@as(u32, buffer_len), sqe.len); + try testing.expectEqual(@as(u16, group_id), sqe.buf_index); + try testing.expectEqual(@as(u32, 0), sqe.rw_flags); + try testing.expectEqual(@as(u32, linux.IOSQE_BUFFER_SELECT), sqe.flags); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + + const cqe = try ring.copy_cqe(); + switch (cqe.err()) { + .SUCCESS => {}, + else => |errno| std.debug.panic("unhandled errno: {}", .{errno}), + } + + try testing.expect(cqe.flags & linux.IORING_CQE_F_BUFFER == linux.IORING_CQE_F_BUFFER); + const used_buffer_id = cqe.flags >> 16; + try testing.expectEqual(used_buffer_id, reprovided_buffer_id); + try testing.expectEqual(@as(i32, buffer_len), cqe.res); + try testing.expectEqual(@as(u64, 0xdfdfdfdf), cqe.user_data); + const buffer = buffers[used_buffer_id][0..@intCast(usize, cqe.res)]; + try testing.expectEqualSlices(u8, &([_]u8{'w'} ** buffer_len), buffer); + } +} + /// Used for testing server/client interactions. const SocketTestHarness = struct { listener: os.socket_t,