From 73f52629c37ecd966bbecdfaa931bec0cfb56d4e Mon Sep 17 00:00:00 2001 From: Bo-Yi Wu Date: Sun, 27 Mar 2022 21:03:21 +0800 Subject: [PATCH 1/7] chore(queue): auto scale the task worker fix https://github.com/golang-queue/queue/issues/43 Signed-off-by: Bo-Yi Wu --- .golangci.yml | 1 - consumer.go | 30 +++++---- consumer_test.go | 56 +++++----------- pool_test.go | 6 +- queue.go | 159 ++++++++++++++++++++++++++++++++++------------ queue_test.go | 53 +--------------- worker.go | 8 ++- worker_empty.go | 17 ++--- worker_message.go | 24 ++++--- worker_task.go | 22 +++++-- 10 files changed, 202 insertions(+), 174 deletions(-) diff --git a/.golangci.yml b/.golangci.yml index 5a0031c..7520418 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -8,7 +8,6 @@ linters: - depguard - dogsled - dupl - - errcheck - exportloopref - exhaustive - gochecknoinits diff --git a/consumer.go b/consumer.go index 847e75b..6db64f3 100644 --- a/consumer.go +++ b/consumer.go @@ -101,7 +101,7 @@ func (s *Consumer) handle(job Job) error { } // Run start the worker -func (s *Consumer) Run() error { +func (s *Consumer) Run(task QueuedMessage) error { // check queue status select { case <-s.stop: @@ -109,18 +109,17 @@ func (s *Consumer) Run() error { default: } - for task := range s.taskQueue { - var data Job - _ = json.Unmarshal(task.Bytes(), &data) - if v, ok := task.(Job); ok { - if v.Task != nil { - data.Task = v.Task - } - } - if err := s.handle(data); err != nil { - s.logger.Error(err.Error()) + var data Job + _ = json.Unmarshal(task.Bytes(), &data) + if v, ok := task.(Job); ok { + if v.Task != nil { + data.Task = v.Task } } + if err := s.handle(data); err != nil { + return err + } + return nil } @@ -161,6 +160,15 @@ func (s *Consumer) Queue(job QueuedMessage) error { } } +func (s *Consumer) Request() (QueuedMessage, error) { + select { + case task := <-s.taskQueue: + return task, nil + default: + return nil, errors.New("no message in queue") + } +} + // NewConsumer for struc func NewConsumer(opts ...Option) *Consumer { o := NewOptions(opts...) diff --git a/consumer_test.go b/consumer_test.go index ae85774..29ced21 100644 --- a/consumer_test.go +++ b/consumer_test.go @@ -50,14 +50,16 @@ func TestCustomFuncAndWait(t *testing.T) { q, err := NewQueue( WithWorker(w), WithWorkerCount(2), + WithLogger(NewLogger()), ) assert.NoError(t, err) - q.Start() - time.Sleep(100 * time.Millisecond) assert.NoError(t, q.Queue(m)) assert.NoError(t, q.Queue(m)) assert.NoError(t, q.Queue(m)) assert.NoError(t, q.Queue(m)) + q.Start() + time.Sleep(100 * time.Millisecond) + assert.Equal(t, 2, int(q.metric.BusyWorkers())) time.Sleep(600 * time.Millisecond) q.Shutdown() q.Wait() @@ -84,26 +86,6 @@ func TestEnqueueJobAfterShutdown(t *testing.T) { q.Wait() } -func TestConsumerNumAfterShutdown(t *testing.T) { - w := NewConsumer() - q, err := NewQueue( - WithWorker(w), - WithWorkerCount(2), - ) - assert.NoError(t, err) - q.Start() - q.Start() - time.Sleep(50 * time.Millisecond) - assert.Equal(t, 4, q.Workers()) - q.Shutdown() - q.Wait() - assert.Equal(t, 0, q.Workers()) - // show queue has been shutdown meesgae - q.Start() - q.Start() - assert.Equal(t, 0, q.Workers()) -} - func TestJobReachTimeout(t *testing.T) { m := mockMessage{ message: "foo", @@ -131,12 +113,10 @@ func TestJobReachTimeout(t *testing.T) { WithWorkerCount(2), ) assert.NoError(t, err) - q.Start() - time.Sleep(50 * time.Millisecond) assert.NoError(t, q.QueueWithTimeout(30*time.Millisecond, m)) + q.Start() time.Sleep(50 * time.Millisecond) - q.Shutdown() - q.Wait() + q.Release() } func TestCancelJobAfterShutdown(t *testing.T) { @@ -167,11 +147,12 @@ func TestCancelJobAfterShutdown(t *testing.T) { WithWorkerCount(2), ) assert.NoError(t, err) - q.Start() - time.Sleep(50 * time.Millisecond) assert.NoError(t, q.QueueWithTimeout(100*time.Millisecond, m)) - q.Shutdown() - q.Wait() + assert.NoError(t, q.QueueWithTimeout(100*time.Millisecond, m)) + q.Start() + time.Sleep(10 * time.Millisecond) + assert.Equal(t, 2, int(q.metric.busyWorkers)) + q.Release() } func TestGoroutineLeak(t *testing.T) { @@ -205,15 +186,14 @@ func TestGoroutineLeak(t *testing.T) { WithWorkerCount(10), ) assert.NoError(t, err) - q.Start() - time.Sleep(50 * time.Millisecond) for i := 0; i < 500; i++ { m.message = fmt.Sprintf("foobar: %d", i+1) assert.NoError(t, q.Queue(m)) } + + q.Start() time.Sleep(2 * time.Second) - q.Shutdown() - q.Wait() + q.Release() fmt.Println("number of goroutines:", runtime.NumGoroutine()) } @@ -231,12 +211,10 @@ func TestGoroutinePanic(t *testing.T) { WithWorkerCount(2), ) assert.NoError(t, err) - q.Start() - time.Sleep(50 * time.Millisecond) assert.NoError(t, q.Queue(m)) - time.Sleep(50 * time.Millisecond) - q.Shutdown() - q.Wait() + q.Start() + time.Sleep(10 * time.Millisecond) + q.Release() } func TestHandleTimeout(t *testing.T) { diff --git a/pool_test.go b/pool_test.go index ae96ef8..911cb86 100644 --- a/pool_test.go +++ b/pool_test.go @@ -3,7 +3,6 @@ package queue import ( "context" "testing" - "time" "github.com/stretchr/testify/assert" ) @@ -14,9 +13,6 @@ func TestNewPoolWithQueueTask(t *testing.T) { rets := make(chan struct{}, taskN) p := NewPool(totalN) - time.Sleep(time.Millisecond * 50) - assert.Equal(t, totalN, p.Workers()) - for i := 0; i < taskN; i++ { assert.NoError(t, p.QueueTask(func(context.Context) error { rets <- struct{}{} @@ -30,5 +26,5 @@ func TestNewPoolWithQueueTask(t *testing.T) { // shutdown all, and now running worker is 0 p.Release() - assert.Equal(t, 0, p.Workers()) + assert.Equal(t, 0, p.BusyWorkers()) } diff --git a/queue.go b/queue.go index 786e901..e15da16 100644 --- a/queue.go +++ b/queue.go @@ -18,15 +18,17 @@ type TaskFunc func(context.Context) error type ( // A Queue is a message queue. Queue struct { - logger Logger - workerCount int - routineGroup *routineGroup - quit chan struct{} - worker Worker - stopOnce sync.Once - runningWorkers int32 - timeout time.Duration - stopFlag int32 + sync.Mutex + metric *metric + logger Logger + workerCount int + routineGroup *routineGroup + quit chan struct{} + ready chan struct{} + worker Worker + stopOnce sync.Once + timeout time.Duration + stopFlag int32 } // Job describes a task and its metadata. @@ -62,10 +64,12 @@ func NewQueue(opts ...Option) (*Queue, error) { q := &Queue{ routineGroup: newRoutineGroup(), quit: make(chan struct{}), + ready: make(chan struct{}, 1), workerCount: o.workerCount, logger: o.logger, timeout: o.timeout, worker: o.worker, + metric: &metric{}, } if q.worker == nil { @@ -87,7 +91,7 @@ func (q *Queue) Usage() int { // Start to enable all worker func (q *Queue) Start() { - q.startWorker() + go q.start() } // Shutdown stops all queues. @@ -96,8 +100,8 @@ func (q *Queue) Shutdown() { return } - if q.runningWorkers > 0 { - q.logger.Infof("shutdown all woker numbers: %d", q.runningWorkers) + if q.metric.BusyWorkers() > 0 { + q.logger.Infof("shutdown all woker numbers: %d", q.metric.BusyWorkers()) } q.stopOnce.Do(func() { @@ -114,9 +118,9 @@ func (q *Queue) Release() { q.Wait() } -// Workers returns the numbers of workers has been created. -func (q *Queue) Workers() int { - return int(atomic.LoadInt32(&q.runningWorkers)) +// BusyWorkers returns the numbers of workers in the running process. +func (q *Queue) BusyWorkers() int { + return int(q.metric.BusyWorkers()) } // Wait all process @@ -174,39 +178,110 @@ func (q *Queue) QueueTaskWithTimeout(timeout time.Duration, task TaskFunc) error return q.handleQueueTask(timeout, task) } -func (q *Queue) work() { - if atomic.LoadInt32(&q.stopFlag) == 1 { - return - } - - num := atomic.AddInt32(&q.runningWorkers, 1) +func (q *Queue) work(task QueuedMessage) { if err := q.worker.BeforeRun(); err != nil { - q.logger.Fatal(err) + q.logger.Error(err) } - q.routineGroup.Run(func() { - // to handle panic cases from inside the worker - // in such case, we start a new goroutine - defer func() { - atomic.AddInt32(&q.runningWorkers, -1) - if err := recover(); err != nil { - q.logger.Error(err) - q.logger.Infof("restart the new worker: %d", num) - go q.work() - } - }() - q.logger.Infof("start the worker num: %d", num) - if err := q.worker.Run(); err != nil { - q.logger.Errorf("runtime error: %s", err.Error()) + + // to handle panic cases from inside the worker + // in such case, we start a new goroutine + defer func() { + q.metric.DecBusyWorker() + if err := recover(); err != nil { + q.logger.Errorf("panic error: %v", err) } - q.logger.Infof("stop the worker num: %d", num) - }) + q.schedule() + }() + + if err := q.worker.Run(task); err != nil { + q.logger.Errorf("runtime error: %s", err.Error()) + } + if err := q.worker.AfterRun(); err != nil { - q.logger.Fatal(err) + q.logger.Error(err) + } +} + +func (q *Queue) schedule() { + select { + case q.ready <- struct{}{}: + default: } } -func (q *Queue) startWorker() { - for i := 0; i < q.workerCount; i++ { - go q.work() +// start handle job +func (q *Queue) start() { + var task QueuedMessage + tasks := make(chan QueuedMessage, 1) + + for { + if atomic.LoadInt32(&q.stopFlag) == 1 { + return + } + + // request task from queue in background + q.routineGroup.Run(func() { + loop: + for { + select { + case <-q.quit: + return + default: + task, err := q.worker.Request() + if task == nil || err != nil { + if err != nil { + select { + case <-q.quit: + break loop + case <-time.After(time.Second): + // sleep 1 second to fetch new task + } + } + } + if task != nil { + tasks <- task + break loop + } + } + } + }) + + // read task + select { + case task = <-tasks: + case <-q.quit: + select { + case task = <-tasks: + // queue task before shutdown the service + q.worker.Queue(task) + default: + } + return + } + + // check worker number + if q.BusyWorkers() < q.workerCount { + q.schedule() + } + + // get worker to execute new task + select { + case <-q.quit: + q.worker.Queue(task) + return + case <-q.ready: + select { + case <-q.quit: + q.worker.Queue(task) + return + default: + } + } + + // start new task + q.metric.IncBusyWorker() + q.routineGroup.Run(func() { + q.work(task) + }) } } diff --git a/queue_test.go b/queue_test.go index 7ffe6a2..c228119 100644 --- a/queue_test.go +++ b/queue_test.go @@ -39,26 +39,6 @@ func TestNewQueue(t *testing.T) { q.Wait() } -func TestWorkerNum(t *testing.T) { - w := &messageWorker{ - messages: make(chan QueuedMessage, 100), - } - q, err := NewQueue( - WithWorker(w), - WithWorkerCount(2), - ) - assert.NoError(t, err) - assert.NotNil(t, q) - - q.Start() - q.Start() - time.Sleep(20 * time.Millisecond) - assert.Equal(t, 4, q.Workers()) - assert.Equal(t, uint64(0), w.BusyWorkers()) - q.Shutdown() - q.Wait() -} - func TestShtdonwOnce(t *testing.T) { w := &messageWorker{ messages: make(chan QueuedMessage, 100), @@ -71,13 +51,12 @@ func TestShtdonwOnce(t *testing.T) { assert.NotNil(t, q) q.Start() - time.Sleep(20 * time.Millisecond) - assert.Equal(t, 2, q.Workers()) + assert.Equal(t, 0, q.BusyWorkers()) q.Shutdown() // don't panic here q.Shutdown() q.Wait() - assert.Equal(t, 0, q.Workers()) + assert.Equal(t, 0, q.BusyWorkers()) } func TestWorkerStatus(t *testing.T) { @@ -106,34 +85,6 @@ func TestWorkerStatus(t *testing.T) { q.Wait() } -func TestWorkerPanic(t *testing.T) { - w := &messageWorker{ - messages: make(chan QueuedMessage, 10), - } - q, err := NewQueue( - WithWorker(w), - WithWorkerCount(5), - ) - assert.NoError(t, err) - assert.NotNil(t, q) - - assert.NoError(t, q.Queue(mockMessage{ - message: "foobar", - })) - assert.NoError(t, q.Queue(mockMessage{ - message: "foobar", - })) - assert.NoError(t, q.Queue(mockMessage{ - message: "panic", - })) - q.Start() - time.Sleep(100 * time.Millisecond) - assert.Equal(t, 5, q.Workers()) - q.Shutdown() - q.Wait() - assert.Equal(t, 0, q.Workers()) -} - func TestCapacityReached(t *testing.T) { w := &messageWorker{ messages: make(chan QueuedMessage, 1), diff --git a/worker.go b/worker.go index 2e19096..a3e43a9 100644 --- a/worker.go +++ b/worker.go @@ -5,13 +5,15 @@ type Worker interface { // BeforeRun is called before starting the worker BeforeRun() error // Run is called to start the worker - Run() error + Run(task QueuedMessage) error // BeforeRun is called after starting the worker AfterRun() error // Shutdown is called if stop all worker Shutdown() error - // Queue to send message in Queue (single channel, NSQ or AWS SQS) - Queue(job QueuedMessage) error + // Queue to send message in Queue + Queue(task QueuedMessage) error + // Request to get message from Queue + Request() (QueuedMessage, error) // Capacity queue capacity = cap(channel name) Capacity() int // Usage is how many message in queue diff --git a/worker_empty.go b/worker_empty.go index e78c3c7..fa4fdff 100644 --- a/worker_empty.go +++ b/worker_empty.go @@ -5,11 +5,12 @@ var _ Worker = (*emptyWorker)(nil) // just for unit testing, don't use it. type emptyWorker struct{} -func (w *emptyWorker) BeforeRun() error { return nil } -func (w *emptyWorker) AfterRun() error { return nil } -func (w *emptyWorker) Run() error { return nil } -func (w *emptyWorker) Shutdown() error { return nil } -func (w *emptyWorker) Queue(job QueuedMessage) error { return nil } -func (w *emptyWorker) Capacity() int { return 0 } -func (w *emptyWorker) Usage() int { return 0 } -func (w *emptyWorker) BusyWorkers() uint64 { return uint64(0) } +func (w *emptyWorker) BeforeRun() error { return nil } +func (w *emptyWorker) AfterRun() error { return nil } +func (w *emptyWorker) Run(task QueuedMessage) error { return nil } +func (w *emptyWorker) Shutdown() error { return nil } +func (w *emptyWorker) Queue(task QueuedMessage) error { return nil } +func (w *emptyWorker) Request() (QueuedMessage, error) { return nil, nil } +func (w *emptyWorker) Capacity() int { return 0 } +func (w *emptyWorker) Usage() int { return 0 } +func (w *emptyWorker) BusyWorkers() uint64 { return uint64(0) } diff --git a/worker_message.go b/worker_message.go index 22e817b..402d375 100644 --- a/worker_message.go +++ b/worker_message.go @@ -14,13 +14,11 @@ type messageWorker struct { func (w *messageWorker) BeforeRun() error { return nil } func (w *messageWorker) AfterRun() error { return nil } -func (w *messageWorker) Run() error { - for msg := range w.messages { - if string(msg.Bytes()) == "panic" { - panic("show panic") - } - time.Sleep(20 * time.Millisecond) +func (w *messageWorker) Run(task QueuedMessage) error { + if string(task.Bytes()) == "panic" { + panic("show panic") } + time.Sleep(20 * time.Millisecond) return nil } @@ -29,14 +27,24 @@ func (w *messageWorker) Shutdown() error { return nil } -func (w *messageWorker) Queue(job QueuedMessage) error { +func (w *messageWorker) Queue(task QueuedMessage) error { select { - case w.messages <- job: + case w.messages <- task: return nil default: return errors.New("max capacity reached") } } + +func (w *messageWorker) Request() (QueuedMessage, error) { + select { + case task := <-w.messages: + return task, nil + default: + return nil, errors.New("no message in queue") + } +} + func (w *messageWorker) Capacity() int { return cap(w.messages) } func (w *messageWorker) Usage() int { return len(w.messages) } func (w *messageWorker) BusyWorkers() uint64 { return uint64(0) } diff --git a/worker_task.go b/worker_task.go index 6e7f8fe..e0f7471 100644 --- a/worker_task.go +++ b/worker_task.go @@ -14,14 +14,14 @@ type taskWorker struct { func (w *taskWorker) BeforeRun() error { return nil } func (w *taskWorker) AfterRun() error { return nil } -func (w *taskWorker) Run() error { - for msg := range w.messages { - if v, ok := msg.(Job); ok { - if v.Task != nil { - _ = v.Task(context.Background()) - } +func (w *taskWorker) Run(task QueuedMessage) error { + // for msg := range w.messages { + if v, ok := task.(Job); ok { + if v.Task != nil { + _ = v.Task(context.Background()) } } + // } return nil } @@ -38,6 +38,16 @@ func (w *taskWorker) Queue(job QueuedMessage) error { return errors.New("max capacity reached") } } + +func (w *taskWorker) Request() (QueuedMessage, error) { + select { + case task := <-w.messages: + return task, nil + default: + return nil, errors.New("no message in queue") + } +} + func (w *taskWorker) Capacity() int { return cap(w.messages) } func (w *taskWorker) Usage() int { return len(w.messages) } func (w *taskWorker) BusyWorkers() uint64 { return uint64(0) } From 7cf13467cf752154edcdcc2a204011a84165a481 Mon Sep 17 00:00:00 2001 From: Bo-Yi Wu Date: Sun, 27 Mar 2022 21:05:41 +0800 Subject: [PATCH 2/7] chore: update Signed-off-by: Bo-Yi Wu --- .golangci.yml | 1 + queue.go | 6 +++--- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/.golangci.yml b/.golangci.yml index 7520418..5a0031c 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -8,6 +8,7 @@ linters: - depguard - dogsled - dupl + - errcheck - exportloopref - exhaustive - gochecknoinits diff --git a/queue.go b/queue.go index e15da16..cc9477b 100644 --- a/queue.go +++ b/queue.go @@ -253,7 +253,7 @@ func (q *Queue) start() { select { case task = <-tasks: // queue task before shutdown the service - q.worker.Queue(task) + _ = q.worker.Queue(task) default: } return @@ -267,12 +267,12 @@ func (q *Queue) start() { // get worker to execute new task select { case <-q.quit: - q.worker.Queue(task) + _ = q.worker.Queue(task) return case <-q.ready: select { case <-q.quit: - q.worker.Queue(task) + _ = q.worker.Queue(task) return default: } From d631bc5740e78ffaf9aaf1bc67f000c5add857bb Mon Sep 17 00:00:00 2001 From: Bo-Yi Wu Date: Sun, 27 Mar 2022 21:10:31 +0800 Subject: [PATCH 3/7] chore: update Signed-off-by: Bo-Yi Wu --- queue_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/queue_test.go b/queue_test.go index c228119..4b30f0c 100644 --- a/queue_test.go +++ b/queue_test.go @@ -81,8 +81,7 @@ func TestWorkerStatus(t *testing.T) { assert.Equal(t, 4, q.Usage()) q.Start() time.Sleep(20 * time.Millisecond) - q.Shutdown() - q.Wait() + q.Release() } func TestCapacityReached(t *testing.T) { From 2c37dd18084b11b61ee8a0cd6e8735e7f0df3573 Mon Sep 17 00:00:00 2001 From: Bo-Yi Wu Date: Sun, 27 Mar 2022 22:23:07 +0800 Subject: [PATCH 4/7] chore: update Signed-off-by: Bo-Yi Wu --- consumer.go | 4 ++-- consumer_test.go | 12 ++++++------ queue.go | 10 +++++----- worker_task.go | 2 -- 4 files changed, 13 insertions(+), 15 deletions(-) diff --git a/consumer.go b/consumer.go index 6db64f3..cbd31e2 100644 --- a/consumer.go +++ b/consumer.go @@ -147,13 +147,13 @@ func (s *Consumer) Usage() int { } // Queue send notification to queue -func (s *Consumer) Queue(job QueuedMessage) error { +func (s *Consumer) Queue(task QueuedMessage) error { if atomic.LoadInt32(&s.stopFlag) == 1 { return ErrQueueShutdown } select { - case s.taskQueue <- job: + case s.taskQueue <- task: return nil default: return errMaxCapacity diff --git a/consumer_test.go b/consumer_test.go index 29ced21..6ad2381 100644 --- a/consumer_test.go +++ b/consumer_test.go @@ -156,9 +156,6 @@ func TestCancelJobAfterShutdown(t *testing.T) { } func TestGoroutineLeak(t *testing.T) { - m := mockMessage{ - message: "foo", - } w := NewConsumer( WithLogger(NewEmptyLogger()), WithFn(func(ctx context.Context, m QueuedMessage) error { @@ -181,13 +178,16 @@ func TestGoroutineLeak(t *testing.T) { }), ) q, err := NewQueue( - WithLogger(NewEmptyLogger()), + WithLogger(NewLogger()), WithWorker(w), WithWorkerCount(10), ) assert.NoError(t, err) - for i := 0; i < 500; i++ { - m.message = fmt.Sprintf("foobar: %d", i+1) + for i := 0; i < 400; i++ { + m := mockMessage{ + message: fmt.Sprintf("new message: %d", i+1), + } + assert.NoError(t, q.Queue(m)) } diff --git a/queue.go b/queue.go index cc9477b..cf502b2 100644 --- a/queue.go +++ b/queue.go @@ -211,10 +211,10 @@ func (q *Queue) schedule() { // start handle job func (q *Queue) start() { - var task QueuedMessage tasks := make(chan QueuedMessage, 1) for { + var task QueuedMessage if atomic.LoadInt32(&q.stopFlag) == 1 { return } @@ -227,8 +227,8 @@ func (q *Queue) start() { case <-q.quit: return default: - task, err := q.worker.Request() - if task == nil || err != nil { + t, err := q.worker.Request() + if t == nil || err != nil { if err != nil { select { case <-q.quit: @@ -238,8 +238,8 @@ func (q *Queue) start() { } } } - if task != nil { - tasks <- task + if t != nil { + tasks <- t break loop } } diff --git a/worker_task.go b/worker_task.go index e0f7471..17b4c28 100644 --- a/worker_task.go +++ b/worker_task.go @@ -15,13 +15,11 @@ type taskWorker struct { func (w *taskWorker) BeforeRun() error { return nil } func (w *taskWorker) AfterRun() error { return nil } func (w *taskWorker) Run(task QueuedMessage) error { - // for msg := range w.messages { if v, ok := task.(Job); ok { if v.Task != nil { _ = v.Task(context.Background()) } } - // } return nil } From c4b8d921616dd1e6b8a21d87759ba98288350dca Mon Sep 17 00:00:00 2001 From: Bo-Yi Wu Date: Sun, 27 Mar 2022 22:29:40 +0800 Subject: [PATCH 5/7] chore: update Signed-off-by: Bo-Yi Wu --- queue.go | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/queue.go b/queue.go index cf502b2..cbc0f0d 100644 --- a/queue.go +++ b/queue.go @@ -253,7 +253,9 @@ func (q *Queue) start() { select { case task = <-tasks: // queue task before shutdown the service - _ = q.worker.Queue(task) + if err := q.worker.Queue(task); err != nil { + q.logger.Errorf("can't re-queue message: %v", err) + } default: } return @@ -267,12 +269,16 @@ func (q *Queue) start() { // get worker to execute new task select { case <-q.quit: - _ = q.worker.Queue(task) + if err := q.worker.Queue(task); err != nil { + q.logger.Errorf("can't re-queue message: %v", err) + } return case <-q.ready: select { case <-q.quit: - _ = q.worker.Queue(task) + if err := q.worker.Queue(task); err != nil { + q.logger.Errorf("can't re-queue message: %v", err) + } return default: } From f1896c446e14ead2072f1c93e5ee68578bfe1d04 Mon Sep 17 00:00:00 2001 From: Bo-Yi Wu Date: Sun, 27 Mar 2022 22:32:46 +0800 Subject: [PATCH 6/7] chore: update Signed-off-by: Bo-Yi Wu --- queue_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/queue_test.go b/queue_test.go index 4b30f0c..0e6a3a3 100644 --- a/queue_test.go +++ b/queue_test.go @@ -80,7 +80,7 @@ func TestWorkerStatus(t *testing.T) { assert.Equal(t, 100, q.Capacity()) assert.Equal(t, 4, q.Usage()) q.Start() - time.Sleep(20 * time.Millisecond) + time.Sleep(40 * time.Millisecond) q.Release() } From d3c2e69d1381d5e482653b4b0c25f485cfd2dab6 Mon Sep 17 00:00:00 2001 From: Bo-Yi Wu Date: Sun, 27 Mar 2022 22:40:38 +0800 Subject: [PATCH 7/7] chore: update Signed-off-by: Bo-Yi Wu --- queue.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/queue.go b/queue.go index cbc0f0d..d094e30 100644 --- a/queue.go +++ b/queue.go @@ -101,7 +101,7 @@ func (q *Queue) Shutdown() { } if q.metric.BusyWorkers() > 0 { - q.logger.Infof("shutdown all woker numbers: %d", q.metric.BusyWorkers()) + q.logger.Infof("shutdown all tasks: %d workers", q.metric.BusyWorkers()) } q.stopOnce.Do(func() {