Skip to content

Commit a82fd98

Browse files
zeripathlafriks6543techknowlogickwxiaoguang
authored
Pause queues (#15928)
* Start adding mechanism to return unhandled data Signed-off-by: Andrew Thornton <[email protected]> * Create pushback interface Signed-off-by: Andrew Thornton <[email protected]> * Add Pausable interface to WorkerPool and Manager Signed-off-by: Andrew Thornton <[email protected]> * Implement Pausable and PushBack for the bytefifos Signed-off-by: Andrew Thornton <[email protected]> * Implement Pausable and Pushback for ChannelQueues and ChannelUniqueQueues Signed-off-by: Andrew Thornton <[email protected]> * Wire in UI for pausing Signed-off-by: Andrew Thornton <[email protected]> * add testcases and fix a few issues Signed-off-by: Andrew Thornton <[email protected]> * fix build Signed-off-by: Andrew Thornton <[email protected]> * prevent "race" in the test Signed-off-by: Andrew Thornton <[email protected]> * fix jsoniter mismerge Signed-off-by: Andrew Thornton <[email protected]> * fix conflicts Signed-off-by: Andrew Thornton <[email protected]> * fix format Signed-off-by: Andrew Thornton <[email protected]> * Add warnings for no worker configurations and prevent data-loss with redis/levelqueue Signed-off-by: Andrew Thornton <[email protected]> * Use StopTimer Signed-off-by: Andrew Thornton <[email protected]> Co-authored-by: Lauris BH <[email protected]> Co-authored-by: 6543 <[email protected]> Co-authored-by: techknowlogick <[email protected]> Co-authored-by: wxiaoguang <[email protected]>
1 parent 27ee01e commit a82fd98

34 files changed

+1389
-122
lines changed

modules/indexer/code/indexer.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -133,11 +133,11 @@ func Init() {
133133
// Create the Queue
134134
switch setting.Indexer.RepoType {
135135
case "bleve", "elasticsearch":
136-
handler := func(data ...queue.Data) {
136+
handler := func(data ...queue.Data) []queue.Data {
137137
idx, err := indexer.get()
138138
if idx == nil || err != nil {
139139
log.Error("Codes indexer handler: unable to get indexer!")
140-
return
140+
return data
141141
}
142142

143143
for _, datum := range data {
@@ -153,6 +153,7 @@ func Init() {
153153
continue
154154
}
155155
}
156+
return nil
156157
}
157158

158159
indexerQueue = queue.CreateUniqueQueue("code_indexer", handler, &IndexerData{})

modules/indexer/issues/indexer.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,11 +103,11 @@ func InitIssueIndexer(syncReindex bool) {
103103
// Create the Queue
104104
switch setting.Indexer.IssueType {
105105
case "bleve", "elasticsearch":
106-
handler := func(data ...queue.Data) {
106+
handler := func(data ...queue.Data) []queue.Data {
107107
indexer := holder.get()
108108
if indexer == nil {
109109
log.Error("Issue indexer handler: unable to get indexer!")
110-
return
110+
return data
111111
}
112112

113113
iData := make([]*IndexerData, 0, len(data))
@@ -127,6 +127,7 @@ func InitIssueIndexer(syncReindex bool) {
127127
if err := indexer.Index(iData); err != nil {
128128
log.Error("Error whilst indexing: %v Error: %v", iData, err)
129129
}
130+
return nil
130131
}
131132

132133
issueIndexerQueue = queue.CreateQueue("issue_indexer", handler, &IndexerData{})

modules/indexer/stats/queue.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,14 @@ import (
1717
var statsQueue queue.UniqueQueue
1818

1919
// handle passed PR IDs and test the PRs
20-
func handle(data ...queue.Data) {
20+
func handle(data ...queue.Data) []queue.Data {
2121
for _, datum := range data {
2222
opts := datum.(int64)
2323
if err := indexer.Index(opts); err != nil {
2424
log.Error("stats queue indexer.Index(%d) failed: %v", opts, err)
2525
}
2626
}
27+
return nil
2728
}
2829

2930
func initStatsQueue() error {

modules/notification/ui/ui.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,13 +38,14 @@ func NewNotifier() base.Notifier {
3838
return ns
3939
}
4040

41-
func (ns *notificationService) handle(data ...queue.Data) {
41+
func (ns *notificationService) handle(data ...queue.Data) []queue.Data {
4242
for _, datum := range data {
4343
opts := datum.(issueNotificationOpts)
4444
if err := models.CreateOrUpdateIssueNotifications(opts.IssueID, opts.CommentID, opts.NotificationAuthorID, opts.ReceiverID); err != nil {
4545
log.Error("Was unable to create issue notification: %v", err)
4646
}
4747
}
48+
return nil
4849
}
4950

5051
func (ns *notificationService) Run() {

modules/queue/bytefifo.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ type ByteFIFO interface {
1616
Pop(ctx context.Context) ([]byte, error)
1717
// Close this fifo
1818
Close() error
19+
// PushBack pushes data back to the top of the fifo
20+
PushBack(ctx context.Context, data []byte) error
1921
}
2022

2123
// UniqueByteFIFO defines a FIFO that Uniques its contents
@@ -50,6 +52,11 @@ func (*DummyByteFIFO) Len(ctx context.Context) int64 {
5052
return 0
5153
}
5254

55+
// PushBack pushes data back to the top of the fifo
56+
func (*DummyByteFIFO) PushBack(ctx context.Context, data []byte) error {
57+
return nil
58+
}
59+
5360
var _ UniqueByteFIFO = &DummyUniqueByteFIFO{}
5461

5562
// DummyUniqueByteFIFO represents a dummy unique fifo

modules/queue/manager.go

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,18 @@ type Flushable interface {
5454
IsEmpty() bool
5555
}
5656

57+
// Pausable represents a pool or queue that is Pausable
58+
type Pausable interface {
59+
// IsPaused will return if the pool or queue is paused
60+
IsPaused() bool
61+
// Pause will pause the pool or queue
62+
Pause()
63+
// Resume will resume the pool or queue
64+
Resume()
65+
// IsPausedIsResumed will return a bool indicating if the pool or queue is paused and a channel that will be closed when it is resumed
66+
IsPausedIsResumed() (paused, resumed <-chan struct{})
67+
}
68+
5769
// ManagedPool is a simple interface to get certain details from a worker pool
5870
type ManagedPool interface {
5971
// AddWorkers adds a number of worker as group to the pool with the provided timeout. A CancelFunc is provided to cancel the group
@@ -192,6 +204,14 @@ func (m *Manager) FlushAll(baseCtx context.Context, timeout time.Duration) error
192204
wg.Done()
193205
continue
194206
}
207+
if pausable, ok := mq.Managed.(Pausable); ok {
208+
// no point flushing paused queues
209+
if pausable.IsPaused() {
210+
wg.Done()
211+
continue
212+
}
213+
}
214+
195215
allEmpty = false
196216
if flushable, ok := mq.Managed.(Flushable); ok {
197217
log.Debug("Flushing (flushable) queue: %s", mq.Name)
@@ -215,7 +235,7 @@ func (m *Manager) FlushAll(baseCtx context.Context, timeout time.Duration) error
215235
log.Debug("All queues are empty")
216236
break
217237
}
218-
// Ensure there are always at least 100ms between loops but not more if we've actually been doing some flushign
238+
// Ensure there are always at least 100ms between loops but not more if we've actually been doing some flushing
219239
// but don't delay cancellation here.
220240
select {
221241
case <-ctx.Done():
@@ -298,6 +318,12 @@ func (q *ManagedQueue) AddWorkers(number int, timeout time.Duration) context.Can
298318
return nil
299319
}
300320

321+
// Flushable returns true if the queue is flushable
322+
func (q *ManagedQueue) Flushable() bool {
323+
_, ok := q.Managed.(Flushable)
324+
return ok
325+
}
326+
301327
// Flush flushes the queue with a timeout
302328
func (q *ManagedQueue) Flush(timeout time.Duration) error {
303329
if flushable, ok := q.Managed.(Flushable); ok {
@@ -315,6 +341,34 @@ func (q *ManagedQueue) IsEmpty() bool {
315341
return true
316342
}
317343

344+
// Pausable returns whether the queue is Pausable
345+
func (q *ManagedQueue) Pausable() bool {
346+
_, ok := q.Managed.(Pausable)
347+
return ok
348+
}
349+
350+
// Pause pauses the queue
351+
func (q *ManagedQueue) Pause() {
352+
if pausable, ok := q.Managed.(Pausable); ok {
353+
pausable.Pause()
354+
}
355+
}
356+
357+
// IsPaused reveals if the queue is paused
358+
func (q *ManagedQueue) IsPaused() bool {
359+
if pausable, ok := q.Managed.(Pausable); ok {
360+
return pausable.IsPaused()
361+
}
362+
return false
363+
}
364+
365+
// Resume resumes the queue
366+
func (q *ManagedQueue) Resume() {
367+
if pausable, ok := q.Managed.(Pausable); ok {
368+
pausable.Resume()
369+
}
370+
}
371+
318372
// NumberOfWorkers returns the number of workers in the queue
319373
func (q *ManagedQueue) NumberOfWorkers() int {
320374
if pool, ok := q.Managed.(ManagedPool); ok {

modules/queue/queue.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ type Type string
3636
type Data interface{}
3737

3838
// HandlerFunc is a function that takes a variable amount of data and processes it
39-
type HandlerFunc func(...Data)
39+
type HandlerFunc func(...Data) (unhandled []Data)
4040

4141
// NewQueueFunc is a function that creates a queue
4242
type NewQueueFunc func(handler HandlerFunc, config, exemplar interface{}) (Queue, error)
@@ -61,6 +61,12 @@ type Queue interface {
6161
Push(Data) error
6262
}
6363

64+
// PushBackable queues can be pushed back to
65+
type PushBackable interface {
66+
// PushBack pushes data back to the top of the fifo
67+
PushBack(Data) error
68+
}
69+
6470
// DummyQueueType is the type for the dummy queue
6571
const DummyQueueType Type = "dummy"
6672

0 commit comments

Comments
 (0)