Skip to content

Commit 84d5967

Browse files
authored
Add Quota prefetch (istio#53)
* Add quota prefetch code. * fix format * Count inflight requests. * fix format
1 parent c45f937 commit 84d5967

9 files changed

+804
-22
lines changed

mixerclient/prefetch/BUILD

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,13 @@ licenses(["notice"])
1717
cc_library(
1818
name = "quota_prefetch_lib",
1919
srcs = [
20+
"circular_queue.h",
21+
"quota_prefetch.cc",
2022
"time_based_counter.cc",
23+
"time_based_counter.h",
2124
],
2225
hdrs = [
23-
"circular_queue.h",
24-
"time_based_counter.h",
26+
"quota_prefetch.h",
2527
],
2628
visibility = ["//visibility:public"],
2729
)
@@ -55,3 +57,18 @@ cc_test(
5557
"//external:googletest_main",
5658
],
5759
)
60+
61+
cc_test(
62+
name = "quota_prefetch_test",
63+
size = "small",
64+
srcs = ["quota_prefetch_test.cc"],
65+
linkopts = [
66+
"-lm",
67+
"-lpthread",
68+
],
69+
linkstatic = 1,
70+
deps = [
71+
":quota_prefetch_lib",
72+
"//external:googletest_main",
73+
],
74+
)

