@@ -70,14 +70,52 @@ func (p *WorkerPool) Push(data Data) {
70
70
atomic .AddInt64 (& p .numInQueue , 1 )
71
71
p .lock .Lock ()
72
72
if p .blockTimeout > 0 && p .boostTimeout > 0 && (p .numberOfWorkers <= p .maxNumberOfWorkers || p .maxNumberOfWorkers < 0 ) {
73
- p .lock .Unlock ()
73
+ if p .numberOfWorkers == 0 {
74
+ p .zeroBoost ()
75
+ } else {
76
+ p .lock .Unlock ()
77
+ }
74
78
p .pushBoost (data )
75
79
} else {
76
80
p .lock .Unlock ()
77
81
p .dataChan <- data
78
82
}
79
83
}
80
84
85
+ func (p * WorkerPool ) zeroBoost () {
86
+ ctx , cancel := context .WithCancel (p .baseCtx )
87
+ mq := GetManager ().GetManagedQueue (p .qid )
88
+ boost := p .boostWorkers
89
+ if (boost + p .numberOfWorkers ) > p .maxNumberOfWorkers && p .maxNumberOfWorkers >= 0 {
90
+ boost = p .maxNumberOfWorkers - p .numberOfWorkers
91
+ }
92
+ if mq != nil {
93
+ log .Warn ("WorkerPool: %d (for %s) has zero workers - adding %d temporary workers for %s" , p .qid , mq .Name , boost , p .boostTimeout )
94
+
95
+ start := time .Now ()
96
+ pid := mq .RegisterWorkers (boost , start , true , start .Add (p .boostTimeout ), cancel , false )
97
+ go func () {
98
+ select {
99
+ case <- ctx .Done ():
100
+ case <- time .After (p .boostTimeout ):
101
+ }
102
+ mq .RemoveWorkers (pid )
103
+ cancel ()
104
+ }()
105
+ } else {
106
+ log .Warn ("WorkerPool: %d has zero workers - adding %d temporary workers for %s" , p .qid , p .boostWorkers , p .boostTimeout )
107
+ go func () {
108
+ select {
109
+ case <- ctx .Done ():
110
+ case <- time .After (p .boostTimeout ):
111
+ }
112
+ cancel ()
113
+ }()
114
+ }
115
+ p .lock .Unlock ()
116
+ p .addWorkers (ctx , boost )
117
+ }
118
+
81
119
func (p * WorkerPool ) pushBoost (data Data ) {
82
120
select {
83
121
case p .dataChan <- data :
@@ -112,7 +150,7 @@ func (p *WorkerPool) pushBoost(data Data) {
112
150
log .Warn ("WorkerPool: %d (for %s) Channel blocked for %v - adding %d temporary workers for %s, block timeout now %v" , p .qid , mq .Name , ourTimeout , boost , p .boostTimeout , p .blockTimeout )
113
151
114
152
start := time .Now ()
115
- pid := mq .RegisterWorkers (boost , start , false , start , cancel , false )
153
+ pid := mq .RegisterWorkers (boost , start , true , start . Add ( p . boostTimeout ) , cancel , false )
116
154
go func () {
117
155
<- ctx .Done ()
118
156
mq .RemoveWorkers (pid )
0 commit comments