mirror of
https://github.com/bitcoin/bitcoin.git
synced 2025-02-13 11:25:02 -05:00
Move vRecvGetData to net processing
This commit is contained in:
parent
673247b58c
commit
2d9f2fca43
2 changed files with 16 additions and 10 deletions
|
@ -848,7 +848,6 @@ public:
|
||||||
|
|
||||||
RecursiveMutex cs_sendProcessing;
|
RecursiveMutex cs_sendProcessing;
|
||||||
|
|
||||||
std::deque<CInv> vRecvGetData;
|
|
||||||
uint64_t nRecvBytes GUARDED_BY(cs_vRecv){0};
|
uint64_t nRecvBytes GUARDED_BY(cs_vRecv){0};
|
||||||
|
|
||||||
std::atomic<int64_t> nLastSend{0};
|
std::atomic<int64_t> nLastSend{0};
|
||||||
|
|
|
@ -515,6 +515,9 @@ struct Peer {
|
||||||
/** Set of txids to reconsider once their parent transactions have been accepted **/
|
/** Set of txids to reconsider once their parent transactions have been accepted **/
|
||||||
std::set<uint256> m_orphan_work_set GUARDED_BY(g_cs_orphans);
|
std::set<uint256> m_orphan_work_set GUARDED_BY(g_cs_orphans);
|
||||||
|
|
||||||
|
/** Work queue of items requested by this peer **/
|
||||||
|
std::deque<CInv> vRecvGetData;
|
||||||
|
|
||||||
Peer(NodeId id) : m_id(id) {}
|
Peer(NodeId id) : m_id(id) {}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -1754,7 +1757,10 @@ void static ProcessGetData(CNode& pfrom, const CChainParams& chainparams, CConnm
|
||||||
{
|
{
|
||||||
AssertLockNotHeld(cs_main);
|
AssertLockNotHeld(cs_main);
|
||||||
|
|
||||||
std::deque<CInv>::iterator it = pfrom.vRecvGetData.begin();
|
PeerRef peer = GetPeerRef(pfrom.GetId());
|
||||||
|
if (peer == nullptr) return;
|
||||||
|
|
||||||
|
std::deque<CInv>::iterator it = peer->vRecvGetData.begin();
|
||||||
std::vector<CInv> vNotFound;
|
std::vector<CInv> vNotFound;
|
||||||
const CNetMsgMaker msgMaker(pfrom.GetCommonVersion());
|
const CNetMsgMaker msgMaker(pfrom.GetCommonVersion());
|
||||||
|
|
||||||
|
@ -1766,7 +1772,7 @@ void static ProcessGetData(CNode& pfrom, const CChainParams& chainparams, CConnm
|
||||||
// Process as many TX items from the front of the getdata queue as
|
// Process as many TX items from the front of the getdata queue as
|
||||||
// possible, since they're common and it's efficient to batch process
|
// possible, since they're common and it's efficient to batch process
|
||||||
// them.
|
// them.
|
||||||
while (it != pfrom.vRecvGetData.end() && it->IsGenTxMsg()) {
|
while (it != peer->vRecvGetData.end() && it->IsGenTxMsg()) {
|
||||||
if (interruptMsgProc) return;
|
if (interruptMsgProc) return;
|
||||||
// The send buffer provides backpressure. If there's no space in
|
// The send buffer provides backpressure. If there's no space in
|
||||||
// the buffer, pause processing until the next call.
|
// the buffer, pause processing until the next call.
|
||||||
|
@ -1814,7 +1820,7 @@ void static ProcessGetData(CNode& pfrom, const CChainParams& chainparams, CConnm
|
||||||
|
|
||||||
// Only process one BLOCK item per call, since they're uncommon and can be
|
// Only process one BLOCK item per call, since they're uncommon and can be
|
||||||
// expensive to process.
|
// expensive to process.
|
||||||
if (it != pfrom.vRecvGetData.end() && !pfrom.fPauseSend) {
|
if (it != peer->vRecvGetData.end() && !pfrom.fPauseSend) {
|
||||||
const CInv &inv = *it++;
|
const CInv &inv = *it++;
|
||||||
if (inv.IsGenBlkMsg()) {
|
if (inv.IsGenBlkMsg()) {
|
||||||
ProcessGetBlockData(pfrom, chainparams, inv, connman);
|
ProcessGetBlockData(pfrom, chainparams, inv, connman);
|
||||||
|
@ -1823,7 +1829,7 @@ void static ProcessGetData(CNode& pfrom, const CChainParams& chainparams, CConnm
|
||||||
// and continue processing the queue on the next call.
|
// and continue processing the queue on the next call.
|
||||||
}
|
}
|
||||||
|
|
||||||
pfrom.vRecvGetData.erase(pfrom.vRecvGetData.begin(), it);
|
peer->vRecvGetData.erase(peer->vRecvGetData.begin(), it);
|
||||||
|
|
||||||
if (!vNotFound.empty()) {
|
if (!vNotFound.empty()) {
|
||||||
// Let the peer know that we didn't find what it asked for, so it doesn't
|
// Let the peer know that we didn't find what it asked for, so it doesn't
|
||||||
|
@ -2805,7 +2811,7 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
|
||||||
LogPrint(BCLog::NET, "received getdata for: %s peer=%d\n", vInv[0].ToString(), pfrom.GetId());
|
LogPrint(BCLog::NET, "received getdata for: %s peer=%d\n", vInv[0].ToString(), pfrom.GetId());
|
||||||
}
|
}
|
||||||
|
|
||||||
pfrom.vRecvGetData.insert(pfrom.vRecvGetData.end(), vInv.begin(), vInv.end());
|
peer->vRecvGetData.insert(peer->vRecvGetData.end(), vInv.begin(), vInv.end());
|
||||||
ProcessGetData(pfrom, m_chainparams, m_connman, m_mempool, interruptMsgProc);
|
ProcessGetData(pfrom, m_chainparams, m_connman, m_mempool, interruptMsgProc);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -2914,7 +2920,7 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
|
||||||
CInv inv;
|
CInv inv;
|
||||||
inv.type = State(pfrom.GetId())->fWantsCmpctWitness ? MSG_WITNESS_BLOCK : MSG_BLOCK;
|
inv.type = State(pfrom.GetId())->fWantsCmpctWitness ? MSG_WITNESS_BLOCK : MSG_BLOCK;
|
||||||
inv.hash = req.blockhash;
|
inv.hash = req.blockhash;
|
||||||
pfrom.vRecvGetData.push_back(inv);
|
peer->vRecvGetData.push_back(inv);
|
||||||
// The message processing loop will go around again (without pausing) and we'll respond then (without cs_main)
|
// The message processing loop will go around again (without pausing) and we'll respond then (without cs_main)
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -3873,8 +3879,9 @@ bool PeerManager::ProcessMessages(CNode* pfrom, std::atomic<bool>& interruptMsgP
|
||||||
PeerRef peer = GetPeerRef(pfrom->GetId());
|
PeerRef peer = GetPeerRef(pfrom->GetId());
|
||||||
if (peer == nullptr) return false;
|
if (peer == nullptr) return false;
|
||||||
|
|
||||||
if (!pfrom->vRecvGetData.empty())
|
if (!peer->vRecvGetData.empty()) {
|
||||||
ProcessGetData(*pfrom, m_chainparams, m_connman, m_mempool, interruptMsgProc);
|
ProcessGetData(*pfrom, m_chainparams, m_connman, m_mempool, interruptMsgProc);
|
||||||
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
LOCK2(cs_main, g_cs_orphans);
|
LOCK2(cs_main, g_cs_orphans);
|
||||||
|
@ -3888,7 +3895,7 @@ bool PeerManager::ProcessMessages(CNode* pfrom, std::atomic<bool>& interruptMsgP
|
||||||
|
|
||||||
// this maintains the order of responses
|
// this maintains the order of responses
|
||||||
// and prevents vRecvGetData to grow unbounded
|
// and prevents vRecvGetData to grow unbounded
|
||||||
if (!pfrom->vRecvGetData.empty()) return true;
|
if (!peer->vRecvGetData.empty()) return true;
|
||||||
{
|
{
|
||||||
LOCK(g_cs_orphans);
|
LOCK(g_cs_orphans);
|
||||||
if (!peer->m_orphan_work_set.empty()) return true;
|
if (!peer->m_orphan_work_set.empty()) return true;
|
||||||
|
@ -3921,7 +3928,7 @@ bool PeerManager::ProcessMessages(CNode* pfrom, std::atomic<bool>& interruptMsgP
|
||||||
ProcessMessage(*pfrom, msg_type, msg.m_recv, msg.m_time, interruptMsgProc);
|
ProcessMessage(*pfrom, msg_type, msg.m_recv, msg.m_time, interruptMsgProc);
|
||||||
if (interruptMsgProc)
|
if (interruptMsgProc)
|
||||||
return false;
|
return false;
|
||||||
if (!pfrom->vRecvGetData.empty())
|
if (!peer->vRecvGetData.empty())
|
||||||
fMoreWork = true;
|
fMoreWork = true;
|
||||||
} catch (const std::exception& e) {
|
} catch (const std::exception& e) {
|
||||||
LogPrint(BCLog::NET, "%s(%s, %u bytes): Exception '%s' (%s) caught\n", __func__, SanitizeString(msg_type), nMessageSize, e.what(), typeid(e).name());
|
LogPrint(BCLog::NET, "%s(%s, %u bytes): Exception '%s' (%s) caught\n", __func__, SanitizeString(msg_type), nMessageSize, e.what(), typeid(e).name());
|
||||||
|
|
Loading…
Add table
Reference in a new issue