1
+ package rx .operators ;
2
+
3
+ import static org .junit .Assert .*;
4
+
5
+ import java .lang .Thread .UncaughtExceptionHandler ;
6
+ import java .util .ArrayList ;
7
+ import java .util .List ;
8
+ import java .util .concurrent .atomic .AtomicBoolean ;
9
+ import java .util .concurrent .atomic .AtomicReference ;
10
+
11
+ import org .junit .Test ;
12
+
13
+ import rx .Observable ;
14
+ import rx .Observer ;
15
+ import rx .Subscription ;
16
+ import rx .subscriptions .Subscriptions ;
17
+ import rx .util .functions .Func1 ;
18
+
19
+ /**
20
+ * Common utility functions for operator implementations and tests.
21
+ */
22
+ /* package */ class AbstractOperation
23
+ {
24
+ private AbstractOperation () {
25
+ }
26
+
27
+ public static class UnitTest {
28
+
29
+ public static <T > Func1 <Observer <T >, Subscription > assertTrustedObservable (final Func1 <Observer <T >, Subscription > source )
30
+ {
31
+ return new Func1 <Observer <T >, Subscription >()
32
+ {
33
+ @ Override
34
+ public Subscription call (Observer <T > observer )
35
+ {
36
+ return source .call (new TestingObserver <T >(observer ));
37
+ }
38
+ };
39
+ }
40
+
41
+ public static class TestingObserver <T > implements Observer <T > {
42
+
43
+ private final Observer <T > actual ;
44
+ private final AtomicBoolean isFinished = new AtomicBoolean (false );
45
+ private final AtomicBoolean isInCallback = new AtomicBoolean (false );
46
+
47
+ public TestingObserver (Observer <T > actual ) {
48
+ this .actual = actual ;
49
+ }
50
+
51
+ @ Override
52
+ public void onCompleted () {
53
+ assertFalse ("previous call to onCompleted() or onError()" , !isFinished .compareAndSet (false , true ));
54
+ assertFalse ("concurrent callback pending" , !isInCallback .compareAndSet (false , true ));
55
+ actual .onCompleted ();
56
+ isInCallback .set (false );
57
+ }
58
+
59
+ @ Override
60
+ public void onError (Exception e ) {
61
+ assertFalse ("previous call to onCompleted() or onError()" , !isFinished .compareAndSet (false , true ));
62
+ assertFalse ("concurrent callback pending" , !isInCallback .compareAndSet (false , true ));
63
+ actual .onError (e );
64
+ isInCallback .set (false );
65
+ }
66
+
67
+ @ Override
68
+ public void onNext (T args ) {
69
+ assertFalse ("previous call to onCompleted() or onError()" , isFinished .get ());
70
+ assertFalse ("concurrent callback pending" , !isInCallback .compareAndSet (false , true ));
71
+ actual .onNext (args );
72
+ isInCallback .set (false );
73
+ }
74
+
75
+ }
76
+
77
+ @ Test (expected = AssertionError .class )
78
+ public void testDoubleCompleted () {
79
+ Observable .create (assertTrustedObservable (new Func1 <Observer <String >, Subscription >()
80
+ {
81
+ @ Override
82
+ public Subscription call (Observer <String > observer )
83
+ {
84
+ observer .onCompleted ();
85
+ observer .onCompleted ();
86
+ return Subscriptions .empty ();
87
+ }
88
+ })).lastOrDefault ("end" );
89
+
90
+ }
91
+
92
+ @ Test (expected = AssertionError .class )
93
+ public void testCompletedError () {
94
+ Observable .create (assertTrustedObservable (new Func1 <Observer <String >, Subscription >()
95
+ {
96
+ @ Override
97
+ public Subscription call (Observer <String > observer )
98
+ {
99
+ observer .onCompleted ();
100
+ observer .onError (new Exception ());
101
+ return Subscriptions .empty ();
102
+ }
103
+ })).lastOrDefault ("end" );
104
+ }
105
+
106
+ @ Test (expected = AssertionError .class )
107
+ public void testCompletedNext () {
108
+ Observable .create (assertTrustedObservable (new Func1 <Observer <String >, Subscription >()
109
+ {
110
+ @ Override
111
+ public Subscription call (Observer <String > observer )
112
+ {
113
+ observer .onCompleted ();
114
+ observer .onNext ("one" );
115
+ return Subscriptions .empty ();
116
+ }
117
+ })).lastOrDefault ("end" );
118
+ }
119
+
120
+ @ Test (expected = AssertionError .class )
121
+ public void testErrorCompleted () {
122
+ Observable .create (assertTrustedObservable (new Func1 <Observer <String >, Subscription >()
123
+ {
124
+ @ Override
125
+ public Subscription call (Observer <String > observer )
126
+ {
127
+ observer .onError (new Exception ());
128
+ observer .onCompleted ();
129
+ return Subscriptions .empty ();
130
+ }
131
+ })).lastOrDefault ("end" );
132
+ }
133
+
134
+ @ Test (expected = AssertionError .class )
135
+ public void testDoubleError () {
136
+ Observable .create (assertTrustedObservable (new Func1 <Observer <String >, Subscription >()
137
+ {
138
+ @ Override
139
+ public Subscription call (Observer <String > observer )
140
+ {
141
+ observer .onError (new Exception ());
142
+ observer .onError (new Exception ());
143
+ return Subscriptions .empty ();
144
+ }
145
+ })).lastOrDefault ("end" );
146
+ }
147
+
148
+ @ Test (expected = AssertionError .class )
149
+ public void testErrorNext () {
150
+ Observable .create (assertTrustedObservable (new Func1 <Observer <String >, Subscription >()
151
+ {
152
+ @ Override
153
+ public Subscription call (Observer <String > observer )
154
+ {
155
+ observer .onError (new Exception ());
156
+ observer .onNext ("one" );
157
+ return Subscriptions .empty ();
158
+ }
159
+ })).lastOrDefault ("end" );
160
+ }
161
+
162
+ @ Test
163
+ public void testNextCompleted () {
164
+ Observable .create (assertTrustedObservable (new Func1 <Observer <String >, Subscription >()
165
+ {
166
+ @ Override
167
+ public Subscription call (Observer <String > observer )
168
+ {
169
+ observer .onNext ("one" );
170
+ observer .onCompleted ();
171
+ return Subscriptions .empty ();
172
+ }
173
+ })).lastOrDefault ("end" );
174
+ }
175
+
176
+ @ Test
177
+ public void testConcurrentNextNext () {
178
+ final List <Thread > threads = new ArrayList <Thread >();
179
+ final AtomicReference <Throwable > threadFailure = new AtomicReference <Throwable >();
180
+ Observable .create (assertTrustedObservable (new Func1 <Observer <String >, Subscription >()
181
+ {
182
+ @ Override
183
+ public Subscription call (final Observer <String > observer )
184
+ {
185
+ threads .add (new Thread (new Runnable ()
186
+ {
187
+ @ Override
188
+ public void run ()
189
+ {
190
+ observer .onNext ("one" );
191
+ }
192
+ }));
193
+ threads .add (new Thread (new Runnable ()
194
+ {
195
+ @ Override
196
+ public void run ()
197
+ {
198
+ observer .onNext ("two" );
199
+ }
200
+ }));
201
+ return Subscriptions .empty ();
202
+ }
203
+ })).subscribe (new SlowObserver ());
204
+ for (Thread thread : threads ) {
205
+ thread .setUncaughtExceptionHandler (new UncaughtExceptionHandler ()
206
+ {
207
+ @ Override
208
+ public void uncaughtException (Thread thread , Throwable throwable )
209
+ {
210
+ threadFailure .set (throwable );
211
+ }
212
+ });
213
+ thread .start ();
214
+ }
215
+ for (Thread thread : threads ) {
216
+ try {
217
+ thread .join ();
218
+ } catch (InterruptedException ignored ) {
219
+ }
220
+ }
221
+ // Junit seems pretty bad about exposing test failures inside of created threads.
222
+ assertNotNull ("exception thrown by thread" , threadFailure .get ());
223
+ assertEquals ("class of exception thrown by thread" , AssertionError .class , threadFailure .get ().getClass ());
224
+ }
225
+
226
+ private static class SlowObserver implements Observer <String >
227
+ {
228
+ @ Override
229
+ public void onCompleted ()
230
+ {
231
+ try {
232
+ Thread .sleep (10 );
233
+ } catch (InterruptedException ignored ) {
234
+ }
235
+ }
236
+
237
+ @ Override
238
+ public void onError (Exception e )
239
+ {
240
+ try {
241
+ Thread .sleep (10 );
242
+ } catch (InterruptedException ignored ) {
243
+ }
244
+ }
245
+
246
+ @ Override
247
+ public void onNext (String args )
248
+ {
249
+ try {
250
+ Thread .sleep (10 );
251
+ } catch (InterruptedException ignored ) {
252
+ }
253
+ }
254
+ }
255
+ }
256
+ }
0 commit comments