mirror of
https://github.com/bitcoin/bitcoin.git
synced 2025-02-10 10:52:31 -05:00
net: add a new message queue for the message processor
This separates the storage of messages from the net and queued messages for processing, allowing the locks to be split.
This commit is contained in:
parent
c5a8b1b946
commit
4d712e366c
3 changed files with 24 additions and 16 deletions
12
src/net.cpp
12
src/net.cpp
|
@ -1239,9 +1239,19 @@ void CConnman::ThreadSocketHandler()
|
||||||
if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify))
|
if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify))
|
||||||
pnode->CloseSocketDisconnect();
|
pnode->CloseSocketDisconnect();
|
||||||
RecordBytesRecv(nBytes);
|
RecordBytesRecv(nBytes);
|
||||||
if (notify)
|
if (notify) {
|
||||||
|
auto it(pnode->vRecvMsg.begin());
|
||||||
|
for (; it != pnode->vRecvMsg.end(); ++it) {
|
||||||
|
if (!it->complete())
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
{
|
||||||
|
LOCK(pnode->cs_vProcessMsg);
|
||||||
|
pnode->vProcessMsg.splice(pnode->vProcessMsg.end(), pnode->vRecvMsg, pnode->vRecvMsg.begin(), it);
|
||||||
|
}
|
||||||
WakeMessageHandler();
|
WakeMessageHandler();
|
||||||
}
|
}
|
||||||
|
}
|
||||||
else if (nBytes == 0)
|
else if (nBytes == 0)
|
||||||
{
|
{
|
||||||
// socket closed gracefully
|
// socket closed gracefully
|
||||||
|
|
|
@ -608,6 +608,9 @@ public:
|
||||||
std::deque<std::vector<unsigned char>> vSendMsg;
|
std::deque<std::vector<unsigned char>> vSendMsg;
|
||||||
CCriticalSection cs_vSend;
|
CCriticalSection cs_vSend;
|
||||||
|
|
||||||
|
CCriticalSection cs_vProcessMsg;
|
||||||
|
std::list<CNetMessage> vProcessMsg;
|
||||||
|
|
||||||
std::deque<CInv> vRecvGetData;
|
std::deque<CInv> vRecvGetData;
|
||||||
std::list<CNetMessage> vRecvMsg;
|
std::list<CNetMessage> vRecvMsg;
|
||||||
CCriticalSection cs_vRecvMsg;
|
CCriticalSection cs_vRecvMsg;
|
||||||
|
|
|
@ -2468,21 +2468,16 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interru
|
||||||
if (pfrom->nSendSize >= nMaxSendBufferSize)
|
if (pfrom->nSendSize >= nMaxSendBufferSize)
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
auto it = pfrom->vRecvMsg.begin();
|
std::list<CNetMessage> msgs;
|
||||||
if (it == pfrom->vRecvMsg.end())
|
{
|
||||||
|
LOCK(pfrom->cs_vProcessMsg);
|
||||||
|
if (pfrom->vProcessMsg.empty())
|
||||||
return false;
|
return false;
|
||||||
|
// Just take one message
|
||||||
// end, if an incomplete message is found
|
msgs.splice(msgs.begin(), pfrom->vProcessMsg, pfrom->vProcessMsg.begin());
|
||||||
if (!it->complete())
|
fMoreWork = !pfrom->vProcessMsg.empty();
|
||||||
return false;
|
}
|
||||||
|
CNetMessage& msg(msgs.front());
|
||||||
// get next message
|
|
||||||
CNetMessage msg = std::move(*it);
|
|
||||||
|
|
||||||
// at this point, any failure means we can delete the current message
|
|
||||||
pfrom->vRecvMsg.erase(pfrom->vRecvMsg.begin());
|
|
||||||
|
|
||||||
fMoreWork = !pfrom->vRecvMsg.empty() && pfrom->vRecvMsg.front().complete();
|
|
||||||
|
|
||||||
msg.SetVersion(pfrom->GetRecvVersion());
|
msg.SetVersion(pfrom->GetRecvVersion());
|
||||||
// Scan for message start
|
// Scan for message start
|
||||||
|
|
Loading…
Add table
Reference in a new issue