Skip to content

Commit 377effd

Browse files
Merge pull request #108 from abersnaze/subject
Adding a draft of Subject class
2 parents ca75023 + c7f554f commit 377effd

File tree

3 files changed

+170
-5
lines changed

3 files changed

+170
-5
lines changed

rxjava-core/src/main/java/rx/observables/Notification.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,4 +119,43 @@ public boolean isOnNext() {
119119
public static enum Kind {
120120
OnNext, OnError, OnCompleted
121121
}
122+
123+
@Override
124+
public String toString() {
125+
StringBuilder str = new StringBuilder("[").append(super.toString()).append(" ").append(getKind());
126+
if (hasValue())
127+
str.append(" ").append(getValue());
128+
if (hasException())
129+
str.append(" ").append(getException().getMessage());
130+
str.append("]");
131+
return str.toString();
132+
}
133+
134+
@Override
135+
public int hashCode() {
136+
int hash = getKind().hashCode();
137+
if (hasValue())
138+
hash = hash * 31 + getValue().hashCode();
139+
if (hasException())
140+
hash = hash * 31 + getException().hashCode();
141+
return hash;
142+
}
143+
144+
@Override
145+
public boolean equals(Object obj) {
146+
if (obj == null)
147+
return false;
148+
if (this == obj)
149+
return true;
150+
if (obj.getClass() != getClass())
151+
return false;
152+
Notification notification = (Notification) obj;
153+
if (notification.getKind() != getKind())
154+
return false;
155+
if (hasValue() && !getValue().equals(notification.getValue()))
156+
return false;
157+
if (hasException() && !getException().equals(notification.getException()))
158+
return false;
159+
return true;
160+
}
122161
}
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
package rx.subjects;
2+
3+
import java.util.ArrayList;
4+
import java.util.List;
5+
import java.util.concurrent.ConcurrentHashMap;
6+
import java.util.concurrent.atomic.AtomicBoolean;
7+
import java.util.concurrent.atomic.AtomicReference;
8+
9+
import junit.framework.Assert;
10+
11+
import org.junit.Test;
12+
13+
import rx.observables.Notification;
14+
import rx.observables.Observable;
15+
import rx.observables.Observer;
16+
import rx.observables.Subscription;
17+
import rx.util.AtomicObservableSubscription;
18+
import rx.util.AtomicObserver;
19+
import rx.util.functions.Action1;
20+
import rx.util.functions.Func1;
21+
22+
public class Subject<T> extends Observable<T> implements Observer<T> {
23+
public static <T> Subject<T> create() {
24+
final ConcurrentHashMap<Subscription, Observer<T>> observers = new ConcurrentHashMap<Subscription, Observer<T>>();
25+
26+
Func1<Observer<T>, Subscription> onSubscribe = new Func1<Observer<T>, Subscription>() {
27+
@Override
28+
public Subscription call(Observer<T> observer) {
29+
final AtomicObservableSubscription subscription = new AtomicObservableSubscription();
30+
31+
subscription.wrap(new Subscription() {
32+
@Override
33+
public void unsubscribe() {
34+
// on unsubscribe remove it from the map of outbound observers to notify
35+
observers.remove(subscription);
36+
}
37+
});
38+
39+
// on subscribe add it to the map of outbound observers to notify
40+
observers.put(subscription, new AtomicObserver<T>(observer, subscription));
41+
return subscription;
42+
}
43+
};
44+
45+
return new Subject<T>(onSubscribe, observers);
46+
}
47+
48+
private final ConcurrentHashMap<Subscription, Observer<T>> observers;
49+
50+
protected Subject(Func1<Observer<T>, Subscription> onSubscribe, ConcurrentHashMap<Subscription, Observer<T>> observers) {
51+
super(onSubscribe);
52+
this.observers = observers;
53+
}
54+
55+
@Override
56+
public void onCompleted() {
57+
for (Observer<T> observer : observers.values()) {
58+
observer.onCompleted();
59+
}
60+
}
61+
62+
@Override
63+
public void onError(Exception e) {
64+
for (Observer<T> observer : observers.values()) {
65+
observer.onError(e);
66+
}
67+
}
68+
69+
@Override
70+
public void onNext(T args) {
71+
for (Observer<T> observer : observers.values()) {
72+
observer.onNext(args);
73+
}
74+
}
75+
76+
public static class UnitTest {
77+
@Test
78+
public void test() {
79+
Subject<Integer> subject = Subject.<Integer> create();
80+
final AtomicReference<List<Notification<String>>> actualRef = new AtomicReference<List<Notification<String>>>();
81+
82+
Observable<List<Notification<Integer>>> wNotificationsList = subject.materialize().toList();
83+
wNotificationsList.subscribe(new Action1<List<Notification<String>>>() {
84+
@Override
85+
public void call(List<Notification<String>> actual) {
86+
actualRef.set(actual);
87+
}
88+
});
89+
90+
Subscription sub = Observable.create(new Func1<Observer<Integer>, Subscription>() {
91+
@Override
92+
public Subscription call(final Observer<Integer> observer) {
93+
final AtomicBoolean stop = new AtomicBoolean(false);
94+
new Thread() {
95+
@Override
96+
public void run() {
97+
int i = 1;
98+
while (!stop.get()) {
99+
observer.onNext(i++);
100+
}
101+
observer.onCompleted();
102+
}
103+
}.start();
104+
return new Subscription() {
105+
@Override
106+
public void unsubscribe() {
107+
stop.set(true);
108+
}
109+
};
110+
}
111+
}).subscribe(subject);
112+
// the subject has received an onComplete from the first subscribe because
113+
// it is synchronous and the next subscribe won't do anything.
114+
Observable.toObservable(-1, -2, -3).subscribe(subject);
115+
116+
List<Notification<Integer>> expected = new ArrayList<Notification<Integer>>();
117+
expected.add(new Notification<Integer>(-1));
118+
expected.add(new Notification<Integer>(-2));
119+
expected.add(new Notification<Integer>(-3));
120+
expected.add(new Notification<Integer>());
121+
Assert.assertTrue(actualRef.get().containsAll(expected));
122+
123+
sub.unsubscribe();
124+
}
125+
}
126+
}

rxjava-core/src/main/java/rx/util/AtomicObserverSingleThreaded.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -65,13 +65,13 @@ public final class AtomicObserverSingleThreaded<T> implements Observer<T> {
6565
* compositional by its very nature.
6666
*/
6767

68-
private final Observer<T> Observer;
68+
private final Observer<T> observer;
6969
private final AtomicObservableSubscription subscription;
7070
private volatile boolean finishRequested = false;
7171
private volatile boolean finished = false;
7272

7373
public AtomicObserverSingleThreaded(Observer<T> Observer, AtomicObservableSubscription subscription) {
74-
this.Observer = Observer;
74+
this.observer = Observer;
7575
this.subscription = subscription;
7676
}
7777

@@ -86,7 +86,7 @@ public void onNext(T arg) {
8686
// if we're already stopped, or a finish request has been received, we won't allow further onNext requests
8787
return;
8888
}
89-
Observer.onNext(arg);
89+
observer.onNext(arg);
9090
}
9191
}
9292

@@ -101,7 +101,7 @@ public void onError(Exception e) {
101101
if (finished || subscription.isUnsubscribed()) {
102102
return;
103103
}
104-
Observer.onError(e);
104+
observer.onError(e);
105105
finished = true;
106106
}
107107
}
@@ -117,7 +117,7 @@ public void onCompleted() {
117117
if (finished || subscription.isUnsubscribed()) {
118118
return;
119119
}
120-
Observer.onCompleted();
120+
observer.onCompleted();
121121
finished = true;
122122
}
123123
}

0 commit comments

Comments
 (0)