Skip to content

Commit 8b07f08

Browse files
committed
Issue ReactiveX#12 speed optimisations and cleanup
1 parent ceb75fe commit 8b07f08

File tree

4 files changed

+118
-151
lines changed

4 files changed

+118
-151
lines changed

src/jmh/java/io/github/robwin/circuitbreaker/CircuitBreakerBenchmark.java

Lines changed: 52 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -18,40 +18,55 @@
1818
*/
1919
package io.github.robwin.circuitbreaker;
2020

21-
//@State(Scope.Benchmark)
22-
//@OutputTimeUnit(TimeUnit.MILLISECONDS)
23-
//@BenchmarkMode(Mode.Throughput)
24-
//public class CircuitBreakerBenchmark {
25-
//
26-
// private CircuitBreaker circuitBreaker;
27-
// private Supplier<String> supplier;
28-
// private static final int ITERATION_COUNT = 10;
29-
// private static final int WARMUP_COUNT = 10;
30-
// private static final int THREAD_COUNT = 10;
31-
//
32-
// @Setup
33-
// public void setUp() {
34-
// CircuitBreakerRegistry circuitBreakerRegistry = CircuitBreakerRegistry.of(CircuitBreakerConfig.custom()
35-
// .failureRateThreshold(1)
36-
// .waitDurationInOpenState(Duration.ofSeconds(1))
37-
// .build());
38-
// circuitBreaker = circuitBreakerRegistry.circuitBreaker("testCircuitBreaker");
39-
//
40-
// supplier = CircuitBreaker.decorateSupplier(() -> {
41-
// try {
42-
// Thread.sleep(100);
43-
// } catch (InterruptedException e) {
44-
// e.printStackTrace();
45-
// }
46-
// return "Hello Benchmark";
47-
// }, circuitBreaker);
48-
// }
49-
//
50-
// @Benchmark
51-
// @Threads(value = THREAD_COUNT)
52-
// @Warmup(iterations = WARMUP_COUNT)
53-
// @Measurement(iterations = ITERATION_COUNT)
54-
// public String invokeSupplier(){
55-
// return supplier.get();
56-
// }
57-
//}
21+
import org.openjdk.jmh.annotations.Benchmark;
22+
import org.openjdk.jmh.annotations.BenchmarkMode;
23+
import org.openjdk.jmh.annotations.Measurement;
24+
import org.openjdk.jmh.annotations.Mode;
25+
import org.openjdk.jmh.annotations.OutputTimeUnit;
26+
import org.openjdk.jmh.annotations.Scope;
27+
import org.openjdk.jmh.annotations.Setup;
28+
import org.openjdk.jmh.annotations.State;
29+
import org.openjdk.jmh.annotations.Threads;
30+
import org.openjdk.jmh.annotations.Warmup;
31+
32+
import java.time.Duration;
33+
import java.util.concurrent.TimeUnit;
34+
import java.util.function.Supplier;
35+
36+
@State(Scope.Benchmark)
37+
@OutputTimeUnit(TimeUnit.MILLISECONDS)
38+
@BenchmarkMode(Mode.Throughput)
39+
public class CircuitBreakerBenchmark {
40+
41+
private static final int ITERATION_COUNT = 10;
42+
private static final int WARMUP_COUNT = 10;
43+
private static final int THREAD_COUNT = 10;
44+
private CircuitBreaker circuitBreaker;
45+
private Supplier<String> supplier;
46+
47+
@Setup
48+
public void setUp() {
49+
CircuitBreakerRegistry circuitBreakerRegistry = CircuitBreakerRegistry.of(CircuitBreakerConfig.custom()
50+
.failureRateThreshold(1)
51+
.waitDurationInOpenState(Duration.ofSeconds(1))
52+
.build());
53+
circuitBreaker = circuitBreakerRegistry.circuitBreaker("testCircuitBreaker");
54+
55+
supplier = CircuitBreaker.decorateSupplier(() -> {
56+
try {
57+
Thread.sleep(100);
58+
} catch (InterruptedException e) {
59+
e.printStackTrace();
60+
}
61+
return "Hello Benchmark";
62+
}, circuitBreaker);
63+
}
64+
65+
@Benchmark
66+
@Threads(value = THREAD_COUNT)
67+
@Warmup(iterations = WARMUP_COUNT)
68+
@Measurement(iterations = ITERATION_COUNT)
69+
public String invokeSupplier() {
70+
return supplier.get();
71+
}
72+
}

src/jmh/java/javaslang/circuitbreaker/RateLimiterBenchmark.java

Lines changed: 6 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -23,18 +23,16 @@
2323

