Skip to content

Commit 5d0ed38

Browse files
committed
Adding a draft of Subject class
ReactiveX#19
1 parent 9ac4d2e commit 5d0ed38

File tree

3 files changed

+171
-5
lines changed

3 files changed

+171
-5
lines changed

rxjava-core/src/main/java/rx/observables/Notification.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,4 +119,43 @@ public boolean isOnNext() {
119119
public static enum Kind {
120120
OnNext, OnError, OnCompleted
121121
}
122+
123+
@Override
124+
public String toString() {
125+
StringBuilder str = new StringBuilder("[").append(super.toString()).append(" ").append(getKind());
126+
if (hasValue())
127+
str.append(" ").append(getValue());
128+
if (hasException())
129+
str.append(" ").append(getException().getMessage());
130+
str.append("]");
131+
return str.toString();
132+
}
133+
134+
@Override
135+
public int hashCode() {
136+
int hash = getKind().hashCode();
137+
if (hasValue())
138+
hash = hash * 31 + getValue().hashCode();
139+
if (hasException())
140+
hash = hash * 31 + getException().hashCode();
141+
return hash;
142+
}
143+
144+
@Override
145+
public boolean equals(Object obj) {
146+
if (obj == null)
147+
return false;
148+
if (this == obj)
149+
return true;
150+
if (obj.getClass() != getClass())
151+
return false;
152+
Notification notification = (Notification) obj;
153+
if (notification.getKind() != getKind())
154+
return false;
155+
if (hasValue() && !getValue().equals(notification.getValue()))
156+
return false;
157+
if (hasException() && !getException().equals(notification.getException()))
158+
return false;
159+
return true;
160+
}
122161
}
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
package rx.subjects;
2+
3+
import groovy.lang.Reference;
4+
5+
import java.util.ArrayList;
6+
import java.util.List;
7+
import java.util.concurrent.ConcurrentHashMap;
8+
import java.util.concurrent.atomic.AtomicBoolean;
9+
10+
import junit.framework.Assert;
11+
12+
import org.junit.Test;
13+
14+
import rx.observables.Notification;
15+
import rx.observables.Observable;
16+
import rx.observables.Observer;
17+
import rx.observables.Subscription;
18+
import rx.util.AtomicObservableSubscription;
19+
import rx.util.AtomicObserver;
20+
import rx.util.functions.Action1;
21+
import rx.util.functions.Func1;
22+
23+
public class Subject<T> extends Observable<T> implements Observer<T> {
24+
public static <T> Subject<T> create() {
25+
final ConcurrentHashMap<Subscription, Observer<T>> observers = new ConcurrentHashMap<Subscription, Observer<T>>();
26+
27+
Func1<Observer<T>, Subscription> onSubscribe = new Func1<Observer<T>, Subscription>() {
28+
@Override
29+
public Subscription call(Observer<T> observer) {
30+
final AtomicObservableSubscription subscription = new AtomicObservableSubscription();
31+
32+
subscription.wrap(new Subscription() {
33+
@Override
34+
public void unsubscribe() {
35+
// on unsubscribe remove it from the map of outbound observers to notify
36+
observers.remove(subscription);
37+
}
38+
});
39+
40+
// on subscribe add it to the map of outbound observers to notify
41+
observers.put(subscription, new AtomicObserver<T>(observer, subscription));
42+
return subscription;
43+
}
44+
};
45+
46+
return new Subject<T>(onSubscribe, observers);
47+
}
48+
49+
private final ConcurrentHashMap<Subscription, Observer<T>> observers;
50+
51+
protected Subject(Func1<Observer<T>, Subscription> onSubscribe, ConcurrentHashMap<Subscription, Observer<T>> observers) {
52+
super(onSubscribe);
53+
this.observers = observers;
54+
}
55+
56+
@Override
57+
public void onCompleted() {
58+
for (Observer<T> observer : observers.values()) {
59+
observer.onCompleted();
60+
}
61+
}
62+
63+
@Override
64+
public void onError(Exception e) {
65+
for (Observer<T> observer : observers.values()) {
66+
observer.onError(e);
67+
}
68+
}
69+
70+
@Override
71+
public void onNext(T args) {
72+
for (Observer<T> observer : observers.values()) {
73+
observer.onNext(args);
74+
}
75+
}
76+
77+
public static class UnitTest {
78+
@Test
79+
public void test() {
80+
Subject<Integer> subject = Subject.<Integer> create();
81+
final Reference<List<Notification<String>>> actualRef = new Reference<List<Notification<String>>>();
82+
83+
Observable<List<Notification<Integer>>> wNotificationsList = subject.materialize().toList();
84+
wNotificationsList.subscribe(new Action1<List<Notification<String>>>() {
85+
@Override
86+
public void call(List<Notification<String>> actual) {
87+
actualRef.set(actual);
88+
}
89+
});
90+
91+
Subscription sub = Observable.create(new Func1<Observer<Integer>, Subscription>() {
92+
@Override
93+
public Subscription call(final Observer<Integer> observer) {
94+
final AtomicBoolean stop = new AtomicBoolean(false);
95+
new Thread() {
96+
@Override
97+
public void run() {
98+
int i = 1;
99+
while (!stop.get()) {
100+
observer.onNext(i++);
101+
}
102+
observer.onCompleted();
103+
}
104+
}.start();
105+
return new Subscription() {
106+
@Override
107+
public void unsubscribe() {
108+
stop.set(true);
109+
}
110+
};
111+
}
112+
}).subscribe(subject);
113+
// the subject has received an onComplete from the first subscribe because
114+
// it is synchronous and the next subscribe won't do anything.
115+
Observable.toObservable(-1, -2, -3).subscribe(subject);
116+
117+
List<Notification<Integer>> expected = new ArrayList<Notification<Integer>>();
118+
expected.add(new Notification<Integer>(-1));
119+
expected.add(new Notification<Integer>(-2));
120+
expected.add(new Notification<Integer>(-3));
121+
expected.add(new Notification<Integer>());
122+
Assert.assertTrue(actualRef.get().containsAll(expected));
123+
124+
sub.unsubscribe();
125+
}
126+
}
127+
}

