blob 0ecfdc02 (20130B) - Raw
//! Futex is a mechanism used to block (`wait`) and unblock (`wake`) threads using a 32bit memory address as hints.
//! Blocking a thread is acknowledged only if the 32bit memory address is equal to a given value.
//! This check helps avoid block/unblock deadlocks which occur if a `wake()` happens before a `wait()`.
//! Using Futex, other Thread synchronization primitives can be built which efficiently wait for cross-thread events or signals.

const std = @import("../std.zig");
const builtin = @import("builtin");
const Futex = @This();

const target = builtin.target;
const single_threaded = builtin.single_threaded;

const assert = std.debug.assert;
const testing = std.testing;

const Atomic = std.atomic.Atomic;
const spinLoopHint = std.atomic.spinLoopHint;

/// Checks if `ptr` still contains the value `expect` and, if so, blocks the caller until either:
/// - The value at `ptr` is no longer equal to `expect`.
/// - The caller is unblocked by a matching `wake()`.
/// - The caller is unblocked spuriously by an arbitrary internal signal.
///
/// If `timeout` is provided, and the caller is blocked for longer than `timeout` nanoseconds, `error.TimedOut` is returned.
/// A `timeout` of 0 never blocks: it returns `error.TimedOut` if `ptr` still holds `expect`.
///
/// The checking of `ptr` and `expect`, along with blocking the caller, is done atomically
/// and totally ordered (sequentially consistent) with respect to other wait()/wake() calls on the same `ptr`.
pub fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{TimedOut}!void {
    if (single_threaded) {
        // check whether the caller should block
        if (ptr.loadUnchecked() != expect) {
            return;
        }

        // There are no other threads which could notify the caller on single_threaded.
        // Therefore a wait() without a timeout would block indefinitely.
        const timeout_ns = timeout orelse {
            @panic("deadlock");
        };

        // Simulate blocking with the timeout knowing that:
        // - no other thread can change the ptr value
        // - no other thread could unblock us if we are waiting on the ptr
        std.time.sleep(timeout_ns);
        return error.TimedOut;
    }

    // Avoid calling into the OS for no-op waits()
    if (timeout) |timeout_ns| {
        if (timeout_ns == 0) {
            if (ptr.load(.SeqCst) != expect) return;
            return error.TimedOut;
        }
    }

    return OsFutex.wait(ptr, expect, timeout);
}

/// Unblocks at most `num_waiters` callers blocked in a `wait()` call on `ptr`.
/// `num_waiters` of 1 unblocks at most one `wait(ptr, ...)` and `maxInt(u32)` unblocks effectively all `wait(ptr, ...)`.
/// A `num_waiters` of 0, or a single-threaded build, makes this a no-op.
pub fn wake(ptr: *const Atomic(u32), num_waiters: u32) void {
    if (single_threaded) return;
    if (num_waiters == 0) return;

    return OsFutex.wake(ptr, num_waiters);
}

/// Backend selected at compile time from the target operating system.
const OsFutex = if (target.os.tag == .windows)
    WindowsFutex
else if (target.os.tag == .linux)
    LinuxFutex
else if (target.isDarwin())
    DarwinFutex
else if (builtin.link_libc)
    PosixFutex
else
    UnsupportedFutex;

/// Placeholder backend for targets without an implementation.
/// Referencing either function is a compile error naming the target OS.
const UnsupportedFutex = struct {
    fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{TimedOut}!void {
        return unsupported(.{ ptr, expect, timeout });
    }

    fn wake(ptr: *const Atomic(u32), num_waiters: u32) void {
        return unsupported(.{ ptr, num_waiters });
    }

    fn unsupported(unused: anytype) noreturn {
        _ = unused;
        @compileError("Unsupported operating system " ++ @tagName(target.os.tag));
    }
};

