mirror of
https://github.com/bitcoin/bitcoin.git
synced 2025-02-08 10:31:50 -05:00
net: add NetEventsInterface::g_msgproc_mutex
There are many cases where we assume message processing is single-threaded in order for how we access node-related memory to be safe. Add an explicit mutex that we can use to document this, which allows the compiler to catch any cases where we try to access that memory from other threads and break that assumption.
This commit is contained in:
parent
124e75a41e
commit
1e78f566d5
10 changed files with 35 additions and 9 deletions
|
@ -1978,8 +1978,12 @@ void CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFai
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Mutex NetEventsInterface::g_msgproc_mutex;
|
||||||
|
|
||||||
void CConnman::ThreadMessageHandler()
|
void CConnman::ThreadMessageHandler()
|
||||||
{
|
{
|
||||||
|
LOCK(NetEventsInterface::g_msgproc_mutex);
|
||||||
|
|
||||||
SetSyscallSandboxPolicy(SyscallSandboxPolicy::MESSAGE_HANDLER);
|
SetSyscallSandboxPolicy(SyscallSandboxPolicy::MESSAGE_HANDLER);
|
||||||
while (!flagInterruptMsgProc)
|
while (!flagInterruptMsgProc)
|
||||||
{
|
{
|
||||||
|
|
|
@ -629,6 +629,9 @@ private:
|
||||||
class NetEventsInterface
|
class NetEventsInterface
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
|
/** Mutex for anything that is only accessed via the msg processing thread */
|
||||||
|
static Mutex g_msgproc_mutex;
|
||||||
|
|
||||||
/** Initialize a peer (setup state, queue any initial messages) */
|
/** Initialize a peer (setup state, queue any initial messages) */
|
||||||
virtual void InitializeNode(CNode& node, ServiceFlags our_services) = 0;
|
virtual void InitializeNode(CNode& node, ServiceFlags our_services) = 0;
|
||||||
|
|
||||||
|
@ -642,7 +645,7 @@ public:
|
||||||
* @param[in] interrupt Interrupt condition for processing threads
|
* @param[in] interrupt Interrupt condition for processing threads
|
||||||
* @return True if there is more work to be done
|
* @return True if there is more work to be done
|
||||||
*/
|
*/
|
||||||
virtual bool ProcessMessages(CNode* pnode, std::atomic<bool>& interrupt) = 0;
|
virtual bool ProcessMessages(CNode* pnode, std::atomic<bool>& interrupt) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex) = 0;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Send queued protocol messages to a given node.
|
* Send queued protocol messages to a given node.
|
||||||
|
@ -650,7 +653,7 @@ public:
|
||||||
* @param[in] pnode The node which we are sending messages to.
|
* @param[in] pnode The node which we are sending messages to.
|
||||||
* @return True if there is more work to be done
|
* @return True if there is more work to be done
|
||||||
*/
|
*/
|
||||||
virtual bool SendMessages(CNode* pnode) EXCLUSIVE_LOCKS_REQUIRED(pnode->cs_sendProcessing) = 0;
|
virtual bool SendMessages(CNode* pnode) EXCLUSIVE_LOCKS_REQUIRED(pnode->cs_sendProcessing) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex) = 0;
|
||||||
|
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
|
|
|
@ -515,9 +515,9 @@ public:
|
||||||
void InitializeNode(CNode& node, ServiceFlags our_services) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
|
void InitializeNode(CNode& node, ServiceFlags our_services) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
|
||||||
void FinalizeNode(const CNode& node) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_headers_presync_mutex);
|
void FinalizeNode(const CNode& node) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_headers_presync_mutex);
|
||||||
bool ProcessMessages(CNode* pfrom, std::atomic<bool>& interrupt) override
|
bool ProcessMessages(CNode* pfrom, std::atomic<bool>& interrupt) override
|
||||||
EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_recent_confirmed_transactions_mutex, !m_most_recent_block_mutex, !m_headers_presync_mutex);
|
EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_recent_confirmed_transactions_mutex, !m_most_recent_block_mutex, !m_headers_presync_mutex, g_msgproc_mutex);
|
||||||
bool SendMessages(CNode* pto) override EXCLUSIVE_LOCKS_REQUIRED(pto->cs_sendProcessing)
|
bool SendMessages(CNode* pto) override EXCLUSIVE_LOCKS_REQUIRED(pto->cs_sendProcessing)
|
||||||
EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_recent_confirmed_transactions_mutex, !m_most_recent_block_mutex);
|
EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_recent_confirmed_transactions_mutex, !m_most_recent_block_mutex, g_msgproc_mutex);
|
||||||
|
|
||||||
/** Implement PeerManager */
|
/** Implement PeerManager */
|
||||||
void StartScheduledTasks(CScheduler& scheduler) override;
|
void StartScheduledTasks(CScheduler& scheduler) override;
|
||||||
|
@ -532,7 +532,7 @@ public:
|
||||||
void UnitTestMisbehaving(NodeId peer_id, int howmuch) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex) { Misbehaving(*Assert(GetPeerRef(peer_id)), howmuch, ""); };
|
void UnitTestMisbehaving(NodeId peer_id, int howmuch) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex) { Misbehaving(*Assert(GetPeerRef(peer_id)), howmuch, ""); };
|
||||||
void ProcessMessage(CNode& pfrom, const std::string& msg_type, CDataStream& vRecv,
|
void ProcessMessage(CNode& pfrom, const std::string& msg_type, CDataStream& vRecv,
|
||||||
const std::chrono::microseconds time_received, const std::atomic<bool>& interruptMsgProc) override
|
const std::chrono::microseconds time_received, const std::atomic<bool>& interruptMsgProc) override
|
||||||
EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_recent_confirmed_transactions_mutex, !m_most_recent_block_mutex, !m_headers_presync_mutex);
|
EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_recent_confirmed_transactions_mutex, !m_most_recent_block_mutex, !m_headers_presync_mutex, g_msgproc_mutex);
|
||||||
void UpdateLastBlockAnnounceTime(NodeId node, int64_t time_in_seconds) override;
|
void UpdateLastBlockAnnounceTime(NodeId node, int64_t time_in_seconds) override;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
@ -3135,6 +3135,8 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
|
||||||
const std::chrono::microseconds time_received,
|
const std::chrono::microseconds time_received,
|
||||||
const std::atomic<bool>& interruptMsgProc)
|
const std::atomic<bool>& interruptMsgProc)
|
||||||
{
|
{
|
||||||
|
AssertLockHeld(g_msgproc_mutex);
|
||||||
|
|
||||||
LogPrint(BCLog::NET, "received: %s (%u bytes) peer=%d\n", SanitizeString(msg_type), vRecv.size(), pfrom.GetId());
|
LogPrint(BCLog::NET, "received: %s (%u bytes) peer=%d\n", SanitizeString(msg_type), vRecv.size(), pfrom.GetId());
|
||||||
|
|
||||||
PeerRef peer = GetPeerRef(pfrom.GetId());
|
PeerRef peer = GetPeerRef(pfrom.GetId());
|
||||||
|
@ -4748,6 +4750,8 @@ bool PeerManagerImpl::MaybeDiscourageAndDisconnect(CNode& pnode, Peer& peer)
|
||||||
|
|
||||||
bool PeerManagerImpl::ProcessMessages(CNode* pfrom, std::atomic<bool>& interruptMsgProc)
|
bool PeerManagerImpl::ProcessMessages(CNode* pfrom, std::atomic<bool>& interruptMsgProc)
|
||||||
{
|
{
|
||||||
|
AssertLockHeld(g_msgproc_mutex);
|
||||||
|
|
||||||
bool fMoreWork = false;
|
bool fMoreWork = false;
|
||||||
|
|
||||||
PeerRef peer = GetPeerRef(pfrom->GetId());
|
PeerRef peer = GetPeerRef(pfrom->GetId());
|
||||||
|
@ -5240,6 +5244,8 @@ bool PeerManagerImpl::SetupAddressRelay(const CNode& node, Peer& peer)
|
||||||
|
|
||||||
bool PeerManagerImpl::SendMessages(CNode* pto)
|
bool PeerManagerImpl::SendMessages(CNode* pto)
|
||||||
{
|
{
|
||||||
|
AssertLockHeld(g_msgproc_mutex);
|
||||||
|
|
||||||
PeerRef peer = GetPeerRef(pto->GetId());
|
PeerRef peer = GetPeerRef(pto->GetId());
|
||||||
if (!peer) return false;
|
if (!peer) return false;
|
||||||
const Consensus::Params& consensusParams = m_chainparams.GetConsensus();
|
const Consensus::Params& consensusParams = m_chainparams.GetConsensus();
|
||||||
|
|
|
@ -84,7 +84,7 @@ public:
|
||||||
|
|
||||||
/** Process a single message from a peer. Public for fuzz testing */
|
/** Process a single message from a peer. Public for fuzz testing */
|
||||||
virtual void ProcessMessage(CNode& pfrom, const std::string& msg_type, CDataStream& vRecv,
|
virtual void ProcessMessage(CNode& pfrom, const std::string& msg_type, CDataStream& vRecv,
|
||||||
const std::chrono::microseconds time_received, const std::atomic<bool>& interruptMsgProc) = 0;
|
const std::chrono::microseconds time_received, const std::atomic<bool>& interruptMsgProc) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex) = 0;
|
||||||
|
|
||||||
/** This function is used for testing the stale tip eviction logic, see denialofservice_tests.cpp */
|
/** This function is used for testing the stale tip eviction logic, see denialofservice_tests.cpp */
|
||||||
virtual void UpdateLastBlockAnnounceTime(NodeId node, int64_t time_in_seconds) = 0;
|
virtual void UpdateLastBlockAnnounceTime(NodeId node, int64_t time_in_seconds) = 0;
|
||||||
|
|
|
@ -45,6 +45,8 @@ BOOST_FIXTURE_TEST_SUITE(denialofservice_tests, TestingSetup)
|
||||||
// work.
|
// work.
|
||||||
BOOST_AUTO_TEST_CASE(outbound_slow_chain_eviction)
|
BOOST_AUTO_TEST_CASE(outbound_slow_chain_eviction)
|
||||||
{
|
{
|
||||||
|
LOCK(NetEventsInterface::g_msgproc_mutex);
|
||||||
|
|
||||||
ConnmanTestMsg& connman = static_cast<ConnmanTestMsg&>(*m_node.connman);
|
ConnmanTestMsg& connman = static_cast<ConnmanTestMsg&>(*m_node.connman);
|
||||||
// Disable inactivity checks for this test to avoid interference
|
// Disable inactivity checks for this test to avoid interference
|
||||||
connman.SetPeerConnectTimeout(99999s);
|
connman.SetPeerConnectTimeout(99999s);
|
||||||
|
@ -274,6 +276,8 @@ BOOST_AUTO_TEST_CASE(block_relay_only_eviction)
|
||||||
|
|
||||||
BOOST_AUTO_TEST_CASE(peer_discouragement)
|
BOOST_AUTO_TEST_CASE(peer_discouragement)
|
||||||
{
|
{
|
||||||
|
LOCK(NetEventsInterface::g_msgproc_mutex);
|
||||||
|
|
||||||
auto banman = std::make_unique<BanMan>(m_args.GetDataDirBase() / "banlist", nullptr, DEFAULT_MISBEHAVING_BANTIME);
|
auto banman = std::make_unique<BanMan>(m_args.GetDataDirBase() / "banlist", nullptr, DEFAULT_MISBEHAVING_BANTIME);
|
||||||
auto connman = std::make_unique<ConnmanTestMsg>(0x1337, 0x1337, *m_node.addrman, *m_node.netgroupman);
|
auto connman = std::make_unique<ConnmanTestMsg>(0x1337, 0x1337, *m_node.addrman, *m_node.netgroupman);
|
||||||
auto peerLogic = PeerManager::make(*connman, *m_node.addrman, banman.get(),
|
auto peerLogic = PeerManager::make(*connman, *m_node.addrman, banman.get(),
|
||||||
|
@ -386,6 +390,8 @@ BOOST_AUTO_TEST_CASE(peer_discouragement)
|
||||||
|
|
||||||
BOOST_AUTO_TEST_CASE(DoS_bantime)
|
BOOST_AUTO_TEST_CASE(DoS_bantime)
|
||||||
{
|
{
|
||||||
|
LOCK(NetEventsInterface::g_msgproc_mutex);
|
||||||
|
|
||||||
auto banman = std::make_unique<BanMan>(m_args.GetDataDirBase() / "banlist", nullptr, DEFAULT_MISBEHAVING_BANTIME);
|
auto banman = std::make_unique<BanMan>(m_args.GetDataDirBase() / "banlist", nullptr, DEFAULT_MISBEHAVING_BANTIME);
|
||||||
auto connman = std::make_unique<CConnman>(0x1337, 0x1337, *m_node.addrman, *m_node.netgroupman);
|
auto connman = std::make_unique<CConnman>(0x1337, 0x1337, *m_node.addrman, *m_node.netgroupman);
|
||||||
auto peerLogic = PeerManager::make(*connman, *m_node.addrman, banman.get(),
|
auto peerLogic = PeerManager::make(*connman, *m_node.addrman, banman.get(),
|
||||||
|
|
|
@ -73,6 +73,8 @@ void fuzz_target(FuzzBufferType buffer, const std::string& LIMIT_TO_MESSAGE_TYPE
|
||||||
SetMockTime(1610000000); // any time to successfully reset ibd
|
SetMockTime(1610000000); // any time to successfully reset ibd
|
||||||
chainstate.ResetIbd();
|
chainstate.ResetIbd();
|
||||||
|
|
||||||
|
LOCK(NetEventsInterface::g_msgproc_mutex);
|
||||||
|
|
||||||
const std::string random_message_type{fuzzed_data_provider.ConsumeBytesAsString(CMessageHeader::COMMAND_SIZE).c_str()};
|
const std::string random_message_type{fuzzed_data_provider.ConsumeBytesAsString(CMessageHeader::COMMAND_SIZE).c_str()};
|
||||||
if (!LIMIT_TO_MESSAGE_TYPE.empty() && random_message_type != LIMIT_TO_MESSAGE_TYPE) {
|
if (!LIMIT_TO_MESSAGE_TYPE.empty() && random_message_type != LIMIT_TO_MESSAGE_TYPE) {
|
||||||
return;
|
return;
|
||||||
|
|
|
@ -40,6 +40,8 @@ FUZZ_TARGET_INIT(process_messages, initialize_process_messages)
|
||||||
SetMockTime(1610000000); // any time to successfully reset ibd
|
SetMockTime(1610000000); // any time to successfully reset ibd
|
||||||
chainstate.ResetIbd();
|
chainstate.ResetIbd();
|
||||||
|
|
||||||
|
LOCK(NetEventsInterface::g_msgproc_mutex);
|
||||||
|
|
||||||
std::vector<CNode*> peers;
|
std::vector<CNode*> peers;
|
||||||
const auto num_peers_to_add = fuzzed_data_provider.ConsumeIntegralInRange(1, 3);
|
const auto num_peers_to_add = fuzzed_data_provider.ConsumeIntegralInRange(1, 3);
|
||||||
for (int i = 0; i < num_peers_to_add; ++i) {
|
for (int i = 0; i < num_peers_to_add; ++i) {
|
||||||
|
|
|
@ -328,7 +328,7 @@ auto ConsumeNode(FuzzedDataProvider& fuzzed_data_provider, const std::optional<N
|
||||||
}
|
}
|
||||||
inline std::unique_ptr<CNode> ConsumeNodeAsUniquePtr(FuzzedDataProvider& fdp, const std::optional<NodeId>& node_id_in = std::nullopt) { return ConsumeNode<true>(fdp, node_id_in); }
|
inline std::unique_ptr<CNode> ConsumeNodeAsUniquePtr(FuzzedDataProvider& fdp, const std::optional<NodeId>& node_id_in = std::nullopt) { return ConsumeNode<true>(fdp, node_id_in); }
|
||||||
|
|
||||||
void FillNode(FuzzedDataProvider& fuzzed_data_provider, ConnmanTestMsg& connman, CNode& node) noexcept;
|
void FillNode(FuzzedDataProvider& fuzzed_data_provider, ConnmanTestMsg& connman, CNode& node) noexcept EXCLUSIVE_LOCKS_REQUIRED(NetEventsInterface::g_msgproc_mutex);
|
||||||
|
|
||||||
class FuzzedFileProvider
|
class FuzzedFileProvider
|
||||||
{
|
{
|
||||||
|
|
|
@ -805,6 +805,8 @@ BOOST_AUTO_TEST_CASE(LocalAddress_BasicLifecycle)
|
||||||
|
|
||||||
BOOST_AUTO_TEST_CASE(initial_advertise_from_version_message)
|
BOOST_AUTO_TEST_CASE(initial_advertise_from_version_message)
|
||||||
{
|
{
|
||||||
|
LOCK(NetEventsInterface::g_msgproc_mutex);
|
||||||
|
|
||||||
// Tests the following scenario:
|
// Tests the following scenario:
|
||||||
// * -bind=3.4.5.6:20001 is specified
|
// * -bind=3.4.5.6:20001 is specified
|
||||||
// * we make an outbound connection to a peer
|
// * we make an outbound connection to a peer
|
||||||
|
|
|
@ -44,9 +44,10 @@ struct ConnmanTestMsg : public CConnman {
|
||||||
ServiceFlags remote_services,
|
ServiceFlags remote_services,
|
||||||
ServiceFlags local_services,
|
ServiceFlags local_services,
|
||||||
int32_t version,
|
int32_t version,
|
||||||
bool relay_txs);
|
bool relay_txs)
|
||||||
|
EXCLUSIVE_LOCKS_REQUIRED(NetEventsInterface::g_msgproc_mutex);
|
||||||
|
|
||||||
void ProcessMessagesOnce(CNode& node) { m_msgproc->ProcessMessages(&node, flagInterruptMsgProc); }
|
void ProcessMessagesOnce(CNode& node) EXCLUSIVE_LOCKS_REQUIRED(NetEventsInterface::g_msgproc_mutex) { m_msgproc->ProcessMessages(&node, flagInterruptMsgProc); }
|
||||||
|
|
||||||
void NodeReceiveMsgBytes(CNode& node, Span<const uint8_t> msg_bytes, bool& complete) const;
|
void NodeReceiveMsgBytes(CNode& node, Span<const uint8_t> msg_bytes, bool& complete) const;
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue