0
0
Fork 0
mirror of https://github.com/bitcoin/bitcoin.git synced 2025-02-10 10:52:31 -05:00

Merge #12366: http: Join worker threads before deleting work queue

11e0151 http: Remove numThreads and ThreadCounter (Wladimir J. van der Laan)
f946654 http: Remove WaitExit from WorkQueue (Wladimir J. van der Laan)
b1c2370 http: Join worker threads before deleting work queue (Wladimir J. van der Laan)

Pull request description:

  This prevents a potential race condition if control flow ends up in
  `ShutdownHTTPServer` before the thread gets to `queue->Run()`,
  deleting the work queue while workers are still going to use it.

  Meant to fix #12362.

Tree-SHA512: 8108514aeee5b2067a3736ed028014b580d1cbf8530ac7682b8a23070133dfa1ca21db4358c9158ea57e8811e0551395b6cb769887876b9cfce067ee968d0642
This commit is contained in:
Wladimir J. van der Laan 2018-02-08 08:53:40 +01:00
commit 6db4fa7ad3
No known key found for this signature in database
GPG key ID: 1E4AED62986CD25D

View file

@ -73,34 +73,13 @@ private:
std::deque<std::unique_ptr<WorkItem>> queue; std::deque<std::unique_ptr<WorkItem>> queue;
bool running; bool running;
size_t maxDepth; size_t maxDepth;
int numThreads;
/** RAII object to keep track of number of running worker threads */
class ThreadCounter
{
public:
WorkQueue &wq;
explicit ThreadCounter(WorkQueue &w): wq(w)
{
std::lock_guard<std::mutex> lock(wq.cs);
wq.numThreads += 1;
}
~ThreadCounter()
{
std::lock_guard<std::mutex> lock(wq.cs);
wq.numThreads -= 1;
wq.cond.notify_all();
}
};
public: public:
explicit WorkQueue(size_t _maxDepth) : running(true), explicit WorkQueue(size_t _maxDepth) : running(true),
maxDepth(_maxDepth), maxDepth(_maxDepth)
numThreads(0)
{ {
} }
/** Precondition: worker threads have all stopped /** Precondition: worker threads have all stopped (they have been joined).
* (call WaitExit)
*/ */
~WorkQueue() ~WorkQueue()
{ {
@ -119,7 +98,6 @@ public:
/** Thread function */ /** Thread function */
void Run() void Run()
{ {
ThreadCounter count(*this);
while (true) { while (true) {
std::unique_ptr<WorkItem> i; std::unique_ptr<WorkItem> i;
{ {
@ -141,13 +119,6 @@ public:
running = false; running = false;
cond.notify_all(); cond.notify_all();
} }
/** Wait for worker threads to exit */
void WaitExit()
{
std::unique_lock<std::mutex> lock(cs);
while (numThreads > 0)
cond.wait(lock);
}
}; };
struct HTTPPathHandler struct HTTPPathHandler
@ -449,6 +420,7 @@ bool UpdateHTTPServerLogging(bool enable) {
std::thread threadHTTP; std::thread threadHTTP;
std::future<bool> threadResult; std::future<bool> threadResult;
static std::vector<std::thread> g_thread_http_workers;
bool StartHTTPServer() bool StartHTTPServer()
{ {
@ -460,8 +432,7 @@ bool StartHTTPServer()
threadHTTP = std::thread(std::move(task), eventBase, eventHTTP); threadHTTP = std::thread(std::move(task), eventBase, eventHTTP);
for (int i = 0; i < rpcThreads; i++) { for (int i = 0; i < rpcThreads; i++) {
std::thread rpc_worker(HTTPWorkQueueRun, workQueue); g_thread_http_workers.emplace_back(HTTPWorkQueueRun, workQueue);
rpc_worker.detach();
} }
return true; return true;
} }
@ -486,7 +457,10 @@ void StopHTTPServer()
LogPrint(BCLog::HTTP, "Stopping HTTP server\n"); LogPrint(BCLog::HTTP, "Stopping HTTP server\n");
if (workQueue) { if (workQueue) {
LogPrint(BCLog::HTTP, "Waiting for HTTP worker threads to exit\n"); LogPrint(BCLog::HTTP, "Waiting for HTTP worker threads to exit\n");
workQueue->WaitExit(); for (auto& thread: g_thread_http_workers) {
thread.join();
}
g_thread_http_workers.clear();
delete workQueue; delete workQueue;
workQueue = nullptr; workQueue = nullptr;
} }