0
0
Fork 0
mirror of https://github.com/bitcoin/bitcoin.git synced 2025-02-01 09:35:52 -05:00

net: count traffic bytes and number of messages globally

Before this change only per-peer stats were gathered. They
are lost when the peer disconnects.

So, collect the traffic stats globally in `CConnman`, broken down by
* direction (sent or received, (2))
* network of the peer (IPv4, IPv6, Tor, I2P, CJDNS (5))
* connection type (inbound, full outbound, feeler, etc, (6))
* message type (verack, ping, etc, (36))
This commit is contained in:
Vasil Dimov 2023-11-22 15:03:49 +01:00
parent 449a25b958
commit 8cbe02b3ae
No known key found for this signature in database
GPG key ID: 54DF06F64B55CBBF
6 changed files with 272 additions and 14 deletions

View file

@ -651,7 +651,7 @@ void CNode::CopyStats(CNodeStats& stats)
}
#undef X
bool CNode::ReceiveMsgBytes(Span<const uint8_t> msg_bytes, bool& complete)
bool CNode::ReceiveMsgBytes(Span<const uint8_t> msg_bytes, bool& complete, NetStats& net_stats)
{
complete = false;
const auto time = GetTime<std::chrono::microseconds>();
@ -673,6 +673,12 @@ bool CNode::ReceiveMsgBytes(Span<const uint8_t> msg_bytes, bool& complete)
// Message deserialization failed. Drop the message but don't disconnect the peer.
// store the size of the corrupt message
mapRecvBytesPerMsgType.at(NET_MESSAGE_TYPE_OTHER) += msg.m_raw_message_size;
net_stats.Record(NetStats::RECV,
ConnectedThroughNetwork(),
m_conn_type,
NET_MESSAGE_TYPE_OTHER,
/*num_messages=*/1,
msg.m_raw_message_size);
continue;
}
@ -684,6 +690,12 @@ bool CNode::ReceiveMsgBytes(Span<const uint8_t> msg_bytes, bool& complete)
}
assert(i != mapRecvBytesPerMsgType.end());
i->second += msg.m_raw_message_size;
net_stats.Record(NetStats::RECV,
ConnectedThroughNetwork(),
m_conn_type,
/*msg_type=*/i->first,
/*num_messages=*/1,
msg.m_raw_message_size);
// push the message to the process queue,
vRecvMsg.push_back(std::move(msg));
@ -1584,7 +1596,7 @@ Transport::Info V2Transport::GetInfo() const noexcept
return info;
}
std::pair<size_t, bool> CConnman::SocketSendData(CNode& node) const
std::pair<size_t, bool> CConnman::SocketSendData(CNode& node)
{
auto it = node.vSendMsg.begin();
size_t nSentSize = 0;
@ -1597,9 +1609,16 @@ std::pair<size_t, bool> CConnman::SocketSendData(CNode& node) const
// there is an existing message still being sent, or (for v2 transports) when the
// handshake has not yet completed.
size_t memusage = it->GetMemoryUsage();
const auto msg_type = it->m_type;
if (node.m_transport->SetMessageToSend(*it)) {
// Update memory usage of send buffer (as *it will be deleted).
node.m_send_memusage -= memusage;
m_net_stats.Record(NetStats::SENT,
node.ConnectedThroughNetwork(),
node.m_conn_type,
msg_type,
/*num_messages=*/1,
/*num_bytes=*/0);
++it;
}
}
@ -1635,6 +1654,12 @@ std::pair<size_t, bool> CConnman::SocketSendData(CNode& node) const
// Update statistics per message type.
if (!msg_type.empty()) { // don't report v2 handshake bytes for now
node.AccountForSentBytes(msg_type, nBytes);
m_net_stats.Record(NetStats::SENT,
node.ConnectedThroughNetwork(),
node.m_conn_type,
msg_type,
/*num_messages=*/0,
nBytes);
}
nSentSize += nBytes;
if ((size_t)nBytes != data.size()) {
@ -2145,7 +2170,7 @@ void CConnman::SocketHandlerConnected(const std::vector<CNode*>& nodes,
if (nBytes > 0)
{
bool notify = false;
if (!pnode->ReceiveMsgBytes({pchBuf, (size_t)nBytes}, notify)) {
if (!pnode->ReceiveMsgBytes({pchBuf, (size_t)nBytes}, notify, m_net_stats)) {
LogDebug(BCLog::NET,
"receiving message bytes failed, %s\n",
pnode->DisconnectMsg(fLogIPs)
@ -3669,6 +3694,150 @@ void CConnman::RecordBytesSent(uint64_t bytes)
nMaxOutboundTotalBytesSentInCycle += bytes;
}
NetStats::NetStats()
: m_msg_type_to_index{[]() {
MsgTypeToIndex m;
size_t i{0};
for (const auto& msg_type : ALL_NET_MESSAGE_TYPES) {
m[msg_type] = i++;
}
return m;
}()}
{
}
void NetStats::Record(Direction direction,
Network net,
ConnectionType conn_type,
const std::string& msg_type,
size_t num_messages,
size_t num_bytes)
{
auto& d = m_data.at(DirectionToIndex(direction))
.at(NetworkToIndex(net))
.at(ConnectionTypeToIndex(conn_type))
.at(MessageTypeToIndex(msg_type));
d.count += num_messages;
d.bytes += num_bytes;
}
void NetStats::ForEach(std::function<void(NetStats::Direction dir,
Network net,
ConnectionType con,
const std::string& msg,
const BytesAndCount& data)> func) const
{
for (size_t dir_i = 0; dir_i < m_data.size(); ++dir_i) {
for (size_t net_i = 0; net_i < m_data[dir_i].size(); ++net_i) {
for (size_t con_i = 0; con_i < m_data[dir_i][net_i].size(); ++con_i) {
for (size_t msg_i = 0; msg_i < m_data[dir_i][net_i][con_i].size(); ++msg_i) {
func(DirectionFromIndex(dir_i),
NetworkFromIndex(net_i),
ConnectionTypeFromIndex(con_i),
MessageTypeFromIndex(msg_i),
m_data[dir_i][net_i][con_i][msg_i]);
}
}
}
}
}
constexpr size_t NetStats::DirectionToIndex(Direction direction)
{
switch (direction) {
case SENT: return 0;
case RECV: return 1;
}
assert(false);
}
constexpr NetStats::Direction NetStats::DirectionFromIndex(size_t index)
{
switch (index) {
case 0: return SENT;
case 1: return RECV;
}
assert(false);
}
constexpr size_t NetStats::NetworkToIndex(Network net)
{
switch (net) {
case NET_UNROUTABLE: return 0;
case NET_IPV4: return 1;
case NET_IPV6: return 2;
case NET_ONION: return 3;
case NET_I2P: return 4;
case NET_CJDNS: return 5;
case NET_INTERNAL: return 6;
case NET_MAX: assert(false);
}
assert(false);
}
constexpr Network NetStats::NetworkFromIndex(size_t index)
{
switch (index) {
case 0: return NET_UNROUTABLE;
case 1: return NET_IPV4;
case 2: return NET_IPV6;
case 3: return NET_ONION;
case 4: return NET_I2P;
case 5: return NET_CJDNS;
case 6: return NET_INTERNAL;
}
assert(false);
}
constexpr size_t NetStats::ConnectionTypeToIndex(ConnectionType conn_type)
{
switch (conn_type) {
case ConnectionType::INBOUND: return 0;
case ConnectionType::OUTBOUND_FULL_RELAY: return 1;
case ConnectionType::MANUAL: return 2;
case ConnectionType::FEELER: return 3;
case ConnectionType::BLOCK_RELAY: return 4;
case ConnectionType::ADDR_FETCH: return 5;
}
assert(false);
}
constexpr ConnectionType NetStats::ConnectionTypeFromIndex(size_t index)
{
switch (index) {
case 0: return ConnectionType::INBOUND;
case 1: return ConnectionType::OUTBOUND_FULL_RELAY;
case 2: return ConnectionType::MANUAL;
case 3: return ConnectionType::FEELER;
case 4: return ConnectionType::BLOCK_RELAY;
case 5: return ConnectionType::ADDR_FETCH;
}
assert(false);
}
size_t NetStats::MessageTypeToIndex(const std::string& msg_type) const
{
auto it = m_msg_type_to_index.find(msg_type);
if (it != m_msg_type_to_index.end()) {
return it->second;
}
// Unknown message (NET_MESSAGE_TYPE_OTHER), use the last entry in the array.
return ALL_NET_MESSAGE_TYPES.size();
}
std::string NetStats::MessageTypeFromIndex(size_t index)
{
if (index == ALL_NET_MESSAGE_TYPES.size()) {
return NET_MESSAGE_TYPE_OTHER;
}
return ALL_NET_MESSAGE_TYPES.at(index);
}
const NetStats& CConnman::GetNetStats() const
{
return m_net_stats;
}
uint64_t CConnman::GetMaxOutboundTarget() const
{
AssertLockNotHeld(m_total_bytes_sent_mutex);

View file

@ -659,6 +659,84 @@ public:
Info GetInfo() const noexcept override EXCLUSIVE_LOCKS_REQUIRED(!m_recv_mutex);
};
/**
* Network traffic (bytes and number of messages). Split by direction, network, connection type and message type.
*/
class NetStats
{
public:
/// Used to designate the direction of the recorded traffic.
enum Direction { SENT, RECV };
/// Number of elements in `Direction`.
static constexpr size_t NUM_DIRECTIONS{2};
struct BytesAndCount {
std::atomic_uint64_t bytes{0}; //!< Number of bytes transferred.
std::atomic_uint64_t count{0}; //!< Number of messages transferred.
BytesAndCount& operator+=(const BytesAndCount& toadd)
{
bytes += toadd.bytes;
count += toadd.count;
return *this;
}
};
NetStats();
/**
* Increment the number of messages transferred by `num_messages` and the number of bytes by `num_bytes`.
*/
void Record(Direction direction,
Network net,
ConnectionType conn_type,
const std::string& msg_type,
size_t num_messages,
size_t num_bytes);
/**
* Call the provided function for each stat.
*/
void ForEach(std::function<void(NetStats::Direction dir,
Network net,
ConnectionType con,
const std::string& msg,
const BytesAndCount& data)> func) const;
private:
// The ...FromIndex() and ...ToIndex() methods below convert from/to
// indexes of `m_data[]` to the actual values they represent. For example,
// assuming MessageTypeToIndex("ping") == 15, then everything stored in
// m_data[i][j][k][15] is traffic from "ping" messages (for any i, j, k).
static constexpr size_t DirectionToIndex(Direction direction);
static constexpr Direction DirectionFromIndex(size_t index);
static constexpr size_t NetworkToIndex(Network net);
static constexpr Network NetworkFromIndex(size_t index);
static constexpr size_t ConnectionTypeToIndex(ConnectionType conn_type);
static constexpr ConnectionType ConnectionTypeFromIndex(size_t index);
size_t MessageTypeToIndex(const std::string& msg_type) const;
static std::string MessageTypeFromIndex(size_t index);
// Access like m_data[direction index][net index][conn type index][msg type index].bytes = 123;
// Arrays are used so that this can be accessed from multiple threads without a mutex protection.
std::array<std::array<std::array<std::array<BytesAndCount, ALL_NET_MESSAGE_TYPES.size() + 1>,
NUM_CONNECTION_TYPES>,
NET_MAX>,
NUM_DIRECTIONS>
m_data;
using MsgTypeToIndex = std::unordered_map<std::string, size_t>;
/// Holds the index `i` in `m_data[][][][i]` of a given message type for quick lookup.
const MsgTypeToIndex m_msg_type_to_index;
};
struct CNodeOptions
{
NetPermissionFlags permission_flags = NetPermissionFlags::None;
@ -914,7 +992,8 @@ public:
* @return True if the peer should stay connected,
* False if the peer should be disconnected from.
*/
bool ReceiveMsgBytes(Span<const uint8_t> msg_bytes, bool& complete) EXCLUSIVE_LOCKS_REQUIRED(!cs_vRecv);
bool ReceiveMsgBytes(Span<const uint8_t> msg_bytes, bool& complete, NetStats& net_stats)
EXCLUSIVE_LOCKS_REQUIRED(!cs_vRecv);
void SetCommonVersion(int greatest_common_version)
{
@ -1272,6 +1351,8 @@ public:
bool MultipleManualOrFullOutboundConns(Network net) const EXCLUSIVE_LOCKS_REQUIRED(m_nodes_mutex);
const NetStats& GetNetStats() const;
private:
struct ListenSocket {
public:
@ -1371,7 +1452,7 @@ private:
NodeId GetNewNodeId();
/** (Try to) send data from node's vSendMsg. Returns (bytes_sent, data_left). */
std::pair<size_t, bool> SocketSendData(CNode& node) const EXCLUSIVE_LOCKS_REQUIRED(node.cs_vSend);
std::pair<size_t, bool> SocketSendData(CNode& node) EXCLUSIVE_LOCKS_REQUIRED(node.cs_vSend);
void DumpAddresses();
@ -1413,6 +1494,8 @@ private:
std::atomic<uint64_t> nTotalBytesRecv{0};
uint64_t nTotalBytesSent GUARDED_BY(m_total_bytes_sent_mutex) {0};
NetStats m_net_stats;
// outbound limit & stats
uint64_t nMaxOutboundTotalBytesSentInCycle GUARDED_BY(m_total_bytes_sent_mutex) {0};
std::chrono::seconds nMaxOutboundCycleStartTime GUARDED_BY(m_total_bytes_sent_mutex) {0};

View file

@ -12,9 +12,11 @@
* information we have available at the time of opening or accepting the
* connection. Aside from INBOUND, all types are initiated by us.
*
* If adding or removing types, please update CONNECTION_TYPE_DOC in
* src/rpc/net.cpp and src/qt/rpcconsole.cpp, as well as the descriptions in
* src/qt/guiutil.cpp and src/bitcoin-cli.cpp::NetinfoRequestHandler. */
* If adding or removing types, please update:
* - CONNECTION_TYPE_DOC in src/rpc/net.cpp and src/qt/rpcconsole.cpp
* - the descriptions in src/qt/guiutil.cpp and src/bitcoin-cli.cpp::NetinfoRequestHandler
* - NUM_CONNECTION_TYPES below.
*/
enum class ConnectionType {
/**
* Inbound connections are those initiated by a peer. This is the only
@ -77,6 +79,9 @@ enum class ConnectionType {
ADDR_FETCH,
};
/** Number of entries in ConnectionType. */
static constexpr size_t NUM_CONNECTION_TYPES{6};
/** Convert ConnectionType enum to a string value */
std::string ConnectionTypeAsString(ConnectionType conn_type);

View file

@ -39,6 +39,7 @@ FUZZ_TARGET(net, .init = initialize_net)
{
node.SetAddrLocal(*service_opt);
}
NetStats net_stats;
LIMITED_WHILE(fuzzed_data_provider.ConsumeBool(), 10000) {
CallOneOf(
fuzzed_data_provider,
@ -61,7 +62,7 @@ FUZZ_TARGET(net, .init = initialize_net)
[&] {
const std::vector<uint8_t> b = ConsumeRandomLengthByteVector(fuzzed_data_provider);
bool complete;
node.ReceiveMsgBytes(b, complete);
node.ReceiveMsgBytes(b, complete, net_stats);
});
}

View file

@ -68,9 +68,9 @@ void ConnmanTestMsg::Handshake(CNode& node,
}
}
void ConnmanTestMsg::NodeReceiveMsgBytes(CNode& node, Span<const uint8_t> msg_bytes, bool& complete) const
void ConnmanTestMsg::NodeReceiveMsgBytes(CNode& node, Span<const uint8_t> msg_bytes, bool& complete)
{
assert(node.ReceiveMsgBytes(msg_bytes, complete));
assert(node.ReceiveMsgBytes(msg_bytes, complete, m_net_stats));
if (complete) {
node.MarkReceivedMsgsForProcessing();
}
@ -88,7 +88,7 @@ void ConnmanTestMsg::FlushSendBuffer(CNode& node) const
}
}
bool ConnmanTestMsg::ReceiveMsgFrom(CNode& node, CSerializedNetMsg&& ser_msg) const
bool ConnmanTestMsg::ReceiveMsgFrom(CNode& node, CSerializedNetMsg&& ser_msg)
{
bool queued = node.m_transport->SetMessageToSend(ser_msg);
assert(queued);

View file

@ -78,9 +78,9 @@ struct ConnmanTestMsg : public CConnman {
return m_msgproc->ProcessMessages(&node, flagInterruptMsgProc);
}
void NodeReceiveMsgBytes(CNode& node, Span<const uint8_t> msg_bytes, bool& complete) const;
void NodeReceiveMsgBytes(CNode& node, Span<const uint8_t> msg_bytes, bool& complete);
bool ReceiveMsgFrom(CNode& node, CSerializedNetMsg&& ser_msg) const;
bool ReceiveMsgFrom(CNode& node, CSerializedNetMsg&& ser_msg);
void FlushSendBuffer(CNode& node) const;
bool AlreadyConnectedPublic(const CAddress& addr) { return AlreadyConnectedToAddress(addr); };