exec: Add memory estimation and monitoring for streaming operators. #38796
This is a pretty big PR. Reviews + discussion are very much appreciated. Let me know if I missed any operators!
92f6093
to
9ee8613
Compare
I like the general approach, although I'm wondering if it would be cleaner to either 1) just return the number of bytes that an operator will use or 2) have the operators you care about implement an interface that will return the number of bytes used given some types, and in either case increment the account in newColOperator after creating an operator and then again after creating any post processing operators. I like the second approach better and it'll be nice to not have to care about the mon package, modify tests that don't care about memory, or modify constructors. We'll have to do something similar for buffering operators and since they will return the same amount of memory regardless of the types.
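A minimal sketch of the second option, assuming an interface shaped like the exec.StaticMemoryOperator that is type-asserted in column_exec_setup.go. The method name EstimateStaticMemoryUsage, the toy operators, and the staticMemUsage helper are hypothetical and are not taken from the PR:

```go
package main

import "fmt"

// Operator is a stand-in for the execution engine's operator interface.
type Operator interface {
	Next() int
}

// StaticMemoryOperator is implemented by operators whose memory footprint
// is known at construction time. The method name is hypothetical.
type StaticMemoryOperator interface {
	Operator
	EstimateStaticMemoryUsage() int
}

// countOp is a toy streaming operator with a fixed footprint.
type countOp struct{ count int64 }

func (c *countOp) Next() int { return 0 }

// EstimateStaticMemoryUsage reports the 8 bytes of the int64 counter.
func (c *countOp) EstimateStaticMemoryUsage() int { return 8 }

// passthroughOp does not opt in to memory estimation.
type passthroughOp struct{}

func (passthroughOp) Next() int { return 0 }

// staticMemUsage sums the estimates of every operator that implements
// StaticMemoryOperator and skips the rest, so constructors and tests that
// do not care about memory stay untouched.
func staticMemUsage(ops ...Operator) int {
	total := 0
	for _, op := range ops {
		if sMemOp, ok := op.(StaticMemoryOperator); ok {
			total += sMemOp.EstimateStaticMemoryUsage()
		}
	}
	return total
}

func main() {
	fmt.Println(staticMemUsage(&countOp{}, passthroughOp{}, &countOp{}))
}
```

With this shape the caller that builds the plan is the only place that needs to know about the memory account.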
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @asubiotto, @jordanlewis, @rohany, and @solongordon)
pkg/sql/distsqlrun/column_exec_setup.go, line 554 at r1 (raw file):
func planSelectionOperators(
	ctx context.Context, tctx *tree.EvalContext,
nit: s/tctx/evalCtx
pkg/sql/distsqlrun/columnar_utils_test.go, line 76 at r1 (raw file):
columnarizers := make([]exec.Operator, len(inputs))
for i, input := range inputsColOp {
	c, err := newColumnarizer(ctx, flowCtx, int32(i)+1, input, nil)
nit: add block comments to nil arguments
pkg/sql/distsqlrun/flow_vectorize_space_test.go, line 72 at r1 (raw file):
}{
	{
		desc: "Test construct sorttopk",
nit: use camel case and no spaces, there's also no need to be overly descriptive (e.g. TopK is enough in this case, on failure this will be printed as TestVectorizeSpaceError/TopK which is informative enough). It's also nice to use short names to minimize typing mistakes for when you need to rerun a subset of tests
pkg/sql/exec/mem_estimation.go, line 32 at r1 (raw file):
// much space each byte array takes up. Use some default value as a
// heuristic right now.
acc += 100
I would put a big warning at the top that this function only really works for fixed-width types and maybe mention that there will be a transition to specifying batch sizes in terms of bytes, which will remove the need for any estimation.
pkg/sql/exec/mem_estimation.go, line 36 at r1 (raw file):
	acc++
case types.Int16:
	acc += 2
nit: You could improve readability by extracting constants and using them here:
const (
	sizeOfInt8    = int(unsafe.Sizeof(int8(0)))
	sizeOfInt16   = int(unsafe.Sizeof(int16(0)))
	sizeOfInt32   = int(unsafe.Sizeof(int32(0)))
	sizeOfInt64   = int(unsafe.Sizeof(int64(0)))
	sizeOfFloat32 = int(unsafe.Sizeof(float32(0)))
	sizeOfFloat64 = int(unsafe.Sizeof(float64(0)))
)
9ee8613
to
9c12409
Compare
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @asubiotto, @jordanlewis, and @solongordon)
pkg/sql/exec/mem_estimation.go, line 32 at r1 (raw file):
Previously, asubiotto (Alfonso Subiotto Marqués) wrote…
I would put a big warning at the top that this function only really works for fixed-width types and maybe mention that there will be a transition to specifying batch sizes in terms of bytes, which will remove the need for any estimation.
Done.
pkg/sql/exec/mem_estimation.go, line 36 at r1 (raw file):
Previously, asubiotto (Alfonso Subiotto Marqués) wrote…
nit: You could improve readability by extracting constants and using them here:
const (
	sizeOfInt8    = int(unsafe.Sizeof(int8(0)))
	sizeOfInt16   = int(unsafe.Sizeof(int16(0)))
	sizeOfInt32   = int(unsafe.Sizeof(int32(0)))
	sizeOfInt64   = int(unsafe.Sizeof(int64(0)))
	sizeOfFloat32 = int(unsafe.Sizeof(float32(0)))
	sizeOfFloat64 = int(unsafe.Sizeof(float64(0)))
)
Ok, I wasn't sure if it was ok to use the unsafe package or not.
RFAL -- followed Alfonso's suggestion, and the code seems cleaner than before.
Nice, I think it does look cleaner although this is making me think that fully integrating #38394 into the allocation flow will probably be beneficial for streaming operators as well to both verify that memory declared is not too far off from actual memory requested (this would be great for logic tests)
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @asubiotto, @jordanlewis, @rohany, and @solongordon)
pkg/sql/distsqlrun/column_exec_setup.go, line 102 at r2 (raw file):
	spec *distsqlpb.ProcessorSpec,
	inputs []exec.Operator,
) (exec.Operator, []types.T, int, error) {
Seems like we might as well make these named return variables at this point
pkg/sql/distsqlrun/column_exec_setup.go, line 479 at r2 (raw file):
// After constructing the base operator, calculate the memory usage
// of the operator.
if sMemOp, ok := op.(exec.StaticMemoryOperator); ok {
When we talked about this, did we mention that we sometimes created multiple operators in the above switch? Skimming it, I see it in the windower case, which is not something we care about right now but want to be sure we're not missing a case in the static operator creation.
pkg/sql/distsqlrun/column_exec_setup.go, line 495 at r2 (raw file):
if !post.Filter.Empty() {
	var helper exprHelper
	var memUsed int
nit: s/memUsed/selectionMem or something similar. Also, another way to declare variables on multiple lines is:
var (
	helper  exprHelper
	memUsed int
)
Up to you
pkg/sql/distsqlrun/column_exec_setup.go, line 919 at r2 (raw file):
	return nil, nil, memUsed, err
}
inbox, err := colrpc.NewInbox(conv.FromColumnTypes(input.ColumnTypes))
I think we need to account for the static memory used by the inbox here as well, otherwise we won't be counting remote data.
pkg/sql/distsqlrun/column_exec_setup.go, line 1064 at r2 (raw file):
}
if err = acc.Grow(ctx, int64(memUsed)); err != nil {
	return errors.Wrapf(err, "Not enough memory to setup vectorized plan.")
nit: by convention, error messages aren't capitalized and don't end with punctuation (https://github.com/golang/go/wiki/CodeReviewComments#error-strings)
pkg/sql/distsqlrun/flow_vectorize_space_test.go, line 28 at r2 (raw file):
)
func TestVectorizeSpaceError(t *testing.T) {
I wonder if there's a way to do some black box testing of this addition (maybe not now). This will probably be easier with the addition of @solongordon's BatchAllocator but if we had some way to globally track the batches actually allocated, we could probably add a testing knob to logic tests similar to metadata verification that would then verify that memory usage reported to the monitor is around what was requested from the BatchAllocator.
pkg/sql/distsqlrun/flow_vectorize_space_test.go, line 108 at r2 (raw file):
},
{
	desc: "aggergation",
s/aggergation/aggregation
pkg/sql/distsqlrun/flow_vectorize_space_test.go, line 127 at r2 (raw file):
	ctx, "Unlimited Monitor", mon.MemoryResource, nil, nil, math.MaxInt64, st)
} else {
	memMon = mon.MakeMonitorWithLimit(
It's a bit subtle, but hard-limit monitors like this one are only used in processors that fall back to disk. To mirror the monitor that is used in setupVectorized you have to do something like:
memMon := mon.MakeMonitor(...)
if succ {
	memMon.Start(..., mon.MakeStandaloneBudget(math.MaxInt64))
} else {
	memMon.Start(..., mon.MakeStandaloneBudget(1))
}
defer memMon.Stop()
pkg/sql/distsqlrun/flow_vectorize_space_test.go, line 134 at r2 (raw file):
err = acc.Grow(ctx, int64(memUsed))
if succ && err != nil {
	t.Fatal("Expected success, found: ", err)
nit: I think this will print double spaces
9c12409
to
0665485
Compare
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @asubiotto, @jordanlewis, @rohany, and @solongordon)
pkg/sql/distsqlrun/column_exec_setup.go, line 102 at r2 (raw file):
Previously, asubiotto (Alfonso Subiotto Marqués) wrote…
Seems like we might as well make these named return variables at this point
Done.
pkg/sql/distsqlrun/column_exec_setup.go, line 479 at r2 (raw file):
Previously, asubiotto (Alfonso Subiotto Marqués) wrote…
When we talked about this, did we mention that we sometimes created multiple operators in the above switch? Skimming it, I see it in the windower case, which is not something we care about right now but want to be sure we're not missing a case in the static operator creation.
I went through the cases and made sure to increment the memory usage when we layer operators, or a StaticMemoryOperator gets layered over.
pkg/sql/distsqlrun/column_exec_setup.go, line 919 at r2 (raw file):
Previously, asubiotto (Alfonso Subiotto Marqués) wrote…
I think we need to account for the static memory used by the inbox here as well, otherwise we won't be counting remote data.
Done.
pkg/sql/distsqlrun/flow_vectorize_space_test.go, line 108 at r2 (raw file):
Previously, asubiotto (Alfonso Subiotto Marqués) wrote…
s/aggergation/aggregation
Done.
pkg/sql/distsqlrun/flow_vectorize_space_test.go, line 127 at r2 (raw file):
Previously, asubiotto (Alfonso Subiotto Marqués) wrote…
It's a bit subtle, but hard-limit monitors like this one are only used in processors that fall back to disk. To mirror the monitor that is used in setupVectorized you have to do something like:
memMon := mon.MakeMonitor(...)
if succ {
	memMon.Start(..., mon.MakeStandaloneBudget(math.MaxInt64))
} else {
	memMon.Start(..., mon.MakeStandaloneBudget(1))
}
defer memMon.Stop()
Done.
918789f
to
2f1d295
Compare
Reviewable status:
complete! 1 of 0 LGTMs obtained (waiting on @asubiotto, @jordanlewis, @rohany, and @solongordon)
pkg/sql/distsqlrun/column_exec_setup.go, line 1108 at r3 (raw file):
op, outputTypes, memUsage, err := newColOperator(ctx, &f.FlowCtx, pspec, inputs)
if err != nil {
	return errors.Wrapf(err, "Unable to vectorize execution plan.")
nit: same thing here and below about the error messages
We want to error out early of our vectorized execution if there is not enough memory available to run the query, especially if we can tell upfront that this is the case. Some streaming operators always use a static amount of memory, so we can monitor this memory during construction of the vectorized plan. Due to difficulties with traversing the vectorized flow once it is constructed, we monitor memory during construction of each operator, and have streaming operators estimate how much memory they will use during construction.
This PR adds memory estimation to the following operators:
* CountOp
* Aggregate operators
* TopK sorter
* Columnarizer
* Coalescer
* OrderedSynchronizer
* Projection operators
Release note: None
2f1d295
to
b9d62ca
Compare
bors r=asubiotto
38796: exec: Add memory estimation and monitoring for streaming operators. r=asubiotto a=rohany
We want to error out early of our vectorized execution if there is not enough memory available to run the query, especially if we can tell upfront that this is the case. Some streaming operators always use a static amount of memory, so we can monitor this memory during construction of the vectorized plan. Due to difficulties with traversing the vectorized flow once it is constructed, we monitor memory during construction of each operator, and have streaming operators estimate how much memory they will use during construction.
This PR adds memory estimation to the following operators:
* CountOp
* Aggregate operators
* TopK sorter
* Columnarizer
* Coalescer
* OrderedSynchronizer
* Projection operators
Release note: None
Co-authored-by: Rohan Yadav <[email protected]>
Build succeeded
Something I just realized: I think we might need to close the vectorized bound account on setup error as well, otherwise we'll never clear the memory when we fail to set up a vectorized flow in some cases. Failures on remote nodes won't ever call |
I'm touching this code now so will fix.
That makes sense -- good catch.
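One way to guarantee the account is released on any setup failure is a deferred check on a named error return. The sketch below uses a toy account type and a hypothetical setupFlow function; it illustrates the pattern only and is not the fix that landed:

```go
package main

import (
	"errors"
	"fmt"
)

// account is a toy stand-in for a bound memory account.
type account struct{ used int64 }

func (a *account) Grow(n int64) { a.used += n }

// Close releases everything the account has reserved.
func (a *account) Close() { a.used = 0 }

// setupFlow reserves memory and then performs further setup. The deferred
// function sees the named return value, so every error path releases the
// reservation without repeating the cleanup at each return.
func setupFlow(acc *account, memUsed int64, failLater bool) (err error) {
	defer func() {
		if err != nil {
			acc.Close()
		}
	}()
	acc.Grow(memUsed)
	if failLater {
		return errors.New("unable to vectorize execution plan")
	}
	return nil
}

func main() {
	acc := &account{}
	err := setupFlow(acc, 4096, true)
	fmt.Println(err, acc.used)
}
```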