mixerclient/prefetch/circular_queue.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,10 @@ class CircularQueue {
3636
void Pop();
3737

3838
// Allow modifying the head item.
39-
T* Mutable_Head();
39+
T* Head();
4040

4141
// Calls the fn function for each element from head to tail.
42-
void Iterate(std::function<bool(const T&)> fn) const;
42+
void Iterate(std::function<bool(T&)> fn);
4343

4444
private:
4545
std::vector<T> nodes_;
@@ -76,13 +76,13 @@ void CircularQueue<T>::Pop() {
7676
}
7777

7878
template <class T>
79-
T* CircularQueue<T>::Mutable_Head() {
79+
T* CircularQueue<T>::Head() {
8080
if (count_ == 0) return nullptr;
8181
return &nodes_[head_];
8282
}
8383

8484
template <class T>
85-
void CircularQueue<T>::Iterate(std::function<bool(const T&)> fn) const {
85+
void CircularQueue<T>::Iterate(std::function<bool(T&)> fn) {
8686
if (count_ == 0) return;
8787
int i = head_;
8888
while (i != tail_) {

mixerclient/prefetch/circular_queue_test.cc

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,9 @@ namespace istio {
2020
namespace mixer_client {
2121
namespace {
2222

23-
void ASSERT_RESULT(const CircularQueue<int>& q,
24-
const std::vector<int>& expected) {
23+
void ASSERT_RESULT(CircularQueue<int>& q, const std::vector<int>& expected) {
2524
std::vector<int> v;
26-
q.Iterate([&](const int& i) -> bool {
25+
q.Iterate([&](int& i) -> bool {
2726
v.push_back(i);
2827
return true;
2928
});
Lines changed: 318 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,318 @@
1+
/* Copyright 2017 Istio Authors. All Rights Reserved.
2+
*
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
#include "quota_prefetch.h"
17+
#include "circular_queue.h"
18+
#include "time_based_counter.h"
19+
20+
#include <mutex>
21+
22+
using namespace std::chrono;
23+
24+
// Turn this on to debug for quota_prefetch_test.cc
25+
// Not for debugging in production.
26+
#if 0
27+
#include <iostream>
28+
#define LOG(t) \
29+
std::cerr << "(" \
30+
<< duration_cast<milliseconds>(t.time_since_epoch()).count() \
31+
<< "):"
32+
#else
33+
// Pipe to stringstream to disable logging.
34+
#include <sstream>
35+
std::ostringstream os;
36+
#define LOG(t) \
37+
os.clear(); \
38+
os
39+
#endif
40+
41+
namespace istio {
42+
namespace mixer_client {
43+
namespace {
44+
45+
// Default predict window size in milliseconds.
46+
const int kPredictWindowInMs = 1000;
47+
48+
// Default min prefetch amount.
49+
const int kMinPrefetchAmount = 10;
50+
51+
// Default close wait window in milliseconds.
52+
const int kCloseWaitWindowInMs = 500;
53+
54+
// Initiail Circular Queue size for prefetch pool.
55+
const int kInitQueueSize = 10;
56+
57+
// TimeBasedCounter window size
58+
const int kTimeBasedWindowSize = 20;
59+
60+
// Maximum expiration for prefetch amount.
61+
// It is only used when a prefetch amount is added to the pool
62+
// before it is granted. Usually is 1 minute.
63+
const int kMaxExpirationInMs = 60000;
64+
65+
// The implementation class to hide internal implementation detail.
66+
class QuotaPrefetchImpl : public QuotaPrefetch {
67+
public:
68+
// The slot id type.
69+
typedef uint64_t SlotId;
70+
71+
// The struture to store granted amount.
72+
struct Slot {
73+
// available amount
74+
int available;
75+
// the time the amount will be expired.
76+
Tick expire_time;
77+
// the always increment ID to detect if a Slot has been re-cycled.
78+
SlotId id;
79+
};
80+
81+
// The mode.
82+
enum Mode {
83+
OPEN = 0,
84+
CLOSE,
85+
};
86+
87+
QuotaPrefetchImpl(TransportFunc transport, const Options& options, Tick t)
88+
: queue_(kInitQueueSize),
89+
counter_(kTimeBasedWindowSize, options.predict_window, t),
90+
mode_(OPEN),
91+
inflight_count_(0),
92+
transport_(transport),
93+
options_(options),
94+
next_slot_id_(0) {}
95+
96+
bool Check(int amount, Tick t) override;
97+
98+
private:
99+
// Count available token
100+
int CountAvailable(Tick t);
101+
// Check available count is bigger than minimum
102+
int CheckMinAvailable(int min, Tick t);
103+
// Check to see if need to do a prefetch.
104+
void AttemptPrefetch(int amount, Tick t);
105+
// Make a prefetch call.
106+
void Prefetch(int req_amount, bool use_not_granted, Tick t);
107+
// Add the amount to the queue, and return slot id.
108+
SlotId Add(int amount, Tick expiration);
109+
// Substract the amount from the queue.
110+
// Return the amount that could not be substracted.
111+
int Substract(int delta, Tick t);
112+
// On quota allocation response.
113+
void OnResponse(SlotId slot_id, int req_amount, int resp_amount,
114+
milliseconds expiration, Tick t);
115+
// Find the slot by id.
116+
Slot* FindSlotById(SlotId id);
117+
118+
// The mutex guarding all member variables.
119+
std::mutex mutex_;
120+
// The FIFO queue to store prefetched amount.
121+
CircularQueue<Slot> queue_;
122+
// The counter to count number of requests in the pass window.
123+
TimeBasedCounter counter_;
124+
// The current mode.
125+
Mode mode_;
126+
// Last prefetch time.
127+
Tick last_prefetch_time_;
128+
// inflight request count;
129+
int inflight_count_;
130+
// The transport to allocate quota.
131+
TransportFunc transport_;
132+
// Save the options.
133+
Options options_;
134+
// next slot id
135+
SlotId next_slot_id_;
136+
};
137+
138+
int QuotaPrefetchImpl::CountAvailable(Tick t) {
139+
int avail = 0;
140+
queue_.Iterate([&](Slot& slot) -> bool {
141+
if (t < slot.expire_time) {
142+
avail += slot.available;
143+
}
144+
return true;
145+
});
146+
return avail;
147+
}
148+
149+
int QuotaPrefetchImpl::CheckMinAvailable(int min, Tick t) {
150+
int avail = 0;
151+
queue_.Iterate([&](Slot& slot) -> bool {
152+
if (t < slot.expire_time) {
153+
avail += slot.available;
154+
if (avail >= min) return false;
155+
}
156+
return true;
157+
});
158+
return avail >= min;
159+
}
160+
161+
void QuotaPrefetchImpl::AttemptPrefetch(int amount, Tick t) {
162+
if (mode_ == CLOSE && (inflight_count_ > 0 ||
163+
(duration_cast<milliseconds>(t - last_prefetch_time_) <
164+
options_.close_wait_window))) {
165+
return;
166+
}
167+
168+
int avail = CountAvailable(t);
169+
int pass_count = counter_.Count(t);
170+
int desired = std::max(pass_count, options_.min_prefetch_amount);
171+
if ((avail < desired / 2 && inflight_count_ == 0) || avail < amount) {
172+
bool use_not_granted = (avail == 0 && mode_ == OPEN);
173+
Prefetch(std::max(amount, desired), use_not_granted, t);
174+
}
175+
}
176+
177+
void QuotaPrefetchImpl::Prefetch(int req_amount, bool use_not_granted, Tick t) {
178+
SlotId slot_id = 0;
179+
if (use_not_granted) {
180+
// add the prefetch amount to available queue before it is granted.
181+
slot_id = Add(req_amount, t + milliseconds(kMaxExpirationInMs));
182+
}
183+
184+
LOG(t) << "Prefetch: " << req_amount << ", id: " << slot_id << std::endl;
185+
186+
last_prefetch_time_ = t;
187+
++inflight_count_;
188+
transport_(req_amount,
189+
[this, slot_id, req_amount](int resp_amount,
190+
milliseconds expiration, Tick t1) {
191+
OnResponse(slot_id, req_amount, resp_amount, expiration, t1);
192+
},
193+
t);
194+
}
195+
196+
QuotaPrefetchImpl::Slot* QuotaPrefetchImpl::FindSlotById(SlotId id) {
197+
Slot* found = nullptr;
198+
queue_.Iterate([&](Slot& slot) -> bool {
199+
if (slot.id == id) {
200+
found = &slot;
201+
return false;
202+
}
203+
return true;
204+
});
205+
return found;
206+
}
207+
208+
QuotaPrefetchImpl::SlotId QuotaPrefetchImpl::Add(int amount, Tick expire_time) {
209+
SlotId id = ++next_slot_id_;
210+
queue_.Push(Slot{amount, expire_time, id});
211+
return id;
212+
}
213+
214+
int QuotaPrefetchImpl::Substract(int delta, Tick t) {
215+
Slot* n = queue_.Head();
216+
while (n != nullptr && delta > 0) {
217+
if (t < n->expire_time) {
218+
if (n->available > 0) {
219+
int d = std::min(n->available, delta);
220+
n->available -= d;
221+
delta -= d;
222+
}
223+
if (n->available > 0) {
224+
return 0;
225+
}
226+
} else {
227+
if (n->available > 0) {
228+
LOG(t) << "Expired:" << n->available << std::endl;
229+
}
230+
}
231+
queue_.Pop();
232+
n = queue_.Head();
233+
}
234+
return delta;
235+
}
236+
237+
void QuotaPrefetchImpl::OnResponse(SlotId slot_id, int req_amount,
238+
int resp_amount, milliseconds expiration,
239+
Tick t) {
240+
std::lock_guard<std::mutex> lock(mutex_);
241+
--inflight_count_;
242+
243+
LOG(t) << "OnResponse: req:" << req_amount << ", resp: " << resp_amount
244+
<< ", expire: " << expiration.count() << ", id: " << slot_id
245+
<< std::endl;
246+
247+
Slot* slot = nullptr;
248+
if (slot_id != 0) {
249+
// The prefetched amount was added to the available queue
250+
slot = FindSlotById(slot_id);
251+
if (resp_amount < req_amount) {
252+
int delta = req_amount - resp_amount;
253+
// Substract it from its own request node.
254+
if (slot != nullptr) {
255+
int d = std::min(slot->available, delta);
256+
slot->available -= d;
257+
delta -= d;
258+
}
259+
if (delta > 0) {
260+
// Substract it from other prefetched amounts
261+
Substract(delta, t);
262+
}
263+
}
264+
// Adjust the expiration
265+
if (slot != nullptr && slot->available > 0) {
266+
slot->expire_time = t + expiration;
267+
}
268+
} else {
269+
// prefetched amount was NOT added to the pool yet.
270+
if (resp_amount > 0) {
271+
Add(resp_amount, t + expiration);
272+
}
273+
}
274+
275+
if (resp_amount == req_amount) {
276+
mode_ = OPEN;
277+
} else {
278+
mode_ = CLOSE;
279+
}
280+
}
281+
282+
bool QuotaPrefetchImpl::Check(int amount, Tick t) {
283+
std::lock_guard<std::mutex> lock(mutex_);
284+
285+
AttemptPrefetch(amount, t);
286+
counter_.Inc(amount, t);
287+
bool ret;
288+
if (amount == 1) {
289+
ret = Substract(amount, t) == 0;
290+
} else {
291+
ret = CheckMinAvailable(amount, t);
292+
if (ret) {
293+
Substract(amount, t);
294+
}
295+
}
296+
if (!ret) {
297+
LOG(t) << "Rejected amount: " << amount << std::endl;
298+
}
299+
return ret;
300+
}
301+
302+
} // namespace
303+
304+
// Constructor with default values.
305+
QuotaPrefetch::Options::Options()
306+
: predict_window(kPredictWindowInMs),
307+
min_prefetch_amount(kMinPrefetchAmount),
308+
close_wait_window(kCloseWaitWindowInMs) {}
309+
310+
std::unique_ptr<QuotaPrefetch> QuotaPrefetch::Create(TransportFunc transport,
311+
const Options& options,
312+
Tick t) {
313+
return std::unique_ptr<QuotaPrefetch>(
314+
new QuotaPrefetchImpl(transport, options, t));
315+
}
316+
317+
} // namespace mixer_client
318+
} // namespace istio

0 commit comments

Comments
 (0)