Skip to content

Commit 1aa722d

Browse files
Merge pull request #248 from jmhofer/sample
Implemented Sample Operation
2 parents 9e48ffb + ae2183c commit 1aa722d

File tree

2 files changed

+256
-0
lines changed

2 files changed

+256
-0
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import rx.operators.OperationOnErrorResumeNextViaFunction;
5757
import rx.operators.OperationOnErrorResumeNextViaObservable;
5858
import rx.operators.OperationOnErrorReturn;
59+
import rx.operators.OperationSample;
5960
import rx.operators.OperationScan;
6061
import rx.operators.OperationSkip;
6162
import rx.operators.OperationSubscribeOn;
@@ -252,13 +253,15 @@ public Subscription subscribe(final Map<String, Object> callbacks) {
252253
*/
253254
return protectivelyWrapAndSubscribe(new Observer() {
254255

256+
@Override
255257
public void onCompleted() {
256258
Object onComplete = callbacks.get("onCompleted");
257259
if (onComplete != null) {
258260
Functions.from(onComplete).call();
259261
}
260262
}
261263

264+
@Override
262265
public void onError(Exception e) {
263266
handleError(e);
264267
Object onError = callbacks.get("onError");
@@ -267,6 +270,7 @@ public void onError(Exception e) {
267270
}
268271
}
269272

273+
@Override
270274
public void onNext(Object args) {
271275
onNext.call(args);
272276
}
@@ -298,15 +302,18 @@ public Subscription subscribe(final Object o) {
298302
*/
299303
return protectivelyWrapAndSubscribe(new Observer() {
300304

305+
@Override
301306
public void onCompleted() {
302307
// do nothing
303308
}
304309

310+
@Override
305311
public void onError(Exception e) {
306312
handleError(e);
307313
// no callback defined
308314
}
309315

316+
@Override
310317
public void onNext(Object args) {
311318
onNext.call(args);
312319
}
@@ -327,15 +334,18 @@ public Subscription subscribe(final Action1<T> onNext) {
327334
*/
328335
return protectivelyWrapAndSubscribe(new Observer<T>() {
329336

337+
@Override
330338
public void onCompleted() {
331339
// do nothing
332340
}
333341

342+
@Override
334343
public void onError(Exception e) {
335344
handleError(e);
336345
// no callback defined
337346
}
338347

348+
@Override
339349
public void onNext(T args) {
340350
if (onNext == null) {
341351
throw new RuntimeException("onNext must be implemented");
@@ -365,17 +375,20 @@ public Subscription subscribe(final Object onNext, final Object onError) {
365375
*/
366376
return protectivelyWrapAndSubscribe(new Observer() {
367377

378+
@Override
368379
public void onCompleted() {
369380
// do nothing
370381
}
371382

383+
@Override
372384
public void onError(Exception e) {
373385
handleError(e);
374386
if (onError != null) {
375387
Functions.from(onError).call(e);
376388
}
377389
}
378390

391+
@Override
379392
public void onNext(Object args) {
380393
onNextFunction.call(args);
381394
}
@@ -396,17 +409,20 @@ public Subscription subscribe(final Action1<T> onNext, final Action1<Exception>
396409
*/
397410
return protectivelyWrapAndSubscribe(new Observer<T>() {
398411

412+
@Override
399413
public void onCompleted() {
400414
// do nothing
401415
}
402416

417+
@Override
403418
public void onError(Exception e) {
404419
handleError(e);
405420
if (onError != null) {
406421
onError.call(e);
407422
}
408423
}
409424

425+
@Override
410426
public void onNext(T args) {
411427
if (onNext == null) {
412428
throw new RuntimeException("onNext must be implemented");
@@ -436,19 +452,22 @@ public Subscription subscribe(final Object onNext, final Object onError, final O
436452
*/
437453
return protectivelyWrapAndSubscribe(new Observer() {
438454

455+
@Override
439456
public void onCompleted() {
440457
if (onComplete != null) {
441458
Functions.from(onComplete).call();
442459
}
443460
}
444461

462+
@Override
445463
public void onError(Exception e) {
446464
handleError(e);
447465
if (onError != null) {
448466
Functions.from(onError).call(e);
449467
}
450468
}
451469

470+
@Override
452471
public void onNext(Object args) {
453472
onNextFunction.call(args);
454473
}
@@ -469,17 +488,20 @@ public Subscription subscribe(final Action1<T> onNext, final Action1<Exception>
469488
*/
470489
return protectivelyWrapAndSubscribe(new Observer<T>() {
471490

491+
@Override
472492
public void onCompleted() {
473493
onComplete.call();
474494
}
475495

496+
@Override
476497
public void onError(Exception e) {
477498
handleError(e);
478499
if (onError != null) {
479500
onError.call(e);
480501
}
481502
}
482503

504+
@Override
483505
public void onNext(T args) {
484506
if (onNext == null) {
485507
throw new RuntimeException("onNext must be implemented");
@@ -516,10 +538,12 @@ public void forEach(final Action1<T> onNext) {
516538
* See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator"
517539
*/
518540
protectivelyWrapAndSubscribe(new Observer<T>() {
541+
@Override
519542
public void onCompleted() {
520543
latch.countDown();
521544
}
522545

546+
@Override
523547
public void onError(Exception e) {
524548
/*
525549
* If we receive an onError event we set the reference on the outer thread
@@ -531,6 +555,7 @@ public void onError(Exception e) {
531555
latch.countDown();
532556
}
533557

558+
@Override
534559
public void onNext(T args) {
535560
onNext.call(args);
536561
}
@@ -582,6 +607,7 @@ public void forEach(final Object o) {
582607

583608
forEach(new Action1() {
584609

610+
@Override
585611
public void call(Object args) {
586612
onNext.call(args);
587613
}
@@ -2743,6 +2769,7 @@ public Observable<T> filter(final Object callback) {
27432769
final FuncN _f = Functions.from(callback);
27442770
return filter(this, new Func1<T, Boolean>() {
27452771

2772+
@Override
27462773
public Boolean call(T t1) {
27472774
return (Boolean) _f.call(t1);
27482775
}
@@ -2913,6 +2940,7 @@ public <R> Observable<R> map(final Object callback) {
29132940
final FuncN _f = Functions.from(callback);
29142941
return map(this, new Func1<T, R>() {
29152942

2943+
@Override
29162944
@SuppressWarnings("unchecked")
29172945
public R call(T t1) {
29182946
return (R) _f.call(t1);
@@ -2963,6 +2991,7 @@ public <R> Observable<R> mapMany(final Object callback) {
29632991
final FuncN _f = Functions.from(callback);
29642992
return mapMany(this, new Func1<T, Observable<R>>() {
29652993

2994+
@Override
29662995
@SuppressWarnings("unchecked")
29672996
public Observable<R> call(T t1) {
29682997
return (Observable<R>) _f.call(t1);
@@ -3071,6 +3100,7 @@ public Observable<T> onErrorResumeNext(final Object resumeFunction) {
30713100
final FuncN _f = Functions.from(resumeFunction);
30723101
return onErrorResumeNext(this, new Func1<Exception, Observable<T>>() {
30733102

3103+
@Override
30743104
@SuppressWarnings("unchecked")
30753105
public Observable<T> call(Exception e) {
30763106
return (Observable<T>) _f.call(e);
@@ -3152,6 +3182,7 @@ public Observable<T> onErrorReturn(final Object resumeFunction) {
31523182
final FuncN _f = Functions.from(resumeFunction);
31533183
return onErrorReturn(this, new Func1<Exception, T>() {
31543184

3185+
@Override
31553186
@SuppressWarnings("unchecked")
31563187
public T call(Exception e) {
31573188
return (T) _f.call(e);
@@ -3288,6 +3319,34 @@ public Observable<T> scan(Func2<T, T, T> accumulator) {
32883319
return scan(this, accumulator);
32893320
}
32903321

3322+
/**
3323+
* Samples the observable sequence at each interval.
3324+
*
3325+
* @param period
3326+
* The period of time that defines the sampling rate.
3327+
* @param unit
3328+
* The time unit for the sampling rate time period.
3329+
* @return An observable sequence whose elements are the results of sampling the current observable sequence.
3330+
*/
3331+
public Observable<T> sample(long period, TimeUnit unit) {
3332+
return create(OperationSample.sample(this, period, unit));
3333+
}
3334+
3335+
/**
3336+
* Samples the observable sequence at each interval.
3337+
*
3338+
* @param period
3339+
* The period of time that defines the sampling rate.
3340+
* @param unit
3341+
* The time unit for the sampling rate time period.
3342+
* @param scheduler
3343+
* The scheduler to use for sampling.
3344+
* @return An observable sequence whose elements are the results of sampling the current observable sequence.
3345+
*/
3346+
public Observable<T> sample(long period, TimeUnit unit, Scheduler scheduler) {
3347+
return create(OperationSample.sample(this, period, unit, scheduler));
3348+
}
3349+
32913350
/**
32923351
* Returns an Observable that applies a function of your choosing to the first item emitted by a
32933352
* source Observable, then feeds the result of that function along with the second item emitted

0 commit comments

Comments
 (0)