diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 0fbb967ae1..20ea2113b3 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -75,6 +75,7 @@ import rx.operators.OperationTakeUntil; import rx.operators.OperationTakeWhile; import rx.operators.OperationThrottleFirst; +import rx.operators.OperationTimeInterval; import rx.operators.OperationTimeout; import rx.operators.OperationTimestamp; import rx.operators.OperationToObservableFuture; @@ -97,6 +98,7 @@ import rx.util.OnErrorNotImplementedException; import rx.util.Opening; import rx.util.Range; +import rx.util.TimeInterval; import rx.util.Timestamped; import rx.util.functions.Action0; import rx.util.functions.Action1; @@ -4533,6 +4535,30 @@ public Observable timeout(long timeout, TimeUnit timeUnit) { return create(OperationTimeout.timeout(this, timeout, timeUnit, Schedulers.threadPoolForComputation())); } + /** + * Records the time interval between consecutive elements in an observable sequence. + * + * @return An observable sequence with time interval information on elements. + * @see MSDN: Observable.TimeInterval + */ + public Observable> timeInterval() { + return create(OperationTimeInterval.timeInterval(this)); + } + + /** + * Records the time interval between consecutive elements in an observable + * sequence, using the specified scheduler to compute time intervals. + * + * @param scheduler + * Scheduler used to compute time intervals. + * + * @return An observable sequence with time interval information on elements. + * @see MSDN: Observable.TimeInterval + */ + public Observable> timeInterval(Scheduler scheduler) { + return create(OperationTimeInterval.timeInterval(this, scheduler)); + } + /** * Whether a given {@link Function} is an internal implementation inside rx.* packages or not. *

diff --git a/rxjava-core/src/main/java/rx/operators/OperationTimeInterval.java b/rxjava-core/src/main/java/rx/operators/OperationTimeInterval.java new file mode 100644 index 0000000000..7b70818bc1 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationTimeInterval.java @@ -0,0 +1,140 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.times; + +import java.util.concurrent.TimeUnit; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.InOrder; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import rx.Observable; +import rx.Observable.OnSubscribeFunc; +import rx.Observer; +import rx.Scheduler; +import rx.Subscription; +import rx.concurrency.Schedulers; +import rx.concurrency.TestScheduler; +import rx.subjects.PublishSubject; +import rx.util.TimeInterval; + +/** + * Records the time interval between consecutive elements in an observable sequence. + */ +public class OperationTimeInterval { + + public static OnSubscribeFunc> timeInterval( + Observable source) { + return timeInterval(source, Schedulers.immediate()); + } + + public static OnSubscribeFunc> timeInterval( + final Observable source, final Scheduler scheduler) { + return new OnSubscribeFunc>() { + @Override + public Subscription onSubscribe( + Observer> observer) { + return source.subscribe(new TimeIntervalObserver(observer, + scheduler)); + } + }; + } + + private static class TimeIntervalObserver implements Observer { + + private final Observer> observer; + /** + * Only used to compute time intervals. + */ + private final Scheduler scheduler; + private long lastTimestamp; + + public TimeIntervalObserver(Observer> observer, + Scheduler scheduler) { + this.observer = observer; + this.scheduler = scheduler; + // The beginning time is the time when the observer subscribes. + lastTimestamp = scheduler.now(); + } + + @Override + public void onNext(T args) { + long nowTimestamp = scheduler.now(); + observer.onNext(new TimeInterval(nowTimestamp - lastTimestamp, + args)); + lastTimestamp = nowTimestamp; + } + + @Override + public void onCompleted() { + observer.onCompleted(); + } + + @Override + public void onError(Throwable e) { + observer.onCompleted(); + } + } + + public static class UnitTest { + + private static final TimeUnit TIME_UNIT = TimeUnit.MILLISECONDS; + + @Mock + private Observer> observer; + + private TestScheduler testScheduler; + private PublishSubject subject; + private Observable> observable; + + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + testScheduler = new TestScheduler(); + subject = PublishSubject.create(); + observable = subject.timeInterval(testScheduler); + } + + @Test + public void testTimeInterval() { + InOrder inOrder = inOrder(observer); + observable.subscribe(observer); + + testScheduler.advanceTimeBy(1000, TIME_UNIT); + subject.onNext(1); + testScheduler.advanceTimeBy(2000, TIME_UNIT); + subject.onNext(2); + testScheduler.advanceTimeBy(3000, TIME_UNIT); + subject.onNext(3); + subject.onCompleted(); + + inOrder.verify(observer, times(1)).onNext( + new TimeInterval(1000, 1)); + inOrder.verify(observer, times(1)).onNext( + new TimeInterval(2000, 2)); + inOrder.verify(observer, times(1)).onNext( + new TimeInterval(3000, 3)); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + } + +} diff --git a/rxjava-core/src/main/java/rx/util/TimeInterval.java b/rxjava-core/src/main/java/rx/util/TimeInterval.java new file mode 100644 index 0000000000..7bf1383688 --- /dev/null +++ b/rxjava-core/src/main/java/rx/util/TimeInterval.java @@ -0,0 +1,81 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.util; + +public class TimeInterval { + private final long intervalInMilliseconds; + private final T value; + + public TimeInterval(long intervalInMilliseconds, T value) { + this.value = value; + this.intervalInMilliseconds = intervalInMilliseconds; + } + + /** + * Returns the interval in milliseconds. + * + * @return interval in milliseconds + */ + public long getIntervalInMilliseconds() { + return intervalInMilliseconds; + } + + /** + * Returns the value. + * + * @return the value + */ + public T getValue() { + return value; + } + + // The following methods are generated by eclipse automatically. + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime + * result + + (int) (intervalInMilliseconds ^ (intervalInMilliseconds >>> 32)); + result = prime * result + ((value == null) ? 0 : value.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + TimeInterval other = (TimeInterval) obj; + if (intervalInMilliseconds != other.intervalInMilliseconds) + return false; + if (value == null) { + if (other.value != null) + return false; + } else if (!value.equals(other.value)) + return false; + return true; + } + + @Override + public String toString() { + return "TimeInterval [intervalInMilliseconds=" + intervalInMilliseconds + + ", value=" + value + "]"; + } +}