/// Backend built on ntdll's `RtlWaitOnAddress` / `RtlWakeAddressSingle` / `RtlWakeAddressAll`.
const WindowsFutex = struct {
    const windows = std.os.windows;

    fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{TimedOut}!void {
        var timeout_value: windows.LARGE_INTEGER = undefined;
        var timeout_ptr: ?*const windows.LARGE_INTEGER = null;

        // NTDLL functions work with time in units of 100 nanoseconds.
        // Positive values for timeouts are absolute time while negative is relative.
        if (timeout) |timeout_ns| {
            timeout_ptr = &timeout_value;
            timeout_value = -@intCast(windows.LARGE_INTEGER, timeout_ns / 100);
        }

        switch (windows.ntdll.RtlWaitOnAddress(
            @ptrCast(?*const c_void, ptr),
            @ptrCast(?*const c_void, &expect),
            @sizeOf(@TypeOf(expect)),
            timeout_ptr,
        )) {
            .SUCCESS => {},
            .TIMEOUT => return error.TimedOut,
            else => unreachable,
        }
    }

    fn wake(ptr: *const Atomic(u32), num_waiters: u32) void {
        const address = @ptrCast(?*const c_void, ptr);
        // There is no "wake N" call here: anything other than exactly one waiter wakes all of them.
        switch (num_waiters) {
            1 => windows.ntdll.RtlWakeAddressSingle(address),
            else => windows.ntdll.RtlWakeAddressAll(address),
        }
    }
};

/// Backend built directly on the Linux `futex` system call (private futexes).
const LinuxFutex = struct {
    const linux = std.os.linux;

    fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{TimedOut}!void {
        var ts: std.os.timespec = undefined;
        var ts_ptr: ?*std.os.timespec = null;

        // Futex timespec timeout is already in relative time.
        if (timeout) |timeout_ns| {
            ts_ptr = &ts;
            // Saturate rather than `@intCast`: the seconds count of a large `u64` timeout
            // does not fit in `tv_sec` on targets where that field is 32 bits wide.
            const Sec = @TypeOf(ts.tv_sec);
            ts.tv_sec = std.math.cast(Sec, timeout_ns / std.time.ns_per_s) catch std.math.maxInt(Sec);
            ts.tv_nsec = @intCast(@TypeOf(ts.tv_nsec), timeout_ns % std.time.ns_per_s);
        }

        switch (linux.getErrno(linux.futex_wait(
            @ptrCast(*const i32, ptr),
            linux.FUTEX.PRIVATE_FLAG | linux.FUTEX.WAIT,
            @bitCast(i32, expect),
            ts_ptr,
        ))) {
            .SUCCESS => {}, // notified by `wake()`
            .INTR => {}, // spurious wakeup
            .AGAIN => {}, // ptr.* != expect
            .TIMEDOUT => return error.TimedOut,
            .INVAL => {}, // possibly timeout overflow
            .FAULT => unreachable,
            else => unreachable,
        }
    }

    fn wake(ptr: *const Atomic(u32), num_waiters: u32) void {
        switch (linux.getErrno(linux.futex_wake(
            @ptrCast(*const i32, ptr),
            linux.FUTEX.PRIVATE_FLAG | linux.FUTEX.WAKE,
            // The syscall takes a signed count; clamp instead of overflowing.
            std.math.cast(i32, num_waiters) catch std.math.maxInt(i32),
        ))) {
            .SUCCESS => {}, // successful wake up
            .INVAL => {}, // invalid futex_wait() on ptr done elsewhere
            .FAULT => {}, // pointer became invalid while doing the wake
            else => unreachable,
        }
    }
};