2424
@State(Scope.Benchmark)
2525
@OutputTimeUnit(TimeUnit.MICROSECONDS)
26-
@BenchmarkMode(Mode.AverageTime)
26+
@BenchmarkMode(Mode.All)
2727
public class RateLimiterBenchmark {
2828

2929
public static final int FORK_COUNT = 2;
3030
private static final int WARMUP_COUNT = 10;
31-
private static final int ITERATION_COUNT = 5;
31+
private static final int ITERATION_COUNT = 10;
3232
private static final int THREAD_COUNT = 2;
3333

3434
private RateLimiter semaphoreBasedRateLimiter;
3535
private AtomicRateLimiter atomicRateLimiter;
36-
private AtomicRateLimiter.State state;
37-
private static final Object mutex = new Object();
3836

3937
private Supplier<String> semaphoreGuardedSupplier;
4038
private Supplier<String> atomicGuardedSupplier;
@@ -48,7 +46,6 @@ public void setUp() {
4846
.build();
4947
semaphoreBasedRateLimiter = new SemaphoreBasedRateLimiter("semaphoreBased", rateLimiterConfig);
5048
atomicRateLimiter = new AtomicRateLimiter("atomicBased", rateLimiterConfig);
51-
state = atomicRateLimiter.state.get();
5249

5350
Supplier<String> stringSupplier = () -> {
5451
Blackhole.consumeCPU(1);
@@ -63,104 +60,16 @@ public void setUp() {
6360
@Warmup(iterations = WARMUP_COUNT)
6461
@Fork(value = FORK_COUNT)
6562
@Measurement(iterations = ITERATION_COUNT)
66-
public void calculateNextState(Blackhole bh) {
67-
AtomicRateLimiter.State next = atomicRateLimiter.calculateNextState(Duration.ZERO.toNanos(), this.state);
68-
bh.consume(next);
63+
public String semaphoreBasedPermission() {
64+
return semaphoreGuardedSupplier.get();
6965
}
7066

7167
@Benchmark
7268
@Threads(value = THREAD_COUNT)
7369
@Warmup(iterations = WARMUP_COUNT)
7470
@Fork(value = FORK_COUNT)
7571
@Measurement(iterations = ITERATION_COUNT)
76-
public void nanosToWaitForPermission(Blackhole bh) {
77-
long next = atomicRateLimiter.nanosToWaitForPermission(1, 315L, 31L);
78-
bh.consume(next);
72+
public String atomicPermission() {
73+
return atomicGuardedSupplier.get();
7974
}
80-
81-
@Benchmark
82-
@Threads(value = THREAD_COUNT)
83-
@Warmup(iterations = WARMUP_COUNT)
84-
@Fork(value = FORK_COUNT)
85-
@Measurement(iterations = ITERATION_COUNT)
86-
public void reservePermissions(Blackhole bh) {
87-
AtomicRateLimiter.State next = atomicRateLimiter.reservePermissions(0L, 31L, 1, 0L);
88-
bh.consume(next);
89-
}
90-
91-
@Benchmark
92-
@Threads(value = THREAD_COUNT)
93-
@Warmup(iterations = WARMUP_COUNT)
94-
@Fork(value = FORK_COUNT)
95-
@Measurement(iterations = ITERATION_COUNT)
96-
public void currentNanoTime(Blackhole bh) {
97-
long next = atomicRateLimiter.currentNanoTime();
98-
bh.consume(next);
99-
}
100-
101-
// @Benchmark
102-
// @Threads(value = THREAD_COUNT)
103-
// @Warmup(iterations = WARMUP_COUNT)
104-
// @Fork(value = FORK_COUNT)
105-
// @Measurement(iterations = ITERATION_COUNT)
106-
// public void mutex(Blackhole bh) {
107-
// synchronized (mutex) {
108-
// state = atomicRateLimiter.calculateNextState(Duration.ZERO.toNanos(), state);
109-
// }
110-
// }
111-
//
112-
// @Benchmark
113-
// @Threads(value = THREAD_COUNT)
114-
// @Warmup(iterations = WARMUP_COUNT)
115-
// @Fork(value = FORK_COUNT)
116-
// @Measurement(iterations = ITERATION_COUNT)
117-
// public void atomic(Blackhole bh) {
118-
// atomicRateLimiter.state.updateAndGet(state -> {
119-
// return atomicRateLimiter.calculateNextState(Duration.ZERO.toNanos(), state);
120-
// });
121-
// }
122-
//
123-
// @Benchmark
124-
// @Threads(value = THREAD_COUNT)
125-
// @Warmup(iterations = WARMUP_COUNT)
126-
// @Fork(value = FORK_COUNT)
127-
// @Measurement(iterations = ITERATION_COUNT)
128-
// public void atomicBackOf(Blackhole bh) {
129-
// AtomicRateLimiter.State prev;
130-
// AtomicRateLimiter.State next;
131-
// do {
132-
// prev = atomicRateLimiter.state.get();
133-
// next = atomicRateLimiter.calculateNextState(Duration.ZERO.toNanos(), prev);
134-
// } while (!compareAndSet(prev, next));
135-
// }
136-
//
137-
// /*
138-
// https://arxiv.org/abs/1305.5800 https://dzone.com/articles/wanna-get-faster-wait-bit
139-
// */
140-
// public boolean compareAndSet(final AtomicRateLimiter.State current, final AtomicRateLimiter.State next) {
141-
// if (atomicRateLimiter.state.compareAndSet(current, next)) {
142-
// return true;
143-
// } else {
144-
// LockSupport.parkNanos(1);
145-
// return false;
146-
// }
147-
// }
148-
//
149-
// @Benchmark
150-
// @Threads(value = THREAD_COUNT)
151-
// @Warmup(iterations = WARMUP_COUNT)
152-
// @Fork(value = FORK_COUNT)
153-
// @Measurement(iterations = ITERATION_COUNT)
154-
// public String semaphoreBasedPermission() {
155-
// return semaphoreGuardedSupplier.get();
156-
// }
157-
//
158-
// @Benchmark
159-
// @Threads(value = THREAD_COUNT)
160-
// @Warmup(iterations = WARMUP_COUNT)
161-
// @Fork(value = FORK_COUNT)
162-
// @Measurement(iterations = ITERATION_COUNT)
163-
// public String atomicPermission() {
164-
// return atomicGuardedSupplier.get();
165-
// }
16675
}

src/jmh/java/javaslang/circuitbreaker/stateCalculationBenchmark.txt

Lines changed: 0 additions & 5 deletions
This file was deleted.

src/main/java/javaslang/ratelimiter/internal/AtomicRateLimiter.java

Lines changed: 60 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import java.time.Duration;
1212
import java.util.concurrent.atomic.AtomicInteger;
1313
import java.util.concurrent.atomic.AtomicReference;
14+
import java.util.function.UnaryOperator;
1415

1516
/**
1617
* {@link AtomicRateLimiter} splits all nanoseconds from the start of epoch into cycles.
@@ -31,7 +32,7 @@ public class AtomicRateLimiter implements RateLimiter {
3132
private final long cyclePeriodInNanos;
3233
private final int permissionsPerCycle;
3334
private final AtomicInteger waitingThreads;
34-
public final AtomicReference<State> state;
35+
private final AtomicReference<State> state;
3536

3637

3738
public AtomicRateLimiter(String name, RateLimiterConfig rateLimiterConfig) {
@@ -51,12 +52,57 @@ public AtomicRateLimiter(String name, RateLimiterConfig rateLimiterConfig) {
5152
@Override
5253
public boolean getPermission(final Duration timeoutDuration) {
5354
long timeoutInNanos = timeoutDuration.toNanos();
54-
State modifiedState = state.updateAndGet(
55-
activeState -> calculateNextState(timeoutInNanos, activeState)
56-
);
55+
State modifiedState = updateStateWithBackOff(timeoutInNanos);
5756
return waitForPermissionIfNecessary(timeoutInNanos, modifiedState.nanosToWait);
5857
}
5958

59+
/**
60+
* Atomically updates the current {@link State} with the results of
61+
* applying the {@link AtomicRateLimiter#calculateNextState}, returning the updated {@link State}.
62+
* It differs from {@link AtomicReference#updateAndGet(UnaryOperator)} by constant back off.
63+
* It means that after one try to {@link AtomicReference#compareAndSet(Object, Object)}
64+
* this method will wait for a while before try one more time.
65+
* This technique was originally described in this
66+
* <a href="https://arxiv.org/abs/1305.5800"> paper</a>
67+
* and showed great results with {@link AtomicRateLimiter} in benchmark tests.
68+
*
69+
* @param timeoutInNanos a side-effect-free function
70+
* @return the updated value
71+
*/
72+
private State updateStateWithBackOff(final long timeoutInNanos) {
73+
AtomicRateLimiter.State prev;
74+
AtomicRateLimiter.State next;
75+
do {
76+
prev = state.get();
77+
next = calculateNextState(timeoutInNanos, prev);
78+
} while (!compareAndSet(prev, next));
79+
return next;
80+
}
81+
82+
/**
83+
* Atomically sets the value to the given updated value
84+
* if the current value {@code ==} the expected value.
85+
* It differs from {@link AtomicReference#updateAndGet(UnaryOperator)} by constant back off.
86+
* It means that after one try to {@link AtomicReference#compareAndSet(Object, Object)}
87+
* this method will wait for a while before try one more time.
88+
* This technique was originally described in this
89+
* <a href="https://arxiv.org/abs/1305.5800"> paper</a>
90+
* and showed great results with {@link AtomicRateLimiter} in benchmark tests.
91+
*
92+
* @param current the expected value
93+
* @param next the new value
94+
* @return {@code true} if successful. False return indicates that
95+
* the actual value was not equal to the expected value.
96+
*/
97+
private boolean compareAndSet(final State current, final State next) {
98+
if (state.compareAndSet(current, next)) {
99+
return true;
100+
} else {
101+
parkNanos(1); // back-off
102+
return false;
103+
}
104+
}
105+
60106
/**
61107
* A side-effect-free function that can calculate next {@link State} from current.
62108
* It determines time duration that you should wait for permission and reserves it for you,
@@ -66,7 +112,7 @@ public boolean getPermission(final Duration timeoutDuration) {
66112
* @param activeState current state of {@link AtomicRateLimiter}
67113
* @return next {@link State}
68114
*/
69-
public State calculateNextState(final long timeoutInNanos, final State activeState) {
115+
private State calculateNextState(final long timeoutInNanos, final State activeState) {
70116
long currentNanos = currentNanoTime();
71117
long currentCycle = currentNanos / cyclePeriodInNanos;
72118

@@ -93,7 +139,7 @@ public State calculateNextState(final long timeoutInNanos, final State activeSta
93139
* @param currentCycle current {@link AtomicRateLimiter} cycle
94140
* @return nanoseconds to wait for the next permission
95141
*/
96-
public long nanosToWaitForPermission(final int availablePermissions, final long currentNanos, final long currentCycle) {
142+
private long nanosToWaitForPermission(final int availablePermissions, final long currentNanos, final long currentCycle) {
97143
if (availablePermissions > 0) {
98144
return 0L;
99145
} else {
@@ -114,7 +160,7 @@ public long nanosToWaitForPermission(final int availablePermissions, final long
114160
* @param nanosToWait nanoseconds to wait for the next permission
115161
* @return new {@link State} with possibly reserved permissions and time to wait
116162
*/
117-
public State reservePermissions(final long timeoutInNanos, final long cycle, final int permissions, final long nanosToWait) {
163+
private State reservePermissions(final long timeoutInNanos, final long cycle, final int permissions, final long nanosToWait) {
118164
boolean canAcquireInTime = timeoutInNanos >= nanosToWait;
119165
int permissionsWithReservation = permissions;
120166
if (canAcquireInTime) {
@@ -130,7 +176,7 @@ public State reservePermissions(final long timeoutInNanos, final long cycle, fin
130176
* @param nanosToWait nanoseconds caller need to wait
131177
* @return true if caller was able to wait for nanosToWait without {@link Thread#interrupt} and not exceed timeout
132178
*/
133-
public boolean waitForPermissionIfNecessary(final long timeoutInNanos, final long nanosToWait) {
179+
private boolean waitForPermissionIfNecessary(final long timeoutInNanos, final long nanosToWait) {
134180
boolean canAcquireImmediately = nanosToWait <= 0;
135181
boolean canAcquireInTime = timeoutInNanos >= nanosToWait;
136182

@@ -153,7 +199,7 @@ public boolean waitForPermissionIfNecessary(final long timeoutInNanos, final lon
153199
* @param nanosToWait nanoseconds caller need to wait
154200
* @return true if caller was not {@link Thread#interrupted} while waiting
155201
*/
156-
public boolean waitForPermission(final long nanosToWait) {
202+
private boolean waitForPermission(final long nanosToWait) {
157203
waitingThreads.incrementAndGet();
158204
long deadline = currentNanoTime() + nanosToWait;
159205
boolean wasInterrupted = false;
@@ -207,13 +253,14 @@ public AtomicRateLimiterMetrics getMetrics() {
207253
* the last {@link AtomicRateLimiter#getPermission(Duration)} call.</li>
208254
* </ul>
209255
*/
210-
public static class State {
256+
private static class State {
211257

212258
private final long activeCycle;
259+
213260
private final int activePermissions;
214261
private final long nanosToWait;
215262

216-
public State(final long activeCycle, final int activePermissions, final long nanosToWait) {
263+
private State(final long activeCycle, final int activePermissions, final long nanosToWait) {
217264
this.activeCycle = activeCycle;
218265
this.activePermissions = activePermissions;
219266
this.nanosToWait = nanosToWait;
@@ -259,12 +306,13 @@ public long getAvailablePermissions() {
259306
State estimatedState = calculateNextState(-1, currentState);
260307
return estimatedState.activePermissions;
261308
}
309+
262310
}
263311

264312
/**
265313
* Created only for test purposes. Simply calls {@link System#nanoTime()}
266314
*/
267-
public long currentNanoTime() {
315+
private long currentNanoTime() {
268316
return nanoTime();
269317
}
270318
}

0 commit comments

Comments
 (0)