1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
use crate::loom::sync::atomic::AtomicU64;
use crate::sync::AtomicWaker;
use crate::time::driver::{Handle, Inner};
use crate::time::{Duration, Error, Instant};

use std::cell::UnsafeCell;
use std::ptr;
use std::sync::atomic::Ordering::SeqCst;
use std::sync::atomic::{AtomicBool, AtomicU8};
use std::sync::{Arc, Weak};
use std::task::{self, Poll};
use std::u64;

/// Internal state shared between a `Delay` instance and the timer.
///
/// This struct is used as a node in two intrusive data structures:
///
/// * An atomic stack used to signal to the timer thread that the entry state
///   has changed. The timer thread will observe the entry on this stack and
///   perform any actions as necessary.
///
/// * A doubly linked list used **only** by the timer thread. Each slot in the
///   timer wheel is a head pointer to the list of entries that must be
///   processed during that timer tick.
///
/// Several fields are `UnsafeCell`s or raw pointers with no lock around them;
/// the documentation of each such field names the party that may touch it.
#[derive(Debug)]
pub(crate) struct Entry {
    /// Deadline and duration of the delay, reached through `time_ref` and
    /// `time_mut`.
    ///
    /// Only accessed from `Registration`.
    time: CachePadded<UnsafeCell<Time>>,

    /// Timer internals. Using a weak pointer allows the timer to shutdown
    /// without all `Delay` instances having completed.
    ///
    /// When empty, it means that the entry has not yet been linked with a
    /// timer instance. `Entry::new` leaves it empty when the timer's
    /// `increment` call fails.
    inner: Weak<Inner>,

    /// Tracks the entry state. This value contains the following information:
    ///
    /// * The deadline at which the entry must be "fired".
    /// * A flag indicating if the entry has already been fired (the `ELAPSED`
    ///   bit).
    /// * Whether or not the entry transitioned to the error state (the value
    ///   equals `ERROR`).
    ///
    /// When an `Entry` is created, `state` is initialized to the instant at
    /// which the entry must be fired. When a timer is reset to a different
    /// instant, this value is changed.
    state: AtomicU64,

    /// Stores the actual error. If `state` indicates that an error occurred,
    /// this is guaranteed to be a non-zero value representing the first error
    /// that occurred. Otherwise its value is undefined.
    ///
    /// It starts out as zero and is only ever replaced while it is still
    /// zero, which is why later errors do not overwrite the first one.
    error: AtomicU8,

    /// Task to notify once the deadline is reached.
    waker: AtomicWaker,

    /// True when the entry is queued in the "process" stack. This value
    /// is set before pushing the value and unset after popping the value.
    ///
    /// TODO: This could possibly be rolled up into `state`.
    pub(super) queued: AtomicBool,

    /// Next entry in the "process" linked list.
    ///
    /// Access to this field is coordinated by the `queued` flag.
    ///
    /// Represents a strong Arc ref.
    pub(super) next_atomic: UnsafeCell<*mut Entry>,

    /// When the entry expires, relative to the `start` of the timer
    /// (Inner::start). This is only used by the timer.
    ///
    /// A `Delay` instance can be reset to a different deadline by the thread
    /// that owns the `Delay` instance. In this case, the timer thread will not
    /// immediately know that this has happened. The timer thread must know the
    /// last deadline that it saw as it uses this value to locate the entry in
    /// its wheel.
    ///
    /// Once the timer thread observes that the instant has changed, it updates
    /// the wheel and sets this value. The idea is that this value eventually
    /// converges to the value of `state` as the timer thread makes updates.
    ///
    /// Read and written through `when_internal` / `set_when_internal`.
    when: UnsafeCell<Option<u64>>,

    /// Next entry in the State's linked list.
    ///
    /// This is only accessed by the timer.
    pub(super) next_stack: UnsafeCell<Option<Arc<Entry>>>,

    /// Previous entry in the State's linked list.
    ///
    /// This is only accessed by the timer and is used to unlink a canceled
    /// entry.
    ///
    /// This is a weak reference.
    pub(super) prev_stack: UnsafeCell<*const Entry>,
}

/// Stores the info for `Delay`.
///
/// Lives inside `Entry::time` and is reached through `Entry::time_ref` and
/// `Entry::time_mut`.
#[derive(Debug)]
pub(crate) struct Time {
    /// Instant at which the entry must fire. `Entry::reset` re-reads this
    /// value to compute the entry's new state.
    pub(crate) deadline: Instant,
    /// Duration stored alongside the deadline; it is not read in this file.
    // NOTE(review): presumably the length of the delay that produced
    // `deadline` — confirm against `Registration`.
    pub(crate) duration: Duration,
}

/// Flag indicating a timer entry has elapsed.
///
/// Occupies the top bit (bit 63) of `Entry::state`; `is_elapsed` tests
/// exactly this bit.
const ELAPSED: u64 = 1 << 63;

