0
0
Fork 0
mirror of https://github.com/bitcoin/bitcoin.git synced 2025-02-09 10:43:19 -05:00

Merge #19829: net processing: Move block inventory state to net_processing

3002b4af2b [net processing] Guard m_continuation_block with m_block_inv_mutex (John Newbery)
184557e8e0 [net processing] Move hashContinue to net processing (John Newbery)
c853ef002e scripted-diff: rename vBlockHashesToAnnounce and vInventoryBlockToSend (John Newbery)
53b7ac1b7d [net processing] Move block inventory data to Peer (John Newbery)
78040f9168 [net processing] Rename nStartingHeight to m_starting_height (John Newbery)
77a2c2f8f9 [net processing] Move nStartingHeight to Peer (John Newbery)
717a374e74 [net processing] Improve documentation for Peer destruction/locking (John Newbery)

Pull request description:

  This continues the work of moving application layer data into net_processing, by moving all block inventory state into the new Peer object added in #19607.

  For motivation, see #19398.

ACKs for top commit:
  jonatack:
    re-ACK 3002b4af2b per `git diff 9aad3e4 3002b4a`
  Sjors:
    Code review re-ACK 3002b4af2b
  MarcoFalke:
    re-ACK 3002b4af2b 🌓

Tree-SHA512: eb2b474b73b025791ee3e6e41809926b332b48468763219f31638ca390f427632f05902dfc6a2c6bdc1ce47b215782f67874ddbf05b97d77d5897b7e2abfe4d9
This commit is contained in:
MarcoFalke 2020-12-22 12:40:40 +01:00
commit df127ecede
No known key found for this signature in database
GPG key ID: D2EA4850E7528B25
6 changed files with 92 additions and 64 deletions

View file

