From 395bcd245476d7c83abf2d82e31e90b6d0c432f3 Mon Sep 17 00:00:00 2001 From: Ava Chow Date: Mon, 18 Dec 2023 17:06:04 -0500 Subject: [PATCH 1/3] sqlite: Ensure that only one SQLiteBatch is writing to db at a time A SQLiteBatch needs to wait for any other batch to finish writing before it can begin writing, otherwise db txn state may be incorrectly modified. To enforce this, each SQLiteDatabase has a semaphore which acts as a lock and is acquired by a batch when it begins a write, erase, or a transaction, and is released by it when it is done. To avoid deadlocking on itself for writing during a transaction, SQLiteBatch also keeps track of whether it has begun a transaction. --- src/wallet/sqlite.cpp | 37 ++++++++++++++++++++++++++++++++----- src/wallet/sqlite.h | 16 ++++++++++++++++ 2 files changed, 48 insertions(+), 5 deletions(-) diff --git a/src/wallet/sqlite.cpp b/src/wallet/sqlite.cpp index cff3628049..2cbeedbde6 100644 --- a/src/wallet/sqlite.cpp +++ b/src/wallet/sqlite.cpp @@ -110,7 +110,7 @@ Mutex SQLiteDatabase::g_sqlite_mutex; int SQLiteDatabase::g_sqlite_count = 0; SQLiteDatabase::SQLiteDatabase(const fs::path& dir_path, const fs::path& file_path, const DatabaseOptions& options, bool mock) - : WalletDatabase(), m_mock(mock), m_dir_path(fs::PathToString(dir_path)), m_file_path(fs::PathToString(file_path)), m_use_unsafe_sync(options.use_unsafe_sync) + : WalletDatabase(), m_mock(mock), m_dir_path(fs::PathToString(dir_path)), m_file_path(fs::PathToString(file_path)), m_write_semaphore(1), m_use_unsafe_sync(options.use_unsafe_sync) { { LOCK(g_sqlite_mutex); @@ -408,7 +408,7 @@ void SQLiteBatch::Close() bool force_conn_refresh = false; // If we began a transaction, and it wasn't committed, abort the transaction in progress - if (m_database.HasActiveTxn()) { + if (m_txn) { if (TxnAbort()) { LogPrintf("SQLiteBatch: Batch closed unexpectedly without the transaction being explicitly committed or aborted\n"); } else { @@ -442,6 +442,8 @@ void SQLiteBatch::Close() 
m_database.Close(); try { m_database.Open(); + // If TxnAbort failed and we refreshed the connection, the semaphore was not released, so release it here to avoid deadlocks on future writes. + m_database.m_write_semaphore.post(); } catch (const std::runtime_error&) { // If open fails, cleanup this object and rethrow the exception m_database.Close(); @@ -493,6 +495,9 @@ bool SQLiteBatch::WriteKey(DataStream&& key, DataStream&& value, bool overwrite) if (!BindBlobToStatement(stmt, 1, key, "key")) return false; if (!BindBlobToStatement(stmt, 2, value, "value")) return false; + // Acquire semaphore if not previously acquired when creating a transaction. + if (!m_txn) m_database.m_write_semaphore.wait(); + // Execute int res = sqlite3_step(stmt); sqlite3_clear_bindings(stmt); @@ -500,6 +505,9 @@ bool SQLiteBatch::WriteKey(DataStream&& key, DataStream&& value, bool overwrite) if (res != SQLITE_DONE) { LogPrintf("%s: Unable to execute statement: %s\n", __func__, sqlite3_errstr(res)); } + + if (!m_txn) m_database.m_write_semaphore.post(); + return res == SQLITE_DONE; } @@ -511,6 +519,9 @@ bool SQLiteBatch::ExecStatement(sqlite3_stmt* stmt, Span blob) // Bind: leftmost parameter in statement is index 1 if (!BindBlobToStatement(stmt, 1, blob, "key")) return false; + // Acquire semaphore if not previously acquired when creating a transaction. 
+ if (!m_txn) m_database.m_write_semaphore.wait(); + // Execute int res = sqlite3_step(stmt); sqlite3_clear_bindings(stmt); @@ -518,6 +529,9 @@ bool SQLiteBatch::ExecStatement(sqlite3_stmt* stmt, Span blob) if (res != SQLITE_DONE) { LogPrintf("%s: Unable to execute statement: %s\n", __func__, sqlite3_errstr(res)); } + + if (!m_txn) m_database.m_write_semaphore.post(); + return res == SQLITE_DONE; } @@ -634,30 +648,43 @@ std::unique_ptr SQLiteBatch::GetNewPrefixCursor(SpanExec(m_database, "BEGIN TRANSACTION"); if (res != SQLITE_OK) { LogPrintf("SQLiteBatch: Failed to begin the transaction\n"); + m_database.m_write_semaphore.post(); + } else { + m_txn = true; } return res == SQLITE_OK; } bool SQLiteBatch::TxnCommit() { - if (!m_database.HasActiveTxn()) return false; + if (!m_database.m_db || !m_txn) return false; + Assert(m_database.HasActiveTxn()); int res = Assert(m_exec_handler)->Exec(m_database, "COMMIT TRANSACTION"); if (res != SQLITE_OK) { LogPrintf("SQLiteBatch: Failed to commit the transaction\n"); + } else { + m_txn = false; + m_database.m_write_semaphore.post(); } return res == SQLITE_OK; } bool SQLiteBatch::TxnAbort() { - if (!m_database.HasActiveTxn()) return false; + if (!m_database.m_db || !m_txn) return false; + Assert(m_database.HasActiveTxn()); int res = Assert(m_exec_handler)->Exec(m_database, "ROLLBACK TRANSACTION"); if (res != SQLITE_OK) { LogPrintf("SQLiteBatch: Failed to abort the transaction\n"); + } else { + m_txn = false; + m_database.m_write_semaphore.post(); } return res == SQLITE_OK; } diff --git a/src/wallet/sqlite.h b/src/wallet/sqlite.h index ad91be1064..0a3243fe19 100644 --- a/src/wallet/sqlite.h +++ b/src/wallet/sqlite.h @@ -58,6 +58,18 @@ private: sqlite3_stmt* m_delete_stmt{nullptr}; sqlite3_stmt* m_delete_prefix_stmt{nullptr}; + /** Whether this batch has started a database transaction and whether it owns SQLiteDatabase::m_write_semaphore. 
+ * If the batch starts a db tx, it acquires the semaphore and sets this to true, keeping the semaphore + * until the transaction ends to prevent other batch objects from writing to the database. + * + * If this batch did not start a transaction, the semaphore is acquired transiently when writing and m_txn + * is not set. + * + * m_txn is different from HasActiveTxn() as it is only true when this batch has started the transaction, + * not just when any batch has started a transaction. + */ + bool m_txn{false}; + void SetupSQLStatements(); bool ExecStatement(sqlite3_stmt* stmt, Span blob); @@ -115,6 +127,10 @@ public: ~SQLiteDatabase(); + // Batches must acquire this semaphore on writing, and release when done writing. + // This ensures that only one batch is modifying the database at a time. + CSemaphore m_write_semaphore; + bool Verify(bilingual_str& error); /** Open the database if it is not already opened */ From 548ecd11553bea28bc79e6f9840836283e9c4e99 Mon Sep 17 00:00:00 2001 From: Ava Chow Date: Mon, 18 Dec 2023 16:06:36 -0500 Subject: [PATCH 2/3] tests: Test for concurrent writes with db tx There are occasions where a multi-statement tx is begun in one batch, and a second batch is created which does a normal write (without a multi-statement tx). These should not conflict with each other and all of the data should end up being written to disk. 
--- test/functional/wallet_descriptor.py | 40 ++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/test/functional/wallet_descriptor.py b/test/functional/wallet_descriptor.py index e9321b72e2..cbd3898f92 100755 --- a/test/functional/wallet_descriptor.py +++ b/test/functional/wallet_descriptor.py @@ -9,7 +9,10 @@ try: except ImportError: pass +import concurrent.futures + from test_framework.blocktools import COINBASE_MATURITY +from test_framework.descriptors import descsum_create from test_framework.test_framework import BitcoinTestFramework from test_framework.util import ( assert_equal, @@ -33,6 +36,41 @@ class WalletDescriptorTest(BitcoinTestFramework): self.skip_if_no_sqlite() self.skip_if_no_py_sqlite3() + def test_concurrent_writes(self): + self.log.info("Test sqlite concurrent writes are in the correct order") + self.restart_node(0, extra_args=["-unsafesqlitesync=0"]) + self.nodes[0].createwallet(wallet_name="concurrency", blank=True) + wallet = self.nodes[0].get_wallet_rpc("concurrency") + # First import a descriptor that uses hardened derivation so that topping up + # will require writing a large number of records to the db + wallet.importdescriptors([{"desc":descsum_create("wpkh(tprv8ZgxMBicQKsPeuVhWwi6wuMQGfPKi9Li5GtX35jVNknACgqe3CY4g5xgkfDDJcmtF7o1QnxWDRYw4H5P26PXq7sbcUkEqeR4fg3Kxp2tigg/0h/0h/*h)"), "timestamp": "now", "active": True}]) + with concurrent.futures.ThreadPoolExecutor(max_workers=1) as thread: + topup = thread.submit(wallet.keypoolrefill, newsize=1000) + + # Then while the topup is running, we need to do something that will call + # ChainStateFlushed which will trigger a write to the db, hopefully at the + # same time that the topup still has an open db transaction. 
+ self.nodes[0].cli.gettxoutsetinfo() + assert_equal(topup.result(), None) + + wallet.unloadwallet() + + # Check that everything was written + wallet_db = self.nodes[0].wallets_path / "concurrency" / self.wallet_data_filename + conn = sqlite3.connect(wallet_db) + with conn: + # Retrieve the bestblock_nomerkle record + bestblock_rec = conn.execute("SELECT value FROM main WHERE hex(key) = '1262657374626C6F636B5F6E6F6D65726B6C65'").fetchone()[0] + # Retrieve the number of descriptor cache records + # Since we store binary data, sqlite's comparison operators don't work everywhere + # so just retrieve all records and process them ourselves. + db_keys = conn.execute("SELECT key FROM main").fetchall() + cache_records = len([k[0] for k in db_keys if b"walletdescriptorcache" in k[0]]) + conn.close() + + assert_equal(bestblock_rec[5:37][::-1].hex(), self.nodes[0].getbestblockhash()) + assert_equal(cache_records, 1000) + def run_test(self): if self.is_bdb_compiled(): # Make a legacy wallet and check it is BDB @@ -240,6 +278,8 @@ class WalletDescriptorTest(BitcoinTestFramework): conn.close() assert_raises_rpc_error(-4, "Unexpected legacy entry in descriptor wallet found.", self.nodes[0].loadwallet, "crashme") + self.test_concurrent_writes() + if __name__ == '__main__': WalletDescriptorTest().main () From cfcb9b1ecf9be5267487713fa1e112ca09a1ae55 Mon Sep 17 00:00:00 2001 From: furszy Date: Sat, 23 Dec 2023 12:49:41 -0300 Subject: [PATCH 3/3] test: wallet, coverage for concurrent db transactions Verifying that a database handler can't commit/abort changes occurring in a different database handler. 
--- src/wallet/test/db_tests.cpp | 42 +++++++++++++++++++++++++++++++++++- 1 file changed, 41 insertions(+), 1 deletion(-) diff --git a/src/wallet/test/db_tests.cpp b/src/wallet/test/db_tests.cpp index 7e6219378f..adbbb94318 100644 --- a/src/wallet/test/db_tests.cpp +++ b/src/wallet/test/db_tests.cpp @@ -279,8 +279,48 @@ BOOST_AUTO_TEST_CASE(txn_close_failure_dangling_txn) BOOST_CHECK(!batch2->Exists(key)); } -#endif // USE_SQLITE +BOOST_AUTO_TEST_CASE(concurrent_txn_dont_interfere) +{ + std::string key = "key"; + std::string value = "value"; + std::string value2 = "value_2"; + DatabaseOptions options; + DatabaseStatus status; + bilingual_str error; + const auto& database = MakeSQLiteDatabase(m_path_root / "sqlite", options, status, error); + + std::unique_ptr handler = Assert(database)->MakeBatch(); + + // Verify that concurrent db transactions do not interfere with each other. + // Start db txn, write key and check the key does exist within the db txn. + BOOST_CHECK(handler->TxnBegin()); + BOOST_CHECK(handler->Write(key, value)); + BOOST_CHECK(handler->Exists(key)); + + // The not-yet-committed key is nonetheless visible through a second handler of the same database + std::unique_ptr handler2 = Assert(database)->MakeBatch(); + BOOST_CHECK(handler2->Exists(key)); + + // Attempt to commit the handler txn calling the handler2 methods. + // Which, must not be possible. + BOOST_CHECK(!handler2->TxnCommit()); + BOOST_CHECK(!handler2->TxnAbort()); + + // Only the first handler can commit the changes. + BOOST_CHECK(handler->TxnCommit()); + // And, once commit is completed, handler2 can read the record + std::string read_value; + BOOST_CHECK(handler2->Read(key, read_value)); + BOOST_CHECK_EQUAL(read_value, value); + + // Also, once txn is committed, single write statements are re-enabled. + // Which means that handler2 can read the record changes directly. 
+ BOOST_CHECK(handler->Write(key, value2, /*fOverwrite=*/true)); + BOOST_CHECK(handler2->Read(key, read_value)); + BOOST_CHECK_EQUAL(read_value, value2); +} +#endif // USE_SQLITE BOOST_AUTO_TEST_SUITE_END() } // namespace wallet