Skip to content

Commit dae7ed1

Browse files
authored
Reimplement the concurrency limiter middleware to use the new abstractions & implementations #38306 (#39040)
1 parent c76e108 commit dae7ed1

File tree

7 files changed

+172
-364
lines changed

7 files changed

+172
-364
lines changed
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
4+
using System.Collections.Concurrent;
5+
using System.Threading.RateLimiting;
6+
using Microsoft.Extensions.Options;
7+
using Limiter = System.Threading.RateLimiting.ConcurrencyLimiter;
8+
using LimiterOptions = System.Threading.RateLimiting.ConcurrencyLimiterOptions;
9+
10+
namespace Microsoft.AspNetCore.ConcurrencyLimiter;
11+
12+
internal class BasePolicy : IQueuePolicy, IDisposable
13+
{
14+
private readonly Limiter _limiter;
15+
private readonly ConcurrentQueue<RateLimitLease> _leases = new ConcurrentQueue<RateLimitLease>();
16+
17+
public int TotalRequests => _leases.Count;
18+
19+
public BasePolicy(IOptions<QueuePolicyOptions> options, QueueProcessingOrder order)
20+
{
21+
var queuePolicyOptions = options.Value;
22+
23+
var maxConcurrentRequests = queuePolicyOptions.MaxConcurrentRequests;
24+
if (maxConcurrentRequests <= 0)
25+
{
26+
throw new ArgumentException("MaxConcurrentRequests must be a positive integer.", nameof(options));
27+
}
28+
29+
var requestQueueLimit = queuePolicyOptions.RequestQueueLimit;
30+
if (requestQueueLimit < 0)
31+
{
32+
throw new ArgumentException("The RequestQueueLimit cannot be a negative number.", nameof(options));
33+
}
34+
35+
_limiter = new Limiter(new LimiterOptions(permitLimit: maxConcurrentRequests, order, queueLimit: requestQueueLimit));
36+
}
37+
38+
public ValueTask<bool> TryEnterAsync()
39+
{
40+
// a return value of 'false' indicates that the request is rejected
41+
// a return value of 'true' indicates that the request may proceed
42+
43+
var lease = _limiter.Acquire();
44+
if (lease.IsAcquired)
45+
{
46+
_leases.Enqueue(lease);
47+
return ValueTask.FromResult(true);
48+
}
49+
50+
var task = _limiter.WaitAsync();
51+
if (task.IsCompletedSuccessfully)
52+
{
53+
lease = task.Result;
54+
if (lease.IsAcquired)
55+
{
56+
_leases.Enqueue(lease);
57+
return ValueTask.FromResult(true);
58+
}
59+
60+
return ValueTask.FromResult(false);
61+
}
62+
63+
return Awaited(task);
64+
}
65+
66+
public void OnExit()
67+
{
68+
if (!_leases.TryDequeue(out var lease))
69+
{
70+
throw new InvalidOperationException("No outstanding leases.");
71+
}
72+
73+
lease.Dispose();
74+
}
75+
76+
public void Dispose()
77+
{
78+
_limiter.Dispose();
79+
}
80+
81+
private async ValueTask<bool> Awaited(ValueTask<RateLimitLease> task)
82+
{
83+
var lease = await task;
84+
85+
if (lease.IsAcquired)
86+
{
87+
_leases.Enqueue(lease);
88+
return true;
89+
}
90+
91+
return false;
92+
}
93+
}
Lines changed: 3 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -1,79 +1,15 @@
11
// Licensed to the .NET Foundation under one or more agreements.
22
// The .NET Foundation licenses this file to you under the MIT license.
33

4+
using System.Threading.RateLimiting;
45
using Microsoft.Extensions.Options;
56

67
namespace Microsoft.AspNetCore.ConcurrencyLimiter;
78

