mirror of
https://github.com/bitcoin/bitcoin.git
synced 2025-03-07 14:25:09 -05:00
Merge 5e1ff82251
into 85f96b01b7
This commit is contained in:
commit
dad51bd862
2 changed files with 195 additions and 31 deletions
|
@ -100,6 +100,8 @@ static const int MAX_BLOCKS_IN_TRANSIT_PER_PEER = 16;
|
|||
static constexpr auto BLOCK_STALLING_TIMEOUT_DEFAULT{2s};
|
||||
/** Maximum timeout for stalling block download. */
|
||||
static constexpr auto BLOCK_STALLING_TIMEOUT_MAX{64s};
|
||||
/** Timeout for stalling when close to the tip, after which we may add additional peers to download from */
|
||||
static constexpr auto BLOCK_NEARTIP_TIMEOUT_MAX{30s};
|
||||
/** Maximum depth of blocks we're willing to serve as compact blocks to peers
|
||||
* when requested. For older blocks, a regular BLOCK response will be sent. */
|
||||
static const int MAX_CMPCTBLOCK_DEPTH = 5;
|
||||
|
@ -746,7 +748,10 @@ private:
|
|||
std::atomic<int> m_best_height{-1};
|
||||
/** The time of the best chain tip block */
|
||||
std::atomic<std::chrono::seconds> m_best_block_time{0s};
|
||||
|
||||
/** The last time we requested a block from any peer */
|
||||
std::atomic<std::chrono::seconds> m_last_block_requested{0s};
|
||||
/** The last time we received a block from any peer */
|
||||
std::atomic<std::chrono::seconds> m_last_block_received{0s};
|
||||
/** Next time to check for stale tip */
|
||||
std::chrono::seconds m_stale_tip_check_time GUARDED_BY(cs_main){0s};
|
||||
|
||||
|
@ -1213,6 +1218,7 @@ bool PeerManagerImpl::BlockRequested(NodeId nodeid, const CBlockIndex& block, st
|
|||
if (pit) {
|
||||
*pit = &itInFlight->second.second;
|
||||
}
|
||||
m_last_block_requested = GetTime<std::chrono::seconds>();
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -1461,6 +1467,30 @@ void PeerManagerImpl::FindNextBlocks(std::vector<const CBlockIndex*>& vBlocks, c
|
|||
if (waitingfor == -1) {
|
||||
// This is the first already-in-flight block.
|
||||
waitingfor = mapBlocksInFlight.lower_bound(pindex->GetBlockHash())->second.first;
|
||||
|
||||
// Decide whether to request this block from additional peers in parallel.
|
||||
// This is done if we are close (<=1024 blocks) from the tip, so that the usual
|
||||
// stalling mechanism doesn't work. To reduce excessive waste of bandwith, do this only
|
||||
// 30 seconds (BLOCK_NEARTIP_TIMEOUT_MAX) after a block was requested or received from any peer,
|
||||
// and only with up to 3 peers in parallel.
|
||||
bool already_requested_from_peer{false};
|
||||
auto range{mapBlocksInFlight.equal_range(pindex->GetBlockHash())};
|
||||
while (range.first != range.second) {
|
||||
if (range.first->second.first == peer.m_id) {
|
||||
already_requested_from_peer = true;
|
||||
break;
|
||||
}
|
||||
range.first++;
|
||||
}
|
||||
if (nMaxHeight <= nWindowEnd && // we have 1024 or less blocks left to download
|
||||
m_last_block_requested.load() > 0s &&
|
||||
GetTime<std::chrono::microseconds>() > m_last_block_requested.load() + BLOCK_NEARTIP_TIMEOUT_MAX &&
|
||||
GetTime<std::chrono::microseconds>() > m_last_block_received.load() + BLOCK_NEARTIP_TIMEOUT_MAX &&
|
||||
!already_requested_from_peer &&
|
||||
mapBlocksInFlight.count(pindex->GetBlockHash()) <= 2) {
|
||||
LogDebug(BCLog::NET, "Possible stalling close to tip: Requesting block %s additionally from peer %d\n", pindex->GetBlockHash().ToString(), peer.m_id);
|
||||
vBlocks.push_back(pindex);
|
||||
}
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
@ -3269,6 +3299,7 @@ void PeerManagerImpl::ProcessBlock(CNode& node, const std::shared_ptr<const CBlo
|
|||
m_chainman.ProcessNewBlock(block, force_processing, min_pow_checked, &new_block);
|
||||
if (new_block) {
|
||||
node.m_last_block_time = GetTime<std::chrono::seconds>();
|
||||
m_last_block_received = GetTime<std::chrono::seconds>();
|
||||
// In case this block came from a different peer than we requested
|
||||
// from, we can erase the block request now anyway (as we just stored
|
||||
// this block to disk).
|
||||
|
|
|
@ -13,14 +13,26 @@ from test_framework.blocktools import (
|
|||
create_coinbase
|
||||
)
|
||||
from test_framework.messages import (
|
||||
COutPoint,
|
||||
CTransaction,
|
||||
CTxIn,
|
||||
CTxOut,
|
||||
HeaderAndShortIDs,
|
||||
MSG_BLOCK,
|
||||
MSG_TYPE_MASK,
|
||||
msg_cmpctblock,
|
||||
msg_sendcmpct,
|
||||
)
|
||||
from test_framework.script import (
|
||||
CScript,
|
||||
OP_TRUE,
|
||||
)
|
||||
from test_framework.p2p import (
|
||||
CBlockHeader,
|
||||
msg_block,
|
||||
msg_headers,
|
||||
P2PDataStore,
|
||||
p2p_lock,
|
||||
)
|
||||
from test_framework.test_framework import BitcoinTestFramework
|
||||
from test_framework.util import (
|
||||
|
@ -31,6 +43,7 @@ from test_framework.util import (
|
|||
class P2PStaller(P2PDataStore):
|
||||
def __init__(self, stall_block):
|
||||
self.stall_block = stall_block
|
||||
self.stall_block_requested = False
|
||||
super().__init__()
|
||||
|
||||
def on_getdata(self, message):
|
||||
|
@ -39,6 +52,8 @@ class P2PStaller(P2PDataStore):
|
|||
if (inv.type & MSG_TYPE_MASK) == MSG_BLOCK:
|
||||
if (inv.hash != self.stall_block):
|
||||
self.send_message(msg_block(self.block_store[inv.hash]))
|
||||
else:
|
||||
self.stall_block_requested = True
|
||||
|
||||
def on_getheaders(self, message):
|
||||
pass
|
||||
|
@ -47,44 +62,50 @@ class P2PStaller(P2PDataStore):
|
|||
class P2PIBDStallingTest(BitcoinTestFramework):
|
||||
def set_test_params(self):
|
||||
self.setup_clean_chain = True
|
||||
self.num_nodes = 1
|
||||
self.num_nodes = 3
|
||||
|
||||
def setup_network(self):
|
||||
self.setup_nodes()
|
||||
# Don't connect the nodes
|
||||
|
||||
def prepare_blocks(self):
|
||||
self.log.info("Prepare blocks without sending them to any node")
|
||||
self.NUM_BLOCKS = 1025
|
||||
self.block_dict = {}
|
||||
self.blocks = []
|
||||
|
||||
def run_test(self):
|
||||
NUM_BLOCKS = 1025
|
||||
NUM_PEERS = 4
|
||||
node = self.nodes[0]
|
||||
tip = int(node.getbestblockhash(), 16)
|
||||
blocks = []
|
||||
height = 1
|
||||
block_time = node.getblock(node.getbestblockhash())['time'] + 1
|
||||
self.log.info("Prepare blocks without sending them to the node")
|
||||
block_dict = {}
|
||||
for _ in range(NUM_BLOCKS):
|
||||
blocks.append(create_block(tip, create_coinbase(height), block_time))
|
||||
blocks[-1].solve()
|
||||
tip = blocks[-1].sha256
|
||||
block_time = int(time.time())
|
||||
for _ in range(self.NUM_BLOCKS):
|
||||
self.blocks.append(create_block(tip, create_coinbase(height), block_time))
|
||||
self.blocks[-1].solve()
|
||||
tip = self.blocks[-1].sha256
|
||||
block_time += 1
|
||||
height += 1
|
||||
block_dict[blocks[-1].sha256] = blocks[-1]
|
||||
stall_block = blocks[0].sha256
|
||||
self.block_dict[self.blocks[-1].sha256] = self.blocks[-1]
|
||||
|
||||
def ibd_stalling(self):
|
||||
NUM_PEERS = 4
|
||||
stall_block = self.blocks[0].sha256
|
||||
node = self.nodes[0]
|
||||
|
||||
headers_message = msg_headers()
|
||||
headers_message.headers = [CBlockHeader(b) for b in blocks[:NUM_BLOCKS-1]]
|
||||
headers_message.headers = [CBlockHeader(b) for b in self.blocks[:self.NUM_BLOCKS-1]]
|
||||
peers = []
|
||||
|
||||
self.log.info("Part 1: Test stalling during IBD")
|
||||
self.log.info("Check that a staller does not get disconnected if the 1024 block lookahead buffer is filled")
|
||||
self.mocktime = int(time.time()) + 1
|
||||
node.setmocktime(self.mocktime)
|
||||
for id in range(NUM_PEERS):
|
||||
peers.append(node.add_outbound_p2p_connection(P2PStaller(stall_block), p2p_idx=id, connection_type="outbound-full-relay"))
|
||||
peers[-1].block_store = block_dict
|
||||
peers[-1].block_store = self.block_dict
|
||||
peers[-1].send_message(headers_message)
|
||||
|
||||
# Need to wait until 1023 blocks are received - the magic total bytes number is a workaround in lack of an rpc
|
||||
# returning the number of downloaded (but not connected) blocks.
|
||||
bytes_recv = 172761 if not self.options.v2transport else 169692
|
||||
self.wait_until(lambda: self.total_bytes_recv_for_blocks() == bytes_recv)
|
||||
|
||||
# Wait until all blocks are received (except for stall_block), so that no other blocks are in flight.
|
||||
self.wait_until(lambda: sum(len(peer['inflight']) for peer in node.getpeerinfo()) == 1)
|
||||
self.all_sync_send_with_ping(peers)
|
||||
# If there was a peer marked for stalling, it would get disconnected
|
||||
self.mocktime += 3
|
||||
|
@ -93,7 +114,7 @@ class P2PIBDStallingTest(BitcoinTestFramework):
|
|||
assert_equal(node.num_test_p2p_connections(), NUM_PEERS)
|
||||
|
||||
self.log.info("Check that increasing the window beyond 1024 blocks triggers stalling logic")
|
||||
headers_message.headers = [CBlockHeader(b) for b in blocks]
|
||||
headers_message.headers = [CBlockHeader(b) for b in self.blocks]
|
||||
with node.assert_debug_log(expected_msgs=['Stall started']):
|
||||
for p in peers:
|
||||
p.send_message(headers_message)
|
||||
|
@ -139,17 +160,123 @@ class P2PIBDStallingTest(BitcoinTestFramework):
|
|||
with node.assert_debug_log(expected_msgs=['Decreased stalling timeout to 2 seconds']):
|
||||
for p in peers:
|
||||
if p.is_connected and (stall_block in p.getdata_requests):
|
||||
p.send_message(msg_block(block_dict[stall_block]))
|
||||
p.send_message(msg_block(self.block_dict[stall_block]))
|
||||
|
||||
self.log.info("Check that all outstanding blocks get connected")
|
||||
self.wait_until(lambda: node.getblockcount() == NUM_BLOCKS)
|
||||
self.wait_until(lambda: node.getblockcount() == self.NUM_BLOCKS)
|
||||
|
||||
def total_bytes_recv_for_blocks(self):
|
||||
total = 0
|
||||
for info in self.nodes[0].getpeerinfo():
|
||||
if ("block" in info["bytesrecv_per_msg"].keys()):
|
||||
total += info["bytesrecv_per_msg"]["block"]
|
||||
return total
|
||||
def near_tip_stalling(self):
|
||||
node = self.nodes[1]
|
||||
self.log.info("Part 3: Test stalling close to the tip")
|
||||
# only send <= 1024 headers, so that the window can't overshoot and the ibd stalling mechanism isn't triggered
|
||||
# make sure it works at different lengths
|
||||
for header_length in [1, 10, 1024]:
|
||||
peers = []
|
||||
stall_block = self.blocks[0].sha256
|
||||
headers_message = msg_headers()
|
||||
headers_message.headers = [CBlockHeader(b) for b in self.blocks[:self.NUM_BLOCKS-1][:header_length]]
|
||||
|
||||
self.mocktime = int(time.time())
|
||||
node.setmocktime(self.mocktime)
|
||||
|
||||
self.log.info(f"Add three stalling peers, sending {header_length} headers")
|
||||
for id in range(4):
|
||||
peers.append(node.add_outbound_p2p_connection(P2PStaller(stall_block), p2p_idx=id, connection_type="outbound-full-relay"))
|
||||
peers[-1].block_store = self.block_dict
|
||||
peers[-1].send_message(headers_message)
|
||||
|
||||
self.wait_until(lambda: sum(len(peer['inflight']) for peer in node.getpeerinfo()) == 1)
|
||||
self.all_sync_send_with_ping(peers)
|
||||
assert_equal(sum(peer.stall_block_requested for peer in peers), 1)
|
||||
|
||||
self.log.info("Check that after 30 seconds we request the block from a second peer")
|
||||
self.mocktime += 31
|
||||
node.setmocktime(self.mocktime)
|
||||
self.wait_until(lambda: sum(peer.stall_block_requested for peer in peers) == 2)
|
||||
|
||||
self.log.info("Check that after another 30 seconds we request the block from a third peer")
|
||||
self.mocktime += 31
|
||||
node.setmocktime(self.mocktime)
|
||||
self.wait_until(lambda: sum(peer.stall_block_requested for peer in peers) == 3)
|
||||
|
||||
self.log.info("Check that after another 30 seconds we aren't requesting it from a fourth peer yet")
|
||||
self.mocktime += 31
|
||||
node.setmocktime(self.mocktime)
|
||||
self.all_sync_send_with_ping(peers)
|
||||
self.wait_until(lambda: sum(peer.stall_block_requested for peer in peers) == 3)
|
||||
|
||||
self.log.info("Check that after another 20 minutes, first three stalling peers are disconnected")
|
||||
# 10 minutes BLOCK_DOWNLOAD_TIMEOUT_BASE + 2*5 minutes BLOCK_DOWNLOAD_TIMEOUT_PER_PEER
|
||||
self.mocktime += 20 * 60
|
||||
node.setmocktime(self.mocktime)
|
||||
# all peers have been requested
|
||||
self.wait_until(lambda: sum(peer.stall_block_requested for peer in peers) == 4)
|
||||
|
||||
self.log.info("Check that after another 20 minutes, last stalling peer is disconnected")
|
||||
# 10 minutes BLOCK_DOWNLOAD_TIMEOUT_BASE + 2*5 minutes BLOCK_DOWNLOAD_TIMEOUT_PER_PEER
|
||||
self.mocktime += 20 * 60
|
||||
node.setmocktime(self.mocktime)
|
||||
for peer in peers:
|
||||
peer.wait_for_disconnect()
|
||||
|
||||
self.log.info("Provide missing block and check that the sync succeeds")
|
||||
peer = node.add_outbound_p2p_connection(P2PStaller(stall_block), p2p_idx=0, connection_type="outbound-full-relay")
|
||||
peer.send_message(msg_block(self.block_dict[stall_block]))
|
||||
self.wait_until(lambda: node.getblockcount() == self.NUM_BLOCKS - 1)
|
||||
node.disconnect_p2ps()
|
||||
|
||||
def at_tip_stalling(self):
|
||||
self.log.info("Test stalling and interaction with compact blocks when at tip")
|
||||
node = self.nodes[2]
|
||||
peers = []
|
||||
# Create a block with a tx (would be invalid, but this doesn't matter since we will only ever send the header)
|
||||
tx = CTransaction()
|
||||
tx.vin.append(CTxIn(COutPoint(self.blocks[1].vtx[0].sha256, 0), scriptSig=b""))
|
||||
tx.vout.append(CTxOut(49 * 100000000, CScript([OP_TRUE])))
|
||||
tx.calc_sha256()
|
||||
block_time = self.blocks[1].nTime + 1
|
||||
block = create_block(self.blocks[1].sha256, create_coinbase(3), block_time, txlist=[tx])
|
||||
block.solve()
|
||||
|
||||
for id in range(3):
|
||||
peers.append(node.add_outbound_p2p_connection(P2PStaller(block.sha256), p2p_idx=id, connection_type="outbound-full-relay"))
|
||||
|
||||
# First Peer is a high-bw compact block peer
|
||||
peers[0].send_and_ping(msg_sendcmpct(announce=True, version=2))
|
||||
peers[0].block_store = self.block_dict
|
||||
headers_message = msg_headers()
|
||||
headers_message.headers = [CBlockHeader(b) for b in self.blocks[:2]]
|
||||
peers[0].send_message(headers_message)
|
||||
self.wait_until(lambda: node.getblockcount() == 2)
|
||||
|
||||
self.log.info("First peer announces via cmpctblock")
|
||||
cmpct_block = HeaderAndShortIDs()
|
||||
cmpct_block.initialize_from_block(block)
|
||||
peers[0].send_and_ping(msg_cmpctblock(cmpct_block.to_p2p()))
|
||||
with p2p_lock:
|
||||
assert "getblocktxn" in peers[0].last_message
|
||||
|
||||
self.log.info("Also announce block from other peers by header")
|
||||
headers_message = msg_headers()
|
||||
headers_message.headers = [CBlockHeader(block)]
|
||||
for peer in peers[1:4]:
|
||||
peer.send_and_ping(headers_message)
|
||||
|
||||
self.log.info("Check that block is requested from two more header-announcing peers")
|
||||
self.wait_until(lambda: sum(peer.stall_block_requested for peer in peers) == 0)
|
||||
|
||||
self.mocktime = int(time.time()) + 31
|
||||
node.setmocktime(self.mocktime)
|
||||
self.wait_until(lambda: sum(peer.stall_block_requested for peer in peers) == 1)
|
||||
|
||||
self.mocktime += 31
|
||||
node.setmocktime(self.mocktime)
|
||||
self.wait_until(lambda: sum(peer.stall_block_requested for peer in peers) == 2)
|
||||
|
||||
self.log.info("Check that block is not requested from a third header-announcing peer")
|
||||
self.mocktime += 31
|
||||
node.setmocktime(self.mocktime)
|
||||
self.wait_until(lambda: sum(peer.stall_block_requested for peer in peers) == 2)
|
||||
|
||||
def all_sync_send_with_ping(self, peers):
|
||||
for p in peers:
|
||||
|
@ -162,6 +289,12 @@ class P2PIBDStallingTest(BitcoinTestFramework):
|
|||
return True
|
||||
return False
|
||||
|
||||
def run_test(self):
|
||||
self.prepare_blocks()
|
||||
self.ibd_stalling()
|
||||
self.near_tip_stalling()
|
||||
self.at_tip_stalling()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
P2PIBDStallingTest(__file__).main()
|
||||
|
|
Loading…
Add table
Reference in a new issue