Skip to content

storage/txnrecovery: add transaction recovery metrics #37807

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 2 additions & 9 deletions pkg/storage/batcheval/cmd_end_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func declareKeysEndTransaction(

// If the request is intending to finalize the transaction record then it
// needs to declare a few extra keys.
if !needsStaging(et) {
if !et.IsParallelCommit() {
// All requests that intent on resolving local intents need to depend on
// the range descriptor because they need to determine which intents are
// within the local range.
Expand Down Expand Up @@ -281,7 +281,7 @@ func evalEndTransaction(
// before being explicitly committed, write the staged transaction
// record and return without running commit triggers or resolving local
// intents.
if needsStaging(args) {
if args.IsParallelCommit() {
// It's not clear how to combine transaction recovery with commit
// triggers, so for now we don't allow them to mix. This shouldn't
// cause any issues and the txn coordinator knows not to mix them.
Expand Down Expand Up @@ -440,13 +440,6 @@ func canForwardSerializableTimestamp(txn *roachpb.Transaction, noRefreshSpans bo
return !txn.OrigTimestampWasObserved && noRefreshSpans
}

// needsStaging determines whether the EndTransaction request requires
// that a transaction move to the STAGING state before committing or
// not.
func needsStaging(args *roachpb.EndTransactionRequest) bool {
return args.IsParallelCommit()
}

const intentResolutionBatchSize = 500

// resolveLocalIntents synchronously resolves any intents that are
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -1315,13 +1315,13 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error {
TestingKnobs: s.cfg.TestingKnobs.IntentResolverKnobs,
RangeDescriptorCache: s.cfg.RangeDescriptorCache,
})
s.metrics.registry.AddMetricStruct(s.intentResolver.Metrics)

// Create the recovery manager.
s.recoveryMgr = txnrecovery.NewManager(
s.cfg.AmbientCtx, s.cfg.Clock, s.db, stopper,
)

s.metrics.registry.AddMetricStruct(s.intentResolver.Metrics)
s.metrics.registry.AddMetricStruct(s.recoveryMgr.Metrics())

s.rangeIDAlloc = idAlloc

Expand Down
49 changes: 43 additions & 6 deletions pkg/storage/txnrecovery/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ type Manager interface {
ResolveIndeterminateCommit(
context.Context, *roachpb.IndeterminateCommitError,
) (*roachpb.Transaction, error)

// Metrics returns the Manager's metrics struct.
Metrics() Metrics
}

const (
Expand All @@ -64,14 +67,13 @@ const (
)

// manager implements the Manager interface.
//
// TODO(nvanbenschoten): Add metrics to give visibility into transaction recovery.
type manager struct {
log.AmbientContext

clock *hlc.Clock
db *client.DB
stopper *stop.Stopper
metrics Metrics
txns singleflight.Group
sem chan struct{}
}
Expand All @@ -86,6 +88,7 @@ func NewManager(
clock: clock,
db: db,
stopper: stopper,
metrics: makeMetrics(),
sem: make(chan struct{}, defaultTaskLimit),
}
}
Expand Down Expand Up @@ -128,12 +131,16 @@ func (m *manager) ResolveIndeterminateCommit(
// outcome.
func (m *manager) resolveIndeterminateCommitForTxn(
txn *roachpb.Transaction,
) (*roachpb.Transaction, error) {
) (resTxn *roachpb.Transaction, resErr error) {
// Record the recovery attempt in the Manager's metrics.
onComplete := m.updateMetrics()
defer func() { onComplete(resTxn, resErr) }()

// TODO(nvanbenschoten): Set up tracing.
ctx := m.AnnotateCtx(context.Background())

var resTxn *roachpb.Transaction
err := m.stopper.RunTaskWithErr(ctx,
// Launch the recovery task.
resErr = m.stopper.RunTaskWithErr(ctx,
"recovery.manager: resolving indeterminate commit",
func(ctx context.Context) error {
// Grab semaphore with defaultTaskLimit.
Expand Down Expand Up @@ -164,7 +171,7 @@ func (m *manager) resolveIndeterminateCommitForTxn(
return err
},
)
return resTxn, err
return resTxn, resErr
}

// resolveIndeterminateCommitForTxnProbe performs the "probing phase" of the
Expand Down Expand Up @@ -318,3 +325,33 @@ func (m *manager) resolveIndeterminateCommitForTxnRecover(
recTxnResp := resps[0].GetInner().(*roachpb.RecoverTxnResponse)
return &recTxnResp.RecoveredTxn, nil
}

// Metrics implements the Manager interface.
func (m *manager) Metrics() Metrics {
return m.metrics
}

// updateMetrics updates the Manager's metrics to account for a new
// transaction recovery attempt. It returns a function that should
// be called when the recovery attempt completes.
func (m *manager) updateMetrics() func(*roachpb.Transaction, error) {
m.metrics.AttemptsPending.Inc(1)
m.metrics.Attempts.Inc(1)
return func(txn *roachpb.Transaction, err error) {
m.metrics.AttemptsPending.Dec(1)
if err != nil {
m.metrics.Failures.Inc(1)
} else {
switch txn.Status {
case roachpb.COMMITTED:
m.metrics.SuccessesAsCommitted.Inc(1)
case roachpb.ABORTED:
m.metrics.SuccessesAsAborted.Inc(1)
case roachpb.PENDING, roachpb.STAGING:
m.metrics.SuccessesAsPending.Inc(1)
default:
panic("unexpected")
}
}
}
}
74 changes: 64 additions & 10 deletions pkg/storage/txnrecovery/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,27 +50,57 @@ func makeStagingTransaction(clock *hlc.Clock) roachpb.Transaction {
return txn
}

type metricVals struct {
attemptsPending int64
attempts int64
successesAsCommitted int64
successesAsAborted int64
successesAsPending int64
failures int64
}

func (v metricVals) merge(o metricVals) metricVals {
v.attemptsPending += o.attemptsPending
v.attempts += o.attempts
v.successesAsCommitted += o.successesAsCommitted
v.successesAsAborted += o.successesAsAborted
v.successesAsPending += o.successesAsPending
v.failures += o.failures
return v
}

func assertMetrics(t *testing.T, m Manager, v metricVals) {
assert.Equal(t, v.attemptsPending, m.Metrics().AttemptsPending.Value())
assert.Equal(t, v.attempts, m.Metrics().Attempts.Count())
assert.Equal(t, v.successesAsCommitted, m.Metrics().SuccessesAsCommitted.Count())
assert.Equal(t, v.successesAsAborted, m.Metrics().SuccessesAsAborted.Count())
assert.Equal(t, v.successesAsPending, m.Metrics().SuccessesAsPending.Count())
assert.Equal(t, v.failures, m.Metrics().Failures.Count())
}

// TestResolveIndeterminateCommit tests successful indeterminate commit
// resolution attempts. It tests the case where an intent is prevented
// and the case where an intent is not prevented.
func TestResolveIndeterminateCommit(t *testing.T) {
defer leaktest.AfterTest(t)()

var mockSender client.Sender
m, clock, stopper := makeManager(&mockSender)
defer stopper.Stop(context.Background())

txn := makeStagingTransaction(clock)
txn.InFlightWrites = []roachpb.SequencedWrite{
{Key: roachpb.Key("a"), Sequence: 1},
{Key: roachpb.Key("b"), Sequence: 2},
}

testutils.RunTrueAndFalse(t, "prevent", func(t *testing.T, prevent bool) {
var mockSender client.Sender
m, clock, stopper := makeManager(&mockSender)
defer stopper.Stop(context.Background())

txn := makeStagingTransaction(clock)
txn.InFlightWrites = []roachpb.SequencedWrite{
{Key: roachpb.Key("a"), Sequence: 1},
{Key: roachpb.Key("b"), Sequence: 2},
}

mockSender = client.SenderFunc(func(
_ context.Context, ba roachpb.BatchRequest,
) (*roachpb.BatchResponse, *roachpb.Error) {
// Probing Phase.
assertMetrics(t, m, metricVals{attemptsPending: 1, attempts: 1})

assert.Equal(t, 3, len(ba.Requests))
assert.IsType(t, &roachpb.QueryTxnRequest{}, ba.Requests[0].GetInner())
assert.IsType(t, &roachpb.QueryIntentRequest{}, ba.Requests[1].GetInner())
Expand All @@ -89,6 +119,8 @@ func TestResolveIndeterminateCommit(t *testing.T) {
_ context.Context, ba roachpb.BatchRequest,
) (*roachpb.BatchResponse, *roachpb.Error) {
// Recovery Phase.
assertMetrics(t, m, metricVals{attemptsPending: 1, attempts: 1})

assert.Equal(t, 1, len(ba.Requests))
assert.IsType(t, &roachpb.RecoverTxnRequest{}, ba.Requests[0].GetInner())

Expand All @@ -110,15 +142,18 @@ func TestResolveIndeterminateCommit(t *testing.T) {
return br, nil
})

assertMetrics(t, m, metricVals{})
iceErr := roachpb.NewIndeterminateCommitError(txn)
resTxn, err := m.ResolveIndeterminateCommit(context.Background(), iceErr)
assert.NotNil(t, resTxn)
assert.Nil(t, err)

if !prevent {
assert.Equal(t, roachpb.COMMITTED, resTxn.Status)
assertMetrics(t, m, metricVals{attempts: 1, successesAsCommitted: 1})
} else {
assert.Equal(t, roachpb.ABORTED, resTxn.Status)
assertMetrics(t, m, metricVals{attempts: 1, successesAsAborted: 1})
}
})
}
Expand All @@ -140,10 +175,15 @@ func TestResolveIndeterminateCommitTxnChanges(t *testing.T) {
{Key: roachpb.Key("b"), Sequence: 2},
}

// Maintain an expected aggregation of metric updates.
var expMetrics metricVals
assertMetrics(t, m, expMetrics)

testCases := []struct {
name string
duringProbing bool
changedTxn roachpb.Transaction
metricImpact metricVals
}{
{
name: "transaction commit during probe",
Expand All @@ -154,6 +194,7 @@ func TestResolveIndeterminateCommitTxnChanges(t *testing.T) {
txnCopy.InFlightWrites = nil
return txnCopy
}(),
metricImpact: metricVals{attempts: 1, successesAsCommitted: 1},
},
{
name: "transaction abort during probe",
Expand All @@ -164,6 +205,7 @@ func TestResolveIndeterminateCommitTxnChanges(t *testing.T) {
txnCopy.InFlightWrites = nil
return txnCopy
}(),
metricImpact: metricVals{attempts: 1, successesAsAborted: 1},
},
{
name: "transaction restart during probe",
Expand All @@ -173,6 +215,7 @@ func TestResolveIndeterminateCommitTxnChanges(t *testing.T) {
txnCopy.BumpEpoch()
return txnCopy
}(),
metricImpact: metricVals{attempts: 1, successesAsPending: 1},
},
{
name: "transaction timestamp increase during probe",
Expand All @@ -182,6 +225,7 @@ func TestResolveIndeterminateCommitTxnChanges(t *testing.T) {
txnCopy.Timestamp = txnCopy.Timestamp.Add(1, 0)
return txnCopy
}(),
metricImpact: metricVals{attempts: 1, successesAsPending: 1},
},
{
name: "transaction commit during recovery",
Expand All @@ -192,6 +236,7 @@ func TestResolveIndeterminateCommitTxnChanges(t *testing.T) {
txnCopy.InFlightWrites = nil
return txnCopy
}(),
metricImpact: metricVals{attempts: 1, successesAsCommitted: 1},
},
{
name: "transaction abort during recovery",
Expand All @@ -202,6 +247,7 @@ func TestResolveIndeterminateCommitTxnChanges(t *testing.T) {
txnCopy.InFlightWrites = nil
return txnCopy
}(),
metricImpact: metricVals{attempts: 1, successesAsAborted: 1},
},
{
name: "transaction restart during recovery",
Expand All @@ -211,6 +257,7 @@ func TestResolveIndeterminateCommitTxnChanges(t *testing.T) {
txnCopy.BumpEpoch()
return txnCopy
}(),
metricImpact: metricVals{attempts: 1, successesAsPending: 1},
},
{
name: "transaction timestamp increase during recovery",
Expand All @@ -220,6 +267,7 @@ func TestResolveIndeterminateCommitTxnChanges(t *testing.T) {
txnCopy.Timestamp = txnCopy.Timestamp.Add(1, 0)
return txnCopy
}(),
metricImpact: metricVals{attempts: 1, successesAsPending: 1},
},
}
for _, c := range testCases {
Expand All @@ -228,6 +276,8 @@ func TestResolveIndeterminateCommitTxnChanges(t *testing.T) {
_ context.Context, ba roachpb.BatchRequest,
) (*roachpb.BatchResponse, *roachpb.Error) {
// Probing Phase.
assertMetrics(t, m, expMetrics.merge(metricVals{attemptsPending: 1, attempts: 1}))

assert.Equal(t, 3, len(ba.Requests))
assert.IsType(t, &roachpb.QueryTxnRequest{}, ba.Requests[0].GetInner())
assert.IsType(t, &roachpb.QueryIntentRequest{}, ba.Requests[1].GetInner())
Expand All @@ -251,6 +301,7 @@ func TestResolveIndeterminateCommitTxnChanges(t *testing.T) {
) (*roachpb.BatchResponse, *roachpb.Error) {
// Recovery Phase.
assert.False(t, c.duringProbing, "the recovery phase should not be run")
assertMetrics(t, m, expMetrics.merge(metricVals{attemptsPending: 1, attempts: 1}))

assert.Equal(t, 1, len(ba.Requests))
assert.IsType(t, &roachpb.RecoverTxnRequest{}, ba.Requests[0].GetInner())
Expand All @@ -272,6 +323,9 @@ func TestResolveIndeterminateCommitTxnChanges(t *testing.T) {
assert.NotNil(t, resTxn)
assert.Equal(t, c.changedTxn, *resTxn)
assert.Nil(t, err)

expMetrics = expMetrics.merge(c.metricImpact)
assertMetrics(t, m, expMetrics)
})
}
}
Expand Down
Loading