@@ -60,6 +60,9 @@ const PEER_EXCESS_FACTOR: f32 = 0.1;
60
60
/// them in lighthouse.
61
61
const ALLOWED_NEGATIVE_GOSSIPSUB_FACTOR : f32 = 0.1 ;
62
62
63
+ /// A fraction of `PeerManager::target_peers` that need to be outbound-only connections.
64
+ const MIN_OUTBOUND_ONLY_FACTOR : f32 = 0.1 ;
65
+
63
66
/// The main struct that handles peer's reputation and connection status.
64
67
pub struct PeerManager < TSpec : EthSpec > {
65
68
/// Storage of network globals to access the `PeerDB`.
@@ -835,7 +838,6 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
835
838
/// NOTE: This is experimental and will likely be adjusted
836
839
fn update_peer_scores ( & mut self ) {
837
840
/* Check how long have peers been in this state and update their reputations if needed */
838
-
839
841
let mut to_ban_peers = Vec :: new ( ) ;
840
842
let mut to_unban_peers = Vec :: new ( ) ;
841
843
@@ -910,12 +912,16 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
910
912
/// The Peer manager's heartbeat maintains the peer count and maintains peer reputations.
911
913
///
912
914
/// It will request discovery queries if the peer count has not reached the desired number of
913
- /// peers.
915
+ /// overall peers, as well as the desired number of outbound-only peers.
914
916
///
915
917
/// NOTE: Discovery will only add a new query if one isn't already queued.
916
918
fn heartbeat ( & mut self ) {
917
919
let peer_count = self . network_globals . connected_or_dialing_peers ( ) ;
918
- if peer_count < self . target_peers {
920
+ let mut outbound_only_peer_count = self . network_globals . connected_outbound_only_peers ( ) ;
921
+ let min_outbound_only_target =
922
+ ( self . target_peers as f32 * MIN_OUTBOUND_ONLY_FACTOR ) . ceil ( ) as usize ;
923
+
924
+ if peer_count < self . target_peers || outbound_only_peer_count < min_outbound_only_target {
919
925
// If we need more peers, queue a discovery lookup.
920
926
if self . discovery . started {
921
927
debug ! ( self . log, "Starting a new peer discovery query" ; "connected_peers" => peer_count, "target_peers" => self . target_peers) ;
@@ -931,19 +937,28 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
931
937
932
938
let connected_peer_count = self . network_globals . connected_peers ( ) ;
933
939
if connected_peer_count > self . target_peers {
934
- //remove excess peers with the worst scores, but keep subnet peers
935
- for ( peer_id, _) in self
940
+ // Remove excess peers with the worst scores, but keep subnet peers.
941
+ // Must also ensure that the outbound-only peer count does not go below the minimum threshold.
942
+ outbound_only_peer_count = self . network_globals . connected_outbound_only_peers ( ) ;
943
+ let mut n_outbound_removed = 0 ;
944
+ for ( peer_id, info) in self
936
945
. network_globals
937
946
. peers
938
947
. read ( )
939
948
. worst_connected_peers ( )
940
949
. iter ( )
941
950
. filter ( |( _, info) | !info. has_future_duty ( ) )
942
- . take ( connected_peer_count - self . target_peers )
943
- //we only need to disconnect peers with healthy scores, since the others got already
944
- //disconnected in update_peer_scores
945
- . filter ( |( _, info) | info. score_state ( ) == ScoreState :: Healthy )
946
951
{
952
+ if disconnecting_peers. len ( ) == connected_peer_count - self . target_peers {
953
+ break ;
954
+ }
955
+ if info. is_outbound_only ( ) {
956
+ if min_outbound_only_target < outbound_only_peer_count - n_outbound_removed {
957
+ n_outbound_removed += 1 ;
958
+ } else {
959
+ continue ;
960
+ }
961
+ }
947
962
disconnecting_peers. push ( * * peer_id) ;
948
963
}
949
964
}
@@ -1045,3 +1060,296 @@ enum ConnectingType {
1045
1060
multiaddr : Multiaddr ,
1046
1061
} ,
1047
1062
}
1063
+
1064
+ #[ cfg( test) ]
1065
+ mod tests {
1066
+ use super :: * ;
1067
+ use crate :: discovery:: enr:: build_enr;
1068
+ use crate :: discovery:: enr_ext:: CombinedKeyExt ;
1069
+ use crate :: rpc:: methods:: MetaData ;
1070
+ use crate :: Enr ;
1071
+ use discv5:: enr:: CombinedKey ;
1072
+ use slog:: { o, Drain } ;
1073
+ use std:: net:: UdpSocket ;
1074
+ use types:: { EnrForkId , MinimalEthSpec } ;
1075
+
1076
+ type E = MinimalEthSpec ;
1077
+
1078
+ pub fn unused_port ( ) -> u16 {
1079
+ let socket = UdpSocket :: bind ( "127.0.0.1:0" ) . expect ( "should create udp socket" ) ;
1080
+ let local_addr = socket. local_addr ( ) . expect ( "should read udp socket" ) ;
1081
+ local_addr. port ( )
1082
+ }
1083
+
1084
+ pub fn build_log ( level : slog:: Level , enabled : bool ) -> slog:: Logger {
1085
+ let decorator = slog_term:: TermDecorator :: new ( ) . build ( ) ;
1086
+ let drain = slog_term:: FullFormat :: new ( decorator) . build ( ) . fuse ( ) ;
1087
+ let drain = slog_async:: Async :: new ( drain) . build ( ) . fuse ( ) ;
1088
+
1089
+ if enabled {
1090
+ slog:: Logger :: root ( drain. filter_level ( level) . fuse ( ) , o ! ( ) )
1091
+ } else {
1092
+ slog:: Logger :: root ( drain. filter ( |_| false ) . fuse ( ) , o ! ( ) )
1093
+ }
1094
+ }
1095
+
1096
+ async fn build_peer_manager ( target : usize ) -> PeerManager < E > {
1097
+ let keypair = libp2p:: identity:: Keypair :: generate_secp256k1 ( ) ;
1098
+ let config = NetworkConfig {
1099
+ discovery_port : unused_port ( ) ,
1100
+ target_peers : target,
1101
+ ..Default :: default ( )
1102
+ } ;
1103
+ let enr_key: CombinedKey = CombinedKey :: from_libp2p ( & keypair) . unwrap ( ) ;
1104
+ let enr: Enr = build_enr :: < E > ( & enr_key, & config, EnrForkId :: default ( ) ) . unwrap ( ) ;
1105
+ let log = build_log ( slog:: Level :: Debug , false ) ;
1106
+ let globals = NetworkGlobals :: new (
1107
+ enr,
1108
+ 9000 ,
1109
+ 9000 ,
1110
+ MetaData {
1111
+ seq_number : 0 ,
1112
+ attnets : Default :: default ( ) ,
1113
+ } ,
1114
+ vec ! [ ] ,
1115
+ & log,
1116
+ ) ;
1117
+ PeerManager :: new ( & keypair, & config, Arc :: new ( globals) , & log)
1118
+ . await
1119
+ . unwrap ( )
1120
+ }
1121
+
1122
+ #[ tokio:: test]
1123
+ async fn test_peer_manager_disconnects_correctly_during_heartbeat ( ) {
1124
+ let mut peer_manager = build_peer_manager ( 3 ) . await ;
1125
+
1126
+ // Create 5 peers to connect to.
1127
+ // 2 will be outbound-only, and have the lowest score.
1128
+ let peer0 = PeerId :: random ( ) ;
1129
+ let peer1 = PeerId :: random ( ) ;
1130
+ let peer2 = PeerId :: random ( ) ;
1131
+ let outbound_only_peer1 = PeerId :: random ( ) ;
1132
+ let outbound_only_peer2 = PeerId :: random ( ) ;
1133
+
1134
+ peer_manager. connect_ingoing ( & peer0, "/ip4/0.0.0.0" . parse ( ) . unwrap ( ) ) ;
1135
+ peer_manager. connect_ingoing ( & peer1, "/ip4/0.0.0.0" . parse ( ) . unwrap ( ) ) ;
1136
+ peer_manager. connect_ingoing ( & peer2, "/ip4/0.0.0.0" . parse ( ) . unwrap ( ) ) ;
1137
+ peer_manager. connect_outgoing ( & outbound_only_peer1, "/ip4/0.0.0.0" . parse ( ) . unwrap ( ) ) ;
1138
+ peer_manager. connect_outgoing ( & outbound_only_peer2, "/ip4/0.0.0.0" . parse ( ) . unwrap ( ) ) ;
1139
+
1140
+ // Set the outbound-only peers to have the lowest score.
1141
+ peer_manager
1142
+ . network_globals
1143
+ . peers
1144
+ . write ( )
1145
+ . peer_info_mut ( & outbound_only_peer1)
1146
+ . unwrap ( )
1147
+ . add_to_score ( -1.0 ) ;
1148
+
1149
+ peer_manager
1150
+ . network_globals
1151
+ . peers
1152
+ . write ( )
1153
+ . peer_info_mut ( & outbound_only_peer2)
1154
+ . unwrap ( )
1155
+ . add_to_score ( -2.0 ) ;
1156
+
1157
+ // Check initial connected peers.
1158
+ assert_eq ! ( peer_manager. network_globals. connected_or_dialing_peers( ) , 5 ) ;
1159
+
1160
+ peer_manager. heartbeat ( ) ;
1161
+
1162
+ // Check that we disconnected from two peers.
1163
+ // Check that one outbound-only peer was removed because it had the worst score
1164
+ // and that we did not disconnect the other outbound peer due to the minimum outbound quota.
1165
+ assert_eq ! ( peer_manager. network_globals. connected_or_dialing_peers( ) , 3 ) ;
1166
+ assert ! ( peer_manager
1167
+ . network_globals
1168
+ . peers
1169
+ . read( )
1170
+ . is_connected( & outbound_only_peer1) ) ;
1171
+ assert ! ( !peer_manager
1172
+ . network_globals
1173
+ . peers
1174
+ . read( )
1175
+ . is_connected( & outbound_only_peer2) ) ;
1176
+
1177
+ peer_manager. heartbeat ( ) ;
1178
+
1179
+ // Check that if we are at target number of peers, we do not disconnect any.
1180
+ assert_eq ! ( peer_manager. network_globals. connected_or_dialing_peers( ) , 3 ) ;
1181
+ }
1182
+
1183
+ #[ tokio:: test]
1184
+ async fn test_peer_manager_not_enough_outbound_peers_no_panic_during_heartbeat ( ) {
1185
+ let mut peer_manager = build_peer_manager ( 20 ) . await ;
1186
+
1187
+ // Connect to 20 ingoing-only peers.
1188
+ for _i in 0 ..19 {
1189
+ let peer = PeerId :: random ( ) ;
1190
+ peer_manager. connect_ingoing ( & peer, "/ip4/0.0.0.0" . parse ( ) . unwrap ( ) ) ;
1191
+ }
1192
+
1193
+ // Connect an outbound-only peer.
1194
+ // Give it the lowest score so that it is evaluated first in the disconnect list iterator.
1195
+ let outbound_only_peer = PeerId :: random ( ) ;
1196
+ peer_manager. connect_ingoing ( & outbound_only_peer, "/ip4/0.0.0.0" . parse ( ) . unwrap ( ) ) ;
1197
+ peer_manager
1198
+ . network_globals
1199
+ . peers
1200
+ . write ( )
1201
+ . peer_info_mut ( & ( outbound_only_peer) )
1202
+ . unwrap ( )
1203
+ . add_to_score ( -1.0 ) ;
1204
+ // After heartbeat, we will have removed one peer.
1205
+ // Having less outbound-only peers than minimum won't cause panic when the outbound-only peer is being considered for disconnection.
1206
+ peer_manager. heartbeat ( ) ;
1207
+ assert_eq ! (
1208
+ peer_manager. network_globals. connected_or_dialing_peers( ) ,
1209
+ 20
1210
+ ) ;
1211
+ }
1212
+
1213
+ #[ tokio:: test]
1214
+ async fn test_peer_manager_removes_unhealthy_peers_during_heartbeat ( ) {
1215
+ let mut peer_manager = build_peer_manager ( 3 ) . await ;
1216
+
1217
+ // Create 3 peers to connect to.
1218
+ let peer0 = PeerId :: random ( ) ;
1219
+ let inbound_only_peer1 = PeerId :: random ( ) ;
1220
+ let outbound_only_peer1 = PeerId :: random ( ) ;
1221
+
1222
+ peer_manager. connect_ingoing ( & peer0, "/ip4/0.0.0.0" . parse ( ) . unwrap ( ) ) ;
1223
+ peer_manager. connect_outgoing ( & peer0, "/ip4/0.0.0.0" . parse ( ) . unwrap ( ) ) ;
1224
+
1225
+ // Connect to two peers that are on the threshold of being disconnected.
1226
+ peer_manager. connect_ingoing ( & inbound_only_peer1, "/ip4/0.0.0.0" . parse ( ) . unwrap ( ) ) ;
1227
+ peer_manager. connect_outgoing ( & outbound_only_peer1, "/ip4/0.0.0.0" . parse ( ) . unwrap ( ) ) ;
1228
+ peer_manager
1229
+ . network_globals
1230
+ . peers
1231
+ . write ( )
1232
+ . peer_info_mut ( & ( inbound_only_peer1) )
1233
+ . unwrap ( )
1234
+ . add_to_score ( -19.9 ) ;
1235
+ peer_manager
1236
+ . network_globals
1237
+ . peers
1238
+ . write ( )
1239
+ . peer_info_mut ( & ( outbound_only_peer1) )
1240
+ . unwrap ( )
1241
+ . add_to_score ( -19.9 ) ;
1242
+ // Update the gossipsub scores to induce connection downgrade
1243
+ // during the heartbeat, update_peer_scores will downgrade the score from -19.9 to at least -20, this will then trigger a disconnection.
1244
+ // If we changed the peer scores to -20 before the heartbeat, update_peer_scores will mark the previous score status as disconnected,
1245
+ // then handle_state_transitions will not change the connection status to disconnected because the score state has not changed.
1246
+ peer_manager
1247
+ . network_globals
1248
+ . peers
1249
+ . write ( )
1250
+ . peer_info_mut ( & ( inbound_only_peer1) )
1251
+ . unwrap ( )
1252
+ . set_gossipsub_score ( -85.0 ) ;
1253
+ peer_manager
1254
+ . network_globals
1255
+ . peers
1256
+ . write ( )
1257
+ . peer_info_mut ( & ( outbound_only_peer1) )
1258
+ . unwrap ( )
1259
+ . set_gossipsub_score ( -85.0 ) ;
1260
+
1261
+ peer_manager. heartbeat ( ) ;
1262
+
1263
+ assert_eq ! ( peer_manager. network_globals. connected_or_dialing_peers( ) , 1 ) ;
1264
+ }
1265
+
1266
+ #[ tokio:: test]
1267
+ async fn test_peer_manager_remove_unhealthy_peers_brings_peers_below_target ( ) {
1268
+ let mut peer_manager = build_peer_manager ( 3 ) . await ;
1269
+
1270
+ // Create 4 peers to connect to.
1271
+ // One pair will be unhealthy inbound only and outbound only peers.
1272
+ let peer0 = PeerId :: random ( ) ;
1273
+ let peer1 = PeerId :: random ( ) ;
1274
+ let inbound_only_peer1 = PeerId :: random ( ) ;
1275
+ let outbound_only_peer1 = PeerId :: random ( ) ;
1276
+
1277
+ peer_manager. connect_ingoing ( & peer0, "/ip4/0.0.0.0" . parse ( ) . unwrap ( ) ) ;
1278
+ peer_manager. connect_ingoing ( & peer1, "/ip4/0.0.0.0" . parse ( ) . unwrap ( ) ) ;
1279
+
1280
+ // Connect to two peers that are on the threshold of being disconnected.
1281
+ peer_manager. connect_ingoing ( & inbound_only_peer1, "/ip4/0.0.0.0" . parse ( ) . unwrap ( ) ) ;
1282
+ peer_manager. connect_outgoing ( & outbound_only_peer1, "/ip4/0.0.0.0" . parse ( ) . unwrap ( ) ) ;
1283
+ peer_manager
1284
+ . network_globals
1285
+ . peers
1286
+ . write ( )
1287
+ . peer_info_mut ( & ( inbound_only_peer1) )
1288
+ . unwrap ( )
1289
+ . add_to_score ( -19.9 ) ;
1290
+ peer_manager
1291
+ . network_globals
1292
+ . peers
1293
+ . write ( )
1294
+ . peer_info_mut ( & ( outbound_only_peer1) )
1295
+ . unwrap ( )
1296
+ . add_to_score ( -19.9 ) ;
1297
+ peer_manager
1298
+ . network_globals
1299
+ . peers
1300
+ . write ( )
1301
+ . peer_info_mut ( & ( inbound_only_peer1) )
1302
+ . unwrap ( )
1303
+ . set_gossipsub_score ( -85.0 ) ;
1304
+ peer_manager
1305
+ . network_globals
1306
+ . peers
1307
+ . write ( )
1308
+ . peer_info_mut ( & ( outbound_only_peer1) )
1309
+ . unwrap ( )
1310
+ . set_gossipsub_score ( -85.0 ) ;
1311
+ peer_manager. heartbeat ( ) ;
1312
+ // Tests that when we are over the target peer limit, after disconnecting two unhealthy peers,
1313
+ // the loop to check for disconnecting peers will stop because we have removed enough peers (only needed to remove 1 to reach target).
1314
+ assert_eq ! ( peer_manager. network_globals. connected_or_dialing_peers( ) , 2 ) ;
1315
+ }
1316
+
1317
+ #[ tokio:: test]
1318
+ async fn test_peer_manager_removes_enough_peers_when_one_is_unhealthy ( ) {
1319
+ let mut peer_manager = build_peer_manager ( 3 ) . await ;
1320
+
1321
+ // Create 5 peers to connect to.
1322
+ // One will be unhealthy inbound only and outbound only peers.
1323
+ let peer0 = PeerId :: random ( ) ;
1324
+ let peer1 = PeerId :: random ( ) ;
1325
+ let peer2 = PeerId :: random ( ) ;
1326
+ let inbound_only_peer1 = PeerId :: random ( ) ;
1327
+ let outbound_only_peer1 = PeerId :: random ( ) ;
1328
+
1329
+ peer_manager. connect_ingoing ( & peer0, "/ip4/0.0.0.0" . parse ( ) . unwrap ( ) ) ;
1330
+ peer_manager. connect_ingoing ( & peer1, "/ip4/0.0.0.0" . parse ( ) . unwrap ( ) ) ;
1331
+ peer_manager. connect_ingoing ( & peer2, "/ip4/0.0.0.0" . parse ( ) . unwrap ( ) ) ;
1332
+ peer_manager. connect_outgoing ( & outbound_only_peer1, "/ip4/0.0.0.0" . parse ( ) . unwrap ( ) ) ;
1333
+ // Have one peer be on the verge of disconnection.
1334
+ peer_manager. connect_ingoing ( & inbound_only_peer1, "/ip4/0.0.0.0" . parse ( ) . unwrap ( ) ) ;
1335
+ peer_manager
1336
+ . network_globals
1337
+ . peers
1338
+ . write ( )
1339
+ . peer_info_mut ( & ( inbound_only_peer1) )
1340
+ . unwrap ( )
1341
+ . add_to_score ( -19.9 ) ;
1342
+ peer_manager
1343
+ . network_globals
1344
+ . peers
1345
+ . write ( )
1346
+ . peer_info_mut ( & ( inbound_only_peer1) )
1347
+ . unwrap ( )
1348
+ . set_gossipsub_score ( -85.0 ) ;
1349
+
1350
+ peer_manager. heartbeat ( ) ;
1351
+ // Tests that when we are over the target peer limit, after disconnecting an unhealthy peer,
1352
+ // the number of connected peers updates and we will not remove too many peers.
1353
+ assert_eq ! ( peer_manager. network_globals. connected_or_dialing_peers( ) , 3 ) ;
1354
+ }
1355
+ }
0 commit comments