mirror of
https://github.com/bitcoin/bitcoin.git
synced 2025-03-05 14:06:27 -05:00
Merge bitcoin/bitcoin#27257: refactor, net: End friendship of CNode, CConnman and ConnmanTestMsg
3566aa7d49 [net] Remove CNode friends (dergoegge)
3eac5e7cd1 [net] Add CNode helper for send byte accounting (dergoegge)
60441a3432 scripted-diff: [net] Rename CNode process queue members (dergoegge)
6693c499f7 [net] Make cs_vProcessMsg a non-recursive mutex (dergoegge)
23d9352654 [net] Make CNode msg process queue members private (dergoegge)
897e342d6e [net] Encapsulate CNode message polling (dergoegge)
cc5cdf8776 [net] Deduplicate marking received message for processing (dergoegge)
ad44aa5c64 [net] Add connection type getter to CNode (dergoegge)

Pull request description: We should define clear interfaces between CNode, CConnman and PeerManager. This PR makes a small step in that direction by ending the friendship of CNode, CConnman and ConnmanTestMsg. CNode's message processing queue is made private in the process and its mutex is turned into a non-recursive mutex.

ACKs for top commit:
jnewbery: utACK 3566aa7d49
vasild: ACK 3566aa7d49
theStack: re-ACK 3566aa7d49
brunoerg: re-ACK 3566aa7d49
Tree-SHA512: 26b87da5054e32401b693b2904e9c5f40e35a53937c0b6cf44b8597034ad07bacf27d87cdffc54d3e7ccfebde4231ef30a38d326f88cc18133bbb34688ead567
This commit is contained in: commit 2305643646
4 changed files with 72 additions and 47 deletions

src/net.cpp (52 changed lines):
@@ -917,7 +917,7 @@ bool CConnman::AttemptToEvictConnection()
                                                .m_is_local = node->addr.IsLocal(),
                                                .m_network = node->ConnectedThroughNetwork(),
                                                .m_noban = node->HasPermission(NetPermissionFlags::NoBan),
-                                               .m_conn_type = node->m_conn_type,
+                                               .m_conn_type = node->GetConnectionType(),
                                            };
             vEvictionCandidates.push_back(candidate);
         }
@@ -1092,7 +1092,7 @@ bool CConnman::AddConnection(const std::string& address, ConnectionType conn_type)
 
     // Count existing connections
     int existing_connections = WITH_LOCK(m_nodes_mutex,
-                                         return std::count_if(m_nodes.begin(), m_nodes.end(), [conn_type](CNode* node) { return node->m_conn_type == conn_type; }););
+                                         return std::count_if(m_nodes.begin(), m_nodes.end(), [conn_type](CNode* node) { return node->GetConnectionType() == conn_type; }););
 
     // Max connections of specified type already exist
     if (max_connections != std::nullopt && existing_connections >= max_connections) return false;
@@ -1328,18 +1328,7 @@ void CConnman::SocketHandlerConnected(const std::vector<CNode*>& nodes,
             }
             RecordBytesRecv(nBytes);
             if (notify) {
-                size_t nSizeAdded = 0;
-                for (const auto& msg : pnode->vRecvMsg) {
-                    // vRecvMsg contains only completed CNetMessage
-                    // the single possible partially deserialized message are held by TransportDeserializer
-                    nSizeAdded += msg.m_raw_message_size;
-                }
-                {
-                    LOCK(pnode->cs_vProcessMsg);
-                    pnode->vProcessMsg.splice(pnode->vProcessMsg.end(), pnode->vRecvMsg);
-                    pnode->nProcessQueueSize += nSizeAdded;
-                    pnode->fPauseRecv = pnode->nProcessQueueSize > nReceiveFloodSize;
-                }
+                pnode->MarkReceivedMsgsForProcessing(nReceiveFloodSize);
                 WakeMessageHandler();
             }
         }
@@ -1722,7 +1711,7 @@ void CConnman::ThreadOpenConnections(const std::vector<std::string> connect)
             if (pnode->IsBlockOnlyConn()) nOutboundBlockRelay++;
 
             // Make sure our persistent outbound slots belong to different netgroups.
