diff --git a/src/test/util/net.cpp b/src/test/util/net.cpp index beefc32bee..ddd96a5064 100644 --- a/src/test/util/net.cpp +++ b/src/test/util/net.cpp @@ -14,7 +14,10 @@ #include #include #include +#include +#include +#include #include void ConnmanTestMsg::Handshake(CNode& node, @@ -137,3 +140,271 @@ std::vector GetRandomNodeEvictionCandidates(int n_candida } return candidates; } + +// Have different ZeroSock (or others that inherit from it) objects have different +// m_socket because EqualSharedPtrSock compares m_socket and we want to avoid two +// different objects comparing as equal. +static std::atomic g_mocked_sock_fd{0}; + +ZeroSock::ZeroSock() : Sock{g_mocked_sock_fd++} {} + +// Sock::~Sock() would try to close(2) m_socket if it is not INVALID_SOCKET, avoid that. +ZeroSock::~ZeroSock() { m_socket = INVALID_SOCKET; } + +ssize_t ZeroSock::Send(const void*, size_t len, int) const { return len; } + +ssize_t ZeroSock::Recv(void* buf, size_t len, int flags) const +{ + memset(buf, 0x0, len); + return len; +} + +int ZeroSock::Connect(const sockaddr*, socklen_t) const { return 0; } + +int ZeroSock::Bind(const sockaddr*, socklen_t) const { return 0; } + +int ZeroSock::Listen(int) const { return 0; } + +std::unique_ptr ZeroSock::Accept(sockaddr* addr, socklen_t* addr_len) const +{ + if (addr != nullptr) { + // Pretend all connections come from 5.5.5.5:6789 + memset(addr, 0x00, *addr_len); + const socklen_t write_len = static_cast(sizeof(sockaddr_in)); + if (*addr_len >= write_len) { + *addr_len = write_len; + sockaddr_in* addr_in = reinterpret_cast(addr); + addr_in->sin_family = AF_INET; + memset(&addr_in->sin_addr, 0x05, sizeof(addr_in->sin_addr)); + addr_in->sin_port = htons(6789); + } + } + return std::make_unique(); +} + +int ZeroSock::GetSockOpt(int level, int opt_name, void* opt_val, socklen_t* opt_len) const +{ + std::memset(opt_val, 0x0, *opt_len); + return 0; +} + +int ZeroSock::SetSockOpt(int, int, const void*, socklen_t) const { return 0; } + +int ZeroSock::GetSockName(sockaddr* name, socklen_t* name_len) const +{ + std::memset(name, 0x0, *name_len); + return 0; +} + +bool ZeroSock::SetNonBlocking() const { return true; } + +bool ZeroSock::IsSelectable() const { return true; } + +bool ZeroSock::Wait(std::chrono::milliseconds timeout, Event requested, Event* occurred) const +{ + if (occurred != nullptr) { + *occurred = requested; + } + return true; +} + +bool ZeroSock::WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock) const +{ + for (auto& [sock, events] : events_per_sock) { + (void)sock; + events.occurred = events.requested; + } + return true; +} + +ZeroSock& ZeroSock::operator=(Sock&& other) +{ + assert(false && "Move of Sock into ZeroSock not allowed."); + return *this; +} + +StaticContentsSock::StaticContentsSock(const std::string& contents) + : m_contents{contents} +{ +} + +ssize_t StaticContentsSock::Recv(void* buf, size_t len, int flags) const +{ + const size_t consume_bytes{std::min(len, m_contents.size() - m_consumed)}; + std::memcpy(buf, m_contents.data() + m_consumed, consume_bytes); + if ((flags & MSG_PEEK) == 0) { + m_consumed += consume_bytes; + } + return consume_bytes; +} + +StaticContentsSock& StaticContentsSock::operator=(Sock&& other) +{ + assert(false && "Move of Sock into StaticContentsSock not allowed."); + return *this; +} + +ssize_t DynSock::Pipe::GetBytes(void* buf, size_t len, int flags) +{ + WAIT_LOCK(m_mutex, lock); + + if (m_data.empty()) { + if (m_eof) { + return 0; + } + errno = EAGAIN; // Same as recv(2) on a non-blocking socket. + return -1; + } + + const size_t read_bytes{std::min(len, m_data.size())}; + + std::memcpy(buf, m_data.data(), read_bytes); + if ((flags & MSG_PEEK) == 0) { + m_data.erase(m_data.begin(), m_data.begin() + read_bytes); + } + + return read_bytes; +} + +std::optional DynSock::Pipe::GetNetMsg() +{ + V1Transport transport{NodeId{0}}; + + { + WAIT_LOCK(m_mutex, lock); + + WaitForDataOrEof(lock); + if (m_eof && m_data.empty()) { + return std::nullopt; + } + + for (;;) { + Span s{m_data}; + if (!transport.ReceivedBytes(s)) { // Consumed bytes are removed from the front of s. + return std::nullopt; + } + m_data.erase(m_data.begin(), m_data.begin() + m_data.size() - s.size()); + if (transport.ReceivedMessageComplete()) { + break; + } + if (m_data.empty()) { + WaitForDataOrEof(lock); + if (m_eof && m_data.empty()) { + return std::nullopt; + } + } + } + } + + bool reject{false}; + CNetMessage msg{transport.GetReceivedMessage(/*time=*/{}, reject)}; + if (reject) { + return std::nullopt; + } + return std::make_optional(std::move(msg)); +} + +void DynSock::Pipe::PushBytes(const void* buf, size_t len) +{ + LOCK(m_mutex); + const uint8_t* b = static_cast(buf); + m_data.insert(m_data.end(), b, b + len); + m_cond.notify_all(); +} + +void DynSock::Pipe::Eof() +{ + LOCK(m_mutex); + m_eof = true; + m_cond.notify_all(); +} + +void DynSock::Pipe::WaitForDataOrEof(UniqueLock& lock) +{ + Assert(lock.mutex() == &m_mutex); + + m_cond.wait(lock, [&]() EXCLUSIVE_LOCKS_REQUIRED(m_mutex) { + AssertLockHeld(m_mutex); + return !m_data.empty() || m_eof; + }); +} + +DynSock::DynSock(std::shared_ptr pipes, std::shared_ptr accept_sockets) + : m_pipes{pipes}, m_accept_sockets{accept_sockets} +{ +} + +DynSock::~DynSock() +{ + m_pipes->send.Eof(); +} + +ssize_t DynSock::Recv(void* buf, size_t len, int flags) const +{ + return m_pipes->recv.GetBytes(buf, len, flags); +} + +ssize_t DynSock::Send(const void* buf, size_t len, int) const +{ + m_pipes->send.PushBytes(buf, len); + return len; +} + +std::unique_ptr DynSock::Accept(sockaddr* addr, socklen_t* addr_len) const +{ + ZeroSock::Accept(addr, addr_len); + return m_accept_sockets->Pop().value_or(nullptr); +} + +bool DynSock::Wait(std::chrono::milliseconds timeout, + Event requested, + Event* occurred) const +{ + EventsPerSock ev; + ev.emplace(this, Events{requested}); + const bool ret{WaitMany(timeout, ev)}; + if (occurred != nullptr) { + *occurred = ev.begin()->second.occurred; + } + return ret; +} + +bool DynSock::WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock) const +{ + const auto deadline = std::chrono::steady_clock::now() + timeout; + bool at_least_one_event_occurred{false}; + + for (;;) { + // Check all sockets for readiness without waiting. + for (auto& [sock, events] : events_per_sock) { + if ((events.requested & Sock::SEND) != 0) { + // Always ready for Send(). + events.occurred |= Sock::SEND; + at_least_one_event_occurred = true; + } + + if ((events.requested & Sock::RECV) != 0) { + auto dyn_sock = reinterpret_cast(sock.get()); + uint8_t b; + if (dyn_sock->m_pipes->recv.GetBytes(&b, 1, MSG_PEEK) == 1 || !dyn_sock->m_accept_sockets->Empty()) { + events.occurred |= Sock::RECV; + at_least_one_event_occurred = true; + } + } + } + + if (at_least_one_event_occurred || std::chrono::steady_clock::now() > deadline) { + break; + } + + std::this_thread::sleep_for(10ms); + } + + return true; +} + +DynSock& DynSock::operator=(Sock&&) +{ + assert(false && "Move of Sock into DynSock not allowed."); + return *this; +} diff --git a/src/test/util/net.h b/src/test/util/net.h index d3aefda4f0..3e717341d8 100644 --- a/src/test/util/net.h +++ b/src/test/util/net.h @@ -6,6 +6,7 @@ #define BITCOIN_TEST_UTIL_NET_H #include +#include #include #include #include @@ -20,9 +21,11 @@ #include #include #include +#include #include #include #include +#include #include #include #include @@ -131,99 +134,64 @@ constexpr auto ALL_NETWORKS = std::array{ Network::NET_INTERNAL, }; +/** + * A mocked Sock alternative that succeeds on all operations. + * Returns infinite amount of 0x0 bytes on reads. + */ +class ZeroSock : public Sock +{ +public: + ZeroSock(); + + ~ZeroSock() override; + + ssize_t Send(const void*, size_t len, int) const override; + + ssize_t Recv(void* buf, size_t len, int flags) const override; + + int Connect(const sockaddr*, socklen_t) const override; + + int Bind(const sockaddr*, socklen_t) const override; + + int Listen(int) const override; + + std::unique_ptr Accept(sockaddr* addr, socklen_t* addr_len) const override; + + int GetSockOpt(int level, int opt_name, void* opt_val, socklen_t* opt_len) const override; + + int SetSockOpt(int, int, const void*, socklen_t) const override; + + int GetSockName(sockaddr* name, socklen_t* name_len) const override; + + bool SetNonBlocking() const override; + + bool IsSelectable() const override; + + bool Wait(std::chrono::milliseconds timeout, + Event requested, + Event* occurred = nullptr) const override; + + bool WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock) const override; + +private: + ZeroSock& operator=(Sock&& other) override; +}; + /** * A mocked Sock alternative that returns a statically contained data upon read and succeeds * and ignores all writes. The data to be returned is given to the constructor and when it is * exhausted an EOF is returned by further reads. */ -class StaticContentsSock : public Sock +class StaticContentsSock : public ZeroSock { public: - explicit StaticContentsSock(const std::string& contents) - : Sock{INVALID_SOCKET}, - m_contents{contents} - { - } + explicit StaticContentsSock(const std::string& contents); - ~StaticContentsSock() override { m_socket = INVALID_SOCKET; } - - StaticContentsSock& operator=(Sock&& other) override - { - assert(false && "Move of Sock into MockSock not allowed."); - return *this; - } - - ssize_t Send(const void*, size_t len, int) const override { return len; } - - ssize_t Recv(void* buf, size_t len, int flags) const override - { - const size_t consume_bytes{std::min(len, m_contents.size() - m_consumed)}; - std::memcpy(buf, m_contents.data() + m_consumed, consume_bytes); - if ((flags & MSG_PEEK) == 0) { - m_consumed += consume_bytes; - } - return consume_bytes; - } - - int Connect(const sockaddr*, socklen_t) const override { return 0; } - - int Bind(const sockaddr*, socklen_t) const override { return 0; } - - int Listen(int) const override { return 0; } - - std::unique_ptr Accept(sockaddr* addr, socklen_t* addr_len) const override - { - if (addr != nullptr) { - // Pretend all connections come from 5.5.5.5:6789 - memset(addr, 0x00, *addr_len); - const socklen_t write_len = static_cast(sizeof(sockaddr_in)); - if (*addr_len >= write_len) { - *addr_len = write_len; - sockaddr_in* addr_in = reinterpret_cast(addr); - addr_in->sin_family = AF_INET; - memset(&addr_in->sin_addr, 0x05, sizeof(addr_in->sin_addr)); - addr_in->sin_port = htons(6789); - } - } - return std::make_unique(""); - }; - - int GetSockOpt(int level, int opt_name, void* opt_val, socklen_t* opt_len) const override - { - std::memset(opt_val, 0x0, *opt_len); - return 0; - } - - int SetSockOpt(int, int, const void*, socklen_t) const override { return 0; } - - int GetSockName(sockaddr* name, socklen_t* name_len) const override - { - std::memset(name, 0x0, *name_len); - return 0; - } - - bool SetNonBlocking() const override { return true; } - - bool IsSelectable() const override { return true; } - - bool Wait(std::chrono::milliseconds timeout, - Event requested, - Event* occurred = nullptr) const override - { - if (occurred != nullptr) { - *occurred = requested; - } - return true; - } - - bool WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock) const override - { - for (auto& [sock, events] : events_per_sock) { - (void)sock; - events.occurred = events.requested; - } - return true; - } + /** + * Return parts of the contents that was provided at construction until it is exhausted + * and then return 0 (EOF). + */ + ssize_t Recv(void* buf, size_t len, int flags) const override; bool IsConnected(std::string&) const override { @@ -231,10 +199,163 @@ public: } private: + StaticContentsSock& operator=(Sock&& other) override; + const std::string m_contents; mutable size_t m_consumed{0}; }; +/** + * A mocked Sock alternative that allows providing the data to be returned by Recv() + * and inspecting the data that has been supplied to Send(). + */ +class DynSock : public ZeroSock +{ +public: + /** + * Unidirectional bytes or CNetMessage queue (FIFO). + */ + class Pipe + { + public: + /** + * Get bytes and remove them from the pipe. + * @param[in] buf Destination to write bytes to. + * @param[in] len Write up to this number of bytes. + * @param[in] flags Same as the flags of `recv(2)`. Just `MSG_PEEK` is honored. + * @return The number of bytes written to `buf`. `0` if `Eof()` has been called. + * If no bytes are available then `-1` is returned and `errno` is set to `EAGAIN`. + */ + ssize_t GetBytes(void* buf, size_t len, int flags = 0) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex); + + /** + * Deserialize a `CNetMessage` and remove it from the pipe. + * If not enough bytes are available then the function will wait. If parsing fails + * or EOF is signaled to the pipe, then `std::nullopt` is returned. + */ + std::optional GetNetMsg() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex); + + /** + * Push bytes to the pipe. + */ + void PushBytes(const void* buf, size_t len) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex); + + /** + * Construct and push CNetMessage to the pipe. + */ + template + void PushNetMsg(const std::string& type, Args&&... payload) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex); + + /** + * Signal end-of-file on the receiving end (`GetBytes()` or `GetNetMsg()`). + */ + void Eof() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex); + + private: + /** + * Return when there is some data to read or EOF has been signaled. + * @param[in,out] lock Unique lock that must have been derived from `m_mutex` by `WAIT_LOCK(m_mutex, lock)`. + */ + void WaitForDataOrEof(UniqueLock& lock) EXCLUSIVE_LOCKS_REQUIRED(m_mutex); + + Mutex m_mutex; + std::condition_variable m_cond; + std::vector m_data GUARDED_BY(m_mutex); + bool m_eof GUARDED_BY(m_mutex){false}; + }; + + struct Pipes { + Pipe recv; + Pipe send; + }; + + /** + * A basic thread-safe queue, used for queuing sockets to be returned by Accept(). + */ + class Queue + { + public: + using S = std::unique_ptr; + + void Push(S s) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) + { + LOCK(m_mutex); + m_queue.push(std::move(s)); + } + + std::optional Pop() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) + { + LOCK(m_mutex); + if (m_queue.empty()) { + return std::nullopt; + } + S front{std::move(m_queue.front())}; + m_queue.pop(); + return front; + } + + bool Empty() const EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) + { + LOCK(m_mutex); + return m_queue.empty(); + } + + private: + mutable Mutex m_mutex; + std::queue m_queue GUARDED_BY(m_mutex); + }; + + /** + * Create a new mocked sock. + * @param[in] pipes Send/recv pipes used by the Send() and Recv() methods. + * @param[in] accept_sockets Sockets to return by the Accept() method. + */ + explicit DynSock(std::shared_ptr pipes, std::shared_ptr accept_sockets); + + ~DynSock(); + + ssize_t Recv(void* buf, size_t len, int flags) const override; + + ssize_t Send(const void* buf, size_t len, int) const override; + + std::unique_ptr Accept(sockaddr* addr, socklen_t* addr_len) const override; + + bool Wait(std::chrono::milliseconds timeout, + Event requested, + Event* occurred = nullptr) const override; + + bool WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock) const override; + +private: + DynSock& operator=(Sock&&) override; + + std::shared_ptr m_pipes; + std::shared_ptr m_accept_sockets; +}; + +template +void DynSock::Pipe::PushNetMsg(const std::string& type, Args&&... payload) +{ + auto msg = NetMsg::Make(type, std::forward(payload)...); + V1Transport transport{NodeId{0}}; + + const bool queued{transport.SetMessageToSend(msg)}; + assert(queued); + + LOCK(m_mutex); + + for (;;) { + const auto& [bytes, _more, _msg_type] = transport.GetBytesToSend(/*have_next_message=*/true); + if (bytes.empty()) { + break; + } + m_data.insert(m_data.end(), bytes.begin(), bytes.end()); + transport.MarkBytesSent(bytes.size()); + } + + m_cond.notify_all(); +} + std::vector GetRandomNodeEvictionCandidates(int n_candidates, FastRandomContext& random_context); #endif // BITCOIN_TEST_UTIL_NET_H