Skip to content

Commit d21dad8

Browse files
craig[bot]asubiotto
craig[bot]
andcommitted
Merge #40668
40668: exec: reset internal unsafeBatch in orderedAggregator r=yuzefovich a=asubiotto Not doing so could lead to correctness results. This was not caught by tests, so runTests has been extended to check for operators that can be initialized with a variable output size. This increases verifySelAndNullResets's test coverage, since it doesn't do anything if only a single batch is output, which is the case for most unit tests. Release note: None Release justification: This commit fixes correctness bugs and increases test coverage (Category 2). Fixes #40641 Co-authored-by: Alfonso Subiotto Marqués <[email protected]>
2 parents 66d92be + a558a37 commit d21dad8

File tree

6 files changed

+31
-22
lines changed

6 files changed

+31
-22
lines changed

pkg/sql/colexec/aggregator.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,11 @@ func (a *orderedAggregator) EstimateStaticMemoryUsage() int {
246246
return EstimateBatchSizeBytes(a.outputTypes, coldata.BatchSize*2)
247247
}
248248

249-
func (a *orderedAggregator) initWithBatchSize(inputSize, outputSize int) {
249+
func (a *orderedAggregator) initWithOutputBatchSize(outputSize uint16) {
250+
a.initWithInputAndOutputBatchSize(coldata.BatchSize, int(outputSize))
251+
}
252+
253+
func (a *orderedAggregator) initWithInputAndOutputBatchSize(inputSize, outputSize int) {
250254
a.input.Init()
251255

252256
// Twice the input batchSize is allocated to avoid having to check for
@@ -261,10 +265,11 @@ func (a *orderedAggregator) initWithBatchSize(inputSize, outputSize int) {
261265
}
262266

263267
func (a *orderedAggregator) Init() {
264-
a.initWithBatchSize(coldata.BatchSize, coldata.BatchSize)
268+
a.initWithInputAndOutputBatchSize(coldata.BatchSize, coldata.BatchSize)
265269
}
266270

267271
func (a *orderedAggregator) Next(ctx context.Context) coldata.Batch {
272+
a.unsafeBatch.ResetInternalBatch()
268273
if a.scratch.shouldResetInternalBatch {
269274
a.scratch.ResetInternalBatch()
270275
a.scratch.shouldResetInternalBatch = false

pkg/sql/colexec/aggregator_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,7 @@ func TestAggregatorOneFunc(t *testing.T) {
279279
out := newOpTestOutput(a, []int{0}, tc.expected)
280280
// Explicitly reinitialize the aggregator with the given output batch
281281
// size.
282-
a.(*orderedAggregator).initWithBatchSize(tc.batchSize, tc.outputBatchSize)
282+
a.(*orderedAggregator).initWithInputAndOutputBatchSize(tc.batchSize, tc.outputBatchSize)
283283
if err := out.VerifyAnyOrder(); err != nil {
284284
t.Fatal(err)
285285
}

pkg/sql/colexec/mergejoiner.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -351,9 +351,8 @@ type mergeJoinBase struct {
351351
right mergeJoinInput
352352

353353
// Output buffer definition.
354-
output coldata.Batch
355-
needToResetOutput bool
356-
outputBatchSize uint16
354+
output coldata.Batch
355+
outputBatchSize uint16
357356
// outputReady is a flag to indicate that merge joiner is ready to emit an
358357
// output batch.
359358
outputReady bool
@@ -402,10 +401,10 @@ func (o *mergeJoinBase) EstimateStaticMemoryUsage() int {
402401
}
403402

404403
func (o *mergeJoinBase) Init() {
405-
o.initWithBatchSize(coldata.BatchSize)
404+
o.initWithOutputBatchSize(coldata.BatchSize)
406405
}
407406

408-
func (o *mergeJoinBase) initWithBatchSize(outBatchSize uint16) {
407+
func (o *mergeJoinBase) initWithOutputBatchSize(outBatchSize uint16) {
409408
o.output = coldata.NewMemBatchWithSize(o.getOutColTypes(), int(outBatchSize))
410409
o.left.source.Init()
411410
o.right.source.Init()

pkg/sql/colexec/mergejoiner_test.go

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,6 @@ import (
2222
"github.com/cockroachdb/cockroach/pkg/util/randutil"
2323
)
2424

25-
type mjTestInitializer interface {
26-
initWithBatchSize(outBatchSize uint16)
27-
}
28-
2925
// TODO(yuzefovich): add unit tests for cases with ON expression.
3026

3127
type mjTestCase struct {
@@ -1528,8 +1524,8 @@ func TestMergeJoiner(t *testing.T) {
15281524
// We use a custom verifier function so that we can get the merge join op
15291525
// to use a custom output batch size per test, to exercise more cases.
15301526
var mergeJoinVerifier verifier = func(output *opTestOutput) error {
1531-
if mj, ok := output.input.(mjTestInitializer); ok {
1532-
mj.initWithBatchSize(tc.outputBatchSize)
1527+
if mj, ok := output.input.(variableOutputBatchSizeInitializer); ok {
1528+
mj.initWithOutputBatchSize(tc.outputBatchSize)
15331529
} else {
15341530
t.Fatalf("unexpectedly merge joiner doesn't implement mjTestInitializer")
15351531
}
@@ -1597,7 +1593,7 @@ func TestMergeJoinerMultiBatch(t *testing.T) {
15971593
t.Fatal("error in merge join op constructor", err)
15981594
}
15991595

1600-
a.(*mergeJoinInnerOp).initWithBatchSize(outBatchSize)
1596+
a.(*mergeJoinInnerOp).initWithOutputBatchSize(outBatchSize)
16011597

16021598
i := 0
16031599
count := 0
@@ -1729,7 +1725,7 @@ func TestMergeJoinerLongMultiBatchCount(t *testing.T) {
17291725
t.Fatal("error in merge join op constructor", err)
17301726
}
17311727

1732-
a.(*mergeJoinInnerOp).initWithBatchSize(outBatchSize)
1728+
a.(*mergeJoinInnerOp).initWithOutputBatchSize(outBatchSize)
17331729

17341730
count := 0
17351731
for b := a.Next(ctx); b.Length() != 0; b = a.Next(ctx) {

pkg/sql/colexec/mergejoiner_tmpl.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1238,13 +1238,10 @@ func (o *mergeJoin_JOIN_TYPE_STRING_FILTER_INFO_STRINGOp) calculateOutputCount(
12381238
}
12391239

12401240
func (o *mergeJoin_JOIN_TYPE_STRING_FILTER_INFO_STRINGOp) Next(ctx context.Context) coldata.Batch {
1241+
o.output.ResetInternalBatch()
12411242
for {
12421243
switch o.state {
12431244
case mjEntry:
1244-
if o.needToResetOutput {
1245-
o.needToResetOutput = false
1246-
o.output.ResetInternalBatch()
1247-
}
12481245
o.initProberState(ctx)
12491246

12501247
if o.nonEmptyBufferedGroup() {
@@ -1281,7 +1278,6 @@ func (o *mergeJoin_JOIN_TYPE_STRING_FILTER_INFO_STRINGOp) Next(ctx context.Conte
12811278
o.output.SetLength(o.builderState.outCount)
12821279
// Reset builder out count.
12831280
o.builderState.outCount = uint16(0)
1284-
o.needToResetOutput = true
12851281
o.outputReady = false
12861282
return o.output
12871283
}

pkg/sql/colexec/utils_test.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,13 @@ func maybeHasNulls(b coldata.Batch) bool {
7878

7979
type testRunner func(*testing.T, []tuples, []coltypes.T, tuples, verifier, []int, func([]Operator) (Operator, error))
8080

81+
// variableOutputBatchSizeInitializer is implemented by operators that can be
82+
// initialized with variable output size batches. This allows runTests to
83+
// increase test coverage of these operators.
84+
type variableOutputBatchSizeInitializer interface {
85+
initWithOutputBatchSize(uint16)
86+
}
87+
8188
// runTests is a helper that automatically runs your tests with varied batch
8289
// sizes and with and without a random selection vector.
8390
// tups is the sets of input tuples.
@@ -219,7 +226,13 @@ func runTestsWithoutAllNullsInjection(
219226
if err != nil {
220227
t.Fatal(err)
221228
}
222-
op.Init()
229+
if vbsiOp, ok := op.(variableOutputBatchSizeInitializer); ok {
230+
// initialize the operator with a very small output batch size to
231+
// increase the likelihood that multiple batches will be output.
232+
vbsiOp.initWithOutputBatchSize(1)
233+
} else {
234+
op.Init()
235+
}
223236
ctx := context.Background()
224237
b := op.Next(ctx)
225238
if round == 1 {

0 commit comments

Comments
 (0)