mirror of
https://github.com/bitcoin/bitcoin.git
synced 2025-02-02 09:46:52 -05:00
Merge bitcoin/bitcoin#29112: sqlite: Disallow writing from multiple SQLiteBatch
s
cfcb9b1ecf
test: wallet, coverage for concurrent db transactions (furszy)548ecd1155
tests: Test for concurrent writes with db tx (Ava Chow)395bcd2454
sqlite: Ensure that only one SQLiteBatch is writing to db at a time (Ava Chow) Pull request description: The way that we have configured SQLite to run means that only one database transaction can be open at a time. Typically, each individual read and write operation will be its own transaction that is opened and committed automatically by SQLite. However, sometimes we want these operations to be batched into a multi-statement transaction, so `SQLiteBatch::TxnBegin`, `SQLiteBatch::TxnCommit`, and `SQLiteBatch::TxnAbort` are used to manage the transaction of the database. However, once a db transaction is begun with one `SQLiteBatch`, any operations performed by another `SQLiteBatch` will also occur within the same transaction. Furthermore, those other `SQLiteBatch`s will not be expecting a transaction to be active, and will abort it once the `SQLiteBatch` is destructed. This is problematic as it will prevent some data from being written, and also cause the `SQLiteBatch` that opened the transaction in the first place to be in an unexpected state and throw an error. To avoid this situation, we need to prevent the multiple batches from writing at the same time. To do so, I've implemented added a `CSemaphore` within `SQLiteDatabase` which will be used by any `SQLiteBatch` trying to do a write operation. `wait()` is called by `TxnBegin`, and at the beginning of `WriteKey`, `EraseKey`, and `ErasePrefix`. `post()` is called in `TxnCommit`, `TxnAbort` and at the end of `WriteKey`, `EraseKey`, and `ErasePrefix`. To avoid deadlocking on ` TxnBegin()` followed by a `WriteKey()`, `SQLiteBatch will now also track whether a transaction is in progress so that it knows whether to use the semaphore. This issue is not a problem for BDB wallets since BDB uses WAL and provides transaction objects that must be used if an operation is to occur within a transaction. Specifically, we either pass a transaction pointer, or a nullptr, to all BDB operations, and this allows for concurrent transactions so it doesn't have this problem. Fixes #29110 ACKs for top commit: josibake: ACKcfcb9b1ecf
furszy: ACKcfcb9b1ecf
ryanofsky: Code review ACKcfcb9b1ecf
. This looks great and I think it is ready for merge. Just holding off because josibake seemed ready to review https://github.com/bitcoin/bitcoin/pull/29112#issuecomment-1930372190 and might have more feedback. Tree-SHA512: 2dd5a8e76df52451a40e0b8a87c7139d68a0d8e1bf2ebc79168cc313e192dab87cfa4270ff17fea4f7b370060d3bc9b5d294d50f7e07994d9b5a69b40397c927
This commit is contained in:
commit
801ef07ebd
4 changed files with 129 additions and 6 deletions
|
@ -110,7 +110,7 @@ Mutex SQLiteDatabase::g_sqlite_mutex;
|
|||
int SQLiteDatabase::g_sqlite_count = 0;
|
||||
|
||||
SQLiteDatabase::SQLiteDatabase(const fs::path& dir_path, const fs::path& file_path, const DatabaseOptions& options, bool mock)
|
||||
: WalletDatabase(), m_mock(mock), m_dir_path(fs::PathToString(dir_path)), m_file_path(fs::PathToString(file_path)), m_use_unsafe_sync(options.use_unsafe_sync)
|
||||
: WalletDatabase(), m_mock(mock), m_dir_path(fs::PathToString(dir_path)), m_file_path(fs::PathToString(file_path)), m_write_semaphore(1), m_use_unsafe_sync(options.use_unsafe_sync)
|
||||
{
|
||||
{
|
||||
LOCK(g_sqlite_mutex);
|
||||
|
@ -408,7 +408,7 @@ void SQLiteBatch::Close()
|
|||
bool force_conn_refresh = false;
|
||||
|
||||
// If we began a transaction, and it wasn't committed, abort the transaction in progress
|
||||
if (m_database.HasActiveTxn()) {
|
||||
if (m_txn) {
|
||||
if (TxnAbort()) {
|
||||
LogPrintf("SQLiteBatch: Batch closed unexpectedly without the transaction being explicitly committed or aborted\n");
|
||||
} else {
|
||||
|
@ -442,6 +442,8 @@ void SQLiteBatch::Close()
|
|||
m_database.Close();
|
||||
try {
|
||||
m_database.Open();
|
||||
// If TxnAbort failed and we refreshed the connection, the semaphore was not released, so release it here to avoid deadlocks on future writes.
|
||||
m_database.m_write_semaphore.post();
|
||||
} catch (const std::runtime_error&) {
|
||||
// If open fails, cleanup this object and rethrow the exception
|
||||
m_database.Close();
|
||||
|
@ -493,6 +495,9 @@ bool SQLiteBatch::WriteKey(DataStream&& key, DataStream&& value, bool overwrite)
|
|||
if (!BindBlobToStatement(stmt, 1, key, "key")) return false;
|
||||
if (!BindBlobToStatement(stmt, 2, value, "value")) return false;
|
||||
|
||||
// Acquire semaphore if not previously acquired when creating a transaction.
|
||||
if (!m_txn) m_database.m_write_semaphore.wait();
|
||||
|
||||
// Execute
|
||||
int res = sqlite3_step(stmt);
|
||||
sqlite3_clear_bindings(stmt);
|
||||
|
@ -500,6 +505,9 @@ bool SQLiteBatch::WriteKey(DataStream&& key, DataStream&& value, bool overwrite)
|
|||
if (res != SQLITE_DONE) {
|
||||
LogPrintf("%s: Unable to execute statement: %s\n", __func__, sqlite3_errstr(res));
|
||||
}
|
||||
|
||||
if (!m_txn) m_database.m_write_semaphore.post();
|
||||
|
||||
return res == SQLITE_DONE;
|
||||
}
|
||||
|
||||
|
@ -511,6 +519,9 @@ bool SQLiteBatch::ExecStatement(sqlite3_stmt* stmt, Span<const std::byte> blob)
|
|||
// Bind: leftmost parameter in statement is index 1
|
||||
if (!BindBlobToStatement(stmt, 1, blob, "key")) return false;
|
||||
|
||||
// Acquire semaphore if not previously acquired when creating a transaction.
|
||||
if (!m_txn) m_database.m_write_semaphore.wait();
|
||||
|
||||
// Execute
|
||||
int res = sqlite3_step(stmt);
|
||||
sqlite3_clear_bindings(stmt);
|
||||
|
@ -518,6 +529,9 @@ bool SQLiteBatch::ExecStatement(sqlite3_stmt* stmt, Span<const std::byte> blob)
|
|||
if (res != SQLITE_DONE) {
|
||||
LogPrintf("%s: Unable to execute statement: %s\n", __func__, sqlite3_errstr(res));
|
||||
}
|
||||
|
||||
if (!m_txn) m_database.m_write_semaphore.post();
|
||||
|
||||
return res == SQLITE_DONE;
|
||||
}
|
||||
|
||||
|
@ -634,30 +648,43 @@ std::unique_ptr<DatabaseCursor> SQLiteBatch::GetNewPrefixCursor(Span<const std::
|
|||
|
||||
bool SQLiteBatch::TxnBegin()
|
||||
{
|
||||
if (!m_database.m_db || m_database.HasActiveTxn()) return false;
|
||||
if (!m_database.m_db || m_txn) return false;
|
||||
m_database.m_write_semaphore.wait();
|
||||
Assert(!m_database.HasActiveTxn());
|
||||
int res = Assert(m_exec_handler)->Exec(m_database, "BEGIN TRANSACTION");
|
||||
if (res != SQLITE_OK) {
|
||||
LogPrintf("SQLiteBatch: Failed to begin the transaction\n");
|
||||
m_database.m_write_semaphore.post();
|
||||
} else {
|
||||
m_txn = true;
|
||||
}
|
||||
return res == SQLITE_OK;
|
||||
}
|
||||
|
||||
bool SQLiteBatch::TxnCommit()
|
||||
{
|
||||
if (!m_database.HasActiveTxn()) return false;
|
||||
if (!m_database.m_db || !m_txn) return false;
|
||||
Assert(m_database.HasActiveTxn());
|
||||
int res = Assert(m_exec_handler)->Exec(m_database, "COMMIT TRANSACTION");
|
||||
if (res != SQLITE_OK) {
|
||||
LogPrintf("SQLiteBatch: Failed to commit the transaction\n");
|
||||
} else {
|
||||
m_txn = false;
|
||||
m_database.m_write_semaphore.post();
|
||||
}
|
||||
return res == SQLITE_OK;
|
||||
}
|
||||
|
||||
bool SQLiteBatch::TxnAbort()
|
||||
{
|
||||
if (!m_database.HasActiveTxn()) return false;
|
||||
if (!m_database.m_db || !m_txn) return false;
|
||||
Assert(m_database.HasActiveTxn());
|
||||
int res = Assert(m_exec_handler)->Exec(m_database, "ROLLBACK TRANSACTION");
|
||||
if (res != SQLITE_OK) {
|
||||
LogPrintf("SQLiteBatch: Failed to abort the transaction\n");
|
||||
} else {
|
||||
m_txn = false;
|
||||
m_database.m_write_semaphore.post();
|
||||
}
|
||||
return res == SQLITE_OK;
|
||||
}
|
||||
|
|
|
@ -58,6 +58,18 @@ private:
|
|||
sqlite3_stmt* m_delete_stmt{nullptr};
|
||||
sqlite3_stmt* m_delete_prefix_stmt{nullptr};
|
||||
|
||||
/** Whether this batch has started a database transaction and whether it owns SQLiteDatabase::m_write_semaphore.
|
||||
* If the batch starts a db tx, it acquires the semaphore and sets this to true, keeping the semaphore
|
||||
* until the transaction ends to prevent other batch objects from writing to the database.
|
||||
*
|
||||
* If this batch did not start a transaction, the semaphore is acquired transiently when writing and m_txn
|
||||
* is not set.
|
||||
*
|
||||
* m_txn is different from HasActiveTxn() as it is only true when this batch has started the transaction,
|
||||
* not just when any batch has started a transaction.
|
||||
*/
|
||||
bool m_txn{false};
|
||||
|
||||
void SetupSQLStatements();
|
||||
bool ExecStatement(sqlite3_stmt* stmt, Span<const std::byte> blob);
|
||||
|
||||
|
@ -115,6 +127,10 @@ public:
|
|||
|
||||
~SQLiteDatabase();
|
||||
|
||||
// Batches must acquire this semaphore on writing, and release when done writing.
|
||||
// This ensures that only one batch is modifying the database at a time.
|
||||
CSemaphore m_write_semaphore;
|
||||
|
||||
bool Verify(bilingual_str& error);
|
||||
|
||||
/** Open the database if it is not already opened */
|
||||
|
|
|
@ -279,8 +279,48 @@ BOOST_AUTO_TEST_CASE(txn_close_failure_dangling_txn)
|
|||
BOOST_CHECK(!batch2->Exists(key));
|
||||
}
|
||||
|
||||
#endif // USE_SQLITE
|
||||
BOOST_AUTO_TEST_CASE(concurrent_txn_dont_interfere)
|
||||
{
|
||||
std::string key = "key";
|
||||
std::string value = "value";
|
||||
std::string value2 = "value_2";
|
||||
|
||||
DatabaseOptions options;
|
||||
DatabaseStatus status;
|
||||
bilingual_str error;
|
||||
const auto& database = MakeSQLiteDatabase(m_path_root / "sqlite", options, status, error);
|
||||
|
||||
std::unique_ptr<DatabaseBatch> handler = Assert(database)->MakeBatch();
|
||||
|
||||
// Verify concurrent db transactions does not interfere between each other.
|
||||
// Start db txn, write key and check the key does exist within the db txn.
|
||||
BOOST_CHECK(handler->TxnBegin());
|
||||
BOOST_CHECK(handler->Write(key, value));
|
||||
BOOST_CHECK(handler->Exists(key));
|
||||
|
||||
// But, the same key, does not exist in another handler
|
||||
std::unique_ptr<DatabaseBatch> handler2 = Assert(database)->MakeBatch();
|
||||
BOOST_CHECK(handler2->Exists(key));
|
||||
|
||||
// Attempt to commit the handler txn calling the handler2 methods.
|
||||
// Which, must not be possible.
|
||||
BOOST_CHECK(!handler2->TxnCommit());
|
||||
BOOST_CHECK(!handler2->TxnAbort());
|
||||
|
||||
// Only the first handler can commit the changes.
|
||||
BOOST_CHECK(handler->TxnCommit());
|
||||
// And, once commit is completed, handler2 can read the record
|
||||
std::string read_value;
|
||||
BOOST_CHECK(handler2->Read(key, read_value));
|
||||
BOOST_CHECK_EQUAL(read_value, value);
|
||||
|
||||
// Also, once txn is committed, single write statements are re-enabled.
|
||||
// Which means that handler2 can read the record changes directly.
|
||||
BOOST_CHECK(handler->Write(key, value2, /*fOverwrite=*/true));
|
||||
BOOST_CHECK(handler2->Read(key, read_value));
|
||||
BOOST_CHECK_EQUAL(read_value, value2);
|
||||
}
|
||||
#endif // USE_SQLITE
|
||||
|
||||
BOOST_AUTO_TEST_SUITE_END()
|
||||
} // namespace wallet
|
||||
|
|
|
@ -9,7 +9,10 @@ try:
|
|||
except ImportError:
|
||||
pass
|
||||
|
||||
import concurrent.futures
|
||||
|
||||
from test_framework.blocktools import COINBASE_MATURITY
|
||||
from test_framework.descriptors import descsum_create
|
||||
from test_framework.test_framework import BitcoinTestFramework
|
||||
from test_framework.util import (
|
||||
assert_equal,
|
||||
|
@ -33,6 +36,41 @@ class WalletDescriptorTest(BitcoinTestFramework):
|
|||
self.skip_if_no_sqlite()
|
||||
self.skip_if_no_py_sqlite3()
|
||||
|
||||
def test_concurrent_writes(self):
|
||||
self.log.info("Test sqlite concurrent writes are in the correct order")
|
||||
self.restart_node(0, extra_args=["-unsafesqlitesync=0"])
|
||||
self.nodes[0].createwallet(wallet_name="concurrency", blank=True)
|
||||
wallet = self.nodes[0].get_wallet_rpc("concurrency")
|
||||
# First import a descriptor that uses hardened dervation so that topping up
|
||||
# Will require writing a ton to db
|
||||
wallet.importdescriptors([{"desc":descsum_create("wpkh(tprv8ZgxMBicQKsPeuVhWwi6wuMQGfPKi9Li5GtX35jVNknACgqe3CY4g5xgkfDDJcmtF7o1QnxWDRYw4H5P26PXq7sbcUkEqeR4fg3Kxp2tigg/0h/0h/*h)"), "timestamp": "now", "active": True}])
|
||||
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as thread:
|
||||
topup = thread.submit(wallet.keypoolrefill, newsize=1000)
|
||||
|
||||
# Then while the topup is running, we need to do something that will call
|
||||
# ChainStateFlushed which will trigger a write to the db, hopefully at the
|
||||
# same time that the topup still has an open db transaction.
|
||||
self.nodes[0].cli.gettxoutsetinfo()
|
||||
assert_equal(topup.result(), None)
|
||||
|
||||
wallet.unloadwallet()
|
||||
|
||||
# Check that everything was written
|
||||
wallet_db = self.nodes[0].wallets_path / "concurrency" / self.wallet_data_filename
|
||||
conn = sqlite3.connect(wallet_db)
|
||||
with conn:
|
||||
# Retrieve the bestblock_nomerkle record
|
||||
bestblock_rec = conn.execute("SELECT value FROM main WHERE hex(key) = '1262657374626C6F636B5F6E6F6D65726B6C65'").fetchone()[0]
|
||||
# Retrieve the number of descriptor cache records
|
||||
# Since we store binary data, sqlite's comparison operators don't work everywhere
|
||||
# so just retrieve all records and process them ourselves.
|
||||
db_keys = conn.execute("SELECT key FROM main").fetchall()
|
||||
cache_records = len([k[0] for k in db_keys if b"walletdescriptorcache" in k[0]])
|
||||
conn.close()
|
||||
|
||||
assert_equal(bestblock_rec[5:37][::-1].hex(), self.nodes[0].getbestblockhash())
|
||||
assert_equal(cache_records, 1000)
|
||||
|
||||
def run_test(self):
|
||||
if self.is_bdb_compiled():
|
||||
# Make a legacy wallet and check it is BDB
|
||||
|
@ -240,6 +278,8 @@ class WalletDescriptorTest(BitcoinTestFramework):
|
|||
conn.close()
|
||||
assert_raises_rpc_error(-4, "Unexpected legacy entry in descriptor wallet found.", self.nodes[0].loadwallet, "crashme")
|
||||
|
||||
self.test_concurrent_writes()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
WalletDescriptorTest().main ()
|
||||
|
|
Loading…
Add table
Reference in a new issue