/// Flag indicating a timer entry has reached an error state.
///
/// Unlike `ELAPSED` this is a whole-value sentinel compared with `==`, not a
/// bit mask. Every bit is set, including the `ELAPSED` bit, so an entry in
/// the error state also reports as elapsed.
const ERROR: u64 = u64::MAX;

// ===== impl Entry =====

impl Entry {
    pub(crate) fn new(handle: &Handle, deadline: Instant, duration: Duration) -> Arc<Entry> {
        let inner = handle.inner().unwrap();
        let entry: Entry;

        // Increment the number of active timeouts
        if let Err(err) = inner.increment() {
            entry = Entry::new2(deadline, duration, Weak::new(), ERROR);
            entry.error(err);
        } else {
            let when = inner.normalize_deadline(deadline);
            let state = if when <= inner.elapsed() {
                ELAPSED
            } else {
                when
            };
            entry = Entry::new2(deadline, duration, Arc::downgrade(&inner), state);
        }

        let entry = Arc::new(entry);
        if let Err(err) = inner.queue(&entry) {
            entry.error(err);
        }

        entry
    }

    /// Shared view of the deadline/duration pair stored in this entry.
    ///
    /// Only called by `Registration`.
    pub(crate) fn time_ref(&self) -> &Time {
        let time: *mut Time = self.time.0.get();

        // SAFETY (relied upon, not checked here): `time` is only touched by
        // `Registration`, which must not keep a `&mut Time` obtained from
        // `time_mut` alive while this shared reference is in use.
        unsafe { &*time }
    }

    /// Exclusive view of the deadline/duration pair stored in this entry.
    ///
    /// Only called by `Registration`.
    ///
    /// # Safety
    ///
    /// The returned reference is created from `&self` through an `UnsafeCell`,
    /// so the caller must make sure no other reference to the `Time` value
    /// (from `time_ref` or an earlier `time_mut`) is alive while it is used.
    #[allow(clippy::mut_from_ref)] // https://github.com/rust-lang/rust-clippy/issues/4281
    pub(crate) unsafe fn time_mut(&self) -> &mut Time {
        let time: *mut Time = self.time.0.get();
        &mut *time
    }

    /// Returns the deadline the timer last recorded for this entry.
    ///
    /// This is the timer's own bookkeeping rather than the value of `state`;
    /// it tells the timer how to converge its view towards `state`.
    pub(crate) fn when_internal(&self) -> Option<u64> {
        let slot: *mut Option<u64> = self.when.get();

        // SAFETY (relied upon, not checked here): `when` is documented as
        // being used by the timer only, so no write races with this read.
        unsafe { *slot }
    }

    /// Records the deadline the timer now associates with this entry.
    ///
    /// Counterpart of `when_internal`.
    pub(crate) fn set_when_internal(&self, when: Option<u64>) {
        let slot: *mut Option<u64> = self.when.get();

        // SAFETY (relied upon, not checked here): `when` is documented as
        // being used by the timer only, so nothing else reads or writes the
        // slot concurrently.
        unsafe {
            *slot = when;
        }
    }

    /// Called by `Timer` to load the current value of `state` for processing.
    ///
    /// Yields `None` once the `ELAPSED` bit is set (which includes the error
    /// state) and the raw state value otherwise.
    pub(crate) fn load_state(&self) -> Option<u64> {
        match self.state.load(SeqCst) {
            done if is_elapsed(done) => None,
            pending => Some(pending),
        }
    }

    /// Reports whether the entry's current state has the `ELAPSED` bit set.
    ///
    /// The error state counts as elapsed too, because `ERROR` has every bit
    /// set.
    pub(crate) fn is_elapsed(&self) -> bool {
        is_elapsed(self.state.load(SeqCst))
    }

    /// Marks the entry as elapsed on behalf of the timer tick `when` and wakes
    /// the registered task.
    ///
    /// Nothing happens if the entry is already elapsed, or if its stored
    /// deadline is later than `when`.
    pub(crate) fn fire(&self, when: u64) {
        let mut observed = self.state.load(SeqCst);

        // Keep retrying while the entry is still pending and due; each failed
        // swap hands back the fresh state to re-evaluate.
        while !is_elapsed(observed) && observed <= when {
            let previous = self
                .state
                .compare_and_swap(observed, observed | ELAPSED, SeqCst);

            if previous == observed {
                // The swap went through: the entry is now elapsed.
                self.waker.wake();
                return;
            }

            observed = previous;
        }
    }

    pub(crate) fn error(&self, error: Error) {
        // Record the precise nature of the error, if there isn't already an
        // error present. If we don't actually transition to the error state
        // below, that's fine, as the error details we set here will be ignored.
        self.error.compare_and_swap(0, error.as_u8(), SeqCst);

        // Only transition to the error state if not currently elapsed
        let mut curr = self.state.load(SeqCst);

        loop {
            if is_elapsed(curr) {
                return;
            }

            let next = ERROR;

            let actual = self.state.compare_and_swap(curr, next, SeqCst);

            if curr == actual {
                break;
            }

            curr = actual;
        }

        self.waker.wake();
    }