/// Backend built on Darwin's `__ulock_wait` / `__ulock_wait2` / `__ulock_wake`.
const DarwinFutex = struct {
    const darwin = std.os.darwin;

    fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{TimedOut}!void {
        // Darwin XNU 7195.50.7.100.1 introduced __ulock_wait2 and migrated code paths (notably pthread_cond_t) towards it:
        // https://github.com/apple/darwin-xnu/commit/d4061fb0260b3ed486147341b72468f836ed6c8f#diff-08f993cc40af475663274687b7c326cc6c3031e0db3ac8de7b24624610616be6
        //
        // This XNU version appears to correspond to 11.0.1:
        // https://kernelshaman.blogspot.com/2021/01/building-xnu-for-macos-big-sur-1101.html
        //
        // ulock_wait() uses 32-bit micro-second timeouts where 0 = INFINITE or no-timeout
        // ulock_wait2() uses 64-bit nano-second timeouts (with the same convention)
        var timeout_ns: u64 = 0;
        if (timeout) |timeout_value| {
            // This should be checked by the caller.
            assert(timeout_value != 0);
            timeout_ns = timeout_value;
        }
        const addr = @ptrCast(*const c_void, ptr);
        const flags = darwin.UL_COMPARE_AND_WAIT | darwin.ULF_NO_ERRNO;
        // If we're using `__ulock_wait` and `timeout` is too big to fit inside a `u32` count of
        // micro-seconds (around 70min), we'll request a shorter timeout. This is fine (users
        // should handle spurious wakeups), but we need to remember that we did so, so that
        // we don't return `TimedOut` incorrectly. If that happens, we set this variable to
        // true so that we know to ignore the ETIMEDOUT result.
        var timeout_overflowed = false;
        const status = blk: {
            if (target.os.version_range.semver.min.major >= 11) {
                break :blk darwin.__ulock_wait2(flags, addr, expect, timeout_ns, 0);
            } else {
                // Round up to whole micro-seconds. Truncating would turn a sub-microsecond
                // timeout into 0, which `__ulock_wait` interprets as "no timeout" and would
                // block forever. A `timeout_ns` of 0 (no timeout requested) still maps to 0.
                const whole_us: u64 = timeout_ns / std.time.ns_per_us;
                const partial_us: u64 = @boolToInt(timeout_ns % std.time.ns_per_us != 0);
                const timeout_us = std.math.cast(u32, whole_us + partial_us) catch overflow: {
                    timeout_overflowed = true;
                    break :overflow std.math.maxInt(u32);
                };
                break :blk darwin.__ulock_wait(flags, addr, expect, timeout_us);
            }
        };

        if (status >= 0) return;
        switch (@intToEnum(std.os.E, -status)) {
            .INTR => {},
            // Address of the futex is paged out. This is unlikely, but possible in theory, and
            // pthread/libdispatch on darwin bother to handle it. In this case we'll return
            // without waiting, but the caller should retry anyway.
            .FAULT => {},
            .TIMEDOUT => if (!timeout_overflowed) return error.TimedOut,
            else => unreachable,
        }
    }

    fn wake(ptr: *const Atomic(u32), num_waiters: u32) void {
        var flags: u32 = darwin.UL_COMPARE_AND_WAIT | darwin.ULF_NO_ERRNO;
        // Only "one" or "all" can be expressed: more than one requested waiter wakes everyone.
        if (num_waiters > 1) {
            flags |= darwin.ULF_WAKE_ALL;
        }

        while (true) {
            const addr = @ptrCast(*const c_void, ptr);
            const status = darwin.__ulock_wake(flags, addr, 0);

            if (status >= 0) return;
            switch (@intToEnum(std.os.E, -status)) {
                .INTR => continue, // spurious wake()
                .FAULT => continue, // address of the lock was paged out
                .NOENT => return, // nothing was woken up
                .ALREADY => unreachable, // only for ULF_WAKE_THREAD
                else => unreachable,
            }
        }
    }
};

