zig

fork of https://codeberg.org/ziglang/zig
Log | Files | Refs | README | LICENSE

blob 3b4a1411 (12145B) - Raw


      1 const std = @import("../std.zig");
      2 const builtin = @import("builtin");
      3 const assert = std.debug.assert;
      4 const expect = std.testing.expect;
      5 
      6 /// Many producer, many consumer, non-allocating, thread-safe.
      7 /// Uses a mutex to protect access.
      8 /// The queue does not manage ownership and the user is responsible to
      9 /// manage the storage of the nodes.
     10 pub fn Queue(comptime T: type) type {
     11     return struct {
     12         head: ?*Node,
     13         tail: ?*Node,
     14         mutex: std.Thread.Mutex,
     15 
     16         pub const Self = @This();
     17         pub const Node = std.TailQueue(T).Node;
     18 
     19         /// Initializes a new queue. The queue does not provide a `deinit()`
     20         /// function, so the user must take care of cleaning up the queue elements.
     21         pub fn init() Self {
     22             return Self{
     23                 .head = null,
     24                 .tail = null,
     25                 .mutex = std.Thread.Mutex{},
     26             };
     27         }
     28 
     29         /// Appends `node` to the queue.
     30         /// The lifetime of `node` must be longer than lifetime of queue.
     31         pub fn put(self: *Self, node: *Node) void {
     32             node.next = null;
     33 
     34             self.mutex.lock();
     35             defer self.mutex.unlock();
     36 
     37             node.prev = self.tail;
     38             self.tail = node;
     39             if (node.prev) |prev_tail| {
     40                 prev_tail.next = node;
     41             } else {
     42                 assert(self.head == null);
     43                 self.head = node;
     44             }
     45         }
     46 
     47         /// Gets a previously inserted node or returns `null` if there is none.
     48         /// It is safe to `get()` a node from the queue while another thread tries
     49         /// to `remove()` the same node at the same time.
     50         pub fn get(self: *Self) ?*Node {
     51             self.mutex.lock();
     52             defer self.mutex.unlock();
     53 
     54             const head = self.head orelse return null;
     55             self.head = head.next;
     56             if (head.next) |new_head| {
     57                 new_head.prev = null;
     58             } else {
     59                 self.tail = null;
     60             }
     61             // This way, a get() and a remove() are thread-safe with each other.
     62             head.prev = null;
     63             head.next = null;
     64             return head;
     65         }
     66 
     67         pub fn unget(self: *Self, node: *Node) void {
     68             node.prev = null;
     69 
     70             self.mutex.lock();
     71             defer self.mutex.unlock();
     72 
     73             const opt_head = self.head;
     74             self.head = node;
     75             if (opt_head) |head| {
     76                 head.next = node;
     77             } else {
     78                 assert(self.tail == null);
     79                 self.tail = node;
     80             }
     81         }
     82 
     83         /// Removes a node from the queue, returns whether node was actually removed.
     84         /// It is safe to `remove()` a node from the queue while another thread tries
     85         /// to `get()` the same node at the same time.
     86         pub fn remove(self: *Self, node: *Node) bool {
     87             self.mutex.lock();
     88             defer self.mutex.unlock();
     89 
     90             if (node.prev == null and node.next == null and self.head != node) {
     91                 return false;
     92             }
     93 
     94             if (node.prev) |prev| {
     95                 prev.next = node.next;
     96             } else {
     97                 self.head = node.next;
     98             }
     99             if (node.next) |next| {
    100                 next.prev = node.prev;
    101             } else {
    102                 self.tail = node.prev;
    103             }
    104             node.prev = null;
    105             node.next = null;
    106             return true;
    107         }
    108 
    109         /// Returns `true` if the queue is currently empty.
    110         /// Note that in a multi-consumer environment a return value of `false`
    111         /// does not mean that `get` will yield a non-`null` value!
    112         pub fn isEmpty(self: *Self) bool {
    113             self.mutex.lock();
    114             defer self.mutex.unlock();
    115             return self.head == null;
    116         }
    117 
    118         /// Dumps the contents of the queue to `stderr`.
    119         pub fn dump(self: *Self) void {
    120             self.dumpToStream(std.io.getStdErr().writer()) catch return;
    121         }
    122 
    123         /// Dumps the contents of the queue to `stream`.
    124         /// Up to 4 elements from the head are dumped and the tail of the queue is
    125         /// dumped as well.
    126         pub fn dumpToStream(self: *Self, stream: anytype) !void {
    127             const S = struct {
    128                 fn dumpRecursive(
    129                     s: anytype,
    130                     optional_node: ?*Node,
    131                     indent: usize,
    132                     comptime depth: comptime_int,
    133                 ) !void {
    134                     try s.writeByteNTimes(' ', indent);
    135                     if (optional_node) |node| {
    136                         try s.print("0x{x}={}\n", .{ @ptrToInt(node), node.data });
    137                         if (depth == 0) {
    138                             try s.print("(max depth)\n", .{});
    139                             return;
    140                         }
    141                         try dumpRecursive(s, node.next, indent + 1, depth - 1);
    142                     } else {
    143                         try s.print("(null)\n", .{});
    144                     }
    145                 }
    146             };
    147             self.mutex.lock();
    148             defer self.mutex.unlock();
    149 
    150             try stream.print("head: ", .{});
    151             try S.dumpRecursive(stream, self.head, 0, 4);
    152             try stream.print("tail: ", .{});
    153             try S.dumpRecursive(stream, self.tail, 0, 4);
    154         }
    155     };
    156 }
    157 
    158 const Context = struct {
    159     allocator: std.mem.Allocator,
    160     queue: *Queue(i32),
    161     put_sum: isize,
    162     get_sum: isize,
    163     get_count: usize,
    164     puts_done: bool,
    165 };
    166 
    167 // TODO add lazy evaluated build options and then put puts_per_thread behind
    168 // some option such as: "AggressiveMultithreadedFuzzTest". In the AppVeyor
    169 // CI we would use a less aggressive setting since at 1 core, while we still
    170 // want this test to pass, we need a smaller value since there is so much thrashing
    171 // we would also use a less aggressive setting when running in valgrind
    172 const puts_per_thread = 500;
    173 const put_thread_count = 3;
    174 
    175 test "std.atomic.Queue" {
    176     var plenty_of_memory = try std.heap.page_allocator.alloc(u8, 300 * 1024);
    177     defer std.heap.page_allocator.free(plenty_of_memory);
    178 
    179     var fixed_buffer_allocator = std.heap.FixedBufferAllocator.init(plenty_of_memory);
    180     var a = fixed_buffer_allocator.getThreadSafeAllocator();
    181 
    182     var queue = Queue(i32).init();
    183     var context = Context{
    184         .allocator = a,
    185         .queue = &queue,
    186         .put_sum = 0,
    187         .get_sum = 0,
    188         .puts_done = false,
    189         .get_count = 0,
    190     };
    191 
    192     if (builtin.single_threaded) {
    193         try expect(context.queue.isEmpty());
    194         {
    195             var i: usize = 0;
    196             while (i < put_thread_count) : (i += 1) {
    197                 try expect(startPuts(&context) == 0);
    198             }
    199         }
    200         try expect(!context.queue.isEmpty());
    201         context.puts_done = true;
    202         {
    203             var i: usize = 0;
    204             while (i < put_thread_count) : (i += 1) {
    205                 try expect(startGets(&context) == 0);
    206             }
    207         }
    208         try expect(context.queue.isEmpty());
    209     } else {
    210         try expect(context.queue.isEmpty());
    211 
    212         var putters: [put_thread_count]std.Thread = undefined;
    213         for (putters) |*t| {
    214             t.* = try std.Thread.spawn(.{}, startPuts, .{&context});
    215         }
    216         var getters: [put_thread_count]std.Thread = undefined;
    217         for (getters) |*t| {
    218             t.* = try std.Thread.spawn(.{}, startGets, .{&context});
    219         }
    220 
    221         for (putters) |t|
    222             t.join();
    223         @atomicStore(bool, &context.puts_done, true, .SeqCst);
    224         for (getters) |t|
    225             t.join();
    226 
    227         try expect(context.queue.isEmpty());
    228     }
    229 
    230     if (context.put_sum != context.get_sum) {
    231         std.debug.panic("failure\nput_sum:{} != get_sum:{}", .{ context.put_sum, context.get_sum });
    232     }
    233 
    234     if (context.get_count != puts_per_thread * put_thread_count) {
    235         std.debug.panic("failure\nget_count:{} != puts_per_thread:{} * put_thread_count:{}", .{
    236             context.get_count,
    237             @as(u32, puts_per_thread),
    238             @as(u32, put_thread_count),
    239         });
    240     }
    241 }
    242 
    243 fn startPuts(ctx: *Context) u8 {
    244     var put_count: usize = puts_per_thread;
    245     var prng = std.rand.DefaultPrng.init(0xdeadbeef);
    246     const random = prng.random();
    247     while (put_count != 0) : (put_count -= 1) {
    248         std.time.sleep(1); // let the os scheduler be our fuzz
    249         const x = @bitCast(i32, random.int(u32));
    250         const node = ctx.allocator.create(Queue(i32).Node) catch unreachable;
    251         node.* = .{
    252             .prev = undefined,
    253             .next = undefined,
    254             .data = x,
    255         };
    256         ctx.queue.put(node);
    257         _ = @atomicRmw(isize, &ctx.put_sum, .Add, x, .SeqCst);
    258     }
    259     return 0;
    260 }
    261 
    262 fn startGets(ctx: *Context) u8 {
    263     while (true) {
    264         const last = @atomicLoad(bool, &ctx.puts_done, .SeqCst);
    265 
    266         while (ctx.queue.get()) |node| {
    267             std.time.sleep(1); // let the os scheduler be our fuzz
    268             _ = @atomicRmw(isize, &ctx.get_sum, .Add, node.data, .SeqCst);
    269             _ = @atomicRmw(usize, &ctx.get_count, .Add, 1, .SeqCst);
    270         }
    271 
    272         if (last) return 0;
    273     }
    274 }
    275 
    276 test "std.atomic.Queue single-threaded" {
    277     var queue = Queue(i32).init();
    278     try expect(queue.isEmpty());
    279 
    280     var node_0 = Queue(i32).Node{
    281         .data = 0,
    282         .next = undefined,
    283         .prev = undefined,
    284     };
    285     queue.put(&node_0);
    286     try expect(!queue.isEmpty());
    287 
    288     var node_1 = Queue(i32).Node{
    289         .data = 1,
    290         .next = undefined,
    291         .prev = undefined,
    292     };
    293     queue.put(&node_1);
    294     try expect(!queue.isEmpty());
    295 
    296     try expect(queue.get().?.data == 0);
    297     try expect(!queue.isEmpty());
    298 
    299     var node_2 = Queue(i32).Node{
    300         .data = 2,
    301         .next = undefined,
    302         .prev = undefined,
    303     };
    304     queue.put(&node_2);
    305     try expect(!queue.isEmpty());
    306 
    307     var node_3 = Queue(i32).Node{
    308         .data = 3,
    309         .next = undefined,
    310         .prev = undefined,
    311     };
    312     queue.put(&node_3);
    313     try expect(!queue.isEmpty());
    314 
    315     try expect(queue.get().?.data == 1);
    316     try expect(!queue.isEmpty());
    317 
    318     try expect(queue.get().?.data == 2);
    319     try expect(!queue.isEmpty());
    320 
    321     var node_4 = Queue(i32).Node{
    322         .data = 4,
    323         .next = undefined,
    324         .prev = undefined,
    325     };
    326     queue.put(&node_4);
    327     try expect(!queue.isEmpty());
    328 
    329     try expect(queue.get().?.data == 3);
    330     node_3.next = null;
    331     try expect(!queue.isEmpty());
    332 
    333     try expect(queue.get().?.data == 4);
    334     try expect(queue.isEmpty());
    335 
    336     try expect(queue.get() == null);
    337     try expect(queue.isEmpty());
    338 }
    339 
    340 test "std.atomic.Queue dump" {
    341     const mem = std.mem;
    342     var buffer: [1024]u8 = undefined;
    343     var expected_buffer: [1024]u8 = undefined;
    344     var fbs = std.io.fixedBufferStream(&buffer);
    345 
    346     var queue = Queue(i32).init();
    347 
    348     // Test empty stream
    349     fbs.reset();
    350     try queue.dumpToStream(fbs.writer());
    351     try expect(mem.eql(u8, buffer[0..fbs.pos],
    352         \\head: (null)
    353         \\tail: (null)
    354         \\
    355     ));
    356 
    357     // Test a stream with one element
    358     var node_0 = Queue(i32).Node{
    359         .data = 1,
    360         .next = undefined,
    361         .prev = undefined,
    362     };
    363     queue.put(&node_0);
    364 
    365     fbs.reset();
    366     try queue.dumpToStream(fbs.writer());
    367 
    368     var expected = try std.fmt.bufPrint(expected_buffer[0..],
    369         \\head: 0x{x}=1
    370         \\ (null)
    371         \\tail: 0x{x}=1
    372         \\ (null)
    373         \\
    374     , .{ @ptrToInt(queue.head), @ptrToInt(queue.tail) });
    375     try expect(mem.eql(u8, buffer[0..fbs.pos], expected));
    376 
    377     // Test a stream with two elements
    378     var node_1 = Queue(i32).Node{
    379         .data = 2,
    380         .next = undefined,
    381         .prev = undefined,
    382     };
    383     queue.put(&node_1);
    384 
    385     fbs.reset();
    386     try queue.dumpToStream(fbs.writer());
    387 
    388     expected = try std.fmt.bufPrint(expected_buffer[0..],
    389         \\head: 0x{x}=1
    390         \\ 0x{x}=2
    391         \\  (null)
    392         \\tail: 0x{x}=2
    393         \\ (null)
    394         \\
    395     , .{ @ptrToInt(queue.head), @ptrToInt(queue.head.?.next), @ptrToInt(queue.tail) });
    396     try expect(mem.eql(u8, buffer[0..fbs.pos], expected));
    397 }