rxjava-core/src/main/java/rx/util/AtomicObserverSingleThreaded.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -65,13 +65,13 @@ public final class AtomicObserverSingleThreaded<T> implements Observer<T> {
6565
* compositional by its very nature.
6666
*/
6767

68-
private final Observer<T> Observer;
68+
private final Observer<T> observer;
6969
private final AtomicObservableSubscription subscription;
7070
private volatile boolean finishRequested = false;
7171
private volatile boolean finished = false;
7272

7373
public AtomicObserverSingleThreaded(Observer<T> Observer, AtomicObservableSubscription subscription) {
74-
this.Observer = Observer;
74+
this.observer = Observer;
7575
this.subscription = subscription;
7676
}
7777

@@ -86,7 +86,7 @@ public void onNext(T arg) {
8686
// if we're already stopped, or a finish request has been received, we won't allow further onNext requests
8787
return;
8888
}
89-
Observer.onNext(arg);
89+
observer.onNext(arg);
9090
}
9191
}
9292

@@ -101,7 +101,7 @@ public void onError(Exception e) {
101101
if (finished || subscription.isUnsubscribed()) {
102102
return;
103103
}
104-
Observer.onError(e);
104+
observer.onError(e);
105105
finished = true;
106106
}
107107
}
@@ -117,7 +117,7 @@ public void onCompleted() {
117117
if (finished || subscription.isUnsubscribed()) {
118118
return;
119119
}
120-
Observer.onCompleted();
120+
observer.onCompleted();
121121
finished = true;
122122
}
123123
}

0 commit comments

Comments
 (0)