zig

fork of https://codeberg.org/ziglang/zig
Log | Files | Refs | README | LICENSE

blob 0ecfdc02 (20130B) - Raw


//! Futex is a mechanism used to block (`wait`) and unblock (`wake`) threads using a 32-bit memory address as a hint.
//! Blocking a thread is acknowledged only if the value at the 32-bit memory address is equal to a given value.
//! This check helps avoid block/unblock deadlocks which occur if a `wake()` happens before a `wait()`.
//! Using Futex, other thread synchronization primitives can be built which efficiently wait for cross-thread events or signals.
      5 
      6 const std = @import("../std.zig");
      7 const builtin = @import("builtin");
      8 const Futex = @This();
      9 
     10 const target = builtin.target;
     11 const single_threaded = builtin.single_threaded;
     12 
     13 const assert = std.debug.assert;
     14 const testing = std.testing;
     15 
     16 const Atomic = std.atomic.Atomic;
     17 const spinLoopHint = std.atomic.spinLoopHint;
     18 
     19 /// Checks if `ptr` still contains the value `expect` and, if so, blocks the caller until either:
     20 /// - The value at `ptr` is no longer equal to `expect`.
     21 /// - The caller is unblocked by a matching `wake()`.
     22 /// - The caller is unblocked spuriously by an arbitrary internal signal.
     23 /// 
     24 /// If `timeout` is provided, and the caller is blocked for longer than `timeout` nanoseconds`, `error.TimedOut` is returned.
     25 ///
     26 /// The checking of `ptr` and `expect`, along with blocking the caller, is done atomically
     27 /// and totally ordered (sequentially consistent) with respect to other wait()/wake() calls on the same `ptr`.
     28 pub fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{TimedOut}!void {
     29     if (single_threaded) {
     30         // check whether the caller should block
     31         if (ptr.loadUnchecked() != expect) {
     32             return;
     33         }
     34 
     35         // There are no other threads which could notify the caller on single_threaded.
     36         // Therefor a wait() without a timeout would block indefinitely.
     37         const timeout_ns = timeout orelse {
     38             @panic("deadlock");
     39         };
     40 
     41         // Simulate blocking with the timeout knowing that:
     42         // - no other thread can change the ptr value
     43         // - no other thread could unblock us if we waiting on the ptr
     44         std.time.sleep(timeout_ns);
     45         return error.TimedOut;
     46     }
     47 
     48     // Avoid calling into the OS for no-op waits()
     49     if (timeout) |timeout_ns| {
     50         if (timeout_ns == 0) {
     51             if (ptr.load(.SeqCst) != expect) return;
     52             return error.TimedOut;
     53         }
     54     }
     55 
     56     return OsFutex.wait(ptr, expect, timeout);
     57 }
     58 
     59 /// Unblocks at most `num_waiters` callers blocked in a `wait()` call on `ptr`.
     60 /// `num_waiters` of 1 unblocks at most one `wait(ptr, ...)` and `maxInt(u32)` unblocks effectively all `wait(ptr, ...)`.
     61 pub fn wake(ptr: *const Atomic(u32), num_waiters: u32) void {
     62     if (single_threaded) return;
     63     if (num_waiters == 0) return;
     64 
     65     return OsFutex.wake(ptr, num_waiters);
     66 }
     67 
     68 const OsFutex = if (target.os.tag == .windows)
     69     WindowsFutex
     70 else if (target.os.tag == .linux)
     71     LinuxFutex
     72 else if (target.isDarwin())
     73     DarwinFutex
     74 else if (builtin.link_libc)
     75     PosixFutex
     76 else
     77     UnsupportedFutex;
     78 
     79 const UnsupportedFutex = struct {
     80     fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{TimedOut}!void {
     81         return unsupported(.{ ptr, expect, timeout });
     82     }
     83 
     84     fn wake(ptr: *const Atomic(u32), num_waiters: u32) void {
     85         return unsupported(.{ ptr, num_waiters });
     86     }
     87 
     88     fn unsupported(unused: anytype) noreturn {
     89         @compileLog("Unsupported operating system", target.os.tag);
     90         _ = unused;
     91         unreachable;
     92     }
     93 };
     94 
     95 const WindowsFutex = struct {
     96     const windows = std.os.windows;
     97 
     98     fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{TimedOut}!void {
     99         var timeout_value: windows.LARGE_INTEGER = undefined;
    100         var timeout_ptr: ?*const windows.LARGE_INTEGER = null;
    101 
    102         // NTDLL functions work with time in units of 100 nanoseconds.
    103         // Positive values for timeouts are absolute time while negative is relative.
    104         if (timeout) |timeout_ns| {
    105             timeout_ptr = &timeout_value;
    106             timeout_value = -@intCast(windows.LARGE_INTEGER, timeout_ns / 100);
    107         }
    108 
    109         switch (windows.ntdll.RtlWaitOnAddress(
    110             @ptrCast(?*const c_void, ptr),
    111             @ptrCast(?*const c_void, &expect),
    112             @sizeOf(@TypeOf(expect)),
    113             timeout_ptr,
    114         )) {
    115             .SUCCESS => {},
    116             .TIMEOUT => return error.TimedOut,
    117             else => unreachable,
    118         }
    119     }
    120 
    121     fn wake(ptr: *const Atomic(u32), num_waiters: u32) void {
    122         const address = @ptrCast(?*const c_void, ptr);
    123         switch (num_waiters) {
    124             1 => windows.ntdll.RtlWakeAddressSingle(address),
    125             else => windows.ntdll.RtlWakeAddressAll(address),
    126         }
    127     }
    128 };
    129 
    130 const LinuxFutex = struct {
    131     const linux = std.os.linux;
    132 
    133     fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{TimedOut}!void {
    134         var ts: std.os.timespec = undefined;
    135         var ts_ptr: ?*std.os.timespec = null;
    136 
    137         // Futex timespec timeout is already in relative time.
    138         if (timeout) |timeout_ns| {
    139             ts_ptr = &ts;
    140             ts.tv_sec = @intCast(@TypeOf(ts.tv_sec), timeout_ns / std.time.ns_per_s);
    141             ts.tv_nsec = @intCast(@TypeOf(ts.tv_nsec), timeout_ns % std.time.ns_per_s);
    142         }
    143 
    144         switch (linux.getErrno(linux.futex_wait(
    145             @ptrCast(*const i32, ptr),
    146             linux.FUTEX.PRIVATE_FLAG | linux.FUTEX.WAIT,
    147             @bitCast(i32, expect),
    148             ts_ptr,
    149         ))) {
    150             .SUCCESS => {}, // notified by `wake()`
    151             .INTR => {}, // spurious wakeup
    152             .AGAIN => {}, // ptr.* != expect
    153             .TIMEDOUT => return error.TimedOut,
    154             .INVAL => {}, // possibly timeout overflow
    155             .FAULT => unreachable,
    156             else => unreachable,
    157         }
    158     }
    159 
    160     fn wake(ptr: *const Atomic(u32), num_waiters: u32) void {
    161         switch (linux.getErrno(linux.futex_wake(
    162             @ptrCast(*const i32, ptr),
    163             linux.FUTEX.PRIVATE_FLAG | linux.FUTEX.WAKE,
    164             std.math.cast(i32, num_waiters) catch std.math.maxInt(i32),
    165         ))) {
    166             .SUCCESS => {}, // successful wake up
    167             .INVAL => {}, // invalid futex_wait() on ptr done elsewhere
    168             .FAULT => {}, // pointer became invalid while doing the wake
    169             else => unreachable,
    170         }
    171     }
    172 };
    173 
    174 const DarwinFutex = struct {
    175     const darwin = std.os.darwin;
    176 
    177     fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{TimedOut}!void {
    178         // Darwin XNU 7195.50.7.100.1 introduced __ulock_wait2 and migrated code paths (notably pthread_cond_t) towards it:
    179         // https://github.com/apple/darwin-xnu/commit/d4061fb0260b3ed486147341b72468f836ed6c8f#diff-08f993cc40af475663274687b7c326cc6c3031e0db3ac8de7b24624610616be6
    180         //
    181         // This XNU version appears to correspond to 11.0.1:
    182         // https://kernelshaman.blogspot.com/2021/01/building-xnu-for-macos-big-sur-1101.html
    183         //
    184         // ulock_wait() uses 32-bit micro-second timeouts where 0 = INFINITE or no-timeout
    185         // ulock_wait2() uses 64-bit nano-second timeouts (with the same convention)
    186         var timeout_ns: u64 = 0;
    187         if (timeout) |timeout_value| {
    188             // This should be checked by the caller.
    189             assert(timeout_value != 0);
    190             timeout_ns = timeout_value;
    191         }
    192         const addr = @ptrCast(*const c_void, ptr);
    193         const flags = darwin.UL_COMPARE_AND_WAIT | darwin.ULF_NO_ERRNO;
    194         // If we're using `__ulock_wait` and `timeout` is too big to fit inside a `u32` count of
    195         // micro-seconds (around 70min), we'll request a shorter timeout. This is fine (users
    196         // should handle spurious wakeups), but we need to remember that we did so, so that
    197         // we don't return `TimedOut` incorrectly. If that happens, we set this variable to
    198         // true so that we we know to ignore the ETIMEDOUT result.
    199         var timeout_overflowed = false;
    200         const status = blk: {
    201             if (target.os.version_range.semver.min.major >= 11) {
    202                 break :blk darwin.__ulock_wait2(flags, addr, expect, timeout_ns, 0);
    203             } else {
    204                 const timeout_us = std.math.cast(u32, timeout_ns / std.time.ns_per_us) catch overflow: {
    205                     timeout_overflowed = true;
    206                     break :overflow std.math.maxInt(u32);
    207                 };
    208                 break :blk darwin.__ulock_wait(flags, addr, expect, timeout_us);
    209             }
    210         };
    211 
    212         if (status >= 0) return;
    213         switch (@intToEnum(std.os.E, -status)) {
    214             .INTR => {},
    215             // Address of the futex is paged out. This is unlikely, but possible in theory, and
    216             // pthread/libdispatch on darwin bother to handle it. In this case we'll return
    217             // without waiting, but the caller should retry anyway.
    218             .FAULT => {},
    219             .TIMEDOUT => if (!timeout_overflowed) return error.TimedOut,
    220             else => unreachable,
    221         }
    222     }
    223 
    224     fn wake(ptr: *const Atomic(u32), num_waiters: u32) void {
    225         var flags: u32 = darwin.UL_COMPARE_AND_WAIT | darwin.ULF_NO_ERRNO;
    226         if (num_waiters > 1) {
    227             flags |= darwin.ULF_WAKE_ALL;
    228         }
    229 
    230         while (true) {
    231             const addr = @ptrCast(*const c_void, ptr);
    232             const status = darwin.__ulock_wake(flags, addr, 0);
    233 
    234             if (status >= 0) return;
    235             switch (@intToEnum(std.os.E, -status)) {
    236                 .INTR => continue, // spurious wake()
    237                 .FAULT => continue, // address of the lock was paged out
    238                 .NOENT => return, // nothing was woken up
    239                 .ALREADY => unreachable, // only for ULF_WAKE_THREAD
    240                 else => unreachable,
    241             }
    242         }
    243     }
    244 };
    245 
