Skip to content

Commit 4b6dac2

Browse files
author
Andrew Werner
committed
workload/tpcc: add tpcc-checks workload command
This commit adds a workload to run tpcc checks as a separate workload. This workload is an easy way to create an overload scenario. It can be run with `--as-of` at a historical timestamp to avoid interferring with foreground transactions. Although the hope was that this test could be used to starve node liveness, in practice it seems to kill nodes by OOM. Release note: None
1 parent 4d7d13a commit 4b6dac2

File tree

1 file changed

+184
-0
lines changed

1 file changed

+184
-0
lines changed

pkg/workload/tpcc/checks_generator.go

Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
// Copyright 2017 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the Business Source License
4+
// included in the file licenses/BSL.txt.
5+
//
6+
// As of the Change Date specified in that file, in accordance with
7+
// the Business Source License, use of this software will be governed
8+
// by the Apache License, Version 2.0, included in the file
9+
// licenses/APL.txt.
10+
11+
package tpcc
12+
13+
import (
14+
"context"
15+
gosql "database/sql"
16+
"fmt"
17+
"math/rand"
18+
19+
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
20+
"github.com/cockroachdb/cockroach/pkg/workload"
21+
"github.com/cockroachdb/cockroach/pkg/workload/histogram"
22+
"github.com/pkg/errors"
23+
"github.com/spf13/pflag"
24+
)
25+
26+
var tpccChecksMeta = workload.Meta{
27+
Name: `tpcc-checks`,
28+
Description: `tpcc-checks runs the TPC-C consistency checks as a workload.
29+
30+
It is primarily intended as a tool to create an overload scenario.
31+
An --as-of flag is exposed to prevent the work from interfering with a
32+
foreground TPC-C workload`,
33+
Version: `19.2.0`,
34+
PublicFacing: true,
35+
New: func() workload.Generator {
36+
g := &tpccChecks{}
37+
g.flags.FlagSet = pflag.NewFlagSet(`tpcc`, pflag.ContinueOnError)
38+
g.flags.Meta = map[string]workload.FlagMeta{
39+
`db`: {RuntimeOnly: true},
40+
`concurrency`: {RuntimeOnly: true},
41+
}
42+
g.flags.IntVar(&g.concurrency, `concurrency`, 1,
43+
`Number of concurrent workers. Defaults to 1.`,
44+
)
45+
g.flags.StringVar(&g.asOfSystemTime, "as-of", "",
46+
"Timestamp at which the query should be run."+
47+
" If non-empty the provided value will be used as the expression in an"+
48+
" AS OF SYSTEM TIME CLAUSE for all checks.")
49+
checkNames := func() (checkNames []string) {
50+
for _, c := range allChecks() {
51+
checkNames = append(checkNames, c.name)
52+
}
53+
return checkNames
54+
}()
55+
g.flags.StringSliceVar(&g.checks, "checks", checkNames,
56+
"Name of checks to be run.")
57+
g.connFlags = workload.NewConnFlags(&g.flags)
58+
{ // Set the dbOveride to default to "tpcc".
59+
dbOverrideFlag := g.flags.Lookup(`db`)
60+
dbOverrideFlag.DefValue = `tpcc`
61+
if err := dbOverrideFlag.Value.Set(`tpcc`); err != nil {
62+
panic(err)
63+
}
64+
}
65+
return g
66+
},
67+
}
68+
69+
func (w *tpccChecks) Flags() workload.Flags {
70+
return w.flags
71+
}
72+
73+
func init() {
74+
workload.Register(tpccChecksMeta)
75+
}
76+
77+
type tpccChecks struct {
78+
flags workload.Flags
79+
connFlags *workload.ConnFlags
80+
81+
asOfSystemTime string
82+
checks []string
83+
tolerateErrors bool
84+
concurrency int
85+
}
86+
87+
// The tables should already exist, if they do not an error will occur later.
88+
func (*tpccChecks) Tables() []workload.Table {
89+
return nil
90+
}
91+
92+
func (*tpccChecks) Meta() workload.Meta {
93+
return tpccChecksMeta
94+
}
95+
96+
// Ops implements the Opser interface.
97+
func (w *tpccChecks) Ops(urls []string, reg *histogram.Registry) (workload.QueryLoad, error) {
98+
sqlDatabase, err := workload.SanitizeUrls(w, w.flags.Lookup("db").Value.String(), urls)
99+
if err != nil {
100+
return workload.QueryLoad{}, fmt.Errorf("%v", err)
101+
}
102+
dbs := make([]*gosql.DB, len(urls))
103+
for i, url := range urls {
104+
dbs[i], err = gosql.Open(`cockroach`, url)
105+
if err != nil {
106+
return workload.QueryLoad{}, errors.Wrapf(err, "failed to dial %s", url)
107+
}
108+
// Set the maximum number of open connections to 3x the concurrency because
109+
// that's the maximum number of connections used by any check at once.
110+
dbs[i].SetMaxOpenConns(3 * w.concurrency)
111+
dbs[i].SetMaxIdleConns(3 * w.concurrency)
112+
}
113+
114+
ql := workload.QueryLoad{SQLDatabase: sqlDatabase}
115+
ql.WorkerFns = make([]func(context.Context) error, w.concurrency)
116+
checks, err := filterChecks(allChecks(), w.checks)
117+
118+
// Preregister all of the histograms so they always print.
119+
for i := range ql.WorkerFns {
120+
worker := newCheckWorker(dbs, checks, reg.GetHandle(), w.asOfSystemTime)
121+
ql.WorkerFns[i] = worker.run
122+
}
123+
124+
for _, c := range checks {
125+
reg.GetHandle().Get(c.name)
126+
}
127+
return ql, nil
128+
}
129+
130+
type checkWorker struct {
131+
dbs []*gosql.DB
132+
checks []check
133+
histograms *histogram.Histograms
134+
asOfSystemTime string
135+
dbPerm []int
136+
checkPerm []int
137+
i int
138+
}
139+
140+
func newCheckWorker(
141+
dbs []*gosql.DB, checks []check, histograms *histogram.Histograms, asOfSystemTime string,
142+
) *checkWorker {
143+
return &checkWorker{
144+
dbs: dbs,
145+
checks: checks,
146+
histograms: histograms,
147+
asOfSystemTime: asOfSystemTime,
148+
dbPerm: rand.Perm(len(dbs)),
149+
checkPerm: rand.Perm(len(checks)),
150+
}
151+
}
152+
153+
func (w *checkWorker) run(ctx context.Context) error {
154+
defer func() { w.i++ }()
155+
c := w.checks[w.checkPerm[w.i%len(w.checks)]]
156+
db := w.dbs[w.dbPerm[w.i%len(w.dbs)]]
157+
start := timeutil.Now()
158+
if err := c.f(db, w.asOfSystemTime); err != nil {
159+
return errors.Wrapf(err, "failed check %s", c.name)
160+
}
161+
w.histograms.Get(c.name).Record(timeutil.Since(start))
162+
return nil
163+
}
164+
165+
// filterChecks removes all elements from checks which do not have their name
166+
// in toRun. An error is returned if any elements of toRun do not exist in
167+
// checks. The checks slice is modified in place and returned.
168+
func filterChecks(checks []check, toRun []string) ([]check, error) {
169+
toRunSet := make(map[string]struct{}, len(toRun))
170+
for _, s := range toRun {
171+
toRunSet[s] = struct{}{}
172+
}
173+
filtered := checks[:0]
174+
for _, c := range checks {
175+
if _, exists := toRunSet[c.name]; exists {
176+
filtered = append(filtered, c)
177+
delete(toRunSet, c.name)
178+
}
179+
}
180+
if len(toRunSet) > 0 {
181+
return nil, fmt.Errorf("cannot run checks %v which do not exist", toRun)
182+
}
183+
return filtered, nil
184+
}

0 commit comments

Comments
 (0)