Skip to content

Commit b919abf

Browse files
Merge pull request ReactiveX#250 from michaeldejong/subject-implementations
Initial implementation of AsyncSubject
2 parents 9f0f753 + 509b69f commit b919abf

File tree

1 file changed

+253
-0
lines changed

1 file changed

+253
-0
lines changed
Lines changed: 253 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,253 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.subjects;
17+
18+
import static org.mockito.Matchers.any;
19+
import static org.mockito.Mockito.mock;
20+
import static org.mockito.Mockito.times;
21+
import static org.mockito.Mockito.verify;
22+
import static org.mockito.Matchers.anyString;
23+
24+
import java.util.concurrent.ConcurrentHashMap;
25+
import java.util.concurrent.atomic.AtomicReference;
26+
27+
import org.junit.Test;
28+
import org.mockito.Mockito;
29+
30+
import rx.Observer;
31+
import rx.Subscription;
32+
import rx.util.AtomicObservableSubscription;
33+
import rx.util.SynchronizedObserver;
34+
import rx.util.functions.Action1;
35+
import rx.util.functions.Func0;
36+
import rx.util.functions.Func1;
37+
38+
/**
39+
* Subject that publishes only the last event to each {@link Observer} that has subscribed when the completes.
40+
* <p>
41+
* Example usage:
42+
* <p>
43+
* <pre> {@code
44+
45+
// observer will receive no onNext events.
46+
AsyncSubject<Object> subject = AsyncSubject.create();
47+
subject.subscribe(observer);
48+
subject.onNext("one");
49+
subject.onNext("two");
50+
subject.onNext("three");
51+
52+
// observer will receive "three" as the only onNext event.
53+
AsyncSubject<Object> subject = AsyncSubject.create();
54+
subject.subscribe(observer);
55+
subject.onNext("one");
56+
subject.onNext("two");
57+
subject.onNext("three");
58+
subject.onCompleted();
59+
60+
} </pre>
61+
*
62+
* @param <T>
63+
*/
64+
public class AsyncSubject<T> extends Subject<T, T> {
65+
66+
public static <T> AsyncSubject<T> create() {
67+
final ConcurrentHashMap<Subscription, Observer<T>> observers = new ConcurrentHashMap<Subscription, Observer<T>>();
68+
69+
Func1<Observer<T>, Subscription> onSubscribe = new Func1<Observer<T>, Subscription>() {
70+
@Override
71+
public Subscription call(Observer<T> observer) {
72+
final AtomicObservableSubscription subscription = new AtomicObservableSubscription();
73+
74+
subscription.wrap(new Subscription() {
75+
@Override
76+
public void unsubscribe() {
77+
// on unsubscribe remove it from the map of outbound observers to notify
78+
observers.remove(subscription);
79+
}
80+
});
81+
82+
// on subscribe add it to the map of outbound observers to notify
83+
observers.put(subscription, new SynchronizedObserver<T>(observer, subscription));
84+
return subscription;
85+
}
86+
};
87+
88+
return new AsyncSubject<T>(onSubscribe, observers);
89+
}
90+
91+
private final ConcurrentHashMap<Subscription, Observer<T>> observers;
92+
private final AtomicReference<T> currentValue;
93+
94+
protected AsyncSubject(Func1<Observer<T>, Subscription> onSubscribe, ConcurrentHashMap<Subscription, Observer<T>> observers) {
95+
super(onSubscribe);
96+
this.observers = observers;
97+
this.currentValue = new AtomicReference<T>();
98+
}
99+
100+
@Override
101+
public void onCompleted() {
102+
T finalValue = currentValue.get();
103+
for (Observer<T> observer : observers.values()) {
104+
observer.onNext(finalValue);
105+
}
106+
for (Observer<T> observer : observers.values()) {
107+
observer.onCompleted();
108+
}
109+
}
110+
111+
@Override
112+
public void onError(Exception e) {
113+
for (Observer<T> observer : observers.values()) {
114+
observer.onError(e);
115+
}
116+
}
117+
118+
@Override
119+
public void onNext(T args) {
120+
currentValue.set(args);
121+
}
122+
123+
public static class UnitTest {
124+
125+
private final Exception testException = new Exception();
126+
127+
@Test
128+
public void testNeverCompleted() {
129+
AsyncSubject<Object> subject = AsyncSubject.create();
130+
131+
@SuppressWarnings("unchecked")
132+
Observer<String> aObserver = mock(Observer.class);
133+
subject.subscribe(aObserver);
134+
135+
subject.onNext("one");
136+
subject.onNext("two");
137+
subject.onNext("three");
138+
139+
assertNeverCompletedObserver(aObserver);
140+
}
141+
142+
private void assertNeverCompletedObserver(Observer<String> aObserver)
143+
{
144+
verify(aObserver, Mockito.never()).onNext(anyString());
145+
verify(aObserver, Mockito.never()).onError(testException);
146+
verify(aObserver, Mockito.never()).onCompleted();
147+
}
148+
149+
@Test
150+
public void testCompleted() {
151+
AsyncSubject<Object> subject = AsyncSubject.create();
152+
153+
@SuppressWarnings("unchecked")
154+
Observer<String> aObserver = mock(Observer.class);
155+
subject.subscribe(aObserver);
156+
157+
subject.onNext("one");
158+
subject.onNext("two");
159+
subject.onNext("three");
160+
subject.onCompleted();
161+
162+
assertCompletedObserver(aObserver);
163+
}
164+
165+
private void assertCompletedObserver(Observer<String> aObserver)
166+
{
167+
verify(aObserver, times(1)).onNext("three");
168+
verify(aObserver, Mockito.never()).onError(any(Exception.class));
169+
verify(aObserver, times(1)).onCompleted();
170+
}
171+
172+
@Test
173+
public void testError() {
174+
AsyncSubject<Object> subject = AsyncSubject.create();
175+
176+
@SuppressWarnings("unchecked")
177+
Observer<String> aObserver = mock(Observer.class);
178+
subject.subscribe(aObserver);
179+
180+
subject.onNext("one");
181+
subject.onNext("two");
182+
subject.onNext("three");
183+
subject.onError(testException);
184+
subject.onNext("four");
185+
subject.onError(new Exception());
186+
subject.onCompleted();
187+
188+
assertErrorObserver(aObserver);
189+
}
190+
191+
private void assertErrorObserver(Observer<String> aObserver)
192+
{
193+
verify(aObserver, Mockito.never()).onNext(anyString());
194+
verify(aObserver, times(1)).onError(testException);
195+
verify(aObserver, Mockito.never()).onCompleted();
196+
}
197+
198+
@Test
199+
public void testUnsubscribeBeforeCompleted() {
200+
AsyncSubject<Object> subject = AsyncSubject.create();
201+
202+
@SuppressWarnings("unchecked")
203+
Observer<String> aObserver = mock(Observer.class);
204+
Subscription subscription = subject.subscribe(aObserver);
205+
206+
subject.onNext("one");
207+
subject.onNext("two");
208+
209+
subscription.unsubscribe();
210+
assertNoOnNextEventsReceived(aObserver);
211+
212+
subject.onNext("three");
213+
subject.onCompleted();
214+
215+
assertNoOnNextEventsReceived(aObserver);
216+
}
217+
218+
private void assertNoOnNextEventsReceived(Observer<String> aObserver)
219+
{
220+
verify(aObserver, Mockito.never()).onNext(anyString());
221+
verify(aObserver, Mockito.never()).onError(any(Exception.class));
222+
verify(aObserver, Mockito.never()).onCompleted();
223+
}
224+
225+
@Test
226+
public void testUnsubscribe()
227+
{
228+
UnsubscribeTester.test(new Func0<AsyncSubject<Object>>()
229+
{
230+
@Override
231+
public AsyncSubject<Object> call()
232+
{
233+
return AsyncSubject.create();
234+
}
235+
}, new Action1<AsyncSubject<Object>>()
236+
{
237+
@Override
238+
public void call(AsyncSubject<Object> DefaultSubject)
239+
{
240+
DefaultSubject.onCompleted();
241+
}
242+
}, new Action1<AsyncSubject<Object>>()
243+
{
244+
@Override
245+
public void call(AsyncSubject<Object> DefaultSubject)
246+
{
247+
DefaultSubject.onError(new Exception());
248+
}
249+
},
250+
null);
251+
}
252+
}
253+
}

0 commit comments

Comments
 (0)