blob 45bb7759 (3925B) - Raw
const std = @import("../std.zig");
const assert = std.debug.assert;
const testing = std.testing;
const builtin = @import("builtin");
const Lock = std.event.Lock;
const Loop = std.event.Loop;

/// A write-once slot holding a `T` that is published by calling resolve().
/// Until then, get() and (once someone has called start()) start() wait on
/// the internal lock; resolve() releases that lock so the waiters can proceed.
/// After resolve() the value stays available forever, and resolve() must not
/// be called a second time.
pub fn Future(comptime T: type) type {
    return struct {
        lock: Lock,
        data: T,

        /// TODO make this an enum
        /// Progress marker; holds one of `not_started`, `started` or
        /// `finished`. Always accessed with atomic builtins.
        available: u8,

        const Self = @This();
        const Queue = std.atomic.Queue(anyframe);

        /// Nobody has claimed the work yet.
        const not_started: u8 = 0;
        /// A caller of start() has claimed the work.
        const started: u8 = 1;
        /// resolve() has been called; `data` may be read.
        const finished: u8 = 2;

        /// Create an unresolved future. The lock is created with
        /// `Lock.initLocked` and `data` is left undefined until the
        /// caller fills it in before resolve().
        pub fn init(loop: *Loop) Self {
            return Self{
                .lock = Lock.initLocked(loop),
                .data = undefined,
                .available = not_started,
            };
        }

        /// Return a pointer to the value, waiting for resolve() first if the
        /// value has not been published yet.
        /// Thread-safe.
        pub async fn get(self: *Self) *T {
            const already_resolved = @atomicLoad(u8, &self.available, .SeqCst) == finished;
            if (!already_resolved) {
                // Acquire the lock that resolve() releases, then release it
                // again right away so it does not stay held by this waiter.
                const guard = self.lock.acquire();
                guard.release();
            }
            return &self.data;
        }

        /// Non-waiting lookup: a pointer to the value if resolve() has
        /// already been called, otherwise null.
        pub fn getOrNull(self: *Self) ?*T {
            if (@atomicLoad(u8, &self.available, .SeqCst) != finished) return null;
            return &self.data;
        }

        /// Try to claim responsibility for producing the value.
        /// Returns null when this caller made the not-started -> started
        /// transition and should therefore produce the data itself.
        /// Otherwise somebody else got there first: wait for them if needed
        /// and return a pointer to the data.
        /// Calling start() before resolve() is optional, but it is useful
        /// because this method is thread-safe.
        pub async fn start(self: *Self) ?*T {
            // @cmpxchgStrong yields null on success, or the value it found.
            const observed = @cmpxchgStrong(u8, &self.available, not_started, started, .SeqCst, .SeqCst) orelse return null;
            switch (observed) {
                finished => {},
                started => {
                    // Another caller is working on it; wait for resolve().
                    const guard = self.lock.acquire();
                    guard.release();
                },
                else => unreachable,
            }
            return &self.data;
        }

        /// Publish the value. May be called only once.
        /// Store the result into the `data` field before calling this.
        pub fn resolve(self: *Self) void {
            const previous = @atomicRmw(u8, &self.available, .Xchg, finished, .SeqCst);
            assert(previous == started or previous == not_started); // resolve() called twice
            // Release the lock on behalf of the future itself.
            const held = Lock.Held{ .lock = &self.lock };
            held.release();
        }
    };
}

test "std.event.Future" {
    // https://github.com/ziglang/zig/issues/1908
    if (builtin.single_threaded) return error.SkipZigTest;

    var loop: Loop = undefined;
    try loop.initMultiThreaded(std.heap.direct_allocator);
    defer loop.deinit();

    // Start the test body before running the event loop.
    const frame = async testFuture(&loop);

    loop.run();
}

/// Start two waiters and one resolver on the same future, then check that
/// both waiters observed the resolved value.
async fn testFuture(loop: *Loop) void {
    var future = Future(i32).init(loop);

    const first_waiter = async waitOnFuture(&future);
    const second_waiter = async waitOnFuture(&future);
    const resolver = async resolveFuture(&future);

    // TODO make this work:
    //const result = (await a) + (await b);
    const first_value = await first_waiter;
    const second_value = await second_waiter;
    const sum = first_value + second_value;

    await resolver;
    testing.expect(sum == 12);
}

/// Wait for the future and return a copy of its value.
async fn waitOnFuture(future: *Future(i32)) i32 {
    const value_ptr = future.get();
    return value_ptr.*;
}

/// Store 6 into the future and publish it.
async fn resolveFuture(future: *Future(i32)) void {
    future.data = 6;
    future.resolve();
}