Skip to content

Commit 59d35a6

Browse files
committed
chore(consumer): remove request timeout parameter.
Signed-off-by: Bo-Yi.Wu <[email protected]>
1 parent 9ec46ac commit 59d35a6

File tree

3 files changed

+7
-27
lines changed

3 files changed

+7
-27
lines changed

consumer.go

+1-6
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"errors"
66
"sync"
77
"sync/atomic"
8-
"time"
98

109
"github.com/golang-queue/queue/core"
1110
)
@@ -23,8 +22,6 @@ type Consumer struct {
2322
logger Logger
2423
stopOnce sync.Once
2524
stopFlag int32
26-
27-
requestTimeout time.Duration
2825
}
2926

3027
// Run to execute new task
@@ -74,7 +71,7 @@ func (s *Consumer) Request() (core.QueuedMessage, error) {
7471
return nil, ErrQueueHasBeenClosed
7572
}
7673
return task, nil
77-
case <-time.After(s.requestTimeout):
74+
default:
7875
return nil, ErrNoTaskInQueue
7976
}
8077
}
@@ -88,8 +85,6 @@ func NewConsumer(opts ...Option) *Consumer {
8885
exit: make(chan struct{}),
8986
logger: o.logger,
9087
runFunc: o.fn,
91-
92-
requestTimeout: o.requestTimeout,
9388
}
9489

9590
return w

consumer_test.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -497,12 +497,11 @@ func TestCancelRetryCountWithNewMessage(t *testing.T) {
497497
assert.Equal(t, 2, count)
498498
}
499499

500-
func TestRequestTimeout(t *testing.T) {
500+
func TestErrNoTaskInQueue(t *testing.T) {
501501
w := NewConsumer(
502502
WithFn(func(ctx context.Context, m core.QueuedMessage) error {
503503
return nil
504504
}),
505-
WithRequestTimeout(10*time.Millisecond),
506505
)
507506
task, err := w.Request()
508507
assert.Nil(t, task)

options.go

+5-19
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,16 @@ package queue
33
import (
44
"context"
55
"runtime"
6-
"time"
76

87
"github.com/golang-queue/queue/core"
98
)
109

1110
var (
12-
defaultQueueSize = 4096
13-
defaultWorkerCount = runtime.NumCPU()
14-
defaultNewLogger = NewLogger()
15-
defaultFn = func(context.Context, core.QueuedMessage) error { return nil }
16-
defaultMetric = NewMetric()
17-
defaultRequestTimeout = 5 * time.Second
11+
defaultQueueSize = 4096
12+
defaultWorkerCount = runtime.NumCPU()
13+
defaultNewLogger = NewLogger()
14+
defaultFn = func(context.Context, core.QueuedMessage) error { return nil }
15+
defaultMetric = NewMetric()
1816
)
1917

2018
// An Option configures a mutex.
@@ -47,13 +45,6 @@ func WithQueueSize(num int) Option {
4745
})
4846
}
4947

50-
// WithQueueSize set worker count
51-
func WithRequestTimeout(timeout time.Duration) Option {
52-
return OptionFunc(func(q *Options) {
53-
q.requestTimeout = timeout
54-
})
55-
}
56-
5748
// WithLogger set custom logger
5849
func WithLogger(l Logger) Option {
5950
return OptionFunc(func(q *Options) {
@@ -90,9 +81,6 @@ type Options struct {
9081
worker core.Worker
9182
fn func(context.Context, core.QueuedMessage) error
9283
metric Metric
93-
94-
// timeout for request single task
95-
requestTimeout time.Duration
9684
}
9785

9886
// NewOptions initialize the default value for the options
@@ -104,8 +92,6 @@ func NewOptions(opts ...Option) *Options {
10492
worker: nil,
10593
fn: defaultFn,
10694
metric: defaultMetric,
107-
108-
requestTimeout: defaultRequestTimeout,
10995
}
11096

11197
// Loop through each option

0 commit comments

Comments
 (0)