Commit 1da9b85

kv: don't clobber commit status when heartbeat fails
When a heartbeat failed while a request was in-flight, two proto updates could race and produce illegal behavior. For example, it was possible for a client to successfully commit but receive an ABORTED proto (and no error).

This commit removes the ad-hoc manipulation of the state during heartbeat failures. Instead, before each new request sent through the heartbeat interceptor we check on the heartbeat loop's status; if it indicates an error, we refuse new requests.

Fixes #39658.

Release note: None
1 parent e445511 commit 1da9b85
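The gist of the new approach, reduced to a minimal standalone Go sketch: the heartbeat goroutine no longer rewrites the transaction proto on failure, it only flips a status that the request path consults. All names here (loopStatus, heartbeater, send, heartbeatFailed) are illustrative stand-ins, not the actual txnHeartbeater API; the real change is in the diff below.

package main

import (
	"errors"
	"fmt"
)

// loopStatus mirrors the three states the commit introduces for the
// heartbeat loop: not yet started, running, and rejecting new work.
type loopStatus int

const (
	statusReady loopStatus = iota
	statusRunning
	statusRejecting
)

// heartbeater is a toy stand-in for the interceptor: instead of mutating
// the transaction proto when the heartbeat loop fails, it only flips its
// status.
type heartbeater struct {
	status loopStatus
}

// send gates every request on the loop status. Once the loop has failed or
// finished, everything but a rollback is refused; the txn proto is never
// rewritten behind the client's back.
func (h *heartbeater) send(isRollback bool) error {
	if h.status == statusRejecting && !isRollback {
		return errors.New("heartbeater finalized; only rollbacks accepted")
	}
	if h.status == statusReady {
		h.status = statusRunning // would spawn the heartbeat loop here
	}
	return nil
}

// heartbeatFailed is what the loop calls when it detects an aborted txn or
// exits: it stops accepting new requests instead of touching the txn proto.
func (h *heartbeater) heartbeatFailed() {
	h.status = statusRejecting
}

func main() {
	h := &heartbeater{status: statusReady}
	fmt.Println(h.send(false)) // <nil>: first request starts the loop
	h.heartbeatFailed()        // the loop observed a failure
	fmt.Println(h.send(false)) // error: new work is refused
	fmt.Println(h.send(true))  // <nil>: a rollback is still allowed
}

Because the heartbeat goroutine never touches the proto, a commit that has already succeeded can no longer be overwritten with an ABORTED status and handed back to the client without an error.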

File tree

2 files changed: +34 -29 lines changed

  pkg/kv/txn_interceptor_heartbeater.go
  pkg/storage/client_raft_test.go

pkg/kv/txn_interceptor_heartbeater.go

Lines changed: 34 additions & 27 deletions
@@ -22,6 +22,14 @@ import (
 	opentracing "github.com/opentracing/opentracing-go"
 )
 
+type txnHeartbeaterStatus byte
+
+const (
+	txnHeartbeaterStatusReady txnHeartbeaterStatus = iota
+	txnHeartbeaterStatusRunning
+	txnHeartbeaterStatusRejecting
+)
+
 // txnHeartbeater is a txnInterceptor in charge of a transaction's heartbeat
 // loop. Transaction coordinators heartbeat their transaction record
 // periodically to indicate the liveness of their transaction. Other actors like
@@ -84,9 +92,9 @@ type txnHeartbeater struct {
 	mu struct {
 		sync.Locker
 
-		// loopStarted indicates whether the heartbeat loop has been launched
+		// status indicates whether the heartbeat loop has been launched
 		// for the transaction or not. It remains true once the loop terminates.
-		loopStarted bool
+		status txnHeartbeaterStatus
 
 		// txnEnd is closed when the transaction is aborted or committed, terminating
 		// the heartbeat loop. Nil if the heartbeat loop is not running.
@@ -122,20 +130,21 @@ func (h *txnHeartbeater) init(
 	h.metrics = metrics
 	h.mu.Locker = mu
 	h.mu.txn = txn
+	h.mu.status = txnHeartbeaterStatusReady
 	h.gatekeeper = gatekeeper
 	h.asyncAbortCallbackLocked = asyncAbortCallbackLocked
 }
 
-// SendLocked is part of the txnInteceptor interface.
+// SendLocked is part of the txnInterceptor interface.
 func (h *txnHeartbeater) SendLocked(
 	ctx context.Context, ba roachpb.BatchRequest,
 ) (*roachpb.BatchResponse, *roachpb.Error) {
 	// If finalErr is set, we reject everything but rollbacks.
-	if h.mu.finalErr != nil {
+	if h.mu.status == txnHeartbeaterStatusRejecting {
 		singleRollback := ba.IsSingleEndTransactionRequest() &&
 			!ba.Requests[0].GetInner().(*roachpb.EndTransactionRequest).Commit
 		if !singleRollback {
-			return nil, h.mu.finalErr
+			return nil, roachpb.NewErrorf("programming error: txnHeartbeater is finalized")
 		}
 	}
 
@@ -161,12 +170,9 @@ func (h *txnHeartbeater) SendLocked(
 		// Note that we don't do it for 1PC txns: they only leave intents around on
 		// retriable errors if the batch has been split between ranges. We consider
 		// that unlikely enough so we prefer to not pay for a goroutine.
-		if !h.mu.loopStarted {
+		if h.mu.status == txnHeartbeaterStatusReady {
 			if _, haveEndTxn := ba.GetArg(roachpb.EndTransaction); !haveEndTxn {
-				if err := h.startHeartbeatLoopLocked(ctx); err != nil {
-					h.mu.finalErr = roachpb.NewError(err)
-					return nil, h.mu.finalErr
-				}
+				h.startHeartbeatLoopLocked(ctx)
			}
		}
	}
@@ -200,13 +206,13 @@ func (h *txnHeartbeater) closeLocked() {
 }
 
 // startHeartbeatLoopLocked starts a heartbeat loop in a different goroutine.
-func (h *txnHeartbeater) startHeartbeatLoopLocked(ctx context.Context) error {
-	if h.mu.loopStarted || h.mu.txnEnd != nil {
+func (h *txnHeartbeater) startHeartbeatLoopLocked(ctx context.Context) {
+	if h.mu.status != txnHeartbeaterStatusReady || h.mu.txnEnd != nil {
 		log.Fatal(ctx, "attempting to start a second heartbeat loop ")
 	}
 
 	log.VEventf(ctx, 2, "coordinator spawns heartbeat loop")
-	h.mu.loopStarted = true
+	h.mu.status = txnHeartbeaterStatusRunning
 	h.mu.txnEnd = make(chan struct{})
 
 	// Create a new context so that the heartbeat loop doesn't inherit the
@@ -217,10 +223,12 @@ func (h *txnHeartbeater) startHeartbeatLoopLocked(ctx context.Context) error {
 	hbCtx := h.AnnotateCtx(context.Background())
 	hbCtx = opentracing.ContextWithSpan(hbCtx, opentracing.SpanFromContext(ctx))
 
-	return h.stopper.RunAsyncTask(
+	if err := h.stopper.RunAsyncTask(
 		hbCtx, "kv.TxnCoordSender: heartbeat loop", func(ctx context.Context) {
 			h.heartbeatLoop(ctx)
-		})
+		}); err != nil {
+		h.mu.status = txnHeartbeaterStatusRejecting
+	}
 }
 
 // heartbeatLoop periodically sends a HeartbeatTxn request to the transaction
@@ -234,13 +242,10 @@ func (h *txnHeartbeater) heartbeatLoop(ctx context.Context) {
 		defer ticker.Stop()
 	}
 
-	var finalErr *roachpb.Error
 	defer func() {
 		h.mu.Lock()
-		// Prevent future SendLocked() calls.
-		if finalErr != nil {
-			h.mu.finalErr = finalErr
-		}
+		// Prevent future SendLocked() calls (except rollbacks).
+		h.mu.status = txnHeartbeaterStatusRejecting
 		if h.mu.txnEnd != nil {
 			h.mu.txnEnd = nil
 		}
@@ -264,15 +269,12 @@ func (h *txnHeartbeater) heartbeatLoop(ctx context.Context) {
				// This error we're generating here should not be seen by clients. Since
				// the transaction is aborted, they should be rejected before they reach
				// this interceptor.
-				finalErr = roachpb.NewErrorf("heartbeat failed fatally")
				return
			}
		case <-closer:
			// Transaction finished normally.
-			finalErr = roachpb.NewErrorf("txnHeartbeater already closed")
			return
		case <-h.stopper.ShouldQuiesce():
-			finalErr = roachpb.NewErrorf("node already quiescing")
			return
		}
	}
@@ -337,8 +339,16 @@ func (h *txnHeartbeater) heartbeat(ctx context.Context) bool {
 		// TODO(nvanbenschoten): Make this the only case where we get back an
 		// Aborted txn.
 		if _, ok := pErr.GetDetail().(*roachpb.TransactionAbortedError); ok {
-			h.mu.txn.Status = roachpb.ABORTED
+			// Note that it's possible that the txn actually committed but its
+			// record got GC'ed. In that case, aborting won't hurt anyone though,
+			// since all intents have already been resolved.
+			// The only thing we must ascertain is that we don't tell the client
+			// about this error - it will get either a definitive result of
+			// its commit or an ambiguous one and we have nothing to offer that
+			// provides more clarity. We do however prevent it from running more
+			// requests in case it isn't aware that the transaction is over.
 			log.VEventf(ctx, 1, "Heartbeat detected aborted txn. Cleaning up.")
+			h.mu.status = txnHeartbeaterStatusRejecting
 			h.abortTxnAsyncLocked(ctx)
 			return false
 		}
@@ -382,9 +392,6 @@ func (h *txnHeartbeater) heartbeat(ctx context.Context) bool {
 // abortTxnAsyncLocked send an EndTransaction(commmit=false) asynchronously.
 // The asyncAbortCallbackLocked callback is also called.
 func (h *txnHeartbeater) abortTxnAsyncLocked(ctx context.Context) {
-	if h.mu.txn.Status != roachpb.ABORTED {
-		log.Fatalf(ctx, "abortTxnAsyncLocked called for non-aborted txn: %s", h.mu.txn)
-	}
 	h.asyncAbortCallbackLocked(ctx)
 	txn := h.mu.txn.Clone()
 

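For reference, the lifecycle the hunks above encode, restated as an annotated declaration (the identifiers are from the diff; the comments are a condensed summary of the transitions shown above, not source text):

type txnHeartbeaterStatus byte

const (
	// Set in init(): the heartbeat loop has not been started yet.
	txnHeartbeaterStatusReady txnHeartbeaterStatus = iota
	// Set by startHeartbeatLoopLocked() once the async heartbeat task launches.
	txnHeartbeaterStatusRunning
	// Set when RunAsyncTask fails, when the heartbeat loop exits (its deferred
	// cleanup), or when a heartbeat observes a TransactionAbortedError. From
	// then on SendLocked refuses everything except a single
	// EndTransaction(commit=false) rollback.
	txnHeartbeaterStatusRejecting
)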
pkg/storage/client_raft_test.go

Lines changed: 0 additions & 2 deletions
@@ -1091,8 +1091,6 @@ func TestFailedSnapshotFillsReservation(t *testing.T) {
 func TestConcurrentRaftSnapshots(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 
-	t.Skip("https://github.com/cockroachdb/cockroach/issues/39652")
-
 	mtc := &multiTestContext{
 		// This test was written before the multiTestContext started creating many
 		// system ranges at startup, and hasn't been update to take that into