    /// Cancels the entry by setting its `ELAPSED` bit and, if it was still
    /// pending, queues it with the timer.
    ///
    /// An entry that was already elapsed needs no further work. The same is
    /// true when no timer can be reached through `inner`. A failure to queue
    /// is deliberately ignored.
    pub(crate) fn cancel(entry: &Arc<Entry>) {
        let previous = entry.state.fetch_or(ELAPSED, SeqCst);

        if !is_elapsed(previous) {
            // If registered with a timer instance that is still alive, let it
            // know about the change.
            if let Some(inner) = entry.upgrade_inner() {
                let _ = inner.queue(entry);
            }
        }
    }

    pub(crate) fn poll_elapsed(&self, cx: &mut task::Context<'_>) -> Poll<Result<(), Error>> {
        let mut curr = self.state.load(SeqCst);

        if is_elapsed(curr) {
            return Poll::Ready(if curr == ERROR {
                Err(Error::from_u8(self.error.load(SeqCst)))
            } else {
                Ok(())
            });
        }

        self.waker.register_by_ref(cx.waker());

        curr = self.state.load(SeqCst);

        if is_elapsed(curr) {
            return Poll::Ready(if curr == ERROR {
                Err(Error::from_u8(self.error.load(SeqCst)))
            } else {
                Ok(())
            });
        }

        Poll::Pending
    }

    /// Only called by `Registration`
    pub(crate) fn reset(entry: &mut Arc<Entry>) {
        let inner = match entry.upgrade_inner() {
            Some(inner) => inner,
            None => return,
        };

        let deadline = entry.time_ref().deadline;
        let when = inner.normalize_deadline(deadline);
        let elapsed = inner.elapsed();

        let next = if when <= elapsed { ELAPSED } else { when };

        let mut curr = entry.state.load(SeqCst);

        loop {
            // In these two cases, there is no work to do when resetting the
            // timer. If the `Entry` is in an error state, then it cannot be
            // used anymore. If resetting the entry to the current value, then
            // the reset is a noop.
            if curr == ERROR || curr == when {
                return;
            }

            let actual = entry.state.compare_and_swap(curr, next, SeqCst);

            if curr == actual {
                break;
            }

            curr = actual;
        }

        // If the state has transitioned to 'elapsed' then wake the task as
        // this entry is ready to be polled.
        if !is_elapsed(curr) && is_elapsed(next) {
            entry.waker.wake();
        }

        // The driver tracks all non-elapsed entries; notify the driver that it
        // should update its state for this entry unless the entry had already
        // elapsed and remains elapsed.
        if !is_elapsed(curr) || !is_elapsed(next) {
            let _ = inner.queue(entry);
        }
    }

    fn new2(deadline: Instant, duration: Duration, inner: Weak<Inner>, state: u64) -> Self {
        Self {
            time: CachePadded(UnsafeCell::new(Time { deadline, duration })),
            inner,
            waker: AtomicWaker::new(),
            state: AtomicU64::new(state),
            queued: AtomicBool::new(false),
            error: AtomicU8::new(0),
            next_atomic: UnsafeCell::new(ptr::null_mut()),
            when: UnsafeCell::new(None),
            next_stack: UnsafeCell::new(None),
            prev_stack: UnsafeCell::new(ptr::null_mut()),
        }
    }

    fn upgrade_inner(&self) -> Option<Arc<Inner>> {
        self.inner.upgrade()
    }
}

/// Returns `true` when the `ELAPSED` bit is set in `state`.
///
/// The error state has every bit set and therefore also counts as elapsed.
fn is_elapsed(state: u64) -> bool {
    let flag = state & ELAPSED;
    flag == ELAPSED
}

impl Drop for Entry {
    /// Calls `decrement` on the timer internals, the counterpart of the
    /// `increment` performed in `Entry::new`, provided the timer can still be
    /// reached.
    ///
    /// Entries that were never linked to a timer, or whose timer is already
    /// gone, have nothing to release.
    fn drop(&mut self) {
        if let Some(inner) = self.upgrade_inner() {
            inner.decrement();
        }
    }
}

// `Entry` holds `UnsafeCell`s and raw pointers, so `Send` and `Sync` are not
// implemented automatically and are asserted by hand here.
// NOTE(review): soundness rests on the per-field access rules documented on
// `Entry` (timer-only fields, `queued`-guarded `next_atomic`,
// `Registration`-only `time`); nothing in this file enforces them.
unsafe impl Send for Entry {}
unsafe impl Sync for Entry {}

/// Wrapper that raises the alignment of `T` to 128 bytes on x86_64 and to 64
/// bytes on every other architecture.
///
/// The wrapped value is reached through the `.0` field.
// NOTE(review): going by the name, the intent is to keep the value on its own
// cache line(s) — confirm the chosen sizes against the targets in use.
#[cfg_attr(target_arch = "x86_64", repr(align(128)))]
#[cfg_attr(not(target_arch = "x86_64"), repr(align(64)))]
#[derive(Debug)]
struct CachePadded<T>(T);