Skip to content

Commit 13c9618

Browse files
zaharidichevcarllerche
authored andcommitted
tokio-timer: Fix multi reset DelayQueue bug (#871)
Fixes #868
1 parent 61d4aa9 commit 13c9618

File tree

2 files changed

+110
-22
lines changed

2 files changed

+110
-22
lines changed

tokio-timer/src/delay_queue.rs

Lines changed: 14 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -342,7 +342,7 @@ impl<T> DelayQueue<T> {
342342
let should_set_delay = if let Some(ref delay) = self.delay {
343343
let current_exp = self.normalize_deadline(delay.deadline());
344344
current_exp > when
345-
} else { false };
345+
} else { true };
346346

347347
if should_set_delay {
348348
self.delay = Some(self.handle.delay(self.start + Duration::from_millis(when)));
@@ -507,24 +507,18 @@ impl<T> DelayQueue<T> {
507507
// Normalize the deadline. Values cannot be set to expire in the past.
508508
let when = self.normalize_deadline(when);
509509

510-
// This is needed only for the debug assertion inside the if-let.
511-
let old = self.start + Duration::from_millis(self.slab[key.index].when);
512-
513510
self.slab[key.index].when = when;
511+
self.insert_idx(when, key.index);
514512

515-
if let Some(ref mut delay) = self.delay {
516-
debug_assert!(old >= delay.deadline());
517-
518-
let start = self.start;
519-
let next_poll = self.wheel.poll_at()
520-
.map(move |t| start + Duration::from_millis(t));
521-
522-
if next_poll != Some(delay.deadline()) {
523-
delay.reset(self.start + Duration::from_millis(when));
524-
}
513+
let next_deadline = self.next_deadline();
514+
if let (Some(ref mut delay), Some(deadline)) = (&mut self.delay, next_deadline) {
515+
delay.reset(deadline);
525516
}
517+
}
526518

527-
self.insert_idx(when, key.index);
519+
/// Returns the next time poll as determined by the wheel
520+
fn next_deadline(&mut self) -> Option<Instant> {
521+
self.wheel.poll_at().map(|poll_at| self.start + Duration::from_millis(poll_at))
528522
}
529523

530524
/// Sets the delay of the item associated with `key` to expire after
@@ -694,14 +688,12 @@ impl<T> DelayQueue<T> {
694688
return Ok(Some(idx).into());
695689
}
696690

697-
let deadline = match self.wheel.poll_at() {
698-
Some(poll_at) => {
699-
self.start + Duration::from_millis(poll_at)
700-
}
701-
None => return Ok(None.into()),
702-
};
691+
if let Some(deadline) = self.next_deadline() {
692+
self.delay = Some(self.handle.delay(deadline));
693+
} else {
694+
return Ok(None.into())
695+
}
703696

704-
self.delay = Some(self.handle.delay(deadline));
705697
}
706698
}
707699

tokio-timer/tests/queue.rs

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -311,3 +311,99 @@ fn expires_before_last_insert() {
311311

312312
})
313313
}
314+
315+
#[test]
316+
fn multi_reset() {
317+
mocked(|_, time| {
318+
let mut queue = DelayQueue::new();
319+
let mut task = MockTask::new();
320+
321+
let epoch = time.now();
322+
323+
let foo = queue.insert_at("foo", epoch + ms(200));
324+
let bar = queue.insert_at("bar", epoch + ms(250));
325+
326+
task.enter(|| {
327+
assert_not_ready!(queue);
328+
});
329+
330+
queue.reset_at(&foo, epoch + ms(300));
331+
queue.reset_at(&bar, epoch + ms(350));
332+
queue.reset_at(&foo, epoch + ms(400));
333+
})
334+
}
335+
336+
#[test]
337+
fn expire_first_key_when_reset_to_expire_earlier() {
338+
mocked(|timer, time| {
339+
let mut queue = DelayQueue::new();
340+
let mut task = MockTask::new();
341+
342+
let epoch = time.now();
343+
344+
let foo = queue.insert_at("foo", epoch + ms(200));
345+
queue.insert_at("bar", epoch + ms(250));
346+
347+
task.enter(|| {
348+
assert_not_ready!(queue);
349+
});
350+
351+
queue.reset_at(&foo, epoch + ms(100));
352+
353+
advance(timer, ms(100));
354+
355+
assert!(task.is_notified());
356+
let entry = assert_ready!(queue).unwrap().into_inner();
357+
assert_eq!(entry, "foo");
358+
})
359+
}
360+
361+
#[test]
362+
fn expire_second_key_when_reset_to_expire_earlier() {
363+
mocked(|timer, time| {
364+
let mut queue = DelayQueue::new();
365+
let mut task = MockTask::new();
366+
367+
let epoch = time.now();
368+
369+
queue.insert_at("foo", epoch + ms(200));
370+
let bar = queue.insert_at("bar", epoch + ms(250));
371+
372+
task.enter(|| {
373+
assert_not_ready!(queue);
374+
});
375+
376+
queue.reset_at(&bar, epoch + ms(100));
377+
378+
advance(timer, ms(100));
379+
380+
assert!(task.is_notified());
381+
let entry = assert_ready!(queue).unwrap().into_inner();
382+
assert_eq!(entry, "bar");
383+
})
384+
}
385+
386+
387+
#[test]
388+
fn reset_first_expiring_item_to_expire_later() {
389+
mocked(|timer, time| {
390+
let mut queue = DelayQueue::new();
391+
let mut task = MockTask::new();
392+
393+
let epoch = time.now();
394+
395+
let foo = queue.insert_at("foo", epoch + ms(200));
396+
let bar = queue.insert_at("bar", epoch + ms(250));
397+
398+
task.enter(|| {
399+
assert_not_ready!(queue);
400+
});
401+
402+
queue.reset_at(&foo, epoch + ms(300));
403+
advance(timer, ms(250));
404+
405+
assert!(task.is_notified());
406+
let entry = assert_ready!(queue).unwrap().into_inner();
407+
assert_eq!(entry, "bar");
408+
})
409+
}

0 commit comments

Comments
 (0)