Skip to content

Commit 31871ae

Browse files
craig[bot], yuzefovich, nvanbenschoten, irfansharif
committed
42845: coldata: expose a modifier for batchSize variable r=yuzefovich a=yuzefovich This commit exposes a setter for batchSize private variable to be modified in tests and runs all tests in sql/colexec with a random batch size in [3, 4096] range. The tests in several places needed to be adjusted because their assumptions might no longer hold when batch size is modified. Fixes: #40791. Release note: None 43409: execinfrapb: print input types one per line in EXPLAIN (DISTSQL, TYPES) r=yuzefovich a=yuzefovich Previously, input types were printed in a single line within the input synchronizer box on the flow diagram. However, when we have a processor with two inputs and when each input has several columns, the boxes would collide and make the types unreadable. This commit changes it to print each type one per line, so there is no collision between different boxes of input synchronizer. Separately, cockroachdb.github.io has been adjusted so that the boxes for the input synchronizers could be displayed well in such scenario. Release note: None 43887: pkg/sql/opt: validate correct use of FOR UPDATE locking clauses r=nvanbenschoten a=nvanbenschoten The second commit updates our SQL grammar to be identical to Postgres regarding select statement locking clauses. Concretely, this results in the following changes in behavior: - We can now parse multiple locking clause items - We can now parse the `FOR READ ONLY` syntax, which is a no-op - We can now parse the `SELECT ... FOR UPDATE ... LIMIT` syntax Ref: https://github.com/postgres/postgres/blob/1a4a0329650b0545a54afb3c317aa289fd817f8a/src/backend/parser/gram.y#L11834 The third commit introduces a number of validation checks into the optbuilder to ensure that FOR UPDATE locking clauses are only being used in places where they're allowed. Most of this was based off of Postgres, which disallows the same set of operations to be used with `FOR [KEY] UPDATE/SHARE` locking. 
Ref: https://github.com/postgres/postgres/blob/1a4a0329650b0545a54afb3c317aa289fd817f8a/src/backend/parser/analyze.c#L2691 Release note (sql change): Invalid usages of FOR UPDATE locking clauses are now rejected by the SQL optimizer. 43980: storage: rename SSTSnapshotStorage{,Scratch,File} vars r=ajwerner a=irfansharif Around snapshot receiving code, we have variables for SSTSnapshotStorage, SSTSnapshotStorageScratch, and SSTSnapshotStorageFile types all abbreviated to some variation of sss{,s,f}. We also have "ssts" to talk about SSTs. This is all terribly confusing, we could do with some much needed clarity here. Release note: None. Co-authored-by: Yahor Yuzefovich <[email protected]> Co-authored-by: Nathan VanBenschoten <[email protected]> Co-authored-by: irfan sharif <[email protected]>
5 parents 24f60a8 + 3d83574 + c932bc5 + 8693846 + 5b64ca7 commit 31871ae

27 files changed

+699
-391
lines changed
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
select_stmt ::=
2-
( simple_select locking_clause | select_clause sort_clause locking_clause | select_clause ( sort_clause | ) ( limit_clause offset_clause | offset_clause limit_clause | limit_clause | offset_clause ) locking_clause | ( 'WITH' ( ( common_table_expr ) ( ( ',' common_table_expr ) )* ) | 'WITH' 'RECURSIVE' ( ( common_table_expr ) ( ( ',' common_table_expr ) )* ) ) select_clause locking_clause | ( 'WITH' ( ( common_table_expr ) ( ( ',' common_table_expr ) )* ) | 'WITH' 'RECURSIVE' ( ( common_table_expr ) ( ( ',' common_table_expr ) )* ) ) select_clause sort_clause locking_clause | ( 'WITH' ( ( common_table_expr ) ( ( ',' common_table_expr ) )* ) | 'WITH' 'RECURSIVE' ( ( common_table_expr ) ( ( ',' common_table_expr ) )* ) ) select_clause ( sort_clause | ) ( limit_clause offset_clause | offset_clause limit_clause | limit_clause | offset_clause ) locking_clause )
2+
( select_clause sort_clause | select_clause ( sort_clause | ) for_locking_clause opt_select_limit | select_clause ( sort_clause | ) ( limit_clause offset_clause | offset_clause limit_clause | limit_clause | offset_clause ) opt_for_locking_clause | ( 'WITH' ( ( common_table_expr ) ( ( ',' common_table_expr ) )* ) | 'WITH' 'RECURSIVE' ( ( common_table_expr ) ( ( ',' common_table_expr ) )* ) ) select_clause | ( 'WITH' ( ( common_table_expr ) ( ( ',' common_table_expr ) )* ) | 'WITH' 'RECURSIVE' ( ( common_table_expr ) ( ( ',' common_table_expr ) )* ) ) select_clause sort_clause | ( 'WITH' ( ( common_table_expr ) ( ( ',' common_table_expr ) )* ) | 'WITH' 'RECURSIVE' ( ( common_table_expr ) ( ( ',' common_table_expr ) )* ) ) select_clause ( sort_clause | ) for_locking_clause opt_select_limit | ( 'WITH' ( ( common_table_expr ) ( ( ',' common_table_expr ) )* ) | 'WITH' 'RECURSIVE' ( ( common_table_expr ) ( ( ',' common_table_expr ) )* ) ) select_clause ( sort_clause | ) ( limit_clause offset_clause | offset_clause limit_clause | limit_clause | offset_clause ) opt_for_locking_clause )
33