/// Futex backend built on pthread mutexes and condition variables.
/// `OsFutex` selects it when libc is linked and the target is not Windows, Linux or Darwin.
///
/// Waiters are stack-allocated list nodes that get queued in one of a fixed set of buckets,
/// chosen from the futex address. Each bucket has its own mutex protecting its waiter list.
const PosixFutex = struct {
    /// Blocks while `ptr` holds `expect`, until a `wake()` dequeues this waiter or `timeout` elapses.
    /// Returns `error.TimedOut` only if the waiter was still queued when the timeout fired.
    fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{TimedOut}!void {
        const address = @ptrToInt(ptr);
        const bucket = Bucket.from(address);
        var waiter: List.Node = undefined;

        {
            // The value check and the enqueue both happen under the bucket mutex, which wake()
            // also holds while scanning the list, so the two cannot interleave.
            assert(std.c.pthread_mutex_lock(&bucket.mutex) == .SUCCESS);
            defer assert(std.c.pthread_mutex_unlock(&bucket.mutex) == .SUCCESS);

            // Value already changed: nothing to wait for (the waiter node was never initialized).
            if (ptr.load(.SeqCst) != expect) {
                return;
            }

            waiter.data = .{ .address = address };
            bucket.list.prepend(&waiter);
        }

        var timed_out = false;
        waiter.data.wait(timeout) catch {
            // Runs last (defers execute in reverse order), i.e. after the bucket mutex is released:
            // if a wake() already dequeued us, block until its notify() arrives.
            // NOTE(review): presumably this keeps the stack-allocated node alive until the
            // waker is done touching it — confirm.
            defer if (!timed_out) {
                waiter.data.wait(null) catch unreachable;
            };

            assert(std.c.pthread_mutex_lock(&bucket.mutex) == .SUCCESS);
            defer assert(std.c.pthread_mutex_unlock(&bucket.mutex) == .SUCCESS);

            // wake() sets `address` to null when it dequeues a waiter. If it is still our
            // address, nobody claimed us: unlink ourselves and report the timeout.
            if (waiter.data.address == address) {
                timed_out = true;
                bucket.list.remove(&waiter);
            }
        };

        waiter.data.deinit();
        if (timed_out) {
            return error.TimedOut;
        }
    }

    /// Dequeues up to `num_waiters` waiters registered for `ptr` and notifies each of them.
    fn wake(ptr: *const Atomic(u32), num_waiters: u32) void {
        const address = @ptrToInt(ptr);
        const bucket = Bucket.from(address);
        var can_notify = num_waiters;

        // Dequeued waiters are collected here and notified by this defer. It is declared before
        // the unlock defer below, so it runs after the bucket mutex has been released.
        var notified = List{};
        defer while (notified.popFirst()) |waiter| {
            waiter.data.notify();
        };

        assert(std.c.pthread_mutex_lock(&bucket.mutex) == .SUCCESS);
        defer assert(std.c.pthread_mutex_unlock(&bucket.mutex) == .SUCCESS);

        var waiters = bucket.list.first;
        while (waiters) |waiter| {
            assert(waiter.data.address != null);
            // Grab the successor first: the node may be unlinked from the bucket list below.
            waiters = waiter.next;

            // A bucket is shared by every address mapping to the same slot; skip unrelated waiters.
            if (waiter.data.address != address) continue;
            if (can_notify == 0) break;
            can_notify -= 1;

            bucket.list.remove(waiter);
            // Marks the waiter as claimed so a concurrent timeout in wait() does not report TimedOut.
            waiter.data.address = null;
            notified.prepend(waiter);
        }
    }

    /// A mutex-protected list of waiters shared by all addresses that map to the same slot.
    const Bucket = struct {
        mutex: std.c.pthread_mutex_t = .{},
        list: List = .{},

        /// Fixed, statically-initialized table of 64 buckets.
        var buckets = [_]Bucket{.{}} ** 64;

        /// Returns the bucket for `address`, picked by `address` modulo the table size.
        fn from(address: usize) *Bucket {
            return &buckets[address % buckets.len];
        }
    };

    /// Intrusive waiter list. Each node carries the address being waited on plus a
    /// mutex/condvar/state triple used to hand the notification to the waiting thread.
    const List = std.TailQueue(struct {
        /// Address being waited on; set to null by `PosixFutex.wake()` once the waiter is dequeued.
        address: ?usize,
        state: State = .empty,
        cond: std.c.pthread_cond_t = .{},
        mutex: std.c.pthread_mutex_t = .{},

        const Self = @This();
        /// - `empty`: nobody is blocked in `wait()` and no notification is pending.
        /// - `waiting`: a thread is blocked in `wait()`.
        /// - `notified`: `notify()` was called; the current or next `wait()` returns immediately.
        const State = enum {
            empty,
            waiting,
            notified,
        };

        /// Destroys the condition variable and mutex; their return codes are ignored.
        fn deinit(self: *Self) void {
            _ = std.c.pthread_cond_destroy(&self.cond);
            _ = std.c.pthread_mutex_destroy(&self.mutex);
        }

        /// Blocks until `notify()` has been called, or returns `error.TimedOut` once `timeout`
        /// nanoseconds have elapsed. On timeout the state is reset to `empty` so that
        /// `wait()` can be called again on the same node.
        fn wait(self: *Self, timeout: ?u64) error{TimedOut}!void {
            assert(std.c.pthread_mutex_lock(&self.mutex) == .SUCCESS);
            defer assert(std.c.pthread_mutex_unlock(&self.mutex) == .SUCCESS);

            switch (self.state) {
                .empty => self.state = .waiting,
                .waiting => unreachable,
                // notify() arrived before we started waiting.
                .notified => return,
            }

            // pthread_cond_timedwait takes an absolute deadline, so convert the relative
            // timeout into "now + timeout" on the realtime clock, normalizing tv_nsec.
            var ts: std.os.timespec = undefined;
            var ts_ptr: ?*const std.os.timespec = null;
            if (timeout) |timeout_ns| {
                ts_ptr = &ts;
                std.os.clock_gettime(std.os.CLOCK.REALTIME, &ts) catch unreachable;
                ts.tv_sec += @intCast(@TypeOf(ts.tv_sec), timeout_ns / std.time.ns_per_s);
                ts.tv_nsec += @intCast(@TypeOf(ts.tv_nsec), timeout_ns % std.time.ns_per_s);
                if (ts.tv_nsec >= std.time.ns_per_s) {
                    ts.tv_sec += 1;
                    ts.tv_nsec -= std.time.ns_per_s;
                }
            }

            // Re-check the state after every wakeup; only `notified` ends the wait successfully.
            while (true) {
                switch (self.state) {
                    .empty => unreachable,
                    .waiting => {},
                    .notified => return,
                }

                // No deadline: wait indefinitely and loop back to re-check the state.
                const ts_ref = ts_ptr orelse {
                    assert(std.c.pthread_cond_wait(&self.cond, &self.mutex) == .SUCCESS);
                    continue;
                };

                const rc = std.c.pthread_cond_timedwait(&self.cond, &self.mutex, ts_ref);
                switch (rc) {
                    .SUCCESS => {},
                    .TIMEDOUT => {
                        self.state = .empty;
                        return error.TimedOut;
                    },
                    else => unreachable,
                }
            }
        }

        /// Marks the node as notified and, if a thread is currently blocked in `wait()`, signals it.
        /// Must be called at most once per node (a second call hits `unreachable`).
        fn notify(self: *Self) void {
            assert(std.c.pthread_mutex_lock(&self.mutex) == .SUCCESS);
            defer assert(std.c.pthread_mutex_unlock(&self.mutex) == .SUCCESS);

            switch (self.state) {
                // No thread is blocked right now; the next wait() will see `notified` and return.
                .empty => self.state = .notified,
                .waiting => {
                    self.state = .notified;
                    assert(std.c.pthread_cond_signal(&self.cond) == .SUCCESS);
                },
                .notified => unreachable,
            }
        }
    });
};
    404 
    405 test "Futex - wait/wake" {
    406     var value = Atomic(u32).init(0);
    407     Futex.wait(&value, 1, null) catch unreachable;
    408 
    409     const wait_noop_result = Futex.wait(&value, 0, 0);
    410     try testing.expectError(error.TimedOut, wait_noop_result);
    411 
    412     const wait_longer_result = Futex.wait(&value, 0, std.time.ns_per_ms);
    413     try testing.expectError(error.TimedOut, wait_longer_result);
    414 
    415     Futex.wake(&value, 0);
    416     Futex.wake(&value, 1);
    417     Futex.wake(&value, std.math.maxInt(u32));
    418 }
    419 
    420 test "Futex - Signal" {
    421     if (single_threaded) {
    422         return error.SkipZigTest;
    423     }
    424 
    425     const Paddle = struct {
    426         value: Atomic(u32) = Atomic(u32).init(0),
    427         current: u32 = 0,
    428 
    429         fn run(self: *@This(), hit_to: *@This()) !void {
    430             var iterations: usize = 4;
    431             while (iterations > 0) : (iterations -= 1) {
    432                 var value: u32 = undefined;
    433                 while (true) {
    434                     value = self.value.load(.Acquire);
    435                     if (value != self.current) break;
    436                     Futex.wait(&self.value, self.current, null) catch unreachable;
    437                 }
    438 
    439                 try testing.expectEqual(value, self.current + 1);
    440                 self.current = value;
    441 
    442                 _ = hit_to.value.fetchAdd(1, .Release);
    443                 Futex.wake(&hit_to.value, 1);
    444             }
    445         }
    446     };
    447 
    448     var ping = Paddle{};
    449     var pong = Paddle{};
    450 
    451     const t1 = try std.Thread.spawn(.{}, Paddle.run, .{ &ping, &pong });
    452     defer t1.join();
    453 
    454     const t2 = try std.Thread.spawn(.{}, Paddle.run, .{ &pong, &ping });
    455     defer t2.join();
    456 
    457     _ = ping.value.fetchAdd(1, .Release);
    458     Futex.wake(&ping.value, 1);
    459 }
    460 
    461 test "Futex - Broadcast" {
    462     if (single_threaded) {
    463         return error.SkipZigTest;
    464     }
    465 
    466     const Context = struct {
    467         threads: [4]std.Thread = undefined,
    468         broadcast: Atomic(u32) = Atomic(u32).init(0),
    469         notified: Atomic(usize) = Atomic(usize).init(0),
    470 
    471         const BROADCAST_EMPTY = 0;
    472         const BROADCAST_SENT = 1;
    473         const BROADCAST_RECEIVED = 2;
    474 
    475         fn runSender(self: *@This()) !void {
    476             self.broadcast.store(BROADCAST_SENT, .Monotonic);
    477             Futex.wake(&self.broadcast, @intCast(u32, self.threads.len));
    478 
    479             while (true) {
    480                 const broadcast = self.broadcast.load(.Acquire);
    481                 if (broadcast == BROADCAST_RECEIVED) break;
    482                 try testing.expectEqual(broadcast, BROADCAST_SENT);
    483                 Futex.wait(&self.broadcast, broadcast, null) catch unreachable;
    484             }
    485         }
    486 
    487         fn runReceiver(self: *@This()) void {
    488             while (true) {
    489                 const broadcast = self.broadcast.load(.Acquire);
    490                 if (broadcast == BROADCAST_SENT) break;
    491                 assert(broadcast == BROADCAST_EMPTY);
    492                 Futex.wait(&self.broadcast, broadcast, null) catch unreachable;
    493             }
    494 
    495             const notified = self.notified.fetchAdd(1, .Monotonic);
    496             if (notified + 1 == self.threads.len) {
    497                 self.broadcast.store(BROADCAST_RECEIVED, .Release);
    498                 Futex.wake(&self.broadcast, 1);
    499             }
    500         }
    501     };
    502 
    503     var ctx = Context{};
    504     for (ctx.threads) |*thread|
    505         thread.* = try std.Thread.spawn(.{}, Context.runReceiver, .{&ctx});
    506     defer for (ctx.threads) |thread|
    507         thread.join();
    508 
    509     // Try to wait for the threads to start before running runSender().
    510     // NOTE: not actually needed for correctness.
    511     std.time.sleep(16 * std.time.ns_per_ms);
    512     try ctx.runSender();
    513 
    514     const notified = ctx.notified.load(.Monotonic);
    515     try testing.expectEqual(notified, ctx.threads.len);
    516 }
    517 
    518 test "Futex - Chain" {
    519     if (single_threaded) {
    520         return error.SkipZigTest;
    521     }
    522 
    523     const Signal = struct {
    524         value: Atomic(u32) = Atomic(u32).init(0),
    525 
    526         fn wait(self: *@This()) void {
    527             while (true) {
    528                 const value = self.value.load(.Acquire);
    529                 if (value == 1) break;
    530                 assert(value == 0);
    531                 Futex.wait(&self.value, 0, null) catch unreachable;
    532             }
    533         }
    534 
    535         fn notify(self: *@This()) void {
    536             assert(self.value.load(.Unordered) == 0);
    537             self.value.store(1, .Release);
    538             Futex.wake(&self.value, 1);
    539         }
    540     };
    541 
    542     const Context = struct {
    543         completed: Signal = .{},
    544         threads: [4]struct {
    545             thread: std.Thread,
    546             signal: Signal,
    547         } = undefined,
    548 
    549         fn run(self: *@This(), index: usize) void {
    550             const this_signal = &self.threads[index].signal;
    551 
    552             var next_signal = &self.completed;
    553             if (index + 1 < self.threads.len) {
    554                 next_signal = &self.threads[index + 1].signal;
    555             }
    556 
    557             this_signal.wait();
    558             next_signal.notify();
    559         }
    560     };
    561 
    562     var ctx = Context{};
    563     for (ctx.threads) |*entry, index| {
    564         entry.signal = .{};
    565         entry.thread = try std.Thread.spawn(.{}, Context.run, .{ &ctx, index });
    566     }
    567 
    568     ctx.threads[0].signal.notify();
    569     ctx.completed.wait();
    570 
    571     for (ctx.threads) |entry| {
    572         entry.thread.join();
    573     }
    574 }