Skip to content

Commit 1475c62

Browse files
committed
As per review
Signed-off-by: Andrew Thornton <[email protected]>
1 parent 2bce719 commit 1475c62

File tree

12 files changed

+100
-41
lines changed

12 files changed

+100
-41
lines changed

modules/doctor/queue.go

Lines changed: 15 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -14,22 +14,6 @@ import (
1414
"gitea.com/lunny/levelqueue"
1515
)
1616

17-
var uniqueQueueNames = []string{
18-
"code_indexer",
19-
"repo_stats_update",
20-
"mirror",
21-
"pr_patch_checker",
22-
"repo-archive",
23-
}
24-
25-
var queueNames = []string{
26-
"issue_indexer",
27-
"notification-service",
28-
"mail",
29-
"push_update",
30-
"task",
31-
}
32-
3317
var levelqueueTypes = []string{
3418
string(queue.PersistableChannelQueueType),
3519
string(queue.PersistableChannelUniqueQueueType),
@@ -38,8 +22,8 @@ var levelqueueTypes = []string{
3822
}
3923

4024
func checkUniqueQueues(ctx context.Context, logger log.Logger, autofix bool) error {
41-
for _, name := range uniqueQueueNames {
42-
q := setting.GetQueueSettings(name)
25+
for _, name := range queue.KnownUniqueQueueNames {
26+
q := setting.GetQueueSettings(string(name))
4327
if q.Type == "" {
4428
q.Type = string(queue.PersistableChannelQueueType)
4529
}
@@ -62,7 +46,7 @@ func checkUniqueQueues(ctx context.Context, logger log.Logger, autofix bool) err
6246

6347
db, err := nosql.GetManager().GetLevelDB(connection)
6448
if err != nil {
65-
logger.Error("Queue: %s\nUnable to open DB connection %s: %v", q.Name, connection, err)
49+
logger.Error("Queue: %s\nUnable to open DB connection %q: %v", q.Name, connection, err)
6650
return err
6751
}
6852
defer db.Close()
@@ -138,8 +122,15 @@ func checkUniqueQueues(ctx context.Context, logger log.Logger, autofix bool) err
138122

139123
func queueListDB(ctx context.Context, logger log.Logger, autofix bool) error {
140124
connections := []string{}
125+
queueNames := make([]string, 0, len(queue.KnownUniqueQueueNames)+len(queue.KnownQueueNames))
126+
for _, name := range queue.KnownUniqueQueueNames {
127+
queueNames = append(queueNames, string(name))
128+
}
129+
for _, name := range queue.KnownQueueNames {
130+
queueNames = append(queueNames, string(name))
131+
}
141132

142-
for _, name := range append(uniqueQueueNames, queueNames...) {
133+
for _, name := range queueNames {
143134
q := setting.GetQueueSettings(name)
144135
if q.Type == "" {
145136
q.Type = string(queue.PersistableChannelQueueType)
@@ -159,6 +150,7 @@ func queueListDB(ctx context.Context, logger log.Logger, autofix bool) error {
159150
for _, connection := range connections {
160151
if connection == q.ConnectionString {
161152
found = true
153+
break
162154
}
163155
}
164156
if !found {
@@ -170,6 +162,7 @@ func queueListDB(ctx context.Context, logger log.Logger, autofix bool) error {
170162
for _, connection := range connections {
171163
if connection == q.DataDir {
172164
found = true
165+
break
173166
}
174167
}
175168
if !found {
@@ -181,7 +174,7 @@ func queueListDB(ctx context.Context, logger log.Logger, autofix bool) error {
181174
logger.Info("LevelDB: %s", connection)
182175
db, err := nosql.GetManager().GetLevelDB(connection)
183176
if err != nil {
184-
logger.Error("Connection: %s Unable to open DB: %v", connection, err)
177+
logger.Error("Connection: %q Unable to open DB: %v", connection, err)
185178
return err
186179
}
187180
defer db.Close()
@@ -204,6 +197,7 @@ func init() {
204197
SkipDatabaseInitialization: false,
205198
Priority: 1,
206199
})
200+
207201
Register(&Check{
208202
Title: "List all entries in leveldb",
209203
Name: "queues-listdb",

modules/indexer/code/indexer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ func Init() {
163163
return unhandled
164164
}
165165

166-
indexerQueue = queue.CreateUniqueQueue("code_indexer", handler, &IndexerData{})
166+
indexerQueue = queue.CreateUniqueQueue(queue.CodeIndexerQueueName, handler, &IndexerData{})
167167
if indexerQueue == nil {
168168
log.Fatal("Unable to create codes indexer queue")
169169
}

modules/indexer/issues/indexer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ func InitIssueIndexer(syncReindex bool) {
154154
return nil
155155
}
156156

157-
issueIndexerQueue = queue.CreateQueue("issue_indexer", handler, &IndexerData{})
157+
issueIndexerQueue = queue.CreateQueue(queue.IssueIndexerQueueName, handler, &IndexerData{})
158158

159159
if issueIndexerQueue == nil {
160160
log.Fatal("Unable to create issue indexer queue")

modules/indexer/stats/queue.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,9 @@ func handle(data ...queue.Data) []queue.Data {
2828
}
2929

3030
func initStatsQueue() error {
31-
statsQueue = queue.CreateUniqueQueue("repo_stats_update", handle, int64(0))
31+
statsQueue = queue.CreateUniqueQueue(queue.RepoStatsUpdateQueueName, handle, int64(0))
3232
if statsQueue == nil {
33-
return fmt.Errorf("Unable to create repo_stats_update Queue")
33+
return fmt.Errorf("unable to create repo_stats_update Queue")
3434
}
3535

3636
go graceful.GetManager().RunWithShutdownFns(statsQueue.Run)

modules/notification/ui/ui.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ var _ base.Notifier = &notificationService{}
3434
// NewNotifier create a new notificationService notifier
3535
func NewNotifier() base.Notifier {
3636
ns := &notificationService{}
37-
ns.issueQueue = queue.CreateQueue("notification-service", ns.handle, issueNotificationOpts{})
37+
ns.issueQueue = queue.CreateQueue(queue.NotificationQueueName, ns.handle, issueNotificationOpts{})
3838
return ns
3939
}
4040

modules/queue/setting.go

Lines changed: 70 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,48 @@ import (
1313
"code.gitea.io/gitea/modules/setting"
1414
)
1515

16+
// UniqueQueueName represents an expected name for an UniqueQueue
17+
type UniqueQueueName string
18+
19+
// list of all expected UniqueQueues
20+
const (
21+
CodeIndexerQueueName UniqueQueueName = "code_indexer"
22+
RepoStatsUpdateQueueName UniqueQueueName = "repo_stats_update"
23+
MirrorQueueName UniqueQueueName = "mirror"
24+
PRPatchQueueName UniqueQueueName = "pr_patch_checker"
25+
RepoArchiveQueueName UniqueQueueName = "repo-archive"
26+
)
27+
28+
// KnownUniqueQueueNames represents the list of expected unique queues
29+
var KnownUniqueQueueNames = []UniqueQueueName{
30+
CodeIndexerQueueName,
31+
RepoStatsUpdateQueueName,
32+
MirrorQueueName,
33+
PRPatchQueueName,
34+
RepoArchiveQueueName,
35+
}
36+
37+
// QueueName represents an expected name for Queue
38+
type QueueName string
39+
40+
// list of all expected Queues
41+
const (
42+
IssueIndexerQueueName QueueName = "issue_indexer"
43+
NotificationQueueName QueueName = "notification-service"
44+
MailerQueueName QueueName = "mail"
45+
PushUpdateQueueName QueueName = "push_update"
46+
TaskQueueName QueueName = "task"
47+
)
48+
49+
// KnownQueueNames represents the list of expected queues
50+
var KnownQueueNames = []QueueName{
51+
IssueIndexerQueueName,
52+
NotificationQueueName,
53+
MailerQueueName,
54+
PushUpdateQueueName,
55+
TaskQueueName,
56+
}
57+
1658
func validType(t string) (Type, error) {
1759
if len(t) == 0 {
1860
return PersistableChannelQueueType, nil
@@ -37,8 +79,19 @@ func getQueueSettings(name string) (setting.QueueSettings, []byte) {
3779
}
3880

3981
// CreateQueue for name with provided handler and exemplar
40-
func CreateQueue(name string, handle HandlerFunc, exemplar interface{}) Queue {
41-
q, cfg := getQueueSettings(name)
82+
func CreateQueue(name QueueName, handle HandlerFunc, exemplar interface{}) Queue {
83+
found := false
84+
for _, expected := range KnownQueueNames {
85+
if name == expected {
86+
found = true
87+
break
88+
}
89+
}
90+
if !found {
91+
log.Warn("%s is not an expected name for an Queue", name)
92+
}
93+
94+
q, cfg := getQueueSettings(string(name))
4295
if len(cfg) == 0 {
4396
return nil
4497
}
@@ -58,7 +111,7 @@ func CreateQueue(name string, handle HandlerFunc, exemplar interface{}) Queue {
58111
MaxAttempts: q.MaxAttempts,
59112
Config: cfg,
60113
QueueLength: q.QueueLength,
61-
Name: name,
114+
Name: string(name),
62115
}, exemplar)
63116
}
64117
if err != nil {
@@ -79,8 +132,19 @@ func CreateQueue(name string, handle HandlerFunc, exemplar interface{}) Queue {
79132
}
80133

81134
// CreateUniqueQueue for name with provided handler and exemplar
82-
func CreateUniqueQueue(name string, handle HandlerFunc, exemplar interface{}) UniqueQueue {
83-
q, cfg := getQueueSettings(name)
135+
func CreateUniqueQueue(name UniqueQueueName, handle HandlerFunc, exemplar interface{}) UniqueQueue {
136+
found := false
137+
for _, expected := range KnownUniqueQueueNames {
138+
if name == expected {
139+
found = true
140+
break
141+
}
142+
}
143+
if !found {
144+
log.Warn("%s is not an expected name for an UniqueQueue", name)
145+
}
146+
147+
q, cfg := getQueueSettings(string(name))
84148
if len(cfg) == 0 {
85149
return nil
86150
}
@@ -107,6 +171,7 @@ func CreateUniqueQueue(name string, handle HandlerFunc, exemplar interface{}) Un
107171
MaxAttempts: q.MaxAttempts,
108172
Config: cfg,
109173
QueueLength: q.QueueLength,
174+
Name: string(name),
110175
}, exemplar)
111176
}
112177
if err != nil {

services/mailer/mailer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -346,7 +346,7 @@ func NewContext() {
346346
Sender = &dummySender{}
347347
}
348348

349-
mailQueue = queue.CreateQueue("mail", func(data ...queue.Data) []queue.Data {
349+
mailQueue = queue.CreateQueue(queue.MailerQueueName, func(data ...queue.Data) []queue.Data {
350350
for _, datum := range data {
351351
msg := datum.(*Message)
352352
gomailMsg := msg.ToMessage()

services/mirror/mirror.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ func InitSyncMirrors() {
151151
if !setting.Mirror.Enabled {
152152
return
153153
}
154-
mirrorQueue = queue.CreateUniqueQueue("mirror", queueHandle, new(SyncRequest))
154+
mirrorQueue = queue.CreateUniqueQueue(queue.MirrorQueueName, queueHandle, new(SyncRequest))
155155

156156
go graceful.GetManager().RunWithShutdownFns(mirrorQueue.Run)
157157
}

services/pull/check.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -278,10 +278,10 @@ func CheckPrsForBaseBranch(baseRepo *repo_model.Repository, baseBranchName strin
278278

279279
// Init runs the task queue to test all the checking status pull requests
280280
func Init() error {
281-
prQueue = queue.CreateUniqueQueue("pr_patch_checker", handle, "")
281+
prQueue = queue.CreateUniqueQueue(queue.PRPatchQueueName, handle, "")
282282

283283
if prQueue == nil {
284-
return fmt.Errorf("Unable to create pr_patch_checker Queue")
284+
return fmt.Errorf("unable to create pr_patch_checker Queue")
285285
}
286286

287287
go graceful.GetManager().RunWithShutdownFns(prQueue.Run)

services/repository/archiver/archiver.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -261,9 +261,9 @@ func Init() error {
261261
return nil
262262
}
263263

264-
archiverQueue = queue.CreateUniqueQueue("repo-archive", handler, new(ArchiveRequest))
264+
archiverQueue = queue.CreateUniqueQueue(queue.RepoArchiveQueueName, handler, new(ArchiveRequest))
265265
if archiverQueue == nil {
266-
return errors.New("unable to create codes indexer queue")
266+
return errors.New("unable to create repo archiver queue")
267267
}
268268

269269
go graceful.GetManager().RunWithShutdownFns(archiverQueue.Run)

services/repository/push.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ func handle(data ...queue.Data) []queue.Data {
4444
}
4545

4646
func initPushQueue() error {
47-
pushQueue = queue.CreateQueue("push_update", handle, []*repo_module.PushUpdateOptions{})
47+
pushQueue = queue.CreateQueue(queue.PushUpdateQueueName, handle, []*repo_module.PushUpdateOptions{})
4848
if pushQueue == nil {
4949
return errors.New("unable to create push_update Queue")
5050
}

services/task/task.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,16 +32,16 @@ func Run(t *models.Task) error {
3232
case structs.TaskTypeMigrateRepo:
3333
return runMigrateTask(t)
3434
default:
35-
return fmt.Errorf("Unknown task type: %d", t.Type)
35+
return fmt.Errorf("unknown task type: %d", t.Type)
3636
}
3737
}
3838

3939
// Init will start the service to get all unfinished tasks and run them
4040
func Init() error {
41-
taskQueue = queue.CreateQueue("task", handle, &models.Task{})
41+
taskQueue = queue.CreateQueue(queue.TaskQueueName, handle, &models.Task{})
4242

4343
if taskQueue == nil {
44-
return fmt.Errorf("Unable to create Task Queue")
44+
return fmt.Errorf("unable to create Task Queue")
4545
}
4646

4747
go graceful.GetManager().RunWithShutdownFns(taskQueue.Run)

0 commit comments

Comments
 (0)