Skip to content

Commit bf38e4f

Browse files
craig[bot]tbg
craig[bot]
andcommitted
Merge #37204
37204: rpc: don't leave poison zero-nodeID connections in pool r=knz a=tbg An optimization to share the `(target,remoteNodeID)` connection under a second name `(target,0)` backfired because we were never unregistering the latter, meaning that clients requesting `(target,0)` would be handed an eternally broken connection. See #37200. Release note (bug fix): Avoid a source of internal connectivity problems that would resolve after restarting the affected node. Co-authored-by: Tobias Schottdorf <[email protected]>
2 parents cf76698 + 0dd9ca7 commit bf38e4f

File tree

2 files changed

+65
-10
lines changed

2 files changed

+65
-10
lines changed

pkg/rpc/context.go

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -442,7 +442,7 @@ func NewContext(
442442
conn.dialErr = &roachpb.NodeUnavailableError{}
443443
}
444444
})
445-
ctx.removeConn(k.(connKey), conn)
445+
ctx.removeConn(conn, k.(connKey))
446446
return true
447447
})
448448
})
@@ -566,10 +566,14 @@ func (ctx *Context) SetLocalInternalServer(internalServer roachpb.InternalServer
566566
ctx.localInternalClient = internalClientAdapter{internalServer}
567567
}
568568

569-
func (ctx *Context) removeConn(key connKey, conn *Connection) {
570-
ctx.conns.Delete(key)
569+
// removeConn removes the given connection from the pool. The supplied connKeys
570+
// must represent *all* the keys under which the connection was shared.
571+
func (ctx *Context) removeConn(conn *Connection, keys ...connKey) {
572+
for _, key := range keys {
573+
ctx.conns.Delete(key)
574+
}
571575
if log.V(1) {
572-
log.Infof(ctx.masterCtx, "closing %+v", key)
576+
log.Infof(ctx.masterCtx, "closing %+v", keys)
573577
}
574578
if grpcConn := conn.grpcConn; grpcConn != nil {
575579
if err := grpcConn.Close(); err != nil && !grpcutil.IsClosedConnection(err) {
@@ -719,10 +723,10 @@ func (ctx *Context) GRPCDialNode(target string, remoteNodeID roachpb.NodeID) *Co
719723
}
720724

721725
func (ctx *Context) grpcDialNodeInternal(target string, remoteNodeID roachpb.NodeID) *Connection {
722-
thisConnKey := connKey{target, remoteNodeID}
723-
value, ok := ctx.conns.Load(thisConnKey)
726+
thisConnKeys := []connKey{{target, remoteNodeID}}
727+
value, ok := ctx.conns.Load(thisConnKeys[0])
724728
if !ok {
725-
value, _ = ctx.conns.LoadOrStore(thisConnKey, newConnectionToNodeID(ctx.Stopper, remoteNodeID))
729+
value, _ = ctx.conns.LoadOrStore(thisConnKeys[0], newConnectionToNodeID(ctx.Stopper, remoteNodeID))
726730
if remoteNodeID != 0 {
727731
// If the first connection established at a target address is
728732
// for a specific node ID, then we want to reuse that connection
@@ -732,12 +736,25 @@ func (ctx *Context) grpcDialNodeInternal(target string, remoteNodeID roachpb.Nod
732736
// not strictly required for correctness.) This LoadOrStore will
733737
// ensure we're registering the connection we just created for
734738
// future use by these other dials.
735-
_, _ = ctx.conns.LoadOrStore(connKey{target, 0}, value)
739+
//
740+
// We need to be careful to unregister both connKeys when the
741+
// connection breaks. Otherwise, we leak the entry below which
742+
// "simulates" a hard network partition for anyone dialing without
743+
// the nodeID (gossip).
744+
//
745+
// See:
746+
// https://github.com/cockroachdb/cockroach/issues/37200
747+
otherKey := connKey{target, 0}
748+
if _, loaded := ctx.conns.LoadOrStore(otherKey, value); !loaded {
749+
thisConnKeys = append(thisConnKeys, otherKey)
750+
}
736751
}
737752
}
738753

739754
conn := value.(*Connection)
740755
conn.initOnce.Do(func() {
756+
// Either we kick off the heartbeat loop (and clean up when it's done),
757+
// or we clean up the connKey entries immediately.
741758
var redialChan <-chan struct{}
742759
conn.grpcConn, redialChan, conn.dialErr = ctx.GRPCDialRaw(target)
743760
if conn.dialErr == nil {
@@ -748,13 +765,15 @@ func (ctx *Context) grpcDialNodeInternal(target string, remoteNodeID roachpb.Nod
748765
if err != nil && !grpcutil.IsClosedConnection(err) {
749766
log.Errorf(masterCtx, "removing connection to %s due to error: %s", target, err)
750767
}
751-
ctx.removeConn(thisConnKey, conn)
768+
ctx.removeConn(conn, thisConnKeys...)
752769
})
753770
}); err != nil {
754771
conn.dialErr = err
755-
ctx.removeConn(thisConnKey, conn)
756772
}
757773
}
774+
if conn.dialErr != nil {
775+
ctx.removeConn(conn, thisConnKeys...)
776+
}
758777
})
759778

760779
return conn

pkg/rpc/context_test.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -312,6 +312,42 @@ func TestHeartbeatHealth(t *testing.T) {
312312
}
313313
}
314314

315+
// TestConnectionRemoveNodeIDZero verifies that when a connection initiated via
316+
// GRPCDialNode fails, we also clean up the connection returned by
317+
// GRPCUnvalidatedDial.
318+
//
319+
// See #37200.
320+
func TestConnectionRemoveNodeIDZero(t *testing.T) {
321+
defer leaktest.AfterTest(t)()
322+
323+
ctx := context.Background()
324+
stopper := stop.NewStopper()
325+
defer stopper.Stop(ctx)
326+
327+
clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond)
328+
clientCtx := newTestContext(clock, stopper)
329+
// Provoke an error.
330+
_, err := clientCtx.GRPCDialNode("127.0.0.1:notaport", 1).Connect(context.Background())
331+
if err == nil {
332+
t.Fatal("expected some kind of error, got nil")
333+
}
334+
335+
// NB: this takes a moment because GRPCDialRaw only gives up on the initial
336+
// connection after 1s (more precisely, the redialChan gets closed only after
337+
// 1s), which seems difficult to configure ad-hoc.
338+
testutils.SucceedsSoon(t, func() error {
339+
var keys []connKey
340+
clientCtx.conns.Range(func(k, v interface{}) bool {
341+
keys = append(keys, k.(connKey))
342+
return true
343+
})
344+
if len(keys) > 0 {
345+
return errors.Errorf("still have connections %v", keys)
346+
}
347+
return nil
348+
})
349+
}
350+
315351
type interceptingListener struct {
316352
net.Listener
317353
connCB func(net.Conn)

0 commit comments

Comments
 (0)