zig

fork of https://codeberg.org/ziglang/zig
Log | Files | Refs | README | LICENSE

blob 45bb7759 (3925B) - Raw


      1 const std = @import("../std.zig");
      2 const assert = std.debug.assert;
      3 const testing = std.testing;
      4 const builtin = @import("builtin");
      5 const Lock = std.event.Lock;
      6 const Loop = std.event.Loop;
      7 
      8 /// This is a value that starts out unavailable, until resolve() is called
      9 /// While it is unavailable, functions suspend when they try to get() it,
     10 /// and then are resumed when resolve() is called.
     11 /// At this point the value remains forever available, and another resolve() is not allowed.
     12 pub fn Future(comptime T: type) type {
     13     return struct {
     14         lock: Lock,
     15         data: T,
     16 
     17         /// TODO make this an enum
     18         /// 0 - not started
     19         /// 1 - started
     20         /// 2 - finished
     21         available: u8,
     22 
     23         const Self = @This();
     24         const Queue = std.atomic.Queue(anyframe);
     25 
     26         pub fn init(loop: *Loop) Self {
     27             return Self{
     28                 .lock = Lock.initLocked(loop),
     29                 .available = 0,
     30                 .data = undefined,
     31             };
     32         }
     33 
     34         /// Obtain the value. If it's not available, wait until it becomes
     35         /// available.
     36         /// Thread-safe.
     37         pub async fn get(self: *Self) *T {
     38             if (@atomicLoad(u8, &self.available, .SeqCst) == 2) {
     39                 return &self.data;
     40             }
     41             const held = self.lock.acquire();
     42             held.release();
     43 
     44             return &self.data;
     45         }
     46 
     47         /// Gets the data without waiting for it. If it's available, a pointer is
     48         /// returned. Otherwise, null is returned.
     49         pub fn getOrNull(self: *Self) ?*T {
     50             if (@atomicLoad(u8, &self.available, .SeqCst) == 2) {
     51                 return &self.data;
     52             } else {
     53                 return null;
     54             }
     55         }
     56 
     57         /// If someone else has started working on the data, wait for them to complete
     58         /// and return a pointer to the data. Otherwise, return null, and the caller
     59         /// should start working on the data.
     60         /// It's not required to call start() before resolve() but it can be useful since
     61         /// this method is thread-safe.
     62         pub async fn start(self: *Self) ?*T {
     63             const state = @cmpxchgStrong(u8, &self.available, 0, 1, .SeqCst, .SeqCst) orelse return null;
     64             switch (state) {
     65                 1 => {
     66                     const held = self.lock.acquire();
     67                     held.release();
     68                     return &self.data;
     69                 },
     70                 2 => return &self.data,
     71                 else => unreachable,
     72             }
     73         }
     74 
     75         /// Make the data become available. May be called only once.
     76         /// Before calling this, modify the `data` property.
     77         pub fn resolve(self: *Self) void {
     78             const prev = @atomicRmw(u8, &self.available, .Xchg, 2, .SeqCst);
     79             assert(prev == 0 or prev == 1); // resolve() called twice
     80             Lock.Held.release(Lock.Held{ .lock = &self.lock });
     81         }
     82     };
     83 }
     84 
     85 test "std.event.Future" {
     86     // https://github.com/ziglang/zig/issues/1908
     87     if (builtin.single_threaded) return error.SkipZigTest;
     88 
     89     const allocator = std.heap.direct_allocator;
     90 
     91     var loop: Loop = undefined;
     92     try loop.initMultiThreaded(allocator);
     93     defer loop.deinit();
     94 
     95     const handle = async testFuture(&loop);
     96 
     97     loop.run();
     98 }
     99 
    100 async fn testFuture(loop: *Loop) void {
    101     var future = Future(i32).init(loop);
    102 
    103     const a = async waitOnFuture(&future);
    104     const b = async waitOnFuture(&future);
    105     const c = async resolveFuture(&future);
    106 
    107     // TODO make this work:
    108     //const result = (await a) + (await b);
    109     const a_result = await a;
    110     const b_result = await b;
    111     const result = a_result + b_result;
    112 
    113     await c;
    114     testing.expect(result == 12);
    115 }
    116 
    117 async fn waitOnFuture(future: *Future(i32)) i32 {
    118     return future.get().*;
    119 }
    120 
    121 async fn resolveFuture(future: *Future(i32)) void {
    122     future.data = 6;
    123     future.resolve();
    124 }