/// Userspace emulation for libc targets without a native futex.
/// Waiters are queued in one of a fixed set of global buckets chosen by address; each bucket has a
/// pthread mutex guarding its list, and each waiter blocks on its own pthread mutex/condvar pair.
const PosixFutex = struct {
    fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{TimedOut}!void {
        const address = @ptrToInt(ptr);
        const bucket = Bucket.from(address);
        var waiter: List.Node = undefined;

        {
            assert(std.c.pthread_mutex_lock(&bucket.mutex) == .SUCCESS);
            defer assert(std.c.pthread_mutex_unlock(&bucket.mutex) == .SUCCESS);

            // Checking the value and enqueueing under the bucket lock is what makes
            // the compare-and-block step atomic with respect to `wake()`.
            if (ptr.load(.SeqCst) != expect) {
                return;
            }

            waiter.data = .{ .address = address };
            bucket.list.prepend(&waiter);
        }

        var timed_out = false;
        waiter.data.wait(timeout) catch {
            // Runs after the bucket lock below is released (defers run in reverse order).
            // If a waker already dequeued us, it is about to call `notify()` on our node,
            // so we must wait for that to finish before the node leaves scope.
            defer if (!timed_out) {
                waiter.data.wait(null) catch unreachable;
            };

            assert(std.c.pthread_mutex_lock(&bucket.mutex) == .SUCCESS);
            defer assert(std.c.pthread_mutex_unlock(&bucket.mutex) == .SUCCESS);

            // `wake()` clears `address` when it dequeues a waiter; still set means still queued.
            if (waiter.data.address == address) {
                timed_out = true;
                bucket.list.remove(&waiter);
            }
        };

        waiter.data.deinit();
        if (timed_out) {
            return error.TimedOut;
        }
    }

    fn wake(ptr: *const Atomic(u32), num_waiters: u32) void {
        const address = @ptrToInt(ptr);
        const bucket = Bucket.from(address);
        var can_notify = num_waiters;

        // Dequeued waiters are collected here and notified after the bucket lock is released.
        var notified = List{};
        defer while (notified.popFirst()) |waiter| {
            waiter.data.notify();
        };

        assert(std.c.pthread_mutex_lock(&bucket.mutex) == .SUCCESS);
        defer assert(std.c.pthread_mutex_unlock(&bucket.mutex) == .SUCCESS);

        var waiters = bucket.list.first;
        while (waiters) |waiter| {
            // Stop scanning as soon as the wake budget is spent.
            if (can_notify == 0) break;

            assert(waiter.data.address != null);
            // Advance first: `waiter` may be unlinked from the list below.
            waiters = waiter.next;

            // Buckets are shared between addresses; skip waiters for other futexes.
            if (waiter.data.address != address) continue;
            can_notify -= 1;

            bucket.list.remove(waiter);
            waiter.data.address = null;
            notified.prepend(waiter);
        }
    }

    /// A lock-protected wait queue shared by every address that maps to the same slot.
    const Bucket = struct {
        mutex: std.c.pthread_mutex_t = .{},
        list: List = .{},

        var buckets = [_]Bucket{.{}} ** 64;

        fn from(address: usize) *Bucket {
            return &buckets[address % buckets.len];
        }
    };

    /// Queue of waiters; each node carries the address waited on and a one-shot notification.
    const List = std.TailQueue(struct {
        /// Address being waited on, or `null` once `wake()` has dequeued this waiter.
        address: ?usize,
        state: State = .empty,
        cond: std.c.pthread_cond_t = .{},
        mutex: std.c.pthread_mutex_t = .{},

        const Self = @This();
        const State = enum {
            empty,
            waiting,
            notified,
        };

        fn deinit(self: *Self) void {
            _ = std.c.pthread_cond_destroy(&self.cond);
            _ = std.c.pthread_mutex_destroy(&self.mutex);
        }

        /// Blocks until `notify()` is called, or returns `error.TimedOut` once `timeout`
        /// nanoseconds have elapsed. On timeout the state is reset to `.empty` so that
        /// `wait()` can be called again.
        fn wait(self: *Self, timeout: ?u64) error{TimedOut}!void {
            assert(std.c.pthread_mutex_lock(&self.mutex) == .SUCCESS);
            defer assert(std.c.pthread_mutex_unlock(&self.mutex) == .SUCCESS);

            switch (self.state) {
                .empty => self.state = .waiting,
                .waiting => unreachable,
                .notified => return,
            }

            var ts: std.os.timespec = undefined;
            var ts_ptr: ?*const std.os.timespec = null;
            if (timeout) |timeout_ns| {
                ts_ptr = &ts;
                std.os.clock_gettime(std.os.CLOCK.REALTIME, &ts) catch unreachable;

                // Build the absolute deadline with saturating arithmetic so that a very large
                // timeout clamps to the furthest representable time instead of overflowing.
                const Sec = @TypeOf(ts.tv_sec);
                const max_sec = std.math.maxInt(Sec);
                const delta_sec = std.math.cast(Sec, timeout_ns / std.time.ns_per_s) catch max_sec;
                ts.tv_sec = std.math.add(Sec, ts.tv_sec, delta_sec) catch max_sec;
                ts.tv_nsec += @intCast(@TypeOf(ts.tv_nsec), timeout_ns % std.time.ns_per_s);
                if (ts.tv_nsec >= std.time.ns_per_s) {
                    ts.tv_sec = std.math.add(Sec, ts.tv_sec, 1) catch max_sec;
                    ts.tv_nsec -= std.time.ns_per_s;
                }
            }

            while (true) {
                // Re-check the state on every wakeup to filter out spurious condvar wakeups.
                switch (self.state) {
                    .empty => unreachable,
                    .waiting => {},
                    .notified => return,
                }

                const ts_ref = ts_ptr orelse {
                    assert(std.c.pthread_cond_wait(&self.cond, &self.mutex) == .SUCCESS);
                    continue;
                };

                const rc = std.c.pthread_cond_timedwait(&self.cond, &self.mutex, ts_ref);
                switch (rc) {
                    .SUCCESS => {},
                    .TIMEDOUT => {
                        self.state = .empty;
                        return error.TimedOut;
                    },
                    else => unreachable,
                }
            }
        }

        /// Marks the waiter as notified, signalling the condvar if a thread is currently blocked.
        fn notify(self: *Self) void {
            assert(std.c.pthread_mutex_lock(&self.mutex) == .SUCCESS);
            defer assert(std.c.pthread_mutex_unlock(&self.mutex) == .SUCCESS);

            switch (self.state) {
                .empty => self.state = .notified,
                .waiting => {
                    self.state = .notified;
                    assert(std.c.pthread_cond_signal(&self.cond) == .SUCCESS);
                },
                .notified => unreachable,
            }
        }
    });
};