docs/generated/sql/bnf/stmt_block.bnf

Lines changed: 38 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -439,12 +439,14 @@ scrub_database_stmt ::=
439439
'EXPERIMENTAL' 'SCRUB' 'DATABASE' database_name opt_as_of_clause
440440

441441
select_no_parens ::=
442-
simple_select locking_clause
443-
| select_clause sort_clause locking_clause
444-
| select_clause opt_sort_clause select_limit locking_clause
445-
| with_clause select_clause locking_clause
446-
| with_clause select_clause sort_clause locking_clause
447-
| with_clause select_clause opt_sort_clause select_limit locking_clause
442+
simple_select
443+
| select_clause sort_clause
444+
| select_clause opt_sort_clause for_locking_clause opt_select_limit
445+
| select_clause opt_sort_clause select_limit opt_for_locking_clause
446+
| with_clause select_clause
447+
| with_clause select_clause sort_clause
448+
| with_clause select_clause opt_sort_clause for_locking_clause opt_select_limit
449+
| with_clause select_clause opt_sort_clause select_limit opt_for_locking_clause
448450

449451
select_with_parens ::=
450452
'(' select_no_parens ')'
@@ -1172,19 +1174,28 @@ simple_select ::=
11721174
| table_clause
11731175
| set_operation
11741176

1175-
locking_clause ::=
1176-
for_locking_strength opt_locked_rels opt_nowait_or_skip
1177-
11781177
select_clause ::=
11791178
simple_select
11801179
| select_with_parens
11811180

1181+
for_locking_clause ::=
1182+
for_locking_items
1183+
| 'FOR' 'READ' 'ONLY'
1184+
1185+
opt_select_limit ::=
1186+
select_limit
1187+
|
1188+
11821189
select_limit ::=
11831190
limit_clause offset_clause
11841191
| offset_clause limit_clause
11851192
| limit_clause
11861193
| offset_clause
11871194

1195+
opt_for_locking_clause ::=
1196+
for_locking_clause
1197+
|
1198+
11881199
set_rest_more ::=
11891200
generic_set
11901201

@@ -1578,18 +1589,8 @@ set_operation ::=
15781589
| select_clause 'INTERSECT' all_or_distinct select_clause
15791590
| select_clause 'EXCEPT' all_or_distinct select_clause
15801591

1581-
for_locking_strength ::=
1582-
'FOR' 'UPDATE'
1583-
| 'FOR' 'NO' 'KEY' 'UPDATE'
1584-
| 'FOR' 'SHARE'
1585-
| 'FOR' 'KEY' 'SHARE'
1586-
1587-
opt_locked_rels ::=
1588-
'OF' table_name_list
1589-
1590-
opt_nowait_or_skip ::=
1591-
'SKIP' 'LOCKED'
1592-
| 'NOWAIT'
1592+
for_locking_items ::=
1593+
( for_locking_item ) ( ( for_locking_item ) )*
15931594

