Skip to content

Commit 04013f7

Browse files
author
Andrew Werner
committed
workload/tpcc: add tpcc-checks workload command
This commit adds a workload to run tpcc checks as a separate workload. This workload is an easy way to create an overload scenario. It can be run with `--as-of` at a historical timestamp to avoid interferring with foreground transactions. Although the hope was that this test could be used to starve node liveness, in practice it seems to kill nodes by OOM. Release note: None
1 parent d45b0bd commit 04013f7

File tree

4 files changed

+197
-11
lines changed

4 files changed

+197
-11
lines changed

pkg/ccl/workloadccl/allccl/all.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
_ "github.com/cockroachdb/cockroach/pkg/workload/rand"
3030
_ "github.com/cockroachdb/cockroach/pkg/workload/sqlsmith"
3131
_ "github.com/cockroachdb/cockroach/pkg/workload/tpcc"
32+
_ "github.com/cockroachdb/cockroach/pkg/workload/tpccchecks"
3233
_ "github.com/cockroachdb/cockroach/pkg/workload/tpcds"
3334
_ "github.com/cockroachdb/cockroach/pkg/workload/tpch"
3435
_ "github.com/cockroachdb/cockroach/pkg/workload/ycsb"

pkg/workload/tpcc/checks.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,19 @@ import (
1616
"github.com/pkg/errors"
1717
)
1818