test "Futex - wait/wake" {
    var value = Atomic(u32).init(0);
    Futex.wait(&value, 1, null) catch unreachable;

    const wait_noop_result = Futex.wait(&value, 0, 0);
    try testing.expectError(error.TimedOut, wait_noop_result);

    const wait_longer_result = Futex.wait(&value, 0, std.time.ns_per_ms);
    try testing.expectError(error.TimedOut, wait_longer_result);

    Futex.wake(&value, 0);
    Futex.wake(&value, 1);
    Futex.wake(&value, std.math.maxInt(u32));
}

test "Futex - Signal" {
    if (single_threaded) {
        return error.SkipZigTest;
    }

    const Paddle = struct {
        value: Atomic(u32) = Atomic(u32).init(0),
        current: u32 = 0,

        fn run(self: *@This(), hit_to: *@This()) !void {
            var iterations: usize = 4;
            while (iterations > 0) : (iterations -= 1) {
                var value: u32 = undefined;
                while (true) {
                    value = self.value.load(.Acquire);
                    if (value != self.current) break;
                    Futex.wait(&self.value, self.current, null) catch unreachable;
                }

                try testing.expectEqual(self.current + 1, value);
                self.current = value;

                _ = hit_to.value.fetchAdd(1, .Release);
                Futex.wake(&hit_to.value, 1);
            }
        }
    };

    var ping = Paddle{};
    var pong = Paddle{};

    const t1 = try std.Thread.spawn(.{}, Paddle.run, .{ &ping, &pong });
    defer t1.join();

    const t2 = try std.Thread.spawn(.{}, Paddle.run, .{ &pong, &ping });
    defer t2.join();

    _ = ping.value.fetchAdd(1, .Release);
    Futex.wake(&ping.value, 1);
}

