Skip to content

Commit df9b64a

Browse files
Dan MaasRobWin
Dan Maas
authored andcommitted
Issue ReactiveX#100: Added a rate limiter operator for rxjava2
1 parent f79c61c commit df9b64a

File tree

2 files changed

+600
-0
lines changed

2 files changed

+600
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,342 @@
1+
/*
2+
* Copyright 2017 Dan Maas
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.github.resilience4j.ratelimiter.operator;
18+
19+
import io.github.resilience4j.ratelimiter.RateLimiter;
20+
import io.github.resilience4j.ratelimiter.RequestNotPermitted;
21+
import io.reactivex.FlowableOperator;
22+
import io.reactivex.ObservableOperator;
23+
import io.reactivex.Observer;
24+
import io.reactivex.SingleObserver;
25+
import io.reactivex.SingleOperator;
26+
import io.reactivex.disposables.Disposable;
27+
import org.reactivestreams.Subscriber;
28+
import org.reactivestreams.Subscription;
29+
import org.slf4j.Logger;
30+
import org.slf4j.LoggerFactory;
31+
32+
import java.util.concurrent.atomic.AtomicBoolean;
33+
34+
public class RateLimiterOperator<T> implements ObservableOperator<T, T>, FlowableOperator<T, T>, SingleOperator<T, T> {
35+
36+
private static final Logger LOG = LoggerFactory.getLogger(RateLimiterOperator.class);
37+
38+
private final RateLimiter rateLimiter;
39+
40+
private RateLimiterOperator(RateLimiter rateLimiter) {
41+
this.rateLimiter = rateLimiter;
42+
}
43+
44+
/**
45+
* Creates a RateLimiterOperator.
46+
*
47+
* @param rateLimiter the RateLimiter
48+
* @param <T> the value type of the upstream and downstream
49+
* @return a RateLimiterOperator
50+
*/
51+
public static <T> RateLimiterOperator<T> of(RateLimiter rateLimiter) {
52+
return new RateLimiterOperator<>(rateLimiter);
53+
}
54+
55+
/**
56+
* {@inheritDoc}
57+
*/
58+
@Override
59+
public Subscriber<? super T> apply(Subscriber<? super T> childSubscriber) throws Exception {
60+
return new RateLimiterSubscriber(childSubscriber);
61+
}
62+
63+
/**
64+
* {@inheritDoc}
65+
*/
66+
@Override
67+
public Observer<? super T> apply(Observer<? super T> childObserver) throws Exception {
68+
return new RateLimiterObserver(childObserver);
69+
}
70+
71+
/**
72+
* {@inheritDoc}
73+
*/
74+
@Override
75+
public SingleObserver<? super T> apply(SingleObserver<? super T> childObserver) throws Exception {
76+
return new RateLimiterSingleObserver(childObserver);
77+
}
78+
79+
private final class RateLimiterSubscriber implements Subscriber<T>, Subscription {
80+
81+
private final Subscriber<? super T> childSubscriber;
82+
private Subscription subscription;
83+
private AtomicBoolean cancelled = new AtomicBoolean(false);
84+
85+
RateLimiterSubscriber(Subscriber<? super T> childSubscriber) {
86+
this.childSubscriber = childSubscriber;
87+
}
88+
89+
/**
90+
* {@inheritDoc}
91+
*/
92+
@Override
93+
public void onSubscribe(Subscription subscription) {
94+
this.subscription = subscription;
95+
if (LOG.isDebugEnabled()) {
96+
LOG.info("onSubscribe");
97+
}
98+
if (rateLimiter.getPermission(rateLimiter.getRateLimiterConfig().getTimeoutDuration())) {
99+
childSubscriber.onSubscribe(this);
100+
} else {
101+
subscription.cancel();
102+
childSubscriber.onSubscribe(this);
103+
childSubscriber.onError(new RequestNotPermitted("Request not permitted for limiter: " + rateLimiter.getName()));
104+
}
105+
}
106+
107+
/**
108+
* {@inheritDoc}
109+
*/
110+
@Override
111+
public void onNext(T event) {
112+
if (LOG.isDebugEnabled()) {
113+
LOG.info("onNext: {}", event);
114+
}
115+
if (!isCancelled()) {
116+
if (rateLimiter.getPermission(rateLimiter.getRateLimiterConfig().getTimeoutDuration())) {
117+
childSubscriber.onNext(event);
118+
} else {
119+
subscription.cancel();
120+
childSubscriber.onError(new RequestNotPermitted("Request not permitted for limiter: " + rateLimiter.getName()));
121+
}
122+
}
123+
}
124+
125+
/**
126+
* {@inheritDoc}
127+
*/
128+
@Override
129+
public void onError(Throwable e) {
130+
if (LOG.isDebugEnabled()) {
131+
LOG.info("onError", e);
132+
}
133+
if (!isCancelled()) {
134+
childSubscriber.onError(e);
135+
136+
}
137+
}
138+
139+
/**
140+
* {@inheritDoc}
141+
*/
142+
@Override
143+
public void onComplete() {
144+
if (LOG.isDebugEnabled()) {
145+
LOG.info("onComplete");
146+
}
147+
if (!isCancelled()) {
148+
childSubscriber.onComplete();
149+
}
150+
}
151+
152+
/**
153+
* {@inheritDoc}
154+
*/
155+
@Override
156+
public void request(long n) {
157+
subscription.request(n);
158+
}
159+
160+
/**
161+
* {@inheritDoc}
162+
*/
163+
@Override
164+
public void cancel() {
165+
if (!cancelled.get()) {
166+
cancelled.set(true);
167+
subscription.cancel();
168+
}
169+
}
170+
171+
public boolean isCancelled() {
172+
return cancelled.get();
173+
}
174+
}
175+
176+
private final class RateLimiterObserver implements Observer<T>, Disposable {
177+
178+
private final Observer<? super T> childObserver;
179+
private Disposable disposable;
180+
private AtomicBoolean cancelled = new AtomicBoolean(false);
181+
182+
RateLimiterObserver(Observer<? super T> childObserver) {
183+
this.childObserver = childObserver;
184+
}
185+
186+
/**
187+
* {@inheritDoc}
188+
*/
189+
@Override
190+
public void onSubscribe(Disposable disposable) {
191+
this.disposable = disposable;
192+
if (LOG.isDebugEnabled()) {
193+
LOG.info("onSubscribe");
194+
}
195+
if (rateLimiter.getPermission(rateLimiter.getRateLimiterConfig().getTimeoutDuration())) {
196+
childObserver.onSubscribe(this);
197+
} else {
198+
disposable.dispose();
199+
childObserver.onSubscribe(this);
200+
childObserver.onError(new RequestNotPermitted("Request not permitted for limiter: " + rateLimiter.getName()));
201+
}
202+
}
203+
204+
/**
205+
* {@inheritDoc}
206+
*/
207+
@Override
208+
public void onNext(T event) {
209+
if (LOG.isDebugEnabled()) {
210+
LOG.info("onNext: {}", event);
211+
}
212+
if (!isDisposed()) {
213+
if (rateLimiter.getPermission(rateLimiter.getRateLimiterConfig().getTimeoutDuration())) {
214+
childObserver.onNext(event);
215+
} else {
216+
disposable.dispose();
217+
childObserver.onError(new RequestNotPermitted("Request not permitted for limiter: " + rateLimiter.getName()));
218+
}
219+
}
220+
}
221+
222+
/**
223+
* {@inheritDoc}
224+
*/
225+
@Override
226+
public void onError(Throwable e) {
227+
if (LOG.isDebugEnabled()) {
228+
LOG.info("onError", e);
229+
}
230+
if (!isDisposed()) {
231+
childObserver.onError(e);
232+
}
233+
}
234+
235+
/**
236+
* {@inheritDoc}
237+
*/
238+
@Override
239+
public void onComplete() {
240+
if (LOG.isDebugEnabled()) {
241+
LOG.info("onComplete");
242+
}
243+
if (!isDisposed()) {
244+
childObserver.onComplete();
245+
}
246+
}
247+
248+
/**
249+
* {@inheritDoc}
250+
*/
251+
@Override
252+
public void dispose() {
253+
if (!cancelled.get()) {
254+
cancelled.set(true);
255+
disposable.dispose();
256+
}
257+
}
258+
259+
/**
260+
* {@inheritDoc}
261+
*/
262+
@Override
263+
public boolean isDisposed() {
264+
return cancelled.get();
265+
}
266+
}
267+
268+
private class RateLimiterSingleObserver implements SingleObserver<T>, Disposable {
269+
270+
private final SingleObserver<? super T> childObserver;
271+
private Disposable disposable;
272+
private AtomicBoolean cancelled = new AtomicBoolean(false);
273+
274+
275+
RateLimiterSingleObserver(SingleObserver<? super T> childObserver) {
276+
this.childObserver = childObserver;
277+
}
278+
279+
/**
280+
* {@inheritDoc}
281+
*/
282+
@Override
283+
public void onSubscribe(Disposable disposable) {
284+
this.disposable = disposable;
285+
if (LOG.isDebugEnabled()) {
286+
LOG.info("onSubscribe");
287+
}
288+
if (rateLimiter.getPermission(rateLimiter.getRateLimiterConfig().getTimeoutDuration())) {
289+
childObserver.onSubscribe(this);
290+
} else {
291+
disposable.dispose();
292+
childObserver.onSubscribe(this);
293+
childObserver.onError(new RequestNotPermitted("Request not permitted for limiter: " + rateLimiter.getName()));
294+
}
295+
}
296+
297+
/**
298+
* {@inheritDoc}
299+
*/
300+
@Override
301+
public void onError(Throwable e) {
302+
if (LOG.isDebugEnabled()) {
303+
LOG.info("onError", e);
304+
}
305+
if (!isDisposed()) {
306+
childObserver.onError(e);
307+
}
308+
}
309+
310+
/**
311+
* {@inheritDoc}
312+
*/
313+
@Override
314+
public void onSuccess(T value) {
315+
if (LOG.isDebugEnabled()) {
316+
LOG.info("onComplete");
317+
}
318+
if (!isDisposed()) {
319+
childObserver.onSuccess(value);
320+
}
321+
}
322+
323+
/**
324+
* {@inheritDoc}
325+
*/
326+
@Override
327+
public void dispose() {
328+
if (!cancelled.get()) {
329+
cancelled.set(true);
330+
disposable.dispose();
331+
}
332+
}
333+
334+
/**
335+
* {@inheritDoc}
336+
*/
337+
@Override
338+
public boolean isDisposed() {
339+
return cancelled.get();
340+
}
341+
}
342+
}

0 commit comments

Comments
 (0)