19-
type check struct {
20-
name string
19+
// Check is a tpcc consistency check.
20+
type Check struct {
21+
Name string
2122
// If asOfSystemTime is non-empty it will be used to perform the check as
2223
// a historical query using the provided value as the argument to the
2324
// AS OF SYSTEM TIME clause.
24-
f func(db *gosql.DB, asOfSystemTime string) error
25-
expensive bool
25+
Fn func(db *gosql.DB, asOfSystemTime string) error
26+
Expensive bool
2627
}
2728

28-
func allChecks() []check {
29-
return []check{
29+
// AllChecks returns a slice of all of the checks.
30+
func AllChecks() []Check {
31+
return []Check{
3032
{"3.3.2.1", check3321, false},
3133
{"3.3.2.2", check3322, false},
3234
{"3.3.2.3", check3323, false},

pkg/workload/tpcc/tpcc.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -289,15 +289,15 @@ func (w *tpcc) Hooks() workload.Hooks {
289289
return nil
290290
},
291291
CheckConsistency: func(ctx context.Context, db *gosql.DB) error {
292-
for _, check := range allChecks() {
293-
if !w.expensiveChecks && check.expensive {
292+
for _, check := range AllChecks() {
293+
if !w.expensiveChecks && check.Expensive {
294294
continue
295295
}
296296
start := timeutil.Now()
297-
err := check.f(db, "" /* asOfSystemTime */)
298-
log.Infof(ctx, `check %s took %s`, check.name, timeutil.Since(start))
297+
err := check.Fn(db, "" /* asOfSystemTime */)
298+
log.Infof(ctx, `check %s took %s`, check.Name, timeutil.Since(start))
299299
if err != nil {
300-
return errors.Wrapf(err, `check failed: %s`, check.name)
300+
return errors.Wrapf(err, `check failed: %s`, check.Name)
301301
}
302302
}
303303
return nil
Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
// Copyright 2017 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the Business Source License
4+
// included in the file licenses/BSL.txt.
5+
//
6+
// As of the Change Date specified in that file, in accordance with
7+
// the Business Source License, use of this software will be governed
8+
// by the Apache License, Version 2.0, included in the file
9+
// licenses/APL.txt.
10+
11+
package tpcc
12+
13+
import (
14+
"context"
15+
gosql "database/sql"
16+
"fmt"
17+
"math/rand"
18+
19+
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
20+
"github.com/cockroachdb/cockroach/pkg/workload"
21+
"github.com/cockroachdb/cockroach/pkg/workload/histogram"
22+
"github.com/cockroachdb/cockroach/pkg/workload/tpcc"
23+
"github.com/pkg/errors"
24+
"github.com/spf13/pflag"
25+
)
26+
27+
var tpccChecksMeta = workload.Meta{
28+
Name: `tpcc-checks`,
29+
Description: `tpcc-checks runs the TPC-C consistency checks as a workload.`,
30+
Details: `It is primarily intended as a tool to create an overload scenario.
31+
An --as-of flag is exposed to prevent the work from interfering with a
32+
foreground TPC-C workload`,
33+
Version: `1.0.0`,
34+
New: func() workload.Generator {
35+
g := &tpccChecks{}
36+
g.flags.FlagSet = pflag.NewFlagSet(`tpcc`, pflag.ContinueOnError)
37+
g.flags.Meta = map[string]workload.FlagMeta{
38+
`db`: {RuntimeOnly: true},
39+
`concurrency`: {RuntimeOnly: true},
40+
`as-of`: {RuntimeOnly: true},
41+
}
42+
g.flags.IntVar(&g.concurrency, `concurrency`, 1,
43+
`Number of concurrent workers. Defaults to 1.`,
44+
)
45+
g.flags.StringVar(&g.asOfSystemTime, "as-of", "",
46+
"Timestamp at which the query should be run."+
47+
" If non-empty the provided value will be used as the expression in an"+
48+
" AS OF SYSTEM TIME CLAUSE for all checks.")
49+
checkNames := func() (checkNames []string) {
50+
for _, c := range tpcc.AllChecks() {
51+
checkNames = append(checkNames, c.Name)
52+
}
53+
return checkNames
54+
}()
55+
g.flags.StringSliceVar(&g.checks, "checks", checkNames,
56+
"Name of checks to be run.")
57+
g.connFlags = workload.NewConnFlags(&g.flags)
58+
{ // Set the dbOveride to default to "tpcc".
59+
dbOverrideFlag := g.flags.Lookup(`db`)
60+
dbOverrideFlag.DefValue = `tpcc`
61+
if err := dbOverrideFlag.Value.Set(`tpcc`); err != nil {
62+
panic(err)
63+
}
64+
}
65+
return g
66+
},
67+
}
68+
69+
func (w *tpccChecks) Flags() workload.Flags {
70+
return w.flags
71+
}
72+
73+
func init() {
74+
workload.Register(tpccChecksMeta)
75+
}
76+
77+
type tpccChecks struct {
78+
flags workload.Flags
79+
connFlags *workload.ConnFlags
80+
81+
asOfSystemTime string
82+
checks []string
83+
concurrency int
84+
}
85+
86+
// The tables should already exist, if they do not an error will occur later.
87+
func (*tpccChecks) Tables() []workload.Table {
88+
return nil
89+
}
90+
91+
func (*tpccChecks) Meta() workload.Meta {
92+
return tpccChecksMeta
93+
}
94+
95+
// Ops implements the Opser interface.
96+
func (w *tpccChecks) Ops(urls []string, reg *histogram.Registry) (workload.QueryLoad, error) {
97+
sqlDatabase, err := workload.SanitizeUrls(w, w.flags.Lookup("db").Value.String(), urls)
98+
if err != nil {
99+
return workload.QueryLoad{}, fmt.Errorf("%v", err)
100+
}
101+
dbs := make([]*gosql.DB, len(urls))
102+
for i, url := range urls {
103+
dbs[i], err = gosql.Open(`cockroach`, url)
104+
if err != nil {
105+
return workload.QueryLoad{}, errors.Wrapf(err, "failed to dial %s", url)
106+
}
107+
// Set the maximum number of open connections to 3x the concurrency because
108+
// that's the maximum number of connections used by any check at once.
109+
dbs[i].SetMaxOpenConns(3 * w.concurrency)
110+
dbs[i].SetMaxIdleConns(3 * w.concurrency)
111+
}
112+
ql := workload.QueryLoad{SQLDatabase: sqlDatabase}
113+
ql.WorkerFns = make([]func(context.Context) error, w.concurrency)
114+
checks, err := filterChecks(tpcc.AllChecks(), w.checks)
115+
if err != nil {
116+
return workload.QueryLoad{}, err
117+
}
118+
for i := range ql.WorkerFns {
119+
worker := newCheckWorker(dbs, checks, reg.GetHandle(), w.asOfSystemTime)
120+
ql.WorkerFns[i] = worker.run
121+
}
122+
// Preregister all of the histograms so they always print.
123+
for _, c := range checks {
124+
reg.GetHandle().Get(c.Name)
125+
}
126+
return ql, nil
127+
}
128+
129+
type checkWorker struct {
130+
dbs []*gosql.DB
131+
checks []tpcc.Check
132+
histograms *histogram.Histograms
133+
asOfSystemTime string
134+
dbPerm []int
135+
checkPerm []int
136+
i int
137+
}
138+
139+
func newCheckWorker(
140+
dbs []*gosql.DB, checks []tpcc.Check, histograms *histogram.Histograms, asOfSystemTime string,
141+
) *checkWorker {
142+
return &checkWorker{
143+
dbs: dbs,
144+
checks: checks,
145+
histograms: histograms,
146+
asOfSystemTime: asOfSystemTime,
147+
dbPerm: rand.Perm(len(dbs)),
148+
checkPerm: rand.Perm(len(checks)),
149+
}
150+
}
151+
152+
func (w *checkWorker) run(ctx context.Context) error {
153+
defer func() { w.i++ }()
154+
c := w.checks[w.checkPerm[w.i%len(w.checks)]]
155+
db := w.dbs[w.dbPerm[w.i%len(w.dbs)]]
156+
start := timeutil.Now()
157+
if err := c.Fn(db, w.asOfSystemTime); err != nil {
158+
return errors.Wrapf(err, "failed check %s", c.Name)
159+
}
160+
w.histograms.Get(c.Name).Record(timeutil.Since(start))
161+
return nil
162+
}
163+
164+
// filterChecks removes all elements from checks which do not have their name
165+
// in toRun. An error is returned if any elements of toRun do not exist in
166+
// checks. The checks slice is modified in place and returned.
167+
func filterChecks(checks []tpcc.Check, toRun []string) ([]tpcc.Check, error) {
168+
toRunSet := make(map[string]struct{}, len(toRun))
169+
for _, s := range toRun {
170+
toRunSet[s] = struct{}{}
171+
}
172+
filtered := checks[:0]
173+
for _, c := range checks {
174+
if _, exists := toRunSet[c.Name]; exists {
175+
filtered = append(filtered, c)
176+
delete(toRunSet, c.Name)
177+
}
178+
}
179+
if len(toRunSet) > 0 {
180+
return nil, fmt.Errorf("cannot run checks %v which do not exist", toRun)
181+
}
182+
return filtered, nil
183+
}

0 commit comments

Comments
 (0)