test "Futex - Broadcast" {
    if (single_threaded) {
        return error.SkipZigTest;
    }

    const Context = struct {
        threads: [4]std.Thread = undefined,
        broadcast: Atomic(u32) = Atomic(u32).init(0),
        notified: Atomic(usize) = Atomic(usize).init(0),

        const BROADCAST_EMPTY = 0;
        const BROADCAST_SENT = 1;
        const BROADCAST_RECEIVED = 2;

        fn runSender(self: *@This()) !void {
            self.broadcast.store(BROADCAST_SENT, .Monotonic);
            Futex.wake(&self.broadcast, @intCast(u32, self.threads.len));

            while (true) {
                const broadcast = self.broadcast.load(.Acquire);
                if (broadcast == BROADCAST_RECEIVED) break;
                try testing.expectEqual(@as(u32, BROADCAST_SENT), broadcast);
                Futex.wait(&self.broadcast, broadcast, null) catch unreachable;
            }
        }

        fn runReceiver(self: *@This()) void {
            while (true) {
                const broadcast = self.broadcast.load(.Acquire);
                if (broadcast == BROADCAST_SENT) break;
                assert(broadcast == BROADCAST_EMPTY);
                Futex.wait(&self.broadcast, broadcast, null) catch unreachable;
            }

            // The last receiver to arrive reports completion back to the sender.
            const notified = self.notified.fetchAdd(1, .Monotonic);
            if (notified + 1 == self.threads.len) {
                self.broadcast.store(BROADCAST_RECEIVED, .Release);
                Futex.wake(&self.broadcast, 1);
            }
        }
    };

    var ctx = Context{};
    for (ctx.threads) |*thread|
        thread.* = try std.Thread.spawn(.{}, Context.runReceiver, .{&ctx});
    defer for (ctx.threads) |thread|
        thread.join();

    // Try to wait for the threads to start before running runSender().
    // NOTE: not actually needed for correctness.
    std.time.sleep(16 * std.time.ns_per_ms);
    try ctx.runSender();

    const notified = ctx.notified.load(.Monotonic);
    try testing.expectEqual(@as(usize, ctx.threads.len), notified);
}

test "Futex - Chain" {
    if (single_threaded) {
        return error.SkipZigTest;
    }

    const Signal = struct {
        value: Atomic(u32) = Atomic(u32).init(0),

        fn wait(self: *@This()) void {
            while (true) {
                const value = self.value.load(.Acquire);
                if (value == 1) break;
                assert(value == 0);
                Futex.wait(&self.value, 0, null) catch unreachable;
            }
        }

        fn notify(self: *@This()) void {
            assert(self.value.load(.Unordered) == 0);
            self.value.store(1, .Release);
            Futex.wake(&self.value, 1);
        }
    };

    const Context = struct {
        completed: Signal = .{},
        threads: [4]struct {
            thread: std.Thread,
            signal: Signal,
        } = undefined,

        fn run(self: *@This(), index: usize) void {
            const this_signal = &self.threads[index].signal;

            // Each thread hands off to the next one; the last thread signals completion.
            var next_signal = &self.completed;
            if (index + 1 < self.threads.len) {
                next_signal = &self.threads[index + 1].signal;
            }

            this_signal.wait();
            next_signal.notify();
        }
    };

    var ctx = Context{};
    for (ctx.threads) |*entry, index| {
        entry.signal = .{};
        entry.thread = try std.Thread.spawn(.{}, Context.run, .{ &ctx, index });
    }

    ctx.threads[0].signal.notify();
    ctx.completed.wait();

    for (ctx.threads) |entry| {
        entry.thread.join();
    }
}