8-
internal class QueuePolicy : IQueuePolicy, IDisposable
9+
internal class QueuePolicy : BasePolicy
910
{
10-
private readonly int _maxTotalRequest;
11-
private readonly SemaphoreSlim _serverSemaphore;
12-
13-
private int _totalRequests;
14-
15-
public int TotalRequests => _totalRequests;
16-
1711
public QueuePolicy(IOptions<QueuePolicyOptions> options)
12+
: base(options, QueueProcessingOrder.OldestFirst)
1813
{
19-
var queuePolicyOptions = options.Value;
20-
21-
var maxConcurrentRequests = queuePolicyOptions.MaxConcurrentRequests;
22-
if (maxConcurrentRequests <= 0)
23-
{
24-
throw new ArgumentException("MaxConcurrentRequests must be a positive integer.", nameof(options));
25-
}
26-
27-
var requestQueueLimit = queuePolicyOptions.RequestQueueLimit;
28-
if (requestQueueLimit < 0)
29-
{
30-
throw new ArgumentException("The RequestQueueLimit cannot be a negative number.", nameof(options));
31-
}
32-
33-
_serverSemaphore = new SemaphoreSlim(maxConcurrentRequests);
34-
35-
_maxTotalRequest = maxConcurrentRequests + requestQueueLimit;
36-
}
37-
38-
public ValueTask<bool> TryEnterAsync()
39-
{
40-
// a return value of 'false' indicates that the request is rejected
41-
// a return value of 'true' indicates that the request may proceed
42-
// _serverSemaphore.Release is *not* called in this method, it is called externally when requests leave the server
43-
44-
int totalRequests = Interlocked.Increment(ref _totalRequests);
45-
46-
if (totalRequests > _maxTotalRequest)
47-
{
48-
Interlocked.Decrement(ref _totalRequests);
49-
return new ValueTask<bool>(false);
50-
}
51-
52-
Task task = _serverSemaphore.WaitAsync();
53-
if (task.IsCompletedSuccessfully)
54-
{
55-
return new ValueTask<bool>(true);
56-
}
57-
58-
return SemaphoreAwaited(task);
59-
}
60-
61-
public void OnExit()
62-
{
63-
_serverSemaphore.Release();
64-
65-
Interlocked.Decrement(ref _totalRequests);
66-
}
67-
68-
public void Dispose()
69-
{
70-
_serverSemaphore.Dispose();
71-
}
72-
73-
private static async ValueTask<bool> SemaphoreAwaited(Task task)
74-
{
75-
await task;
76-
77-
return true;
7814
}
7915
}

src/Middleware/ConcurrencyLimiter/src/QueuePolicies/ResettableBooleanCompletionSource.cs

Lines changed: 0 additions & 60 deletions
This file was deleted.
Lines changed: 3 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -1,104 +1,15 @@
11
// Licensed to the .NET Foundation under one or more agreements.
22
// The .NET Foundation licenses this file to you under the MIT license.
33

4+
using System.Threading.RateLimiting;
45
using Microsoft.Extensions.Options;
56

67
namespace Microsoft.AspNetCore.ConcurrencyLimiter;
78

8-
internal class StackPolicy : IQueuePolicy
9+
internal class StackPolicy : BasePolicy
910
{
10-
private readonly List<ResettableBooleanCompletionSource> _buffer;
11-
public ResettableBooleanCompletionSource? _cachedResettableTCS;
12-
13-
private readonly int _maxQueueCapacity;
14-
private readonly int _maxConcurrentRequests;
15-
private bool _hasReachedCapacity;
16-
private int _head;
17-
private int _queueLength;
18-
19-
private readonly object _bufferLock = new Object();
20-
21-
private int _freeServerSpots;
22-
2311
public StackPolicy(IOptions<QueuePolicyOptions> options)
12+
: base(options, QueueProcessingOrder.NewestFirst)
2413
{
25-
_buffer = new List<ResettableBooleanCompletionSource>();
26-
_maxQueueCapacity = options.Value.RequestQueueLimit;
27-
_maxConcurrentRequests = options.Value.MaxConcurrentRequests;
28-
_freeServerSpots = options.Value.MaxConcurrentRequests;
29-
}
30-
31-
public ValueTask<bool> TryEnterAsync()
32-
{
33-
lock (_bufferLock)
34-
{
35-
if (_freeServerSpots > 0)
36-
{
37-
_freeServerSpots--;
38-
return new ValueTask<bool>(true);
39-
}
40-
41-
// if queue is full, cancel oldest request
42-
if (_queueLength == _maxQueueCapacity)
43-
{
44-
_hasReachedCapacity = true;
45-
_buffer[_head].Complete(false);
46-
_queueLength--;
47-
}
48-
49-
var tcs = _cachedResettableTCS ??= new ResettableBooleanCompletionSource(this);
50-
_cachedResettableTCS = null;
51-
52-
if (_hasReachedCapacity || _queueLength < _buffer.Count)
53-
{
54-
_buffer[_head] = tcs;
55-
}
56-
else
57-
{
58-
_buffer.Add(tcs);
59-
}
60-
_queueLength++;
61-
62-
// increment _head for next time
63-
_head++;
64-
if (_head == _maxQueueCapacity)
65-
{
66-
_head = 0;
67-
}
68-
69-
return tcs.GetValueTask();
70-
}
71-
}
72-
73-
public void OnExit()
74-
{
75-
lock (_bufferLock)
76-
{
77-
if (_queueLength == 0)
78-
{
79-
_freeServerSpots++;
80-
81-
if (_freeServerSpots > _maxConcurrentRequests)
82-
{
83-
_freeServerSpots--;
84-
throw new InvalidOperationException("OnExit must only be called once per successful call to TryEnterAsync");
85-
}
86-
87-
return;
88-
}
89-
90-
// step backwards and launch a new task
91-
if (_head == 0)
92-
{
93-
_head = _maxQueueCapacity - 1;
94-
}
95-
else
96-
{
97-
_head--;
98-
}
99-
100-
_buffer[_head].Complete(true);
101-
_queueLength--;
102-
}
10314
}
10415
}

0 commit comments

Comments
 (0)