blob 3b4a1411 (12145B) - Raw
1 const std = @import("../std.zig"); 2 const builtin = @import("builtin"); 3 const assert = std.debug.assert; 4 const expect = std.testing.expect; 5 6 /// Many producer, many consumer, non-allocating, thread-safe. 7 /// Uses a mutex to protect access. 8 /// The queue does not manage ownership and the user is responsible to 9 /// manage the storage of the nodes. 10 pub fn Queue(comptime T: type) type { 11 return struct { 12 head: ?*Node, 13 tail: ?*Node, 14 mutex: std.Thread.Mutex, 15 16 pub const Self = @This(); 17 pub const Node = std.TailQueue(T).Node; 18 19 /// Initializes a new queue. The queue does not provide a `deinit()` 20 /// function, so the user must take care of cleaning up the queue elements. 21 pub fn init() Self { 22 return Self{ 23 .head = null, 24 .tail = null, 25 .mutex = std.Thread.Mutex{}, 26 }; 27 } 28 29 /// Appends `node` to the queue. 30 /// The lifetime of `node` must be longer than lifetime of queue. 31 pub fn put(self: *Self, node: *Node) void { 32 node.next = null; 33 34 self.mutex.lock(); 35 defer self.mutex.unlock(); 36 37 node.prev = self.tail; 38 self.tail = node; 39 if (node.prev) |prev_tail| { 40 prev_tail.next = node; 41 } else { 42 assert(self.head == null); 43 self.head = node; 44 } 45 } 46 47 /// Gets a previously inserted node or returns `null` if there is none. 48 /// It is safe to `get()` a node from the queue while another thread tries 49 /// to `remove()` the same node at the same time. 50 pub fn get(self: *Self) ?*Node { 51 self.mutex.lock(); 52 defer self.mutex.unlock(); 53 54 const head = self.head orelse return null; 55 self.head = head.next; 56 if (head.next) |new_head| { 57 new_head.prev = null; 58 } else { 59 self.tail = null; 60 } 61 // This way, a get() and a remove() are thread-safe with each other. 62 head.prev = null; 63 head.next = null; 64 return head; 65 } 66 67 pub fn unget(self: *Self, node: *Node) void { 68 node.prev = null; 69 70 self.mutex.lock(); 71 defer self.mutex.unlock(); 72 73 const opt_head = self.head; 74 self.head = node; 75 if (opt_head) |head| { 76 head.next = node; 77 } else { 78 assert(self.tail == null); 79 self.tail = node; 80 } 81 } 82 83 /// Removes a node from the queue, returns whether node was actually removed. 84 /// It is safe to `remove()` a node from the queue while another thread tries 85 /// to `get()` the same node at the same time. 86 pub fn remove(self: *Self, node: *Node) bool { 87 self.mutex.lock(); 88 defer self.mutex.unlock(); 89 90 if (node.prev == null and node.next == null and self.head != node) { 91 return false; 92 } 93 94 if (node.prev) |prev| { 95 prev.next = node.next; 96 } else { 97 self.head = node.next; 98 } 99 if (node.next) |next| { 100 next.prev = node.prev; 101 } else { 102 self.tail = node.prev; 103 } 104 node.prev = null; 105 node.next = null; 106 return true; 107 } 108 109 /// Returns `true` if the queue is currently empty. 110 /// Note that in a multi-consumer environment a return value of `false` 111 /// does not mean that `get` will yield a non-`null` value! 112 pub fn isEmpty(self: *Self) bool { 113 self.mutex.lock(); 114 defer self.mutex.unlock(); 115 return self.head == null; 116 } 117 118 /// Dumps the contents of the queue to `stderr`. 119 pub fn dump(self: *Self) void { 120 self.dumpToStream(std.io.getStdErr().writer()) catch return; 121 } 122 123 /// Dumps the contents of the queue to `stream`. 124 /// Up to 4 elements from the head are dumped and the tail of the queue is 125 /// dumped as well. 126 pub fn dumpToStream(self: *Self, stream: anytype) !void { 127 const S = struct { 128 fn dumpRecursive( 129 s: anytype, 130 optional_node: ?*Node, 131 indent: usize, 132 comptime depth: comptime_int, 133 ) !void { 134 try s.writeByteNTimes(' ', indent); 135 if (optional_node) |node| { 136 try s.print("0x{x}={}\n", .{ @ptrToInt(node), node.data }); 137 if (depth == 0) { 138 try s.print("(max depth)\n", .{}); 139 return; 140 } 141 try dumpRecursive(s, node.next, indent + 1, depth - 1); 142 } else { 143 try s.print("(null)\n", .{}); 144 } 145 } 146 }; 147 self.mutex.lock(); 148 defer self.mutex.unlock(); 149 150 try stream.print("head: ", .{}); 151 try S.dumpRecursive(stream, self.head, 0, 4); 152 try stream.print("tail: ", .{}); 153 try S.dumpRecursive(stream, self.tail, 0, 4); 154 } 155 }; 156 } 157 158 const Context = struct { 159 allocator: std.mem.Allocator, 160 queue: *Queue(i32), 161 put_sum: isize, 162 get_sum: isize, 163 get_count: usize, 164 puts_done: bool, 165 }; 166 167 // TODO add lazy evaluated build options and then put puts_per_thread behind 168 // some option such as: "AggressiveMultithreadedFuzzTest". In the AppVeyor 169 // CI we would use a less aggressive setting since at 1 core, while we still 170 // want this test to pass, we need a smaller value since there is so much thrashing 171 // we would also use a less aggressive setting when running in valgrind 172 const puts_per_thread = 500; 173 const put_thread_count = 3; 174 175 test "std.atomic.Queue" { 176 var plenty_of_memory = try std.heap.page_allocator.alloc(u8, 300 * 1024); 177 defer std.heap.page_allocator.free(plenty_of_memory); 178 179 var fixed_buffer_allocator = std.heap.FixedBufferAllocator.init(plenty_of_memory); 180 var a = fixed_buffer_allocator.getThreadSafeAllocator(); 181 182 var queue = Queue(i32).init(); 183 var context = Context{ 184 .allocator = a, 185 .queue = &queue, 186 .put_sum = 0, 187 .get_sum = 0, 188 .puts_done = false, 189 .get_count = 0, 190 }; 191 192 if (builtin.single_threaded) { 193 try expect(context.queue.isEmpty()); 194 { 195 var i: usize = 0; 196 while (i < put_thread_count) : (i += 1) { 197 try expect(startPuts(&context) == 0); 198 } 199 } 200 try expect(!context.queue.isEmpty()); 201 context.puts_done = true; 202 { 203 var i: usize = 0; 204 while (i < put_thread_count) : (i += 1) { 205 try expect(startGets(&context) == 0); 206 } 207 } 208 try expect(context.queue.isEmpty()); 209 } else { 210 try expect(context.queue.isEmpty()); 211 212 var putters: [put_thread_count]std.Thread = undefined; 213 for (putters) |*t| { 214 t.* = try std.Thread.spawn(.{}, startPuts, .{&context}); 215 } 216 var getters: [put_thread_count]std.Thread = undefined; 217 for (getters) |*t| { 218 t.* = try std.Thread.spawn(.{}, startGets, .{&context}); 219 } 220 221 for (putters) |t| 222 t.join(); 223 @atomicStore(bool, &context.puts_done, true, .SeqCst); 224 for (getters) |t| 225 t.join(); 226 227 try expect(context.queue.isEmpty()); 228 } 229 230 if (context.put_sum != context.get_sum) { 231 std.debug.panic("failure\nput_sum:{} != get_sum:{}", .{ context.put_sum, context.get_sum }); 232 } 233 234 if (context.get_count != puts_per_thread * put_thread_count) { 235 std.debug.panic("failure\nget_count:{} != puts_per_thread:{} * put_thread_count:{}", .{ 236 context.get_count, 237 @as(u32, puts_per_thread), 238 @as(u32, put_thread_count), 239 }); 240 } 241 } 242 243 fn startPuts(ctx: *Context) u8 { 244 var put_count: usize = puts_per_thread; 245 var prng = std.rand.DefaultPrng.init(0xdeadbeef); 246 const random = prng.random(); 247 while (put_count != 0) : (put_count -= 1) { 248 std.time.sleep(1); // let the os scheduler be our fuzz 249 const x = @bitCast(i32, random.int(u32)); 250 const node = ctx.allocator.create(Queue(i32).Node) catch unreachable; 251 node.* = .{ 252 .prev = undefined, 253 .next = undefined, 254 .data = x, 255 }; 256 ctx.queue.put(node); 257 _ = @atomicRmw(isize, &ctx.put_sum, .Add, x, .SeqCst); 258 } 259 return 0; 260 } 261 262 fn startGets(ctx: *Context) u8 { 263 while (true) { 264 const last = @atomicLoad(bool, &ctx.puts_done, .SeqCst); 265 266 while (ctx.queue.get()) |node| { 267 std.time.sleep(1); // let the os scheduler be our fuzz 268 _ = @atomicRmw(isize, &ctx.get_sum, .Add, node.data, .SeqCst); 269 _ = @atomicRmw(usize, &ctx.get_count, .Add, 1, .SeqCst); 270 } 271 272 if (last) return 0; 273 } 274 } 275 276 test "std.atomic.Queue single-threaded" { 277 var queue = Queue(i32).init(); 278 try expect(queue.isEmpty()); 279 280 var node_0 = Queue(i32).Node{ 281 .data = 0, 282 .next = undefined, 283 .prev = undefined, 284 }; 285 queue.put(&node_0); 286 try expect(!queue.isEmpty()); 287 288 var node_1 = Queue(i32).Node{ 289 .data = 1, 290 .next = undefined, 291 .prev = undefined, 292 }; 293 queue.put(&node_1); 294 try expect(!queue.isEmpty()); 295 296 try expect(queue.get().?.data == 0); 297 try expect(!queue.isEmpty()); 298 299 var node_2 = Queue(i32).Node{ 300 .data = 2, 301 .next = undefined, 302 .prev = undefined, 303 }; 304 queue.put(&node_2); 305 try expect(!queue.isEmpty()); 306 307 var node_3 = Queue(i32).Node{ 308 .data = 3, 309 .next = undefined, 310 .prev = undefined, 311 }; 312 queue.put(&node_3); 313 try expect(!queue.isEmpty()); 314 315 try expect(queue.get().?.data == 1); 316 try expect(!queue.isEmpty()); 317 318 try expect(queue.get().?.data == 2); 319 try expect(!queue.isEmpty()); 320 321 var node_4 = Queue(i32).Node{ 322 .data = 4, 323 .next = undefined, 324 .prev = undefined, 325 }; 326 queue.put(&node_4); 327 try expect(!queue.isEmpty()); 328 329 try expect(queue.get().?.data == 3); 330 node_3.next = null; 331 try expect(!queue.isEmpty()); 332 333 try expect(queue.get().?.data == 4); 334 try expect(queue.isEmpty()); 335 336 try expect(queue.get() == null); 337 try expect(queue.isEmpty()); 338 } 339 340 test "std.atomic.Queue dump" { 341 const mem = std.mem; 342 var buffer: [1024]u8 = undefined; 343 var expected_buffer: [1024]u8 = undefined; 344 var fbs = std.io.fixedBufferStream(&buffer); 345 346 var queue = Queue(i32).init(); 347 348 // Test empty stream 349 fbs.reset(); 350 try queue.dumpToStream(fbs.writer()); 351 try expect(mem.eql(u8, buffer[0..fbs.pos], 352 \\head: (null) 353 \\tail: (null) 354 \\ 355 )); 356 357 // Test a stream with one element 358 var node_0 = Queue(i32).Node{ 359 .data = 1, 360 .next = undefined, 361 .prev = undefined, 362 }; 363 queue.put(&node_0); 364 365 fbs.reset(); 366 try queue.dumpToStream(fbs.writer()); 367 368 var expected = try std.fmt.bufPrint(expected_buffer[0..], 369 \\head: 0x{x}=1 370 \\ (null) 371 \\tail: 0x{x}=1 372 \\ (null) 373 \\ 374 , .{ @ptrToInt(queue.head), @ptrToInt(queue.tail) }); 375 try expect(mem.eql(u8, buffer[0..fbs.pos], expected)); 376 377 // Test a stream with two elements 378 var node_1 = Queue(i32).Node{ 379 .data = 2, 380 .next = undefined, 381 .prev = undefined, 382 }; 383 queue.put(&node_1); 384 385 fbs.reset(); 386 try queue.dumpToStream(fbs.writer()); 387 388 expected = try std.fmt.bufPrint(expected_buffer[0..], 389 \\head: 0x{x}=1 390 \\ 0x{x}=2 391 \\ (null) 392 \\tail: 0x{x}=2 393 \\ (null) 394 \\ 395 , .{ @ptrToInt(queue.head), @ptrToInt(queue.head.?.next), @ptrToInt(queue.tail) }); 396 try expect(mem.eql(u8, buffer[0..fbs.pos], expected)); 397 }