15941595
offset_clause ::=
15951596
'OFFSET' a_expr
@@ -1943,6 +1944,9 @@ all_or_distinct ::=
19431944
| 'DISTINCT'
19441945
|
19451946

1947+
for_locking_item ::=
1948+
for_locking_strength opt_locked_rels opt_nowait_or_skip
1949+
19461950
var_list ::=
19471951
( var_value ) ( ( ',' var_value ) )*
19481952

@@ -2208,6 +2212,19 @@ interval_qualifier ::=
22082212
window_definition_list ::=
22092213
( window_definition ) ( ( ',' window_definition ) )*
22102214

2215+
for_locking_strength ::=
2216+
'FOR' 'UPDATE'
2217+
| 'FOR' 'NO' 'KEY' 'UPDATE'
2218+
| 'FOR' 'SHARE'
2219+
| 'FOR' 'KEY' 'SHARE'
2220+
2221+
opt_locked_rels ::=
2222+
'OF' table_name_list
2223+
2224+
opt_nowait_or_skip ::=
2225+
'SKIP' 'LOCKED'
2226+
| 'NOWAIT'
2227+
22112228
opt_join_hint ::=
22122229
'HASH'
22132230
| 'MERGE'

pkg/col/coldata/batch.go

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,16 +58,39 @@ type Batch interface {
5858

5959
var _ Batch = &MemBatch{}
6060

61-
const maxBatchSize = 1024
61+
const (
62+
// MinBatchSize is the minimum acceptable size of batches.
63+
MinBatchSize = 3
64+
// MaxBatchSize is the maximum acceptable size of batches.
65+
MaxBatchSize = 4096
66+
)
6267

68+
// TODO(jordan): tune.
6369
var batchSize = uint16(1024)
6470

6571
// BatchSize is the maximum number of tuples that fit in a column batch.
66-
// TODO(jordan): tune
6772
func BatchSize() uint16 {
6873
return batchSize
6974
}
7075

76+
// SetBatchSizeForTests modifies batchSize variable. It should only be used in
77+
// tests.
78+
func SetBatchSizeForTests(newBatchSize uint16) {
79+
if newBatchSize > MaxBatchSize {
80+
panic(
81+
fmt.Sprintf("requested batch size %d is greater than MaxBatchSize %d",
82+
newBatchSize, MaxBatchSize),
83+
)
84+
}
85+
if newBatchSize < MinBatchSize {
86+
panic(
87+
fmt.Sprintf("requested batch size %d is smaller than MinBatchSize %d",
88+
newBatchSize, MinBatchSize),
89+
)
90+
}
91+
batchSize = newBatchSize
92+
}
93+
7194
// NewMemBatch allocates a new in-memory Batch. A coltypes.Unknown type
7295
// will create a placeholder Vec that may not be accessed.
7396
// TODO(jordan): pool these allocations.

pkg/col/coldata/nulls.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,13 @@
1010

1111
package coldata
1212

13-
// zeroedNulls is a zeroed out slice representing a bitmap of size maxBatchSize.
13+
// zeroedNulls is a zeroed out slice representing a bitmap of size MaxBatchSize.
1414
// This is copied to efficiently set all nulls.
15-
var zeroedNulls [(maxBatchSize-1)/8 + 1]byte
15+
var zeroedNulls [(MaxBatchSize-1)/8 + 1]byte
1616

17-
// filledNulls is a slice representing a bitmap of size maxBatchSize with every
17+
// filledNulls is a slice representing a bitmap of size MaxBatchSize with every
1818
// single bit set.
19-
var filledNulls [(maxBatchSize-1)/8 + 1]byte
19+
var filledNulls [(MaxBatchSize-1)/8 + 1]byte
2020

2121
// bitMask[i] is a byte with a single bit set at i.
2222
var bitMask = [8]byte{0x1, 0x2, 0x4, 0x8, 0x10, 0x20, 0x40, 0x80}

pkg/sql/colexec/aggregator_test.go

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,7 @@ func TestAggregatorOneFunc(t *testing.T) {
222222
{7},
223223
{8},
224224
},
225-
batchSize: 4,
225+
batchSize: 3,
226226
outputBatchSize: 1,
227227
name: "CarryBetweenInputAndOutputBatches",
228228
},
@@ -476,6 +476,11 @@ func TestAggregatorRandom(t *testing.T) {
476476
rng, _ := randutil.NewPseudoRand()
477477
ctx := context.Background()
478478
for _, groupSize := range []int{1, 2, int(coldata.BatchSize()) / 4, int(coldata.BatchSize()) / 2} {
479+
if groupSize == 0 {
480+
// We might be varying coldata.BatchSize() so that when it is divided by
481+
// 4, groupSize is 0. We want to skip such configuration.
482+
continue
483+
}
479484
for _, numInputBatches := range []int{1, 2, 64} {
480485
for _, hasNulls := range []bool{true, false} {
481486
for _, agg := range aggTypes {
@@ -495,7 +500,7 @@ func TestAggregatorRandom(t *testing.T) {
495500
curGroup := -1
496501
for i := range groups {
497502
if i%groupSize == 0 {
498-
expRowCounts = append(expRowCounts, int64(groupSize))
503+
expRowCounts = append(expRowCounts, 0)
499504
expCounts = append(expCounts, 0)
500505
expSums = append(expSums, 0)
501506
expMins = append(expMins, 2048)
@@ -508,8 +513,11 @@ func TestAggregatorRandom(t *testing.T) {
508513
// slower.
509514
aggCol[i] = 2048 * (rng.Float64() - 0.5)
510515

516+
// NULL values contribute to the row count, so we're updating
517+
// the row counts outside of the if block.
518+
expRowCounts[curGroup]++
511519
if hasNulls && rng.Float64() < nullProbability {
512-
aggColNulls.SetNull(uint16(i))
520+
aggColNulls.SetNull64(uint64(i))
513521
} else {
514522
expNulls[curGroup] = false
515523
expCounts[curGroup]++
@@ -704,12 +712,7 @@ func BenchmarkAggregator(b *testing.B) {
704712
a.(resetter).reset()
705713
source.reset()
706714
// Exhaust aggregator until all batches have been read.
707-
foundTuples := 0
708715
for b := a.Next(ctx); b.Length() != 0; b = a.Next(ctx) {
709-
foundTuples += int(b.Length())
710-
}
711-
if foundTuples != nTuples/groupSize {
712-
b.Fatalf("Found %d tuples, expected %d", foundTuples, nTuples/groupSize)
713716
}
714717
}
715718
},

pkg/sql/colexec/hashjoiner_test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -912,6 +912,12 @@ func TestHashJoiner(t *testing.T) {
912912
}
913913

914914
for _, outputBatchSize := range []uint16{1, 17, coldata.BatchSize()} {
915+
if outputBatchSize > coldata.BatchSize() {
916+
// It is possible for varied coldata.BatchSize() to be smaller than
917+
// requested outputBatchSize. Such configuration is invalid, and we skip
918+
// it.
919+
continue
920+
}
915921
for _, tc := range tcs {
916922
inputs := []tuples{tc.leftTuples, tc.rightTuples}
917923
typs := [][]coltypes.T{tc.leftTypes, tc.rightTypes}

pkg/sql/colexec/main_test.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,11 @@ package colexec
1212

1313
import (
1414
"context"
15+
"fmt"
1516
"os"
1617
"testing"
1718

19+
"github.com/cockroachdb/cockroach/pkg/col/coldata"
1820
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
1921
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
2022
"github.com/cockroachdb/cockroach/pkg/util/mon"
@@ -43,6 +45,13 @@ func TestMain(m *testing.M) {
4345
testMemAcc = &memAcc
4446
testAllocator = NewAllocator(ctx, testMemAcc)
4547
defer testMemAcc.Close(ctx)
48+
rng, _ := randutil.NewPseudoRand()
49+
// Pick a random batch size in [coldata.MinBatchSize, coldata.MaxBatchSize)
50+
// range (rng.Intn excludes its upper bound, so MaxBatchSize is never picked).
51+
randomBatchSize := uint16(coldata.MinBatchSize +
52+
rng.Intn(coldata.MaxBatchSize-coldata.MinBatchSize))
53+
fmt.Printf("coldata.BatchSize() is set to %d\n", randomBatchSize)
54+
coldata.SetBatchSizeForTests(randomBatchSize)
4655
return m.Run()
4756
}())
4857
}

pkg/sql/colexec/mergejoiner_test.go

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1814,10 +1814,25 @@ func TestMergeJoinerMultiBatchRuns(t *testing.T) {
18141814
defer leaktest.AfterTest(t)()
18151815
ctx := context.Background()
18161816
for _, groupSize := range []int{int(coldata.BatchSize()) / 8, int(coldata.BatchSize()) / 4, int(coldata.BatchSize()) / 2} {
1817+
if groupSize == 0 {
1818+
// We might be varying coldata.BatchSize() so that when it is divided by
1819+
// 8 or 4, groupSize is 0. We want to skip such configurations.
1820+
continue
1821+
}
18171822
for _, numInputBatches := range []int{1, 2, 16} {
18181823
t.Run(fmt.Sprintf("groupSize=%d/numInputBatches=%d", groupSize, numInputBatches),
18191824
func(t *testing.T) {
18201825
nTuples := int(coldata.BatchSize()) * numInputBatches
1826+
// There will be nTuples/groupSize "full" groups - i.e. groups of
1827+
// groupSize. Each of these "full" groups will produce groupSize^2
1828+
// tuples. The last group might be not full and will consist of
1829+
// nTuples % groupSize tuples. That group will produce
1830+
// lastGroupSize^2 tuples.
1831+
// Note that the math will still be correct in case when nTuples is
1832+
// divisible by groupSize - all the groups will be full and "last"
1833+
// group will be of size 0.
1834+
lastGroupSize := nTuples % groupSize
1835+
expCount := nTuples/groupSize*(groupSize*groupSize) + lastGroupSize*lastGroupSize
18211836
typs := []coltypes.T{coltypes.Int64, coltypes.Int64}
18221837
cols := []coldata.Vec{
18231838
testAllocator.NewMemColumn(typs[0], nTuples),
@@ -1870,9 +1885,9 @@ func TestMergeJoinerMultiBatchRuns(t *testing.T) {
18701885
i++
18711886
}
18721887

1873-
if count != groupSize*int(coldata.BatchSize())*numInputBatches {
1888+
if count != expCount {
18741889
t.Fatalf("found count %d, expected count %d",
1875-
count, groupSize*int(coldata.BatchSize())*numInputBatches)
1890+
count, expCount)
18761891
}
18771892
})
18781893
}

pkg/sql/colexec/orderedsynchronizer_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,9 @@ func TestOrderedSyncRandomInput(t *testing.T) {
157157
numInputs := 3
158158
inputLen := 1024
159159
batchSize := uint16(16)
160+
if batchSize > coldata.BatchSize() {
161+
batchSize = coldata.BatchSize()
162+
}
160163

161164
// Generate a random slice of sorted ints.
162165
randInts := make([]int, inputLen)

pkg/sql/colexec/routers.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,14 @@ type routerOutput interface {
3535
cancel()
3636
}
3737

38-
// defaultRouterOutputBlockedThreshold is the number of unread values buffered
39-
// by the routerOutputOp after which the output is considered blocked.
40-
var defaultRouterOutputBlockedThreshold = int(coldata.BatchSize() * 2)
38+
// getDefaultRouterOutputBlockedThreshold returns the number of unread values
39+
// buffered by the routerOutputOp after which the output is considered blocked.
40+
// It is a function rather than a variable so that in tests we could modify
41+
// coldata.BatchSize() (if it were a variable, then its value would be
42+
// evaluated before we set the desired batch size).
43+
func getDefaultRouterOutputBlockedThreshold() int {
44+
return int(coldata.BatchSize()) * 2
45+
}
4146

4247
type routerOutputOp struct {
4348
// input is a reference to our router.
@@ -90,7 +95,7 @@ func newRouterOutputOp(
9095
allocator *Allocator, types []coltypes.T, unblockedEventsChan chan<- struct{},
9196
) *routerOutputOp {
9297
return newRouterOutputOpWithBlockedThresholdAndBatchSize(
93-
allocator, types, unblockedEventsChan, defaultRouterOutputBlockedThreshold, int(coldata.BatchSize()),
98+
allocator, types, unblockedEventsChan, getDefaultRouterOutputBlockedThreshold(), int(coldata.BatchSize()),
9499
)
95100
}
96101

0 commit comments

Comments
 (0)