Skip to content

Commit e8e87f6

Browse files
authored
Merge pull request ReactiveX#108 from resilience4j/evicting_queue_improvement
Evicting queue improvement
2 parents 7e73cf7 + d4a9c2f commit e8e87f6

File tree

7 files changed

+148
-9
lines changed

7 files changed

+148
-9
lines changed

build.gradle

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,8 @@ subprojects {
6868
testCompile ( libraries.powermock_module_junit4)
6969
testCompile ( libraries.awaitility)
7070

71-
jmh ( libraries.logback)
72-
jmh "org.openjdk.jmh:jmh-generator-annprocess:1.12"
71+
jmh "org.openjdk.jmh:jmh-core:1.18"
72+
jmh "org.openjdk.jmh:jmh-generator-annprocess:1.18"
7373
}
7474

7575
tasks.withType(JavaCompile) {

libraries.gradle

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,5 @@ ext {
6565
// Prometheus addon
6666
prometheus_simpleclient: "io.prometheus:simpleclient_common:${prometheusSimpleClientVersion}",
6767
prometheus_spring_boot: "io.prometheus:simpleclient_spring_boot:${prometheusSimpleClientVersion}"
68-
69-
7068
]
7169
}

resilience4j-circularbuffer/src/jmh/java/io/github/resilience4j/circularbuffer/CircularBufferBenchmark.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,10 @@
3232
import org.openjdk.jmh.annotations.State;
3333
import org.openjdk.jmh.annotations.Warmup;
3434
import org.openjdk.jmh.infra.Blackhole;
35+
import org.openjdk.jmh.runner.Runner;
36+
import org.openjdk.jmh.runner.RunnerException;
37+
import org.openjdk.jmh.runner.options.Options;
38+
import org.openjdk.jmh.runner.options.OptionsBuilder;
3539

3640
import java.util.concurrent.TimeUnit;
3741

