mirror of
https://github.com/bitcoin/bitcoin.git
synced 2025-02-03 09:56:38 -05:00
[net] Pass nRecvFloodSize to CNode
This commit is contained in:
parent
860402ef2e
commit
cd0c8eeb09
5 changed files with 19 additions and 17 deletions
19
src/net.cpp
19
src/net.cpp
|
@ -573,7 +573,10 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCo
|
||||||
pszDest ? pszDest : "",
|
pszDest ? pszDest : "",
|
||||||
conn_type,
|
conn_type,
|
||||||
/*inbound_onion=*/false,
|
/*inbound_onion=*/false,
|
||||||
CNodeOptions{ .i2p_sam_session = std::move(i2p_transient_session) });
|
CNodeOptions{
|
||||||
|
.i2p_sam_session = std::move(i2p_transient_session),
|
||||||
|
.recv_flood_size = nReceiveFloodSize,
|
||||||
|
});
|
||||||
pnode->AddRef();
|
pnode->AddRef();
|
||||||
|
|
||||||
// We're making a new connection, harvest entropy from the time (and our peer count)
|
// We're making a new connection, harvest entropy from the time (and our peer count)
|
||||||
|
@ -1053,6 +1056,7 @@ void CConnman::CreateNodeFromAcceptedSocket(std::unique_ptr<Sock>&& sock,
|
||||||
CNodeOptions{
|
CNodeOptions{
|
||||||
.permission_flags = permission_flags,
|
.permission_flags = permission_flags,
|
||||||
.prefer_evict = discouraged,
|
.prefer_evict = discouraged,
|
||||||
|
.recv_flood_size = nReceiveFloodSize,
|
||||||
});
|
});
|
||||||
pnode->AddRef();
|
pnode->AddRef();
|
||||||
m_msgproc->InitializeNode(*pnode, nodeServices);
|
m_msgproc->InitializeNode(*pnode, nodeServices);
|
||||||
|
@ -1328,7 +1332,7 @@ void CConnman::SocketHandlerConnected(const std::vector<CNode*>& nodes,
|
||||||
}
|
}
|
||||||
RecordBytesRecv(nBytes);
|
RecordBytesRecv(nBytes);
|
||||||
if (notify) {
|
if (notify) {
|
||||||
pnode->MarkReceivedMsgsForProcessing(nReceiveFloodSize);
|
pnode->MarkReceivedMsgsForProcessing();
|
||||||
WakeMessageHandler();
|
WakeMessageHandler();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2754,8 +2758,6 @@ ServiceFlags CConnman::GetLocalServices() const
|
||||||
return nLocalServices;
|
return nLocalServices;
|
||||||
}
|
}
|
||||||
|
|
||||||
unsigned int CConnman::GetReceiveFloodSize() const { return nReceiveFloodSize; }
|
|
||||||
|
|
||||||
CNode::CNode(NodeId idIn,
|
CNode::CNode(NodeId idIn,
|
||||||
std::shared_ptr<Sock> sock,
|
std::shared_ptr<Sock> sock,
|
||||||
const CAddress& addrIn,
|
const CAddress& addrIn,
|
||||||
|
@ -2780,6 +2782,7 @@ CNode::CNode(NodeId idIn,
|
||||||
m_conn_type{conn_type_in},
|
m_conn_type{conn_type_in},
|
||||||
id{idIn},
|
id{idIn},
|
||||||
nLocalHostNonce{nLocalHostNonceIn},
|
nLocalHostNonce{nLocalHostNonceIn},
|
||||||
|
m_recv_flood_size{node_opts.recv_flood_size},
|
||||||
m_i2p_sam_session{std::move(node_opts.i2p_sam_session)}
|
m_i2p_sam_session{std::move(node_opts.i2p_sam_session)}
|
||||||
{
|
{
|
||||||
if (inbound_onion) assert(conn_type_in == ConnectionType::INBOUND);
|
if (inbound_onion) assert(conn_type_in == ConnectionType::INBOUND);
|
||||||
|
@ -2795,7 +2798,7 @@ CNode::CNode(NodeId idIn,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void CNode::MarkReceivedMsgsForProcessing(unsigned int recv_flood_size)
|
void CNode::MarkReceivedMsgsForProcessing()
|
||||||
{
|
{
|
||||||
AssertLockNotHeld(m_msg_process_queue_mutex);
|
AssertLockNotHeld(m_msg_process_queue_mutex);
|
||||||
|
|
||||||
|
@ -2809,10 +2812,10 @@ void CNode::MarkReceivedMsgsForProcessing(unsigned int recv_flood_size)
|
||||||
LOCK(m_msg_process_queue_mutex);
|
LOCK(m_msg_process_queue_mutex);
|
||||||
m_msg_process_queue.splice(m_msg_process_queue.end(), vRecvMsg);
|
m_msg_process_queue.splice(m_msg_process_queue.end(), vRecvMsg);
|
||||||
m_msg_process_queue_size += nSizeAdded;
|
m_msg_process_queue_size += nSizeAdded;
|
||||||
fPauseRecv = m_msg_process_queue_size > recv_flood_size;
|
fPauseRecv = m_msg_process_queue_size > m_recv_flood_size;
|
||||||
}
|
}
|
||||||
|
|
||||||
std::optional<std::pair<CNetMessage, bool>> CNode::PollMessage(size_t recv_flood_size)
|
std::optional<std::pair<CNetMessage, bool>> CNode::PollMessage()
|
||||||
{
|
{
|
||||||
LOCK(m_msg_process_queue_mutex);
|
LOCK(m_msg_process_queue_mutex);
|
||||||
if (m_msg_process_queue.empty()) return std::nullopt;
|
if (m_msg_process_queue.empty()) return std::nullopt;
|
||||||
|
@ -2821,7 +2824,7 @@ std::optional<std::pair<CNetMessage, bool>> CNode::PollMessage(size_t recv_flood
|
||||||
// Just take one message
|
// Just take one message
|
||||||
msgs.splice(msgs.begin(), m_msg_process_queue, m_msg_process_queue.begin());
|
msgs.splice(msgs.begin(), m_msg_process_queue, m_msg_process_queue.begin());
|
||||||
m_msg_process_queue_size -= msgs.front().m_raw_message_size;
|
m_msg_process_queue_size -= msgs.front().m_raw_message_size;
|
||||||
fPauseRecv = m_msg_process_queue_size > recv_flood_size;
|
fPauseRecv = m_msg_process_queue_size > m_recv_flood_size;
|
||||||
|
|
||||||
return std::make_pair(std::move(msgs.front()), !m_msg_process_queue.empty());
|
return std::make_pair(std::move(msgs.front()), !m_msg_process_queue.empty());
|
||||||
}
|
}
|
||||||
|
|
|
@ -350,6 +350,7 @@ struct CNodeOptions
|
||||||
NetPermissionFlags permission_flags = NetPermissionFlags::None;
|
NetPermissionFlags permission_flags = NetPermissionFlags::None;
|
||||||
std::unique_ptr<i2p::sam::Session> i2p_sam_session = nullptr;
|
std::unique_ptr<i2p::sam::Session> i2p_sam_session = nullptr;
|
||||||
bool prefer_evict = false;
|
bool prefer_evict = false;
|
||||||
|
size_t recv_flood_size{DEFAULT_MAXRECEIVEBUFFER * 1000};
|
||||||
};
|
};
|
||||||
|
|
||||||
/** Information about a peer */
|
/** Information about a peer */
|
||||||
|
@ -421,7 +422,7 @@ public:
|
||||||
const ConnectionType m_conn_type;
|
const ConnectionType m_conn_type;
|
||||||
|
|
||||||
/** Move all messages from the received queue to the processing queue. */
|
/** Move all messages from the received queue to the processing queue. */
|
||||||
void MarkReceivedMsgsForProcessing(unsigned int recv_flood_size)
|
void MarkReceivedMsgsForProcessing()
|
||||||
EXCLUSIVE_LOCKS_REQUIRED(!m_msg_process_queue_mutex);
|
EXCLUSIVE_LOCKS_REQUIRED(!m_msg_process_queue_mutex);
|
||||||
|
|
||||||
/** Poll the next message from the processing queue of this connection.
|
/** Poll the next message from the processing queue of this connection.
|
||||||
|
@ -429,7 +430,7 @@ public:
|
||||||
* Returns std::nullopt if the processing queue is empty, or a pair
|
* Returns std::nullopt if the processing queue is empty, or a pair
|
||||||
* consisting of the message and a bool that indicates if the processing
|
* consisting of the message and a bool that indicates if the processing
|
||||||
* queue has more entries. */
|
* queue has more entries. */
|
||||||
std::optional<std::pair<CNetMessage, bool>> PollMessage(size_t recv_flood_size)
|
std::optional<std::pair<CNetMessage, bool>> PollMessage()
|
||||||
EXCLUSIVE_LOCKS_REQUIRED(!m_msg_process_queue_mutex);
|
EXCLUSIVE_LOCKS_REQUIRED(!m_msg_process_queue_mutex);
|
||||||
|
|
||||||
/** Account for the total size of a sent message in the per msg type connection stats. */
|
/** Account for the total size of a sent message in the per msg type connection stats. */
|
||||||
|
@ -621,6 +622,7 @@ private:
|
||||||
const uint64_t nLocalHostNonce;
|
const uint64_t nLocalHostNonce;
|
||||||
std::atomic<int> m_greatest_common_version{INIT_PROTO_VERSION};
|
std::atomic<int> m_greatest_common_version{INIT_PROTO_VERSION};
|
||||||
|
|
||||||
|
const size_t m_recv_flood_size;
|
||||||
std::list<CNetMessage> vRecvMsg; // Used only by SocketHandler thread
|
std::list<CNetMessage> vRecvMsg; // Used only by SocketHandler thread
|
||||||
|
|
||||||
Mutex m_msg_process_queue_mutex;
|
Mutex m_msg_process_queue_mutex;
|
||||||
|
@ -883,8 +885,6 @@ public:
|
||||||
/** Get a unique deterministic randomizer. */
|
/** Get a unique deterministic randomizer. */
|
||||||
CSipHasher GetDeterministicRandomizer(uint64_t id) const;
|
CSipHasher GetDeterministicRandomizer(uint64_t id) const;
|
||||||
|
|
||||||
unsigned int GetReceiveFloodSize() const;
|
|
||||||
|
|
||||||
void WakeMessageHandler() EXCLUSIVE_LOCKS_REQUIRED(!mutexMsgProc);
|
void WakeMessageHandler() EXCLUSIVE_LOCKS_REQUIRED(!mutexMsgProc);
|
||||||
|
|
||||||
/** Return true if we should disconnect the peer for failing an inactivity check. */
|
/** Return true if we should disconnect the peer for failing an inactivity check. */
|
||||||
|
|
|
@ -4887,7 +4887,7 @@ bool PeerManagerImpl::ProcessMessages(CNode* pfrom, std::atomic<bool>& interrupt
|
||||||
// Don't bother if send buffer is too full to respond anyway
|
// Don't bother if send buffer is too full to respond anyway
|
||||||
if (pfrom->fPauseSend) return false;
|
if (pfrom->fPauseSend) return false;
|
||||||
|
|
||||||
auto poll_result{pfrom->PollMessage(m_connman.GetReceiveFloodSize())};
|
auto poll_result{pfrom->PollMessage()};
|
||||||
if (!poll_result) {
|
if (!poll_result) {
|
||||||
// No message to process
|
// No message to process
|
||||||
return false;
|
return false;
|
||||||
|
|
|
@ -125,7 +125,6 @@ FUZZ_TARGET_INIT(connman, initialize_connman)
|
||||||
std::vector<CNodeStats> stats;
|
std::vector<CNodeStats> stats;
|
||||||
connman.GetNodeStats(stats);
|
connman.GetNodeStats(stats);
|
||||||
(void)connman.GetOutboundTargetBytesLeft();
|
(void)connman.GetOutboundTargetBytesLeft();
|
||||||
(void)connman.GetReceiveFloodSize();
|
|
||||||
(void)connman.GetTotalBytesRecv();
|
(void)connman.GetTotalBytesRecv();
|
||||||
(void)connman.GetTotalBytesSent();
|
(void)connman.GetTotalBytesSent();
|
||||||
(void)connman.GetTryNewOutboundPeer();
|
(void)connman.GetTryNewOutboundPeer();
|
||||||
|
|
|
@ -66,7 +66,7 @@ void ConnmanTestMsg::NodeReceiveMsgBytes(CNode& node, Span<const uint8_t> msg_by
|
||||||
{
|
{
|
||||||
assert(node.ReceiveMsgBytes(msg_bytes, complete));
|
assert(node.ReceiveMsgBytes(msg_bytes, complete));
|
||||||
if (complete) {
|
if (complete) {
|
||||||
node.MarkReceivedMsgsForProcessing(nReceiveFloodSize);
|
node.MarkReceivedMsgsForProcessing();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue