exec: Add memory estimation and monitoring for streaming operators. #38796

Merged
merged 1 commit on Jul 17, 2019
210 changes: 120 additions & 90 deletions pkg/sql/distsqlrun/column_exec_setup.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pkg/sql/distsqlrun/columnar_utils_test.go
@@ -80,7 +80,7 @@ func verifyColOperator(
columnarizers[i] = c
}

colOp, _, err := newColOperator(ctx, flowCtx, pspec, columnarizers)
colOp, _, _, err := newColOperator(ctx, flowCtx, pspec, columnarizers)
if err != nil {
return err
}
6 changes: 6 additions & 0 deletions pkg/sql/distsqlrun/columnarizer.go
@@ -35,6 +35,7 @@ type columnarizer struct {
}

var _ exec.Operator = &columnarizer{}
var _ exec.StaticMemoryOperator = &columnarizer{}

// newColumnarizer returns a new columnarizer.
func newColumnarizer(flowCtx *FlowCtx, processorID int32, input RowSource) (*columnarizer, error) {
@@ -54,9 +54,14 @@ func newColumnarizer(flowCtx *FlowCtx, processorID int32, input RowSource) (*col
return nil, err
}
c.Init()

return c, nil
}

func (c *columnarizer) EstimateStaticMemoryUsage() int {
return exec.EstimateBatchSizeBytes(conv.FromColumnTypes(c.OutputTypes()), coldata.BatchSize)
}

func (c *columnarizer) Init() {
typs := conv.FromColumnTypes(c.OutputTypes())
c.batch = coldata.NewMemBatch(typs)
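The assertion above relies on an exec.StaticMemoryOperator interface that this diff does not show. A minimal sketch of what such an interface would look like, assuming it simply extends Operator with the estimation method:

// Sketch of the StaticMemoryOperator interface implied by the assertions
// in this diff; the exact definition lives elsewhere in pkg/sql/exec.
type StaticMemoryOperator interface {
	Operator
	// EstimateStaticMemoryUsage estimates the number of bytes this
	// operator allocates up front, independent of the input data.
	EstimateStaticMemoryUsage() int
}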
11 changes: 10 additions & 1 deletion pkg/sql/distsqlrun/flow.go
@@ -109,6 +109,8 @@ type FlowCtx struct {

// local is true if this flow is being run as part of a local-only query.
local bool

vectorizedBoundAccount *mon.BoundAccount
}

// NewEvalCtx returns a modifiable copy of the FlowCtx's EvalContext.
@@ -497,7 +499,9 @@ func (f *Flow) setup(ctx context.Context, spec *distsqlpb.FlowSpec) error {
f.spec = spec

if f.EvalCtx.SessionData.Vectorize != sessiondata.VectorizeOff {
err := f.setupVectorized(ctx)
acc := f.EvalCtx.Mon.MakeBoundAccount()
f.vectorizedBoundAccount = &acc
err := f.setupVectorized(ctx, f.vectorizedBoundAccount)
if err == nil {
log.VEventf(ctx, 1, "vectorized flow.")
return nil
@@ -691,6 +695,11 @@ func (f *Flow) Cleanup(ctx context.Context) {
if f.status == FlowFinished {
panic("flow cleanup called twice")
}

if f.vectorizedBoundAccount != nil {
f.vectorizedBoundAccount.Close(ctx)
}

// This closes the monitor opened in ServerImpl.setupFlow.
f.EvalCtx.Stop(ctx)
for _, p := range f.processors {
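The wiring above follows the standard mon.BoundAccount lifecycle: the account is created from the flow's monitor during setup, grown by the estimated static usage, and closed in Cleanup. A minimal sketch of that pattern, with memUsed standing in for the estimate produced by the vectorized setup path:

// Sketch of the bound-account lifecycle used here; memUsed is an
// illustrative name for the estimated static memory usage.
acc := f.EvalCtx.Mon.MakeBoundAccount()
defer acc.Close(ctx) // release the reservation when the flow is done
if err := acc.Grow(ctx, int64(memUsed)); err != nil {
	return err // budget exceeded; fall back to the row-based engine
}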
143 changes: 143 additions & 0 deletions pkg/sql/distsqlrun/flow_vectorize_space_test.go
@@ -0,0 +1,143 @@
// Copyright 2019 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package distsqlrun

import (
"context"
"fmt"
"math"
"testing"

"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/distsqlpb"
"github.com/cockroachdb/cockroach/pkg/sql/exec"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/mon"
)

func TestVectorizeSpaceError(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()
st := cluster.MakeTestingClusterSettings()
evalCtx := tree.MakeTestingEvalContext(st)
defer evalCtx.Stop(ctx)

flowCtx := &FlowCtx{
Settings: st,
EvalCtx: &evalCtx,
}

// Without a limit, the default sorter creates a vectorized operator
// whose memory usage we cannot determine statically.
sorterCore := &distsqlpb.SorterSpec{
OutputOrdering: distsqlpb.Ordering{
Columns: []distsqlpb.Ordering_Column{
{
ColIdx: 0,
Direction: distsqlpb.Ordering_Column_ASC,
},
},
},
}

aggregatorCore := &distsqlpb.AggregatorSpec{
Type: distsqlpb.AggregatorSpec_SCALAR,
Aggregations: []distsqlpb.AggregatorSpec_Aggregation{
{
Func: distsqlpb.AggregatorSpec_MAX,
ColIdx: []uint32{0},
},
},
}

input := []distsqlpb.InputSyncSpec{{
ColumnTypes: []types.T{*types.Int},
}}

testCases := []struct {
desc string
spec *distsqlpb.ProcessorSpec
}{
{
desc: "topk",
spec: &distsqlpb.ProcessorSpec{
Input: input,
Core: distsqlpb.ProcessorCoreUnion{
Sorter: sorterCore,
},
Post: distsqlpb.PostProcessSpec{
Limit: 5,
},
},
},
{
desc: "projection",
spec: &distsqlpb.ProcessorSpec{
Input: input,
Core: distsqlpb.ProcessorCoreUnion{
Sorter: sorterCore,
},
Post: distsqlpb.PostProcessSpec{
RenderExprs: []distsqlpb.Expression{{Expr: "@1 + 1"}},
},
},
},
{
desc: "in_projection",
spec: &distsqlpb.ProcessorSpec{
Input: input,
Core: distsqlpb.ProcessorCoreUnion{
Sorter: sorterCore,
},
Post: distsqlpb.PostProcessSpec{
RenderExprs: []distsqlpb.Expression{{Expr: "@1 IN (1, 2)"}},
},
},
},
{
desc: "aggregation",
spec: &distsqlpb.ProcessorSpec{
Input: input,
Core: distsqlpb.ProcessorCoreUnion{
Aggregator: aggregatorCore,
},
},
},
}

for _, tc := range testCases {
for _, succ := range []bool{true, false} {
t.Run(fmt.Sprintf("%s-success-expected-%t", tc.desc, succ), func(t *testing.T) {
inputs := []exec.Operator{exec.NewZeroOp(nil)}
memMon := mon.MakeMonitor("MemoryMonitor", mon.MemoryResource, nil, nil, 0, math.MaxInt64, st)
if succ {
memMon.Start(ctx, nil, mon.MakeStandaloneBudget(math.MaxInt64))
} else {
memMon.Start(ctx, nil, mon.MakeStandaloneBudget(1))
}
acc := memMon.MakeBoundAccount()
_, _, memUsed, err := newColOperator(ctx, flowCtx, tc.spec, inputs)
if err != nil {
t.Fatal(err)
}
err = acc.Grow(ctx, int64(memUsed))
if succ && err != nil {
t.Fatal("Expected success, found:", err)
}
if !succ && err == nil {
t.Fatal("Expected memory error, found nothing.")
}
})
}
}
}
5 changes: 5 additions & 0 deletions pkg/sql/exec/aggregator.go
@@ -106,6 +106,7 @@ type orderedAggregator struct {
}

var _ Operator = &orderedAggregator{}
var _ StaticMemoryOperator = &orderedAggregator{}

// NewOrderedAggregator creates an ordered aggregator on the given grouping
// columns. aggCols is a slice where each index represents a new aggregation
@@ -218,6 +219,10 @@ func makeAggregateFuncs(
return funcs, outTyps, nil
}

func (a *orderedAggregator) EstimateStaticMemoryUsage() int {
return EstimateBatchSizeBytes(a.outputTypes, coldata.BatchSize*2)
}

func (a *orderedAggregator) initWithBatchSize(inputSize, outputSize int) {
a.input.Init()

5 changes: 5 additions & 0 deletions pkg/sql/exec/coalescer.go
@@ -28,6 +28,7 @@ type coalescerOp struct {
}

var _ Operator = &coalescerOp{}
var _ StaticMemoryOperator = &coalescerOp{}

// NewCoalescerOp creates a new coalescer operator on the given input operator
// with the given column types.
@@ -38,6 +39,10 @@ func NewCoalescerOp(input Operator, colTypes []types.T) Operator {
}
}

func (p *coalescerOp) EstimateStaticMemoryUsage() int {
return 2 * EstimateBatchSizeBytes(p.inputTypes, coldata.BatchSize)
}

func (p *coalescerOp) Init() {
p.input.Init()
p.group = coldata.NewMemBatch(p.inputTypes)
6 changes: 6 additions & 0 deletions pkg/sql/exec/colrpc/inbox.go
@@ -84,6 +84,7 @@ type Inbox struct {
}

var _ exec.Operator = &Inbox{}
var _ exec.StaticMemoryOperator = &Inbox{}

// NewInbox creates a new Inbox.
func NewInbox(typs []types.T) (*Inbox, error) {
@@ -112,6 +113,11 @@ func NewInbox(typs []types.T) (*Inbox, error) {
return i, nil
}

// EstimateStaticMemoryUsage implements the StaticMemoryOperator interface.
func (i *Inbox) EstimateStaticMemoryUsage() int {
return exec.EstimateBatchSizeBytes(i.typs, coldata.BatchSize)
}

// maybeInit calls Inbox.init if the inbox is not initialized and returns an
// error if the initialization was not successful. Usually this is because the
// given context is canceled before the remote stream arrives.
5 changes: 5 additions & 0 deletions pkg/sql/exec/count.go
@@ -30,6 +30,7 @@ type countOp struct {
}

var _ Operator = &countOp{}
var _ StaticMemoryOperator = &countOp{}

// NewCountOp returns a new count operator that counts the rows in its input.
func NewCountOp(input Operator) Operator {
@@ -40,6 +41,10 @@ func NewCountOp(input Operator) Operator {
return c
}

func (c *countOp) EstimateStaticMemoryUsage() int {
return EstimateBatchSizeBytes([]types.T{types.Int64}, 1)
}

func (c *countOp) Init() {
c.input.Init()
// Our output is always just one row.
12 changes: 12 additions & 0 deletions pkg/sql/exec/execgen/cmd/execgen/projection_ops_gen.go
@@ -51,6 +51,10 @@ type {{template "opRConstName" .}} struct {
outputIdx int
}

func (p {{template "opRConstName" .}}) EstimateStaticMemoryUsage() int {
return EstimateBatchSizeBytes([]types.T{types.{{.RetTyp}}}, coldata.BatchSize)
}

func (p {{template "opRConstName" .}}) Next(ctx context.Context) coldata.Batch {
batch := p.input.Next(ctx)
n := batch.Length()
@@ -95,6 +99,10 @@ type {{template "opLConstName" .}} struct {
outputIdx int
}

func (p {{template "opLConstName" .}}) EstimateStaticMemoryUsage() int {
return EstimateBatchSizeBytes([]types.T{types.{{.RetTyp}}}, coldata.BatchSize)
}

func (p {{template "opLConstName" .}}) Next(ctx context.Context) coldata.Batch {
batch := p.input.Next(ctx)
n := batch.Length()
@@ -139,6 +147,10 @@ type {{template "opName" .}} struct {
outputIdx int
}

func (p {{template "opName" .}}) EstimateStaticMemoryUsage() int {
return EstimateBatchSizeBytes([]types.T{types.{{.RetTyp}}}, coldata.BatchSize)
}

func (p {{template "opName" .}}) Next(ctx context.Context) coldata.Batch {
batch := p.input.Next(ctx)
n := batch.Length()
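Each of the three EstimateStaticMemoryUsage templates above is expanded once per operator/type combination by execgen. For concreteness, a hypothetical expansion for an Int64 plus-constant projection would look roughly like this (the generated operator name is illustrative):

// Hypothetical expansion of the opRConst template for Int64 addition;
// the actual generated names may differ.
func (p projPlusInt64Int64ConstOp) EstimateStaticMemoryUsage() int {
	return EstimateBatchSizeBytes([]types.T{types.Int64}, coldata.BatchSize)
}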
68 changes: 68 additions & 0 deletions pkg/sql/exec/mem_estimation.go
@@ -0,0 +1,68 @@
// Copyright 2019 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package exec

import (
"fmt"
"unsafe"

"github.com/cockroachdb/cockroach/pkg/sql/exec/types"
)

const (
sizeOfBool = int(unsafe.Sizeof(true))
sizeOfInt8 = int(unsafe.Sizeof(int8(0)))
sizeOfInt16 = int(unsafe.Sizeof(int16(0)))
sizeOfInt32 = int(unsafe.Sizeof(int32(0)))
sizeOfInt64 = int(unsafe.Sizeof(int64(0)))
sizeOfFloat32 = int(unsafe.Sizeof(float32(0)))
sizeOfFloat64 = int(unsafe.Sizeof(float64(0)))
)

// EstimateBatchSizeBytes returns an estimated amount of bytes needed to
// store a batch in memory that has column types vecTypes.
// WARNING: This is only correct for fixed-width types; for non-fixed-width
// types it returns an estimate. In the future it might be possible to remove
// the need for estimation by specifying batch sizes in terms of bytes.
func EstimateBatchSizeBytes(vecTypes []types.T, batchLength int) int {
// acc represents the number of bytes to represent a row in the batch.
acc := 0
for _, t := range vecTypes {
switch t {
case types.Bool:
acc += sizeOfBool
case types.Bytes:
// Without looking at the data in a batch, we can't tell how much
// space each byte array takes up. Use a default value as a
// heuristic for now.
acc += 100
case types.Int8:
acc += sizeOfInt8
case types.Int16:
acc += sizeOfInt16
case types.Int32:
acc += sizeOfInt32
case types.Int64:
acc += sizeOfInt64
case types.Float32:
acc += sizeOfFloat32
case types.Float64:
acc += sizeOfFloat64
case types.Decimal:
// As with byte arrays, we can't tell how much space is needed
// to hold arbitrary-precision decimal objects.
acc += 50
default:
panic(fmt.Sprintf("unhandled type %s", t))
}
}
return acc * batchLength
}
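As a worked example, assuming the default coldata.BatchSize of 1024, a batch with one Int64 column and one Bool column is estimated at (8 + 1) * 1024 = 9216 bytes:

// Estimate for a two-column (Int64, Bool) batch at the default batch size.
est := exec.EstimateBatchSizeBytes(
	[]types.T{types.Int64, types.Bool}, coldata.BatchSize,
)
// With coldata.BatchSize == 1024: (8 + 1) * 1024 = 9216 bytes.
_ = est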