@@ -100,4 +104,11 @@ public void circularBufferTakeEvent(Blackhole bh) {
100104
Option<Object> event = circularFifoBuffer.take();
101105
bh.consume(event);
102106
}
107+
108+
public static void main(String[] args) throws RunnerException {
109+
Options options = new OptionsBuilder()
110+
.include(".*" + CircularBufferBenchmark.class.getSimpleName() + ".*")
111+
.build();
112+
new Runner(options).run();
113+
}
103114
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
/*
2+
*
3+
* Copyright 2016 Robert Winkler and Bohdan Storozhuk
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*
18+
*/
19+
package io.github.resilience4j.circularbuffer;
20+
21+
22+
import org.openjdk.jmh.annotations.Benchmark;
23+
import org.openjdk.jmh.annotations.BenchmarkMode;
24+
import org.openjdk.jmh.annotations.Fork;
25+
import org.openjdk.jmh.annotations.Group;
26+
import org.openjdk.jmh.annotations.GroupThreads;
27+
import org.openjdk.jmh.annotations.Measurement;
28+
import org.openjdk.jmh.annotations.Mode;
29+
import org.openjdk.jmh.annotations.OutputTimeUnit;
30+
import org.openjdk.jmh.annotations.Scope;
31+
import org.openjdk.jmh.annotations.Setup;
32+
import org.openjdk.jmh.annotations.State;
33+
import org.openjdk.jmh.annotations.Warmup;
34+
import org.openjdk.jmh.infra.Blackhole;
35+
import org.openjdk.jmh.runner.Runner;
36+
import org.openjdk.jmh.runner.RunnerException;
37+
import org.openjdk.jmh.runner.options.Options;
38+
import org.openjdk.jmh.runner.options.OptionsBuilder;
39+
40+
import java.util.concurrent.TimeUnit;
41+
42+
/**
43+
* @author bstorozhuk
44+
*/
45+
@State(Scope.Benchmark)
46+
@OutputTimeUnit(TimeUnit.NANOSECONDS)
47+
@BenchmarkMode(Mode.AverageTime)
48+
public class ConcurrentEvictingQueueBenchmark {
49+
public static final int FORK_COUNT = 2;
50+
private static final int WARMUP_COUNT = 10;
51+
private static final int ITERATION_COUNT = 10;
52+
private static final int CAPACITY = 10;
53+
private ConcurrentEvictingQueue<Object> queue;
54+
private Object event;
55+
56+
@Setup
57+
public void setUp() {
58+
event = new Object();
59+
queue = new ConcurrentEvictingQueue<>(CAPACITY);
60+
}
61+
62+
@Benchmark
63+
@Warmup(iterations = WARMUP_COUNT)
64+
@Fork(value = FORK_COUNT)
65+
@Measurement(iterations = ITERATION_COUNT)
66+
@Group("concurrentEvictingQueue")
67+
@GroupThreads(2)
68+
public void concurrentEvictingQueueAdd() {
69+
queue.add(event);
70+
}
71+
72+
@Benchmark
73+
@Warmup(iterations = WARMUP_COUNT)
74+
@Fork(value = FORK_COUNT)
75+
@Measurement(iterations = ITERATION_COUNT)
76+
@Group("concurrentEvictingQueue")
77+
@GroupThreads(1)
78+
public void concurrentEvictingQueueSize(Blackhole bh) {
79+
int size = queue.size();
80+
bh.consume(size);
81+
}
82+
83+
@Benchmark
84+
@Warmup(iterations = WARMUP_COUNT)
85+
@Fork(value = FORK_COUNT)
86+
@Measurement(iterations = ITERATION_COUNT)
87+
@Group("concurrentEvictingQueue")
88+
@GroupThreads(1)
89+
public void concurrentEvictingQueuePoll(Blackhole bh) {
90+
Object event = queue.poll();
91+
bh.consume(event);
92+
}
93+
94+
@Benchmark
95+
@Warmup(iterations = WARMUP_COUNT)
96+
@Fork(value = FORK_COUNT)
97+
@Measurement(iterations = ITERATION_COUNT)
98+
@Group("concurrentEvictingQueue")
99+
@GroupThreads(1)
100+
public void concurrentEvictingQueuePeek(Blackhole bh) {
101+
Object event = queue.peek();
102+
bh.consume(event);
103+
}
104+
105+
106+
public static void main(String[] args) throws RunnerException {
107+
Options options = new OptionsBuilder()
108+
.include(".*" + ConcurrentEvictingQueueBenchmark.class.getSimpleName() + ".*")
109+
.build();
110+
new Runner(options).run();
111+
}
112+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
Benchmark Mode Cnt Score Error Units
2+
CircularBufferBenchmark.circularBuffer avgt 20 135.347 ± 20.444 ns/op
3+
CircularBufferBenchmark.circularBuffer:circularBufferAddEvent avgt 20 267.520 ± 55.244 ns/op
4+
CircularBufferBenchmark.circularBuffer:circularBufferSize avgt 20 9.401 ± 4.046 ns/op
5+
CircularBufferBenchmark.circularBuffer:circularBufferTakeEvent avgt 20 75.721 ± 29.743 ns/op
6+
CircularBufferBenchmark.circularBuffer:circularBufferToList avgt 20 188.747 ± 72.451 ns/op
7+
8+
Benchmark Mode Cnt Score Error Units
9+
ConcurrentEvictingQueueBenchmark.concurrentEvictingQueue avgt 20 116.028 ± 14.322 ns/op
10+
ConcurrentEvictingQueueBenchmark.concurrentEvictingQueue:concurrentEvictingQueueAdd avgt 20 179.901 ± 35.468 ns/op
11+
ConcurrentEvictingQueueBenchmark.concurrentEvictingQueue:concurrentEvictingQueuePeek avgt 20 90.970 ± 30.847 ns/op
12+
ConcurrentEvictingQueueBenchmark.concurrentEvictingQueue:concurrentEvictingQueuePoll avgt 20 123.493 ± 20.239 ns/op
13+
ConcurrentEvictingQueueBenchmark.concurrentEvictingQueue:concurrentEvictingQueueSize avgt 20 5.875 ± 1.569 ns/op

resilience4j-circularbuffer/src/main/java/io/github/resilience4j/circularbuffer/ConcurrentEvictingQueue.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import static java.util.Objects.requireNonNull;
2323

2424
import java.util.AbstractQueue;
25+
import java.util.Arrays;
2526
import java.util.ConcurrentModificationException;
2627
import java.util.Iterator;
2728
import java.util.NoSuchElementException;
@@ -62,11 +63,11 @@ public class ConcurrentEvictingQueue<E> extends AbstractQueue<E> {
6263

6364
private final int maxSize;
6465
private volatile int size;
65-
private volatile int modificationsCount;
6666
private final StampedLock stampedLock;
6767
private Object[] ringBuffer;
6868
private int headIndex;
6969
private int tailIndex;
70+
private int modificationsCount;
7071

7172
public ConcurrentEvictingQueue(int capacity) {
7273
if (capacity <= 0) {
@@ -184,7 +185,7 @@ public void clear() {
184185
if (size == 0) {
185186
return null;
186187
}
187-
ringBuffer = new Object[maxSize];
188+
Arrays.fill(ringBuffer, null);
188189
size = 0;
189190
headIndex = 0;
190191
tailIndex = 0;
@@ -244,10 +245,10 @@ public <T> T[] toArray(final T[] destination) {
244245
requireNonNull(destination, ILLEGAL_DESTINATION_ARRAY);
245246

246247
Supplier<T[]> copyRingBuffer = () -> {
247-
T[] result = destination;
248248
if (size == 0) {
249-
return result;
249+
return destination;
250250
}
251+
T[] result = destination;
251252
if (destination.length < size) {
252253
result = (T[]) newInstance(result.getClass().getComponentType(), size);
253254
}
@@ -311,6 +312,9 @@ private <T> T readConcurrently(final Supplier<T> readSupplier) {
311312
long stamp;
312313
for (int i = 0; i < RETRIES; i++) {
313314
stamp = stampedLock.tryOptimisticRead();
315+
if(stamp == 0) {
316+
continue;
317+
}
314318
result = readSupplier.get();
315319
if (stampedLock.validate(stamp)) {
316320
return result;

resilience4j-ratelimiter/src/test/java/io/github/resilience4j/ratelimiter/internal/SemaphoreBasedRateLimiterImplTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,8 @@ public void getPermissionInterruption() throws Exception {
185185

186186
awaitImpatiently()
187187
.atMost(2, TimeUnit.SECONDS).until(thread::getState, equalTo(RUNNABLE));
188-
then(thread.isInterrupted()).isTrue();
188+
awaitImpatiently()
189+
.atMost(100, TimeUnit.MILLISECONDS).until(thread::isInterrupted);
189190

190191
ArrayList<String> eventStrings = events.get();
191192
then(eventStrings.get(0)).contains("type=SUCCESSFUL_ACQUIRE");

0 commit comments

Comments
 (0)