diff --git a/consumer.go b/consumer.go index 31d7e69..b72ccfc 100644 --- a/consumer.go +++ b/consumer.go @@ -21,20 +21,6 @@ type Consumer struct { logger Logger stopOnce sync.Once stopFlag int32 - metric Metric -} - -func (s *Consumer) incBusyWorker() { - s.metric.IncBusyWorker() -} - -func (s *Consumer) decBusyWorker() { - s.metric.DecBusyWorker() -} - -// BusyWorkers returns the numbers of workers has been busy. -func (s *Consumer) BusyWorkers() uint64 { - return s.metric.BusyWorkers() } func (s *Consumer) handle(job Job) error { @@ -43,10 +29,8 @@ func (s *Consumer) handle(job Job) error { panicChan := make(chan interface{}, 1) startTime := time.Now() ctx, cancel := context.WithTimeout(context.Background(), job.Timeout) - s.incBusyWorker() defer func() { cancel() - s.decBusyWorker() }() // run the job @@ -157,7 +141,6 @@ func NewConsumer(opts ...Option) *Consumer { stop: make(chan struct{}), logger: o.logger, runFunc: o.fn, - metric: o.metric, } return w diff --git a/consumer_test.go b/consumer_test.go index fba3a10..6097a97 100644 --- a/consumer_test.go +++ b/consumer_test.go @@ -321,30 +321,3 @@ func TestTaskJobComplete(t *testing.T) { } assert.Equal(t, context.DeadlineExceeded, w.handle(job)) } - -func TestBusyWorkerCount(t *testing.T) { - job := Job{ - Timeout: 200 * time.Millisecond, - Task: func(ctx context.Context) error { - time.Sleep(100 * time.Millisecond) - return nil - }, - } - - w := NewConsumer() - - assert.Equal(t, uint64(0), w.BusyWorkers()) - go func() { - assert.NoError(t, w.handle(job)) - }() - go func() { - assert.NoError(t, w.handle(job)) - }() - - time.Sleep(50 * time.Millisecond) - assert.Equal(t, uint64(2), w.BusyWorkers()) - time.Sleep(100 * time.Millisecond) - assert.Equal(t, uint64(0), w.BusyWorkers()) - - assert.NoError(t, w.Shutdown()) -}