mirror of
https://github.com/bitcoin/bitcoin.git
synced 2025-02-08 10:31:50 -05:00
p2p: Add additional peers to download from when close to the tip
If we are 1024 or less blocks away from the tip and haven't requested or received a block from any peer for 30 seconds, add another peer to download the critical block from. Add up to two additional peers this way. Also adds test for the new behavior (co-authored by Greg Sanders) Co-authored-by: Greg Sanders <gsanders87@gmail.com>
This commit is contained in:
parent
c3d98815cc
commit
9019c08e4e
2 changed files with 86 additions and 25 deletions
|
@ -113,6 +113,8 @@ static const int MAX_BLOCKS_IN_TRANSIT_PER_PEER = 16;
|
|||
static constexpr auto BLOCK_STALLING_TIMEOUT_DEFAULT{2s};
|
||||
/** Maximum timeout for stalling block download. */
|
||||
static constexpr auto BLOCK_STALLING_TIMEOUT_MAX{64s};
|
||||
/** Timeout for stalling when close to the tip, after which we may add additional peers to download from */
|
||||
static constexpr auto BLOCK_NEARTIP_TIMEOUT_MAX{30s};
|
||||
/** Maximum depth of blocks we're willing to serve as compact blocks to peers
|
||||
* when requested. For older blocks, a regular BLOCK response will be sent. */
|
||||
static const int MAX_CMPCTBLOCK_DEPTH = 5;
|
||||
|
@ -787,7 +789,10 @@ private:
|
|||
std::atomic<int> m_best_height{-1};
|
||||
/** The time of the best chain tip block */
|
||||
std::atomic<std::chrono::seconds> m_best_block_time{0s};
|
||||
|
||||
/** The last time we requested a block from any peer */
|
||||
std::atomic<std::chrono::seconds> m_last_block_requested{0s};
|
||||
/** The last time we received a block from any peer */
|
||||
std::atomic<std::chrono::seconds> m_last_block_received{0s};
|
||||
/** Next time to check for stale tip */
|
||||
std::chrono::seconds m_stale_tip_check_time GUARDED_BY(cs_main){0s};
|
||||
|
||||
|
@ -1375,6 +1380,7 @@ bool PeerManagerImpl::BlockRequested(NodeId nodeid, const CBlockIndex& block, st
|
|||
if (pit) {
|
||||
*pit = &itInFlight->second.second;
|
||||
}
|
||||
m_last_block_requested = GetTime<std::chrono::seconds>();
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -1623,6 +1629,30 @@ void PeerManagerImpl::FindNextBlocks(std::vector<const CBlockIndex*>& vBlocks, c
|
|||
if (waitingfor == -1) {
|
||||
// This is the first already-in-flight block.
|
||||
waitingfor = mapBlocksInFlight.lower_bound(pindex->GetBlockHash())->second.first;
|
||||
|
||||
// Decide whether to request this block from additional peers in parallel.
|
||||
// This is done if we are close (<=1024 blocks) from the tip, so that the usual
|
||||
// stalling mechanism doesn't work. To reduce excessive waste of bandwith, do this only
|
||||
// 30 seconds (BLOCK_NEARTIP_TIMEOUT_MAX) after a block was requested or received from any peer,
|
||||
// and only with up to 3 peers in parallel.
|
||||
bool already_requested_from_peer{false};
|
||||
auto range{mapBlocksInFlight.equal_range(pindex->GetBlockHash())};
|
||||
while (range.first != range.second) {
|
||||
if (range.first->second.first == peer.m_id) {
|
||||
already_requested_from_peer = true;
|
||||
break;
|
||||
}
|
||||
range.first++;
|
||||
}
|
||||
if (nMaxHeight <= nWindowEnd && // we have 1024 or less blocks left to download
|
||||
m_last_block_requested.load() > 0s &&
|
||||
GetTime<std::chrono::microseconds>() > m_last_block_requested.load() + BLOCK_NEARTIP_TIMEOUT_MAX &&
|
||||
GetTime<std::chrono::microseconds>() > m_last_block_received.load() + BLOCK_NEARTIP_TIMEOUT_MAX &&
|
||||
!already_requested_from_peer &&
|
||||
mapBlocksInFlight.count(pindex->GetBlockHash()) <= 2) {
|
||||
LogDebug(BCLog::NET, "Possible stalling close to tip: Requesting block %s additionally from peer %d\n", pindex->GetBlockHash().ToString(), peer.m_id);
|
||||
vBlocks.push_back(pindex);
|
||||
}
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
@ -3608,6 +3638,7 @@ void PeerManagerImpl::ProcessBlock(CNode& node, const std::shared_ptr<const CBlo
|
|||
m_chainman.ProcessNewBlock(block, force_processing, min_pow_checked, &new_block);
|
||||
if (new_block) {
|
||||
node.m_last_block_time = GetTime<std::chrono::seconds>();
|
||||
m_last_block_received = GetTime<std::chrono::seconds>();
|
||||
// In case this block came from a different peer than we requested
|
||||
// from, we can erase the block request now anyway (as we just stored
|
||||
// this block to disk).
|
||||
|
|
|
@ -154,33 +154,63 @@ class P2PIBDStallingTest(BitcoinTestFramework):
|
|||
|
||||
def near_tip_stalling(self):
|
||||
node = self.nodes[1]
|
||||
peers = []
|
||||
stall_block = self.blocks[0].sha256
|
||||
self.log.info("Part 2: Test stalling close to the tip")
|
||||
# only send 1024 headers, so that the window can't overshoot and the ibd stalling mechanism isn't triggered
|
||||
headers_message = msg_headers()
|
||||
headers_message.headers = [CBlockHeader(b) for b in self.blocks[:self.NUM_BLOCKS-1]]
|
||||
self.log.info("Add two stalling peers")
|
||||
for id in range(2):
|
||||
peers.append(node.add_outbound_p2p_connection(P2PStaller(stall_block), p2p_idx=id, connection_type="outbound-full-relay"))
|
||||
peers[-1].block_store = self.block_dict
|
||||
peers[-1].send_message(headers_message)
|
||||
self.log.info("Part 3: Test stalling close to the tip")
|
||||
# only send <= 1024 headers, so that the window can't overshoot and the ibd stalling mechanism isn't triggered
|
||||
# make sure it works at different lengths
|
||||
for header_length in [1, 10, 1024]:
|
||||
peers = []
|
||||
stall_block = self.blocks[0].sha256
|
||||
headers_message = msg_headers()
|
||||
headers_message.headers = [CBlockHeader(b) for b in self.blocks[:self.NUM_BLOCKS-1][:header_length]]
|
||||
|
||||
self.wait_until(lambda: sum(len(peer['inflight']) for peer in node.getpeerinfo()) == 1)
|
||||
self.all_sync_send_with_ping(peers)
|
||||
assert peers[0].stall_block_requested
|
||||
assert not peers[1].stall_block_requested
|
||||
self.mocktime = int(time.time())
|
||||
node.setmocktime(self.mocktime)
|
||||
|
||||
self.log.info("Check that after 9 minutes, nothing is done against the stalling")
|
||||
self.mocktime = int(time.time()) + 9 * 60
|
||||
node.setmocktime(self.mocktime)
|
||||
self.all_sync_send_with_ping(peers)
|
||||
self.log.info(f"Add three stalling peers, sending {header_length} headers")
|
||||
for id in range(4):
|
||||
peers.append(node.add_outbound_p2p_connection(P2PStaller(stall_block), p2p_idx=id, connection_type="outbound-full-relay"))
|
||||
peers[-1].block_store = self.block_dict
|
||||
peers[-1].send_message(headers_message)
|
||||
|
||||
self.log.info("Check that after more than 10 minutes, the stalling peer is disconnected")
|
||||
self.mocktime += 2 * 60
|
||||
node.setmocktime(self.mocktime)
|
||||
peers[0].wait_for_disconnect()
|
||||
self.wait_until(lambda: peers[1].stall_block_requested)
|
||||
self.wait_until(lambda: sum(len(peer['inflight']) for peer in node.getpeerinfo()) == 1)
|
||||
self.all_sync_send_with_ping(peers)
|
||||
assert_equal(sum(peer.stall_block_requested for peer in peers), 1)
|
||||
|
||||
self.log.info("Check that after 30 seconds we request the block from a second peer")
|
||||
self.mocktime += 31
|
||||
node.setmocktime(self.mocktime)
|
||||
self.wait_until(lambda: sum(peer.stall_block_requested for peer in peers) == 2)
|
||||
|
||||
self.log.info("Check that after another 30 seconds we request the block from a third peer")
|
||||
self.mocktime += 31
|
||||
node.setmocktime(self.mocktime)
|
||||
self.wait_until(lambda: sum(peer.stall_block_requested for peer in peers) == 3)
|
||||
|
||||
self.log.info("Check that after another 30 seconds we aren't requesting it from a fourth peer yet")
|
||||
self.mocktime += 31
|
||||
node.setmocktime(self.mocktime)
|
||||
self.all_sync_send_with_ping(peers)
|
||||
self.wait_until(lambda: sum(peer.stall_block_requested for peer in peers) == 3)
|
||||
|
||||
self.log.info("Check that after another 20 minutes, first three stalling peers are disconnected")
|
||||
# 10 minutes BLOCK_DOWNLOAD_TIMEOUT_BASE + 2*5 minutes BLOCK_DOWNLOAD_TIMEOUT_PER_PEER
|
||||
self.mocktime += 20 * 60
|
||||
node.setmocktime(self.mocktime)
|
||||
# all peers have been requested
|
||||
self.wait_until(lambda: sum(peer.stall_block_requested for peer in peers) == 4)
|
||||
|
||||
self.log.info("Check that after another 20 minutes, last stalling peer is disconnected")
|
||||
# 10 minutes BLOCK_DOWNLOAD_TIMEOUT_BASE + 2*5 minutes BLOCK_DOWNLOAD_TIMEOUT_PER_PEER
|
||||
self.mocktime += 20 * 60
|
||||
node.setmocktime(self.mocktime)
|
||||
for peer in peers:
|
||||
peer.wait_for_disconnect()
|
||||
|
||||
self.log.info("Provide missing block and check that the sync succeeds")
|
||||
peer = node.add_outbound_p2p_connection(P2PStaller(stall_block), p2p_idx=0, connection_type="outbound-full-relay")
|
||||
peer.send_message(msg_block(self.block_dict[stall_block]))
|
||||
self.wait_until(lambda: node.getblockcount() == self.NUM_BLOCKS - 1)
|
||||
node.disconnect_p2ps()
|
||||
|
||||
def all_sync_send_with_ping(self, peers):
|
||||
for p in peers:
|
||||
|
|
Loading…
Add table
Reference in a new issue