@ -590,7 +590,6 @@ void CNode::copyStats(CNodeStats &stats, const std::vector<bool> &m_asmap)
stats.m_manual_connection = IsManualConn(); stats.m_manual_connection = IsManualConn();
X(m_bip152_highbandwidth_to); X(m_bip152_highbandwidth_to);
X(m_bip152_highbandwidth_from); X(m_bip152_highbandwidth_from);
X(nStartingHeight);
{ {
LOCK(cs_vSend); LOCK(cs_vSend);
X(mapSendBytesPerMsgCmd); X(mapSendBytesPerMsgCmd);
@ -2956,7 +2955,6 @@ CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn
{ {
hSocket = hSocketIn; hSocket = hSocketIn;
addrName = addrNameIn == "" ? addr.ToStringIPPort() : addrNameIn; addrName = addrNameIn == "" ? addr.ToStringIPPort() : addrNameIn;
hashContinue = uint256();
if (conn_type_in != ConnectionType::BLOCK_RELAY) { if (conn_type_in != ConnectionType::BLOCK_RELAY) {
m_tx_relay = MakeUnique<TxRelay>(); m_tx_relay = MakeUnique<TxRelay>();
} }

View file

@ -705,7 +705,7 @@ public:
bool m_manual_connection; bool m_manual_connection;
bool m_bip152_highbandwidth_to; bool m_bip152_highbandwidth_to;
bool m_bip152_highbandwidth_from; bool m_bip152_highbandwidth_from;
int nStartingHeight; int m_starting_height;
uint64_t nSendBytes; uint64_t nSendBytes;
mapMsgCmdSize mapSendBytesPerMsgCmd; mapMsgCmdSize mapSendBytesPerMsgCmd;
uint64_t nRecvBytes; uint64_t nRecvBytes;
@ -993,8 +993,6 @@ protected:
mapMsgCmdSize mapRecvBytesPerMsgCmd GUARDED_BY(cs_vRecv); mapMsgCmdSize mapRecvBytesPerMsgCmd GUARDED_BY(cs_vRecv);
public: public:
uint256 hashContinue;
std::atomic<int> nStartingHeight{-1};
// We selected peer as (compact blocks) high-bandwidth peer (BIP152) // We selected peer as (compact blocks) high-bandwidth peer (BIP152)
std::atomic<bool> m_bip152_highbandwidth_to{false}; std::atomic<bool> m_bip152_highbandwidth_to{false};
// Peer selected us as (compact blocks) high-bandwidth peer (BIP152) // Peer selected us as (compact blocks) high-bandwidth peer (BIP152)
@ -1007,12 +1005,6 @@ public:
std::chrono::microseconds m_next_addr_send GUARDED_BY(cs_sendProcessing){0}; std::chrono::microseconds m_next_addr_send GUARDED_BY(cs_sendProcessing){0};
std::chrono::microseconds m_next_local_addr_send GUARDED_BY(cs_sendProcessing){0}; std::chrono::microseconds m_next_local_addr_send GUARDED_BY(cs_sendProcessing){0};
// List of block ids we still have announce.
// There is no final sorting before sending, as they are always sent immediately
// and in the order requested.
std::vector<uint256> vInventoryBlockToSend GUARDED_BY(cs_inventory);
Mutex cs_inventory;
struct TxRelay { struct TxRelay {
mutable RecursiveMutex cs_filter; mutable RecursiveMutex cs_filter;
// We use fRelayTxes for two purposes - // We use fRelayTxes for two purposes -
@ -1043,9 +1035,6 @@ public:
// m_tx_relay == nullptr if we're not relaying transactions with this peer // m_tx_relay == nullptr if we're not relaying transactions with this peer
std::unique_ptr<TxRelay> m_tx_relay; std::unique_ptr<TxRelay> m_tx_relay;
// Used for headers announcements - unfiltered blocks to relay
std::vector<uint256> vBlockHashesToAnnounce GUARDED_BY(cs_inventory);
/** UNIX epoch time of the last block received from this peer that we had /** UNIX epoch time of the last block received from this peer that we had
* not yet seen (e.g. not already received from another peer), that passed * not yet seen (e.g. not already received from another peer), that passed
* preliminary validity checks and was saved to disk, even if we don't * preliminary validity checks and was saved to disk, even if we don't

View file

@ -791,6 +791,11 @@ void PeerManager::FinalizeNode(const CNode& node, bool& fUpdateConnectionTime) {
LOCK(cs_main); LOCK(cs_main);
int misbehavior{0}; int misbehavior{0};
{ {
// We remove the PeerRef from g_peer_map here, but we don't always
// destruct the Peer. Sometimes another thread is still holding a
// PeerRef, so the refcount is >= 1. Be careful not to do any
// processing here that assumes Peer won't be changed before it's
// destructed.
PeerRef peer = RemovePeer(nodeid); PeerRef peer = RemovePeer(nodeid);
assert(peer != nullptr); assert(peer != nullptr);
misbehavior = WITH_LOCK(peer->m_misbehavior_mutex, return peer->m_misbehavior_score); misbehavior = WITH_LOCK(peer->m_misbehavior_mutex, return peer->m_misbehavior_score);
@ -870,6 +875,7 @@ bool PeerManager::GetNodeStateStats(NodeId nodeid, CNodeStateStats &stats) {
PeerRef peer = GetPeerRef(nodeid); PeerRef peer = GetPeerRef(nodeid);
if (peer == nullptr) return false; if (peer == nullptr) return false;
stats.m_misbehavior_score = WITH_LOCK(peer->m_misbehavior_mutex, return peer->m_misbehavior_score); stats.m_misbehavior_score = WITH_LOCK(peer->m_misbehavior_mutex, return peer->m_misbehavior_score);
stats.m_starting_height = peer->m_starting_height;
return true; return true;
} }
@ -1309,13 +1315,17 @@ void PeerManager::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockInde
} }
} }
// Relay to all peers {
m_connman.ForEachNode([&vHashes](CNode* pnode) { LOCK(m_peer_mutex);
LOCK(pnode->cs_inventory); for (auto& it : m_peer_map) {
for (const uint256& hash : reverse_iterate(vHashes)) { Peer& peer = *it.second;
pnode->vBlockHashesToAnnounce.push_back(hash); LOCK(peer.m_block_inv_mutex);
for (const uint256& hash : reverse_iterate(vHashes)) {
peer.m_blocks_for_headers_relay.push_back(hash);
}
} }
}); }
m_connman.WakeMessageHandler(); m_connman.WakeMessageHandler();
} }
@ -1465,7 +1475,7 @@ static void RelayAddress(const CNode& originator,
connman.ForEachNodeThen(std::move(sortfunc), std::move(pushfunc)); connman.ForEachNodeThen(std::move(sortfunc), std::move(pushfunc));
} }
void static ProcessGetBlockData(CNode& pfrom, const CChainParams& chainparams, const CInv& inv, CConnman& connman) void static ProcessGetBlockData(CNode& pfrom, Peer& peer, const CChainParams& chainparams, const CInv& inv, CConnman& connman)
{ {
bool send = false; bool send = false;
std::shared_ptr<const CBlock> a_recent_block; std::shared_ptr<const CBlock> a_recent_block;
@ -1605,16 +1615,18 @@ void static ProcessGetBlockData(CNode& pfrom, const CChainParams& chainparams, c
} }
} }
// Trigger the peer node to send a getblocks request for the next batch of inventory
if (inv.hash == pfrom.hashContinue)
{ {
// Send immediately. This must send even if redundant, LOCK(peer.m_block_inv_mutex);
// and we want it right after the last block so they don't // Trigger the peer node to send a getblocks request for the next batch of inventory
// wait for other stuff first. if (inv.hash == peer.m_continuation_block) {
std::vector<CInv> vInv; // Send immediately. This must send even if redundant,
vInv.push_back(CInv(MSG_BLOCK, ::ChainActive().Tip()->GetBlockHash())); // and we want it right after the last block so they don't
connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::INV, vInv)); // wait for other stuff first.
pfrom.hashContinue.SetNull(); std::vector<CInv> vInv;
vInv.push_back(CInv(MSG_BLOCK, ::ChainActive().Tip()->GetBlockHash()));
connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::INV, vInv));
peer.m_continuation_block.SetNull();
}
} }
} }
} }
@ -1714,7 +1726,7 @@ void static ProcessGetData(CNode& pfrom, Peer& peer, const CChainParams& chainpa
if (it != peer.m_getdata_requests.end() && !pfrom.fPauseSend) { if (it != peer.m_getdata_requests.end() && !pfrom.fPauseSend) {
const CInv &inv = *it++; const CInv &inv = *it++;
if (inv.IsGenBlkMsg()) { if (inv.IsGenBlkMsg()) {
ProcessGetBlockData(pfrom, chainparams, inv, connman); ProcessGetBlockData(pfrom, peer, chainparams, inv, connman);
} }
// else: If the first item on the queue is an unknown type, we erase it // else: If the first item on the queue is an unknown type, we erase it
// and continue processing the queue on the next call. // and continue processing the queue on the next call.
@ -1764,7 +1776,9 @@ void PeerManager::SendBlockTransactions(CNode& pfrom, const CBlock& block, const
m_connman.PushMessage(&pfrom, msgMaker.Make(nSendFlags, NetMsgType::BLOCKTXN, resp)); m_connman.PushMessage(&pfrom, msgMaker.Make(nSendFlags, NetMsgType::BLOCKTXN, resp));
} }
void PeerManager::ProcessHeadersMessage(CNode& pfrom, const std::vector<CBlockHeader>& headers, bool via_compact_block) void PeerManager::ProcessHeadersMessage(CNode& pfrom, const Peer& peer,
const std::vector<CBlockHeader>& headers,
bool via_compact_block)
{ {
const CNetMsgMaker msgMaker(pfrom.GetCommonVersion()); const CNetMsgMaker msgMaker(pfrom.GetCommonVersion());
size_t nCount = headers.size(); size_t nCount = headers.size();
@ -1854,7 +1868,8 @@ void PeerManager::ProcessHeadersMessage(CNode& pfrom, const std::vector<CBlockHe
// Headers message had its maximum size; the peer may have more headers. // Headers message had its maximum size; the peer may have more headers.
// TODO: optimize: if pindexLast is an ancestor of ::ChainActive().Tip or pindexBestHeader, continue // TODO: optimize: if pindexLast is an ancestor of ::ChainActive().Tip or pindexBestHeader, continue
// from there instead. // from there instead.
LogPrint(BCLog::NET, "more getheaders (%d) to end to peer=%d (startheight:%d)\n", pindexLast->nHeight, pfrom.GetId(), pfrom.nStartingHeight); LogPrint(BCLog::NET, "more getheaders (%d) to end to peer=%d (startheight:%d)\n",
pindexLast->nHeight, pfrom.GetId(), peer.m_starting_height);
m_connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::GETHEADERS, ::ChainActive().GetLocator(pindexLast), uint256())); m_connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::GETHEADERS, ::ChainActive().GetLocator(pindexLast), uint256()));
} }
@ -2280,7 +2295,7 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
ServiceFlags nServices; ServiceFlags nServices;
int nVersion; int nVersion;
std::string cleanSubVer; std::string cleanSubVer;
int nStartingHeight = -1; int starting_height = -1;
bool fRelay = true; bool fRelay = true;
vRecv >> nVersion >> nServiceInt >> nTime >> addrMe; vRecv >> nVersion >> nServiceInt >> nTime >> addrMe;
@ -2311,7 +2326,7 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
cleanSubVer = SanitizeString(strSubVer); cleanSubVer = SanitizeString(strSubVer);
} }
if (!vRecv.empty()) { if (!vRecv.empty()) {
vRecv >> nStartingHeight; vRecv >> starting_height;
} }
if (!vRecv.empty()) if (!vRecv.empty())
vRecv >> fRelay; vRecv >> fRelay;
@ -2360,7 +2375,7 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
LOCK(pfrom.cs_SubVer); LOCK(pfrom.cs_SubVer);
pfrom.cleanSubVer = cleanSubVer; pfrom.cleanSubVer = cleanSubVer;
} }
pfrom.nStartingHeight = nStartingHeight; peer->m_starting_height = starting_height;
// set nodes not relaying blocks and tx and not serving (parts) of the historical blockchain as "clients" // set nodes not relaying blocks and tx and not serving (parts) of the historical blockchain as "clients"
pfrom.fClient = (!(nServices & NODE_NETWORK) && !(nServices & NODE_NETWORK_LIMITED)); pfrom.fClient = (!(nServices & NODE_NETWORK) && !(nServices & NODE_NETWORK_LIMITED));
@ -2440,7 +2455,7 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
LogPrint(BCLog::NET, "receive version message: %s: version %d, blocks=%d, us=%s, peer=%d%s\n", LogPrint(BCLog::NET, "receive version message: %s: version %d, blocks=%d, us=%s, peer=%d%s\n",
cleanSubVer, pfrom.nVersion, cleanSubVer, pfrom.nVersion,
pfrom.nStartingHeight, addrMe.ToString(), pfrom.GetId(), peer->m_starting_height, addrMe.ToString(), pfrom.GetId(),
remoteAddr); remoteAddr);
int64_t nTimeOffset = nTime - GetTime(); int64_t nTimeOffset = nTime - GetTime();
@ -2474,7 +2489,7 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
if (!pfrom.IsInboundConn()) { if (!pfrom.IsInboundConn()) {
LogPrintf("New outbound peer connected: version: %d, blocks=%d, peer=%d%s (%s)\n", LogPrintf("New outbound peer connected: version: %d, blocks=%d, peer=%d%s (%s)\n",
pfrom.nVersion.load(), pfrom.nStartingHeight, pfrom.nVersion.load(), peer->m_starting_height,
pfrom.GetId(), (fLogIPs ? strprintf(", peeraddr=%s", pfrom.addr.ToString()) : ""), pfrom.GetId(), (fLogIPs ? strprintf(", peeraddr=%s", pfrom.addr.ToString()) : ""),
pfrom.ConnectionTypeAsString()); pfrom.ConnectionTypeAsString());
} }
@ -2786,13 +2801,12 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
LogPrint(BCLog::NET, " getblocks stopping, pruned or too old block at %d %s\n", pindex->nHeight, pindex->GetBlockHash().ToString()); LogPrint(BCLog::NET, " getblocks stopping, pruned or too old block at %d %s\n", pindex->nHeight, pindex->GetBlockHash().ToString());
break; break;
} }
WITH_LOCK(pfrom.cs_inventory, pfrom.vInventoryBlockToSend.push_back(pindex->GetBlockHash())); WITH_LOCK(peer->m_block_inv_mutex, peer->m_blocks_for_inv_relay.push_back(pindex->GetBlockHash()));
if (--nLimit <= 0) if (--nLimit <= 0) {
{
// When this block is requested, we'll send an inv that'll // When this block is requested, we'll send an inv that'll
// trigger the peer to getblocks the next batch of inventory. // trigger the peer to getblocks the next batch of inventory.
LogPrint(BCLog::NET, " getblocks stopping at limit %d %s\n", pindex->nHeight, pindex->GetBlockHash().ToString()); LogPrint(BCLog::NET, " getblocks stopping at limit %d %s\n", pindex->nHeight, pindex->GetBlockHash().ToString());
pfrom.hashContinue = pindex->GetBlockHash(); WITH_LOCK(peer->m_block_inv_mutex, {peer->m_continuation_block = pindex->GetBlockHash();});
break; break;
} }
} }
@ -3316,7 +3330,7 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
// the peer if the header turns out to be for an invalid block. // the peer if the header turns out to be for an invalid block.
// Note that if a peer tries to build on an invalid chain, that // Note that if a peer tries to build on an invalid chain, that
// will be detected and the peer will be disconnected/discouraged. // will be detected and the peer will be disconnected/discouraged.
return ProcessHeadersMessage(pfrom, {cmpctblock.header}, /*via_compact_block=*/true); return ProcessHeadersMessage(pfrom, *peer, {cmpctblock.header}, /*via_compact_block=*/true);
} }
if (fBlockReconstructed) { if (fBlockReconstructed) {
@ -3459,7 +3473,7 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
ReadCompactSize(vRecv); // ignore tx count; assume it is 0. ReadCompactSize(vRecv); // ignore tx count; assume it is 0.
} }
return ProcessHeadersMessage(pfrom, headers, /*via_compact_block=*/false); return ProcessHeadersMessage(pfrom, *peer, headers, /*via_compact_block=*/false);
} }
if (msg_type == NetMsgType::BLOCK) if (msg_type == NetMsgType::BLOCK)
@ -4067,6 +4081,7 @@ public:
bool PeerManager::SendMessages(CNode* pto) bool PeerManager::SendMessages(CNode* pto)
{ {
PeerRef peer = GetPeerRef(pto->GetId());
const Consensus::Params& consensusParams = m_chainparams.GetConsensus(); const Consensus::Params& consensusParams = m_chainparams.GetConsensus();
// We must call MaybeDiscourageAndDisconnect first, to ensure that we'll // We must call MaybeDiscourageAndDisconnect first, to ensure that we'll
@ -4192,7 +4207,7 @@ bool PeerManager::SendMessages(CNode* pto)
got back an empty response. */ got back an empty response. */
if (pindexStart->pprev) if (pindexStart->pprev)
pindexStart = pindexStart->pprev; pindexStart = pindexStart->pprev;
LogPrint(BCLog::NET, "initial getheaders (%d) to peer=%d (startheight:%d)\n", pindexStart->nHeight, pto->GetId(), pto->nStartingHeight); LogPrint(BCLog::NET, "initial getheaders (%d) to peer=%d (startheight:%d)\n", pindexStart->nHeight, pto->GetId(), peer->m_starting_height);
m_connman.PushMessage(pto, msgMaker.Make(NetMsgType::GETHEADERS, ::ChainActive().GetLocator(pindexStart), uint256())); m_connman.PushMessage(pto, msgMaker.Make(NetMsgType::GETHEADERS, ::ChainActive().GetLocator(pindexStart), uint256()));
} }
} }
@ -4208,11 +4223,11 @@ bool PeerManager::SendMessages(CNode* pto)
// If no header would connect, or if we have too many // If no header would connect, or if we have too many
// blocks, or if the peer doesn't want headers, just // blocks, or if the peer doesn't want headers, just
// add all to the inv queue. // add all to the inv queue.
LOCK(pto->cs_inventory); LOCK(peer->m_block_inv_mutex);
std::vector<CBlock> vHeaders; std::vector<CBlock> vHeaders;
bool fRevertToInv = ((!state.fPreferHeaders && bool fRevertToInv = ((!state.fPreferHeaders &&
(!state.fPreferHeaderAndIDs || pto->vBlockHashesToAnnounce.size() > 1)) || (!state.fPreferHeaderAndIDs || peer->m_blocks_for_headers_relay.size() > 1)) ||
pto->vBlockHashesToAnnounce.size() > MAX_BLOCKS_TO_ANNOUNCE); peer->m_blocks_for_headers_relay.size() > MAX_BLOCKS_TO_ANNOUNCE);
const CBlockIndex *pBestIndex = nullptr; // last header queued for delivery const CBlockIndex *pBestIndex = nullptr; // last header queued for delivery
ProcessBlockAvailability(pto->GetId()); // ensure pindexBestKnownBlock is up-to-date ProcessBlockAvailability(pto->GetId()); // ensure pindexBestKnownBlock is up-to-date
@ -4221,7 +4236,7 @@ bool PeerManager::SendMessages(CNode* pto)
// Try to find first header that our peer doesn't have, and // Try to find first header that our peer doesn't have, and
// then send all headers past that one. If we come across any // then send all headers past that one. If we come across any
// headers that aren't on ::ChainActive(), give up. // headers that aren't on ::ChainActive(), give up.
for (const uint256 &hash : pto->vBlockHashesToAnnounce) { for (const uint256& hash : peer->m_blocks_for_headers_relay) {
const CBlockIndex* pindex = LookupBlockIndex(hash); const CBlockIndex* pindex = LookupBlockIndex(hash);
assert(pindex); assert(pindex);
if (::ChainActive()[pindex->nHeight] != pindex) { if (::ChainActive()[pindex->nHeight] != pindex) {
@ -4238,7 +4253,7 @@ bool PeerManager::SendMessages(CNode* pto)
// which should be caught by the prior check), but one // which should be caught by the prior check), but one
// way this could happen is by using invalidateblock / // way this could happen is by using invalidateblock /
// reconsiderblock repeatedly on the tip, causing it to // reconsiderblock repeatedly on the tip, causing it to
// be added multiple times to vBlockHashesToAnnounce. // be added multiple times to m_blocks_for_headers_relay.
// Robustly deal with this rare situation by reverting // Robustly deal with this rare situation by reverting
// to an inv. // to an inv.
fRevertToInv = true; fRevertToInv = true;
@ -4310,10 +4325,10 @@ bool PeerManager::SendMessages(CNode* pto)
} }
if (fRevertToInv) { if (fRevertToInv) {
// If falling back to using an inv, just try to inv the tip. // If falling back to using an inv, just try to inv the tip.
// The last entry in vBlockHashesToAnnounce was our tip at some point // The last entry in m_blocks_for_headers_relay was our tip at some point
// in the past. // in the past.
if (!pto->vBlockHashesToAnnounce.empty()) { if (!peer->m_blocks_for_headers_relay.empty()) {
const uint256 &hashToAnnounce = pto->vBlockHashesToAnnounce.back(); const uint256& hashToAnnounce = peer->m_blocks_for_headers_relay.back();
const CBlockIndex* pindex = LookupBlockIndex(hashToAnnounce); const CBlockIndex* pindex = LookupBlockIndex(hashToAnnounce);
assert(pindex); assert(pindex);
@ -4327,13 +4342,13 @@ bool PeerManager::SendMessages(CNode* pto)
// If the peer's chain has this block, don't inv it back. // If the peer's chain has this block, don't inv it back.
if (!PeerHasHeader(&state, pindex)) { if (!PeerHasHeader(&state, pindex)) {
pto->vInventoryBlockToSend.push_back(hashToAnnounce); peer->m_blocks_for_inv_relay.push_back(hashToAnnounce);
LogPrint(BCLog::NET, "%s: sending inv peer=%d hash=%s\n", __func__, LogPrint(BCLog::NET, "%s: sending inv peer=%d hash=%s\n", __func__,
pto->GetId(), hashToAnnounce.ToString()); pto->GetId(), hashToAnnounce.ToString());
} }
} }
} }
pto->vBlockHashesToAnnounce.clear(); peer->m_blocks_for_headers_relay.clear();
} }
// //
@ -4341,18 +4356,18 @@ bool PeerManager::SendMessages(CNode* pto)
// //
std::vector<CInv> vInv; std::vector<CInv> vInv;
{ {
LOCK(pto->cs_inventory); LOCK(peer->m_block_inv_mutex);
vInv.reserve(std::max<size_t>(pto->vInventoryBlockToSend.size(), INVENTORY_BROADCAST_MAX)); vInv.reserve(std::max<size_t>(peer->m_blocks_for_inv_relay.size(), INVENTORY_BROADCAST_MAX));
// Add blocks // Add blocks
for (const uint256& hash : pto->vInventoryBlockToSend) { for (const uint256& hash : peer->m_blocks_for_inv_relay) {
vInv.push_back(CInv(MSG_BLOCK, hash)); vInv.push_back(CInv(MSG_BLOCK, hash));
if (vInv.size() == MAX_INV_SZ) { if (vInv.size() == MAX_INV_SZ) {
m_connman.PushMessage(pto, msgMaker.Make(NetMsgType::INV, vInv)); m_connman.PushMessage(pto, msgMaker.Make(NetMsgType::INV, vInv));
vInv.clear(); vInv.clear();
} }
} }
pto->vInventoryBlockToSend.clear(); peer->m_blocks_for_inv_relay.clear();
if (pto->m_tx_relay != nullptr) { if (pto->m_tx_relay != nullptr) {
LOCK(pto->m_tx_relay->cs_tx_inventory); LOCK(pto->m_tx_relay->cs_tx_inventory);

View file

@ -36,6 +36,7 @@ struct CNodeStateStats {
int m_misbehavior_score = 0; int m_misbehavior_score = 0;
int nSyncHeight = -1; int nSyncHeight = -1;
int nCommonHeight = -1; int nCommonHeight = -1;
int m_starting_height = -1;
std::vector<int> vHeightInFlight; std::vector<int> vHeightInFlight;
}; };
@ -46,6 +47,8 @@ struct CNodeStateStats {
* Memory is owned by shared pointers and this object is destructed when * Memory is owned by shared pointers and this object is destructed when
* the refcount drops to zero. * the refcount drops to zero.
* *
* Mutexes inside this struct must not be held when locking m_peer_mutex.
*
* TODO: move most members from CNodeState to this structure. * TODO: move most members from CNodeState to this structure.
* TODO: move remaining application-layer data members from CNode to this structure. * TODO: move remaining application-layer data members from CNode to this structure.
*/ */
@ -60,6 +63,25 @@ struct Peer {
/** Whether this peer should be disconnected and marked as discouraged (unless it has the noban permission). */ /** Whether this peer should be disconnected and marked as discouraged (unless it has the noban permission). */
bool m_should_discourage GUARDED_BY(m_misbehavior_mutex){false}; bool m_should_discourage GUARDED_BY(m_misbehavior_mutex){false};
/** Protects block inventory data members */
Mutex m_block_inv_mutex;
/** List of blocks that we'll anounce via an `inv` message.
* There is no final sorting before sending, as they are always sent
* immediately and in the order requested. */
std::vector<uint256> m_blocks_for_inv_relay GUARDED_BY(m_block_inv_mutex);
/** Unfiltered list of blocks that we'd like to announce via a `headers`
* message. If we can't announce via a `headers` message, we'll fall back to
* announcing via `inv`. */
std::vector<uint256> m_blocks_for_headers_relay GUARDED_BY(m_block_inv_mutex);
/** The final block hash that we sent in an `inv` message to this peer.
* When the peer requests this block, we send an `inv` message to trigger
* the peer to request the next sequence of block hashes.
* Most peers use headers-first syncing, which doesn't use this mechanism */
uint256 m_continuation_block GUARDED_BY(m_block_inv_mutex) {};
/** This peer's reported block height when we connected */
std::atomic<int> m_starting_height{-1};
/** Set of txids to reconsider once their parent transactions have been accepted **/ /** Set of txids to reconsider once their parent transactions have been accepted **/
std::set<uint256> m_orphan_work_set GUARDED_BY(g_cs_orphans); std::set<uint256> m_orphan_work_set GUARDED_BY(g_cs_orphans);
@ -180,7 +202,9 @@ private:
void ProcessOrphanTx(std::set<uint256>& orphan_work_set) EXCLUSIVE_LOCKS_REQUIRED(cs_main, g_cs_orphans); void ProcessOrphanTx(std::set<uint256>& orphan_work_set) EXCLUSIVE_LOCKS_REQUIRED(cs_main, g_cs_orphans);
/** Process a single headers message from a peer. */ /** Process a single headers message from a peer. */
void ProcessHeadersMessage(CNode& pfrom, const std::vector<CBlockHeader>& headers, bool via_compact_block); void ProcessHeadersMessage(CNode& pfrom, const Peer& peer,
const std::vector<CBlockHeader>& headers,
bool via_compact_block);
void SendBlockTransactions(CNode& pfrom, const CBlock& block, const BlockTransactionsRequest& req); void SendBlockTransactions(CNode& pfrom, const CBlock& block, const BlockTransactionsRequest& req);
@ -210,7 +234,8 @@ private:
* on extra block-relay-only peers. */ * on extra block-relay-only peers. */
bool m_initial_sync_finished{false}; bool m_initial_sync_finished{false};
/** Protects m_peer_map */ /** Protects m_peer_map. This mutex must not be locked while holding a lock
* on any of the mutexes inside a Peer object. */
mutable Mutex m_peer_mutex; mutable Mutex m_peer_mutex;
/** /**
* Map of all Peer objects, keyed by peer id. This map is protected * Map of all Peer objects, keyed by peer id. This map is protected

View file

@ -1109,7 +1109,6 @@ void RPCConsole::updateDetailWidget()
ui->peerVersion->setText(QString::number(stats->nodeStats.nVersion)); ui->peerVersion->setText(QString::number(stats->nodeStats.nVersion));
ui->peerSubversion->setText(QString::fromStdString(stats->nodeStats.cleanSubVer)); ui->peerSubversion->setText(QString::fromStdString(stats->nodeStats.cleanSubVer));
ui->peerDirection->setText(stats->nodeStats.fInbound ? tr("Inbound") : tr("Outbound")); ui->peerDirection->setText(stats->nodeStats.fInbound ? tr("Inbound") : tr("Outbound"));
ui->peerHeight->setText(QString::number(stats->nodeStats.nStartingHeight));
if (stats->nodeStats.m_permissionFlags == PF_NONE) { if (stats->nodeStats.m_permissionFlags == PF_NONE) {
ui->peerPermissions->setText(tr("N/A")); ui->peerPermissions->setText(tr("N/A"));
} else { } else {
@ -1135,6 +1134,8 @@ void RPCConsole::updateDetailWidget()
ui->peerCommonHeight->setText(QString("%1").arg(stats->nodeStateStats.nCommonHeight)); ui->peerCommonHeight->setText(QString("%1").arg(stats->nodeStateStats.nCommonHeight));
else else
ui->peerCommonHeight->setText(tr("Unknown")); ui->peerCommonHeight->setText(tr("Unknown"));
ui->peerHeight->setText(QString::number(stats->nodeStateStats.m_starting_height));
} }
ui->detailWidget->show(); ui->detailWidget->show();

View file

@ -133,8 +133,8 @@ static RPCHelpMan getpeerinfo()
{RPCResult::Type::STR, "connection_type", "Type of connection: \n" + Join(CONNECTION_TYPE_DOC, ",\n") + ".\n" {RPCResult::Type::STR, "connection_type", "Type of connection: \n" + Join(CONNECTION_TYPE_DOC, ",\n") + ".\n"
"Please note this output is unlikely to be stable in upcoming releases as we iterate to\n" "Please note this output is unlikely to be stable in upcoming releases as we iterate to\n"
"best capture connection behaviors."}, "best capture connection behaviors."},
{RPCResult::Type::NUM, "startingheight", "The starting height (block) of the peer"},
{RPCResult::Type::NUM, "banscore", "The ban score (DEPRECATED, returned only if config option -deprecatedrpc=banscore is passed)"}, {RPCResult::Type::NUM, "banscore", "The ban score (DEPRECATED, returned only if config option -deprecatedrpc=banscore is passed)"},
{RPCResult::Type::NUM, "startingheight", "The starting height (block) of the peer"},
{RPCResult::Type::NUM, "synced_headers", "The last header we have in common with this peer"}, {RPCResult::Type::NUM, "synced_headers", "The last header we have in common with this peer"},
{RPCResult::Type::NUM, "synced_blocks", "The last block we have in common with this peer"}, {RPCResult::Type::NUM, "synced_blocks", "The last block we have in common with this peer"},
{RPCResult::Type::ARR, "inflight", "", {RPCResult::Type::ARR, "inflight", "",
@ -224,12 +224,12 @@ static RPCHelpMan getpeerinfo()
// addnode is deprecated in v0.21 for removal in v0.22 // addnode is deprecated in v0.21 for removal in v0.22
obj.pushKV("addnode", stats.m_manual_connection); obj.pushKV("addnode", stats.m_manual_connection);
} }
obj.pushKV("startingheight", stats.nStartingHeight);
if (fStateStats) { if (fStateStats) {
if (IsDeprecatedRPCEnabled("banscore")) { if (IsDeprecatedRPCEnabled("banscore")) {
// banscore is deprecated in v0.21 for removal in v0.22 // banscore is deprecated in v0.21 for removal in v0.22
obj.pushKV("banscore", statestats.m_misbehavior_score); obj.pushKV("banscore", statestats.m_misbehavior_score);
} }
obj.pushKV("startingheight", statestats.m_starting_height);
obj.pushKV("synced_headers", statestats.nSyncHeight); obj.pushKV("synced_headers", statestats.nSyncHeight);
obj.pushKV("synced_blocks", statestats.nCommonHeight); obj.pushKV("synced_blocks", statestats.nCommonHeight);
UniValue heights(UniValue::VARR); UniValue heights(UniValue::VARR);