-            switch (pnode->m_conn_type) {
+            switch (pnode->GetConnectionType()) {
                 // We currently don't take inbound connections into account. Since they are
                 // free to make, an attacker could make them to prevent us from connecting to
                 // certain peers.
@@ -2806,6 +2795,37 @@ CNode::CNode(NodeId idIn,
     }
 }
 
+void CNode::MarkReceivedMsgsForProcessing(unsigned int recv_flood_size)
+{
+    AssertLockNotHeld(m_msg_process_queue_mutex);
+
+    size_t nSizeAdded = 0;
+    for (const auto& msg : vRecvMsg) {
+        // vRecvMsg contains only completed CNetMessage
+        // the single possible partially deserialized message are held by TransportDeserializer
+        nSizeAdded += msg.m_raw_message_size;
+    }
+
+    LOCK(m_msg_process_queue_mutex);
+    m_msg_process_queue.splice(m_msg_process_queue.end(), vRecvMsg);
+    m_msg_process_queue_size += nSizeAdded;
+    fPauseRecv = m_msg_process_queue_size > recv_flood_size;
+}
+
+std::optional<std::pair<CNetMessage, bool>> CNode::PollMessage(size_t recv_flood_size)
+{
+    LOCK(m_msg_process_queue_mutex);
+    if (m_msg_process_queue.empty()) return std::nullopt;
+
+    std::list<CNetMessage> msgs;
+    // Just take one message
+    msgs.splice(msgs.begin(), m_msg_process_queue, m_msg_process_queue.begin());
+    m_msg_process_queue_size -= msgs.front().m_raw_message_size;
+    fPauseRecv = m_msg_process_queue_size > recv_flood_size;
+
+    return std::make_pair(std::move(msgs.front()), !m_msg_process_queue.empty());
+}
+
 bool CConnman::NodeFullyConnected(const CNode* pnode)
 {
     return pnode && pnode->fSuccessfullyConnected && !pnode->fDisconnect;
@@ -2840,7 +2860,7 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
         bool optimisticSend(pnode->vSendMsg.empty());
 
         //log total amount of bytes per message type
-        pnode->mapSendBytesPerMsgType[msg.m_type] += nTotalSize;
+        pnode->AccountForSentBytes(msg.m_type, nTotalSize);
         pnode->nSendSize += nTotalSize;
 
         if (pnode->nSendSize > nSendBufferMaxSize) pnode->fPauseSend = true;
src/net.h (35 changed lines):
@@ -347,9 +347,6 @@ struct CNodeOptions
 /** Information about a peer */
 class CNode
 {
-    friend class CConnman;
-    friend struct ConnmanTestMsg;
-
 public:
     const std::unique_ptr<TransportDeserializer> m_deserializer; // Used only by SocketHandler thread
     const std::unique_ptr<const TransportSerializer> m_serializer;
@@ -376,10 +373,6 @@ public:
     Mutex m_sock_mutex;
    Mutex cs_vRecv;
 
-    RecursiveMutex cs_vProcessMsg;
-    std::list<CNetMessage> vProcessMsg GUARDED_BY(cs_vProcessMsg);
-    size_t nProcessQueueSize GUARDED_BY(cs_vProcessMsg){0};
-
     uint64_t nRecvBytes GUARDED_BY(cs_vRecv){0};
 
     std::atomic<std::chrono::seconds> m_last_send{0s};
@@ -417,6 +410,30 @@ public:
     std::atomic_bool fPauseRecv{false};
     std::atomic_bool fPauseSend{false};
 
+    const ConnectionType& GetConnectionType() const
+    {
+        return m_conn_type;
+    }
+
+    /** Move all messages from the received queue to the processing queue. */
+    void MarkReceivedMsgsForProcessing(unsigned int recv_flood_size)
+        EXCLUSIVE_LOCKS_REQUIRED(!m_msg_process_queue_mutex);
+
+    /** Poll the next message from the processing queue of this connection.
+     *
+     * Returns std::nullopt if the processing queue is empty, or a pair
+     * consisting of the message and a bool that indicates if the processing
+     * queue has more entries. */
+    std::optional<std::pair<CNetMessage, bool>> PollMessage(size_t recv_flood_size)
+        EXCLUSIVE_LOCKS_REQUIRED(!m_msg_process_queue_mutex);
+
+    /** Account for the total size of a sent message in the per msg type connection stats. */
+    void AccountForSentBytes(const std::string& msg_type, size_t sent_bytes)
+        EXCLUSIVE_LOCKS_REQUIRED(cs_vSend)
+    {
+        mapSendBytesPerMsgType[msg_type] += sent_bytes;
+    }
+
     bool IsOutboundOrBlockRelayConn() const {
         switch (m_conn_type) {
             case ConnectionType::OUTBOUND_FULL_RELAY:
@@ -602,6 +619,10 @@ private:
 
     std::list<CNetMessage> vRecvMsg; // Used only by SocketHandler thread
 
+    Mutex m_msg_process_queue_mutex;
+    std::list<CNetMessage> m_msg_process_queue GUARDED_BY(m_msg_process_queue_mutex);
+    size_t m_msg_process_queue_size GUARDED_BY(m_msg_process_queue_mutex){0};
+
     // Our address, as reported by the peer
     CService addrLocal GUARDED_BY(m_addr_local_mutex);
     mutable Mutex m_addr_local_mutex;
src/net_processing.cpp (file header lost in capture — inferred from `PeerManagerImpl::ProcessMessages`; confirm against the upstream commit):

@@ -4860,8 +4860,6 @@ bool PeerManagerImpl::ProcessMessages(CNode* pfrom, std::atomic<bool>& interruptMsgProc)
 {
     AssertLockHeld(g_msgproc_mutex);
 
-    bool fMoreWork = false;
-
     PeerRef peer = GetPeerRef(pfrom->GetId());
     if (peer == nullptr) return false;
 
@@ -4889,17 +4887,14 @@ bool PeerManagerImpl::ProcessMessages(CNode* pfrom, std::atomic<bool>& interruptMsgProc)
     // Don't bother if send buffer is too full to respond anyway
     if (pfrom->fPauseSend) return false;
 
-    std::list<CNetMessage> msgs;
-    {
-        LOCK(pfrom->cs_vProcessMsg);
-        if (pfrom->vProcessMsg.empty()) return false;
-        // Just take one message
-        msgs.splice(msgs.begin(), pfrom->vProcessMsg, pfrom->vProcessMsg.begin());
-        pfrom->nProcessQueueSize -= msgs.front().m_raw_message_size;
-        pfrom->fPauseRecv = pfrom->nProcessQueueSize > m_connman.GetReceiveFloodSize();
-        fMoreWork = !pfrom->vProcessMsg.empty();
+    auto poll_result{pfrom->PollMessage(m_connman.GetReceiveFloodSize())};
+    if (!poll_result) {
+        // No message to process
+        return false;
     }
-    CNetMessage& msg(msgs.front());
+
+    CNetMessage& msg{poll_result->first};
+    bool fMoreWork = poll_result->second;
 
     TRACE6(net, inbound_message,
         pfrom->GetId(),
src/test/util/net.cpp (file header lost in capture — inferred from `ConnmanTestMsg::NodeReceiveMsgBytes`; confirm against the upstream commit):

@@ -66,18 +66,7 @@ void ConnmanTestMsg::NodeReceiveMsgBytes(CNode& node, Span<const uint8_t> msg_bytes, bool& complete) const
 {
     assert(node.ReceiveMsgBytes(msg_bytes, complete));
     if (complete) {
-        size_t nSizeAdded = 0;
-        for (const auto& msg : node.vRecvMsg) {
-            // vRecvMsg contains only completed CNetMessage
-            // the single possible partially deserialized message are held by TransportDeserializer
-            nSizeAdded += msg.m_raw_message_size;
-        }
-        {
-            LOCK(node.cs_vProcessMsg);
-            node.vProcessMsg.splice(node.vProcessMsg.end(), node.vRecvMsg);
-            node.nProcessQueueSize += nSizeAdded;
-            node.fPauseRecv = node.nProcessQueueSize > nReceiveFloodSize;
-        }
+        node.MarkReceivedMsgsForProcessing(nReceiveFloodSize);
     }
 }
Loading…
Add table
Reference in a new issue