Skip to content

Commit eb47a8e

Browse files
committed
Issue ReactiveX#12 AtomicRateLimiter tests and additional documentation
1 parent bbfc98f commit eb47a8e

10 files changed

+286
-300
lines changed

build.gradle

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,9 @@ dependencies {
6969
testCompile "ch.qos.logback:logback-classic:0.9.26"
7070
testCompile "io.dropwizard.metrics:metrics-healthchecks:3.1.2"
7171
testCompile "org.mockito:mockito-core:1.10.19"
72+
testCompile "org.powermock:powermock:1.6.6"
73+
testCompile "org.powermock:powermock-api-mockito:1.6.6"
74+
testCompile "org.powermock:powermock-module-junit4:1.6.6"
7275
testCompile "io.projectreactor:reactor-core:2.5.0.M2"
7376
testCompile "com.jayway.awaitility:awaitility:1.7.0"
7477

src/jmh/java/javaslang/circuitbreaker/30-Nov-2016-i7-I7-5557U-OSX-Java-1.8.0_112-x64.txt

Lines changed: 0 additions & 41 deletions
This file was deleted.

src/jmh/java/javaslang/circuitbreaker/RateLimiterBenchmark.java

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
import javaslang.ratelimiter.RateLimiterConfig;
55
import javaslang.ratelimiter.internal.AtomicRateLimiter;
66
import javaslang.ratelimiter.internal.SemaphoreBasedRateLimiter;
7-
import javaslang.ratelimiter.internal.TimeBasedRateLimiter;
87
import org.openjdk.jmh.annotations.Benchmark;
98
import org.openjdk.jmh.annotations.BenchmarkMode;
109
import org.openjdk.jmh.annotations.Fork;
@@ -33,11 +32,9 @@ public class RateLimiterBenchmark {
3332
private static final int THREAD_COUNT = 2;
3433

3534
private RateLimiter semaphoreBasedRateLimiter;
36-
private RateLimiter timeBasedRateLimiter;
3735
private RateLimiter atomicRateLimiter;
3836

3937
private Supplier<String> semaphoreGuardedSupplier;
40-
private Supplier<String> timeGuardedSupplier;
4138
private Supplier<String> atomicGuardedSupplier;
4239

4340
@Setup
@@ -48,15 +45,13 @@ public void setUp() {
4845
.timeoutDuration(Duration.ofSeconds(5))
4946
.build();
5047
semaphoreBasedRateLimiter = new SemaphoreBasedRateLimiter("semaphoreBased", rateLimiterConfig);
51-
timeBasedRateLimiter = new TimeBasedRateLimiter("timeBased", rateLimiterConfig);
5248
atomicRateLimiter = new AtomicRateLimiter("atomicBased", rateLimiterConfig);
5349

5450
Supplier<String> stringSupplier = () -> {
5551
Blackhole.consumeCPU(1);
5652
return "Hello Benchmark";
5753
};
5854
semaphoreGuardedSupplier = RateLimiter.decorateSupplier(semaphoreBasedRateLimiter, stringSupplier);
59-
timeGuardedSupplier = RateLimiter.decorateSupplier(timeBasedRateLimiter, stringSupplier);
6055
atomicGuardedSupplier = RateLimiter.decorateSupplier(atomicRateLimiter, stringSupplier);
6156
}
6257

@@ -69,15 +64,6 @@ public String semaphoreBasedPermission() {
6964
return semaphoreGuardedSupplier.get();
7065
}
7166

72-
@Benchmark
73-
@Threads(value = THREAD_COUNT)
74-
@Warmup(iterations = WARMUP_COUNT)
75-
@Fork(value = FORK_COUNT)
76-
@Measurement(iterations = ITERATION_COUNT)
77-
public String timeBasedPermission() {
78-
return timeGuardedSupplier.get();
79-
}
80-
8167
@Benchmark
8268
@Threads(value = THREAD_COUNT)
8369
@Warmup(iterations = WARMUP_COUNT)

src/main/java/javaslang/ratelimiter/RateLimiterConfig.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,7 @@
77
public class RateLimiterConfig {
88
private static final String TIMEOUT_DURATION_MUST_NOT_BE_NULL = "TimeoutDuration must not be null";
99
private static final String LIMIT_REFRESH_PERIOD_MUST_NOT_BE_NULL = "LimitRefreshPeriod must not be null";
10-
11-
private static final Duration ACCEPTABLE_REFRESH_PERIOD = Duration.ofNanos(1L); // TODO: use jmh to find real one
10+
private static final Duration ACCEPTABLE_REFRESH_PERIOD = Duration.ofNanos(1L);
1211

1312
private final Duration timeoutDuration;
1413
private final Duration limitRefreshPeriod;

src/main/java/javaslang/ratelimiter/internal/AtomicRateLimiter.java

Lines changed: 47 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,7 @@ public AtomicRateLimiter(String name, RateLimiterConfig rateLimiterConfig) {
4242
permissionsPerCycle = rateLimiterConfig.getLimitForPeriod();
4343

4444
waitingThreads = new AtomicInteger(0);
45-
long activeCycle = nanoTime() / cyclePeriodInNanos;
46-
int activePermissions = permissionsPerCycle;
47-
state = new AtomicReference<>(new State(activeCycle, activePermissions, 0));
45+
state = new AtomicReference<>(new State(0, 0, 0));
4846
}
4947

5048
/**
@@ -69,7 +67,7 @@ public boolean getPermission(final Duration timeoutDuration) {
6967
* @return next {@link State}
7068
*/
7169
private State calculateNextState(final long timeoutInNanos, final State activeState) {
72-
long currentNanos = nanoTime();
70+
long currentNanos = currentNanoTime();
7371
long currentCycle = currentNanos / cyclePeriodInNanos;
7472

7573
long nextCycle = activeState.activeCycle;
@@ -85,6 +83,7 @@ private State calculateNextState(final long timeoutInNanos, final State activeSt
8583
return nextState;
8684
}
8785

86+
8887
/**
8988
* Calculates time to wait for next permission as
9089
* [time to the next cycle] + [duration of full cycles until reserved permissions expire]
@@ -147,19 +146,27 @@ private boolean waitForPermissionIfNecessary(final long timeoutInNanos, final lo
147146

148147
/**
149148
* Parks {@link Thread} for nanosToWait.
149+
* <p>If the current thread is {@linkplain Thread#interrupted}
150+
* while waiting for a permit then it won't throw {@linkplain InterruptedException},
151+
* but its interrupt status will be set.
150152
*
151153
* @param nanosToWait nanoseconds caller need to wait
152154
* @return true if caller was not {@link Thread#interrupted} while waiting
153155
*/
154156
private boolean waitForPermission(final long nanosToWait) {
155157
waitingThreads.incrementAndGet();
156-
long deadline = nanoTime() + nanosToWait;
157-
while (nanoTime() < deadline || currentThread().isInterrupted()) {
158-
long sleepBlockDuration = deadline - nanoTime();
158+
long deadline = currentNanoTime() + nanosToWait;
159+
boolean wasInterrupted = false;
160+
while (currentNanoTime() < deadline && !wasInterrupted) {
161+
long sleepBlockDuration = deadline - currentNanoTime();
159162
parkNanos(sleepBlockDuration);
163+
wasInterrupted = Thread.interrupted();
160164
}
161165
waitingThreads.decrementAndGet();
162-
return !currentThread().isInterrupted();
166+
if (wasInterrupted) {
167+
currentThread().interrupt();
168+
}
169+
return !wasInterrupted;
163170
}
164171

165172
/**
@@ -182,7 +189,7 @@ public RateLimiterConfig getRateLimiterConfig() {
182189
* {@inheritDoc}
183190
*/
184191
@Override
185-
public Metrics getMetrics() {
192+
public AtomicRateLimiterMetrics getMetrics() {
186193
return new AtomicRateLimiterMetrics();
187194
}
188195

@@ -201,6 +208,7 @@ public Metrics getMetrics() {
201208
* </ul>
202209
*/
203210
private static class State {
211+
204212
private final long activeCycle;
205213
private final int activePermissions;
206214
private final long nanosToWait;
@@ -210,12 +218,14 @@ public State(final long activeCycle, final int activePermissions, final long nan
210218
this.activePermissions = activePermissions;
211219
this.nanosToWait = nanosToWait;
212220
}
221+
213222
}
214223

215224
/**
216225
* Enhanced {@link Metrics} with some implementation specific details
217226
*/
218227
public final class AtomicRateLimiterMetrics implements Metrics {
228+
219229
private AtomicRateLimiterMetrics() {
220230
}
221231

@@ -228,5 +238,33 @@ private AtomicRateLimiterMetrics() {
228238
public int getNumberOfWaitingThreads() {
229239
return waitingThreads.get();
230240
}
241+
242+
/**
243+
* @return estimated time duration in nanos to wait for the next permission
244+
*/
245+
public long getNanosToWait() {
246+
State currentState = state.get();
247+
State estimatedState = calculateNextState(-1, currentState);
248+
return estimatedState.nanosToWait;
249+
}
250+
251+
/**
252+
* Estimates count of permissions available permissions.
253+
* Can be negative if some permissions where reserved.
254+
*
255+
* @return estimated count of permissions
256+
*/
257+
public long getAvailablePermissions() {
258+
State currentState = state.get();
259+
State estimatedState = calculateNextState(-1, currentState);
260+
return estimatedState.activePermissions;
261+
}
262+
}
263+
264+
/**
265+
* Created only for test purposes. Simply calls {@link System#nanoTime()}
266+
*/
267+
private long currentNanoTime() {
268+
return nanoTime();
231269
}
232270
}

0 commit comments

Comments
 (0)