mirror of
https://github.com/bitcoin/bitcoin.git
synced 2025-03-05 14:06:27 -05:00
Merge bitcoin/bitcoin#28960: kernel: Remove dependency on CScheduler
d5228efb53
kernel: Remove dependency on CScheduler (TheCharlatan)06069b3913
scripted-diff: Rename MainSignals to ValidationSignals (TheCharlatan)0d6d2b650d
scripted-diff: Rename SingleThreadedSchedulerClient to SerialTaskRunner (TheCharlatan)4abde2c4e3
[refactor] Make MainSignals RAII styled (TheCharlatan)84f5c135b8
refactor: De-globalize g_signals (TheCharlatan)473dd4b97a
[refactor] Prepare for g_signals de-globalization (TheCharlatan)3fba3d5dee
[refactor] Make signals optional in mempool and chainman (TheCharlatan) Pull request description: By defining a virtual interface class for the scheduler client, users of the kernel can now define their own event consuming infrastructure, without having to spawn threads or rely on the scheduler design. Removing `CScheduler` also allows removing the thread and exception modules from the kernel library. To make the `CMainSignals` class easier to use from a kernel library perspective, remove its global instantiation and adopt RAII practices. Renames `CMainSignals` to `ValidationSignals`, which more accurately describes its purpose and scope. Also make the `ValidationSignals` in the `ChainstateManager` and CTxMemPool` optional. This could be useful in the future for using or testing these classes without having to instantiate any form of signal handling. --- This PR is part of the [libbitcoinkernel project](https://github.com/bitcoin/bitcoin/issues/27587). It improves the kernel API and removes two modules from the kernel library. ACKs for top commit: maflcko: re-ACKd5228efb53
🌄 ryanofsky: Code review ACKd5228efb53
. Just comment change since last review. vasild: ACKd5228efb53
furszy: diff ACKd5228ef
Tree-SHA512: e93a5f10eb6182effb84bb981859a7ce750e466efd8171045d8d9e7fe46e4065631d9f6f533c5967c4d34c9bb7d7a67e9f4593bd4c5b30cd7b3bbad7be7b331b
This commit is contained in:
commit
c07935bcf5
40 changed files with 366 additions and 290 deletions
|
@ -17,21 +17,21 @@ NON_DETERMINISTIC_TESTS=(
|
|||
"blockfilter_index_tests/blockfilter_index_initial_sync" # src/checkqueue.h: In CCheckQueue::Loop(): while (queue.empty()) { ... }
|
||||
"coinselector_tests/knapsack_solver_test" # coinselector_tests.cpp: if (equal_sets(setCoinsRet, setCoinsRet2))
|
||||
"fs_tests/fsbridge_fstream" # deterministic test failure?
|
||||
"miner_tests/CreateNewBlock_validity" # validation.cpp: if (GetMainSignals().CallbacksPending() > 10)
|
||||
"miner_tests/CreateNewBlock_validity" # validation.cpp: if (signals.CallbacksPending() > 10)
|
||||
"scheduler_tests/manythreads" # scheduler.cpp: CScheduler::serviceQueue()
|
||||
"scheduler_tests/singlethreadedscheduler_ordered" # scheduler.cpp: CScheduler::serviceQueue()
|
||||
"txvalidationcache_tests/checkinputs_test" # validation.cpp: if (GetMainSignals().CallbacksPending() > 10)
|
||||
"txvalidationcache_tests/tx_mempool_block_doublespend" # validation.cpp: if (GetMainSignals().CallbacksPending() > 10)
|
||||
"txindex_tests/txindex_initial_sync" # validation.cpp: if (GetMainSignals().CallbacksPending() > 10)
|
||||
"txvalidation_tests/tx_mempool_reject_coinbase" # validation.cpp: if (GetMainSignals().CallbacksPending() > 10)
|
||||
"validation_block_tests/processnewblock_signals_ordering" # validation.cpp: if (GetMainSignals().CallbacksPending() > 10)
|
||||
"wallet_tests/coin_mark_dirty_immature_credit" # validation.cpp: if (GetMainSignals().CallbacksPending() > 10)
|
||||
"wallet_tests/dummy_input_size_test" # validation.cpp: if (GetMainSignals().CallbacksPending() > 10)
|
||||
"wallet_tests/importmulti_rescan" # validation.cpp: if (GetMainSignals().CallbacksPending() > 10)
|
||||
"wallet_tests/importwallet_rescan" # validation.cpp: if (GetMainSignals().CallbacksPending() > 10)
|
||||
"wallet_tests/ListCoins" # validation.cpp: if (GetMainSignals().CallbacksPending() > 10)
|
||||
"wallet_tests/scan_for_wallet_transactions" # validation.cpp: if (GetMainSignals().CallbacksPending() > 10)
|
||||
"wallet_tests/wallet_disableprivkeys" # validation.cpp: if (GetMainSignals().CallbacksPending() > 10)
|
||||
"txvalidationcache_tests/checkinputs_test" # validation.cpp: if (signals.CallbacksPending() > 10)
|
||||
"txvalidationcache_tests/tx_mempool_block_doublespend" # validation.cpp: if (signals.CallbacksPending() > 10)
|
||||
"txindex_tests/txindex_initial_sync" # validation.cpp: if (signals.CallbacksPending() > 10)
|
||||
"txvalidation_tests/tx_mempool_reject_coinbase" # validation.cpp: if (signals.CallbacksPending() > 10)
|
||||
"validation_block_tests/processnewblock_signals_ordering" # validation.cpp: if (signals.CallbacksPending() > 10)
|
||||
"wallet_tests/coin_mark_dirty_immature_credit" # validation.cpp: if (signals.CallbacksPending() > 10)
|
||||
"wallet_tests/dummy_input_size_test" # validation.cpp: if (signals.CallbacksPending() > 10)
|
||||
"wallet_tests/importmulti_rescan" # validation.cpp: if (signals.CallbacksPending() > 10)
|
||||
"wallet_tests/importwallet_rescan" # validation.cpp: if (signals.CallbacksPending() > 10)
|
||||
"wallet_tests/ListCoins" # validation.cpp: if (signals.CallbacksPending() > 10)
|
||||
"wallet_tests/scan_for_wallet_transactions" # validation.cpp: if (signals.CallbacksPending() > 10)
|
||||
"wallet_tests/wallet_disableprivkeys" # validation.cpp: if (signals.CallbacksPending() > 10)
|
||||
)
|
||||
|
||||
TEST_BITCOIN_BINARY="src/test/test_bitcoin"
|
||||
|
|
|
@ -323,6 +323,7 @@ BITCOIN_CORE_H = \
|
|||
util/spanparsing.h \
|
||||
util/string.h \
|
||||
util/syserror.h \
|
||||
util/task_runner.h \
|
||||
util/thread.h \
|
||||
util/threadinterrupt.h \
|
||||
util/threadnames.h \
|
||||
|
@ -966,7 +967,6 @@ libbitcoinkernel_la_SOURCES = \
|
|||
pubkey.cpp \
|
||||
random.cpp \
|
||||
randomenv.cpp \
|
||||
scheduler.cpp \
|
||||
script/interpreter.cpp \
|
||||
script/script.cpp \
|
||||
script/script_error.cpp \
|
||||
|
@ -983,7 +983,6 @@ libbitcoinkernel_la_SOURCES = \
|
|||
util/batchpriority.cpp \
|
||||
util/chaintype.cpp \
|
||||
util/check.cpp \
|
||||
util/exception.cpp \
|
||||
util/fs.cpp \
|
||||
util/fs_helpers.cpp \
|
||||
util/hasher.cpp \
|
||||
|
@ -994,7 +993,6 @@ libbitcoinkernel_la_SOURCES = \
|
|||
util/strencodings.cpp \
|
||||
util/string.cpp \
|
||||
util/syserror.cpp \
|
||||
util/thread.cpp \
|
||||
util/threadnames.cpp \
|
||||
util/time.cpp \
|
||||
util/tokenpipe.cpp \
|
||||
|
|
|
@ -39,7 +39,8 @@ static void WalletBalance(benchmark::Bench& bench, const bool set_dirty, const b
|
|||
generatetoaddress(test_setup->m_node, address_mine.value_or(ADDRESS_WATCHONLY));
|
||||
generatetoaddress(test_setup->m_node, ADDRESS_WATCHONLY);
|
||||
}
|
||||
SyncWithValidationInterfaceQueue();
|
||||
// Calls SyncWithValidationInterfaceQueue
|
||||
wallet.chain().waitForNotificationsIfTipChanged(uint256::ZERO);
|
||||
|
||||
auto bal = GetBalance(wallet); // Cache
|
||||
|
||||
|
|
|
@ -23,11 +23,10 @@
|
|||
#include <node/caches.h>
|
||||
#include <node/chainstate.h>
|
||||
#include <random.h>
|
||||
#include <scheduler.h>
|
||||
#include <script/sigcache.h>
|
||||
#include <util/chaintype.h>
|
||||
#include <util/fs.h>
|
||||
#include <util/thread.h>
|
||||
#include <util/task_runner.h>
|
||||
#include <validation.h>
|
||||
#include <validationinterface.h>
|
||||
|
||||
|
@ -68,16 +67,7 @@ int main(int argc, char* argv[])
|
|||
Assert(InitSignatureCache(validation_cache_sizes.signature_cache_bytes));
|
||||
Assert(InitScriptExecutionCache(validation_cache_sizes.script_execution_cache_bytes));
|
||||
|
||||
|
||||
// SETUP: Scheduling and Background Signals
|
||||
CScheduler scheduler{};
|
||||
// Start the lightweight task scheduler thread
|
||||
scheduler.m_service_thread = std::thread(util::TraceThread, "scheduler", [&] { scheduler.serviceQueue(); });
|
||||
|
||||
// Gather some entropy once per minute.
|
||||
scheduler.scheduleEvery(RandAddPeriodic, std::chrono::minutes{1});
|
||||
|
||||
GetMainSignals().RegisterBackgroundSignalScheduler(scheduler);
|
||||
ValidationSignals validation_signals{std::make_unique<util::ImmediateTaskRunner>()};
|
||||
|
||||
class KernelNotifications : public kernel::Notifications
|
||||
{
|
||||
|
@ -118,6 +108,7 @@ int main(int argc, char* argv[])
|
|||
.chainparams = *chainparams,
|
||||
.datadir = abs_datadir,
|
||||
.notifications = *notifications,
|
||||
.signals = &validation_signals,
|
||||
};
|
||||
const node::BlockManager::Options blockman_opts{
|
||||
.chainparams = chainman_opts.chainparams,
|
||||
|
@ -235,9 +226,9 @@ int main(int argc, char* argv[])
|
|||
|
||||
bool new_block;
|
||||
auto sc = std::make_shared<submitblock_StateCatcher>(block.GetHash());
|
||||
RegisterSharedValidationInterface(sc);
|
||||
validation_signals.RegisterSharedValidationInterface(sc);
|
||||
bool accepted = chainman.ProcessNewBlock(blockptr, /*force_processing=*/true, /*min_pow_checked=*/true, /*new_block=*/&new_block);
|
||||
UnregisterSharedValidationInterface(sc);
|
||||
validation_signals.UnregisterSharedValidationInterface(sc);
|
||||
if (!new_block && accepted) {
|
||||
std::cerr << "duplicate" << std::endl;
|
||||
break;
|
||||
|
@ -287,10 +278,9 @@ int main(int argc, char* argv[])
|
|||
epilogue:
|
||||
// Without this precise shutdown sequence, there will be a lot of nullptr
|
||||
// dereferencing and UB.
|
||||
scheduler.stop();
|
||||
if (chainman.m_thread_load.joinable()) chainman.m_thread_load.join();
|
||||
|
||||
GetMainSignals().FlushBackgroundCallbacks();
|
||||
validation_signals.FlushBackgroundCallbacks();
|
||||
{
|
||||
LOCK(cs_main);
|
||||
for (Chainstate* chainstate : chainman.GetAll()) {
|
||||
|
@ -300,5 +290,4 @@ epilogue:
|
|||
}
|
||||
}
|
||||
}
|
||||
GetMainSignals().UnregisterBackgroundSignalScheduler();
|
||||
}
|
||||
|
|
|
@ -89,7 +89,7 @@ bool BaseIndex::Init()
|
|||
return &m_chain->context()->chainman->GetChainstateForIndexing());
|
||||
// Register to validation interface before setting the 'm_synced' flag, so that
|
||||
// callbacks are not missed once m_synced is true.
|
||||
RegisterValidationInterface(this);
|
||||
m_chain->context()->validation_signals->RegisterValidationInterface(this);
|
||||
|
||||
CBlockLocator locator;
|
||||
if (!GetDB().ReadBestBlock(locator)) {
|
||||
|
@ -380,7 +380,7 @@ bool BaseIndex::BlockUntilSyncedToCurrentChain() const
|
|||
}
|
||||
|
||||
LogPrintf("%s: %s is catching up on block notifications\n", __func__, GetName());
|
||||
SyncWithValidationInterfaceQueue();
|
||||
m_chain->context()->validation_signals->SyncWithValidationInterfaceQueue();
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -399,7 +399,9 @@ bool BaseIndex::StartBackgroundSync()
|
|||
|
||||
void BaseIndex::Stop()
|
||||
{
|
||||
UnregisterValidationInterface(this);
|
||||
if (m_chain->context()->validation_signals) {
|
||||
m_chain->context()->validation_signals->UnregisterValidationInterface(this);
|
||||
}
|
||||
|
||||
if (m_thread_sync.joinable()) {
|
||||
m_thread_sync.join();
|
||||
|
|
47
src/init.cpp
47
src/init.cpp
|
@ -291,7 +291,7 @@ void Shutdown(NodeContext& node)
|
|||
|
||||
// Because these depend on each-other, we make sure that neither can be
|
||||
// using the other before destroying them.
|
||||
if (node.peerman) UnregisterValidationInterface(node.peerman.get());
|
||||
if (node.peerman && node.validation_signals) node.validation_signals->UnregisterValidationInterface(node.peerman.get());
|
||||
if (node.connman) node.connman->Stop();
|
||||
|
||||
StopTorControl();
|
||||
|
@ -317,7 +317,9 @@ void Shutdown(NodeContext& node)
|
|||
// fee estimator from validation interface.
|
||||
if (node.fee_estimator) {
|
||||
node.fee_estimator->Flush();
|
||||
UnregisterValidationInterface(node.fee_estimator.get());
|
||||
if (node.validation_signals) {
|
||||
node.validation_signals->UnregisterValidationInterface(node.fee_estimator.get());
|
||||
}
|
||||
}
|
||||
|
||||
// FlushStateToDisk generates a ChainStateFlushed callback, which we should avoid missing
|
||||
|
@ -332,7 +334,7 @@ void Shutdown(NodeContext& node)
|
|||
|
||||
// After there are no more peers/RPC left to give us new data which may generate
|
||||
// CValidationInterface callbacks, flush them...
|
||||
GetMainSignals().FlushBackgroundCallbacks();
|
||||
if (node.validation_signals) node.validation_signals->FlushBackgroundCallbacks();
|
||||
|
||||
// Stop and delete all indexes only after flushing background callbacks.
|
||||
if (g_txindex) {
|
||||
|
@ -367,17 +369,19 @@ void Shutdown(NodeContext& node)
|
|||
|
||||
#if ENABLE_ZMQ
|
||||
if (g_zmq_notification_interface) {
|
||||
UnregisterValidationInterface(g_zmq_notification_interface.get());
|
||||
if (node.validation_signals) node.validation_signals->UnregisterValidationInterface(g_zmq_notification_interface.get());
|
||||
g_zmq_notification_interface.reset();
|
||||
}
|
||||
#endif
|
||||
|
||||
node.chain_clients.clear();
|
||||
UnregisterAllValidationInterfaces();
|
||||
GetMainSignals().UnregisterBackgroundSignalScheduler();
|
||||
if (node.validation_signals) {
|
||||
node.validation_signals->UnregisterAllValidationInterfaces();
|
||||
}
|
||||
node.mempool.reset();
|
||||
node.fee_estimator.reset();
|
||||
node.chainman.reset();
|
||||
node.validation_signals.reset();
|
||||
node.scheduler.reset();
|
||||
node.kernel.reset();
|
||||
|
||||
|
@ -1138,17 +1142,18 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)
|
|||
|
||||
assert(!node.scheduler);
|
||||
node.scheduler = std::make_unique<CScheduler>();
|
||||
auto& scheduler = *node.scheduler;
|
||||
|
||||
// Start the lightweight task scheduler thread
|
||||
node.scheduler->m_service_thread = std::thread(util::TraceThread, "scheduler", [&] { node.scheduler->serviceQueue(); });
|
||||
scheduler.m_service_thread = std::thread(util::TraceThread, "scheduler", [&] { scheduler.serviceQueue(); });
|
||||
|
||||
// Gather some entropy once per minute.
|
||||
node.scheduler->scheduleEvery([]{
|
||||
scheduler.scheduleEvery([]{
|
||||
RandAddPeriodic();
|
||||
}, std::chrono::minutes{1});
|
||||
|
||||
// Check disk space every 5 minutes to avoid db corruption.
|
||||
node.scheduler->scheduleEvery([&args, &node]{
|
||||
scheduler.scheduleEvery([&args, &node]{
|
||||
constexpr uint64_t min_disk_space = 50 << 20; // 50 MB
|
||||
if (!CheckDiskSpace(args.GetBlocksDirPath(), min_disk_space)) {
|
||||
LogPrintf("Shutting down due to lack of disk space!\n");
|
||||
|
@ -1158,7 +1163,9 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)
|
|||
}
|
||||
}, std::chrono::minutes{5});
|
||||
|
||||
GetMainSignals().RegisterBackgroundSignalScheduler(*node.scheduler);
|
||||
assert(!node.validation_signals);
|
||||
node.validation_signals = std::make_unique<ValidationSignals>(std::make_unique<SerialTaskRunner>(scheduler));
|
||||
auto& validation_signals = *node.validation_signals;
|
||||
|
||||
// Create client interfaces for wallets that are supposed to be loaded
|
||||
// according to -wallet and -disablewallet options. This only constructs
|
||||
|
@ -1263,8 +1270,8 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)
|
|||
|
||||
// Flush estimates to disk periodically
|
||||
CBlockPolicyEstimator* fee_estimator = node.fee_estimator.get();
|
||||
node.scheduler->scheduleEvery([fee_estimator] { fee_estimator->FlushFeeEstimates(); }, FEE_FLUSH_INTERVAL);
|
||||
RegisterValidationInterface(fee_estimator);
|
||||
scheduler.scheduleEvery([fee_estimator] { fee_estimator->FlushFeeEstimates(); }, FEE_FLUSH_INTERVAL);
|
||||
validation_signals.RegisterValidationInterface(fee_estimator);
|
||||
}
|
||||
|
||||
// Check port numbers
|
||||
|
@ -1435,7 +1442,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)
|
|||
});
|
||||
|
||||
if (g_zmq_notification_interface) {
|
||||
RegisterValidationInterface(g_zmq_notification_interface.get());
|
||||
validation_signals.RegisterValidationInterface(g_zmq_notification_interface.get());
|
||||
}
|
||||
#endif
|
||||
|
||||
|
@ -1449,6 +1456,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)
|
|||
.chainparams = chainparams,
|
||||
.datadir = args.GetDataDirNet(),
|
||||
.notifications = *node.notifications,
|
||||
.signals = &validation_signals,
|
||||
};
|
||||
Assert(ApplyArgsManOptions(args, chainman_opts)); // no error can happen, already checked in AppInitParameterInteraction
|
||||
|
||||
|
@ -1478,6 +1486,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)
|
|||
|
||||
CTxMemPool::Options mempool_opts{
|
||||
.check_ratio = chainparams.DefaultConsistencyChecks() ? 1 : 0,
|
||||
.signals = &validation_signals,
|
||||
};
|
||||
auto result{ApplyArgsManOptions(args, chainparams, mempool_opts)};
|
||||
if (!result) {
|
||||
|
@ -1505,7 +1514,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)
|
|||
|
||||
// Drain the validation interface queue to ensure that the old indexes
|
||||
// don't have any pending work.
|
||||
SyncWithValidationInterfaceQueue();
|
||||
Assert(node.validation_signals)->SyncWithValidationInterfaceQueue();
|
||||
|
||||
for (auto* index : node.indexes) {
|
||||
index->Interrupt();
|
||||
|
@ -1594,7 +1603,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)
|
|||
node.peerman = PeerManager::make(*node.connman, *node.addrman,
|
||||
node.banman.get(), chainman,
|
||||
*node.mempool, peerman_opts);
|
||||
RegisterValidationInterface(node.peerman.get());
|
||||
validation_signals.RegisterValidationInterface(node.peerman.get());
|
||||
|
||||
// ********************************************************* Step 8: start indexers
|
||||
|
||||
|
@ -1900,7 +1909,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)
|
|||
|
||||
connOptions.m_i2p_accept_incoming = args.GetBoolArg("-i2pacceptincoming", DEFAULT_I2P_ACCEPT_INCOMING);
|
||||
|
||||
if (!node.connman->Start(*node.scheduler, connOptions)) {
|
||||
if (!node.connman->Start(scheduler, connOptions)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -1920,15 +1929,15 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)
|
|||
uiInterface.InitMessage(_("Done loading").translated);
|
||||
|
||||
for (const auto& client : node.chain_clients) {
|
||||
client->start(*node.scheduler);
|
||||
client->start(scheduler);
|
||||
}
|
||||
|
||||
BanMan* banman = node.banman.get();
|
||||
node.scheduler->scheduleEvery([banman]{
|
||||
scheduler.scheduleEvery([banman]{
|
||||
banman->DumpBanlist();
|
||||
}, DUMP_BANS_INTERVAL);
|
||||
|
||||
if (node.peerman) node.peerman->StartScheduledTasks(*node.scheduler);
|
||||
if (node.peerman) node.peerman->StartScheduledTasks(scheduler);
|
||||
|
||||
#if HAVE_SYSTEM
|
||||
StartupNotify(args);
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
#include <optional>
|
||||
|
||||
class CChainParams;
|
||||
class ValidationSignals;
|
||||
|
||||
static constexpr bool DEFAULT_CHECKPOINTS_ENABLED{true};
|
||||
static constexpr auto DEFAULT_MAX_TIP_AGE{24h};
|
||||
|
@ -44,6 +45,7 @@ struct ChainstateManagerOpts {
|
|||
DBOptions coins_db{};
|
||||
CoinsViewOptions coins_view{};
|
||||
Notifications& notifications;
|
||||
ValidationSignals* signals{nullptr};
|
||||
//! Number of script check worker threads. Zero means no parallel verification.
|
||||
int worker_threads_num{0};
|
||||
};
|
||||
|
|
|
@ -13,6 +13,8 @@
|
|||
#include <cstdint>
|
||||
#include <optional>
|
||||
|
||||
class ValidationSignals;
|
||||
|
||||
/** Default for -maxmempool, maximum megabytes of mempool memory usage */
|
||||
static constexpr unsigned int DEFAULT_MAX_MEMPOOL_SIZE_MB{300};
|
||||
/** Default for -maxmempool when blocksonly is set */
|
||||
|
@ -56,6 +58,8 @@ struct MemPoolOptions {
|
|||
bool full_rbf{DEFAULT_MEMPOOL_FULL_RBF};
|
||||
bool persist_v1_dat{DEFAULT_PERSIST_V1_DAT};
|
||||
MemPoolLimits limits{};
|
||||
|
||||
ValidationSignals* signals{nullptr};
|
||||
};
|
||||
} // namespace kernel
|
||||
|
||||
|
|
|
@ -20,6 +20,7 @@ class BanMan;
|
|||
class BaseIndex;
|
||||
class CBlockPolicyEstimator;
|
||||
class CConnman;
|
||||
class ValidationSignals;
|
||||
class CScheduler;
|
||||
class CTxMemPool;
|
||||
class ChainstateManager;
|
||||
|
@ -70,7 +71,10 @@ struct NodeContext {
|
|||
interfaces::WalletLoader* wallet_loader{nullptr};
|
||||
std::unique_ptr<CScheduler> scheduler;
|
||||
std::function<void()> rpc_interruption_point = [] {};
|
||||
//! Issues blocking calls about sync status, errors and warnings
|
||||
std::unique_ptr<KernelNotifications> notifications;
|
||||
//! Issues calls about blocks and transactions
|
||||
std::unique_ptr<ValidationSignals> validation_signals;
|
||||
std::atomic<int> exit_status{EXIT_SUCCESS};
|
||||
|
||||
//! Declare default constructor and destructor that are not inline, so code
|
||||
|
|
|
@ -460,19 +460,20 @@ public:
|
|||
class NotificationsHandlerImpl : public Handler
|
||||
{
|
||||
public:
|
||||
explicit NotificationsHandlerImpl(std::shared_ptr<Chain::Notifications> notifications)
|
||||
: m_proxy(std::make_shared<NotificationsProxy>(std::move(notifications)))
|
||||
explicit NotificationsHandlerImpl(ValidationSignals& signals, std::shared_ptr<Chain::Notifications> notifications)
|
||||
: m_signals{signals}, m_proxy{std::make_shared<NotificationsProxy>(std::move(notifications))}
|
||||
{
|
||||
RegisterSharedValidationInterface(m_proxy);
|
||||
m_signals.RegisterSharedValidationInterface(m_proxy);
|
||||
}
|
||||
~NotificationsHandlerImpl() override { disconnect(); }
|
||||
void disconnect() override
|
||||
{
|
||||
if (m_proxy) {
|
||||
UnregisterSharedValidationInterface(m_proxy);
|
||||
m_signals.UnregisterSharedValidationInterface(m_proxy);
|
||||
m_proxy.reset();
|
||||
}
|
||||
}
|
||||
ValidationSignals& m_signals;
|
||||
std::shared_ptr<NotificationsProxy> m_proxy;
|
||||
};
|
||||
|
||||
|
@ -761,12 +762,12 @@ public:
|
|||
}
|
||||
std::unique_ptr<Handler> handleNotifications(std::shared_ptr<Notifications> notifications) override
|
||||
{
|
||||
return std::make_unique<NotificationsHandlerImpl>(std::move(notifications));
|
||||
return std::make_unique<NotificationsHandlerImpl>(validation_signals(), std::move(notifications));
|
||||
}
|
||||
void waitForNotificationsIfTipChanged(const uint256& old_tip) override
|
||||
{
|
||||
if (!old_tip.IsNull() && old_tip == WITH_LOCK(::cs_main, return chainman().ActiveChain().Tip()->GetBlockHash())) return;
|
||||
SyncWithValidationInterfaceQueue();
|
||||
validation_signals().SyncWithValidationInterfaceQueue();
|
||||
}
|
||||
std::unique_ptr<Handler> handleRpc(const CRPCCommand& command) override
|
||||
{
|
||||
|
@ -822,6 +823,7 @@ public:
|
|||
NodeContext* context() override { return &m_node; }
|
||||
ArgsManager& args() { return *Assert(m_node.args); }
|
||||
ChainstateManager& chainman() { return *Assert(m_node.chainman); }
|
||||
ValidationSignals& validation_signals() { return *Assert(m_node.validation_signals); }
|
||||
NodeContext& m_node;
|
||||
};
|
||||
} // namespace
|
||||
|
|
|
@ -92,7 +92,7 @@ TransactionError BroadcastTransaction(NodeContext& node, const CTransactionRef t
|
|||
node.mempool->AddUnbroadcastTx(txid);
|
||||
}
|
||||
|
||||
if (wait_callback) {
|
||||
if (wait_callback && node.validation_signals) {
|
||||
// For transactions broadcast from outside the wallet, make sure
|
||||
// that the wallet has been notified of the transaction before
|
||||
// continuing.
|
||||
|
@ -101,7 +101,7 @@ TransactionError BroadcastTransaction(NodeContext& node, const CTransactionRef t
|
|||
// with a transaction to/from their wallet, immediately call some
|
||||
// wallet RPC, and get a stale result because callbacks have not
|
||||
// yet been processed.
|
||||
CallFunctionInValidationInterfaceQueue([&promise] {
|
||||
node.validation_signals->CallFunctionInValidationInterfaceQueue([&promise] {
|
||||
promise.set_value();
|
||||
});
|
||||
callback_set = true;
|
||||
|
|
|
@ -395,7 +395,8 @@ static RPCHelpMan syncwithvalidationinterfacequeue()
|
|||
},
|
||||
[&](const RPCHelpMan& self, const JSONRPCRequest& request) -> UniValue
|
||||
{
|
||||
SyncWithValidationInterfaceQueue();
|
||||
NodeContext& node = EnsureAnyNodeContext(request.context);
|
||||
CHECK_NONFATAL(node.validation_signals)->SyncWithValidationInterfaceQueue();
|
||||
return UniValue::VNULL;
|
||||
},
|
||||
};
|
||||
|
|
|
@ -4,6 +4,7 @@
|
|||
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
|
||||
|
||||
#include <core_io.h>
|
||||
#include <node/context.h>
|
||||
#include <policy/feerate.h>
|
||||
#include <policy/fees.h>
|
||||
#include <rpc/protocol.h>
|
||||
|
@ -21,10 +22,6 @@
|
|||
#include <cmath>
|
||||
#include <string>
|
||||
|
||||
namespace node {
|
||||
struct NodeContext;
|
||||
}
|
||||
|
||||
using node::NodeContext;
|
||||
|
||||
static RPCHelpMan estimatesmartfee()
|
||||
|
@ -68,7 +65,7 @@ static RPCHelpMan estimatesmartfee()
|
|||
const NodeContext& node = EnsureAnyNodeContext(request.context);
|
||||
const CTxMemPool& mempool = EnsureMemPool(node);
|
||||
|
||||
SyncWithValidationInterfaceQueue();
|
||||
CHECK_NONFATAL(mempool.m_signals)->SyncWithValidationInterfaceQueue();
|
||||
unsigned int max_target = fee_estimator.HighestTargetTracked(FeeEstimateHorizon::LONG_HALFLIFE);
|
||||
unsigned int conf_target = ParseConfirmTarget(request.params[0], max_target);
|
||||
bool conservative = true;
|
||||
|
@ -156,8 +153,9 @@ static RPCHelpMan estimaterawfee()
|
|||
[&](const RPCHelpMan& self, const JSONRPCRequest& request) -> UniValue
|
||||
{
|
||||
CBlockPolicyEstimator& fee_estimator = EnsureAnyFeeEstimator(request.context);
|
||||
const NodeContext& node = EnsureAnyNodeContext(request.context);
|
||||
|
||||
SyncWithValidationInterfaceQueue();
|
||||
CHECK_NONFATAL(node.validation_signals)->SyncWithValidationInterfaceQueue();
|
||||
unsigned int max_target = fee_estimator.HighestTargetTracked(FeeEstimateHorizon::LONG_HALFLIFE);
|
||||
unsigned int conf_target = ParseConfirmTarget(request.params[0], max_target);
|
||||
double threshold = 0.95;
|
||||
|
|
|
@ -1042,9 +1042,9 @@ static RPCHelpMan submitblock()
|
|||
|
||||
bool new_block;
|
||||
auto sc = std::make_shared<submitblock_StateCatcher>(block.GetHash());
|
||||
RegisterSharedValidationInterface(sc);
|
||||
CHECK_NONFATAL(chainman.m_options.signals)->RegisterSharedValidationInterface(sc);
|
||||
bool accepted = chainman.ProcessNewBlock(blockptr, /*force_processing=*/true, /*min_pow_checked=*/true, /*new_block=*/&new_block);
|
||||
UnregisterSharedValidationInterface(sc);
|
||||
CHECK_NONFATAL(chainman.m_options.signals)->UnregisterSharedValidationInterface(sc);
|
||||
if (!new_block && accepted) {
|
||||
return "duplicate";
|
||||
}
|
||||
|
|
|
@ -94,7 +94,7 @@ static RPCHelpMan mockscheduler()
|
|||
|
||||
const NodeContext& node_context{EnsureAnyNodeContext(request.context)};
|
||||
CHECK_NONFATAL(node_context.scheduler)->MockForward(std::chrono::seconds{delta_seconds});
|
||||
SyncWithValidationInterfaceQueue();
|
||||
CHECK_NONFATAL(node_context.validation_signals)->SyncWithValidationInterfaceQueue();
|
||||
for (const auto& chain_client : node_context.chain_clients) {
|
||||
chain_client->schedulerMockForward(std::chrono::seconds(delta_seconds));
|
||||
}
|
||||
|
|
|
@ -129,7 +129,7 @@ bool CScheduler::AreThreadsServicingQueue() const
|
|||
}
|
||||
|
||||
|
||||
void SingleThreadedSchedulerClient::MaybeScheduleProcessQueue()
|
||||
void SerialTaskRunner::MaybeScheduleProcessQueue()
|
||||
{
|
||||
{
|
||||
LOCK(m_callbacks_mutex);
|
||||
|
@ -142,7 +142,7 @@ void SingleThreadedSchedulerClient::MaybeScheduleProcessQueue()
|
|||
m_scheduler.schedule([this] { this->ProcessQueue(); }, std::chrono::steady_clock::now());
|
||||
}
|
||||
|
||||
void SingleThreadedSchedulerClient::ProcessQueue()
|
||||
void SerialTaskRunner::ProcessQueue()
|
||||
{
|
||||
std::function<void()> callback;
|
||||
{
|
||||
|
@ -158,8 +158,8 @@ void SingleThreadedSchedulerClient::ProcessQueue()
|
|||
// RAII the setting of fCallbacksRunning and calling MaybeScheduleProcessQueue
|
||||
// to ensure both happen safely even if callback() throws.
|
||||
struct RAIICallbacksRunning {
|
||||
SingleThreadedSchedulerClient* instance;
|
||||
explicit RAIICallbacksRunning(SingleThreadedSchedulerClient* _instance) : instance(_instance) {}
|
||||
SerialTaskRunner* instance;
|
||||
explicit RAIICallbacksRunning(SerialTaskRunner* _instance) : instance(_instance) {}
|
||||
~RAIICallbacksRunning()
|
||||
{
|
||||
{
|
||||
|
@ -173,7 +173,7 @@ void SingleThreadedSchedulerClient::ProcessQueue()
|
|||
callback();
|
||||
}
|
||||
|
||||
void SingleThreadedSchedulerClient::AddToProcessQueue(std::function<void()> func)
|
||||
void SerialTaskRunner::insert(std::function<void()> func)
|
||||
{
|
||||
{
|
||||
LOCK(m_callbacks_mutex);
|
||||
|
@ -182,7 +182,7 @@ void SingleThreadedSchedulerClient::AddToProcessQueue(std::function<void()> func
|
|||
MaybeScheduleProcessQueue();
|
||||
}
|
||||
|
||||
void SingleThreadedSchedulerClient::EmptyQueue()
|
||||
void SerialTaskRunner::flush()
|
||||
{
|
||||
assert(!m_scheduler.AreThreadsServicingQueue());
|
||||
bool should_continue = true;
|
||||
|
@ -193,7 +193,7 @@ void SingleThreadedSchedulerClient::EmptyQueue()
|
|||
}
|
||||
}
|
||||
|
||||
size_t SingleThreadedSchedulerClient::CallbacksPending()
|
||||
size_t SerialTaskRunner::size()
|
||||
{
|
||||
LOCK(m_callbacks_mutex);
|
||||
return m_callbacks_pending.size();
|
||||
|
|
|
@ -8,6 +8,7 @@
|
|||
#include <attributes.h>
|
||||
#include <sync.h>
|
||||
#include <threadsafety.h>
|
||||
#include <util/task_runner.h>
|
||||
|
||||
#include <chrono>
|
||||
#include <condition_variable>
|
||||
|
@ -120,12 +121,16 @@ private:
|
|||
* B() will be able to observe all of the effects of callback A() which executed
|
||||
* before it.
|
||||
*/
|
||||
class SingleThreadedSchedulerClient
|
||||
class SerialTaskRunner : public util::TaskRunnerInterface
|
||||
{
|
||||
private:
|
||||
CScheduler& m_scheduler;
|
||||
|
||||
Mutex m_callbacks_mutex;
|
||||
|
||||
// We are not allowed to assume the scheduler only runs in one thread,
|
||||
// but must ensure all callbacks happen in-order, so we end up creating
|
||||
// our own queue here :(
|
||||
std::list<std::function<void()>> m_callbacks_pending GUARDED_BY(m_callbacks_mutex);
|
||||
bool m_are_callbacks_running GUARDED_BY(m_callbacks_mutex) = false;
|
||||
|
||||
|
@ -133,7 +138,7 @@ private:
|
|||
void ProcessQueue() EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex);
|
||||
|
||||
public:
|
||||
explicit SingleThreadedSchedulerClient(CScheduler& scheduler LIFETIMEBOUND) : m_scheduler{scheduler} {}
|
||||
explicit SerialTaskRunner(CScheduler& scheduler LIFETIMEBOUND) : m_scheduler{scheduler} {}
|
||||
|
||||
/**
|
||||
* Add a callback to be executed. Callbacks are executed serially
|
||||
|
@ -141,15 +146,15 @@ public:
|
|||
* Practically, this means that callbacks can behave as if they are executed
|
||||
* in order by a single thread.
|
||||
*/
|
||||
void AddToProcessQueue(std::function<void()> func) EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex);
|
||||
void insert(std::function<void()> func) override EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex);
|
||||
|
||||
/**
|
||||
* Processes all remaining queue members on the calling thread, blocking until queue is empty
|
||||
* Must be called after the CScheduler has no remaining processing threads!
|
||||
*/
|
||||
void EmptyQueue() EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex);
|
||||
void flush() override EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex);
|
||||
|
||||
size_t CallbacksPending() EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex);
|
||||
size_t size() override EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex);
|
||||
};
|
||||
|
||||
#endif // BITCOIN_SCHEDULER_H
|
||||
|
|
|
@ -70,7 +70,7 @@ BOOST_FIXTURE_TEST_CASE(coinstatsindex_initial_sync, TestChain100Setup)
|
|||
// SyncWithValidationInterfaceQueue() call below is also needed to ensure
|
||||
// TSAN always sees the test thread waiting for the notification thread, and
|
||||
// avoid potential false positive reports.
|
||||
SyncWithValidationInterfaceQueue();
|
||||
m_node.validation_signals->SyncWithValidationInterfaceQueue();
|
||||
|
||||
// Shutdown sequence (c.f. Shutdown() in init.cpp)
|
||||
coin_stats_index.Stop();
|
||||
|
|
|
@ -47,7 +47,7 @@ void initialize_tx_pool()
|
|||
g_outpoints_coinbase_init_mature.push_back(prevout);
|
||||
}
|
||||
}
|
||||
SyncWithValidationInterfaceQueue();
|
||||
g_setup->m_node.validation_signals->SyncWithValidationInterfaceQueue();
|
||||
}
|
||||
|
||||
struct OutpointsUpdater final : public CValidationInterface {
|
||||
|
@ -147,7 +147,7 @@ FUZZ_TARGET(tx_package_eval, .init = initialize_tx_pool)
|
|||
}
|
||||
|
||||
auto outpoints_updater = std::make_shared<OutpointsUpdater>(mempool_outpoints);
|
||||
RegisterSharedValidationInterface(outpoints_updater);
|
||||
node.validation_signals->RegisterSharedValidationInterface(outpoints_updater);
|
||||
|
||||
CTxMemPool tx_pool_{MakeMempool(fuzzed_data_provider, node)};
|
||||
MockedTxPool& tx_pool = *static_cast<MockedTxPool*>(&tx_pool_);
|
||||
|
@ -269,7 +269,7 @@ FUZZ_TARGET(tx_package_eval, .init = initialize_tx_pool)
|
|||
// Remember all added transactions
|
||||
std::set<CTransactionRef> added;
|
||||
auto txr = std::make_shared<TransactionsDelta>(added);
|
||||
RegisterSharedValidationInterface(txr);
|
||||
node.validation_signals->RegisterSharedValidationInterface(txr);
|
||||
|
||||
// When there are multiple transactions in the package, we call ProcessNewPackage(txs, test_accept=false)
|
||||
// and AcceptToMemoryPool(txs.back(), test_accept=true). When there is only 1 transaction, we might flip it
|
||||
|
@ -285,8 +285,8 @@ FUZZ_TARGET(tx_package_eval, .init = initialize_tx_pool)
|
|||
/*bypass_limits=*/false, /*test_accept=*/!single_submit));
|
||||
const bool passed = res.m_result_type == MempoolAcceptResult::ResultType::VALID;
|
||||
|
||||
SyncWithValidationInterfaceQueue();
|
||||
UnregisterSharedValidationInterface(txr);
|
||||
node.validation_signals->SyncWithValidationInterfaceQueue();
|
||||
node.validation_signals->UnregisterSharedValidationInterface(txr);
|
||||
|
||||
// There is only 1 transaction in the package. We did a test-package-accept and a ATMP
|
||||
if (single_submit) {
|
||||
|
@ -310,7 +310,7 @@ FUZZ_TARGET(tx_package_eval, .init = initialize_tx_pool)
|
|||
CheckMempoolV3Invariants(tx_pool);
|
||||
}
|
||||
|
||||
UnregisterSharedValidationInterface(outpoints_updater);
|
||||
node.validation_signals->UnregisterSharedValidationInterface(outpoints_updater);
|
||||
|
||||
WITH_LOCK(::cs_main, tx_pool.check(chainstate.CoinsTip(), chainstate.m_chain.Height() + 1));
|
||||
}
|
||||
|
|
|
@ -47,7 +47,7 @@ void initialize_process_message()
|
|||
for (int i = 0; i < 2 * COINBASE_MATURITY; i++) {
|
||||
MineBlock(g_setup->m_node, CScript() << OP_TRUE);
|
||||
}
|
||||
SyncWithValidationInterfaceQueue();
|
||||
g_setup->m_node.validation_signals->SyncWithValidationInterfaceQueue();
|
||||
}
|
||||
|
||||
FUZZ_TARGET(process_message, .init = initialize_process_message)
|
||||
|
@ -89,6 +89,6 @@ FUZZ_TARGET(process_message, .init = initialize_process_message)
|
|||
}
|
||||
g_setup->m_node.peerman->SendMessages(&p2p_node);
|
||||
}
|
||||
SyncWithValidationInterfaceQueue();
|
||||
g_setup->m_node.validation_signals->SyncWithValidationInterfaceQueue();
|
||||
g_setup->m_node.connman->StopNodes();
|
||||
}
|
||||
|
|
|
@ -37,7 +37,7 @@ void initialize_process_messages()
|
|||
for (int i = 0; i < 2 * COINBASE_MATURITY; i++) {
|
||||
MineBlock(g_setup->m_node, CScript() << OP_TRUE);
|
||||
}
|
||||
SyncWithValidationInterfaceQueue();
|
||||
g_setup->m_node.validation_signals->SyncWithValidationInterfaceQueue();
|
||||
}
|
||||
|
||||
FUZZ_TARGET(process_messages, .init = initialize_process_messages)
|
||||
|
@ -89,6 +89,6 @@ FUZZ_TARGET(process_messages, .init = initialize_process_messages)
|
|||
g_setup->m_node.peerman->SendMessages(&random_node);
|
||||
}
|
||||
}
|
||||
SyncWithValidationInterfaceQueue();
|
||||
g_setup->m_node.validation_signals->SyncWithValidationInterfaceQueue();
|
||||
g_setup->m_node.connman->StopNodes();
|
||||
}
|
||||
|
|
|
@ -50,7 +50,7 @@ void initialize_tx_pool()
|
|||
g_outpoints_coinbase_init_immature;
|
||||
outpoints.push_back(prevout);
|
||||
}
|
||||
SyncWithValidationInterfaceQueue();
|
||||
g_setup->m_node.validation_signals->SyncWithValidationInterfaceQueue();
|
||||
}
|
||||
|
||||
struct TransactionsDelta final : public CValidationInterface {
|
||||
|
@ -105,7 +105,7 @@ void Finish(FuzzedDataProvider& fuzzed_data_provider, MockedTxPool& tx_pool, Cha
|
|||
assert(tx_pool.size() < info_all.size());
|
||||
WITH_LOCK(::cs_main, tx_pool.check(chainstate.CoinsTip(), chainstate.m_chain.Height() + 1));
|
||||
}
|
||||
SyncWithValidationInterfaceQueue();
|
||||
g_setup->m_node.validation_signals->SyncWithValidationInterfaceQueue();
|
||||
}
|
||||
|
||||
void MockTime(FuzzedDataProvider& fuzzed_data_provider, const Chainstate& chainstate)
|
||||
|
@ -285,7 +285,7 @@ FUZZ_TARGET(tx_pool_standard, .init = initialize_tx_pool)
|
|||
std::set<CTransactionRef> removed;
|
||||
std::set<CTransactionRef> added;
|
||||
auto txr = std::make_shared<TransactionsDelta>(removed, added);
|
||||
RegisterSharedValidationInterface(txr);
|
||||
node.validation_signals->RegisterSharedValidationInterface(txr);
|
||||
const bool bypass_limits = fuzzed_data_provider.ConsumeBool();
|
||||
|
||||
// Make sure ProcessNewPackage on one transaction works.
|
||||
|
@ -303,8 +303,8 @@ FUZZ_TARGET(tx_pool_standard, .init = initialize_tx_pool)
|
|||
|
||||
const auto res = WITH_LOCK(::cs_main, return AcceptToMemoryPool(chainstate, tx, GetTime(), bypass_limits, /*test_accept=*/false));
|
||||
const bool accepted = res.m_result_type == MempoolAcceptResult::ResultType::VALID;
|
||||
SyncWithValidationInterfaceQueue();
|
||||
UnregisterSharedValidationInterface(txr);
|
||||
node.validation_signals->SyncWithValidationInterfaceQueue();
|
||||
node.validation_signals->UnregisterSharedValidationInterface(txr);
|
||||
|
||||
bool txid_in_mempool = tx_pool.exists(GenTxid::Txid(tx->GetHash()));
|
||||
bool wtxid_in_mempool = tx_pool.exists(GenTxid::Wtxid(tx->GetWitnessHash()));
|
||||
|
|
|
@ -25,7 +25,7 @@ static void mineBlock(const node::NodeContext& node, std::chrono::seconds block_
|
|||
block.fChecked = true; // little speedup
|
||||
SetMockTime(curr_time); // process block at current time
|
||||
Assert(node.chainman->ProcessNewBlock(std::make_shared<const CBlock>(block), /*force_processing=*/true, /*min_pow_checked=*/true, nullptr));
|
||||
SyncWithValidationInterfaceQueue(); // drain events queue
|
||||
node.validation_signals->SyncWithValidationInterfaceQueue(); // drain events queue
|
||||
}
|
||||
|
||||
// Verifying when network-limited peer connections are desirable based on the node's proximity to the tip
|
||||
|
@ -57,7 +57,7 @@ BOOST_AUTO_TEST_CASE(connections_desirable_service_flags)
|
|||
|
||||
// By now, we tested that the connections desirable services flags change based on the node's time proximity to the tip.
|
||||
// Now, perform the same tests for when the node receives a block.
|
||||
RegisterValidationInterface(peerman.get());
|
||||
m_node.validation_signals->RegisterValidationInterface(peerman.get());
|
||||
|
||||
// First, verify a block in the past doesn't enable limited peers connections
|
||||
// At this point, our time is (NODE_NETWORK_LIMITED_ALLOW_CONN_BLOCKS + 1) * 10 minutes ahead the tip's time.
|
||||
|
|
|
@ -20,7 +20,7 @@ BOOST_AUTO_TEST_CASE(BlockPolicyEstimates)
|
|||
{
|
||||
CBlockPolicyEstimator& feeEst = *Assert(m_node.fee_estimator);
|
||||
CTxMemPool& mpool = *Assert(m_node.mempool);
|
||||
RegisterValidationInterface(&feeEst);
|
||||
m_node.validation_signals->RegisterValidationInterface(&feeEst);
|
||||
TestMemPoolEntryHelper entry;
|
||||
CAmount basefee(2000);
|
||||
CAmount deltaFee(100);
|
||||
|
@ -74,7 +74,7 @@ BOOST_AUTO_TEST_CASE(BlockPolicyEstimates)
|
|||
/*submitted_in_package=*/false,
|
||||
/*chainstate_is_current=*/true,
|
||||
/*has_no_mempool_parents=*/true)};
|
||||
GetMainSignals().TransactionAddedToMempool(tx_info, mpool.GetAndIncrementSequence());
|
||||
m_node.validation_signals->TransactionAddedToMempool(tx_info, mpool.GetAndIncrementSequence());
|
||||
}
|
||||
uint256 hash = tx.GetHash();
|
||||
txHashes[j].push_back(hash);
|
||||
|
@ -102,7 +102,7 @@ BOOST_AUTO_TEST_CASE(BlockPolicyEstimates)
|
|||
// Check after just a few txs that combining buckets works as expected
|
||||
if (blocknum == 3) {
|
||||
// Wait for fee estimator to catch up
|
||||
SyncWithValidationInterfaceQueue();
|
||||
m_node.validation_signals->SyncWithValidationInterfaceQueue();
|
||||
// At this point we should need to combine 3 buckets to get enough data points
|
||||
// So estimateFee(1) should fail and estimateFee(2) should return somewhere around
|
||||
// 9*baserate. estimateFee(2) %'s are 100,100,90 = average 97%
|
||||
|
@ -113,7 +113,7 @@ BOOST_AUTO_TEST_CASE(BlockPolicyEstimates)
|
|||
}
|
||||
|
||||
// Wait for fee estimator to catch up
|
||||
SyncWithValidationInterfaceQueue();
|
||||
m_node.validation_signals->SyncWithValidationInterfaceQueue();
|
||||
|
||||
std::vector<CAmount> origFeeEst;
|
||||
// Highest feerate is 10*baseRate and gets in all blocks,
|
||||
|
@ -146,7 +146,7 @@ BOOST_AUTO_TEST_CASE(BlockPolicyEstimates)
|
|||
}
|
||||
|
||||
// Wait for fee estimator to catch up
|
||||
SyncWithValidationInterfaceQueue();
|
||||
m_node.validation_signals->SyncWithValidationInterfaceQueue();
|
||||
|
||||
BOOST_CHECK(feeEst.estimateFee(1) == CFeeRate(0));
|
||||
for (int i = 2; i < 10;i++) {
|
||||
|
@ -175,7 +175,7 @@ BOOST_AUTO_TEST_CASE(BlockPolicyEstimates)
|
|||
/*submitted_in_package=*/false,
|
||||
/*chainstate_is_current=*/true,
|
||||
/*has_no_mempool_parents=*/true)};
|
||||
GetMainSignals().TransactionAddedToMempool(tx_info, mpool.GetAndIncrementSequence());
|
||||
m_node.validation_signals->TransactionAddedToMempool(tx_info, mpool.GetAndIncrementSequence());
|
||||
}
|
||||
uint256 hash = tx.GetHash();
|
||||
txHashes[j].push_back(hash);
|
||||
|
@ -188,7 +188,7 @@ BOOST_AUTO_TEST_CASE(BlockPolicyEstimates)
|
|||
}
|
||||
|
||||
// Wait for fee estimator to catch up
|
||||
SyncWithValidationInterfaceQueue();
|
||||
m_node.validation_signals->SyncWithValidationInterfaceQueue();
|
||||
|
||||
for (int i = 1; i < 10;i++) {
|
||||
BOOST_CHECK(feeEst.estimateFee(i) == CFeeRate(0) || feeEst.estimateFee(i).GetFeePerK() > origFeeEst[i-1] - deltaFee);
|
||||
|
@ -212,7 +212,7 @@ BOOST_AUTO_TEST_CASE(BlockPolicyEstimates)
|
|||
block.clear();
|
||||
|
||||
// Wait for fee estimator to catch up
|
||||
SyncWithValidationInterfaceQueue();
|
||||
m_node.validation_signals->SyncWithValidationInterfaceQueue();
|
||||
|
||||
BOOST_CHECK(feeEst.estimateFee(1) == CFeeRate(0));
|
||||
for (int i = 2; i < 10;i++) {
|
||||
|
@ -239,7 +239,7 @@ BOOST_AUTO_TEST_CASE(BlockPolicyEstimates)
|
|||
/*submitted_in_package=*/false,
|
||||
/*chainstate_is_current=*/true,
|
||||
/*has_no_mempool_parents=*/true)};
|
||||
GetMainSignals().TransactionAddedToMempool(tx_info, mpool.GetAndIncrementSequence());
|
||||
m_node.validation_signals->TransactionAddedToMempool(tx_info, mpool.GetAndIncrementSequence());
|
||||
}
|
||||
uint256 hash = tx.GetHash();
|
||||
CTransactionRef ptx = mpool.get(hash);
|
||||
|
@ -257,7 +257,7 @@ BOOST_AUTO_TEST_CASE(BlockPolicyEstimates)
|
|||
block.clear();
|
||||
}
|
||||
// Wait for fee estimator to catch up
|
||||
SyncWithValidationInterfaceQueue();
|
||||
m_node.validation_signals->SyncWithValidationInterfaceQueue();
|
||||
BOOST_CHECK(feeEst.estimateFee(1) == CFeeRate(0));
|
||||
for (int i = 2; i < 9; i++) { // At 9, the original estimate was already at the bottom (b/c scale = 2)
|
||||
BOOST_CHECK(feeEst.estimateFee(i).GetFeePerK() < origFeeEst[i-1] - deltaFee);
|
||||
|
|
|
@ -129,8 +129,8 @@ BOOST_AUTO_TEST_CASE(singlethreadedscheduler_ordered)
|
|||
CScheduler scheduler;
|
||||
|
||||
// each queue should be well ordered with respect to itself but not other queues
|
||||
SingleThreadedSchedulerClient queue1(scheduler);
|
||||
SingleThreadedSchedulerClient queue2(scheduler);
|
||||
SerialTaskRunner queue1(scheduler);
|
||||
SerialTaskRunner queue2(scheduler);
|
||||
|
||||
// create more threads than queues
|
||||
// if the queues only permit execution of one task at once then
|
||||
|
@ -142,7 +142,7 @@ BOOST_AUTO_TEST_CASE(singlethreadedscheduler_ordered)
|
|||
threads.emplace_back([&] { scheduler.serviceQueue(); });
|
||||
}
|
||||
|
||||
// these are not atomic, if SinglethreadedSchedulerClient prevents
|
||||
// these are not atomic, if SerialTaskRunner prevents
|
||||
// parallel execution at the queue level no synchronization should be required here
|
||||
int counter1 = 0;
|
||||
int counter2 = 0;
|
||||
|
@ -150,12 +150,12 @@ BOOST_AUTO_TEST_CASE(singlethreadedscheduler_ordered)
|
|||
// just simply count up on each queue - if execution is properly ordered then
|
||||
// the callbacks should run in exactly the order in which they were enqueued
|
||||
for (int i = 0; i < 100; ++i) {
|
||||
queue1.AddToProcessQueue([i, &counter1]() {
|
||||
queue1.insert([i, &counter1]() {
|
||||
bool expectation = i == counter1++;
|
||||
assert(expectation);
|
||||
});
|
||||
|
||||
queue2.AddToProcessQueue([i, &counter2]() {
|
||||
queue2.insert([i, &counter2]() {
|
||||
bool expectation = i == counter2++;
|
||||
assert(expectation);
|
||||
});
|
||||
|
|
|
@ -71,7 +71,7 @@ BOOST_FIXTURE_TEST_CASE(txindex_initial_sync, TestChain100Setup)
|
|||
// SyncWithValidationInterfaceQueue() call below is also needed to ensure
|
||||
// TSAN always sees the test thread waiting for the notification thread, and
|
||||
// avoid potential false positive reports.
|
||||
SyncWithValidationInterfaceQueue();
|
||||
m_node.validation_signals->SyncWithValidationInterfaceQueue();
|
||||
|
||||
// shutdown sequence (c.f. Shutdown() in init.cpp)
|
||||
txindex.Stop();
|
||||
|
|
|
@ -95,12 +95,12 @@ COutPoint MineBlock(const NodeContext& node, std::shared_ptr<CBlock>& block)
|
|||
const auto old_height = WITH_LOCK(chainman.GetMutex(), return chainman.ActiveHeight());
|
||||
bool new_block;
|
||||
BlockValidationStateCatcher bvsc{block->GetHash()};
|
||||
RegisterValidationInterface(&bvsc);
|
||||
node.validation_signals->RegisterValidationInterface(&bvsc);
|
||||
const bool processed{chainman.ProcessNewBlock(block, true, true, &new_block)};
|
||||
const bool duplicate{!new_block && processed};
|
||||
assert(!duplicate);
|
||||
UnregisterValidationInterface(&bvsc);
|
||||
SyncWithValidationInterfaceQueue();
|
||||
node.validation_signals->UnregisterValidationInterface(&bvsc);
|
||||
node.validation_signals->SyncWithValidationInterfaceQueue();
|
||||
const bool was_valid{bvsc.m_state && bvsc.m_state->IsValid()};
|
||||
assert(old_height + was_valid == WITH_LOCK(chainman.GetMutex(), return chainman.ActiveHeight()));
|
||||
|
||||
|
|
|
@ -175,7 +175,7 @@ ChainTestingSetup::ChainTestingSetup(const ChainType chainType, const std::vecto
|
|||
// from blocking due to queue overrun.
|
||||
m_node.scheduler = std::make_unique<CScheduler>();
|
||||
m_node.scheduler->m_service_thread = std::thread(util::TraceThread, "scheduler", [&] { m_node.scheduler->serviceQueue(); });
|
||||
GetMainSignals().RegisterBackgroundSignalScheduler(*m_node.scheduler);
|
||||
m_node.validation_signals = std::make_unique<ValidationSignals>(std::make_unique<SerialTaskRunner>(*m_node.scheduler));
|
||||
|
||||
m_node.fee_estimator = std::make_unique<CBlockPolicyEstimator>(FeeestPath(*m_node.args), DEFAULT_ACCEPT_STALE_FEE_ESTIMATES);
|
||||
m_node.mempool = std::make_unique<CTxMemPool>(MemPoolOptionsForTest(m_node));
|
||||
|
@ -189,6 +189,7 @@ ChainTestingSetup::ChainTestingSetup(const ChainType chainType, const std::vecto
|
|||
.datadir = m_args.GetDataDirNet(),
|
||||
.check_block_index = true,
|
||||
.notifications = *m_node.notifications,
|
||||
.signals = m_node.validation_signals.get(),
|
||||
.worker_threads_num = 2,
|
||||
};
|
||||
const BlockManager::Options blockman_opts{
|
||||
|
@ -206,8 +207,7 @@ ChainTestingSetup::ChainTestingSetup(const ChainType chainType, const std::vecto
|
|||
ChainTestingSetup::~ChainTestingSetup()
|
||||
{
|
||||
if (m_node.scheduler) m_node.scheduler->stop();
|
||||
GetMainSignals().FlushBackgroundCallbacks();
|
||||
GetMainSignals().UnregisterBackgroundSignalScheduler();
|
||||
m_node.validation_signals->FlushBackgroundCallbacks();
|
||||
m_node.connman.reset();
|
||||
m_node.banman.reset();
|
||||
m_node.addrman.reset();
|
||||
|
@ -216,6 +216,7 @@ ChainTestingSetup::~ChainTestingSetup()
|
|||
m_node.mempool.reset();
|
||||
m_node.fee_estimator.reset();
|
||||
m_node.chainman.reset();
|
||||
m_node.validation_signals.reset();
|
||||
m_node.scheduler.reset();
|
||||
}
|
||||
|
||||
|
|
|
@ -22,6 +22,7 @@ CTxMemPool::Options MemPoolOptionsForTest(const NodeContext& node)
|
|||
// Default to always checking mempool regardless of
|
||||
// chainparams.DefaultConsistencyChecks for tests
|
||||
.check_ratio = 1,
|
||||
.signals = node.validation_signals.get(),
|
||||
};
|
||||
const auto result{ApplyArgsManOptions(*node.args, ::Params(), mempool_opts)};
|
||||
Assert(result);
|
||||
|
|
|
@ -158,7 +158,7 @@ BOOST_AUTO_TEST_CASE(processnewblock_signals_ordering)
|
|||
bool ignored;
|
||||
// Connect the genesis block and drain any outstanding events
|
||||
BOOST_CHECK(Assert(m_node.chainman)->ProcessNewBlock(std::make_shared<CBlock>(Params().GenesisBlock()), true, true, &ignored));
|
||||
SyncWithValidationInterfaceQueue();
|
||||
m_node.validation_signals->SyncWithValidationInterfaceQueue();
|
||||
|
||||
// subscribe to events (this subscriber will validate event ordering)
|
||||
const CBlockIndex* initial_tip = nullptr;
|
||||
|
@ -167,7 +167,7 @@ BOOST_AUTO_TEST_CASE(processnewblock_signals_ordering)
|
|||
initial_tip = m_node.chainman->ActiveChain().Tip();
|
||||
}
|
||||
auto sub = std::make_shared<TestSubscriber>(initial_tip->GetBlockHash());
|
||||
RegisterSharedValidationInterface(sub);
|
||||
m_node.validation_signals->RegisterSharedValidationInterface(sub);
|
||||
|
||||
// create a bunch of threads that repeatedly process a block generated above at random
|
||||
// this will create parallelism and randomness inside validation - the ValidationInterface
|
||||
|
@ -196,9 +196,9 @@ BOOST_AUTO_TEST_CASE(processnewblock_signals_ordering)
|
|||
for (auto& t : threads) {
|
||||
t.join();
|
||||
}
|
||||
SyncWithValidationInterfaceQueue();
|
||||
m_node.validation_signals->SyncWithValidationInterfaceQueue();
|
||||
|
||||
UnregisterSharedValidationInterface(sub);
|
||||
m_node.validation_signals->UnregisterSharedValidationInterface(sub);
|
||||
|
||||
LOCK(cs_main);
|
||||
BOOST_CHECK_EQUAL(sub->m_expected_tip, m_node.chainman->ActiveChain().Tip()->GetBlockHash());
|
||||
|
|
|
@ -102,7 +102,7 @@ BOOST_FIXTURE_TEST_CASE(chainstatemanager, TestChain100Setup)
|
|||
BOOST_CHECK_EQUAL(active_tip2, c2.m_chain.Tip());
|
||||
|
||||
// Let scheduler events finish running to avoid accessing memory that is going to be unloaded
|
||||
SyncWithValidationInterfaceQueue();
|
||||
m_node.validation_signals->SyncWithValidationInterfaceQueue();
|
||||
}
|
||||
|
||||
//! Test rebalancing the caches associated with each chainstate.
|
||||
|
@ -374,7 +374,7 @@ struct SnapshotTestSetup : TestChain100Setup {
|
|||
cs->ForceFlushStateToDisk();
|
||||
}
|
||||
// Process all callbacks referring to the old manager before wiping it.
|
||||
SyncWithValidationInterfaceQueue();
|
||||
m_node.validation_signals->SyncWithValidationInterfaceQueue();
|
||||
LOCK(::cs_main);
|
||||
chainman.ResetChainstates();
|
||||
BOOST_CHECK_EQUAL(chainman.GetAll().size(), 0);
|
||||
|
@ -383,6 +383,7 @@ struct SnapshotTestSetup : TestChain100Setup {
|
|||
.chainparams = ::Params(),
|
||||
.datadir = chainman.m_options.datadir,
|
||||
.notifications = *m_node.notifications,
|
||||
.signals = m_node.validation_signals.get(),
|
||||
};
|
||||
const BlockManager::Options blockman_opts{
|
||||
.chainparams = chainman_opts.chainparams,
|
||||
|
|
|
@ -28,7 +28,7 @@ BOOST_AUTO_TEST_CASE(unregister_validation_interface_race)
|
|||
const CBlock block_dummy;
|
||||
BlockValidationState state_dummy;
|
||||
while (generate) {
|
||||
GetMainSignals().BlockChecked(block_dummy, state_dummy);
|
||||
m_node.validation_signals->BlockChecked(block_dummy, state_dummy);
|
||||
}
|
||||
}};
|
||||
|
||||
|
@ -37,8 +37,8 @@ BOOST_AUTO_TEST_CASE(unregister_validation_interface_race)
|
|||
// keep going for about 1 sec, which is 250k iterations
|
||||
for (int i = 0; i < 250000; i++) {
|
||||
auto sub = std::make_shared<TestSubscriberNoop>();
|
||||
RegisterSharedValidationInterface(sub);
|
||||
UnregisterSharedValidationInterface(sub);
|
||||
m_node.validation_signals->RegisterSharedValidationInterface(sub);
|
||||
m_node.validation_signals->UnregisterSharedValidationInterface(sub);
|
||||
}
|
||||
// tell the other thread we are done
|
||||
generate = false;
|
||||
|
@ -52,8 +52,8 @@ BOOST_AUTO_TEST_CASE(unregister_validation_interface_race)
|
|||
class TestInterface : public CValidationInterface
|
||||
{
|
||||
public:
|
||||
TestInterface(std::function<void()> on_call = nullptr, std::function<void()> on_destroy = nullptr)
|
||||
: m_on_call(std::move(on_call)), m_on_destroy(std::move(on_destroy))
|
||||
TestInterface(ValidationSignals& signals, std::function<void()> on_call = nullptr, std::function<void()> on_destroy = nullptr)
|
||||
: m_on_call(std::move(on_call)), m_on_destroy(std::move(on_destroy)), m_signals{signals}
|
||||
{
|
||||
}
|
||||
virtual ~TestInterface()
|
||||
|
@ -64,14 +64,15 @@ public:
|
|||
{
|
||||
if (m_on_call) m_on_call();
|
||||
}
|
||||
static void Call()
|
||||
void Call()
|
||||
{
|
||||
CBlock block;
|
||||
BlockValidationState state;
|
||||
GetMainSignals().BlockChecked(block, state);
|
||||
m_signals.BlockChecked(block, state);
|
||||
}
|
||||
std::function<void()> m_on_call;
|
||||
std::function<void()> m_on_destroy;
|
||||
ValidationSignals& m_signals;
|
||||
};
|
||||
|
||||
// Regression test to ensure UnregisterAllValidationInterfaces calls don't
|
||||
|
@ -80,17 +81,23 @@ public:
|
|||
BOOST_AUTO_TEST_CASE(unregister_all_during_call)
|
||||
{
|
||||
bool destroyed = false;
|
||||
RegisterSharedValidationInterface(std::make_shared<TestInterface>(
|
||||
auto shared{std::make_shared<TestInterface>(
|
||||
*m_node.validation_signals,
|
||||
[&] {
|
||||
// First call should decrements reference count 2 -> 1
|
||||
UnregisterAllValidationInterfaces();
|
||||
m_node.validation_signals->UnregisterAllValidationInterfaces();
|
||||
BOOST_CHECK(!destroyed);
|
||||
// Second call should not decrement reference count 1 -> 0
|
||||
UnregisterAllValidationInterfaces();
|
||||
m_node.validation_signals->UnregisterAllValidationInterfaces();
|
||||
BOOST_CHECK(!destroyed);
|
||||
},
|
||||
[&] { destroyed = true; }));
|
||||
TestInterface::Call();
|
||||
[&] { destroyed = true; })};
|
||||
m_node.validation_signals->RegisterSharedValidationInterface(shared);
|
||||
BOOST_CHECK(shared.use_count() == 2);
|
||||
shared->Call();
|
||||
BOOST_CHECK(shared.use_count() == 1);
|
||||
BOOST_CHECK(!destroyed);
|
||||
shared.reset();
|
||||
BOOST_CHECK(destroyed);
|
||||
}
|
||||
|
||||
|
|
|
@ -406,7 +406,8 @@ CTxMemPool::CTxMemPool(const Options& opts)
|
|||
m_require_standard{opts.require_standard},
|
||||
m_full_rbf{opts.full_rbf},
|
||||
m_persist_v1_dat{opts.persist_v1_dat},
|
||||
m_limits{opts.limits}
|
||||
m_limits{opts.limits},
|
||||
m_signals{opts.signals}
|
||||
{
|
||||
}
|
||||
|
||||
|
@ -487,12 +488,12 @@ void CTxMemPool::removeUnchecked(txiter it, MemPoolRemovalReason reason)
|
|||
// even if not directly reported below.
|
||||
uint64_t mempool_sequence = GetAndIncrementSequence();
|
||||
|
||||
if (reason != MemPoolRemovalReason::BLOCK) {
|
||||
if (reason != MemPoolRemovalReason::BLOCK && m_signals) {
|
||||
// Notify clients that a transaction has been removed from the mempool
|
||||
// for any reason except being included in a block. Clients interested
|
||||
// in transactions included in blocks can subscribe to the BlockConnected
|
||||
// notification.
|
||||
GetMainSignals().TransactionRemovedFromMempool(it->GetSharedTx(), reason, mempool_sequence);
|
||||
m_signals->TransactionRemovedFromMempool(it->GetSharedTx(), reason, mempool_sequence);
|
||||
}
|
||||
TRACE5(mempool, removed,
|
||||
it->GetTx().GetHash().data(),
|
||||
|
@ -643,7 +644,9 @@ void CTxMemPool::removeForBlock(const std::vector<CTransactionRef>& vtx, unsigne
|
|||
removeConflicts(*tx);
|
||||
ClearPrioritisation(tx->GetHash());
|
||||
}
|
||||
GetMainSignals().MempoolTransactionsRemovedForBlock(txs_removed_for_block, nBlockHeight);
|
||||
if (m_signals) {
|
||||
m_signals->MempoolTransactionsRemovedForBlock(txs_removed_for_block, nBlockHeight);
|
||||
}
|
||||
lastRollingFeeUpdate = GetTime();
|
||||
blockSinceLastRollingFeeBump = true;
|
||||
}
|
||||
|
|
|
@ -40,6 +40,7 @@
|
|||
#include <vector>
|
||||
|
||||
class CChain;
|
||||
class ValidationSignals;
|
||||
|
||||
/** Fake height value used in Coin to signify they are only in the memory pool (since 0.8) */
|
||||
static const uint32_t MEMPOOL_HEIGHT = 0x7FFFFFFF;
|
||||
|
@ -447,6 +448,8 @@ public:
|
|||
|
||||
const Limits m_limits;
|
||||
|
||||
ValidationSignals* const m_signals;
|
||||
|
||||
/** Create a new CTxMemPool.
|
||||
* Sanity checks will be off by default for performance, because otherwise
|
||||
* accepting transactions becomes O(N^2) where N is the number of transactions
|
||||
|
|
52
src/util/task_runner.h
Normal file
52
src/util/task_runner.h
Normal file
|
@ -0,0 +1,52 @@
|
|||
// Copyright (c) 2024-present The Bitcoin Core developers
|
||||
// Distributed under the MIT software license, see the accompanying
|
||||
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
|
||||
|
||||
#ifndef BITCOIN_UTIL_TASK_RUNNER_H
|
||||
#define BITCOIN_UTIL_TASK_RUNNER_H
|
||||
|
||||
#include <cstddef>
|
||||
#include <functional>
|
||||
|
||||
namespace util {
|
||||
|
||||
/** @file
|
||||
* This header provides an interface and simple implementation for a task
|
||||
* runner. Another threaded, serial implementation using a queue is available in
|
||||
* the scheduler module's SerialTaskRunner.
|
||||
*/
|
||||
|
||||
class TaskRunnerInterface
|
||||
{
|
||||
public:
|
||||
virtual ~TaskRunnerInterface() {}
|
||||
|
||||
/**
|
||||
* The callback can either be queued for later/asynchronous/threaded
|
||||
* processing, or be executed immediately for synchronous processing.
|
||||
*/
|
||||
|
||||
virtual void insert(std::function<void()> func) = 0;
|
||||
|
||||
/**
|
||||
* Forces the processing of all pending events.
|
||||
*/
|
||||
virtual void flush() = 0;
|
||||
|
||||
/**
|
||||
* Returns the number of currently pending events.
|
||||
*/
|
||||
virtual size_t size() = 0;
|
||||
};
|
||||
|
||||
class ImmediateTaskRunner : public TaskRunnerInterface
|
||||
{
|
||||
public:
|
||||
void insert(std::function<void()> func) override { func(); }
|
||||
void flush() override {}
|
||||
size_t size() override { return 0; }
|
||||
};
|
||||
|
||||
} // namespace util
|
||||
|
||||
#endif // BITCOIN_UTIL_TASK_RUNNER_H
|
|
@ -1228,13 +1228,14 @@ bool MemPoolAccept::SubmitPackage(const ATMPArgs& args, std::vector<Workspace>&
|
|||
results.emplace(ws.m_ptx->GetWitnessHash(),
|
||||
MempoolAcceptResult::Success(std::move(ws.m_replaced_transactions), ws.m_vsize,
|
||||
ws.m_base_fees, effective_feerate, effective_feerate_wtxids));
|
||||
if (!m_pool.m_signals) continue;
|
||||
const CTransaction& tx = *ws.m_ptx;
|
||||
const auto tx_info = NewMempoolTransactionInfo(ws.m_ptx, ws.m_base_fees,
|
||||
ws.m_vsize, ws.m_entry->GetHeight(),
|
||||
args.m_bypass_limits, args.m_package_submission,
|
||||
IsCurrentForFeeEstimation(m_active_chainstate),
|
||||
m_pool.HasNoInputsOf(tx));
|
||||
GetMainSignals().TransactionAddedToMempool(tx_info, m_pool.GetAndIncrementSequence());
|
||||
m_pool.m_signals->TransactionAddedToMempool(tx_info, m_pool.GetAndIncrementSequence());
|
||||
}
|
||||
return all_submitted;
|
||||
}
|
||||
|
@ -1242,7 +1243,7 @@ bool MemPoolAccept::SubmitPackage(const ATMPArgs& args, std::vector<Workspace>&
|
|||
MempoolAcceptResult MemPoolAccept::AcceptSingleTransaction(const CTransactionRef& ptx, ATMPArgs& args)
|
||||
{
|
||||
AssertLockHeld(cs_main);
|
||||
LOCK(m_pool.cs); // mempool "read lock" (held through GetMainSignals().TransactionAddedToMempool())
|
||||
LOCK(m_pool.cs); // mempool "read lock" (held through m_pool.m_signals->TransactionAddedToMempool())
|
||||
|
||||
Workspace ws(ptx);
|
||||
const std::vector<Wtxid> single_wtxid{ws.m_ptx->GetWitnessHash()};
|
||||
|
@ -1277,13 +1278,15 @@ MempoolAcceptResult MemPoolAccept::AcceptSingleTransaction(const CTransactionRef
|
|||
return MempoolAcceptResult::FeeFailure(ws.m_state, CFeeRate(ws.m_modified_fees, ws.m_vsize), {ws.m_ptx->GetWitnessHash()});
|
||||
}
|
||||
|
||||
if (m_pool.m_signals) {
|
||||
const CTransaction& tx = *ws.m_ptx;
|
||||
const auto tx_info = NewMempoolTransactionInfo(ws.m_ptx, ws.m_base_fees,
|
||||
ws.m_vsize, ws.m_entry->GetHeight(),
|
||||
args.m_bypass_limits, args.m_package_submission,
|
||||
IsCurrentForFeeEstimation(m_active_chainstate),
|
||||
m_pool.HasNoInputsOf(tx));
|
||||
GetMainSignals().TransactionAddedToMempool(tx_info, m_pool.GetAndIncrementSequence());
|
||||
m_pool.m_signals->TransactionAddedToMempool(tx_info, m_pool.GetAndIncrementSequence());
|
||||
}
|
||||
|
||||
return MempoolAcceptResult::Success(std::move(ws.m_replaced_transactions), ws.m_vsize, ws.m_base_fees,
|
||||
effective_feerate, single_wtxid);
|
||||
|
@ -2695,9 +2698,9 @@ bool Chainstate::FlushStateToDisk(
|
|||
(bool)fFlushForPrune);
|
||||
}
|
||||
}
|
||||
if (full_flush_completed) {
|
||||
if (full_flush_completed && m_chainman.m_options.signals) {
|
||||
// Update best block in wallet (so we can detect restored wallets).
|
||||
GetMainSignals().ChainStateFlushed(this->GetRole(), m_chain.GetLocator());
|
||||
m_chainman.m_options.signals->ChainStateFlushed(this->GetRole(), m_chain.GetLocator());
|
||||
}
|
||||
} catch (const std::runtime_error& e) {
|
||||
return FatalError(m_chainman.GetNotifications(), state, std::string("System error while flushing: ") + e.what());
|
||||
|
@ -2864,7 +2867,9 @@ bool Chainstate::DisconnectTip(BlockValidationState& state, DisconnectedBlockTra
|
|||
UpdateTip(pindexDelete->pprev);
|
||||
// Let wallets know transactions went from 1-confirmed to
|
||||
// 0-confirmed or conflicted:
|
||||
GetMainSignals().BlockDisconnected(pblock, pindexDelete);
|
||||
if (m_chainman.m_options.signals) {
|
||||
m_chainman.m_options.signals->BlockDisconnected(pblock, pindexDelete);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -2949,7 +2954,9 @@ bool Chainstate::ConnectTip(BlockValidationState& state, CBlockIndex* pindexNew,
|
|||
{
|
||||
CCoinsViewCache view(&CoinsTip());
|
||||
bool rv = ConnectBlock(blockConnecting, state, pindexNew, view);
|
||||
GetMainSignals().BlockChecked(blockConnecting, state);
|
||||
if (m_chainman.m_options.signals) {
|
||||
m_chainman.m_options.signals->BlockChecked(blockConnecting, state);
|
||||
}
|
||||
if (!rv) {
|
||||
if (state.IsInvalid())
|
||||
InvalidBlockFound(pindexNew, state);
|
||||
|
@ -3210,11 +3217,11 @@ static bool NotifyHeaderTip(ChainstateManager& chainman) LOCKS_EXCLUDED(cs_main)
|
|||
return fNotify;
|
||||
}
|
||||
|
||||
static void LimitValidationInterfaceQueue() LOCKS_EXCLUDED(cs_main) {
|
||||
static void LimitValidationInterfaceQueue(ValidationSignals& signals) LOCKS_EXCLUDED(cs_main) {
|
||||
AssertLockNotHeld(cs_main);
|
||||
|
||||
if (GetMainSignals().CallbacksPending() > 10) {
|
||||
SyncWithValidationInterfaceQueue();
|
||||
if (signals.CallbacksPending() > 10) {
|
||||
signals.SyncWithValidationInterfaceQueue();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -3252,7 +3259,7 @@ bool Chainstate::ActivateBestChain(BlockValidationState& state, std::shared_ptr<
|
|||
// Note that if a validationinterface callback ends up calling
|
||||
// ActivateBestChain this may lead to a deadlock! We should
|
||||
// probably have a DEBUG_LOCKORDER test for this in the future.
|
||||
LimitValidationInterfaceQueue();
|
||||
if (m_chainman.m_options.signals) LimitValidationInterfaceQueue(*m_chainman.m_options.signals);
|
||||
|
||||
{
|
||||
LOCK(cs_main);
|
||||
|
@ -3291,7 +3298,9 @@ bool Chainstate::ActivateBestChain(BlockValidationState& state, std::shared_ptr<
|
|||
|
||||
for (const PerBlockConnectTrace& trace : connectTrace.GetBlocksConnected()) {
|
||||
assert(trace.pblock && trace.pindex);
|
||||
GetMainSignals().BlockConnected(this->GetRole(), trace.pblock, trace.pindex);
|
||||
if (m_chainman.m_options.signals) {
|
||||
m_chainman.m_options.signals->BlockConnected(this->GetRole(), trace.pblock, trace.pindex);
|
||||
}
|
||||
}
|
||||
|
||||
// This will have been toggled in
|
||||
|
@ -3317,7 +3326,9 @@ bool Chainstate::ActivateBestChain(BlockValidationState& state, std::shared_ptr<
|
|||
// Enqueue while holding cs_main to ensure that UpdatedBlockTip is called in the order in which blocks are connected
|
||||
if (this == &m_chainman.ActiveChainstate() && pindexFork != pindexNewTip) {
|
||||
// Notify ValidationInterface subscribers
|
||||
GetMainSignals().UpdatedBlockTip(pindexNewTip, pindexFork, still_in_ibd);
|
||||
if (m_chainman.m_options.signals) {
|
||||
m_chainman.m_options.signals->UpdatedBlockTip(pindexNewTip, pindexFork, still_in_ibd);
|
||||
}
|
||||
|
||||
// Always notify the UI if a new block tip was connected
|
||||
if (kernel::IsInterrupted(m_chainman.GetNotifications().blockTip(GetSynchronizationState(still_in_ibd), *pindexNewTip))) {
|
||||
|
@ -3451,7 +3462,7 @@ bool Chainstate::InvalidateBlock(BlockValidationState& state, CBlockIndex* pinde
|
|||
if (m_chainman.m_interrupt) break;
|
||||
|
||||
// Make sure the queue of validation callbacks doesn't grow unboundedly.
|
||||
LimitValidationInterfaceQueue();
|
||||
if (m_chainman.m_options.signals) LimitValidationInterfaceQueue(*m_chainman.m_options.signals);
|
||||
|
||||
LOCK(cs_main);
|
||||
// Lock for as long as disconnectpool is in scope to make sure MaybeUpdateMempoolForReorg is
|
||||
|
@ -4237,8 +4248,9 @@ bool ChainstateManager::AcceptBlock(const std::shared_ptr<const CBlock>& pblock,
|
|||
|
||||
// Header is valid/has work, merkle tree and segwit merkle tree are good...RELAY NOW
|
||||
// (but if it does not build on our best tip, let the SendMessages loop relay it)
|
||||
if (!IsInitialBlockDownload() && ActiveTip() == pindex->pprev)
|
||||
GetMainSignals().NewPoWValidBlock(pindex, pblock);
|
||||
if (!IsInitialBlockDownload() && ActiveTip() == pindex->pprev && m_options.signals) {
|
||||
m_options.signals->NewPoWValidBlock(pindex, pblock);
|
||||
}
|
||||
|
||||
// Write block to history file
|
||||
if (fNewBlock) *fNewBlock = true;
|
||||
|
@ -4291,7 +4303,9 @@ bool ChainstateManager::ProcessNewBlock(const std::shared_ptr<const CBlock>& blo
|
|||
ret = AcceptBlock(block, state, &pindex, force_processing, nullptr, new_block, min_pow_checked);
|
||||
}
|
||||
if (!ret) {
|
||||
GetMainSignals().BlockChecked(*block, state);
|
||||
if (m_options.signals) {
|
||||
m_options.signals->BlockChecked(*block, state);
|
||||
}
|
||||
return error("%s: AcceptBlock FAILED (%s)", __func__, state.ToString());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -5,7 +5,6 @@
|
|||
|
||||
#include <validationinterface.h>
|
||||
|
||||
#include <attributes.h>
|
||||
#include <chain.h>
|
||||
#include <consensus/validation.h>
|
||||
#include <kernel/chain.h>
|
||||
|
@ -13,7 +12,8 @@
|
|||
#include <logging.h>
|
||||
#include <primitives/block.h>
|
||||
#include <primitives/transaction.h>
|
||||
#include <scheduler.h>
|
||||
#include <util/check.h>
|
||||
#include <util/task_runner.h>
|
||||
|
||||
#include <future>
|
||||
#include <unordered_map>
|
||||
|
@ -22,14 +22,14 @@
|
|||
std::string RemovalReasonToString(const MemPoolRemovalReason& r) noexcept;
|
||||
|
||||
/**
|
||||
* MainSignalsImpl manages a list of shared_ptr<CValidationInterface> callbacks.
|
||||
* ValidationSignalsImpl manages a list of shared_ptr<CValidationInterface> callbacks.
|
||||
*
|
||||
* A std::unordered_map is used to track what callbacks are currently
|
||||
* registered, and a std::list is used to store the callbacks that are
|
||||
* currently registered as well as any callbacks that are just unregistered
|
||||
* and about to be deleted when they are done executing.
|
||||
*/
|
||||
class MainSignalsImpl
|
||||
class ValidationSignalsImpl
|
||||
{
|
||||
private:
|
||||
Mutex m_mutex;
|
||||
|
@ -42,12 +42,10 @@ private:
|
|||
std::unordered_map<CValidationInterface*, std::list<ListEntry>::iterator> m_map GUARDED_BY(m_mutex);
|
||||
|
||||
public:
|
||||
// We are not allowed to assume the scheduler only runs in one thread,
|
||||
// but must ensure all callbacks happen in-order, so we end up creating
|
||||
// our own queue here :(
|
||||
SingleThreadedSchedulerClient m_schedulerClient;
|
||||
std::unique_ptr<util::TaskRunnerInterface> m_task_runner;
|
||||
|
||||
explicit MainSignalsImpl(CScheduler& scheduler LIFETIMEBOUND) : m_schedulerClient(scheduler) {}
|
||||
explicit ValidationSignalsImpl(std::unique_ptr<util::TaskRunnerInterface> task_runner)
|
||||
: m_task_runner{std::move(Assert(task_runner))} {}
|
||||
|
||||
void Register(std::shared_ptr<CValidationInterface> callbacks) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
|
||||
{
|
||||
|
@ -94,77 +92,56 @@ public:
|
|||
}
|
||||
};
|
||||
|
||||
static CMainSignals g_signals;
|
||||
ValidationSignals::ValidationSignals(std::unique_ptr<util::TaskRunnerInterface> task_runner)
|
||||
: m_internals{std::make_unique<ValidationSignalsImpl>(std::move(task_runner))} {}
|
||||
|
||||
void CMainSignals::RegisterBackgroundSignalScheduler(CScheduler& scheduler)
|
||||
ValidationSignals::~ValidationSignals() {}
|
||||
|
||||
void ValidationSignals::FlushBackgroundCallbacks()
|
||||
{
|
||||
assert(!m_internals);
|
||||
m_internals = std::make_unique<MainSignalsImpl>(scheduler);
|
||||
m_internals->m_task_runner->flush();
|
||||
}
|
||||
|
||||
void CMainSignals::UnregisterBackgroundSignalScheduler()
|
||||
size_t ValidationSignals::CallbacksPending()
|
||||
{
|
||||
m_internals.reset(nullptr);
|
||||
return m_internals->m_task_runner->size();
|
||||
}
|
||||
|
||||
void CMainSignals::FlushBackgroundCallbacks()
|
||||
{
|
||||
if (m_internals) {
|
||||
m_internals->m_schedulerClient.EmptyQueue();
|
||||
}
|
||||
}
|
||||
|
||||
size_t CMainSignals::CallbacksPending()
|
||||
{
|
||||
if (!m_internals) return 0;
|
||||
return m_internals->m_schedulerClient.CallbacksPending();
|
||||
}
|
||||
|
||||
CMainSignals& GetMainSignals()
|
||||
{
|
||||
return g_signals;
|
||||
}
|
||||
|
||||
void RegisterSharedValidationInterface(std::shared_ptr<CValidationInterface> callbacks)
|
||||
void ValidationSignals::RegisterSharedValidationInterface(std::shared_ptr<CValidationInterface> callbacks)
|
||||
{
|
||||
// Each connection captures the shared_ptr to ensure that each callback is
|
||||
// executed before the subscriber is destroyed. For more details see #18338.
|
||||
g_signals.m_internals->Register(std::move(callbacks));
|
||||
m_internals->Register(std::move(callbacks));
|
||||
}
|
||||
|
||||
void RegisterValidationInterface(CValidationInterface* callbacks)
|
||||
void ValidationSignals::RegisterValidationInterface(CValidationInterface* callbacks)
|
||||
{
|
||||
// Create a shared_ptr with a no-op deleter - CValidationInterface lifecycle
|
||||
// is managed by the caller.
|
||||
RegisterSharedValidationInterface({callbacks, [](CValidationInterface*){}});
|
||||
}
|
||||
|
||||
void UnregisterSharedValidationInterface(std::shared_ptr<CValidationInterface> callbacks)
|
||||
void ValidationSignals::UnregisterSharedValidationInterface(std::shared_ptr<CValidationInterface> callbacks)
|
||||
{
|
||||
UnregisterValidationInterface(callbacks.get());
|
||||
}
|
||||
|
||||
void UnregisterValidationInterface(CValidationInterface* callbacks)
|
||||
void ValidationSignals::UnregisterValidationInterface(CValidationInterface* callbacks)
|
||||
{
|
||||
if (g_signals.m_internals) {
|
||||
g_signals.m_internals->Unregister(callbacks);
|
||||
}
|
||||
m_internals->Unregister(callbacks);
|
||||
}
|
||||
|
||||
void UnregisterAllValidationInterfaces()
|
||||
void ValidationSignals::UnregisterAllValidationInterfaces()
|
||||
{
|
||||
if (!g_signals.m_internals) {
|
||||
return;
|
||||
}
|
||||
g_signals.m_internals->Clear();
|
||||
m_internals->Clear();
|
||||
}
|
||||
|
||||
void CallFunctionInValidationInterfaceQueue(std::function<void()> func)
|
||||
void ValidationSignals::CallFunctionInValidationInterfaceQueue(std::function<void()> func)
|
||||
{
|
||||
g_signals.m_internals->m_schedulerClient.AddToProcessQueue(std::move(func));
|
||||
m_internals->m_task_runner->insert(std::move(func));
|
||||
}
|
||||
|
||||
void SyncWithValidationInterfaceQueue()
|
||||
void ValidationSignals::SyncWithValidationInterfaceQueue()
|
||||
{
|
||||
AssertLockNotHeld(cs_main);
|
||||
// Block until the validation queue drains
|
||||
|
@ -183,7 +160,7 @@ void SyncWithValidationInterfaceQueue()
|
|||
do { \
|
||||
auto local_name = (name); \
|
||||
LOG_EVENT("Enqueuing " fmt, local_name, __VA_ARGS__); \
|
||||
m_internals->m_schedulerClient.AddToProcessQueue([=] { \
|
||||
m_internals->m_task_runner->insert([=] { \
|
||||
LOG_EVENT(fmt, local_name, __VA_ARGS__); \
|
||||
event(); \
|
||||
}); \
|
||||
|
@ -192,7 +169,7 @@ void SyncWithValidationInterfaceQueue()
|
|||
#define LOG_EVENT(fmt, ...) \
|
||||
LogPrint(BCLog::VALIDATION, fmt "\n", __VA_ARGS__)
|
||||
|
||||
void CMainSignals::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) {
|
||||
void ValidationSignals::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) {
|
||||
// Dependencies exist that require UpdatedBlockTip events to be delivered in the order in which
|
||||
// the chain actually updates. One way to ensure this is for the caller to invoke this signal
|
||||
// in the same critical section where the chain is updated
|
||||
|
@ -206,7 +183,7 @@ void CMainSignals::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockInd
|
|||
fInitialDownload);
|
||||
}
|
||||
|
||||
void CMainSignals::TransactionAddedToMempool(const NewMempoolTransactionInfo& tx, uint64_t mempool_sequence)
|
||||
void ValidationSignals::TransactionAddedToMempool(const NewMempoolTransactionInfo& tx, uint64_t mempool_sequence)
|
||||
{
|
||||
auto event = [tx, mempool_sequence, this] {
|
||||
m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.TransactionAddedToMempool(tx, mempool_sequence); });
|
||||
|
@ -216,7 +193,7 @@ void CMainSignals::TransactionAddedToMempool(const NewMempoolTransactionInfo& tx
|
|||
tx.info.m_tx->GetWitnessHash().ToString());
|
||||
}
|
||||
|
||||
void CMainSignals::TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) {
|
||||
void ValidationSignals::TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) {
|
||||
auto event = [tx, reason, mempool_sequence, this] {
|
||||
m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.TransactionRemovedFromMempool(tx, reason, mempool_sequence); });
|
||||
};
|
||||
|
@ -226,7 +203,7 @@ void CMainSignals::TransactionRemovedFromMempool(const CTransactionRef& tx, MemP
|
|||
RemovalReasonToString(reason));
|
||||
}
|
||||
|
||||
void CMainSignals::BlockConnected(ChainstateRole role, const std::shared_ptr<const CBlock> &pblock, const CBlockIndex *pindex) {
|
||||
void ValidationSignals::BlockConnected(ChainstateRole role, const std::shared_ptr<const CBlock> &pblock, const CBlockIndex *pindex) {
|
||||
auto event = [role, pblock, pindex, this] {
|
||||
m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.BlockConnected(role, pblock, pindex); });
|
||||
};
|
||||
|
@ -235,7 +212,7 @@ void CMainSignals::BlockConnected(ChainstateRole role, const std::shared_ptr<con
|
|||
pindex->nHeight);
|
||||
}
|
||||
|
||||
void CMainSignals::MempoolTransactionsRemovedForBlock(const std::vector<RemovedMempoolTransactionInfo>& txs_removed_for_block, unsigned int nBlockHeight)
|
||||
void ValidationSignals::MempoolTransactionsRemovedForBlock(const std::vector<RemovedMempoolTransactionInfo>& txs_removed_for_block, unsigned int nBlockHeight)
|
||||
{
|
||||
auto event = [txs_removed_for_block, nBlockHeight, this] {
|
||||
m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.MempoolTransactionsRemovedForBlock(txs_removed_for_block, nBlockHeight); });
|
||||
|
@ -245,7 +222,7 @@ void CMainSignals::MempoolTransactionsRemovedForBlock(const std::vector<RemovedM
|
|||
txs_removed_for_block.size());
|
||||
}
|
||||
|
||||
void CMainSignals::BlockDisconnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindex)
|
||||
void ValidationSignals::BlockDisconnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindex)
|
||||
{
|
||||
auto event = [pblock, pindex, this] {
|
||||
m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.BlockDisconnected(pblock, pindex); });
|
||||
|
@ -255,7 +232,7 @@ void CMainSignals::BlockDisconnected(const std::shared_ptr<const CBlock>& pblock
|
|||
pindex->nHeight);
|
||||
}
|
||||
|
||||
void CMainSignals::ChainStateFlushed(ChainstateRole role, const CBlockLocator &locator) {
|
||||
void ValidationSignals::ChainStateFlushed(ChainstateRole role, const CBlockLocator &locator) {
|
||||
auto event = [role, locator, this] {
|
||||
m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.ChainStateFlushed(role, locator); });
|
||||
};
|
||||
|
@ -263,13 +240,13 @@ void CMainSignals::ChainStateFlushed(ChainstateRole role, const CBlockLocator &l
|
|||
locator.IsNull() ? "null" : locator.vHave.front().ToString());
|
||||
}
|
||||
|
||||
void CMainSignals::BlockChecked(const CBlock& block, const BlockValidationState& state) {
|
||||
void ValidationSignals::BlockChecked(const CBlock& block, const BlockValidationState& state) {
|
||||
LOG_EVENT("%s: block hash=%s state=%s", __func__,
|
||||
block.GetHash().ToString(), state.ToString());
|
||||
m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.BlockChecked(block, state); });
|
||||
}
|
||||
|
||||
void CMainSignals::NewPoWValidBlock(const CBlockIndex *pindex, const std::shared_ptr<const CBlock> &block) {
|
||||
void ValidationSignals::NewPoWValidBlock(const CBlockIndex *pindex, const std::shared_ptr<const CBlock> &block) {
|
||||
LOG_EVENT("%s: block hash=%s", __func__, block->GetHash().ToString());
|
||||
m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.NewPoWValidBlock(pindex, block); });
|
||||
}
|
||||
|
|
|
@ -6,61 +6,29 @@
|
|||
#ifndef BITCOIN_VALIDATIONINTERFACE_H
|
||||
#define BITCOIN_VALIDATIONINTERFACE_H
|
||||
|
||||
#include <kernel/cs_main.h>
|
||||
#include <kernel/chain.h>
|
||||
#include <kernel/cs_main.h>
|
||||
#include <primitives/transaction.h> // CTransaction(Ref)
|
||||
#include <sync.h>
|
||||
|
||||
#include <cstddef>
|
||||
#include <cstdint>
|
||||
#include <functional>
|
||||
#include <memory>
|
||||
#include <vector>
|
||||
|
||||
namespace util {
|
||||
class TaskRunnerInterface;
|
||||
} // namespace util
|
||||
|
||||
class BlockValidationState;
|
||||
class CBlock;
|
||||
class CBlockIndex;
|
||||
struct CBlockLocator;
|
||||
class CValidationInterface;
|
||||
class CScheduler;
|
||||
enum class MemPoolRemovalReason;
|
||||
struct RemovedMempoolTransactionInfo;
|
||||
struct NewMempoolTransactionInfo;
|
||||
|
||||
/** Register subscriber */
|
||||
void RegisterValidationInterface(CValidationInterface* callbacks);
|
||||
/** Unregister subscriber. DEPRECATED. This is not safe to use when the RPC server or main message handler thread is running. */
|
||||
void UnregisterValidationInterface(CValidationInterface* callbacks);
|
||||
/** Unregister all subscribers */
|
||||
void UnregisterAllValidationInterfaces();
|
||||
|
||||
// Alternate registration functions that release a shared_ptr after the last
|
||||
// notification is sent. These are useful for race-free cleanup, since
|
||||
// unregistration is nonblocking and can return before the last notification is
|
||||
// processed.
|
||||
/** Register subscriber */
|
||||
void RegisterSharedValidationInterface(std::shared_ptr<CValidationInterface> callbacks);
|
||||
/** Unregister subscriber */
|
||||
void UnregisterSharedValidationInterface(std::shared_ptr<CValidationInterface> callbacks);
|
||||
|
||||
/**
|
||||
* Pushes a function to callback onto the notification queue, guaranteeing any
|
||||
* callbacks generated prior to now are finished when the function is called.
|
||||
*
|
||||
* Be very careful blocking on func to be called if any locks are held -
|
||||
* validation interface clients may not be able to make progress as they often
|
||||
* wait for things like cs_main, so blocking until func is called with cs_main
|
||||
* will result in a deadlock (that DEBUG_LOCKORDER will miss).
|
||||
*/
|
||||
void CallFunctionInValidationInterfaceQueue(std::function<void ()> func);
|
||||
/**
|
||||
* This is a synonym for the following, which asserts certain locks are not
|
||||
* held:
|
||||
* std::promise<void> promise;
|
||||
* CallFunctionInValidationInterfaceQueue([&promise] {
|
||||
* promise.set_value();
|
||||
* });
|
||||
* promise.get_future().wait();
|
||||
*/
|
||||
void SyncWithValidationInterfaceQueue() LOCKS_EXCLUDED(cs_main);
|
||||
|
||||
/**
|
||||
* Implement this to subscribe to events generated in validation and mempool
|
||||
*
|
||||
|
@ -185,30 +153,65 @@ protected:
|
|||
* has been received and connected to the headers tree, though not validated yet.
|
||||
*/
|
||||
virtual void NewPoWValidBlock(const CBlockIndex *pindex, const std::shared_ptr<const CBlock>& block) {};
|
||||
friend class CMainSignals;
|
||||
friend class ValidationSignals;
|
||||
friend class ValidationInterfaceTest;
|
||||
};
|
||||
|
||||
class MainSignalsImpl;
|
||||
class CMainSignals {
|
||||
class ValidationSignalsImpl;
|
||||
class ValidationSignals {
|
||||
private:
|
||||
std::unique_ptr<MainSignalsImpl> m_internals;
|
||||
|
||||
friend void ::RegisterSharedValidationInterface(std::shared_ptr<CValidationInterface>);
|
||||
friend void ::UnregisterValidationInterface(CValidationInterface*);
|
||||
friend void ::UnregisterAllValidationInterfaces();
|
||||
friend void ::CallFunctionInValidationInterfaceQueue(std::function<void ()> func);
|
||||
std::unique_ptr<ValidationSignalsImpl> m_internals;
|
||||
|
||||
public:
|
||||
/** Register a CScheduler to give callbacks which should run in the background (may only be called once) */
|
||||
void RegisterBackgroundSignalScheduler(CScheduler& scheduler);
|
||||
/** Unregister a CScheduler to give callbacks which should run in the background - these callbacks will now be dropped! */
|
||||
void UnregisterBackgroundSignalScheduler();
|
||||
// The task runner will block validation if it calls its insert method's
|
||||
// func argument synchronously. In this class func contains a loop that
|
||||
// dispatches a single validation event to all subscribers sequentially.
|
||||
explicit ValidationSignals(std::unique_ptr<util::TaskRunnerInterface> task_runner);
|
||||
|
||||
~ValidationSignals();
|
||||
|
||||
/** Call any remaining callbacks on the calling thread */
|
||||
void FlushBackgroundCallbacks();
|
||||
|
||||
size_t CallbacksPending();
|
||||
|
||||
/** Register subscriber */
|
||||
void RegisterValidationInterface(CValidationInterface* callbacks);
|
||||
/** Unregister subscriber. DEPRECATED. This is not safe to use when the RPC server or main message handler thread is running. */
|
||||
void UnregisterValidationInterface(CValidationInterface* callbacks);
|
||||
/** Unregister all subscribers */
|
||||
void UnregisterAllValidationInterfaces();
|
||||
|
||||
// Alternate registration functions that release a shared_ptr after the last
|
||||
// notification is sent. These are useful for race-free cleanup, since
|
||||
// unregistration is nonblocking and can return before the last notification is
|
||||
// processed.
|
||||
/** Register subscriber */
|
||||
void RegisterSharedValidationInterface(std::shared_ptr<CValidationInterface> callbacks);
|
||||
/** Unregister subscriber */
|
||||
void UnregisterSharedValidationInterface(std::shared_ptr<CValidationInterface> callbacks);
|
||||
|
||||
/**
|
||||
* Pushes a function to callback onto the notification queue, guaranteeing any
|
||||
* callbacks generated prior to now are finished when the function is called.
|
||||
*
|
||||
* Be very careful blocking on func to be called if any locks are held -
|
||||
* validation interface clients may not be able to make progress as they often
|
||||
* wait for things like cs_main, so blocking until func is called with cs_main
|
||||
* will result in a deadlock (that DEBUG_LOCKORDER will miss).
|
||||
*/
|
||||
void CallFunctionInValidationInterfaceQueue(std::function<void ()> func);
|
||||
|
||||
/**
|
||||
* This is a synonym for the following, which asserts certain locks are not
|
||||
* held:
|
||||
* std::promise<void> promise;
|
||||
* CallFunctionInValidationInterfaceQueue([&promise] {
|
||||
* promise.set_value();
|
||||
* });
|
||||
* promise.get_future().wait();
|
||||
*/
|
||||
void SyncWithValidationInterfaceQueue() LOCKS_EXCLUDED(cs_main);
|
||||
|
||||
void UpdatedBlockTip(const CBlockIndex *, const CBlockIndex *, bool fInitialDownload);
|
||||
void TransactionAddedToMempool(const NewMempoolTransactionInfo&, uint64_t mempool_sequence);
|
||||
|
@ -221,6 +224,4 @@ public:
|
|||
void NewPoWValidBlock(const CBlockIndex *, const std::shared_ptr<const CBlock>&);
|
||||
};
|
||||
|
||||
CMainSignals& GetMainSignals();
|
||||
|
||||
#endif // BITCOIN_VALIDATIONINTERFACE_H
|
||||
|
|
|
@ -71,7 +71,8 @@ std::shared_ptr<CWallet> TestLoadWallet(WalletContext& context)
|
|||
|
||||
void TestUnloadWallet(std::shared_ptr<CWallet>&& wallet)
|
||||
{
|
||||
SyncWithValidationInterfaceQueue();
|
||||
// Calls SyncWithValidationInterfaceQueue
|
||||
wallet->chain().waitForNotificationsIfTipChanged({});
|
||||
wallet->m_chain_notifications_handler.reset();
|
||||
UnloadWallet(std::move(wallet));
|
||||
}
|
||||
|
|
|
@ -814,7 +814,7 @@ BOOST_FIXTURE_TEST_CASE(CreateWallet, TestChain100Setup)
|
|||
// transactionAddedToMempool notifications, and create block and mempool
|
||||
// transactions paying to the wallet
|
||||
std::promise<void> promise;
|
||||
CallFunctionInValidationInterfaceQueue([&promise] {
|
||||
m_node.validation_signals->CallFunctionInValidationInterfaceQueue([&promise] {
|
||||
promise.get_future().wait();
|
||||
});
|
||||
std::string error;
|
||||
|
@ -842,7 +842,7 @@ BOOST_FIXTURE_TEST_CASE(CreateWallet, TestChain100Setup)
|
|||
// Unblock notification queue and make sure stale blockConnected and
|
||||
// transactionAddedToMempool events are processed
|
||||
promise.set_value();
|
||||
SyncWithValidationInterfaceQueue();
|
||||
m_node.validation_signals->SyncWithValidationInterfaceQueue();
|
||||
// AddToWallet events for block_tx and mempool_tx events are counted a
|
||||
// second time as the notification queue is processed
|
||||
BOOST_CHECK_EQUAL(addtx_count, 5);
|
||||
|
@ -865,7 +865,7 @@ BOOST_FIXTURE_TEST_CASE(CreateWallet, TestChain100Setup)
|
|||
m_coinbase_txns.push_back(CreateAndProcessBlock({block_tx}, GetScriptForRawPubKey(coinbaseKey.GetPubKey())).vtx[0]);
|
||||
mempool_tx = TestSimpleSpend(*m_coinbase_txns[3], 0, coinbaseKey, GetScriptForRawPubKey(key.GetPubKey()));
|
||||
BOOST_CHECK(m_node.chain->broadcastTransaction(MakeTransactionRef(mempool_tx), DEFAULT_TRANSACTION_MAXFEE, false, error));
|
||||
SyncWithValidationInterfaceQueue();
|
||||
m_node.validation_signals->SyncWithValidationInterfaceQueue();
|
||||
});
|
||||
wallet = TestLoadWallet(context);
|
||||
// Since mempool transactions are requested at the end of loading, there will
|
||||
|
@ -905,7 +905,7 @@ BOOST_FIXTURE_TEST_CASE(RemoveTxs, TestChain100Setup)
|
|||
auto block_tx = TestSimpleSpend(*m_coinbase_txns[0], 0, coinbaseKey, GetScriptForRawPubKey(key.GetPubKey()));
|
||||
CreateAndProcessBlock({block_tx}, GetScriptForRawPubKey(coinbaseKey.GetPubKey()));
|
||||
|
||||
SyncWithValidationInterfaceQueue();
|
||||
m_node.validation_signals->SyncWithValidationInterfaceQueue();
|
||||
|
||||
{
|
||||
auto block_hash = block_tx.GetHash();
|
||||
|
|
Loading…
Add table
Reference in a new issue